# No user filter - use the provided filter as is
final_provider_filter = [provider] if isinstance(provider, str) else provider
return final_provider_filter
+
+ def _select_provider_id(self, library_item: ItemCls) -> tuple[str, str]:
+ """Select the correct provider id to use for fetching the item."""
+ user = get_current_user()
+ user_provider_filter = user.provider_filter if user and user.provider_filter else None
+ # prefer user provider filter if available
+ for mapping in library_item.provider_mappings:
+ if user_provider_filter and mapping.provider_instance not in user_provider_filter:
+ continue
+ return (mapping.provider_instance, mapping.item_id)
+ # fallback to first mapping
+ mapping = next(iter(library_item.provider_mappings))
+ return (mapping.provider_instance, mapping.item_id)
"""Return playlist tracks for the given provider playlist id."""
if provider_instance_id_or_domain == "library":
library_item = await self.get_library_item(item_id)
- # a playlist can only have one provider so simply pick the first one
- prov_map = next(x for x in library_item.provider_mappings)
- item_id = prov_map.item_id
- provider_instance_id_or_domain = prov_map.provider_instance
+ provider_instance_id_or_domain, item_id = self._select_provider_id(library_item)
# playlist tracks are not stored in the db,
# we always fetched them (cached) from the provider
page = 0
library_podcast = await self.get_library_item(item_id)
if not library_podcast:
raise MediaNotFoundError(f"Podcast {item_id} not found in library")
- for provider_mapping in library_podcast.provider_mappings:
- item_id = provider_mapping.item_id
- provider_instance_id_or_domain = provider_mapping.provider_instance
- break
+ provider_instance_id_or_domain, item_id = self._select_provider_id(library_podcast)
# podcast episodes are not stored in the db/library
# so we always need to fetch them from the provider
async for episode in self._get_provider_podcast_episodes(
"""
Return all unique MusicProvider (instance or domain) ids.
- This will return instance_ids for non-streaming providers
- and domain names for streaming providers to avoid duplicates.
+ This will return a set of provider instance ids but will only return
+ a single instance_id per streaming provider domain.
"""
+ processed_domains: set[str] = set()
+ # Get user provider filter if set
+ user = get_current_user()
+ user_provider_filter = user.provider_filter if user and user.provider_filter else None
result: set[str] = set()
for provider in self.providers:
- if provider.is_streaming_provider:
- result.add(provider.domain)
- else:
- result.add(provider.instance_id)
+ if provider.is_streaming_provider and provider.domain in processed_domains:
+ continue
+ if user_provider_filter and provider.instance_id not in user_provider_filter:
+ continue
+ result.add(provider.instance_id)
+ processed_domains.add(provider.domain)
return result
async def cleanup_provider(self, provider_instance: str) -> None: