*.spec
.history
.idea
+.coverage
"""
await self.client.send_command("players/cmd/unsync", player_id=player_id)
+ async def play_announcement(
+ self,
+ player_id: str,
+ url: str,
+ use_pre_announce: bool = False,
+ ) -> None:
+ """Handle playback of an announcement (url) on given player."""
+ await self.client.send_command(
+ "players/cmd/play_announcement",
+ player_id=player_id,
+ url=url,
+ use_pre_announce=use_pre_announce,
+ )
+
# PlayerGroup related endpoints/commands
async def set_player_group_volume(self, player_id: str, volume_level: int) -> None:
"""Provides a simple global memory cache."""
-
from __future__ import annotations
import asyncio
provider_instance_id_or_domain = "url"
media_type = MediaType.UNKNOWN
item_id = uri
- elif "://" in uri:
+ elif "://" in uri and len(uri.split("/")) >= 4:
# music assistant-style uri
# provider://media_type/item_id
provider_instance_id_or_domain = uri.split("://")[0]
except ValueError:
return False
return str(uuid_obj) == uuid_to_test
+
+
+class classproperty(property): # noqa: N801
+ """Implement class property for python3.11+."""
+
+ def __get__(self, cls, owner): # noqa: D105
+ return classmethod(self.fget).__get__(None, owner)()
from __future__ import annotations
from enum import StrEnum
+from typing import Self
+
+from music_assistant.common.helpers.util import classproperty
class MediaType(StrEnum):
PLAYLIST = "playlist"
RADIO = "radio"
FOLDER = "folder"
+ ANNOUNCEMENT = "announcement"
UNKNOWN = "unknown"
@classmethod
- @property
- def ALL(cls) -> tuple[MediaType, ...]: # noqa: N802
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
+
+ @classproperty
+ def ALL(self) -> tuple[MediaType, ...]: # noqa: N802
"""Return all (default) MediaTypes as tuple."""
return (
MediaType.ARTIST,
MediaType.TRACK,
MediaType.PLAYLIST,
MediaType.RADIO,
+ MediaType.ANNOUNCEMENT,
)
ASIN = "asin" # amazon unique number to identify albums
DISCOGS = "discogs" # id for media item on discogs
TADB = "tadb" # the audio db id
+ UNKNOWN = "unknown"
+
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
class LinkType(StrEnum):
DISCOGS = "discogs"
WIKIPEDIA = "wikipedia"
ALLMUSIC = "allmusic"
+ UNKNOWN = "unknown"
+
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
class ImageType(StrEnum):
DISCART = "discart"
OTHER = "other"
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.OTHER
+
class AlbumType(StrEnum):
"""Enum for Album type."""
MPEG_DASH = "dash"
UNKNOWN = "?"
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
+
@classmethod
def try_parse(cls: ContentType, string: str) -> ContentType:
"""Try to parse ContentType from (url)string/extension."""
STEREO_PAIR = "stereo_pair"
GROUP = "group"
SYNC_GROUP = "sync_group"
+ UNKNOWN = "unknown"
+
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
class PlayerFeature(StrEnum):
SYNC = "sync"
SEEK = "seek"
ENQUEUE_NEXT = "enqueue_next"
+ PLAY_ANNOUNCEMENT = "play_announcement"
+ UNKNOWN = "unknown"
+
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
class EventType(StrEnum):
PLAYER_CONFIG_UPDATED = "player_config_updated"
SYNC_TASKS_UPDATED = "sync_tasks_updated"
AUTH_SESSION = "auth_session"
+ UNKNOWN = "unknown"
+
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
class ProviderFeature(StrEnum):
#
# PLUGIN FEATURES
#
+ UNKNOWN = "unknown"
+
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
class ProviderType(StrEnum):
LABEL = "label"
DIVIDER = "divider"
ACTION = "action"
+ UNKNOWN = "unknown"
+
+ @classmethod
+ def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003
+ """Set default enum member if an unknown value is provided."""
+ return cls.UNKNOWN
# and pass along freely
extra_data: dict[str, Any] = field(default_factory=dict)
+ # announcement_in_progress boolean to indicate there's an announcement in progress.
+ announcement_in_progress: bool = False
+
@property
def corrected_elapsed_time(self) -> float:
"""Return the corrected/realtime elapsed time."""
current_item: QueueItem | None = None
next_item: QueueItem | None = None
radio_source: list[MediaItemType] = field(default_factory=list)
- announcement_in_progress: bool = False
flow_mode: bool = False
# flow_mode_start_index: index of the first item of the flow stream
flow_mode_start_index: int = 0
"""Return MediaType for this QueueItem (for convenience purposes)."""
if self.media_item:
return self.media_item.media_type
+ if self.streamdetails:
+ return self.streamdetails.media_type
return MediaType.UNKNOWN
@classmethod
can_seek: bool = True
# the fields below will be set/controlled by the streamcontroller
+ seek_position: int = 0
+ fade_in: bool = False
loudness: LoudnessMeasurement | None = None
queue_id: str | None = None
seconds_streamed: float | None = None
- seconds_skipped: float | None = None
target_loudness: float | None = None
def __str__(self) -> str:
"""Execute action(s) on serialization."""
d.pop("queue_id", None)
d.pop("seconds_streamed", None)
- d.pop("seconds_skipped", None)
+ d.pop("seek_position", None)
+ d.pop("fade_in", None)
d.pop("target_loudness", None)
return d
import pathlib
from typing import Final
-API_SCHEMA_VERSION: Final[int] = 23
-MIN_SCHEMA_VERSION: Final[int] = 23
+API_SCHEMA_VERSION: Final[int] = 24
+MIN_SCHEMA_VERSION: Final[int] = 24
DB_SCHEMA_VERSION: Final[int] = 28
ROOT_LOGGER_NAME: Final[str] = "music_assistant"
@api_command("players/queue/shuffle")
def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
"""Configure shuffle setting on the the queue."""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
queue = self._queues[queue_id]
if queue.shuffle_enabled == shuffle_enabled:
return # no change
@api_command("players/queue/repeat")
def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
"""Configure repeat setting on the the queue."""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
queue = self._queues[queue_id]
if queue.repeat_mode == repeat_mode:
return # no change
"""
# ruff: noqa: PLR0915,PLR0912
queue = self._queues[queue_id]
- if queue.announcement_in_progress:
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
LOGGER.warning("Ignore queue command: An announcement is in progress")
return
- pos_shift: move item x positions up if negative value
- pos_shift: move item to top of queue as next item if 0.
"""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
queue = self._queues[queue_id]
item_index = self.index_by_id(queue_id, queue_item_id)
if item_index <= queue.index_in_buffer:
@api_command("players/queue/delete_item")
def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
"""Delete item (by id or index) from the queue."""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
if isinstance(item_id_or_index, str):
item_index = self.index_by_id(queue_id, item_id_or_index)
else:
@api_command("players/queue/clear")
def clear(self, queue_id: str) -> None:
"""Clear all items in the queue."""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
queue = self._queues[queue_id]
queue.radio_source = []
if queue.state != PlayerState.IDLE:
- queue_id: queue_id of the playerqueue to handle the command.
"""
- if self._queues[queue_id].announcement_in_progress:
- LOGGER.warning("Ignore queue command for %s because an announcement is in progress.")
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
return
# simply forward the command to underlying player
await self.mass.players.cmd_stop(queue_id)
- queue_id: queue_id of the playerqueue to handle the command.
"""
- if self._queues[queue_id].announcement_in_progress:
- LOGGER.warning("Ignore queue command for %s because an announcement is in progress.")
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
return
if self._queues[queue_id].state == PlayerState.PAUSED:
# simply forward the command to underlying player
- queue_id: queue_id of the playerqueue to handle the command.
"""
- if self._queues[queue_id].announcement_in_progress:
- LOGGER.warning("Ignore queue command for %s because an announcement is in progress.")
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
return
player = self.mass.players.get(queue_id, True)
if PlayerFeature.PAUSE not in player.supported_features:
- queue_id: queue_id of the queue to handle the command.
"""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
current_index = self._queues[queue_id].current_index
next_index = self._get_next_index(queue_id, current_index, True)
if next_index is None:
- queue_id: queue_id of the queue to handle the command.
"""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
current_index = self._queues[queue_id].current_index
if current_index is None:
return
- queue_id: queue_id of the queue to handle the command.
- seconds: number of seconds to skip in track. Use negative value to skip back.
"""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
await self.seek(queue_id, self._queues[queue_id].elapsed_time + seconds)
@api_command("players/queue/seek")
- queue_id: queue_id of the queue to handle the command.
- position: position in seconds to seek to in the current playing item.
"""
+ if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
queue = self._queues[queue_id]
assert queue.current_item, "No item loaded"
assert queue.current_item.media_item.media_type == MediaType.TRACK
) -> None:
"""Play item at index (or item_id) X in queue."""
queue = self._queues[queue_id]
- if queue.announcement_in_progress:
- LOGGER.warning("Ignore queue command for %s because an announcement is in progress.")
- return
if isinstance(index, str):
index = self.index_by_id(queue_id, index)
queue_item = self.get_item(queue_id, index)
queue.index_in_buffer = index
queue.flow_mode_start_index = index
queue.flow_mode = False # reset
+ # get streamdetails - do this here to catch unavailable items early
+ queue_item.streamdetails = await get_stream_details(
+ self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
+ )
await self.mass.players.play_media(
player_id=queue_id,
queue_item=queue_item,
- seek_position=int(seek_position),
- fade_in=fade_in,
)
# Interaction with player
if player.player_id not in self._queues:
# race condition
return
+ if player.announcement_in_progress:
+ # do nothing while the announcement is in progress
+ return
queue_id = player.player_id
player = self.mass.players.get(queue_id)
queue = self._queues[queue_id]
# determine if this queue is currently active for this player
queue.active = player.powered and player.active_source == queue.queue_id
if not queue.active:
+ # return early if the queue is not active
queue.state = PlayerState.IDLE
- self._prev_states.pop(queue_id, None)
+ if prev_state := self._prev_states.pop(queue_id, None):
+ self.signal_update(queue_id)
return
# update current item from player report
if queue.flow_mode:
# flow mode active, calculate current item
queue.current_index, queue.elapsed_time = self.__get_queue_stream_index(queue, player)
+ queue.elapsed_time_last_updated = time.time()
else:
# queue is active and player has one of our tracks loaded, update state
if item_id := self._parse_player_current_item_id(queue_id, player.current_item_id):
queue.current_index = self.index_by_id(queue_id, item_id)
- queue.elapsed_time = int(player.corrected_elapsed_time)
+ if player.state == PlayerState.PLAYING:
+ queue.elapsed_time = int(player.corrected_elapsed_time)
+ queue.elapsed_time_last_updated = player.elapsed_time_last_updated
# only update these attributes if the queue is active
# and has an item loaded so we are able to resume it
queue.state = player.state
- queue.elapsed_time_last_updated = time.time()
queue.current_item = self.get_item(queue_id, queue.current_index)
queue.next_item = self._get_next_item(queue_id)
# correct elapsed time when seeking
if (
queue.current_item
and queue.current_item.streamdetails
- and queue.current_item.streamdetails.seconds_skipped
+ and queue.current_item.streamdetails.seek_position
+ and player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
and not queue.flow_mode
):
- queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped
+ queue.elapsed_time += queue.current_item.streamdetails.seek_position
# basic throttle: do not send state changed events if queue did not actually change
prev_state = self._prev_states.get(queue_id, {})
return
if prev_state.get("state") != PlayerState.PLAYING:
return
+ if (player := self.mass.players.get(queue.queue_id)) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
current_item = self.get_item(queue.queue_id, queue.current_index)
if not current_item:
return # guard, just in case something bad happened
seconds_remaining = int(duration - player.corrected_elapsed_time)
async def _enqueue_next(index: int, supports_enqueue: bool = False) -> None:
+ if (
+ player := self.mass.players.get(queue.queue_id)
+ ) and player.announcement_in_progress:
+ LOGGER.warning("Ignore queue command: An announcement is in progress")
+ return
with suppress(QueueEmpty):
next_item = await self.preload_next_item(queue.queue_id, index)
if supports_enqueue:
queue_index += 1
else:
# no more seconds left to divide, this is our track
- # account for any seeking by adding the skipped seconds
- track_sec_skipped = queue_track.streamdetails.seconds_skipped or 0
+ # account for any seeking by adding the skipped/seeked seconds
+ track_sec_skipped = queue_track.streamdetails.seek_position
track_time = elapsed_time_queue + track_sec_skipped - total_time
break
return queue_index, track_time
import asyncio
import functools
import logging
+from contextlib import suppress
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast
import shortuuid
from music_assistant.common.helpers.util import get_changed_values
from music_assistant.common.models.enums import (
+ ContentType,
EventType,
+ MediaType,
PlayerFeature,
PlayerState,
PlayerType,
ProviderUnavailableError,
UnsupportedFeaturedException,
)
+from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.queue_item import QueueItem
+from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import (
CONF_AUTO_PLAY,
CONF_GROUP_MEMBERS,
from collections.abc import Awaitable, Callable, Coroutine, Iterable, Iterator
from music_assistant.common.models.config_entries import CoreConfig
- from music_assistant.common.models.queue_item import QueueItem
+
LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.players")
_P = ParamSpec("_P")
-def log_player_command(
+def handle_player_command(
func: Callable[Concatenate[_PlayerControllerT, _P], Awaitable[_R]],
) -> Callable[Concatenate[_PlayerControllerT, _P], Coroutine[Any, Any, _R | None]]:
"""Check and log commands to players."""
@functools.wraps(func)
async def wrapper(self: _PlayerControllerT, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
- """Log and log_player_command commands to players."""
+ """Log and handle_player_command commands to players."""
player_id = kwargs["player_id"] if "player_id" in kwargs else args[0]
if (player := self._players.get(player_id)) is None or not player.available:
# player not existent
player_id,
)
return
+
self.logger.debug(
"Handling command %s for player %s",
func.__name__,
# Player commands
@api_command("players/cmd/stop")
- @log_player_command
+ @handle_player_command
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player.
await player_provider.cmd_stop(player_id)
@api_command("players/cmd/play")
- @log_player_command
+ @handle_player_command
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
await player_provider.cmd_play(player_id)
@api_command("players/cmd/pause")
- @log_player_command
+ @handle_player_command
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
self.mass.create_task(_watch_pause(player_id))
@api_command("players/cmd/play_pause")
- @log_player_command
+ @handle_player_command
async def cmd_play_pause(self, player_id: str) -> None:
"""Toggle play/pause on given player.
await self.cmd_play(player_id)
@api_command("players/cmd/power")
- @log_player_command
+ @handle_player_command
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player.
await self.mass.player_queues.resume(player_id)
@api_command("players/cmd/volume_set")
- @log_player_command
+ @handle_player_command
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
await player_provider.cmd_volume_set(player_id, volume_level)
@api_command("players/cmd/volume_up")
- @log_player_command
+ @handle_player_command
async def cmd_volume_up(self, player_id: str) -> None:
"""Send VOLUME_UP command to given player.
await self.cmd_volume_set(player_id, new_volume)
@api_command("players/cmd/volume_down")
- @log_player_command
+ @handle_player_command
async def cmd_volume_down(self, player_id: str) -> None:
"""Send VOLUME_DOWN command to given player.
await self.cmd_volume_set(player_id, new_volume)
@api_command("players/cmd/group_volume")
- @log_player_command
+ @handle_player_command
async def cmd_group_volume(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given playergroup.
self.update(player_id)
@api_command("players/cmd/volume_mute")
- @log_player_command
+ @handle_player_command
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send VOLUME_MUTE command to given player.
player_prov = self.mass.players.get_player_provider(player_id)
await player_prov.cmd_seek(player_id, position)
- async def play_media(
+ @api_command("players/cmd/play_announcement")
+ async def play_announcement(
self,
player_id: str,
- queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
+ url: str,
+ use_pre_announce: bool = False,
) -> None:
+ """Handle playback of an announcement (url) on given player."""
+ player = self.get(player_id, True)
+ if player.announcement_in_progress:
+ return
+ try:
+ # mark announcement_in_progress on player
+ player.announcement_in_progress = True
+ # check for native announce support
+ if PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features:
+ if prov := self.mass.get_provider(player.provider):
+ await prov.play_announcement(player_id, url, use_pre_announce)
+ return
+ # use fallback/default implementation
+ await self._play_announcement(player, url, use_pre_announce)
+ finally:
+ player.announcement_in_progress = False
+
+ async def play_media(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle PLAY MEDIA on given player.
This is called by the Queue controller to start playing a queue item on the given player.
- player_id: player_id of the player to handle the command.
- queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
"""
if player_id.startswith(SYNCGROUP_PREFIX):
# redirect to syncgroup-leader if needed
await self.cmd_group_power(player_id, True)
group_player = self.get(player_id, True)
if sync_leader := self.get_sync_leader(group_player):
- await self.play_media(
- sync_leader.player_id,
- queue_item=queue_item,
- seek_position=seek_position,
- fade_in=fade_in,
- )
+ await self.play_media(sync_leader.player_id, queue_item=queue_item)
group_player.state = PlayerState.PLAYING
return
player_prov = self.mass.players.get_player_provider(player_id)
await player_prov.play_media(
player_id=player_id,
queue_item=queue_item,
- seek_position=int(seek_position),
- fade_in=fade_in,
)
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
await player_prov.enqueue_next_queue_item(player_id=player_id, queue_item=queue_item)
@api_command("players/cmd/sync")
- @log_player_command
+ @handle_player_command
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
await player_provider.cmd_sync(player_id, target_player)
@api_command("players/cmd/unsync")
- @log_player_command
+ @handle_player_command
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.
if new_power:
# if a child player turned ON while the group player is on, we need to resync/resume
self.mass.create_task(self._sync_syncgroup(group_player.player_id))
+
+ async def _play_announcement(
+ self,
+ player: Player,
+ url: str,
+ use_pre_announce: bool | None = None,
+ ) -> None:
+ """Handle (default/fallback) implementation of the play announcement feature.
+
+ This default implementation will;
+ - stop playback of the current media (if needed)
+ - power on the player (if needed)
+ - raise the volume a bit
+ - play the announcement (from given url)
+ - wait for the player to finish playing
+ - restore the previous power and volume
+ - restore playback (if needed and if possible)
+
+ This default implementation will only be used if the player's
+ provider has no native support for the PLAY_ANNOUNCEMENT feature.
+ """
+ if player.synced_to:
+ # redirect to sync master if player is group child
+ self.mass.create_task(self.play_announcement(player.synced_to, url))
+ return
+ if active_group := self._get_active_player_group(player):
+ # redirect to group player if playergroup is atcive
+ self.mass.create_task(self.play_announcement(active_group.player_id, url))
+ return
+ self.logger.info(
+ "Playback announcement to player %s (with pre-announce: %s): %s",
+ player.display_name,
+ use_pre_announce,
+ url,
+ )
+ # use stream server to host announcement on local network
+ # this ensures playback on all players, including ones that do not
+ # like https hosts and it also offers the pre-announce 'bell'
+ url = self.mass.streams.get_announcement_url(player.player_id, url, use_pre_announce)
+ # create a queue item for the announcement so
+ # we can send a regular play-media call downstream
+ queue_item = QueueItem(
+ queue_id=player.player_id,
+ queue_item_id=url,
+ name="Announcement",
+ duration=None,
+ streamdetails=StreamDetails(
+ provider="url",
+ item_id=url,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(url),
+ ),
+ media_type=MediaType.ANNOUNCEMENT,
+ direct=url,
+ data=url,
+ target_loudness=-10,
+ ),
+ )
+ prev_power = player.powered
+ prev_volume = player.volume_level
+ prev_state = player.state
+ queue = self.mass.player_queues.get_active_queue(player.player_id)
+ prev_queue_active = queue.active
+ prev_item_id = player.current_item_id
+ # stop player if its currently playing
+ if prev_state in (PlayerState.PLAYING, PlayerState.PAUSED):
+ self.logger.debug(
+ "Announcement to player %s - stop existing content (%s)...",
+ player.display_name,
+ prev_item_id,
+ )
+ await self.cmd_stop(player.player_id)
+ # wait for the player to stop
+ with suppress(TimeoutError):
+ await self.wait_for_state(player, PlayerState.IDLE, 5)
+ # increase volume a bit
+ temp_volume = int(min(75, prev_volume * 1.5))
+ if temp_volume > prev_volume:
+ self.logger.debug(
+ "Announcement to player %s - setting temporary volume (%s)...",
+ player.display_name,
+ temp_volume,
+ )
+ await self.cmd_volume_set(player.player_id, temp_volume)
+ # play the announcement
+ self.logger.debug(
+ "Announcement to player %s - playing the announcement on the player...",
+ player.display_name,
+ )
+ await self.play_media(player_id=player.player_id, queue_item=queue_item)
+ # wait for the player to play
+ with suppress(TimeoutError):
+ await self.wait_for_state(player, PlayerState.PLAYING, 5)
+ self.logger.debug(
+ "Announcement to player %s - waiting on the player to stop playing...",
+ player.display_name,
+ )
+ # wait for the player to stop playing
+ with suppress(TimeoutError):
+ await self.wait_for_state(player, PlayerState.IDLE, 30)
+ self.logger.debug(
+ "Announcement to player %s - restore previous state...", player.display_name
+ )
+ # restore volume
+ if temp_volume != prev_volume:
+ await self.cmd_volume_set(player.player_id, prev_volume)
+ player.current_item_id = prev_item_id
+ # either power off the player or resume playing
+ if not prev_power:
+ await self.cmd_power(player.player_id, False)
+ return
+ elif prev_queue_active and prev_state == PlayerState.PLAYING:
+ await self.mass.player_queues.resume(queue.queue_id, True)
+ elif prev_state == PlayerState.PLAYING:
+ # player was playing something else - try to resume that here
+ self.logger.warning("Can not resume %s on %s", prev_item_id, player.display_name)
+ # TODO !!
+
+ async def wait_for_state(
+ self, player: Player, wanted_state: PlayerState, timeout: float = 60.0
+ ) -> None:
+ """Wait for the given player to reach the given state."""
+ async with asyncio.timeout(timeout):
+ while player.state != wanted_state:
+ await asyncio.sleep(0.1)
import shortuuid
from aiohttp import web
-from music_assistant.common.helpers.util import empty_queue, get_ip, select_free_port
+from music_assistant.common.helpers.util import (
+ empty_queue,
+ get_ip,
+ select_free_port,
+ try_parse_bool,
+)
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueOption,
ConfigValueType,
)
from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType
-from music_assistant.common.models.errors import MediaNotFoundError, QueueEmpty
+from music_assistant.common.models.errors import QueueEmpty
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.constants import (
+ ANNOUNCE_ALERT_FILE,
CONF_BIND_IP,
CONF_BIND_PORT,
CONF_CROSSFADE,
get_ffmpeg_stream,
get_media_stream,
get_player_filter_params,
- get_stream_details,
)
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
"some player specific local control callbacks."
)
self.manifest.icon = "cast-audio"
+ self.announcements: dict[str, str] = {}
@property
def base_url(self) -> str:
"/command/{queue_id}/{command}.mp3",
self.serve_command_request,
),
+ (
+ "*",
+ "/announcement/{player_id}.{fmt}",
+ self.serve_announcement_stream,
+ ),
],
)
player_id: str,
queue_item: QueueItem,
output_codec: ContentType,
- seek_position: int = 0,
- fade_in: bool = False,
flow_mode: bool = False,
) -> str:
"""Resolve the stream URL for the given QueueItem."""
fmt = output_codec.value
+ # handle announcement item
+ if queue_item.media_type == MediaType.ANNOUNCEMENT:
+ return queue_item.queue_item_id
# handle request for multi client queue stream
stream_job = self.multi_client_jobs.get(queue_item.queue_id)
if queue_item.queue_item_id == "flow" or stream_job and stream_job.pending:
query_params = {}
base_path = "flow" if flow_mode else "single"
url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
- if seek_position:
- query_params["seek_position"] = str(seek_position)
- if fade_in:
- query_params["fade_in"] = "1"
# we add a timestamp as basic checksum
# most importantly this is to invalidate any caches
# but also to handle edge cases such as single track repeat
self,
queue_id: str,
start_queue_item: QueueItem,
- seek_position: int = 0,
- fade_in: bool = False,
pcm_bit_depth: int = 24,
pcm_sample_rate: int = 48000,
expected_players: set[str] | None = None,
queue=queue,
start_queue_item=start_queue_item,
pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
),
pcm_format=pcm_format,
expected_players=expected_players or set(),
queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
if not queue_item:
raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
- try:
- queue_item.streamdetails = await get_stream_details(self.mass, queue_item=queue_item)
- except MediaNotFoundError:
- raise web.HTTPNotFound(
- reason=f"Unable to retrieve streamdetails for item: {queue_item}"
- )
- seek_position = int(request.query.get("seek_position", 0))
- queue_item.streamdetails.seconds_skipped = seek_position
- fade_in = bool(request.query.get("fade_in", 0))
+ if not queue_item.streamdetails:
+ raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
# work out output format/details
output_format = await self._get_output_format(
output_format_str=request.match_info["fmt"],
default_sample_rate=queue_item.streamdetails.audio_format.sample_rate,
default_bit_depth=queue_item.streamdetails.audio_format.bit_depth,
)
-
# prepare request, add some DLNA/UPNP compatible headers
headers = {
**DEFAULT_STREAM_HEADERS,
self.mass,
streamdetails=queue_item.streamdetails,
pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
),
input_format=pcm_format,
output_format=output_format,
start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
if not start_queue_item:
raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
- seek_position = int(request.query.get("seek_position", 0))
- fade_in = bool(request.query.get("fade_in", 0))
queue_player = self.mass.players.get(queue_id)
# work out output format/details
output_format = await self._get_output_format(
queue=queue,
start_queue_item=start_queue_item,
pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
),
input_format=pcm_format,
output_format=output_format,
self.mass.create_task(self.mass.player_queues.next(queue_id))
return web.FileResponse(SILENCE_FILE)
+ async def serve_announcement_stream(self, request: web.Request) -> web.Response:
+ """Stream announcement audio to a player."""
+ self._log_request(request)
+ player_id = request.match_info["player_id"]
+ player = self.mass.player_queues.get(player_id)
+ if not player:
+ raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
+ if player_id not in self.announcements:
+ raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
+ announcement = self.announcements[player_id]
+ use_pre_announce = try_parse_bool(request.query.get("pre_announce"))
+
+ # work out output format/details
+ fmt = request.match_info.get("fmt", announcement.rsplit(".")[-1])
+ audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
+ # prepare request, add some DLNA/UPNP compatible headers
+ headers = {
+ **DEFAULT_STREAM_HEADERS,
+ "Content-Type": f"audio/{audio_format.output_format_str}",
+ }
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers=headers,
+ )
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving audio stream for Announcement %s to %s",
+ announcement,
+ player.display_name,
+ )
+ extra_args = []
+ filter_params = ["loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5"]
+ if use_pre_announce:
+ extra_args += [
+ "-i",
+ ANNOUNCE_ALERT_FILE,
+ "-filter_complex",
+ "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5",
+ ]
+ filter_params = []
+
+ async for chunk in get_ffmpeg_stream(
+ audio_input=announcement,
+ input_format=audio_format,
+ output_format=audio_format,
+ extra_args=extra_args,
+ filter_params=filter_params,
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError):
+ break
+
+ self.logger.debug(
+ "Finished serving audio stream for Announcement %s to %s",
+ announcement,
+ player.display_name,
+ )
+
+ return resp
+
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
"""Get the url for the special command stream."""
return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
+ def get_announcement_url(
+ self,
+ player_id: str,
+ announcement_url: str,
+ use_pre_announce: bool = False,
+ content_type: ContentType = ContentType.MP3,
+ ) -> str:
+ """Get the url for the special announcement stream."""
+ self.announcements[player_id] = announcement_url
+ return f"{self.base_url}/announcement/{player_id}.{content_type.value}?pre_announce={use_pre_announce}" # noqa: E501
+
async def get_flow_stream(
self,
queue: PlayerQueue,
start_queue_item: QueueItem,
pcm_format: AudioFormat,
- seek_position: int = 0,
- fade_in: bool = False,
) -> AsyncGenerator[bytes, None]:
"""Get a flow stream of all tracks in the queue as raw PCM audio."""
# ruff: noqa: PLR0915
# get (next) queue item to stream
if queue_track is None:
queue_track = start_queue_item
- queue_track.streamdetails = await get_stream_details(self.mass, queue_track)
else:
- seek_position = 0
- fade_in = False
try:
queue_track = await self.mass.player_queues.preload_next_item(queue.queue_id)
except QueueEmpty:
break
+ if queue_track.streamdetails is None:
+ raise RuntimeError(
+ "No Streamdetails known for queue item %s", queue_track.queue_item_id
+ )
+
self.logger.debug(
"Start Streaming queue track: %s (%s) for queue %s",
queue_track.streamdetails.uri,
queue.queue_id, CONF_CROSSFADE_DURATION, 8
)
crossfade_size = int(pcm_sample_size * crossfade_duration)
- queue_track.streamdetails.seconds_skipped = seek_position
buffer_size = int(pcm_sample_size * 2) # 2 seconds
if use_crossfade:
buffer_size += crossfade_size
self.mass,
queue_track.streamdetails,
pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
# strip silence from begin/end if track is being crossfaded
strip_silence_begin=use_crossfade,
strip_silence_end=use_crossfade,
# this also accounts for crossfade and silence stripping
queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size
queue_track.streamdetails.duration = (
- seek_position + queue_track.streamdetails.seconds_streamed
+ queue_track.streamdetails.seconds_skipped
+ or 0 + queue_track.streamdetails.seconds_streamed
)
total_bytes_written += bytes_written
self.logger.debug(
analyze_jobs.discard(streamdetails.uri)
-async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> StreamDetails:
+async def get_stream_details(
+ mass: MusicAssistant,
+ queue_item: QueueItem,
+ seek_position: int = 0,
+ fade_in: bool = False,
+) -> StreamDetails:
"""Get streamdetails for the given QueueItem.
This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
Do not try to request streamdetails in advance as this is expiring data.
param media_item: The QueueItem for which to request the streamdetails for.
"""
- if queue_item.streamdetails and (time() < (queue_item.streamdetails.expires - 360)):
- LOGGER.debug(f"Using cached streamdetails from queue_item for {queue_item.uri}")
+ if queue_item.streamdetails and (time() < queue_item.streamdetails.expires):
+ LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}")
# we already have (fresh) streamdetails stored on the queueitem, use these.
- # make a copy to prevent we're altering an existing object and introduce race conditions.
+ # this happens for example while seeking in a track.
+ # we create a copy (using to/from dict) to ensure the one-time values are cleared
streamdetails = StreamDetails.from_dict(queue_item.streamdetails.to_dict())
else:
# always request the full item as there might be other qualities available
# set queue_id on the streamdetails so we know what is being streamed
streamdetails.queue_id = queue_item.queue_id
+ # handle skip/fade_in details
+ streamdetails.seek_position = seek_position
+ streamdetails.fade_in = fade_in
# handle volume normalization details
if not streamdetails.loudness:
streamdetails.loudness = await mass.music.get_track_loudness(
streamdetails.item_id, streamdetails.provider
)
- if (
+ if streamdetails.target_loudness is not None:
+ streamdetails.target_loudness = streamdetails.target_loudness
+ elif (
player_settings := await mass.config.get_player_config(streamdetails.queue_id)
) and player_settings.get_value(CONF_VOLUME_NORMALIZATION):
streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
mass: MusicAssistant,
streamdetails: StreamDetails,
pcm_format: AudioFormat,
- seek_position: int = 0,
- fade_in: bool = False,
strip_silence_begin: bool = False,
strip_silence_end: bool = False,
) -> AsyncGenerator[tuple[bool, bytes], None]:
"""
logger = LOGGER.getChild("media_stream")
bytes_sent = 0
- streamdetails.seconds_skipped = seek_position
+ streamdetails.seconds_skipped = streamdetails.seek_position
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
- if is_radio or seek_position:
+ if is_radio or streamdetails.seek_position:
strip_silence_begin = False
# chunk size = 2 seconds of pcm audio
pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
# collect all arguments for ffmpeg
filter_params = []
extra_args = []
- seek_pos = seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0
+ seek_pos = (
+ streamdetails.seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0
+ )
if seek_pos:
# only use ffmpeg seeking if the provider stream does not support seeking
extra_args += ["-ss", str(seek_pos)]
filter_rule += f":measured_thresh={streamdetails.loudness.threshold}"
filter_rule += ":print_format=json"
filter_params.append(filter_rule)
- if fade_in:
+ if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
ffmpeg_args = _get_ffmpeg_args(
input_format=streamdetails.audio_format,
"""Task that grabs the source audio and feeds it to ffmpeg."""
logger.log(VERBOSE_LOG_LEVEL, "writer started for %s", streamdetails.uri)
music_prov = mass.get_provider(streamdetails.provider)
- seek_pos = seek_position if streamdetails.can_seek else 0
+ seek_pos = streamdetails.seek_position if streamdetails.can_seek else 0
async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
await ffmpeg_proc.write(audio_chunk)
# write eof when last packet is received
seconds_streamed,
)
# store accurate duration
- streamdetails.duration = seek_position + seconds_streamed
+ streamdetails.duration = streamdetails.seek_position + seconds_streamed
else:
logger.debug(
"stream aborted for %s (%s seconds streamed)",
self,
player_id: str,
queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
) -> None:
"""Handle PLAY MEDIA on given player.
- player_id: player_id of the player to handle the command.
- queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
"""
raise NotImplementedError
This will NOT be called if the player is using flow mode to playback the queue.
"""
+ async def play_announcement(
+ self, player_id: str, announcement_url: str, use_pre_announce: bool = False
+ ) -> None:
+ """Handle (provider native) playback of an announcement on given player."""
+ # will only be called for players with PLAY_ANNOUNCEMENT feature set.
+ raise NotImplementedError
+
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player.
from music_assistant.common.models.enums import (
ConfigEntryType,
ContentType,
+ MediaType,
PlayerFeature,
PlayerState,
PlayerType,
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
+from music_assistant.server.helpers.audio import (
+ get_ffmpeg_stream,
+ get_media_stream,
+ get_player_filter_params,
+)
from music_assistant.server.helpers.process import check_output
from music_assistant.server.models.player_provider import PlayerProvider
self._log_reader_task = asyncio.create_task(self._log_watcher())
self._audio_reader_task = asyncio.create_task(self._audio_reader())
- async def stop(self, force=False):
+ async def stop(self):
"""Stop playback and cleanup."""
if not self.running:
return
- await self.send_cli_command("ACTION=STOP")
- self._stop_requested = True
- if not force:
- return
- # stop background tasks
+ # always stop the audio feeder
if self._audio_reader_task and not self._audio_reader_task.done():
with suppress(asyncio.CancelledError):
self._audio_reader_task.cancel()
await self._audio_reader_task
- if self._log_reader_task and not self._log_reader_task.done():
- with suppress(asyncio.CancelledError):
- self._log_reader_task.cancel()
- await self._log_reader_task
+ await self.send_cli_command("ACTION=STOP")
+ self._stop_requested = True
with suppress(TimeoutError):
- await asyncio.wait_for(self._cliraop_proc.communicate(), 5)
+ await asyncio.wait_for(self._cliraop_proc.wait(), 5)
if self._cliraop_proc.returncode is None:
self._cliraop_proc.kill()
command += "\n"
def send_data():
- with open(named_pipe, "w") as f:
+ with suppress(BrokenPipeError), open(named_pipe, "w") as f:
f.write(command)
self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
continue
if "lost packet out of backlog" in line:
lost_packets += 1
- if lost_packets == 30:
- logger.warning("Packet loss detected, restart playback...")
+ if lost_packets == 50:
+ logger.warning("High packet loss detected, restart playback...")
queue = self.mass.player_queues.get_active_queue(mass_player.player_id)
await self.mass.player_queues.resume(queue.queue_id)
else:
"CLIRaop process stopped with errorcode %s",
self._cliraop_proc.returncode,
)
- if (
+ if not airplay_player.active_stream or (
airplay_player.active_stream
and airplay_player.active_stream.active_remote_id == self.active_remote_id
):
- player_id: player_id of the player to handle the command.
"""
-
- async def stop_player(airplay_player: AirPlayPlayer) -> None:
- if airplay_player.active_stream:
- await airplay_player.active_stream.stop(force=False)
- mass_player = self.mass.players.get(airplay_player.player_id)
- mass_player.state = PlayerState.IDLE
- self.mass.players.update(airplay_player.player_id)
-
# forward command to player and any connected sync members
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
- tg.create_task(stop_player(airplay_player))
+ if airplay_player.active_stream:
+ tg.create_task(airplay_player.active_stream.stop())
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
self,
player_id: str,
queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
) -> None:
- """Handle PLAY MEDIA on given player.
-
- This is called by the Queue controller to start playing a queue item on the given player.
- The provider's own implementation should work out how to handle this request.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
- """
+ """Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
if player.synced_to:
# should not happen, but just in case
# always stop existing stream first
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
- await airplay_player.active_stream.stop(force=True)
+ await airplay_player.active_stream.stop()
pcm_format = AudioFormat(
content_type=ContentType.PCM_S16LE,
sample_rate=44100,
bit_depth=16,
channels=2,
)
-
- if queue_item.queue_item_id == "flow":
+ if queue_item.media_type == MediaType.ANNOUNCEMENT:
+ # stream announcement url directly
+ stream_job = None
+ elif stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id):
# handle special case for UGP multi client stream
stream_job = self.mass.streams.multi_client_jobs.get(queue_item.queue_id)
elif player.group_childs:
stream_job = await self.mass.streams.create_multi_client_stream_job(
queue_item.queue_id,
queue_item,
- seek_position=seek_position,
- fade_in=fade_in,
pcm_bit_depth=16,
pcm_sample_rate=44100,
)
player_id=airplay_player.player_id,
output_format=pcm_format,
)
+ elif queue_item.media_type == MediaType.ANNOUNCEMENT:
+ # stream announcement url directly
+ audio_iterator = get_media_stream(
+ self.mass, queue_item.streamdetails, pcm_format=pcm_format
+ )
else:
queue = self.mass.player_queues.get_active_queue(queue_item.queue_id)
audio_iterator = get_ffmpeg_stream(
queue,
start_queue_item=queue_item,
pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
),
input_format=pcm_format,
output_format=pcm_format,
self,
player_id: str,
queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
) -> None:
- """Handle PLAY MEDIA on given player.
-
- This is called by the Queue controller to start playing a queue item on the given player.
- The provider's own implementation should work out how to handle this request.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
- """
+ """Handle PLAY MEDIA on given player."""
castplayer = self.castplayers[player_id]
use_flow_mode = await self.mass.config.get_player_config_value(
player_id, CONF_FLOW_MODE
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
- seek_position=seek_position,
- fade_in=fade_in,
flow_mode=use_flow_mode,
)
queuedata = {
self,
player_id: str,
queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
) -> None:
- """Handle PLAY MEDIA on given player.
-
- This is called by the Queue controller to start playing a queue item on the given player.
- The provider's own implementation should work out how to handle this request.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
- """
+ """Handle PLAY MEDIA on given player."""
use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- seek_position=seek_position,
- fade_in=fade_in,
flow_mode=use_flow_mode,
)
dlna_player = self.dlnaplayers[player_id]
self,
player_id: str,
queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
) -> None:
- """Handle PLAY MEDIA on given player.
-
- This is called by the Queue controller to start playing a queue item on the given player.
- The provider's own implementation should work out how to handle this request.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
- """
+ """Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- seek_position=seek_position,
- fade_in=fade_in,
flow_mode=True,
)
await self._fully.playSound(url, AUDIOMANAGER_STREAM_MUSIC)
self,
player_id: str,
queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
) -> None:
- """Handle PLAY MEDIA on given player.
-
- This is called by the Queue controller to start playing a queue item on the given player.
- The provider's own implementation should work out how to handle this request.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
- """
+ """Handle PLAY MEDIA on given player."""
use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- seek_position=seek_position,
- fade_in=fade_in,
flow_mode=use_flow_mode,
)
await self.hass_prov.hass.call_service(
data=plex_track,
)
- if audio_stream.loudness:
- stream_details.loudness = audio_stream.loudness
-
if media_type != ContentType.M4A:
stream_details.direct = self._plex_server.url(media_part.key, True)
if audio_stream.samplingRate:
self,
player_id: str,
queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
) -> None:
- """Handle PLAY MEDIA on given player.
-
- This is called by the Queue controller to start playing a queue item on the given player.
- The provider's own implementation should work out how to handle this request.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
- """
+ """Handle PLAY MEDIA on given player."""
# fix race condition where resync and play media are called at more or less the same time
if self._resync_handle:
self._resync_handle.cancel()
stream_job = await self.mass.streams.create_multi_client_stream_job(
queue_id=queue_item.queue_id,
start_queue_item=queue_item,
- seek_position=int(seek_position),
- fade_in=fade_in,
)
# forward command to player and any connected sync members
sync_clients = self._get_sync_clients(player_id)
# for now just hardcode flac as we assume that every (modern)
# slimproto based player can handle that just fine
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
- seek_position=seek_position,
- fade_in=fade_in,
flow_mode=False,
)
await self._handle_play_url(
from music_assistant.common.models.enums import (
ConfigEntryType,
ContentType,
+ MediaType,
PlayerFeature,
PlayerState,
PlayerType,
from music_assistant.common.models.errors import SetupFailedError
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.server.helpers.audio import get_media_stream
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
bit_depth=16,
channels=2,
)
- # handle special case for UGP multi client stream
- if stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id):
+ if queue_item.media_type == MediaType.ANNOUNCEMENT:
+ # stream announcement url directly
+ audio_iterator = get_media_stream(
+ self.mass, queue_item.streamdetails, pcm_format=pcm_format
+ )
+ elif stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id):
+ # handle special case for UGP multi client stream
stream_job.expected_players.add(player_id)
audio_iterator = stream_job.subscribe(
player_id=player_id,
from requests.exceptions import RequestException
from soco import events_asyncio, zonegroupstate
from soco.discovery import discover
+from sonos_websocket.exception import SonosWebsocketError
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
PlayerFeature.VOLUME_SET,
PlayerFeature.ENQUEUE_NEXT,
PlayerFeature.PAUSE,
+ PlayerFeature.PLAY_ANNOUNCEMENT,
)
CONF_NETWORK_SCAN = "network_scan"
self,
player_id: str,
queue_item: QueueItem,
- seek_position: int,
- fade_in: bool,
) -> None:
- """Handle PLAY MEDIA on given player.
-
- This is called by the Queue controller to start playing a queue item on the given player.
- The provider's own implementation should work out how to handle this request.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: The QueueItem that needs to be played on the player.
- - seek_position: Optional seek to this position.
- - fade_in: Optionally fade in the item at playback start.
- """
+ """Handle PLAY MEDIA on given player."""
url = await self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
- seek_position=seek_position,
- fade_in=fade_in,
)
sonos_player = self.sonosplayers[player_id]
mass_player = self.mass.players.get(player_id)
await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
+ async def play_announcement(
+ self, player_id: str, announcement_url: str, use_pre_announce: bool = False
+ ) -> None:
+ """Handle (provider native) playback of an announcement on given player."""
+ if use_pre_announce:
+ announcement_url = self.mass.streams.get_announcement_url(
+ player_id, announcement_url, True
+ )
+ sonos_player = self.sonosplayers[player_id]
+ mass_player = self.mass.players.get(player_id)
+ temp_volume = int(min(75, mass_player.volume_level * 1.5))
+ self.logger.debug(
+ "Playing announcement %s using websocket audioclip on %s",
+ announcement_url,
+ sonos_player.zone_name,
+ )
+ try:
+ response, _ = await sonos_player.websocket.play_clip(
+ announcement_url,
+ volume=temp_volume,
+ )
+ except SonosWebsocketError as exc:
+ raise PlayerCommandFailed(f"Error when calling Sonos websocket: {exc}") from exc
+ if response["success"]:
+ return
+
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.