small fix for snapcast stream end
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 21 Aug 2024 19:21:48 +0000 (21:21 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 21 Aug 2024 19:21:48 +0000 (21:21 +0200)
music_assistant/server/helpers/util.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/server.py

index 31656ef96fc1f1e966b76832c0ac29f1663256dd..c6496db20b0ddbd5cb1b495934e416aa142c6666 100644 (file)
@@ -165,9 +165,9 @@ class TaskManager:
         self.mass = mass
         self._tasks: list[asyncio.Task] = []
 
-    def create_task(self, coro: Coroutine, eager_start: bool = False) -> None:
+    def create_task(self, coro: Coroutine) -> None:
         """Create a new task and add it to the manager."""
-        task = self.mass.create_task(coro, eager_start=eager_start)
+        task = self.mass.create_task(coro)
         self._tasks.append(task)
 
     async def __aenter__(self) -> Self:
index 25ca9008dd1ad065c65578e3136070b3f6944308..4e4032d2627dd1aac65599a03fc98038156ed29c 100644 (file)
@@ -644,92 +644,93 @@ class AirplayProvider(PlayerProvider):
         media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
-        await self._play_media_lock.acquire()
-        player = self.mass.players.get(player_id)
-        if player.synced_to:
-            # should not happen, but just in case
-            raise RuntimeError("Player is synced")
-        # always stop existing stream first
-        async with TaskManager(self.mass) as tg:
-            for airplay_player in self._get_sync_clients(player_id):
-                tg.create_task(airplay_player.cmd_stop(update_state=False))
-        # select audio source
-        if media.media_type == MediaType.ANNOUNCEMENT:
-            # special case: stream announcement
-            input_format = AIRPLAY_PCM_FORMAT
-            audio_source = self.mass.streams.get_announcement_stream(
-                media.custom_data["url"],
-                output_format=AIRPLAY_PCM_FORMAT,
-                use_pre_announce=media.custom_data["use_pre_announce"],
-            )
-        elif media.queue_id.startswith("ugp_"):
-            # special case: UGP stream
-            ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
-            ugp_stream = ugp_provider.streams[media.queue_id]
-            input_format = ugp_stream.audio_format
-            audio_source = ugp_stream.subscribe_raw()
-        elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
-            # radio stream - consume media stream directly
-            input_format = AIRPLAY_PCM_FORMAT
-            queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
-            audio_source = self.mass.streams.get_media_stream(
-                streamdetails=queue_item.streamdetails,
-                pcm_format=AIRPLAY_PCM_FORMAT,
-            )
-        elif media.queue_id and media.queue_item_id:
-            # regular queue (flow) stream request
-            input_format = AIRPLAY_PCM_FORMAT
-            audio_source = self.mass.streams.get_flow_stream(
-                queue=self.mass.player_queues.get(media.queue_id),
-                start_queue_item=self.mass.player_queues.get_item(
-                    media.queue_id, media.queue_item_id
-                ),
-                pcm_format=AIRPLAY_PCM_FORMAT,
-            )
-        else:
-            # assume url or some other direct path
-            # NOTE: this will fail if its an uri not playable by ffmpeg
-            input_format = AIRPLAY_PCM_FORMAT
-            audio_source = get_ffmpeg_stream(
-                audio_input=media.uri,
-                input_format=AudioFormat(ContentType.try_parse(media.uri)),
-                output_format=AIRPLAY_PCM_FORMAT,
-            )
+        async with self._play_media_lock:
+            player = self.mass.players.get(player_id)
+            if player.synced_to:
+                # should not happen, but just in case
+                raise RuntimeError("Player is synced")
+            # always stop existing stream first
+            async with TaskManager(self.mass) as tg:
+                for airplay_player in self._get_sync_clients(player_id):
+                    tg.create_task(airplay_player.cmd_stop(update_state=False))
+            # select audio source
+            if media.media_type == MediaType.ANNOUNCEMENT:
+                # special case: stream announcement
+                input_format = AIRPLAY_PCM_FORMAT
+                audio_source = self.mass.streams.get_announcement_stream(
+                    media.custom_data["url"],
+                    output_format=AIRPLAY_PCM_FORMAT,
+                    use_pre_announce=media.custom_data["use_pre_announce"],
+                )
+            elif media.queue_id.startswith("ugp_"):
+                # special case: UGP stream
+                ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
+                ugp_stream = ugp_provider.streams[media.queue_id]
+                input_format = ugp_stream.audio_format
+                audio_source = ugp_stream.subscribe_raw()
+            elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
+                # radio stream - consume media stream directly
+                input_format = AIRPLAY_PCM_FORMAT
+                queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
+                audio_source = self.mass.streams.get_media_stream(
+                    streamdetails=queue_item.streamdetails,
+                    pcm_format=AIRPLAY_PCM_FORMAT,
+                )
+            elif media.queue_id and media.queue_item_id:
+                # regular queue (flow) stream request
+                input_format = AIRPLAY_PCM_FORMAT
+                audio_source = self.mass.streams.get_flow_stream(
+                    queue=self.mass.player_queues.get(media.queue_id),
+                    start_queue_item=self.mass.player_queues.get_item(
+                        media.queue_id, media.queue_item_id
+                    ),
+                    pcm_format=AIRPLAY_PCM_FORMAT,
+                )
+            else:
+                # assume url or some other direct path
+                # NOTE: this will fail if its an uri not playable by ffmpeg
+                input_format = AIRPLAY_PCM_FORMAT
+                audio_source = get_ffmpeg_stream(
+                    audio_input=media.uri,
+                    input_format=AudioFormat(ContentType.try_parse(media.uri)),
+                    output_format=AIRPLAY_PCM_FORMAT,
+                )
 
-        # Python is not suitable for realtime audio streaming so we do the actual streaming
-        # of (RAOP) audio using a small executable written in C based on libraop to do the actual
-        # timestamped playback, which reads pcm audio from stdin
-        # and we can send some interactive commands using a named pipe.
+            # Python is not suitable for realtime audio streaming so we do the actual streaming
+            # of (RAOP) audio using a small executable written in C based on libraop to do the actual
+            # timestamped playback, which reads pcm audio from stdin
+            # and we can send some interactive commands using a named pipe.
 
-        # setup RaopStream for player and its sync childs
-        sync_clients = self._get_sync_clients(player_id)
-        for airplay_player in sync_clients:
-            airplay_player.raop_stream = RaopStream(self, airplay_player, input_format=input_format)
+            # setup RaopStream for player and its sync childs
+            sync_clients = self._get_sync_clients(player_id)
+            for airplay_player in sync_clients:
+                airplay_player.raop_stream = RaopStream(
+                    self, airplay_player, input_format=input_format
+                )
 
-        async def audio_streamer() -> None:
-            async for chunk in audio_source:
+            async def audio_streamer() -> None:
+                async for chunk in audio_source:
+                    await asyncio.gather(
+                        *[x.raop_stream.write_chunk(chunk) for x in sync_clients],
+                        return_exceptions=True,
+                    )
+                # entire stream consumed: send EOF
                 await asyncio.gather(
-                    *[x.raop_stream.write_chunk(chunk) for x in sync_clients],
+                    *[x.raop_stream.write_eof() for x in sync_clients],
                     return_exceptions=True,
                 )
-            # entire stream consumed: send EOF
+
+            # get current ntp and start cliraop
+            _, stdout = await check_output(self.cliraop_bin, "-ntp")
+            start_ntp = int(stdout.strip())
+            wait_start = 1250 + (250 * len(sync_clients))
             await asyncio.gather(
-                *[x.raop_stream.write_eof() for x in sync_clients],
+                *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients],
                 return_exceptions=True,
             )
-
-        # get current ntp and start cliraop
-        _, stdout = await check_output(self.cliraop_bin, "-ntp")
-        start_ntp = int(stdout.strip())
-        wait_start = 1250 + (250 * len(sync_clients))
-        await asyncio.gather(
-            *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients],
-            return_exceptions=True,
-        )
-        self._players[player_id].raop_stream.audio_source_task = asyncio.create_task(
-            audio_streamer()
-        )
-        self._play_media_lock.release()
+            self._players[player_id].raop_stream.audio_source_task = asyncio.create_task(
+                audio_streamer()
+            )
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
index 18284877c7eb25db1d957f5bcc4b267f17d07940..f51c2f09cb36c3cb3dedd04c728d6648e6b2b446 100644 (file)
@@ -522,19 +522,17 @@ class SnapCastProvider(PlayerProvider):
                     self.mass.players.update(player_id)
                     self._set_childs_state(player_id)
                     await ffmpeg_proc.wait()
-                    # we need to wait a bit for the stream status to become idle
-                    # to ensure that all snapclients have consumed the audio
-                    await asyncio.sleep(5)
-
-                    player.state = PlayerState.IDLE
-                    self.mass.players.update(player_id)
-                    self._set_childs_state(player_id)
-
             finally:
                 self.logger.debug("Finished streaming to %s", stream_path)
-
+                # we need to wait a bit for the stream status to become idle
+                # to ensure that all snapclients have consumed the audio
+                while stream.status != "idle":
+                    await asyncio.sleep(0.25)
                 with suppress(TypeError, KeyError, AttributeError):
                     await self._snapserver.stream_remove_stream(stream.identifier)
+                player.state = PlayerState.IDLE
+                self.mass.players.update(player_id)
+                self._set_childs_state(player_id)
 
         # start streaming the queue (pcm) audio in a background task
         self._stream_tasks[player_id] = asyncio.create_task(_streamer())
index 5e145aacac8f46bf6bc50e19778fefc2f12e7f73..905e12584381d79e45b1423730c62cdd7535930f 100644 (file)
@@ -314,7 +314,7 @@ class MusicAssistant:
         target: Coroutine | Awaitable | Callable,
         *args: Any,
         task_id: str | None = None,
-        eager_start: bool = False,
+        abort_existing: bool = False,
         **kwargs: Any,
     ) -> asyncio.Task | asyncio.Future:
         """Create Task on (main) event loop from Coroutine(function).
@@ -324,32 +324,24 @@ class MusicAssistant:
         if target is None:
             msg = "Target is missing"
             raise RuntimeError(msg)
-        if task_id and (existing := self._tracked_tasks.get(task_id)):
+        if task_id and (existing := self._tracked_tasks.get(task_id)) and not existing.done():
             # prevent duplicate tasks if task_id is given and already present
-            return existing
-        if asyncio.iscoroutinefunction(target):
-            # coroutine function (with or without eager start)
-            if eager_start:
-                task = asyncio.Task(target(*args, **kwargs), loop=self.loop, eager_start=True)
+            if abort_existing:
+                existing.cancel()
             else:
-                task = self.loop.create_task(target(*args, **kwargs))
+                return existing
+        if asyncio.iscoroutinefunction(target):
+            # coroutine function
+            task = self.loop.create_task(target(*args, **kwargs))
         elif asyncio.iscoroutine(target):
-            # coroutine (with or without eager start)
-            if eager_start:
-                task = asyncio.Task(target, loop=self.loop, eager_start=True)
-            else:
-                task = self.loop.create_task(target)
-        elif eager_start:
-            # regular callback (non async function)
-            task = asyncio.Task(
-                asyncio.to_thread(target, *args, **kwargs), loop=self.loop, eager_start=True
-            )
+            # coroutine
+            task = self.loop.create_task(target)
         else:
             task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs))
 
         def task_done_callback(_task: asyncio.Task) -> None:
             _task_id = task.task_id
-            self._tracked_tasks.pop(_task_id)
+            self._tracked_tasks.pop(_task_id, None)
             # log unhandled exceptions
             if (
                 LOGGER.isEnabledFor(logging.DEBUG)
@@ -393,7 +385,7 @@ class MusicAssistant:
 
         def _create_task() -> None:
             self._tracked_timers.pop(task_id)
-            self.create_task(target, *args, task_id=task_id, **kwargs)
+            self.create_task(target, *args, task_id=task_id, abort_existing=True, **kwargs)
 
         handle = self.loop.call_later(delay, _create_task)
         self._tracked_timers[task_id] = handle