self._audio_source = audio_source
self._audio_source_task: asyncio.Task[None] | None = None
self._player_ffmpeg: dict[str, FFMpeg] = {}
- self._player_start_chunk: dict[str, int] = {} # Chunk number when player joined
self._lock = asyncio.Lock()
self.start_ntp: int = 0
self.start_time: float = 0.0
self.chunks_streamed: int = 0 # Total chunks sent to session (each chunk = 1 second)
+ # because we reuse an existing stream session for new play_media requests,
+ # we need to track when the last stream was started
+ self.last_stream_started: float = 0.0
async def start(self) -> None:
"""Initialize stream session for all players."""
# Stop existing stream if running
if airplay_player.stream and airplay_player.stream.running:
await airplay_player.stream.stop()
- if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
- await ffmpeg.close()
- del ffmpeg
-
- self._player_start_chunk[airplay_player.player_id] = 1
# Create appropriate stream type based on protocol
if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
else:
airplay_player.stream = RaopStream(self, airplay_player)
- # create optional FFMpeg instance per player if needed
- # this is used to do any optional DSP processing/filtering
- filter_params = get_player_filter_params(
- self.mass,
- airplay_player.player_id,
- self.pcm_format,
- airplay_player.stream.pcm_format,
- )
- if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
- ffmpeg = FFMpeg(
- audio_input="-",
- input_format=self.pcm_format,
- output_format=airplay_player.stream.pcm_format,
- filter_params=filter_params,
- )
- await ffmpeg.start()
- self._player_ffmpeg[airplay_player.player_id] = ffmpeg
+ await self._start_client_ffmpeg(airplay_player)
+ # start the stream
await airplay_player.stream.start(self.start_ntp)
- # Tracking will be initialized on first write
-
async with TaskManager(self.mass) as tm:
for _airplay_player in self.sync_clients:
tm.create_task(_start_client(_airplay_player))
if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
await ffmpeg.close()
del ffmpeg
- # Clean up player tracking
- self._player_start_chunk.pop(airplay_player.player_id, None)
await airplay_player.stream.stop()
airplay_player.stream = None
# If this was the last client, stop the session
if airplay_player.stream and airplay_player.stream.running:
await airplay_player.stream.stop()
- # Clean up any existing FFmpeg instance for this player
- if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
- await ffmpeg.close()
- del ffmpeg
-
# Create appropriate stream type based on protocol
if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
airplay_player.stream = AirPlay2Stream(self, airplay_player)
else:
airplay_player.stream = RaopStream(self, airplay_player)
- # Create optional FFMpeg instance per player if needed
- filter_params = get_player_filter_params(
- self.mass,
- airplay_player.player_id,
- self.pcm_format,
- airplay_player.stream.pcm_format,
- )
- if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
- ffmpeg = FFMpeg(
- audio_input="-",
- input_format=self.pcm_format,
- output_format=airplay_player.stream.pcm_format,
- filter_params=filter_params,
- )
- await ffmpeg.start()
- self._player_ffmpeg[airplay_player.player_id] = ffmpeg
-
# Snapshot chunks_streamed inside lock to prevent race conditions
# Keep lock held during stream.start() to ensure player doesn't miss any chunks
async with self._lock:
+ # (re)start the player specific ffmpeg process
+ await self._start_client_ffmpeg(airplay_player)
+
# Calculate skip_seconds based on how many chunks have been sent
skip_seconds = self.chunks_streamed
self._audio_source_task.cancel()
with suppress(asyncio.CancelledError):
await self._audio_source_task
- # Set new audio source and restart the stream
- self._audio_source = audio_source
- self._audio_source_task = asyncio.create_task(self._audio_streamer())
# Restart the (player-specific) ffmpeg stream for all players
# This is the easiest way to ensure the new audio source is used
# as quickly as possible, without waiting for the buffers to be drained
# It also allows changing the player settings such as DSP on the fly
- # for sync_client in self.sync_clients:
- # if not sync_client.stream:
- # continue # guard
- # sync_client.stream.start_ffmpeg_stream()
+ async with self._lock, TaskManager(self.mass) as tm:
+ for sync_client in self.sync_clients:
+ if not sync_client.stream:
+ continue # guard
+ tm.create_task(self._start_client_ffmpeg(sync_client))
+ # Set new audio source and restart the stream
+ self._audio_source = audio_source
+ self._audio_source_task = asyncio.create_task(self._audio_streamer())
+
+ self.last_stream_started = time.time()
+ for sync_client in self.sync_clients:
+ sync_client.set_state_from_stream(state=None, elapsed_time=0)
async def _audio_streamer(self) -> None:
"""Stream audio to all players."""
chunk_number = self.chunks_streamed + 1
player_id = airplay_player.player_id
- # Calculate chunk offset based on actual time vs start time
- self._player_start_chunk.pop(player_id, None)
-
# if the player has an associated FFMpeg instance, use that first
if ffmpeg := self._player_ffmpeg.get(player_id):
await ffmpeg.write(chunk)
],
return_exceptions=True,
)
+
+ async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None:
+ """Start or restart the player's ffmpeg stream."""
+ # Clean up any existing FFmpeg instance for this player
+ if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+ await ffmpeg.close()
+ del ffmpeg
+ assert airplay_player.stream # for type checker
+ # Create optional FFMpeg instance per player if needed
+ filter_params = get_player_filter_params(
+ self.mass,
+ airplay_player.player_id,
+ self.pcm_format,
+ airplay_player.stream.pcm_format,
+ )
+ if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
+ ffmpeg = FFMpeg(
+ audio_input="-",
+ input_format=self.pcm_format,
+ output_format=airplay_player.stream.pcm_format,
+ filter_params=filter_params,
+ )
+ await ffmpeg.start()
+ self._player_ffmpeg[airplay_player.player_id] = ffmpeg