StreamType,
VolumeNormalizationMode,
)
-from music_assistant_models.errors import QueueEmpty
+from music_assistant_models.errors import AudioError, QueueEmpty
from music_assistant_models.media_items import AudioFormat
from music_assistant_models.player_queue import PlayLogEntry
get_chunksize,
get_media_stream,
get_player_filter_params,
- get_silence,
get_stream_details,
resample_pcm_audio,
)
)
# Start periodic garbage collection task
# This ensures memory from audio buffers and streams is cleaned up regularly
- # DISABLED FOR TESTING - may cause event loop blocking
- # self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
+ self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
async def close(self) -> None:
"""Cleanup on exit."""
break
if queue_item.streamdetails.stream_error:
self.logger.error(
- "Error streaming QueueItem %s (%s) to %s",
+ "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
queue_item.name,
queue_item.uri,
queue.display_name,
)
- # some players do not like it when we dont return anything after an error
- # so we send some silence so they move on to the next track on their own (hopefully)
- async for chunk in get_silence(10, output_format):
- try:
- await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError, ConnectionError):
- break
+ # try to skip to the next item in the queue after a short delay
+ self.mass.call_later(5, self.mass.player_queues.next(queue_id))
return resp
async def serve_queue_flow_stream(self, request: web.Request) -> web.Response:
queue_track,
pcm_format=pcm_format,
seek_position=queue_track.streamdetails.seek_position,
+ raise_on_error=False,
):
if not first_chunk_received:
first_chunk_received = True
#### HANDLE END OF TRACK
if last_fadeout_part:
# edge case: we did not get enough data to make the crossfade
- yield last_fadeout_part
+ for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
+ yield _chunk
+ del _chunk
bytes_written += len(last_fadeout_part)
last_fadeout_part = b""
if self._crossfade_allowed(
elif buffer:
# no crossfade enabled, just yield the buffer last part
bytes_written += len(buffer)
- yield buffer
+ for _chunk in divide_chunks(buffer, pcm_sample_size):
+ yield _chunk
+ del _chunk
# make sure the buffer gets cleaned up
del buffer
#### HANDLE END OF QUEUE FLOW STREAM
# end of queue flow: make sure we yield the last_fadeout_part
if last_fadeout_part:
- yield last_fadeout_part
+ for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
+ yield _chunk
+ del _chunk
# correct seconds streamed/duration
last_part_seconds = len(last_fadeout_part) / pcm_sample_size
queue_track.streamdetails.seconds_streamed += last_part_seconds
queue_item: QueueItem,
pcm_format: AudioFormat,
seek_position: int = 0,
+ raise_on_error: bool = True,
) -> AsyncGenerator[bytes, None]:
"""Get the (PCM) audio stream for a single queue item."""
# collect all arguments for ffmpeg
yield chunk
# help garbage collection by explicitly deleting chunk
del chunk
- if not bytes_received:
- self.logger.error("No audio data received from source for %s", queue_item.name)
finished = True
+ except AudioError as err:
+ queue_item.streamdetails.stream_error = True
+ queue_item.available = False
+ if raise_on_error:
+ raise
+ # yes, we swallow the error here after logging it
+ # so the outer stream can handle it gracefully
+ self.logger.error(
+ "AudioError while streaming queue item %s (%s): %s",
+ queue_item.name,
+ queue_item.streamdetails.uri,
+ err,
+ )
finally:
# determine how many seconds we've streamed
# for pcm output we can calculate this easily
streamdetails = queue_item.streamdetails
assert streamdetails
- crossfade_data = self._crossfade_data.get(queue.queue_id)
+ crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
- if crossfade_data and crossfade_data.queue_item_id != queue_item.queue_item_id:
+ if crossfade_data and streamdetails.seek_position > 0:
+ # don't do crossfade when seeking into track
+ crossfade_data = None
+ if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
# edge case alert: the next item changed just while we were preloading/crossfading
self.logger.warning(
"Skipping crossfade data for queue %s - next item changed!", queue.display_name
# send the (second half of the) crossfade data
if crossfade_data.pcm_format != pcm_format:
# edge case: pcm format mismatch, we need to resample
- crossfade_data.data = await resample_pcm_audio(
+ resampled_data = await resample_pcm_audio(
crossfade_data.data,
crossfade_data.pcm_format,
pcm_format,
)
- yield crossfade_data.data
+ for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
+ yield _chunk
+ else:
+ for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
+ yield _chunk
bytes_written += len(crossfade_data.data)
# clear vars
crossfade_data = None
crossfade_data.pcm_format,
pcm_format,
)
- yield crossfade_data.data
+ for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
+ yield _chunk
bytes_written += len(crossfade_data.data)
crossfade_data = None
+ next_queue_item: QueueItem | None = None
if not self._crossfade_allowed(
queue_item, smart_fades_mode=smart_fades_mode, flow_mode=False
):
# no crossfade enabled/allowed, just yield the buffer last part
bytes_written += len(buffer)
- yield buffer
+ for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
+ yield _chunk
else:
# if crossfade is enabled, save fadeout part in buffer to pickup for next track
fade_out_data = buffer
buffer = b""
# get next track for crossfade
+ self.logger.debug(
+ "Preloading NEXT track for crossfade for queue %s",
+ queue.display_name,
+ )
try:
next_queue_item = await self.mass.player_queues.load_next_queue_item(
queue.queue_id, queue_item.queue_item_id
mode=smart_fades_mode,
)
# send half of the crossfade_part (= approx the fadeout part)
- crossfade_first, crossfade_second = (
- crossfade_bytes[: len(crossfade_bytes) // 2 + len(crossfade_bytes) % 2],
- crossfade_bytes[len(crossfade_bytes) // 2 + len(crossfade_bytes) % 2 :],
- )
+ split_point = (len(crossfade_bytes) + 1) // 2
+ crossfade_first = crossfade_bytes[:split_point]
+ crossfade_second = crossfade_bytes[split_point:]
del crossfade_bytes
bytes_written += len(crossfade_first)
- yield crossfade_first
- del crossfade_first
+ for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
+ yield _chunk
# store the other half for the next track
self._crossfade_data[queue_item.queue_id] = CrossfadeData(
data=crossfade_second,
pcm_format=pcm_format,
queue_item_id=next_queue_item.queue_item_id,
)
-
- except QueueEmpty:
- # end of queue reached or crossfade failed - no crossfade possible
+ except (QueueEmpty, AudioError):
+ # end of queue reached, next item skipped or crossfade failed
+ # no crossfade possible, just yield the fade_out_data
+ next_queue_item = None
yield fade_out_data
bytes_written += len(fade_out_data)
del fade_out_data
streamdetails.seconds_streamed = seconds_streamed
streamdetails.duration = streamdetails.seek_position + seconds_streamed
self.logger.debug(
- "Finished Streaming queue track: %s (%s) on queue %s",
+ "Finished Streaming queue track: %s (%s) on queue %s "
+ "- crossfade data prepared for next track: %s",
queue_item.streamdetails.uri,
queue_item.name,
queue.display_name,
+ next_queue_item.name if next_queue_item else "N/A",
)
def _log_request(self, request: web.Request) -> None:
):
chunk_count += 1
await audio_buffer.put(chunk)
+ # Only set EOF if we completed successfully
+ await audio_buffer.set_eof()
except asyncio.CancelledError:
status = "cancelled"
raise
status = "aborted with error"
raise
finally:
- await audio_buffer.set_eof()
LOGGER.log(
VERBOSE_LOG_LEVEL,
"fill_buffer_task: %s (%s chunks) for %s",
logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
# wait until stderr also completed reading
await ffmpeg_proc.wait_with_timeout(5)
+ if ffmpeg_proc.returncode not in (0, None):
+ log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
+ raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
if bytes_sent == 0:
- # edge case: no audio data was sent
+ # edge case: no audio data was received at all
raise AudioError("No audio was received")
- elif ffmpeg_proc.returncode not in (0, None):
- raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}")
finished = True
except (Exception, GeneratorExit, asyncio.CancelledError) as err:
if isinstance(err, asyncio.CancelledError | GeneratorExit):
# we were cancelled, just raise
cancelled = True
raise
- logger.error("Error while streaming %s: %s", streamdetails.uri, err)
# dump the last 10 lines of the log in case of an unclean exit
logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
- streamdetails.stream_error = True
- raise
+ raise AudioError(f"Error while streaming: {err}") from err
finally:
# always ensure close is called which also handles all cleanup
await ffmpeg_proc.close()