from __future__ import annotations
+import contextlib
from enum import StrEnum
from typing import Self
AIFF = "aiff"
WMA = "wma"
M4A = "m4a"
+ MP4 = "mp4"
M4B = "m4b"
DSF = "dsf"
OPUS = "opus"
for splitter in (".", ","):
if splitter in tempstr:
for val in tempstr.split(splitter):
- try:
- return cls(val.strip())
- except ValueError:
- pass
-
+ with contextlib.suppress(ValueError):
+ parsed = cls(val.strip())
+ if parsed != ContentType.UNKNOWN:
+ return parsed
tempstr = tempstr.split("?")[0]
tempstr = tempstr.split("&")[0]
tempstr = tempstr.split(";")[0]
tempstr = tempstr.replace("mp4", "m4a")
- tempstr = tempstr.replace("mpd", "dash")
+ tempstr = tempstr.replace("mp4a", "m4a")
try:
return cls(tempstr)
except ValueError:
# expires: timestamp this streamdetails expire
expires: float = time() + 3600
# data: provider specific data (not exposed externally)
+ # this info is for example used to pass details to the get_audio_stream
data: Any = None
- # direct: if the url/file is supported by ffmpeg directly, use direct stream
- direct: str | None = None
# can_seek: bool to indicate that the providers 'get_audio_stream' supports seeking of the item
can_seek: bool = True
content_type=ContentType.try_parse(url),
),
media_type=MediaType.ANNOUNCEMENT,
- direct=url,
data={"url": url, "use_pre_announce": use_pre_announce},
target_loudness=-10,
),
get_ffmpeg_args,
get_ffmpeg_stream,
get_player_filter_params,
- get_radio_stream,
parse_loudnorm,
strip_silence,
)
# collect all arguments for ffmpeg
filter_params = []
- extra_args = []
- seek_pos = (
- streamdetails.seek_position
- if (streamdetails.direct or not streamdetails.can_seek)
- else 0
- )
- if seek_pos:
- # only use ffmpeg seeking if the provider stream does not support seeking
- extra_args += ["-ss", str(seek_pos)]
if streamdetails.target_loudness is not None:
# add loudnorm filters
filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=11:TP=-2"
if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
- if is_radio and streamdetails.direct and streamdetails.direct.startswith("http"):
- # ensure we use the radio streamer for radio items
- audio_source_iterator = get_radio_stream(self.mass, streamdetails.direct, streamdetails)
- input_path = "-"
- elif streamdetails.direct:
- audio_source_iterator = None
- input_path = streamdetails.direct
- else:
- audio_source_iterator = self.mass.get_provider(streamdetails.provider).get_audio_stream(
- streamdetails,
- seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
- )
- input_path = "-"
-
+ audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
+ streamdetails,
+ seek_position=streamdetails.seek_position,
+ )
ffmpeg_args = get_ffmpeg_args(
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
- extra_args=extra_args,
- input_path=input_path,
+ input_path="-",
# loglevel info is needed for loudness measurement
loglevel="info",
+ # we criple ffmpeg a bit on purpose with the filter_threads
+ # option so it doesn't consume all cpu when calculating loudnorm
extra_input_args=["-filter_threads", "1"],
)
stderr_data = ""
async for line in ffmpeg_proc.iter_stderr():
line = line.decode().strip() # noqa: PLW2901
- # if streamdetails contenttype is uinknown, try pars eit from the ffmpeg log output
+ if not line:
+ continue
+ logger.log(VERBOSE_LOG_LEVEL, line)
+ # if streamdetails contenttype is unknown, try parse it from the ffmpeg log output
# this has no actual usecase, other than displaying the correct codec in the UI
if (
streamdetails.audio_format.content_type == ContentType.UNKNOWN
stderr_data += line
elif "HTTP error" in line:
logger.warning(line)
- elif line:
- logger.log(VERBOSE_LOG_LEVEL, line)
del line
# if we reach this point, the process is finished (finish or aborted)
async with AsyncProcess(
ffmpeg_args,
- enable_stdin=audio_source_iterator is not None,
+ enable_stdin=True,
enable_stderr=True,
- custom_stdin=audio_source_iterator,
+ custom_stdin=audio_source,
name="ffmpeg_media_stream",
) as ffmpeg_proc:
state_data = {"finished": asyncio.Event(), "bytes_sent": 0}
logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
-async def get_hls_stream_org(
- mass: MusicAssistant, url: str, streamdetails: StreamDetails
-) -> AsyncGenerator[bytes, None]:
- """Get audio stream from HTTP HLS stream."""
- timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
- # fetch master playlist and select (best) child playlist
- # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
- async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
- charset = resp.charset or "utf-8"
- master_m3u_data = await resp.text(charset)
- substreams = parse_m3u(master_m3u_data)
- if any(x for x in substreams if x.path.endswith(".ts")) or not all(
- x for x in substreams if x.stream_info is not None
- ):
- # the url we got is already a substream
- substream_url = url
- else:
- # sort substreams on best quality (highest bandwidth)
- substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
- substream = substreams[0]
- substream_url = substream.path
- if not substream_url.startswith("http"):
- # path is relative, stitch it together
- base_path = url.rsplit("/", 1)[0]
- substream_url = base_path + "/" + substream.path
-
- async def watch_metadata():
- # ffmpeg is not (yet?) able to handle metadata updates that is provided
- # in the substream playlist and/or the ID3 metadata
- # so we do that here in a separate task.
- # this also gets the basic
- prev_chunk = ""
- while True:
- async with mass.http_session.get(
- substream_url, headers=VLC_HEADERS, timeout=timeout
- ) as resp:
- charset = resp.charset or "utf-8"
- substream_m3u_data = await resp.text(charset)
- # get chunk-parts from the substream
- hls_chunks = parse_m3u(substream_m3u_data)
- metadata_found = False
- for chunk_item in hls_chunks:
- if chunk_item.path == prev_chunk:
- continue
- chunk_item_url = chunk_item.path
- if not chunk_item_url.startswith("http"):
- # path is relative, stitch it together
- base_path = substream_url.rsplit("/", 1)[0]
- chunk_item_url = base_path + "/" + chunk_item.path
- if chunk_item.title and chunk_item.title != "no desc":
- streamdetails.stream_title = chunk_item.title
- metadata_found = True
- # prevent that we play this chunk again if we loop through
- prev_chunk = chunk_item.path
- if chunk_item.length and chunk_item.length.isnumeric():
- await asyncio.sleep(int(chunk_item.length))
- else:
- await asyncio.sleep(5)
- if not metadata_found:
- # this station does not provide metadata embedded in the HLS playlist
- return
-
- LOGGER.debug(
- "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
- )
-
- if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
- streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
-
- try:
- metadata_task = asyncio.create_task(watch_metadata())
- async for chunk in get_ffmpeg_stream(
- audio_input=substream_url,
- input_format=streamdetails.audio_format,
- # we need a self-explaining codec but not loose data from re-encoding
- output_format=AudioFormat(content_type=ContentType.FLAC),
- ):
- yield chunk
- finally:
- if metadata_task and not metadata_task.done():
- metadata_task.cancel()
-
-
async def get_http_stream(
mass: MusicAssistant,
url: str,
if seek_position:
assert streamdetails.duration, "Duration required for seek requests"
# try to get filesize with a head request
- if seek_position and not streamdetails.size:
- async with mass.http_session.head(url) as resp:
+ seek_supported = streamdetails.can_seek
+ if seek_position or not streamdetails.size:
+ async with mass.http_session.head(url, headers=VLC_HEADERS) as resp:
+ resp.raise_for_status()
if size := resp.headers.get("Content-Length"):
streamdetails.size = int(size)
+ seek_supported = resp.headers.get("Accept-Ranges") == "bytes"
# headers
- headers = {}
+ headers = {**VLC_HEADERS}
+ timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
skip_bytes = 0
if seek_position and streamdetails.size:
skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
- headers["Range"] = f"bytes={skip_bytes}-"
+ headers["Range"] = f"bytes={skip_bytes}-{streamdetails.size}"
+
+ # seeking an unknown or container format is not supported due to the (moov) headers
+ if seek_position and (
+ not seek_supported
+ or streamdetails.audio_format.content_type
+ in (
+ ContentType.UNKNOWN,
+ ContentType.M4A,
+ ContentType.M4B,
+ )
+ ):
+ LOGGER.debug(
+ "Seeking in %s (%s) not possible, fallback to ffmpeg seeking.",
+ streamdetails.uri,
+ streamdetails.audio_format.output_format_str,
+ )
+ async for chunk in get_ffmpeg_stream(
+ url,
+ # we must set the input content type to unknown to
+ # enforce ffmpeg to determine it from the headers
+ input_format=AudioFormat(content_type=ContentType.UNKNOWN),
+ # enforce wav as we dont want to re-encode lossy formats
+ # choose wav so we have descriptive headers and move on
+ output_format=AudioFormat(content_type=ContentType.WAV),
+ extra_input_args=["-ss", str(seek_position)],
+ ):
+ yield chunk
+ return
# start the streaming from http
- buffer = b""
- buffer_all = False
bytes_received = 0
- timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
- async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
+ async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
is_partial = resp.status == 206
- buffer_all = seek_position and not is_partial
+ if seek_position and not is_partial:
+ raise InvalidDataError("HTTP source does not support seeking!")
+ resp.raise_for_status()
async for chunk in resp.content.iter_any():
bytes_received += len(chunk)
- if buffer_all and not skip_bytes:
- buffer += chunk
- continue
- if not is_partial and skip_bytes and bytes_received < skip_bytes:
- continue
yield chunk
# store size on streamdetails for later use
if not streamdetails.size:
streamdetails.size = bytes_received
- if buffer_all:
- skip_bytes = streamdetails.size / streamdetails.duration * seek_position
- yield buffer[:skip_bytes]
LOGGER.debug(
"Finished HTTP stream for %s (transferred %s/%s bytes)",
streamdetails.uri,
extra_args: list[str] | None = None,
chunk_size: int | None = None,
loglevel: str | None = None,
+ extra_input_args: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
"""
Get the ffmpeg audio stream as async generator.
input_path="-" if use_stdin else audio_input,
output_path="-",
loglevel=loglevel,
+ extra_input_args=extra_input_args or [],
)
async with AsyncProcess(
ffmpeg_args,
) -> AsyncGenerator[bytes, None]:
"""Create a 30 seconds preview audioclip for the given streamdetails."""
music_prov = mass.get_provider(provider_instance_id_or_domain)
-
streamdetails = await music_prov.get_stream_details(track_id)
-
- input_args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "quiet",
- "-ignore_unknown",
- ]
- if streamdetails.direct:
- input_args += ["-ss", "30", "-i", streamdetails.direct]
- else:
- # the input is received from pipe/stdin
- if streamdetails.audio_format.content_type != ContentType.UNKNOWN:
- input_args += ["-f", streamdetails.audio_format.content_type]
- input_args += ["-i", "-"]
-
- output_args = ["-to", "30", "-f", "mp3", "-"]
- args = input_args + output_args
-
- writer_task: asyncio.Task | None = None
- ffmpeg_proc = AsyncProcess(args, enable_stdin=True, enable_stdout=True, enable_stderr=False)
- await ffmpeg_proc.start()
-
- async def writer() -> None:
- """Task that grabs the source audio and feeds it to ffmpeg."""
- music_prov = mass.get_provider(streamdetails.provider)
- async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
- await ffmpeg_proc.write(audio_chunk)
- # write eof when last packet is received
- await ffmpeg_proc.write_eof()
-
- if not streamdetails.direct:
- writer_task = asyncio.create_task(writer())
-
- # yield chunks from stdout
- try:
- async for chunk in ffmpeg_proc.iter_any():
- yield chunk
- finally:
- if writer_task and not writer_task.done():
- writer_task.cancel()
- await ffmpeg_proc.close()
+ async for chunk in get_ffmpeg_stream(
+ audio_input=music_prov.get_audio_stream(streamdetails, 30),
+ input_format=streamdetails.audio_format,
+ output_format=AudioFormat(content_type=ContentType.MP3),
+ extra_input_args=["-to", "30"],
+ ):
+ yield chunk
async def get_silence(
input_format.content_type.name.lower(),
"-f",
input_format.content_type.value,
+ "-i",
+ input_path,
]
- input_args += ["-i", input_path]
+ elif input_format.content_type == ContentType.UNKNOWN:
+ # let ffmpeg guess/auto detect the content type
+ input_args += ["-i", input_path]
+ else:
+ # use explicit format identifier for all other
+ input_args += ["-f", input_format.content_type.value, "-i", input_path]
# collect output args
if output_path.upper() == "NULL":
output_args = ["-f", "null", "-"]
- else:
+ elif output_format.content_type.is_pcm():
output_args = [
"-acodec",
output_format.content_type.name.lower(),
str(output_format.sample_rate),
output_path,
]
+ elif output_format.content_type == ContentType.UNKNOWN:
+ # use wav so we at least have some headers for the rest of the chain
+ output_args = ["-f", "wav", output_path]
+ else:
+ # use explicit format identifier for all other
+ output_args = ["-f", output_format.content_type.value, output_path]
# prefer libsoxr high quality resampler (if present) for sample rate conversions
if input_format.sample_rate != output_format.sample_rate and libsoxr_support:
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
- if streamdetails.direct is None:
- raise NotImplementedError
+ raise NotImplementedError
async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
"""Handle callback when an item completed streaming."""
media_type=MediaType.TRACK,
duration=library_item.duration,
size=file_item.file_size,
- direct=file_item.local_path,
+ data=file_item.local_path,
can_seek=prov_mapping.audio_format.content_type in SEEKABLE_FILES,
)
import plexapi.exceptions
import requests
-from aiohttp import ClientTimeout
from plexapi.audio import Album as PlexAlbum
from plexapi.audio import Artist as PlexArtist
from plexapi.audio import Playlist as PlexPlaylist
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import get_http_stream
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.music_provider import MusicProvider
)
if media_type != ContentType.M4A:
- stream_details.direct = self._plex_server.url(media_part.key, True)
+ stream_details.data = self._plex_server.url(media_part.key, True)
if audio_stream.samplingRate:
stream_details.audio_format.sample_rate = audio_stream.samplingRate
if audio_stream.bitDepth:
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
- url = streamdetails.data.getStreamURL(offset=seek_position)
-
- timeout = ClientTimeout(total=0, connect=30, sock_read=600)
- async with self.mass.http_session.get(url, timeout=timeout) as resp:
- async for chunk in resp.content.iter_any():
- yield chunk
+ if isinstance(streamdetails.data, str):
+ url = streamdetails.data
+ else:
+ url = streamdetails.data.getStreamURL(offset=seek_position)
+ async for chunk in get_http_stream(self.mass, url, streamdetails, 0):
+ yield chunk
async def get_myplex_account_and_refresh_token(self, auth_token: str) -> MyPlexAccount:
"""Get a MyPlexAccount object and refresh the token if needed."""
from music_assistant.server.helpers.app_vars import app_var
# pylint: enable=no-name-in-module
+from music_assistant.server.helpers.audio import get_http_stream
from music_assistant.server.models.music_provider import MusicProvider
if TYPE_CHECKING:
else:
msg = f"Unsupported mime type for {item_id}"
raise MediaNotFoundError(msg)
- # report playback started as soon as the streamdetails are requested
- self.mass.create_task(self._report_playback_started(streamdata))
return StreamDetails(
item_id=str(item_id),
provider=self.instance_id,
),
duration=streamdata["duration"],
data=streamdata, # we need these details for reporting playback
- expires=time.time() + 60, # may not be cached
- direct=streamdata["url"],
+ expires=time.time() + 300, # url expires very fast
)
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ # report playback started as soon as we start streaming
+ self.mass.create_task(self._report_playback_started(streamdetails.data))
+ async for chunk in get_http_stream(
+ self.mass, streamdetails.data["url"], streamdetails, seek_position
+ ):
+ yield chunk
+
async def _report_playback_started(self, streamdata: dict) -> None:
"""Report playback start to qobuz."""
# TODO: need to figure out if the streamed track is purchased by user
SearchResults,
)
from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import get_radio_stream
from music_assistant.server.models.music_provider import MusicProvider
SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
content_type=ContentType.try_parse(stream.codec),
),
media_type=MediaType.RADIO,
- direct=stream.url_resolved,
+ data=stream.url_resolved,
expires=time() + 3600,
)
+
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ # report playback started as soon as we start streaming
+ async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0):
+ yield chunk
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import (
+ get_hls_stream,
+ get_http_stream,
+ resolve_radio_stream,
+)
from music_assistant.server.models.music_provider import MusicProvider
from .soundcloudpy.asyncsoundcloudpy import SoundcloudAsyncAPI
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_format),
),
- direct=url,
+ data=url,
)
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ resolved_url, _, is_hls = await resolve_radio_stream(self.mass, streamdetails.data)
+ if is_hls:
+ # some soundcloud streams are HLS, prefer the radio streamer
+ async for chunk in get_hls_stream(self.mass, resolved_url, streamdetails):
+ yield chunk
+ return
+ # regular stream from http
+ async for chunk in get_http_stream(self.mass, resolved_url, streamdetails, seek_position):
+ yield chunk
+
async def _parse_artist(self, artist_obj: dict) -> Artist:
"""Parse a Soundcloud user response to Artist model object."""
artist_id = None
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import get_http_stream
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import AudioTags, parse_tags
from music_assistant.server.models.music_provider import MusicProvider
channels=media_info.channels,
),
duration=track.duration,
- direct=url,
+ data=url,
)
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ # report playback started as soon as we start streaming
+ async for chunk in get_http_stream(
+ self.mass, streamdetails.data, streamdetails, seek_position
+ ):
+ yield chunk
+
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get artist details for given artist id."""
tidal_session = await self._get_tidal_session()
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_USERNAME
+from music_assistant.server.helpers.audio import get_radio_stream
from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.music_provider import MusicProvider
content_type=ContentType.UNKNOWN,
),
media_type=MediaType.RADIO,
- direct=item_id,
+ data=item_id,
expires=time() + 3600,
)
stream_item_id, media_type = item_id.split("--", 1)
content_type=ContentType(stream["media_type"]),
),
media_type=MediaType.RADIO,
- direct=stream["url"],
+ data=stream["url"],
expires=time() + 3600,
)
msg = f"Unable to retrieve stream details for {item_id}"
raise MediaNotFoundError(msg)
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ # report playback started as soon as we start streaming
+ async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails, 0):
+ yield chunk
+
async def __get_data(self, endpoint: str, **kwargs):
"""Get data from api."""
if endpoint.startswith("http"):
bit_depth=media_info.bits_per_sample,
),
media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
- direct=item_id if is_radio else None,
- data=item_id,
+ data={"url": item_id},
)
async def get_audio_stream(
"""Return the audio stream for the provider item."""
if streamdetails.media_type == MediaType.RADIO:
# radio stream url
- async for chunk in get_radio_stream(self.mass, streamdetails.data, streamdetails):
+ async for chunk in get_radio_stream(
+ self.mass, streamdetails.data["url"], streamdetails
+ ):
yield chunk
elif os.path.isfile(streamdetails.data):
# local file
async for chunk in get_file_stream(
- self.mass, streamdetails.data, streamdetails, seek_position
+ self.mass, streamdetails.data["url"], streamdetails, seek_position
):
yield chunk
else:
# regular stream url (without icy meta)
async for chunk in get_http_stream(
- self.mass, streamdetails.data, streamdetails, seek_position
+ self.mass, streamdetails.data["url"], streamdetails, seek_position
):
yield chunk
from urllib.parse import unquote
import pytube
+from aiohttp import ClientResponseError
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
Track,
)
from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.server.helpers.audio import get_http_stream
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.models.music_provider import MusicProvider
audio_format=AudioFormat(
content_type=ContentType.try_parse(stream_format["mimeType"]),
),
- direct=url,
+ data=url,
)
if (
track_obj["streamingData"].get("expiresInSeconds")
stream_details.audio_format.sample_rate = int(stream_format.get("audioSampleRate"))
return stream_details
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ is_retry = False
+ while True:
+ try:
+ async for chunk in get_http_stream(
+ self.mass, streamdetails.data, streamdetails, seek_position
+ ):
+ yield chunk
+ return
+ except ClientResponseError as err:
+ if not is_retry and err.status == 403:
+ # cipher expired, get a fresh one
+ self.logger.warning("Cipher expired, trying to refresh...")
+ streamdetails = await self.get_stream_details(streamdetails.item_id)
+ continue
+ # raise for all other cases or we have already retried
+ raise
+
async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs):
"""Post data to the given endpoint."""
await self._check_oauth_token()