"icy-name": "Streaming from Music Assistant",
"icy-pub": "0",
"Cache-Control": "no-cache",
- "icy-metaint": str(queue_stream.chunk_size),
+ "icy-metaint": str(queue_stream.output_chunk_size),
"contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000",
}
pcm_sample_rate = 41000
pcm_bit_depth = 16
pcm_channels = 2
- pcm_resample = True
+ allow_resample = True
elif queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
pcm_sample_rate = min(96000, queue.settings.max_sample_rate)
pcm_bit_depth = 24
pcm_channels = 2
- pcm_resample = True
+ allow_resample = True
elif streamdetails.sample_rate > queue.settings.max_sample_rate:
pcm_sample_rate = queue.settings.max_sample_rate
pcm_bit_depth = streamdetails.bit_depth
pcm_channels = streamdetails.channels
- pcm_resample = True
+ allow_resample = True
else:
pcm_sample_rate = streamdetails.sample_rate
pcm_bit_depth = streamdetails.bit_depth
pcm_channels = streamdetails.channels
- pcm_resample = False
+ allow_resample = False
self.queue_streams[stream_id] = stream = QueueStream(
queue=queue,
pcm_sample_rate=pcm_sample_rate,
pcm_bit_depth=pcm_bit_depth,
pcm_channels=pcm_channels,
- pcm_resample=pcm_resample,
+ allow_resample=allow_resample,
is_alert=is_alert,
autostart=True,
)
pcm_bit_depth: int,
pcm_channels: int = 2,
pcm_floating_point: bool = False,
- pcm_resample: bool = False,
+ allow_resample: bool = False,
is_alert: bool = False,
autostart: bool = False,
):
self.pcm_bit_depth = pcm_bit_depth
self.pcm_channels = pcm_channels
self.pcm_floating_point = pcm_floating_point
- self.pcm_resample = pcm_resample
+ self.allow_resample = allow_resample
self.is_alert = is_alert
self.url = queue.mass.streams.get_stream_url(stream_id, output_format)
self.all_clients_connected = asyncio.Event()
self.index_in_buffer = start_index
self.signal_next: bool = False
- self.chunk_size = get_chunksize(
- output_format,
- self.pcm_sample_rate,
- self.pcm_bit_depth,
- self.pcm_channels,
- )
self._runner_task: Optional[asyncio.Task] = None
self._prev_chunk: bytes = b""
+ self.output_chunk_size = get_chunksize(
+ output_format,
+ pcm_sample_rate,
+ pcm_bit_depth,
+ pcm_channels,
+ )
if autostart:
self.mass.create_task(self.start())
self._runner_task = None
self.connected_clients = {}
- self._prev_chunk = b""
# run garbage collection manually due to the high number of
# processed bytes blocks
self.logger.debug("client connected: %s", client_id)
if len(self.connected_clients) == self.expected_clients:
self.all_clients_connected.set()
-
- # workaround for reconnecting clients (such as kodi)
- # send the previous chunk if we have one
- if self._prev_chunk:
- await callback(self._prev_chunk)
try:
await self.done.wait()
finally:
await self._check_stop()
async def _queue_stream_runner(self) -> None:
- """Distribute audio chunks over connected client queues."""
+ """Distribute audio chunks over connected client(s)."""
# collect ffmpeg args
input_format = ContentType.from_bit_depth(
self.pcm_bit_depth, self.pcm_floating_point
"ffmpeg",
"-hide_banner",
"-loglevel",
- "error",
+ "quiet",
"-ignore_unknown",
# pcm input args
"-f",
]
# get the raw pcm bytes from the queue stream and on the fly encode to wanted format
# send the compressed/encoded stream to the client(s).
- async with AsyncProcess(ffmpeg_args, True, self.chunk_size) as ffmpeg_proc:
+ async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
async def writer():
"""Task that sends the raw pcm audio to the ffmpeg process."""
async for audio_chunk in self._get_queue_stream():
await ffmpeg_proc.write(audio_chunk)
- del audio_chunk
# write eof when last packet is received
ffmpeg_proc.write_eof()
# Read bytes from final output and send chunk to child callback.
chunk_num = 0
- async for chunk in ffmpeg_proc.iterate_chunks():
+ async for chunk in ffmpeg_proc.iter_chunked(self.output_chunk_size):
chunk_num += 1
if len(self.connected_clients) == 0:
):
self.connected_clients.pop(client_id, None)
- # back off a bit after first chunk to handle reconnecting clients (e.g. kodi)
- if chunk_num == 1:
- await asyncio.sleep(0.5)
-
# complete queue streamed
if self.signal_next:
# the queue stream was aborted (e.g. because of sample rate mismatch)
continue
# check the PCM samplerate/bitrate
- if not self.pcm_resample and streamdetails.bit_depth > self.pcm_bit_depth:
+ if not self.allow_resample and streamdetails.bit_depth > self.pcm_bit_depth:
self.signal_next = True
self.logger.debug(
"Abort queue stream %s due to bit depth mismatch",
)
break
if (
- not self.pcm_resample
+ not self.allow_resample
and streamdetails.sample_rate > self.pcm_sample_rate
and streamdetails.sample_rate <= self.queue.settings.max_sample_rate
):
pcm_fmt=pcm_fmt,
sample_rate=self.pcm_sample_rate,
channels=self.pcm_channels,
- chunk_size=sample_size,
seek_position=seek_position,
):
chunk_count += 1
if queue_track.duration is None or queue_track.duration < 30:
bytes_written += len(chunk)
yield chunk
- del chunk
continue
# first part of track and we need to (cross)fade: fill buffer
if bytes_written < buf_size and (last_fadeout_part or fade_in):
bytes_written += len(chunk)
buffer += chunk
- del chunk
continue
# last part of track: fill buffer
if bytes_written >= (total_size - buf_size):
bytes_written += len(chunk)
buffer += chunk
- del chunk
continue
# buffer full for fade-in / crossfade
# send crossfade_part
yield crossfade_part
bytes_written += len(crossfade_part)
- del crossfade_part
- del fadein_part
# also write the leftover bytes from the strip action
if remaining_bytes:
yield remaining_bytes
bytes_written += len(remaining_bytes)
- del remaining_bytes
else:
# fade-in
fadein_part = await fadein_pcm_part(
)
yield fadein_part
bytes_written += len(fadein_part)
- del fadein_part
# clear vars
last_fadeout_part = b""
- del first_part
- del chunk
buffer = b""
continue
# all other: middle of track or no fade actions, just yield the audio
bytes_written += len(chunk)
yield chunk
- del chunk
continue
#### HANDLE END OF TRACK
- # strip silence from end of audio
- last_part = await strip_silence(
- buffer, pcm_fmt, self.pcm_sample_rate, reverse=True
- )
+ if buffer:
+ # strip silence from end of audio
+ last_part = await strip_silence(
+ buffer, pcm_fmt, self.pcm_sample_rate, reverse=True
+ )
- # handle crossfading support
- # store fade section to be picked up for next track
-
- if use_crossfade:
- # crossfade is enabled, save fadeout part to pickup for next track
- last_part = last_part[-crossfade_size:]
- remaining_bytes = last_part[:-crossfade_size]
- # yield remaining bytes
- bytes_written += len(remaining_bytes)
- yield remaining_bytes
- last_fadeout_part = last_part
- del remaining_bytes
- else:
- # no crossfade enabled, just yield the stripped audio data
- bytes_written += len(last_part)
- yield last_part
- del last_part
+ # handle crossfading support
+ # store fade section to be picked up for next track
+
+ if use_crossfade:
+ # crossfade is enabled, save fadeout part to pickup for next track
+ last_part = last_part[-crossfade_size:]
+ remaining_bytes = last_part[:-crossfade_size]
+ # yield remaining bytes
+ bytes_written += len(remaining_bytes)
+ yield remaining_bytes
+ last_fadeout_part = last_part
+ else:
+ # no crossfade enabled, just yield the stripped audio data
+ bytes_written += len(last_part)
+ yield last_part
# end of the track reached
queue_track.streamdetails.seconds_streamed = bytes_written / sample_size
self.queue.player.name,
)
# end of queue reached, pass last fadeout bits to final output
- yield last_fadeout_part
- del last_fadeout_part
+ if last_fadeout_part:
+ yield last_fadeout_part
# END OF QUEUE STREAM
self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name)
import struct
from io import BytesIO
from time import time
-from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple
+from typing import TYPE_CHECKING, AsyncGenerator, List, Tuple
import aiofiles
from aiohttp import ClientError, ClientTimeout
"ffmpeg",
"-hide_banner",
"-loglevel",
- "error",
+ "quiet",
# fadeout part (as file)
"-acodec",
fmt.name.lower(),
"ffmpeg",
"-hide_banner",
"-loglevel",
- "error",
+ "quiet",
# fade_in part (stdin)
"-acodec",
fmt.name.lower(),
) -> bytes:
"""Strip silence from (a chunk of) pcm audio."""
# input args
- args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+ args = ["ffmpeg", "-hide_banner", "-loglevel", "quiet"]
args += [
"-acodec",
fmt.name.lower(),
pcm_fmt: ContentType,
sample_rate: int,
channels: int = 2,
- chunk_size: Optional[int] = None,
seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
"""Get the PCM audio stream for the given streamdetails."""
args = await _get_ffmpeg_args(
streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels
)
- async with AsyncProcess(
- args, enable_stdin=True, chunk_size=chunk_size
- ) as ffmpeg_proc:
+ async with AsyncProcess(args, enable_stdin=True) as ffmpeg_proc:
LOGGER.debug(
"start media stream for: %s, using args: %s", streamdetails.uri, str(args)
ffmpeg_proc.attach_task(writer())
# yield chunks from stdout
+ sample_size = get_chunksize(pcm_fmt, sample_rate, 24, channels, 10)
try:
- async for chunk in ffmpeg_proc.iterate_chunks():
+ async for chunk in ffmpeg_proc.iter_any(sample_size):
yield chunk
except (asyncio.CancelledError, GeneratorExit) as err:
if buffer_all:
skip_bytes = streamdetails.size / streamdetails.duration * seek_position
yield buffer[:skip_bytes]
- del buffer
async def get_file_stream(
"ffmpeg",
"-hide_banner",
"-loglevel",
- "error",
+ "quiet",
"-f",
streamdetails.content_type.value,
"-i",
ffmpeg_proc.attach_task(writer())
# yield chunks from stdout
- async for chunk in ffmpeg_proc.iterate_chunks():
+ async for chunk in ffmpeg_proc.iter_any():
yield chunk
"ffmpeg",
"-hide_banner",
"-loglevel",
- "error",
+ "quiet",
"-f",
"lavfi",
"-i",
output_fmt.value,
"-",
]
- chunk_size = get_chunksize(output_fmt)
- async with AsyncProcess(args, chunk_size=chunk_size) as ffmpeg_proc:
- async for chunk in ffmpeg_proc.iterate_chunks():
+ async with AsyncProcess(args) as ffmpeg_proc:
+ async for chunk in ffmpeg_proc.iter_any():
yield chunk
pcm_size = int(sample_rate * (bit_depth / 8) * channels * seconds)
if content_type.is_pcm() or content_type == ContentType.WAV:
return pcm_size
- if content_type == ContentType.FLAC:
- return int(pcm_size * 0.61)
- if content_type == ContentType.WAVPACK:
- return int(pcm_size * 0.60)
- if content_type in (
- ContentType.AAC,
- ContentType.M4A,
- ):
- return int(256000 * seconds)
- if content_type in (
- ContentType.MP3,
- ContentType.OGG,
- ):
- return int(320000 * seconds)
- return 256000
+ if content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
+ return pcm_size
+ if content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
+ return int(pcm_size * 0.6)
+ if content_type in (ContentType.MP3, ContentType.OGG, ContentType.M4A):
+ return int(640000 * seconds)
+ return 32000 * seconds
async def _get_ffmpeg_args(
"ffmpeg",
"-hide_banner",
"-loglevel",
- "error",
+ "quiet",
"-ignore_unknown",
]
if streamdetails.content_type != ContentType.UNKNOWN:
DEFAULT_CHUNKSIZE = 128000
DEFAULT_TIMEOUT = 120
+# pylint: disable=invalid-name
+
class AsyncProcess:
"""Implementation of a (truly) non blocking subprocess."""
self,
args: Union[List, str],
enable_stdin: bool = False,
- chunk_size: int = DEFAULT_CHUNKSIZE,
enable_stdout: bool = True,
enable_stderr: bool = False,
):
self._proc = None
self._args = args
self._enable_stdin = enable_stdin
- self.chunk_size = chunk_size or DEFAULT_CHUNKSIZE
self._enable_stdout = enable_stdout
self._enable_stderr = enable_stderr
self._attached_task: asyncio.Task = None
stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
- limit=64000000,
+ limit=32 * 1024 * 1024,
close_fds=True,
)
else:
stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
- limit=64000000,
+ limit=32 * 1024 * 102,
close_fds=True,
)
return self
# just in case?
self._proc.kill()
- async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
- """Yield chunks from the process stdout. Generator."""
+ async def iter_chunked(
+ self, n: int = DEFAULT_CHUNKSIZE
+ ) -> AsyncGenerator[bytes, None]:
+ """Yield chunks of n size from the process stdout."""
while True:
- chunk = await self._read_chunk()
+ chunk = await self.readexactly(n)
yield chunk
- if len(chunk) < self.chunk_size:
- del chunk
+ if len(chunk) < n:
break
- del chunk
- async def _read_chunk(self, timeout: int = DEFAULT_TIMEOUT) -> bytes:
- """Read chunk_size bytes from the process stdout."""
+ async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
+ """Yield chunks as they come in from process stdout."""
+ while True:
+ chunk = await self._proc.stdout.read(n)
+ if chunk == b"":
+ break
+ yield chunk
+
+ async def readexactly(self, n: int, timeout: int = DEFAULT_TIMEOUT) -> bytes:
+ """Read exactly n bytes from the process stdout (or less if eof)."""
if self.closed:
return b""
try:
async with _timeout(timeout):
- return await self._proc.stdout.readexactly(self.chunk_size)
+ return await self._proc.stdout.readexactly(n)
except asyncio.IncompleteReadError as err:
return err.partial
except asyncio.TimeoutError:
return b""
+ async def read(self, n: int, timeout: int = DEFAULT_TIMEOUT) -> bytes:
+ """
+ Read up to n bytes from the stdout stream.
+
+ If n is positive, this function try to read n bytes,
+ and may return less or equal bytes than requested, but at least one byte.
+ If EOF was received before any byte is read, this function returns empty byte object.
+ """
+ if self.closed:
+ return b""
+ try:
+ async with _timeout(timeout):
+ return await self._proc.stdout.read(n)
+ except asyncio.TimeoutError:
+ return b""
+
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
if self.closed: