PLAYER_UPDATED = "player updated"
STREAM_STARTED = "streaming started"
STREAM_ENDED = "streaming ended"
- CONFIG_CHANGED = "config changed"
MUSIC_SYNC_STATUS = "music sync status"
QUEUE_ADDED = "queue_added"
QUEUE_UPDATED = "queue updated"
ALBUM_ADDED = "album added"
TRACK_ADDED = "track added"
PLAYLIST_ADDED = "playlist added"
+ PLAYLIST_UPDATED = "playlist updated"
RADIO_ADDED = "radio added"
TASK_UPDATED = "task updated"
PROVIDER_REGISTERED = "provider registered"
self.radio = RadioController(mass)
self.playlists = PlaylistController(mass)
self._providers: Dict[str, MusicProvider] = {}
- self._sync_tasks = set()
async def setup(self):
"""Async initialize of module."""
provider_id,
),
f"Library sync of {media_type.value}s for provider {provider.name}",
+ allow_duplicate=False,
)
async def _library_items_sync(
self, media_type: MediaType, provider_id: str
) -> None:
"""Sync library items for given provider."""
- sync_id = f"{media_type.value}.{provider_id}"
- if sync_id in self._sync_tasks:
- self.logger.debug("Abort sync task %s because its already running", sync_id)
- return # already running
- self._sync_tasks.add(sync_id)
- try:
- music_provider = self.get_provider(provider_id)
- if not music_provider or not music_provider.available:
- return
- controller = self._get_controller(media_type)
- # create a set of all previous and current db id's
- prev_ids = set()
- for db_item in await controller.library():
- for prov_id in db_item.provider_ids:
- if prov_id.provider == provider_id:
- prev_ids.add(db_item.item_id)
- cur_ids = set()
- for prov_item in await music_provider.get_library_items(media_type):
- prov_item: MediaItemType = prov_item
- db_item: MediaItemType = await controller.get_db_item_by_prov_id(
- prov_item.provider, prov_item.item_id
+ music_provider = self.get_provider(provider_id)
+ if not music_provider or not music_provider.available:
+ return
+ controller = self._get_controller(media_type)
+ # create a set of all previous and current db id's
+ prev_ids = set()
+ for db_item in await controller.library():
+ for prov_id in db_item.provider_ids:
+ if prov_id.provider == provider_id:
+ prev_ids.add(db_item.item_id)
+ cur_ids = set()
+ for prov_item in await music_provider.get_library_items(media_type):
+ prov_item: MediaItemType = prov_item
+ db_item: MediaItemType = await controller.get_db_item_by_prov_id(
+ prov_item.provider, prov_item.item_id
+ )
+ if not db_item and media_type == MediaType.ARTIST:
+ # for artists we need a fully matched item (with musicbrainz id)
+ db_item = await controller.get(
+ prov_item.item_id,
+ prov_item.provider,
+ details=prov_item,
+ lazy=False,
)
- if not db_item and media_type == MediaType.ARTIST:
- # for artists we need a fully matched item (with musicbrainz id)
- db_item = await controller.get(
- prov_item.item_id,
- prov_item.provider,
- details=prov_item,
- lazy=False,
- )
- elif db_item and db_item.available != prov_item.available:
- # availability changed
- db_item = await controller.add_db_item(prov_item)
- elif db_item and not db_item.available:
- # use auto matching magic to find a substitute for missing item
- db_item = await controller.add(prov_item)
- elif not db_item:
- # for other mediatypes its enough to simply dump the item in the db
- db_item = await controller.add_db_item(prov_item)
- cur_ids.add(db_item.item_id)
- if not db_item.in_library:
- await controller.set_db_library(db_item.item_id, True)
- # sync album tracks
- if media_type == MediaType.ALBUM:
- await self._sync_album_tracks(db_item)
- # sync playlist tracks
- if media_type == MediaType.PLAYLIST:
- await self._sync_playlist_tracks(db_item)
-
- # process deletions
- for item_id in prev_ids:
- if item_id not in cur_ids:
- await controller.set_db_library(item_id, False)
-
- finally:
- self._sync_tasks.remove(sync_id)
+ elif db_item and db_item.available != prov_item.available:
+ # availability changed
+ db_item = await controller.add_db_item(prov_item)
+ elif db_item and not db_item.available:
+ # use auto matching magic to find a substitute for missing item
+ db_item = await controller.get(
+ prov_item.item_id,
+ prov_item.provider,
+ lazy=False,
+ details=prov_item,
+ )
+ elif not db_item:
+ # for other mediatypes its enough to simply dump the item in the db
+ db_item = await controller.add_db_item(prov_item)
+ cur_ids.add(db_item.item_id)
+ if not db_item.in_library:
+ await controller.set_db_library(db_item.item_id, True)
+ # sync album tracks
+ if media_type == MediaType.ALBUM:
+ await self._sync_album_tracks(db_item)
+ # sync playlist tracks
+ if media_type == MediaType.PLAYLIST:
+ await self._sync_playlist_tracks(db_item)
+ # cool down a bit as we don't want to sync process to consume all IO
+ await asyncio.sleep(0.05)
+
+ # process deletions
+ for item_id in prev_ids:
+ if item_id not in cur_ids:
+ await controller.set_db_library(item_id, False)
async def _sync_album_tracks(self, db_album: Album) -> None:
"""Store album tracks of in-library album in database."""
)
if db_track and not db_track.available:
# use auto matching magic to find a substitute for missing track
- db_track = await self.tracks.add(album_track)
+ db_track = await self.tracks.get(
+ album_track.item_id,
+ album_track.provider,
+ lazy=False,
+ details=album_track,
+ )
elif not db_track:
db_track = await self.tracks.add_db_item(album_track)
album_track.disc_number,
album_track.track_number,
)
+ # cool down a bit as we don't want to sync process to consume all IO
+ await asyncio.sleep(0.05)
async def _sync_playlist_tracks(self, db_playlist: Playlist) -> None:
"""Store playlist tracks of in-library playlist in database."""
provider = self.get_provider(prov_id.provider)
if not provider:
continue
+ # clear db first
+ await self.playlists.remove_db_playlist_track(db_playlist.item_id)
for playlist_track in await self.playlists.get_provider_playlist_tracks(
prov_id.item_id, prov_id.provider
):
playlist_track.provider, playlist_track.item_id
)
if db_track and not db_track.available:
- # use auto matching magic to find a substitute for missing track
- db_track = await self.tracks.add(playlist_track)
+ # try auto matching magic to find a substitute for missing track
+ db_track = await self.tracks.get(
+ playlist_track.item_id,
+ playlist_track.provider,
+ lazy=False,
+ details=playlist_track,
+ )
elif not db_track:
db_track = await self.tracks.add_db_item(playlist_track)
assert playlist_track.position is not None
db_track.item_id,
playlist_track.position,
)
+ # cool down a bit as we don't want to sync process to consume all IO
+ await asyncio.sleep(0.05)
def _get_controller(
self, media_type: MediaType
"""Manage MediaItems of type Playlist."""
from __future__ import annotations
-import time
-from typing import List
+from typing import List, Optional
from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.cache import cached
)
return db_item
- async def add_playlist_tracks(
- self, playlist_id: str, playlist_provider: str, uris: List[str]
- ) -> None:
+ async def add_playlist_tracks(self, db_playlist_id: str, uris: List[str]) -> None:
"""Add multiple tracks to playlist. Creates background tasks to process the action."""
- playlist = await self.get(playlist_id, playlist_provider)
+ playlist = await self.get_db_item(db_playlist_id)
if not playlist:
- raise MediaNotFoundError(
- f"Playlist {playlist_provider}/{playlist_id} not found"
- )
+ raise MediaNotFoundError(f"Playlist with id {db_playlist_id} not found")
if not playlist.is_editable:
raise InvalidDataError(f"Playlist {playlist.name} is not editable")
for uri in uris:
- job_desc = f"Add track {uri} to playlist {playlist.uri}"
- self.mass.add_job(
- self.add_playlist_track(playlist_id, playlist_provider, uri), job_desc
- )
+ job_desc = f"Add track {uri} to playlist {playlist.name}"
+ self.mass.add_job(self.add_playlist_track(db_playlist_id, uri), job_desc)
- async def add_playlist_track(
- self, playlist_id: str, playlist_provider: str, track_uri: str
- ) -> None:
+ async def add_playlist_track(self, db_playlist_id: str, track_uri: str) -> None:
"""Add track to playlist - make sure we dont add duplicates."""
# we can only edit playlists that are in the database (marked as editable)
- playlist = await self.get(playlist_id, playlist_provider)
+ playlist = await self.get_db_item(db_playlist_id)
if not playlist:
- raise MediaNotFoundError(
- f"Playlist {playlist_provider}/{playlist_id} not found"
- )
+ raise MediaNotFoundError(f"Playlist with id {db_playlist_id} not found")
if not playlist.is_editable:
raise InvalidDataError(f"Playlist {playlist.name} is not editable")
# make sure we have recent full track details
- track = await self.mass.music.get_item_by_uri(
- track_uri, force_refresh=True, lazy=False
- )
+ track = await self.mass.music.get_item_by_uri(track_uri, lazy=False)
assert track.media_type == MediaType.TRACK
# a playlist can only have one provider (for now)
playlist_prov = next(iter(playlist.provider_ids))
# grab all existing track ids in the playlist so we can check for duplicates
cur_playlist_track_ids = set()
+ count = 0
for item in await self.tracks(playlist_prov.item_id, playlist_prov.provider):
+ count += 1
cur_playlist_track_ids.update(
{
i.item_id
"Track is not available on provider {playlist_prov.provider}"
)
# actually add the tracks to the playlist on the provider
- # invalidate cache
- playlist.checksum = str(time.time())
- await self.update_db_playlist(playlist.item_id, playlist)
- # return result of the action on the provider
provider = self.mass.music.get_provider(playlist_prov.provider)
- return await provider.add_playlist_tracks(
- playlist_prov.item_id, [track_id_to_add]
+ await provider.add_playlist_tracks(playlist_prov.item_id, [track_id_to_add])
+ # update local db entry
+ await self.add_db_playlist_track(db_playlist_id, track.item_id, count + 1)
+ self.mass.signal_event(
+ MassEvent(
+ type=EventType.PLAYLIST_UPDATED, object_id=db_playlist_id, data=playlist
+ )
)
async def remove_playlist_tracks(
- self, playlist_id: str, playlist_provider: str, uris: List[str]
+ self, db_playlist_id: str, positions: List[int]
) -> None:
- """Remove multiple tracks from playlist. Creates background tasks to process the action."""
- playlist = await self.get(playlist_id, playlist_provider)
+ """Remove multiple tracks from playlist."""
+ playlist = await self.get_db_item(db_playlist_id)
if not playlist:
- raise MediaNotFoundError(
- f"Playlist {playlist_provider}/{playlist_id} not found"
- )
+ raise MediaNotFoundError(f"Playlist with id {db_playlist_id} not found")
if not playlist.is_editable:
raise InvalidDataError(f"Playlist {playlist.name} is not editable")
- for uri in uris:
- job_desc = f"Remove track {uri} from playlist {playlist.uri}"
- self.mass.add_job(
- self.remove_playlist_track(playlist_id, playlist_provider, uri),
- job_desc,
- )
-
- async def remove_playlist_track(
- self, playlist_id: str, playlist_provider: str, track_uri: str
- ) -> None:
- """Remove track from playlist."""
- # we can only edit playlists that are in the database (marked as editable)
- playlist = await self.get(playlist_id, playlist_provider)
- if not playlist:
- raise MediaNotFoundError(
- f"Playlist {playlist_provider}/{playlist_id} not found"
- )
- if not playlist.is_editable:
- raise InvalidDataError(f"Playlist {playlist.name} is not editable")
- # playlist can only have one provider (for now)
- prov_playlist = next(iter(playlist.provider_ids))
- track_ids_to_remove = set()
- track = await self.mass.music.get_item_by_uri(track_uri, lazy=True)
- assert track.media_type == MediaType.TRACK
- # a track can contain multiple versions on the same provider, remove all
- for track_provider in track.provider_ids:
- if track_provider.provider == prov_playlist.provider:
- track_ids_to_remove.add(track_provider.item_id)
- # actually remove the tracks from the playlist on the provider
- if track_ids_to_remove:
- # invalidate cache
- playlist.checksum = str(time.time())
- await self.update_db_playlist(playlist.item_id, playlist)
- provider = self.mass.music.get_provider(prov_playlist.provider)
- return await provider.remove_playlist_tracks(
- prov_playlist.item_id, track_ids_to_remove
+ for prov in playlist.provider_ids:
+ track_ids_to_remove = []
+ for playlist_track in await self.get_provider_playlist_tracks(
+ prov.item_id, prov.provider
+ ):
+ if playlist_track.position not in positions:
+ continue
+ track_ids_to_remove.append(playlist_track.item_id)
+ # actually remove the tracks from the playlist on the provider
+ if track_ids_to_remove:
+ provider = self.mass.music.get_provider(prov.provider)
+ await provider.remove_playlist_tracks(prov.item_id, track_ids_to_remove)
+ # update db
+ for pos in positions:
+ await self.remove_db_playlist_track(db_playlist_id, position=pos)
+ self.mass.signal_event(
+ MassEvent(
+ type=EventType.PLAYLIST_UPDATED, object_id=db_playlist_id, data=playlist
)
+ )
async def get_provider_playlist_tracks(
self, item_id: str, provider_id: str
item_id, MediaType.PLAYLIST, playlist.provider_ids
)
self.logger.debug("updated %s in database: %s", playlist.name, item_id)
- return await self.get_db_item(item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(type=EventType.PLAYLIST_UPDATED, object_id=item_id, data=playlist)
+ )
+ return db_item
async def get_db_playlist_tracks(self, item_id) -> List[Track]:
"""Get playlist tracks for an in-library playlist."""
"playlist_tracks",
{"playlist_id": playlist_id, "track_id": track_id, "position": position},
)
+
+ async def remove_db_playlist_track(
+ self,
+ playlist_id: int,
+ track_id: Optional[int] = None,
+ position: Optional[int] = None,
+ ) -> None:
+ """Remove playlist track from an in-library playlist."""
+ match = {"playlist_id": playlist_id}
+ if track_id is not None:
+ match["track_id"] = track_id
+ if position is not None:
+ match["position"] = position
+ return await self.mass.database.delete(
+ "playlist_tracks",
+ match,
+ )
"-i",
streamdetails.path,
]
- output_args = ["-ss", "30", "-to", "60", "-f", "mp3", "-q:a", "9", "-"]
+ output_args = ["-ss", "30", "-to", "60", "-f", "mp3", "-q:a", "6", "-"]
async with AsyncProcess(input_args + output_args) as proc:
# yield chunks from stdout
async with self.get_db(db) as _db:
sql_query = f"DELETE FROM {table}"
sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
- await _db.execute(sql_query)
+ await _db.execute(sql_query, match)
async def _migrate(self):
"""Perform database migration actions if needed."""
) -> None:
"""Add job to be (slowly) processed in the background (one by one)."""
if not allow_duplicate and name in self._job_names:
- self.logger("Ignoring job %s because it is already in the queue", name)
+ self.logger.debug("Ignored duplicate job: %s", name)
+ job.close()
return
if not name:
name = job.__qualname__ or job.__name__
async def add_playlist_tracks(
self, prov_playlist_id: str, prov_track_ids: List[str]
- ) -> bool:
- """Add track(s) to playlist. Return true on succes."""
+ ) -> None:
+ """Add track(s) to playlist."""
if MediaType.PLAYLIST in self.supported_mediatypes:
raise NotImplementedError
async def remove_playlist_tracks(
self, prov_playlist_id: str, prov_track_ids: List[str]
- ) -> bool:
- """Remove track(s) from playlist. Return true on succes."""
+ ) -> None:
+ """Remove track(s) from playlist."""
if MediaType.PLAYLIST in self.supported_mediatypes:
raise NotImplementedError
)
return result
- async def add_playlist_tracks(self, prov_playlist_id, prov_track_ids):
+ async def add_playlist_tracks(
+ self, prov_playlist_id: str, prov_track_ids: List[str]
+ ) -> None:
"""Add track(s) to playlist."""
params = {
"playlist_id": prov_playlist_id,
}
return await self._get_data("playlist/addTracks", params)
- async def remove_playlist_tracks(self, prov_playlist_id, prov_track_ids):
+ async def remove_playlist_tracks(
+ self, prov_playlist_id: str, prov_track_ids: List[str]
+ ) -> None:
"""Remove track(s) from playlist."""
playlist_track_ids = set()
params = {"playlist_id": prov_playlist_id, "extra": "tracks"}
result = await self._delete_data(f"playlists/{prov_item_id}/followers")
return result
- async def add_playlist_tracks(self, prov_playlist_id, prov_track_ids):
+ async def add_playlist_tracks(
+ self, prov_playlist_id: str, prov_track_ids: List[str]
+ ):
"""Add track(s) to playlist."""
track_uris = []
for track_id in prov_track_ids:
data = {"uris": track_uris}
return await self._post_data(f"playlists/{prov_playlist_id}/tracks", data=data)
- async def remove_playlist_tracks(self, prov_playlist_id, prov_track_ids):
+ async def remove_playlist_tracks(
+ self, prov_playlist_id: str, prov_track_ids: List[str]
+ ) -> None:
"""Remove track(s) from playlist."""
track_uris = []
for track_id in prov_track_ids: