from __future__ import annotations
import asyncio
-import logging
import time
from collections.abc import AsyncGenerator
from contextlib import suppress
from music_assistant.helpers.audio import get_player_filter_params
from music_assistant.helpers.ffmpeg import FFMpeg
-from music_assistant.helpers.util import TaskManager, close_async_generator
+from music_assistant.helpers.util import TaskManager
from music_assistant.providers.airplay.helpers import unix_time_to_ntp
from .constants import CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT, StreamingProtocol
self._lock = asyncio.Lock()
self.start_ntp: int = 0
self.start_time: float = 0.0
+ self.wait_start: float = 0.0 # in seconds
self.seconds_streamed: float = 0 # Total seconds sent to session
# because we reuse an existing stream session for new play_media requests,
# we need to track when the last stream was started
wait_start_seconds = wait_start / 1000
self.wait_start = wait_start_seconds # in seconds
self.start_time = cur_time + wait_start_seconds
+ self.last_stream_started = self.start_time
self.start_ntp = unix_time_to_ntp(self.start_time)
self.prov.logger.debug(
"Starting stream session with %d clients",
async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Replace the audio source of the stream."""
- # Cancel the current audio source task
- assert self._audio_source_task # for type checker
- self._audio_source_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._audio_source_task
- # Restart the (player-specific) ffmpeg stream for all players
- # This is the easiest way to ensure the new audio source is used
- # as quickly as possible, without waiting for the buffers to be drained
- # It also allows changing the player settings such as DSP on the fly
- async with self._lock, TaskManager(self.mass) as tm:
- for sync_client in self.sync_clients:
- if not sync_client.stream:
- continue # guard
- tm.create_task(self._start_client_ffmpeg(sync_client))
- # Set new audio source and restart the stream
- self._audio_source = audio_source
- self._audio_source_task = asyncio.create_task(self._audio_streamer())
- self.last_stream_started = time.time()
+ async with self._lock:
+ # Cancel the current audio source task
+ assert self._audio_source_task # for type checker
+ self._audio_source_task.cancel()
+ # Set new audio source and restart the stream
+ self._audio_source = audio_source
+ self._audio_source_task = asyncio.create_task(self._audio_streamer())
+ self.last_stream_started = time.time() + self.wait_start
for sync_client in self.sync_clients:
sync_client.set_state_from_stream(state=None, elapsed_time=0)
+ # ensure we cleanly wait for the old audio source task to finish
+ with suppress(asyncio.CancelledError):
+ await self._audio_source_task
- async def _audio_streamer(self) -> None: # noqa: PLR0915
+ async def _audio_streamer(self) -> None:
"""Stream audio to all players."""
- generator_exhausted = False
_last_metadata: str | None = None
pcm_sample_size = self.pcm_format.pcm_sample_size
stream_start_time = time.time()
first_chunk_received = False
- try:
- # each chunk is exactly one second of audio data based on the pcm format.
- async for chunk in self._audio_source:
- if first_chunk_received is False:
- first_chunk_received = True
+ # each chunk is exactly one second of audio data based on the pcm format.
+ async for chunk in self._audio_source:
+ if first_chunk_received is False:
+ first_chunk_received = True
+ self.prov.logger.debug(
+ "First audio chunk received after %.3fs, "
+ "which is %.3fs before scheduled start time",
+ time.time() - stream_start_time,
+ self.last_stream_started - time.time(),
+ )
+ # wait until the clients are ready to receive audio
+ await asyncio.wait_for(self._clients_ready.wait(), timeout=10)
+ async with self._lock:
+ sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
+ if not sync_clients:
self.prov.logger.debug(
- "First audio chunk received after %.3fs, "
- "which is %.3fs before scheduled start time",
- time.time() - stream_start_time,
- time.time() - self.start_time,
+ "Audio streamer exiting: No running clients left in session"
)
- # wait until the clients are ready to receive audio
- await asyncio.wait_for(self._clients_ready.wait(), timeout=10)
- async with self._lock:
- sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
- if not sync_clients:
- self.prov.logger.debug(
- "Audio streamer exiting: No running clients left in session"
+ return
+
+ # Write chunk to all players
+ write_tasks = [
+ self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream
+ ]
+ results = await asyncio.gather(*write_tasks, return_exceptions=True)
+
+ # Check for write errors or timeouts
+ players_to_remove: list[AirPlayPlayer] = []
+ for i, result in enumerate(results):
+ if i >= len(sync_clients):
+ continue
+ player = sync_clients[i]
+
+ if isinstance(result, asyncio.TimeoutError):
+ self.prov.logger.error(
+ "TIMEOUT writing chunk to player %s - REMOVING from sync group!",
+ player.player_id,
)
- return
-
- # Write chunk to all players
- write_tasks = [
- self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream
- ]
- results = await asyncio.gather(*write_tasks, return_exceptions=True)
-
- # Check for write errors or timeouts
- players_to_remove: list[AirPlayPlayer] = []
- for i, result in enumerate(results):
- if i >= len(sync_clients):
- continue
- player = sync_clients[i]
-
- if isinstance(result, asyncio.TimeoutError):
- self.prov.logger.error(
- "TIMEOUT writing chunk to player %s - REMOVING from sync group!",
- player.player_id,
- )
- players_to_remove.append(player)
- elif isinstance(result, Exception):
- self.prov.logger.error(
- (
- "Error writing chunk to player %s: %s - "
- "REMOVING from sync group!"
- ),
- player.player_id,
- result,
- )
- players_to_remove.append(player)
-
- # Remove failed/timed-out players from sync group
- for player in players_to_remove:
- if player in self.sync_clients:
- self.sync_clients.remove(player)
- self.prov.logger.warning(
- "Player %s removed from sync group due to write failure/timeout",
- player.player_id,
- )
- # Stop the player's stream
- if player.stream:
- self.mass.create_task(player.stream.stop())
-
- # Update chunk counter (each chunk is exactly one second of audio)
- chunk_seconds = len(chunk) / pcm_sample_size
- self.seconds_streamed += chunk_seconds
-
- # send metadata if changed
- # do this in a separate task to not disturb audio streaming
- # NOTE: we should probably move this out of the audio stream task into it's own task
- metadata: PlayerMedia | None
- if (
- self.sync_clients
- and (_leader := self.sync_clients[0])
- and (_leader.corrected_elapsed_time or 0) > 2
- and (metadata := _leader.current_media) is not None
- ):
- now = time.time()
- metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}"
- progress = int(metadata.corrected_elapsed_time or 0)
- if _last_metadata != metadata_checksum:
- _last_metadata = metadata_checksum
- prev_progress_report = now
- self.mass.create_task(self._send_metadata(progress, metadata))
- # send the progress report every 5 seconds
- elif now - prev_progress_report >= 5:
- prev_progress_report = now
- self.mass.create_task(self._send_metadata(progress, None))
- # Entire stream consumed: send EOF
- generator_exhausted = True
- async with self._lock:
- await asyncio.gather(
- *[
- self._write_eof_to_player(x)
- for x in self.sync_clients
- if x.stream and x.stream.running
- ],
- return_exceptions=True,
- )
- except Exception as err:
- logger = self.prov.logger
- logger.error(
- "Stream error: %s",
- str(err) or err.__class__.__name__,
- exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
+ players_to_remove.append(player)
+ elif isinstance(result, Exception):
+ self.prov.logger.error(
+ ("Error writing chunk to player %s: %s - REMOVING from sync group!"),
+ player.player_id,
+ result,
+ )
+ players_to_remove.append(player)
+
+ # Remove failed/timed-out players from sync group
+ for player in players_to_remove:
+ if player in self.sync_clients:
+ self.sync_clients.remove(player)
+ self.prov.logger.warning(
+ "Player %s removed from sync group due to write failure/timeout",
+ player.player_id,
+ )
+ # Stop the player's stream
+ if player.stream:
+ self.mass.create_task(player.stream.stop())
+
+ # Update chunk counter (each chunk is exactly one second of audio)
+ chunk_seconds = len(chunk) / pcm_sample_size
+ self.seconds_streamed += chunk_seconds
+
+ # send metadata if changed
+ # do this in a separate task to not disturb audio streaming
+ # NOTE: we should probably move this out of the audio stream task into it's own task
+ metadata: PlayerMedia | None
+ if (
+ self.sync_clients
+ and (_leader := self.sync_clients[0])
+ and (_leader.corrected_elapsed_time or 0) > 2
+ and (metadata := _leader.current_media) is not None
+ ):
+ now = time.time()
+ metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}"
+ progress = int(metadata.corrected_elapsed_time or 0)
+ if _last_metadata != metadata_checksum:
+ _last_metadata = metadata_checksum
+ prev_progress_report = now
+ self.mass.create_task(self._send_metadata(progress, metadata))
+ # send the progress report every 5 seconds
+ elif now - prev_progress_report >= 5:
+ prev_progress_report = now
+ self.mass.create_task(self._send_metadata(progress, None))
+ # Entire stream consumed: send EOF
+ async with self._lock:
+ await asyncio.gather(
+ *[
+ self._write_eof_to_player(x)
+ for x in self.sync_clients
+ if x.stream and x.stream.running
+ ],
+ return_exceptions=True,
)
- raise
- finally:
- if not generator_exhausted:
- await close_async_generator(self._audio_source)
async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
"""
output_format=airplay_player.stream.pcm_format,
filter_params=filter_params,
audio_output=airplay_player.stream.audio_pipe.path,
- extra_input_args=["-y"],
+ extra_input_args=["-y", "-readrate", "1.0", "-readrate_initial_burst", "1.2"],
)
await ffmpeg.start()
self._player_ffmpeg[airplay_player.player_id] = ffmpeg