import urllib.parse
from time import time
from types import CoroutineType
-from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional
+from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Tuple
from uuid import uuid4
from aiohttp import web
get_chunksize,
get_media_stream,
get_preview_stream,
- get_silence,
get_stream_details,
strip_silence,
)
self._port = mass.config.stream_port
self._ip = mass.config.stream_ip
self.queue_streams: Dict[str, QueueStream] = {}
+ self.announcements: Dict[str, Tuple[str]] = {}
@property
def base_url(self) -> str:
ext = content_type.value
return f"{self.base_url}/{stream_id}.{ext}"
+ def get_announcement_url(
+ self,
+ queue_id: str,
+ urls: Tuple[str],
+ content_type: ContentType,
+ ) -> str:
+ """Start running a queue stream."""
+ self.announcements[queue_id] = urls
+ ext = content_type.value
+ return f"{self.base_url}/announce/{queue_id}.{ext}"
+
async def get_preview_url(self, provider: ProviderType, track_id: str) -> str:
"""Return url to short preview sample."""
track = await self.mass.music.tracks.get_provider_item(track_id, provider)
enc_track_id = urllib.parse.quote(track_id)
return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}"
- def get_silence_url(
- self,
- content_type: ContentType = ContentType.WAV,
- ) -> str:
- """Generate stream url for a silence Stream."""
- ext = content_type.value
- return f"{self.base_url}/silence.{ext}"
-
async def setup(self) -> None:
"""Async initialize of module."""
app = web.Application()
app.router.add_get("/preview", self.serve_preview)
- app.router.add_get("/silence.{fmt}", self.serve_silence)
+ app.router.add_get("/announce/{queue_id}.{fmt}", self.serve_announcement)
app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream)
runner = web.AppRunner(app, access_log=None)
await resp.write(chunk)
return resp
- @staticmethod
- async def serve_silence(request: web.Request):
- """Serve some nice silence."""
- duration = int(request.query.get("duration", 60))
+ async def serve_announcement(self, request: web.Request):
+ """Serve announcement broadcast."""
+ queue_id = request.match_info["queue_id"]
fmt = ContentType.try_parse(request.match_info["fmt"])
+ urls = self.announcements[queue_id]
- resp = web.StreamResponse(
- status=200, reason="OK", headers={"Content-Type": f"audio/{fmt.value}"}
- )
- await resp.prepare(request)
- if request.method == "GET":
- async for chunk in get_silence(duration, fmt):
- await resp.write(chunk)
- return resp
+ ffmpeg_args = ["ffmpeg", "-hide_banner", "-loglevel", "quiet"]
+ for url in urls:
+ ffmpeg_args += ["-i", url]
+ if len(urls) > 1:
+ ffmpeg_args += [
+ "-filter_complex",
+ f"[0:a][1:a]concat=n={len(urls)}:v=0:a=1",
+ ]
+ ffmpeg_args += ["-f", fmt.value, "-"]
+
+ async with AsyncProcess(ffmpeg_args) as ffmpeg_proc:
+ output, _ = await ffmpeg_proc.communicate()
+
+ return web.Response(body=output, headers={"Content-Type": f"audio/{fmt.value}"})
async def serve_queue_stream(self, request: web.Request):
"""Serve queue audio stream to a single player."""
streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id)
# work out pcm details
- if streamdetails.media_type == MediaType.ANNOUNCEMENT:
- pcm_sample_rate = 44100
- pcm_bit_depth = 16
- pcm_channels = 2
- allow_resample = True
- elif queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
+ if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
pcm_sample_rate = min(96000, queue.settings.max_sample_rate)
pcm_bit_depth = 24
pcm_channels = 2
use_crossfade = (
self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED
and self.queue.settings.crossfade_duration > 0
- and streamdetails.media_type != MediaType.ANNOUNCEMENT
)
# do not crossfade tracks of same album
if (
else:
buffer_duration = 2
+ # use dynamic buffer size to account for slow connections (or throttling providers, like YT)
+ # buffer_duration has some overhead to account for padded silence
+ if use_crossfade and buffered_ahead > (crossfade_duration * 4):
+ buffer_duration = crossfade_duration + 6
+ elif use_crossfade and buffered_ahead > (crossfade_duration * 2):
+ buffer_duration = crossfade_duration + 4
+ else:
+ buffer_duration = 2
+
#### HANDLE FIRST PART OF TRACK
if len(chunk) == 0 and bytes_written == 0:
from __future__ import annotations
import asyncio
-import os
import pathlib
import random
from asyncio import TimerHandle
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
+from music_assistant.helpers.tags import parse_tags
from music_assistant.helpers.util import try_parse_int
-from music_assistant.models.enums import (
- ContentType,
- EventType,
- MediaType,
- ProviderType,
- QueueOption,
- RepeatMode,
-)
+from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import (
- MediaItemType,
- StreamDetails,
- media_from_dict,
-)
+from music_assistant.models.media_items import MediaItemType, media_from_dict
from .player import Player, PlayerState
from .queue_item import QueueItem
.joinpath("helpers/resources")
)
-ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
-if not os.path.isfile(ALERT_ANNOUNCE_FILE):
- ALERT_ANNOUNCE_FILE = None
+ANNOUNCE_ALERT_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
FALLBACK_DURATION = 172800 # if duration is None (e.g. radio stream) = 48 hours
)
return
- def create_announcement(_url: str):
- return QueueItem(
- name="announcement",
- duration=30,
- streamdetails=StreamDetails(
- provider=ProviderType.URL,
- item_id=_url,
- content_type=ContentType.try_parse(_url),
- media_type=MediaType.ANNOUNCEMENT,
- loudness=0,
- gain_correct=4,
- direct=_url,
- ),
- )
-
try:
# create snapshot
await self.snapshot_create()
+ wait_time = 2
# stop player if needed
- if self.active and self.player.state in (
- PlayerState.PLAYING,
- PlayerState.PAUSED,
- ):
+ if self.active and self.player.state == PlayerState.PLAYING:
await self.stop()
self.announcement_in_progress = True
- await self._wait_for_state((PlayerState.OFF, PlayerState.IDLE))
-
- # turn on player if needed
- if not self.player.powered:
- await self.player.power(True)
- await self._wait_for_state(PlayerState.IDLE)
+ await asyncio.sleep(0.1)
# adjust volume if needed
if self._settings.announce_volume_increase:
)
announce_volume = min(announce_volume, 100)
announce_volume = max(announce_volume, 0)
+ # turn on player if needed (might be needed before adjusting the volume)
+ if not self.player.powered:
+ await self.player.power(True)
+ wait_time += 2
await self.player.volume_set(announce_volume)
- # adjust queue settings for announce playback
- self._settings.from_dict(
- {
- "repeat_mode": "off",
- "shuffle_enabled": False,
- }
- )
-
- queue_items = []
# prepend alert sound if needed
- if prepend_alert and ALERT_ANNOUNCE_FILE:
- queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE))
-
- queue_items.append(create_announcement(url))
-
- # append silence track. we use that as a reliable way to make sure
- # there is enough buffer for the player to start quickly
- # and to detect when we finished playing the alert
- silence_item = create_announcement(self.mass.streams.get_silence_url())
- queue_items.append(silence_item)
+ if prepend_alert:
+ announce_urls = (ANNOUNCE_ALERT_FILE, url)
+ wait_time += 2
+ else:
+ announce_urls = (url,)
- # start queue with announcement sound(s)
- self._items = queue_items
- stream = await self.queue_stream_start(start_index=0, seek_position=0)
- # execute the play command on the player(s)
- await self.player.play_url(stream.url)
+ # send announcement stream to player
+ announce_stream_url = self.mass.streams.get_announcement_url(
+ self.queue_id, announce_urls, self._settings.stream_type
+ )
+ await self.player.play_url(announce_stream_url)
# wait for the player to finish playing
- await self._wait_for_state(PlayerState.PLAYING, silence_item.item_id)
+ info = await parse_tags(url)
+ wait_time += info.duration or 10
+ await asyncio.sleep(wait_time)
except Exception as err: # pylint: disable=broad-except
self.logger.exception("Error while playing announcement", exc_info=err)
self,
state: Union[None, PlayerState, Tuple[PlayerState]],
queue_item_id: Optional[str] = None,
- timeout: int = 30,
+ timeout: int = 120,
) -> None:
"""Wait for player(queue) to reach a specific state."""
if state is not None and not isinstance(state, tuple):