from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistant
from music_assistant.helpers.util import get_ip
-from music_assistant.models.errors import MediaNotFoundError
+from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.media_items import ContentType
from music_assistant.models.player_queue import PlayerQueue
await self.subscribe_client(queue_id, player_id)
try:
while True:
- audio_chunk = await self._client_queues[queue_id][player_id].get()
- await resp.write(audio_chunk)
- self._client_queues[queue_id][player_id].task_done()
+ client_queue = self._client_queues.get(queue_id).get(player_id)
+ if not client_queue:
+ break
+ audio_chunk = await client_queue.get()
if audio_chunk == b"":
- # last chunk
+ # eof
break
+ await resp.write(audio_chunk)
+ client_queue.task_done()
finally:
await self.unsubscribe_client(queue_id, player_id)
return resp
async def subscribe_client(self, queue_id: str, player_id: str) -> None:
"""Subscribe client to queue stream."""
+ if queue_id not in self._stream_tasks:
+ raise MusicAssistantError(f"No Queue stream available for {queue_id}")
+
if queue_id not in self._subscribers:
self._subscribers[queue_id] = set()
self._subscribers[queue_id].add(player_id)
if player_id in self._subscribers[queue_id]:
self._subscribers[queue_id].remove(player_id)
- self._client_queues.get(queue_id, {}).pop(player_id, None)
+ self.__cleanup_client_queue(queue_id, player_id)
self.logger.debug(
"Unsubscribed player %s from multi queue stream %s", player_id, queue_id
)
self.__multi_client_queue_stream_runner(queue_id, output_fmt)
)
- async def unpause_synced_start():
- # unpause if we got all clients (or timeout) for (more or less) synced start
- expected_clients = len(self._client_queues[queue_id])
- time_started = self._time_started[queue_id]
- while True:
- await asyncio.sleep(0.1)
- if (len(self._subscribers) >= expected_clients) or (
- time() - time_started
- ) > 5:
- await asyncio.gather(
- *[
- self.mass.players.get_player(client_id).play()
- for client_id in self._subscribers.get(queue_id, {})
- ]
- )
- break
-
- self.mass.create_task(unpause_synced_start)
-
async def stop_multi_client_queue_stream(self, queue_id: str) -> None:
"""Signal a running queue stream task and its listeners to stop."""
if queue_id not in self._stream_tasks:
"""Distribute audio chunks over connected clients in a multi client queue stream."""
queue = self.mass.players.get_player_queue(queue_id)
- def cleanup_client_queue(player_id: str):
- if client_queue := self._client_queues[queue_id].get(player_id, None):
- self.logger.debug("cleaning up child queue %s", player_id)
- for _ in range(client_queue.qsize()):
- client_queue.get_nowait()
- client_queue.task_done()
- client_queue.put_nowait(b"")
-
start_streamdetails = await queue.queue_stream_prepare()
sox_args = await get_sox_args_for_pcm_stream(
start_streamdetails.sample_rate,
async with AsyncProcess(sox_args, True) as sox_proc:
async def writer():
- # task that sends the raw pcm audio to the sox/ffmpeg process
+ """Task that sends the raw pcm audio to the sox/ffmpeg process."""
async for audio_chunk in self._get_queue_stream(
queue,
sample_rate=start_streamdetails.sample_rate,
sox_proc.write_eof()
async def reader():
- # read bytes from final output
+ """Read bytes from final output and put chunk on child queues."""
chunks_sent = 0
- chunksize = 32000 if output_fmt == ContentType.MP3 else 90000
- async for chunk in sox_proc.iterate_chunks(chunksize):
+ async for chunk in sox_proc.iterate_chunks(256000):
chunks_sent += 1
coros = []
for player_id in list(self._client_queues[queue_id].keys()):
if (
- chunks_sent >= 20
+ self._client_queues[queue_id][player_id].full()
+ and chunks_sent >= 10
and player_id not in self._subscribers[queue_id]
):
- # assume client did not connect or got disconnected somehow
- cleanup_client_queue(player_id)
+ # assume client did not connect at all or got disconnected somehow
+ self.__cleanup_client_queue(queue_id, player_id)
self._client_queues[queue_id].pop(player_id, None)
else:
coros.append(
)
await asyncio.gather(*coros)
+ # launch the reader and writer
await asyncio.gather(*[writer(), reader()])
- # send empty chunk to inform EOF
- await asyncio.gather(
- *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
- )
+ # wait for all queues to consume their data
+ await asyncio.gather(
+ *[cq.join() for cq in self._client_queues[queue_id].values()]
+ )
+ # send empty chunk to inform EOF
+ await asyncio.gather(
+ *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
+ )
finally:
+ self.logger.debug("Multi client queue stream %s finished", queue.queue_id)
# cleanup
self._stream_tasks.pop(queue_id, None)
for player_id in list(self._client_queues[queue_id].keys()):
- cleanup_client_queue(player_id)
+ self.__cleanup_client_queue(queue_id, player_id)
self.logger.debug("Multi client queue stream %s ended", queue.queue_id)
+ def __cleanup_client_queue(self, queue_id: str, player_id: str):
+ """Cleanup a client queue after it completes/disconnects."""
+ if client_queue := self._client_queues.get(queue_id, {}).pop(player_id, None):
+ for _ in range(client_queue.qsize()):
+ client_queue.get_nowait()
+ client_queue.task_done()
+ client_queue.put_nowait(b"")
+
async def _get_queue_stream(
self,
queue: PlayerQueue,
last_fadeout_data = b""
queue_index = None
track_count = 0
- start_timestamp = time()
pcm_fmt = ContentType.from_bit_depth(bit_depth)
self.logger.info(
# stream queue tracks one by one
while True:
+ start_timestamp = time()
# get the (next) track in queue
track_count += 1
if track_count == 1:
else:
prev_chunk = chunk
del chunk
- # guard for clients buffering too much
+ # allow clients to only buffer max ~15 seconds ahead
seconds_streamed = bytes_written / sample_size
- seconds_needed = int(time() - start_timestamp)
- if (seconds_streamed - seconds_needed) >= 10:
- await asyncio.sleep(8)
+ seconds_needed = int(time() - start_timestamp) + 15
+ diff = seconds_streamed - seconds_needed
+ if diff:
+ await asyncio.sleep(diff)
# end of the track reached
# update actual duration to the queue for more accurate now playing info
accurate_duration = bytes_written / sample_size
"""Models and helpers for a player."""
from __future__ import annotations
+import asyncio
from abc import ABC
from dataclasses import dataclass
from enum import Enum, IntEnum
_attr_group_childs: List[str] = []
_attr_name: str = ""
_attr_powered: bool = False
- _attr_elapsed_time: int = 0
+ _attr_elapsed_time: float = 0
_attr_current_url: str = ""
_attr_state: PlayerState = PlayerState.IDLE
_attr_available: bool = True
return self._attr_powered
@property
- def elapsed_time(self) -> int:
+ def elapsed_time(self) -> float:
"""Return elapsed time of current playing media in seconds."""
return self._attr_elapsed_time
@property
- def corrected_elapsed_time(self) -> int:
+ def corrected_elapsed_time(self) -> float:
"""Return corrected elapsed time of current playing media in seconds."""
return self._attr_elapsed_time + (time() - self._last_elapsed_time_received)
at more or less the same time. The player's implementation will be responsible for
synchronization of audio on child players (if possible), Music Assistant will only
coordinate the start and makes sure that every child received the same audio chunk
- within the same timespan. Multi stream is currently limited to 44100/16 only.
+ within the same timespan.
"""
return self._attr_use_multi_stream
if "elapsed_time" in changed_keys:
self._last_elapsed_time_received = time()
- elif "state" in changed_keys and self.state == PlayerState.PLAYING:
- self._attr_elapsed_time = 0
- self._last_elapsed_time_received = time()
# always update the playerqueue
self.mass.players.get_player_queue(self.player_id).on_player_update()
- if len(changed_keys) == 0 or changed_keys == {"elapsed_time"}:
+ if len(changed_keys) == 0 and changed_keys != {"corrected_elapsed_time"}:
return
self._prev_state = cur_state
"player_id": self.player_id,
"name": self.name,
"powered": self.powered,
- "elapsed_time": self.elapsed_time,
+ "elapsed_time": int(self.elapsed_time),
+ "corrected_elapsed_time": int(self.corrected_elapsed_time),
"state": self.state.value,
"available": self.available,
"is_group": self.is_group,
is_group: bool = True
_attr_group_childs: List[str] = []
- _attr_support_join_control: bool = True
+ _correct_progress = set()
@property
def volume_level(self) -> int:
# may be overridden if implementation provides this natively
group_volume = 0
active_players = 0
- for child_player in self._get_players(True):
+ for child_player in self._get_child_players(True):
group_volume += child_player.volume_level
active_players += 1
if active_players:
group_volume = group_volume / active_players
return int(group_volume)
+ @property
+ def corrected_elapsed_time(self) -> float:
+ """Return the corrected/precise elsapsed time of the grouped player."""
+ if not self.use_multi_stream:
+ return super().corrected_elapsed_time
+ # calculate from group childs
+ for child_player in self._get_child_players(True):
+ if not child_player.current_url:
+ continue
+ if self.player_id not in child_player.current_url:
+ continue
+ # if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
+ # continue
+ return child_player.corrected_elapsed_time
+ return 0
+
+ @property
+ def state(self) -> PlayerState:
+ """Return the state of the grouped player."""
+ if not self.use_multi_stream:
+ return super().state
+ # calculate from group childs
+ for child_player in self._get_child_players(True):
+ if not child_player.current_url:
+ continue
+ if self.player_id not in child_player.current_url:
+ continue
+ if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
+ continue
+ return child_player.state
+ return super().state
+
+ @property
+ def current_url(self) -> str:
+ """Return the current_url of the grouped player."""
+ if not self.use_multi_stream:
+ return super().current_url
+ # calculate from group childs
+ for child_player in self._get_child_players(True):
+ if not child_player.current_url:
+ continue
+ if self.player_id not in child_player.current_url:
+ continue
+ return child_player.current_url
+ return super().current_url
+
+ async def stop(self) -> None:
+ """Send STOP command to player."""
+ if not self.use_multi_stream:
+ return await super().stop()
+ # redirect command to all child players
+ await asyncio.gather(*[x.stop() for x in self._get_child_players(True)])
+
+ async def play(self) -> None:
+ """Send PLAY/UNPAUSE command to player."""
+ if not self.use_multi_stream:
+ return await super().play()
+ # redirect command to all child players
+ await asyncio.gather(*[x.play() for x in self._get_child_players(True)])
+
+ async def pause(self) -> None:
+ """Send PAUSE command to player."""
+ if not self.use_multi_stream:
+ return await super().pause()
+ # redirect command to all child players
+ await asyncio.gather(*[x.pause() for x in self._get_child_players(True)])
+
+ async def power(self, powered: bool) -> None:
+ """Send POWER command to player."""
+ if not self.use_multi_stream:
+ return await super().power(powered)
+ # redirect command to all child players
+ await asyncio.gather(*[x.power(powered) for x in self._get_child_players(True)])
+
async def volume_set(self, volume_level: int) -> None:
"""Send volume level (0..100) command to player."""
# handle group volume by only applying the valume to powered childs
volume_dif_percent = 1 + (new_volume / 100)
else:
volume_dif_percent = volume_dif / cur_volume
- for child_player in self._get_players(True):
+ for child_player in self._get_child_players(True):
cur_child_volume = child_player.volume_level
new_child_volume = cur_child_volume + (
cur_child_volume * volume_dif_percent
)
await child_player.volume_set(new_child_volume)
- def _get_players(self, only_powered: bool = False) -> List[Player]:
+ def _get_child_players(
+ self, only_powered: bool = False, only_playing: bool = False
+ ) -> List[Player]:
"""Get players attached to this group."""
- return [
- x
- for x in self.mass.players
- if x.player_id in self.group_childs and x.powered or not only_powered
- ]
+ if not self.mass:
+ return []
+ child_players = []
+ for child_id in self.group_childs:
+ if child_player := self.mass.players.get_player(child_id):
+ if not (not only_powered or child_player.powered):
+ continue
+ if not (not only_playing or child_player.state == PlayerState.PLAYING):
+ continue
+ child_players.append(child_player)
+ return child_players
def on_child_update(self, player_id: str, changed_keys: set) -> None:
"""Call when one of the child players of a playergroup updates."""
- super().on_child_update(player_id, changed_keys)
+ self.update_state(True)
if "powered" in changed_keys:
# convenience helper:
# power off group player if last child player turns off
import asyncio
import random
-import time
from asyncio import Task, TimerHandle
from dataclasses import dataclass
from enum import Enum
from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
from music_assistant.models.media_items import ContentType, MediaType, StreamDetails
-from .player import Player, PlayerState
+from .player import Player, PlayerGroup, PlayerState
if TYPE_CHECKING:
from music_assistant.models.media_items import Radio, Track
self._save_task: TimerHandle = None
self._update_task: Task = None
self._signal_next: bool = False
- self._last_player_update: int = 0
self._stream_url: str = self.mass.players.streams.get_stream_url(self.queue_id)
async def setup(self) -> None:
self.mass.signal_event(EventType.QUEUE_ADDED, self)
@property
- def player(self) -> Player:
+ def player(self) -> Player | PlayerGroup:
"""Return the player attached to this queue."""
return self.mass.players.get_player(self.queue_id, include_unavailable=True)
@property
def active(self) -> bool:
"""Return bool if the queue is currenty active on the player."""
+ if self.player.use_multi_stream:
+ return self.queue_id in self.player.current_url
return self._stream_url == self.player.current_url
@property
- def elapsed_time(self) -> int:
+ def elapsed_time(self) -> float:
"""Return elapsed time of current playing media in seconds."""
if not self.active:
- return self.player.elapsed_time
+ return self.player.corrected_elapsed_time
return self._current_item_time
@property
if self.player.use_multi_stream:
# multi stream enabled, all child players should receive the same audio stream
# redirect command to all (powered) players
- tasks = []
+ coros = []
expected_clients = set()
for child_id in self.player.group_childs:
if child_player := self.mass.players.get_player(child_id):
self.queue_id, child_id
)
expected_clients.add(child_id)
- tasks.append(child_player.play_url(player_url))
- tasks.append(child_player.pause())
+ coros.append(child_player.play_url(player_url))
await self.mass.players.streams.start_multi_client_queue_stream(
self.queue_id, expected_clients, ContentType.FLAC
)
- await asyncio.gather(*tasks)
+ await asyncio.gather(*coros)
else:
# regular (single player) request
await self.player.play_url(self._stream_url)
def on_player_update(self) -> None:
"""Call when player updates."""
- self._last_player_update = time.time()
if self._last_state != self.player.state:
self._last_state = self.player.state
# handle case where stream stopped on purpose and we need to restart it
if self.player.state != PlayerState.PLAYING and self._signal_next:
self._signal_next = False
- self.mass.create_task(self.play())
+ self.mass.create_task(self.resume())
# start updater task if needed
if self.player.state == PlayerState.PLAYING:
if not self._update_task:
self._update_task = self.mass.create_task(self.__update_task())
- else:
- if self._update_task:
- self._update_task.cancel()
+ elif self._update_task:
+ self._update_task.cancel()
self._update_task = None
if not self.update_state():
def update_state(self) -> bool:
"""Update queue details, called when player updates."""
+ if self.player.active_queue.queue_id != self.queue_id:
+ return
new_index = self._current_index
track_time = self._current_item_time
new_item_loaded = False
- # if self.player.state == PlayerState.PLAYING and self.elapsed_time > 1:
- if self.player.state == PlayerState.PLAYING:
+ # if self.player.state == PlayerState.PLAYING:
+ if (
+ self.player.state == PlayerState.PLAYING
+ and self.player.corrected_elapsed_time > 0
+ ):
new_index, track_time = self.__get_queue_stream_index()
# process new index
if self._current_index != new_index:
self._signal_next = True
async def __update_task(self) -> None:
- """Update player queue every interval."""
+ """Update player queue every second while playing."""
while True:
- self.update_state()
await asyncio.sleep(1)
+ self.update_state()
def __get_queue_stream_index(self) -> Tuple[int, int]:
"""Calculate current queue index and current track elapsed time."""