From 5551404cf4e86fa4a00fc9c75b7178e7881e40bb Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 22 Oct 2025 20:21:24 +0200 Subject: [PATCH] Fix various issues with streaming (e.g. on Sonos devices) (#2536) Fix various issues with streaming --- music_assistant/controllers/streams.py | 80 ++++++++++++++------------ music_assistant/helpers/audio.py | 49 +++++++++------- 2 files changed, 71 insertions(+), 58 deletions(-) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index db9c82a0..4633d2bb 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -412,47 +412,48 @@ class StreamsController(CoreController): ) smart_fades_mode = SmartFadesMode.DISABLED + # work out pcm format based on output format + pcm_format = AudioFormat( + sample_rate=output_format.sample_rate, + # always use f32 internally for extra headroom for filters etc + content_type=ContentType.PCM_F32LE, + bit_depth=DEFAULT_PCM_FORMAT.bit_depth, + channels=2, + ) if smart_fades_mode != SmartFadesMode.DISABLED: # crossfade is enabled, use special crossfaded single item stream # where the crossfade of the next track is present in the stream of # a single track. This only works if the player supports gapless playback. - # work out pcm format based on output format - pcm_format = AudioFormat( - content_type=DEFAULT_PCM_FORMAT.content_type, - sample_rate=output_format.sample_rate, - # always use f32 internally for extra headroom for filters etc - bit_depth=DEFAULT_PCM_FORMAT.bit_depth, - channels=2, - ) - audio_input = get_ffmpeg_stream( - audio_input=self.get_queue_item_stream_with_smartfade( - queue_item=queue_item, - pcm_format=pcm_format, - session_id=session_id, - smart_fades_mode=smart_fades_mode, - standard_crossfade_duration=standard_crossfade_duration, - ), - input_format=pcm_format, - output_format=output_format, - filter_params=get_player_filter_params( - self.mass, queue_player.player_id, pcm_format, output_format - ), + + audio_input = self.get_queue_item_stream_with_smartfade( + queue_item=queue_item, + pcm_format=pcm_format, + session_id=session_id, + smart_fades_mode=smart_fades_mode, + standard_crossfade_duration=standard_crossfade_duration, ) else: # no crossfade, just a regular single item stream - # no need to convert to pcm first, request output format directly audio_input = self.get_queue_item_stream( queue_item=queue_item, - output_format=output_format, - filter_params=get_player_filter_params( - self.mass, - queue_player.player_id, - queue_item.streamdetails.audio_format, - output_format, - ), + pcm_format=pcm_format, ) - - async for chunk in audio_input: + # stream the audio + # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into + # the desired output format for the player including any player specific filter params + # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats + async for chunk in get_ffmpeg_stream( + audio_input=audio_input, + input_format=pcm_format, + output_format=output_format, + filter_params=get_player_filter_params( + self.mass, + player_id=queue_player.player_id, + input_format=pcm_format, + output_format=output_format, + ), + chunk_size=get_chunksize(output_format), + ): try: await resp.write(chunk) except (BrokenPipeError, ConnectionResetError, ConnectionError): @@ -541,6 +542,9 @@ class StreamsController(CoreController): return resp # all checks passed, start streaming! + # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into + # the desired output format for the player including any player specific filter params + # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name) async for chunk in get_ffmpeg_stream( @@ -836,7 +840,7 @@ class StreamsController(CoreController): # handle incoming audio chunks async for chunk in self.get_queue_item_stream( queue_track, - output_format=pcm_format, + pcm_format=pcm_format, ): # buffer size needs to be big enough to include the crossfade part req_buffer_size = ( @@ -1032,14 +1036,13 @@ class StreamsController(CoreController): async def get_queue_item_stream( self, queue_item: QueueItem, - output_format: AudioFormat, - filter_params: list[str] | None = None, + pcm_format: AudioFormat, ) -> AsyncGenerator[bytes, None]: - """Get the audio stream for a single queue item.""" + """Get the (PCM) audio stream for a single queue item.""" # collect all arguments for ffmpeg streamdetails = queue_item.streamdetails assert streamdetails - filter_params = filter_params or [] + filter_params: list[str] = [] # handle volume normalization gain_correct: float | None = None @@ -1075,7 +1078,7 @@ class StreamsController(CoreController): async for chunk in get_media_stream( self.mass, streamdetails=streamdetails, - output_format=output_format, + pcm_format=pcm_format, filter_params=filter_params, ): if not first_chunk_received: @@ -1279,6 +1282,9 @@ class StreamsController(CoreController): # no point in having a higher bit depth for lossy formats output_bit_depth = 16 output_sample_rate = min(48000, output_sample_rate) + if content_type == ContentType.WAV and output_bit_depth > 16: + # WAV 24bit is not widely supported, fallback to 16bit + output_bit_depth = 16 if output_format_str == "pcm": content_type = ContentType.from_bit_depth(output_bit_depth) return AudioFormat( diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index f9e082fb..9c1446c7 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -424,10 +424,10 @@ async def get_stream_details( async def get_media_stream( mass: MusicAssistant, streamdetails: StreamDetails, - output_format: AudioFormat, + pcm_format: AudioFormat, filter_params: list[str] | None = None, ) -> AsyncGenerator[bytes, None]: - """Get audio stream for given media details.""" + """Get audio stream for given media details as raw PCM.""" logger = LOGGER.getChild("media_stream") logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri) extra_input_args = streamdetails.extra_input_args or [] @@ -487,7 +487,7 @@ async def get_media_stream( ffmpeg_proc = FFMpeg( audio_input=audio_source, input_format=streamdetails.audio_format, - output_format=output_format, + output_format=pcm_format, filter_params=filter_params, extra_input_args=extra_input_args, collect_log_history=True, @@ -506,12 +506,12 @@ async def get_media_stream( streamdetails.uri, streamdetails.stream_type, streamdetails.volume_normalization_mode, - output_format.content_type.value, + pcm_format.content_type.value, ffmpeg_proc.proc.pid, ) stream_start = mass.loop.time() - chunk_size = get_chunksize(output_format, 1) + chunk_size = get_chunksize(pcm_format, 1) async for chunk in ffmpeg_proc.iter_chunked(chunk_size): if not first_chunk_received: # At this point ffmpeg has started and should now know the codec used @@ -547,17 +547,14 @@ async def get_media_stream( finally: # always ensure close is called which also handles all cleanup await ffmpeg_proc.close() - # try to determine how many seconds we've streamed - if output_format.content_type.is_pcm(): - # for pcm output we can calculate this easily - seconds_streamed = bytes_sent / output_format.pcm_sample_size if bytes_sent else 0 - streamdetails.seconds_streamed = seconds_streamed - # store accurate duration - if finished and not streamdetails.seek_position and seconds_streamed: - streamdetails.duration = int(seconds_streamed) - else: - # this is a less accurate estimate for compressed audio - seconds_streamed = bytes_sent / get_chunksize(output_format, 1) + # determine how many seconds we've streamed + # for pcm output we can calculate this easily + seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 + streamdetails.seconds_streamed = seconds_streamed + # store accurate duration + if finished and not streamdetails.seek_position and seconds_streamed: + streamdetails.duration = int(seconds_streamed) + logger.debug( "stream %s (with code %s) for %s - seconds streamed: %s", "cancelled" if cancelled else "finished" if finished else "aborted", @@ -574,7 +571,7 @@ async def get_media_stream( ): # if dynamic volume normalization is enabled and the entire track is streamed # the loudnorm filter will output the measurement in the log, - # so we can use those directly instead of analyzing the audio + # so we can use that directly instead of analyzing the audio logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...") if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)): logger.debug( @@ -1047,11 +1044,21 @@ async def get_preview_stream( if TYPE_CHECKING: # avoid circular import assert isinstance(music_prov, MusicProvider) streamdetails = await music_prov.get_stream_details(item_id, media_type) - streamdetails.extra_input_args += ["-t", "30"] # cut after 30 seconds - async for chunk in get_media_stream( - mass=mass, - streamdetails=streamdetails, + pcm_format = AudioFormat( + content_type=ContentType.from_bit_depth(streamdetails.audio_format.bit_depth), + sample_rate=streamdetails.audio_format.sample_rate, + bit_depth=streamdetails.audio_format.bit_depth, + channels=streamdetails.audio_format.channels, + ) + async for chunk in get_ffmpeg_stream( + audio_input=get_media_stream( + mass=mass, + streamdetails=streamdetails, + pcm_format=pcm_format, + ), + input_format=pcm_format, output_format=AudioFormat(content_type=ContentType.AAC), + extra_input_args=["-t", "30"], # cut after 30 seconds ): yield chunk -- 2.34.1