conf_key = f"{CONF_PROVIDERS}/{config.instance_id}"
self.set(conf_key, config.to_raw())
- @api_command("config/providers/reload")
- async def reload_provider(self, instance_id: str) -> None:
- """Reload provider."""
- try:
- config = await self.get_provider_config(instance_id)
- except KeyError:
- # Edge case: Provider was removed before we could reload it
- return
- await self._load_provider_config(config)
-
@api_command("config/players")
async def get_player_configs(
self, provider: str | None = None, include_values: bool = False
await _file.write(json_dumps(self._data, indent=True))
LOGGER.debug("Saved data to persistent storage")
+ @api_command("config/providers/reload")
+ async def _reload_provider(self, instance_id: str) -> None:
+ """Reload provider."""
+ try:
+ config = await self.get_provider_config(instance_id)
+ except KeyError:
+ # Edge case: Provider was removed before we could reload it
+ return
+ await self.mass.load_provider_config(config)
+
async def _update_provider_config(
self, instance_id: str, values: dict[str, ConfigValueType]
) -> ProviderConfig:
raw_conf = config.to_raw()
self.set(conf_key, raw_conf)
if config.enabled:
- await self._load_provider_config(config)
+ await self.mass.load_provider_config(config)
else:
# disable provider
prov_manifest = self.mass.get_provider_manifest(config.domain)
self.set(conf_key, config.to_raw())
# try to load the provider
try:
- await self._load_provider_config(config)
+ await self.mass.load_provider_config(config)
except Exception:
# loading failed, remove config
self.remove(conf_key)
raise
return config
-
- async def _load_provider_config(self, config: ProviderConfig) -> None:
- """Load given provider config."""
- # check if there are no other providers dependent of this provider
- deps = set()
- for dep_prov in self.mass.providers:
- if dep_prov.manifest.depends_on == config.domain:
- deps.add(dep_prov.instance_id)
- await self.mass.unload_provider(dep_prov.instance_id)
- # (re)load the provider
- await self.mass.load_provider_config(config)
- # reload any dependants
- for dep in deps:
- conf = await self.get_provider_config(dep)
- await self.mass.load_provider(conf.instance_id)
async def cleanup_provider(self, provider_instance: str) -> None:
"""Cleanup provider records from the database."""
+ if provider_instance.startswith(("filesystem", "jellyfin", "plex", "opensubsonic")):
+ # removal of a local provider can become messy very fast due to the relations
+ # such as images pointing at the files etc. so we just reset the whole db
+ self.logger.warning(
+ "Removal of local provider detected, issuing full database reset..."
+ )
+ await self._reset_database()
+ return
deleted_providers = self.mass.config.get_raw_core_config_value(
self.domain, CONF_DELETED_PROVIDERS, []
)
)
self.mass.config.save(True)
- # clean cache items from deleted provider(s)
- await self.mass.cache.clear(provider_instance)
+ # always clear cache when a provider is removed
+ await self.mass.cache.clear()
# cleanup media items from db matched to deleted provider
self.logger.info(
"Database migration failed - setup can not continue. "
"Try restarting the server. If this issue persists, create an issue report "
" on Github and/or re-install the server (or restore a backup).",
- exc_info=err,
+ exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
)
# restore backup file
await asyncio.to_thread(shutil.copyfile, db_path_backup, db_path)
async def __migrate_database(self, prev_version: int) -> None:
"""Perform a database migration."""
+ # ruff: noqa: PLR0915
self.logger.info(
"Migrating database from version %s to %s", prev_version, DB_SCHEMA_VERSION
)
item.provider_mappings = {
x for x in item.provider_mappings if x.provider_instance is not None
}
- await ctrl.update_item_in_library(item.item_id, item, True)
+ try:
+ await ctrl.update_item_in_library(item.item_id, item, True)
+ except Exception as err:
+ self.logger.warning(
+ "Error while migrating %s: %s",
+ item.item_id,
+ str(err),
+ exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
+ )
+ await ctrl.remove_item_from_library(item.item_id)
if prev_version <= 5:
# mark all provider mappings as available to recover from the bug
changes = True
if changes:
media_item.metadata.images = images
- await ctrl.update_item_in_library(media_item.item_id, media_item, True)
+ try:
+ await ctrl.update_item_in_library(media_item.item_id, media_item, True)
+ except Exception as err:
+ self.logger.warning(
+ "Error while migrating %s: %s",
+ media_item.item_id,
+ str(err),
+ exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
+ )
+ await ctrl.remove_item_from_library(media_item.item_id)
# save changes
await self.database.commit()
check_audio_support,
crossfade_pcm_parts,
get_ffmpeg_stream,
- get_hls_stream,
+ get_hls_substream,
get_icy_stream,
get_player_filter_params,
get_silence,
async for chunk in get_silence(2, pcm_format):
yield chunk
elif streamdetails.stream_type == StreamType.HLS:
- audio_source = get_hls_stream(
- self.mass, streamdetails.path, streamdetails, streamdetails.seek_position
- )
+ # we simply select the best quality substream here
+ # if we ever want to support adaptive stream selection based on bandwidth
+ # we need to move the substream selection into the loop below and make it
+ # bandwidth aware. For now we just assume domestic high bandwidth where
+ # the user wants the best quality possible at all times.
+ substream = await get_hls_substream(self.mass, streamdetails.path)
+ audio_source = substream.path
elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
audio_source = streamdetails.path
extra_input_args += ["-decryption_key", streamdetails.decryption_key]
import os
import re
import struct
-import time
from collections import deque
from collections.abc import AsyncGenerator
from contextlib import suppress
fetch_playlist,
parse_m3u,
)
-from music_assistant.server.helpers.tags import parse_tags
from .process import AsyncProcess, check_output, communicate
from .util import create_tempfile
# work out how to handle radio stream
if (
- streamdetails.media_type == MediaType.RADIO
+ streamdetails.media_type == MediaType.RADIO
+ # FIXME(review): the previously added StreamType.ICY / StreamType.HLS entries
+ # could never match here — a MediaType value never compares equal to a
+ # StreamType member, so they were dead code. If ICY/HLS detection is intended,
+ # it must test streamdetails.stream_type instead.
and streamdetails.stream_type == StreamType.HTTP
):
resolved_url, is_icy, is_hls = await resolve_radio_stream(mass, streamdetails.path)
- bool if the URL represents a ICY (radio) stream.
- bool uf the URL represents a HLS stream/playlist.
"""
- base_url = url.split("?")[0]
cache_key = f"RADIO_RESOLVED_{url}"
if cache := await mass.cache.get(cache_key):
return cache
is_icy = headers.get("icy-metaint") is not None
is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
if (
- base_url.endswith((".m3u", ".m3u8", ".pls"))
- or headers.get("content-type") == "audio/x-mpegurl"
+ url.endswith((".m3u", ".m3u8", ".pls"))
+ or ".m3u?" in url
+ or ".m3u8?" in url
+ or ".pls?" in url
+ or "audio/x-mpegurl" in headers.get("content-type", "")
+ or "audio/x-scpls" in headers.get("content-type", "")
):
# url is playlist, we need to unfold it
substreams = await fetch_playlist(mass, url)
if not line.is_url:
continue
# unfold first url of playlist
- resolved_url, is_icy, is_hls = await resolve_radio_stream(mass, line.path)
+ return await resolve_radio_stream(mass, line.path)
raise InvalidDataError("No content found in playlist")
except IsHLSPlaylist:
is_hls = True
streamdetails.stream_title = cleaned_stream_title
-async def get_hls_stream(
- mass: MusicAssistant,
- url: str,
- streamdetails: StreamDetails,
- seek_position: int = 0,
-) -> AsyncGenerator[bytes, None]:
- """Get audio stream from HTTP HLS stream."""
- logger = LOGGER.getChild("hls_stream")
- logger.debug("Start streaming HLS stream for url %s", url)
- timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
- prev_chunks: deque[str] = deque(maxlen=50)
- has_playlist_metadata: bool | None = None
- has_id3_metadata: bool | None = None
- is_live_stream = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
- # we simply select the best quality substream here
- # if we ever want to support adaptive stream selection based on bandwidth
- # we need to move the substream selection into the loop below and make it
- # bandwidth aware. For now we just assume domestic high bandwidth where
- # the user wants the best quality possible at all times.
- playlist_item = await get_hls_substream(mass, url)
- substream_url = playlist_item.path
- seconds_skipped = 0
- empty_loops = 0
- while True:
- logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
- async with mass.http_session.get(
- substream_url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
- ) as resp:
- resp.raise_for_status()
- charset = resp.charset or "utf-8"
- substream_m3u_data = await resp.text(charset)
- # get chunk-parts from the substream
- hls_chunks = parse_m3u(substream_m3u_data)
- chunk_seconds = 0
- time_start = time.time()
- for chunk_item in hls_chunks:
- if chunk_item.path in prev_chunks:
- continue
- chunk_length = int(chunk_item.length) if chunk_item.length else 6
- # try to support seeking here
- if seek_position and (seconds_skipped + chunk_length) < seek_position:
- seconds_skipped += chunk_length
- continue
- chunk_item_url = chunk_item.path
- if not chunk_item_url.startswith("http"):
- # path is relative, stitch it together
- base_path = substream_url.rsplit("/", 1)[0]
- chunk_item_url = base_path + "/" + chunk_item.path
- # handle (optional) in-playlist (timed) metadata
- if has_playlist_metadata is None:
- has_playlist_metadata = chunk_item.title not in (None, "")
- logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
- if has_playlist_metadata and chunk_item.title != "no desc":
- # bbc (and maybe others?) set the title to 'no desc'
- cleaned_stream_title = clean_stream_title(chunk_item.title)
- if cleaned_stream_title != streamdetails.stream_title:
- logger.log(
- VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title
- )
- logger.log(
- VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title
- )
- streamdetails.stream_title = cleaned_stream_title
- logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
- # prevent that we play this chunk again if we loop through
- prev_chunks.append(chunk_item.path)
- async with mass.http_session.get(
- chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
- ) as resp:
- yield await resp.content.read()
- chunk_seconds += chunk_length
- # handle (optional) in-band (m3u) metadata
- if has_id3_metadata is not None and has_playlist_metadata:
- continue
- if has_id3_metadata in (None, True):
- tags = await parse_tags(chunk_item_url)
- has_id3_metadata = tags.title and tags.title not in chunk_item.path
- logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
- # end of stream reached - for non livestreams, we are ready and should return
- # for livestreams we loop around to get the next playlist with chunks
- if not is_live_stream:
- return
- # safeguard for an endless loop
- # this may happen if we're simply going too fast for the live stream
- # we already throttle it a bit but we may end up in a situation where something is wrong
- # and we want to break out of this loop, hence this check
- if chunk_seconds == 0:
- empty_loops += 1
- await asyncio.sleep(1)
- else:
- empty_loops = 0
- if empty_loops == 50:
- logger.warning("breaking out of endless loop")
- break
- # ensure that we're not going to fast - otherwise we get the same substream playlist
- while (time.time() - time_start) < (chunk_seconds - 1):
- await asyncio.sleep(0.5)
-
-
async def get_hls_substream(
mass: MusicAssistant,
url: str,
charset = resp.charset or "utf-8"
master_m3u_data = await resp.text(charset)
substreams = parse_m3u(master_m3u_data)
+ if any(x for x in substreams if x.length):
+ # this is already a substream!
+ return PlaylistItem(
+ path=url,
+ )
# sort substreams on best quality (highest bandwidth) when available
if any(x for x in substreams if x.stream_info):
substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
if len(info) != 2:
continue
- length = info[0].strip()[0]
+ # take the whole leading duration token (not just its first character!) so the
+ # "-1" live-stream marker below can actually match; EXTINF may carry extra
+ # attributes after the duration (e.g. "#EXTINF:-1 tvg-id=...,Title")
+ length = info[0].strip().split(" ")[0]
+ if length == "-1":
+ length = None
title = info[1].strip()
elif line.startswith("#EXT-X-STREAM-INF:"):
# HLS stream properties
if file_option not in playlist_section:
continue
itempath = playlist_section[file_option]
+ length = playlist_section.get(f"Length{entry}")
playlist.append(
PlaylistItem(
- length=playlist_section.get(f"Length{entry}"),
+ length=length if length and length != "-1" else None,
title=playlist_section.get(f"Title{entry}"),
path=itempath,
)
force_radio: bool = False,
) -> Track | Radio:
"""Parse plain URL to MediaItem of type Radio or Track."""
- media_info = await self._get_media_info(url, force_refresh)
+ try:
+ media_info = await self._get_media_info(url, force_refresh)
+ except Exception as err:
+ raise MediaNotFoundError(f"Unable to retrieve media info for {url}") from err
is_radio = media_info.get("icyname") or not media_info.duration
provider_mappings = {
ProviderMapping(
"""Call (by config manager) when the configuration of a player changes."""
super().on_player_config_changed(config, changed_keys)
if "enabled" in changed_keys and config.player_id not in self.castplayers:
- self.mass.create_task(self.mass.config.reload_provider, self.instance_id)
+ self.mass.create_task(self.mass.load_provider, self.instance_id)
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
await self.hass.start_listening()
except BaseHassClientError as err:
self.logger.warning("Connection to HA lost due to error: %s", err)
- self.logger.info("Connection to HA lost. Reloading provider in 5 seconds.")
+ self.logger.info("Connection to HA lost. Connection will be automatically retried later.")
# schedule a reload of the provider
- self.mass.call_later(5, self.mass.config.reload_provider(self.instance_id))
+ self.mass.call_later(5, self.mass.load_provider(self.instance_id, allow_retry=True))
"Connection to SnapServer lost, reason: %s. Reloading provider in 5 seconds.", str(exc)
)
# schedule a reload of the provider
- self.mass.call_later(5, self.mass.config.reload_provider(self.instance_id))
+ self.mass.call_later(5, self.mass.load_provider(self.instance_id, allow_retry=True))
if not artist_id:
msg = "Artist does not have a valid ID"
raise InvalidDataError(msg)
+ artist_id = str(artist_id)
artist = Artist(
item_id=artist_id,
name=artist_obj["username"],
async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
"""Parse a Soundcloud Playlist response to a Playlist object."""
+ playlist_id = str(playlist_obj["id"])
playlist = Playlist(
- item_id=playlist_obj["id"],
+ item_id=playlist_id,
provider=self.domain,
name=playlist_obj["title"],
provider_mappings={
ProviderMapping(
- item_id=playlist_obj["id"],
+ item_id=playlist_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
)
async def _parse_track(self, track_obj: dict, playlist_position: int = 0) -> Track:
"""Parse a Soundcloud Track response to a Track model object."""
name, version = parse_title_and_version(track_obj["title"])
+ track_id = str(track_obj["id"])
track = Track(
- item_id=track_obj["id"],
+ item_id=track_id,
provider=self.domain,
name=name,
version=version,
duration=track_obj["duration"] / 1000,
provider_mappings={
ProviderMapping(
- item_id=track_obj["id"],
+ item_id=track_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
audio_format=AudioFormat(
if not (refresh_token := self.config.get_value(CONF_REFRESH_TOKEN)):
raise LoginFailed("Authentication required")
- expires_at = self.config.get_value(CONF_AUTH_EXPIRES_AT) or 0
- access_token = self.config.get_value(CONF_ACCESS_TOKEN)
-
- if expires_at < (time.time() - 300):
- # refresh token
- client_id = self.config.get_value(CONF_CLIENT_ID) or app_var(2)
- params = {
- "grant_type": "refresh_token",
- "refresh_token": refresh_token,
- "client_id": client_id,
- }
- async with self.mass.http_session.post(
- "https://accounts.spotify.com/api/token", data=params
- ) as response:
- if response.status != 200:
- err = await response.text()
- self.mass.config.set_raw_provider_config_value(
- self.instance_id, CONF_REFRESH_TOKEN, None
- )
- raise LoginFailed(f"Failed to refresh access token: {err}")
- data = await response.json()
- access_token = data["access_token"]
- refresh_token = data["refresh_token"]
- expires_at = int(data["expires_in"] + time.time())
- self.logger.debug("Successfully refreshed access token")
+ # refresh token
+ client_id = self.config.get_value(CONF_CLIENT_ID) or app_var(2)
+ params = {
+ "grant_type": "refresh_token",
+ "refresh_token": refresh_token,
+ "client_id": client_id,
+ }
+ async with self.mass.http_session.post(
+ "https://accounts.spotify.com/api/token", data=params
+ ) as response:
+ if response.status != 200:
+ err = await response.text()
+ self.mass.config.set_raw_provider_config_value(
+ self.instance_id, CONF_REFRESH_TOKEN, None
+ )
+ raise LoginFailed(f"Failed to refresh access token: {err}")
+ data = await response.json()
+ access_token = data["access_token"]
+ refresh_token = data["refresh_token"]
+ expires_at = int(data["expires_in"] + time.time())
+ self.logger.debug("Successfully refreshed access token")
self._auth_info = auth_info = {
"access_token": access_token,
if existing := self._tracked_timers.pop(task_id, None):
existing.cancel()
- try:
- await self._load_provider(prov_conf)
- # pylint: disable=broad-except
- except Exception as exc:
- LOGGER.error(
- "Error loading provider(instance) %s: %s",
- prov_conf.name or prov_conf.instance_id,
- str(exc) or exc.__class__.__name__,
- # log full stack trace if debug logging is enabled
- exc_info=exc if LOGGER.isEnabledFor(logging.DEBUG) else None,
- )
- raise
+ await self._load_provider(prov_conf)
+
+ # (re)load any dependants
+ prov_configs = await self.config.get_provider_configs(include_values=True)
+ for dep_prov_conf in prov_configs:
+ if not dep_prov_conf.enabled:
+ continue
+ manifest = self.get_provider_manifest(dep_prov_conf.domain)
+ if not manifest.depends_on:
+ continue
+ if manifest.depends_on == prov_conf.domain:
+ await self._load_provider(dep_prov_conf)
async def load_provider(
self,
# auto schedule a retry if the (re)load failed (handled exceptions only)
if isinstance(exc, MusicAssistantError) and allow_retry:
self.call_later(
- 300,
+ 120,
self.load_provider,
instance_id,
allow_retry,
task_id=task_id,
)
+ LOGGER.warning(
+ "Error loading provider(instance) %s: %s (will be retried later)",
+ prov_conf.name or prov_conf.instance_id,
+ str(exc) or exc.__class__.__name__,
+ # log full stack trace if verbose logging is enabled
+ exc_info=exc if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
+ )
return
# raise in all other situations
raise
+ # unload any unavailable dependents of this provider so they get a clean reload
+ for dep_prov in self.providers:
+ if dep_prov.available:
+ continue
+ if dep_prov.manifest.depends_on == prov_conf.domain:
+ await self.unload_provider(dep_prov.instance_id)
+
async def unload_provider(self, instance_id: str) -> None:
"""Unload a provider."""
if provider := self._providers.get(instance_id):
raise SetupFailedError(msg)
# handle dependency on other provider
- if prov_manifest.depends_on:
- for _ in range(30):
- if self.get_provider(prov_manifest.depends_on):
- break
- await asyncio.sleep(1)
- else:
- msg = (
- f"Provider {domain} depends on {prov_manifest.depends_on} "
- "which is not available."
- )
- raise SetupFailedError(msg)
+ if prov_manifest.depends_on and not self.get_provider(prov_manifest.depends_on):
+ msg = (
+ f"Provider {domain} depends on {prov_manifest.depends_on} "
+ "which is not (yet) available."
+ )
+ raise SetupFailedError(msg)
# try to setup the module
prov_mod = await load_provider_module(domain, prov_manifest.requirements)