BYPASS_THROTTLER.set(True)
self.logger.debug(
- "loading (next) item for queue %s...",
+ "(pre)loading (next) item for queue %s...",
queue.display_name,
)
queue.index_in_buffer = self.index_by_id(queue_id, item_id)
self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
self.signal_update(queue_id)
- # enqueue the item on the player as soon as one is loaded
- if next_item := self.get_next_item(queue_id, item_id):
- self._enqueue_next_item(queue_id, next_item)
# preload next streamdetails
self._preload_next_item(queue_id, item_id)
def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
"""Update the existing queue items, mostly caused by reordering."""
self._queue_items[queue_id] = queue_items
- self._queues[queue_id].items = len(self._queue_items[queue_id])
+ queue = self._queues[queue_id]
+ queue.items = len(self._queue_items[queue_id])
# to track if the queue items changed we set a timestamp
# this is a simple way to detect changes in the list of items
# without having to compare the entire list
- self._queues[queue_id].items_last_updated = time.time()
+ queue.items_last_updated = time.time()
self.signal_update(queue_id, True)
+ if queue.state == PlayerState.PLAYING:
+ # if the queue is playing,
+ # ensure to (re)queue the next track because it might have changed
+ if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
+ self._enqueue_next_item(queue_id, next_item)
# Helper methods
if (next_index := self._get_next_index(queue_id, cur_index)) is None:
break
next_item = self.get_item(queue_id, next_index)
- if next_item.media_item and not next_item.media_item.available:
+ if not next_item.available:
# ensure that we skip unavailable items (set by load_next track logic)
continue
return next_item
)
def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
- """Enqueue/precache the next item on the player."""
+ """Enqueue the next item on the player."""
if not next_item:
# no next item, nothing to do...
return
)
task_id = f"enqueue_next_item_{queue_id}"
- self.mass.create_task(
- _enqueue_next_item_on_player(next_item), task_id=task_id, abort_existing=True
- )
+ self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
"""
- Preload the next item in the queue.
+ Preload the streamdetails for the next item in the queue/buffer.
This basically ensures the item is playable and fetches the stream details.
If caching is enabled, this will also start filling the stream cache.
If an error occurs, the item will be skipped and the next item will be loaded.
"""
- async def _preload_streamdetails() -> None:
+ async def _preload_streamdetails(item_id_in_buffer: str) -> None:
try:
next_item = await self.preload_next_queue_item(queue_id, item_id_in_buffer)
+ self._enqueue_next_item(queue_id, next_item)
except QueueEmpty:
return
- # always send enqueue next (even though we may have already sent that)
- # because it could have been changed and also because some players
- # sometimes miss the enqueue_next call when its sent too short after
- # the play_media call, so consider this a safety net.
- self._enqueue_next_item(queue_id, next_item)
if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
# this should not happen, but guard anyways
return
task_id = f"preload_next_item_{queue_id}"
- self.mass.call_later(30, _preload_streamdetails, task_id=task_id)
+ self.mass.call_later(0.5, _preload_streamdetails, item_id_in_buffer, task_id=task_id)
async def _resolve_media_items(
self, media_item: MediaItemTypeOrItemMapping, start_item: str | None = None
import shutil
import urllib.parse
from collections.abc import AsyncGenerator
-from contextlib import suppress
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
)
from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
-from music_assistant.helpers.ffmpeg import FFMpeg, check_ffmpeg_version, get_ffmpeg_stream
+from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
from music_assistant.helpers.util import (
get_folder_size,
get_free_space,
channels=2,
)
- # inform the queue that the track is now loaded in the buffer
- # so for example the next track can be enqueued
- self.mass.player_queues.track_loaded_in_buffer(queue_id, queue_item_id)
-
- # work out crossfade details
- self._crossfade_data.setdefault(queue_id, CrossfadeData())
- crossfade_data = self._crossfade_data[queue_id]
- enable_crossfade = self._get_crossfade_config(queue_item, flow_mode=False)
+ crossfade = await self.mass.config.get_player_config_value(queue.queue_id, CONF_CROSSFADE)
+ if crossfade and PlayerFeature.GAPLESS_PLAYBACK not in queue_player.supported_features:
+ # crossfade is not supported on this player due to missing gapless playback
+ self.logger.warning("Crossfade disabled: gapless playback not supported on player")
+ return False
- async for chunk in get_ffmpeg_stream(
- audio_input=self.get_queue_item_stream(
+ if crossfade:
+ # crossfade is enabled, use special crossfaded single item stream
+ # where the crossfade of the next track is present in the stream of
+ # a single track. This only works if the player supports gapless playback.
+ audio_input = self.get_queue_item_stream_with_crossfade(
queue_item=queue_item,
pcm_format=pcm_format,
- enable_crossfade=enable_crossfade,
- crossfade_data=crossfade_data,
session_id=session_id,
- ),
+ )
+ else:
+ audio_input = self.get_queue_item_stream(
+ queue_item=queue_item,
+ pcm_format=pcm_format,
+ )
+
+ async for chunk in get_ffmpeg_stream(
+ audio_input=audio_input,
input_format=pcm_format,
output_format=output_format,
filter_params=get_player_filter_params(
await resp.write(chunk)
except (BrokenPipeError, ConnectionResetError, ConnectionError):
break
- return resp
- # lookup next item in queue to determine additional actions
- next_item = self.mass.player_queues.get_next_item(queue_id, queue_item_id)
- if not next_item:
- # end of queue reached: make sure we yield the last_fadeout_part
- if crossfade_data and crossfade_data.fadeout_part:
- await resp.write(crossfade_data.fadeout_part)
- crossfade_data.fadeout_part = b""
- if (
- crossfade_data.fadeout_part
- and next_item
- and next_item.streamdetails
- and next_item.streamdetails.audio_format.sample_rate
- != crossfade_data.pcm_format.sample_rate
- and PlayerFeature.GAPLESS_DIFFERENT_SAMPLERATE not in queue_player.supported_features
- ):
- # next track's sample rate differs from current track
- # most players do not properly support gapless playback between different sample rates
- # so let's just output the fadeout data
- crossfade_data.session_id = ""
- self.logger.debug("Skipping crossfade: sample rate mismatch")
- async with FFMpeg(
- audio_input="-",
- input_format=crossfade_data.pcm_format,
- output_format=output_format,
- ) as ffmpeg:
- res = await ffmpeg.communicate(crossfade_data.fadeout_part)
- with suppress(BrokenPipeError, ConnectionResetError, ConnectionError):
- await resp.write(res[0])
-
return resp
async def serve_queue_flow_stream(self, request: web.Request) -> web.Response:
"""Get a flow stream of all tracks in the queue as raw PCM audio."""
# ruff: noqa: PLR0915
assert pcm_format.content_type.is_pcm()
- queue_item: QueueItem | None = None
- crossfade_data = CrossfadeData(b"", pcm_format)
+ queue_track = None
+ last_fadeout_part = b""
queue.flow_mode = True
-
if not start_queue_item:
# this can happen in some (edge case) race conditions
return
-
pcm_sample_size = int(
pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels
)
+ crossfade_enabled = await self.mass.config.get_player_config_value(
+ queue.queue_id, CONF_CROSSFADE
+ )
+ if start_queue_item.media_type != MediaType.TRACK:
+ # we only support crossfade for tracks, not for radio items
+ crossfade_enabled = False
+ crossfade_duration = self.mass.config.get_raw_player_config_value(
+ queue.queue_id, CONF_CROSSFADE_DURATION, 10
+ )
self.logger.info(
- "Start Queue Flow stream for Queue %s",
+ "Start Queue Flow stream for Queue %s - crossfade: %s",
queue.display_name,
+ f"{crossfade_duration}s" if crossfade_enabled else "disabled",
)
+ total_bytes_sent = 0
+
while True:
# get (next) queue item to stream
- if queue_item is None:
- queue_item = start_queue_item
+ if queue_track is None:
+ queue_track = start_queue_item
else:
try:
- queue_item = await self.mass.player_queues.preload_next_queue_item(
- queue.queue_id, queue_item.queue_item_id
+ queue_track = await self.mass.player_queues.preload_next_queue_item(
+ queue.queue_id, queue_track.queue_item_id
)
except QueueEmpty:
break
- if queue_item.streamdetails is None:
+ if queue_track.streamdetails is None:
raise RuntimeError(
"No Streamdetails known for queue item %s",
- queue_item.queue_item_id,
+ queue_track.queue_item_id,
)
- self.mass.player_queues.track_loaded_in_buffer(queue.queue_id, queue_item.queue_item_id)
+ self.logger.debug(
+ "Start Streaming queue track: %s (%s) for queue %s",
+ queue_track.streamdetails.uri,
+ queue_track.name,
+ queue.display_name,
+ )
# append to play log so the queue controller can work out which track is playing
- play_log_entry = PlayLogEntry(queue_item.queue_item_id)
+ play_log_entry = PlayLogEntry(queue_track.queue_item_id)
queue.flow_mode_stream_log.append(play_log_entry)
- # work out crossfade details
- enable_crossfade = self._get_crossfade_config(queue_item, flow_mode=True)
-
+ # set some basic vars
+ pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
+ crossfade_size = int(pcm_sample_size * crossfade_duration)
+ bytes_written = 0
+ buffer = b""
# handle incoming audio chunks
async for chunk in self.get_queue_item_stream(
- queue_item,
+ queue_track,
pcm_format=pcm_format,
- enable_crossfade=enable_crossfade,
- crossfade_data=crossfade_data,
):
- yield chunk
+ # buffer size needs to be big enough to include the crossfade part
+ req_buffer_size = pcm_sample_size if not crossfade_enabled else crossfade_size
+
+ # ALWAYS APPEND CHUNK TO BUFFER
+ buffer += chunk
+ del chunk
+ if len(buffer) < req_buffer_size:
+ # buffer is not full enough, move on
+ continue
- #### HANDLE END OF TRACK
- play_log_entry.seconds_streamed = queue_item.streamdetails.seconds_streamed
- play_log_entry.duration = queue_item.streamdetails.duration
+ #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
+ if last_fadeout_part:
+ # perform crossfade
+ fadein_part = buffer[:crossfade_size]
+ remaining_bytes = buffer[crossfade_size:]
+ crossfade_part = await crossfade_pcm_parts(
+ fadein_part,
+ last_fadeout_part,
+ pcm_format=pcm_format,
+ )
+ # send crossfade_part (as one big chunk)
+ bytes_written += len(crossfade_part)
+ yield crossfade_part
+
+ # also write the leftover bytes from the crossfade action
+ if remaining_bytes:
+ yield remaining_bytes
+ bytes_written += len(remaining_bytes)
+ del remaining_bytes
+ # clear vars
+ last_fadeout_part = b""
+ buffer = b""
+
+ #### OTHER: enough data in buffer, feed to output
+ while len(buffer) > req_buffer_size:
+ yield buffer[:pcm_sample_size]
+ bytes_written += pcm_sample_size
+ buffer = buffer[pcm_sample_size:]
+ #### HANDLE END OF TRACK
+ if last_fadeout_part:
+ # edge case: we did not get enough data to make the crossfade
+ yield last_fadeout_part
+ bytes_written += len(last_fadeout_part)
+ last_fadeout_part = b""
+ if self._crossfade_allowed(queue_track, flow_mode=True):
+ # if crossfade is enabled, save fadeout part to pickup for next track
+ last_fadeout_part = buffer[-crossfade_size:]
+ remaining_bytes = buffer[:-crossfade_size]
+ if remaining_bytes:
+ yield remaining_bytes
+ bytes_written += len(remaining_bytes)
+ del remaining_bytes
+ elif buffer:
+ # no crossfade enabled, just yield the buffer last part
+ bytes_written += len(buffer)
+ yield buffer
+ # make sure the buffer gets cleaned up
+ del buffer
+
+ # update duration details based on the actual pcm data we sent
+ # this also accounts for crossfade and silence stripping
+ seconds_streamed = bytes_written / pcm_sample_size
+ queue_track.streamdetails.seconds_streamed = seconds_streamed
+ queue_track.streamdetails.duration = (
+ queue_track.streamdetails.seek_position + seconds_streamed
+ )
+ play_log_entry.seconds_streamed = seconds_streamed
+ play_log_entry.duration = queue_track.streamdetails.duration
+ total_bytes_sent += bytes_written
+ self.logger.debug(
+ "Finished Streaming queue track: %s (%s) on queue %s",
+ queue_track.streamdetails.uri,
+ queue_track.name,
+ queue.display_name,
+ )
#### HANDLE END OF QUEUE FLOW STREAM
# end of queue flow: make sure we yield the last_fadeout_part
- if crossfade_data and crossfade_data.fadeout_part:
- yield crossfade_data.fadeout_part
+ if last_fadeout_part:
+ yield last_fadeout_part
# correct seconds streamed/duration
- last_part_seconds = len(crossfade_data.fadeout_part) / pcm_sample_size
- queue_item.streamdetails.seconds_streamed += last_part_seconds
- queue_item.streamdetails.duration += last_part_seconds
- del crossfade_data
+ last_part_seconds = len(last_fadeout_part) / pcm_sample_size
+ queue_track.streamdetails.seconds_streamed += last_part_seconds
+ queue_track.streamdetails.duration += last_part_seconds
+ del last_fadeout_part
+ total_bytes_sent += bytes_written
self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
async def get_announcement_stream(
self,
queue_item: QueueItem,
pcm_format: AudioFormat,
- enable_crossfade: bool = False,
- crossfade_data: CrossfadeData | None = None,
- session_id: str | None = None,
) -> AsyncGenerator[bytes, None]:
"""Get the audio stream for a single queue item as raw PCM audio."""
# collect all arguments for ffmpeg
streamdetails = queue_item.streamdetails
assert streamdetails
filter_params = []
- crossfade_duration = self.mass.config.get_raw_player_config_value(
- queue_item.queue_id, CONF_CROSSFADE_DURATION, 10
- )
-
- queue = self.mass.player_queues.get(queue_item.queue_id)
- self.logger.debug(
- "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s",
- queue_item.streamdetails.uri,
- queue_item.name,
- queue.display_name,
- f"{crossfade_duration}s" if enable_crossfade else "disabled",
- )
# handle volume normalization
gain_correct: float | None = None
filter_params.append(f"volume={gain_correct}dB")
streamdetails.volume_normalization_gain_correct = gain_correct
- pad_silence_seconds = 0
if streamdetails.media_type == MediaType.RADIO or not streamdetails.duration:
# pad some silence before the radio/live stream starts to create some headroom
# for radio stations (or other live streams) that do not provide any look ahead buffer
# without this, some radio streams jitter a lot, especially with dynamic normalization,
# if the stream does not provide a look ahead buffer
- pad_silence_seconds = 4
+ async for silence in get_silence(4, pcm_format):
+ yield silence
+ del silence
+
+ first_chunk_received = False
+ async for chunk in get_media_stream(
+ self.mass,
+ streamdetails=streamdetails,
+ pcm_format=pcm_format,
+ filter_params=filter_params,
+ ):
+ if not first_chunk_received:
+ first_chunk_received = True
+ # inform the queue that the track is now loaded in the buffer
+ # so for example the next track can be enqueued
+ self.mass.player_queues.track_loaded_in_buffer(
+ queue_item.queue_id, queue_item.queue_item_id
+ )
+ yield chunk
+ del chunk
+
+ async def get_queue_item_stream_with_crossfade(
+ self,
+ queue_item: QueueItem,
+ pcm_format: AudioFormat,
+ session_id: str | None = None,
+ ) -> AsyncGenerator[bytes, None]:
+ """Get the audio stream for a single queue item with crossfade to the next item."""
+ queue = self.mass.player_queues.get(queue_item.queue_id)
+ streamdetails = queue_item.streamdetails
+ assert streamdetails
+ crossfade_duration = self.mass.config.get_raw_player_config_value(
+ queue_item.queue_id, CONF_CROSSFADE_DURATION, 10
+ )
+ self._crossfade_data.setdefault(queue.queue_id, CrossfadeData())
+ crossfade_data = self._crossfade_data[queue.queue_id]
+
+ self.logger.debug(
+ "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s",
+ queue_item.streamdetails.uri,
+ queue_item.name,
+ queue.display_name,
+ f"{crossfade_duration} seconds",
+ )
if crossfade_data.session_id != session_id:
# invalidate expired crossfade data
crossfade_data.fadeout_part = b""
- first_chunk_received = False
buffer = b""
bytes_written = 0
pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
# buffer size needs to be big enough to include the crossfade part
crossfade_size = int(pcm_sample_size * crossfade_duration)
- req_buffer_size = pcm_sample_size
- if enable_crossfade or (crossfade_data and crossfade_data.fadeout_part):
- # crossfade is enabled, so we need to make sure we have enough data in the buffer
- # to perform the crossfade
- req_buffer_size += crossfade_size
-
- async for chunk in get_media_stream(
- self.mass,
- streamdetails=streamdetails,
- pcm_format=pcm_format,
- filter_params=filter_params,
- ):
- # yield silence when the chunk has been received from source but not yet sent to player
- # so we have a bit of backpressure to prevent jittering
- if not first_chunk_received and pad_silence_seconds:
- first_chunk_received = True
- async for silence in get_silence(pad_silence_seconds, pcm_format):
- yield silence
- del silence
+ async for chunk in self.get_queue_item_stream(queue_item, pcm_format):
# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
del chunk
- if len(buffer) < req_buffer_size:
+ if len(buffer) < crossfade_size:
# buffer is not full enough, move on
continue
del fade_in_part
#### OTHER: enough data in buffer, feed to output
- while len(buffer) > req_buffer_size:
+ while len(buffer) > crossfade_size:
yield buffer[:pcm_sample_size]
bytes_written += pcm_sample_size
buffer = buffer[pcm_sample_size:]
if crossfade_data.pcm_format == pcm_format:
yield crossfade_data.fadeout_part
bytes_written += len(crossfade_data.fadeout_part)
- crossfade_data.fadeout_part = b""
- if enable_crossfade:
+ # always reset fadeout part at this point
+ crossfade_data.fadeout_part = b""
+ if self._crossfade_allowed(queue_item, flow_mode=False):
# if crossfade is enabled, save fadeout part to pickup for next track
crossfade_data.fadeout_part = buffer[-crossfade_size:]
crossfade_data.pcm_format = pcm_format
bytes_written += len(remaining_bytes)
del remaining_bytes
elif buffer:
- # no crossfade enabled, just yield the buffer last part
+ # no crossfade enabled/allowed, just yield the buffer last part
bytes_written += len(buffer)
yield buffer
# make sure the buffer gets cleaned up
# reschedule self
self.mass.call_later(3600, self._clean_audio_cache)
- def _get_crossfade_config(self, queue_item: QueueItem, flow_mode: bool = False) -> bool:
+ def _crossfade_allowed(self, queue_item: QueueItem, flow_mode: bool = False) -> bool:
"""Get the crossfade config for a queue item."""
if not (queue_player := self.mass.players.get(queue_item.queue_id)):
return False # just a guard
- use_crossfade = self.mass.config.get_raw_player_config_value(
- queue_item.queue_id, CONF_CROSSFADE, False
- )
- if not use_crossfade:
- return False
- if not flow_mode and PlayerFeature.GAPLESS_PLAYBACK not in queue_player.supported_features:
- # crossfade is not supported on this player due to missing gapless playback
- self.logger.debug("Skipping crossfade: gapless playback not supported on player")
- return False
if queue_item.media_type != MediaType.TRACK:
+ self.logger.debug("Skipping crossfade: current item is not a track")
return False
# check if the next item is part of the same album
next_item = self.mass.player_queues.get_next_item(