Fix cleanup of (aborted) ffmpeg processes (#1166)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 23 Mar 2024 00:38:50 +0000 (01:38 +0100)
committerGitHub <noreply@github.com>
Sat, 23 Mar 2024 00:38:50 +0000 (01:38 +0100)
14 files changed:
music_assistant/__main__.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/process.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/chromecast/__init__.py
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/spotify/__init__.py
music_assistant/server/providers/ugp/__init__.py
music_assistant/server/server.py
pyproject.toml
requirements_all.txt

index 712adc3f2d2ddfec394bc0f5bb2da0acfc2064f2..48604d8e36597793294286b3d2a60e541dc1f2f9 100644 (file)
@@ -50,7 +50,6 @@ def get_arguments():
         help="Provide logging level. Example --log-level debug, "
         "default=info, possible=(critical, error, warning, info, debug)",
     )
-    parser.add_argument("-u", "--enable-uvloop", action="store_true")
     return parser.parse_args()
 
 
@@ -179,7 +178,6 @@ def main() -> None:
         hass_options = {}
 
     log_level = hass_options.get("log_level", args.log_level).upper()
-    enable_uvloop = bool(hass_options.get("enable_uvloop", args.enable_uvloop))
     dev_mode = os.environ.get("PYTHONDEVMODE", "0") == "1"
 
     # setup logger
@@ -203,7 +201,6 @@ def main() -> None:
 
     run(
         start_mass(),
-        use_uvloop=enable_uvloop,
         shutdown_callback=on_shutdown,
         executor_workers=32,
     )
index b8b3d4763fd5d742c9064d0f74f06122a388d09b..28e33a78033a96ee5efb923440b18d9d1c4e172a 100644 (file)
@@ -603,12 +603,6 @@ class PlayerController(CoreController):
         player = self.get(player_id, True)
         if player.announcement_in_progress:
             return
-        # use stream server to host announcement on local network
-        # this ensures playback on all players, including ones that do not
-        # like https hosts and it also offers the pre-announce 'bell'
-        announcement_url = self.mass.streams.get_announcement_url(
-            player.player_id, url, use_pre_announce=use_pre_announce
-        )
         try:
             # mark announcement_in_progress on player
             player.announcement_in_progress = True
@@ -621,10 +615,16 @@ class PlayerController(CoreController):
             # check for native announce support
             if PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features:
                 if prov := self.mass.get_provider(player.provider):
+                    # use stream server to host announcement on local network
+                    # this ensures playback on all players, including ones that do not
+                    # like https hosts and it also offers the pre-announce 'bell'
+                    announcement_url = self.mass.streams.get_announcement_url(
+                        player.player_id, url, use_pre_announce=use_pre_announce
+                    )
                     await prov.play_announcement(player_id, announcement_url)
                     return
             # use fallback/default implementation
-            await self._play_announcement(player, announcement_url)
+            await self._play_announcement(player, url, use_pre_announce)
         finally:
             player.announcement_in_progress = False
 
@@ -1078,11 +1078,7 @@ class PlayerController(CoreController):
             # if a child player turned ON while the group player is on, we need to resync/resume
             self.mass.create_task(self._sync_syncgroup(group_player.player_id))
 
-    async def _play_announcement(
-        self,
-        player: Player,
-        announcement_url: str,
-    ) -> None:
+    async def _play_announcement(self, player: Player, url: str, use_pre_announce: bool) -> None:
         """Handle (default/fallback) implementation of the play announcement feature.
 
         This default implementation will;
@@ -1099,28 +1095,28 @@ class PlayerController(CoreController):
         """
         if player.synced_to:
             # redirect to sync master if player is group child
-            self.mass.create_task(self.play_announcement(player.synced_to, announcement_url))
+            self.mass.create_task(self.play_announcement(player.synced_to, url))
             return
         if active_group := self._get_active_player_group(player):
-            # redirect to group player if playergroup is atcive
-            self.mass.create_task(self.play_announcement(active_group.player_id, announcement_url))
+            # redirect to group player if playergroup is active
+            self.mass.create_task(self.play_announcement(active_group.player_id, url))
             return
         # create a queue item for the announcement so
         # we can send a regular play-media call downstream
         queue_item = QueueItem(
             queue_id=player.player_id,
-            queue_item_id=announcement_url,
+            queue_item_id=url,
             name="Announcement",
             duration=None,
             streamdetails=StreamDetails(
                 provider="url",
-                item_id=announcement_url,
+                item_id=url,
                 audio_format=AudioFormat(
-                    content_type=ContentType.try_parse(announcement_url),
+                    content_type=ContentType.try_parse(url),
                 ),
                 media_type=MediaType.ANNOUNCEMENT,
-                direct=announcement_url,
-                data=announcement_url,
+                direct=url,
+                data={"url": url, "use_pre_announce": use_pre_announce},
                 target_loudness=-10,
             ),
         )
index e773cf27aeb53d308c5cf1922d4a1334994a827d..6dc0f4d34217588ad03d3ba733c86fb2638b13e4 100644 (file)
@@ -19,12 +19,7 @@ from typing import TYPE_CHECKING
 import shortuuid
 from aiohttp import web
 
-from music_assistant.common.helpers.util import (
-    empty_queue,
-    get_ip,
-    select_free_port,
-    try_parse_bool,
-)
+from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
 from music_assistant.common.models.config_entries import (
     ConfigEntry,
     ConfigValueOption,
@@ -43,18 +38,15 @@ from music_assistant.constants import (
     CONF_PUBLISH_IP,
     SILENCE_FILE,
     UGP_PREFIX,
-    VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
-    get_ffmpeg_args,
     get_ffmpeg_stream,
     get_media_stream,
     get_player_filter_params,
 )
-from music_assistant.server.helpers.process import AsyncProcess
 from music_assistant.server.helpers.util import get_ips
 from music_assistant.server.helpers.webserver import Webserver
 from music_assistant.server.models.core_controller import CoreController
@@ -64,7 +56,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.player import Player
     from music_assistant.common.models.player_queue import PlayerQueue
     from music_assistant.common.models.queue_item import QueueItem
-    from music_assistant.server import MusicAssistant
 
 
 DEFAULT_STREAM_HEADERS = {
@@ -83,46 +74,46 @@ FLOW_DEFAULT_BIT_DEPTH = 24
 # pylint:disable=too-many-locals
 
 
-class QueueStreamJob:
+class MultiClientStreamJob:
     """
-    Representation of a (multiclient) Audio stream job/task.
-
-    The whole idea here is that the (pcm) audio source can be sent to multiple
-    players at once. For example for (slimproto/airplay) syncgroups and universal group.
+    Representation of a (multiclient) Audio Queue stream job/task.
 
-    All client players receive the exact same PCM audio chunks from the source audio,
-    which then can be optionally encoded and/or resampled to the player's preferences.
-    In case a stream is restarted (e.g. when seeking),
-    a new QueueStreamJob will be created.
+    The whole idea here is that in case of a player (sync)group,
+    all client players receive the exact same (PCM) audio chunks from the source audio.
+    A StreamJob is tied to a Queue and streams the queue flow stream,
+    In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created.
     """
 
     _audio_task: asyncio.Task | None = None
 
     def __init__(
         self,
-        mass: MusicAssistant,
-        pcm_audio_source: AsyncGenerator[bytes, None],
+        stream_controller: StreamsController,
+        queue_id: str,
         pcm_format: AudioFormat,
-        auto_start: bool = False,
+        start_queue_item: QueueItem,
     ) -> None:
-        """Initialize QueueStreamJob instance."""
-        self.mass = mass
-        self.pcm_audio_source = pcm_audio_source
+        """Initialize MultiClientStreamJob instance."""
+        self.stream_controller = stream_controller
+        self.queue_id = queue_id
+        self.queue = self.stream_controller.mass.player_queues.get(queue_id)
+        assert self.queue  # just in case
         self.pcm_format = pcm_format
-        self.expected_players: set[str] = set()
+        self.start_queue_item = start_queue_item
         self.job_id = shortuuid.uuid()
-        self.bytes_streamed: int = 0
-        self.logger = self.mass.streams.logger
+        self.expected_players: set[str] = set()
         self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
-        self._finished = False
-        self.allow_start = auto_start
+        self.bytes_streamed: int = 0
         self._all_clients_connected = asyncio.Event()
+        self.logger = stream_controller.logger.getChild("streamjob")
+        self._finished: bool = False
+        # start running the audio task in the background
         self._audio_task = asyncio.create_task(self._stream_job_runner())
 
     @property
     def finished(self) -> bool:
         """Return if this StreamJob is finished."""
-        return self._finished or (self._audio_task and self._audio_task.done())
+        return self._finished or self._audio_task and self._audio_task.done()
 
     @property
     def pending(self) -> bool:
@@ -132,116 +123,74 @@ class QueueStreamJob:
     @property
     def running(self) -> bool:
         """Return if this Job is running."""
-        return (
-            self._all_clients_connected.is_set()
-            and self._audio_task
-            and not self._audio_task.done()
-        )
-
-    def start(self) -> None:
-        """Start running (send audio chunks to connected players)."""
-        if self.finished:
-            raise RuntimeError("Task is already finished")
-        self.allow_start = True
-        if self.expected_players and len(self.subscribed_players) >= len(self.expected_players):
-            self._all_clients_connected.set()
+        return not self.finished and not self.pending
 
     def stop(self) -> None:
         """Stop running this job."""
-        if self._audio_task and not self._audio_task.done():
-            self._audio_task.cancel()
-        if not self._finished:
-            # we need to make sure that we close the async generator
-            with suppress(StopAsyncIteration):
-                task = asyncio.create_task(self.pcm_audio_source.__anext__())
-                task.cancel()
         self._finished = True
+        if self._audio_task and self._audio_task.done():
+            return
+        if self._audio_task:
+            self._audio_task.cancel()
         for sub_queue in self.subscribed_players.values():
-            empty_queue(sub_queue)
+            with suppress(asyncio.QueueFull):
+                sub_queue.put_nowait(b"")
 
-    def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str:
+    def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
         """Resolve the childplayer specific stream URL to this streamjob."""
         fmt = output_codec.value
         # handle raw pcm
         if output_codec.is_pcm():
-            player = self.mass.streams.mass.players.get(player_id)
+            player = self.stream_controller.mass.players.get(child_player_id)
             player_max_bit_depth = 24 if player.supports_24bit else 16
             output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
             output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
-            output_channels = self.mass.config.get_raw_player_config_value(
-                player_id, CONF_OUTPUT_CHANNELS, "stereo"
+            output_channels = self.stream_controller.mass.config.get_raw_player_config_value(
+                child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
             )
             channels = 1 if output_channels != "stereo" else 2
             fmt += (
                 f";codec=pcm;rate={output_sample_rate};"
                 f"bitrate={output_bit_depth};channels={channels}"
             )
-        url = f"{self.mass.streams._server.base_url}/flow/{self.job_id}/{player_id}.{fmt}"
-        self.expected_players.add(player_id)
+        url = f"{self.stream_controller._server.base_url}/multi/{self.queue_id}/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}"  # noqa: E501
+        self.expected_players.add(child_player_id)
         return url
 
-    async def iter_player_audio(
-        self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None
-    ) -> AsyncGenerator[bytes, None]:
-        """Subscribe consumer and iterate player-specific audio."""
-        async for chunk in get_ffmpeg_stream(
-            audio_input=self.subscribe(player_id),
-            input_format=self.pcm_format,
-            output_format=output_format,
-            filter_params=get_player_filter_params(self.mass, player_id),
-            chunk_size=chunk_size,
-        ):
-            yield chunk
-
-    async def stream_to_custom_output_path(
-        self, player_id: str, output_format: AudioFormat, output_path: str | int
-    ) -> None:
-        """Subscribe consumer and instruct ffmpeg to send the audio to the given output path."""
-        custom_file_pointer = isinstance(output_path, int)
-        ffmpeg_args = get_ffmpeg_args(
-            input_format=self.pcm_format,
-            output_format=output_format,
-            filter_params=get_player_filter_params(self.mass, player_id),
-            extra_args=[],
-            input_path="-",
-            output_path="-" if custom_file_pointer else output_path,
-            loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
-        )
-        # launch ffmpeg process with player specific settings
-        # the stream_job_runner will start pushing pcm chunks to the stdin
-        # the ffmpeg process will send the output directly to the given path (e.g. tcp socket)
-        async with AsyncProcess(
-            ffmpeg_args,
-            enable_stdin=True,
-            enable_stdout=custom_file_pointer,
-            enable_stderr=False,
-            custom_stdin=self.subscribe(player_id),
-            custom_stdout=output_path if custom_file_pointer else None,
-            name="ffmpeg_custom_output_path",
-        ) as ffmpeg_proc:
-            # we simply wait for the process to exit
-            await ffmpeg_proc.wait()
-
     async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
         """Subscribe consumer and iterate incoming chunks on the queue."""
         try:
-            self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
-
-            if self._all_clients_connected.is_set():
-                # client subscribes while we're already started
+            # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
+            # to the stream in an attempt to get the audio details such as duration
+            # which is a bit pointless for our duration-less queue stream
+            # and it completely messes with the subscription logic
+            if player_id in self.subscribed_players:
                 self.logger.warning(
-                    "Client %s is joining while the stream is already started", player_id
+                    "Player %s is making multiple requests "
+                    "to the same stream, playback may be disturbed!",
+                    player_id,
+                )
+                player_id = f"{player_id}_{shortuuid.random(4)}"
+            elif self._all_clients_connected.is_set():
+                # client subscribes while we're already started - that is going to be messy for sure
+                self.logger.warning(
+                    "Player %s is is joining while the stream is already started, "
+                    "playback may be disturbed!",
+                    player_id,
                 )
 
+            self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
+
+            if self._all_clients_connected.is_set():
+                # client subscribes while we're already started - we dont support that (for now?)
+                msg = f"Client {player_id} is joining while the stream is already started"
+                raise RuntimeError(msg)
             self.logger.debug("Subscribed client %s", player_id)
 
-            if (
-                self.expected_players
-                and self.allow_start
-                and len(self.subscribed_players) == len(self.expected_players)
-            ):
+            if len(self.subscribed_players) == len(self.expected_players):
                 # we reached the number of expected subscribers, set event
                 # so that chunks can be pushed
+                await asyncio.sleep(0.2)
                 self._all_clients_connected.set()
 
             # keep reading audio chunks from the queue until we receive an empty one
@@ -258,7 +207,7 @@ class QueueStreamJob:
             await asyncio.sleep(2)
             if len(self.subscribed_players) == 0 and self._audio_task and not self.finished:
                 self.logger.debug("Cleaning up, all clients disappeared...")
-                self.stop()
+                self._audio_task.cancel()
 
     async def _put_chunk(self, chunk: bytes) -> None:
         """Put chunk of data to all subscribers."""
@@ -271,7 +220,11 @@ class QueueStreamJob:
     async def _stream_job_runner(self) -> None:
         """Feed audio chunks to StreamJob subscribers."""
         chunk_num = 0
-        async for chunk in self.pcm_audio_source:
+        async for chunk in self.stream_controller.get_flow_stream(
+            self.queue,
+            self.start_queue_item,
+            self.pcm_format,
+        ):
             chunk_num += 1
             if chunk_num == 1:
                 # wait until all expected clients are connected
@@ -280,19 +233,20 @@ class QueueStreamJob:
                         await self._all_clients_connected.wait()
                 except TimeoutError:
                     if len(self.subscribed_players) == 0:
-                        self.logger.exception(
-                            "Abort multi client stream job  %s: "
-                            "client(s) did not connect within timeout",
-                            self.job_id,
+                        self.stream_controller.logger.exception(
+                            "Abort multi client stream job for queue %s: "
+                            "clients did not connect within timeout",
+                            self.queue.display_name,
                         )
                         break
                     # not all clients connected but timeout expired, set flag and move on
                     # with all clients that did connect
                     self._all_clients_connected.set()
                 else:
-                    self.logger.debug(
-                        "Starting queue stream job %s with %s (out of %s) connected clients",
-                        self.job_id,
+                    self.stream_controller.logger.debug(
+                        "Starting multi client stream job for queue %s "
+                        "with %s out of %s connected clients",
+                        self.queue.display_name,
                         len(self.subscribed_players),
                         len(self.expected_players),
                     )
@@ -323,7 +277,7 @@ class StreamsController(CoreController):
         """Initialize instance."""
         super().__init__(*args, **kwargs)
         self._server = Webserver(self.logger, enable_dynamic_routes=True)
-        self.stream_jobs: dict[str, QueueStreamJob] = {}
+        self.multi_client_jobs: dict[str, MultiClientStreamJob] = {}
         self.register_dynamic_route = self._server.register_dynamic_route
         self.unregister_dynamic_route = self._server.unregister_dynamic_route
         self.manifest.name = "Streamserver"
@@ -413,7 +367,7 @@ class StreamsController(CoreController):
             static_routes=[
                 (
                     "*",
-                    "/flow/{job_id}/{player_id}.{fmt}",
+                    "/flow/{queue_id}/{queue_item_id}.{fmt}",
                     self.serve_queue_flow_stream,
                 ),
                 (
@@ -421,6 +375,11 @@ class StreamsController(CoreController):
                     "/single/{queue_id}/{queue_item_id}.{fmt}",
                     self.serve_queue_item_stream,
                 ),
+                (
+                    "*",
+                    "/multi/{queue_id}/{job_id}/{player_id}/{queue_item_id}.{fmt}",
+                    self.serve_multi_subscriber_stream,
+                ),
                 (
                     "*",
                     "/command/{queue_id}/{command}.mp3",
@@ -447,43 +406,25 @@ class StreamsController(CoreController):
     ) -> str:
         """Resolve the stream URL for the given QueueItem."""
         fmt = output_codec.value
-        # handle announcement item
-        if queue_item.media_type == MediaType.ANNOUNCEMENT:
-            return queue_item.queue_item_id
-        # handle request for (multi client) queue flow stream
+        # handle special stream created by UGP
         if queue_item.queue_id.startswith(UGP_PREFIX):
-            # special case: we got forwarded a request from a Universal Group Player
-            # use the existing stream job that was already created by UGP
-            stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
-            return stream_job.resolve_stream_url(player_id, output_codec)
-
-        if flow_mode:
-            # create a new flow mode stream job session
-            pcm_format = AudioFormat(
-                content_type=ContentType.from_bit_depth(24),
-                sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
-                bit_depth=FLOW_DEFAULT_BIT_DEPTH,
+            return self.multi_client_jobs[queue_item.queue_id].resolve_stream_url(
+                player_id, output_codec
             )
-            stream_job = self.create_stream_job(
-                queue_item.queue_id,
-                pcm_audio_source=self.get_flow_stream(
-                    self.mass.player_queues.get(queue_item.queue_id),
-                    start_queue_item=queue_item,
-                    pcm_format=pcm_format,
-                ),
-                pcm_format=pcm_format,
-                auto_start=True,
+        # handle announcement item
+        if queue_item.media_type == MediaType.ANNOUNCEMENT:
+            return self.get_announcement_url(
+                player_id=queue_item.queue_id,
+                announcement_url=queue_item.streamdetails.data["url"],
+                use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+                content_type=output_codec,
             )
-
-            return stream_job.resolve_stream_url(player_id, output_codec)
-
         # handle raw pcm without exact format specifiers
         if output_codec.is_pcm() and ";" not in fmt:
             fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
         query_params = {}
-        url = (
-            f"{self._server.base_url}/single/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}"
-        )
+        base_path = "flow" if flow_mode else "single"
+        url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}"  # noqa: E501
         # we add a timestamp as basic checksum
         # most importantly this is to invalidate any caches
         # but also to handle edge cases such as single track repeat
@@ -491,27 +432,38 @@ class StreamsController(CoreController):
         url += "?" + urllib.parse.urlencode(query_params)
         return url
 
-    def create_stream_job(
+    def create_multi_client_stream_job(
         self,
         queue_id: str,
-        pcm_audio_source: AsyncGenerator[bytes, None],
-        pcm_format: AudioFormat,
-        auto_start: bool = False,
-    ) -> QueueStreamJob:
-        """
-        Create a QueueStreamJob for the given queue..
+        start_queue_item: QueueItem,
+        pcm_bit_depth: int = FLOW_DEFAULT_BIT_DEPTH,
+        pcm_sample_rate: int = FLOW_DEFAULT_SAMPLE_RATE,
+    ) -> MultiClientStreamJob:
+        """Create a MultiClientStreamJob for the given queue..
 
         This is called by player/sync group implementations to start streaming
         the queue audio to multiple players at once.
         """
-        if existing_job := self.stream_jobs.pop(queue_id, None):
+        if existing_job := self.multi_client_jobs.pop(queue_id, None):
+            if (
+                queue_id.startswith(UGP_PREFIX)
+                and existing_job.job_id == start_queue_item.queue_item_id
+            ):
+                return existing_job
             # cleanup existing job first
-            existing_job.stop()
-        self.stream_jobs[queue_id] = stream_job = QueueStreamJob(
-            self.mass,
-            pcm_audio_source=pcm_audio_source,
-            pcm_format=pcm_format,
-            auto_start=auto_start,
+            if not existing_job.finished:
+                self.logger.warning("Detected existing (running) stream job for queue %s", queue_id)
+                existing_job.stop()
+        self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob(
+            self,
+            queue_id=queue_id,
+            pcm_format=AudioFormat(
+                content_type=ContentType.from_bit_depth(pcm_bit_depth),
+                sample_rate=pcm_sample_rate,
+                bit_depth=pcm_bit_depth,
+                channels=2,
+            ),
+            start_queue_item=start_queue_item,
         )
         return stream_job
 
@@ -586,32 +538,28 @@ class StreamsController(CoreController):
     async def serve_queue_flow_stream(self, request: web.Request) -> web.Response:
         """Stream Queue Flow audio to player."""
         self._log_request(request)
-        job_id = request.match_info["job_id"]
-        for queue_id, stream_job in self.stream_jobs.items():
-            if stream_job.job_id == job_id:
-                break
-        else:
-            raise web.HTTPNotFound(reason=f"Unknown StreamJob: {job_id}")
-        if stream_job.finished:
-            raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished")
-        if not (queue := self.mass.player_queues.get(queue_id)):
+        queue_id = request.match_info["queue_id"]
+        queue = self.mass.player_queues.get(queue_id)
+        if not queue:
             raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
-        player_id = request.match_info["player_id"]
-        child_player = self.mass.players.get(player_id)
-        if not child_player:
-            raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
-        # work out (childplayer specific!) output format/details
+        if not (queue_player := self.mass.players.get(queue_id)):
+            raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
+        start_queue_item_id = request.match_info["queue_item_id"]
+        start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
+        if not start_queue_item:
+            raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
+        # work out output format/details
         output_format = await self._get_output_format(
             output_format_str=request.match_info["fmt"],
-            queue_player=child_player,
-            default_sample_rate=stream_job.pcm_format.sample_rate,
-            default_bit_depth=stream_job.pcm_format.bit_depth,
+            queue_player=queue_player,
+            default_sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
+            default_bit_depth=FLOW_DEFAULT_BIT_DEPTH,
         )
         # play it safe: only allow icy metadata for mp3 and aac
         enable_icy = request.headers.get(
             "Icy-MetaData", ""
         ) == "1" and output_format.content_type in (ContentType.MP3, ContentType.AAC)
-        icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384
+        icy_meta_interval = 16384
 
         # prepare request, add some DLNA/UPNP compatible headers
         headers = {
@@ -632,32 +580,31 @@ class StreamsController(CoreController):
         if request.method != "GET":
             return resp
 
-        # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
-        # to the stream in an attempt to get the audio details such as duration
-        # which is a bit pointless for our duration-less queue stream
-        # and it completely messes with the subscription logic
-        if player_id in stream_job.subscribed_players:
-            self.logger.warning(
-                "Player %s is making multiple requests "
-                "to the same stream, playback may be disturbed!",
-                player_id,
-            )
-        elif "rincon" in player_id.lower():
-            await asyncio.sleep(0.1)
-
         # all checks passed, start streaming!
-        self.logger.debug(
-            "Start serving Queue flow audio stream for queue %s to player %s",
-            queue.display_name,
-            child_player.display_name,
+        self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
+
+        # collect player specific ffmpeg args to re-encode the source PCM stream
+        pcm_format = AudioFormat(
+            content_type=ContentType.from_bit_depth(output_format.bit_depth),
+            sample_rate=output_format.sample_rate,
+            bit_depth=output_format.bit_depth,
+            channels=2,
         )
-        async for chunk in stream_job.iter_player_audio(
-            player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self.get_flow_stream(
+                queue=queue, start_queue_item=start_queue_item, pcm_format=pcm_format
+            ),
+            input_format=pcm_format,
+            output_format=output_format,
+            filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+            chunk_size=icy_meta_interval if enable_icy else None,
         ):
             try:
                 await resp.write(chunk)
             except (BrokenPipeError, ConnectionResetError):
+                # race condition
                 break
+
             if not enable_icy:
                 continue
 
@@ -685,6 +632,64 @@ class StreamsController(CoreController):
 
         return resp
 
+    async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response:
+        """Stream Queue Flow audio to a child player within a multi subscriber setup."""
+        self._log_request(request)
+        queue_id = request.match_info["queue_id"]
+        streamjob = self.multi_client_jobs.get(queue_id)
+        if not streamjob:
+            raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}")
+        job_id = request.match_info["job_id"]
+        if job_id != streamjob.job_id:
+            raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}")
+        child_player_id = request.match_info["player_id"]
+        child_player = self.mass.players.get(child_player_id)
+        if not child_player:
+            raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+        # work out (childplayer specific!) output format/details
+        output_format = await self._get_output_format(
+            output_format_str=request.match_info["fmt"],
+            queue_player=child_player,
+            default_sample_rate=streamjob.pcm_format.sample_rate,
+            default_bit_depth=streamjob.pcm_format.bit_depth,
+        )
+        # prepare request, add some DLNA/UPNP compatible headers
+        headers = {
+            **DEFAULT_STREAM_HEADERS,
+            "Content-Type": f"audio/{output_format.output_format_str}",
+        }
+        resp = web.StreamResponse(
+            status=200,
+            reason="OK",
+            headers=headers,
+        )
+        await resp.prepare(request)
+
+        # return early if this is not a GET request
+        if request.method != "GET":
+            return resp
+
+        # all checks passed, start streaming!
+        self.logger.debug(
+            "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s",
+            streamjob.queue.display_name,
+            child_player.display_name,
+        )
+
+        async for chunk in get_ffmpeg_stream(
+            audio_input=streamjob.subscribe(child_player_id),
+            input_format=streamjob.pcm_format,
+            output_format=output_format,
+            filter_params=get_player_filter_params(self.mass, child_player_id),
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError):
+                # race condition
+                break
+
+        return resp
+
     async def serve_command_request(self, request: web.Request) -> web.Response:
         """Handle special 'command' request for a player."""
         self._log_request(request)
@@ -703,11 +708,11 @@ class StreamsController(CoreController):
             raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
         if player_id not in self.announcements:
             raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
-        announcement = self.announcements[player_id]
+        announcement_url = self.announcements[player_id]
         use_pre_announce = try_parse_bool(request.query.get("pre_announce"))
 
         # work out output format/details
-        fmt = request.match_info.get("fmt", announcement.rsplit(".")[-1])
+        fmt = request.match_info.get("fmt", announcement_url.rsplit(".")[-1])
         audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
         # prepare request, add some DLNA/UPNP compatible headers
         headers = {
@@ -728,26 +733,13 @@ class StreamsController(CoreController):
         # all checks passed, start streaming!
         self.logger.debug(
             "Start serving audio stream for Announcement %s to %s",
-            announcement,
+            announcement_url,
             player.display_name,
         )
-        extra_args = []
-        filter_params = ["loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5"]
-        if use_pre_announce:
-            extra_args += [
-                "-i",
-                ANNOUNCE_ALERT_FILE,
-                "-filter_complex",
-                "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5",
-            ]
-            filter_params = []
-
-        async for chunk in get_ffmpeg_stream(
-            audio_input=announcement,
-            input_format=audio_format,
+        async for chunk in self.get_announcement_stream(
+            announcement_url=announcement_url,
             output_format=audio_format,
-            extra_args=extra_args,
-            filter_params=filter_params,
+            use_pre_announce=use_pre_announce,
         ):
             try:
                 await resp.write(chunk)
@@ -756,7 +748,7 @@ class StreamsController(CoreController):
 
         self.logger.debug(
             "Finished serving audio stream for Announcement %s to %s",
-            announcement,
+            announcement_url,
             player.display_name,
         )
 
@@ -922,6 +914,32 @@ class StreamsController(CoreController):
         del buffer
         self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
 
+    async def get_announcement_stream(
+        self, announcement_url: str, output_format: AudioFormat, use_pre_announce: bool = False
+    ) -> AsyncGenerator[bytes, None]:
+        """Get the special announcement stream."""
+        # work out output format/details
+        fmt = announcement_url.rsplit(".")[-1]
+        audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
+        extra_args = []
+        filter_params = ["loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5"]
+        if use_pre_announce:
+            extra_args += [
+                "-i",
+                ANNOUNCE_ALERT_FILE,
+                "-filter_complex",
+                "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5",
+            ]
+            filter_params = []
+        async for chunk in get_ffmpeg_stream(
+            audio_input=announcement_url,
+            input_format=audio_format,
+            output_format=output_format,
+            extra_args=extra_args,
+            filter_params=filter_params,
+        ):
+            yield chunk
+
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
         if not self.logger.isEnabledFor(logging.DEBUG):
index 720d3ac54dfd94269a4953f17ca4e1e209a4c8b8..c6515755bfcca412d3c3d0b0124c7cb57791a4d8 100644 (file)
@@ -373,6 +373,7 @@ async def get_media_stream(  # noqa: PLR0915
         filter_params=filter_params,
         extra_args=extra_args,
         input_path=streamdetails.direct or "-",
+        loglevel="info",  # needed for loudness measurement
     )
 
     finished = False
@@ -381,24 +382,18 @@ async def get_media_stream(  # noqa: PLR0915
         ffmpeg_args,
         enable_stdin=streamdetails.direct is None,
         enable_stderr=True,
+        custom_stdin=mass.get_provider(streamdetails.provider).get_audio_stream(
+            streamdetails,
+            seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
+        )
+        if not streamdetails.direct
+        else None,
         name="ffmpeg_media_stream",
     )
     await ffmpeg_proc.start()
     logger = LOGGER.getChild("media_stream")
     logger.debug("start media stream for: %s", streamdetails.uri)
 
-    async def writer() -> None:
-        """Task that grabs the source audio and feeds it to ffmpeg."""
-        music_prov = mass.get_provider(streamdetails.provider)
-        seek_pos = streamdetails.seek_position if streamdetails.can_seek else 0
-        async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
-            await ffmpeg_proc.write(audio_chunk)
-        # write eof when last packet is received
-        await ffmpeg_proc.write_eof()
-
-    if streamdetails.direct is None:
-        ffmpeg_proc.attached_tasks.append(asyncio.create_task(writer()))
-
     # get pcm chunks from stdout
     # we always stay one chunk behind to properly detect end of chunks
     # so we can strip silence at the beginning and end of a track
@@ -450,31 +445,28 @@ async def get_media_stream(  # noqa: PLR0915
     finally:
         seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
         streamdetails.seconds_streamed = seconds_streamed
+        state_str = "finished" if finished else "aborted"
+        logger.debug(
+            "stream %s for: %s (%s seconds streamed)",
+            state_str,
+            streamdetails.uri,
+            seconds_streamed,
+        )
+        # store accurate duration
         if finished:
-            logger.debug(
-                "finished stream for: %s (%s seconds streamed)",
-                streamdetails.uri,
-                seconds_streamed,
-            )
-            # store accurate duration
             streamdetails.duration = streamdetails.seek_position + seconds_streamed
-        else:
-            logger.debug(
-                "stream aborted for %s (%s seconds streamed)",
-                streamdetails.uri,
-                seconds_streamed,
-            )
 
         # use communicate to read stderr and wait for exit
         # read log for loudness measurement (or errors)
-        _, stderr = await ffmpeg_proc.communicate()
-        if ffmpeg_proc.returncode != 0:
-            # ffmpeg has a non zero returncode meaning trouble
+        try:
+            _, stderr = await asyncio.wait_for(ffmpeg_proc.communicate(), 5)
+        except TimeoutError:
+            stderr = b""
+            # ensure to send close here so we terminate and cleanup the process
+            await ffmpeg_proc.close()
+        if ffmpeg_proc.returncode != 0 and not bytes_sent:
             logger.warning("stream error on %s", streamdetails.uri)
-            logger.warning(stderr.decode())
-            finished = False
-        elif loudness_details := _parse_loudnorm(stderr):
-            logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
+        elif stderr and (loudness_details := _parse_loudnorm(stderr)):
             required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
             if finished or (seconds_streamed >= required_seconds):
                 LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
@@ -482,7 +474,7 @@ async def get_media_stream(  # noqa: PLR0915
                 await mass.music.set_track_loudness(
                     streamdetails.item_id, streamdetails.provider, loudness_details
                 )
-        else:
+        elif stderr:
             logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
 
         # report playback
@@ -859,14 +851,18 @@ def get_ffmpeg_args(
     input_format: AudioFormat,
     output_format: AudioFormat,
     filter_params: list[str],
-    extra_args: list[str],
+    extra_args: list[str] | None = None,
     input_path: str = "-",
     output_path: str = "-",
-    loglevel: str = "info",
+    loglevel: str | None = None,
 ) -> list[str]:
     """Collect all args to send to the ffmpeg process."""
+    if loglevel is None:
+        loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet"
+    if extra_args is None:
+        extra_args = []
+    extra_args += ["-bufsize", "32M"]
     ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
-
     if not ffmpeg_present:
         msg = (
             "FFmpeg binary is missing from system."
index d4cbc312afcffbd66f2f5c19e1ecb6d3e5eb1ba2..c529bc1246fd16cc33fd05d17ae9d54398266303 100644 (file)
@@ -12,6 +12,7 @@ import asyncio
 import logging
 import os
 from contextlib import suppress
+from signal import SIGINT
 from types import TracebackType
 from typing import TYPE_CHECKING
 
@@ -98,8 +99,6 @@ class AsyncProcess:
             stdin=stdin if self._enable_stdin else None,
             stdout=stdout if self._enable_stdout else None,
             stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
-            limit=4000000,
-            pipesize=256000,
         )
         LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid)
 
@@ -167,12 +166,21 @@ class AsyncProcess:
                 task.cancel()
                 with suppress(asyncio.CancelledError):
                     await task
+        if self.proc.returncode is None:
+            # always first try to send sigint signal to try clean shutdown
+            # for example ffmpeg needs this to cleanly shutdown and not lock on pipes
+            self.proc.send_signal(SIGINT)
+            # allow the process a little bit of time to respond to the signal
+            await asyncio.sleep(0.1)
+
         # send communicate until we exited
         while self.proc.returncode is None:
-            # make sure the process is cleaned up
+            # make sure the process is really cleaned up.
+            # especially with pipes this can cause deadlocks if not properly guarded
+            # we need to use communicate to ensure buffers are flushed
+            # we do that with sending communicate
             try:
-                # we need to use communicate to ensure buffers are flushed
-                await asyncio.wait_for(self.proc.communicate(), 10)
+                await asyncio.wait_for(self.proc.communicate(), 2)
             except TimeoutError:
                 LOGGER.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
@@ -180,7 +188,6 @@ class AsyncProcess:
                     self.proc.pid,
                 )
                 self.proc.terminate()
-                await asyncio.sleep(0.5)
         LOGGER.debug(
             "Process %s with PID %s stopped with returncode %s",
             self._name,
index 8faafab8ab3d73fb97157ad9782e4a55747e851e..4cdee394582d734dab7442ee1fd85f273b4a3195 100644 (file)
@@ -8,6 +8,7 @@ import os
 import platform
 import socket
 import time
+from collections.abc import AsyncGenerator
 from contextlib import suppress
 from dataclasses import dataclass
 from random import randint, randrange
@@ -42,7 +43,7 @@ from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.player_queue import PlayerQueue
 from music_assistant.constants import CONF_SYNC_ADJUST, UGP_PREFIX, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import get_media_stream
+from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params
 from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -51,7 +52,6 @@ if TYPE_CHECKING:
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
-    from music_assistant.server.controllers.streams import QueueStreamJob
     from music_assistant.server.models import ProviderInstanceType
 
 DOMAIN = "airplay"
@@ -181,20 +181,23 @@ def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
 class AirplayStream:
     """Object that holds the details of a stream job."""
 
-    def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None:
+    def __init__(
+        self, prov: AirplayProvider, airplay_player: AirPlayPlayer, input_format: AudioFormat
+    ) -> None:
         """Initialize AirplayStream."""
         self.prov = prov
         self.mass = prov.mass
         self.airplay_player = airplay_player
+        self.input_format = input_format
         # always generate a new active remote id to prevent race conditions
         # with the named pipe used to send audio
         self.active_remote_id: str = str(randint(1000, 8000))
         self.start_ntp: int | None = None  # use as checksum
         self.prevent_playback: bool = False
-        self.stream_job: QueueStreamJob | None = None
         self._log_reader_task: asyncio.Task | None = None
         self._audio_reader_task: asyncio.Task | None = None
         self._cliraop_proc: AsyncProcess | None = None
+        self._ffmpeg_proc: AsyncProcess | None = None
         self._stop_requested = False
 
     @property
@@ -206,10 +209,9 @@ class AirplayStream:
             and self._cliraop_proc.returncode is None
         )
 
-    async def start(self, start_ntp: int, stream_job: QueueStreamJob) -> None:
+    async def start(self, start_ntp: int) -> None:
         """Initialize CLIRaop process for a player."""
         self.start_ntp = start_ntp
-        self.stream_job = stream_job
         extra_args = []
         player_id = self.airplay_player.player_id
         mass_player = self.mass.players.get(player_id)
@@ -237,7 +239,7 @@ class AirplayStream:
             "-port",
             str(self.airplay_player.discovery_info.port),
             "-wait",
-            str(3000 - sync_adjust),
+            str(2000 - sync_adjust),
             "-volume",
             str(mass_player.volume_level),
             *extra_args,
@@ -255,12 +257,28 @@ class AirplayStream:
 
         # connect cliraop stdin with ffmpeg stdout using os pipes
         read, write = os.pipe()
+
         # launch ffmpeg, feeding (player specific) audio chunks on stdout
-        self._audio_reader_task = asyncio.create_task(
-            stream_job.stream_to_custom_output_path(
-                player_id=player_id, output_format=AIRPLAY_PCM_FORMAT, output_path=write
-            )
+        # one could argue that the intermediate ffmpeg towards cliraop is not needed
+        # when there are no player specific filters or extras but in this case
+        # ffmpeg serves as a small buffer towards the realtime cliraop streamer
+        ffmpeg_args = get_ffmpeg_args(
+            input_format=self.input_format,
+            output_format=AIRPLAY_PCM_FORMAT,
+            filter_params=get_player_filter_params(self.mass, player_id),
+        )
+        # launch ffmpeg process with player specific settings
+        # the stream_job_runner will start pushing pcm chunks to the stdin
+        # the ffmpeg process will send the output directly to the given path (e.g. tcp socket)
+        self._ffmpeg_proc = AsyncProcess(
+            ffmpeg_args,
+            enable_stdin=True,
+            enable_stdout=True,
+            enable_stderr=False,
+            custom_stdout=write,
+            name="cliraop_ffmpeg",
         )
+        await self._ffmpeg_proc.start()
         self._cliraop_proc = AsyncProcess(
             cliraop_args,
             enable_stdin=True,
@@ -271,24 +289,31 @@ class AirplayStream:
         await self._cliraop_proc.start()
         self._log_reader_task = asyncio.create_task(self._log_watcher())
 
-        # self._audio_reader_task = asyncio.create_task(self._audio_reader())
+    async def write_chunk(self, chunk: bytes) -> None:
+        """Write a (pcm) audio chunk to the player."""
+        await self._ffmpeg_proc.write(chunk)
+
+    async def write_eof(self) -> None:
+        """Write EOF to the ffmpeg stdin."""
+        await self._ffmpeg_proc.write_eof()
+        await self._ffmpeg_proc.wait()
+        await self.stop()
 
     async def stop(self, wait: bool = True):
         """Stop playback and cleanup."""
-        if not self.running:
+        if self._cliraop_proc.closed and self._ffmpeg_proc.closed:
             return
         self._stop_requested = True
-        # send stop with cli command
-        await self.send_cli_command("ACTION=STOP")
 
         async def _stop() -> None:
-            # always stop the audio feeder
-            if self._audio_reader_task and not self._audio_reader_task.done():
-                with suppress(asyncio.CancelledError):
-                    self._audio_reader_task.cancel()
-            await self._cliraop_proc.write_eof()
-            # the process should exit gracefully after the stop request was processed
-            await asyncio.wait_for(self._cliraop_proc.wait(), 30)
+            # ffmpeg MUST be stopped before cliraop due to the chained pipes
+            await self._ffmpeg_proc.close()
+            # allow the cliraop process to stop gracefully first
+            await self.send_cli_command("ACTION=STOP")
+            with suppress(TimeoutError):
+                await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+            # send regular close anyway (which also logs the returncode)
+            await self._cliraop_proc.close()
 
         task = self.mass.create_task(_stop())
         if wait:
@@ -358,12 +383,9 @@ class AirplayStream:
                 self.mass.players.update(airplay_player.player_id)
             if "lost packet out of backlog" in line:
                 lost_packets += 1
-                if lost_packets == 50:
+                if lost_packets == 100:
                     logger.warning("High packet loss detected, stopping playback...")
-                    queue = self.mass.player_queues.get_active_queue(mass_player.player_id)
-                    await self.mass.player_queues.stop(queue.queue_id)
-                else:
-                    logger.debug(line)
+                    await self.stop(False)
 
             logger.log(VERBOSE_LOG_LEVEL, line)
 
@@ -371,6 +393,8 @@ class AirplayStream:
         if airplay_player.active_stream == self:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
+        # ensure we're cleaned up afterwards
+        await self.stop()
 
     async def _send_metadata(self, queue: PlayerQueue) -> None:
         """Send metadata to player (and connected sync childs)."""
@@ -589,26 +613,34 @@ class AirplayProvider(PlayerProvider):
         if queue_item.queue_id.startswith(UGP_PREFIX):
             # special case: we got forwarded a request from the UGP
             # use the existing stream job that was already created by UGP
-            stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
+            stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id]
+            stream_job.expected_players.add(player_id)
+            input_format = stream_job.pcm_format
+            audio_source = stream_job.subscribe(player_id)
+        elif queue_item.media_type == MediaType.ANNOUNCEMENT:
+            # special case: stream announcement
+            input_format = AIRPLAY_PCM_FORMAT
+            audio_source = self.mass.streams.get_announcement_stream(
+                queue_item.streamdetails.data["url"],
+                pcm_format=AIRPLAY_PCM_FORMAT,
+                use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+            )
         else:
-            if queue_item.media_type == MediaType.ANNOUNCEMENT:
-                # stream announcement url directly
-                audio_source = get_media_stream(
-                    self.mass, queue_item.streamdetails, pcm_format=AIRPLAY_PCM_FORMAT
-                )
-            else:
-                queue = self.mass.player_queues.get(queue_item.queue_id)
-                audio_source = self.mass.streams.get_flow_stream(
-                    queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
-                )
-            stream_job = self.mass.streams.create_stream_job(
-                queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=AIRPLAY_PCM_FORMAT
+            queue = self.mass.player_queues.get(queue_item.queue_id)
+            input_format = AIRPLAY_PCM_FORMAT
+            audio_source = self.mass.streams.get_flow_stream(
+                queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
             )
+        self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format)
 
+    async def _handle_stream_audio(
+        self, player_id: str, audio_source: AsyncGenerator[bytes, None], input_format: AudioFormat
+    ) -> None:
+        """Handle streaming of audio to one or more airplay players."""
         # Python is not suitable for realtime audio streaming so we do the actual streaming
         # of (RAOP) audio using a small executable written in C based on libraop to do the actual
-        # timestamped playback. The raw pcm audio is fed to a named pipe, read by the executable
-        # and we can send some ingteractie commands to the process stdin.
+        # timestamped playback, whicj reads pcm audio from stdin
+        # and we can send some interactive commands using a named pipe.
 
         # get current ntp before we start
         _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
@@ -617,11 +649,35 @@ class AirplayProvider(PlayerProvider):
         # setup Raop process for player and its sync childs
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                stream_job.expected_players.add(airplay_player.player_id)
-                airplay_player.active_stream = AirplayStream(self, airplay_player)
-                tg.create_task(airplay_player.active_stream.start(start_ntp, stream_job))
-        if not queue_item.queue_id.startswith(UGP_PREFIX):
-            stream_job.start()
+                airplay_player.active_stream = AirplayStream(
+                    self, airplay_player, input_format=input_format
+                )
+                tg.create_task(airplay_player.active_stream.start(start_ntp))
+
+        async for chunk in audio_source:
+            active_clients = 0
+            async with asyncio.TaskGroup() as tg:
+                for airplay_player in self._get_sync_clients(player_id):
+                    if not (airplay_player.active_stream and airplay_player.active_stream.running):
+                        # player stopped or switched to a new stream
+                        continue
+                    if airplay_player.active_stream.start_ntp != start_ntp:
+                        # checksum mismatch
+                        continue
+                    tg.create_task(airplay_player.active_stream.write_chunk(chunk))
+                    active_clients += 1
+            if active_clients == 0:
+                # no more clients
+                return
+        # entire stream consumed: send EOF
+        async with asyncio.TaskGroup() as tg:
+            for airplay_player in self._get_sync_clients(player_id):
+                if (
+                    not airplay_player.active_stream
+                    or airplay_player.active_stream.start_ntp != start_ntp
+                ):
+                    continue
+                tg.create_task(airplay_player.active_stream.write_eof())
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
index a0c4d87b57a6ffdfbddba8374160ed6966c29833..5111b49d926483248b2c43645f0eb69a5166ebf2 100644 (file)
@@ -420,8 +420,8 @@ class ChromecastProvider(PlayerProvider):
                     # originally/officially cast supports 96k sample rate
                     # but it seems a (recent?) update broke this
                     # for now use 48k as max sample rate to play safe
-                    max_sample_rate=48000,
-                    supports_24bit=True,
+                    max_sample_rate=44100 if cast_info.is_audio_group else 48000,
+                    supports_24bit=not cast_info.is_audio_group,
                     enabled_by_default=enabled_by_default,
                 ),
                 logger=self.logger.getChild(cast_info.friendly_name),
index c66d73c87cfee7927c8a712d3853c1decea54343..d89f76c34d42b8271968f4a7f60928bb79f7ed4a 100644 (file)
@@ -44,7 +44,6 @@ from music_assistant.common.models.enums import (
     RepeatMode,
 )
 from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError
-from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.constants import (
     CONF_CROSSFADE,
@@ -53,7 +52,6 @@ from music_assistant.constants import (
     CONF_PORT,
     CONF_SYNC_ADJUST,
     MASS_LOGO_ONLINE,
-    UGP_PREFIX,
     VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.models.player_provider import PlayerProvider
@@ -343,23 +341,13 @@ class SlimprotoProvider(PlayerProvider):
         if player.group_childs:
             # player has sync members, we need to start a (multi-player) stream job
             # to make sure that all clients receive the exact same audio
-            pcm_format = AudioFormat(
-                content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
-            )
-            queue = self.mass.player_queues.get(queue_item.queue_id)
-            stream_job = self.mass.streams.create_stream_job(
+            stream_job = self.mass.streams.create_multi_client_stream_job(
                 queue_id=queue_item.queue_id,
-                pcm_audio_source=self.mass.streams.get_flow_stream(
-                    queue,
-                    start_queue_item=queue_item,
-                    pcm_format=pcm_format,
-                ),
-                pcm_format=pcm_format,
+                start_queue_item=queue_item,
             )
             # forward command to player and any connected sync members
-            sync_clients = self._get_sync_clients(player_id)
             async with asyncio.TaskGroup() as tg:
-                for slimplayer in sync_clients:
+                for slimplayer in self._get_sync_clients(player_id):
                     enforce_mp3 = await self.mass.config.get_player_config_value(
                         slimplayer.player_id, CONF_ENFORCE_MP3
                     )
@@ -375,8 +363,6 @@ class SlimprotoProvider(PlayerProvider):
                             auto_play=False,
                         )
                     )
-            if not queue_item.queue_id.startswith(UGP_PREFIX):
-                stream_job.start()
         else:
             # regular, single player playback
             slimplayer = self.slimproto.get_player(player_id)
@@ -733,7 +719,7 @@ class SlimprotoProvider(PlayerProvider):
         sync_playpoints = self._sync_playpoints[slimplayer.player_id]
 
         active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
-        stream_job = self.mass.streams.stream_jobs.get(active_queue.queue_id)
+        stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
         if not stream_job:
             # should not happen, but just in case
             return
index 7633f2e473da0ba57c5d6e03473d2ada8dd8e559..5cdf4b880af4e9e26a7459e71ef6bc25a33adc80 100644 (file)
@@ -35,7 +35,7 @@ from music_assistant.common.models.errors import SetupFailedError
 from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.constants import UGP_PREFIX
-from music_assistant.server.helpers.audio import get_media_stream
+from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params
 from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -64,6 +64,15 @@ DEFAULT_SNAPSERVER_PORT = 1705
 SNAPWEB_DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.resolve().joinpath("snapweb")
 
 
+DEFAULT_SNAPCAST_FORMAT = AudioFormat(
+    content_type=ContentType.PCM_S16LE,
+    sample_rate=48000,
+    # TODO: can we handle 24 bits bit depth ?
+    bit_depth=16,
+    channels=2,
+)
+
+
 async def setup(
     mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
 ) -> ProviderInstanceType:
@@ -309,33 +318,27 @@ class SnapCastProvider(PlayerProvider):
         snap_group = self._get_snapgroup(player_id)
         await snap_group.set_stream(stream.identifier)
 
-        # TODO: can we handle 24 bits bit depth ?
-        pcm_format = AudioFormat(
-            content_type=ContentType.PCM_S16LE,
-            sample_rate=48000,
-            bit_depth=16,
-            channels=2,
-        )
-
         if queue_item.queue_id.startswith(UGP_PREFIX):
             # special case: we got forwarded a request from the UGP
             # use the existing stream job that was already created by UGP
-            stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
+            stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id]
+            stream_job.expected_players.add(player_id)
+            input_format = stream_job.pcm_format
+            audio_source = stream_job.subscribe(player_id)
+        elif queue_item.media_type == MediaType.ANNOUNCEMENT:
+            # special case: stream announcement
+            input_format = DEFAULT_SNAPCAST_FORMAT
+            audio_source = self.mass.streams.get_announcement_stream(
+                queue_item.streamdetails.data["url"],
+                pcm_format=DEFAULT_SNAPCAST_FORMAT,
+                use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+            )
         else:
-            if queue_item.media_type == MediaType.ANNOUNCEMENT:
-                # stream announcement url directly
-                audio_source = get_media_stream(
-                    self.mass, queue_item.streamdetails, pcm_format=pcm_format
-                )
-            else:
-                queue = self.mass.player_queues.get(queue_item.queue_id)
-                audio_source = self.mass.streams.get_flow_stream(
-                    queue, start_queue_item=queue_item, pcm_format=pcm_format
-                )
-            stream_job = self.mass.streams.create_stream_job(
-                queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=pcm_format
+            queue = self.mass.player_queues.get(queue_item.queue_id)
+            input_format = DEFAULT_SNAPCAST_FORMAT
+            audio_source = self.mass.streams.get_flow_stream(
+                queue, start_queue_item=queue_item, pcm_format=DEFAULT_SNAPCAST_FORMAT
             )
-        stream_job.expected_players.add(player_id)
 
         async def _streamer() -> None:
             host = self._snapcast_server_host
@@ -352,23 +355,35 @@ class SnapCastProvider(PlayerProvider):
             stream.set_callback(stream_callback)
             stream_path = f"tcp://{host}:{port}"
             self.logger.debug("Start streaming to %s", stream_path)
+            ffmpeg_args = get_ffmpeg_args(
+                input_format=input_format,
+                output_format=DEFAULT_SNAPCAST_FORMAT,
+                filter_params=get_player_filter_params(self.mass, player_id),
+                output_path=f"tcp://{host}:{port}",
+            )
             try:
-                await stream_job.stream_to_custom_output_path(
-                    player_id, pcm_format, f"tcp://{host}:{port}"
-                )
-                # we need to wait a bit for the stream status to become idle
-                # to ensure that all snapclients have consumed the audio
-                await self.mass.players.wait_for_state(player, PlayerState.IDLE)
+                async with AsyncProcess(
+                    ffmpeg_args,
+                    enable_stdin=True,
+                    enable_stdout=False,
+                    enable_stderr=False,
+                    name="snapcast_ffmpeg",
+                ) as ffmpeg_proc:
+                    async for chunk in audio_source:
+                        await ffmpeg_proc.write(chunk)
+                    await ffmpeg_proc.write_eof()
+                    # we need to wait a bit for the stream status to become idle
+                    # to ensure that all snapclients have consumed the audio
+                    await self.mass.players.wait_for_state(player, PlayerState.IDLE)
             finally:
                 self.logger.debug("Finished streaming to %s", stream_path)
                 # there is no way to unsub the callback to we do this nasty
                 stream._callback_func = None
-                await self._snapserver.stream_remove_stream(stream.identifier)
+                with suppress(TypeError, KeyError, AttributeError):
+                    await self._snapserver.stream_remove_stream(stream.identifier)
 
         # start streaming the queue (pcm) audio in a background task
         self._stream_tasks[player_id] = asyncio.create_task(_streamer())
-        if not queue_item.queue_id.startswith(UGP_PREFIX):
-            stream_job.start()
 
     def _get_snapgroup(self, player_id: str) -> Snapgroup:
         """Get snapcast group for given player_id."""
index 1d0c53039e0e17d80b8e68eba896cc7128f9ef80..611efd94f118a5af22bb3e06881d2ae6d2165b42 100644 (file)
@@ -177,7 +177,7 @@ class SpotifyProvider(MusicProvider):
             result.artists += [
                 await self._parse_artist(item)
                 for item in searchresult["artists"]["items"]
-                if (item and item["id"])
+                if (item and item["id"] and item["name"])
             ]
         if "albums" in searchresult:
             result.albums += [
@@ -411,7 +411,7 @@ class SpotifyProvider(MusicProvider):
         artist = Artist(
             item_id=artist_obj["id"],
             provider=self.domain,
-            name=artist_obj["name"],
+            name=artist_obj["name"] or artist_obj["id"],
             provider_mappings={
                 ProviderMapping(
                     item_id=artist_obj["id"],
@@ -455,6 +455,8 @@ class SpotifyProvider(MusicProvider):
             album.external_ids.add((ExternalID.BARCODE, album_obj["external_ids"]["ean"]))
 
         for artist_obj in album_obj["artists"]:
+            if not artist_obj.get("name") or not artist_obj.get("id"):
+                continue
             album.artists.append(await self._parse_artist(artist_obj))
 
         with contextlib.suppress(ValueError):
@@ -523,6 +525,8 @@ class SpotifyProvider(MusicProvider):
         if artist:
             track.artists.append(artist)
         for track_artist in track_obj.get("artists", []):
+            if not track_artist.get("name") or not track_artist.get("id"):
+                continue
             artist = await self._parse_artist(track_artist)
             if artist and artist.item_id not in {x.item_id for x in track.artists}:
                 track.artists.append(artist)
index 43cf0a88b74895fc0c50a6194a3e9bb0e958410e..08d37700b08f0feb704334d6d8ebb73cddfc7222 100644 (file)
@@ -20,13 +20,11 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
-    ContentType,
     PlayerFeature,
     PlayerState,
     PlayerType,
     ProviderFeature,
 )
-from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.queue_item import QueueItem
 from music_assistant.constants import (
@@ -35,10 +33,6 @@ from music_assistant.constants import (
     SYNCGROUP_PREFIX,
     UGP_PREFIX,
 )
-from music_assistant.server.controllers.streams import (
-    FLOW_DEFAULT_BIT_DEPTH,
-    FLOW_DEFAULT_SAMPLE_RATE,
-)
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
@@ -147,7 +141,7 @@ class UniversalGroupProvider(PlayerProvider):
                 if member.state == PlayerState.IDLE:
                     continue
                 tg.create_task(self.mass.players.cmd_stop(member.player_id))
-        if existing := self.mass.streams.stream_jobs.pop(player_id, None):
+        if existing := self.mass.streams.multi_client_jobs.pop(player_id, None):
             existing.stop()
 
     async def cmd_play(self, player_id: str) -> None:
@@ -179,22 +173,17 @@ class UniversalGroupProvider(PlayerProvider):
 
         # create a multi-client stream job - all (direct) child's of this UGP group
         # will subscribe to this multi client queue stream
-        pcm_format = AudioFormat(
-            content_type=ContentType.from_bit_depth(FLOW_DEFAULT_BIT_DEPTH),
-            sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
-            bit_depth=FLOW_DEFAULT_BIT_DEPTH,
-        )
         queue = self.mass.player_queues.get(player_id)
-        stream_job = self.mass.streams.create_stream_job(
+        stream_job = self.mass.streams.create_multi_client_stream_job(
             queue.queue_id,
-            pcm_audio_source=self.mass.streams.get_flow_stream(
-                queue=queue, start_queue_item=queue_item, pcm_format=pcm_format
-            ),
-            pcm_format=pcm_format,
+            start_queue_item=queue_item,
         )
         # create a fake queue item to forward to downstream play_media commands
         ugp_queue_item = QueueItem(
-            player_id, queue_item_id="flow", name=group_player.display_name, duration=None
+            player_id,
+            queue_item_id=stream_job.job_id,
+            name="Music Assistant",
+            duration=None,
         )
 
         # forward the stream job to all group members
@@ -206,7 +195,6 @@ class UniversalGroupProvider(PlayerProvider):
                     if member is None:
                         continue
                 tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item))
-        stream_job.start()
 
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates."""
index d087b22e2124c21fae804335ae0af39c2b7b98d7..78398cf9bdeddb43778d8fc0d2d302298b86e976 100644 (file)
@@ -124,10 +124,10 @@ class MusicAssistant:
         self.config = ConfigController(self)
         await self.config.setup()
         LOGGER.info(
-            "Starting Music Assistant Server (%s) version %s - uvloop: %s",
+            "Starting Music Assistant Server (%s) version %s - HA add-on: %s",
             self.server_id,
             self.version,
-            "uvloop" in str(self.loop),
+            self.running_as_hass_addon,
         )
         # setup other core controllers
         self.cache = CacheController(self)
index bef03bf8fc14c6fb4da1c5edebc8d888206256ec..45f3bcdc4f42ab15425e3f5f28ddcc57aaa9dc48 100644 (file)
@@ -40,7 +40,6 @@ server = [
   "zeroconf==0.131.0",
   "cryptography==42.0.5",
   "ifaddr==0.2.0",
-  "uvloop==0.19.0",
 ]
 test = [
   "black==24.2.0",
index 3e1ea6a1edd633e11af99c6917936fb61604ab2e..bcd67adb21472108efe30463815b74420eb450d3 100644 (file)
@@ -37,7 +37,6 @@ soco==0.30.2
 sonos-websocket==0.1.3
 tidalapi==0.7.4
 unidecode==1.3.8
-uvloop==0.19.0
 xmltodict==0.13.0
 ytmusicapi==1.6.0
 zeroconf==0.131.0