from mashumaro import DataClassDictMixin
from .enums import MediaType
-from .media_items import Album, ItemMapping, MediaItemImage, Radio, Track
+from .media_items import ItemMapping, MediaItemImage, Radio, Track
from .streamdetails import StreamDetails
return None
if media_item.image:
return media_item.image
- if isinstance(media_item, Track) and isinstance(media_item.album, Album):
- return media_item.album.image
+ if media_item.media_type == MediaType.TRACK and (album := getattr(media_item, "album", None)):
+ return get_image(album)
return None
REFRESH_INTERVAL = 60 * 60 * 24 * 30
JSON_KEYS = ("artists", "album", "metadata", "provider_mappings", "external_ids")
+SORT_KEYS = {
+ "name": "name COLLATE NOCASE ASC",
+ "name_desc": "name COLLATE NOCASE DESC",
+ "sort_name": "sort_name ASC",
+ "sort_name_desc": "sort_name DESC",
+ "timestamp_added": "timestamp_added ASC",
+ "timestamp_added_desc": "timestamp_added DESC",
+ "last_played": "last_played ASC",
+ "last_played_desc": "last_played DESC",
+ "play_count": "play_count ASC",
+ "play_count_desc": "play_count DESC",
+ "artist": "artists.name COLLATE NOCASE ASC",
+ "album": "albums.name COLLATE NOCASE ASC",
+ "sort_artist": "artists.sort_name ASC",
+ "sort_album": "albums.sort_name ASC",
+ "year": "year ASC",
+ "year_desc": "year DESC",
+ "position": "position ASC",
+ "position_desc": "position DESC",
+ "random": "RANDOM()",
+}
+
class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta):
"""Base model for controller managing a MediaType."""
add_to_library: bool = False,
) -> ItemCls:
"""Return (full) details for a single media item."""
- metadata_lookup = force_refresh or add_to_library
+ metadata_lookup = False
# always prefer the full library item if we have it
library_item = await self.get_library_item_by_prov_id(
item_id,
if library_item.available:
# do not attempts metadata refresh on unavailable items as it has side effects
metadata_lookup = True
- if library_item and (force_refresh or metadata_lookup):
+
+ if library_item and not (force_refresh or metadata_lookup or add_to_library):
+ # we have a library item and no refreshing is needed, return the results!
+ return library_item
+
+ if force_refresh:
# get (first) provider item id belonging to this library item
add_to_library = True
+ metadata_lookup = True
provider_instance_id_or_domain, item_id = await self.get_provider_mapping(library_item)
- elif library_item:
- # we have a library item and no refreshing is needed, return the results!
- return library_item
- if (
- provider_instance_id_or_domain
- and item_id
- and (
- not details
- or isinstance(details, ItemMapping)
- or (add_to_library and details.provider == "library")
- )
- ):
- # grab full details from the provider
- details = await self.get_provider_item(
- item_id,
- provider_instance_id_or_domain,
- force_refresh=force_refresh,
- fallback=details,
- )
+
+ # grab full details from the provider
+ details = await self.get_provider_item(
+ item_id,
+ provider_instance_id_or_domain,
+ force_refresh=force_refresh,
+ fallback=details,
+ )
if not details and library_item:
# something went wrong while trying to fetch/refresh this item
# return the existing (unavailable) library item and leave this for another day
return library_item
+
if not details:
# we couldn't get a match from any of the providers, raise error
msg = f"Item not found: {provider_instance_id_or_domain}/{item_id}"
raise MediaNotFoundError(msg)
+
if not (add_to_library or metadata_lookup):
# return the provider item as-is
return details
+
# create task to add the item to the library,
# including matching metadata etc. takes some time
# in 99% of the cases we just return lazy because we want the details as fast as possible
fallback: ItemMapping | ItemCls = None,
) -> ItemCls:
"""Return item details for the given provider item id."""
+ if provider_instance_id_or_domain == "library":
+ return await self.get_library_item(item_id)
if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
raise ProviderUnavailableError(f"{provider_instance_id_or_domain} is not available")
cache_key = f"provider_item.{self.media_type.value}.{provider.lookup_key}.{item_id}"
- if provider_instance_id_or_domain == "library":
- return await self.get_library_item(item_id)
if not force_refresh and (cache := await self.mass.cache.get(cache_key)):
return self.item_cls.from_dict(cache)
if provider := self.mass.get_provider(provider_instance_id_or_domain):
if count_only:
return await self.mass.music.database.get_count_from_query(sql_query, query_params)
if order_by:
- order_by = order_by.replace("sort_artist", f"{DB_TABLE_ARTISTS}.sort_name")
- order_by = order_by.replace("sort_album", f"{DB_TABLE_ALBUMS}.sort_name")
- sql_query += f" ORDER BY {order_by}"
+ if sort_key := SORT_KEYS.get(order_by):
+ sql_query += f" ORDER BY {sort_key}"
+ else:
+ self.logger.warning("%s is not a valid sort option!", order_by)
+
# return dbresult parsed to media item model
return [
self.item_cls.from_dict(self._parse_db_row(db_row))
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.helpers.uri import create_uri, parse_uri
-from music_assistant.common.models.enums import MediaType, ProviderFeature
+from music_assistant.common.models.enums import MediaType, ProviderFeature, ProviderType
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
force_refresh: bool = False,
offset: int = 0,
limit: int = 50,
+ prefer_library_items: bool = True,
) -> PagedItems[PlaylistTrack]:
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(
offset=offset,
limit=limit,
)
- return PagedItems(items=tracks, limit=limit, offset=offset)
+ if prefer_library_items:
+ final_tracks = []
+ for track in tracks:
+ if db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
+ track.item_id, track.provider
+ ):
+ final_tracks.append(db_item)
+ else:
+ final_tracks.append(track)
+ else:
+ final_tracks = tracks
+ return PagedItems(items=final_tracks, limit=limit, offset=offset)
async def create_playlist(
self, name: str, provider_instance_or_domain: str | None = None
force_refresh=True,
)
- async def get_all_playlist_tracks(self, playlist: Playlist) -> list[PlaylistTrack]:
+ async def get_all_playlist_tracks(
+ self, playlist: Playlist, prefer_library_items: bool = False
+ ) -> list[PlaylistTrack]:
"""Return all tracks for given playlist (by unwrapping the paged listing)."""
result: list[PlaylistTrack] = []
offset = 0
provider_instance_id_or_domain=playlist.provider,
offset=offset,
limit=limit,
+ prefer_library_items=prefer_library_items,
)
result += paged_items.items
if paged_items.count != limit:
provider_instance_id_or_domain: str,
cache_checksum: Any = None,
offset: int = 0,
- limit: int = 100,
+ limit: int = 50,
) -> list[PlaylistTrack]:
"""Return playlist tracks for the given provider playlist id."""
assert provider_instance_id_or_domain != "library"
provider = self.mass.get_provider(provider_instance_id_or_domain)
if not provider or ProviderFeature.SIMILAR_TRACKS not in provider.supported_features:
return []
+ playlist = await self.get(item_id, provider_instance_id_or_domain)
playlist_tracks = [
x
- for x in await self._get_provider_playlist_tracks(
- item_id, provider_instance_id_or_domain
- )
+ for x in await self.get_all_playlist_tracks(playlist)
# filter out unavailable tracks
if x.available
]
limit = min(limit, len(playlist_tracks))
# use set to prevent duplicates
- final_items = []
+ final_items: list[Track] = []
# to account for playlists with mixed content we grab suggestions from a few
# random playlist tracks to prevent getting too many tracks of one of the
# source playlist's genres.
limit: int = 25,
) -> list[Track]:
"""Get dynamic list of tracks for given item, fallback/default implementation."""
- # TODO: query metadata provider(s) to get similar tracks (or tracks from similar artists)
- msg = "No Music Provider found that supports requesting similar tracks."
- raise UnsupportedFeaturedException(msg)
+ # check if we have any provider that supports dynamic tracks
+ # TODO: query metadata provider(s) (such as lastfm?)
+ # to get similar tracks (or tracks from similar artists)
+ for prov in self.mass.get_providers(ProviderType.MUSIC):
+ if ProviderFeature.SIMILAR_TRACKS in prov.supported_features:
+ break
+ else:
+ msg = "No Music Provider found that supports requesting similar tracks."
+ raise UnsupportedFeaturedException(msg)
+
+ radio_items: list[Track] = []
+ radio_item_titles: set[str] = set()
+ playlist_tracks = await self.get_all_playlist_tracks(media_item, prefer_library_items=True)
+ random.shuffle(playlist_tracks)
+ for playlist_track in playlist_tracks:
+ if not playlist_track.available:
+ continue
+ # include base item in the list
+ radio_items.append(playlist_track)
+ radio_item_titles.add(playlist_track.name)
+ # now try to find similar tracks
+ for item_prov_mapping in playlist_track.provider_mappings:
+ if not (prov := self.mass.get_provider(item_prov_mapping.provider_instance)):
+ continue
+ if ProviderFeature.SIMILAR_TRACKS not in prov.supported_features:
+ continue
+ # fetch some similar tracks on this provider
+ for similar_track in await prov.get_similar_tracks(
+ prov_track_id=item_prov_mapping.item_id, limit=5
+ ):
+ if similar_track.name not in radio_item_titles:
+ radio_items.append(similar_track)
+ radio_item_titles.add(similar_track.name)
+ continue
+ if len(radio_items) >= limit:
+ break
+ # Shuffle the final items list
+ random.shuffle(radio_items)
+ return radio_items
import shutil
from contextlib import suppress
from itertools import zip_longest
+from math import inf
from typing import TYPE_CHECKING
from music_assistant.common.helpers.datetime import utc_timestamp
DEFAULT_SYNC_INTERVAL = 3 * 60 # default sync interval in minutes
CONF_SYNC_INTERVAL = "sync_interval"
CONF_DELETED_PROVIDERS = "deleted_providers"
+CONF_ADD_LIBRARY_ON_PLAY = "add_library_on_play"
class MusicController(CoreController):
description="Interval (in minutes) that a (delta) sync "
"of all providers should be performed.",
),
+ ConfigEntry(
+ key=CONF_ADD_LIBRARY_ON_PLAY,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=False,
+ label="Add item to the library as soon as its played",
+ description="Automatically add a track or radio station to "
+ "the library when played (if its not already in the library).",
+ ),
)
async def setup(self, config: CoreConfig) -> None:
{
"item_id": item_id,
"provider": provider.lookup_key,
- "integrated": loudness.integrated,
- "true_peak": loudness.true_peak,
- "lra": loudness.lra,
- "threshold": loudness.threshold,
- "target_offset": loudness.target_offset,
+ "integrated": round(loudness.integrated, 2),
+ "true_peak": round(loudness.true_peak, 2),
+ "lra": round(loudness.lra, 2),
+ "threshold": round(loudness.threshold, 2),
+ "target_offset": round(loudness.target_offset, 2),
},
allow_replace=True,
)
"provider": provider.lookup_key,
},
):
+ if result["integrated"] == inf or result["integrated"] == -inf:
+ return None
+
return LoudnessMeasurement(
integrated=result["integrated"],
true_peak=result["true_peak"],
)
# also update playcount in library table
- if provider_instance_id_or_domain != "library":
- return
ctrl = self.get_controller(media_type)
- await self.database.execute(
- f"UPDATE {ctrl.db_table} SET play_count = play_count + 1, "
- f"last_played = {timestamp} WHERE item_id = {item_id}"
- )
+ if self.mass.config.get_raw_core_config_value(self.domain, CONF_ADD_LIBRARY_ON_PLAY):
+ # handle feature to add to the lib on playback
+ db_item = await ctrl.get(
+ item_id, provider_instance_id_or_domain, lazy=False, add_to_library=True
+ )
+ else:
+ db_item = await ctrl.get_library_item_by_prov_id(
+ item_id, provider_instance_id_or_domain
+ )
+ if db_item:
+ await self.database.execute(
+ f"UPDATE {ctrl.db_table} SET play_count = play_count + 1, "
+ f"last_played = {timestamp} WHERE item_id = {db_item.item_id}"
+ )
await self.database.commit()
def get_controller(
@api_command("player_queues/shuffle")
def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
"""Configure shuffle setting on the the queue."""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
@api_command("player_queues/repeat")
def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
"""Configure repeat setting on the the queue."""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
"""
# ruff: noqa: PLR0915,PLR0912
queue = self._queues[queue_id]
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
- pos_shift: move item x positions up if negative value
- pos_shift: move item to top of queue as next item if 0.
"""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
@api_command("player_queues/delete_item")
def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
"""Delete item (by id or index) from the queue."""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
if isinstance(item_id_or_index, str):
@api_command("player_queues/clear")
def clear(self, queue_id: str) -> None:
"""Clear all items in the queue."""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
- queue_id: queue_id of the playerqueue to handle the command.
"""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
if queue := self.get(queue_id):
- queue_id: queue_id of the playerqueue to handle the command.
"""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
if self._queues[queue_id].state == PlayerState.PAUSED:
- queue_id: queue_id of the playerqueue to handle the command.
"""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
- player = self.mass.players.get(queue_id, True)
if PlayerFeature.PAUSE not in player.supported_features:
# if player does not support pause, we need to send stop
await self.stop(queue_id)
- queue_id: queue_id of the queue to handle the command.
"""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
current_index = self._queues[queue_id].current_index
- queue_id: queue_id of the queue to handle the command.
"""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
current_index = self._queues[queue_id].current_index
- queue_id: queue_id of the queue to handle the command.
- seconds: number of seconds to skip in track. Use negative value to skip back.
"""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
await self.seek(queue_id, self._queues[queue_id].elapsed_time + seconds)
- queue_id: queue_id of the queue to handle the command.
- position: position in seconds to seek to in the current playing item.
"""
- if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ # always fetch the underlying player so we can raise early if its not available
+ player = self.mass.players.get(queue_id, True)
+ if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
artist.provider,
in_library_only=artist_items_conf == "library_album_tracks",
):
- for album_track in self.mass.music.albums.tracks(
+ for album_track in await self.mass.music.albums.tracks(
library_album.item_id, library_album.provider
):
if album_track not in all_items:
func.__name__,
player.display_name,
)
- await func(self, *args, **kwargs)
+ try:
+ await func(self, *args, **kwargs)
+ except Exception as err:
+ raise PlayerCommandFailed(str(err)) from err
return wrapper
CONF_EQ_MID,
CONF_EQ_TREBLE,
CONF_OUTPUT_CHANNELS,
+ CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_TARGET,
MASS_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
streamdetails.item_id, streamdetails.provider
)
player_settings = await mass.config.get_player_config(streamdetails.queue_id)
- if player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET):
+ if player_settings.get_value(CONF_VOLUME_NORMALIZATION):
streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
else:
streamdetails.target_loudness = None
if "[Parsed_loudnorm_" not in stderr_data:
return None
stderr_data = stderr_data.split("[Parsed_loudnorm_")[1]
- stderr_data = stderr_data.rsplit("]")[-1].strip()
+ stderr_data = "{" + stderr_data.rsplit("{")[-1].strip()
stderr_data = stderr_data.rsplit("}")[0].strip() + "}"
try:
loudness_data = json_loads(stderr_data)
continue
elif len(line) != 0:
# Get song path from all other, non-blank lines
+ if "%20" in line:
+ # apparently VLC manages to encode spaces in filenames
+ line = line.replace("%20", " ") # noqa: PLW2901
playlist.append(
PlaylistItem(path=line, length=length, title=title, stream_info=stream_info)
)
player_id,
)
return
- await self.mass.create_task(sonos_player.soco.stop)
+ await asyncio.to_thread(sonos_player.soco.stop)
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
player_id,
)
return
- await self.mass.create_task(sonos_player.soco.play)
+ await asyncio.to_thread(sonos_player.soco.play)
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
# pause not possible
await self.cmd_stop(player_id)
return
- await self.mass.create_task(sonos_player.soco.pause)
+ await asyncio.to_thread(sonos_player.soco.pause)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
sonos_player = self.sonosplayers[player_id]
sonos_player.soco.volume = volume_level
- await self.mass.create_task(set_volume_level, player_id, volume_level)
+ await asyncio.to_thread(set_volume_level, player_id, volume_level)
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send VOLUME MUTE command to given player."""
sonos_player = self.sonosplayers[player_id]
sonos_player.soco.mute = muted
- await self.mass.create_task(set_volume_mute, player_id, muted)
+ await asyncio.to_thread(set_volume_mute, player_id, muted)
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
raise PlayerCommandFailed(msg)
didl_metadata = create_didl_metadata(media)
- self.mass.create_task(sonos_player.soco.play_uri, media.uri, meta=didl_metadata)
+ await asyncio.to_thread(sonos_player.soco.play_uri, media.uri, meta=didl_metadata)
async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of the next queue item on the player."""
liked_songs = Playlist(
item_id=self._get_liked_songs_playlist_id(),
provider=self.domain,
- name="Liked Songs", # TODO to be translated
- owner="Me", # TODO Get logged in user display name
+ name=f'Liked Songs {self._sp_user["display_name"]}', # TODO to be translated
+ owner=self._sp_user["display_name"],
provider_mappings={
ProviderMapping(
item_id=self._get_liked_songs_playlist_id(),