InvalidProviderURI,
MediaNotFoundError,
MusicAssistantError,
- ProviderUnavailableError,
)
from music_assistant_models.helpers import get_global_cache_value
from music_assistant_models.media_items import (
CONF_SYNC_INTERVAL = "sync_interval"
CONF_DELETED_PROVIDERS = "deleted_providers"
CONF_ADD_LIBRARY_ON_PLAY = "add_library_on_play"
-DB_SCHEMA_VERSION: Final[int] = 14
+DB_SCHEMA_VERSION: Final[int] = 15
class MusicController(CoreController):
@api_command("music/recently_played_items")
async def recently_played(
self, limit: int = 10, media_types: list[MediaType] | None = None
- ) -> list[MediaItemType]:
+ ) -> list[ItemMapping]:
"""Return a list of the last played items."""
if media_types is None:
media_types = MediaType.ALL
media_types_str = "(" + ",".join(f'"{x}"' for x in media_types) + ")"
- # temporary fix to avoid too many queries on providers:
- # we only query for library items for now
query = (
- f"SELECT * FROM {DB_TABLE_PLAYLOG} WHERE provider = 'library' "
- f"AND media_type in {media_types_str} ORDER BY timestamp DESC"
+ f"SELECT * FROM {DB_TABLE_PLAYLOG} "
+ f"WHERE media_type in {media_types_str} ORDER BY timestamp DESC"
)
db_rows = await self.mass.music.database.get_rows_from_query(query, limit=limit)
- result: list[MediaItemType] = []
+ result: list[ItemMapping] = []
+ available_providers = ("library", *get_global_cache_value("unique_providers", []))
for db_row in db_rows:
- if db_row["provider"] not in get_global_cache_value("unique_providers", []):
- continue
- with suppress(MediaNotFoundError, ProviderUnavailableError):
- media_type = MediaType(db_row["media_type"])
- ctrl = self.get_controller(media_type)
- item = await ctrl.get(
- db_row["item_id"],
- db_row["provider"],
+ result.append(
+ ItemMapping.from_dict(
+ {
+ "item_id": db_row["item_id"],
+ "provider": db_row["provider"],
+ "media_type": db_row["media_type"],
+ "name": db_row["name"],
+ "image": json_loads(db_row["image"]) if db_row["image"] else None,
+ "available": db_row["provider"] in available_providers,
+ }
)
- result.append(item)
+ )
return result
@api_command("music/item_by_uri")
@api_command("music/mark_played")
async def mark_item_played(
self,
- media_type: MediaType,
- item_id: str,
- provider_instance_id_or_domain: str,
+ media_item: MediaItemType | ItemMapping,
fully_played: bool | None = None,
seconds_played: int | None = None,
) -> None:
"""Mark item as played in playlog."""
timestamp = utc_timestamp()
-
if (
- provider_instance_id_or_domain.startswith("builtin")
- and media_type != MediaType.PLAYLIST
+ media_item.provider.startswith("builtin")
+ and media_item.media_type != MediaType.PLAYLIST
):
# we deliberately skip builtin provider items as those are often
# one-off items like TTS or some sound effect etc.
return
- if provider_instance_id_or_domain == "library":
- prov_key = "library"
- elif prov := self.mass.get_provider(provider_instance_id_or_domain):
- prov_key = prov.lookup_key
- else:
- prov_key = provider_instance_id_or_domain
-
# update generic playlog table
await self.database.insert(
DB_TABLE_PLAYLOG,
{
- "item_id": item_id,
- "provider": prov_key,
- "media_type": media_type.value,
+ "item_id": media_item.item_id,
+ "provider": media_item.provider,
+ "media_type": media_item.media_type.value,
+ "name": media_item.name,
+ "image": serialize_to_json(media_item.image.to_dict())
+ if media_item.image
+ else None,
"fully_played": fully_played,
"seconds_played": seconds_played,
"timestamp": timestamp,
allow_replace=True,
)
+ # forward to provider(s) to sync resume state (e.g. for audiobooks)
+ for prov_mapping in media_item.provider_mappings:
+ if music_prov := self.mass.get_provider(prov_mapping.provider_instance):
+ self.mass.create_task(
+ music_prov.on_played(
+ media_type=media_item.media_type,
+ item_id=prov_mapping.item_id,
+ fully_played=False,
+ position=0,
+ )
+ )
+
# also update playcount in library table
- if not (ctrl := self.get_controller(media_type)):
+ if not (ctrl := self.get_controller(media_item.media_type)):
# skip non media items (e.g. plugin source)
return
- db_item = await ctrl.get_library_item_by_prov_id(item_id, provider_instance_id_or_domain)
+ db_item = await ctrl.get_library_item_by_prov_id(media_item.item_id, media_item.provider)
if (
not db_item
- and media_type in (MediaType.TRACK, MediaType.RADIO)
+ and media_item.media_type in (MediaType.TRACK, MediaType.RADIO)
and self.mass.config.get_raw_core_config_value(self.domain, CONF_ADD_LIBRARY_ON_PLAY)
):
# handle feature to add to the lib on playback
- full_item = await ctrl.get(item_id, provider_instance_id_or_domain)
- db_item = await ctrl.add_item_to_library(full_item)
+ db_item = await self.add_item_to_library(media_item)
if db_item:
await self.database.execute(
@api_command("music/mark_unplayed")
async def mark_item_unplayed(
- self, media_type: MediaType, item_id: str, provider_instance_id_or_domain: str
+ self,
+ media_item: MediaItemType | ItemMapping,
) -> None:
"""Mark item as unplayed in playlog."""
- if provider_instance_id_or_domain == "library":
- prov_key = "library"
- elif prov := self.mass.get_provider(provider_instance_id_or_domain):
- prov_key = prov.lookup_key
- else:
- prov_key = provider_instance_id_or_domain
# update generic playlog table
await self.database.delete(
DB_TABLE_PLAYLOG,
{
- "item_id": item_id,
- "provider": prov_key,
- "media_type": media_type.value,
+ "item_id": media_item.item_id,
+ "provider": media_item.provider,
+ "media_type": media_item.media_type.value,
},
)
+ # forward to provider(s) to sync resume state (e.g. for audiobooks)
+ for prov_mapping in media_item.provider_mappings:
+ if music_prov := self.mass.get_provider(prov_mapping.provider_instance):
+ self.mass.create_task(
+ music_prov.on_played(
+ media_type=media_item.media_type,
+ item_id=prov_mapping.item_id,
+ fully_played=False,
+ position=0,
+ )
+ )
# also update playcount in library table
- ctrl = self.get_controller(media_type)
- db_item = await ctrl.get_library_item_by_prov_id(item_id, provider_instance_id_or_domain)
+ ctrl = self.get_controller(media_item.media_type)
+ db_item = await ctrl.get_library_item_by_prov_id(media_item.item_id, media_item.provider)
if db_item:
await self.database.execute(f"UPDATE {ctrl.db_table} SET play_count = play_count - 1")
await self.database.commit()
"Migrating database from version %s to %s", prev_version, DB_SCHEMA_VERSION
)
- if prev_version <= 6:
- # unhandled schema version
- # we do not try to handle more complex migrations
- self.logger.warning(
- "Database schema too old - Resetting library/database - "
- "a full rescan will be performed, this can take a while!"
- )
- for table in (
- DB_TABLE_TRACKS,
- DB_TABLE_ALBUMS,
- DB_TABLE_ARTISTS,
- DB_TABLE_PLAYLISTS,
- DB_TABLE_RADIOS,
- DB_TABLE_AUDIOBOOKS,
- DB_TABLE_PODCASTS,
- DB_TABLE_ALBUM_TRACKS,
- DB_TABLE_PLAYLOG,
- DB_TABLE_PROVIDER_MAPPINGS,
- ):
- await self.database.execute(f"DROP TABLE IF EXISTS {table}")
- await self.database.commit()
- # recreate missing tables
- await self.__create_database_tables()
- return
-
- if prev_version <= 7:
- # remove redundant artists and provider_mappings columns
- for table in (
- DB_TABLE_TRACKS,
- DB_TABLE_ALBUMS,
- DB_TABLE_ARTISTS,
- DB_TABLE_RADIOS,
- DB_TABLE_PLAYLISTS,
- ):
- for column in ("artists", "provider_mappings"):
- try:
- await self.database.execute(f"ALTER TABLE {table} DROP COLUMN {column}")
- except Exception as err:
- if "no such column" in str(err):
- continue
- raise
- # add cache_checksum column to playlists
- try:
- await self.database.execute(
- f"ALTER TABLE {DB_TABLE_PLAYLISTS} ADD COLUMN cache_checksum TEXT DEFAULT ''"
- )
- except Exception as err:
- if "duplicate column" not in str(err):
- raise
-
- if prev_version <= 8:
- # migrate track_loudness --> loudness_measurements
- async for db_row in self.database.iter_items("track_loudness"):
- if db_row["integrated"] == inf or db_row["integrated"] == -inf:
- continue
- if db_row["provider"] in ("radiobrowser", "tunein"):
- continue
- await self.database.insert_or_replace(
- DB_TABLE_LOUDNESS_MEASUREMENTS,
- {
- "item_id": db_row["item_id"],
- "media_type": "track",
- "provider": db_row["provider"],
- "loudness": db_row["integrated"],
- },
- )
- await self.database.execute("DROP TABLE IF EXISTS track_loudness")
+ if prev_version <= 9:
+ raise MusicAssistantError("Database schema version too old to migrate")
if prev_version <= 10:
# add new columns to playlog table
{"metadata": serialize_to_json(metadata)},
)
+ if prev_version <= 14:
+ # Recreate playlog table due to complete new layout
+ await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_PLAYLOG}")
+ await self.__create_database_tables()
+
# save changes
await self.database.commit()
[id] INTEGER PRIMARY KEY AUTOINCREMENT,
[item_id] TEXT NOT NULL,
[provider] TEXT NOT NULL,
- [media_type] TEXT NOT NULL DEFAULT 'track',
+ [media_type] TEXT NOT NULL,
+ [name] TEXT NOT NULL,
+ [image] json,
[timestamp] INTEGER DEFAULT 0,
[fully_played] BOOLEAN,
[seconds_played] INTEGER,
and (prev_item := self.get_item(queue_id, prev_item_id))
and (stream_details := prev_item.streamdetails)
):
- seconds_played = int(prev_state["elapsed_time"])
- fully_played = seconds_played >= (stream_details.duration or 3600) - 5
+ position = int(prev_state["elapsed_time"])
+ seconds_played = position - stream_details.seek_position
+ fully_played = position >= (stream_details.duration or 3600) - 5
self.logger.debug(
"PlayerQueue %s played item %s for %s seconds",
queue.display_name,
prev_item.uri,
seconds_played,
)
- if music_prov := self.mass.get_provider(stream_details.provider):
- self.mass.create_task(
- music_prov.on_streamed(stream_details, seconds_played, fully_played)
- )
if prev_item.media_item and (fully_played or seconds_played > 10):
# add entry to playlog - this also handles resume of podcasts/audiobooks
self.mass.create_task(
self.mass.music.mark_item_played(
- stream_details.media_type,
- stream_details.item_id,
- stream_details.provider,
+ prev_item.media_item,
fully_played=fully_played,
seconds_played=seconds_played,
)
) -> list[MediaItemType]:
"""Resolve/unwrap media items to enqueue."""
if media_item.media_type == MediaType.PLAYLIST:
- self.mass.create_task(
- self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- )
+ self.mass.create_task(self.mass.music.mark_item_played(media_item))
return await self.get_playlist_tracks(media_item, start_item)
if media_item.media_type == MediaType.ARTIST:
- self.mass.create_task(
- self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- )
+ self.mass.create_task(self.mass.music.mark_item_played(media_item))
return await self.get_artist_tracks(media_item)
if media_item.media_type == MediaType.ALBUM:
- self.mass.create_task(
- self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- )
+ self.mass.create_task(self.mass.music.mark_item_played(media_item))
return await self.get_album_tracks(media_item, start_item)
if media_item.media_type == MediaType.AUDIOBOOK:
if resume_point := await self.get_audiobook_resume_point(media_item, start_item):
media_item.resume_position_ms = resume_point
return [media_item]
if media_item.media_type == MediaType.PODCAST:
- self.mass.create_task(
- self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- )
+ self.mass.create_task(self.mass.music.mark_item_played(media_item))
return await self.get_next_podcast_episodes(media_item, start_item or media_item)
if media_item.media_type == MediaType.PODCAST_EPISODE:
return await self.get_next_podcast_episodes(None, media_item)
- player_id: player_id of the player to handle the command.
"""
player = self._get_player_with_redirect(player_id)
+ if player.state == PlayerState.PLAYING:
+ self.logger.info(
+ "Ignore PLAY request to player %s: player is already playing", player.display_name
+ )
+ return
# Redirect to queue controller if it is active
active_source = player.active_source or player.player_id
if (active_queue := self.mass.player_queues.get(active_source)) and active_queue.items:
task_id = f"analyze_loudness_{streamdetails.uri}"
mass.create_task(analyze_loudness, mass, streamdetails, task_id=task_id)
+ # report stream to provider
+ if (finished or seconds_streamed >= 30) and (
+ music_prov := mass.get_provider(streamdetails.provider)
+ ):
+ mass.create_task(music_prov.on_streamed(streamdetails))
+
def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None):
"""Generate a wave header from given params."""
async def on_streamed(
self,
streamdetails: StreamDetails,
- seconds_streamed: int,
- fully_played: bool = False,
) -> None:
- """Handle callback when an item completed streaming."""
+ """
+ Handle callback when given streamdetails completed streaming.
+
+ To get the number of seconds streamed, see streamdetails.seconds_streamed.
+ To get the number of seconds seeked/skipped, see streamdetails.seek_position.
+ Note that seconds_streamed is the total streamed seconds, so without seeked time.
+
+ NOTE: Due to internal and player buffering,
+ this may be called in advance of the actual completion.
+ """
+
+ async def on_played(
+ self,
+ media_type: MediaType,
+ item_id: str,
+ fully_played: bool,
+ position: int,
+ ) -> None:
+ """
+ Handle callback when a (playable) media item has been played.
+
+ This is called by the Queue controller when;
+ - a track has been fully played
+ - a track has been skipped
+ - a track has been stopped after being played
+
+ Fully played is True when the track has been played to the end.
+ Position is the last known position of the track in seconds, to sync resume state.
+ When fully_played is set to false and position is 0,
+ the user marked the item as unplayed in the UI.
+ """
async def resolve_image(self, path: str) -> str | bytes:
"""
async def on_streamed(
self,
streamdetails: StreamDetails,
- seconds_streamed: int,
- fully_played: bool = False,
) -> None:
- """Handle callback when an item completed streaming."""
+ """
+ Handle callback when given streamdetails completed streaming.
+
+ To get the number of seconds streamed, see streamdetails.seconds_streamed.
+ To get the number of seconds seeked/skipped, see streamdetails.seek_position.
+ Note that seconds_streamed is the total streamed seconds, so without seeked time.
+
+ NOTE: Due to internal and player buffering,
+ this may be called in advance of the actual completion.
+ """
async def on_streamed(
self,
streamdetails: StreamDetails,
- seconds_streamed: int,
- fully_played: bool = False,
) -> None:
- """Handle callback when an item completed streaming."""
- # This is OPTIONAL callback that is called when an item has been streamed.
+ """
+ Handle callback when given streamdetails completed streaming.
+
+ To get the number of seconds streamed, see streamdetails.seconds_streamed.
+ To get the number of seconds seeked/skipped, see streamdetails.seek_position.
+ Note that seconds_streamed is the total streamed seconds, so without seeked time.
+
+ NOTE: Due to internal and player buffering,
+ this may be called in advance of the actual completion.
+ """
+ # This is an OPTIONAL callback that is called when an item has been streamed.
+ # You can use this e.g. for playback reporting or statistics.
+
+ async def on_played(
+ self,
+ media_type: MediaType,
+ item_id: str,
+ fully_played: bool,
+ position: int,
+ ) -> None:
+ """
+ Handle callback when a (playable) media item has been played.
+
+ This is called by the Queue controller when;
+ - a track has been fully played
+ - a track has been skipped
+ - a track has been stopped after being played
+
+ Fully played is True when the track has been played to the end.
+ Position is the last known position of the track in seconds, to sync resume state.
+ When fully_played is set to false and position is 0,
+ the user marked the item as unplayed in the UI.
+ """
+ # This is an OPTIONAL callback that is called when an item has been streamed.
# You can use this e.g. for playback reporting or statistics.
async def resolve_image(self, path: str) -> str | bytes:
"""Get streamdetails for a audiobook based of asin."""
return await self.helper.get_stream(asin=item_id)
- async def on_streamed(
+ async def on_played(
self,
- streamdetails: StreamDetails,
- seconds_streamed: int,
- fully_played: bool = False,
+ media_type: MediaType,
+ item_id: str,
+ fully_played: bool,
+ position: int,
) -> None:
- """Handle callback when an item completed streaming."""
- await self.helper.set_last_position(streamdetails.item_id, seconds_streamed)
+ """
+ Handle callback when a (playable) media item has been played.
+
+ This is called by the Queue controller when;
+ - a track has been fully played
+ - a track has been skipped
+ - a track has been stopped after being played
+
+ Fully played is True when the track has been played to the end.
+ Position is the last known position of the track in seconds, to sync resume state.
+ When fully_played is set to false and position is 0,
+ the user marked the item as unplayed in the UI.
+ """
+ await self.helper.set_last_position(item_id, position)
async def unload(self, is_removed: bool = False) -> None:
"""
async def _get_builtin_playlist_recently_played(self) -> list[Track]:
result: list[Track] = []
recent_tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
- for idx, track in enumerate(recent_tracks, 1):
- assert isinstance(track, Track)
+ for idx, item in enumerate(recent_tracks, 1):
+ if not (item_provider := self.mass.get_provider(item.provider)):
+ continue
+ track = Track(
+ item_id=item.item_id,
+ provider=item.provider,
+ name=item.name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=item.item_id,
+ provider_domain=item_provider.domain,
+ provider_instance=item_provider.instance_id,
+ )
+ },
+ )
+ if item.image:
+ track.metadata.add_image(item.image)
track.position = idx
result.append(track)
return result
from aiohttp import ClientSession, ClientTimeout
from Crypto.Cipher import Blowfish
from deezer import exceptions as deezer_exceptions
-from music_assistant_models.config_entries import (
- ConfigEntry,
- ConfigValueType,
- ProviderConfig,
-)
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
from music_assistant_models.enums import (
AlbumType,
ConfigEntryType,
async def on_streamed(
self,
streamdetails: StreamDetails,
- seconds_streamed: int,
- fully_played: bool = False,
) -> None:
"""Handle callback when an item completed streaming."""
await self.gw_client.log_listen(last_track=streamdetails)
self.logger.debug("scrobble for now playing called for %s", item_id)
await self._run_async(self._conn.scrobble, sid=item_id, submission=False)
- async def on_streamed(
+ async def on_played(
self,
- streamdetails: StreamDetails,
- seconds_streamed: int,
- fully_played: bool = False,
+ media_type: MediaType,
+ item_id: str,
+ fully_played: bool,
+ position: int,
) -> None:
- """Handle callback when an item completed streaming."""
- self.logger.debug("on_streamed called for %s", streamdetails.item_id)
- if streamdetails.duration and seconds_streamed >= streamdetails.duration / 2:
- self.logger.debug("scrobble for listen count called for %s", streamdetails.item_id)
- await self._run_async(self._conn.scrobble, sid=streamdetails.item_id, submission=True)
+ """
+ Handle callback when a (playable) media item has been played.
+
+ This is called by the Queue controller when;
+ - a track has been fully played
+ - a track has been skipped
+ - a track has been stopped after being played
+
+ Fully played is True when the track has been played to the end.
+ Position is the last known position of the track in seconds, to sync resume state.
+ When fully_played is set to false and position is 0,
+ the user marked the item as unplayed in the UI.
+ """
+ self.logger.debug("scrobble for listen count called for %s", item_id)
+ await self._run_async(self._conn.scrobble, sid=item_id, submission=True)
async def get_audio_stream(
self, streamdetails: StreamDetails, seek_position: int = 0
async def on_streamed(
self,
streamdetails: StreamDetails,
- seconds_streamed: int,
- fully_played: bool = False,
) -> None:
"""Handle callback when an item completed streaming."""
)
from music_assistant.helpers.app_vars import app_var
from music_assistant.helpers.json import json_loads
-from music_assistant.helpers.throttle_retry import (
- ThrottlerManager,
- throttle_with_retries,
-)
+from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.helpers.util import lock, parse_title_and_version, try_parse_int
from music_assistant.models.music_provider import MusicProvider
async def on_streamed(
self,
streamdetails: StreamDetails,
- seconds_streamed: int,
- fully_played: bool = False,
) -> None:
"""Handle callback when an item completed streaming."""
user_id = self._user_auth_info["user"]["id"]
"/track/reportStreamingEnd",
user_id=user_id,
track_id=str(streamdetails.item_id),
- duration=try_parse_int(seconds_streamed),
+ duration=try_parse_int(streamdetails.seconds_streamed),
)
def _parse_artist(self, artist_obj: dict):