From c836c2572eb916a5f15f6c5e0e1d47a473d8ac77 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 10 Apr 2024 21:45:12 +0200 Subject: [PATCH] Various fixes and code cleanup to the streaming logic (#1216) --- music_assistant/common/helpers/util.py | 13 + .../common/models/streamdetails.py | 3 +- music_assistant/constants.py | 2 +- music_assistant/server/controllers/cache.py | 1 + .../server/controllers/media/artists.py | 1 + .../server/controllers/media/playlists.py | 10 +- .../server/controllers/media/tracks.py | 1 + .../server/controllers/metadata.py | 1 + music_assistant/server/controllers/music.py | 4 +- .../server/controllers/player_queues.py | 2 + music_assistant/server/controllers/streams.py | 245 +++++++----------- music_assistant/server/helpers/audio.py | 166 +++++++++--- music_assistant/server/helpers/database.py | 2 + music_assistant/server/helpers/process.py | 151 +++++------ .../server/models/music_provider.py | 3 + .../server/models/player_provider.py | 2 + .../server/providers/airplay/__init__.py | 71 +++-- .../server/providers/chromecast/__init__.py | 8 +- .../server/providers/filesystem_local/base.py | 3 + .../providers/filesystem_smb/__init__.py | 22 +- .../server/providers/slimproto/__init__.py | 4 +- .../server/providers/snapcast/__init__.py | 4 +- .../server/providers/spotify/__init__.py | 27 +- 23 files changed, 379 insertions(+), 367 deletions(-) diff --git a/music_assistant/common/helpers/util.py b/music_assistant/common/helpers/util.py index 9b7bd055..989588a7 100644 --- a/music_assistant/common/helpers/util.py +++ b/music_assistant/common/helpers/util.py @@ -46,6 +46,19 @@ def try_parse_bool(possible_bool: Any) -> str: return possible_bool in ["true", "True", "1", "on", "ON", 1] +def try_parse_duration(duration_str: str) -> float: + """Try to parse a duration in seconds from a duration (HH:MM:SS) string.""" + milliseconds = float("0." + duration_str.split(".")[-1]) if "." in duration_str else 0.0 + duration_parts = duration_str.split(".")[0].split(",")[0].split(":") + if len(duration_parts) == 3: + seconds = sum(x * int(t) for x, t in zip([3600, 60, 1], duration_parts, strict=False)) + elif len(duration_parts) == 2: + seconds = sum(x * int(t) for x, t in zip([60, 1], duration_parts, strict=False)) + else: + seconds = int(duration_parts[0]) + return seconds + milliseconds + + def create_sort_name(input_str: str) -> str: """Create sort name/title from string.""" input_str = input_str.lower().strip() diff --git a/music_assistant/common/models/streamdetails.py b/music_assistant/common/models/streamdetails.py index 230c5acf..c56b6f88 100644 --- a/music_assistant/common/models/streamdetails.py +++ b/music_assistant/common/models/streamdetails.py @@ -19,6 +19,7 @@ class LoudnessMeasurement(DataClassDictMixin): true_peak: float lra: float threshold: float + target_offset: float | None = None @dataclass(kw_only=True) @@ -50,8 +51,6 @@ class StreamDetails(DataClassDictMixin): # can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item can_seek: bool = True - # stream_type: - # the fields below will be set/controlled by the streamcontroller seek_position: int = 0 fade_in: bool = False diff --git a/music_assistant/constants.py b/music_assistant/constants.py index a2eaa1f0..861d558e 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -5,7 +5,7 @@ from typing import Final API_SCHEMA_VERSION: Final[int] = 24 MIN_SCHEMA_VERSION: Final[int] = 24 -DB_SCHEMA_VERSION: Final[int] = 28 +DB_SCHEMA_VERSION: Final[int] = 29 MASS_LOGGER_NAME: Final[str] = "music_assistant" diff --git a/music_assistant/server/controllers/cache.py b/music_assistant/server/controllers/cache.py index 23fca100..6eae0310 100644 --- a/music_assistant/server/controllers/cache.py +++ b/music_assistant/server/controllers/cache.py @@ -164,6 +164,7 @@ class CacheController(CoreController): if db_row["expires"] < cur_timestamp: await self.delete(db_row["key"]) cleaned_records += 1 + await asyncio.sleep(0) # yield to eventloop if cleaned_records > 50: self.logger.debug("Compacting database...") await self.database.vacuum() diff --git a/music_assistant/server/controllers/media/artists.py b/music_assistant/server/controllers/media/artists.py index f9c33287..6d39428a 100644 --- a/music_assistant/server/controllers/media/artists.py +++ b/music_assistant/server/controllers/media/artists.py @@ -87,6 +87,7 @@ class ArtistsController(MediaControllerBase[Artist]): # overhead of grabbing the musicbrainz id upfront library_item = await self.update_item_in_library(db_item.item_id, item) break + await asyncio.sleep(0) # yield to eventloop if not library_item: # actually add (or update) the item in the library db # use the lock to prevent a race condition of the same item being added twice diff --git a/music_assistant/server/controllers/media/playlists.py b/music_assistant/server/controllers/media/playlists.py index 91eb00ad..0e5a131a 100644 --- a/music_assistant/server/controllers/media/playlists.py +++ b/music_assistant/server/controllers/media/playlists.py @@ -18,12 +18,7 @@ from music_assistant.common.models.errors import ( ProviderUnavailableError, UnsupportedFeaturedException, ) -from music_assistant.common.models.media_items import ( - ItemMapping, - Playlist, - PlaylistTrack, - Track, -) +from music_assistant.common.models.media_items import ItemMapping, Playlist, PlaylistTrack, Track from music_assistant.constants import DB_TABLE_PLAYLISTS from music_assistant.server.helpers.compare import compare_strings @@ -93,7 +88,7 @@ class PlaylistController(MediaControllerBase[Playlist]): library_item = await self._add_library_item(item) # preload playlist tracks listing (do not load them in the db) async for _ in self.tracks(item.item_id, item.provider): - pass + await asyncio.sleep(0) # yield to eventloop # metadata lookup we need to do after adding it to the db if metadata_lookup: await self.mass.metadata.get_playlist_metadata(library_item) @@ -228,6 +223,7 @@ class PlaylistController(MediaControllerBase[Playlist]): if i.provider_instance == playlist_prov.provider_instance } ) + await asyncio.sleep(0) # yield to eventloop # check for duplicates for track_prov in track.provider_mappings: if ( diff --git a/music_assistant/server/controllers/media/tracks.py b/music_assistant/server/controllers/media/tracks.py index b3068511..70ef210e 100644 --- a/music_assistant/server/controllers/media/tracks.py +++ b/music_assistant/server/controllers/media/tracks.py @@ -161,6 +161,7 @@ class TracksController(MediaControllerBase[Track]): # existing item found: update it library_item = await self.update_item_in_library(db_item.item_id, item) break + await asyncio.sleep(0) # yield to eventloop if not library_item: # actually add a new item in the library db # use the lock to prevent a race condition of the same item being added twice diff --git a/music_assistant/server/controllers/metadata.py b/music_assistant/server/controllers/metadata.py index d52c5fd3..23ff8b05 100644 --- a/music_assistant/server/controllers/metadata.py +++ b/music_assistant/server/controllers/metadata.py @@ -198,6 +198,7 @@ class MetaDataController(CoreController): if genre not in playlist_genres: playlist_genres[genre] = 0 playlist_genres[genre] += 1 + await asyncio.sleep(0) # yield to eventloop playlist_genres_filtered = { genre for genre, count in playlist_genres.items() if count > 5 diff --git a/music_assistant/server/controllers/music.py b/music_assistant/server/controllers/music.py index db34f2f6..d5d2ac8f 100644 --- a/music_assistant/server/controllers/music.py +++ b/music_assistant/server/controllers/music.py @@ -478,6 +478,7 @@ class MusicController(CoreController): "true_peak": loudness.true_peak, "lra": loudness.lra, "threshold": loudness.threshold, + "target_offset": loudness.target_offset, }, allow_replace=True, ) @@ -659,7 +660,7 @@ class MusicController(CoreController): await asyncio.to_thread(shutil.copyfile, db_path, db_path_backup) # handle db migration from previous schema to this one - if prev_version == 27: + if prev_version in (27, 28): self.logger.info( "Performing database migration from %s to %s", prev_version, @@ -727,6 +728,7 @@ class MusicController(CoreController): true_peak REAL, lra REAL, threshold REAL, + target_offset REAL, UNIQUE(item_id, provider));""" ) await self.database.execute( diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index b662c560..7970ee8c 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import random import time from collections.abc import AsyncGenerator @@ -325,6 +326,7 @@ class PlayerQueuesController(CoreController): elif media_item.media_type == MediaType.PLAYLIST: async for playlist_track in ctrl.tracks(media_item.item_id, media_item.provider): tracks.append(playlist_track) + await asyncio.sleep(0) # yield to eventloop await self.mass.music.mark_item_played( media_item.media_type, media_item.item_id, media_item.provider ) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 1a1a3a18..b8a1d6e9 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -19,7 +19,12 @@ from typing import TYPE_CHECKING from aiofiles.os import wrap from aiohttp import web -from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool +from music_assistant.common.helpers.util import ( + get_ip, + select_free_port, + try_parse_bool, + try_parse_duration, +) from music_assistant.common.models.config_entries import ( ConfigEntry, ConfigValueOption, @@ -254,7 +259,7 @@ class StreamsController(CoreController): output_format_str=request.match_info["fmt"], queue_player=queue_player, default_sample_rate=queue_item.streamdetails.audio_format.sample_rate, - default_bit_depth=queue_item.streamdetails.audio_format.bit_depth, + default_bit_depth=24, # always prefer 24 bits to prevent dithering ) # prepare request, add some DLNA/UPNP compatible headers headers = { @@ -279,21 +284,10 @@ class StreamsController(CoreController): queue.display_name, ) queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id) - pcm_format = AudioFormat( - content_type=ContentType.from_bit_depth( - queue_item.streamdetails.audio_format.bit_depth - ), - sample_rate=queue_item.streamdetails.audio_format.sample_rate, - bit_depth=queue_item.streamdetails.audio_format.bit_depth, - ) - async for chunk in get_ffmpeg_stream( - audio_input=self._get_media_stream( - streamdetails=queue_item.streamdetails, - pcm_format=pcm_format, - ), - input_format=pcm_format, + async for chunk in self.get_media_stream( + streamdetails=queue_item.streamdetails, output_format=output_format, - filter_params=get_player_filter_params(self.mass, queue_player.player_id), + extra_filter_params=get_player_filter_params(self.mass, queue_player.player_id), ): try: await resp.write(chunk) @@ -320,7 +314,7 @@ class StreamsController(CoreController): output_format_str=request.match_info["fmt"], queue_player=queue_player, default_sample_rate=FLOW_DEFAULT_SAMPLE_RATE, - default_bit_depth=FLOW_DEFAULT_BIT_DEPTH, + default_bit_depth=24, # always prefer 24 bits to prevent dithering ) # play it safe: only allow icy metadata for mp3 and aac enable_icy = request.headers.get( @@ -545,21 +539,19 @@ class StreamsController(CoreController): bytes_written = 0 buffer = b"" # handle incoming audio chunks - async for chunk in self._get_media_stream( + async for chunk in self.get_media_stream( queue_track.streamdetails, - pcm_format=pcm_format, - # strip silence from begin/end if track is being crossfaded - strip_silence_begin=use_crossfade and total_bytes_sent > 0, - strip_silence_end=use_crossfade, + output_format=pcm_format, ): # buffer size needs to be big enough to include the crossfade part # allow it to be a bit smaller when playback just starts - if not use_crossfade: - req_buffer_size = pcm_sample_size - elif (total_bytes_sent + bytes_written) < crossfade_size: - req_buffer_size = int(crossfade_size / 2) + if not use_crossfade or (total_bytes_sent + bytes_written == 0): + req_buffer_size = pcm_sample_size * 2 + elif (total_bytes_sent + bytes_written) < (crossfade_size * 2): + req_buffer_size = pcm_sample_size * 5 else: - req_buffer_size = crossfade_size + # additional 5 seconds to strip silence from last part + req_buffer_size = crossfade_size + pcm_sample_size * 5 # ALWAYS APPEND CHUNK TO BUFFER buffer += chunk @@ -570,6 +562,14 @@ class StreamsController(CoreController): #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK if last_fadeout_part: + # strip silence from last part + buffer = await strip_silence( + self.mass, + buffer, + sample_rate=pcm_format.sample_rate, + bit_depth=pcm_format.bit_depth, + reverse=False, + ) # perform crossfade fadein_part = buffer[:crossfade_size] remaining_bytes = buffer[crossfade_size:] @@ -595,6 +595,7 @@ class StreamsController(CoreController): #### OTHER: enough data in buffer, feed to output while len(buffer) > req_buffer_size: yield buffer[:pcm_sample_size] + await asyncio.sleep(0) # yield to eventloop bytes_written += pcm_sample_size buffer = buffer[pcm_sample_size:] @@ -605,6 +606,14 @@ class StreamsController(CoreController): bytes_written += len(last_fadeout_part) last_fadeout_part = b"" if use_crossfade: + # strip silence from last part + buffer = await strip_silence( + self.mass, + buffer, + sample_rate=pcm_format.sample_rate, + bit_depth=pcm_format.bit_depth, + reverse=True, + ) # if crossfade is enabled, save fadeout part to pickup for next track last_fadeout_part = buffer[-crossfade_size:] remaining_bytes = buffer[:-crossfade_size] @@ -660,7 +669,7 @@ class StreamsController(CoreController): "-i", ANNOUNCE_ALERT_FILE, "-filter_complex", - "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=11:TP=-2", + "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=11:TP=-1.5", ] filter_params = [] async for chunk in get_ffmpeg_stream( @@ -672,52 +681,34 @@ class StreamsController(CoreController): ): yield chunk - async def _get_media_stream( + async def get_media_stream( self, streamdetails: StreamDetails, - pcm_format: AudioFormat, - strip_silence_begin: bool = False, - strip_silence_end: bool = False, + output_format: AudioFormat, + extra_filter_params: list[str] | None = None, ) -> AsyncGenerator[tuple[bool, bytes], None]: - """ - Get the (raw PCM) audio stream for the given streamdetails. - - Other than stripping silence at end and beginning and optional - volume normalization this is the pure, unaltered audio data as PCM chunks. - """ + """Get the audio stream for the given streamdetails.""" logger = self.logger.getChild("media_stream") is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration if is_radio: streamdetails.seek_position = 0 - strip_silence_begin = False - strip_silence_end = False - if streamdetails.seek_position: - strip_silence_begin = False - if not streamdetails.duration or streamdetails.duration < 30: - strip_silence_end = False - # pcm_sample_size = chunk size = 1 second of pcm audio - pcm_sample_size = pcm_format.pcm_sample_size - buffer_size = ( - pcm_sample_size * 5 - if (strip_silence_begin or strip_silence_end) - # always require a small amount of buffer to prevent livestreams stuttering - else pcm_sample_size * 2 - ) # collect all arguments for ffmpeg - filter_params = [] + filter_params = extra_filter_params or [] if streamdetails.target_loudness is not None: # add loudnorm filters - filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=11:TP=-2" + filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11" if streamdetails.loudness: filter_rule += f":measured_I={streamdetails.loudness.integrated}" filter_rule += f":measured_LRA={streamdetails.loudness.lra}" filter_rule += f":measured_tp={streamdetails.loudness.true_peak}" filter_rule += f":measured_thresh={streamdetails.loudness.threshold}" + if streamdetails.loudness.target_offset is not None: + filter_rule += f":offset={streamdetails.loudness.target_offset}" + filter_rule += ":linear=true" filter_rule += ":print_format=json" filter_params.append(filter_rule) if streamdetails.fade_in: filter_params.append("afade=type=in:start_time=0:duration=3") - if streamdetails.stream_type == StreamType.CUSTOM: audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( streamdetails, @@ -732,13 +723,14 @@ class StreamsController(CoreController): extra_input_args = [] if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM: extra_input_args += ["-ss", str(int(streamdetails.seek_position))] - logger.debug("start media stream for: %s", streamdetails.uri) - state_data = {"finished": asyncio.Event(), "bytes_sent": 0} + logger.debug("start media stream for: %s", streamdetails.uri) + bytes_sent = 0 + finished = False async with FFMpeg( audio_input=audio_source, input_format=streamdetails.audio_format, - output_format=pcm_format, + output_format=output_format, filter_params=filter_params, extra_input_args=[ *extra_input_args, @@ -747,61 +739,53 @@ class StreamsController(CoreController): "-filter_threads", "1", ], - name="ffmpeg_media_stream", - stderr_enabled=True, + collect_log_history=True, + logger=logger, ) as ffmpeg_proc: - - async def log_reader(): - # To prevent stderr locking up, we must keep reading it - stderr_data = "" - async for line in ffmpeg_proc.iter_stderr(): - if "error" in line or "warning" in line: - logger.warning(line) - elif "critical" in line: - logger.critical(line) - elif ( - streamdetails.audio_format.content_type == ContentType.UNKNOWN - and line.startswith("Stream #0:0: Audio: ") - ): - # if streamdetails contenttype is unknown, try parse it from the ffmpeg log - streamdetails.audio_format.content_type = ContentType.try_parse( - line.split("Stream #0:0: Audio: ")[1].split(" ")[0] - ) - elif stderr_data or "loudnorm" in line: - stderr_data += line - else: - logger.debug(line) - del line - - # if we reach this point, the process is finished (completed or aborted) - if ffmpeg_proc.returncode == 0: - await state_data["finished"].wait() - finished = state_data["finished"].is_set() - bytes_sent = state_data["bytes_sent"] - seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 - streamdetails.seconds_streamed = seconds_streamed - state_str = "finished" if finished else "aborted" + try: + async for chunk in ffmpeg_proc.iter_any(): + bytes_sent += len(chunk) + yield chunk + del chunk + finished = True + finally: + await ffmpeg_proc.close() logger.debug( - "stream %s for: %s (%s seconds streamed, exitcode %s)", - state_str, - streamdetails.uri, - seconds_streamed, + "stream %s (with code %s) for %s", + "finished" if finished else "aborted", ffmpeg_proc.returncode, + streamdetails.uri, ) + # try to determine how many seconds we've streamed + seconds_streamed = 0 + if output_format.content_type.is_pcm(): + seconds_streamed = ( + bytes_sent / output_format.pcm_sample_size if bytes_sent else 0 + ) + elif line := next((x for x in ffmpeg_proc.log_history if "time=" in x), None): + duration_str = line.split("time=")[1].split(" ")[0] + seconds_streamed = try_parse_duration(duration_str) + + if seconds_streamed: + streamdetails.seconds_streamed = seconds_streamed # store accurate duration - if finished and not streamdetails.seek_position: + if finished and not streamdetails.seek_position and seconds_streamed: streamdetails.duration = seconds_streamed # parse loudnorm data if we have that collected - if stderr_data and (loudness_details := parse_loudnorm(stderr_data)): + if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)): required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 if finished or (seconds_streamed >= required_seconds): logger.debug( - "Loudness measurement for %s: %s", streamdetails.uri, loudness_details + "Loudness measurement for %s: %s", + streamdetails.uri, + loudness_details, ) streamdetails.loudness = loudness_details - await self.mass.music.set_track_loudness( - streamdetails.item_id, streamdetails.provider, loudness_details + self.mass.create_task( + self.mass.music.set_track_loudness( + streamdetails.item_id, streamdetails.provider, loudness_details + ) ) # report playback @@ -809,70 +793,15 @@ class StreamsController(CoreController): if finished or seconds_streamed > 30: self.mass.create_task( self.mass.music.mark_item_played( - streamdetails.media_type, streamdetails.item_id, streamdetails.provider + streamdetails.media_type, + streamdetails.item_id, + streamdetails.provider, ) ) if music_prov := self.mass.get_provider(streamdetails.provider): self.mass.create_task( music_prov.on_streamed(streamdetails, seconds_streamed) ) - # cleanup - del stderr_data - - self.mass.create_task(log_reader()) - - # get pcm chunks from stdout - # we always stay buffer_size of bytes behind - # so we can strip silence at the beginning and end of a track - buffer = b"" - chunk_num = 0 - async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size): - chunk_num += 1 - buffer += chunk - del chunk - - if len(buffer) < buffer_size: - # buffer is not full enough, move on - continue - - if strip_silence_begin and chunk_num == 2: - # first 2 chunks received, strip silence of beginning - stripped_audio = await strip_silence( - self.mass, - buffer, - sample_rate=pcm_format.sample_rate, - bit_depth=pcm_format.bit_depth, - ) - yield stripped_audio - state_data["bytes_sent"] += len(stripped_audio) - buffer = b"" - del stripped_audio - continue - - #### OTHER: enough data in buffer, feed to output - while len(buffer) > buffer_size: - yield buffer[:pcm_sample_size] - state_data["bytes_sent"] += pcm_sample_size - buffer = buffer[pcm_sample_size:] - - # all chunks received, strip silence of last part if needed and yield remaining bytes - if strip_silence_end: - final_chunk = await strip_silence( - self.mass, - buffer, - sample_rate=pcm_format.sample_rate, - bit_depth=pcm_format.bit_depth, - reverse=True, - ) - else: - final_chunk = buffer - - # yield final chunk to output (as one big chunk) - yield final_chunk - state_data["bytes_sent"] += len(final_chunk) - state_data["finished"].set() - del final_chunk - del buffer def _log_request(self, request: web.Request) -> None: """Log request.""" diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index cd8013fa..8e70a5f2 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -8,7 +8,10 @@ import os import re import struct from collections import deque +from collections.abc import AsyncGenerator +from contextlib import suppress from io import BytesIO +from signal import SIGINT from typing import TYPE_CHECKING import aiofiles @@ -46,12 +49,10 @@ from music_assistant.server.helpers.playlists import ( ) from music_assistant.server.helpers.tags import parse_tags -from .process import AsyncProcess, check_output +from .process import AsyncProcess, check_output, communicate from .util import create_tempfile if TYPE_CHECKING: - from collections.abc import AsyncGenerator - from music_assistant.common.models.player_queue import QueueItem from music_assistant.server import MusicAssistant @@ -66,7 +67,7 @@ HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"} class FFMpeg(AsyncProcess): """FFMpeg wrapped as AsyncProcess.""" - def __init__( # noqa: PLR0913 + def __init__( self, audio_input: AsyncGenerator[bytes, None] | str | int, input_format: AudioFormat, @@ -74,10 +75,9 @@ class FFMpeg(AsyncProcess): filter_params: list[str] | None = None, extra_args: list[str] | None = None, extra_input_args: list[str] | None = None, - name: str = "ffmpeg", - stderr_enabled: bool = False, audio_output: str | int = "-", - loglevel: str | None = None, + collect_log_history: bool = False, + logger: logging.Logger | None = None, ) -> None: """Initialize AsyncProcess.""" ffmpeg_args = get_ffmpeg_args( @@ -88,17 +88,92 @@ class FFMpeg(AsyncProcess): input_path=audio_input if isinstance(audio_input, str) else "-", output_path=audio_output if isinstance(audio_output, str) else "-", extra_input_args=extra_input_args or [], - loglevel=loglevel or "info" - if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) - else "error", + loglevel="info", ) + self.audio_input = audio_input + self.input_format = input_format + self.collect_log_history = collect_log_history + self.log_history: deque[str] = deque(maxlen=100) + self._stdin_task: asyncio.Task | None = None + self._logger_task: asyncio.Task | None = None super().__init__( ffmpeg_args, - stdin=True if isinstance(audio_input, str) else audio_input, + stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input, stdout=True if isinstance(audio_output, str) else audio_output, - stderr=stderr_enabled, - name=name, + stderr=True, ) + self.logger = logger or LOGGER.getChild("ffmpeg") + + async def start(self) -> None: + """Perform Async init of process.""" + await super().start() + self._logger_task = asyncio.create_task(self._log_reader_task()) + if isinstance(self.audio_input, AsyncGenerator): + self._stdin_task = asyncio.create_task(self._feed_stdin()) + + async def close(self, send_signal: bool = True) -> None: + """Close/terminate the process and wait for exit.""" + if self._stdin_task and not self._stdin_task.done(): + self._stdin_task.cancel() + with suppress(asyncio.CancelledError): + await self._stdin_task + # make sure the stdin generator is also properly closed + # by propagating a cancellederror within + task = asyncio.create_task(self.audio_input.__anext__()) + task.cancel() + if not self.collect_log_history: + await super().close(send_signal) + return + # override close logic to make sure we catch all logging + self._close_called = True + if send_signal and self.returncode is None: + self.proc.send_signal(SIGINT) + if self.proc.stdin and not self.proc.stdin.is_closing(): + self.proc.stdin.close() + await asyncio.sleep(0) # yield to loop + # abort existing readers on stdout first before we send communicate + if self.proc.stdout: + if self.proc.stdout._waiter is not None: + with suppress(asyncio.exceptions.InvalidStateError): + self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) + # read reamaing bytes to unblock pipe + await self.read(-1) + # wait for log task to complete that reads the remaining data from stderr + with suppress(TimeoutError): + await asyncio.wait_for(self._logger_task, 5) + await super().close(False) + + async def _log_reader_task(self) -> None: + """Read ffmpeg log from stderr.""" + async for line in self.iter_stderr(): + if self.collect_log_history: + self.log_history.append(line) + if "error" in line or "warning" in line: + self.logger.warning(line) + elif "critical" in line: + self.logger.critical(line) + else: + self.logger.log(VERBOSE_LOG_LEVEL, line) + + # if streamdetails contenttype is unknown, try parse it from the ffmpeg log + if line.startswith("Stream #0:0: Audio: "): + if self.input_format.content_type == ContentType.UNKNOWN: + content_type_raw = line.split("Stream #0:0: Audio: ")[1].split(" ")[0] + content_type = ContentType.try_parse(content_type_raw) + self.logger.info( + "Detected (input) content type: %s (%s)", content_type, content_type_raw + ) + self.input_format.content_type = content_type + del line + + async def _feed_stdin(self) -> None: + """Feed stdin with audio chunks from an AsyncGenerator.""" + if TYPE_CHECKING: + self.audio_input: AsyncGenerator[bytes, None] + async for chunk in self.audio_input: + await self.write(chunk) + # write EOF once we've reached the end of the input stream + await self.write_eof() async def crossfade_pcm_parts( @@ -151,26 +226,24 @@ async def crossfade_pcm_parts( fmt, "-", ] - async with AsyncProcess(args, stdin=True, stdout=True) as proc: - crossfade_data, _ = await proc.communicate(fade_in_part) - if crossfade_data: - LOGGER.log( - 5, - "crossfaded 2 pcm chunks. fade_in_part: %s - " - "fade_out_part: %s - fade_length: %s seconds", - len(fade_in_part), - len(fade_out_part), - fade_length, - ) - return crossfade_data - # no crossfade_data, return original data instead - LOGGER.debug( - "crossfade of pcm chunks failed: not enough data? " - "fade_in_part: %s - fade_out_part: %s", + _returncode, crossfaded_audio, _stderr = await communicate(args, fade_in_part) + if crossfaded_audio: + LOGGER.log( + 5, + "crossfaded 2 pcm chunks. fade_in_part: %s - " + "fade_out_part: %s - fade_length: %s seconds", len(fade_in_part), len(fade_out_part), + fade_length, ) - return fade_out_part + fade_in_part + return crossfaded_audio + # no crossfade_data, return original data instead + LOGGER.debug( + "crossfade of pcm chunks failed: not enough data? " "fade_in_part: %s - fade_out_part: %s", + len(fade_in_part), + len(fade_out_part), + ) + return fade_out_part + fade_in_part async def strip_silence( @@ -208,8 +281,7 @@ async def strip_silence( ] # output args args += ["-f", fmt, "-"] - async with AsyncProcess(args, stdin=True, stdout=True) as proc: - stripped_data, _ = await proc.communicate(audio_data) + _returncode, stripped_data, _stderr = await communicate(args, audio_data) # return stripped audio bytes_stripped = len(audio_data) - len(stripped_data) @@ -643,7 +715,7 @@ async def get_ffmpeg_stream( extra_args: list[str] | None = None, chunk_size: int | None = None, extra_input_args: list[str] | None = None, - name: str = "ffmpeg", + logger: logging.Logger | None = None, ) -> AsyncGenerator[bytes, None]: """ Get the ffmpeg audio stream as async generator. @@ -658,7 +730,7 @@ async def get_ffmpeg_stream( filter_params=filter_params, extra_args=extra_args, extra_input_args=extra_input_args, - name=name, + logger=logger, ) as ffmpeg_proc: # read final chunks from stdout iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any() @@ -895,11 +967,28 @@ def get_ffmpeg_args( output_args = ["-f", "wav", output_path] else: # use explicit format identifier for all other - output_args = ["-f", output_format.content_type.value, output_path] + output_args = [ + "-f", + output_format.content_type.value, + "-ar", + str(output_format.sample_rate), + output_path, + ] - # prefer libsoxr high quality resampler (if present) for sample rate conversions - if input_format.sample_rate != output_format.sample_rate and libsoxr_support: - filter_params.append("aresample=resampler=soxr") + # determine if we need to do resampling + if ( + input_format.sample_rate != output_format.sample_rate + or input_format.bit_depth != output_format.bit_depth + ): + # prefer resampling with libsoxr due to its high quality + resample_filter = f'aresample=resampler={"soxr" if libsoxr_support else "swr"}' + if output_format.bit_depth < input_format.bit_depth: + # apply dithering when going down to 16 bits + resample_filter += ":osf=s16:dither_method=triangular_hp" + if not output_format.content_type.is_pcm(): + # specify sample rate if output format is not pcm + resample_filter += f":osr={output_format.sample_rate}" + filter_params.append(resample_filter) if filter_params and "-filter_complex" not in extra_args: extra_args += ["-af", ",".join(filter_params)] @@ -924,4 +1013,5 @@ def parse_loudnorm(raw_stderr: bytes | str) -> LoudnessMeasurement | None: true_peak=float(loudness_data["input_tp"]), lra=float(loudness_data["input_lra"]), threshold=float(loudness_data["input_thresh"]), + target_offset=float(loudness_data["target_offset"]), ) diff --git a/music_assistant/server/helpers/database.py b/music_assistant/server/helpers/database.py index a15148d4..9da98c67 100644 --- a/music_assistant/server/helpers/database.py +++ b/music_assistant/server/helpers/database.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING, Any import aiosqlite @@ -172,6 +173,7 @@ class DatabaseConnection: yield item if len(next_items) < limit: break + await asyncio.sleep(0) # yield to eventloop offset += limit async def vacuum(self) -> None: diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index ca6e7139..83cb3226 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -17,7 +17,7 @@ from collections.abc import AsyncGenerator from contextlib import suppress from signal import SIGINT from types import TracebackType -from typing import TYPE_CHECKING +from typing import Self from music_assistant.constants import MASS_LOGGER_NAME @@ -41,9 +41,9 @@ class AsyncProcess: def __init__( self, args: list[str], - stdin: bool | int | AsyncGenerator[bytes, None] | None = None, + stdin: bool | int | None = None, stdout: bool | int | None = None, - stderr: bool | int | None = None, + stderr: bool | int | None = False, name: str | None = None, ) -> None: """Initialize AsyncProcess.""" @@ -51,7 +51,6 @@ class AsyncProcess: if name is None: name = args[0].split(os.sep)[-1] self.name = name - self.attached_tasks: list[asyncio.Task] = [] self.logger = LOGGER.getChild(name) self._args = args self._stdin = None if stdin is False else stdin @@ -76,7 +75,7 @@ class AsyncProcess: self._returncode = ret_code return ret_code - async def __aenter__(self) -> AsyncProcess: + async def __aenter__(self) -> Self: """Enter context manager.""" await self.start() return self @@ -96,9 +95,7 @@ class AsyncProcess: """Perform Async init of process.""" self.proc = await asyncio.create_subprocess_exec( *self._args, - stdin=asyncio.subprocess.PIPE - if (self._stdin is True or isinstance(self._stdin, AsyncGenerator)) - else self._stdin, + stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin, stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout, stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr, # because we're exchanging big amounts of (audio) data with pipes @@ -107,12 +104,10 @@ class AsyncProcess: pipesize=1000000, ) self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid) - if isinstance(self._stdin, AsyncGenerator): - self.attached_tasks.append(asyncio.create_task(self._feed_stdin())) async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" - while not self._close_called: + while True: chunk = await self.readexactly(n) if chunk == b"": break @@ -120,7 +115,7 @@ class AsyncProcess: async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks as they come in from process stdout.""" - while not self._close_called: + while True: chunk = await self.read(n) if chunk == b"": break @@ -145,7 +140,8 @@ class AsyncProcess: async def write(self, data: bytes) -> None: """Write data to process stdin.""" if self._close_called: - raise RuntimeError("write called while process already done") + self.logger.warning("write called while process already done") + return self.proc.stdin.write(data) with suppress(BrokenPipeError, ConnectionResetError): await self.proc.stdin.drain() @@ -167,34 +163,55 @@ class AsyncProcess: # already exited, race condition pass - async def close(self, send_signal: bool = False) -> int: + async def read_stderr(self) -> bytes: + """Read line from stderr.""" + try: + return await self.proc.stderr.readline() + except ValueError as err: + # we're waiting for a line (separator found), but the line was too big + # this may happen with ffmpeg during a long (radio) stream where progress + # gets outputted to the stderr but no newline + # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit + # NOTE: this consumes the line that was too big + if "chunk exceed the limit" in str(err): + return await self.proc.stderr.readline() + # raise for all other (value) errors + raise + + async def iter_stderr(self) -> AsyncGenerator[str, None]: + """Iterate lines from the stderr stream as string.""" + while True: + line = await self.read_stderr() + if line == b"": + break + line = line.decode().strip() + if not line: + continue + yield line + + async def close(self, send_signal: bool = False) -> None: """Close/terminate the process and wait for exit.""" self._close_called = True - # close any/all attached (writer) tasks - for task in self.attached_tasks: - if not task.done(): - task.cancel() if send_signal and self.returncode is None: self.proc.send_signal(SIGINT) - + if self.proc.stdin and not self.proc.stdin.is_closing(): + self.proc.stdin.close() + await asyncio.sleep(0) # yield to loop # abort existing readers on stderr/stdout first before we send communicate if self.proc.stdout and self.proc.stdout._waiter is not None: - self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) - self.proc.stdout._waiter = None + with suppress(asyncio.exceptions.InvalidStateError): + self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) if self.proc.stderr and self.proc.stderr._waiter is not None: - self.proc.stderr._waiter.set_exception(asyncio.CancelledError()) - self.proc.stderr._waiter = None + with suppress(asyncio.exceptions.InvalidStateError): + self.proc.stderr._waiter.set_exception(asyncio.CancelledError()) # make sure the process is really cleaned up. # especially with pipes this can cause deadlocks if not properly guarded # we need to ensure stdout and stderr are flushed and stdin closed - while True: + while self.returncode is None: try: - async with asyncio.timeout(5): - # use communicate to flush all pipe buffers - await self.proc.communicate() - if self.returncode is not None: - break + # use communicate to flush all pipe buffers + await asyncio.wait_for(self.proc.communicate(), 5) except TimeoutError: self.logger.debug( "Process %s with PID %s did not stop in time. Sending terminate...", @@ -208,7 +225,6 @@ class AsyncProcess: self.proc.pid, self.returncode, ) - return self.returncode async def wait(self) -> int: """Wait for the process and return the returncode.""" @@ -216,59 +232,9 @@ class AsyncProcess: self._returncode = await self.proc.wait() return self._returncode - async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]: - """Write bytes to process and read back results.""" - stdout, stderr = await self.proc.communicate(input_data) - self._returncode = self.proc.returncode - return (stdout, stderr) - - async def _feed_stdin(self) -> None: - """Feed stdin with chunks from an AsyncGenerator.""" - if TYPE_CHECKING: - self._stdin: AsyncGenerator[bytes, None] - try: - async for chunk in self._stdin: - if self._close_called or self.proc.stdin.is_closing(): - return - await self.write(chunk) - await self.write_eof() - except Exception as err: - if not isinstance(err, asyncio.CancelledError): - self.logger.exception(err) - # make sure the stdin generator is also properly closed - # by propagating a cancellederror within - task = asyncio.create_task(self._stdin.__anext__()) - task.cancel() - - async def read_stderr(self) -> bytes: - """Read line from stderr.""" - try: - return await self.proc.stderr.readline() - except ValueError as err: - # we're waiting for a line (separator found), but the line was too big - # this may happen with ffmpeg during a long (radio) stream where progress - # gets outputted to the stderr but no newline - # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit - # NOTE: this consumes the line that was too big - if "chunk exceed the limit" in str(err): - return await self.proc.stderr.readline() - # raise for all other (value) errors - raise - - async def iter_stderr(self) -> AsyncGenerator[str, None]: - """Iterate lines from the stderr stream as string.""" - while True: - line = await self.read_stderr() - if line == b"": - break - line = line.decode().strip() - if not line: - continue - yield line - async def check_output(args: str | list[str]) -> tuple[int, bytes]: - """Run subprocess and return output.""" + """Run subprocess and return returncode and output.""" if isinstance(args, str): proc = await asyncio.create_subprocess_shell( args, @@ -283,3 +249,26 @@ async def check_output(args: str | list[str]) -> tuple[int, bytes]: ) stdout, _ = await proc.communicate() return (proc.returncode, stdout) + + +async def communicate( + args: str | list[str], + input: bytes | None = None, # noqa: A002 +) -> tuple[int, bytes, bytes]: + """Communicate with subprocess and return returncode, stdout and stderr output.""" + if isinstance(args, str): + proc = await asyncio.create_subprocess_shell( + args, + stderr=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE if input is not None else None, + ) + else: + proc = await asyncio.create_subprocess_exec( + *args, + stderr=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE if input is not None else None, + ) + stdout, stderr = await proc.communicate(input) + return (proc.returncode, stdout, stderr) diff --git a/music_assistant/server/models/music_provider.py b/music_assistant/server/models/music_provider.py index e1b981c8..04e6a21e 100644 --- a/music_assistant/server/models/music_provider.py +++ b/music_assistant/server/models/music_provider.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING from music_assistant.common.models.enums import MediaType, ProviderFeature @@ -411,6 +412,7 @@ class MusicProvider(Provider): library_item.item_id, prov_item ) cur_db_ids.add(library_item.item_id) + await asyncio.sleep(0) # yield to eventloop except MusicAssistantError as err: self.logger.warning( "Skipping sync of item %s - error details: %s", prov_item.uri, str(err) @@ -442,6 +444,7 @@ class MusicProvider(Provider): else: # otherwise: just unmark favorite await controller.set_favorite(db_id, False) + await asyncio.sleep(0) # yield to eventloop await self.mass.cache.set(cache_key, list(cur_db_ids)) # DO NOT OVERRIDE BELOW diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index c85d0e35..1147d729 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -13,6 +13,7 @@ from music_assistant.common.models.config_entries import ( CONF_ENTRY_ANNOUNCE_VOLUME_MIN, CONF_ENTRY_ANNOUNCE_VOLUME_STRATEGY, CONF_ENTRY_AUTO_PLAY, + CONF_ENTRY_FLOW_MODE, CONF_ENTRY_HIDE_PLAYER, CONF_ENTRY_PLAYER_ICON, CONF_ENTRY_PLAYER_ICON_GROUP, @@ -48,6 +49,7 @@ class PlayerProvider(Provider): """Return all (provider/player specific) Config Entries for the given player (if any).""" entries = ( CONF_ENTRY_PLAYER_ICON, + CONF_ENTRY_FLOW_MODE, CONF_ENTRY_VOLUME_NORMALIZATION, CONF_ENTRY_AUTO_PLAY, CONF_ENTRY_VOLUME_NORMALIZATION_TARGET, diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 0ce6177d..55d0d1fc 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -42,11 +42,7 @@ from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL -from music_assistant.server.helpers.audio import ( - get_ffmpeg_args, - get_ffmpeg_stream, - get_player_filter_params, -) +from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.models.player_provider import PlayerProvider @@ -204,6 +200,11 @@ class AirplayStream: self._started = asyncio.Event() self._stopped = False + @property + def running(self) -> bool: + """Return boolean if this stream is running.""" + return not self._stopped and self._started.is_set() + async def start(self, start_ntp: int, wait_start: int = 1000) -> None: """Initialize CLIRaop process for a player.""" extra_args = [] @@ -226,6 +227,23 @@ class AirplayStream: elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): extra_args += ["-debug", "10"] + # create os pipes to pipe ffmpeg to cliraop + read, write = await asyncio.to_thread(os.pipe) + + # ffmpeg handles the player specific stream + filters and pipes + # audio to the cliraop process + self._ffmpeg_proc = FFMpeg( + audio_input="-", + input_format=self.input_format, + output_format=AIRPLAY_PCM_FORMAT, + filter_params=get_player_filter_params(self.mass, player_id), + audio_output=write, + logger=self.airplay_player.logger.getChild("ffmpeg"), + ) + await self._ffmpeg_proc.start() + await asyncio.to_thread(os.close, write) + + # cliraop is the binary that handles the actual raop streaming to the player cliraop_args = [ self.prov.cliraop_bin, "-ntpstart", @@ -246,30 +264,9 @@ class AirplayStream: self.airplay_player.address, "-", ] + self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop") if platform.system() == "Darwin": os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" - - # ffmpeg handles the player specific stream + filters and pipes - # audio to the cliraop process - read, write = await asyncio.to_thread(os.pipe) - ffmpeg_args = get_ffmpeg_args( - input_format=self.input_format, - output_format=AIRPLAY_PCM_FORMAT, - filter_params=get_player_filter_params(self.mass, player_id), - loglevel="fatal", - ) - self._ffmpeg_proc = AsyncProcess( - ffmpeg_args, - stdin=True, - stdout=write, - name="cliraop_ffmpeg", - ) - await self._ffmpeg_proc.start() - await asyncio.to_thread(os.close, write) - - self._cliraop_proc = AsyncProcess( - cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop" - ) await self._cliraop_proc.start() await asyncio.to_thread(os.close, read) self._started.set() @@ -388,7 +385,6 @@ class AirplayStream: logger.log(VERBOSE_LOG_LEVEL, line) # if we reach this point, the process exited - self.running = False if airplay_player.active_stream == self: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) @@ -562,6 +558,9 @@ class AirplayProvider(PlayerProvider): for airplay_player in self._get_sync_clients(player_id): if airplay_player.active_stream: tg.create_task(airplay_player.active_stream.stop()) + if mass_player := self.mass.players.get(airplay_player.player_id): + mass_player.state = PlayerState.IDLE + self.mass.players.update(mass_player.player_id) async def cmd_play(self, player_id: str) -> None: """Send PLAY (unpause) command to given player. @@ -685,7 +684,7 @@ class AirplayProvider(PlayerProvider): # get current ntp and start cliraop _, stdout = await check_output(f"{self.cliraop_bin} -ntp") start_ntp = int(stdout.strip()) - wait_start = 1000 + (500 * len(sync_clients)) + wait_start = 1250 + (250 * len(sync_clients)) async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): tg.create_task(airplay_player.active_stream.start(start_ntp, wait_start)) @@ -739,7 +738,9 @@ class AirplayProvider(PlayerProvider): # so we debounce the resync a bit here with a timer self.mass.call_later( 1, - self.mass.player_queues.resume(active_queue.queue_id, fade_in=False), + self.mass.player_queues.resume, + active_queue.queue_id, + fade_in=False, task_id=f"resume_{active_queue.queue_id}", ) else: @@ -774,14 +775,10 @@ class AirplayProvider(PlayerProvider): async def check_binary(cliraop_path: str) -> str | None: try: - cliraop = await asyncio.create_subprocess_exec( - *[cliraop_path, "-check"], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, + returncode, output = await check_output( + [cliraop_path, "-check"], ) - stdout, _ = await cliraop.communicate() - stdout = stdout.strip().decode() - if cliraop.returncode == 0 and stdout == "cliraop check": + if returncode == 0 and output.strip().decode() == "cliraop check": self.cliraop_bin = cliraop_path return cliraop_path except OSError: diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index fd5511e8..564796d7 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -71,7 +71,7 @@ PLAYER_CONFIG_ENTRIES = ( CONF_ENTRY_CROSSFADE_DURATION, ) -MASS_APP_ID = "46C1A819" # use the cast receiver app from philippe44 for now until we get our own +MASS_APP_ID = "C35B0678" # Monkey patch the Media controller here to store the queue items @@ -491,11 +491,7 @@ class ChromecastProvider(PlayerProvider): castplayer.player.elapsed_time = status.current_time # active source - if ( - status.content_id - and self.mass.streams.base_url in status.content_id - and castplayer.player_id in status.content_id - ): + if castplayer.cc.app_id == MASS_APP_ID: castplayer.player.active_source = castplayer.player_id else: castplayer.player.active_source = castplayer.cc.app_display_name diff --git a/music_assistant/server/providers/filesystem_local/base.py b/music_assistant/server/providers/filesystem_local/base.py index ba57c40a..baf575b6 100644 --- a/music_assistant/server/providers/filesystem_local/base.py +++ b/music_assistant/server/providers/filesystem_local/base.py @@ -295,6 +295,7 @@ class FileSystemProviderBase(MusicProvider): provider=self.instance_id, name=item.name, ) + await asyncio.sleep(0) # yield to eventloop async def sync_library(self, media_types: tuple[MediaType, ...]) -> None: """Run library sync for this provider.""" @@ -308,6 +309,7 @@ class FileSystemProviderBase(MusicProvider): if x.provider_instance == self.instance_id ) prev_checksums[file_name] = db_item.metadata.checksum + await asyncio.sleep(0) # yield to eventloop # process all deleted (or renamed) files first cur_filenames = set() @@ -320,6 +322,7 @@ class FileSystemProviderBase(MusicProvider): # unsupported file extension continue cur_filenames.add(item.path) + await asyncio.sleep(0) # yield to eventloop # work out deletions deleted_files = set(prev_checksums.keys()) - cur_filenames await self._process_deletions(deleted_files) diff --git a/music_assistant/server/providers/filesystem_smb/__init__.py b/music_assistant/server/providers/filesystem_smb/__init__.py index 2ba4ea5d..9178fc0f 100644 --- a/music_assistant/server/providers/filesystem_smb/__init__.py +++ b/music_assistant/server/providers/filesystem_smb/__init__.py @@ -2,7 +2,6 @@ from __future__ import annotations -import asyncio import platform from collections.abc import AsyncGenerator from typing import TYPE_CHECKING @@ -14,6 +13,7 @@ from music_assistant.common.models.errors import LoginFailed from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME from music_assistant.server.helpers.audio import get_file_stream +from music_assistant.server.helpers.process import check_output from music_assistant.server.providers.filesystem_local import ( CONF_ENTRY_MISSING_ALBUM_ARTIST, LocalFileSystemProvider, @@ -204,21 +204,13 @@ class SMBFileSystemProvider(LocalFileSystemProvider): self.logger.info("Mounting //%s/%s%s to %s", server, share, subfolder, self.base_path) self.logger.debug("Using mount command: %s", mount_cmd.replace(password, "########")) - proc = await asyncio.create_subprocess_shell( - mount_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - _, stderr = await proc.communicate() - if proc.returncode != 0: - msg = f"SMB mount failed with error: {stderr.decode()}" + returncode, output = await check_output(mount_cmd) + if returncode != 0: + msg = f"SMB mount failed with error: {output.decode()}" raise LoginFailed(msg) async def unmount(self, ignore_error: bool = False) -> None: """Unmount the remote share.""" - proc = await asyncio.create_subprocess_shell( - f"umount {self.base_path}", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - _, stderr = await proc.communicate() - if proc.returncode != 0 and not ignore_error: - self.logger.warning("SMB unmount failed with error: %s", stderr.decode()) + returncode, output = await check_output(f"umount {self.base_path}") + if returncode != 0 and not ignore_error: + self.logger.warning("SMB unmount failed with error: %s", output.decode()) diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index c4d3dacf..e15fc699 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -533,7 +533,9 @@ class SlimprotoProvider(PlayerProvider): # so we debounce the resync a bit here with a timer self.mass.call_later( 1, - self.mass.player_queues.resume(active_queue.queue_id, fade_in=False), + self.mass.player_queues.resume, + active_queue.queue_id, + fade_in=False, task_id=f"resume_{active_queue.queue_id}", ) else: diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index a180dc96..330933f6 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -380,8 +380,8 @@ class SnapCastProvider(PlayerProvider): input_format=input_format, output_format=DEFAULT_SNAPCAST_FORMAT, filter_params=get_player_filter_params(self.mass, player_id), - name="snapcast_ffmpeg", audio_output=f"tcp://{host}:{port}", + logger=self.logger.getChild("ffmpeg"), ) as ffmpeg_proc: await ffmpeg_proc.wait() # we need to wait a bit for the stream status to become idle @@ -505,7 +505,7 @@ class SnapCastProvider(PlayerProvider): "--tcp.enabled=true", "--tcp.port=1705", ] - async with AsyncProcess(args, stdin=False, stdout=True, stderr=False) as snapserver_proc: + async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc: # keep reading from stdout until exit async for data in snapserver_proc.iter_any(): data = data.decode().strip() # noqa: PLW2901 diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 9bf688ae..85fff758 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -47,7 +47,7 @@ from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME from music_assistant.server.helpers.app_vars import app_var # pylint: enable=no-name-in-module -from music_assistant.server.helpers.process import AsyncProcess +from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.models.music_provider import MusicProvider if TYPE_CHECKING: @@ -430,7 +430,7 @@ class SpotifyProvider(MusicProvider): if self._ap_workaround: args += ["--ap-port", "12345"] bytes_sent = 0 - async with AsyncProcess(args, stdout=True) as librespot_proc: + async with AsyncProcess(args, stdout=True, name="librespot") as librespot_proc: async for chunk in librespot_proc.iter_any(): yield chunk bytes_sent += len(chunk) @@ -677,9 +677,8 @@ class SpotifyProvider(MusicProvider): ] if self._ap_workaround: args += ["--ap-port", "12345"] - async with AsyncProcess(args, stdout=True) as librespot: - stdout = await librespot.read(-1) - if stdout.decode().strip() != "authorized": + _returncode, output = await check_output(args) + if _returncode == 0 and output.decode().strip() != "authorized": raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}") # get token with (authorized) librespot scopes = [ @@ -713,16 +712,15 @@ class SpotifyProvider(MusicProvider): ] if self._ap_workaround: args += ["--ap-port", "12345"] - async with AsyncProcess(args, stdout=True) as librespot: - stdout = await librespot.read(-1) + _returncode, output = await check_output(args) duration = round(time.time() - time_start, 2) try: - result = json.loads(stdout) + result = json.loads(output) except JSONDecodeError: self.logger.warning( "Error while retrieving Spotify token after %s seconds, details: %s", duration, - stdout.decode(), + output.decode(), ) return None self.logger.debug( @@ -847,15 +845,8 @@ class SpotifyProvider(MusicProvider): async def check_librespot(librespot_path: str) -> str | None: try: - librespot = await asyncio.create_subprocess_exec( - *[librespot_path, "--check"], stdout=asyncio.subprocess.PIPE - ) - stdout, _ = await librespot.communicate() - if ( - librespot.returncode == 0 - and b"ok spotty" in stdout - and b"using librespot" in stdout - ): + returncode, output = await check_output([librespot_path, "--check"]) + if returncode == 0 and b"ok spotty" in output and b"using librespot" in output: self._librespot_bin = librespot_path return librespot_path except OSError: -- 2.34.1