import os
import os.path
import time
-from collections.abc import Iterator
-from typing import TYPE_CHECKING, cast
+from collections.abc import AsyncGenerator, Iterator
+from typing import TYPE_CHECKING, Any, cast
import aiofiles
import shortuuid
import xmltodict
from aiofiles.os import wrap
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
from music_assistant_models.enums import (
- ConfigEntryType,
ContentType,
ExternalID,
ImageType,
from music_assistant_models.media_items import (
Album,
Artist,
+ Audiobook,
AudioFormat,
BrowseFolder,
ItemMapping,
+ MediaItemChapter,
MediaItemImage,
MediaItemType,
Playlist,
+ Podcast,
+ PodcastEpisode,
ProviderMapping,
SearchResults,
Track,
VARIOUS_ARTISTS_NAME,
)
from music_assistant.helpers.compare import compare_strings, create_safe_string
+from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
+from music_assistant.helpers.json import json_loads
from music_assistant.helpers.playlists import parse_m3u, parse_pls
from music_assistant.helpers.tags import AudioTags, async_parse_tags, parse_tags, split_items
-from music_assistant.helpers.util import parse_title_and_version
+from music_assistant.helpers.util import TaskManager, parse_title_and_version, try_parse_int
from music_assistant.models.music_provider import MusicProvider
+from .constants import (
+ AUDIOBOOK_EXTENSIONS,
+ CONF_ENTRY_CONTENT_TYPE,
+ CONF_ENTRY_CONTENT_TYPE_READ_ONLY,
+ CONF_ENTRY_MISSING_ALBUM_ARTIST,
+ CONF_ENTRY_PATH,
+ CONF_MISSING_ALBUM_ARTIST_ACTION,
+ IMAGE_EXTENSIONS,
+ PLAYLIST_EXTENSIONS,
+ PODCAST_EPISODE_EXTENSIONS,
+ SUPPORTED_EXTENSIONS,
+ TRACK_EXTENSIONS,
+ IsChapterFile,
+)
from .helpers import (
IGNORE_DIRS,
FileSystemItem,
)
if TYPE_CHECKING:
- from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
from music_assistant_models.provider import ProviderManifest
from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType
-CONF_MISSING_ALBUM_ARTIST_ACTION = "missing_album_artist_action"
-
-CONF_ENTRY_MISSING_ALBUM_ARTIST = ConfigEntry(
- key=CONF_MISSING_ALBUM_ARTIST_ACTION,
- type=ConfigEntryType.STRING,
- label="Action when a track is missing the Albumartist ID3 tag",
- default_value="various_artists",
- help_link="https://music-assistant.io/music-providers/filesystem/#tagging-files",
- required=False,
- options=(
- ConfigValueOption("Use Track artist(s)", "track_artist"),
- ConfigValueOption("Use Various Artists", "various_artists"),
- ConfigValueOption("Use Folder name (if possible)", "folder_name"),
- ),
-)
-
-TRACK_EXTENSIONS = (
- "mp3",
- "m4a",
- "m4b",
- "mp4",
- "flac",
- "wav",
- "ogg",
- "aiff",
- "wma",
- "dsf",
- "opus",
- "aac",
- "wv",
- "amr",
- "awb",
- "spx",
- "tak",
- "ape",
- "mpc",
- "mp2",
- "mp1",
- "dra",
- "mpeg",
- "mpg",
- "ac3",
- "ec3",
- "aif",
- "oga",
- "dff",
- "ts",
- "m2ts",
- "mp+",
-)
-PLAYLIST_EXTENSIONS = ("m3u", "pls", "m3u8")
-SUPPORTED_EXTENSIONS = TRACK_EXTENSIONS + PLAYLIST_EXTENSIONS
-IMAGE_EXTENSIONS = ("jpg", "jpeg", "JPG", "JPEG", "png", "PNG", "gif", "GIF")
-SEEKABLE_FILES = (ContentType.MP3, ContentType.WAV, ContentType.FLAC)
-
-
-SUPPORTED_FEATURES = {
- ProviderFeature.LIBRARY_ARTISTS,
- ProviderFeature.LIBRARY_ALBUMS,
- ProviderFeature.LIBRARY_TRACKS,
- ProviderFeature.LIBRARY_PLAYLISTS,
- ProviderFeature.BROWSE,
- ProviderFeature.SEARCH,
-}
isdir = wrap(os.path.isdir)
isfile = wrap(os.path.isfile)
prov = LocalFileSystemProvider(mass, manifest, config)
prov.base_path = str(config.get_value(CONF_PATH))
await prov.check_write_access()
+ prov.media_content_type = cast(str, config.get_value(CONF_ENTRY_CONTENT_TYPE.key))
return prov
values: the (intermediate) raw values for config entries sent with the action.
"""
# ruff: noqa: ARG001
+ if instance_id is None or values is None:
+ return (
+ CONF_ENTRY_CONTENT_TYPE,
+ CONF_ENTRY_PATH,
+ CONF_ENTRY_MISSING_ALBUM_ARTIST,
+ )
+ media_type = values.get(CONF_ENTRY_CONTENT_TYPE.key)
+ if media_type == "music":
+ return (
+ CONF_ENTRY_PATH,
+ CONF_ENTRY_CONTENT_TYPE_READ_ONLY,
+ CONF_ENTRY_MISSING_ALBUM_ARTIST,
+ )
return (
- ConfigEntry(
- key="path",
- type=ConfigEntryType.STRING,
- label="Path",
- default_value="/media",
- ),
- CONF_ENTRY_MISSING_ALBUM_ARTIST,
+ CONF_ENTRY_PATH,
+ CONF_ENTRY_CONTENT_TYPE_READ_ONLY,
)
base_path: str
write_access: bool = False
sync_running: bool = False
+ media_content_type: str = "music"
@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
+ base_features = {
+ ProviderFeature.BROWSE,
+ ProviderFeature.SEARCH,
+ }
+ if self.media_content_type == "audiobooks":
+ return {ProviderFeature.LIBRARY_AUDIOBOOKS, *base_features}
+ if self.media_content_type == "podcasts":
+ return {ProviderFeature.LIBRARY_PODCASTS, *base_features}
+ music_features = {
+ ProviderFeature.LIBRARY_ARTISTS,
+ ProviderFeature.LIBRARY_ALBUMS,
+ ProviderFeature.LIBRARY_TRACKS,
+ # for now, only support playlists for music files and not for podcasts or audiobooks
+ ProviderFeature.LIBRARY_PLAYLISTS,
+ *base_features,
+ }
if self.write_access:
- return {
- *SUPPORTED_FEATURES,
- ProviderFeature.PLAYLIST_CREATE,
- ProviderFeature.PLAYLIST_TRACKS_EDIT,
- }
- return SUPPORTED_FEATURES
+ music_features.add(ProviderFeature.PLAYLIST_TRACKS_EDIT)
+ return music_features
@property
def is_streaming_provider(self) -> bool:
"""Return True if the provider is a streaming provider."""
return False
+ @property
+ def name(self) -> str:
+ """Return (custom) friendly name for this provider instance."""
+ if self.config.name:
+ return self.config.name
+ postfix = self.base_path.split(os.sep)[-1]
+ return f"{self.manifest.name} {postfix}"
+
async def search(
self,
search_query: str,
provider=self.instance_id,
limit=limit,
)
+ if media_types is None or MediaType.AUDIOBOOK in media_types:
+ result.audiobooks = await self.mass.music.audiobooks._get_library_items_by_query(
+ search=search_query,
+ provider=self.instance_id,
+ limit=limit,
+ )
+ if media_types is None or MediaType.PODCAST in media_types:
+ result.podcasts = await self.mass.music.podcasts._get_library_items_by_query(
+ search=search_query,
+ provider=self.instance_id,
+ limit=limit,
+ )
return result
async def browse(self, path: str) -> list[MediaItemType | ItemMapping]:
self.name,
)
file_checksums: dict[str, str] = {}
+ # NOTE: we always run a scan of the entire library, as we need to detect changes
+ # we ignore any given mediatype(s) and just scan all supported files
query = (
f"SELECT provider_item_id, details FROM {DB_TABLE_PROVIDER_MAPPINGS} "
f"WHERE provider_instance = '{self.instance_id}' "
- "AND media_type in ('track', 'playlist')"
+ f"AND media_type in ('track', 'playlist', 'audiobook', 'podcast_episode')"
)
for db_row in await self.mass.music.database.get_rows_from_query(query, limit=0):
file_checksums[db_row["provider_item_id"]] = str(db_row["details"])
- # find all music files in the music directory and all subfolders
+ # find all supported files in the base directory and all subfolders
# we work bottom up, as-in we derive all info from the tracks
cur_filenames = set()
prev_filenames = set(file_checksums.keys())
# NOTE: we do the entire traversing of the directory structure, including parsing tags
- # in a single executor threads to save the overhead of having to spin up tons of tasks
+ # in a single executor thread to save the overhead of having to spin up tons of tasks
def listdir(path: str) -> Iterator[FileSystemItem]:
"""Recursively traverse directory entries."""
for item in os.scandir(path):
# skip files without extension
if "." not in item.name:
continue
+ ext = item.name.rsplit(".", 1)[1].lower()
+ if ext not in SUPPORTED_EXTENSIONS:
+ # skip unsupported file extension
+ continue
yield FileSystemItem.from_dir_entry(item, self.base_path)
def run_sync() -> None:
self.sync_running = True
try:
for item in listdir(self.base_path):
- if item.ext not in SUPPORTED_EXTENSIONS:
- # unsupported file extension
- continue
-
cur_filenames.add(item.relative_path)
-
# continue if the item did not change (checksum still the same)
prev_checksum = file_checksums.get(item.relative_path)
if item.checksum == prev_checksum:
"""Process a single item. NOT async friendly."""
try:
self.logger.debug("Processing: %s", item.relative_path)
- if item.ext in TRACK_EXTENSIONS:
+ if item.ext in TRACK_EXTENSIONS and self.media_content_type == "music":
# handle track item
tags = parse_tags(item.absolute_path, item.file_size)
asyncio.run_coroutine_threadsafe(process_track(), self.mass.loop).result()
return
- if item.ext in PLAYLIST_EXTENSIONS:
+ if item.ext in AUDIOBOOK_EXTENSIONS and self.media_content_type == "audiobooks":
+ # handle audiobook item
+ tags = parse_tags(item.absolute_path, item.file_size)
+
+ async def process_audiobook() -> None:
+ try:
+ audiobook = await self._parse_audiobook(item, tags)
+ except IsChapterFile:
+ return
+ # add/update audiobook to db
+ # note that filesystem items are always overwriting existing info
+ # when they are detected as changed
+ await self.mass.music.audiobooks.add_item_to_library(
+ audiobook, overwrite_existing=prev_checksum is not None
+ )
+
+ asyncio.run_coroutine_threadsafe(process_audiobook(), self.mass.loop).result()
+ return
+
+ if item.ext in PODCAST_EPISODE_EXTENSIONS and self.media_content_type == "podcasts":
+ # handle podcast(episode) item
+ tags = parse_tags(item.absolute_path, item.file_size)
+
+ async def process_episode() -> None:
+ episode = await self._parse_podcast_episode(item, tags)
+ assert isinstance(episode.podcast, Podcast)
+ # add/update episode to db
+ # note that filesystem items are always overwriting existing info
+ # when they are detected as changed
+ await self.mass.music.podcasts.add_item_to_library(
+ episode.podcast, overwrite_existing=prev_checksum is not None
+ )
+
+ asyncio.run_coroutine_threadsafe(process_episode(), self.mass.loop).result()
+ return
+
+ if item.ext in PLAYLIST_EXTENSIONS and self.media_content_type == "music":
async def process_playlist() -> None:
playlist = await self.get_playlist(item.relative_path)
artist_ids = set()
for file_path in deleted_files:
_, ext = file_path.rsplit(".", 1)
- if ext not in SUPPORTED_EXTENSIONS:
- # unsupported file extension
- continue
-
- if ext in PLAYLIST_EXTENSIONS:
+ if ext in PODCAST_EPISODE_EXTENSIONS and self.media_content_type == "podcasts":
+ controller = self.mass.music.get_controller(MediaType.PODCAST_EPISODE)
+ elif ext in AUDIOBOOK_EXTENSIONS and self.media_content_type == "audiobooks":
+ controller = self.mass.music.get_controller(MediaType.AUDIOBOOK)
+ elif ext in PLAYLIST_EXTENSIONS and self.media_content_type == "music":
controller = self.mass.music.get_controller(MediaType.PLAYLIST)
- else:
+ elif ext in TRACK_EXTENSIONS and self.media_content_type == "music":
controller = self.mass.music.get_controller(MediaType.TRACK)
+ else:
+ # unsupported file extension?
+ continue
if library_item := await controller.get_library_item_by_prov_id(
file_path, self.instance_id
playlist.cache_checksum = checksum
return playlist
+ async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
+ """Get full audiobook details by id."""
+ # ruff: noqa: PLR0915, PLR0912
+ if not await self.exists(prov_audiobook_id):
+ msg = f"Audiobook path does not exist: {prov_audiobook_id}"
+ raise MediaNotFoundError(msg)
+
+ file_item = await self.resolve(prov_audiobook_id)
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ return await self._parse_audiobook(file_item, tags=tags)
+
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+ """Get full podcast details by id."""
+ for episode in await self.get_podcast_episodes(prov_podcast_id):
+ assert isinstance(episode.podcast, Podcast)
+ return episode.podcast
+ msg = f"Podcast not found: {prov_podcast_id}"
+ raise MediaNotFoundError(msg)
+
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
# filesystem items are always stored in db so we can query the database
)
return result
+ async def get_podcast_episodes(self, prov_podcast_id: str) -> list[PodcastEpisode]:
+ """Get podcast episodes for given podcast id."""
+ episodes: list[PodcastEpisode] = []
+
+ async def _process_podcast_episode(item: FileSystemItem) -> None:
+ tags = await async_parse_tags(item.absolute_path, item.file_size)
+ try:
+ episode = await self._parse_podcast_episode(item, tags)
+ except MusicAssistantError as err:
+ self.logger.warning(
+ "Could not parse uri/file %s to podcast episode: %s",
+ item.relative_path,
+ str(err),
+ )
+ else:
+ episodes.append(episode)
+
+ async with TaskManager(self.mass, 25) as tm:
+ for item in await asyncio.to_thread(sorted_scandir, self.base_path, prov_podcast_id):
+ if "." not in item.relative_path or item.is_dir:
+ continue
+ if item.ext not in PODCAST_EPISODE_EXTENSIONS:
+ continue
+ tm.create_task(_process_podcast_episode(item))
+
+ return episodes
+
async def _parse_playlist_line(self, line: str, playlist_path: str) -> Track | None:
"""Try to parse a track from a playlist line."""
try:
await _file.write("#EXTM3U\n")
return await self.get_playlist(filename)
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- library_item = await self.mass.music.tracks.get_library_item_by_prov_id(
- item_id, self.instance_id
- )
- if library_item is None:
- # this could be a file that has just been added, try parsing it
- file_item = await self.resolve(item_id)
- tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
- if not (library_item := await self._parse_track(file_item, tags)):
- msg = f"Item not found: {item_id}"
- raise MediaNotFoundError(msg)
-
- prov_mapping = next(x for x in library_item.provider_mappings if x.item_id == item_id)
- file_item = await self.resolve(item_id)
+ if media_type == MediaType.AUDIOBOOK:
+ return await self._get_stream_details_for_audiobook(item_id)
+ if media_type == MediaType.PODCAST_EPISODE:
+ return await self._get_stream_details_for_podcast_episode(item_id)
+ return await self._get_stream_details_for_track(item_id)
+
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """
+ Return the (custom) audio stream for the provider item.
- return StreamDetails(
- provider=self.instance_id,
- item_id=item_id,
- audio_format=prov_mapping.audio_format,
- media_type=MediaType.TRACK,
- stream_type=StreamType.LOCAL_FILE,
- duration=library_item.duration,
- size=file_item.file_size,
- data=file_item,
- path=file_item.absolute_path,
- can_seek=True,
- )
+ Will only be called when the stream_type is set to CUSTOM,
+ currently only for multi-part audiobooks.
+ """
+ stream_data: tuple[AudioFormat, list[tuple[str, float]]] = streamdetails.data
+ format_org, file_based_chapters = stream_data
+ total_duration = 0.0
+ for chapter_file, chapter_duration in file_based_chapters:
+ total_duration += chapter_duration
+ if total_duration < seek_position:
+ continue
+ seek_position_netto = round(
+ max(0, seek_position - (total_duration - chapter_duration)), 2
+ )
+ async for chunk in get_ffmpeg_stream(
+ self.get_absolute_path(chapter_file),
+ input_format=format_org,
+ # output format is always pcm because we are sending
+ # the result of multiple files as one big stream
+ output_format=streamdetails.audio_format,
+ extra_input_args=["-ss", str(seek_position_netto)] if seek_position_netto else [],
+ ):
+ yield chunk
async def resolve_image(self, path: str) -> str | bytes:
"""
async def _parse_track(
self, file_item: FileSystemItem, tags: AudioTags, full_album_metadata: bool = False
) -> Track:
- """Get full track details by id. NOT async friendly."""
+ """Parse full track details from file tags."""
# ruff: noqa: PLR0915, PLR0912
name, version = parse_title_and_version(tags.title, tags.version)
track = Track(
for isrsc in isrc_tags:
track.external_ids.add((ExternalID.ISRC, isrsc))
- if acoustid := tags.get("acoustidid"):
+ if acoustid := tags.get("acoustid"):
track.external_ids.add((ExternalID.ACOUSTID, acoustid))
# album
track.track_number = tags.track
track.metadata.copyright = tags.get("copyright")
track.metadata.lyrics = tags.lyrics
+ track.metadata.description = tags.get("comment")
explicit_tag = tags.get("itunesadvisory")
if explicit_tag is not None:
track.metadata.explicit = explicit_tag == "1"
return artist
+ async def _parse_audiobook(self, file_item: FileSystemItem, tags: AudioTags) -> Audiobook:
+ """Parse full Audiobook details from file tags."""
+ # an audiobook can either be a single file with chapters embedded in the file
+ # or a folder with multiple files (each file being a chapter)
+ # we only scrape all tags from the first file in the folder
+ if tags.track and tags.track > 1:
+ raise IsChapterFile
+ # in case of a multi-file audiobook, the title is the chapter name
+ # and the album is the actual audiobook name
+ # so we prefer the album name as the audiobook name
+ if tags.album:
+ book_name = tags.album
+ sort_name = tags.album_sort
+ elif (title := tags.tags.get("title")) and tags.track is None:
+ book_name = title
+ sort_name = tags.title_sort
+ else:
+ # file(s) without tags, use foldername
+ book_name = file_item.parent_name
+ sort_name = None
+
+ # collect all chapters
+ total_duration, chapters = await self._get_chapters_for_audiobook(file_item, tags)
+
+ audio_book = Audiobook(
+ item_id=file_item.relative_path,
+ provider=self.instance_id,
+ name=book_name,
+ sort_name=sort_name,
+ version=tags.version,
+ duration=total_duration or int(tags.duration or 0),
+ provider_mappings={
+ ProviderMapping(
+ item_id=file_item.relative_path,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(tags.format),
+ sample_rate=tags.sample_rate,
+ bit_depth=tags.bits_per_sample,
+ channels=tags.channels,
+ bit_rate=tags.bit_rate,
+ ),
+ details=file_item.checksum,
+ )
+ },
+ )
+ audio_book.metadata.chapters = chapters
+
+ # handle embedded cover image
+ if tags.has_cover_image:
+ # we do not actually embed the image in the metadata because that would consume too
+ # much space and bandwidth. Instead we set the filename as value so the image can
+ # be retrieved later in realtime.
+ audio_book.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=file_item.relative_path,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ )
+
+ # parse other info
+ audio_book.authors.set(tags.writers or tags.album_artists or tags.artists)
+ audio_book.metadata.genres = set(tags.genres)
+ audio_book.metadata.copyright = tags.get("copyright")
+ audio_book.metadata.lyrics = tags.lyrics
+ audio_book.metadata.description = tags.get("comment")
+ explicit_tag = tags.get("itunesadvisory")
+ if explicit_tag is not None:
+ audio_book.metadata.explicit = explicit_tag == "1"
+ if tags.musicbrainz_recordingid:
+ audio_book.mbid = tags.musicbrainz_recordingid
+
+ # try to fetch additional metadata from the folder
+ if not audio_book.image or not audio_book.metadata.description:
+ # try to get an image by traversing files in the same folder
+ abs_path = self.get_absolute_path(file_item.parent_path)
+ for _item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path):
+ if "." not in _item.relative_path or _item.is_dir:
+ continue
+ if _item.ext in IMAGE_EXTENSIONS and not audio_book.image:
+ audio_book.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=_item.relative_path,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ )
+ if _item.ext == "txt" and not audio_book.metadata.description:
+ # try to parse a description from a text file
+ try:
+ async with aiofiles.open(_item.absolute_path, encoding="utf-8") as _file:
+ description = await _file.read()
+ audio_book.metadata.description = description
+ except Exception as err:
+ self.logger.warning(
+ "Could not read description from file %s: %s",
+ _item.relative_path,
+ str(err),
+ )
+
+ # handle (optional) loudness measurement tag(s)
+ if tags.track_loudness is not None:
+ self.mass.create_task(
+ self.mass.music.set_loudness(
+ audio_book.item_id,
+ self.instance_id,
+ tags.track_loudness,
+ tags.track_album_loudness,
+ media_type=MediaType.AUDIOBOOK,
+ )
+ )
+ return audio_book
+
+ async def _parse_podcast_episode(
+ self, file_item: FileSystemItem, tags: AudioTags
+ ) -> PodcastEpisode:
+ """Parse full PodcastEpisode details from file tags."""
+ # ruff: noqa: PLR0915, PLR0912
+ podcast_name = tags.album or file_item.parent_name
+ podcast_path = get_relative_path(self.base_path, file_item.parent_path)
+ episode = PodcastEpisode(
+ item_id=file_item.relative_path,
+ provider=self.instance_id,
+ name=tags.title,
+ sort_name=tags.title_sort,
+ provider_mappings={
+ ProviderMapping(
+ item_id=file_item.relative_path,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(tags.format),
+ sample_rate=tags.sample_rate,
+ bit_depth=tags.bits_per_sample,
+ channels=tags.channels,
+ bit_rate=tags.bit_rate,
+ ),
+ details=file_item.checksum,
+ )
+ },
+ position=tags.track or 0,
+ duration=try_parse_int(tags.duration) or 0,
+ podcast=Podcast(
+ item_id=podcast_path,
+ provider=self.instance_id,
+ name=podcast_name,
+ sort_name=tags.album_sort,
+ publisher=tags.tags.get("publisher"),
+ provider_mappings={
+ ProviderMapping(
+ item_id=podcast_path,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ ),
+ )
+ # handle embedded cover image
+ if tags.has_cover_image:
+ # we do not actually embed the image in the metadata because that would consume too
+ # much space and bandwidth. Instead we set the filename as value so the image can
+ # be retrieved later in realtime.
+ episode.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=file_item.relative_path,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ )
+ # parse other info
+ episode.metadata.genres = set(tags.genres)
+ episode.metadata.copyright = tags.get("copyright")
+ episode.metadata.lyrics = tags.lyrics
+ episode.metadata.description = tags.get("comment")
+ explicit_tag = tags.get("itunesadvisory")
+ if explicit_tag is not None:
+ episode.metadata.explicit = explicit_tag == "1"
+
+ # handle (optional) chapters
+ if tags.chapters:
+ episode.metadata.chapters = [
+ MediaItemChapter(
+ position=chapter.chapter_id,
+ name=chapter.title or f"Chapter {chapter.chapter_id}",
+ start=chapter.position_start,
+ end=chapter.position_end,
+ )
+ for chapter in tags.chapters
+ ]
+
+ # try to fetch additional Podcast metadata from the folder
+ assert isinstance(episode.podcast, Podcast)
+ if images := await self._get_local_images(file_item.parent_path):
+ episode.podcast.metadata.images = images
+ if metadata := await self._get_podcast_metadata(file_item.parent_path):
+ if title := metadata.get("title"):
+ episode.podcast.name = title
+ if sort_name := metadata.get("sorttitle"):
+ episode.podcast.sort_name = sort_name
+ if description := metadata.get("description"):
+ episode.podcast.metadata.description = description
+ if genres := metadata.get("genres"):
+ episode.podcast.metadata.genres = set(genres)
+ if publisher := metadata.get("publisher"):
+ episode.podcast.publisher = publisher
+ if image := metadata.get("imageURL"):
+ episode.podcast.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
+ # copy (embedded) image from episode (or vice versa)
+ if not episode.podcast.image and episode.image:
+ episode.podcast.metadata.add_image(episode.image)
+ elif not episode.image and episode.podcast.image:
+ episode.metadata.add_image(episode.podcast.image)
+
+ # handle (optional) loudness measurement tag(s)
+ if tags.track_loudness is not None:
+ self.mass.create_task(
+ self.mass.music.set_loudness(
+ episode.item_id,
+ self.instance_id,
+ tags.track_loudness,
+ tags.track_album_loudness,
+ media_type=MediaType.PODCAST_EPISODE,
+ )
+ )
+ return episode
+
async def _parse_album(self, track_path: str, track_tags: AudioTags) -> Album:
"""Parse Album metadata from Track tags."""
assert track_tags.album
async def _get_local_images(self, folder: str) -> UniqueList[MediaItemImage]:
"""Return local images found in a given folderpath."""
+ cache_base_key = f"{self.lookup_key}.folderimages"
+ if (cache := await self.cache.get(folder, base_key=cache_base_key)) is not None:
+ return cast(UniqueList[MediaItemImage], cache)
images: UniqueList[MediaItemImage] = UniqueList()
abs_path = self.get_absolute_path(folder)
for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=False):
)
)
break
+ await self.cache.set(folder, images, base_key=cache_base_key, expiration=120)
return images
async def check_write_access(self) -> None:
def get_absolute_path(self, file_path: str) -> str:
"""Return absolute path for given file path."""
return get_absolute_path(self.base_path, file_path)
+
+ async def _get_stream_details_for_track(self, item_id: str) -> StreamDetails:
+ """Return the streamdetails for a track/song."""
+ library_item = await self.mass.music.tracks.get_library_item_by_prov_id(
+ item_id, self.instance_id
+ )
+ if library_item is None:
+ # this could be a file that has just been added, try parsing it
+ file_item = await self.resolve(item_id)
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ if not (library_item := await self._parse_track(file_item, tags)):
+ msg = f"Item not found: {item_id}"
+ raise MediaNotFoundError(msg)
+
+ prov_mapping = next(x for x in library_item.provider_mappings if x.item_id == item_id)
+ file_item = await self.resolve(item_id)
+
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=item_id,
+ audio_format=prov_mapping.audio_format,
+ media_type=MediaType.TRACK,
+ stream_type=StreamType.LOCAL_FILE,
+ duration=library_item.duration,
+ size=file_item.file_size,
+ data=file_item,
+ path=file_item.absolute_path,
+ can_seek=True,
+ )
+
+ async def _get_stream_details_for_podcast_episode(self, item_id: str) -> StreamDetails:
+ """Return the streamdetails for a podcast episode."""
+ # podcasts episodes are never stored in the library so we need to parse the file
+ file_item = await self.resolve(item_id)
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=item_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(tags.format),
+ sample_rate=tags.sample_rate,
+ bit_depth=tags.bits_per_sample,
+ channels=tags.channels,
+ bit_rate=tags.bit_rate,
+ ),
+ media_type=MediaType.PODCAST_EPISODE,
+ stream_type=StreamType.LOCAL_FILE,
+ duration=try_parse_int(tags.duration or 0),
+ size=file_item.file_size,
+ data=file_item,
+ path=file_item.absolute_path,
+ allow_seek=True,
+ can_seek=True,
+ )
+
+ async def _get_stream_details_for_audiobook(self, item_id: str) -> StreamDetails:
+ """Return the streamdetails for an audiobook."""
+ library_item = await self.mass.music.audiobooks.get_library_item_by_prov_id(
+ item_id, self.instance_id
+ )
+ if library_item is None:
+ # this could be a file that has just been added, try parsing it
+ file_item = await self.resolve(item_id)
+ tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+ if not (library_item := await self._parse_audiobook(file_item, tags)):
+ msg = f"Item not found: {item_id}"
+ raise MediaNotFoundError(msg)
+
+ prov_mapping = next(x for x in library_item.provider_mappings if x.item_id == item_id)
+ file_item = await self.resolve(item_id)
+
+ file_based_chapters: list[tuple[str, float]] | None
+ if file_based_chapters := await self.cache.get(
+ file_item.relative_path,
+ base_key=f"{self.lookup_key}.audiobook.chapters",
+ ):
+ # this is a multi-file audiobook, we have the chapter(files) stored in cache
+ # use custom stream to simply send the chapter files one by one
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=item_id,
+ # for the concatanated stream, we need to use a pcm stream format
+ audio_format=AudioFormat(
+ content_type=ContentType.from_bit_depth(prov_mapping.audio_format.bit_depth),
+ sample_rate=prov_mapping.audio_format.sample_rate,
+ channels=prov_mapping.audio_format.channels,
+ ),
+ media_type=MediaType.AUDIOBOOK,
+ stream_type=StreamType.CUSTOM,
+ duration=library_item.duration,
+ data=(prov_mapping.audio_format, file_based_chapters),
+ allow_seek=True,
+ can_seek=True,
+ )
+
+ # regular single-file streaming, simply let ffmpeg deal with the file directly
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=item_id,
+ audio_format=prov_mapping.audio_format,
+ media_type=MediaType.AUDIOBOOK,
+ stream_type=StreamType.LOCAL_FILE,
+ duration=library_item.duration,
+ size=file_item.file_size,
+ data=file_item,
+ path=file_item.absolute_path,
+ allow_seek=True,
+ can_seek=True,
+ )
+
+ async def _get_chapters_for_audiobook(
+ self, audiobook_file_item: FileSystemItem, tags: AudioTags
+ ) -> tuple[int, list[MediaItemChapter]]:
+ """Return the chapters for an audiobook."""
+ chapters: list[MediaItemChapter] = []
+ if tags.chapters:
+ # The chapters are embedded in the file
+ chapters = [
+ MediaItemChapter(
+ position=chapter.chapter_id,
+ name=chapter.title or f"Chapter {chapter.chapter_id}",
+ start=chapter.position_start,
+ end=chapter.position_end,
+ )
+ for chapter in tags.chapters
+ ]
+ return (try_parse_int(tags.duration) or 0, chapters)
+ # there could be multiple files for this audiobook in the same folder,
+ # where each file is a portion/chapter of the audiobook
+ # try to gather the chapters by traversing files in the same folder
+ chapter_file_tags: list[AudioTags] = []
+ total_duration = 0.0
+ abs_path = self.get_absolute_path(audiobook_file_item.parent_path)
+ for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=True):
+ if "." not in item.relative_path or item.is_dir:
+ continue
+ if item.ext not in AUDIOBOOK_EXTENSIONS:
+ continue
+ item_tags = await async_parse_tags(item.absolute_path, item.file_size)
+ if not (tags.album == item_tags.album or (item_tags.tags.get("title") is None)):
+ continue
+ if item_tags.track is None:
+ continue
+ chapter_file_tags.append(item_tags)
+ chapter_file_tags.sort(key=lambda x: x.track or 0)
+ all_chapter_files: list[tuple[str, float]] = []
+ for chapter_tags in chapter_file_tags:
+ assert chapter_tags.duration is not None
+ chapters.append(
+ MediaItemChapter(
+ position=chapter_tags.track or 0,
+ name=chapter_tags.title,
+ start=total_duration,
+ end=total_duration + chapter_tags.duration,
+ )
+ )
+ all_chapter_files.append(
+ (get_relative_path(self.base_path, chapter_tags.filename), chapter_tags.duration)
+ )
+ total_duration += chapter_tags.duration
+ # store chapter files in cache
+ # for easy access from streamdetails
+ await self.cache.set(
+ audiobook_file_item.relative_path,
+ all_chapter_files,
+ base_key=f"{self.lookup_key}.audiobook.chapters",
+ )
+ return (int(total_duration), chapters)
+
+ async def _get_podcast_metadata(self, podcast_folder: str) -> dict[str, Any]:
+ """Return metadata for a podcast."""
+ cache_base_key = f"{self.lookup_key}.podcastmetadata"
+ if (cache := await self.cache.get(podcast_folder, base_key=cache_base_key)) is not None:
+ return cast(dict[str, Any], cache)
+ data: dict[str, Any] = {}
+ metadata_file = os.path.join(podcast_folder, "metadata.json")
+ if await self.exists(metadata_file):
+ # found json file with metadata
+ metadata_file = self.get_absolute_path(metadata_file)
+ async with aiofiles.open(metadata_file) as _file:
+ data.update(json_loads(await _file.read()))
+ await self.cache.set(podcast_folder, data, base_key=cache_base_key)
+ return data