Refactor enqueing of items during playback and standardize player settings (#1008)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 19 Jan 2024 14:32:57 +0000 (15:32 +0100)
committerGitHub <noreply@github.com>
Fri, 19 Jan 2024 14:32:57 +0000 (15:32 +0100)
21 files changed:
music_assistant/client/players.py
music_assistant/common/models/config_entries.py
music_assistant/common/models/enums.py
music_assistant/common/models/player.py
music_assistant/common/models/player_queue.py
music_assistant/constants.py
music_assistant/server/controllers/config.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/util.py
music_assistant/server/models/player_provider.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/chromecast/__init__.py
music_assistant/server/providers/dlna/__init__.py
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/slimproto/cli.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/sonos/__init__.py
music_assistant/server/providers/ugp/__init__.py

index 562b3140e432411ceb37703f798fd5a5fdef8917..6e6e1f8f168df655a4695196f3095e59f6d5bb80 100644 (file)
@@ -259,12 +259,6 @@ class Players:
             "players/queue/repeat", queue_id=queue_id, repeat_mode=repeat_mode
         )
 
-    async def queue_command_crossfade(self, queue_id: str, crossfade_enabled: bool) -> None:
-        """Configure crossfade mode on the the queue."""
-        await self.client.send_command(
-            "players/queue/crossfade", queue_id=queue_id, crossfade_enabled=crossfade_enabled
-        )
-
     async def play_media(
         self,
         queue_id: str,
index 826b2ba099d5d59f299a2f7acb83f373628da033..27a519f57db102e7471c67377b9ef5ca16d524d1 100644 (file)
@@ -12,6 +12,7 @@ from mashumaro import DataClassDictMixin
 from music_assistant.common.models.enums import ProviderType
 from music_assistant.constants import (
     CONF_AUTO_PLAY,
+    CONF_CROSSFADE,
     CONF_CROSSFADE_DURATION,
     CONF_EQ_BASS,
     CONF_EQ_MID,
@@ -20,7 +21,6 @@ from music_assistant.constants import (
     CONF_HIDE_GROUP_CHILDS,
     CONF_LOG_LEVEL,
     CONF_OUTPUT_CHANNELS,
-    CONF_OUTPUT_CODEC,
     CONF_VOLUME_NORMALIZATION,
     CONF_VOLUME_NORMALIZATION_TARGET,
     SECURE_STRING_SUBSTITUTE,
@@ -297,36 +297,18 @@ DEFAULT_CORE_CONFIG_ENTRIES = (CONF_ENTRY_LOG_LEVEL,)
 
 # some reusable player config entries
 
-CONF_ENTRY_OUTPUT_CODEC = ConfigEntry(
-    key=CONF_OUTPUT_CODEC,
-    type=ConfigEntryType.STRING,
-    label="Output codec",
-    options=[
-        ConfigValueOption("FLAC (lossless, compact file size)", "flac"),
-        ConfigValueOption("AAC (lossy, superior quality)", "aac"),
-        ConfigValueOption("MP3 (lossy, average quality)", "mp3"),
-        ConfigValueOption("WAV (lossless, huge file size)", "wav"),
-        ConfigValueOption("PCM (lossless, huge file size)", "pcm"),
-    ],
-    default_value="flac",
-    description="Define the codec that is sent to the player when streaming audio. "
-    "By default Music Assistant prefers FLAC because it is lossless, has a "
-    "respectable filesize and is supported by most player devices. "
-    "Change this setting only if needed for your device/environment.",
-    advanced=True,
-)
-
 CONF_ENTRY_FLOW_MODE = ConfigEntry(
     key=CONF_FLOW_MODE,
     type=ConfigEntryType.BOOLEAN,
     label="Enable queue flow mode",
     default_value=False,
     description='Enable "flow" mode where all queue tracks are sent as a continuous '
-    "audio stream. Use for players that do not natively support gapless and/or "
+    "audio stream. \nUse for players that do not natively support gapless and/or "
     "crossfading or if the player has trouble transitioning between tracks.",
     advanced=False,
 )
 
+
 CONF_ENTRY_AUTO_PLAY = ConfigEntry(
     key=CONF_AUTO_PLAY,
     type=ConfigEntryType.BOOLEAN,
@@ -418,27 +400,22 @@ CONF_ENTRY_HIDE_GROUP_MEMBERS = ConfigEntry(
     advanced=False,
 )
 
+CONF_ENTRY_CROSSFADE = ConfigEntry(
+    key=CONF_CROSSFADE,
+    type=ConfigEntryType.BOOLEAN,
+    label="Enable crossfade",
+    default_value=False,
+    description="Enable a crossfade transition between (queue) tracks.",
+    advanced=False,
+)
+
 CONF_ENTRY_CROSSFADE_DURATION = ConfigEntry(
     key=CONF_CROSSFADE_DURATION,
     type=ConfigEntryType.INTEGER,
-    range=(1, 20),
+    range=(1, 10),
     default_value=8,
     label="Crossfade duration",
     description="Duration in seconds of the crossfade between tracks (if enabled)",
-    depends_on=CONF_FLOW_MODE,
+    depends_on=CONF_CROSSFADE,
     advanced=True,
 )
-
-
-DEFAULT_PLAYER_CONFIG_ENTRIES = (
-    CONF_ENTRY_VOLUME_NORMALIZATION,
-    CONF_ENTRY_FLOW_MODE,
-    CONF_ENTRY_AUTO_PLAY,
-    CONF_ENTRY_OUTPUT_CODEC,
-    CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
-    CONF_ENTRY_EQ_BASS,
-    CONF_ENTRY_EQ_MID,
-    CONF_ENTRY_EQ_TREBLE,
-    CONF_ENTRY_OUTPUT_CHANNELS,
-    CONF_ENTRY_CROSSFADE_DURATION,
-)
index 14e340a4a4115a0d12085c312e90db2b922f419d..15398b721c9d22977309a2c1482ed82fd1b6a027 100644 (file)
@@ -229,8 +229,7 @@ class PlayerFeature(StrEnum):
     SYNC = "sync"
     ACCURATE_TIME = "accurate_time"
     SEEK = "seek"
-    SET_MEMBERS = "set_members"
-    QUEUE = "queue"
+    ENQUEUE_NEXT = "enqueue_next"
     CROSSFADE = "crossfade"
 
 
index 52c201eed7fdcde9110a43fc38b44bfddcfa49a2..20d3660e64159f768e3e3ce9643e08d9aa8e391a 100644 (file)
@@ -34,7 +34,6 @@ class Player(DataClassDictMixin):
 
     elapsed_time: float = 0
     elapsed_time_last_updated: float = time.time()
-    current_url: str | None = None
     state: PlayerState = PlayerState.IDLE
 
     volume_level: int = 100
@@ -53,6 +52,10 @@ class Player(DataClassDictMixin):
     # otherwise it will be set to the own player_id
     active_source: str = ""
 
+    # current_item_id: return item_id/uri of the current active/loaded item on the player
+    # this may be a MA queue_item_id, url, uri or some provider specific string
+    current_item_id: str | None = None
+
     # can_sync_with: return tuple of player_ids that can be synced to/with this player
     # usually this is just a list of all player_ids within the playerprovider
     can_sync_with: tuple[str, ...] = field(default=tuple())
index 66a35af0f040eb9b5fc76af94af40f46e13afe85..c08b0a5e6606e1d4a8ea5bffa984c14445d3b282 100644 (file)
@@ -24,7 +24,6 @@ class PlayerQueue(DataClassDictMixin):
 
     shuffle_enabled: bool = False
     repeat_mode: RepeatMode = RepeatMode.OFF
-    crossfade_enabled: bool = True
     # current_index: index that is active (e.g. being played) by the player
     current_index: int | None = None
     # index_in_buffer: index that has been preloaded/buffered by the player
@@ -37,6 +36,9 @@ class PlayerQueue(DataClassDictMixin):
     radio_source: list[MediaItemType] = field(default_factory=list)
     announcement_in_progress: bool = False
     flow_mode: bool = False
+    # flow_mode_start_index: index of the first item of the flow stream
+    flow_mode_start_index: int = 0
+    next_track_enqueued: bool = False
 
     @property
     def corrected_elapsed_time(self) -> float:
index ea64aa717bdc86905b8d373bce36b786d055590c..b96d78b9a575817e499a0a4a55c945737b22455f 100755 (executable)
@@ -44,13 +44,13 @@ CONF_OUTPUT_CHANNELS: Final[str] = "output_channels"
 CONF_FLOW_MODE: Final[str] = "flow_mode"
 CONF_LOG_LEVEL: Final[str] = "log_level"
 CONF_HIDE_GROUP_CHILDS: Final[str] = "hide_group_childs"
-CONF_OUTPUT_CODEC: Final[str] = "output_codec"
 CONF_GROUPED_POWER_ON: Final[str] = "grouped_power_on"
 CONF_CROSSFADE_DURATION: Final[str] = "crossfade_duration"
 CONF_BIND_IP: Final[str] = "bind_ip"
 CONF_BIND_PORT: Final[str] = "bind_port"
 CONF_PUBLISH_IP: Final[str] = "publish_ip"
 CONF_AUTO_PLAY: Final[str] = "auto_play"
+CONF_CROSSFADE: Final[str] = "crossfade"
 
 # config default values
 DEFAULT_HOST: Final[str] = "0.0.0.0"
index 1a5bb03b4d94b0f1eeccfa78136e908bd895ccc4..7aaa3ade5a9a32677e379b557449c8a888d7ce71 100644 (file)
@@ -18,7 +18,6 @@ from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_dum
 from music_assistant.common.models import config_entries
 from music_assistant.common.models.config_entries import (
     DEFAULT_CORE_CONFIG_ENTRIES,
-    DEFAULT_PLAYER_CONFIG_ENTRIES,
     DEFAULT_PROVIDER_CONFIG_ENTRIES,
     ConfigEntry,
     ConfigValueType,
@@ -327,36 +326,30 @@ class ConfigController:
         """Return configuration for a single player."""
         if raw_conf := self.get(f"{CONF_PLAYERS}/{player_id}"):
             if prov := self.mass.get_provider(raw_conf["provider"]):
-                prov_entries = await prov.get_player_config_entries(player_id)
+                conf_entries = await prov.get_player_config_entries(player_id)
                 if player := self.mass.players.get(player_id, False):
                     raw_conf["default_name"] = player.display_name
             else:
-                prov_entries = tuple()
+                conf_entries = tuple()
                 raw_conf["available"] = False
                 raw_conf["name"] = raw_conf.get("name")
                 raw_conf["default_name"] = raw_conf.get("default_name") or raw_conf["player_id"]
-            prov_entries_keys = {x.key for x in prov_entries}
-            # combine provider defined entries with default player config entries
-            entries = prov_entries + tuple(
-                x for x in DEFAULT_PLAYER_CONFIG_ENTRIES if x.key not in prov_entries_keys
-            )
-            return PlayerConfig.parse(entries, raw_conf)
+            return PlayerConfig.parse(conf_entries, raw_conf)
         raise KeyError(f"No config found for player id {player_id}")
 
     @api_command("config/players/get_value")
-    async def get_player_config_value(self, player_id: str, key: str) -> ConfigValueType:
+    async def get_player_config_value(
+        self,
+        player_id: str,
+        key: str,
+    ) -> ConfigValueType:
         """Return single configentry value for a player."""
-        cache_key = f"player_conf_value_{player_id}.{key}"
-        if (cached_value := self._value_cache.get(cache_key)) and cached_value is not None:
-            return cached_value
         conf = await self.get_player_config(player_id)
         val = (
             conf.values[key].value
             if conf.values[key].value is not None
             else conf.values[key].default_value
         )
-        # store value in cache because this method can potentially be called very often
-        self._value_cache[cache_key] = val
         return val
 
     def get_raw_player_config_value(
index 60de382f85ad15e76b7b1694b3ca9f88544d3667..55aa3b57402006106c7d40fff199ae6f04ab26b0 100755 (executable)
@@ -5,6 +5,7 @@ import logging
 import random
 import time
 from collections.abc import AsyncGenerator
+from contextlib import suppress
 from typing import TYPE_CHECKING, Any
 
 from music_assistant.common.helpers.util import get_changed_keys
@@ -25,9 +26,9 @@ from music_assistant.common.models.errors import (
 from music_assistant.common.models.media_items import MediaItemType, media_from_dict
 from music_assistant.common.models.player_queue import PlayerQueue
 from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, ROOT_LOGGER_NAME
+from music_assistant.constants import FALLBACK_DURATION, ROOT_LOGGER_NAME
 from music_assistant.server.helpers.api import api_command
-from music_assistant.server.helpers.audio import get_stream_details
+from music_assistant.server.helpers.audio import set_stream_details
 from music_assistant.server.models.core_controller import CoreController
 
 if TYPE_CHECKING:
@@ -139,15 +140,6 @@ class PlayerQueuesController(CoreController):
         queue.repeat_mode = repeat_mode
         self.signal_update(queue_id)
 
-    @api_command("players/queue/crossfade")
-    def set_crossfade(self, queue_id: str, crossfade_enabled: bool) -> None:
-        """Configure crossfade setting on the the queue."""
-        queue = self._queues[queue_id]
-        if queue.crossfade_enabled == crossfade_enabled:
-            return  # no change
-        queue.crossfade_enabled = crossfade_enabled
-        self.signal_update(queue_id)
-
     @api_command("players/queue/play_media")
     async def play_media(
         self,
@@ -408,6 +400,11 @@ class PlayerQueuesController(CoreController):
         if self._queues[queue_id].announcement_in_progress:
             LOGGER.warning("Ignore queue command for %s because an announcement is in progress.")
             return
+        player = self.mass.players.get(queue_id, True)
+        if PlayerFeature.PAUSE not in player.supported_features:
+            # if player does not support pause, we need to send stop
+            await self.stop(queue_id)
+            return
         # simply forward the command to underlying player
         await self.mass.players.cmd_pause(queue_id)
 
@@ -429,7 +426,7 @@ class PlayerQueuesController(CoreController):
         - queue_id: queue_id of the queue to handle the command.
         """
         current_index = self._queues[queue_id].current_index
-        next_index = self.get_next_index(queue_id, current_index, True)
+        next_index = self._get_next_index(queue_id, current_index, True)
         if next_index is None:
             return
         await self.play_index(queue_id, next_index)
@@ -528,41 +525,14 @@ class PlayerQueuesController(CoreController):
             raise FileNotFoundError(f"Unknown index/id: {index}")
         queue.current_index = index
         queue.index_in_buffer = index
-        # execute the play_media command on the player
-        queue_player = self.mass.players.get(queue_id)
-        need_multi_stream = (
-            queue_player.provider in ("airplay", "ugp", "slimproto")
-            and len(queue_player.group_childs) > 1
-        )
+        queue.flow_mode_start_index = index
+        queue.flow_mode = False  # reset
         player_prov = self.mass.players.get_player_provider(queue_id)
-        if need_multi_stream:
-            # handle special multi client stream
-            queue.flow_mode = True
-            stream_job = await self.mass.streams.create_multi_client_stream_job(
-                queue_id=queue_id,
-                start_queue_item=queue_item,
-                seek_position=int(seek_position),
-                fade_in=fade_in,
-            )
-            await player_prov.cmd_handle_stream_job(player_id=queue_id, stream_job=stream_job)
-            return
-        # regular stream
-        queue.flow_mode = await self.mass.config.get_player_config_value(
-            queue.queue_id, CONF_FLOW_MODE
-        )
-        url = await self.mass.streams.resolve_stream_url(
-            queue_id=queue_id,
+        await player_prov.play_media(
+            player_id=queue_id,
             queue_item=queue_item,
             seek_position=int(seek_position),
             fade_in=fade_in,
-            flow_mode=queue.flow_mode,
-        )
-        await player_prov.cmd_play_url(
-            player_id=queue_id,
-            url=url,
-            # set queue_item to None if we're sending a flow mode url
-            # as the metadata is rather useless then
-            queue_item=None if queue.flow_mode else queue_item,
         )
 
     # Interaction with player
@@ -599,7 +569,11 @@ class PlayerQueuesController(CoreController):
     def on_player_update(
         self, player: Player, changed_values: dict[str, tuple[Any, Any]]  # noqa: ARG002
     ) -> None:
-        """Call when a PlayerQueue needs to be updated (e.g. when player updates)."""
+        """
+        Call when a PlayerQueue needs to be updated (e.g. when player updates).
+
+        NOTE: This is called every second if the player is playing.
+        """
         if player.player_id not in self._queues:
             # race condition
             return
@@ -613,37 +587,34 @@ class PlayerQueuesController(CoreController):
         queue.items = len(self._queue_items[queue_id])
         # determine if this queue is currently active for this player
         queue.active = player.active_source == queue.queue_id
-        if queue.active:
-            queue.state = player.state
-            # update current item from player report
-            player_item_index = self._get_player_item_index(queue_id, player.current_url)
-            if player_item_index is not None:
-                if queue.flow_mode:
-                    # flow mode active, calculate current item
-                    current_index, item_time = self.__get_queue_stream_index(
-                        queue, player, player_item_index
-                    )
-                else:
-                    # queue is active and player has one of our tracks loaded, update state
-                    current_index = player_item_index
-                    item_time = int(player.corrected_elapsed_time)
-                # only update these attributes if the queue is active
-                # and has an item loaded so we are able to resume it
-                queue.current_index = current_index
-                queue.elapsed_time = item_time
-                queue.elapsed_time_last_updated = time.time()
-                queue.current_item = self.get_item(queue_id, queue.current_index)
-                queue.next_item = self.get_next_item(queue_id)
-                # correct elapsed time when seeking
-                if (
-                    queue.current_item
-                    and queue.current_item.streamdetails
-                    and queue.current_item.streamdetails.seconds_skipped
-                    and not queue.flow_mode
-                ):
-                    queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped
-        else:
+        if not queue.active:
             queue.state = PlayerState.IDLE
+            return
+        # update current item from player report
+        if queue.flow_mode:
+            # flow mode active, calculate current item
+            queue.current_index, queue.elapsed_time = self.__get_queue_stream_index(queue, player)
+        else:
+            # queue is active and player has one of our tracks loaded, update state
+            if item_id := self._parse_player_current_item_id(queue_id, player.current_item_id):
+                queue.current_index = self.index_by_id(queue_id, item_id)
+            queue.elapsed_time = int(player.corrected_elapsed_time)
+
+        # only update these attributes if the queue is active
+        # and has an item loaded so we are able to resume it
+        queue.state = player.state
+        queue.elapsed_time_last_updated = time.time()
+        queue.current_item = self.get_item(queue_id, queue.current_index)
+        queue.next_item = self._get_next_item(queue_id)
+        # correct elapsed time when seeking
+        if (
+            queue.current_item
+            and queue.current_item.streamdetails
+            and queue.current_item.streamdetails.seconds_skipped
+            and not queue.flow_mode
+        ):
+            queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped
+
         # basic throttle: do not send state changed events if queue did not actually change
         prev_state = self._prev_states.get(queue_id, {})
         new_state = queue.to_dict()
@@ -653,6 +624,9 @@ class PlayerQueuesController(CoreController):
         # return early if nothing changed
         if len(changed_keys) == 0:
             return
+        # handle enqueuing of next item to play
+        if not queue.flow_mode:
+            self._check_enqueue_next(player, queue, prev_state, new_state)
         # do not send full updates if only time was updated
         if changed_keys == {"elapsed_time"}:
             self.mass.signal_event(
@@ -692,58 +666,45 @@ class PlayerQueuesController(CoreController):
         self._queues.pop(player_id, None)
         self._queue_items.pop(player_id, None)
 
-    async def preload_next_url(
-        self, queue_id: str, current_item_id: str | None = None
-    ) -> tuple[str, QueueItem, bool]:
-        """Call when a player wants to load the next track/url into the buffer.
+    async def preload_next_item(
+        self, queue_id: str, current_item_id_or_index: str | int | None = None
+    ) -> QueueItem:
+        """Call when a player wants to (pre)load the next item into the buffer.
 
-        The result is a tuple of the next url + QueueItem to Play,
-        and a bool if the player should crossfade (if supported).
         Raises QueueEmpty if there are no more tracks left.
         """
         queue = self.get(queue_id)
         if not queue:
             raise PlayerUnavailableError(f"PlayerQueue {queue_id} is not available")
-        if current_item_id:
-            cur_index = self.index_by_id(queue_id, current_item_id) or 0
-        else:
+        if current_item_id_or_index is None:
             cur_index = queue.index_in_buffer or queue.current_index or 0
-        cur_item = self.get_item(queue_id, cur_index)
+        elif isinstance(current_item_id_or_index, str):
+            cur_index = self.index_by_id(queue_id, current_item_id_or_index)
+        else:
+            cur_index = current_item_id_or_index
         idx = 0
         while True:
-            next_index = self.get_next_index(queue_id, cur_index + idx)
-            next_item = self.get_item(queue_id, next_index)
-            if not cur_item or not next_item:
+            next_index = self._get_next_index(queue_id, cur_index + idx)
+            if next_index is None:
                 raise QueueEmpty("No more tracks left in the queue.")
+            next_item = self.get_item(queue_id, next_index)
             try:
                 # Check if the QueueItem is playable. For example, YT Music returns Radio Items
                 # that are not playable which will stop playback.
-                next_item.streamdetails = await get_stream_details(
-                    mass=self.mass, queue_item=next_item
-                )
+                await set_stream_details(mass=self.mass, queue_item=next_item)
                 # Lazy load the full MediaItem for the QueueItem, making sure to get the
                 # maximum quality of thumbs
                 next_item.media_item = await self.mass.music.get_item_by_uri(next_item.uri)
                 break
             except MediaNotFoundError:
                 # No stream details found, skip this QueueItem
+                next_item = None
                 idx += 1
+        if next_item is None:
+            raise QueueEmpty("No more (playable) tracks left in the queue.")
         queue.index_in_buffer = next_index
-        # work out crossfade
-        crossfade = queue.crossfade_enabled
-        if (
-            cur_item.media_type == MediaType.TRACK
-            and next_item.media_type == MediaType.TRACK
-            and cur_item.media_item.album == next_item.media_item.album
-        ):
-            # disable crossfade if playing tracks from same album
-            # TODO: make this a bit more intelligent.
-            crossfade = False
-        url = await self.mass.streams.resolve_stream_url(
-            queue_id=queue_id,
-            queue_item=next_item,
-        )
-        return (url, next_item, crossfade)
+        queue.next_track_enqueued = True
+        return next_item
 
     # Main queue manipulation methods
 
@@ -798,43 +759,6 @@ class PlayerQueuesController(CoreController):
             return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
         return None
 
-    def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
-        """Get index by queue_item_id."""
-        queue_items = self._queue_items[queue_id]
-        for index, item in enumerate(queue_items):
-            if item.queue_item_id == queue_item_id:
-                return index
-        return None
-
-    def get_next_index(self, queue_id: str, cur_index: int | None, is_skip: bool = False) -> int:
-        """Return the next index for the queue, accounting for repeat settings."""
-        queue = self._queues[queue_id]
-        queue_items = self._queue_items[queue_id]
-        # handle repeat single track
-        if queue.repeat_mode == RepeatMode.ONE and not is_skip:
-            return cur_index
-        # handle repeat all
-        if (
-            queue.repeat_mode == RepeatMode.ALL
-            and queue_items
-            and cur_index == (len(queue_items) - 1)
-        ):
-            return 0
-        # simply return the next index. other logic is guarded to detect the index
-        # being higher than the number of items to detect end of queue and/or handle repeat.
-        if cur_index is None:
-            return 0
-        next_index = cur_index + 1
-        return next_index
-
-    def get_next_item(self, queue_id: str, cur_index: int | None = None) -> QueueItem | None:
-        """Return next QueueItem for given queue."""
-        queue = self._queues[queue_id]
-        if cur_index is None:
-            cur_index = queue.current_index
-        next_index = self.get_next_index(queue_id, cur_index)
-        return self.get_item(queue_id, next_index)
-
     def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
         """Signal state changed of given queue."""
         queue = self._queues[queue_id]
@@ -858,6 +782,44 @@ class PlayerQueuesController(CoreController):
             )
         )
 
+    def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
+        """Get index by queue_item_id."""
+        queue_items = self._queue_items[queue_id]
+        for index, item in enumerate(queue_items):
+            if item.queue_item_id == queue_item_id:
+                return index
+        return None
+
+    def _get_next_index(
+        self, queue_id: str, cur_index: int | None, is_skip: bool = False
+    ) -> int | None:
+        """
+        Return the next index for the queue, accounting for repeat settings.
+
+        Will return None if there are no (more) items in the queue.
+        """
+        queue = self._queues[queue_id]
+        queue_items = self._queue_items[queue_id]
+        if not queue_items or cur_index is None:
+            # queue is empty
+            return None
+        if cur_index is None:
+            cur_index = queue.current_index
+        # handle repeat single track
+        if queue.repeat_mode == RepeatMode.ONE and not is_skip:
+            return cur_index
+        # handle cur_index is last index of the queue
+        if cur_index >= (len(queue_items) - 1):
+            # if repeat all is enabled, we simply start again from the beginning
+            return 0 if RepeatMode.ALL else None
+        return cur_index + 1
+
+    def _get_next_item(self, queue_id: str, cur_index: int | None = None) -> QueueItem | None:
+        """Return next QueueItem for given queue."""
+        if (next_index := self._get_next_index(queue_id, cur_index)) is not None:
+            return self.get_item(queue_id, next_index)
+        return None
+
     async def _fill_radio_tracks(self, queue_id: str) -> None:
         """Fill a Queue with (additional) Radio tracks."""
         tracks = await self._get_radio_tracks(queue_id)
@@ -869,6 +831,56 @@ class PlayerQueuesController(CoreController):
             insert_at_index=len(self._queue_items[queue_id]) - 1,
         )
 
+    def _check_enqueue_next(
+        self,
+        player: Player,
+        queue: PlayerQueue,
+        prev_state: dict[str, Any],
+        new_state: dict[str, Any],
+    ) -> None:
+        """Check if we need to enqueue the next item to the player itself."""
+        if not queue.active:
+            return
+        if prev_state.get("state") != PlayerState.PLAYING:
+            return
+        current_item = self.get_item(queue.queue_id, queue.current_index)
+        if not current_item:
+            return  # guard, just in case something bad happened
+        if not current_item.duration:
+            return
+        if current_item.streamdetails and current_item.streamdetails.seconds_streamed:
+            duration = current_item.streamdetails.seconds_streamed
+        else:
+            duration = current_item.duration
+        seconds_remaining = duration - player.corrected_elapsed_time
+
+        if PlayerFeature.ENQUEUE_NEXT in player.supported_features:
+            # player supports enqueue next feature.
+            # we enqueue the next track 15 seconds before the current track ends
+            end_of_track_reached = seconds_remaining <= 15
+        else:
+            # player does not support enqueue next feature.
+            # we wait for the player to stop after it reaches the end of the track
+            prev_seconds_remaining = prev_state.get("seconds_remaining", seconds_remaining)
+            end_of_track_reached = prev_seconds_remaining <= 6 and queue.state == PlayerState.IDLE
+            new_state["seconds_remaining"] = seconds_remaining
+
+        if not end_of_track_reached:
+            queue.next_track_enqueued = False  # reset
+            return
+        if queue.next_track_enqueued:
+            return  # already enqueued
+
+        async def _enqueue_next(index: int):
+            player_prov = self.mass.players.get_player_provider(player.player_id)
+            with suppress(QueueEmpty):
+                next_item = await self.preload_next_item(queue.queue_id, index)
+                await player_prov.enqueue_next_queue_item(
+                    player_id=player.player_id, queue_item=next_item
+                )
+
+        self.mass.create_task(_enqueue_next(queue.current_index))
+
     async def _get_radio_tracks(self, queue_id: str) -> list[MediaItemType]:
         """Call the registered music providers for dynamic tracks."""
         queue = self._queues[queue_id]
@@ -884,9 +896,7 @@ class PlayerQueuesController(CoreController):
                 break
         return tracks
 
-    def __get_queue_stream_index(
-        self, queue: PlayerQueue, player: Player, start_index: int
-    ) -> tuple[int, int]:
+    def __get_queue_stream_index(self, queue: PlayerQueue, player: Player) -> tuple[int, int]:
         """Calculate current queue index and current track elapsed time."""
         # player is playing a constant stream so we need to do this the hard way
         queue_index = 0
@@ -894,9 +904,9 @@ class PlayerQueuesController(CoreController):
         total_time = 0
         track_time = 0
         queue_items = self._queue_items[queue.queue_id]
-        if queue_items and len(queue_items) > start_index:
+        if queue_items and len(queue_items) > queue.flow_mode_start_index:
             # start_index: holds the position from which the flow stream started
-            queue_index = start_index
+            queue_index = queue.flow_mode_start_index
             queue_track = None
             while len(queue_items) > queue_index:
                 # keep enumerating the queue tracks to find current track
@@ -905,10 +915,12 @@ class PlayerQueuesController(CoreController):
                 if not queue_track.streamdetails:
                     track_time = elapsed_time_queue - total_time
                     break
-                if queue_track.streamdetails.seconds_streamed is not None:
-                    track_duration = queue_track.streamdetails.seconds_streamed
-                else:
-                    track_duration = queue_track.duration or FALLBACK_DURATION
+                track_duration = (
+                    queue_track.streamdetails.seconds_streamed
+                    or queue_track.streamdetails.duration
+                    or queue_track.duration
+                    or FALLBACK_DURATION
+                )
                 if elapsed_time_queue > (track_duration + total_time):
                     # total elapsed time is more than (streamed) track duration
                     # move index one up
@@ -922,10 +934,13 @@ class PlayerQueuesController(CoreController):
                     break
         return queue_index, track_time
 
-    def _get_player_item_index(self, queue_id: str, url: str) -> str | None:
-        """Parse (start) QueueItem ID from Player's current url."""
-        if url and self.mass.streams.base_url in url and queue_id in url:
-            # try to extract the item id from the uri
-            current_item_id = url.rsplit("/")[-1].split(".")[0]
-            return self.index_by_id(queue_id, current_item_id)
+    def _parse_player_current_item_id(self, queue_id: str, current_item_id: str) -> str | None:
+        """Parse QueueItem ID from Player's current url."""
+        if not current_item_id:
+            return None
+        if queue_id in current_item_id:
+            # try to extract the item id from either a url or queue_id/item_id combi
+            current_item_id = current_item_id.rsplit("/")[-1].split(".")[0]
+        if self.get_item(queue_id, current_item_id):
+            return current_item_id
         return None
index 83576ed455699acd2d0753af1bf8a87cbf80e366..6701dff1b4acb27d1454d2ae7da9c0c899a2d1c1 100755 (executable)
@@ -328,6 +328,11 @@ class PlayerController(CoreController):
         - player_id: player_id of the player to handle the command.
         """
         player_id = self._check_redirect(player_id)
+        player = self.get(player_id, True)
+        if PlayerFeature.PAUSE not in player.supported_features:
+            # if player does not support pause, we need to send stop
+            await self.cmd_stop(player_id)
+            return
         player_provider = self.get_player_provider(player_id)
         await player_provider.cmd_pause(player_id)
 
@@ -636,24 +641,16 @@ class PlayerController(CoreController):
         if group_players := self._get_player_groups(player.player_id):
             # prefer the first playing (or paused) group parent
             for group_player in group_players:
-                # if the group player's playerid is within the curtrent url,
+                # if the group player's playerid is within the current_item_id
                 # this group is definitely active
-                if player.current_url and group_player.player_id in player.current_url:
+                if player.current_item_id and group_player.player_id in player.current_item_id:
                     return group_player.player_id
             # fallback to the first powered group player
             for group_player in group_players:
                 if group_player.powered:
                     return group_player.player_id
-        # guess source from player's current url
-        if player.current_url and player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
-            if self.mass.streams.base_url in player.current_url:
-                return player.player_id
-            if ":" in player.current_url:
-                # extract source from uri/url
-                return player.current_url.split(":")[0]
-            return player.current_url
-        # defaults to the player's own player id
-        return player.player_id
+        # defaults to the player's own player id if not active source set
+        return player.active_source or player.player_id
 
     def _get_group_volume_level(self, player: Player) -> int:
         """Calculate a group volume from the grouped members."""
index cdde438c6093ae91c546eccb6209c2df7238ee95..fe485d07161287f47d98b49d04134153903f7100 100644 (file)
@@ -32,19 +32,19 @@ from music_assistant.common.models.queue_item import QueueItem
 from music_assistant.constants import (
     CONF_BIND_IP,
     CONF_BIND_PORT,
+    CONF_CROSSFADE,
     CONF_CROSSFADE_DURATION,
     CONF_EQ_BASS,
     CONF_EQ_MID,
     CONF_EQ_TREBLE,
     CONF_OUTPUT_CHANNELS,
-    CONF_OUTPUT_CODEC,
     CONF_PUBLISH_IP,
 )
 from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
     get_media_stream,
-    get_stream_details,
+    set_stream_details,
 )
 from music_assistant.server.helpers.process import AsyncProcess
 from music_assistant.server.helpers.util import get_ips
@@ -132,16 +132,8 @@ class MultiClientStreamJob:
             with suppress(asyncio.QueueFull):
                 sub_queue.put_nowait(b"")
 
-    async def resolve_stream_url(
-        self,
-        child_player_id: str,
-    ) -> str:
+    def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
         """Resolve the childplayer specific stream URL to this streamjob."""
-        output_codec = ContentType(
-            await self.stream_controller.mass.config.get_player_config_value(
-                child_player_id, CONF_OUTPUT_CODEC
-            )
-        )
         fmt = output_codec.value
         # handle raw pcm
         if output_codec.is_pcm():
@@ -149,8 +141,8 @@ class MultiClientStreamJob:
             player_max_bit_depth = 24 if player.supports_24bit else 16
             output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
             output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
-            output_channels = await self.stream_controller.mass.config.get_player_config_value(
-                child_player_id, CONF_OUTPUT_CHANNELS
+            output_channels = self.stream_controller.mass.config.get_raw_player_config_value(
+                child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
             )
             channels = 1 if output_channels != "stereo" else 2
             fmt += (
@@ -373,35 +365,31 @@ class StreamsController(CoreController):
 
     async def resolve_stream_url(
         self,
-        queue_id: str,
         queue_item: QueueItem,
+        output_codec: ContentType,
         seek_position: int = 0,
         fade_in: bool = False,
         flow_mode: bool = False,
     ) -> str:
-        """Resolve the (regular, single player) stream URL for the given QueueItem.
-
-        This is called just-in-time by the Queue controller to get the URL to the audio.
-        """
-        output_codec = ContentType(
-            await self.mass.config.get_player_config_value(queue_id, CONF_OUTPUT_CODEC)
-        )
+        """Resolve the stream URL for the given QueueItem."""
         fmt = output_codec.value
         # handle raw pcm
         if output_codec.is_pcm():
-            player = self.mass.players.get(queue_id)
+            player = self.mass.players.get(queue_item.queue_id)
             player_max_bit_depth = 24 if player.supports_24bit else 16
             if flow_mode:
                 output_sample_rate = min(FLOW_MAX_SAMPLE_RATE, player.max_sample_rate)
                 output_bit_depth = min(FLOW_MAX_BIT_DEPTH, player_max_bit_depth)
             else:
-                streamdetails = await get_stream_details(self.mass, queue_item)
+                await set_stream_details(self.mass, queue_item)
                 output_sample_rate = min(
-                    streamdetails.audio_format.sample_rate, player.max_sample_rate
+                    queue_item.streamdetails.audio_format.sample_rate, player.max_sample_rate
                 )
-                output_bit_depth = min(streamdetails.audio_format.bit_depth, player_max_bit_depth)
-            output_channels = await self.mass.config.get_player_config_value(
-                queue_id, CONF_OUTPUT_CHANNELS
+                output_bit_depth = min(
+                    queue_item.streamdetails.audio_format.bit_depth, player_max_bit_depth
+                )
+            output_channels = self.mass.config.get_raw_player_config_value(
+                queue_item.queue_id, CONF_OUTPUT_CHANNELS, "stereo"
             )
             channels = 1 if output_channels != "stereo" else 2
             fmt += (
@@ -410,7 +398,7 @@ class StreamsController(CoreController):
             )
         query_params = {}
         base_path = "flow" if flow_mode else "single"
-        url = f"{self._server.base_url}/{queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}"
+        url = f"{self._server.base_url}/{queue_item.queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}"  # noqa: E501
         if seek_position:
             query_params["seek_position"] = str(seek_position)
         if fade_in:
@@ -470,7 +458,7 @@ class StreamsController(CoreController):
         if not queue_item:
             raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
         try:
-            streamdetails = await get_stream_details(self.mass, queue_item=queue_item)
+            await set_stream_details(self.mass, queue_item=queue_item)
         except MediaNotFoundError:
             raise web.HTTPNotFound(
                 reason=f"Unable to retrieve streamdetails for item: {queue_item}"
@@ -481,8 +469,8 @@ class StreamsController(CoreController):
         output_format = await self._get_output_format(
             output_format_str=request.match_info["fmt"],
             queue_player=queue_player,
-            default_sample_rate=streamdetails.audio_format.sample_rate,
-            default_bit_depth=streamdetails.audio_format.bit_depth,
+            default_sample_rate=queue_item.streamdetails.audio_format.sample_rate,
+            default_bit_depth=queue_item.streamdetails.audio_format.bit_depth,
         )
 
         # prepare request, add some DLNA/UPNP compatible headers
@@ -508,9 +496,11 @@ class StreamsController(CoreController):
 
         # collect player specific ffmpeg args to re-encode the source PCM stream
         pcm_format = AudioFormat(
-            content_type=ContentType.from_bit_depth(streamdetails.audio_format.bit_depth),
-            sample_rate=streamdetails.audio_format.sample_rate,
-            bit_depth=streamdetails.audio_format.bit_depth,
+            content_type=ContentType.from_bit_depth(
+                queue_item.streamdetails.audio_format.bit_depth
+            ),
+            sample_rate=queue_item.streamdetails.audio_format.sample_rate,
+            bit_depth=queue_item.streamdetails.audio_format.bit_depth,
         )
         ffmpeg_args = await self._get_player_ffmpeg_args(
             queue_player,
@@ -524,7 +514,7 @@ class StreamsController(CoreController):
                 try:
                     async for chunk in get_media_stream(
                         self.mass,
-                        streamdetails=streamdetails,
+                        streamdetails=queue_item.streamdetails,
                         pcm_format=pcm_format,
                         seek_position=seek_position,
                         fade_in=fade_in,
@@ -766,43 +756,35 @@ class StreamsController(CoreController):
         queue_track = None
         last_fadeout_part = b""
         total_bytes_written = 0
-        self.logger.info("Start Queue Flow stream for Queue %s", queue.display_name)
+        queue.flow_mode = True
+        use_crossfade = self.mass.config.get_raw_player_config_value(
+            queue.queue_id, CONF_CROSSFADE, False
+        )
+        pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
+        self.logger.info(
+            "Start Queue Flow stream for Queue %s - crossfade: %s",
+            queue.display_name,
+            use_crossfade,
+        )
 
         while True:
             # get (next) queue item to stream
             if queue_track is None:
                 queue_track = start_queue_item
-                use_crossfade = queue.crossfade_enabled
+                await set_stream_details(self.mass, queue_track)
             else:
                 seek_position = 0
                 fade_in = False
                 try:
-                    (
-                        _,
-                        queue_track,
-                        use_crossfade,
-                    ) = await self.mass.player_queues.preload_next_url(queue.queue_id)
+                    queue_track = await self.mass.player_queues.preload_next_item(queue.queue_id)
                 except QueueEmpty:
                     break
 
-            # get streamdetails
-            try:
-                streamdetails = await get_stream_details(self.mass, queue_track)
-            except MediaNotFoundError as err:
-                # streamdetails retrieval failed, skip to next track instead of bailing out...
-                self.logger.warning(
-                    "Skip track %s due to missing streamdetails",
-                    queue_track.name,
-                    exc_info=err,
-                )
-                continue
-
             self.logger.debug(
-                "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s",
-                streamdetails.uri,
+                "Start Streaming queue track: %s (%s) for queue %s",
+                queue_track.streamdetails.uri,
                 queue_track.name,
                 queue.display_name,
-                use_crossfade,
             )
 
             # set some basic vars
@@ -813,22 +795,28 @@ class StreamsController(CoreController):
             crossfade_size = int(pcm_sample_size * crossfade_duration)
             queue_track.streamdetails.seconds_skipped = seek_position
             buffer_size = crossfade_size if use_crossfade else int(pcm_sample_size * 2)
-
-            buffer = b""
             bytes_written = 0
+            buffer = b""
             chunk_num = 0
             # handle incoming audio chunks
             async for chunk in get_media_stream(
                 self.mass,
-                streamdetails,
+                queue_track.streamdetails,
                 pcm_format=pcm_format,
                 seek_position=seek_position,
                 fade_in=fade_in,
-                # only allow strip silence from begin if track is being crossfaded
-                strip_silence_begin=last_fadeout_part != b"",
+                # strip silence from begin/end if track is being crossfaded
+                strip_silence_begin=use_crossfade,
+                strip_silence_end=use_crossfade,
             ):
                 chunk_num += 1
 
+                # throttle buffer, do not allow more than 30 seconds in buffer
+                seconds_buffered = total_bytes_written / pcm_sample_size
+                player = self.mass.players.get(queue.queue_id)
+                while (seconds_buffered - player.corrected_elapsed_time) > 30:
+                    await asyncio.sleep(1)
+
                 ####  HANDLE FIRST PART OF TRACK
 
                 # buffer full for crossfade
@@ -869,12 +857,6 @@ class StreamsController(CoreController):
 
             #### HANDLE END OF TRACK
 
-            if bytes_written == 0:
-                # stream error: got empty first chunk ?!
-                self.logger.warning("Stream error on %s", streamdetails.uri)
-                queue_track.streamdetails.seconds_streamed = 0
-                continue
-
             if buffer and use_crossfade:
                 # if crossfade is enabled, save fadeout part to pickup for next track
                 last_fadeout_part = buffer[-crossfade_size:]
@@ -886,14 +868,18 @@ class StreamsController(CoreController):
                 yield buffer
                 bytes_written += len(buffer)
 
-            # end of the track reached - store accurate duration
+            # update duration details based on the actual pcm data we sent
+            # this also accounts for crossfade and silence stripping
             queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size
-            total_bytes_written += bytes_written
+            queue_track.streamdetails.duration = (
+                seek_position + queue_track.streamdetails.seconds_streamed
+            )
             self.logger.debug(
-                "Finished Streaming queue track: %s (%s) on queue %s",
+                "Finished Streaming queue track: %s (%s) on queue %s - seconds streamed: %s",
                 queue_track.streamdetails.uri,
                 queue_track.name,
                 queue.display_name,
+                queue_track.streamdetails.seconds_streamed,
             )
 
         self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
@@ -905,7 +891,6 @@ class StreamsController(CoreController):
         output_format: AudioFormat,
     ) -> list[str]:
         """Get player specific arguments for the given (pcm) input and output details."""
-        player_conf = await self.mass.config.get_player_config(player.player_id)
         # generic args
         generic_args = [
             "ffmpeg",
@@ -954,16 +939,28 @@ class StreamsController(CoreController):
 
         # the below is a very basic 3-band equalizer,
         # this could be a lot more sophisticated at some point
-        if eq_bass := player_conf.get_value(CONF_EQ_BASS):
+        if (
+            eq_bass := self.mass.config.get_raw_player_config_value(
+                player.player_id, CONF_EQ_BASS, 0
+            )
+        ) != 0:
             filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}")
-        if eq_mid := player_conf.get_value(CONF_EQ_MID):
+        if (
+            eq_mid := self.mass.config.get_raw_player_config_value(player.player_id, CONF_EQ_MID, 0)
+        ) != 0:
             filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}")
-        if eq_treble := player_conf.get_value(CONF_EQ_TREBLE):
+        if (
+            eq_treble := self.mass.config.get_raw_player_config_value(
+                player.player_id, CONF_EQ_TREBLE, 0
+            )
+        ) != 0:
             filter_params.append(
                 f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}"
             )
         # handle output mixing only left or right
-        conf_channels = player_conf.get_value(CONF_OUTPUT_CHANNELS)
+        conf_channels = self.mass.config.get_raw_player_config_value(
+            player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
+        )
         if conf_channels == "left":
             filter_params.append("pan=mono|c0=FL")
         elif conf_channels == "right":
@@ -1008,8 +1005,8 @@ class StreamsController(CoreController):
             output_sample_rate = min(default_sample_rate, queue_player.max_sample_rate)
             player_max_bit_depth = 24 if queue_player.supports_24bit else 16
             output_bit_depth = min(default_bit_depth, player_max_bit_depth)
-            output_channels_str = await self.mass.config.get_player_config_value(
-                queue_player.player_id, CONF_OUTPUT_CHANNELS
+            output_channels_str = self.mass.config.get_raw_player_config_value(
+                queue_player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
             )
             output_channels = 1 if output_channels_str != "stereo" else 2
         return AudioFormat(
index 4440206127cbccffbfa8609dd82f137b86f907e5..62ae2b814f64c306d7afadc1e24e44033484f378 100644 (file)
@@ -243,8 +243,8 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N
             )
 
 
-async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> StreamDetails:
-    """Get streamdetails for the given QueueItem.
+async def set_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> None:
+    """Set streamdetails for the given QueueItem.
 
     This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
     Do not try to request streamdetails in advance as this is expiring data.
@@ -301,10 +301,8 @@ async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> Str
         and streamdetails.data.startswith("http")
     ):
         streamdetails.direct = streamdetails.data
-    # set streamdetails as attribute on the media_item
-    # this way the app knows what content is playing
+    # set streamdetails as attribute on the queue_item
     queue_item.streamdetails = streamdetails
-    return streamdetails
 
 
 async def get_gain_correct(
@@ -494,6 +492,7 @@ async def get_media_stream(  # noqa: PLR0915
 
             # update duration details based on the actual pcm data we sent
             streamdetails.seconds_streamed = bytes_sent / pcm_sample_size
+            streamdetails.duration = seek_position + streamdetails.seconds_streamed
 
         except (asyncio.CancelledError, GeneratorExit) as err:
             LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
index b02e2bafa23a6c77c03daaa29cf90f23c75b3a38..4627e50044b9049a5390d63888c3b5bf2a23f5f8 100644 (file)
@@ -9,6 +9,7 @@ import tempfile
 import urllib.error
 import urllib.parse
 import urllib.request
+from collections.abc import Iterator
 from functools import lru_cache
 from importlib.metadata import PackageNotFoundError
 from importlib.metadata import version as pkg_version
@@ -102,3 +103,9 @@ def create_tempfile():
     if platform.system() == "Linux":
         return memory_tempfile.MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
     return tempfile.NamedTemporaryFile(buffering=0)
+
+
+def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
+    """Chunk bytes data into smaller chunks."""
+    for i in range(0, len(data), chunk_size):
+        yield data[i : i + chunk_size]
index a6942ef6919e261d64a9ca3788952843df5abcae..29584b4e1b773630275c938f5f23e12375867c82 100644 (file)
@@ -4,14 +4,18 @@ from __future__ import annotations
 from abc import abstractmethod
 from typing import TYPE_CHECKING
 
+from music_assistant.common.models.config_entries import (
+    CONF_ENTRY_AUTO_PLAY,
+    CONF_ENTRY_VOLUME_NORMALIZATION,
+    CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
+)
 from music_assistant.common.models.player import Player
-from music_assistant.common.models.queue_item import QueueItem
 
 from .provider import Provider
 
 if TYPE_CHECKING:
     from music_assistant.common.models.config_entries import ConfigEntry, PlayerConfig
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
+    from music_assistant.common.models.queue_item import QueueItem
 
 # ruff: noqa: ARG001, ARG002
 
@@ -24,7 +28,11 @@ class PlayerProvider(Provider):
 
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
-        return tuple()
+        return (
+            CONF_ENTRY_VOLUME_NORMALIZATION,
+            CONF_ENTRY_AUTO_PLAY,
+            CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
+        )
 
     def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
         """Call (by config manager) when the configuration of a player changes."""
@@ -46,45 +54,52 @@ class PlayerProvider(Provider):
         - player_id: player_id of the player to handle the command.
         """
 
-    @abstractmethod
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player.
 
         - player_id: player_id of the player to handle the command.
         """
+        # will only be called for players with Pause feature set.
 
-    @abstractmethod
-    async def cmd_play_url(
+    async def play_media(
         self,
         player_id: str,
-        url: str,
-        queue_item: QueueItem | None,
+        queue_item: QueueItem,
+        seek_position: int,
+        fade_in: bool,
     ) -> None:
-        """Send PLAY URL command to given player.
+        """Handle PLAY MEDIA on given player.
 
-        This is called when the Queue wants the player to start playing a specific url.
-        If an item from the Queue is being played, the QueueItem will be provided with
-        all metadata present.
+        This is called by the Queue controller to start playing a queue item on the given player.
+        The provider's own implementation should work out how to handle this request.
 
             - player_id: player_id of the player to handle the command.
-            - url: the url that the player should start playing.
-            - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+            - queue_item: The QueueItem that needs to be played on the player.
+            - seek_position: Optional seek to this position.
+            - fade_in: Optionally fade in the item at playback start.
         """
 
-    async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle StreamJob play command on given player.
+    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+        """
+        Handle enqueuing of the next queue item on the player.
 
-        This is called when the Queue wants the player to start playing media
-        to multiple subscribers at the same time using a MultiClientStreamJob.
-        The default implementation is that the URL to the stream is resolved for the player
-        and played like any regular play_url command, but implementation may override
-        this behavior for any more sophisticated handling (e.g. when syncing etc.)
+        If the player supports PlayerFeature.ENQUE_NEXT:
+          This will be called about 10 seconds before the end of the track.
+        If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
+          This will be called when the end of the track is reached.
 
-            - player_id: player_id of the player to handle the command.
-            - stream_job: the MultiClientStreamJob that the player should start playing.
+        A PlayerProvider implementation is in itself responsible for handling this
+        so that the queue items keep playing until its empty or the player stopped.
+
+        This will NOT be called if the end of the queue is reached (and repeat disabled).
+        This will NOT be called if the player is using flow mode to playback the queue.
         """
-        url = await stream_job.resolve_stream_url(player_id)
-        await self.cmd_play_url(player_id=player_id, url=url, queue_item=None)
+        # default implementation (for a player without enqueue_next feature):
+        # simply start playback of the next track.
+        # player providers need to override this behavior if/when needed
+        await self.play_media(
+            player_id=player_id, queue_item=queue_item, seek_position=0, fade_in=False
+        )
 
     async def cmd_power(self, player_id: str, powered: bool) -> None:
         """Send POWER command to given player.
index 6c464a10fd1e9f4624db4af589ce57d6e6ed11f5..b56f4373e45ce7905ea3dab1ac5cef6d78f039dd 100644 (file)
@@ -15,22 +15,17 @@ from typing import TYPE_CHECKING
 
 import aiofiles
 
-from music_assistant.common.models.config_entries import (
-    CONF_ENTRY_OUTPUT_CODEC,
-    ConfigEntry,
-    ConfigValueType,
-)
+from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
 from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.common.models.queue_item import QueueItem
 from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
     from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
+    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
     from music_assistant.server.providers.slimproto import SlimprotoProvider
 
@@ -93,9 +88,6 @@ PLAYER_CONFIG_ENTRIES = (
         "this number of seconds.",
         advanced=True,
     ),
-    ConfigEntry.from_dict(
-        {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": "flac", "hidden": True}
-    ),
 )
 
 NEED_BRIDGE_RESTART = {"values/read_ahead", "values/encryption", "values/alac_encode", "enabled"}
@@ -168,7 +160,7 @@ class AirplayProvider(PlayerProvider):
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
         slimproto_prov = self.mass.get_provider("slimproto")
         base_entries = await slimproto_prov.get_player_config_entries(player_id)
-        return tuple(base_entries + PLAYER_CONFIG_ENTRIES)
+        return base_entries + PLAYER_CONFIG_ENTRIES
 
     def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
         """Call (by config manager) when the configuration of a player changes."""
@@ -200,35 +192,37 @@ class AirplayProvider(PlayerProvider):
         slimproto_prov = self.mass.get_provider("slimproto")
         await slimproto_prov.cmd_play(player_id)
 
-    async def cmd_play_url(
+    async def play_media(
         self,
         player_id: str,
-        url: str,
-        queue_item: QueueItem | None,
+        queue_item: QueueItem,
+        seek_position: int,
+        fade_in: bool,
     ) -> None:
-        """Send PLAY URL command to given player.
+        """Handle PLAY MEDIA on given player.
 
-        This is called when the Queue wants the player to start playing a specific url.
-        If an item from the Queue is being played, the QueueItem will be provided with
-        all metadata present.
+        This is called by the Queue controller to start playing a queue item on the given player.
+        The provider's own implementation should work out how to handle this request.
 
             - player_id: player_id of the player to handle the command.
-            - url: the url that the player should start playing.
-            - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+            - queue_item: The QueueItem that needs to be played on the player.
+            - seek_position: Optional seek to this position.
+            - fade_in: Optionally fade in the item at playback start.
         """
         # simply forward to underlying slimproto player
         slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_play_url(
+        await slimproto_prov.play_media(
             player_id,
-            url=url,
             queue_item=queue_item,
+            seek_position=seek_position,
+            fade_in=fade_in,
         )
 
-    async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle StreamJob play command on given player."""
+    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+        """Handle enqueuing of the next queue item on the player."""
         # simply forward to underlying slimproto player
         slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_handle_stream_job(player_id=player_id, stream_job=stream_job)
+        await slimproto_prov.enqueue_next_queue_item(player_id, queue_item)
 
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player."""
index 5cb1e88504be34db734116591af451b92f1d5d5c..4088bce06e7e5e95bae0beb47673fc741d3f6113 100644 (file)
@@ -20,21 +20,23 @@ from pychromecast.models import CastInfo
 from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED
 
 from music_assistant.common.models.config_entries import (
+    CONF_ENTRY_CROSSFADE_DURATION,
     CONF_ENTRY_HIDE_GROUP_MEMBERS,
     ConfigEntry,
     ConfigValueType,
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
+    ContentType,
     MediaType,
     PlayerFeature,
     PlayerState,
     PlayerType,
 )
-from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
+from music_assistant.common.models.errors import PlayerUnavailableError
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS, MASS_LOGO_ONLINE
+from music_assistant.constants import CONF_CROSSFADE, CONF_LOG_LEVEL, CONF_PLAYERS, MASS_LOGO_ONLINE
 from music_assistant.server.models.player_provider import PlayerProvider
 
 from .helpers import CastStatusListener, ChromecastInfo
@@ -53,7 +55,17 @@ if TYPE_CHECKING:
 CONF_ALT_APP = "alt_app"
 
 
-BASE_PLAYER_CONFIG_ENTRIES = (
+PLAYER_CONFIG_ENTRIES = (
+    ConfigEntry(
+        key=CONF_CROSSFADE,
+        type=ConfigEntryType.BOOLEAN,
+        label="Enable crossfade",
+        default_value=False,
+        description="Enable a crossfade transition between (queue) tracks. \n"
+        "Note that Chromecast does not natively support crossfading so Music Assistant "
+        "uses a 'flow mode' workaround for this at the cost of on-player metadata.",
+        advanced=False,
+    ),
     ConfigEntry(
         key=CONF_ALT_APP,
         type=ConfigEntryType.BOOLEAN,
@@ -63,6 +75,7 @@ BASE_PLAYER_CONFIG_ENTRIES = (
         "the playback experience but may not work on non-Google hardware.",
         advanced=True,
     ),
+    CONF_ENTRY_CROSSFADE_DURATION,
 )
 
 
@@ -103,7 +116,6 @@ class CastPlayer:
     logger: Logger
     status_listener: CastStatusListener | None = None
     mz_controller: MultizoneController | None = None
-    next_url: str | None = None
     active_group: str | None = None
     current_queue_item_id: str | None = None
 
@@ -161,7 +173,8 @@ class ChromecastProvider(PlayerProvider):
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
         cast_player = self.castplayers.get(player_id)
-        entries = BASE_PLAYER_CONFIG_ENTRIES
+        base_entries = await super().get_player_config_entries(player_id)
+        entries = base_entries + PLAYER_CONFIG_ENTRIES
         if (
             cast_player
             and cast_player.cast_info.is_audio_group
@@ -187,26 +200,64 @@ class ChromecastProvider(PlayerProvider):
         castplayer = self.castplayers[player_id]
         await asyncio.to_thread(castplayer.cc.media_controller.play)
 
-    async def cmd_play_url(
+    async def cmd_pause(self, player_id: str) -> None:
+        """Send PAUSE command to given player."""
+        castplayer = self.castplayers[player_id]
+        await asyncio.to_thread(castplayer.cc.media_controller.pause)
+
+    async def cmd_power(self, player_id: str, powered: bool) -> None:
+        """Send POWER command to given player."""
+        castplayer = self.castplayers[player_id]
+        # set mute_as_power feature for group members
+        if castplayer.player.type == PlayerType.GROUP:
+            for child_player_id in castplayer.player.group_childs:
+                if child_player := self.mass.players.get(child_player_id):
+                    child_player.mute_as_power = powered
+        if powered:
+            await self._launch_app(castplayer)
+        else:
+            await asyncio.to_thread(castplayer.cc.quit_app)
+
+    async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+        """Send VOLUME_SET command to given player."""
+        castplayer = self.castplayers[player_id]
+        await asyncio.to_thread(castplayer.cc.set_volume, volume_level / 100)
+
+    async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
+        """Send VOLUME MUTE command to given player."""
+        castplayer = self.castplayers[player_id]
+        await asyncio.to_thread(castplayer.cc.set_volume_muted, muted)
+
+    async def play_media(
         self,
         player_id: str,
-        url: str,
-        queue_item: QueueItem | None,
+        queue_item: QueueItem,
+        seek_position: int,
+        fade_in: bool,
     ) -> None:
-        """Send PLAY URL command to given player.
+        """Handle PLAY MEDIA on given player.
 
-        This is called when the Queue wants the player to start playing a specific url.
-        If an item from the Queue is being played, the QueueItem will be provided with
-        all metadata present.
+        This is called by the Queue controller to start playing a queue item on the given player.
+        The provider's own implementation should work out how to handle this request.
 
             - player_id: player_id of the player to handle the command.
-            - url: the url that the player should start playing.
-            - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+            - queue_item: The QueueItem that needs to be played on the player.
+            - seek_position: Optional seek to this position.
+            - fade_in: Optionally fade in the item at playback start.
         """
         castplayer = self.castplayers[player_id]
-
-        # in flow/direct url mode, we just send the url and the metadata is of no use
-        if not queue_item:
+        # Google cast does not support crossfading so we use flow mode to provide this feature
+        use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
+        url = await self.mass.streams.resolve_stream_url(
+            queue_item=queue_item,
+            output_codec=ContentType.FLAC,
+            seek_position=seek_position,
+            fade_in=fade_in,
+            flow_mode=use_flow_mode,
+        )
+        if use_flow_mode:
+            # In flow mode, all queue tracks are sent to the player as continuous stream.
+            # This comes at the cost of metadata (cast does not support ICY metadata).
             await asyncio.to_thread(
                 castplayer.cc.play_media,
                 url,
@@ -215,8 +266,8 @@ class ChromecastProvider(PlayerProvider):
                 thumb=MASS_LOGO_ONLINE,
             )
             return
-
-        cc_queue_items = [self._create_queue_item(queue_item, url)]
+        # handle normal playback using the chromecast queue to play items one by one
+        cc_queue_items = [self._create_cc_queue_item(queue_item, url)]
         queuedata = {
             "type": "QUEUE_LOAD",
             "repeatMode": "REPEAT_OFF",  # handled by our queue controller
@@ -225,40 +276,27 @@ class ChromecastProvider(PlayerProvider):
             "startIndex": 0,  # Item index to play after this request or keep same item if undefined
             "items": cc_queue_items,
         }
-        # make sure that media controller app is launched
+        # make sure that the media controller app is launched
         await self._launch_app(castplayer)
         # send queue info to the CC
-        castplayer.next_url = None
         media_controller = castplayer.cc.media_controller
         await asyncio.to_thread(media_controller.send_message, queuedata, True)
 
-    async def cmd_pause(self, player_id: str) -> None:
-        """Send PAUSE command to given player."""
+    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+        """Handle enqueuing of the next queue item on the player."""
         castplayer = self.castplayers[player_id]
-        await asyncio.to_thread(castplayer.cc.media_controller.pause)
-
-    async def cmd_power(self, player_id: str, powered: bool) -> None:
-        """Send POWER command to given player."""
-        castplayer = self.castplayers[player_id]
-        # set mute_as_power feature for group members
-        if castplayer.player.type == PlayerType.GROUP:
-            for child_player_id in castplayer.player.group_childs:
-                if child_player := self.mass.players.get(child_player_id):
-                    child_player.mute_as_power = powered
-        if powered:
-            await self._launch_app(castplayer)
-        else:
-            await asyncio.to_thread(castplayer.cc.quit_app)
-
-    async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
-        """Send VOLUME_SET command to given player."""
-        castplayer = self.castplayers[player_id]
-        await asyncio.to_thread(castplayer.cc.set_volume, volume_level / 100)
-
-    async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
-        """Send VOLUME MUTE command to given player."""
-        castplayer = self.castplayers[player_id]
-        await asyncio.to_thread(castplayer.cc.set_volume_muted, muted)
+        url = await self.mass.streams.resolve_stream_url(
+            queue_item=queue_item,
+            output_codec=ContentType.FLAC,
+        )
+        queuedata = {
+            "type": "QUEUE_INSERT",
+            "insertBefore": None,
+            "items": [self._create_cc_queue_item(queue_item, url)],
+        }
+        media_controller = castplayer.cc.media_controller
+        queuedata["mediaSessionId"] = media_controller.status.media_session_id
+        await asyncio.to_thread(media_controller.send_message, queuedata, True)
 
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates.
@@ -358,6 +396,8 @@ class ChromecastProvider(PlayerProvider):
                         PlayerFeature.POWER,
                         PlayerFeature.VOLUME_MUTE,
                         PlayerFeature.VOLUME_SET,
+                        PlayerFeature.ENQUEUE_NEXT,
+                        PlayerFeature.PAUSE,
                     ),
                     max_sample_rate=96000,
                     supports_24bit=True,
@@ -430,7 +470,6 @@ class ChromecastProvider(PlayerProvider):
         """Handle updated MediaStatus."""
         castplayer.logger.debug("Received media status update: %s", status.player_state)
         # player state
-        prev_state = castplayer.player.state
         if status.player_is_playing:
             castplayer.player.state = PlayerState.PLAYING
         elif status.player_is_paused:
@@ -445,36 +484,24 @@ class ChromecastProvider(PlayerProvider):
         else:
             castplayer.player.elapsed_time = status.current_time
 
+        # active source
+        if status.content_id and castplayer.player_id in status.content_id:
+            castplayer.player.active_source = castplayer.player_id
+        else:
+            castplayer.player.active_source = castplayer.cc.app_display_name
+
         # current media
-        castplayer.player.current_url = status.content_id
+        castplayer.player.current_item_id = status.content_id
         self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
 
-        # enqueue next item if player is almost at the end of the track
-        if (  # noqa: SIM114
-            castplayer.player.state == PlayerState.PLAYING
-            and castplayer.player.active_source == castplayer.player.player_id
-            and (queue := self.mass.player_queues.get(castplayer.player_id))
-            and (current_item := queue.current_item)
-            and current_item.duration
-            and (current_item.duration - castplayer.player.elapsed_time) <= 10
-        ):
-            asyncio.run_coroutine_threadsafe(self._enqueue_next_track(castplayer), self.mass.loop)
-        # failsafe enqueue next item if player stopped at the end of the track
-        elif (
-            castplayer.player.state == PlayerState.IDLE
-            and prev_state == PlayerState.PLAYING
-            and castplayer.player.active_source == castplayer.player.player_id
-            and castplayer.player.current_url == castplayer.next_url
-        ):
-            asyncio.run_coroutine_threadsafe(self._enqueue_next_track(castplayer), self.mass.loop)
-        # handle end of MA queue - set current item to None
-        elif (
+        # handle end of MA queue - reset current_item_id
+        if (
             castplayer.player.state == PlayerState.IDLE
-            and castplayer.player.current_url
+            and castplayer.player.current_item_id
             and (queue := self.mass.player_queues.get(castplayer.player_id))
             and queue.next_item is None
         ):
-            castplayer.player.current_url = None
+            castplayer.player.current_item_id = None
 
     def on_new_connection_status(self, castplayer: CastPlayer, status: ConnectionStatus) -> None:
         """Handle updated ConnectionStatus."""
@@ -508,49 +535,6 @@ class ChromecastProvider(PlayerProvider):
 
     ### Helpers / utils
 
-    async def _enqueue_next_track(self, castplayer: CastPlayer) -> None:
-        """Enqueue the next track of the MA queue on the CC queue."""
-        try:
-            next_url, next_item, _ = await self.mass.player_queues.preload_next_url(
-                castplayer.player_id, castplayer.current_queue_item_id
-            )
-        except QueueEmpty:
-            return
-
-        if castplayer.next_url == next_url:
-            return  # already set ?!
-        castplayer.next_url = next_url
-        castplayer.current_queue_item_id = next_item.queue_item_id
-
-        # in flow/direct url mode, we just send the url and the metadata is of no use
-        if not next_item:
-            await asyncio.to_thread(
-                castplayer.cc.play_media,
-                next_url,
-                content_type=f'audio/{next_url.split(".")[-1].split("?")[0]}',
-                title="Music Assistant",
-                thumb=MASS_LOGO_ONLINE,
-                enqueue=True,
-                media_info={
-                    "customData": {
-                        "queue_item_id": "flow",
-                    }
-                },
-            )
-            return
-        cc_queue_items = [self._create_queue_item(next_item, next_url)]
-
-        queuedata = {
-            "type": "QUEUE_INSERT",
-            "insertBefore": None,
-            "items": cc_queue_items,
-        }
-        media_controller = castplayer.cc.media_controller
-        queuedata["mediaSessionId"] = media_controller.status.media_session_id
-
-        await asyncio.sleep(0.5)  # throttle commands to CC a bit or it will crash
-        await asyncio.to_thread(media_controller.send_message, queuedata, True)
-
     async def _launch_app(self, castplayer: CastPlayer) -> None:
         """Launch the default Media Receiver App on a Chromecast."""
         event = asyncio.Event()
@@ -599,7 +583,7 @@ class ChromecastProvider(PlayerProvider):
         castplayer.status_listener = None
         self.castplayers.pop(castplayer.player_id, None)
 
-    def _create_queue_item(self, queue_item: QueueItem, stream_url: str):
+    def _create_cc_queue_item(self, queue_item: QueueItem, stream_url: str):
         """Create CC queue item from MA QueueItem."""
         duration = int(queue_item.duration) if queue_item.duration else None
         image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
index 7db38698fc8302b84de9958b4337205c5837fd07..94f827fa3969a9121007574117becff7e4b365cb 100644 (file)
@@ -24,17 +24,23 @@ from async_upnp_client.profiles.dlna import DmrDevice, TransportState
 from async_upnp_client.search import async_search
 from async_upnp_client.utils import CaseInsensitiveDict
 
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant.common.models.config_entries import (
+    CONF_ENTRY_CROSSFADE_DURATION,
+    CONF_ENTRY_FLOW_MODE,
+    ConfigEntry,
+    ConfigValueType,
+)
 from music_assistant.common.models.enums import (
     ConfigEntryType,
+    ContentType,
     PlayerFeature,
     PlayerState,
     PlayerType,
 )
-from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
+from music_assistant.common.models.errors import PlayerUnavailableError
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_PLAYERS
+from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, CONF_PLAYERS
 from music_assistant.server.helpers.didl_lite import create_didl_metadata
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -46,13 +52,50 @@ if TYPE_CHECKING:
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
 
-PLAYER_FEATURES = (
-    PlayerFeature.SET_MEMBERS,
-    PlayerFeature.SYNC,
+BASE_PLAYER_FEATURES = (
     PlayerFeature.VOLUME_MUTE,
     PlayerFeature.VOLUME_SET,
 )
 
+CONF_ENQUEUE_NEXT = "enqueue_next"
+CONF_ENFORCE_MP3 = "enforce_mp3"
+
+PLAYER_CONFIG_ENTRIES = (
+    ConfigEntry(
+        key=CONF_ENQUEUE_NEXT,
+        type=ConfigEntryType.BOOLEAN,
+        label="Player supports enqueue next/gapless",
+        default_value=False,
+        description="If the player supports enqueuing the next item for fluid/gapless playback. "
+        "\n\nUnfortunately this feature is missing or broken on many DLNA players. \n"
+        "Enable it with care. If music stops after one song, disable this setting.",
+    ),
+    ConfigEntry(
+        key=CONF_CROSSFADE,
+        type=ConfigEntryType.BOOLEAN,
+        label="Enable crossfade",
+        default_value=False,
+        description="Enable a crossfade transition between (queue) tracks. \n\n"
+        "Note that DLNA does not natively support crossfading so you need to enable "
+        "the 'flow mode' workaround to use crossfading with DLNA players.",
+        advanced=False,
+        depends_on=CONF_FLOW_MODE,
+    ),
+    CONF_ENTRY_FLOW_MODE,
+    CONF_ENTRY_CROSSFADE_DURATION,
+    ConfigEntry(
+        key=CONF_ENFORCE_MP3,
+        type=ConfigEntryType.BOOLEAN,
+        label="Enforce (lossy) mp3 stream",
+        default_value=False,
+        description="By default, Music Assistant sends lossless, high quality audio "
+        "to all players. Some players can not deal with that and require the stream to be packed "
+        "into a lossy mp3 codec. \n\n "
+        "Only enable when needed. Saves some bandwidth at the cost of audio quality.",
+        advanced=True,
+    ),
+)
+
 CONF_NETWORK_SCAN = "network_scan"
 
 _DLNAPlayerProviderT = TypeVar("_DLNAPlayerProviderT", bound="DLNAPlayerProvider")
@@ -142,12 +185,7 @@ class DLNAPlayer:
     # Track BOOTID in SSDP advertisements for device changes
     bootid: int | None = None
     last_seen: float = field(default_factory=time.time)
-    next_url: str | None = None
-    next_item: QueueItem | None = None
-    supports_next_uri: bool | None = None
-    end_of_track_reached: float | None = None
     last_command: float = field(default_factory=time.time)
-    need_elapsed_time_workaround: bool = False
 
     def update_attributes(self):
         """Update attributes of the MA Player from DLNA state."""
@@ -160,7 +198,16 @@ class DLNAPlayer:
             self.player.volume_muted = self.device.is_volume_muted or False
             self.player.state = self.get_state(self.device)
             self.player.supported_features = self.get_supported_features(self.device)
-            self.player.current_url = self.device.current_track_uri or ""
+            self.player.current_item_id = self.device.current_track_uri or ""
+            if self.player.player_id in self.player.current_item_id:
+                self.player.active_source = self.player.player_id
+            elif "spotify" in self.player.current_item_id:
+                self.player.active_source = "spotify"
+            elif self.player.current_item_id.startswith("http"):
+                self.player.active_source = "http"
+            else:
+                # TODO: handle other possible sources here
+                self.player.active_source = None
             if self.device.media_position:
                 # only update elapsed_time if the device actually reports it
                 self.player.elapsed_time = float(self.device.media_position)
@@ -168,15 +215,6 @@ class DLNAPlayer:
                     self.player.elapsed_time_last_updated = (
                         self.device.media_position_updated_at.timestamp()
                     )
-            # some dlna players get stuck at the end of the track and won't
-            # automatically play the next track, try to workaround that
-            if (
-                self.device.media_duration
-                and self.player.corrected_elapsed_time
-                and self.player.state == PlayerState.PLAYING
-                and (self.device.media_duration - self.player.corrected_elapsed_time) <= 10
-            ):
-                self.end_of_track_reached = time.time()
         else:
             # device is unavailable
             self.player.available = False
@@ -220,9 +258,6 @@ class DLNAPlayer:
         if device.has_volume_mute:
             supported_features.add(PlayerFeature.VOLUME_MUTE)
 
-        if device.can_seek_rel_time or device.can_seek_abs_time:
-            supported_features.add(PlayerFeature.SEEK)
-
         return supported_features
 
 
@@ -257,19 +292,28 @@ class DLNAPlayerProvider(PlayerProvider):
             for dlna_player in self.dlnaplayers.values():
                 tg.create_task(self._device_disconnect(dlna_player))
 
+    async def get_player_config_entries(
+        self, player_id: str  # noqa: ARG002
+    ) -> tuple[ConfigEntry, ...]:
+        """Return all (provider/player specific) Config Entries for the given player (if any)."""
+        base_entries = await super().get_player_config_entries(player_id)
+        return base_entries + PLAYER_CONFIG_ENTRIES
+
     def on_player_config_changed(
         self, config: PlayerConfig, changed_keys: set[str]  # noqa: ARG002
     ) -> None:
         """Call (by config manager) when the configuration of a player changes."""
         # run discovery to catch any re-enabled players
         self.mass.create_task(self._run_discovery())
+        # reset player features based on config values
+        if not (dlna_player := self.dlnaplayers.get(config.player_id)):
+            return
+        self._set_player_features(dlna_player)
 
     @catch_request_errors
     async def cmd_stop(self, player_id: str) -> None:
         """Send STOP command to given player."""
         dlna_player = self.dlnaplayers[player_id]
-        dlna_player.end_of_track_reached = None
-        dlna_player.next_url = None
         assert dlna_player.device is not None
         await dlna_player.device.async_stop()
 
@@ -281,31 +325,42 @@ class DLNAPlayerProvider(PlayerProvider):
         await dlna_player.device.async_play()
 
     @catch_request_errors
-    async def cmd_play_url(
+    async def play_media(
         self,
         player_id: str,
-        url: str,
-        queue_item: QueueItem | None,
+        queue_item: QueueItem,
+        seek_position: int,
+        fade_in: bool,
     ) -> None:
-        """Send PLAY URL command to given player.
+        """Handle PLAY MEDIA on given player.
 
-        This is called when the Queue wants the player to start playing a specific url.
-        If an item from the Queue is being played, the QueueItem will be provided with
-        all metadata present.
+        This is called by the Queue controller to start playing a queue item on the given player.
+        The provider's own implementation should work out how to handle this request.
 
             - player_id: player_id of the player to handle the command.
-            - url: the url that the player should start playing.
-            - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+            - queue_item: The QueueItem that needs to be played on the player.
+            - seek_position: Optional seek to this position.
+            - fade_in: Optionally fade in the item at playback start.
         """
+        # DLNA players do not support crossfading so we enforce flow mode to provide this feature
+        use_flow_mode = await self.mass.config.get_player_config_value(
+            player_id, CONF_FLOW_MODE
+        ) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
+        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
+        url = await self.mass.streams.resolve_stream_url(
+            queue_item=queue_item,
+            output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
+            seek_position=seek_position,
+            fade_in=fade_in,
+            flow_mode=use_flow_mode,
+        )
         dlna_player = self.dlnaplayers[player_id]
-
         # always clear queue (by sending stop) first
         if dlna_player.device.can_stop:
             await self.cmd_stop(player_id)
-        dlna_player.next_url = None
-        dlna_player.end_of_track_reached = None
-
-        didl_metadata = create_didl_metadata(self.mass, url, queue_item)
+        didl_metadata = create_didl_metadata(
+            self.mass, url, queue_item if not use_flow_mode else None
+        )
         title = queue_item.name if queue_item else "Music Assistant"
         await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
         # Play it
@@ -316,13 +371,55 @@ class DLNAPlayerProvider(PlayerProvider):
         dlna_player.player.elapsed_time = 0
         dlna_player.player.elapsed_time_last_updated = now
         await dlna_player.device.async_play()
-
         # force poll the device
         for sleep in (1, 2):
             await asyncio.sleep(sleep)
             dlna_player.force_poll = True
             await self.poll_player(dlna_player.udn)
 
+    @catch_request_errors
+    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+        """
+        Handle enqueuing of the next queue item on the player.
+
+        If the player supports PlayerFeature.ENQUE_NEXT:
+          This will be called about 10 seconds before the end of the track.
+        If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
+          This will be called when the end of the track is reached.
+
+        A PlayerProvider implementation is in itself responsible for handling this
+        so that the queue items keep playing until its empty or the player stopped.
+
+        This will NOT be called if the end of the queue is reached (and repeat disabled).
+        This will NOT be called if flow mode is enabled on the queue.
+        """
+        dlna_player = self.dlnaplayers[player_id]
+        url = await self.mass.streams.resolve_stream_url(
+            queue_item=queue_item,
+            output_codec=ContentType.FLAC,
+        )
+        didl_metadata = create_didl_metadata(self.mass, url, queue_item)
+        title = queue_item.name
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENQUEUE_NEXT, False):
+            # use the 'next_transport_uri' feature
+            await dlna_player.device.async_set_next_transport_uri(url, title, didl_metadata)
+            self.logger.debug(
+                "Enqued next track (%s) to player %s",
+                title,
+                dlna_player.player.display_name,
+            )
+        else:
+            # simply use regular play command
+            await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
+            # Play it
+            await dlna_player.device.async_wait_for_can_play(10)
+            # optimistically set this timestamp to help in case of a player
+            # that does not report the progress
+            now = time.time()
+            dlna_player.player.elapsed_time = 0
+            dlna_player.player.elapsed_time_last_updated = now
+            await dlna_player.device.async_play()
+
     @catch_request_errors
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player."""
@@ -416,6 +513,9 @@ class DLNAPlayerProvider(PlayerProvider):
                 if ssdp_udn in discovered_devices:
                     # already processed this device
                     return
+                if "rincon" in ssdp_udn.lower():
+                    # ignore Sonos devices
+                    return
 
                 discovered_devices.add(ssdp_udn)
 
@@ -466,10 +566,9 @@ class DLNAPlayerProvider(PlayerProvider):
                 dlna_player.description_url = description_url
             else:
                 # new player detected, setup our DLNAPlayer wrapper
-
-                # ignore disabled players
                 conf_key = f"{CONF_PLAYERS}/{udn}/enabled"
                 enabled = self.mass.config.get(conf_key, True)
+                # ignore disabled players
                 if not enabled:
                     self.logger.debug("Ignoring disabled player: %s", udn)
                     return
@@ -485,7 +584,6 @@ class DLNAPlayerProvider(PlayerProvider):
                         name=udn,
                         available=False,
                         powered=False,
-                        supported_features=PLAYER_FEATURES,
                         # device info will be discovered later after connect
                         device_info=DeviceInfo(
                             model="unknown",
@@ -503,6 +601,7 @@ class DLNAPlayerProvider(PlayerProvider):
 
             await self._device_connect(dlna_player)
 
+            self._set_player_features(dlna_player)
             dlna_player.update_attributes()
             self.mass.players.register_or_update(dlna_player.player)
 
@@ -577,50 +676,12 @@ class DLNAPlayerProvider(PlayerProvider):
         dlna_player.last_seen = time.time()
         self.mass.create_task(self._update_player(dlna_player))
 
-    async def _enqueue_next_track(self, dlna_player: DLNAPlayer) -> None:
-        """Enqueue the next track of the MA queue on the CC queue."""
-        try:
-            (
-                next_url,
-                next_item,
-                _,
-            ) = await self.mass.player_queues.preload_next_url(dlna_player.udn)
-        except QueueEmpty:
-            return
-
-        if dlna_player.next_url == next_url:
-            return  # already set ?!
-        dlna_player.next_url = next_url
-        dlna_player.next_item = next_item
-
-        # no need to try setting the next url if we already know the player does not support it
-        if dlna_player.supports_next_uri is False:
-            return
-
-        # send queue item to dlna queue
-        didl_metadata = create_didl_metadata(self.mass, next_url, next_item)
-        title = next_item.name if next_item else "Music Assistant"
-        try:
-            await dlna_player.device.async_set_next_transport_uri(next_url, title, didl_metadata)
-        except UpnpError:
-            dlna_player.supports_next_uri = False
-            self.logger.info(
-                "Player does not support next transport uri feature, "
-                "gapless playback is not possible."
-            )
-
-        self.logger.debug(
-            "Enqued next track (%s) to player %s",
-            title,
-            dlna_player.player.display_name,
-        )
-
     async def _update_player(self, dlna_player: DLNAPlayer) -> None:
         """Update DLNA Player."""
-        prev_url = dlna_player.player.current_url
+        prev_url = dlna_player.player.current_item_id
         prev_state = dlna_player.player.state
         dlna_player.update_attributes()
-        current_url = dlna_player.player.current_url
+        current_url = dlna_player.player.current_item_id
         current_state = dlna_player.player.state
 
         if (prev_url != current_url) or (prev_state != current_state):
@@ -630,30 +691,11 @@ class DLNAPlayerProvider(PlayerProvider):
         # let the MA player manager work out if something actually updated
         self.mass.players.update(dlna_player.udn)
 
-        # enqueue next item if needed
-        if (
-            dlna_player.player.state == PlayerState.PLAYING
-            and dlna_player.player.active_source == dlna_player.player.player_id
-            and dlna_player.next_url in (None, dlna_player.player.current_url)
-            # prevent race conditions at start/stop by doing this check
-            and (time.time() - dlna_player.last_command) > 4
-        ):
-            self.mass.create_task(self._enqueue_next_track(dlna_player))
-        # if player does not support next uri, manual play it
-        if (
-            (
-                dlna_player.supports_next_uri is False
-                or (dlna_player.supports_next_uri is None and dlna_player.end_of_track_reached)
-            )
-            and prev_state == PlayerState.PLAYING
-            and current_state == PlayerState.IDLE
-            and dlna_player.next_url
-        ):
-            self.logger.warning(
-                "Player does not support next_uri and end of track reached, "
-                "sending next url manually."
+    def _set_player_features(self, dlna_player: DLNAPlayer) -> None:
+        """Set Player Features based on config values and capabilities."""
+        dlna_player.player.supported_features = BASE_PLAYER_FEATURES
+        player_id = dlna_player.player.player_id
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENQUEUE_NEXT, False):
+            dlna_player.player.supported_features = dlna_player.player.supported_features + (
+                PlayerFeature.ENQUEUE_NEXT,
             )
-            await self.cmd_play_url(dlna_player.udn, dlna_player.next_url, dlna_player.next_item)
-            dlna_player.end_of_track_reached = False
-            dlna_player.next_url = None
-            dlna_player.supports_next_uri = False
index d3e10b1ad8a674438310bfbd5d46286e6975ddff..a0e85a976110eae7d341065b93805f41e976717e 100644 (file)
@@ -5,7 +5,7 @@ import asyncio
 import statistics
 import time
 from collections import deque
-from collections.abc import Callable, Coroutine, Generator
+from collections.abc import Callable, Coroutine
 from contextlib import suppress
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any
@@ -17,7 +17,11 @@ from aioslimproto.const import EventType as SlimEventType
 from aioslimproto.discovery import start_discovery
 
 from music_assistant.common.models.config_entries import (
-    CONF_ENTRY_OUTPUT_CODEC,
+    CONF_ENTRY_CROSSFADE,
+    CONF_ENTRY_EQ_BASS,
+    CONF_ENTRY_EQ_MID,
+    CONF_ENTRY_EQ_TREBLE,
+    CONF_ENTRY_OUTPUT_CHANNELS,
     ConfigEntry,
     ConfigValueOption,
     ConfigValueType,
@@ -32,7 +36,7 @@ from music_assistant.common.models.enums import (
 from music_assistant.common.models.errors import QueueEmpty, SetupFailedError
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_CROSSFADE_DURATION, CONF_PORT
+from music_assistant.constants import CONF_CROSSFADE, CONF_CROSSFADE_DURATION, CONF_PORT
 from music_assistant.server.models.player_provider import PlayerProvider
 
 from .cli import LmsCli
@@ -41,9 +45,12 @@ if TYPE_CHECKING:
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
+
+# monkey patch the SlimClient
+SlimClient._process_stat_stmf = lambda x, y: None  # noqa: ARG005
+
 CACHE_KEY_PREV_STATE = "slimproto_prev_state"
 
 # sync constants
@@ -284,21 +291,13 @@ class SlimprotoProvider(PlayerProvider):
 
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
-        # pick default codec based on capabilities
-        default_codec = ContentType.PCM
-        if client := self._socket_clients.get(player_id):
-            for fmt, fmt_type in (
-                ("flc", ContentType.FLAC),
-                ("pcm", ContentType.PCM),
-                ("mp3", ContentType.MP3),
-            ):
-                if fmt in client.supported_codecs:
-                    default_codec = fmt_type
-                    break
+        base_entries = await super().get_player_config_entries(player_id)
+        if not (client := self._socket_clients.get(player_id)):
+            return base_entries
 
         # create preset entries (for players that support it)
         preset_entries = tuple()
-        if not (client and client.device_model in self._virtual_providers):
+        if client.device_model not in self._virtual_providers:
             presets = []
             async for playlist in self.mass.music.playlists.iter_library_items(True):
                 presets.append(ConfigValueOption(playlist.name, playlist.uri))
@@ -327,27 +326,33 @@ class SlimprotoProvider(PlayerProvider):
                 for index in range(1, preset_count + 1)
             )
 
-        return preset_entries + (
-            ConfigEntry(
-                key=CONF_SYNC_ADJUST,
-                type=ConfigEntryType.INTEGER,
-                range=(0, 1500),
-                default_value=0,
-                label="Audio synchronization delay correction",
-                description="If this player is playing audio synced with other players "
-                "and you always hear the audio too late on this player, "
-                "you can shift the audio a bit.",
-                advanced=True,
-            ),
-            CONF_ENTRY_CROSSFADE_DURATION,
-            ConfigEntry.from_dict(
-                {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": default_codec}
-            ),
+        return (
+            base_entries
+            + preset_entries
+            + (
+                CONF_ENTRY_CROSSFADE,
+                CONF_ENTRY_EQ_BASS,
+                CONF_ENTRY_EQ_MID,
+                CONF_ENTRY_EQ_TREBLE,
+                CONF_ENTRY_OUTPUT_CHANNELS,
+                CONF_ENTRY_CROSSFADE_DURATION,
+                ConfigEntry(
+                    key=CONF_SYNC_ADJUST,
+                    type=ConfigEntryType.INTEGER,
+                    range=(0, 1500),
+                    default_value=0,
+                    label="Audio synchronization delay correction",
+                    description="If this player is playing audio synced with other players "
+                    "and you always hear the audio too late on this player, "
+                    "you can shift the audio a bit.",
+                    advanced=True,
+                ),
+            )
         )
 
     async def cmd_stop(self, player_id: str) -> None:
         """Send STOP command to given player."""
-        # forward command to player and any connected sync child's
+        # forward command to player and any connected sync members
         for client in self._get_sync_clients(player_id):
             if client.state == SlimPlayerState.STOPPED:
                 continue
@@ -357,7 +362,7 @@ class SlimprotoProvider(PlayerProvider):
 
     async def cmd_play(self, player_id: str) -> None:
         """Send PLAY command to given player."""
-        # forward command to player and any connected sync child's
+        # forward command to player and any connected sync members
         async with asyncio.TaskGroup() as tg:
             for client in self._get_sync_clients(player_id):
                 if client.state not in (
@@ -368,64 +373,75 @@ class SlimprotoProvider(PlayerProvider):
                     continue
                 tg.create_task(client.play())
 
-    async def cmd_play_url(
+    async def play_media(
         self,
         player_id: str,
-        url: str,
-        queue_item: QueueItem | None,
+        queue_item: QueueItem,
+        seek_position: int,
+        fade_in: bool,
     ) -> None:
-        """Send PLAY URL command to given player.
+        """Handle PLAY MEDIA on given player.
 
-        This is called when the Queue wants the player to start playing a specific url.
-        If an item from the Queue is being played, the QueueItem will be provided with
-        all metadata present.
+        This is called by the Queue controller to start playing a queue item on the given player.
+        The provider's own implementation should work out how to handle this request.
 
             - player_id: player_id of the player to handle the command.
-            - url: the url that the player should start playing.
-            - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+            - queue_item: The QueueItem that needs to be played on the player.
+            - seek_position: Optional seek to this position.
+            - fade_in: Optionally fade in the item at playback start.
         """
-        # send stop first
-        await self.cmd_stop(player_id)
-
         player = self.mass.players.get(player_id)
         if player.synced_to:
             raise RuntimeError("A synced player cannot receive play commands directly")
-
-        # forward command to player and any connected sync child's
-        sync_clients = [x for x in self._get_sync_clients(player_id)]
-        async with asyncio.TaskGroup() as tg:
-            for client in sync_clients:
-                tg.create_task(
-                    self._handle_play_url(
-                        client,
-                        url=url,
-                        queue_item=queue_item,
-                        send_flush=True,
-                        auto_play=len(sync_clients) == 1,
-                    )
-                )
-
-    async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle StreamJob play command on given player."""
-        # send stop first
+        # stop any existing streams first
         await self.cmd_stop(player_id)
-
-        player = self.mass.players.get(player_id)
-        if player.synced_to:
-            raise RuntimeError("A synced player cannot receive play commands directly")
-        sync_clients = [x for x in self._get_sync_clients(player_id)]
-        async with asyncio.TaskGroup() as tg:
-            for client in sync_clients:
-                url = await stream_job.resolve_stream_url(client.player_id)
-                tg.create_task(
-                    self._handle_play_url(
-                        client,
-                        url=url,
-                        queue_item=None,
-                        send_flush=True,
-                        auto_play=len(sync_clients) == 1,
+        if player.group_childs:
+            # player has sync members, we need to start a multi client stream job
+            stream_job = await self.mass.streams.create_multi_client_stream_job(
+                queue_id=queue_item.queue_id,
+                start_queue_item=queue_item,
+                seek_position=int(seek_position),
+                fade_in=fade_in,
+            )
+            # forward command to player and any connected sync members
+            sync_clients = self._get_sync_clients(player_id)
+            async with asyncio.TaskGroup() as tg:
+                for client in sync_clients:
+                    tg.create_task(
+                        self._handle_play_url(
+                            client,
+                            url=stream_job.resolve_stream_url(
+                                client.player_id, output_codec=ContentType.FLAC
+                            ),
+                            queue_item=None,
+                            send_flush=True,
+                            auto_play=False,
+                        )
                     )
-                )
+        else:
+            # regular, single player playback
+            client = self._socket_clients[player_id]
+            url = await self.mass.streams.resolve_stream_url(
+                queue_item=queue_item,
+                # for now just hardcode flac as we assume that every (modern)
+                # slimproto based player can handle that just fine
+                output_codec=ContentType.FLAC,
+                seek_position=seek_position,
+                fade_in=fade_in,
+                flow_mode=False,
+            )
+            await self._handle_play_url(
+                client,
+                url=url,
+                queue_item=queue_item,
+                send_flush=True,
+                auto_play=True,
+            )
+
+    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+        """Handle enqueuing of the next queue item on the player."""
+        # we don't have to do anything,
+        # enqueuing the next item is handled in the buffer ready callback
 
     async def _handle_play_url(
         self,
@@ -433,12 +449,11 @@ class SlimprotoProvider(PlayerProvider):
         url: str,
         queue_item: QueueItem | None,
         send_flush: bool = True,
-        crossfade: bool = False,
         auto_play: bool = False,
     ) -> None:
-        """Handle PlayMedia on slimproto player(s)."""
+        """Handle playback of an url on slimproto player(s)."""
         player_id = client.player_id
-        if crossfade:
+        if crossfade := await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE):
             transition_duration = await self.mass.config.get_player_config_value(
                 player_id, CONF_CROSSFADE_DURATION
             )
@@ -450,7 +465,7 @@ class SlimprotoProvider(PlayerProvider):
             mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
             metadata={"item_id": queue_item.queue_item_id, "title": queue_item.name}
             if queue_item
-            else None,
+            else {"item_id": client.player_id, "title": "Music Assistant"},
             send_flush=send_flush,
             transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE,
             transition_duration=transition_duration,
@@ -462,7 +477,7 @@ class SlimprotoProvider(PlayerProvider):
 
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player."""
-        # forward command to player and any connected sync child's
+        # forward command to player and any connected sync members
         async with asyncio.TaskGroup() as tg:
             for client in self._get_sync_clients(player_id):
                 if client.state not in (
@@ -590,7 +605,12 @@ class SlimprotoProvider(PlayerProvider):
 
         # update player state on player events
         player.available = True
-        player.current_url = client.current_url
+        player.current_item_id = (
+            client.current_metadata.get("item_id")
+            if client.current_metadata
+            else client.current_url
+        )
+        player.active_source = player.player_id
         player.name = client.name
         player.powered = client.powered
         player.state = STATE_MAP[client.state]
@@ -632,7 +652,7 @@ class SlimprotoProvider(PlayerProvider):
         player = self.mass.players.get(client.player_id)
         sync_master_id = player.synced_to
         if not sync_master_id:
-            # we only correct sync child's, not the sync master itself
+            # we only correct sync members, not the sync master itself
             return
         if sync_master_id not in self._socket_clients:
             return  # just here as a guard as bad things can happen
@@ -712,24 +732,20 @@ class SlimprotoProvider(PlayerProvider):
             return
         if player.active_source != player.player_id:
             return
-        try:
-            next_url, next_item, crossfade = await self.mass.player_queues.preload_next_url(
-                client.player_id
+        with suppress(QueueEmpty):
+            next_item = await self.mass.player_queues.preload_next_item(client.player_id)
+            url = await self.mass.streams.resolve_stream_url(
+                queue_item=next_item,
+                output_codec=ContentType.FLAC,
+                flow_mode=False,
+            )
+            await self._handle_play_url(
+                client,
+                url=url,
+                queue_item=next_item,
+                send_flush=False,
+                auto_play=True,
             )
-            async with asyncio.TaskGroup() as tg:
-                for client in self._get_sync_clients(client.player_id):
-                    tg.create_task(
-                        self._handle_play_url(
-                            client,
-                            url=next_url,
-                            queue_item=next_item,
-                            send_flush=False,
-                            crossfade=crossfade,
-                            auto_play=True,
-                        )
-                    )
-        except QueueEmpty:
-            pass
 
     async def _handle_buffer_ready(self, client: SlimClient) -> None:
         """Handle buffer ready event, player has buffered a (new) track."""
@@ -825,15 +841,17 @@ class SlimprotoProvider(PlayerProvider):
         # https://wiki.slimdevices.com/index.php/SlimProto_TCP_protocol.html#u.2C_p.2C_a_.26_t_commands_and_replay_gain_field
         await client.send_strm(b"a", replay_gain=int(millis))
 
-    def _get_sync_clients(self, player_id: str) -> Generator[SlimClient]:
+    def _get_sync_clients(self, player_id: str) -> list[SlimClient]:
         """Get all sync clients for a player."""
         player = self.mass.players.get(player_id)
+        sync_clients: list[SlimClient] = []
         # we need to return the player itself too
         group_child_ids = {player_id}
         group_child_ids.update(player.group_childs)
         for child_id in group_child_ids:
             if client := self._socket_clients.get(child_id):
-                yield client
+                sync_clients.append(client)
+        return sync_clients
 
     def _get_corrected_elapsed_milliseconds(self, client: SlimClient) -> int:
         """Return corrected elapsed milliseconds."""
index 85c8ff1f68c36cab7612f9bc9bee5c4242ede59c..4a5f1505ad25ef9c635ac84b64236c390f0cf4f8 100644 (file)
@@ -915,9 +915,6 @@ class LmsCli:
             new_repeat_mode = repeat_map.get(int(arg))
             self.mass.player_queues.set_repeat(queue.queue_id, new_repeat_mode)
             return
-        if subcommand == "crossfade":
-            self.mass.player_queues.set_crossfade(queue.queue_id, bool(arg))
-            return
 
         self.logger.warning("Unhandled command: playlist/%s", subcommand)
 
index 4ca85d81986387ab1cacd0ca46832c126be5c7fa..2660f223a6d587c923db6717e0e59f1bea4aa444 100644 (file)
@@ -2,34 +2,43 @@
 from __future__ import annotations
 
 import asyncio
+import random
 import time
-import uuid
-from typing import TYPE_CHECKING
+from contextlib import suppress
+from typing import TYPE_CHECKING, cast
 
-from ffmpeg import FFmpegError
-from ffmpeg.asyncio import FFmpeg
 from snapcast.control import create_server
-from snapcast.control.client import Snapclient as SnapClient
-from snapcast.control.group import Snapgroup as SnapGroup
-from snapcast.control.stream import Snapstream as SnapStream
-
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from snapcast.control.client import Snapclient
+from snapcast.control.group import Snapgroup
+from snapcast.control.stream import Snapstream
+
+from music_assistant.common.models.config_entries import (
+    CONF_ENTRY_CROSSFADE,
+    CONF_ENTRY_CROSSFADE_DURATION,
+    ConfigEntry,
+    ConfigValueType,
+)
 from music_assistant.common.models.enums import (
     ConfigEntryType,
+    ContentType,
     PlayerFeature,
     PlayerState,
     PlayerType,
 )
 from music_assistant.common.models.errors import SetupFailedError
+from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.queue_item import QueueItem
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
+    from snapcast.control.server import Snapserver
+
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
+
 CONF_SNAPCAST_SERVER_HOST = "snapcast_server_host"
 CONF_SNAPCAST_SERVER_CONTROL_PORT = "snapcast_server_control_port"
 
@@ -84,14 +93,16 @@ async def get_config_entries(
 class SnapCastProvider(PlayerProvider):
     """Player provider for Snapcast based players."""
 
-    _snapserver: [asyncio.Server | asyncio.BaseTransport]
+    _snapserver: Snapserver
     snapcast_server_host: str
     snapcast_server_control_port: int
+    _stream_tasks: dict[str, asyncio.Task]
 
     async def handle_setup(self) -> None:
         """Handle async initialization of the provider."""
         self.snapcast_server_host = self.config.get_value(CONF_SNAPCAST_SERVER_HOST)
         self.snapcast_server_control_port = self.config.get_value(CONF_SNAPCAST_SERVER_CONTROL_PORT)
+        self._stream_tasks = {}
         try:
             self._snapserver = await create_server(
                 self.mass.loop,
@@ -105,8 +116,8 @@ class SnapCastProvider(PlayerProvider):
                 f"Started Snapserver connection on:"
                 f"{self.snapcast_server_host}:{self.snapcast_server_control_port}"
             )
-        except OSError:
-            raise SetupFailedError("Unable to start the Snapserver connection ?")
+        except OSError as err:
+            raise SetupFailedError("Unable to start the Snapserver connection ?") from err
 
     def _handle_update(self) -> None:
         """Process Snapcast init Player/Group and set callback ."""
@@ -118,18 +129,17 @@ class SnapCastProvider(PlayerProvider):
         for snap_group in self._snapserver.groups:
             snap_group.set_callback(self._handle_group_update)
 
-    def _handle_group_update(self, snap_group: SnapGroup) -> None:  # noqa: ARG002
+    def _handle_group_update(self, snap_group: Snapgroup) -> None:  # noqa: ARG002
         """Process Snapcast group callback."""
         for snap_client in self._snapserver.clients:
             self._handle_player_update(snap_client)
 
-    def _handle_player_init(self, snap_client: SnapClient) -> None:
+    def _handle_player_init(self, snap_client: Snapclient) -> None:
         """Process Snapcast add to Player controller."""
         player_id = snap_client.identifier
         player = self.mass.players.get(player_id, raise_unavailable=False)
         if not player:
-            snap_client = self._snapserver.client(player_id)
-            self.mass.create_task(self._set_snapclient_empty_stream(player_id))
+            snap_client = cast(Snapclient, self._snapserver.client(player_id))
             player = Player(
                 player_id=player_id,
                 provider=self.domain,
@@ -150,7 +160,7 @@ class SnapCastProvider(PlayerProvider):
             )
         self.mass.players.register_or_update(player)
 
-    def _handle_player_update(self, snap_client: SnapClient) -> None:
+    def _handle_player_update(self, snap_client: Snapclient) -> None:
         """Process Snapcast update to Player controller."""
         player_id = snap_client.identifier
         player = self.mass.players.get(player_id)
@@ -165,6 +175,10 @@ class SnapCastProvider(PlayerProvider):
         )
         player.synced_to = self._synced_to(player_id)
         player.group_childs = self._group_childs(player_id)
+        if player.current_item_id and player_id in player.current_item_id:
+            player.active_source = player_id
+        elif stream := self._get_snapstream(player_id):
+            player.active_source = stream.name
         self.mass.players.register_or_update(player)
 
     async def unload(self) -> None:
@@ -173,85 +187,31 @@ class SnapCastProvider(PlayerProvider):
             await self.cmd_stop(client.identifier)
         self._snapserver.stop()
 
+    async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
+        """Return all (provider/player specific) Config Entries for the given player (if any)."""
+        base_entries = await super().get_player_config_entries(player_id)
+        return base_entries + (
+            CONF_ENTRY_CROSSFADE,
+            CONF_ENTRY_CROSSFADE_DURATION,
+        )
+
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player."""
-        mass_player = self.mass.players.get(player_id)
-        if mass_player.volume_level != volume_level:
-            await self._snapserver.client_volume(
-                player_id, {"percent": volume_level, "muted": mass_player.volume_muted}
-            )
-            self.cmd_volume_mute(player_id, False)
-
-    async def cmd_play_url(
-        self,
-        player_id: str,
-        url: str,
-        queue_item: QueueItem | None,  # noqa: ARG002
-    ) -> None:
-        """Send PLAY URL command to given player.
-
-        This is called when the Queue wants the player to start playing a specific url.
-        If an item from the Queue is being played, the QueueItem will be provided with
-        all metadata present.
-
-            - player_id: player_id of the player to handle the command.
-            - url: the url that the player should start playing.
-            - queue_item: the QueueItem that is related to the URL (None when playing direct url).
-        """
-        player = self.mass.players.get(player_id)
-        stream = await self._set_snapclient_empty_stream(player_id)
-
-        stream_host = stream._stream.get("uri").get("host")
-        stream_host = stream_host.replace("0.0.0.0", self.snapcast_server_host)
-        ffmpeg = (
-            FFmpeg()
-            .option("y")
-            .option("re")
-            .input(url=url)
-            .output(
-                f"tcp://{stream_host}",
-                f="s16le",
-                acodec="pcm_s16le",
-                ac=2,
-                ar=48000,
-            )
+        await self._snapserver.client_volume(
+            player_id, {"percent": volume_level, "muted": volume_level != 0}
         )
 
-        await self.cmd_stop(player_id)
-
-        ffmpeg_task = self.mass.create_task(ffmpeg.execute())
-
-        @ffmpeg.on("start")
-        async def on_start(arguments: list[str]):
-            self.logger.debug("Ffmpeg stream is running")
-            stream.ffmpeg = ffmpeg
-            stream.ffmpeg_task = ffmpeg_task
-            player.current_url = url
-            player.elapsed_time = 0
-            player.elapsed_time_last_updated = time.time()
-            player.state = PlayerState.PLAYING
-            self._set_childs_state(player_id, PlayerState.PLAYING)
-            self.mass.players.register_or_update(player)
-
     async def cmd_stop(self, player_id: str) -> None:
         """Send STOP command to given player."""
         player = self.mass.players.get(player_id, raise_unavailable=False)
-        if player.state != PlayerState.IDLE:
-            stream = self._get_snapstream(player_id)
-            if hasattr(stream, "ffmpeg_task") and stream.ffmpeg_task.done() is False:
-                try:
-                    stream.ffmpeg.terminate()
-                    stream.ffmpeg_task.cancel()
-                    self.logger.debug("ffmpeg player stopped")
-                except FFmpegError:
-                    self.logger.debug("Fail to stop ffmpeg player")
-                player.state = PlayerState.IDLE
-                self._set_childs_state(player_id, PlayerState.IDLE)
-                self.mass.players.register_or_update(player)
-
-    async def cmd_pause(self, player_id: str) -> None:
-        """Send PAUSE command to given player."""
-        await self.cmd_stop(player_id)
+        if stream_task := self._stream_tasks.pop(player_id, None):  # noqa: SIM102
+            if not stream_task.done():
+                stream_task.cancel()
+        player.state = PlayerState.IDLE
+        self._set_childs_state(player_id, PlayerState.IDLE)
+        self.mass.players.register_or_update(player)
+        # assign default/empty stream to the player
+        await self._get_snapgroup(player_id).set_stream("default")
 
     async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
         """Send MUTE command to given player."""
@@ -266,20 +226,87 @@ class SnapCastProvider(PlayerProvider):
         """Unsync Snapcast player."""
         group = self._get_snapgroup(player_id)
         await group.remove_client(player_id)
-        group = self._get_snapgroup(player_id)
-        stream_id = await self._get_empty_stream()
-        await group.set_stream(stream_id)
+        # assign default/empty stream to the player
+        await self._get_snapgroup(player_id).set_stream("default")
         self._handle_update()
 
-    def _get_snapgroup(self, player_id: str) -> SnapGroup:
+    async def play_media(
+        self,
+        player_id: str,
+        queue_item: QueueItem,
+        seek_position: int,
+        fade_in: bool,
+    ) -> None:
+        """Handle PLAY MEDIA on given player.
+
+        This is called by the Queue controller to start playing a queue item on the given player.
+        The provider's own implementation should work out how to handle this request.
+
+            - player_id: player_id of the player to handle the command.
+            - queue_item: The QueueItem that needs to be played on the player.
+            - seek_position: Optional seek to this position.
+            - fade_in: Optionally fade in the item at playback start.
+        """
+        player = self.mass.players.get(player_id)
+        if player.synced_to:
+            raise RuntimeError("A synced player cannot receive play commands directly")
+        # stop any existing streams first
+        await self.cmd_stop(player_id)
+        queue = self.mass.player_queues.get(queue_item.queue_id)
+        stream, port = await self._create_stream()
+        snap_group = self._get_snapgroup(player_id)
+        await snap_group.set_stream(stream.identifier)
+
+        async def queue_streamer():
+            host = self.snapcast_server_host
+            _, writer = await asyncio.open_connection(host, port)
+            self.logger.debug("Opened connection to %s:%s", host, port)
+            player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
+            player.elapsed_time = 0
+            player.elapsed_time_last_updated = time.time()
+            player.state = PlayerState.PLAYING
+            self._set_childs_state(player_id, PlayerState.PLAYING)
+            self.mass.players.register_or_update(player)
+            # TODO: can we handle 24 bits bit depth ?
+            pcm_format = AudioFormat(
+                content_type=ContentType.PCM_S16LE,
+                sample_rate=48000,
+                bit_depth=16,
+                channels=2,
+            )
+            try:
+                async for pcm_chunk in self.mass.streams.get_flow_stream(
+                    queue,
+                    start_queue_item=queue_item,
+                    pcm_format=pcm_format,
+                    seek_position=seek_position,
+                    fade_in=fade_in,
+                ):
+                    writer.write(pcm_chunk)
+                    await writer.drain()
+
+            finally:
+                await self._snapserver.stream_remove_stream(stream.identifier)
+                if writer.can_write_eof():
+                    writer.close()
+                if not writer.is_closing():
+                    writer.close()
+                self.logger.debug("Closed connection to %s:%s", host, port)
+
+        # start streaming the queue (pcm) audio in a background task
+        self._stream_tasks[player_id] = asyncio.create_task(queue_streamer())
+
+    def _get_snapgroup(self, player_id: str) -> Snapgroup:
         """Get snapcast group for given player_id."""
-        client = self._snapserver.client(player_id)
+        client: Snapclient = self._snapserver.client(player_id)
         return client.group
 
-    def _get_snapstream(self, player_id: str) -> SnapStream:
+    def _get_snapstream(self, player_id: str) -> Snapstream | None:
         """Get snapcast stream for given player_id."""
-        group = self._get_snapgroup(player_id)
-        return self._snapserver.stream(group.stream)
+        if group := self._get_snapgroup(player_id):
+            with suppress(KeyError):
+                return self._snapserver.stream(group.stream)
+        return None
 
     def _synced_to(self, player_id: str) -> str | None:
         """Return player_id of the player this player is synced to."""
@@ -292,26 +319,25 @@ class SnapCastProvider(PlayerProvider):
         snap_group = self._get_snapgroup(player_id)
         return {snap_client for snap_client in snap_group.clients if snap_client != player_id}
 
-    async def _get_empty_stream(self) -> str:
-        """Find or create empty stream on snapcast server.
-
-        This method ensures that there is a snapstream for each snapclient,
-        even if the snapserver only have one stream configured. This is needed
-        because the default config of snapserver is one stream on a named pipe.
-        """
-        used_streams = {group.stream for group in self._snapserver.groups}
-        for stream in self._snapserver.streams:
-            if stream.path == "" and stream.identifier not in used_streams:
-                return stream.identifier
-        port = 4953
-        name = str(uuid.uuid4())
-        while True:
-            new_stream = await self._snapserver.stream_add_stream(
-                f"tcp://0.0.0.0:{port}?name={f'MA_{name}'}&sampleformat=48000:16:2",
+    async def _create_stream(self) -> tuple[Snapstream, int]:
+        """Create new stream on snapcast server."""
+        attempts = 50
+        while attempts:
+            attempts -= 1
+            # pick a random port
+            port = random.randint(4953, 4953 + 200)
+            name = f"MusicAssistant--{port}"
+            result = await self._snapserver.stream_add_stream(
+                # TODO: can we handle 24 bits bit depth ?
+                f"tcp://0.0.0.0:{port}?name={name}&sampleformat=48000:16:2",
             )
-            if "id" in new_stream and new_stream["id"] not in used_streams:
-                return new_stream["id"]
-            port += 1
+            if "id" not in result:
+                # if the port is already taken, the result will be an error
+                self.logger.warning(result)
+                continue
+            stream = self._snapserver.stream(result["id"])
+            return (stream, port)
+        raise RuntimeError("Unable to create stream - No free port found?")
 
     def _get_player_state(self, player_id: str) -> PlayerState:
         """Return the state of the player."""
@@ -323,11 +349,4 @@ class SnapCastProvider(PlayerProvider):
         for child_player_id in self._group_childs(player_id):
             player = self.mass.players.get(child_player_id)
             player.state = state
-            self.mass.players.update(player)
-
-    async def _set_snapclient_empty_stream(self, player_id: str) -> SnapStream:
-        """Set the snapclient stream to empty and return new stream."""
-        new_stream_id = await self._get_empty_stream()
-        await self._get_snapgroup(player_id).set_stream(new_stream_id)
-        stream = self._snapserver.stream(new_stream_id)
-        return stream
+            self.mass.players.update(child_player_id)
index 007795d33761bd3d1c1f9838f56f07bb67ff5205..e35d65231935b722683dd35a0bdda0e6ecba5509 100644 (file)
@@ -14,17 +14,22 @@ from soco.events_base import Event as SonosEvent
 from soco.events_base import SubscriptionBase
 from soco.groups import ZoneGroup
 
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant.common.models.config_entries import (
+    CONF_ENTRY_CROSSFADE,
+    ConfigEntry,
+    ConfigValueType,
+)
 from music_assistant.common.models.enums import (
     ConfigEntryType,
+    ContentType,
     PlayerFeature,
     PlayerState,
     PlayerType,
 )
-from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
+from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_PLAYERS
+from music_assistant.constants import CONF_CROSSFADE, CONF_PLAYERS
 from music_assistant.server.helpers.didl_lite import create_didl_metadata
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -36,10 +41,10 @@ if TYPE_CHECKING:
 
 
 PLAYER_FEATURES = (
-    PlayerFeature.SET_MEMBERS,
     PlayerFeature.SYNC,
     PlayerFeature.VOLUME_MUTE,
     PlayerFeature.VOLUME_SET,
+    PlayerFeature.ENQUEUE_NEXT,
 )
 
 CONF_NETWORK_SCAN = "network_scan"
@@ -48,6 +53,19 @@ CONF_NETWORK_SCAN = "network_scan"
 # to allow coextistence with HA on the same host
 config.EVENT_LISTENER_PORT = 1700
 
+HIRES_MODELS = (
+    "Sonos Roam",
+    "Sonos Arc",
+    "Sonos Beam",
+    "Sonos Five",
+    "Sonos Move",
+    "Sonos One SL",
+    "Sonos Port",
+    "Sonos Amp",
+    "SYMFONISK Bookshelf",
+    "SYMFONISK Table Lamp",
+)
+
 
 async def setup(
     mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
@@ -92,7 +110,6 @@ class SonosPlayer:
     soco: soco.SoCo
     player: Player
     is_stereo_pair: bool = False
-    next_url: str | None = None
     elapsed_time: int = 0
     playback_started: float | None = None
     need_elapsed_time_workaround: bool = False
@@ -170,7 +187,16 @@ class SonosPlayer:
             self.playback_started = now
 
         # media info (track info)
-        self.player.current_url = self.track_info["uri"]
+        self.player.current_item_id = self.track_info["uri"]
+        if self.player.player_id in self.player.current_item_id:
+            self.player.active_source = self.player.player_id
+        elif "spotify" in self.player.current_item_id:
+            self.player.active_source = "spotify"
+        elif self.player.current_item_id.startswith("http"):
+            self.player.active_source = "http"
+        else:
+            # TODO: handle other possible sources here
+            self.player.active_source = None
         if not self.need_elapsed_time_workaround:
             self.player.elapsed_time = self.elapsed_time
             self.player.elapsed_time_last_updated = self.track_info_updated
@@ -262,12 +288,75 @@ class SonosPlayerProvider(PlayerProvider):
                     player.soco.end_direct_control_session()
         self.sonosplayers = None
 
+    async def get_player_config_entries(
+        self, player_id: str  # noqa: ARG002
+    ) -> tuple[ConfigEntry, ...]:
+        """Return Config Entries for the given player."""
+        base_entries = await super().get_player_config_entries(player_id)
+        if not (sonos_player := self.sonosplayers.get(player_id)):
+            return base_entries
+        return base_entries + (
+            CONF_ENTRY_CROSSFADE,
+            ConfigEntry(
+                key="sonos_bass",
+                type=ConfigEntryType.INTEGER,
+                label="Bass",
+                default_value=0,
+                range=(-10, 10),
+                description="Set the Bass level for the Sonos player",
+                value=sonos_player.soco.bass,
+                advanced=True,
+            ),
+            ConfigEntry(
+                key="sonos_treble",
+                type=ConfigEntryType.INTEGER,
+                label="Treble",
+                default_value=0,
+                range=(-10, 10),
+                description="Set the Treble level for the Sonos player",
+                value=sonos_player.soco.treble,
+                advanced=True,
+            ),
+            ConfigEntry(
+                key="sonos_loudness",
+                type=ConfigEntryType.BOOLEAN,
+                label="Loudness compensation",
+                default_value=True,
+                description="Enable loudness compensation on the Sonos player",
+                value=sonos_player.soco.loudness,
+                advanced=True,
+            ),
+        )
+
     def on_player_config_changed(
         self, config: PlayerConfig, changed_keys: set[str]  # noqa: ARG002
     ) -> None:
         """Call (by config manager) when the configuration of a player changes."""
-        # run discovery to catch any re-enabled players
-        self.mass.create_task(self._run_discovery())
+        if "enabled" in changed_keys:
+            # run discovery to catch any re-enabled players
+            self.mass.create_task(self._run_discovery())
+        if not (sonos_player := self.sonosplayers.get(config.player_id)):
+            return
+        if "values/sonos_bass" in changed_keys:
+            self.mass.create_task(
+                sonos_player.soco.renderingControl.SetBass,
+                [("InstanceID", 0), ("DesiredBass", config.get_value("sonos_bass"))],
+            )
+        if "values/sonos_treble" in changed_keys:
+            self.mass.create_task(
+                sonos_player.soco.renderingControl.SetTreble,
+                [("InstanceID", 0), ("DesiredTreble", config.get_value("sonos_treble"))],
+            )
+        if "values/sonos_loudness" in changed_keys:
+            loudness_value = "1" if config.get_value("sonos_loudness") else "0"
+            self.mass.create_task(
+                sonos_player.soco.renderingControl.SetLoudness,
+                [
+                    ("InstanceID", 0),
+                    ("Channel", "Master"),
+                    ("DesiredLoudness", loudness_value),
+                ],
+            )
 
     async def cmd_stop(self, player_id: str) -> None:
         """Send STOP command to given player."""
@@ -293,49 +382,6 @@ class SonosPlayerProvider(PlayerProvider):
             return
         await asyncio.to_thread(sonos_player.soco.play)
 
-    async def cmd_play_url(
-        self,
-        player_id: str,
-        url: str,
-        queue_item: QueueItem | None,
-    ) -> None:
-        """Send PLAY URL command to given player.
-
-        This is called when the Queue wants the player to start playing a specific url.
-        If an item from the Queue is being played, the QueueItem will be provided with
-        all metadata present.
-
-            - player_id: player_id of the player to handle the command.
-            - url: the url that the player should start playing.
-            - queue_item: the QueueItem that is related to the URL (None when playing direct url).
-        """
-        sonos_player = self.sonosplayers[player_id]
-        if not sonos_player.soco.is_coordinator:
-            self.logger.debug(
-                "Ignore PLAY_MEDIA command for %s: Player is synced to another player.",
-                player_id,
-            )
-            return
-        # always stop and clear queue first
-        sonos_player.next_url = None
-        await asyncio.to_thread(sonos_player.soco.stop)
-        await asyncio.to_thread(sonos_player.soco.clear_queue)
-
-        if queue_item is None:
-            # enforce mp3 radio mode for flow stream
-            url = url.replace(".flac", ".mp3").replace(".wav", ".mp3")
-            await asyncio.to_thread(
-                sonos_player.soco.play_uri, url, title="Music Assistant", force_radio=True
-            )
-        else:
-            await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
-            await asyncio.to_thread(sonos_player.soco.play_from_queue, 0)
-        # optimistically set this timestamp to help figure out elapsed time later
-        now = time.time()
-        sonos_player.playback_started = now
-        sonos_player.player.elapsed_time = 0
-        sonos_player.player.elapsed_time_last_updated = now
-
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player."""
         sonos_player = self.sonosplayers[player_id]
@@ -398,6 +444,80 @@ class SonosPlayerProvider(PlayerProvider):
             update_group_info=True,
         )
 
+    async def play_media(
+        self,
+        player_id: str,
+        queue_item: QueueItem,
+        seek_position: int,
+        fade_in: bool,
+    ) -> None:
+        """Handle PLAY MEDIA on given player.
+
+        This is called by the Queue controller to start playing a queue item on the given player.
+        The provider's own implementation should work out how to handle this request.
+
+            - player_id: player_id of the player to handle the command.
+            - queue_item: The QueueItem that needs to be played on the player.
+            - seek_position: Optional seek to this position.
+            - fade_in: Optionally fade in the item at playback start.
+        """
+        url = await self.mass.streams.resolve_stream_url(
+            queue_item=queue_item,
+            output_codec=ContentType.FLAC,
+            seek_position=seek_position,
+            fade_in=fade_in,
+            flow_mode=False,
+        )
+        sonos_player = self.sonosplayers[player_id]
+        if not sonos_player.soco.is_coordinator:
+            # this should be already handled by the player manager, but just in case...
+            raise PlayerCommandFailed(
+                f"Player {sonos_player.player.display_name} can not "
+                "accept play_media command, it is synced to another player."
+            )
+        # always stop and clear queue first
+        await asyncio.to_thread(sonos_player.soco.stop)
+        await asyncio.to_thread(sonos_player.soco.clear_queue)
+        await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
+        await asyncio.to_thread(sonos_player.soco.play_from_queue, 0)
+        # optimistically set this timestamp to help figure out elapsed time later
+        now = time.time()
+        sonos_player.playback_started = now
+        sonos_player.player.elapsed_time = 0
+        sonos_player.player.elapsed_time_last_updated = now
+
+    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+        """
+        Handle enqueuing of the next queue item on the player.
+
+        If the player supports PlayerFeature.ENQUE_NEXT:
+          This will be called about 10 seconds before the end of the track.
+        If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
+          This will be called when the end of the track is reached.
+
+        A PlayerProvider implementation is in itself responsible for handling this
+        so that the queue items keep playing until its empty or the player stopped.
+
+        This will NOT be called if the end of the queue is reached (and repeat disabled).
+        This will NOT be called if flow mode is enabled on the queue.
+        """
+        sonos_player = self.sonosplayers[player_id]
+        url = await self.mass.streams.resolve_stream_url(
+            queue_item=queue_item,
+            output_codec=ContentType.FLAC,
+        )
+        # set crossfade according to player setting
+        crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
+        if sonos_player.soco.cross_fade != crossfade:
+
+            def set_crossfade():
+                with suppress(Exception):
+                    sonos_player.soco.cross_fade = crossfade
+
+            await asyncio.to_thread(set_crossfade)
+
+        await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
+
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates.
 
@@ -489,12 +609,16 @@ class SonosPlayerProvider(PlayerProvider):
                     address=soco_device.ip_address,
                     manufacturer=self.name,
                 ),
-                max_sample_rate=48000,
-                supports_24bit=True,
+                max_sample_rate=441000,
+                supports_24bit=False,
             ),
             speaker_info=speaker_info,
             speaker_info_updated=time.time(),
         )
+        if speaker_info["model_name"] in HIRES_MODELS:
+            sonos_player.player.max_sample_rate = 48000
+            sonos_player.player.supports_24bit = True
+
         # poll all endpoints once and update attributes
         await sonos_player.check_poll()
         sonos_player.update_attributes()
@@ -560,36 +684,11 @@ class SonosPlayerProvider(PlayerProvider):
         sonos_player.group_info_updated = time.time()
         asyncio.run_coroutine_threadsafe(self._update_player(sonos_player), self.mass.loop)
 
-    async def _enqueue_next_track(self, sonos_player: SonosPlayer) -> None:
-        """Enqueue the next track of the MA queue on the CC queue."""
-        try:
-            next_url, next_item, crossfade = await self.mass.player_queues.preload_next_url(
-                sonos_player.player_id
-            )
-        except QueueEmpty:
-            return
-
-        if sonos_player.next_url == next_url:
-            return  # already set ?!
-        sonos_player.next_url = next_url
-
-        # set crossfade according to queue mode
-        if sonos_player.soco.cross_fade != crossfade:
-
-            def set_crossfade():
-                with suppress(Exception):
-                    sonos_player.soco.cross_fade = crossfade
-
-            await asyncio.to_thread(set_crossfade)
-
-        # send queue item to sonos queue
-        await self._enqueue_item(sonos_player, url=next_url, queue_item=next_item)
-
     async def _enqueue_item(
         self,
         sonos_player: SonosPlayer,
         url: str,
-        queue_item: QueueItem | None = None,
+        queue_item: QueueItem,
     ) -> None:
         """Enqueue a queue item to the Sonos player Queue."""
         metadata = create_didl_metadata(self.mass, url, queue_item)
@@ -612,13 +711,13 @@ class SonosPlayerProvider(PlayerProvider):
 
     async def _update_player(self, sonos_player: SonosPlayer, signal_update: bool = True) -> None:
         """Update Sonos Player."""
-        prev_url = sonos_player.player.current_url
+        prev_url = sonos_player.player.current_item_id
         prev_state = sonos_player.player.state
         sonos_player.update_attributes()
         sonos_player.player.can_sync_with = tuple(
             x for x in self.sonosplayers if x != sonos_player.player_id
         )
-        current_url = sonos_player.player.current_url
+        current_url = sonos_player.player.current_item_id
         current_state = sonos_player.player.state
 
         if (prev_url != current_url) or (prev_state != current_state):
@@ -635,14 +734,6 @@ class SonosPlayerProvider(PlayerProvider):
             # will detect changes to the player object itself
             self.mass.players.update(sonos_player.player_id)
 
-        # enqueue next item if needed
-        if (
-            sonos_player.player.state == PlayerState.PLAYING
-            and sonos_player.player.active_source == sonos_player.player.player_id
-            and sonos_player.next_url in (None, sonos_player.player.current_url)
-        ):
-            self.mass.create_task(self._enqueue_next_track(sonos_player))
-
 
 def _convert_state(sonos_state: str) -> PlayerState:
     """Convert Sonos state to PlayerState."""
index 4b6f91b47588e5d16b9af173498e2ac10e9ee009..1d2b03781ee15f275cde9e98c097201516d633aa 100644 (file)
@@ -9,8 +9,6 @@ from __future__ import annotations
 import asyncio
 from typing import TYPE_CHECKING
 
-import shortuuid
-
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_EQ_BASS,
     CONF_ENTRY_EQ_MID,
@@ -18,7 +16,6 @@ from music_assistant.common.models.config_entries import (
     CONF_ENTRY_FLOW_MODE,
     CONF_ENTRY_HIDE_GROUP_MEMBERS,
     CONF_ENTRY_OUTPUT_CHANNELS,
-    CONF_ENTRY_OUTPUT_CODEC,
     ConfigEntry,
     ConfigValueOption,
     ConfigValueType,
@@ -38,7 +35,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 
@@ -61,9 +57,6 @@ CONF_ENTRY_EQ_MID_HIDDEN = ConfigEntry.from_dict({**CONF_ENTRY_EQ_MID.to_dict(),
 CONF_ENTRY_EQ_TREBLE_HIDDEN = ConfigEntry.from_dict(
     {**CONF_ENTRY_EQ_TREBLE.to_dict(), "hidden": True}
 )
-CONF_ENTRY_OUTPUT_CODEC_HIDDEN = ConfigEntry.from_dict(
-    {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "hidden": True}
-)
 CONF_ENTRY_GROUPED_POWER_ON = ConfigEntry(
     key=CONF_GROUPED_POWER_ON,
     type=ConfigEntryType.BOOLEAN,
@@ -170,9 +163,8 @@ class UniversalGroupProvider(PlayerProvider):
                     PlayerFeature.PAUSE,
                     PlayerFeature.VOLUME_SET,
                     PlayerFeature.VOLUME_MUTE,
-                    PlayerFeature.SET_MEMBERS,
                 ),
-                max_sample_rate=96000,
+                max_sample_rate=48000,
                 supports_24bit=True,
                 active_source=conf_key,
                 group_childs=player_conf,
@@ -203,14 +195,6 @@ class UniversalGroupProvider(PlayerProvider):
                 "child players will be treated as (un)mute commands to prevent the small "
                 "interruption of music when the stream is restarted.",
             ),
-            CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO,
-            CONF_ENTRY_FORCED_FLOW_MODE,
-            # group player outputs to individual members so
-            # these settings make no sense, hide them
-            CONF_ENTRY_EQ_BASS_HIDDEN,
-            CONF_ENTRY_EQ_MID_HIDDEN,
-            CONF_ENTRY_EQ_TREBLE_HIDDEN,
-            CONF_ENTRY_OUTPUT_CODEC_HIDDEN,
         )
 
     async def cmd_stop(self, player_id: str) -> None:
@@ -236,36 +220,26 @@ class UniversalGroupProvider(PlayerProvider):
             ):
                 tg.create_task(self.mass.players.cmd_play(member.player_id))
 
-    async def cmd_play_url(
+    async def play_media(
         self,
         player_id: str,
-        url: str,
-        queue_item: QueueItem | None,
+        queue_item: QueueItem,
+        seek_position: int,
+        fade_in: bool,
     ) -> None:
-        """Send PLAY URL command to given player.
+        """Handle PLAY MEDIA on given player.
 
-        This is called when the Queue wants the player to start playing a specific url.
-        If an item from the Queue is being played, the QueueItem will be provided with
-        all metadata present.
+        This is called by the Queue controller to start playing a queue item on the given player.
+        The provider's own implementation should work out how to handle this request.
 
             - player_id: player_id of the player to handle the command.
-            - url: the url that the player should start playing.
-            - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+            - queue_item: The QueueItem that needs to be played on the player.
+            - seek_position: Optional seek to this position.
+            - fade_in: Optionally fade in the item at playback start.
         """
-        # send stop first
-        await self.cmd_stop(player_id)
-        # debounce
-        # this can potentially be called multiple times at the (near) exact time
-        # due to many child players being powered on (or resynced) at the same time
-        # debounce the command a bit by only letting through the last one.
-        self.debounce_id = debounce_id = shortuuid.uuid()
-        await asyncio.sleep(100)
-        if self.debounce_id != debounce_id:
-            return
         # power ON
         await self.cmd_power(player_id, True)
         group_player = self.mass.players.get(player_id)
-
         active_members = self._get_active_members(
             player_id, only_powered=True, skip_sync_childs=True
         )
@@ -279,43 +253,41 @@ class UniversalGroupProvider(PlayerProvider):
 
         group_player.extra_data["optimistic_state"] = PlayerState.PLAYING
 
-        # forward command to all (powered) group child's
+        # forward the command to all (sync master) group child's
         async with asyncio.TaskGroup() as tg:
             for member in active_members:
                 player_prov = self.mass.players.get_player_provider(member.player_id)
                 tg.create_task(
-                    player_prov.cmd_play_url(member.player_id, url=url, queue_item=queue_item)
+                    player_prov.play_media(
+                        member.player_id,
+                        queue_item=queue_item,
+                        seek_position=seek_position,
+                        fade_in=fade_in,
+                    )
                 )
 
-    async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle StreamJob play command on given player."""
-        #  send stop first
-        await self.cmd_stop(player_id)
-        # power ON
-        await self.cmd_power(player_id, True)
-        group_player = self.mass.players.get(player_id)
+    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+        """
+        Handle enqueuing of the next queue item on the player.
 
-        active_members = self._get_active_members(
-            player_id, only_powered=True, skip_sync_childs=True
-        )
-        if len(active_members) == 0:
-            self.logger.warning(
-                "Play media requested for player %s but no member players are powered, "
-                "the request will be ignored",
-                group_player.display_name,
-            )
-            return
+        If the player supports PlayerFeature.ENQUE_NEXT:
+          This will be called about 10 seconds before the end of the track.
+        If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
+          This will be called when the end of the track is reached.
 
-        group_player.extra_data["optimistic_state"] = PlayerState.PLAYING
+        A PlayerProvider implementation is in itself responsible for handling this
+        so that the queue items keep playing until its empty or the player stopped.
 
-        # forward command to all (powered) group child's
+        This will NOT be called if the end of the queue is reached (and repeat disabled).
+        This will NOT be called if the player is using flow mode to playback the queue.
+        """
+        # forward the command to all (sync master) group child's
         async with asyncio.TaskGroup() as tg:
-            for member in active_members:
+            for member in self._get_active_members(
+                player_id, only_powered=False, skip_sync_childs=True
+            ):
                 player_prov = self.mass.players.get_player_provider(member.player_id)
-                # we forward the stream_job to child to allow for nested groups etc
-                tg.create_task(
-                    player_prov.cmd_handle_stream_job(member.player_id, stream_job=stream_job)
-                )
+                tg.create_task(player_prov.enqueue_next_queue_item(member.player_id, queue_item))
 
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player."""
@@ -390,14 +362,14 @@ class UniversalGroupProvider(PlayerProvider):
         group_player = self.mass.players.get(player_id)
         if not group_player.powered:
             group_player.state = PlayerState.IDLE
+            group_player.active_source = None
             return
 
         all_members = self._get_active_members(
             player_id, only_powered=False, skip_sync_childs=False
         )
-        if all_members:
-            group_player.max_sample_rate = max(x.max_sample_rate for x in all_members)
         group_player.group_childs = list(x.player_id for x in all_members)
+        group_player.active_source = player_id
         # read the state from the first active group member
         for member in all_members:
             if member.synced_to:
@@ -408,7 +380,7 @@ class UniversalGroupProvider(PlayerProvider):
                 player_powered = member.powered
             if not player_powered:
                 continue
-            group_player.current_url = member.current_url
+            group_player.current_item_id = member.current_item_id
             group_player.elapsed_time = member.elapsed_time
             group_player.elapsed_time_last_updated = member.elapsed_time_last_updated
             group_player.state = member.state