from uuid import UUID
import pychromecast
-from pychromecast.controllers.bubbleupnp import BubbleUPNPController
-from pychromecast.controllers.media import STREAM_TYPE_BUFFERED, STREAM_TYPE_LIVE
+from pychromecast.controllers.media import STREAM_TYPE_BUFFERED, STREAM_TYPE_LIVE, MediaController
from pychromecast.controllers.multizone import MultizoneController, MultizoneManager
from pychromecast.discovery import CastBrowser, SimpleCastListener
from pychromecast.models import CastInfo
from music_assistant.server.models import ProviderInstanceType
-CONF_ALT_APP = "alt_app"
-
-
PLAYER_CONFIG_ENTRIES = (
ConfigEntry(
key=CONF_CROSSFADE,
"uses a 'flow mode' workaround for this at the cost of on-player metadata.",
advanced=False,
),
- ConfigEntry(
- key=CONF_ALT_APP,
- type=ConfigEntryType.BOOLEAN,
- label="Use alternate Media app",
- default_value=False,
- description="Using the BubbleUPNP Media controller for playback improves "
- "the playback experience but may not work on non-Google hardware.",
- advanced=True,
- ),
CONF_ENTRY_CROSSFADE_DURATION,
)
+# Monkey patch the Media controller here to store the queue items
+_patched_process_media_status_org = MediaController._process_media_status
+
+
+def _patched_process_media_status(self, data):
+ """Process STATUS message(s) of the media controller."""
+ _patched_process_media_status_org(self, data)
+ for status_msg in data.get("status", []):
+ if items := status_msg.get("items"):
+ self.status.items = items
+
+
+MediaController._process_media_status = _patched_process_media_status
+
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
if use_flow_mode:
# In flow mode, all queue tracks are sent to the player as continuous stream.
# This comes at the cost of metadata (cast does not support ICY metadata).
- await asyncio.to_thread(
- castplayer.cc.play_media,
- url,
- content_type=f'audio/{url.split(".")[-1].split("?")[0]}',
- title="Music Assistant",
- thumb=MASS_LOGO_ONLINE,
+ cc_queue_items = [
+ self._create_cc_queue_item(None, url),
+ ]
+ else:
+ # handle normal playback using the chromecast queue to play items one by one
+ cc_queue_items = [
+ self._create_cc_queue_item(queue_item, url),
+ ]
+ # add a special 'command' item to the queue
+ # this allows for on-player next buttons/commands to still work
+ cc_queue_items.append(
+ self._create_cc_queue_item(
+ None, self.mass.streams.get_command_url(queue_item.queue_id, "next")
)
- return
- # handle normal playback using the chromecast queue to play items one by one
- cc_queue_items = [self._create_cc_queue_item(queue_item, url)]
+ )
queuedata = {
"type": "QUEUE_LOAD",
"repeatMode": "REPEAT_OFF", # handled by our queue controller
queue_item=queue_item,
output_codec=ContentType.FLAC,
)
+ if cast_queue_items := getattr(castplayer.cc.media_controller.status, "items"):
+ next_item_id = cast_queue_items[-1]["itemId"]
queuedata = {
"type": "QUEUE_INSERT",
- "insertBefore": None,
+ "insertBefore": next_item_id,
"items": [self._create_cc_queue_item(queue_item, url)],
}
media_controller = castplayer.cc.media_controller
castplayer.player.elapsed_time = status.current_time
# active source
- if status.content_id and castplayer.player_id in status.content_id:
+ if status.content_id and castplayer.player_id in status.content_id: # noqa: SIM114
+ castplayer.player.active_source = castplayer.player_id
+ elif castplayer.cc.app_id == pychromecast.config.APP_MEDIA_RECEIVER:
castplayer.player.active_source = castplayer.player_id
else:
castplayer.player.active_source = castplayer.cc.app_display_name
async def _launch_app(self, castplayer: CastPlayer) -> None:
"""Launch the default Media Receiver App on a Chromecast."""
event = asyncio.Event()
- if use_alt_app := await self.mass.config.get_player_config_value(
- castplayer.player_id, CONF_ALT_APP
- ):
- app_id = pychromecast.config.APP_BUBBLEUPNP
- else:
- app_id = pychromecast.config.APP_MEDIA_RECEIVER
+ app_id = pychromecast.config.APP_MEDIA_RECEIVER
if castplayer.cc.app_id == app_id:
return # already active
# Quit the previous app before starting splash screen or media player
if castplayer.cc.app_id is not None:
castplayer.cc.quit_app()
- # Use BubbleUPNP media receiver app if configured
- # which enables a more rich display but does not work on all players
- # so its configurable to turn it on/off
- if use_alt_app:
- castplayer.logger.debug(
- "Launching BubbleUPNPController (%s) as active app.", app_id
- )
- controller = BubbleUPNPController()
- castplayer.cc.register_handler(controller)
- controller.launch(launched_callback)
- else:
- castplayer.logger.debug(
- "Launching Default Media Receiver (%s) as active app.", app_id
- )
- castplayer.cc.media_controller.launch(launched_callback)
+ castplayer.logger.debug("Launching Default Media Receiver (%s) as active app.", app_id)
+ castplayer.cc.media_controller.launch(launched_callback)
await self.mass.loop.run_in_executor(None, launch)
await event.wait()
castplayer.status_listener = None
self.castplayers.pop(castplayer.player_id, None)
- def _create_cc_queue_item(self, queue_item: QueueItem, stream_url: str):
+ def _create_cc_queue_item(self, queue_item: QueueItem | None, stream_url: str):
"""Create CC queue item from MA QueueItem."""
+ if queue_item is None:
+ # flow mode or other special type
+ return {
+ "autoplay": True,
+ "preloadTime": 10,
+ "startTime": 0,
+ "activeTrackIds": [],
+ "media": {
+ "contentId": stream_url,
+ "customData": {
+ "uri": stream_url,
+ "queue_item_id": stream_url,
+ },
+ "contentType": "audio/flac",
+ "streamType": STREAM_TYPE_LIVE,
+ "metadata": {
+ "metadataType": 0,
+ "title": "Music Assistant",
+ "images": [{"url": MASS_LOGO_ONLINE}],
+ },
+ "duration": None,
+ },
+ }
duration = int(queue_item.duration) if queue_item.duration else None
image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
if queue_item.media_type == MediaType.TRACK and queue_item.media_item: