From: Marcel van der Veldt Date: Wed, 23 Oct 2024 21:34:36 +0000 (+0200) Subject: Several improvements to the queue controller (#1744) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=603c8a608411f602497a773770f914772693a5b9;p=music-assistant-server.git Several improvements to the queue controller (#1744) --- diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index 8a22c039..ecfef45b 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -281,7 +281,7 @@ class PlayerFeature(StrEnum): sync: The player supports syncing with other players (of the same platform). accurate_time: The player provides millisecond accurate timing information. seek: The player supports seeking to a specific. - queue: The player supports (en)queuing of media items natively. + enqueue: The player supports (en)queuing of media items natively. """ POWER = "power" @@ -292,6 +292,7 @@ class PlayerFeature(StrEnum): SEEK = "seek" NEXT_PREVIOUS = "next_previous" PLAY_ANNOUNCEMENT = "play_announcement" + ENQUEUE = "enqueue" UNKNOWN = "unknown" @classmethod @@ -311,6 +312,7 @@ class EventType(StrEnum): QUEUE_UPDATED = "queue_updated" QUEUE_ITEMS_UPDATED = "queue_items_updated" QUEUE_TIME_UPDATED = "queue_time_updated" + MEDIA_ITEM_PLAYED = "media_item_played" SHUTDOWN = "application_shutdown" MEDIA_ITEM_ADDED = "media_item_added" MEDIA_ITEM_UPDATED = "media_item_updated" diff --git a/music_assistant/common/models/player_queue.py b/music_assistant/common/models/player_queue.py index 836b96d6..a42b132f 100644 --- a/music_assistant/common/models/player_queue.py +++ b/music_assistant/common/models/player_queue.py @@ -9,11 +9,21 @@ from typing import Any, Self from mashumaro import DataClassDictMixin from music_assistant.common.models.media_items import MediaItemType +from music_assistant.constants import FALLBACK_DURATION from .enums import PlayerState, RepeatMode from .queue_item import QueueItem +@dataclass +class PlayLogEntry: + """Representation of a PlayLogEntry within Music Assistant.""" + + queue_item_id: str + duration: int = FALLBACK_DURATION + seconds_streamed: float | None = None + + @dataclass class PlayerQueue(DataClassDictMixin): """Representation of a PlayerQueue within Music Assistant.""" @@ -37,14 +47,10 @@ class PlayerQueue(DataClassDictMixin): current_item: QueueItem | None = None next_item: QueueItem | None = None radio_source: list[MediaItemType] = field(default_factory=list) - # Use a list of media item uri's here to avoid having to store full MediaItem objects - enqueued_media_items: list[str] = field(default_factory=list) + enqueued_media_items: list[MediaItemType] = field(default_factory=list) flow_mode: bool = False resume_pos: int = 0 - # flow_mode_start_index: index of the first item of the flow stream - flow_mode_start_index: int = 0 - stream_finished: bool | None = None - end_of_track_reached: bool | None = None + flow_mode_stream_log: list[PlayLogEntry] = field(default_factory=list) @property def corrected_elapsed_time(self) -> float: @@ -58,7 +64,8 @@ class PlayerQueue(DataClassDictMixin): d.pop("next_item", None) d.pop("index_in_buffer", None) d.pop("flow_mode", None) - d.pop("flow_mode_start_index", None) + d.pop("flow_mode_stream_log", None) + d.pop("enqueued_media_items", None) return d @classmethod @@ -68,5 +75,6 @@ class PlayerQueue(DataClassDictMixin): d.pop("next_item", None) d.pop("index_in_buffer", None) d.pop("flow_mode", None) - d.pop("flow_mode_start_index", None) + d.pop("flow_mode_stream_log", None) + d.pop("enqueued_media_items", None) return cls.from_dict(d) diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 7364cef4..d838d721 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -41,18 +41,19 @@ from music_assistant.common.models.errors import ( MusicAssistantError, PlayerUnavailableError, QueueEmpty, + UnsupportedFeaturedException, +) +from music_assistant.common.models.media_items import ( + AudioFormat, + MediaItemType, + Playlist, + media_from_dict, ) -from music_assistant.common.models.media_items import AudioFormat, MediaItemType, media_from_dict from music_assistant.common.models.player import PlayerMedia from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.common.models.queue_item import QueueItem from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.constants import ( - CONF_CROSSFADE, - CONF_FLOW_MODE, - FALLBACK_DURATION, - MASS_LOGO_ONLINE, -) +from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, MASS_LOGO_ONLINE from music_assistant.server.helpers.api import api_command from music_assistant.server.helpers.audio import get_stream_details from music_assistant.server.helpers.throttle_retry import BYPASS_THROTTLER @@ -295,22 +296,10 @@ class PlayerQueuesController(CoreController): if queue.repeat_mode == repeat_mode: return # no change queue.repeat_mode = repeat_mode - # ensure that we restart playback or trigger enqueue next if repeat mode changed self.signal_update(queue_id) - if ( - repeat_mode == RepeatMode.ONE - and queue.flow_mode - and queue.state == PlayerState.PLAYING - and queue.current_index != queue.index_in_buffer - ): - # edge case; repeat one enabled in flow mode but the - # flow stream had already loaded a new item in the buffer, - # we need to restart playback - self.mass.create_task(self.resume(queue_id)) - else: - task_id = f"enqueue_next_{queue_id}" - self.logger.info("Repeat mode detected, enqueue next item") - self.mass.call_later(2, self._enqueue_next, queue, queue.current_index, task_id=task_id) + # ensure that we trigger enqueue next if repeat mode changed (if needed/supported) + task_id = f"enqueue_next_{queue_id}" + self.mass.call_later(5, self._enqueue_next, queue, queue.current_index, task_id=task_id) @api_command("player_queues/play_media") async def play_media( @@ -351,9 +340,9 @@ class PlayerQueuesController(CoreController): # clear queue if needed if option == QueueOption.REPLACE: self.clear(queue_id) - # Clear the 'played media item' list when a new queue is requested + # Clear the 'enqueued media item' list when a new queue is requested if option not in (QueueOption.ADD, QueueOption.NEXT): - queue.enqueued_media_items = [] + queue.enqueued_media_items.clear() tracks: list[MediaItemType] = [] radio_source: list[MediaItemType] = [] @@ -370,9 +359,15 @@ class PlayerQueuesController(CoreController): # Save requested media item to play on the queue so we can use it as a source # for Don't stop the music. Use FIFO list to keep track of the last 10 played items - queue.enqueued_media_items.append(media_item.uri) - if len(queue.enqueued_media_items) > 10: - queue.enqueued_media_items.pop(0) + if media_item.media_type in ( + MediaType.TRACK, + MediaType.ALBUM, + MediaType.PLAYLIST, + MediaType.ARTIST, + ): + queue.enqueued_media_items.append(media_item) + if len(queue.enqueued_media_items) > 10: + queue.enqueued_media_items.pop(0) # handle default enqueue option if needed if option is None: @@ -389,31 +384,7 @@ class PlayerQueuesController(CoreController): if radio_mode: radio_source.append(media_item) elif media_item.media_type == MediaType.PLAYLIST: - async for playlist_track in self.mass.music.playlists.tracks( - media_item.item_id, media_item.provider - ): - if not playlist_track.available: - continue - # allow first track to start playing immediately while we still - # work out the rest of the queue - if ( - not queue.shuffle_enabled - and not first_track_seen - and option == QueueOption.REPLACE - and not start_item - ): - first_track_seen = True - self.load( - queue_id, - queue_items=[QueueItem.from_media_item(queue_id, playlist_track)], - keep_remaining=False, - keep_played=False, - ) - await self.play_index(queue_id, 0) - # add the remaining items - option = QueueOption.ADD - else: - tracks.append(playlist_track) + tracks += await self.get_playlist_tracks(media_item, start_item) self.mass.create_task( self.mass.music.mark_item_played( media_item.media_type, media_item.item_id, media_item.provider @@ -427,7 +398,7 @@ class PlayerQueuesController(CoreController): ) ) elif media_item.media_type == MediaType.ALBUM: - tracks += await self.get_album_tracks(media_item) + tracks += await self.get_album_tracks(media_item, start_item) self.mass.create_task( self.mass.music.mark_item_played( media_item.media_type, media_item.item_id, media_item.provider @@ -437,17 +408,6 @@ class PlayerQueuesController(CoreController): # single track or radio item tracks += [media_item] - # handle optional start item (play playlist/album from here feature) - if start_item is not None: - prev_items = [] - next_items = [] - for track in tracks: - if next_items or track.item_id == start_item: - next_items.append(track) - else: - prev_items.append(track) - tracks = next_items + prev_items - except MusicAssistantError as err: # invalid MA uri or item not found error self.logger.warning("Skipping %s: %s", item, str(err)) @@ -591,8 +551,6 @@ class PlayerQueuesController(CoreController): """Clear all items in the queue.""" queue = self._queues[queue_id] queue.radio_source = [] - queue.stream_finished = None - queue.end_of_track_reached = None if queue.state != PlayerState.IDLE: self.mass.create_task(self.stop(queue_id)) queue.current_index = None @@ -610,8 +568,6 @@ class PlayerQueuesController(CoreController): """ if (queue := self.get(queue_id)) and queue.active: queue.resume_pos = queue.corrected_elapsed_time - queue.stream_finished = None - queue.end_of_track_reached = None # forward the actual command to the player provider if player_provider := self.mass.players.get_player_provider(queue.queue_id): await player_provider.cmd_stop(queue_id) @@ -784,12 +740,9 @@ class PlayerQueuesController(CoreController): raise FileNotFoundError(msg) queue.current_index = index queue.index_in_buffer = index - queue.flow_mode_start_index = index + queue.flow_mode_stream_log = [] queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE) next_index = self._get_next_index(queue_id, index, allow_repeat=False) - queue.stream_finished = False - queue.end_of_track_reached = False - queue.current_item = queue_item self.signal_update(queue_id) @@ -968,7 +921,9 @@ class PlayerQueuesController(CoreController): # update current item from player report if queue.flow_mode: # flow mode active, calculate current item - queue.current_index, queue.elapsed_time = self.__get_queue_stream_index(queue, player) + queue.current_index, queue.elapsed_time = self._get_flow_queue_stream_index( + queue, player + ) queue.elapsed_time_last_updated = time.time() else: # queue is active and player has one of our tracks loaded, update state @@ -1021,28 +976,6 @@ class PlayerQueuesController(CoreController): # return early if nothing changed if len(changed_keys) == 0: return - # check if we've reached the end of (the current) track - if ( - queue.current_item - and (duration := queue.current_item.duration) - and (duration - queue.elapsed_time) < 10 - ): - queue.end_of_track_reached = True - elif prev_state["current_index"] != new_state["current_index"]: - queue.end_of_track_reached = False - - # handle auto restart of queue in flow mode when repeat is enabled - if ( - queue.flow_mode - and queue.repeat_mode != RepeatMode.OFF - and queue.stream_finished - and prev_state["state"] == PlayerState.PLAYING - and new_state["state"] == PlayerState.IDLE - ): - # flow mode and repeat mode is on, restart the queue - next_index = self._get_next_index(queue_id, queue.current_index, allow_repeat=True) - if next_index is not None: - self.mass.create_task(self.play_index(queue_id, next_index)) # do not send full updates if only time was updated if changed_keys == {"elapsed_time"}: @@ -1053,38 +986,62 @@ class PlayerQueuesController(CoreController): ) self._prev_states[queue_id] = new_state return - # handle player was playing and is now stopped - # if player finished playing a track for 90%, mark current item as finished - if ( - prev_state["state"] == "playing" - and queue.state == PlayerState.IDLE - and ( - queue.current_item - and queue.current_item.duration - and prev_state["elapsed_time"] > (queue.current_item.duration * 0.90) - ) - ): - queue.current_index += 1 - queue.current_item = None - queue.next_item = None + # signal update and store state self.signal_update(queue_id) self._prev_states[queue_id] = new_state + + # detect change in current index to report that a item has been played + end_of_queue_reached = ( + prev_state["state"] == PlayerState.PLAYING + and new_state["state"] == PlayerState.IDLE + and queue.current_item is not None + and queue.next_item is None + ) + if ( + prev_state["current_index"] is not None + and (prev_state["current_index"] != new_state["current_index"] or end_of_queue_reached) + and (queue_item := self.get_item(queue_id, prev_state["current_index"])) + and (stream_details := queue_item.streamdetails) + ): + seconds_streamed = prev_state["elapsed_time"] + if music_prov := self.mass.get_provider(stream_details.provider): + if seconds_streamed > 10: + self.mass.create_task(music_prov.on_streamed(stream_details, seconds_streamed)) + if queue_item.media_item and seconds_streamed > 10: + # signal 'media item played' event, + # which is useful for plugins that want to do scrobbling + self.mass.signal_event( + EventType.MEDIA_ITEM_PLAYED, + object_id=queue_item.media_item.uri, + data=round(seconds_streamed, 2), + ) + if end_of_queue_reached: + # end of queue reached, clear items + self.mass.call_later( + 5, self._check_clear_queue, queue, task_id=f"clear_queue_{queue_id}" + ) + # watch dynamic radio items refill if needed - if "current_index" in changed_keys: + elif "current_index" in changed_keys: if ( - queue.radio_source - and queue.current_index - and (queue.items - queue.current_index) < 5 - ): - self.mass.create_task(self._fill_radio_tracks(queue_id)) - elif ( - # We have received the last item in the queue and Don't stop the music is enabled queue.dont_stop_the_music_enabled - and queue.current_index + and queue.enqueued_media_items + and queue.current_index is not None and (queue.items - queue.current_index) <= 1 ): - self.mass.create_task(self._schedule_dont_stop_the_music(queue)) + # We have received the last item in the queue and Don't stop the music is enabled + # set the played media item(s) as radio items (which will refill the queue) + # note that this will fail if there are no media items for which we have + # a dynamic radio source. + queue.radio_source = queue.enqueued_media_items + if ( + queue.radio_source + and queue.current_index is not None + and (queue.items - queue.current_index) < 5 + ): + task_id = f"fill_radio_tracks_{queue_id}" + self.mass.call_later(5, self._fill_radio_tracks(queue_id), task_id=task_id) def on_player_remove(self, player_id: str) -> None: """Call when a player is removed from the registry.""" @@ -1093,11 +1050,10 @@ class PlayerQueuesController(CoreController): self._queues.pop(player_id, None) self._queue_items.pop(player_id, None) - async def preload_next_item( + async def load_next_item( self, queue_id: str, current_item_id_or_index: str | int | None = None, - allow_repeat: bool = True, ) -> QueueItem: """Call when a player wants to (pre)load the next item into the buffer. @@ -1116,7 +1072,7 @@ class PlayerQueuesController(CoreController): idx = 0 while True: next_item: QueueItem | None = None - next_index = self._get_next_index(queue_id, cur_index + idx, allow_repeat=allow_repeat) + next_index = self._get_next_index(queue_id, cur_index + idx) if next_index is None: raise QueueEmpty("No more tracks left in the queue.") queue_item = self.get_item(queue_id, next_index) @@ -1165,8 +1121,10 @@ class PlayerQueuesController(CoreController): # No stream details found, skip this QueueItem self.logger.debug("Skipping unplayable item: %s", next_item) # we need to set a fake streamdetails object on the item - # otherwise our flow mode logic will break that + # otherwise our flow mode logic will break which # calculates where we are in the queue + playlog = queue_item.streamdetails.play_log if queue_item.streamdetails else [] + playlog.append(0.0) queue_item.streamdetails = StreamDetails( provider=queue_item.media_item.provider if queue_item.media_item else "unknown", item_id=queue_item.media_item.item_id if queue_item.media_item else "unknown", @@ -1303,6 +1261,91 @@ class PlayerQueuesController(CoreController): media.image_url = self.mass.metadata.get_image_url(queue_item.image) return media + async def get_artist_tracks(self, artist: Artist) -> list[Track]: + """Return tracks for given artist, based on user preference.""" + artist_items_conf = self.mass.config.get_raw_core_config_value( + self.domain, + CONF_DEFAULT_ENQUEUE_SELECT_ARTIST, + ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE, + ) + self.logger.debug( + "Fetching tracks to play for artist %s", + artist.name, + ) + if artist_items_conf in ("library_tracks", "all_tracks"): + all_items = await self.mass.music.artists.tracks( + artist.item_id, + artist.provider, + in_library_only=artist_items_conf == "library_tracks", + ) + random.shuffle(all_items) + return all_items + + if artist_items_conf in ("library_album_tracks", "all_album_tracks"): + all_items: list[Track] = [] + for library_album in await self.mass.music.artists.albums( + artist.item_id, + artist.provider, + in_library_only=artist_items_conf == "library_album_tracks", + ): + for album_track in await self.mass.music.albums.tracks( + library_album.item_id, library_album.provider + ): + if album_track not in all_items: + all_items.append(album_track) + random.shuffle(all_items) + return all_items + + return [] + + async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]: + """Return tracks for given album, based on user preference.""" + album_items_conf = self.mass.config.get_raw_core_config_value( + self.domain, + CONF_DEFAULT_ENQUEUE_SELECT_ALBUM, + ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE, + ) + result: list[Track] = [] + start_item_found = False + self.logger.debug( + "Fetching tracks to play for album %s", + album.name, + ) + for album_track in await self.mass.music.albums.tracks( + item_id=album.item_id, + provider_instance_id_or_domain=album.provider, + in_library_only=album_items_conf == "library_tracks", + ): + if not album_track.available: + continue + if start_item in (album_track.item_id, album_track.uri): + start_item_found = True + if start_item is not None and not start_item_found: + continue + result.append(album_track) + return result + + async def get_playlist_tracks(self, playlist: Playlist, start_item: str | None) -> list[Track]: + """Return tracks for given playlist, based on user preference.""" + result: list[Track] = [] + start_item_found = False + self.logger.debug( + "Fetching tracks to play for playlist %s", + playlist.name, + ) + # TODO: Handle other sort options etc. + async for playlist_track in self.mass.music.playlists.tracks( + playlist.item_id, playlist.provider + ): + if not playlist_track.available: + continue + if start_item in (playlist_track.item_id, playlist_track.uri): + start_item_found = True + if start_item is not None and not start_item_found: + continue + result.append(playlist_track) + return result + def _get_next_index( self, queue_id: str, cur_index: int | None, is_skip: bool = False, allow_repeat: bool = True ) -> int | None: @@ -1336,11 +1379,6 @@ class PlayerQueuesController(CoreController): async def _fill_radio_tracks(self, queue_id: str) -> None: """Fill a Queue with (additional) Radio tracks.""" - # we need to debounce, if we're called twice within a short timeframe - debounce_key = f"fill_radio_{queue_id}" - if getattr(self, debounce_key, None): - return - setattr(self, debounce_key, True) tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False) # fill queue - filter out unavailable items queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available] @@ -1349,15 +1387,15 @@ class PlayerQueuesController(CoreController): queue_items, insert_at_index=len(self._queue_items[queue_id]) + 1, ) - await asyncio.sleep(5) - setattr(self, debounce_key, None) async def _enqueue_next(self, queue: PlayerQueue, current_index: int | str) -> None: """Enqueue the next item in the queue.""" + if queue.flow_mode: + return if isinstance(current_index, str): current_index = self.index_by_id(queue.queue_id, current_index) with suppress(QueueEmpty): - next_item = await self.preload_next_item(queue.queue_id, current_index) + next_item = await self.load_next_item(queue.queue_id, current_index) await self.mass.players.enqueue_next_media( player_id=queue.queue_id, media=self.player_media_from_queue_item(next_item, queue.flow_mode), @@ -1368,19 +1406,31 @@ class PlayerQueuesController(CoreController): ) -> list[Track]: """Call the registered music providers for dynamic tracks.""" queue = self._queues[queue_id] - assert queue.radio_source, "No Radio item(s) loaded/active!" + if not queue.radio_source: + # this may happen during race conditions as this method is called delayed + return None available_base_tracks: list[Track] = [] base_track_sample_size = 5 # Grab all the available base tracks based on the selected source items. # shuffle the source items, just in case for radio_item in random.sample(queue.radio_source, len(queue.radio_source)): ctrl = self.mass.music.get_controller(radio_item.media_type) - available_base_tracks += [ - track - for track in await ctrl.dynamic_base_tracks(radio_item.item_id, radio_item.provider) - # Avoid duplicate base tracks - if track not in available_base_tracks - ] + try: + available_base_tracks += [ + track + for track in await ctrl.dynamic_base_tracks( + radio_item.item_id, radio_item.provider + ) + # Avoid duplicate base tracks + if track not in available_base_tracks + ] + except UnsupportedFeaturedException: + self.logger.debug( + "Skip loading radio items for %s: - " + "Provider %s does not support dynamic (base) tracks", + radio_item.uri, + radio_item.provider, + ) # Sample tracks from the base tracks, which will be used to calculate the dynamic ones base_tracks = random.sample( available_base_tracks, min(base_track_sample_size, len(available_base_tracks)) @@ -1418,100 +1468,56 @@ class PlayerQueuesController(CoreController): ) return queue_tracks - async def get_artist_tracks(self, artist: Artist) -> list[Track]: - """Return tracks for given artist, based on user preference.""" - artist_items_conf = self.mass.config.get_raw_core_config_value( - self.domain, - CONF_DEFAULT_ENQUEUE_SELECT_ARTIST, - ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE, - ) - self.logger.debug( - "Fetching tracks to play for artist %s", - artist.name, - ) - if artist_items_conf in ("library_tracks", "all_tracks"): - all_items = await self.mass.music.artists.tracks( - artist.item_id, - artist.provider, - in_library_only=artist_items_conf == "library_tracks", - ) - random.shuffle(all_items) - return all_items - - if artist_items_conf in ("library_album_tracks", "all_album_tracks"): - all_items: list[Track] = [] - for library_album in await self.mass.music.artists.albums( - artist.item_id, - artist.provider, - in_library_only=artist_items_conf == "library_album_tracks", - ): - for album_track in await self.mass.music.albums.tracks( - library_album.item_id, library_album.provider - ): - if album_track not in all_items: - all_items.append(album_track) - random.shuffle(all_items) - return all_items - - return [] - - async def get_album_tracks(self, album: Album) -> list[Track]: - """Return tracks for given album, based on user preference.""" - album_items_conf = self.mass.config.get_raw_core_config_value( - self.domain, - CONF_DEFAULT_ENQUEUE_SELECT_ALBUM, - ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE, - ) - self.logger.debug( - "Fetching tracks to play for album %s", - album.name, - ) - return await self.mass.music.albums.tracks( - item_id=album.item_id, - provider_instance_id_or_domain=album.provider, - in_library_only=album_items_conf == "library_tracks", - ) - - def __get_queue_stream_index(self, queue: PlayerQueue, player: Player) -> tuple[int, int]: - """Calculate current queue index and current track elapsed time.""" - # player is playing a constant stream so we need to do this the hard way - queue_index = 0 - elapsed_time_queue = player.corrected_elapsed_time or 0 - total_time = 0 + def _check_clear_queue(self, queue: PlayerQueue) -> None: + """Check if the queue should be cleared after the current item.""" + if queue.state != PlayerState.IDLE: + return + if queue.next_item is not None: + return + if queue.current_index >= len(self._queue_items[queue.queue_id]) - 1: + self.logger.info("End of queue reached, clearing items") + self.clear(queue.queue_id) + + def _get_flow_queue_stream_index( + self, queue: PlayerQueue, player: Player + ) -> tuple[int | None, int]: + """Calculate current queue index and current track elapsed time when flow mode is active.""" + elapsed_time_queue_total = player.corrected_elapsed_time or 0 + if queue.current_index is None: + return None, elapsed_time_queue_total + + # For each track that has been streamed/buffered to the player, + # a playlog entry will be created with the queue item id + # and the amount of seconds streamed. We traverse the playlog to figure + # out where we are in the queue, accounting for actual streamed + # seconds (and not duration) and skipped seconds. If a track has been repeated, + # it will simply be in the playlog multiple times. + played_time = 0 + queue_index = queue.current_index or 0 track_time = 0 - queue_items = self._queue_items[queue.queue_id] - if queue_items and len(queue_items) > queue.flow_mode_start_index: - # start_index: holds the position from which the flow stream started - queue_index = queue.flow_mode_start_index - queue_track = None - while len(queue_items) > queue_index: - # keep enumerating the queue tracks to find current track - # starting from the start index - queue_track = queue_items[queue_index] - if not queue_track.streamdetails: - track_time = elapsed_time_queue - total_time - break - track_duration = ( - # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error! - queue_track.streamdetails.seconds_streamed - if queue_track.streamdetails.seconds_streamed is not None - else ( - queue_track.streamdetails.duration - or queue_track.duration - or FALLBACK_DURATION - ) - ) - if elapsed_time_queue > (track_duration + total_time): - # total elapsed time is more than (streamed) track duration - # move index one up - total_time += track_duration - queue_index += 1 + for play_log_entry in queue.flow_mode_stream_log: + queue_item_duration = ( + # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error! + play_log_entry.seconds_streamed + if play_log_entry.seconds_streamed is not None + else play_log_entry.duration + ) + if elapsed_time_queue_total > (queue_item_duration + played_time): + # total elapsed time is more than (streamed) track duration + # this track has been fully played, move in. + played_time += queue_item_duration + else: + # no more seconds left to divide, this is our track + # account for any seeking by adding the skipped/seeked seconds + queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id) + queue_item = self.get_item(queue.queue_id, queue_index) + if queue_item and queue_item.streamdetails: + track_sec_skipped = queue_item.streamdetails.seek_position else: - # no more seconds left to divide, this is our track - # account for any seeking by adding the skipped/seeked seconds - track_sec_skipped = queue_track.streamdetails.seek_position - track_time = elapsed_time_queue + track_sec_skipped - total_time - break + track_sec_skipped = 0 + track_time = elapsed_time_queue_total + track_sec_skipped - played_time + break + return queue_index, track_time def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None: @@ -1530,10 +1536,3 @@ class PlayerQueuesController(CoreController): if self.get_item(queue_id, current_item_id): return current_item_id return None - - async def _schedule_dont_stop_the_music(self, queue: PlayerQueue): - """Auto turn on Radio Mode based on enqueued Media Items.""" - queue.radio_source = [ - await self.mass.music.get_item_by_uri(uri) for uri in queue.enqueued_media_items - ] - await self._fill_radio_tracks(queue.queue_id) diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 64c6ae77..f796b991 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -555,9 +555,10 @@ class PlayerController(CoreController): async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: """Handle enqueuing of a next media item on the player.""" - player_prov = self.get_player_provider(player_id) - async with self._player_throttlers[player_id]: - await player_prov.enqueue_next_media(player_id=player_id, media=media) + if (player := self.get(player_id)) and PlayerFeature.ENQUEUE in player.supported_features: + player_prov = self.mass.get_provider(player.provider) + async with self._player_throttlers[player_id]: + await player_prov.enqueue_next_media(player_id=player_id, media=media) @api_command("players/cmd/sync") @handle_player_command diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 8b4dd9b3..0b464bca 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -33,6 +33,7 @@ from music_assistant.common.models.enums import ( ) from music_assistant.common.models.errors import QueueEmpty from music_assistant.common.models.media_items import AudioFormat +from music_assistant.common.models.player_queue import PlayLogEntry from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import ( ANNOUNCE_ALERT_FILE, @@ -395,8 +396,6 @@ class StreamsController(CoreController): queue_item.uri, queue.display_name, ) - if queue.stream_finished is not None: - queue.stream_finished = True return resp async def serve_queue_flow_stream(self, request: web.Request) -> web.Response: @@ -620,10 +619,6 @@ class StreamsController(CoreController): queue_track = None last_fadeout_part = b"" queue.flow_mode = True - queue.stream_finished = False - queue.flow_mode_start_index = self.mass.player_queues.index_by_id( - queue.queue_id, start_queue_item.queue_item_id - ) use_crossfade = await self.mass.config.get_player_config_value( queue.queue_id, CONF_CROSSFADE ) @@ -648,9 +643,7 @@ class StreamsController(CoreController): queue_track = start_queue_item else: try: - queue_track = await self.mass.player_queues.preload_next_item( - queue.queue_id, allow_repeat=False - ) + queue_track = await self.mass.player_queues.load_next_item(queue.queue_id) except QueueEmpty: break @@ -668,6 +661,9 @@ class StreamsController(CoreController): self.mass.player_queues.track_loaded_in_buffer( queue.queue_id, queue_track.queue_item_id ) + # append to play log so the queue controller can work out which track is playing + play_log_entry = PlayLogEntry(queue_track.queue_item_id) + queue.flow_mode_stream_log.append(play_log_entry) # set some basic vars pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2) @@ -749,6 +745,8 @@ class StreamsController(CoreController): queue_track.streamdetails.duration = ( queue_track.streamdetails.seek_position + seconds_streamed ) + play_log_entry.seconds_streamed = seconds_streamed + play_log_entry.duration = queue_track.streamdetails.duration total_bytes_sent += bytes_written self.logger.debug( "Finished Streaming queue track: %s (%s) on queue %s", @@ -766,8 +764,6 @@ class StreamsController(CoreController): queue_track.streamdetails.duration += last_part_seconds del last_fadeout_part total_bytes_sent += bytes_written - if queue.stream_finished is not None: - queue.stream_finished = True self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) async def get_announcement_stream( diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index b37ca762..abf291f6 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -402,7 +402,9 @@ async def get_media_stream( task_id = f"analyze_loudness_{streamdetails.uri}" mass.create_task(analyze_loudness, mass, streamdetails, task_id=task_id) - # report playback + # mark item as played in db if finished or streamed for 30 seconds + # NOTE that this is not the actual played time but the buffered time + # the queue controller will update the actual played time when the item is played if finished or seconds_streamed > 30: mass.create_task( mass.music.mark_item_played( @@ -411,10 +413,6 @@ async def get_media_stream( streamdetails.provider, ) ) - # TODO: move this to the queue controller so we report - # actual playback time instead of buffered seconds - if music_prov := mass.get_provider(streamdetails.provider): - mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None): diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index 9dfbe776..51ba8229 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -111,7 +111,7 @@ class PlayerProvider(Provider): This will NOT be called if the end of the queue is reached (and repeat disabled). This will NOT be called if the player is using flow mode to playback the queue. """ - # will only be called for players with ENQUEUE NEXT feature set. + # will only be called for players with ENQUEUE feature set. raise NotImplementedError async def play_announcement( diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 032669ab..b118a5dc 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -388,6 +388,7 @@ class ChromecastProvider(PlayerProvider): PlayerFeature.VOLUME_SET, PlayerFeature.PAUSE, PlayerFeature.NEXT_PREVIOUS, + PlayerFeature.ENQUEUE, ), enabled_by_default=enabled_by_default, needs_poll=True, diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index 234f064b..f13f7003 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -60,10 +60,7 @@ if TYPE_CHECKING: from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType -BASE_PLAYER_FEATURES = ( - PlayerFeature.VOLUME_MUTE, - PlayerFeature.VOLUME_SET, -) +BASE_PLAYER_FEATURES = (PlayerFeature.VOLUME_MUTE, PlayerFeature.VOLUME_SET) PLAYER_CONFIG_ENTRIES = ( @@ -622,4 +619,11 @@ class DLNAPlayerProvider(PlayerProvider): def _set_player_features(self, dlna_player: DLNAPlayer) -> None: """Set Player Features based on config values and capabilities.""" - dlna_player.player.supported_features = BASE_PLAYER_FEATURES + if self.mass.config.get_raw_player_config_value( + dlna_player.udn, + CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED.key, + CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED.default_value, + ): + dlna_player.player.supported_features = BASE_PLAYER_FEATURES + else: + dlna_player.player.supported_features = (*BASE_PLAYER_FEATURES, PlayerFeature.ENQUEUE) diff --git a/music_assistant/server/providers/hass_players/__init__.py b/music_assistant/server/providers/hass_players/__init__.py index f70d7f0b..a1865447 100644 --- a/music_assistant/server/providers/hass_players/__init__.py +++ b/music_assistant/server/providers/hass_players/__init__.py @@ -381,6 +381,8 @@ class HomeAssistantPlayers(PlayerProvider): supported_features.append(PlayerFeature.VOLUME_SET) if MediaPlayerEntityFeature.VOLUME_MUTE in hass_supported_features: supported_features.append(PlayerFeature.VOLUME_MUTE) + if MediaPlayerEntityFeature.MEDIA_ENQUEUE in hass_supported_features: + supported_features.append(PlayerFeature.ENQUEUE) if ( MediaPlayerEntityFeature.TURN_ON in hass_supported_features and MediaPlayerEntityFeature.TURN_OFF in hass_supported_features diff --git a/music_assistant/server/providers/player_group/__init__.py b/music_assistant/server/providers/player_group/__init__.py index 2ef226ae..b442ed3c 100644 --- a/music_assistant/server/providers/player_group/__init__.py +++ b/music_assistant/server/providers/player_group/__init__.py @@ -658,10 +658,7 @@ class PlayerGroupProvider(PlayerProvider): player_provider = cast(PlayerProvider, player_provider) model_name = "Sync Group" manufacturer = self.mass.get_provider(group_type).name - for feature in ( - PlayerFeature.PAUSE, - PlayerFeature.VOLUME_MUTE, - ): + for feature in (PlayerFeature.PAUSE, PlayerFeature.VOLUME_MUTE, PlayerFeature.ENQUEUE): if all(feature in x.supported_features for x in player_provider.players): player_features.add(feature) else: diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index 51b1da42..4d62cd9c 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -646,6 +646,7 @@ class SlimprotoProvider(PlayerProvider): PlayerFeature.VOLUME_SET, PlayerFeature.PAUSE, PlayerFeature.VOLUME_MUTE, + PlayerFeature.ENQUEUE, ), ) await self.mass.players.register_or_update(player) diff --git a/music_assistant/server/providers/sonos/const.py b/music_assistant/server/providers/sonos/const.py index af19560e..75ff2933 100644 --- a/music_assistant/server/providers/sonos/const.py +++ b/music_assistant/server/providers/sonos/const.py @@ -17,6 +17,7 @@ PLAYER_FEATURES_BASE = { PlayerFeature.SYNC, PlayerFeature.VOLUME_MUTE, PlayerFeature.PAUSE, + PlayerFeature.ENQUEUE, } SOURCE_LINE_IN = "line_in" diff --git a/music_assistant/server/providers/sonos_s1/__init__.py b/music_assistant/server/providers/sonos_s1/__init__.py index d6e9cbf8..c3ab38dd 100644 --- a/music_assistant/server/providers/sonos_s1/__init__.py +++ b/music_assistant/server/providers/sonos_s1/__init__.py @@ -56,6 +56,7 @@ PLAYER_FEATURES = ( PlayerFeature.SYNC, PlayerFeature.VOLUME_MUTE, PlayerFeature.PAUSE, + PlayerFeature.ENQUEUE, ) CONF_NETWORK_SCAN = "network_scan"