return player
return None
- def get_player_queue(
- self, queue_id: str, include_unavailable: bool = False
- ) -> PlayerQueue | None:
+ def get_player_queue(self, queue_id: str) -> PlayerQueue | None:
"""Return PlayerQueue by id or None if not found/unavailable."""
- if player_queue := self._player_queues.get(queue_id):
- if player_queue.available or include_unavailable:
- return player_queue
- return None
+ return self._player_queues.get(queue_id)
def get_player_by_name(self, name: str) -> PlayerType | None:
"""Return Player by name or None if no match is found."""
# make sure that the mass instance is set on the player
player.mass = self.mass
+ player._attr_active_queue_id = player_id # pylint: disable=protected-access
self._players[player_id] = player
# create playerqueue for this player
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistant
from music_assistant.helpers.util import get_ip
+from music_assistant.models.errors import MediaNotFoundError
from music_assistant.models.media_items import ContentType
from music_assistant.models.player_queue import PlayerQueue
self.logger.debug("no (more) tracks in queue %s", queue.queue_id)
break
# get streamdetails
- streamdetails = await get_stream_details(
- self.mass, queue_track, queue.queue_id, lazy=track_count == 1
- )
+ try:
+ streamdetails = await get_stream_details(
+ self.mass, queue_track, queue.queue_id, lazy=track_count == 1
+ )
+ except MediaNotFoundError as err:
+ self.logger.error(str(err), exc_info=err)
+ streamdetails = None
+
if not streamdetails:
self.logger.warning("Skip track due to missing streamdetails")
continue
async def get_stream_details(
mass: MusicAssistant, queue_item: QueueItem, queue_id: str = "", lazy: bool = True
-) -> StreamDetails | None:
+) -> StreamDetails:
"""
Get streamdetails for the given QueueItem.
)
else:
# always request the full db track as there might be other qualities available
- try:
- full_item = await mass.music.get_item_by_uri(
- queue_item.uri, force_refresh=not lazy, lazy=lazy
- )
- except MediaNotFoundError as err:
- LOGGER.warning(str(err))
- return None
-
- if not full_item:
- return None
+ full_item = await mass.music.get_item_by_uri(
+ queue_item.uri, force_refresh=not lazy, lazy=lazy
+ )
# sort by quality and check track availability
for prov_media in sorted(
full_item.provider_ids, key=lambda x: x.quality, reverse=True
else:
break
- if streamdetails:
- # set player_id on the streamdetails so we know what players stream
- streamdetails.queue_id = queue_id
- # get gain correct / replaygain
- loudness, gain_correct = await get_gain_correct(
- mass, queue_id, streamdetails.item_id, streamdetails.provider
- )
- streamdetails.gain_correct = gain_correct
- streamdetails.loudness = loudness
- # set streamdetails as attribute on the media_item
- # this way the app knows what content is playing
- queue_item.streamdetails = streamdetails
- return streamdetails
- return None
+ if not streamdetails:
+ raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
+
+ # set player_id on the streamdetails so we know what players stream
+ streamdetails.queue_id = queue_id
+ # get gain correct / replaygain
+ loudness, gain_correct = await get_gain_correct(
+ mass, queue_id, streamdetails.item_id, streamdetails.provider
+ )
+ streamdetails.gain_correct = gain_correct
+ streamdetails.loudness = loudness
+ # set streamdetails as attribute on the media_item
+ # this way the app knows what content is playing
+ queue_item.streamdetails = streamdetails
+ return streamdetails
async def get_gain_correct(
mass: MusicAssistant, queue_id: str, item_id: str, provider_id: str
) -> Tuple[float, float]:
"""Get gain correction for given queue / track combination."""
- queue = mass.players.get_player_queue(queue_id, True)
+ queue = mass.players.get_player_queue(queue_id)
if not queue or not queue.volume_normalization_enabled:
return 0
target_gain = queue.volume_normalization_target
try:
self._proc.stdin.write(data)
await self._proc.stdin.drain()
- except BrokenPipeError:
- pass
- except (AttributeError, AssertionError) as err:
+ except (AttributeError, AssertionError, BrokenPipeError) as err:
raise asyncio.CancelledError() from err
def write_eof(self) -> None:
class AudioError(MusicAssistantError):
"""Error raised when an issue arrised when processing audio."""
+
+
+class QueueEmpty(MusicAssistantError):
+ """Error raised when trying to start queue stream while queue is empty."""
"""Model for a music player."""
player_id: str
- is_group: bool = False
+ _attr_is_group: bool = False
+ _attr_group_childs: List[str] = []
_attr_name: str = None
_attr_powered: bool = False
_attr_elapsed_time: int = 0
_attr_volume_level: int = 100
_attr_device_info: DeviceInfo = DeviceInfo()
_attr_max_sample_rate: int = 96000
+ _attr_active_queue_id: str = ""
# mass object will be set by playermanager at register
mass: MusicAssistant = None # type: ignore[assignment]
"""Return player name."""
return self._attr_name or self.player_id
+ @property
+ def is_group(self) -> bool:
+ """Return bool if this player is a grouped player (playergroup)."""
+ return self._attr_is_group
+
+ @property
+ def group_childs(self) -> List[str]:
+ """Return list of child player id's of PlayerGroup (if player is group)."""
+ return self._attr_group_childs
+
@property
def powered(self) -> bool:
"""Return current power state of player."""
"""Return the maximum supported sample rate this player supports."""
return self._attr_max_sample_rate
+ @property
+ def active_queue(self) -> PlayerQueue:
+ """
+ Return the currently active queue for this player.
+
+ If the player is a group child this will return its parent when that is playing,
+ otherwise it will return the player's own queue.
+ """
+ return self.mass.players.get_player_queue(self._attr_active_queue_id)
+
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
raise NotImplementedError
# DO NOT OVERRIDE BELOW
- @property
- def queue(self) -> "PlayerQueue":
- """Return PlayerQueue for this player."""
- return self.mass.players.get_player_queue(self.player_id, True)
-
def update_state(self) -> None:
"""Update current player state in the player manager."""
+ # determine active queue for player
+ queue_id = self.player_id
+ for player_id in self.get_group_parents():
+ if player := self.mass.players.get_player(player_id):
+ if player.state in [PlayerState.PLAYING, PlayerState.PAUSED]:
+ queue_id = player_id
+ break
+ self._attr_active_queue_id = queue_id
# basic throttle: do not send state changed events if player did not change
prev_state = getattr(self, "_prev_state", None)
cur_state = self.to_dict()
return
setattr(self, "_prev_state", cur_state)
self.mass.signal_event(EventType.PLAYER_CHANGED, self)
- self.queue.on_player_update()
+ self.mass.players.get_player_queue(self.player_id).on_player_update()
if self.is_group:
# update group player childs when parent updates
for child_player_id in self.group_childs:
"elapsed_time": self.elapsed_time,
"state": self.state.value,
"available": self.available,
+ "is_group": self.is_group,
+ "group_childs": self.group_childs,
"volume_level": int(self.volume_level),
"device_info": self.device_info.to_dict(),
+ "active_queue": self.active_queue.queue_id,
}
class PlayerGroup(Player):
- """Model for a player group."""
+ """Convenience Model for a player group with some additional helper methods."""
is_group: bool = True
_attr_group_childs: List[str] = []
_attr_support_join_control: bool = True
- @property
- def support_join_control(self) -> bool:
- """Return bool if joining/unjoining of players to this group is supported."""
- return self._attr_support_join_control
-
- @property
- def group_childs(self) -> List[str]:
- """Return list of child player id's of this PlayerGroup."""
- return self._attr_group_childs
-
@property
def volume_level(self) -> int:
"""Return current volume level of player (scale 0..100)."""
if not self.available:
return 0
# calculate group volume from powered players for convenience
+ # may be overridden if implementation provides this natively
group_volume = 0
active_players = 0
for child_player in self._get_players(True):
group_volume = group_volume / active_players
return int(group_volume)
- async def power(self, powered: bool) -> None:
- """Send POWER command to player."""
- try:
- super().power(powered)
- except NotImplementedError:
- self._attr_powered = powered
- if not powered:
- # turn off all childs
- for child_player in self._get_players(True):
- await child_player.power(False)
-
async def volume_set(self, volume_level: int) -> None:
"""Send volume level (0..100) command to player."""
- # handle group volume
+ # handle group volume by only applying the valume to powered childs
+ # may be overridden if implementation provides this natively
cur_volume = self.volume_level
new_volume = volume_level
volume_dif = new_volume - cur_volume
)
await child_player.volume_set(new_child_volume)
- async def join(self, player_id: str) -> None:
- """Command to add/join a player to this group."""
- raise NotImplementedError
-
- async def unjoin(self, player_id: str) -> None:
- """Command to remove/unjoin a player to this group."""
- raise NotImplementedError
-
def _get_players(self, only_powered: bool = False) -> List[Player]:
"""Get players attached to this group."""
return [
from music_assistant.helpers.audio import get_stream_details
from music_assistant.helpers.typing import MusicAssistant
from music_assistant.helpers.util import create_task
+from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
from music_assistant.models.media_items import MediaType, StreamDetails
from .player import Player, PlayerState
self.mass = mass
self.logger = mass.players.logger
self.queue_id = player_id
- self.player_id = player_id
self._shuffle_enabled: bool = False
self._repeat_enabled: bool = False
self._current_index: Optional[int] = None
self._current_item_time: int = 0
self._last_item: Optional[QueueItem] = None
- self._start_index: int = 0
- self._next_index: int = 0
+ self._start_index: int = 0 # from which index did the queue start playing
+ self._next_start_index: int = 0 # which index should the stream start
self._last_state = PlayerState.IDLE
self._items: List[QueueItem] = []
self._save_task: TimerHandle = None
@property
def player(self) -> Player:
"""Return the player attached to this queue."""
- return self.mass.players.get_player(self.player_id, include_unavailable=True)
-
- @property
- def available(self) -> bool:
- """Return bool if this queue is available."""
- return self.player.available
+ return self.mass.players.get_player(self.queue_id, include_unavailable=True)
@property
def active(self) -> bool:
"""Return bool if the queue is currenty active on the player."""
- # TODO: figure out a way to handle group childs playing the parent queue
if self.player.current_url is None:
return False
return self._stream_url in self.player.current_url
"""Return all items in this queue."""
return self._items
- @property
- def current_index(self) -> Optional[int]:
- """
- Return the current index of the queue.
-
- Returns None if queue is empty.
- """
- if self._current_index >= len(self._items):
- return None
- return self._current_index
-
@property
def current_item(self) -> QueueItem | None:
"""
return None
return self._items[self._current_index]
- @property
- def next_index(self) -> Optional[int]:
- """
- Return the next index for this PlayerQueue.
-
- Return None if queue is empty or no more items.
- """
- if not self._items:
- # queue is empty
- return None
- if self._current_index is None:
- # playback just started
- return 0
- # player already playing (or paused) so return the next item
- if len(self._items) > (self._current_index + 1):
- return self._current_index + 1
- if self.repeat_enabled:
- # repeat enabled, start queue at beginning
- return 0
- return None
-
@property
def next_item(self) -> QueueItem | None:
"""
Returns None if queue is empty or no more items.
"""
- if self.next_index is not None:
- return self._items[self.next_index]
+ if next_index := self.get_next_index(self._current_index):
+ return self._items[next_index]
return None
@property
async def next(self) -> None:
"""Play the next track in the queue."""
- if self._current_index is None:
- return
- if self.next_index is None:
- return
- await self.play_index(self.next_index)
+ next_index = self.get_next_index(self._current_index)
+ if next_index is None:
+ return None
+ await self.play_index(next_index)
async def previous(self) -> None:
"""Play the previous track in the queue."""
if self._current_index is None:
return
- await self.play_index(self._current_index - 1)
+ await self.play_index(max(self._current_index - 1, 0))
async def resume(self) -> None:
"""Resume previous queue."""
if not len(self.items) > index:
return
self._current_index = index
+ self._next_start_index = index
# send stream url to player connected to this queue
self._stream_url = self.mass.players.streams.get_stream_url(self.queue_id)
return True
return False
- async def queue_stream_prepare(self) -> StreamDetails | None:
+ async def queue_stream_prepare(self) -> StreamDetails:
"""Call when queue_streamer is about to start playing."""
- if next_item := self.next_item:
+ start_from_index = self._next_start_index
+ try:
+ next_item = self._items[start_from_index]
+ except IndexError as err:
+ raise QueueEmpty() from err
+ try:
return await get_stream_details(self.mass, next_item, self.queue_id)
- return None
+ except MediaNotFoundError as err:
+ # something bad happened, try to recover by requesting the next track in the queue
+ await self.play_index(self._current_index + 2)
+ raise err
- async def queue_stream_start(self) -> None:
+ async def queue_stream_start(self) -> int:
"""Call when queue_streamer starts playing the queue stream."""
+ start_from_index = self._next_start_index
self._current_item_time = 0
- self._current_index = self.next_index
- self._start_index = self._current_index
- return self._current_index
+ self._current_index = start_from_index
+ self._start_index = start_from_index
+ self._next_start_index = self.get_next_index(start_from_index)
+ return start_from_index
- async def queue_stream_next(self, cur_index: int) -> None:
+ async def queue_stream_next(self, cur_index: int) -> int | None:
"""Call when queue_streamer loads next track in buffer."""
- next_index = 0
- if len(self.items) > (next_index):
- next_index = cur_index + 1
- elif self._repeat_enabled:
+ next_idx = self._next_start_index
+ self._next_start_index = self.get_next_index(self._next_start_index)
+ return next_idx
+
+ def get_next_index(self, index: int) -> int | None:
+ """Return the next index or None if no more items."""
+ if not self._items:
+ # queue is empty
+ return None
+ if index is None:
+ # guard just in case
+ return 0
+ if len(self._items) > (index + 1):
+ return index + 1
+ if self.repeat_enabled:
# repeat enabled, start queue at beginning
- next_index = 0
- self._next_index = next_index + 1
- return next_index
+ return 0
+ return None
async def queue_stream_signal_next(self):
- """Indicate that queue stream needs to start nex index once playback finished."""
+ """Indicate that queue stream needs to start next index once playback finished."""
self._signal_next = True
async def __update_task(self) -> None:
import datetime
import hashlib
import time
+from json import JSONDecodeError
from typing import List, Optional
+import aiohttp
from asyncio_throttle import Throttler
from music_assistant.constants import EventType
from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module
async with self.mass.http_session.get(
url, headers=headers, params=params, verify_ssl=False
) as response:
- result = await response.json()
- if "error" in result or (
- "status" in result and "error" in result["status"]
- ):
- self.logger.error("%s - %s", endpoint, result)
+ try:
+ result = await response.json()
+ if "error" in result or (
+ "status" in result and "error" in result["status"]
+ ):
+ self.logger.error("%s - %s", endpoint, result)
+ return None
+ except (
+ aiohttp.ContentTypeError,
+ JSONDecodeError,
+ ) as err:
+ self.logger.error("%s - %s", endpoint, str(err))
return None
return result
from json.decoder import JSONDecodeError
from typing import List, Optional
+import aiohttp
from asyncio_throttle import Throttler
from music_assistant.helpers.app_vars import ( # noqa # pylint: disable=no-name-in-module
get_app_var,
async with self.mass.http_session.get(
url, headers=headers, params=params, verify_ssl=False
) as response:
- result = await response.json()
- if not result or "error" in result:
- self.logger.error("%s - %s", endpoint, result)
- result = None
+ try:
+ result = await response.json()
+ if "error" in result or (
+ "status" in result and "error" in result["status"]
+ ):
+ self.logger.error("%s - %s", endpoint, result)
+ return None
+ except (
+ aiohttp.ContentTypeError,
+ JSONDecodeError,
+ ) as err:
+ self.logger.error("%s - %s", endpoint, str(err))
+ return None
return result
async def _delete_data(self, endpoint, params=None, data=None):