"""Filesystem musicprovider support for MusicAssistant."""
+import asyncio
import base64
import os
-import re
-from typing import List, Optional
+from typing import List, Optional, Tuple
import aiofiles
-
-import taglib
-
-from music_assistant.models.errors import InvalidDataError
-from music_assistant.helpers.util import parse_title_and_version
+from tinytag import TinyTag
+from music_assistant.helpers.util import parse_title_and_version, try_parse_int
+from music_assistant.helpers.compare import get_compare_string, compare_strings
from music_assistant.models.media_items import (
Album,
+ AlbumType,
Artist,
ContentType,
MediaItemProviderId,
from music_assistant.models.provider import MusicProvider
+def split_items(org_str: str) -> Tuple[str]:
+ """Split up a tag string by common splitter."""
+ if org_str is None:
+ return tuple()
+ for splitter in ["/", ";", ","]:
+ if splitter in org_str:
+ return tuple((x.strip() for x in org_str.split(splitter)))
+ return (org_str,)
+
+
class FileSystemProvider(MusicProvider):
"""
Very basic implementation of a musicprovider for local files.
Should be compatible with LMS
"""
- # pylint chokes on taglib so ignore these
- # pylint: disable=unsubscriptable-object,unsupported-membership-test
-
def __init__(self, music_dir: str, playlist_dir: str | None = None) -> None:
"""
Initialize the Filesystem provider.
:param media_types: A list of media_types to include. All types if None.
:param limit: Number of items to return in the search (per type).
"""
- # TODO !
- return []
+ result = []
+ for track in await self.get_library_tracks():
+ for search_part in search_query.split(" - "):
+ if media_types is None or MediaType.TRACK in media_types:
+ if compare_strings(track.name, search_part):
+ result.append(track)
+ if media_types is None or MediaType.ALBUM in media_types:
+ if track.album:
+ if compare_strings(track.album.name, search_part):
+ result.append(track.album)
+ if media_types is None or MediaType.ARTIST in media_types:
+ if track.album and track.album.artist:
+ if compare_strings(track.album.artist, search_part):
+ result.append(track.album.artist)
+ return result
async def get_library_artists(self) -> List[Artist]:
"""Retrieve all library artists."""
- if not os.path.isdir(self._music_dir):
- self.logger.error("music path does not exist: %s", self._music_dir)
- return None
result = []
- for dirname in os.listdir(self._music_dir):
- dirpath = os.path.join(self._music_dir, dirname)
- if os.path.isdir(dirpath) and not dirpath.startswith("."):
- artist = await self.get_artist(dirpath)
- if artist:
- result.append(artist)
+ prev_ids = set()
+ for track in await self.get_library_tracks():
+ if track.album is not None and track.album.artist is not None:
+ if track.album.artist.item_id not in prev_ids:
+ result.append(track.album.artist)
+ prev_ids.add(track.album.artist.item_id)
return result
async def get_library_albums(self) -> List[Album]:
"""Get album folders recursively."""
result = []
- for artist in await self.get_library_artists():
- for album in await self.get_artist_albums(artist.item_id):
- result.append(album)
+ prev_ids = set()
+ for track in await self.get_library_tracks():
+ if track.album is not None:
+ if track.album.item_id not in prev_ids:
+ result.append(track.album)
+ prev_ids.add(track.album.item_id)
return result
async def get_library_tracks(self) -> List[Track]:
"""Get all tracks recursively."""
- # TODO: support disk subfolders
+ # TODO: apply caching for very large libraries ?
result = []
- for album in await self.get_library_albums():
- for track in await self.get_album_tracks(album.item_id):
- result.append(track)
+ for _root, _dirs, _files in os.walk(self._music_dir):
+ for file in _files:
+ filename = os.path.join(_root, file)
+ if TinyTag.is_supported(filename):
+ if track := await self._parse_track(filename):
+ result.append(track)
return result
async def get_library_playlists(self) -> List[Playlist]:
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- if os.sep not in prov_artist_id:
- itempath = base64.b64decode(prov_artist_id).decode("utf-8")
- else:
- itempath = prov_artist_id
- prov_artist_id = base64.b64encode(itempath.encode("utf-8")).decode("utf-8")
- if not os.path.isdir(itempath):
- self.logger.error("Artist path does not exist: %s", itempath)
- return None
- name = itempath.split(os.sep)[-1]
- artist = Artist(item_id=prov_artist_id, provider=self.id, name=name)
- artist.provider_ids.append(
- MediaItemProviderId(provider=self.id, item_id=artist.item_id)
+ return next(
+ (
+ track.album.artist
+ for track in await self.get_library_tracks()
+ if track.album is not None
+ and track.album.artist is not None
+ and track.album.artist.item_id == prov_artist_id
+ ),
+ None,
)
- return artist
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- if os.sep not in prov_album_id:
- itempath = base64.b64decode(prov_album_id).decode("utf-8")
- else:
- itempath = prov_album_id
- prov_album_id = base64.b64encode(itempath.encode("utf-8")).decode("utf-8")
- if not os.path.isdir(itempath):
- self.logger.error("album path does not exist: %s", itempath)
- return None
- name = itempath.split(os.sep)[-1]
- artistpath = itempath.rsplit(os.sep, 1)[0]
- name, version = parse_title_and_version(name)
- album = Album(
- item_id=prov_album_id, provider=self.id, name=name, version=version
- )
- album.artist = await self.get_artist(artistpath)
- if not album.artist:
- raise InvalidDataError(f"No album artist ! {artistpath}")
- album.provider_ids.append(
- MediaItemProviderId(provider=self.id, item_id=prov_album_id)
+ return next(
+ (
+ track.album
+ for track in await self.get_library_tracks()
+ if track.album is not None
+ and track.album.item_id == prov_album_id
+ ),
+ None,
)
- return album
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
playlist.provider_ids.append(
MediaItemProviderId(provider=self.id, item_id=prov_playlist_id)
)
- playlist.owner = "disk"
+ playlist.owner = self._attr_name
playlist.checksum = os.path.getmtime(itempath)
return playlist
async def get_album_tracks(self, prov_album_id) -> List[Track]:
"""Get album tracks for given album id."""
- result = []
- if os.sep not in prov_album_id:
- albumpath = base64.b64decode(prov_album_id).decode("utf-8")
- else:
- albumpath = prov_album_id
- if not os.path.isdir(albumpath):
- self.logger.error("album path does not exist: %s", albumpath)
- return []
- album = await self.get_album(albumpath)
- for filename in os.listdir(albumpath):
- filepath = os.path.join(albumpath, filename)
- if os.path.isfile(filepath) and not filepath.startswith("."):
- track = await self._parse_track(filepath)
- if track:
- track.album = album
- result.append(track)
- return result
+ return [
+ track
+ for track in await self.get_library_tracks()
+ if track.album is not None and track.album.item_id == prov_album_id
+ ]
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
- result = []
- if os.sep not in prov_artist_id:
- artistpath = base64.b64decode(prov_artist_id).decode("utf-8")
- else:
- artistpath = prov_artist_id
- if not os.path.isdir(artistpath):
- self.logger.error("artist path does not exist: %s", artistpath)
- return
- for dirname in os.listdir(artistpath):
- dirpath = os.path.join(artistpath, dirname)
- if os.path.isdir(dirpath) and not dirpath.startswith("."):
- album = await self.get_album(dirpath)
- if album:
- result.append(album)
- return result
+ return [
+ track.album
+ for track in await self.get_library_tracks()
+ if track.album is not None
+ and track.album.artist is not None
+ and track.album.artist.item_id == prov_artist_id
+ ]
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
- """Get a list of random tracks as we have no clue about preference."""
- result = []
- for album in await self.get_artist_albums(prov_artist_id):
- for track in await self.get_album_tracks(album.item_id):
- result.append(track)
- return result
+ """Get a list of all tracks as we have no clue about preference."""
+ return [
+ track
+ for track in await self.get_library_tracks()
+ if track.artists is not None
+ and prov_artist_id in [x.item_id for x in track.provider_ids]
+ ]
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
+ filename = item_id
if os.sep not in item_id:
- track_id = base64.b64decode(item_id).decode("utf-8")
- if not os.path.isfile(track_id):
+ filename = base64.b64decode(item_id).decode("utf-8")
+ if not os.path.isfile(filename):
return None
- # TODO: retrieve sanple rate and bitdepth
+
+ def parse_tag():
+ return TinyTag.get(filename)
+
+ tag = await self.mass.loop.run_in_executor(None, parse_tag)
+
return StreamDetails(
type=StreamType.FILE,
provider=self.id,
item_id=item_id,
- content_type=ContentType(track_id.split(".")[-1]),
- path=track_id,
- sample_rate=44100,
- bit_depth=16,
+ content_type=ContentType(filename.split(".")[-1]),
+ path=filename,
+ sample_rate=tag.samplerate or 44100,
+ bit_depth=16, # TODO: parse bitdepth
)
- async def _parse_track(self, filename):
- """Try to parse a track from a filename with taglib."""
- # pylint: disable=broad-except
- try:
- song = taglib.File(filename)
- except Exception:
- return None # not a media file ?
+ async def _parse_track(self, filename: str) -> Track | None:
+ """Try to parse a track from a filename by reading its tags."""
+ if not TinyTag.is_supported(filename):
+ return None
+
+ def parse_tag():
+ return TinyTag.get(filename)
+
+ # TODO: Fall back to parsing base details from filename if no tags found/supported
+ tag = await self.mass.loop.run_in_executor(None, parse_tag)
prov_item_id = base64.b64encode(filename.encode("utf-8")).decode("utf-8")
- try:
- name = song.tags["TITLE"][0]
- except KeyError:
- name = filename.split("/")[-1].split(".")[0]
- name, version = parse_title_and_version(name)
+ name, version = parse_title_and_version(tag.title)
track = Track(
item_id=prov_item_id, provider=self.id, name=name, version=version
)
- track.duration = song.length
- albumpath = filename.rsplit(os.sep, 1)[0]
- track.album = await self.get_album(albumpath)
- if "ARTIST" in song.tags:
- artists = []
- for artist_str in song.tags["ARTIST"]:
- local_artist_path = os.path.join(self._music_dir, artist_str)
- if os.path.isfile(local_artist_path):
- artist = await self.get_artist(local_artist_path)
- else:
- fake_artistpath = os.path.join(self._music_dir, artist_str)
- artist = Artist(
- item_id=fake_artistpath, provider=self.id, name=artist_str
- )
- artist.provider_ids.append(
- MediaItemProviderId(
- provider=self.id,
- item_id=base64.b64encode(
- fake_artistpath.encode("utf-8")
- ).decode("utf-8"),
- )
- )
- artists.append(artist)
- track.artists = artists
- else:
- artistpath = filename.rsplit(os.sep, 2)[0]
- artist = await self.get_artist(artistpath)
- track.artists.append(artist)
- if "GENRE" in song.tags:
- track.metadata["genres"] = song.tags["GENRE"]
- if "ISRC" in song.tags and song.tags["ISRC"]:
- track.isrc = song.tags["ISRC"][0]
- if "DISCNUMBER" in song.tags and song.tags["DISCNUMBER"]:
- regexp_numbers = re.findall(r"\d+", song.tags["DISCNUMBER"][0])
- track.disc_number = int(regexp_numbers[0] if regexp_numbers else "0")
- if "TRACKNUMBER" in song.tags and song.tags["TRACKNUMBER"]:
- regexp_numbers = re.findall(r"\d+", song.tags["TRACKNUMBER"][0])
- track.track_number = int(regexp_numbers[0] if regexp_numbers else "0")
+ track.duration = tag.duration
+ # parse track artists
+ track.artists = [
+ Artist(
+ item_id=get_compare_string(item),
+ provider=self._attr_id,
+ name=item,
+ )
+ for item in split_items(tag.artist)
+ ]
+
+ # parse album
+ if tag.album is not None:
+ track.album = Album(
+ item_id=get_compare_string(tag.album),
+ provider=self._attr_id,
+ name=tag.album,
+ year=try_parse_int(tag.year),
+ )
+ if tag.albumartist is not None:
+ track.album.artist = Artist(
+ item_id=get_compare_string(tag.albumartist),
+ provider=self._attr_id,
+ name=tag.albumartist,
+ )
+ if tag.title.lower().startswith(tag.album.lower()):
+ track.album.album_type = AlbumType.SINGLE
+ elif tag.albumartist not in split_items(tag.artist):
+ track.album.album_type = AlbumType.COMPILATION
+ else:
+ track.album.album_type = AlbumType.ALBUM
+ # parse other info
+ track.metadata["genres"] = split_items(tag.genre)
+ track.disc_number = try_parse_int(tag.disc)
+ track.track_number = try_parse_int(tag.track)
+ track.isrc = tag.extra.get("isrc", "")
+ if "copyright" in tag.extra:
+ track.metadata["copyright"] = tag.extra["copyright"]
+ if "lyrics" in tag.extra:
+ track.metadata["lyrics"] = tag.extra["lyrics"]
+
quality_details = ""
if filename.endswith(".flac"):
# TODO: get bit depth
quality = MediaQuality.FLAC_LOSSLESS
- if song.sampleRate > 192000:
+ if tag.samplerate > 192000:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4
- elif song.sampleRate > 96000:
+ elif tag.samplerate > 96000:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3
- elif song.sampleRate > 48000:
+ elif tag.samplerate > 48000:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2
- quality_details = f"{song.sampleRate / 1000} Khz"
+ quality_details = f"{tag.samplerate / 1000} Khz"
elif filename.endswith(".ogg"):
quality = MediaQuality.LOSSY_OGG
- quality_details = f"{song.bitrate} kbps"
+ quality_details = f"{tag.bitrate} kbps"
elif filename.endswith(".m4a"):
quality = MediaQuality.LOSSY_AAC
- quality_details = f"{song.bitrate} kbps"
+ quality_details = f"{tag.bitrate} kbps"
else:
quality = MediaQuality.LOSSY_MP3
- quality_details = f"{song.bitrate} kbps"
+ quality_details = f"{tag.bitrate} kbps"
track.provider_ids.append(
MediaItemProviderId(
provider=self.id,