From 698954291aa6e0d67d0d3de5e37bb2046d2f07ad Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sun, 15 May 2022 17:32:00 +0200 Subject: [PATCH] Fix BrokenPipe Error when streaming is aborted (#312) * Fix BrokenPipe Error when aborting a stream * adjust some logging --- music_assistant/controllers/stream.py | 31 +++++++++++++++++---------- music_assistant/helpers/audio.py | 2 +- music_assistant/helpers/process.py | 27 +++++++++++++++++++---- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index 60f8fb86..6bd911cb 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -179,7 +179,7 @@ class StreamController: # write eof when last packet is received sox_proc.write_eof() - self.mass.create_task(writer) + sox_proc.attach_task(writer()) # read bytes from final output async for audio_chunk in sox_proc.iterate_chunks(): @@ -408,7 +408,7 @@ class StreamController: self.logger.info( "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)", queue.player.name, - pcm_fmt, + pcm_fmt.value, sample_rate, ) @@ -424,7 +424,9 @@ class StreamController: queue_index = await queue.queue_stream_next(queue_index) queue_track = queue.get_item(queue_index) if not queue_track: - self.logger.debug("no (more) tracks in queue %s", queue.queue_id) + self.logger.debug( + "Abort Queue stream %s: no (more) tracks in queue", queue.queue_id + ) break # get streamdetails try: @@ -433,21 +435,28 @@ class StreamController: ) except MediaNotFoundError as err: self.logger.warning( - "Skip track due to missing streamdetails", exc_info=err + "Skip track %s due to missing streamdetails", + queue_track.name, + exc_info=err, ) continue # check the PCM samplerate/bitrate if not resample and streamdetails.bit_depth > bit_depth: await queue.queue_stream_signal_next() - self.logger.info("Abort queue stream due to bit depth mismatch") + self.logger.debug( + "Abort queue stream %s due to bit depth mismatch", queue.player.name + ) break if ( not resample and streamdetails.sample_rate > sample_rate and streamdetails.sample_rate <= queue.max_sample_rate ): - self.logger.info("Abort queue stream due to sample rate mismatch") + self.logger.debug( + "Abort queue stream %s due to sample rate mismatch", + queue.player.name, + ) await queue.queue_stream_signal_next() break @@ -475,9 +484,9 @@ class StreamController: use_crossfade = False buffer_size = sample_size - self.logger.debug( + self.logger.info( "Start Streaming queue track: %s (%s) for queue %s", - queue_track.item_id, + queue_track.uri, queue_track.name, queue.player.name, ) @@ -498,7 +507,7 @@ class StreamController: # HANDLE FIRST PART OF TRACK if not chunk and bytes_written == 0 and is_last_chunk: # stream error: got empy first chunk - self.logger.warning("Stream error on track %s", queue_track.item_id) + self.logger.warning("Stream error on %s", queue_track.uri) # prevent player queue get stuck by just skipping to the next track queue_track.duration = 0 continue @@ -602,11 +611,11 @@ class StreamController: queue_track.duration = accurate_duration self.logger.debug( "Finished Streaming queue track: %s (%s) on queue %s", - queue_track.item_id, + queue_track.uri, queue_track.name, queue.player.name, ) # end of queue reached, pass last fadeout bits to final output yield last_fadeout_data # END OF QUEUE STREAM - self.logger.info("Queue stream for Queue %s finished.", queue.queue_id) + self.logger.info("Queue stream for Queue %s finished.", queue.player.name) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index f0a9e9e4..2d6d63f3 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -539,7 +539,7 @@ async def get_sox_args_for_pcm_stream( if not sox_present: if not ffmpeg_present: raise AudioError( - "FFmpeg binary is missing from system." + "FFmpeg binary is missing from system. " "Please install ffmpeg on your OS to enable playback.", ) # collect input args diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 2e149d0f..d0900e8d 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -8,7 +8,7 @@ from __future__ import annotations import asyncio import logging -from typing import AsyncGenerator, List, Optional, Tuple, Union +from typing import AsyncGenerator, Coroutine, List, Optional, Tuple, Union from async_timeout import timeout as _timeout @@ -26,6 +26,7 @@ class AsyncProcess: self._proc = None self._args = args self._enable_write = enable_write + self._attached_task: asyncio.Task = None self.closed = False async def __aenter__(self) -> "AsyncProcess": @@ -56,15 +57,27 @@ class AsyncProcess: async def __aexit__(self, exc_type, exc_value, traceback) -> bool: """Exit context manager.""" self.closed = True + if self._attached_task: + # cancel the attached reader/writer task + self._attached_task.cancel() if self._proc.returncode is None: # prevent subprocess deadlocking, send terminate and read remaining bytes - if self._enable_write: - self._proc.stdin.close() try: self._proc.terminate() + # close stdin and let it drain + if self._enable_write: + await self._proc.stdin.drain() + self._proc.stdin.close() + # read remaining bytes await self._proc.stdout.read() + # we really want to make this thing die ;-) self._proc.kill() - except (ProcessLookupError, BrokenPipeError, RuntimeError): + except ( + ProcessLookupError, + BrokenPipeError, + RuntimeError, + ConnectionResetError, + ): pass del self._proc @@ -98,6 +111,8 @@ class AsyncProcess: async def write(self, data: bytes) -> None: """Write data to process stdin.""" + if self.closed: + return try: self._proc.stdin.write(data) await self._proc.stdin.drain() @@ -118,6 +133,10 @@ class AsyncProcess: """Write bytes to process and read back results.""" return await self._proc.communicate(input_data) + def attach_task(self, coro: Coroutine) -> None: + """Attach given coro func as reader/writer task to properly cancel it when needed.""" + self._attached_task = asyncio.create_task(coro) + async def check_output(shell_cmd: str) -> Tuple[int, bytes]: """Run shell subprocess and return output.""" -- 2.34.1