From fa37fa8d16e048d3e7953a5fcf667beb9e9cb7eb Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 25 Mar 2024 22:57:52 +0100 Subject: [PATCH] Performance and stability fixes (#1180) --- .devcontainer/devcontainer.json | 8 +- .vscode/launch.json | 4 +- Dockerfile | 9 +- music_assistant/server/controllers/streams.py | 49 ++++---- music_assistant/server/helpers/audio.py | 74 ++++++------ music_assistant/server/helpers/process.py | 74 +++++++----- .../server/models/core_controller.py | 4 +- .../server/providers/airplay/__init__.py | 106 ++++++++---------- .../server/providers/snapcast/__init__.py | 2 +- .../server/providers/spotify/__init__.py | 16 +-- .../server/providers/url/__init__.py | 4 +- music_assistant/server/server.py | 8 +- 12 files changed, 180 insertions(+), 178 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index ea4b78a2..20a027ed 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -3,10 +3,8 @@ "dockerfile": "Dockerfile", "context": ".." }, - "features": { - }, + "features": {}, "postCreateCommand": "./.devcontainer/post-create.sh", - "forwardPorts": [ - 8095 - ] + "forwardPorts": [8095, 3483, 9000, 9090], + "runArgs": ["--network=host"] } diff --git a/.vscode/launch.json b/.vscode/launch.json index 5a280cfd..7ea91c4b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,14 +6,14 @@ "configurations": [ { "name": "Python: Module", - "type": "python", + "type": "debugpy", "request": "launch", "module": "music_assistant", "justMyCode": false, "args":[ "--log-level", "debug" ], - // "env": {"PYTHONDEVMODE": "1"} + "env": {"PYTHONDEVMODE": "1"} } ] } diff --git a/Dockerfile b/Dockerfile index 7fea4f19..939f01a8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -46,6 +46,9 @@ ARG TARGETPLATFORM RUN set -x \ # add bookworm backports repo && sh -c 'echo "deb http://deb.debian.org/debian bookworm-backports main" >> /etc/apt/sources.list' \ + # add multimedia repo + && sh -c 'echo "Types: deb\nURIs: https://www.deb-multimedia.org\nSuites: stable\nComponents: main non-free\nSigned-By: /etc/apt/trusted.gpg.d/deb-multimedia-keyring.gpg" >> /etc/apt/sources.list.d/deb-multimedia.sources' \ + && sh -c 'echo "Package: *\nPin: origin www.deb-multimedia.org\nPin-Priority: 1" >> /etc/apt/preferences.d/99deb-multimedia' \ && apt-get update \ && apt-get install -y --no-install-recommends \ ca-certificates \ @@ -53,7 +56,6 @@ RUN set -x \ git \ wget \ tzdata \ - ffmpeg \ libsox-fmt-all \ libsox3 \ sox \ @@ -62,6 +64,11 @@ RUN set -x \ libjemalloc2 \ # install snapcast server 0.27 from bookworm backports && apt-get install -y --no-install-recommends -t bookworm-backports snapserver \ + # install ffmpeg 6 from multimedia repo + && cd /tmp && curl -sLO https://www.deb-multimedia.org/pool/main/d/deb-multimedia-keyring/deb-multimedia-keyring_2016.8.1_all.deb \ + && apt install -y /tmp/deb-multimedia-keyring_2016.8.1_all.deb \ + && apt-get update \ + && apt install -y -t 'o=Unofficial Multimedia Packages' ffmpeg \ # cleanup && rm -rf /tmp/* \ && rm -rf /var/lib/apt/lists/* diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index a1887daf..d7131158 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -37,11 +37,11 @@ from music_assistant.constants import ( CONF_CROSSFADE_DURATION, CONF_OUTPUT_CHANNELS, CONF_PUBLISH_IP, + ROOT_LOGGER_NAME, SILENCE_FILE, UGP_PREFIX, VERBOSE_LOG_LEVEL, ) -from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( check_audio_support, crossfade_pcm_parts, @@ -362,7 +362,7 @@ class StreamsController(CoreController): "with libsoxr support" if libsoxr_support else "", ) # copy log level to audio module - AUDIO_LOGGER.setLevel(self.logger.level) + logging.getLogger(f"{ROOT_LOGGER_NAME}.audio").setLevel(self.logger.level) # start the webserver self.publish_port = config.get_value(CONF_BIND_PORT) self.publish_ip = config.get_value(CONF_PUBLISH_IP) @@ -828,8 +828,9 @@ class StreamsController(CoreController): queue.queue_id, CONF_CROSSFADE_DURATION, 8 ) crossfade_size = int(pcm_sample_size * crossfade_duration) - buffer_size = int(pcm_sample_size * 5) # 5 seconds + buffer_size = int(pcm_sample_size * 2) # 2 seconds if use_crossfade: + # buffer size needs to be big enough to include the crossfade part buffer_size += crossfade_size bytes_written = 0 buffer = b"" @@ -859,9 +860,10 @@ class StreamsController(CoreController): pcm_format.bit_depth, pcm_format.sample_rate, ) - # send crossfade_part - yield crossfade_part + # send crossfade_part (as one big chunk) bytes_written += len(crossfade_part) + yield crossfade_part + # also write the leftover bytes from the crossfade action if remaining_bytes: yield remaining_bytes @@ -873,9 +875,11 @@ class StreamsController(CoreController): #### OTHER: enough data in buffer, feed to output while len(buffer) > buffer_size: - yield buffer[:pcm_sample_size] - bytes_written += pcm_sample_size + subchunk = buffer[:pcm_sample_size] buffer = buffer[pcm_sample_size:] + bytes_written += len(subchunk) + yield subchunk + del subchunk #### HANDLE END OF TRACK if last_fadeout_part: @@ -891,9 +895,10 @@ class StreamsController(CoreController): bytes_written += len(remaining_bytes) del remaining_bytes else: - # no crossfade enabled, just yield the (entire) buffer last part - yield buffer + # no crossfade enabled, just yield the buffer last part bytes_written += len(buffer) + yield buffer + del buffer # update duration details based on the actual pcm data we sent # this also accounts for crossfade and silence stripping @@ -914,7 +919,7 @@ class StreamsController(CoreController): if last_fadeout_part: yield last_fadeout_part del last_fadeout_part - del buffer + self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) async def get_announcement_stream( @@ -960,10 +965,10 @@ class StreamsController(CoreController): is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration if is_radio or streamdetails.seek_position: strip_silence_begin = False - # chunk size = 2 seconds of pcm audio - pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2) - chunk_size = pcm_sample_size * (1 if is_radio else 2) - expected_chunks = int((streamdetails.duration or 0) / 2) + # chunk size = 1 second of pcm audio + pcm_sample_size = pcm_format.pcm_sample_size + chunk_size = pcm_sample_size # chunk size = sample size (= 1 second) + expected_chunks = int(((streamdetails.duration or 0) * pcm_sample_size) / chunk_size) if expected_chunks < 10: strip_silence_end = False @@ -980,7 +985,7 @@ class StreamsController(CoreController): extra_args += ["-ss", str(seek_pos)] if streamdetails.target_loudness is not None: # add loudnorm filters - filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5" + filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=11:TP=-2" if streamdetails.loudness: filter_rule += f":measured_I={streamdetails.loudness.integrated}" filter_rule += f":measured_LRA={streamdetails.loudness.lra}" @@ -1013,6 +1018,7 @@ class StreamsController(CoreController): input_path=input_path, # loglevel info is needed for loudness measurement loglevel="info", + extra_input_args=["-filter_threads", "1"], ) async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]): @@ -1068,10 +1074,6 @@ class StreamsController(CoreController): if music_prov := self.mass.get_provider(streamdetails.provider): self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) - # cleanup - del state_data - del ffmpeg_proc - async with AsyncProcess( ffmpeg_args, enable_stdin=audio_source_iterator is not None, @@ -1116,8 +1118,7 @@ class StreamsController(CoreController): # collect this chunk for next round prev_chunk = chunk - - # we did not receive any data, somethinh wet wrong + # if we did not receive any data, something went (terribly) wrong # raise here to prevent an endless loop elsewhere if state_data["bytes_sent"] == 0: raise AudioError(f"stream error on {streamdetails.uri}") @@ -1134,7 +1135,11 @@ class StreamsController(CoreController): else: final_chunk = prev_chunk - # yield final chunk to output + # yield final chunk to output (in chunk_size parts) + while len(final_chunk) > chunk_size: + yield final_chunk[:chunk_size] + final_chunk = final_chunk[chunk_size:] + state_data["bytes_sent"] += len(final_chunk) yield final_chunk state_data["bytes_sent"] += len(final_chunk) state_data["finished"].set() diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 18ff1fd0..a1e1828c 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -12,7 +12,7 @@ from time import time from typing import TYPE_CHECKING import aiofiles -from aiohttp import ClientError, ClientResponseError, ClientTimeout +from aiohttp import ClientResponseError, ClientTimeout from music_assistant.common.helpers.global_cache import ( get_global_cache_value, @@ -380,24 +380,19 @@ async def get_radio_stream( ) -> AsyncGenerator[bytes, None]: """Get radio audio stream from HTTP, including metadata retrieval.""" resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url) - retries = 0 - while True: - try: - retries += 1 - if is_hls: # special HLS stream - async for chunk in get_hls_stream(mass, resolved_url, streamdetails): - yield chunk - elif supports_icy: # http stream supports icy metadata - async for chunk in get_icy_stream(mass, resolved_url, streamdetails): - yield chunk - else: # generic http stream (without icy metadata) - async for chunk in get_http_stream(mass, resolved_url, streamdetails): - yield chunk - except ClientError: - LOGGER.warning("Streaming radio %s failed, retrying...", streamdetails.uri) - if retries >= 5: - raise - await asyncio.sleep(1 * retries) + # handle special HLS stream + if is_hls: + async for chunk in get_hls_stream(mass, resolved_url, streamdetails): + yield chunk + return + # handle http stream supports icy metadata + if supports_icy: + async for chunk in get_icy_stream(mass, resolved_url, streamdetails): + yield chunk + return + # generic http stream (without icy metadata) + async for chunk in get_http_stream(mass, resolved_url, streamdetails): + yield chunk async def get_icy_stream( @@ -495,18 +490,16 @@ async def get_hls_stream( "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url ) - input_format = streamdetails.audio_format - output_format = streamdetails.audio_format if streamdetails.audio_format.content_type == ContentType.UNKNOWN: streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC) - output_format = AudioFormat(content_type=ContentType.FLAC) try: metadata_task = asyncio.create_task(watch_metadata()) async for chunk in get_ffmpeg_stream( audio_input=substream_url, - input_format=input_format, - output_format=output_format, + input_format=streamdetails.audio_format, + # we need a self-explaining codec but not loose data from re-encoding + output_format=AudioFormat(content_type=ContentType.FLAC), ): yield chunk finally: @@ -625,7 +618,7 @@ async def get_ffmpeg_stream( enable_stdout=True, enable_stderr=False, custom_stdin=audio_input if use_stdin else None, - name="player_ffmpeg_stream", + name="ffmpeg_stream", ) as ffmpeg_proc: # read final chunks from stdout chunk_size = chunk_size or get_chunksize(output_format, 1) @@ -799,6 +792,7 @@ def get_ffmpeg_args( input_path: str = "-", output_path: str = "-", loglevel: str = "info", + extra_input_args: list[str] | None = None, ) -> list[str]: """Collect all args to send to the ffmpeg process.""" if extra_args is None: @@ -824,16 +818,13 @@ def get_ffmpeg_args( "-ignore_unknown", "-protocol_whitelist", "file,http,https,tcp,tls,crypto,pipe,data,fd", + "-filter_complex_threads", + "1", ] # collect input args - input_args = [ - "-ac", - str(input_format.channels), - "-channel_layout", - "mono" if input_format.channels == 1 else "stereo", - ] - if input_format.content_type.is_pcm(): - input_args += ["-ar", str(input_format.sample_rate)] + input_args = [] + if extra_input_args: + input_args += extra_input_args if input_path.startswith("http"): # append reconnect options for direct stream from http input_args += [ @@ -852,15 +843,24 @@ def get_ffmpeg_args( "-reconnect_on_http_error", "5xx", ] - if input_format.content_type != ContentType.UNKNOWN: - input_args += ["-f", input_format.content_type.value] + if input_format.content_type.is_pcm(): + input_args += [ + "-ac", + str(input_format.channels), + "-channel_layout", + "mono" if input_format.channels == 1 else "stereo", + "-ar", + str(input_format.sample_rate), + "-acodec", + input_format.content_type.name.lower(), + "-f", + input_format.content_type.value, + ] input_args += ["-i", input_path] # collect output args if output_path.upper() == "NULL": output_args = ["-f", "null", "-"] - elif output_format.content_type == ContentType.UNKNOWN: - output_args = [output_path] else: output_args = [ "-acodec", diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 299e410d..080fbe99 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -60,7 +60,6 @@ class AsyncProcess: self._custom_stdin = None self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin))) self._custom_stdout = custom_stdout - self._stderr_locked = asyncio.Lock() @property def closed(self) -> bool: @@ -74,7 +73,9 @@ class AsyncProcess: return self._returncode if self.proc is None: return None - return self.proc.returncode + if (ret_code := self.proc.returncode) is not None: + self._returncode = ret_code + return ret_code async def __aenter__(self) -> AsyncProcess: """Enter context manager.""" @@ -88,7 +89,8 @@ class AsyncProcess: exc_tb: TracebackType | None, ) -> bool | None: """Exit context manager.""" - await self.close() + # send interrupt signal to process when we're cancelled + await self.close(send_signal=exc_type in (GeneratorExit, asyncio.CancelledError)) self._returncode = self.returncode async def start(self) -> None: @@ -105,7 +107,7 @@ class AsyncProcess: async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" - while True: + while self.returncode is None: chunk = await self.readexactly(n) if len(chunk) == 0: break @@ -113,7 +115,7 @@ class AsyncProcess: async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks as they come in from process stdout.""" - while True: + while self.returncode is None: chunk = await self.read(n) if len(chunk) == 0: break @@ -121,6 +123,8 @@ class AsyncProcess: async def readexactly(self, n: int) -> bytes: """Read exactly n bytes from the process stdout (or less if eof).""" + if not self.proc.stdout or self.proc.stdout.at_eof(): + return b"" try: return await self.proc.stdout.readexactly(n) except asyncio.IncompleteReadError as err: @@ -133,11 +137,13 @@ class AsyncProcess: and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object. """ + if not self.proc.stdout or self.proc.stdout.at_eof(): + return b"" return await self.proc.stdout.read(n) async def write(self, data: bytes) -> None: """Write data to process stdin.""" - if self._close_called or self.proc.stdin.is_closing(): + if self.returncode is not None or self.proc.stdin.is_closing(): raise asyncio.CancelledError("write called while process already done") self.proc.stdin.write(data) with suppress(BrokenPipeError, ConnectionResetError): @@ -145,6 +151,8 @@ class AsyncProcess: async def write_eof(self) -> None: """Write end of file to to process stdin.""" + if self.returncode is not None or self.proc.stdin.is_closing(): + return try: if self.proc.stdin.can_write_eof(): self.proc.stdin.write_eof() @@ -158,7 +166,7 @@ class AsyncProcess: # already exited, race condition pass - async def close(self) -> int: + async def close(self, send_signal: bool = False) -> int: """Close/terminate the process and wait for exit.""" self._close_called = True # close any/all attached (writer) tasks @@ -167,27 +175,26 @@ class AsyncProcess: task.cancel() with suppress(asyncio.CancelledError): await task - - if self.proc.returncode is None: - # always first try to send sigint signal to try clean shutdown - # for example ffmpeg needs this to cleanly shutdown and not lock on pipes + if send_signal and self.returncode is None: self.proc.send_signal(SIGINT) - # allow the process a little bit of time to respond to the signal - await asyncio.sleep(0.1) + # allow the process a bit of time to respond to the signal before we go nuclear + await asyncio.sleep(0.5) - # send communicate until we exited - while self.proc.returncode is None: - # make sure the process is really cleaned up. - # especially with pipes this can cause deadlocks if not properly guarded - # we need to use communicate to ensure buffers are flushed - # we do that with sending communicate - if self._enable_stdin and not self.proc.stdin.is_closing(): - self.proc.stdin.close() + # make sure the process is really cleaned up. + # especially with pipes this can cause deadlocks if not properly guarded + # we need to ensure stdout and stderr are flushed and stdin closed + while self.returncode is None: try: - if self.proc.stdout and self._stderr_locked.locked(): - await asyncio.wait_for(self.proc.stdout.read(), 5) - else: - await asyncio.wait_for(self.proc.communicate(), 5) + async with asyncio.timeout(30): + # abort existing readers on stderr/stdout first before we send communicate + if self.proc.stdout and self.proc.stdout._waiter is not None: + self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) + self.proc.stdout._waiter = None + if self.proc.stderr and self.proc.stderr._waiter is not None: + self.proc.stderr._waiter.set_exception(asyncio.CancelledError()) + self.proc.stderr._waiter = None + # use communicate to flush all pipe buffers + await self.proc.communicate() except TimeoutError: LOGGER.debug( "Process %s with PID %s did not stop in time. Sending terminate...", @@ -199,27 +206,32 @@ class AsyncProcess: "Process %s with PID %s stopped with returncode %s", self._name, self.proc.pid, - self.proc.returncode, + self.returncode, ) - return self.proc.returncode + return self.returncode async def wait(self) -> int: """Wait for the process and return the returncode.""" if self.returncode is not None: return self.returncode - return await self.proc.wait() + self._returncode = await self.proc.wait() + return self._returncode async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]: """Write bytes to process and read back results.""" stdout, stderr = await self.proc.communicate(input_data) + self._returncode = self.proc.returncode return (stdout, stderr) async def iter_stderr(self) -> AsyncGenerator[bytes, None]: """Iterate lines from the stderr stream.""" - while not self.closed: + while self.returncode is None: + if self.proc.stderr.at_eof(): + break try: - async with self._stderr_locked: - yield await self.proc.stderr.readline() + yield await self.proc.stderr.readline() + if self.proc.stderr.at_eof(): + break except ValueError as err: # we're waiting for a line (separator found), but the line was too big # this may happen with ffmpeg during a long (radio) stream where progress diff --git a/music_assistant/server/models/core_controller.py b/music_assistant/server/models/core_controller.py index 00bf37c3..36c24e31 100644 --- a/music_assistant/server/models/core_controller.py +++ b/music_assistant/server/models/core_controller.py @@ -70,6 +70,8 @@ class CoreController: ) if log_level == "GLOBAL": self.logger.setLevel(mass_logger.level) - elif logging.getLogger().level > self.logger.level: + else: + self.logger.setLevel(log_level) + if logging.getLogger().level > self.logger.level: # if the root logger's level is higher, we need to adjust that too logging.getLogger().setLevel(self.logger.level) diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index d8448fca..5768c12d 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -61,7 +61,8 @@ CONF_ALAC_ENCODE = "alac_encode" CONF_VOLUME_START = "volume_start" CONF_PASSWORD = "password" -REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 10 # 10 seconds +# the output buffer to raop must be big enough to prevent small hiccups +REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 3 # 2 seconds PLAYER_CONFIG_ENTRIES = ( @@ -196,20 +197,12 @@ class AirplayStream: self.active_remote_id: str = str(randint(1000, 8000)) self.start_ntp: int | None = None # use as checksum self.prevent_playback: bool = False + self.running = True self._log_reader_task: asyncio.Task | None = None self._audio_reader_task: asyncio.Task | None = None self._cliraop_proc: AsyncProcess | None = None self._ffmpeg_proc: AsyncProcess | None = None - self._stop_requested = False - - @property - def running(self) -> bool: - """Return bool if we're running.""" - return ( - not self._stop_requested - and self._cliraop_proc - and self._cliraop_proc.returncode is None - ) + self._buffer = asyncio.Queue(10) async def start(self, start_ntp: int) -> None: """Initialize CLIRaop process for a player.""" @@ -241,7 +234,7 @@ class AirplayStream: "-port", str(self.airplay_player.discovery_info.port), "-wait", - str(2000 - sync_adjust), + str(2500 - sync_adjust), "-volume", str(mass_player.volume_level), *extra_args, @@ -261,6 +254,16 @@ class AirplayStream: # one could argue that the intermediate ffmpeg towards cliraop is not needed # when there are no player specific filters or extras but in this case # ffmpeg serves as a small buffer towards the realtime cliraop streamer + read, write = os.pipe() + + async def read_from_buffer() -> AsyncGenerator[bytes, None]: + while True: + next_chunk = await self._buffer.get() + if not next_chunk: + break + yield next_chunk + del next_chunk + ffmpeg_args = get_ffmpeg_args( input_format=self.input_format, output_format=AIRPLAY_PCM_FORMAT, @@ -272,35 +275,40 @@ class AirplayStream: enable_stdin=True, enable_stdout=True, enable_stderr=False, + custom_stdin=read_from_buffer(), + custom_stdout=write, name="cliraop_ffmpeg", ) await self._ffmpeg_proc.start() + os.close(write) + self._cliraop_proc = AsyncProcess( cliraop_args, enable_stdin=True, enable_stdout=False, enable_stderr=True, - custom_stdin=self._audio_feeder(), + custom_stdin=read, name="cliraop", ) await self._cliraop_proc.start() + os.close(read) self._log_reader_task = asyncio.create_task(self._log_watcher()) async def stop(self, wait: bool = True): """Stop playback and cleanup.""" - if self._cliraop_proc.closed and self._ffmpeg_proc.closed: - return - self._stop_requested = True + self.running = False async def _stop() -> None: # ffmpeg MUST be stopped before cliraop due to the chained pipes - await self._ffmpeg_proc.close() + if not self._ffmpeg_proc.closed: + await self._ffmpeg_proc.close(True) # allow the cliraop process to stop gracefully first - await self.send_cli_command("ACTION=STOP") - with suppress(TimeoutError): - await asyncio.wait_for(self._cliraop_proc.wait(), 5) + if not self._cliraop_proc.closed: + await self.send_cli_command("ACTION=STOP") + with suppress(TimeoutError): + await asyncio.wait_for(self._cliraop_proc.wait(), 5) # send regular close anyway (which also logs the returncode) - await self._cliraop_proc.close() + await self._cliraop_proc.close(True) task = self.mass.create_task(_stop()) if wait: @@ -308,13 +316,13 @@ class AirplayStream: async def write_chunk(self, chunk: bytes) -> None: """Write a (pcm) audio chunk to ffmpeg.""" - await self._ffmpeg_proc.write(chunk) + await self._buffer.put(chunk) async def write_eof(self) -> None: """Write EOF to the ffmpeg stdin.""" - await self._ffmpeg_proc.write_eof() - await self._ffmpeg_proc.wait() - await self.stop() + if not self.running: + return + await self._buffer.put(b"") async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" @@ -380,7 +388,7 @@ class AirplayStream: self.mass.players.update(airplay_player.player_id) if "lost packet out of backlog" in line: lost_packets += 1 - if lost_packets == 100: + if lost_packets == 50: logger.error("High packet loss detected, stopping playback...") await self.stop(False) elif lost_packets % 10 == 0: @@ -389,16 +397,16 @@ class AirplayStream: logger.log(VERBOSE_LOG_LEVEL, line) # if we reach this point, the process exited + self.running = False if airplay_player.active_stream == self: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) # ensure we're cleaned up afterwards - await self.stop() + if self._ffmpeg_proc.returncode is None or self._cliraop_proc.returncode is None: + await self.stop() async def _send_metadata(self, queue: PlayerQueue) -> None: """Send metadata to player (and connected sync childs).""" - if not self.running: - return if not queue or not queue.current_item: return duration = min(queue.current_item.duration or 0, 3600) @@ -437,31 +445,11 @@ class AirplayStream: async def _send_progress(self, queue: PlayerQueue) -> None: """Send progress report to player (and connected sync childs).""" - if not self.running: - return if not queue or not queue.current_item: return progress = int(queue.corrected_elapsed_time) await self.send_cli_command(f"PROGRESS={progress}\n") - async def _audio_feeder(self) -> AsyncGenerator[bytes, None]: - """Read chunks from ffmpeg and feed (buffered) to cliraop.""" - buffer = b"" - async for chunk in self._ffmpeg_proc.iter_any(): - if self._stop_requested: - break - buffer += chunk - chunksize = len(chunk) - del chunk - while len(buffer) > REQUIRED_BUFFER: - yield buffer[:chunksize] - buffer = buffer[chunksize:] - # end of stream - if not self._stop_requested: - yield buffer - await self._cliraop_proc.write_eof() - del buffer - @dataclass class AirPlayPlayer: @@ -666,18 +654,17 @@ class AirplayProvider(PlayerProvider): start_ntp = int(stdout.strip()) # setup Raop process for player and its sync childs - async with asyncio.TaskGroup() as tg: - for airplay_player in self._get_sync_clients(player_id): - airplay_player.active_stream = AirplayStream( - self, airplay_player, input_format=input_format - ) - tg.create_task(airplay_player.active_stream.start(start_ntp)) + for airplay_player in self._get_sync_clients(player_id): + airplay_player.active_stream = AirplayStream( + self, airplay_player, input_format=input_format + ) + self.mass.create_task(airplay_player.active_stream.start(start_ntp)) async for chunk in audio_source: active_clients = 0 async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): - if not (airplay_player.active_stream and airplay_player.active_stream.running): + if not airplay_player.active_stream or not airplay_player.active_stream.running: # player stopped or switched to a new stream continue if airplay_player.active_stream.start_ntp != start_ntp: @@ -689,14 +676,9 @@ class AirplayProvider(PlayerProvider): if active_clients == 0: # no more clients return - # entire stream consumed: send EOF + # entire stream consumed: send EOF (empty chunk) async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): - if ( - not airplay_player.active_stream - or airplay_player.active_stream.start_ntp != start_ntp - ): - continue tg.create_task(airplay_player.active_stream.write_eof()) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 0a903ca4..6698b803 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -330,7 +330,7 @@ class SnapCastProvider(PlayerProvider): input_format = DEFAULT_SNAPCAST_FORMAT audio_source = self.mass.streams.get_announcement_stream( queue_item.streamdetails.data["url"], - pcm_format=DEFAULT_SNAPCAST_FORMAT, + output_format=DEFAULT_SNAPCAST_FORMAT, use_pre_announce=queue_item.streamdetails.data["use_pre_announce"], ) else: diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 23fe6b3b..c9560b40 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -637,10 +637,10 @@ class SpotifyProvider(MusicProvider): try: retries += 1 if not tokeninfo: - async with asyncio.timeout(5): + async with asyncio.timeout(10): tokeninfo = await self._get_token() if tokeninfo and not userinfo: - async with asyncio.timeout(5): + async with asyncio.timeout(10): userinfo = await self._get_data("me", tokeninfo=tokeninfo) if tokeninfo and userinfo: # we have all info we need! @@ -689,10 +689,8 @@ class SpotifyProvider(MusicProvider): ] if self._ap_workaround: args += ["--ap-port", "12345"] - librespot = await asyncio.create_subprocess_exec( - *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT - ) - stdout, _ = await librespot.communicate() + async with AsyncProcess(args, enable_stdout=True) as librespot: + stdout = await librespot.read(-1) if stdout.decode().strip() != "authorized": raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}") # get token with (authorized) librespot @@ -727,10 +725,8 @@ class SpotifyProvider(MusicProvider): ] if self._ap_workaround: args += ["--ap-port", "12345"] - librespot = await asyncio.create_subprocess_exec( - *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT - ) - stdout, _ = await librespot.communicate() + async with AsyncProcess(args, enable_stdout=True) as librespot: + stdout = await librespot.read(-1) duration = round(time.time() - time_start, 2) try: result = json.loads(stdout) diff --git a/music_assistant/server/providers/url/__init__.py b/music_assistant/server/providers/url/__init__.py index c3ec5dd7..bffb8cbc 100644 --- a/music_assistant/server/providers/url/__init__.py +++ b/music_assistant/server/providers/url/__init__.py @@ -152,14 +152,14 @@ class URLProvider(MusicProvider): media_item = Radio( item_id=url, provider=self.domain, - name=media_info.get("icy-name") or media_info.title, + name=media_info.get("icy-name") or url, provider_mappings=provider_mappings, ) else: media_item = Track( item_id=url, provider=self.domain, - name=media_info.title, + name=media_info.title or url, duration=int(media_info.duration or 0), artists=[await self.get_artist(artist) for artist in media_info.artists], provider_mappings=provider_mappings, diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index 78398cf9..e4ff1a35 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -262,9 +262,9 @@ class MusicAssistant: if self.closing: return - if LOGGER.isEnabledFor(logging.DEBUG) and event != EventType.QUEUE_TIME_UPDATED: + if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL): # do not log queue time updated events because that is too chatty - LOGGER.getChild("event").debug("%s %s", event.value, object_id or "") + LOGGER.getChild("event").log(VERBOSE_LOG_LEVEL, "%s %s", event.value, object_id or "") event_obj = MassEvent(event=event, object_id=object_id, data=data) for cb_func, event_filter, id_filter in self._subscribers: @@ -362,13 +362,13 @@ class MusicAssistant: *args: Any, task_id: str | None = None, **kwargs: Any, - ) -> asyncio.Task | asyncio.Future: + ) -> asyncio.TimerHandle: """Run callable/awaitable after given delay.""" def _create_task() -> None: self.create_task(target, *args, task_id=task_id, **kwargs) - self.loop.call_later(delay, _create_task) + return self.loop.call_later(delay, _create_task) def get_task(self, task_id: str) -> asyncio.Task | asyncio.Future: """Get existing scheduled task.""" -- 2.34.1