from json import JSONDecodeError
from typing import Any
-import eyed3
+import mutagen
from music_assistant_models.enums import AlbumType
from music_assistant_models.errors import InvalidDataError
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.tags")
-# silence the eyed3 logger because it is too verbose
-logging.getLogger("eyed3").setLevel(logging.ERROR)
-
# the only multi-item splitter we accept is the semicolon,
# which is also the default in Musicbrainz Picard.
return await asyncio.to_thread(parse_tags, input_file, file_size)
-def parse_tags(input_file: str, file_size: int | None = None) -> AudioTags: # noqa: PLR0915
+def parse_tags(input_file: str, file_size: int | None = None) -> AudioTags:
"""
Parse tags from a media file (or URL). NOT Async friendly.
if not tags.duration and tags.raw.get("format", {}).get("duration"):
tags.duration = float(tags.raw["format"]["duration"])
- if (
- not input_file.startswith("http")
- and input_file.endswith(".mp3")
- and os.path.isfile(input_file)
- ):
- # eyed3 is able to extract the musicbrainzrecordingid from the unique file id
- # this is actually a bug in ffmpeg/ffprobe which does not expose this tag
- # so we use this as alternative approach for mp3 files
- # TODO: Convert all the tag reading to Mutagen!
- audiofile = eyed3.load(input_file)
- if audiofile is not None and audiofile.tag is not None:
- for uf_id in audiofile.tag.unique_file_ids:
- if uf_id.owner_id == b"http://musicbrainz.org" and uf_id.uniq_id:
- tags.tags["musicbrainzrecordingid"] = uf_id.uniq_id.decode()
- break
- if audiofile.tag.version == (2, 4, 0):
- # ffmpeg messes up reading ID3v2.4 tags from mp3 files, especially
- # on multi-item tags. We need to read the TXXX frames manually
- if frameset := audiofile.tag.frame_set.get(b"TXXX"):
- for raw_tag in frameset:
- if not hasattr(raw_tag, "description"):
- continue
- if raw_tag.description.upper() == "ARTISTS":
- tags.tags["artists"] = raw_tag.text.split("\x00\ufeff")
- if raw_tag.description == "MusicBrainz Artist Id":
- tags.tags["musicbrainzartistid"] = raw_tag.text.split("\x00\ufeff")
- if raw_tag.description == "MusicBrainz Album Artist Id":
- tags.tags["musicbrainzalbumartistid"] = raw_tag.text.split(
- "\x00\ufeff"
- )
- if frameset := audiofile.tag.frame_set.get(b"TSOP"):
- for raw_tag in frameset:
- if not hasattr(raw_tag, "text"):
- continue
- tags.tags["artistsort"] = raw_tag.text.split("\x00\ufeff")
- if frameset := audiofile.tag.frame_set.get(b"TSO2"):
- for raw_tag in frameset:
- if not hasattr(raw_tag, "text"):
- continue
- tags.tags["albumartistsort"] = raw_tag.text.split("\x00\ufeff")
- if frameset := audiofile.tag.frame_set.get(b"TCON"):
- for raw_tag in frameset:
- if not hasattr(raw_tag, "text"):
- continue
- tags.tags["genre"] = raw_tag.text.split("\x00\ufeff")
- del audiofile
+ # we parse all (basic) tags for all file formats using ffmpeg
+ # but we also try to extract some extra tags for local files using mutagen
+ if not input_file.startswith("http") and os.path.isfile(input_file):
+ extra_tags = parse_tags_mutagen(input_file)
+ if extra_tags:
+ tags.tags.update(extra_tags)
return tags
except subprocess.CalledProcessError as err:
error_msg = f"Unable to retrieve info for {input_file}"
raise InvalidDataError(msg) from err
+def parse_tags_mutagen(input_file: str) -> dict[str, Any]:
+ """
+ Parse tags from an audio file using Mutagen.
+
+ NOT Async friendly.
+ """
+ result = {}
+ try:
+ # TODO: extend with more tags and file types!
+ tags = mutagen.File(input_file)
+ if tags is None:
+ return result
+ tags = dict(tags.tags)
+ # ID3 tags
+ if "TIT2" in tags:
+ result["title"] = tags["TIT2"].text[0]
+ if "TPE1" in tags:
+ result["artist"] = tags["TPE1"].text[0]
+ if "TPE2" in tags:
+ result["albumartist"] = tags["TPE2"].text[0]
+ if "TALB" in tags:
+ result["album"] = tags["TALB"].text[0]
+ if "TCON" in tags:
+ result["genre"] = tags["TCON"].text
+ if "TXXX:ARTISTS" in tags:
+ result["artists"] = tags["TXXX:ARTISTS"].text
+ if "TXXX:MusicBrainz Album Id" in tags:
+ result["musicbrainzalbumid"] = tags["TXXX:MusicBrainz Album Id"].text[0]
+ if "TXXX:MusicBrainz Album Artist Id" in tags:
+ result["musicbrainzalbumartistid"] = tags["TXXX:MusicBrainz Album Artist Id"].text
+ if "TXXX:MusicBrainz Artist Id" in tags:
+ result["musicbrainzartistid"] = tags["TXXX:MusicBrainz Artist Id"].text
+ if "TXXX:MusicBrainz Release Group Id" in tags:
+ result["musicbrainzreleasegroupid"] = tags["TXXX:MusicBrainz Release Group Id"].text[0]
+ if "UFID:http://musicbrainz.org" in tags:
+ result["musicbrainzrecordingid"] = tags["UFID:http://musicbrainz.org"].data.decode()
+ if "TXXX:MusicBrainz Track Id" in tags:
+ result["musicbrainztrackid"] = tags["TXXX:MusicBrainz Track Id"].text[0]
+ if "TXXX:BARCODE" in tags:
+ result["barcode"] = tags["TXXX:BARCODE"].text
+ if "TXXX:TSRC" in tags:
+ result["tsrc"] = tags["TXXX:TSRC"].text
+ if "TSOP" in tags:
+ result["artistsort"] = tags["TSOP"].text
+ if "TSO2" in tags:
+ result["albumartistsort"] = tags["TSO2"].text
+ if "TSOT" in tags:
+ result["titlesort"] = tags["TSOT"].text
+ if "TSOA" in tags:
+ result["albumsort"] = tags["TSOA"].text
+
+ del tags
+ return result
+ except Exception as err:
+ LOGGER.debug(f"Error parsing mutagen tags for {input_file}: {err}")
+ return result
+
+
async def get_embedded_image(input_file: str) -> bytes | None:
"""Return embedded image data.