various fixes for streams and retries on error
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 29 Aug 2024 23:32:08 +0000 (01:32 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 29 Aug 2024 23:32:08 +0000 (01:32 +0200)
music_assistant/common/models/streamdetails.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/util.py
music_assistant/server/providers/spotify/__init__.py
music_assistant/server/server.py

index d33a25ce0e5bcf49d537b9d0abdca0e0bc92968a..f8caebb424dda6d004404ec7404d65433b106ae0 100644 (file)
@@ -59,6 +59,7 @@ class StreamDetails(DataClassDictMixin):
     queue_id: str | None = None
     seconds_streamed: float | None = None
     target_loudness: float | None = None
+    bypass_loudness_normalization: bool = False
 
     def __str__(self) -> str:
         """Return pretty printable string of object."""
index 1f01e1e44db0ef058005b401ec9f37654baeeeaf..8ff1059b039a65d822f65d4753308dbd898b4b57 100644 (file)
@@ -633,9 +633,17 @@ class PlayerQueuesController(CoreController):
         if (queue := self.get(queue_id)) is None or not queue.active:
             # TODO: forward to underlying player if not active
             return
-        current_index = self._queues[queue_id].current_index
-        if (next_index := self._get_next_index(queue_id, current_index, True)) is not None:
-            await self.play_index(queue_id, next_index)
+        idx = self._queues[queue_id].current_index
+        while True:
+            try:
+                if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
+                    await self.play_index(queue_id, next_index)
+                break
+            except MediaNotFoundError:
+                self.logger.warning(
+                    "Failed to fetch next track for queue %s - trying next item", queue.display_name
+                )
+                idx += 1
 
     @api_command("player_queues/previous")
     async def previous(self, queue_id: str) -> None:
index 2a179adfa802ea725f17ebed0f2ab04992a53370..495facbe5658498ea002b563e520843452ade32f 100644 (file)
@@ -287,10 +287,15 @@ class StreamsController(CoreController):
         if not queue_item:
             raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
         if not queue_item.streamdetails:
-            # raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
-            queue_item.streamdetails = await get_stream_details(
-                mass=self.mass, queue_item=queue_item
-            )
+            try:
+                queue_item.streamdetails = await get_stream_details(
+                    mass=self.mass, queue_item=queue_item
+                )
+            except Exception as e:
+                self.logger.error(
+                    "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
+                )
+                raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
         # work out output format/details
         output_format = await self._get_output_format(
             output_format_str=request.match_info["fmt"],
@@ -775,7 +780,10 @@ class StreamsController(CoreController):
         extra_input_args = []
         # add loudnorm filter: volume normalization
         # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
-        if streamdetails.target_loudness is not None:
+        if (
+            streamdetails.target_loudness is not None
+            and not streamdetails.bypass_loudness_normalization
+        ):
             if streamdetails.loudness:
                 # we have a measurement so we can do linear mode
                 target_loudness = streamdetails.target_loudness
index be80692846c88577ac622233e44368f3ac7e4982..174ab9f8707c6ef9432e1f773b8f5b8db3202c4b 100644 (file)
@@ -383,28 +383,28 @@ async def get_stream_details(
     # handle skip/fade_in details
     streamdetails.seek_position = seek_position
     streamdetails.fade_in = fade_in
+    if not streamdetails.duration:
+        streamdetails.duration = queue_item.duration
     # handle volume normalization details
     is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
-    bypass_normalization = (
+    streamdetails.bypass_loudness_normalization = (
         is_radio
         and await mass.config.get_core_config_value("streams", CONF_BYPASS_NORMALIZATION_RADIO)
     ) or (
         streamdetails.duration is not None
-        and streamdetails.duration < 60
+        and streamdetails.duration < 30
         and await mass.config.get_core_config_value("streams", CONF_BYPASS_NORMALIZATION_SHORT)
     )
-    if not bypass_normalization and not streamdetails.loudness:
+    if not streamdetails.loudness:
         streamdetails.loudness = await mass.music.get_track_loudness(
             streamdetails.item_id, streamdetails.provider
         )
     player_settings = await mass.config.get_player_config(streamdetails.queue_id)
-    if bypass_normalization or not player_settings.get_value(CONF_VOLUME_NORMALIZATION):
+    if not player_settings.get_value(CONF_VOLUME_NORMALIZATION):
         streamdetails.target_loudness = None
     else:
         streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
 
-    if not streamdetails.duration:
-        streamdetails.duration = queue_item.duration
     return streamdetails
 
 
@@ -958,10 +958,10 @@ def get_ffmpeg_args(
             str(output_format.sample_rate),
             "-ac",
             str(output_format.channels),
-            output_path,
         ]
         if output_format.output_format_str == "flac":
             output_args += ["-compression_level", "6"]
+        output_args += [output_path]
 
     # edge case: source file is not stereo - downmix to stereo
     if input_format.channels > 2 and output_format.channels == 2:
index 54ef01eadfb4a96be104b0c5fdbe51c43a80e08f..58505daae9375b461ce8128ed24d2ba48af32609 100644 (file)
@@ -47,19 +47,16 @@ async def install_package(package: str) -> None:
         raise RuntimeError(msg)
 
 
-async def get_package_version(pkg_name: str) -> str:
+async def get_package_version(pkg_name: str) -> str | None:
     """
     Return the version of an installed (python) package.
 
-    Will return `0.0.0` if the package is not found.
+    Will return None if the package is not found.
     """
     try:
-        installed_version = await asyncio.to_thread(pkg_version, pkg_name)
-        if installed_version is None:
-            return "0.0.0"  # type: ignore[unreachable]
-        return installed_version
+        return await asyncio.to_thread(pkg_version, pkg_name)
     except PackageNotFoundError:
-        return "0.0.0"
+        return None
 
 
 async def get_ips(include_ipv6: bool = False, ignore_loopback: bool = True) -> set[str]:
index dfd9fedf11526f891bd74c6ac690b2e0d3235b93..13388f09914aaf6e14e4a31b89f39ea9893f0db0 100644 (file)
@@ -20,6 +20,7 @@ from music_assistant.common.models.enums import (
     StreamType,
 )
 from music_assistant.common.models.errors import (
+    AudioError,
     LoginFailed,
     MediaNotFoundError,
     ResourceTemporarilyUnavailable,
@@ -561,6 +562,7 @@ class SpotifyProvider(MusicProvider):
         """Return the audio stream for the provider item."""
         auth_info = await self.login()
         librespot = await self.get_librespot_binary()
+        spotify_uri = f"spotify://track:{streamdetails.item_id}"
         args = [
             librespot,
             "-c",
@@ -573,7 +575,7 @@ class SpotifyProvider(MusicProvider):
             "--backend",
             "pipe",
             "--single-track",
-            f"spotify://track:{streamdetails.item_id}",
+            spotify_uri,
             "--token",
             auth_info["access_token"],
         ]
@@ -581,14 +583,25 @@ class SpotifyProvider(MusicProvider):
             args += ["--start-position", str(int(seek_position))]
         chunk_size = get_chunksize(streamdetails.audio_format)
         stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
-        async with AsyncProcess(
-            args,
-            stdout=True,
-            stderr=stderr,
-            name="librespot",
-        ) as librespot_proc:
-            async for chunk in librespot_proc.iter_any(chunk_size):
-                yield chunk
+        self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
+        for retry in (True, False):
+            async with AsyncProcess(
+                args,
+                stdout=True,
+                stderr=stderr,
+                name="librespot",
+            ) as librespot_proc:
+                async for chunk in librespot_proc.iter_any(chunk_size):
+                    yield chunk
+                if librespot_proc.returncode == 0:
+                    self.logger.log(VERBOSE_LOG_LEVEL, f"Streaming {spotify_uri} ready.")
+                    break
+                if not retry:
+                    raise AudioError(
+                        f"Failed to stream {spotify_uri} - error: {librespot_proc.returncode}"
+                    )
+                # do one retry attempt
+                auth_info = await self.login(force_refresh=True)
 
     def _parse_artist(self, artist_obj):
         """Parse spotify artist object to generic layout."""
@@ -770,10 +783,12 @@ class SpotifyProvider(MusicProvider):
         playlist.cache_checksum = str(playlist_obj["snapshot_id"])
         return playlist
 
-    async def login(self, retry: bool = True) -> dict:
+    async def login(self, retry: bool = True, force_refresh: bool = False) -> dict:
         """Log-in Spotify and return Auth/token info."""
         # return existing token if we have one in memory
-        if self._auth_info and (self._auth_info["expires_at"] > (time.time() - 300)):
+        if self._auth_info and (
+            self._auth_info["expires_at"] > (time.time() - 1800 if force_refresh else 120)
+        ):
             return self._auth_info
         # request new access token using the refresh token
         if not (refresh_token := self.config.get_value(CONF_REFRESH_TOKEN)):
index 905e12584381d79e45b1423730c62cdd7535930f..bf7225dbbda15b8a7cdc416a66c08ef0febc5640 100644 (file)
@@ -106,7 +106,7 @@ class MusicAssistant:
         """Start running the Music Assistant server."""
         self.loop = asyncio.get_running_loop()
         self.running_as_hass_addon = await is_hass_supervisor()
-        self.version = await get_package_version("music_assistant")
+        self.version = await get_package_version("music_assistant") or "0.0.0"
         # create shared zeroconf instance
         # TODO: enumerate interfaces and enable IPv6 support
         self.aiozc = AsyncZeroconf(ip_version=IPVersion.V4Only)