From: Marcel van der Veldt Date: Tue, 14 Jun 2022 12:35:24 +0000 (+0200) Subject: Fix stream issues (#364) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=92ecd064244a7a8f9f027f15993299508bd3a591;p=music-assistant-server.git Fix stream issues (#364) * fix stream issues * try to survive radio reconnects * no need for redundant logging * only allow 30 seconds buffer ahead --- diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index dbefc0c5..817246f8 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import gc import urllib.parse +from time import time from types import CoroutineType from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional from uuid import uuid4 @@ -16,7 +17,6 @@ from music_assistant.helpers.audio import ( crossfade_pcm_parts, fadein_pcm_part, get_chunksize, - get_ffmpeg_args_for_pcm_stream, get_media_stream, get_preview_stream, get_stream_details, @@ -265,11 +265,13 @@ class QueueStream: self.logger = self.queue.logger.getChild("stream") self.expected_clients = expected_clients self.connected_clients: Dict[str, CoroutineType[bytes]] = {} - self._runner_task: Optional[asyncio.Task] = None + self.seconds_streamed = 0 + self.streaming_started = 0 self.done = asyncio.Event() self.all_clients_connected = asyncio.Event() self.index_in_buffer = start_index self.signal_next: bool = False + self._runner_task: Optional[asyncio.Task] = None if autostart: self.mass.create_task(self.start()) @@ -313,22 +315,55 @@ class QueueStream: async def _queue_stream_runner(self) -> None: """Distribute audio chunks over connected client queues.""" - ffmpeg_args = await get_ffmpeg_args_for_pcm_stream( - self.pcm_sample_rate, - self.pcm_bit_depth, - self.pcm_channels, - output_format=self.output_format, + # collect ffmpeg args + input_format = ContentType.from_bit_depth( + self.pcm_bit_depth, self.pcm_floating_point ) + ffmpeg_args = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-ignore_unknown", + # pcm input args + "-f", + input_format.value, + "-ac", + str(self.pcm_channels), + "-ar", + str(self.pcm_sample_rate), + "-i", + "-", + # output args + "-f", + self.output_format.value, + "-compression_level", + "0", + "-", + ] # get the raw pcm bytes from the queue stream and on the fly encode to wanted format # send the compressed/encoded stream to the client(s). chunk_size = get_chunksize(self.output_format) + sample_size = int( + self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels + ) async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc: async def writer(): """Task that sends the raw pcm audio to the ffmpeg process.""" async for audio_chunk in self._get_queue_stream(): await ffmpeg_proc.write(audio_chunk) + self.seconds_streamed += len(audio_chunk) / sample_size del audio_chunk + # allow clients to only buffer max ~30 seconds ahead + seconds_allowed = int(time() - self.streaming_started) + 30 + diff = self.seconds_streamed - seconds_allowed + if diff > 1: + self.logger.debug( + "Player is buffering %s seconds ahead, slowing it down", + diff, + ) + await asyncio.sleep(10) # write eof when last packet is received ffmpeg_proc.write_eof() @@ -336,14 +371,15 @@ class QueueStream: # wait max 5 seconds for all client(s) to connect try: - await asyncio.wait_for(self.all_clients_connected.wait(), 5) + await asyncio.wait_for(self.all_clients_connected.wait(), 10) except asyncio.exceptions.TimeoutError: self.logger.warning( - "Abort: client(s) did not connect within 5 seconds." + "Abort: client(s) did not connect within 10 seconds." ) self.done.set() return self.logger.debug("%s clients connected", len(self.connected_clients)) + self.streaming_started = time() # Read bytes from final output and send chunk to child callback. async for chunk in ffmpeg_proc.iterate_chunks(): @@ -492,21 +528,26 @@ class QueueStream: prev_chunk = None bytes_written = 0 # handle incoming audio chunks - async for chunk in get_media_stream( + async for is_last_chunk, chunk in get_media_stream( self.mass, streamdetails, - pcm_fmt, - pcm_sample_rate=self.pcm_sample_rate, + pcm_fmt=pcm_fmt, + sample_rate=self.pcm_sample_rate, + channels=self.pcm_channels, chunk_size=buffer_size, seek_position=seek_position, ): cur_chunk += 1 - is_last_chunk = len(chunk) < buffer_size # HANDLE FIRST PART OF TRACK if len(chunk) == 0 and bytes_written == 0 and is_last_chunk: # stream error: got empy first chunk ?! self.logger.warning("Stream error on %s", queue_track.uri) + elif cur_chunk == 1 and is_last_chunk: + # audio only has one single chunk (alert?) + bytes_written += len(chunk) + yield chunk + del chunk elif cur_chunk == 1 and last_fadeout_data: prev_chunk = chunk del chunk @@ -563,7 +604,7 @@ class QueueStream: # with the previous chunk and this chunk # and strip off silence last_part = await strip_silence( - prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, True + prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, reverse=True ) if len(last_part) < buffer_size: # part is too short after the strip action diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index cb59054d..c449dda5 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -37,28 +37,48 @@ async def crossfade_pcm_parts( fade_length: int, fmt: ContentType, sample_rate: int, + channels: int = 2, ) -> bytes: """Crossfade two chunks of pcm/raw audio using ffmpeg.""" fadeoutfile = create_tempfile() async with aiofiles.open(fadeoutfile.name, "wb") as outfile: await outfile.write(fade_out_part) - # input args - args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] - args += [ + args = [ + # generic args + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + # fadeout part (as file) + "-acodec", + fmt.name.lower(), "-f", fmt.value, "-ac", - "2", + str(channels), "-ar", str(sample_rate), "-i", fadeoutfile.name, + # fade_in part (stdin) + "-acodec", + fmt.name.lower(), + "-f", + fmt.value, + "-ac", + str(channels), + "-ar", + str(sample_rate), + "-i", + "-", + # filter args + "-filter_complex", + f"[0][1]acrossfade=d={fade_length}", + # output args + "-f", + fmt.value, + "-", ] - args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"] - # filter args - args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"] - # output args - args += ["-f", fmt.value, "-"] async with AsyncProcess(args, True) as proc: crossfade_data, _ = await proc.communicate(fade_in_part) LOGGER.debug( @@ -75,36 +95,61 @@ async def fadein_pcm_part( fade_length: int, fmt: ContentType, sample_rate: int, + channels: int = 2, ) -> bytes: """Fadein chunk of pcm/raw audio using ffmpeg.""" - # input args - args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] - args += [ + args = [ + # generic args + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + # fade_in part (stdin) + "-acodec", + fmt.name.lower(), "-f", fmt.value, "-ac", - "2", + str(channels), "-ar", str(sample_rate), "-i", "-", + # filter args + "-af", + f"afade=type=in:start_time=0:duration={fade_length}", + # output args + "-f", + fmt.value, + "-", ] - # filter args - args += ["-af", f"afade=type=in:start_time=0:duration={fade_length}"] - # output args - args += ["-f", fmt.value, "-"] async with AsyncProcess(args, True) as proc: result_audio, _ = await proc.communicate(pcm_audio) return result_audio async def strip_silence( - audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False + audio_data: bytes, + fmt: ContentType, + sample_rate: int, + channels: int = 2, + reverse=False, ) -> bytes: """Strip silence from (a chunk of) pcm audio.""" # input args args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] - args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"] + args += [ + "-acodec", + fmt.name.lower(), + "-f", + fmt.value, + "-ac", + str(channels), + "-ar", + str(sample_rate), + "-i", + "-", + ] # filter args if reverse: args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"] @@ -134,10 +179,10 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N started = time() proc_args = [ "ffmpeg", - "-f", - streamdetails.content_type.value, "-i", "-", + "-f", + streamdetails.content_type.value, "-af", "ebur128=framelog=verbose", "-f", @@ -331,83 +376,20 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration= return file.getvalue() -async def get_ffmpeg_args( - streamdetails: StreamDetails, - output_format: Optional[ContentType] = None, - pcm_sample_rate: Optional[int] = None, - pcm_channels: int = 2, -) -> List[str]: - """Collect all args to send to the ffmpeg process.""" - input_format = streamdetails.content_type - if output_format is None: - output_format = input_format - - ffmpeg_present, libsoxr_support = await check_audio_support() - - if not ffmpeg_present: - raise AudioError( - "FFmpeg binary is missing from system." - "Please install ffmpeg on your OS to enable playback.", - ) - # collect input args - input_args = [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "error", - "-ignore_unknown", - ] - if streamdetails.content_type != ContentType.UNKNOWN: - input_args += ["-f", input_format.value] - input_args += ["-i", "-"] - # collect output args - if output_format.is_pcm(): - output_args = [ - "-acodec", - output_format.name.lower(), - "-f", - output_format.value, - "-ac", - str(pcm_channels), - "-ar", - str(pcm_sample_rate), - "-", - ] - else: - output_args = ["-f", output_format.value, "-"] - # collect extra and filter args - extra_args = [] - filter_params = [] - if streamdetails.gain_correct: - filter_params.append(f"volume={streamdetails.gain_correct}dB") - if ( - pcm_sample_rate is not None - and streamdetails.sample_rate != pcm_sample_rate - and libsoxr_support - and streamdetails.media_type == MediaType.TRACK - ): - # prefer libsoxr high quality resampler (if present) for sample rate conversions - filter_params.append("aresample=resampler=soxr") - if filter_params: - extra_args += ["-af", ",".join(filter_params)] - - if pcm_sample_rate is not None and not output_format.is_pcm(): - extra_args += ["-ar", str(pcm_sample_rate)] - - return input_args + extra_args + output_args - - async def get_media_stream( mass: MusicAssistant, streamdetails: StreamDetails, - output_format: Optional[ContentType] = None, - pcm_sample_rate: Optional[int] = None, + pcm_fmt: ContentType, + sample_rate: int, + channels: int = 2, chunk_size: Optional[int] = None, seek_position: int = 0, -) -> AsyncGenerator[bytes, None]: - """Get the audio stream for the given streamdetails.""" - - args = await get_ffmpeg_args(streamdetails, output_format, pcm_sample_rate) +) -> AsyncGenerator[Tuple[bool, bytes], None]: + """Get the PCM audio stream for the given streamdetails.""" + assert pcm_fmt.is_pcm(), "Output format must be a PCM type" + args = await _get_ffmpeg_args( + streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels + ) async with AsyncProcess( args, enable_write=True, chunk_size=chunk_size ) as ffmpeg_proc: @@ -431,9 +413,15 @@ async def get_media_stream( ffmpeg_proc.attach_task(writer()) # yield chunks from stdout + # we keep 1 chunk behind to detect end of stream properly try: + prev_chunk = b"" async for chunk in ffmpeg_proc.iterate_chunks(): - yield chunk + if prev_chunk: + yield (False, prev_chunk) + prev_chunk = chunk + # send last chunk + yield (True, prev_chunk) except (asyncio.CancelledError, GeneratorExit) as err: LOGGER.debug("media stream aborted for: %s", streamdetails.uri) raise err @@ -458,35 +446,38 @@ async def get_radio_stream( headers = {"Icy-MetaData": "1"} while True: # in loop to reconnect on connection failure - LOGGER.debug("radio stream (re)connecting to: %s", url) - async with mass.http_session.get(url, headers=headers) as resp: - headers = resp.headers - meta_int = int(headers.get("icy-metaint", "0")) - # stream with ICY Metadata - if meta_int: - while True: - audio_chunk = await resp.content.readexactly(meta_int) - yield audio_chunk - meta_byte = await resp.content.readexactly(1) - meta_length = ord(meta_byte) * 16 - meta_data = await resp.content.readexactly(meta_length) - if not meta_data: - continue - meta_data = meta_data.rstrip(b"\0") - stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data) - if not stream_title: - continue - stream_title = stream_title.group(1).decode() - if stream_title != streamdetails.stream_title: - streamdetails.stream_title = stream_title - if queue := mass.players.get_player_queue( - streamdetails.queue_id - ): - queue.signal_update() - # Regular HTTP stream - else: - async for chunk in resp.content.iter_any(): - yield chunk + try: + LOGGER.debug("radio stream (re)connecting to: %s", url) + async with mass.http_session.get(url, headers=headers, timeout=60) as resp: + headers = resp.headers + meta_int = int(headers.get("icy-metaint", "0")) + # stream with ICY Metadata + if meta_int: + while True: + audio_chunk = await resp.content.readexactly(meta_int) + yield audio_chunk + meta_byte = await resp.content.readexactly(1) + meta_length = ord(meta_byte) * 16 + meta_data = await resp.content.readexactly(meta_length) + if not meta_data: + continue + meta_data = meta_data.rstrip(b"\0") + stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data) + if not stream_title: + continue + stream_title = stream_title.group(1).decode() + if stream_title != streamdetails.stream_title: + streamdetails.stream_title = stream_title + if queue := mass.players.get_player_queue( + streamdetails.queue_id + ): + queue.signal_update() + # Regular HTTP stream + else: + async for chunk in resp.content.iter_any(): + yield chunk + except asyncio.exceptions.TimeoutError: + pass async def get_http_stream( @@ -498,7 +489,6 @@ async def get_http_stream( """Get audio stream from HTTP.""" if seek_position: assert streamdetails.duration, "Duration required for seek requests" - chunk_size = get_chunksize(streamdetails.content_type) # try to get filesize with a head request if seek_position and not streamdetails.size: async with mass.http_session.head(url) as resp: @@ -518,7 +508,7 @@ async def get_http_stream( async with mass.http_session.get(url, headers=headers) as resp: is_partial = resp.status == 206 buffer_all = seek_position and not is_partial - async for chunk in resp.content.iter_chunked(chunk_size): + async for chunk in resp.content.iter_any(): bytes_received += len(chunk) if buffer_all and not skip_bytes: buffer += chunk @@ -588,32 +578,6 @@ async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool]: return result -async def get_ffmpeg_args_for_pcm_stream( - sample_rate: int, - bit_depth: int, - channels: int, - floating_point: bool = False, - output_format: ContentType = ContentType.FLAC, -) -> List[str]: - """Collect args for ffmpeg when converting from raw pcm to another contenttype.""" - input_format = ContentType.from_bit_depth(bit_depth, floating_point) - # collect input args - input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-ignore_unknown"] - input_args += [ - "-f", - input_format.value, - "-ac", - str(channels), - "-ar", - str(sample_rate), - "-i", - "-", - ] - # collect output args - output_args = ["-f", output_format.value, "-"] - return input_args + output_args - - async def get_preview_stream( mass: MusicAssistant, provider_id: str, @@ -668,3 +632,61 @@ def get_chunksize(content_type: ContentType) -> int: ): return 64000 return 256000 + + +async def _get_ffmpeg_args( + streamdetails: StreamDetails, + pcm_output_format: ContentType, + pcm_sample_rate: int, + pcm_channels: int = 2, +) -> List[str]: + """Collect all args to send to the ffmpeg process.""" + input_format = streamdetails.content_type + assert pcm_output_format.is_pcm(), "Output format needs to be PCM" + + ffmpeg_present, libsoxr_support = await check_audio_support() + + if not ffmpeg_present: + raise AudioError( + "FFmpeg binary is missing from system." + "Please install ffmpeg on your OS to enable playback.", + ) + # collect input args + input_args = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-ignore_unknown", + ] + if streamdetails.content_type != ContentType.UNKNOWN: + input_args += ["-f", input_format.value] + input_args += ["-i", "-"] + # collect output args + output_args = [ + "-acodec", + pcm_output_format.name.lower(), + "-f", + pcm_output_format.value, + "-ac", + str(pcm_channels), + "-ar", + str(pcm_sample_rate), + "-", + ] + # collect extra and filter args + extra_args = [] + filter_params = [] + if streamdetails.gain_correct: + filter_params.append(f"volume={streamdetails.gain_correct}dB") + if ( + streamdetails.sample_rate != pcm_sample_rate + and libsoxr_support + and streamdetails.media_type == MediaType.TRACK + ): + # prefer libsoxr high quality resampler (if present) for sample rate conversions + filter_params.append("aresample=resampler=soxr") + if filter_params: + extra_args += ["-af", ",".join(filter_params)] + + return input_args + extra_args + output_args diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 1eba0a1b..65c9575c 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -262,14 +262,14 @@ class PlayerQueue: await self.append(queue_items) async def play_alert( - self, uri: str, announce: bool = False, volume_adjust: int = 10 + self, uri: str, announce: bool = False, gain_correct: int = 6 ) -> str: """ Play given uri as Alert on the queue. uri: Uri that should be played as announcement, can be Music Assistant URI or plain url. announce: Prepend the (TTS) alert with a small announce sound. - volume_adjust: Adjust the volume of the player by this percentage (relative). + gain_correct: Adjust the gain of the alert sound (in dB). """ if self._snapshot: self.logger.debug("Ignore play_alert: already in progress") @@ -295,37 +295,28 @@ class PlayerQueue: if uri.startswith("http") or os.path.isfile(uri): # a plain url was provided queue_item = QueueItem.from_url(uri, "alert") - queue_item.streamdetails.gain_correct = 6 + queue_item.streamdetails.gain_correct = gain_correct queue_items.append(queue_item) else: raise MediaNotFoundError(f"Invalid uri: {uri}") from err - # append silence track, we use this to reliably detect when the alert is ready - silence_url = self.mass.streams.get_silence_url(600) - queue_item = QueueItem.from_url(silence_url, "alert") - queue_items.append(queue_item) - # load queue with alert sound(s) await self.load(queue_items) # wait for the alert to finish playing + await self.stream.done.wait() alert_done = asyncio.Event() def handle_event(evt: MassEvent): - if ( - self.current_item - and self.current_item.uri == silence_url - and self.elapsed_time - ): + if self.player.state != PlayerState.PLAYING: alert_done.set() unsub = self.mass.subscribe( - handle_event, EventType.QUEUE_TIME_UPDATED, self.queue_id + handle_event, EventType.QUEUE_UPDATED, self.queue_id ) try: - await asyncio.wait_for(alert_done.wait(), 120) + await asyncio.wait_for(alert_done.wait(), 30) finally: - unsub() # restore queue await self.snapshot_restore() @@ -635,7 +626,7 @@ class PlayerQueue: async def queue_stream_start( self, start_index: int, seek_position: int, fade_in: bool, passive: bool = False - ) -> None: + ) -> QueueStream: """Start the queue stream runner.""" output_format = self._settings.stream_type if self.player.use_multi_stream: @@ -660,6 +651,7 @@ class PlayerQueue: # execute the play command on the player(s) if not passive: await self.player.play_url(stream.url) + return stream def get_next_index(self, cur_index: Optional[int]) -> int: """Return the next index for the queue, accounting for repeat settings."""