)
self.manifest.icon = "speaker-multiple"
self._poll_task: asyncio.Task | None = None
+ self._player_locks: dict[str, asyncio.Lock] = {}
async def setup(self, config: CoreConfig) -> None:
"""Async initialize of module."""
await self.mass.player_queues.play(player_id)
return
player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_play(player_id)
+ async with self._player_locks[player_id]:
+ await player_provider.cmd_play(player_id)
@api_command("players/cmd/pause")
@handle_player_command
self.mass.create_task(_watch_pause(player_id))
@api_command("players/cmd/play_pause")
- @handle_player_command
async def cmd_play_pause(self, player_id: str) -> None:
"""Toggle play/pause on given player.
if PlayerFeature.POWER in player.supported_features:
# forward to player provider
player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_power(player_id, powered)
+ async with self._player_locks[player_id]:
+ await player_provider.cmd_power(player_id, powered)
else:
# allow the stop command to process and prevent race conditions
await asyncio.sleep(0.2)
msg = f"Player {player.display_name} does not support volume_set"
raise UnsupportedFeaturedException(msg)
player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_volume_set(player_id, volume_level)
+ async with self._player_locks[player_id]:
+ await player_provider.cmd_volume_set(player_id, volume_level)
@api_command("players/cmd/volume_up")
@handle_player_command
msg = f"Player {player.display_name} does not support muting"
raise UnsupportedFeaturedException(msg)
player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_volume_mute(player_id, muted)
+ async with self._player_locks[player_id]:
+ await player_provider.cmd_volume_mute(player_id, muted)
@api_command("players/cmd/seek")
async def cmd_seek(self, player_id: str, position: int) -> None:
) -> None:
"""Handle playback of an announcement (url) on given player."""
player = self.get(player_id, True)
- if player.announcement_in_progress:
- return
+ while player.announcement_in_progress:
+ await asyncio.sleep(0.5)
if not url.startswith("http"):
raise PlayerCommandFailed("Only URLs are supported for announcements")
try:
)
return
player_prov = self.mass.players.get_player_provider(player_id)
- await player_prov.enqueue_next_media(player_id=player_id, media=media)
+ async with self._player_locks[player_id]:
+ await player_prov.enqueue_next_media(player_id=player_id, media=media)
@api_command("players/cmd/sync")
@handle_player_command
continue
if child_player.synced_to and child_player.synced_to == target_player:
continue # already synced to this target
- if child_player.synced_to and child_player.synced_to != target_player:
+ elif child_player.synced_to:
# player already synced to another player, unsync first
self.logger.warning(
"Player %s is already synced, unsyncing first", child_player.name
# forward command to the player provider after all (base) sanity checks
player_provider = self.get_player_provider(target_player)
- await player_provider.cmd_sync_many(target_player, child_player_ids)
+ async with self._player_locks[target_player]:
+ await player_provider.cmd_sync_many(target_player, child_player_ids)
@api_command("players/cmd/unsync_many")
async def cmd_unsync_many(self, player_ids: list[str]) -> None:
# register playerqueue for this player
self.mass.create_task(self.mass.player_queues.on_player_register(player))
+ # register lock for this player
+ self._player_locks[player_id] = asyncio.Lock()
+
self._players[player_id] = player
# ignore disabled players
import socket
import time
from contextlib import suppress
-from dataclasses import dataclass
from random import randint, randrange
from typing import TYPE_CHECKING
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import get_ip_pton, select_free_port
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
CONF_ENTRY_SYNC_ADJUST,
ConfigEntry,
ConfigValueType,
+ ProviderConfig,
create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
+from music_assistant.common.models.provider import ProviderManifest
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.server import MusicAssistant
from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.helpers.util import TaskManager
return None
-class AirplayStream:
- """Object that holds the details of a stream job."""
+class RaopStream:
+ """Object that holds the details of a (RAOP) stream job."""
def __init__(
self, prov: AirplayProvider, airplay_player: AirPlayPlayer, input_format: AudioFormat
) -> None:
- """Initialize AirplayStream."""
+ """Initialize RaopStream."""
self.prov = prov
self.mass = prov.mass
self.airplay_player = airplay_player
await self._cliraop_proc.start()
await asyncio.to_thread(os.close, read)
self._started.set()
- self._log_reader_task = asyncio.create_task(self._log_watcher())
+ self._log_reader_task = self.mass.create_task(self._log_watcher())
async def stop(self):
"""Stop playback and cleanup."""
if self._cliraop_proc.proc and not self._cliraop_proc.closed:
await self.send_cli_command("ACTION=STOP")
self._stopped = True # set after send_cli command!
- if self._cliraop_proc.proc:
- try:
- await asyncio.wait_for(self._cliraop_proc.wait(), 5)
- except TimeoutError:
- self.prov.logger.warning(
- "Raop process for %s did not stop in time, is the player offline?",
- self.airplay_player.player_id,
- )
- await self._cliraop_proc.close(True)
-
- # ffmpeg can sometimes hang due to the connected pipes
- # we handle closing it but it can be a bit slow so do that in the background
- if not self._ffmpeg_proc.closed:
- self.mass.create_task(self._ffmpeg_proc.close(True))
+ if self._cliraop_proc.proc and not self._cliraop_proc.closed:
+ await self._cliraop_proc.close(True)
+ if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
+ await self._ffmpeg_proc.close(True)
+ self._cliraop_proc = None
+ self._ffmpeg_proc = None
async def write_chunk(self, chunk: bytes) -> None:
"""Write a (pcm) audio chunk."""
named_pipe = f"/tmp/raop-{self.active_remote_id}" # noqa: S108
self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
- await self.mass.create_task(send_data)
+ await asyncio.to_thread(send_data)
async def _log_watcher(self) -> None:
"""Monitor stderr for the running CLIRaop process."""
logger.log(VERBOSE_LOG_LEVEL, line)
# if we reach this point, the process exited
- if airplay_player.active_stream == self:
+ if airplay_player.raop_stream == self:
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
# ensure we're cleaned up afterwards (this also logs the returncode)
await self.send_cli_command(f"PROGRESS={progress}\n")
-@dataclass
class AirPlayPlayer:
"""Holds the details of the (discovered) Airplay (RAOP) player."""
- player_id: str
- discovery_info: AsyncServiceInfo
- address: str
- logger: logging.Logger
- active_stream: AirplayStream | None = None
+ def __init__(
+ self, prov: AirplayProvider, player_id: str, discovery_info: AsyncServiceInfo, address: str
+ ) -> None:
+ """Initialize AirPlayPlayer."""
+ self.prov = prov
+ self.mass = prov.mass
+ self.player_id = player_id
+ self.discovery_info = discovery_info
+ self.address = address
+ self.logger = prov.logger.getChild(player_id)
+ self.raop_stream: RaopStream | None = None
+
+ async def cmd_stop(self, update_state: bool = True) -> None:
+ """Send STOP command to player."""
+ if self.raop_stream:
+ await self.raop_stream.stop()
+ if update_state and (mass_player := self.mass.players.get(self.player_id)):
+ mass_player.state = PlayerState.IDLE
+ self.mass.players.update(mass_player.player_id)
+
+ async def cmd_play(self) -> None:
+ """Send PLAY (unpause) command to player."""
+ if self.raop_stream and self.raop_stream.running:
+ await self.raop_stream.send_cli_command("ACTION=PLAY")
+
+ async def cmd_pause(self) -> None:
+ """Send PAUSE command to player."""
+ if not self.raop_stream or not self.raop_stream.running:
+ return
+ await self.raop_stream.send_cli_command("ACTION=PAUSE")
class AirplayProvider(PlayerProvider):
_discovery_running: bool = False
_dacp_server: asyncio.Server = None
_dacp_info: AsyncServiceInfo = None
+ _play_media_lock: asyncio.Lock = asyncio.Lock()
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
# forward command to player and any connected sync members
async with TaskManager(self.mass) as tg:
for airplay_player in self._get_sync_clients(player_id):
- if airplay_player.active_stream:
- tg.create_task(airplay_player.active_stream.stop())
- if mass_player := self.mass.players.get(airplay_player.player_id):
- mass_player.state = PlayerState.IDLE
- self.mass.players.update(mass_player.player_id)
+ tg.create_task(airplay_player.cmd_stop())
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
# forward command to player and any connected sync members
async with TaskManager(self.mass) as tg:
for airplay_player in self._get_sync_clients(player_id):
- if airplay_player.active_stream and airplay_player.active_stream.running:
- # prefer interactive command to our streamer
- tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PLAY"))
+ tg.create_task(airplay_player.cmd_play())
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
await self.cmd_stop(player_id)
return
airplay_player = self._players[player_id]
- if airplay_player.active_stream and airplay_player.active_stream.running:
- # prefer interactive command to our streamer
- await airplay_player.active_stream.send_cli_command("ACTION=PAUSE")
+ await airplay_player.cmd_pause()
async def play_media(
self,
media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
+ await self._play_media_lock.acquire()
player = self.mass.players.get(player_id)
if player.synced_to:
# should not happen, but just in case
# always stop existing stream first
async with TaskManager(self.mass) as tg:
for airplay_player in self._get_sync_clients(player_id):
- if active_stream := airplay_player.active_stream:
- tg.create_task(active_stream.stop())
+ tg.create_task(airplay_player.cmd_stop(update_state=False))
# select audio source
if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
# timestamped playback, which reads pcm audio from stdin
# and we can send some interactive commands using a named pipe.
- # setup AirplayStream for player and its sync childs
+ # setup RaopStream for player and its sync childs
sync_clients = self._get_sync_clients(player_id)
for airplay_player in sync_clients:
- airplay_player.active_stream = AirplayStream(
- self, airplay_player, input_format=input_format
- )
-
- # use a buffer here to consume small hiccups as the
- # raop streaming is pretty much realtime and without a buffer to stdin
- buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
-
- async def fill_buffer() -> None:
- async for chunk in audio_source:
- await buffer.put(chunk)
- await buffer.put(b"EOF")
-
- fill_buffer_task = asyncio.create_task(fill_buffer())
+ airplay_player.raop_stream = RaopStream(self, airplay_player, input_format=input_format)
async def audio_streamer() -> None:
- try:
- while True:
- chunk = await buffer.get()
- if chunk == b"EOF":
- break
- await asyncio.gather(
- *[x.active_stream.write_chunk(chunk) for x in sync_clients],
- return_exceptions=True,
- )
-
- # entire stream consumed: send EOF
+ async for chunk in audio_source:
await asyncio.gather(
- *[x.active_stream.write_eof() for x in sync_clients],
+ *[x.raop_stream.write_chunk(chunk) for x in sync_clients],
return_exceptions=True,
)
-
- finally:
- if not fill_buffer_task.done():
- fill_buffer_task.cancel()
- empty_queue(buffer)
+ # entire stream consumed: send EOF
+ await asyncio.gather(
+ *[x.raop_stream.write_eof() for x in sync_clients],
+ return_exceptions=True,
+ )
# get current ntp and start cliraop
_, stdout = await check_output(self.cliraop_bin, "-ntp")
start_ntp = int(stdout.strip())
wait_start = 1250 + (250 * len(sync_clients))
await asyncio.gather(
- *[x.active_stream.start(start_ntp, wait_start) for x in sync_clients],
+ *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients],
return_exceptions=True,
)
- self._players[player_id].active_stream.audio_source_task = asyncio.create_task(
+ self._players[player_id].raop_stream.audio_source_task = asyncio.create_task(
audio_streamer()
)
+ self._play_media_lock.release()
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
- volume_level: volume level (0..100) to set on the player.
"""
airplay_player = self._players[player_id]
- if airplay_player.active_stream:
- await airplay_player.active_stream.send_cli_command(f"VOLUME={volume_level}\n")
+ if airplay_player.raop_stream and airplay_player.raop_stream.running:
+ await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
mass_player = self.mass.players.get(player_id)
mass_player.volume_level = volume_level
self.mass.players.update(player_id)
- player_id: player_id of the player to handle the command.
- target_player: player_id of the syncgroup master or group player.
"""
+ if player_id == target_player:
+ return
child_player = self.mass.players.get(player_id)
assert child_player # guard
parent_player = self.mass.players.get(target_player)
group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
group_leader.group_childs.remove(player_id)
player.synced_to = None
- # guard if this was the last sync child of the group player
- if group_leader.group_childs == {group_leader.player_id}:
- group_leader.group_childs.remove(group_leader.player_id)
await self.cmd_stop(player_id)
self.mass.players.update(player_id)
if address is None:
return
self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
- self._players[player_id] = AirPlayPlayer(
- player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id)
- )
+ self._players[player_id] = AirPlayPlayer(self, player_id, info, address)
manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
if "apple tv" in model.lower():
# For now, we ignore the Apple TV until we implement the authentication.
(
x
for x in self._players.values()
- if x.active_stream and x.active_stream.active_remote_id == active_remote
+ if x.raop_stream and x.raop_stream.active_remote_id == active_remote
),
None,
)
mass_player.volume_level = volume
elif "device-prevent-playback=1" in path:
# device switched to another source (or is powered off)
- if active_stream := airplay_player.active_stream:
+ if raop_stream := airplay_player.raop_stream:
# ignore this if we just started playing to prevent false positives
if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
- active_stream.prevent_playback = True
+ raop_stream.prevent_playback = True
self.mass.create_task(self.monitor_prevent_playback(player_id))
elif "device-prevent-playback=0" in path:
# device reports that its ready for playback again
- if active_stream := airplay_player.active_stream:
- active_stream.prevent_playback = False
+ if raop_stream := airplay_player.raop_stream:
+ raop_stream.prevent_playback = False
# send response
date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
count = 0
if not (airplay_player := self._players.get(player_id)):
return
- prev_active_remote_id = airplay_player.active_stream.active_remote_id
+ prev_active_remote_id = airplay_player.raop_stream.active_remote_id
while count < 40:
count += 1
if not (airplay_player := self._players.get(player_id)):
return
- if not (active_stream := airplay_player.active_stream):
+ if not (raop_stream := airplay_player.raop_stream):
return
- if active_stream.active_remote_id != prev_active_remote_id:
+ if raop_stream.active_remote_id != prev_active_remote_id:
# checksum
return
- if not active_stream.prevent_playback:
+ if not raop_stream.prevent_playback:
return
await asyncio.sleep(0.5)