"dockerfile": "Dockerfile",
"context": ".."
},
- "features": {
- },
+ "features": {},
"postCreateCommand": "./.devcontainer/post-create.sh",
- "forwardPorts": [
- 8095
- ]
+ "forwardPorts": [8095, 3483, 9000, 9090],
+ "runArgs": ["--network=host"]
}
"configurations": [
{
"name": "Python: Module",
- "type": "python",
+ "type": "debugpy",
"request": "launch",
"module": "music_assistant",
"justMyCode": false,
"args":[
"--log-level", "debug"
],
- // "env": {"PYTHONDEVMODE": "1"}
+ "env": {"PYTHONDEVMODE": "1"}
}
]
}
RUN set -x \
# add bookworm backports repo
&& sh -c 'echo "deb http://deb.debian.org/debian bookworm-backports main" >> /etc/apt/sources.list' \
+ # add multimedia repo
+ && sh -c 'echo "Types: deb\nURIs: https://www.deb-multimedia.org\nSuites: stable\nComponents: main non-free\nSigned-By: /etc/apt/trusted.gpg.d/deb-multimedia-keyring.gpg" >> /etc/apt/sources.list.d/deb-multimedia.sources' \
+ && sh -c 'echo "Package: *\nPin: origin www.deb-multimedia.org\nPin-Priority: 1" >> /etc/apt/preferences.d/99deb-multimedia' \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
ca-certificates \
git \
wget \
tzdata \
- ffmpeg \
libsox-fmt-all \
libsox3 \
sox \
libjemalloc2 \
# install snapcast server 0.27 from bookworm backports
&& apt-get install -y --no-install-recommends -t bookworm-backports snapserver \
+ # install ffmpeg 6 from multimedia repo
+ && cd /tmp && curl -sLO https://www.deb-multimedia.org/pool/main/d/deb-multimedia-keyring/deb-multimedia-keyring_2016.8.1_all.deb \
+ && apt install -y /tmp/deb-multimedia-keyring_2016.8.1_all.deb \
+ && apt-get update \
+ && apt install -y -t 'o=Unofficial Multimedia Packages' ffmpeg \
# cleanup
&& rm -rf /tmp/* \
&& rm -rf /var/lib/apt/lists/*
CONF_CROSSFADE_DURATION,
CONF_OUTPUT_CHANNELS,
CONF_PUBLISH_IP,
+ ROOT_LOGGER_NAME,
SILENCE_FILE,
UGP_PREFIX,
VERBOSE_LOG_LEVEL,
)
-from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
"with libsoxr support" if libsoxr_support else "",
)
# copy log level to audio module
- AUDIO_LOGGER.setLevel(self.logger.level)
+ logging.getLogger(f"{ROOT_LOGGER_NAME}.audio").setLevel(self.logger.level)
# start the webserver
self.publish_port = config.get_value(CONF_BIND_PORT)
self.publish_ip = config.get_value(CONF_PUBLISH_IP)
queue.queue_id, CONF_CROSSFADE_DURATION, 8
)
crossfade_size = int(pcm_sample_size * crossfade_duration)
- buffer_size = int(pcm_sample_size * 5) # 5 seconds
+ buffer_size = int(pcm_sample_size * 2) # 2 seconds
if use_crossfade:
+ # buffer size needs to be big enough to include the crossfade part
buffer_size += crossfade_size
bytes_written = 0
buffer = b""
pcm_format.bit_depth,
pcm_format.sample_rate,
)
- # send crossfade_part
- yield crossfade_part
+ # send crossfade_part (as one big chunk)
bytes_written += len(crossfade_part)
+ yield crossfade_part
+
# also write the leftover bytes from the crossfade action
if remaining_bytes:
yield remaining_bytes
#### OTHER: enough data in buffer, feed to output
while len(buffer) > buffer_size:
- yield buffer[:pcm_sample_size]
- bytes_written += pcm_sample_size
+ subchunk = buffer[:pcm_sample_size]
buffer = buffer[pcm_sample_size:]
+ bytes_written += len(subchunk)
+ yield subchunk
+ del subchunk
#### HANDLE END OF TRACK
if last_fadeout_part:
bytes_written += len(remaining_bytes)
del remaining_bytes
else:
- # no crossfade enabled, just yield the (entire) buffer last part
- yield buffer
+ # no crossfade enabled, just yield the buffer last part
bytes_written += len(buffer)
+ yield buffer
+ del buffer
# update duration details based on the actual pcm data we sent
# this also accounts for crossfade and silence stripping
if last_fadeout_part:
yield last_fadeout_part
del last_fadeout_part
- del buffer
+
self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
async def get_announcement_stream(
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio or streamdetails.seek_position:
strip_silence_begin = False
- # chunk size = 2 seconds of pcm audio
- pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
- chunk_size = pcm_sample_size * (1 if is_radio else 2)
- expected_chunks = int((streamdetails.duration or 0) / 2)
+ # chunk size = 1 second of pcm audio
+ pcm_sample_size = pcm_format.pcm_sample_size
+ chunk_size = pcm_sample_size # chunk size = sample size (= 1 second)
+ expected_chunks = int(((streamdetails.duration or 0) * pcm_sample_size) / chunk_size)
if expected_chunks < 10:
strip_silence_end = False
extra_args += ["-ss", str(seek_pos)]
if streamdetails.target_loudness is not None:
# add loudnorm filters
- filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
+ filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=11:TP=-2"
if streamdetails.loudness:
filter_rule += f":measured_I={streamdetails.loudness.integrated}"
filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
input_path=input_path,
# loglevel info is needed for loudness measurement
loglevel="info",
+ extra_input_args=["-filter_threads", "1"],
)
async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]):
if music_prov := self.mass.get_provider(streamdetails.provider):
self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
- # cleanup
- del state_data
- del ffmpeg_proc
-
async with AsyncProcess(
ffmpeg_args,
enable_stdin=audio_source_iterator is not None,
# collect this chunk for next round
prev_chunk = chunk
-
- # we did not receive any data, somethinh wet wrong
+ # if we did not receive any data, something went (terribly) wrong
# raise here to prevent an endless loop elsewhere
if state_data["bytes_sent"] == 0:
raise AudioError(f"stream error on {streamdetails.uri}")
else:
final_chunk = prev_chunk
- # yield final chunk to output
+ # yield final chunk to output (in chunk_size parts)
+ while len(final_chunk) > chunk_size:
+ yield final_chunk[:chunk_size]
+ final_chunk = final_chunk[chunk_size:]
+ state_data["bytes_sent"] += len(final_chunk)
yield final_chunk
state_data["bytes_sent"] += len(final_chunk)
state_data["finished"].set()
from typing import TYPE_CHECKING
import aiofiles
-from aiohttp import ClientError, ClientResponseError, ClientTimeout
+from aiohttp import ClientResponseError, ClientTimeout
from music_assistant.common.helpers.global_cache import (
get_global_cache_value,
) -> AsyncGenerator[bytes, None]:
"""Get radio audio stream from HTTP, including metadata retrieval."""
resolved_url, supports_icy, is_hls = await resolve_radio_stream(mass, url)
- retries = 0
- while True:
- try:
- retries += 1
- if is_hls: # special HLS stream
- async for chunk in get_hls_stream(mass, resolved_url, streamdetails):
- yield chunk
- elif supports_icy: # http stream supports icy metadata
- async for chunk in get_icy_stream(mass, resolved_url, streamdetails):
- yield chunk
- else: # generic http stream (without icy metadata)
- async for chunk in get_http_stream(mass, resolved_url, streamdetails):
- yield chunk
- except ClientError:
- LOGGER.warning("Streaming radio %s failed, retrying...", streamdetails.uri)
- if retries >= 5:
- raise
- await asyncio.sleep(1 * retries)
+ # handle special HLS stream
+ if is_hls:
+ async for chunk in get_hls_stream(mass, resolved_url, streamdetails):
+ yield chunk
+ return
+ # handle http stream supports icy metadata
+ if supports_icy:
+ async for chunk in get_icy_stream(mass, resolved_url, streamdetails):
+ yield chunk
+ return
+ # generic http stream (without icy metadata)
+ async for chunk in get_http_stream(mass, resolved_url, streamdetails):
+ yield chunk
async def get_icy_stream(
"Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
)
- input_format = streamdetails.audio_format
- output_format = streamdetails.audio_format
if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
- output_format = AudioFormat(content_type=ContentType.FLAC)
try:
metadata_task = asyncio.create_task(watch_metadata())
async for chunk in get_ffmpeg_stream(
audio_input=substream_url,
- input_format=input_format,
- output_format=output_format,
+ input_format=streamdetails.audio_format,
+            # we need a self-describing codec but must not lose data from re-encoding
+ output_format=AudioFormat(content_type=ContentType.FLAC),
):
yield chunk
finally:
enable_stdout=True,
enable_stderr=False,
custom_stdin=audio_input if use_stdin else None,
- name="player_ffmpeg_stream",
+ name="ffmpeg_stream",
) as ffmpeg_proc:
# read final chunks from stdout
chunk_size = chunk_size or get_chunksize(output_format, 1)
input_path: str = "-",
output_path: str = "-",
loglevel: str = "info",
+ extra_input_args: list[str] | None = None,
) -> list[str]:
"""Collect all args to send to the ffmpeg process."""
if extra_args is None:
"-ignore_unknown",
"-protocol_whitelist",
"file,http,https,tcp,tls,crypto,pipe,data,fd",
+ "-filter_complex_threads",
+ "1",
]
# collect input args
- input_args = [
- "-ac",
- str(input_format.channels),
- "-channel_layout",
- "mono" if input_format.channels == 1 else "stereo",
- ]
- if input_format.content_type.is_pcm():
- input_args += ["-ar", str(input_format.sample_rate)]
+ input_args = []
+ if extra_input_args:
+ input_args += extra_input_args
if input_path.startswith("http"):
# append reconnect options for direct stream from http
input_args += [
"-reconnect_on_http_error",
"5xx",
]
- if input_format.content_type != ContentType.UNKNOWN:
- input_args += ["-f", input_format.content_type.value]
+ if input_format.content_type.is_pcm():
+ input_args += [
+ "-ac",
+ str(input_format.channels),
+ "-channel_layout",
+ "mono" if input_format.channels == 1 else "stereo",
+ "-ar",
+ str(input_format.sample_rate),
+ "-acodec",
+ input_format.content_type.name.lower(),
+ "-f",
+ input_format.content_type.value,
+ ]
input_args += ["-i", input_path]
# collect output args
if output_path.upper() == "NULL":
output_args = ["-f", "null", "-"]
- elif output_format.content_type == ContentType.UNKNOWN:
- output_args = [output_path]
else:
output_args = [
"-acodec",
self._custom_stdin = None
self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin)))
self._custom_stdout = custom_stdout
- self._stderr_locked = asyncio.Lock()
@property
def closed(self) -> bool:
return self._returncode
if self.proc is None:
return None
- return self.proc.returncode
+ if (ret_code := self.proc.returncode) is not None:
+ self._returncode = ret_code
+ return ret_code
async def __aenter__(self) -> AsyncProcess:
"""Enter context manager."""
exc_tb: TracebackType | None,
) -> bool | None:
"""Exit context manager."""
- await self.close()
+ # send interrupt signal to process when we're cancelled
+ await self.close(send_signal=exc_type in (GeneratorExit, asyncio.CancelledError))
self._returncode = self.returncode
async def start(self) -> None:
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
- while True:
+ while self.returncode is None:
chunk = await self.readexactly(n)
if len(chunk) == 0:
break
async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks as they come in from process stdout."""
- while True:
+ while self.returncode is None:
chunk = await self.read(n)
if len(chunk) == 0:
break
async def readexactly(self, n: int) -> bytes:
"""Read exactly n bytes from the process stdout (or less if eof)."""
+ if not self.proc.stdout or self.proc.stdout.at_eof():
+ return b""
try:
return await self.proc.stdout.readexactly(n)
except asyncio.IncompleteReadError as err:
and may return less or equal bytes than requested, but at least one byte.
If EOF was received before any byte is read, this function returns empty byte object.
"""
+ if not self.proc.stdout or self.proc.stdout.at_eof():
+ return b""
return await self.proc.stdout.read(n)
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
- if self._close_called or self.proc.stdin.is_closing():
+ if self.returncode is not None or self.proc.stdin.is_closing():
raise asyncio.CancelledError("write called while process already done")
self.proc.stdin.write(data)
with suppress(BrokenPipeError, ConnectionResetError):
async def write_eof(self) -> None:
"""Write end of file to to process stdin."""
+ if self.returncode is not None or self.proc.stdin.is_closing():
+ return
try:
if self.proc.stdin.can_write_eof():
self.proc.stdin.write_eof()
# already exited, race condition
pass
- async def close(self) -> int:
+ async def close(self, send_signal: bool = False) -> int:
"""Close/terminate the process and wait for exit."""
self._close_called = True
# close any/all attached (writer) tasks
task.cancel()
with suppress(asyncio.CancelledError):
await task
-
- if self.proc.returncode is None:
- # always first try to send sigint signal to try clean shutdown
- # for example ffmpeg needs this to cleanly shutdown and not lock on pipes
+ if send_signal and self.returncode is None:
self.proc.send_signal(SIGINT)
- # allow the process a little bit of time to respond to the signal
- await asyncio.sleep(0.1)
+ # allow the process a bit of time to respond to the signal before we go nuclear
+ await asyncio.sleep(0.5)
- # send communicate until we exited
- while self.proc.returncode is None:
- # make sure the process is really cleaned up.
- # especially with pipes this can cause deadlocks if not properly guarded
- # we need to use communicate to ensure buffers are flushed
- # we do that with sending communicate
- if self._enable_stdin and not self.proc.stdin.is_closing():
- self.proc.stdin.close()
+ # make sure the process is really cleaned up.
+ # especially with pipes this can cause deadlocks if not properly guarded
+ # we need to ensure stdout and stderr are flushed and stdin closed
+ while self.returncode is None:
try:
- if self.proc.stdout and self._stderr_locked.locked():
- await asyncio.wait_for(self.proc.stdout.read(), 5)
- else:
- await asyncio.wait_for(self.proc.communicate(), 5)
+ async with asyncio.timeout(30):
+ # abort existing readers on stderr/stdout first before we send communicate
+ if self.proc.stdout and self.proc.stdout._waiter is not None:
+ self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
+ self.proc.stdout._waiter = None
+ if self.proc.stderr and self.proc.stderr._waiter is not None:
+ self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
+ self.proc.stderr._waiter = None
+ # use communicate to flush all pipe buffers
+ await self.proc.communicate()
except TimeoutError:
LOGGER.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
"Process %s with PID %s stopped with returncode %s",
self._name,
self.proc.pid,
- self.proc.returncode,
+ self.returncode,
)
- return self.proc.returncode
+ return self.returncode
async def wait(self) -> int:
"""Wait for the process and return the returncode."""
if self.returncode is not None:
return self.returncode
- return await self.proc.wait()
+ self._returncode = await self.proc.wait()
+ return self._returncode
async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
"""Write bytes to process and read back results."""
stdout, stderr = await self.proc.communicate(input_data)
+ self._returncode = self.proc.returncode
return (stdout, stderr)
async def iter_stderr(self) -> AsyncGenerator[bytes, None]:
"""Iterate lines from the stderr stream."""
- while not self.closed:
+ while self.returncode is None:
+ if self.proc.stderr.at_eof():
+ break
try:
- async with self._stderr_locked:
- yield await self.proc.stderr.readline()
+ yield await self.proc.stderr.readline()
+ if self.proc.stderr.at_eof():
+ break
except ValueError as err:
# we're waiting for a line (separator found), but the line was too big
# this may happen with ffmpeg during a long (radio) stream where progress
)
if log_level == "GLOBAL":
self.logger.setLevel(mass_logger.level)
- elif logging.getLogger().level > self.logger.level:
+ else:
+ self.logger.setLevel(log_level)
+ if logging.getLogger().level > self.logger.level:
# if the root logger's level is higher, we need to adjust that too
logging.getLogger().setLevel(self.logger.level)
CONF_VOLUME_START = "volume_start"
CONF_PASSWORD = "password"
-REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 10 # 10 seconds
+# the output buffer to raop must be big enough to prevent small hiccups
+REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 3  # 3 seconds
PLAYER_CONFIG_ENTRIES = (
self.active_remote_id: str = str(randint(1000, 8000))
self.start_ntp: int | None = None # use as checksum
self.prevent_playback: bool = False
+ self.running = True
self._log_reader_task: asyncio.Task | None = None
self._audio_reader_task: asyncio.Task | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
- self._stop_requested = False
-
- @property
- def running(self) -> bool:
- """Return bool if we're running."""
- return (
- not self._stop_requested
- and self._cliraop_proc
- and self._cliraop_proc.returncode is None
- )
+ self._buffer = asyncio.Queue(10)
async def start(self, start_ntp: int) -> None:
"""Initialize CLIRaop process for a player."""
"-port",
str(self.airplay_player.discovery_info.port),
"-wait",
- str(2000 - sync_adjust),
+ str(2500 - sync_adjust),
"-volume",
str(mass_player.volume_level),
*extra_args,
# one could argue that the intermediate ffmpeg towards cliraop is not needed
# when there are no player specific filters or extras but in this case
# ffmpeg serves as a small buffer towards the realtime cliraop streamer
+ read, write = os.pipe()
+
+ async def read_from_buffer() -> AsyncGenerator[bytes, None]:
+ while True:
+ next_chunk = await self._buffer.get()
+ if not next_chunk:
+ break
+ yield next_chunk
+ del next_chunk
+
ffmpeg_args = get_ffmpeg_args(
input_format=self.input_format,
output_format=AIRPLAY_PCM_FORMAT,
enable_stdin=True,
enable_stdout=True,
enable_stderr=False,
+ custom_stdin=read_from_buffer(),
+ custom_stdout=write,
name="cliraop_ffmpeg",
)
await self._ffmpeg_proc.start()
+ os.close(write)
+
self._cliraop_proc = AsyncProcess(
cliraop_args,
enable_stdin=True,
enable_stdout=False,
enable_stderr=True,
- custom_stdin=self._audio_feeder(),
+ custom_stdin=read,
name="cliraop",
)
await self._cliraop_proc.start()
+ os.close(read)
self._log_reader_task = asyncio.create_task(self._log_watcher())
async def stop(self, wait: bool = True):
"""Stop playback and cleanup."""
- if self._cliraop_proc.closed and self._ffmpeg_proc.closed:
- return
- self._stop_requested = True
+ self.running = False
async def _stop() -> None:
# ffmpeg MUST be stopped before cliraop due to the chained pipes
- await self._ffmpeg_proc.close()
+ if not self._ffmpeg_proc.closed:
+ await self._ffmpeg_proc.close(True)
# allow the cliraop process to stop gracefully first
- await self.send_cli_command("ACTION=STOP")
- with suppress(TimeoutError):
- await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+ if not self._cliraop_proc.closed:
+ await self.send_cli_command("ACTION=STOP")
+ with suppress(TimeoutError):
+ await asyncio.wait_for(self._cliraop_proc.wait(), 5)
# send regular close anyway (which also logs the returncode)
- await self._cliraop_proc.close()
+ await self._cliraop_proc.close(True)
task = self.mass.create_task(_stop())
if wait:
async def write_chunk(self, chunk: bytes) -> None:
"""Write a (pcm) audio chunk to ffmpeg."""
- await self._ffmpeg_proc.write(chunk)
+ await self._buffer.put(chunk)
async def write_eof(self) -> None:
"""Write EOF to the ffmpeg stdin."""
- await self._ffmpeg_proc.write_eof()
- await self._ffmpeg_proc.wait()
- await self.stop()
+ if not self.running:
+ return
+ await self._buffer.put(b"")
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
self.mass.players.update(airplay_player.player_id)
if "lost packet out of backlog" in line:
lost_packets += 1
- if lost_packets == 100:
+ if lost_packets == 50:
logger.error("High packet loss detected, stopping playback...")
await self.stop(False)
elif lost_packets % 10 == 0:
logger.log(VERBOSE_LOG_LEVEL, line)
# if we reach this point, the process exited
+ self.running = False
if airplay_player.active_stream == self:
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
# ensure we're cleaned up afterwards
- await self.stop()
+ if self._ffmpeg_proc.returncode is None or self._cliraop_proc.returncode is None:
+ await self.stop()
async def _send_metadata(self, queue: PlayerQueue) -> None:
"""Send metadata to player (and connected sync childs)."""
- if not self.running:
- return
if not queue or not queue.current_item:
return
duration = min(queue.current_item.duration or 0, 3600)
async def _send_progress(self, queue: PlayerQueue) -> None:
"""Send progress report to player (and connected sync childs)."""
- if not self.running:
- return
if not queue or not queue.current_item:
return
progress = int(queue.corrected_elapsed_time)
await self.send_cli_command(f"PROGRESS={progress}\n")
- async def _audio_feeder(self) -> AsyncGenerator[bytes, None]:
- """Read chunks from ffmpeg and feed (buffered) to cliraop."""
- buffer = b""
- async for chunk in self._ffmpeg_proc.iter_any():
- if self._stop_requested:
- break
- buffer += chunk
- chunksize = len(chunk)
- del chunk
- while len(buffer) > REQUIRED_BUFFER:
- yield buffer[:chunksize]
- buffer = buffer[chunksize:]
- # end of stream
- if not self._stop_requested:
- yield buffer
- await self._cliraop_proc.write_eof()
- del buffer
-
@dataclass
class AirPlayPlayer:
start_ntp = int(stdout.strip())
# setup Raop process for player and its sync childs
- async with asyncio.TaskGroup() as tg:
- for airplay_player in self._get_sync_clients(player_id):
- airplay_player.active_stream = AirplayStream(
- self, airplay_player, input_format=input_format
- )
- tg.create_task(airplay_player.active_stream.start(start_ntp))
+ for airplay_player in self._get_sync_clients(player_id):
+ airplay_player.active_stream = AirplayStream(
+ self, airplay_player, input_format=input_format
+ )
+ self.mass.create_task(airplay_player.active_stream.start(start_ntp))
async for chunk in audio_source:
active_clients = 0
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
- if not (airplay_player.active_stream and airplay_player.active_stream.running):
+ if not airplay_player.active_stream or not airplay_player.active_stream.running:
# player stopped or switched to a new stream
continue
if airplay_player.active_stream.start_ntp != start_ntp:
if active_clients == 0:
# no more clients
return
- # entire stream consumed: send EOF
+ # entire stream consumed: send EOF (empty chunk)
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
- if (
- not airplay_player.active_stream
- or airplay_player.active_stream.start_ntp != start_ntp
- ):
- continue
tg.create_task(airplay_player.active_stream.write_eof())
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
input_format = DEFAULT_SNAPCAST_FORMAT
audio_source = self.mass.streams.get_announcement_stream(
queue_item.streamdetails.data["url"],
- pcm_format=DEFAULT_SNAPCAST_FORMAT,
+ output_format=DEFAULT_SNAPCAST_FORMAT,
use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
)
else:
try:
retries += 1
if not tokeninfo:
- async with asyncio.timeout(5):
+ async with asyncio.timeout(10):
tokeninfo = await self._get_token()
if tokeninfo and not userinfo:
- async with asyncio.timeout(5):
+ async with asyncio.timeout(10):
userinfo = await self._get_data("me", tokeninfo=tokeninfo)
if tokeninfo and userinfo:
# we have all info we need!
]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- librespot = await asyncio.create_subprocess_exec(
- *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
- )
- stdout, _ = await librespot.communicate()
+ async with AsyncProcess(args, enable_stdout=True) as librespot:
+ stdout = await librespot.read(-1)
if stdout.decode().strip() != "authorized":
raise LoginFailed(f"Login failed for username {self.config.get_value(CONF_USERNAME)}")
# get token with (authorized) librespot
]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- librespot = await asyncio.create_subprocess_exec(
- *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
- )
- stdout, _ = await librespot.communicate()
+ async with AsyncProcess(args, enable_stdout=True) as librespot:
+ stdout = await librespot.read(-1)
duration = round(time.time() - time_start, 2)
try:
result = json.loads(stdout)
media_item = Radio(
item_id=url,
provider=self.domain,
- name=media_info.get("icy-name") or media_info.title,
+ name=media_info.get("icy-name") or url,
provider_mappings=provider_mappings,
)
else:
media_item = Track(
item_id=url,
provider=self.domain,
- name=media_info.title,
+ name=media_info.title or url,
duration=int(media_info.duration or 0),
artists=[await self.get_artist(artist) for artist in media_info.artists],
provider_mappings=provider_mappings,
if self.closing:
return
- if LOGGER.isEnabledFor(logging.DEBUG) and event != EventType.QUEUE_TIME_UPDATED:
+ if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
# do not log queue time updated events because that is too chatty
- LOGGER.getChild("event").debug("%s %s", event.value, object_id or "")
+ LOGGER.getChild("event").log(VERBOSE_LOG_LEVEL, "%s %s", event.value, object_id or "")
event_obj = MassEvent(event=event, object_id=object_id, data=data)
for cb_func, event_filter, id_filter in self._subscribers:
*args: Any,
task_id: str | None = None,
**kwargs: Any,
- ) -> asyncio.Task | asyncio.Future:
+ ) -> asyncio.TimerHandle:
"""Run callable/awaitable after given delay."""
def _create_task() -> None:
self.create_task(target, *args, task_id=task_id, **kwargs)
- self.loop.call_later(delay, _create_task)
+ return self.loop.call_later(delay, _create_task)
def get_task(self, task_id: str) -> asyncio.Task | asyncio.Future:
"""Get existing scheduled task."""