d.pop("current_item", None)
d.pop("next_item", None)
d.pop("index_in_buffer", None)
- d.pop("announcement_in_progress", None)
d.pop("flow_mode", None)
d.pop("flow_mode_start_index", None)
return d
d.pop("current_item", None)
d.pop("next_item", None)
d.pop("index_in_buffer", None)
- d.pop("announcement_in_progress", None)
d.pop("flow_mode", None)
d.pop("flow_mode_start_index", None)
return cls.from_dict(d)
self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
)
# send play_media request to player
- await self.mass.players.play_media(
+ # NOTE that we debounce this a bit to account for someone hitting the next button
+ # like a madman. This will prevent the player from being overloaded with requests.
+ self.mass.call_later(
+ 0.25,
+ self.mass.players.play_media,
player_id=queue_id,
# transform into PlayerMedia to send to the actual player implementation
media=self.player_media_from_queue_item(queue_item, queue.flow_mode),
+ task_id=f"play_media_{queue_id}",
)
# Interaction with player
)
self.manifest.icon = "speaker-multiple"
self._poll_task: asyncio.Task | None = None
+ self._player_locks: dict[str, asyncio.Lock] = {}
async def setup(self, config: CoreConfig) -> None:
"""Async initialize of module."""
await self.mass.player_queues.play(player_id)
return
player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_play(player_id)
+ async with self._player_locks[player_id]:
+ await player_provider.cmd_play(player_id)
@api_command("players/cmd/pause")
@handle_player_command
self.mass.create_task(_watch_pause(player_id))
@api_command("players/cmd/play_pause")
- @handle_player_command
async def cmd_play_pause(self, player_id: str) -> None:
"""Toggle play/pause on given player.
if PlayerFeature.POWER in player.supported_features:
# forward to player provider
player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_power(player_id, powered)
+ async with self._player_locks[player_id]:
+ await player_provider.cmd_power(player_id, powered)
else:
# allow the stop command to process and prevent race conditions
await asyncio.sleep(0.2)
msg = f"Player {player.display_name} does not support volume_set"
raise UnsupportedFeaturedException(msg)
player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_volume_set(player_id, volume_level)
+ async with self._player_locks[player_id]:
+ await player_provider.cmd_volume_set(player_id, volume_level)
@api_command("players/cmd/volume_up")
@handle_player_command
msg = f"Player {player.display_name} does not support muting"
raise UnsupportedFeaturedException(msg)
player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_volume_mute(player_id, muted)
+ async with self._player_locks[player_id]:
+ await player_provider.cmd_volume_mute(player_id, muted)
@api_command("players/cmd/seek")
async def cmd_seek(self, player_id: str, position: int) -> None:
) -> None:
"""Handle playback of an announcement (url) on given player."""
player = self.get(player_id, True)
- if player.announcement_in_progress:
- return
+ while player.announcement_in_progress:
+ await asyncio.sleep(0.5)
if not url.startswith("http"):
raise PlayerCommandFailed("Only URLs are supported for announcements")
try:
)
return
player_prov = self.mass.players.get_player_provider(player_id)
- await player_prov.enqueue_next_media(player_id=player_id, media=media)
+ async with self._player_locks[player_id]:
+ await player_prov.enqueue_next_media(player_id=player_id, media=media)
@api_command("players/cmd/sync")
@handle_player_command
continue
if child_player.synced_to and child_player.synced_to == target_player:
continue # already synced to this target
- if child_player.synced_to and child_player.synced_to != target_player:
+ elif child_player.synced_to:
# player already synced to another player, unsync first
self.logger.warning(
"Player %s is already synced, unsyncing first", child_player.name
# forward command to the player provider after all (base) sanity checks
player_provider = self.get_player_provider(target_player)
- await player_provider.cmd_sync_many(target_player, child_player_ids)
+ async with self._player_locks[target_player]:
+ await player_provider.cmd_sync_many(target_player, child_player_ids)
@api_command("players/cmd/unsync_many")
async def cmd_unsync_many(self, player_ids: list[str]) -> None:
# register playerqueue for this player
self.mass.create_task(self.mass.player_queues.on_player_register(player))
+ # register lock for this player
+ self._player_locks[player_id] = asyncio.Lock()
+
self._players[player_id] = player
# ignore disabled players
use_crossfade = await self.mass.config.get_player_config_value(
queue.queue_id, CONF_CROSSFADE
)
+ if not start_queue_item:
+ # this can happen in some (edge case) race conditions
+ return
if start_queue_item.media_type != MediaType.TRACK:
use_crossfade = False
pcm_sample_size = int(
# del chunk
finished = True
finally:
+ if "ffmpeg_proc" not in locals():
+ # edge case: ffmpeg process was not yet started
+ return # noqa: B012
if finished and not ffmpeg_proc.closed:
await asyncio.wait_for(ffmpeg_proc.wait(), 60)
elif not ffmpeg_proc.closed:
self.mass = mass
self._tasks: list[asyncio.Task] = []
- def create_task(self, coro: Coroutine) -> None:
+ def create_task(self, coro: Coroutine, eager_start: bool = False) -> None:
"""Create a new task and add it to the manager."""
- task = self.mass.create_task(coro)
+ task = self.mass.create_task(coro, eager_start=eager_start)
self._tasks.append(task)
async def __aenter__(self) -> Self:
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import get_ip_pton, select_free_port
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
await self._cliraop_proc.start()
await asyncio.to_thread(os.close, read)
self._started.set()
- self._log_reader_task = asyncio.create_task(self._log_watcher())
+ self._log_reader_task = self.mass.create_task(self._log_watcher())
async def stop(self):
"""Stop playback and cleanup."""
return
if self.audio_source_task and not self.audio_source_task.done():
self.audio_source_task.cancel()
- if self._cliraop_proc.proc and not self._cliraop_proc.closed:
- await self.send_cli_command("ACTION=STOP")
- self._stopped = True # set after send_cli command!
if self._cliraop_proc.proc:
- try:
- await asyncio.wait_for(self._cliraop_proc.wait(), 5)
- except TimeoutError:
- self.prov.logger.warning(
- "Raop process for %s did not stop in time, is the player offline?",
- self.airplay_player.player_id,
- )
- await self._cliraop_proc.close(True)
-
+ await self._cliraop_proc.close(True)
+ self._stopped = True # set after close command!
# ffmpeg can sometimes hang due to the connected pipes
# we handle closing it but it can be a bit slow so do that in the background
if not self._ffmpeg_proc.closed:
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
# prefer interactive command to our streamer
- tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PLAY"))
+ tg.create_task(
+ airplay_player.active_stream.send_cli_command("ACTION=PLAY"),
+ )
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
self, airplay_player, input_format=input_format
)
- # use a buffer here to consume small hiccups as the
- # raop streaming is pretty much realtime and without a buffer to stdin
- buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
-
- async def fill_buffer() -> None:
- async for chunk in audio_source:
- await buffer.put(chunk)
- await buffer.put(b"EOF")
-
- fill_buffer_task = asyncio.create_task(fill_buffer())
-
async def audio_streamer() -> None:
- try:
- while True:
- chunk = await buffer.get()
- if chunk == b"EOF":
- break
- await asyncio.gather(
- *[x.active_stream.write_chunk(chunk) for x in sync_clients],
- return_exceptions=True,
- )
-
- # entire stream consumed: send EOF
+ async for chunk in audio_source:
await asyncio.gather(
- *[x.active_stream.write_eof() for x in sync_clients],
+ *[x.active_stream.write_chunk(chunk) for x in sync_clients],
return_exceptions=True,
)
- finally:
- if not fill_buffer_task.done():
- fill_buffer_task.cancel()
- empty_queue(buffer)
+ # entire stream consumed: send EOF
+ await asyncio.gather(
+ *[x.active_stream.write_eof() for x in sync_clients],
+ return_exceptions=True,
+ )
# get current ntp and start cliraop
_, stdout = await check_output(self.cliraop_bin, "-ntp")
*[x.active_stream.start(start_ntp, wait_start) for x in sync_clients],
return_exceptions=True,
)
- self._players[player_id].active_stream.audio_source_task = asyncio.create_task(
+ self._players[player_id].active_stream.audio_source_task = self.mass.create_task(
audio_streamer()
)
- player_id: player_id of the player to handle the command.
- target_player: player_id of the syncgroup master or group player.
"""
+ if player_id == target_player:
+ return
child_player = self.mass.players.get(player_id)
assert child_player # guard
parent_player = self.mass.players.get(target_player)
group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
group_leader.group_childs.remove(player_id)
player.synced_to = None
- # guard if this was the last sync child of the group player
- if group_leader.group_childs == {group_leader.player_id}:
- group_leader.group_childs.remove(group_leader.player_id)
await self.cmd_stop(player_id)
self.mass.players.update(player_id)
import os
import time
from collections.abc import AsyncGenerator
-from typing import TYPE_CHECKING, NotRequired, TypedDict
+from typing import TYPE_CHECKING, NotRequired, TypedDict, cast
import aiofiles
import shortuuid
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- parsed_item = await self.parse_item(prov_track_id)
- assert isinstance(parsed_item, Track)
+ parsed_item = cast(Track, await self.parse_item(prov_track_id))
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
# always prefer the stored info, such as the name