From: Marcel van der Veldt Date: Tue, 26 Mar 2024 12:26:45 +0000 (+0100) Subject: small enhancements X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=1bbbc2eaae76d7be1fc19344e5c1e97de06c895a;p=music-assistant-server.git small enhancements --- diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 09ffa277..79e19a8f 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -37,11 +37,11 @@ from music_assistant.constants import ( CONF_CROSSFADE_DURATION, CONF_OUTPUT_CHANNELS, CONF_PUBLISH_IP, - ROOT_LOGGER_NAME, SILENCE_FILE, UGP_PREFIX, VERBOSE_LOG_LEVEL, ) +from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( check_audio_support, crossfade_pcm_parts, @@ -362,7 +362,7 @@ class StreamsController(CoreController): "with libsoxr support" if libsoxr_support else "", ) # copy log level to audio module - logging.getLogger(f"{ROOT_LOGGER_NAME}.audio").setLevel(self.logger.level) + AUDIO_LOGGER.setLevel(self.logger.level) # start the webserver self.publish_port = config.get_value(CONF_BIND_PORT) self.publish_ip = config.get_value(CONF_PUBLISH_IP) @@ -969,10 +969,11 @@ class StreamsController(CoreController): is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration if is_radio or streamdetails.seek_position: strip_silence_begin = False - # chunk size = 1 second of pcm audio - pcm_sample_size = pcm_format.pcm_sample_size - chunk_size = pcm_sample_size # chunk size = sample size (= 1 second) - expected_chunks = int(((streamdetails.duration or 0) * pcm_sample_size) / chunk_size) + # pcm_sample_size = chunk size = 1 second of pcm audio + chunk_size = pcm_format.pcm_sample_size + expected_chunks = int( + ((streamdetails.duration or 0) * pcm_format.pcm_sample_size) / chunk_size + ) if expected_chunks < 10: strip_silence_end = False @@ -1041,7 +1042,7 @@ class StreamsController(CoreController): await state_data["finished"].wait() finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set() bytes_sent = state_data["bytes_sent"] - seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0 + seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 streamdetails.seconds_streamed = seconds_streamed state_str = "finished" if finished else "aborted" logger.debug( @@ -1123,10 +1124,10 @@ class StreamsController(CoreController): # collect this chunk for next round prev_chunk = chunk + # if we did not receive any data, something went (terribly) wrong # raise here to prevent an (endless) loop elsewhere if state_data["bytes_sent"] == 0: - del prev_chunk raise AudioError(f"stream error on {streamdetails.uri}") # all chunks received, strip silence of last part if needed and yield remaining bytes diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 533cb9e7..6ed4539a 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -107,7 +107,7 @@ class AsyncProcess: async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" - while self.returncode is None: + while not self._close_called: chunk = await self.readexactly(n) if chunk == b"": break @@ -115,7 +115,7 @@ class AsyncProcess: async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks as they come in from process stdout.""" - while self.returncode is None: + while not self._close_called: chunk = await self.read(n) if chunk == b"": break @@ -123,8 +123,6 @@ class AsyncProcess: async def readexactly(self, n: int) -> bytes: """Read exactly n bytes from the process stdout (or less if eof).""" - if not self.proc.stdout or self.proc.stdout.at_eof(): - return b"" try: return await self.proc.stdout.readexactly(n) except asyncio.IncompleteReadError as err: @@ -137,21 +135,19 @@ class AsyncProcess: and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object. """ - if not self.proc.stdout or self.proc.stdout.at_eof(): - return b"" return await self.proc.stdout.read(n) async def write(self, data: bytes) -> None: """Write data to process stdin.""" - if self.returncode is not None or self.proc.stdin.is_closing(): - raise asyncio.CancelledError("write called while process already done") + if self._close_called: + raise RuntimeError("write called while process already done") self.proc.stdin.write(data) with suppress(BrokenPipeError, ConnectionResetError): await self.proc.stdin.drain() async def write_eof(self) -> None: """Write end of file to to process stdin.""" - if self.returncode is not None or self.proc.stdin.is_closing(): + if self._close_called: return try: if self.proc.stdin.can_write_eof(): @@ -183,9 +179,9 @@ class AsyncProcess: # make sure the process is really cleaned up. # especially with pipes this can cause deadlocks if not properly guarded # we need to ensure stdout and stderr are flushed and stdin closed - while self.returncode is None: + while True: try: - async with asyncio.timeout(30): + async with asyncio.timeout(5): # abort existing readers on stderr/stdout first before we send communicate if self.proc.stdout and self.proc.stdout._waiter is not None: self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) @@ -195,6 +191,8 @@ class AsyncProcess: self.proc.stderr._waiter = None # use communicate to flush all pipe buffers await self.proc.communicate() + if self.returncode is not None: + break except TimeoutError: LOGGER.debug( "Process %s with PID %s did not stop in time. Sending terminate...", diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index b907a326..3655bba2 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -318,8 +318,6 @@ class AirplayStream: async def write_eof(self) -> None: """Write EOF to the ffmpeg stdin.""" - if not self.running: - return await self._buffer.put(b"") async def send_cli_command(self, command: str) -> None: @@ -634,7 +632,7 @@ class AirplayProvider(PlayerProvider): input_format = AIRPLAY_PCM_FORMAT audio_source = self.mass.streams.get_announcement_stream( queue_item.streamdetails.data["url"], - pcm_format=AIRPLAY_PCM_FORMAT, + output_format=AIRPLAY_PCM_FORMAT, use_pre_announce=queue_item.streamdetails.data["use_pre_announce"], ) else: