from typing import TYPE_CHECKING, Any, TypeVar, cast
from music_assistant_models.enums import EventType, ExternalID, MediaType, ProviderFeature
-from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError
+from music_assistant_models.errors import (
+ InsufficientPermissions,
+ MediaNotFoundError,
+ ProviderUnavailableError,
+)
from music_assistant_models.media_items import ItemMapping, MediaItemType, ProviderMapping, Track
from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PROVIDER_MAPPINGS, MASS_LOGGER_NAME
limit=limit,
offset=offset,
order_by=order_by,
- provider=provider,
+ provider_filter=self._ensure_provider_filter(provider),
extra_query_parts=[extra_query] if extra_query else None,
extra_query_params=extra_query_params,
)
limit: int = 500,
offset: int = 0,
order_by: str | None = None,
- provider: str | list[str] | None = None,
+ provider_filter: list[str] | None = None,
extra_query_parts: list[str] | None = None,
extra_query_params: dict[str, Any] | None = None,
extra_join_parts: list[str] | None = None,
query_params = extra_query_params or {}
query_parts: list[str] = extra_query_parts or []
join_parts: list[str] = extra_join_parts or []
-
search = self._preprocess_search(search, query_params)
-
# create special performant random query
if order_by and order_by.startswith("random"):
self._apply_random_subquery(
- query_parts, query_params, join_parts, favorite, search, provider, limit
+ query_parts=query_parts,
+ query_params=query_params,
+ join_parts=join_parts,
+ favorite=favorite,
+ search=search,
+ provider_filter=provider_filter,
+ limit=limit,
)
else:
# apply filters
- self._apply_filters(query_parts, query_params, join_parts, favorite, search, provider)
-
+ self._apply_filters(
+ query_parts=query_parts,
+ query_params=query_params,
+ join_parts=join_parts,
+ favorite=favorite,
+ search=search,
+ provider_filter=provider_filter,
+ )
# build and execute final query
sql_query = self._build_final_query(query_parts, join_parts, order_by)
return [
join_parts: list[str],
favorite: bool | None,
search: str | None,
- provider: str | list[str] | None,
+ provider_filter: list[str] | None,
limit: int,
) -> None:
"""Build a fast random subquery with all filters applied."""
# Apply all filters to the subquery
self._apply_filters(
- sub_query_parts, query_params, sub_join_parts, favorite, search, provider
+ query_parts=sub_query_parts,
+ query_params=query_params,
+ join_parts=sub_join_parts,
+ favorite=favorite,
+ search=search,
+ provider_filter=provider_filter,
)
# Build the subquery
join_parts: list[str],
favorite: bool | None,
search: str | None,
- provider: str | list[str] | None,
+ provider_filter: list[str] | None,
) -> None:
- """Apply search, favorite, and provider filters.
-
- If the current user has a provider_filter set, it will be applied automatically.
- If an explicit provider filter is provided, it will be validated against the user's
- allowed providers.
- """
+ """Apply search, favorite, and provider filters."""
# handle search
if search:
query_parts.append(f"{self.db_table}.search_name LIKE :search")
-
# handle favorite filter
if favorite is not None:
query_parts.append(f"{self.db_table}.favorite = :favorite")
query_params["favorite"] = favorite
-
- # Apply user provider filter
- user = get_current_user()
- user_provider_filter = user.provider_filter if user and user.provider_filter else None
-
- # Determine final provider filter
- final_provider_filter: list[str] | None = None
-
- if user_provider_filter:
- # User has a provider filter set
- if provider:
- # Explicit provider filter provided - validate against user's allowed providers
- requested_providers = [provider] if isinstance(provider, str) else provider
- # Only include providers that are in both the user's filter and the requested list
- final_provider_filter = [
- p for p in requested_providers if p in user_provider_filter
- ]
- if not final_provider_filter:
- # No overlap - user requested providers they don't have access to
- # Return empty results by adding impossible condition
- query_parts.append("1 = 0")
- return
- else:
- # No explicit filter - use user's provider filter
- final_provider_filter = user_provider_filter
- elif provider:
- # No user filter, but explicit provider filter provided
- final_provider_filter = [provider] if isinstance(provider, str) else provider
-
- # Apply the final provider filter
- if final_provider_filter:
+ # Apply the provider filter
+ if provider_filter:
provider_conditions = []
- for prov in final_provider_filter:
+ for prov in provider_filter:
provider_conditions.append(
f"provider_mappings.provider_instance = '{prov}' "
f"OR provider_mappings.provider_domain = '{prov}'"
else:
db_row_dict["metadata"]["images"] = [album_thumb]
return db_row_dict
+
+ def _ensure_provider_filter(
+ self,
+ provider: str | list[str] | None,
+ ) -> list[str] | None:
+ """Ensure the provider filter respects the current user's provider filter."""
+ # Apply user provider filter if needed
+ user = get_current_user()
+ user_provider_filter = user.provider_filter if user and user.provider_filter else None
+ final_provider_filter: list[str] | None = None
+ if user_provider_filter:
+ # User has a provider filter set
+ if provider:
+ # Explicit provider filter provided - validate against user's allowed providers
+ requested_providers = [provider] if isinstance(provider, str) else provider
+ # Only include providers that are in both the user's filter and the requested list
+ final_provider_filter = [
+ p for p in requested_providers if p in user_provider_filter
+ ]
+ if not final_provider_filter:
+ # No overlap - user requested providers they don't have access to
+ raise InsufficientPermissions(
+ "User does not have permission to access the requested provider(s)."
+ )
+ else:
+ # No explicit filter - use user's provider filter
+ final_provider_filter = user_provider_filter
+ return final_provider_filter
# so instead we just query the db...
if media_types is None or MediaType.TRACK in media_types:
result.tracks = await self.mass.music.tracks._get_library_items_by_query(
- search=search_query, provider=self.instance_id, limit=limit
+ search=search_query, provider_filter=[self.instance_id], limit=limit
)
if media_types is None or MediaType.ALBUM in media_types:
result.albums = await self.mass.music.albums._get_library_items_by_query(
search=search_query,
- provider=self.instance_id,
+ provider_filter=[self.instance_id],
limit=limit,
)
if media_types is None or MediaType.ARTIST in media_types:
result.artists = await self.mass.music.artists._get_library_items_by_query(
search=search_query,
- provider=self.instance_id,
+ provider_filter=[self.instance_id],
limit=limit,
)
if media_types is None or MediaType.PLAYLIST in media_types:
result.playlists = await self.mass.music.playlists._get_library_items_by_query(
search=search_query,
- provider=self.instance_id,
+ provider_filter=[self.instance_id],
limit=limit,
)
if media_types is None or MediaType.AUDIOBOOK in media_types:
result.audiobooks = await self.mass.music.audiobooks._get_library_items_by_query(
search=search_query,
- provider=self.instance_id,
+ provider_filter=[self.instance_id],
limit=limit,
)
if media_types is None or MediaType.PODCAST in media_types:
result.podcasts = await self.mass.music.podcasts._get_library_items_by_query(
search=search_query,
- provider=self.instance_id,
+ provider_filter=[self.instance_id],
limit=limit,
)
return result