from __future__ import annotations
+import asyncio
+import os
import time
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, NotRequired, TypedDict
+import aiofiles
import shortuuid
from music_assistant.common.helpers.uri import parse_uri
CONF_KEY_RADIOS = "stored_radios"
CONF_KEY_TRACKS = "stored_tracks"
CONF_KEY_PLAYLISTS = "stored_playlists"
-CONF_KEY_PLAYLIST_ITEMS = "stored_playlists_items"
ALL_LIBRARY_TRACKS = "all_library_tracks"
class BuiltinProvider(MusicProvider):
"""Built-in/generic provider to handle (manually added) media from files and (remote) urls."""
+ _playlists_dir: str
+ _playlist_lock: asyncio.Lock
+
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
+ self._playlist_lock = asyncio.Lock()
+ # make sure that our directory with collage images exists
+ self._playlists_dir = os.path.join(self.mass.storage_path, "playlists")
+ if not await asyncio.to_thread(os.path.exists, self._playlists_dir):
+ await asyncio.to_thread(os.mkdir, self._playlists_dir)
+
# TEMP: Migrate URL provider entries to builtin
# TODO: Remove this once 2.0 is released!
cache_key = f"{self.instance_id}.url_migration_done"
yield item
return
# user created universal playlist
- conf_key = f"{CONF_KEY_PLAYLIST_ITEMS}/{prov_playlist_id}"
- playlist_items: list[str] = self.mass.config.get(conf_key, [])
+ playlist_items = await self._read_playlist_file_items(prov_playlist_id)
for count, uri in enumerate(playlist_items, 1):
try:
# get the provider item and not the full track from a regular 'get' call
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""
- conf_key = f"{CONF_KEY_PLAYLIST_ITEMS}/{prov_playlist_id}"
- playlist_items: list[str] = self.mass.config.get(conf_key, [])
+ playlist_items = await self._read_playlist_file_items(prov_playlist_id)
for uri in prov_track_ids:
if uri not in playlist_items:
playlist_items.append(uri)
- self.mass.config.set(conf_key, playlist_items)
+ # store playlist file
+ await self._write_playlist_file_items(prov_playlist_id, playlist_items)
# mark last_updated on playlist object
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
) -> None:
"""Remove track(s) from playlist."""
- conf_key = f"{CONF_KEY_PLAYLIST_ITEMS}/{prov_playlist_id}"
- playlist_items: list[str] = self.mass.config.get(conf_key, [])
+ playlist_items = await self._read_playlist_file_items(prov_playlist_id)
# remove items by index
for i in sorted(positions_to_remove, reverse=True):
del playlist_items[i]
- self.mass.config.set(conf_key, playlist_items)
+ # store playlist file
+ await self._write_playlist_file_items(prov_playlist_id, playlist_items)
# mark last_updated on playlist object
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
count += 1
yield PlaylistTrack.from_dict({**track.to_dict(), "position": count})
return
+
+ async def _read_playlist_file_items(self, playlist_id: str) -> list[str]:
+ """Return lines of a playlist file."""
+ playlist_file = os.path.join(self._playlists_dir, playlist_id)
+ if not await asyncio.to_thread(os.path.isfile, playlist_file):
+ return []
+ async with (
+ self._playlist_lock,
+ aiofiles.open(playlist_file, "r", encoding="utf-8") as _file,
+ ):
+ lines = await _file.readlines()
+ return [x.strip() for x in lines]
+
+ async def _write_playlist_file_items(self, playlist_id: str, lines: list[str]) -> None:
+ """Return lines of a playlist file."""
+ playlist_file = os.path.join(self._playlists_dir, playlist_id)
+ async with (
+ self._playlist_lock,
+ aiofiles.open(playlist_file, "w", encoding="utf-8") as _file,
+ ):
+ await _file.write("\n".join(lines))