from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
self._stop_requested = False
+ self.buffer = asyncio.Queue(5)
@property
def running(self) -> bool:
output_format=AIRPLAY_PCM_FORMAT,
filter_params=get_player_filter_params(self.mass, player_id),
)
+
+ async def get_chunks() -> AsyncGenerator[bytes, None]:
+ while True:
+ chunk = await self.buffer.get()
+ if chunk == b"EOF":
+ break
+ yield chunk
+
# launch ffmpeg process with player specific settings
# the stream_job_runner will start pushing pcm chunks to the stdin
- # the ffmpeg process will send the output directly to the given path (e.g. tcp socket)
+ # the ffmpeg process will send the output directly to the stdin of cliraop
self._ffmpeg_proc = AsyncProcess(
ffmpeg_args,
enable_stdin=True,
enable_stdout=True,
enable_stderr=False,
+ custom_stdin=get_chunks(),
custom_stdout=write,
name="cliraop_ffmpeg",
)
enable_stdout=False,
enable_stderr=True,
custom_stdin=read,
+ name="cliraop",
)
await self._cliraop_proc.start()
self._log_reader_task = asyncio.create_task(self._log_watcher())
- async def write_chunk(self, chunk: bytes) -> None:
- """Write a (pcm) audio chunk to the player."""
- await self._ffmpeg_proc.write(chunk)
-
- async def write_eof(self) -> None:
- """Write EOF to the ffmpeg stdin."""
- await self._ffmpeg_proc.write_eof()
- await self._ffmpeg_proc.wait()
- await self.stop()
-
async def stop(self, wait: bool = True):
"""Stop playback and cleanup."""
if self._cliraop_proc.closed and self._ffmpeg_proc.closed:
return
self._stop_requested = True
+ empty_queue(self.buffer)
async def _stop() -> None:
# ffmpeg MUST be stopped before cliraop due to the chained pipes
if airplay_player.active_stream.start_ntp != start_ntp:
# checksum mismatch
continue
- tg.create_task(airplay_player.active_stream.write_chunk(chunk))
+ tg.create_task(airplay_player.active_stream.buffer.put(chunk))
active_clients += 1
if active_clients == 0:
# no more clients
or airplay_player.active_stream.start_ntp != start_ntp
):
continue
- tg.create_task(airplay_player.active_stream.write_eof())
+ tg.create_task(airplay_player.active_stream.buffer.put(b"EOF"))
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.