Some small fixes and enhancements to audio streaming (#1219)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 12 Apr 2024 11:08:49 +0000 (13:08 +0200)
committerGitHub <noreply@github.com>
Fri, 12 Apr 2024 11:08:49 +0000 (13:08 +0200)
music_assistant/common/models/player_queue.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/providers/chromecast/__init__.py
music_assistant/server/providers/chromecast/helpers.py

index 28ceb298b06d96a4626ffba8c593559e08ef2b3e..45176099e01cdaa8587ce1d7298a5a35969a24c3 100644 (file)
@@ -39,7 +39,7 @@ class PlayerQueue(DataClassDictMixin):
     flow_mode: bool = False
     # flow_mode_start_index: index of the first item of the flow stream
     flow_mode_start_index: int = 0
-    stream_finished: bool = False
+    stream_finished: bool | None = None
 
     @property
     def corrected_elapsed_time(self) -> float:
index 7970ee8c5f05edd32113eeca487058238e64291d..facfb4a960c6cbae7ce3c5503e902bc9f3c5153f 100644 (file)
@@ -496,6 +496,7 @@ class PlayerQueuesController(CoreController):
             return
         queue = self._queues[queue_id]
         queue.radio_source = []
+        queue.stream_finished = None
         if queue.state != PlayerState.IDLE:
             self.mass.create_task(self.stop(queue_id))
         queue.current_index = None
@@ -515,7 +516,7 @@ class PlayerQueuesController(CoreController):
             self.logger.warning("Ignore queue command: An announcement is in progress")
             return
         if queue := self.get(queue_id):
-            queue.stream_finished = False
+            queue.stream_finished = None
         # simply forward the command to underlying player
         await self.mass.players.cmd_stop(queue_id)
 
@@ -682,6 +683,7 @@ class PlayerQueuesController(CoreController):
         )
         next_index = self._get_next_index(queue_id, index, allow_repeat=False)
         queue.flow_mode = player_needs_flow_mode and next_index is not None
+        queue.stream_finished = False
         # get streamdetails - do this here to catch unavailable items early
         queue_item.streamdetails = await get_stream_details(
             self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
@@ -1100,6 +1102,7 @@ class PlayerQueuesController(CoreController):
         # player does not support enqueue next feature.
         # we wait for the player to stop after it reaches the end of the track
         if queue.stream_finished and queue.state == PlayerState.IDLE:
+            queue.stream_finished = None
             self.mass.create_task(_enqueue_next(queue.current_index, False))
             return
 
index 30f4814463271862b64b0c196fa848d26d9aeefe..b07dd5e0743327dfaddd7f40a5b58947bb013641 100644 (file)
@@ -305,7 +305,8 @@ class StreamsController(CoreController):
                 await resp.write(chunk)
             except (BrokenPipeError, ConnectionResetError):
                 break
-
+        if queue.stream_finished is not None:
+            queue.stream_finished = True
         return resp
 
     async def serve_queue_flow_stream(self, request: web.Request) -> web.Response:
@@ -355,7 +356,6 @@ class StreamsController(CoreController):
 
         # all checks passed, start streaming!
         self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
-        queue.stream_finished = False
 
         # collect player specific ffmpeg args to re-encode the source PCM stream
         pcm_format = AudioFormat(
@@ -409,7 +409,6 @@ class StreamsController(CoreController):
             length_b = chr(int(length / 16)).encode()
             await resp.write(length_b + metadata)
 
-        queue.stream_finished = True
         return resp
 
     async def serve_command_request(self, request: web.Request) -> web.Response:
@@ -669,7 +668,8 @@ class StreamsController(CoreController):
             queue_track.streamdetails.duration += last_part_seconds
             del last_fadeout_part
         total_bytes_sent += bytes_written
-        queue.stream_finished = True
+        if queue.stream_finished is not None:
+            queue.stream_finished = True
         self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
 
     async def get_announcement_stream(
index 8dfa838ff0ca884bf9b9e017c94043c0f9bb73a5..648cb3d176d7438b2776aa6a8b9848789c37a2d4 100644 (file)
@@ -143,11 +143,12 @@ class FFMpeg(AsyncProcess):
             self.proc.stdin.close()
             await asyncio.sleep(0)  # yield to loop
         # abort existing readers on stdout first before we send communicate
-        if self.proc.stdout:
-            if self.proc.stdout._waiter is not None:
-                with suppress(asyncio.exceptions.InvalidStateError):
-                    self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
-            # read reamaing bytes to unblock pipe
+        waiter: asyncio.Future
+        if self.proc.stdout and (waiter := self.proc.stdout._waiter):
+            self.proc.stdout._waiter = None
+            if waiter and not waiter.done():
+                waiter.set_exception(asyncio.CancelledError())
+            # read remaining bytes to unblock pipe
             await self.read(-1)
         # wait for log task to complete that reads the remaining data from stderr
         with suppress(TimeoutError):
index 9e9459b6ae066422a171e6693f0eeeafb775a79c..f933b9837feba007c40188e9f61ffa8c526ed2f9 100644 (file)
@@ -102,6 +102,8 @@ class CastPlayer:
     status_listener: CastStatusListener | None = None
     mz_controller: MultizoneController | None = None
     active_group: str | None = None
+    last_poll: float = 0
+    flow_meta_checksum: str | None = None
 
 
 class ChromecastProvider(PlayerProvider):
@@ -286,7 +288,10 @@ class ChromecastProvider(PlayerProvider):
         if not castplayer.cc.media_controller.is_active:
             return
         try:
-            await asyncio.to_thread(castplayer.cc.media_controller.update_status)
+            now = time.time()
+            if (now - castplayer.last_poll) >= 30:
+                castplayer.last_poll = now
+                await asyncio.to_thread(castplayer.cc.media_controller.update_status)
             await self.update_flow_metadata(castplayer)
         except ConnectionResetError as err:
             raise PlayerUnavailableError from err
@@ -375,8 +380,8 @@ class ChromecastProvider(PlayerProvider):
                     # originally/officially cast supports 96k sample rate
                     # but it seems a (recent?) update broke this
                     # for now use 48k as max sample rate to play safe
-                    max_sample_rate=44100 if cast_info.is_audio_group else 48000,
-                    supports_24bit=not cast_info.is_audio_group,
+                    max_sample_rate=48000,
+                    supports_24bit=True,
                     enabled_by_default=enabled_by_default,
                 ),
             )
@@ -612,6 +617,10 @@ class ChromecastProvider(PlayerProvider):
                 album = ""
                 artist = ""
                 title = current_item.name
+            flow_meta_checksum = title + image_url
+            if castplayer.flow_meta_checksum == flow_meta_checksum:
+                return
+            castplayer.flow_meta_checksum = flow_meta_checksum
             queuedata = {
                 "type": "PLAY",
                 "mediaSessionId": media_controller.status.media_session_id,
index ae8724ef808f3e73aa83be1d1d42a6d83959a031..34fae5eb313e794313aa4a44059efde597c932bf 100644 (file)
@@ -10,6 +10,8 @@ from uuid import UUID
 from pychromecast import dial
 from pychromecast.const import CAST_TYPE_GROUP
 
+from music_assistant.constants import VERBOSE_LOG_LEVEL
+
 if TYPE_CHECKING:
     from pychromecast.controllers.media import MediaStatus
     from pychromecast.controllers.multizone import MultizoneManager
@@ -192,8 +194,11 @@ class CastStatusListener:
             elif group_uuid == self.castplayer.active_group:
                 self.castplayer.active_group = None
                 self.castplayer.player.active_source = self.castplayer.player.player_id
-        self.prov.logger.debug(
-            "%s got new cast status for group: %s", self.castplayer.player.display_name, group_uuid
+        self.prov.logger.log(
+            VERBOSE_LOG_LEVEL,
+            "%s got new cast status for group: %s",
+            self.castplayer.player.display_name,
+            group_uuid,
         )
         self.new_cast_status(self.castplayer.cc.status)
 
@@ -201,8 +206,11 @@ class CastStatusListener:
         """Handle reception of a new MediaStatus for a group."""
         if not self._valid:
             return
-        self.prov.logger.debug(
-            "%s got new media_status for group: %s", self.castplayer.player.display_name, group_uuid
+        self.prov.logger.log(
+            VERBOSE_LOG_LEVEL,
+            "%s got new media_status for group: %s",
+            self.castplayer.player.display_name,
+            group_uuid,
         )
 
     def load_media_failed(self, queue_item_id, error_code) -> None: