import asyncio
import gc
import urllib.parse
+from time import time
from types import CoroutineType
from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional
from uuid import uuid4
crossfade_pcm_parts,
fadein_pcm_part,
get_chunksize,
- get_ffmpeg_args_for_pcm_stream,
get_media_stream,
get_preview_stream,
get_stream_details,
self.logger = self.queue.logger.getChild("stream")
self.expected_clients = expected_clients
self.connected_clients: Dict[str, CoroutineType[bytes]] = {}
- self._runner_task: Optional[asyncio.Task] = None
+ self.seconds_streamed = 0
+ self.streaming_started = 0
self.done = asyncio.Event()
self.all_clients_connected = asyncio.Event()
self.index_in_buffer = start_index
self.signal_next: bool = False
+ self._runner_task: Optional[asyncio.Task] = None
if autostart:
self.mass.create_task(self.start())
async def _queue_stream_runner(self) -> None:
"""Distribute audio chunks over connected client queues."""
- ffmpeg_args = await get_ffmpeg_args_for_pcm_stream(
- self.pcm_sample_rate,
- self.pcm_bit_depth,
- self.pcm_channels,
- output_format=self.output_format,
+ # collect ffmpeg args
+ input_format = ContentType.from_bit_depth(
+ self.pcm_bit_depth, self.pcm_floating_point
)
+ ffmpeg_args = [
+ "ffmpeg",
+ "-hide_banner",
+ "-loglevel",
+ "error",
+ "-ignore_unknown",
+ # pcm input args
+ "-f",
+ input_format.value,
+ "-ac",
+ str(self.pcm_channels),
+ "-ar",
+ str(self.pcm_sample_rate),
+ "-i",
+ "-",
+ # output args
+ "-f",
+ self.output_format.value,
+ "-compression_level",
+ "0",
+ "-",
+ ]
# get the raw pcm bytes from the queue stream and on the fly encode to wanted format
# send the compressed/encoded stream to the client(s).
chunk_size = get_chunksize(self.output_format)
+ sample_size = int(
+ self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels
+ )
async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc:
async def writer():
"""Task that sends the raw pcm audio to the ffmpeg process."""
async for audio_chunk in self._get_queue_stream():
await ffmpeg_proc.write(audio_chunk)
+ self.seconds_streamed += len(audio_chunk) / sample_size
del audio_chunk
+ # allow clients to only buffer max ~30 seconds ahead
+ seconds_allowed = int(time() - self.streaming_started) + 30
+ diff = self.seconds_streamed - seconds_allowed
+ if diff > 1:
+ self.logger.debug(
+ "Player is buffering %s seconds ahead, slowing it down",
+ diff,
+ )
+ await asyncio.sleep(10)
# write eof when last packet is received
ffmpeg_proc.write_eof()
# wait max 5 seconds for all client(s) to connect
try:
- await asyncio.wait_for(self.all_clients_connected.wait(), 5)
+ await asyncio.wait_for(self.all_clients_connected.wait(), 10)
except asyncio.exceptions.TimeoutError:
self.logger.warning(
- "Abort: client(s) did not connect within 5 seconds."
+ "Abort: client(s) did not connect within 10 seconds."
)
self.done.set()
return
self.logger.debug("%s clients connected", len(self.connected_clients))
+ self.streaming_started = time()
# Read bytes from final output and send chunk to child callback.
async for chunk in ffmpeg_proc.iterate_chunks():
prev_chunk = None
bytes_written = 0
# handle incoming audio chunks
- async for chunk in get_media_stream(
+ async for is_last_chunk, chunk in get_media_stream(
self.mass,
streamdetails,
- pcm_fmt,
- pcm_sample_rate=self.pcm_sample_rate,
+ pcm_fmt=pcm_fmt,
+ sample_rate=self.pcm_sample_rate,
+ channels=self.pcm_channels,
chunk_size=buffer_size,
seek_position=seek_position,
):
cur_chunk += 1
- is_last_chunk = len(chunk) < buffer_size
# HANDLE FIRST PART OF TRACK
if len(chunk) == 0 and bytes_written == 0 and is_last_chunk:
# stream error: got empy first chunk ?!
self.logger.warning("Stream error on %s", queue_track.uri)
+ elif cur_chunk == 1 and is_last_chunk:
+ # audio only has one single chunk (alert?)
+ bytes_written += len(chunk)
+ yield chunk
+ del chunk
elif cur_chunk == 1 and last_fadeout_data:
prev_chunk = chunk
del chunk
# with the previous chunk and this chunk
# and strip off silence
last_part = await strip_silence(
- prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, True
+ prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, reverse=True
)
if len(last_part) < buffer_size:
# part is too short after the strip action
fade_length: int,
fmt: ContentType,
sample_rate: int,
+ channels: int = 2,
) -> bytes:
"""Crossfade two chunks of pcm/raw audio using ffmpeg."""
fadeoutfile = create_tempfile()
async with aiofiles.open(fadeoutfile.name, "wb") as outfile:
await outfile.write(fade_out_part)
- # input args
- args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
- args += [
+ args = [
+ # generic args
+ "ffmpeg",
+ "-hide_banner",
+ "-loglevel",
+ "error",
+ # fadeout part (as file)
+ "-acodec",
+ fmt.name.lower(),
"-f",
fmt.value,
"-ac",
- "2",
+ str(channels),
"-ar",
str(sample_rate),
"-i",
fadeoutfile.name,
+ # fade_in part (stdin)
+ "-acodec",
+ fmt.name.lower(),
+ "-f",
+ fmt.value,
+ "-ac",
+ str(channels),
+ "-ar",
+ str(sample_rate),
+ "-i",
+ "-",
+ # filter args
+ "-filter_complex",
+ f"[0][1]acrossfade=d={fade_length}",
+ # output args
+ "-f",
+ fmt.value,
+ "-",
]
- args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
- # filter args
- args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"]
- # output args
- args += ["-f", fmt.value, "-"]
async with AsyncProcess(args, True) as proc:
crossfade_data, _ = await proc.communicate(fade_in_part)
LOGGER.debug(
fade_length: int,
fmt: ContentType,
sample_rate: int,
+ channels: int = 2,
) -> bytes:
"""Fadein chunk of pcm/raw audio using ffmpeg."""
- # input args
- args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
- args += [
+ args = [
+ # generic args
+ "ffmpeg",
+ "-hide_banner",
+ "-loglevel",
+ "error",
+ # fade_in part (stdin)
+ "-acodec",
+ fmt.name.lower(),
"-f",
fmt.value,
"-ac",
- "2",
+ str(channels),
"-ar",
str(sample_rate),
"-i",
"-",
+ # filter args
+ "-af",
+ f"afade=type=in:start_time=0:duration={fade_length}",
+ # output args
+ "-f",
+ fmt.value,
+ "-",
]
- # filter args
- args += ["-af", f"afade=type=in:start_time=0:duration={fade_length}"]
- # output args
- args += ["-f", fmt.value, "-"]
async with AsyncProcess(args, True) as proc:
result_audio, _ = await proc.communicate(pcm_audio)
return result_audio
async def strip_silence(
- audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False
+ audio_data: bytes,
+ fmt: ContentType,
+ sample_rate: int,
+ channels: int = 2,
+ reverse=False,
) -> bytes:
"""Strip silence from (a chunk of) pcm audio."""
# input args
args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
- args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
+ args += [
+ "-acodec",
+ fmt.name.lower(),
+ "-f",
+ fmt.value,
+ "-ac",
+ str(channels),
+ "-ar",
+ str(sample_rate),
+ "-i",
+ "-",
+ ]
# filter args
if reverse:
args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"]
started = time()
proc_args = [
"ffmpeg",
- "-f",
- streamdetails.content_type.value,
"-i",
"-",
+ "-f",
+ streamdetails.content_type.value,
"-af",
"ebur128=framelog=verbose",
"-f",
return file.getvalue()
-async def get_ffmpeg_args(
- streamdetails: StreamDetails,
- output_format: Optional[ContentType] = None,
- pcm_sample_rate: Optional[int] = None,
- pcm_channels: int = 2,
-) -> List[str]:
- """Collect all args to send to the ffmpeg process."""
- input_format = streamdetails.content_type
- if output_format is None:
- output_format = input_format
-
- ffmpeg_present, libsoxr_support = await check_audio_support()
-
- if not ffmpeg_present:
- raise AudioError(
- "FFmpeg binary is missing from system."
- "Please install ffmpeg on your OS to enable playback.",
- )
- # collect input args
- input_args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-ignore_unknown",
- ]
- if streamdetails.content_type != ContentType.UNKNOWN:
- input_args += ["-f", input_format.value]
- input_args += ["-i", "-"]
- # collect output args
- if output_format.is_pcm():
- output_args = [
- "-acodec",
- output_format.name.lower(),
- "-f",
- output_format.value,
- "-ac",
- str(pcm_channels),
- "-ar",
- str(pcm_sample_rate),
- "-",
- ]
- else:
- output_args = ["-f", output_format.value, "-"]
- # collect extra and filter args
- extra_args = []
- filter_params = []
- if streamdetails.gain_correct:
- filter_params.append(f"volume={streamdetails.gain_correct}dB")
- if (
- pcm_sample_rate is not None
- and streamdetails.sample_rate != pcm_sample_rate
- and libsoxr_support
- and streamdetails.media_type == MediaType.TRACK
- ):
- # prefer libsoxr high quality resampler (if present) for sample rate conversions
- filter_params.append("aresample=resampler=soxr")
- if filter_params:
- extra_args += ["-af", ",".join(filter_params)]
-
- if pcm_sample_rate is not None and not output_format.is_pcm():
- extra_args += ["-ar", str(pcm_sample_rate)]
-
- return input_args + extra_args + output_args
-
-
async def get_media_stream(
mass: MusicAssistant,
streamdetails: StreamDetails,
- output_format: Optional[ContentType] = None,
- pcm_sample_rate: Optional[int] = None,
+ pcm_fmt: ContentType,
+ sample_rate: int,
+ channels: int = 2,
chunk_size: Optional[int] = None,
seek_position: int = 0,
-) -> AsyncGenerator[bytes, None]:
- """Get the audio stream for the given streamdetails."""
-
- args = await get_ffmpeg_args(streamdetails, output_format, pcm_sample_rate)
+) -> AsyncGenerator[Tuple[bool, bytes], None]:
+ """Get the PCM audio stream for the given streamdetails."""
+ assert pcm_fmt.is_pcm(), "Output format must be a PCM type"
+ args = await _get_ffmpeg_args(
+ streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels
+ )
async with AsyncProcess(
args, enable_write=True, chunk_size=chunk_size
) as ffmpeg_proc:
ffmpeg_proc.attach_task(writer())
# yield chunks from stdout
+ # we keep 1 chunk behind to detect end of stream properly
try:
+ prev_chunk = b""
async for chunk in ffmpeg_proc.iterate_chunks():
- yield chunk
+ if prev_chunk:
+ yield (False, prev_chunk)
+ prev_chunk = chunk
+ # send last chunk
+ yield (True, prev_chunk)
except (asyncio.CancelledError, GeneratorExit) as err:
LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
raise err
headers = {"Icy-MetaData": "1"}
while True:
# in loop to reconnect on connection failure
- LOGGER.debug("radio stream (re)connecting to: %s", url)
- async with mass.http_session.get(url, headers=headers) as resp:
- headers = resp.headers
- meta_int = int(headers.get("icy-metaint", "0"))
- # stream with ICY Metadata
- if meta_int:
- while True:
- audio_chunk = await resp.content.readexactly(meta_int)
- yield audio_chunk
- meta_byte = await resp.content.readexactly(1)
- meta_length = ord(meta_byte) * 16
- meta_data = await resp.content.readexactly(meta_length)
- if not meta_data:
- continue
- meta_data = meta_data.rstrip(b"\0")
- stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
- if not stream_title:
- continue
- stream_title = stream_title.group(1).decode()
- if stream_title != streamdetails.stream_title:
- streamdetails.stream_title = stream_title
- if queue := mass.players.get_player_queue(
- streamdetails.queue_id
- ):
- queue.signal_update()
- # Regular HTTP stream
- else:
- async for chunk in resp.content.iter_any():
- yield chunk
+ try:
+ LOGGER.debug("radio stream (re)connecting to: %s", url)
+ async with mass.http_session.get(url, headers=headers, timeout=60) as resp:
+ headers = resp.headers
+ meta_int = int(headers.get("icy-metaint", "0"))
+ # stream with ICY Metadata
+ if meta_int:
+ while True:
+ audio_chunk = await resp.content.readexactly(meta_int)
+ yield audio_chunk
+ meta_byte = await resp.content.readexactly(1)
+ meta_length = ord(meta_byte) * 16
+ meta_data = await resp.content.readexactly(meta_length)
+ if not meta_data:
+ continue
+ meta_data = meta_data.rstrip(b"\0")
+ stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
+ if not stream_title:
+ continue
+ stream_title = stream_title.group(1).decode()
+ if stream_title != streamdetails.stream_title:
+ streamdetails.stream_title = stream_title
+ if queue := mass.players.get_player_queue(
+ streamdetails.queue_id
+ ):
+ queue.signal_update()
+ # Regular HTTP stream
+ else:
+ async for chunk in resp.content.iter_any():
+ yield chunk
+ except asyncio.exceptions.TimeoutError:
+ pass
async def get_http_stream(
"""Get audio stream from HTTP."""
if seek_position:
assert streamdetails.duration, "Duration required for seek requests"
- chunk_size = get_chunksize(streamdetails.content_type)
# try to get filesize with a head request
if seek_position and not streamdetails.size:
async with mass.http_session.head(url) as resp:
async with mass.http_session.get(url, headers=headers) as resp:
is_partial = resp.status == 206
buffer_all = seek_position and not is_partial
- async for chunk in resp.content.iter_chunked(chunk_size):
+ async for chunk in resp.content.iter_any():
bytes_received += len(chunk)
if buffer_all and not skip_bytes:
buffer += chunk
return result
-async def get_ffmpeg_args_for_pcm_stream(
- sample_rate: int,
- bit_depth: int,
- channels: int,
- floating_point: bool = False,
- output_format: ContentType = ContentType.FLAC,
-) -> List[str]:
- """Collect args for ffmpeg when converting from raw pcm to another contenttype."""
- input_format = ContentType.from_bit_depth(bit_depth, floating_point)
- # collect input args
- input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-ignore_unknown"]
- input_args += [
- "-f",
- input_format.value,
- "-ac",
- str(channels),
- "-ar",
- str(sample_rate),
- "-i",
- "-",
- ]
- # collect output args
- output_args = ["-f", output_format.value, "-"]
- return input_args + output_args
-
-
async def get_preview_stream(
mass: MusicAssistant,
provider_id: str,
):
return 64000
return 256000
+
+
+async def _get_ffmpeg_args(
+ streamdetails: StreamDetails,
+ pcm_output_format: ContentType,
+ pcm_sample_rate: int,
+ pcm_channels: int = 2,
+) -> List[str]:
+ """Collect all args to send to the ffmpeg process."""
+ input_format = streamdetails.content_type
+ assert pcm_output_format.is_pcm(), "Output format needs to be PCM"
+
+ ffmpeg_present, libsoxr_support = await check_audio_support()
+
+ if not ffmpeg_present:
+ raise AudioError(
+ "FFmpeg binary is missing from system."
+ "Please install ffmpeg on your OS to enable playback.",
+ )
+ # collect input args
+ input_args = [
+ "ffmpeg",
+ "-hide_banner",
+ "-loglevel",
+ "error",
+ "-ignore_unknown",
+ ]
+ if streamdetails.content_type != ContentType.UNKNOWN:
+ input_args += ["-f", input_format.value]
+ input_args += ["-i", "-"]
+ # collect output args
+ output_args = [
+ "-acodec",
+ pcm_output_format.name.lower(),
+ "-f",
+ pcm_output_format.value,
+ "-ac",
+ str(pcm_channels),
+ "-ar",
+ str(pcm_sample_rate),
+ "-",
+ ]
+ # collect extra and filter args
+ extra_args = []
+ filter_params = []
+ if streamdetails.gain_correct:
+ filter_params.append(f"volume={streamdetails.gain_correct}dB")
+ if (
+ streamdetails.sample_rate != pcm_sample_rate
+ and libsoxr_support
+ and streamdetails.media_type == MediaType.TRACK
+ ):
+ # prefer libsoxr high quality resampler (if present) for sample rate conversions
+ filter_params.append("aresample=resampler=soxr")
+ if filter_params:
+ extra_args += ["-af", ",".join(filter_params)]
+
+ return input_args + extra_args + output_args
await self.append(queue_items)
async def play_alert(
- self, uri: str, announce: bool = False, volume_adjust: int = 10
+ self, uri: str, announce: bool = False, gain_correct: int = 6
) -> str:
"""
Play given uri as Alert on the queue.
uri: Uri that should be played as announcement, can be Music Assistant URI or plain url.
announce: Prepend the (TTS) alert with a small announce sound.
- volume_adjust: Adjust the volume of the player by this percentage (relative).
+ gain_correct: Adjust the gain of the alert sound (in dB).
"""
if self._snapshot:
self.logger.debug("Ignore play_alert: already in progress")
if uri.startswith("http") or os.path.isfile(uri):
# a plain url was provided
queue_item = QueueItem.from_url(uri, "alert")
- queue_item.streamdetails.gain_correct = 6
+ queue_item.streamdetails.gain_correct = gain_correct
queue_items.append(queue_item)
else:
raise MediaNotFoundError(f"Invalid uri: {uri}") from err
- # append silence track, we use this to reliably detect when the alert is ready
- silence_url = self.mass.streams.get_silence_url(600)
- queue_item = QueueItem.from_url(silence_url, "alert")
- queue_items.append(queue_item)
-
# load queue with alert sound(s)
await self.load(queue_items)
# wait for the alert to finish playing
+ await self.stream.done.wait()
alert_done = asyncio.Event()
def handle_event(evt: MassEvent):
- if (
- self.current_item
- and self.current_item.uri == silence_url
- and self.elapsed_time
- ):
+ if self.player.state != PlayerState.PLAYING:
alert_done.set()
unsub = self.mass.subscribe(
- handle_event, EventType.QUEUE_TIME_UPDATED, self.queue_id
+ handle_event, EventType.QUEUE_UPDATED, self.queue_id
)
try:
- await asyncio.wait_for(alert_done.wait(), 120)
+ await asyncio.wait_for(alert_done.wait(), 30)
finally:
-
unsub()
# restore queue
await self.snapshot_restore()
async def queue_stream_start(
self, start_index: int, seek_position: int, fade_in: bool, passive: bool = False
- ) -> None:
+ ) -> QueueStream:
"""Start the queue stream runner."""
output_format = self._settings.stream_type
if self.player.use_multi_stream:
# execute the play command on the player(s)
if not passive:
await self.player.play_url(stream.url)
+ return stream
def get_next_index(self, cur_index: Optional[int]) -> int:
"""Return the next index for the queue, accounting for repeat settings."""