MusicAssistantError,
PlayerUnavailableError,
QueueEmpty,
+ UnsupportedFeaturedException,
+)
+from music_assistant.common.models.media_items import (
+ AudioFormat,
+ MediaItemType,
+ Playlist,
+ media_from_dict,
)
-from music_assistant.common.models.media_items import AudioFormat, MediaItemType, media_from_dict
from music_assistant.common.models.player import PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.constants import (
- CONF_CROSSFADE,
- CONF_FLOW_MODE,
- FALLBACK_DURATION,
- MASS_LOGO_ONLINE,
-)
+from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, MASS_LOGO_ONLINE
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.audio import get_stream_details
from music_assistant.server.helpers.throttle_retry import BYPASS_THROTTLER
if queue.repeat_mode == repeat_mode:
return # no change
queue.repeat_mode = repeat_mode
- # ensure that we restart playback or trigger enqueue next if repeat mode changed
self.signal_update(queue_id)
- if (
- repeat_mode == RepeatMode.ONE
- and queue.flow_mode
- and queue.state == PlayerState.PLAYING
- and queue.current_index != queue.index_in_buffer
- ):
- # edge case; repeat one enabled in flow mode but the
- # flow stream had already loaded a new item in the buffer,
- # we need to restart playback
- self.mass.create_task(self.resume(queue_id))
- else:
- task_id = f"enqueue_next_{queue_id}"
- self.logger.info("Repeat mode detected, enqueue next item")
- self.mass.call_later(2, self._enqueue_next, queue, queue.current_index, task_id=task_id)
+ # ensure that we trigger enqueue next if repeat mode changed (if needed/supported)
+ task_id = f"enqueue_next_{queue_id}"
+ self.mass.call_later(5, self._enqueue_next, queue, queue.current_index, task_id=task_id)
@api_command("player_queues/play_media")
async def play_media(
# clear queue if needed
if option == QueueOption.REPLACE:
self.clear(queue_id)
- # Clear the 'played media item' list when a new queue is requested
+ # Clear the 'enqueued media item' list when a new queue is requested
if option not in (QueueOption.ADD, QueueOption.NEXT):
- queue.enqueued_media_items = []
+ queue.enqueued_media_items.clear()
tracks: list[MediaItemType] = []
radio_source: list[MediaItemType] = []
# Save requested media item to play on the queue so we can use it as a source
# for Don't stop the music. Use FIFO list to keep track of the last 10 played items
- queue.enqueued_media_items.append(media_item.uri)
- if len(queue.enqueued_media_items) > 10:
- queue.enqueued_media_items.pop(0)
+ if media_item.media_type in (
+ MediaType.TRACK,
+ MediaType.ALBUM,
+ MediaType.PLAYLIST,
+ MediaType.ARTIST,
+ ):
+ queue.enqueued_media_items.append(media_item)
+ if len(queue.enqueued_media_items) > 10:
+ queue.enqueued_media_items.pop(0)
# handle default enqueue option if needed
if option is None:
if radio_mode:
radio_source.append(media_item)
elif media_item.media_type == MediaType.PLAYLIST:
- async for playlist_track in self.mass.music.playlists.tracks(
- media_item.item_id, media_item.provider
- ):
- if not playlist_track.available:
- continue
- # allow first track to start playing immediately while we still
- # work out the rest of the queue
- if (
- not queue.shuffle_enabled
- and not first_track_seen
- and option == QueueOption.REPLACE
- and not start_item
- ):
- first_track_seen = True
- self.load(
- queue_id,
- queue_items=[QueueItem.from_media_item(queue_id, playlist_track)],
- keep_remaining=False,
- keep_played=False,
- )
- await self.play_index(queue_id, 0)
- # add the remaining items
- option = QueueOption.ADD
- else:
- tracks.append(playlist_track)
+ tracks += await self.get_playlist_tracks(media_item, start_item)
self.mass.create_task(
self.mass.music.mark_item_played(
media_item.media_type, media_item.item_id, media_item.provider
)
)
elif media_item.media_type == MediaType.ALBUM:
- tracks += await self.get_album_tracks(media_item)
+ tracks += await self.get_album_tracks(media_item, start_item)
self.mass.create_task(
self.mass.music.mark_item_played(
media_item.media_type, media_item.item_id, media_item.provider
# single track or radio item
tracks += [media_item]
- # handle optional start item (play playlist/album from here feature)
- if start_item is not None:
- prev_items = []
- next_items = []
- for track in tracks:
- if next_items or track.item_id == start_item:
- next_items.append(track)
- else:
- prev_items.append(track)
- tracks = next_items + prev_items
-
except MusicAssistantError as err:
# invalid MA uri or item not found error
self.logger.warning("Skipping %s: %s", item, str(err))
"""Clear all items in the queue."""
queue = self._queues[queue_id]
queue.radio_source = []
- queue.stream_finished = None
- queue.end_of_track_reached = None
if queue.state != PlayerState.IDLE:
self.mass.create_task(self.stop(queue_id))
queue.current_index = None
"""
if (queue := self.get(queue_id)) and queue.active:
queue.resume_pos = queue.corrected_elapsed_time
- queue.stream_finished = None
- queue.end_of_track_reached = None
# forward the actual command to the player provider
if player_provider := self.mass.players.get_player_provider(queue.queue_id):
await player_provider.cmd_stop(queue_id)
raise FileNotFoundError(msg)
queue.current_index = index
queue.index_in_buffer = index
- queue.flow_mode_start_index = index
+ queue.flow_mode_stream_log = []
queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
next_index = self._get_next_index(queue_id, index, allow_repeat=False)
- queue.stream_finished = False
- queue.end_of_track_reached = False
-
queue.current_item = queue_item
self.signal_update(queue_id)
# update current item from player report
if queue.flow_mode:
# flow mode active, calculate current item
- queue.current_index, queue.elapsed_time = self.__get_queue_stream_index(queue, player)
+ queue.current_index, queue.elapsed_time = self._get_flow_queue_stream_index(
+ queue, player
+ )
queue.elapsed_time_last_updated = time.time()
else:
# queue is active and player has one of our tracks loaded, update state
# return early if nothing changed
if len(changed_keys) == 0:
return
- # check if we've reached the end of (the current) track
- if (
- queue.current_item
- and (duration := queue.current_item.duration)
- and (duration - queue.elapsed_time) < 10
- ):
- queue.end_of_track_reached = True
- elif prev_state["current_index"] != new_state["current_index"]:
- queue.end_of_track_reached = False
-
- # handle auto restart of queue in flow mode when repeat is enabled
- if (
- queue.flow_mode
- and queue.repeat_mode != RepeatMode.OFF
- and queue.stream_finished
- and prev_state["state"] == PlayerState.PLAYING
- and new_state["state"] == PlayerState.IDLE
- ):
- # flow mode and repeat mode is on, restart the queue
- next_index = self._get_next_index(queue_id, queue.current_index, allow_repeat=True)
- if next_index is not None:
- self.mass.create_task(self.play_index(queue_id, next_index))
# do not send full updates if only time was updated
if changed_keys == {"elapsed_time"}:
)
self._prev_states[queue_id] = new_state
return
- # handle player was playing and is now stopped
- # if player finished playing a track for 90%, mark current item as finished
- if (
- prev_state["state"] == "playing"
- and queue.state == PlayerState.IDLE
- and (
- queue.current_item
- and queue.current_item.duration
- and prev_state["elapsed_time"] > (queue.current_item.duration * 0.90)
- )
- ):
- queue.current_index += 1
- queue.current_item = None
- queue.next_item = None
+
# signal update and store state
self.signal_update(queue_id)
self._prev_states[queue_id] = new_state
+
+ # detect change in current index to report that a item has been played
+ end_of_queue_reached = (
+ prev_state["state"] == PlayerState.PLAYING
+ and new_state["state"] == PlayerState.IDLE
+ and queue.current_item is not None
+ and queue.next_item is None
+ )
+ if (
+ prev_state["current_index"] is not None
+ and (prev_state["current_index"] != new_state["current_index"] or end_of_queue_reached)
+ and (queue_item := self.get_item(queue_id, prev_state["current_index"]))
+ and (stream_details := queue_item.streamdetails)
+ ):
+ seconds_streamed = prev_state["elapsed_time"]
+ if music_prov := self.mass.get_provider(stream_details.provider):
+ if seconds_streamed > 10:
+ self.mass.create_task(music_prov.on_streamed(stream_details, seconds_streamed))
+ if queue_item.media_item and seconds_streamed > 10:
+ # signal 'media item played' event,
+ # which is useful for plugins that want to do scrobbling
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_PLAYED,
+ object_id=queue_item.media_item.uri,
+ data=round(seconds_streamed, 2),
+ )
+ if end_of_queue_reached:
+ # end of queue reached, clear items
+ self.mass.call_later(
+ 5, self._check_clear_queue, queue, task_id=f"clear_queue_{queue_id}"
+ )
+
# watch dynamic radio items refill if needed
- if "current_index" in changed_keys:
+ elif "current_index" in changed_keys:
if (
- queue.radio_source
- and queue.current_index
- and (queue.items - queue.current_index) < 5
- ):
- self.mass.create_task(self._fill_radio_tracks(queue_id))
- elif (
- # We have received the last item in the queue and Don't stop the music is enabled
queue.dont_stop_the_music_enabled
- and queue.current_index
+ and queue.enqueued_media_items
+ and queue.current_index is not None
and (queue.items - queue.current_index) <= 1
):
- self.mass.create_task(self._schedule_dont_stop_the_music(queue))
+ # We have received the last item in the queue and Don't stop the music is enabled
+ # set the played media item(s) as radio items (which will refill the queue)
+ # note that this will fail if there are no media items for which we have
+ # a dynamic radio source.
+ queue.radio_source = queue.enqueued_media_items
+ if (
+ queue.radio_source
+ and queue.current_index is not None
+ and (queue.items - queue.current_index) < 5
+ ):
+ task_id = f"fill_radio_tracks_{queue_id}"
+ self.mass.call_later(5, self._fill_radio_tracks(queue_id), task_id=task_id)
def on_player_remove(self, player_id: str) -> None:
"""Call when a player is removed from the registry."""
self._queues.pop(player_id, None)
self._queue_items.pop(player_id, None)
- async def preload_next_item(
+ async def load_next_item(
self,
queue_id: str,
current_item_id_or_index: str | int | None = None,
- allow_repeat: bool = True,
) -> QueueItem:
"""Call when a player wants to (pre)load the next item into the buffer.
idx = 0
while True:
next_item: QueueItem | None = None
- next_index = self._get_next_index(queue_id, cur_index + idx, allow_repeat=allow_repeat)
+ next_index = self._get_next_index(queue_id, cur_index + idx)
if next_index is None:
raise QueueEmpty("No more tracks left in the queue.")
queue_item = self.get_item(queue_id, next_index)
# No stream details found, skip this QueueItem
self.logger.debug("Skipping unplayable item: %s", next_item)
# we need to set a fake streamdetails object on the item
- # otherwise our flow mode logic will break that
+ # otherwise our flow mode logic will break which
# calculates where we are in the queue
+ playlog = queue_item.streamdetails.play_log if queue_item.streamdetails else []
+ playlog.append(0.0)
queue_item.streamdetails = StreamDetails(
provider=queue_item.media_item.provider if queue_item.media_item else "unknown",
item_id=queue_item.media_item.item_id if queue_item.media_item else "unknown",
media.image_url = self.mass.metadata.get_image_url(queue_item.image)
return media
+ async def get_artist_tracks(self, artist: Artist) -> list[Track]:
+ """Return tracks for given artist, based on user preference."""
+ artist_items_conf = self.mass.config.get_raw_core_config_value(
+ self.domain,
+ CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
+ ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
+ )
+ self.logger.debug(
+ "Fetching tracks to play for artist %s",
+ artist.name,
+ )
+ if artist_items_conf in ("library_tracks", "all_tracks"):
+ all_items = await self.mass.music.artists.tracks(
+ artist.item_id,
+ artist.provider,
+ in_library_only=artist_items_conf == "library_tracks",
+ )
+ random.shuffle(all_items)
+ return all_items
+
+ if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
+ all_items: list[Track] = []
+ for library_album in await self.mass.music.artists.albums(
+ artist.item_id,
+ artist.provider,
+ in_library_only=artist_items_conf == "library_album_tracks",
+ ):
+ for album_track in await self.mass.music.albums.tracks(
+ library_album.item_id, library_album.provider
+ ):
+ if album_track not in all_items:
+ all_items.append(album_track)
+ random.shuffle(all_items)
+ return all_items
+
+ return []
+
+ async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
+ """Return tracks for given album, based on user preference."""
+ album_items_conf = self.mass.config.get_raw_core_config_value(
+ self.domain,
+ CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
+ ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
+ )
+ result: list[Track] = []
+ start_item_found = False
+ self.logger.debug(
+ "Fetching tracks to play for album %s",
+ album.name,
+ )
+ for album_track in await self.mass.music.albums.tracks(
+ item_id=album.item_id,
+ provider_instance_id_or_domain=album.provider,
+ in_library_only=album_items_conf == "library_tracks",
+ ):
+ if not album_track.available:
+ continue
+ if start_item in (album_track.item_id, album_track.uri):
+ start_item_found = True
+ if start_item is not None and not start_item_found:
+ continue
+ result.append(album_track)
+ return result
+
+ async def get_playlist_tracks(self, playlist: Playlist, start_item: str | None) -> list[Track]:
+ """Return tracks for given playlist, based on user preference."""
+ result: list[Track] = []
+ start_item_found = False
+ self.logger.debug(
+ "Fetching tracks to play for playlist %s",
+ playlist.name,
+ )
+ # TODO: Handle other sort options etc.
+ async for playlist_track in self.mass.music.playlists.tracks(
+ playlist.item_id, playlist.provider
+ ):
+ if not playlist_track.available:
+ continue
+ if start_item in (playlist_track.item_id, playlist_track.uri):
+ start_item_found = True
+ if start_item is not None and not start_item_found:
+ continue
+ result.append(playlist_track)
+ return result
+
def _get_next_index(
self, queue_id: str, cur_index: int | None, is_skip: bool = False, allow_repeat: bool = True
) -> int | None:
async def _fill_radio_tracks(self, queue_id: str) -> None:
"""Fill a Queue with (additional) Radio tracks."""
- # we need to debounce, if we're called twice within a short timeframe
- debounce_key = f"fill_radio_{queue_id}"
- if getattr(self, debounce_key, None):
- return
- setattr(self, debounce_key, True)
tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
# fill queue - filter out unavailable items
queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
queue_items,
insert_at_index=len(self._queue_items[queue_id]) + 1,
)
- await asyncio.sleep(5)
- setattr(self, debounce_key, None)
async def _enqueue_next(self, queue: PlayerQueue, current_index: int | str) -> None:
"""Enqueue the next item in the queue."""
+ if queue.flow_mode:
+ return
if isinstance(current_index, str):
current_index = self.index_by_id(queue.queue_id, current_index)
with suppress(QueueEmpty):
- next_item = await self.preload_next_item(queue.queue_id, current_index)
+ next_item = await self.load_next_item(queue.queue_id, current_index)
await self.mass.players.enqueue_next_media(
player_id=queue.queue_id,
media=self.player_media_from_queue_item(next_item, queue.flow_mode),
) -> list[Track]:
"""Call the registered music providers for dynamic tracks."""
queue = self._queues[queue_id]
- assert queue.radio_source, "No Radio item(s) loaded/active!"
+ if not queue.radio_source:
+ # this may happen during race conditions as this method is called delayed
+ return None
available_base_tracks: list[Track] = []
base_track_sample_size = 5
# Grab all the available base tracks based on the selected source items.
# shuffle the source items, just in case
for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
ctrl = self.mass.music.get_controller(radio_item.media_type)
- available_base_tracks += [
- track
- for track in await ctrl.dynamic_base_tracks(radio_item.item_id, radio_item.provider)
- # Avoid duplicate base tracks
- if track not in available_base_tracks
- ]
+ try:
+ available_base_tracks += [
+ track
+ for track in await ctrl.dynamic_base_tracks(
+ radio_item.item_id, radio_item.provider
+ )
+ # Avoid duplicate base tracks
+ if track not in available_base_tracks
+ ]
+ except UnsupportedFeaturedException:
+ self.logger.debug(
+ "Skip loading radio items for %s: - "
+ "Provider %s does not support dynamic (base) tracks",
+ radio_item.uri,
+ radio_item.provider,
+ )
# Sample tracks from the base tracks, which will be used to calculate the dynamic ones
base_tracks = random.sample(
available_base_tracks, min(base_track_sample_size, len(available_base_tracks))
)
return queue_tracks
- async def get_artist_tracks(self, artist: Artist) -> list[Track]:
- """Return tracks for given artist, based on user preference."""
- artist_items_conf = self.mass.config.get_raw_core_config_value(
- self.domain,
- CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
- ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
- )
- self.logger.debug(
- "Fetching tracks to play for artist %s",
- artist.name,
- )
- if artist_items_conf in ("library_tracks", "all_tracks"):
- all_items = await self.mass.music.artists.tracks(
- artist.item_id,
- artist.provider,
- in_library_only=artist_items_conf == "library_tracks",
- )
- random.shuffle(all_items)
- return all_items
-
- if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
- all_items: list[Track] = []
- for library_album in await self.mass.music.artists.albums(
- artist.item_id,
- artist.provider,
- in_library_only=artist_items_conf == "library_album_tracks",
- ):
- for album_track in await self.mass.music.albums.tracks(
- library_album.item_id, library_album.provider
- ):
- if album_track not in all_items:
- all_items.append(album_track)
- random.shuffle(all_items)
- return all_items
-
- return []
-
- async def get_album_tracks(self, album: Album) -> list[Track]:
- """Return tracks for given album, based on user preference."""
- album_items_conf = self.mass.config.get_raw_core_config_value(
- self.domain,
- CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
- ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
- )
- self.logger.debug(
- "Fetching tracks to play for album %s",
- album.name,
- )
- return await self.mass.music.albums.tracks(
- item_id=album.item_id,
- provider_instance_id_or_domain=album.provider,
- in_library_only=album_items_conf == "library_tracks",
- )
-
- def __get_queue_stream_index(self, queue: PlayerQueue, player: Player) -> tuple[int, int]:
- """Calculate current queue index and current track elapsed time."""
- # player is playing a constant stream so we need to do this the hard way
- queue_index = 0
- elapsed_time_queue = player.corrected_elapsed_time or 0
- total_time = 0
+ def _check_clear_queue(self, queue: PlayerQueue) -> None:
+ """Check if the queue should be cleared after the current item."""
+ if queue.state != PlayerState.IDLE:
+ return
+ if queue.next_item is not None:
+ return
+ if queue.current_index >= len(self._queue_items[queue.queue_id]) - 1:
+ self.logger.info("End of queue reached, clearing items")
+ self.clear(queue.queue_id)
+
+ def _get_flow_queue_stream_index(
+ self, queue: PlayerQueue, player: Player
+ ) -> tuple[int | None, int]:
+ """Calculate current queue index and current track elapsed time when flow mode is active."""
+ elapsed_time_queue_total = player.corrected_elapsed_time or 0
+ if queue.current_index is None:
+ return None, elapsed_time_queue_total
+
+ # For each track that has been streamed/buffered to the player,
+ # a playlog entry will be created with the queue item id
+ # and the amount of seconds streamed. We traverse the playlog to figure
+ # out where we are in the queue, accounting for actual streamed
+ # seconds (and not duration) and skipped seconds. If a track has been repeated,
+ # it will simply be in the playlog multiple times.
+ played_time = 0
+ queue_index = queue.current_index or 0
track_time = 0
- queue_items = self._queue_items[queue.queue_id]
- if queue_items and len(queue_items) > queue.flow_mode_start_index:
- # start_index: holds the position from which the flow stream started
- queue_index = queue.flow_mode_start_index
- queue_track = None
- while len(queue_items) > queue_index:
- # keep enumerating the queue tracks to find current track
- # starting from the start index
- queue_track = queue_items[queue_index]
- if not queue_track.streamdetails:
- track_time = elapsed_time_queue - total_time
- break
- track_duration = (
- # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
- queue_track.streamdetails.seconds_streamed
- if queue_track.streamdetails.seconds_streamed is not None
- else (
- queue_track.streamdetails.duration
- or queue_track.duration
- or FALLBACK_DURATION
- )
- )
- if elapsed_time_queue > (track_duration + total_time):
- # total elapsed time is more than (streamed) track duration
- # move index one up
- total_time += track_duration
- queue_index += 1
+ for play_log_entry in queue.flow_mode_stream_log:
+ queue_item_duration = (
+ # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
+ play_log_entry.seconds_streamed
+ if play_log_entry.seconds_streamed is not None
+ else play_log_entry.duration
+ )
+ if elapsed_time_queue_total > (queue_item_duration + played_time):
+ # total elapsed time is more than (streamed) track duration
+ # this track has been fully played, move in.
+ played_time += queue_item_duration
+ else:
+ # no more seconds left to divide, this is our track
+ # account for any seeking by adding the skipped/seeked seconds
+ queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
+ queue_item = self.get_item(queue.queue_id, queue_index)
+ if queue_item and queue_item.streamdetails:
+ track_sec_skipped = queue_item.streamdetails.seek_position
else:
- # no more seconds left to divide, this is our track
- # account for any seeking by adding the skipped/seeked seconds
- track_sec_skipped = queue_track.streamdetails.seek_position
- track_time = elapsed_time_queue + track_sec_skipped - total_time
- break
+ track_sec_skipped = 0
+ track_time = elapsed_time_queue_total + track_sec_skipped - played_time
+ break
+
return queue_index, track_time
def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
if self.get_item(queue_id, current_item_id):
return current_item_id
return None
-
- async def _schedule_dont_stop_the_music(self, queue: PlayerQueue):
- """Auto turn on Radio Mode based on enqueued Media Items."""
- queue.radio_source = [
- await self.mass.music.get_item_by_uri(uri) for uri in queue.enqueued_media_items
- ]
- await self._fill_radio_tracks(queue.queue_id)