From: Marcel van der Veldt Date: Tue, 5 Apr 2022 23:38:50 +0000 (+0200) Subject: Various follow up fixes (#242) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=b34cd75ed366f6205cf8a02f1a52d5cd3b8388aa;p=music-assistant-server.git Various follow up fixes (#242) * handle brokenpipe error at shutdown * catch json parse errors in aiohttp * stability fixes * remove group player workaround --- diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index dcbe672d..a77e63fe 100755 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -63,14 +63,9 @@ class PlayerController: return player return None - def get_player_queue( - self, queue_id: str, include_unavailable: bool = False - ) -> PlayerQueue | None: + def get_player_queue(self, queue_id: str) -> PlayerQueue | None: """Return PlayerQueue by id or None if not found/unavailable.""" - if player_queue := self._player_queues.get(queue_id): - if player_queue.available or include_unavailable: - return player_queue - return None + return self._player_queues.get(queue_id) def get_player_by_name(self, name: str) -> PlayerType | None: """Return Player by name or None if no match is found.""" @@ -85,6 +80,7 @@ class PlayerController: # make sure that the mass instance is set on the player player.mass = self.mass + player._attr_active_queue_id = player_id # pylint: disable=protected-access self._players[player_id] = player # create playerqueue for this player diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index 832ec8ea..cfe62cd5 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -22,6 +22,7 @@ from music_assistant.helpers.audio import ( from music_assistant.helpers.process import AsyncProcess from music_assistant.helpers.typing import MusicAssistant from music_assistant.helpers.util import get_ip +from music_assistant.models.errors import MediaNotFoundError from music_assistant.models.media_items import ContentType from music_assistant.models.player_queue import PlayerQueue @@ -295,9 +296,14 @@ class StreamController: self.logger.debug("no (more) tracks in queue %s", queue.queue_id) break # get streamdetails - streamdetails = await get_stream_details( - self.mass, queue_track, queue.queue_id, lazy=track_count == 1 - ) + try: + streamdetails = await get_stream_details( + self.mass, queue_track, queue.queue_id, lazy=track_count == 1 + ) + except MediaNotFoundError as err: + self.logger.error(str(err), exc_info=err) + streamdetails = None + if not streamdetails: self.logger.warning("Skip track due to missing streamdetails") continue diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index be26e707..fff0e5b8 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -175,7 +175,7 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N async def get_stream_details( mass: MusicAssistant, queue_item: QueueItem, queue_id: str = "", lazy: bool = True -) -> StreamDetails | None: +) -> StreamDetails: """ Get streamdetails for the given QueueItem. @@ -195,16 +195,9 @@ async def get_stream_details( ) else: # always request the full db track as there might be other qualities available - try: - full_item = await mass.music.get_item_by_uri( - queue_item.uri, force_refresh=not lazy, lazy=lazy - ) - except MediaNotFoundError as err: - LOGGER.warning(str(err)) - return None - - if not full_item: - return None + full_item = await mass.music.get_item_by_uri( + queue_item.uri, force_refresh=not lazy, lazy=lazy + ) # sort by quality and check track availability for prov_media in sorted( full_item.provider_ids, key=lambda x: x.quality, reverse=True @@ -227,27 +220,28 @@ async def get_stream_details( else: break - if streamdetails: - # set player_id on the streamdetails so we know what players stream - streamdetails.queue_id = queue_id - # get gain correct / replaygain - loudness, gain_correct = await get_gain_correct( - mass, queue_id, streamdetails.item_id, streamdetails.provider - ) - streamdetails.gain_correct = gain_correct - streamdetails.loudness = loudness - # set streamdetails as attribute on the media_item - # this way the app knows what content is playing - queue_item.streamdetails = streamdetails - return streamdetails - return None + if not streamdetails: + raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}") + + # set player_id on the streamdetails so we know what players stream + streamdetails.queue_id = queue_id + # get gain correct / replaygain + loudness, gain_correct = await get_gain_correct( + mass, queue_id, streamdetails.item_id, streamdetails.provider + ) + streamdetails.gain_correct = gain_correct + streamdetails.loudness = loudness + # set streamdetails as attribute on the media_item + # this way the app knows what content is playing + queue_item.streamdetails = streamdetails + return streamdetails async def get_gain_correct( mass: MusicAssistant, queue_id: str, item_id: str, provider_id: str ) -> Tuple[float, float]: """Get gain correction for given queue / track combination.""" - queue = mass.players.get_player_queue(queue_id, True) + queue = mass.players.get_player_queue(queue_id) if not queue or not queue.volume_normalization_enabled: return 0 target_gain = queue.volume_normalization_target diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 76979dc2..f2a84852 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -99,9 +99,7 @@ class AsyncProcess: try: self._proc.stdin.write(data) await self._proc.stdin.drain() - except BrokenPipeError: - pass - except (AttributeError, AssertionError) as err: + except (AttributeError, AssertionError, BrokenPipeError) as err: raise asyncio.CancelledError() from err def write_eof(self) -> None: diff --git a/music_assistant/models/errors.py b/music_assistant/models/errors.py index ae7a5b00..04cd5c27 100644 --- a/music_assistant/models/errors.py +++ b/music_assistant/models/errors.py @@ -31,3 +31,7 @@ class LoginFailed(MusicAssistantError): class AudioError(MusicAssistantError): """Error raised when an issue arrised when processing audio.""" + + +class QueueEmpty(MusicAssistantError): + """Error raised when trying to start queue stream while queue is empty.""" diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index e58d8575..c00a77c8 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -45,7 +45,8 @@ class Player(ABC): """Model for a music player.""" player_id: str - is_group: bool = False + _attr_is_group: bool = False + _attr_group_childs: List[str] = [] _attr_name: str = None _attr_powered: bool = False _attr_elapsed_time: int = 0 @@ -55,6 +56,7 @@ class Player(ABC): _attr_volume_level: int = 100 _attr_device_info: DeviceInfo = DeviceInfo() _attr_max_sample_rate: int = 96000 + _attr_active_queue_id: str = "" # mass object will be set by playermanager at register mass: MusicAssistant = None # type: ignore[assignment] @@ -63,6 +65,16 @@ class Player(ABC): """Return player name.""" return self._attr_name or self.player_id + @property + def is_group(self) -> bool: + """Return bool if this player is a grouped player (playergroup).""" + return self._attr_is_group + + @property + def group_childs(self) -> List[str]: + """Return list of child player id's of PlayerGroup (if player is group).""" + return self._attr_group_childs + @property def powered(self) -> bool: """Return current power state of player.""" @@ -102,6 +114,16 @@ class Player(ABC): """Return the maximum supported sample rate this player supports.""" return self._attr_max_sample_rate + @property + def active_queue(self) -> PlayerQueue: + """ + Return the currently active queue for this player. + + If the player is a group child this will return its parent when that is playing, + otherwise it will return the player's own queue. + """ + return self.mass.players.get_player_queue(self._attr_active_queue_id) + async def play_url(self, url: str) -> None: """Play the specified url on the player.""" raise NotImplementedError @@ -151,13 +173,16 @@ class Player(ABC): # DO NOT OVERRIDE BELOW - @property - def queue(self) -> "PlayerQueue": - """Return PlayerQueue for this player.""" - return self.mass.players.get_player_queue(self.player_id, True) - def update_state(self) -> None: """Update current player state in the player manager.""" + # determine active queue for player + queue_id = self.player_id + for player_id in self.get_group_parents(): + if player := self.mass.players.get_player(player_id): + if player.state in [PlayerState.PLAYING, PlayerState.PAUSED]: + queue_id = player_id + break + self._attr_active_queue_id = queue_id # basic throttle: do not send state changed events if player did not change prev_state = getattr(self, "_prev_state", None) cur_state = self.to_dict() @@ -165,7 +190,7 @@ class Player(ABC): return setattr(self, "_prev_state", cur_state) self.mass.signal_event(EventType.PLAYER_CHANGED, self) - self.queue.on_player_update() + self.mass.players.get_player_queue(self.player_id).on_player_update() if self.is_group: # update group player childs when parent updates for child_player_id in self.group_childs: @@ -194,34 +219,28 @@ class Player(ABC): "elapsed_time": self.elapsed_time, "state": self.state.value, "available": self.available, + "is_group": self.is_group, + "group_childs": self.group_childs, "volume_level": int(self.volume_level), "device_info": self.device_info.to_dict(), + "active_queue": self.active_queue.queue_id, } class PlayerGroup(Player): - """Model for a player group.""" + """Convenience Model for a player group with some additional helper methods.""" is_group: bool = True _attr_group_childs: List[str] = [] _attr_support_join_control: bool = True - @property - def support_join_control(self) -> bool: - """Return bool if joining/unjoining of players to this group is supported.""" - return self._attr_support_join_control - - @property - def group_childs(self) -> List[str]: - """Return list of child player id's of this PlayerGroup.""" - return self._attr_group_childs - @property def volume_level(self) -> int: """Return current volume level of player (scale 0..100).""" if not self.available: return 0 # calculate group volume from powered players for convenience + # may be overridden if implementation provides this natively group_volume = 0 active_players = 0 for child_player in self._get_players(True): @@ -231,20 +250,10 @@ class PlayerGroup(Player): group_volume = group_volume / active_players return int(group_volume) - async def power(self, powered: bool) -> None: - """Send POWER command to player.""" - try: - super().power(powered) - except NotImplementedError: - self._attr_powered = powered - if not powered: - # turn off all childs - for child_player in self._get_players(True): - await child_player.power(False) - async def volume_set(self, volume_level: int) -> None: """Send volume level (0..100) command to player.""" - # handle group volume + # handle group volume by only applying the valume to powered childs + # may be overridden if implementation provides this natively cur_volume = self.volume_level new_volume = volume_level volume_dif = new_volume - cur_volume @@ -259,14 +268,6 @@ class PlayerGroup(Player): ) await child_player.volume_set(new_child_volume) - async def join(self, player_id: str) -> None: - """Command to add/join a player to this group.""" - raise NotImplementedError - - async def unjoin(self, player_id: str) -> None: - """Command to remove/unjoin a player to this group.""" - raise NotImplementedError - def _get_players(self, only_powered: bool = False) -> List[Player]: """Get players attached to this group.""" return [ diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index f1952ce2..0baaf2f0 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -15,6 +15,7 @@ from music_assistant.constants import EventType from music_assistant.helpers.audio import get_stream_details from music_assistant.helpers.typing import MusicAssistant from music_assistant.helpers.util import create_task +from music_assistant.models.errors import MediaNotFoundError, QueueEmpty from music_assistant.models.media_items import MediaType, StreamDetails from .player import Player, PlayerState @@ -70,7 +71,6 @@ class PlayerQueue: self.mass = mass self.logger = mass.players.logger self.queue_id = player_id - self.player_id = player_id self._shuffle_enabled: bool = False self._repeat_enabled: bool = False @@ -81,8 +81,8 @@ class PlayerQueue: self._current_index: Optional[int] = None self._current_item_time: int = 0 self._last_item: Optional[QueueItem] = None - self._start_index: int = 0 - self._next_index: int = 0 + self._start_index: int = 0 # from which index did the queue start playing + self._next_start_index: int = 0 # which index should the stream start self._last_state = PlayerState.IDLE self._items: List[QueueItem] = [] self._save_task: TimerHandle = None @@ -99,17 +99,11 @@ class PlayerQueue: @property def player(self) -> Player: """Return the player attached to this queue.""" - return self.mass.players.get_player(self.player_id, include_unavailable=True) - - @property - def available(self) -> bool: - """Return bool if this queue is available.""" - return self.player.available + return self.mass.players.get_player(self.queue_id, include_unavailable=True) @property def active(self) -> bool: """Return bool if the queue is currenty active on the player.""" - # TODO: figure out a way to handle group childs playing the parent queue if self.player.current_url is None: return False return self._stream_url in self.player.current_url @@ -148,17 +142,6 @@ class PlayerQueue: """Return all items in this queue.""" return self._items - @property - def current_index(self) -> Optional[int]: - """ - Return the current index of the queue. - - Returns None if queue is empty. - """ - if self._current_index >= len(self._items): - return None - return self._current_index - @property def current_item(self) -> QueueItem | None: """ @@ -172,27 +155,6 @@ class PlayerQueue: return None return self._items[self._current_index] - @property - def next_index(self) -> Optional[int]: - """ - Return the next index for this PlayerQueue. - - Return None if queue is empty or no more items. - """ - if not self._items: - # queue is empty - return None - if self._current_index is None: - # playback just started - return 0 - # player already playing (or paused) so return the next item - if len(self._items) > (self._current_index + 1): - return self._current_index + 1 - if self.repeat_enabled: - # repeat enabled, start queue at beginning - return 0 - return None - @property def next_item(self) -> QueueItem | None: """ @@ -200,8 +162,8 @@ class PlayerQueue: Returns None if queue is empty or no more items. """ - if self.next_index is not None: - return self._items[self.next_index] + if next_index := self.get_next_index(self._current_index): + return self._items[next_index] return None @property @@ -377,17 +339,16 @@ class PlayerQueue: async def next(self) -> None: """Play the next track in the queue.""" - if self._current_index is None: - return - if self.next_index is None: - return - await self.play_index(self.next_index) + next_index = self.get_next_index(self._current_index) + if next_index is None: + return None + await self.play_index(next_index) async def previous(self) -> None: """Play the previous track in the queue.""" if self._current_index is None: return - await self.play_index(self._current_index - 1) + await self.play_index(max(self._current_index - 1, 0)) async def resume(self) -> None: """Resume previous queue.""" @@ -409,6 +370,7 @@ class PlayerQueue: if not len(self.items) > index: return self._current_index = index + self._next_start_index = index # send stream url to player connected to this queue self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id) @@ -563,32 +525,52 @@ class PlayerQueue: return True return False - async def queue_stream_prepare(self) -> StreamDetails | None: + async def queue_stream_prepare(self) -> StreamDetails: """Call when queue_streamer is about to start playing.""" - if next_item := self.next_item: + start_from_index = self._next_start_index + try: + next_item = self._items[start_from_index] + except IndexError as err: + raise QueueEmpty() from err + try: return await get_stream_details(self.mass, next_item, self.queue_id) - return None + except MediaNotFoundError as err: + # something bad happened, try to recover by requesting the next track in the queue + await self.play_index(self._current_index + 2) + raise err - async def queue_stream_start(self) -> None: + async def queue_stream_start(self) -> int: """Call when queue_streamer starts playing the queue stream.""" + start_from_index = self._next_start_index self._current_item_time = 0 - self._current_index = self.next_index - self._start_index = self._current_index - return self._current_index + self._current_index = start_from_index + self._start_index = start_from_index + self._next_start_index = self.get_next_index(start_from_index) + return start_from_index - async def queue_stream_next(self, cur_index: int) -> None: + async def queue_stream_next(self, cur_index: int) -> int | None: """Call when queue_streamer loads next track in buffer.""" - next_index = 0 - if len(self.items) > (next_index): - next_index = cur_index + 1 - elif self._repeat_enabled: + next_idx = self._next_start_index + self._next_start_index = self.get_next_index(self._next_start_index) + return next_idx + + def get_next_index(self, index: int) -> int | None: + """Return the next index or None if no more items.""" + if not self._items: + # queue is empty + return None + if index is None: + # guard just in case + return 0 + if len(self._items) > (index + 1): + return index + 1 + if self.repeat_enabled: # repeat enabled, start queue at beginning - next_index = 0 - self._next_index = next_index + 1 - return next_index + return 0 + return None async def queue_stream_signal_next(self): - """Indicate that queue stream needs to start nex index once playback finished.""" + """Indicate that queue stream needs to start next index once playback finished.""" self._signal_next = True async def __update_task(self) -> None: diff --git a/music_assistant/providers/qobuz.py b/music_assistant/providers/qobuz.py index 5432de93..e7443d22 100644 --- a/music_assistant/providers/qobuz.py +++ b/music_assistant/providers/qobuz.py @@ -4,8 +4,10 @@ from __future__ import annotations import datetime import hashlib import time +from json import JSONDecodeError from typing import List, Optional +import aiohttp from asyncio_throttle import Throttler from music_assistant.constants import EventType from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module @@ -657,11 +659,18 @@ class QobuzProvider(MusicProvider): async with self.mass.http_session.get( url, headers=headers, params=params, verify_ssl=False ) as response: - result = await response.json() - if "error" in result or ( - "status" in result and "error" in result["status"] - ): - self.logger.error("%s - %s", endpoint, result) + try: + result = await response.json() + if "error" in result or ( + "status" in result and "error" in result["status"] + ): + self.logger.error("%s - %s", endpoint, result) + return None + except ( + aiohttp.ContentTypeError, + JSONDecodeError, + ) as err: + self.logger.error("%s - %s", endpoint, str(err)) return None return result diff --git a/music_assistant/providers/spotify/__init__.py b/music_assistant/providers/spotify/__init__.py index 47ee29ec..fcc49649 100644 --- a/music_assistant/providers/spotify/__init__.py +++ b/music_assistant/providers/spotify/__init__.py @@ -9,6 +9,7 @@ import time from json.decoder import JSONDecodeError from typing import List, Optional +import aiohttp from asyncio_throttle import Throttler from music_assistant.helpers.app_vars import ( # noqa # pylint: disable=no-name-in-module get_app_var, @@ -508,10 +509,19 @@ class SpotifyProvider(MusicProvider): async with self.mass.http_session.get( url, headers=headers, params=params, verify_ssl=False ) as response: - result = await response.json() - if not result or "error" in result: - self.logger.error("%s - %s", endpoint, result) - result = None + try: + result = await response.json() + if "error" in result or ( + "status" in result and "error" in result["status"] + ): + self.logger.error("%s - %s", endpoint, result) + return None + except ( + aiohttp.ContentTypeError, + JSONDecodeError, + ) as err: + self.logger.error("%s - %s", endpoint, str(err)) + return None return result async def _delete_data(self, endpoint, params=None, data=None):