From a531f5739b784b95143f121e34e6f68a03d828c1 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 12 Apr 2024 13:08:49 +0200 Subject: [PATCH] Some small fixes and enhancements to audio streaming (#1219) --- music_assistant/common/models/player_queue.py | 2 +- .../server/controllers/player_queues.py | 5 ++++- music_assistant/server/controllers/streams.py | 8 ++++---- music_assistant/server/helpers/audio.py | 11 ++++++----- .../server/providers/chromecast/__init__.py | 15 ++++++++++++--- .../server/providers/chromecast/helpers.py | 16 ++++++++++++---- 6 files changed, 39 insertions(+), 18 deletions(-) diff --git a/music_assistant/common/models/player_queue.py b/music_assistant/common/models/player_queue.py index 28ceb298..45176099 100644 --- a/music_assistant/common/models/player_queue.py +++ b/music_assistant/common/models/player_queue.py @@ -39,7 +39,7 @@ class PlayerQueue(DataClassDictMixin): flow_mode: bool = False # flow_mode_start_index: index of the first item of the flow stream flow_mode_start_index: int = 0 - stream_finished: bool = False + stream_finished: bool | None = None @property def corrected_elapsed_time(self) -> float: diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 7970ee8c..facfb4a9 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -496,6 +496,7 @@ class PlayerQueuesController(CoreController): return queue = self._queues[queue_id] queue.radio_source = [] + queue.stream_finished = None if queue.state != PlayerState.IDLE: self.mass.create_task(self.stop(queue_id)) queue.current_index = None @@ -515,7 +516,7 @@ class PlayerQueuesController(CoreController): self.logger.warning("Ignore queue command: An announcement is in progress") return if queue := self.get(queue_id): - queue.stream_finished = False + queue.stream_finished = None # simply forward the command to underlying player await self.mass.players.cmd_stop(queue_id) @@ -682,6 +683,7 @@ class PlayerQueuesController(CoreController): ) next_index = self._get_next_index(queue_id, index, allow_repeat=False) queue.flow_mode = player_needs_flow_mode and next_index is not None + queue.stream_finished = False # get streamdetails - do this here to catch unavailable items early queue_item.streamdetails = await get_stream_details( self.mass, queue_item, seek_position=seek_position, fade_in=fade_in @@ -1100,6 +1102,7 @@ class PlayerQueuesController(CoreController): # player does not support enqueue next feature. # we wait for the player to stop after it reaches the end of the track if queue.stream_finished and queue.state == PlayerState.IDLE: + queue.stream_finished = None self.mass.create_task(_enqueue_next(queue.current_index, False)) return diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 30f48144..b07dd5e0 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -305,7 +305,8 @@ class StreamsController(CoreController): await resp.write(chunk) except (BrokenPipeError, ConnectionResetError): break - + if queue.stream_finished is not None: + queue.stream_finished = True return resp async def serve_queue_flow_stream(self, request: web.Request) -> web.Response: @@ -355,7 +356,6 @@ class StreamsController(CoreController): # all checks passed, start streaming! self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name) - queue.stream_finished = False # collect player specific ffmpeg args to re-encode the source PCM stream pcm_format = AudioFormat( @@ -409,7 +409,6 @@ class StreamsController(CoreController): length_b = chr(int(length / 16)).encode() await resp.write(length_b + metadata) - queue.stream_finished = True return resp async def serve_command_request(self, request: web.Request) -> web.Response: @@ -669,7 +668,8 @@ class StreamsController(CoreController): queue_track.streamdetails.duration += last_part_seconds del last_fadeout_part total_bytes_sent += bytes_written - queue.stream_finished = True + if queue.stream_finished is not None: + queue.stream_finished = True self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) async def get_announcement_stream( diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 8dfa838f..648cb3d1 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -143,11 +143,12 @@ class FFMpeg(AsyncProcess): self.proc.stdin.close() await asyncio.sleep(0) # yield to loop # abort existing readers on stdout first before we send communicate - if self.proc.stdout: - if self.proc.stdout._waiter is not None: - with suppress(asyncio.exceptions.InvalidStateError): - self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) - # read reamaing bytes to unblock pipe + waiter: asyncio.Future + if self.proc.stdout and (waiter := self.proc.stdout._waiter): + self.proc.stdout._waiter = None + if waiter and not waiter.done(): + waiter.set_exception(asyncio.CancelledError()) + # read remaining bytes to unblock pipe await self.read(-1) # wait for log task to complete that reads the remaining data from stderr with suppress(TimeoutError): diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 9e9459b6..f933b983 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -102,6 +102,8 @@ class CastPlayer: status_listener: CastStatusListener | None = None mz_controller: MultizoneController | None = None active_group: str | None = None + last_poll: float = 0 + flow_meta_checksum: str | None = None class ChromecastProvider(PlayerProvider): @@ -286,7 +288,10 @@ class ChromecastProvider(PlayerProvider): if not castplayer.cc.media_controller.is_active: return try: - await asyncio.to_thread(castplayer.cc.media_controller.update_status) + now = time.time() + if (now - castplayer.last_poll) >= 30: + castplayer.last_poll = now + await asyncio.to_thread(castplayer.cc.media_controller.update_status) await self.update_flow_metadata(castplayer) except ConnectionResetError as err: raise PlayerUnavailableError from err @@ -375,8 +380,8 @@ class ChromecastProvider(PlayerProvider): # originally/officially cast supports 96k sample rate # but it seems a (recent?) update broke this # for now use 48k as max sample rate to play safe - max_sample_rate=44100 if cast_info.is_audio_group else 48000, - supports_24bit=not cast_info.is_audio_group, + max_sample_rate=48000, + supports_24bit=True, enabled_by_default=enabled_by_default, ), ) @@ -612,6 +617,10 @@ class ChromecastProvider(PlayerProvider): album = "" artist = "" title = current_item.name + flow_meta_checksum = title + image_url + if castplayer.flow_meta_checksum == flow_meta_checksum: + return + castplayer.flow_meta_checksum = flow_meta_checksum queuedata = { "type": "PLAY", "mediaSessionId": media_controller.status.media_session_id, diff --git a/music_assistant/server/providers/chromecast/helpers.py b/music_assistant/server/providers/chromecast/helpers.py index ae8724ef..34fae5eb 100644 --- a/music_assistant/server/providers/chromecast/helpers.py +++ b/music_assistant/server/providers/chromecast/helpers.py @@ -10,6 +10,8 @@ from uuid import UUID from pychromecast import dial from pychromecast.const import CAST_TYPE_GROUP +from music_assistant.constants import VERBOSE_LOG_LEVEL + if TYPE_CHECKING: from pychromecast.controllers.media import MediaStatus from pychromecast.controllers.multizone import MultizoneManager @@ -192,8 +194,11 @@ class CastStatusListener: elif group_uuid == self.castplayer.active_group: self.castplayer.active_group = None self.castplayer.player.active_source = self.castplayer.player.player_id - self.prov.logger.debug( - "%s got new cast status for group: %s", self.castplayer.player.display_name, group_uuid + self.prov.logger.log( + VERBOSE_LOG_LEVEL, + "%s got new cast status for group: %s", + self.castplayer.player.display_name, + group_uuid, ) self.new_cast_status(self.castplayer.cc.status) @@ -201,8 +206,11 @@ class CastStatusListener: """Handle reception of a new MediaStatus for a group.""" if not self._valid: return - self.prov.logger.debug( - "%s got new media_status for group: %s", self.castplayer.player.display_name, group_uuid + self.prov.logger.log( + VERBOSE_LOG_LEVEL, + "%s got new media_status for group: %s", + self.castplayer.player.display_name, + group_uuid, ) def load_media_failed(self, queue_item_id, error_code) -> None: -- 2.34.1