async with MusicAssistant(mass_conf) as mass:
# start sync
- await mass.music.start_sync()
+ await mass.music.start_sync(schedule=3)
# get some data
artists = await mass.music.artists.count()
await mass.setup()
# start sync
- await mass.music.start_sync()
+ await mass.music.start_sync(schedule=3)
# get some data
await mass.music.artists.library()
for prov_conf in self.mass.config.providers:
prov_cls = PROV_MAP[prov_conf.type]
await self._register_provider(prov_cls(self.mass, prov_conf), prov_conf)
- # TODO: handle deletion of providers ?
- async def start_sync(self, schedule: Optional[float] = 3) -> None:
+ async def start_sync(
+ self,
+ media_types: Optional[Tuple[MediaType]] = None,
+ prov_types: Optional[Tuple[ProviderType]] = None,
+ schedule: Optional[float] = None,
+ ) -> None:
"""
Start running the sync of all registred providers.
- :param schedule: schedule syncjob every X hours, set to None for just a manual sync run.
+ media_types: only sync these media types. None for all.
+ prov_types: only sync these provider types. None for all.
+ schedule: schedule syncjob every X hours, set to None for just a manual sync run.
"""
async def do_sync():
while True:
for prov in self.providers:
+ if prov_types is not None and prov.type not in prov_types:
+ continue
self.mass.add_job(
- prov.sync_library(),
+ prov.sync_library(media_types),
f"Library sync for provider {prov.name}",
allow_duplicate=False,
)
from typing import AsyncGenerator, List, Optional, Set, Tuple
import aiofiles
-import aiofiles.ospath as aiopath
import xmltodict
+from aiofiles.os import wrap
from aiofiles.threadpool.binary import AsyncFileIO
from tinytag.tinytag import TinyTag
async def setup(self) -> bool:
"""Handle async initialization of the provider."""
- if not await aiopath.isdir(self.config.path):
+ isdir = wrap(os.path.exists)
+
+ if not await isdir(self.config.path):
raise MediaNotFoundError(
f"Music Directory {self.config.path} does not exist"
)
result += playlists
return result
- async def sync_library(self) -> None:
+ async def sync_library(
+ self, media_types: Optional[Tuple[MediaType]] = None
+ ) -> None:
"""Run library sync for this provider."""
cache_key = f"{self.id}.checksums"
prev_checksums = await self.mass.cache.get(cache_key)
playlist_path = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
- mtime = await aiopath.getmtime(playlist_path)
+ getmtime = wrap(os.path.getmtime)
+ mtime = await getmtime(playlist_path)
checksum = f"{SCHEMA_VERSION}.{int(mtime)}"
cache_key = f"playlist_{self.id}_tracks_{prov_playlist_id}"
if cache := await self.mass.cache.get(cache_key, checksum):
# ensure we have a full path and not relative
if self.config.path not in file_path:
file_path = os.path.join(self.config.path, file_path)
- return await aiopath.exists(file_path)
+ _exists = wrap(os.path.exists)
+ return await _exists(file_path)
@asynccontextmanager
async def open_file(self, file_path: str, mode="rb") -> AsyncFileIO:
mass.signal_event(
MassEvent(
EventType.STREAM_ENDED,
- object_id=streamdetails.provider,
+ object_id=streamdetails.provider.value,
data=streamdetails,
)
)
continue
cur_val = getattr(self, fld.name)
if isinstance(cur_val, list):
- merge_lists(cur_val, new_val)
+ new_val = merge_lists(cur_val, new_val)
+ setattr(self, fld.name, new_val)
elif isinstance(cur_val, set):
- cur_val.update(new_val)
+ new_val = cur_val.update(new_val)
+ setattr(self, fld.name, new_val)
elif cur_val is None or allow_overwrite:
setattr(self, fld.name, new_val)
return self
async def clear(self) -> None:
"""Clear all items in the queue."""
- await self.stop()
+ if self.player.state not in (PlayerState.IDLE, PlayerState.OFF):
+ await self.stop()
await self.update([])
def on_player_update(self) -> None:
from __future__ import annotations
from abc import abstractmethod
-from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, List, Optional
+from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, List, Optional, Tuple
from music_assistant.models.config import MusicProviderConfig
from music_assistant.models.enums import MediaType, ProviderType
if media_type == MediaType.RADIO:
return await self.get_radio(prov_item_id)
- async def sync_library(self) -> None:
+ async def sync_library(
+ self, media_types: Optional[Tuple[MediaType]] = None
+ ) -> None:
"""Run library sync for this provider."""
# this reference implementation can be overridden with provider specific approach
# this logic is aimed at streaming/online providers,
# filesystem implementation(s) just override this.
async with self.mass.database.get_db() as db:
for media_type in self.supported_mediatypes:
+ if media_types is not None and media_type not in media_types:
+ continue
self.logger.debug("Start sync of %s items.", media_type.value)
controller = self.mass.music.get_controller(media_type)
async-timeout>=3.0,<=4.0.2
aiohttp>=3.7.0,>=3.8.1
asyncio-throttle>=1.0,<=1.0.2
-aiofiles>=0.8.0
+aiofiles>=0.7.0,<=0.8.5
databases>=0.5,<=0.5.5
aiosqlite>=0.13,<=0.17
python-slugify>=4.0,<6.1.3