"""Return the PCM sample size."""
return int(self.sample_rate * (self.bit_depth / 8) * self.channels)
+ def __eq__(self, other: AudioFormat) -> bool:
+ """Check equality of two items."""
+ if not other:
+ return False
+ return self.output_format_str == other.output_format_str
+
@dataclass(frozen=True, kw_only=True)
class ProviderMapping(DataClassDictMixin):
try:
# Check if the QueueItem is playable. For example, YT Music returns Radio Items
# that are not playable which will stop playback.
- await get_stream_details(mass=self.mass, queue_item=next_item)
+ next_item.streamdetails = await get_stream_details(
+ mass=self.mass, queue_item=next_item
+ )
# Lazy load the full MediaItem for the QueueItem, making sure to get the
# maximum quality of thumbs
next_item.media_item = await self.mass.music.get_item_by_uri(next_item.uri)
if group_player.powered == power:
return # nothing to do
+ # make sure to update the group power state
+ group_player.powered = power
# always stop (group/master)player at power off
if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
await self.cmd_stop(player_id)
async with asyncio.TaskGroup() as tg:
- members_powered = False
+ any_member_powered = False
for member in self.iter_group_members(group_player, only_powered=True):
- members_powered = True
+ any_member_powered = True
if power:
# set active source to group player if the group (is going to be) powered
member.active_source = group_player.player_id
- elif member.active_source == group_player.player_id:
+ else:
# turn off child player when group turns off
tg.create_task(self.cmd_power(member.player_id, False))
member.active_source = None
# edge case: group turned on but no members are powered, power them all!
- if not members_powered and power:
+ if not any_member_powered and power:
for member in self.iter_group_members(group_player, only_powered=False):
tg.create_task(self.cmd_power(member.player_id, True))
member.active_source = group_player.player_id
import time
import urllib.parse
from collections.abc import AsyncGenerator
-from contextlib import suppress
from typing import TYPE_CHECKING
import shortuuid
from aiohttp import web
-from music_assistant.common.helpers.util import get_ip, select_free_port
+from music_assistant.common.helpers.util import empty_queue, get_ip, select_free_port
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueOption,
CONF_BIND_PORT,
CONF_CROSSFADE,
CONF_CROSSFADE_DURATION,
- CONF_EQ_BASS,
- CONF_EQ_MID,
- CONF_EQ_TREBLE,
CONF_OUTPUT_CHANNELS,
CONF_PUBLISH_IP,
SILENCE_FILE,
from music_assistant.server.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
+ get_ffmpeg_stream,
get_media_stream,
+ get_player_filter_params,
get_stream_details,
)
-from music_assistant.server.helpers.process import AsyncProcess
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
from music_assistant.server.models.core_controller import CoreController
from music_assistant.common.models.player import Player
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
+ from music_assistant.server import MusicAssistant
DEFAULT_STREAM_HEADERS = {
"icy-name": "Music Assistant",
"icy-pub": "0",
}
-FLOW_MAX_SAMPLE_RATE = 192000
+FLOW_MAX_SAMPLE_RATE = 96000
FLOW_MAX_BIT_DEPTH = 24
# pylint:disable=too-many-locals
-class MultiClientStreamJob:
+class MultiClientQueueStreamJob:
"""Representation of a (multiclient) Audio Queue stream job/task.
- The whole idea here is that in case of a player (sync)group,
- all client players receive the exact same (PCM) audio chunks from the source audio.
+ The whole idea here is that the queue stream audio can be sent to multiple
+ players at once. For example for (slimproto/airplay) syncgroups and universal group.
+ all client players receive the exact same audio chunks from the source audio,
+ encoded and/or resampled to the player's preferences.
A StreamJob is tied to a Queue and streams the queue flow stream,
- In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created.
+ In case a stream is restarted (e.g. when seeking),
+ a new MultiClientQueueStreamJob will be created.
"""
_audio_task: asyncio.Task | None = None
def __init__(
self,
- stream_controller: StreamsController,
- queue_id: str,
+ mass: MusicAssistant,
+ pcm_audio_source: AsyncGenerator[bytes, None],
pcm_format: AudioFormat,
- start_queue_item: QueueItem,
- seek_position: int = 0,
- fade_in: bool = False,
+ expected_players: set[str],
+ auto_start: bool = True,
) -> None:
- """Initialize MultiClientStreamJob instance."""
- self.stream_controller = stream_controller
- self.queue_id = queue_id
- self.queue = self.stream_controller.mass.player_queues.get(queue_id)
- assert self.queue # just in case
+ """Initialize MultiClientQueueStreamJob instance."""
+ self.mass = mass
+ self.pcm_audio_source = pcm_audio_source
self.pcm_format = pcm_format
- self.start_queue_item = start_queue_item
- self.seek_position = seek_position
- self.fade_in = fade_in
+ self.expected_players = expected_players
self.job_id = shortuuid.uuid()
- self.expected_players: set[str] = set()
- self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
+ self.auto_start = auto_start
self.bytes_streamed: int = 0
- self._all_clients_connected = asyncio.Event()
- self.logger = stream_controller.logger.getChild(f"streamjob_{self.job_id}")
- self._finished: bool = False
- self.workaround_players_seen: set[str] = set()
- # start running the audio task in the background
- self._audio_task = asyncio.create_task(self._stream_job_runner())
+ self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}")
+ self._subscribed_players: dict[str, asyncio.Queue] = {}
+ self._finished = asyncio.Event()
+ self._audio_task: asyncio.Task | None = None
@property
def finished(self) -> bool:
"""Return if this StreamJob is finished."""
- return self._finished or self._audio_task and self._audio_task.done()
+ return self._finished.is_set() or self._audio_task and self._audio_task.done()
@property
def pending(self) -> bool:
"""Return if this Job is pending start."""
- return not self.finished and not self._all_clients_connected.is_set()
+ return not self.finished and not self._audio_task
@property
def running(self) -> bool:
"""Return if this Job is running."""
- return not self.finished and not self.pending
+ return self._audio_task and not self._audio_task.done()
+
+ def start(self) -> None:
+ """Start running (send audio chunks to connected players)."""
+ if self.running:
+ return
+ if self.finished:
+ raise RuntimeError("Task is already finished")
+ self.logger.debug(
+ "Starting multi client stream job %s with %s out of %s connected clients",
+ self.job_id,
+ len(self._subscribed_players),
+ len(self.expected_players),
+ )
+ self._audio_task = asyncio.create_task(self._stream_job_runner())
def stop(self) -> None:
"""Stop running this job."""
- self._finished = True
if self._audio_task and self._audio_task.done():
return
if self._audio_task:
self._audio_task.cancel()
- for sub_queue in self.subscribed_players.values():
- with suppress(asyncio.QueueFull):
- sub_queue.put_nowait(b"EOF")
+ self._finished.set()
def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
"""Resolve the childplayer specific stream URL to this streamjob."""
fmt = output_codec.value
# handle raw pcm
if output_codec.is_pcm():
- player = self.stream_controller.mass.players.get(child_player_id)
+ player = self.mass.streams.mass.players.get(child_player_id)
player_max_bit_depth = 24 if player.supports_24bit else 16
output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
- output_channels = self.stream_controller.mass.config.get_raw_player_config_value(
+ output_channels = self.mass.config.get_raw_player_config_value(
child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
)
channels = 1 if output_channels != "stereo" else 2
f";codec=pcm;rate={output_sample_rate};"
f"bitrate={output_bit_depth};channels={channels}"
)
- url = f"{self.stream_controller._server.base_url}/{self.queue_id}/multi/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}" # noqa: E501
+ url = f"{self.mass.streams._server.base_url}/multi/{self.job_id}/{child_player_id}.{fmt}"
self.expected_players.add(child_player_id)
return url
- async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
- """Subscribe consumer and iterate incoming chunks on the queue."""
- if (
- player_id in self.stream_controller.workaround_players
- and player_id not in self.workaround_players_seen
+ async def subscribe(
+ self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None
+ ) -> AsyncGenerator[bytes, None]:
+ """Subscribe consumer and iterate chunks on the queue encoded to given output format."""
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self._subscribe_pcm(player_id),
+ input_format=self.pcm_format,
+ output_format=output_format,
+ filter_params=get_player_filter_params(self.mass, player_id),
+ chunk_size=chunk_size,
):
- self.workaround_players_seen.add(player_id)
- yield b"EOF"
- return
+ yield chunk
+ async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]:
+ """Subscribe consumer and iterate incoming (raw pcm) chunks on the queue."""
try:
- self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
-
- if self._all_clients_connected.is_set():
- # client subscribes while we're already started - we dont support that (for now?)
- msg = f"Client {player_id} is joining while the stream is already started"
- raise RuntimeError(msg)
- self.logger.debug("Subscribed client %s", player_id)
+ self._subscribed_players[player_id] = queue = asyncio.Queue(1)
+
+ if self.running:
+ # client subscribes while we're already started
+ # that will probably cause side effects but let it go
+ self.logger.warning(
+ "Player %s is joining while the stream is already started!", player_id
+ )
+ else:
+ self.logger.debug("Subscribed player %s", player_id)
- if len(self.subscribed_players) == len(self.expected_players):
+ await asyncio.sleep(0.2) # debounce
+ if (
+ self.auto_start
+ and not self.running
+ and len(self._subscribed_players) == len(self.expected_players)
+ ):
# we reached the number of expected subscribers, set event
# so that chunks can be pushed
- self._all_clients_connected.set()
-
- # keep reading audio chunks from the queue until we receive an EOF chunk
- while True:
- chunk = await sub_queue.get()
- if chunk == b"EOF":
- # EOF chunk received
- break
- yield chunk
+ self.start()
+ # yield from queue until finished
+ while not self._finished.is_set():
+ yield await queue.get()
finally:
- self.subscribed_players.pop(player_id, None)
+ if sub_queue := self._subscribed_players.pop(player_id, None):
+ empty_queue(sub_queue)
self.logger.debug("Unsubscribed client %s", player_id)
# check if this was the last subscriber and we should cancel
await asyncio.sleep(2)
- if len(self.subscribed_players) == 0 and self._audio_task and not self.finished:
+ if len(self._subscribed_players) == 0 and not self.finished:
self.logger.debug("Cleaning up, all clients disappeared...")
- self._audio_task.cancel()
-
- async def _put_chunk(self, chunk: bytes) -> None:
- """Put chunk of data to all subscribers."""
- async with asyncio.TaskGroup() as tg:
- for sub_queue in list(self.subscribed_players.values()):
- # put this chunk on the player's subqueue
- tg.create_task(sub_queue.put(chunk))
- self.bytes_streamed += len(chunk)
+ self.stop()
async def _stream_job_runner(self) -> None:
"""Feed audio chunks to StreamJob subscribers."""
- chunk_num = 0
- async for chunk in self.stream_controller.get_flow_stream(
- self.queue,
- self.start_queue_item,
- self.pcm_format,
- self.seek_position,
- self.fade_in,
- ):
- chunk_num += 1
- if chunk_num == 1:
- # wait until all expected clients are connected
- try:
- async with asyncio.timeout(10):
- await self._all_clients_connected.wait()
- except TimeoutError:
- if len(self.subscribed_players) == 0:
- self.stream_controller.logger.exception(
- "Abort multi client stream job for queue %s: "
- "clients did not connect within timeout",
- self.queue.display_name,
- )
- break
- # not all clients connected but timeout expired, set flag and move on
- # with all clients that did connect
- self._all_clients_connected.set()
- else:
- self.stream_controller.logger.debug(
- "Starting multi client stream job for queue %s "
- "with %s out of %s connected clients",
- self.queue.display_name,
- len(self.subscribed_players),
- len(self.expected_players),
- )
-
- await self._put_chunk(chunk)
-
- # mark EOF with EOF chunk
- await self._put_chunk(b"EOF")
+ async for chunk in self.pcm_audio_source:
+ async with asyncio.TaskGroup() as tg:
+ for listener_queue in list(self._subscribed_players.values()):
+ tg.create_task(listener_queue.put(chunk))
+ self._finished.set()
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
"""Initialize instance."""
super().__init__(*args, **kwargs)
self._server = Webserver(self.logger, enable_dynamic_routes=True)
- self.multi_client_jobs: dict[str, MultiClientStreamJob] = {}
+ self.multi_client_jobs: dict[str, MultiClientQueueStreamJob] = {}
self.register_dynamic_route = self._server.register_dynamic_route
self.unregister_dynamic_route = self._server.unregister_dynamic_route
self.manifest.name = "Streamserver"
"some player specific local control callbacks."
)
self.manifest.icon = "cast-audio"
- self.workaround_players: set[str] = set()
@property
def base_url(self) -> str:
static_routes=[
(
"*",
- "/{queue_id}/multi/{job_id}/{player_id}/{queue_item_id}.{fmt}",
+ "/multi/{job_id}/{player_id}.{fmt}",
self.serve_multi_subscriber_stream,
),
(
"*",
- "/{queue_id}/flow/{queue_item_id}.{fmt}",
+ "/flow/{queue_id}/{queue_item_id}.{fmt}",
self.serve_queue_flow_stream,
),
(
"*",
- "/{queue_id}/single/{queue_item_id}.{fmt}",
+ "/single/{queue_id}/{queue_item_id}.{fmt}",
self.serve_queue_item_stream,
),
(
"*",
- "/{queue_id}/command/{command}.mp3",
+ "/command/{queue_id}/{command}.mp3",
self.serve_command_request,
),
],
async def resolve_stream_url(
self,
+ player_id: str,
queue_item: QueueItem,
output_codec: ContentType,
seek_position: int = 0,
) -> str:
"""Resolve the stream URL for the given QueueItem."""
fmt = output_codec.value
+ # handle request for multi client queue stream
+ stream_job = self.multi_client_jobs.get(queue_item.queue_id)
+ if queue_item.queue_item_id == "flow" or stream_job and stream_job.pending:
+ return stream_job.resolve_stream_url(player_id, output_codec)
# handle raw pcm without exact format specifiers
if output_codec.is_pcm() and ";" not in fmt:
fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
query_params = {}
base_path = "flow" if flow_mode else "single"
- url = f"{self._server.base_url}/{queue_item.queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
+ url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
if seek_position:
query_params["seek_position"] = str(seek_position)
if fade_in:
fade_in: bool = False,
pcm_bit_depth: int = 24,
pcm_sample_rate: int = 48000,
- ) -> MultiClientStreamJob:
- """Create a MultiClientStreamJob for the given queue..
+ expected_players: set[str] | None = None,
+ auto_start: bool = True,
+ ) -> MultiClientQueueStreamJob:
+ """
+ Create a MultiClientQueueStreamJob for the given queue..
This is called by player/sync group implementations to start streaming
the queue audio to multiple players at once.
"""
- if existing_job := self.multi_client_jobs.pop(queue_id, None):
+ if existing_job := self.multi_client_jobs.get(queue_id, None):
+ if existing_job.pending:
+ return existing_job
# cleanup existing job first
- if not existing_job.finished:
- self.logger.warning("Detected existing (running) stream job for queue %s", queue_id)
- existing_job.stop()
- self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob(
- self,
- queue_id=queue_id,
- pcm_format=AudioFormat(
- content_type=ContentType.from_bit_depth(pcm_bit_depth),
- sample_rate=pcm_sample_rate,
- bit_depth=pcm_bit_depth,
- channels=2,
+ existing_job.stop()
+ queue = self.mass.player_queues.get(queue_id)
+ pcm_format = AudioFormat(
+ content_type=ContentType.from_bit_depth(pcm_bit_depth),
+ sample_rate=pcm_sample_rate,
+ bit_depth=pcm_bit_depth,
+ channels=2,
+ )
+ self.multi_client_jobs[queue_id] = stream_job = MultiClientQueueStreamJob(
+ self.mass,
+ pcm_audio_source=self.get_flow_stream(
+ queue=queue,
+ start_queue_item=start_queue_item,
+ pcm_format=pcm_format,
+ seek_position=seek_position,
+ fade_in=fade_in,
),
- start_queue_item=start_queue_item,
- seek_position=seek_position,
- fade_in=fade_in,
+ pcm_format=pcm_format,
+ expected_players=expected_players or set(),
+ auto_start=auto_start,
)
return stream_job
queue.display_name,
)
queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id)
- # collect player specific ffmpeg args to re-encode the source PCM stream
pcm_format = AudioFormat(
content_type=ContentType.from_bit_depth(
queue_item.streamdetails.audio_format.bit_depth
sample_rate=queue_item.streamdetails.audio_format.sample_rate,
bit_depth=queue_item.streamdetails.audio_format.bit_depth,
)
- ffmpeg_args = await self._get_player_ffmpeg_args(
- queue_player,
+ async for chunk in get_ffmpeg_stream(
+ audio_input=get_media_stream(
+ self.mass,
+ streamdetails=queue_item.streamdetails,
+ pcm_format=pcm_format,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ ),
input_format=pcm_format,
output_format=output_format,
- )
-
- async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
- # feed stdin with pcm audio chunks from origin
- async def read_audio() -> None:
- try:
- async for _, chunk in get_media_stream(
- self.mass,
- streamdetails=queue_item.streamdetails,
- pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
- ):
- try:
- await ffmpeg_proc.write(chunk)
- except BrokenPipeError:
- break
- finally:
- ffmpeg_proc.write_eof()
-
- ffmpeg_proc.attach_task(read_audio())
-
- # read final chunks from stdout
- async for chunk in ffmpeg_proc.iter_any(768000):
- try:
- await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError):
- # race condition
- break
+ filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError):
+ break
return resp
"""Stream Queue Flow audio to player."""
self._log_request(request)
queue_id = request.match_info["queue_id"]
- queue = self.mass.player_queues.get(queue_id)
- if not queue:
+ if not (queue := self.mass.player_queues.get(queue_id)):
raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
start_queue_item_id = request.match_info["queue_item_id"]
start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
# all checks passed, start streaming!
self.logger.debug("Start serving Queue flow audio stream for %s", queue_player.name)
- # collect player specific ffmpeg args to re-encode the source PCM stream
pcm_format = AudioFormat(
content_type=ContentType.from_bit_depth(output_format.bit_depth),
sample_rate=output_format.sample_rate,
bit_depth=output_format.bit_depth,
channels=2,
)
- ffmpeg_args = await self._get_player_ffmpeg_args(
- queue_player,
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.get_flow_stream(
+ queue=queue,
+ start_queue_item=start_queue_item,
+ pcm_format=pcm_format,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ ),
input_format=pcm_format,
output_format=output_format,
- )
-
- async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
- # feed stdin with pcm audio chunks from origin
- async def read_audio() -> None:
- try:
- async for chunk in self.get_flow_stream(
- queue=queue,
- start_queue_item=start_queue_item,
- pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
- ):
- try:
- await ffmpeg_proc.write(chunk)
- except BrokenPipeError:
- break
- finally:
- ffmpeg_proc.write_eof()
-
- ffmpeg_proc.attach_task(read_audio())
-
- # read final chunks from stdout
- iterator = (
- ffmpeg_proc.iter_chunked(icy_meta_interval)
- if enable_icy
- else ffmpeg_proc.iter_any(768000)
- )
- async for chunk in iterator:
- try:
- await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError):
- # race condition
- break
-
- if not enable_icy:
- continue
-
- # if icy metadata is enabled, send the icy metadata after the chunk
- if (
- # use current item here and not buffered item, otherwise
- # the icy metadata will be too much ahead
- (current_item := queue.current_item)
- and current_item.streamdetails
- and current_item.streamdetails.stream_title
- ):
- title = current_item.streamdetails.stream_title
- elif queue and current_item and current_item.name:
- title = current_item.name
- else:
- title = "Music Assistant"
- metadata = f"StreamTitle='{title}';".encode()
- if current_item and current_item.image:
- metadata += f"StreamURL='{current_item.image.path}'".encode()
- while len(metadata) % 16 != 0:
- metadata += b"\x00"
- length = len(metadata)
- length_b = chr(int(length / 16)).encode()
- await resp.write(length_b + metadata)
+ filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+ chunk_size=icy_meta_interval if enable_icy else None,
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError):
+ break
+
+ if not enable_icy:
+ continue
+
+ # if icy metadata is enabled, send the icy metadata after the chunk
+ if (
+ # use current item here and not buffered item, otherwise
+ # the icy metadata will be too much ahead
+ (current_item := queue.current_item)
+ and current_item.streamdetails
+ and current_item.streamdetails.stream_title
+ ):
+ title = current_item.streamdetails.stream_title
+ elif queue and current_item and current_item.name:
+ title = current_item.name
+ else:
+ title = "Music Assistant"
+ metadata = f"StreamTitle='{title}';".encode()
+ if current_item and current_item.image:
+ metadata += f"StreamURL='{current_item.image.path}'".encode()
+ while len(metadata) % 16 != 0:
+ metadata += b"\x00"
+ length = len(metadata)
+ length_b = chr(int(length / 16)).encode()
+ await resp.write(length_b + metadata)
return resp
async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response:
"""Stream Queue Flow audio to a child player within a multi subscriber setup."""
self._log_request(request)
- queue_id = request.match_info["queue_id"]
- streamjob = self.multi_client_jobs.get(queue_id)
- if not streamjob:
- raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}")
job_id = request.match_info["job_id"]
- if job_id != streamjob.job_id:
- raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}")
- child_player_id = request.match_info["player_id"]
- child_player = self.mass.players.get(child_player_id)
+ for queue_id, stream_job in self.multi_client_jobs.items():
+ if stream_job.job_id == job_id:
+ break
+ else:
+ raise web.HTTPNotFound(reason=f"Unknown StreamJob: {job_id}")
+ if stream_job.finished:
+ raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished")
+ if not (queue := self.mass.player_queues.get(queue_id)):
+ raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
+
+ player_id = request.match_info["player_id"]
+ child_player = self.mass.players.get(player_id)
if not child_player:
- raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+ raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
# work out (childplayer specific!) output format/details
output_format = await self._get_output_format(
output_format_str=request.match_info["fmt"],
queue_player=child_player,
- default_sample_rate=streamjob.pcm_format.sample_rate,
- default_bit_depth=streamjob.pcm_format.bit_depth,
+ default_sample_rate=stream_job.pcm_format.sample_rate,
+ default_bit_depth=stream_job.pcm_format.bit_depth,
)
# prepare request, add some DLNA/UPNP compatible headers
+ enable_icy = request.headers.get("Icy-MetaData", "") == "1"
+ icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384
headers = {
**DEFAULT_STREAM_HEADERS,
"Content-Type": f"audio/{output_format.output_format_str}",
}
+ if enable_icy:
+ headers["icy-metaint"] = str(icy_meta_interval)
+
resp = web.StreamResponse(
status=200,
reason="OK",
if request.method != "GET":
return resp
- # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
- # to the stream in an attempt to get the audio details such as duration
- # which is a bit pointless for our duration-less queue stream
- # and it completely messes with the subscription logic
- if child_player_id in streamjob.subscribed_players:
- self.logger.warning(
- "Player %s is making multiple requests "
- "to the same stream, playback may be disturbed!",
- child_player_id,
- )
- self.workaround_players.add(child_player_id)
-
# all checks passed, start streaming!
self.logger.debug(
"Start serving multi-subscriber Queue flow audio stream for queue %s to player %s",
- streamjob.queue.display_name,
+ queue.display_name,
child_player.display_name,
)
-
- # collect player specific ffmpeg args to re-encode the source PCM stream
- ffmpeg_args = await self._get_player_ffmpeg_args(
- child_player,
- input_format=streamjob.pcm_format,
- output_format=output_format,
- )
-
- async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
- # feed stdin with pcm audio chunks from origin
- async def read_audio() -> None:
- try:
- async for chunk in streamjob.subscribe(child_player_id):
- try:
- await ffmpeg_proc.write(chunk)
- except BrokenPipeError:
- break
- finally:
- ffmpeg_proc.write_eof()
-
- ffmpeg_proc.attach_task(read_audio())
-
- # read final chunks from stdout
- async for chunk in ffmpeg_proc.iter_any(768000):
- try:
- await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError):
- # race condition
- break
+ async for chunk in stream_job.subscribe(
+ player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError):
+ break
+ if not enable_icy:
+ continue
+
+ # if icy metadata is enabled, send the icy metadata after the chunk
+ if (
+ # use current item here and not buffered item, otherwise
+ # the icy metadata will be too much ahead
+ (current_item := queue.current_item)
+ and current_item.streamdetails
+ and current_item.streamdetails.stream_title
+ ):
+ title = current_item.streamdetails.stream_title
+ elif queue and current_item and current_item.name:
+ title = current_item.name
+ else:
+ title = "Music Assistant"
+ metadata = f"StreamTitle='{title}';".encode()
+ if current_item and current_item.image:
+ metadata += f"StreamURL='{current_item.image.path}'".encode()
+ while len(metadata) % 16 != 0:
+ metadata += b"\x00"
+ length = len(metadata)
+ length_b = chr(int(length / 16)).encode()
+ await resp.write(length_b + metadata)
return resp
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
"""Get the url for the special command stream."""
- return f"{self.base_url}/{player_or_queue_id}/command/{command}.mp3"
+ return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
async def get_flow_stream(
self,
bytes_written = 0
buffer = b""
# handle incoming audio chunks
- async for is_last_chunk, chunk in get_media_stream(
+ async for chunk in get_media_stream(
self.mass,
queue_track.streamdetails,
pcm_format=pcm_format,
strip_silence_begin=use_crossfade,
strip_silence_end=use_crossfade,
):
- # throttle buffer, do not allow more than 30 seconds in player's own buffer
- seconds_buffered = (total_bytes_written + bytes_written) / pcm_sample_size
- player = self.mass.players.get(queue.queue_id)
- if seconds_buffered > 60 and player.corrected_elapsed_time > 30:
- while (seconds_buffered - player.corrected_elapsed_time) > 30:
- await asyncio.sleep(1)
-
# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
- if not is_last_chunk and len(buffer) < buffer_size:
+ if len(buffer) < buffer_size:
# buffer is not full enough, move on
continue
#### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
- if not is_last_chunk and last_fadeout_part:
+ if last_fadeout_part:
# perform crossfade
fadein_part = buffer[:crossfade_size]
remaining_bytes = buffer[crossfade_size:]
last_fadeout_part = b""
buffer = b""
- #### HANDLE END OF TRACK
- elif is_last_chunk:
- if use_crossfade:
- # if crossfade is enabled, save fadeout part to pickup for next track
- last_fadeout_part = buffer[-crossfade_size:]
- remaining_bytes = buffer[:-crossfade_size]
- yield remaining_bytes
- bytes_written += len(remaining_bytes)
- del remaining_bytes
- else:
- # no crossfade enabled, just yield the (entire) buffer last part
- yield buffer
- bytes_written += len(buffer)
- # clear vars
- buffer = b""
-
#### OTHER: enough data in buffer, feed to output
else:
chunk_size = len(chunk)
bytes_written += chunk_size
buffer = buffer[chunk_size:]
+ #### HANDLE END OF TRACK
+ if last_fadeout_part:
+ # edge case: we did not get enough data to make the crossfade
+ yield last_fadeout_part
+ bytes_written += len(last_fadeout_part)
+ last_fadeout_part = b""
+ if use_crossfade:
+ # if crossfade is enabled, save fadeout part to pickup for next track
+ last_fadeout_part = buffer[-crossfade_size:]
+ remaining_bytes = buffer[:-crossfade_size]
+ yield remaining_bytes
+ bytes_written += len(remaining_bytes)
+ del remaining_bytes
+ else:
+ # no crossfade enabled, just yield the (entire) buffer last part
+ yield buffer
+ bytes_written += len(buffer)
+ # clear vars
+ buffer = b""
+
# update duration details based on the actual pcm data we sent
# this also accounts for crossfade and silence stripping
queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size
yield last_fadeout_part
self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
- async def _get_player_ffmpeg_args(
- self,
- player: Player,
- input_format: AudioFormat,
- output_format: AudioFormat,
- ) -> list[str]:
- """Get player specific arguments for the given (pcm) input and output details."""
- # generic args
- generic_args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "warning" if self.logger.isEnabledFor(logging.DEBUG) else "quiet",
- "-ignore_unknown",
- ]
- # input args
- input_args = [
- "-f",
- input_format.content_type.value,
- "-ac",
- str(input_format.channels),
- "-channel_layout",
- "mono" if input_format.channels == 1 else "stereo",
- "-ar",
- str(input_format.sample_rate),
- "-i",
- "-",
- ]
- # select output args
- if output_format.content_type == ContentType.FLAC:
- # set compression level to 0 to prevent issues with cast players
- output_args = ["-f", "flac", "-compression_level", "0"]
- elif output_format.content_type == ContentType.AAC:
- output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "320k"]
- elif output_format.content_type == ContentType.MP3:
- output_args = ["-f", "mp3", "-c:a", "mp3", "-b:a", "320k"]
- else:
- output_args = ["-f", output_format.content_type.value]
-
- # append channels
- output_args += ["-ac", str(output_format.channels)]
- # append sample rate (if codec is lossless)
- if output_format.content_type.is_lossless():
- output_args += ["-ar", str(output_format.sample_rate)]
- # append output = pipe
- output_args += ["-"]
-
- # collect extra and filter args
- # TODO: add convolution/DSP/roomcorrections here!
- extra_args = []
- filter_params = []
-
- # the below is a very basic 3-band equalizer,
- # this could be a lot more sophisticated at some point
- if (
- eq_bass := self.mass.config.get_raw_player_config_value(
- player.player_id, CONF_EQ_BASS, 0
- )
- ) != 0:
- filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}")
- if (
- eq_mid := self.mass.config.get_raw_player_config_value(player.player_id, CONF_EQ_MID, 0)
- ) != 0:
- filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}")
- if (
- eq_treble := self.mass.config.get_raw_player_config_value(
- player.player_id, CONF_EQ_TREBLE, 0
- )
- ) != 0:
- filter_params.append(
- f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}"
- )
- # handle output mixing only left or right
- conf_channels = self.mass.config.get_raw_player_config_value(
- player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
- )
- if conf_channels == "left":
- filter_params.append("pan=mono|c0=FL")
- elif conf_channels == "right":
- filter_params.append("pan=mono|c0=FR")
-
- if filter_params:
- extra_args += ["-af", ",".join(filter_params)]
-
- return generic_args + input_args + extra_args + output_args
-
def _log_request(self, request: web.Request) -> None:
"""Log request."""
if not self.logger.isEnabledFor(logging.DEBUG):
result = await result
self._send_message(SuccessResultMessage(msg.message_id, result))
except Exception as err: # pylint: disable=broad-except
- if self.log_level == "VERBOSE":
+ if self._logger.isEnabledFor(logging.DEBUG):
self._logger.exception("Error handling message: %s", msg)
else:
self._logger.error("Error handling message: %s: %s", msg.command, str(err))
from music_assistant.common.models.media_items import AudioFormat, ContentType, MediaType
from music_assistant.common.models.streamdetails import LoudnessMeasurement, StreamDetails
from music_assistant.constants import (
+ CONF_EQ_BASS,
+ CONF_EQ_MID,
+ CONF_EQ_TREBLE,
+ CONF_OUTPUT_CHANNELS,
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_TARGET,
ROOT_LOGGER_NAME,
enable_stdout=False,
enable_stderr=True,
) as ffmpeg_proc:
-
- async def writer() -> None:
- """Task that grabs the source audio and feeds it to ffmpeg."""
+ if streamdetails.direct is None:
music_prov = mass.get_provider(streamdetails.provider)
chunk_count = 0
async for audio_chunk in music_prov.get_audio_stream(streamdetails):
break
ffmpeg_proc.write_eof()
- if streamdetails.direct is None:
- writer_task = ffmpeg_proc.attach_task(writer())
- # wait for the writer task to finish
- await writer_task
-
_, stderr = await ffmpeg_proc.communicate()
if loudness_details := _parse_loudnorm(stderr):
LOGGER.debug("Loudness measurement for %s: %s", item_name, loudness_details)
Other than stripping silence at end and beginning and optional
volume normalization this is the pure, unaltered audio data as PCM chunks.
"""
+ logger = LOGGER.getChild("media_stream")
bytes_sent = 0
streamdetails.seconds_skipped = seek_position
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
strip_silence_end = False
# collect all arguments for ffmpeg
+ filter_params = []
+ extra_args = []
seek_pos = seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0
- args = await _get_ffmpeg_args(
- streamdetails=streamdetails,
- pcm_output_format=pcm_format,
+ if seek_pos:
# only use ffmpeg seeking if the provider stream does not support seeking
- seek_position=seek_pos,
- fade_in=fade_in,
+ extra_args += ["-ss", str(seek_pos)]
+ if streamdetails.target_loudness is not None:
+ # add loudnorm filters
+ filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
+ if streamdetails.loudness:
+ filter_rule += f":measured_I={streamdetails.loudness.integrated}"
+ filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
+ filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
+ filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
+ filter_rule += ":print_format=json"
+ filter_params.append(filter_rule)
+ if fade_in:
+ filter_params.append("afade=type=in:start_time=0:duration=3")
+ ffmpeg_args = await _get_ffmpeg_args(
+ input_format=streamdetails.audio_format,
+ output_format=pcm_format,
+ filter_params=filter_params,
+ extra_args=extra_args,
+ input_path=streamdetails.direct or "-",
)
- async with AsyncProcess(
- args, enable_stdin=streamdetails.direct is None, enable_stderr=True
- ) as ffmpeg_proc:
- LOGGER.debug("start media stream for: %s", streamdetails.uri)
-
- async def writer() -> None:
- """Task that grabs the source audio and feeds it to ffmpeg."""
- LOGGER.debug("writer started for %s", streamdetails.uri)
- music_prov = mass.get_provider(streamdetails.provider)
- seek_pos = seek_position if streamdetails.can_seek else 0
- async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
- await ffmpeg_proc.write(audio_chunk)
- # write eof when last packet is received
- ffmpeg_proc.write_eof()
- LOGGER.debug("writer finished for %s", streamdetails.uri)
-
- if streamdetails.direct is None:
- ffmpeg_proc.attach_task(writer())
-
- # get pcm chunks from stdout
- # we always stay one chunk behind to properly detect end of chunks
- # so we can strip silence at the beginning and end of a track
- prev_chunk = b""
- chunk_num = 0
- try:
- async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
- chunk_num += 1
- if strip_silence_begin and chunk_num == 2:
- # first 2 chunks received, strip silence of beginning
- stripped_audio = await strip_silence(
- mass,
- prev_chunk + chunk,
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- )
- yield (False, stripped_audio)
- bytes_sent += len(stripped_audio)
- prev_chunk = b""
- del stripped_audio
- continue
- if strip_silence_end and chunk_num >= (expected_chunks - 6):
- # last part of the track, collect multiple chunks to strip silence later
- prev_chunk += chunk
- continue
-
- # middle part of the track, send previous chunk and collect current chunk
- if prev_chunk:
- yield (False, prev_chunk)
- bytes_sent += len(prev_chunk)
+ finished = False
+ logger.debug("start media stream for: %s", streamdetails.uri)
- prev_chunk = chunk
-
- # all chunks received, strip silence of last part
-
- if strip_silence_end and prev_chunk:
- final_chunk = await strip_silence(
+ writer_task: asyncio.Task | None = None
+ ffmpeg_proc = AsyncProcess(
+ ffmpeg_args, enable_stdin=streamdetails.direct is None, enable_stderr=True
+ )
+ await ffmpeg_proc.start()
+
+ async def writer() -> None:
+ """Task that grabs the source audio and feeds it to ffmpeg."""
+ logger.debug("writer started for %s", streamdetails.uri)
+ music_prov = mass.get_provider(streamdetails.provider)
+ seek_pos = seek_position if streamdetails.can_seek else 0
+ async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
+ await ffmpeg_proc.write(audio_chunk)
+ # write eof when last packet is received
+ ffmpeg_proc.write_eof()
+ logger.debug("writer finished for %s", streamdetails.uri)
+
+ if streamdetails.direct is None:
+ writer_task = asyncio.create_task(writer())
+
+ # get pcm chunks from stdout
+ # we always stay one chunk behind to properly detect end of chunks
+ # so we can strip silence at the beginning and end of a track
+ prev_chunk = b""
+ chunk_num = 0
+ try:
+ async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+ chunk_num += 1
+ if strip_silence_begin and chunk_num == 2:
+ # first 2 chunks received, strip silence of beginning
+ stripped_audio = await strip_silence(
mass,
- prev_chunk,
+ prev_chunk + chunk,
sample_rate=pcm_format.sample_rate,
bit_depth=pcm_format.bit_depth,
- reverse=True,
)
- else:
- final_chunk = prev_chunk
+ yield stripped_audio
+ bytes_sent += len(stripped_audio)
+ prev_chunk = b""
+ del stripped_audio
+ continue
+ if strip_silence_end and chunk_num >= (expected_chunks - 6):
+ # last part of the track, collect multiple chunks to strip silence later
+ prev_chunk += chunk
+ continue
- # use communicate to read stderr and wait for exit
- # read log for loudness measurement (or errors)
- _, stderr = await ffmpeg_proc.communicate()
- if ffmpeg_proc.returncode != 0:
- # ffmpeg has a non zero returncode meaning trouble
- LOGGER.getChild("ffmpeg").warning("STREAM ERROR on %s", streamdetails.uri)
- LOGGER.getChild("ffmpeg").warning(stderr.decode())
- elif loudness_details := _parse_loudnorm(stderr):
+ # middle part of the track, send previous chunk and collect current chunk
+ if prev_chunk:
+ yield prev_chunk
+ bytes_sent += len(prev_chunk)
+
+ prev_chunk = chunk
+
+ # all chunks received, strip silence of last part
+
+ if strip_silence_end and prev_chunk:
+ final_chunk = await strip_silence(
+ mass,
+ prev_chunk,
+ sample_rate=pcm_format.sample_rate,
+ bit_depth=pcm_format.bit_depth,
+ reverse=True,
+ )
+ else:
+ final_chunk = prev_chunk
+
+ # ensure the final chunk is sent
+ # its important this is done here at the end so we can catch errors first
+ yield final_chunk
+ bytes_sent += len(final_chunk)
+ del final_chunk
+ del prev_chunk
+ finished = True
+ finally:
+ seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
+ streamdetails.seconds_streamed = seconds_streamed
+ if finished:
+ logger.debug(
+ "finished stream for: %s (%s seconds streamed)",
+ streamdetails.uri,
+ seconds_streamed,
+ )
+ # store accurate duration
+ streamdetails.duration = seek_position + seconds_streamed
+ else:
+ logger.debug(
+ "stream aborted for %s (%s seconds streamed)",
+ streamdetails.uri,
+ seconds_streamed,
+ )
+ if writer_task and not writer_task.done():
+ writer_task.cancel()
+ # use communicate to read stderr and wait for exit
+ # read log for loudness measurement (or errors)
+ _, stderr = await ffmpeg_proc.communicate()
+ if ffmpeg_proc.returncode != 0:
+ # ffmpeg has a non zero returncode meaning trouble
+ logger.warning("STREAM ERROR on %s", streamdetails.uri)
+ logger.warning(stderr.decode())
+ elif loudness_details := _parse_loudnorm(stderr):
+ required_seconds = 300 if streamdetails.media_type == MediaType.RADIO else 60
+ if finished or seconds_streamed >= required_seconds:
LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
streamdetails.loudness = loudness_details
await mass.music.set_track_loudness(
streamdetails.item_id, streamdetails.provider, loudness_details
)
- else:
- LOGGER.getChild("ffmpeg").debug(stderr.decode())
-
- # ensure the final chunk is sent and mark as final
- # its important this is done here at the end so we can catch errors first
- yield (True, final_chunk)
- bytes_sent += len(final_chunk)
- del final_chunk
- del prev_chunk
-
- except (asyncio.CancelledError, GeneratorExit):
- LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
- raise
else:
- LOGGER.debug("finished media stream for: %s", streamdetails.uri)
- # store accurate duration
- streamdetails.duration = seek_position + bytes_sent / pcm_sample_size
- finally:
- # report playback
- seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
- streamdetails.seconds_streamed = seconds_streamed
- if seconds_streamed < 20:
- mass.create_task(
- mass.music.mark_item_played(
- streamdetails.media_type, streamdetails.item_id, streamdetails.provider
- )
+ logger.debug(stderr.decode())
+
+ # report playback
+ if finished or seconds_streamed > 30:
+ mass.create_task(
+ mass.music.mark_item_played(
+ streamdetails.media_type, streamdetails.item_id, streamdetails.provider
)
- if music_prov := mass.get_provider(streamdetails.provider):
- mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+ )
+ if music_prov := mass.get_provider(streamdetails.provider):
+ mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
- if not streamdetails.loudness:
- # send loudness analyze job to background worker
- # note that we only do this if a track was at least been partially played
- mass.create_task(analyze_loudness(mass, streamdetails))
+ if not streamdetails.loudness:
+ # send loudness analyze job to background worker
+ # note that we only do this if a track was at least been partially played
+ mass.create_task(analyze_loudness(mass, streamdetails))
async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]:
yield data
+async def get_ffmpeg_stream(
+ audio_input: AsyncGenerator[bytes, None] | str,
+ input_format: AudioFormat,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
+ extra_args: list[str] | None = None,
+ chunk_size: int | None = None,
+) -> AsyncGenerator[bytes, None]:
+ """
+ Get the ffmpeg audio stream as async generator.
+
+ Takes care of resampling and/or recoding if needed,
+ according to player preferences.
+ """
+ logger = LOGGER.getChild("media_stream")
+ use_stdin = not isinstance(audio_input, str)
+ if input_format == output_format and not filter_params and not chunk_size and use_stdin:
+ # edge case: input and output exactly the same, we can bypass ffmpeg
+ # return the raw input stream, no actions needed here
+ async for chunk in audio_input:
+ yield chunk
+ return
+
+ ffmpeg_args = await _get_ffmpeg_args(
+ input_format=input_format,
+ output_format=output_format,
+ filter_params=filter_params or [],
+ extra_args=extra_args or [],
+ input_path="-" if use_stdin else audio_input,
+ output_path="-",
+ )
+
+ writer_task: asyncio.Task | None = None
+ ffmpeg_proc = AsyncProcess(
+ ffmpeg_args, enable_stdin=use_stdin, enable_stdout=True, enable_stderr=True
+ )
+ await ffmpeg_proc.start()
+
+ # feed stdin with pcm audio chunks from origin
+ async def writer() -> None:
+ async for chunk in audio_input:
+ if ffmpeg_proc.closed:
+ return
+ await ffmpeg_proc.write(chunk)
+ ffmpeg_proc.write_eof()
+
+ try:
+ if not isinstance(audio_input, str):
+ writer_task = asyncio.create_task(writer())
+
+ # read final chunks from stdout
+ chunk_size = chunk_size or get_chunksize(output_format, 1)
+ async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+ try:
+ yield chunk
+ except (BrokenPipeError, ConnectionResetError):
+ # race condition
+ break
+ finally:
+ if writer_task and not writer_task.done():
+ writer_task.cancel()
+ # use communicate to read stderr and wait for exit
+ # read log for loudness measurement (or errors)
+ _, stderr = await ffmpeg_proc.communicate()
+ if ffmpeg_proc.returncode != 0:
+ # ffmpeg has a non zero returncode meaning trouble
+ logger.warning("FFMPEG ERROR\n%s", stderr.decode())
+ else:
+ logger.debug(stderr.decode())
+
+
async def check_audio_support() -> tuple[bool, bool, str]:
"""Check if ffmpeg is present (with/without libsoxr support)."""
cache_key = "audio_support_cache"
"ffmpeg",
"-hide_banner",
"-loglevel",
- "quiet",
+ "info",
"-ignore_unknown",
]
if streamdetails.direct:
output_args = ["-to", "30", "-f", "mp3", "-"]
args = input_args + output_args
- async with AsyncProcess(args, True) as ffmpeg_proc:
- async def writer() -> None:
- """Task that grabs the source audio and feeds it to ffmpeg."""
- music_prov = mass.get_provider(streamdetails.provider)
- async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
- await ffmpeg_proc.write(audio_chunk)
- # write eof when last packet is received
- ffmpeg_proc.write_eof()
+ writer_task: asyncio.Task | None = None
+ ffmpeg_proc = AsyncProcess(args, enable_stdin=True, enable_stdout=True, enable_stderr=False)
+ await ffmpeg_proc.start()
- if not streamdetails.direct:
- ffmpeg_proc.attach_task(writer())
+ async def writer() -> None:
+ """Task that grabs the source audio and feeds it to ffmpeg."""
+ music_prov = mass.get_provider(streamdetails.provider)
+ async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
+ await ffmpeg_proc.write(audio_chunk)
+ # write eof when last packet is received
+ ffmpeg_proc.write_eof()
- # yield chunks from stdout
+ if not streamdetails.direct:
+ writer_task = asyncio.create_task(writer())
+
+ # yield chunks from stdout
+ try:
async for chunk in ffmpeg_proc.iter_any():
yield chunk
+ finally:
+ if writer_task and not writer_task.done():
+ writer_task.cancel()
+ await ffmpeg_proc.close()
async def get_silence(
"-t",
str(duration),
"-f",
- output_format.output_fmt.value,
+ output_format.output_format_str,
"-",
]
async with AsyncProcess(args) as ffmpeg_proc:
return int((320000 / 8) * seconds)
+def get_player_filter_params(
+ mass: MusicAssistant,
+ player_id: str,
+) -> list[str]:
+ """Get player specific filter parameters for ffmpeg (if any)."""
+ # collect all players-specific filter args
+ # TODO: add convolution/DSP/roomcorrections here?!
+ filter_params = []
+
+ # the below is a very basic 3-band equalizer,
+ # this could be a lot more sophisticated at some point
+ if (eq_bass := mass.config.get_raw_player_config_value(player_id, CONF_EQ_BASS, 0)) != 0:
+ filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}")
+ if (eq_mid := mass.config.get_raw_player_config_value(player_id, CONF_EQ_MID, 0)) != 0:
+ filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}")
+ if (eq_treble := mass.config.get_raw_player_config_value(player_id, CONF_EQ_TREBLE, 0)) != 0:
+ filter_params.append(f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}")
+ # handle output mixing only left or right
+ conf_channels = mass.config.get_raw_player_config_value(
+ player_id, CONF_OUTPUT_CHANNELS, "stereo"
+ )
+ if conf_channels == "left":
+ filter_params.append("pan=mono|c0=FL")
+ elif conf_channels == "right":
+ filter_params.append("pan=mono|c0=FR")
+
+ return filter_params
+
+
async def _get_ffmpeg_args(
- streamdetails: StreamDetails,
- pcm_output_format: AudioFormat,
- seek_position: int = 0,
- fade_in: bool = False,
+ input_format: AudioFormat,
+ output_format: AudioFormat,
+ filter_params: list[str],
+ extra_args: list[str],
+ input_path: str = "-",
+ output_path: str = "-",
) -> list[str]:
"""Collect all args to send to the ffmpeg process."""
ffmpeg_present, libsoxr_support, version = await check_audio_support()
# collect input args
input_args = [
"-ac",
- str(streamdetails.audio_format.channels),
+ str(input_format.channels),
"-channel_layout",
- "mono" if streamdetails.audio_format.channels == 1 else "stereo",
+ "mono" if input_format.channels == 1 else "stereo",
]
- if seek_position:
- input_args += ["-ss", str(seek_position)]
- if streamdetails.direct:
- # ffmpeg can access the inputfile (or url) directly
- if streamdetails.direct.startswith("http"):
- # append reconnect options for direct stream from http
+ if input_format.content_type.is_pcm():
+ input_args += ["-ar", str(input_format.sample_rate)]
+ if input_path.startswith("http"):
+ # append reconnect options for direct stream from http
+ input_args += [
+ "-reconnect",
+ "1",
+ "-reconnect_streamed",
+ "1",
+ "-reconnect_delay_max",
+ "10",
+ ]
+ if major_version > 4:
+ # these options are only supported in ffmpeg > 5
input_args += [
- "-reconnect",
+ "-reconnect_on_network_error",
"1",
- "-reconnect_streamed",
- "1",
- "-reconnect_delay_max",
- "10",
+ "-reconnect_on_http_error",
+ "5xx",
]
- if major_version > 4:
- # these options are only supported in ffmpeg > 5
- input_args += [
- "-reconnect_on_network_error",
- "1",
- "-reconnect_on_http_error",
- "5xx",
- ]
-
- input_args += ["-i", streamdetails.direct]
- else:
- # the input is received from pipe/stdin
- if streamdetails.audio_format.content_type != ContentType.UNKNOWN:
- input_args += ["-f", streamdetails.audio_format.content_type.value]
- input_args += [
- "-i",
- "-",
- ]
+ if input_format.content_type != ContentType.UNKNOWN:
+ input_args += ["-f", input_format.content_type.value]
+ input_args += ["-i", input_path]
# collect output args
output_args = [
"-acodec",
- pcm_output_format.content_type.name.lower(),
+ output_format.content_type.name.lower(),
"-f",
- pcm_output_format.content_type.value,
+ output_format.content_type.value,
"-ac",
- str(pcm_output_format.channels),
+ str(output_format.channels),
"-ar",
- str(pcm_output_format.sample_rate),
- "-",
+ str(output_format.sample_rate),
+ output_path,
]
- # collect extra and filter args
- extra_args = []
- filter_params = []
- if streamdetails.target_loudness is not None:
- filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5"
- if streamdetails.loudness:
- filter_rule += f":measured_I={streamdetails.loudness.integrated}"
- filter_rule += f":measured_LRA={streamdetails.loudness.lra}"
- filter_rule += f":measured_tp={streamdetails.loudness.true_peak}"
- filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
- filter_rule += ":print_format=json"
- filter_params.append(filter_rule)
- if (
- streamdetails.audio_format.sample_rate != pcm_output_format.sample_rate
- and libsoxr_support
- and streamdetails.media_type == MediaType.TRACK
- ):
- # prefer libsoxr high quality resampler (if present) for sample rate conversions
+
+ # prefer libsoxr high quality resampler (if present) for sample rate conversions
+ if input_format.sample_rate != output_format.sample_rate and libsoxr_support:
filter_params.append("aresample=resampler=soxr")
- if fade_in:
- filter_params.append("afade=type=in:start_time=0:duration=3")
+
if filter_params:
extra_args += ["-af", ",".join(filter_params)]
"</DIDL-Lite>"
)
is_radio = queue_item.media_type != MediaType.TRACK or not queue_item.duration
- image_url = mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
+ image_url = (
+ mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE
+ )
if is_radio:
# radio or other non-track item
return (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
- '<item id="1" parentID="0" restricted="1">'
+ f'<item id="flowmode" parentID="0" restricted="1">'
f"<dc:title>{escape_string(queue_item.name)}</dc:title>"
f"<upnp:albumArtURI>{escape_string(image_url)}</upnp:albumArtURI>"
f"<dc:queueItemId>{queue_item.queue_item_id}</dc:queueItemId>"
from typing import TYPE_CHECKING
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator, Coroutine
+ from collections.abc import AsyncGenerator
LOGGER = logging.getLogger(__name__)
self._enable_stdin = enable_stdin
self._enable_stdout = enable_stdout
self._enable_stderr = enable_stderr
- self._attached_task: asyncio.Task = None
- self.closed = False
- self.returncode: int | None = None
+
+ @property
+ def closed(self) -> bool:
+ """Return if the process was closed."""
+ return self.returncode is not None
+
+ @property
+ def returncode(self) -> int | None:
+ """Return the erturncode of the process."""
+ if self._proc is None:
+ return None
+ return self._proc.returncode
async def __aenter__(self) -> AsyncProcess:
"""Enter context manager."""
# already exited, race condition
pass
- async def close(self) -> None:
- """Close/terminate the process."""
- self.closed = True
- if self._attached_task and not self._attached_task.done():
- with suppress(asyncio.CancelledError):
- self._attached_task.cancel()
+ async def close(self) -> int:
+ """Close/terminate the process and wait for exit."""
+ if self.returncode is not None:
+ return self.returncode
# make sure the process is cleaned up
- self.write_eof()
- if self._proc.returncode is None:
- try:
- async with asyncio.timeout(10):
- await self.communicate()
- except TimeoutError:
- self._proc.kill()
- await self.wait()
+ try:
+ async with asyncio.timeout(10):
+ await self.communicate()
+ except (TimeoutError, asyncio.CancelledError):
+ self._proc.terminate()
+ return await self.wait()
async def wait(self) -> int:
"""Wait for the process and return the returncode."""
if self.returncode is not None:
return self.returncode
- if self._proc.returncode is not None:
- self.returncode = self._proc.returncode
- return self.returncode
- self.returncode = await self._proc.wait()
- self.closed = True
- return self.returncode
+ return await self._proc.wait()
async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
"""Write bytes to process and read back results."""
stdout, stderr = await self._proc.communicate(input_data)
- self.returncode = self._proc.returncode
return (stdout, stderr)
async def read_stderr(self, n: int = -1) -> bytes:
"""
return await self._proc.stderr.read(n)
- def attach_task(self, coro: Coroutine) -> asyncio.Task:
- """Attach given coro func as reader/writer task to properly cancel it when needed."""
- self._attached_task = task = asyncio.create_task(coro)
- return task
-
async def check_output(shell_cmd: str) -> tuple[int, bytes]:
"""Run shell subprocess and return output."""
from __future__ import annotations
+import asyncio
import json
import logging
import os
file_path,
)
- async with AsyncProcess(
+ writer_task: asyncio.Task | None = None
+ ffmpeg_proc = AsyncProcess(
args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False
- ) as proc:
- if file_path == "-":
- # feed the file contents to the process
-
- async def chunk_feeder() -> None:
- bytes_read = 0
- try:
- async for chunk in input_file:
- if proc.closed:
- break
- await proc.write(chunk)
- bytes_read += len(chunk)
- del chunk
- if bytes_read > 25 * 1000000:
- # this is possibly a m4a file with 'moove atom' metadata at the
- # end of the file
- # we'll have to read the entire file to do something with it
- # for now we just ignore/deny these files
- LOGGER.error("Found file with tags not present at beginning of file")
- break
- finally:
- proc.write_eof()
-
- proc.attach_task(chunk_feeder())
-
- try:
- res = await proc.read(-1)
- data = json.loads(res)
- if error := data.get("error"):
- raise InvalidDataError(error["string"])
- if not data.get("streams"):
- msg = "Not an audio file"
- raise InvalidDataError(msg)
- tags = AudioTags.parse(data)
- del res
- del data
- if not tags.duration and file_size and tags.bit_rate:
- # estimate duration from filesize/bitrate
- tags.duration = int((file_size * 8) / tags.bit_rate)
- return tags
- except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err:
- msg = f"Unable to retrieve info for {file_path}: {err!s}"
- raise InvalidDataError(msg) from err
+ )
+ await ffmpeg_proc.start()
+
+ async def writer() -> None:
+ bytes_read = 0
+ async for chunk in input_file:
+ if ffmpeg_proc.closed:
+ break
+ await ffmpeg_proc.write(chunk)
+ bytes_read += len(chunk)
+ del chunk
+ if bytes_read > 25 * 1000000:
+ # this is possibly a m4a file with 'moove atom' metadata at the
+ # end of the file
+ # we'll have to read the entire file to do something with it
+ # for now we just ignore/deny these files
+ LOGGER.error("Found file with tags not present at beginning of file")
+ break
+
+ if file_path == "-":
+ # feed the file contents to the process
+ writer_task = asyncio.create_task(writer)
+
+ try:
+ res = await ffmpeg_proc.read(-1)
+ data = json.loads(res)
+ if error := data.get("error"):
+ raise InvalidDataError(error["string"])
+ if not data.get("streams"):
+ msg = "Not an audio file"
+ raise InvalidDataError(msg)
+ tags = AudioTags.parse(data)
+ del res
+ del data
+ if not tags.duration and file_size and tags.bit_rate:
+ # estimate duration from filesize/bitrate
+ tags.duration = int((file_size * 8) / tags.bit_rate)
+ return tags
+ except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err:
+ msg = f"Unable to retrieve info for {file_path}: {err!s}"
+ raise InvalidDataError(msg) from err
+ finally:
+ if writer_task and not writer_task.done():
+ writer_task.cancel()
+ await ffmpeg_proc.close()
async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> bytes | None:
"-",
)
- async with AsyncProcess(
+ writer_task: asyncio.Task | None = None
+ ffmpeg_proc = AsyncProcess(
args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False
- ) as proc:
- if file_path == "-":
- # feed the file contents to the process
- async def chunk_feeder() -> None:
- try:
- async for chunk in input_file:
- if proc.closed:
- break
- await proc.write(chunk)
- finally:
- proc.write_eof()
-
- proc.attach_task(chunk_feeder())
-
- return await proc.read(-1)
+ )
+ await ffmpeg_proc.start()
+
+ async def writer() -> None:
+ async for chunk in input_file:
+ if ffmpeg_proc.closed:
+ break
+ await ffmpeg_proc.write(chunk)
+ ffmpeg_proc.write_eof()
+
+ # feed the file contents to the process stdin
+ if file_path == "-":
+ writer_task = asyncio.create_task(writer)
+
+ # return image bytes from stdout
+ try:
+ return await ffmpeg_proc.read(-1)
+ finally:
+ if writer_task and not writer_task.cancelled():
+ writer_task.cancel()
+ await ffmpeg_proc.close()
if TYPE_CHECKING:
from music_assistant.common.models.player import Player
from music_assistant.common.models.queue_item import QueueItem
- from music_assistant.server.controllers.streams import MultiClientStreamJob
# ruff: noqa: ARG001, ARG002
"""
raise NotImplementedError
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- raise NotImplementedError
-
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""
Handle enqueuing of the next queue item on the player.
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import get_ip_pton, select_free_port
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_EQ_BASS,
+ CONF_ENTRY_EQ_MID,
+ CONF_ENTRY_EQ_TREBLE,
+ CONF_ENTRY_OUTPUT_CHANNELS,
CONF_ENTRY_SYNC_ADJUST,
ConfigEntry,
ConfigValueType,
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.constants import CONF_SYNC_ADJUST
+from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.process import check_output
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
DOMAIN = "airplay"
PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_EQ_BASS,
+ CONF_ENTRY_EQ_MID,
+ CONF_ENTRY_EQ_TREBLE,
+ CONF_ENTRY_OUTPUT_CHANNELS,
ConfigEntry(
key=CONF_ENCRYPTION,
type=ConfigEntryType.BOOLEAN,
self.active_remote_id: str = str(randint(1000, 8000))
self.start_ntp: int | None = None # use as checksum
self.prevent_playback: bool = False
- self._audio_buffer = asyncio.Queue(2)
+ self._audio_iterator: AsyncGenerator[bytes, None] | None = None
self._log_reader_task: asyncio.Task | None = None
self._audio_reader_task: asyncio.Task | None = None
self._cliraop_proc: asyncio.subprocess.Process | None = None
and self._cliraop_proc.returncode is None
)
- async def init_cliraop(self, start_ntp: int) -> None:
+ async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None]) -> None:
"""Initialize CLIRaop process for a player."""
self.start_ntp = start_ntp
+ self._audio_iterator = audio_iterator
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
extra_args += ["-encrypt"]
if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
extra_args += ["-alac"]
- if "airport" in mass_player.device_info.model.lower():
- # enforce auth on airport express
- extra_args += ["-auth"]
for prop in ("et", "md", "am", "pk", "pw"):
if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop):
extra_args += [f"-{prop}", prop_value]
if device_password := self.mass.config.get_raw_player_config_value(
player_id, CONF_PASSWORD, None
):
- # NOTE: This may not work as we might need to do
- # some fancy hashing with the plain password first?!
extra_args += ["-password", device_password]
if self.prov.log_level == "DEBUG":
extra_args += ["-debug", "5"]
return
await self.send_cli_command("ACTION=STOP")
self._stop_requested = True
+ if not force:
+ return
# stop background tasks
- if self._log_reader_task and not self._log_reader_task.done():
- if force:
- self._log_reader_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._log_reader_task
if self._audio_reader_task and not self._audio_reader_task.done():
- if force:
- self._audio_reader_task.cancel()
with suppress(asyncio.CancelledError):
+ self._audio_reader_task.cancel()
await self._audio_reader_task
-
- empty_queue(self._audio_buffer)
- await asyncio.wait_for(self._cliraop_proc.communicate(), 30)
+ if self._log_reader_task and not self._log_reader_task.done():
+ with suppress(asyncio.CancelledError):
+ self._log_reader_task.cancel()
+ await self._log_reader_task
+ with suppress(TimeoutError):
+ await asyncio.wait_for(self._cliraop_proc.communicate(), 5)
+ if self._cliraop_proc.returncode is None:
+ self._cliraop_proc.kill()
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
self.mass.players.update(airplay_player.player_id)
async def _audio_reader(self) -> None:
- """Read audio chunks from buffer and send them to the cliraop process."""
+ """Read audio chunks and send them to the cliraop process."""
logger = self.airplay_player.logger
- logger.debug("Audio reader started")
- while self.running:
- chunk = await self._audio_buffer.get()
- if chunk == b"EOF":
- # EOF chunk
- break
+ mass_player = self.mass.players.get(self.airplay_player.player_id, True)
+ queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
+ logger.debug(
+ "Starting RAOP stream for Queue %s to %s",
+ queue.display_name,
+ mass_player.display_name,
+ )
+ prev_metadata_checksum: str = ""
+ prev_progress_report: float = 0
+ async for chunk in self._audio_iterator:
+ if not self.running:
+ return
self._cliraop_proc.stdin.write(chunk)
- with suppress(BrokenPipeError, ConnectionResetError):
+ try:
await self._cliraop_proc.stdin.drain()
+ except (BrokenPipeError, ConnectionResetError):
+ break
+ # send metadata to player(s) if needed
+ # NOTE: this must all be done in separate tasks to not disturb audio
+ now = time.time()
+ if queue and queue.current_item and queue.current_item.streamdetails:
+ metadata_checksum = (
+ queue.current_item.streamdetails.stream_title
+ or queue.current_item.queue_item_id
+ )
+ if prev_metadata_checksum != metadata_checksum:
+ prev_metadata_checksum = metadata_checksum
+ prev_progress_report = now
+ self.mass.create_task(self._send_metadata(queue))
+ # send the progress report every 5 seconds
+ elif now - prev_progress_report >= 5:
+ prev_progress_report = now
+ self.mass.create_task(self._send_progress(queue))
# send EOF
if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing():
self._cliraop_proc.stdin.write_eof()
with suppress(BrokenPipeError, ConnectionResetError):
await self._cliraop_proc.stdin.drain()
- logger.debug("Audio reader finished")
+ logger.debug(
+ "Finished RAOP stream for Queue %s to %s",
+ queue.display_name,
+ mass_player.display_name,
+ )
- async def write_chunk(self, data: bytes) -> None:
- """Write a chunk of (pcm) data to the audio buffer."""
- if not self.running:
+ async def _send_metadata(self, queue: PlayerQueue) -> None:
+ """Send metadata to player (and connected sync childs)."""
+ if not queue or not queue.current_item:
return
- await self._audio_buffer.put(data)
+ duration = min(queue.current_item.duration or 0, 3600)
+ title = queue.current_item.name
+ artist = ""
+ album = ""
+ if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
+ # stream title from radio station
+ stream_title = queue.current_item.streamdetails.stream_title
+ if " - " in stream_title:
+ artist, title = stream_title.split(" - ", 1)
+ else:
+ title = stream_title
+ # set album to radio station name
+ album = queue.current_item.name
+ if media_item := queue.current_item.media_item:
+ if artist_str := getattr(media_item, "artist_str", None):
+ artist = artist_str
+ if _album := getattr(media_item, "album", None):
+ album = _album.name
- async def write_eof(self) -> None:
- """Write end-of-file chunk to the audo buffer."""
- if not self.running:
+ cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
+ cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
+
+ await self.send_cli_command(cmd)
+
+ # get image
+ if not queue.current_item.image:
return
- await self._audio_buffer.put(b"EOF")
+
+ # the image format needs to be 500x500 jpeg for maximum compatibility with players
+ image_url = self.mass.metadata.get_image_url(
+ queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
+ )
+ await self.send_cli_command(f"ARTWORK={image_url}\n")
+
+ async def _send_progress(self, queue: PlayerQueue) -> None:
+ """Send progress report to player (and connected sync childs)."""
+ if not queue or not queue.current_item:
+ return
+ progress = int(queue.corrected_elapsed_time)
+ await self.send_cli_command(f"PROGRESS={progress}\n")
@dataclass
cliraop_bin: str | None = None
_players: dict[str, AirPlayPlayer]
_discovery_running: bool = False
- _stream_tasks: dict[str, asyncio.Task]
_dacp_server: asyncio.Server = None
_dacp_info: AsyncServiceInfo = None
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._players = {}
- self._stream_tasks = {}
self.cliraop_bin = await self._getcliraop_binary()
dacp_port = await select_free_port(39831, 49831)
self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
- player_id: player_id of the player to handle the command.
"""
- if existing_stream := self._stream_tasks.get(player_id):
- existing_stream.cancel()
async def stop_player(airplay_player: AirPlayPlayer) -> None:
if airplay_player.active_stream:
- seek_position: Optional seek to this position.
- fade_in: Optionally fade in the item at playback start.
"""
+ player = self.mass.players.get(player_id)
+ if player.synced_to:
+ # should not happen, but just in case
+ raise RuntimeError("Player is synced")
# fix race condition where resync and play media are called at more or less the same time
if self._resync_handle:
self._resync_handle.cancel()
self._resync_handle = None
# always stop existing stream first
- if existing_stream := self._stream_tasks.get(player_id):
- existing_stream.cancel()
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
self.mass.create_task(airplay_player.active_stream.stop(force=True))
- # start streaming the queue (pcm) audio in a background task
- queue = self.mass.player_queues.get_active_queue(player_id)
- self._stream_tasks[player_id] = asyncio.create_task(
- self._stream_audio(
- player_id,
- queue=queue,
- audio_iterator=self.mass.streams.get_flow_stream(
- queue,
- start_queue_item=queue_item,
- pcm_format=AudioFormat(
- content_type=ContentType.PCM_S16LE,
- sample_rate=44100,
- bit_depth=16,
- channels=2,
- ),
- seek_position=seek_position,
- fade_in=fade_in,
- ),
- )
+ pcm_format = AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ sample_rate=44100,
+ bit_depth=16,
+ channels=2,
)
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- # fix race condition where resync and play media are called at more or less the same time
- if self._resync_handle:
- self._resync_handle.cancel()
- self._resync_handle = None
- # always stop existing stream first
- if existing_stream := self._stream_tasks.get(player_id):
- existing_stream.cancel()
- async with asyncio.TaskGroup() as tg:
- for airplay_player in self._get_sync_clients(player_id):
- if airplay_player.active_stream and airplay_player.active_stream.running:
- tg.create_task(airplay_player.active_stream.stop(force=True))
- if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
- # TODO: resample on the fly here ?
- raise RuntimeError("Unsupported PCM format")
- # start streaming the queue (pcm) audio in a background task
- queue = self.mass.player_queues.get_active_queue(player_id)
- self._stream_tasks[player_id] = asyncio.create_task(
- self._stream_audio(
- player_id,
- queue=queue,
- audio_iterator=stream_job.subscribe(player_id),
+ if queue_item.queue_item_id == "flow":
+ # handle special case for UGP multi client stream
+ stream_job = self.mass.streams.multi_client_jobs.get(queue_item.queue_id)
+ elif player.group_childs:
+ # create a new multi client flow stream
+ stream_job = await self.mass.streams.create_multi_client_stream_job(
+ queue_item.queue_id,
+ queue_item,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ pcm_bit_depth=16,
+ pcm_sample_rate=44100,
)
- )
-
- async def _stream_audio(
- self, player_id: str, queue: PlayerQueue, audio_iterator: AsyncGenerator[bytes, None]
- ) -> None:
- """Handle the actual streaming of audio to Airplay."""
- player = self.mass.players.get(player_id)
- if player.synced_to:
- # should not happen, but just in case
- raise RuntimeError("Player is synced")
- synced_player_ids = [x.player_id for x in self._get_sync_clients(player_id)]
- self.logger.debug(
- "Starting RAOP stream for Queue %s to %s",
- queue.display_name,
- "/".join(synced_player_ids),
- )
+ else:
+ # for a single player we just consume the flow stream directly
+ stream_job = None
# Python is not suitable for realtime audio streaming.
# So, I've decided to go the fancy route here. I've created a small binary
start_ntp = int(stdout.strip())
# setup Raop process for player and its sync childs
- for airplay_player in self._get_sync_clients(player_id):
- airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
- await airplay_player.active_stream.init_cliraop(start_ntp)
- prev_metadata_checksum: str = ""
- prev_progress_report: float = 0
- async for pcm_chunk in audio_iterator:
- # send audio chunk to player(s)
- available_clients = 0
+ async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
- if (
- not airplay_player.active_stream
- or not airplay_player.active_stream.running
- or airplay_player.active_stream.start_ntp != start_ntp
- ):
- # catch when this stream is no longer active on the player
- continue
- available_clients += 1
- await airplay_player.active_stream.write_chunk(pcm_chunk)
- if not available_clients:
- # this streamjob is no longer active
- return
-
- # send metadata to player(s) if needed
- # NOTE: this must all be done in separate tasks to not disturb audio
- now = time.time()
- if queue and queue.current_item and queue.current_item.streamdetails:
- metadata_checksum = (
- queue.current_item.streamdetails.stream_title
- or queue.current_item.queue_item_id
- )
- if prev_metadata_checksum != metadata_checksum:
- prev_metadata_checksum = metadata_checksum
- prev_progress_report = now
- self.mass.create_task(self._send_metadata(player_id, queue))
- # send the progress report every 5 seconds
- elif now - prev_progress_report >= 5:
- prev_progress_report = now
- self.mass.create_task(self._send_progress(player_id, queue))
-
- # end of stream reached - write eof
- self.logger.debug(
- "Finished RAOP stream for Queue %s to %s",
- queue.display_name,
- "/".join(synced_player_ids),
- )
- for airplay_player in self._get_sync_clients(player_id):
- if (
- not airplay_player.active_stream
- or not airplay_player.active_stream.running
- or airplay_player.active_stream.start_ntp != start_ntp
- ):
- # this may not happen, but guard just in case
- continue
- await airplay_player.active_stream.write_eof()
+ if stream_job:
+ stream_job.expected_players.add(airplay_player.player_id)
+ audio_iterator = stream_job.subscribe(
+ player_id=airplay_player.player_id,
+ output_format=pcm_format,
+ )
+ else:
+ queue = self.mass.player_queues.get_active_queue(queue_item.queue_id)
+ audio_iterator = get_ffmpeg_stream(
+ self.mass.streams.get_flow_stream(
+ queue,
+ start_queue_item=queue_item,
+ pcm_format=pcm_format,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ ),
+ input_format=pcm_format,
+ output_format=pcm_format,
+ filter_params=get_player_filter_params(self.mass, airplay_player.player_id),
+ )
+ airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
+ tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator))
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
# device switched to another source (or is powered off)
if active_stream := airplay_player.active_stream:
# ignore this if we just started playing to prevent false positives
- if mass_player.elapsed_time > 2 and mass_player.state == PlayerState.PLAYING:
+ if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
active_stream.prevent_playback = True
self.mass.create_task(self.monitor_prevent_playback(player_id))
elif "device-prevent-playback=0" in path:
finally:
writer.close()
- async def _send_metadata(self, player_id: str, queue: PlayerQueue) -> None:
- """Send metadata to player (and connected sync childs)."""
- if not queue or not queue.current_item:
- return
- duration = min(queue.current_item.duration or 0, 3600)
- title = queue.current_item.name
- artist = ""
- album = ""
- if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
- # stream title from radio station
- stream_title = queue.current_item.streamdetails.stream_title
- if " - " in stream_title:
- artist, title = stream_title.split(" - ", 1)
- else:
- title = stream_title
- # set album to radio station name
- album = queue.current_item.name
- if media_item := queue.current_item.media_item:
- if artist_str := getattr(media_item, "artist_str", None):
- artist = artist_str
- if _album := getattr(media_item, "album", None):
- album = _album.name
-
- cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
- cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
-
- for airplay_player in self._get_sync_clients(player_id):
- if not airplay_player.active_stream or not airplay_player.active_stream.running:
- continue
- await airplay_player.active_stream.send_cli_command(cmd)
-
- # get image
- if not queue.current_item.image:
- return
-
- # the image format needs to be 500x500 jpeg for maximum compatibility with players
- image_url = self.mass.metadata.get_image_url(
- queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
- )
- for airplay_player in self._get_sync_clients(player_id):
- if not airplay_player.active_stream or not airplay_player.active_stream.running:
- continue
- await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n")
-
- async def _send_progress(self, player_id: str, queue: PlayerQueue) -> None:
- """Send progress report to player (and connected sync childs)."""
- if not queue or not queue.current_item:
- return
- progress = int(queue.corrected_elapsed_time)
- for airplay_player in self._get_sync_clients(player_id):
- if not airplay_player.active_stream or not airplay_player.active_stream.running:
- continue
- await airplay_player.active_stream.send_cli_command(f"PROGRESS={progress}\n")
-
async def monitor_prevent_playback(self, player_id: str):
"""Monitor the prevent playback state of an airplay player."""
count = 0
return
if not active_stream.prevent_playback:
return
- await asyncio.sleep(0.25)
+ await asyncio.sleep(0.5)
airplay_player.logger.info(
"Player has been in prevent playback mode for too long, powering off.",
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
player_id, CONF_FLOW_MODE
) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
seek_position=seek_position,
media_controller = castplayer.cc.media_controller
await asyncio.to_thread(media_controller.send_message, queuedata, True)
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- url = stream_job.resolve_stream_url(player_id, ContentType.FLAC)
- castplayer = self.castplayers[player_id]
- await asyncio.to_thread(
- castplayer.cc.play_media,
- url,
- content_type="audio/flac",
- title="Music Assistant",
- thumb=MASS_LOGO_ONLINE,
- )
-
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle enqueuing of the next queue item on the player."""
castplayer = self.castplayers[player_id]
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
)
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
BASE_PLAYER_FEATURES = (
use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
seek_position=seek_position,
dlna_player.force_poll = True
await self.poll_player(dlna_player.udn)
- @catch_request_errors
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC
- url = stream_job.resolve_stream_url(player_id, output_codec)
- dlna_player = self.dlnaplayers[player_id]
- # always clear queue (by sending stop) first
- if dlna_player.device.can_stop:
- await self.cmd_stop(player_id)
- didl_metadata = create_didl_metadata(self.mass, url, None)
- await dlna_player.device.async_set_transport_uri(url, "Music Assistant", didl_metadata)
- # Play it
- await dlna_player.device.async_wait_for_can_play(10)
- # optimistically set this timestamp to help in case of a player
- # that does not report the progress
- now = time.time()
- dlna_player.player.elapsed_time = 0
- dlna_player.player.elapsed_time_last_updated = now
- await dlna_player.device.async_play()
- # force poll the device
- for sleep in (1, 2):
- await asyncio.sleep(sleep)
- dlna_player.force_poll = True
- await self.poll_player(dlna_player.udn)
-
@catch_request_errors
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle enqueuing of the next queue item on the player."""
dlna_player = self.dlnaplayers[player_id]
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
)
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
AUDIOMANAGER_STREAM_MUSIC = 3
player = self.mass.players.get(player_id)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
seek_position=seek_position,
player.state = PlayerState.PLAYING
self.mass.players.update(player_id)
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- player = self.mass.players.get(player_id)
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC
- url = stream_job.resolve_stream_url(player_id, output_codec)
- await self._fully.playSound(url, AUDIOMANAGER_STREAM_MUSIC)
- player.current_item_id = player_id
- player.elapsed_time = 0
- player.elapsed_time_last_updated = time.time()
- player.state = PlayerState.PLAYING
- self.mass.players.update(player_id)
-
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
from music_assistant.server.providers.hass import HomeAssistant as HomeAssistantProvider
use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
seek_position=seek_position,
player.elapsed_time = 0
player.elapsed_time_last_updated = time.time()
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC
- url = stream_job.resolve_stream_url(player_id, output_codec)
- await self.hass_prov.hass.call_service(
- domain="media_player",
- service="play_media",
- service_data={
- "media_content_id": url,
- "media_content_type": "music",
- "enqueue": "replace",
- },
- target={"entity_id": player_id},
- )
-
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""
Handle enqueuing of the next queue item on the player.
This will NOT be called if the player is using flow mode to playback the queue.
"""
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
)
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
# sync constants
-MIN_DEVIATION_ADJUST = 6 # 6 milliseconds
-MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
-MAX_SKIP_AHEAD_MS = 1500 # 1.5 seconds
+MIN_DEVIATION_ADJUST = 8 # 8 milliseconds
+MIN_REQ_PLAYPOINTS = 3 # we need at least 3 measurements
+DEVIATION_JUMP_IGNORE = 2000 # ignore a sudden unrealistic jump
+MAX_SKIP_AHEAD_MS = 500 # 0.5 seconds
@dataclass
slimproto: SlimServer
_sync_playpoints: dict[str, deque[SyncPlayPoint]]
- _do_not_resync_before: dict[str, float]
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._sync_playpoints = {}
- self._do_not_resync_before = {}
self._resync_handle: asyncio.TimerHandle | None = None
control_port = self.config.get_value(CONF_PORT)
telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
self._handle_play_url(
slimplayer,
url=stream_job.resolve_stream_url(
+ player_id,
slimplayer.player_id,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
),
if not slimplayer:
return
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
# for now just hardcode flac as we assume that every (modern)
# slimproto based player can handle that just fine
auto_play=True,
)
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- # fix race condition where resync and play media are called at more or less the same time
- if self._resync_handle:
- self._resync_handle.cancel()
- self._resync_handle = None
- # forward command to player and any connected sync members
- sync_clients = self._get_sync_clients(player_id)
- async with asyncio.TaskGroup() as tg:
- for slimplayer in sync_clients:
- tg.create_task(
- self._handle_play_url(
- slimplayer,
- url=stream_job.resolve_stream_url(
- slimplayer.player_id,
- output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- ),
- queue_item=None,
- send_flush=True,
- auto_play=False,
- )
- )
-
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle enqueuing of the next queue item on the player."""
if not (slimplayer := self.slimproto.get_player(player_id)):
return
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
flow_mode=False,
x.player_id for x in self.slimproto.players if x.player_id != player_id
),
)
+ if slimplayer.device_type == "squeezeesp32":
+ # squeezeesp32 with default settings - override with sane defaults
+ if slimplayer.max_sample_rate == 192000:
+ player.max_sample_rate = 44100
+ player.supports_24bit = False
self.mass.players.register_or_update(player)
# update player state on player events
return
if slimplayer.state != SlimPlayerState.PLAYING:
return
-
- if backoff_time := self._do_not_resync_before.get(slimplayer.player_id):
- # player has set a timestamp we should backoff from syncing it
- if time.time() < backoff_time:
- return
+ if slimplayer.player_id not in self._sync_playpoints:
+ return
# we collect a few playpoints of the player to determine
# average lag/drift so we can adjust accordingly
- sync_playpoints = self._sync_playpoints.setdefault(
- slimplayer.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
- )
+ sync_playpoints = self._sync_playpoints[slimplayer.player_id]
active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
# should not happen, but just in case
return
+ now = time.time()
last_playpoint = sync_playpoints[-1] if sync_playpoints else None
- if last_playpoint and (time.time() - last_playpoint.timestamp) > 10:
+ if last_playpoint and (now - last_playpoint.timestamp) > 10:
# last playpoint is too old, invalidate
sync_playpoints.clear()
if last_playpoint and last_playpoint.sync_job_id != stream_job.job_id:
- self._get_corrected_elapsed_milliseconds(slimplayer)
)
+ if last_playpoint and abs(last_playpoint.diff - diff) > DEVIATION_JUMP_IGNORE:
+ # ignore unexpected spikes
+ return
+
# we can now append the current playpoint to our list
- sync_playpoints.append(SyncPlayPoint(time.time(), stream_job.job_id, diff))
+ sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff))
if len(sync_playpoints) < MIN_REQ_PLAYPOINTS:
return
# resync the player by skipping ahead or pause for x amount of (milli)seconds
sync_playpoints.clear()
- self._do_not_resync_before[slimplayer.player_id] = time.time() + (delta / 1000) + 2
if avg_diff > MAX_SKIP_AHEAD_MS:
# player lagging behind more than MAX_SKIP_AHEAD_MS,
# we need to correct the sync_master
- self.logger.warning(
- "%s is lagging behind more than %s milliseconds!",
- player.display_name,
- MAX_SKIP_AHEAD_MS,
- )
+ self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta)
self.mass.create_task(sync_master.pause_for(delta))
+ sync_master._elapsed_milliseconds -= delta
elif avg_diff > 0:
# handle player lagging behind, fix with skip_ahead
self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta)
self.mass.create_task(slimplayer.skip_over(delta))
+ sync_master._elapsed_milliseconds += delta
else:
# handle player is drifting too far ahead, use pause_for to adjust
self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta)
self.mass.create_task(slimplayer.pause_for(delta))
+ sync_master._elapsed_milliseconds -= delta
async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None:
"""Handle buffer ready event, player has buffered a (new) track.
return
if not player.group_childs:
# not a sync group, continue
- await slimplayer.play()
+ await slimplayer.unpause_at(slimplayer.jiffies)
return
count = 0
while count < 40:
childs_total = 0
childs_ready = 0
+ await asyncio.sleep(0.2)
for sync_child in self._get_sync_clients(player.player_id):
childs_total += 1
if sync_child.state == SlimPlayerState.BUFFER_READY:
childs_ready += 1
if childs_total == childs_ready:
break
- await asyncio.sleep(0.1)
+
# all child's ready (or timeout) - start play
async with asyncio.TaskGroup() as tg:
for _client in self._get_sync_clients(player.player_id):
- timestamp = _client.jiffies + 500
- sync_delay = self.mass.config.get_raw_player_config_value(
- _client.player_id, CONF_SYNC_ADJUST, 0
- )
- timestamp -= sync_delay
- self._do_not_resync_before[_client.player_id] = time.time() + 1
- tg.create_task(_client.unpause_at(int(timestamp)))
+ self._sync_playpoints.setdefault(
+ _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS * 2)
+ ).clear()
+ # NOTE: Officially you should do an unpause_at based on the player timestamp
+ # but I did not have any good results with that.
+ # Instead just start playback on all players and let the sync logic work out
+ # the delays etc.
+ tg.create_task(_client.unpause_at(0))
async def _handle_connected(self, slimplayer: SlimClient) -> None:
"""Handle a slimplayer connected event."""
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
CONF_SNAPCAST_SERVER_HOST = "snapcast_server_host"
snap_group = self._get_snapgroup(player_id)
await snap_group.set_stream(stream.identifier)
- async def _streamer() -> None:
- host = self.snapcast_server_host
- _, writer = await asyncio.open_connection(host, port)
- self.logger.debug("Opened connection to %s:%s", host, port)
- player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
- player.elapsed_time = 0
- player.elapsed_time_last_updated = time.time()
- player.state = PlayerState.PLAYING
- self._set_childs_state(player_id, PlayerState.PLAYING)
- self.mass.players.register_or_update(player)
- # TODO: can we handle 24 bits bit depth ?
- pcm_format = AudioFormat(
- content_type=ContentType.PCM_S16LE,
- sample_rate=48000,
- bit_depth=16,
- channels=2,
+ # TODO: can we handle 24 bits bit depth ?
+ pcm_format = AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ sample_rate=48000,
+ bit_depth=16,
+ channels=2,
+ )
+ # handle special case for UGP multi client stream
+ if stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id):
+ stream_job.expected_players.add(player_id)
+ audio_iterator = stream_job.subscribe(
+ player_id=player_id,
+ output_format=pcm_format,
+ )
+ else:
+ audio_iterator = self.mass.streams.get_flow_stream(
+ queue,
+ start_queue_item=queue_item,
+ pcm_format=pcm_format,
+ seek_position=seek_position,
+ fade_in=fade_in,
)
- try:
- async for pcm_chunk in self.mass.streams.get_flow_stream(
- queue,
- start_queue_item=queue_item,
- pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
- ):
- writer.write(pcm_chunk)
- await writer.drain()
- # end of the stream reached
- if writer.can_write_eof():
- writer.write_eof()
- await writer.drain()
- # we need to wait a bit before removing the stream to ensure
- # that all snapclients have consumed the audio
- # https://github.com/music-assistant/hass-music-assistant/issues/1962
- await asyncio.sleep(30)
- finally:
- if not writer.is_closing():
- writer.close()
- await self._snapserver.stream_remove_stream(stream.identifier)
- self.logger.debug("Closed connection to %s:%s", host, port)
-
- # start streaming the queue (pcm) audio in a background task
- self._stream_tasks[player_id] = asyncio.create_task(_streamer())
-
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- player = self.mass.players.get(player_id)
- if player.synced_to:
- msg = "A synced player cannot receive play commands directly"
- raise RuntimeError(msg)
- # stop any existing streams first
- await self.cmd_stop(player_id)
- if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 48000:
- # TODO: resample on the fly here ?
- raise RuntimeError("Unsupported PCM format")
- stream, port = await self._create_stream()
- stream_job.expected_players.add(player_id)
- snap_group = self._get_snapgroup(player_id)
- await snap_group.set_stream(stream.identifier)
async def _streamer() -> None:
host = self.snapcast_server_host
_, writer = await asyncio.open_connection(host, port)
self.logger.debug("Opened connection to %s:%s", host, port)
- player.current_item_id = f"flow/{stream_job.queue_id}"
+ player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
player.elapsed_time = 0
player.elapsed_time_last_updated = time.time()
player.state = PlayerState.PLAYING
self._set_childs_state(player_id, PlayerState.PLAYING)
self.mass.players.register_or_update(player)
try:
- async for pcm_chunk in stream_job.subscribe(player_id):
+ async for pcm_chunk in audio_iterator:
writer.write(pcm_chunk)
await writer.drain()
# end of the stream reached
import asyncio
import logging
-import time
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
- fade_in: Optionally fade in the item at playback start.
"""
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
seek_position=seek_position,
"accept play_media command, it is synced to another player."
)
raise PlayerCommandFailed(msg)
- metadata = create_didl_metadata(self.mass, url, queue_item)
- await self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata)
-
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- url = stream_job.resolve_stream_url(player_id, ContentType.FLAC)
- sonos_player = self.sonosplayers[player_id]
- mass_player = self.mass.players.get(player_id)
- if sonos_player.sync_coordinator:
- # this should be already handled by the player manager, but just in case...
- msg = (
- f"Player {mass_player.display_name} can not "
- "accept play_stream command, it is synced to another player."
- )
- raise PlayerCommandFailed(msg)
- metadata = create_didl_metadata(self.mass, url, None)
- # sonos players do not like our multi client stream
- # add to the workaround players list
- self.mass.streams.workaround_players.add(player_id)
- await self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata)
- # optimistically set this timestamp to help figure out elapsed time later
- mass_player.elapsed_time = 0
- mass_player.elapsed_time_last_updated = time.time()
+ await self.mass.create_task(
+ sonos_player.soco.play_uri,
+ url,
+ meta=create_didl_metadata(self.mass, url, queue_item),
+ )
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""
"""
sonos_player = self.sonosplayers[player_id]
url = await self.mass.streams.resolve_stream_url(
+ player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
)
if retries > 2:
# switch to ap workaround after 2 retries
self._ap_workaround = True
- except asyncio.exceptions.TimeoutError:
+ except TimeoutError:
await asyncio.sleep(2)
if tokeninfo and userinfo:
self._auth_token = tokeninfo
ProviderFeature,
)
from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.queue_item import QueueItem
from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
await self.cmd_power(player_id, True)
group_player = self.mass.players.get(player_id)
- # create multi-client stream job
- stream_job = await self.mass.streams.create_multi_client_stream_job(
+ # create a multi-client stream job - all (direct) child's of this UGP group
+ # will subscribe to this multi client queue stream
+ await self.mass.streams.create_multi_client_stream_job(
player_id,
start_queue_item=queue_item,
seek_position=seek_position,
fade_in=fade_in,
)
+ # create a fake queue item to forward to downstream play_media commands
+ ugp_queue_item = QueueItem(
+ player_id, queue_item_id="flow", name=group_player.display_name, duration=None
+ )
# forward the stream job to all group members
async with asyncio.TaskGroup() as tg:
member = self.mass.players.get_sync_leader(member) # noqa: PLW2901
if member is None:
continue
- tg.create_task(player_prov.play_stream(member.player_id, stream_job))
+ tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item, 0, False))
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates."""