break
if queue_track.streamdetails is None:
- raise InvalidDataError(
- "No Streamdetails known for queue item %s",
+ self.logger.error(
+ "No StreamDetails for queue item %s (%s) on queue %s - skipping track",
queue_track.queue_item_id,
+ queue_track.name,
+ queue.display_name,
)
+ continue
self.logger.debug(
"Start Streaming queue track: %s (%s) for queue %s",
fadein_part = buffer[:crossfade_buffer_size]
remaining_bytes = buffer[crossfade_buffer_size:]
# Use the mixer to handle all crossfade logic
- crossfade_part = await self._smart_fades_mixer.mix(
- fade_in_part=fadein_part,
- fade_out_part=last_fadeout_part,
- fade_in_streamdetails=queue_track.streamdetails,
- fade_out_streamdetails=last_streamdetails,
- pcm_format=pcm_format,
- standard_crossfade_duration=standard_crossfade_duration,
- mode=smart_fades_mode,
- )
- # because the crossfade exists of both the fadein and fadeout part
- # we need to correct the bytes_written accordingly so the duration
- # calculations at the end of the track are correct
- crossfade_part_len = len(crossfade_part)
- bytes_written += int(crossfade_part_len / 2)
- if last_play_log_entry:
- assert last_play_log_entry.seconds_streamed is not None
- last_play_log_entry.seconds_streamed += (
- crossfade_part_len / 2 / pcm_sample_size
+ try:
+ crossfade_part = await self._smart_fades_mixer.mix(
+ fade_in_part=fadein_part,
+ fade_out_part=last_fadeout_part,
+ fade_in_streamdetails=queue_track.streamdetails,
+ fade_out_streamdetails=last_streamdetails,
+ pcm_format=pcm_format,
+ standard_crossfade_duration=standard_crossfade_duration,
+ mode=smart_fades_mode,
)
- # yield crossfade_part (in pcm_sample_size chunks)
- for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
- yield _chunk
- del _chunk
- del crossfade_part
+ except Exception as mix_err:
+ self.logger.warning(
+ "Crossfade mixer failed for %s, falling back to simple concat: %s",
+ queue_track.name,
+ mix_err,
+ )
+ # Fallback: just output the fadeout part then the buffer
+ for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
+ yield _chunk
+ bytes_written += len(_chunk)
+ del _chunk
+ crossfade_part = b""
+ remaining_bytes = buffer
+ if crossfade_part:
+ # because the crossfade exists of both the fadein and fadeout part
+ # we need to correct the bytes_written accordingly so the duration
+ # calculations at the end of the track are correct
+ crossfade_part_len = len(crossfade_part)
+ bytes_written += int(crossfade_part_len / 2)
+ if last_play_log_entry:
+ assert last_play_log_entry.seconds_streamed is not None
+ last_play_log_entry.seconds_streamed += (
+ crossfade_part_len / 2 / pcm_sample_size
+ )
+ # yield crossfade_part (in pcm_sample_size chunks)
+ for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
+ yield _chunk
+ del _chunk
+ del crossfade_part
# also write the leftover bytes from the crossfade action
if remaining_bytes:
yield remaining_bytes
buffer = buffer[pcm_sample_size:]
#### HANDLE END OF TRACK
+ if not first_chunk_received:
+ # Track failed to stream - no chunks received at all
+ self.logger.warning(
+ "Track %s (%s) on queue %s produced no audio data - skipping",
+ queue_track.name,
+ queue_track.streamdetails.uri if queue_track.streamdetails else "unknown",
+ queue.display_name,
+ )
+ # Clean up and continue to next track
+ if queue_track.streamdetails:
+     queue_track.streamdetails.stream_error = True
+ del buffer
+ continue
if last_fadeout_part:
# edge case: we did not get enough data to make the crossfade
for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
streamdetails.uri,
err,
)
+ except asyncio.CancelledError:
+ # Don't swallow cancellation - let it propagate
+ raise
+ except Exception as err:
+ # Catch any other unexpected exceptions to prevent them from
+ # silently killing the entire queue stream
+ streamdetails.stream_error = True
+ if raise_on_error:
+ raise
+ self.logger.exception(
+ "Unexpected error while streaming queue item %s (%s): %s",
+ queue_item.name,
+ streamdetails.uri,
+ err,
+ )
finally:
# determine how many seconds we've streamed
# for pcm output we can calculate this easily
self._stdin_feeder_task.cancel()
# Always await the task to consume any exception and prevent
# "Task exception was never retrieved" errors.
- # Suppress CancelledError (from cancel) and any other exception
- # since exceptions have already been propagated through the generator chain.
- with suppress(asyncio.CancelledError, Exception):
+ try:
await self._stdin_feeder_task
+ except asyncio.CancelledError:
+ pass # Expected when we cancel the task
+ except Exception as err:
+ # Log unexpected exceptions from the stdin feeder before suppressing
+ # The audio source may have failed, and we need visibility into this
+ self.logger.warning(
+ "FFMpeg stdin feeder task ended with error: %s",
+ err,
+ )
if self._stderr_reader_task:
if not self._stderr_reader_task.done():
self._stderr_reader_task.cancel()
self._stdin_feeder_task.cancel()
# Always await the task to consume any exception and prevent
# "Task exception was never retrieved" errors.
- # Suppress CancelledError (from cancel) and any other exception
- # since exceptions have already been propagated through the generator chain.
- with suppress(asyncio.CancelledError, Exception):
+ try:
await self._stdin_feeder_task
+ except asyncio.CancelledError:
+ pass # Expected when we cancel the task
+ except Exception as err:
+ # Log unexpected exceptions from the stdin feeder before suppressing
+ LOGGER.warning(
+ "Process stdin feeder task ended with error: %s",
+ err,
+ )
# close stdin to signal we're done sending data
with suppress(TimeoutError, asyncio.CancelledError):
"""Stream audio to all players."""
pcm_sample_size = self.pcm_format.pcm_sample_size
watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
+ stream_error: Exception | None = None
try:
async for chunk in audio_source:
if not self._first_chunk_received.is_set():
self.prov.logger.debug("No running clients remaining, stopping audio streamer")
break
self.seconds_streamed += len(chunk) / pcm_sample_size
+ except asyncio.CancelledError:
+ self.prov.logger.debug("Audio streamer cancelled after %.1fs", self.seconds_streamed)
+ raise
+ except Exception as err:
+ stream_error = err
+ self.prov.logger.error(
+ "Audio source error after %.1fs of streaming: %s",
+ self.seconds_streamed,
+ err,
+ exc_info=err,
+ )
finally:
if not watchdog_task.done():
watchdog_task.cancel()
with suppress(asyncio.CancelledError):
await watchdog_task
+ if stream_error:
+ self.prov.logger.warning(
+ "Stream ended prematurely due to error - notifying players"
+ )
async with self._lock:
await asyncio.gather(
*[
for t in pending:
t.cancel()
await asyncio.gather(*pending, return_exceptions=True)
-
+ except asyncio.CancelledError:
+ self._logger.debug("Snapcast stream %s cancelled", self.stream_name)
+ raise
+ except Exception as err:
+ self._logger.error("Snapcast stream %s error: %s", self.stream_name, err, exc_info=err)
+ raise
finally:
self._is_streaming = False
self._logger.debug("Finished streaming to %s", stream_path)
- # Wait a bit for snap stream to become idle
- try:
-
- async def wait_until_idle() -> None:
- while True:
- stream_is_idle = False
- with suppress(KeyError):
- snap_stream = self._provider._snapserver.stream(self.stream_name)
- stream_is_idle = snap_stream.status == "idle"
- if self._mass.closing or stream_is_idle:
- break
- await asyncio.sleep(0.25)
-
- await asyncio.wait_for(wait_until_idle(), timeout=10.0)
- except TimeoutError:
- self._logger.warning(
- "Timeout waiting for stream %s to become idle",
- self.stream_name,
- )
- finally:
- self._streaming_started_at = None
+ await self._wait_stream_idle()
+
+ async def _wait_stream_idle(self) -> None:
+ """Wait for the Snapcast stream to become idle after streaming ends."""
+ try:
+
+ async def wait_until_idle() -> None:
+ while True:
+ stream_is_idle = False
+ with suppress(KeyError):
+ snap_stream = self._provider._snapserver.stream(self.stream_name)
+ stream_is_idle = snap_stream.status == "idle"
+ if self._mass.closing or stream_is_idle:
+ break
+ await asyncio.sleep(0.25)
+
+ await asyncio.wait_for(wait_until_idle(), timeout=10.0)
+ except TimeoutError:
+ self._logger.warning(
+ "Timeout waiting for stream %s to become idle",
+ self.stream_name,
+ )
+ finally:
+ self._streaming_started_at = None
def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
"""Handle streamer task completion and optional cleanup."""
from __future__ import annotations
import asyncio
+import logging
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import suppress
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from music_assistant_models.media_items import AudioFormat
+from music_assistant.constants import MASS_LOGGER_NAME
from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.helpers.util import empty_queue
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.providers.ugp_stream")
+
class UGPStream:
"""
async def _runner(self) -> None:
"""Run the stream for the given audio source."""
await asyncio.sleep(0.25) # small delay to allow subscribers to connect
- async for chunk in get_ffmpeg_stream(
- audio_input=self.audio_source,
- input_format=self.input_format,
- output_format=self.base_pcm_format,
- # we don't allow the player to buffer too much ahead so we use readrate limiting
- extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
- ):
- await asyncio.gather(
- *[sub(chunk) for sub in self.subscribers],
- return_exceptions=True,
- )
- # empty chunk when done
- await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
- self._done.set()
+ try:
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.audio_source,
+ input_format=self.input_format,
+ output_format=self.base_pcm_format,
+ # we don't allow the player to buffer too much ahead so we use readrate limiting
+ extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
+ ):
+ await asyncio.gather(
+ *[sub(chunk) for sub in self.subscribers],
+ return_exceptions=True,
+ )
+ except asyncio.CancelledError:
+ LOGGER.debug("UGP stream runner cancelled")
+ raise
+ except Exception as err:
+ LOGGER.error("UGP stream runner error: %s", err, exc_info=err)
+ finally:
+ # empty chunk when done
+ await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
+ self._done.set()