async def get_provider_loudness(self, provider: ProviderType) -> float | None:
"""Get average integrated loudness for tracks of given provider."""
all_items = []
+ if provider == ProviderType.URL:
+ # this is not a very good idea for random urls
+ return None
for db_row in await self.mass.database.get_rows(
TABLE_TRACK_LOUDNESS,
{
from music_assistant.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
- fadein_pcm_part,
get_chunksize,
get_media_stream,
get_preview_stream,
"""Return url to control endpoint."""
return f"{self.base_url}/{queue_id}/{control}"
+ def get_silence_url(
+ self,
+ content_type: ContentType = ContentType.WAV,
+ ) -> str:
+ """Generate stream url for a silence Stream."""
+ ext = content_type.value
+ return f"{self.base_url}/silence.{ext}"
+
async def setup(self) -> None:
"""Async initialize of module."""
app = web.Application()
@staticmethod
async def serve_silence(request: web.Request):
"""Serve some nice silence."""
- duration = int(request.query.get("duration", 60))
+ duration = int(request.query.get("duration", 3600))
fmt = ContentType.try_parse(request.match_info["fmt"])
resp = web.StreamResponse(
seek_position: int,
fade_in: bool,
output_format: ContentType,
- is_alert: bool,
) -> QueueStream:
"""Start running a queue stream."""
# generate unique stream url
streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id)
# work out pcm details
- if is_alert:
- pcm_sample_rate = 41000
+ if streamdetails.media_type == MediaType.ANNOUNCEMENT:
+ pcm_sample_rate = 44100
pcm_bit_depth = 16
pcm_channels = 2
allow_resample = True
pcm_bit_depth=pcm_bit_depth,
pcm_channels=pcm_channels,
allow_resample=allow_resample,
- is_alert=is_alert,
autostart=True,
)
# cleanup stale previous queue tasks
pcm_channels: int = 2,
pcm_floating_point: bool = False,
allow_resample: bool = False,
- is_alert: bool = False,
autostart: bool = False,
):
"""Init QueueStreamJob instance."""
self.pcm_channels = pcm_channels
self.pcm_floating_point = pcm_floating_point
self.allow_resample = allow_resample
- self.is_alert = is_alert
self.url = queue.mass.streams.get_stream_url(stream_id, output_format)
self.mass = queue.mass
# add metadata
"-metadata",
"title=Streaming from Music Assistant",
+ ]
+ # fade-in if needed
+ if self.fade_in:
+ ffmpeg_args += ["-af", "afade=t=in:st=0:d=5"]
+
+ ffmpeg_args += [
# output args
"-f",
self.output_format.value,
if track_count == 1:
queue_index = self.start_index
seek_position = self.seek_position
- fade_in = self.fade_in
else:
next_index = self.queue.get_next_index(queue_index)
# break here if repeat is enabled
break
queue_index = next_index
seek_position = 0
- fade_in = False
self.index_in_buffer = queue_index
# send signal that we've loaded a new track into the buffer
self.queue.signal_update()
break
# check crossfade ability
- use_crossfade = self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED
+ use_crossfade = (
+ self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED
+ and self.queue.settings.crossfade_duration > 0
+ and streamdetails.media_type != MediaType.ANNOUNCEMENT
+ )
+ # do not crossfade tracks of same album
if (
- prev_track is not None
+ use_crossfade
+ and self.queue.settings.crossfade_mode != CrossFadeMode.ALWAYS
+ and prev_track is not None
and prev_track.media_type == MediaType.TRACK
and queue_track.media_type == MediaType.TRACK
):
and new_item.album is not None
and prev_item.album == new_item.album
):
+ self.logger.debug("Skipping crossfade: Tracks are from same album")
use_crossfade = False
prev_track = queue_track
- # calculate sample_size based on PCM params for 200ms of audio
+ # calculate sample_size based on PCM params for 1 second of audio
input_format = ContentType.from_bit_depth(
self.pcm_bit_depth, self.pcm_floating_point
)
- sample_size = get_chunksize(
+ sample_size_per_second = get_chunksize(
input_format,
self.pcm_sample_rate,
self.pcm_bit_depth,
self.pcm_channels,
)
- # buffer size is duration of crossfade + 3 seconds
- crossfade_duration = self.queue.settings.crossfade_duration or fade_in or 1
- crossfade_size = (sample_size * 5) * crossfade_duration
- buf_size = (sample_size * 5) * (crossfade_duration * 3)
- total_size = (sample_size * 5) * (queue_track.duration or 0)
+ crossfade_duration = self.queue.settings.crossfade_duration
+ crossfade_size = sample_size_per_second * crossfade_duration
+ # buffer_duration has some overhead to account for padded silence
+ buffer_duration = (crossfade_duration or 2) * 2
+ # predict total size to expect for this track from duration
+ stream_duration = (queue_track.duration or 0) - seek_position
self.logger.info(
- "Start Streaming queue track: %s (%s) for queue %s",
+ "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s",
queue_track.uri,
queue_track.name,
self.queue.player.name,
+ use_crossfade,
)
queue_track.streamdetails.seconds_skipped = seek_position
- chunk_count = 0
buffer = b""
bytes_written = 0
# handle incoming audio chunks
sample_rate=self.pcm_sample_rate,
channels=self.pcm_channels,
seek_position=seek_position,
+ chunk_size=sample_size_per_second,
):
- chunk_count += 1
+
+ seconds_streamed = bytes_written / sample_size_per_second
+ seconds_in_buffer = len(buffer) / sample_size_per_second
#### HANDLE FIRST PART OF TRACK
queue_track.streamdetails.seconds_streamed = 0
break
- # track has no duration or duration < 30s: pypass any further processing
- if queue_track.duration is None or queue_track.duration < 30:
- bytes_written += len(chunk)
+ # bypass any processing for radiostreams and announcements
+ if (
+ streamdetails.media_type == MediaType.ANNOUNCEMENT
+ or not stream_duration
+ ):
yield chunk
- continue
-
- # first part of track and we need to (cross)fade: fill buffer
- if bytes_written < buf_size and (last_fadeout_part or fade_in):
- bytes_written += len(chunk)
- buffer += chunk
- continue
-
- # last part of track: fill buffer
- if bytes_written >= (total_size - buf_size):
bytes_written += len(chunk)
- buffer += chunk
continue
- # buffer full for fade-in / crossfade
- if buffer and (last_fadeout_part or fade_in):
- # strip silence of start and create fade-in part
+ # buffer full for crossfade
+ if last_fadeout_part and (seconds_in_buffer >= buffer_duration):
+ # strip silence of start
first_part = await strip_silence(
buffer + chunk, pcm_fmt, self.pcm_sample_rate
)
-
- if last_fadeout_part:
- # crossfade
- fadein_part = first_part[:crossfade_size]
- remaining_bytes = first_part[crossfade_size:]
- crossfade_part = await crossfade_pcm_parts(
- fadein_part,
- last_fadeout_part,
- crossfade_duration,
- pcm_fmt,
- self.pcm_sample_rate,
- )
- # send crossfade_part
- yield crossfade_part
- bytes_written += len(crossfade_part)
- # also write the leftover bytes from the strip action
- if remaining_bytes:
- yield remaining_bytes
- bytes_written += len(remaining_bytes)
- else:
- # fade-in
- fadein_part = await fadein_pcm_part(
- first_part,
- fade_in,
- pcm_fmt,
- self.pcm_sample_rate,
- )
- yield fadein_part
- bytes_written += len(fadein_part)
+ # perform crossfade
+ fadein_part = first_part[:crossfade_size]
+ remaining_bytes = first_part[crossfade_size:]
+ crossfade_part = await crossfade_pcm_parts(
+ fadein_part,
+ last_fadeout_part,
+ crossfade_duration,
+ pcm_fmt,
+ self.pcm_sample_rate,
+ )
+ # send crossfade_part
+ yield crossfade_part
+ bytes_written += len(crossfade_part)
+ # also write the leftover bytes from the strip action
+ if remaining_bytes:
+ yield remaining_bytes
+ bytes_written += len(remaining_bytes)
# clear vars
last_fadeout_part = b""
buffer = b""
continue
- # all other: middle of track or no fade actions, just yield the audio
- bytes_written += len(chunk)
+ # first part of track and we need to crossfade: fill buffer
+ if last_fadeout_part:
+ buffer += chunk
+ continue
+
+ # last part of track: fill buffer
+ if buffer or (seconds_streamed >= (stream_duration - buffer_duration)):
+ buffer += chunk
+ continue
+
+ # all other: middle of track or no crossfade action, just yield the audio
yield chunk
+ bytes_written += len(chunk)
continue
#### HANDLE END OF TRACK
+ self.logger.debug(
+ "end of track reached - seconds_streamed: %s - seconds_in_buffer: %s - stream_duration: %s",
+ seconds_streamed,
+ seconds_in_buffer,
+ stream_duration,
+ )
if buffer:
# strip silence from end of audio
last_part = await strip_silence(
buffer, pcm_fmt, self.pcm_sample_rate, reverse=True
)
-
- # handle crossfading support
- # store fade section to be picked up for next track
-
- if use_crossfade:
- # crossfade is enabled, save fadeout part to pickup for next track
- last_part = last_part[-crossfade_size:]
+ # if crossfade is enabled, save fadeout part to pickup for next track
+ if use_crossfade and len(last_part) > crossfade_size:
+ # yield remaining bytes from strip action,
+ # we only need the crossfade_size part
+ last_fadeout_part = last_part[-crossfade_size:]
remaining_bytes = last_part[:-crossfade_size]
- # yield remaining bytes
- bytes_written += len(remaining_bytes)
yield remaining_bytes
+ bytes_written += len(remaining_bytes)
+ elif use_crossfade:
last_fadeout_part = last_part
else:
# no crossfade enabled, just yield the stripped audio data
- bytes_written += len(last_part)
yield last_part
+ bytes_written += len(last_part)
- # end of the track reached
- queue_track.streamdetails.seconds_streamed = bytes_written / sample_size
+ # end of the track reached - store accurate duration
+ buffer = b""
+ queue_track.streamdetails.seconds_streamed = (
+ bytes_written / sample_size_per_second
+ )
self.logger.debug(
"Finished Streaming queue track: %s (%s) on queue %s",
queue_track.uri,
from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple
import aiofiles
-from aiohttp import ClientError, ClientTimeout
+from aiohttp import ClientTimeout
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.util import create_tempfile
]
async with AsyncProcess(args, True) as proc:
crossfade_data, _ = await proc.communicate(fade_in_part)
+ if crossfade_data:
+ LOGGER.debug(
+ "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s",
+ len(fade_in_part),
+ len(fade_out_part),
+ len(crossfade_data),
+ )
+ return crossfade_data
+ # no crossfade_data, return original data instead
LOGGER.debug(
- "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s",
+ "crossfade of pcm chunks failed: not enough data. fade_in_part: %s - fade_out_part: %s",
len(fade_in_part),
len(fade_out_part),
- len(crossfade_data),
)
- return crossfade_data
-
-
-async def fadein_pcm_part(
- pcm_audio: bytes,
- fade_length: int,
- fmt: ContentType,
- sample_rate: int,
- channels: int = 2,
-) -> bytes:
- """Fadein chunk of pcm/raw audio using ffmpeg."""
- args = [
- # generic args
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "quiet",
- # fade_in part (stdin)
- "-acodec",
- fmt.name.lower(),
- "-f",
- fmt.value,
- "-ac",
- str(channels),
- "-ar",
- str(sample_rate),
- "-i",
- "-",
- # filter args
- "-af",
- f"afade=type=in:start_time=0:duration={fade_length}",
- # output args
- "-f",
- fmt.value,
- "-",
- ]
- async with AsyncProcess(args, True) as proc:
- result_audio, _ = await proc.communicate(pcm_audio)
- return result_audio
+ return fade_out_part + fade_in_part
async def strip_silence(
]
# filter args
if reverse:
- args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"]
+ args += [
+ "-af",
+ "areverse,atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02,areverse",
+ ]
else:
- args += ["-af", "silenceremove=1:0:-50dB:detection=peak"]
+ args += [
+ "-af",
+ "atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02",
+ ]
# output args
args += ["-f", fmt.value, "-"]
async with AsyncProcess(args, True) as proc:
queue_item.streamdetails.seconds_skipped = 0
queue_item.streamdetails.seconds_streamed = 0
streamdetails = queue_item.streamdetails
-
- # fetch streamdetails from provider
- # always request the full item as there might be other qualities available
- full_item = await mass.music.get_item_by_uri(queue_item.uri)
- # sort by quality and check track availability
- for prov_media in sorted(
- full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True
- ):
- if not prov_media.available:
- continue
- # get streamdetails from provider
- music_prov = mass.music.get_provider(prov_media.prov_id)
- if not music_prov or not music_prov.available:
- continue # provider temporary unavailable ?
- try:
- streamdetails: StreamDetails = await music_prov.get_stream_details(
- prov_media.item_id
- )
- streamdetails.content_type = ContentType(streamdetails.content_type)
- except MusicAssistantError as err:
- LOGGER.warning(str(err))
- else:
- break
+ else:
+ # fetch streamdetails from provider
+ # always request the full item as there might be other qualities available
+ full_item = await mass.music.get_item_by_uri(queue_item.uri)
+ # sort by quality and check track availability
+ for prov_media in sorted(
+ full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True
+ ):
+ if not prov_media.available:
+ continue
+ # get streamdetails from provider
+ music_prov = mass.music.get_provider(prov_media.prov_id)
+ if not music_prov or not music_prov.available:
+ continue # provider temporary unavailable ?
+ try:
+ streamdetails: StreamDetails = await music_prov.get_stream_details(
+ prov_media.item_id
+ )
+ streamdetails.content_type = ContentType(streamdetails.content_type)
+ except MusicAssistantError as err:
+ LOGGER.warning(str(err))
+ else:
+ break
if not streamdetails:
raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
sample_rate: int,
channels: int = 2,
seek_position: int = 0,
+ chunk_size: int = 64000,
) -> AsyncGenerator[bytes, None]:
"""Get the PCM audio stream for the given streamdetails."""
assert pcm_fmt.is_pcm(), "Output format must be a PCM type"
ffmpeg_proc.attach_task(writer())
# yield chunks from stdout
- sample_size = get_chunksize(pcm_fmt, sample_rate, 24, channels, 10)
try:
- async for chunk in ffmpeg_proc.iter_any(sample_size):
+ async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
yield chunk
except (asyncio.CancelledError, GeneratorExit) as err:
) -> AsyncGenerator[bytes, None]:
"""Get radio audio stream from HTTP, including metadata retrieval."""
headers = {"Icy-MetaData": "1"}
- timeout = ClientTimeout(total=0, connect=10, sock_read=10)
- reconnects = 0
- while True:
- # in loop to reconnect on connection failure
- try:
- LOGGER.debug("radio stream (re)connecting to: %s", url)
- async with mass.http_session.get(
- url, headers=headers, timeout=timeout
- ) as resp:
- headers = resp.headers
- meta_int = int(headers.get("icy-metaint", "0"))
- # stream with ICY Metadata
- if meta_int:
- while True:
- audio_chunk = await resp.content.readexactly(meta_int)
- yield audio_chunk
- meta_byte = await resp.content.readexactly(1)
- meta_length = ord(meta_byte) * 16
- meta_data = await resp.content.readexactly(meta_length)
- if not meta_data:
- continue
- meta_data = meta_data.rstrip(b"\0")
- stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
- if not stream_title:
- continue
- stream_title = stream_title.group(1).decode()
- if stream_title != streamdetails.stream_title:
- streamdetails.stream_title = stream_title
- if queue := mass.players.get_player_queue(
- streamdetails.queue_id
- ):
- queue.signal_update()
- # Regular HTTP stream
- else:
- async for chunk in resp.content.iter_any():
- yield chunk
- except (asyncio.exceptions.TimeoutError, ClientError) as err:
- # reconnect on http error (max 5 times)
- if reconnects >= 5:
- raise err
- reconnects += 1
+ timeout = ClientTimeout(total=0, connect=30, sock_read=120)
+ async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
+ headers = resp.headers
+ meta_int = int(headers.get("icy-metaint", "0"))
+ # stream with ICY Metadata
+ if meta_int:
+ while True:
+ audio_chunk = await resp.content.readexactly(meta_int)
+ yield audio_chunk
+ meta_byte = await resp.content.readexactly(1)
+ meta_length = ord(meta_byte) * 16
+ meta_data = await resp.content.readexactly(meta_length)
+ if not meta_data:
+ continue
+ meta_data = meta_data.rstrip(b"\0")
+ stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
+ if not stream_title:
+ continue
+ stream_title = stream_title.group(1).decode()
+ if stream_title != streamdetails.stream_title:
+ streamdetails.stream_title = stream_title
+ if queue := mass.players.get_player_queue(streamdetails.queue_id):
+ queue.signal_update()
+ # Regular HTTP stream
+ else:
+ async for chunk in resp.content.iter_any():
+ yield chunk
async def get_http_stream(
buffer = b""
buffer_all = False
bytes_received = 0
- async with mass.http_session.get(url, headers=headers) as resp:
+ timeout = ClientTimeout(total=0, connect=30, sock_read=120)
+ async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
is_partial = resp.status == 206
buffer_all = seek_position and not is_partial
async for chunk in resp.content.iter_any():
async def get_setting(self, key: str) -> str | None:
"""Get setting from settings table."""
- return await self.get_row(TABLE_SETTINGS, {"key": key})
+ if db_row := await self.get_row(TABLE_SETTINGS, {"key": key}):
+ return db_row["value"]
+ return None
async def set_setting(self, key: str, value: str) -> None:
"""Set setting in settings table."""
await self.__create_database_tables()
try:
if prev_version := await self.get_setting("version"):
- prev_version = int(prev_version["value"])
+ prev_version = int(prev_version)
else:
prev_version = 0
except (KeyError, ValueError):
# convert all tag-keys to lowercase without spaces
tags = {
key.lower().replace(" ", "").replace("_", ""): value
- for key, value in raw["format"]["tags"].items()
+ for key, value in raw["format"].get("tags", {}).items()
}
return AudioTags(
PLAYLIST = "playlist"
RADIO = "radio"
FOLDER = "folder"
+ ANNOUNCEMENT = "announcement"
UNKNOWN = "unknown"
import asyncio
from abc import ABC
from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any, Dict, List, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List
from mashumaro import DataClassDictMixin
_attr_volume_level: int = 100
_attr_volume_muted: bool = False
_attr_device_info: DeviceInfo = DeviceInfo()
- _attr_default_sample_rates: Tuple[int] = (44100, 48000, 88200, 96000)
- _attr_default_stream_type: ContentType = ContentType.FLAC
+ _attr_max_sample_rate: int = 96000
+ _attr_stream_type: ContentType = ContentType.FLAC
# below objects will be set by playermanager at register/update
mass: MusicAssistant = None # type: ignore[assignment]
_prev_state: dict = {}
# DEFAULT PLAYER SETTINGS
@property
- def default_sample_rates(self) -> Tuple[int]:
- """Return the default supported sample rates."""
+ def max_sample_rate(self) -> int:
+ """Return the (default) max supported sample rate."""
# if a player does not report/set its supported sample rates, we use a pretty safe default
- return self._attr_default_sample_rates
+ return self._attr_max_sample_rate
@property
- def default_stream_type(self) -> ContentType:
- """Return the default content type to use for streaming."""
- return self._attr_default_stream_type
+ def stream_type(self) -> ContentType:
+ """Return the default/preferred content type to use for streaming."""
+ return self._attr_stream_type
# GROUP PLAYER ATTRIBUTES AND METHODS (may be overridden if needed)
# a player can optionally be a group leader (e.g. Sonos)
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from music_assistant.models.enums import (
+ ContentType,
EventType,
MediaType,
ProviderType,
)
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import MediaItemType, media_from_dict
+from music_assistant.models.media_items import (
+ MediaItemType,
+ StreamDetails,
+ media_from_dict,
+)
from .player import Player, PlayerState
from .queue_item import QueueItem
items: List[QueueItem]
index: Optional[int]
position: int
- repeat: RepeatMode
- shuffle: bool
+ settings: dict
+ volume_level: int
class PlayerQueue:
self._last_player_update: int = 0
self._last_stream_id: str = ""
self._snapshot: Optional[QueueSnapShot] = None
+ self._announcement_in_progress: bool = False
async def setup(self) -> None:
"""Handle async setup of instance."""
if not self._stream_id:
return None
if stream := self.mass.streams.queue_streams.get(self._stream_id):
- return stream.index_in_buffer
- return None
+ if not stream.done.is_set():
+ return stream.index_in_buffer
+ return self.current_index
@property
def active(self) -> bool:
QueueOption.ADD -> Append new items at end of the queue
:param passive: if passive set to true the stream url will not be sent to the player.
"""
+ if self._announcement_in_progress:
+ self.logger.warning("Ignore queue command: An announcement is in progress")
+ return
# a single item or list of items may be provided
if not isinstance(media, list):
media = [media]
elif queue_opt == QueueOption.ADD:
await self.append(queue_items)
- async def play_alert(
- self, uri: str, announce: bool = False, gain_correct: int = 6
- ) -> str:
+ async def play_announcement(self, url: str, prepend_alert: bool = False) -> str:
"""
- Play given uri as Alert on the queue.
+ Play given uri as Announcement on the queue.
- uri: Uri that should be played as announcement, can be Music Assistant URI or plain url.
- announce: Prepend the (TTS) alert with a small announce sound.
- gain_correct: Adjust the gain of the alert sound (in dB).
+ url: URL that should be played as announcement, can only be plain url.
+ prepend_alert: Prepend the (TTS) announcement with an alert bell sound.
"""
- assert not self._snapshot, "Alert already in progress"
+ if self._announcement_in_progress:
+ self.logger.warning(
+ "Ignore queue command: An announcement is (already) in progress"
+ )
+ return
- # create snapshot
- await self.snapshot_create()
+ def create_announcement(_url: str):
+ return QueueItem(
+ uri=_url,
+ name="announcement",
+ duration=30,
+ streamdetails=StreamDetails(
+ provider=ProviderType.URL,
+ item_id=_url,
+ content_type=ContentType.try_parse(_url),
+ media_type=MediaType.ANNOUNCEMENT,
+ loudness=0,
+ gain_correct=4,
+ data=_url,
+ ),
+ media_type=MediaType.ANNOUNCEMENT,
+ )
- queue_items = []
+ try:
+ # create snapshot
+ await self.snapshot_create()
+ # stop player if needed
+ if self.active and self.player.state in (
+ PlayerState.PLAYING,
+ PlayerState.PAUSED,
+ ):
+ await self.stop()
+ await self._wait_for_state((PlayerState.OFF, PlayerState.IDLE))
+ self._announcement_in_progress = True
+
+ # turn on player if needed
+ if not self.player.powered:
+ await self.player.power(True)
+ await self._wait_for_state(PlayerState.IDLE)
+
+ # adjust volume if needed
+ if self._settings.announce_volume_increase:
+ announce_volume = (
+ self.player.volume_level + self._settings.announce_volume_increase
+ )
+ announce_volume = min(announce_volume, 100)
+ announce_volume = max(announce_volume, 0)
+ await self.player.volume_set(announce_volume)
+
+ # adjust queue settings for announce playback
+ self._settings.from_dict(
+ {
+ "repeat_mode": "off",
+ "shuffle_enabled": False,
+ }
+ )
- # prepend annnounce sound if needed
- if announce:
- url_prov = self.mass.music.get_provider(ProviderType.URL)
- media_item = await url_prov.parse_item(ALERT_ANNOUNCE_FILE)
- queue_item = QueueItem.from_media_item(media_item)
- queue_items.append(queue_item)
+ queue_items = []
+ # prepend alert sound if needed
+ if prepend_alert:
+ queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE))
- # parse provided uri into a MA MediaItem
- try:
- media_item = await self.mass.music.get_item_by_uri(uri)
- queue_items.append(QueueItem.from_media_item(media_item))
- except MusicAssistantError as err:
- # invalid MA uri or item not found error
- raise MediaNotFoundError(f"Invalid uri: {uri}") from err
+ queue_items.append(create_announcement(url))
- # start queue with alert sound(s)
- self._items = queue_items
- stream = await self.queue_stream_start(
- start_index=0, seek_position=0, fade_in=False, is_alert=True
- )
- # execute the play command on the player(s)
- await self.player.play_url(stream.url)
+ # append silence track. we use that as a reliable way to make sure
+ # there is enough buffer for the player to start quickly
+ # and to detect when we finished playing the alert
+ silence_item = create_announcement(self.mass.streams.get_silence_url())
+ queue_items.append(silence_item)
- # wait for the player to finish playing
- play_started = asyncio.Event()
- play_stopped = asyncio.Event()
+ # start queue with announcement sound(s)
+ self._items = queue_items
+ stream = await self.queue_stream_start(start_index=0, seek_position=0)
+ # execute the play command on the player(s)
+ await self.player.play_url(stream.url)
- def handle_event(evt: MassEvent):
- if self.player.state == PlayerState.PLAYING:
- play_started.set()
- elif play_started.is_set():
- play_stopped.set()
+ # wait for the player to finish playing
+ await asyncio.sleep(5)
+ await self._wait_for_state(PlayerState.PLAYING, silence_item.item_id)
- unsub = self.mass.subscribe(
- handle_event, EventType.QUEUE_UPDATED, self.queue_id
- )
- try:
- await asyncio.wait_for(play_stopped.wait(), 30)
- except asyncio.TimeoutError:
- self.logger.warning("Timeout while playing alert")
+ except Exception as err: # pylint: disable=broad-except
+ self.logger.exception("Error while playing announcement", exc_info=err)
finally:
- unsub()
# restore queue
+ self._announcement_in_progress = False
await self.snapshot_restore()
async def stop(self) -> None:
"""Stop command on queue player."""
+ if self._announcement_in_progress:
+ self.logger.warning("Ignore queue command: An announcement is in progress")
+ return
self.signal_next = False
# redirect to underlying player
await self.player.stop()
async def play(self) -> None:
"""Play (unpause) command on queue player."""
+ if self._announcement_in_progress:
+ self.logger.warning("Ignore queue command: An announcement is in progress")
+ return
if self.active and self.player.state == PlayerState.PAUSED:
await self.player.play()
else:
async def pause(self) -> None:
"""Pause command on queue player."""
+ if self._announcement_in_progress:
+ self.logger.warning("Ignore queue command: An announcement is in progress")
+ return
# redirect to underlying player
await self.player.pause()
if resume_item is not None:
resume_pos = resume_pos if resume_pos > 10 else 0
- fade_in = 5 if resume_pos else 0
+ fade_in = resume_pos > 0
await self.play_index(resume_item.item_id, resume_pos, fade_in)
elif len(self._items) > 0:
# items available in queue but no previous track, start at 0
items=self._items,
index=self._current_index,
position=self._current_item_elapsed_time,
- repeat=self._settings.repeat_mode,
- shuffle=self._settings.shuffle_enabled,
+ settings=self._settings.to_dict(),
+ volume_level=self.player.volume_level,
)
async def snapshot_restore(self) -> None:
"""Restore snapshot of Queue state."""
assert self._snapshot, "Create snapshot before restoring it."
- # clear queue first
- await self.clear()
- # restore queue
- self._settings.repeat_mode = self._snapshot.repeat
- self._settings.shuffle_enabled = self._snapshot.shuffle
- await self._update_items(self._snapshot.items)
- self._current_index = self._snapshot.index
- self._current_item_elapsed_time = self._snapshot.position
- if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
- await self.resume()
- if self._snapshot.state == PlayerState.PAUSED:
- await self.pause()
- if not self._snapshot.powered:
- await self.player.power(False)
- # reset snapshot once restored
- self.logger.debug("Restored snapshot...")
- self._snapshot = None
+ try:
+ # clear queue first
+ await self.clear()
+ # restore volume if needed
+ if self._snapshot.volume_level != self.player.volume_level:
+ await self.player.volume_set(self._snapshot.volume_level)
+ # restore queue
+ self._settings.from_dict(self._snapshot.settings)
+ await self.update_items(self._snapshot.items)
+ self._current_index = self._snapshot.index
+ self._current_item_elapsed_time = self._snapshot.position
+ if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+ await self.resume()
+ if self._snapshot.state == PlayerState.PAUSED:
+ await self.pause()
+ if not self._snapshot.powered:
+ await self.player.power(False)
+ # reset snapshot once restored
+ self.logger.debug("Restored snapshot...")
+ except Exception as err: # pylint: disable=broad-except
+ self.logger.exception("Error while restoring snapshot", exc_info=err)
+ finally:
+ self._snapshot = None
async def play_index(
self,
index: Union[int, str],
seek_position: int = 0,
- fade_in: int = 0,
+ fade_in: bool = False,
passive: bool = False,
) -> None:
"""Play item at index (or item_id) X in queue."""
+ if self._announcement_in_progress:
+ self.logger.warning("Ignore queue command: An announcement is in progress")
+ return
if not isinstance(index, int):
index = self.index_by_id(index)
if index is None:
return
# move the item in the list
items.insert(new_index, items.pop(item_index))
- await self._update_items(items)
+ await self.update_items(items)
async def delete_item(self, queue_item_id: str) -> None:
"""Delete item (by id or index) from the queue."""
queue_items = played_items + cur_item + next_items
else:
queue_items = self._items + queue_items
- await self._update_items(queue_items)
+ await self.update_items(queue_items)
async def clear(self) -> None:
"""Clear all items in the queue."""
if self.player.state not in (PlayerState.IDLE, PlayerState.OFF):
await self.stop()
- await self._update_items([])
+ await self.update_items([])
def on_player_update(self) -> None:
"""Call when player updates."""
"""Update queue details, called when player updates."""
if self.player.active_queue != self:
return
+ if not self.active:
+ return
new_index = self._current_index
track_time = self._current_item_elapsed_time
new_item_loaded = False
async def queue_stream_start(
self,
start_index: int,
- seek_position: int,
- fade_in: bool,
- is_alert: bool = False,
+ seek_position: int = 0,
+ fade_in: bool = False,
) -> QueueStream:
"""Start the queue stream runner."""
- self._current_item_elapsed_time = 0
- self._current_index = start_index
-
# start the queue stream background task
stream = await self.mass.streams.start_queue_stream(
queue=self,
seek_position=seek_position,
fade_in=fade_in,
output_format=self._settings.stream_type,
- is_alert=is_alert,
)
self._stream_id = stream.stream_id
+ self._current_item_elapsed_time = 0
+ self._current_index = start_index
return stream
def get_next_index(self, cur_index: Optional[int]) -> int:
"""Return the next index for the queue, accounting for repeat settings."""
- alert_active = self.stream and self.stream.is_alert
# handle repeat single track
- if not alert_active and self.settings.repeat_mode == RepeatMode.ONE:
+ if self.settings.repeat_mode == RepeatMode.ONE:
return cur_index
# handle repeat all
if (
- not alert_active
- and self.settings.repeat_mode == RepeatMode.ALL
+ self.settings.repeat_mode == RepeatMode.ALL
and self._items
and cur_index == (len(self._items) - 1)
):
"settings": self.settings.to_dict(),
}
- async def _update_items(self, queue_items: List[QueueItem]) -> None:
+ async def update_items(self, queue_items: List[QueueItem]) -> None:
"""Update the existing queue items, mostly caused by reordering."""
self._items = queue_items
self.signal_update(True)
"current_item_elapsed_time": self._current_item_elapsed_time,
},
)
+
+ async def _wait_for_state(
+ self,
+ state: Union[None, PlayerState, Tuple[PlayerState]],
+ queue_item_id: Optional[str] = None,
+ timeout: int = 30,
+ ) -> None:
+ """Wait for player(queue) to reach a specific state."""
+ if state is not None and not isinstance(state, tuple):
+ state = (state,)
+
+ count = 0
+ while count < timeout * 10:
+
+ if (state is None or self.player.state in state) and (
+ queue_item_id is None
+ or self.current_item
+ and self.current_item.item_id == queue_item_id
+ ):
+ return
+
+ count += 1
+ await asyncio.sleep(0.1)
+
+ raise TimeoutError(f"Timeout while waiting on state(s) {state}")
import asyncio
import random
-from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Optional
from .enums import ContentType, CrossFadeMode, RepeatMode
self._crossfade_duration: int = 6
self._volume_normalization_enabled: bool = True
self._volume_normalization_target: int = -14
- self._stream_type: Optional[ContentType] = None
- self._sample_rates: Optional[Tuple[int]] = None
+ self._stream_type: ContentType = queue.player.stream_type
+ self._max_sample_rate: int = queue.player.max_sample_rate
+ self._announce_volume_increase: int = 15
@property
def repeat_mode(self) -> RepeatMode:
if not self._shuffle_enabled and enabled:
# shuffle requested
self._shuffle_enabled = True
- if self._queue.current_index is not None:
- played_items = self._queue.items[: self._queue.current_index]
- next_items = self._queue.items[self._queue.current_index + 1 :]
+ cur_index = self._queue.index_in_buffer
+ cur_item = self._queue.get_item(cur_index)
+ if cur_item is not None:
+ played_items = self._queue.items[:cur_index]
+ next_items = self._queue.items[cur_index + 1 :]
# for now we use default python random function
# can be extended with some more magic based on last_played and stuff
next_items = random.sample(next_items, len(next_items))
- items = played_items + [self._queue.current_item] + next_items
- asyncio.create_task(self._queue.load(items))
+
+ items = played_items + [cur_item] + next_items
+ asyncio.create_task(self._queue.update_items(items))
self._on_update("shuffle_enabled")
elif self._shuffle_enabled and not enabled:
# unshuffle
self._shuffle_enabled = False
- if self._queue.current_index is not None:
- played_items = self._queue.items[: self._queue.current_index]
- next_items = self._queue.items[self._queue.current_index + 1 :]
+ cur_index = self._queue.index_in_buffer
+ cur_item = self._queue.get_item(cur_index)
+ if cur_item is not None:
+ played_items = self._queue.items[:cur_index]
+ next_items = self._queue.items[cur_index + 1 :]
next_items.sort(key=lambda x: x.sort_index, reverse=False)
- items = played_items + [self._queue.current_item] + next_items
- asyncio.create_task(self._queue.load(items))
+ items = played_items + [cur_item] + next_items
+ asyncio.create_task(self._queue.update_items(items))
self._on_update("shuffle_enabled")
@property
@property
def stream_type(self) -> ContentType:
"""Return supported/preferred stream type for this playerqueue."""
- if self._stream_type is None:
- # return player's default
- return self._queue.player.default_stream_type
return self._stream_type
@stream_type.setter
self._on_update("stream_type")
@property
- def sample_rates(self) -> Tuple[int]:
- """Return supported/preferred sample rate(s) for this playerqueue."""
- if self._sample_rates is None:
- # return player's default
- return self._queue.player.default_sample_rates
- return self._sample_rates
-
- @sample_rates.setter
- def sample_rates(self, value: ContentType) -> None:
+ def max_sample_rate(self) -> int:
+ """Return max supported/needed sample rate(s) for this playerqueue."""
+ return self._max_sample_rate
+
+ @max_sample_rate.setter
+ def max_sample_rate(self, value: ContentType) -> None:
"""Set supported/preferred sample rate(s) for this playerqueue."""
- if self._stream_type != value:
- self._stream_type = value
- self._on_update("sample_rates")
+ if self._max_sample_rate != value:
+ self._max_sample_rate = value
+ self._on_update("max_sample_rate")
@property
- def max_sample_rate(self) -> int:
- """Return the maximum samplerate supported by this playerqueue."""
- return max(self.sample_rates)
+ def announce_volume_increase(self) -> int:
+ """Return announce_volume_increase setting (percentage relative to current)."""
+ return self._announce_volume_increase
+
+ @announce_volume_increase.setter
+ def announce_volume_increase(self, volume_increase: int) -> None:
+ """Set announce_volume_increase setting."""
+ if self._announce_volume_increase != volume_increase:
+ self._announce_volume_increase = volume_increase
+ self._on_update("announce_volume_increase")
def to_dict(self) -> Dict[str, Any]:
"""Return dict from settings."""
"volume_normalization_enabled": self.volume_normalization_enabled,
"volume_normalization_target": self.volume_normalization_target,
"stream_type": self.stream_type.value,
- "sample_rates": self.sample_rates,
+ "max_sample_rate": self.max_sample_rate,
+ "announce_volume_increase": self.announce_volume_increase,
}
+ def from_dict(self, d: Dict[str, Any]) -> None:
+ """Initialize settings from dict."""
+ self._repeat_mode = RepeatMode(d.get("repeat_mode", self._repeat_mode.value))
+ self._shuffle_enabled = bool(d.get("shuffle_enabled", self._shuffle_enabled))
+ self._crossfade_mode = CrossFadeMode(
+ d.get("crossfade_mode", self._crossfade_mode.value)
+ )
+ self._crossfade_duration = int(
+ d.get("crossfade_duration", self._crossfade_duration)
+ )
+ self._volume_normalization_enabled = bool(
+ d.get("volume_normalization_enabled", self._volume_normalization_enabled)
+ )
+ self._volume_normalization_target = float(
+ d.get("volume_normalization_target", self._volume_normalization_target)
+ )
+ self._stream_type = ContentType(d.get("stream_type", self._stream_type.value))
+ self._max_sample_rate = int(d.get("max_sample_rate", self._max_sample_rate))
+ self._announce_volume_increase = int(
+ d.get("announce_volume_increase", self._announce_volume_increase)
+ )
+
async def restore(self) -> None:
"""Restore state from db."""
- for key, val_type in (
- ("repeat_mode", RepeatMode),
- ("crossfade_mode", CrossFadeMode),
- ("shuffle_enabled", bool),
- ("crossfade_duration", int),
- ("volume_normalization_enabled", bool),
- ("volume_normalization_target", float),
- ("stream_type", ContentType),
- ("sample_rates", tuple),
- ):
+ values = {}
+ for key in self.to_dict():
db_key = f"{self._queue.queue_id}_{key}"
if db_value := await self.mass.database.get_setting(db_key):
- value = val_type(db_value["value"])
- setattr(self, f"_{key}", value)
+ values[key] = db_value
+ self.from_dict(values)
def _on_update(self, changed_key: Optional[str] = None) -> None:
"""Handle state changed."""
self._queue.signal_update()
self.mass.create_task(self.save(changed_key))
- # TODO: restart play if setting changed that impacts playing queue
async def save(self, changed_key: Optional[str] = None) -> None:
"""Save state in db."""