"""
if raw_conf := self.get(f"{CONF_PLAYER_DSP}/{player_id}"):
return DSPConfig.from_dict(raw_conf)
- else:
- # return default DSP config
- dsp_config = DSPConfig()
+ # return default DSP config
+ dsp_config = DSPConfig()
- deprecated_eq_bass = self.mass.config.get_raw_player_config_value(
- player_id, CONF_DEPRECATED_EQ_BASS, 0
- )
- deprecated_eq_mid = self.mass.config.get_raw_player_config_value(
- player_id, CONF_DEPRECATED_EQ_MID, 0
- )
- deprecated_eq_treble = self.mass.config.get_raw_player_config_value(
- player_id, CONF_DEPRECATED_EQ_TREBLE, 0
- )
- if deprecated_eq_bass != 0 or deprecated_eq_mid != 0 or deprecated_eq_treble != 0:
- # the user previously used the now deprecated EQ settings:
- # add a tone control filter with the old values, reset the deprecated values and
- # save this as the new DSP config
- # TODO: remove this in a future release
- dsp_config.enabled = True
- dsp_config.filters.append(
- ToneControlFilter(
- enabled=True,
- bass_level=float(deprecated_eq_bass)
- if isinstance(deprecated_eq_bass, (int, float, str))
- else 0.0,
- mid_level=float(deprecated_eq_mid)
- if isinstance(deprecated_eq_mid, (int, float, str))
- else 0.0,
- treble_level=float(deprecated_eq_treble)
- if isinstance(deprecated_eq_treble, (int, float, str))
- else 0.0,
- )
+ deprecated_eq_bass = self.mass.config.get_raw_player_config_value(
+ player_id, CONF_DEPRECATED_EQ_BASS, 0
+ )
+ deprecated_eq_mid = self.mass.config.get_raw_player_config_value(
+ player_id, CONF_DEPRECATED_EQ_MID, 0
+ )
+ deprecated_eq_treble = self.mass.config.get_raw_player_config_value(
+ player_id, CONF_DEPRECATED_EQ_TREBLE, 0
+ )
+ if deprecated_eq_bass != 0 or deprecated_eq_mid != 0 or deprecated_eq_treble != 0:
+ # the user previously used the now deprecated EQ settings:
+ # add a tone control filter with the old values, reset the deprecated values and
+ # save this as the new DSP config
+ # TODO: remove this in a future release
+ dsp_config.enabled = True
+ dsp_config.filters.append(
+ ToneControlFilter(
+ enabled=True,
+ bass_level=float(deprecated_eq_bass)
+ if isinstance(deprecated_eq_bass, (int, float, str))
+ else 0.0,
+ mid_level=float(deprecated_eq_mid)
+ if isinstance(deprecated_eq_mid, (int, float, str))
+ else 0.0,
+ treble_level=float(deprecated_eq_treble)
+ if isinstance(deprecated_eq_treble, (int, float, str))
+ else 0.0,
)
+ )
- deprecated_eq_keys = [
- CONF_DEPRECATED_EQ_BASS,
- CONF_DEPRECATED_EQ_MID,
- CONF_DEPRECATED_EQ_TREBLE,
- ]
- for key in deprecated_eq_keys:
- if self.mass.config.get_raw_player_config_value(player_id, key, 0) != 0:
- self.mass.config.set_raw_player_config_value(player_id, key, 0)
+ deprecated_eq_keys = [
+ CONF_DEPRECATED_EQ_BASS,
+ CONF_DEPRECATED_EQ_MID,
+ CONF_DEPRECATED_EQ_TREBLE,
+ ]
+ for key in deprecated_eq_keys:
+ if self.mass.config.get_raw_player_config_value(player_id, key, 0) != 0:
+ self.mass.config.set_raw_player_config_value(player_id, key, 0)
- self.set(f"{CONF_PLAYER_DSP}/{player_id}", dsp_config.to_dict())
- else:
- # The DSP config does not do anything by default, so we disable it
- dsp_config.enabled = False
+ self.set(f"{CONF_PLAYER_DSP}/{player_id}", dsp_config.to_dict())
+ else:
+ # The DSP config does not do anything by default, so we disable it
+ dsp_config.enabled = False
- return dsp_config
+ return dsp_config
@api_command("config/players/dsp/save", required_role="admin")
async def save_dsp_config(self, player_id: str, config: DSPConfig) -> DSPConfig:
if media_item.image and media_item.image.type == img_type:
if media_item.image.remotely_accessible and resolve:
return self.get_image_url(media_item.image)
- elif not media_item.image.remotely_accessible:
+ if not media_item.image.remotely_accessible:
return media_item.image.path
# Only retrieve full item if we don't have the image we need
else:
if media_type == MediaType.ARTIST:
return SearchResults(artists=[item])
- elif media_type == MediaType.ALBUM:
+ if media_type == MediaType.ALBUM:
return SearchResults(albums=[item])
- elif media_type == MediaType.TRACK:
+ if media_type == MediaType.TRACK:
return SearchResults(tracks=[item])
- elif media_type == MediaType.PLAYLIST:
+ if media_type == MediaType.PLAYLIST:
return SearchResults(playlists=[item])
- elif media_type == MediaType.AUDIOBOOK:
+ if media_type == MediaType.AUDIOBOOK:
return SearchResults(audiobooks=[item])
- elif media_type == MediaType.PODCAST:
+ if media_type == MediaType.PODCAST:
return SearchResults(podcasts=[item])
- else:
- return SearchResults()
+ return SearchResults()
# handle normal global search by querying all providers
results_per_provider: list[SearchResults] = []
# always first search the library
# Return the higher position to ensure users never lose progress
if ma_position_ms >= provider_position_ms:
return ma_fully_played, ma_position_ms
- else:
- return provider_fully_played, provider_position_ms
+ return provider_fully_played, provider_position_ms
def get_controller(
self, media_type: MediaType
# nothing to do anymore, player was not previously powered
# and does not support power control
return
- elif prev_synced_to:
+ if prev_synced_to:
self.logger.debug(
"Announcement to player %s - syncing back to %s...",
player.display_name,
"""Return the current playback state of the player."""
if self.powered:
return self.sync_leader.playback_state if self.sync_leader else PlaybackState.IDLE
- else:
- return PlaybackState.IDLE
+ return PlaybackState.IDLE
@cached_property
def flow_mode(self) -> bool:
"""
if self.is_dynamic and (leader := self.sync_leader):
return leader.can_group_with
- elif self.is_dynamic:
+ if self.is_dynamic:
return {self.provider.instance_id}
- else:
- return set()
+ return set()
async def get_config_entries(
self,
if raw_crossfade_output:
return raw_crossfade_output
- else:
- stderr_msg = stderr.decode() if stderr else "(no stderr output)"
- raise RuntimeError(f"Smart crossfade failed. FFmpeg stderr: {stderr_msg}")
+ stderr_msg = stderr.decode() if stderr else "(no stderr output)"
+ raise RuntimeError(f"Smart crossfade failed. FFmpeg stderr: {stderr_msg}")
def __repr__(self) -> str:
"""Return string representation of SmartFade showing the filter chain."""
# Exponential curve for smoother transitions
if direction == "up":
return f"'pow({norm_t},2)':eval=frame"
- else:
- return f"'1-pow({norm_t},2)':eval=frame"
- elif curve == "logarithmic":
+ return f"'1-pow({norm_t},2)':eval=frame"
+ if curve == "logarithmic":
# Logarithmic curve for more aggressive initial change
if direction == "up":
return f"'sqrt({norm_t})':eval=frame"
- else:
- return f"'1-sqrt({norm_t})':eval=frame"
- elif direction == "up":
+ return f"'1-sqrt({norm_t})':eval=frame"
+ if direction == "up":
return f"'{norm_t}':eval=frame"
- else:
- return f"'1-{norm_t}':eval=frame"
+ return f"'1-{norm_t}':eval=frame"
def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
"""Generate FFmpeg filters for frequency sweep effect."""
if "system-admin" in group_ids:
LOGGER.debug("HA user %s is admin, granting ADMIN role", ha_user_id)
return UserRole.ADMIN
- else:
- return UserRole.USER
+ return UserRole.USER
raise RuntimeError(f"HA user ID {ha_user_id} not found in user list")
except Exception as err:
msg = f"Failed to check HA admin status: {err}"
)
if i + 1 < len(parts):
return parts[i + 1 :], 0
- else:
- return parts[i:], int(position) # last part, cannot skip
+ return parts[i:], int(position) # last part, cannot skip
return parts[i:], int(position)
"""Validate Provider ID."""
if provider == "spotify":
return valid_base62_length22(item_id)
- else:
- return True
+ return True
async def parse_uri(uri: str, validate_id: bool = False) -> tuple[MediaType, str, str]:
):
# always ensure the player_id is in the group_members list for players
return [self.player_id, *self._attr_group_members]
- elif self._attr_group_members == [self.player_id]:
+ if self._attr_group_members == [self.player_id]:
return []
return self._attr_group_members
elapsed_time=int(active_queue.elapsed_time),
elapsed_time_last_updated=active_queue.elapsed_time_last_updated,
)
- elif active_queue:
+ if active_queue:
# queue is active but no current item
return None
# return native current media if no group/queue is active
"""Get stream of item."""
if media_type == MediaType.PODCAST_EPISODE:
return await self._get_stream_details_episode(item_id)
- elif media_type == MediaType.AUDIOBOOK:
+ if media_type == MediaType.AUDIOBOOK:
abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=item_id)
return await self._get_stream_details_audiobook(abs_audiobook)
raise MediaNotFoundError("Stream unknown")
if len(sub_path) == 1:
if lib_key == AbsBrowsePaths.LIBRARIES_PODCAST:
return await self._browse_lib_podcasts(library_id=lib_id)
- else:
- return self._browse_lib_audiobooks(current_path=path)
- elif len(sub_path) == 2:
+ return self._browse_lib_audiobooks(current_path=path)
+ if len(sub_path) == 2:
item_key = sub_path[1]
match item_key:
case AbsBrowsePaths.AUTHORS:
self.logger.debug(f"Getting stream details for {item_id} ({media_type})")
if media_type in [MediaType.PODCAST_EPISODE, MediaType.TRACK]:
return await self._get_playable_stream_details(item_id, media_type)
- else:
- return await self._get_station_stream_details(item_id)
+ return await self._get_station_stream_details(item_id)
async def _get_programme_segments(self, vpid: str) -> list[Segment] | None:
"""Get on demand segments from cache or API."""
for obj in [await self._render_browse_item(item) for item in category.sub_items]
if obj is not None
]
- else:
- return []
+ return []
async def _get_collection(
self, pid: str
]
if obj
]
- else:
- return []
+ return []
async def _get_menu(
self, path_parts: list[str] | None = None
) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
if self.client.auth.is_logged_in and await self.client.auth.is_uk_listener:
return await self._get_full_menu(path_parts=path_parts)
- else:
- return await self._get_slim_menu(path_parts=path_parts)
+ return await self._get_slim_menu(path_parts=path_parts)
async def _get_full_menu(
self, path_parts: list[str] | None = None
(BrowseFolder | Track | Podcast | PodcastEpisode | RecommendationFolder | Radio),
):
return new_item
- else:
- return None
+ return None
async def _get_subpath_menu(
self, sub_path: str
if new_folder:
items.append(new_folder)
return items
- elif sub_sub_path:
+ if sub_sub_path:
# Date listings for a station
date_folders = [
BrowseFolder(
]
)
return date_folders
- else:
- return [
- BrowseFolder(
- item_id=station.item_id,
- provider=self.domain,
- name=station.name,
- path="/".join([*path_parts, station.item_id]),
- image=(
- MediaItemImage(
- type=ImageType.THUMB,
- path=station.metadata.images[0].path,
- provider=self.domain,
- )
- if station.metadata.images
- else None
- ),
- )
- for station in await self._station_list(include_local=show_local)
- ]
+ return [
+ BrowseFolder(
+ item_id=station.item_id,
+ provider=self.domain,
+ name=station.name,
+ path="/".join([*path_parts, station.item_id]),
+ image=(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=station.metadata.images[0].path,
+ provider=self.domain,
+ )
+ if station.metadata.images
+ else None
+ ),
+ )
+ for station in await self._station_list(include_local=show_local)
+ ]
async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
"""Browse this provider's items.
if sub_path == "":
return await self._get_menu()
- elif sub_path == "categories" and sub_sub_path:
+ if sub_path == "categories" and sub_sub_path:
return await self._get_category(sub_sub_path)
- elif sub_path == "collections" and sub_sub_path:
+ if sub_path == "collections" and sub_sub_path:
return await self._get_collection(sub_sub_path)
- elif sub_path != "stations":
+ if sub_path != "stations":
return await self._get_subpath_menu(sub_path)
- elif sub_path == "stations":
+ if sub_path == "stations":
return await self._get_station_schedule_menu(
self.show_local_stations, path_parts, sub_sub_path, sub_sub_sub_path
)
- else:
- return []
+ return []
async def search(
self, search_query: str, media_types: list[MediaType] | None, limit: int = 5
"""Convert the source object to target type."""
if isinstance(source_obj, Station):
return self._convert_station(source_obj)
- elif isinstance(source_obj, LiveStation):
+ if isinstance(source_obj, LiveStation):
return self._convert_live_station(source_obj)
- elif isinstance(source_obj, StationSearchResult):
+ if isinstance(source_obj, StationSearchResult):
return self._convert_station_search_result(source_obj)
self.logger.error(f"Failed to convert station {type(source_obj)}: {source_obj}")
raise ConversionError(f"Failed to convert station {type(source_obj)}: {source_obj}")
show_title=show.titles["secondary"],
date=_to_date(show.start),
)
- elif show.titles:
+ if show.titles:
# TODO: when getting a schedule listing, we have a broadcast time
# when we fetch the streaming details later we lose that from the new API call
title = self.SCHEDULE_ITEM_DEFAULT_FORMAT.format(
"""Convert podcast objects."""
if isinstance(source_obj, (Podcast, RadioSeries)) or self.context.force_type is Podcast:
return await self._convert_podcast(source_obj)
- elif isinstance(source_obj, PodcastEpisode):
+ if isinstance(source_obj, PodcastEpisode):
return await self._convert_podcast_episode(source_obj)
- elif isinstance(source_obj, RadioShow):
+ if isinstance(source_obj, RadioShow):
return await self._convert_radio_show(source_obj)
- elif isinstance(source_obj, RadioClip) or self.context.force_type is Track:
+ if isinstance(source_obj, RadioClip) or self.context.force_type is Track:
return await self._convert_radio_clip(source_obj)
return source_obj
),
provider_mappings={self._create_provider_mapping(show.pid)},
)
- else:
- # Handle as episode
- podcast = None
- if hasattr(show, "container") and show.container:
- podcast = await PodcastConverter(self.context).convert(show.container)
+ # Handle as episode
+ podcast = None
+ if hasattr(show, "container") and show.container:
+ podcast = await PodcastConverter(self.context).convert(show.container)
- if not podcast or not isinstance(podcast, MAPodcast):
- raise ConversionError(f"No podcast for episode for {show}")
+ if not podcast or not isinstance(podcast, MAPodcast):
+ raise ConversionError(f"No podcast for episode for {show}")
- return MAPodcastEpisode(
- item_id=show.pid,
- name=self._format_show_title(show),
- provider=self.context.provider_domain,
- duration=duration,
- resume_position_ms=resume_position,
- metadata=ImageProvider.create_metadata_with_image(
- show.image_url, self.context.provider_domain
- ),
- podcast=podcast,
- provider_mappings={self._create_provider_mapping(show.pid)},
- position=1,
- )
+ return MAPodcastEpisode(
+ item_id=show.pid,
+ name=self._format_show_title(show),
+ provider=self.context.provider_domain,
+ duration=duration,
+ resume_position_ms=resume_position,
+ metadata=ImageProvider.create_metadata_with_image(
+ show.image_url, self.context.provider_domain
+ ),
+ podcast=podcast,
+ provider_mappings={self._create_provider_mapping(show.pid)},
+ position=1,
+ )
async def _convert_radio_clip(self, clip: RadioClip) -> Track | MAPodcastEpisode:
duration = self._get_attr(clip, "duration.value")
podcast=podcast,
position=0,
)
- else:
- return Track(
- item_id=clip.pid,
- name=self._get_attr(clip, "titles.entity_title", "Unknown Track"),
- provider=self.context.provider_domain,
- duration=duration,
- metadata=ImageProvider.create_metadata_with_image(
- clip.image_url, self.context.provider_domain, description
- ),
- provider_mappings={self._create_provider_mapping(clip.pid)},
- )
+ return Track(
+ item_id=clip.pid,
+ name=self._get_attr(clip, "titles.entity_title", "Unknown Track"),
+ provider=self.context.provider_domain,
+ duration=duration,
+ metadata=ImageProvider.create_metadata_with_image(
+ clip.image_url, self.context.provider_domain, description
+ ),
+ provider_mappings={self._create_provider_mapping(clip.pid)},
+ )
class BrowseConverter(BaseConverter):
"""Convert browsable objects."""
if isinstance(source_obj, MenuItem) and self.context.force_type is not RecommendationFolder:
return self._convert_menu_item(source_obj)
- elif isinstance(source_obj, (Category, Collection)):
+ if isinstance(source_obj, (Category, Collection)):
return self._convert_category_or_collection(source_obj)
- elif isinstance(source_obj, Schedule):
+ if isinstance(source_obj, Schedule):
return self._convert_schedule(source_obj)
- elif isinstance(source_obj, RecommendedMenuItem):
+ if isinstance(source_obj, RecommendedMenuItem):
return await self._convert_recommended_item(source_obj)
self.logger.error(f"Failed to convert browse object {type(source_obj)}: {source_obj}")
raise ConversionError(f"Browse conversion failed: {source_obj}")
return await self._gw_api_call(
method, use_csrf_token, args, params, http_method, False
)
- else:
- msg = "Failed to call GW-API"
- raise DeezerGWError(msg, result_json["error"])
+ msg = "Failed to call GW-API"
+ raise DeezerGWError(msg, result_json["error"])
return cast("dict[str, Any]", result_json)
async def get_song_data(self, track_id: str) -> dict[str, Any]:
subfolder = str(self.config.get_value(CONF_SUBFOLDER))
if subfolder:
return subfolder
- elif share:
+ if share:
return share
return None
"""Get streamdetails for a track or audiobook."""
if "#" in item_id:
return self._get_single_file_stream(item_id, {}, media_type)
- else:
- audio_files = await self.provider.client.get_audio_files(item_id)
- if not audio_files:
- raise MediaNotFoundError(f"No audio files found for {item_id}")
-
- if media_type == MediaType.AUDIOBOOK and len(audio_files) > 1:
- return await self._get_multi_file_audiobook_stream(item_id, audio_files)
- else:
- return self._get_single_file_stream(item_id, audio_files[0], media_type)
+ audio_files = await self.provider.client.get_audio_files(item_id)
+ if not audio_files:
+ raise MediaNotFoundError(f"No audio files found for {item_id}")
+
+ if media_type == MediaType.AUDIOBOOK and len(audio_files) > 1:
+ return await self._get_multi_file_audiobook_stream(item_id, audio_files)
+ return self._get_single_file_stream(item_id, audio_files[0], media_type)
async def _get_multi_file_audiobook_stream(
self, item_id: str, audio_files: list[dict[str, Any]]
self.logger.debug("Found synchronized lyrics for %s by %s", track.name, artist_name)
return metadata
- else:
- self.logger.debug(
- "No synchronized lyrics found for %s by %s with album name %s and with a "
- "duration within 2 secs of %s",
- track.name,
- artist_name,
- album_name,
- duration,
- )
+ self.logger.debug(
+ "No synchronized lyrics found for %s by %s with album name %s and with a "
+ "duration within 2 secs of %s",
+ track.name,
+ artist_name,
+ album_name,
+ duration,
+ )
plain_lyrics = data.get("plainLyrics")
self.logger.debug("Found plain lyrics for %s by %s", track.name, artist_name)
return metadata
- else:
- self.logger.info(
- "No lyrics found for %s by %s with album name %s and with a "
- "duration within 2 secs of %s",
- track.name,
- artist_name,
- album_name,
- duration,
- )
+ self.logger.info(
+ "No lyrics found for %s by %s with album name %s and with a "
+ "duration within 2 secs of %s",
+ track.name,
+ artist_name,
+ album_name,
+ duration,
+ )
return None
):
# nothing to do, device is already connected
return
- else:
- # new or updated player detected
- physical_device = MusicCastPhysicalDevice(
- device=MusicCastDevice(
- client=self.mass.http_session,
- ip=device_ip,
- upnp_description=description_url,
- ),
- controller=self.mc_controller,
+ # new or updated player detected
+ physical_device = MusicCastPhysicalDevice(
+ device=MusicCastDevice(
+ client=self.mass.http_session,
+ ip=device_ip,
+ upnp_description=description_url,
+ ),
+ controller=self.mc_controller,
+ )
+ self.update_player_locks[device_id] = asyncio.Lock()
+ success = await physical_device.async_init() # fetch + polling
+ if not success:
+ self.logger.debug(
+ "Had trouble setting up device at %s. Will be retried on next discovery.",
+ device_ip,
)
- self.update_player_locks[device_id] = asyncio.Lock()
- success = await physical_device.async_init() # fetch + polling
- if not success:
- self.logger.debug(
- "Had trouble setting up device at %s. Will be retried on next discovery.",
- device_ip,
- )
- return
- await self._register_player(physical_device, device_id)
+ return
+ await self._register_player(physical_device, device_id)
async def _register_player(
self, physical_device: MusicCastPhysicalDevice, device_id: str
# Type-specific availability checks
if isinstance(video, EssentialVideo):
return not video.is_payment_required and not video.is_muted
- else: # WatchVideo
- return not video.is_deleted
+ # WatchVideo
+ return not video.is_deleted
)
},
)
- elif prov_artist_id.startswith(NAVI_VARIOUS_PREFIX):
+ if prov_artist_id.startswith(NAVI_VARIOUS_PREFIX):
# Special case for handling track artists on various artists album for Navidrome.
return Artist(
item_id=prov_artist_id,
if len(playlists) >= 50:
break
return playlists
- elif subpath == "years":
+ if subpath == "years":
return await self._browse_years(path, subsubpath)
- elif subpath == "recent":
+ if subpath == "recent":
return await self._browse_recent()
- elif subpath == "random":
+ if subpath == "random":
return await self._browse_random()
- elif subpath == "today":
+ if subpath == "today":
return await self._browse_today()
- elif subpath == "venues":
+ if subpath == "venues":
return await self._browse_venues(path, subsubpath)
- elif subpath == "tags":
+ if subpath == "tags":
return await self._browse_tags(path, subsubpath)
- elif subpath == "top_shows":
+ if subpath == "top_shows":
return await self._browse_top_shows()
- elif subpath == "top_tracks":
+ if subpath == "top_tracks":
return await self._browse_top_tracks()
return []
tag_slug, content_type = subsubpath.split("/", 1)
if content_type == "shows":
return await self._get_shows_for_tag(tag_slug)
- elif content_type == "tracks":
+ if content_type == "tracks":
return await self._get_tracks_for_tag(tag_slug)
- else:
- return []
+ return []
@use_cache(expiration=86400) # 24 hours - Tag associations could change as new shows are tagged
async def _get_tracks_for_tag(self, tag_slug: str) -> list[BrowseFolder | Album | Track]:
async def _parse(self, plex_media: PlexObject) -> MediaItem | None:
if plex_media.type == "artist":
return await self._parse_artist(plex_media)
- elif plex_media.type == "album":
+ if plex_media.type == "album":
return await self._parse_album(plex_media)
- elif plex_media.type == "track":
+ if plex_media.type == "track":
return await self._parse_track(plex_media)
- elif plex_media.type == "playlist":
+ if plex_media.type == "playlist":
return await self._parse_playlist(plex_media)
return None
local_server_ip = entry.get("from")[0]
local_server_port = data.get("Port")
return local_server_ip, local_server_port
- else:
- return None, None
+ return None, None
return await asyncio.to_thread(_discover_local_servers)
if subpath == BROWSE_TRENDING:
return await self._browse_trending()
- elif subpath == BROWSE_RECENT:
+ if subpath == BROWSE_RECENT:
return await self._browse_recent_episodes()
- elif subpath == BROWSE_CATEGORIES:
+ if subpath == BROWSE_CATEGORIES:
if len(subpath_parts) > 1:
# Browse specific category - category name is directly in path
category_name = subpath_parts[1]
return await self._browse_category_podcasts(category_name)
- else:
- # Browse categories
- return await self._browse_categories()
+ # Browse categories
+ return await self._browse_categories()
return []
"""Get single MediaItem from provider."""
if media_type == MediaType.PODCAST:
return await self.get_podcast(prov_item_id)
- elif media_type == MediaType.PODCAST_EPISODE:
+ if media_type == MediaType.PODCAST_EPISODE:
return await self.get_podcast_episode(prov_item_id)
- else:
- raise MediaNotFoundError(f"Media type {media_type} not supported by this provider")
+ raise MediaNotFoundError(f"Media type {media_type} not supported by this provider")
async def _fetch_podcasts(
self, endpoint: str, params: dict[str, Any] | None = None
new_id = "ma_" + str(re.sub(r"\W+", "", snap_client_id))
self._ids_map[new_id] = snap_client_id
return new_id
- else:
- return self._get_ma_id(snap_client_id)
+ return self._get_ma_id(snap_client_id)
def _handle_player_init(self, snap_client: Snapclient) -> SnapCastPlayer:
"""Process Snapcast add to Player controller."""
self._attr_current_media = airplay_player.current_media
# return early as we dont need further info
return
- else:
- self._attr_active_source = SOURCE_AIRPLAY
+ self._attr_active_source = SOURCE_AIRPLAY
elif (
container_type == ContainerType.STATION
and active_service != MusicService.MUSIC_ASSISTANT
# Handle system playlists
result = await self._soundcloud.get_system_playlist_details(prov_playlist_id)
return cast("dict[str, Any]", result)
- else:
- # Handle regular playlists
- result = await self._soundcloud.get_playlist_details(prov_playlist_id)
- return cast("dict[str, Any]", result)
+ # Handle regular playlists
+ result = await self._soundcloud.get_playlist_details(prov_playlist_id)
+ return cast("dict[str, Any]", result)
@use_cache(3600 * 3) # Cache for 3 hours
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
position_ms = resume_point.get("resume_position_ms", 0)
return fully_played, position_ms
- elif media_type == MediaType.AUDIOBOOK:
+ if media_type == MediaType.AUDIOBOOK:
if not self.audiobooks_supported:
raise NotImplementedError("Audiobook support is disabled")
if not self.audiobook_progress_sync_enabled:
return prov, ret_id
# no subsonic mapping has been found in library item, ignore...
return None, item_id
- elif provider_instance_id_or_domain.startswith("opensubsonic"):
+ if provider_instance_id_or_domain.startswith("opensubsonic"):
# found a subsonic mapping, proceed...
prov = self.mass.get_provider(provider_instance_id_or_domain)
assert isinstance(prov, OpenSonicProvider)
)
type_counts[MediaType.PLAYLIST] += 1
return media_item
- elif item_type == "PLAYLIST":
+ if item_type == "PLAYLIST":
media_item = parse_playlist(self.provider, item)
type_counts[MediaType.PLAYLIST] += 1
return media_item
- elif item_type == "ALBUM":
+ if item_type == "ALBUM":
media_item = parse_album(self.provider, item)
type_counts[MediaType.ALBUM] += 1
return media_item
- elif item_type == "TRACK":
+ if item_type == "TRACK":
media_item = parse_track(self.provider, item)
type_counts[MediaType.TRACK] += 1
return media_item
- elif item_type == "ARTIST":
+ if item_type == "ARTIST":
media_item = parse_artist(self.provider, item)
type_counts[MediaType.ARTIST] += 1
return media_item
- else:
- # Last resort - try to infer from structure for unlabeled items
- if "uuid" in item:
- media_item = parse_playlist(self.provider, item)
- type_counts[MediaType.PLAYLIST] += 1
- return media_item
- elif "id" in item and "title" in item and "duration" in item:
- media_item = parse_track(self.provider, item)
- type_counts[MediaType.TRACK] += 1
- return media_item
- elif "id" in item and "title" in item and "numberOfTracks" in item:
- media_item = parse_album(self.provider, item)
- type_counts[MediaType.ALBUM] += 1
- return media_item
+ # Last resort - try to infer from structure for unlabeled items
+ if "uuid" in item:
+ media_item = parse_playlist(self.provider, item)
+ type_counts[MediaType.PLAYLIST] += 1
+ return media_item
+ if "id" in item and "title" in item and "duration" in item:
+ media_item = parse_track(self.provider, item)
+ type_counts[MediaType.TRACK] += 1
+ return media_item
+ if "id" in item and "title" in item and "numberOfTracks" in item:
+ media_item = parse_album(self.provider, item)
+ type_counts[MediaType.ALBUM] += 1
+ return media_item
- self.logger.warning("Unknown item type, could not parse: %s", item)
- return None
+ self.logger.warning("Unknown item type, could not parse: %s", item)
+ return None
except (KeyError, ValueError, TypeError) as err:
self.logger.debug("Error parsing %s item: %s", item_type, err)
"ARG002",
"S311",
"TRY301",
- "RET505",
"PLR0912",
"B904",
"TRY401",
try:
if isinstance(data, list):
return [fixture_type.model_validate(item) for item in data]
- else:
- # Single object case
- return fixture_type.model_validate(data)
+ # Single object case
+ return fixture_type.model_validate(data)
except Exception as e:
pytest.fail(f"Failed to validate fixture {relative_path}: {e}")
if isinstance(obj, dict):
# Sort dictionary keys and recursively process values
return {key: sort_dict_keys_and_lists(obj[key]) for key in sorted(obj.keys())}
- elif isinstance(obj, list):
+ if isinstance(obj, list):
# Recursively process list items first
sorted_items = [sort_dict_keys_and_lists(item) for item in obj]
try: