check_audio_support,
create_wave_header,
crossfade_pcm_parts,
+ fadein_pcm_part,
get_media_stream,
get_preview_stream,
get_sox_args_for_pcm_stream,
enc_track_id = urllib.parse.quote(track_id)
return f"http://{self._ip}:{self._port}/preview?provider_id={provider.value}&item_id={enc_track_id}"
+ def get_silence_url(self, duration: int = 600) -> str:
+ """Return url to silence."""
+ return f"http://{self._ip}:{self._port}/silence?duration={duration}"
+
async def setup(self) -> None:
"""Async initialize of module."""
app = web.Application()
app.router.add_get("/preview", self.serve_preview)
+ app.router.add_get("/silence", self.serve_silence)
app.router.add_get(
"/{queue_id}/{player_id}.{format}",
self.serve_multi_client_queue_stream,
self.logger.info("Started stream server on port %s", self._port)
+ @staticmethod
+ async def serve_silence(request: web.Request):
+ """Serve silence."""
+ resp = web.StreamResponse(
+ status=200, reason="OK", headers={"Content-Type": "audio/wav"}
+ )
+ await resp.prepare(request)
+ duration = int(request.query.get("duration", 600))
+ await resp.write(create_wave_header(duration=duration))
+ for _ in range(0, duration):
+ await resp.write(b"\0" * 1764000)
+ return resp
+
async def serve_preview(self, request: web.Request):
"""Serve short preview sample."""
provider_id = request.query["provider_id"]
# send stop here to prevent the player from retrying over and over
await queue.stop()
# send some silence to allow the player to process the stop request
- result = create_wave_header(duration=10)
- result += b"\0" * 1764000
- return web.Response(status=200, body=result, content_type="audio/wav")
+ return await self.serve_silence(request)
resp = web.StreamResponse(
status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"}
resample: bool = False,
) -> AsyncGenerator[None, bytes]:
"""Stream the PlayerQueue's tracks as constant feed of PCM raw audio."""
+ bytes_written_total = 0
last_fadeout_data = b""
queue_index = None
track_count = 0
# stream queue tracks one by one
while True:
- start_timestamp = time()
# get the (next) track in queue
track_count += 1
if track_count == 1:
# report start of queue playback so we can calculate current track/duration etc.
- queue_index, seek_position = await queue.queue_stream_start()
+ queue_index, seek_position, fade_in = await queue.queue_stream_start()
else:
queue_index = await queue.queue_stream_next(queue_index)
seek_position = 0
+ fade_in = 0
queue_track = queue.get_item(queue_index)
if not queue_track:
self.logger.debug(
# HANDLE FIRST PART OF TRACK
if not chunk and bytes_written == 0 and is_last_chunk:
- # stream error: got empy first chunk
+ # stream error: got empy first chunk ?!
self.logger.warning("Stream error on %s", queue_track.uri)
- # prevent player queue get stuck by just skipping to the next track
- queue_track.duration = 0
- continue
- if cur_chunk <= 2 and not last_fadeout_data:
+ elif cur_chunk == 1 and last_fadeout_data:
+ prev_chunk = chunk
+ del chunk
+ elif cur_chunk == 1 and fade_in:
+ # fadein first chunk
+ fadein_first_part = await fadein_pcm_part(
+ chunk, fade_in, pcm_fmt, sample_rate
+ )
+ yield fadein_first_part
+ bytes_written += len(fadein_first_part)
+ del chunk
+ del fadein_first_part
+ elif cur_chunk <= 2 and not last_fadeout_data:
# no fadeout_part available so just pass it to the output directly
yield chunk
bytes_written += len(chunk)
del chunk
- elif cur_chunk == 1 and last_fadeout_data:
- prev_chunk = chunk
- del chunk
# HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
elif cur_chunk == 2 and last_fadeout_data:
# combine the first 2 chunks and strip off silence
else:
prev_chunk = chunk
del chunk
- # allow clients to only buffer max ~15 seconds ahead
- seconds_streamed = bytes_written / sample_size
- seconds_needed = int(time() - start_timestamp) + 15
- diff = seconds_streamed - seconds_needed
- if diff:
+ # allow clients to only buffer max ~10 seconds ahead
+ queue_track.streamdetails.seconds_played = bytes_written / sample_size
+ seconds_buffered = (bytes_written_total + bytes_written) / sample_size
+ seconds_needed = queue.player.elapsed_time + 10
+ diff = seconds_buffered - seconds_needed
+ track_time = queue_track.duration or 0
+ if track_time > 10 and diff > 1:
await asyncio.sleep(diff)
# end of the track reached
- # set actual duration to the queue for more accurate now playing info
- queue_track.streamdetails.seconds_played = bytes_written / sample_size
+ bytes_written_total += bytes_written
self.logger.debug(
"Finished Streaming queue track: %s (%s) on queue %s",
queue_track.uri,
return crossfade_part
+async def fadein_pcm_part(
+ pcm_audio: bytes,
+ fade_length: int,
+ fmt: ContentType,
+ sample_rate: int,
+) -> bytes:
+ """Fadein chunk of pcm/raw audio using ffmpeg."""
+ # input args
+ args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+ args += [
+ "-f",
+ fmt.value,
+ "-ac",
+ "2",
+ "-ar",
+ str(sample_rate),
+ "-i",
+ "-",
+ ]
+ # filter args
+ args += ["-af", f"afade=type=in:start_time=0:duration={fade_length}"]
+ # output args
+ args += ["-f", fmt.value, "-"]
+ async with AsyncProcess(args, True) as proc:
+ result_audio, _ = await proc.communicate(pcm_audio)
+ return result_audio
+
+
async def strip_silence(
audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False
) -> bytes:
"-i",
stream_path,
]
+ if seek_position:
+ input_args += ["-ss", str(seek_position)]
# collect output args
if output_format.is_pcm():
output_args = [
filter_args += ["-filter:a", f"volume={streamdetails.gain_correct}dB"]
if resample or input_format.is_pcm():
filter_args += ["-ar", str(resample)]
- if seek_position:
- filter_args += ["-ss", str(seek_position)]
return input_args + filter_args + output_args
# Prefer SoX for all other (=highest quality)
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the audio stream for the given streamdetails."""
+ if chunk_size is None:
+ if streamdetails.content_type in (
+ ContentType.AAC,
+ ContentType.M4A,
+ ContentType.MP3,
+ ContentType.OGG,
+ ):
+ chunk_size = 32000
+ else:
+ chunk_size = 256000
+
mass.signal_event(
MassEvent(
EventType.STREAM_STARTED,
streamdetails.item_id, streamdetails.provider
)
# send analyze job to background worker
- if streamdetails.loudness is None and streamdetails.provider != "url":
+ if (
+ streamdetails.loudness is None
+ and streamdetails.provider != ProviderType.URL
+ ):
uri = f"{streamdetails.provider.value}://{streamdetails.media_type.value}/{streamdetails.item_id}"
mass.add_job(
analyze_audio(mass, streamdetails), f"Analyze audio for {uri}"
from __future__ import annotations
import asyncio
+import pathlib
import random
from asyncio import Task, TimerHandle
+from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from music_assistant.helpers.audio import get_stream_details
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
+RESOURCES_DIR = (
+ pathlib.Path(__file__)
+ .parent.resolve()
+ .parent.resolve()
+ .joinpath("helpers/resources")
+)
+ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
+FALLBACK_DURATION = 172800 # if duration is None (e.g. radio stream) = 48 hours
+
+
+@dataclass
+class QueueSnapShot:
+ """Represent a snapshot of the queue and its settings."""
+
+ powered: bool
+ state: PlayerState
+ volume_level: int
+ items: List[QueueItem]
+ index: Optional[int]
+ position: int
+
class PlayerQueue:
"""Represents a PlayerQueue object."""
self._start_index: int = 0
# start_pos: from which position (in seconds) did the first track start playing?
self._start_pos: int = 0
+ self._next_fadein: int = 0
self._next_start_index: int = 0
self._next_start_pos: int = 0
self._last_state = PlayerState.IDLE
self._signal_next: bool = False
self._last_player_update: int = 0
self._stream_url: str = ""
+ self._snapshot: Optional[QueueSnapShot] = None
async def setup(self) -> None:
"""Handle async setup of instance."""
"""
Play media item(s) on the given queue.
- :param queue_id: queue id of the PlayerQueue to handle the command.
:param uri: uri(s) that should be played (single item or list of uri's).
:param queue_opt:
QueueOption.PLAY -> Insert new items in queue and start playing at inserted position
await self.append(queue_items)
return self._stream_url
+ async def play_alert(
+ self, uri: str, announce: bool = False, volume_adjust: int = 10
+ ) -> str:
+ """
+ Play given uri as Alert on the queue.
+
+ uri: Uri that should be played as announcement, can be Music Assistant URI or plain url.
+ announce: Prepend the (TTS) alert with a small announce sound.
+ volume_adjust: Adjust the volume of the player by this percentage (relative).
+ """
+ if self._snapshot:
+ self.logger.debug("Ignore play_alert: already in progress")
+ return
+
+ # create snapshot
+ await self.snapshot_create()
+
+ queue_items = []
+
+ # prepend annnounce sound if needed
+ if announce:
+ queue_items.append(QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert"))
+
+ # parse provided uri into a MA MediaItem or Basic QueueItem from URL
+ try:
+ media_item = await self.mass.music.get_item_by_uri(uri)
+ queue_items.append(QueueItem.from_media_item(media_item))
+ except MusicAssistantError as err:
+ # invalid MA uri or item not found error
+ if uri.startswith("http"):
+ # a plain url was provided
+ queue_items.append(QueueItem.from_url(uri, "alert"))
+ else:
+ raise MediaNotFoundError(f"Invalid uri: {uri}") from err
+
+ # append silence track, we use this to reliably detect when the alert is ready
+ silence_url = self.mass.streams.get_silence_url(600)
+ queue_items.append(QueueItem.from_url(silence_url, "alert"))
+
+ # load queue with alert sound(s)
+ await self.load(queue_items)
+ # set new volume
+ new_volume = self.player.volume_level + (
+ self.player.volume_level / 100 * volume_adjust
+ )
+ await self.player.volume_set(new_volume)
+
+ # wait for the alert to finish playing
+ alert_done = asyncio.Event()
+
+ def handle_event(evt: MassEvent):
+ if (
+ self.current_item
+ and self.current_item.uri == silence_url
+ and self.elapsed_time
+ ):
+ alert_done.set()
+
+ unsub = self.mass.subscribe(
+ handle_event, EventType.QUEUE_TIME_UPDATED, self.queue_id
+ )
+ try:
+ await asyncio.wait_for(alert_done.wait(), 120)
+ finally:
+
+ unsub()
+ # restore queue
+ await self.snapshot_restore()
+
async def stop(self) -> None:
"""Stop command on queue player."""
# redirect to underlying player
"""Resume previous queue."""
resume_item = self.current_item
resume_pos = self._current_item_elapsed_time
- if resume_item and resume_pos > (resume_item.duration * 0.8):
- # track is already played for > 80% - skip to next
+ if (
+ resume_item
+ and resume_item.duration
+ and resume_pos > (resume_item.duration * 0.9)
+ ):
+ # track is already played for > 90% - skip to next
resume_item = self.next_item
resume_pos = 0
if resume_item is not None:
- await self.play_index(resume_item.item_id, resume_pos)
+ await self.play_index(resume_item.item_id, resume_pos, 5)
else:
self.logger.warning(
"resume queue requested for %s but queue is empty", self.queue_id
)
+ async def snapshot_create(self) -> None:
+ """Create snapshot of current Queue state."""
+ self._snapshot = QueueSnapShot(
+ powered=self.player.powered,
+ state=self.player.state,
+ volume_level=self.player.volume_level,
+ items=self._items,
+ index=self._current_index,
+ position=self._current_item_elapsed_time,
+ )
+
+ async def snapshot_restore(self) -> None:
+ """Restore snapshot of Queue state."""
+ assert self._snapshot, "Create snapshot before restoring it."
+ # clear queue first
+ await self.clear()
+ # restore volume
+ await self.player.volume_set(self._snapshot.volume_level)
+ # restore queue
+ await self.update(self._snapshot.items)
+ self._current_index = self._snapshot.index
+ if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+ await self.resume()
+ if self._snapshot.state == PlayerState.PAUSED:
+ await self.pause()
+ if not self._snapshot.powered:
+ await self.player.power(False)
+ # reset snapshot once restored
+ self._snapshot = None
+
async def play_index(
- self, index: Union[int, str], seek_position: int = 0, passive: bool = False
+ self,
+ index: Union[int, str],
+ seek_position: int = 0,
+ fade_in: int = 0,
+ passive: bool = False,
) -> None:
"""Play item at index (or item_id) X in queue."""
if self.player.use_multi_stream:
self._current_index = index
self._next_start_index = index
self._next_start_pos = int(seek_position)
+ self._next_fadein = fade_in
# send stream url to player connected to this queue
self._stream_url = self.mass.streams.get_stream_url(
self.queue_id, content_type=self._settings.stream_type
await self.play_index(self._current_index + 2)
raise err
- async def queue_stream_start(self) -> Tuple[int, int]:
+ async def queue_stream_start(self) -> Tuple[int, int, int]:
"""Call when queue_streamer starts playing the queue stream."""
start_from_index = self._next_start_index
self._current_item_elapsed_time = 0
self._index_in_buffer = start_from_index
seek_position = self._next_start_pos
self._next_start_pos = 0
- return (start_from_index, seek_position)
+ fade_in = self._next_fadein
+ self._next_fadein = 0
+ return (start_from_index, seek_position, fade_in)
async def queue_stream_next(self, cur_index: int) -> int | None:
"""Call when queue_streamer loads next track in buffer."""
if not queue_track.streamdetails:
track_time = elapsed_time_queue - total_time
break
- track_duration = (
- queue_track.streamdetails.seconds_played or queue_track.duration
- )
- if elapsed_time_queue > (track_duration + total_time):
+ track_seconds = queue_track.streamdetails.seconds_played
+ if elapsed_time_queue > (track_seconds + total_time):
# total elapsed time is more than (streamed) track duration
# move index one up
- total_time += track_duration
+ total_time += track_seconds
queue_index += 1
else:
# no more seconds left to divide, this is our track
try:
self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]]
self._current_index = queue_cache["current_index"]
+ self._current_item_elapsed_time = queue_cache.get(
+ "current_item_elapsed_time", 0
+ )
except (KeyError, AttributeError, TypeError) as err:
self.logger.warning(
"Unable to restore queue state for queue %s",
{
"items": [x.to_dict() for x in self._items],
"current_index": self._current_index,
+ "current_item_elapsed_time": self._current_item_elapsed_time,
},
)