from __future__ import annotations
import logging
+import warnings
from collections.abc import Iterable
from dataclasses import dataclass
from enum import Enum
from .enums import ConfigEntryType
+# TEMP: ignore UserWarnings from mashumaro
+# https://github.com/Fatal1ty/mashumaro/issues/221
+warnings.filterwarnings("ignore", category=UserWarning, module="mashumaro")
+
LOGGER = logging.getLogger(__name__)
ENCRYPT_CALLBACK: callable[[str], str] | None = None
label=CONF_FLOW_MODE,
default_value=True,
value=True,
+ hidden=True,
)
CONF_ENTRY_AUTO_PLAY = ConfigEntry(
label="Target level for volume normalization",
description="Adjust average (perceived) loudness to this target level",
depends_on=CONF_VOLUME_NORMALIZATION,
- category="audio",
+ category="advanced",
)
CONF_ENTRY_EQ_BASS = ConfigEntry(
label="Crossfade duration",
description="Duration in seconds of the crossfade between tracks (if enabled)",
depends_on=CONF_CROSSFADE,
- category="audio",
+ category="advanced",
)
CONF_ENTRY_HIDE_PLAYER = ConfigEntry(
async def get_provider_config_value(self, instance_id: str, key: str) -> ConfigValueType:
"""Return single configentry value for a provider."""
cache_key = f"prov_conf_value_{instance_id}.{key}"
- if cached_value := self._value_cache.get(cache_key) is not None:
+ if (cached_value := self._value_cache.get(cache_key)) is not None:
return cached_value
conf = await self.get_provider_config(instance_id)
val = (
async def get_player_config(self, player_id: str) -> PlayerConfig:
"""Return (full) configuration for a single player."""
if raw_conf := self.get(f"{CONF_PLAYERS}/{player_id}"):
- if prov := self.mass.get_provider(raw_conf["provider"]):
+ if player := self.mass.players.get(player_id, False):
+ raw_conf["default_name"] = player.display_name
+ raw_conf["provider"] = player.provider
+ prov = self.mass.get_provider(player.provider)
conf_entries = await prov.get_player_config_entries(player_id)
- if player := self.mass.players.get(player_id, False):
- raw_conf["default_name"] = player.display_name
else:
- conf_entries = ()
+ # handle unavailable player and/or provider
+ if prov := self.mass.get_provider(raw_conf["provider"]):
+ conf_entries = await prov.get_player_config_entries(player_id)
+ else:
+ conf_entries = ()
raw_conf["available"] = False
raw_conf["name"] = raw_conf.get("name")
raw_conf["default_name"] = raw_conf.get("default_name") or raw_conf["player_id"]
from music_assistant.constants import (
DB_TABLE_ALBUMS,
DB_TABLE_ARTISTS,
+ DB_TABLE_PLAYLOG,
DB_TABLE_PROVIDER_MAPPINGS,
MASS_LOGGER_NAME,
)
DB_TABLE_PROVIDER_MAPPINGS,
{"media_type": self.media_type.value, "item_id": db_id},
)
+ # cleanup playlog table
+ await self.mass.music.database.delete(
+ DB_TABLE_PLAYLOG,
+ {
+ "media_type": self.media_type.value,
+ "item_id": db_id,
+ "provider": "library",
+ },
+ )
+ for prov_mapping in library_item.provider_mappings:
+ await self.mass.music.database.delete(
+ DB_TABLE_PLAYLOG,
+ {
+ "media_type": self.media_type.value,
+ "item_id": prov_mapping.item_id,
+ "provider": prov_mapping.provider_instance,
+ },
+ )
# NOTE: this does not delete any references to this item in other records,
# this is handled/overridden in the mediatype specific controllers
self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, library_item.uri, library_item)
"provider_item_id": provider_item_id,
},
)
+ # cleanup playlog table
+ await self.mass.music.database.delete(
+ DB_TABLE_PLAYLOG,
+ {
+ "media_type": self.media_type.value,
+ "item_id": provider_item_id,
+ "provider": provider_instance_id,
+ },
+ )
if library_item.provider_mappings:
# we (temporary?) duplicate the provider mappings in a separate column of the media
# item's table, because the json_group_array query is superslow
final_tracks.append(track)
else:
final_tracks = tracks
- # we set total to None as we have no idea how many tracks there are
- # the frontend can figure this out and stop paging when it gets an empty list
- return PagedItems(items=final_tracks, limit=limit, offset=offset, total=None)
+ # We set total to None as we have no idea how many tracks there are.
+ # The frontend can figure this out and stop paging when it gets an empty list.
+ # Exception is when we receive a result that is either much higher
+ # or smaller than the limit - in that case we consider the list final.
+ total = None
+ count = len(final_tracks)
+ if count and (count < (limit - 10) or count > (limit + 10)):
+ total = offset + len(final_tracks)
+ return PagedItems(items=final_tracks, limit=limit, offset=offset, total=total, count=count)
async def create_playlist(
self, name: str, provider_instance_or_domain: str | None = None
break
if paged_items.count == 0:
break
+ if paged_items.total is None and paged_items.items == result:
+ # safety guard for malfunctioning provider
+ break
offset += paged_items.count
return result
prov_key = provider_instance_id_or_domain
# do not try to store dynamic urls (e.g. with auth token etc.),
- # stick with plaun uri/urls only
+ # stick with plain uri/urls only
if "http" in item_id and "?" in item_id:
return
else:
self.logger.info("Sync task for %s completed", provider.name)
self.mass.signal_event(EventType.SYNC_TASKS_UPDATED, data=self.in_progress_syncs)
+ # schedule db cleanup after sync
+ if not self.in_progress_syncs:
+ self.mass.create_task(self._cleanup_database())
task.add_done_callback(on_sync_task_done)
if remaining_items_count := await self.database.get_count_from_query(query):
errors += remaining_items_count
+ # cleanup playlog table
+ await self.mass.music.database.delete(
+ DB_TABLE_PLAYLOG,
+ {
+ "provider": provider_instance,
+ },
+ )
+
if errors == 0:
# cleanup successful, remove from the deleted_providers setting
self.logger.info("Provider %s removed from library", provider_instance)
# NOTE: sync_interval is stored in minutes, we need seconds
self.mass.loop.call_later(sync_interval * 60, self._schedule_sync)
+ async def _cleanup_database(self) -> None:
+ """Perform database cleanup/maintenance."""
+ self.logger.debug("Performing database cleanup...")
+ # Remove playlog entries older than 90 days
+ await self.database.delete_where_query(
+ DB_TABLE_PLAYLOG, f"timestamp < strftime('%s','now') - {3600 * 24 * 90}"
+ )
+ # db tables cleanup
+ for ctrl in (self.albums, self.artists, self.tracks, self.playlists, self.radio):
+ # Provider mappings where the db item is removed
+ query = (
+ f"item_id not in (SELECT item_id from {ctrl.db_table}) "
+ f"AND media_type = '{ctrl.media_type}'"
+ )
+ await self.database.delete_where_query(DB_TABLE_PROVIDER_MAPPINGS, query)
+ # Orphaned db items
+ query = (
+ f"item_id not in (SELECT item_id from {DB_TABLE_PROVIDER_MAPPINGS} "
+ f"WHERE media_type = '{ctrl.media_type}')"
+ )
+ await self.database.delete_where_query(ctrl.db_table, query)
+ # Cleanup removed db items from the playlog
+ where_clause = (
+ f"media_type = '{ctrl.media_type}' AND provider = 'library' "
+ f"AND item_id not in (select item_id from {ctrl.db_table})"
+ )
+ await self.mass.music.database.delete_where_query(DB_TABLE_PLAYLOG, where_clause)
+ self.logger.debug("Database cleanup done")
+
async def _setup_database(self) -> None:
"""Initialize database."""
db_path = os.path.join(self.mass.storage_path, "library.db")
"Player %s does not support (un)sync commands", child_player.name
)
continue
+ if child_player.synced_to and child_player.synced_to == target_player:
+ continue # already synced to this target
if child_player.synced_to and child_player.synced_to != target_player:
# player already synced to another player, unsync first
self.logger.warning(
continue
# if we reach here, all checks passed
final_player_ids.append(child_player_id)
+ # set active source if player is synced
+ child_player.active_source = parent_player.active_source
# forward command to the player provider after all (base) sanity checks
player_provider = self.get_player_provider(target_player)
@api_command("players/cmd/unsync_many")
async def cmd_unsync_many(self, player_ids: list[str]) -> None:
- """Handle UNSYNC command for all the given players.
-
- Remove the given player from any syncgroups it currently is synced to.
-
- - player_id: player_id of the player to handle the command.
- """
+ """Handle UNSYNC command for all the given players."""
# filter all player ids on compatibility and availability
final_player_ids: UniqueList[str] = UniqueList()
for player_id in player_ids:
)
continue
final_player_ids.append(player_id)
+ # reset active source player if is unsynced
+ child_player.active_source = None
if not final_player_ids:
return
get_hls_stream,
get_icy_stream,
get_player_filter_params,
+ get_silence,
parse_loudnorm,
strip_silence,
)
)
elif streamdetails.stream_type == StreamType.ICY:
audio_source = get_icy_stream(self.mass, streamdetails.path, streamdetails)
+ # pad some silence before the radio stream starts to create some headroom
+ # for radio stations that do not provide any look ahead buffer
+ # without this, some radio streams jitter a lot
+ async for chunk in get_silence(2, pcm_format):
+ yield chunk
elif streamdetails.stream_type == StreamType.HLS:
audio_source = get_hls_stream(
self.mass, streamdetails.path, streamdetails, streamdetails.seek_position
CONF_ALAC_ENCODE = "alac_encode"
CONF_VOLUME_START = "volume_start"
CONF_PASSWORD = "password"
-
+CONF_BIND_INTERFACE = "bind_interface"
PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_FLOW_MODE_ENFORCED,
values: the (intermediate) raw values for config entries sent with the action.
"""
# ruff: noqa: ARG001
- return () # we do not have any config entries (yet)
+ return (
+ ConfigEntry(
+ key=CONF_BIND_INTERFACE,
+ type=ConfigEntryType.STRING,
+ default_value=mass.streams.publish_ip,
+ label="Bind interface",
+ description="Interface to bind to for Airplay streaming.",
+ category="advanced",
+ ),
+ )
def convert_airplay_volume(value: float) -> int:
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
+ bind_ip = await self.mass.config.get_provider_config_value(
+ self.prov.instance_id, CONF_BIND_INTERFACE
+ )
+ extra_args += ["-if", bind_ip]
if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
extra_args += ["-encrypt"]
if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
"""Stop playback and cleanup."""
if self._stopped:
return
- if not self._cliraop_proc.closed:
+ if self._cliraop_proc.proc and not self._cliraop_proc.closed:
await self.send_cli_command("ACTION=STOP")
self._stopped = True # set after send_cli command!
if self.audio_source_task and not self.audio_source_task.done():
self.audio_source_task.cancel()
- try:
- await asyncio.wait_for(self._cliraop_proc.wait(), 5)
- except TimeoutError:
- self.prov.logger.warning(
- "Raop process for %s did not stop in time, is the player offline?",
- self.airplay_player.player_id,
- )
- await self._cliraop_proc.close(True)
+ if self._cliraop_proc.proc:
+ try:
+ await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+ except TimeoutError:
+ self.prov.logger.warning(
+ "Raop process for %s did not stop in time, is the player offline?",
+ self.airplay_player.player_id,
+ )
+ await self._cliraop_proc.close(True)
# ffmpeg can sometimes hang due to the connected pipes
# we handle closing it but it can be a bit slow so do that in the background
# prefer interactive command to our streamer
tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PAUSE"))
- async def play_media(
+ async def play_media( # noqa: PLR0915
self,
player_id: str,
media: PlayerMedia,
ugp_stream = ugp_provider.streams[media.queue_id]
input_format = ugp_stream.audio_format
audio_source = ugp_stream.subscribe_raw()
+ elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
+ # radio stream - consume media stream directly
+ input_format = AIRPLAY_PCM_FORMAT
+ queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
+ audio_source = self.mass.streams.get_media_stream(
+ streamdetails=queue_item.streamdetails,
+ pcm_format=AIRPLAY_PCM_FORMAT,
+ )
elif media.queue_id and media.queue_item_id:
- # regular queue stream request
+ # regular queue (flow) stream request
input_format = AIRPLAY_PCM_FORMAT
audio_source = self.mass.streams.get_flow_stream(
queue=self.mass.player_queues.get(media.queue_id),
# Convert HTTP errors to exceptions
if response.status == 404:
raise MediaNotFoundError(f"{endpoint} not found")
+ if response.status == 504:
+ # See if we can get more info from the response on occasional timeouts
+ self.logger.debug("Apple Music API Timeout: %s", response.json(loads=json_loads))
+ raise ResourceTemporarilyUnavailable("Apple Music API Timeout")
if response.status == 429:
# Debug this for now to see if the response headers give us info about the
# backoff time. There is no documentation on this.
track.position = offset + index
result.append(track)
except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
- self.logger.warning("Skipping item in playlist: %s:%s", uri, str(err))
+ self.logger.warning(
+ "Skipping %s in playlist %s: %s", uri, prov_playlist_id, str(err)
+ )
return result
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
- # TODO: implement pagination!
+ # TODO: access the underlying paging on the deezer api instead of this hack
playlist = await self.client.get_playlist(int(prov_playlist_id))
- for index, deezer_track in enumerate(await playlist.get_tracks(offset=offset, limit=limit)):
+ playlist_tracks = await playlist.get_tracks()
+ for index, deezer_track in enumerate(playlist_tracks[offset : offset + limit], 1):
result.append(
self.parse_track(
track=deezer_track,
DB_TABLE_ALBUM_TRACKS,
DB_TABLE_ALBUMS,
DB_TABLE_ARTISTS,
- DB_TABLE_PLAYLOG,
DB_TABLE_PROVIDER_MAPPINGS,
DB_TABLE_TRACK_ARTISTS,
VARIOUS_ARTISTS_NAME,
async def _process_orphaned_albums_and_artists(self) -> None:
"""Process deletion of orphaned albums and artists."""
- # process orphaned albums and artists
-
# Remove albums without any tracks
query = (
f"SELECT item_id FROM {DB_TABLE_ALBUMS} "
- f"WHERE item_id not in (select album_id from {DB_TABLE_ALBUM_TRACKS})"
+ f"WHERE item_id not in ( SELECT album_id from {DB_TABLE_ALBUM_TRACKS}) "
+ f"AND item_id in ( SELECT item_id from {DB_TABLE_PROVIDER_MAPPINGS} "
+ f"WHERE provider_instance = '{self.instance_id}' and media_type = 'album' )"
)
for db_row in await self.mass.music.database.get_rows_from_query(
query,
# Remove artists without any tracks or albums
query = (
f"SELECT item_id FROM {DB_TABLE_ARTISTS} "
- f"WHERE item_id not in ("
- f"select artist_id from {DB_TABLE_TRACK_ARTISTS} "
- f"UNION "
- f"select artist_id from {DB_TABLE_ALBUM_ARTISTS}"
- ")"
+ f"WHERE item_id not in "
+ f"( select artist_id from {DB_TABLE_TRACK_ARTISTS} "
+ f"UNION SELECT artist_id from {DB_TABLE_ALBUM_ARTISTS} )"
+ f"AND item_id in ( SELECT item_id from {DB_TABLE_PROVIDER_MAPPINGS} "
+ f"WHERE provider_instance = '{self.instance_id}' and media_type = 'artist' )"
)
for db_row in await self.mass.music.database.get_rows_from_query(
query,
):
await self.mass.music.artists.remove_item_from_library(db_row["item_id"])
- # Provider mappings where the album is removed
- query = (
- f"SELECT item_id FROM {DB_TABLE_PROVIDER_MAPPINGS} "
- f"WHERE media_type = 'album' "
- f"and item_id not in (select item_id from {DB_TABLE_ALBUMS})"
- )
- for db_row in await self.mass.music.database.get_rows_from_query(query, limit=100000):
- await self.mass.music.albums.remove_provider_mappings(
- db_row["item_id"], self.instance_id
- )
-
- # Provider mappings where the artist is removed
- query = (
- f"SELECT item_id FROM {DB_TABLE_PROVIDER_MAPPINGS} "
- f"WHERE media_type = 'artist' "
- f"and item_id not in (select item_id from {DB_TABLE_ARTISTS})"
- )
- for db_row in await self.mass.music.database.get_rows_from_query(query, limit=100000):
- await self.mass.music.artists.remove_provider_mappings(
- db_row["item_id"], self.instance_id
- )
-
- # Remove albums that are removed from the playlog
- where_clause = (
- f"media_type = 'album' "
- f"and provider = '{self.instance_id}' "
- f"and item_id not in (select item_id from albums)"
- )
- await self.mass.music.database.delete_where_query(DB_TABLE_PLAYLOG, where_clause)
-
async def _process_deletions(self, deleted_files: set[str]) -> None:
"""Process all deletions."""
# process deleted tracks/playlists
)\r
if not playlist_items:\r
return result\r
- for index, jellyfin_track in enumerate(playlist_items[offset : offset + limit]):\r
+ for index, jellyfin_track in enumerate(playlist_items[offset : offset + limit], 1):\r
try:\r
if track := await self._parse_track(jellyfin_track):\r
if not track.position:\r
- track.position = index\r
+ track.position = offset + index\r
result.append(track)\r
except (KeyError, ValueError) as err:\r
self.logger.error(\r
except (ParameterError, DataNotFoundError) as e:
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from e
- for index, sonic_song in enumerate(sonic_playlist.songs[offset : offset + limit]):
+ # TODO: figure out if subsonic supports paging here
+ for index, sonic_song in enumerate(sonic_playlist.songs[offset : offset + limit], 1):
track = self._parse_track(sonic_song)
- track.position = index
+ track.position = offset + index
result.append(track)
return result
import asyncio
import logging
from asyncio import TaskGroup
+from contextlib import suppress
from typing import TYPE_CHECKING
import plexapi.exceptions
)
# Only add 5-star rated albums to Favorites. rating will be 10.0 for those.
# TODO: Let user set threshold?
- try:
+ with suppress(KeyError):
+ # suppress KeyError (as it doesn't exist for items without rating),
+ # allow sync to continue
album.favorite = plex_album._data.attrib["userRating"] == "10.0"
- except KeyError:
- # Log but suppress exception, allow sync to continue
- self.logger.error("ERROR: %s has no rating", plex_album.title)
if plex_album.year:
album.year = plex_album.year
)
# Only add 5-star rated tracks to Favorites. userRating will be 10.0 for those.
# TODO: Let user set threshold?
- try:
+ with suppress(KeyError):
+ # suppress KeyError (as it doesn't exist for items without rating),
+ # allow sync to continue
track.favorite = plex_track._data.attrib["userRating"] == "10.0"
- except KeyError:
- # Log but suppress exception, allow sync to continue
- self.logger.error("ERROR: %s has no userRating", plex_track.title)
if plex_track.originalTitle and plex_track.originalTitle != plex_track.grandparentTitle:
# The artist of the track if different from the album's artist.
plex_playlist: PlexPlaylist = await self._get_data(prov_playlist_id, PlexPlaylist)
if not (playlist_items := await self._run_async(plex_playlist.items)):
return result
- for index, plex_track in enumerate(playlist_items[offset : offset + limit]):
+ for index, plex_track in enumerate(playlist_items[offset : offset + limit], 1):
if track := await self._parse_track(plex_track):
track.position = index
result.append(track)
await self._get_snapgroup(player_id).set_stream("default")
self._handle_update()
- async def play_media(self, player_id: str, media: PlayerMedia) -> None:
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None: # noqa: PLR0915
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
if player.synced_to:
ugp_stream = ugp_provider.streams[media.queue_id]
input_format = ugp_stream.audio_format
audio_source = ugp_stream.subscribe_raw()
+ elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
+ # radio stream - consume media stream directly
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
+ audio_source = self.mass.streams.get_media_stream(
+ streamdetails=queue_item.streamdetails,
+ pcm_format=DEFAULT_SNAPCAST_FORMAT,
+ )
elif media.queue_id and media.queue_item_id:
- # regular queue stream request
+ # regular queue (flow) stream request
input_format = DEFAULT_SNAPCAST_FORMAT
audio_source = self.mass.streams.get_flow_stream(
queue=self.mass.player_queues.get(media.queue_id),
await asyncio.to_thread(set_volume_mute, player_id, muted)
+ async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
+ """Create temporary sync group by joining given players to target player."""
+ sonos_master_player = self.sonosplayers[target_player]
+ await sonos_master_player.join(
+ [self.sonosplayers[player_id] for player_id in child_player_ids]
+ )
+
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
if "tracks" not in playlist_obj:
return result
- for index, item in enumerate(playlist_obj["tracks"][offset : offset + limit]):
+ for index, item in enumerate(playlist_obj["tracks"][offset : offset + limit], 1):
song = await self._soundcloud.get_track_details(item["id"])
try:
# TODO: is it really needed to grab the entire track with an api call ?
else f"playlists/{prov_playlist_id}/tracks"
)
spotify_result = await self._get_data(uri, limit=limit, offset=offset)
- for index, item in enumerate(spotify_result["items"]):
+ for index, item in enumerate(spotify_result["items"], 1):
if not (item and item["track"] and item["track"]["id"]):
continue
# use count as position
tidal_session = await self._get_tidal_session()
result: list[Track] = []
track_obj: TidalTrack # satisfy the type checker
- for index, track_obj in enumerate(
- await get_playlist_tracks(tidal_session, prov_playlist_id, limit=limit, offset=offset)
- ):
+ tidal_tracks = await get_playlist_tracks(
+ tidal_session, prov_playlist_id, limit=limit, offset=offset
+ )
+ for index, track_obj in enumerate(tidal_tracks, 1):
track = self._parse_track(track_obj=track_obj)
track.position = offset + index
result.append(track)
label="Please note that although the universal group "
"allows you to group any player, it will not enable audio sync "
"between players of different ecosystems.",
+ required=False,
),
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
if "tracks" not in playlist_obj:
return None
result = []
- for index, track_obj in enumerate(playlist_obj["tracks"]):
+ # TODO: figure out how to handle paging in YTM
+ for index, track_obj in enumerate(playlist_obj["tracks"][offset : offset + limit]):
if track_obj["isAvailable"]:
# Playlist tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id
artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
if artist_obj.get("songs") and artist_obj["songs"].get("browseId"):
prov_playlist_id = artist_obj["songs"]["browseId"]
- playlist_tracks = await self.get_playlist_tracks(prov_playlist_id, 0, 0)
- return playlist_tracks[:25]
+ return await self.get_playlist_tracks(prov_playlist_id, 0, 25)
return []
async def library_add(self, item: MediaItemType) -> bool: