active_groups = self.active_groups
return active_groups[0] if active_groups else None
- @property
+ @cached_property
@final
def current_media(self) -> PlayerMedia | None:
"""
if parent_player := self.mass.players.get(parent_player_id):
return parent_player.current_media
# if a pluginsource is currently active, return those details
- if self.active_source and (
- source := self.mass.players.get_plugin_source(self.active_source)
+ if (
+ self.active_source
+ and (source := self.mass.players.get_plugin_source(self.active_source))
+ and source.metadata
):
- return deepcopy(source.metadata)
+ return PlayerMedia(
+ uri=source.metadata.uri or source.id,
+ title=source.metadata.title,
+ artist=source.metadata.artist,
+ album=source.metadata.album,
+ image_url=source.metadata.image_url,
+ duration=source.metadata.duration,
+ source_id=source.id,
+ elapsed_time=source.metadata.elapsed_time,
+ elapsed_time_last_updated=source.metadata.elapsed_time_last_updated,
+ )
+ # if MA queue is active, return those details
+ active_queue = None
+ if self._current_media and self._current_media.source_id:
+ active_queue = self.mass.player_queues.get(self._current_media.source_id)
+ if not active_queue and self.active_source:
+ active_queue = self.mass.player_queues.get(self.active_source)
+
+ if active_queue and (current_item := active_queue.current_item):
+ item_image_url = (
+ self.mass.metadata.get_image_url(current_item.image) if current_item.image else None
+ )
+ if current_item.streamdetails and (
+ stream_metadata := current_item.streamdetails.stream_metadata
+ ):
+ # handle stream metadata in streamdetails (e.g. for radio stream)
+ return PlayerMedia(
+ uri=current_item.uri,
+ title=stream_metadata.title or current_item.name,
+ artist=stream_metadata.artist,
+ album=stream_metadata.album or current_item.name,
+ image_url=(stream_metadata.image_url or item_image_url),
+ duration=stream_metadata.duration or current_item.duration,
+ source_id=active_queue.queue_id,
+ queue_item_id=current_item.queue_item_id,
+ elapsed_time=stream_metadata.elapsed_time or int(active_queue.elapsed_time),
+ elapsed_time_last_updated=stream_metadata.elapsed_time_last_updated
+ or active_queue.elapsed_time_last_updated,
+ )
+ if media_item := current_item.media_item:
+ # normal media item
+ return PlayerMedia(
+ uri=str(media_item.uri),
+ title=media_item.name,
+ artist=getattr(media_item, "artist_str", None),
+ album=album.name if (album := getattr(media_item, "album", None)) else None,
+ image_url=self.mass.metadata.get_image_url(current_item.media_item.image)
+ or item_image_url
+ if current_item.media_item.image
+ else item_image_url,
+ duration=media_item.duration,
+ source_id=active_queue.queue_id,
+ queue_item_id=current_item.queue_item_id,
+ elapsed_time=int(active_queue.elapsed_time),
+ elapsed_time_last_updated=active_queue.elapsed_time_last_updated,
+ )
+ # fallback to basic current item details
+ return PlayerMedia(
+ uri=current_item.uri,
+ title=current_item.name,
+ image_url=item_image_url,
+ duration=current_item.duration,
+ source_id=active_queue.queue_id,
+ queue_item_id=current_item.queue_item_id,
+ elapsed_time=int(active_queue.elapsed_time),
+ elapsed_time_last_updated=active_queue.elapsed_time_last_updated,
+ )
+ # return native current media if no group/queue is active
return self._current_media
@cached_property
from collections.abc import AsyncGenerator, Awaitable, Callable
from dataclasses import dataclass, field
+from typing import TYPE_CHECKING
from mashumaro import field_options, pass_through
from music_assistant_models.enums import ContentType, StreamType
from music_assistant_models.media_items.audio_format import AudioFormat
-from music_assistant.models.player import PlayerMedia, PlayerSource
+from music_assistant.models.player import PlayerSource
from .provider import Provider
+if TYPE_CHECKING:
+ from music_assistant_models.streamdetails import StreamMetadata
+
@dataclass
class PluginSource(PlayerSource):
)
# metadata of the current playing media (if known)
- metadata: PlayerMedia | None = field(
+ metadata: StreamMetadata | None = field(
default=None,
compare=False,
metadata=field_options(serialize="omit", deserialize=pass_through),
import asyncio
import os
import pathlib
+import time
from collections.abc import Callable
from contextlib import suppress
from typing import TYPE_CHECKING, cast
ConfigEntryType,
ContentType,
EventType,
- MediaType,
ProviderFeature,
ProviderType,
StreamType,
)
from music_assistant_models.errors import UnsupportedFeaturedException
from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.streamdetails import StreamMetadata
from music_assistant.constants import CONF_ENTRY_WARN_PREVIEW
from music_assistant.helpers.process import AsyncProcess, check_output
-from music_assistant.models.player import PlayerMedia
from music_assistant.models.plugin import PluginProvider, PluginSource
from music_assistant.providers.spotify.helpers import get_librespot_binary
bit_depth=16,
channels=2,
),
- metadata=PlayerMedia(
- f"Spotify Connect | {connect_name}",
+ metadata=StreamMetadata(
+ title=f"Spotify Connect | {connect_name}",
),
stream_type=StreamType.NAMED_PIPE,
path=self.named_pipe,
else:
image_url = None
if self._source_details.metadata is None:
- self._source_details.metadata = PlayerMedia(uri, media_type=MediaType.TRACK)
+ self._source_details.metadata = StreamMetadata(uri=uri, title=title)
self._source_details.metadata.uri = uri
self._source_details.metadata.title = title
self._source_details.metadata.artist = artist
self._source_details.metadata.album = album
self._source_details.metadata.image_url = image_url
+ self._source_details.metadata.description = json_data.get(
+ "episode_metadata_fields", {}
+ ).get("description")
+ self._source_details.metadata.duration = json_data.get("duration_ms", 0) * 0.001
+ self._source_details.metadata.elapsed_time = json_data.get("position", None)
+ self._source_details.metadata.elapsed_time_last_updated = time.time()
if self._source_details.in_use_by:
# tell connected player to update metadata
self.mass.players.trigger_player_update(self._source_details.in_use_by)