ALPINE_RELEASE_FILE = "/etc/alpine-release"
-class VerboseLogger(logging.Logger):
- """Custom python logger with included verbose log level."""
-
- def verbose(self, msg, *args, **kwargs):
- """Log a verbose message."""
- self.log(VERBOSE_LOG_LEVEL, msg, *args, **kwargs)
-
-
def get_arguments():
"""Arguments handling."""
parser = argparse.ArgumentParser(description="MusicAssistant")
logger = logging.getLogger()
logger.addHandler(file_handler)
logging.addLevelName(VERBOSE_LOG_LEVEL, "VERBOSE")
- logging.setLoggerClass(VerboseLogger)
# apply the configured global log level to the (root) music assistant logger
logging.getLogger(ROOT_LOGGER_NAME).setLevel(level)
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
-from music_assistant.client.exceptions import (
- ConnectionClosed,
- InvalidServerVersion,
- InvalidState,
-)
+from music_assistant.client.exceptions import ConnectionClosed, InvalidServerVersion, InvalidState
from music_assistant.common.models.api import (
ChunkedResultMessage,
CommandMessage,
return self
async def __aexit__(
- self, exc_type: Exception, exc_value: str, traceback: TracebackType
- ) -> None:
- """Disconnect from the server and exit."""
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
+ """Exit context manager."""
await self.disconnect()
def __repr__(self) -> str:
import time
import urllib.parse
from collections.abc import AsyncGenerator
+from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
import shortuuid
from aiohttp import web
-from music_assistant.common.helpers.util import (
- empty_queue,
- get_ip,
- select_free_port,
- try_parse_bool,
-)
+from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueOption,
CONF_OUTPUT_CHANNELS,
CONF_PUBLISH_IP,
SILENCE_FILE,
+ VERBOSE_LOG_LEVEL,
)
from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
+ get_ffmpeg_args,
get_ffmpeg_stream,
get_media_stream,
get_player_filter_params,
)
+from music_assistant.server.helpers.process import AsyncProcess
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
from music_assistant.server.models.core_controller import CoreController
DEFAULT_STREAM_HEADERS = {
"transferMode.dlna.org": "Streaming",
"contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000", # noqa: E501
- "Cache-Control": "no-cache",
+ "Cache-Control": "no-cache,must-revalidate",
+ "Pragma": "no-cache",
"Connection": "close",
"Accept-Ranges": "none",
- "icy-name": "Music Assistant",
- "icy-pub": "0",
+ "Icy-Name": "Music Assistant",
+ "Icy-Url": "https://music-assistant.io",
}
-FLOW_MAX_SAMPLE_RATE = 96000
-FLOW_MAX_BIT_DEPTH = 24
+FLOW_DEFAULT_SAMPLE_RATE = 48000
+FLOW_DEFAULT_BIT_DEPTH = 24
# pylint:disable=too-many-locals
-class MultiClientQueueStreamJob:
- """Representation of a (multiclient) Audio Queue stream job/task.
+class QueueStreamJob:
+ """
+ Representation of a (multiclient) Audio stream job/task.
- The whole idea here is that the queue stream audio can be sent to multiple
+ The whole idea here is that the (pcm) audio source can be sent to multiple
players at once. For example for (slimproto/airplay) syncgroups and universal group.
- all client players receive the exact same audio chunks from the source audio,
- encoded and/or resampled to the player's preferences.
- A StreamJob is tied to a Queue and streams the queue flow stream,
+
+ All client players receive the exact same audio chunks from the source audio,
+ then encoded and/or resampled to the player's preferences.
In case a stream is restarted (e.g. when seeking),
- a new MultiClientQueueStreamJob will be created.
+ a new QueueStreamJob will be created.
"""
_audio_task: asyncio.Task | None = None
mass: MusicAssistant,
pcm_audio_source: AsyncGenerator[bytes, None],
pcm_format: AudioFormat,
- expected_players: set[str],
+ auto_start: bool = False,
) -> None:
- """Initialize MultiClientQueueStreamJob instance."""
+ """Initialize QueueStreamJob instance."""
self.mass = mass
self.pcm_audio_source = pcm_audio_source
self.pcm_format = pcm_format
- self.expected_players = expected_players
+ self.expected_players: set[str] = set()
self.job_id = shortuuid.uuid()
self.bytes_streamed: int = 0
self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}")
- self._subscribed_players: dict[str, asyncio.Queue] = {}
+ self._subscribed_players: dict[str, AsyncProcess] = {}
self._finished = False
self._running = False
- self._allow_start = asyncio.Event()
+ self.allow_start = asyncio.Event()
self._audio_task = asyncio.create_task(self._stream_job_runner())
+ if auto_start:
+ self.allow_start.set()
@property
def finished(self) -> bool:
@property
def pending(self) -> bool:
"""Return if this Job is pending start."""
- return not self.finished and not self._audio_task
+ return not self.finished and not self.running
@property
def running(self) -> bool:
"""Start running (send audio chunks to connected players)."""
if self.finished:
raise RuntimeError("Task is already finished")
- self._allow_start.set()
+ self.allow_start.set()
def stop(self) -> None:
"""Stop running this job."""
- if self._audio_task and self._audio_task.done():
- return
- if self._audio_task:
+ if self._audio_task and not self._audio_task.done():
self._audio_task.cancel()
self._finished = True
- def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
+ def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str:
"""Resolve the childplayer specific stream URL to this streamjob."""
fmt = output_codec.value
# handle raw pcm
if output_codec.is_pcm():
- player = self.mass.streams.mass.players.get(child_player_id)
+ player = self.mass.streams.mass.players.get(player_id)
player_max_bit_depth = 24 if player.supports_24bit else 16
output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
output_channels = self.mass.config.get_raw_player_config_value(
- child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
+ player_id, CONF_OUTPUT_CHANNELS, "stereo"
)
channels = 1 if output_channels != "stereo" else 2
fmt += (
f";codec=pcm;rate={output_sample_rate};"
f"bitrate={output_bit_depth};channels={channels}"
)
- url = f"{self.mass.streams._server.base_url}/multi/{self.job_id}/{child_player_id}.{fmt}"
- self.expected_players.add(child_player_id)
+ url = f"{self.mass.streams._server.base_url}/flow/{self.job_id}/{player_id}.{fmt}"
+ self.expected_players.add(player_id)
return url
- async def subscribe(
+ async def iter_player_audio(
self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None
) -> AsyncGenerator[bytes, None]:
- """Subscribe consumer and iterate chunks on the queue encoded to given output format."""
- async for chunk in get_ffmpeg_stream(
- audio_input=self._subscribe_pcm(player_id),
+ """Subscribe consumer and iterate player-specific audio."""
+ ffmpeg_args = get_ffmpeg_args(
input_format=self.pcm_format,
output_format=output_format,
filter_params=get_player_filter_params(self.mass, player_id),
- chunk_size=chunk_size,
- ):
- yield chunk
+ extra_args=[],
+ input_path="-",
+ output_path="-",
+ loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
+ )
+ # launch ffmpeg process with player specific settings
+ # the stream_job_runner will start pushing pcm chunks to the stdin
+ # we then read the players-specific (encoded) output chunks
+ # from ffmpeg stdout and yield them
+ async with AsyncProcess(
+ ffmpeg_args, enable_stdin=True, enable_stdout=True, enable_stderr=False
+ ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc):
+ # read final chunks from ffmpeg's stdout
+ iterator = (
+ ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
+ )
+ async for chunk in iterator:
+ try:
+ yield chunk
+ except (BrokenPipeError, ConnectionResetError):
+                    # consumer disconnected while we were yielding (disconnect race);
+                    # stop streaming to this consumer
+ break
- async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]:
- """Subscribe consumer and iterate incoming (raw pcm) chunks on the queue."""
+ async def stream_to_custom_output_path(
+ self, player_id: str, output_format: AudioFormat, output_path: str
+ ) -> None:
+ """Subscribe consumer and instruct ffmpeg to send the audio to the given output path."""
+ ffmpeg_args = get_ffmpeg_args(
+ input_format=self.pcm_format,
+ output_format=output_format,
+ filter_params=get_player_filter_params(self.mass, player_id),
+ extra_args=[],
+ input_path="-",
+ output_path=output_path,
+ loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
+ )
+ # launch ffmpeg process with player specific settings
+ # the stream_job_runner will start pushing pcm chunks to the stdin
+ # the ffmpeg process will send the output directly to the given path (e.g. tcp socket)
+ async with AsyncProcess(
+ ffmpeg_args,
+ enable_stdin=True,
+ enable_stdout=False,
+ enable_stderr=False,
+ ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc):
+ # we simply wait for the process to exit
+ await ffmpeg_proc.wait()
+
+ @asynccontextmanager
+ async def subscribe(
+ self, player_id: str, ffmpeg_proc: AsyncProcess
+    ) -> AsyncGenerator[QueueStreamJob, None]:
+ """Subscribe consumer's (output) ffmpeg process."""
+ if self.running:
+ # client subscribes while we're already started
+ # that will probably cause side effects but let it go
+ self.logger.warning(
+ "Player %s is joining while the stream is already started!", player_id
+ )
try:
- self._subscribed_players[player_id] = queue = asyncio.Queue(2)
-
- if self.running:
- # client subscribes while we're already started
- # that will probably cause side effects but let it go
- self.logger.warning(
- "Player %s is joining while the stream is already started!", player_id
- )
- else:
- self.logger.debug("Subscribed player %s", player_id)
-
- # yield from queue until finished
- while not self._finished:
- yield await queue.get()
+ self._subscribed_players[player_id] = ffmpeg_proc
+ self.logger.debug("Subscribed player %s", player_id)
+ yield self
finally:
- if sub_queue := self._subscribed_players.pop(player_id, None):
- empty_queue(sub_queue)
+ self._subscribed_players.pop(player_id, None)
self.logger.debug("Unsubscribed client %s", player_id)
# check if this was the last subscriber and we should cancel
- await asyncio.sleep(2)
+ await asyncio.sleep(5)
if len(self._subscribed_players) == 0 and not self.finished:
self.logger.debug("Cleaning up, all clients disappeared...")
self.stop()
async def _stream_job_runner(self) -> None:
"""Feed audio chunks to StreamJob subscribers."""
- await self._allow_start.wait()
+ await self.allow_start.wait()
retries = 50
while retries:
retries -= 1
- await asyncio.sleep(0.2)
+ await asyncio.sleep(0.1)
if len(self._subscribed_players) != len(self.expected_players):
continue
await asyncio.sleep(0.2)
break
self.logger.debug(
- "Starting multi client stream job %s with %s out of %s connected clients",
+ "Starting stream job %s with %s out of %s connected clients",
self.job_id,
len(self._subscribed_players),
len(self.expected_players),
)
async for chunk in self.pcm_audio_source:
+ num_subscribers = len(self._subscribed_players)
+ if num_subscribers == 0:
+ break
async with asyncio.TaskGroup() as tg:
- for listener_queue in list(self._subscribed_players.values()):
- tg.create_task(listener_queue.put(chunk))
+ for ffmpeg_proc in list(self._subscribed_players.values()):
+ tg.create_task(ffmpeg_proc.write(chunk))
+
+ # write EOF at end of queue stream
+ async with asyncio.TaskGroup() as tg:
+ for ffmpeg_proc in list(self._subscribed_players.values()):
+ tg.create_task(ffmpeg_proc.write_eof())
+ self.logger.debug("Finished stream job %s", self.job_id)
self._finished = True
"""Initialize instance."""
super().__init__(*args, **kwargs)
self._server = Webserver(self.logger, enable_dynamic_routes=True)
- self.multi_client_jobs: dict[str, MultiClientQueueStreamJob] = {}
+ self.stream_jobs: dict[str, QueueStreamJob] = {}
self.register_dynamic_route = self._server.register_dynamic_route
self.unregister_dynamic_route = self._server.unregister_dynamic_route
self.manifest.name = "Streamserver"
self.manifest.description = (
- "Music Assistant's core server that is responsible for "
+ "Music Assistant's core controller that is responsible for "
"streaming audio to players on the local network as well as "
"some player specific local control callbacks."
)
static_routes=[
(
"*",
- "/multi/{job_id}/{player_id}.{fmt}",
- self.serve_multi_subscriber_stream,
- ),
- (
- "*",
- "/flow/{queue_id}/{queue_item_id}.{fmt}",
+ "/flow/{job_id}/{player_id}.{fmt}",
self.serve_queue_flow_stream,
),
(
"""Cleanup on exit."""
await self._server.close()
- async def resolve_stream_url(
+ def resolve_stream_url(
self,
player_id: str,
queue_item: QueueItem,
# handle announcement item
if queue_item.media_type == MediaType.ANNOUNCEMENT:
return queue_item.queue_item_id
- # handle request for multi client queue stream
- stream_job = self.multi_client_jobs.get(queue_item.queue_id)
- if queue_item.queue_item_id == "flow" or stream_job and stream_job.pending:
+ # handle request for (multi client) queue flow stream
+ if queue_item.queue_item_id in ("flow", queue_item.queue_id) or flow_mode:
+            # note: this will return an existing streamjob if one was already created
+ # e.g. in case of universal group player
+ pcm_format = AudioFormat(
+ content_type=ContentType.from_bit_depth(24),
+ sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
+ bit_depth=FLOW_DEFAULT_BIT_DEPTH,
+ )
+ stream_job = self.create_stream_job(
+ queue_item.queue_id,
+ pcm_audio_source=self.get_flow_stream(
+ self.mass.player_queues.get(queue_item.queue_id),
+ start_queue_item=queue_item,
+ pcm_format=pcm_format,
+ ),
+ pcm_format=pcm_format,
+ auto_start=True,
+ )
+
return stream_job.resolve_stream_url(player_id, output_codec)
+
# handle raw pcm without exact format specifiers
if output_codec.is_pcm() and ";" not in fmt:
fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
query_params = {}
- base_path = "flow" if flow_mode else "single"
- url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
+ url = (
+ f"{self._server.base_url}/single/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}"
+ )
# we add a timestamp as basic checksum
# most importantly this is to invalidate any caches
# but also to handle edge cases such as single track repeat
url += "?" + urllib.parse.urlencode(query_params)
return url
- async def create_multi_client_stream_job(
+ def create_stream_job(
self,
queue_id: str,
- start_queue_item: QueueItem,
- pcm_bit_depth: int = 24,
- pcm_sample_rate: int = 48000,
- expected_players: set[str] | None = None,
- ) -> MultiClientQueueStreamJob:
+ pcm_audio_source: AsyncGenerator[bytes, None],
+ pcm_format: AudioFormat,
+ auto_start: bool = False,
+ ) -> QueueStreamJob:
"""
- Create a MultiClientQueueStreamJob for the given queue..
+        Create a QueueStreamJob for the given queue.
This is called by player/sync group implementations to start streaming
the queue audio to multiple players at once.
"""
- if existing_job := self.multi_client_jobs.get(queue_id, None):
+ if existing_job := self.stream_jobs.get(queue_id, None):
if existing_job.pending:
return existing_job
# cleanup existing job first
existing_job.stop()
- queue = self.mass.player_queues.get(queue_id)
- pcm_format = AudioFormat(
- content_type=ContentType.from_bit_depth(pcm_bit_depth),
- sample_rate=pcm_sample_rate,
- bit_depth=pcm_bit_depth,
- channels=2,
- )
- self.multi_client_jobs[queue_id] = stream_job = MultiClientQueueStreamJob(
+ self.stream_jobs[queue_id] = stream_job = QueueStreamJob(
self.mass,
- pcm_audio_source=self.get_flow_stream(
- queue=queue,
- start_queue_item=start_queue_item,
- pcm_format=pcm_format,
- ),
+ pcm_audio_source=pcm_audio_source,
pcm_format=pcm_format,
- expected_players=expected_players or set(),
+ auto_start=auto_start,
)
return stream_job
async def serve_queue_flow_stream(self, request: web.Request) -> web.Response:
"""Stream Queue Flow audio to player."""
self._log_request(request)
- queue_id = request.match_info["queue_id"]
- if not (queue := self.mass.player_queues.get(queue_id)):
- raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
- start_queue_item_id = request.match_info["queue_item_id"]
- start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
- if not start_queue_item:
- raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
- queue_player = self.mass.players.get(queue_id)
- # work out output format/details
- output_format = await self._get_output_format(
- output_format_str=request.match_info["fmt"],
- queue_player=queue_player,
- default_sample_rate=FLOW_MAX_SAMPLE_RATE,
- default_bit_depth=FLOW_MAX_BIT_DEPTH,
- )
- # prepare request, add some DLNA/UPNP compatible headers
- enable_icy = request.headers.get("Icy-MetaData", "") == "1"
- icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384
- headers = {
- **DEFAULT_STREAM_HEADERS,
- "Content-Type": f"audio/{output_format.output_format_str}",
- }
- if enable_icy:
- headers["icy-metaint"] = str(icy_meta_interval)
-
- resp = web.StreamResponse(
- status=200,
- reason="OK",
- headers=headers,
- )
- await resp.prepare(request)
-
- # return early if this is not a GET request
- if request.method != "GET":
- return resp
-
- # all checks passed, start streaming!
- self.logger.debug("Start serving Queue flow audio stream for %s", queue_player.name)
-
- pcm_format = AudioFormat(
- content_type=ContentType.from_bit_depth(output_format.bit_depth),
- sample_rate=output_format.sample_rate,
- bit_depth=output_format.bit_depth,
- channels=2,
- )
- async for chunk in get_ffmpeg_stream(
- audio_input=self.get_flow_stream(
- queue=queue,
- start_queue_item=start_queue_item,
- pcm_format=pcm_format,
- ),
- input_format=pcm_format,
- output_format=output_format,
- filter_params=get_player_filter_params(self.mass, queue_player.player_id),
- chunk_size=icy_meta_interval if enable_icy else None,
- ):
- try:
- await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError):
- break
-
- if not enable_icy:
- continue
-
- # if icy metadata is enabled, send the icy metadata after the chunk
- if (
- # use current item here and not buffered item, otherwise
- # the icy metadata will be too much ahead
- (current_item := queue.current_item)
- and current_item.streamdetails
- and current_item.streamdetails.stream_title
- ):
- title = current_item.streamdetails.stream_title
- elif queue and current_item and current_item.name:
- title = current_item.name
- else:
- title = "Music Assistant"
- metadata = f"StreamTitle='{title}';".encode()
- if current_item and current_item.image:
- metadata += f"StreamURL='{current_item.image.path}'".encode()
- while len(metadata) % 16 != 0:
- metadata += b"\x00"
- length = len(metadata)
- length_b = chr(int(length / 16)).encode()
- await resp.write(length_b + metadata)
-
- return resp
-
- async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response:
- """Stream Queue Flow audio to a child player within a multi subscriber setup."""
- self._log_request(request)
job_id = request.match_info["job_id"]
- for queue_id, stream_job in self.multi_client_jobs.items():
+ for queue_id, stream_job in self.stream_jobs.items():
if stream_job.job_id == job_id:
break
else:
raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished")
if not (queue := self.mass.player_queues.get(queue_id)):
raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
-
player_id = request.match_info["player_id"]
child_player = self.mass.players.get(player_id)
if not child_player:
# all checks passed, start streaming!
self.logger.debug(
- "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s",
+ "Start serving Queue flow audio stream for queue %s to player %s",
queue.display_name,
child_player.display_name,
)
- async for chunk in stream_job.subscribe(
+ async for chunk in stream_job.iter_player_audio(
player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None
):
try:
assert pcm_format.content_type.is_pcm()
queue_track = None
last_fadeout_part = b""
- total_bytes_written = 0
queue.flow_mode = True
use_crossfade = self.mass.config.get_raw_player_config_value(
queue.queue_id, CONF_CROSSFADE, False
):
# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
+ del chunk
if len(buffer) < buffer_size:
# buffer is not full enough, move on
continue
#### OTHER: enough data in buffer, feed to output
else:
- chunk_size = len(chunk)
- yield buffer[:chunk_size]
- bytes_written += chunk_size
- buffer = buffer[chunk_size:]
+ yield buffer[:pcm_sample_size]
+ bytes_written += pcm_sample_size
+ buffer = buffer[pcm_sample_size:]
#### HANDLE END OF TRACK
if last_fadeout_part:
# no crossfade enabled, just yield the (entire) buffer last part
yield buffer
bytes_written += len(buffer)
- # clear vars
- buffer = b""
# update duration details based on the actual pcm data we sent
# this also accounts for crossfade and silence stripping
- queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size
+ seconds_streamed = (bytes_written + len(last_fadeout_part)) / pcm_sample_size
+ queue_track.streamdetails.seconds_streamed = seconds_streamed
queue_track.streamdetails.duration = (
- queue_track.streamdetails.seconds_skipped
- or 0 + queue_track.streamdetails.seconds_streamed
+ queue_track.streamdetails.seek_position + seconds_streamed
)
- total_bytes_written += bytes_written
self.logger.debug(
"Finished Streaming queue track: %s (%s) on queue %s - seconds streamed: %s",
queue_track.streamdetails.uri,
queue_track.name,
queue.display_name,
- queue_track.streamdetails.seconds_streamed,
+ seconds_streamed,
)
# end of queue flow: make sure we yield the last_fadeout_part
if last_fadeout_part:
yield last_fadeout_part
+ del last_fadeout_part
+ del buffer
self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
def _log_request(self, request: web.Request) -> None:
# also host the audio preview service
routes.append(("GET", "/preview", self.serve_preview_stream))
# start the webserver
+ default_publish_ip = await get_ip()
if self.mass.running_as_hass_addon:
# if we're running on the HA supervisor the webserver is secured by HA ingress
# we only start the webserver on the internal docker network and ingress connects
# to that internally and exposes the webUI securely
# if a user also wants to expose a the webserver non securely on his internal
# network he/she should explicitly do so (and know the risks)
- default_publish_ip = await get_ip()
self.publish_port = DEFAULT_SERVER_PORT
if config.get_value(CONF_EXPOSE_SERVER):
bind_ip = "0.0.0.0"
else:
base_url = config.get_value(CONF_BASE_URL)
self.publish_port = config.get_value(CONF_BIND_PORT)
- self.publish_ip = bind_ip = config.get_value(CONF_BIND_IP)
+ self.publish_ip = default_publish_ip
+ bind_ip = config.get_value(CONF_BIND_IP)
await self._server.setup(
bind_ip=bind_ip,
bind_port=self.publish_port,
item_name = f"{streamdetails.provider}/{streamdetails.item_id}"
LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name)
# calculate EBU R128 integrated loudness with ffmpeg
- ffmpeg_args = _get_ffmpeg_args(
+ ffmpeg_args = get_ffmpeg_args(
input_format=streamdetails.audio_format,
output_format=streamdetails.audio_format,
filter_params=["loudnorm=print_format=json"],
if chunk_count == 600:
# safety guard: max (more or less) 10 minutes of audio may be analyzed!
break
- ffmpeg_proc.write_eof()
+ await ffmpeg_proc.write_eof()
_, stderr = await ffmpeg_proc.communicate()
if loudness_details := _parse_loudnorm(stderr):
"""
logger = LOGGER.getChild("media_stream")
bytes_sent = 0
- streamdetails.seconds_skipped = streamdetails.seek_position
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio or streamdetails.seek_position:
strip_silence_begin = False
pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
chunk_size = pcm_sample_size * (1 if is_radio else 2)
expected_chunks = int((streamdetails.duration or 0) / 2)
- if expected_chunks < 60:
+ if expected_chunks < 10:
strip_silence_end = False
# collect all arguments for ffmpeg
filter_params.append(filter_rule)
if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
- ffmpeg_args = _get_ffmpeg_args(
+ ffmpeg_args = get_ffmpeg_args(
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
await ffmpeg_proc.write(audio_chunk)
# write eof when last packet is received
- ffmpeg_proc.write_eof()
+ await ffmpeg_proc.write_eof()
logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri)
if streamdetails.direct is None:
if prev_chunk:
yield prev_chunk
bytes_sent += len(prev_chunk)
-
prev_chunk = chunk
- # all chunks received, strip silence of last part
-
+ # all chunks received, strip silence of last part if needed and yield remaining bytes
if strip_silence_end and prev_chunk:
final_chunk = await strip_silence(
mass,
)
else:
final_chunk = prev_chunk
-
- # ensure the final chunk is sent
- # its important this is done here at the end so we can catch errors first
yield final_chunk
bytes_sent += len(final_chunk)
del final_chunk
Takes care of resampling and/or recoding if needed,
according to player preferences.
"""
- logger = LOGGER.getChild("media_stream")
+ logger = LOGGER.getChild("ffmpeg_stream")
use_stdin = not isinstance(audio_input, str)
- ffmpeg_args = _get_ffmpeg_args(
+ ffmpeg_args = get_ffmpeg_args(
input_format=input_format,
output_format=output_format,
filter_params=filter_params or [],
if ffmpeg_proc.closed:
return
await ffmpeg_proc.write(chunk)
- ffmpeg_proc.write_eof()
+ await ffmpeg_proc.write_eof()
try:
if not isinstance(audio_input, str):
if writer_task and not writer_task.done():
writer_task.cancel()
# use communicate to read stderr and wait for exit
- # read log for loudness measurement (or errors)
_, stderr = await ffmpeg_proc.communicate()
if ffmpeg_proc.returncode != 0:
# ffmpeg has a non zero returncode meaning trouble
version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0]
libsoxr_support = "enable-libsoxr" in output.decode()
result = (ffmpeg_present, libsoxr_support, version)
- # store in global cache for easy access by '_get_ffmpeg_args'
+ # store in global cache for easy access by 'get_ffmpeg_args'
await set_global_cache_values({"ffmpeg_support": result})
return result
async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
await ffmpeg_proc.write(audio_chunk)
# write eof when last packet is received
- ffmpeg_proc.write_eof()
+ await ffmpeg_proc.write_eof()
if not streamdetails.direct:
writer_task = asyncio.create_task(writer())
return filter_params
-def _get_ffmpeg_args(
+def get_ffmpeg_args(
input_format: AudioFormat,
output_format: AudioFormat,
filter_params: list[str],
extra_args: list[str],
input_path: str = "-",
output_path: str = "-",
+ loglevel: str = "info",
) -> list[str]:
"""Collect all args to send to the ffmpeg process."""
ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
"ffmpeg",
"-hide_banner",
"-loglevel",
- "info",
+ loglevel,
"-ignore_unknown",
"-protocol_whitelist",
"file,http,https,tcp,tls,crypto,pipe,data,fd",
from __future__ import annotations
import asyncio
+from types import TracebackType
from typing import TYPE_CHECKING
from aiohttp.web import Request, Response
)
return self
- async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
"""Exit context manager."""
self.mass.streams.unregister_dynamic_route(f"/callback/{self.session_id}", "GET")
-"""Implementation of a (truly) non blocking subprocess.
+"""
+AsyncProcess.
-The subprocess implementation in asyncio can (still) sometimes cause deadlocks,
-even when properly handling reading/writes from different tasks.
+Wrapper around asyncio subprocess to help with using pipe streams and
+taking care of properly closing the process in case of exit (on both success and failures),
+without deadlocking.
"""
from __future__ import annotations
import asyncio
import logging
+import os
from contextlib import suppress
+from types import TracebackType
from typing import TYPE_CHECKING
if TYPE_CHECKING:
class AsyncProcess:
- """Implementation of a (truly) non blocking subprocess."""
+ """
+ AsyncProcess.
+
+ Wrapper around asyncio subprocess to help with using pipe streams and
+ taking care of properly closing the process in case of exit (on both success and failures),
+ without deadlocking.
+ """
def __init__(
self,
- args: list,
+ args: list[str],
enable_stdin: bool = False,
enable_stdout: bool = True,
enable_stderr: bool = False,
) -> None:
- """Initialize."""
- self._proc = None
+ """Initialize AsyncProcess."""
+ self.proc: asyncio.subprocess.Process | None = None
self._args = args
self._enable_stdin = enable_stdin
self._enable_stdout = enable_stdout
self._enable_stderr = enable_stderr
+ self._close_called = False
+ self._stdin_lock = asyncio.Lock()
+ self._stdout_lock = asyncio.Lock()
+ self._stderr_lock = asyncio.Lock()
+        self._returncode: int | None = None
@property
def closed(self) -> bool:
"""Return if the process was closed."""
- return self.returncode is not None
+ return self._close_called or self.returncode is not None
@property
def returncode(self) -> int | None:
"""Return the erturncode of the process."""
- if self._proc is None:
+ if self._returncode is not None:
+ return self._returncode
+ if self.proc is None:
return None
- return self._proc.returncode
+ return self.proc.returncode
async def __aenter__(self) -> AsyncProcess:
"""Enter context manager."""
await self.start()
return self
- async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
"""Exit context manager."""
await self.close()
+ self._returncode = self.returncode
+ del self.proc
+ del self._stdin_lock
+ del self._stdout_lock
+ del self._returncode
async def start(self) -> None:
"""Perform Async init of process."""
- self._proc = await asyncio.create_subprocess_exec(
+ self.proc = await asyncio.create_subprocess_exec(
*self._args,
stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
close_fds=True,
)
+ proc_name_simple = self._args[0].split(os.sep)[-1]
+ LOGGER.debug("Started %s with PID %s", proc_name_simple, self.proc.pid)
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
while True:
chunk = await self.readexactly(n)
- yield chunk
- if len(chunk) < n:
+ if len(chunk) == 0:
break
+ yield chunk
async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks as they come in from process stdout."""
while True:
chunk = await self.read(n)
- if chunk == b"":
+ if len(chunk) == 0:
break
yield chunk
async def readexactly(self, n: int) -> bytes:
"""Read exactly n bytes from the process stdout (or less if eof)."""
+ if self._close_called or self.proc.stdout.at_eof():
+ return b""
try:
- return await self._proc.stdout.readexactly(n)
+ async with self._stdout_lock:
+ return await self.proc.stdout.readexactly(n)
except asyncio.IncompleteReadError as err:
return err.partial
and may return less or equal bytes than requested, but at least one byte.
If EOF was received before any byte is read, this function returns empty byte object.
"""
- return await self._proc.stdout.read(n)
+ if self._close_called or self.proc.stdout.at_eof():
+ return b""
+ if self.proc.stdout.at_eof():
+ return b""
+ async with self._stdout_lock:
+ return await self.proc.stdout.read(n)
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
- if self.closed or self._proc.stdin.is_closing():
+ if self._close_called or self.proc.stdin.is_closing():
return
- self._proc.stdin.write(data)
- with suppress(BrokenPipeError):
- await self._proc.stdin.drain()
-
- def write_eof(self) -> None:
+ if not self.proc or self.proc.returncode is not None:
+ raise RuntimeError("Process not started or already exited")
+ async with self._stdin_lock:
+ self.proc.stdin.write(data)
+ with suppress(BrokenPipeError):
+ await self.proc.stdin.drain()
+
+ async def write_eof(self) -> None:
"""Write end of file to the process stdin."""
if not self._enable_stdin:
- return
- if self.closed or self._proc.stdin.is_closing():
+ raise RuntimeError("STDIN is not enabled")
+ if not self.proc or self.proc.returncode is not None:
+ raise RuntimeError("Process not started or already exited")
+ if self._close_called or self.proc.stdin.is_closing():
return
try:
- if self._proc.stdin.can_write_eof():
- self._proc.stdin.write_eof()
+ async with self._stdin_lock:
+ if self.proc.stdin.can_write_eof():
+ self.proc.stdin.write_eof()
except (
AttributeError,
AssertionError,
async def close(self) -> int:
"""Close/terminate the process and wait for exit."""
- if self.returncode is not None:
- return self.returncode
- # make sure the process is cleaned up
- self._proc.terminate()
- try:
- async with asyncio.timeout(10):
- await self.communicate()
- except (TimeoutError, asyncio.CancelledError):
- self._proc.kill()
- return await self.wait()
+ self._close_called = True
+ if self.proc.returncode is None:
+ # make sure the process is cleaned up
+ try:
+ # we need to use communicate to ensure buffers are flushed
+ await asyncio.wait_for(self.proc.communicate(), 5)
+ except TimeoutError:
+ LOGGER.debug(
+ "Process with PID %s did not stop within 5 seconds. Sending terminate...",
+ self.proc.pid,
+ )
+ self.proc.terminate()
+ await self.proc.communicate()
+ LOGGER.debug(
+ "Process with PID %s stopped with returncode %s", self.proc.pid, self.proc.returncode
+ )
+ return self.proc.returncode
async def wait(self) -> int:
"""Wait for the process and return the returncode."""
if self.returncode is not None:
return self.returncode
- return await self._proc.wait()
+ return await self.proc.wait()
async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
"""Write bytes to process and read back results."""
- stdout, stderr = await self._proc.communicate(input_data)
+ if self.closed:
+ return (b"", b"")
+ async with self._stdout_lock, self._stdin_lock, self._stderr_lock:
+ stdout, stderr = await self.proc.communicate(input_data)
return (stdout, stderr)
async def read_stderr(self) -> AsyncGenerator[bytes, None]:
"""Read lines from the stderr stream."""
- async for line in self._proc.stderr:
- yield line
+ async with self._stderr_lock:
+ async for line in self.proc.stderr:
+ if self.closed:
+ break
+ yield line
async def check_output(shell_cmd: str) -> tuple[int, bytes]:
if ffmpeg_proc.closed:
break
await ffmpeg_proc.write(chunk)
- ffmpeg_proc.write_eof()
+ await ffmpeg_proc.write_eof()
# feed the file contents to the process stdin
if file_path == "-":
import platform
import socket
import time
-from collections.abc import AsyncGenerator
from contextlib import suppress
from dataclasses import dataclass
from random import randint, randrange
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import (
- get_ffmpeg_stream,
- get_media_stream,
- get_player_filter_params,
-)
-from music_assistant.server.helpers.process import check_output
+from music_assistant.server.helpers.audio import get_media_stream
+from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.server.providers.ugp import UGP_PREFIX
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
+ from music_assistant.server.controllers.streams import QueueStreamJob
from music_assistant.server.models import ProviderInstanceType
DOMAIN = "airplay"
CACHE_KEY_PREV_VOLUME = "airplay_prev_volume"
FALLBACK_VOLUME = 20
+AIRPLAY_PCM_FORMAT = AudioFormat(
+ content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16
+)
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
return None
-class AirplayStreamJob:
+class AirplayStream:
"""Object that holds the details of a stream job."""
def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None:
- """Initialize AirplayStreamJob."""
+ """Initialize AirplayStream."""
self.prov = prov
self.mass = prov.mass
self.airplay_player = airplay_player
# always generate a new active remote id to prevent race conditions
- # with the named pipe used to send commands
+ # with the named pipe used to send commands
self.active_remote_id: str = str(randint(1000, 8000))
self.start_ntp: int | None = None # use as checksum
self.prevent_playback: bool = False
- self._audio_iterator: AsyncGenerator[bytes, None] | None = None
+ self.stream_job: QueueStreamJob | None = None
self._log_reader_task: asyncio.Task | None = None
self._audio_reader_task: asyncio.Task | None = None
- self._cliraop_proc: asyncio.subprocess.Process | None = None
+ self._cliraop_proc: AsyncProcess | None = None
self._stop_requested = False
@property
and self._cliraop_proc.returncode is None
)
- async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None]) -> None:
+ async def start(self, start_ntp: int, stream_job: QueueStreamJob) -> None:
"""Initialize CLIRaop process for a player."""
self.start_ntp = start_ntp
- self._audio_iterator = audio_iterator
+ self.stream_job = stream_job
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
for prop in ("et", "md", "am", "pk", "pw"):
if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop):
extra_args += [f"-{prop}", prop_value]
-
sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
if device_password := self.mass.config.get_raw_player_config_value(
player_id, CONF_PASSWORD, None
elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
extra_args += ["-debug", "10"]
- args = [
+ cliraop_args = [
self.prov.cliraop_bin,
"-ntpstart",
str(start_ntp),
"-activeremote",
self.active_remote_id,
"-udn",
- str(self.airplay_player.discovery_info.name),
+ self.airplay_player.discovery_info.name,
self.airplay_player.address,
"-",
]
if platform.system() == "Darwin":
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
- self._cliraop_proc = await asyncio.create_subprocess_exec(
- *args,
- stdin=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- close_fds=True,
+
+ self._cliraop_proc = AsyncProcess(
+ cliraop_args,
+ enable_stdin=True,
+ enable_stdout=False,
+ enable_stderr=True,
)
+ await self._cliraop_proc.start()
self._log_reader_task = asyncio.create_task(self._log_watcher())
self._audio_reader_task = asyncio.create_task(self._audio_reader())
# send stop with cli command
await self.send_cli_command("ACTION=STOP")
- async def wait_for_stop() -> None:
+ async def _stop() -> None:
# always stop the audio feeder
if self._audio_reader_task and not self._audio_reader_task.done():
with suppress(asyncio.CancelledError):
self._audio_reader_task.cancel()
- # make sure stdin is drained (otherwise we'll deadlock)
- if self._cliraop_proc and self._cliraop_proc.returncode is None:
- if self._cliraop_proc.stdin.can_write_eof():
- self._cliraop_proc.stdin.write_eof()
- with suppress(BrokenPipeError):
- await self._cliraop_proc.stdin.drain()
+ await self._cliraop_proc.write_eof()
+ # the process should exit gracefully after the stop request was processed
await asyncio.wait_for(self._cliraop_proc.wait(), 30)
- task = self.mass.create_task(wait_for_stop())
+ task = self.mass.create_task(_stop())
if wait:
await task
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
- if not (self._cliraop_proc and self._cliraop_proc.returncode is None):
+ if not self._cliraop_proc or self._cliraop_proc.closed:
return
named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108
mass_player = self.mass.players.get(airplay_player.player_id)
logger = airplay_player.logger
lost_packets = 0
- async for line in self._cliraop_proc.stderr:
+ async for line in self._cliraop_proc.read_stderr():
line = line.decode().strip() # noqa: PLW2901
if not line:
continue
logger.log(VERBOSE_LOG_LEVEL, line)
# if we reach this point, the process exited
- logger.debug(
- "CLIRaop process stopped with errorcode %s",
- self._cliraop_proc.returncode,
- )
if airplay_player.active_stream == self:
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
async def _audio_reader(self) -> None:
- """Read audio chunks and send them to the cliraop process."""
+ """Send audio chunks to the cliraop process."""
logger = self.airplay_player.logger
mass_player = self.mass.players.get(self.airplay_player.player_id, True)
queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
)
prev_metadata_checksum: str = ""
prev_progress_report: float = 0
- async for chunk in self._audio_iterator:
- if not self.running:
- return
- self._cliraop_proc.stdin.write(chunk)
- try:
- await self._cliraop_proc.stdin.drain()
- except (BrokenPipeError, ConnectionResetError):
- break
- if not self.running:
+
+ async for chunk in self.stream_job.iter_player_audio(
+ self.airplay_player.player_id, AIRPLAY_PCM_FORMAT
+ ):
+ if self._stop_requested:
return
+ await self._cliraop_proc.write(chunk)
# send metadata to player(s) if needed
# NOTE: this must all be done in separate tasks to not disturb audio
now = time.time()
prev_progress_report = now
self.mass.create_task(self._send_progress(queue))
# send EOF
- if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing():
- self._cliraop_proc.stdin.write_eof()
- with suppress(BrokenPipeError, ConnectionResetError):
- await self._cliraop_proc.stdin.drain()
+ await self._cliraop_proc.write_eof()
logger.debug(
"Finished RAOP stream for Queue %s to %s",
queue.display_name,
discovery_info: AsyncServiceInfo
address: str
logger: logging.Logger
- active_stream: AirplayStreamJob | None = None
+ active_stream: AirplayStream | None = None
class AirplayProvider(PlayerProvider):
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
await airplay_player.active_stream.stop(wait=False)
- pcm_format = AudioFormat(
- content_type=ContentType.PCM_S16LE,
- sample_rate=44100,
- bit_depth=16,
- channels=2,
- )
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
- # stream announcement url directly
- stream_job = None
- elif (
- queue_item.queue_id.startswith(UGP_PREFIX)
- and (stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id))
- and stream_job.pending
- ):
- # handle special case for UGP multi client stream
- pass
- elif player.group_childs:
- # create a new multi client flow stream
- stream_job = await self.mass.streams.create_multi_client_stream_job(
- queue_item.queue_id,
- queue_item,
- pcm_bit_depth=16,
- pcm_sample_rate=44100,
- )
+
+ if queue_item.queue_id.startswith(UGP_PREFIX):
+ # special case: we got forwarded a request from the UGP
+ # use the existing stream job that was already created by UGP
+ stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
else:
- # for a single player we just consume the flow stream directly
- stream_job = None
+ if queue_item.media_type == MediaType.ANNOUNCEMENT:
+ # stream announcement url directly
+ audio_source = get_media_stream(
+ self.mass, queue_item.streamdetails, pcm_format=AIRPLAY_PCM_FORMAT
+ )
+ else:
+ queue = self.mass.player_queues.get(queue_item.queue_id)
+ audio_source = self.mass.streams.get_flow_stream(
+ queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
+ )
+ stream_job = self.mass.streams.create_stream_job(
+ queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=AIRPLAY_PCM_FORMAT
+ )
- # Python is not suitable for realtime audio streaming.
- # So, I've decided to go the fancy route here. I've created a small binary
- # written in C based on libraop to do the actual timestamped playback.
- # the raw pcm audio is fed to the stdin of this cliraop binary and we can
- # send some commands over a named pipe.
+ # Python is not suitable for realtime audio streaming so we do the actual streaming
+ # of (RAOP) audio using a small executable written in C based on libraop to do the
+ # timestamped playback. The raw pcm audio is fed to the stdin of the executable
+ # and we can send some interactive commands over a named pipe.
# get current ntp before we start
_, stdout = await check_output(f"{self.cliraop_bin} -ntp")
# setup Raop process for player and its sync childs
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
- if stream_job:
- stream_job.expected_players.add(airplay_player.player_id)
- audio_iterator = stream_job.subscribe(
- player_id=airplay_player.player_id,
- output_format=pcm_format,
- )
- elif queue_item.media_type == MediaType.ANNOUNCEMENT:
- # stream announcement url directly
- audio_iterator = get_media_stream(
- self.mass, queue_item.streamdetails, pcm_format=pcm_format
- )
- else:
- queue = self.mass.player_queues.get_active_queue(queue_item.queue_id)
- audio_iterator = get_ffmpeg_stream(
- self.mass.streams.get_flow_stream(
- queue,
- start_queue_item=queue_item,
- pcm_format=pcm_format,
- ),
- input_format=pcm_format,
- output_format=pcm_format,
- filter_params=get_player_filter_params(self.mass, airplay_player.player_id),
- )
- airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
- tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator))
- if stream_job and queue_item.queue_item_id != "flow":
+ stream_job.expected_players.add(airplay_player.player_id)
+ airplay_player.active_stream = AirplayStream(self, airplay_player)
+ tg.create_task(airplay_player.active_stream.start(start_ntp, stream_job))
+ if not queue_item.queue_id.startswith(UGP_PREFIX):
stream_job.start()
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
elif path == "/ctrl-int/1/shuffle_songs":
queue = self.mass.player_queues.get(player_id)
- self.mass.create_task(
+ self.mass.loop.call_soon(
self.mass.player_queues.set_shuffle(
active_queue.queue_id, not queue.shuffle_enabled
)
use_flow_mode = await self.mass.config.get_player_config_value(
player_id, CONF_FLOW_MODE
) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
url = self.mass.streams.get_command_url(queue_item, "next")
queue_item = None
else:
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
"""Handle PLAY MEDIA on given player."""
use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle enqueuing of the next queue item on the player."""
dlna_player = self.dlnaplayers[player_id]
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
"""Handle PLAY MEDIA on given player."""
use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
This will NOT be called if the end of the queue is reached (and repeat disabled).
This will NOT be called if the player is using flow mode to playback the queue.
"""
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
RepeatMode,
)
from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError
+from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.constants import (
CONF_CROSSFADE,
VERBOSE_LOG_LEVEL,
)
from music_assistant.server.models.player_provider import PlayerProvider
+from music_assistant.server.providers.ugp import UGP_PREFIX
if TYPE_CHECKING:
from aioslimproto.models import SlimEvent
REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
# sync constants
-MIN_DEVIATION_ADJUST = 6 # 6 milliseconds
+MIN_DEVIATION_ADJUST = 8 # 8 milliseconds
MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
-DEVIATION_JUMP_IGNORE = 2000 # ignore a sudden unrealistic jump
-MAX_SKIP_AHEAD_MS = 500 # 0.5 seconds
+DEVIATION_JUMP_IGNORE = 5000 # ignore a sudden unrealistic jump
+MAX_SKIP_AHEAD_MS = 800 # 0.8 seconds
@dataclass
CONF_ENTRY_DISPLAY = ConfigEntry(
key=CONF_DISPLAY,
type=ConfigEntryType.BOOLEAN,
- default_value=True,
+ default_value=False,
required=False,
label="Enable display support",
- description="Enable/disable native display support on " "squeezebox or squeezelite32 hardware.",
+ description="Enable/disable native display support on squeezebox or squeezelite32 hardware.",
advanced=True,
)
CONF_ENTRY_VISUALIZATION = ConfigEntry(
slimproto: SlimServer
_sync_playpoints: dict[str, deque[SyncPlayPoint]]
+ _do_not_resync_before: dict[str, float]
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._sync_playpoints = {}
+ self._do_not_resync_before = {}
self._resync_handle: asyncio.TimerHandle | None = None
control_port = self.config.get_value(CONF_PORT)
telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
if player.synced_to:
msg = "A synced player cannot receive play commands directly"
raise RuntimeError(msg)
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
+
if player.group_childs:
- # player has sync members, we need to start a multi slimplayer stream job
- stream_job = await self.mass.streams.create_multi_client_stream_job(
+ # player has sync members, we need to start a (multi-player) stream job
+ # to make sure that all clients receive the exact same audio
+ pcm_format = AudioFormat(
+ content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
+ )
+ queue = self.mass.player_queues.get(queue_item.queue_id)
+ stream_job = self.mass.streams.create_stream_job(
queue_id=queue_item.queue_id,
- start_queue_item=queue_item,
+ pcm_audio_source=self.mass.streams.get_flow_stream(
+ queue,
+ start_queue_item=queue_item,
+ pcm_format=pcm_format,
+ ),
+ pcm_format=pcm_format,
)
# forward command to player and any connected sync members
sync_clients = self._get_sync_clients(player_id)
async with asyncio.TaskGroup() as tg:
for slimplayer in sync_clients:
+ enforce_mp3 = await self.mass.config.get_player_config_value(
+ slimplayer.player_id, CONF_ENFORCE_MP3
+ )
tg.create_task(
self._handle_play_url(
slimplayer,
auto_play=False,
)
)
- if queue_item.queue_item_id != "flow":
+ if not queue_item.queue_id.startswith(UGP_PREFIX):
stream_job.start()
else:
# regular, single player playback
slimplayer = self.slimproto.get_player(player_id)
if not slimplayer:
return
- url = await self.mass.streams.resolve_stream_url(
+ enforce_mp3 = await self.mass.config.get_player_config_value(
+ player_id, CONF_ENFORCE_MP3
+ )
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
- # for now just hardcode flac as we assume that every (modern)
- # slimproto based player can handle that just fine
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
flow_mode=False,
)
if not (slimplayer := self.slimproto.get_player(player_id)):
return
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
slimplayer.signal_update()
- elif event.data == "button jump_fwd":
+ elif event.data in ("button jump_fwd", "button fwd"):
await self.mass.player_queues.next(queue.queue_id)
- elif event.data == "button jump_rew":
+ elif event.data in ("button jump_rew", "button rew"):
await self.mass.player_queues.previous(queue.queue_id)
elif event.data.startswith("time "):
# seek request
sync_playpoints = self._sync_playpoints[slimplayer.player_id]
active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
- stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
+ stream_job = self.mass.streams.stream_jobs.get(active_queue.queue_id)
if not stream_job:
# should not happen, but just in case
return
now = time.time()
+ if now < self._do_not_resync_before[slimplayer.player_id]:
+ return
+
last_playpoint = sync_playpoints[-1] if sync_playpoints else None
if last_playpoint and (now - last_playpoint.timestamp) > 10:
# last playpoint is too old, invalidate
# we can now append the current playpoint to our list
sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff))
- if len(sync_playpoints) < MIN_REQ_PLAYPOINTS:
+ min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS
+ if len(sync_playpoints) < min_req_playpoints:
return
# get the average diff
# resync the player by skipping ahead or pause for x amount of (milli)seconds
sync_playpoints.clear()
+ self._do_not_resync_before[player.player_id] = now + 5
if avg_diff > MAX_SKIP_AHEAD_MS:
# player lagging behind more than MAX_SKIP_AHEAD_MS,
# we need to correct the sync_master
self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta)
self.mass.create_task(sync_master.pause_for(delta))
- sync_master._elapsed_milliseconds -= delta
elif avg_diff > 0:
# handle player lagging behind, fix with skip_ahead
self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta)
self.mass.create_task(slimplayer.skip_over(delta))
- sync_master._elapsed_milliseconds += delta
else:
# handle player is drifting too far ahead, use pause_for to adjust
self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta)
self.mass.create_task(slimplayer.pause_for(delta))
- sync_master._elapsed_milliseconds -= delta
async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None:
"""Handle buffer ready event, player has buffered a (new) track.
async with asyncio.TaskGroup() as tg:
for _client in self._get_sync_clients(player.player_id):
self._sync_playpoints.setdefault(
- _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS * 2)
+ _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
).clear()
# NOTE: Officially you should do an unpause_at based on the player timestamp
# but I did not have any good results with that.
# Instead just start playback on all players and let the sync logic work out
# the delays etc.
+ self._do_not_resync_before[_client.player_id] = time.time() + 1
tg.create_task(_client.unpause_at(0))
async def _handle_connected(self, slimplayer: SlimClient) -> None:
init_volume = DEFAULT_PLAYER_VOLUME
init_power = False
await slimplayer.power(init_power)
+ await slimplayer.stop()
await slimplayer.volume_set(init_volume)
def _get_sync_clients(self, player_id: str) -> Iterator[SlimClient]:
import asyncio
import random
+import socket
import time
from contextlib import suppress
from typing import TYPE_CHECKING, cast
from snapcast.control import create_server
from snapcast.control.client import Snapclient
+from zeroconf import NonUniqueNameException
+from zeroconf.asyncio import AsyncServiceInfo
+from music_assistant.common.helpers.util import get_ip_pton
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
self._snapcast_server_control_port = self.config.get_value(CONF_SERVER_CONTROL_PORT)
self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER)
self._stream_tasks = {}
+
if self._use_builtin_server:
# start our own builtin snapserver
self._snapserver_started = asyncio.Event()
"""Handle close/cleanup of the provider."""
for client in self._snapserver.clients:
await self.cmd_stop(client.identifier)
- await self._snapserver.stop()
- self._snapserver_started.clear()
if self._snapserver_runner and not self._snapserver_runner.done():
self._snapserver_runner.cancel()
- await asyncio.sleep(2) # prevent race conditions when reloading
+ await asyncio.sleep(6) # prevent race conditions when reloading
+ await self._snapserver.stop()
+ self._snapserver_started.clear()
+
+ def on_player_config_removed(self, player_id: str) -> None:
+ """Call (by config manager) when the configuration of a player is removed."""
+ super().on_player_config_removed(player_id)
+ if self._use_builtin_server:
+ self.mass.create_task(self._snapserver.delete_client(player_id))
def _handle_update(self) -> None:
"""Process Snapcast init Player/Group and set callback ."""
bit_depth=16,
channels=2,
)
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
- # stream announcement url directly
- audio_iterator = get_media_stream(
- self.mass, queue_item.streamdetails, pcm_format=pcm_format
- )
- elif (
- queue_item.queue_id.startswith(UGP_PREFIX)
- and (stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id))
- and stream_job.pending
- ):
- # handle special case for UGP multi client stream
- stream_job = self.mass.streams.multi_client_jobs.get(queue_item.queue_id)
- stream_job.expected_players.add(player_id)
- audio_iterator = stream_job.subscribe(
- player_id=player_id,
- output_format=pcm_format,
- )
+
+ if queue_item.queue_id.startswith(UGP_PREFIX):
+ # special case: we got forwarded a request from the UGP
+ # use the existing stream job that was already created by UGP
+ stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
else:
- audio_iterator = self.mass.streams.get_flow_stream(
- queue,
- start_queue_item=queue_item,
- pcm_format=pcm_format,
+ if queue_item.media_type == MediaType.ANNOUNCEMENT:
+ # stream announcement url directly
+ audio_source = get_media_stream(
+ self.mass, queue_item.streamdetails, pcm_format=pcm_format
+ )
+ else:
+ queue = self.mass.player_queues.get(queue_item.queue_id)
+ audio_source = self.mass.streams.get_flow_stream(
+ queue, start_queue_item=queue_item, pcm_format=pcm_format
+ )
+ stream_job = self.mass.streams.create_stream_job(
+ queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=pcm_format
)
+ stream_job.expected_players.add(player_id)
async def _streamer() -> None:
host = self._snapcast_server_host
- _, writer = await asyncio.open_connection(host, port)
- self.logger.debug("Opened connection to %s:%s", host, port)
- player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
- player.elapsed_time = 0
- player.elapsed_time_last_updated = time.time()
- player.state = PlayerState.PLAYING
- self._set_childs_state(player_id, PlayerState.PLAYING)
- self.mass.players.register_or_update(player)
+ self.mass.players.update(player_id)
+
+ def stream_callback(_stream) -> None:
+ player.state = PlayerState(stream.status)
+ if player.state == PlayerState.PLAYING:
+ player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
+ player.elapsed_time = 0
+ player.elapsed_time_last_updated = time.time()
+ self._set_childs_state(player_id, player.state)
+
+ stream.set_callback(stream_callback)
+ stream_path = f"tcp://{host}:{port}"
+ self.logger.debug("Start streaming to %s", stream_path)
try:
- async for pcm_chunk in audio_iterator:
- writer.write(pcm_chunk)
- await writer.drain()
- # end of the stream reached
- if writer.can_write_eof():
- writer.write_eof()
- await writer.drain()
- # we need to wait a bit before removing the stream to ensure
- # that all snapclients have consumed the audio
- # https://github.com/music-assistant/hass-music-assistant/issues/1962
- await asyncio.sleep(30)
+ await stream_job.stream_to_custom_output_path(
+ player_id, pcm_format, f"tcp://{host}:{port}"
+ )
+ # we need to wait a bit for the stream status to become idle
+ # to ensure that all snapclients have consumed the audio
+ await self.mass.players.wait_for_state(player, PlayerState.IDLE)
finally:
- if not writer.is_closing():
- writer.close()
+ self.logger.debug("Finished streaming to %s", stream_path)
+ # there is no way to unsubscribe the callback so we do this nasty hack
+ stream._callback_func = None
await self._snapserver.stream_remove_stream(stream.identifier)
- self.logger.debug("Closed connection to %s:%s", host, port)
# start streaming the queue (pcm) audio in a background task
self._stream_tasks[player_id] = asyncio.create_task(_streamer())
+ if not queue_item.queue_id.startswith(UGP_PREFIX):
+ stream_job.start()
def _get_snapgroup(self, player_id: str) -> Snapgroup:
"""Get snapcast group for given player_id."""
raise RuntimeError("Snapserver is already started!")
logger = self.logger.getChild("snapserver")
logger.info("Starting builtin Snapserver...")
+ # register the snapcast mdns services
+ for name, port in (
+ ("-http", 1780),
+ ("-jsonrpc", 1705),
+ ("-stream", 1704),
+ ("-tcp", 1705),
+ ("", 1704),
+ ):
+ zeroconf_type = f"_snapcast{name}._tcp.local."
+ try:
+ info = AsyncServiceInfo(
+ zeroconf_type,
+ name=f"Snapcast.{zeroconf_type}",
+ properties={"is_mass": "true"},
+ addresses=[await get_ip_pton(self.mass.webserver.publish_ip)],
+ port=port,
+ server=f"{socket.gethostname()}",
+ )
+ attr_name = f"zc_service_set{name}"
+ if getattr(self, attr_name, None):
+ await self.mass.aiozc.async_update_service(info)
+ else:
+ await self.mass.aiozc.async_register_service(info, strict=False)
+ setattr(self, attr_name, True)
+ except NonUniqueNameException:
+ self.logger.debug(
+ "Could not register mdns record for %s as its already in use", zeroconf_type
+ )
+ except Exception as err:
+ self.logger.exception(
+ "Could not register mdns record for %s: %s", zeroconf_type, str(err)
+ )
async with AsyncProcess(
["snapserver"], enable_stdin=False, enable_stdout=True, enable_stderr=False
) as snapserver_proc:
"domain": "snapcast",
"name": "Snapcast",
"description": "Support for snapcast server and clients.",
- "codeowners": ["@SantigoSotoC"],
- "requirements": ["snapcast-mod==2.4.3"],
+ "codeowners": [
+ "@SantigoSotoC"
+ ],
+ "requirements": [
+ "snapcast==2.3.6"
+ ],
"documentation": "https://music-assistant.io/player-support/snapcast/",
"multi_instance": false,
"builtin": false,
from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
+from music_assistant.server.providers.ugp import UGP_PREFIX
from .player import SonosPlayer
queue_item: QueueItem,
) -> None:
"""Handle PLAY MEDIA on given player."""
- url = await self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.FLAC,
- )
sonos_player = self.sonosplayers[player_id]
mass_player = self.mass.players.get(player_id)
if sonos_player.sync_coordinator:
"accept play_media command, it is synced to another player."
)
raise PlayerCommandFailed(msg)
- await self.mass.create_task(
+
+ is_flow_stream = queue_item.queue_item_id == "flow" or queue_item.queue_id.startswith(
+ UGP_PREFIX
+ )
+ url = self.mass.streams.resolve_stream_url(
+ player_id, queue_item=queue_item, output_codec=ContentType.FLAC
+ )
+ self.mass.create_task(
sonos_player.soco.play_uri,
url,
- meta=create_didl_metadata(self.mass, url, queue_item),
+ meta=create_didl_metadata(self.mass, url, None if is_flow_stream else queue_item),
)
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
This will NOT be called if flow mode is enabled on the queue.
"""
sonos_player = self.sonosplayers[player_id]
- url = await self.mass.streams.resolve_stream_url(
+ url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
# retry with ap-port set to invalid value, which will force fallback
args += ["--ap-port", "12345"]
async with AsyncProcess(args) as librespot_proc:
- async for chunk in librespot_proc.iter_any(64000):
+ async for chunk in librespot_proc.iter_any():
yield chunk
self._ap_workaround = True
)
from music_assistant.common.models.enums import (
ConfigEntryType,
+ ContentType,
PlayerFeature,
PlayerState,
PlayerType,
ProviderFeature,
)
+from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
+from music_assistant.server.controllers.streams import (
+ FLOW_DEFAULT_BIT_DEPTH,
+ FLOW_DEFAULT_SAMPLE_RATE,
+)
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
if member.state == PlayerState.IDLE:
continue
tg.create_task(self.mass.players.cmd_stop(member.player_id))
+ if existing := self.mass.streams.stream_jobs.pop(player_id, None):
+ existing.stop()
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
await self.cmd_power(player_id, True)
group_player = self.mass.players.get(player_id)
+ await self.cmd_stop(player_id)
+
# create a multi-client stream job - all (direct) child's of this UGP group
# will subscribe to this multi client queue stream
- stream_job = await self.mass.streams.create_multi_client_stream_job(
- player_id,
- start_queue_item=queue_item,
+ pcm_format = AudioFormat(
+ content_type=ContentType.from_bit_depth(FLOW_DEFAULT_BIT_DEPTH),
+ sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
+ bit_depth=FLOW_DEFAULT_BIT_DEPTH,
+ )
+ queue = self.mass.player_queues.get(player_id)
+ stream_job = self.mass.streams.create_stream_job(
+ queue.queue_id,
+ pcm_audio_source=self.mass.streams.get_flow_stream(
+ queue=queue, start_queue_item=queue_item, pcm_format=pcm_format
+ ),
+ pcm_format=pcm_format,
)
# create a fake queue item to forward to downstream play_media commands
ugp_queue_item = QueueItem(
CONFIGURABLE_CORE_CONTROLLERS,
MIN_SCHEMA_VERSION,
ROOT_LOGGER_NAME,
+ VERBOSE_LOG_LEVEL,
)
from music_assistant.server.controllers.cache import CacheController
from music_assistant.server.controllers.config import ConfigController
await info.async_request(zeroconf, 3000)
await prov.on_mdns_service_state_change(name, state_change, info)
- LOGGER.debug(f"Service {name} of type {service_type} state changed: {state_change}")
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "Service %s of type %s state changed: %s",
+ name,
+ service_type,
+ state_change,
+ )
for prov in self._providers.values():
if not prov.manifest.mdns_discovery:
continue
) -> bool | None:
"""Exit context manager."""
await self.stop()
- if exc_val:
- raise exc_val
- return exc_type
async def _update_available_providers_cache(self) -> None:
"""Update the global cache variable of loaded/available providers."""
python-slugify==8.0.4
radios==0.3.0
shortuuid==1.0.12
-snapcast-mod==2.4.3
+snapcast==2.3.6
soco==0.30.2
sonos-websocket==0.1.3
tidalapi==0.7.4