help="Provide logging level. Example --log-level debug, "
"default=info, possible=(critical, error, warning, info, debug, verbose)",
)
+ parser.add_argument(
+ "--safe-mode",
+ action=argparse.BooleanOptionalAction,
+ help="Start in safe mode (core controllers only, no providers)",
+ )
return parser.parse_args()
# prefer value in hass_options
log_level = hass_options.get("log_level", args.log_level).upper()
dev_mode = os.environ.get("PYTHONDEVMODE", "0") == "1"
+ safe_mode = bool(
+ args.safe_mode or hass_options.get("safe_mode") or os.environ.get("MASS_SAFE_MODE")
+ )
# setup logger
logger = setup_logger(data_dir, log_level)
- mass = MusicAssistant(data_dir)
+ mass = MusicAssistant(data_dir, safe_mode)
# enable alpine subprocess workaround
_enable_posix_spawn()
from music_assistant.common.models.enums import MediaType
from music_assistant.common.models.media_items import (
Album,
+ AlbumTrack,
Artist,
MediaItemType,
PagedItems,
self,
item_id: str,
provider_instance_id_or_domain: str,
+ in_library_only: bool = False,
) -> list[Album]:
"""Get all (known) albums this track is featured on."""
return [
"music/tracks/track_albums",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
+ in_library_only=in_library_only,
)
]
self,
item_id: str,
provider_instance_id_or_domain: str,
- ) -> list[Track]:
+ in_library_only: bool = False,
+ ) -> list[AlbumTrack]:
"""Get tracks for given album."""
return [
- Track.from_dict(item)
+ AlbumTrack.from_dict(item)
for item in await self.client.send_command(
"music/albums/album_tracks",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
+ in_library_only=in_library_only,
)
]
self,
item_id: str,
provider_instance_id_or_domain: str,
+ in_library_only: bool = False,
) -> list[Track]:
"""Get (top)tracks for given artist."""
return [
"music/artists/artist_tracks",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
+ in_library_only=in_library_only,
)
]
self,
item_id: str,
provider_instance_id_or_domain: str,
+ in_library_only: bool = False,
) -> list[Album]:
"""Get (top)albums for given artist."""
return [
"music/artists/artist_albums",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
+ in_library_only=in_library_only,
)
]
CONF_ICON,
CONF_LOG_LEVEL,
CONF_OUTPUT_CHANNELS,
+ CONF_SAMPLE_RATES,
CONF_SYNC_ADJUST,
CONF_TTS_PRE_ANNOUNCE,
CONF_VOLUME_NORMALIZATION,
ENCRYPT_CALLBACK: callable[[str], str] | None = None
DECRYPT_CALLBACK: callable[[str], str] | None = None
-ConfigValueType = str | int | float | bool | list[str] | list[int] | None
+ConfigValueType = (
+ str
+ | int
+ | float
+ | bool
+ | tuple[int, int]
+ | list[str]
+ | list[int]
+ | list[tuple[int, int]]
+ | None
+)
ConfigEntryTypeMap = {
ConfigEntryType.BOOLEAN: bool,
ConfigEntryType.STRING: str,
ConfigEntryType.SECURE_STRING: str,
ConfigEntryType.INTEGER: int,
+ ConfigEntryType.INTEGER_TUPLE: tuple[int, int],
ConfigEntryType.FLOAT: float,
ConfigEntryType.LABEL: str,
ConfigEntryType.DIVIDER: str,
CONF_ENTRY_PLAYER_ICON_GROUP = ConfigEntry.from_dict(
{**CONF_ENTRY_PLAYER_ICON.to_dict(), "default_value": "mdi-speaker-multiple"}
)
+
+CONF_ENTRY_SAMPLE_RATES = ConfigEntry(
+ key=CONF_SAMPLE_RATES,
+ type=ConfigEntryType.INTEGER_TUPLE,
+ options=[
+ ConfigValueOption("44.1kHz / 16 bits", (44100, 16)),
+ ConfigValueOption("44.1kHz / 24 bits", (44100, 24)),
+ ConfigValueOption("48kHz / 16 bits", (48000, 16)),
+ ConfigValueOption("48kHz / 16 bits", (48000, 24)),
+ ConfigValueOption("88.2kHz / 16 bits", (88200, 16)),
+ ConfigValueOption("88.2kHz / 24 bits", (88200, 24)),
+ ConfigValueOption("96kHz / 16 bits", (96000, 16)),
+ ConfigValueOption("96kHz / 24 bits", (96000, 24)),
+ ConfigValueOption("176.4kHz / 16 bits", (176400, 16)),
+ ConfigValueOption("176.4kHz / 24 bits", (176400, 24)),
+ ConfigValueOption("192kHz / 16 bits", (192000, 16)),
+ ConfigValueOption("192kHz / 24 bits", (192000, 24)),
+ ConfigValueOption("352.8kHz / 16 bits", (352800, 16)),
+ ConfigValueOption("352.8kHz / 24 bits", (352800, 24)),
+ ConfigValueOption("384kHz / 16 bits", (384000, 16)),
+ ConfigValueOption("384kHz / 24 bits", (384000, 24)),
+ ],
+ default_value=[(44100, 16), (48000, 16)],
+ required=True,
+ multi_value=True,
+ label="Sample rates supported by this player",
+ category="advanced",
+ description="The sample rates (and bit depths) supported by this player.\n"
+ "Content with unsupported sample rates will be automatically resampled.",
+)
+
+
+def create_sample_rates_config_entry(
+ max_sample_rate: int,
+ max_bit_depth: int,
+ safe_max_sample_rate: int = 48000,
+ safe_max_bit_depth: int = 16,
+ hidden: bool = False,
+) -> ConfigEntry:
+ """Create sample rates config entry based on player specific helpers."""
+ conf_entry = ConfigEntry.from_dict(CONF_ENTRY_SAMPLE_RATES.to_dict())
+ conf_entry.options = []
+ conf_entry.default_value = []
+ conf_entry.hidden = hidden
+ for option in CONF_ENTRY_SAMPLE_RATES.options:
+ sample_rate, bit_depth = option.value
+ if sample_rate <= max_sample_rate and bit_depth <= max_bit_depth:
+ conf_entry.options.append(option)
+ if sample_rate <= safe_max_sample_rate and bit_depth <= safe_max_bit_depth:
+ conf_entry.default_value.append(option.value)
+ return conf_entry
return cls.UNKNOWN
@classmethod
- def try_parse(cls: ContentType, string: str) -> ContentType:
+ def try_parse(cls: Self, string: str) -> Self:
"""Try to parse ContentType from (url)string/extension."""
tempstr = string.lower()
if "audio/" in tempstr:
INTEGER = "integer"
FLOAT = "float"
LABEL = "label"
+ INTEGER_TUPLE = "integer_tuple"
DIVIDER = "divider"
ACTION = "action"
ICON = "icon"
"""Call after init."""
# having items for unavailable providers can have all sorts
# of unpredictable results so ensure we have accurate availability status
- if available_providers := get_global_cache_value("unique_providers"):
- if TYPE_CHECKING:
- available_providers = cast(set[str], available_providers)
- if not available_providers.intersection({self.provider_domain, self.provider_instance}):
- self.available = False
+ if not (available_providers := get_global_cache_value("unique_providers")):
+ self.available = False
+ return
+ if TYPE_CHECKING:
+ available_providers = cast(set[str], available_providers)
+ if not available_providers.intersection({self.provider_domain, self.provider_instance}):
+ self.available = False
def __hash__(self) -> int:
"""Return custom hash."""
def update(
self,
new_values: MediaItemMetadata,
- allow_overwrite: bool = False,
) -> MediaItemMetadata:
"""Update metadata (in-place) with new values."""
if not new_values:
if new_val is None:
continue
cur_val = getattr(self, fld.name)
- if allow_overwrite and new_val:
- setattr(self, fld.name, new_val)
- elif isinstance(cur_val, list) and isinstance(new_val, list):
+ if isinstance(cur_val, list) and isinstance(new_val, list):
new_val = merge_lists(cur_val, new_val)
setattr(self, fld.name, new_val)
- elif isinstance(cur_val, set) and isinstance(new_val, list):
+ elif isinstance(cur_val, set) and isinstance(new_val, set | list | tuple):
new_val = cur_val.update(new_val)
setattr(self, fld.name, new_val)
elif new_val and fld.name in ("popularity", "last_refresh", "cache_checksum"):
# some fields are always allowed to be overwritten
# (such as checksum and last_refresh)
setattr(self, fld.name, new_val)
- elif cur_val is None or (allow_overwrite and new_val):
+ elif cur_val is None:
setattr(self, fld.name, new_val)
return self
@classmethod
def from_track(
- cls: Self,
+ cls: type,
track: Track,
album: Album | None = None,
disc_number: int | None = None,
position: int
- @classmethod
- def from_track(cls: Self, track: Track, position: int | None = None) -> Self:
- """Cast Track to PlaylistTrack."""
- if position is None:
- position = track.position
- # let mushmumaro instantiate a new object - this will ensure that valididation takes place
- return PlaylistTrack.from_dict(
- {
- **track.to_dict(),
- "position": position,
- }
- )
-
@dataclass(kw_only=True)
class Playlist(MediaItem):
# also referred to as "sync master"
synced_to: str | None = None
- # max_sample_rate: maximum supported sample rate the player supports
- max_sample_rate: int = 48000
-
- # supports_24bit: bool if player supports 24bits (hi res) audio
- supports_24bit: bool = True
-
# enabled_by_default: if the player is enabled by default
# can be used by a player provider to exclude some sort of players
enabled_by_default: bool = True
CONF_PASSWORD: Final[str] = "password"
CONF_VOLUME_NORMALIZATION: Final[str] = "volume_normalization"
CONF_VOLUME_NORMALIZATION_TARGET: Final[str] = "volume_normalization_target"
-CONF_MAX_SAMPLE_RATE: Final[str] = "max_sample_rate"
CONF_EQ_BASS: Final[str] = "eq_bass"
CONF_EQ_MID: Final[str] = "eq_mid"
CONF_EQ_TREBLE: Final[str] = "eq_treble"
CONF_ANNOUNCE_VOLUME_MAX: Final[str] = "announce_volume_max"
CONF_ICON: Final[str] = "icon"
CONF_LANGUAGE: Final[str] = "language"
+CONF_SAMPLE_RATES: Final[str] = "sample_rates"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
import asyncio
import contextlib
from random import choice, random
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
from music_assistant.common.helpers.datetime import utc_timestamp
+from music_assistant.common.helpers.global_cache import get_global_cache_value
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import EventType, ProviderFeature
from music_assistant.common.models.errors import (
# edge case where playlist track has invalid artistdetails
self.logger.warning("Unable to fetch artist details %s - %s", artist.uri, str(err))
album.artists = album_artists
+ if not force_refresh:
+ return album
+ # if force refresh, we need to ensure that we also refresh all album tracks
+ # in case of a filebased (non streaming) provider to ensure we catch changes the user
+ # made on track level and then pressed the refresh button on album level.
+ file_provs = get_global_cache_value("non_streaming_providers", [])
+ for album_provider_mapping in album.provider_mappings:
+ if album_provider_mapping.provider_instance not in file_provs:
+ continue
+ for prov_album_track in await self._get_provider_album_tracks(
+ album_provider_mapping.item_id, album_provider_mapping.provider_instance
+ ):
+ if prov_album_track.provider != "library":
+ continue
+ for track_prov_map in prov_album_track.provider_mappings:
+ if track_prov_map.provider_instance != album_provider_mapping.provider_instance:
+ continue
+ prov_track = await self.mass.music.tracks.get_provider_item(
+ track_prov_map.item_id, track_prov_map.provider_instance, force_refresh=True
+ )
+ if (
+ prov_track.metadata.cache_checksum
+ == prov_album_track.metadata.cache_checksum
+ ):
+ continue
+ await self.mass.music.tracks.update_item_in_library(
+ prov_album_track.item_id, prov_track, True
+ )
+ break
return album
async def add_item_to_library(
"""Update existing record in the database."""
db_id = int(item_id) # ensure integer
cur_item = await self.get_library_item(db_id)
- metadata = cur_item.metadata.update(update.metadata, overwrite)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
if getattr(update, "album_type", AlbumType.UNKNOWN) != AlbumType.UNKNOWN:
album_type = update.album_type
else:
"""Delete record from the database."""
db_id = int(item_id) # ensure integer
# recursively also remove album tracks
- for db_track in await self._get_db_album_tracks(db_id):
+ for db_track in await self.get_library_album_tracks(db_id):
with contextlib.suppress(MediaNotFoundError):
await self.mass.music.tracks.remove_item_from_library(db_track.item_id)
# delete entry(s) from albumtracks table
self,
item_id: str,
provider_instance_id_or_domain: str,
- ) -> list[Track]:
+ in_library_only: bool = False,
+ ) -> list[AlbumTrack]:
"""Return album tracks for the given provider album id."""
- if provider_instance_id_or_domain == "library":
- return await self._get_db_album_tracks(item_id)
- # return provider album tracks
- return await self._get_provider_album_tracks(item_id, provider_instance_id_or_domain)
+ full_album = await self.get(item_id, provider_instance_id_or_domain)
+ db_items = (
+ await self.get_library_album_tracks(full_album.item_id)
+ if full_album.provider == "library"
+ else []
+ )
+ if full_album.provider == "library" and in_library_only:
+ # return in-library items only
+ return db_items
+ # return all (unique) items from all providers
+ result: list[AlbumTrack] = [*db_items]
+ unique_ids: set[str] = set()
+ for provider_mapping in full_album.provider_mappings:
+ provider_tracks = await self._get_provider_album_tracks(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
+ for provider_track in provider_tracks:
+ unique_id = f"{provider_track.disc_number or 1}.{provider_track.track_number}"
+ if unique_id in unique_ids:
+ continue
+ unique_ids.add(unique_id)
+ # prefer db item
+ if db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
+ provider_track.item_id, provider_track.provider
+ ):
+ if db_item in db_items:
+ continue
+ result.append(
+ AlbumTrack.from_track(
+ db_item,
+ full_album,
+ disc_number=provider_track.disc_number,
+ track_number=provider_track.track_number,
+ )
+ )
+ elif not in_library_only and provider_track not in result:
+ result.append(AlbumTrack.from_track(provider_track, full_album))
+ return result
async def versions(
self,
) -> list[Album]:
"""Return all versions of an album we can find on all providers."""
album = await self.get(item_id, provider_instance_id_or_domain, add_to_library=False)
- search_query = f"{album.artists[0].name} - {album.name}"
+ search_query = f"{album.artists[0].name} - {album.name}" if album.artists else album.name
result: list[Album] = []
for provider_id in self.mass.music.get_unique_providers():
provider = self.mass.get_provider(provider_id)
if loose_compare_strings(album.name, prov_item.name)
and compare_artists(prov_item.artists, album.artists, any_match=True)
# make sure that the 'base' version is NOT included
- and prov_item.item_id != item_id
+ and not album.provider_mappings.intersection(prov_item.provider_mappings)
]
return result
+ async def get_library_album_tracks(
+ self,
+ item_id: str | int,
+ ) -> list[AlbumTrack]:
+ """Return in-database album tracks for the given database album."""
+ subquery = (
+ f"SELECT DISTINCT track_id FROM {DB_TABLE_ALBUM_TRACKS} "
+ f"WHERE {DB_TABLE_ALBUM_TRACKS}.album_id = {item_id} AND albums.item_id = {item_id}"
+ )
+ query = f"WHERE {DB_TABLE_TRACKS}.item_id in ({subquery})"
+ result = await self.mass.music.tracks.library_items(extra_query=query)
+ if TYPE_CHECKING:
+ return cast(list[AlbumTrack], result.items)
+ return result.items
+
async def _add_library_item(self, item: Album) -> Album:
"""Add a new record to the database."""
new_item = await self.mass.music.database.insert(
async def _get_provider_album_tracks(
self, item_id: str, provider_instance_id_or_domain: str
- ) -> list[AlbumTrack]:
+ ) -> list[Track]:
"""Return album tracks for the given provider album id."""
- assert provider_instance_id_or_domain != "library"
- prov = self.mass.get_provider(provider_instance_id_or_domain)
+ prov: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
-
- full_album = await self.get_provider_item(item_id, provider_instance_id_or_domain)
- # prefer cache items (if any) for streaming providers only
+ # prefer cache items (if any) - for streaming providers only
cache_key = f"{prov.lookup_key}.albumtracks.{item_id}"
if (
prov.is_streaming_provider
and (cache := await self.mass.cache.get(cache_key)) is not None
):
- return [AlbumTrack.from_dict(x) for x in cache]
+ return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
- items = []
- for track in await prov.get_album_tracks(item_id):
- assert isinstance(track, AlbumTrack)
- # make sure that the (full) album is stored on the tracks
- track.album = full_album
- if not isinstance(full_album, ItemMapping) and full_album.metadata.images:
- track.metadata.images = full_album.metadata.images
- items.append(track)
+ items = await prov.get_album_tracks(item_id)
# store (serializable items) in cache
if prov.is_streaming_provider:
self.mass.create_task(self.mass.cache.set(cache_key, [x.to_dict() for x in items]))
msg = "No Music Provider found that supports requesting similar tracks."
raise UnsupportedFeaturedException(msg)
- async def _get_db_album_tracks(
- self,
- item_id: str | int,
- ) -> list[AlbumTrack]:
- """Return in-database album tracks for the given database album."""
- db_id = int(item_id) # ensure integer
- db_album = await self.get_library_item(db_id)
- subquery = f"SELECT track_id FROM {DB_TABLE_ALBUM_TRACKS} WHERE album_id = {item_id}"
- query = f"WHERE {DB_TABLE_TRACKS}.item_id in ({subquery})"
- return sorted(
- [
- AlbumTrack.from_track(track, db_album)
- async for track in self.mass.music.tracks.iter_library_items(extra_query=query)
- ],
- key=lambda x: (x.disc_number, x.track_number),
- )
-
async def _match(self, db_album: Album) -> None:
"""Try to find match on all (streaming) providers for the provided (database) album.
"""Update existing record in the database."""
db_id = int(item_id) # ensure integer
cur_item = await self.get_library_item(db_id)
- metadata = cur_item.metadata.update(getattr(update, "metadata", None), overwrite)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
cur_item.external_ids.update(update.external_ids)
# enforce various artists name + id
mbid = cur_item.mbid
self,
item_id: str,
provider_instance_id_or_domain: str,
+ in_library_only: bool = False,
) -> list[Track]:
"""Return all/top tracks for an artist."""
- if provider_instance_id_or_domain == "library":
- return await self.get_library_artist_tracks(
- item_id,
- )
- return await self.get_provider_artist_toptracks(
- item_id,
- provider_instance_id_or_domain,
+ full_artist = await self.get(item_id, provider_instance_id_or_domain)
+ db_items = (
+ await self.get_library_artist_tracks(full_artist.item_id)
+ if full_artist.provider == "library"
+ else []
)
+ if full_artist.provider == "library" and in_library_only:
+ # return in-library items only
+ return db_items
+ # return all (unique) items from all providers
+ result: list[Track] = [*db_items]
+ unique_ids: set[str] = set()
+ for provider_mapping in full_artist.provider_mappings:
+ provider_tracks = await self.get_provider_artist_toptracks(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
+ for provider_track in provider_tracks:
+ unique_id = f"{provider_track.name}.{provider_track.version}"
+ if unique_id in unique_ids:
+ continue
+ unique_ids.add(unique_id)
+ # prefer db item
+ if db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
+ provider_track.item_id, provider_track.provider
+ ):
+ if db_item not in db_items:
+ result.append(db_item)
+ elif not in_library_only and provider_track not in result:
+ result.append(provider_track)
+ return result
async def albums(
self,
item_id: str,
provider_instance_id_or_domain: str,
+ in_library_only: bool = False,
) -> list[Album]:
"""Return (all/most popular) albums for an artist."""
- if provider_instance_id_or_domain == "library":
- return await self.get_library_artist_albums(
- item_id,
- )
- return await self.get_provider_artist_albums(
- item_id,
- provider_instance_id_or_domain,
+ full_artist = await self.get(item_id, provider_instance_id_or_domain)
+ db_items = (
+ await self.get_library_artist_albums(full_artist.item_id)
+ if full_artist.provider == "library"
+ else []
)
+ if full_artist.provider == "library" and in_library_only:
+ # return in-library items only
+ return db_items
+ # return all (unique) items from all providers
+ result: list[Album] = [*db_items]
+ unique_ids: set[str] = set()
+ for provider_mapping in full_artist.provider_mappings:
+ provider_albums = await self.get_provider_artist_albums(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
+ for provider_album in provider_albums:
+ unique_id = f"{provider_album.name}.{provider_album.version}"
+ if unique_id in unique_ids:
+ continue
+ unique_ids.add(unique_id)
+ # prefer db item
+ if db_item := await self.mass.music.albums.get_library_item_by_prov_id(
+ provider_album.item_id, provider_album.provider
+ ):
+ if db_item not in db_items:
+ result.append(db_item)
+ elif not in_library_only and provider_album not in result:
+ result.append(provider_album)
+ return result
async def remove_item_from_library(self, item_id: str | int) -> None:
"""Delete record from the database."""
self,
item_id: str | int,
) -> list[Track]:
- """Return all tracks for an artist in the library."""
+ """Return all tracks for an artist in the library/db."""
subquery = f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = {item_id}"
query = f"WHERE {DB_TABLE_TRACKS}.item_id in ({subquery})"
paged_list = await self.mass.music.tracks.library_items(extra_query=query)
query_parts.append(extra_query)
if search:
params["search"] = f"%{search}%"
- if self.media_type in (MediaType.ALBUM, MediaType.TRACK):
+ if self.media_type == MediaType.ALBUM:
query_parts.append(
- f"({self.db_table}.name LIKE :search OR {self.db_table}.sort_name LIKE :search)"
+ f"({self.db_table}.name LIKE :search OR {self.db_table}.sort_name LIKE :search "
+ "OR sort_artist LIKE :search)"
+ )
+ elif self.media_type == MediaType.TRACK:
+ query_parts.append(
+ f"({self.db_table}.name LIKE :search OR {self.db_table}.sort_name LIKE :search "
+ "OR sort_artist LIKE :search OR sort_album LIKE :search)"
)
else:
- query_parts.append(f"{self.db_table}.name LIKE :search")
+ query_parts.append(
+ f"{self.db_table}.name LIKE :search OR {self.db_table}.sort_name LIKE :search"
+ )
if favorite is not None:
query_parts.append(f"{self.db_table}.favorite = :favorite")
params["favorite"] = favorite
if provider_mapping in library_item.provider_mappings:
return
# update provider_mappings table
- await self._set_provider_mappings(
- item_id=item_id, provider_mappings=library_item.provider_mappings
- )
+ await self._set_provider_mappings(item_id=item_id, provider_mappings=[provider_mapping])
async def remove_provider_mapping(
self, item_id: str | int, provider_instance_id: str, provider_item_id: str
for key in JSON_KEYS:
if key in db_row_dict and db_row_dict[key] not in (None, ""):
db_row_dict[key] = json_loads(db_row_dict[key])
+ if key == "provider_mappings":
+ for prov_mapping_dict in db_row_dict[key]:
+ prov_mapping_dict["available"] = bool(prov_mapping_dict["available"])
if "favorite" in db_row_dict:
db_row_dict["favorite"] = bool(db_row_dict["favorite"])
import asyncio
import random
from collections.abc import AsyncGenerator
-from typing import Any
+from typing import TYPE_CHECKING, Any, cast
from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
)
from music_assistant.common.models.media_items import ItemMapping, Playlist, PlaylistTrack, Track
from music_assistant.constants import DB_TABLE_PLAYLISTS
+from music_assistant.server.models.music_provider import MusicProvider
from .base import MediaControllerBase
"""Update existing record in the database."""
db_id = int(item_id) # ensure integer
cur_item = await self.get_library_item(db_id)
- metadata = cur_item.metadata.update(getattr(update, "metadata", None), overwrite)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
cur_item.external_ids.update(update.external_ids)
await self.mass.music.database.update(
self.db_table,
) -> AsyncGenerator[PlaylistTrack, None]:
"""Return album tracks for the given provider album id."""
assert provider_instance_id_or_domain != "library"
- provider = self.mass.get_provider(provider_instance_id_or_domain)
+ provider: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
if not provider:
return
# prefer cache items (if any)
async for item in provider.get_playlist_tracks(item_id):
# double check if position set
assert item.position is not None, "Playlist items require position to be set"
- yield item
+ yield cast(PlaylistTrack, item) if TYPE_CHECKING else item
all_items.append(item)
# if this is a complete track object, pre-cache it as
# that will save us an (expensive) lookup later
"""Update existing record in the database."""
db_id = int(item_id) # ensure integer
cur_item = await self.get_library_item(db_id)
- metadata = cur_item.metadata.update(getattr(update, "metadata", None), overwrite)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
cur_item.external_ids.update(update.external_ids)
match = {"item_id": db_id}
await self.mass.music.database.update(
SELECT
{self.db_table}.*,
{DB_TABLE_ARTISTS}.sort_name AS sort_artist,
- {DB_TABLE_ARTISTS}.sort_name AS sort_album,
+ {DB_TABLE_ALBUMS}.sort_name AS sort_album,
json_group_array(
DISTINCT json_object(
'item_id', {DB_TABLE_PROVIDER_MAPPINGS}.provider_item_id,
"""Update Track record in the database, merging data."""
db_id = int(item_id) # ensure integer
cur_item = await self.get_library_item(db_id)
- metadata = cur_item.metadata.update(getattr(update, "metadata", None), overwrite)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
cur_item.external_ids.update(update.external_ids)
await self.mass.music.database.update(
self.db_table,
if loose_compare_strings(track.name, prov_item.name)
and compare_artists(prov_item.artists, track.artists, any_match=True)
# make sure that the 'base' version is NOT included
- and prov_item.item_id != item_id
+ and not track.provider_mappings.intersection(prov_item.provider_mappings)
]
return result
self,
item_id: str,
provider_instance_id_or_domain: str,
+ in_library_only: bool = False,
) -> list[Album]:
"""Return all albums the track appears on."""
- if provider_instance_id_or_domain == "library":
- return [
- await self.mass.music.albums.get_library_item(album_track_row["album_id"])
- async for album_track_row in self.mass.music.database.iter_items(
- DB_TABLE_ALBUM_TRACKS, {"track_id": int(item_id)}
- )
- ]
+ full_track = await self.get(item_id, provider_instance_id_or_domain)
+ db_items = (
+ await self.get_library_track_albums(full_track.item_id)
+ if full_track.provider == "library"
+ else []
+ )
+ if full_track.provider == "library" and in_library_only:
+ # return in-library items only
+ return db_items
+ # return all (unique) items from all providers
+ result: list[Album] = [*db_items]
# use search to get all items on the provider
+ search_query = f"{full_track.artist_str} - {full_track.name}"
# TODO: we could use musicbrainz info here to get a list of all releases known
- track = await self.get(item_id, provider_instance_id_or_domain, add_to_library=False)
- search_query = f"{track.artists[0].name} - {track.name}"
- return [
- prov_item.album
- for prov_item in await self.search(search_query, provider_instance_id_or_domain)
- if loose_compare_strings(track.name, prov_item.name)
- and prov_item.album
- and compare_artists(prov_item.artists, track.artists, any_match=True)
- ]
+ result: list[Track] = [*db_items]
+ unique_ids: set[str] = set()
+ for prov_item in (await self.mass.music.search(search_query, [MediaType.TRACK])).tracks:
+ if not loose_compare_strings(full_track.name, prov_item.name):
+ continue
+ if not prov_item.album:
+ continue
+ if not compare_artists(full_track.artists, prov_item.artists, any_match=True):
+ continue
+ unique_id = f"{prov_item.album.name}.{prov_item.album.version}"
+ if unique_id in unique_ids:
+ continue
+ unique_ids.add(unique_id)
+ # prefer db item
+ if db_item := await self.mass.music.albums.get_library_item_by_prov_id(
+ prov_item.album.item_id, prov_item.album.provider
+ ):
+ if db_item not in db_items:
+ result.append(db_item)
+ elif not in_library_only:
+ result.append(prov_item.album)
+ return result
async def remove_item_from_library(self, item_id: str | int) -> None:
"""Delete record from the database."""
f"provider={provider_instance_id_or_domain}&item_id={enc_track_id}"
)
+ async def get_library_track_albums(
+ self,
+ item_id: str | int,
+ ) -> list[Album]:
+ """Return all in-library albums for a track."""
+ subquery = f"SELECT album_id FROM {DB_TABLE_ALBUM_TRACKS} WHERE track_id = {item_id}"
+ query = f"WHERE {DB_TABLE_ALBUMS}.item_id in ({subquery})"
+ paged_list = await self.mass.music.albums.library_items(extra_query=query)
+ return paged_list.items
+
async def _match(self, db_track: Track) -> None:
"""Try to find matching track on all providers for the provided (database) track_id.
if ProviderFeature.ARTIST_METADATA not in provider.supported_features:
continue
if metadata := await provider.get_artist_metadata(artist):
- artist.metadata.update(metadata, allow_overwrite=False)
+ artist.metadata.update(metadata)
self.logger.debug(
"Fetched metadata for Artist %s on provider %s",
artist.name,
if ProviderFeature.ALBUM_METADATA not in provider.supported_features:
continue
if metadata := await provider.get_album_metadata(album):
- album.metadata.update(metadata, allow_overwrite=False)
+ album.metadata.update(metadata)
self.logger.debug(
"Fetched metadata for Album %s on provider %s",
album.name,
if ProviderFeature.TRACK_METADATA not in provider.supported_features:
continue
if metadata := await provider.get_track_metadata(track):
- track.metadata.update(metadata, allow_overwrite=False)
+ track.metadata.update(metadata)
self.logger.debug(
"Fetched metadata for Track %s on provider %s",
track.name,
from typing import TYPE_CHECKING
from music_assistant.common.helpers.datetime import utc_timestamp
+from music_assistant.common.helpers.global_cache import get_global_cache_value
from music_assistant.common.helpers.json import json_dumps, json_loads
from music_assistant.common.helpers.uri import parse_uri
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
db_rows = await self.mass.music.database.get_rows_from_query(query, limit=limit)
result: list[MediaItemType] = []
for db_row in db_rows:
+ if db_row["provider"] not in get_global_cache_value("unique_providers", []):
+ continue
with suppress(MediaNotFoundError, ProviderUnavailableError):
media_type = MediaType(db_row["media_type"])
item = await self.get_item(media_type, db_row["item_id"], db_row["provider"])
[url] text,
[audio_format] json,
[details] json,
- UNIQUE(media_type, provider_instance, provider_item_id)
+ UNIQUE(media_type, item_id, provider_instance, provider_item_id)
);"""
)
PlayerUnavailableError,
QueueEmpty,
)
-from music_assistant.common.models.media_items import MediaItemType, media_from_dict
+from music_assistant.common.models.media_items import AlbumTrack, MediaItemType, media_from_dict
from music_assistant.common.models.player import PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
)
- if artist_items_conf == "library_tracks":
- # make sure we have an in-library artist
- artist = await self.mass.music.artists.get(
- artist.item_id, artist.provider, lazy=False, details=artist
- )
- return await self.mass.music.artists.get_library_artist_tracks(artist.item_id)
- if artist_items_conf == "library_album_tracks":
- # make sure we have an in-library artist
- artist = await self.mass.music.artists.get(
- artist.item_id, artist.provider, lazy=False, details=artist
+ if artist_items_conf in ("library_tracks", "all_tracks"):
+ all_items = await self.mass.music.artists.tracks(
+ artist.item_id,
+ artist.provider,
+ in_library_only=artist_items_conf == "library_tracks",
)
+ random.shuffle(all_items)
+ return all_items
+
+ if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
all_items: list[Track] = []
- for library_album in await self.mass.music.artists.get_library_artist_albums(
- artist.item_id
+ for library_album in await self.mass.music.artists.albums(
+ artist.item_id,
+ artist.provider,
+ in_library_only=artist_items_conf == "library_album_tracks",
):
for album_track in self.mass.music.albums.tracks(
library_album.item_id, library_album.provider
all_items.append(album_track)
random.shuffle(all_items)
return all_items
- if artist_items_conf == "all_tracks":
- artist = await self.mass.music.artists.get(
- artist.item_id, artist.provider, details=artist
- )
- all_items: list[Track] = []
- unique_tracks = set()
- for provider in artist.provider_mappings:
- for artist_track in await self.mass.music.artists.tracks(
- provider.item_id, provider.provider_instance
- ):
- if artist_track in all_items:
- continue
- unique_key = f"{artist_track.name.lower()}.{artist_track.version.lower()}"
- if unique_key in unique_tracks:
- continue
- all_items.append(artist_track)
- unique_tracks.add(unique_key)
- random.shuffle(all_items)
- return all_items
- if artist_items_conf == "all_album_tracks":
- artist = await self.mass.music.artists.get(
- artist.item_id, artist.provider, details=artist
- )
- all_items: list[Track] = []
- unique_tracks = set()
- for provider in artist.provider_mappings:
- for album in await self.mass.music.artists.albums(
- provider.item_id, provider.provider_instance
- ):
- for album_track in await self.mass.music.albums.tracks(
- album.item_id, album.provider
- ):
- if album_track in all_items:
- continue
- unique_key = f"{album_track.name.lower()}.{album_track.version.lower()}.{album_track.duration}" # noqa: E501
- if unique_key in unique_tracks:
- continue
- all_items.append(album_track)
- unique_tracks.add(unique_key)
- random.shuffle(all_items)
- return all_items
+
return []
- async def get_album_tracks(self, album: Album) -> list[Track]:
+ async def get_album_tracks(self, album: Album) -> list[AlbumTrack]:
"""Return tracks for given album, based on user preference."""
album_items_conf = self.mass.config.get_raw_core_config_value(
self.domain,
CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
)
- if album_items_conf == "library_tracks":
- # make sure we have an in-library album
- album = await self.mass.music.albums.get(
- album.item_id, album.provider, lazy=False, details=album
- )
- return await self.mass.music.albums.tracks(album.item_id, album.provider)
- if album_items_conf == "all_tracks":
- for provider in album.provider_mappings:
- if album_tracks := await self.mass.music.albums.tracks(
- provider.item_id, provider.provider_instance
- ):
- return album_tracks
- return []
+ return await self.mass.music.albums.tracks(
+ item_id=album.item_id,
+ provider_instance_id_or_domain=album.provider,
+ in_library_only=album_items_conf == "library_tracks",
+ )
def __get_queue_stream_index(self, queue: PlayerQueue, player: Player) -> tuple[int, int]:
"""Calculate current queue index and current track elapsed time."""
CONF_CROSSFADE_DURATION,
CONF_OUTPUT_CHANNELS,
CONF_PUBLISH_IP,
+ CONF_SAMPLE_RATES,
SILENCE_FILE,
)
from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
# work out output format/details
output_format = await self._get_output_format(
output_format_str=request.match_info["fmt"],
- queue_player=queue_player,
+ player=queue_player,
default_sample_rate=queue_item.streamdetails.audio_format.sample_rate,
- default_bit_depth=24, # always prefer 24 bits to prevent dithering
+ default_bit_depth=queue_item.streamdetails.audio_format.bit_depth,
)
# prepare request, add some DLNA/UPNP compatible headers
headers = {
start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
if not start_queue_item:
raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
+
+ # select the highest possible PCM settings for this player
+ flow_pcm_format = await self._select_flow_format(queue_player)
+
# work out output format/details
output_format = await self._get_output_format(
output_format_str=request.match_info["fmt"],
- queue_player=queue_player,
- default_sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
- default_bit_depth=24, # always prefer 24 bits to prevent dithering
+ player=queue_player,
+ default_sample_rate=flow_pcm_format.sample_rate,
+ default_bit_depth=flow_pcm_format.bit_depth,
)
# play it safe: only allow icy metadata for mp3 and aac
enable_icy = request.headers.get(
# all checks passed, start streaming!
self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
- # collect player specific ffmpeg args to re-encode the source PCM stream
- pcm_format = AudioFormat(
- content_type=ContentType.from_bit_depth(output_format.bit_depth),
- sample_rate=output_format.sample_rate,
- bit_depth=output_format.bit_depth,
- channels=2,
- )
async for chunk in get_ffmpeg_stream(
audio_input=self.get_flow_stream(
- queue=queue, start_queue_item=start_queue_item, pcm_format=pcm_format
+ queue=queue, start_queue_item=start_queue_item, pcm_format=flow_pcm_format
),
- input_format=pcm_format,
+ input_format=flow_pcm_format,
output_format=output_format,
filter_params=get_player_filter_params(self.mass, queue_player.player_id),
chunk_size=icy_meta_interval if enable_icy else None,
collect_log_history=True,
logger=logger,
) as ffmpeg_proc:
- async for chunk in ffmpeg_proc.iter_any(pcm_format.pcm_sample_size):
+ async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
bytes_sent += len(chunk)
yield chunk
- del chunk
+ # del chunk
finished = True
finally:
if finished and not ffmpeg_proc.closed:
async def _get_output_format(
self,
output_format_str: str,
- queue_player: Player,
+ player: Player,
default_sample_rate: int,
default_bit_depth: int,
) -> AudioFormat:
"""Parse (player specific) output format details for given format string."""
- content_type = ContentType.try_parse(output_format_str)
+ content_type: ContentType = ContentType.try_parse(output_format_str)
+ supported_rates_conf = await self.mass.config.get_player_config_value(
+ player.player_id, CONF_SAMPLE_RATES
+ )
+ supported_sample_rates: tuple[int] = tuple(x[0] for x in supported_rates_conf)
+ supported_bit_depths: tuple[int] = tuple(x[1] for x in supported_rates_conf)
+ player_max_bit_depth = max(supported_bit_depths)
if content_type.is_pcm() or content_type == ContentType.WAV:
# parse pcm details from format string
output_sample_rate, output_bit_depth, output_channels = parse_pcm_info(
# resolve generic pcm type
content_type = ContentType.from_bit_depth(output_bit_depth)
else:
- output_sample_rate = min(default_sample_rate, queue_player.max_sample_rate)
- player_max_bit_depth = 24 if queue_player.supports_24bit else 16
+ if default_sample_rate in supported_sample_rates:
+ output_sample_rate = default_sample_rate
+ else:
+ output_sample_rate = min(supported_sample_rates)
output_bit_depth = min(default_bit_depth, player_max_bit_depth)
output_channels_str = self.mass.config.get_raw_player_config_value(
- queue_player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
+ player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
)
output_channels = 1 if output_channels_str != "stereo" else 2
return AudioFormat(
channels=output_channels,
output_format_str=output_format_str,
)
+
+ async def _select_flow_format(
+ self,
+ player: Player,
+ ) -> AudioFormat:
+ """Parse (player specific) flow stream PCM format."""
+ supported_rates_conf = await self.mass.config.get_player_config_value(
+ player.player_id, CONF_SAMPLE_RATES
+ )
+ supported_sample_rates: tuple[int] = tuple(x[0] for x in supported_rates_conf)
+ supported_bit_depths: tuple[int] = tuple(x[1] for x in supported_rates_conf)
+ player_max_bit_depth = max(supported_bit_depths)
+ for sample_rate in (192000, 96000, 48000, 44100):
+ if sample_rate in supported_sample_rates:
+ output_sample_rate = sample_rate
+ break
+ output_bit_depth = min(24, player_max_bit_depth)
+ return AudioFormat(
+ content_type=ContentType.from_bit_depth(output_bit_depth),
+ sample_rate=output_sample_rate,
+ bit_depth=output_bit_depth,
+ channels=2,
+ )
logger=logger,
) as ffmpeg_proc:
# read final chunks from stdout
- iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
+ iterator = (
+ ffmpeg_proc.iter_chunked(chunk_size)
+ if chunk_size
+ else ffmpeg_proc.iter_any(get_chunksize(output_format))
+ )
async for chunk in iterator:
yield chunk
"-",
]
async with AsyncProcess(args, stdout=True) as ffmpeg_proc:
- async for chunk in ffmpeg_proc.iter_any():
+ async for chunk in ffmpeg_proc.iter_chunked():
yield chunk
from music_assistant.common.models.enums import ExternalID
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
Artist,
ItemMapping,
MediaItem,
def compare_track(
- base_item: Track | AlbumTrack,
- compare_item: Track | AlbumTrack,
+ base_item: Track,
+ compare_item: Track,
strict: bool = True,
track_albums: list[Album | ItemMapping] | None = None,
) -> bool:
return abs(base_item.duration - compare_item.duration) <= 3
# exact albumtrack match = 100% match
if (
- isinstance(base_item, AlbumTrack)
- and isinstance(compare_item, AlbumTrack)
+ base_item.album
+ and compare_item.album
and compare_album(base_item.album, compare_item.album)
and base_item.track_number == compare_item.track_number
):
from music_assistant.common.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
Artist,
BrowseFolder,
MediaItemType,
Playlist,
- PlaylistTrack,
Radio,
SearchResults,
Track,
raise NotImplementedError
yield # type: ignore
- async def get_library_tracks(self) -> AsyncGenerator[Track | AlbumTrack, None]:
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from the provider."""
if ProviderFeature.LIBRARY_TRACKS in self.supported_features:
raise NotImplementedError
async def get_album_tracks(
self,
prov_album_id: str, # type: ignore[return]
- ) -> list[AlbumTrack]:
+ ) -> list[Track]:
"""Get album tracks for given album id."""
if ProviderFeature.LIBRARY_ALBUMS in self.supported_features:
raise NotImplementedError
async def get_playlist_tracks( # type: ignore[return]
self, prov_playlist_id: str
- ) -> AsyncGenerator[PlaylistTrack, None]:
+ ) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
raise NotImplementedError
CONF_ENTRY_HIDE_PLAYER,
CONF_ENTRY_PLAYER_ICON,
CONF_ENTRY_PLAYER_ICON_GROUP,
+ CONF_ENTRY_SAMPLE_RATES,
CONF_ENTRY_TTS_PRE_ANNOUNCE,
CONF_ENTRY_VOLUME_NORMALIZATION,
CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
CONF_ENTRY_HIDE_PLAYER,
CONF_ENTRY_TTS_PRE_ANNOUNCE,
+ CONF_ENTRY_SAMPLE_RATES,
)
if player_id.startswith(SYNCGROUP_PREFIX):
# add default entries for syncgroups
CONF_ENTRY_SYNC_ADJUST,
ConfigEntry,
ConfigValueType,
+ create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16
)
+# airplay has fixed sample rate/bit depth so make this config entry static and hidden
+CONF_ENTRY_SAMPLE_RATES_AIRPLAY = create_sample_rates_config_entry(44100, 16, 44100, 16, True)
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
base_entries = await super().get_player_config_entries(player_id)
if player_id not in self._players:
# most probably a syncgroup
- return (*base_entries, CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION)
- return base_entries + PLAYER_CONFIG_ENTRIES
+ return (
+ *base_entries,
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_SAMPLE_RATES_AIRPLAY,
+ )
+ return (*base_entries, *PLAYER_CONFIG_ENTRIES, CONF_ENTRY_SAMPLE_RATES_AIRPLAY)
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player.
PlayerFeature.SYNC,
PlayerFeature.VOLUME_SET,
),
- max_sample_rate=44100,
- supports_24bit=False,
can_sync_with=tuple(x for x in self._players if x != player_id),
volume_level=volume,
)
ProviderUnavailableError,
)
from music_assistant.common.models.media_items import (
- AlbumTrack,
Artist,
AudioFormat,
MediaItemImage,
MediaItemMetadata,
MediaItemType,
Playlist,
- PlaylistTrack,
ProviderMapping,
Radio,
Track,
return await self.parse_item(prov_item_id)
raise NotImplementedError
- async def get_library_tracks(self) -> AsyncGenerator[Track | AlbumTrack, None]:
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from the provider."""
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
for item in stored_items:
self.mass.config.set(key, stored_items)
return True
- async def get_playlist_tracks(
- self, prov_playlist_id: str
- ) -> AsyncGenerator[PlaylistTrack, None]:
+ async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
# handle built-in playlists
"""Get all playlist tracks for given playlist id."""
if prov_playlist_id in BUILTIN_PLAYLISTS:
# as we only need basic track info here
media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
media_controller = self.mass.music.get_controller(media_type)
- base_item = await media_controller.get_provider_item(
+ track = await media_controller.get_provider_item(
item_id, provider_instance_id_or_domain
)
- yield PlaylistTrack.from_dict({**base_item.to_dict(), "position": count})
+ track.position = count
+ yield track
except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
self.logger.warning("Skipping item in playlist: %s:%s", uri, str(err))
async def _get_builtin_playlist_tracks(
self, builtin_playlist_id: str
- ) -> AsyncGenerator[PlaylistTrack, None]:
+ ) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given builtin playlist id."""
count = 0
if builtin_playlist_id == ALL_LIBRARY_TRACKS:
async for item in self.mass.music.tracks.iter_library_items(order_by="RANDOM()"):
count += 1
- yield PlaylistTrack.from_dict({**item.to_dict(), "position": count})
+ item.position = count
+ yield item
return
if builtin_playlist_id == ALL_FAVORITE_TRACKS:
async for item in self.mass.music.tracks.iter_library_items(
favorite=True, order_by="RANDOM()"
):
count += 1
- yield PlaylistTrack.from_dict({**item.to_dict(), "position": count})
+ item.position = count
+ yield item
return
if builtin_playlist_id == RANDOM_TRACKS:
async for item in self.mass.music.tracks.iter_library_items(order_by="RANDOM()"):
count += 1
- yield PlaylistTrack.from_dict({**item.to_dict(), "position": count})
+ item.position = count
+ yield item
if count == 100:
return
return
# already handles unwrapping an album by user preference
for album_track in await self.mass.player_queues.get_album_tracks(random_album):
count += 1
- yield PlaylistTrack.from_dict({**album_track.to_dict(), "position": count})
+ album_track.position = count
+ yield album_track
return
if builtin_playlist_id == RANDOM_ARTIST:
async for random_artist in self.mass.music.artists.iter_library_items(
# already handles unwrapping an artist by user preference
for artist_track in await self.mass.player_queues.get_artist_tracks(random_artist):
count += 1
- yield PlaylistTrack.from_dict({**artist_track.to_dict(), "position": count})
+ artist_track.position = count
+ yield artist_track
return
if builtin_playlist_id == RECENTLY_PLAYED:
for track in await self.mass.music.recently_played(250, [MediaType.TRACK]):
count += 1
- yield PlaylistTrack.from_dict({**track.to_dict(), "position": count})
+ track.position = count
+ yield track
return
async def _read_playlist_file_items(self, playlist_id: str) -> list[str]:
CONF_ENTRY_CROSSFADE_FLOW_MODE_REQUIRED,
ConfigEntry,
ConfigValueType,
+ create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import MediaType, PlayerFeature, PlayerState, PlayerType
from music_assistant.common.models.errors import PlayerUnavailableError
CONF_ENTRY_CROSSFADE_DURATION,
)
+# originally/officially cast supports 96k sample rate (even for groups)
+# but it seems a (recent?) update broke this ?!
+# For now only set safe default values and let the user try out higher values
+CONF_ENTRY_SAMPLE_RATES_CAST = create_sample_rates_config_entry(96000, 24, 48000, 24)
+CONF_ENTRY_SAMPLE_RATES_CAST_GROUP = create_sample_rates_config_entry(96000, 24, 44100, 16)
+
+
MASS_APP_ID = "C35B0678"
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
+ cast_player = self.castplayers.get(player_id)
base_entries = await super().get_player_config_entries(player_id)
- return base_entries + PLAYER_CONFIG_ENTRIES
+ if cast_player and cast_player.cast_info.is_audio_group:
+ return (*base_entries, *PLAYER_CONFIG_ENTRIES, CONF_ENTRY_SAMPLE_RATES_CAST_GROUP)
+ return (*base_entries, *PLAYER_CONFIG_ENTRIES, CONF_ENTRY_SAMPLE_RATES_CAST)
def on_player_config_changed(
self,
PlayerFeature.ENQUEUE_NEXT,
PlayerFeature.PAUSE,
),
- # originally/officially cast supports 96k sample rate
- # but it seems a (recent?) update broke this
- # for now use 48k as max sample rate to play safe
- max_sample_rate=48000,
- supports_24bit=True,
enabled_by_default=enabled_by_default,
),
)
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from math import ceil
-from typing import Any
import deezer
from aiohttp import ClientSession, ClientTimeout
from music_assistant.common.models.errors import LoginFailed
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
Artist,
AudioFormat,
ItemMapping,
MediaItemMetadata,
MediaItemType,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
except deezer_exceptions.DeezerErrorResponse as error:
self.logger.warning("Failed getting track: %s", error)
- async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get all tracks in an album."""
album = await self.client.get_album(album_id=int(prov_album_id))
return [
self.parse_track(
track=deezer_track,
user_country=self.gw_client.user_country,
- extra_init_kwargs={"disc_number": 0, "track_number": count + 1},
+ # TODO: doesn't Deezer have disc and track number in the api ?
+ position=count,
)
- for count, deezer_track in enumerate(await album.get_tracks())
+ for count, deezer_track in enumerate(await album.get_tracks(), 1)
]
- async def get_playlist_tracks(
- self, prov_playlist_id: str
- ) -> AsyncGenerator[PlaylistTrack, None]:
+ async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
"""Get all tracks in a playlist."""
playlist = await self.client.get_playlist(int(prov_playlist_id))
- count = 1
- async for deezer_track in await playlist.get_tracks():
+ for count, deezer_track in enumerate(await playlist.get_tracks(), 1):
yield self.parse_track(
- track=deezer_track,
- user_country=self.gw_client.user_country,
- extra_init_kwargs={"position": count},
+ track=deezer_track, user_country=self.gw_client.user_country, position=count
)
- count += 1
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get albums by an artist."""
return playlist.creator
return playlist.user
- def parse_track(
- self,
- track: deezer.Track,
- user_country: str,
- extra_init_kwargs: dict[str, Any] | None = None,
- ) -> Track | PlaylistTrack | AlbumTrack:
+ def parse_track(self, track: deezer.Track, user_country: str, position: int = 0) -> Track:
"""Parse the deezer-python track to a Music Assistant track."""
if hasattr(track, "artist"):
artist = ItemMapping(
)
else:
album = None
- if extra_init_kwargs is None:
- extra_init_kwargs = {}
- track_class = Track
- elif "position" in extra_init_kwargs:
- track_class = PlaylistTrack
- elif "disc_number" in extra_init_kwargs and "track_number" in extra_init_kwargs:
- track_class = AlbumTrack
- else:
- track_class = Track
- item = track_class(
+
+ item = Track(
item_id=str(track.id),
provider=self.domain,
name=track.title,
)
},
metadata=self.parse_metadata_track(track=track),
- **extra_init_kwargs,
+ track_number=position,
+ position=position,
)
if isrc := getattr(track, "isrc", None):
item.external_ids.add((ExternalID.ISRC, isrc))
CONF_ENTRY_ENFORCE_MP3,
ConfigEntry,
ConfigValueType,
+ create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
CONF_ENTRY_CROSSFADE_FLOW_MODE_REQUIRED,
CONF_ENTRY_CROSSFADE_DURATION,
CONF_ENTRY_ENFORCE_MP3,
+ create_sample_rates_config_entry(192000, 24, 96000, 24),
)
+
CONF_NETWORK_SCAN = "network_scan"
_DLNAPlayerProviderT = TypeVar("_DLNAPlayerProviderT", bound="DLNAPlayerProvider")
self.logger.debug("Ignoring disabled player: %s", udn)
return
- is_sonos = "rincon" in udn.lower()
-
dlna_player = DLNAPlayer(
udn=udn,
player=Player(
address=description_url,
manufacturer="unknown",
),
- max_sample_rate=48000 if is_sonos else 192000,
- supports_24bit=True,
- # disable sonos players by default in dlna
- enabled_by_default=not is_sonos,
),
description_url=description_url,
)
)
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
Artist,
AudioFormat,
BrowseFolder,
MediaItemType,
MediaType,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
key=CONF_MISSING_ALBUM_ARTIST_ACTION,
type=ConfigEntryType.STRING,
label="Action when a track is missing the Albumartist ID3 tag",
- default_value="skip",
- description="Music Assistant prefers information stored in ID3 tags and only uses"
- " online sources for additional metadata. This means that the ID3 tags need to be "
- "accurate, preferably tagged with MusicBrainz Picard.",
+ default_value="folder_name",
+ help_link="https://music-assistant.io/music-providers/filesystem/#tagging-files",
required=False,
options=(
ConfigValueOption("Skip track and log warning", "skip"),
ConfigValueOption("Use Track artist(s)", "track_artist"),
ConfigValueOption("Use Various Artists", "various_artists"),
+ ConfigValueOption("Use Folder name", "folder_name"),
),
)
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- # all data is originated from the actual files (tracks) so grab the data from there
for track in await self.get_album_tracks(prov_album_id):
for prov_mapping in track.provider_mappings:
if prov_mapping.provider_instance == self.instance_id:
playlist.metadata.cache_checksum = checksum
return playlist
- async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
# filesystem items are always stored in db so we can query the database
db_album = await self.mass.music.albums.get_library_item_by_prov_id(
if db_album is None:
msg = f"Album not found: {prov_album_id}"
raise MediaNotFoundError(msg)
- album_tracks = await self.mass.music.albums.tracks(db_album.item_id, db_album.provider)
+ album_tracks = await self.mass.music.albums.get_library_album_tracks(db_album.item_id)
return [
- AlbumTrack.from_track(track, db_album)
+ track
for track in album_tracks
if any(x.provider_instance == self.instance_id for x in track.provider_mappings)
]
- async def get_playlist_tracks(
- self, prov_playlist_id: str
- ) -> AsyncGenerator[PlaylistTrack, None]:
+ async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
"""Get playlist tracks for given playlist id."""
if not await self.exists(prov_playlist_id):
msg = f"Playlist path does not exist: {prov_playlist_id}"
if track := await self._parse_playlist_line(
playlist_line.path, os.path.dirname(prov_playlist_id)
):
- yield PlaylistTrack.from_track(track, line_no)
+ track.position = line_no
+ yield track
except Exception as err: # pylint: disable=broad-except
self.logger.warning(
await self._parse_artist(name=track_artist_str)
for track_artist_str in tags.artists
]
+ elif fallback_action == "folder_name" and album_dir:
+ possible_artist_folder = os.path.dirname(album_dir)
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using foldername %s as fallback",
+ file_item.path,
+ possible_artist_folder,
+ )
+ album_artist_str = possible_artist_folder.rsplit(os.sep)[-1]
+ album_artists = [await self._parse_artist(name=album_artist_str)]
# fallback to just log error and add track without album
else:
# default action is to skip the track
\r
from __future__ import annotations\r
\r
+import asyncio\r
import logging\r
import mimetypes\r
import socket\r
from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError\r
from music_assistant.common.models.media_items import (\r
Album,\r
- AlbumTrack,\r
Artist,\r
AudioFormat,\r
ItemMapping,\r
MediaItem,\r
MediaItemImage,\r
Playlist,\r
- PlaylistTrack,\r
ProviderMapping,\r
SearchResults,\r
Track,\r
)\r
-from music_assistant.common.models.media_items import Album as JellyfinAlbum\r
-from music_assistant.common.models.media_items import Artist as JellyfinArtist\r
-from music_assistant.common.models.media_items import Playlist as JellyfinPlaylist\r
-from music_assistant.common.models.media_items import Track as JellyfinTrack\r
from music_assistant.common.models.streamdetails import StreamDetails\r
\r
if TYPE_CHECKING:\r
return await self._parse_playlist(jellyfin_media)\r
return None\r
\r
- async def _search_track(self, search_query, limit) -> list[JellyfinTrack]:\r
+ async def _search_track(self, search_query, limit) -> list[dict[str, Any]]:\r
resultset = await self._run_async(\r
API.search_media_items,\r
self._jellyfin_server.jellyfin,\r
)\r
return resultset["Items"]\r
\r
- async def _search_album(self, search_query, limit) -> list[JellyfinAlbum]:\r
+ async def _search_album(self, search_query, limit) -> list[dict[str, Any]]:\r
if "-" in search_query:\r
searchterms = search_query.split(" - ")\r
albumname = searchterms[1]\r
)\r
return resultset["Items"]\r
\r
- async def _search_artist(self, search_query, limit) -> list[JellyfinArtist]:\r
+ async def _search_artist(self, search_query, limit) -> list[dict[str, Any]]:\r
resultset = await self._run_async(\r
API.search_media_items,\r
self._jellyfin_server.jellyfin,\r
)\r
return resultset["Items"]\r
\r
- async def _search_playlist(self, search_query, limit) -> list[JellyfinPlaylist]:\r
+ async def _search_playlist(self, search_query, limit) -> list[dict[str, Any]]:\r
resultset = await self._run_async(\r
API.search_media_items,\r
self._jellyfin_server.jellyfin,\r
]\r
return artist\r
\r
- async def _parse_track(\r
- self, jellyfin_track: dict[str, Any], extra_init_kwargs: dict[str, Any] | None = None\r
- ) -> Track | AlbumTrack | PlaylistTrack:\r
+ async def _parse_track(self, jellyfin_track: dict[str, Any]) -> Track:\r
"""Parse a Jellyfin Track response to a Track model object."""\r
- if extra_init_kwargs and "position" in extra_init_kwargs:\r
- track_class = PlaylistTrack\r
- elif (\r
- extra_init_kwargs\r
- and "disc_number" in extra_init_kwargs\r
- and "track_number" in extra_init_kwargs\r
- ):\r
- track_class = AlbumTrack\r
- else:\r
- track_class = Track\r
- current_jellyfin_track = API.get_item(\r
- self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID]\r
+ current_jellyfin_track = await asyncio.to_thread(\r
+ API.get_item, self._jellyfin_server.jellyfin, jellyfin_track[ITEM_KEY_ID]\r
)\r
available = False\r
content = None\r
available = current_jellyfin_track[ITEM_KEY_CAN_DOWNLOAD]\r
content = current_jellyfin_track[ITEM_KEY_MEDIA_STREAMS][0][ITEM_KEY_MEDIA_CODEC]\r
- track = track_class(\r
+ track = Track(\r
item_id=jellyfin_track[ITEM_KEY_ID],\r
provider=self.instance_id,\r
name=jellyfin_track[ITEM_KEY_NAME],\r
- **extra_init_kwargs or {},\r
provider_mappings={\r
ProviderMapping(\r
item_id=jellyfin_track[ITEM_KEY_ID],\r
},\r
)\r
\r
+ track.disc_number = current_jellyfin_track.get(ITEM_KEY_PARENT_INDEX_NUM, 1)\r
+ if "IndexNumber" in current_jellyfin_track:\r
+ if current_jellyfin_track["IndexNumber"] >= 1:\r
+ track_idx = current_jellyfin_track["IndexNumber"]\r
+ track.track_number = track_idx\r
+ track.position = track_idx\r
+\r
if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_track):\r
track.metadata.images = [\r
MediaItemImage(\r
track.mbid = current_jellyfin_track[ITEM_KEY_PROVIDER_IDS][ITEM_KEY_MUSICBRAINZ_TRACK]\r
return track\r
\r
- async def _parse_playlist(self, jellyfin_playlist: JellyfinPlaylist) -> Playlist:\r
+ async def _parse_playlist(self, jellyfin_playlist: dict[str, Any]) -> Playlist:\r
"""Parse a Jellyfin Playlist response to a Playlist object."""\r
playlistid = jellyfin_playlist[ITEM_KEY_ID]\r
playlist = Playlist(\r
jellyfin_album_tracks = await self._get_children(\r
self._jellyfin_server, prov_album_id, ITEM_TYPE_AUDIO\r
)\r
- tracks = []\r
- for jellyfin_album_track in jellyfin_album_tracks:\r
- discnum = jellyfin_album_track.get(ITEM_KEY_PARENT_INDEX_NUM, 1)\r
- if "IndexNumber" in jellyfin_album_track:\r
- if jellyfin_album_track["IndexNumber"] >= 1:\r
- tracknum = jellyfin_album_track["IndexNumber"]\r
- else:\r
- tracknum = jellyfin_album_track["IndexNumber"]\r
- else:\r
- tracknum = 99\r
- track = await self._parse_track(\r
- jellyfin_album_track,\r
- {\r
- "disc_number": discnum,\r
- "track_number": tracknum,\r
- },\r
- )\r
- tracks.append(track)\r
- return tracks\r
+ return [\r
+ await self._parse_track(jellyfin_album_track)\r
+ for jellyfin_album_track in jellyfin_album_tracks\r
+ ]\r
\r
async def get_artist(self, prov_artist_id) -> Artist:\r
"""Get full artist details by id."""\r
\r
if not playlist_items:\r
yield None\r
- for index, jellyfin_track in enumerate(playlist_items):\r
- if track := await self._parse_track(jellyfin_track, {"position": index + 1}):\r
+ for index, jellyfin_track in enumerate(playlist_items, 1):\r
+ if track := await self._parse_track(jellyfin_track):\r
+ if not track.position:\r
+ track.position = index\r
yield track\r
\r
async def get_artist_albums(self, prov_artist_id) -> list[Album]:\r
import asyncio
import logging
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
from libopensonic.connection import Connection as SonicConnection
from libopensonic.errors import (
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
AlbumType,
Artist,
AudioFormat,
ItemMapping,
MediaItemImage,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
return album
- def _parse_track(
- self, sonic_song: SonicSong, extra_init_kwargs: dict[str, Any] | None = None
- ) -> AlbumTrack | PlaylistTrack:
- if extra_init_kwargs and "position" in extra_init_kwargs:
- track_class = PlaylistTrack
- else:
- track_class = AlbumTrack
-
+ def _parse_track(self, sonic_song: SonicSong) -> Track:
mapping = None
if sonic_song.album_id is not None and sonic_song.album is not None:
mapping = self._get_item_mapping(MediaType.ALBUM, sonic_song.album_id, sonic_song.album)
- track = track_class(
+ track = Track(
item_id=sonic_song.id,
provider=self.instance_id,
name=sonic_song.title,
album=mapping,
duration=sonic_song.duration if sonic_song.duration is not None else 0,
- **extra_init_kwargs or {},
provider_mappings={
ProviderMapping(
item_id=sonic_song.id,
),
)
},
+ track_number=getattr(sonic_song, "track", 0),
)
- if not extra_init_kwargs:
- track.track_number = int(sonic_song.track) if sonic_song.track is not None else 1
-
# We need to find an artist for this track but various implementations seem to disagree
# about where the artist with the valid ID needs to be found. We will add any artist with
# an ID and only use UNKNOWN if none are found.
for entry in results:
yield self._parse_playlist(entry)
- async def get_library_tracks(self) -> AsyncGenerator[Track | AlbumTrack, None]:
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""
Provide a generator for library tracks.
return self._parse_album(sonic_album, sonic_info)
- async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Return a list of tracks on the specified Album."""
try:
sonic_album: SonicAlbum = await self._run_async(self._conn.getAlbum, prov_album_id)
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from e
for index, sonic_song in enumerate(sonic_playlist.songs):
- yield self._parse_track(sonic_song, {"position": index + 1})
+ track = self._parse_track(sonic_song)
+ track.position = index
+ yield track
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get the top listed tracks for a specified artist."""
import asyncio
import logging
from asyncio import TaskGroup
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
import plexapi.exceptions
import requests
from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
Artist,
AudioFormat,
ItemMapping,
MediaItemChapter,
MediaItemImage,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
return playlist
- async def _parse_track(
- self, plex_track: PlexTrack, extra_init_kwargs: dict[str, Any] | None = None
- ) -> Track | AlbumTrack | PlaylistTrack:
+ async def _parse_track(self, plex_track: PlexTrack) -> Track:
"""Parse a Plex Track response to a Track model object."""
- if extra_init_kwargs and "position" in extra_init_kwargs:
- track_class = PlaylistTrack
- elif (
- extra_init_kwargs
- and "disc_number" in extra_init_kwargs
- and "track_number" in extra_init_kwargs
- ):
- track_class = AlbumTrack
- else:
- track_class = Track
if plex_track.media:
available = True
content = plex_track.media[0].container
else:
available = False
content = None
- track = track_class(
+ track = Track(
item_id=plex_track.key,
provider=self.instance_id,
name=plex_track.title,
- **extra_init_kwargs or {},
provider_mappings={
ProviderMapping(
item_id=plex_track.key,
msg = f"Item {prov_album_id} not found"
raise MediaNotFoundError(msg)
- async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
plex_album: PlexAlbum = await self._get_data(prov_album_id, PlexAlbum)
tracks = []
for idx, plex_track in enumerate(await self._run_async(plex_album.tracks), 1):
track = await self._parse_track(
plex_track,
- {
- "disc_number": plex_track.parentIndex,
- "track_number": plex_track.trackNumber or idx,
- },
)
+ track.disc_number = plex_track.parentIndex
+ track.track_number = plex_track.trackNumber or idx
tracks.append(track)
return tracks
playlist_items = await self._run_async(plex_playlist.items)
for index, plex_track in enumerate(playlist_items or []):
- if track := await self._parse_track(plex_track, {"position": index + 1}):
+ if track := await self._parse_track(plex_track):
+ track.position = index
yield track
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
AlbumType,
Artist,
AudioFormat,
MediaItemType,
MediaType,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
if searchresult := await self._get_data("catalog/search", **params):
if "artists" in searchresult:
result.artists += [
- await self._parse_artist(item)
+ self._parse_artist(item)
for item in searchresult["artists"]["items"]
if (item and item["id"])
]
]
if "playlists" in searchresult:
result.playlists += [
- await self._parse_playlist(item)
+ self._parse_playlist(item)
for item in searchresult["playlists"]["items"]
if (item and item["id"])
]
endpoint = "favorite/getUserFavorites"
for item in await self._get_all_items(endpoint, key="artists", type="artists"):
if item and item["id"]:
- yield await self._parse_artist(item)
+ yield self._parse_artist(item)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Qobuz."""
endpoint = "playlist/getUserPlaylists"
for item in await self._get_all_items(endpoint, key="playlists"):
if item and item["id"]:
- yield await self._parse_playlist(item)
+ yield self._parse_playlist(item)
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
params = {"artist_id": prov_artist_id}
if (artist_obj := await self._get_data("artist/get", **params)) and artist_obj["id"]:
- return await self._parse_artist(artist_obj)
+ return self._parse_artist(artist_obj)
msg = f"Item {prov_artist_id} not found"
raise MediaNotFoundError(msg)
"""Get full playlist details by id."""
params = {"playlist_id": prov_playlist_id}
if (playlist_obj := await self._get_data("playlist/get", **params)) and playlist_obj["id"]:
- return await self._parse_playlist(playlist_obj)
+ return self._parse_playlist(playlist_obj)
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
- async def get_album_tracks(self, prov_album_id) -> list[AlbumTrack]:
+ async def get_album_tracks(self, prov_album_id) -> list[Track]:
"""Get all album tracks for given album id."""
params = {"album_id": prov_album_id}
return [
if (item and item["id"])
]
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[PlaylistTrack, None]:
+ async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
count = 1
for track_obj in await self._get_all_items(
if not (track_obj and track_obj["id"]):
continue
track = await self._parse_track(track_obj)
- yield PlaylistTrack.from_track(track, position=count)
+ track.position = count
+ yield track
count += 1
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
duration=try_parse_int(seconds_streamed),
)
- async def _parse_artist(self, artist_obj: dict):
+ def _parse_artist(self, artist_obj: dict):
"""Parse qobuz artist object to generic layout."""
artist = Artist(
item_id=str(artist_obj["id"]),
},
)
album.external_ids.add((ExternalID.BARCODE, album_obj["upc"]))
- album.artists.append(await self._parse_artist(artist_obj or album_obj["artist"]))
+ album.artists.append(self._parse_artist(artist_obj or album_obj["artist"]))
if (
album_obj.get("product_type", "") == "single"
or album_obj.get("release_type", "") == "single"
if isrc := track_obj.get("isrc"):
track.external_ids.add((ExternalID.ISRC, isrc))
if track_obj.get("performer") and "Various " not in track_obj["performer"]:
- artist = await self._parse_artist(track_obj["performer"])
+ artist = self._parse_artist(track_obj["performer"])
if artist:
track.artists.append(artist)
# try to grab artist from album
and track_obj["album"].get("artist")
and "Various " not in track_obj["album"]["artist"]
):
- artist = await self._parse_artist(track_obj["album"]["artist"])
+ artist = self._parse_artist(track_obj["album"]["artist"])
if artist:
track.artists.append(artist)
if not track.artists:
return track
- async def _parse_playlist(self, playlist_obj):
+ def _parse_playlist(self, playlist_obj):
"""Parse qobuz playlist object to generic layout."""
playlist = Playlist(
item_id=str(playlist_obj["id"]),
ConfigValueOption,
ConfigValueType,
PlayerConfig,
+ create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
base_entries = await super().get_player_config_entries(player_id)
- if not self.slimproto.get_player(player_id):
+ if not (slimclient := self.slimproto.get_player(player_id)):
# most probably a syncgroup
- return (*base_entries, CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION)
+ return (
+ *base_entries,
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ create_sample_rates_config_entry(96000, 24, 48000, 24),
+ )
# create preset entries (for players that support it)
preset_entries = ()
CONF_ENTRY_SYNC_ADJUST,
CONF_ENTRY_DISPLAY,
CONF_ENTRY_VISUALIZATION,
+ create_sample_rates_config_entry(int(slimclient.max_sample_rate), 24, 48000, 24),
)
)
PlayerFeature.VOLUME_MUTE,
PlayerFeature.ENQUEUE_NEXT,
),
- max_sample_rate=int(slimplayer.max_sample_rate),
- supports_24bit=int(slimplayer.max_sample_rate) > 44100,
can_sync_with=tuple(
x.player_id for x in self.slimproto.players if x.player_id != player_id
),
)
- if slimplayer.device_type == "squeezeesp32":
- # squeezeesp32 with default settings - override with sane defaults
- if slimplayer.max_sample_rate == 192000:
- player.max_sample_rate = 44100
- player.supports_24bit = False
self.mass.players.register_or_update(player)
# update player state on player events
CONF_ENTRY_FLOW_MODE_ENFORCED,
ConfigEntry,
ConfigValueType,
+ create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
CONF_SERVER_CONTROL_PORT = "snapcast_server_control_port"
CONF_USE_EXTERNAL_SERVER = "snapcast_use_external_server"
+# airplay has fixed sample rate/bit depth so make this config entry static and hidden
+CONF_ENTRY_SAMPLE_RATES_SNAPCAST = create_sample_rates_config_entry(48000, 16, 48000, 16, True)
+
SNAP_STREAM_STATUS_MAP = {
"idle": PlayerState.IDLE,
"playing": PlayerState.PLAYING,
CONF_ENTRY_FLOW_MODE_ENFORCED,
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
CONF_ENTRY_CROSSFADE,
ConfigEntry,
ConfigValueType,
+ create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
ZGS_SUBSCRIPTION_TIMEOUT = 2
-HIRES_MODELS = (
+S2_MODELS = (
"Sonos Roam",
"Sonos Arc",
"Sonos Beam",
"Sonos Era 300",
)
+CONF_ENTRY_SAMPLE_RATES_SONOS_S2 = create_sample_rates_config_entry(48000, 24, 48000, 24, True)
+CONF_ENTRY_SAMPLE_RATES_SONOS_S1 = create_sample_rates_config_entry(48000, 16, 48000, 16, True)
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
if not (sonos_player := self.sonosplayers.get(player_id)):
# most probably a syncgroup
return (*base_entries, CONF_ENTRY_CROSSFADE)
+ is_s2 = sonos_player.soco.speaker_info["model_name"] in S2_MODELS
return (
*base_entries,
CONF_ENTRY_CROSSFADE,
description="Enable loudness compensation on the Sonos player",
category="advanced",
),
+ CONF_ENTRY_SAMPLE_RATES_SONOS_S2 if is_s2 else CONF_ENTRY_SAMPLE_RATES_SONOS_S1,
)
def on_player_config_changed(
if soco.uid not in self.boot_counts:
self.boot_counts[soco.uid] = soco.boot_seqnum
self.logger.debug("Adding new player: %s", speaker_info)
- support_hires = speaker_info["model_name"] in HIRES_MODELS
if not (mass_player := self.mass.players.get(soco.uid)):
mass_player = Player(
player_id=soco.uid,
address=soco.ip_address,
manufacturer="SONOS",
),
- max_sample_rate=48000 if support_hires else 44100,
- supports_24bit=support_hires,
)
self.sonosplayers[player_id] = sonos_player = SonosPlayer(
self,
MUSIC_SRC_SPOTIFY_CONNECT: SOURCE_SPOTIFY_CONNECT,
}
-HIRES_MODELS = (
- "Sonos Roam",
- "Sonos Arc",
- "Sonos Beam",
- "Sonos Five",
- "Sonos Move",
- "Sonos One SL",
- "Sonos Port",
- "Sonos Amp",
- "SYMFONISK Bookshelf",
- "SYMFONISK Table Lamp",
- "Sonos Era 100",
- "Sonos Era 300",
-)
-
class SonosSubscriptionsFailed(PlayerCommandFailed):
"""Subscription creation failed."""
MediaItemImage,
MediaType,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)
return playlist
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[PlaylistTrack, None]:
+ async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
if "tracks" not in playlist_obj:
playlist.metadata.style = playlist_obj["tag_list"]
return playlist
- async def _parse_track(
- self, track_obj: dict, playlist_position: int | None = None
- ) -> Track | PlaylistTrack:
+ async def _parse_track(self, track_obj: dict, playlist_position: int = 0) -> Track:
"""Parse a Soundcloud Track response to a Track model object."""
name, version = parse_title_and_version(track_obj["title"])
- track_class = PlaylistTrack if playlist_position is not None else Track
- track = track_class( # pylint: disable=missing-kwoa
+ track = Track(
item_id=track_obj["id"],
provider=self.domain,
name=name,
url=track_obj["permalink_url"],
)
},
- **{"position": playlist_position} if playlist_position else {},
+ position=playlist_position,
)
user_id = track_obj["user"]["id"]
user = await self._soundcloud.get_user_details(user_id)
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
AlbumType,
Artist,
AudioFormat,
MediaItemType,
MediaType,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
from music_assistant.server.helpers.app_vars import app_var
# pylint: enable=no-name-in-module
+from music_assistant.server.helpers.audio import get_chunksize
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.music_provider import MusicProvider
searchresult = await self._get_data("search", q=search_query, type=searchtype, limit=limit)
if "artists" in searchresult:
result.artists += [
- await self._parse_artist(item)
+ self._parse_artist(item)
for item in searchresult["artists"]["items"]
if (item and item["id"] and item["name"])
]
if "albums" in searchresult:
result.albums += [
- await self._parse_album(item)
+ self._parse_album(item)
for item in searchresult["albums"]["items"]
if (item and item["id"])
]
if "tracks" in searchresult:
result.tracks += [
- await self._parse_track(item)
+ self._parse_track(item)
for item in searchresult["tracks"]["items"]
if (item and item["id"])
]
if "playlists" in searchresult:
result.playlists += [
- await self._parse_playlist(item)
+ self._parse_playlist(item)
for item in searchresult["playlists"]["items"]
if (item and item["id"])
]
)
for item in spotify_artists["artists"]["items"]:
if item and item["id"]:
- yield await self._parse_artist(item)
+ yield self._parse_artist(item)
if spotify_artists["artists"]["next"]:
endpoint = spotify_artists["artists"]["next"]
endpoint = endpoint.replace("https://api.spotify.com/v1/", "")
"""Retrieve library albums from the provider."""
for item in await self._get_all_items("me/albums"):
if item["album"] and item["album"]["id"]:
- yield await self._parse_album(item["album"])
+ yield self._parse_album(item["album"])
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from the provider."""
for item in await self._get_all_items("me/tracks"):
if item and item["track"]["id"]:
- yield await self._parse_track(item["track"])
+ yield self._parse_track(item["track"])
def _get_liked_songs_playlist_id(self) -> str:
return f"{LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX}-{self.instance_id}"
yield await self._get_liked_songs_playlist()
for item in await self._get_all_items("me/playlists"):
if item and item["id"]:
- yield await self._parse_playlist(item)
+ yield self._parse_playlist(item)
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
artist_obj = await self._get_data(f"artists/{prov_artist_id}")
- return await self._parse_artist(artist_obj)
+ return self._parse_artist(artist_obj)
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
album_obj = await self._get_data(f"albums/{prov_album_id}")
- return await self._parse_album(album_obj)
+ return self._parse_album(album_obj)
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
track_obj = await self._get_data(f"tracks/{prov_track_id}")
- return await self._parse_track(track_obj)
+ return self._parse_track(track_obj)
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
return await self._get_liked_songs_playlist()
playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}")
- return await self._parse_playlist(playlist_obj)
+ return self._parse_playlist(playlist_obj)
- async def get_album_tracks(self, prov_album_id) -> list[AlbumTrack]:
+ async def get_album_tracks(self, prov_album_id) -> list[Track]:
"""Get all album tracks for given album id."""
return [
- AlbumTrack.from_track(await self._parse_track(item))
+ self._parse_track(item)
for item in await self._get_all_items(f"albums/{prov_album_id}/tracks")
if item["id"]
]
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[PlaylistTrack, None]:
+ async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
count = 1
uri = (
if not (item and item["track"] and item["track"]["id"]):
continue
# use count as position
- track = await self._parse_track(item["track"])
- yield PlaylistTrack.from_track(track, position=count)
+ track = self._parse_track(item["track"])
+ track.position = count
count += 1
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of all albums for the given artist."""
return [
- await self._parse_album(item)
+ self._parse_album(item)
for item in await self._get_all_items(
f"artists/{prov_artist_id}/albums?include_groups=album,single,compilation"
)
endpoint = f"artists/{prov_artist_id}/top-tracks"
items = await self._get_data(endpoint)
return [
- await self._parse_track(item, artist=artist)
+ self._parse_track(item, artist=artist)
for item in items["tracks"]
if (item and item["id"])
]
"""Retrieve a dynamic list of tracks based on the provided item."""
endpoint = "recommendations"
items = await self._get_data(endpoint, seed_tracks=prov_track_id, limit=limit)
- return [await self._parse_track(item) for item in items["tracks"] if (item and item["id"])]
+ return [self._parse_track(item) for item in items["tracks"] if (item and item["id"])]
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
args += ["--start-position", str(int(seek_position))]
if self._ap_workaround:
args += ["--ap-port", "12345"]
+ chunk_size = get_chunksize(streamdetails.audio_format)
async with AsyncProcess(args, stdout=True, name="librespot") as librespot_proc:
- async for chunk in librespot_proc.iter_any():
+ async for chunk in librespot_proc.iter_any(chunk_size):
yield chunk
- async def _parse_artist(self, artist_obj):
+ def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""
artist = Artist(
item_id=artist_obj["id"],
break
return artist
- async def _parse_album(self, album_obj: dict):
+ def _parse_album(self, album_obj: dict):
"""Parse spotify album object to generic layout."""
name, version = parse_title_and_version(album_obj["name"])
album = Album(
for artist_obj in album_obj["artists"]:
if not artist_obj.get("name") or not artist_obj.get("id"):
continue
- album.artists.append(await self._parse_artist(artist_obj))
+ album.artists.append(self._parse_artist(artist_obj))
with contextlib.suppress(ValueError):
album.album_type = AlbumType(album_obj["album_type"])
album.metadata.explicit = album_obj["explicit"]
return album
- async def _parse_track(
+ def _parse_track(
self,
track_obj: dict[str, Any],
artist=None,
for track_artist in track_obj.get("artists", []):
if not track_artist.get("name") or not track_artist.get("id"):
continue
- artist = await self._parse_artist(track_artist)
+ artist = self._parse_artist(track_artist)
if artist and artist.item_id not in {x.item_id for x in track.artists}:
track.artists.append(artist)
if "preview_url" in track_obj:
track.metadata.preview = track_obj["preview_url"]
if "album" in track_obj:
- track.album = await self._parse_album(track_obj["album"])
+ track.album = self._parse_album(track_obj["album"])
if track_obj["album"].get("images"):
track.metadata.images = [
MediaItemImage(
track.metadata.popularity = track_obj["popularity"]
return track
- async def _parse_playlist(self, playlist_obj):
+ def _parse_playlist(self, playlist_obj):
"""Parse spotify playlist object to generic layout."""
playlist = Playlist(
item_id=playlist_obj["id"],
ProviderFeature,
StreamType,
)
-from music_assistant.common.models.errors import (
- LoginFailed,
- MediaNotFoundError,
-)
+from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
Artist,
AudioFormat,
ContentType,
ItemMapping,
MediaItemImage,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import AudioTags, parse_tags
-from music_assistant.server.helpers.throttle_retry import (
- AsyncThrottleWithRetryContextManager,
-)
+from music_assistant.server.helpers.throttle_retry import AsyncThrottleWithRetryContextManager
from music_assistant.server.models.music_provider import MusicProvider
from .helpers import (
):
yield self._parse_playlist(playlist)
- async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- album_obj = await manager.wrapped_function_with_retry(
- get_album, tidal_session, prov_album_id
- )
-
async with self._throttle_retry as manager:
tracks_obj = await manager.wrapped_function_with_retry(
get_album_tracks, tidal_session, prov_album_id
)
- return [
- AlbumTrack.from_track(
- track=self._parse_track(track_obj=track_obj),
- album=self._parse_album(album_obj=album_obj),
- disc_number=track_obj.volume_num,
- track_number=track_obj.track_num,
- )
- for track_obj in tracks_obj
- ]
+ return [self._parse_track(track_obj=track_obj) for track_obj in tracks_obj]
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of all albums for the given artist."""
)
return [self._parse_track(track) for track in artist_toptracks_obj]
- async def get_playlist_tracks(
- self, prov_playlist_id: str
- ) -> AsyncGenerator[PlaylistTrack, None]:
+ async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
tidal_session = await self._get_tidal_session()
total_playlist_tracks = 0
- track: TidalTrack # satisfy the type checker
+ track_obj: TidalTrack # satisfy the type checker
async for track_obj in self._iter_items(
get_playlist_tracks, tidal_session, prov_playlist_id, limit=DEFAULT_LIMIT
):
total_playlist_tracks += 1
- track = PlaylistTrack.from_track(
- self._parse_track(track_obj=track_obj), total_playlist_tracks
- )
+ track = self._parse_track(track_obj=track_obj)
+ track.position = total_playlist_tracks
yield track
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
def _parse_track(
self,
track_obj: TidalTrack,
- extra_init_kwargs: dict[str, Any] | None = None,
- ) -> Track | AlbumTrack | PlaylistTrack:
+ ) -> Track:
"""Parse tidal track object to generic layout."""
version = track_obj.version or ""
track_id = str(track_obj.id)
- if extra_init_kwargs is None:
- extra_init_kwargs = {}
- if "position" in extra_init_kwargs:
- track_class = PlaylistTrack
- elif "disc_number" in extra_init_kwargs and "track_number" in extra_init_kwargs:
- track_class = AlbumTrack
- else:
- track_class = Track
- track = track_class(
+ track = Track(
item_id=str(track_id),
provider=self.instance_id,
name=track_obj.name,
available=track_obj.available,
)
},
- **extra_init_kwargs,
+ disc_number=track_obj.volume_num,
+ track_number=track_obj.track_num,
)
if track_obj.isrc:
track.external_ids.add((ExternalID.ISRC, track_obj.isrc))
radio.position = preset_number
if "text" in details:
radio.metadata.description = details["text"]
- # images
- if img := details.get("image"):
- radio.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=img,
- provider=self.instance_id,
- remotely_accessible=True,
- )
- ]
- if img := details.get("logo"):
+ # image
+ if img := details.get("image") or details.get("logo"):
radio.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
ConfigEntry,
ConfigValueOption,
ConfigValueType,
+ create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
)
+CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(48000, 24, 48000, 24, True)
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
),
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_SAMPLE_RATES_UGP,
)
async def cmd_stop(self, player_id: str) -> None:
)
from music_assistant.common.models.media_items import (
Album,
- AlbumTrack,
AlbumType,
Artist,
AudioFormat,
MediaItemType,
MediaType,
Playlist,
- PlaylistTrack,
ProviderMapping,
SearchResults,
Track,
msg = f"Item {prov_album_id} not found"
raise MediaNotFoundError(msg)
- async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
await self._check_oauth_token()
album_obj = await get_album(prov_album_id=prov_album_id, language=self.language)
tracks = []
for idx, track_obj in enumerate(album_obj["tracks"], 1):
try:
- track = AlbumTrack.from_track(
- await self._parse_track(track_obj=track_obj),
- disc_number=0,
- track_number=track_obj.get("trackNumber", idx),
- )
+ track = await self._parse_track(track_obj=track_obj)
+ track.track_number = track_obj.get("trackNumber", idx)
except InvalidDataError:
continue
tracks.append(track)
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[PlaylistTrack, None]:
+ async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
await self._check_oauth_token()
# Grab the playlist id from the full url in case of personal playlists
# In that case, call the API for track details based on track id
try:
if track := await self._parse_track(track_obj):
- yield PlaylistTrack.from_track(track, index + 1)
+ track.position = index + 1
+ yield track
except InvalidDataError:
if track := await self.get_track(track_obj["videoId"]):
- yield PlaylistTrack.from_track(track, index + 1)
+ track.position = index + 1
+ yield track
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
streams: StreamsController
_aiobrowser: AsyncServiceBrowser
- def __init__(self, storage_path: str) -> None:
+ def __init__(self, storage_path: str, safe_mode: bool = False) -> None:
"""Initialize the MusicAssistant Server."""
self.storage_path = storage_path
+ self.safe_mode = safe_mode
# we dynamically register command handlers which can be consumed by the apis
self.command_handlers: dict[str, APICommandHandler] = {}
self._subscribers: set[EventSubscriptionType] = set()
self.config = ConfigController(self)
await self.config.setup()
LOGGER.info(
- "Starting Music Assistant Server (%s) version %s - HA add-on: %s",
+ "Starting Music Assistant Server (%s) version %s - HA add-on: %s - Safe mode: %s",
self.server_id,
self.version,
self.running_as_hass_addon,
+ self.safe_mode,
)
# setup other core controllers
self.cache = CacheController(self)
# setup discovery
self.create_task(self._setup_discovery())
# load providers
- await self._load_providers()
+ if not self.safe_mode:
+ await self._load_providers()
async def stop(self) -> None:
"""Stop running the music assistant server."""
*{x.instance_id for x in self.providers},
},
"unique_providers": {x.lookup_key for x in self.providers},
+ "streaming_providers": {
+ x.lookup_key
+ for x in self.providers
+ if x.type == ProviderType.MUSIC and x.is_streaming_provider
+ },
+ "non_streaming_providers": {
+ x.lookup_key
+ for x in self.providers
+ if not (x.type == ProviderType.MUSIC and x.is_streaming_provider)
+ },
}
)