from collections.abc import Iterable
from typing import TYPE_CHECKING, Any
-from music_assistant_models.enums import AlbumType, CacheCategory, MediaType, ProviderFeature
-from music_assistant_models.errors import (
- InvalidDataError,
- MediaNotFoundError,
- UnsupportedFeaturedException,
+from music_assistant_models.enums import (
+ AlbumType,
+ CacheCategory,
+ MediaType,
+ ProviderFeature,
+)
+from music_assistant_models.errors import InvalidDataError, MediaNotFoundError
+from music_assistant_models.media_items import (
+ Album,
+ Artist,
+ ItemMapping,
+ Track,
+ UniqueList,
)
-from music_assistant_models.media_items import Album, Artist, ItemMapping, Track, UniqueList
-from music_assistant.constants import DB_TABLE_ALBUM_ARTISTS, DB_TABLE_ALBUM_TRACKS, DB_TABLE_ALBUMS
+from music_assistant.constants import (
+ DB_TABLE_ALBUM_ARTISTS,
+ DB_TABLE_ALBUM_TRACKS,
+ DB_TABLE_ALBUMS,
+)
from music_assistant.controllers.media.base import MediaControllerBase
from music_assistant.helpers.compare import (
compare_album,
)
return items
- async def _get_provider_dynamic_base_tracks(
+ async def radio_mode_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
):
"""Get the list of base tracks from the controller used to calculate the dynamic radio."""
- assert provider_instance_id_or_domain != "library"
- return await self._get_provider_album_tracks(item_id, provider_instance_id_or_domain)
-
- async def _get_dynamic_tracks(
- self,
- media_item: Album,
- limit: int = 25,
- ) -> list[Track]:
- """Get dynamic list of tracks for given item, fallback/default implementation."""
- # TODO: query metadata provider(s) to get similar tracks (or tracks from similar artists)
- msg = "No Music Provider found that supports requesting similar tracks."
- raise UnsupportedFeaturedException(msg)
+ return await self.tracks(item_id, provider_instance_id_or_domain, in_library_only=False)
async def _set_album_artists(
- self, db_id: int, artists: Iterable[Artist | ItemMapping], overwrite: bool = False
+ self,
+ db_id: int,
+ artists: Iterable[Artist | ItemMapping],
+ overwrite: bool = False,
) -> None:
"""Store Album Artists."""
if overwrite:
import contextlib
from typing import TYPE_CHECKING, Any
-from music_assistant_models.enums import AlbumType, CacheCategory, MediaType, ProviderFeature
-from music_assistant_models.errors import (
- MediaNotFoundError,
- ProviderUnavailableError,
- UnsupportedFeaturedException,
+from music_assistant_models.enums import (
+ AlbumType,
+ CacheCategory,
+ MediaType,
+ ProviderFeature,
+)
+from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError
+from music_assistant_models.media_items import (
+ Album,
+ Artist,
+ ItemMapping,
+ Track,
+ UniqueList,
)
-from music_assistant_models.media_items import Album, Artist, ItemMapping, Track, UniqueList
from music_assistant.constants import (
DB_TABLE_ALBUM_ARTISTS,
await self._set_provider_mappings(db_id, provider_mappings, overwrite)
self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
- async def _get_provider_dynamic_base_tracks(
+ async def radio_mode_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
):
"""Get the list of base tracks from the controller used to calculate the dynamic radio."""
- assert provider_instance_id_or_domain != "library"
- return await self.get_provider_artist_toptracks(
+ return await self.tracks(
item_id,
provider_instance_id_or_domain,
+ in_library_only=False,
)
- async def _get_dynamic_tracks(
- self,
- media_item: Artist,
- limit: int = 25,
- ) -> list[Track]:
- """Get dynamic list of tracks for given item, fallback/default implementation."""
- # TODO: query metadata provider(s) to get similar tracks (or tracks from similar artists)
- msg = "No Music Provider found that supports requesting similar tracks."
- raise UnsupportedFeaturedException(msg)
-
async def match_providers(self, db_artist: Artist) -> None:
"""Try to find matching artists on all providers for the provided (database) item_id.
await asyncio.gather(*[set_resume_position(chapter) for chapter in items])
return items
- async def _get_provider_dynamic_base_tracks(
+ async def radio_mode_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
msg = "Dynamic tracks not supported for Radio MediaItem"
raise NotImplementedError(msg)
- async def _get_dynamic_tracks(self, media_item: Audiobook, limit: int = 25) -> list[Track]:
- """Get dynamic list of tracks for given item, fallback/default implementation."""
- msg = "Dynamic tracks not supported for Audiobook MediaItem"
- raise NotImplementedError(msg)
-
async def match_providers(self, db_audiobook: Audiobook) -> None:
"""Try to find match on all (streaming) providers for the provided (database) audiobook.
Track,
)
-from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PROVIDER_MAPPINGS, MASS_LOGGER_NAME
+from music_assistant.constants import (
+ DB_TABLE_PLAYLOG,
+ DB_TABLE_PROVIDER_MAPPINGS,
+ MASS_LOGGER_NAME,
+)
from music_assistant.helpers.compare import compare_media_item
from music_assistant.helpers.json import json_loads, serialize_to_json
else:
external_id_str = f'%"{external_id}"%'
for item in await self._get_library_items_by_query(
- extra_query_parts=[query], extra_query_params={"external_id_str": external_id_str}
+ extra_query_parts=[query],
+ extra_query_params={"external_id_str": external_id_str},
):
return item
return None
subquery = f"SELECT item_id FROM provider_mappings WHERE {' AND '.join(subquery_parts)}"
query = f"WHERE {self.db_table}.item_id IN ({subquery})"
return await self._get_library_items_by_query(
- limit=limit, offset=offset, extra_query_parts=[query], extra_query_params=query_params
+ limit=limit,
+ offset=offset,
+ extra_query_parts=[query],
+ extra_query_params=query_params,
)
async def iter_library_items_by_prov_id(
with suppress(MediaNotFoundError):
if item := await provider.get_item(self.media_type, item_id):
await self.mass.cache.set(
- cache_key, item.to_dict(), category=cache_category, base_key=cache_base_key
+ cache_key,
+ item.to_dict(),
+ category=cache_category,
+ base_key=cache_base_key,
)
return item
# if we reach this point all possibilities failed and the item could not be found.
with suppress(AssertionError):
await self.remove_item_from_library(db_id)
- async def dynamic_base_tracks(
- self,
- item_id: str,
- provider_instance_id_or_domain: str,
- ) -> list[Track]:
- """Return a list of base tracks to calculate a list of dynamic tracks."""
- ref_item = await self.get(item_id, provider_instance_id_or_domain)
- for prov_mapping in ref_item.provider_mappings:
- prov = self.mass.get_provider(prov_mapping.provider_instance)
- if prov is None:
- continue
- if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
- continue
- return await self._get_provider_dynamic_base_tracks(
- prov_mapping.item_id,
- prov_mapping.provider_instance,
- )
- # Fallback to the default implementation
- return await self._get_dynamic_tracks(ref_item)
-
@abstractmethod
async def _add_library_item(
self,
"""
@abstractmethod
- async def _get_provider_dynamic_base_tracks(
+ async def radio_mode_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
) -> list[Track]:
"""Get the list of base tracks from the controller used to calculate the dynamic radio."""
- @abstractmethod
- async def _get_dynamic_tracks(self, media_item: ItemCls, limit: int = 25) -> list[Track]:
- """Get dynamic list of tracks for given item, fallback/default implementation."""
-
async def _get_library_items_by_query(
self,
favorite: bool | None = None,
from __future__ import annotations
-import random
import time
from collections.abc import AsyncGenerator
from typing import Any
-from music_assistant_models.enums import CacheCategory, MediaType, ProviderFeature, ProviderType
+from music_assistant_models.enums import CacheCategory, MediaType, ProviderFeature
from music_assistant_models.errors import (
InvalidDataError,
MediaNotFoundError,
ProviderUnavailableError,
- UnsupportedFeaturedException,
)
from music_assistant_models.media_items import Playlist, Track
# add the new playlist to the library
return await self.add_item_to_library(playlist, False)
- async def add_playlist_tracks(self, db_playlist_id: str | int, uris: list[str]) -> None: # noqa: PLR0915
+ async def add_playlist_tracks(self, db_playlist_id: str | int, uris: list[str]) -> None:
"""Add tracks to playlist."""
+ # ruff: noqa: PLR0915
db_id = int(db_playlist_id) # ensure integer
playlist = await self.get_library_item(db_id)
if not playlist:
# skip if item already in the playlist
if uri in cur_playlist_track_uris:
self.logger.info(
- "Not adding %s to playlist %s - it already exists", uri, playlist.name
+ "Not adding %s to playlist %s - it already exists",
+ uri,
+ playlist.name,
)
continue
# skip if item already in the playlist
if item_id in cur_playlist_track_ids:
self.logger.warning(
- "Not adding %s to playlist %s - it already exists", uri, playlist.name
+ "Not adding %s to playlist %s - it already exists",
+ uri,
+ playlist.name,
)
continue
)
return items
- async def _get_provider_dynamic_base_tracks(
+ async def radio_mode_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
):
"""Get the list of base tracks from the controller used to calculate the dynamic radio."""
- assert provider_instance_id_or_domain != "library"
- playlist = await self.get(item_id, provider_instance_id_or_domain)
return [
x
- async for x in self.tracks(playlist.item_id, playlist.provider)
+ async for x in self.tracks(item_id, provider_instance_id_or_domain)
# filter out unavailable tracks
if x.available
]
-
- async def _get_dynamic_tracks(
- self,
- media_item: Playlist,
- limit: int = 25,
- ) -> list[Track]:
- """Get dynamic list of tracks for given item, fallback/default implementation."""
- # check if we have any provider that supports dynamic tracks
- # TODO: query metadata provider(s) (such as lastfm?)
- # to get similar tracks (or tracks from similar artists)
- for prov in self.mass.get_providers(ProviderType.MUSIC):
- if ProviderFeature.SIMILAR_TRACKS in prov.supported_features:
- break
- else:
- msg = "No Music Provider found that supports requesting similar tracks."
- raise UnsupportedFeaturedException(msg)
-
- radio_items: list[Track] = []
- radio_item_titles: set[str] = set()
- playlist_tracks = [x async for x in self.tracks(media_item.item_id, media_item.provider)]
- random.shuffle(playlist_tracks)
- for playlist_track in playlist_tracks:
- # prefer library item if available so we can use all providers
- if playlist_track.provider != "library" and (
- db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
- playlist_track.item_id, playlist_track.provider
- )
- ):
- playlist_track = db_item # noqa: PLW2901
-
- if not playlist_track.available:
- continue
- # include base item in the list
- radio_items.append(playlist_track)
- radio_item_titles.add(playlist_track.name)
- # now try to find similar tracks
- for item_prov_mapping in playlist_track.provider_mappings:
- if not (prov := self.mass.get_provider(item_prov_mapping.provider_instance)):
- continue
- if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
- continue
- # fetch some similar tracks on this provider
- for similar_track in await prov.get_similar_tracks(
- prov_track_id=item_prov_mapping.item_id, limit=5
- ):
- if similar_track.name not in radio_item_titles:
- radio_items.append(similar_track)
- radio_item_titles.add(similar_track.name)
- continue
- if len(radio_items) >= limit:
- break
- # Shuffle the final items list
- random.shuffle(radio_items)
- return radio_items
await asyncio.gather(*[set_resume_position(chapter) for chapter in items])
return items
- async def _get_provider_dynamic_base_tracks(
+ async def radio_mode_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
msg = "Dynamic tracks not supported for Podcast MediaItem"
raise NotImplementedError(msg)
- async def _get_dynamic_tracks(self, media_item: Podcast, limit: int = 25) -> list[Track]:
- """Get dynamic list of tracks for given item, fallback/default implementation."""
- msg = "Dynamic tracks not supported for Podcast MediaItem"
- raise NotImplementedError(msg)
-
async def match_providers(self, db_podcast: Podcast) -> None:
"""Try to find match on all (streaming) providers for the provided (database) podcast.
await self._set_provider_mappings(db_id, provider_mappings, overwrite)
self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
- async def _get_provider_dynamic_base_tracks(
+ async def radio_mode_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
"""Get the list of base tracks from the controller used to calculate the dynamic radio."""
msg = "Dynamic tracks not supported for Radio MediaItem"
raise NotImplementedError(msg)
-
- async def _get_dynamic_tracks(self, media_item: Radio, limit: int = 25) -> list[Track]:
- """Get dynamic list of tracks for given item, fallback/default implementation."""
- msg = "Dynamic tracks not supported for Radio MediaItem"
- raise NotImplementedError(msg)
from contextlib import suppress
from typing import Any
-from music_assistant_models.enums import MediaType, ProviderFeature
+from music_assistant_models.enums import MediaType, ProviderFeature, ProviderType
from music_assistant_models.errors import (
InvalidDataError,
MediaNotFoundError,
self.mass.register_api_command(f"music/{api_base}/track_versions", self.versions)
self.mass.register_api_command(f"music/{api_base}/track_albums", self.albums)
self.mass.register_api_command(f"music/{api_base}/preview", self.get_preview_url)
+ self.mass.register_api_command(f"music/{api_base}/similar_tracks", self.similar_tracks)
async def get(
self,
result.append(prov_item.album)
return result
+ async def similar_tracks(
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ limit: int = 25,
+ allow_lookup: bool = False,
+ ):
+ """Get a list of similar tracks for the given track."""
+ ref_item = await self.get(item_id, provider_instance_id_or_domain)
+ for prov_mapping in ref_item.provider_mappings:
+ prov = self.mass.get_provider(prov_mapping.provider_instance)
+ if prov is None:
+ continue
+ if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
+ continue
+ # Grab similar tracks from the music provider
+ return await prov.get_similar_tracks(prov_track_id=prov_mapping.item_id, limit=limit)
+ if not allow_lookup:
+ return []
+
+ # check if we have any provider that supports dynamic tracks
+ # TODO: query metadata provider(s) (such as lastfm?)
+ # to get similar tracks (or tracks from similar artists)
+ for prov in self.mass.get_providers(ProviderType.MUSIC):
+ if ProviderFeature.SIMILAR_TRACKS in prov.supported_features:
+ break
+ else:
+ msg = "No Music Provider found that supports requesting similar tracks."
+ raise UnsupportedFeaturedException(msg)
+
+ if ref_item.provider == "library":
+ await self.mass.metadata.update_metadata(ref_item)
+ else:
+ await self.match_providers(ref_item)
+
+ return []
+
async def remove_item_from_library(self, item_id: str | int) -> None:
"""Delete record from the database."""
db_id = int(item_id) # ensure integer
query = f"{DB_TABLE_ALBUMS}.item_id in ({subquery})"
return await self.mass.music.albums._get_library_items_by_query(extra_query_parts=[query])
- async def match_providers(self, db_track: Track) -> None:
+ async def match_providers(self, ref_track: Track) -> None:
"""Try to find matching track on all providers for the provided (database) track_id.
This is used to link objects of different providers/qualities together.
"""
- if db_track.provider != "library":
- return # Matching only supported for database items
- track_albums = await self.albums(db_track.item_id, db_track.provider)
+ track_albums = await self.albums(ref_track.item_id, ref_track.provider)
for provider in self.mass.music.providers:
if ProviderFeature.SEARCH not in provider.supported_features:
continue
if not provider.library_supported(MediaType.TRACK):
continue
provider_matches = await self.match_provider(
- provider, db_track, strict=True, ref_albums=track_albums
+ provider, ref_track, strict=True, ref_albums=track_albums
)
for provider_mapping in provider_matches:
# 100% match, we update the db with the additional provider mapping(s)
- await self.add_provider_mapping(db_track.item_id, provider_mapping)
- db_track.provider_mappings.add(provider_mapping)
+ await self.add_provider_mapping(ref_track.item_id, provider_mapping)
+ ref_track.provider_mappings.add(provider_mapping)
async def match_provider(
self,
)
return matches
- async def get_provider_similar_tracks(
- self, item_id: str, provider_instance_id_or_domain: str, limit: int = 25
- ):
- """Get a list of similar tracks from the provider, based on the track."""
- ref_item = await self.get(item_id, provider_instance_id_or_domain)
- for prov_mapping in ref_item.provider_mappings:
- prov = self.mass.get_provider(prov_mapping.provider_instance)
- if prov is None:
- continue
- if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
- continue
- # Grab similar tracks from the music provider
- return await prov.get_similar_tracks(prov_track_id=prov_mapping.item_id, limit=limit)
- return []
-
- async def _get_provider_dynamic_base_tracks(
+ async def radio_mode_base_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
):
"""Get the list of base tracks from the controller used to calculate the dynamic radio."""
- assert provider_instance_id_or_domain != "library"
return [await self.get(item_id, provider_instance_id_or_domain)]
- async def _get_dynamic_tracks(
- self,
- media_item: Track,
- limit: int = 25,
- ) -> list[Track]:
- """Get dynamic list of tracks for given item, fallback/default implementation."""
- # TODO: query metadata provider(s) to get similar tracks (or tracks from similar artists)
- msg = "No Music Provider found that supports requesting similar tracks."
- raise UnsupportedFeaturedException(msg)
-
async def _add_library_item(self, item: Track) -> int:
"""Add a new item record to the database."""
if not isinstance(item, Track):
)
async def _set_track_artists(
- self, db_id: int, artists: Iterable[Artist | ItemMapping], overwrite: bool = False
+ self,
+ db_id: int,
+ artists: Iterable[Artist | ItemMapping],
+ overwrite: bool = False,
) -> None:
"""Store Track Artists."""
if overwrite:
from types import NoneType
from typing import TYPE_CHECKING, Any, TypedDict
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+)
from music_assistant_models.enums import (
CacheCategory,
ConfigEntryType,
UnsupportedFeaturedException,
)
from music_assistant_models.media_items import (
- AudioFormat,
Chapter,
Episode,
MediaItemType,
from music_assistant_models.player import PlayerMedia
from music_assistant_models.player_queue import PlayerQueue
from music_assistant_models.queue_item import QueueItem
-from music_assistant_models.streamdetails import StreamDetails
from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, MASS_LOGO_ONLINE
from music_assistant.helpers.api import api_command
queue_id: str
state: PlayerState
- current_index: int | None
+ current_item_id: str | None
+ next_item_id: str | None
elapsed_time: int
stream_title: str | None
content_type: str | None
self._prev_states: dict[str, CompareState] = {}
self.manifest.name = "Player Queues controller"
self.manifest.description = (
- "Music Assistant's core controller which manages the queues for all players."
+ "Music Assistant's core controller " "which manages the queues for all players."
)
self.manifest.icon = "playlist-music"
break
except MediaNotFoundError:
self.logger.warning(
- "Failed to fetch next track for queue %s - trying next item", queue.display_name
+ "Failed to fetch next track for queue %s - trying next item",
+ queue.display_name,
)
idx += 1
queue.index_in_buffer = index
queue.flow_mode_stream_log = []
queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
- next_index = self._get_next_index(queue_id, index, allow_repeat=False)
queue.current_item = queue_item
queue.next_track_enqueued = None
- self.signal_update(queue_id)
# handle resume point of audiobook(chapter) or podcast(episode)
if not seek_position and (
):
seek_position = max(0, int((resume_position_ms - 500) / 1000))
- # work out if we are playing an album and if we should prefer album loudness
- if (
- next_index is not None
- and (next_item := self.get_item(queue_id, next_index))
- and (
- queue_item.media_item
- and hasattr(queue_item.media_item, "album")
- and hasattr(next_item.media_item, "album")
- and queue_item.media_item.album
- and next_item.media_item
- and next_item.media_item.album
- and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
- )
- ):
- prefer_album_loudness = True
- else:
- prefer_album_loudness = False
-
- # get streamdetails - do this here to catch unavailable items early
- queue_item.streamdetails = await get_stream_details(
- self.mass,
+ # load item (which also fetches the streamdetails)
+ # do this here to catch unavailable items early
+ next_index = self._get_next_index(queue_id, index, allow_repeat=False)
+ await self._load_item(
queue_item,
+ next_index,
+ is_start=True,
seek_position=seek_position,
fade_in=fade_in,
- prefer_album_loudness=prefer_album_loudness,
)
- # allow stripping silence from the end of the track if crossfade is enabled
- # this will allow for smoother crossfades
- if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
- queue_item.streamdetails.strip_silence_end = True
# send play_media request to player
# NOTE that we debounce this a bit to account for someone hitting the next button
# like a madman. This will prevent the player from being overloaded with requests.
target_queue.shuffle_enabled = source_queue.shuffle_enabled
target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
target_queue.radio_source = source_queue.radio_source
+ target_queue.enqueued_media_items = source_queue.enqueued_media_items
target_queue.resume_pos = source_queue.elapsed_time
target_queue.current_index = source_queue.current_index
if source_queue.current_item:
queue.available = player.available
queue.items = len(self._queue_items[queue_id])
# determine if this queue is currently active for this player
- queue.active = player.powered and player.active_source == queue.queue_id
- if not queue.active:
- # return early if the queue is not active
+ queue.active = player.active_source == queue.queue_id
+ if not queue.active and queue_id not in self._prev_states:
queue.state = PlayerState.IDLE
- if prev_state := self._prev_states.pop(queue_id, None):
- self.signal_update(queue_id)
+ # return early if the queue is not active and we have no previous state
return
+
# update current item from player report
- if queue.flow_mode:
- # flow mode active, calculate current item
- queue.current_index, queue.elapsed_time = self._get_flow_queue_stream_index(
- queue, player
- )
+ if player.state == PlayerState.PLAYING:
+ if queue.flow_mode:
+ # flow mode active, the player is playing one long stream
+ # so we need to calculate the current index and elapsed time
+ queue.current_index, queue.elapsed_time = self._get_flow_queue_stream_index(
+ queue, player
+ )
+ else:
+ # normal mode, the player itself will report the current item
+ queue.elapsed_time = int(player.corrected_elapsed_time or 0)
+ if item_id := self._parse_player_current_item_id(queue_id, player):
+ queue.current_index = self.index_by_id(queue_id, item_id)
+ # generic attributes we update when player is playing
+ queue.state = PlayerState.PLAYING
queue.elapsed_time_last_updated = time.time()
else:
- # queue is active and player has one of our tracks loaded, update state
- if item_id := self._parse_player_current_item_id(queue_id, player):
- queue.current_index = self.index_by_id(queue_id, item_id)
- if player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
- queue.elapsed_time = int(player.corrected_elapsed_time or 0)
- queue.elapsed_time_last_updated = player.elapsed_time_last_updated or 0
+ queue.state = player.state or PlayerState.IDLE
- # only update these attributes if the queue is active
- # and has an item loaded so we are able to resume it
- queue.state = player.state or PlayerState.IDLE
+ # set current item and next item from the current index
queue.current_item = self.get_item(queue_id, queue.current_index)
- queue.next_item = (
- self.get_item(queue_id, queue.next_track_enqueued)
- if queue.next_track_enqueued
- else self._get_next_item(queue_id, queue.current_index)
- )
+ queue.next_item = self._get_next_item(queue_id, queue.current_index)
# correct elapsed time when seeking
if (
- queue.current_item
+ player.state == PlayerState.PLAYING
+ and not queue.flow_mode
+ and queue.current_item
and queue.current_item.streamdetails
and queue.current_item.streamdetails.seek_position
- and player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
- and not queue.flow_mode
):
queue.elapsed_time += queue.current_item.streamdetails.seek_position
- # enqueue next track if needed
- if (
- queue.state == PlayerState.PLAYING
- and queue.next_item is not None
- and not queue.next_track_enqueued
- and queue.corrected_elapsed_time > 2
- ):
- self._check_enqueue_next(queue)
-
- # basic throttle: do not send state changed events if queue did not actually change
- prev_state = self._prev_states.get(
+ prev_state: CompareState = self._prev_states.get(
queue_id,
CompareState(
queue_id=queue_id,
state=PlayerState.IDLE,
- current_index=None,
+ current_item_id=None,
+ next_item_id=None,
elapsed_time=0,
stream_title=None,
),
)
+
+ # enqueue/preload next track if needed
+ next_item_id = queue.next_item.queue_item_id if queue.next_item else None
+ prev_next_item_id = prev_state["next_item_id"] if prev_state else None
+ if queue.state == PlayerState.PLAYING and (
+ next_item_id != prev_next_item_id or queue.next_track_enqueued is None
+ ):
+ self._preload_next_item(queue)
+
+ # basic throttle: do not send state changed events if queue did not actually change
new_state = CompareState(
queue_id=queue_id,
state=queue.state,
- current_index=queue.current_index,
+ current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
+ next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
elapsed_time=queue.elapsed_time,
stream_title=queue.current_item.streamdetails.stream_title
if queue.current_item and queue.current_item.streamdetails
if len(changed_keys) == 0:
return
- # do not send full updates if only time was updated
+ # signal update and store state
if changed_keys == {"elapsed_time"}:
+ # do not send full updates if only time was updated
self.mass.signal_event(
EventType.QUEUE_TIME_UPDATED,
object_id=queue_id,
data=queue.elapsed_time,
)
+ else:
+ self.signal_update(queue_id)
+ if queue.active:
self._prev_states[queue_id] = new_state
- return
-
- # signal update and store state
- self.signal_update(queue_id)
- self._prev_states[queue_id] = new_state
+ else:
+ self._prev_states.pop(queue_id, None)
# detect change in current index to report that a item has been played
end_of_queue_reached = (
and queue.current_item is not None
and queue.next_item is None
)
+ prev_item_id = prev_state["current_item_id"]
if (
- prev_state["current_index"] is not None
- and (prev_state["current_index"] != new_state["current_index"] or end_of_queue_reached)
- and (queue_item := self.get_item(queue_id, prev_state["current_index"]))
- and (stream_details := queue_item.streamdetails)
+ prev_item_id is not None
+ and (prev_item_id != new_state["current_item_id"] or end_of_queue_reached)
+ and (prev_item := self.get_item(queue_id, prev_item_id))
+ and (stream_details := prev_item.streamdetails)
):
- seconds_played = prev_state["elapsed_time"]
+ seconds_played = int(prev_state["elapsed_time"])
fully_played = seconds_played >= (stream_details.duration or 3600) - 5
+ self.logger.debug(
+ "PlayerQueue %s played item %s for %s seconds",
+ queue.display_name,
+ prev_item.uri,
+ seconds_played,
+ )
if music_prov := self.mass.get_provider(stream_details.provider):
- if fully_played or (seconds_played > 10):
- self.mass.create_task(music_prov.on_streamed(stream_details, seconds_played))
+ self.mass.create_task(
+ music_prov.on_streamed(stream_details, seconds_played, fully_played)
+ )
+ if prev_item.media_item and (fully_played or seconds_played > 2):
+ # add entry to playlog - this also handles resume of podcasts/audiobooks
self.mass.create_task(
self.mass.music.mark_item_played(
stream_details.media_type,
seconds_played=seconds_played,
)
)
- if queue_item.media_item and (fully_played or seconds_played > 10):
# signal 'media item played' event,
# which is useful for plugins that want to do scrobbling
self.mass.signal_event(
EventType.MEDIA_ITEM_PLAYED,
- object_id=queue_item.media_item.uri,
- data=round(seconds_played, 2),
+ object_id=prev_item.media_item.uri,
+ data={
+ "media_item": prev_item.media_item.uri,
+ "seconds_played": seconds_played,
+ "fully_played": fully_played,
+ },
)
if end_of_queue_reached:
# end of queue reached, clear items
+ self.logger.debug(
+ "PlayerQueue %s reached end of queue...",
+ queue.display_name,
+ )
self.mass.call_later(
5, self._check_clear_queue, queue, task_id=f"clear_queue_{queue_id}"
)
- # clear 'next track enqueued' flag if new track is loaded
- if prev_state["current_index"] != new_state["current_index"]:
- queue.next_track_enqueued = None
-
# watch dynamic radio items refill if needed
- if "current_index" in changed_keys:
+ if "current_item_id" in changed_keys:
+ # auto enable radio mode if dont stop the music is enabled
if (
queue.dont_stop_the_music_enabled
and queue.enqueued_media_items
# set the played media item(s) as radio items (which will refill the queue)
# note that this will fail if there are no media items for which we have
# a dynamic radio source.
+ self.logger.debug(
+ "End of queue detected and Don't stop the music is enabled for %s"
+ " - setting enqueued media items as radio source: %s",
+ queue.display_name,
+ ", ".join([x.uri for x in queue.enqueued_media_items]),
+ )
queue.radio_source = queue.enqueued_media_items
+ # auto fill radio tracks if less than 5 tracks left in the queue
if (
queue.radio_source
and queue.current_index is not None
async def load_next_item(
self,
queue_id: str,
- current_item_id_or_index: str | int | None = None,
+ current_item_id: str,
) -> QueueItem:
- """Call when a player wants to (pre)load the next item into the buffer.
+ """
+ Call when a player wants to (pre)load the next item into the buffer.
Raises QueueEmpty if there are no more tracks left.
"""
if not queue:
msg = f"PlayerQueue {queue_id} is not available"
raise PlayerUnavailableError(msg)
- if current_item_id_or_index is None:
- cur_index = queue.index_in_buffer or queue.current_index or 0
- elif isinstance(current_item_id_or_index, str):
- cur_index = self.index_by_id(queue_id, current_item_id_or_index)
- else:
- cur_index = current_item_id_or_index
+ cur_index = self.index_by_id(queue_id, current_item_id)
idx = 0
while True:
next_item: QueueItem | None = None
queue_item = self.get_item(queue_id, next_index)
if queue_item is None:
raise QueueEmpty("No more tracks left in the queue.")
-
- # work out if we are playing an album and if we should prefer album loudness
- if (
- next_index is not None
- and (next_item := self.get_item(queue_id, next_index))
- and (
- queue_item.media_item
- and hasattr(queue_item.media_item, "album")
- and queue_item.media_item.album
- and next_item.media_item
- and hasattr(next_item.media_item, "album")
- and next_item.media_item.album
- and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
- )
- ):
- prefer_album_loudness = True
- else:
- prefer_album_loudness = False
-
try:
- # Check if the QueueItem is playable. For example, YT Music returns Radio Items
- # that are not playable which will stop playback.
- queue_item.streamdetails = await get_stream_details(
- mass=self.mass,
- queue_item=queue_item,
- prefer_album_loudness=prefer_album_loudness,
- )
- # Ensure we have at least an image for the queue item,
- # so grab full item if needed. Note that for YTM this is always needed
- # because it has poor thumbs by default (..sigh)
- if queue_item.media_item and (
- not queue_item.media_item.image
- or queue_item.media_item.provider.startswith("ytmusic")
- ):
- queue_item.media_item = await self.mass.music.get_item_by_uri(queue_item.uri)
- # allow stripping silence from the begin/end of the track if crossfade is enabled
- # this will allow for (much) smoother crossfades
- if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
- queue_item.streamdetails.strip_silence_end = True
- queue_item.streamdetails.strip_silence_begin = True
+ await self._load_item(queue_item, next_index)
# we're all set, this is our next item
next_item = queue_item
break
except MediaNotFoundError:
# No stream details found, skip this QueueItem
self.logger.debug("Skipping unplayable item: %s", next_item)
- queue_item.streamdetails = StreamDetails(
- provider=queue_item.media_item.provider if queue_item.media_item else "unknown",
- item_id=queue_item.media_item.item_id if queue_item.media_item else "unknown",
- audio_format=AudioFormat(),
- media_type=queue_item.media_type,
- seconds_streamed=0,
- )
+ if queue_item.media_item:
+ queue_item.media_item.available = False
idx += 1
if next_item is None:
raise QueueEmpty("No more (playable) tracks left in the queue.")
+
return next_item
+ async def _load_item(
+ self,
+ queue_item: QueueItem,
+ next_index: int | None,
+ is_start: bool = False,
+ seek_position: int = 0,
+ fade_in: bool = False,
+ ) -> None:
+ """Try to load the stream details for the given queue item."""
+ queue_id = queue_item.queue_id
+
+ # we use a contextvar to bypass the throttler for this asyncio task/context
+ # this makes sure that playback has priority over other requests that may be
+ # happening in the background
+ BYPASS_THROTTLER.set(True)
+
+ # work out if we are playing an album and if we should prefer album loudness
+ prefer_album_loudness = (
+ next_index is not None
+ and (next_item := self.get_item(queue_id, next_index))
+ and (
+ queue_item.media_item
+ and hasattr(queue_item.media_item, "album")
+ and queue_item.media_item.album
+ and next_item.media_item
+ and hasattr(next_item.media_item, "album")
+ and next_item.media_item.album
+ and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
+ )
+ )
+ if queue_item.media_item:
+ # prefer the full library media item so we have all metadata and provider(quality) info
+ # always request the full library item as there might be other qualities available
+ if library_item := await self.mass.music.get_library_item_by_prov_id(
+ queue_item.media_item.media_type,
+ queue_item.media_item.item_id,
+ queue_item.media_item.provider,
+ ):
+ queue_item.media_item = library_item
+ elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
+ "ytmusic"
+ ):
+ # Youtube Music has poor thumbs by default, so we always fetch the full item
+ # this also catches the case where they have an unavailable item in a listing
+ queue_item.media_item = await self.mass.music.get_item_by_uri(queue_item.uri)
+ # Fetch the streamdetails, which could raise in case of an unplayable item.
+ # For example, YT Music returns Radio Items that are not playable.
+ queue_item.streamdetails = await get_stream_details(
+ mass=self.mass,
+ queue_item=queue_item,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ prefer_album_loudness=prefer_album_loudness,
+ )
+ # allow stripping silence from the begin/end of the track if crossfade is enabled
+ # this will allow for (much) smoother crossfades
+ if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
+ queue_item.streamdetails.strip_silence_end = True
+ queue_item.streamdetails.strip_silence_begin = not is_start
+
def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
"""Call when a player has (started) loading a track in the buffer."""
queue = self.get(queue_id)
# store the index of the item that is currently (being) loaded in the buffer
# which helps us a bit to determine how far the player has buffered ahead
queue.index_in_buffer = self.index_by_id(queue_id, item_id)
- if queue.flow_mode:
- return # nothing to do when flow mode is active
self.signal_update(queue_id)
# Main queue manipulation methods
return all_episodes[episode_index:]
def _get_next_index(
- self, queue_id: str, cur_index: int | None, is_skip: bool = False, allow_repeat: bool = True
+ self,
+ queue_id: str,
+ cur_index: int | None,
+ is_skip: bool = False,
+ allow_repeat: bool = True,
) -> int | None:
"""
Return the next index for the queue, accounting for repeat settings.
def _get_next_item(self, queue_id: str, cur_index: int | None = None) -> QueueItem | None:
"""Return next QueueItem for given queue."""
- if (next_index := self._get_next_index(queue_id, cur_index)) is not None:
- return self.get_item(queue_id, next_index)
+ while True:
+ if (next_index := self._get_next_index(queue_id, cur_index)) is None:
+ break
+ if next_item := self.get_item(queue_id, next_index):
+ if next_item.media_item and not next_item.media_item.available:
+ # ensure that we skip unavailable items (set by load_next track logic)
+ continue
+ return next_item
return None
async def _fill_radio_tracks(self, queue_id: str) -> None:
"""Fill a Queue with (additional) Radio tracks."""
+ self.logger.debug(
+ "Filling radio tracks for queue %s",
+ queue_id,
+ )
tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
# fill queue - filter out unavailable items
queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
insert_at_index=len(self._queue_items[queue_id]) + 1,
)
- def _check_enqueue_next(self, queue: PlayerQueue) -> None:
- """Enqueue the next item in the queue (if needed)."""
- if queue.flow_mode:
- return
- if queue.next_item is None:
+ def _preload_next_item(self, queue: PlayerQueue) -> None:
+ """Preload the next item in the queue (if needed)."""
+ current_item = queue.current_item
+ if current_item is None or queue.next_item is None:
return
if queue.next_track_enqueued == queue.next_item.queue_item_id:
return
+ # ensure we're at least 2 seconds in the current track
+ if queue.corrected_elapsed_time < 2:
+ return
+ # preload happens when we're (at least) halfway the current track
+ if current_item.streamdetails and current_item.streamdetails.duration:
+ track_time = queue.current_item.streamdetails.duration
+ else:
+ track_time = current_item.duration or 10
+ if (queue.corrected_elapsed_time - track_time) < (track_time / 2):
+ return
async def _enqueue_next():
- next_item = await self.load_next_item(queue.queue_id, queue.current_index)
+ next_item = await self.load_next_item(queue.queue_id, current_item.queue_item_id)
+ # abort if we already enqueued the (selected) next track
+ if queue.next_track_enqueued == next_item.queue_item_id:
+ return
queue.next_track_enqueued = next_item.queue_item_id
- await self.mass.players.enqueue_next_media(
- player_id=queue.queue_id,
- media=self.player_media_from_queue_item(next_item, False),
- )
+ if not queue.flow_mode:
+ await self.mass.players.enqueue_next_media(
+ player_id=queue.queue_id,
+ media=self.player_media_from_queue_item(next_item, False),
+ )
self.mass.create_task(_enqueue_next())
if not queue.radio_source:
# this may happen during race conditions as this method is called delayed
return None
+ self.logger.info(
+ "Fetching radio tracks for queue %s based on: %s",
+ queue.display_name,
+ ", ".join([x.name for x in queue.radio_source]),
+ )
available_base_tracks: list[Track] = []
base_track_sample_size = 5
# Grab all the available base tracks based on the selected source items.
try:
available_base_tracks += [
track
- for track in await ctrl.dynamic_base_tracks(
+ for track in await ctrl.radio_mode_base_tracks(
radio_item.item_id, radio_item.provider
)
# Avoid duplicate base tracks
if track not in available_base_tracks
]
- except UnsupportedFeaturedException:
+ except UnsupportedFeaturedException as err:
self.logger.debug(
- "Skip loading radio items for %s: - "
- "Provider %s does not support dynamic (base) tracks",
+ "Skip loading radio items for %s: %s ",
radio_item.uri,
- radio_item.provider,
+ str(err),
)
+ if not available_base_tracks:
+ raise UnsupportedFeaturedException("Radio mode not available for source items")
+
# Sample tracks from the base tracks, which will be used to calculate the dynamic ones
base_tracks = random.sample(
- available_base_tracks, min(base_track_sample_size, len(available_base_tracks))
+ available_base_tracks,
+ min(base_track_sample_size, len(available_base_tracks)),
)
# Use a set to avoid duplicate dynamic tracks
dynamic_tracks: set[Track] = set()
- track_ctrl = self.mass.music.get_controller(MediaType.TRACK)
# Use base tracks + Trackcontroller to obtain similar tracks for every base Track
- for base_track in base_tracks:
- [
- dynamic_tracks.add(track)
- for track in await track_ctrl.get_provider_similar_tracks(
- base_track.item_id, base_track.provider
- )
- if track not in base_tracks
- # Ignore tracks that are too long for radio mode, e.g. mixes
- and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
- ]
- if len(dynamic_tracks) >= 50:
+ for allow_lookup in (False, True):
+ if dynamic_tracks:
break
+ for base_track in base_tracks:
+ [
+ dynamic_tracks.add(track)
+ for track in await self.mass.music.tracks.similar_tracks(
+ base_track.item_id,
+ base_track.provider,
+ allow_lookup=allow_lookup,
+ )
+ if track not in base_tracks
+ # Ignore tracks that are too long for radio mode, e.g. mixes
+ and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
+ ]
+ if len(dynamic_tracks) >= 50:
+ break
queue_tracks: list[Track] = []
dynamic_tracks = list(dynamic_tracks)
# Only include the sampled base tracks when the radio mode is first initialized
if len(base_tracks) > 1:
for base_track in base_tracks[1:]:
queue_tracks += [base_track]
- queue_tracks += random.sample(dynamic_tracks, 2)
+ if len(dynamic_tracks) > 2:
+ queue_tracks += random.sample(dynamic_tracks, 2)
+ else:
+ queue_tracks += dynamic_tracks
# Add dynamic tracks to the queue, make sure to exclude already picked tracks
remaining_dynamic_tracks = [t for t in dynamic_tracks if t not in queue_tracks]
- queue_tracks += random.sample(
- remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
- )
+ if remaining_dynamic_tracks:
+ queue_tracks += random.sample(
+ remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
+ )
return queue_tracks
async def _check_clear_queue(self, queue: PlayerQueue) -> None:
)
if elapsed_time_queue_total > (queue_item_duration + played_time):
# total elapsed time is more than (streamed) track duration
- # this track has been fully played, move in.
+ # this track has been fully played, move on.
played_time += queue_item_duration
else:
# no more seconds left to divide, this is our track
from aiofiles.os import wrap
from aiohttp import web
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+)
from music_assistant_models.enums import (
ConfigEntryType,
ContentType,
)
from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
-from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
+from music_assistant.helpers.util import (
+ get_ip,
+ get_ips,
+ select_free_port,
+ try_parse_bool,
+)
from music_assistant.helpers.webserver import Webserver
from music_assistant.models.core_controller import CoreController
from music_assistant.models.plugin import PluginProvider
async for chunk in get_ffmpeg_stream(
audio_input=self.get_flow_stream(
- queue=queue, start_queue_item=start_queue_item, pcm_format=flow_pcm_format
+ queue=queue,
+ start_queue_item=start_queue_item,
+ pcm_format=flow_pcm_format,
),
input_format=flow_pcm_format,
output_format=output_format,
queue_track = start_queue_item
else:
try:
- queue_track = await self.mass.player_queues.load_next_item(queue.queue_id)
+ queue_track = await self.mass.player_queues.load_next_item(
+ queue.queue_id, queue_track.queue_item_id
+ )
except QueueEmpty:
break
if queue_track.streamdetails is None:
raise RuntimeError(
- "No Streamdetails known for queue item %s", queue_track.queue_item_id
+ "No Streamdetails known for queue item %s",
+ queue_track.queue_item_id,
)
self.logger.debug(
self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
async def get_announcement_stream(
- self, announcement_url: str, output_format: AudioFormat, use_pre_announce: bool = False
+ self,
+ announcement_url: str,
+ output_format: AudioFormat,
+ use_pre_announce: bool = False,
) -> AsyncGenerator[bytes, None]:
"""Get the special announcement stream."""
# work out output format/details
import aiofiles
from aiohttp import ClientTimeout
-from music_assistant_models.enums import ContentType, MediaType, StreamType, VolumeNormalizationMode
+from music_assistant_models.enums import (
+ ContentType,
+ MediaType,
+ StreamType,
+ VolumeNormalizationMode,
+)
from music_assistant_models.errors import (
InvalidDataError,
MediaNotFoundError,
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, check_output, communicate
-from .throttle_retry import BYPASS_THROTTLER
from .util import TimedAsyncGenerator, create_tempfile, detect_charset
if TYPE_CHECKING:
fade_in: bool = False,
prefer_album_loudness: bool = False,
) -> StreamDetails:
- """Get streamdetails for the given QueueItem.
+ """
+ Get streamdetails for the given QueueItem.
This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
- Do not try to request streamdetails in advance as this is expiring data.
- param media_item: The QueueItem for which to request the streamdetails for.
+ Do not try to request streamdetails too much in advance as this is expiring data.
"""
time_start = time.time()
LOGGER.debug("Getting streamdetails for %s", queue_item.uri)
if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
LOGGER.warning("seeking is not possible on duration-less streams!")
seek_position = 0
- # we use a contextvar to bypass the throttler for this asyncio task/context
- # this makes sure that playback has priority over other requests that may be
- # happening in the background
- BYPASS_THROTTLER.set(True)
- if not queue_item.media_item:
+
+ if not queue_item.media_item and not queue_item.streamdetails:
+ # in case of a non-media item queue item, the streamdetails should already be provided
# this should not happen, but guard it just in case
- assert queue_item.streamdetails, "streamdetails required for non-mediaitem queueitems"
- return queue_item.streamdetails
- # always request the full library item as there might be other qualities available
- media_item = (
- await mass.music.get_library_item_by_prov_id(
- queue_item.media_item.media_type,
- queue_item.media_item.item_id,
- queue_item.media_item.provider,
- )
- or queue_item.media_item
- )
- # sort by quality and check item's availability
- for prov_media in sorted(
- media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
- ):
- if not prov_media.available:
- LOGGER.debug(f"Skipping unavailable {prov_media}")
- continue
- # guard that provider is available
- music_prov = mass.get_provider(prov_media.provider_instance)
- if not music_prov:
- LOGGER.debug(f"Skipping {prov_media} - provider not available")
- continue # provider not available ?
- # get streamdetails from provider
- try:
- streamdetails: StreamDetails = await music_prov.get_stream_details(
- prov_media.item_id, media_item.media_type
- )
- except MusicAssistantError as err:
- LOGGER.warning(str(err))
- else:
- break
- else:
raise MediaNotFoundError(
f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
)
+ if queue_item.streamdetails and not queue_item.streamdetails.seconds_streamed:
+ # already got a fresh/unused streamdetails
+ streamdetails = queue_item.streamdetails
+ else:
+ media_item = queue_item.media_item
+ # sort by quality and check item's availability
+ for prov_media in sorted(
+ media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
+ ):
+ if not prov_media.available:
+ LOGGER.debug(f"Skipping unavailable {prov_media}")
+ continue
+ # guard that provider is available
+ music_prov = mass.get_provider(prov_media.provider_instance)
+ if not music_prov:
+ LOGGER.debug(f"Skipping {prov_media} - provider not available")
+ continue # provider not available ?
+ # get streamdetails from provider
+ try:
+ streamdetails: StreamDetails = await music_prov.get_stream_details(
+ prov_media.item_id, media_item.media_type
+ )
+ except MusicAssistantError as err:
+ LOGGER.warning(str(err))
+ else:
+ break
+ else:
+ raise MediaNotFoundError(
+ f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
+ )
# work out how to handle radio stream
if (
streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
process_time = int((time.time() - time_start) * 1000)
- LOGGER.debug("retrieved streamdetails for %s in %s milliseconds", queue_item.uri, process_time)
+ LOGGER.debug(
+ "retrieved streamdetails for %s in %s milliseconds",
+ queue_item.uri,
+ process_time,
+ )
return streamdetails
stream_title = stream_title.group(1).decode("iso-8859-1", errors="replace")
cleaned_stream_title = clean_stream_title(stream_title)
if cleaned_stream_title != streamdetails.stream_title:
- LOGGER.log(VERBOSE_LOG_LEVEL, "ICY Radio streamtitle original: %s", stream_title)
LOGGER.log(
- VERBOSE_LOG_LEVEL, "ICY Radio streamtitle cleaned: %s", cleaned_stream_title
+ VERBOSE_LOG_LEVEL,
+ "ICY Radio streamtitle original: %s",
+ stream_title,
+ )
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "ICY Radio streamtitle cleaned: %s",
+ cleaned_stream_title,
)
streamdetails.stream_title = cleaned_stream_title
yield
raise NotImplementedError
- async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ async def on_streamed(
+ self,
+ streamdetails: StreamDetails,
+ seconds_streamed: int,
+ fully_played: bool = False,
+ ) -> None:
"""Handle callback when an item completed streaming."""
async def resolve_image(self, path: str) -> str | bytes:
query = "artists.item_id in :ids"
query_params = {"ids": library_items}
return await self.mass.music.artists.library_items(
- provider=self.instance_id, extra_query=query, extra_query_params=query_params
+ provider=self.instance_id,
+ extra_query=query,
+ extra_query_params=query_params,
)
if subpath == "albums":
library_items = await self.mass.cache.get(
if not library_item and not prov_item.available:
# skip unavailable tracks
self.logger.debug(
- "Skipping sync of item %s because it is unavailable", prov_item.uri
+ "Skipping sync of item %s because it is unavailable",
+ prov_item.uri,
)
continue
if not library_item:
await asyncio.sleep(0) # yield to eventloop
except MusicAssistantError as err:
self.logger.warning(
- "Skipping sync of item %s - error details: %s", prov_item.uri, str(err)
+ "Skipping sync of item %s - error details: %s",
+ prov_item.uri,
+ str(err),
)
# process deletions (= no longer in library)
await controller.set_favorite(db_id, False)
await asyncio.sleep(0) # yield to eventloop
await self.mass.cache.set(
- media_type.value, list(cur_db_ids), category=cache_category, base_key=cache_base_key
+ media_type.value,
+ list(cur_db_ids),
+ category=cache_category,
+ base_key=cache_base_key,
)
# DO NOT OVERRIDE BELOW
yield b""
raise NotImplementedError
- async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ async def on_streamed(
+ self,
+ streamdetails: StreamDetails,
+ seconds_streamed: int,
+ fully_played: bool = False,
+ ) -> None:
"""Handle callback when an item completed streaming."""
from collections.abc import AsyncGenerator, Sequence
from typing import TYPE_CHECKING
-from music_assistant_models.enums import ContentType, MediaType, ProviderFeature, StreamType
+from music_assistant_models.enums import (
+ ContentType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
from music_assistant_models.media_items import (
Album,
Artist,
from music_assistant.models.music_provider import MusicProvider
if TYPE_CHECKING:
- from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
+ from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueType,
+ ProviderConfig,
+ )
from music_assistant_models.provider import ProviderManifest
from music_assistant import MusicAssistant
# stream_type is set to CUSTOM in the get_stream_details method.
yield # type: ignore
- async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ async def on_streamed(
+ self,
+ streamdetails: StreamDetails,
+ seconds_streamed: int,
+ fully_played: bool = False,
+ ) -> None:
"""Handle callback when an item completed streaming."""
# This is OPTIONAL callback that is called when an item has been streamed.
# You can use this e.g. for playback reporting or statistics.
from aiohttp import ClientSession, ClientTimeout
from Crypto.Cipher import Blowfish
from deezer import exceptions as deezer_exceptions
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueType,
+ ProviderConfig,
+)
from music_assistant_models.enums import (
AlbumType,
ConfigEntryType,
del buffer[:2048]
yield bytes(buffer)
- async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ async def on_streamed(
+ self,
+ streamdetails: StreamDetails,
+ seconds_streamed: int,
+ fully_played: bool = False,
+ ) -> None:
"""Handle callback when an item completed streaming."""
await self.gw_client.log_listen(last_track=streamdetails)
from async_upnp_client.profiles.dlna import DmrDevice, TransportState
from async_upnp_client.search import async_search
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant_models.enums import ConfigEntryType, PlayerFeature, PlayerState, PlayerType
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+)
from music_assistant_models.errors import PlayerUnavailableError
from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
def _set_player_features(self, dlna_player: DLNAPlayer) -> None:
"""Set Player Features based on config values and capabilities."""
- supported_features: set[PlayerFeature] = set()
- if not self.mass.config.get_raw_player_config_value(
- dlna_player.udn,
- CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED.key,
- CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED.default_value,
- ):
- supported_features.add(PlayerFeature.ENQUEUE)
-
+ supported_features: set[PlayerFeature] = {
+ # there is no way to check if a dlna player support enqueuing
+ # so we simply assume it does and if it doesn't
+ # you'll find out at playback time and we log a warning
+ PlayerFeature.ENQUEUE,
+ }
if dlna_player.device.has_volume_level:
supported_features.add(PlayerFeature.VOLUME_SET)
if dlna_player.device.has_volume_mute:
offset = 0
size = 500
albums = await self._run_async(
- self._conn.getAlbumList2, ltype="alphabeticalByArtist", size=size, offset=offset
+ self._conn.getAlbumList2,
+ ltype="alphabeticalByArtist",
+ size=size,
+ offset=offset,
)
while albums:
for album in albums:
yield self._parse_album(album)
offset += size
albums = await self._run_async(
- self._conn.getAlbumList2, ltype="alphabeticalByArtist", size=size, offset=offset
+ self._conn.getAlbumList2,
+ ltype="alphabeticalByArtist",
+ size=size,
+ offset=offset,
)
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""
try:
await self._run_async(
- self._conn.updatePlaylist, lid=prov_playlist_id, songIdsToAdd=prov_track_ids
+ self._conn.updatePlaylist,
+ lid=prov_playlist_id,
+ songIdsToAdd=prov_track_ids,
)
except SonicError as ex:
msg = f"Failed to add songs to {prov_playlist_id}, check your permissions."
self.logger.debug("scrobble for now playing called for %s", item_id)
await self._run_async(self._conn.scrobble, sid=item_id, submission=False)
- async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ async def on_streamed(
+ self,
+ streamdetails: StreamDetails,
+ seconds_streamed: int,
+ fully_played: bool = False,
+ ) -> None:
"""Handle callback when an item completed streaming."""
self.logger.debug("on_streamed called for %s", streamdetails.item_id)
if seconds_streamed >= streamdetails.duration / 2:
self.logger.debug("starting stream of item '%s'", streamdetails.item_id)
try:
with self._conn.stream(
- streamdetails.item_id, timeOffset=seek_position, estimateContentLength=True
+ streamdetails.item_id,
+ timeOffset=seek_position,
+ estimateContentLength=True,
) as stream:
for chunk in stream.iter_content(chunk_size=40960):
asyncio.run_coroutine_threadsafe(
action=CONF_ACTION_LIBRARY,
action_label="Select Plex Music Library",
)
- if action in (CONF_ACTION_LIBRARY, CONF_ACTION_AUTH_MYPLEX, CONF_ACTION_AUTH_LOCAL):
+ if action in (
+ CONF_ACTION_LIBRARY,
+ CONF_ACTION_AUTH_MYPLEX,
+ CONF_ACTION_AUTH_LOCAL,
+ ):
token = mass.config.decrypt_string(str(values.get(CONF_AUTH_TOKEN)))
server_http_ip = str(values.get(CONF_LOCAL_SERVER_IP))
server_http_port = str(values.get(CONF_LOCAL_SERVER_PORT))
return stream_details
- async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ async def on_streamed(
+ self,
+ streamdetails: StreamDetails,
+ seconds_streamed: int,
+ fully_played: bool = False,
+ ) -> None:
"""Handle callback when an item completed streaming."""
def mark_played() -> None:
item = streamdetails.data
- params = {"key": str(item.ratingKey), "identifier": "com.plexapp.plugins.library"}
+ params = {
+ "key": str(item.ratingKey),
+ "identifier": "com.plexapp.plugins.library",
+ }
self._plex_server.query("/:/scrobble", params=params)
await asyncio.to_thread(mark_played)
)
from music_assistant.helpers.app_vars import app_var
from music_assistant.helpers.json import json_loads
-from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
+from music_assistant.helpers.throttle_retry import (
+ ThrottlerManager,
+ throttle_with_retries,
+)
from music_assistant.helpers.util import lock, parse_title_and_version, try_parse_int
from music_assistant.models.music_provider import MusicProvider
async with self.throttler.bypass():
await self._post_data("track/reportStreamingStart", data=events)
- async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ async def on_streamed(
+ self,
+ streamdetails: StreamDetails,
+ seconds_streamed: int,
+ fully_played: bool = False,
+ ) -> None:
"""Handle callback when an item completed streaming."""
user_id = self._user_auth_info["user"]["id"]
async with self.throttler.bypass():
"mashumaro==3.14",
"memory-tempfile==2.2.3",
"music-assistant-frontend==2.10.1",
- "music-assistant-models==1.1.6",
+ "music-assistant-models==1.1.7",
"orjson==3.10.12",
"pillow==11.0.0",
"python-slugify==8.0.4",
mashumaro==3.14
memory-tempfile==2.2.3
music-assistant-frontend==2.10.1
-music-assistant-models==1.1.6
+music-assistant-models==1.1.7
orjson==3.10.12
pillow==11.0.0
pkce==1.0.3