f"ON {DB_TABLE_CACHE}(base_key);"
)
await self.database.execute(
- f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_sub_key_idx "
- f"ON {DB_TABLE_CACHE}(sub_key);"
+ f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_sub_key_idx ON {DB_TABLE_CACHE}(sub_key);"
)
await self.database.execute(
f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_category_base_key_idx "
if prov.domain == raw_conf["domain"]:
break
else:
- msg = f'Unknown provider domain: {raw_conf["domain"]}'
+ msg = f"Unknown provider domain: {raw_conf['domain']}"
raise KeyError(msg)
return ProviderConfig.parse(config_entries, raw_conf)
msg = f"No config found for provider id {instance_id}"
query_parts = [x[5:] if x.lower().startswith("where ") else x for x in query_parts]
# concatenate all join and/or where queries
if join_parts:
- sql_query += f' {" ".join(join_parts)} '
+ sql_query += f" {' '.join(join_parts)} "
if query_parts:
sql_query += " WHERE " + " AND ".join(query_parts)
# build final query
self.logger.debug("Performing database cleanup...")
# Remove playlog entries older than 90 days
await self.database.delete_where_query(
- DB_TABLE_PLAYLOG, f"timestamp < strftime('%s','now') - {3600 * 24 * 90}"
+ DB_TABLE_PLAYLOG, f"timestamp < strftime('%s','now') - {3600 * 24 * 90}"
)
# db tables cleanup
for ctrl in (
)
# index on play_count
await self.database.execute(
- f"CREATE INDEX IF NOT EXISTS {db_table}_play_count_idx "
- f"on {db_table}(play_count);"
+ f"CREATE INDEX IF NOT EXISTS {db_table}_play_count_idx on {db_table}(play_count);"
)
# index on last_played
await self.database.execute(
- f"CREATE INDEX IF NOT EXISTS {db_table}_last_played_idx "
- f"on {db_table}(last_played);"
+ f"CREATE INDEX IF NOT EXISTS {db_table}_last_played_idx on {db_table}(last_played);"
)
# indexes on provider_mappings table
self._prev_states: dict[str, CompareState] = {}
self.manifest.name = "Player Queues controller"
self.manifest.description = (
- "Music Assistant's core controller " "which manages the queues for all players."
+ "Music Assistant's core controller which manages the queues for all players."
)
self.manifest.icon = "playlist-music"
return crossfaded_audio
# no crossfade_data, return original data instead
LOGGER.debug(
- "crossfade of pcm chunks failed: not enough data? "
- "- fade_in_part: %s - fade_out_part: %s",
+ "crossfade of pcm chunks failed: not enough data? - fade_in_part: %s - fade_out_part: %s",
len(fade_in_part),
len(fade_out_part),
)
"""Insert data in given table."""
keys = tuple(values.keys())
if allow_replace:
- sql_query = f'INSERT OR REPLACE INTO {table}({",".join(keys)})'
+ sql_query = f"INSERT OR REPLACE INTO {table}({','.join(keys)})"
else:
- sql_query = f'INSERT INTO {table}({",".join(keys)})'
- sql_query += f' VALUES ({",".join(f":{x}" for x in keys)})'
+ sql_query = f"INSERT INTO {table}({','.join(keys)})"
+ sql_query += f" VALUES ({','.join(f':{x}' for x in keys)})"
row_id = await self._db.execute_insert(sql_query, values)
await self._db.commit()
return row_id[0]
) -> Mapping:
"""Update record."""
keys = tuple(values.keys())
- sql_query = f'UPDATE {table} SET {",".join(f"{x}=:{x}" for x in keys)} WHERE '
+ sql_query = f"UPDATE {table} SET {','.join(f'{x}=:{x}' for x in keys)} WHERE "
sql_query += " AND ".join(f"{x} = :{x}" for x in match)
await self.execute(sql_query, {**match, **values})
await self._db.commit()
static_content: tuple[str, str, str] | None = None,
) -> None:
"""Async initialize of module."""
- self._base_url = base_url[:-1] if base_url.endswith("/") else base_url
+ self._base_url = base_url.removesuffix("/")
self._bind_port = bind_port
self._static_routes = static_routes
self._webapp = web.Application(
self._players = {}
self.cliraop_bin: str | None = await get_cliraop_binary()
dacp_port = await select_free_port(39831, 49831)
- self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
+ self.dacp_id = dacp_id = f"{randrange(2**64):X}"
self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
self._dacp_server = await asyncio.start_server(
self._handle_dacp_request, "0.0.0.0", dacp_port
subfolder = subfolder.replace("\\", "/")
if not subfolder.startswith("/"):
subfolder = "/" + subfolder
- if subfolder.endswith("/"):
- subfolder = subfolder[:-1]
+ subfolder = subfolder.removesuffix("/")
env_vars = {
**os.environ,
player_entities: list[ConfigValueOption] = []
if hass_prov and hass_prov.hass.connected:
async for state in _get_hass_media_players(hass_prov):
- name = f'{state["attributes"]["friendly_name"]} ({state["entity_id"]})'
+ name = f"{state['attributes']['friendly_name']} ({state['entity_id']})"
player_entities.append(ConfigValueOption(name, state["entity_id"]))
return (
ConfigEntry(
type=ConfigEntryType.STRING,
label="Base URL",
required=True,
- description="Base URL for the server, e.g. " "https://subsonic.mydomain.tld",
+ description="Base URL for the server, e.g. https://subsonic.mydomain.tld",
),
ConfigEntry(
key=CONF_PORT,
raise CredentialError
except (AuthError, CredentialError) as e:
msg = (
- "Failed to connect to "
- f"{self.config.get_value(CONF_BASE_URL)}"
- ", check your settings."
+ f"Failed to connect to {self.config.get_value(CONF_BASE_URL)}, check your settings."
)
raise LoginFailed(msg) from e
self._enable_podcasts = bool(self.config.get_value(CONF_ENABLE_PODCASTS))
item_id=str(artist_obj["id"]),
provider_domain=self.domain,
provider_instance=self.instance_id,
- url=f'https://open.qobuz.com/artist/{artist_obj["id"]}',
+ url=f"https://open.qobuz.com/artist/{artist_obj['id']}",
)
},
)
sample_rate=album_obj["maximum_sampling_rate"] * 1000,
bit_depth=album_obj["maximum_bit_depth"],
),
- url=f'https://open.qobuz.com/album/{album_obj["id"]}',
+ url=f"https://open.qobuz.com/album/{album_obj['id']}",
)
},
)
sample_rate=track_obj["maximum_sampling_rate"] * 1000,
bit_depth=track_obj["maximum_bit_depth"],
),
- url=f'https://open.qobuz.com/track/{track_obj["id"]}',
+ url=f"https://open.qobuz.com/track/{track_obj['id']}",
)
},
disc_number=track_obj.get("media_number", 0),
item_id=str(playlist_obj["id"]),
provider_domain=self.domain,
provider_instance=self.instance_id,
- url=f'https://open.qobuz.com/playlist/{playlist_obj["id"]}',
+ url=f"https://open.qobuz.com/playlist/{playlist_obj['id']}",
)
},
is_editable=is_editable,
liked_songs = Playlist(
item_id=self._get_liked_songs_playlist_id(),
provider=self.lookup_key,
- name=f'Liked Songs {self._sp_user["display_name"]}', # TODO to be translated
+ name=f"Liked Songs {self._sp_user['display_name']}", # TODO to be translated
owner=self._sp_user["display_name"],
provider_mappings={
ProviderMapping(
for item in spotify_result["items"]:
if not (item and item["track"] and item["track"]["id"]):
continue
- track_uris.append({"uri": f'spotify:track:{item["track"]["id"]}'})
+ track_uris.append({"uri": f"spotify:track:{item['track']['id']}"})
data = {"tracks": track_uris}
await self._delete_data(f"playlists/{prov_playlist_id}/tracks", data=data)
kwargs["country"] = "from_token"
if not (auth_info := kwargs.pop("auth_info", None)):
auth_info = await self.login()
- headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
+ headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
locale = self.mass.metadata.locale.replace("_", "-")
language = locale.split("-")[0]
headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
"""Delete data from api."""
url = f"https://api.spotify.com/v1/{endpoint}"
auth_info = kwargs.pop("auth_info", await self.login())
- headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
+ headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
async with self.mass.http_session.delete(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
"""Put data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
auth_info = kwargs.pop("auth_info", await self.login())
- headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
+ headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
async with self.mass.http_session.put(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
"""Post data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
auth_info = kwargs.pop("auth_info", await self.login())
- headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
+ headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
async with self.mass.http_session.post(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
name=name,
provider_mappings={
ProviderMapping(
- item_id=f'{details["preset_id"]}--{stream["media_type"]}',
+ item_id=f"{details['preset_id']}--{stream['media_type']}",
provider_domain=self.domain,
provider_instance=self.instance_id,
audio_format=AudioFormat(
"pytest-cov==5.0.0",
"syrupy==4.8.1",
"tomli==2.2.1",
- "ruff==0.8.6",
+ "ruff==0.9.1",
]
[project.scripts]
"PTH202",
"ASYNC109",
"ASYNC110",
+ "A005",
]
select = ["ALL"]
print(
"\n*** Trace for largest memory block - "
- f"({largest.count} blocks, {largest.size/1024} Kb) ***"
+ f"({largest.count} blocks, {largest.size / 1024} Kb) ***"
)
for l in largest.traceback.format():
print(l)