From 4938443c4feacc76d8ac28c0b233410bed22c50d Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 18 Mar 2024 16:33:08 +0100 Subject: [PATCH] A few small fixes (#1151) --- music_assistant/server/controllers/streams.py | 2 +- .../server/controllers/webserver.py | 7 +- music_assistant/server/helpers/audio.py | 7 - .../server/providers/airplay/__init__.py | 56 +++--- .../providers/filesystem_local/__init__.py | 2 - .../server/providers/filesystem_local/base.py | 2 +- .../server/providers/snapcast/__init__.py | 6 +- .../server/providers/spotify/__init__.py | 167 +++++++++--------- 8 files changed, 121 insertions(+), 128 deletions(-) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 26d7bd10..f0f706ff 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -274,7 +274,7 @@ class StreamsController(CoreController): """Return all Config Entries for this core module (if any).""" default_ip = await get_ip() all_ips = await get_ips() - default_port = await select_free_port(8096, 9200) + default_port = await select_free_port(8097, 9200) return ( ConfigEntry( key=CONF_BIND_PORT, diff --git a/music_assistant/server/controllers/webserver.py b/music_assistant/server/controllers/webserver.py index 095ccca6..6140b035 100644 --- a/music_assistant/server/controllers/webserver.py +++ b/music_assistant/server/controllers/webserver.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Final from aiohttp import WSMsgType, web from music_assistant_frontend import where as locate_frontend -from music_assistant.common.helpers.util import get_ip, select_free_port +from music_assistant.common.helpers.util import get_ip from music_assistant.common.models.api import ( ChunkedResultMessage, CommandMessage, @@ -102,8 +102,7 @@ class WebserverController(CoreController): # HA supervisor not present: user is responsible for securing the webserver # we give the tools to do so by presenting config options all_ips = await get_ips() - default_port = await select_free_port(8095, 9200) - default_base_url = f"http://{default_publish_ip}:{default_port}" + default_base_url = f"http://{default_publish_ip}:{DEFAULT_SERVER_PORT}" return ( ConfigEntry( key=CONF_BASE_URL, @@ -117,7 +116,7 @@ class WebserverController(CoreController): ConfigEntry( key=CONF_BIND_PORT, type=ConfigEntryType.INTEGER, - default_value=default_port, + default_value=DEFAULT_SERVER_PORT, label="TCP Port", description="The TCP port to run the webserver.", ), diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 884493fe..32f21912 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -729,13 +729,6 @@ async def get_ffmpeg_stream( """ logger = LOGGER.getChild("media_stream") use_stdin = not isinstance(audio_input, str) - if input_format == output_format and not filter_params and not chunk_size and use_stdin: - # edge case: input and output exactly the same, we can bypass ffmpeg - # return the raw input stream, no actions needed here - async for chunk in audio_input: - yield chunk - return - ffmpeg_args = _get_ffmpeg_args( input_format=input_format, output_format=output_format, diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 08071ac2..58faa52d 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -262,25 +262,34 @@ class AirplayStreamJob: self._log_reader_task = asyncio.create_task(self._log_watcher()) self._audio_reader_task = asyncio.create_task(self._audio_reader()) - async def stop(self): + async def stop(self, wait: bool = True): """Stop playback and cleanup.""" if not self.running: return - # always stop the audio feeder - if self._audio_reader_task and not self._audio_reader_task.done(): - with suppress(asyncio.CancelledError): - self._audio_reader_task.cancel() - await self._audio_reader_task - await self.send_cli_command("ACTION=STOP") self._stop_requested = True - with suppress(TimeoutError): - await asyncio.wait_for(self._cliraop_proc.wait(), 5) - if self._cliraop_proc.returncode is None: - self._cliraop_proc.kill() + # send stop with cli command + await self.send_cli_command("ACTION=STOP") + + async def wait_for_stop() -> None: + # always stop the audio feeder + if self._audio_reader_task and not self._audio_reader_task.done(): + with suppress(asyncio.CancelledError): + self._audio_reader_task.cancel() + # make sure stdin is drained (otherwise we'll deadlock) + if self._cliraop_proc and self._cliraop_proc.returncode is None: + if self._cliraop_proc.stdin.can_write_eof(): + self._cliraop_proc.stdin.write_eof() + with suppress(BrokenPipeError): + await self._cliraop_proc.stdin.drain() + await asyncio.wait_for(self._cliraop_proc.wait(), 30) + + task = self.mass.create_task(wait_for_stop()) + if wait: + await task async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" - if not self.running: + if not (self._cliraop_proc and self._cliraop_proc.returncode is None): return named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108 @@ -299,7 +308,6 @@ class AirplayStreamJob: airplay_player = self.airplay_player mass_player = self.mass.players.get(airplay_player.player_id) logger = airplay_player.logger - airplay_player.logger.debug("Starting log watcher task...") lost_packets = 0 async for line in self._cliraop_proc.stderr: line = line.decode().strip() # noqa: PLW2901 @@ -321,11 +329,6 @@ class AirplayStreamJob: mass_player.state = PlayerState.PLAYING self.mass.players.update(airplay_player.player_id) continue - if "Stopped at" in line: - logger.debug("raop streaming stopped") - mass_player.state = PlayerState.IDLE - self.mass.players.update(airplay_player.player_id) - continue if "restarting w/o pause" in line: # streaming has started logger.debug("raop streaming started") @@ -337,9 +340,9 @@ class AirplayStreamJob: if "lost packet out of backlog" in line: lost_packets += 1 if lost_packets == 50: - logger.warning("High packet loss detected, restart playback...") + logger.warning("High packet loss detected, stopping playback...") queue = self.mass.player_queues.get_active_queue(mass_player.player_id) - await self.mass.player_queues.resume(queue.queue_id) + await self.mass.player_queues.stop(queue.queue_id) else: logger.debug(line) continue @@ -351,10 +354,7 @@ class AirplayStreamJob: "CLIRaop process stopped with errorcode %s", self._cliraop_proc.returncode, ) - if not airplay_player.active_stream or ( - airplay_player.active_stream - and airplay_player.active_stream.active_remote_id == self.active_remote_id - ): + if airplay_player.active_stream == self: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) @@ -378,6 +378,8 @@ class AirplayStreamJob: await self._cliraop_proc.stdin.drain() except (BrokenPipeError, ConnectionResetError): break + if not self.running: + return # send metadata to player(s) if needed # NOTE: this must all be done in separate tasks to not disturb audio now = time.time() @@ -407,6 +409,8 @@ class AirplayStreamJob: async def _send_metadata(self, queue: PlayerQueue) -> None: """Send metadata to player (and connected sync childs).""" + if not self.running: + return if not queue or not queue.current_item: return duration = min(queue.current_item.duration or 0, 3600) @@ -445,6 +449,8 @@ class AirplayStreamJob: async def _send_progress(self, queue: PlayerQueue) -> None: """Send progress report to player (and connected sync childs).""" + if not self.running: + return if not queue or not queue.current_item: return progress = int(queue.corrected_elapsed_time) @@ -613,7 +619,7 @@ class AirplayProvider(PlayerProvider): # always stop existing stream first for airplay_player in self._get_sync_clients(player_id): if airplay_player.active_stream and airplay_player.active_stream.running: - await airplay_player.active_stream.stop() + await airplay_player.active_stream.stop(wait=False) pcm_format = AudioFormat( content_type=ContentType.PCM_S16LE, sample_rate=44100, diff --git a/music_assistant/server/providers/filesystem_local/__init__.py b/music_assistant/server/providers/filesystem_local/__init__.py index 5ae0a780..86a32d77 100644 --- a/music_assistant/server/providers/filesystem_local/__init__.py +++ b/music_assistant/server/providers/filesystem_local/__init__.py @@ -116,8 +116,6 @@ class LocalFileSystemProvider(FileSystemProviderBase): """ abs_path = get_absolute_path(self.base_path, path) - rel_path = get_relative_path(self.base_path, path) - self.logger.debug("Processing: %s", rel_path) entries = await asyncio.to_thread(os.scandir, abs_path) for entry in entries: if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS): diff --git a/music_assistant/server/providers/filesystem_local/base.py b/music_assistant/server/providers/filesystem_local/base.py index fb386950..974c7cd7 100644 --- a/music_assistant/server/providers/filesystem_local/base.py +++ b/music_assistant/server/providers/filesystem_local/base.py @@ -340,7 +340,7 @@ class FileSystemProviderBase(MusicProvider): # continue if the item did not change (checksum still the same) if item.checksum == prev_checksums.get(item.path): continue - + self.logger.debug("Processing: %s", item.path) if item.ext in TRACK_EXTENSIONS: # add/update track to db track = await self._parse_track(item) diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 7a29bf79..b6becf6a 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -424,5 +424,7 @@ class SnapCastProvider(PlayerProvider): data = data.decode().strip() # noqa: PLW2901 for line in data.split("\n"): logger.debug(line) - if "Name now registered and active" in line: - self._snapserver_started.set() + if "(Snapserver) Version 0.27.0" in line: + # delay init a small bit to prevent race conditions + # where we try to connect too soon + self.mass.loop.call_later(2, self._snapserver_started.set) diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 7733d216..3a64e30d 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -12,7 +12,6 @@ from json.decoder import JSONDecodeError from tempfile import gettempdir from typing import TYPE_CHECKING, Any -import aiohttp from asyncio_throttle import Throttler from music_assistant.common.helpers.json import json_loads @@ -173,33 +172,31 @@ class SpotifyProvider(MusicProvider): searchtypes.append("playlist") searchtype = ",".join(searchtypes) search_query = search_query.replace("'", "") - if searchresult := await self._get_data( - "search", q=search_query, type=searchtype, limit=limit - ): - if "artists" in searchresult: - result.artists += [ - await self._parse_artist(item) - for item in searchresult["artists"]["items"] - if (item and item["id"]) - ] - if "albums" in searchresult: - result.albums += [ - await self._parse_album(item) - for item in searchresult["albums"]["items"] - if (item and item["id"]) - ] - if "tracks" in searchresult: - result.tracks += [ - await self._parse_track(item) - for item in searchresult["tracks"]["items"] - if (item and item["id"]) - ] - if "playlists" in searchresult: - result.playlists += [ - await self._parse_playlist(item) - for item in searchresult["playlists"]["items"] - if (item and item["id"]) - ] + searchresult = await self._get_data("search", q=search_query, type=searchtype, limit=limit) + if "artists" in searchresult: + result.artists += [ + await self._parse_artist(item) + for item in searchresult["artists"]["items"] + if (item and item["id"]) + ] + if "albums" in searchresult: + result.albums += [ + await self._parse_album(item) + for item in searchresult["albums"]["items"] + if (item and item["id"]) + ] + if "tracks" in searchresult: + result.tracks += [ + await self._parse_track(item) + for item in searchresult["tracks"]["items"] + if (item and item["id"]) + ] + if "playlists" in searchresult: + result.playlists += [ + await self._parse_playlist(item) + for item in searchresult["playlists"]["items"] + if (item and item["id"]) + ] return result async def get_library_artists(self) -> AsyncGenerator[Artist, None]: @@ -241,35 +238,29 @@ class SpotifyProvider(MusicProvider): async def get_artist(self, prov_artist_id) -> Artist: """Get full artist details by id.""" artist_obj = await self._get_data(f"artists/{prov_artist_id}") - return await self._parse_artist(artist_obj) if artist_obj else None + return await self._parse_artist(artist_obj) async def get_album(self, prov_album_id) -> Album: """Get full album details by id.""" - if album_obj := await self._get_data(f"albums/{prov_album_id}"): - return await self._parse_album(album_obj) - msg = f"Item {prov_album_id} not found" - raise MediaNotFoundError(msg) + album_obj = await self._get_data(f"albums/{prov_album_id}") + return await self._parse_album(album_obj) async def get_track(self, prov_track_id) -> Track: """Get full track details by id.""" - if track_obj := await self._get_data(f"tracks/{prov_track_id}"): - return await self._parse_track(track_obj) - msg = f"Item {prov_track_id} not found" - raise MediaNotFoundError(msg) + track_obj = await self._get_data(f"tracks/{prov_track_id}") + return await self._parse_track(track_obj) async def get_playlist(self, prov_playlist_id) -> Playlist: """Get full playlist details by id.""" - if playlist_obj := await self._get_data(f"playlists/{prov_playlist_id}"): - return await self._parse_playlist(playlist_obj) - msg = f"Item {prov_playlist_id} not found" - raise MediaNotFoundError(msg) + playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}") + return await self._parse_playlist(playlist_obj) async def get_album_tracks(self, prov_album_id) -> list[AlbumTrack]: """Get all album tracks for given album id.""" return [ await self._parse_track(item) for item in await self._get_all_items(f"albums/{prov_album_id}/tracks") - if (item and item["id"]) + if item["id"] ] async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[PlaylistTrack, None]: @@ -733,7 +724,7 @@ class SpotifyProvider(MusicProvider): break return all_items - async def _get_data(self, endpoint, **kwargs): + async def _get_data(self, endpoint, **kwargs) -> dict[str, Any]: """Get data from api.""" url = f"https://api.spotify.com/v1/{endpoint}" kwargs["market"] = "from_token" @@ -742,74 +733,78 @@ class SpotifyProvider(MusicProvider): if tokeninfo is None: tokeninfo = await self.login() headers = {"Authorization": f'Bearer {tokeninfo["accessToken"]}'} - async with self._throttler: - time_start = time.time() - result = None - try: - async with self.mass.http_session.get( - url, headers=headers, params=kwargs, ssl=False, timeout=120 - ) as response: - # handle spotify rate limiter - if response.status == 429: - backoff_time = int(response.headers["Retry-After"]) - self.logger.debug( - "Waiting %s seconds on Spotify rate limiter", backoff_time - ) - await asyncio.sleep(backoff_time) - return await self._get_data(endpoint, **kwargs) - # get text before json so we can log the body in case of errors - result = await response.text() - result = json_loads(result) - if "error" in result or ("status" in result and "error" in result["status"]): - self.logger.error("%s - %s", endpoint, result) - return None - except ( - aiohttp.ContentTypeError, - JSONDecodeError, - ): - self.logger.error("Error while processing %s: %s", endpoint, result) - return None - self.logger.debug( - "Processing GET/%s took %s seconds", - endpoint, - round(time.time() - time_start, 2), - ) - return result - - async def _delete_data(self, endpoint, data=None, **kwargs): + async with self._throttler, self.mass.http_session.get( + url, headers=headers, params=kwargs, ssl=True, timeout=120 + ) as response: + # handle spotify rate limiter + if response.status == 429: + backoff_time = int(response.headers["Retry-After"]) + self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time) + await asyncio.sleep(backoff_time) + return await self._get_data(endpoint, **kwargs) + # handle temporary server error + if response.status == 503: + self.logger.debug( + "Request to %s failed with 503 error, retrying in 30 seconds...", + endpoint, + ) + await asyncio.sleep(30) + return await self._get_data(endpoint, **kwargs) + # handle 404 not found, convert to MediaNotFoundError + if response.status == 404: + raise MediaNotFoundError(f"{endpoint} not found") + response.raise_for_status() + return await response.json(loads=json_loads) + + async def _delete_data(self, endpoint, data=None, **kwargs) -> str: """Delete data from api.""" url = f"https://api.spotify.com/v1/{endpoint}" token = await self.login() - if not token: - return None headers = {"Authorization": f'Bearer {token["accessToken"]}'} async with self.mass.http_session.delete( url, headers=headers, params=kwargs, json=data, ssl=False ) as response: + # handle spotify rate limiter + if response.status == 429: + backoff_time = int(response.headers["Retry-After"]) + self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time) + await asyncio.sleep(backoff_time) + return await self._delete_data(endpoint, data=data, **kwargs) + response.raise_for_status() return await response.text() - async def _put_data(self, endpoint, data=None, **kwargs): + async def _put_data(self, endpoint, data=None, **kwargs) -> str: """Put data on api.""" url = f"https://api.spotify.com/v1/{endpoint}" token = await self.login() - if not token: - return None headers = {"Authorization": f'Bearer {token["accessToken"]}'} async with self.mass.http_session.put( url, headers=headers, params=kwargs, json=data, ssl=False ) as response: + # handle spotify rate limiter + if response.status == 429: + backoff_time = int(response.headers["Retry-After"]) + self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time) + await asyncio.sleep(backoff_time) + return await self._put_data(endpoint, data=data, **kwargs) + response.raise_for_status() return await response.text() - async def _post_data(self, endpoint, data=None, **kwargs): + async def _post_data(self, endpoint, data=None, **kwargs) -> str: """Post data on api.""" url = f"https://api.spotify.com/v1/{endpoint}" token = await self.login() - if not token: - return None headers = {"Authorization": f'Bearer {token["accessToken"]}'} async with self.mass.http_session.post( url, headers=headers, params=kwargs, json=data, ssl=False ) as response: + # handle spotify rate limiter + if response.status == 429: + backoff_time = int(response.headers["Retry-After"]) + self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time) + await asyncio.sleep(backoff_time) + return await self._post_data(endpoint, data=data, **kwargs) + response.raise_for_status() return await response.text() async def get_librespot_binary(self): -- 2.34.1