if raw_conf := self.get(f"{CONF_PLAYERS}/{player_id}"):
if player := self.mass.players.get(player_id, False):
raw_conf["default_name"] = player.display_name
- raw_conf["provider"] = player.provider.lookup_key
+ raw_conf["provider"] = player.provider.instance_id
# pass action and values to get_config_entries
if values is None:
values = raw_conf.get("values", {})
# cleanup provider specific entries for this item
# we always prefer the library playlog entry
for prov_mapping in media_item.provider_mappings:
- if not (provider := self.mass.get_provider(prov_mapping.provider_instance)):
- continue
await self.mass.music.database.delete(
DB_TABLE_PLAYLOG,
{
"media_type": self.media_type.value,
"item_id": prov_mapping.item_id,
- "provider": provider.lookup_key,
+ "provider": prov_mapping.provider_instance,
},
)
if media_item.fully_played is None and media_item.resume_position_ms is None:
# search by (exact) name match
query = f"{self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
query_params = {"name": item.name, "sort_name": item.sort_name}
- async for db_item in self.iter_library_items(
- extra_query=query, extra_query_params=query_params
+ for db_item in await self._get_library_items_by_query(
+ extra_query_parts=[query], extra_query_params=query_params
):
if compare_media_item(db_item, item, True):
return int(db_item.item_id)
"""Iterate all in-database items."""
limit: int = 500
offset: int = 0
+ if provider is not None:
+ provider_filter = provider if isinstance(provider, list) else [provider]
+ else:
+ provider_filter = None
while True:
- next_items = await self.library_items(
+ next_items = await self._get_library_items_by_query(
favorite=favorite,
search=search,
limit=limit,
offset=offset,
order_by=order_by,
- provider=provider,
- extra_query=extra_query,
+ provider_filter=provider_filter,
+ extra_query_parts=[extra_query] if extra_query else None,
extra_query_params=extra_query_params,
)
for item in next_items:
"""Get single library item by id."""
db_id = int(item_id) # ensure integer
extra_query = f"WHERE {self.db_table}.item_id = {item_id}"
- async for db_item in self.iter_library_items(extra_query=extra_query):
+ for db_item in await self._get_library_items_by_query(
+ extra_query_parts=[extra_query],
+ ):
return db_item
msg = f"{self.media_type.value} not found in library: {db_id}"
raise MediaNotFoundError(msg)
provider_instance_id_or_domain,
)
continue
- if item_prov.lookup_key == playlist_prov.lookup_key:
+ if item_prov.instance_id == playlist_prov.instance_id:
if item_id not in ids_to_add:
ids_to_add.append(item_id)
continue
continue
track_version_uri = create_uri(
MediaType.TRACK,
- item_prov.lookup_key,
+ item_prov.instance_id,
track_version.item_id,
)
if track_version_uri in cur_playlist_track_uris:
playlist.name,
)
break
- if item_prov.lookup_key == playlist_prov.lookup_key:
+ if item_prov.instance_id == playlist_prov.instance_id:
if track_version.item_id not in ids_to_add:
ids_to_add.append(track_version.item_id)
self.logger.info(
DB_TABLE_PLAYLOG,
{
"item_id": episode.item_id,
- "provider": prov.lookup_key,
+ "provider": prov.instance_id,
"media_type": MediaType.PODCAST_EPISODE,
},
)
from music_assistant.helpers.images import create_collage, get_image_thumb
from music_assistant.helpers.throttle_retry import Throttler
from music_assistant.models.core_controller import CoreController
+from music_assistant.models.music_provider import MusicProvider
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig
for prov_mapping in sorted(
artist.provider_mappings, key=lambda x: x.priority, reverse=True
):
- if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
+ prov = self.mass.get_provider(
+ prov_mapping.provider_instance, provider_type=MusicProvider
+ )
+ if prov is None:
continue
- if prov.lookup_key in unique_keys:
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = prov.domain if prov.is_streaming_provider else prov.instance_id
+ if prov_key in unique_keys:
continue
- if prov.lookup_key not in local_provs:
- unique_keys.add(prov.lookup_key)
+ unique_keys.add(prov_key)
with suppress(MediaNotFoundError):
prov_item = await self.mass.music.artists.get_provider_item(
prov_mapping.item_id, prov_mapping.provider_instance
# note that we sort the providers by priority so that we always
# prefer local providers over online providers
unique_keys: set[str] = set()
- local_provs = get_global_cache_value("non_streaming_providers")
- if TYPE_CHECKING:
- local_provs = cast("set[str]", local_provs)
for prov_mapping in sorted(album.provider_mappings, key=lambda x: x.priority, reverse=True):
- if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
+ prov = self.mass.get_provider(
+ prov_mapping.provider_instance, provider_type=MusicProvider
+ )
+ if prov is None:
continue
- if prov.lookup_key in unique_keys:
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = prov.domain if prov.is_streaming_provider else prov.instance_id
+ if prov_key in unique_keys:
continue
- if prov.lookup_key not in local_provs:
- unique_keys.add(prov.lookup_key)
+ unique_keys.add(prov_key)
with suppress(MediaNotFoundError):
prov_item = await self.mass.music.albums.get_provider_item(
prov_mapping.item_id, prov_mapping.provider_instance
# note that we sort the providers by priority so that we always
# prefer local providers over online providers
unique_keys: set[str] = set()
- local_provs = get_global_cache_value("non_streaming_providers")
- if TYPE_CHECKING:
- local_provs = cast("set[str]", local_provs)
for prov_mapping in sorted(track.provider_mappings, key=lambda x: x.priority, reverse=True):
- if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
+ prov = self.mass.get_provider(
+ prov_mapping.provider_instance, provider_type=MusicProvider
+ )
+ if prov is None:
continue
- if prov.lookup_key in unique_keys:
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = prov.domain if prov.is_streaming_provider else prov.instance_id
+ if prov_key in unique_keys:
continue
- unique_keys.add(prov.lookup_key)
+ unique_keys.add(prov_key)
with suppress(MediaNotFoundError):
prov_item = await self.mass.music.tracks.get_provider_item(
prov_mapping.item_id, prov_mapping.provider_instance
# note that we sort the providers by priority so that we always
# prefer local providers over online providers
unique_keys: set[str] = set()
- local_provs = get_global_cache_value("non_streaming_providers")
- if TYPE_CHECKING:
- local_provs = cast("set[str]", local_provs)
for prov_mapping in sorted(
audiobook.provider_mappings, key=lambda x: x.priority, reverse=True
):
- if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
+ prov = self.mass.get_provider(
+ prov_mapping.provider_instance, provider_type=MusicProvider
+ )
+ if prov is None:
continue
- if prov.lookup_key in unique_keys:
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = prov.domain if prov.is_streaming_provider else prov.instance_id
+ if prov_key in unique_keys:
continue
- if prov.lookup_key not in local_provs:
- unique_keys.add(prov.lookup_key)
+ unique_keys.add(prov_key)
with suppress(MediaNotFoundError):
prov_item = await self.mass.music.audiobooks.get_provider_item(
prov_mapping.item_id, prov_mapping.provider_instance
# note that we sort the providers by priority so that we always
# prefer local providers over online providers
unique_keys: set[str] = set()
- local_provs = get_global_cache_value("non_streaming_providers")
- if TYPE_CHECKING:
- local_provs = cast("set[str]", local_provs)
for prov_mapping in sorted(
podcast.provider_mappings, key=lambda x: x.priority, reverse=True
):
- if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
+ prov = self.mass.get_provider(
+ prov_mapping.provider_instance, provider_type=MusicProvider
+ )
+ if prov is None:
continue
- if prov.lookup_key in unique_keys:
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = prov.domain if prov.is_streaming_provider else prov.instance_id
+ if prov_key in unique_keys:
continue
- if prov.lookup_key not in local_provs:
- unique_keys.add(prov.lookup_key)
+ unique_keys.add(prov_key)
with suppress(MediaNotFoundError):
prov_item = await self.mass.music.podcasts.get_provider_item(
prov_mapping.item_id, prov_mapping.provider_instance
if media_types is None:
media_types = MediaType.ALL
media_types_str = "(" + ",".join(f'"{x}"' for x in media_types) + ")"
- available_providers = ("library", *get_global_cache_value("unique_providers", []))
+ available_providers = ("library", *self.get_unique_providers())
available_providers_str = "(" + ",".join(f'"{x}"' for x in available_providers) + ")"
query = (
f"SELECT * FROM {DB_TABLE_PLAYLOG} "
self, limit: int = 10, all_users: bool = False
) -> list[ItemMapping]:
"""Return a list of the Audiobooks and PodcastEpisodes that are in progress."""
- available_providers = ("library", *get_global_cache_value("unique_providers", []))
+ available_providers = ("library", *self.get_unique_providers())
available_providers_str = "(" + ",".join(f'"{x}"' for x in available_providers) + ")"
query = (
f"SELECT * FROM {DB_TABLE_PLAYLOG} "
if loudness in (None, inf, -inf):
# skip invalid values
return
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = provider.domain if provider.is_streaming_provider else provider.instance_id
values = {
"item_id": item_id,
"media_type": media_type.value,
- "provider": provider.lookup_key,
+ "provider": prov_key,
"loudness": loudness,
}
if album_loudness not in (None, inf, -inf):
return
beats_json = await asyncio.to_thread(lambda: json_dumps(analysis.beats.tolist()))
downbeats_json = await asyncio.to_thread(lambda: json_dumps(analysis.downbeats.tolist()))
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = provider.domain if provider.is_streaming_provider else provider.instance_id
values = {
"fragment": analysis.fragment.value,
"item_id": item_id,
- "provider": provider.lookup_key,
+ "provider": prov_key,
"bpm": analysis.bpm,
"beats": beats_json,
"downbeats": downbeats_json,
"""Get Smart Fades BPM analysis for a track from db."""
if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
return None
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = provider.domain if provider.is_streaming_provider else provider.instance_id
db_row = await self.database.get_row(
DB_TABLE_SMART_FADES_ANALYSIS,
{
"item_id": item_id,
- "provider": provider.lookup_key,
+ "provider": prov_key,
"fragment": fragment.value,
},
)
"""Get (EBU-R128) Integrated Loudness Measurement for a mediaitem in db."""
if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
return None
+ # prefer domain for streaming providers as the catalog is the same across instances
+ prov_key = provider.domain if provider.is_streaming_provider else provider.instance_id
db_row = await self.database.get_row(
DB_TABLE_LOUDNESS_MEASUREMENTS,
{
"item_id": item_id,
"media_type": media_type.value,
- "provider": provider.lookup_key,
+ "provider": prov_key,
},
)
if db_row and db_row["loudness"] != inf and db_row["loudness"] != -inf:
# Try to get position from providers
for prov_mapping in media_item.provider_mappings:
- if not (provider := self.mass.get_provider(prov_mapping.provider_instance)):
+ if not (
+ provider := self.mass.get_provider(
+ prov_mapping.provider_instance, provider_type=MusicProvider
+ )
+ ):
continue
- # Type guard: ensure this is a MusicProvider with get_resume_position method
- if isinstance(provider, MusicProvider):
- with suppress(NotImplementedError):
- (
- provider_fully_played,
- provider_position_ms,
- ) = await provider.get_resume_position(
- prov_mapping.item_id, media_item.media_type
- )
- break # Use first provider that returns data
+ with suppress(NotImplementedError):
+ (
+ provider_fully_played,
+ provider_position_ms,
+ ) = await provider.get_resume_position(prov_mapping.item_id, media_item.media_type)
+ break # Use first provider that returns data
# Get MA's internal position from playlog
ma_fully_played = False
def get_unique_providers(self) -> set[str]:
"""
- Return all unique MusicProvider instance ids.
+ Return all unique MusicProvider (instance or domain) ids.
- This will return all filebased instances but only one instance
- for streaming providers.
+ This will return instance_ids for non-streaming providers
+ and domain names for streaming providers to avoid duplicates.
"""
- instances = set()
- domains = set()
+ result: set[str] = set()
for provider in self.providers:
- if provider.domain not in domains or not provider.is_streaming_provider:
- instances.add(provider.instance_id)
- domains.add(provider.domain)
- return instances
+ if provider.is_streaming_provider:
+ result.add(provider.domain)
+ else:
+ result.add(provider.instance_id)
+ return result
async def cleanup_provider(self, provider_instance: str) -> None:
"""Cleanup provider records from the database."""
for player in self._players.values()
if (player.available or return_unavailable)
and (player.enabled or return_disabled)
- and (provider_filter is None or player.provider.lookup_key == provider_filter)
+ and (provider_filter is None or player.provider.instance_id == provider_filter)
and (not user_filter or player.player_id in user_filter)
and (return_sync_groups or not isinstance(player, SyncGroupPlayer))
]
# check if player can be synced/grouped with the target player
if not (
child_player_id in parent_player.can_group_with
- or child_player.provider.lookup_key in parent_player.can_group_with
+ or child_player.provider.instance_id in parent_player.can_group_with
or "*" in parent_player.can_group_with
):
raise UnsupportedFeaturedException(
if self.is_dynamic and (leader := self.sync_leader):
return leader.can_group_with
elif self.is_dynamic:
- return {self.provider.lookup_key}
+ return {self.provider.instance_id}
else:
return set()
player_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
self.mass.config.create_default_player_config(
player_id=player_id,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
enabled=True,
values={
async def on_provider_loaded(self, provider: PlayerProvider) -> None:
"""Handle logic when a provider is loaded."""
# register existing syncgroup players for this provider
- for player_conf in await self.mass.config.get_player_configs(provider.lookup_key):
+ for player_conf in await self.mass.config.get_player_configs(provider.instance_id):
if player_conf.player_id.startswith(SYNCGROUP_PREFIX):
await self._register_syncgroup_player(player_conf.player_id, provider)
"""Handle logic when a provider is (about to get) unloaded."""
# unregister existing syncgroup players for this provider
for player in self.mass.players.all(
- provider_filter=provider.lookup_key, return_sync_groups=True
+ provider_filter=provider.instance_id, return_sync_groups=True
):
if player.player_id.startswith(SYNCGROUP_PREFIX):
await self.mass.players.unregister(player.player_id, False)
This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
Do not try to request streamdetails too much in advance as this is expiring data.
"""
+ streamdetails: StreamDetails | None = None
time_start = time.time()
LOGGER.debug("Getting streamdetails for %s", queue_item.uri)
if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
# already got a fresh/unused (or unexpired) streamdetails
streamdetails = queue_item.streamdetails
else:
+ # need to (re)create streamdetails
# retrieve streamdetails from provider
+
media_item = queue_item.media_item
assert media_item is not None # for type checking
- # sort by quality and check item's availability
- for prov_media in sorted(
- media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
+ preferred_providers: list[str] = []
+ if (
+ (queue := mass.player_queues.get(queue_item.queue_id))
+ and queue.userid
+ and (playback_user := await mass.webserver.auth.get_user(queue.userid))
+ and playback_user.provider_filter
):
- if not prov_media.available:
- LOGGER.debug(f"Skipping unavailable {prov_media}")
- continue
- # guard that provider is available
- music_prov = mass.get_provider(prov_media.provider_instance)
- if TYPE_CHECKING: # avoid circular import
- assert isinstance(music_prov, MusicProvider)
- if not music_prov:
- LOGGER.debug(f"Skipping {prov_media} - provider not available")
- continue # provider not available ?
- # get streamdetails from provider
- try:
- BYPASS_THROTTLER.set(True)
- streamdetails = await music_prov.get_stream_details(
- prov_media.item_id, media_item.media_type
- )
- except MusicAssistantError as err:
- LOGGER.warning(str(err))
- else:
- break
- finally:
- BYPASS_THROTTLER.set(False)
+ # handle steering into user preferred providerinstance
+ preferred_providers = playback_user.provider_filter
else:
+ preferred_providers = [x.provider_instance for x in media_item.provider_mappings]
+ for allow_other_provider in (False, True):
+ # sort by quality and check item's availability
+ for prov_media in sorted(
+ media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
+ ):
+ if not prov_media.available:
+ LOGGER.debug(f"Skipping unavailable {prov_media}")
+ continue
+ if (
+ not allow_other_provider
+ and prov_media.provider_instance not in preferred_providers
+ ):
+ continue
+ # guard that provider is available
+ music_prov = mass.get_provider(prov_media.provider_instance)
+ if TYPE_CHECKING: # avoid circular import
+ assert isinstance(music_prov, MusicProvider)
+ if not music_prov:
+ LOGGER.debug(f"Skipping {prov_media} - provider not available")
+ continue # provider not available ?
+ # get streamdetails from provider
+ try:
+ BYPASS_THROTTLER.set(True)
+ streamdetails = await music_prov.get_stream_details(
+ prov_media.item_id, media_item.media_type
+ )
+ except MusicAssistantError as err:
+ LOGGER.warning(str(err))
+ else:
+ break
+ finally:
+ BYPASS_THROTTLER.set(False)
+
+ if not streamdetails:
msg = f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
raise MediaNotFoundError(msg)
*,
feed_url: str,
parsed_feed: dict[str, Any],
- lookup_key: str,
- domain: str,
instance_id: str,
+ domain: str,
mass_item_id: str | None = None,
) -> Podcast:
"""Podcast -> Mass Podcast.
item_id=item_id,
name=parsed_feed.get("title", "NO_TITLE"),
publisher=publisher,
- provider=lookup_key,
+ provider=instance_id,
uri=parsed_feed.get("link"),
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=podcast_cover,
- provider=lookup_key,
+ provider=instance_id,
remotely_accessible=True,
)
]
prov_podcast_id: str,
episode_cnt: int,
podcast_cover: str | None = None,
- lookup_key: str,
- domain: str,
instance_id: str,
+ domain: str,
mass_item_id: str | None = None,
) -> PodcastEpisode | None:
"""Podcast Episode -> Mass Podcast Episode.
episode_id = f"{prov_podcast_id} {guid_or_stream_url}" if mass_item_id is None else mass_item_id
mass_episode = PodcastEpisode(
item_id=episode_id,
- provider=lookup_key,
+ provider=instance_id,
name=episode_title,
duration=int(episode_duration),
position=episode_cnt,
podcast=ItemMapping(
item_id=prov_podcast_id,
- provider=lookup_key,
+ provider=instance_id,
name=episode_title,
media_type=MediaType.PODCAST,
),
MediaItemImage(
type=ImageType.THUMB,
path=episode_cover,
- provider=lookup_key,
+ provider=instance_id,
remotely_accessible=True,
)
]
import pathlib
import threading
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine
-from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar, cast, overload
from uuid import uuid4
import aiofiles
PROVIDERS_PATH = os.path.join(BASE_DIR, "providers")
_R = TypeVar("_R")
+_ProviderT = TypeVar("_ProviderT", bound=ProviderInstanceType)
def is_music_provider(provider: ProviderInstanceType) -> TypeGuard[MusicProvider]:
"""Return all loaded/running Providers (instances)."""
return list(self._providers.values())
+ @overload
def get_provider(
- self, provider_instance_or_domain: str, return_unavailable: bool = False
- ) -> ProviderInstanceType | None:
- """Return provider by instance id or domain."""
+ self,
+ provider_instance_or_domain: str,
+ return_unavailable: bool = False,
+ provider_type: None = None,
+ ) -> ProviderInstanceType | None: ...
+
+ @overload
+ def get_provider(
+ self,
+ provider_instance_or_domain: str,
+ return_unavailable: bool = False,
+ *,
+ provider_type: type[_ProviderT],
+ ) -> _ProviderT | None: ...
+
+ def get_provider(
+ self,
+ provider_instance_or_domain: str,
+ return_unavailable: bool = False,
+ provider_type: type[_ProviderT] | None = None,
+ ) -> ProviderInstanceType | _ProviderT | None:
+ """Return provider by instance id or domain.
+
+ :param provider_instance_or_domain: Instance ID or domain of the provider.
+ :param return_unavailable: Also return unavailable providers.
+ :param provider_type: Optional type hint for the expected provider type (unused at runtime).
+ """
# lookup by instance_id first
if prov := self._providers.get(provider_instance_or_domain):
if return_unavailable or prov.available:
*{x.domain for x in self.providers},
*{x.instance_id for x in self.providers},
},
- "unique_providers": {x.lookup_key for x in self.providers},
+ "unique_providers": self.music.get_unique_providers(),
"streaming_providers": {
- x.lookup_key
+ x.domain
for x in self.providers
if is_music_provider(x) and x.is_streaming_provider
},
"non_streaming_providers": {
- x.lookup_key
+ x.instance_id
for x in self.providers
if not (is_music_provider(x) and x.is_streaming_provider)
},
"""
return True
- @property
- def lookup_key(self) -> str:
- """Return domain if (multi-instance) streaming_provider or instance_id otherwise."""
- if self.is_streaming_provider or not self.manifest.multi_instance:
- return self.domain
- return self.instance_id
-
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
@property
@final
def provider_id(self) -> str:
- """Return the provider id of the player."""
- return self._provider.lookup_key
+ """Return the provider (instance) id of the player."""
+ return self._provider.instance_id
@property
@final
@property
def players(self) -> list[Player]:
"""Return all players belonging to this provider."""
- return self.mass.players.all(provider_filter=self.lookup_key, return_sync_groups=False)
+ return self.mass.players.all(provider_filter=self.instance_id, return_sync_groups=False)
# should not be overridden in normal circumstances
return self._supported_features
- @property
- def lookup_key(self) -> str:
- """Return instance_id if multi_instance capable or domain otherwise."""
- # should not be overridden in normal circumstances
- return self.instance_id if self.manifest.multi_instance else self.domain
-
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
"default_name": self.default_name,
"instance_name_postfix": self.instance_name_postfix,
"instance_id": self.instance_id,
- "lookup_key": self.lookup_key,
"supported_features": [x.value for x in self.supported_features],
"available": self.available,
"is_streaming_provider": getattr(self, "is_streaming_provider", None),
# the audio_format field should be the native audio format of the stream
# that is returned by the get_audio_stream method.
return PluginSource(
- id=self.lookup_key,
+ id=self.instance_id,
name=self.name,
passive=False,
can_play_pause=False,
PlayerFeature.VOLUME_SET,
}
self._attr_volume_level = initial_volume
- self._attr_can_group_with = {provider.lookup_key}
+ self._attr_can_group_with = {provider.instance_id}
self._attr_enabled_by_default = not is_broken_airplay_model(manufacturer, model)
@cached_property
await self.mass.cache.set(
key=self.player_id,
data=volume_level,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
category=CACHE_CATEGORY_PREV_VOLUME,
)
# Get volume from cache
if not (
volume := await self.mass.cache.get(
- key=player_id, provider=self.lookup_key, category=CACHE_CATEGORY_PREV_VOLUME
+ key=player_id, provider=self.instance_id, category=CACHE_CATEGORY_PREV_VOLUME
)
):
volume = FALLBACK_VOLUME
) from exc
return StreamDetails(
item_id=item_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=stream_url,
stream_type=StreamType.HTTP,
audio_format=AudioFormat(content_type=ContentType.UNKNOWN),
key_id = base64.b64decode(uri.split(",")[1])
return StreamDetails(
item_id=item_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
audio_format=AudioFormat(content_type=ContentType.MP4, codec_type=ContentType.AAC),
stream_type=StreamType.ENCRYPTED_HTTP,
decryption_key=await self._get_decryption_key(license_url, key_id, uri, item_id),
# No more details available other than the id, return an ItemMapping
return ItemMapping(
media_type=MediaType.ARTIST,
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=artist_id,
name=artist_id,
)
if artwork := attributes.get("artwork"):
artist.metadata.add_image(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.THUMB,
path=artwork["url"].format(w=artwork["width"], h=artwork["height"]),
remotely_accessible=True,
# No more details available other than the id, return an ItemMapping
return ItemMapping(
media_type=MediaType.ALBUM,
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=album_id,
name=album_id,
)
[
ItemMapping(
media_type=MediaType.ARTIST,
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=artist_name,
name=artist_name,
)
if artwork := attributes.get("artwork"):
album.metadata.add_image(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.THUMB,
path=artwork["url"].format(w=artwork["width"], h=artwork["height"]),
remotely_accessible=True,
ItemMapping(
media_type=MediaType.ARTIST,
item_id=artist_name,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=artist_name,
)
]
if artwork := attributes.get("artwork"):
track.metadata.add_image(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.THUMB,
path=artwork["url"].format(w=artwork["width"], h=artwork["height"]),
remotely_accessible=True,
is_editable = attributes.get("canEdit", False)
playlist = Playlist(
item_id=playlist_id,
- provider=self.instance_id if is_editable else self.lookup_key,
+ provider=self.instance_id,
name=attributes.get("name", UNKNOWN_PLAYLIST_NAME),
owner=attributes.get("curatorName", "me"),
provider_mappings={
url = url.format(w=artwork["width"], h=artwork["height"])
playlist.metadata.add_image(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.THUMB,
path=url,
remotely_accessible=True,
podcasts += [
_parse_podcast(
self.domain,
- self.lookup_key,
self.instance_id,
element,
element["coreId"],
return _parse_podcast(
self.domain,
- self.lookup_key,
self.instance_id,
result,
prov_podcast_id,
progress = self._get_progress(episode_id)
yield _parse_podcast_episode(
self.domain,
- self.lookup_key,
self.instance_id,
episode,
episode_id,
progress = self._get_progress(prov_episode_id)
return _parse_podcast_episode(
self.domain,
- self.lookup_key,
self.instance_id,
result,
result["showId"],
podcast = _parse_podcast(
self.domain,
- self.lookup_key,
self.instance_id,
pod,
pod["coreId"],
def _parse_podcast(
domain: str,
- lookup_key: str,
instance_id: str,
podcast_query: dict[str, Any],
podcast_id: str,
name=podcast_query["title"],
item_id=podcast_id,
publisher=podcast_query["publicationService"]["title"],
- provider=lookup_key,
+ provider=instance_id,
provider_mappings={
ProviderMapping(
item_id=podcast_id,
def _parse_podcast_episode(
domain: str,
- lookup_key: str,
instance_id: str,
episode: dict[str, Any],
podcast_id: str,
name=episode["title"],
duration=episode["duration"],
item_id=episode["coreId"],
- provider=lookup_key,
+ provider=instance_id,
podcast=ItemMapping(
item_id=podcast_id,
- provider=lookup_key,
+ provider=instance_id,
name=podcast_title,
media_type=MediaType.PODCAST,
),
assert isinstance(podcast_minified, LibraryItemMinifiedPodcast)
mass_podcast = parse_podcast(
abs_podcast=podcast_minified,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
)
abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
return parse_podcast(
abs_podcast=abs_podcast,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
)
episode=abs_episode,
prov_podcast_id=prov_podcast_id,
fallback_episode_cnt=episode_cnt,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
media_progress=progress,
episode=abs_episode,
prov_podcast_id=prov_podcast_id,
fallback_episode_cnt=episode_cnt,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
media_progress=progress,
continue
mass_audiobook = parse_audiobook(
abs_audiobook=book_expanded,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
)
abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=prov_audiobook_id)
return parse_audiobook(
abs_audiobook=abs_audiobook,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
media_progress=progress,
file_parts.append(MultiPartPath(path=stream_url, duration=track.duration))
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=abs_audiobook.id_,
audio_format=AudioFormat(content_type=content_type),
media_type=MediaType.AUDIOBOOK,
base_url = str(self.config.get_value(CONF_URL))
stream_url = f"{base_url}{abs_episode.audio_track.content_url}?token={self._client.token}"
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=podcast_id,
audio_format=AudioFormat(
content_type=content_type,
icon=ABS_SHELF_ID_ICONS.get(shelf_id),
# translation_key=shelf.id_,
items=UniqueList(recommendation_items),
- provider=self.lookup_key,
+ provider=self.instance_id,
)
)
icon="mdi-bookshelf",
# translation_key=shelf.id_,
items=UniqueList(browse_items),
- provider=self.lookup_key,
+ provider=self.instance_id,
)
)
item = parse_podcast_episode(
episode=entity.recent_episode,
prov_podcast_id=podcast_id,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
)
BrowseFolder(
item_id=entity.id_,
name=entity.name,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=path,
)
)
BrowseFolder(
item_id=entity.id_,
name=entity.name,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=path,
)
)
return BrowseFolder(
item_id=lib_id,
name=lib_name,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=f"{self.instance_id}://{path}",
)
BrowseFolder(
item_id=item_name.lower(),
name=item_name,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=path,
)
)
BrowseFolder(
item_id=author.id_,
name=author.name,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=path,
)
)
BrowseFolder(
item_id=narrator.id_,
name=narrator.name,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=path,
)
)
BrowseFolder(
item_id=abs_series.id_,
name=abs_series.name,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=path,
)
)
BrowseFolder(
item_id=abs_collection.id_,
name=abs_collection.name,
- provider=self.lookup_key,
+ provider=self.instance_id,
path=path,
)
)
BrowseFolder(
item_id=series.id_,
name=f"{series.name} ({AbsBrowseItemsBook.SERIES})",
- provider=self.lookup_key,
+ provider=self.instance_id,
path=path,
)
)
await self.mass.music.audiobooks.add_item_to_library(
parse_audiobook(
abs_audiobook=abs_item,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
),
)
mass_podcast = parse_podcast(
abs_podcast=abs_item,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
token=self._client.token,
base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
)
if discarded_item := await self.mass.music.get_library_item_by_prov_id(
media_type=MediaType.AUDIOBOOK,
item_id=discarded_progress_id,
- provider_instance_id_or_domain=self.lookup_key,
+ provider_instance_id_or_domain=self.instance_id,
):
self.progress_guard.add_progress(discarded_progress_id)
await self.mass.music.mark_item_unplayed(discarded_item)
abs_podcast: AbsLibraryItemExpandedPodcast
| AbsLibraryItemMinifiedPodcast
| AbsLibraryItemPodcast,
- lookup_key: str,
- domain: str,
instance_id: str,
+ domain: str,
token: str | None,
base_url: str,
) -> MassPodcast:
item_id=abs_podcast.id_,
name=title,
publisher=abs_podcast.media.metadata.author,
- provider=lookup_key,
+ provider=instance_id,
provider_mappings={
ProviderMapping(
item_id=abs_podcast.id_,
if token is not None:
image_url = f"{base_url}/api/items/{abs_podcast.id_}/cover?token={token}"
mass_podcast.metadata.images = UniqueList(
- [MediaItemImage(type=ImageType.THUMB, path=image_url, provider=lookup_key)]
+ [MediaItemImage(type=ImageType.THUMB, path=image_url, provider=instance_id)]
)
mass_podcast.metadata.explicit = abs_podcast.media.metadata.explicit
if abs_podcast.media.metadata.language is not None:
episode: AbsPodcastEpisode | AbsPodcastEpisodeExpanded,
prov_podcast_id: str,
fallback_episode_cnt: int | None = None,
- lookup_key: str,
- domain: str,
instance_id: str,
+ domain: str,
token: str | None,
base_url: str,
media_progress: AbsMediaProgress | None = None,
position = fallback_episode_cnt
mass_episode = MassPodcastEpisode(
item_id=episode_id,
- provider=lookup_key,
+ provider=instance_id,
name=episode.title,
duration=duration,
position=position,
podcast=ItemMapping(
item_id=prov_podcast_id,
- provider=lookup_key,
+ provider=instance_id,
name=episode.title,
media_type=MediaType.PODCAST,
),
url_api = f"/api/items/{prov_podcast_id}/cover?token={token}"
url_cover = f"{base_url}{url_api}"
mass_episode.metadata.images = UniqueList(
- [MediaItemImage(type=ImageType.THUMB, path=url_cover, provider=lookup_key)]
+ [MediaItemImage(type=ImageType.THUMB, path=url_cover, provider=instance_id)]
)
if media_progress is not None and media_progress.current_time is not None:
def parse_audiobook(
*,
abs_audiobook: AbsLibraryItemExpandedBook | AbsLibraryItemMinifiedBook,
- lookup_key: str,
- domain: str,
instance_id: str,
+ domain: str,
token: str | None,
base_url: str,
media_progress: AbsMediaProgress | None = None,
title += f" | {subtitle}"
mass_audiobook = MassAudiobook(
item_id=abs_audiobook.id_,
- provider=lookup_key,
+ provider=instance_id,
name=title,
duration=int(abs_audiobook.media.duration),
provider_mappings={
api_url = f"/api/items/{abs_audiobook.id_}/cover?token={token}"
cover_url = f"{base_url}{api_url}"
mass_audiobook.metadata.images = UniqueList(
- [MediaItemImage(type=ImageType.THUMB, path=cover_url, provider=lookup_key)]
+ [MediaItemImage(type=ImageType.THUMB, path=cover_url, provider=instance_id)]
)
# expanded version
self._attr_source_list = []
self._attr_needs_poll = True
self._attr_poll_interval = IDLE_POLL_INTERVAL
- self._attr_can_group_with = {provider.lookup_key}
+ self._attr_can_group_with = {provider.instance_id}
async def setup(self) -> None:
"""Set up the player."""
self.logger.warning("Radio station %s not found: %s", item, err)
yield Radio(
item_id=item["item_id"],
- provider=self.lookup_key,
+ provider=self.instance_id,
name=item["name"],
provider_mappings={
ProviderMapping(
return [
RecommendationFolder(
item_id="recommended_tracks",
- provider=self.lookup_key,
+ provider=self.instance_id,
name="Recommended tracks",
translation_key="recommended_tracks",
items=UniqueList(
url = url_details["sources"][0]["url"]
return StreamDetails(
item_id=item_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
audio_format=AudioFormat(
content_type=ContentType.try_parse(url_details["format"].split("_")[0])
),
MediaItemImage(
type=ImageType.THUMB,
path=track.album.cover_big,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
MediaItemImage(
type=ImageType.THUMB,
path=album.cover_big,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=artist.picture_big,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
"""Parse the deezer-python artist to a Music Assistant artist."""
return Artist(
item_id=str(artist.id),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=artist.name,
media_type=MediaType.ARTIST,
provider_mappings={
return Album(
album_type=self.get_album_type(album),
item_id=str(album.id),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=album.title,
artists=UniqueList(
[
ItemMapping(
media_type=MediaType.ARTIST,
item_id=str(album.artist.id),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=album.artist.name,
)
]
is_editable = creator.id == self.user.id
return Playlist(
item_id=str(playlist.id),
- provider=self.instance_id if is_editable else self.lookup_key,
+ provider=self.instance_id,
name=playlist.title,
media_type=MediaType.PLAYLIST,
provider_mappings={
MediaItemImage(
type=ImageType.THUMB,
path=playlist.picture_big,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
artist = ItemMapping(
media_type=MediaType.ARTIST,
item_id=str(getattr(track.artist, "id", f"deezer-{track.artist.name}")),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=track.artist.name,
)
else:
album = ItemMapping(
media_type=MediaType.ALBUM,
item_id=str(track.album.id),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=track.album.title,
)
else:
item = Track(
item_id=str(track.id),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=track.title,
sort_name=self.get_short_title(track),
duration=track.duration,
yield parse_podcast(
feed_url=feed_url,
parsed_feed=parsed_podcast,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
)
self.timestamp_subscriptions = subscriptions.timestamp
return parse_podcast(
feed_url=prov_podcast_id,
parsed_feed=parsed_podcast,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
)
async def get_podcast_episodes(
episode_cnt=cnt,
podcast_cover=podcast_cover,
domain=self.domain,
- lookup_key=self.lookup_key,
instance_id=self.instance_id,
)
if mass_episode is None:
if stream_url is None:
raise MediaNotFoundError
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_url),
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=name,
)
url = await self._client.get_full_stream_url(int(item_id), "music-assistant")
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
artist = Artist(
item_id=artist_id,
name=artist_obj["name"],
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=artist_id,
MediaItemImage(
type=ImageType.THUMB,
path=await self._client.get_artist_artwork_url(artist_id),
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
name, version = parse_title_and_version(album_obj["name"])
album = Album(
item_id=album_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=name,
year=album_obj["year"],
version=version,
artist = Artist(
item_id=VARIOUS_ARTISTS_MBID,
name=VARIOUS_ARTISTS_NAME,
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=VARIOUS_ARTISTS_MBID,
return MediaItemImage(
type=ImageType.THUMB,
path=url,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
"""Parse an iBroadcast track object to a Track model object."""
track = Track(
item_id=track_obj["track_id"],
- provider=self.lookup_key,
+ provider=self.instance_id,
name=track_obj["title"],
provider_mappings={
ProviderMapping(
playlist_id = str(playlist_obj["playlist_id"])
playlist = Playlist(
item_id=playlist_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=playlist_obj["name"],
provider_mappings={
ProviderMapping(
name=result.track_name,
item_id=result.feed_url,
publisher=result.artist_name,
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=result.feed_url,
if artwork_url is not None:
image_list.append(
MediaItemImage(
- type=ImageType.THUMB, path=artwork_url, provider=self.lookup_key
+ type=ImageType.THUMB, path=artwork_url, provider=self.instance_id
)
)
podcast.metadata.images = UniqueList(image_list)
return parse_podcast(
feed_url=prov_podcast_id,
parsed_feed=parsed,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
)
async def get_podcast_episodes(
episode_cnt=cnt,
podcast_cover=podcast_cover,
domain=self.domain,
- lookup_key=self.lookup_key,
instance_id=self.instance_id,
):
yield mass_episode
icon="mdi-trending-up",
# translation_key=shelf.id_,
items=UniqueList(podcast_list),
- provider=self.lookup_key,
+ provider=self.instance_id,
)
]
if stream_url is None:
raise MediaNotFoundError
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_url),
artist = Artist(
item_id=UNKNOWN_ARTIST_MAPPING.item_id,
name=UNKNOWN_ARTIST_MAPPING.name,
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=UNKNOWN_ARTIST_MAPPING.item_id,
)
return StreamDetails(
item_id=jellyfin_track[ITEM_KEY_ID],
- provider=self.lookup_key,
+ provider=self.instance_id,
audio_format=audio_format(jellyfin_track),
stream_type=StreamType.HTTP,
duration=int(
self._attr_name = self.zone_device.zone_data.name
# group
- self._attr_can_group_with = {self.provider.lookup_key}
+ self._attr_can_group_with = {self.provider.instance_id}
self._attr_available = True
async def unload(self, is_removed: bool = False) -> None:
"""Call on unload."""
- for mc_player in self.mass.players.all(provider_filter=self.lookup_key):
+ for mc_player in self.mass.players.all(provider_filter=self.instance_id):
assert isinstance(mc_player, MusicCastPlayer) # for type checking
mc_player.physical_device.remove()
# Create album with common structure
album = Album(
item_id=item_id,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
name=name,
metadata=MediaItemMetadata(
description=description,
if owner_id:
owner_artist = Artist(
item_id=str(owner_id),
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
name=owner_name if owner_name else "",
provider_mappings=self.helper.create_provider_mapping(
item_id=str(owner_id),
MediaItemImage(
type=ImageType.THUMB,
path=thumbnail_url,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
remotely_accessible=True,
)
]
artist = Artist(
item_id=item_id,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
name=name,
metadata=MediaItemMetadata(
description=owner_or_user.description
MediaItemImage(
type=ImageType.THUMB,
path=icon_url,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
remotely_accessible=True,
)
)
"""Convert a nicovideo UserMylistItem into a Playlist."""
playlist = Playlist(
item_id=str(mylist.id_),
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
name=(mylist.title if isinstance(mylist, EssentialMylist) else mylist.name),
owner=mylist.owner.id_ or "",
is_editable=True, # Own mylists are editable by default
MediaItemImage(
type=ImageType.THUMB,
path=mylist.owner.icon_url,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
remotely_accessible=True,
)
)
# Create track with available information
return Track(
item_id=content.id_,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
name=content.title,
duration=content.video.duration,
artists=artists_list,
# Create base track with enhanced metadata
return Track(
item_id=video.id_,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
name=video.title,
duration=video.duration,
artists=artists_list,
# Create base track with enhanced metadata
track = Track(
item_id=video.id_,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
name=video.title,
duration=video.duration,
artists=artists_list,
MediaItemImage(
type=ImageType.THUMB,
path=url,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
remotely_accessible=True,
)
)
MediaItemImage(
type=ImageType.THUMB,
path=thumbnail_url,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=thumbnail.nhd_url,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
remotely_accessible=True,
)
)
MediaItemImage(
type=ImageType.THUMB,
path=thumbnail.large_url,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
remotely_accessible=True,
)
)
MediaItemImage(
type=ImageType.THUMB,
path=thumbnail.middle_url,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
remotely_accessible=True,
)
)
RecommendationFolder(
item_id="nicovideo_recommendations",
name="nicovideo recommendations",
- provider=self.lookup_key,
+ provider=self.instance_id,
icon="mdi-star-circle-outline",
items=UniqueList(main_recommendation_tracks),
)
RecommendationFolder(
item_id="nicovideo_history",
name="Recently watched (nicovideo history)",
- provider=self.lookup_key,
+ provider=self.instance_id,
icon="mdi-history",
items=UniqueList(history_tracks),
)
RecommendationFolder(
item_id="nicovideo_following_activities",
name="New Tracks from Followed Users",
- provider=self.lookup_key,
+ provider=self.instance_id,
icon="mdi-account-plus-outline",
items=UniqueList(following_activities_tracks),
)
RecommendationFolder(
item_id="nicovideo_like_history",
name="Recently liked (Like history)",
- provider=self.lookup_key,
+ provider=self.instance_id,
icon="mdi-heart-outline",
items=UniqueList(like_history_tracks),
)
stream_url = await self._get_stream_url(item_id)
return StreamDetails(
item_id=item_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
),
popular_folder = RecommendationFolder(
name="Most Popular",
item_id="nugs_popular_shows",
- provider=self.lookup_key,
+ provider=self.instance_id,
)
recommended_folder = RecommendationFolder(
name="Recommended Shows",
item_id="nugs_recommended_shows",
- provider=self.lookup_key,
+ provider=self.instance_id,
)
recent_folder = RecommendationFolder(
name="Recent Shows",
item_id="nugs_recent_shows",
- provider=self.lookup_key,
+ provider=self.instance_id,
)
popular_data = await self._get_data("catalog", popular, limit=20)
for item in popular_data["items"]:
artist_name = artist_obj.get("artistName") or artist_obj.get("name")
artist = Artist(
item_id=str(artist_id),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=str(artist_name),
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=artist_obj["avatarImage"]["url"],
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
title = album_obj.get("title") or album_obj.get("containerInfo")
album = Album(
item_id=str(item_id),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=str(title),
# version=album_obj["type"],
provider_mappings={
MediaItemImage(
type=ImageType.THUMB,
path=path,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
"""Parse nugs playlist object to generic layout."""
return Playlist(
item_id=playlist_obj["id"],
- provider=self.lookup_key,
+ provider=self.instance_id,
name=playlist_obj["name"],
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=playlist_obj["imageUrl"],
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
track = Track(
item_id=str(track_id),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=str(track_name),
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=name,
)
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=mapped_name,
version=mapped_version,
)
async def _get_or_create_artist_by_name(self, artist_name: str) -> Artist | ItemMapping:
if library_items := await self.mass.music.artists._get_library_items_by_query(
- search=artist_name, provider_filter=[self.lookup_key]
+ search=artist_name, provider_filter=[self.instance_id]
):
return ItemMapping.from_item(library_items[0])
return Artist(
item_id=artist_id,
name=artist_name or UNKNOWN_ARTIST,
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=str(artist_id),
album_id = plex_album.key
album = Album(
item_id=album_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=plex_album.title or "[Unknown]",
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=False,
)
]
artist = Artist(
item_id=artist_id,
name=plex_artist.title or UNKNOWN_ARTIST,
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=str(artist_id),
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=False,
)
]
"""Parse a Plex Playlist response to a Playlist object."""
playlist = Playlist(
item_id=plex_playlist.key,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=plex_playlist.title or "[Unknown]",
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=False,
)
]
# Collections are imported as playlists with the configured prefix
playlist = Playlist(
item_id=f"collection:{plex_collection.key}",
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"{collection_prefix}{plex_collection.title}",
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=False,
)
]
content = None
track = Track(
item_id=plex_track.key,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=plex_track.title or "[Unknown]",
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=False,
)
]
folder = RecommendationFolder(
name=hub.title,
item_id=f"{self.instance_id}_{hub.hubIdentifier}",
- provider=self.lookup_key,
+ provider=self.instance_id,
icon="mdi-music",
)
stream_details = StreamDetails(
item_id=plex_track.key,
- provider=self.lookup_key,
+ provider=self.instance_id,
audio_format=AudioFormat(
content_type=content_type,
channels=media.audioChannels,
def parse_podcast_from_feed(
- feed_data: dict[str, Any], lookup_key: str, domain: str, instance_id: str
+ feed_data: dict[str, Any], instance_id: str, domain: str
) -> Podcast | None:
"""Parse podcast from API feed data."""
feed_url = feed_data.get("url")
item_id=str(podcast_id),
name=feed_data.get("title", "Unknown Podcast"),
publisher=feed_data.get("author") or feed_data.get("ownerName", "Unknown"),
- provider=lookup_key,
+ provider=instance_id,
provider_mappings={
ProviderMapping(
item_id=str(podcast_id),
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=lookup_key,
+ provider=instance_id,
remotely_accessible=True,
)
)
episode_data: dict[str, Any],
podcast_id: str,
episode_idx: int,
- lookup_key: str,
- domain: str,
instance_id: str,
+ domain: str,
podcast_name: str | None = None,
) -> PodcastEpisode | None:
"""Parse episode from API episode data."""
episode = PodcastEpisode(
item_id=episode_id,
- provider=lookup_key,
+ provider=instance_id,
name=episode_data.get("title", "Unknown Episode"),
duration=duration,
position=position,
podcast=ItemMapping(
item_id=podcast_id,
- provider=lookup_key,
+ provider=instance_id,
name=podcast_name,
media_type=MediaType.PODCAST,
),
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=lookup_key,
+ provider=instance_id,
remotely_accessible=True,
)
)
podcasts = []
for feed_data in response.get("feeds", []):
- podcast = parse_podcast_from_feed(
- feed_data, self.lookup_key, self.domain, self.instance_id
- )
+ podcast = parse_podcast_from_feed(feed_data, self.instance_id, self.domain)
if podcast:
podcasts.append(podcast)
# Try by ID first
response = await self._api_request("podcasts/byfeedid", params={"id": prov_podcast_id})
if response.get("feed"):
- podcast = parse_podcast_from_feed(
- response["feed"], self.lookup_key, self.domain, self.instance_id
- )
+ podcast = parse_podcast_from_feed(response["feed"], self.instance_id, self.domain)
if podcast:
return podcast
except (ProviderUnavailableError, InvalidDataError):
episode_data,
prov_podcast_id,
idx,
- self.lookup_key,
- self.domain,
self.instance_id,
+ self.domain,
podcast_name,
)
if episode:
if episode_data:
episode = parse_episode_from_data(
- episode_data, podcast_id, 0, self.lookup_key, self.domain, self.instance_id
+ episode_data, podcast_id, 0, self.instance_id, self.domain
)
if episode:
return episode
stream_url = episode_data.get("enclosureUrl")
if stream_url:
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.try_parse(
response = await self._api_request(endpoint, params)
podcasts = []
for feed_data in response.get("feeds", []):
- podcast = parse_podcast_from_feed(
- feed_data, self.lookup_key, self.domain, self.instance_id
- )
+ podcast = parse_podcast_from_feed(feed_data, self.instance_id, self.domain)
if podcast:
podcasts.append(podcast)
return podcasts
episode_data,
podcast_id,
idx,
- self.lookup_key,
- self.domain,
self.instance_id,
+ self.domain,
podcast_name,
)
if episode:
podcasts = []
for feed_data in search_response.get("feeds", []):
- podcast = parse_podcast_from_feed(
- feed_data, self.lookup_key, self.domain, self.instance_id
- )
+ podcast = parse_podcast_from_feed(feed_data, self.instance_id, self.domain)
if podcast:
podcasts.append(podcast)
if item_id == episode["guid"]:
stream_url = episode["enclosures"][0]["url"]
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_url),
return parse_podcast(
feed_url=self.feed_url,
parsed_feed=self.parsed_podcast,
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
mass_item_id=self.podcast_id,
)
prov_podcast_id=self.podcast_id,
episode_cnt=fallback_position,
podcast_cover=self.parsed_podcast.get("cover_url"),
- lookup_key=self.lookup_key,
- domain=self.domain,
instance_id=self.instance_id,
+ domain=self.domain,
mass_item_id=episode_obj["guid"],
)
# Override remotely_accessible as these providers can have unreliable image URLs
self.mass.create_task(self._report_playback_started(streamdata))
return StreamDetails(
item_id=str(item_id),
- provider=self.lookup_key,
+ provider=self.instance_id,
audio_format=AudioFormat(
content_type=content_type,
sample_rate=int(streamdata["sampling_rate"] * 1000),
MediaItemImage(
type=ImageType.THUMB,
path=img,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
if img := self.__get_image(album_obj):
album.metadata.add_image(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.THUMB,
path=img,
remotely_accessible=True,
MediaItemImage(
type=ImageType.THUMB,
path=img,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
)
playlist = Playlist(
item_id=str(playlist_obj["id"]),
- provider=self.instance_id if is_editable else self.lookup_key,
+ provider=self.instance_id,
name=playlist_obj["name"],
owner=playlist_obj["owner"]["name"],
provider_mappings={
MediaItemImage(
type=ImageType.THUMB,
path=img,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
folder.image = MediaItemImage(
type=ImageType.THUMB,
path=country.favicon,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
items.append(folder)
MediaItemImage(
type=ImageType.THUMB,
path=radio_obj.favicon,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
from .helpers import enhance_title_with_upcoming # noqa: F401
-def parse_radio(
- channel_id: str, provider_lookup_key: str, provider_domain: str, instance_id: str
-) -> Radio:
+def parse_radio(channel_id: str, instance_id: str, provider_domain: str) -> Radio:
"""Create a Radio object from cached channel information."""
channel_info = RADIO_PARADISE_CHANNELS.get(channel_id, {})
radio = Radio(
- provider=provider_lookup_key,
+ provider=instance_id,
item_id=channel_id,
name=channel_info.get("name", "Unknown Radio"),
provider_mappings={
icon_url = f"{STATION_ICONS_BASE_URL}/{station_icon}"
radio.metadata.add_image(
MediaItemImage(
- provider=provider_lookup_key,
+ provider=instance_id,
type=ImageType.THUMB,
path=icon_url,
remotely_accessible=True,
stream_details = StreamDetails(
item_id=item_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
audio_format=AudioFormat(
content_type=content_type,
channels=2,
def _parse_radio(self, channel_id: str) -> Radio:
"""Create a Radio object from cached channel information."""
- return parsers.parse_radio(channel_id, self.lookup_key, self.domain, self.instance_id)
+ return parsers.parse_radio(channel_id, self.instance_id, self.domain)
async def _get_channel_metadata(self, channel_id: str) -> dict[str, Any] | None:
"""Get current track and upcoming tracks from Radio Paradise's block API.
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
}
- self._attr_can_group_with = {provider.lookup_key}
+ self._attr_can_group_with = {provider.instance_id}
self._attr_power_control = PLAYER_CONTROL_NONE
self._attr_device_info = DeviceInfo()
if player_client := sendspin_client.player:
# See `_channel_updated` for where this is handled.
self._current_stream_details = StreamDetails(
item_id=item_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
audio_format=AudioFormat(
content_type=ContentType.AAC,
),
def _parse_radio(self, channel: XMChannel) -> Radio:
radio = Radio(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=channel.id,
name=channel.name,
provider_mappings={
if icon is not None:
images.append(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.THUMB,
path=icon,
remotely_accessible=True,
)
images.append(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.LOGO,
path=icon,
remotely_accessible=True,
if banner is not None:
images.append(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.BANNER,
path=banner,
remotely_accessible=True,
)
images.append(
MediaItemImage(
- provider=self.lookup_key,
+ provider=self.instance_id,
type=ImageType.LANDSCAPE,
path=banner,
remotely_accessible=True,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.PLAY_ANNOUNCEMENT,
}
- self._attr_can_group_with = {self.provider.lookup_key}
+ self._attr_can_group_with = {self.provider.instance_id}
async def volume_set(self, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
)
self._attr_device_info.model = self.discovery_info["device"]["modelDisplayName"]
self._attr_device_info.manufacturer = self._provider.manifest.name
- self._attr_can_group_with = {self._provider.lookup_key}
+ self._attr_can_group_with = {self._provider.instance_id}
if SonosCapability.LINE_IN in self.discovery_info["device"]["capabilities"]:
self._attr_source_list.append(PLAYER_SOURCE_MAP[SOURCE_LINE_IN])
if x.player_id != airplay_player.player_id
)
else:
- self._attr_can_group_with = {self._provider.lookup_key}
+ self._attr_can_group_with = {self._provider.instance_id}
else:
# player is group child (synced to another player)
group_parent: SonosPlayer = self.mass.players.get(
self._attr_needs_poll = True
self._attr_poll_interval = 5
self._attr_available = True
- self._attr_can_group_with = {provider.lookup_key}
+ self._attr_can_group_with = {provider.instance_id}
# Subscriptions and events
self._subscriptions: list[SubscriptionBase] = []
except TimeoutError:
self.logger.warning("Timeout waiting for target groups %s", groups)
- if players := self.mass.players.all(provider_filter=_provider.lookup_key):
+ if players := self.mass.players.all(provider_filter=_provider.instance_id):
any_speaker = cast("SonosPlayer", players[0])
any_speaker.soco.zone_group_state.clear_cache()
while self._discovery_running:
await asyncio.sleep(0.5)
# Clean up subscriptions and connections
- for sonos_player in self.mass.players.all(provider_filter=self.lookup_key):
+ for sonos_player in self.mass.players.all(provider_filter=self.instance_id):
sonos_player = cast("SonosPlayer", sonos_player)
await sonos_player.offline()
# Stop the async event listener
folder = RecommendationFolder(
name=collection["title"],
item_id=f"{self.instance_id}_{collection['id']}",
- provider=self.lookup_key,
+ provider=self.instance_id,
icon="mdi-playlist-music",
)
for playlist in collection.get("items").get("collection", []):
folder = RecommendationFolder(
name="SoundCloud Feed",
item_id=f"{self.instance_id}_sc_subscribed_feed",
- provider=self.lookup_key,
+ provider=self.instance_id,
icon="mdi-rss",
)
for item in feed["collection"]:
"""Return the content details for the given track when it will be streamed."""
url: str = await self._soundcloud.get_stream_url(track_id=item_id, presets=["mp3"])
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
# let ffmpeg work out the details itself as
# soundcloud uses a mix of different content types and streaming methods
MediaItemImage(
type=ImageType.THUMB,
path=img_url,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=self._transform_artwork_url(playlist_obj["artwork_url"]),
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=self._transform_artwork_url(track_obj["artwork_url"]),
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
def parse_images(
- images_list: list[dict[str, Any]], lookup_key: str, exclude_generic: bool = False
+ images_list: list[dict[str, Any]], instance_id: str, exclude_generic: bool = False
) -> UniqueList[MediaItemImage]:
"""Parse images list into MediaItemImage objects."""
if not images_list:
MediaItemImage(
type=ImageType.THUMB,
path=best_image["url"],
- provider=lookup_key,
+ provider=instance_id,
remotely_accessible=True,
)
]
"""Parse spotify artist object to generic layout."""
artist = Artist(
item_id=artist_obj["id"],
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=artist_obj["name"] or artist_obj["id"],
provider_mappings={
ProviderMapping(
# Use unified image parsing with generic exclusion
artist.metadata.images = parse_images(
- artist_obj.get("images", []), provider.lookup_key, exclude_generic=True
+ artist_obj.get("images", []), provider.instance_id, exclude_generic=True
)
return artist
name, version = parse_title_and_version(album_obj["name"])
album = Album(
item_id=album_obj["id"],
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
version=version,
provider_mappings={
if "genres" in album_obj:
album.metadata.genres = set(album_obj["genres"])
- album.metadata.images = parse_images(album_obj.get("images", []), provider.lookup_key)
+ album.metadata.images = parse_images(album_obj.get("images", []), provider.instance_id)
if "label" in album_obj:
album.metadata.label = album_obj["label"]
name, version = parse_title_and_version(track_obj["name"])
track = Track(
item_id=track_obj["id"],
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
version=version,
duration=track_obj["duration_ms"] / 1000,
if "album" in track_obj:
track.album = parse_album(track_obj["album"], provider)
track.metadata.images = parse_images(
- track_obj["album"].get("images", []), provider.lookup_key
+ track_obj["album"].get("images", []), provider.instance_id
)
if track_obj.get("copyright"):
track.metadata.copyright = track_obj["copyright"]
playlist = Playlist(
item_id=playlist_obj["id"],
- provider=provider.instance_id if is_editable else provider.lookup_key,
+ provider=provider.instance_id,
name=playlist_obj["name"],
owner=owner_name,
provider_mappings={
is_editable=is_editable,
)
- playlist.metadata.images = parse_images(playlist_obj.get("images", []), provider.lookup_key)
+ playlist.metadata.images = parse_images(playlist_obj.get("images", []), provider.instance_id)
return playlist
"""Parse spotify podcast (show) object to generic layout."""
podcast = Podcast(
item_id=podcast_obj["id"],
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=podcast_obj["name"],
provider_mappings={
ProviderMapping(
if podcast_obj.get("description"):
podcast.metadata.description = podcast_obj["description"]
- podcast.metadata.images = parse_images(podcast_obj.get("images", []), provider.lookup_key)
+ podcast.metadata.images = parse_images(podcast_obj.get("images", []), provider.instance_id)
if "explicit" in podcast_obj:
podcast.metadata.explicit = podcast_obj["explicit"]
if podcast is None and "show" in episode_obj:
podcast = Podcast(
item_id=episode_obj["show"]["id"],
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=episode_obj["show"]["name"],
provider_mappings={
ProviderMapping(
# Create a minimal podcast reference if none available
podcast = Podcast(
item_id="unknown",
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name="Unknown Podcast",
provider_mappings=set(),
)
episode = PodcastEpisode(
item_id=episode_obj["id"],
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=episode_obj["name"],
duration=episode_obj["duration_ms"] // 1000 if episode_obj.get("duration_ms") else 0,
podcast=podcast,
episode.metadata.release_date = datetime.fromisoformat(date_str)
- episode.metadata.images = parse_images(episode_obj.get("images", []), provider.lookup_key)
+ episode.metadata.images = parse_images(episode_obj.get("images", []), provider.instance_id)
# Use podcast artwork if episode has none
if not episode.metadata.images and isinstance(podcast, Podcast) and podcast.metadata.images:
if audiobook_obj.get("publisher"):
audiobook.publisher = audiobook_obj["publisher"]
- audiobook.metadata.images = parse_images(audiobook_obj.get("images", []), provider.lookup_key)
+ audiobook.metadata.images = parse_images(audiobook_obj.get("images", []), provider.instance_id)
if audiobook_obj.get("explicit"):
audiobook.metadata.explicit = audiobook_obj["explicit"]
# For all other media types (tracks, podcast episodes)
return StreamDetails(
item_id=item_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
media_type=media_type,
audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
stream_type=StreamType.CUSTOM,
liked_songs = Playlist(
item_id=self._get_liked_songs_playlist_id(),
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"Liked Songs {self._sp_user['display_name']}", # TODO to be translated
owner=self._sp_user["display_name"],
provider_mappings={
image = MediaItemImage(
type=ImageType.THUMB,
path="https://misc.scdn.co/liked-songs/liked-songs-64.png",
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
if liked_songs.metadata.images is None:
PlayerFeature.GAPLESS_PLAYBACK,
PlayerFeature.GAPLESS_DIFFERENT_SAMPLERATE,
}
- self._attr_can_group_with = {provider.lookup_key}
+ self._attr_can_group_with = {provider.instance_id}
self.multi_client_stream: MultiClientStream | None = None
self._sync_playpoints: deque[SyncPlayPoint] = deque(maxlen=MIN_REQ_PLAYPOINTS)
self._do_not_resync_before: float = 0.0
artist_idx, album_idx, track_idx = prov_track_id.split("_", 3)
return Track(
item_id=prov_track_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"Test Track {artist_idx} - {album_idx} - {track_idx}",
duration=60,
artists=UniqueList([await self.get_artist(artist_idx)]),
"""Get full artist details by id."""
return Artist(
item_id=prov_artist_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"Test Artist {prov_artist_id}",
metadata=MediaItemMetadata(images=UniqueList([DEFAULT_THUMB, DEFAULT_FANART])),
provider_mappings={
artist_idx, album_idx = prov_album_id.split("_", 2)
return Album(
item_id=prov_album_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"Test Album {album_idx}",
artists=UniqueList([await self.get_artist(artist_idx)]),
provider_mappings={
"""Get full podcast details by id."""
return Podcast(
item_id=prov_podcast_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"Test Podcast {prov_podcast_id}",
metadata=MediaItemMetadata(images=UniqueList([DEFAULT_THUMB])),
provider_mappings={
"""Get full audiobook details by id."""
return Audiobook(
item_id=prov_audiobook_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"Test Audiobook {prov_audiobook_id}",
metadata=MediaItemMetadata(
images=UniqueList([DEFAULT_THUMB]),
podcast_id, episode_idx = prov_episode_id.split("_", 2)
return PodcastEpisode(
item_id=prov_episode_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"Test PodcastEpisode {podcast_id}-{episode_idx}",
duration=60,
podcast=ItemMapping(
item_id=podcast_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=f"Test Podcast {podcast_id}",
media_type=MediaType.PODCAST,
image=DEFAULT_THUMB,
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio."""
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.OGG,
MediaItemImage(
type=img_type,
path=img,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
MediaItemImage(
type=img_type,
path=img,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
MediaItemImage(
type=img_type,
path=img,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
artist_id = str(artist_obj["id"])
artist = Artist(
item_id=artist_id,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=artist_obj["name"],
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
remotely_accessible=True,
)
]
album = Album(
item_id=album_id,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
version=version,
provider_mappings={
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
remotely_accessible=True,
)
]
hi_res_lossless = any(tag in tags for tag in ["HIRES_LOSSLESS", "HI_RES_LOSSLESS"])
track = Track(
item_id=track_id,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=track_obj.get("title", "Unknown"),
version=version,
duration=track_obj.get("duration", 0),
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
remotely_accessible=True,
)
]
playlist = Playlist(
item_id=playlist_id,
- provider=provider.instance_id if is_editable else provider.lookup_key,
+ provider=provider.instance_id,
name=playlist_obj.get("title", "Unknown"),
owner=owner_name,
provider_mappings={
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
remotely_accessible=True,
)
]
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=name,
)
RecommendationFolder(
item_id=item_id,
name=folder_name,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
items=UniqueList[MediaItemType | ItemMapping | BrowseFolder](unique_items),
subtitle=f"From {page_name} • {len(unique_items)} items",
translation_key=item_id,
return StreamDetails(
item_id=track.item_id,
- provider=self.provider.lookup_key,
+ provider=self.provider.instance_id,
audio_format=AudioFormat(
content_type=content_type,
sample_rate=stream_data.get("sampleRate", 44100),
preferred_stream = stream_info[0]
radio = Radio(
item_id=details["preset_id"],
- provider=self.lookup_key,
+ provider=self.instance_id,
name=name,
provider_mappings={
ProviderMapping(
# custom url (no stream object present)
radio = Radio(
item_id=details["URL"],
- provider=self.lookup_key,
+ provider=self.instance_id,
name=name,
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=img,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
]
if item_id.startswith("http"):
# custom url
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
# and the first one is the best quality
preferred_stream = stream_info[0]
return StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
# set contenttype to unknown so ffmpeg can auto detect it
audio_format=AudioFormat(content_type=ContentType.UNKNOWN),
)
# allow grouping with all providers, except the ugp provider itself
self._attr_can_group_with = {
- x.lookup_key
+ x.instance_id
for x in self.mass.players.providers
- if x.lookup_key != self.provider.lookup_key
+ if x.instance_id != self.provider.instance_id
}
self._set_attributes()
player_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}"
self.mass.config.create_default_player_config(
player_id=player_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=name,
enabled=True,
values={
async def discover_players(self) -> None:
"""Discover players."""
- for player_conf in await self.mass.config.get_player_configs(self.lookup_key):
+ for player_conf in await self.mass.config.get_player_configs(self.instance_id):
if player_conf.player_id.startswith(UGP_PREFIX):
await self._register_player(player_conf.player_id)
stream_format = await self._get_stream_format(item_id=item_id)
self.logger.debug("Found stream_format: %s for song %s", stream_format["format"], item_id)
stream_details = StreamDetails(
- provider=self.lookup_key,
+ provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_format["audio_ext"]),
folder = RecommendationFolder(
name=section["title"],
item_id=f"{self.instance_id}_{section['title']}",
- provider=self.lookup_key,
+ provider=self.instance_id,
icon=determine_recommendation_icon(section["title"]),
)
for recommended_item in section.get("contents", []):
album = Album(
item_id=album_id,
name=name,
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=str(album_id),
artist = Artist(
item_id=artist_id,
name=artist_obj["name"],
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=str(artist_id),
playlist_name = f"{playlist_name} ({self.name})"
playlist = Playlist(
item_id=playlist_id,
- provider=self.instance_id if is_editable else self.lookup_key,
+ provider=self.instance_id,
name=playlist_name,
provider_mappings={
ProviderMapping(
track_id = str(track_obj["videoId"])
track = Track(
item_id=track_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=track_obj["title"],
provider_mappings={
ProviderMapping(
podcast = Podcast(
item_id=podcast_obj["podcastId"],
name=podcast_obj["title"],
- provider=self.lookup_key,
+ provider=self.instance_id,
provider_mappings={
ProviderMapping(
item_id=podcast_obj["podcastId"],
item_id = f"{podcast.item_id}{PODCAST_EPISODE_SPLITTER}{episode_id}"
episode = PodcastEpisode(
item_id=item_id,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=episode_obj.get("title"),
podcast=podcast,
provider_mappings={
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.lookup_key,
+ provider=self.instance_id,
name=name,
)
MediaItemImage(
type=image_type,
path=url,
- provider=self.lookup_key,
+ provider=self.instance_id,
remotely_accessible=True,
)
)
}),
'name': '',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': '',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
}),
'name': 'テストシリーズ68461151-527007',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストシリース68461151-527007',
'translation_key': None,
- 'uri': 'nicovideo://album/527007',
+ 'uri': 'nicovideo_test://album/527007',
'version': '',
'year': None,
})
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
}),
'name': 'テストシリーズ68461151-527007',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストシリース68461151-527007',
'translation_key': None,
- 'uri': 'nicovideo://album/527007',
+ 'uri': 'nicovideo_test://album/527007',
'version': '',
'year': None,
}),
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://img.cdn.nimg.jp/s/nicovideo/thumbnails/45285955/45285955.27227006.original/r640x360l?key=7098d92a9c12d0b14101a4c3c8297e04c6e7a59eb59ffe9e71c88fa0181b0cb5',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
}),
]),
}),
'name': '',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': '',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
}),
'name': 'テストシリーズ68461151-527007',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストシリース68461151-527007',
'translation_key': None,
- 'uri': 'nicovideo://album/527007',
+ 'uri': 'nicovideo_test://album/527007',
'version': '',
'year': None,
})
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': '中の',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': '中の',
'translation_key': None,
- 'uri': 'nicovideo://artist/4',
+ 'uri': 'nicovideo_test://artist/4',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://img.cdn.nimg.jp/s/nicovideo/thumbnails/45285955/45285955.27227006.original/r640x360l?key=7098d92a9c12d0b14101a4c3c8297e04c6e7a59eb59ffe9e71c88fa0181b0cb5',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://img.cdn.nimg.jp/s/nicovideo/thumbnails/45285955/45285955.27227006.original/r640x360l?key=7098d92a9c12d0b14101a4c3c8297e04c6e7a59eb59ffe9e71c88fa0181b0cb5',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
'name': 'テストマイリスト68461151-78597499',
'owner': '68461151',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストマイリスト68461151-78597499',
'translation_key': None,
- 'uri': 'nicovideo://playlist/78597499',
+ 'uri': 'nicovideo_test://playlist/78597499',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
'name': 'テストマイリスト68461151-78597499',
'owner': '68461151',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストマイリスト68461151-78597499',
'translation_key': None,
- 'uri': 'nicovideo://playlist/78597499',
+ 'uri': 'nicovideo_test://playlist/78597499',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
'name': 'テストマイリスト68461151-78597499',
'owner': '68461151',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストマイリスト68461151-78597499',
'translation_key': None,
- 'uri': 'nicovideo://playlist/78597499',
+ 'uri': 'nicovideo_test://playlist/78597499',
'version': '',
}),
'tracks': list([
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://img.cdn.nimg.jp/s/nicovideo/thumbnails/45285955/45285955.27227006.original/r640x360l?key=7098d92a9c12d0b14101a4c3c8297e04c6e7a59eb59ffe9e71c88fa0181b0cb5',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
'name': 'テストマイリスト68461151-78597499',
'owner': '68461151',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストマイリスト68461151-78597499',
'translation_key': None,
- 'uri': 'nicovideo://playlist/78597499',
+ 'uri': 'nicovideo_test://playlist/78597499',
'version': '',
})
# ---
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
}),
'name': 'テストシリーズ68461151-527007',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストシリース68461151-527007',
'translation_key': None,
- 'uri': 'nicovideo://album/527007',
+ 'uri': 'nicovideo_test://album/527007',
'version': '',
'year': None,
})
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://img.cdn.nimg.jp/s/nicovideo/thumbnails/45285955/45285955.27227006.original/r640x360l?key=7098d92a9c12d0b14101a4c3c8297e04c6e7a59eb59ffe9e71c88fa0181b0cb5',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://img.cdn.nimg.jp/s/nicovideo/thumbnails/45285955/45285955.27227006.original/r640x360l?key=7098d92a9c12d0b14101a4c3c8297e04c6e7a59eb59ffe9e71c88fa0181b0cb5',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://img.cdn.nimg.jp/s/nicovideo/thumbnails/45285955/45285955.27227006.original/r640x360l?key=7098d92a9c12d0b14101a4c3c8297e04c6e7a59eb59ffe9e71c88fa0181b0cb5',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://img.cdn.nimg.jp/s/nicovideo/thumbnails/45285955/45285955.27227006.original/r640x360l?key=7098d92a9c12d0b14101a4c3c8297e04c6e7a59eb59ffe9e71c88fa0181b0cb5',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
}),
'name': 'テストシリーズ68461151-527007',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'テストシリース68461151-527007',
'translation_key': None,
- 'uri': 'nicovideo://album/527007',
+ 'uri': 'nicovideo_test://album/527007',
'version': '',
'year': None,
}),
'images': list([
dict({
'path': 'https://secure-dcdn.cdn.nimg.jp/nicoaccount/usericon/defaults/blank.jpg',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'ゲスト',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'ケスト',
'translation_key': None,
- 'uri': 'nicovideo://artist/68461151',
+ 'uri': 'nicovideo_test://artist/68461151',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.L',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
dict({
'path': 'https://nicovideo.cdn.nimg.jp/thumbnails/45285955/45285955.27227006.M',
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'APIテスト用',
'position': None,
- 'provider': 'nicovideo',
+ 'provider': 'nicovideo_test',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'apiテスト用',
'track_number': 0,
'translation_key': None,
- 'uri': 'nicovideo://track/sm45285955',
+ 'uri': 'nicovideo_test://track/sm45285955',
'version': '',
})
# ---
"""Create a NicovideoConverterManager for testing."""
# Create mock provider
mock_provider = Mock()
- mock_provider.lookup_key = "nicovideo"
mock_provider.instance_id = "nicovideo_test"
mock_provider.domain = "nicovideo"
'images': list([
dict({
'path': 'https://resources.tidal.com/images/1234/5678/90ab/cdef/750x750.jpg',
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'Test Artist',
'position': None,
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'test artist',
'translation_key': None,
- 'uri': 'tidal://artist/12345',
+ 'uri': 'tidal_instance://artist/12345',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://resources.tidal.com/images/abcd/ef01/2345/6789/750x750.jpg',
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'Test Album',
'position': None,
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'test album',
'translation_key': None,
- 'uri': 'tidal://album/67890',
+ 'uri': 'tidal_instance://album/67890',
'version': 'Deluxe Edition',
'year': 2023,
})
'images': list([
dict({
'path': 'https://resources.tidal.com/images/1234/5678/90ab/cdef/750x750.jpg',
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'Test Artist',
'position': None,
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'test artist',
'translation_key': None,
- 'uri': 'tidal://artist/12345',
+ 'uri': 'tidal_instance://artist/12345',
'version': '',
})
# ---
'images': list([
dict({
'path': 'http://example.com/mix-medium.jpg',
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'remotely_accessible': True,
'type': 'thumb',
}),
'name': 'My Daily Discovery',
'owner': 'Created by Tidal',
'position': None,
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'my daily discovery',
'translation_key': None,
- 'uri': 'tidal://playlist/mix_mix_123',
+ 'uri': 'tidal_instance://playlist/mix_mix_123',
'version': '',
})
# ---
'images': list([
dict({
'path': 'https://resources.tidal.com/images/playlist/square/image/id/750x750.jpg',
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'remotely_accessible': True,
'type': 'thumb',
}),
'name': 'Test Playlist',
'owner': 'Tidal',
'position': None,
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'test playlist',
'translation_key': None,
- 'uri': 'tidal://playlist/aabbcc-1122-3344-5566',
+ 'uri': 'tidal_instance://playlist/aabbcc-1122-3344-5566',
'version': '',
})
# ---
'item_id': '67890',
'media_type': 'album',
'name': 'Test Album',
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'sort_name': 'test album',
'translation_key': None,
- 'uri': 'tidal://album/67890',
+ 'uri': 'tidal_instance://album/67890',
'version': '',
}),
'artists': list([
'images': list([
dict({
'path': 'https://resources.tidal.com/images/1234/5678/90ab/cdef/750x750.jpg',
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'Test Artist',
'position': None,
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'provider_mappings': list([
dict({
'audio_format': dict({
]),
'sort_name': 'test artist',
'translation_key': None,
- 'uri': 'tidal://artist/12345',
+ 'uri': 'tidal_instance://artist/12345',
'version': '',
}),
]),
'images': list([
dict({
'path': 'https://resources.tidal.com/images/abcd/ef01/2345/6789/750x750.jpg',
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'remotely_accessible': True,
'type': 'thumb',
}),
}),
'name': 'Test Track',
'position': None,
- 'provider': 'tidal',
+ 'provider': 'tidal_instance',
'provider_mappings': list([
dict({
'audio_format': dict({
'sort_name': 'test track',
'track_number': 1,
'translation_key': None,
- 'uri': 'tidal://track/112233',
+ 'uri': 'tidal_instance://track/112233',
'version': 'Remastered',
})
# ---
def provider_mock() -> Mock:
"""Return a mock provider."""
provider = Mock()
- provider.lookup_key = "tidal"
provider.domain = "tidal"
provider.instance_id = "tidal_instance"
provider.auth.user_id = "12345"
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
)
def provider_mock() -> Mock:
"""Return a mock provider."""
provider = Mock()
- provider.lookup_key = "tidal"
provider.domain = "tidal"
provider.instance_id = "tidal_instance"
provider.auth.user_id = "12345"
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
)
def provider_mock() -> Mock:
"""Return a mock provider."""
provider = Mock()
- provider.lookup_key = "tidal"
provider.domain = "tidal"
provider.instance_id = "tidal_instance"
provider.auth.user_id = "12345"
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
)
def provider_mock() -> Mock:
"""Return a mock provider."""
provider = Mock()
- provider.lookup_key = "tidal"
provider.domain = "tidal"
provider.instance_id = "tidal_instance"
provider.auth.user_id = "12345"
def provider_mock() -> Mock:
"""Return a mock provider."""
provider = Mock()
- provider.lookup_key = "tidal"
provider.domain = "tidal"
provider.instance_id = "tidal_instance"
provider.auth.user_id = "12345"
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
)
def provider_mock() -> Mock:
"""Return a mock provider."""
provider = Mock()
- provider.lookup_key = "tidal"
provider.domain = "tidal"
provider.instance_id = "tidal_instance"
provider.auth.user_id = "12345"
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=provider.lookup_key,
+ provider=provider.instance_id,
name=name,
)
assert mapping.media_type == MediaType.ARTIST
assert mapping.item_id == "123"
- assert mapping.provider == provider.lookup_key
+ assert mapping.provider == provider.instance_id
assert mapping.name == "Test Artist"
def provider_mock() -> Mock:
"""Return a mock provider."""
provider = Mock()
- provider.lookup_key = "tidal"
+ provider.domain = "tidal"
provider.instance_id = "tidal_instance"
provider.config.get_value.return_value = "HIGH"
provider.api = AsyncMock()
stream_details = await streaming_manager.get_stream_details("123")
assert stream_details.item_id == "123"
- assert stream_details.provider == "tidal"
+ assert stream_details.provider == "tidal_instance"
assert stream_details.audio_format.content_type == ContentType.FLAC
assert stream_details.audio_format.sample_rate == 44100
assert stream_details.audio_format.bit_depth == 16