queue.elapsed_time_last_updated = time.time()
else:
# queue is active and player has one of our tracks loaded, update state
- if item_id := self._parse_player_current_item_id(queue_id, player.current_item_id):
+ if item_id := self._parse_player_current_item_id(queue_id, player):
queue.current_index = self.index_by_id(queue_id, item_id)
if player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
queue.elapsed_time = int(player.corrected_elapsed_time)
break
return queue_index, track_time
- def _parse_player_current_item_id(self, queue_id: str, current_item_id: str) -> str | None:
+ def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
"""Parse QueueItem ID from Player's current url."""
- if not current_item_id:
+ if not player.current_media:
return None
- if queue_id in current_item_id:
+ if player.current_media.queue_id and player.current_media.queue_id != queue_id:
+ return None
+ if player.current_media.queue_item_id:
+ return player.current_media.queue_item_id
+ if queue_id in player.current_media.uri:
# try to extract the item id from either a url or queue_id/item_id combi
- current_item_id = current_item_id.rsplit("/")[-1].split(".")[0]
- if self.get_item(queue_id, current_item_id):
- return current_item_id
+ current_item_id = player.current_media.uri.rsplit("/")[-1].split(".")[0]
+ if self.get_item(queue_id, current_item_id):
+ return current_item_id
return None
self.mass_player.state = PLAYBACK_STATE_MAP[active_group.playback_state]
self.mass_player.elapsed_time = active_group.position
- is_playing = active_group.playback_state in (
- SonosPlayBackState.PLAYBACK_STATE_PLAYING,
- SonosPlayBackState.PLAYBACK_STATE_BUFFERING,
- )
# figure out the active source based on the container
container_type = active_group.container_type
active_service = active_group.active_service
container = active_group.playback_metadata.get("container")
- if is_playing and container_type == ContainerType.LINEIN:
+ if container_type == ContainerType.LINEIN:
self.mass_player.active_source = SOURCE_LINE_IN
- elif is_playing and container_type == ContainerType.AIRPLAY:
+ elif container_type == ContainerType.AIRPLAY:
# check if the MA airplay player is active
airplay_player = self.mass.players.get(self.airplay_player_id)
if airplay_player and airplay_player.state in (
self.mass_player.active_source = airplay_player.active_source
else:
self.mass_player.active_source = SOURCE_AIRPLAY
- elif is_playing and container_type == ContainerType.STATION:
+ elif container_type == ContainerType.STATION:
self.mass_player.active_source = SOURCE_RADIO
- elif is_playing and active_service == MusicService.SPOTIFY:
+ elif active_service == MusicService.SPOTIFY:
self.mass_player.active_source = SOURCE_SPOTIFY
elif active_service == MusicService.MUSIC_ASSISTANT:
- if (
- active_group.active_session_id
- and container
- and (object_id := container.get("id", {}).get("objectId"))
- ):
+ if object_id := container.get("id", {}).get("objectId"):
self.mass_player.active_source = object_id.split(":")[-1]
- else:
- self.mass_player.active_source = None
- elif is_playing:
+ else:
# its playing some service we did not yet map
self.mass_player.active_source = active_service
- else:
- # all our (known) options exhausted, fallback to unknown
- self.mass_player.active_source = None
- if self.mass_player.active_source == self.player_id and active_group.active_session_id:
- # active source is the mass queue
- # media details are updated through the time played callback
- return
+ # sonos has this weirdness that it maps idle to paused
+ # which is annoying to figure out if we want to resume or let
+ # MA back in control again. So for now, we just map it to idle here.
+ if (
+ self.mass_player.state == PlayerState.PAUSED
+ and active_service != MusicService.MUSIC_ASSISTANT
+ ):
+ self.mass_player.state = PlayerState.IDLE
# parse current media
self.mass_player.elapsed_time = self.client.player.group.position
self.mass_player.elapsed_time_last_updated = time.time()
+ current_media = None
if (current_item := active_group.playback_metadata.get("currentItem")) and (
(track := current_item.get("track")) and track.get("name")
):
track_images = track.get("images", [])
track_image_url = track_images[0].get("url") if track_images else None
track_duration_millis = track.get("durationMillis")
- self.mass_player.current_media = PlayerMedia(
+ current_media = PlayerMedia(
uri=track.get("id", {}).get("objectId") or track.get("mediaUrl"),
title=track["name"],
artist=track.get("artist", {}).get("name"),
duration=track_duration_millis / 1000 if track_duration_millis else None,
image_url=track_image_url,
)
- elif (
- container and container.get("name") and active_group.playback_metadata.get("streamInfo")
- ):
+ if active_service == MusicService.MUSIC_ASSISTANT:
+ current_media.queue_id = self.mass_player.active_source
+ current_media.queue_item_id = current_item["id"]
+ # radio stream info
+ if container and container.get("name") and active_group.playback_metadata.get("streamInfo"):
images = container.get("images", [])
image_url = images[0].get("url") if images else None
- self.mass_player.current_media = PlayerMedia(
+ current_media = PlayerMedia(
uri=container.get("id", {}).get("objectId"),
title=active_group.playback_metadata["streamInfo"],
album=container["name"],
image_url=image_url,
)
- elif container and container.get("name") and container.get("id"):
- images = container.get("images", [])
- image_url = images[0].get("url") if images else None
- self.mass_player.current_media = PlayerMedia(
- uri=container["id"]["objectId"],
- title=container["name"],
- image_url=image_url,
- )
- else:
- self.mass_player.current_media = None
+ # generic info from container (also when MA is playing!)
+ if container and container.get("name") and container.get("id"):
+ if not current_media:
+ current_media = PlayerMedia(container["id"]["objectId"])
+ if not current_media.image_url:
+ images = container.get("images", [])
+ current_media.image_url = images[0].get("url") if images else None
+ if not current_media.title:
+ current_media.title = container["name"]
+ if not current_media.uri:
+ current_media.uri = container["id"]["objectId"]
+
+ self.mass_player.current_media = current_media
def _on_player_event(self, event: SonosEvent) -> None:
"""Handle incoming event from player."""
"type": "track",
"mediaUrl": self.mass.streams.resolve_stream_url(item),
"contentType": "audio/flac",
- "service": {"name": "Music Assistant", "id": "8", "accountId": ""},
+ "service": {
+ "name": "Music Assistant",
+ "id": "8",
+ "accountId": "",
+ "objectId": item.queue_item_id,
+ },
"name": item.name,
"imageUrl": self.mass.metadata.get_image_url(
item.image, prefer_proxy=False, image_format="jpeg"
for item in json_body["items"]:
if item["queueVersion"] != sonos_player.queue_version:
continue
+ if item["type"] != "update":
+ continue
if "positionMillis" not in item:
continue
mass_player.current_media = PlayerMedia(