From: Marcel van der Veldt Date: Wed, 6 Apr 2022 12:01:13 +0000 (+0200) Subject: prevent duplicate sync tasks X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=c91018908b0c5d03ac29ce4a5e2aaabaf30a2eee;p=music-assistant-server.git prevent duplicate sync tasks --- diff --git a/music_assistant/controllers/music/__init__.py b/music_assistant/controllers/music/__init__.py index 9fa0337e..6012f9ad 100755 --- a/music_assistant/controllers/music/__init__.py +++ b/music_assistant/controllers/music/__init__.py @@ -14,7 +14,7 @@ from music_assistant.controllers.music.tracks import TracksController from music_assistant.helpers.cache import cached from music_assistant.helpers.datetime import utc_timestamp from music_assistant.helpers.typing import MusicAssistant -from music_assistant.helpers.util import create_task +from music_assistant.helpers.util import create_task, run_periodic from music_assistant.models.errors import ( AlreadyRegisteredError, MusicAssistantError, @@ -48,6 +48,7 @@ class MusicController: self.radio = RadioController(mass) self.playlists = PlaylistController(mass) self._providers: Dict[str, MusicProvider] = {} + self._sync_tasks = set() async def setup(self): """Async initialize of module.""" @@ -58,7 +59,7 @@ class MusicController: await self.tracks.setup() await self.radio.setup() await self.playlists.setup() - self.__schedule_sync_tasks() + create_task(self.__periodic_sync) @property def provider_count(self) -> int: @@ -94,7 +95,7 @@ class MusicController: else: self._providers[provider.id] = provider self.mass.signal_event(EventType.PROVIDER_REGISTERED, provider) - await self.schedule_provider_sync(provider.id) + create_task(self.run_provider_sync(provider.id)) async def search( self, search_query, media_types: List[MediaType], limit: int = 10 @@ -345,8 +346,8 @@ class MusicController: job_desc, ) - async def schedule_provider_sync(self, provider_id: str): - """Schedule library sync for a provider.""" + async def run_provider_sync(self, provider_id: str): + """Run library sync for a provider.""" provider = self.get_provider(provider_id) if not provider: return @@ -363,50 +364,53 @@ class MusicController: self, media_type: MediaType, provider_id: str ) -> None: """Sync library items for given provider.""" - music_provider = self.get_provider(provider_id) - if not music_provider or not music_provider.available: - return - controller = self._get_controller(media_type) - # create a set of all previous and current db id's - prev_ids = set() - for db_item in await controller.library(): - for prov_id in db_item.provider_ids: - if prov_id.provider == provider_id: - prev_ids.add(db_item.item_id) - cur_ids = set() - for prov_item in await music_provider.get_library_items(media_type): - prov_item: MediaItemType = prov_item - db_item: MediaItemType = await controller.get_db_item_by_prov_id( - prov_item.provider, prov_item.item_id - ) - if not db_item and media_type == MediaType.ARTIST: - # for artists we need a fully matched item (with musicbrainz id) - db_item = await controller.get( - prov_item.item_id, prov_item.provider, details=prov_item - ) - elif not db_item: - # for other mediatypes its enough to simply dump the item in the db - db_item = await controller.add_db_item(prov_item) - cur_ids.add(db_item.item_id) - if not db_item.in_library: - await controller.set_db_library(db_item.item_id, True) - # sync album tracks - if media_type == MediaType.ALBUM: - self.mass.add_job( - self._sync_album_tracks(db_item), - f"Sync album tracks for album {db_item.name}", - ) - # sync playlist tracks - if media_type == MediaType.PLAYLIST: - self.mass.add_job( - self._sync_playlist_tracks(db_item), - f"Sync playlist tracks for playlist {db_item.name}", + sync_id = f"{media_type.value}.{provider_id}" + if sync_id in self._sync_tasks: + self.logger.debug("Abort sync task %s because its already running", sync_id) + return # already running + self._sync_tasks.add(sync_id) + try: + music_provider = self.get_provider(provider_id) + if not music_provider or not music_provider.available: + return + controller = self._get_controller(media_type) + # create a set of all previous and current db id's + prev_ids = set() + for db_item in await controller.library(): + for prov_id in db_item.provider_ids: + if prov_id.provider == provider_id: + prev_ids.add(db_item.item_id) + cur_ids = set() + for prov_item in await music_provider.get_library_items(media_type): + prov_item: MediaItemType = prov_item + db_item: MediaItemType = await controller.get_db_item_by_prov_id( + prov_item.provider, prov_item.item_id ) - - # process deletions - for item_id in prev_ids: - if item_id not in cur_ids: - await controller.set_db_library(item_id, False) + if not db_item and media_type == MediaType.ARTIST: + # for artists we need a fully matched item (with musicbrainz id) + db_item = await controller.get( + prov_item.item_id, prov_item.provider, details=prov_item + ) + elif not db_item: + # for other mediatypes its enough to simply dump the item in the db + db_item = await controller.add_db_item(prov_item) + cur_ids.add(db_item.item_id) + if not db_item.in_library: + await controller.set_db_library(db_item.item_id, True) + # sync album tracks + if media_type == MediaType.ALBUM: + await self._sync_album_tracks(db_item) + # sync playlist tracks + if media_type == MediaType.PLAYLIST: + await self._sync_playlist_tracks(db_item) + + # process deletions + for item_id in prev_ids: + if item_id not in cur_ids: + await controller.set_db_library(item_id, False) + + finally: + self._sync_tasks.remove(sync_id) async def _sync_album_tracks(self, db_album: Album) -> None: """Store album tracks of in-library album in database.""" @@ -492,9 +496,8 @@ class MusicController: UNIQUE(item_id, provider));""" ) - def __schedule_sync_tasks(self): - """Schedule the sync tasks.""" + @run_periodic(3 * 3600, True) + async def __periodic_sync(self): + """Periodically sync all providers.""" for prov in self.providers: - create_task(self.schedule_provider_sync(prov.id)) - # reschedule self - self.mass.loop.call_later(3 * 3600, self.__schedule_sync_tasks) + await self.run_provider_sync(prov.id) diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index 43a59913..f314e660 100755 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -66,14 +66,18 @@ def create_task( return loop.create_task(executor_wrapper(target, *args, **kwargs)) -def run_periodic(period): +def run_periodic(delay: float, later: bool = False): """Run a coroutine at interval.""" def scheduler(fcn): async def wrapper(*args, **kwargs): while True: - asyncio.create_task(fcn(*args, **kwargs)) - await asyncio.sleep(period) + if later: + await asyncio.sleep(delay) + await fcn(*args, **kwargs) + else: + await fcn(*args, **kwargs) + await asyncio.sleep(delay) return wrapper