media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
- await self._play_media_lock.acquire()
- player = self.mass.players.get(player_id)
- if player.synced_to:
- # should not happen, but just in case
- raise RuntimeError("Player is synced")
- # always stop existing stream first
- async with TaskManager(self.mass) as tg:
- for airplay_player in self._get_sync_clients(player_id):
- tg.create_task(airplay_player.cmd_stop(update_state=False))
- # select audio source
- if media.media_type == MediaType.ANNOUNCEMENT:
- # special case: stream announcement
- input_format = AIRPLAY_PCM_FORMAT
- audio_source = self.mass.streams.get_announcement_stream(
- media.custom_data["url"],
- output_format=AIRPLAY_PCM_FORMAT,
- use_pre_announce=media.custom_data["use_pre_announce"],
- )
- elif media.queue_id.startswith("ugp_"):
- # special case: UGP stream
- ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
- ugp_stream = ugp_provider.streams[media.queue_id]
- input_format = ugp_stream.audio_format
- audio_source = ugp_stream.subscribe_raw()
- elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
- # radio stream - consume media stream directly
- input_format = AIRPLAY_PCM_FORMAT
- queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
- audio_source = self.mass.streams.get_media_stream(
- streamdetails=queue_item.streamdetails,
- pcm_format=AIRPLAY_PCM_FORMAT,
- )
- elif media.queue_id and media.queue_item_id:
- # regular queue (flow) stream request
- input_format = AIRPLAY_PCM_FORMAT
- audio_source = self.mass.streams.get_flow_stream(
- queue=self.mass.player_queues.get(media.queue_id),
- start_queue_item=self.mass.player_queues.get_item(
- media.queue_id, media.queue_item_id
- ),
- pcm_format=AIRPLAY_PCM_FORMAT,
- )
- else:
- # assume url or some other direct path
- # NOTE: this will fail if its an uri not playable by ffmpeg
- input_format = AIRPLAY_PCM_FORMAT
- audio_source = get_ffmpeg_stream(
- audio_input=media.uri,
- input_format=AudioFormat(ContentType.try_parse(media.uri)),
- output_format=AIRPLAY_PCM_FORMAT,
- )
+ async with self._play_media_lock:
+ player = self.mass.players.get(player_id)
+ if player.synced_to:
+ # should not happen, but just in case
+ raise RuntimeError("Player is synced")
+ # always stop existing stream first
+ async with TaskManager(self.mass) as tg:
+ for airplay_player in self._get_sync_clients(player_id):
+ tg.create_task(airplay_player.cmd_stop(update_state=False))
+ # select audio source
+ if media.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ input_format = AIRPLAY_PCM_FORMAT
+ audio_source = self.mass.streams.get_announcement_stream(
+ media.custom_data["url"],
+ output_format=AIRPLAY_PCM_FORMAT,
+ use_pre_announce=media.custom_data["use_pre_announce"],
+ )
+ elif media.queue_id.startswith("ugp_"):
+ # special case: UGP stream
+ ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
+ ugp_stream = ugp_provider.streams[media.queue_id]
+ input_format = ugp_stream.audio_format
+ audio_source = ugp_stream.subscribe_raw()
+ elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
+ # radio stream - consume media stream directly
+ input_format = AIRPLAY_PCM_FORMAT
+ queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
+ audio_source = self.mass.streams.get_media_stream(
+ streamdetails=queue_item.streamdetails,
+ pcm_format=AIRPLAY_PCM_FORMAT,
+ )
+ elif media.queue_id and media.queue_item_id:
+ # regular queue (flow) stream request
+ input_format = AIRPLAY_PCM_FORMAT
+ audio_source = self.mass.streams.get_flow_stream(
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=AIRPLAY_PCM_FORMAT,
+ )
+ else:
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ input_format = AIRPLAY_PCM_FORMAT
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=AIRPLAY_PCM_FORMAT,
+ )
- # Python is not suitable for realtime audio streaming so we do the actual streaming
- # of (RAOP) audio using a small executable written in C based on libraop to do the actual
- # timestamped playback, which reads pcm audio from stdin
- # and we can send some interactive commands using a named pipe.
+ # Python is not suitable for realtime audio streaming so we do the actual streaming
+ # of (RAOP) audio using a small executable written in C based on libraop to do the actual
+ # timestamped playback, which reads pcm audio from stdin
+ # and we can send some interactive commands using a named pipe.
- # setup RaopStream for player and its sync childs
- sync_clients = self._get_sync_clients(player_id)
- for airplay_player in sync_clients:
- airplay_player.raop_stream = RaopStream(self, airplay_player, input_format=input_format)
+ # setup RaopStream for player and its sync childs
+ sync_clients = self._get_sync_clients(player_id)
+ for airplay_player in sync_clients:
+ airplay_player.raop_stream = RaopStream(
+ self, airplay_player, input_format=input_format
+ )
- async def audio_streamer() -> None:
- async for chunk in audio_source:
+ async def audio_streamer() -> None:
+ async for chunk in audio_source:
+ await asyncio.gather(
+ *[x.raop_stream.write_chunk(chunk) for x in sync_clients],
+ return_exceptions=True,
+ )
+ # entire stream consumed: send EOF
await asyncio.gather(
- *[x.raop_stream.write_chunk(chunk) for x in sync_clients],
+ *[x.raop_stream.write_eof() for x in sync_clients],
return_exceptions=True,
)
- # entire stream consumed: send EOF
+
+ # get current ntp and start cliraop
+ _, stdout = await check_output(self.cliraop_bin, "-ntp")
+ start_ntp = int(stdout.strip())
+ wait_start = 1250 + (250 * len(sync_clients))
await asyncio.gather(
- *[x.raop_stream.write_eof() for x in sync_clients],
+ *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients],
return_exceptions=True,
)
-
- # get current ntp and start cliraop
- _, stdout = await check_output(self.cliraop_bin, "-ntp")
- start_ntp = int(stdout.strip())
- wait_start = 1250 + (250 * len(sync_clients))
- await asyncio.gather(
- *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients],
- return_exceptions=True,
- )
- self._players[player_id].raop_stream.audio_source_task = asyncio.create_task(
- audio_streamer()
- )
- self._play_media_lock.release()
+ self._players[player_id].raop_stream.audio_source_task = asyncio.create_task(
+ audio_streamer()
+ )
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
target: Coroutine | Awaitable | Callable,
*args: Any,
task_id: str | None = None,
- eager_start: bool = False,
+ abort_existing: bool = False,
**kwargs: Any,
) -> asyncio.Task | asyncio.Future:
"""Create Task on (main) event loop from Coroutine(function).
if target is None:
msg = "Target is missing"
raise RuntimeError(msg)
- if task_id and (existing := self._tracked_tasks.get(task_id)):
+ if task_id and (existing := self._tracked_tasks.get(task_id)) and not existing.done():
# prevent duplicate tasks if task_id is given and already present
- return existing
- if asyncio.iscoroutinefunction(target):
- # coroutine function (with or without eager start)
- if eager_start:
- task = asyncio.Task(target(*args, **kwargs), loop=self.loop, eager_start=True)
+ if abort_existing:
+ existing.cancel()
else:
- task = self.loop.create_task(target(*args, **kwargs))
+ return existing
+ if asyncio.iscoroutinefunction(target):
+ # coroutine function
+ task = self.loop.create_task(target(*args, **kwargs))
elif asyncio.iscoroutine(target):
- # coroutine (with or without eager start)
- if eager_start:
- task = asyncio.Task(target, loop=self.loop, eager_start=True)
- else:
- task = self.loop.create_task(target)
- elif eager_start:
- # regular callback (non async function)
- task = asyncio.Task(
- asyncio.to_thread(target, *args, **kwargs), loop=self.loop, eager_start=True
- )
+ # coroutine
+ task = self.loop.create_task(target)
else:
task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs))
def task_done_callback(_task: asyncio.Task) -> None:
_task_id = task.task_id
- self._tracked_tasks.pop(_task_id)
+ self._tracked_tasks.pop(_task_id, None)
# log unhandled exceptions
if (
LOGGER.isEnabledFor(logging.DEBUG)
def _create_task() -> None:
self._tracked_timers.pop(task_id)
- self.create_task(target, *args, task_id=task_id, **kwargs)
+ self.create_task(target, *args, task_id=task_id, abort_existing=True, **kwargs)
handle = self.loop.call_later(delay, _create_task)
self._tracked_timers[task_id] = handle