From 94f73ebe58fa8482891a2e24245bbaa5b643d847 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 19 Feb 2021 00:32:25 +0100 Subject: [PATCH] more robust handling of disconnected clients --- .github/workflows/test.yml | 2 +- music_assistant/__main__.py | 2 +- music_assistant/constants.py | 5 +- music_assistant/helpers/process.py | 34 +++---- music_assistant/managers/config.py | 10 --- music_assistant/managers/database.py | 18 ++++ music_assistant/managers/players.py | 25 +++--- music_assistant/managers/streams.py | 90 +++++++++++-------- music_assistant/mass.py | 6 +- music_assistant/models/player.py | 7 +- music_assistant/models/player_queue.py | 1 + .../providers/chromecast/player.py | 34 ------- .../providers/universal_group/__init__.py | 2 +- music_assistant/translations.json | 4 - 14 files changed, 108 insertions(+), 132 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 410d7058..28287732 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.7, 3.8] + python-version: [3.8, 3.9] steps: - uses: actions/checkout@v2 diff --git a/music_assistant/__main__.py b/music_assistant/__main__.py index e6fbb2f9..b33253fc 100755 --- a/music_assistant/__main__.py +++ b/music_assistant/__main__.py @@ -56,7 +56,7 @@ def main(): if not os.path.isdir(data_dir): os.makedirs(data_dir) # config debug settings if needed - if args.debug: + if args.debug or bool(os.environ.get("DEBUG")): logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 94803136..2a9a08f6 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -1,7 +1,7 @@ """All constants for Music Assistant.""" -__version__ = "0.1.2" -REQUIRED_PYTHON_VER = "3.7" +__version__ = "0.1.3" +REQUIRED_PYTHON_VER = "3.8" # configuration keys/attributes CONF_USERNAME = "username" @@ -13,7 +13,6 @@ CONF_TOKEN = "token" CONF_URL = "url" CONF_NAME = "name" CONF_CROSSFADE_DURATION = "crossfade_duration" -CONF_FALLBACK_GAIN_CORRECT = "fallback_gain_correct" CONF_GROUP_DELAY = "group_delay" CONF_VOLUME_CONTROL = "volume_control" CONF_POWER_CONTROL = "power_control" diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 79a740f8..661de90e 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -22,27 +22,27 @@ class AsyncProcess: self._proc = None self._process_args = process_args self._enable_write = enable_write - self._cancelled = False async def __aenter__(self) -> "AsyncProcess": """Enter context manager.""" self._proc = await asyncio.create_subprocess_exec( *self._process_args, stdin=asyncio.subprocess.PIPE if self._enable_write else None, - stdout=asyncio.subprocess.PIPE, - limit=4000000 + stdout=asyncio.subprocess.PIPE ) return self async def __aexit__(self, exc_type, exc_value, traceback) -> bool: """Exit context manager.""" - self._cancelled = True if self._proc.returncode is None: # prevent subprocess deadlocking, send terminate and read remaining bytes - if self._enable_write and self._proc.stdin.can_write_eof(): - self._proc.stdin.write_eof() - self._proc.terminate() - await self._proc.stdout.read() + await self.write_eof() + try: + self._proc.terminate() + await self._proc.stdout.read() + self._proc.kill() + except (ProcessLookupError, BrokenPipeError): + pass del self._proc async def iterate_chunks( @@ -57,8 +57,6 @@ class AsyncProcess: async def read(self, chunk_size: int = DEFAULT_CHUNKSIZE) -> bytes: """Read x bytes from the process stdout.""" - if self._cancelled: - raise asyncio.CancelledError() try: return await self._proc.stdout.readexactly(chunk_size) except asyncio.IncompleteReadError as err: @@ -66,25 +64,21 @@ class AsyncProcess: async def write(self, data: bytes) -> None: """Write data to process stdin.""" - if self._cancelled or not self._proc: - raise asyncio.CancelledError() - if self._proc.stdin.is_closing(): - raise asyncio.CancelledError() - self._proc.stdin.write(data) try: + self._proc.stdin.write(data) await self._proc.stdin.drain() except BrokenPipeError: pass async def write_eof(self) -> None: """Write eof to process.""" - if self._cancelled: - raise asyncio.CancelledError() - if self._proc.stdin.can_write_eof(): + if not (self._enable_write and self._proc.stdin.can_write_eof()): + return + try: self._proc.stdin.write_eof() + except BrokenPipeError: + pass async def communicate(self, input_data: Optional[bytes] = None) -> bytes: """Write bytes to process and read back results.""" - if self._cancelled: - raise asyncio.CancelledError() return await self._proc.communicate(input_data) diff --git a/music_assistant/managers/config.py b/music_assistant/managers/config.py index d64d37d3..53f2a921 100755 --- a/music_assistant/managers/config.py +++ b/music_assistant/managers/config.py @@ -11,7 +11,6 @@ from typing import Any, List from music_assistant.constants import ( CONF_CROSSFADE_DURATION, CONF_ENABLED, - CONF_FALLBACK_GAIN_CORRECT, CONF_GROUP_DELAY, CONF_KEY_BASE, CONF_KEY_METADATA_PROVIDERS, @@ -80,15 +79,6 @@ DEFAULT_PLAYER_CONFIG_ENTRIES = [ description="desc_target_volume", depends_on=CONF_VOLUME_NORMALISATION, ), - ConfigEntry( - entry_key=CONF_FALLBACK_GAIN_CORRECT, - entry_type=ConfigEntryType.INT, - range=(-20, 0), - default_value=-12, - label=CONF_FALLBACK_GAIN_CORRECT, - description="desc_gain_correct", - depends_on=CONF_VOLUME_NORMALISATION, - ), ConfigEntry( entry_key=CONF_CROSSFADE_DURATION, entry_type=ConfigEntryType.INT, diff --git a/music_assistant/managers/database.py b/music_assistant/managers/database.py index a3151218..a6a9aeb9 100755 --- a/music_assistant/managers/database.py +++ b/music_assistant/managers/database.py @@ -2,6 +2,7 @@ # pylint: disable=too-many-lines import logging import os +import statistics from datetime import datetime from typing import List, Optional, Set, Union @@ -842,6 +843,23 @@ class DatabaseManager: return result[0] return None + async def get_provider_loudness(self, provider) -> Optional[float]: + """Get average integrated loudness for tracks of given provider.""" + all_items = [] + async with aiosqlite.connect(self._dbfile, timeout=120) as db_conn: + sql_query = """SELECT loudness FROM track_loudness WHERE provider = ?""" + async with db_conn.execute(sql_query, (provider,)) as cursor: + result = await cursor.fetchone() + if result: + return result[0] + sql_query = """SELECT loudness FROM track_loudness WHERE provider = ?""" + async with aiosqlite.connect(self._dbfile, timeout=120) as db_conn: + for db_row in await db_conn.execute_fetchall(sql_query, (provider,)): + all_items.append(db_row[0]) + if all_items: + return statistics.fmean(all_items) + return None + async def mark_item_played(self, item_id: str, provider: str): """Mark item as played in playlog.""" timestamp = datetime.utcnow().timestamp() diff --git a/music_assistant/managers/players.py b/music_assistant/managers/players.py index bcdc462e..f3765efb 100755 --- a/music_assistant/managers/players.py +++ b/music_assistant/managers/players.py @@ -36,7 +36,6 @@ class PlayerManager: self._players = {} self._providers = {} self._player_queues = {} - self._poll_ticks = 0 self._controls = {} async def setup(self) -> None: @@ -50,21 +49,15 @@ class PlayerManager: for player in self: await player.on_remove() - @run_periodic(1) + @run_periodic(30) async def poll_task(self): """Check for updates on players that need to be polled.""" for player in self: if not player.player_state.available: continue - if player.should_poll and ( - self._poll_ticks >= POLL_INTERVAL - or player.state == PlaybackState.Playing - ): - await player.on_poll() - if self._poll_ticks >= POLL_INTERVAL: - self._poll_ticks = 0 - else: - self._poll_ticks += 1 + if not player.should_poll: + continue + await player.on_poll() @property def players(self) -> Dict[str, Player]: @@ -711,13 +704,15 @@ class PlayerManager: if not player_conf["volume_normalisation"]: return 0 target_gain = int(player_conf["target_volume"]) - fallback_gain = int(player_conf["fallback_gain_correct"]) track_loudness = await self.mass.database.get_track_loudness( item_id, provider_id ) if track_loudness is None: - gain_correct = fallback_gain - else: - gain_correct = target_gain - track_loudness + # fallback to provider average + track_loudness = await self.mass.database.get_provider_loudness(provider_id) + if track_loudness is None: + # fallback to some (hopefully sane) average value for now + track_loudness = -8.5 + gain_correct = target_gain - track_loudness gain_correct = round(gain_correct, 2) return gain_correct diff --git a/music_assistant/managers/streams.py b/music_assistant/managers/streams.py index 4900bb84..f24affa5 100755 --- a/music_assistant/managers/streams.py +++ b/music_assistant/managers/streams.py @@ -7,7 +7,6 @@ of music with crossfade/gapless support (queue stream). All audio is processed by the SoX executable, using various subprocess streams. """ import asyncio -import gc import logging import shlex import subprocess @@ -54,7 +53,7 @@ class StreamManager: output_format: SoxOutputFormat = SoxOutputFormat.FLAC, resample: Optional[int] = None, gain_db_adjust: Optional[float] = None, - chunk_size: int = 4000000, + chunk_size: int = 512000, ) -> AsyncGenerator[Tuple[bool, bytes], None]: """Get the sox manipulated audio data for the given streamdetails.""" # collect all args for sox @@ -93,21 +92,28 @@ class StreamManager: fill_buffer_task = self.mass.loop.create_task(fill_buffer()) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly - prev_chunk = b"" - async for chunk in sox_proc.iterate_chunks(chunk_size): - if len(chunk) < chunk_size: - # last chunk - yield (True, prev_chunk + chunk) - break - if prev_chunk: - yield (False, prev_chunk) - prev_chunk = chunk - await asyncio.wait([fill_buffer_task]) - LOGGER.debug( - "finished sox stream for: %s/%s", - streamdetails.provider, - streamdetails.item_id, - ) + try: + prev_chunk = b"" + async for chunk in sox_proc.iterate_chunks(chunk_size): + if prev_chunk: + yield (False, prev_chunk) + prev_chunk = chunk + # send last chunk + yield (True, prev_chunk) + except (asyncio.CancelledError, GeneratorExit) as err: + LOGGER.debug( + "get_sox_stream aborted for: %s/%s", + streamdetails.provider, + streamdetails.item_id, + ) + fill_buffer_task.cancel() + raise err + else: + LOGGER.debug( + "finished sox stream for: %s/%s", + streamdetails.provider, + streamdetails.item_id, + ) async def queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]: """Stream the PlayerQueue's tracks as constant feed in flac format.""" @@ -138,9 +144,21 @@ class StreamManager: fill_buffer_task = self.mass.loop.create_task(fill_buffer()) # start yielding audio chunks - async for chunk in sox_proc.iterate_chunks(): - yield chunk - await asyncio.wait([fill_buffer_task]) + try: + async for chunk in sox_proc.iterate_chunks(): + yield chunk + except (asyncio.CancelledError, GeneratorExit) as err: + LOGGER.debug( + "queue_stream_flac aborted for: %s", + player_id, + ) + fill_buffer_task.cancel() + raise err + else: + LOGGER.debug( + "finished queue_stream_flac for: %s", + player_id, + ) async def queue_stream_pcm( self, player_id, sample_rate=96000, bit_depth=32 @@ -204,10 +222,10 @@ class StreamManager: # HANDLE FIRST PART OF TRACK if not chunk and bytes_written == 0: # stream error: got empy first chunk - # prevent player queue get stuck by sending next track command - self.mass.add_job(player_queue.next()) LOGGER.error("Stream error on track %s", queue_track.item_id) - return + # prevent player queue get stuck by just skipping to the next track + queue_track.duration = 0 + continue if cur_chunk <= 2 and not last_fadeout_data: # no fadeout_part available so just pass it to the output directly yield chunk @@ -253,15 +271,17 @@ class StreamManager: # part is too short after the strip action # so we just use the entire original data last_part = prev_chunk + chunk - if len(last_part) < buffer_size: - LOGGER.warning( - "Not enough data for crossfade: %s", len(last_part) - ) if ( not player_queue.crossfade_enabled or len(last_part) < buffer_size ): - # crossfading is not enabled so just pass the (stripped) audio data + # crossfading is not enabled or not enough data, + # so just pass the (stripped) audio data + if not player_queue.crossfade_enabled: + LOGGER.warning( + "Not enough data for crossfade: %s", len(last_part) + ) + yield last_part bytes_written += len(last_part) del last_part @@ -272,8 +292,9 @@ class StreamManager: last_fadeout_data = last_part[-buffer_size:] remaining_bytes = last_part[:-buffer_size] # write remaining bytes - yield remaining_bytes - bytes_written += len(remaining_bytes) + if remaining_bytes: + yield remaining_bytes + bytes_written += len(remaining_bytes) del last_part del remaining_bytes del chunk @@ -299,14 +320,11 @@ class StreamManager: queue_track.name, player_id, ) - # run garbage collect manually to avoid too much memory fragmentation - self.mass.add_job(gc.collect) # end of queue reached, pass last fadeout bits to final output - yield last_fadeout_data + if last_fadeout_data: + yield last_fadeout_data del last_fadeout_data # END OF QUEUE STREAM - # run garbage collect manually to avoid too much memory fragmentation - self.mass.add_job(gc.collect) LOGGER.info("streaming of queue for player %s completed", player_id) async def stream_queue_item( @@ -364,7 +382,7 @@ class StreamManager: # stream from URL if stream_type == StreamType.URL: async with self.mass.http_session.get(stream_path) as response: - async for chunk in response.content.iter_any(): + async for chunk, _ in response.content.iter_chunks(): yield chunk if needs_analyze and len(audio_data) < 100000000: audio_data += chunk diff --git a/music_assistant/mass.py b/music_assistant/mass.py index 7fe71d70..253324e0 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -34,9 +34,7 @@ LOGGER = logging.getLogger("mass") def global_exception_handler(loop: asyncio.AbstractEventLoop, context: Dict) -> None: """Global exception handler.""" - LOGGER.exception( - "Caught exception: %s", context.get("exception", context["message"]) - ) + LOGGER.debug("Caught exception: %s", context.get("exception", context["message"])) if "Broken pipe" in str(context.get("exception")): # fix for the spamming subprocess return @@ -225,7 +223,7 @@ class MusicAssistant: include_unavailable: bool = False, ) -> Tuple[Provider]: """Return all providers, optionally filtered by type.""" - return ( + return tuple( item for item in self._providers.values() if (filter_type is None or item.type == filter_type) diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index ce311be7..ccfb1420 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -97,7 +97,7 @@ class PlayerState(DataClassDictMixin): is_group_player: bool = False group_childs: Set[str] = field(default_factory=set) device_info: DeviceInfo = field(default_factory=DeviceInfo) - updated_at: datetime = None + updated_at: datetime = datetime.now() group_parents: Set[str] = field(default_factory=set) features: Set[PlayerFeature] = field(default_factory=set) active_queue: str = None @@ -116,6 +116,8 @@ class PlayerState(DataClassDictMixin): setattr(self, key, new_val) if key != "updated_at": changed_keys.add(key) + if changed_keys: + self.updated_at = datetime.now() return changed_keys @@ -502,7 +504,7 @@ class Player: provider_id=self.provider_id, name=self._get_name(), powered=self._get_powered(), - state=self.state, + state=self._get_state(), available=self._get_available(), volume_level=self._get_volume_level(), elapsed_time=self.elapsed_time, @@ -513,7 +515,6 @@ class Player: group_parents=self._get_group_parents(), features=self.features, active_queue=self._get_active_queue(), - updated_at=datetime.now(), ) def to_dict(self) -> dict: diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 7daf74d2..83e0496c 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -536,6 +536,7 @@ class PlayerQueue: """Instance attributes as dict so it can be serialized to json.""" return { "queue_id": self.player.player_id, + "queue_name": self.player.player_state.name, "shuffle_enabled": self.shuffle_enabled, "repeat_enabled": self.repeat_enabled, "crossfade_enabled": self.crossfade_enabled, diff --git a/music_assistant/providers/chromecast/player.py b/music_assistant/providers/chromecast/player.py index 185996f0..10ca0fff 100644 --- a/music_assistant/providers/chromecast/player.py +++ b/music_assistant/providers/chromecast/player.py @@ -1,7 +1,6 @@ """Representation of a Cast device on the network.""" import logging import uuid -from datetime import datetime from typing import List, Optional import pychromecast @@ -112,30 +111,6 @@ class ChromecastPlayer(Player): # Not playing, return last reported seek time return self.media_status.current_time - @property - def elapsed_milliseconds(self) -> int: - """Return (realtime) elapsed time of current playing media in milliseconds.""" - if self.media_status is None or not ( - self.media_status.player_is_playing - or self.media_status.player_is_paused - or self.media_status.player_is_idle - ): - return 0 - if self.media_status.player_is_playing: - # Add time since last update - return int( - ( - self.media_status.current_time - + ( - datetime.utcnow().timestamp() - - self.media_status.last_updated.timestamp() - ) - ) - * 1000 - ) - # Not playing, return last reported seek time - return self.media_status.current_time * 1000 - @property def available(self) -> bool: """Return availablity state of this player.""" @@ -300,15 +275,6 @@ class ChromecastPlayer(Player): if self._cast_info.is_audio_group and new_available: self.mass.add_job(self._chromecast.mz_controller.update_members) - async def on_poll(self) -> None: - """Call when player is periodically polled by the player manager (should_poll=True).""" - if self.active_queue.startswith("group_player"): - # the group player wants very accurate elapsed_time state so we request it very often - await self.chromecast_command( - self._chromecast.media_controller.update_status - ) - self.update_state() - # ========== Service Calls ========== async def cmd_stop(self) -> None: diff --git a/music_assistant/providers/universal_group/__init__.py b/music_assistant/providers/universal_group/__init__.py index 920798dc..c8ce97ff 100644 --- a/music_assistant/providers/universal_group/__init__.py +++ b/music_assistant/providers/universal_group/__init__.py @@ -351,7 +351,7 @@ class GroupPlayer(Player): async def queue_stream_task(self): """Handle streaming queue to connected child players.""" ticks = 0 - while ticks < 60 and len(self.connected_clients) != len(self.group_childs): + while ticks < 60 and (len(self.connected_clients) != len(self.group_childs)): # TODO: Support situation where not all clients of the group are powered await asyncio.sleep(0.1) ticks += 1 diff --git a/music_assistant/translations.json b/music_assistant/translations.json index 282baa57..47ee13b9 100644 --- a/music_assistant/translations.json +++ b/music_assistant/translations.json @@ -9,7 +9,6 @@ "max_sample_rate": "Maximum sample rate", "volume_normalisation": "Enable Volume normalisation", "target_volume": "Target Volume level", - "fallback_gain_correct": "Fallback gain correction level", "desc_player_name": "Set a custom name for this player.", "crossfade_duration": "Enable crossfade", "group_delay": "Correction of groupdelay", @@ -21,7 +20,6 @@ "desc_sample_rate": "Set the maximum sample rate this player can handle.", "desc_volume_normalisation": "Enable R128 volume normalisation to play music at an equally loud volume.", "desc_target_volume": "Set the preferred target volume level in LUFS. The R128 default is -22 LUFS.", - "desc_gain_correct": "Set a fallback gain correction when there is no R128 measurement available.", "desc_crossfade": "Enable crossfading of Queue tracks by setting a crossfade duration in seconds.", "desc_enable_provider": "Enable this provider.", "desc_base_username": "Username to access this Music Assistant server.", @@ -40,7 +38,6 @@ "max_sample_rate": "Maximale sample rate", "volume_normalisation": "Volume normalisering inschakelen", "target_volume": "Doel volume", - "fallback_gain_correct": "Fallback gain correctie niveau", "desc_player_name": "Stel een aangepaste naam in voor deze speler.", "crossfade_duration": "Crossfade inschakelen", "security": "Beveiliging", @@ -52,7 +49,6 @@ "desc_sample_rate": "Stel de maximale sample rate in die deze speler aankan.", "desc_volume_normalisation": "R128 volume normalisatie inschakelen om muziek altijd op een gelijk volume af te spelen.", "desc_target_volume": "Selecteer het gewenste doelvolume in LUFS. De R128 standaard is -22 LUFS.", - "desc_gain_correct": "Stel een fallback gain correctie in als er geen R128 meting beschikbaar is.", "desc_crossfade": "Crossfade inschakelen door het instellen van een crossfade duur in seconden.", "desc_enable_provider": "Deze provider inschakelen.", "desc_base_username": "Gebruikersnaam waarmee deze server beveiligd moet worden.", -- 2.34.1