small enhancements
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 26 Mar 2024 12:26:45 +0000 (13:26 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 26 Mar 2024 12:26:45 +0000 (13:26 +0100)
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/process.py
music_assistant/server/providers/airplay/__init__.py

index 09ffa2774be07f12000015aff0fcdd766605e127..79e19a8f5b0fa04b50dd3433533192d115f35ec0 100644 (file)
@@ -37,11 +37,11 @@ from music_assistant.constants import (
     CONF_CROSSFADE_DURATION,
     CONF_OUTPUT_CHANNELS,
     CONF_PUBLISH_IP,
-    ROOT_LOGGER_NAME,
     SILENCE_FILE,
     UGP_PREFIX,
     VERBOSE_LOG_LEVEL,
 )
+from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
@@ -362,7 +362,7 @@ class StreamsController(CoreController):
             "with libsoxr support" if libsoxr_support else "",
         )
         # copy log level to audio module
-        logging.getLogger(f"{ROOT_LOGGER_NAME}.audio").setLevel(self.logger.level)
+        AUDIO_LOGGER.setLevel(self.logger.level)
         # start the webserver
         self.publish_port = config.get_value(CONF_BIND_PORT)
         self.publish_ip = config.get_value(CONF_PUBLISH_IP)
@@ -969,10 +969,11 @@ class StreamsController(CoreController):
         is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
         if is_radio or streamdetails.seek_position:
             strip_silence_begin = False
-        # chunk size = 1 second of pcm audio
-        pcm_sample_size = pcm_format.pcm_sample_size
-        chunk_size = pcm_sample_size  # chunk size = sample size  (= 1 second)
-        expected_chunks = int(((streamdetails.duration or 0) * pcm_sample_size) / chunk_size)
+        # pcm_sample_size = chunk size = 1 second of pcm audio
+        chunk_size = pcm_format.pcm_sample_size
+        expected_chunks = int(
+            ((streamdetails.duration or 0) * pcm_format.pcm_sample_size) / chunk_size
+        )
         if expected_chunks < 10:
             strip_silence_end = False
 
@@ -1041,7 +1042,7 @@ class StreamsController(CoreController):
                 await state_data["finished"].wait()
             finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set()
             bytes_sent = state_data["bytes_sent"]
-            seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
+            seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
             streamdetails.seconds_streamed = seconds_streamed
             state_str = "finished" if finished else "aborted"
             logger.debug(
@@ -1123,10 +1124,10 @@ class StreamsController(CoreController):
 
                 # collect this chunk for next round
                 prev_chunk = chunk
+
             # if we did not receive any data, something went (terribly) wrong
             # raise here to prevent an (endless) loop elsewhere
             if state_data["bytes_sent"] == 0:
-                del prev_chunk
                 raise AudioError(f"stream error on {streamdetails.uri}")
 
             # all chunks received, strip silence of last part if needed and yield remaining bytes
index 533cb9e74055495a5ff65fc85c83dfd724f75e74..6ed4539ae4685454742542885bc55250557652fb 100644 (file)
@@ -107,7 +107,7 @@ class AsyncProcess:
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
-        while self.returncode is None:
+        while not self._close_called:
             chunk = await self.readexactly(n)
             if chunk == b"":
                 break
@@ -115,7 +115,7 @@ class AsyncProcess:
 
     async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks as they come in from process stdout."""
-        while self.returncode is None:
+        while not self._close_called:
             chunk = await self.read(n)
             if chunk == b"":
                 break
@@ -123,8 +123,6 @@ class AsyncProcess:
 
     async def readexactly(self, n: int) -> bytes:
         """Read exactly n bytes from the process stdout (or less if eof)."""
-        if not self.proc.stdout or self.proc.stdout.at_eof():
-            return b""
         try:
             return await self.proc.stdout.readexactly(n)
         except asyncio.IncompleteReadError as err:
@@ -137,21 +135,19 @@ class AsyncProcess:
         and may return less or equal bytes than requested, but at least one byte.
         If EOF was received before any byte is read, this function returns empty byte object.
         """
-        if not self.proc.stdout or self.proc.stdout.at_eof():
-            return b""
         return await self.proc.stdout.read(n)
 
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
-        if self.returncode is not None or self.proc.stdin.is_closing():
-            raise asyncio.CancelledError("write called while process already done")
+        if self._close_called:
+            raise RuntimeError("write called while process already done")
         self.proc.stdin.write(data)
         with suppress(BrokenPipeError, ConnectionResetError):
             await self.proc.stdin.drain()
 
     async def write_eof(self) -> None:
         """Write end of file to to process stdin."""
-        if self.returncode is not None or self.proc.stdin.is_closing():
+        if self._close_called:
             return
         try:
             if self.proc.stdin.can_write_eof():
@@ -183,9 +179,9 @@ class AsyncProcess:
         # make sure the process is really cleaned up.
         # especially with pipes this can cause deadlocks if not properly guarded
         # we need to ensure stdout and stderr are flushed and stdin closed
-        while self.returncode is None:
+        while True:
             try:
-                async with asyncio.timeout(30):
+                async with asyncio.timeout(5):
                     # abort existing readers on stderr/stdout first before we send communicate
                     if self.proc.stdout and self.proc.stdout._waiter is not None:
                         self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
@@ -195,6 +191,8 @@ class AsyncProcess:
                         self.proc.stderr._waiter = None
                     # use communicate to flush all pipe buffers
                     await self.proc.communicate()
+                    if self.returncode is not None:
+                        break
             except TimeoutError:
                 LOGGER.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
index b907a326419af3dcecefc04ddece95ccd69d8631..3655bba22d67e299009e01d65ec4d8d75ef99d95 100644 (file)
@@ -318,8 +318,6 @@ class AirplayStream:
 
     async def write_eof(self) -> None:
         """Write EOF to the ffmpeg stdin."""
-        if not self.running:
-            return
         await self._buffer.put(b"")
 
     async def send_cli_command(self, command: str) -> None:
@@ -634,7 +632,7 @@ class AirplayProvider(PlayerProvider):
             input_format = AIRPLAY_PCM_FORMAT
             audio_source = self.mass.streams.get_announcement_stream(
                 queue_item.streamdetails.data["url"],
-                pcm_format=AIRPLAY_PCM_FORMAT,
+                output_format=AIRPLAY_PCM_FORMAT,
                 use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
             )
         else: