media_type_str = uri.split("/")[3]
media_type = MediaType(media_type_str)
item_id = uri.split("/")[4].split("?")[0]
- elif uri.startswith(("http://", "https://")):
+ elif uri.startswith(("http://", "https://", "rtsp://", "rtmp://")):
# Translate a plain URL to the URL provider
provider_instance_id_or_domain = "url"
media_type = MediaType.UNKNOWN
RADIO = "radio"
FOLDER = "folder"
ANNOUNCEMENT = "announcement"
+ FLOW_STREAM = "flow_stream"
UNKNOWN = "unknown"
@classmethod
MediaType.TRACK,
MediaType.PLAYLIST,
MediaType.RADIO,
- MediaType.ANNOUNCEMENT,
)
from mashumaro import DataClassDictMixin
-from .enums import PlayerFeature, PlayerState, PlayerType
+from .enums import MediaType, PlayerFeature, PlayerState, PlayerType
@dataclass(frozen=True)
manufacturer: str = "Unknown Manufacturer"
+@dataclass
+class PlayerMedia(DataClassDictMixin):
+ """Metadata of Media loading/loaded into a player."""
+
+ uri: str # uri or other identifier of the loaded media
+ media_type: MediaType = MediaType.UNKNOWN
+ title: str | None = None # optional
+ artist: str | None = None # optional
+ album: str | None = None # optional
+ image_url: str | None = None # optional
+ duration: int | None = None # optional
+ queue_id: str | None = None # only present for requests from queue controller
+ queue_item_id: str | None = None # only present for requests from queue controller
+ custom_data: dict | None = None # optional
+
+
@dataclass
class Player(DataClassDictMixin):
"""Representation of a Player within Music Assistant."""
# current_item_id: return item_id/uri of the current active/loaded item on the player
# this may be a MA queue_item_id, url, uri or some provider specific string
+ # deprecated: use current_media instead
current_item_id: str | None = None
+ # current_media: return current active/loaded item on the player
+ # this may be a MA queue item, url, uri or some provider specific string
+ # includes metadata if supported by the provider/player
+ current_media: PlayerMedia | None = None
+
# can_sync_with: return tuple of player_ids that can be synced to/with this player
# usually this is just a list of all player_ids within the playerprovider
can_sync_with: tuple[str, ...] = field(default=())
)
SYNCGROUP_PREFIX: Final[str] = "syncgroup_"
VERBOSE_LOG_LEVEL: Final[int] = 5
-UGP_PREFIX: Final[str] = "ugp_"
QueueEmpty,
)
from music_assistant.common.models.media_items import MediaItemType, media_from_dict
+from music_assistant.common.models.player import PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import FALLBACK_DURATION
+from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, MASS_LOGO_ONLINE
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.audio import get_stream_details
from music_assistant.server.models.core_controller import CoreController
"""Return the current active/synced queue for a player."""
if player := self.mass.players.get(player_id):
# account for player that is synced (sync child)
- if player.synced_to:
+ if player.synced_to and player.synced_to != player.player_id:
return self.get_active_queue(player.synced_to)
+ # handle active group player
+ if player.active_group and player.active_group != player.player_id:
+ return self.get_active_queue(player.active_group)
# active_source may be filled with other queue id
if player.active_source != player_id and (
queue := self.get_active_queue(player.active_source)
queue.current_index = index
queue.index_in_buffer = index
queue.flow_mode_start_index = index
- queue.flow_mode = False # reset
+ queue.flow_mode = self.mass.config.get_raw_player_config_value(
+ queue_id, CONF_FLOW_MODE, False
+ )
# get streamdetails - do this here to catch unavailable items early
queue_item.streamdetails = await get_stream_details(
self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
)
+ # send play_media request to player
await self.mass.players.play_media(
player_id=queue_id,
- queue_item=queue_item,
+ # transform into PlayerMedia to send to the actual player implementation
+ media=self.player_media_from_queue_item(queue_item, queue.flow_mode),
)
# Interaction with player
return index
return None
+ def player_media_from_queue_item(self, queue_item: QueueItem, flow_mode: bool) -> PlayerMedia:
+ """Parse PlayerMedia from QueueItem."""
+ media = PlayerMedia(
+ uri=self.mass.streams.resolve_stream_url(queue_item, flow_mode=flow_mode),
+ media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
+ title="Music Assistant" if flow_mode else queue_item.name,
+ image_url=MASS_LOGO_ONLINE,
+ duration=queue_item.duration,
+ queue_id=queue_item.queue_id,
+ queue_item_id=queue_item.queue_item_id,
+ )
+ if not flow_mode and queue_item.media_item:
+ media.title = queue_item.media_item.name
+ media.artist = getattr(queue_item.media_item, "artist_str", "")
+ media.album = (
+ album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
+ )
+ if queue_item.image:
+ media.image_url = self.mass.metadata.get_image_url(queue_item.image)
+ return media
+
def _get_next_index(
self, queue_id: str, cur_index: int | None, is_skip: bool = False
) -> int | None:
with suppress(QueueEmpty):
next_item = await self.preload_next_item(queue.queue_id, index)
if supports_enqueue:
- await self.mass.players.enqueue_next_queue_item(
- player_id=player.player_id, queue_item=next_item
+ await self.mass.players.enqueue_next_media(
+ player_id=player.player_id,
+ media=self.player_media_from_queue_item(next_item, queue.flow_mode),
)
return
await self.play_index(queue.queue_id, next_item.queue_item_id)
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast
-import shortuuid
-
from music_assistant.common.helpers.util import get_changed_values
from music_assistant.common.models.config_entries import (
CONF_ENTRY_ANNOUNCE_VOLUME,
CONF_ENTRY_TTS_PRE_ANNOUNCE,
)
from music_assistant.common.models.enums import (
- ContentType,
EventType,
MediaType,
PlayerFeature,
PlayerType,
ProviderFeature,
ProviderType,
- StreamType,
)
from music_assistant.common.models.errors import (
AlreadyRegisteredError,
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.common.models.player import Player, PlayerMedia
from music_assistant.constants import (
CONF_AUTO_PLAY,
CONF_GROUP_MEMBERS,
CONF_HIDE_PLAYER,
CONF_PLAYERS,
SYNCGROUP_PREFIX,
- UGP_PREFIX,
)
from music_assistant.server.helpers.api import api_command
+from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.core_controller import CoreController
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
- from collections.abc import Awaitable, Callable, Coroutine, Iterable, Iterator
+ from collections.abc import Awaitable, Callable, Coroutine, Iterator
from music_assistant.common.models.config_entries import CoreConfig
# always optimistically set the power state to update the UI
# as fast as possible and prevent race conditions
player.powered = powered
+ # always MA as active source on power ON
+ player.active_source = player_id if powered else None
self.update(player_id)
- # handle actions when a syncgroup child turns on
+ # handle actions when a (sync)group child turns on/off
if active_group_player := self._get_active_player_group(player):
- if active_group_player.startswith(SYNCGROUP_PREFIX):
- self._on_syncgroup_child_power(active_group_player, player.player_id, powered)
- elif player_prov := self.get_player_provider(active_group_player):
- player_prov.on_child_power(active_group_player, player.player_id, powered)
+ player_prov = self.get_player_provider(active_group_player)
+ player_prov.on_child_power(active_group_player, player.player_id, powered)
# handle 'auto play on power on' feature
elif (
powered
@api_command("players/cmd/group_power")
async def cmd_group_power(self, player_id: str, power: bool) -> None:
- """Handle power command for a PlayerGroup/SyncGroup."""
+ """Handle power command for a SyncGroup."""
group_player = self.get(player_id, True)
if group_player.powered == power:
return # nothing to do
- if group_player.type == PlayerType.GROUP and not player_id.startswith(UGP_PREFIX):
- # this is a native group player (and not UGP), redirect
+ if group_player.type == PlayerType.GROUP:
+ # this is a native group player, redirect
await self.cmd_power(player_id, power)
return
any_member_powered = True
if power:
# set active source to group player if the group (is going to be) powered
- member.active_source = group_player.player_id
+ member.active_group = group_player.player_id
+ member.active_source = group_player.active_source
else:
# turn off child player when group turns off
tg.create_task(self.cmd_power(member.player_id, False))
member.active_source = None
+ member.active_group = None
# edge case: group turned on but no members are powered, power them all!
if not any_member_powered and power:
for member in self.iter_group_members(group_player, only_powered=False):
tg.create_task(self.cmd_power(member.player_id, True))
- member.active_source = group_player.player_id
+ member.active_group = group_player.player_id
+ member.active_source = group_player.active_source
if power and group_player.player_id.startswith(SYNCGROUP_PREFIX):
- await self._sync_syncgroup(group_player.player_id)
+ await self.sync_syncgroup(group_player.player_id)
self.update(player_id)
@api_command("players/cmd/volume_mute")
player.active_group, url, use_pre_announce, volume_level
)
return
+ if player.type in (PlayerType.SYNC_GROUP, PlayerType.GROUP) and not player.powered:
+ # announcement request sent to inactive group,
+ # redirect to all underlying players instead
+ self.logger.warning(
+ "Detected announcement request to an inactive playergroup, "
+ "this will be redirected to the individual players."
+ )
+ async with asyncio.TaskGroup() as tg:
+ for group_member in player.group_childs:
+ tg.create_task(
+ self.play_announcement(
+ group_member,
+ url=url,
+ use_pre_announce=use_pre_announce,
+ volume_level=volume_level,
+ )
+ )
+ return
+
# determine pre-announce from (group)player config
if use_pre_announce is None and "tts" in url:
use_pre_announce = self.mass.config.get_raw_player_config_value(
use_pre_announce,
url,
)
- # create a queue item for the announcement so
+ # create a PlayerMedia object for the announcement so
# we can send a regular play-media call downstream
- announcement = QueueItem(
- queue_id=player.player_id,
- queue_item_id=url,
- name="Announcement",
- duration=None,
- streamdetails=StreamDetails(
- provider="url",
- item_id=url,
- audio_format=AudioFormat(
- content_type=ContentType.try_parse(url),
- ),
- stream_type=StreamType.HTTP,
- media_type=MediaType.ANNOUNCEMENT,
- path=url,
- target_loudness=-10,
- data={"url": url, "use_pre_announce": use_pre_announce},
- ),
+ announcement = PlayerMedia(
+ uri=self.mass.streams.get_announcement_url(player_id, url, use_pre_announce),
+ media_type=MediaType.ANNOUNCEMENT,
+ title="Announcement",
+ custom_data={"url": url, "use_pre_announce": use_pre_announce},
)
# handle native announce support
if native_announce_support:
finally:
player.announcement_in_progress = False
- async def play_media(self, player_id: str, queue_item: QueueItem) -> None:
+ @api_command("players/cmd/play_media")
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player.
- This is called by the Queue controller to start playing a queue item on the given player.
- The provider's own implementation should work out how to handle this request.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
+ - player_id: player_id of the player to handle the command.
+ - media: The Media that needs to be played on the player.
"""
if player_id.startswith(SYNCGROUP_PREFIX):
# redirect to syncgroup-leader if needed
await self.cmd_group_power(player_id, True)
group_player = self.get(player_id, True)
if sync_leader := self.get_sync_leader(group_player):
- await self.play_media(sync_leader.player_id, queue_item=queue_item)
+ await self.play_media(sync_leader.player_id, media=media)
group_player.state = PlayerState.PLAYING
return
player_prov = self.mass.players.get_player_provider(player_id)
await player_prov.play_media(
player_id=player_id,
- queue_item=queue_item,
+ media=media,
)
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
- """
- Handle enqueuing of the next queue item on the player.
-
- Only called if the player supports PlayerFeature.ENQUE_NEXT.
- Called about 1 second after a new track started playing.
- Called about 15 seconds before the end of the current track.
-
- A PlayerProvider implementation is in itself responsible for handling this
- so that the queue items keep playing until its empty or the player stopped.
-
- This will NOT be called if the end of the queue is reached (and repeat disabled).
- This will NOT be called if the player is using flow mode to playback the queue.
- """
+ @api_command("players/cmd/enqueue_next_media")
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle enqueuing of a next media item on the player."""
if player_id.startswith(SYNCGROUP_PREFIX):
# redirect to syncgroup-leader if needed
group_player = self.get(player_id, True)
if sync_leader := self.get_sync_leader(group_player):
- await self.enqueue_next_queue_item(
+ await self.enqueue_next_media(
sync_leader.player_id,
- queue_item=queue_item,
+ media=media,
)
return
player_prov = self.mass.players.get_player_provider(player_id)
- await player_prov.enqueue_next_queue_item(player_id=player_id, queue_item=queue_item)
+ await player_prov.enqueue_next_media(player_id=player_id, media=media)
@api_command("players/cmd/sync")
@handle_player_command
if PlayerFeature.SYNC not in parent_player.supported_features:
msg = f"Player {parent_player.name} does not support (un)sync commands"
raise UnsupportedFeaturedException(msg)
+ if player_id == target_player:
+ return
if child_player.synced_to:
if child_player.synced_to == parent_player.player_id:
# nothing to do: already synced to this parent
msg = f"Provider {provider} is not available!"
raise ProviderUnavailableError(msg)
if ProviderFeature.PLAYER_GROUP_CREATE in player_prov.supported_features:
- # provider supports group create feature: forward request to provider
- # the provider is itself responsible for
- # checking if the members can be used for grouping
+ # Provider supports group create feature: forward request to provider.
+ # NOTE: The provider is itself responsible for
+ # checking if the members can be used for grouping.
return await player_prov.create_group(name, members=members)
- if ProviderFeature.SYNC_PLAYERS in player_prov.supported_features:
- # default syncgroup implementation
- return await self._create_syncgroup(player_prov.instance_id, name, members)
msg = f"Provider {player_prov.name} does not support creating groups"
raise UnsupportedFeaturedException(msg)
# Syncgroup specific functions/helpers
- async def _create_syncgroup(self, provider: str, name: str, members: list[str]) -> Player:
- """Create new (providers-specific) SyncGroup with given name and members."""
- new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
- # cleanup list, filter groups (should be handled by frontend, but just in case)
- members = [
- x.player_id
- for x in self
- if x.player_id in members
- if not x.player_id.startswith(SYNCGROUP_PREFIX)
- if x.provider == provider and PlayerFeature.SYNC in x.supported_features
- ]
- # create default config with the user chosen name
- self.mass.config.create_default_player_config(
- new_group_id,
- provider,
- name=name,
- enabled=True,
- values={CONF_GROUP_MEMBERS: members},
- )
- return self._register_syncgroup(
- group_player_id=new_group_id, provider=provider, name=name, members=members
- )
-
def get_sync_leader(self, group_player: Player) -> Player | None:
"""Get the active sync leader player for a syncgroup or synced player."""
if group_player.synced_to:
return child_player
return None
- async def _sync_syncgroup(self, player_id: str) -> None:
+ async def sync_syncgroup(self, player_id: str) -> None:
"""Sync all (possible) players of a syncgroup."""
group_player = self.get(player_id, True)
sync_leader = self.get_sync_leader(group_player)
async def _register_syncgroups(self) -> None:
"""Register all (virtual/fake) syncgroup players."""
- player_configs = await self.mass.config.get_player_configs(include_values=True)
+ player_configs = await self.mass.config.get_player_configs()
for player_config in player_configs:
if not player_config.player_id.startswith(SYNCGROUP_PREFIX):
continue
- members = player_config.get_value(CONF_GROUP_MEMBERS)
- self._register_syncgroup(
+ if not (player_prov := self.mass.get_provider(player_config.provider)):
+ continue
+ members = self.mass.config.get_raw_player_config_value(
+ player_config.player_id, CONF_GROUP_MEMBERS
+ )
+ player_prov.register_syncgroup(
group_player_id=player_config.player_id,
- provider=player_config.provider,
name=player_config.name or player_config.default_name,
members=members,
)
- def _register_syncgroup(
- self, group_player_id: str, provider: str, name: str, members: Iterable[str]
- ) -> Player:
- """Register a (virtual/fake) syncgroup player."""
- # extract player features from first/random player
- for member in members:
- if first_player := self.get(member):
- break
- else:
- # edge case: no child player is (yet) available; postpone register
- return None
- player = Player(
- player_id=group_player_id,
- provider=provider,
- type=PlayerType.SYNC_GROUP,
- name=name,
- available=True,
- powered=False,
- device_info=DeviceInfo(model="SyncGroup", manufacturer=provider.title()),
- supported_features=first_player.supported_features,
- group_childs=set(members),
- active_source=group_player_id,
- )
- self.mass.players.register_or_update(player)
- return player
-
- def _on_syncgroup_child_power(
- self, player_id: str, child_player_id: str, new_power: bool
- ) -> None:
- """
- Call when a power command was executed on one of the child player of a Player/Sync group.
-
- This is used to handle special actions such as (re)syncing.
- """
- group_player = self.mass.players.get(player_id)
- child_player = self.mass.players.get(child_player_id)
-
- if not group_player.powered:
- # guard, this should be caught in the player controller but just in case...
- return
-
- powered_childs = list(self.iter_group_members(group_player, True))
- if not new_power and child_player in powered_childs:
- powered_childs.remove(child_player)
- if new_power and child_player not in powered_childs:
- powered_childs.append(child_player)
-
- # if the last player of a group turned off, turn off the group
- if len(powered_childs) == 0:
- self.logger.debug(
- "Group %s has no more powered members, turning off group player",
- group_player.display_name,
- )
- self.mass.create_task(self.cmd_power(player_id, False))
- return
-
- group_playing = group_player.state == PlayerState.PLAYING
- is_sync_leader = (
- len(child_player.group_childs) > 0
- and child_player.active_source == group_player.player_id
- )
- if group_playing and not new_power and is_sync_leader:
- # the current sync leader player turned OFF while the group player
- # should still be playing - we need to select a new sync leader and resume
- self.logger.warning(
- "Syncleader %s turned off while syncgroup is playing, "
- "a forced resume for syngroup %s will be attempted in 5 seconds...",
- child_player.display_name,
- group_player.display_name,
- )
-
- async def forced_resync() -> None:
- # we need to wait a bit here to not run into massive race conditions
- await asyncio.sleep(5)
- await self._sync_syncgroup(group_player.player_id)
- await self.mass.player_queues.resume(group_player.player_id)
-
- self.mass.create_task(forced_resync())
- return
- if new_power:
- # if a child player turned ON while the group player is on, we need to resync/resume
- self.mass.create_task(self._sync_syncgroup(group_player.player_id))
-
async def _play_announcement(
self,
player: Player,
- announcement: QueueItem,
+ announcement: PlayerMedia,
volume_level: int | None = None,
) -> None:
"""Handle (default/fallback) implementation of the play announcement feature.
# wait for the player to stop
with suppress(TimeoutError):
await self.wait_for_state(player, PlayerState.IDLE, 10)
- # a small amount of pause before the volume command
- # prevents that the last piece of music is very loud
- await asyncio.sleep(0.2)
+ # a small amount of pause before the volume command
+ # prevents that the last piece of music is very loud
+ await asyncio.sleep(0.2)
# adjust volume if needed
# in case of a (sync) group, we need to do this for all child players
prev_volumes: dict[str, int] = {}
for volume_player_id in player.group_childs or (player.player_id,):
if not (volume_player := self.get(volume_player_id)):
continue
- if volume_player.active_source != player.active_source:
+ # filter out players that have a different source active
+ if volume_player.active_source not in (
+ player.active_source,
+ volume_player.player_id,
+ None,
+ ):
continue
prev_volume = volume_player.volume_level
announcement_volume = self.get_announcement_volume(volume_player_id, volume_level)
"Announcement to player %s - playing the announcement on the player...",
player.display_name,
)
- await self.play_media(player_id=player.player_id, queue_item=announcement)
- # wait for the player to play
+ await self.play_media(player_id=player.player_id, media=announcement)
+ # wait for the player(s) to play
with suppress(TimeoutError):
await self.wait_for_state(player, PlayerState.PLAYING, 10)
self.logger.debug(
player.display_name,
)
# wait for the player to stop playing
+ if not announcement.duration:
+ media_info = await parse_tags(announcement.custom_data["url"])
+ announcement.duration = media_info.duration
with suppress(TimeoutError):
- await self.wait_for_state(
- player, PlayerState.IDLE, (announcement.streamdetails.duration or 60) + 3
- )
+ await self.wait_for_state(player, PlayerState.IDLE, (announcement.duration or 60) + 3)
self.logger.debug(
"Announcement to player %s - restore previous state...", player.display_name
)
import time
import urllib.parse
from collections.abc import AsyncGenerator
-from contextlib import suppress
from typing import TYPE_CHECKING
-import shortuuid
from aiofiles.os import wrap
from aiohttp import web
CONF_OUTPUT_CHANNELS,
CONF_PUBLISH_IP,
SILENCE_FILE,
- UGP_PREFIX,
)
from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
isfile = wrap(os.path.isfile)
-class MultiClientStreamJob:
- """
- Representation of a (multiclient) Audio Queue stream job/task.
-
- The whole idea here is that in case of a player (sync)group,
- all client players receive the exact same (PCM) audio chunks from the source audio.
- A StreamJob is tied to a Queue and streams the queue flow stream,
- In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created.
- """
-
- _audio_task: asyncio.Task | None = None
-
- def __init__(
- self,
- stream_controller: StreamsController,
- queue_id: str,
- pcm_format: AudioFormat,
- start_queue_item: QueueItem,
- ) -> None:
- """Initialize MultiClientStreamJob instance."""
- self.stream_controller = stream_controller
- self.queue_id = queue_id
- self.queue = self.stream_controller.mass.player_queues.get(queue_id)
- assert self.queue # just in case
- self.pcm_format = pcm_format
- self.start_queue_item = start_queue_item
- self.job_id = shortuuid.uuid()
- self.expected_players: set[str] = set()
- self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
- self.bytes_streamed: int = 0
- self._all_clients_connected = asyncio.Event()
- self.logger = stream_controller.logger.getChild("streamjob")
- self._finished: bool = False
- # start running the audio task in the background
- self._audio_task = asyncio.create_task(self._stream_job_runner())
-
- @property
- def finished(self) -> bool:
- """Return if this StreamJob is finished."""
- return self._finished or self._audio_task and self._audio_task.done()
-
- @property
- def pending(self) -> bool:
- """Return if this Job is pending start."""
- return not self.finished and not self._all_clients_connected.is_set()
-
- @property
- def running(self) -> bool:
- """Return if this Job is running."""
- return not self.finished and not self.pending
-
- def stop(self) -> None:
- """Stop running this job."""
- self._finished = True
- if self._audio_task and self._audio_task.done():
- return
- if self._audio_task:
- self._audio_task.cancel()
- for sub_queue in self.subscribed_players.values():
- with suppress(asyncio.QueueFull):
- sub_queue.put_nowait(b"")
-
- def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
- """Resolve the childplayer specific stream URL to this streamjob."""
- fmt = output_codec.value
- # handle raw pcm
- if output_codec.is_pcm():
- player = self.stream_controller.mass.players.get(child_player_id)
- player_max_bit_depth = 24 if player.supports_24bit else 16
- output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
- output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
- output_channels = self.stream_controller.mass.config.get_raw_player_config_value(
- child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
- )
- channels = 1 if output_channels != "stereo" else 2
- fmt += (
- f";codec=pcm;rate={output_sample_rate};"
- f"bitrate={output_bit_depth};channels={channels}"
- )
- url = f"{self.stream_controller._server.base_url}/multi/{self.queue_id}/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}" # noqa: E501
- self.expected_players.add(child_player_id)
- return url
-
- async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
- """Subscribe consumer and iterate incoming chunks on the queue."""
- try:
- # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
- # to the stream in an attempt to get the audio details such as duration
- # which is a bit pointless for our duration-less queue stream
- # and it completely messes with the subscription logic
- if player_id in self.subscribed_players:
- self.logger.warning(
- "Player %s is making multiple requests "
- "to the same stream, playback may be disturbed!",
- player_id,
- )
- player_id = f"{player_id}_{shortuuid.random(4)}"
- elif self._all_clients_connected.is_set():
- # client subscribes while we're already started - that is going to be messy for sure
- self.logger.warning(
- "Player %s is is joining while the stream is already started, "
- "playback may be disturbed!",
- player_id,
- )
-
- self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
-
- if self._all_clients_connected.is_set():
- # client subscribes while we're already started,
- # that will most probably lead to a bad experience but support it anyways
- self.logger.warning(
- "Client %s is joining while the stream is already started", player_id
- )
- self.logger.debug("Subscribed client %s", player_id)
-
- if len(self.subscribed_players) == len(self.expected_players):
- # we reached the number of expected subscribers, set event
- # so that chunks can be pushed
- await asyncio.sleep(0.2)
- self._all_clients_connected.set()
-
- # keep reading audio chunks from the queue until we receive an empty one
- while True:
- chunk = await sub_queue.get()
- if chunk == b"":
- # EOF chunk received
- break
- yield chunk
- finally:
- self.subscribed_players.pop(player_id, None)
- self.logger.debug("Unsubscribed client %s", player_id)
- # check if this was the last subscriber and we should cancel
- await asyncio.sleep(2)
- if len(self.subscribed_players) == 0 and self._audio_task and not self.finished:
- self.logger.debug("Cleaning up, all clients disappeared...")
- self._audio_task.cancel()
-
- async def _put_chunk(self, chunk: bytes) -> None:
- """Put chunk of data to all subscribers."""
- async with asyncio.TaskGroup() as tg:
- for sub_queue in list(self.subscribed_players.values()):
- # put this chunk on the player's subqueue
- tg.create_task(sub_queue.put(chunk))
- self.bytes_streamed += len(chunk)
-
- async def _stream_job_runner(self) -> None:
- """Feed audio chunks to StreamJob subscribers."""
- chunk_num = 0
- async for chunk in self.stream_controller.get_flow_stream(
- self.queue,
- self.start_queue_item,
- self.pcm_format,
- ):
- chunk_num += 1
- if chunk_num == 1:
- # wait until all expected clients are connected
- try:
- async with asyncio.timeout(10):
- await self._all_clients_connected.wait()
- except TimeoutError:
- if len(self.subscribed_players) == 0:
- self.stream_controller.logger.exception(
- "Abort multi client stream job for queue %s: "
- "clients did not connect within timeout",
- self.queue.display_name,
- )
- break
- # not all clients connected but timeout expired, set flag and move on
- # with all clients that did connect
- self._all_clients_connected.set()
- else:
- self.stream_controller.logger.debug(
- "Starting multi client stream job for queue %s "
- "with %s out of %s connected clients",
- self.queue.display_name,
- len(self.subscribed_players),
- len(self.expected_players),
- )
-
- await self._put_chunk(chunk)
-
- # mark EOF with empty chunk
- await self._put_chunk(b"")
-
-
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
"""Parse PCM info from a codec/content_type string."""
params = (
"""Initialize instance."""
super().__init__(*args, **kwargs)
self._server = Webserver(self.logger, enable_dynamic_routes=True)
- self.multi_client_jobs: dict[str, MultiClientStreamJob] = {}
self.register_dynamic_route = self._server.register_dynamic_route
self.unregister_dynamic_route = self._server.unregister_dynamic_route
self.manifest.name = "Streamserver"
"/single/{queue_id}/{queue_item_id}.{fmt}",
self.serve_queue_item_stream,
),
- (
- "*",
- "/multi/{queue_id}/{job_id}/{player_id}/{queue_item_id}.{fmt}",
- self.serve_multi_subscriber_stream,
- ),
(
"*",
"/command/{queue_id}/{command}.mp3",
def resolve_stream_url(
self,
- player_id: str,
queue_item: QueueItem,
- output_codec: ContentType,
flow_mode: bool = False,
+ output_codec: ContentType = ContentType.FLAC,
) -> str:
"""Resolve the stream URL for the given QueueItem."""
fmt = output_codec.value
- # handle special stream created by UGP
- if queue_item.queue_id.startswith(UGP_PREFIX):
- return self.multi_client_jobs[queue_item.queue_id].resolve_stream_url(
- player_id, output_codec
- )
- # handle announcement item
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
- return self.get_announcement_url(
- player_id=queue_item.queue_id,
- announcement_url=queue_item.streamdetails.data["url"],
- use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
- content_type=output_codec,
- )
# handle raw pcm without exact format specifiers
if output_codec.is_pcm() and ";" not in fmt:
fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
url += "?" + urllib.parse.urlencode(query_params)
return url
- def create_multi_client_stream_job(
- self,
- queue_id: str,
- start_queue_item: QueueItem,
- pcm_bit_depth: int = FLOW_DEFAULT_BIT_DEPTH,
- pcm_sample_rate: int = FLOW_DEFAULT_SAMPLE_RATE,
- ) -> MultiClientStreamJob:
- """Create a MultiClientStreamJob for the given queue..
-
- This is called by player/sync group implementations to start streaming
- the queue audio to multiple players at once.
- """
- if existing_job := self.multi_client_jobs.pop(queue_id, None):
- if (
- queue_id.startswith(UGP_PREFIX)
- and existing_job.job_id == start_queue_item.queue_item_id
- ):
- return existing_job
- # cleanup existing job first
- if not existing_job.finished:
- existing_job.stop()
- self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob(
- self,
- queue_id=queue_id,
- pcm_format=AudioFormat(
- content_type=ContentType.from_bit_depth(pcm_bit_depth),
- sample_rate=pcm_sample_rate,
- bit_depth=pcm_bit_depth,
- channels=2,
- ),
- start_queue_item=start_queue_item,
- )
- return stream_job
-
async def serve_queue_item_stream(self, request: web.Request) -> web.Response:
"""Stream single queueitem audio to a player."""
self._log_request(request)
return resp
- async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response:
- """Stream Queue Flow audio to a child player within a multi subscriber setup."""
- self._log_request(request)
- queue_id = request.match_info["queue_id"]
- streamjob = self.multi_client_jobs.get(queue_id)
- if not streamjob:
- raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}")
- job_id = request.match_info["job_id"]
- if job_id != streamjob.job_id:
- raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}")
- child_player_id = request.match_info["player_id"]
- child_player = self.mass.players.get(child_player_id)
- if not child_player:
- raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
- # work out (childplayer specific!) output format/details
- output_format = await self._get_output_format(
- output_format_str=request.match_info["fmt"],
- queue_player=child_player,
- default_sample_rate=streamjob.pcm_format.sample_rate,
- default_bit_depth=streamjob.pcm_format.bit_depth,
- )
- # prepare request, add some DLNA/UPNP compatible headers
- headers = {
- **DEFAULT_STREAM_HEADERS,
- "Content-Type": f"audio/{output_format.output_format_str}",
- }
- resp = web.StreamResponse(
- status=200,
- reason="OK",
- headers=headers,
- )
- await resp.prepare(request)
-
- # return early if this is not a GET request
- if request.method != "GET":
- return resp
-
- # all checks passed, start streaming!
- self.logger.debug(
- "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s",
- streamjob.queue.display_name,
- child_player.display_name,
- )
-
- async for chunk in get_ffmpeg_stream(
- audio_input=streamjob.subscribe(child_player_id),
- input_format=streamjob.pcm_format,
- output_format=output_format,
- filter_params=get_player_filter_params(self.mass, child_player_id),
- ):
- try:
- await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError):
- # race condition
- break
-
- return resp
-
async def serve_command_request(self, request: web.Request) -> web.Response:
"""Handle special 'command' request for a player."""
self._log_request(request)
queue.display_name,
use_crossfade,
)
+ total_bytes_sent = 0
while True:
# get (next) queue item to stream
queue_track.streamdetails,
pcm_format=pcm_format,
# strip silence from begin/end if track is being crossfaded
- strip_silence_begin=use_crossfade and bytes_written > 0,
+ strip_silence_begin=use_crossfade and total_bytes_sent > 0,
strip_silence_end=use_crossfade,
):
- # required buffer size is a bit dynamic,
- # it needs to be small when the flow stream starts
- seconds_streamed = int(bytes_written / pcm_sample_size)
- if not use_crossfade or seconds_streamed < 5:
- buffer_size = pcm_sample_size
- elif seconds_streamed < 10:
- buffer_size = pcm_sample_size * 2
- elif use_crossfade and seconds_streamed < 20:
- buffer_size = pcm_sample_size * 5
+ # buffer size needs to be big enough to include the crossfade part
+ # allow it to be a bit smaller when playback just starts
+ if not use_crossfade:
+ req_buffer_size = pcm_sample_size
+ elif (total_bytes_sent + bytes_written) < crossfade_size:
+ req_buffer_size = int(crossfade_size / 2)
else:
- buffer_size = crossfade_size + pcm_sample_size * 2
- # buffer size needs to be big enough to include the crossfade part
+ req_buffer_size = crossfade_size
# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
del chunk
- if len(buffer) < buffer_size:
+ if len(buffer) < req_buffer_size:
# buffer is not full enough, move on
continue
buffer = b""
#### OTHER: enough data in buffer, feed to output
- while len(buffer) > buffer_size:
- subchunk = buffer[:pcm_sample_size]
+ while len(buffer) > req_buffer_size:
+ yield buffer[:pcm_sample_size]
+ bytes_written += pcm_sample_size
buffer = buffer[pcm_sample_size:]
- bytes_written += len(subchunk)
- yield subchunk
- del subchunk
#### HANDLE END OF TRACK
if last_fadeout_part:
# if crossfade is enabled, save fadeout part to pickup for next track
last_fadeout_part = buffer[-crossfade_size:]
remaining_bytes = buffer[:-crossfade_size]
- yield remaining_bytes
- bytes_written += len(remaining_bytes)
+ if remaining_bytes:
+ yield remaining_bytes
+ bytes_written += len(remaining_bytes)
del remaining_bytes
- else:
+ elif buffer:
# no crossfade enabled, just yield the buffer last part
bytes_written += len(buffer)
yield buffer
queue_track.streamdetails.duration = (
queue_track.streamdetails.seek_position + seconds_streamed
)
+ total_bytes_sent += bytes_written
self.logger.debug(
"Finished Streaming queue track: %s (%s) on queue %s",
queue_track.streamdetails.uri,
queue_track.streamdetails.seconds_streamed += last_part_seconds
queue_track.streamdetails.duration += last_part_seconds
del last_fadeout_part
-
+ total_bytes_sent += bytes_written
self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
async def get_announcement_stream(
strip_silence_end = False
# pcm_sample_size = chunk size = 1 second of pcm audio
pcm_sample_size = pcm_format.pcm_sample_size
- buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size * 1
- buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size * 1
+ buffer_size = (
+ pcm_sample_size * 5 if (strip_silence_begin or strip_silence_end) else pcm_sample_size
+ )
# collect all arguments for ffmpeg
filter_params = []
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
- # we criple ffmpeg a bit on purpose with the filter_threads
- # option so it doesn't consume all cpu when calculating loudnorm
- extra_input_args=[*extra_input_args, "-filter_threads", "1"],
+ extra_input_args=[
+ *extra_input_args,
+ # we criple ffmpeg a bit on purpose with the filter_threads
+ # option so it doesn't consume all cpu when calculating loudnorm
+ "-filter_threads",
+ "1",
+ ],
name="ffmpeg_media_stream",
stderr_enabled=True,
) as ffmpeg_proc:
chunk_num = 0
async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size):
chunk_num += 1
- required_buffer = buffer_size_begin if chunk_num < 60 else buffer_size_end
buffer += chunk
del chunk
- if len(buffer) < required_buffer:
+ if len(buffer) < buffer_size:
# buffer is not full enough, move on
continue
continue
#### OTHER: enough data in buffer, feed to output
- while len(buffer) > required_buffer:
- subchunk = buffer[:pcm_sample_size]
+ while len(buffer) > buffer_size:
+ yield buffer[:pcm_sample_size]
+ state_data["bytes_sent"] += pcm_sample_size
buffer = buffer[pcm_sample_size:]
- state_data["bytes_sent"] += len(subchunk)
- yield subchunk
- del subchunk
# all chunks received, strip silence of last part if needed and yield remaining bytes
if strip_silence_end:
class FFMpeg(AsyncProcess):
"""FFMpeg wrapped as AsyncProcess."""
- def __init__(
+ def __init__( # noqa: PLR0913
self,
audio_input: AsyncGenerator[bytes, None] | str | int,
input_format: AudioFormat,
name: str = "ffmpeg",
stderr_enabled: bool = False,
audio_output: str | int = "-",
+ loglevel: str | None = None,
) -> None:
"""Initialize AsyncProcess."""
ffmpeg_args = get_ffmpeg_args(
input_path=audio_input if isinstance(audio_input, str) else "-",
output_path=audio_output if isinstance(audio_output, str) else "-",
extra_input_args=extra_input_args or [],
- loglevel="info"
+ loglevel=loglevel or "info"
if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
else "error",
)
"-nostats",
"-ignore_unknown",
"-protocol_whitelist",
- "file,http,https,tcp,tls,crypto,pipe,data,fd",
+ "file,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
]
# collect input args
input_args = []
from typing import TYPE_CHECKING
from music_assistant.common.models.enums import MediaType
-from music_assistant.constants import MASS_LOGO_ONLINE, UGP_PREFIX
+from music_assistant.constants import MASS_LOGO_ONLINE
if TYPE_CHECKING:
- from music_assistant.common.models.queue_item import QueueItem
- from music_assistant.server import MusicAssistant
+ from music_assistant.common.models.player import PlayerMedia
# ruff: noqa: E501
-def create_didl_metadata(mass: MusicAssistant, url: str, queue_item: QueueItem) -> str:
- """Create DIDL metadata string from url and (optional) QueueItem."""
- ext = url.split(".")[-1].split("?")[0]
- if "flow" in url or queue_item.queue_id.startswith(UGP_PREFIX):
- # flow stream
+def create_didl_metadata(media: PlayerMedia) -> str:
+ """Create DIDL metadata string from url and PlayerMedia."""
+ ext = media.uri.split(".")[-1].split("?")[0]
+ image_url = media.image_url or MASS_LOGO_ONLINE
+ if media.media_type in (MediaType.FLOW_STREAM, MediaType.RADIO) or not media.duration:
+ # flow stream, radio or other duration-less stream
+ title = media.title or media.uri
return (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
f'<item id="flowmode" parentID="0" restricted="1">'
- f"<dc:title>Music Assistant</dc:title>"
- f"<upnp:albumArtURI>{escape_string(MASS_LOGO_ONLINE)}</upnp:albumArtURI>"
- f"<dc:queueItemId>{queue_item.queue_id}</dc:queueItemId>"
- "<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
- f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
- f'<res duration="23:59:59.000" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(url)}</res>'
- "</item>"
- "</DIDL-Lite>"
- )
- image_url = (
- mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE
- )
- if queue_item.media_type != MediaType.TRACK or not queue_item.duration:
- # radio or other non-track item
- return (
- '<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
- f'<item id="flowmode" parentID="0" restricted="1">'
- f"<dc:title>{escape_string(queue_item.name)}</dc:title>"
+ f"<dc:title>{escape_string(title)}</dc:title>"
f"<upnp:albumArtURI>{escape_string(image_url)}</upnp:albumArtURI>"
- f"<dc:queueItemId>{queue_item.queue_item_id}</dc:queueItemId>"
+ f"<dc:queueItemId>{media.uri}</dc:queueItemId>"
"<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
- f'<res duration="23:59:59.000" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(url)}</res>'
+ f'<res duration="23:59:59.000" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(media.uri)}</res>'
"</item>"
"</DIDL-Lite>"
)
- title = escape_string(queue_item.media_item.name)
- if queue_item.media_item.artists and queue_item.media_item.artists[0].name:
- artist = escape_string(queue_item.media_item.artists[0].name)
- else:
- artist = ""
- if queue_item.media_item.album and queue_item.media_item.album.name:
- album = escape_string(queue_item.media_item.album.name)
- else:
- album = ""
- duration_str = str(datetime.timedelta(seconds=queue_item.duration)) + ".000"
+ duration_str = str(datetime.timedelta(seconds=media.duration or 0)) + ".000"
return (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
'<item id="1" parentID="0" restricted="1">'
- f"<dc:title>{title}</dc:title>"
- f"<dc:creator>{artist}</dc:creator>"
- f"<upnp:album>{album}</upnp:album>"
- f"<upnp:artist>{artist}</upnp:artist>"
- f"<upnp:duration>{int(queue_item.duration)}</upnp:duration>"
- f"<dc:queueItemId>{queue_item.queue_item_id}</dc:queueItemId>"
+ f"<dc:title>{escape_string(media.title or media.uri)}</dc:title>"
+ f"<dc:creator>{escape_string(media.artist or '')}</dc:creator>"
+ f"<upnp:album>{escape_string(media.album or '')}</upnp:album>"
+ f"<upnp:artist>{escape_string(media.artist or '')}</upnp:artist>"
+ f"<upnp:duration>{int(media.duration or 0)}</upnp:duration>"
+ f"<dc:queueItemId>{media.uri}</dc:queueItemId>"
f"<upnp:albumArtURI>{escape_string(image_url)}</upnp:albumArtURI>"
"<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
- f'<res duration="{duration_str}" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(url)}</res>'
+ f'<res duration="{duration_str}" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(media.uri)}</res>'
"</item>"
"</DIDL-Lite>"
)
--- /dev/null
+"""Implementation of a simple multi-client stream task/job."""
+
+import asyncio
+import logging
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+
+from music_assistant.common.helpers.util import empty_queue
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.server.helpers.audio import get_ffmpeg_stream
+
+LOGGER = logging.getLogger(__name__)
+
+
+class MultiClientStream:
+ """Implementation of a simple multi-client (audio) stream task/job."""
+
+ def __init__(
+ self,
+ audio_source: AsyncGenerator[bytes, None],
+ audio_format: AudioFormat,
+ expected_clients: int = 0,
+ ) -> None:
+ """Initialize MultiClientStream."""
+ self.audio_source = audio_source
+ self.audio_format = audio_format
+ self.subscribers: list[asyncio.Queue] = []
+ self.expected_clients = expected_clients
+ self.task = asyncio.create_task(self._runner())
+
+ @property
+ def done(self) -> bool:
+ """Return if this stream is already done."""
+ return self.task.done()
+
+ async def stop(self) -> None:
+ """Stop/cancel the stream."""
+ if self.done:
+ return
+ self.task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self.task
+ for sub_queue in list(self.subscribers):
+ empty_queue(sub_queue)
+
+ async def get_stream(
+ self, output_format: AudioFormat, filter_params: list[str] | None = None
+ ) -> AsyncGenerator[bytes, None]:
+ """Get (client specific encoded) ffmpeg stream."""
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.subscribe_raw(),
+ input_format=self.audio_format,
+ output_format=output_format,
+ filter_params=filter_params,
+ ):
+ yield chunk
+
+ async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
+ """Subscribe to the raw/unaltered audio stream."""
+ try:
+ queue = asyncio.Queue(1)
+ self.subscribers.append(queue)
+ while True:
+ chunk = await queue.get()
+ if chunk == b"":
+ break
+ yield chunk
+ finally:
+ with suppress(ValueError):
+ self.subscribers.remove(queue)
+
+ async def _runner(self) -> None:
+ """Run the stream for the given audio source."""
+ expected_clients = self.expected_clients or 1
+ # wait for first/all subscriber
+ count = 0
+ while count < 50:
+ await asyncio.sleep(0.5)
+ count += 1
+ if len(self.subscribers) >= expected_clients:
+ break
+ if count == 50:
+ return
+ LOGGER.debug(
+ "Starting multi-client stream with %s/%s clients",
+ len(self.subscribers),
+ self.expected_clients,
+ )
+ async for chunk in self.audio_source:
+ if len(self.subscribers) == 0:
+ return
+ async with asyncio.TaskGroup() as tg:
+ for sub in list(self.subscribers):
+ tg.create_task(sub.put(chunk))
+ # EOF: send empty chunk
+ async with asyncio.TaskGroup() as tg:
+ for sub in list(self.subscribers):
+ tg.create_task(sub.put(b""))
},
)
self.logger.info("Starting server on %s:%s - base url: %s", bind_ip, bind_port, base_url)
- self._apprunner = web.AppRunner(self._webapp, access_log=None)
+ self._apprunner = web.AppRunner(self._webapp, access_log=None, shutdown_timeout=10)
# add static routes
if self._static_routes:
for method, path, handler in self._static_routes:
await self._apprunner.setup()
# set host to None to bind to all addresses on both IPv4 and IPv6
host = None if bind_ip == "0.0.0.0" else bind_ip
- self._tcp_site = web.TCPSite(
- self._apprunner, host=host, port=bind_port, shutdown_timeout=10
- )
+ self._tcp_site = web.TCPSite(self._apprunner, host=host, port=bind_port)
await self._tcp_site.start()
async def close(self) -> None:
from __future__ import annotations
from abc import abstractmethod
-from typing import TYPE_CHECKING
+from collections.abc import Iterable
+
+import shortuuid
from music_assistant.common.models.config_entries import (
CONF_ENTRY_ANNOUNCE_VOLUME,
ConfigValueOption,
PlayerConfig,
)
-from music_assistant.common.models.enums import ConfigEntryType
-from music_assistant.constants import (
- CONF_GROUP_MEMBERS,
- CONF_GROUP_PLAYERS,
- SYNCGROUP_PREFIX,
- UGP_PREFIX,
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+ ProviderFeature,
)
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_GROUP_PLAYERS, SYNCGROUP_PREFIX
from .provider import Provider
-if TYPE_CHECKING:
- from music_assistant.common.models.player import Player
- from music_assistant.common.models.queue_item import QueueItem
-
-
# ruff: noqa: ARG001, ARG002
),
CONF_ENTRY_PLAYER_ICON_GROUP,
)
- if not player_id.startswith((SYNCGROUP_PREFIX, UGP_PREFIX)):
+ if not player_id.startswith(SYNCGROUP_PREFIX):
# add default entries for announce feature
entries = (
*entries,
async def play_media(
self,
player_id: str,
- queue_item: QueueItem,
+ media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player.
- This is called by the Queue controller to start playing a queue item on the given player.
+ This is called by the Players controller to start playing a mediaitem on the given player.
The provider's own implementation should work out how to handle this request.
- player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
+ - media: Details of the item that needs to be played on the player.
"""
raise NotImplementedError
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""
- Handle enqueuing of the next queue item on the player.
+ Handle enqueuing of the next (queue) item on the player.
Only called if the player supports PlayerFeature.ENQUE_NEXT.
Called about 1 second after a new track started playing.
"""
async def play_announcement(
- self, player_id: str, announcement: QueueItem, volume_level: int | None = None
+ self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
) -> None:
"""Handle (provider native) playback of an announcement on given player."""
# will only be called for players with PLAY_ANNOUNCEMENT feature set.
- name: Name for the new group to create.
- members: A list of player_id's that should be part of this group.
"""
- # will only be called for players with PLAYER_GROUP_CREATE feature set.
- raise NotImplementedError
+ # should only be called for providers with PLAYER_GROUP_CREATE feature set.
+ if ProviderFeature.PLAYER_GROUP_CREATE not in self.supported_features:
+ raise NotImplementedError
+ # default implementation: create syncgroup
+ new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
+ # cleanup list, filter groups (should be handled by frontend, but just in case)
+ members = [
+ x.player_id
+ for x in self.players
+ if x.player_id in members
+ if not x.player_id.startswith(SYNCGROUP_PREFIX)
+ if x.provider == self.instance_id and PlayerFeature.SYNC in x.supported_features
+ ]
+ # create default config with the user chosen name
+ self.mass.config.create_default_player_config(
+ new_group_id,
+ self.instance_id,
+ name=name,
+ enabled=True,
+ values={CONF_GROUP_MEMBERS: members},
+ )
+ return self.register_syncgroup(group_player_id=new_group_id, name=name, members=members)
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
"""
- Call when a power command was executed on one of the child player of a Player/Sync group.
+ Call when a power command was executed on one of the child players of a Sync group.
This is used to handle special actions such as (re)syncing.
"""
+ group_player = self.mass.players.get(player_id)
+ child_player = self.mass.players.get(child_player_id)
+
+ if not group_player.powered:
+ # guard, this should be caught in the player controller but just in case...
+ return
+
+ powered_childs = list(self.mass.players.iter_group_members(group_player, True))
+ if not new_power and child_player in powered_childs:
+ powered_childs.remove(child_player)
+ if new_power and child_player not in powered_childs:
+ powered_childs.append(child_player)
+
+ # if the last player of a group turned off, turn off the group
+ if len(powered_childs) == 0:
+ self.logger.debug(
+ "Group %s has no more powered members, turning off group player",
+ group_player.display_name,
+ )
+ self.mass.create_task(self.mass.players.cmd_power(player_id, False))
+ return
+
+ # the below actions are only suitable for syncgroups
+ if ProviderFeature.SYNC_PLAYERS not in self.supported_features:
+ return
+
+ group_playing = group_player.state == PlayerState.PLAYING
+ is_sync_leader = (
+ len(child_player.group_childs) > 0
+ and child_player.active_source == group_player.player_id
+ )
+ if group_playing and not new_power and is_sync_leader:
+ # the current sync leader player turned OFF while the group player
+ # should still be playing - we need to select a new sync leader and resume
+ self.logger.warning(
+ "Syncleader %s turned off while syncgroup is playing, "
+ "a forced resume for syngroup %s will be attempted in 5 seconds...",
+ child_player.display_name,
+ group_player.display_name,
+ )
+
+ async def full_resync() -> None:
+ await self.mass.players.sync_syncgroup(group_player.player_id)
+ await self.mass.player_queues.resume(group_player.player_id)
+
+ self.mass.call_later(5, full_resync, task_id=f"forced_resync_{player_id}")
+ return
+ elif new_power:
+ # if a child player turned ON while the group is already active, we need to resync
+ sync_leader = self.mass.players.get_sync_leader(group_player)
+ if sync_leader.player_id != child_player_id:
+ self.mass.create_task(
+ self.cmd_sync(child_player_id, sync_leader.player_id),
+ )
+
+ def register_syncgroup(self, group_player_id: str, name: str, members: Iterable[str]) -> Player:
+ """Register a (virtual/fake) syncgroup player."""
+ # extract player features from first/random player
+ for member in members:
+ if first_player := self.mass.players.get(member):
+ break
+ else:
+ # edge case: no child player is (yet) available; postpone register
+ return None
+ player = Player(
+ player_id=group_player_id,
+ provider=self.instance_id,
+ type=PlayerType.SYNC_GROUP,
+ name=name,
+ available=True,
+ powered=False,
+ device_info=DeviceInfo(model="SyncGroup", manufacturer=self.name),
+ supported_features=first_player.supported_features,
+ group_childs=set(members),
+ active_source=group_player_id,
+ )
+ self.mass.players.register_or_update(player)
+ return player
# DO NOT OVERRIDE BELOW
ProviderFeature,
)
from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
-from music_assistant.constants import CONF_SYNC_ADJUST, UGP_PREFIX, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.server.helpers.audio import (
+ get_ffmpeg_args,
+ get_ffmpeg_stream,
+ get_player_filter_params,
+)
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
+ from music_assistant.server.providers.ugp import UniversalGroupProvider
DOMAIN = "airplay"
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
server=f"{socket.gethostname()}.local",
)
await self.mass.aiozc.async_register_service(self._dacp_info)
- self._resync_handle: asyncio.TimerHandle | None = None
async def on_mdns_service_state_change(
self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
async def play_media(
self,
player_id: str,
- queue_item: QueueItem,
+ media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
if player.synced_to:
# should not happen, but just in case
raise RuntimeError("Player is synced")
- # fix race condition where resync and play media are called at more or less the same time
- if self._resync_handle:
- self._resync_handle.cancel()
- self._resync_handle = None
# always stop existing stream first
- wait_stopped = not queue_item.streamdetails or queue_item.streamdetails.seek_position == 0
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
- tg.create_task(airplay_player.active_stream.stop(wait=wait_stopped))
+ tg.create_task(airplay_player.active_stream.stop(wait=False))
# select audio source
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
+ if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
input_format = AIRPLAY_PCM_FORMAT
audio_source = self.mass.streams.get_announcement_stream(
- queue_item.streamdetails.data["url"],
+ media.custom_data["url"],
output_format=AIRPLAY_PCM_FORMAT,
- use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+ use_pre_announce=media.custom_data["use_pre_announce"],
)
- elif queue_item.queue_id.startswith(UGP_PREFIX):
- # special case: we got forwarded a request from the UGP
- # use the existing stream job that was already created by UGP
- stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id]
- stream_job.expected_players.add(player_id)
- input_format = stream_job.pcm_format
- audio_source = stream_job.subscribe(player_id)
- else:
- queue = self.mass.player_queues.get(queue_item.queue_id)
+ elif media.queue_id.startswith("ugp_"):
+ # special case: UGP stream
+ ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
+ ugp_stream = ugp_provider.streams[media.queue_id]
+ input_format = ugp_stream.audio_format
+ audio_source = ugp_stream.subscribe_raw()
+ elif media.queue_id and media.queue_item_id:
+ # regular queue stream request
input_format = AIRPLAY_PCM_FORMAT
audio_source = self.mass.streams.get_flow_stream(
- queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=AIRPLAY_PCM_FORMAT,
+ )
+ else:
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ input_format = AIRPLAY_PCM_FORMAT
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=AIRPLAY_PCM_FORMAT,
)
self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format)
active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
if active_queue.state == PlayerState.PLAYING:
# playback needs to be restarted to form a new multi client stream session
- def resync() -> None:
- self._resync_handle = None
- self.mass.create_task(
- self.mass.player_queues.resume(active_queue.queue_id, fade_in=False)
- )
-
# this could potentially be called by multiple players at the exact same time
# so we debounce the resync a bit here with a timer
- if self._resync_handle:
- self._resync_handle.cancel()
- self._resync_handle = self.mass.loop.call_later(0.5, resync)
+ self.mass.call_later(
+ 1,
+ self.mass.player_queues.resume(active_queue.queue_id, fade_in=False),
+ task_id=f"resume_{active_queue.queue_id}",
+ )
else:
# make sure that the player manager gets an update
self.mass.players.update(child_player.player_id, skip_forward=True)
import threading
import time
from dataclasses import dataclass
-from logging import Logger
from typing import TYPE_CHECKING, Any
from uuid import UUID
)
from music_assistant.common.models.enums import (
ConfigEntryType,
- ContentType,
MediaType,
PlayerFeature,
PlayerState,
PlayerType,
)
from music_assistant.common.models.errors import PlayerUnavailableError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import (
CONF_CROSSFADE,
CONF_FLOW_MODE,
from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
cast_info: ChromecastInfo
cc: pychromecast.Chromecast
player: Player
- logger: Logger
status_listener: CastStatusListener | None = None
mz_controller: MultizoneController | None = None
active_group: str | None = None
async def play_media(
self,
player_id: str,
- queue_item: QueueItem,
+ media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
castplayer = self.castplayers[player_id]
- use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.FLAC,
- flow_mode=use_flow_mode,
- )
+ is_flow_mode = "/flow/" in media.uri
queuedata = {
"type": "LOAD",
- "media": self._create_cc_media_item(queue_item, url),
+ "media": self._create_cc_media_item(media),
}
-
# make sure that the media controller app is launched
- app_id = ALT_APP_ID if use_flow_mode else DEFAULT_APP_ID
+ app_id = ALT_APP_ID if is_flow_mode else DEFAULT_APP_ID
await self._launch_app(castplayer, app_id)
# send queue info to the CC
media_controller = castplayer.cc.media_controller
await asyncio.to_thread(media_controller.send_message, data=queuedata, inc_session_id=True)
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
- """Handle enqueuing of the next queue item on the player."""
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle enqueuing of the next item on the player."""
castplayer = self.castplayers[player_id]
- if isinstance(queue_item, str):
- url = self.mass.streams.get_command_url(queue_item, "next")
- queue_item = None
- else:
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.FLAC,
- )
next_item_id = None
status = castplayer.cc.media_controller.status
# lookup position of current track in cast queue
continue
next_item_id = item["itemId"]
# check if the next queue item isn't already queued
- if item.get("media", {}).get("customData", {}).get("uri") == url:
+ if item.get("media", {}).get("customData", {}).get("uri") == media.uri:
return
queuedata = {
"type": "QUEUE_INSERT",
"autoplay": True,
"startTime": 0,
"preloadTime": 0,
- "media": self._create_cc_media_item(queue_item, url),
+ "media": self._create_cc_media_item(media),
}
],
}
self.mass.create_task(media_controller.send_message, data=queuedata, inc_session_id=True)
self.logger.debug(
"Enqued next track (%s) to player %s",
- queue_item.name if queue_item else url,
+ media.title or media.uri,
castplayer.player.display_name,
)
supports_24bit=not cast_info.is_audio_group,
enabled_by_default=enabled_by_default,
),
- logger=self.logger.getChild(cast_info.friendly_name),
)
self.castplayers[player_id] = castplayer
"""Handle updated CastStatus."""
if status is None:
return # guard
- castplayer.logger.debug(
- "Received cast status - app_id: %s - volume: %s",
+ self.logger.log(
+ VERBOSE_LOG_LEVEL,
+ "Received cast status for %s - app_id: %s - volume: %s",
+ castplayer.player.display_name,
status.app_id,
status.volume_level,
)
def on_new_media_status(self, castplayer: CastPlayer, status: MediaStatus) -> None:
"""Handle updated MediaStatus."""
- castplayer.logger.debug("Received media status update: %s", status.player_state)
+ self.logger.log(
+ VERBOSE_LOG_LEVEL,
+ "Received media status for %s update: %s",
+ castplayer.player.display_name,
+ status.player_state,
+ )
# player state
castplayer.player.elapsed_time_last_updated = time.time()
if status.player_is_playing:
def on_new_connection_status(self, castplayer: CastPlayer, status: ConnectionStatus) -> None:
"""Handle updated ConnectionStatus."""
- castplayer.logger.debug("Received connection status update - status: %s", status.status)
+ self.logger.log(
+ VERBOSE_LOG_LEVEL,
+ "Received connection status update for %s - status: %s",
+ castplayer.player.display_name,
+ status.status,
+ )
if status.status == CONNECTION_STATUS_DISCONNECTED:
castplayer.player.available = False
# Quit the previous app before starting splash screen or media player
if castplayer.cc.app_id is not None:
castplayer.cc.quit_app()
- castplayer.logger.debug("Launching App %s.", app_id)
+ self.logger.debug("Launching App %s.", app_id)
castplayer.cc.socket_client.receiver_controller.launch_app(
app_id,
force_launch=True,
async def _disconnect_chromecast(self, castplayer: CastPlayer) -> None:
"""Disconnect Chromecast object if it is set."""
- castplayer.logger.debug("Disconnecting from chromecast socket")
+ self.logger.debug("Disconnecting from chromecast socket %s", castplayer.player.display_name)
await self.mass.loop.run_in_executor(None, castplayer.cc.disconnect, 10)
castplayer.mz_controller = None
castplayer.status_listener.invalidate()
castplayer.status_listener = None
self.castplayers.pop(castplayer.player_id, None)
- def _create_cc_media_item(self, queue_item: QueueItem, stream_url: str) -> dict[str, Any]:
- """Create CC media item from MA QueueItem."""
- duration = int(queue_item.duration) if queue_item.duration else None
- image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
- if queue_item.media_type == MediaType.TRACK and queue_item.media_item:
+ def _create_cc_media_item(self, media: PlayerMedia) -> dict[str, Any]:
+ """Create CC media item from MA PlayerMedia."""
+ if media.media_type == MediaType.TRACK:
stream_type = STREAM_TYPE_BUFFERED
- metadata = {
- "metadataType": 3,
- "albumName": (
- queue_item.media_item.album.name if queue_item.media_item.album else ""
- ),
- "songName": queue_item.media_item.name,
- "artist": (
- queue_item.media_item.artists[0].name if queue_item.media_item.artists else ""
- ),
- "title": queue_item.media_item.name,
- "images": [{"url": image_url}] if image_url else None,
- }
- elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
- stream_type = STREAM_TYPE_LIVE
- metadata = {
- "metadataType": 3,
- "songName": queue_item.streamdetails.stream_title.split(" - ")[-1],
- "artist": queue_item.streamdetails.stream_title.split(" - ")[0],
- "albumName": queue_item.name,
- "images": [{"url": image_url}] if image_url else None,
- "title": queue_item.streamdetails.stream_title.split(" - ")[-1],
- }
else:
stream_type = STREAM_TYPE_LIVE
- metadata = {
- "metadataType": 0,
- "title": queue_item.name,
- "images": [{"url": image_url}] if image_url else None,
- }
+ metadata = {
+ "metadataType": 3,
+ "albumName": media.album or "",
+ "songName": media.title or "",
+ "artist": media.title or "",
+ "title": media.title or "",
+ "images": [{"url": media.image_url}] if media.image_url else None,
+ }
return {
- "contentId": stream_url,
+ "contentId": media.uri,
"customData": {
- "uri": queue_item.uri,
- "queue_item_id": queue_item.queue_item_id,
+ "uri": media.uri,
+ "queue_item_id": media.uri,
"deviceName": "Music Assistant",
},
"contentType": "audio/flac",
"streamType": stream_type,
"metadata": metadata,
- "duration": duration,
+ "duration": media.duration,
}
async def update_flow_metadata(self, castplayer: CastPlayer) -> None:
media_controller = castplayer.cc.media_controller
# update metadata of current item chromecast
if media_controller.status.media_custom_data["queue_item_id"] != current_item.queue_item_id:
- cc_item = self._create_cc_media_item(current_item, "")
+ image_url = self.mass.metadata.get_image_url(current_item.image)
queuedata = {
"type": "PLAY",
"mediaSessionId": media_controller.status.media_session_id,
"customData": {
- "metadata": cc_item["metadata"],
+ "metadata": {
+ "metadataType": 3,
+ "albumName": album.name
+ if (album := getattr(current_item.media_item, "album", None))
+ else "",
+ "songName": current_item.media_item.name,
+ "artist": getattr(current_item.media_item, "artist_str", ""),
+ "title": current_item.media_item.name,
+ "images": [{"url": image_url}] if image_url else None,
+ }
},
}
self.mass.create_task(
)
from music_assistant.common.models.enums import (
ConfigEntryType,
- ContentType,
PlayerFeature,
PlayerState,
PlayerType,
)
from music_assistant.common.models.errors import PlayerUnavailableError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import (
CONF_CROSSFADE,
CONF_ENFORCE_MP3,
from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
await dlna_player.device.async_play()
@catch_request_errors
- async def play_media(
- self,
- player_id: str,
- queue_item: QueueItem,
- ) -> None:
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
- use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- flow_mode=use_flow_mode,
- )
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+ media.uri = media.uri.replace(".flac", ".mp3")
dlna_player = self.dlnaplayers[player_id]
# always clear queue (by sending stop) first
if dlna_player.device.can_stop:
await self.cmd_stop(player_id)
- didl_metadata = create_didl_metadata(self.mass, url, queue_item)
- title = queue_item.name if queue_item else "Music Assistant"
- await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
+ didl_metadata = create_didl_metadata(media)
+ title = media.title or media.uri
+ await dlna_player.device.async_set_transport_uri(media.uri, title, didl_metadata)
# Play it
await dlna_player.device.async_wait_for_can_play(10)
# optimistically set this timestamp to help in case of a player
await self.poll_player(dlna_player.udn)
@catch_request_errors
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of the next queue item on the player."""
dlna_player = self.dlnaplayers[player_id]
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.FLAC,
- )
- didl_metadata = create_didl_metadata(self.mass, url, queue_item)
- title = queue_item.name
- await dlna_player.device.async_set_next_transport_uri(url, title, didl_metadata)
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+ media.uri = media.uri.replace(".flac", ".mp3")
+ didl_metadata = create_didl_metadata(media)
+ title = media.title or media.uri
+ await dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
self.logger.debug(
"Enqued next track (%s) to player %s",
title,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
- ContentType,
PlayerFeature,
PlayerState,
PlayerType,
)
from music_assistant.common.models.errors import PlayerUnavailableError, SetupFailedError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import CONF_IP_ADDRESS, CONF_PASSWORD, CONF_PORT
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
async def play_media(
self,
player_id: str,
- queue_item: QueueItem,
+ media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- flow_mode=True,
- )
- await self._fully.playSound(url, AUDIOMANAGER_STREAM_MUSIC)
- player.current_item_id = queue_item.queue_id
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+ media.uri = media.uri.replace(".flac", ".mp3")
+ await self._fully.playSound(media.uri, AUDIOMANAGER_STREAM_MUSIC)
+ player.current_media = media
player.elapsed_time = 0
player.elapsed_time_last_updated = time.time()
player.state = PlayerState.PLAYING
)
from music_assistant.common.models.enums import (
ConfigEntryType,
- ContentType,
PlayerFeature,
PlayerState,
PlayerType,
)
from music_assistant.common.models.errors import SetupFailedError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.server.providers.hass import DOMAIN as HASS_DOMAIN
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
from music_assistant.server.providers.hass import HomeAssistant as HomeAssistantProvider
target={"entity_id": player_id},
)
- async def play_media(
- self,
- player_id: str,
- queue_item: QueueItem,
- ) -> None:
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
- use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- flow_mode=use_flow_mode,
- )
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+ media.uri = media.uri.replace(".flac", ".mp3")
await self.hass_prov.hass.call_service(
domain="media_player",
service="play_media",
service_data={
- "media_content_id": url,
+ "media_content_id": media.uri,
"media_content_type": "music",
"enqueue": "replace",
},
player.elapsed_time = 0
player.elapsed_time_last_updated = time.time()
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
- """
- Handle enqueuing of the next queue item on the player.
-
- Only called if the player supports PlayerFeature.ENQUE_NEXT.
- Called about 1 second after a new track started playing.
- Called about 15 seconds before the end of the current track.
-
- A PlayerProvider implementation is in itself responsible for handling this
- so that the queue items keep playing until its empty or the player stopped.
-
- This will NOT be called if the end of the queue is reached (and repeat disabled).
- This will NOT be called if the player is using flow mode to playback the queue.
- """
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.FLAC,
- )
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle enqueuing of the next queue item on the player."""
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+ media.uri = media.uri.replace(".flac", ".mp3")
await self.hass_prov.hass.call_service(
domain="media_player",
service="play_media",
service_data={
- "media_content_id": url,
+ "media_content_id": media.uri,
"media_content_type": "music",
"enqueue": "next",
},
from dataclasses import dataclass
from typing import TYPE_CHECKING
+from aiohttp import web
from aioslimproto.client import PlayerState as SlimPlayerState
from aioslimproto.client import SlimClient
from aioslimproto.client import TransitionType as SlimTransition
RepeatMode,
)
from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import (
CONF_CROSSFADE,
CONF_CROSSFADE_DURATION,
CONF_ENFORCE_MP3,
CONF_PORT,
CONF_SYNC_ADJUST,
- MASS_LOGO_ONLINE,
VERBOSE_LOG_LEVEL,
)
+from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
+from music_assistant.server.helpers.multi_client_stream import MultiClientStream
from music_assistant.server.models.player_provider import PlayerProvider
+from music_assistant.server.providers.ugp import UniversalGroupProvider
if TYPE_CHECKING:
from aioslimproto.models import SlimEvent
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
# sync constants
MIN_DEVIATION_ADJUST = 8 # 5 milliseconds
MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
-DEVIATION_JUMP_IGNORE = 5000 # ignore a sudden unrealistic jump
+DEVIATION_JUMP_IGNORE = 500 # ignore a sudden unrealistic jump
MAX_SKIP_AHEAD_MS = 800 # 0.8 seconds
"""Simple structure to describe a Sync Playpoint."""
timestamp: float
- sync_job_id: str
+ sync_master: str
diff: int
slimproto: SlimServer
_sync_playpoints: dict[str, deque[SyncPlayPoint]]
_do_not_resync_before: dict[str, float]
+ _multi_streams: dict[str, MultiClientStream]
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._sync_playpoints = {}
self._do_not_resync_before = {}
- self._resync_handle: asyncio.TimerHandle | None = None
+ self._multi_streams = {}
control_port = self.config.get_value(CONF_PORT)
telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
json_port = self.config.get_value(CONF_CLI_JSON_PORT)
try:
await self.slimproto.start()
except OSError as err:
- msg = f"Unable to start the Slimproto server - is port {control_port} already taken ?"
- raise SetupFailedError(msg) from err
+ raise SetupFailedError(
+ "Unable to start the Slimproto server - "
+ "is one of the required TCP ports already taken ?"
+ ) from err
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
self.slimproto.subscribe(self._client_callback)
+ self.mass.streams.register_dynamic_route(
+ "/slimproto/multi", self._serve_multi_client_stream
+ )
async def unload(self) -> None:
"""Handle close/cleanup of the provider."""
+ self.mass.streams.unregister_dynamic_route("/slimproto/multi")
await self.slimproto.stop()
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
async def play_media(
self,
player_id: str,
- queue_item: QueueItem,
+ media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
- # fix race condition where resync and play media are called at more or less the same time
- if self._resync_handle:
- self._resync_handle.cancel()
- self._resync_handle = None
player = self.mass.players.get(player_id)
if player.synced_to:
msg = "A synced player cannot receive play commands directly"
raise RuntimeError(msg)
- if player.group_childs and queue_item.media_type != MediaType.ANNOUNCEMENT:
- # player has sync members, we need to start a (multi-player) stream job
- # to make sure that all clients receive the exact same audio
- stream_job = self.mass.streams.create_multi_client_stream_job(
- queue_id=queue_item.queue_id,
- start_queue_item=queue_item,
+ if not player.group_childs:
+ slimplayer = self.slimproto.get_player(player_id)
+ # simple, single-player playback
+ await self._handle_play_url(
+ slimplayer,
+ url=media.uri,
+ media=media,
+ send_flush=True,
+ auto_play=False,
+ )
+ return
+
+ # this is a syncgroup, we need to handle this with a multi client stream
+ master_audio_format = AudioFormat(
+ content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
+ )
+ if media.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ audio_source = self.mass.streams.get_announcement_stream(
+ media.custom_data["url"],
+ output_format=master_audio_format,
+ use_pre_announce=media.custom_data["use_pre_announce"],
+ )
+ elif media.queue_id.startswith("ugp_"):
+ # special case: UGP stream
+ ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
+ ugp_stream = ugp_provider.streams[media.queue_id]
+ audio_source = ugp_stream.subscribe_raw()
+ elif media.queue_id and media.queue_item_id:
+ # regular queue stream request
+ audio_source = self.mass.streams.get_flow_stream(
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=master_audio_format,
)
else:
- stream_job = None
- # forward command to player and any connected sync members
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=master_audio_format,
+ )
+ # start the stream task
+ self._multi_streams[player_id] = stream = MultiClientStream(
+ audio_source=audio_source, audio_format=master_audio_format
+ )
+ base_url = f"{self.mass.streams.base_url}/slimproto/multi?player_id={player_id}&fmt=flac"
+
+ # forward to downstream play_media commands
async with asyncio.TaskGroup() as tg:
for slimplayer in self._get_sync_clients(player_id):
- enforce_mp3 = await self.mass.config.get_player_config_value(
- slimplayer.player_id, CONF_ENFORCE_MP3
- )
+ url = f"{base_url}&child_player_id={slimplayer.player_id}"
+ if self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id, CONF_ENFORCE_MP3, False
+ ):
+ url = url.replace("flac", "mp3")
+ stream.expected_clients += 1
tg.create_task(
self._handle_play_url(
slimplayer,
- url=stream_job.resolve_stream_url(
- slimplayer.player_id,
- output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- )
- if stream_job
- else self.mass.streams.resolve_stream_url(
- slimplayer.player_id,
- queue_item=queue_item,
- output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- ),
- queue_item=None,
+ url=url,
+ media=media,
send_flush=True,
- auto_play=stream_job is None,
+ auto_play=False,
)
)
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of the next queue item on the player."""
if not (slimplayer := self.slimproto.get_player(player_id)):
return
- enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- flow_mode=False,
- )
+ url = media.uri
+ if self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id, CONF_ENFORCE_MP3, False
+ ):
+ url = url.replace("flac", "mp3")
+
await self._handle_play_url(
slimplayer,
url=url,
- queue_item=queue_item,
+ media=media,
enqueue=True,
send_flush=False,
auto_play=True,
self,
slimplayer: SlimClient,
url: str,
- queue_item: QueueItem | None,
+ media: PlayerMedia,
enqueue: bool = False,
send_flush: bool = True,
auto_play: bool = False,
else:
transition_duration = 0
- if queue_item and queue_item.media_item:
- album = getattr(queue_item.media_item, "album", None)
- metadata = {
- "item_id": queue_item.queue_item_id,
- "title": queue_item.media_item.name,
- "album": album.name if album else "",
- "artist": getattr(queue_item.media_item, "artist_str", "Music Assistant"),
- "image_url": self.mass.metadata.get_image_url(
- queue_item.image,
- size=512,
- prefer_proxy=True,
- )
- if queue_item.image
- else MASS_LOGO_ONLINE,
- "duration": queue_item.duration,
- }
- elif queue_item:
- metadata = {
- "item_id": queue_item.queue_item_id,
- "title": queue_item.name,
- "artist": "Music Assistant",
- "image_url": self.mass.metadata.get_image_url(
- queue_item.image,
- size=512,
- prefer_proxy=True,
- )
- if queue_item.image
- else MASS_LOGO_ONLINE,
- "duration": queue_item.duration,
- }
- else:
- metadata = {
- "item_id": "flow",
- "title": "Music Assistant",
- "image_url": MASS_LOGO_ONLINE,
- }
- queue = self.mass.player_queues.get(queue_item.queue_id if queue_item else player_id)
+ metadata = {
+ "item_id": media.queue_item_id or media.uri,
+ "title": media.title,
+ "album": media.album,
+ "artist": media.artist,
+ "image_url": media.image_url,
+ "duration": media.duration,
+ }
+ queue = self.mass.player_queues.get(media.queue_id or player_id)
slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
# slimplayer.extra_data["can_seek"] = 1 if queue_item else 0
parent_player = self.mass.players.get(target_player)
assert parent_player # guard
if parent_player.synced_to:
- raise RuntimeError("Player is already synced")
+ raise RuntimeError("Parent player is already synced!")
if child_player.synced_to and child_player.synced_to != target_player:
raise RuntimeError("Player is already synced to another player")
# always make sure that the parent player is part of the sync group
# check if we should (re)start or join a stream session
active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
if active_queue.state == PlayerState.PLAYING:
- # playback needs to be restarted to form a new multi slimplayer stream session
- def resync() -> None:
- self._resync_handle = None
- self.mass.create_task(
- self.mass.player_queues.resume(active_queue.queue_id, fade_in=False)
- )
-
+ # playback needs to be restarted to form a new multi client stream session
# this could potentially be called by multiple players at the exact same time
# so we debounce the resync a bit here with a timer
- if self._resync_handle:
- self._resync_handle.cancel()
- self._resync_handle = self.mass.loop.call_later(0.5, resync)
+ self.mass.call_later(
+ 1,
+ self.mass.player_queues.resume(active_queue.queue_id, fade_in=False),
+ task_id=f"resume_{active_queue.queue_id}",
+ )
else:
# make sure that the player manager gets an update
self.mass.players.update(child_player.player_id, skip_forward=True)
# average lag/drift so we can adjust accordingly
sync_playpoints = self._sync_playpoints[slimplayer.player_id]
- active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
- stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
- if not stream_job:
- # should not happen, but just in case
- return
-
now = time.time()
if now < self._do_not_resync_before[slimplayer.player_id]:
return
if last_playpoint and (now - last_playpoint.timestamp) > 10:
# last playpoint is too old, invalidate
sync_playpoints.clear()
- if last_playpoint and last_playpoint.sync_job_id != stream_job.job_id:
- # streamjob has changed, invalidate
+ if last_playpoint and last_playpoint.sync_master != sync_master.player_id:
+ # this should not happen, but just in case
sync_playpoints.clear()
diff = int(
- self._get_corrected_elapsed_milliseconds(slimplayer)
)
- if last_playpoint and abs(last_playpoint.diff - diff) > DEVIATION_JUMP_IGNORE:
- # ignore unexpected spikes
+ # ignore unexpected spikes
+ if (
+ sync_playpoints
+ and abs(statistics.fmean(x.diff for x in sync_playpoints)) > DEVIATION_JUMP_IGNORE
+ ):
return
# we can now append the current playpoint to our list
- sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff))
+ sync_playpoints.append(SyncPlayPoint(now, sync_master.player_id, diff))
min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS
if len(sync_playpoints) < min_req_playpoints:
# Instead just start playback on all players and let the sync logic work out
# the delays etc.
self._do_not_resync_before[_client.player_id] = time.time() + 1
- tg.create_task(_client.unpause_at(0))
+ tg.create_task(_client.pause_for(200))
async def _handle_connected(self, slimplayer: SlimClient) -> None:
"""Handle a slimplayer connected event."""
await slimplayer.configure_display(
visualisation=SlimVisualisationType(visualization), disabled=not display_enabled
)
+
+ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response:
+ """Serve the multi-client flow stream audio to a player."""
+ player_id = request.query.get("player_id")
+ fmt = request.query.get("fmt")
+ child_player_id = request.query.get("child_player_id")
+
+ if not (player := self.mass.players.get(player_id)):
+ raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
+
+ if not (child_player := self.mass.players.get(child_player_id)):
+ raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+
+ if not (stream := self._multi_streams.get(player_id, None)) or stream.done:
+ raise web.HTTPNotFound(f"There is no active stream for {player_id}!")
+
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers={
+ "Content-Type": f"audio/{fmt}",
+ },
+ )
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving multi-client flow audio stream for player %s to %s",
+ player.display_name,
+ child_player.display_name,
+ )
+
+ async for chunk in stream.get_stream(
+ output_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
+ filter_params=get_player_filter_params(self.mass, child_player_id)
+ if child_player_id
+ else None,
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError):
+ # race condition
+ break
+
+ return resp
)
from music_assistant.common.models.errors import SetupFailedError
from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import UGP_PREFIX
-from music_assistant.server.helpers.audio import FFMpeg, get_player_filter_params
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
+ from music_assistant.server.providers.ugp import UniversalGroupProvider
CONF_SERVER_HOST = "snapcast_server_host"
CONF_SERVER_CONTROL_PORT = "snapcast_server_control_port"
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
)
player.synced_to = self._synced_to(player_id)
player.group_childs = self._group_childs(player_id)
- if player.current_item_id and player_id in player.current_item_id:
- player.active_source = player_id
- elif stream := self._get_snapstream(player_id):
- player.active_source = stream.name
+ if player.active_group is None:
+ if stream := self._get_snapstream(player_id):
+ if stream.name.startswith(("MusicAssistant", "default")):
+ player.active_source = player_id
+ else:
+ player.active_source = stream.name
+ else:
+ player.active_source = player_id
self.mass.players.register_or_update(player)
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
await self._get_snapgroup(player_id).set_stream("default")
self._handle_update()
- async def play_media(self, player_id: str, queue_item: QueueItem) -> None:
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
if player.synced_to:
raise RuntimeError(msg)
# stop any existing streams first
await self.cmd_stop(player_id)
- queue = self.mass.player_queues.get(queue_item.queue_id)
stream, port = await self._create_stream()
snap_group = self._get_snapgroup(player_id)
await snap_group.set_stream(stream.identifier)
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
+ # select audio source
+ if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
input_format = DEFAULT_SNAPCAST_FORMAT
audio_source = self.mass.streams.get_announcement_stream(
- queue_item.streamdetails.data["url"],
+ media.custom_data["url"],
output_format=DEFAULT_SNAPCAST_FORMAT,
- use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+ use_pre_announce=media.custom_data["use_pre_announce"],
)
- elif queue_item.queue_id.startswith(UGP_PREFIX):
- # special case: we got forwarded a request from the UGP
- # use the existing stream job that was already created by UGP
- stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id]
- stream_job.expected_players.add(player_id)
- input_format = stream_job.pcm_format
- audio_source = stream_job.subscribe(player_id)
- else:
- queue = self.mass.player_queues.get(queue_item.queue_id)
+ elif media.queue_id.startswith("ugp_"):
+ # special case: UGP stream
+ ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
+ ugp_stream = ugp_provider.streams[media.queue_id]
+ input_format = ugp_stream.audio_format
+ audio_source = ugp_stream.subscribe_raw()
+ elif media.queue_id and media.queue_item_id:
+ # regular queue stream request
input_format = DEFAULT_SNAPCAST_FORMAT
audio_source = self.mass.streams.get_flow_stream(
- queue, start_queue_item=queue_item, pcm_format=DEFAULT_SNAPCAST_FORMAT
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=DEFAULT_SNAPCAST_FORMAT,
+ )
+ else:
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=DEFAULT_SNAPCAST_FORMAT,
)
async def _streamer() -> None:
def stream_callback(_stream) -> None:
player.state = PlayerState(_stream.status)
if player.state == PlayerState.PLAYING:
- player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
+ player.current_media = media
player.elapsed_time = 0
player.elapsed_time_last_updated = time.time()
self.mass.players.update(player_id)
)
from music_assistant.common.models.enums import (
ConfigEntryType,
- ContentType,
PlayerFeature,
PlayerType,
ProviderFeature,
)
from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import CONF_CROSSFADE, SYNCGROUP_PREFIX, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
async def play_media(
self,
player_id: str,
- queue_item: QueueItem,
+ media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
sonos_player = self.sonosplayers[player_id]
)
raise PlayerCommandFailed(msg)
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.FLAC,
- )
- self.mass.create_task(
- sonos_player.soco.play_uri, url, meta=create_didl_metadata(self.mass, url, queue_item)
- )
-
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
- """
- Handle enqueuing of the next queue item on the player.
-
- If the player supports PlayerFeature.ENQUE_NEXT:
- This will be called about 10 seconds before the end of the track.
- If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
- This will be called when the end of the track is reached.
+ didl_metadata = create_didl_metadata(media)
+ self.mass.create_task(sonos_player.soco.play_uri, media.uri, meta=didl_metadata)
- A PlayerProvider implementation is in itself responsible for handling this
- so that the queue items keep playing until its empty or the player stopped.
-
- This will NOT be called if the end of the queue is reached (and repeat disabled).
- This will NOT be called if flow mode is enabled on the queue.
- """
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle enqueuing of the next queue item on the player."""
sonos_player = self.sonosplayers[player_id]
- url = self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.FLAC,
- )
+ didl_metadata = create_didl_metadata(media)
# set crossfade according to player setting
crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
if sonos_player.crossfade != crossfade:
await asyncio.to_thread(set_crossfade)
- await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
+ try:
+ await asyncio.to_thread(
+ sonos_player.soco.avTransport.SetNextAVTransportURI,
+ [("InstanceID", 0), ("NextURI", media.uri), ("NextURIMetaData", didl_metadata)],
+ timeout=60,
+ )
+ except Exception as err:
+ self.logger.warning(
+ "Unable to enqueue next track on player: %s: %s", sonos_player.zone_name, err
+ )
+ else:
+ self.logger.debug(
+ "Enqued next track (%s) to player %s",
+ media.title or media.uri,
+ sonos_player.soco.player_name,
+ )
async def play_announcement(
- self, player_id: str, announcement: QueueItem, volume_level: int | None = None
+ self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
) -> None:
"""Handle (provider native) playback of an announcement on given player."""
if player_id.startswith(SYNCGROUP_PREFIX):
self.play_announcement(child_player_id, announcement, volume_level)
)
return
- announcement_url = self.mass.streams.resolve_stream_url(
- player_id, announcement, ContentType.MP3
- )
sonos_player = self.sonosplayers[player_id]
self.logger.debug(
"Playing announcement %s using websocket audioclip on %s",
- announcement_url,
+ announcement.uri,
sonos_player.zone_name,
)
volume_level = self.mass.players.get_announcement_volume(player_id, volume_level)
try:
response, _ = await sonos_player.websocket.play_clip(
- announcement_url,
+ announcement.uri,
volume=volume_level,
)
except SonosWebsocketError as exc:
self.mass.loop.call_soon_threadsafe(
self.mass.players.register_or_update, sonos_player.mass_player
)
-
- async def _enqueue_item(
- self,
- sonos_player: SonosPlayer,
- url: str,
- queue_item: QueueItem | None,
- ) -> None:
- """Enqueue a queue item to the Sonos player Queue."""
- metadata = create_didl_metadata(self.mass, url, queue_item)
- try:
- await asyncio.to_thread(
- sonos_player.soco.avTransport.SetNextAVTransportURI,
- [("InstanceID", 0), ("NextURI", url), ("NextURIMetaData", metadata)],
- timeout=60,
- )
- except Exception as err:
- self.logger.warning(
- "Unable to enqueue next track on player: %s: %s", sonos_player.zone_name, err
- )
- else:
- self.logger.debug(
- "Enqued next track (%s) to player %s",
- queue_item.name if queue_item else url,
- sonos_player.soco.player_name,
- )
"""Handle callback for topology change event."""
if xml := event.variables.get("zone_group_state"):
zgs = ET.fromstring(xml)
- vanished_devices = zgs.find("VanishedDevices") or []
- for vanished_device in vanished_devices:
- if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS:
- self.logger.debug(
- "Ignoring %s marked %s as vanished with reason: %s",
- self.zone_name,
- vanished_device.get("ZoneName"),
- reason,
- )
- continue
- self.mass.create_task(self._vanished(reason))
+ if vanished_devices := zgs.find("VanishedDevices"):
+ for vanished_device in vanished_devices:
+ if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS:
+ self.logger.debug(
+ "Ignoring %s marked %s as vanished with reason: %s",
+ self.zone_name,
+ vanished_device.get("ZoneName"),
+ reason,
+ )
+ continue
+ self.mass.create_task(self._vanished(reason))
if "zone_player_uui_ds_in_group" not in event.variables:
return
from __future__ import annotations
import asyncio
+from time import time
from typing import TYPE_CHECKING
import shortuuid
+from aiohttp import web
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE_DURATION,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
+ ContentType,
MediaType,
PlayerFeature,
PlayerState,
PlayerType,
ProviderFeature,
)
-from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import (
- CONF_CROSSFADE,
- CONF_GROUP_MEMBERS,
- SYNCGROUP_PREFIX,
- UGP_PREFIX,
-)
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
+from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS
+from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
+from music_assistant.server.helpers.multi_client_stream import MultiClientStream
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
# ruff: noqa: ARG002
+UGP_FORMAT = AudioFormat(
+ content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
+)
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
class UniversalGroupProvider(PlayerProvider):
"""Base/builtin provider for universally grouping players."""
- prev_sync_leaders: dict[str, tuple[str]] | None = None
- debounce_id: str | None = None
-
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
) -> None:
"""Initialize MusicProvider."""
super().__init__(mass, manifest, config)
- self.prev_sync_leaders = {}
+ self._registered_routes: set[str] = set()
+ self.streams: dict[str, MultiClientStream] = {}
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
await self._register_all_players()
+ async def unload(self) -> None:
+ """
+ Handle unload/close of the provider.
+
+ Called when provider is deregistered (e.g. MA exiting or config reloading).
+ """
+ for route_path in list(self._registered_routes):
+ self._registered_routes.remove(route_path)
+ self.mass.streams.unregister_dynamic_route(route_path)
+
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
base_entries = await super().get_player_config_entries(player_id)
"""Send STOP command to given player."""
group_player = self.mass.players.get(player_id)
group_player.state = PlayerState.IDLE
+ self.mass.players.update(player_id)
# forward command to player and any connected sync child's
async with asyncio.TaskGroup() as tg:
for member in self.mass.players.iter_group_members(group_player, only_powered=True):
if member.state == PlayerState.IDLE:
continue
tg.create_task(self.mass.players.cmd_stop(member.player_id))
- if existing := self.mass.streams.multi_client_jobs.pop(player_id, None):
- existing.stop()
+ if (stream := self.streams.pop(player_id, None)) and not stream.done:
+ await stream.stop()
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
- async def cmd_pause(self, player_id: str) -> None:
- """Send PAUSE command to given player."""
-
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player."""
await self.mass.players.cmd_group_power(player_id, powered)
"""Send VOLUME_SET command to given player."""
# group volume is already handled in the player manager
- async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
- """Send VOLUME MUTE command to given player."""
-
async def play_media(
self,
player_id: str,
- queue_item: QueueItem,
+ media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
# power ON
await self.cmd_power(player_id, True)
group_player = self.mass.players.get(player_id)
+ # stop any existing stream first
+ if (existing := self.streams.pop(player_id, None)) and not existing.done:
+ existing.task.cancel()
+
+ # select audio source
+ if media.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ audio_source = self.mass.streams.get_announcement_stream(
+ media.custom_data["url"],
+ output_format=UGP_FORMAT,
+ use_pre_announce=media.custom_data["use_pre_announce"],
+ )
+ elif media.queue_id and media.queue_item_id:
+ # regular queue stream request
+ audio_source = self.mass.streams.get_flow_stream(
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=UGP_FORMAT,
+ )
+ else:
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=UGP_FORMAT,
+ )
- # create a multi-client stream job - all (direct) child's of this UGP group
- # will subscribe to this multi client queue stream
- queue = self.mass.player_queues.get(player_id)
- stream_job = self.mass.streams.create_multi_client_stream_job(
- queue.queue_id,
- start_queue_item=queue_item,
+ # start the stream task
+ self.streams[player_id] = MultiClientStream(
+ audio_source=audio_source, audio_format=UGP_FORMAT
)
- # create a fake queue item to forward to downstream play_media commands
- ugp_queue_item = QueueItem(
- player_id,
- queue_item_id=stream_job.job_id,
- name="Music Assistant",
- duration=None,
- )
- # special case: handle announcement sent to this UGP
- # we just forward this as-is downstream and let all child players handle this themselves
- if queue_item.media_type == MediaType.ANNOUNCEMENT:
- ugp_queue_item = queue_item
+ base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
- # forward the stream job to all group members
+ # forward to downstream play_media commands
async with asyncio.TaskGroup() as tg:
for member in self.mass.players.iter_group_members(group_player, only_powered=True):
- player_prov = self.mass.players.get_player_provider(member.player_id)
if member.player_id.startswith(SYNCGROUP_PREFIX):
member = self.mass.players.get_sync_leader(member) # noqa: PLW2901
if member is None:
continue
- tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item))
-
- async def poll_player(self, player_id: str) -> None:
- """Poll player for state updates."""
- self.update_attributes(player_id)
- self.mass.players.update(player_id, skip_forward=True)
+ tg.create_task(
+ self.mass.players.play_media(
+ member.player_id,
+ media=PlayerMedia(
+ uri=f"{base_url}?player_id={member.player_id}",
+ media_type=MediaType.FLOW_STREAM,
+ title=group_player.display_name,
+ queue_id=group_player.player_id,
+ ),
+ )
+ )
+ # set the state optimistically
+ group_player.elapsed_time = 0
+ group_player.elapsed_time_last_updated = time() - 1
+ group_player.state = PlayerState.PLAYING
+ self.mass.players.update(player_id)
async def create_group(self, name: str, members: list[str]) -> Player:
"""Create new PlayerGroup on this provider.
- name: Name for the new group to create.
- members: A list of player_id's that should be part of this group.
"""
- new_group_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}"
+ new_group_id = f"{self.domain}_{shortuuid.random(8).lower()}"
# cleanup list, filter groups (should be handled by frontend, but just in case)
members = [
x.player_id
group_childs=set(members),
)
self.mass.players.register_or_update(player)
- return player
-
- def update_attributes(self, player_id: str) -> None:
- """Update player attributes."""
- group_player = self.mass.players.get(player_id)
- if not group_player.powered:
- group_player.state = PlayerState.IDLE
- return
+ # register dynamic routes for the ugp stream (both flac and mp3)
+ for fmt in ("mp3", "flac"):
+ route_path = f"/ugp/{group_player_id}.{fmt}"
+ self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
+ self._registered_routes.add(route_path)
- # read the state from the first active group member
- for member in self.mass.players.iter_group_members(group_player, only_powered=True):
- group_player.current_item_id = member.current_item_id
- group_player.elapsed_time = member.elapsed_time
- group_player.elapsed_time_last_updated = member.elapsed_time_last_updated
- group_player.state = member.state
- break
+ return player
def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
"""
return False
# if a child player turned ON while the group player is already playing
- # we need to resync/resume
+ # we just direct it to the existing stream (we dont care about the audio being in sync)
if new_power and group_player.state == PlayerState.PLAYING:
- self.logger.warning(
- "Player %s turned on while syncgroup is playing, "
- "a forced resume for %s will be performed...",
- child_player.display_name,
- group_player.display_name,
+ base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
+ self.mass.create_task(
+ self.mass.players.play_media(
+ child_player.player_id,
+ media=PlayerMedia(
+ uri=f"{base_url}?player_id={child_player.player_id}",
+ media_type=MediaType.FLOW_STREAM,
+ title=group_player.display_name,
+ queue_id=group_player.player_id,
+ ),
+ )
)
- self.mass.loop.call_later(
- 1,
- self.mass.create_task,
- self.mass.player_queues.resume(group_player.player_id),
- )
- return None
+
return None
+
+ async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
+ """Serve the UGP (multi-client) flow stream audio to a player."""
+ ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
+ fmt = request.path.rsplit(".")[-1]
+ child_player_id = request.query.get("player_id") # optional!
+
+ if not (ugp_player := self.mass.players.get(ugp_player_id)):
+ raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")
+
+ if not (stream := self.streams.get(ugp_player_id, None)) or stream.done:
+ raise web.HTTPNotFound(f"There is no active UGP stream for {ugp_player_id}!")
+
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers={
+ **DEFAULT_STREAM_HEADERS,
+ "Content-Type": f"audio/{fmt}",
+ },
+ )
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving UGP flow audio stream for UGP-player %s to %s",
+ ugp_player.display_name,
+ child_player_id or request.remote,
+ )
+
+ async for chunk in stream.get_stream(
+ output_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
+ filter_params=get_player_filter_params(self.mass, child_player_id)
+ if child_player_id
+ else None,
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError):
+ # race condition
+ break
+
+ return resp
self._provider_manifests: dict[str, ProviderManifest] = {}
self._providers: dict[str, ProviderInstanceType] = {}
self._tracked_tasks: dict[str, asyncio.Task] = {}
+ self._tracked_timers: dict[str, asyncio.TimerHandle] = {}
self.closing = False
self.running_as_hass_addon: bool = False
self.version: str = "0.0.0"
task_id: str | None = None,
**kwargs: Any,
) -> asyncio.TimerHandle:
- """Run callable/awaitable after given delay."""
+ """
+ Run callable/awaitable after given delay.
+
+ Use task_id for debouncing.
+ """
+ if not task_id:
+ task_id = uuid4().hex
+
+ if existing := self._tracked_timers.get(task_id):
+ existing.cancel()
def _create_task() -> None:
+ self._tracked_timers.pop(task_id)
self.create_task(target, *args, task_id=task_id, **kwargs)
- return self.loop.call_later(delay, _create_task)
+ handle = self.loop.call_later(delay, _create_task)
+ self._tracked_timers[task_id] = handle
+ return handle
def get_task(self, task_id: str) -> asyncio.Task | asyncio.Future:
"""Get existing scheduled task."""