stream_type=StreamType.HTTP,
# explore the StreamDetails model and StreamType enum for more options
# but the above should be the mandatory fields to set.
+ allow_seek=True,
+ # set allow_seek to True if the stream may be seeked
+ can_seek=True,
+ # set can_seek to True if the stream supports seeking
)
async def get_audio_stream(
path=stream_url,
decryption_key=await self._get_decryption_key(license_url, key_id, uri, item_id),
can_seek=True,
+ allow_seek=True,
)
def _parse_artist(self, artist_obj):
import audible
import audible.register
from audible import AsyncClient
-from music_assistant_models.enums import (
- ContentType,
- ImageType,
- MediaType,
- StreamType,
-)
+from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType
from music_assistant_models.errors import LoginFailed
from music_assistant_models.media_items import (
Audiobook,
stream_type=StreamType.HTTP,
path=m3u8_url,
can_seek=True,
+ allow_seek=True,
duration=duration,
data={"acr": acr},
)
media_type=media_type,
stream_type=StreamType.HLS,
path=stream_url,
+ can_seek=True,
+ allow_seek=True,
)
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
media_type=MediaType.AUDIOBOOK,
stream_type=StreamType.HTTP,
path=stream_url,
+ can_seek=True,
+ allow_seek=True,
)
async def _get_stream_details_podcast_episode(self, podcast_id: str) -> StreamDetails:
media_type=MediaType.PODCAST_EPISODE,
stream_type=StreamType.HTTP,
path=full_url,
+ can_seek=True,
+ allow_seek=True,
)
async def on_played(
stream_type=StreamType.HTTP,
path=item_id,
can_seek=not is_radio,
+ allow_seek=not is_radio,
)
async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]:
# separately so we can use it later on.
data={"url": url, "format": url_details["format"], "track_id": song_data["SNG_ID"]},
size=int(song_data[f"FILESIZE_{url_details['format']}"]),
+ can_seek=True,
+ allow_seek=True,
)
async def get_audio_stream(
data=file_item,
path=file_item.absolute_path,
can_seek=True,
+ allow_seek=True,
)
async def _get_stream_details_for_podcast_episode(self, item_id: str) -> StreamDetails:
),
stream_type=StreamType.HTTP,
path=url,
+ can_seek=True,
+ allow_seek=True,
)
async def _get_tracks(self, track_ids: list[int], is_playlist: bool = False) -> list[Track]:
jellyfin_track[ITEM_KEY_RUNTIME_TICKS] / 10000000
), # 10000000 ticks per millisecond)
path=url,
+ can_seek=True,
+ allow_seek=True,
)
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
stream_type=StreamType.HTTP,
duration=plex_track.duration,
data=plex_track,
+ can_seek=True,
+ allow_seek=True,
)
if content_type != ContentType.M4A:
media_type=MediaType.PODCAST_EPISODE,
stream_type=StreamType.HTTP,
path=episode["enclosures"][0]["url"],
+ can_seek=True,
+ allow_seek=True,
)
raise MediaNotFoundError("Stream not found")
duration=streamdata["duration"],
data=streamdata, # we need these details for reporting playback
path=streamdata["url"],
+ can_seek=True,
+ allow_seek=True,
)
async def _report_playback_started(self, streamdata: dict) -> None:
if TYPE_CHECKING:
stored_radios = cast(list[str], stored_radios)
for item in stored_radios:
- yield await self.get_radio(item)
+ try:
+ yield await self.get_radio(item)
+ except MediaNotFoundError as err:
+ self.logger.warning("Radio station %s not found: %s", item, err)
async def library_add(self, item: MediaItemType) -> bool:
"""Add item to provider's library. Return true on success."""
stream_type=StreamType.HTTP,
path=stream.url_resolved,
can_seek=False,
+ allow_seek=False,
)
media_type=MediaType.RADIO,
path=hls_path,
can_seek=False,
+ allow_seek=False,
)
return self._current_stream_details
if url.startswith("https://cf-hls-media.sndcdn.com")
else StreamType.HTTP,
path=url,
+ can_seek=True,
+ allow_seek=True,
)
async def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
stream_type=StreamType.HTTP,
path=SILENCE_FILE_LONG,
can_seek=True,
+ allow_seek=True,
)
stream_type=StreamType.HTTP,
duration=track.duration,
path=url,
+ can_seek=True,
+ allow_seek=True,
)
@throttle_with_retries
media_type=MediaType.RADIO,
stream_type=StreamType.HTTP,
path=item_id,
+ allow_seek=False,
can_seek=False,
)
if "--" in item_id:
media_type=MediaType.RADIO,
stream_type=StreamType.HTTP,
path=stream["url"],
+ allow_seek=False,
can_seek=False,
)
msg = f"Unable to retrieve stream details for {item_id}"
),
stream_type=StreamType.HTTP,
path=stream_format["url"],
+ can_seek=True,
+ allow_seek=True,
)
if (
stream_format.get("audio_channels")