from __future__ import annotations
+from collections.abc import Iterable
from dataclasses import dataclass, field, fields
-from typing import TYPE_CHECKING, Any, Self, cast
+from typing import TYPE_CHECKING, Any, Self, TypeVar, cast
from mashumaro import DataClassDictMixin
MetadataTypes = int | bool | str | list[str]
+_T = TypeVar("_T")
+
+
+class UniqueList(list[_T]):
+ """Custom list that ensures the inserted items are unique."""
+
+ def __init__(self, iterable: Iterable[_T] | None = None) -> None:
+ """Initialize."""
+ if not iterable:
+ super().__init__()
+ return
+ seen = set()
+ seen_add = seen.add
+ super().__init__(x for x in iterable if not (x in seen or seen_add(x)))
+
+ def append(self, item: _T) -> None:
+ """Append item."""
+ if item in self:
+ return
+ super().append(item)
+
+ def extend(self, other: Iterable[_T]) -> None:
+ """Extend list."""
+ other = [x for x in other if x not in self]
+ super().extend(other)
+
@dataclass(kw_only=True)
class AudioFormat(DataClassDictMixin):
review: str | None = None
explicit: bool | None = None
# NOTE: images is a list of available images, sorted by preference
- images: list[MediaItemImage] | None = None
+ images: UniqueList[MediaItemImage] | None = None
genres: set[str] | None = None
mood: str | None = None
style: str | None = None
lyrics: str | None = None # tracks only
label: str | None = None
links: set[MediaItemLink] | None = None
- chapters: list[MediaItemChapter] | None = None
+ chapters: UniqueList[MediaItemChapter] | None = None
performers: set[str] | None = None
preview: str | None = None
popularity: int | None = None
def __post_init__(self):
"""Call after init."""
+ if not self.name:
+ # we've got some reports where the name was empty, causing weird issues.
+ # e.g. here: https://github.com/music-assistant/hass-music-assistant/issues/1515
+ self.name = "[Unknown]"
if not self.uri:
self.uri = create_uri(self.media_type, self.provider, self.item_id)
if not self.sort_name:
class MediaItem(_MediaItemBase):
"""Base representation of a media item."""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
provider_mappings: set[ProviderMapping]
# optional fields below
metadata: MediaItemMetadata = field(default_factory=MediaItemMetadata)
favorite: bool = False
+ position: int | None = None # required for playlist tracks, optional for all other
# timestamps to determine when the item was added/modified to the db
timestamp_added: int = 0
timestamp_modified: int = 0
class ItemMapping(_MediaItemBase):
"""Representation of a minimized item object."""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
available: bool = True
image: MediaItemImage | None = None
class Artist(MediaItem):
"""Model for an artist."""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
media_type: MediaType = MediaType.ARTIST
class Album(MediaItem):
"""Model for an album."""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
media_type: MediaType = MediaType.ALBUM
version: str = ""
year: int | None = None
- artists: list[Artist | ItemMapping] = field(default_factory=list)
+ artists: UniqueList[Artist | ItemMapping] = field(default_factory=UniqueList)
album_type: AlbumType = AlbumType.UNKNOWN
class Track(MediaItem):
"""Model for a track."""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
media_type: MediaType = MediaType.TRACK
duration: int = 0
version: str = ""
- artists: list[Artist | ItemMapping] = field(default_factory=list)
+ artists: UniqueList[Artist | ItemMapping] = field(default_factory=UniqueList)
album: Album | ItemMapping | None = None # optional
disc_number: int | None = None # required for album tracks
track_number: int | None = None # required for album tracks
- position: int | None = None # required for playlist tracks
def __hash__(self):
"""Return custom hash."""
album, disc_number and track_number
"""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
album: Album | ItemMapping
disc_number: int
track_number: int
Same as regular Track but with explicit and required definition of position.
"""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
position: int
@classmethod
class Playlist(MediaItem):
"""Model for a playlist."""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
media_type: MediaType = MediaType.PLAYLIST
owner: str = ""
is_editable: bool = False
class Radio(MediaItem):
"""Model for a radio station."""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
media_type: MediaType = MediaType.RADIO
duration: int = 172800
class BrowseFolder(MediaItem):
"""Representation of a Folder used in Browse (which contains media items)."""
+ __hash__ = _MediaItemBase.__hash__
+ __eq__ = _MediaItemBase.__eq__
+
media_type: MediaType = MediaType.FOLDER
# path: the path (in uri style) to/for this browse folder
path: str = ""
ItemMapping,
MediaType,
Track,
+ UniqueList,
)
from music_assistant.constants import (
DB_TABLE_ALBUM_ARTISTS,
{self.db_table}.*,
{DB_TABLE_ARTISTS}.sort_name AS sort_artist,
json_group_array(
- json_object(
+ DISTINCT json_object(
'item_id', {DB_TABLE_PROVIDER_MAPPINGS}.provider_item_id,
'provider_domain', {DB_TABLE_PROVIDER_MAPPINGS}.provider_domain,
'provider_instance', {DB_TABLE_PROVIDER_MAPPINGS}.provider_instance,
'details', {DB_TABLE_PROVIDER_MAPPINGS}.details
)) filter ( where {DB_TABLE_PROVIDER_MAPPINGS}.item_id is not null) as {DB_TABLE_PROVIDER_MAPPINGS},
json_group_array(
- json_object(
+ DISTINCT json_object(
'item_id', {DB_TABLE_ARTISTS}.item_id,
'provider', 'library',
'name', {DB_TABLE_ARTISTS}.name,
add_to_library=add_to_library,
)
# append artist details to full track item (resolve ItemMappings)
- album_artists = []
+ album_artists = UniqueList()
for artist in album.artists:
if not isinstance(artist, ItemMapping):
album_artists.append(artist)
db_id = int(item_id) # ensure integer
# recursively also remove artist albums
for db_row in await self.mass.music.database.get_rows_from_query(
- f"SELECT item_id FROM {DB_TABLE_ALBUMS} WHERE artists LIKE '%\"{db_id}\"%'",
+ f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = {db_id}",
limit=5000,
):
with contextlib.suppress(MediaNotFoundError):
- await self.mass.music.albums.remove_item_from_library(db_row["item_id"])
+ await self.mass.music.albums.remove_item_from_library(db_row["album_id"])
# recursively also remove artist tracks
for db_row in await self.mass.music.database.get_rows_from_query(
- f"SELECT item_id FROM {DB_TABLE_TRACKS} WHERE artists LIKE '%\"{db_id}\"%'",
+ f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = {db_id}",
limit=5000,
):
with contextlib.suppress(MediaNotFoundError):
- await self.mass.music.tracks.remove_item_from_library(db_row["item_id"])
+ await self.mass.music.tracks.remove_item_from_library(db_row["track_id"])
# delete the artist itself from db
await super().remove_item_from_library(db_id)
provider_instance_id_or_domain,
):
query = (
- f"WHERE trackartists.artist_id = {db_artist.item_id} AND "
- f'(provider_mappings.provider_domain = "{provider_instance_id_or_domain}" OR '
- f'provider_mappings.provider_instance = "{provider_instance_id_or_domain}")'
+ "WHERE trackartists.artist_id = :artist_id AND "
+ "(provider_mappings.provider_domain = :prov_id OR "
+ "provider_mappings.provider_instance = :prov_id)"
+ )
+ query_params = {
+ "artist_id": db_artist.item_id,
+ "prov_id": provider_instance_id_or_domain,
+ }
+ paged_list = await self.mass.music.tracks.library_items(
+ extra_query=query, extra_query_params=query_params
)
- paged_list = await self.mass.music.tracks.library_items(extra_query=query)
return paged_list.items
# store (serializable items) in cache
if prov.is_streaming_provider:
SELECT
{self.db_table}.*,
json_group_array(
- json_object(
+ DISTINCT json_object(
'item_id', {DB_TABLE_PROVIDER_MAPPINGS}.provider_item_id,
'provider_domain', {DB_TABLE_PROVIDER_MAPPINGS}.provider_domain,
'provider_instance', {DB_TABLE_PROVIDER_MAPPINGS}.provider_instance,
) -> list[ItemCls]:
"""Fetch all records from library for given provider."""
query_parts = []
- prov_ids_str = str(tuple(provider_item_ids or ()))
- if prov_ids_str.endswith(",)"):
- prov_ids_str = prov_ids_str.replace(",)", ")")
+ query_params = {
+ "prov_id": provider_instance_id_or_domain,
+ }
if provider_instance_id_or_domain == "library":
# request for specific library id's
if provider_item_ids:
- query_parts.append(f"{self.db_table}.item_id in {prov_ids_str}")
+ query_parts.append(f"{self.db_table}.item_id in :item_ids")
+ query_params["item_ids"] = provider_item_ids
else:
# provider filtered response
query_parts.append(
- f"(provider_mappings.provider_instance = '{provider_instance_id_or_domain}' "
- f"OR provider_mappings.provider_domain = '{provider_instance_id_or_domain}')"
+ "(provider_mappings.provider_instance = :prov_id "
+ "OR provider_mappings.provider_domain = :prov_id)"
)
if provider_item_ids:
- query_parts.append(f"provider_mappings.provider_item_id in {prov_ids_str}")
+ query_parts.append("provider_mappings.provider_item_id in :item_ids")
+ query_params["item_ids"] = provider_item_ids
# build final query
query = "WHERE " + " AND ".join(query_parts)
- paged_list = await self.library_items(limit=limit, offset=offset, extra_query=query)
+ paged_list = await self.library_items(
+ limit=limit, offset=offset, extra_query=query, extra_query_params=query_params
+ )
return paged_list.items
async def iter_library_items_by_prov_id(
},
)
- # update the item in db (provider_mappings column only)
+ # update the item's provider mappings (and check if we still have any)
library_item.provider_mappings = {
x for x in library_item.provider_mappings if x.provider_instance != provider_instance_id
}
{DB_TABLE_ARTISTS}.sort_name AS sort_artist,
{DB_TABLE_ARTISTS}.sort_name AS sort_album,
json_group_array(
- json_object(
+ DISTINCT json_object(
'item_id', {DB_TABLE_PROVIDER_MAPPINGS}.provider_item_id,
'provider_domain', {DB_TABLE_PROVIDER_MAPPINGS}.provider_domain,
'provider_instance', {DB_TABLE_PROVIDER_MAPPINGS}.provider_instance,
'details', {DB_TABLE_PROVIDER_MAPPINGS}.details
)) filter ( where {DB_TABLE_PROVIDER_MAPPINGS}.item_id is not null) as {DB_TABLE_PROVIDER_MAPPINGS},
json_group_array(
- json_object(
+ DISTINCT json_object(
'item_id', {DB_TABLE_ARTISTS}.item_id,
'provider', 'library',
'name', {DB_TABLE_ARTISTS}.name,
from __future__ import annotations
import asyncio
+import logging
import os
import shutil
from collections.abc import AsyncGenerator
except Exception as err:
# we dont want the whole removal process to stall on one item
# so in case of an unexpected error, we log and move on.
- self.logger.warning("Error while removing %s: %s", item.uri, str(err))
+ self.logger.warning(
+ "Error while removing %s: %s",
+ item.uri,
+ str(err),
+ exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
+ )
errors += 1
if errors == 0:
import time
from collections.abc import AsyncGenerator
from contextlib import suppress
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, TypedDict
from music_assistant.common.helpers.util import get_changed_keys
from music_assistant.common.models.config_entries import (
CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_action_playlist"
+class CompareState(TypedDict):
+ """Simple object where we store the (previous) state of a queue.
+
+ Used for compare actions.
+ """
+
+ queue_id: str
+ state: PlayerState
+ current_index: int | None
+ elapsed_time: int
+
+
class PlayerQueuesController(CoreController):
"""Controller holding all logic to enqueue music for players."""
super().__init__(*args, **kwargs)
self._queues: dict[str, PlayerQueue] = {}
self._queue_items: dict[str, list[QueueItem]] = {}
- self._prev_states: dict[str, dict] = {}
+ self._prev_states: dict[str, CompareState] = {}
self.manifest.name = "Player Queues controller"
self.manifest.description = (
"Music Assistant's core controller which manages the queues for all players."
queue.elapsed_time += queue.current_item.streamdetails.seek_position
# basic throttle: do not send state changed events if queue did not actually change
- prev_state = self._prev_states.get(queue_id, {})
- new_state = queue.to_dict()
- new_state.pop("elapsed_time_last_updated", None)
+ prev_state = self._prev_states.get(
+ queue_id,
+ CompareState(
+ queue_id=queue_id,
+ state=PlayerState.IDLE,
+ current_index=None,
+ elapsed_time=0,
+ ),
+ )
+ new_state = CompareState(
+ queue_id=queue_id,
+ state=queue.state,
+ current_index=queue.current_index,
+ elapsed_time=queue.elapsed_time,
+ )
changed_keys = get_changed_keys(prev_state, new_state)
-
# return early if nothing changed
if len(changed_keys) == 0:
return
self._prev_states[queue_id] = new_state
return
# handle player was playing and is now stopped
- # if player finished playing a track for 85%, mark current item as finished
+ # if player finished playing a track for 90%, mark current item as finished
if (
- prev_state.get("state") == "playing"
+ prev_state["state"] == "playing"
and queue.state == PlayerState.IDLE
and (
queue.current_item
and queue.current_item.duration
- and prev_state.get("elapsed_time", queue.elapsed_time)
- > (queue.current_item.duration * 0.85)
+ and prev_state["elapsed_time"] > (queue.current_item.duration * 0.90)
)
):
queue.current_index += 1
queue.next_item = None
# signal update and store state
self.signal_update(queue_id)
+
self._prev_states[queue_id] = new_state
# watch dynamic radio items refill if needed
if "current_index" in changed_keys:
- fill_index = len(self._queue_items[queue_id]) - 5
- if queue.radio_source and queue.current_index and (queue.current_index >= fill_index):
+ if (
+ queue.radio_source
+ and queue.current_index
+ and (queue.items - queue.current_index) < 5
+ ):
self.mass.create_task(self._fill_radio_tracks(queue_id))
def on_player_remove(self, player_id: str) -> None:
async def _fill_radio_tracks(self, queue_id: str) -> None:
"""Fill a Queue with (additional) Radio tracks."""
+ # we need to debounce, if we're called twice within a short timeframe
+ debounce_key = f"fill_radio_{queue_id}"
+ if getattr(self, debounce_key, None):
+ return
+ setattr(self, debounce_key, True)
tracks = await self._get_radio_tracks(queue_id)
# fill queue - filter out unavailable items
queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
self.load(
queue_id,
queue_items,
- insert_at_index=len(self._queue_items[queue_id]) - 1,
+ insert_at_index=len(self._queue_items[queue_id]) + 1,
)
+ await asyncio.sleep(5)
+ setattr(self, debounce_key, None)
def _check_enqueue_next(
self,
player: Player,
queue: PlayerQueue,
- prev_state: dict[str, Any],
- new_state: dict[str, Any],
+ prev_state: CompareState,
+ new_state: CompareState,
) -> None:
"""Check if we need to enqueue the next item to the player itself."""
if not queue.active:
return
- if prev_state.get("state") != PlayerState.PLAYING:
+ if prev_state["state"] != PlayerState.PLAYING:
return
if (player := self.mass.players.get(queue.queue_id)) and player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
if PlayerFeature.ENQUEUE_NEXT in player.supported_features:
# we enqueue the next track after a new track
# has started playing and (repeat) before the current track ends
- new_track_started = new_state.get("state") == PlayerState.PLAYING and prev_state.get(
- "current_index"
- ) != new_state.get("current_index")
+ new_track_started = (
+ new_state["state"] == PlayerState.PLAYING
+ and prev_state["current_index"] != new_state["current_index"]
+ )
if (
new_track_started
or seconds_remaining == 15
from collections.abc import AsyncGenerator, Mapping
+def query_params(query: str, params: dict[str, Any] | None) -> tuple[str, dict[str, Any]]:
+ """Extend query parameters support."""
+ if params is None:
+ return (query, params)
+ count = 0
+ result_query = query
+ result_params = {}
+ for key, value in params.items():
+ # add support for a list within the query params
+ # recreates the params as (:_param_0, :_param_1) etc
+ if isinstance(value, list | tuple):
+ subparams = []
+ for subval in value:
+ subparam_name = f"_param_{count}"
+ result_params[subparam_name] = subval
+ subparams.append(subparam_name)
+ count += 1
+ params_str = ",".join(f":{x}" for x in subparams)
+ result_query = result_query.replace(f" :{key}", f" ({params_str})")
+ else:
+ result_params[key] = params[key]
+ return (result_query, result_params)
+
+
class DatabaseConnection:
"""Class that holds the (connection to the) database with some convenience helper functions."""
) -> list[Mapping]:
"""Get all rows for given custom query."""
query = f"{query} LIMIT {limit} OFFSET {offset}"
- return await self._db.execute_fetchall(query, params)
+ _query, _params = query_params(query, params)
+ return await self._db.execute_fetchall(_query, _params)
async def get_count_from_query(
self,
) -> int:
"""Get row count for given custom query."""
query = f"SELECT count() FROM ({query})"
- async with self._db.execute(query, params) as cursor:
+ _query, _params = query_params(query, params)
+ async with self._db.execute(_query, _params) as cursor:
if result := await cursor.fetchone():
return result[0]
return 0
all_uris: list[str] = []
skipped_lines = 0
for playlist_line in playlist_data.split("\n"):
+ playlist_line = playlist_line.strip() # noqa: PLW2901
+ if not playlist_line:
+ continue
if "://" not in playlist_line:
skipped_lines += 1
self.logger.debug("Ignoring line in migration playlist: %s", playlist_line)
import cchardet
import xmltodict
-from music_assistant.common.helpers.util import create_sort_name, parse_title_and_version
+from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigEntryType,
item_id=artist_path,
provider=self.instance_id,
name=name,
- sort_name=sort_name or create_sort_name(name),
+ sort_name=sort_name,
provider_mappings={
ProviderMapping(
item_id=artist_path,
item_id=album_path,
provider=self.instance_id,
name=name,
- sort_name=sort_name or create_sort_name(name),
+ sort_name=sort_name,
artists=artists,
provider_mappings={
ProviderMapping(
ProviderFeature,\r
StreamType,\r
)\r
-from music_assistant.common.models.errors import (\r
- InvalidDataError,\r
- LoginFailed,\r
- MediaNotFoundError,\r
- MusicAssistantError,\r
-)\r
+from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError\r
from music_assistant.common.models.media_items import (\r
Album,\r
AlbumTrack,\r
jellyfin_server_url, jellyfin_server_user, jellyfin_server_password\r
)\r
credentials = client.auth.credentials.get_credentials()\r
+ if not credentials["Servers"]:\r
+ raise IndexError("No servers found")\r
server = credentials["Servers"][0]\r
server["username"] = jellyfin_server_user\r
_jellyfin_server = client\r
# json.dumps(server)\r
- except MusicAssistantError as err:\r
- msg = "Authentication failed: %s", str(err)\r
- raise LoginFailed(msg)\r
+ except Exception as err:\r
+ msg = f"Authentication failed: {err}"\r
+ raise LoginFailed(msg) from err\r
return _jellyfin_server\r
\r
self._jellyfin_server = await self._run_async(connect)\r
current_jellyfin_album[ITEM_KEY_ALBUM_ARTIST],\r
)\r
)\r
- elif len(current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS]) >= 1:\r
+ elif (\r
+ ITEM_KEY_ARTIST_ITEMS in current_jellyfin_album\r
+ and len(current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS]) >= 1\r
+ ):\r
num_artists = len(current_jellyfin_album[ITEM_KEY_ARTIST_ITEMS])\r
for i in range(num_artists):\r
album.artists.append(\r
)
async def _get_or_create_artist_by_name(self, artist_name) -> Artist:
- query = "WHERE name = :name AND provider_mappings = :provider_instance"
- params = {
- "name": f"%{artist_name}%",
- "provider_instance": f"%{self.instance_id}%",
- }
+ query = (
+ "WHERE artists.name = :name AND "
+ "provider_mappings.provider_instance = :provider_instance"
+ )
+ query_params = {"name": artist_name, "provider_instance": self.instance_id}
paged_list = await self.mass.music.artists.library_items(
- extra_query=query, extra_query_params=params
+ extra_query=query, extra_query_params=query_params
)
if paged_list and paged_list.items:
return ItemMapping.from_item(paged_list.items[0])
remotely_accessible=False,
)
]
- playlist.is_editable = True
+ playlist.is_editable = not plex_playlist.smart
+ playlist.metadata.cache_checksum = str(plex_playlist.updatedAt.timestamp())
+
return playlist
async def _parse_track(
):
if not (track_obj and track_obj["id"]):
continue
- track_obj["position"] = count
track = await self._parse_track(track_obj)
- yield track
+ yield PlaylistTrack.from_track(track, position=count)
count += 1
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
album.metadata.explicit = True
return album
- async def _parse_track(self, track_obj: dict) -> Track | AlbumTrack | PlaylistTrack:
+ async def _parse_track(self, track_obj: dict) -> Track:
"""Parse qobuz track object to generic layout."""
# pylint: disable=too-many-branches
name, version = parse_title_and_version(track_obj["title"], track_obj.get("version"))
- if "position" in track_obj:
- track_class = PlaylistTrack
- extra_init_kwargs = {"position": track_obj["position"]}
- elif "media_number" in track_obj and "track_number" in track_obj:
- track_class = AlbumTrack
- extra_init_kwargs = {
- "disc_number": track_obj["media_number"],
- "track_number": track_obj["track_number"],
- }
- else:
- track_class = Track
- extra_init_kwargs = {}
- track = track_class(
+ track = Track(
item_id=str(track_obj["id"]),
provider=self.domain,
name=name,
url=f'https://open.qobuz.com/track/{track_obj["id"]}',
)
},
- **extra_init_kwargs,
+ disc_number=track_obj.get("media_number"),
+ track_number=track_obj.get("track_number"),
)
if isrc := track_obj.get("isrc"):
track.external_ids.add((ExternalID.ISRC, isrc))
async def get_album_tracks(self, prov_album_id) -> list[AlbumTrack]:
"""Get all album tracks for given album id."""
return [
- await self._parse_track(item)
+ AlbumTrack.from_track(await self._parse_track(item))
for item in await self._get_all_items(f"albums/{prov_album_id}/tracks")
if item["id"]
]
if not (item and item["track"] and item["track"]["id"]):
continue
# use count as position
- item["track"]["position"] = count
track = await self._parse_track(item["track"])
- yield track
+ yield PlaylistTrack.from_track(track, position=count)
count += 1
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
self,
track_obj: dict[str, Any],
artist=None,
- ) -> Track | AlbumTrack | PlaylistTrack:
+ ) -> Track:
"""Parse spotify track object to generic layout."""
name, version = parse_title_and_version(track_obj["name"])
- if "position" in track_obj:
- track_class = PlaylistTrack
- extra_init_kwargs = {"position": track_obj["position"]}
- elif "disc_number" in track_obj and "track_number" in track_obj:
- track_class = AlbumTrack
- extra_init_kwargs = {
- "disc_number": track_obj["disc_number"],
- "track_number": track_obj["track_number"],
- }
- else:
- track_class = Track
- extra_init_kwargs = {}
-
- track = track_class(
+ track = Track(
item_id=track_obj["id"],
provider=self.domain,
name=name,
available=not track_obj["is_local"] and track_obj["is_playable"],
)
},
- **extra_init_kwargs,
+ disc_number=track_obj.get("disc_number"),
+ track_number=track_obj.get("track_number"),
)
if isrc := track_obj.get("external_ids", {}).get("isrc"):
track.external_ids.add((ExternalID.ISRC, isrc))
from asyncio_throttle import Throttler
-from music_assistant.common.helpers.util import create_sort_name
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
) -> AsyncGenerator[Radio, None]:
for item in items:
item_type = item.get("type", "")
+ if "unavailable" in item.get("key", ""):
+ continue
+ if not item.get("is_available", True):
+ continue
if item_type == "audio":
if "preset_id" not in item:
continue
- if "- Not Supported" in item.get("name", ""):
- continue
- if "- Not Supported" in item.get("text", ""):
- continue
# each radio station can have multiple streams add each one as different quality
stream_info = await self.__get_data("Tune.ashx", id=item["preset_id"])
for stream in stream_info["body"]:
bit_rate=bit_rate,
),
details=url,
+ available=details.get("is_available", True),
)
},
)
# preset number is used for sorting (not present at stream time)
- preset_number = details.get("preset_number")
- if preset_number and folder:
- radio.sort_name = f'{folder}-{details["preset_number"]}'
- elif preset_number:
- radio.sort_name = details["preset_number"]
- radio.sort_name += create_sort_name(name)
+ preset_number = details.get("preset_number", 0)
+ radio.position = preset_number
if "text" in details:
radio.metadata.description = details["text"]
# images
return []
tracks = []
for idx, track_obj in enumerate(album_obj["tracks"], 1):
- track_obj["disc_number"] = 0
- track_obj["track_number"] = track_obj.get("trackNumber", idx)
try:
- track = await self._parse_track(track_obj=track_obj)
+ track = AlbumTrack.from_track(
+ await self._parse_track(track_obj=track_obj),
+ disc_number=0,
+ track_number=track_obj.get("trackNumber", idx),
+ )
except InvalidDataError:
continue
tracks.append(track)
# Playlist tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id
try:
- track_obj["position"] = index + 1
if track := await self._parse_track(track_obj):
- yield track
+ yield PlaylistTrack.from_track(track, index + 1)
except InvalidDataError:
if track := await self.get_track(track_obj["videoId"]):
- yield PlaylistTrack.from_dict({**track.to_dict(), "position": index + 1})
+ yield PlaylistTrack.from_track(track, index + 1)
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
playlist.metadata.cache_checksum = playlist_obj.get("checksum")
return playlist
- async def _parse_track(self, track_obj: dict) -> Track | AlbumTrack | PlaylistTrack:
+ async def _parse_track(self, track_obj: dict) -> Track:
"""Parse a YT Track response to a Track model object."""
if not track_obj.get("videoId"):
msg = "Track is missing videoId"
raise InvalidDataError(msg)
- if "position" in track_obj:
- track_class = PlaylistTrack
- extra_init_kwargs = {"position": track_obj["position"]}
- elif "disc_number" in track_obj and "track_number" in track_obj:
- track_class = AlbumTrack
- extra_init_kwargs = {
- "disc_number": track_obj["disc_number"],
- "track_number": track_obj["track_number"],
- }
- else:
- track_class = Track
- extra_init_kwargs = {}
- track = track_class(
+ track = Track(
item_id=track_obj["videoId"],
provider=self.domain,
name=track_obj["title"],
),
)
},
- **extra_init_kwargs,
)
if track_obj.get("artists"):