queue.queue_id, CONF_CROSSFADE_DURATION, 8
)
crossfade_size = int(pcm_sample_size * crossfade_duration)
- buffer_size = int(pcm_sample_size * 2) # 2 seconds
+ buffer_size = int(pcm_sample_size * 5) # 5 seconds
if use_crossfade:
buffer_size += crossfade_size
bytes_written = 0
buffer = b""
#### OTHER: enough data in buffer, feed to output
- else:
+ while len(buffer) > buffer_size:
yield buffer[:pcm_sample_size]
bytes_written += pcm_sample_size
buffer = buffer[pcm_sample_size:]
filter_params=filter_params,
extra_args=extra_args,
input_path=input_path,
- loglevel="info", # needed for loudness measurement
+ # loglevel info is needed for loudness measurement
+ loglevel="info",
)
async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]):
# we did not receive any data, somethinh wet wrong
# raise here to prevent an endless loop elsewhere
if state_data["bytes_sent"] == 0:
- raise AudioError("stream error on %s", streamdetails.uri)
+ raise AudioError(f"stream error on {streamdetails.uri}")
# all chunks received, strip silence of last part if needed and yield remaining bytes
if strip_silence_end and prev_chunk:
stdin=stdin if self._enable_stdin else None,
stdout=stdout if self._enable_stdout else None,
stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
- # setting the buffer limit is important to prevent exceeding the limit
- # when reading lines from stderr (e.g. with long running ffmpeg process)
- # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
- limit=1024 * 1024,
+ # setting the buffer limit somewhat high because we're working with large (PCM)
+ # audio chunks sent between (ffmpeg) processes. We'd rather consume a bit
+ # more memory than cpu cycles.
+ limit=1024000,
)
LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid)
yield line
except ValueError as err:
# we're waiting for a line (separator found), but the line was too big
+ # this may happen with ffmpeg during a long (radio) stream where progress
+ # gets outputted to the stderr but no newline
+ # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
# NOTE: this consumes the line that was too big
if "chunk exceed the limit" in str(err):
continue
+ # raise for all other (value) errors
raise
async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None:
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import get_ip_pton, select_free_port
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
CONF_VOLUME_START = "volume_start"
CONF_PASSWORD = "password"
+REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 10 # 10 seconds
+
PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_CROSSFADE,
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
self._stop_requested = False
- self.buffer = asyncio.Queue(5)
@property
def running(self) -> bool:
if platform.system() == "Darwin":
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
- # connect cliraop stdin with ffmpeg stdout using os pipes
- read, write = os.pipe()
-
# launch ffmpeg, feeding (player specific) audio chunks on stdout
# one could argue that the intermediate ffmpeg towards cliraop is not needed
# when there are no player specific filters or extras but in this case
input_format=self.input_format,
output_format=AIRPLAY_PCM_FORMAT,
filter_params=get_player_filter_params(self.mass, player_id),
+ loglevel="fatal",
)
-
- async def get_chunks() -> AsyncGenerator[bytes, None]:
- while True:
- chunk = await self.buffer.get()
- if chunk == b"EOF":
- break
- yield chunk
-
- # launch ffmpeg process with player specific settings
- # the stream_job_runner will start pushing pcm chunks to the stdin
- # the ffmpeg process will send the output directly to the stdin of cliraop
self._ffmpeg_proc = AsyncProcess(
ffmpeg_args,
enable_stdin=True,
enable_stdout=True,
enable_stderr=False,
- custom_stdin=get_chunks(),
- custom_stdout=write,
name="cliraop_ffmpeg",
)
await self._ffmpeg_proc.start()
enable_stdin=True,
enable_stdout=False,
enable_stderr=True,
- custom_stdin=read,
+ custom_stdin=self._audio_feeder(),
name="cliraop",
)
await self._cliraop_proc.start()
if self._cliraop_proc.closed and self._ffmpeg_proc.closed:
return
self._stop_requested = True
- empty_queue(self.buffer)
async def _stop() -> None:
# ffmpeg MUST be stopped before cliraop due to the chained pipes
if wait:
await task
+ async def write_chunk(self, chunk: bytes) -> None:
+ """Write a (pcm) audio chunk to ffmpeg."""
+ await self._ffmpeg_proc.write(chunk)
+
+ async def write_eof(self) -> None:
+ """Write EOF to the ffmpeg stdin."""
+ await self._ffmpeg_proc.write_eof()
+ await self._ffmpeg_proc.wait()
+ await self.stop()
+
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
if not self._cliraop_proc or self._cliraop_proc.closed:
if "lost packet out of backlog" in line:
lost_packets += 1
if lost_packets == 100:
- logger.warning("High packet loss detected, stopping playback...")
+ logger.error("High packet loss detected, stopping playback...")
await self.stop(False)
+ elif lost_packets % 10 == 0:
+ logger.warning("Packet loss detected!")
logger.log(VERBOSE_LOG_LEVEL, line)
progress = int(queue.corrected_elapsed_time)
await self.send_cli_command(f"PROGRESS={progress}\n")
+ async def _audio_feeder(self) -> AsyncGenerator[bytes, None]:
+ """Read chunks from ffmpeg and feed (buffered) to cliraop."""
+ buffer = b""
+ async for chunk in self._ffmpeg_proc.iter_any():
+ if self._stop_requested:
+ break
+ buffer += chunk
+ chunksize = len(chunk)
+ del chunk
+ while len(buffer) > REQUIRED_BUFFER:
+ yield buffer[:chunksize]
+ buffer = buffer[chunksize:]
+ # end of stream
+ if not self._stop_requested:
+ yield buffer
+ await self._cliraop_proc.write_eof()
+ del buffer
+
@dataclass
class AirPlayPlayer:
"""Handle streaming of audio to one or more airplay players."""
# Python is not suitable for realtime audio streaming so we do the actual streaming
# of (RAOP) audio using a small executable written in C based on libraop to do the actual
- # timestamped playback, whicj reads pcm audio from stdin
+ # timestamped playback, which reads pcm audio from stdin
# and we can send some interactive commands using a named pipe.
# get current ntp before we start
if airplay_player.active_stream.start_ntp != start_ntp:
# checksum mismatch
continue
- tg.create_task(airplay_player.active_stream.buffer.put(chunk))
+ tg.create_task(airplay_player.active_stream.write_chunk(chunk))
active_clients += 1
+
if active_clients == 0:
# no more clients
return
or airplay_player.active_stream.start_ntp != start_ntp
):
continue
- tg.create_task(airplay_player.active_stream.buffer.put(b"EOF"))
+ tg.create_task(airplay_player.active_stream.write_eof())
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.