more robust handling of disconnected clients
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 18 Feb 2021 23:32:25 +0000 (00:32 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 18 Feb 2021 23:32:25 +0000 (00:32 +0100)
14 files changed:
.github/workflows/test.yml
music_assistant/__main__.py
music_assistant/constants.py
music_assistant/helpers/process.py
music_assistant/managers/config.py
music_assistant/managers/database.py
music_assistant/managers/players.py
music_assistant/managers/streams.py
music_assistant/mass.py
music_assistant/models/player.py
music_assistant/models/player_queue.py
music_assistant/providers/chromecast/player.py
music_assistant/providers/universal_group/__init__.py
music_assistant/translations.json

index 410d70581769f8f31aadc12c1e7bb2c5815d56f9..28287732930ac4ef3747b2762fd918848f907501 100644 (file)
@@ -14,7 +14,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.7, 3.8]
+        python-version: [3.8, 3.9]
 
     steps:
       - uses: actions/checkout@v2
index e6fbb2f97bbb3ef33200897716180c71df9912ce..b33253fc1ccdcb7970a8fc62d89a72fc9f9d1bfa 100755 (executable)
@@ -56,7 +56,7 @@ def main():
     if not os.path.isdir(data_dir):
         os.makedirs(data_dir)
     # config debug settings if needed
-    if args.debug:
+    if args.debug or bool(os.environ.get("DEBUG")):
         logger.setLevel(logging.DEBUG)
     else:
         logger.setLevel(logging.INFO)
index 948031368cb220405d08834b4a10414e28542cb0..2a9a08f683d7c69a92dfc7e374bf0ff64dc753dc 100755 (executable)
@@ -1,7 +1,7 @@
 """All constants for Music Assistant."""
 
-__version__ = "0.1.2"
-REQUIRED_PYTHON_VER = "3.7"
+__version__ = "0.1.3"
+REQUIRED_PYTHON_VER = "3.8"
 
 # configuration keys/attributes
 CONF_USERNAME = "username"
@@ -13,7 +13,6 @@ CONF_TOKEN = "token"
 CONF_URL = "url"
 CONF_NAME = "name"
 CONF_CROSSFADE_DURATION = "crossfade_duration"
-CONF_FALLBACK_GAIN_CORRECT = "fallback_gain_correct"
 CONF_GROUP_DELAY = "group_delay"
 CONF_VOLUME_CONTROL = "volume_control"
 CONF_POWER_CONTROL = "power_control"
index 79a740f84ecb1d2c91b9394533e934b764efaa5d..661de90ed15789d575a8e6f8cc2ffc1498b1e3bb 100644 (file)
@@ -22,27 +22,27 @@ class AsyncProcess:
         self._proc = None
         self._process_args = process_args
         self._enable_write = enable_write
-        self._cancelled = False
 
     async def __aenter__(self) -> "AsyncProcess":
         """Enter context manager."""
         self._proc = await asyncio.create_subprocess_exec(
             *self._process_args,
             stdin=asyncio.subprocess.PIPE if self._enable_write else None,
-            stdout=asyncio.subprocess.PIPE,
-            limit=4000000
+            stdout=asyncio.subprocess.PIPE
         )
         return self
 
     async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
         """Exit context manager."""
-        self._cancelled = True
         if self._proc.returncode is None:
             # prevent subprocess deadlocking, send terminate and read remaining bytes
-            if self._enable_write and self._proc.stdin.can_write_eof():
-                self._proc.stdin.write_eof()
-            self._proc.terminate()
-            await self._proc.stdout.read()
+            await self.write_eof()
+            try:
+                self._proc.terminate()
+                await self._proc.stdout.read()
+                self._proc.kill()
+            except (ProcessLookupError, BrokenPipeError):
+                pass
         del self._proc
 
     async def iterate_chunks(
@@ -57,8 +57,6 @@ class AsyncProcess:
 
     async def read(self, chunk_size: int = DEFAULT_CHUNKSIZE) -> bytes:
         """Read x bytes from the process stdout."""
-        if self._cancelled:
-            raise asyncio.CancelledError()
         try:
             return await self._proc.stdout.readexactly(chunk_size)
         except asyncio.IncompleteReadError as err:
@@ -66,25 +64,21 @@ class AsyncProcess:
 
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
-        if self._cancelled or not self._proc:
-            raise asyncio.CancelledError()
-        if self._proc.stdin.is_closing():
-            raise asyncio.CancelledError()
-        self._proc.stdin.write(data)
         try:
+            self._proc.stdin.write(data)
             await self._proc.stdin.drain()
         except BrokenPipeError:
             pass
 
     async def write_eof(self) -> None:
         """Write eof to process."""
-        if self._cancelled:
-            raise asyncio.CancelledError()
-        if self._proc.stdin.can_write_eof():
+        if not (self._enable_write and self._proc.stdin.can_write_eof()):
+            return
+        try:
             self._proc.stdin.write_eof()
+        except BrokenPipeError:
+            pass
 
     async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
         """Write bytes to process and read back results."""
-        if self._cancelled:
-            raise asyncio.CancelledError()
         return await self._proc.communicate(input_data)
index d64d37d39c544cec577cd459c9f0c31fed3ef74f..53f2a9217158b2bfac6de7f1e5cd67d4aa1cb5e7 100755 (executable)
@@ -11,7 +11,6 @@ from typing import Any, List
 from music_assistant.constants import (
     CONF_CROSSFADE_DURATION,
     CONF_ENABLED,
-    CONF_FALLBACK_GAIN_CORRECT,
     CONF_GROUP_DELAY,
     CONF_KEY_BASE,
     CONF_KEY_METADATA_PROVIDERS,
@@ -80,15 +79,6 @@ DEFAULT_PLAYER_CONFIG_ENTRIES = [
         description="desc_target_volume",
         depends_on=CONF_VOLUME_NORMALISATION,
     ),
-    ConfigEntry(
-        entry_key=CONF_FALLBACK_GAIN_CORRECT,
-        entry_type=ConfigEntryType.INT,
-        range=(-20, 0),
-        default_value=-12,
-        label=CONF_FALLBACK_GAIN_CORRECT,
-        description="desc_gain_correct",
-        depends_on=CONF_VOLUME_NORMALISATION,
-    ),
     ConfigEntry(
         entry_key=CONF_CROSSFADE_DURATION,
         entry_type=ConfigEntryType.INT,
index a3151218f2c3a453700c3882424b8ba664d51496..a6a9aeb9165ab89ce3d83cf4b0c5b99ab2321c44 100755 (executable)
@@ -2,6 +2,7 @@
 # pylint: disable=too-many-lines
 import logging
 import os
+import statistics
 from datetime import datetime
 from typing import List, Optional, Set, Union
 
@@ -842,6 +843,23 @@ class DatabaseManager:
                 return result[0]
         return None
 
+    async def get_provider_loudness(self, provider) -> Optional[float]:
+        """Get average integrated loudness for tracks of given provider."""
+        all_items = []
+        async with aiosqlite.connect(self._dbfile, timeout=120) as db_conn:
+            sql_query = """SELECT loudness FROM track_loudness WHERE provider = ?"""
+            async with db_conn.execute(sql_query, (provider,)) as cursor:
+                result = await cursor.fetchone()
+            if result:
+                return result[0]
+        sql_query = """SELECT loudness FROM track_loudness WHERE provider = ?"""
+        async with aiosqlite.connect(self._dbfile, timeout=120) as db_conn:
+            for db_row in await db_conn.execute_fetchall(sql_query, (provider,)):
+                all_items.append(db_row[0])
+        if all_items:
+            return statistics.fmean(all_items)
+        return None
+
     async def mark_item_played(self, item_id: str, provider: str):
         """Mark item as played in playlog."""
         timestamp = datetime.utcnow().timestamp()
index bcdc462e01d7c79329d7c4a4c77858a60513a075..f3765efbaa50e499ce150611918c2958cfcdf420 100755 (executable)
@@ -36,7 +36,6 @@ class PlayerManager:
         self._players = {}
         self._providers = {}
         self._player_queues = {}
-        self._poll_ticks = 0
         self._controls = {}
 
     async def setup(self) -> None:
@@ -50,21 +49,15 @@ class PlayerManager:
         for player in self:
             await player.on_remove()
 
-    @run_periodic(1)
+    @run_periodic(30)
     async def poll_task(self):
         """Check for updates on players that need to be polled."""
         for player in self:
             if not player.player_state.available:
                 continue
-            if player.should_poll and (
-                self._poll_ticks >= POLL_INTERVAL
-                or player.state == PlaybackState.Playing
-            ):
-                await player.on_poll()
-        if self._poll_ticks >= POLL_INTERVAL:
-            self._poll_ticks = 0
-        else:
-            self._poll_ticks += 1
+            if not player.should_poll:
+                continue
+            await player.on_poll()
 
     @property
     def players(self) -> Dict[str, Player]:
@@ -711,13 +704,15 @@ class PlayerManager:
         if not player_conf["volume_normalisation"]:
             return 0
         target_gain = int(player_conf["target_volume"])
-        fallback_gain = int(player_conf["fallback_gain_correct"])
         track_loudness = await self.mass.database.get_track_loudness(
             item_id, provider_id
         )
         if track_loudness is None:
-            gain_correct = fallback_gain
-        else:
-            gain_correct = target_gain - track_loudness
+            # fallback to provider average
+            track_loudness = await self.mass.database.get_provider_loudness(provider_id)
+            if track_loudness is None:
+                # fallback to some (hopefully sane) average value for now
+                track_loudness = -8.5
+        gain_correct = target_gain - track_loudness
         gain_correct = round(gain_correct, 2)
         return gain_correct
index 4900bb84228ba65b1f89b14468d5ed9e17b8cfd5..f24affa56e859663b7c93242e90c56f0912a03ec 100755 (executable)
@@ -7,7 +7,6 @@ of music with crossfade/gapless support (queue stream).
 All audio is processed by the SoX executable, using various subprocess streams.
 """
 import asyncio
-import gc
 import logging
 import shlex
 import subprocess
@@ -54,7 +53,7 @@ class StreamManager:
         output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
         resample: Optional[int] = None,
         gain_db_adjust: Optional[float] = None,
-        chunk_size: int = 4000000,
+        chunk_size: int = 512000,
     ) -> AsyncGenerator[Tuple[bool, bytes], None]:
         """Get the sox manipulated audio data for the given streamdetails."""
         # collect all args for sox
@@ -93,21 +92,28 @@ class StreamManager:
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())
             # yield chunks from stdout
             # we keep 1 chunk behind to detect end of stream properly
-            prev_chunk = b""
-            async for chunk in sox_proc.iterate_chunks(chunk_size):
-                if len(chunk) < chunk_size:
-                    # last chunk
-                    yield (True, prev_chunk + chunk)
-                    break
-                if prev_chunk:
-                    yield (False, prev_chunk)
-                prev_chunk = chunk
-            await asyncio.wait([fill_buffer_task])
-            LOGGER.debug(
-                "finished sox stream for: %s/%s",
-                streamdetails.provider,
-                streamdetails.item_id,
-            )
+            try:
+                prev_chunk = b""
+                async for chunk in sox_proc.iterate_chunks(chunk_size):
+                    if prev_chunk:
+                        yield (False, prev_chunk)
+                    prev_chunk = chunk
+                # send last chunk
+                yield (True, prev_chunk)
+            except (asyncio.CancelledError, GeneratorExit) as err:
+                LOGGER.debug(
+                    "get_sox_stream aborted for: %s/%s",
+                    streamdetails.provider,
+                    streamdetails.item_id,
+                )
+                fill_buffer_task.cancel()
+                raise err
+            else:
+                LOGGER.debug(
+                    "finished sox stream for: %s/%s",
+                    streamdetails.provider,
+                    streamdetails.item_id,
+                )
 
     async def queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]:
         """Stream the PlayerQueue's tracks as constant feed in flac format."""
@@ -138,9 +144,21 @@ class StreamManager:
             fill_buffer_task = self.mass.loop.create_task(fill_buffer())
 
             # start yielding audio chunks
-            async for chunk in sox_proc.iterate_chunks():
-                yield chunk
-            await asyncio.wait([fill_buffer_task])
+            try:
+                async for chunk in sox_proc.iterate_chunks():
+                    yield chunk
+            except (asyncio.CancelledError, GeneratorExit) as err:
+                LOGGER.debug(
+                    "queue_stream_flac aborted for: %s",
+                    player_id,
+                )
+                fill_buffer_task.cancel()
+                raise err
+            else:
+                LOGGER.debug(
+                    "finished queue_stream_flac for: %s",
+                    player_id,
+                )
 
     async def queue_stream_pcm(
         self, player_id, sample_rate=96000, bit_depth=32
@@ -204,10 +222,10 @@ class StreamManager:
                 # HANDLE FIRST PART OF TRACK
                 if not chunk and bytes_written == 0:
                     # stream error: got empy first chunk
-                    # prevent player queue get stuck by sending next track command
-                    self.mass.add_job(player_queue.next())
                     LOGGER.error("Stream error on track %s", queue_track.item_id)
-                    return
+                    # prevent player queue get stuck by just skipping to the next track
+                    queue_track.duration = 0
+                    continue
                 if cur_chunk <= 2 and not last_fadeout_data:
                     # no fadeout_part available so just pass it to the output directly
                     yield chunk
@@ -253,15 +271,17 @@ class StreamManager:
                         # part is too short after the strip action
                         # so we just use the entire original data
                         last_part = prev_chunk + chunk
-                        if len(last_part) < buffer_size:
-                            LOGGER.warning(
-                                "Not enough data for crossfade: %s", len(last_part)
-                            )
                     if (
                         not player_queue.crossfade_enabled
                         or len(last_part) < buffer_size
                     ):
-                        # crossfading is not enabled so just pass the (stripped) audio data
+                        # crossfading is not enabled or not enough data,
+                        # so just pass the (stripped) audio data
+                        if not player_queue.crossfade_enabled:
+                            LOGGER.warning(
+                                "Not enough data for crossfade: %s", len(last_part)
+                            )
+
                         yield last_part
                         bytes_written += len(last_part)
                         del last_part
@@ -272,8 +292,9 @@ class StreamManager:
                         last_fadeout_data = last_part[-buffer_size:]
                         remaining_bytes = last_part[:-buffer_size]
                         # write remaining bytes
-                        yield remaining_bytes
-                        bytes_written += len(remaining_bytes)
+                        if remaining_bytes:
+                            yield remaining_bytes
+                            bytes_written += len(remaining_bytes)
                         del last_part
                         del remaining_bytes
                         del chunk
@@ -299,14 +320,11 @@ class StreamManager:
                 queue_track.name,
                 player_id,
             )
-            # run garbage collect manually to avoid too much memory fragmentation
-            self.mass.add_job(gc.collect)
         # end of queue reached, pass last fadeout bits to final output
-        yield last_fadeout_data
+        if last_fadeout_data:
+            yield last_fadeout_data
         del last_fadeout_data
         # END OF QUEUE STREAM
-        # run garbage collect manually to avoid too much memory fragmentation
-        self.mass.add_job(gc.collect)
         LOGGER.info("streaming of queue for player %s completed", player_id)
 
     async def stream_queue_item(
@@ -364,7 +382,7 @@ class StreamManager:
         # stream from URL
         if stream_type == StreamType.URL:
             async with self.mass.http_session.get(stream_path) as response:
-                async for chunk in response.content.iter_any():
+                async for chunk, _ in response.content.iter_chunks():
                     yield chunk
                     if needs_analyze and len(audio_data) < 100000000:
                         audio_data += chunk
index 7fe71d7031c10daa2c09a9f04bac6765d6074feb..253324e02e3d4ac32e12bdf2273aea2a6995e5b4 100644 (file)
@@ -34,9 +34,7 @@ LOGGER = logging.getLogger("mass")
 
 def global_exception_handler(loop: asyncio.AbstractEventLoop, context: Dict) -> None:
     """Global exception handler."""
-    LOGGER.exception(
-        "Caught exception: %s", context.get("exception", context["message"])
-    )
+    LOGGER.debug("Caught exception: %s", context.get("exception", context["message"]))
     if "Broken pipe" in str(context.get("exception")):
         # fix for the spamming subprocess
         return
@@ -225,7 +223,7 @@ class MusicAssistant:
         include_unavailable: bool = False,
     ) -> Tuple[Provider]:
         """Return all providers, optionally filtered by type."""
-        return (
+        return tuple(
             item
             for item in self._providers.values()
             if (filter_type is None or item.type == filter_type)
index ce311be779e3b9ecfe38dd3e5af3d3b46eb5fcc2..ccfb142012f831fe4cc7e0a924d7b4808b2baec6 100755 (executable)
@@ -97,7 +97,7 @@ class PlayerState(DataClassDictMixin):
     is_group_player: bool = False
     group_childs: Set[str] = field(default_factory=set)
     device_info: DeviceInfo = field(default_factory=DeviceInfo)
-    updated_at: datetime = None
+    updated_at: datetime = datetime.now()
     group_parents: Set[str] = field(default_factory=set)
     features: Set[PlayerFeature] = field(default_factory=set)
     active_queue: str = None
@@ -116,6 +116,8 @@ class PlayerState(DataClassDictMixin):
                 setattr(self, key, new_val)
                 if key != "updated_at":
                     changed_keys.add(key)
+        if changed_keys:
+            self.updated_at = datetime.now()
         return changed_keys
 
 
@@ -502,7 +504,7 @@ class Player:
             provider_id=self.provider_id,
             name=self._get_name(),
             powered=self._get_powered(),
-            state=self.state,
+            state=self._get_state(),
             available=self._get_available(),
             volume_level=self._get_volume_level(),
             elapsed_time=self.elapsed_time,
@@ -513,7 +515,6 @@ class Player:
             group_parents=self._get_group_parents(),
             features=self.features,
             active_queue=self._get_active_queue(),
-            updated_at=datetime.now(),
         )
 
     def to_dict(self) -> dict:
index 7daf74d23818bc402a4be1a21dd493b9ff029aa4..83e0496ccd4e38b45631202e19875495af0f08b7 100755 (executable)
@@ -536,6 +536,7 @@ class PlayerQueue:
         """Instance attributes as dict so it can be serialized to json."""
         return {
             "queue_id": self.player.player_id,
+            "queue_name": self.player.player_state.name,
             "shuffle_enabled": self.shuffle_enabled,
             "repeat_enabled": self.repeat_enabled,
             "crossfade_enabled": self.crossfade_enabled,
index 185996f0a4ce92771a850375effb761249231fba..10ca0fffceb02a12df16075a61d6e67798061d0f 100644 (file)
@@ -1,7 +1,6 @@
 """Representation of a Cast device on the network."""
 import logging
 import uuid
-from datetime import datetime
 from typing import List, Optional
 
 import pychromecast
@@ -112,30 +111,6 @@ class ChromecastPlayer(Player):
         # Not playing, return last reported seek time
         return self.media_status.current_time
 
-    @property
-    def elapsed_milliseconds(self) -> int:
-        """Return (realtime) elapsed time of current playing media in milliseconds."""
-        if self.media_status is None or not (
-            self.media_status.player_is_playing
-            or self.media_status.player_is_paused
-            or self.media_status.player_is_idle
-        ):
-            return 0
-        if self.media_status.player_is_playing:
-            # Add time since last update
-            return int(
-                (
-                    self.media_status.current_time
-                    + (
-                        datetime.utcnow().timestamp()
-                        - self.media_status.last_updated.timestamp()
-                    )
-                )
-                * 1000
-            )
-        # Not playing, return last reported seek time
-        return self.media_status.current_time * 1000
-
     @property
     def available(self) -> bool:
         """Return availablity state of this player."""
@@ -300,15 +275,6 @@ class ChromecastPlayer(Player):
             if self._cast_info.is_audio_group and new_available:
                 self.mass.add_job(self._chromecast.mz_controller.update_members)
 
-    async def on_poll(self) -> None:
-        """Call when player is periodically polled by the player manager (should_poll=True)."""
-        if self.active_queue.startswith("group_player"):
-            # the group player wants very accurate elapsed_time state so we request it very often
-            await self.chromecast_command(
-                self._chromecast.media_controller.update_status
-            )
-        self.update_state()
-
     # ========== Service Calls ==========
 
     async def cmd_stop(self) -> None:
index 920798dc0623f6d78a311751e97cf09a95ad032b..c8ce97ffd15c5b07acd0250d8c69503fd531f1a8 100644 (file)
@@ -351,7 +351,7 @@ class GroupPlayer(Player):
     async def queue_stream_task(self):
         """Handle streaming queue to connected child players."""
         ticks = 0
-        while ticks < 60 and len(self.connected_clients) != len(self.group_childs):
+        while ticks < 60 and (len(self.connected_clients) != len(self.group_childs)):
             # TODO: Support situation where not all clients of the group are powered
             await asyncio.sleep(0.1)
             ticks += 1
index 282baa5712f96ef8aafbebf997ba9c8860d4b747..47ee13b91f7ab7ddfd96d831f5da101fe00857f2 100644 (file)
@@ -9,7 +9,6 @@
     "max_sample_rate": "Maximum sample rate",
     "volume_normalisation": "Enable Volume normalisation",
     "target_volume": "Target Volume level",
-    "fallback_gain_correct": "Fallback gain correction level",
     "desc_player_name": "Set a custom name for this player.",
     "crossfade_duration": "Enable crossfade",
     "group_delay": "Correction of groupdelay",
@@ -21,7 +20,6 @@
     "desc_sample_rate": "Set the maximum sample rate this player can handle.",
     "desc_volume_normalisation": "Enable R128 volume normalisation to play music at an equally loud volume.",
     "desc_target_volume": "Set the preferred target volume level in LUFS. The R128 default is -22 LUFS.",
-    "desc_gain_correct": "Set a fallback gain correction when there is no R128 measurement available.",
     "desc_crossfade": "Enable crossfading of Queue tracks by setting a crossfade duration in seconds.",
     "desc_enable_provider": "Enable this provider.",
     "desc_base_username": "Username to access this Music Assistant server.",
@@ -40,7 +38,6 @@
     "max_sample_rate": "Maximale sample rate",
     "volume_normalisation": "Volume normalisering inschakelen",
     "target_volume": "Doel volume",
-    "fallback_gain_correct": "Fallback gain correctie niveau",
     "desc_player_name": "Stel een aangepaste naam in voor deze speler.",
     "crossfade_duration": "Crossfade inschakelen",
     "security": "Beveiliging",
@@ -52,7 +49,6 @@
     "desc_sample_rate": "Stel de maximale sample rate in die deze speler aankan.",
     "desc_volume_normalisation": "R128 volume normalisatie inschakelen om muziek altijd op een gelijk volume af te spelen.",
     "desc_target_volume": "Selecteer het gewenste doelvolume in LUFS. De R128 standaard is -22 LUFS.",
-    "desc_gain_correct": "Stel een fallback gain correctie in als er geen R128 meting beschikbaar is.",
     "desc_crossfade": "Crossfade inschakelen door het instellen van een crossfade duur in seconden.",
     "desc_enable_provider": "Deze provider inschakelen.",
     "desc_base_username": "Gebruikersnaam waarmee deze server beveiligd moet worden.",