from music_assistant_models.errors import PlayerCommandFailed
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.helpers.audio import get_chunksize, get_player_filter_params
from music_assistant.helpers.ffmpeg import FFMpeg
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.util import TaskManager, close_async_generator
"""Initialize RaopStreamSession."""
# initialize raop stream for all players
- async def audio_streamer() -> None:
- """Stream audio to all players."""
- generator_exhausted = False
- try:
- async for chunk in self._audio_source:
- async with self._lock:
- sync_clients = [
- x for x in self._sync_clients if x.raop_stream and x.raop_stream.running
- ]
- if not sync_clients:
- return
- await asyncio.gather(
- *[
- x.raop_stream.write_chunk(chunk)
- for x in sync_clients
- if x.raop_stream
- ],
- return_exceptions=True,
- )
- # entire stream consumed: send EOF
- generator_exhausted = True
- async with self._lock:
- await asyncio.gather(
- *[
- x.raop_stream.write_eof()
- for x in self._sync_clients
- if x.raop_stream and x.raop_stream.running
- ],
- return_exceptions=True,
- )
- except Exception as err:
- logger = self.prov.logger
- logger.error(
- "Stream error: %s",
- str(err) or err.__class__.__name__,
- exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
- )
- raise
- finally:
- if not generator_exhausted:
- await close_async_generator(self._audio_source)
-
# get current ntp and start RaopStream per player
assert self.prov.cliraop_bin
_, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
async with TaskManager(self.mass) as tm:
for _raop_player in self._sync_clients:
tm.create_task(_start_client(_raop_player))
- self._audio_source_task = asyncio.create_task(audio_streamer())
+ self._audio_source_task = asyncio.create_task(self._audio_streamer())
async def stop(self) -> None:
"""Stop playback and cleanup."""
return
assert airplay_player.raop_stream
assert airplay_player.raop_stream.session == self
- self._sync_clients.remove(airplay_player)
+ async with self._lock:
+ self._sync_clients.remove(airplay_player)
await airplay_player.raop_stream.stop()
airplay_player.raop_stream = None
# e.g. by counting the number of frames sent etc.
raise NotImplementedError("Adding clients to a session is not yet supported")
+ async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
+ """Replace the audio source of the stream."""
+ # cancel the per-player ffmpeg reader tasks
+ for _raop_player in self._sync_clients:
+ assert _raop_player.raop_stream # for type checker
+ assert _raop_player.raop_stream.ffmpeg_reader_task # for type checker
+ _raop_player.raop_stream.ffmpeg_reader_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await _raop_player.raop_stream.ffmpeg_reader_task
+ # cancel the current audio source task
+ assert self._audio_source_task # for type checker
+ self._audio_source_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._audio_source_task
+ # set new audio source and restart the stream
+ self._audio_source = audio_source
+ self._audio_source_task = asyncio.create_task(self._audio_streamer())
+ for _raop_player in self._sync_clients:
+ assert _raop_player.raop_stream # for type checker
+ _raop_player.raop_stream.ffmpeg_reader_task = self.mass.create_task(
+ _raop_player.raop_stream.ffmpeg_reader()
+ )
+
+ async def _audio_streamer(self) -> None:
+ """Stream audio to all players."""
+ generator_exhausted = False
+ try:
+ async for chunk in self._audio_source:
+ async with self._lock:
+ sync_clients = [
+ x for x in self._sync_clients if x.raop_stream and x.raop_stream.running
+ ]
+ if not sync_clients:
+ return
+ await asyncio.gather(
+ *[x.raop_stream.write_chunk(chunk) for x in sync_clients if x.raop_stream],
+ return_exceptions=True,
+ )
+ # entire stream consumed: send EOF
+ generator_exhausted = True
+ async with self._lock:
+ await asyncio.gather(
+ *[
+ x.raop_stream.write_eof()
+ for x in self._sync_clients
+ if x.raop_stream and x.raop_stream.running
+ ],
+ return_exceptions=True,
+ )
+ except Exception as err:
+ logger = self.prov.logger
+ logger.error(
+ "Stream error: %s",
+ str(err) or err.__class__.__name__,
+ exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
+ )
+ raise
+ finally:
+ if not generator_exhausted:
+ await close_async_generator(self._audio_source)
+
class RaopStream:
"""
self._stderr_reader_task: asyncio.Task[None] | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
+ self.ffmpeg_reader_task: asyncio.Task[None] | None = None
self._started = asyncio.Event()
self._stopped = False
+ self._total_bytes_sent = 0
+ self._stream_bytes_sent = 0
@property
def running(self) -> bool:
read_ahead = await self.mass.config.get_player_config_value(
player_id, CONF_READ_AHEAD_BUFFER
)
- # create os pipes to pipe ffmpeg to cliraop
- read, write = await asyncio.to_thread(os.pipe)
# ffmpeg handles the player specific stream + filters and pipes
# audio to the cliraop process
- self._ffmpeg_proc = FFMpeg(
- audio_input="-",
- input_format=self.session.input_format,
- output_format=AIRPLAY_PCM_FORMAT,
- filter_params=get_player_filter_params(
- self.mass, player_id, self.session.input_format, AIRPLAY_PCM_FORMAT
- ),
- audio_output=write,
- )
- await self._ffmpeg_proc.start()
- await asyncio.to_thread(os.close, write)
+ self.ffmpeg_reader_task = self.mass.create_task(self.ffmpeg_reader())
# cliraop is the binary that handles the actual raop streaming to the player
# this is a slightly modified bversion of philippe44's libraop
self.airplay_player.address,
"-",
]
- self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop")
+ self._cliraop_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
if platform.system() == "Darwin":
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
await self._cliraop_proc.start()
- await asyncio.to_thread(os.close, read)
# read first 10 lines of stderr to get the initial status
for _ in range(10):
line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore")
self._started.set()
break
if "Cannot connect to AirPlay device" in line:
+ self.ffmpeg_reader_task.cancel()
raise PlayerCommandFailed("Cannot connect to AirPlay device")
# start reading the stderr of the cliraop process from another task
self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
await self._cliraop_proc.wait_with_timeout(2)
if self._stderr_reader_task and not self._stderr_reader_task.done():
self._stderr_reader_task.cancel()
+ if self.ffmpeg_reader_task and not self.ffmpeg_reader_task.done():
+ self.ffmpeg_reader_task.cancel()
if self._cliraop_proc.proc and not self._cliraop_proc.closed:
await self._cliraop_proc.close(True)
- if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
- await self._ffmpeg_proc.close(True)
- self._cliraop_proc = None
- self._ffmpeg_proc = None
async def write_chunk(self, chunk: bytes) -> None:
"""Write a (pcm) audio chunk."""
self.airplay_player.last_command_sent = time.time()
await asyncio.to_thread(send_data)
- async def _stderr_reader(self) -> None: # noqa: PLR0915
+ async def ffmpeg_reader(self) -> None:
+ """Read audio from the audio source and pipe it to the CLIRaop process."""
+ self._ffmpeg_proc = FFMpeg(
+ audio_input="-",
+ input_format=self.session.input_format,
+ output_format=AIRPLAY_PCM_FORMAT,
+ filter_params=get_player_filter_params(
+ self.mass,
+ self.airplay_player.player_id,
+ self.session.input_format,
+ AIRPLAY_PCM_FORMAT,
+ ),
+ )
+ self._stream_bytes_sent = 0
+ mass_player = self.mass.players.get(self.airplay_player.player_id)
+ assert mass_player # for type checker
+ try:
+ await self._ffmpeg_proc.start()
+ chunksize = get_chunksize(AIRPLAY_PCM_FORMAT)
+ # wait for cliraop to be ready
+ await asyncio.wait_for(self._started.wait(), 20)
+ async for chunk in self._ffmpeg_proc.iter_chunked(chunksize):
+ if self._stopped:
+ break
+ if not self._cliraop_proc or self._cliraop_proc.closed:
+ break
+ await self._cliraop_proc.write(chunk)
+ self._stream_bytes_sent += len(chunk)
+ self._total_bytes_sent += len(chunk)
+ del chunk
+ # we base elapsed time on the amount of bytes sent
+ # so we can account for reusing the same session for multiple streams
+ mass_player.elapsed_time = self._stream_bytes_sent / chunksize
+ mass_player.elapsed_time_last_updated = time.time()
+ if self._cliraop_proc and not self._cliraop_proc.closed:
+ await self._cliraop_proc.write_eof()
+ finally:
+ await self._ffmpeg_proc.close()
+
+ async def _stderr_reader(self) -> None:
"""Monitor stderr for the running CLIRaop process."""
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
async for line in self._cliraop_proc.iter_stderr():
if "elapsed milliseconds:" in line:
# this is received more or less every second while playing
- millis = int(line.split("elapsed milliseconds: ")[1])
- mass_player.elapsed_time = millis / 1000
- mass_player.elapsed_time_last_updated = time.time()
+ # millis = int(line.split("elapsed milliseconds: ")[1])
+ # mass_player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
+ # mass_player.elapsed_time_last_updated = time.time()
# send metadata to player(s) if needed
# NOTE: this must all be done in separate tasks to not disturb audio
now = time.time()
if (
- mass_player.elapsed_time > 2
+ (mass_player.elapsed_time or 0) > 2
and queue
and queue.current_item
and queue.current_item.streamdetails