Group playback fixes (#1144)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 15 Mar 2024 00:29:23 +0000 (01:29 +0100)
committerGitHub <noreply@github.com>
Fri, 15 Mar 2024 00:29:23 +0000 (01:29 +0100)
20 files changed:
music_assistant/common/models/media_items.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/controllers/webserver.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/didl_lite.py
music_assistant/server/helpers/process.py
music_assistant/server/helpers/tags.py
music_assistant/server/models/player_provider.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/chromecast/__init__.py
music_assistant/server/providers/dlna/__init__.py
music_assistant/server/providers/fully_kiosk/__init__.py
music_assistant/server/providers/hass_players/__init__.py
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/sonos/__init__.py
music_assistant/server/providers/spotify/__init__.py
music_assistant/server/providers/ugp/__init__.py

index 5c5e1b637bb6122679ecf05434c440e492d7661d..2727d7b2f1c6167c4cd7e6e49ba4855cb49698b1 100644 (file)
@@ -60,6 +60,12 @@ class AudioFormat(DataClassDictMixin):
         """Return the PCM sample size."""
         return int(self.sample_rate * (self.bit_depth / 8) * self.channels)
 
+    def __eq__(self, other: AudioFormat) -> bool:
+        """Check equality of two items."""
+        if not other:
+            return False
+        return self.output_format_str == other.output_format_str
+
 
 @dataclass(frozen=True, kw_only=True)
 class ProviderMapping(DataClassDictMixin):
index d49bebc920f3480143111eeec3d3c7dbe3a0919b..50fe0c6d0c0832fd8446d9825fd33c70e1c5c2d4 100644 (file)
@@ -821,7 +821,9 @@ class PlayerQueuesController(CoreController):
             try:
                 # Check if the QueueItem is playable. For example, YT Music returns Radio Items
                 # that are not playable which will stop playback.
-                await get_stream_details(mass=self.mass, queue_item=next_item)
+                next_item.streamdetails = await get_stream_details(
+                    mass=self.mass, queue_item=next_item
+                )
                 # Lazy load the full MediaItem for the QueueItem, making sure to get the
                 # maximum quality of thumbs
                 next_item.media_item = await self.mass.music.get_item_by_uri(next_item.uri)
index 3702c3775fa23f9044ffa264eac2946341d50aa8..812c27d9985eae9475724708a58e8225523a1004 100644 (file)
@@ -525,24 +525,26 @@ class PlayerController(CoreController):
 
         if group_player.powered == power:
             return  # nothing to do
+        # make sure to update the group power state
+        group_player.powered = power
 
         # always stop (group/master)player at power off
         if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
             await self.cmd_stop(player_id)
 
         async with asyncio.TaskGroup() as tg:
-            members_powered = False
+            any_member_powered = False
             for member in self.iter_group_members(group_player, only_powered=True):
-                members_powered = True
+                any_member_powered = True
                 if power:
                     # set active source to group player if the group (is going to be) powered
                     member.active_source = group_player.player_id
-                elif member.active_source == group_player.player_id:
+                else:
                     # turn off child player when group turns off
                     tg.create_task(self.cmd_power(member.player_id, False))
                     member.active_source = None
             # edge case: group turned on but no members are powered, power them all!
-            if not members_powered and power:
+            if not any_member_powered and power:
                 for member in self.iter_group_members(group_player, only_powered=False):
                     tg.create_task(self.cmd_power(member.player_id, True))
                     member.active_source = group_player.player_id
index 514e44e81c75e4ba778b90aeac77103c90de3832..d5dfaf971872712755c36c9e8e8d281951d0f96d 100644 (file)
@@ -13,13 +13,12 @@ import logging
 import time
 import urllib.parse
 from collections.abc import AsyncGenerator
-from contextlib import suppress
 from typing import TYPE_CHECKING
 
 import shortuuid
 from aiohttp import web
 
-from music_assistant.common.helpers.util import get_ip, select_free_port
+from music_assistant.common.helpers.util import empty_queue, get_ip, select_free_port
 from music_assistant.common.models.config_entries import (
     ConfigEntry,
     ConfigValueOption,
@@ -33,9 +32,6 @@ from music_assistant.constants import (
     CONF_BIND_PORT,
     CONF_CROSSFADE,
     CONF_CROSSFADE_DURATION,
-    CONF_EQ_BASS,
-    CONF_EQ_MID,
-    CONF_EQ_TREBLE,
     CONF_OUTPUT_CHANNELS,
     CONF_PUBLISH_IP,
     SILENCE_FILE,
@@ -43,10 +39,11 @@ from music_assistant.constants import (
 from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
+    get_ffmpeg_stream,
     get_media_stream,
+    get_player_filter_params,
     get_stream_details,
 )
-from music_assistant.server.helpers.process import AsyncProcess
 from music_assistant.server.helpers.util import get_ips
 from music_assistant.server.helpers.webserver import Webserver
 from music_assistant.server.models.core_controller import CoreController
@@ -56,6 +53,7 @@ if TYPE_CHECKING:
     from music_assistant.common.models.player import Player
     from music_assistant.common.models.player_queue import PlayerQueue
     from music_assistant.common.models.queue_item import QueueItem
+    from music_assistant.server import MusicAssistant
 
 
 DEFAULT_STREAM_HEADERS = {
@@ -67,88 +65,94 @@ DEFAULT_STREAM_HEADERS = {
     "icy-name": "Music Assistant",
     "icy-pub": "0",
 }
-FLOW_MAX_SAMPLE_RATE = 192000
+FLOW_MAX_SAMPLE_RATE = 96000
 FLOW_MAX_BIT_DEPTH = 24
 
 # pylint:disable=too-many-locals
 
 
-class MultiClientStreamJob:
+class MultiClientQueueStreamJob:
     """Representation of a (multiclient) Audio Queue stream job/task.
 
-    The whole idea here is that in case of a player (sync)group,
-    all client players receive the exact same (PCM) audio chunks from the source audio.
+    The whole idea here is that the queue stream audio can be sent to multiple
+    players at once. For example for (slimproto/airplay) syncgroups and universal group.
+    all client players receive the exact same audio chunks from the source audio,
+    encoded and/or resampled to the player's preferences.
     A StreamJob is tied to a Queue and streams the queue flow stream,
-    In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created.
+    In case a stream is restarted (e.g. when seeking),
+    a new MultiClientQueueStreamJob will be created.
     """
 
     _audio_task: asyncio.Task | None = None
 
     def __init__(
         self,
-        stream_controller: StreamsController,
-        queue_id: str,
+        mass: MusicAssistant,
+        pcm_audio_source: AsyncGenerator[bytes, None],
         pcm_format: AudioFormat,
-        start_queue_item: QueueItem,
-        seek_position: int = 0,
-        fade_in: bool = False,
+        expected_players: set[str],
+        auto_start: bool = True,
     ) -> None:
-        """Initialize MultiClientStreamJob instance."""
-        self.stream_controller = stream_controller
-        self.queue_id = queue_id
-        self.queue = self.stream_controller.mass.player_queues.get(queue_id)
-        assert self.queue  # just in case
+        """Initialize MultiClientQueueStreamJob instance."""
+        self.mass = mass
+        self.pcm_audio_source = pcm_audio_source
         self.pcm_format = pcm_format
-        self.start_queue_item = start_queue_item
-        self.seek_position = seek_position
-        self.fade_in = fade_in
+        self.expected_players = expected_players
         self.job_id = shortuuid.uuid()
-        self.expected_players: set[str] = set()
-        self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
+        self.auto_start = auto_start
         self.bytes_streamed: int = 0
-        self._all_clients_connected = asyncio.Event()
-        self.logger = stream_controller.logger.getChild(f"streamjob_{self.job_id}")
-        self._finished: bool = False
-        self.workaround_players_seen: set[str] = set()
-        # start running the audio task in the background
-        self._audio_task = asyncio.create_task(self._stream_job_runner())
+        self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}")
+        self._subscribed_players: dict[str, asyncio.Queue] = {}
+        self._finished = asyncio.Event()
+        self._audio_task: asyncio.Task | None = None
 
     @property
     def finished(self) -> bool:
         """Return if this StreamJob is finished."""
-        return self._finished or self._audio_task and self._audio_task.done()
+        return self._finished.is_set() or self._audio_task and self._audio_task.done()
 
     @property
     def pending(self) -> bool:
         """Return if this Job is pending start."""
-        return not self.finished and not self._all_clients_connected.is_set()
+        return not self.finished and not self._audio_task
 
     @property
     def running(self) -> bool:
         """Return if this Job is running."""
-        return not self.finished and not self.pending
+        return self._audio_task and not self._audio_task.done()
+
+    def start(self) -> None:
+        """Start running (send audio chunks to connected players)."""
+        if self.running:
+            return
+        if self.finished:
+            raise RuntimeError("Task is already finished")
+        self.logger.debug(
+            "Starting multi client stream job %s with %s out of %s connected clients",
+            self.job_id,
+            len(self._subscribed_players),
+            len(self.expected_players),
+        )
+        self._audio_task = asyncio.create_task(self._stream_job_runner())
 
     def stop(self) -> None:
         """Stop running this job."""
-        self._finished = True
         if self._audio_task and self._audio_task.done():
             return
         if self._audio_task:
             self._audio_task.cancel()
-        for sub_queue in self.subscribed_players.values():
-            with suppress(asyncio.QueueFull):
-                sub_queue.put_nowait(b"EOF")
+        self._finished.set()
 
     def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
         """Resolve the childplayer specific stream URL to this streamjob."""
         fmt = output_codec.value
         # handle raw pcm
         if output_codec.is_pcm():
-            player = self.stream_controller.mass.players.get(child_player_id)
+            player = self.mass.streams.mass.players.get(child_player_id)
             player_max_bit_depth = 24 if player.supports_24bit else 16
             output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
             output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
-            output_channels = self.stream_controller.mass.config.get_raw_player_config_value(
+            output_channels = self.mass.config.get_raw_player_config_value(
                 child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
             )
             channels = 1 if output_channels != "stereo" else 2
@@ -156,98 +160,66 @@ class MultiClientStreamJob:
                 f";codec=pcm;rate={output_sample_rate};"
                 f"bitrate={output_bit_depth};channels={channels}"
             )
-        url = f"{self.stream_controller._server.base_url}/{self.queue_id}/multi/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}"  # noqa: E501
+        url = f"{self.mass.streams._server.base_url}/multi/{self.job_id}/{child_player_id}.{fmt}"
         self.expected_players.add(child_player_id)
         return url
 
-    async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
-        """Subscribe consumer and iterate incoming chunks on the queue."""
-        if (
-            player_id in self.stream_controller.workaround_players
-            and player_id not in self.workaround_players_seen
+    async def subscribe(
+        self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None
+    ) -> AsyncGenerator[bytes, None]:
+        """Subscribe consumer and iterate chunks on the queue encoded to given output format."""
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self._subscribe_pcm(player_id),
+            input_format=self.pcm_format,
+            output_format=output_format,
+            filter_params=get_player_filter_params(self.mass, player_id),
+            chunk_size=chunk_size,
         ):
-            self.workaround_players_seen.add(player_id)
-            yield b"EOF"
-            return
+            yield chunk
 
+    async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]:
+        """Subscribe consumer and iterate incoming (raw pcm) chunks on the queue."""
         try:
-            self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
-
-            if self._all_clients_connected.is_set():
-                # client subscribes while we're already started - we dont support that (for now?)
-                msg = f"Client {player_id} is joining while the stream is already started"
-                raise RuntimeError(msg)
-            self.logger.debug("Subscribed client %s", player_id)
+            self._subscribed_players[player_id] = queue = asyncio.Queue(1)
+
+            if self.running:
+                # client subscribes while we're already started
+                # that will probably cause side effects but let it go
+                self.logger.warning(
+                    "Player %s is joining while the stream is already started!", player_id
+                )
+            else:
+                self.logger.debug("Subscribed player %s", player_id)
 
-            if len(self.subscribed_players) == len(self.expected_players):
+            await asyncio.sleep(0.2)  # debounce
+            if (
+                self.auto_start
+                and not self.running
+                and len(self._subscribed_players) == len(self.expected_players)
+            ):
                 # we reached the number of expected subscribers, set event
                 # so that chunks can be pushed
-                self._all_clients_connected.set()
-
-            # keep reading audio chunks from the queue until we receive an EOF chunk
-            while True:
-                chunk = await sub_queue.get()
-                if chunk == b"EOF":
-                    # EOF chunk received
-                    break
-                yield chunk
+                self.start()
+            # yield from queue until finished
+            while not self._finished.is_set():
+                yield await queue.get()
         finally:
-            self.subscribed_players.pop(player_id, None)
+            if sub_queue := self._subscribed_players.pop(player_id, None):
+                empty_queue(sub_queue)
             self.logger.debug("Unsubscribed client %s", player_id)
             # check if this was the last subscriber and we should cancel
             await asyncio.sleep(2)
-            if len(self.subscribed_players) == 0 and self._audio_task and not self.finished:
+            if len(self._subscribed_players) == 0 and not self.finished:
                 self.logger.debug("Cleaning up, all clients disappeared...")
-                self._audio_task.cancel()
-
-    async def _put_chunk(self, chunk: bytes) -> None:
-        """Put chunk of data to all subscribers."""
-        async with asyncio.TaskGroup() as tg:
-            for sub_queue in list(self.subscribed_players.values()):
-                # put this chunk on the player's subqueue
-                tg.create_task(sub_queue.put(chunk))
-        self.bytes_streamed += len(chunk)
+                self.stop()
 
     async def _stream_job_runner(self) -> None:
         """Feed audio chunks to StreamJob subscribers."""
-        chunk_num = 0
-        async for chunk in self.stream_controller.get_flow_stream(
-            self.queue,
-            self.start_queue_item,
-            self.pcm_format,
-            self.seek_position,
-            self.fade_in,
-        ):
-            chunk_num += 1
-            if chunk_num == 1:
-                # wait until all expected clients are connected
-                try:
-                    async with asyncio.timeout(10):
-                        await self._all_clients_connected.wait()
-                except TimeoutError:
-                    if len(self.subscribed_players) == 0:
-                        self.stream_controller.logger.exception(
-                            "Abort multi client stream job for queue %s: "
-                            "clients did not connect within timeout",
-                            self.queue.display_name,
-                        )
-                        break
-                    # not all clients connected but timeout expired, set flag and move on
-                    # with all clients that did connect
-                    self._all_clients_connected.set()
-                else:
-                    self.stream_controller.logger.debug(
-                        "Starting multi client stream job for queue %s "
-                        "with %s out of %s connected clients",
-                        self.queue.display_name,
-                        len(self.subscribed_players),
-                        len(self.expected_players),
-                    )
-
-            await self._put_chunk(chunk)
-
-        # mark EOF with EOF chunk
-        await self._put_chunk(b"EOF")
+        async for chunk in self.pcm_audio_source:
+            async with asyncio.TaskGroup() as tg:
+                for listener_queue in list(self._subscribed_players.values()):
+                    tg.create_task(listener_queue.put(chunk))
+        self._finished.set()
 
 
 def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
@@ -270,7 +242,7 @@ class StreamsController(CoreController):
         """Initialize instance."""
         super().__init__(*args, **kwargs)
         self._server = Webserver(self.logger, enable_dynamic_routes=True)
-        self.multi_client_jobs: dict[str, MultiClientStreamJob] = {}
+        self.multi_client_jobs: dict[str, MultiClientQueueStreamJob] = {}
         self.register_dynamic_route = self._server.register_dynamic_route
         self.unregister_dynamic_route = self._server.unregister_dynamic_route
         self.manifest.name = "Streamserver"
@@ -280,7 +252,6 @@ class StreamsController(CoreController):
             "some player specific local control callbacks."
         )
         self.manifest.icon = "cast-audio"
-        self.workaround_players: set[str] = set()
 
     @property
     def base_url(self) -> str:
@@ -358,22 +329,22 @@ class StreamsController(CoreController):
             static_routes=[
                 (
                     "*",
-                    "/{queue_id}/multi/{job_id}/{player_id}/{queue_item_id}.{fmt}",
+                    "/multi/{job_id}/{player_id}.{fmt}",
                     self.serve_multi_subscriber_stream,
                 ),
                 (
                     "*",
-                    "/{queue_id}/flow/{queue_item_id}.{fmt}",
+                    "/flow/{queue_id}/{queue_item_id}.{fmt}",
                     self.serve_queue_flow_stream,
                 ),
                 (
                     "*",
-                    "/{queue_id}/single/{queue_item_id}.{fmt}",
+                    "/single/{queue_id}/{queue_item_id}.{fmt}",
                     self.serve_queue_item_stream,
                 ),
                 (
                     "*",
-                    "/{queue_id}/command/{command}.mp3",
+                    "/command/{queue_id}/{command}.mp3",
                     self.serve_command_request,
                 ),
             ],
@@ -385,6 +356,7 @@ class StreamsController(CoreController):
 
     async def resolve_stream_url(
         self,
+        player_id: str,
         queue_item: QueueItem,
         output_codec: ContentType,
         seek_position: int = 0,
@@ -393,12 +365,16 @@ class StreamsController(CoreController):
     ) -> str:
         """Resolve the stream URL for the given QueueItem."""
         fmt = output_codec.value
+        # handle request for multi client queue stream
+        stream_job = self.multi_client_jobs.get(queue_item.queue_id)
+        if queue_item.queue_item_id == "flow" or stream_job and stream_job.pending:
+            return stream_job.resolve_stream_url(player_id, output_codec)
         # handle raw pcm without exact format specifiers
         if output_codec.is_pcm() and ";" not in fmt:
             fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
         query_params = {}
         base_path = "flow" if flow_mode else "single"
-        url = f"{self._server.base_url}/{queue_item.queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}"  # noqa: E501
+        url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}"  # noqa: E501
         if seek_position:
             query_params["seek_position"] = str(seek_position)
         if fade_in:
@@ -418,29 +394,39 @@ class StreamsController(CoreController):
         fade_in: bool = False,
         pcm_bit_depth: int = 24,
         pcm_sample_rate: int = 48000,
-    ) -> MultiClientStreamJob:
-        """Create a MultiClientStreamJob for the given queue..
+        expected_players: set[str] | None = None,
+        auto_start: bool = True,
+    ) -> MultiClientQueueStreamJob:
+        """
+        Create a MultiClientQueueStreamJob for the given queue.
 
         This is called by player/sync group implementations to start streaming
         the queue audio to multiple players at once.
         """
-        if existing_job := self.multi_client_jobs.pop(queue_id, None):
+        if existing_job := self.multi_client_jobs.get(queue_id, None):
+            if existing_job.pending:
+                return existing_job
             # cleanup existing job first
-            if not existing_job.finished:
-                self.logger.warning("Detected existing (running) stream job for queue %s", queue_id)
-                existing_job.stop()
-        self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob(
-            self,
-            queue_id=queue_id,
-            pcm_format=AudioFormat(
-                content_type=ContentType.from_bit_depth(pcm_bit_depth),
-                sample_rate=pcm_sample_rate,
-                bit_depth=pcm_bit_depth,
-                channels=2,
+            existing_job.stop()
+        queue = self.mass.player_queues.get(queue_id)
+        pcm_format = AudioFormat(
+            content_type=ContentType.from_bit_depth(pcm_bit_depth),
+            sample_rate=pcm_sample_rate,
+            bit_depth=pcm_bit_depth,
+            channels=2,
+        )
+        self.multi_client_jobs[queue_id] = stream_job = MultiClientQueueStreamJob(
+            self.mass,
+            pcm_audio_source=self.get_flow_stream(
+                queue=queue,
+                start_queue_item=start_queue_item,
+                pcm_format=pcm_format,
+                seek_position=seek_position,
+                fade_in=fade_in,
             ),
-            start_queue_item=start_queue_item,
-            seek_position=seek_position,
-            fade_in=fade_in,
+            pcm_format=pcm_format,
+            expected_players=expected_players or set(),
+            auto_start=auto_start,
         )
         return stream_job
 
@@ -496,7 +482,6 @@ class StreamsController(CoreController):
             queue.display_name,
         )
         queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id)
-        # collect player specific ffmpeg args to re-encode the source PCM stream
         pcm_format = AudioFormat(
             content_type=ContentType.from_bit_depth(
                 queue_item.streamdetails.audio_format.bit_depth
@@ -504,39 +489,22 @@ class StreamsController(CoreController):
             sample_rate=queue_item.streamdetails.audio_format.sample_rate,
             bit_depth=queue_item.streamdetails.audio_format.bit_depth,
         )
-        ffmpeg_args = await self._get_player_ffmpeg_args(
-            queue_player,
+        async for chunk in get_ffmpeg_stream(
+            audio_input=get_media_stream(
+                self.mass,
+                streamdetails=queue_item.streamdetails,
+                pcm_format=pcm_format,
+                seek_position=seek_position,
+                fade_in=fade_in,
+            ),
             input_format=pcm_format,
             output_format=output_format,
-        )
-
-        async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
-            # feed stdin with pcm audio chunks from origin
-            async def read_audio() -> None:
-                try:
-                    async for _, chunk in get_media_stream(
-                        self.mass,
-                        streamdetails=queue_item.streamdetails,
-                        pcm_format=pcm_format,
-                        seek_position=seek_position,
-                        fade_in=fade_in,
-                    ):
-                        try:
-                            await ffmpeg_proc.write(chunk)
-                        except BrokenPipeError:
-                            break
-                finally:
-                    ffmpeg_proc.write_eof()
-
-            ffmpeg_proc.attach_task(read_audio())
-
-            # read final chunks from stdout
-            async for chunk in ffmpeg_proc.iter_any(768000):
-                try:
-                    await resp.write(chunk)
-                except (BrokenPipeError, ConnectionResetError):
-                    # race condition
-                    break
+            filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError):
+                break
 
         return resp
 
@@ -544,8 +512,7 @@ class StreamsController(CoreController):
         """Stream Queue Flow audio to player."""
         self._log_request(request)
         queue_id = request.match_info["queue_id"]
-        queue = self.mass.player_queues.get(queue_id)
-        if not queue:
+        if not (queue := self.mass.player_queues.get(queue_id)):
             raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
         start_queue_item_id = request.match_info["queue_item_id"]
         start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
@@ -585,105 +552,92 @@ class StreamsController(CoreController):
         # all checks passed, start streaming!
         self.logger.debug("Start serving Queue flow audio stream for %s", queue_player.name)
 
-        # collect player specific ffmpeg args to re-encode the source PCM stream
         pcm_format = AudioFormat(
             content_type=ContentType.from_bit_depth(output_format.bit_depth),
             sample_rate=output_format.sample_rate,
             bit_depth=output_format.bit_depth,
             channels=2,
         )
-        ffmpeg_args = await self._get_player_ffmpeg_args(
-            queue_player,
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self.get_flow_stream(
+                queue=queue,
+                start_queue_item=start_queue_item,
+                pcm_format=pcm_format,
+                seek_position=seek_position,
+                fade_in=fade_in,
+            ),
             input_format=pcm_format,
             output_format=output_format,
-        )
-
-        async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
-            # feed stdin with pcm audio chunks from origin
-            async def read_audio() -> None:
-                try:
-                    async for chunk in self.get_flow_stream(
-                        queue=queue,
-                        start_queue_item=start_queue_item,
-                        pcm_format=pcm_format,
-                        seek_position=seek_position,
-                        fade_in=fade_in,
-                    ):
-                        try:
-                            await ffmpeg_proc.write(chunk)
-                        except BrokenPipeError:
-                            break
-                finally:
-                    ffmpeg_proc.write_eof()
-
-            ffmpeg_proc.attach_task(read_audio())
-
-            # read final chunks from stdout
-            iterator = (
-                ffmpeg_proc.iter_chunked(icy_meta_interval)
-                if enable_icy
-                else ffmpeg_proc.iter_any(768000)
-            )
-            async for chunk in iterator:
-                try:
-                    await resp.write(chunk)
-                except (BrokenPipeError, ConnectionResetError):
-                    # race condition
-                    break
-
-                if not enable_icy:
-                    continue
-
-                # if icy metadata is enabled, send the icy metadata after the chunk
-                if (
-                    # use current item here and not buffered item, otherwise
-                    # the icy metadata will be too much ahead
-                    (current_item := queue.current_item)
-                    and current_item.streamdetails
-                    and current_item.streamdetails.stream_title
-                ):
-                    title = current_item.streamdetails.stream_title
-                elif queue and current_item and current_item.name:
-                    title = current_item.name
-                else:
-                    title = "Music Assistant"
-                metadata = f"StreamTitle='{title}';".encode()
-                if current_item and current_item.image:
-                    metadata += f"StreamURL='{current_item.image.path}'".encode()
-                while len(metadata) % 16 != 0:
-                    metadata += b"\x00"
-                length = len(metadata)
-                length_b = chr(int(length / 16)).encode()
-                await resp.write(length_b + metadata)
+            filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+            chunk_size=icy_meta_interval if enable_icy else None,
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError):
+                break
+
+            if not enable_icy:
+                continue
+
+            # if icy metadata is enabled, send the icy metadata after the chunk
+            if (
+                # use current item here and not buffered item, otherwise
+                # the icy metadata will be too much ahead
+                (current_item := queue.current_item)
+                and current_item.streamdetails
+                and current_item.streamdetails.stream_title
+            ):
+                title = current_item.streamdetails.stream_title
+            elif queue and current_item and current_item.name:
+                title = current_item.name
+            else:
+                title = "Music Assistant"
+            metadata = f"StreamTitle='{title}';".encode()
+            if current_item and current_item.image:
+                metadata += f"StreamURL='{current_item.image.path}'".encode()
+            while len(metadata) % 16 != 0:
+                metadata += b"\x00"
+            length = len(metadata)
+            length_b = chr(int(length / 16)).encode()
+            await resp.write(length_b + metadata)
 
         return resp
 
     async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response:
         """Stream Queue Flow audio to a child player within a multi subscriber setup."""
         self._log_request(request)
-        queue_id = request.match_info["queue_id"]
-        streamjob = self.multi_client_jobs.get(queue_id)
-        if not streamjob:
-            raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}")
         job_id = request.match_info["job_id"]
-        if job_id != streamjob.job_id:
-            raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}")
-        child_player_id = request.match_info["player_id"]
-        child_player = self.mass.players.get(child_player_id)
+        for queue_id, stream_job in self.multi_client_jobs.items():
+            if stream_job.job_id == job_id:
+                break
+        else:
+            raise web.HTTPNotFound(reason=f"Unknown StreamJob: {job_id}")
+        if stream_job.finished:
+            raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished")
+        if not (queue := self.mass.player_queues.get(queue_id)):
+            raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
+
+        player_id = request.match_info["player_id"]
+        child_player = self.mass.players.get(player_id)
         if not child_player:
-            raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+            raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
         # work out (childplayer specific!) output format/details
         output_format = await self._get_output_format(
             output_format_str=request.match_info["fmt"],
             queue_player=child_player,
-            default_sample_rate=streamjob.pcm_format.sample_rate,
-            default_bit_depth=streamjob.pcm_format.bit_depth,
+            default_sample_rate=stream_job.pcm_format.sample_rate,
+            default_bit_depth=stream_job.pcm_format.bit_depth,
         )
         # prepare request, add some DLNA/UPNP compatible headers
+        enable_icy = request.headers.get("Icy-MetaData", "") == "1"
+        icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384
         headers = {
             **DEFAULT_STREAM_HEADERS,
             "Content-Type": f"audio/{output_format.output_format_str}",
         }
+        if enable_icy:
+            headers["icy-metaint"] = str(icy_meta_interval)
+
         resp = web.StreamResponse(
             status=200,
             reason="OK",
@@ -695,53 +649,43 @@ class StreamsController(CoreController):
         if request.method != "GET":
             return resp
 
-        # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
-        # to the stream in an attempt to get the audio details such as duration
-        # which is a bit pointless for our duration-less queue stream
-        # and it completely messes with the subscription logic
-        if child_player_id in streamjob.subscribed_players:
-            self.logger.warning(
-                "Player %s is making multiple requests "
-                "to the same stream, playback may be disturbed!",
-                child_player_id,
-            )
-            self.workaround_players.add(child_player_id)
-
         # all checks passed, start streaming!
         self.logger.debug(
             "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s",
-            streamjob.queue.display_name,
+            queue.display_name,
             child_player.display_name,
         )
-
-        # collect player specific ffmpeg args to re-encode the source PCM stream
-        ffmpeg_args = await self._get_player_ffmpeg_args(
-            child_player,
-            input_format=streamjob.pcm_format,
-            output_format=output_format,
-        )
-
-        async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
-            # feed stdin with pcm audio chunks from origin
-            async def read_audio() -> None:
-                try:
-                    async for chunk in streamjob.subscribe(child_player_id):
-                        try:
-                            await ffmpeg_proc.write(chunk)
-                        except BrokenPipeError:
-                            break
-                finally:
-                    ffmpeg_proc.write_eof()
-
-            ffmpeg_proc.attach_task(read_audio())
-
-            # read final chunks from stdout
-            async for chunk in ffmpeg_proc.iter_any(768000):
-                try:
-                    await resp.write(chunk)
-                except (BrokenPipeError, ConnectionResetError):
-                    # race condition
-                    break
+        async for chunk in stream_job.subscribe(
+            player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError):
+                break
+            if not enable_icy:
+                continue
+
+            # if icy metadata is enabled, send the icy metadata after the chunk
+            if (
+                # use the current item here, not the buffered item, otherwise
+                # the icy metadata would run too far ahead of the audio
+                (current_item := queue.current_item)
+                and current_item.streamdetails
+                and current_item.streamdetails.stream_title
+            ):
+                title = current_item.streamdetails.stream_title
+            elif queue and current_item and current_item.name:
+                title = current_item.name
+            else:
+                title = "Music Assistant"
+            metadata = f"StreamTitle='{title}';".encode()
+            if current_item and current_item.image:
+                metadata += f"StreamURL='{current_item.image.path}'".encode()
+            while len(metadata) % 16 != 0:
+                metadata += b"\x00"
+            length = len(metadata)
+            length_b = chr(int(length / 16)).encode()
+            await resp.write(length_b + metadata)
 
         return resp
 
@@ -756,7 +700,7 @@ class StreamsController(CoreController):
 
     def get_command_url(self, player_or_queue_id: str, command: str) -> str:
         """Get the url for the special command stream."""
-        return f"{self.base_url}/{player_or_queue_id}/command/{command}.mp3"
+        return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
 
     async def get_flow_stream(
         self,
@@ -821,7 +765,7 @@ class StreamsController(CoreController):
             bytes_written = 0
             buffer = b""
             # handle incoming audio chunks
-            async for is_last_chunk, chunk in get_media_stream(
+            async for chunk in get_media_stream(
                 self.mass,
                 queue_track.streamdetails,
                 pcm_format=pcm_format,
@@ -831,21 +775,14 @@ class StreamsController(CoreController):
                 strip_silence_begin=use_crossfade,
                 strip_silence_end=use_crossfade,
             ):
-                # throttle buffer, do not allow more than 30 seconds in player's own buffer
-                seconds_buffered = (total_bytes_written + bytes_written) / pcm_sample_size
-                player = self.mass.players.get(queue.queue_id)
-                if seconds_buffered > 60 and player.corrected_elapsed_time > 30:
-                    while (seconds_buffered - player.corrected_elapsed_time) > 30:
-                        await asyncio.sleep(1)
-
                 # ALWAYS APPEND CHUNK TO BUFFER
                 buffer += chunk
-                if not is_last_chunk and len(buffer) < buffer_size:
+                if len(buffer) < buffer_size:
                     # buffer is not full enough, move on
                     continue
 
                 ####  HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
-                if not is_last_chunk and last_fadeout_part:
+                if last_fadeout_part:
                     # perform crossfade
                     fadein_part = buffer[:crossfade_size]
                     remaining_bytes = buffer[crossfade_size:]
@@ -867,22 +804,6 @@ class StreamsController(CoreController):
                     last_fadeout_part = b""
                     buffer = b""
 
-                #### HANDLE END OF TRACK
-                elif is_last_chunk:
-                    if use_crossfade:
-                        # if crossfade is enabled, save fadeout part to pickup for next track
-                        last_fadeout_part = buffer[-crossfade_size:]
-                        remaining_bytes = buffer[:-crossfade_size]
-                        yield remaining_bytes
-                        bytes_written += len(remaining_bytes)
-                        del remaining_bytes
-                    else:
-                        # no crossfade enabled, just yield the (entire) buffer last part
-                        yield buffer
-                        bytes_written += len(buffer)
-                    # clear vars
-                    buffer = b""
-
                 #### OTHER: enough data in buffer, feed to output
                 else:
                     chunk_size = len(chunk)
@@ -890,6 +811,26 @@ class StreamsController(CoreController):
                     bytes_written += chunk_size
                     buffer = buffer[chunk_size:]
 
+            #### HANDLE END OF TRACK
+            if last_fadeout_part:
+                # edge case: we did not get enough data to make the crossfade
+                yield last_fadeout_part
+                bytes_written += len(last_fadeout_part)
+                last_fadeout_part = b""
+            if use_crossfade:
+                # if crossfade is enabled, save fadeout part to pickup for next track
+                last_fadeout_part = buffer[-crossfade_size:]
+                remaining_bytes = buffer[:-crossfade_size]
+                yield remaining_bytes
+                bytes_written += len(remaining_bytes)
+                del remaining_bytes
+            else:
+                # no crossfade enabled, just yield the (entire) buffer last part
+                yield buffer
+                bytes_written += len(buffer)
+            # clear vars
+            buffer = b""
+
             # update duration details based on the actual pcm data we sent
             # this also accounts for crossfade and silence stripping
             queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size
@@ -909,92 +850,6 @@ class StreamsController(CoreController):
             yield last_fadeout_part
         self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
 
-    async def _get_player_ffmpeg_args(
-        self,
-        player: Player,
-        input_format: AudioFormat,
-        output_format: AudioFormat,
-    ) -> list[str]:
-        """Get player specific arguments for the given (pcm) input and output details."""
-        # generic args
-        generic_args = [
-            "ffmpeg",
-            "-hide_banner",
-            "-loglevel",
-            "warning" if self.logger.isEnabledFor(logging.DEBUG) else "quiet",
-            "-ignore_unknown",
-        ]
-        # input args
-        input_args = [
-            "-f",
-            input_format.content_type.value,
-            "-ac",
-            str(input_format.channels),
-            "-channel_layout",
-            "mono" if input_format.channels == 1 else "stereo",
-            "-ar",
-            str(input_format.sample_rate),
-            "-i",
-            "-",
-        ]
-        # select output args
-        if output_format.content_type == ContentType.FLAC:
-            # set compression level to 0 to prevent issues with cast players
-            output_args = ["-f", "flac", "-compression_level", "0"]
-        elif output_format.content_type == ContentType.AAC:
-            output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "320k"]
-        elif output_format.content_type == ContentType.MP3:
-            output_args = ["-f", "mp3", "-c:a", "mp3", "-b:a", "320k"]
-        else:
-            output_args = ["-f", output_format.content_type.value]
-
-        # append channels
-        output_args += ["-ac", str(output_format.channels)]
-        # append sample rate (if codec is lossless)
-        if output_format.content_type.is_lossless():
-            output_args += ["-ar", str(output_format.sample_rate)]
-        # append output = pipe
-        output_args += ["-"]
-
-        # collect extra and filter args
-        # TODO: add convolution/DSP/roomcorrections here!
-        extra_args = []
-        filter_params = []
-
-        # the below is a very basic 3-band equalizer,
-        # this could be a lot more sophisticated at some point
-        if (
-            eq_bass := self.mass.config.get_raw_player_config_value(
-                player.player_id, CONF_EQ_BASS, 0
-            )
-        ) != 0:
-            filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}")
-        if (
-            eq_mid := self.mass.config.get_raw_player_config_value(player.player_id, CONF_EQ_MID, 0)
-        ) != 0:
-            filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}")
-        if (
-            eq_treble := self.mass.config.get_raw_player_config_value(
-                player.player_id, CONF_EQ_TREBLE, 0
-            )
-        ) != 0:
-            filter_params.append(
-                f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}"
-            )
-        # handle output mixing only left or right
-        conf_channels = self.mass.config.get_raw_player_config_value(
-            player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
-        )
-        if conf_channels == "left":
-            filter_params.append("pan=mono|c0=FL")
-        elif conf_channels == "right":
-            filter_params.append("pan=mono|c0=FR")
-
-        if filter_params:
-            extra_args += ["-af", ",".join(filter_params)]
-
-        return generic_args + input_args + extra_args + output_args
-
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
         if not self.logger.isEnabledFor(logging.DEBUG):
index 507f42d52c973c11d40e513811ef27fa75fde7a8..dd726900e77b9277d3167e378180bbaf2cd66e0f 100644 (file)
@@ -375,7 +375,7 @@ class WebsocketClientHandler:
                 result = await result
             self._send_message(SuccessResultMessage(msg.message_id, result))
         except Exception as err:  # pylint: disable=broad-except
-            if self.log_level == "VERBOSE":
+            if self._logger.isEnabledFor(logging.DEBUG):
                 self._logger.exception("Error handling message: %s", msg)
             else:
                 self._logger.error("Error handling message: %s: %s", msg.command, str(err))
index 298042a40ac6bd9742f25f2d21808c9a7fbba1e4..529bd4f693533b15975c617b4af02a44f525a9c3 100644 (file)
@@ -25,6 +25,10 @@ from music_assistant.common.models.errors import (
 from music_assistant.common.models.media_items import AudioFormat, ContentType, MediaType
 from music_assistant.common.models.streamdetails import LoudnessMeasurement, StreamDetails
 from music_assistant.constants import (
+    CONF_EQ_BASS,
+    CONF_EQ_MID,
+    CONF_EQ_TREBLE,
+    CONF_OUTPUT_CHANNELS,
     CONF_VOLUME_NORMALIZATION,
     CONF_VOLUME_NORMALIZATION_TARGET,
     ROOT_LOGGER_NAME,
@@ -204,9 +208,7 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) -
             enable_stdout=False,
             enable_stderr=True,
         ) as ffmpeg_proc:
-
-            async def writer() -> None:
-                """Task that grabs the source audio and feeds it to ffmpeg."""
+            if streamdetails.direct is None:
                 music_prov = mass.get_provider(streamdetails.provider)
                 chunk_count = 0
                 async for audio_chunk in music_prov.get_audio_stream(streamdetails):
@@ -217,11 +219,6 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) -
                         break
                 ffmpeg_proc.write_eof()
 
-            if streamdetails.direct is None:
-                writer_task = ffmpeg_proc.attach_task(writer())
-                # wait for the writer task to finish
-                await writer_task
-
             _, stderr = await ffmpeg_proc.communicate()
             if loudness_details := _parse_loudnorm(stderr):
                 LOGGER.debug("Loudness measurement for %s: %s", item_name, loudness_details)
@@ -384,6 +381,7 @@ async def get_media_stream(  # noqa: PLR0915
     Other than stripping silence at end and beginning and optional
     volume normalization this is the pure, unaltered audio data as PCM chunks.
     """
+    logger = LOGGER.getChild("media_stream")
     bytes_sent = 0
     streamdetails.seconds_skipped = seek_position
     is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
@@ -397,127 +395,159 @@ async def get_media_stream(  # noqa: PLR0915
         strip_silence_end = False
 
     # collect all arguments for ffmpeg
+    filter_params = []
+    extra_args = []
     seek_pos = seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0
-    args = await _get_ffmpeg_args(
-        streamdetails=streamdetails,
-        pcm_output_format=pcm_format,
+    if seek_pos:
         # only use ffmpeg seeking if the provider stream does not support seeking
-        seek_position=seek_pos,
-        fade_in=fade_in,
+        extra_args += ["-ss", str(seek_pos)]
+    if streamdetails.target_loudness is not None:
+        # add loudnorm filters
+        filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
+        if streamdetails.loudness:
+            filter_rule += f":measured_I={streamdetails.loudness.integrated}"
+            filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
+            filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
+            filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
+        filter_rule += ":print_format=json"
+        filter_params.append(filter_rule)
+    if fade_in:
+        filter_params.append("afade=type=in:start_time=0:duration=3")
+    ffmpeg_args = await _get_ffmpeg_args(
+        input_format=streamdetails.audio_format,
+        output_format=pcm_format,
+        filter_params=filter_params,
+        extra_args=extra_args,
+        input_path=streamdetails.direct or "-",
     )
 
-    async with AsyncProcess(
-        args, enable_stdin=streamdetails.direct is None, enable_stderr=True
-    ) as ffmpeg_proc:
-        LOGGER.debug("start media stream for: %s", streamdetails.uri)
-
-        async def writer() -> None:
-            """Task that grabs the source audio and feeds it to ffmpeg."""
-            LOGGER.debug("writer started for %s", streamdetails.uri)
-            music_prov = mass.get_provider(streamdetails.provider)
-            seek_pos = seek_position if streamdetails.can_seek else 0
-            async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
-                await ffmpeg_proc.write(audio_chunk)
-            # write eof when last packet is received
-            ffmpeg_proc.write_eof()
-            LOGGER.debug("writer finished for %s", streamdetails.uri)
-
-        if streamdetails.direct is None:
-            ffmpeg_proc.attach_task(writer())
-
-        # get pcm chunks from stdout
-        # we always stay one chunk behind to properly detect end of chunks
-        # so we can strip silence at the beginning and end of a track
-        prev_chunk = b""
-        chunk_num = 0
-        try:
-            async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
-                chunk_num += 1
-                if strip_silence_begin and chunk_num == 2:
-                    # first 2 chunks received, strip silence of beginning
-                    stripped_audio = await strip_silence(
-                        mass,
-                        prev_chunk + chunk,
-                        sample_rate=pcm_format.sample_rate,
-                        bit_depth=pcm_format.bit_depth,
-                    )
-                    yield (False, stripped_audio)
-                    bytes_sent += len(stripped_audio)
-                    prev_chunk = b""
-                    del stripped_audio
-                    continue
-                if strip_silence_end and chunk_num >= (expected_chunks - 6):
-                    # last part of the track, collect multiple chunks to strip silence later
-                    prev_chunk += chunk
-                    continue
-
-                # middle part of the track, send previous chunk and collect current chunk
-                if prev_chunk:
-                    yield (False, prev_chunk)
-                    bytes_sent += len(prev_chunk)
+    finished = False
+    logger.debug("start media stream for: %s", streamdetails.uri)
 
-                prev_chunk = chunk
-
-            # all chunks received, strip silence of last part
-
-            if strip_silence_end and prev_chunk:
-                final_chunk = await strip_silence(
+    writer_task: asyncio.Task | None = None
+    ffmpeg_proc = AsyncProcess(
+        ffmpeg_args, enable_stdin=streamdetails.direct is None, enable_stderr=True
+    )
+    await ffmpeg_proc.start()
+
+    async def writer() -> None:
+        """Task that grabs the source audio and feeds it to ffmpeg."""
+        logger.debug("writer started for %s", streamdetails.uri)
+        music_prov = mass.get_provider(streamdetails.provider)
+        seek_pos = seek_position if streamdetails.can_seek else 0
+        async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
+            await ffmpeg_proc.write(audio_chunk)
+        # write eof when last packet is received
+        ffmpeg_proc.write_eof()
+        logger.debug("writer finished for %s", streamdetails.uri)
+
+    if streamdetails.direct is None:
+        writer_task = asyncio.create_task(writer())
+
+    # get pcm chunks from stdout
+    # we always stay one chunk behind to properly detect end of chunks
+    # so we can strip silence at the beginning and end of a track
+    prev_chunk = b""
+    chunk_num = 0
+    try:
+        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+            chunk_num += 1
+            if strip_silence_begin and chunk_num == 2:
+                # first 2 chunks received, strip silence of beginning
+                stripped_audio = await strip_silence(
                     mass,
-                    prev_chunk,
+                    prev_chunk + chunk,
                     sample_rate=pcm_format.sample_rate,
                     bit_depth=pcm_format.bit_depth,
-                    reverse=True,
                 )
-            else:
-                final_chunk = prev_chunk
+                yield stripped_audio
+                bytes_sent += len(stripped_audio)
+                prev_chunk = b""
+                del stripped_audio
+                continue
+            if strip_silence_end and chunk_num >= (expected_chunks - 6):
+                # last part of the track, collect multiple chunks to strip silence later
+                prev_chunk += chunk
+                continue
 
-            # use communicate to read stderr and wait for exit
-            # read log for loudness measurement (or errors)
-            _, stderr = await ffmpeg_proc.communicate()
-            if ffmpeg_proc.returncode != 0:
-                # ffmpeg has a non zero returncode meaning trouble
-                LOGGER.getChild("ffmpeg").warning("STREAM ERROR on %s", streamdetails.uri)
-                LOGGER.getChild("ffmpeg").warning(stderr.decode())
-            elif loudness_details := _parse_loudnorm(stderr):
+            # middle part of the track, send previous chunk and collect current chunk
+            if prev_chunk:
+                yield prev_chunk
+                bytes_sent += len(prev_chunk)
+
+            prev_chunk = chunk
+
+        # all chunks received, strip silence of last part
+
+        if strip_silence_end and prev_chunk:
+            final_chunk = await strip_silence(
+                mass,
+                prev_chunk,
+                sample_rate=pcm_format.sample_rate,
+                bit_depth=pcm_format.bit_depth,
+                reverse=True,
+            )
+        else:
+            final_chunk = prev_chunk
+
+        # ensure the final chunk is sent
+        # it's important this is done here at the end so we can catch errors first
+        yield final_chunk
+        bytes_sent += len(final_chunk)
+        del final_chunk
+        del prev_chunk
+        finished = True
+    finally:
+        seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
+        streamdetails.seconds_streamed = seconds_streamed
+        if finished:
+            logger.debug(
+                "finished stream for: %s (%s seconds streamed)",
+                streamdetails.uri,
+                seconds_streamed,
+            )
+            # store accurate duration
+            streamdetails.duration = seek_position + seconds_streamed
+        else:
+            logger.debug(
+                "stream aborted for %s (%s seconds streamed)",
+                streamdetails.uri,
+                seconds_streamed,
+            )
+        if writer_task and not writer_task.done():
+            writer_task.cancel()
+        # use communicate to read stderr and wait for exit
+        # read log for loudness measurement (or errors)
+        _, stderr = await ffmpeg_proc.communicate()
+        if ffmpeg_proc.returncode != 0:
+            # ffmpeg has a non zero returncode meaning trouble
+            logger.warning("STREAM ERROR on %s", streamdetails.uri)
+            logger.warning(stderr.decode())
+        elif loudness_details := _parse_loudnorm(stderr):
+            required_seconds = 300 if streamdetails.media_type == MediaType.RADIO else 60
+            if finished or seconds_streamed >= required_seconds:
                 LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
                 streamdetails.loudness = loudness_details
                 await mass.music.set_track_loudness(
                     streamdetails.item_id, streamdetails.provider, loudness_details
                 )
-            else:
-                LOGGER.getChild("ffmpeg").debug(stderr.decode())
-
-            # ensure the final chunk is sent and mark as final
-            # its important this is done here at the end so we can catch errors first
-            yield (True, final_chunk)
-            bytes_sent += len(final_chunk)
-            del final_chunk
-            del prev_chunk
-
-        except (asyncio.CancelledError, GeneratorExit):
-            LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
-            raise
         else:
-            LOGGER.debug("finished media stream for: %s", streamdetails.uri)
-            # store accurate duration
-            streamdetails.duration = seek_position + bytes_sent / pcm_sample_size
-        finally:
-            # report playback
-            seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
-            streamdetails.seconds_streamed = seconds_streamed
-            if seconds_streamed < 20:
-                mass.create_task(
-                    mass.music.mark_item_played(
-                        streamdetails.media_type, streamdetails.item_id, streamdetails.provider
-                    )
+            logger.debug(stderr.decode())
+
+        # report playback
+        if finished or seconds_streamed > 30:
+            mass.create_task(
+                mass.music.mark_item_played(
+                    streamdetails.media_type, streamdetails.item_id, streamdetails.provider
                 )
-                if music_prov := mass.get_provider(streamdetails.provider):
-                    mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+            )
+            if music_prov := mass.get_provider(streamdetails.provider):
+                mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
 
-                if not streamdetails.loudness:
-                    # send loudness analyze job to background worker
-                    # note that we only do this if a track was at least been partially played
-                    mass.create_task(analyze_loudness(mass, streamdetails))
+            if not streamdetails.loudness:
+                # send loudness analyze job to background worker
+                # note that we only do this if a track was at least been partially played
+                mass.create_task(analyze_loudness(mass, streamdetails))
 
 
 async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]:
@@ -670,6 +700,77 @@ async def get_file_stream(
             yield data
 
 
+async def get_ffmpeg_stream(
+    audio_input: AsyncGenerator[bytes, None] | str,
+    input_format: AudioFormat,
+    output_format: AudioFormat,
+    filter_params: list[str] | None = None,
+    extra_args: list[str] | None = None,
+    chunk_size: int | None = None,
+) -> AsyncGenerator[bytes, None]:
+    """
+    Get the ffmpeg audio stream as async generator.
+
+    Takes care of resampling and/or recoding if needed,
+    according to player preferences.
+    """
+    logger = LOGGER.getChild("media_stream")
+    use_stdin = not isinstance(audio_input, str)
+    if input_format == output_format and not filter_params and not chunk_size and use_stdin:
+        # edge case: input and output exactly the same, we can bypass ffmpeg
+        # return the raw input stream, no actions needed here
+        async for chunk in audio_input:
+            yield chunk
+        return
+
+    ffmpeg_args = await _get_ffmpeg_args(
+        input_format=input_format,
+        output_format=output_format,
+        filter_params=filter_params or [],
+        extra_args=extra_args or [],
+        input_path="-" if use_stdin else audio_input,
+        output_path="-",
+    )
+
+    writer_task: asyncio.Task | None = None
+    ffmpeg_proc = AsyncProcess(
+        ffmpeg_args, enable_stdin=use_stdin, enable_stdout=True, enable_stderr=True
+    )
+    await ffmpeg_proc.start()
+
+    # feed stdin with pcm audio chunks from origin
+    async def writer() -> None:
+        async for chunk in audio_input:
+            if ffmpeg_proc.closed:
+                return
+            await ffmpeg_proc.write(chunk)
+        ffmpeg_proc.write_eof()
+
+    try:
+        if not isinstance(audio_input, str):
+            writer_task = asyncio.create_task(writer())
+
+        # read final chunks from stdout
+        chunk_size = chunk_size or get_chunksize(output_format, 1)
+        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+            try:
+                yield chunk
+            except (BrokenPipeError, ConnectionResetError):
+                # race condition
+                break
+    finally:
+        if writer_task and not writer_task.done():
+            writer_task.cancel()
+        # use communicate to read stderr and wait for exit
+        # inspect the ffmpeg log output for any errors
+        _, stderr = await ffmpeg_proc.communicate()
+        if ffmpeg_proc.returncode != 0:
+            # ffmpeg has a non zero returncode meaning trouble
+            logger.warning("FFMPEG ERROR\n%s", stderr.decode())
+        else:
+            logger.debug(stderr.decode())
+
+
 async def check_audio_support() -> tuple[bool, bool, str]:
     """Check if ffmpeg is present (with/without libsoxr support)."""
     cache_key = "audio_support_cache"
@@ -702,7 +803,7 @@ async def get_preview_stream(
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
-        "quiet",
+        "info",
         "-ignore_unknown",
     ]
     if streamdetails.direct:
@@ -715,22 +816,30 @@ async def get_preview_stream(
 
     output_args = ["-to", "30", "-f", "mp3", "-"]
     args = input_args + output_args
-    async with AsyncProcess(args, True) as ffmpeg_proc:
 
-        async def writer() -> None:
-            """Task that grabs the source audio and feeds it to ffmpeg."""
-            music_prov = mass.get_provider(streamdetails.provider)
-            async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
-                await ffmpeg_proc.write(audio_chunk)
-            # write eof when last packet is received
-            ffmpeg_proc.write_eof()
+    writer_task: asyncio.Task | None = None
+    ffmpeg_proc = AsyncProcess(args, enable_stdin=True, enable_stdout=True, enable_stderr=False)
+    await ffmpeg_proc.start()
 
-        if not streamdetails.direct:
-            ffmpeg_proc.attach_task(writer())
+    async def writer() -> None:
+        """Task that grabs the source audio and feeds it to ffmpeg."""
+        music_prov = mass.get_provider(streamdetails.provider)
+        async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
+            await ffmpeg_proc.write(audio_chunk)
+        # write eof when last packet is received
+        ffmpeg_proc.write_eof()
 
-        # yield chunks from stdout
+    if not streamdetails.direct:
+        writer_task = asyncio.create_task(writer())
+
+    # yield chunks from stdout
+    try:
         async for chunk in ffmpeg_proc.iter_any():
             yield chunk
+    finally:
+        if writer_task and not writer_task.done():
+            writer_task.cancel()
+        await ffmpeg_proc.close()
 
 
 async def get_silence(
@@ -767,7 +876,7 @@ async def get_silence(
         "-t",
         str(duration),
         "-f",
-        output_format.output_fmt.value,
+        output_format.output_format_str,
         "-",
     ]
     async with AsyncProcess(args) as ffmpeg_proc:
@@ -794,11 +903,42 @@ def get_chunksize(
     return int((320000 / 8) * seconds)
 
 
+def get_player_filter_params(
+    mass: MusicAssistant,
+    player_id: str,
+) -> list[str]:
+    """Get player specific filter parameters for ffmpeg (if any)."""
+    # collect all players-specific filter args
+    # TODO: add convolution/DSP/roomcorrections here?!
+    filter_params = []
+
+    # the below is a very basic 3-band equalizer,
+    # this could be a lot more sophisticated at some point
+    if (eq_bass := mass.config.get_raw_player_config_value(player_id, CONF_EQ_BASS, 0)) != 0:
+        filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}")
+    if (eq_mid := mass.config.get_raw_player_config_value(player_id, CONF_EQ_MID, 0)) != 0:
+        filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}")
+    if (eq_treble := mass.config.get_raw_player_config_value(player_id, CONF_EQ_TREBLE, 0)) != 0:
+        filter_params.append(f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}")
+    # handle output mixing only left or right
+    conf_channels = mass.config.get_raw_player_config_value(
+        player_id, CONF_OUTPUT_CHANNELS, "stereo"
+    )
+    if conf_channels == "left":
+        filter_params.append("pan=mono|c0=FL")
+    elif conf_channels == "right":
+        filter_params.append("pan=mono|c0=FR")
+
+    return filter_params
+
+
 async def _get_ffmpeg_args(
-    streamdetails: StreamDetails,
-    pcm_output_format: AudioFormat,
-    seek_position: int = 0,
-    fade_in: bool = False,
+    input_format: AudioFormat,
+    output_format: AudioFormat,
+    filter_params: list[str],
+    extra_args: list[str],
+    input_path: str = "-",
+    output_path: str = "-",
 ) -> list[str]:
     """Collect all args to send to the ffmpeg process."""
     ffmpeg_present, libsoxr_support, version = await check_audio_support()
@@ -827,76 +967,51 @@ async def _get_ffmpeg_args(
     # collect input args
     input_args = [
         "-ac",
-        str(streamdetails.audio_format.channels),
+        str(input_format.channels),
         "-channel_layout",
-        "mono" if streamdetails.audio_format.channels == 1 else "stereo",
+        "mono" if input_format.channels == 1 else "stereo",
     ]
-    if seek_position:
-        input_args += ["-ss", str(seek_position)]
-    if streamdetails.direct:
-        # ffmpeg can access the inputfile (or url) directly
-        if streamdetails.direct.startswith("http"):
-            # append reconnect options for direct stream from http
+    if input_format.content_type.is_pcm():
+        input_args += ["-ar", str(input_format.sample_rate)]
+    if input_path.startswith("http"):
+        # append reconnect options for direct stream from http
+        input_args += [
+            "-reconnect",
+            "1",
+            "-reconnect_streamed",
+            "1",
+            "-reconnect_delay_max",
+            "10",
+        ]
+        if major_version > 4:
+            # these options are only supported in ffmpeg > 5
             input_args += [
-                "-reconnect",
+                "-reconnect_on_network_error",
                 "1",
-                "-reconnect_streamed",
-                "1",
-                "-reconnect_delay_max",
-                "10",
+                "-reconnect_on_http_error",
+                "5xx",
             ]
-            if major_version > 4:
-                # these options are only supported in ffmpeg > 5
-                input_args += [
-                    "-reconnect_on_network_error",
-                    "1",
-                    "-reconnect_on_http_error",
-                    "5xx",
-                ]
-
-        input_args += ["-i", streamdetails.direct]
-    else:
-        # the input is received from pipe/stdin
-        if streamdetails.audio_format.content_type != ContentType.UNKNOWN:
-            input_args += ["-f", streamdetails.audio_format.content_type.value]
-        input_args += [
-            "-i",
-            "-",
-        ]
+    if input_format.content_type != ContentType.UNKNOWN:
+        input_args += ["-f", input_format.content_type.value]
+    input_args += ["-i", input_path]
 
     # collect output args
     output_args = [
         "-acodec",
-        pcm_output_format.content_type.name.lower(),
+        output_format.content_type.name.lower(),
         "-f",
-        pcm_output_format.content_type.value,
+        output_format.content_type.value,
         "-ac",
-        str(pcm_output_format.channels),
+        str(output_format.channels),
         "-ar",
-        str(pcm_output_format.sample_rate),
-        "-",
+        str(output_format.sample_rate),
+        output_path,
     ]
-    # collect extra and filter args
-    extra_args = []
-    filter_params = []
-    if streamdetails.target_loudness is not None:
-        filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
-        if streamdetails.loudness:
-            filter_rule += f":measured_I={streamdetails.loudness.integrated}"
-            filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
-            filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
-            filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
-        filter_rule += ":print_format=json"
-        filter_params.append(filter_rule)
-    if (
-        streamdetails.audio_format.sample_rate != pcm_output_format.sample_rate
-        and libsoxr_support
-        and streamdetails.media_type == MediaType.TRACK
-    ):
-        # prefer libsoxr high quality resampler (if present) for sample rate conversions
+
+    # prefer libsoxr high quality resampler (if present) for sample rate conversions
+    if input_format.sample_rate != output_format.sample_rate and libsoxr_support:
         filter_params.append("aresample=resampler=soxr")
-    if fade_in:
-        filter_params.append("afade=type=in:start_time=0:duration=3")
+
     if filter_params:
         extra_args += ["-af", ",".join(filter_params)]
 
index 159be91075033cb2e3a1531909f928f848ed4ffd..c3a27314bffc258f6b26dfa1b0deb36672dd4b4b 100644 (file)
@@ -33,12 +33,14 @@ def create_didl_metadata(
             "</DIDL-Lite>"
         )
     is_radio = queue_item.media_type != MediaType.TRACK or not queue_item.duration
-    image_url = mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
+    image_url = (
+        mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE
+    )
     if is_radio:
         # radio or other non-track item
         return (
             '<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
-            '<item id="1" parentID="0" restricted="1">'
+            '<item id="flowmode" parentID="0" restricted="1">'
             f"<dc:title>{escape_string(queue_item.name)}</dc:title>"
             f"<upnp:albumArtURI>{escape_string(image_url)}</upnp:albumArtURI>"
             f"<dc:queueItemId>{queue_item.queue_item_id}</dc:queueItemId>"
index eeab4f658332e378505357e429a5a03d62f67dbc..8668b4992da55b9c1e99ce6b2f4cbcf8673d5b05 100644 (file)
@@ -12,7 +12,7 @@ from contextlib import suppress
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
-    from collections.abc import AsyncGenerator, Coroutine
+    from collections.abc import AsyncGenerator
 
 LOGGER = logging.getLogger(__name__)
 
@@ -37,9 +37,18 @@ class AsyncProcess:
         self._enable_stdin = enable_stdin
         self._enable_stdout = enable_stdout
         self._enable_stderr = enable_stderr
-        self._attached_task: asyncio.Task = None
-        self.closed = False
-        self.returncode: int | None = None
+
+    @property
+    def closed(self) -> bool:
+        """Return if the process was closed."""
+        return self.returncode is not None
+
+    @property
+    def returncode(self) -> int | None:
+        """Return the returncode of the process."""
+        if self._proc is None:
+            return None
+        return self._proc.returncode
 
     async def __aenter__(self) -> AsyncProcess:
         """Enter context manager."""
@@ -119,37 +128,27 @@ class AsyncProcess:
             # already exited, race condition
             pass
 
-    async def close(self) -> None:
-        """Close/terminate the process."""
-        self.closed = True
-        if self._attached_task and not self._attached_task.done():
-            with suppress(asyncio.CancelledError):
-                self._attached_task.cancel()
+    async def close(self) -> int:
+        """Close/terminate the process and wait for exit."""
+        if self.returncode is not None:
+            return self.returncode
         # make sure the process is cleaned up
-        self.write_eof()
-        if self._proc.returncode is None:
-            try:
-                async with asyncio.timeout(10):
-                    await self.communicate()
-            except TimeoutError:
-                self._proc.kill()
-        await self.wait()
+        try:
+            async with asyncio.timeout(10):
+                await self.communicate()
+        except (TimeoutError, asyncio.CancelledError):
+            self._proc.terminate()
+        return await self.wait()
 
     async def wait(self) -> int:
         """Wait for the process and return the returncode."""
         if self.returncode is not None:
             return self.returncode
-        if self._proc.returncode is not None:
-            self.returncode = self._proc.returncode
-            return self.returncode
-        self.returncode = await self._proc.wait()
-        self.closed = True
-        return self.returncode
+        return await self._proc.wait()
 
     async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
         """Write bytes to process and read back results."""
         stdout, stderr = await self._proc.communicate(input_data)
-        self.returncode = self._proc.returncode
         return (stdout, stderr)
 
     async def read_stderr(self, n: int = -1) -> bytes:
@@ -161,11 +160,6 @@ class AsyncProcess:
         """
         return await self._proc.stderr.read(n)
 
-    def attach_task(self, coro: Coroutine) -> asyncio.Task:
-        """Attach given coro func as reader/writer task to properly cancel it when needed."""
-        self._attached_task = task = asyncio.create_task(coro)
-        return task
-
 
 async def check_output(shell_cmd: str) -> tuple[int, bytes]:
     """Run shell subprocess and return output."""
index 8f5830a00af24eca38a5802b8c406f1bffd7e06b..d917ef306da6c09dda5260eb8552fbeb211ce72d 100644 (file)
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import asyncio
 import json
 import logging
 import os
@@ -366,51 +367,54 @@ async def parse_tags(
         file_path,
     )
 
-    async with AsyncProcess(
+    writer_task: asyncio.Task | None = None
+    ffmpeg_proc = AsyncProcess(
         args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False
-    ) as proc:
-        if file_path == "-":
-            # feed the file contents to the process
-
-            async def chunk_feeder() -> None:
-                bytes_read = 0
-                try:
-                    async for chunk in input_file:
-                        if proc.closed:
-                            break
-                        await proc.write(chunk)
-                        bytes_read += len(chunk)
-                        del chunk
-                        if bytes_read > 25 * 1000000:
-                            # this is possibly a m4a file with 'moove atom' metadata at the
-                            # end of the file
-                            # we'll have to read the entire file to do something with it
-                            # for now we just ignore/deny these files
-                            LOGGER.error("Found file with tags not present at beginning of file")
-                            break
-                finally:
-                    proc.write_eof()
-
-            proc.attach_task(chunk_feeder())
-
-        try:
-            res = await proc.read(-1)
-            data = json.loads(res)
-            if error := data.get("error"):
-                raise InvalidDataError(error["string"])
-            if not data.get("streams"):
-                msg = "Not an audio file"
-                raise InvalidDataError(msg)
-            tags = AudioTags.parse(data)
-            del res
-            del data
-            if not tags.duration and file_size and tags.bit_rate:
-                # estimate duration from filesize/bitrate
-                tags.duration = int((file_size * 8) / tags.bit_rate)
-            return tags
-        except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err:
-            msg = f"Unable to retrieve info for {file_path}: {err!s}"
-            raise InvalidDataError(msg) from err
+    )
+    await ffmpeg_proc.start()
+
+    async def writer() -> None:
+        bytes_read = 0
+        async for chunk in input_file:
+            if ffmpeg_proc.closed:
+                break
+            await ffmpeg_proc.write(chunk)
+            bytes_read += len(chunk)
+            del chunk
+            if bytes_read > 25 * 1000000:
+                # this is possibly a m4a file with 'moove atom' metadata at the
+                # end of the file
+                # we'll have to read the entire file to do something with it
+                # for now we just ignore/deny these files
+                LOGGER.error("Found file with tags not present at beginning of file")
+                break
+
+    if file_path == "-":
+        # feed the file contents to the process
+        writer_task = asyncio.create_task(writer())
+
+    try:
+        res = await ffmpeg_proc.read(-1)
+        data = json.loads(res)
+        if error := data.get("error"):
+            raise InvalidDataError(error["string"])
+        if not data.get("streams"):
+            msg = "Not an audio file"
+            raise InvalidDataError(msg)
+        tags = AudioTags.parse(data)
+        del res
+        del data
+        if not tags.duration and file_size and tags.bit_rate:
+            # estimate duration from filesize/bitrate
+            tags.duration = int((file_size * 8) / tags.bit_rate)
+        return tags
+    except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err:
+        msg = f"Unable to retrieve info for {file_path}: {err!s}"
+        raise InvalidDataError(msg) from err
+    finally:
+        if writer_task and not writer_task.done():
+            writer_task.cancel()
+        await ffmpeg_proc.close()
 
 
 async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> bytes | None:
@@ -436,20 +440,27 @@ async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> b
         "-",
     )
 
-    async with AsyncProcess(
+    writer_task: asyncio.Task | None = None
+    ffmpeg_proc = AsyncProcess(
         args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False
-    ) as proc:
-        if file_path == "-":
-            # feed the file contents to the process
-            async def chunk_feeder() -> None:
-                try:
-                    async for chunk in input_file:
-                        if proc.closed:
-                            break
-                        await proc.write(chunk)
-                finally:
-                    proc.write_eof()
-
-            proc.attach_task(chunk_feeder())
-
-        return await proc.read(-1)
+    )
+    await ffmpeg_proc.start()
+
+    async def writer() -> None:
+        async for chunk in input_file:
+            if ffmpeg_proc.closed:
+                break
+            await ffmpeg_proc.write(chunk)
+        ffmpeg_proc.write_eof()
+
+    # feed the file contents to the process stdin
+    if file_path == "-":
+        writer_task = asyncio.create_task(writer())
+
+    # return image bytes from stdout
+    try:
+        return await ffmpeg_proc.read(-1)
+    finally:
+        if writer_task and not writer_task.done():
+            writer_task.cancel()
+        await ffmpeg_proc.close()
index cfacf47e76fd34aa95c9a08e4193036dba5e62af..85b05576e0a84f865fe16406c017faa604690798 100644 (file)
@@ -22,7 +22,6 @@ from .provider import Provider
 if TYPE_CHECKING:
     from music_assistant.common.models.player import Player
     from music_assistant.common.models.queue_item import QueueItem
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
 
 
 # ruff: noqa: ARG001, ARG002
@@ -123,13 +122,6 @@ class PlayerProvider(Provider):
         """
         raise NotImplementedError
 
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        raise NotImplementedError
-
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
         """
         Handle enqueuing of the next queue item on the player.
index 7cdbfe216e5b1dc8f260500b929ab9e818fa1555..263b839dd621c58dcafde73857f79aaba822418c 100644 (file)
@@ -17,10 +17,14 @@ from zeroconf import IPVersion, ServiceStateChange
 from zeroconf.asyncio import AsyncServiceInfo
 
 from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import get_ip_pton, select_free_port
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
+    CONF_ENTRY_EQ_BASS,
+    CONF_ENTRY_EQ_MID,
+    CONF_ENTRY_EQ_TREBLE,
+    CONF_ENTRY_OUTPUT_CHANNELS,
     CONF_ENTRY_SYNC_ADJUST,
     ConfigEntry,
     ConfigValueType,
@@ -37,6 +41,7 @@ from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.player_queue import PlayerQueue
 from music_assistant.constants import CONF_SYNC_ADJUST
+from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
 from music_assistant.server.helpers.process import check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -45,7 +50,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 DOMAIN = "airplay"
@@ -59,6 +63,10 @@ CONF_PASSWORD = "password"
 PLAYER_CONFIG_ENTRIES = (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
+    CONF_ENTRY_EQ_BASS,
+    CONF_ENTRY_EQ_MID,
+    CONF_ENTRY_EQ_TREBLE,
+    CONF_ENTRY_OUTPUT_CHANNELS,
     ConfigEntry(
         key=CONF_ENCRYPTION,
         type=ConfigEntryType.BOOLEAN,
@@ -177,7 +185,7 @@ class AirplayStreamJob:
         self.active_remote_id: str = str(randint(1000, 8000))
         self.start_ntp: int | None = None  # use as checksum
         self.prevent_playback: bool = False
-        self._audio_buffer = asyncio.Queue(2)
+        self._audio_iterator: AsyncGenerator[bytes, None] | None = None
         self._log_reader_task: asyncio.Task | None = None
         self._audio_reader_task: asyncio.Task | None = None
         self._cliraop_proc: asyncio.subprocess.Process | None = None
@@ -192,9 +200,10 @@ class AirplayStreamJob:
             and self._cliraop_proc.returncode is None
         )
 
-    async def init_cliraop(self, start_ntp: int) -> None:
+    async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None]) -> None:
         """Initialize CLIRaop process for a player."""
         self.start_ntp = start_ntp
+        self._audio_iterator = audio_iterator
         extra_args = []
         player_id = self.airplay_player.player_id
         mass_player = self.mass.players.get(player_id)
@@ -202,9 +211,6 @@ class AirplayStreamJob:
             extra_args += ["-encrypt"]
         if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
             extra_args += ["-alac"]
-        if "airport" in mass_player.device_info.model.lower():
-            # enforce auth on airport express
-            extra_args += ["-auth"]
         for prop in ("et", "md", "am", "pk", "pw"):
             if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop):
                 extra_args += [f"-{prop}", prop_value]
@@ -213,8 +219,6 @@ class AirplayStreamJob:
         if device_password := self.mass.config.get_raw_player_config_value(
             player_id, CONF_PASSWORD, None
         ):
-            # NOTE: This may not work as we might need to do
-            # some fancy hashing with the plain password first?!
             extra_args += ["-password", device_password]
         if self.prov.log_level == "DEBUG":
             extra_args += ["-debug", "5"]
@@ -258,20 +262,21 @@ class AirplayStreamJob:
             return
         await self.send_cli_command("ACTION=STOP")
         self._stop_requested = True
+        if not force:
+            return
         # stop background tasks
-        if self._log_reader_task and not self._log_reader_task.done():
-            if force:
-                self._log_reader_task.cancel()
-            with suppress(asyncio.CancelledError):
-                await self._log_reader_task
         if self._audio_reader_task and not self._audio_reader_task.done():
-            if force:
-                self._audio_reader_task.cancel()
             with suppress(asyncio.CancelledError):
+                self._audio_reader_task.cancel()
                 await self._audio_reader_task
-
-        empty_queue(self._audio_buffer)
-        await asyncio.wait_for(self._cliraop_proc.communicate(), 30)
+        if self._log_reader_task and not self._log_reader_task.done():
+            with suppress(asyncio.CancelledError):
+                self._log_reader_task.cancel()
+                await self._log_reader_task
+        with suppress(TimeoutError):
+            await asyncio.wait_for(self._cliraop_proc.communicate(), 5)
+        if self._cliraop_proc.returncode is None:
+            self._cliraop_proc.kill()
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
@@ -356,35 +361,96 @@ class AirplayStreamJob:
             self.mass.players.update(airplay_player.player_id)
 
     async def _audio_reader(self) -> None:
-        """Read audio chunks from buffer and send them to the cliraop process."""
+        """Read audio chunks and send them to the cliraop process."""
         logger = self.airplay_player.logger
-        logger.debug("Audio reader started")
-        while self.running:
-            chunk = await self._audio_buffer.get()
-            if chunk == b"EOF":
-                # EOF chunk
-                break
+        mass_player = self.mass.players.get(self.airplay_player.player_id, True)
+        queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
+        logger.debug(
+            "Starting RAOP stream for Queue %s to %s",
+            queue.display_name,
+            mass_player.display_name,
+        )
+        prev_metadata_checksum: str = ""
+        prev_progress_report: float = 0
+        async for chunk in self._audio_iterator:
+            if not self.running:
+                return
             self._cliraop_proc.stdin.write(chunk)
-            with suppress(BrokenPipeError, ConnectionResetError):
+            try:
                 await self._cliraop_proc.stdin.drain()
+            except (BrokenPipeError, ConnectionResetError):
+                break
+            # send metadata to player(s) if needed
+            # NOTE: this must all be done in separate tasks to not disturb audio
+            now = time.time()
+            if queue and queue.current_item and queue.current_item.streamdetails:
+                metadata_checksum = (
+                    queue.current_item.streamdetails.stream_title
+                    or queue.current_item.queue_item_id
+                )
+                if prev_metadata_checksum != metadata_checksum:
+                    prev_metadata_checksum = metadata_checksum
+                    prev_progress_report = now
+                    self.mass.create_task(self._send_metadata(queue))
+                # send the progress report every 5 seconds
+                elif now - prev_progress_report >= 5:
+                    prev_progress_report = now
+                    self.mass.create_task(self._send_progress(queue))
         # send EOF
         if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing():
             self._cliraop_proc.stdin.write_eof()
             with suppress(BrokenPipeError, ConnectionResetError):
                 await self._cliraop_proc.stdin.drain()
-        logger.debug("Audio reader finished")
+        logger.debug(
+            "Finished RAOP stream for Queue %s to %s",
+            queue.display_name,
+            mass_player.display_name,
+        )
 
-    async def write_chunk(self, data: bytes) -> None:
-        """Write a chunk of (pcm) data to the audio buffer."""
-        if not self.running:
+    async def _send_metadata(self, queue: PlayerQueue) -> None:
+        """Send metadata to player (and connected sync childs)."""
+        if not queue or not queue.current_item:
             return
-        await self._audio_buffer.put(data)
+        duration = min(queue.current_item.duration or 0, 3600)
+        title = queue.current_item.name
+        artist = ""
+        album = ""
+        if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
+            # stream title from radio station
+            stream_title = queue.current_item.streamdetails.stream_title
+            if " - " in stream_title:
+                artist, title = stream_title.split(" - ", 1)
+            else:
+                title = stream_title
+            # set album to radio station name
+            album = queue.current_item.name
+        if media_item := queue.current_item.media_item:
+            if artist_str := getattr(media_item, "artist_str", None):
+                artist = artist_str
+            if _album := getattr(media_item, "album", None):
+                album = _album.name
 
-    async def write_eof(self) -> None:
-        """Write end-of-file chunk to the audo buffer."""
-        if not self.running:
+        cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
+        cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
+
+        await self.send_cli_command(cmd)
+
+        # get image
+        if not queue.current_item.image:
             return
-        await self._audio_buffer.put(b"EOF")
+
+        # the image format needs to be 500x500 jpeg for maximum compatibility with players
+        image_url = self.mass.metadata.get_image_url(
+            queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
+        )
+        await self.send_cli_command(f"ARTWORK={image_url}\n")
+
+    async def _send_progress(self, queue: PlayerQueue) -> None:
+        """Send progress report to player (and connected sync childs)."""
+        if not queue or not queue.current_item:
+            return
+        progress = int(queue.corrected_elapsed_time)
+        await self.send_cli_command(f"PROGRESS={progress}\n")
 
 
 @dataclass
@@ -404,7 +470,6 @@ class AirplayProvider(PlayerProvider):
     cliraop_bin: str | None = None
     _players: dict[str, AirPlayPlayer]
     _discovery_running: bool = False
-    _stream_tasks: dict[str, asyncio.Task]
     _dacp_server: asyncio.Server = None
     _dacp_info: AsyncServiceInfo = None
 
@@ -416,7 +481,6 @@ class AirplayProvider(PlayerProvider):
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
         self._players = {}
-        self._stream_tasks = {}
         self.cliraop_bin = await self._getcliraop_binary()
         dacp_port = await select_free_port(39831, 49831)
         self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
@@ -504,8 +568,6 @@ class AirplayProvider(PlayerProvider):
 
         - player_id: player_id of the player to handle the command.
         """
-        if existing_stream := self._stream_tasks.get(player_id):
-            existing_stream.cancel()
 
         async def stop_player(airplay_player: AirPlayPlayer) -> None:
             if airplay_player.active_stream:
@@ -560,80 +622,41 @@ class AirplayProvider(PlayerProvider):
             - seek_position: Optional seek to this position.
             - fade_in: Optionally fade in the item at playback start.
         """
+        player = self.mass.players.get(player_id)
+        if player.synced_to:
+            # should not happen, but just in case
+            raise RuntimeError("Player is synced")
         # fix race condition where resync and play media are called at more or less the same time
         if self._resync_handle:
             self._resync_handle.cancel()
             self._resync_handle = None
         # always stop existing stream first
-        if existing_stream := self._stream_tasks.get(player_id):
-            existing_stream.cancel()
         for airplay_player in self._get_sync_clients(player_id):
             if airplay_player.active_stream and airplay_player.active_stream.running:
                 self.mass.create_task(airplay_player.active_stream.stop(force=True))
-        # start streaming the queue (pcm) audio in a background task
-        queue = self.mass.player_queues.get_active_queue(player_id)
-        self._stream_tasks[player_id] = asyncio.create_task(
-            self._stream_audio(
-                player_id,
-                queue=queue,
-                audio_iterator=self.mass.streams.get_flow_stream(
-                    queue,
-                    start_queue_item=queue_item,
-                    pcm_format=AudioFormat(
-                        content_type=ContentType.PCM_S16LE,
-                        sample_rate=44100,
-                        bit_depth=16,
-                        channels=2,
-                    ),
-                    seek_position=seek_position,
-                    fade_in=fade_in,
-                ),
-            )
+        pcm_format = AudioFormat(
+            content_type=ContentType.PCM_S16LE,
+            sample_rate=44100,
+            bit_depth=16,
+            channels=2,
         )
 
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        # fix race condition where resync and play media are called at more or less the same time
-        if self._resync_handle:
-            self._resync_handle.cancel()
-            self._resync_handle = None
-        # always stop existing stream first
-        if existing_stream := self._stream_tasks.get(player_id):
-            existing_stream.cancel()
-        async with asyncio.TaskGroup() as tg:
-            for airplay_player in self._get_sync_clients(player_id):
-                if airplay_player.active_stream and airplay_player.active_stream.running:
-                    tg.create_task(airplay_player.active_stream.stop(force=True))
-        if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
-            # TODO: resample on the fly here ?
-            raise RuntimeError("Unsupported PCM format")
-        # start streaming the queue (pcm) audio in a background task
-        queue = self.mass.player_queues.get_active_queue(player_id)
-        self._stream_tasks[player_id] = asyncio.create_task(
-            self._stream_audio(
-                player_id,
-                queue=queue,
-                audio_iterator=stream_job.subscribe(player_id),
+        if queue_item.queue_item_id == "flow":
+            # handle special case for UGP multi client stream
+            stream_job = self.mass.streams.multi_client_jobs.get(queue_item.queue_id)
+        elif player.group_childs:
+            # create a new multi client flow stream
+            stream_job = await self.mass.streams.create_multi_client_stream_job(
+                queue_item.queue_id,
+                queue_item,
+                seek_position=seek_position,
+                fade_in=fade_in,
+                pcm_bit_depth=16,
+                pcm_sample_rate=44100,
             )
-        )
-
-    async def _stream_audio(
-        self, player_id: str, queue: PlayerQueue, audio_iterator: AsyncGenerator[bytes, None]
-    ) -> None:
-        """Handle the actual streaming of audio to Airplay."""
-        player = self.mass.players.get(player_id)
-        if player.synced_to:
-            # should not happen, but just in case
-            raise RuntimeError("Player is synced")
-        synced_player_ids = [x.player_id for x in self._get_sync_clients(player_id)]
-        self.logger.debug(
-            "Starting RAOP stream for Queue %s to %s",
-            queue.display_name,
-            "/".join(synced_player_ids),
-        )
+        else:
+            # for a single player we just consume the flow stream directly
+            stream_job = None
 
         # Python is not suitable for realtime audio streaming.
         # So, I've decided to go the fancy route here. I've created a small binary
@@ -646,60 +669,30 @@ class AirplayProvider(PlayerProvider):
         start_ntp = int(stdout.strip())
 
         # setup Raop process for player and its sync childs
-        for airplay_player in self._get_sync_clients(player_id):
-            airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
-            await airplay_player.active_stream.init_cliraop(start_ntp)
-        prev_metadata_checksum: str = ""
-        prev_progress_report: float = 0
-        async for pcm_chunk in audio_iterator:
-            # send audio chunk to player(s)
-            available_clients = 0
+        async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                if (
-                    not airplay_player.active_stream
-                    or not airplay_player.active_stream.running
-                    or airplay_player.active_stream.start_ntp != start_ntp
-                ):
-                    # catch when this stream is no longer active on the player
-                    continue
-                available_clients += 1
-                await airplay_player.active_stream.write_chunk(pcm_chunk)
-            if not available_clients:
-                # this streamjob is no longer active
-                return
-
-            # send metadata to player(s) if needed
-            # NOTE: this must all be done in separate tasks to not disturb audio
-            now = time.time()
-            if queue and queue.current_item and queue.current_item.streamdetails:
-                metadata_checksum = (
-                    queue.current_item.streamdetails.stream_title
-                    or queue.current_item.queue_item_id
-                )
-                if prev_metadata_checksum != metadata_checksum:
-                    prev_metadata_checksum = metadata_checksum
-                    prev_progress_report = now
-                    self.mass.create_task(self._send_metadata(player_id, queue))
-                # send the progress report every 5 seconds
-                elif now - prev_progress_report >= 5:
-                    prev_progress_report = now
-                    self.mass.create_task(self._send_progress(player_id, queue))
-
-        # end of stream reached - write eof
-        self.logger.debug(
-            "Finished RAOP stream for Queue %s to %s",
-            queue.display_name,
-            "/".join(synced_player_ids),
-        )
-        for airplay_player in self._get_sync_clients(player_id):
-            if (
-                not airplay_player.active_stream
-                or not airplay_player.active_stream.running
-                or airplay_player.active_stream.start_ntp != start_ntp
-            ):
-                # this may not happen, but guard just in case
-                continue
-            await airplay_player.active_stream.write_eof()
+                if stream_job:
+                    stream_job.expected_players.add(airplay_player.player_id)
+                    audio_iterator = stream_job.subscribe(
+                        player_id=airplay_player.player_id,
+                        output_format=pcm_format,
+                    )
+                else:
+                    queue = self.mass.player_queues.get_active_queue(queue_item.queue_id)
+                    audio_iterator = get_ffmpeg_stream(
+                        self.mass.streams.get_flow_stream(
+                            queue,
+                            start_queue_item=queue_item,
+                            pcm_format=pcm_format,
+                            seek_position=seek_position,
+                            fade_in=fade_in,
+                        ),
+                        input_format=pcm_format,
+                        output_format=pcm_format,
+                        filter_params=get_player_filter_params(self.mass, airplay_player.player_id),
+                    )
+                airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
+                tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator))
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
@@ -960,7 +953,7 @@ class AirplayProvider(PlayerProvider):
                 # device switched to another source (or is powered off)
                 if active_stream := airplay_player.active_stream:
                     # ignore this if we just started playing to prevent false positives
-                    if mass_player.elapsed_time > 2 and mass_player.state == PlayerState.PLAYING:
+                    if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
                         active_stream.prevent_playback = True
                         self.mass.create_task(self.monitor_prevent_playback(player_id))
             elif "device-prevent-playback=0" in path:
@@ -987,60 +980,6 @@ class AirplayProvider(PlayerProvider):
         finally:
             writer.close()
 
-    async def _send_metadata(self, player_id: str, queue: PlayerQueue) -> None:
-        """Send metadata to player (and connected sync childs)."""
-        if not queue or not queue.current_item:
-            return
-        duration = min(queue.current_item.duration or 0, 3600)
-        title = queue.current_item.name
-        artist = ""
-        album = ""
-        if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
-            # stream title from radio station
-            stream_title = queue.current_item.streamdetails.stream_title
-            if " - " in stream_title:
-                artist, title = stream_title.split(" - ", 1)
-            else:
-                title = stream_title
-            # set album to radio station name
-            album = queue.current_item.name
-        if media_item := queue.current_item.media_item:
-            if artist_str := getattr(media_item, "artist_str", None):
-                artist = artist_str
-            if _album := getattr(media_item, "album", None):
-                album = _album.name
-
-        cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
-        cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
-
-        for airplay_player in self._get_sync_clients(player_id):
-            if not airplay_player.active_stream or not airplay_player.active_stream.running:
-                continue
-            await airplay_player.active_stream.send_cli_command(cmd)
-
-        # get image
-        if not queue.current_item.image:
-            return
-
-        # the image format needs to be 500x500 jpeg for maximum compatibility with players
-        image_url = self.mass.metadata.get_image_url(
-            queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
-        )
-        for airplay_player in self._get_sync_clients(player_id):
-            if not airplay_player.active_stream or not airplay_player.active_stream.running:
-                continue
-            await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n")
-
-    async def _send_progress(self, player_id: str, queue: PlayerQueue) -> None:
-        """Send progress report to player (and connected sync childs)."""
-        if not queue or not queue.current_item:
-            return
-        progress = int(queue.corrected_elapsed_time)
-        for airplay_player in self._get_sync_clients(player_id):
-            if not airplay_player.active_stream or not airplay_player.active_stream.running:
-                continue
-            await airplay_player.active_stream.send_cli_command(f"PROGRESS={progress}\n")
-
     async def monitor_prevent_playback(self, player_id: str):
         """Monitor the prevent playback state of an airplay player."""
         count = 0
@@ -1058,7 +997,7 @@ class AirplayProvider(PlayerProvider):
                 return
             if not active_stream.prevent_playback:
                 return
-            await asyncio.sleep(0.25)
+            await asyncio.sleep(0.5)
 
         airplay_player.logger.info(
             "Player has been in prevent playback mode for too long, powering off.",
index 6077108a144380f6b069a4ab24db29c48961046d..8019a1e791806a2a5f827280e2f8637187a51003 100644 (file)
@@ -49,7 +49,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 
@@ -250,6 +249,7 @@ class ChromecastProvider(PlayerProvider):
             player_id, CONF_FLOW_MODE
         ) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
             seek_position=seek_position,
@@ -286,25 +286,11 @@ class ChromecastProvider(PlayerProvider):
         media_controller = castplayer.cc.media_controller
         await asyncio.to_thread(media_controller.send_message, queuedata, True)
 
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        url = stream_job.resolve_stream_url(player_id, ContentType.FLAC)
-        castplayer = self.castplayers[player_id]
-        await asyncio.to_thread(
-            castplayer.cc.play_media,
-            url,
-            content_type="audio/flac",
-            title="Music Assistant",
-            thumb=MASS_LOGO_ONLINE,
-        )
-
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
         """Handle enqueuing of the next queue item on the player."""
         castplayer = self.castplayers[player_id]
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
         )
index 6e5d0244315fc7ca4fb10554928c5d1c44e61283..25606235ecf5562486434f72eda78c510fa2781a 100644 (file)
@@ -55,7 +55,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 BASE_PLAYER_FEATURES = (
@@ -357,6 +356,7 @@ class DLNAPlayerProvider(PlayerProvider):
         use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
         enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
             seek_position=seek_position,
@@ -386,40 +386,12 @@ class DLNAPlayerProvider(PlayerProvider):
             dlna_player.force_poll = True
             await self.poll_player(dlna_player.udn)
 
-    @catch_request_errors
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC
-        url = stream_job.resolve_stream_url(player_id, output_codec)
-        dlna_player = self.dlnaplayers[player_id]
-        # always clear queue (by sending stop) first
-        if dlna_player.device.can_stop:
-            await self.cmd_stop(player_id)
-        didl_metadata = create_didl_metadata(self.mass, url, None)
-        await dlna_player.device.async_set_transport_uri(url, "Music Assistant", didl_metadata)
-        # Play it
-        await dlna_player.device.async_wait_for_can_play(10)
-        # optimistically set this timestamp to help in case of a player
-        # that does not report the progress
-        now = time.time()
-        dlna_player.player.elapsed_time = 0
-        dlna_player.player.elapsed_time_last_updated = now
-        await dlna_player.device.async_play()
-        # force poll the device
-        for sleep in (1, 2):
-            await asyncio.sleep(sleep)
-            dlna_player.force_poll = True
-            await self.poll_player(dlna_player.udn)
-
     @catch_request_errors
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
         """Handle enqueuing of the next queue item on the player."""
         dlna_player = self.dlnaplayers[player_id]
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
         )
index c505618c2b6fc80e20c85cff8c552141afdd19ee..a0887c33460361901966cb8038647b5795856531 100644 (file)
@@ -31,7 +31,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 AUDIOMANAGER_STREAM_MUSIC = 3
@@ -199,6 +198,7 @@ class FullyKioskProvider(PlayerProvider):
         player = self.mass.players.get(player_id)
         enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
             seek_position=seek_position,
@@ -212,22 +212,6 @@ class FullyKioskProvider(PlayerProvider):
         player.state = PlayerState.PLAYING
         self.mass.players.update(player_id)
 
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        player = self.mass.players.get(player_id)
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC
-        url = stream_job.resolve_stream_url(player_id, output_codec)
-        await self._fully.playSound(url, AUDIOMANAGER_STREAM_MUSIC)
-        player.current_item_id = player_id
-        player.elapsed_time = 0
-        player.elapsed_time_last_updated = time.time()
-        player.state = PlayerState.PLAYING
-        self.mass.players.update(player_id)
-
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates.
 
index 9d611a8a424b9dc2af67a876f63449114786f9f1..7323b6ce637e5c0132d5529ff32f419608d8cd19 100644 (file)
@@ -44,7 +44,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
     from music_assistant.server.providers.hass import HomeAssistant as HomeAssistantProvider
 
@@ -266,6 +265,7 @@ class HomeAssistantPlayers(PlayerProvider):
         use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
         enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
             seek_position=seek_position,
@@ -287,25 +287,6 @@ class HomeAssistantPlayers(PlayerProvider):
             player.elapsed_time = 0
             player.elapsed_time_last_updated = time.time()
 
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC
-        url = stream_job.resolve_stream_url(player_id, output_codec)
-        await self.hass_prov.hass.call_service(
-            domain="media_player",
-            service="play_media",
-            service_data={
-                "media_content_id": url,
-                "media_content_type": "music",
-                "enqueue": "replace",
-            },
-            target={"entity_id": player_id},
-        )
-
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
         """
         Handle enqueuing of the next queue item on the player.
@@ -321,6 +302,7 @@ class HomeAssistantPlayers(PlayerProvider):
         This will NOT be called if the player is using flow mode to playback the queue.
         """
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
         )
index 27a7c347109008c76ba0765c9863fb648107e324..93ae197c564700f0d2974dfa87e7e545317897e5 100644 (file)
@@ -62,7 +62,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 
@@ -79,9 +78,10 @@ STATE_MAP = {
 REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
 
 # sync constants
-MIN_DEVIATION_ADJUST = 6  # 6 milliseconds
-MIN_REQ_PLAYPOINTS = 8  # we need at least 8 measurements
-MAX_SKIP_AHEAD_MS = 1500  # 1.5 seconds
+MIN_DEVIATION_ADJUST = 8  # 8 milliseconds
+MIN_REQ_PLAYPOINTS = 3  # we need at least 3 measurements
+DEVIATION_JUMP_IGNORE = 2000  # ignore a sudden unrealistic jump
+MAX_SKIP_AHEAD_MS = 500  # 0.5 seconds
 
 
 @dataclass
@@ -215,7 +215,6 @@ class SlimprotoProvider(PlayerProvider):
 
     slimproto: SlimServer
     _sync_playpoints: dict[str, deque[SyncPlayPoint]]
-    _do_not_resync_before: dict[str, float]
 
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
@@ -225,7 +224,6 @@ class SlimprotoProvider(PlayerProvider):
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
         self._sync_playpoints = {}
-        self._do_not_resync_before = {}
         self._resync_handle: asyncio.TimerHandle | None = None
         control_port = self.config.get_value(CONF_PORT)
         telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
@@ -360,6 +358,7 @@ class SlimprotoProvider(PlayerProvider):
                         self._handle_play_url(
                             slimplayer,
                             url=stream_job.resolve_stream_url(
+                                player_id,
                                 slimplayer.player_id,
                                 output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
                             ),
@@ -374,6 +373,7 @@ class SlimprotoProvider(PlayerProvider):
             if not slimplayer:
                 return
             url = await self.mass.streams.resolve_stream_url(
+                player_id,
                 queue_item=queue_item,
                 # for now just hardcode flac as we assume that every (modern)
                 # slimproto based player can handle that just fine
@@ -390,39 +390,13 @@ class SlimprotoProvider(PlayerProvider):
                 auto_play=True,
             )
 
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        # fix race condition where resync and play media are called at more or less the same time
-        if self._resync_handle:
-            self._resync_handle.cancel()
-            self._resync_handle = None
-        # forward command to player and any connected sync members
-        sync_clients = self._get_sync_clients(player_id)
-        async with asyncio.TaskGroup() as tg:
-            for slimplayer in sync_clients:
-                tg.create_task(
-                    self._handle_play_url(
-                        slimplayer,
-                        url=stream_job.resolve_stream_url(
-                            slimplayer.player_id,
-                            output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
-                        ),
-                        queue_item=None,
-                        send_flush=True,
-                        auto_play=False,
-                    )
-                )
-
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
         """Handle enqueuing of the next queue item on the player."""
         if not (slimplayer := self.slimproto.get_player(player_id)):
             return
         enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
             flow_mode=False,
@@ -654,6 +628,11 @@ class SlimprotoProvider(PlayerProvider):
                     x.player_id for x in self.slimproto.players if x.player_id != player_id
                 ),
             )
+            if slimplayer.device_type == "squeezeesp32":
+                # squeezeesp32 with default settings - override with sane defaults
+                if slimplayer.max_sample_rate == 192000:
+                    player.max_sample_rate = 44100
+                player.supports_24bit = False
             self.mass.players.register_or_update(player)
 
         # update player state on player events
@@ -737,17 +716,12 @@ class SlimprotoProvider(PlayerProvider):
             return
         if slimplayer.state != SlimPlayerState.PLAYING:
             return
-
-        if backoff_time := self._do_not_resync_before.get(slimplayer.player_id):
-            # player has set a timestamp we should backoff from syncing it
-            if time.time() < backoff_time:
-                return
+        if slimplayer.player_id not in self._sync_playpoints:
+            return
 
         # we collect a few playpoints of the player to determine
         # average lag/drift so we can adjust accordingly
-        sync_playpoints = self._sync_playpoints.setdefault(
-            slimplayer.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
-        )
+        sync_playpoints = self._sync_playpoints[slimplayer.player_id]
 
         active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
         stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
@@ -755,8 +729,9 @@ class SlimprotoProvider(PlayerProvider):
             # should not happen, but just in case
             return
 
+        now = time.time()
         last_playpoint = sync_playpoints[-1] if sync_playpoints else None
-        if last_playpoint and (time.time() - last_playpoint.timestamp) > 10:
+        if last_playpoint and (now - last_playpoint.timestamp) > 10:
             # last playpoint is too old, invalidate
             sync_playpoints.clear()
         if last_playpoint and last_playpoint.sync_job_id != stream_job.job_id:
@@ -768,8 +743,12 @@ class SlimprotoProvider(PlayerProvider):
             - self._get_corrected_elapsed_milliseconds(slimplayer)
         )
 
+        if last_playpoint and abs(last_playpoint.diff - diff) > DEVIATION_JUMP_IGNORE:
+            # ignore unexpected spikes
+            return
+
         # we can now append the current playpoint to our list
-        sync_playpoints.append(SyncPlayPoint(time.time(), stream_job.job_id, diff))
+        sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff))
 
         if len(sync_playpoints) < MIN_REQ_PLAYPOINTS:
             return
@@ -783,24 +762,22 @@ class SlimprotoProvider(PlayerProvider):
 
         # resync the player by skipping ahead or pause for x amount of (milli)seconds
         sync_playpoints.clear()
-        self._do_not_resync_before[slimplayer.player_id] = time.time() + (delta / 1000) + 2
         if avg_diff > MAX_SKIP_AHEAD_MS:
             # player lagging behind more than MAX_SKIP_AHEAD_MS,
             # we need to correct the sync_master
-            self.logger.warning(
-                "%s is lagging behind more than %s milliseconds!",
-                player.display_name,
-                MAX_SKIP_AHEAD_MS,
-            )
+            self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta)
             self.mass.create_task(sync_master.pause_for(delta))
+            sync_master._elapsed_milliseconds -= delta
         elif avg_diff > 0:
             # handle player lagging behind, fix with skip_ahead
             self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta)
             self.mass.create_task(slimplayer.skip_over(delta))
+            sync_master._elapsed_milliseconds += delta
         else:
             # handle player is drifting too far ahead, use pause_for to adjust
             self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta)
             self.mass.create_task(slimplayer.pause_for(delta))
+            sync_master._elapsed_milliseconds -= delta
 
     async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None:
         """Handle buffer ready event, player has buffered a (new) track.
@@ -813,29 +790,31 @@ class SlimprotoProvider(PlayerProvider):
             return
         if not player.group_childs:
             # not a sync group, continue
-            await slimplayer.play()
+            await slimplayer.unpause_at(slimplayer.jiffies)
             return
         count = 0
         while count < 40:
             childs_total = 0
             childs_ready = 0
+            await asyncio.sleep(0.2)
             for sync_child in self._get_sync_clients(player.player_id):
                 childs_total += 1
                 if sync_child.state == SlimPlayerState.BUFFER_READY:
                     childs_ready += 1
             if childs_total == childs_ready:
                 break
-            await asyncio.sleep(0.1)
+
         # all child's ready (or timeout) - start play
         async with asyncio.TaskGroup() as tg:
             for _client in self._get_sync_clients(player.player_id):
-                timestamp = _client.jiffies + 500
-                sync_delay = self.mass.config.get_raw_player_config_value(
-                    _client.player_id, CONF_SYNC_ADJUST, 0
-                )
-                timestamp -= sync_delay
-                self._do_not_resync_before[_client.player_id] = time.time() + 1
-                tg.create_task(_client.unpause_at(int(timestamp)))
+                self._sync_playpoints.setdefault(
+                    _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS * 2)
+                ).clear()
+                # NOTE: Officially you should do an unpause_at based on the player timestamp
+                # but I did not have any good results with that.
+                # Instead just start playback on all players and let the sync logic work out
+                # the delays etc.
+                tg.create_task(_client.unpause_at(0))
 
     async def _handle_connected(self, slimplayer: SlimClient) -> None:
         """Handle a slimplayer connected event."""
index 3e81f4818350710be526dc4ad1bd916c0e592855..8cf61b0dbcaad800cac16f52e50179f986980e19 100644 (file)
@@ -39,7 +39,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 CONF_SNAPCAST_SERVER_HOST = "snapcast_server_host"
@@ -268,81 +267,41 @@ class SnapCastProvider(PlayerProvider):
         snap_group = self._get_snapgroup(player_id)
         await snap_group.set_stream(stream.identifier)
 
-        async def _streamer() -> None:
-            host = self.snapcast_server_host
-            _, writer = await asyncio.open_connection(host, port)
-            self.logger.debug("Opened connection to %s:%s", host, port)
-            player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
-            player.elapsed_time = 0
-            player.elapsed_time_last_updated = time.time()
-            player.state = PlayerState.PLAYING
-            self._set_childs_state(player_id, PlayerState.PLAYING)
-            self.mass.players.register_or_update(player)
-            # TODO: can we handle 24 bits bit depth ?
-            pcm_format = AudioFormat(
-                content_type=ContentType.PCM_S16LE,
-                sample_rate=48000,
-                bit_depth=16,
-                channels=2,
+        # TODO: can we handle 24 bits bit depth ?
+        pcm_format = AudioFormat(
+            content_type=ContentType.PCM_S16LE,
+            sample_rate=48000,
+            bit_depth=16,
+            channels=2,
+        )
+        # handle special case for UGP multi client stream
+        if stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id):
+            stream_job.expected_players.add(player_id)
+            audio_iterator = stream_job.subscribe(
+                player_id=player_id,
+                output_format=pcm_format,
+            )
+        else:
+            audio_iterator = self.mass.streams.get_flow_stream(
+                queue,
+                start_queue_item=queue_item,
+                pcm_format=pcm_format,
+                seek_position=seek_position,
+                fade_in=fade_in,
             )
-            try:
-                async for pcm_chunk in self.mass.streams.get_flow_stream(
-                    queue,
-                    start_queue_item=queue_item,
-                    pcm_format=pcm_format,
-                    seek_position=seek_position,
-                    fade_in=fade_in,
-                ):
-                    writer.write(pcm_chunk)
-                    await writer.drain()
-                # end of the stream reached
-                if writer.can_write_eof():
-                    writer.write_eof()
-                    await writer.drain()
-                # we need to wait a bit before removing the stream to ensure
-                # that all snapclients have consumed the audio
-                # https://github.com/music-assistant/hass-music-assistant/issues/1962
-                await asyncio.sleep(30)
-            finally:
-                if not writer.is_closing():
-                    writer.close()
-                await self._snapserver.stream_remove_stream(stream.identifier)
-                self.logger.debug("Closed connection to %s:%s", host, port)
-
-        # start streaming the queue (pcm) audio in a background task
-        self._stream_tasks[player_id] = asyncio.create_task(_streamer())
-
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        player = self.mass.players.get(player_id)
-        if player.synced_to:
-            msg = "A synced player cannot receive play commands directly"
-            raise RuntimeError(msg)
-        # stop any existing streams first
-        await self.cmd_stop(player_id)
-        if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 48000:
-            # TODO: resample on the fly here ?
-            raise RuntimeError("Unsupported PCM format")
-        stream, port = await self._create_stream()
-        stream_job.expected_players.add(player_id)
-        snap_group = self._get_snapgroup(player_id)
-        await snap_group.set_stream(stream.identifier)
 
         async def _streamer() -> None:
             host = self.snapcast_server_host
             _, writer = await asyncio.open_connection(host, port)
             self.logger.debug("Opened connection to %s:%s", host, port)
-            player.current_item_id = f"flow/{stream_job.queue_id}"
+            player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
             player.elapsed_time = 0
             player.elapsed_time_last_updated = time.time()
             player.state = PlayerState.PLAYING
             self._set_childs_state(player_id, PlayerState.PLAYING)
             self.mass.players.register_or_update(player)
             try:
-                async for pcm_chunk in stream_job.subscribe(player_id):
+                async for pcm_chunk in audio_iterator:
                     writer.write(pcm_chunk)
                     await writer.drain()
                 # end of the stream reached
index 01a677b069ab50e9f8d358cc3ab96b30ee50bd66..af0c41359a3013580e40c3629fb14aeb95eb2188 100644 (file)
@@ -9,7 +9,6 @@ from __future__ import annotations
 
 import asyncio
 import logging
-import time
 from collections import OrderedDict
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING
@@ -46,7 +45,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 
@@ -353,6 +351,7 @@ class SonosPlayerProvider(PlayerProvider):
             - fade_in: Optionally fade in the item at playback start.
         """
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
             seek_position=seek_position,
@@ -367,32 +366,11 @@ class SonosPlayerProvider(PlayerProvider):
                 "accept play_media command, it is synced to another player."
             )
             raise PlayerCommandFailed(msg)
-        metadata = create_didl_metadata(self.mass, url, queue_item)
-        await self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata)
-
-    async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
-        """Handle PLAY STREAM on given player.
-
-        This is a special feature from the Universal Group provider.
-        """
-        url = stream_job.resolve_stream_url(player_id, ContentType.FLAC)
-        sonos_player = self.sonosplayers[player_id]
-        mass_player = self.mass.players.get(player_id)
-        if sonos_player.sync_coordinator:
-            # this should be already handled by the player manager, but just in case...
-            msg = (
-                f"Player {mass_player.display_name} can not "
-                "accept play_stream command, it is synced to another player."
-            )
-            raise PlayerCommandFailed(msg)
-        metadata = create_didl_metadata(self.mass, url, None)
-        # sonos players do not like our multi client stream
-        # add to the workaround players list
-        self.mass.streams.workaround_players.add(player_id)
-        await self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata)
-        # optimistically set this timestamp to help figure out elapsed time later
-        mass_player.elapsed_time = 0
-        mass_player.elapsed_time_last_updated = time.time()
+        await self.mass.create_task(
+            sonos_player.soco.play_uri,
+            url,
+            meta=create_didl_metadata(self.mass, url, queue_item),
+        )
 
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
         """
@@ -411,6 +389,7 @@ class SonosPlayerProvider(PlayerProvider):
         """
         sonos_player = self.sonosplayers[player_id]
         url = await self.mass.streams.resolve_stream_url(
+            player_id,
             queue_item=queue_item,
             output_codec=ContentType.FLAC,
         )
index c5c2dfb83a8a9eb0449b502ea5e6e79d44267c4e..7733d2160cf2760f8765d71eb9adbda4fa47b105 100644 (file)
@@ -612,7 +612,7 @@ class SpotifyProvider(MusicProvider):
                 if retries > 2:
                     # switch to ap workaround after 2 retries
                     self._ap_workaround = True
-            except asyncio.exceptions.TimeoutError:
+            except TimeoutError:
                 await asyncio.sleep(2)
         if tokeninfo and userinfo:
             self._auth_token = tokeninfo
index 51ee24d7398d004d1e880fb121894fedbd022adf..6aa5b16462fe711a77fce1780d39b82c562545c0 100644 (file)
@@ -26,6 +26,7 @@ from music_assistant.common.models.enums import (
     ProviderFeature,
 )
 from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.queue_item import QueueItem
 from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -34,7 +35,6 @@ if TYPE_CHECKING:
 
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
 
@@ -177,13 +177,18 @@ class UniversalGroupProvider(PlayerProvider):
         await self.cmd_power(player_id, True)
         group_player = self.mass.players.get(player_id)
 
-        # create multi-client stream job
-        stream_job = await self.mass.streams.create_multi_client_stream_job(
+        # create a multi-client stream job - all (direct) child's of this UGP group
+        # will subscribe to this multi client queue stream
+        await self.mass.streams.create_multi_client_stream_job(
             player_id,
             start_queue_item=queue_item,
             seek_position=seek_position,
             fade_in=fade_in,
         )
+        # create a fake queue item to forward to downstream play_media commands
+        ugp_queue_item = QueueItem(
+            player_id, queue_item_id="flow", name=group_player.display_name, duration=None
+        )
 
         # forward the stream job to all group members
         async with asyncio.TaskGroup() as tg:
@@ -193,7 +198,7 @@ class UniversalGroupProvider(PlayerProvider):
                     member = self.mass.players.get_sync_leader(member)  # noqa: PLW2901
                     if member is None:
                         continue
-                tg.create_task(player_prov.play_stream(member.player_id, stream_job))
+                tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item, 0, False))
 
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates."""