"""All constants for Music Assistant."""
-__version__ = "0.0.69"
+__version__ = "0.0.70"
REQUIRED_PYTHON_VER = "3.7"
# configuration keys/attributes
EVENT_SHUTDOWN = "application shutdown"
EVENT_PROVIDER_REGISTERED = "provider registered"
EVENT_PROVIDER_UNREGISTERED = "provider unregistered"
+EVENT_ARTIST_ADDED = "artist added"
+EVENT_ALBUM_ADDED = "album added"
+EVENT_TRACK_ADDED = "track added"
+EVENT_PLAYLIST_ADDED = "playlist added"
+EVENT_RADIO_ADDED = "radio added"
# player attributes
ATTR_PLAYER_ID = "player_id"
return match
+def compare_version(left_version: str, right_version: str):
+ """Compare version string."""
+ if not left_version and not right_version:
+ return True
+ if not left_version and right_version:
+ return False
+ if left_version and not right_version:
+ return False
+ if " " not in left_version:
+ return compare_strings(left_version, right_version)
+ # do this the hard way as sometimes the version string is in the wrong order
+ left_versions = left_version.lower().split(" ").sort()
+ right_versions = right_version.lower().split(" ").sort()
+ return left_versions == right_versions
+
+
def compare_artists(left_artists: List[Artist], right_artists: List[Artist]):
"""Compare two lists of artist and return True if a match was found."""
for left_artist in left_artists:
def compare_album(left_album: Album, right_album: Album):
"""Compare two album items and return True if they match."""
+ # do not match on year and albumtype as this info is often inaccurate on providers
if (
left_album.provider == right_album.provider
and left_album.item_id == right_album.item_id
):
return True
- if left_album.upc and left_album.upc == right_album.upc:
- # UPC is always 100% accurate match
- return True
+ if left_album.upc and right_album.upc:
+ if left_album.upc in right_album.upc or right_album.upc in left_album.upc:
+ # UPC is always 100% accurate match
+ return True
if not compare_strings(left_album.name, right_album.name):
return False
- if not compare_strings(left_album.version, right_album.version):
+ if not compare_version(left_album.version, right_album.version):
return False
if not compare_strings(left_album.artist.name, right_album.artist.name):
return False
- if left_album.year != right_album.year:
- return False
# 100% match, all criteria passed
return True
# track name and version must match
if not compare_strings(left_track.name, right_track.name):
return False
- if not compare_strings(left_track.version, right_track.version):
+ if not compare_version(left_track.version, right_track.version):
return False
# track artist(s) must match
if not compare_artists(left_track.artists, right_track.artists):
async def async_get_thumb_file(mass: MusicAssistantType, url, size: int = 150):
"""Get path to (resized) thumbnail image for given image url."""
+ assert url
cache_folder = os.path.join(mass.config.data_path, ".thumbs")
cache_id = await mass.database.async_get_thumbnail_id(url, size)
cache_file = os.path.join(cache_folder, f"{cache_id}.png")
prev_version = packaging.version.parse(mass.config.stored_config.get("version", ""))
# perform version specific migrations
- if not is_fresh_setup and prev_version < packaging.version.parse("0.0.64"):
- await run_migration_0064(mass)
+ if not is_fresh_setup and prev_version < packaging.version.parse("0.0.70"):
+ await run_migration_0070(mass)
# store version in config
mass.config.stored_config["version"] = app_version
await async_create_db_tables(mass.database.db_file)
-async def run_migration_0064(mass: MusicAssistantType):
- """Run migration for version 0.0.64."""
- # 0.0.64 introduced major changes to all data models and db structure
+async def run_migration_0070(mass: MusicAssistantType):
+ """Run migration for version 0.0.70."""
+ # 0.0.70 introduced major changes to all data models and db structure
# a full refresh of data is unavoidable
data_path = mass.config.data_path
tracks_loudness = []
import asyncio
import logging
import subprocess
-import threading
-import time
from typing import AsyncGenerator, List, Optional
-LOGGER = logging.getLogger("mass.helpers.process")
+LOGGER = logging.getLogger("AsyncProcess")
class AsyncProcess:
def __init__(
self,
process_args: List,
- chunksize=512000,
enable_write: bool = False,
enable_shell=False,
):
"""Initialize."""
- self._process_args = process_args
- self._chunksize = chunksize
- self._enable_write = enable_write
- self._enable_shell = enable_shell
+ self._proc = subprocess.Popen(
+ process_args,
+ shell=enable_shell,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE if enable_write else None,
+ )
self.loop = asyncio.get_running_loop()
- self.__queue_in = asyncio.Queue(4)
- self.__queue_out = asyncio.Queue(8)
- self.__proc_task = None
- self._exit = False
- self._id = int(time.time()) # some identifier for logging
+ self._cancelled = False
async def __aenter__(self) -> "AsyncProcess":
- """Enter context manager, start running the process in executor."""
- self.__proc_task = self.loop.run_in_executor(None, self.__run_proc)
+ """Enter context manager."""
return self
async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
"""Exit context manager."""
- if exc_type:
- LOGGER.debug(
- "[%s] Context manager exit with exception %s (%s)",
- self._id,
- exc_type,
- str(exc_value),
- )
-
- self._exit = True
- # prevent a deadlock by clearing the queues
- while self.__queue_in.qsize():
- await self.__queue_in.get()
- self.__queue_in.task_done()
- self.__queue_in.put_nowait(b"")
- while self.__queue_out.qsize():
- await self.__queue_out.get()
- self.__queue_out.task_done()
- await self.__proc_task
- return True
-
- async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
- """Yield chunks from the output Queue. Generator."""
+ self._cancelled = True
+ if await self.loop.run_in_executor(None, self._proc.poll) is None:
+ # prevent subprocess deadlocking, send terminate and read remaining bytes
+ await self.loop.run_in_executor(None, self._proc.kill)
+ self.loop.run_in_executor(None, self.__read)
+ del self._proc
+
+ async def iterate_chunks(
+ self, chunksize: int = 512000
+ ) -> AsyncGenerator[bytes, None]:
+ """Yield chunks from the process stdout. Generator."""
while True:
- chunk = await self.read()
- yield chunk
- if not chunk or len(chunk) < self._chunksize:
+ chunk = await self.read(chunksize)
+ if not chunk:
break
+ yield chunk
+
+ async def read(self, chunksize: int = -1) -> bytes:
+ """Read x bytes from the process stdout."""
+ if self._cancelled:
+ raise asyncio.CancelledError()
+ return await self.loop.run_in_executor(None, self.__read, chunksize)
- async def read(self) -> bytes:
- """Read single chunk from the output Queue."""
- if self._exit:
- raise RuntimeError("Already exited")
- data = await self.__queue_out.get()
- self.__queue_out.task_done()
- return data
+ def __read(self, chunksize: int = -1):
+ """Try read chunk from process."""
+ try:
+ return self._proc.stdout.read(chunksize)
+ except (BrokenPipeError, ValueError, AttributeError):
+ # Process already exited
+ return b""
async def write(self, data: bytes) -> None:
- """Write data to process."""
- if self._exit:
- raise RuntimeError("Already exited")
- await self.__queue_in.put(data)
+ """Write data to process stdin."""
+ if self._cancelled:
+ raise asyncio.CancelledError()
+
+ def __write():
+ try:
+ self._proc.stdin.write(data)
+ except (BrokenPipeError, ValueError, AttributeError):
+ # Process already exited
+ pass
+
+ await self.loop.run_in_executor(None, __write)
async def write_eof(self) -> None:
"""Write eof to process."""
- await self.__queue_in.put(b"")
+ if self._cancelled:
+ raise asyncio.CancelledError()
+
+ def __write_eof():
+ try:
+ self._proc.stdin.close()
+ except (BrokenPipeError, ValueError, AttributeError):
+ # Process already exited
+ pass
+
+ await self.loop.run_in_executor(None, __write_eof)
async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
"""Write bytes to process and read back results."""
- if not self._enable_write and input_data:
- raise RuntimeError("Write is disabled")
- if input_data:
- await self.write(input_data)
- await self.write_eof()
- output = b""
- async for chunk in self.iterate_chunks():
- output += chunk
- return output
-
- def __run_proc(self):
- """Run process in executor."""
- try:
- proc = subprocess.Popen(
- self._process_args,
- shell=self._enable_shell,
- stdout=subprocess.PIPE,
- stdin=subprocess.PIPE if self._enable_write else None,
- )
- if self._enable_write:
- threading.Thread(
- target=self.__write_stdin,
- args=(proc.stdin,),
- name=f"AsyncProcess_{self._id}_write_stdin",
- daemon=True,
- ).start()
- threading.Thread(
- target=self.__read_stdout,
- args=(proc.stdout,),
- name=f"AsyncProcess_{self._id}_read_stdout",
- daemon=True,
- ).start()
- proc.wait()
-
- except Exception as exc: # pylint: disable=broad-except
- LOGGER.warning("[%s] process exiting abormally: %s", self._id, str(exc))
- LOGGER.exception(exc)
- finally:
- if proc.poll() is None:
- proc.terminate()
- proc.communicate()
-
- def __write_stdin(self, _stdin):
- """Put chunks from queue to stdin."""
- try:
- while True:
- chunk = asyncio.run_coroutine_threadsafe(
- self.__queue_in.get(), self.loop
- ).result()
- self.__queue_in.task_done()
- if not chunk:
- _stdin.close()
- break
- _stdin.write(chunk)
- except Exception as exc: # pylint: disable=broad-except
- LOGGER.debug(
- "[%s] write to stdin aborted with exception: %s", self._id, str(exc)
- )
-
- def __read_stdout(self, _stdout):
- """Put chunks from stdout to queue."""
- try:
- while True:
- chunk = _stdout.read(self._chunksize)
- asyncio.run_coroutine_threadsafe(
- self.__queue_out.put(chunk), self.loop
- ).result()
- if not chunk or len(chunk) < self._chunksize:
- break
- # write empty chunk just in case
- asyncio.run_coroutine_threadsafe(self.__queue_out.put(b""), self.loop)
- except Exception as exc: # pylint: disable=broad-except
- LOGGER.debug(
- "[%s] read from stdout aborted with exception: %s", self._id, str(exc)
- )
+ if self._cancelled:
+ raise asyncio.CancelledError()
+ stdout, _ = await self.loop.run_in_executor(
+ None, self._proc.communicate, input_data
+ )
+ return stdout
"remix",
"mix",
"acoustic",
- " instrumental",
+ "instrumental",
"karaoke",
"remaster",
"versie",
"radio",
"unplugged",
"disco",
+ "akoestisch",
+ "deluxe",
]:
if version_str in title_part:
version = title_part
if not version and track_version:
version = track_version
version = get_version_substitute(version).title()
+ if version == title:
+ version = ""
return title, version
from music_assistant.helpers.web import json_serializer
from music_assistant.models.media_types import (
Album,
+ AlbumType,
Artist,
FullAlbum,
FullTrack,
)
metadata = merge_dict(cur_item.metadata, album.metadata)
provider_ids = merge_list(cur_item.provider_ids, album.provider_ids)
+ if cur_item.album_type == AlbumType.Unknown:
+ album_type = album.album_type
+ else:
+ album_type = cur_item.album_type
sql_query = """UPDATE albums
SET upc=?,
artist=?,
metadata=?,
- provider_ids=?
+ provider_ids=?,
+ album_type=?
WHERE item_id=?;"""
await db_conn.execute(
sql_query,
json_serializer(album_artist),
json_serializer(metadata),
json_serializer(provider_ids),
+ album_type.value,
item_id,
),
)
prev_db_ids = await self.mass.cache.async_get(cache_key, default=[])
cur_db_ids = []
for item in await music_provider.async_get_library_artists():
- db_item = await self.mass.music.async_get_artist(item.item_id, provider_id)
+ db_item = await self.mass.music.async_get_artist(
+ item.item_id, provider_id, lazy=False
+ )
cur_db_ids.append(db_item.item_id)
if not db_item.in_library:
await self.mass.database.async_add_to_library(
prev_db_ids = await self.mass.cache.async_get(cache_key, default=[])
cur_db_ids = []
for item in await music_provider.async_get_library_albums():
- db_album = await self.mass.music.async_get_album(item.item_id, provider_id)
- if db_album.available != item.available:
+ db_album = await self.mass.music.async_get_album(
+ item.item_id, provider_id, lazy=False
+ )
+ if not db_album.available and not item.available:
# album availability changed, sort this out with auto matching magic
db_album = await self.mass.music.async_match_album(db_album)
cur_db_ids.append(db_album.item_id)
db_album.item_id, MediaType.Album, provider_id
)
# precache album tracks
- for album_track in await self.mass.music.async_get_album_tracks(
- item.item_id, provider_id
- ):
- # try to find substitutes for unavailable tracks with matching technique
- if not album_track.available:
- if album_track.provider == "database":
- await self.mass.music.async_match_track(album_track)
- else:
- await self.mass.music.async_add_track(album_track)
+ await self.mass.music.async_get_album_tracks(item.item_id, provider_id)
# process album deletions
for db_id in prev_db_ids:
if db_id not in cur_db_ids:
prev_db_ids = await self.mass.cache.async_get(cache_key, default=[])
cur_db_ids = []
for item in await music_provider.async_get_library_tracks():
- db_item = await self.mass.music.async_get_track(item.item_id, provider_id)
- if db_item.available != item.available:
+ db_item = await self.mass.music.async_get_track(
+ item.item_id, provider_id, lazy=False
+ )
+ if not db_item.available and not item.available:
# track availability changed, sort this out with auto matching magic
db_item = await self.mass.music.async_add_track(item)
cur_db_ids.append(db_item.item_id)
cur_db_ids = []
for playlist in await music_provider.async_get_library_playlists():
db_item = await self.mass.music.async_get_playlist(
- playlist.item_id, provider_id
+ playlist.item_id, provider_id, lazy=False
)
if db_item.checksum != playlist.checksum:
db_item = await self.mass.database.async_add_playlist(playlist)
playlist.item_id, provider_id
):
# try to find substitutes for unavailable tracks with matching technique
- if not playlist_track.available:
+ if not db_item.available and not playlist_track.available:
if playlist_track.provider == "database":
await self.mass.music.async_match_track(playlist_track)
else:
prev_db_ids = await self.mass.cache.async_get(cache_key, default=[])
cur_db_ids = []
for item in await music_provider.async_get_library_radios():
- db_radio = await self.mass.music.async_get_radio(item.item_id, provider_id)
+ db_radio = await self.mass.music.async_get_radio(
+ item.item_id, provider_id, lazy=False
+ )
cur_db_ids.append(db_radio.item_id)
await self.mass.database.async_add_to_library(
db_radio.item_id, MediaType.Radio, provider_id
self.cache, cache_key, provider.async_get_artist_images, mb_artist_id
)
if res:
- merge_dict(metadata, res)
+ metadata = merge_dict(metadata, res)
return metadata
import logging
from typing import List
+from music_assistant.constants import (
+ EVENT_ALBUM_ADDED,
+ EVENT_ARTIST_ADDED,
+ EVENT_PLAYLIST_ADDED,
+ EVENT_RADIO_ADDED,
+ EVENT_TRACK_ADDED,
+)
from music_assistant.helpers.cache import async_cached
from music_assistant.helpers.compare import (
compare_album,
compare_strings,
compare_track,
)
-from music_assistant.helpers.encryption import async_encrypt_string
from music_assistant.helpers.musicbrainz import MusicBrainz
from music_assistant.helpers.util import unique_item_ids
from music_assistant.helpers.web import api_route
from music_assistant.models.media_types import (
Album,
+ AlbumType,
Artist,
FullAlbum,
- FullTrack,
+ ItemMapping,
MediaItem,
MediaType,
Playlist,
@api_route("items/:media_type/:provider_id/:item_id")
async def async_get_item(
- self, item_id: str, provider_id: str, media_type: MediaType
+ self,
+ item_id: str,
+ provider_id: str,
+ media_type: MediaType,
+ refresh: bool = False,
+ lazy: bool = True,
):
"""Get single music item by id and media type."""
if media_type == MediaType.Artist:
- return await self.async_get_artist(item_id, provider_id)
+ return await self.async_get_artist(
+ item_id, provider_id, refresh=refresh, lazy=lazy
+ )
if media_type == MediaType.Album:
- return await self.async_get_album(item_id, provider_id)
+ return await self.async_get_album(
+ item_id, provider_id, refresh=refresh, lazy=lazy
+ )
if media_type == MediaType.Track:
- return await self.async_get_track(item_id, provider_id)
+ return await self.async_get_track(
+ item_id, provider_id, refresh=refresh, lazy=lazy
+ )
if media_type == MediaType.Playlist:
- return await self.async_get_playlist(item_id, provider_id)
+ return await self.async_get_playlist(
+ item_id, provider_id, refresh=refresh, lazy=lazy
+ )
if media_type == MediaType.Radio:
- return await self.async_get_radio(item_id, provider_id)
+ return await self.async_get_radio(
+ item_id, provider_id, refresh=refresh, lazy=lazy
+ )
return None
@api_route("artists/:provider_id/:item_id")
async def async_get_artist(
- self, item_id: str, provider_id: str, refresh=False
+ self, item_id: str, provider_id: str, refresh: bool = False, lazy: bool = True
) -> Artist:
"""Return artist details for the given provider artist id."""
if provider_id == "database" and not refresh:
elif db_item:
return db_item
artist = await self.__async_get_provider_artist(item_id, provider_id)
- # fetching an artist is slow because of musicbrainz and metadata lookup
- # so we return the provider object
- self.mass.add_job(self.async_add_artist(artist))
- return artist
+ if not lazy:
+ return await self.async_add_artist(artist)
+ self.mass.add_background_task(self.async_add_artist(artist))
+ return db_item if db_item else artist
async def __async_get_provider_artist(
self, item_id: str, provider_id: str
@api_route("albums/:provider_id/:item_id")
async def async_get_album(
- self, item_id: str, provider_id: str, refresh=False
+ self, item_id: str, provider_id: str, refresh: bool = False, lazy: bool = True
) -> Album:
"""Return album details for the given provider album id."""
if provider_id == "database" and not refresh:
elif db_item:
return db_item
album = await self.__async_get_provider_album(item_id, provider_id)
- return await self.async_add_album(album)
+ if not lazy:
+ return await self.async_add_album(album)
+ self.mass.add_background_task(self.async_add_album(album))
+ return db_item if db_item else album
async def __async_get_provider_album(self, item_id: str, provider_id: str) -> Album:
"""Return album details for the given provider album id."""
track_details: Track = None,
album_details: Album = None,
refresh: bool = False,
+ lazy: bool = True,
) -> Track:
"""Return track details for the given provider track id."""
if provider_id == "database" and not refresh:
provider_id, item_id
)
if db_item and refresh:
- # in some cases (e.g. at playback time or requesting full track info)
- # it's useful to have the track refreshed from the provider instead of
- # the database cache to make sure that the track is available and perhaps
- # another or a higher quality version is available.
provider_id, item_id = await self.__get_provider_id(db_item)
elif db_item:
return db_item
track_details = await self.__async_get_provider_track(item_id, provider_id)
if album_details:
track_details.album = album_details
- return await self.async_add_track(track_details)
+ if not lazy:
+ return await self.async_add_track(track_details)
+ self.mass.add_background_task(self.async_add_track(track_details))
+ return db_item if db_item else track_details
- async def __async_get_provider_track(self, item_id: str, provider_id: str) -> Album:
+ async def __async_get_provider_track(self, item_id: str, provider_id: str) -> Track:
"""Return track details for the given provider track id."""
provider = self.mass.get_provider(provider_id)
if not provider or not provider.available:
return track
@api_route("playlists/:provider_id/:item_id")
- async def async_get_playlist(self, item_id: str, provider_id: str) -> Playlist:
+ async def async_get_playlist(
+ self, item_id: str, provider_id: str, refresh: bool = False, lazy: bool = True
+ ) -> Playlist:
"""Return playlist details for the given provider playlist id."""
assert item_id and provider_id
db_item = await self.mass.database.async_get_playlist_by_prov_id(
provider_id, item_id
)
- if not db_item:
- # item not yet in local database so fetch and store details
- provider = self.mass.get_provider(provider_id)
- if not provider.available:
- return None
- item_details = await provider.async_get_playlist(item_id)
- db_item = await self.mass.database.async_add_playlist(item_details)
- return db_item
+ if db_item and refresh:
+ provider_id, item_id = await self.__get_provider_id(db_item)
+ elif db_item:
+ return db_item
+ playlist = await self.__async_get_provider_playlist(item_id, provider_id)
+ if not lazy:
+ return await self.async_add_playlist(playlist)
+ self.mass.add_background_task(self.async_add_playlist(playlist))
+ return db_item if db_item else playlist
+
+ async def __async_get_provider_playlist(
+ self, item_id: str, provider_id: str
+ ) -> Playlist:
+ """Return playlist details for the given provider playlist id."""
+ provider = self.mass.get_provider(provider_id)
+ if not provider or not provider.available:
+ raise Exception("Provider %s is not available!" % provider_id)
+ cache_key = f"{provider_id}.get_playlist.{item_id}"
+ playlist = await async_cached(
+ self.cache,
+ cache_key,
+ provider.async_get_playlist,
+ item_id,
+ expires=86400 * 2,
+ )
+ if not playlist:
+ raise Exception(
+ "Playlist %s not found on provider %s" % (item_id, provider_id)
+ )
+ return playlist
@api_route("radios/:provider_id/:item_id")
- async def async_get_radio(self, item_id: str, provider_id: str) -> Radio:
- """Return radio details for the given provider playlist id."""
+ async def async_get_radio(
+ self, item_id: str, provider_id: str, refresh: bool = False, lazy: bool = True
+ ) -> Radio:
+ """Return radio details for the given provider radio id."""
assert item_id and provider_id
db_item = await self.mass.database.async_get_radio_by_prov_id(
provider_id, item_id
)
- if not db_item:
- # item not yet in local database so fetch and store details
- provider = self.mass.get_provider(provider_id)
- if not provider.available:
- return None
- item_details = await provider.async_get_radio(item_id)
- db_item = await self.mass.database.async_add_radio(item_details)
- return db_item
+ if db_item and refresh:
+ provider_id, item_id = await self.__get_provider_id(db_item)
+ elif db_item:
+ return db_item
+ radio = await self.__async_get_provider_radio(item_id, provider_id)
+ if not lazy:
+ return await self.async_add_radio(radio)
+ self.mass.add_background_task(self.async_add_radio(radio))
+ return db_item if db_item else radio
+
+ async def __async_get_provider_radio(self, item_id: str, provider_id: str) -> Radio:
+ """Return radio details for the given provider playlist id."""
+ provider = self.mass.get_provider(provider_id)
+ if not provider or not provider.available:
+ raise Exception("Provider %s is not available!" % provider_id)
+ cache_key = f"{provider_id}.get_radio.{item_id}"
+ radio = await async_cached(
+ self.cache, cache_key, provider.async_get_radio, item_id
+ )
+ if not radio:
+ raise Exception(
+ "Radio %s not found on provider %s" % (item_id, provider_id)
+ )
+ return radio
@api_route("albums/:provider_id/:item_id/tracks")
async def async_get_album_tracks(
if streamdetails:
# set player_id on the streamdetails so we know what players stream
streamdetails.player_id = player_id
- # store the path encrypted as we do not want it to be visible in the api
- streamdetails.path = await async_encrypt_string(streamdetails.path)
# set streamdetails as attribute on the media_item
# this way the app knows what content is playing
media_item.streamdetails = streamdetails
################ ADD MediaItem(s) to database helpers ################
- async def async_add_artist(self, artist: Artist) -> int:
+ async def async_add_artist(self, artist: Artist) -> Artist:
"""Add artist to local db and return the database item."""
if not artist.musicbrainz_id:
artist.musicbrainz_id = await self.__async_get_artist_musicbrainz_id(artist)
)
db_item = await self.mass.database.async_add_artist(artist)
# also fetch same artist on all providers
- self.mass.add_background_task(self.async_match_artist(db_item))
- self.mass.signal_event("artist added", db_item)
+ await self.async_match_artist(db_item)
+ self.mass.signal_event(EVENT_ARTIST_ADDED, db_item)
return db_item
- async def async_add_album(self, album: Album) -> int:
+ async def async_add_album(self, album: Album) -> Album:
"""Add album to local db and return the database item."""
# make sure we have an artist
assert album.artist
db_item = await self.mass.database.async_add_album(album)
# also fetch same album on all providers
- self.mass.add_background_task(self.async_match_album(db_item))
- self.mass.signal_event("album added", db_item)
+ await self.async_match_album(db_item)
+ self.mass.signal_event(EVENT_ALBUM_ADDED, db_item)
return db_item
- async def async_add_track(self, track: Track) -> int:
- """Add track to local db and return the new database id."""
+ async def async_add_track(self, track: Track) -> Track:
+ """Add track to local db and return the new database item."""
# make sure we have artists
assert track.artists
# make sure we have an album
assert track.album or track.albums
db_item = await self.mass.database.async_add_track(track)
# also fetch same track on all providers (will also get other quality versions)
- self.mass.add_background_task(self.async_match_track(db_item))
+ await self.async_match_track(db_item)
+ self.mass.signal_event(EVENT_TRACK_ADDED, db_item)
+ return db_item
+
+ async def async_add_playlist(self, playlist: Playlist) -> Playlist:
+ """Add playlist to local db and return the new database item."""
+ db_item = await self.mass.database.async_add_playlist(playlist)
+ self.mass.signal_event(EVENT_PLAYLIST_ADDED, db_item)
+ return db_item
+
+ async def async_add_radio(self, radio: Radio) -> Radio:
+ """Add radio to local db and return the new database item."""
+ db_item = await self.mass.database.async_add_radio(radio)
+ self.mass.signal_event(EVENT_RADIO_ADDED, db_item)
return db_item
async def __async_get_artist_musicbrainz_id(self, artist: Artist):
):
if not lookup_album:
continue
+ if artist.name != lookup_album.artist.name:
+ continue
musicbrainz_id = await self.musicbrainz.async_get_mb_artist_id(
artist.name,
albumname=lookup_album.name,
for provider in self.mass.get_providers(ProviderType.MUSIC_PROVIDER):
if provider.id in cur_providers:
continue
- if Artist not in provider.supported_mediatypes:
+ if MediaType.Artist not in provider.supported_mediatypes:
continue
if not await self.__async_match_prov_artist(db_artist, provider):
LOGGER.debug(
LOGGER.debug(
"Trying to match artist %s on provider %s", db_artist.name, provider.name
)
- # try to get a match with some reference albums of this artist
- for ref_album in await self.async_get_artist_albums(
- db_artist.item_id, db_artist.provider
- ):
- searchstr = "%s - %s" % (db_artist.name, ref_album.name)
- search_result = await self.async_search_provider(
- searchstr, provider.id, [MediaType.Album], limit=10
- )
- for search_result_item in search_result.albums:
- if compare_album(search_result_item, ref_album):
- # 100% album match, we can simply update the db with the provider id
- await self.mass.database.async_update_artist(
- db_artist.item_id, search_result_item.artist
- )
- return True
-
# try to get a match with some reference tracks of this artist
for ref_track in await self.async_get_artist_toptracks(
db_artist.item_id, db_artist.provider
):
- searchstr = "%s - %s" % (db_artist.name, ref_track.name)
+ # make sure we have a full track
+ if isinstance(ref_track.album, ItemMapping):
+ ref_track = await self.async_get_track(
+ ref_track.item_id, ref_track.provider
+ )
+ searchstr = "%s %s" % (db_artist.name, ref_track.name)
search_results = await self.async_search_provider(
- searchstr, provider.id, [MediaType.Track], limit=10
+ searchstr, provider.id, [MediaType.Track], limit=25
)
for search_result_item in search_results.tracks:
if compare_track(search_result_item, ref_track):
# get matching artist from track
for search_item_artist in search_result_item.artists:
if compare_strings(db_artist.name, search_item_artist.name):
- # 100% match, we can simply update the db with additional provider ids
+ # 100% album match
+ # get full artist details so we have all metadata
+ prov_artist = await self.__async_get_provider_artist(
+ search_item_artist.item_id, search_item_artist.provider
+ )
await self.mass.database.async_update_artist(
- db_artist.item_id, search_item_artist
+ db_artist.item_id, prov_artist
)
return True
+ # try to get a match with some reference albums of this artist
+ artist_albums = await self.async_get_artist_albums(
+ db_artist.item_id, db_artist.provider
+ )
+ for ref_album in artist_albums[:50]:
+ if ref_album.album_type == AlbumType.Compilation:
+ continue
+ searchstr = "%s %s" % (db_artist.name, ref_album.name)
+ search_result = await self.async_search_provider(
+ searchstr, provider.id, [MediaType.Album], limit=25
+ )
+ for search_result_item in search_result.albums:
+ # artist must match 100%
+ if not compare_strings(db_artist.name, search_result_item.artist.name):
+ continue
+ if compare_album(search_result_item, ref_album):
+ # 100% album match
+ # get full artist details so we have all metadata
+ prov_artist = await self.__async_get_provider_artist(
+ search_result_item.artist.item_id,
+ search_result_item.artist.provider,
+ )
+ await self.mass.database.async_update_artist(
+ db_artist.item_id, prov_artist
+ )
+ return True
return False
async def async_match_album(self, db_album: Album):
"Trying to match album %s on provider %s", db_album.name, provider.name
)
match_found = False
- searchstr = "%s - %s" % (db_album.artist.name, db_album.name)
+ searchstr = "%s %s" % (db_album.artist.name, db_album.name)
if db_album.version:
searchstr += " " + db_album.version
search_result = await self.async_search_provider(
- searchstr, provider.id, [MediaType.Album], limit=5
+ searchstr, provider.id, [MediaType.Album], limit=25
)
for search_result_item in search_result.albums:
if not search_result_item.available:
continue
- if compare_album(search_result_item, db_album):
+ if not compare_album(search_result_item, db_album):
+ continue
+ # we must fetch the full album version, search results are simplified objects
+ prov_album = await self.__async_get_provider_album(
+ search_result_item.item_id, search_result_item.provider
+ )
+ if compare_album(prov_album, db_album):
# 100% match, we can simply update the db with additional provider ids
await self.mass.database.async_update_album(
- db_album.item_id, search_result_item
+ db_album.item_id, prov_album
)
match_found = True
+ # while we're here, also match the artist
+ if db_album.artist.provider == "database":
+ prov_artist = await self.__async_get_provider_artist(
+ prov_album.artist.item_id, prov_album.artist.provider
+ )
+ await self.mass.database.async_update_artist(
+ db_album.artist.item_id, prov_artist
+ )
+
# no match found
if not match_found:
LOGGER.debug(
# try to find match on all providers
providers = self.mass.get_providers(ProviderType.MUSIC_PROVIDER)
for provider in providers:
- if Album in provider.supported_mediatypes:
+ if MediaType.Album in provider.supported_mediatypes:
await find_prov_match(provider)
async def async_match_track(self, db_track: Track):
assert (
db_track.provider == "database"
), "Matching only supported for database items!"
- if not isinstance(db_track, FullTrack):
+ if isinstance(db_track.album, ItemMapping):
# matching only works if we have a full track object
db_track = await self.mass.database.async_get_track(db_track.item_id)
for provider in self.mass.get_providers(ProviderType.MUSIC_PROVIDER):
- if Track not in provider.supported_mediatypes:
+ if MediaType.Track not in provider.supported_mediatypes:
continue
LOGGER.debug(
"Trying to match track %s on provider %s", db_track.name, provider.name
for db_track_artist in db_track.artists:
if match_found:
break
- searchstr = "%s - %s" % (db_track_artist.name, db_track.name)
+ searchstr = "%s %s" % (db_track_artist.name, db_track.name)
if db_track.version:
searchstr += " " + db_track.version
search_result = await self.async_search_provider(
- searchstr, provider.id, [MediaType.Track], limit=10
+ searchstr, provider.id, [MediaType.Track], limit=25
)
for search_result_item in search_result.tracks:
if not search_result_item.available:
await self.mass.database.async_update_track(
db_track.item_id, search_result_item
)
+ # while we're here, also match the artist
+ if db_track_artist.provider == "database":
+ for artist in search_result_item.artists:
+ if not compare_strings(
+ db_track_artist.name, artist.name
+ ):
+ continue
+ prov_artist = await self.__async_get_provider_artist(
+ artist.item_id, artist.provider
+ )
+ await self.mass.database.async_update_artist(
+ db_track_artist.item_id, prov_artist
+ )
if not match_found:
LOGGER.debug(
EVENT_STREAM_ENDED,
EVENT_STREAM_STARTED,
)
-from music_assistant.helpers.encryption import async_decrypt_string
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistantType
from music_assistant.helpers.util import create_tempfile, get_ip, try_parse_int
if resample:
args += ["rate", "-v", str(resample)]
- async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
+ LOGGER.debug(
+ "start sox stream for: %s/%s", streamdetails.provider, streamdetails.item_id
+ )
- cancelled = False
+ async with AsyncProcess(args, enable_write=True) as sox_proc:
async def fill_buffer():
"""Forward audio chunks to sox stdin."""
# feed audio data into sox stdin for processing
async for chunk in self.async_get_media_stream(streamdetails):
- if self.mass.exit or cancelled or not chunk:
+ if not chunk:
break
await sox_proc.write(chunk)
await sox_proc.write_eof()
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
- try:
- prev_chunk = b""
- async for chunk in sox_proc.iterate_chunks():
- if len(chunk) < chunk_size:
- # last chunk
- yield (True, prev_chunk + chunk)
- break
- if prev_chunk:
- yield (False, prev_chunk)
- prev_chunk = chunk
-
- await asyncio.wait([fill_buffer_task])
-
- # pylint: disable=broad-except
- except (
- GeneratorExit,
- asyncio.CancelledError,
- Exception,
- ) as exc:
- cancelled = True
- fill_buffer_task.cancel()
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] cancelled: %s",
- streamdetails.provider,
- streamdetails.item_id,
- str(exc),
- )
+ prev_chunk = b""
+ async for chunk in sox_proc.iterate_chunks(chunk_size):
+ if len(chunk) < chunk_size:
+ # last chunk
+ yield (True, prev_chunk + chunk)
+ break
+ if prev_chunk:
+ yield (False, prev_chunk)
+ prev_chunk = chunk
+ await asyncio.wait([fill_buffer_task])
+ LOGGER.debug(
+ "finished sox stream for: %s/%s",
+ streamdetails.provider,
+ streamdetails.item_id,
+ )
async def async_queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]:
"""Stream the PlayerQueue's tracks as constant feed in flac format."""
"flac",
"-",
]
- async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
+ async with AsyncProcess(args, enable_write=True) as sox_proc:
# feed stdin with pcm samples
- cancelled = False
-
async def fill_buffer():
"""Feed audio data into sox stdin for processing."""
async for chunk in self.async_queue_stream_pcm(
player_id, sample_rate, 32
):
- if self.mass.exit or cancelled or not chunk:
+ if not chunk:
break
await sox_proc.write(chunk)
- # write eof when no more data
- await sox_proc.write_eof()
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
- try:
- # start yielding audio chunks
- async for chunk in sox_proc.iterate_chunks():
- yield chunk
- await asyncio.wait([fill_buffer_task])
- # pylint: disable=broad-except
- except (
- GeneratorExit,
- asyncio.CancelledError,
- Exception,
- ) as exc:
- cancelled = True
- fill_buffer_task.cancel()
- LOGGER.debug(
- "[async_queue_stream_flac] [%s] cancelled: %s", player_id, str(exc)
- )
+
+ # start yielding audio chunks
+ async for chunk in sox_proc.iterate_chunks(chunk_size):
+ yield chunk
+ await asyncio.wait([fill_buffer_task])
async def async_queue_stream_pcm(
self, player_id, sample_rate=96000, bit_depth=32
cur_chunk += 1
# HANDLE FIRST PART OF TRACK
- if not chunk and cur_chunk == 1 and is_last_chunk:
- raise RuntimeError("Stream error on track %s" % queue_track.item_id)
+ if not chunk and bytes_written == 0:
+ # stream error: got empy first chunk
+ # prevent player queue get stuck by sending next track command
+ self.mass.add_job(player_queue.async_next())
+ LOGGER.error("Stream error on track %s", queue_track.item_id)
+ return
if cur_chunk <= 2 and not last_fadeout_data:
# no fadeout_part available so just pass it to the output directly
yield chunk
player_id, streamdetails.item_id, streamdetails.provider
)
# start streaming
+ LOGGER.debug("Start streaming %s (%s)", queue_item_id, queue_item.name)
async for _, audio_chunk in self.async_get_sox_stream(
streamdetails, gain_db_adjust=gain_correct
):
yield audio_chunk
+ LOGGER.debug("Finished streaming %s (%s)", queue_item_id, queue_item.name)
async def async_get_media_stream(
self, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
"""Get the (original/untouched) audio data for the given streamdetails. Generator."""
- stream_path = await async_decrypt_string(streamdetails.path)
+ stream_path = streamdetails.path
stream_type = StreamType(streamdetails.type)
audio_data = b""
chunk_size = 512000
+ track_loudness = await self.mass.database.async_get_track_loudness(
+ streamdetails.item_id, streamdetails.provider
+ )
+ needs_analyze = track_loudness is None
# support for AAC/MPEG created with ffmpeg in between
if streamdetails.content_type in [ContentType.AAC, ContentType.MPEG]:
# signal start of stream event
self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails)
+ LOGGER.debug(
+ "start media stream for: %s/%s (%s)",
+ streamdetails.provider,
+ streamdetails.item_id,
+ streamdetails.type,
+ )
if stream_type == StreamType.URL:
async with self.mass.http_session.get(stream_path) as response:
if not chunk:
break
yield chunk
- if len(audio_data) < 100000000:
+ if needs_analyze and len(audio_data) < 100000000:
audio_data += chunk
elif stream_type == StreamType.FILE:
async with AIOFile(stream_path) as afp:
async for chunk in Reader(afp, chunk_size=chunk_size):
+ if not chunk:
+ break
yield chunk
- if len(audio_data) < 100000000:
+ if needs_analyze and len(audio_data) < 100000000:
audio_data += chunk
elif stream_type == StreamType.EXECUTABLE:
args = shlex.split(stream_path)
- async with AsyncProcess(args, chunk_size, False) as process:
- async for chunk in process.iterate_chunks():
+ async with AsyncProcess(args) as process:
+ async for chunk in process.iterate_chunks(chunk_size):
+ if not chunk:
+ break
yield chunk
- if len(audio_data) < 100000000:
+ if needs_analyze and len(audio_data) < 100000000:
audio_data += chunk
# signal end of stream event
self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)
+ LOGGER.debug(
+ "finished media stream for: %s/%s",
+ streamdetails.provider,
+ streamdetails.item_id,
+ )
# send analyze job to background worker
- self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
+ if needs_analyze and audio_data:
+ self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
def __get_player_sox_options(
self, player_id: str, streamdetails: StreamDetails
from typing import Any, List, Mapping
import ujson
-import unidecode
from mashumaro import DataClassDictMixin
Album = "album"
Single = "single"
Compilation = "compilation"
+ Unknown = "unknown"
class TrackQuality(IntEnum):
for item in ["The ", "De ", "de ", "Les "]:
if self.name.startswith(item):
sort_name = "".join(self.name.split(item)[1:])
- return unidecode.unidecode(sort_name).lower()
+ return sort_name.lower()
@property
def available(self):
version: str = ""
year: int = 0
artist: ItemMapping = None
- album_type: AlbumType = AlbumType.Album
+ album_type: AlbumType = AlbumType.Unknown
upc: str = ""
@classmethod
def from_track(cls, track: Union[Track, Radio]):
- """Construct QueueItem from track/raio item."""
+ """Construct QueueItem from track/radio item."""
return cls.from_dict(track.to_dict())
else:
# at this point we don't know if the queue is synced with the player
# so just to be safe we send the queue_items to the player
- await self.player.async_cmd_queue_load(self.items)
- await self.async_play_index(prev_index)
+ self._items = self._items[prev_index:]
+ return await self.player.async_cmd_queue_load(self._items)
else:
LOGGER.warning(
"resume queue requested for %s but queue is empty", self.queue_id
from enum import Enum
from typing import Any
-from mashumaro import DataClassDictMixin
-
class StreamType(Enum):
"""Enum with stream types."""
@dataclass
-class StreamDetails(DataClassDictMixin):
+class StreamDetails:
"""Model for streamdetails."""
type: StreamType
details: Any = None
seconds_played: int = 0
sox_options: str = None
+
+ def to_dict(
+ self,
+ use_bytes: bool = False,
+ use_enum: bool = False,
+ use_datetime: bool = False,
+ ):
+ """Handle conversion to dict."""
+ return {
+ "provider": self.provider,
+ "item_id": self.item_id,
+ "content_type": self.content_type.value,
+ "sample_rate": self.sample_rate,
+ "bit_depth": self.bit_depth,
+ "sox_options": self.sox_options,
+ "seconds_played": self.seconds_played,
+ }
async def async_cmd_stop(self) -> None:
"""Send stop command to player."""
if self._chromecast and self._chromecast.media_controller:
- await self.async_chromecast_command(self._chromecast.media_controller.stop)
+ await self.async_chromecast_command(self._chromecast.quit_app)
async def async_cmd_play(self) -> None:
"""Send play command to player."""
async def async_cmd_power_off(self) -> None:
"""Send power OFF command to player."""
- if self.media_status and (
- self.media_status.player_is_playing
- or self.media_status.player_is_paused
- or self.media_status.player_is_idle
- ):
- await self.async_chromecast_command(self._chromecast.media_controller.stop)
+ await self.async_cmd_stop()
# chromecast has no real poweroff so we send mute instead
await self.async_chromecast_command(self._chromecast.set_volume_muted, True)
async def async_get_artist_toptracks(self, prov_artist_id) -> List[Track]:
"""Get a list of most popular tracks for the given artist."""
- # artist toptracks not supported on Qobuz, so use search instead
- # assuming qobuz returns results sorted by popularity
+ params = {
+ "artist_id": prov_artist_id,
+ "extra": "playlists",
+ "offset": 0,
+ "limit": 25,
+ }
+ result = await self.__async_get_data("artist/get", params)
+ if result and result["playlists"]:
+ return [
+ await self.__async_parse_track(item)
+ for item in result["playlists"][0]["tracks"]["items"]
+ if (item and item["id"])
+ ]
+ # fallback to search
artist = await self.async_get_artist(prov_artist_id)
params = {"query": artist.name, "limit": 25, "type": "tracks"}
searchresult = await self.__async_get_data("catalog/search", params)
)
]
+ async def async_get_similar_artists(self, prov_artist_id):
+ """Get similar artists for given artist."""
+ # https://www.qobuz.com/api.json/0.2/artist/getSimilarArtists?artist_id=220020&offset=0&limit=3
+
async def async_library_add(self, prov_item_id, media_type: MediaType):
"""Add item to library."""
result = None
if not self.__user_auth_info:
return
# TODO: need to figure out if the streamed track is purchased by user
+ # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
+ # {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}}
if msg == EVENT_STREAM_STARTED and msg_details.provider == PROV_ID:
# report streaming started to qobuz
device_id = self.__user_auth_info["user"]["device"]["id"]
artist.provider_ids.append(
MediaItemProviderId(provider=PROV_ID, item_id=str(artist_obj["id"]))
)
- if artist_obj.get("image"):
- for key in ["extralarge", "large", "medium", "small"]:
- if artist_obj["image"].get(key):
- if (
- "2a96cbd8b46e442fc41c2b86b821562f"
- not in artist_obj["image"][key]
- ):
- artist.metadata["image"] = artist_obj["image"][key]
- break
+ artist.metadata["image"] = self.__get_image(artist_obj)
if artist_obj.get("biography"):
artist.metadata["biography"] = artist_obj["biography"].get("content", "")
if artist_obj.get("url"):
album.artist = artist_obj
else:
album.artist = await self.__async_parse_artist(album_obj["artist"])
- if album_obj.get("product_type", "") == "single":
+ if (
+ album_obj.get("product_type", "") == "single"
+ or album_obj.get("release_type", "") == "single"
+ ):
album.album_type = AlbumType.Single
elif (
album_obj.get("product_type", "") == "compilation"
or "Various" in album.artist.name
):
album.album_type = AlbumType.Compilation
- else:
+ elif (
+ album_obj.get("product_type", "") == "album"
+ or album_obj.get("release_type", "") == "album"
+ ):
album.album_type = AlbumType.Album
if "genre" in album_obj:
album.metadata["genre"] = album_obj["genre"]["name"]
- if album_obj.get("image"):
- for key in ["extralarge", "large", "medium", "small"]:
- if album_obj["image"].get(key):
- album.metadata["image"] = album_obj["image"][key]
- break
+ album.metadata["image"] = self.__get_image(album_obj)
if len(album_obj["upc"]) == 13:
# qobuz writes ean as upc ?!
album.metadata["ean"] = album_obj["upc"]
track.metadata["performers"] = track_obj["performers"]
if track_obj.get("copyright"):
track.metadata["copyright"] = track_obj["copyright"]
+ if track_obj.get("audio_info"):
+ track.metadata["replaygain"] = track_obj["audio_info"][
+ "replaygain_track_gain"
+ ]
+ if track_obj.get("parental_warning"):
+ track.metadata["explicit"] = True
+ track.metadata["image"] = self.__get_image(track_obj)
# get track quality
if track_obj["maximum_sampling_rate"] > 192:
quality = TrackQuality.FLAC_LOSSLESS_HI_RES_4
playlist_obj["owner"]["id"] == self.__user_auth_info["user"]["id"]
or playlist_obj["is_collaborative"]
)
- if playlist_obj.get("images300"):
- playlist.metadata["image"] = playlist_obj["images300"][0]
+ playlist.metadata["image"] = self.__get_image(playlist_obj)
if playlist_obj.get("url"):
playlist.metadata["qobuz_url"] = playlist_obj["url"]
playlist.checksum = playlist_obj["updated_at"]
LOGGER.error("%s - %s", endpoint, result)
return None
return result
+
+ def __get_image(self, obj: dict) -> Optional[str]:
+ """Try to parse image from Qobuz media object."""
+ if obj.get("image"):
+ for key in ["extralarge", "large", "medium", "small"]:
+ if obj["image"].get(key):
+ if "2a96cbd8b46e442fc41c2b86b821562f" in obj["image"][key]:
+ continue
+ return obj["image"][key]
+ if obj.get("images300"):
+ # playlists seem to use this strange format
+ return obj["images300"][0]
+ if obj.get("album"):
+ return self.__get_image(obj["album"])
+ if obj.get("artist"):
+ return self.__get_image(obj["artist"])
+ return None
album.album_type = AlbumType.Single
elif album_obj["album_type"] == "compilation":
album.album_type = AlbumType.Compilation
- else:
+ elif album_obj["album_type"] == "album":
album.album_type = AlbumType.Album
if "genres" in album_obj:
album.metadata["genres"] = album_obj["genres"]
track.isrc = track_obj["external_ids"]["isrc"]
if "album" in track_obj:
track.album = await self.__async_parse_album(track_obj["album"])
+ if track_obj["album"].get("images"):
+ track.metadata["image"] = track_obj["album"]["images"][0]["url"]
if track_obj.get("copyright"):
track.metadata["copyright"] = track_obj["copyright"]
if track_obj.get("explicit"):
url = await async_get_image_url(
self.mass, item.item_id, item.provider, item.media_type
)
- img_file = await async_get_thumb_file(self.mass, url, size)
- if img_file:
- with open(img_file, "rb") as _file:
- icon_data = _file.read()
- icon_data = b64encode(icon_data)
- return "data:image/png;base64," + icon_data.decode()
+ if url:
+ img_file = await async_get_thumb_file(self.mass, url, size)
+ if img_file:
+ with open(img_file, "rb") as _file:
+ icon_data = _file.read()
+ icon_data = b64encode(icon_data)
+ return "data:image/png;base64," + icon_data.decode()
raise KeyError("Invalid item or url")
@api_route("images/provider-icons/:provider_id?")