from collections import deque
from functools import partial
from time import time
+from tkinter import NONE
from types import TracebackType
from typing import Any, Callable, Coroutine, Deque, List, Optional, Tuple, Type, Union
from uuid import uuid4
def add_job(
self, coro: Coroutine, name: Optional[str] = None, allow_duplicate=False
- ) -> None:
+ ) -> BackgroundJob:
"""Add job to be (slowly) processed in the background."""
if not allow_duplicate:
- # pylint: disable=protected-access
- if any(x for x in self._jobs if x.name == name):
+ if existing := next((x for x in self._jobs if x.name == name), NONE):
self.logger.debug("Ignored duplicate job: %s", name)
coro.close()
- return
+ return existing
if not name:
name = coro.__qualname__ or coro.__name__
job = BackgroundJob(str(uuid4()), name=name, coro=coro)
self._jobs.append(job)
self._jobs_event.set()
self.signal_event(MassEvent(EventType.BACKGROUND_JOB_UPDATED, data=job))
+ return job
def create_task(
self,
exc_info=err,
)
else:
+ job.result = task.result()
job.status = JobStatus.FINISHED
self.logger.info(
"Finished job [%s] in %s seconds.", job.name, execution_time
)
self._jobs.remove(job)
self._jobs_event.set()
+ # mark job as done
+ job.done()
self.signal_event(MassEvent(EventType.BACKGROUND_JOB_UPDATED, data=job))
async def __aenter__(self) -> "MusicAssistant":
"""Model for a Background Job."""
-from dataclasses import dataclass
+import asyncio
+from dataclasses import dataclass, field
from time import time
-from typing import Coroutine
+from typing import Any, Coroutine
from music_assistant.models.enums import JobStatus
name: str
timestamp: float = time()
status: JobStatus = JobStatus.PENDING
+ result: Any = None
+ _evt: asyncio.Event = field(init=False, default_factory=asyncio.Event)
def to_dict(self):
"""Return serializable dict from object."""
"timestamp": self.status.value,
"status": self.status.value,
}
+
+ async def wait(self) -> None:
+ """Wait for the job to complete."""
+ await self._evt.wait()
+
+ def done(self) -> None:
+ """Mark job as done."""
+ self._evt.set()
provider_id=provider_id,
)
if db_item and (time() - db_item.last_refresh) > REFRESH_INTERVAL:
+ # it's been too long since the full metadata was last retrieved (or never at all)
force_refresh = True
if db_item and force_refresh:
+ # get (first) provider item id belonging to this db item
provider_id, provider_item_id = await self.get_provider_id(db_item)
elif db_item:
+ # we have a db item and no refreshing is needed, return the results!
return db_item
if not details and provider_id:
+ # no details provider nor in db, fetch them from the provider
details = await self.get_provider_item(provider_item_id, provider_id)
if not details and provider:
# check providers for given provider type one by one
else:
break
if not details:
+ # we couldn't get a match from any of the providers, raise error
raise MediaNotFoundError(
f"Item not found: {provider.value or provider_id}/{provider_item_id}"
)
+ # create job to add the item to the db, including matching metadata etc. takes some time
+ # in 99% of the cases we just return lazy because we want the details as fast as possible
+ # only if we really need to wait for the result (e.g. to prevent race conditions), we
+ # can set lazy to false and we await to job to complete.
+ add_job = self.mass.add_job(self.add(details), f"Add {details.uri} to database")
if not lazy:
- return await self.add(details)
- self.mass.add_job(self.add(details), f"Add {details.uri} to database")
+ await add_job.wait()
+ return add_job.result
+
return db_item if db_item else details
async def search(
) -> None:
"""Add an item to the library."""
# make sure we have a valid full item
+ # note that we set 'lazy' to False because we need a full db item
db_item = await self.get(
provider_item_id, provider=provider, provider_id=provider_id, lazy=False
)
) -> None:
"""Remove item from the library."""
# make sure we have a valid full item
+ # note that we set 'lazy' to False because we need a full db item
db_item = await self.get(
provider_item_id, provider=provider, provider_id=provider_id, lazy=False
)
- # add to provider's libraries
+ # remove from provider's libraries
for prov_id in db_item.provider_ids:
if prov := self.mass.music.get_provider(prov_id.prov_id):
await prov.library_remove(prov_id.item_id, self.media_type)