From 7ac73dbf50313945571f399391ecbe765229ee1b Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 18 Mar 2024 00:02:18 +0100 Subject: [PATCH] Announce support (TTS with automatic resume) (#1148) --- .gitignore | 1 + music_assistant/client/players.py | 14 ++ .../common/helpers/global_cache.py | 1 - music_assistant/common/helpers/uri.py | 2 +- music_assistant/common/helpers/util.py | 7 + music_assistant/common/models/enums.py | 66 +++++- music_assistant/common/models/player.py | 3 + music_assistant/common/models/player_queue.py | 1 - music_assistant/common/models/queue_item.py | 2 + .../common/models/streamdetails.py | 6 +- music_assistant/constants.py | 4 +- .../server/controllers/player_queues.py | 80 +++++-- music_assistant/server/controllers/players.py | 199 +++++++++++++++--- music_assistant/server/controllers/streams.py | 143 +++++++++---- music_assistant/server/helpers/audio.py | 37 ++-- .../server/models/player_provider.py | 11 +- .../server/providers/airplay/__init__.py | 72 +++---- .../server/providers/chromecast/__init__.py | 15 +- .../server/providers/dlna/__init__.py | 15 +- .../server/providers/fully_kiosk/__init__.py | 15 +- .../server/providers/hass_players/__init__.py | 15 +- .../server/providers/plex/__init__.py | 3 - .../server/providers/slimproto/__init__.py | 17 +- .../server/providers/snapcast/__init__.py | 11 +- .../server/providers/sonos/__init__.py | 43 ++-- 25 files changed, 536 insertions(+), 247 deletions(-) diff --git a/.gitignore b/.gitignore index 9356c840..673eb1d9 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ venv/ *.spec .history .idea +.coverage diff --git a/music_assistant/client/players.py b/music_assistant/client/players.py index 119081df..2e920324 100644 --- a/music_assistant/client/players.py +++ b/music_assistant/client/players.py @@ -121,6 +121,20 @@ class Players: """ await self.client.send_command("players/cmd/unsync", player_id=player_id) + async def play_announcement( + self, + player_id: str, + url: str, + use_pre_announce: bool = False, + ) -> None: + """Handle playback of an announcement (url) on given player.""" + await self.client.send_command( + "players/cmd/play_announcement", + player_id=player_id, + url=url, + use_pre_announce=use_pre_announce, + ) + # PlayerGroup related endpoints/commands async def set_player_group_volume(self, player_id: str, volume_level: int) -> None: diff --git a/music_assistant/common/helpers/global_cache.py b/music_assistant/common/helpers/global_cache.py index 719b2a09..a33f50a9 100644 --- a/music_assistant/common/helpers/global_cache.py +++ b/music_assistant/common/helpers/global_cache.py @@ -1,6 +1,5 @@ """Provides a simple global memory cache.""" - from __future__ import annotations import asyncio diff --git a/music_assistant/common/helpers/uri.py b/music_assistant/common/helpers/uri.py index 5db630da..558417bc 100644 --- a/music_assistant/common/helpers/uri.py +++ b/music_assistant/common/helpers/uri.py @@ -24,7 +24,7 @@ def parse_uri(uri: str) -> tuple[MediaType, str, str]: provider_instance_id_or_domain = "url" media_type = MediaType.UNKNOWN item_id = uri - elif "://" in uri: + elif "://" in uri and len(uri.split("/")) >= 4: # music assistant-style uri # provider://media_type/item_id provider_instance_id_or_domain = uri.split("://")[0] diff --git a/music_assistant/common/helpers/util.py b/music_assistant/common/helpers/util.py index 13334871..2dd0657a 100644 --- a/music_assistant/common/helpers/util.py +++ b/music_assistant/common/helpers/util.py @@ -289,3 +289,10 @@ def is_valid_uuid(uuid_to_test: str) -> bool: except ValueError: return False return str(uuid_obj) == uuid_to_test + + +class classproperty(property): # noqa: N801 + """Implement class property for python3.11+.""" + + def __get__(self, cls, owner): # noqa: D105 + return classmethod(self.fget).__get__(None, owner)() diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index f9193f58..9753c3f6 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -3,6 +3,9 @@ from __future__ import annotations from enum import StrEnum +from typing import Self + +from music_assistant.common.helpers.util import classproperty class MediaType(StrEnum): @@ -14,11 +17,16 @@ class MediaType(StrEnum): PLAYLIST = "playlist" RADIO = "radio" FOLDER = "folder" + ANNOUNCEMENT = "announcement" UNKNOWN = "unknown" @classmethod - @property - def ALL(cls) -> tuple[MediaType, ...]: # noqa: N802 + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN + + @classproperty + def ALL(self) -> tuple[MediaType, ...]: # noqa: N802 """Return all (default) MediaTypes as tuple.""" return ( MediaType.ARTIST, @@ -26,6 +34,7 @@ class MediaType(StrEnum): MediaType.TRACK, MediaType.PLAYLIST, MediaType.RADIO, + MediaType.ANNOUNCEMENT, ) @@ -43,6 +52,12 @@ class ExternalID(StrEnum): ASIN = "asin" # amazon unique number to identify albums DISCOGS = "discogs" # id for media item on discogs TADB = "tadb" # the audio db id + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN class LinkType(StrEnum): @@ -59,6 +74,12 @@ class LinkType(StrEnum): DISCOGS = "discogs" WIKIPEDIA = "wikipedia" ALLMUSIC = "allmusic" + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN class ImageType(StrEnum): @@ -75,6 +96,11 @@ class ImageType(StrEnum): DISCART = "discart" OTHER = "other" + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.OTHER + class AlbumType(StrEnum): """Enum for Album type.""" @@ -114,6 +140,11 @@ class ContentType(StrEnum): MPEG_DASH = "dash" UNKNOWN = "?" + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN + @classmethod def try_parse(cls: ContentType, string: str) -> ContentType: """Try to parse ContentType from (url)string/extension.""" @@ -211,6 +242,12 @@ class PlayerType(StrEnum): STEREO_PAIR = "stereo_pair" GROUP = "group" SYNC_GROUP = "sync_group" + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN class PlayerFeature(StrEnum): @@ -232,6 +269,13 @@ class PlayerFeature(StrEnum): SYNC = "sync" SEEK = "seek" ENQUEUE_NEXT = "enqueue_next" + PLAY_ANNOUNCEMENT = "play_announcement" + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN class EventType(StrEnum): @@ -254,6 +298,12 @@ class EventType(StrEnum): PLAYER_CONFIG_UPDATED = "player_config_updated" SYNC_TASKS_UPDATED = "sync_tasks_updated" AUTH_SESSION = "auth_session" + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN class ProviderFeature(StrEnum): @@ -310,6 +360,12 @@ class ProviderFeature(StrEnum): # # PLUGIN FEATURES # + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN class ProviderType(StrEnum): @@ -333,3 +389,9 @@ class ConfigEntryType(StrEnum): LABEL = "label" DIVIDER = "divider" ACTION = "action" + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls: Self, value: object) -> Self: # noqa: ARG003 + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN diff --git a/music_assistant/common/models/player.py b/music_assistant/common/models/player.py index ba4e3fab..9ca4f123 100644 --- a/music_assistant/common/models/player.py +++ b/music_assistant/common/models/player.py @@ -104,6 +104,9 @@ class Player(DataClassDictMixin): # and pass along freely extra_data: dict[str, Any] = field(default_factory=dict) + # announcement_in_progress boolean to indicate there's an announcement in progress. + announcement_in_progress: bool = False + @property def corrected_elapsed_time(self) -> float: """Return the corrected/realtime elapsed time.""" diff --git a/music_assistant/common/models/player_queue.py b/music_assistant/common/models/player_queue.py index eb46ab5f..de1a1930 100644 --- a/music_assistant/common/models/player_queue.py +++ b/music_assistant/common/models/player_queue.py @@ -36,7 +36,6 @@ class PlayerQueue(DataClassDictMixin): current_item: QueueItem | None = None next_item: QueueItem | None = None radio_source: list[MediaItemType] = field(default_factory=list) - announcement_in_progress: bool = False flow_mode: bool = False # flow_mode_start_index: index of the first item of the flow stream flow_mode_start_index: int = 0 diff --git a/music_assistant/common/models/queue_item.py b/music_assistant/common/models/queue_item.py index c01b60ba..bf98bf2b 100644 --- a/music_assistant/common/models/queue_item.py +++ b/music_assistant/common/models/queue_item.py @@ -56,6 +56,8 @@ class QueueItem(DataClassDictMixin): """Return MediaType for this QueueItem (for convenience purposes).""" if self.media_item: return self.media_item.media_type + if self.streamdetails: + return self.streamdetails.media_type return MediaType.UNKNOWN @classmethod diff --git a/music_assistant/common/models/streamdetails.py b/music_assistant/common/models/streamdetails.py index 008b1184..f958d596 100644 --- a/music_assistant/common/models/streamdetails.py +++ b/music_assistant/common/models/streamdetails.py @@ -53,10 +53,11 @@ class StreamDetails(DataClassDictMixin): can_seek: bool = True # the fields below will be set/controlled by the streamcontroller + seek_position: int = 0 + fade_in: bool = False loudness: LoudnessMeasurement | None = None queue_id: str | None = None seconds_streamed: float | None = None - seconds_skipped: float | None = None target_loudness: float | None = None def __str__(self) -> str: @@ -67,7 +68,8 @@ class StreamDetails(DataClassDictMixin): """Execute action(s) on serialization.""" d.pop("queue_id", None) d.pop("seconds_streamed", None) - d.pop("seconds_skipped", None) + d.pop("seek_position", None) + d.pop("fade_in", None) d.pop("target_loudness", None) return d diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 6b463568..44b9f328 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -3,8 +3,8 @@ import pathlib from typing import Final -API_SCHEMA_VERSION: Final[int] = 23 -MIN_SCHEMA_VERSION: Final[int] = 23 +API_SCHEMA_VERSION: Final[int] = 24 +MIN_SCHEMA_VERSION: Final[int] = 24 DB_SCHEMA_VERSION: Final[int] = 28 ROOT_LOGGER_NAME: Final[str] = "music_assistant" diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 50fe0c6d..80d0daef 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -215,6 +215,9 @@ class PlayerQueuesController(CoreController): @api_command("players/queue/shuffle") def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None: """Configure shuffle setting on the the queue.""" + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return queue = self._queues[queue_id] if queue.shuffle_enabled == shuffle_enabled: return # no change @@ -243,6 +246,9 @@ class PlayerQueuesController(CoreController): @api_command("players/queue/repeat") def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None: """Configure repeat setting on the the queue.""" + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return queue = self._queues[queue_id] if queue.repeat_mode == repeat_mode: return # no change @@ -267,7 +273,7 @@ class PlayerQueuesController(CoreController): """ # ruff: noqa: PLR0915,PLR0912 queue = self._queues[queue_id] - if queue.announcement_in_progress: + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: LOGGER.warning("Ignore queue command: An announcement is in progress") return @@ -434,6 +440,9 @@ class PlayerQueuesController(CoreController): - pos_shift: move item x positions up if negative value - pos_shift: move item to top of queue as next item if 0. """ + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return queue = self._queues[queue_id] item_index = self.index_by_id(queue_id, queue_item_id) if item_index <= queue.index_in_buffer: @@ -458,6 +467,9 @@ class PlayerQueuesController(CoreController): @api_command("players/queue/delete_item") def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None: """Delete item (by id or index) from the queue.""" + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return if isinstance(item_id_or_index, str): item_index = self.index_by_id(queue_id, item_id_or_index) else: @@ -475,6 +487,9 @@ class PlayerQueuesController(CoreController): @api_command("players/queue/clear") def clear(self, queue_id: str) -> None: """Clear all items in the queue.""" + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return queue = self._queues[queue_id] queue.radio_source = [] if queue.state != PlayerState.IDLE: @@ -492,8 +507,8 @@ class PlayerQueuesController(CoreController): - queue_id: queue_id of the playerqueue to handle the command. """ - if self._queues[queue_id].announcement_in_progress: - LOGGER.warning("Ignore queue command for %s because an announcement is in progress.") + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") return # simply forward the command to underlying player await self.mass.players.cmd_stop(queue_id) @@ -505,8 +520,8 @@ class PlayerQueuesController(CoreController): - queue_id: queue_id of the playerqueue to handle the command. """ - if self._queues[queue_id].announcement_in_progress: - LOGGER.warning("Ignore queue command for %s because an announcement is in progress.") + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") return if self._queues[queue_id].state == PlayerState.PAUSED: # simply forward the command to underlying player @@ -520,8 +535,8 @@ class PlayerQueuesController(CoreController): - queue_id: queue_id of the playerqueue to handle the command. """ - if self._queues[queue_id].announcement_in_progress: - LOGGER.warning("Ignore queue command for %s because an announcement is in progress.") + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") return player = self.mass.players.get(queue_id, True) if PlayerFeature.PAUSE not in player.supported_features: @@ -548,6 +563,9 @@ class PlayerQueuesController(CoreController): - queue_id: queue_id of the queue to handle the command. """ + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return current_index = self._queues[queue_id].current_index next_index = self._get_next_index(queue_id, current_index, True) if next_index is None: @@ -560,6 +578,9 @@ class PlayerQueuesController(CoreController): - queue_id: queue_id of the queue to handle the command. """ + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return current_index = self._queues[queue_id].current_index if current_index is None: return @@ -572,6 +593,9 @@ class PlayerQueuesController(CoreController): - queue_id: queue_id of the queue to handle the command. - seconds: number of seconds to skip in track. Use negative value to skip back. """ + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return await self.seek(queue_id, self._queues[queue_id].elapsed_time + seconds) @api_command("players/queue/seek") @@ -581,6 +605,9 @@ class PlayerQueuesController(CoreController): - queue_id: queue_id of the queue to handle the command. - position: position in seconds to seek to in the current playing item. """ + if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return queue = self._queues[queue_id] assert queue.current_item, "No item loaded" assert queue.current_item.media_item.media_type == MediaType.TRACK @@ -637,9 +664,6 @@ class PlayerQueuesController(CoreController): ) -> None: """Play item at index (or item_id) X in queue.""" queue = self._queues[queue_id] - if queue.announcement_in_progress: - LOGGER.warning("Ignore queue command for %s because an announcement is in progress.") - return if isinstance(index, str): index = self.index_by_id(queue_id, index) queue_item = self.get_item(queue_id, index) @@ -650,11 +674,13 @@ class PlayerQueuesController(CoreController): queue.index_in_buffer = index queue.flow_mode_start_index = index queue.flow_mode = False # reset + # get streamdetails - do this here to catch unavailable items early + queue_item.streamdetails = await get_stream_details( + self.mass, queue_item, seek_position=seek_position, fade_in=fade_in + ) await self.mass.players.play_media( player_id=queue_id, queue_item=queue_item, - seek_position=int(seek_position), - fade_in=fade_in, ) # Interaction with player @@ -703,6 +729,9 @@ class PlayerQueuesController(CoreController): if player.player_id not in self._queues: # race condition return + if player.announcement_in_progress: + # do nothing while the announcement is in progress + return queue_id = player.player_id player = self.mass.players.get(queue_id) queue = self._queues[queue_id] @@ -714,33 +743,38 @@ class PlayerQueuesController(CoreController): # determine if this queue is currently active for this player queue.active = player.powered and player.active_source == queue.queue_id if not queue.active: + # return early if the queue is not active queue.state = PlayerState.IDLE - self._prev_states.pop(queue_id, None) + if prev_state := self._prev_states.pop(queue_id, None): + self.signal_update(queue_id) return # update current item from player report if queue.flow_mode: # flow mode active, calculate current item queue.current_index, queue.elapsed_time = self.__get_queue_stream_index(queue, player) + queue.elapsed_time_last_updated = time.time() else: # queue is active and player has one of our tracks loaded, update state if item_id := self._parse_player_current_item_id(queue_id, player.current_item_id): queue.current_index = self.index_by_id(queue_id, item_id) - queue.elapsed_time = int(player.corrected_elapsed_time) + if player.state == PlayerState.PLAYING: + queue.elapsed_time = int(player.corrected_elapsed_time) + queue.elapsed_time_last_updated = player.elapsed_time_last_updated # only update these attributes if the queue is active # and has an item loaded so we are able to resume it queue.state = player.state - queue.elapsed_time_last_updated = time.time() queue.current_item = self.get_item(queue_id, queue.current_index) queue.next_item = self._get_next_item(queue_id) # correct elapsed time when seeking if ( queue.current_item and queue.current_item.streamdetails - and queue.current_item.streamdetails.seconds_skipped + and queue.current_item.streamdetails.seek_position + and player.state in (PlayerState.PLAYING, PlayerState.PAUSED) and not queue.flow_mode ): - queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped + queue.elapsed_time += queue.current_item.streamdetails.seek_position # basic throttle: do not send state changed events if queue did not actually change prev_state = self._prev_states.get(queue_id, {}) @@ -972,6 +1006,9 @@ class PlayerQueuesController(CoreController): return if prev_state.get("state") != PlayerState.PLAYING: return + if (player := self.mass.players.get(queue.queue_id)) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return current_item = self.get_item(queue.queue_id, queue.current_index) if not current_item: return # guard, just in case something bad happened @@ -985,6 +1022,11 @@ class PlayerQueuesController(CoreController): seconds_remaining = int(duration - player.corrected_elapsed_time) async def _enqueue_next(index: int, supports_enqueue: bool = False) -> None: + if ( + player := self.mass.players.get(queue.queue_id) + ) and player.announcement_in_progress: + LOGGER.warning("Ignore queue command: An announcement is in progress") + return with suppress(QueueEmpty): next_item = await self.preload_next_item(queue.queue_id, index) if supports_enqueue: @@ -1162,8 +1204,8 @@ class PlayerQueuesController(CoreController): queue_index += 1 else: # no more seconds left to divide, this is our track - # account for any seeking by adding the skipped seconds - track_sec_skipped = queue_track.streamdetails.seconds_skipped or 0 + # account for any seeking by adding the skipped/seeked seconds + track_sec_skipped = queue_track.streamdetails.seek_position track_time = elapsed_time_queue + track_sec_skipped - total_time break return queue_index, track_time diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 74a46398..f49f0dca 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -5,13 +5,16 @@ from __future__ import annotations import asyncio import functools import logging +from contextlib import suppress from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast import shortuuid from music_assistant.common.helpers.util import get_changed_values from music_assistant.common.models.enums import ( + ContentType, EventType, + MediaType, PlayerFeature, PlayerState, PlayerType, @@ -24,7 +27,10 @@ from music_assistant.common.models.errors import ( ProviderUnavailableError, UnsupportedFeaturedException, ) +from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.queue_item import QueueItem +from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import ( CONF_AUTO_PLAY, CONF_GROUP_MEMBERS, @@ -41,7 +47,7 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable, Coroutine, Iterable, Iterator from music_assistant.common.models.config_entries import CoreConfig - from music_assistant.common.models.queue_item import QueueItem + LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.players") @@ -50,14 +56,14 @@ _R = TypeVar("_R") _P = ParamSpec("_P") -def log_player_command( +def handle_player_command( func: Callable[Concatenate[_PlayerControllerT, _P], Awaitable[_R]], ) -> Callable[Concatenate[_PlayerControllerT, _P], Coroutine[Any, Any, _R | None]]: """Check and log commands to players.""" @functools.wraps(func) async def wrapper(self: _PlayerControllerT, *args: _P.args, **kwargs: _P.kwargs) -> _R | None: - """Log and log_player_command commands to players.""" + """Log and handle_player_command commands to players.""" player_id = kwargs["player_id"] if "player_id" in kwargs else args[0] if (player := self._players.get(player_id)) is None or not player.available: # player not existent @@ -67,6 +73,7 @@ def log_player_command( player_id, ) return + self.logger.debug( "Handling command %s for player %s", func.__name__, @@ -321,7 +328,7 @@ class PlayerController(CoreController): # Player commands @api_command("players/cmd/stop") - @log_player_command + @handle_player_command async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player. @@ -332,7 +339,7 @@ class PlayerController(CoreController): await player_provider.cmd_stop(player_id) @api_command("players/cmd/play") - @log_player_command + @handle_player_command async def cmd_play(self, player_id: str) -> None: """Send PLAY (unpause) command to given player. @@ -343,7 +350,7 @@ class PlayerController(CoreController): await player_provider.cmd_play(player_id) @api_command("players/cmd/pause") - @log_player_command + @handle_player_command async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player. @@ -380,7 +387,7 @@ class PlayerController(CoreController): self.mass.create_task(_watch_pause(player_id)) @api_command("players/cmd/play_pause") - @log_player_command + @handle_player_command async def cmd_play_pause(self, player_id: str) -> None: """Toggle play/pause on given player. @@ -393,7 +400,7 @@ class PlayerController(CoreController): await self.cmd_play(player_id) @api_command("players/cmd/power") - @log_player_command + @handle_player_command async def cmd_power(self, player_id: str, powered: bool) -> None: """Send POWER command to given player. @@ -455,7 +462,7 @@ class PlayerController(CoreController): await self.mass.player_queues.resume(player_id) @api_command("players/cmd/volume_set") - @log_player_command + @handle_player_command async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. @@ -475,7 +482,7 @@ class PlayerController(CoreController): await player_provider.cmd_volume_set(player_id, volume_level) @api_command("players/cmd/volume_up") - @log_player_command + @handle_player_command async def cmd_volume_up(self, player_id: str) -> None: """Send VOLUME_UP command to given player. @@ -485,7 +492,7 @@ class PlayerController(CoreController): await self.cmd_volume_set(player_id, new_volume) @api_command("players/cmd/volume_down") - @log_player_command + @handle_player_command async def cmd_volume_down(self, player_id: str) -> None: """Send VOLUME_DOWN command to given player. @@ -495,7 +502,7 @@ class PlayerController(CoreController): await self.cmd_volume_set(player_id, new_volume) @api_command("players/cmd/group_volume") - @log_player_command + @handle_player_command async def cmd_group_volume(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given playergroup. @@ -554,7 +561,7 @@ class PlayerController(CoreController): self.update(player_id) @api_command("players/cmd/volume_mute") - @log_player_command + @handle_player_command async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: """Send VOLUME_MUTE command to given player. @@ -585,13 +592,31 @@ class PlayerController(CoreController): player_prov = self.mass.players.get_player_provider(player_id) await player_prov.cmd_seek(player_id, position) - async def play_media( + @api_command("players/cmd/play_announcement") + async def play_announcement( self, player_id: str, - queue_item: QueueItem, - seek_position: int, - fade_in: bool, + url: str, + use_pre_announce: bool = False, ) -> None: + """Handle playback of an announcement (url) on given player.""" + player = self.get(player_id, True) + if player.announcement_in_progress: + return + try: + # mark announcement_in_progress on player + player.announcement_in_progress = True + # check for native announce support + if PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features: + if prov := self.mass.get_provider(player.provider): + await prov.play_announcement(player_id, url, use_pre_announce) + return + # use fallback/default implementation + await self._play_announcement(player, url, use_pre_announce) + finally: + player.announcement_in_progress = False + + async def play_media(self, player_id: str, queue_item: QueueItem) -> None: """Handle PLAY MEDIA on given player. This is called by the Queue controller to start playing a queue item on the given player. @@ -599,28 +624,19 @@ class PlayerController(CoreController): - player_id: player_id of the player to handle the command. - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. """ if player_id.startswith(SYNCGROUP_PREFIX): # redirect to syncgroup-leader if needed await self.cmd_group_power(player_id, True) group_player = self.get(player_id, True) if sync_leader := self.get_sync_leader(group_player): - await self.play_media( - sync_leader.player_id, - queue_item=queue_item, - seek_position=seek_position, - fade_in=fade_in, - ) + await self.play_media(sync_leader.player_id, queue_item=queue_item) group_player.state = PlayerState.PLAYING return player_prov = self.mass.players.get_player_provider(player_id) await player_prov.play_media( player_id=player_id, queue_item=queue_item, - seek_position=int(seek_position), - fade_in=fade_in, ) async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: @@ -650,7 +666,7 @@ class PlayerController(CoreController): await player_prov.enqueue_next_queue_item(player_id=player_id, queue_item=queue_item) @api_command("players/cmd/sync") - @log_player_command + @handle_player_command async def cmd_sync(self, player_id: str, target_player: str) -> None: """Handle SYNC command for given player. @@ -691,7 +707,7 @@ class PlayerController(CoreController): await player_provider.cmd_sync(player_id, target_player) @api_command("players/cmd/unsync") - @log_player_command + @handle_player_command async def cmd_unsync(self, player_id: str) -> None: """Handle UNSYNC command for given player. @@ -1049,3 +1065,128 @@ class PlayerController(CoreController): if new_power: # if a child player turned ON while the group player is on, we need to resync/resume self.mass.create_task(self._sync_syncgroup(group_player.player_id)) + + async def _play_announcement( + self, + player: Player, + url: str, + use_pre_announce: bool | None = None, + ) -> None: + """Handle (default/fallback) implementation of the play announcement feature. + + This default implementation will; + - stop playback of the current media (if needed) + - power on the player (if needed) + - raise the volume a bit + - play the announcement (from given url) + - wait for the player to finish playing + - restore the previous power and volume + - restore playback (if needed and if possible) + + This default implementation will only be used if the player's + provider has no native support for the PLAY_ANNOUNCEMENT feature. + """ + if player.synced_to: + # redirect to sync master if player is group child + self.mass.create_task(self.play_announcement(player.synced_to, url)) + return + if active_group := self._get_active_player_group(player): + # redirect to group player if playergroup is atcive + self.mass.create_task(self.play_announcement(active_group.player_id, url)) + return + self.logger.info( + "Playback announcement to player %s (with pre-announce: %s): %s", + player.display_name, + use_pre_announce, + url, + ) + # use stream server to host announcement on local network + # this ensures playback on all players, including ones that do not + # like https hosts and it also offers the pre-announce 'bell' + url = self.mass.streams.get_announcement_url(player.player_id, url, use_pre_announce) + # create a queue item for the announcement so + # we can send a regular play-media call downstream + queue_item = QueueItem( + queue_id=player.player_id, + queue_item_id=url, + name="Announcement", + duration=None, + streamdetails=StreamDetails( + provider="url", + item_id=url, + audio_format=AudioFormat( + content_type=ContentType.try_parse(url), + ), + media_type=MediaType.ANNOUNCEMENT, + direct=url, + data=url, + target_loudness=-10, + ), + ) + prev_power = player.powered + prev_volume = player.volume_level + prev_state = player.state + queue = self.mass.player_queues.get_active_queue(player.player_id) + prev_queue_active = queue.active + prev_item_id = player.current_item_id + # stop player if its currently playing + if prev_state in (PlayerState.PLAYING, PlayerState.PAUSED): + self.logger.debug( + "Announcement to player %s - stop existing content (%s)...", + player.display_name, + prev_item_id, + ) + await self.cmd_stop(player.player_id) + # wait for the player to stop + with suppress(TimeoutError): + await self.wait_for_state(player, PlayerState.IDLE, 5) + # increase volume a bit + temp_volume = int(min(75, prev_volume * 1.5)) + if temp_volume > prev_volume: + self.logger.debug( + "Announcement to player %s - setting temporary volume (%s)...", + player.display_name, + temp_volume, + ) + await self.cmd_volume_set(player.player_id, temp_volume) + # play the announcement + self.logger.debug( + "Announcement to player %s - playing the announcement on the player...", + player.display_name, + ) + await self.play_media(player_id=player.player_id, queue_item=queue_item) + # wait for the player to play + with suppress(TimeoutError): + await self.wait_for_state(player, PlayerState.PLAYING, 5) + self.logger.debug( + "Announcement to player %s - waiting on the player to stop playing...", + player.display_name, + ) + # wait for the player to stop playing + with suppress(TimeoutError): + await self.wait_for_state(player, PlayerState.IDLE, 30) + self.logger.debug( + "Announcement to player %s - restore previous state...", player.display_name + ) + # restore volume + if temp_volume != prev_volume: + await self.cmd_volume_set(player.player_id, prev_volume) + player.current_item_id = prev_item_id + # either power off the player or resume playing + if not prev_power: + await self.cmd_power(player.player_id, False) + return + elif prev_queue_active and prev_state == PlayerState.PLAYING: + await self.mass.player_queues.resume(queue.queue_id, True) + elif prev_state == PlayerState.PLAYING: + # player was playing something else - try to resume that here + self.logger.warning("Can not resume %s on %s", prev_item_id, player.display_name) + # TODO !! + + async def wait_for_state( + self, player: Player, wanted_state: PlayerState, timeout: float = 60.0 + ) -> None: + """Wait for the given player to reach the given state.""" + async with asyncio.timeout(timeout): + while player.state != wanted_state: + await asyncio.sleep(0.1) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index e9db6aae..26d7bd10 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -18,16 +18,22 @@ from typing import TYPE_CHECKING import shortuuid from aiohttp import web -from music_assistant.common.helpers.util import empty_queue, get_ip, select_free_port +from music_assistant.common.helpers.util import ( + empty_queue, + get_ip, + select_free_port, + try_parse_bool, +) from music_assistant.common.models.config_entries import ( ConfigEntry, ConfigValueOption, ConfigValueType, ) from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType -from music_assistant.common.models.errors import MediaNotFoundError, QueueEmpty +from music_assistant.common.models.errors import QueueEmpty from music_assistant.common.models.media_items import AudioFormat from music_assistant.constants import ( + ANNOUNCE_ALERT_FILE, CONF_BIND_IP, CONF_BIND_PORT, CONF_CROSSFADE, @@ -43,7 +49,6 @@ from music_assistant.server.helpers.audio import ( get_ffmpeg_stream, get_media_stream, get_player_filter_params, - get_stream_details, ) from music_assistant.server.helpers.util import get_ips from music_assistant.server.helpers.webserver import Webserver @@ -254,6 +259,7 @@ class StreamsController(CoreController): "some player specific local control callbacks." ) self.manifest.icon = "cast-audio" + self.announcements: dict[str, str] = {} @property def base_url(self) -> str: @@ -351,6 +357,11 @@ class StreamsController(CoreController): "/command/{queue_id}/{command}.mp3", self.serve_command_request, ), + ( + "*", + "/announcement/{player_id}.{fmt}", + self.serve_announcement_stream, + ), ], ) @@ -363,12 +374,13 @@ class StreamsController(CoreController): player_id: str, queue_item: QueueItem, output_codec: ContentType, - seek_position: int = 0, - fade_in: bool = False, flow_mode: bool = False, ) -> str: """Resolve the stream URL for the given QueueItem.""" fmt = output_codec.value + # handle announcement item + if queue_item.media_type == MediaType.ANNOUNCEMENT: + return queue_item.queue_item_id # handle request for multi client queue stream stream_job = self.multi_client_jobs.get(queue_item.queue_id) if queue_item.queue_item_id == "flow" or stream_job and stream_job.pending: @@ -379,10 +391,6 @@ class StreamsController(CoreController): query_params = {} base_path = "flow" if flow_mode else "single" url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501 - if seek_position: - query_params["seek_position"] = str(seek_position) - if fade_in: - query_params["fade_in"] = "1" # we add a timestamp as basic checksum # most importantly this is to invalidate any caches # but also to handle edge cases such as single track repeat @@ -394,8 +402,6 @@ class StreamsController(CoreController): self, queue_id: str, start_queue_item: QueueItem, - seek_position: int = 0, - fade_in: bool = False, pcm_bit_depth: int = 24, pcm_sample_rate: int = 48000, expected_players: set[str] | None = None, @@ -424,8 +430,6 @@ class StreamsController(CoreController): queue=queue, start_queue_item=start_queue_item, pcm_format=pcm_format, - seek_position=seek_position, - fade_in=fade_in, ), pcm_format=pcm_format, expected_players=expected_players or set(), @@ -444,15 +448,8 @@ class StreamsController(CoreController): queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id) if not queue_item: raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}") - try: - queue_item.streamdetails = await get_stream_details(self.mass, queue_item=queue_item) - except MediaNotFoundError: - raise web.HTTPNotFound( - reason=f"Unable to retrieve streamdetails for item: {queue_item}" - ) - seek_position = int(request.query.get("seek_position", 0)) - queue_item.streamdetails.seconds_skipped = seek_position - fade_in = bool(request.query.get("fade_in", 0)) + if not queue_item.streamdetails: + raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}") # work out output format/details output_format = await self._get_output_format( output_format_str=request.match_info["fmt"], @@ -460,7 +457,6 @@ class StreamsController(CoreController): default_sample_rate=queue_item.streamdetails.audio_format.sample_rate, default_bit_depth=queue_item.streamdetails.audio_format.bit_depth, ) - # prepare request, add some DLNA/UPNP compatible headers headers = { **DEFAULT_STREAM_HEADERS, @@ -496,8 +492,6 @@ class StreamsController(CoreController): self.mass, streamdetails=queue_item.streamdetails, pcm_format=pcm_format, - seek_position=seek_position, - fade_in=fade_in, ), input_format=pcm_format, output_format=output_format, @@ -520,8 +514,6 @@ class StreamsController(CoreController): start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id) if not start_queue_item: raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}") - seek_position = int(request.query.get("seek_position", 0)) - fade_in = bool(request.query.get("fade_in", 0)) queue_player = self.mass.players.get(queue_id) # work out output format/details output_format = await self._get_output_format( @@ -565,8 +557,6 @@ class StreamsController(CoreController): queue=queue, start_queue_item=start_queue_item, pcm_format=pcm_format, - seek_position=seek_position, - fade_in=fade_in, ), input_format=pcm_format, output_format=output_format, @@ -700,17 +690,94 @@ class StreamsController(CoreController): self.mass.create_task(self.mass.player_queues.next(queue_id)) return web.FileResponse(SILENCE_FILE) + async def serve_announcement_stream(self, request: web.Request) -> web.Response: + """Stream announcement audio to a player.""" + self._log_request(request) + player_id = request.match_info["player_id"] + player = self.mass.player_queues.get(player_id) + if not player: + raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}") + if player_id not in self.announcements: + raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}") + announcement = self.announcements[player_id] + use_pre_announce = try_parse_bool(request.query.get("pre_announce")) + + # work out output format/details + fmt = request.match_info.get("fmt", announcement.rsplit(".")[-1]) + audio_format = AudioFormat(content_type=ContentType.try_parse(fmt)) + # prepare request, add some DLNA/UPNP compatible headers + headers = { + **DEFAULT_STREAM_HEADERS, + "Content-Type": f"audio/{audio_format.output_format_str}", + } + resp = web.StreamResponse( + status=200, + reason="OK", + headers=headers, + ) + await resp.prepare(request) + + # return early if this is not a GET request + if request.method != "GET": + return resp + + # all checks passed, start streaming! + self.logger.debug( + "Start serving audio stream for Announcement %s to %s", + announcement, + player.display_name, + ) + extra_args = [] + filter_params = ["loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5"] + if use_pre_announce: + extra_args += [ + "-i", + ANNOUNCE_ALERT_FILE, + "-filter_complex", + "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5", + ] + filter_params = [] + + async for chunk in get_ffmpeg_stream( + audio_input=announcement, + input_format=audio_format, + output_format=audio_format, + extra_args=extra_args, + filter_params=filter_params, + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError): + break + + self.logger.debug( + "Finished serving audio stream for Announcement %s to %s", + announcement, + player.display_name, + ) + + return resp + def get_command_url(self, player_or_queue_id: str, command: str) -> str: """Get the url for the special command stream.""" return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3" + def get_announcement_url( + self, + player_id: str, + announcement_url: str, + use_pre_announce: bool = False, + content_type: ContentType = ContentType.MP3, + ) -> str: + """Get the url for the special announcement stream.""" + self.announcements[player_id] = announcement_url + return f"{self.base_url}/announcement/{player_id}.{content_type.value}?pre_announce={use_pre_announce}" # noqa: E501 + async def get_flow_stream( self, queue: PlayerQueue, start_queue_item: QueueItem, pcm_format: AudioFormat, - seek_position: int = 0, - fade_in: bool = False, ) -> AsyncGenerator[bytes, None]: """Get a flow stream of all tracks in the queue as raw PCM audio.""" # ruff: noqa: PLR0915 @@ -735,15 +802,17 @@ class StreamsController(CoreController): # get (next) queue item to stream if queue_track is None: queue_track = start_queue_item - queue_track.streamdetails = await get_stream_details(self.mass, queue_track) else: - seek_position = 0 - fade_in = False try: queue_track = await self.mass.player_queues.preload_next_item(queue.queue_id) except QueueEmpty: break + if queue_track.streamdetails is None: + raise RuntimeError( + "No Streamdetails known for queue item %s", queue_track.queue_item_id + ) + self.logger.debug( "Start Streaming queue track: %s (%s) for queue %s", queue_track.streamdetails.uri, @@ -760,7 +829,6 @@ class StreamsController(CoreController): queue.queue_id, CONF_CROSSFADE_DURATION, 8 ) crossfade_size = int(pcm_sample_size * crossfade_duration) - queue_track.streamdetails.seconds_skipped = seek_position buffer_size = int(pcm_sample_size * 2) # 2 seconds if use_crossfade: buffer_size += crossfade_size @@ -771,8 +839,6 @@ class StreamsController(CoreController): self.mass, queue_track.streamdetails, pcm_format=pcm_format, - seek_position=seek_position, - fade_in=fade_in, # strip silence from begin/end if track is being crossfaded strip_silence_begin=use_crossfade, strip_silence_end=use_crossfade, @@ -837,7 +903,8 @@ class StreamsController(CoreController): # this also accounts for crossfade and silence stripping queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size queue_track.streamdetails.duration = ( - seek_position + queue_track.streamdetails.seconds_streamed + queue_track.streamdetails.seconds_skipped + or 0 + queue_track.streamdetails.seconds_streamed ) total_bytes_written += bytes_written self.logger.debug( diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index defcffbd..884493fe 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -234,17 +234,23 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) - analyze_jobs.discard(streamdetails.uri) -async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> StreamDetails: +async def get_stream_details( + mass: MusicAssistant, + queue_item: QueueItem, + seek_position: int = 0, + fade_in: bool = False, +) -> StreamDetails: """Get streamdetails for the given QueueItem. This is called just-in-time when a PlayerQueue wants a MediaItem to be played. Do not try to request streamdetails in advance as this is expiring data. param media_item: The QueueItem for which to request the streamdetails for. """ - if queue_item.streamdetails and (time() < (queue_item.streamdetails.expires - 360)): - LOGGER.debug(f"Using cached streamdetails from queue_item for {queue_item.uri}") + if queue_item.streamdetails and (time() < queue_item.streamdetails.expires): + LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}") # we already have (fresh) streamdetails stored on the queueitem, use these. - # make a copy to prevent we're altering an existing object and introduce race conditions. + # this happens for example while seeking in a track. + # we create a copy (using to/from dict) to ensure the one-time values are cleared streamdetails = StreamDetails.from_dict(queue_item.streamdetails.to_dict()) else: # always request the full item as there might be other qualities available @@ -288,12 +294,17 @@ async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> Str # set queue_id on the streamdetails so we know what is being streamed streamdetails.queue_id = queue_item.queue_id + # handle skip/fade_in details + streamdetails.seek_position = seek_position + streamdetails.fade_in = fade_in # handle volume normalization details if not streamdetails.loudness: streamdetails.loudness = await mass.music.get_track_loudness( streamdetails.item_id, streamdetails.provider ) - if ( + if streamdetails.target_loudness is not None: + streamdetails.target_loudness = streamdetails.target_loudness + elif ( player_settings := await mass.config.get_player_config(streamdetails.queue_id) ) and player_settings.get_value(CONF_VOLUME_NORMALIZATION): streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET) @@ -370,8 +381,6 @@ async def get_media_stream( # noqa: PLR0915 mass: MusicAssistant, streamdetails: StreamDetails, pcm_format: AudioFormat, - seek_position: int = 0, - fade_in: bool = False, strip_silence_begin: bool = False, strip_silence_end: bool = False, ) -> AsyncGenerator[tuple[bool, bytes], None]: @@ -383,9 +392,9 @@ async def get_media_stream( # noqa: PLR0915 """ logger = LOGGER.getChild("media_stream") bytes_sent = 0 - streamdetails.seconds_skipped = seek_position + streamdetails.seconds_skipped = streamdetails.seek_position is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration - if is_radio or seek_position: + if is_radio or streamdetails.seek_position: strip_silence_begin = False # chunk size = 2 seconds of pcm audio pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2) @@ -397,7 +406,9 @@ async def get_media_stream( # noqa: PLR0915 # collect all arguments for ffmpeg filter_params = [] extra_args = [] - seek_pos = seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0 + seek_pos = ( + streamdetails.seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0 + ) if seek_pos: # only use ffmpeg seeking if the provider stream does not support seeking extra_args += ["-ss", str(seek_pos)] @@ -411,7 +422,7 @@ async def get_media_stream( # noqa: PLR0915 filter_rule += f":measured_thresh={streamdetails.loudness.threshold}" filter_rule += ":print_format=json" filter_params.append(filter_rule) - if fade_in: + if streamdetails.fade_in: filter_params.append("afade=type=in:start_time=0:duration=3") ffmpeg_args = _get_ffmpeg_args( input_format=streamdetails.audio_format, @@ -434,7 +445,7 @@ async def get_media_stream( # noqa: PLR0915 """Task that grabs the source audio and feeds it to ffmpeg.""" logger.log(VERBOSE_LOG_LEVEL, "writer started for %s", streamdetails.uri) music_prov = mass.get_provider(streamdetails.provider) - seek_pos = seek_position if streamdetails.can_seek else 0 + seek_pos = streamdetails.seek_position if streamdetails.can_seek else 0 async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos): await ffmpeg_proc.write(audio_chunk) # write eof when last packet is received @@ -507,7 +518,7 @@ async def get_media_stream( # noqa: PLR0915 seconds_streamed, ) # store accurate duration - streamdetails.duration = seek_position + seconds_streamed + streamdetails.duration = streamdetails.seek_position + seconds_streamed else: logger.debug( "stream aborted for %s (%s seconds streamed)", diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index 85b05576..e7cd5f69 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -107,8 +107,6 @@ class PlayerProvider(Provider): self, player_id: str, queue_item: QueueItem, - seek_position: int, - fade_in: bool, ) -> None: """Handle PLAY MEDIA on given player. @@ -117,8 +115,6 @@ class PlayerProvider(Provider): - player_id: player_id of the player to handle the command. - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. """ raise NotImplementedError @@ -137,6 +133,13 @@ class PlayerProvider(Provider): This will NOT be called if the player is using flow mode to playback the queue. """ + async def play_announcement( + self, player_id: str, announcement_url: str, use_pre_announce: bool = False + ) -> None: + """Handle (provider native) playback of an announcement on given player.""" + # will only be called for players with PLAY_ANNOUNCEMENT feature set. + raise NotImplementedError + async def cmd_power(self, player_id: str, powered: bool) -> None: """Send POWER command to given player. diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 99471075..8e4b159f 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -32,6 +32,7 @@ from music_assistant.common.models.config_entries import ( from music_assistant.common.models.enums import ( ConfigEntryType, ContentType, + MediaType, PlayerFeature, PlayerState, PlayerType, @@ -41,7 +42,11 @@ from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL -from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params +from music_assistant.server.helpers.audio import ( + get_ffmpeg_stream, + get_media_stream, + get_player_filter_params, +) from music_assistant.server.helpers.process import check_output from music_assistant.server.models.player_provider import PlayerProvider @@ -256,25 +261,19 @@ class AirplayStreamJob: self._log_reader_task = asyncio.create_task(self._log_watcher()) self._audio_reader_task = asyncio.create_task(self._audio_reader()) - async def stop(self, force=False): + async def stop(self): """Stop playback and cleanup.""" if not self.running: return - await self.send_cli_command("ACTION=STOP") - self._stop_requested = True - if not force: - return - # stop background tasks + # always stop the audio feeder if self._audio_reader_task and not self._audio_reader_task.done(): with suppress(asyncio.CancelledError): self._audio_reader_task.cancel() await self._audio_reader_task - if self._log_reader_task and not self._log_reader_task.done(): - with suppress(asyncio.CancelledError): - self._log_reader_task.cancel() - await self._log_reader_task + await self.send_cli_command("ACTION=STOP") + self._stop_requested = True with suppress(TimeoutError): - await asyncio.wait_for(self._cliraop_proc.communicate(), 5) + await asyncio.wait_for(self._cliraop_proc.wait(), 5) if self._cliraop_proc.returncode is None: self._cliraop_proc.kill() @@ -288,7 +287,7 @@ class AirplayStreamJob: command += "\n" def send_data(): - with open(named_pipe, "w") as f: + with suppress(BrokenPipeError), open(named_pipe, "w") as f: f.write(command) self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) @@ -336,8 +335,8 @@ class AirplayStreamJob: continue if "lost packet out of backlog" in line: lost_packets += 1 - if lost_packets == 30: - logger.warning("Packet loss detected, restart playback...") + if lost_packets == 50: + logger.warning("High packet loss detected, restart playback...") queue = self.mass.player_queues.get_active_queue(mass_player.player_id) await self.mass.player_queues.resume(queue.queue_id) else: @@ -351,7 +350,7 @@ class AirplayStreamJob: "CLIRaop process stopped with errorcode %s", self._cliraop_proc.returncode, ) - if ( + if not airplay_player.active_stream or ( airplay_player.active_stream and airplay_player.active_stream.active_remote_id == self.active_remote_id ): @@ -566,18 +565,11 @@ class AirplayProvider(PlayerProvider): - player_id: player_id of the player to handle the command. """ - - async def stop_player(airplay_player: AirPlayPlayer) -> None: - if airplay_player.active_stream: - await airplay_player.active_stream.stop(force=False) - mass_player = self.mass.players.get(airplay_player.player_id) - mass_player.state = PlayerState.IDLE - self.mass.players.update(airplay_player.player_id) - # forward command to player and any connected sync members async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): - tg.create_task(stop_player(airplay_player)) + if airplay_player.active_stream: + tg.create_task(airplay_player.active_stream.stop()) async def cmd_play(self, player_id: str) -> None: """Send PLAY (unpause) command to given player. @@ -607,19 +599,8 @@ class AirplayProvider(PlayerProvider): self, player_id: str, queue_item: QueueItem, - seek_position: int, - fade_in: bool, ) -> None: - """Handle PLAY MEDIA on given player. - - This is called by the Queue controller to start playing a queue item on the given player. - The provider's own implementation should work out how to handle this request. - - - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. - """ + """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) if player.synced_to: # should not happen, but just in case @@ -631,15 +612,17 @@ class AirplayProvider(PlayerProvider): # always stop existing stream first for airplay_player in self._get_sync_clients(player_id): if airplay_player.active_stream and airplay_player.active_stream.running: - await airplay_player.active_stream.stop(force=True) + await airplay_player.active_stream.stop() pcm_format = AudioFormat( content_type=ContentType.PCM_S16LE, sample_rate=44100, bit_depth=16, channels=2, ) - - if queue_item.queue_item_id == "flow": + if queue_item.media_type == MediaType.ANNOUNCEMENT: + # stream announcement url directly + stream_job = None + elif stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id): # handle special case for UGP multi client stream stream_job = self.mass.streams.multi_client_jobs.get(queue_item.queue_id) elif player.group_childs: @@ -647,8 +630,6 @@ class AirplayProvider(PlayerProvider): stream_job = await self.mass.streams.create_multi_client_stream_job( queue_item.queue_id, queue_item, - seek_position=seek_position, - fade_in=fade_in, pcm_bit_depth=16, pcm_sample_rate=44100, ) @@ -675,6 +656,11 @@ class AirplayProvider(PlayerProvider): player_id=airplay_player.player_id, output_format=pcm_format, ) + elif queue_item.media_type == MediaType.ANNOUNCEMENT: + # stream announcement url directly + audio_iterator = get_media_stream( + self.mass, queue_item.streamdetails, pcm_format=pcm_format + ) else: queue = self.mass.player_queues.get_active_queue(queue_item.queue_id) audio_iterator = get_ffmpeg_stream( @@ -682,8 +668,6 @@ class AirplayProvider(PlayerProvider): queue, start_queue_item=queue_item, pcm_format=pcm_format, - seek_position=seek_position, - fade_in=fade_in, ), input_format=pcm_format, output_format=pcm_format, diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 719e830a..3e6528b2 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -238,19 +238,8 @@ class ChromecastProvider(PlayerProvider): self, player_id: str, queue_item: QueueItem, - seek_position: int, - fade_in: bool, ) -> None: - """Handle PLAY MEDIA on given player. - - This is called by the Queue controller to start playing a queue item on the given player. - The provider's own implementation should work out how to handle this request. - - - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. - """ + """Handle PLAY MEDIA on given player.""" castplayer = self.castplayers[player_id] use_flow_mode = await self.mass.config.get_player_config_value( player_id, CONF_FLOW_MODE @@ -259,8 +248,6 @@ class ChromecastProvider(PlayerProvider): player_id, queue_item=queue_item, output_codec=ContentType.FLAC, - seek_position=seek_position, - fade_in=fade_in, flow_mode=use_flow_mode, ) queuedata = { diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index bfa3e378..d69b8e00 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -346,27 +346,14 @@ class DLNAPlayerProvider(PlayerProvider): self, player_id: str, queue_item: QueueItem, - seek_position: int, - fade_in: bool, ) -> None: - """Handle PLAY MEDIA on given player. - - This is called by the Queue controller to start playing a queue item on the given player. - The provider's own implementation should work out how to handle this request. - - - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. - """ + """Handle PLAY MEDIA on given player.""" use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE) enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) url = await self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - seek_position=seek_position, - fade_in=fade_in, flow_mode=use_flow_mode, ) dlna_player = self.dlnaplayers[player_id] diff --git a/music_assistant/server/providers/fully_kiosk/__init__.py b/music_assistant/server/providers/fully_kiosk/__init__.py index a0887c33..d51e6589 100644 --- a/music_assistant/server/providers/fully_kiosk/__init__.py +++ b/music_assistant/server/providers/fully_kiosk/__init__.py @@ -182,27 +182,14 @@ class FullyKioskProvider(PlayerProvider): self, player_id: str, queue_item: QueueItem, - seek_position: int, - fade_in: bool, ) -> None: - """Handle PLAY MEDIA on given player. - - This is called by the Queue controller to start playing a queue item on the given player. - The provider's own implementation should work out how to handle this request. - - - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. - """ + """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) url = await self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - seek_position=seek_position, - fade_in=fade_in, flow_mode=True, ) await self._fully.playSound(url, AUDIOMANAGER_STREAM_MUSIC) diff --git a/music_assistant/server/providers/hass_players/__init__.py b/music_assistant/server/providers/hass_players/__init__.py index 7323b6ce..10a275d6 100644 --- a/music_assistant/server/providers/hass_players/__init__.py +++ b/music_assistant/server/providers/hass_players/__init__.py @@ -249,27 +249,14 @@ class HomeAssistantPlayers(PlayerProvider): self, player_id: str, queue_item: QueueItem, - seek_position: int, - fade_in: bool, ) -> None: - """Handle PLAY MEDIA on given player. - - This is called by the Queue controller to start playing a queue item on the given player. - The provider's own implementation should work out how to handle this request. - - - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. - """ + """Handle PLAY MEDIA on given player.""" use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE) enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) url = await self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - seek_position=seek_position, - fade_in=fade_in, flow_mode=use_flow_mode, ) await self.hass_prov.hass.call_service( diff --git a/music_assistant/server/providers/plex/__init__.py b/music_assistant/server/providers/plex/__init__.py index 430e4716..defc9989 100644 --- a/music_assistant/server/providers/plex/__init__.py +++ b/music_assistant/server/providers/plex/__init__.py @@ -760,9 +760,6 @@ class PlexProvider(MusicProvider): data=plex_track, ) - if audio_stream.loudness: - stream_details.loudness = audio_stream.loudness - if media_type != ContentType.M4A: stream_details.direct = self._plex_server.url(media_part.key, True) if audio_stream.samplingRate: diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index d2cce790..6e20cf3a 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -325,19 +325,8 @@ class SlimprotoProvider(PlayerProvider): self, player_id: str, queue_item: QueueItem, - seek_position: int, - fade_in: bool, ) -> None: - """Handle PLAY MEDIA on given player. - - This is called by the Queue controller to start playing a queue item on the given player. - The provider's own implementation should work out how to handle this request. - - - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. - """ + """Handle PLAY MEDIA on given player.""" # fix race condition where resync and play media are called at more or less the same time if self._resync_handle: self._resync_handle.cancel() @@ -352,8 +341,6 @@ class SlimprotoProvider(PlayerProvider): stream_job = await self.mass.streams.create_multi_client_stream_job( queue_id=queue_item.queue_id, start_queue_item=queue_item, - seek_position=int(seek_position), - fade_in=fade_in, ) # forward command to player and any connected sync members sync_clients = self._get_sync_clients(player_id) @@ -384,8 +371,6 @@ class SlimprotoProvider(PlayerProvider): # for now just hardcode flac as we assume that every (modern) # slimproto based player can handle that just fine output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - seek_position=seek_position, - fade_in=fade_in, flow_mode=False, ) await self._handle_play_url( diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 8cf61b0d..646ccb2e 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -20,6 +20,7 @@ from music_assistant.common.models.config_entries import ( from music_assistant.common.models.enums import ( ConfigEntryType, ContentType, + MediaType, PlayerFeature, PlayerState, PlayerType, @@ -28,6 +29,7 @@ from music_assistant.common.models.enums import ( from music_assistant.common.models.errors import SetupFailedError from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.server.helpers.audio import get_media_stream from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: @@ -274,8 +276,13 @@ class SnapCastProvider(PlayerProvider): bit_depth=16, channels=2, ) - # handle special case for UGP multi client stream - if stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id): + if queue_item.media_type == MediaType.ANNOUNCEMENT: + # stream announcement url directly + audio_iterator = get_media_stream( + self.mass, queue_item.streamdetails, pcm_format=pcm_format + ) + elif stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id): + # handle special case for UGP multi client stream stream_job.expected_players.add(player_id) audio_iterator = stream_job.subscribe( player_id=player_id, diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 3690e26d..0abe5b53 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -17,6 +17,7 @@ import soco.config as soco_config from requests.exceptions import RequestException from soco import events_asyncio, zonegroupstate from soco.discovery import discover +from sonos_websocket.exception import SonosWebsocketError from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, @@ -54,6 +55,7 @@ PLAYER_FEATURES = ( PlayerFeature.VOLUME_SET, PlayerFeature.ENQUEUE_NEXT, PlayerFeature.PAUSE, + PlayerFeature.PLAY_ANNOUNCEMENT, ) CONF_NETWORK_SCAN = "network_scan" @@ -337,25 +339,12 @@ class SonosPlayerProvider(PlayerProvider): self, player_id: str, queue_item: QueueItem, - seek_position: int, - fade_in: bool, ) -> None: - """Handle PLAY MEDIA on given player. - - This is called by the Queue controller to start playing a queue item on the given player. - The provider's own implementation should work out how to handle this request. - - - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. - - seek_position: Optional seek to this position. - - fade_in: Optionally fade in the item at playback start. - """ + """Handle PLAY MEDIA on given player.""" url = await self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.FLAC, - seek_position=seek_position, - fade_in=fade_in, ) sonos_player = self.sonosplayers[player_id] mass_player = self.mass.players.get(player_id) @@ -410,6 +399,32 @@ class SonosPlayerProvider(PlayerProvider): await self._enqueue_item(sonos_player, url=url, queue_item=queue_item) + async def play_announcement( + self, player_id: str, announcement_url: str, use_pre_announce: bool = False + ) -> None: + """Handle (provider native) playback of an announcement on given player.""" + if use_pre_announce: + announcement_url = self.mass.streams.get_announcement_url( + player_id, announcement_url, True + ) + sonos_player = self.sonosplayers[player_id] + mass_player = self.mass.players.get(player_id) + temp_volume = int(min(75, mass_player.volume_level * 1.5)) + self.logger.debug( + "Playing announcement %s using websocket audioclip on %s", + announcement_url, + sonos_player.zone_name, + ) + try: + response, _ = await sonos_player.websocket.play_clip( + announcement_url, + volume=temp_volume, + ) + except SonosWebsocketError as exc: + raise PlayerCommandFailed(f"Error when calling Sonos websocket: {exc}") from exc + if response["success"]: + return + async def poll_player(self, player_id: str) -> None: """Poll player for state updates. -- 2.34.1