msg = f"Music Directory {conf_path} does not exist"
raise SetupFailedError(msg)
prov = LocalFileSystemProvider(mass, manifest, config)
- prov.base_path = config.get_value(CONF_PATH)
+ prov.base_path = str(config.get_value(CONF_PATH))
await prov.check_write_access()
mass.call_later(30, prov.migrate_playlists)
return prov
)
-def sorted_scandir(base_path: str, sub_path: str) -> list[os.DirEntry]:
+def sorted_scandir(base_path: str, sub_path: str) -> list[FileSystemItem]:
"""Implement os.scandir that returns (naturally) sorted entries."""
- def nat_key(name: str) -> tuple[int, str]:
+ def nat_key(name: str) -> tuple[int | str, ...]:
"""Sort key for natural sorting."""
return tuple(int(s) if s.isdigit() else s for s in re.split(r"(\d+)", name))
- def create_item(entry: os.DirEntry):
+ def create_item(entry: os.DirEntry) -> FileSystemItem:
"""Create FileSystemItem from os.DirEntry."""
absolute_path = get_absolute_path(base_path, entry.path)
stat = entry.stat(follow_symlinks=False)
"""
absolute_path = get_absolute_path(self.base_path, file_path)
- def _create_item():
+ def _create_item() -> FileSystemItem:
stat = os.stat(absolute_path, follow_symlinks=False)
return FileSystemItem(
filename=os.path.basename(file_path),
if not file_path:
return False # guard
abs_path = get_absolute_path(self.base_path, file_path)
- return await exists(abs_path)
+ return bool(await exists(abs_path))
async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
"""Yield (binary) contents of file in chunks of bytes."""
continue
if item.ext != "m3u":
continue
- playlist_data = b""
+ playlist_bytes = b""
async for chunk in self.read_file_content(item.absolute_path):
- playlist_data += chunk
- encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data)
- playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
+ playlist_bytes += chunk
+ encoding_details = await asyncio.to_thread(cchardet.detect, playlist_bytes)
+ playlist_data = playlist_bytes.decode(encoding_details["encoding"] or "utf-8")
# a (legacy) playlist file created by MA does not have EXTINFO tags and has uri's
if "EXTINF" in playlist_data or "://" not in playlist_data:
continue
ProviderMapping,
SearchResults,
Track,
+ UniqueList,
+ is_track,
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import (
AsyncGenerator yielding FileSystemItem objects.
"""
- yield
+ # mypy will infer wrong type without an explicit yield
+ # https://github.com/python/mypy/issues/5070
+ yield # type: ignore[misc]
@abstractmethod
async def resolve(self, file_path: str) -> FileSystemItem:
@abstractmethod
async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
"""Yield (binary) contents of file in chunks of bytes."""
+ # mypy will infer wrong type without an explicit yield
+ # https://github.com/python/mypy/issues/5070
+ yield # type: ignore[misc]
@abstractmethod
async def write_file_content(self, file_path: str, data: bytes) -> None:
async def search(
self,
search_query: str,
- media_types=list[MediaType] | None,
+ media_types: list[MediaType] | None,
limit: int = 5,
) -> SearchResults:
"""Perform search on this file based musicprovider."""
)
return result
- async def browse(self, path: str, offset: int, limit: int) -> list[MediaItemType]:
+ async def browse(self, path: str, offset: int, limit: int) -> list[MediaItemType | ItemMapping]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
if offset:
# we do not support pagination
return []
- items: list[MediaItemType] = []
+ items: list[MediaItemType | ItemMapping] = []
item_path = path.split("://", 1)[1]
if not item_path:
item_path = ""
async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
"""Run library sync for this provider."""
+ assert self.mass.music.database
file_checksums: dict[str, str] = {}
query = (
f"SELECT provider_item_id, details FROM {DB_TABLE_PROVIDER_MAPPINGS} "
async def _process_orphaned_albums_and_artists(self) -> None:
"""Process deletion of orphaned albums and artists."""
+ assert self.mass.music.database
# Remove albums without any tracks
query = (
f"SELECT item_id FROM {DB_TABLE_ALBUMS} "
if library_item := await controller.get_library_item_by_prov_id(
file_path, self.instance_id
):
- if library_item.media_type == MediaType.TRACK:
+ if is_track(library_item):
if library_item.album:
album_ids.add(library_item.album.item_id)
# need to fetch the library album to resolve the itemmapping
for prov_mapping in track.provider_mappings:
if prov_mapping.provider_instance == self.instance_id:
full_track = await self.get_track(prov_mapping.item_id)
+ assert isinstance(full_track.album, Album)
return full_track.album
msg = f"Album not found: {prov_album_id}"
raise MediaNotFoundError(msg)
_, ext = prov_playlist_id.rsplit(".", 1)
try:
# get playlist file contents
- playlist_data = b""
+ playlist_bytes = b""
async for chunk in self.read_file_content(prov_playlist_id):
- playlist_data += chunk
- encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data)
- playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
+ playlist_bytes += chunk
+ encoding_details = await asyncio.to_thread(cchardet.detect, playlist_bytes)
+ playlist_data = playlist_bytes.decode(encoding_details["encoding"] or "utf-8")
if ext in ("m3u", "m3u8"):
playlist_lines = parse_m3u(playlist_data)
except MusicAssistantError as err:
self.logger.warning("Could not parse uri/file %s to track: %s", line, str(err))
- return None
+
+ return None
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""
if not await self.exists(prov_playlist_id):
msg = f"Playlist path does not exist: {prov_playlist_id}"
raise MediaNotFoundError(msg)
- playlist_data = b""
+ playlist_bytes = b""
async for chunk in self.read_file_content(prov_playlist_id):
- playlist_data += chunk
- encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data)
- playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
+ playlist_bytes += chunk
+ encoding_details = await asyncio.to_thread(cchardet.detect, playlist_bytes)
+ playlist_data = playlist_bytes.decode(encoding_details["encoding"] or "utf-8")
for file_path in prov_track_ids:
track = await self.get_track(file_path)
playlist_data += f"\n#EXTINF:{track.duration or 0},{track.name}\n{file_path}\n"
raise MediaNotFoundError(msg)
_, ext = prov_playlist_id.rsplit(".", 1)
# get playlist file contents
- playlist_data = b""
+ playlist_bytes = b""
async for chunk in self.read_file_content(prov_playlist_id):
- playlist_data += chunk
- encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data)
- playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
+ playlist_bytes += chunk
+ encoding_details = await asyncio.to_thread(cchardet.detect, playlist_bytes)
+ playlist_data = playlist_bytes.decode(encoding_details["encoding"] or "utf-8")
# get current contents first
if ext in ("m3u", "m3u8"):
playlist_items = parse_m3u(playlist_data)
details=file_item.checksum,
)
},
- disc_number=tags.disc,
- track_number=tags.track,
+ disc_number=tags.disc or 0,
+ track_number=tags.track or 0,
)
if isrc_tags := tags.isrc:
if acoustid := tags.get("acoustidid"):
track.external_ids.add((ExternalID.ACOUSTID, acoustid))
+ album: Album | None
+ album_artists: list[Artist] = []
+
# album
if tags.album:
# work out if we have an album and/or disc folder
album_dir = get_album_dir(file_item.path, tags.album, disc_dir)
# album artist(s)
- album_artists = []
if tags.album_artists:
for index, album_artist_str in enumerate(tags.album_artists):
artist = await self._parse_artist(album_artist_str, album_path=album_dir)
)
album_artists = [await self._parse_artist(name=VARIOUS_ARTISTS_NAME)]
- track.album = await self._parse_album(
+ album = track.album = await self._parse_album(
tags.album,
track_path=file_item.path,
album_path=album_dir,
# track artist(s)
for index, track_artist_str in enumerate(tags.artists):
# reuse album artist details if possible
- if track.album and (
- album_artist := next(
- (x for x in track.album.artists if x.name == track_artist_str), None
- )
+ if album and (
+ album_artist := next((x for x in album_artists if x.name == track_artist_str), None)
):
artist = album_artist
else:
# we do not actually embed the image in the metadata because that would consume too
# much space and bandwidth. Instead we set the filename as value so the image can
# be retrieved later in realtime.
- track.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=file_item.path,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ track.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=file_item.path,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
- if track.album and not track.album.metadata.images:
+ if album and not album.metadata.images:
# set embedded cover on album if it does not have one yet
- track.album.metadata.images = track.metadata.images
+ album.metadata.images = track.metadata.images
# copy album image from track (only if the album itself doesn't have an image)
# this deals with embedded images from filesystem providers
- if track.album and not track.album.image and track.image:
- track.album.metadata.images = [track.image]
+ if album and not album.image and track.image:
+ album.metadata.images = UniqueList([track.image])
# parse other info
- track.duration = tags.duration or 0
+ track.duration = int(tags.duration or 0)
track.metadata.genres = set(tags.genres)
if tags.disc:
track.disc_number = tags.disc
if explicit_tag is not None:
track.metadata.explicit = explicit_tag == "1"
track.mbid = tags.musicbrainz_recordingid
- track.metadata.chapters = tags.chapters
- if track.album:
- if not track.album.mbid:
- track.album.mbid = tags.musicbrainz_releasegroupid
- if not track.album.year:
- track.album.year = tags.year
- track.album.album_type = tags.album_type
- track.album.metadata.explicit = track.metadata.explicit
+ track.metadata.chapters = UniqueList(tags.chapters)
+ if album:
+ if not album.mbid:
+ album.mbid = tags.musicbrainz_releasegroupid
+ if not album.year:
+ album.year = tags.year
+ album.album_type = tags.album_type
+ album.metadata.explicit = track.metadata.explicit
# set checksum to invalidate any cached listings
track.metadata.cache_checksum = file_item.checksum
- if track.album:
+ if album:
# use track checksum for album(artists) too
- track.album.metadata.cache_checksum = track.metadata.cache_checksum
- for artist in track.album.artists:
+ album.metadata.cache_checksum = track.metadata.cache_checksum
+ for artist in album_artists:
artist.metadata.cache_checksum = track.metadata.cache_checksum
return track
"""Parse Artist metadata into an Artist object."""
cache_key = f"{self.instance_id}-artistdata-{name}-{artist_path}"
if cache := await self.mass.cache.get(cache_key):
+ assert isinstance(cache, Artist)
return cache
if not artist_path and album_path:
# try to find (album)artist folder based on album path
artist.metadata.genres = set(split_items(genre))
# find local images
if images := await self._get_local_images(artist_path):
- artist.metadata.images = images
+ artist.metadata.images = UniqueList(images)
await self.mass.cache.set(cache_key, artist, expiration=120)
return artist
"""Parse Album metadata into an Album object."""
cache_key = f"{self.instance_id}-albumdata-{name}-{album_path}"
if cache := await self.mass.cache.get(cache_key):
+ assert isinstance(cache, Album)
return cache
if album_path:
provider=self.instance_id,
name=name,
sort_name=sort_name,
- artists=artists,
+ artists=UniqueList(artists),
provider_mappings={
ProviderMapping(
item_id=item_id,
# find local images
if images := await self._get_local_images(folder_path):
if album.metadata.images is None:
- album.metadata.images = images
+ album.metadata.images = UniqueList(images)
else:
album.metadata.images += images