"""Filesystem musicprovider support for MusicAssistant."""
from __future__ import annotations
+import asyncio
import os
import urllib.parse
from contextlib import asynccontextmanager
-from typing import Generator, List, Optional, Set, Tuple
+from typing import AsyncGenerator, List, Optional, Set, Tuple
import aiofiles
+import aiofiles.ospath as aiopath
import xmltodict
from aiofiles.threadpool.binary import AsyncFileIO
from tinytag.tinytag import TinyTag
from music_assistant.models.provider import MusicProvider
-def scantree(path: str) -> Generator[os.DirEntry]:
+async def scantree(path: str) -> AsyncGenerator[os.DirEntry, None]:
"""Recursively yield DirEntry objects for given directory."""
- for entry in os.scandir(path):
+ loop = asyncio.get_running_loop()
+ for entry in await loop.run_in_executor(None, os.scandir, path):
if entry.is_dir(follow_symlinks=False):
- yield from scantree(entry.path)
+ async for subitem in scantree(entry.path):
+ yield subitem
else:
yield entry
async def setup(self) -> bool:
"""Handle async initialization of the provider."""
- if not os.path.isdir(self.config.path):
+ if not await aiopath.isdir(self.config.path):
raise MediaNotFoundError(
f"Music Directory {self.config.path} does not exist"
)
title = search_query
if media_types is None or MediaType.TRACK in media_types:
query = f"SELECT * FROM tracks WHERE name LIKE '%{title}%'"
- query += f" AND provider_ids LIKE '%\"{self.type.value}\"%')"
+ query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
if artist:
- query += f" AND artists LIKE '%{artist}%')"
+ query += f" AND artists LIKE '%{artist}%'"
tracks = await self.mass.music.tracks.get_db_items(query)
result.append(tracks)
if media_types is None or MediaType.ALBUM in media_types:
query = f"SELECT * FROM albums WHERE name LIKE '%{title}%'"
- query += f" AND provider_ids LIKE '%\"{self.type.value}\"%')"
+ query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
if artist:
- query += f" AND artist LIKE '%{artist}%')"
+ query += f" AND artist LIKE '%{artist}%'"
albums = await self.mass.music.albums.get_db_items(query)
result.append(albums)
if media_types is None or MediaType.ARTIST in media_types:
query = f"SELECT * FROM artists WHERE name LIKE '%{title}%'"
- query += f" AND provider_ids LIKE '%\"{self.type.value}\"%')"
+ query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
artists = await self.mass.music.artists.get_db_items(query)
result.append(artists)
if media_types is None or MediaType.PLAYLIST in media_types:
query = f"SELECT * FROM playlists WHERE name LIKE '%{title}%'"
- query += f" AND provider_ids LIKE '%\"{self.type.value}\"%')"
+ query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
playlists = await self.mass.music.playlists.get_db_items(query)
result.append(playlists)
return result
# find all music files in the music directory and all subfolders
# we work bottom up, as-in we derive all info from the tracks
cur_checksums = {}
- for entry in scantree(self.config.path):
+ async for entry in scantree(self.config.path):
# mtime is used as file checksum
- checksum = int(entry.stat().st_mtime)
+ stat = await asyncio.get_running_loop().run_in_executor(None, entry.stat)
+ checksum = int(stat.st_mtime)
cur_checksums[entry.path] = checksum
if checksum == prev_checksums.get(entry.path):
continue
async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
"""Get album tracks for given album id."""
# filesystem items are always stored in db so we can query the database
- query = f"SELECT * FROM tracks WHERE (album LIKE '%\"{prov_album_id}\"%'"
- query += f" AND album LIKE '%\"{self.type.value}\"%')"
db_id = await self.mass.music.get_provider_mapping(
MediaType.ALBUM, provider=self.type, provider_item_id=prov_album_id
)
- if db_id is not None:
- query += f" OR (album LIKE '%\"{db_id}\"%' AND album LIKE '%\"database\"%')"
+ if db_id is None:
+ raise MediaNotFoundError(f"Album not found: {prov_album_id}")
+ query = f"SELECT * FROM tracks WHERE album LIKE '%\"{db_id}\"%'"
+ query += f" AND provider_ids like '%\"{self.type.value}\"%'"
return await self.mass.music.tracks.get_db_items(query)
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
result = []
playlist_path = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
- if not self.exists(playlist_path):
+ if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
- checksum = self._get_checksum(playlist_path)
+ checksum = await self._get_checksum(playlist_path)
cache_key = f"{self.id}_playlist_tracks_{prov_playlist_id}"
if cache := await self.mass.cache.get(cache_key, checksum):
return [Track.from_dict(x) for x in cache]
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
# filesystem items are always stored in db so we can query the database
- query = f"SELECT * FROM albums WHERE (artist LIKE '%\"{prov_artist_id}\"%'"
- query += f" AND artist LIKE '%\"{self.type.value}\"%')"
db_id = await self.mass.music.get_provider_mapping(
MediaType.ARTIST, provider=self.type, provider_item_id=prov_artist_id
)
- if db_id is not None:
- query += f" OR (artist LIKE '%{db_id}%' AND artist LIKE '%database%')"
+ if db_id is None:
+ raise MediaNotFoundError(f"Artist not found: {prov_artist_id}")
+ query = f"SELECT * FROM albums WHERE artist LIKE '%\"{prov_artist_id}\"%'"
+ query += f" AND provider_ids like '%\"{self.type.value}\"%'"
return await self.mass.music.albums.get_db_items(query)
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of all tracks as we have no clue about preference."""
# filesystem items are always stored in db so we can query the database
- query = f"SELECT * FROM tracks WHERE (artists LIKE '%\"{prov_artist_id}\"%'"
- query += f" AND artists LIKE '%\"{self.type.value}\"%')"
db_id = await self.mass.music.get_provider_mapping(
MediaType.ARTIST, provider=self.type, provider_item_id=prov_artist_id
)
- if db_id is not None:
- query += (
- f" OR (artists LIKE '%\"{db_id}\"%' AND artists LIKE '%\"database\"%')"
- )
+ if db_id is None:
+ raise MediaNotFoundError(f"Artist not found: {prov_artist_id}")
+ query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{prov_artist_id}\"%'"
+ query += f" AND provider_ids like '%\"{self.type.value}\"%'"
return await self.mass.music.tracks.get_db_items(query)
async def library_add(self, *args, **kwargs) -> bool:
) -> None:
"""Add track(s) to playlist."""
itempath = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
- if not self.exists(itempath):
+ if not await self.exists(itempath):
raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
async with self.open_file(itempath, "r") as _file:
cur_data = await _file.read()
) -> None:
"""Remove track(s) from playlist."""
itempath = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
- if not self.exists(itempath):
+ if not await self.exists(itempath):
raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
cur_lines = []
async with self.open_file(itempath, "r") as _file:
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
itempath = await self.get_filepath(MediaType.TRACK, item_id)
- if not self.exists(itempath):
+ if not await self.exists(itempath):
raise MediaNotFoundError(f"Track path does not exist: {itempath}")
def parse_tag():
) -> Track | None:
"""Try to parse a track from a filename by reading its tags."""
- if not self.exists(track_path):
+ if not await self.exists(track_path):
raise MediaNotFoundError(f"Track path does not exist: {track_path}")
track_item_id = self._get_item_id(track_path)
# reading file/tags is slow so we keep a cache and checksum
- checksum = checksum or self._get_checksum(track_path)
+ checksum = checksum or await self._get_checksum(track_path)
cache_key = f"{self.id}_tracks_{track_item_id}"
if cache := await self.mass.cache.get(cache_key, checksum):
return Track.from_dict(cache)
in_library=in_library,
)
- if not self.exists(artist_path):
+ if not await self.exists(artist_path):
# return basic object if there is no dedicated artist folder
return artist
artist.in_library = True
nfo_file = os.path.join(artist_path, "artist.nfo")
- if self.exists(nfo_file):
+ if await self.exists(nfo_file):
# found NFO file with metadata
# https://kodi.wiki/view/NFO_files/Artists
async with self.open_file(nfo_file, "r") as _file:
in_library=in_library,
)
- if not self.exists(album_path):
+ if not await self.exists(album_path):
# return basic object if there is no dedicated album folder
return album
album.in_library = True
nfo_file = os.path.join(album_path, "album.nfo")
- if self.exists(nfo_file):
+ if await self.exists(nfo_file):
# found NFO file with metadata
# https://kodi.wiki/view/NFO_files/Artists
async with self.open_file(nfo_file) as _file:
) -> Playlist | None:
"""Parse playlist from file."""
playlist_item_id = self._get_item_id(playlist_path)
- checksum = checksum or self._get_checksum(playlist_path)
+ checksum = checksum or await self._get_checksum(playlist_path)
if not playlist_path.endswith(".m3u"):
return None
- if not self.exists(playlist_path):
+ if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
name = playlist_path.split(os.sep)[-1].replace(".m3u", "")
except MediaNotFoundError:
return None
- def exists(self, file_path: str) -> bool:
+ async def exists(self, file_path: str) -> bool:
"""Return bool is this FileSystem musicprovider has given file/dir."""
if not file_path:
return False # guard
# ensure we have a full path and not relative
if self.config.path not in file_path:
file_path = os.path.join(self.config.path, file_path)
- return os.path.isfile(file_path) or os.path.isdir(file_path)
+ return await aiopath.exists(file_path)
@asynccontextmanager
async def open_file(self, file_path: str, mode="rb") -> AsyncFileIO:
return create_clean_string(file_path.replace(self.config.path, ""))
@staticmethod
- def _get_checksum(filename: str) -> int:
+ async def _get_checksum(filename: str) -> int:
"""Get checksum for file."""
# use last modified time as checksum
- return int(os.path.getmtime(filename))
+ return await aiopath.getmtime(filename)