ConfigEntry(
entry_key=CONF_MUSIC_DIR,
entry_type=ConfigEntryType.STRING,
- description="file_prov_music_path",
+ label="file_prov_music_path",
+ description="file_prov_music_path_desc",
),
ConfigEntry(
entry_key=CONF_PLAYLISTS_DIR,
entry_type=ConfigEntryType.STRING,
- description="file_prov_playlists_path",
+ label="file_prov_playlists_path",
+ description="file_prov_playlists_path_desc",
),
]
"""Retrieve all library artists."""
if not os.path.isdir(self._music_dir):
LOGGER.error("music path does not exist: %s", self._music_dir)
- yield None
- return
+ return None
+ result = []
for dirname in os.listdir(self._music_dir):
dirpath = os.path.join(self._music_dir, dirname)
if os.path.isdir(dirpath) and not dirpath.startswith("."):
artist = await self.get_artist(dirpath)
if artist:
- yield artist
+ result.append(artist)
+ return result
async def get_library_albums(self) -> List[Album]:
"""Get album folders recursively."""
- async for artist in self.get_library_artists():
- async for album in self.get_artist_albums(artist.item_id):
- yield album
+ result = []
+ for artist in await self.get_library_artists():
+ for album in await self.get_artist_albums(artist.item_id):
+ result.append(album)
+ return result
async def get_library_tracks(self) -> List[Track]:
"""Get all tracks recursively."""
# TODO: support disk subfolders
- async for album in self.get_library_albums():
- async for track in self.get_album_tracks(album.item_id):
- yield track
+ result = []
+ for album in await self.get_library_albums():
+ for track in await self.get_album_tracks(album.item_id):
+ result.append(track)
+ return result
async def get_library_playlists(self) -> List[Playlist]:
"""Retrieve playlists from disk."""
if not self._playlists_dir:
- yield None
- return
+ return []
+ result = []
for filename in os.listdir(self._playlists_dir):
filepath = os.path.join(self._playlists_dir, filename)
if (
):
playlist = await self.get_playlist(filepath)
if playlist:
- yield playlist
+ result.append(playlist)
+ return result
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
LOGGER.error("Artist path does not exist: %s", itempath)
return None
name = itempath.split(os.sep)[-1]
- artist = Artist()
- artist.item_id = prov_artist_id
- artist.provider = PROV_ID
- artist.name = name
+ artist = Artist(item_id=prov_artist_id, provider=PROV_ID, name=name)
artist.provider_ids.add(
MediaItemProviderId(provider=PROV_ID, item_id=artist.item_id)
)
return None
name = itempath.split(os.sep)[-1]
artistpath = itempath.rsplit(os.sep, 1)[0]
- album = Album()
- album.item_id = prov_album_id
- album.provider = PROV_ID
+ album = Album(item_id=prov_album_id, provider=PROV_ID)
album.name, album.version = parse_title_and_version(name)
album.artist = await self.get_artist(artistpath)
if not album.artist:
async def get_album_tracks(self, prov_album_id) -> List[Track]:
"""Get album tracks for given album id."""
+ result = []
if os.sep not in prov_album_id:
albumpath = base64.b64decode(prov_album_id).decode("utf-8")
else:
albumpath = prov_album_id
if not os.path.isdir(albumpath):
LOGGER.error("album path does not exist: %s", albumpath)
- return
+ return []
album = await self.get_album(albumpath)
for filename in os.listdir(albumpath):
filepath = os.path.join(albumpath, filename)
track = await self._parse_track(filepath)
if track:
track.album = album
- yield track
+ result.append(track)
+ return result
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
+ result = []
if os.sep not in prov_playlist_id:
itempath = base64.b64decode(prov_playlist_id).decode("utf-8")
else:
itempath = prov_playlist_id
if not os.path.isfile(itempath):
LOGGER.error("playlist path does not exist: %s", itempath)
- return
+ return result
index = 0
with open(itempath) as _file:
for line in _file.readlines():
if line and not line.startswith("#"):
track = await self._parse_track_from_uri(line)
if track:
- yield track
+ result.append(track)
index += 1
+ return result
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
+ result = []
if os.sep not in prov_artist_id:
artistpath = base64.b64decode(prov_artist_id).decode("utf-8")
else:
if os.path.isdir(dirpath) and not dirpath.startswith("."):
album = await self.get_album(dirpath)
if album:
- yield album
+ result.append(album)
+ return result
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of random tracks as we have no clue about preference."""
- async for album in self.get_artist_albums(prov_artist_id):
- async for track in self.get_album_tracks(album.item_id):
- yield track
+ result = []
+ for album in await self.get_artist_albums(prov_artist_id):
+ for track in await self.get_album_tracks(album.item_id):
+ result.append(track)
+ return result
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
type=StreamType.FILE,
provider=PROV_ID,
item_id=item_id,
- content_type=ContentType(item_id.split(".")[-1]),
- path=item_id,
+ content_type=ContentType(track_id.split(".")[-1]),
+ path=track_id,
sample_rate=44100,
bit_depth=16,
)
async def _parse_track(self, filename):
"""Try to parse a track from a filename with taglib."""
- track = Track()
# pylint: disable=broad-except
try:
song = taglib.File(filename)
except Exception:
return None # not a media file ?
prov_item_id = base64.b64encode(filename.encode("utf-8")).decode("utf-8")
+ track = Track(item_id=prov_item_id, provider=PROV_ID)
track.duration = song.length
- track.item_id = prov_item_id
- track.provider = PROV_ID
- name = song.tags["TITLE"][0]
+ try:
+ name = song.tags["TITLE"][0]
+ except KeyError:
+ name = filename.split("/")[-1].split(".")[0]
track.name, track.version = parse_title_and_version(name)
albumpath = filename.rsplit(os.sep, 1)[0]
track.album = await self.get_album(albumpath)
- artists = set()
- for artist_str in song.tags["ARTIST"]:
- local_artist_path = os.path.join(self._music_dir, artist_str)
- if os.path.isfile(local_artist_path):
- artist = await self.get_artist(local_artist_path)
- else:
- artist = Artist()
- artist.name = artist_str
- fake_artistpath = os.path.join(self._music_dir, artist_str)
- artist.item_id = fake_artistpath # temporary id
- artist.provider_ids.add(
- MediaItemProviderId(
- provider=PROV_ID,
- item_id=base64.b64encode(
- fake_artistpath.encode("utf-8")
- ).decode("utf-8"),
+ if "ARTIST" in song.tags:
+ artists = set()
+ for artist_str in song.tags["ARTIST"]:
+ local_artist_path = os.path.join(self._music_dir, artist_str)
+ if os.path.isfile(local_artist_path):
+ artist = await self.get_artist(local_artist_path)
+ else:
+ fake_artistpath = os.path.join(self._music_dir, artist_str)
+ artist = Artist(
+ item_id=fake_artistpath, provider=PROV_ID, name=artist_str
)
- )
- artists.add(artist)
- track.artists = artists
+ artist.provider_ids.add(
+ MediaItemProviderId(
+ provider=PROV_ID,
+ item_id=base64.b64encode(
+ fake_artistpath.encode("utf-8")
+ ).decode("utf-8"),
+ )
+ )
+ artists.add(artist)
+ track.artists = artists
+ else:
+ artistpath = filename.rsplit(os.sep, 2)[0]
+ artist = await self.get_artist(artistpath)
+ track.artists.add(artist)
if "GENRE" in song.tags:
track.metadata["genres"] = song.tags["GENRE"]
if "ISRC" in song.tags: