if ProviderFeature.SIMILAR_TRACKS in self.supported_features:
raise NotImplementedError
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio/chapter/episode."""
raise NotImplementedError
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING
-from music_assistant_models.enums import MediaType
-
from .provider import Provider
if TYPE_CHECKING:
+ from music_assistant_models.enums import MediaType
from music_assistant_models.media_items import PluginSource
from music_assistant_models.streamdetails import StreamDetails
# Will only be called if ProviderFeature.AUDIO_SOURCE is declared
raise NotImplementedError
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the streamdetails to stream an (audio)source provided by this plugin."""
# Will only be called if ProviderFeature.AUDIO_SOURCE is declared
raise NotImplementedError
# Get a list of similar tracks based on the provided track.
# This is only called if the provider supports the SIMILAR_TRACKS feature.
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio."""
# Get stream details for a track or radio.
# Implementing this method is MANDATORY to allow playback.
found_tracks.append(self._parse_track(track))
return found_tracks
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
stream_metadata = await self._fetch_song_stream_metadata(item_id)
license_url = stream_metadata["hls-key-server-url"]
raise ValueError(f"Audiobook with id {prov_audiobook_id} not found")
return audiobook
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.AUDIOBOOK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a audiobook based of asin."""
return await self.helper.get_stream(asin=item_id)
path=stream_url,
)
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get stream of item."""
# self.logger.debug(f"Streamdetails: {item_id}")
if media_type == MediaType.PODCAST_EPISODE:
)
return media_info
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio."""
media_info = await self._get_media_info(item_id)
is_radio = media_info.get("icy-name") or not media_info.duration
]["data"][:limit]
return [await self.get_track(track["SNG_ID"]) for track in tracks]
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
url_details, song_data = await self.gw_client.get_deezer_track_urls(item_id)
url = url_details["sources"][0]["url"]
return tracks
return await self._get_tracks(playlist_obj["tracks"], True)
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
# How to buildup a stream url:
# [streaming_server]/[url]?Expires=[now]&Signature=[user token]&file_id=[file ID]
for album in albums["Items"]
]
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
jellyfin_track = await self._client.get_track(item_id)
url = self._client.audio_url(
msg = f"Failed to remove songs from {prov_playlist_id}, check your permissions."
raise ProviderPermissionDenied(msg) from ex
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get the details needed to process a specified track."""
item: SonicSong | SonicEpisode
if media_type == MediaType.TRACK:
return albums
return []
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track."""
plex_track = await self._get_data(item_id, PlexTrack)
if not plex_track or not plex_track.media:
episodes.append(await self._parse_episode(episode, idx))
return episodes
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio."""
for episode in self.parsed["episodes"]:
if item_id == episode["guid"]:
playlist_track_ids=",".join(playlist_track_ids),
)
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
streamdata = None
for format_id in [27, 7, 6, 5]:
return radio
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.RADIO
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a radio station."""
stream = await self.radios.station(uuid=item_id)
if not stream:
return self._parse_radio(self._channels_by_id[prov_radio_id])
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.RADIO
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio."""
# There's a chance that the SiriusXM auth session has expired
# by the time the user clicks to play a station. The sxm-client
return tracks
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
url: str = await self._soundcloud.get_stream_url(track_id=item_id)
return StreamDetails(
items = await self._get_data(endpoint, seed_tracks=prov_track_id, limit=limit)
return [self._parse_track(item) for item in items["tracks"] if (item and item["id"])]
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
return StreamDetails(
item_id=item_id,
},
)
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the streamdetails to stream an audiosource provided by this plugin."""
self._current_streamdetails = streamdetails = StreamDetails(
item_id=CONNECT_ITEM_ID,
position=int(episode_idx),
)
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio."""
return StreamDetails(
provider=self.lookup_key,
from enum import StrEnum
from typing import TYPE_CHECKING, ParamSpec, TypeVar, cast
-from music_assistant_models.config_entries import (
- ConfigEntry,
- ConfigValueOption,
- ConfigValueType,
-)
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
from music_assistant_models.enums import (
AlbumType,
CacheCategory,
from music_assistant.helpers.auth import AuthenticationHelper
from music_assistant.helpers.tags import AudioTags, async_parse_tags
-from music_assistant.helpers.throttle_retry import (
- ThrottlerManager,
- throttle_with_retries,
-)
+from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.models.music_provider import MusicProvider
from .helpers import (
)
return self._parse_playlist(playlist_obj)
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
tidal_session = await self._get_tidal_session()
# make sure a valid track is requested.
await self.mass.cache.set(preset_id, result, base_key=cache_base_key)
return result
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.RADIO
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a radio station."""
if item_id.startswith("http"):
# custom url
return tracks
return []
- async def get_stream_details(
- self, item_id: str, media_type: MediaType = MediaType.TRACK
- ) -> StreamDetails:
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
if media_type == MediaType.PODCAST_EPISODE:
item_id = item_id.split(PODCAST_EPISODE_SPLITTER)[1]