From: Marcel van der Veldt Date: Wed, 21 Aug 2024 19:21:48 +0000 (+0200) Subject: small fix for snapcast stream end X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=690bad3c2c50bf7305da238046da7039951320af;p=music-assistant-server.git small fix for snapcast stream end --- diff --git a/music_assistant/server/helpers/util.py b/music_assistant/server/helpers/util.py index 31656ef9..c6496db2 100644 --- a/music_assistant/server/helpers/util.py +++ b/music_assistant/server/helpers/util.py @@ -165,9 +165,9 @@ class TaskManager: self.mass = mass self._tasks: list[asyncio.Task] = [] - def create_task(self, coro: Coroutine, eager_start: bool = False) -> None: + def create_task(self, coro: Coroutine) -> None: """Create a new task and add it to the manager.""" - task = self.mass.create_task(coro, eager_start=eager_start) + task = self.mass.create_task(coro) self._tasks.append(task) async def __aenter__(self) -> Self: diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 25ca9008..4e4032d2 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -644,92 +644,93 @@ class AirplayProvider(PlayerProvider): media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player.""" - await self._play_media_lock.acquire() - player = self.mass.players.get(player_id) - if player.synced_to: - # should not happen, but just in case - raise RuntimeError("Player is synced") - # always stop existing stream first - async with TaskManager(self.mass) as tg: - for airplay_player in self._get_sync_clients(player_id): - tg.create_task(airplay_player.cmd_stop(update_state=False)) - # select audio source - if media.media_type == MediaType.ANNOUNCEMENT: - # special case: stream announcement - input_format = AIRPLAY_PCM_FORMAT - audio_source = self.mass.streams.get_announcement_stream( - media.custom_data["url"], - output_format=AIRPLAY_PCM_FORMAT, - use_pre_announce=media.custom_data["use_pre_announce"], - ) - elif media.queue_id.startswith("ugp_"): - # special case: UGP stream - ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") - ugp_stream = ugp_provider.streams[media.queue_id] - input_format = ugp_stream.audio_format - audio_source = ugp_stream.subscribe_raw() - elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id: - # radio stream - consume media stream directly - input_format = AIRPLAY_PCM_FORMAT - queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id) - audio_source = self.mass.streams.get_media_stream( - streamdetails=queue_item.streamdetails, - pcm_format=AIRPLAY_PCM_FORMAT, - ) - elif media.queue_id and media.queue_item_id: - # regular queue (flow) stream request - input_format = AIRPLAY_PCM_FORMAT - audio_source = self.mass.streams.get_flow_stream( - queue=self.mass.player_queues.get(media.queue_id), - start_queue_item=self.mass.player_queues.get_item( - media.queue_id, media.queue_item_id - ), - pcm_format=AIRPLAY_PCM_FORMAT, - ) - else: - # assume url or some other direct path - # NOTE: this will fail if its an uri not playable by ffmpeg - input_format = AIRPLAY_PCM_FORMAT - audio_source = get_ffmpeg_stream( - audio_input=media.uri, - input_format=AudioFormat(ContentType.try_parse(media.uri)), - output_format=AIRPLAY_PCM_FORMAT, - ) + async with self._play_media_lock: + player = self.mass.players.get(player_id) + if player.synced_to: + # should not happen, but just in case + raise RuntimeError("Player is synced") + # always stop existing stream first + async with TaskManager(self.mass) as tg: + for airplay_player in self._get_sync_clients(player_id): + tg.create_task(airplay_player.cmd_stop(update_state=False)) + # select audio source + if media.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + input_format = AIRPLAY_PCM_FORMAT + audio_source = self.mass.streams.get_announcement_stream( + media.custom_data["url"], + output_format=AIRPLAY_PCM_FORMAT, + use_pre_announce=media.custom_data["use_pre_announce"], + ) + elif media.queue_id.startswith("ugp_"): + # special case: UGP stream + ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") + ugp_stream = ugp_provider.streams[media.queue_id] + input_format = ugp_stream.audio_format + audio_source = ugp_stream.subscribe_raw() + elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id: + # radio stream - consume media stream directly + input_format = AIRPLAY_PCM_FORMAT + queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id) + audio_source = self.mass.streams.get_media_stream( + streamdetails=queue_item.streamdetails, + pcm_format=AIRPLAY_PCM_FORMAT, + ) + elif media.queue_id and media.queue_item_id: + # regular queue (flow) stream request + input_format = AIRPLAY_PCM_FORMAT + audio_source = self.mass.streams.get_flow_stream( + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=AIRPLAY_PCM_FORMAT, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + input_format = AIRPLAY_PCM_FORMAT + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=AIRPLAY_PCM_FORMAT, + ) - # Python is not suitable for realtime audio streaming so we do the actual streaming - # of (RAOP) audio using a small executable written in C based on libraop to do the actual - # timestamped playback, which reads pcm audio from stdin - # and we can send some interactive commands using a named pipe. + # Python is not suitable for realtime audio streaming so we do the actual streaming + # of (RAOP) audio using a small executable written in C based on libraop to do the actual + # timestamped playback, which reads pcm audio from stdin + # and we can send some interactive commands using a named pipe. - # setup RaopStream for player and its sync childs - sync_clients = self._get_sync_clients(player_id) - for airplay_player in sync_clients: - airplay_player.raop_stream = RaopStream(self, airplay_player, input_format=input_format) + # setup RaopStream for player and its sync childs + sync_clients = self._get_sync_clients(player_id) + for airplay_player in sync_clients: + airplay_player.raop_stream = RaopStream( + self, airplay_player, input_format=input_format + ) - async def audio_streamer() -> None: - async for chunk in audio_source: + async def audio_streamer() -> None: + async for chunk in audio_source: + await asyncio.gather( + *[x.raop_stream.write_chunk(chunk) for x in sync_clients], + return_exceptions=True, + ) + # entire stream consumed: send EOF await asyncio.gather( - *[x.raop_stream.write_chunk(chunk) for x in sync_clients], + *[x.raop_stream.write_eof() for x in sync_clients], return_exceptions=True, ) - # entire stream consumed: send EOF + + # get current ntp and start cliraop + _, stdout = await check_output(self.cliraop_bin, "-ntp") + start_ntp = int(stdout.strip()) + wait_start = 1250 + (250 * len(sync_clients)) await asyncio.gather( - *[x.raop_stream.write_eof() for x in sync_clients], + *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients], return_exceptions=True, ) - - # get current ntp and start cliraop - _, stdout = await check_output(self.cliraop_bin, "-ntp") - start_ntp = int(stdout.strip()) - wait_start = 1250 + (250 * len(sync_clients)) - await asyncio.gather( - *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients], - return_exceptions=True, - ) - self._players[player_id].raop_stream.audio_source_task = asyncio.create_task( - audio_streamer() - ) - self._play_media_lock.release() + self._players[player_id].raop_stream.audio_source_task = asyncio.create_task( + audio_streamer() + ) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 18284877..f51c2f09 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -522,19 +522,17 @@ class SnapCastProvider(PlayerProvider): self.mass.players.update(player_id) self._set_childs_state(player_id) await ffmpeg_proc.wait() - # we need to wait a bit for the stream status to become idle - # to ensure that all snapclients have consumed the audio - await asyncio.sleep(5) - - player.state = PlayerState.IDLE - self.mass.players.update(player_id) - self._set_childs_state(player_id) - finally: self.logger.debug("Finished streaming to %s", stream_path) - + # we need to wait a bit for the stream status to become idle + # to ensure that all snapclients have consumed the audio + while stream.status != "idle": + await asyncio.sleep(0.25) with suppress(TypeError, KeyError, AttributeError): await self._snapserver.stream_remove_stream(stream.identifier) + player.state = PlayerState.IDLE + self.mass.players.update(player_id) + self._set_childs_state(player_id) # start streaming the queue (pcm) audio in a background task self._stream_tasks[player_id] = asyncio.create_task(_streamer()) diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index 5e145aac..905e1258 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -314,7 +314,7 @@ class MusicAssistant: target: Coroutine | Awaitable | Callable, *args: Any, task_id: str | None = None, - eager_start: bool = False, + abort_existing: bool = False, **kwargs: Any, ) -> asyncio.Task | asyncio.Future: """Create Task on (main) event loop from Coroutine(function). @@ -324,32 +324,24 @@ class MusicAssistant: if target is None: msg = "Target is missing" raise RuntimeError(msg) - if task_id and (existing := self._tracked_tasks.get(task_id)): + if task_id and (existing := self._tracked_tasks.get(task_id)) and not existing.done(): # prevent duplicate tasks if task_id is given and already present - return existing - if asyncio.iscoroutinefunction(target): - # coroutine function (with or without eager start) - if eager_start: - task = asyncio.Task(target(*args, **kwargs), loop=self.loop, eager_start=True) + if abort_existing: + existing.cancel() else: - task = self.loop.create_task(target(*args, **kwargs)) + return existing + if asyncio.iscoroutinefunction(target): + # coroutine function + task = self.loop.create_task(target(*args, **kwargs)) elif asyncio.iscoroutine(target): - # coroutine (with or without eager start) - if eager_start: - task = asyncio.Task(target, loop=self.loop, eager_start=True) - else: - task = self.loop.create_task(target) - elif eager_start: - # regular callback (non async function) - task = asyncio.Task( - asyncio.to_thread(target, *args, **kwargs), loop=self.loop, eager_start=True - ) + # coroutine + task = self.loop.create_task(target) else: task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs)) def task_done_callback(_task: asyncio.Task) -> None: _task_id = task.task_id - self._tracked_tasks.pop(_task_id) + self._tracked_tasks.pop(_task_id, None) # log unhandled exceptions if ( LOGGER.isEnabledFor(logging.DEBUG) @@ -393,7 +385,7 @@ class MusicAssistant: def _create_task() -> None: self._tracked_timers.pop(task_id) - self.create_task(target, *args, task_id=task_id, **kwargs) + self.create_task(target, *args, task_id=task_id, abort_existing=True, **kwargs) handle = self.loop.call_later(delay, _create_task) self._tracked_timers[task_id] = handle