default_value: list[str] = []
for option in CONF_ENTRY_SAMPLE_RATES.options:
- option_value = cast(str, option.value)
+ option_value = cast("str", option.value)
sample_rate_str, bit_depth_str = option_value.split(MULTI_VALUE_SPLITTER, 1)
sample_rate = int(sample_rate_str)
bit_depth = int(bit_depth_str)
def providers(self) -> list[MetadataProvider]:
"""Return all loaded/running MetadataProviders."""
if TYPE_CHECKING:
- return cast(list[MetadataProvider], self.mass.get_providers(ProviderType.METADATA))
+ return cast("list[MetadataProvider]", self.mass.get_providers(ProviderType.METADATA))
return self.mass.get_providers(ProviderType.METADATA)
@property
# collect (local) metadata from all local providers
local_provs = get_global_cache_value("non_streaming_providers")
if TYPE_CHECKING:
- local_provs = cast(set[str], local_provs)
+ local_provs = cast("set[str]", local_provs)
# ensure the item is matched to all providers
await self.mass.music.artists.match_providers(artist)
unique_keys: set[str] = set()
local_provs = get_global_cache_value("non_streaming_providers")
if TYPE_CHECKING:
- local_provs = cast(set[str], local_provs)
+ local_provs = cast("set[str]", local_provs)
for prov_mapping in sorted(album.provider_mappings, key=lambda x: x.priority, reverse=True):
if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
continue
unique_keys: set[str] = set()
local_provs = get_global_cache_value("non_streaming_providers")
if TYPE_CHECKING:
- local_provs = cast(set[str], local_provs)
+ local_provs = cast("set[str]", local_provs)
for prov_mapping in sorted(track.provider_mappings, key=lambda x: x.priority, reverse=True):
if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
continue
musicbrainz: MusicbrainzProvider = self.mass.get_provider("musicbrainz")
if TYPE_CHECKING:
- musicbrainz = cast(MusicbrainzProvider, musicbrainz)
+ musicbrainz = cast("MusicbrainzProvider", musicbrainz)
# first try with resource URL (e.g. streaming provider share URL)
for prov_mapping in artist.provider_mappings:
if prov_mapping.url and prov_mapping.url.startswith("http"):
available_providers = get_global_cache_value("available_providers")
if TYPE_CHECKING:
- available_providers = cast(set[str], available_providers)
+ available_providers = cast("set[str]", available_providers)
# fetch the first (available) provider item
for prov_mapping in sorted(
"Fetching resume point to play for Podcast episode %s",
episode.name,
)
- episode = cast(PodcastEpisode, episode)
+ episode = cast("PodcastEpisode", episode)
fully_played, resume_position_ms = await self.mass.music.get_resume_position(episode)
episode.fully_played = fully_played
episode.resume_position_ms = 0 if fully_played else resume_position_ms
"""Return PlayerProvider for given player."""
player = self._players[player_id]
player_provider = self.mass.get_provider(player.provider)
- return cast(PlayerProvider, player_provider)
+ return cast("PlayerProvider", player_provider)
def get_announcement_volume(self, player_id: str, volume_override: int | None) -> int | None:
"""Get the (player specific) volume for a announcement."""
if streamdetails.cache is None:
streamdetails.cache = StreamCache(mass, streamdetails)
else:
- streamdetails.cache = cast(StreamCache, streamdetails.cache)
+ streamdetails.cache = cast("StreamCache", streamdetails.cache)
# create cache (if needed) and wait until the cache is available
await streamdetails.cache.create()
LOGGER.debug(
# work out audio source for these streamdetails
stream_type = streamdetails.stream_type
if stream_type == StreamType.CACHE:
- cache = cast(StreamCache, streamdetails.cache)
+ cache = cast("StreamCache", streamdetails.cache)
audio_source = await cache.get_audio_stream()
elif stream_type == StreamType.MULTI_FILE:
audio_source = get_multi_file_stream(mass, streamdetails)
# release cache if needed
if cache := streamdetails.cache:
- cache = cast(StreamCache, streamdetails.cache)
+ cache = cast("StreamCache", streamdetails.cache)
cache.release()
# parse loudnorm data if we have that collected (and enabled)
"600",
]
if streamdetails.stream_type == StreamType.CACHE:
- cache = cast(StreamCache, streamdetails.cache)
+ cache = cast("StreamCache", streamdetails.cache)
audio_source = await cache.get_audio_stream()
elif streamdetails.stream_type == StreamType.MULTI_FILE:
audio_source = get_multi_file_stream(mass, streamdetails)
finally:
# release cache if needed
if cache := streamdetails.cache:
- cache = cast(StreamCache, streamdetails.cache)
+ cache = cast("StreamCache", streamdetails.cache)
cache.release()
wrapper_func: Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]
if asyncio.iscoroutinefunction(check_func):
- async_func = cast(Callable[..., Coroutine[Any, Any, None]], func)
+ async_func = cast("Callable[..., Coroutine[Any, Any, None]]", func)
@wraps(async_func)
async def async_wrapper(*args: Any) -> None:
continue
if asyncio.iscoroutinefunction(cb_func):
if TYPE_CHECKING:
- cb_func = cast(Callable[[MassEvent], Coroutine[Any, Any, None]], cb_func)
+ cb_func = cast("Callable[[MassEvent], Coroutine[Any, Any, None]]", cb_func)
self.create_task(cb_func, event_obj)
else:
if TYPE_CHECKING:
- cb_func = cast(Callable[[MassEvent], None], cb_func)
+ cb_func = cast("Callable[[MassEvent], None]", cb_func)
self.loop.call_soon_threadsafe(cb_func, event_obj)
def subscribe(
if asyncio.iscoroutinefunction(target) or asyncio.iscoroutine(target):
# coroutine function
if TYPE_CHECKING:
- target = cast(Coroutine[Any, Any, _R], target)
+ target = cast("Coroutine[Any, Any, _R]", target)
handle = self.loop.call_later(delay, _create_task, target)
else:
# regular callable
if TYPE_CHECKING:
- target = cast(Callable[..., _R], target)
+ target = cast("Callable[..., _R]", target)
handle = self.loop.call_later(delay, target, *args)
self._tracked_timers[task_id] = handle
return handle
async def load_provider_manifest(provider_domain: str, provider_path: str) -> None:
"""Preload all available provider manifest files."""
# get files in subdirectory
- for file_str in os.listdir(provider_path):
+ for file_str in os.listdir(provider_path): # noqa: PTH208, RUF100
file_path = os.path.join(provider_path, file_str)
if not await isfile(file_path):
continue
)
async with TaskManager(self) as tg:
- for dir_str in os.listdir(PROVIDERS_PATH):
+ for dir_str in os.listdir(PROVIDERS_PATH): # noqa: PTH208, RUF100
if dir_str.startswith(("_", ".")):
continue
dir_path = os.path.join(PROVIDERS_PATH, dir_str)
)
if not library_item_ids:
return [x async for x in self.get_library_artists()]
- library_items = cast(list[int], library_item_ids)
+ library_items = cast("list[int]", library_item_ids)
query = "artists.item_id in :ids"
query_params = {"ids": library_items}
return await self.mass.music.artists.library_items(
)
if not library_item_ids:
return [x async for x in self.get_library_albums()]
- library_item_ids = cast(list[int], library_item_ids)
+ library_item_ids = cast("list[int]", library_item_ids)
query = "albums.item_id in :ids"
query_params = {"ids": library_item_ids}
return await self.mass.music.albums.library_items(
)
if not library_item_ids:
return [x async for x in self.get_library_tracks()]
- library_item_ids = cast(list[int], library_item_ids)
+ library_item_ids = cast("list[int]", library_item_ids)
query = "tracks.item_id in :ids"
query_params = {"ids": library_items}
return await self.mass.music.tracks.library_items(
)
if not library_item_ids:
return [x async for x in self.get_library_radios()]
- library_item_ids = cast(list[int], library_item_ids)
+ library_item_ids = cast("list[int]", library_item_ids)
query = "radios.item_id in :ids"
query_params = {"ids": library_item_ids}
return await self.mass.music.radio.library_items(
)
if not library_item_ids:
return [x async for x in self.get_library_playlists()]
- library_item_ids = cast(list[int], library_item_ids)
+ library_item_ids = cast("list[int]", library_item_ids)
query = "playlists.item_id in :ids"
query_params = {"ids": library_item_ids}
return await self.mass.music.playlists.library_items(
)
if not library_item_ids:
return [x async for x in self.get_library_audiobooks()]
- library_item_ids = cast(list[int], library_item_ids)
+ library_item_ids = cast("list[int]", library_item_ids)
query = "audiobooks.item_id in :ids"
query_params = {"ids": library_item_ids}
return await self.mass.music.audiobooks.library_items(
)
if not library_item_ids:
return [x async for x in self.get_library_podcasts()]
- library_item_ids = cast(list[int], library_item_ids)
+ library_item_ids = cast("list[int]", library_item_ids)
query = "podcasts.item_id in :ids"
query_params = {"ids": library_item_ids}
return await self.mass.music.podcasts.library_items(
ConfigEntry(
key=CONF_BIND_INTERFACE,
type=ConfigEntryType.STRING,
- default_value=cast(str, mass.streams.publish_ip),
+ default_value=cast("str", mass.streams.publish_ip),
label="Bind interface",
description="Interface to bind to for Airplay streaming.",
category="advanced",
)
elif media.queue_id and media.queue_id.startswith("ugp_"):
# special case: UGP stream
- ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
+ ugp_provider = cast("PlayerGroupProvider", self.mass.get_provider("player_group"))
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.base_pcm_format
audio_source = ugp_stream.subscribe_raw()
if values is None:
values = {}
- locale = cast(str, values.get("locale", "") or "us")
- auth_file = cast(str, values.get(CONF_AUTH_FILE))
+ locale = cast("str", values.get("locale", "") or "us")
+ auth_file = cast("str", values.get(CONF_AUTH_FILE))
auth_required = True
if auth_file and await check_file_exists(auth_file):
type=ConfigEntryType.STRING,
label="Post Login Url",
required=False,
- value=cast(str | None, values.get(CONF_POST_LOGIN_URL)),
+ value=cast("str | None", values.get(CONF_POST_LOGIN_URL)),
hidden=not auth_required,
),
ConfigEntry(
label="Code Verifier",
hidden=True,
required=False,
- value=cast(str | None, values.get(CONF_CODE_VERIFIER)),
+ value=cast("str | None", values.get(CONF_CODE_VERIFIER)),
),
ConfigEntry(
key=CONF_SERIAL,
label="Serial",
hidden=True,
required=False,
- value=cast(str | None, values.get(CONF_SERIAL)),
+ value=cast("str | None", values.get(CONF_SERIAL)),
),
ConfigEntry(
key=CONF_LOGIN_URL,
label="Login Url",
hidden=True,
required=False,
- value=cast(str | None, values.get(CONF_LOGIN_URL)),
+ value=cast("str | None", values.get(CONF_LOGIN_URL)),
),
ConfigEntry(
key=CONF_AUTH_FILE,
label="Authentication File",
hidden=True,
required=True,
- value=cast(str | None, values.get(CONF_AUTH_FILE)),
+ value=cast("str | None", values.get(CONF_AUTH_FILE)),
),
)
) -> None:
"""Initialize the Audible Audiobook Provider."""
super().__init__(mass, manifest, config)
- self.locale = cast(str, self.config.get_value(CONF_LOCALE) or "us")
- self.auth_file = cast(str, self.config.get_value(CONF_AUTH_FILE))
+ self.locale = cast("str", self.config.get_value(CONF_LOCALE) or "us")
+ self.auth_file = cast("str", self.config.get_value(CONF_AUTH_FILE))
self._client: audible.AsyncClient | None = None
audible.log_helper.set_level(getLevelName(self.logger.level))
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- parsed_item = cast(Track, await self.parse_item(prov_track_id))
+ parsed_item = cast("Track", await self.parse_item(prov_track_id))
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
# always prefer the stored info, such as the name
) -> None:
"""Handle PLAY MEDIA on given player."""
url = f"builtin_player/flow/{player_id}.mp3"
- player = cast(Player, self.mass.players.get(player_id, raise_unavailable=True))
+ player = cast("Player", self.mass.players.get(player_id, raise_unavailable=True))
player.current_media = media
self.mass.signal_event(
if "error" in result and "limit" in result["error"]:
self.logger.warning(result["error"])
return None
- return cast(dict[str, Any], result)
+ return cast("dict[str, Any]", result)
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
"""Initialize provider(instance) with given configuration."""
- base_path = cast(str, config.get_value(CONF_PATH))
+ base_path = cast("str", config.get_value(CONF_PATH))
return LocalFileSystemProvider(mass, manifest, config, base_path)
self.base_path: str = base_path
self.write_access: bool = False
self.sync_running: bool = False
- self.media_content_type = cast(str, config.get_value(CONF_ENTRY_CONTENT_TYPE.key))
+ self.media_content_type = cast("str", config.get_value(CONF_ENTRY_CONTENT_TYPE.key))
@property
def supported_features(self) -> set[ProviderFeature]:
# prefer (short lived) cache for a bit more speed
cache_base_key = f"{self.instance_id}.artist"
if artist_path and (cache := await self.cache.get(artist_path, base_key=cache_base_key)):
- return cast(Artist, cache)
+ return cast("Artist", cache)
prov_artist_id = artist_path or name
artist = Artist(
cache_base_key = f"{self.instance_id}.album"
if album_dir and (cache := await self.cache.get(album_dir, base_key=cache_base_key)):
- return cast(Album, cache)
+ return cast("Album", cache)
# album artist(s)
album_artists: UniqueList[Artist | ItemMapping] = UniqueList()
"""Return local images found in a given folderpath."""
cache_base_key = f"{self.lookup_key}.folderimages"
if (cache := await self.cache.get(folder, base_key=cache_base_key)) is not None:
- return cast(UniqueList[MediaItemImage], cache)
+ return cast("UniqueList[MediaItemImage]", cache)
if extra_thumb_names is None:
extra_thumb_names = ()
images: UniqueList[MediaItemImage] = UniqueList()
"""Return metadata for a podcast."""
cache_base_key = f"{self.lookup_key}.podcastmetadata"
if (cache := await self.cache.get(podcast_folder, base_key=cache_base_key)) is not None:
- return cast(dict[str, Any], cache)
+ return cast("dict[str, Any]", cache)
data: dict[str, Any] = {}
metadata_file = os.path.join(podcast_folder, "metadata.json")
if await self.exists(metadata_file):
label="URL",
required=True,
description="URL to your Home Assistant instance (e.g. http://192.168.1.1:8123)",
- value=cast(str, values.get(CONF_URL)) if values else None,
+ value=cast("str", values.get(CONF_URL)) if values else None,
),
ConfigEntry(
key=CONF_ACTION_AUTH,
description="You can either paste a Long Lived Token here manually or use the "
"'authenticate' button to generate a token for you with logging in.",
depends_on=CONF_URL,
- value=cast(str, values.get(CONF_AUTH_TOKEN)) if values else None,
+ value=cast("str", values.get(CONF_AUTH_TOKEN)) if values else None,
category="advanced",
),
ConfigEntry(
# append player controls entries (if we have an active instance)
if instance_id and (hass_prov := mass.get_provider(instance_id)) and hass_prov.available:
- hass_prov = cast(HomeAssistantProvider, hass_prov)
+ hass_prov = cast("HomeAssistantProvider", hass_prov)
return (*base_entries, *(await _get_player_control_config_entries(hass_prov.hass)))
return (
async def _register_player_controls(self) -> None:
"""Register all player controls."""
- power_controls = cast(list[str], self.config.get_value(CONF_POWER_CONTROLS))
- mute_controls = cast(list[str], self.config.get_value(CONF_MUTE_CONTROLS))
- volume_controls = cast(list[str], self.config.get_value(CONF_VOLUME_CONTROLS))
+ power_controls = cast("list[str]", self.config.get_value(CONF_POWER_CONTROLS))
+ mute_controls = cast("list[str]", self.config.get_value(CONF_MUTE_CONTROLS))
+ volume_controls = cast("list[str]", self.config.get_value(CONF_VOLUME_CONTROLS))
control_entity_ids: set[str] = {
*power_controls,
*mute_controls,
if "supported_formats" not in media_player_obj:
continue
for supported_format_obj in media_player_obj["supported_formats"]:
- result.append(cast(ESPHomeSupportedAudioFormat, supported_format_obj))
+ result.append(cast("ESPHomeSupportedAudioFormat", supported_format_obj))
except Exception as exc:
self.logger.warning(
"Failed to fetch diagnostics for ESPHome player: %s",
provider: str = str(config.get(CONF_PROVIDER))
if TYPE_CHECKING:
- key = cast(str, key)
- secret = cast(str, secret)
- session_key = cast(str, session_key)
- username = cast(str, username)
+ key = cast("str", key)
+ secret = cast("str", secret)
+ session_key = cast("str", session_key)
+ username = cast("str", username)
match provider.lower():
case "lastfm":
for key, value in data.items():
new_key = key.replace("-", "_")
new_values[new_key] = replace_hyphens(value)
- return cast(_T, new_values)
+ return cast("_T", new_values)
if isinstance(data, list):
- return cast(_T, [replace_hyphens(x) if isinstance(x, dict) else x for x in data])
+ return cast("_T", [replace_hyphens(x) if isinstance(x, dict) else x for x in data])
return data
if not (player_provider := self.mass.get_provider(group_type)):
return base_entries # guard
if TYPE_CHECKING:
- player_provider = cast(PlayerProvider, player_provider)
+ player_provider = cast("PlayerProvider", player_provider)
assert player_provider.instance_id != self.instance_id
if not (child_player := next((x for x in player_provider.players), None)):
return base_entries # guard
"""Handle POWER command to group player."""
group_player = self.mass.players.get(player_id, raise_unavailable=True)
if TYPE_CHECKING:
- group_player = cast(Player, group_player)
+ group_player = cast("Player", group_player)
# always stop at power off
if not powered and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
"""
group_player = self.mass.players.get(target_player, raise_unavailable=True)
if TYPE_CHECKING:
- group_player = cast(Player, group_player)
+ group_player = cast("Player", group_player)
dynamic_members_enabled = self.mass.config.get_raw_player_config_value(
group_player.player_id,
CONFIG_ENTRY_DYNAMIC_MEMBERS.key,
)
child_player = self.mass.players.get(player_id, raise_unavailable=True)
if TYPE_CHECKING:
- group_player = cast(Player, group_player)
+ group_player = cast("Player", group_player)
if child_player.active_group and child_player.active_group != group_player.player_id:
raise InvalidDataError(
f"Player {child_player.display_name} already has another group active"
group_player = self.mass.players.get(target_player, raise_unavailable=True)
child_player = self.mass.players.get(player_id, raise_unavailable=True)
if TYPE_CHECKING:
- group_player = cast(Player, group_player)
- child_player = cast(Player, child_player)
+ group_player = cast("Player", group_player)
+ child_player = cast("Player", child_player)
dynamic_members_enabled = self.mass.config.get_raw_player_config_value(
group_player.player_id,
CONFIG_ENTRY_DYNAMIC_MEMBERS.key,
elif player_provider := self.mass.get_provider(group_type):
# grab additional details from one of the provider's players
if TYPE_CHECKING:
- player_provider = cast(PlayerProvider, player_provider)
+ player_provider = cast("PlayerProvider", player_provider)
model_name = "Sync Group"
manufacturer = self.mass.get_provider(group_type).name
can_group_with = {player_provider.instance_id}
label="Local server IP",
description="The local server IP (e.g. 192.168.1.77)",
required=True,
- value=cast(str, values.get(CONF_LOCAL_SERVER_IP)) if values else None,
+ value=cast("str", values.get(CONF_LOCAL_SERVER_IP)) if values else None,
),
ConfigEntry(
key=CONF_LOCAL_SERVER_PORT,
description="The local server port (e.g. 32400)",
required=True,
default_value=32400,
- value=cast(int, values.get(CONF_LOCAL_SERVER_PORT)) if values else None,
+ value=cast("int", values.get(CONF_LOCAL_SERVER_PORT)) if values else None,
),
ConfigEntry(
key=CONF_LOCAL_SERVER_SSL,
type=ConfigEntryType.SECURE_STRING,
label=CONF_AUTH_TOKEN,
action=CONF_AUTH_TOKEN,
- value=cast(str | None, values.get(CONF_AUTH_TOKEN)) if values else None,
+ value=cast("str | None", values.get(CONF_AUTH_TOKEN)) if values else None,
hidden=True,
),
]
async def _get_data(self, key: str, cls: type[PlexObjectT]) -> PlexObjectT:
results = await self._run_async(self._plex_library.fetchItem, key, cls)
- return cast(PlexObjectT, results)
+ return cast("PlexObjectT", results)
def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
name, version = parse_title_and_version(name)
async def _search_track(self, search_query: str | None, limit: int) -> list[PlexTrack]:
return cast(
- list[PlexTrack],
+ "list[PlexTrack]",
await self._run_async(self._plex_library.searchTracks, title=search_query, limit=limit),
)
async def _search_album(self, search_query: str, limit: int) -> list[PlexAlbum]:
return cast(
- list[PlexAlbum],
+ "list[PlexAlbum]",
await self._run_async(self._plex_library.searchAlbums, title=search_query, limit=limit),
)
async def _search_artist(self, search_query: str, limit: int) -> list[PlexArtist]:
return cast(
- list[PlexArtist],
+ "list[PlexArtist]",
await self._run_async(
self._plex_library.searchArtists, title=search_query, limit=limit
),
async def _search_playlist(self, search_query: str, limit: int) -> list[PlexPlaylist]:
return cast(
- list[PlexPlaylist],
+ "list[PlexPlaylist]",
await self._run_async(self._plex_library.playlists, title=search_query, limit=limit),
)
async def _search_track_advanced(self, limit: int, **kwargs: Any) -> list[PlexTrack]:
return cast(
- list[PlexPlaylist],
+ "list[PlexPlaylist]",
await self._run_async(self._plex_library.searchTracks, filters=kwargs, limit=limit),
)
async def _search_album_advanced(self, limit: int, **kwargs: Any) -> list[PlexAlbum]:
return cast(
- list[PlexPlaylist],
+ "list[PlexPlaylist]",
await self._run_async(self._plex_library.searchAlbums, filters=kwargs, limit=limit),
)
async def _search_artist_advanced(self, limit: int, **kwargs: Any) -> list[PlexArtist]:
return cast(
- list[PlexPlaylist],
+ "list[PlexPlaylist]",
await self._run_async(self._plex_library.searchArtists, filters=kwargs, limit=limit),
)
async def _search_playlist_advanced(self, limit: int, **kwargs: Any) -> list[PlexPlaylist]:
return cast(
- list[PlexPlaylist],
+ "list[PlexPlaylist]",
await self._run_async(self._plex_library.playlists, filters=kwargs, limit=limit),
)
"""Get a list of albums for the given artist."""
if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
plex_artist = await self._get_data(prov_artist_id, PlexArtist)
- plex_albums = cast(list[PlexAlbum], await self._run_async(plex_artist.albums))
+ plex_albums = cast("list[PlexAlbum]", await self._run_async(plex_artist.albums))
if plex_albums:
albums = []
for album_obj in plex_albums:
auth_token,
session=session,
)
- for media_section in cast(list[PlexLibrarySection], plex_server.library.sections()):
+ for media_section in cast("list[PlexLibrarySection]", plex_server.library.sections()):
if media_section.type != PlexMusicSection.TYPE:
continue
# TODO: figure out what plex uses as stable id and use that instead of names
return all_libraries
if cache := await mass.cache.get(cache_key, checksum=auth_token):
- return cast(list[str], cache)
+ return cast("list[str]", cache)
result = await asyncio.to_thread(_get_libraries)
# use short expiration for in-memory cache
"""Retrieve library/subscribed radio stations from the provider."""
stored_radios = self.config.get_value(CONF_STORED_RADIOS)
if TYPE_CHECKING:
- stored_radios = cast(list[str], stored_radios)
+ stored_radios = cast("list[str]", stored_radios)
for item in stored_radios:
try:
yield await self.get_radio(item)
"""Add item to provider's library. Return true on success."""
stored_radios = self.config.get_value(CONF_STORED_RADIOS)
if TYPE_CHECKING:
- stored_radios = cast(list[str], stored_radios)
+ stored_radios = cast("list[str]", stored_radios)
if item.item_id in stored_radios:
return False
self.logger.debug("Adding radio %s to stored radios", item.item_id)
"""Remove item from provider's library. Return true on success."""
stored_radios = self.config.get_value(CONF_STORED_RADIOS)
if TYPE_CHECKING:
- stored_radios = cast(list[str], stored_radios)
+ stored_radios = cast("list[str]", stored_radios)
if prov_item_id not in stored_radios:
return False
self.logger.debug("Removing radio %s from stored radios", prov_item_id)
bind_port=bind_port,
base_url=self._base_url,
static_routes=[
- ("*", "/{tail:.*}", cast(Awaitable, http_handler)),
+ ("*", "/{tail:.*}", cast("Awaitable", http_handler)),
],
)
from music_assistant_models.media_items import AudioFormat
from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
from snapcast.control import create_server
-from snapcast.control.client import Snapclient
from zeroconf import NonUniqueNameException
from zeroconf.asyncio import AsyncServiceInfo
if TYPE_CHECKING:
from music_assistant_models.config_entries import ProviderConfig
from music_assistant_models.provider import ProviderManifest
+ from snapcast.control.client import Snapclient
from snapcast.control.group import Snapgroup
from snapcast.control.server import Snapserver
from snapcast.control.stream import Snapstream
player = self.mass.players.get(player_id, raise_unavailable=False)
if not player:
snap_client = cast(
- Snapclient, self._snapserver.client(self._get_snapclient_id(player_id))
+ "Snapclient", self._snapserver.client(self._get_snapclient_id(player_id))
)
player = Player(
player_id=player_id,
# Handle config option for manual IP's
manual_ip_config = cast(
- list[str], self.config.get_value(CONF_ENTRY_MANUAL_DISCOVERY_IPS.key)
+ "list[str]", self.config.get_value(CONF_ENTRY_MANUAL_DISCOVERY_IPS.key)
)
for ip_address in manual_ip_config:
try:
async def discover_household_ids(mass: MusicAssistant, prefer_s1: bool = True) -> list[str]:
"""Discover the HouseHold ID of S1 speaker(s) the network."""
if cache := await mass.cache.get("sonos_household_ids"):
- return cast(list[str], cache)
+ return cast("list[str]", cache)
household_ids: list[str] = []
def get_all_sonos_ips() -> set[SoCo]:
import pkce
code_verifier, code_challenge = pkce.generate_pkce_pair()
- async with AuthenticationHelper(mass, cast(str, values["session_id"])) as auth_helper:
+ async with AuthenticationHelper(mass, cast("str", values["session_id"])) as auth_helper:
params = {
"response_type": "code",
"client_id": values.get(CONF_CLIENT_ID) or app_var(2),
) -> None:
"""Initialize MusicProvider."""
super().__init__(mass, manifest, config)
- self.mass_player_id = cast(str, self.config.get_value(CONF_MASS_PLAYER_ID))
+ self.mass_player_id = cast("str", self.config.get_value(CONF_MASS_PLAYER_ID))
self.cache_dir = os.path.join(self.mass.cache_path, self.instance_id)
self._librespot_bin: str | None = None
self._stop_called: bool = False
if not album.year:
album.year = int(adb_album.get("intYearReleased", "0"))
if album.album_type == AlbumType.UNKNOWN and adb_album.get("strReleaseFormat"):
- releaseformat = cast(str, adb_album.get("strReleaseFormat"))
+ releaseformat = cast("str", adb_album.get("strReleaseFormat"))
album.album_type = ALBUMTYPE_MAPPING.get(releaseformat, AlbumType.UNKNOWN)
# update the artist mbid while at it
for album_artist in album.artists:
self.mass.http_session.get(url, params=kwargs, ssl=False) as response,
):
try:
- result = cast(dict[str, Any], await response.json())
+ result = cast("dict[str, Any]", await response.json())
except (
aiohttp.client_exceptions.ContentTypeError,
JSONDecodeError,
assert values is not None
if action == CONF_ACTION_START_PKCE_LOGIN:
- async with ManualAuthenticationHelper(mass, cast(str, values["session_id"])) as auth_helper:
+ async with ManualAuthenticationHelper(
+ mass, cast("str", values["session_id"])
+ ) as auth_helper:
quality = str(values.get(CONF_QUALITY))
base64_session = await TidalAuthManager.generate_auth_url(auth_helper, quality)
values[CONF_TEMP_SESSION] = base64_session
label=CONF_QUALITY,
required=True,
hidden=True,
- value=cast(str, values.get(CONF_QUALITY) or TidalQualityEnum.HI_RES.value),
- default_value=cast(str, values.get(CONF_QUALITY) or TidalQualityEnum.HI_RES.value),
+ value=cast("str", values.get(CONF_QUALITY) or TidalQualityEnum.HI_RES.value),
+ default_value=cast(
+ "str", values.get(CONF_QUALITY) or TidalQualityEnum.HI_RES.value
+ ),
),
)
else:
description="HIGH_LOSSLESS = 16bit 44.1kHz, HI_RES = Up to 24bit 192kHz",
options=[ConfigValueOption(x.value, x.name) for x in TidalQualityEnum],
default_value=TidalQualityEnum.HI_RES.value,
- value=cast(str, values.get(CONF_QUALITY)) if values else None,
+ value=cast("str", values.get(CONF_QUALITY)) if values else None,
),
ConfigEntry(
key=LABEL_START_PKCE_LOGIN,
action=CONF_ACTION_START_PKCE_LOGIN,
depends_on=CONF_QUALITY,
action_label="Starts the auth process via PKCE on Tidal.com",
- value=cast(str, values.get(CONF_TEMP_SESSION)) if values else None,
+ value=cast("str", values.get(CONF_TEMP_SESSION)) if values else None,
hidden=action == CONF_ACTION_START_PKCE_LOGIN,
),
ConfigEntry(
label="Temporary session for Tidal",
hidden=True,
required=False,
- value=cast(str, values.get(CONF_TEMP_SESSION)) if values else None,
+ value=cast("str", values.get(CONF_TEMP_SESSION)) if values else None,
),
ConfigEntry(
key=LABEL_OOPS_URL,
" Tidal.com and being redirected to a page that prominently displays"
" 'Oops' at the top.",
depends_on=CONF_ACTION_START_PKCE_LOGIN,
- value=cast(str, values.get(CONF_OOPS_URL)) if values else None,
+ value=cast("str", values.get(CONF_OOPS_URL)) if values else None,
hidden=action != CONF_ACTION_START_PKCE_LOGIN,
),
ConfigEntry(
label="Authentication token for Tidal",
description="You need to link Music Assistant to your Tidal account.",
hidden=True,
- value=cast(str, values.get(CONF_AUTH_TOKEN)) if values else None,
+ value=cast("str", values.get(CONF_AUTH_TOKEN)) if values else None,
),
ConfigEntry(
key=CONF_REFRESH_TOKEN,
label="Refresh token for Tidal",
description="You need to link Music Assistant to your Tidal account.",
hidden=True,
- value=cast(str, values.get(CONF_REFRESH_TOKEN)) if values else None,
+ value=cast("str", values.get(CONF_REFRESH_TOKEN)) if values else None,
),
ConfigEntry(
key=CONF_EXPIRY_TIME,
type=ConfigEntryType.STRING,
label="Expiry time of auth token for Tidal",
hidden=True,
- value=cast(str, values.get(CONF_EXPIRY_TIME)) if values else None,
+ value=cast("str", values.get(CONF_EXPIRY_TIME)) if values else None,
),
ConfigEntry(
key=CONF_USER_ID,
label="Your Tidal User ID",
description="This is your unique Tidal user ID.",
hidden=True,
- value=cast(str, values.get(CONF_USER_ID)) if values else None,
+ value=cast("str", values.get(CONF_USER_ID)) if values else None,
),
)
# Use data parameter for form-encoded data
async with self.mass.http_session.post(url, data=data, **kwargs) as response:
return cast(
- dict[str, Any],
+ "dict[str, Any]",
await self._handle_response(response, return_etag=False),
)
else:
# Use json parameter for JSON data (default)
async with self.mass.http_session.post(url, json=data, **kwargs) as response:
return cast(
- dict[str, Any],
+ "dict[str, Any]",
await self._handle_response(response, return_etag=False),
)
# For DELETE requests with a body, we need to use json parameter
async with self.mass.http_session.delete(url, json=data, **kwargs) as response:
- return cast(dict[str, Any], await self._handle_response(response, return_etag=False))
+ return cast("dict[str, Any]", await self._handle_response(response, return_etag=False))
async def _handle_response(
self, response: ClientResponse, return_etag: bool = False
"pytest-cov==5.0.0",
"syrupy==4.8.2",
"tomli==2.2.1",
- "ruff==0.9.6",
+ "ruff==0.11.2",
]
[project.scripts]
"EM102", # Just annoying, not really useful
"FIX002", # Just annoying, not really useful
"PLR2004", # Just annoying, not really useful
+ "PGH004", # Just annoying, not really useful
"PD011", # Just annoying, not really useful
"S101", # assert is often used to satisfy type checking
"TC001", # Just annoying, not really useful
"""Gather all of the requirements from provider manifests."""
dependencies: list[str] = []
providers_path = "music_assistant/providers"
- for dir_str in os.listdir(providers_path):
+ for dir_str in os.listdir(providers_path): # noqa: PTH208, RUF100
dir_path = os.path.join(providers_path, dir_str)
if not os.path.isdir(dir_path):
continue
# get files in subdirectory
- for file_str in os.listdir(dir_path):
+ for file_str in os.listdir(dir_path): # noqa: PTH208, RUF100
file_path = os.path.join(dir_path, file_str)
if not os.path.isfile(file_path):
continue