from music_assistant.helpers.datetime import utc_timestamp
from music_assistant.helpers.uri import parse_uri
from music_assistant.models.config import MusicProviderConfig
-from music_assistant.models.enums import MediaType, ProviderType
+from music_assistant.models.enums import MediaType, MusicProviderFeature, ProviderType
from music_assistant.models.errors import (
MusicAssistantError,
ProviderUnavailableError,
"""
assert provider or provider_id, "Provider needs to be supplied"
prov = self.get_provider(provider_id or provider)
- await provider.search(search_query, media_types, limit)
+ if MusicProviderFeature.SEARCH not in prov.supported_features:
+ return []
# create safe search string
search_query = search_query.replace("/", " ").replace("'", "")
)
return items
- async def browse(self, uri: Optional[str] = None) -> List[BrowseFolder]:
+ async def browse(self, path: Optional[str] = None) -> BrowseFolder:
"""Browse Music providers."""
# root level; folder per provider
- if not uri:
- return [
- BrowseFolder(prov.id, prov.type, prov.name, uri=f"{prov.id}://")
- for prov in self.providers
- if prov.supports_browse
- ]
+ if not path or path == "root":
+ return BrowseFolder(
+ item_id="root",
+ provider=ProviderType.DATABASE,
+ path="root",
+ label="browse",
+ name="",
+ items=[
+ BrowseFolder(
+ item_id="root",
+ provider=prov.type,
+ path=f"{prov.id}://",
+ name=prov.name,
+ )
+ for prov in self.providers
+ if MusicProviderFeature.BROWSE in prov.supported_features
+ ],
+ )
# provider level
- provider_id, path = uri.split("://", 1)
+ provider_id = path.split("://", 1)[0]
prov = self.get_provider(provider_id)
return await prov.browse(path)
provider_item_id, provider=provider, provider_id=provider_id
)
+ async def delete_db_item(
+ self, media_type: MediaType, db_item_id: str, recursive: bool = False
+ ) -> None:
+ """Remove item from the library."""
+ ctrl = self.get_controller(media_type)
+ await ctrl.delete_db_item(db_item_id, recursive)
+
async def refresh_items(self, items: List[MediaItem]) -> None:
"""
Refresh MediaItems to force retrieval of full info and matches.
for prov_id in removed_providers:
# clean cache items from deleted provider(s)
- self.mass.cache.clear(prov_id)
+ await self.mass.cache.clear(prov_id)
# cleanup media items from db matched to deleted provider
for ctrl in (
- self.mass.music.artists,
- self.mass.music.albums,
- self.mass.music.tracks,
+ # order is important here to recursively cleanup bottom up
self.mass.music.radio,
self.mass.music.playlists,
+ self.mass.music.tracks,
+ self.mass.music.albums,
+ self.mass.music.artists,
):
prov_items = await ctrl.get_db_items_by_prov_id(provider_id=prov_id)
for item in prov_items:
from __future__ import annotations
import asyncio
-import itertools
-from typing import Any, Dict, List, Optional, Union
+from typing import List, Optional, Union
from music_assistant.helpers.compare import compare_album, compare_artist
from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
from music_assistant.helpers.tags import FALLBACK_ARTIST
-from music_assistant.models.enums import MusicProviderFeature, ProviderType
+from music_assistant.models.enums import EventType, MusicProviderFeature, ProviderType
+from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
provider_id: Optional[str] = None,
) -> List[Track]:
"""Return album tracks for the given provider album id."""
- # if provider specific album is requested, return that directly
+
if not (provider == ProviderType.DATABASE or provider_id == "database"):
- return await self.get_provider_album_tracks(
- item_id, provider=provider, provider_id=provider_id
- )
+ # return provider album tracks
+ return await self._get_provider_album_tracks(item_id, provider, provider_id)
- # get results from all providers
- db_album = await self.get_db_item(item_id)
- coros = [
- self.get_provider_album_tracks(
- item.item_id, item.prov_type, cache_checksum=db_album.metadata.checksum
- )
- for item in db_album.provider_ids
- ]
- tracks = itertools.chain.from_iterable(await asyncio.gather(*coros))
- # merge duplicates using a dict
- final_items: Dict[str, Track] = {}
- for track in tracks:
- key = f".{track.name.lower()}.{track.disc_number}.{track.track_number}"
- if key in final_items:
- final_items[key].provider_ids.update(track.provider_ids)
- else:
- track.album = db_album
- final_items[key] = track
- return list(final_items.values())
+ # db_album requested: get results from first (non-file) provider
+ return await self._get_db_album_tracks(item_id)
async def versions(
self,
db_item = await self.get_db_item(db_item.item_id)
return db_item
- async def get_provider_album_tracks(
+ async def _get_provider_album_tracks(
self,
item_id: str,
provider: Optional[ProviderType] = None,
provider_id: Optional[str] = None,
- cache_checksum: Any = None,
) -> List[Track]:
"""Return album tracks for the given provider album id."""
prov = self.mass.music.get_provider(provider_id or provider)
if not prov:
return []
- # prefer cache items (if any) - do not use cache for filesystem
- cache_key = f"{prov.type.value}.album_tracks.{item_id}"
+ full_album = await self.get(item_id, provider, provider_id)
+ # prefer cache items (if any)
+ cache_key = f"{prov.type.value}.albumtracks.{item_id}"
+ cache_checksum = full_album.metadata.checksum
if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
- items = await prov.get_album_tracks(item_id)
+ items = []
+ for track in await prov.get_album_tracks(item_id):
+ # make sure that the (full) album is stored on the tracks
+ track.album = full_album
+ if full_album.metadata.images:
+ track.metadata.images = full_album.metadata.images
+ items.append(track)
# store (serializable items) in cache
self.mass.create_task(
self.mass.cache.set(
)
return items
+ async def _get_db_album_tracks(
+ self,
+ item_id: str,
+ ) -> List[Track]:
+ """Return in-database album tracks for the given database album."""
+ album_tracks = []
+ db_album = await self.get_db_item(item_id)
+ # combine the info we have in the db with the full listing from a streaming provider
+ for prov in db_album.provider_ids:
+ for prov_track in await self._get_provider_album_tracks(
+ prov.item_id, prov.prov_type, prov.prov_id
+ ):
+ if db_track := await self.mass.music.tracks.get_db_item_by_prov_id(
+ prov_track.item_id, prov_track.provider
+ ):
+ if album_mapping := next(
+ (x for x in db_track.albums if x.item_id == db_album.item_id),
+ None,
+ ):
+ db_track.disc_number = album_mapping.disc_number
+ db_track.track_number = album_mapping.track_number
+ prov_track = db_track
+ # make sure that the (db) album is stored on the tracks
+ prov_track.album = db_album
+ prov_track.metadata.images = db_album.metadata.images
+ album_tracks.append(prov_track)
+ # once we have the details from one streaming provider,
+ # there is no need to iterate them all (if there are multiple)
+ # for the same album
+ if not prov.prov_type.is_file():
+ break
+
+ return album_tracks
+
async def add_db_item(self, item: Album, overwrite_existing: bool = False) -> Album:
"""Add a new record to the database."""
assert item.provider_ids, f"Album {item.name} is missing provider id(s)"
assert item.artist, f"Album {item.name} is missing artist"
- cur_item = None
- # always try to grab existing item by musicbrainz_id/upc
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.database.get_row(self.db_table, match)
- if not cur_item and item.upc:
- match = {"upc": item.upc}
- cur_item = await self.mass.database.get_row(self.db_table, match)
- if not cur_item:
- # fallback to search and match
- for row in await self.mass.database.search(self.db_table, item.name):
- row_album = Album.from_db_row(row)
- if compare_album(row_album, item):
- cur_item = row_album
- break
- if cur_item:
- # update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
+ async with self._db_add_lock:
+ cur_item = None
+ # always try to grab existing item by musicbrainz_id/upc
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ if not cur_item and item.upc:
+ match = {"upc": item.upc}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ if not cur_item:
+ # fallback to search and match
+ for row in await self.mass.database.search(self.db_table, item.name):
+ row_album = Album.from_db_row(row)
+ if compare_album(row_album, item):
+ cur_item = row_album
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(
+ cur_item.item_id, item, overwrite=overwrite_existing
+ )
- # insert new item
- album_artists = await self._get_album_artists(item, cur_item)
- if album_artists:
- sort_artist = album_artists[0].sort_name
- else:
- sort_artist = ""
- new_item = await self.mass.database.insert(
- self.db_table,
- {
- **item.to_db_row(),
- "artists": json_serializer(album_artists) or None,
- "sort_artist": sort_artist,
- },
- )
- item_id = new_item["item_id"]
- self.logger.debug("added %s to database", item.name)
- # return created object
- db_item = await self.get_db_item(item_id)
- return db_item
+ # insert new item
+ album_artists = await self._get_album_artists(item, cur_item)
+ if album_artists:
+ sort_artist = album_artists[0].sort_name
+ else:
+ sort_artist = ""
+ new_item = await self.mass.database.insert(
+ self.db_table,
+ {
+ **item.to_db_row(),
+ "artists": json_serializer(album_artists) or None,
+ "sort_artist": sort_artist,
+ },
+ )
+ item_id = new_item["item_id"]
+ self.logger.debug("added %s to database", item.name)
+ # return created object
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, db_item.uri, db_item)
+ )
+ return db_item
async def update_db_item(
self,
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
+ )
return db_item
async def delete_db_item(self, item_id: int, recursive: bool = False) -> None:
"""Delete record from the database."""
-
# check album tracks
- db_rows = await self.mass.music.tracks.get_db_items_by_query(
- f"SELECT item_id FROM {TABLE_TRACKS} WHERE albums LIKE '%\"{item_id}\"%'"
+ db_rows = await self.mass.database.get_rows_from_query(
+ f"SELECT item_id FROM {TABLE_TRACKS} WHERE albums LIKE '%\"{item_id}\"%'",
+ limit=5000,
)
assert not (db_rows and not recursive), "Tracks attached to album"
for db_row in db_rows:
# delete the album itself from db
await super().delete_db_item(item_id)
- self.logger.debug("deleted item with id %s from database", item_id)
-
async def _match(self, db_album: Album) -> None:
"""
Try to find matching album on all providers for the provided (database) album.
from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_ARTISTS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
-from music_assistant.models.enums import MusicProviderFeature, ProviderType
+from music_assistant.models.enums import EventType, MusicProviderFeature, ProviderType
+from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
) -> Artist:
"""Add a new item record to the database."""
assert item.provider_ids, "Artist is missing provider id(s)"
- # always try to grab existing item by musicbrainz_id
- cur_item = None
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.database.get_row(self.db_table, match)
- if not cur_item:
- # fallback to exact name match
- # NOTE: we match an artist by name which could theoretically lead to collisions
- # but the chance is so small it is not worth the additional overhead of grabbing
- # the musicbrainz id upfront
- match = {"sort_name": item.sort_name}
- for row in await self.mass.database.get_rows(self.db_table, match):
- row_artist = Artist.from_db_row(row)
- if row_artist.sort_name == item.sort_name:
- cur_item = row_artist
- break
- if cur_item:
- # update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
+ async with self._db_add_lock:
+ # always try to grab existing item by musicbrainz_id
+ cur_item = None
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ if not cur_item:
+ # fallback to exact name match
+ # NOTE: we match an artist by name which could theoretically lead to collisions
+ # but the chance is so small it is not worth the additional overhead of grabbing
+ # the musicbrainz id upfront
+ match = {"sort_name": item.sort_name}
+ for row in await self.mass.database.get_rows(self.db_table, match):
+ row_artist = Artist.from_db_row(row)
+ if row_artist.sort_name == item.sort_name:
+ cur_item = row_artist
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(
+ cur_item.item_id, item, overwrite=overwrite_existing
+ )
- # insert item
- if item.in_library and not item.timestamp:
- item.timestamp = int(time())
- new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
- item_id = new_item["item_id"]
- self.logger.debug("added %s to database", item.name)
- # return created object
- db_item = await self.get_db_item(item_id)
- return db_item
+ # insert item
+ if item.in_library and not item.timestamp:
+ item.timestamp = int(time())
+ new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
+ item_id = new_item["item_id"]
+ self.logger.debug("added %s to database", item.name)
+ # return created object
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, db_item.uri, db_item)
+ )
+ return db_item
async def update_db_item(
self,
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
+ )
return db_item
async def delete_db_item(self, item_id: int, recursive: bool = False) -> None:
"""Delete record from the database."""
-
# check artist albums
- db_rows = await self.mass.music.albums.get_db_items_by_query(
- f"SELECT item_id FROM {TABLE_ALBUMS} WHERE artists LIKE '%\"{item_id}\"%'"
+ db_rows = await self.mass.database.get_rows_from_query(
+ f"SELECT item_id FROM {TABLE_ALBUMS} WHERE artists LIKE '%\"{item_id}\"%'",
+ limit=5000,
)
assert not (db_rows and not recursive), "Albums attached to artist"
for db_row in db_rows:
await self.mass.music.albums.delete_db_item(db_row["item_id"], recursive)
# check artist tracks
- db_rows = await self.mass.music.tracks.get_db_items_by_query(
- f"SELECT item_id FROM {TABLE_TRACKS} WHERE artists LIKE '%\"{item_id}\"%'"
+ db_rows = await self.mass.database.get_rows_from_query(
+ f"SELECT item_id FROM {TABLE_TRACKS} WHERE artists LIKE '%\"{item_id}\"%'",
+ limit=5000,
)
assert not (db_rows and not recursive), "Tracks attached to artist"
for db_row in db_rows:
# delete the artist itself from db
await super().delete_db_item(item_id)
- self.logger.debug("deleted item with id %s from database", item_id)
-
async def _match(self, db_artist: Artist, provider: MusicProvider) -> bool:
"""Try to find matching artists on given provider for the provided (database) artist."""
self.logger.debug(
"""Manage MediaItems of type Playlist."""
from __future__ import annotations
+from ctypes import Union
from time import time
-from typing import Any, List, Optional
+from typing import Any, List, Optional, Tuple
from music_assistant.helpers.database import TABLE_PLAYLISTS
from music_assistant.helpers.json import json_serializer
from music_assistant.helpers.uri import create_uri
-from music_assistant.models.enums import MediaType, ProviderType
-from music_assistant.models.errors import InvalidDataError, MediaNotFoundError
+from music_assistant.models.enums import (
+ EventType,
+ MediaType,
+ MusicProviderFeature,
+ ProviderType,
+)
+from music_assistant.models.errors import (
+ InvalidDataError,
+ MediaNotFoundError,
+ ProviderUnavailableError,
+)
+from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import Playlist, Track
prov = self.mass.music.get_provider(provider_id or provider)
if not prov:
return []
- # prefer cache items (if any) - do not use cache for filesystem
- cache_key = f"{prov.type.value}.playlist.{item_id}.tracks"
+ # prefer cache items (if any)
+ cache_key = f"{prov.id}.playlist.{item_id}.tracks"
if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
- items = []
- for index, playlist_track in enumerate(await prov.get_playlist_tracks(item_id)):
- # make sure we have a position set on the track
- if not playlist_track.position:
- playlist_track.position = index
- items.append(playlist_track)
+ items = await prov.get_playlist_tracks(item_id)
+ # double check if position set
+ if items:
+ assert (
+ items[0].position is not None
+ ), "Playlist items require position to be set"
# store (serializable items) in cache
self.mass.create_task(
self.mass.cache.set(
await self.mass.metadata.get_playlist_metadata(item)
return await self.add_db_item(item, overwrite_existing)
+ async def create(
+ self, name: str, prov_id: Union[ProviderType, str, None] = None
+ ) -> Playlist:
+ """Create new playlist."""
+ # if prov_id is omitted, prefer file
+ if prov_id:
+ provider = self.mass.music.get_provider(prov_id)
+ else:
+ try:
+ provider = self.mass.music.get_provider(ProviderType.FILESYSTEM_LOCAL)
+ except ProviderUnavailableError:
+ provider = next(
+ (
+ x
+ for x in self.mass.music.providers
+ if MusicProviderFeature.PLAYLIST_CREATE in x.supported_features
+ ),
+ None,
+ )
+ if provider is None:
+ raise ProviderUnavailableError(
+ "No provider available which allows playlists creation."
+ )
+
+ return await provider.create_playlist(name)
+
async def add_playlist_tracks(self, db_playlist_id: str, uris: List[str]) -> None:
"""Add multiple tracks to playlist. Creates background tasks to process the action."""
playlist = await self.get_db_item(db_playlist_id)
# actually add the tracks to the playlist on the provider
provider = self.mass.music.get_provider(playlist_prov.prov_id)
await provider.add_playlist_tracks(playlist_prov.item_id, [track_id_to_add])
+ # invalidate cache by updating the checksum
+ await self.get(
+ db_playlist_id, provider=ProviderType.DATABASE, force_refresh=True
+ )
async def remove_playlist_tracks(
- self, db_playlist_id: str, positions: List[int]
+ self, db_playlist_id: str, positions_to_remove: Tuple[int]
) -> None:
"""Remove multiple tracks from playlist."""
playlist = await self.get_db_item(db_playlist_id)
if not playlist.is_editable:
raise InvalidDataError(f"Playlist {playlist.name} is not editable")
for prov in playlist.provider_ids:
- track_ids_to_remove = []
- for playlist_track in await self.tracks(prov.item_id, prov.prov_type):
- if playlist_track.position not in positions:
- continue
- track_ids_to_remove.append(playlist_track.item_id)
- # actually remove the tracks from the playlist on the provider
- # TODO: send positions to provider to delete
- if track_ids_to_remove:
- provider = self.mass.music.get_provider(prov.prov_id)
- await provider.remove_playlist_tracks(prov.item_id, track_ids_to_remove)
+ provider = self.mass.music.get_provider(prov.prov_id)
+ if (
+ MusicProviderFeature.PLAYLIST_TRACKS_EDIT
+ not in provider.supported_features
+ ):
+ self.logger.warning(
+ "Provider %s does not support editing playlists",
+ prov.prov_type.value,
+ )
+ continue
+ await provider.remove_playlist_tracks(prov.item_id, positions_to_remove)
+ # invalidate cache by updating the checksum
+ await self.get(
+ db_playlist_id, provider=ProviderType.DATABASE, force_refresh=True
+ )
async def add_db_item(
self, item: Playlist, overwrite_existing: bool = False
) -> Playlist:
"""Add a new record to the database."""
- match = {"name": item.name, "owner": item.owner}
- if cur_item := await self.mass.database.get_row(self.db_table, match):
- # update existing
- return await self.update_db_item(
- cur_item["item_id"], item, overwrite=overwrite_existing
- )
+ async with self._db_add_lock:
+ match = {"name": item.name, "owner": item.owner}
+ if cur_item := await self.mass.database.get_row(self.db_table, match):
+ # update existing
+ return await self.update_db_item(
+ cur_item["item_id"], item, overwrite=overwrite_existing
+ )
- # insert new item
- new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
- item_id = new_item["item_id"]
- self.logger.debug("added %s to database", item.name)
- # return created object
- db_item = await self.get_db_item(item_id)
- return db_item
+ # insert new item
+ new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
+ item_id = new_item["item_id"]
+ self.logger.debug("added %s to database", item.name)
+ # return created object
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, db_item.uri, db_item)
+ )
+ return db_item
async def update_db_item(
self,
},
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
- return await self.get_db_item(item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
+ )
+ return db_item
from music_assistant.helpers.database import TABLE_RADIOS
from music_assistant.helpers.json import json_serializer
-from music_assistant.models.enums import MediaType
+from music_assistant.models.enums import EventType, MediaType
+from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import Radio
async def add_db_item(self, item: Radio, overwrite_existing: bool = False) -> Radio:
"""Add a new item record to the database."""
assert item.provider_ids
- match = {"name": item.name}
- if cur_item := await self.mass.database.get_row(self.db_table, match):
- # update existing
- return await self.update_db_item(
- cur_item["item_id"], item, overwrite=overwrite_existing
- )
+ async with self._db_add_lock:
+ match = {"name": item.name}
+ if cur_item := await self.mass.database.get_row(self.db_table, match):
+ # update existing
+ return await self.update_db_item(
+ cur_item["item_id"], item, overwrite=overwrite_existing
+ )
- # insert new item
- new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
- item_id = new_item["item_id"]
- self.logger.debug("added %s to database", item.name)
- # return created object
- db_item = await self.get_db_item(item_id)
- return db_item
+ # insert new item
+ new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
+ item_id = new_item["item_id"]
+ self.logger.debug("added %s to database", item.name)
+ # return created object
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, db_item.uri, db_item)
+ )
+ return db_item
async def update_db_item(
self,
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
+ )
return db_item
from music_assistant.helpers.compare import compare_artists, compare_track
from music_assistant.helpers.database import TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
-from music_assistant.models.enums import MediaType, MusicProviderFeature, ProviderType
+from music_assistant.models.enums import (
+ EventType,
+ MediaType,
+ MusicProviderFeature,
+ ProviderType,
+)
+from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
"""Add a new item record to the database."""
assert item.artists, "Track is missing artist(s)"
assert item.provider_ids, "Track is missing provider id(s)"
- cur_item = None
+ async with self._db_add_lock:
+ cur_item = None
- # always try to grab existing item by external_id
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.database.get_row(self.db_table, match)
- for isrc in item.isrcs:
- match = {"isrc": isrc}
- cur_item = await self.mass.database.get_row(self.db_table, match)
- if not cur_item:
- # fallback to matching
- match = {"sort_name": item.sort_name}
- for row in await self.mass.database.get_rows(self.db_table, match):
- row_track = Track.from_db_row(row)
- if compare_track(row_track, item):
- cur_item = row_track
- break
- if cur_item:
- # update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
+ # always try to grab existing item by external_id
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ for isrc in item.isrcs:
+ match = {"isrc": isrc}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ if not cur_item:
+ # fallback to matching
+ match = {"sort_name": item.sort_name}
+ for row in await self.mass.database.get_rows(self.db_table, match):
+ row_track = Track.from_db_row(row)
+ if compare_track(row_track, item):
+ cur_item = row_track
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(
+ cur_item.item_id, item, overwrite=overwrite_existing
+ )
- # no existing match found: insert new item
- track_artists = await self._get_track_artists(item)
- track_albums = await self._get_track_albums(item, overwrite=overwrite_existing)
- if track_artists:
- sort_artist = track_artists[0].sort_name
- else:
- sort_artist = ""
- if track_albums:
- sort_album = track_albums[0].sort_name
- else:
- sort_album = ""
- new_item = await self.mass.database.insert(
- self.db_table,
- {
- **item.to_db_row(),
- "artists": json_serializer(track_artists),
- "albums": json_serializer(track_albums),
- "sort_artist": sort_artist,
- "sort_album": sort_album,
- },
- )
- item_id = new_item["item_id"]
- # return created object
- self.logger.debug("added %s to database: %s", item.name, item_id)
- db_item = await self.get_db_item(item_id)
- return db_item
+ # no existing match found: insert new item
+ track_artists = await self._get_track_artists(item)
+ track_albums = await self._get_track_albums(
+ item, overwrite=overwrite_existing
+ )
+ if track_artists:
+ sort_artist = track_artists[0].sort_name
+ else:
+ sort_artist = ""
+ if track_albums:
+ sort_album = track_albums[0].sort_name
+ else:
+ sort_album = ""
+ new_item = await self.mass.database.insert(
+ self.db_table,
+ {
+ **item.to_db_row(),
+ "artists": json_serializer(track_artists),
+ "albums": json_serializer(track_albums),
+ "sort_artist": sort_artist,
+ "sort_album": sort_album,
+ },
+ )
+ item_id = new_item["item_id"]
+ # return created object
+ self.logger.debug("added %s to database: %s", item.name, item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, db_item.uri, db_item)
+ )
+ return db_item
async def update_db_item(
self,
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
+ )
return db_item
async def _get_track_artists(
seconds_streamed = bytes_written / sample_size_per_second
seconds_in_buffer = len(buffer) / sample_size_per_second
+ queue_track.streamdetails.seconds_streamed = seconds_streamed
#### HANDLE FIRST PART OF TRACK
def compare_explicit(left: MediaItemMetadata, right: MediaItemMetadata) -> bool:
"""Compare if explicit is same in metadata."""
- if left.explicit is None and right.explicit is None:
+ if left.explicit is None or right.explicit is None:
+ # explicitness info is not always present in metadata
+ # only strict compare them if both have the info set
return True
return left == right
if left_artist is None or right_artist is None:
return False
# return early on exact item_id match
- if compare_item_id(left_artist, right_artist):
+ if compare_item_ids(left_artist, right_artist):
return True
# prefer match on musicbrainz_id
return len(left_artists) == matches
-def compare_item_id(
+def compare_item_ids(
left_item: Union[MediaItem, ItemMapping], right_item: Union[MediaItem, ItemMapping]
) -> bool:
- """Compare two lists of artist and return True if both lists match."""
+ """Compare item_id(s) of two media items."""
if (
left_item.provider == right_item.provider
and left_item.item_id == right_item.item_id
):
return True
- if not hasattr(left_item, "provider_ids") or not hasattr(
- right_item, "provider_ids"
- ):
- return False
- for prov_l in left_item.provider_ids:
+ left_prov_ids = getattr(left_item, "provider_ids", None)
+ right_prov_ids = getattr(right_item, "provider_ids", None)
+
+ if left_prov_ids is not None:
+ for prov_l in left_item.provider_ids:
+ if (
+ prov_l.prov_type == right_item.provider
+ and prov_l.item_id == right_item.item_id
+ ):
+ return True
+
+ if right_prov_ids is not None:
for prov_r in right_item.provider_ids:
- if prov_l.prov_type != prov_r.prov_type:
- continue
- if prov_l.item_id == prov_r.item_id:
+ if (
+ prov_r.prov_type == left_item.provider
+ and prov_r.item_id == left_item.item_id
+ ):
return True
+
+ if left_prov_ids is not None and right_prov_ids is not None:
+ for prov_l in left_item.provider_ids:
+ for prov_r in right_item.provider_ids:
+ if prov_l.prov_type != prov_r.prov_type:
+ continue
+ if prov_l.item_id == prov_r.item_id:
+ return True
return False
if left_album is None or right_album is None:
return False
# return early on exact item_id match
- if compare_item_id(left_album, right_album):
+ if compare_item_ids(left_album, right_album):
return True
# prefer match on UPC
if left_track is None or right_track is None:
return False
# return early on exact item_id match
- if compare_item_id(left_track, right_track):
+ if compare_item_ids(left_track, right_track):
return True
for left_isrc in left_track.isrcs:
for right_isrc in right_track.isrcs:
TABLE_SETTINGS, {"key": key, "value": value}, allow_replace=True
)
- async def get_count(
- self,
- table: str,
- match: dict = None,
- ) -> int:
- """Get row count for given table/query."""
- sql_query = f"SELECT count() FROM {table}"
- if match is not None:
- sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
- if res := await self._db.fetch_one(sql_query, match):
- return res["count()"]
- return 0
-
async def get_rows(
self,
table: str,
query = f"{query} LIMIT {limit} OFFSET {offset}"
return await self._db.fetch_all(query, params)
+ async def get_count_from_query(
+ self,
+ query: str,
+ params: Optional[dict] = None,
+ ) -> int:
+ """Get row count for given custom query."""
+ query = query.split("from", 1)[-1].split("FROM", 1)[-1]
+ query = f"SELECT count() FROM {query}"
+ if result := await self._db.fetch_one(query, params):
+ return result[0]
+ return 0
+
async def search(
self, table: str, search: str, column: str = "name"
) -> List[Mapping]:
self, table: str, match: Optional[dict] = None, query: Optional[str] = None
) -> None:
"""Delete data in given table."""
- assert "where" not in query.lower()
+ assert not (query and "where" in query.lower())
sql_query = f"DELETE FROM {table} "
if match:
sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
task.set_name(next_job.name)
task.add_done_callback(partial(self.__job_done_cb, job=next_job))
self.signal_event(
- MassEvent(EventType.BACKGROUND_JOB_UPDATED, data=next_job)
+ MassEvent(
+ EventType.BACKGROUND_JOB_UPDATED, next_job.name, data=next_job
+ )
)
def __job_done_cb(self, task: asyncio.Task, job: BackgroundJob):
self._jobs_event.set()
# mark job as done
job.done()
- self.signal_event(MassEvent(EventType.BACKGROUND_JOB_UPDATED, data=job))
+ self.signal_event(
+ MassEvent(EventType.BACKGROUND_JOB_FINISHED, job.name, data=job)
+ )
async def __aenter__(self) -> "MusicAssistant":
"""Return Context manager."""
QUEUE_TIME_UPDATED = "queue_time_updated"
SHUTDOWN = "application_shutdown"
BACKGROUND_JOB_UPDATED = "background_job_updated"
+ BACKGROUND_JOB_FINISHED = "background_job_finished"
+ MEDIA_ITEM_ADDED = "media_item_added"
+ MEDIA_ITEM_UPDATED = "media_item_updated"
+ MEDIA_ITEM_DELETED = "media_item_deleted"
class JobStatus(Enum):
"""Model for a base media_controller."""
from __future__ import annotations
+import asyncio
from abc import ABCMeta, abstractmethod
from time import time
from typing import (
Optional,
Tuple,
TypeVar,
+ Union,
)
from music_assistant.models.errors import MediaNotFoundError
+from music_assistant.models.event import MassEvent
-from .enums import MediaType, ProviderType
-from .media_items import MediaItemType, media_from_dict
+from .enums import EventType, MediaType, MusicProviderFeature, ProviderType
+from .media_items import MediaItemType, PagedItems, media_from_dict
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
"""Initialize class."""
self.mass = mass
self.logger = mass.logger.getChild(f"music.{self.media_type.value}")
+ self._db_add_lock = asyncio.Lock()
@abstractmethod
async def add(self, item: ItemCls, overwrite_existing: bool = False) -> ItemCls:
limit: int = 500,
offset: int = 0,
order_by: str = "sort_name",
- ) -> List[ItemCls]:
+ ) -> PagedItems:
"""Get in-database items."""
sql_query = f"SELECT * FROM {self.db_table}"
params = {}
if query_parts:
sql_query += " WHERE " + " AND ".join(query_parts)
sql_query += f" ORDER BY {order_by}"
- return await self.get_db_items_by_query(
+ items = await self.get_db_items_by_query(
sql_query, params, limit=limit, offset=offset
)
+ count = len(items)
+ if count < limit:
+ total = offset + count
+ else:
+ total = await self.mass.database.get_count_from_query(sql_query, params)
+ return PagedItems(items, count, limit, offset, total)
async def iter_db_items(
self,
offset=offset,
order_by=order_by,
)
- for item in next_items:
+ for item in next_items.items:
yield item
- if len(next_items) < limit:
+ if next_items.count < limit:
break
offset += limit
- async def count(self, in_library: Optional[bool] = None) -> int:
- """Return number of in-library items for this MediaType."""
- if in_library is not None:
- match = {"in_library": in_library}
- else:
- match = None
- return await self.mass.database.get_count(self.db_table, match)
-
async def get(
self,
provider_item_id: str,
]
prov = self.mass.music.get_provider(provider_id or provider)
- if not prov:
- return {}
+ if not prov or MusicProviderFeature.SEARCH not in prov.supported_features:
+ return []
# prefer cache items (if any)
cache_key = (
f"{prov.type.value}.search.{self.media_type.value}.{search_query}.{limit}"
)
- if not prov.type.is_file(): # do not cache filesystem results
- if cache := await self.mass.cache.get(cache_key):
- return [media_from_dict(x) for x in cache]
+ if cache := await self.mass.cache.get(cache_key):
+ return [media_from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = await prov.search(
search_query,
limit,
)
# store (serializable items) in cache
- self.mass.create_task(
- self.mass.cache.set(
- cache_key, [x.to_dict() for x in items], expiration=86400 * 7
+ if not prov.type.is_file(): # do not cache filesystem results
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key, [x.to_dict() for x in items], expiration=86400 * 7
+ )
)
- )
return items
async def add_to_library(
provider_id: Optional[str] = None,
) -> None:
"""Add an item to the library."""
- # make sure we have a valid full item
- # note that we set 'lazy' to False because we need a full db item
- db_item = await self.get(
- provider_item_id, provider=provider, provider_id=provider_id, lazy=False
+ prov_item = await self.get_db_item_by_prov_id(
+ provider_item_id, provider=provider, provider_id=provider_id
)
- # add to provider libraries
- for prov_id in db_item.provider_ids:
+ if prov_item is None:
+ prov_item = await self.get_provider_item(
+ provider_item_id, provider_id or provider
+ )
+ if prov_item.in_library is True:
+ return
+ # mark as favorite/library item on provider(s)
+ for prov_id in prov_item.provider_ids:
if prov := self.mass.music.get_provider(prov_id.prov_id):
await prov.library_add(prov_id.item_id, self.media_type)
- # mark as library item in internal db
- if not db_item.in_library:
- db_item.in_library = True
- await self.set_db_library(db_item.item_id, True)
+ # mark as library item in internal db if db item
+ if prov_item.provider == ProviderType.DATABASE:
+ if not prov_item.in_library:
+ prov_item.in_library = True
+ await self.set_db_library(prov_item.item_id, True)
async def remove_from_library(
self,
provider_id: Optional[str] = None,
) -> None:
"""Remove item from the library."""
- # make sure we have a valid full item
- # note that we set 'lazy' to False because we need a full db item
- db_item = await self.get(
- provider_item_id, provider=provider, provider_id=provider_id, lazy=False
+ prov_item = await self.get_db_item_by_prov_id(
+ provider_item_id, provider=provider, provider_id=provider_id
)
- # remove from provider's libraries
- for prov_id in db_item.provider_ids:
+ if prov_item is None:
+ prov_item = await self.get_provider_item(
+ provider_item_id, provider_id or provider
+ )
+ if prov_item.in_library is False:
+ return
+ # unmark as favorite/library item on provider(s)
+ for prov_id in prov_item.provider_ids:
if prov := self.mass.music.get_provider(prov_id.prov_id):
await prov.library_remove(prov_id.item_id, self.media_type)
- # unmark as library item in internal db
- if db_item.in_library:
- db_item.in_library = False
- await self.set_db_library(db_item.item_id, False)
+ # unmark as library item in internal db if db item
+ if prov_item.provider == ProviderType.DATABASE:
+ prov_item.in_library = False
+ await self.set_db_library(prov_item.item_id, False)
async def get_provider_id(self, item: ItemCls) -> Tuple[str, str]:
"""Return provider and item id."""
)
]
- async def get_db_item(self, item_id: int) -> ItemCls:
+ async def get_db_item(self, item_id: Union[int, str]) -> ItemCls:
"""Get record by id."""
match = {"item_id": int(item_id)}
if db_row := await self.mass.database.get_row(self.db_table, match):
await self.mass.database.update(
self.db_table, match, {"in_library": in_library, "timestamp": timestamp}
)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
+ )
async def get_provider_item(
self,
item_id: str,
- provider_id: str,
+ provider_id: Union[str, ProviderType],
) -> ItemCls:
"""Return item details for the given provider item id."""
- if provider_id == "database":
+ if provider_id in ("database", ProviderType.DATABASE):
item = await self.get_db_item(item_id)
else:
provider = self.mass.music.get_provider(provider_id)
}
if not db_item.provider_ids:
# item has no more provider_ids left, it is completely deleted
- await self.delete_db_item(db_item.item_id)
+ try:
+ await self.delete_db_item(db_item.item_id)
+ except AssertionError:
+ self.logger.debug(
+ "Could not delete %s: it has items attached", db_item.item_id
+ )
return
await self.update_db_item(db_item.item_id, db_item, overwrite=True)
async def delete_db_item(self, item_id: int, recursive: bool = False) -> None:
"""Delete record from the database."""
+ db_item = await self.get_db_item(item_id)
+ assert db_item, f"Item does not exist: {item_id}"
# delete item
await self.mass.database.delete(
self.db_table,
{"item_id": int(item_id)},
)
- # NOTE: this does not delete any references to this item in other records!
+ # NOTE: this does not delete any references to this item in other records,
+ # this is handled/overridden in the mediatype specific controllers
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_DELETED, db_item.uri, db_item)
+ )
self.logger.debug("deleted item with id %s from database", item_id)
"""Representation of a Folder used in Browse (which contains media items)."""
media_type: MediaType = MediaType.FOLDER
+ # path: the path (in uri style) to/for this browse folder
+ path: str = ""
# label: a labelid that needs to be translated by the frontend
label: str = ""
- # items (max 25) to provide in recommendation listings
- items: Optional[List[MediaItemType]] = None
+ # subitems of this folder when expanding
+ items: Optional[List[Union[MediaItemType, BrowseFolder]]] = None
+
+ def __post_init__(self):
+ """Call after init."""
+ super().__post_init__()
+ if not self.path:
+ self.path = f"{self.provider}://{self.item_id}"
MediaItemType = Union[Artist, Album, Track, Radio, Playlist, BrowseFolder]
+@dataclass
+class PagedItems(DataClassDictMixin):
+ """Model for a paged listing."""
+
+ items: List[MediaItemType]
+ count: int
+ limit: int
+ offset: int
+ total: Optional[int] = None
+
+
def media_from_dict(media_item: dict) -> MediaItemType:
"""Return MediaItem from dict."""
if media_item["media_type"] == "artist":
raise NotImplementedError
async def remove_playlist_tracks(
- self, prov_playlist_id: str, prov_track_ids: List[str]
+ self, prov_playlist_id: str, positions_to_remove: Tuple[int]
) -> None:
"""Remove track(s) from playlist."""
if MusicProviderFeature.PLAYLIST_TRACKS_EDIT in self.supported_features:
raise NotImplementedError
- async def create_playlist(
- self, name: str, initial_items: Optional[List[Track]] = None
- ) -> Playlist:
+ async def create_playlist(self, name: str) -> Playlist:
"""Create a new playlist on provider with given name."""
raise NotImplementedError
return await self.get_radio(prov_item_id)
return await self.get_track(prov_item_id)
- async def browse(self, path: Optional[str] = None) -> List[MediaItemType]:
+ async def browse(self, path: str) -> BrowseFolder:
"""
Browse this provider's items.
- :param path: The path to browse, (e.g. artists) or None for root level.
+ :param path: The path to browse, (e.g. provid://artists).
"""
if MusicProviderFeature.BROWSE not in self.supported_features:
- # we may NOT use the default implementation if the browser does not support browse
+ # we may NOT use the default implementation if the provider does not support browse
raise NotImplementedError
+
+ _, subpath = path.split("://")
+
# this reference implementation can be overridden with provider specific approach
- if not path:
+ if not subpath:
# return main listing
root_items = []
if MusicProviderFeature.LIBRARY_ARTISTS in self.supported_features:
BrowseFolder(
item_id="artists",
provider=self.type,
+ path=path + "artists",
name="",
label="artists",
- uri=f"{self.type.value}://artists",
)
)
if MusicProviderFeature.LIBRARY_ALBUMS in self.supported_features:
BrowseFolder(
item_id="albums",
provider=self.type,
+ path=path + "albums",
name="",
label="albums",
- uri=f"{self.type.value}://albums",
)
)
if MusicProviderFeature.LIBRARY_TRACKS in self.supported_features:
BrowseFolder(
item_id="tracks",
provider=self.type,
+ path=path + "tracks",
name="",
label="tracks",
- uri=f"{self.type.value}://tracks",
)
)
if MusicProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
BrowseFolder(
item_id="playlists",
provider=self.type,
+ path=path + "playlists",
name="",
label="playlists",
- uri=f"{self.type.value}://playlists",
)
)
if MusicProviderFeature.LIBRARY_RADIOS in self.supported_features:
BrowseFolder(
item_id="radios",
provider=self.type,
+ path=path + "radios",
name="",
label="radios",
- uri=f"{self.type.value}://radios",
)
)
- return root_items
+ return BrowseFolder(
+ item_id="root",
+ provider=self.type,
+ path=path,
+ name="",
+ items=root_items,
+ )
# sublevel
- if path == "artists":
- return [x async for x in self.get_library_artists()]
- if path == "albums":
- return [x async for x in self.get_library_albums()]
- if path == "tracks":
- return [x async for x in self.get_library_tracks()]
- if path == "radios":
- return [x async for x in self.get_library_radios()]
- if path == "playlists":
- return [x async for x in self.get_library_playlists()]
+ if subpath == "artists":
+ return BrowseFolder(
+ item_id="artists",
+ provider=self.type,
+ path=path,
+ name="",
+ label="artists",
+ items=[x async for x in self.get_library_artists()],
+ )
+ if subpath == "albums":
+ return BrowseFolder(
+ item_id="albums",
+ provider=self.type,
+ path=path,
+ name="",
+ label="albums",
+ items=[x async for x in self.get_library_albums()],
+ )
+ if subpath == "tracks":
+ return BrowseFolder(
+ item_id="tracks",
+ provider=self.type,
+ path=path,
+ name="",
+ label="tracks",
+ items=[x async for x in self.get_library_tracks()],
+ )
+ if subpath == "radios":
+ return BrowseFolder(
+ item_id="radios",
+ provider=self.type,
+ path=path,
+ name="",
+ label="radios",
+ items=[x async for x in self.get_library_radios()],
+ )
+ if subpath == "playlists":
+ return BrowseFolder(
+ item_id="playlists",
+ provider=self.type,
+ path=path,
+ name="",
+ label="playlists",
+ items=[x async for x in self.get_library_playlists()],
+ )
async def recommendations(self) -> List[BrowseFolder]:
"""
if not db_item:
# dump the item in the db, rich metadata is lazy loaded later
db_item = await controller.add_db_item(prov_item)
+
elif (
db_item.metadata.checksum and prov_item.metadata.checksum
) and db_item.metadata.checksum != prov_item.metadata.checksum:
db_item = await controller.update_db_item(
db_item.item_id, prov_item
)
+ # preload album/playlist tracks
+ if prov_item.media_type == (MediaType.ALBUM, MediaType.PLAYLIST):
+ for track in controller.tracks(
+ prov_item.item_id, prov_item.provider
+ ):
+ await self.mass.music.tracks.add_db_item(track)
cur_db_ids.add(db_item.item_id)
if not db_item.in_library:
await controller.set_db_library(db_item.item_id, True)
}
def library_supported(self, media_type: MediaType) -> bool:
- """Return if Library is upported for given MediaType on this provider."""
+ """Return if Library is supported for given MediaType on this provider."""
if media_type == MediaType.ARTIST:
return MusicProviderFeature.LIBRARY_ARTISTS in self.supported_features
if media_type == MediaType.ALBUM:
MediaQuality,
MediaType,
Playlist,
+ Radio,
StreamDetails,
Track,
)
result += playlists
return result
- async def browse(self, path: Optional[str] = None) -> List[MediaItemType]:
+ async def browse(self, path: str) -> BrowseFolder:
"""
Browse this provider's items.
- :param path: The path to browse, (e.g. artists) or None for root level.
+ :param path: The path to browse, (e.g. provid://artists).
"""
- if not path:
- path = self.config.path
+ _, sub_path = path.split("://")
+ if not sub_path:
+ item_path = self.config.path
else:
- path = os.path.join(self.config.path, path)
- result = []
- for filename in await listdir(path):
- full_path: str = os.path.join(path, filename)
+ item_path = os.path.join(self.config.path, sub_path)
+ subitems = []
+ for filename in await listdir(item_path):
+ full_path: str = os.path.join(item_path, filename)
rel_path = full_path.replace(self.config.path + os.sep, "")
if await isdir(full_path):
- result.append(
+ subitems.append(
BrowseFolder(
item_id=rel_path,
provider=self.type,
+ path=f"{self.id}://{rel_path}",
name=filename,
- uri=f"{self.type.value}://{rel_path}",
)
)
- elif track := await self._parse_track(full_path):
- result.append(track)
- elif playlist := await self._parse_playlist(full_path):
- result.append(playlist)
- return result
+ continue
+
+ if "." not in filename or filename.startswith("."):
+ # skip system files and files without extension
+ continue
+
+ _, ext = filename.rsplit(".", 1)
+
+ if ext in TRACK_EXTENSIONS:
+ if track := await self._parse_track(full_path):
+ subitems.append(track)
+ continue
+ if ext in PLAYLIST_EXTENSIONS:
+ if playlist := await self._parse_playlist(full_path):
+ subitems.append(playlist)
+ continue
+
+ return BrowseFolder(
+ item_id=sub_path,
+ provider=self.type,
+ path=path,
+ name=path.split("://", 1)[-1],
+ items=subitems,
+ )
async def sync_library(
self, media_types: Optional[Tuple[MediaType]] = None
track.disc_number = album_mapping.disc_number
track.track_number = album_mapping.track_number
result.append(track)
- return sorted(result, key=lambda x: (x.disc_number, x.track_number))
+ return sorted(result, key=lambda x: (x.disc_number or 0, x.track_number or 0))
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
playlist_base_path = Path(playlist_path).parent
- index = 0
try:
async with self.open_file(playlist_path, "r") as _file:
- for line in await _file.readlines():
+ for line_no, line in enumerate(await _file.readlines()):
line = urllib.parse.unquote(line.strip())
if line and not line.startswith("#"):
# TODO: add support for .pls playlist files
- if track := await self._parse_playlist_line(
+ if media_item := await self._parse_playlist_line(
line, playlist_base_path
):
- track.position = index
- result.append(track)
- index += 1
+ # use the linenumber as position for easier deletions
+ media_item.position = line_no
+ result.append(media_item)
except Exception as err: # pylint: disable=broad-except
self.logger.warning(
"Error while parsing playlist %s", playlist_path, exc_info=err
)
return result
- async def _parse_playlist_line(self, line: str, playlist_path: str) -> Track | None:
+ async def _parse_playlist_line(
+ self, line: str, playlist_path: str
+ ) -> Track | Radio | None:
"""Try to parse a track from a playlist line."""
try:
# try to treat uri as filename first
await _file.write(f"\n{uri}")
async def remove_playlist_tracks(
- self, prov_playlist_id: str, prov_track_ids: List[str]
+ self, prov_playlist_id: str, positions_to_remove: Tuple[int]
) -> None:
"""Remove track(s) from playlist."""
itempath = await self._get_filepath(MediaType.PLAYLIST, prov_playlist_id)
raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
cur_lines = []
async with self.open_file(itempath, "r") as _file:
- for line in await _file.readlines():
+ for line_no, line in enumerate(await _file.readlines()):
line = urllib.parse.unquote(line.strip())
- if line not in prov_track_ids:
+ if line_no not in positions_to_remove:
cur_lines.append(line)
async with self.open_file(itempath, "w") as _file:
for uri in cur_lines:
await _file.write(f"{uri}\n")
- async def create_playlist(
- self, name: str, initial_items: Optional[List[Track]] = None
- ) -> Playlist:
+ async def create_playlist(self, name: str) -> Playlist:
"""Create a new playlist on provider with given name."""
# creating a new playlist on the filesystem is as easy
# as creating a new (empty) file with the m3u extension...
- async with self.open_file(name, "w") as _file:
- for item in initial_items or []:
- await _file.write(item.uri + "\n")
+ filename = await self.resolve(f"{name}.m3u")
+ async with self.open_file(filename, "w") as _file:
await _file.write("\n")
+ playlist = await self._parse_playlist(filename)
+ db_playlist = await self.mass.music.playlists.add_db_item(playlist)
+ return db_playlist
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
- endpoint = "playlist/get"
- return [
- await self._parse_track(item)
- for item in await self._get_all_items(
- endpoint,
- key="tracks",
- playlist_id=prov_playlist_id,
- extra="tracks",
- )
- if (item and item["id"])
- ]
+ count = 0
+ result = []
+ for item in await self._get_all_items(
+ "playlist/get",
+ key="tracks",
+ playlist_id=prov_playlist_id,
+ extra="tracks",
+ ):
+ if not (item and item["id"]):
+ continue
+ track = await self._parse_track(item)
+ # use count as position
+ track.position = count
+ result.append(track)
+ count += 1
+ return result
async def get_artist_albums(self, prov_artist_id) -> List[Album]:
"""Get a list of albums for the given artist."""
)
async def remove_playlist_tracks(
- self, prov_playlist_id: str, prov_track_ids: List[str]
+ self, prov_playlist_id: str, positions_to_remove: Tuple[int]
) -> None:
"""Remove track(s) from playlist."""
playlist_track_ids = set()
- for track in await self._get_all_items(
- "playlist/get", key="tracks", playlist_id=prov_playlist_id, extra="tracks"
- ):
- if str(track["id"]) in prov_track_ids:
+ for track in await self.get_playlist_tracks(prov_playlist_id):
+ if track.position in positions_to_remove:
playlist_track_ids.add(str(track["playlist_track_id"]))
+ if len(playlist_track_ids) == positions_to_remove:
+ break
return await self._get_data(
"playlist/deleteTracks",
playlist_id=prov_playlist_id,
else:
raise MediaNotFoundError(f"Unsupported mime type for {item_id}")
# report playback started as soon as the streamdetails are requested
- self.mass.create_task(self._report_playback_started(item_id, streamdata))
+ self.mass.create_task(self._report_playback_started(streamdata))
return StreamDetails(
item_id=str(item_id),
provider=self.type,
callback=self._report_playback_stopped,
)
- async def _report_playback_started(self, item_id: str, streamdata: dict) -> None:
+ async def _report_playback_started(self, streamdata: dict) -> None:
"""Report playback start to qobuz."""
# TODO: need to figure out if the streamed track is purchased by user
# https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
"sample": False,
"intent": "stream",
"device_id": device_id,
- "track_id": str(item_id),
+ "track_id": streamdata["track_id"],
"purchase": False,
"date": timestamp,
"credential_id": credential_id,
url, headers=headers, params=kwargs, verify_ssl=False
) as response:
try:
+ # make sure status is 200
+ assert response.status == 200
result = await response.json()
- if "error" in result or (
- "status" in result and "error" in result["status"]
- ):
- self.logger.error("%s - %s", endpoint, result)
- return None
+ # check for error in json
+ if error := result.get("error"):
+ raise ValueError(error)
+ if result.get("status") and "error" in result["status"]:
+ raise ValueError(result["status"])
except (
aiohttp.ContentTypeError,
JSONDecodeError,
+ AssertionError,
+ ValueError,
) as err:
- self.logger.error("%s - %s", endpoint, str(err))
+ text = await response.text()
+ self.logger.exception(
+ "Error while processing %s: %s", endpoint, text, exc_info=err
+ )
return None
return result
async with self.mass.http_session.post(
url, params=params, json=data, verify_ssl=False
) as response:
- result = await response.json()
- if "error" in result or (
- "status" in result and "error" in result["status"]
- ):
- self.logger.error("%s - %s", endpoint, result)
+ try:
+ result = await response.json()
+ # check for error in json
+ if error := result.get("error"):
+ raise ValueError(error)
+ if result.get("status") and "error" in result["status"]:
+ raise ValueError(result["status"])
+ except (
+ aiohttp.ContentTypeError,
+ JSONDecodeError,
+ AssertionError,
+ ValueError,
+ ) as err:
+ text = await response.text()
+ self.logger.exception(
+ "Error while processing %s: %s", endpoint, text, exc_info=err
+ )
return None
return result
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
- return [
- await self._parse_track(item["track"])
- for item in await self._get_all_items(
- f"playlists/{prov_playlist_id}/tracks",
- )
- if (item and item["track"] and item["track"]["id"])
- ]
+ count = 0
+ result = []
+ for item in await self._get_all_items(
+ f"playlists/{prov_playlist_id}/tracks",
+ ):
+ if not (item and item["track"] and item["track"]["id"]):
+ continue
+ track = await self._parse_track(item["track"])
+ # use count as position
+ track.position = count
+ result.append(track)
+ count += 1
+ return result
async def get_artist_albums(self, prov_artist_id) -> List[Album]:
"""Get a list of all albums for the given artist."""
return await self._post_data(f"playlists/{prov_playlist_id}/tracks", data=data)
async def remove_playlist_tracks(
- self, prov_playlist_id: str, prov_track_ids: List[str]
+ self, prov_playlist_id: str, positions_to_remove: Tuple[int]
) -> None:
"""Remove track(s) from playlist."""
track_uris = []
- for track_id in prov_track_ids:
- track_uris.append({"uri": f"spotify:track:{track_id}"})
+ for track in await self.get_playlist_tracks(prov_playlist_id):
+ if track.position in positions_to_remove:
+ track_uris.append({"uri": f"spotify:track:{track.item_id}"})
+ if len(track_uris) == positions_to_remove:
+ break
data = {"tracks": track_uris}
return await self._delete_data(
f"playlists/{prov_playlist_id}/tracks", data=data
headers=self._headers,
username=self.config.username,
)
- if "tracks" in playlist_obj:
- tracks = []
- for track in playlist_obj["tracks"]:
- if track["isAvailable"]:
- # Playlist tracks sometimes do not have a valid artist id
- # In that case, call the API for track details based on track id
- try:
- track = await self._parse_track(track)
- if track:
- tracks.append(track)
- except InvalidDataError:
- track = await self.get_track(track["videoId"])
- if track:
- tracks.append(track)
- return tracks
- return []
+ if "tracks" not in playlist_obj:
+ return []
+ tracks = []
+ for index, track in enumerate(playlist_obj["tracks"]):
+ if track["isAvailable"]:
+ # Playlist tracks sometimes do not have a valid artist id
+ # In that case, call the API for track details based on track id
+ try:
+ track = await self._parse_track(track)
+ if track:
+ track.position = index
+ tracks.append(track)
+ except InvalidDataError:
+ track = await self.get_track(track["videoId"])
+ if track:
+ track.position = index
+ tracks.append(track)
+ return tracks
async def get_artist_albums(self, prov_artist_id) -> List[Album]:
"""Get a list of albums for the given artist."""
)
async def remove_playlist_tracks(
- self, prov_playlist_id: str, prov_track_ids: List[str]
+ self, prov_playlist_id: str, positions_to_remove: Tuple[int]
) -> None:
"""Remove track(s) from playlist."""
- # YT needs both the videoId and de setVideoId in order to remove
- # the track. Thus, we need to obtain the playlist details and
- # grab the info from there.
playlist_obj = await get_playlist(
prov_playlist_id=prov_playlist_id,
headers=self._headers,
username=self.config.username,
)
- if playlist_obj.get("tracks"):
- tracks_to_delete = [
- {"videoId": track["videoId"], "setVideoId": track["setVideoId"]}
- for track in playlist_obj.get("tracks")
- if track.get("videoId") in prov_track_ids
- ]
- return await add_remove_playlist_tracks(
- headers=self._headers,
- prov_playlist_id=prov_playlist_id,
- prov_track_ids=tracks_to_delete,
- add=False,
- username=self.config.username,
- )
+ if "tracks" not in playlist_obj:
+ return
+ tracks_to_delete = []
+ for index, track in enumerate(playlist_obj["tracks"]):
+ if index in positions_to_remove:
+ # YT needs both the videoId and the setVideoId in order to remove
+ # the track. Thus, we need to obtain the playlist details and
+ # grab the info from there.
+ tracks_to_delete.append(
+ {"videoId": track["videoId"], "setVideoId": track["setVideoId"]}
+ )
+
+ return await add_remove_playlist_tracks(
+ headers=self._headers,
+ prov_playlist_id=prov_playlist_id,
+ prov_track_ids=tracks_to_delete,
+ add=False,
+ username=self.config.username,
+ )
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""