prevent duplicate sync tasks
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 6 Apr 2022 12:01:13 +0000 (14:01 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 6 Apr 2022 12:01:13 +0000 (14:01 +0200)
music_assistant/controllers/music/__init__.py
music_assistant/helpers/util.py

index 9fa0337ec028c167fc8dd0cd47520bbe3c96de92..6012f9ad5589b36ed46f153a48ce3b7840b2adfb 100755 (executable)
@@ -14,7 +14,7 @@ from music_assistant.controllers.music.tracks import TracksController
 from music_assistant.helpers.cache import cached
 from music_assistant.helpers.datetime import utc_timestamp
 from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import create_task
+from music_assistant.helpers.util import create_task, run_periodic
 from music_assistant.models.errors import (
     AlreadyRegisteredError,
     MusicAssistantError,
@@ -48,6 +48,7 @@ class MusicController:
         self.radio = RadioController(mass)
         self.playlists = PlaylistController(mass)
         self._providers: Dict[str, MusicProvider] = {}
+        self._sync_tasks = set()
 
     async def setup(self):
         """Async initialize of module."""
@@ -58,7 +59,7 @@ class MusicController:
         await self.tracks.setup()
         await self.radio.setup()
         await self.playlists.setup()
-        self.__schedule_sync_tasks()
+        create_task(self.__periodic_sync)
 
     @property
     def provider_count(self) -> int:
@@ -94,7 +95,7 @@ class MusicController:
         else:
             self._providers[provider.id] = provider
             self.mass.signal_event(EventType.PROVIDER_REGISTERED, provider)
-            await self.schedule_provider_sync(provider.id)
+            create_task(self.run_provider_sync(provider.id))
 
     async def search(
         self, search_query, media_types: List[MediaType], limit: int = 10
@@ -345,8 +346,8 @@ class MusicController:
                 job_desc,
             )
 
-    async def schedule_provider_sync(self, provider_id: str):
-        """Schedule library sync for a provider."""
+    async def run_provider_sync(self, provider_id: str):
+        """Run library sync for a provider."""
         provider = self.get_provider(provider_id)
         if not provider:
             return
@@ -363,50 +364,53 @@ class MusicController:
         self, media_type: MediaType, provider_id: str
     ) -> None:
         """Sync library items for given provider."""
-        music_provider = self.get_provider(provider_id)
-        if not music_provider or not music_provider.available:
-            return
-        controller = self._get_controller(media_type)
-        # create a set of all previous and current db id's
-        prev_ids = set()
-        for db_item in await controller.library():
-            for prov_id in db_item.provider_ids:
-                if prov_id.provider == provider_id:
-                    prev_ids.add(db_item.item_id)
-        cur_ids = set()
-        for prov_item in await music_provider.get_library_items(media_type):
-            prov_item: MediaItemType = prov_item
-            db_item: MediaItemType = await controller.get_db_item_by_prov_id(
-                prov_item.provider, prov_item.item_id
-            )
-            if not db_item and media_type == MediaType.ARTIST:
-                # for artists we need a fully matched item (with musicbrainz id)
-                db_item = await controller.get(
-                    prov_item.item_id, prov_item.provider, details=prov_item
-                )
-            elif not db_item:
-                # for other mediatypes its enough to simply dump the item in the db
-                db_item = await controller.add_db_item(prov_item)
-            cur_ids.add(db_item.item_id)
-            if not db_item.in_library:
-                await controller.set_db_library(db_item.item_id, True)
-            # sync album tracks
-            if media_type == MediaType.ALBUM:
-                self.mass.add_job(
-                    self._sync_album_tracks(db_item),
-                    f"Sync album tracks for album {db_item.name}",
-                )
-            # sync playlist tracks
-            if media_type == MediaType.PLAYLIST:
-                self.mass.add_job(
-                    self._sync_playlist_tracks(db_item),
-                    f"Sync playlist tracks for playlist {db_item.name}",
+        sync_id = f"{media_type.value}.{provider_id}"
+        if sync_id in self._sync_tasks:
+            self.logger.debug("Abort sync task %s because its already running", sync_id)
+            return  # already running
+        self._sync_tasks.add(sync_id)
+        try:
+            music_provider = self.get_provider(provider_id)
+            if not music_provider or not music_provider.available:
+                return
+            controller = self._get_controller(media_type)
+            # create a set of all previous and current db id's
+            prev_ids = set()
+            for db_item in await controller.library():
+                for prov_id in db_item.provider_ids:
+                    if prov_id.provider == provider_id:
+                        prev_ids.add(db_item.item_id)
+            cur_ids = set()
+            for prov_item in await music_provider.get_library_items(media_type):
+                prov_item: MediaItemType = prov_item
+                db_item: MediaItemType = await controller.get_db_item_by_prov_id(
+                    prov_item.provider, prov_item.item_id
                 )
-
-        # process deletions
-        for item_id in prev_ids:
-            if item_id not in cur_ids:
-                await controller.set_db_library(item_id, False)
+                if not db_item and media_type == MediaType.ARTIST:
+                    # for artists we need a fully matched item (with musicbrainz id)
+                    db_item = await controller.get(
+                        prov_item.item_id, prov_item.provider, details=prov_item
+                    )
+                elif not db_item:
+                    # for other mediatypes its enough to simply dump the item in the db
+                    db_item = await controller.add_db_item(prov_item)
+                cur_ids.add(db_item.item_id)
+                if not db_item.in_library:
+                    await controller.set_db_library(db_item.item_id, True)
+                # sync album tracks
+                if media_type == MediaType.ALBUM:
+                    await self._sync_album_tracks(db_item)
+                # sync playlist tracks
+                if media_type == MediaType.PLAYLIST:
+                    await self._sync_playlist_tracks(db_item)
+
+            # process deletions
+            for item_id in prev_ids:
+                if item_id not in cur_ids:
+                    await controller.set_db_library(item_id, False)
+
+        finally:
+            self._sync_tasks.remove(sync_id)
 
     async def _sync_album_tracks(self, db_album: Album) -> None:
         """Store album tracks of in-library album in database."""
@@ -492,9 +496,8 @@ class MusicController:
                     UNIQUE(item_id, provider));"""
             )
 
-    def __schedule_sync_tasks(self):
-        """Schedule the sync tasks."""
+    @run_periodic(3 * 3600, True)
+    async def __periodic_sync(self):
+        """Periodically sync all providers."""
         for prov in self.providers:
-            create_task(self.schedule_provider_sync(prov.id))
-        # reschedule self
-        self.mass.loop.call_later(3 * 3600, self.__schedule_sync_tasks)
+            await self.run_provider_sync(prov.id)
index 43a59913613df099f5ef32765ba77483fc387781..f314e660eeb0195bafc339befd89b016ce29afad 100755 (executable)
@@ -66,14 +66,18 @@ def create_task(
     return loop.create_task(executor_wrapper(target, *args, **kwargs))
 
 
-def run_periodic(period):
+def run_periodic(delay: float, later: bool = False):
     """Run a coroutine at interval."""
 
     def scheduler(fcn):
         async def wrapper(*args, **kwargs):
             while True:
-                asyncio.create_task(fcn(*args, **kwargs))
-                await asyncio.sleep(period)
+                if later:
+                    await asyncio.sleep(delay)
+                    await fcn(*args, **kwargs)
+                else:
+                    await fcn(*args, **kwargs)
+                    await asyncio.sleep(delay)
 
         return wrapper