"""Filesystem musicprovider support for MusicAssistant."""
from __future__ import annotations
+import base64
import os
from typing import List, Optional, Tuple
import aiofiles
-from tinytag import TinyTag
+from tinytag.tinytag import TinyTag
from music_assistant.helpers.compare import compare_strings
from music_assistant.helpers.util import parse_title_and_version, try_parse_int
AlbumType,
Artist,
ContentType,
+ ImageType,
+ MediaItemImage,
MediaItemProviderId,
MediaItemType,
MediaQuality,
from music_assistant.models.provider import MusicProvider
-def split_items(org_str: str) -> Tuple[str]:
- """Split up a tag string by common splitter."""
+def split_items(org_str: str, splitters: Tuple[str] = None) -> Tuple[str]:
+ """Split up a tags string by common splitter."""
+ if splitters is None:
+ splitters = ("/", ";", ",")
if org_str is None:
return tuple()
- for splitter in ["/", ";", ","]:
+ for splitter in splitters:
if splitter in org_str:
return tuple((x.strip() for x in org_str.split(splitter)))
return (org_str,)
-DB_TABLE = "filesystem_mappings"
+FALLBACK_ARTIST = "Various Artists"
+ARTIST_SPLITTERS = (";", ",", "Featuring", " Feat. ", " Feat ", "feat.", " & ")
class FileSystemProvider(MusicProvider):
raise FileNotFoundError(
f"Playlist Directory {self._playlists_dir} does not exist"
)
- # simple db table to keep a mapping of filename to id
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {DB_TABLE}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- filename TEXT NOT NULL,
- media_type TEXT NOT NULL,
- UNIQUE(filename, media_type)
- );"""
- )
async def search(
self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
"""Retrieve all library artists."""
result = []
cur_ids = set()
- for track in await self.get_library_tracks(False):
- if track.album is not None and track.album.artist is not None:
- if track.album.artist.item_id not in cur_ids:
- result.append(track.album.artist)
- cur_ids.add(track.album.artist.item_id)
+ # for the sake of simplicity we only iterate over the files in one location only,
+ # which is the library tracks where we recursively enumerate the directory structure
+ # library artists = unique album artists across all tracks
+ # the track listing is cached so this should be (pretty) fast
+ for track in await self.get_library_tracks(True):
+ if track.album is None or track.album is None:
+ continue
+ if track.album.artist.item_id in cur_ids:
+ continue
+ result.append(track.album.artist)
+ cur_ids.add(track.album.artist.item_id)
return result
async def get_library_albums(self) -> List[Album]:
"""Get album folders recursively."""
result = []
cur_ids = set()
- for track in await self.get_library_tracks(False):
- if track.album is not None:
- if track.album.item_id not in cur_ids:
- result.append(track.album)
- cur_ids.add(track.album.item_id)
+ # for the sake of simplicity we only iterate over the files in one location only,
+ # which is the library tracks where we recurisvely enumerate the directory structure
+ # library albums = unique albums across all tracks
+ # the track listing is cached so this should be (pretty) fast
+ for track in await self.get_library_tracks(True):
+ if track.album is None:
+ continue
+ if track.album.item_id in cur_ids:
+ continue
+ result.append(track.album)
+ cur_ids.add(track.album.item_id)
return result
async def get_library_tracks(self, allow_cache=False) -> List[Track]:
"""Get all tracks recursively."""
# pylint: disable = arguments-differ
+ # we cache this listing in memory for performance and convenience reasons
+ # so we can easy retrieve the library artists and albums from the tracks listing
+ # if this may ever lead to memory issues, we can do the caching in db instead.
if allow_cache and self._cached_tracks:
return self._cached_tracks
result = []
cur_ids = set()
+ # find all music files in the music directory and all subfolders
for _root, _dirs, _files in os.walk(self._music_dir):
for file in _files:
filename = os.path.join(_root, file)
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- if os.sep in prov_track_id:
- # this is already a filename
- itempath = prov_track_id
- else:
- itempath = await self._get_filename(prov_track_id, MediaType.TRACK)
+ itempath = self._get_filename(prov_track_id)
if not os.path.isfile(itempath):
raise MediaNotFoundError(f"Track path does not exist: {itempath}")
return await self._parse_track(itempath)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- if os.sep in prov_playlist_id:
- # this is already a filename
- itempath = prov_playlist_id
- else:
- itempath = await self._get_filename(prov_playlist_id, MediaType.PLAYLIST)
+ itempath = self._get_filename(prov_playlist_id)
if not os.path.isfile(itempath):
raise MediaNotFoundError(f"playlist path does not exist: {itempath}")
return await self._parse_playlist(itempath)
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
result = []
- if os.sep in prov_playlist_id:
- # this is already a filename
- itempath = prov_playlist_id
- else:
- itempath = await self._get_filename(prov_playlist_id, MediaType.PLAYLIST)
+ itempath = self._get_filename(prov_playlist_id)
if not os.path.isfile(itempath):
raise MediaNotFoundError(f"playlist path does not exist: {itempath}")
index = 0
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
- return [
- track.album
- for track in await self.get_library_tracks(True)
- if track.album is not None
- and track.album.artist is not None
- and track.album.artist.item_id == prov_artist_id
- ]
+ result = []
+ cur_ids = set()
+ for track in await self.get_library_tracks(True):
+ if track.album is None:
+ continue
+ if track.album.item_id in cur_ids:
+ continue
+ if track.album.artist is None:
+ continue
+ if track.album.artist.item_id != prov_artist_id:
+ continue
+ result.append(track.album)
+ cur_ids.add(track.album.item_id)
+ return result
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of all tracks as we have no clue about preference."""
track
for track in await self.get_library_tracks(True)
if track.artists is not None
- and prov_artist_id in (x.item_id for x in track.provider_ids)
+ and prov_artist_id in (x.item_id for x in track.artists)
]
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- if os.sep in item_id:
- # this is already a filename
- itempath = item_id
- else:
- itempath = await self._get_filename(item_id, MediaType.TRACK)
+ itempath = self._get_filename(item_id)
if not os.path.isfile(itempath):
raise MediaNotFoundError(f"Track path does not exist: {itempath}")
def parse_tag():
return TinyTag.get(itempath)
- tag = await self.mass.loop.run_in_executor(None, parse_tag)
+ tags = await self.mass.loop.run_in_executor(None, parse_tag)
return StreamDetails(
type=StreamType.FILE,
item_id=item_id,
content_type=ContentType(itempath.split(".")[-1]),
path=itempath,
- sample_rate=tag.samplerate or 44100,
+ sample_rate=tags.samplerate or 44100,
bit_depth=16, # TODO: parse bitdepth
)
+ async def get_embedded_image(self, filename: str) -> str | None:
+ """Return the embedded image of an audio file as base64 string."""
+ if not TinyTag.is_supported(filename):
+ return None
+
+ def parse_tags():
+ return TinyTag.get(filename, tags=True, image=True, ignore_errors=True)
+
+ tags = await self.mass.loop.run_in_executor(None, parse_tags)
+ if image_data := tags.get_image():
+ enc_image = base64.b64encode(image_data).decode()
+ enc_image = f"data:image/png;base64,{enc_image}"
+ return enc_image
+
async def _parse_track(self, filename: str) -> Track | None:
"""Try to parse a track from a filename by reading its tags."""
if not TinyTag.is_supported(filename):
return None
- def parse_tag():
- return TinyTag.get(filename)
+ def parse_tags():
+ return TinyTag.get(filename, image=True, ignore_errors=True)
- # TODO: Fall back to parsing base details from filename if no tags found/supported
- tag = await self.mass.loop.run_in_executor(None, parse_tag)
+ # parse ID3 tags with TinyTag
+ tags = await self.mass.loop.run_in_executor(None, parse_tags)
- # we need at least a title and artist
- if tag.title is None or tag.artist is None:
- self.logger.warning("Skipping track due to invalid ID3 tags: %s", filename)
- return None
+ # use the relative filename as item_id
+ filename_base = filename.replace(self._music_dir, "")
+ if filename_base.startswith(os.sep):
+ filename_base = filename_base[1:]
+ prov_item_id = filename_base
- prov_item_id = await self._get_item_id(filename, MediaType.TRACK)
- name, version = parse_title_and_version(tag.title)
+ # work out if we have an artist/album/track.ext structure
+ filename_base = filename.replace(self._music_dir, "")
+ if filename_base.startswith(os.sep):
+ filename_base = filename_base[1:]
+ track_parts = filename_base.rsplit(os.sep)
+ if track_parts == 3:
+ album_artist_name = track_parts[0]
+ album_name = track_parts[1]
+ album_artist_name = tags.albumartist
+ album_name = tags.album
+
+ # prefer title from tag, fallback to filename
+ if tags.title:
+ track_title = tags.title
+ else:
+ ext = filename_base.split(".")[-1]
+ track_title = filename_base.replace(f".{ext}", "").replace("_", " ")
+ self.logger.warning(
+ "%s is missing ID3 tags, use filename as fallback", filename_base
+ )
+
+ name, version = parse_title_and_version(track_title)
track = Track(
item_id=prov_item_id, provider=self.id, name=name, version=version
)
- track.duration = tag.duration
- # parse track artists
+
+ # Parse track artist(s) from artist string using common splitters used in ID3 tags
+ # NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up
+ track_artists_str = tags.artist or album_artist_name or FALLBACK_ARTIST
track.artists = [
Artist(
item_id=item,
provider=self._attr_id,
name=item,
)
- for item in split_items(tag.artist)
+ for item in split_items(track_artists_str, ARTIST_SPLITTERS)
]
- # parse album
- if tag.album is not None:
+ # Check if track has embedded metadata
+ if tags.get_image():
+ # we do not actually embed the image in the metadata because that would consume too
+ # much space and bandwidth. Instead we set the filename as value so the image can
+ # be retrieved later in realtime.
+ track.metadata.images = {MediaItemImage(ImageType.EMBEDDED_THUMB, filename)}
+
+ # Parse album (only if we have album + album artist tags)
+ if album_name and album_artist_name:
+ album_id = album_name
+ album_name, album_version = parse_title_and_version(album_name)
track.album = Album(
- item_id=tag.album,
+ item_id=album_id,
provider=self._attr_id,
- name=tag.album,
- year=try_parse_int(tag.year),
- )
- if tag.albumartist is not None:
- track.album.artist = Artist(
- item_id=tag.albumartist,
+ name=album_name,
+ version=album_version,
+ year=try_parse_int(tags.year) if tags.year else None,
+ artist=Artist(
+ item_id=album_artist_name,
provider=self._attr_id,
- name=tag.albumartist,
- )
- if tag.title.lower().startswith(tag.album.lower()):
+ name=album_artist_name,
+ ),
+ )
+ track.album.metadata.images = track.metadata.images
+
+ # try to guess the album type
+ if name.lower() == album_name.lower():
track.album.album_type = AlbumType.SINGLE
- elif tag.albumartist not in split_items(tag.artist):
+ elif album_artist_name not in (x.name for x in track.artists):
track.album.album_type = AlbumType.COMPILATION
else:
track.album.album_type = AlbumType.ALBUM
+
# parse other info
- track.metadata.genres = set(split_items(tag.genre))
- track.disc_number = try_parse_int(tag.disc)
- track.track_number = try_parse_int(tag.track)
- track.isrc = tag.extra.get("isrc", "")
- if "copyright" in tag.extra:
- track.metadata.copyright = tag.extra["copyright"]
- if "lyrics" in tag.extra:
- track.metadata.lyrics = tag.extra["lyrics"]
+ track.duration = tags.duration
+ track.metadata.genres = set(split_items(tags.genre))
+ track.disc_number = try_parse_int(tags.disc)
+ track.track_number = try_parse_int(tags.track)
+ track.isrc = tags.extra.get("isrc", "")
+ if "copyright" in tags.extra:
+ track.metadata.copyright = tags.extra["copyright"]
+ if "lyrics" in tags.extra:
+ track.metadata.lyrics = tags.extra["lyrics"]
quality_details = ""
if filename.endswith(".flac"):
# TODO: get bit depth
quality = MediaQuality.FLAC_LOSSLESS
- if tag.samplerate > 192000:
+ if tags.samplerate > 192000:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4
- elif tag.samplerate > 96000:
+ elif tags.samplerate > 96000:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3
- elif tag.samplerate > 48000:
+ elif tags.samplerate > 48000:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2
- quality_details = f"{tag.samplerate / 1000} Khz"
+ quality_details = f"{tags.samplerate / 1000} Khz"
elif filename.endswith(".ogg"):
quality = MediaQuality.LOSSY_OGG
- quality_details = f"{tag.bitrate} kbps"
+ quality_details = f"{tags.bitrate} kbps"
elif filename.endswith(".m4a"):
quality = MediaQuality.LOSSY_AAC
- quality_details = f"{tag.bitrate} kbps"
+ quality_details = f"{tags.bitrate} kbps"
else:
quality = MediaQuality.LOSSY_MP3
- quality_details = f"{tag.bitrate} kbps"
+ quality_details = f"{tags.bitrate} kbps"
track.add_provider_id(
MediaItemProviderId(
provider=self.id,
async def _parse_playlist(self, filename: str) -> Playlist | None:
"""Parse playlist from file."""
+ # use the relative filename as item_id
+ filename_base = filename.replace(self._music_dir, "")
+ if filename_base.startswith(os.sep):
+ filename_base = filename_base[1:]
+ prov_item_id = filename_base
+
name = filename.split(os.sep)[-1].replace(".m3u", "")
- prov_item_id = await self._get_item_id(filename, MediaType.PLAYLIST)
+
playlist = Playlist(prov_item_id, provider=self.id, name=name)
playlist.is_editable = True
playlist.add_provider_id(
except MediaNotFoundError:
return None
- async def _get_item_id(self, filename: str, media_type: MediaType) -> str:
- """Get/create item ID for given filename."""
- # we store the relative path in db
- filename_base = filename.replace(self._music_dir, "")
- if filename_base.startswith(os.sep):
- filename_base = filename_base[1:]
- match = {"filename": filename_base, "media_type": media_type.value}
- if db_row := await self.mass.database.get_row(DB_TABLE, match):
- return str(db_row["item_id"])
- # filename not yet known in db, create new record
- db_row = await self.mass.database.insert_or_replace(DB_TABLE, match)
- return str(db_row["item_id"])
-
- async def _get_filename(self, item_id: str, media_type: MediaType) -> str:
- """Get/create ID for given filename."""
- match = {"item_id": int(item_id), "media_type": media_type.value}
- db_row = await self.mass.database.get_row(DB_TABLE, match)
- if not db_row:
- raise MediaNotFoundError(f"Item not found: {item_id}")
- if media_type == MediaType.PLAYLIST:
- return os.path.join(self._playlists_dir, db_row["filename"])
- return os.path.join(self._music_dir, db_row["filename"])
+ def _get_filename(self, item_id: str, playlist: bool = False) -> str:
+ """Get filename for item_id."""
+ if self._music_dir in item_id:
+ return item_id
+ if playlist:
+ return os.path.join(self._playlists_dir, item_id)
+ return os.path.join(self._music_dir, item_id)