Use lookup key to automatically select instance id vs domain to define generic URI/provider
key_id = base64.b64decode(uri.split(",")[1])
return StreamDetails(
item_id=item_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
),
# No more details available other than the id, return an ItemMapping
return ItemMapping(
media_type=MediaType.ARTIST,
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=artist_id,
name=artist_id,
)
# No more details available other than the id, return an ItemMapping
return ItemMapping(
media_type=MediaType.ALBUM,
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=album_id,
name=album_id,
)
album.artists = [
ItemMapping(
media_type=MediaType.ARTIST,
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=artist_name,
name=artist_name,
)
ItemMapping(
media_type=MediaType.ARTIST,
item_id=artist_name,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=artist_name,
)
]
"""Parse Apple Music playlist object to generic layout."""
attributes = playlist_obj["attributes"]
playlist_id = attributes["playParams"].get("globalId") or playlist_obj["id"]
+ is_editable = attributes.get("canEdit", False)
playlist = Playlist(
item_id=playlist_id,
- provider=self.domain,
+ provider=self.instance_id if is_editable else self.lookup_key,
name=attributes["name"],
owner=attributes.get("curatorName", "me"),
provider_mappings={
url=attributes.get("url"),
)
},
+ is_editable=is_editable,
)
if artwork := attributes.get("artwork"):
url = artwork["url"]
]
if description := attributes.get("description"):
playlist.metadata.description = description.get("standard")
- playlist.is_editable = attributes.get("canEdit", False)
if checksum := attributes.get("lastModifiedDate"):
playlist.cache_checksum = checksum
return playlist
item_id=abs_podcast.id_,
name=title,
publisher=abs_podcast.media.metadata.author,
- provider=self.domain,
+ provider=self.lookup_key,
total_episodes=abs_podcast.media.num_episodes,
provider_mappings={
ProviderMapping(
position = fallback_episode_cnt
mass_episode = PodcastEpisode(
item_id=episode_id,
- provider=self.domain,
+ provider=self.lookup_key,
name=episode.title,
duration=int(episode.duration),
position=position,
podcast=ItemMapping(
item_id=prov_podcast_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=episode.title,
media_type=MediaType.PODCAST,
),
async def _parse_audiobook(self, abs_audiobook: ABSAudioBook) -> Audiobook:
mass_audiobook = Audiobook(
item_id=abs_audiobook.id_,
- provider=self.domain,
+ provider=self.lookup_key,
name=abs_audiobook.media.metadata.title,
duration=int(abs_audiobook.media.duration),
provider_mappings={
# audiobookshelf returns information of stream, so we should be able
# to lift unknown at some point.
return StreamDetails(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=audiobook_id,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
media_url = abs_episode.audio_track.content_url
full_url = f"{base_url}{media_url}?token={token}"
return StreamDetails(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=podcast_id,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
BrowseFolder(
item_id=library.id_,
name=library.name,
- provider=self.instance_id,
+ provider=self.lookup_key,
path=f"{self.instance_id}://{item_path}/{library.id_}",
)
)
return ItemMapping(
media_type=media_type,
item_id=item.id_,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=title,
image=image,
)
url = url_details["sources"][0]["url"]
return StreamDetails(
item_id=item_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
audio_format=AudioFormat(
content_type=ContentType.try_parse(url_details["format"].split("_")[0])
),
"""Parse the deezer-python artist to a Music Assistant artist."""
return Artist(
item_id=str(artist.id),
- provider=self.domain,
+ provider=self.lookup_key,
name=artist.name,
media_type=MediaType.ARTIST,
provider_mappings={
return Album(
album_type=AlbumType(album.type),
item_id=str(album.id),
- provider=self.domain,
+ provider=self.lookup_key,
name=album.title,
artists=[
ItemMapping(
media_type=MediaType.ARTIST,
item_id=str(album.artist.id),
- provider=self.instance_id,
+ provider=self.lookup_key,
name=album.artist.name,
)
],
def parse_playlist(self, playlist: deezer.Playlist) -> Playlist:
"""Parse the deezer-python playlist to a Music Assistant playlist."""
creator = self.get_playlist_creator(playlist)
+ is_editable = creator.id == self.user.id
return Playlist(
item_id=str(playlist.id),
- provider=self.domain,
+ provider=self.instance_id if is_editable else self.lookup_key,
name=playlist.title,
media_type=MediaType.PLAYLIST,
provider_mappings={
)
],
),
- is_editable=creator.id == self.user.id,
+ is_editable=is_editable,
owner=creator.name,
cache_checksum=playlist.checksum,
)
artist = ItemMapping(
media_type=MediaType.ARTIST,
item_id=str(getattr(track.artist, "id", f"deezer-{track.artist.name}")),
- provider=self.instance_id,
+ provider=self.lookup_key,
name=track.artist.name,
)
else:
album = ItemMapping(
media_type=MediaType.ALBUM,
item_id=str(track.album.id),
- provider=self.instance_id,
+ provider=self.lookup_key,
name=track.album.title,
)
else:
item = Track(
item_id=str(track.id),
- provider=self.domain,
+ provider=self.lookup_key,
name=track.title,
sort_name=self.get_short_title(track),
duration=track.duration,
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=name,
)
url = await self._client.get_full_stream_url(int(item_id), "music-assistant")
return StreamDetails(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
artist = Artist(
item_id=artist_id,
name=artist_obj["name"],
- provider=self.domain,
+ provider=self.lookup_key,
provider_mappings={
ProviderMapping(
item_id=artist_id,
MediaItemImage(
type=ImageType.THUMB,
path=await self._client.get_artist_artwork_url(artist_id),
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=True,
)
]
name, version = parse_title_and_version(album_obj["name"])
album = Album(
item_id=album_id,
- provider=self.domain,
+ provider=self.lookup_key,
name=name,
year=album_obj["year"],
version=version,
artist = Artist(
item_id=VARIOUS_ARTISTS_MBID,
name=VARIOUS_ARTISTS_NAME,
- provider=self.instance_id,
+ provider=self.lookup_key,
provider_mappings={
ProviderMapping(
item_id=VARIOUS_ARTISTS_MBID,
return MediaItemImage(
type=ImageType.THUMB,
path=url,
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=True,
)
"""Parse an iBroadcast track object to a Track model object."""
track = Track(
item_id=track_obj["track_id"],
- provider=self.domain,
+ provider=self.lookup_key,
name=track_obj["title"],
provider_mappings={
ProviderMapping(
playlist_id = str(playlist_obj["playlist_id"])
playlist = Playlist(
item_id=playlist_id,
- provider=self.domain,
+ provider=self.lookup_key,
name=playlist_obj["name"],
provider_mappings={
ProviderMapping(
artist = Artist(
item_id=UNKNOWN_ARTIST_MAPPING.item_id,
name=UNKNOWN_ARTIST_MAPPING.name,
- provider=self.domain,
+ provider=self.lookup_key,
provider_mappings={
ProviderMapping(
item_id=UNKNOWN_ARTIST_MAPPING.item_id,
content_type = ContentType.try_parse(mimetype) if mimetype else ContentType.UNKNOWN
return StreamDetails(
item_id=jellyfin_track[ITEM_KEY_ID],
- provider=self.instance_id,
+ provider=self.lookup_key,
audio_format=AudioFormat(
content_type=content_type,
channels=jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CHANNELS],
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=name,
version=version,
)
async def _get_or_create_artist_by_name(self, artist_name: str) -> Artist | ItemMapping:
if library_items := await self.mass.music.artists._get_library_items_by_query(
- search=artist_name, provider=self.instance_id
+ search=artist_name, provider=self.lookup_key
):
return ItemMapping.from_item(library_items[0])
return Artist(
item_id=artist_id,
name=artist_name or UNKNOWN_ARTIST,
- provider=self.domain,
+ provider=self.lookup_key,
provider_mappings={
ProviderMapping(
item_id=str(artist_id),
album_id = plex_album.key
album = Album(
item_id=album_id,
- provider=self.domain,
+ provider=self.lookup_key,
name=plex_album.title or "[Unknown]",
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=False,
)
]
artist = Artist(
item_id=artist_id,
name=plex_artist.title or UNKNOWN_ARTIST,
- provider=self.domain,
+ provider=self.lookup_key,
provider_mappings={
ProviderMapping(
item_id=str(artist_id),
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=False,
)
]
"""Parse a Plex Playlist response to a Playlist object."""
playlist = Playlist(
item_id=plex_playlist.key,
- provider=self.domain,
+ provider=self.lookup_key,
name=plex_playlist.title or "[Unknown]",
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=False,
)
]
content = None
track = Track(
item_id=plex_track.key,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=plex_track.title or "[Unknown]",
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=thumb,
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=False,
)
]
stream_details = StreamDetails(
item_id=plex_track.key,
- provider=self.instance_id,
+ provider=self.lookup_key,
audio_format=AudioFormat(
content_type=content_type,
channels=media.audioChannels,
for episode in self.parsed["episodes"]:
if item_id == episode["guid"]:
return StreamDetails(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=item_id,
audio_format=AudioFormat(
# hard coded to unknown, so ffmpeg figures out
podcast = Podcast(
item_id=self.podcast_id,
name=self.parsed["title"],
- provider=self.domain,
+ provider=self.lookup_key,
uri=self.parsed["link"],
total_episodes=len(self.parsed["episodes"]),
provider_mappings={
item_id = episode_obj["guid"]
episode = PodcastEpisode(
item_id=item_id,
- provider=self.domain,
+ provider=self.lookup_key,
name=name,
duration=episode_obj["total_time"],
position=episode_obj.get("number", fallback_position),
podcast=ItemMapping(
item_id=self.podcast_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=self.parsed["title"],
media_type=MediaType.PODCAST,
),
self.mass.create_task(self._report_playback_started(streamdata))
return StreamDetails(
item_id=str(item_id),
- provider=self.instance_id,
+ provider=self.lookup_key,
audio_format=AudioFormat(
content_type=content_type,
sample_rate=int(streamdata["sampling_rate"] * 1000),
def _parse_playlist(self, playlist_obj):
"""Parse qobuz playlist object to generic layout."""
+ is_editable = (
+ playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"]
+ or playlist_obj["is_collaborative"]
+ )
playlist = Playlist(
item_id=str(playlist_obj["id"]),
- provider=self.domain,
+ provider=self.instance_id if is_editable else self.lookup_key,
name=playlist_obj["name"],
owner=playlist_obj["owner"]["name"],
provider_mappings={
url=f'https://open.qobuz.com/playlist/{playlist_obj["id"]}',
)
},
- )
- playlist.is_editable = (
- playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"]
- or playlist_obj["is_collaborative"]
+ is_editable=is_editable,
)
if img := self.__get_image(playlist_obj):
playlist.metadata.images = [
# See `_channel_updated` for where this is handled.
self._current_stream_details = StreamDetails(
item_id=item_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
audio_format=AudioFormat(
content_type=ContentType.AAC,
),
def _parse_radio(self, channel: XMChannel) -> Radio:
radio = Radio(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=channel.id,
name=channel.name,
provider_mappings={
if icon is not None:
images.append(
MediaItemImage(
- provider=self.instance_id,
+ provider=self.lookup_key,
type=ImageType.THUMB,
path=icon,
remotely_accessible=True,
)
images.append(
MediaItemImage(
- provider=self.instance_id,
+ provider=self.lookup_key,
type=ImageType.LOGO,
path=icon,
remotely_accessible=True,
if banner is not None:
images.append(
MediaItemImage(
- provider=self.instance_id,
+ provider=self.lookup_key,
type=ImageType.BANNER,
path=banner,
remotely_accessible=True,
)
images.append(
MediaItemImage(
- provider=self.instance_id,
+ provider=self.lookup_key,
type=ImageType.LANDSCAPE,
path=banner,
remotely_accessible=True,
"""Return the content details for the given track when it will be streamed."""
url: str = await self._soundcloud.get_stream_url(track_id=item_id)
return StreamDetails(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=item_id,
# let ffmpeg work out the details itself as
# soundcloud uses a mix of different content types and streaming methods
async def _get_liked_songs_playlist(self) -> Playlist:
liked_songs = Playlist(
item_id=self._get_liked_songs_playlist_id(),
- provider=self.domain,
+ provider=self.lookup_key,
name=f'Liked Songs {self._sp_user["display_name"]}', # TODO to be translated
owner=self._sp_user["display_name"],
provider_mappings={
MediaItemImage(
type=ImageType.THUMB,
path="https://misc.scdn.co/liked-songs/liked-songs-64.png",
- provider=self.domain,
+ provider=self.lookup_key,
remotely_accessible=True,
)
]
"""Return the content details for the given track when it will be streamed."""
return StreamDetails(
item_id=item_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
audio_format=AudioFormat(
content_type=ContentType.OGG,
),
"""Parse spotify artist object to generic layout."""
artist = Artist(
item_id=artist_obj["id"],
- provider=self.domain,
+ provider=self.lookup_key,
name=artist_obj["name"] or artist_obj["id"],
provider_mappings={
ProviderMapping(
MediaItemImage(
type=ImageType.THUMB,
path=img_url,
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=True,
)
]
name, version = parse_title_and_version(album_obj["name"])
album = Album(
item_id=album_obj["id"],
- provider=self.domain,
+ provider=self.lookup_key,
name=name,
version=version,
provider_mappings={
MediaItemImage(
type=ImageType.THUMB,
path=album_obj["images"][0]["url"],
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=True,
)
]
name, version = parse_title_and_version(track_obj["name"])
track = Track(
item_id=track_obj["id"],
- provider=self.domain,
+ provider=self.lookup_key,
name=name,
version=version,
duration=track_obj["duration_ms"] / 1000,
MediaItemImage(
type=ImageType.THUMB,
path=track_obj["album"]["images"][0]["url"],
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=True,
)
]
def _parse_playlist(self, playlist_obj):
"""Parse spotify playlist object to generic layout."""
+ is_editable = (
+ playlist_obj["owner"]["id"] == self._sp_user["id"] or playlist_obj["collaborative"]
+ )
playlist = Playlist(
item_id=playlist_obj["id"],
- provider=self.domain,
+ provider=self.instance_id if is_editable else self.lookup_key,
name=playlist_obj["name"],
owner=playlist_obj["owner"]["display_name"],
provider_mappings={
url=playlist_obj["external_urls"]["spotify"],
)
},
- )
- playlist.is_editable = (
- playlist_obj["owner"]["id"] == self._sp_user["id"] or playlist_obj["collaborative"]
+ is_editable=is_editable,
)
if playlist_obj.get("images"):
playlist.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
path=playlist_obj["images"][0]["url"],
- provider=self.instance_id,
+ provider=self.lookup_key,
remotely_accessible=True,
)
]
artist_idx, album_idx, track_idx = prov_track_id.split("_", 3)
return Track(
item_id=prov_track_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=f"Test Track {artist_idx} - {album_idx} - {track_idx}",
duration=60,
artists=UniqueList([await self.get_artist(artist_idx)]),
"""Get full artist details by id."""
return Artist(
item_id=prov_artist_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=f"Test Artist {prov_artist_id}",
metadata=MediaItemMetadata(images=UniqueList([DEFAULT_THUMB, DEFAULT_FANART])),
provider_mappings={
artist_idx, album_idx = prov_album_id.split("_", 2)
return Album(
item_id=prov_album_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=f"Test Album {album_idx}",
artists=UniqueList([await self.get_artist(artist_idx)]),
provider_mappings={
"""Get full podcast details by id."""
return Podcast(
item_id=prov_podcast_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=f"Test Podcast {prov_podcast_id}",
metadata=MediaItemMetadata(images=UniqueList([DEFAULT_THUMB])),
provider_mappings={
"""Get full audiobook details by id."""
return Audiobook(
item_id=prov_audiobook_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=f"Test Audiobook {prov_audiobook_id}",
metadata=MediaItemMetadata(
images=UniqueList([DEFAULT_THUMB]),
podcast_id, episode_idx = prov_episode_id.split("_", 2)
return PodcastEpisode(
item_id=prov_episode_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=f"Test PodcastEpisode {podcast_id}-{episode_idx}",
duration=60,
podcast=ItemMapping(
item_id=podcast_id,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=f"Test Podcast {podcast_id}",
media_type=MediaType.PODCAST,
image=DEFAULT_THUMB,
) -> StreamDetails:
"""Get streamdetails for a track/radio."""
return StreamDetails(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.OGG,
return StreamDetails(
item_id=track.id,
- provider=self.instance_id,
+ provider=self.lookup_key,
audio_format=AudioFormat(
content_type=ContentType.try_parse(manifest.codecs),
sample_rate=manifest.sample_rate,
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=name,
)
artist_id = artist_obj.id
artist = Artist(
item_id=str(artist_id),
- provider=self.instance_id,
+ provider=self.lookup_key,
name=artist_obj.name,
provider_mappings={
ProviderMapping(
album_id = album_obj.id
album = Album(
item_id=str(album_id),
- provider=self.instance_id,
+ provider=self.lookup_key,
name=name,
version=version,
provider_mappings={
track_id = str(track_obj.id)
track = Track(
item_id=str(track_id),
- provider=self.instance_id,
+ provider=self.lookup_key,
name=track_obj.name,
version=version,
duration=track_obj.duration,
playlist_id = playlist_obj.id
creator_id = playlist_obj.creator.id if playlist_obj.creator else None
creator_name = playlist_obj.creator.name if playlist_obj.creator else "Tidal"
+ is_editable = bool(creator_id and str(creator_id) == self._tidal_user_id)
playlist = Playlist(
item_id=str(playlist_id),
- provider=self.instance_id,
+ provider=self.instance_id if is_editable else self.lookup_key,
name=playlist_obj.name,
owner=creator_name,
provider_mappings={
url=f"{BROWSE_URL}/playlist/{playlist_id}",
)
},
+ is_editable=is_editable,
)
- is_editable = bool(creator_id and str(creator_id) == self._tidal_user_id)
- playlist.is_editable = is_editable
# metadata
playlist.cache_checksum = str(playlist_obj.last_updated)
playlist.metadata.popularity = playlist_obj.popularity
if item_id.startswith("http"):
# custom url
return StreamDetails(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
if media_type and stream["media_type"] != media_type:
continue
return StreamDetails(
- provider=self.domain,
+ provider=self.lookup_key,
item_id=item_id,
# set contenttype to unknown so ffmpeg can auto detect it
audio_format=AudioFormat(content_type=ContentType.UNKNOWN),
stream_format = await self._get_stream_format(item_id=item_id)
self.logger.debug("Found stream_format: %s for song %s", stream_format["format"], item_id)
stream_details = StreamDetails(
- provider=self.instance_id,
+ provider=self.lookup_key,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_format["audio_ext"]),
album = Album(
item_id=album_id,
name=name,
- provider=self.domain,
+ provider=self.lookup_key,
provider_mappings={
ProviderMapping(
item_id=str(album_id),
artist = Artist(
item_id=artist_id,
name=artist_obj["name"],
- provider=self.domain,
+ provider=self.lookup_key,
provider_mappings={
ProviderMapping(
item_id=str(artist_id),
"""Parse a YT Playlist response to a Playlist object."""
playlist_id = playlist_obj["id"]
playlist_name = playlist_obj["title"]
+ is_editable = playlist_obj.get("privacy") and playlist_obj.get("privacy") == "PRIVATE"
# Playlist ID's are not unique across instances for lists like 'Likes', 'Supermix', etc.
# So suffix with the instance id to make them unique
if playlist_id in YT_PERSONAL_PLAYLISTS:
playlist_name = f"{playlist_name} ({self.name})"
playlist = Playlist(
item_id=playlist_id,
- provider=self.domain,
+ provider=self.instance_id if is_editable else self.lookup_key,
name=playlist_name,
provider_mappings={
ProviderMapping(
url=f"{YTM_DOMAIN}/playlist?list={playlist_id}",
)
},
+ is_editable=is_editable,
)
if "description" in playlist_obj:
playlist.metadata.description = playlist_obj["description"]
if playlist_obj.get("thumbnails"):
playlist.metadata.images = self._parse_thumbnails(playlist_obj["thumbnails"])
- is_editable = False
- if playlist_obj.get("privacy") and playlist_obj.get("privacy") == "PRIVATE":
- is_editable = True
- playlist.is_editable = is_editable
+
if authors := playlist_obj.get("author"):
if isinstance(authors, str):
playlist.owner = authors
track_id = str(track_obj["videoId"])
track = Track(
item_id=track_id,
- provider=self.domain,
+ provider=self.lookup_key,
name=track_obj["title"],
provider_mappings={
ProviderMapping(
return ItemMapping(
media_type=media_type,
item_id=key,
- provider=self.instance_id,
+ provider=self.lookup_key,
name=name,
)