A few small fixes (#1151)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 18 Mar 2024 15:33:08 +0000 (16:33 +0100)
committerGitHub <noreply@github.com>
Mon, 18 Mar 2024 15:33:08 +0000 (16:33 +0100)
music_assistant/server/controllers/streams.py
music_assistant/server/controllers/webserver.py
music_assistant/server/helpers/audio.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/filesystem_local/__init__.py
music_assistant/server/providers/filesystem_local/base.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/spotify/__init__.py

index 26d7bd10663bb1ba65f243a1bba8fd984023fd7d..f0f706ff184b96788b7229153d0a5db3a0c3c858 100644 (file)
@@ -274,7 +274,7 @@ class StreamsController(CoreController):
         """Return all Config Entries for this core module (if any)."""
         default_ip = await get_ip()
         all_ips = await get_ips()
-        default_port = await select_free_port(8096, 9200)
+        default_port = await select_free_port(8097, 9200)
         return (
             ConfigEntry(
                 key=CONF_BIND_PORT,
index 095ccca63c1f3d4a92b56a76789ac802544567e6..6140b035b7d1702d917a68032a82f697ba088afe 100644 (file)
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Final
 from aiohttp import WSMsgType, web
 from music_assistant_frontend import where as locate_frontend
 
-from music_assistant.common.helpers.util import get_ip, select_free_port
+from music_assistant.common.helpers.util import get_ip
 from music_assistant.common.models.api import (
     ChunkedResultMessage,
     CommandMessage,
@@ -102,8 +102,7 @@ class WebserverController(CoreController):
         # HA supervisor not present: user is responsible for securing the webserver
         # we give the tools to do so by presenting config options
         all_ips = await get_ips()
-        default_port = await select_free_port(8095, 9200)
-        default_base_url = f"http://{default_publish_ip}:{default_port}"
+        default_base_url = f"http://{default_publish_ip}:{DEFAULT_SERVER_PORT}"
         return (
             ConfigEntry(
                 key=CONF_BASE_URL,
@@ -117,7 +116,7 @@ class WebserverController(CoreController):
             ConfigEntry(
                 key=CONF_BIND_PORT,
                 type=ConfigEntryType.INTEGER,
-                default_value=default_port,
+                default_value=DEFAULT_SERVER_PORT,
                 label="TCP Port",
                 description="The TCP port to run the webserver.",
             ),
index 884493fea3f4b4be294ffe1b689e783107fe627b..32f21912929b85948af2cd3d9cd08f44c213c3ec 100644 (file)
@@ -729,13 +729,6 @@ async def get_ffmpeg_stream(
     """
     logger = LOGGER.getChild("media_stream")
     use_stdin = not isinstance(audio_input, str)
-    if input_format == output_format and not filter_params and not chunk_size and use_stdin:
-        # edge case: input and output exactly the same, we can bypass ffmpeg
-        # return the raw input stream, no actions needed here
-        async for chunk in audio_input:
-            yield chunk
-        return
-
     ffmpeg_args = _get_ffmpeg_args(
         input_format=input_format,
         output_format=output_format,
index 08071ac25b5a0216af47006ecc4ab592fbe6c152..58faa52da66c5350b979c32b00f189ea06797954 100644 (file)
@@ -262,25 +262,34 @@ class AirplayStreamJob:
         self._log_reader_task = asyncio.create_task(self._log_watcher())
         self._audio_reader_task = asyncio.create_task(self._audio_reader())
 
-    async def stop(self):
+    async def stop(self, wait: bool = True):
         """Stop playback and cleanup."""
         if not self.running:
             return
-        # always stop the audio feeder
-        if self._audio_reader_task and not self._audio_reader_task.done():
-            with suppress(asyncio.CancelledError):
-                self._audio_reader_task.cancel()
-                await self._audio_reader_task
-        await self.send_cli_command("ACTION=STOP")
         self._stop_requested = True
-        with suppress(TimeoutError):
-            await asyncio.wait_for(self._cliraop_proc.wait(), 5)
-        if self._cliraop_proc.returncode is None:
-            self._cliraop_proc.kill()
+        # send stop with cli command
+        await self.send_cli_command("ACTION=STOP")
+
+        async def wait_for_stop() -> None:
+            # always stop the audio feeder
+            if self._audio_reader_task and not self._audio_reader_task.done():
+                with suppress(asyncio.CancelledError):
+                    self._audio_reader_task.cancel()
+            # make sure stdin is drained (otherwise we'll deadlock)
+            if self._cliraop_proc and self._cliraop_proc.returncode is None:
+                if self._cliraop_proc.stdin.can_write_eof():
+                    self._cliraop_proc.stdin.write_eof()
+                with suppress(BrokenPipeError):
+                    await self._cliraop_proc.stdin.drain()
+            await asyncio.wait_for(self._cliraop_proc.wait(), 30)
+
+        task = self.mass.create_task(wait_for_stop())
+        if wait:
+            await task
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
-        if not self.running:
+        if not (self._cliraop_proc and self._cliraop_proc.returncode is None):
             return
 
         named_pipe = f"/tmp/fifo-{self.active_remote_id}"  # noqa: S108
@@ -299,7 +308,6 @@ class AirplayStreamJob:
         airplay_player = self.airplay_player
         mass_player = self.mass.players.get(airplay_player.player_id)
         logger = airplay_player.logger
-        airplay_player.logger.debug("Starting log watcher task...")
         lost_packets = 0
         async for line in self._cliraop_proc.stderr:
             line = line.decode().strip()  # noqa: PLW2901
@@ -321,11 +329,6 @@ class AirplayStreamJob:
                 mass_player.state = PlayerState.PLAYING
                 self.mass.players.update(airplay_player.player_id)
                 continue
-            if "Stopped at" in line:
-                logger.debug("raop streaming stopped")
-                mass_player.state = PlayerState.IDLE
-                self.mass.players.update(airplay_player.player_id)
-                continue
             if "restarting w/o pause" in line:
                 # streaming has started
                 logger.debug("raop streaming started")
@@ -337,9 +340,9 @@ class AirplayStreamJob:
             if "lost packet out of backlog" in line:
                 lost_packets += 1
                 if lost_packets == 50:
-                    logger.warning("High packet loss detected, restart playback...")
+                    logger.warning("High packet loss detected, stopping playback...")
                     queue = self.mass.player_queues.get_active_queue(mass_player.player_id)
-                    await self.mass.player_queues.resume(queue.queue_id)
+                    await self.mass.player_queues.stop(queue.queue_id)
                 else:
                     logger.debug(line)
                 continue
@@ -351,10 +354,7 @@ class AirplayStreamJob:
             "CLIRaop process stopped with errorcode %s",
             self._cliraop_proc.returncode,
         )
-        if not airplay_player.active_stream or (
-            airplay_player.active_stream
-            and airplay_player.active_stream.active_remote_id == self.active_remote_id
-        ):
+        if airplay_player.active_stream == self:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
 
@@ -378,6 +378,8 @@ class AirplayStreamJob:
                 await self._cliraop_proc.stdin.drain()
             except (BrokenPipeError, ConnectionResetError):
                 break
+            if not self.running:
+                return
             # send metadata to player(s) if needed
             # NOTE: this must all be done in separate tasks to not disturb audio
             now = time.time()
@@ -407,6 +409,8 @@ class AirplayStreamJob:
 
     async def _send_metadata(self, queue: PlayerQueue) -> None:
         """Send metadata to player (and connected sync childs)."""
+        if not self.running:
+            return
         if not queue or not queue.current_item:
             return
         duration = min(queue.current_item.duration or 0, 3600)
@@ -445,6 +449,8 @@ class AirplayStreamJob:
 
     async def _send_progress(self, queue: PlayerQueue) -> None:
         """Send progress report to player (and connected sync childs)."""
+        if not self.running:
+            return
         if not queue or not queue.current_item:
             return
         progress = int(queue.corrected_elapsed_time)
@@ -613,7 +619,7 @@ class AirplayProvider(PlayerProvider):
         # always stop existing stream first
         for airplay_player in self._get_sync_clients(player_id):
             if airplay_player.active_stream and airplay_player.active_stream.running:
-                await airplay_player.active_stream.stop()
+                await airplay_player.active_stream.stop(wait=False)
         pcm_format = AudioFormat(
             content_type=ContentType.PCM_S16LE,
             sample_rate=44100,
index 5ae0a780678cff02121e4347fb9541eab9ffe6b2..86a32d77a29dcac7e45ebbbdfff656476aab51ae 100644 (file)
@@ -116,8 +116,6 @@ class LocalFileSystemProvider(FileSystemProviderBase):
 
         """
         abs_path = get_absolute_path(self.base_path, path)
-        rel_path = get_relative_path(self.base_path, path)
-        self.logger.debug("Processing: %s", rel_path)
         entries = await asyncio.to_thread(os.scandir, abs_path)
         for entry in entries:
             if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
index fb3869508fe83d22d647dd5c3d5e3e35558319f0..974c7cd73b5a4766c56a0d0adcc22d3bbf457bc4 100644 (file)
@@ -340,7 +340,7 @@ class FileSystemProviderBase(MusicProvider):
                 # continue if the item did not change (checksum still the same)
                 if item.checksum == prev_checksums.get(item.path):
                     continue
-
+                self.logger.debug("Processing: %s", item.path)
                 if item.ext in TRACK_EXTENSIONS:
                     # add/update track to db
                     track = await self._parse_track(item)
index 7a29bf7950b9c22264c351b8cb865c60f37e7ea5..b6becf6ac8efc9a6e2742fcb157e930b0f769f61 100644 (file)
@@ -424,5 +424,7 @@ class SnapCastProvider(PlayerProvider):
                 data = data.decode().strip()  # noqa: PLW2901
                 for line in data.split("\n"):
                     logger.debug(line)
-                    if "Name now registered and active" in line:
-                        self._snapserver_started.set()
+                    if "(Snapserver) Version 0.27.0" in line:
+                        # delay init a small bit to prevent race conditions
+                        # where we try to connect too soon
+                        self.mass.loop.call_later(2, self._snapserver_started.set)
index 7733d2160cf2760f8765d71eb9adbda4fa47b105..3a64e30d817623834ae012d5db718aed2ff23592 100644 (file)
@@ -12,7 +12,6 @@ from json.decoder import JSONDecodeError
 from tempfile import gettempdir
 from typing import TYPE_CHECKING, Any
 
-import aiohttp
 from asyncio_throttle import Throttler
 
 from music_assistant.common.helpers.json import json_loads
@@ -173,33 +172,31 @@ class SpotifyProvider(MusicProvider):
             searchtypes.append("playlist")
         searchtype = ",".join(searchtypes)
         search_query = search_query.replace("'", "")
-        if searchresult := await self._get_data(
-            "search", q=search_query, type=searchtype, limit=limit
-        ):
-            if "artists" in searchresult:
-                result.artists += [
-                    await self._parse_artist(item)
-                    for item in searchresult["artists"]["items"]
-                    if (item and item["id"])
-                ]
-            if "albums" in searchresult:
-                result.albums += [
-                    await self._parse_album(item)
-                    for item in searchresult["albums"]["items"]
-                    if (item and item["id"])
-                ]
-            if "tracks" in searchresult:
-                result.tracks += [
-                    await self._parse_track(item)
-                    for item in searchresult["tracks"]["items"]
-                    if (item and item["id"])
-                ]
-            if "playlists" in searchresult:
-                result.playlists += [
-                    await self._parse_playlist(item)
-                    for item in searchresult["playlists"]["items"]
-                    if (item and item["id"])
-                ]
+        searchresult = await self._get_data("search", q=search_query, type=searchtype, limit=limit)
+        if "artists" in searchresult:
+            result.artists += [
+                await self._parse_artist(item)
+                for item in searchresult["artists"]["items"]
+                if (item and item["id"])
+            ]
+        if "albums" in searchresult:
+            result.albums += [
+                await self._parse_album(item)
+                for item in searchresult["albums"]["items"]
+                if (item and item["id"])
+            ]
+        if "tracks" in searchresult:
+            result.tracks += [
+                await self._parse_track(item)
+                for item in searchresult["tracks"]["items"]
+                if (item and item["id"])
+            ]
+        if "playlists" in searchresult:
+            result.playlists += [
+                await self._parse_playlist(item)
+                for item in searchresult["playlists"]["items"]
+                if (item and item["id"])
+            ]
         return result
 
     async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
@@ -241,35 +238,29 @@ class SpotifyProvider(MusicProvider):
     async def get_artist(self, prov_artist_id) -> Artist:
         """Get full artist details by id."""
         artist_obj = await self._get_data(f"artists/{prov_artist_id}")
-        return await self._parse_artist(artist_obj) if artist_obj else None
+        return await self._parse_artist(artist_obj)
 
     async def get_album(self, prov_album_id) -> Album:
         """Get full album details by id."""
-        if album_obj := await self._get_data(f"albums/{prov_album_id}"):
-            return await self._parse_album(album_obj)
-        msg = f"Item {prov_album_id} not found"
-        raise MediaNotFoundError(msg)
+        album_obj = await self._get_data(f"albums/{prov_album_id}")
+        return await self._parse_album(album_obj)
 
     async def get_track(self, prov_track_id) -> Track:
         """Get full track details by id."""
-        if track_obj := await self._get_data(f"tracks/{prov_track_id}"):
-            return await self._parse_track(track_obj)
-        msg = f"Item {prov_track_id} not found"
-        raise MediaNotFoundError(msg)
+        track_obj = await self._get_data(f"tracks/{prov_track_id}")
+        return await self._parse_track(track_obj)
 
     async def get_playlist(self, prov_playlist_id) -> Playlist:
         """Get full playlist details by id."""
-        if playlist_obj := await self._get_data(f"playlists/{prov_playlist_id}"):
-            return await self._parse_playlist(playlist_obj)
-        msg = f"Item {prov_playlist_id} not found"
-        raise MediaNotFoundError(msg)
+        playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}")
+        return await self._parse_playlist(playlist_obj)
 
     async def get_album_tracks(self, prov_album_id) -> list[AlbumTrack]:
         """Get all album tracks for given album id."""
         return [
             await self._parse_track(item)
             for item in await self._get_all_items(f"albums/{prov_album_id}/tracks")
-            if (item and item["id"])
+            if item["id"]
         ]
 
     async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[PlaylistTrack, None]:
@@ -733,7 +724,7 @@ class SpotifyProvider(MusicProvider):
                 break
         return all_items
 
-    async def _get_data(self, endpoint, **kwargs):
+    async def _get_data(self, endpoint, **kwargs) -> dict[str, Any]:
         """Get data from api."""
         url = f"https://api.spotify.com/v1/{endpoint}"
         kwargs["market"] = "from_token"
@@ -742,74 +733,78 @@ class SpotifyProvider(MusicProvider):
         if tokeninfo is None:
             tokeninfo = await self.login()
         headers = {"Authorization": f'Bearer {tokeninfo["accessToken"]}'}
-        async with self._throttler:
-            time_start = time.time()
-            result = None
-            try:
-                async with self.mass.http_session.get(
-                    url, headers=headers, params=kwargs, ssl=False, timeout=120
-                ) as response:
-                    # handle spotify rate limiter
-                    if response.status == 429:
-                        backoff_time = int(response.headers["Retry-After"])
-                        self.logger.debug(
-                            "Waiting %s seconds on Spotify rate limiter", backoff_time
-                        )
-                        await asyncio.sleep(backoff_time)
-                        return await self._get_data(endpoint, **kwargs)
-                    # get text before json so we can log the body in case of errors
-                    result = await response.text()
-                    result = json_loads(result)
-                    if "error" in result or ("status" in result and "error" in result["status"]):
-                        self.logger.error("%s - %s", endpoint, result)
-                        return None
-            except (
-                aiohttp.ContentTypeError,
-                JSONDecodeError,
-            ):
-                self.logger.error("Error while processing %s: %s", endpoint, result)
-                return None
-            self.logger.debug(
-                "Processing GET/%s took %s seconds",
-                endpoint,
-                round(time.time() - time_start, 2),
-            )
-            return result
-
-    async def _delete_data(self, endpoint, data=None, **kwargs):
+        async with self._throttler, self.mass.http_session.get(
+            url, headers=headers, params=kwargs, ssl=True, timeout=120
+        ) as response:
+            # handle spotify rate limiter
+            if response.status == 429:
+                backoff_time = int(response.headers["Retry-After"])
+                self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
+                await asyncio.sleep(backoff_time)
+                return await self._get_data(endpoint, **kwargs)
+            # handle temporary server error
+            if response.status == 503:
+                self.logger.debug(
+                    "Request to %s failed with 503 error, retrying in 30 seconds...",
+                    endpoint,
+                )
+                await asyncio.sleep(30)
+                return await self._get_data(endpoint, **kwargs)
+            # handle 404 not found, convert to MediaNotFoundError
+            if response.status == 404:
+                raise MediaNotFoundError(f"{endpoint} not found")
+            response.raise_for_status()
+            return await response.json(loads=json_loads)
+
+    async def _delete_data(self, endpoint, data=None, **kwargs) -> str:
         """Delete data from api."""
         url = f"https://api.spotify.com/v1/{endpoint}"
         token = await self.login()
-        if not token:
-            return None
         headers = {"Authorization": f'Bearer {token["accessToken"]}'}
         async with self.mass.http_session.delete(
             url, headers=headers, params=kwargs, json=data, ssl=False
         ) as response:
+            # handle spotify rate limiter
+            if response.status == 429:
+                backoff_time = int(response.headers["Retry-After"])
+                self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
+                await asyncio.sleep(backoff_time)
+                return await self._delete_data(endpoint, data=data, **kwargs)
+            response.raise_for_status()
             return await response.text()
 
-    async def _put_data(self, endpoint, data=None, **kwargs):
+    async def _put_data(self, endpoint, data=None, **kwargs) -> str:
         """Put data on api."""
         url = f"https://api.spotify.com/v1/{endpoint}"
         token = await self.login()
-        if not token:
-            return None
         headers = {"Authorization": f'Bearer {token["accessToken"]}'}
         async with self.mass.http_session.put(
             url, headers=headers, params=kwargs, json=data, ssl=False
         ) as response:
+            # handle spotify rate limiter
+            if response.status == 429:
+                backoff_time = int(response.headers["Retry-After"])
+                self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
+                await asyncio.sleep(backoff_time)
+                return await self._put_data(endpoint, data=data, **kwargs)
+            response.raise_for_status()
             return await response.text()
 
-    async def _post_data(self, endpoint, data=None, **kwargs):
+    async def _post_data(self, endpoint, data=None, **kwargs) -> str:
         """Post data on api."""
         url = f"https://api.spotify.com/v1/{endpoint}"
         token = await self.login()
-        if not token:
-            return None
         headers = {"Authorization": f'Bearer {token["accessToken"]}'}
         async with self.mass.http_session.post(
             url, headers=headers, params=kwargs, json=data, ssl=False
         ) as response:
+            # handle spotify rate limiter
+            if response.status == 429:
+                backoff_time = int(response.headers["Retry-After"])
+                self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
+                await asyncio.sleep(backoff_time)
+                return await self._post_data(endpoint, data=data, **kwargs)
+            response.raise_for_status()
             return await response.text()
 
     async def get_librespot_binary(self):