Make the URL Music provider a first class citizen.
- Grab metadata if a plain url (or file) is passed to play_media
- Determine if URL is a Track or Radio stream
- Allow URL's in local playlists
for track in await self.mass.music.playlists.tracks(
playlist.item_id, playlist.provider
):
+ if track.media_type != MediaType.TRACK:
+ # filter out radio items
+ continue
if track.metadata.genres:
playlist.metadata.genres.update(track.metadata.genres)
elif track.album and track.album.metadata.genres:
) -> bytes | str:
"""Get/create thumbnail image for path (image url or local path)."""
# check if we already have this cached in the db
- match = {"path": path, "size": size}
+ match_path = path.split("?")[0].split("&")[0]
+ match = {"path": match_path, "size": size}
if result := await self.mass.database.get_row(TABLE_THUMBS, match):
thumbnail = result["data"]
else:
) -> MediaItemType:
"""Get single music item by id and media type."""
assert provider or provider_id, "provider or provider_id must be supplied"
+ if provider == ProviderType.URL or provider_id == "url":
+ # handle special case of 'URL' MusicProvider which allows us to play regular url's
+ return await self.get_provider(ProviderType.URL).parse_item(item_id)
ctrl = self.get_controller(media_type)
return await ctrl.get(
provider_item_id=item_id,
import struct
from io import BytesIO
from time import time
-from typing import TYPE_CHECKING, AsyncGenerator, List, Tuple
+from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple
import aiofiles
from aiohttp import ClientError, ClientTimeout
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.util import create_tempfile
-from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import (
AudioError,
MediaNotFoundError,
param media_item: The MediaItem (track/radio) for which to request the streamdetails for.
param queue_id: Optionally provide the queue_id which will play this stream.
"""
+ streamdetails = None
if queue_item.streamdetails and (time() < queue_item.streamdetails.expires):
# we already have fresh streamdetails, use these
queue_item.streamdetails.seconds_skipped = 0
queue_item.streamdetails.seconds_streamed = 0
streamdetails = queue_item.streamdetails
- elif queue_item.media_type == MediaType.URL:
- # handle URL provider items
- url_prov = mass.music.get_provider(ProviderType.URL)
- streamdetails = await url_prov.get_stream_details(queue_item.uri)
- else:
- # media item: fetch streamdetails from provider
- # always request the full db track as there might be other qualities available
- full_item = await mass.music.get_item_by_uri(queue_item.uri)
- # sort by quality and check track availability
- for prov_media in sorted(
- full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True
- ):
- if not prov_media.available:
- continue
- # get streamdetails from provider
- music_prov = mass.music.get_provider(prov_media.prov_id)
- if not music_prov or not music_prov.available:
- continue # provider temporary unavailable ?
- try:
- streamdetails: StreamDetails = await music_prov.get_stream_details(
- prov_media.item_id
- )
- streamdetails.content_type = ContentType(streamdetails.content_type)
- except MusicAssistantError as err:
- LOGGER.warning(str(err))
- else:
- break
+
+ # fetch streamdetails from provider
+ # always request the full item as there might be other qualities available
+ full_item = await mass.music.get_item_by_uri(queue_item.uri)
+ # sort by quality and check track availability
+ for prov_media in sorted(
+ full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True
+ ):
+ if not prov_media.available:
+ continue
+ # get streamdetails from provider
+ music_prov = mass.music.get_provider(prov_media.prov_id)
+ if not music_prov or not music_prov.available:
+ continue # provider temporary unavailable ?
+ try:
+ streamdetails: StreamDetails = await music_prov.get_stream_details(
+ prov_media.item_id
+ )
+ streamdetails.content_type = ContentType(streamdetails.content_type)
+ except MusicAssistantError as err:
+ LOGGER.warning(str(err))
+ else:
+ break
if not streamdetails:
raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
# set queue_id on the streamdetails so we know what is being streamed
streamdetails.queue_id = queue_id
# get gain correct / replaygain
- if not streamdetails.gain_correct:
+ if streamdetails.gain_correct is None:
loudness, gain_correct = await get_gain_correct(mass, streamdetails)
streamdetails.gain_correct = gain_correct
streamdetails.loudness = loudness
async def get_gain_correct(
mass: MusicAssistant, streamdetails: StreamDetails
-) -> Tuple[float, float]:
+) -> Tuple[Optional[float], Optional[float]]:
"""Get gain correction for given queue / track combination."""
queue = mass.players.get_player_queue(streamdetails.queue_id)
if not queue or not queue.settings.volume_normalization_enabled:
- return (0, 0)
+ return (None, None)
if streamdetails.gain_correct is not None:
return (streamdetails.loudness, streamdetails.gain_correct)
target_gain = queue.settings.volume_normalization_target
)
finally:
# send analyze job to background worker
- if streamdetails.loudness is None and streamdetails.media_type in (
- MediaType.TRACK,
- MediaType.RADIO,
- ):
+ if streamdetails.loudness is None:
mass.add_job(
analyze_audio(mass, streamdetails),
f"Analyze audio for {streamdetails.uri}",
# collect extra and filter args
extra_args = []
filter_params = []
- if streamdetails.gain_correct:
+ if streamdetails.gain_correct is not None:
filter_params.append(f"volume={streamdetails.gain_correct}dB")
if (
streamdetails.sample_rate != pcm_sample_rate
mass: MusicAssistant, path: str, size: Optional[int]
) -> bytes:
"""Create thumbnail from image url."""
- img_data = None
- if path.startswith("http"):
- async with mass.http_session.get(path, verify_ssl=False) as response:
- assert response.status == 200
- img_data = await response.read()
- else:
+ # always try ffmpeg first to get the image because it supports
+ # both online and offline image files as well as embedded images in media files
+ img_data = await get_embedded_image(path)
+ if not img_data:
# assume file from file provider, we need to fetch it here...
for prov in mass.music.providers:
if not prov.type.is_file():
continue
if not await prov.exists(path):
continue
- # embedded image in music file
+ path = await prov.resolve(path)
img_data = await get_embedded_image(path)
- # regular image file on disk
- if not img_data:
- async with prov.open_file(path) as _file:
- img_data = await _file.read()
break
if not img_data:
raise FileNotFoundError(f"Image not found: {path}")
import json
import os
from dataclasses import dataclass
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, Optional, Tuple
+
+from requests import JSONDecodeError
from music_assistant.helpers.process import AsyncProcess
+from music_assistant.models.errors import InvalidDataError
FALLBACK_ARTIST = "Various Artists"
bits_per_sample: int
format: str
bit_rate: int
- duration: float
+ duration: Optional[float]
tags: Dict[str, str]
has_cover_image: bool
filename: str
return AudioTags(
raw=raw,
- sample_rate=int(audio_stream["sample_rate"]),
- channels=audio_stream["channels"],
+ sample_rate=int(audio_stream.get("sample_rate", 44100)),
+ channels=audio_stream.get("channels", 2),
bits_per_sample=int(
audio_stream.get(
"bits_per_raw_sample", audio_stream.get("bits_per_sample")
)
- )
- or 16,
+ or 16
+ ),
format=raw["format"]["format_name"],
bit_rate=int(raw["format"].get("bit_rate", 320)),
- duration=float(raw["format"]["duration"]),
+ duration=float(raw["format"].get("duration", 0)) or None,
tags=tags,
has_cover_image=has_cover_image,
filename=raw["format"]["filename"],
args, enable_stdin=False, enable_stdout=True, enable_stderr=False
) as proc:
- res, _ = await proc.communicate()
- return AudioTags.parse(json.loads(res))
+ try:
+ res, _ = await proc.communicate()
+ data = json.loads(res)
+ if error := data.get("error"):
+ raise InvalidDataError(error["string"])
+ return AudioTags.parse(data)
+ except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err:
+ raise InvalidDataError(
+ f"Unable to retrieve info for {file_path}: {str(err)}"
+ ) from err
async def get_embedded_image(file_path: str) -> bytes | None:
media_type_str = uri.split("/")[3]
media_type = MediaType(media_type_str)
item_id = uri.split("/")[4].split("?")[0]
+ elif uri.startswith("http://") or uri.startswith("https://"):
+ # Translate a plain URL to the URL provider
+ provider = ProviderType.URL
+ media_type = MediaType.UNKNOWN
+ item_id = uri
elif "://" in uri:
# music assistant-style uri
# provider://media_type/item_id
TRACK = "track"
PLAYLIST = "playlist"
RADIO = "radio"
- URL = "url"
FOLDER = "folder"
UNKNOWN = "unknown"
LOSSLESS_HI_RES_3 = 22 # 176/192khz 24 bits HI-RES
LOSSLESS_HI_RES_4 = 23 # above 192khz 24 bits HI-RES
+ @classmethod
+ def from_file_type(cls, file_type: str) -> "MediaQuality":
+ """Try to parse MediaQuality from file type/extension."""
+ if "mp3" in file_type:
+ return MediaQuality.LOSSY_MP3
+ if "ogg" in file_type:
+ return MediaQuality.LOSSY_OGG
+ if "aac" in file_type:
+ return MediaQuality.LOSSY_AAC
+ if "m4a" in file_type:
+ return MediaQuality.LOSSY_M4A
+ if "flac" in file_type:
+ return MediaQuality.LOSSLESS
+ if "wav" in file_type:
+ return MediaQuality.LOSSLESS
+ return MediaQuality.UNKNOWN
+
class LinkType(Enum):
"""Enum wth link types."""
async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
"""Add item to provider's library. Return true on succes."""
- raise NotImplementedError
+ return True
async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
"""Remove item from provider's library. Return true on succes."""
- raise NotImplementedError
+ return True
async def add_playlist_tracks(
self, prov_playlist_id: str, prov_track_ids: List[str]
return await self.get_artist(prov_item_id)
if media_type == MediaType.ALBUM:
return await self.get_album(prov_item_id)
- if media_type == MediaType.TRACK:
- return await self.get_track(prov_item_id)
if media_type == MediaType.PLAYLIST:
return await self.get_playlist(prov_item_id)
if media_type == MediaType.RADIO:
return await self.get_radio(prov_item_id)
+ return await self.get_track(prov_item_id)
async def browse(self, path: Optional[str] = None) -> List[MediaItemType]:
"""
from __future__ import annotations
import asyncio
-import os
import pathlib
import random
from asyncio import TimerHandle
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
-from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode
+from music_assistant.models.enums import (
+ EventType,
+ MediaType,
+ ProviderType,
+ QueueOption,
+ RepeatMode,
+)
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.event import MassEvent
from music_assistant.models.media_items import MediaItemType, media_from_dict
media_item = await self.mass.music.get_item_by_uri(item)
except MusicAssistantError as err:
# invalid MA uri or item not found error
- if item.startswith("http") or os.path.isfile(item):
- # a plain url (or local file) was provided
- queue_items.append(QueueItem.from_url(item))
- continue
raise MediaNotFoundError(f"Invalid uri: {item}") from err
elif isinstance(item, dict):
media_item = media_from_dict(item)
elif media_item.media_type in (
MediaType.RADIO,
MediaType.TRACK,
- MediaType.URL,
):
# single item
tracks = [media_item]
# prepend annnounce sound if needed
if announce:
- queue_item = QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert")
- queue_item.streamdetails.gain_correct = 10
+ url_prov = self.mass.music.get_provider(ProviderType.URL)
+ media_item = await url_prov.parse_item(ALERT_ANNOUNCE_FILE)
+ queue_item = QueueItem.from_media_item(media_item)
queue_items.append(queue_item)
- # parse provided uri into a MA MediaItem or Basic QueueItem from URL
+ # parse provided uri into a MA MediaItem
try:
media_item = await self.mass.music.get_item_by_uri(uri)
queue_items.append(QueueItem.from_media_item(media_item))
except MusicAssistantError as err:
# invalid MA uri or item not found error
- if uri.startswith("http") or os.path.isfile(uri):
- # a plain url was provided
- queue_item = QueueItem.from_url(uri, "alert")
- queue_item.streamdetails.gain_correct = gain_correct
- queue_items.append(queue_item)
- else:
- raise MediaNotFoundError(f"Invalid uri: {uri}") from err
+ raise MediaNotFoundError(f"Invalid uri: {uri}") from err
# start queue with alert sound(s)
self._items = queue_items
from mashumaro import DataClassDictMixin
-from music_assistant.models.enums import ContentType, MediaType, ProviderType
+from music_assistant.models.enums import MediaType
from music_assistant.models.media_items import Radio, StreamDetails, Track
d.pop("duration")
return d
- @classmethod
- def from_url(
- cls,
- url: str,
- name: Optional[str] = None,
- media_type: MediaType = MediaType.URL,
- ) -> QueueItem:
- """Create QueueItem from plain url (or local file)."""
- return cls(
- uri=url,
- name=name or url.split("?")[0],
- media_type=media_type,
- streamdetails=StreamDetails(
- provider=ProviderType.URL,
- item_id=url,
- content_type=ContentType.try_parse(url),
- media_type=media_type,
- data=url,
- ),
- )
-
@classmethod
def from_media_item(cls, media_item: Track | Radio):
"""Construct QueueItem from track/radio item."""
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- itempath = await self.get_filepath(MediaType.ARTIST, prov_artist_id)
+ itempath = await self._get_filepath(MediaType.ARTIST, prov_artist_id)
if await self.exists(itempath):
# if path exists on disk allow parsing full details to allow refresh of metadata
return await self._parse_artist(artist_path=itempath)
)
if db_album is None:
raise MediaNotFoundError(f"Album not found: {prov_album_id}")
- itempath = await self.get_filepath(MediaType.ALBUM, prov_album_id)
+ itempath = await self._get_filepath(MediaType.ALBUM, prov_album_id)
if await self.exists(itempath):
# if path exists on disk allow parsing full details to allow refresh of metadata
return await self._parse_album(None, itempath, db_album.artists)
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- itempath = await self.get_filepath(MediaType.TRACK, prov_track_id)
+ itempath = await self._get_filepath(MediaType.TRACK, prov_track_id)
return await self._parse_track(itempath)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- itempath = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
+ itempath = await self._get_filepath(MediaType.PLAYLIST, prov_playlist_id)
return await self._parse_playlist(itempath)
async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
result = []
- playlist_path = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
+ playlist_path = await self._get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
playlist_base_path = Path(playlist_path).parent
self, prov_playlist_id: str, prov_track_ids: List[str]
) -> None:
"""Add track(s) to playlist."""
- itempath = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
+ itempath = await self._get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not await self.exists(itempath):
raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
async with self.open_file(itempath, "r") as _file:
self, prov_playlist_id: str, prov_track_ids: List[str]
) -> None:
"""Remove track(s) from playlist."""
- itempath = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
+ itempath = await self._get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not await self.exists(itempath):
raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
cur_lines = []
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- itempath = await self.get_filepath(MediaType.TRACK, item_id)
+ itempath = await self._get_filepath(MediaType.TRACK, item_id)
if not await self.exists(itempath):
raise MediaNotFoundError(f"Track path does not exist: {itempath}")
# ensure we have a full path and not relative
if self.config.path not in file_path:
file_path = os.path.join(self.config.path, file_path)
- # remote file locations should return a tempfile here ?
+ file_path = await self.resolve(file_path)
async with aiofiles.open(file_path, mode) as _file:
yield _file
- async def get_filepath(
+ async def resolve(self, file_path: str) -> str:
+ """Resolve local accessible file."""
+ # remote file locations should return a tempfile here so this is future proofing
+ return file_path
+
+ async def _get_filepath(
self, media_type: MediaType, prov_item_id: str
) -> str | None:
"""Get full filepath on disk for item_id."""
from __future__ import annotations
import os
-from typing import AsyncGenerator, List, Optional
+from typing import AsyncGenerator, List, Optional, Tuple
from music_assistant.helpers.audio import (
get_file_stream,
get_http_stream,
get_radio_stream,
)
+from music_assistant.helpers.tags import AudioTags, parse_tags
from music_assistant.models.config import MusicProviderConfig
-from music_assistant.models.enums import ContentType, MediaType, ProviderType
-from music_assistant.models.media_items import MediaItemType, StreamDetails
+from music_assistant.models.enums import (
+ ContentType,
+ ImageType,
+ MediaQuality,
+ MediaType,
+ ProviderType,
+)
+from music_assistant.models.media_items import (
+ Artist,
+ MediaItemImage,
+ MediaItemProviderId,
+ MediaItemType,
+ Radio,
+ StreamDetails,
+ Track,
+)
from music_assistant.models.music_provider import MusicProvider
PROVIDER_CONFIG = MusicProviderConfig(ProviderType.URL)
+# pylint: disable=arguments-renamed
+
class URLProvider(MusicProvider):
"""Music Provider for manual URL's/files added to the queue."""
_attr_available: bool = True
_attr_supports_browse: bool = False
_attr_supported_mediatypes: List[MediaType] = []
+ _full_url = {}
async def setup(self) -> bool:
"""
"""
return True
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Get full track details by id."""
+ return await self.parse_item(prov_track_id)
+
+ async def get_radio(self, prov_radio_id: str) -> Radio:
+ """Get full radio details by id."""
+ return await self.parse_item(prov_radio_id)
+
+ async def get_artist(self, prov_artist_id: str) -> Track:
+ """Get full artist details by id."""
+ artist = prov_artist_id
+ # this is here for compatibility reasons only
+ return Artist(
+ artist,
+ self.type,
+ artist,
+ provider_ids={
+ MediaItemProviderId(artist, self.type, self.id, available=False)
+ },
+ )
+
+ async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
+ """Get single MediaItem from provider."""
+ if media_type == MediaType.ARTIST:
+ return await self.get_artist(prov_item_id)
+ if media_type == MediaType.TRACK:
+ return await self.get_track(prov_item_id)
+ if media_type == MediaType.RADIO:
+ return await self.get_radio(prov_item_id)
+ if media_type == MediaType.UNKNOWN:
+ return await self.parse_item(prov_item_id)
+ raise NotImplementedError
+
+ async def parse_item(
+ self, item_id_or_url: str, force_refresh: bool = False
+ ) -> Track | Radio:
+ """Parse plain URL to MediaItem of type Radio or Track."""
+ item_id, url, media_info = await self._get_media_info(
+ item_id_or_url, force_refresh
+ )
+ is_radio = media_info.get("icy-name") or not media_info.duration
+ if is_radio:
+ # treat as radio
+ media_item = Radio(
+ item_id=item_id,
+ provider=self.type,
+ name=media_info.get("icy-name") or media_info.title,
+ )
+ else:
+ media_item = Track(
+ item_id=item_id,
+ provider=self.type,
+ name=media_info.title,
+ duration=int(media_info.duration or 0),
+ artists=[
+ await self.get_artist(artist) for artist in media_info.artists
+ ],
+ )
+
+ quality = MediaQuality.from_file_type(media_info.format)
+ media_item.provider_ids = {
+ MediaItemProviderId(item_id, self.type, self.id, quality=quality)
+ }
+ if media_info.has_cover_image:
+ media_item.metadata.images = [MediaItemImage(ImageType.THUMB, url, True)]
+ return media_item
+
+ async def _get_media_info(
+ self, item_id_or_url: str, force_refresh: bool = False
+ ) -> Tuple[str, str, AudioTags]:
+ """Retrieve (cached) mediainfo for url."""
+ if "?" in item_id_or_url or "&" in item_id_or_url:
+ # store the 'real' full url to be picked up later
+ # this makes sure that we're not storing any temporary data like auth keys etc
+ # a request for an url mediaitem always passes here first before streamdetails
+ url = item_id_or_url
+ item_id = item_id_or_url.split("?")[0].split("&")[0]
+ self._full_url[item_id] = url
+ else:
+ url = self._full_url.get(item_id_or_url, item_id_or_url)
+ item_id = item_id_or_url
+ cache_key = f"{self.type.value}.media_info.{item_id}"
+ # do we have some cached info for this url ?
+ cached_info = await self.mass.cache.get(cache_key)
+ if cached_info and not force_refresh:
+ media_info = AudioTags.parse(cached_info)
+ else:
+ # parse info with ffprobe (and store in cache)
+ media_info = await parse_tags(url)
+ await self.mass.cache.set(cache_key, media_info.raw)
+ return (item_id, url, media_info)
+
async def search(
self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
) -> List[MediaItemType]:
async def get_stream_details(self, item_id: str) -> StreamDetails | None:
"""Get streamdetails for a track/radio."""
- url = item_id
+ item_id, url, media_info = await self._get_media_info(item_id)
+ is_radio = media_info.get("icy-name") or not media_info.duration
return StreamDetails(
- provider=ProviderType.URL,
+ provider=self.type,
item_id=item_id,
- content_type=ContentType.try_parse(url),
- media_type=MediaType.URL,
+ content_type=ContentType.try_parse(media_info.format),
+ media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
+ sample_rate=media_info.sample_rate,
+ bit_depth=media_info.bits_per_sample,
data=url,
)