Various follow up fixes (#242)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 5 Apr 2022 23:38:50 +0000 (01:38 +0200)
committerGitHub <noreply@github.com>
Tue, 5 Apr 2022 23:38:50 +0000 (01:38 +0200)
* handle brokenpipe error at shutdown

* catch json parse errors in aiohttp

* stability fixes

* remove group player workaround

music_assistant/controllers/players.py
music_assistant/controllers/stream.py
music_assistant/helpers/audio.py
music_assistant/helpers/process.py
music_assistant/models/errors.py
music_assistant/models/player.py
music_assistant/models/player_queue.py
music_assistant/providers/qobuz.py
music_assistant/providers/spotify/__init__.py

index dcbe672d7cd93cf74762bd4bf7cf9260f9408183..a77e63fe7490dc6d877fe6637297e7340b00f808 100755 (executable)
@@ -63,14 +63,9 @@ class PlayerController:
                 return player
         return None
 
-    def get_player_queue(
-        self, queue_id: str, include_unavailable: bool = False
-    ) -> PlayerQueue | None:
+    def get_player_queue(self, queue_id: str) -> PlayerQueue | None:
         """Return PlayerQueue by id or None if not found/unavailable."""
-        if player_queue := self._player_queues.get(queue_id):
-            if player_queue.available or include_unavailable:
-                return player_queue
-        return None
+        return self._player_queues.get(queue_id)
 
     def get_player_by_name(self, name: str) -> PlayerType | None:
         """Return Player by name or None if no match is found."""
@@ -85,6 +80,7 @@ class PlayerController:
 
         # make sure that the mass instance is set on the player
         player.mass = self.mass
+        player._attr_active_queue_id = player_id  # pylint: disable=protected-access
         self._players[player_id] = player
 
         # create playerqueue for this player
index 832ec8eaf2121cc533a17ddfc96d8b67683e9dcb..cfe62cd5d6693b9c945dd3044b48f82da2961e1b 100644 (file)
@@ -22,6 +22,7 @@ from music_assistant.helpers.audio import (
 from music_assistant.helpers.process import AsyncProcess
 from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.helpers.util import get_ip
+from music_assistant.models.errors import MediaNotFoundError
 from music_assistant.models.media_items import ContentType
 from music_assistant.models.player_queue import PlayerQueue
 
@@ -295,9 +296,14 @@ class StreamController:
                 self.logger.debug("no (more) tracks in queue %s", queue.queue_id)
                 break
             # get streamdetails
-            streamdetails = await get_stream_details(
-                self.mass, queue_track, queue.queue_id, lazy=track_count == 1
-            )
+            try:
+                streamdetails = await get_stream_details(
+                    self.mass, queue_track, queue.queue_id, lazy=track_count == 1
+                )
+            except MediaNotFoundError as err:
+                self.logger.error(str(err), exc_info=err)
+                streamdetails = None
+
             if not streamdetails:
                 self.logger.warning("Skip track due to missing streamdetails")
                 continue
index be26e707a10ed0ff068fcff41f632d9aa7201702..fff0e5b8c9c22a2a02bb521ac94eb68273353590 100644 (file)
@@ -175,7 +175,7 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N
 
 async def get_stream_details(
     mass: MusicAssistant, queue_item: QueueItem, queue_id: str = "", lazy: bool = True
-) -> StreamDetails | None:
+) -> StreamDetails:
     """
     Get streamdetails for the given QueueItem.
 
@@ -195,16 +195,9 @@ async def get_stream_details(
         )
     else:
         # always request the full db track as there might be other qualities available
-        try:
-            full_item = await mass.music.get_item_by_uri(
-                queue_item.uri, force_refresh=not lazy, lazy=lazy
-            )
-        except MediaNotFoundError as err:
-            LOGGER.warning(str(err))
-            return None
-
-        if not full_item:
-            return None
+        full_item = await mass.music.get_item_by_uri(
+            queue_item.uri, force_refresh=not lazy, lazy=lazy
+        )
         # sort by quality and check track availability
         for prov_media in sorted(
             full_item.provider_ids, key=lambda x: x.quality, reverse=True
@@ -227,27 +220,28 @@ async def get_stream_details(
                 else:
                     break
 
-    if streamdetails:
-        # set player_id on the streamdetails so we know what players stream
-        streamdetails.queue_id = queue_id
-        # get gain correct / replaygain
-        loudness, gain_correct = await get_gain_correct(
-            mass, queue_id, streamdetails.item_id, streamdetails.provider
-        )
-        streamdetails.gain_correct = gain_correct
-        streamdetails.loudness = loudness
-        # set streamdetails as attribute on the media_item
-        # this way the app knows what content is playing
-        queue_item.streamdetails = streamdetails
-        return streamdetails
-    return None
+    if not streamdetails:
+        raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
+
+    # set player_id on the streamdetails so we know what players stream
+    streamdetails.queue_id = queue_id
+    # get gain correct / replaygain
+    loudness, gain_correct = await get_gain_correct(
+        mass, queue_id, streamdetails.item_id, streamdetails.provider
+    )
+    streamdetails.gain_correct = gain_correct
+    streamdetails.loudness = loudness
+    # set streamdetails as attribute on the media_item
+    # this way the app knows what content is playing
+    queue_item.streamdetails = streamdetails
+    return streamdetails
 
 
 async def get_gain_correct(
     mass: MusicAssistant, queue_id: str, item_id: str, provider_id: str
 ) -> Tuple[float, float]:
     """Get gain correction for given queue / track combination."""
-    queue = mass.players.get_player_queue(queue_id, True)
+    queue = mass.players.get_player_queue(queue_id)
     if not queue or not queue.volume_normalization_enabled:
         return 0
     target_gain = queue.volume_normalization_target
index 76979dc2d23072e19777f2fc8a666f9ecaebf13b..f2a84852c2404de0c8e3f5a98f1c0ff10b46e27f 100644 (file)
@@ -99,9 +99,7 @@ class AsyncProcess:
         try:
             self._proc.stdin.write(data)
             await self._proc.stdin.drain()
-        except BrokenPipeError:
-            pass
-        except (AttributeError, AssertionError) as err:
+        except (AttributeError, AssertionError, BrokenPipeError) as err:
             raise asyncio.CancelledError() from err
 
     def write_eof(self) -> None:
index ae7a5b00756614a43124a06c0ddba67d4324f58d..04cd5c27371d0f2825642b8aa3957340f62f266c 100644 (file)
@@ -31,3 +31,7 @@ class LoginFailed(MusicAssistantError):
 
 class AudioError(MusicAssistantError):
     """Error raised when an issue arrised when processing audio."""
+
+
+class QueueEmpty(MusicAssistantError):
+    """Error raised when trying to start queue stream while queue is empty."""
index e58d85750603c4deaadf6c28d4c523ab67532c70..c00a77c8888bc50c23611701f499f19e24dd81b8 100755 (executable)
@@ -45,7 +45,8 @@ class Player(ABC):
     """Model for a music player."""
 
     player_id: str
-    is_group: bool = False
+    _attr_is_group: bool = False
+    _attr_group_childs: List[str] = []
     _attr_name: str = None
     _attr_powered: bool = False
     _attr_elapsed_time: int = 0
@@ -55,6 +56,7 @@ class Player(ABC):
     _attr_volume_level: int = 100
     _attr_device_info: DeviceInfo = DeviceInfo()
     _attr_max_sample_rate: int = 96000
+    _attr_active_queue_id: str = ""
     # mass object will be set by playermanager at register
     mass: MusicAssistant = None  # type: ignore[assignment]
 
@@ -63,6 +65,16 @@ class Player(ABC):
         """Return player name."""
         return self._attr_name or self.player_id
 
+    @property
+    def is_group(self) -> bool:
+        """Return bool if this player is a grouped player (playergroup)."""
+        return self._attr_is_group
+
+    @property
+    def group_childs(self) -> List[str]:
+        """Return list of child player id's of PlayerGroup (if player is group)."""
+        return self._attr_group_childs
+
     @property
     def powered(self) -> bool:
         """Return current power state of player."""
@@ -102,6 +114,16 @@ class Player(ABC):
         """Return the maximum supported sample rate this player supports."""
         return self._attr_max_sample_rate
 
+    @property
+    def active_queue(self) -> PlayerQueue:
+        """
+        Return the currently active queue for this player.
+
+        If the player is a group child this will return its parent when that is playing,
+        otherwise it will return the player's own queue.
+        """
+        return self.mass.players.get_player_queue(self._attr_active_queue_id)
+
     async def play_url(self, url: str) -> None:
         """Play the specified url on the player."""
         raise NotImplementedError
@@ -151,13 +173,16 @@ class Player(ABC):
 
     # DO NOT OVERRIDE BELOW
 
-    @property
-    def queue(self) -> "PlayerQueue":
-        """Return PlayerQueue for this player."""
-        return self.mass.players.get_player_queue(self.player_id, True)
-
     def update_state(self) -> None:
         """Update current player state in the player manager."""
+        # determine active queue for player
+        queue_id = self.player_id
+        for player_id in self.get_group_parents():
+            if player := self.mass.players.get_player(player_id):
+                if player.state in [PlayerState.PLAYING, PlayerState.PAUSED]:
+                    queue_id = player_id
+                    break
+        self._attr_active_queue_id = queue_id
         # basic throttle: do not send state changed events if player did not change
         prev_state = getattr(self, "_prev_state", None)
         cur_state = self.to_dict()
@@ -165,7 +190,7 @@ class Player(ABC):
             return
         setattr(self, "_prev_state", cur_state)
         self.mass.signal_event(EventType.PLAYER_CHANGED, self)
-        self.queue.on_player_update()
+        self.mass.players.get_player_queue(self.player_id).on_player_update()
         if self.is_group:
             # update group player childs when parent updates
             for child_player_id in self.group_childs:
@@ -194,34 +219,28 @@ class Player(ABC):
             "elapsed_time": self.elapsed_time,
             "state": self.state.value,
             "available": self.available,
+            "is_group": self.is_group,
+            "group_childs": self.group_childs,
             "volume_level": int(self.volume_level),
             "device_info": self.device_info.to_dict(),
+            "active_queue": self.active_queue.queue_id,
         }
 
 
 class PlayerGroup(Player):
-    """Model for a player group."""
+    """Convenience Model for a player group with some additional helper methods."""
 
     is_group: bool = True
     _attr_group_childs: List[str] = []
     _attr_support_join_control: bool = True
 
-    @property
-    def support_join_control(self) -> bool:
-        """Return bool if joining/unjoining of players to this group is supported."""
-        return self._attr_support_join_control
-
-    @property
-    def group_childs(self) -> List[str]:
-        """Return list of child player id's of this PlayerGroup."""
-        return self._attr_group_childs
-
     @property
     def volume_level(self) -> int:
         """Return current volume level of player (scale 0..100)."""
         if not self.available:
             return 0
         # calculate group volume from powered players for convenience
+        # may be overridden if implementation provides this natively
         group_volume = 0
         active_players = 0
         for child_player in self._get_players(True):
@@ -231,20 +250,10 @@ class PlayerGroup(Player):
             group_volume = group_volume / active_players
         return int(group_volume)
 
-    async def power(self, powered: bool) -> None:
-        """Send POWER command to player."""
-        try:
-            super().power(powered)
-        except NotImplementedError:
-            self._attr_powered = powered
-        if not powered:
-            # turn off all childs
-            for child_player in self._get_players(True):
-                await child_player.power(False)
-
     async def volume_set(self, volume_level: int) -> None:
         """Send volume level (0..100) command to player."""
-        # handle group volume
+        # handle group volume by only applying the valume to powered childs
+        # may be overridden if implementation provides this natively
         cur_volume = self.volume_level
         new_volume = volume_level
         volume_dif = new_volume - cur_volume
@@ -259,14 +268,6 @@ class PlayerGroup(Player):
             )
             await child_player.volume_set(new_child_volume)
 
-    async def join(self, player_id: str) -> None:
-        """Command to add/join a player to this group."""
-        raise NotImplementedError
-
-    async def unjoin(self, player_id: str) -> None:
-        """Command to remove/unjoin a player to this group."""
-        raise NotImplementedError
-
     def _get_players(self, only_powered: bool = False) -> List[Player]:
         """Get players attached to this group."""
         return [
index f1952ce2a3b42669c2a6ae41aebfd6026c81178a..0baaf2f05e74a1b77bcc03f70b3eb3e8a11cfa54 100644 (file)
@@ -15,6 +15,7 @@ from music_assistant.constants import EventType
 from music_assistant.helpers.audio import get_stream_details
 from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.helpers.util import create_task
+from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
 from music_assistant.models.media_items import MediaType, StreamDetails
 
 from .player import Player, PlayerState
@@ -70,7 +71,6 @@ class PlayerQueue:
         self.mass = mass
         self.logger = mass.players.logger
         self.queue_id = player_id
-        self.player_id = player_id
 
         self._shuffle_enabled: bool = False
         self._repeat_enabled: bool = False
@@ -81,8 +81,8 @@ class PlayerQueue:
         self._current_index: Optional[int] = None
         self._current_item_time: int = 0
         self._last_item: Optional[QueueItem] = None
-        self._start_index: int = 0
-        self._next_index: int = 0
+        self._start_index: int = 0  # from which index did the queue start playing
+        self._next_start_index: int = 0  # which index should the stream start
         self._last_state = PlayerState.IDLE
         self._items: List[QueueItem] = []
         self._save_task: TimerHandle = None
@@ -99,17 +99,11 @@ class PlayerQueue:
     @property
     def player(self) -> Player:
         """Return the player attached to this queue."""
-        return self.mass.players.get_player(self.player_id, include_unavailable=True)
-
-    @property
-    def available(self) -> bool:
-        """Return bool if this queue is available."""
-        return self.player.available
+        return self.mass.players.get_player(self.queue_id, include_unavailable=True)
 
     @property
     def active(self) -> bool:
         """Return bool if the queue is currenty active on the player."""
-        # TODO: figure out a way to handle group childs playing the parent queue
         if self.player.current_url is None:
             return False
         return self._stream_url in self.player.current_url
@@ -148,17 +142,6 @@ class PlayerQueue:
         """Return all items in this queue."""
         return self._items
 
-    @property
-    def current_index(self) -> Optional[int]:
-        """
-        Return the current index of the queue.
-
-        Returns None if queue is empty.
-        """
-        if self._current_index >= len(self._items):
-            return None
-        return self._current_index
-
     @property
     def current_item(self) -> QueueItem | None:
         """
@@ -172,27 +155,6 @@ class PlayerQueue:
             return None
         return self._items[self._current_index]
 
-    @property
-    def next_index(self) -> Optional[int]:
-        """
-        Return the next index for this PlayerQueue.
-
-        Return None if queue is empty or no more items.
-        """
-        if not self._items:
-            # queue is empty
-            return None
-        if self._current_index is None:
-            # playback just started
-            return 0
-        # player already playing (or paused) so return the next item
-        if len(self._items) > (self._current_index + 1):
-            return self._current_index + 1
-        if self.repeat_enabled:
-            # repeat enabled, start queue at beginning
-            return 0
-        return None
-
     @property
     def next_item(self) -> QueueItem | None:
         """
@@ -200,8 +162,8 @@ class PlayerQueue:
 
         Returns None if queue is empty or no more items.
         """
-        if self.next_index is not None:
-            return self._items[self.next_index]
+        if next_index := self.get_next_index(self._current_index):
+            return self._items[next_index]
         return None
 
     @property
@@ -377,17 +339,16 @@ class PlayerQueue:
 
     async def next(self) -> None:
         """Play the next track in the queue."""
-        if self._current_index is None:
-            return
-        if self.next_index is None:
-            return
-        await self.play_index(self.next_index)
+        next_index = self.get_next_index(self._current_index)
+        if next_index is None:
+            return None
+        await self.play_index(next_index)
 
     async def previous(self) -> None:
         """Play the previous track in the queue."""
         if self._current_index is None:
             return
-        await self.play_index(self._current_index - 1)
+        await self.play_index(max(self._current_index - 1, 0))
 
     async def resume(self) -> None:
         """Resume previous queue."""
@@ -409,6 +370,7 @@ class PlayerQueue:
         if not len(self.items) > index:
             return
         self._current_index = index
+        self._next_start_index = index
 
         # send stream url to player connected to this queue
         self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id)
@@ -563,32 +525,52 @@ class PlayerQueue:
             return True
         return False
 
-    async def queue_stream_prepare(self) -> StreamDetails | None:
+    async def queue_stream_prepare(self) -> StreamDetails:
         """Call when queue_streamer is about to start playing."""
-        if next_item := self.next_item:
+        start_from_index = self._next_start_index
+        try:
+            next_item = self._items[start_from_index]
+        except IndexError as err:
+            raise QueueEmpty() from err
+        try:
             return await get_stream_details(self.mass, next_item, self.queue_id)
-        return None
+        except MediaNotFoundError as err:
+            # something bad happened, try to recover by requesting the next track in the queue
+            await self.play_index(self._current_index + 2)
+            raise err
 
-    async def queue_stream_start(self) -> None:
+    async def queue_stream_start(self) -> int:
         """Call when queue_streamer starts playing the queue stream."""
+        start_from_index = self._next_start_index
         self._current_item_time = 0
-        self._current_index = self.next_index
-        self._start_index = self._current_index
-        return self._current_index
+        self._current_index = start_from_index
+        self._start_index = start_from_index
+        self._next_start_index = self.get_next_index(start_from_index)
+        return start_from_index
 
-    async def queue_stream_next(self, cur_index: int) -> None:
+    async def queue_stream_next(self, cur_index: int) -> int | None:
         """Call when queue_streamer loads next track in buffer."""
-        next_index = 0
-        if len(self.items) > (next_index):
-            next_index = cur_index + 1
-        elif self._repeat_enabled:
+        next_idx = self._next_start_index
+        self._next_start_index = self.get_next_index(self._next_start_index)
+        return next_idx
+
+    def get_next_index(self, index: int) -> int | None:
+        """Return the next index or None if no more items."""
+        if not self._items:
+            # queue is empty
+            return None
+        if index is None:
+            # guard just in case
+            return 0
+        if len(self._items) > (index + 1):
+            return index + 1
+        if self.repeat_enabled:
             # repeat enabled, start queue at beginning
-            next_index = 0
-        self._next_index = next_index + 1
-        return next_index
+            return 0
+        return None
 
     async def queue_stream_signal_next(self):
-        """Indicate that queue stream needs to start nex index once playback finished."""
+        """Indicate that queue stream needs to start next index once playback finished."""
         self._signal_next = True
 
     async def __update_task(self) -> None:
index 5432de93ce0c40443a8a10d5e3b6c2d85b9381fc..e7443d221add776cea315f5beaf5588c326c2cb1 100644 (file)
@@ -4,8 +4,10 @@ from __future__ import annotations
 import datetime
 import hashlib
 import time
+from json import JSONDecodeError
 from typing import List, Optional
 
+import aiohttp
 from asyncio_throttle import Throttler
 from music_assistant.constants import EventType
 from music_assistant.helpers.app_vars import (  # pylint: disable=no-name-in-module
@@ -657,11 +659,18 @@ class QobuzProvider(MusicProvider):
             async with self.mass.http_session.get(
                 url, headers=headers, params=params, verify_ssl=False
             ) as response:
-                result = await response.json()
-                if "error" in result or (
-                    "status" in result and "error" in result["status"]
-                ):
-                    self.logger.error("%s - %s", endpoint, result)
+                try:
+                    result = await response.json()
+                    if "error" in result or (
+                        "status" in result and "error" in result["status"]
+                    ):
+                        self.logger.error("%s - %s", endpoint, result)
+                        return None
+                except (
+                    aiohttp.ContentTypeError,
+                    JSONDecodeError,
+                ) as err:
+                    self.logger.error("%s - %s", endpoint, str(err))
                     return None
                 return result
 
index 47ee29ece3de9d3f48fa1c4006009fe9f13c3a4a..fcc49649556e8b8dba64ad9a4055b91f3c38876e 100644 (file)
@@ -9,6 +9,7 @@ import time
 from json.decoder import JSONDecodeError
 from typing import List, Optional
 
+import aiohttp
 from asyncio_throttle import Throttler
 from music_assistant.helpers.app_vars import (  # noqa # pylint: disable=no-name-in-module
     get_app_var,
@@ -508,10 +509,19 @@ class SpotifyProvider(MusicProvider):
             async with self.mass.http_session.get(
                 url, headers=headers, params=params, verify_ssl=False
             ) as response:
-                result = await response.json()
-                if not result or "error" in result:
-                    self.logger.error("%s - %s", endpoint, result)
-                    result = None
+                try:
+                    result = await response.json()
+                    if "error" in result or (
+                        "status" in result and "error" in result["status"]
+                    ):
+                        self.logger.error("%s - %s", endpoint, result)
+                        return None
+                except (
+                    aiohttp.ContentTypeError,
+                    JSONDecodeError,
+                ) as err:
+                    self.logger.error("%s - %s", endpoint, str(err))
+                    return None
                 return result
 
     async def _delete_data(self, endpoint, params=None, data=None):