Fix BrokenPipe Error when streaming is aborted (#312)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 15 May 2022 15:32:00 +0000 (17:32 +0200)
committerGitHub <noreply@github.com>
Sun, 15 May 2022 15:32:00 +0000 (17:32 +0200)
* Fix BrokenPipe Error when aborting a stream
* adjust some logging

music_assistant/controllers/stream.py
music_assistant/helpers/audio.py
music_assistant/helpers/process.py

index 60f8fb86ec647a46a6d4dbf90b0a267478fa2ecf..6bd911cb22245e9e9d41555d9636e4d2b4cfc10d 100644 (file)
@@ -179,7 +179,7 @@ class StreamController:
                 # write eof when last packet is received
                 sox_proc.write_eof()
 
-            self.mass.create_task(writer)
+            sox_proc.attach_task(writer())
 
             # read bytes from final output
             async for audio_chunk in sox_proc.iterate_chunks():
@@ -408,7 +408,7 @@ class StreamController:
         self.logger.info(
             "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)",
             queue.player.name,
-            pcm_fmt,
+            pcm_fmt.value,
             sample_rate,
         )
 
@@ -424,7 +424,9 @@ class StreamController:
                 queue_index = await queue.queue_stream_next(queue_index)
             queue_track = queue.get_item(queue_index)
             if not queue_track:
-                self.logger.debug("no (more) tracks in queue %s", queue.queue_id)
+                self.logger.debug(
+                    "Abort Queue stream %s: no (more) tracks in queue", queue.queue_id
+                )
                 break
             # get streamdetails
             try:
@@ -433,21 +435,28 @@ class StreamController:
                 )
             except MediaNotFoundError as err:
                 self.logger.warning(
-                    "Skip track due to missing streamdetails", exc_info=err
+                    "Skip track %s due to missing streamdetails",
+                    queue_track.name,
+                    exc_info=err,
                 )
                 continue
 
             # check the PCM samplerate/bitrate
             if not resample and streamdetails.bit_depth > bit_depth:
                 await queue.queue_stream_signal_next()
-                self.logger.info("Abort queue stream due to bit depth mismatch")
+                self.logger.debug(
+                    "Abort queue stream %s due to bit depth mismatch", queue.player.name
+                )
                 break
             if (
                 not resample
                 and streamdetails.sample_rate > sample_rate
                 and streamdetails.sample_rate <= queue.max_sample_rate
             ):
-                self.logger.info("Abort queue stream due to sample rate mismatch")
+                self.logger.debug(
+                    "Abort queue stream %s due to sample rate mismatch",
+                    queue.player.name,
+                )
                 await queue.queue_stream_signal_next()
                 break
 
@@ -475,9 +484,9 @@ class StreamController:
                 use_crossfade = False
                 buffer_size = sample_size
 
-            self.logger.debug(
+            self.logger.info(
                 "Start Streaming queue track: %s (%s) for queue %s",
-                queue_track.item_id,
+                queue_track.uri,
                 queue_track.name,
                 queue.player.name,
             )
@@ -498,7 +507,7 @@ class StreamController:
                 # HANDLE FIRST PART OF TRACK
                 if not chunk and bytes_written == 0 and is_last_chunk:
                     # stream error: got empy first chunk
-                    self.logger.warning("Stream error on track %s", queue_track.item_id)
+                    self.logger.warning("Stream error on %s", queue_track.uri)
                     # prevent player queue get stuck by just skipping to the next track
                     queue_track.duration = 0
                     continue
@@ -602,11 +611,11 @@ class StreamController:
             queue_track.duration = accurate_duration
             self.logger.debug(
                 "Finished Streaming queue track: %s (%s) on queue %s",
-                queue_track.item_id,
+                queue_track.uri,
                 queue_track.name,
                 queue.player.name,
             )
         # end of queue reached, pass last fadeout bits to final output
         yield last_fadeout_data
         # END OF QUEUE STREAM
-        self.logger.info("Queue stream for Queue %s finished.", queue.queue_id)
+        self.logger.info("Queue stream for Queue %s finished.", queue.player.name)
index f0a9e9e4ffcb5dc564bf82a86e5ba30a2c496255..2d6d63f3761980b50cfe1803418de1bcc2cf7934 100644 (file)
@@ -539,7 +539,7 @@ async def get_sox_args_for_pcm_stream(
     if not sox_present:
         if not ffmpeg_present:
             raise AudioError(
-                "FFmpeg binary is missing from system."
+                "FFmpeg binary is missing from system. "
                 "Please install ffmpeg on your OS to enable playback.",
             )
         # collect input args
index 2e149d0f4b05ece7db2d9b8c776e998637399a86..d0900e8d01acf2a3e214b65b56f8fe87be741445 100644 (file)
@@ -8,7 +8,7 @@ from __future__ import annotations
 
 import asyncio
 import logging
-from typing import AsyncGenerator, List, Optional, Tuple, Union
+from typing import AsyncGenerator, Coroutine, List, Optional, Tuple, Union
 
 from async_timeout import timeout as _timeout
 
@@ -26,6 +26,7 @@ class AsyncProcess:
         self._proc = None
         self._args = args
         self._enable_write = enable_write
+        self._attached_task: asyncio.Task = None
         self.closed = False
 
     async def __aenter__(self) -> "AsyncProcess":
@@ -56,15 +57,27 @@ class AsyncProcess:
     async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
         """Exit context manager."""
         self.closed = True
+        if self._attached_task:
+            # cancel the attached reader/writer task
+            self._attached_task.cancel()
         if self._proc.returncode is None:
             # prevent subprocess deadlocking, send terminate and read remaining bytes
-            if self._enable_write:
-                self._proc.stdin.close()
             try:
                 self._proc.terminate()
+                # close stdin and let it drain
+                if self._enable_write:
+                    await self._proc.stdin.drain()
+                    self._proc.stdin.close()
+                # read remaining bytes
                 await self._proc.stdout.read()
+                # we really want to make this thing die ;-)
                 self._proc.kill()
-            except (ProcessLookupError, BrokenPipeError, RuntimeError):
+            except (
+                ProcessLookupError,
+                BrokenPipeError,
+                RuntimeError,
+                ConnectionResetError,
+            ):
                 pass
         del self._proc
 
@@ -98,6 +111,8 @@ class AsyncProcess:
 
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
+        if self.closed:
+            return
         try:
             self._proc.stdin.write(data)
             await self._proc.stdin.drain()
@@ -118,6 +133,10 @@ class AsyncProcess:
         """Write bytes to process and read back results."""
         return await self._proc.communicate(input_data)
 
+    def attach_task(self, coro: Coroutine) -> None:
+        """Attach given coro func as reader/writer task to properly cancel it when needed."""
+        self._attached_task = asyncio.create_task(coro)
+
 
 async def check_output(shell_cmd: str) -> Tuple[int, bytes]:
     """Run shell subprocess and return output."""