from music_assistant.helpers.cache import cached
from music_assistant.helpers.datetime import utc_timestamp
from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import create_task
+from music_assistant.helpers.util import create_task, run_periodic
from music_assistant.models.errors import (
AlreadyRegisteredError,
MusicAssistantError,
self.radio = RadioController(mass)
self.playlists = PlaylistController(mass)
self._providers: Dict[str, MusicProvider] = {}
+ self._sync_tasks = set()
async def setup(self):
"""Async initialize of module."""
await self.tracks.setup()
await self.radio.setup()
await self.playlists.setup()
- self.__schedule_sync_tasks()
+ create_task(self.__periodic_sync)
@property
def provider_count(self) -> int:
else:
self._providers[provider.id] = provider
self.mass.signal_event(EventType.PROVIDER_REGISTERED, provider)
- await self.schedule_provider_sync(provider.id)
+ create_task(self.run_provider_sync(provider.id))
async def search(
self, search_query, media_types: List[MediaType], limit: int = 10
job_desc,
)
- async def schedule_provider_sync(self, provider_id: str):
- """Schedule library sync for a provider."""
+ async def run_provider_sync(self, provider_id: str):
+ """Run library sync for a provider."""
provider = self.get_provider(provider_id)
if not provider:
return
self, media_type: MediaType, provider_id: str
) -> None:
"""Sync library items for given provider."""
- music_provider = self.get_provider(provider_id)
- if not music_provider or not music_provider.available:
- return
- controller = self._get_controller(media_type)
- # create a set of all previous and current db id's
- prev_ids = set()
- for db_item in await controller.library():
- for prov_id in db_item.provider_ids:
- if prov_id.provider == provider_id:
- prev_ids.add(db_item.item_id)
- cur_ids = set()
- for prov_item in await music_provider.get_library_items(media_type):
- prov_item: MediaItemType = prov_item
- db_item: MediaItemType = await controller.get_db_item_by_prov_id(
- prov_item.provider, prov_item.item_id
- )
- if not db_item and media_type == MediaType.ARTIST:
- # for artists we need a fully matched item (with musicbrainz id)
- db_item = await controller.get(
- prov_item.item_id, prov_item.provider, details=prov_item
- )
- elif not db_item:
- # for other mediatypes its enough to simply dump the item in the db
- db_item = await controller.add_db_item(prov_item)
- cur_ids.add(db_item.item_id)
- if not db_item.in_library:
- await controller.set_db_library(db_item.item_id, True)
- # sync album tracks
- if media_type == MediaType.ALBUM:
- self.mass.add_job(
- self._sync_album_tracks(db_item),
- f"Sync album tracks for album {db_item.name}",
- )
- # sync playlist tracks
- if media_type == MediaType.PLAYLIST:
- self.mass.add_job(
- self._sync_playlist_tracks(db_item),
- f"Sync playlist tracks for playlist {db_item.name}",
+ sync_id = f"{media_type.value}.{provider_id}"
+ if sync_id in self._sync_tasks:
+ self.logger.debug("Abort sync task %s because its already running", sync_id)
+ return # already running
+ self._sync_tasks.add(sync_id)
+ try:
+ music_provider = self.get_provider(provider_id)
+ if not music_provider or not music_provider.available:
+ return
+ controller = self._get_controller(media_type)
+ # create a set of all previous and current db id's
+ prev_ids = set()
+ for db_item in await controller.library():
+ for prov_id in db_item.provider_ids:
+ if prov_id.provider == provider_id:
+ prev_ids.add(db_item.item_id)
+ cur_ids = set()
+ for prov_item in await music_provider.get_library_items(media_type):
+ prov_item: MediaItemType = prov_item
+ db_item: MediaItemType = await controller.get_db_item_by_prov_id(
+ prov_item.provider, prov_item.item_id
)
-
- # process deletions
- for item_id in prev_ids:
- if item_id not in cur_ids:
- await controller.set_db_library(item_id, False)
+ if not db_item and media_type == MediaType.ARTIST:
+ # for artists we need a fully matched item (with musicbrainz id)
+ db_item = await controller.get(
+ prov_item.item_id, prov_item.provider, details=prov_item
+ )
+ elif not db_item:
+ # for other mediatypes its enough to simply dump the item in the db
+ db_item = await controller.add_db_item(prov_item)
+ cur_ids.add(db_item.item_id)
+ if not db_item.in_library:
+ await controller.set_db_library(db_item.item_id, True)
+ # sync album tracks
+ if media_type == MediaType.ALBUM:
+ await self._sync_album_tracks(db_item)
+ # sync playlist tracks
+ if media_type == MediaType.PLAYLIST:
+ await self._sync_playlist_tracks(db_item)
+
+ # process deletions
+ for item_id in prev_ids:
+ if item_id not in cur_ids:
+ await controller.set_db_library(item_id, False)
+
+ finally:
+ self._sync_tasks.remove(sync_id)
async def _sync_album_tracks(self, db_album: Album) -> None:
"""Store album tracks of in-library album in database."""
UNIQUE(item_id, provider));"""
)
- def __schedule_sync_tasks(self):
- """Schedule the sync tasks."""
+ @run_periodic(3 * 3600, True)
+ async def __periodic_sync(self):
+ """Periodically sync all providers."""
for prov in self.providers:
- create_task(self.schedule_provider_sync(prov.id))
- # reschedule self
- self.mass.loop.call_later(3 * 3600, self.__schedule_sync_tasks)
+ await self.run_provider_sync(prov.id)