LOGGER.debug("Start analyzing track %s", streamdetails.uri)
# calculate BS.1770 R128 integrated loudness with ffmpeg
started = time()
+ input_file = streamdetails.direct or "-"
proc_args = [
"ffmpeg",
"-i",
- "-",
+ input_file,
"-f",
streamdetails.content_type.value,
"-af",
"-",
]
async with AsyncProcess(
- proc_args, True, enable_stdout=False, enable_stderr=True
+ proc_args,
+ enable_stdin=streamdetails.direct is None,
+ enable_stdout=False,
+ enable_stderr=True,
) as ffmpeg_proc:
async def writer():
break
ffmpeg_proc.write_eof()
- writer_task = ffmpeg_proc.attach_task(writer())
- # wait for the writer task to finish
- await writer_task
+ if streamdetails.direct is None:
+ writer_task = ffmpeg_proc.attach_task(writer())
+ # wait for the writer task to finish
+ await writer_task
_, stderr = await ffmpeg_proc.communicate()
try:
"""Get the PCM audio stream for the given streamdetails."""
assert pcm_fmt.is_pcm(), "Output format must be a PCM type"
args = await _get_ffmpeg_args(
- streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels
+ streamdetails,
+ pcm_fmt,
+ pcm_sample_rate=sample_rate,
+ pcm_channels=channels,
+ seek_position=seek_position,
)
- async with AsyncProcess(args, enable_stdin=True) as ffmpeg_proc:
+ async with AsyncProcess(
+ args, enable_stdin=streamdetails.direct is None
+ ) as ffmpeg_proc:
- LOGGER.debug(
- "start media stream for: %s, using args: %s", streamdetails.uri, str(args)
- )
+ LOGGER.debug("start media stream for: %s", streamdetails.uri)
async def writer():
"""Task that grabs the source audio and feeds it to ffmpeg."""
ffmpeg_proc.write_eof()
LOGGER.debug("writer finished for %s", streamdetails.uri)
- ffmpeg_proc.attach_task(writer())
+ if streamdetails.direct is None:
+ ffmpeg_proc.attach_task(writer())
# yield chunks from stdout
try:
streamdetails.item_id, streamdetails.provider
)
finally:
+ # report playback
+ if streamdetails.callback:
+ mass.create_task(streamdetails.callback, streamdetails)
# send analyze job to background worker
if streamdetails.loudness is None:
mass.add_job(
pcm_output_format: ContentType,
pcm_sample_rate: int,
pcm_channels: int = 2,
+ seek_position: int = 0,
) -> List[str]:
"""Collect all args to send to the ffmpeg process."""
input_format = streamdetails.content_type
"quiet",
"-ignore_unknown",
]
- if streamdetails.content_type != ContentType.UNKNOWN:
- input_args += ["-f", input_format.value]
- input_args += ["-i", "-"]
+ if streamdetails.direct:
+ # ffmpeg can access the inputfile (or url) directly
+ if seek_position:
+ input_args += ["-ss", str(seek_position)]
+ input_args += ["-i", streamdetails.direct]
+ else:
+ # the input is received from pipe/stdin
+ if streamdetails.content_type != ContentType.UNKNOWN:
+ input_args += ["-f", input_format.value]
+ input_args += ["-i", "-"]
+
# collect output args
output_args = [
"-acodec",
expires: float = time() + 3600
# data: provider specific data (not exposed externally)
data: Optional[Any] = None
+ # if the url/file is supported by ffmpeg directly, use direct stream
+ direct: Optional[str] = None
+ # callback: optional callback function (or coroutine) to call when the stream completes.
+ # needed for streaming provivders to report what is playing
+ # receives the streamdetails as only argument from which to grab
+ # details such as seconds_streamed.
+ callback: Any = None
# the fields below will be set/controlled by the streamcontroller
queue_id: Optional[str] = None
def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
"""Exclude internal fields from dict."""
d.pop("data")
+ d.pop("direct")
d.pop("expires")
d.pop("queue_id")
+ d.pop("callback")
return d
def __str__(self):
from __future__ import annotations
import asyncio
+import os
import pathlib
import random
from asyncio import TimerHandle
.parent.resolve()
.joinpath("helpers/resources")
)
+
ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
+if not os.path.isfile(ALERT_ANNOUNCE_FILE):
+ ALERT_ANNOUNCE_FILE = None
+
FALLBACK_DURATION = 172800 # if duration is None (e.g. radio stream) = 48 hours
media_type=MediaType.ANNOUNCEMENT,
loudness=0,
gain_correct=4,
- data=_url,
+ direct=_url,
),
media_type=MediaType.ANNOUNCEMENT,
)
queue_items = []
# prepend alert sound if needed
- if prepend_alert:
+ if prepend_alert and ALERT_ANNOUNCE_FILE:
queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE))
queue_items.append(create_announcement(url))
from aiofiles.os import wrap
from aiofiles.threadpool.binary import AsyncFileIO
-from music_assistant.helpers.audio import get_file_stream
from music_assistant.helpers.compare import compare_strings
from music_assistant.helpers.tags import FALLBACK_ARTIST, parse_tags, split_items
from music_assistant.helpers.util import create_safe_string, parse_title_and_version
size=stat.st_size,
sample_rate=metadata.sample_rate,
bit_depth=metadata.bits_per_sample,
- data=itempath,
+ direct=itempath,
)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- async for chunk in get_file_stream(
- self.mass, streamdetails.data, streamdetails, seek_position
- ):
- yield chunk
-
async def _parse_track(self, track_path: str) -> Track | None:
"""Try to parse a track from a filename by reading its tags."""
from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module
app_var,
)
-from music_assistant.helpers.audio import get_http_stream
from music_assistant.helpers.util import parse_title_and_version, try_parse_int
from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import LoginFailed, MediaNotFoundError
content_type = ContentType.FLAC
else:
raise MediaNotFoundError(f"Unsupported mime type for {item_id}")
+ # report playback started as soon as the streamdetails are requested
+ self.mass.create_task(self._report_playback_started(item_id, streamdata))
return StreamDetails(
item_id=str(item_id),
provider=self.type,
bit_depth=streamdata["bit_depth"],
data=streamdata, # we need these details for reporting playback
expires=time.time() + 1800, # not sure about the real allowed value
+ direct=streamdata["url"],
+ callback=self._report_playback_stopped,
)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- self.mass.create_task(self._report_playback_started(streamdetails))
- bytes_sent = 0
- try:
- url = streamdetails.data["url"]
- async for chunk in get_http_stream(
- self.mass, url, streamdetails, seek_position
- ):
- yield chunk
- bytes_sent += len(chunk)
- finally:
- if bytes_sent:
- self.mass.create_task(
- self._report_playback_stopped(streamdetails, bytes_sent)
- )
-
- async def _report_playback_started(self, streamdetails: StreamDetails) -> None:
+ async def _report_playback_started(self, item_id: str, streamdata: dict) -> None:
"""Report playback start to qobuz."""
# TODO: need to figure out if the streamed track is purchased by user
# https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
device_id = self._user_auth_info["user"]["device"]["id"]
credential_id = self._user_auth_info["user"]["credential"]["id"]
user_id = self._user_auth_info["user"]["id"]
- format_id = streamdetails.data["format_id"]
+ format_id = streamdata["format_id"]
timestamp = int(time.time())
events = [
{
"sample": False,
"intent": "stream",
"device_id": device_id,
- "track_id": str(streamdetails.item_id),
+ "track_id": str(item_id),
"purchase": False,
"date": timestamp,
"credential_id": credential_id,
]
await self._post_data("track/reportStreamingStart", data=events)
- async def _report_playback_stopped(
- self, streamdetails: StreamDetails, bytes_sent: int
- ) -> None:
+ async def _report_playback_stopped(self, streamdetails: StreamDetails) -> None:
"""Report playback stop to qobuz."""
user_id = self._user_auth_info["user"]["id"]
await self._get_data(
media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
sample_rate=media_info.sample_rate,
bit_depth=media_info.bits_per_sample,
- data=url,
+ direct=None if is_radio else url,
)
async def get_audio_stream(
import pytube
import ytmusicapi
-from music_assistant.helpers.audio import get_http_stream
from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import (
InvalidDataError,
return StreamDetails(
provider=self.type,
item_id=item_id,
- data=url,
content_type=ContentType.try_parse(stream_format["mimeType"]),
+ direct=url,
)
- async def get_audio_stream(
- self, streamdetails: StreamDetails, seek_position: int = 0
- ) -> AsyncGenerator[bytes, None]:
- """Return the audio stream for the provider item."""
- async for chunk in get_http_stream(
- self.mass, streamdetails.data, streamdetails, seek_position
- ):
- yield chunk
-
async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
url = f"{YTM_BASE_URL}{endpoint}"
data.update(self._context)