media_from_dict,
)
from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
-from music_assistant_models.player import PlayerMedia
from music_assistant_models.player_queue import PlayerQueue
from music_assistant_models.queue_item import QueueItem
from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
from music_assistant.helpers.util import get_changed_keys, percentage
from music_assistant.models.core_controller import CoreController
+from music_assistant.models.player import Player, PlayerMedia
if TYPE_CHECKING:
from collections.abc import Iterator
if not player.enabled:
return
- # register playerqueue for this player
- self.mass.create_task(self.mass.player_queues.on_player_register(player))
-
# register throttler for this player
self._player_throttlers[player_id] = Throttler(1, 0.05)
# signal event that a player was added
self.mass.signal_event(EventType.PLAYER_ADDED, object_id=player.player_id, data=player)
+ # register playerqueue for this player
+ await self.mass.player_queues.on_player_register(player)
+
async def register_or_update(self, player: Player) -> None:
"""Register a new player on the controller or update existing one."""
if self.mass.closing:
if not retry_on_fail or not self.mass_player:
raise
self._attr_available = False
- self.mass.players.update(self.player_id)
+ self.update_state()
self.reconnect(min(retry_on_fail + 30, 3600))
return
self.connected = True
# we rely on mdns to pick it up again later
await self._disconnect()
self._attr_available = False
- self.mass.players.update(self.player_id)
+ self.update_state()
self.reconnect(5)
self._listen_task = self.mass.create_task(_listener())
ConfigEntryType,
ContentType,
MediaType,
+ PlaybackState,
PlayerFeature,
PlayerType,
RepeatMode,
PlayerFeature.GAPLESS_PLAYBACK,
}
self._attr_can_group_with = {provider.lookup_key}
- self._multi_client_stream: MultiClientStream | None = None
+ self.multi_client_stream: MultiClientStream | None = None
self._sync_playpoints: deque[SyncPlayPoint] = deque(maxlen=MIN_REQ_PLAYPOINTS)
self._do_not_resync_before: float = 0.0
if not self.group_members:
# Simple, single-player playback
- await self._handle_play_url(
+ await self._handle_play_url_for_slimplayer(
+ self.client,
url=media.uri,
media=media,
send_flush=True,
output_format=master_audio_format,
)
# start the stream task
- self._multi_client_stream = stream = MultiClientStream(
+ self.multi_client_stream = stream = MultiClientStream(
audio_source=audio_source, audio_format=master_audio_format
)
base_url = (
url = f"{base_url}&child_player_id={slimplayer.player_id}"
stream.expected_clients += 1
tg.create_task(
- self._handle_play_url(
+ self._handle_play_url_for_slimplayer(
slimplayer,
url=url,
media=media,
async def enqueue_next_media(self, media: PlayerMedia) -> None:
"""Handle enqueuing next media item."""
- await self._handle_play_url(
+ await self._handle_play_url_for_slimplayer(
+ self.client,
url=media.uri,
media=media,
enqueue=True,
# always update the state after modifying group members
self.update_state()
- stream_session = self._multi_client_stream
- if players_added and stream_session and not stream_session.done:
+ if players_added and self.current_media and self.playback_state == PlaybackState.PLAYING:
# restart stream session if it was already playing
# for now, we dont support late joining into an existing stream
self.mass.create_task(self.play_media(self.current_media))
else:
self._attr_current_media = None
- async def _handle_play_url(
+ async def _handle_play_url_for_slimplayer(
self,
+ slimplayer: SlimClient,
url: str,
media: PlayerMedia,
enqueue: bool = False,
if queue := self.mass.player_queues.get(media.queue_id):
self.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
self.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
- await self.client.play_url(
+ await slimplayer.play_url(
url=url,
mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
metadata=metadata,
if queue and queue.repeat_mode == RepeatMode.ONE:
self.mass.call_later(
0.2,
- self.client.play_url(
+ slimplayer.play_url(
url=url,
mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
metadata=metadata,
childs_ready += 1
if childs_total == childs_ready:
break
+ count += 1
# all child's ready (or timeout) - start play
async with TaskManager(self.mass) as tg:
from __future__ import annotations
import logging
-from dataclasses import dataclass
from typing import TYPE_CHECKING, cast
+from aiohttp import web
from aioslimproto.models import EventType as SlimEventType
from aioslimproto.models import SlimEvent
from aioslimproto.server import SlimServer
-from music_assistant_models.enums import ProviderFeature
+from music_assistant_models.enums import ContentType, ProviderFeature
from music_assistant_models.errors import SetupFailedError
+from music_assistant_models.media_items import AudioFormat
from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.helpers.audio import get_player_filter_params
from music_assistant.helpers.util import is_port_in_use
from music_assistant.models.player_provider import PlayerProvider
from .constants import CONF_CLI_JSON_PORT, CONF_CLI_TELNET_PORT
-from .multi_client_stream import MultiClientStream
from .player import SqueezelitePlayer
if TYPE_CHECKING:
from aioslimproto.client import SlimClient
-@dataclass
-class StreamInfo:
- """Dataclass to store stream information."""
-
- stream_id: str
- players: list[str]
- stream_obj: MultiClientStream
-
-
class SqueezelitePlayerProvider(PlayerProvider):
"""Player provider for players using slimproto (like Squeezelite)."""
super().__init__(*args, **kwargs)
self.slimproto: SlimServer | None = None
self._players: dict[str, SqueezelitePlayer] = {}
- self._multi_client_streams: dict[str, StreamInfo] = {}
@property
def supported_features(self) -> set[ProviderFeature]:
# forward all other events to the player itself
player.handle_slim_event(event)
+
+ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response:
+ """Serve the multi-client flow stream audio to a player."""
+ player_id = request.query.get("player_id")
+ fmt = request.query.get("fmt")
+ child_player_id = request.query.get("child_player_id")
+
+ if not (sync_parent := self.mass.players.get(player_id)):
+ raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
+ sync_parent = cast("SqueezelitePlayer", sync_parent)
+
+ if not (child_player := self.mass.players.get(child_player_id)):
+ raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+
+ if not (stream := sync_parent.multi_client_stream) or stream.done:
+ raise web.HTTPNotFound(f"There is no active stream for {player_id}!")
+
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers={
+ "Content-Type": f"audio/{fmt}",
+ },
+ )
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving multi-client flow audio stream to %s",
+ child_player.display_name,
+ )
+ output_format = AudioFormat(content_type=ContentType.try_parse(fmt))
+ async for chunk in stream.get_stream(
+ output_format=output_format,
+ filter_params=get_player_filter_params(
+ self.mass, child_player_id, stream.audio_format, output_format
+ )
+ if child_player_id
+ else None,
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError, ConnectionError):
+ # race condition
+ break
+
+ return resp