import logging
import os
import os.path
+import time
+from collections.abc import Iterator
from typing import TYPE_CHECKING, cast
import aiofiles
import shortuuid
import xmltodict
from aiofiles.os import wrap
-from music_assistant_models.config_entries import (
- ConfigEntry,
- ConfigValueOption,
- ConfigValueType,
-)
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
from music_assistant_models.enums import (
ConfigEntryType,
ContentType,
ProviderFeature,
StreamType,
)
-from music_assistant_models.errors import (
- MediaNotFoundError,
- MusicAssistantError,
- SetupFailedError,
-)
+from music_assistant_models.errors import MediaNotFoundError, MusicAssistantError, SetupFailedError
from music_assistant_models.media_items import (
Album,
Artist,
)
from music_assistant.helpers.compare import compare_strings, create_safe_string
from music_assistant.helpers.playlists import parse_m3u, parse_pls
-from music_assistant.helpers.tags import AudioTags, parse_tags, split_items
-from music_assistant.helpers.util import TaskManager, parse_title_and_version
+from music_assistant.helpers.tags import AudioTags, async_parse_tags, parse_tags, split_items
+from music_assistant.helpers.util import parse_title_and_version
from music_assistant.models.music_provider import MusicProvider
from .helpers import (
+ IGNORE_DIRS,
FileSystemItem,
get_absolute_path,
get_album_dir,
)
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator
-
from music_assistant_models.config_entries import ProviderConfig
from music_assistant_models.provider import ProviderManifest
ProviderFeature.SEARCH,
}
-listdir = wrap(os.listdir)
isdir = wrap(os.path.isdir)
isfile = wrap(os.path.isfile)
exists = wrap(os.path.exists)
makedirs = wrap(os.makedirs)
+scandir = wrap(os.scandir)
async def setup(
base_path: str
write_access: bool = False
- scan_limiter = asyncio.Semaphore(25)
+ sync_running: bool = False
@property
def supported_features(self) -> set[ProviderFeature]:
item_path = path.split("://", 1)[1]
if not item_path:
item_path = ""
- async for item in self.listdir(item_path, recursive=False, sort=True):
+ abs_path = self.get_absolute_path(item_path)
+ for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=True):
if not item.is_dir and ("." not in item.filename or not item.ext):
# skip system files and files without extension
continue
if item.is_dir:
items.append(
BrowseFolder(
- item_id=item.path,
+ item_id=item.relative_path,
provider=self.instance_id,
- path=f"{self.instance_id}://{item.path}",
+ path=f"{self.instance_id}://{item.relative_path}",
name=item.filename,
)
)
items.append(
ItemMapping(
media_type=MediaType.TRACK,
- item_id=item.path,
+ item_id=item.relative_path,
provider=self.instance_id,
name=item.filename,
)
items.append(
ItemMapping(
media_type=MediaType.PLAYLIST,
- item_id=item.path,
+ item_id=item.relative_path,
provider=self.instance_id,
name=item.filename,
)
async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
"""Run library sync for this provider."""
assert self.mass.music.database
+ start_time = time.time()
+ if self.sync_running:
+ self.logger.warning("Library sync already running for %s", self.name)
+ return
+ self.logger.info(
+ "Started Library sync for %s",
+ self.name,
+ )
file_checksums: dict[str, str] = {}
query = (
f"SELECT provider_item_id, details FROM {DB_TABLE_PROVIDER_MAPPINGS} "
# we work bottom up, as-in we derive all info from the tracks
cur_filenames = set()
prev_filenames = set(file_checksums.keys())
- async with TaskManager(self.mass, 25) as tm:
- async for item in self.listdir("", recursive=True, sort=False):
- if "." not in item.filename or not item.ext:
- # skip system files and files without extension
- continue
- if item.ext not in SUPPORTED_EXTENSIONS:
- # unsupported file extension
+ # NOTE: we do the entire traversal of the directory structure, including parsing tags,
+ # in a single executor thread to save the overhead of having to spin up tons of tasks
+        def listdir(path: str) -> Iterator[FileSystemItem]:
+            """Recursively yield the files found below ``path``.
+
+            Entries whose name is in IGNORE_DIRS or starts with "." or "_" are
+            skipped, as are files without an extension. Symlinked entries match
+            neither branch (both checks use ``follow_symlinks=False``) and are
+            therefore skipped too.
+
+            An OSError raised while traversing a subfolder is logged and that
+            subfolder is skipped; an error on the top-level ``path`` itself is
+            propagated to the caller.
+            """
+            # the context manager releases the directory handle deterministically
+            with os.scandir(path) as entries:
+                for item in entries:
+                    # ignore invalid filenames
+                    if item.name in IGNORE_DIRS or item.name.startswith((".", "_")):
+                        continue
+                    if item.is_dir(follow_symlinks=False):
+                        try:
+                            yield from listdir(item.path)
+                        except OSError as err:
+                            # one unreadable subfolder must not abort the whole sync
+                            self.logger.warning("Skip folder %s: %s", item.path, str(err))
+                    elif item.is_file(follow_symlinks=False):
+                        # skip files without extension
+                        if "." not in item.name:
+                            continue
+                        yield FileSystemItem.from_dir_entry(item, self.base_path)
+
+ def run_sync() -> None:
+ """Run the actual sync (in an executor job)."""
+ self.sync_running = True
+ try:
+ for item in listdir(self.base_path):
+ if item.ext not in SUPPORTED_EXTENSIONS:
+ # unsupported file extension
+ continue
- cur_filenames.add(item.path)
+ cur_filenames.add(item.relative_path)
- # continue if the item did not change (checksum still the same)
- prev_checksum = file_checksums.get(item.path)
- if item.checksum == prev_checksum:
- continue
+ # continue if the item did not change (checksum still the same)
+ prev_checksum = file_checksums.get(item.relative_path)
+ if item.checksum == prev_checksum:
+ continue
+ self._process_item(item, prev_checksum)
+ finally:
+ self.sync_running = False
- await tm.create_task_with_limit(self._process_item(item, prev_checksum))
+ await asyncio.to_thread(run_sync)
+ end_time = time.time()
+ self.logger.info(
+ "Library sync for %s completed in %.2f seconds",
+ self.name,
+ end_time - start_time,
+ )
# work out deletions
deleted_files = prev_filenames - cur_filenames
await self._process_deletions(deleted_files)
# process orphaned albums and artists
await self._process_orphaned_albums_and_artists()
- async def _process_item(self, item: FileSystemItem, prev_checksum: str | None) -> None:
- """Process a single item."""
+ def _process_item(self, item: FileSystemItem, prev_checksum: str | None) -> None:
+ """Process a single item (blocking: call from an executor thread, never the event loop)."""
try:
- self.logger.debug("Processing: %s", item.path)
+ self.logger.debug("Processing: %s", item.relative_path)
if item.ext in TRACK_EXTENSIONS:
- # add/update track to db
- # note that filesystem items are always overwriting existing info
- # when they are detected as changed
- track = await self._parse_track(item)
- await self.mass.music.tracks.add_item_to_library(
- track, overwrite_existing=prev_checksum is not None
- )
- elif item.ext in PLAYLIST_EXTENSIONS:
- playlist = await self.get_playlist(item.path)
- # add/update] playlist to db
- playlist.cache_checksum = item.checksum
- # playlist is always favorite
- playlist.favorite = True
- await self.mass.music.playlists.add_item_to_library(
- playlist,
- overwrite_existing=prev_checksum is not None,
- )
+ # handle track item
+ tags = parse_tags(item.absolute_path, item.file_size)
+
+ async def process_track() -> None:
+ track = await self._parse_track(item, tags)
+ # add/update track to db
+ # note that filesystem items are always overwriting existing info
+ # when they are detected as changed
+ await self.mass.music.tracks.add_item_to_library(
+ track, overwrite_existing=prev_checksum is not None
+ )
+
+ asyncio.run_coroutine_threadsafe(process_track(), self.mass.loop).result()
+ return
+
+ if item.ext in PLAYLIST_EXTENSIONS:
+
+ async def process_playlist() -> None:
+ playlist = await self.get_playlist(item.relative_path)
+ # add/update playlist to db
+ playlist.cache_checksum = item.checksum
+ # playlist is always favorite
+ playlist.favorite = True
+ await self.mass.music.playlists.add_item_to_library(
+ playlist,
+ overwrite_existing=prev_checksum is not None,
+ )
+
+ asyncio.run_coroutine_threadsafe(process_playlist(), self.mass.loop).result()
+ return
+
except Exception as err:
# we don't want the whole sync to crash on one file so we catch all exceptions here
self.logger.error(
"Error processing %s - %s",
- item.path,
+ item.relative_path,
str(err),
exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
)
for prov_mapping in track.provider_mappings:
if prov_mapping.provider_instance == self.instance_id:
file_item = await self.resolve(prov_mapping.item_id)
- full_track = await self._parse_track(file_item)
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ full_track = await self._parse_track(file_item, tags)
assert isinstance(full_track.album, Album)
return full_track.album
msg = f"Album not found: {prov_album_id}"
raise MediaNotFoundError(msg)
file_item = await self.resolve(prov_track_id)
- return await self._parse_track(file_item, full_album_metadata=True)
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ return await self._parse_track(file_item, tags=tags, full_album_metadata=True)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
file_item = await self.resolve(prov_playlist_id)
playlist = Playlist(
- item_id=file_item.path,
+ item_id=file_item.relative_path,
provider=self.instance_id,
name=file_item.name,
provider_mappings={
ProviderMapping(
- item_id=file_item.path,
+ item_id=file_item.relative_path,
provider_domain=self.domain,
provider_instance=self.instance_id,
details=file_item.checksum,
# try to resolve the filename
for filename in (line, os.path.join(playlist_path, line)):
with contextlib.suppress(FileNotFoundError):
- item = await self.resolve(filename)
- return await self._parse_track(item)
+ file_item = await self.resolve(filename)
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ return await self._parse_track(file_item, tags)
except MusicAssistantError as err:
self.logger.warning("Could not parse uri/file %s to track: %s", line, str(err))
if library_item is None:
# this could be a file that has just been added, try parsing it
file_item = await self.resolve(item_id)
- if not (library_item := await self._parse_track(file_item)):
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ if not (library_item := await self._parse_track(file_item, tags)):
msg = f"Item not found: {item_id}"
raise MediaNotFoundError(msg)
return file_item.absolute_path
async def _parse_track(
- self, file_item: FileSystemItem, full_album_metadata: bool = False
+ self, file_item: FileSystemItem, tags: AudioTags, full_album_metadata: bool = False
) -> Track:
- """Get full track details by id."""
+ """Build a Track from a file item and its already-parsed tags."""
# ruff: noqa: PLR0915, PLR0912
-
- # parse tags
- tags = await parse_tags(file_item.absolute_path, file_item.file_size)
name, version = parse_title_and_version(tags.title, tags.version)
track = Track(
- item_id=file_item.path,
+ item_id=file_item.relative_path,
provider=self.instance_id,
name=name,
sort_name=tags.title_sort,
version=version,
provider_mappings={
ProviderMapping(
- item_id=file_item.path,
+ item_id=file_item.relative_path,
provider_domain=self.domain,
provider_instance=self.instance_id,
audio_format=AudioFormat(
# album
album = track.album = (
- await self._parse_album(track_path=file_item.path, track_tags=tags)
+ await self._parse_album(track_path=file_item.relative_path, track_tags=tags)
if tags.album
else None
)
[
MediaItemImage(
type=ImageType.THUMB,
- path=file_item.path,
+ path=file_item.relative_path,
provider=self.instance_id,
remotely_accessible=False,
)
# handle (optional) loudness measurement tag(s)
if tags.track_loudness is not None:
- await self.mass.music.set_loudness(
- track.item_id,
- self.instance_id,
- tags.track_loudness,
- tags.track_album_loudness,
+ self.mass.create_task(
+ self.mass.music.set_loudness(
+ track.item_id,
+ self.instance_id,
+ tags.track_loudness,
+ tags.track_album_loudness,
+ )
)
return track
async def _get_local_images(self, folder: str) -> UniqueList[MediaItemImage]:
"""Return local images found in a given folderpath."""
images: UniqueList[MediaItemImage] = UniqueList()
- async for item in self.listdir(folder):
- if "." not in item.path or item.is_dir:
+ abs_path = self.get_absolute_path(folder)
+ for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=False):
+ if "." not in item.relative_path or item.is_dir:
continue
for ext in IMAGE_EXTENSIONS:
if item.ext != ext:
images.append(
MediaItemImage(
type=ImageType(item.name),
- path=item.path,
+ path=item.relative_path,
provider=self.instance_id,
remotely_accessible=False,
)
images.append(
MediaItemImage(
type=ImageType.THUMB,
- path=item.path,
+ path=item.relative_path,
provider=self.instance_id,
remotely_accessible=False,
)
except Exception as err:
self.logger.debug("Write access disabled: %s", str(err))
- async def listdir(
- self, path: str, recursive: bool = False, sort: bool = False
- ) -> AsyncGenerator[FileSystemItem, None]:
- """List contents of a given provider directory/path.
-
- Parameters
- ----------
- - path: path of the directory (relative or absolute) to list contents of.
- Empty string for provider's root.
- - recursive: If True will recursively keep unwrapping subdirectories (scandir equivalent).
-
- Returns:
- -------
- AsyncGenerator yielding FileSystemItem objects.
-
- """
- abs_path = self.get_absolute_path(path)
- for entry in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort):
- if recursive and entry.is_dir:
- try:
- async for subitem in self.listdir(entry.absolute_path, True, sort):
- yield subitem
- except (OSError, PermissionError) as err:
- self.logger.warning("Skip folder %s: %s", entry.path, str(err))
- else:
- yield entry
-
async def resolve(
self,
file_path: str,
absolute_path = self.get_absolute_path(file_path)
def _create_item() -> FileSystemItem:
+ if os.path.isdir(absolute_path):
+ return FileSystemItem(
+ filename=os.path.basename(file_path),
+ relative_path=get_relative_path(self.base_path, file_path),
+ absolute_path=absolute_path,
+ is_dir=True,
+ )
stat = os.stat(absolute_path, follow_symlinks=False)
return FileSystemItem(
filename=os.path.basename(file_path),
- path=get_relative_path(self.base_path, file_path),
+ relative_path=get_relative_path(self.base_path, file_path),
absolute_path=absolute_path,
- is_dir=os.path.isdir(absolute_path),
- is_file=os.path.isfile(absolute_path),
+ is_dir=False,
checksum=str(int(stat.st_mtime)),
file_size=stat.st_size,
)