help="Provide logging level. Example --log-level debug, "
"default=info, possible=(critical, error, warning, info, debug)",
)
- parser.add_argument("-u", "--enable-uvloop", action="store_true")
return parser.parse_args()
hass_options = {}
log_level = hass_options.get("log_level", args.log_level).upper()
- enable_uvloop = bool(hass_options.get("enable_uvloop", args.enable_uvloop))
dev_mode = os.environ.get("PYTHONDEVMODE", "0") == "1"
# setup logger
run(
start_mass(),
- use_uvloop=enable_uvloop,
shutdown_callback=on_shutdown,
executor_workers=32,
)
player = self.get(player_id, True)
if player.announcement_in_progress:
return
- # use stream server to host announcement on local network
- # this ensures playback on all players, including ones that do not
- # like https hosts and it also offers the pre-announce 'bell'
- announcement_url = self.mass.streams.get_announcement_url(
- player.player_id, url, use_pre_announce=use_pre_announce
- )
try:
# mark announcement_in_progress on player
player.announcement_in_progress = True
# check for native announce support
if PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features:
if prov := self.mass.get_provider(player.provider):
+ # use stream server to host announcement on local network
+ # this ensures playback on all players, including ones that do not
+ # like https hosts and it also offers the pre-announce 'bell'
+ announcement_url = self.mass.streams.get_announcement_url(
+ player.player_id, url, use_pre_announce=use_pre_announce
+ )
await prov.play_announcement(player_id, announcement_url)
return
# use fallback/default implementation
- await self._play_announcement(player, announcement_url)
+ await self._play_announcement(player, url, use_pre_announce)
finally:
player.announcement_in_progress = False
# if a child player turned ON while the group player is on, we need to resync/resume
self.mass.create_task(self._sync_syncgroup(group_player.player_id))
- async def _play_announcement(
- self,
- player: Player,
- announcement_url: str,
- ) -> None:
+ async def _play_announcement(self, player: Player, url: str, use_pre_announce: bool) -> None:
"""Handle (default/fallback) implementation of the play announcement feature.
This default implementation will;
"""
if player.synced_to:
# redirect to sync master if player is group child
- self.mass.create_task(self.play_announcement(player.synced_to, announcement_url))
+ self.mass.create_task(self.play_announcement(player.synced_to, url))
return
if active_group := self._get_active_player_group(player):
- # redirect to group player if playergroup is atcive
- self.mass.create_task(self.play_announcement(active_group.player_id, announcement_url))
+ # redirect to group player if playergroup is active
+ self.mass.create_task(self.play_announcement(active_group.player_id, url))
return
# create a queue item for the announcement so
# we can send a regular play-media call downstream
queue_item = QueueItem(
queue_id=player.player_id,
- queue_item_id=announcement_url,
+ queue_item_id=url,
name="Announcement",
duration=None,
streamdetails=StreamDetails(
provider="url",
- item_id=announcement_url,
+ item_id=url,
audio_format=AudioFormat(
- content_type=ContentType.try_parse(announcement_url),
+ content_type=ContentType.try_parse(url),
),
media_type=MediaType.ANNOUNCEMENT,
- direct=announcement_url,
- data=announcement_url,
+ direct=url,
+ data={"url": url, "use_pre_announce": use_pre_announce},
target_loudness=-10,
),
)
import shortuuid
from aiohttp import web
-from music_assistant.common.helpers.util import (
- empty_queue,
- get_ip,
- select_free_port,
- try_parse_bool,
-)
+from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueOption,
CONF_PUBLISH_IP,
SILENCE_FILE,
UGP_PREFIX,
- VERBOSE_LOG_LEVEL,
)
from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
- get_ffmpeg_args,
get_ffmpeg_stream,
get_media_stream,
get_player_filter_params,
)
-from music_assistant.server.helpers.process import AsyncProcess
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
from music_assistant.server.models.core_controller import CoreController
from music_assistant.common.models.player import Player
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
- from music_assistant.server import MusicAssistant
DEFAULT_STREAM_HEADERS = {
# pylint:disable=too-many-locals
-class QueueStreamJob:
+class MultiClientStreamJob:
"""
- Representation of a (multiclient) Audio stream job/task.
-
- The whole idea here is that the (pcm) audio source can be sent to multiple
- players at once. For example for (slimproto/airplay) syncgroups and universal group.
+ Representation of a (multiclient) Audio Queue stream job/task.
- All client players receive the exact same PCM audio chunks from the source audio,
- which then can be optionally encoded and/or resampled to the player's preferences.
- In case a stream is restarted (e.g. when seeking),
- a new QueueStreamJob will be created.
+ The whole idea here is that in case of a player (sync)group,
+ all client players receive the exact same (PCM) audio chunks from the source audio.
+ A StreamJob is tied to a Queue and streams the queue flow stream,
+ In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created.
"""
_audio_task: asyncio.Task | None = None
def __init__(
self,
- mass: MusicAssistant,
- pcm_audio_source: AsyncGenerator[bytes, None],
+ stream_controller: StreamsController,
+ queue_id: str,
pcm_format: AudioFormat,
- auto_start: bool = False,
+ start_queue_item: QueueItem,
) -> None:
- """Initialize QueueStreamJob instance."""
- self.mass = mass
- self.pcm_audio_source = pcm_audio_source
+ """Initialize MultiClientStreamJob instance."""
+ self.stream_controller = stream_controller
+ self.queue_id = queue_id
+ self.queue = self.stream_controller.mass.player_queues.get(queue_id)
+ assert self.queue # just in case
self.pcm_format = pcm_format
- self.expected_players: set[str] = set()
+ self.start_queue_item = start_queue_item
self.job_id = shortuuid.uuid()
- self.bytes_streamed: int = 0
- self.logger = self.mass.streams.logger
+ self.expected_players: set[str] = set()
self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
- self._finished = False
- self.allow_start = auto_start
+ self.bytes_streamed: int = 0
self._all_clients_connected = asyncio.Event()
+ self.logger = stream_controller.logger.getChild("streamjob")
+ self._finished: bool = False
+ # start running the audio task in the background
self._audio_task = asyncio.create_task(self._stream_job_runner())
@property
def finished(self) -> bool:
"""Return if this StreamJob is finished."""
- return self._finished or (self._audio_task and self._audio_task.done())
+ return self._finished or self._audio_task and self._audio_task.done()
@property
def pending(self) -> bool:
@property
def running(self) -> bool:
"""Return if this Job is running."""
- return (
- self._all_clients_connected.is_set()
- and self._audio_task
- and not self._audio_task.done()
- )
-
- def start(self) -> None:
- """Start running (send audio chunks to connected players)."""
- if self.finished:
- raise RuntimeError("Task is already finished")
- self.allow_start = True
- if self.expected_players and len(self.subscribed_players) >= len(self.expected_players):
- self._all_clients_connected.set()
+ return not self.finished and not self.pending
def stop(self) -> None:
"""Stop running this job."""
- if self._audio_task and not self._audio_task.done():
- self._audio_task.cancel()
- if not self._finished:
- # we need to make sure that we close the async generator
- with suppress(StopAsyncIteration):
- task = asyncio.create_task(self.pcm_audio_source.__anext__())
- task.cancel()
self._finished = True
+ if self._audio_task and self._audio_task.done():
+ return
+ if self._audio_task:
+ self._audio_task.cancel()
for sub_queue in self.subscribed_players.values():
- empty_queue(sub_queue)
+ with suppress(asyncio.QueueFull):
+ sub_queue.put_nowait(b"")
- def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str:
+ def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
"""Resolve the childplayer specific stream URL to this streamjob."""
fmt = output_codec.value
# handle raw pcm
if output_codec.is_pcm():
- player = self.mass.streams.mass.players.get(player_id)
+ player = self.stream_controller.mass.players.get(child_player_id)
player_max_bit_depth = 24 if player.supports_24bit else 16
output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
- output_channels = self.mass.config.get_raw_player_config_value(
- player_id, CONF_OUTPUT_CHANNELS, "stereo"
+ output_channels = self.stream_controller.mass.config.get_raw_player_config_value(
+ child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
)
channels = 1 if output_channels != "stereo" else 2
fmt += (
f";codec=pcm;rate={output_sample_rate};"
f"bitrate={output_bit_depth};channels={channels}"
)
- url = f"{self.mass.streams._server.base_url}/flow/{self.job_id}/{player_id}.{fmt}"
- self.expected_players.add(player_id)
+ url = f"{self.stream_controller._server.base_url}/multi/{self.queue_id}/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}" # noqa: E501
+ self.expected_players.add(child_player_id)
return url
- async def iter_player_audio(
- self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None
- ) -> AsyncGenerator[bytes, None]:
- """Subscribe consumer and iterate player-specific audio."""
- async for chunk in get_ffmpeg_stream(
- audio_input=self.subscribe(player_id),
- input_format=self.pcm_format,
- output_format=output_format,
- filter_params=get_player_filter_params(self.mass, player_id),
- chunk_size=chunk_size,
- ):
- yield chunk
-
- async def stream_to_custom_output_path(
- self, player_id: str, output_format: AudioFormat, output_path: str | int
- ) -> None:
- """Subscribe consumer and instruct ffmpeg to send the audio to the given output path."""
- custom_file_pointer = isinstance(output_path, int)
- ffmpeg_args = get_ffmpeg_args(
- input_format=self.pcm_format,
- output_format=output_format,
- filter_params=get_player_filter_params(self.mass, player_id),
- extra_args=[],
- input_path="-",
- output_path="-" if custom_file_pointer else output_path,
- loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet",
- )
- # launch ffmpeg process with player specific settings
- # the stream_job_runner will start pushing pcm chunks to the stdin
- # the ffmpeg process will send the output directly to the given path (e.g. tcp socket)
- async with AsyncProcess(
- ffmpeg_args,
- enable_stdin=True,
- enable_stdout=custom_file_pointer,
- enable_stderr=False,
- custom_stdin=self.subscribe(player_id),
- custom_stdout=output_path if custom_file_pointer else None,
- name="ffmpeg_custom_output_path",
- ) as ffmpeg_proc:
- # we simply wait for the process to exit
- await ffmpeg_proc.wait()
-
async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
"""Subscribe consumer and iterate incoming chunks on the queue."""
try:
- self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
-
- if self._all_clients_connected.is_set():
- # client subscribes while we're already started
+ # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
+ # to the stream in an attempt to get the audio details such as duration
+ # which is a bit pointless for our duration-less queue stream
+ # and it completely messes with the subscription logic
+ if player_id in self.subscribed_players:
self.logger.warning(
- "Client %s is joining while the stream is already started", player_id
+ "Player %s is making multiple requests "
+ "to the same stream, playback may be disturbed!",
+ player_id,
+ )
+ player_id = f"{player_id}_{shortuuid.random(4)}"
+ elif self._all_clients_connected.is_set():
+ # client subscribes while we're already started - that is going to be messy for sure
+ self.logger.warning(
+ "Player %s is is joining while the stream is already started, "
+ "playback may be disturbed!",
+ player_id,
)
+ self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
+
+ if self._all_clients_connected.is_set():
+ # client subscribes while we're already started - we dont support that (for now?)
+ msg = f"Client {player_id} is joining while the stream is already started"
+ raise RuntimeError(msg)
self.logger.debug("Subscribed client %s", player_id)
- if (
- self.expected_players
- and self.allow_start
- and len(self.subscribed_players) == len(self.expected_players)
- ):
+ if len(self.subscribed_players) == len(self.expected_players):
# we reached the number of expected subscribers, set event
# so that chunks can be pushed
+ await asyncio.sleep(0.2)
self._all_clients_connected.set()
# keep reading audio chunks from the queue until we receive an empty one
await asyncio.sleep(2)
if len(self.subscribed_players) == 0 and self._audio_task and not self.finished:
self.logger.debug("Cleaning up, all clients disappeared...")
- self.stop()
+ self._audio_task.cancel()
async def _put_chunk(self, chunk: bytes) -> None:
"""Put chunk of data to all subscribers."""
async def _stream_job_runner(self) -> None:
"""Feed audio chunks to StreamJob subscribers."""
chunk_num = 0
- async for chunk in self.pcm_audio_source:
+ async for chunk in self.stream_controller.get_flow_stream(
+ self.queue,
+ self.start_queue_item,
+ self.pcm_format,
+ ):
chunk_num += 1
if chunk_num == 1:
# wait until all expected clients are connected
await self._all_clients_connected.wait()
except TimeoutError:
if len(self.subscribed_players) == 0:
- self.logger.exception(
- "Abort multi client stream job %s: "
- "client(s) did not connect within timeout",
- self.job_id,
+ self.stream_controller.logger.exception(
+ "Abort multi client stream job for queue %s: "
+ "clients did not connect within timeout",
+ self.queue.display_name,
)
break
# not all clients connected but timeout expired, set flag and move on
# with all clients that did connect
self._all_clients_connected.set()
else:
- self.logger.debug(
- "Starting queue stream job %s with %s (out of %s) connected clients",
- self.job_id,
+ self.stream_controller.logger.debug(
+ "Starting multi client stream job for queue %s "
+ "with %s out of %s connected clients",
+ self.queue.display_name,
len(self.subscribed_players),
len(self.expected_players),
)
"""Initialize instance."""
super().__init__(*args, **kwargs)
self._server = Webserver(self.logger, enable_dynamic_routes=True)
- self.stream_jobs: dict[str, QueueStreamJob] = {}
+ self.multi_client_jobs: dict[str, MultiClientStreamJob] = {}
self.register_dynamic_route = self._server.register_dynamic_route
self.unregister_dynamic_route = self._server.unregister_dynamic_route
self.manifest.name = "Streamserver"
static_routes=[
(
"*",
- "/flow/{job_id}/{player_id}.{fmt}",
+ "/flow/{queue_id}/{queue_item_id}.{fmt}",
self.serve_queue_flow_stream,
),
(
"/single/{queue_id}/{queue_item_id}.{fmt}",
self.serve_queue_item_stream,
),
+ (
+ "*",
+ "/multi/{queue_id}/{job_id}/{player_id}/{queue_item_id}.{fmt}",
+ self.serve_multi_subscriber_stream,
+ ),
(
"*",
"/command/{queue_id}/{command}.mp3",
) -> str:
"""Resolve the stream URL for the given QueueItem."""
fmt = output_codec.value
- # handle announcement item
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
- return queue_item.queue_item_id
- # handle request for (multi client) queue flow stream
+ # handle special stream created by UGP
if queue_item.queue_id.startswith(UGP_PREFIX):
- # special case: we got forwarded a request from a Universal Group Player
- # use the existing stream job that was already created by UGP
- stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
- return stream_job.resolve_stream_url(player_id, output_codec)
-
- if flow_mode:
- # create a new flow mode stream job session
- pcm_format = AudioFormat(
- content_type=ContentType.from_bit_depth(24),
- sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
- bit_depth=FLOW_DEFAULT_BIT_DEPTH,
+ return self.multi_client_jobs[queue_item.queue_id].resolve_stream_url(
+ player_id, output_codec
)
- stream_job = self.create_stream_job(
- queue_item.queue_id,
- pcm_audio_source=self.get_flow_stream(
- self.mass.player_queues.get(queue_item.queue_id),
- start_queue_item=queue_item,
- pcm_format=pcm_format,
- ),
- pcm_format=pcm_format,
- auto_start=True,
+ # handle announcement item
+ if queue_item.media_type == MediaType.ANNOUNCEMENT:
+ return self.get_announcement_url(
+ player_id=queue_item.queue_id,
+ announcement_url=queue_item.streamdetails.data["url"],
+ use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+ content_type=output_codec,
)
-
- return stream_job.resolve_stream_url(player_id, output_codec)
-
# handle raw pcm without exact format specifiers
if output_codec.is_pcm() and ";" not in fmt:
fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
query_params = {}
- url = (
- f"{self._server.base_url}/single/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}"
- )
+ base_path = "flow" if flow_mode else "single"
+ url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
# we add a timestamp as basic checksum
# most importantly this is to invalidate any caches
# but also to handle edge cases such as single track repeat
url += "?" + urllib.parse.urlencode(query_params)
return url
- def create_stream_job(
+ def create_multi_client_stream_job(
self,
queue_id: str,
- pcm_audio_source: AsyncGenerator[bytes, None],
- pcm_format: AudioFormat,
- auto_start: bool = False,
- ) -> QueueStreamJob:
- """
- Create a QueueStreamJob for the given queue..
+ start_queue_item: QueueItem,
+ pcm_bit_depth: int = FLOW_DEFAULT_BIT_DEPTH,
+ pcm_sample_rate: int = FLOW_DEFAULT_SAMPLE_RATE,
+ ) -> MultiClientStreamJob:
+ """Create a MultiClientStreamJob for the given queue..
This is called by player/sync group implementations to start streaming
the queue audio to multiple players at once.
"""
- if existing_job := self.stream_jobs.pop(queue_id, None):
+ if existing_job := self.multi_client_jobs.pop(queue_id, None):
+ if (
+ queue_id.startswith(UGP_PREFIX)
+ and existing_job.job_id == start_queue_item.queue_item_id
+ ):
+ return existing_job
# cleanup existing job first
- existing_job.stop()
- self.stream_jobs[queue_id] = stream_job = QueueStreamJob(
- self.mass,
- pcm_audio_source=pcm_audio_source,
- pcm_format=pcm_format,
- auto_start=auto_start,
+ if not existing_job.finished:
+ self.logger.warning("Detected existing (running) stream job for queue %s", queue_id)
+ existing_job.stop()
+ self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob(
+ self,
+ queue_id=queue_id,
+ pcm_format=AudioFormat(
+ content_type=ContentType.from_bit_depth(pcm_bit_depth),
+ sample_rate=pcm_sample_rate,
+ bit_depth=pcm_bit_depth,
+ channels=2,
+ ),
+ start_queue_item=start_queue_item,
)
return stream_job
async def serve_queue_flow_stream(self, request: web.Request) -> web.Response:
"""Stream Queue Flow audio to player."""
self._log_request(request)
- job_id = request.match_info["job_id"]
- for queue_id, stream_job in self.stream_jobs.items():
- if stream_job.job_id == job_id:
- break
- else:
- raise web.HTTPNotFound(reason=f"Unknown StreamJob: {job_id}")
- if stream_job.finished:
- raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished")
- if not (queue := self.mass.player_queues.get(queue_id)):
+ queue_id = request.match_info["queue_id"]
+ queue = self.mass.player_queues.get(queue_id)
+ if not queue:
raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
- player_id = request.match_info["player_id"]
- child_player = self.mass.players.get(player_id)
- if not child_player:
- raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
- # work out (childplayer specific!) output format/details
+ if not (queue_player := self.mass.players.get(queue_id)):
+ raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
+ start_queue_item_id = request.match_info["queue_item_id"]
+ start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
+ if not start_queue_item:
+ raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
+ # work out output format/details
output_format = await self._get_output_format(
output_format_str=request.match_info["fmt"],
- queue_player=child_player,
- default_sample_rate=stream_job.pcm_format.sample_rate,
- default_bit_depth=stream_job.pcm_format.bit_depth,
+ queue_player=queue_player,
+ default_sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
+ default_bit_depth=FLOW_DEFAULT_BIT_DEPTH,
)
# play it safe: only allow icy metadata for mp3 and aac
enable_icy = request.headers.get(
"Icy-MetaData", ""
) == "1" and output_format.content_type in (ContentType.MP3, ContentType.AAC)
- icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384
+ icy_meta_interval = 16384
# prepare request, add some DLNA/UPNP compatible headers
headers = {
if request.method != "GET":
return resp
- # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
- # to the stream in an attempt to get the audio details such as duration
- # which is a bit pointless for our duration-less queue stream
- # and it completely messes with the subscription logic
- if player_id in stream_job.subscribed_players:
- self.logger.warning(
- "Player %s is making multiple requests "
- "to the same stream, playback may be disturbed!",
- player_id,
- )
- elif "rincon" in player_id.lower():
- await asyncio.sleep(0.1)
-
# all checks passed, start streaming!
- self.logger.debug(
- "Start serving Queue flow audio stream for queue %s to player %s",
- queue.display_name,
- child_player.display_name,
+ self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
+
+ # collect player specific ffmpeg args to re-encode the source PCM stream
+ pcm_format = AudioFormat(
+ content_type=ContentType.from_bit_depth(output_format.bit_depth),
+ sample_rate=output_format.sample_rate,
+ bit_depth=output_format.bit_depth,
+ channels=2,
)
- async for chunk in stream_job.iter_player_audio(
- player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.get_flow_stream(
+ queue=queue, start_queue_item=start_queue_item, pcm_format=pcm_format
+ ),
+ input_format=pcm_format,
+ output_format=output_format,
+ filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+ chunk_size=icy_meta_interval if enable_icy else None,
):
try:
await resp.write(chunk)
except (BrokenPipeError, ConnectionResetError):
+ # race condition
break
+
if not enable_icy:
continue
return resp
+ async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response:
+ """Stream Queue Flow audio to a child player within a multi subscriber setup."""
+ self._log_request(request)
+ queue_id = request.match_info["queue_id"]
+ streamjob = self.multi_client_jobs.get(queue_id)
+ if not streamjob:
+ raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}")
+ job_id = request.match_info["job_id"]
+ if job_id != streamjob.job_id:
+ raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}")
+ child_player_id = request.match_info["player_id"]
+ child_player = self.mass.players.get(child_player_id)
+ if not child_player:
+ raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+ # work out (childplayer specific!) output format/details
+ output_format = await self._get_output_format(
+ output_format_str=request.match_info["fmt"],
+ queue_player=child_player,
+ default_sample_rate=streamjob.pcm_format.sample_rate,
+ default_bit_depth=streamjob.pcm_format.bit_depth,
+ )
+ # prepare request, add some DLNA/UPNP compatible headers
+ headers = {
+ **DEFAULT_STREAM_HEADERS,
+ "Content-Type": f"audio/{output_format.output_format_str}",
+ }
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers=headers,
+ )
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s",
+ streamjob.queue.display_name,
+ child_player.display_name,
+ )
+
+ async for chunk in get_ffmpeg_stream(
+ audio_input=streamjob.subscribe(child_player_id),
+ input_format=streamjob.pcm_format,
+ output_format=output_format,
+ filter_params=get_player_filter_params(self.mass, child_player_id),
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError):
+ # race condition
+ break
+
+ return resp
+
async def serve_command_request(self, request: web.Request) -> web.Response:
"""Handle special 'command' request for a player."""
self._log_request(request)
raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
if player_id not in self.announcements:
raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
- announcement = self.announcements[player_id]
+ announcement_url = self.announcements[player_id]
use_pre_announce = try_parse_bool(request.query.get("pre_announce"))
# work out output format/details
- fmt = request.match_info.get("fmt", announcement.rsplit(".")[-1])
+ fmt = request.match_info.get("fmt", announcement_url.rsplit(".")[-1])
audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
# prepare request, add some DLNA/UPNP compatible headers
headers = {
# all checks passed, start streaming!
self.logger.debug(
"Start serving audio stream for Announcement %s to %s",
- announcement,
+ announcement_url,
player.display_name,
)
- extra_args = []
- filter_params = ["loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5"]
- if use_pre_announce:
- extra_args += [
- "-i",
- ANNOUNCE_ALERT_FILE,
- "-filter_complex",
- "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5",
- ]
- filter_params = []
-
- async for chunk in get_ffmpeg_stream(
- audio_input=announcement,
- input_format=audio_format,
+ async for chunk in self.get_announcement_stream(
+ announcement_url=announcement_url,
output_format=audio_format,
- extra_args=extra_args,
- filter_params=filter_params,
+ use_pre_announce=use_pre_announce,
):
try:
await resp.write(chunk)
self.logger.debug(
"Finished serving audio stream for Announcement %s to %s",
- announcement,
+ announcement_url,
player.display_name,
)
del buffer
self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
+ async def get_announcement_stream(
+ self, announcement_url: str, output_format: AudioFormat, use_pre_announce: bool = False
+ ) -> AsyncGenerator[bytes, None]:
+ """Get the special announcement stream."""
+ # work out output format/details
+ fmt = announcement_url.rsplit(".")[-1]
+ audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
+ extra_args = []
+ filter_params = ["loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5"]
+ if use_pre_announce:
+ extra_args += [
+ "-i",
+ ANNOUNCE_ALERT_FILE,
+ "-filter_complex",
+ "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5",
+ ]
+ filter_params = []
+ async for chunk in get_ffmpeg_stream(
+ audio_input=announcement_url,
+ input_format=audio_format,
+ output_format=output_format,
+ extra_args=extra_args,
+ filter_params=filter_params,
+ ):
+ yield chunk
+
def _log_request(self, request: web.Request) -> None:
"""Log request."""
if not self.logger.isEnabledFor(logging.DEBUG):
filter_params=filter_params,
extra_args=extra_args,
input_path=streamdetails.direct or "-",
+ loglevel="info", # needed for loudness measurement
)
finished = False
ffmpeg_args,
enable_stdin=streamdetails.direct is None,
enable_stderr=True,
+ custom_stdin=mass.get_provider(streamdetails.provider).get_audio_stream(
+ streamdetails,
+ seek_position=streamdetails.seek_position if streamdetails.can_seek else 0,
+ )
+ if not streamdetails.direct
+ else None,
name="ffmpeg_media_stream",
)
await ffmpeg_proc.start()
logger = LOGGER.getChild("media_stream")
logger.debug("start media stream for: %s", streamdetails.uri)
- async def writer() -> None:
- """Task that grabs the source audio and feeds it to ffmpeg."""
- music_prov = mass.get_provider(streamdetails.provider)
- seek_pos = streamdetails.seek_position if streamdetails.can_seek else 0
- async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
- await ffmpeg_proc.write(audio_chunk)
- # write eof when last packet is received
- await ffmpeg_proc.write_eof()
-
- if streamdetails.direct is None:
- ffmpeg_proc.attached_tasks.append(asyncio.create_task(writer()))
-
# get pcm chunks from stdout
# we always stay one chunk behind to properly detect end of chunks
# so we can strip silence at the beginning and end of a track
finally:
seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
streamdetails.seconds_streamed = seconds_streamed
+ state_str = "finished" if finished else "aborted"
+ logger.debug(
+ "stream %s for: %s (%s seconds streamed)",
+ state_str,
+ streamdetails.uri,
+ seconds_streamed,
+ )
+ # store accurate duration
if finished:
- logger.debug(
- "finished stream for: %s (%s seconds streamed)",
- streamdetails.uri,
- seconds_streamed,
- )
- # store accurate duration
streamdetails.duration = streamdetails.seek_position + seconds_streamed
- else:
- logger.debug(
- "stream aborted for %s (%s seconds streamed)",
- streamdetails.uri,
- seconds_streamed,
- )
# use communicate to read stderr and wait for exit
# read log for loudness measurement (or errors)
- _, stderr = await ffmpeg_proc.communicate()
- if ffmpeg_proc.returncode != 0:
- # ffmpeg has a non zero returncode meaning trouble
+ try:
+ _, stderr = await asyncio.wait_for(ffmpeg_proc.communicate(), 5)
+ except TimeoutError:
+ stderr = b""
+ # ensure to send close here so we terminate and cleanup the process
+ await ffmpeg_proc.close()
+ if ffmpeg_proc.returncode != 0 and not bytes_sent:
logger.warning("stream error on %s", streamdetails.uri)
- logger.warning(stderr.decode())
- finished = False
- elif loudness_details := _parse_loudnorm(stderr):
- logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
+ elif stderr and (loudness_details := _parse_loudnorm(stderr)):
required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
if finished or (seconds_streamed >= required_seconds):
LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
await mass.music.set_track_loudness(
streamdetails.item_id, streamdetails.provider, loudness_details
)
- else:
+ elif stderr:
logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
# report playback
input_format: AudioFormat,
output_format: AudioFormat,
filter_params: list[str],
- extra_args: list[str],
+ extra_args: list[str] | None = None,
input_path: str = "-",
output_path: str = "-",
- loglevel: str = "info",
+ loglevel: str | None = None,
) -> list[str]:
"""Collect all args to send to the ffmpeg process."""
+ if loglevel is None:
+ loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet"
+ if extra_args is None:
+ extra_args = []
+ extra_args += ["-bufsize", "32M"]
ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
-
if not ffmpeg_present:
msg = (
"FFmpeg binary is missing from system."
import logging
import os
from contextlib import suppress
+from signal import SIGINT
from types import TracebackType
from typing import TYPE_CHECKING
stdin=stdin if self._enable_stdin else None,
stdout=stdout if self._enable_stdout else None,
stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
- limit=4000000,
- pipesize=256000,
)
LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid)
task.cancel()
with suppress(asyncio.CancelledError):
await task
+ if self.proc.returncode is None:
+ # always try sending a SIGINT signal first to attempt a clean shutdown
+ # for example ffmpeg needs this to cleanly shutdown and not lock on pipes
+ self.proc.send_signal(SIGINT)
+ # allow the process a little bit of time to respond to the signal
+ await asyncio.sleep(0.1)
+
# send communicate until we exited
while self.proc.returncode is None:
- # make sure the process is cleaned up
+ # make sure the process is really cleaned up.
+ # especially with pipes this can cause deadlocks if not properly guarded
+ # we need to use communicate to ensure buffers are flushed
+ # so we do that by calling communicate()
try:
- # we need to use communicate to ensure buffers are flushed
- await asyncio.wait_for(self.proc.communicate(), 10)
+ await asyncio.wait_for(self.proc.communicate(), 2)
except TimeoutError:
LOGGER.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
self.proc.pid,
)
self.proc.terminate()
- await asyncio.sleep(0.5)
LOGGER.debug(
"Process %s with PID %s stopped with returncode %s",
self._name,
import platform
import socket
import time
+from collections.abc import AsyncGenerator
from contextlib import suppress
from dataclasses import dataclass
from random import randint, randrange
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.constants import CONF_SYNC_ADJUST, UGP_PREFIX, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import get_media_stream
+from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import QueueStreamJob
from music_assistant.server.models import ProviderInstanceType
DOMAIN = "airplay"
class AirplayStream:
"""Object that holds the details of a stream job."""
- def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None:
+ def __init__(
+ self, prov: AirplayProvider, airplay_player: AirPlayPlayer, input_format: AudioFormat
+ ) -> None:
"""Initialize AirplayStream."""
self.prov = prov
self.mass = prov.mass
self.airplay_player = airplay_player
+ self.input_format = input_format
# always generate a new active remote id to prevent race conditions
# with the named pipe used to send audio
self.active_remote_id: str = str(randint(1000, 8000))
self.start_ntp: int | None = None # use as checksum
self.prevent_playback: bool = False
- self.stream_job: QueueStreamJob | None = None
self._log_reader_task: asyncio.Task | None = None
self._audio_reader_task: asyncio.Task | None = None
self._cliraop_proc: AsyncProcess | None = None
+ self._ffmpeg_proc: AsyncProcess | None = None
self._stop_requested = False
@property
and self._cliraop_proc.returncode is None
)
- async def start(self, start_ntp: int, stream_job: QueueStreamJob) -> None:
+ async def start(self, start_ntp: int) -> None:
"""Initialize CLIRaop process for a player."""
self.start_ntp = start_ntp
- self.stream_job = stream_job
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
"-port",
str(self.airplay_player.discovery_info.port),
"-wait",
- str(3000 - sync_adjust),
+ str(2000 - sync_adjust),
"-volume",
str(mass_player.volume_level),
*extra_args,
# connect cliraop stdin with ffmpeg stdout using os pipes
read, write = os.pipe()
+
# launch ffmpeg, feeding (player specific) audio chunks on stdout
- self._audio_reader_task = asyncio.create_task(
- stream_job.stream_to_custom_output_path(
- player_id=player_id, output_format=AIRPLAY_PCM_FORMAT, output_path=write
- )
+ # one could argue that the intermediate ffmpeg towards cliraop is not needed
+ # when there are no player specific filters or extras but in this case
+ # ffmpeg serves as a small buffer towards the realtime cliraop streamer
+ ffmpeg_args = get_ffmpeg_args(
+ input_format=self.input_format,
+ output_format=AIRPLAY_PCM_FORMAT,
+ filter_params=get_player_filter_params(self.mass, player_id),
+ )
+ # launch ffmpeg process with player specific settings
+ # the stream_job_runner will start pushing pcm chunks to the stdin
+ # the ffmpeg process will send the output directly to the given path (e.g. tcp socket)
+ self._ffmpeg_proc = AsyncProcess(
+ ffmpeg_args,
+ enable_stdin=True,
+ enable_stdout=True,
+ enable_stderr=False,
+ custom_stdout=write,
+ name="cliraop_ffmpeg",
)
+ await self._ffmpeg_proc.start()
self._cliraop_proc = AsyncProcess(
cliraop_args,
enable_stdin=True,
await self._cliraop_proc.start()
self._log_reader_task = asyncio.create_task(self._log_watcher())
- # self._audio_reader_task = asyncio.create_task(self._audio_reader())
+ async def write_chunk(self, chunk: bytes) -> None:
+ """Write a (pcm) audio chunk to the player."""
+ await self._ffmpeg_proc.write(chunk)
+
+ async def write_eof(self) -> None:
+ """Write EOF to the ffmpeg stdin."""
+ await self._ffmpeg_proc.write_eof()
+ await self._ffmpeg_proc.wait()
+ await self.stop()
async def stop(self, wait: bool = True):
"""Stop playback and cleanup."""
- if not self.running:
+ if self._cliraop_proc.closed and self._ffmpeg_proc.closed:
return
self._stop_requested = True
- # send stop with cli command
- await self.send_cli_command("ACTION=STOP")
async def _stop() -> None:
- # always stop the audio feeder
- if self._audio_reader_task and not self._audio_reader_task.done():
- with suppress(asyncio.CancelledError):
- self._audio_reader_task.cancel()
- await self._cliraop_proc.write_eof()
- # the process should exit gracefully after the stop request was processed
- await asyncio.wait_for(self._cliraop_proc.wait(), 30)
+ # ffmpeg MUST be stopped before cliraop due to the chained pipes
+ await self._ffmpeg_proc.close()
+ # allow the cliraop process to stop gracefully first
+ await self.send_cli_command("ACTION=STOP")
+ with suppress(TimeoutError):
+ await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+ # send regular close anyway (which also logs the returncode)
+ await self._cliraop_proc.close()
task = self.mass.create_task(_stop())
if wait:
self.mass.players.update(airplay_player.player_id)
if "lost packet out of backlog" in line:
lost_packets += 1
- if lost_packets == 50:
+ if lost_packets == 100:
logger.warning("High packet loss detected, stopping playback...")
- queue = self.mass.player_queues.get_active_queue(mass_player.player_id)
- await self.mass.player_queues.stop(queue.queue_id)
- else:
- logger.debug(line)
+ await self.stop(False)
logger.log(VERBOSE_LOG_LEVEL, line)
if airplay_player.active_stream == self:
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
+ # ensure we're cleaned up afterwards
+ await self.stop()
async def _send_metadata(self, queue: PlayerQueue) -> None:
"""Send metadata to player (and connected sync childs)."""
if queue_item.queue_id.startswith(UGP_PREFIX):
# special case: we got forwarded a request from the UGP
# use the existing stream job that was already created by UGP
- stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
+ stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id]
+ stream_job.expected_players.add(player_id)
+ input_format = stream_job.pcm_format
+ audio_source = stream_job.subscribe(player_id)
+ elif queue_item.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ input_format = AIRPLAY_PCM_FORMAT
+ audio_source = self.mass.streams.get_announcement_stream(
+ queue_item.streamdetails.data["url"],
+ pcm_format=AIRPLAY_PCM_FORMAT,
+ use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+ )
else:
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
- # stream announcement url directly
- audio_source = get_media_stream(
- self.mass, queue_item.streamdetails, pcm_format=AIRPLAY_PCM_FORMAT
- )
- else:
- queue = self.mass.player_queues.get(queue_item.queue_id)
- audio_source = self.mass.streams.get_flow_stream(
- queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
- )
- stream_job = self.mass.streams.create_stream_job(
- queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=AIRPLAY_PCM_FORMAT
+ queue = self.mass.player_queues.get(queue_item.queue_id)
+ input_format = AIRPLAY_PCM_FORMAT
+ audio_source = self.mass.streams.get_flow_stream(
+ queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
)
+ self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format)
+ async def _handle_stream_audio(
+ self, player_id: str, audio_source: AsyncGenerator[bytes, None], input_format: AudioFormat
+ ) -> None:
+ """Handle streaming of audio to one or more airplay players."""
# Python is not suitable for realtime audio streaming so we do the actual streaming
# of (RAOP) audio using a small executable written in C based on libraop to do the actual
- # timestamped playback. The raw pcm audio is fed to a named pipe, read by the executable
- # and we can send some ingteractie commands to the process stdin.
+ # timestamped playback, which reads pcm audio from stdin
+ # and we can send some interactive commands using a named pipe.
# get current ntp before we start
_, stdout = await check_output(f"{self.cliraop_bin} -ntp")
# setup Raop process for player and its sync childs
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
- stream_job.expected_players.add(airplay_player.player_id)
- airplay_player.active_stream = AirplayStream(self, airplay_player)
- tg.create_task(airplay_player.active_stream.start(start_ntp, stream_job))
- if not queue_item.queue_id.startswith(UGP_PREFIX):
- stream_job.start()
+ airplay_player.active_stream = AirplayStream(
+ self, airplay_player, input_format=input_format
+ )
+ tg.create_task(airplay_player.active_stream.start(start_ntp))
+
+ async for chunk in audio_source:
+ active_clients = 0
+ async with asyncio.TaskGroup() as tg:
+ for airplay_player in self._get_sync_clients(player_id):
+ if not (airplay_player.active_stream and airplay_player.active_stream.running):
+ # player stopped or switched to a new stream
+ continue
+ if airplay_player.active_stream.start_ntp != start_ntp:
+ # checksum mismatch
+ continue
+ tg.create_task(airplay_player.active_stream.write_chunk(chunk))
+ active_clients += 1
+ if active_clients == 0:
+ # no more clients
+ return
+ # entire stream consumed: send EOF
+ async with asyncio.TaskGroup() as tg:
+ for airplay_player in self._get_sync_clients(player_id):
+ if (
+ not airplay_player.active_stream
+ or airplay_player.active_stream.start_ntp != start_ntp
+ ):
+ continue
+ tg.create_task(airplay_player.active_stream.write_eof())
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
# originally/officially cast supports 96k sample rate
# but it seems a (recent?) update broke this
# for now use 48k as max sample rate to play safe
- max_sample_rate=48000,
- supports_24bit=True,
+ max_sample_rate=44100 if cast_info.is_audio_group else 48000,
+ supports_24bit=not cast_info.is_audio_group,
enabled_by_default=enabled_by_default,
),
logger=self.logger.getChild(cast_info.friendly_name),
RepeatMode,
)
from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError
-from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.constants import (
CONF_CROSSFADE,
CONF_PORT,
CONF_SYNC_ADJUST,
MASS_LOGO_ONLINE,
- UGP_PREFIX,
VERBOSE_LOG_LEVEL,
)
from music_assistant.server.models.player_provider import PlayerProvider
if player.group_childs:
# player has sync members, we need to start a (multi-player) stream job
# to make sure that all clients receive the exact same audio
- pcm_format = AudioFormat(
- content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
- )
- queue = self.mass.player_queues.get(queue_item.queue_id)
- stream_job = self.mass.streams.create_stream_job(
+ stream_job = self.mass.streams.create_multi_client_stream_job(
queue_id=queue_item.queue_id,
- pcm_audio_source=self.mass.streams.get_flow_stream(
- queue,
- start_queue_item=queue_item,
- pcm_format=pcm_format,
- ),
- pcm_format=pcm_format,
+ start_queue_item=queue_item,
)
# forward command to player and any connected sync members
- sync_clients = self._get_sync_clients(player_id)
async with asyncio.TaskGroup() as tg:
- for slimplayer in sync_clients:
+ for slimplayer in self._get_sync_clients(player_id):
enforce_mp3 = await self.mass.config.get_player_config_value(
slimplayer.player_id, CONF_ENFORCE_MP3
)
auto_play=False,
)
)
- if not queue_item.queue_id.startswith(UGP_PREFIX):
- stream_job.start()
else:
# regular, single player playback
slimplayer = self.slimproto.get_player(player_id)
sync_playpoints = self._sync_playpoints[slimplayer.player_id]
active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
- stream_job = self.mass.streams.stream_jobs.get(active_queue.queue_id)
+ stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
if not stream_job:
# should not happen, but just in case
return
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.constants import UGP_PREFIX
-from music_assistant.server.helpers.audio import get_media_stream
+from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
SNAPWEB_DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.resolve().joinpath("snapweb")
+DEFAULT_SNAPCAST_FORMAT = AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ sample_rate=48000,
+ # TODO: can we handle 24 bits bit depth ?
+ bit_depth=16,
+ channels=2,
+)
+
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
snap_group = self._get_snapgroup(player_id)
await snap_group.set_stream(stream.identifier)
- # TODO: can we handle 24 bits bit depth ?
- pcm_format = AudioFormat(
- content_type=ContentType.PCM_S16LE,
- sample_rate=48000,
- bit_depth=16,
- channels=2,
- )
-
if queue_item.queue_id.startswith(UGP_PREFIX):
# special case: we got forwarded a request from the UGP
# use the existing stream job that was already created by UGP
- stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
+ stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id]
+ stream_job.expected_players.add(player_id)
+ input_format = stream_job.pcm_format
+ audio_source = stream_job.subscribe(player_id)
+ elif queue_item.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ audio_source = self.mass.streams.get_announcement_stream(
+ queue_item.streamdetails.data["url"],
+ pcm_format=DEFAULT_SNAPCAST_FORMAT,
+ use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+ )
else:
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
- # stream announcement url directly
- audio_source = get_media_stream(
- self.mass, queue_item.streamdetails, pcm_format=pcm_format
- )
- else:
- queue = self.mass.player_queues.get(queue_item.queue_id)
- audio_source = self.mass.streams.get_flow_stream(
- queue, start_queue_item=queue_item, pcm_format=pcm_format
- )
- stream_job = self.mass.streams.create_stream_job(
- queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=pcm_format
+ queue = self.mass.player_queues.get(queue_item.queue_id)
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ audio_source = self.mass.streams.get_flow_stream(
+ queue, start_queue_item=queue_item, pcm_format=DEFAULT_SNAPCAST_FORMAT
)
- stream_job.expected_players.add(player_id)
async def _streamer() -> None:
host = self._snapcast_server_host
stream.set_callback(stream_callback)
stream_path = f"tcp://{host}:{port}"
self.logger.debug("Start streaming to %s", stream_path)
+ ffmpeg_args = get_ffmpeg_args(
+ input_format=input_format,
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ filter_params=get_player_filter_params(self.mass, player_id),
+ output_path=f"tcp://{host}:{port}",
+ )
try:
- await stream_job.stream_to_custom_output_path(
- player_id, pcm_format, f"tcp://{host}:{port}"
- )
- # we need to wait a bit for the stream status to become idle
- # to ensure that all snapclients have consumed the audio
- await self.mass.players.wait_for_state(player, PlayerState.IDLE)
+ async with AsyncProcess(
+ ffmpeg_args,
+ enable_stdin=True,
+ enable_stdout=False,
+ enable_stderr=False,
+ name="snapcast_ffmpeg",
+ ) as ffmpeg_proc:
+ async for chunk in audio_source:
+ await ffmpeg_proc.write(chunk)
+ await ffmpeg_proc.write_eof()
+ # we need to wait a bit for the stream status to become idle
+ # to ensure that all snapclients have consumed the audio
+ await self.mass.players.wait_for_state(player, PlayerState.IDLE)
finally:
self.logger.debug("Finished streaming to %s", stream_path)
# there is no way to unsub the callback to we do this nasty
stream._callback_func = None
- await self._snapserver.stream_remove_stream(stream.identifier)
+ with suppress(TypeError, KeyError, AttributeError):
+ await self._snapserver.stream_remove_stream(stream.identifier)
# start streaming the queue (pcm) audio in a background task
self._stream_tasks[player_id] = asyncio.create_task(_streamer())
- if not queue_item.queue_id.startswith(UGP_PREFIX):
- stream_job.start()
def _get_snapgroup(self, player_id: str) -> Snapgroup:
"""Get snapcast group for given player_id."""
result.artists += [
await self._parse_artist(item)
for item in searchresult["artists"]["items"]
- if (item and item["id"])
+ if (item and item["id"] and item["name"])
]
if "albums" in searchresult:
result.albums += [
artist = Artist(
item_id=artist_obj["id"],
provider=self.domain,
- name=artist_obj["name"],
+ name=artist_obj["name"] or artist_obj["id"],
provider_mappings={
ProviderMapping(
item_id=artist_obj["id"],
album.external_ids.add((ExternalID.BARCODE, album_obj["external_ids"]["ean"]))
for artist_obj in album_obj["artists"]:
+ if not artist_obj.get("name") or not artist_obj.get("id"):
+ continue
album.artists.append(await self._parse_artist(artist_obj))
with contextlib.suppress(ValueError):
if artist:
track.artists.append(artist)
for track_artist in track_obj.get("artists", []):
+ if not track_artist.get("name") or not track_artist.get("id"):
+ continue
artist = await self._parse_artist(track_artist)
if artist and artist.item_id not in {x.item_id for x in track.artists}:
track.artists.append(artist)
)
from music_assistant.common.models.enums import (
ConfigEntryType,
- ContentType,
PlayerFeature,
PlayerState,
PlayerType,
ProviderFeature,
)
-from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.constants import (
SYNCGROUP_PREFIX,
UGP_PREFIX,
)
-from music_assistant.server.controllers.streams import (
- FLOW_DEFAULT_BIT_DEPTH,
- FLOW_DEFAULT_SAMPLE_RATE,
-)
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
if member.state == PlayerState.IDLE:
continue
tg.create_task(self.mass.players.cmd_stop(member.player_id))
- if existing := self.mass.streams.stream_jobs.pop(player_id, None):
+ if existing := self.mass.streams.multi_client_jobs.pop(player_id, None):
existing.stop()
async def cmd_play(self, player_id: str) -> None:
# create a multi-client stream job - all (direct) child's of this UGP group
# will subscribe to this multi client queue stream
- pcm_format = AudioFormat(
- content_type=ContentType.from_bit_depth(FLOW_DEFAULT_BIT_DEPTH),
- sample_rate=FLOW_DEFAULT_SAMPLE_RATE,
- bit_depth=FLOW_DEFAULT_BIT_DEPTH,
- )
queue = self.mass.player_queues.get(player_id)
- stream_job = self.mass.streams.create_stream_job(
+ stream_job = self.mass.streams.create_multi_client_stream_job(
queue.queue_id,
- pcm_audio_source=self.mass.streams.get_flow_stream(
- queue=queue, start_queue_item=queue_item, pcm_format=pcm_format
- ),
- pcm_format=pcm_format,
+ start_queue_item=queue_item,
)
# create a fake queue item to forward to downstream play_media commands
ugp_queue_item = QueueItem(
- player_id, queue_item_id="flow", name=group_player.display_name, duration=None
+ player_id,
+ queue_item_id=stream_job.job_id,
+ name="Music Assistant",
+ duration=None,
)
# forward the stream job to all group members
if member is None:
continue
tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item))
- stream_job.start()
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates."""
self.config = ConfigController(self)
await self.config.setup()
LOGGER.info(
- "Starting Music Assistant Server (%s) version %s - uvloop: %s",
+ "Starting Music Assistant Server (%s) version %s - HA add-on: %s",
self.server_id,
self.version,
- "uvloop" in str(self.loop),
+ self.running_as_hass_addon,
)
# setup other core controllers
self.cache = CacheController(self)
"zeroconf==0.131.0",
"cryptography==42.0.5",
"ifaddr==0.2.0",
- "uvloop==0.19.0",
]
test = [
"black==24.2.0",
sonos-websocket==0.1.3
tidalapi==0.7.4
unidecode==1.3.8
-uvloop==0.19.0
xmltodict==0.13.0
ytmusicapi==1.6.0
zeroconf==0.131.0