from typing import TYPE_CHECKING, Any, TypedDict, cast
import shortuuid
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+)
from music_assistant_models.enums import (
ConfigEntryType,
ContentType,
UnsupportedFeaturedException,
)
from music_assistant_models.media_items import (
+ Album,
+ Artist,
+ Audiobook,
BrowseFolder,
ItemMapping,
MediaItemType,
PlayableMediaItemType,
Playlist,
+ Podcast,
PodcastEpisode,
+ Track,
+ UniqueList,
media_from_dict,
)
-from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
+from music_assistant_models.playback_progress_report import (
+ MediaItemPlaybackProgressReport,
+)
from music_assistant_models.player_queue import PlayerQueue
from music_assistant_models.queue_item import QueueItem
if TYPE_CHECKING:
from collections.abc import Iterator
- from music_assistant_models.media_items import (
- Album,
- Artist,
- Audiobook,
- Podcast,
- Track,
- UniqueList,
- )
+ from music_assistant_models.media_items.metadata import MediaItemImage
from music_assistant import MusicAssistant
from music_assistant.models.player import Player
return
# a single item or list of items may be provided
- if not isinstance(media, list):
- media = [media]
+ media_list = media if isinstance(media, list) else [media]
# clear queue if needed
if option == QueueOption.REPLACE:
media_items: list[MediaItemType] = []
radio_source: list[MediaItemType] = []
# resolve all media items
- for item in media:
+ for item in media_list:
try:
# parse provided uri into a MA MediaItem or Basic QueueItem from URL
+ media_item: MediaItemType | ItemMapping | BrowseFolder
if isinstance(item, str):
media_item = await self.mass.music.get_item_by_uri(item)
- elif isinstance(item, dict):
- media_item = media_from_dict(item)
+ elif isinstance(item, dict): # type: ignore[unreachable]
+ # TODO: Investigate why the API parser sometimes passes raw dicts instead of
+ # converting them to MediaItem objects. The parse_value function in api.py
+ # should handle dict-to-object conversion, but dicts are slipping through
+ # in some cases. This is defensive handling for that parser bug.
+ media_item = media_from_dict(item) # type: ignore[unreachable]
+ self.logger.debug("Converted to: %s", type(media_item))
else:
+ # item is MediaItemType | ItemMapping at this point
media_item = item
+
# Save requested media item to play on the queue so we can use it as a source
# for Don't stop the music. Use FIFO list to keep track of the last 10 played items
- if media_item.media_type in (
+ # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
+ if not isinstance(
+ media_item, (ItemMapping, BrowseFolder)
+ ) and media_item.media_type in (
MediaType.TRACK,
MediaType.ALBUM,
MediaType.PLAYLIST,
queue.enqueued_media_items.append(media_item)
if len(queue.enqueued_media_items) > 10:
queue.enqueued_media_items.pop(0)
+
# handle default enqueue option if needed
if option is None:
- option = QueueOption(
- await self.mass.config.get_core_config_value(
- self.domain,
- f"default_enqueue_option_{media_item.media_type.value}",
- )
+ config_value = await self.mass.config.get_core_config_value(
+ self.domain,
+ f"default_enqueue_option_{media_item.media_type.value}",
+ return_type=str,
)
+ option = QueueOption(config_value)
if option == QueueOption.REPLACE:
self.clear(queue_id, skip_stop=True)
+
# collect media_items to play
if radio_mode:
- radio_source.append(media_item)
+ # Type guard for mypy - only add full MediaItemType to radio_source
+ if not isinstance(media_item, (ItemMapping, BrowseFolder)):
+ radio_source.append(media_item)
else:
- media_items += await self._resolve_media_items(media_item, start_item)
+ # Convert start_item to string URI if needed
+ start_item_uri: str | None = None
+ if isinstance(start_item, str):
+ start_item_uri = start_item
+ elif start_item is not None:
+ start_item_uri = start_item.uri
+ media_items += await self._resolve_media_items(media_item, start_item_uri)
except MusicAssistantError as err:
# invalid MA uri or item not found error
queue.radio_source += radio_source
# Use collected media items to calculate the radio if radio mode is on
if radio_mode:
- media_items = await self._get_radio_tracks(
+ radio_tracks = await self._get_radio_tracks(
queue_id=queue_id, is_initial_radio_mode=True
)
+ media_items = list(radio_tracks)
# only add valid/available items
- queue_items = [
- QueueItem.from_media_item(queue_id, x) for x in media_items if x and x.available
- ]
+ queue_items: list[QueueItem] = []
+ for x in media_items:
+ if not x or not x.available:
+ continue
+ queue_items.append(
+ QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
+ )
if not queue_items:
raise MediaNotFoundError("No playable items found")
"""
queue = self._queues[queue_id]
item_index = self.index_by_id(queue_id, queue_item_id)
- if item_index <= queue.index_in_buffer:
+ if item_index is None:
+ raise InvalidDataError(f"Item {queue_item_id} not found in queue")
+ if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
msg = f"{item_index} is already played/buffered"
raise IndexError(msg)
"""Delete item (by id or index) from the queue."""
if isinstance(item_id_or_index, str):
item_index = self.index_by_id(queue_id, item_id_or_index)
+ if item_index is None:
+ raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
else:
item_index = item_id_or_index
queue = self._queues[queue_id]
- queue_id: queue_id of the playerqueue to handle the command.
"""
- queue_player: Player = self.mass.players.get(queue_id, True)
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player is None:
+ raise PlayerUnavailableError(f"Player {queue_id} is not available")
if (queue := self.get(queue_id)) and queue.active:
if queue.state == PlaybackState.PLAYING:
- queue.resume_pos = queue.corrected_elapsed_time
- # forward the actual command to the player
- if queue_player := self.mass.players.get(queue_id):
- await queue_player.stop()
+ queue.resume_pos = int(queue.corrected_elapsed_time)
+ # forward the actual command to the player
+ if temp_player := self.mass.players.get(queue_id):
+ await temp_player.stop()
@api_command("player_queues/play")
async def play(self, queue_id: str) -> None:
- queue_id: queue_id of the playerqueue to handle the command.
"""
- queue_player: Player = self.mass.players.get(queue_id, True)
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player is None:
+ raise PlayerUnavailableError(f"Player {queue_id} is not available")
if (
(queue := self._queues.get(queue_id))
and queue.active
"""
if queue := self._queues.get(queue_id):
if queue.state == PlaybackState.PLAYING:
- queue.resume_pos = queue.corrected_elapsed_time
+ queue.resume_pos = int(queue.corrected_elapsed_time)
# forward the actual command to the player controller
queue_player = self.mass.players.get(queue_id)
assert queue_player is not None # for type checking
# TODO: forward to underlying player if not active
return
idx = self._queues[queue_id].current_index
+ if idx is None:
+ self.logger.warning("Queue %s has no current index", queue.display_name)
+ return
attempts = 5
while attempts:
try:
if (queue := self.get(queue_id)) is None or not queue.active:
# TODO: forward to underlying player if not active
return
- await self.seek(queue_id, self._queues[queue_id].elapsed_time + seconds)
+ await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
@api_command("player_queues/seek")
async def seek(self, queue_id: str, position: int = 10) -> None:
"""
if not (queue := self.get(queue_id)):
return
- queue_player: Player = self.mass.players.get(queue_id, True)
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player is None:
+ raise PlayerUnavailableError(f"Player {queue_id} is not available")
if not queue.current_item:
raise InvalidCommand(f"Queue {queue_player.display_name} has no item(s) loaded.")
if not queue.current_item.duration:
position = max(0, int(position))
if position > queue.current_item.duration:
raise InvalidCommand("Can not seek outside of duration range.")
+ if queue.current_index is None:
+ raise InvalidCommand(f"Queue {queue_player.display_name} has no current index.")
await self.play_index(queue_id, queue.current_index, seek_position=position)
@api_command("player_queues/resume")
if resume_item is not None:
queue_player = self.mass.players.get(queue_id)
+ if queue_player is None:
+ raise PlayerUnavailableError(f"Player {queue_id} is not available")
if (
fade_in is None
and queue_player.playback_state == PlaybackState.IDLE
if resume_item.media_type == MediaType.RADIO:
# we're not able to skip in online radio so this is pointless
resume_pos = 0
- await self.play_index(queue_id, resume_item.queue_item_id, resume_pos, fade_in)
+ await self.play_index(
+ queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
+ )
else:
msg = f"Resume queue requested but queue {queue.display_name} is empty"
raise QueueEmpty(msg)
queue = self._queues[queue_id]
queue.resume_pos = 0
if isinstance(index, str):
- index = self.index_by_id(queue_id, index)
+ temp_index = self.index_by_id(queue_id, index)
+ if temp_index is None:
+ raise InvalidDataError(f"Item {index} not found in queue")
+ index = temp_index
+ # At this point index is guaranteed to be int
queue.current_index = index
# update current item and elapsed time and signal update
# this way the UI knows immediately that a new item is loading
self.signal_update(queue_id)
queue.index_in_buffer = index
queue.flow_mode_stream_log = []
- prefer_flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
+ prefer_flow_mode = await self.mass.config.get_player_config_value(
+ queue_id, CONF_FLOW_MODE, return_type=bool
+ )
target_player = self.mass.players.get(queue_id)
+ if target_player is None:
+ raise PlayerUnavailableError(f"Player {queue_id} is not available")
enqueue_supported = PlayerFeature.ENQUEUE in target_player.supported_features
queue.next_item_id_enqueued = None
# always update session id when we start a new playback session
queue.session_id = shortuuid.random(length=8)
-
# handle resume point of audiobook(chapter) or podcast(episode)
if (
not seek_position
break
except (MediaNotFoundError, AudioError):
# the requested index can not be played.
- self.logger.warning(
- "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
- )
- queue_item.available = False
- index = self._get_next_index(queue_id, index, allow_repeat=False)
+ if queue_item:
+ self.logger.warning(
+ "Skipping unplayable item %s (%s)",
+ queue_item.name,
+ queue_item.uri,
+ )
+ queue_item.available = False
+ next_index = self._get_next_index(queue_id, index, allow_repeat=False)
+ if next_index is None:
+ raise MediaNotFoundError("No next item available")
+ index = next_index
else:
# all attempts to find a playable item failed
raise MediaNotFoundError("No playable item found to start playback")
auto_play = source_queue.state == PlaybackState.PLAYING
target_player = self.mass.players.get(target_queue_id)
+ if target_player is None:
+ raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
if target_player.active_group or target_player.synced_to:
# edge case: the user wants to move playback from the group as a whole, to a single
# player in the group or it is grouped and the command targeted at the single player.
# We need to dissolve the group first.
- await self.mass.players.cmd_ungroup(
- target_player.active_group or target_player.synced_to
- )
+ group_id = target_player.active_group or target_player.synced_to
+ assert group_id is not None # checked in if condition above
+ await self.mass.players.cmd_ungroup(group_id)
await asyncio.sleep(3)
source_items = self._queue_items[source_queue_id]
target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
target_queue.radio_source = source_queue.radio_source
target_queue.enqueued_media_items = source_queue.enqueued_media_items
- target_queue.resume_pos = source_queue.elapsed_time
+ target_queue.resume_pos = int(source_queue.elapsed_time)
target_queue.current_index = source_queue.current_index
if source_queue.current_item:
target_queue.current_item = source_queue.current_item
queue = None
# try to restore previous state
if prev_state := await self.mass.cache.get(
- key=queue_id, provider=self.domain, category=CACHE_CATEGORY_PLAYER_QUEUE_STATE
+ key=queue_id,
+ provider=self.domain,
+ category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
):
try:
queue = PlayerQueue.from_dict(prev_state)
queue_items = [QueueItem.from_cache(x) for x in prev_items]
if queue.enqueued_media_items:
# we need to restore the MediaItem objects for the enqueued media items
- restored_enqueued_items = []
- for item in queue.enqueued_media_items:
+ # Items from cache may be dicts that need deserialization
+
+ restored_enqueued_items: list[MediaItemType] = []
+ cached_items: list[Any] = cast("list[Any]", queue.enqueued_media_items)
+ for item in cached_items:
if isinstance(item, dict):
restored_item = media_from_dict(item)
- restored_enqueued_items.append(restored_item)
+ restored_enqueued_items.append(cast("MediaItemType", restored_item))
else:
restored_enqueued_items.append(item)
queue.enqueued_media_items = restored_enqueued_items
# if the player is permanently removed, we also remove the cached queue data
self.mass.create_task(
self.mass.cache.delete(
- key=player_id, provider=self.domain, category=CACHE_CATEGORY_PLAYER_QUEUE_STATE
+ key=player_id,
+ provider=self.domain,
+ category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
)
)
self.mass.create_task(
)
)
current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
- previous_track_from_same_album = (
- (previous_index := max(current_index - 1, 0))
- and (previous_index > 0)
- and (previous_item := self.get_item(queue_id, previous_index))
- and (
- queue_item.media_item
- and hasattr(queue_item.media_item, "album")
- and queue_item.media_item.album
- and previous_item.media_item
+ if current_index is None:
+ previous_track_from_same_album = False
+ else:
+ previous_index = max(current_index - 1, 0)
+ previous_track_from_same_album = (
+ previous_index > 0
+ and (previous_item := self.get_item(queue_id, previous_index)) is not None
+ and previous_item.media_item is not None
and hasattr(previous_item.media_item, "album")
- and previous_item.media_item.album
+ and previous_item.media_item.album is not None
+ and queue_item.media_item is not None
+ and hasattr(queue_item.media_item, "album")
+ and queue_item.media_item.album is not None
and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
)
- )
playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
- if queue_item.media_item and queue_item.media_item.media_type == MediaType.TRACK:
+ if queue_item.media_item and isinstance(queue_item.media_item, Track):
album = queue_item.media_item.album
# prefer the full library media item so we have all metadata and provider(quality) info
# always request the full library item as there might be other qualities available
queue_item.media_item.item_id,
queue_item.media_item.provider,
):
- queue_item.media_item = library_item
+ queue_item.media_item = cast("Track", library_item)
elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
"ytmusic"
):
# Youtube Music has poor thumbs by default, so we always fetch the full item
# this also catches the case where they have an unavailable item in a listing
- queue_item.media_item = await self.mass.music.get_item_by_uri(queue_item.uri)
+ fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
+ queue_item.media_item = cast("Track", fetched_item)
+
# ensure we got the full (original) album set
if album and (
library_album := await self.mass.music.get_library_item_by_prov_id(
album.provider,
)
):
- queue_item.media_item.album = library_album
+ queue_item.media_item.album = cast("Album", library_album)
elif album:
# Restore original album if we have no better alternative from the library
queue_item.media_item.album = album
# prefer album image over track image
if queue_item.media_item.album and queue_item.media_item.album.image:
- org_images = queue_item.media_item.metadata.images or []
- queue_item.media_item.metadata.images = [
- queue_item.media_item.album.image,
- *org_images,
- ]
+ org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
+ queue_item.media_item.metadata.images = UniqueList(
+ [
+ queue_item.media_item.album.image,
+ *org_images,
+ ]
+ )
# Fetch the streamdetails, which could raise in case of an unplayable item.
# For example, YT Music returns Radio Items that are not playable.
queue_item.streamdetails = await get_stream_details(
queue_item=queue_item,
seek_position=seek_position,
fade_in=fade_in,
- prefer_album_loudness=playing_album_tracks,
+ prefer_album_loudness=bool(playing_album_tracks),
)
def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
# without having to compare the entire list
queue.items_last_updated = time.time()
self.signal_update(queue_id, True)
- if queue.state == PlaybackState.PLAYING and queue.index_in_buffer == queue.current_index:
+ if (
+ queue.state == PlaybackState.PLAYING
+ and queue.index_in_buffer is not None
+ and queue.index_in_buffer == queue.current_index
+ ):
# if the queue is playing,
# ensure to (re)queue the next track because it might have changed
# note that we only do this if the player has loaded the current track
duration = duration - queue_item.streamdetails.seek_position
else:
duration = queue_item.duration
+ if queue.session_id is None:
+ # handle error or return early
+ raise InvalidDataError("Queue session_id is None")
media = PlayerMedia(
uri=await self.mass.streams.resolve_stream_url(
queue.session_id, queue_item, flow_mode=flow_mode
)
random.shuffle(all_items)
return all_items
-
if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
- all_items: list[Track] = []
+ all_tracks: list[Track] = []
for library_album in await self.mass.music.artists.albums(
artist.item_id,
artist.provider,
for album_track in await self.mass.music.albums.tracks(
library_album.item_id, library_album.provider
):
- if album_track not in all_items:
- all_items.append(album_track)
- random.shuffle(all_items)
- return all_items
-
+ if album_track not in all_tracks:
+ all_tracks.append(album_track)
+ random.shuffle(all_tracks)
+ return all_tracks
return []
async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
)
if chapter is not None:
# user explicitly selected a chapter to play
- if isinstance(chapter, str):
- start_chapter = int(chapter)
+ start_chapter = int(chapter) if isinstance(chapter, str) else chapter
if chapters := audio_book.metadata.chapters:
if _chapter := next((x for x in chapters if x.position == start_chapter), None):
- return _chapter.start * 1000
+ return int(_chapter.start * 1000)
raise InvalidDataError(
f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
)
raise InvalidDataError("Either podcast or episode must be provided")
if podcast is None:
# single podcast episode requested
+ assert isinstance(episode, PodcastEpisode) # checked above
self.logger.debug(
"Fetching resume point to play for Podcast episode %s",
episode.name,
)
- episode = cast("PodcastEpisode", episode)
- fully_played, resume_position_ms = await self.mass.music.get_resume_position(episode)
+ (
+ fully_played,
+ resume_position_ms,
+ ) = await self.mass.music.get_resume_position(episode)
episode.fully_played = fully_played
episode.resume_position_ms = 0 if fully_played else resume_position_ms
- return [episode]
+ return UniqueList([episode])
# podcast with optional start episode requested
self.logger.debug(
"Fetching episode(s) and resume point to play for Podcast %s",
all_episodes.sort(key=lambda x: x.position)
# if a episode was provided, a user explicitly selected a episode to play
# so we need to find the index of the episode in the list
+ resolved_episode: PodcastEpisode | None = None
if isinstance(episode, PodcastEpisode):
- episode = next((x for x in all_episodes if x.uri == episode.uri), None)
- # ensure we have accurate resume info
- fully_played, resume_position_ms = await self.mass.music.get_resume_position(episode)
- episode.resume_position_ms = 0 if fully_played else resume_position_ms
+ resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
+ if resolved_episode:
+ # ensure we have accurate resume info
+ (
+ fully_played,
+ resume_position_ms,
+ ) = await self.mass.music.get_resume_position(resolved_episode)
+ resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
elif isinstance(episode, str):
- episode = next((x for x in all_episodes if episode in (x.uri, x.item_id)), None)
- # ensure we have accurate resume info
- fully_played, resume_position_ms = await self.mass.music.get_resume_position(episode)
- episode.resume_position_ms = 0 if fully_played else resume_position_ms
+ resolved_episode = next(
+ (x for x in all_episodes if episode in (x.uri, x.item_id)), None
+ )
+ if resolved_episode:
+ # ensure we have accurate resume info
+ (
+ fully_played,
+ resume_position_ms,
+ ) = await self.mass.music.get_resume_position(resolved_episode)
+ resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
else:
# get first episode that is not fully played
- for episode in all_episodes:
- if episode.fully_played:
+ for ep in all_episodes:
+ if ep.fully_played:
continue
# ensure we have accurate resume info
- fully_played, resume_position_ms = await self.mass.music.get_resume_position(
- episode
- )
+ (
+ fully_played,
+ resume_position_ms,
+ ) = await self.mass.music.get_resume_position(ep)
if fully_played:
continue
- episode.resume_position_ms = resume_position_ms
+ ep.resume_position_ms = resume_position_ms
+ resolved_episode = ep
break
else:
# no episodes found that are not fully played, so we start at the beginning
- episode = next((x for x in all_episodes), None)
- if episode is None:
+ resolved_episode = next((x for x in all_episodes), None)
+ if resolved_episode is None:
raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
# get the index of the episode
- episode_index = all_episodes.index(episode)
+ episode_index = all_episodes.index(resolved_episode)
# return the (remaining) episode(s) to play
- return all_episodes[episode_index:]
+ return UniqueList(all_episodes[episode_index:])
def _get_next_index(
self,
def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
"""Return next QueueItem for given queue."""
+ index: int
if isinstance(cur_index, str):
- cur_index = self.index_by_id(queue_id, cur_index)
- if cur_index is None:
- return None # guard
+ resolved_index = self.index_by_id(queue_id, cur_index)
+ if resolved_index is None:
+ return None # guard
+ index = resolved_index
+ else:
+ index = cur_index
+ # At this point index is guaranteed to be int
for skip in range(5):
- if (next_index := self._get_next_index(queue_id, cur_index + skip)) is None:
+ if (next_index := self._get_next_index(queue_id, index + skip)) is None:
break
next_item = self.get_item(queue_id, next_index)
+ if next_item is None:
+ continue
if not next_item.available:
# ensure that we skip unavailable items (set by load_next track logic)
continue
task_id = f"preload_next_item_{queue_id}"
self.mass.create_task(
- _preload_streamdetails, item_id_in_buffer, task_id=task_id, abort_existing=True
+ _preload_streamdetails,
+ item_id_in_buffer,
+ task_id=task_id,
+ abort_existing=True,
)
async def _resolve_media_items(
"""Resolve/unwrap media items to enqueue."""
# resolve Itemmapping to full media item
if isinstance(media_item, ItemMapping):
+ if media_item.uri is None:
+ raise InvalidDataError("ItemMapping has no URI")
media_item = await self.mass.music.get_item_by_uri(media_item.uri)
if media_item.media_type == MediaType.PLAYLIST:
+ media_item = cast("Playlist", media_item)
self.mass.create_task(self.mass.music.mark_item_played(media_item))
- return await self.get_playlist_tracks(media_item, start_item)
+ return list(await self.get_playlist_tracks(media_item, start_item))
if media_item.media_type == MediaType.ARTIST:
+ media_item = cast("Artist", media_item)
self.mass.create_task(self.mass.music.mark_item_played(media_item))
- return await self.get_artist_tracks(media_item)
+ return list(await self.get_artist_tracks(media_item))
if media_item.media_type == MediaType.ALBUM:
+ media_item = cast("Album", media_item)
self.mass.create_task(self.mass.music.mark_item_played(media_item))
- return await self.get_album_tracks(media_item, start_item)
+ return list(await self.get_album_tracks(media_item, start_item))
if media_item.media_type == MediaType.AUDIOBOOK:
+ media_item = cast("Audiobook", media_item)
# ensure we grab the correct/latest resume point info
media_item.resume_position_ms = await self.get_audiobook_resume_point(
media_item, start_item
)
return [media_item]
if media_item.media_type == MediaType.PODCAST:
+ media_item = cast("Podcast", media_item)
self.mass.create_task(self.mass.music.mark_item_played(media_item))
- return await self.get_next_podcast_episodes(media_item, start_item)
+ return list(await self.get_next_podcast_episodes(media_item, start_item))
if media_item.media_type == MediaType.PODCAST_EPISODE:
- return await self.get_next_podcast_episodes(None, media_item)
+ media_item = cast("PodcastEpisode", media_item)
+ return list(await self.get_next_podcast_episodes(None, media_item))
if media_item.media_type == MediaType.FOLDER:
- return await self._get_folder_tracks(media_item)
+ media_item = cast("BrowseFolder", media_item)
+ return list(await self._get_folder_tracks(media_item))
# all other: single track or radio item
- return [media_item]
+ return [cast("MediaItemType", media_item)]
async def _get_radio_tracks(
self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]:
"""Call the registered music providers for dynamic tracks."""
queue = self._queues[queue_id]
- queue_track_items = [q.media_item for q in self._queue_items[queue_id] if q.media_item]
+ queue_track_items: list[Track] = [
+ q.media_item
+ for q in self._queue_items[queue_id]
+ if q.media_item and isinstance(q.media_item, Track)
+ ]
if not queue.radio_source:
# this may happen during race conditions as this method is called delayed
- return None
+ return []
self.logger.info(
"Fetching radio tracks for queue %s based on: %s",
queue.display_name,
# Some providers don't have similar tracks for all items. For example,
# Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
# in that case, just skip the track.
- self.logger.debug("No similar tracks not found for track %s", base_track.name)
+ self.logger.debug("Similar tracks not found for track %s", base_track.name)
continue
for track in _similar_tracks:
if (
if len(dynamic_tracks) >= 50:
break
queue_tracks: list[Track] = []
- dynamic_tracks = list(dynamic_tracks)
+ dynamic_tracks_list = list(dynamic_tracks)
# Only include the sampled base tracks when the radio mode is first initialized
if is_initial_radio_mode:
queue_tracks += [base_tracks[0]]
if len(base_tracks) > 1:
for base_track in base_tracks[1:]:
queue_tracks += [base_track]
- if len(dynamic_tracks) > 2:
- queue_tracks += random.sample(dynamic_tracks, 2)
+ if len(dynamic_tracks_list) > 2:
+ queue_tracks += random.sample(dynamic_tracks_list, 2)
else:
- queue_tracks += dynamic_tracks
+ queue_tracks += dynamic_tracks_list
# Add dynamic tracks to the queue, make sure to exclude already picked tracks
- remaining_dynamic_tracks = [t for t in dynamic_tracks if t not in queue_tracks]
+ remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
if remaining_dynamic_tracks:
queue_tracks += random.sample(
remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
if not item.is_playable:
continue
# recursively fetch tracks from all media types
- tracks += await self._resolve_media_items(item)
+ resolved = await self._resolve_media_items(item)
+ tracks += [x for x in resolved if isinstance(x, Track)]
return tracks
) -> None:
"""Update the Queue when the player state changed."""
queue_id = player.player_id
- player = self.mass.players.get(queue_id)
queue = self._queues[queue_id]
# basic properties
player.playback_state or PlaybackState.IDLE if queue.active else PlaybackState.IDLE
)
# update current item/index from player report
- if queue.active and queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
+ if queue.active and queue.state in (
+ PlaybackState.PLAYING,
+ PlaybackState.PAUSED,
+ ):
# NOTE: If the queue is not playing (yet) we will not update the current index
# to ensure we keep the previously known current index
if queue.flow_mode:
# get current/next item based on current index
queue.current_index = current_index
queue.current_item = current_item = self.get_item(queue_id, current_index)
- queue.next_item = self.get_next_item(queue_id, current_index) if current_item else None
+ queue.next_item = (
+ self.get_next_item(queue_id, current_index)
+ if current_item and current_index is not None
+ else None
+ )
# correct elapsed time when seeking
if (
current_item=None,
elapsed_time=0,
stream_title=None,
+ codec_type=None,
output_formats=None,
),
)
current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
current_item=queue.current_item,
- elapsed_time=queue.elapsed_time,
+ elapsed_time=int(queue.elapsed_time),
stream_title=(
queue.current_item.streamdetails.stream_title
if queue.current_item and queue.current_item.streamdetails
),
output_formats=output_formats,
)
- changed_keys = get_changed_keys(prev_state, new_state)
+ changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
with suppress(KeyError):
changed_keys.remove("next_item_id")
"End of queue detected and Don't stop the music is enabled for %s"
" - setting enqueued media items as radio source: %s",
queue.display_name,
- ", ".join([x.uri for x in queue.enqueued_media_items]),
+ ", ".join([x.uri for x in queue.enqueued_media_items]), # type: ignore[misc] # uri set in __post_init__
)
queue.radio_source = queue.enqueued_media_items
# auto fill radio tracks if less than 5 tracks left in the queue
"""Calculate current queue index and current track elapsed time when flow mode is active."""
elapsed_time_queue_total = player.corrected_elapsed_time or 0
if queue.current_index is None and not queue.flow_mode_stream_log:
- return queue.current_index, queue.elapsed_time
+ return queue.current_index, int(queue.elapsed_time)
# For each track that has been streamed/buffered to the player,
# a playlog entry will be created with the queue item id
# out where we are in the queue, accounting for actual streamed
# seconds (and not duration) and skipped seconds. If a track has been repeated,
# it will simply be in the playlog multiple times.
- played_time = 0
- queue_index = queue.current_index or 0
- track_time = 0
+ played_time = 0.0
+ queue_index: int | None = queue.current_index or 0
+ track_time = 0.0
for play_log_entry in queue.flow_mode_stream_log:
queue_item_duration = (
# NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
if player.playback_state != PlaybackState.PLAYING:
# if the player is not playing, we can't be sure that the elapsed time is correct
# so we just return the queue index and the elapsed time
- return queue.current_index, queue.elapsed_time
- return queue_index, track_time
+ return queue.current_index, int(queue.elapsed_time)
+ return queue_index, int(track_time)
def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
"""Parse QueueItem ID from Player's current url."""
if prev_state["current_item_id"] is None:
return
- async def _clear_queue_delayed():
+ async def _clear_queue_delayed() -> None:
for _ in range(5):
await asyncio.sleep(1)
if queue.state != PlaybackState.IDLE:
# report on current item
is_current_item = True
item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
- if not item_to_report:
- return # guard against invalid items
seconds_played = int(new_state["elapsed_time"])
+ if not item_to_report:
+ return # guard against invalid items
+
if not item_to_report.media_item:
# only report on media items
return
+ assert item_to_report.media_item.uri is not None # uri is set in __post_init__
if item_to_report.streamdetails and item_to_report.streamdetails.duration:
duration = int(item_to_report.streamdetails.duration)