import time
import urllib.parse
from collections.abc import AsyncGenerator
-from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
import shortuuid
from aiohttp import web
-from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
+from music_assistant.common.helpers.util import (
+ empty_queue,
+ get_ip,
+ select_free_port,
+ try_parse_bool,
+)
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueOption,
The whole idea here is that the (pcm) audio source can be sent to multiple
players at once. For example for (slimproto/airplay) syncgroups and universal group.
- All client players receive the exact same audio chunks from the source audio,
- then encoded and/or resampled to the player's preferences.
+ All client players receive the exact same PCM audio chunks from the source audio,
+ which then can be optionally encoded and/or resampled to the player's preferences.
In case a stream is restarted (e.g. when seeking),
a new QueueStreamJob will be created.
"""
self.mass = mass
self.pcm_audio_source = pcm_audio_source
self.pcm_format = pcm_format
- self.auto_start = auto_start
self.expected_players: set[str] = set()
self.job_id = shortuuid.uuid()
self.bytes_streamed: int = 0
- self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}")
- self._subscribed_players: dict[str, AsyncProcess] = {}
+ self.logger = self.mass.streams.logger
+ self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
self._finished = False
- self._running = False
- self.allow_start = asyncio.Event()
+ self.allow_start = auto_start
+ self._all_clients_connected = asyncio.Event()
self._audio_task = asyncio.create_task(self._stream_job_runner())
@property
def finished(self) -> bool:
"""Return if this StreamJob is finished."""
- return self._finished or self._audio_task and self._audio_task.done()
+ return self._finished or (self._audio_task and self._audio_task.done())
@property
def pending(self) -> bool:
"""Return if this Job is pending start."""
- return not self.finished and not self.running
+ return not self.finished and not self._all_clients_connected.is_set()
@property
def running(self) -> bool:
"""Return if this Job is running."""
- return self._running and self._audio_task and not self._audio_task.done()
+ return (
+ self._all_clients_connected.is_set()
+ and self._audio_task
+ and not self._audio_task.done()
+ )
def start(self) -> None:
"""Start running (send audio chunks to connected players)."""
if self.finished:
raise RuntimeError("Task is already finished")
- self.allow_start.set()
+ self.allow_start = True
+ if self.expected_players and len(self.subscribed_players) >= len(self.expected_players):
+ self._all_clients_connected.set()
def stop(self) -> None:
"""Stop running this job."""
if self._audio_task and not self._audio_task.done():
self._audio_task.cancel()
+ if not self._finished:
+ # we need to make sure that we close the async generator
+ task = asyncio.create_task(self.pcm_audio_source.__anext__())
+ task.cancel()
self._finished = True
+ for sub_queue in self.subscribed_players.values():
+ empty_queue(sub_queue)
def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str:
"""Resolve the childplayer specific stream URL to this streamjob."""
self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None
) -> AsyncGenerator[bytes, None]:
"""Subscribe consumer and iterate player-specific audio."""
- ffmpeg_args = get_ffmpeg_args(
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.subscribe(player_id),
input_format=self.pcm_format,
output_format=output_format,
filter_params=get_player_filter_params(self.mass, player_id),
- extra_args=[],
- input_path="-",
- output_path="-",
- loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
- )
- # launch ffmpeg process with player specific settings
- # the stream_job_runner will start pushing pcm chunks to the stdin
- # we then read the players-specific (encoded) output chunks
- # from ffmpeg stdout and yield them
- async with AsyncProcess(
- ffmpeg_args, enable_stdin=True, enable_stdout=True, enable_stderr=False
- ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc):
- # read final chunks from ffmpeg's stdout
- iterator = (
- ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
- )
- async for chunk in iterator:
- try:
- yield chunk
- except (BrokenPipeError, ConnectionResetError):
- # race condition?
- break
+ chunk_size=chunk_size,
+ ):
+ yield chunk
async def stream_to_custom_output_path(
- self, player_id: str, output_format: AudioFormat, output_path: str
+ self, player_id: str, output_format: AudioFormat, output_path: str | int
) -> None:
"""Subscribe consumer and instruct ffmpeg to send the audio to the given output path."""
+ custom_file_pointer = isinstance(output_path, int)
ffmpeg_args = get_ffmpeg_args(
input_format=self.pcm_format,
output_format=output_format,
filter_params=get_player_filter_params(self.mass, player_id),
extra_args=[],
input_path="-",
- output_path=output_path,
+ output_path="-" if custom_file_pointer else output_path,
loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
)
# launch ffmpeg process with player specific settings
async with AsyncProcess(
ffmpeg_args,
enable_stdin=True,
- enable_stdout=False,
+ enable_stdout=custom_file_pointer,
enable_stderr=False,
- ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc):
+ custom_stdin=self.subscribe(player_id),
+ custom_stdout=output_path if custom_file_pointer else None,
+ name="ffmpeg_custom_output_path",
+ ) as ffmpeg_proc:
# we simply wait for the process to exit
await ffmpeg_proc.wait()
- @asynccontextmanager
- async def subscribe(
- self, player_id: str, ffmpeg_proc: AsyncProcess
- ) -> AsyncGenerator[QueueStreamJob]:
- """Subscribe consumer's (output) ffmpeg process."""
- if self.running:
- # client subscribes while we're already started
- # that will probably cause side effects but let it go
- self.logger.warning(
- "Player %s is joining while the stream is already started!", player_id
- )
+ async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
+ """Subscribe consumer and iterate incoming chunks on the queue."""
try:
- self._subscribed_players[player_id] = ffmpeg_proc
- self.logger.debug("Subscribed player %s", player_id)
- if self.auto_start and len(self._subscribed_players) == len(self.expected_players):
- self.allow_start.set()
- yield self
+ self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
+
+ if self._all_clients_connected.is_set():
+ # client subscribes while we're already started
+ self.logger.warning(
+ "Client %s is joining while the stream is already started", player_id
+ )
+
+ self.logger.debug("Subscribed client %s", player_id)
+
+ if (
+ self.expected_players
+ and self.allow_start
+ and len(self.subscribed_players) == len(self.expected_players)
+ ):
+ # we reached the number of expected subscribers, set event
+ # so that chunks can be pushed
+ self._all_clients_connected.set()
+
+ # keep reading audio chunks from the queue until we receive an empty one
+ while True:
+ chunk = await sub_queue.get()
+ if chunk == b"":
+ # EOF chunk received
+ break
+ yield chunk
finally:
- self._subscribed_players.pop(player_id, None)
+ self.subscribed_players.pop(player_id, None)
self.logger.debug("Unsubscribed client %s", player_id)
# check if this was the last subscriber and we should cancel
- await asyncio.sleep(5)
- if len(self._subscribed_players) == 0 and not self.finished:
+ await asyncio.sleep(2)
+ if len(self.subscribed_players) == 0 and self._audio_task and not self.finished:
self.logger.debug("Cleaning up, all clients disappeared...")
self.stop()
+ async def _put_chunk(self, chunk: bytes) -> None:
+ """Put chunk of data to all subscribers."""
+ async with asyncio.TaskGroup() as tg:
+ for sub_queue in list(self.subscribed_players.values()):
+ # put this chunk on the player's subqueue
+ tg.create_task(sub_queue.put(chunk))
+ self.bytes_streamed += len(chunk)
+
async def _stream_job_runner(self) -> None:
"""Feed audio chunks to StreamJob subscribers."""
- await self.allow_start.wait()
- retries = 50
- while retries:
- retries -= 1
- await asyncio.sleep(0.1)
- if len(self._subscribed_players) != len(self.expected_players):
- continue
- await asyncio.sleep(0.2)
- if len(self._subscribed_players) != len(self.expected_players):
- continue
- break
-
- self.logger.debug(
- "Starting stream job %s with %s out of %s connected clients",
- self.job_id,
- len(self._subscribed_players),
- len(self.expected_players),
- )
+ chunk_num = 0
async for chunk in self.pcm_audio_source:
- num_subscribers = len(self._subscribed_players)
- if num_subscribers == 0:
- break
- async with asyncio.TaskGroup() as tg:
- for ffmpeg_proc in list(self._subscribed_players.values()):
- tg.create_task(ffmpeg_proc.write(chunk))
+ chunk_num += 1
+ if chunk_num == 1:
+ # wait until all expected clients are connected
+ try:
+ async with asyncio.timeout(10):
+ await self._all_clients_connected.wait()
+ except TimeoutError:
+ if len(self.subscribed_players) == 0:
+ self.logger.exception(
+ "Abort multi client stream job %s: "
+ "client(s) did not connect within timeout",
+ self.job_id,
+ )
+ break
+ # not all clients connected but timeout expired, set flag and move on
+ # with all clients that did connect
+ self._all_clients_connected.set()
+ else:
+ self.logger.debug(
+ "Starting queue stream job %s with %s (out of %s) connected clients",
+ self.job_id,
+ len(self.subscribed_players),
+ len(self.expected_players),
+ )
- # write EOF at end of queue stream
- async with asyncio.TaskGroup() as tg:
- for ffmpeg_proc in list(self._subscribed_players.values()):
- tg.create_task(ffmpeg_proc.write_eof())
- self.logger.debug("Finished stream job %s", self.job_id)
- self._finished = True
+ await self._put_chunk(chunk)
+
+ # mark EOF with empty chunk
+ await self._put_chunk(b"")
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
default_sample_rate=stream_job.pcm_format.sample_rate,
default_bit_depth=stream_job.pcm_format.bit_depth,
)
- # prepare request, add some DLNA/UPNP compatible headers
- enable_icy = request.headers.get("Icy-MetaData", "") == "1"
+ # play it safe: only allow icy metadata for mp3 and aac
+ enable_icy = request.headers.get(
+ "Icy-MetaData", ""
+ ) == "1" and output_format.content_type in (ContentType.MP3, ContentType.AAC)
icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384
+
+ # prepare request, add some DLNA/UPNP compatible headers
headers = {
**DEFAULT_STREAM_HEADERS,
"Content-Type": f"audio/{output_format.output_format_str}",
if request.method != "GET":
return resp
+ # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
+ # to the stream in an attempt to get the audio details such as duration
+ # which is a bit pointless for our duration-less queue stream
+ # and it completely messes with the subscription logic
+ if player_id in stream_job.subscribed_players:
+ self.logger.warning(
+ "Player %s is making multiple requests "
+ "to the same stream, playback may be disturbed!",
+ player_id,
+ )
+ elif "rincon" in player_id.lower():
+ await asyncio.sleep(0.1)
+
# all checks passed, start streaming!
self.logger.debug(
"Start serving Queue flow audio stream for queue %s to player %s",
if content_type == ContentType.PCM:
# resolve generic pcm type
content_type = ContentType.from_bit_depth(output_bit_depth)
-
else:
output_sample_rate = min(default_sample_rate, queue_player.max_sample_rate)
player_max_bit_depth = 24 if queue_player.supports_24bit else 16
Other than stripping silence at end and beginning and optional
volume normalization this is the pure, unaltered audio data as PCM chunks.
"""
- logger = LOGGER.getChild("media_stream")
bytes_sent = 0
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio or streamdetails.seek_position:
)
finished = False
- logger.debug("start media stream for: %s", streamdetails.uri)
- writer_task: asyncio.Task | None = None
ffmpeg_proc = AsyncProcess(
- ffmpeg_args, enable_stdin=streamdetails.direct is None, enable_stderr=True
+ ffmpeg_args,
+ enable_stdin=streamdetails.direct is None,
+ enable_stderr=True,
+ name="ffmpeg_media_stream",
)
await ffmpeg_proc.start()
+ logger = LOGGER.getChild("media_stream")
+ logger.debug("start media stream for: %s", streamdetails.uri)
async def writer() -> None:
"""Task that grabs the source audio and feeds it to ffmpeg."""
- logger.log(VERBOSE_LOG_LEVEL, "writer started for %s", streamdetails.uri)
music_prov = mass.get_provider(streamdetails.provider)
seek_pos = streamdetails.seek_position if streamdetails.can_seek else 0
async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
await ffmpeg_proc.write(audio_chunk)
# write eof when last packet is received
await ffmpeg_proc.write_eof()
- logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri)
if streamdetails.direct is None:
- writer_task = asyncio.create_task(writer())
+ ffmpeg_proc.attached_tasks.append(asyncio.create_task(writer()))
# get pcm chunks from stdout
# we always stay one chunk behind to properly detect end of chunks
streamdetails.uri,
seconds_streamed,
)
- if writer_task and not writer_task.done():
- writer_task.cancel()
+
# use communicate to read stderr and wait for exit
# read log for loudness measurement (or errors)
_, stderr = await ffmpeg_proc.communicate()
Takes care of resampling and/or recoding if needed,
according to player preferences.
"""
- logger = LOGGER.getChild("ffmpeg_stream")
use_stdin = not isinstance(audio_input, str)
ffmpeg_args = get_ffmpeg_args(
input_format=input_format,
extra_args=extra_args or [],
input_path="-" if use_stdin else audio_input,
output_path="-",
+ loglevel="info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
)
-
- writer_task: asyncio.Task | None = None
- ffmpeg_proc = AsyncProcess(
- ffmpeg_args, enable_stdin=use_stdin, enable_stdout=True, enable_stderr=True
- )
- await ffmpeg_proc.start()
-
- # feed stdin with pcm audio chunks from origin
- async def writer() -> None:
- async for chunk in audio_input:
- if ffmpeg_proc.closed:
- return
- await ffmpeg_proc.write(chunk)
- await ffmpeg_proc.write_eof()
-
- try:
- if not isinstance(audio_input, str):
- writer_task = asyncio.create_task(writer())
-
+ async with AsyncProcess(
+ ffmpeg_args,
+ enable_stdin=use_stdin,
+ enable_stdout=True,
+ enable_stderr=False,
+ custom_stdin=audio_input if use_stdin else None,
+ name="player_ffmpeg_stream",
+ ) as ffmpeg_proc:
# read final chunks from stdout
chunk_size = chunk_size or get_chunksize(output_format, 1)
async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
- try:
- yield chunk
- except (BrokenPipeError, ConnectionResetError):
- # race condition
- break
- finally:
- if writer_task and not writer_task.done():
- writer_task.cancel()
- # use communicate to read stderr and wait for exit
- _, stderr = await ffmpeg_proc.communicate()
- if ffmpeg_proc.returncode != 0:
- # ffmpeg has a non zero returncode meaning trouble
- logger.warning("FFMPEG ERROR\n%s", stderr.decode())
- else:
- logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
+ yield chunk
async def check_audio_support() -> tuple[bool, bool, str]:
from typing import TYPE_CHECKING
from music_assistant.common.models.enums import MediaType
-from music_assistant.constants import MASS_LOGO_ONLINE
+from music_assistant.constants import MASS_LOGO_ONLINE, UGP_PREFIX
if TYPE_CHECKING:
from music_assistant.common.models.queue_item import QueueItem
# ruff: noqa: E501
-def create_didl_metadata(
- mass: MusicAssistant, url: str, queue_item: QueueItem | None = None
-) -> str:
+def create_didl_metadata(mass: MusicAssistant, url: str, queue_item: QueueItem) -> str:
"""Create DIDL metadata string from url and (optional) QueueItem."""
ext = url.split(".")[-1].split("?")[0]
- if queue_item is None:
+ if "flow" in url or queue_item.queue_id.startswith(UGP_PREFIX):
+ # flow stream
return (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
f'<item id="flowmode" parentID="0" restricted="1">'
- "<dc:title>Music Assistant</dc:title>"
+ f"<dc:title>Music Assistant</dc:title>"
f"<upnp:albumArtURI>{escape_string(MASS_LOGO_ONLINE)}</upnp:albumArtURI>"
+ f"<dc:queueItemId>{queue_item.queue_id}</dc:queueItemId>"
"<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
f'<res duration="23:59:59.000" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(url)}</res>'
"</item>"
"</DIDL-Lite>"
)
- is_radio = queue_item.media_type != MediaType.TRACK or not queue_item.duration
image_url = (
mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE
)
- if is_radio:
+ if queue_item.media_type != MediaType.TRACK or not queue_item.duration:
# radio or other non-track item
return (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
enable_stdin: bool = False,
enable_stdout: bool = True,
enable_stderr: bool = False,
+ custom_stdin: AsyncGenerator[bytes, None] | int | None = None,
+ custom_stdout: int | None = None,
+ name: str | None = None,
) -> None:
"""Initialize AsyncProcess."""
self.proc: asyncio.subprocess.Process | None = None
self._enable_stdout = enable_stdout
self._enable_stderr = enable_stderr
self._close_called = False
- self._stdin_lock = asyncio.Lock()
- self._stdout_lock = asyncio.Lock()
- self._stderr_lock = asyncio.Lock()
self._returncode: bool | None = None
+ self._name = name or self._args[0].split(os.sep)[-1]
+ self.attached_tasks: list[asyncio.Task] = []
+ self._custom_stdin = custom_stdin
+ if not isinstance(custom_stdin, int | None):
+ self._custom_stdin = None
+ self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin)))
+ self._custom_stdout = custom_stdout
@property
def closed(self) -> bool:
"""Exit context manager."""
await self.close()
self._returncode = self.returncode
- del self.proc
- del self._stdin_lock
- del self._stdout_lock
- del self._returncode
async def start(self) -> None:
"""Perform Async init of process."""
+ stdin = self._custom_stdin if self._custom_stdin is not None else asyncio.subprocess.PIPE
+ stdout = self._custom_stdout if self._custom_stdout is not None else asyncio.subprocess.PIPE
self.proc = await asyncio.create_subprocess_exec(
*self._args,
- stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
- stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
+ stdin=stdin if self._enable_stdin else None,
+ stdout=stdout if self._enable_stdout else None,
stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
- close_fds=True,
+ limit=4000000,
+ pipesize=256000,
)
- proc_name_simple = self._args[0].split(os.sep)[-1]
- LOGGER.debug("Started %s with PID %s", proc_name_simple, self.proc.pid)
+ LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid)
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
async def readexactly(self, n: int) -> bytes:
"""Read exactly n bytes from the process stdout (or less if eof)."""
- if self._close_called or self.proc.stdout.at_eof():
- return b""
try:
- async with self._stdout_lock:
- return await self.proc.stdout.readexactly(n)
+ return await self.proc.stdout.readexactly(n)
except asyncio.IncompleteReadError as err:
return err.partial
and may return less or equal bytes than requested, but at least one byte.
If EOF was received before any byte is read, this function returns empty byte object.
"""
- if self._close_called or self.proc.stdout.at_eof():
- return b""
- if self.proc.stdout.at_eof():
- return b""
- async with self._stdout_lock:
- return await self.proc.stdout.read(n)
+ return await self.proc.stdout.read(n)
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
if self._close_called or self.proc.stdin.is_closing():
- return
- if not self.proc or self.proc.returncode is not None:
- raise RuntimeError("Process not started or already exited")
- async with self._stdin_lock:
- self.proc.stdin.write(data)
- with suppress(BrokenPipeError):
- await self.proc.stdin.drain()
+ raise asyncio.CancelledError("write called while process already done")
+ self.proc.stdin.write(data)
+ with suppress(BrokenPipeError, ConnectionResetError):
+ await self.proc.stdin.drain()
async def write_eof(self) -> None:
"""Write end of file to to process stdin."""
- if not self._enable_stdin:
- raise RuntimeError("STDIN is not enabled")
- if not self.proc or self.proc.returncode is not None:
- raise RuntimeError("Process not started or already exited")
- if self._close_called or self.proc.stdin.is_closing():
- return
try:
- async with self._stdin_lock:
- if self.proc.stdin.can_write_eof():
- self.proc.stdin.write_eof()
+ if self.proc.stdin.can_write_eof():
+ self.proc.stdin.write_eof()
except (
AttributeError,
AssertionError,
async def close(self) -> int:
"""Close/terminate the process and wait for exit."""
self._close_called = True
- if self.proc.returncode is None:
+ # close any/all attached (writer) tasks
+ for task in self.attached_tasks:
+ if not task.done():
+ task.cancel()
+ with suppress(asyncio.CancelledError):
+ await task
+ # call communicate repeatedly until the process has exited
+ while self.proc.returncode is None:
# make sure the process is cleaned up
try:
# we need to use communicate to ensure buffers are flushed
- await asyncio.wait_for(self.proc.communicate(), 5)
+ await asyncio.wait_for(self.proc.communicate(), 10)
except TimeoutError:
LOGGER.debug(
- "Process with PID %s did not stop within 5 seconds. Sending terminate...",
+ "Process %s with PID %s did not stop in time. Sending terminate...",
+ self._name,
self.proc.pid,
)
self.proc.terminate()
- await self.proc.communicate()
+ await asyncio.sleep(0.5)
LOGGER.debug(
- "Process with PID %s stopped with returncode %s", self.proc.pid, self.proc.returncode
+ "Process %s with PID %s stopped with returncode %s",
+ self._name,
+ self.proc.pid,
+ self.proc.returncode,
)
return self.proc.returncode
async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
"""Write bytes to process and read back results."""
- if self.closed:
- return (b"", b"")
- async with self._stdout_lock, self._stdin_lock, self._stderr_lock:
- stdout, stderr = await self.proc.communicate(input_data)
+ stdout, stderr = await self.proc.communicate(input_data)
return (stdout, stderr)
async def read_stderr(self) -> AsyncGenerator[bytes, None]:
"""Read lines from the stderr stream."""
- async with self._stderr_lock:
- async for line in self.proc.stderr:
- if self.closed:
- break
- yield line
+ async for line in self.proc.stderr:
+ yield line
+
+ async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None:
+ """Feed stdin with chunks from an AsyncGenerator."""
+ try:
+ async for chunk in custom_stdin:
+ if self._close_called or self.proc.stdin.is_closing():
+ return
+ await self.write(chunk)
+ await self.write_eof()
+ except asyncio.CancelledError:
+ # make sure the stdin generator is also properly closed
+ # by propagating a CancelledError into it
+ task = asyncio.create_task(custom_stdin.__anext__())
+ task.cancel()
async def check_output(shell_cmd: str) -> tuple[int, bytes]:
"-port",
str(self.airplay_player.discovery_info.port),
"-wait",
- str(2000 - sync_adjust),
+ str(3000 - sync_adjust),
"-volume",
str(mass_player.volume_level),
*extra_args,
if platform.system() == "Darwin":
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
+ # connect cliraop stdin with ffmpeg stdout using os pipes
+ read, write = os.pipe()
+ # launch ffmpeg, feeding (player specific) audio chunks on stdout
+ self._audio_reader_task = asyncio.create_task(
+ stream_job.stream_to_custom_output_path(
+ player_id=player_id, output_format=AIRPLAY_PCM_FORMAT, output_path=write
+ )
+ )
self._cliraop_proc = AsyncProcess(
cliraop_args,
enable_stdin=True,
enable_stdout=False,
enable_stderr=True,
+ custom_stdin=read,
)
await self._cliraop_proc.start()
self._log_reader_task = asyncio.create_task(self._log_watcher())
- self._audio_reader_task = asyncio.create_task(self._audio_reader())
+
+ # self._audio_reader_task = asyncio.create_task(self._audio_reader())
async def stop(self, wait: bool = True):
"""Stop playback and cleanup."""
"""Monitor stderr for the running CLIRaop process."""
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
+ queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
logger = airplay_player.logger
lost_packets = 0
+ prev_metadata_checksum: str = ""
+ prev_progress_report: float = 0
async for line in self._cliraop_proc.read_stderr():
line = line.decode().strip() # noqa: PLW2901
if not line:
continue
if "elapsed milliseconds:" in line:
- # do not log this line, its too verbose
+ # this is received more or less every second while playing
millis = int(line.split("elapsed milliseconds: ")[1])
mass_player.elapsed_time = millis / 1000
mass_player.elapsed_time_last_updated = time.time()
- continue
+ # send metadata to player(s) if needed
+ # NOTE: this must all be done in separate tasks to not disturb audio
+ now = time.time()
+ if queue and queue.current_item and queue.current_item.streamdetails:
+ metadata_checksum = (
+ queue.current_item.streamdetails.stream_title
+ or queue.current_item.queue_item_id
+ )
+ if prev_metadata_checksum != metadata_checksum:
+ prev_metadata_checksum = metadata_checksum
+ prev_progress_report = now
+ self.mass.create_task(self._send_metadata(queue))
+ # send the progress report every 5 seconds
+ elif now - prev_progress_report >= 5:
+ prev_progress_report = now
+ self.mass.create_task(self._send_progress(queue))
if "set pause" in line or "Pause at" in line:
- logger.debug("raop streaming paused")
mass_player.state = PlayerState.PAUSED
self.mass.players.update(airplay_player.player_id)
- continue
if "Restarted at" in line or "restarting w/ pause" in line:
- logger.debug("raop streaming restarted after pause")
mass_player.state = PlayerState.PLAYING
self.mass.players.update(airplay_player.player_id)
- continue
if "restarting w/o pause" in line:
# streaming has started
- logger.debug("raop streaming started")
mass_player.state = PlayerState.PLAYING
mass_player.elapsed_time = 0
mass_player.elapsed_time_last_updated = time.time()
self.mass.players.update(airplay_player.player_id)
- continue
if "lost packet out of backlog" in line:
lost_packets += 1
if lost_packets == 50:
await self.mass.player_queues.stop(queue.queue_id)
else:
logger.debug(line)
- continue
- # verbose log everything else
+
logger.log(VERBOSE_LOG_LEVEL, line)
# if we reach this point, the process exited
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
- async def _audio_reader(self) -> None:
- """Send audio chunks to the cliraop process."""
- logger = self.airplay_player.logger
- mass_player = self.mass.players.get(self.airplay_player.player_id, True)
- queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
- logger.debug(
- "Starting RAOP stream for Queue %s to %s",
- queue.display_name,
- mass_player.display_name,
- )
- prev_metadata_checksum: str = ""
- prev_progress_report: float = 0
-
- async for chunk in self.stream_job.iter_player_audio(
- self.airplay_player.player_id, AIRPLAY_PCM_FORMAT
- ):
- if self._stop_requested:
- return
- await self._cliraop_proc.write(chunk)
- # send metadata to player(s) if needed
- # NOTE: this must all be done in separate tasks to not disturb audio
- now = time.time()
- if queue and queue.current_item and queue.current_item.streamdetails:
- metadata_checksum = (
- queue.current_item.streamdetails.stream_title
- or queue.current_item.queue_item_id
- )
- if prev_metadata_checksum != metadata_checksum:
- prev_metadata_checksum = metadata_checksum
- prev_progress_report = now
- self.mass.create_task(self._send_metadata(queue))
- # send the progress report every 5 seconds
- elif now - prev_progress_report >= 5:
- prev_progress_report = now
- self.mass.create_task(self._send_progress(queue))
- # send EOF
- await self._cliraop_proc.write_eof()
- logger.debug(
- "Finished RAOP stream for Queue %s to %s",
- queue.display_name,
- mass_player.display_name,
- )
-
async def _send_metadata(self, queue: PlayerQueue) -> None:
"""Send metadata to player (and connected sync childs)."""
if not self.running:
# always clear queue (by sending stop) first
if dlna_player.device.can_stop:
await self.cmd_stop(player_id)
- didl_metadata = create_didl_metadata(
- self.mass, url, queue_item if not use_flow_mode else None
- )
+ didl_metadata = create_didl_metadata(self.mass, url, queue_item)
title = queue_item.name if queue_item else "Music Assistant"
await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
# Play it
from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
-from music_assistant.server.providers.ugp import UGP_PREFIX
from .player import SonosPlayer
)
raise PlayerCommandFailed(msg)
- is_flow_stream = queue_item.queue_item_id == "flow" or queue_item.queue_id.startswith(
- UGP_PREFIX
- )
url = self.mass.streams.resolve_stream_url(
- player_id, queue_item=queue_item, output_codec=ContentType.FLAC
+ player_id,
+ queue_item=queue_item,
+ output_codec=ContentType.FLAC,
)
self.mass.create_task(
- sonos_player.soco.play_uri,
- url,
- meta=create_didl_metadata(self.mass, url, None if is_flow_stream else queue_item),
+ sonos_player.soco.play_uri, url, meta=create_didl_metadata(self.mass, url, queue_item)
)
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
if target is None:
msg = "Target is missing"
raise RuntimeError(msg)
- if existing := self._tracked_tasks.get(task_id):
+ if task_id and (existing := self._tracked_tasks.get(task_id)):
# prevent duplicate tasks if task_id is given and already present
return existing
if asyncio.iscoroutinefunction(target):
task = target
else:
# assume normal callable (non coroutine or awaitable)
+ # that needs to be run in the executor
task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs))
def task_done_callback(_task: asyncio.Future | asyncio.Task) -> None:
_task_id = task.task_id
self._tracked_tasks.pop(_task_id)
- # print unhandled exceptions
- if LOGGER.isEnabledFor(logging.DEBUG) and not _task.cancelled() and _task.exception():
- task_name = _task.get_name() if hasattr(_task, "get_name") else _task
- LOGGER.exception(
- "Exception in task %s - target: %s",
+ # log unhandled exceptions
+ if (
+ LOGGER.isEnabledFor(logging.DEBUG)
+ and not _task.cancelled()
+ and (err := _task.exception())
+ ):
+ task_name = _task.get_name() if hasattr(_task, "get_name") else str(_task)
+ LOGGER.warning(
+ "Exception in task %s - target: %s: %s",
task_name,
str(target),
- exc_info=task.exception(),
+ str(err),
+ exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
)
if task_id is None: