ATTR_UPDATED_AT = "updated_at"
ATTR_ACTIVE_QUEUE = "active_queue"
ATTR_GROUP_PARENTS = "group_parents"
+
+
+ROOT_LOGGER_NAME = "music_assistant"
from .providers.qobuz import QobuzProvider
from .providers.spotify import SpotifyProvider
from .providers.tunein import TuneInProvider
+from .providers.url import PROVIDER_CONFIG as URL_CONFIG
+from .providers.url import URLProvider
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
for prov_conf in self.mass.config.providers:
prov_cls = PROV_MAP[prov_conf.type]
await self._register_provider(prov_cls(self.mass, prov_conf), prov_conf)
+ # always register url provider
+ await self._register_provider(URLProvider(self.mass, URL_CONFIG), URL_CONFIG)
async def start_sync(
self,
from aiofiles.threadpool.binary import AsyncFileIO
from tinytag.tinytag import TinyTag
+from music_assistant.helpers.audio import get_file_stream
from music_assistant.helpers.compare import compare_strings
from music_assistant.helpers.database import SCHEMA_VERSION
from music_assistant.helpers.util import (
MediaType,
Playlist,
StreamDetails,
- StreamType,
Track,
)
from music_assistant.models.provider import MusicProvider
_, ext = Path(itempath).name.rsplit(".", 1)
content_type = CONTENT_TYPE_EXT.get(ext.lower())
+ stat = await self.mass.loop.run_in_executor(None, os.stat, itempath)
+
return StreamDetails(
- type=StreamType.FILE,
provider=self.type,
item_id=item_id,
content_type=content_type,
- path=itempath,
+ media_type=MediaType.TRACK,
+ duration=tags.duration,
+ size=stat.st_size,
sample_rate=tags.samplerate or 44100,
bit_depth=16, # TODO: parse bitdepth
+ data=itempath,
)
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # streamdetails.data holds the local file path set by get_stream_details
+        async for chunk in get_file_stream(
+            self.mass, streamdetails.data, streamdetails, seek_position
+        ):
+            yield chunk
+
async def _parse_track(self, track_path: str) -> Track | None:
"""Try to parse a track from a filename by reading its tags."""
from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module
app_var,
)
+from music_assistant.helpers.audio import get_http_stream
from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.util import parse_title_and_version, try_parse_int
-from music_assistant.models.enums import EventType, ProviderType
-from music_assistant.models.errors import LoginFailed
-from music_assistant.models.event import MassEvent
+from music_assistant.models.enums import ProviderType
+from music_assistant.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.models.media_items import (
Album,
AlbumType,
MediaType,
Playlist,
StreamDetails,
- StreamType,
Track,
)
from music_assistant.models.provider import MusicProvider
token = await self._auth_token()
if not token:
raise LoginFailed(f"Login failed for user {self.config.username}")
- # subscribe to stream events so we can report playback to Qobuz
- self.mass.subscribe(
- self.on_stream_event,
- (EventType.STREAM_STARTED, EventType.STREAM_ENDED),
- id_filter=self.type.value,
- )
return True
async def search(
streamdata = result
break
if not streamdata:
- self.logger.error("Unable to retrieve stream details for track %s", item_id)
- return None
+ raise MediaNotFoundError(f"Unable to retrieve stream details for {item_id}")
if streamdata["mime_type"] == "audio/mpeg":
content_type = ContentType.MPEG
elif streamdata["mime_type"] == "audio/flac":
content_type = ContentType.FLAC
else:
- self.logger.error("Unsupported mime type for track %s", item_id)
- return None
+ raise MediaNotFoundError(f"Unsupported mime type for {item_id}")
return StreamDetails(
- type=StreamType.URL,
item_id=str(item_id),
provider=self.type,
- path=streamdata["url"],
content_type=content_type,
+ duration=streamdata["duration"],
sample_rate=int(streamdata["sampling_rate"] * 1000),
bit_depth=streamdata["bit_depth"],
- details=streamdata, # we need these details for reporting playback
+ data=streamdata, # we need these details for reporting playback
+ expires=time.time() + 1800, # not sure about the real allowed value
)
- async def on_stream_event(self, event: MassEvent):
- """
- Received event from mass.
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # fire-and-forget: report playback start to Qobuz without blocking the stream
+        self.mass.create_task(self._report_playback_started(streamdetails))
+        bytes_sent = 0
+        try:
+            url = streamdetails.data["url"]
+            async for chunk in get_http_stream(
+                self.mass, url, streamdetails, seek_position
+            ):
+                yield chunk
+                bytes_sent += len(chunk)
+        finally:
+            # only report playback stop if any audio was actually delivered
+            if bytes_sent:
+                self.mass.create_task(
+                    self._report_playback_stopped(streamdetails, bytes_sent)
+                )
- We use this to report playback start/stop to qobuz.
- """
- if not self._user_auth_info:
- return
+ async def _report_playback_started(self, streamdetails: StreamDetails) -> None:
+ """Report playback start to qobuz."""
# TODO: need to figure out if the streamed track is purchased by user
# https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
# {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}}
- if event.type == EventType.STREAM_STARTED:
- # report streaming started to qobuz
- device_id = self._user_auth_info["user"]["device"]["id"]
- credential_id = self._user_auth_info["user"]["credential"]["id"]
- user_id = self._user_auth_info["user"]["id"]
- format_id = event.data.details["format_id"]
- timestamp = int(time.time())
- events = [
- {
- "online": True,
- "sample": False,
- "intent": "stream",
- "device_id": device_id,
- "track_id": str(event.data.item_id),
- "purchase": False,
- "date": timestamp,
- "credential_id": credential_id,
- "user_id": user_id,
- "local": False,
- "format_id": format_id,
- }
- ]
- await self._post_data("track/reportStreamingStart", data=events)
- elif event.type == EventType.STREAM_ENDED:
- # report streaming ended to qobuz
- user_id = self._user_auth_info["user"]["id"]
- await self._get_data(
- "/track/reportStreamingEnd",
- user_id=user_id,
- track_id=str(event.data.item_id),
- duration=try_parse_int(event.data.seconds_played),
- )
+ device_id = self._user_auth_info["user"]["device"]["id"]
+ credential_id = self._user_auth_info["user"]["credential"]["id"]
+ user_id = self._user_auth_info["user"]["id"]
+ format_id = streamdetails.data["format_id"]
+ timestamp = int(time.time())
+ events = [
+ {
+ "online": True,
+ "sample": False,
+ "intent": "stream",
+ "device_id": device_id,
+ "track_id": str(streamdetails.item_id),
+ "purchase": False,
+ "date": timestamp,
+ "credential_id": credential_id,
+ "user_id": user_id,
+ "local": False,
+ "format_id": format_id,
+ }
+ ]
+ await self._post_data("track/reportStreamingStart", data=events)
+
+    async def _report_playback_stopped(
+        self, streamdetails: StreamDetails, bytes_sent: int
+    ) -> None:
+        """Report playback stop to qobuz."""
+        # NOTE(review): bytes_sent is accepted but unused here — the reported
+        # duration is taken from streamdetails.seconds_streamed instead.
+        user_id = self._user_auth_info["user"]["id"]
+        await self._get_data(
+            "/track/reportStreamingEnd",
+            user_id=user_id,
+            track_id=str(streamdetails.item_id),
+            duration=try_parse_int(streamdetails.seconds_streamed),
+        )
async def _parse_artist(self, artist_obj: dict):
"""Parse qobuz artist object to generic layout."""
app_var,
)
from music_assistant.helpers.cache import use_cache
+from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.util import parse_title_and_version
from music_assistant.models.enums import ProviderType
-from music_assistant.models.errors import LoginFailed
+from music_assistant.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.models.media_items import (
Album,
AlbumType,
MediaType,
Playlist,
StreamDetails,
- StreamType,
Track,
)
from music_assistant.models.provider import MusicProvider
# make sure a valid track is requested.
track = await self.get_track(item_id)
if not track:
- return None
+ raise MediaNotFoundError(f"track {item_id} not found")
# make sure that the token is still valid by just requesting it
await self.get_token()
- librespot = await self.get_librespot_binary()
- librespot_exec = f'{librespot} -c "{self._cache_dir}" --pass-through -b 320 --single-track spotify://track:{track.item_id}'
return StreamDetails(
- type=StreamType.EXECUTABLE,
item_id=track.item_id,
provider=self.type,
- path=librespot_exec,
content_type=ContentType.OGG,
- sample_rate=44100,
- bit_depth=16,
+ duration=track.duration,
)
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # make sure that the token is still valid by just requesting it
+        await self.get_token()
+        librespot = await self.get_librespot_binary()
+        # --pass-through yields the raw (ogg) container instead of decoded pcm
+        args = [
+            librespot,
+            "-c",
+            self._cache_dir,
+            "--pass-through",
+            "-b",
+            "320",
+            "--single-track",
+            f"spotify://track:{streamdetails.item_id}",
+        ]
+        if seek_position:
+            # NOTE(review): assumes --start-position takes whole seconds —
+            # confirm against the librespot CLI docs before relying on seek.
+            args += ["--start-position", str(int(seek_position))]
+        async with AsyncProcess(args) as librespot_proc:
+            async for chunk in librespot_proc.iterate_chunks():
+                yield chunk
+
async def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""
artist = Artist(
from asyncio_throttle import Throttler
+from music_assistant.helpers.audio import get_radio_stream
from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.util import create_clean_string
from music_assistant.models.enums import ProviderType
-from music_assistant.models.errors import LoginFailed
+from music_assistant.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.models.media_items import (
ContentType,
ImageType,
MediaType,
Radio,
StreamDetails,
- StreamType,
)
from music_assistant.models.provider import MusicProvider
for stream in stream_info["body"]:
if stream["media_type"] == media_type:
return StreamDetails(
- type=StreamType.URL,
- item_id=item_id,
provider=self.type,
- path=stream["url"],
+ item_id=item_id,
content_type=ContentType(stream["media_type"]),
- sample_rate=44100,
- bit_depth=16,
media_type=MediaType.RADIO,
- details=stream,
+ data=stream,
)
- return None
+ raise MediaNotFoundError(f"Unable to retrieve stream details for {item_id}")
+
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # NOTE(review): seek_position is ignored — live radio streams cannot seek
+        async for chunk in get_radio_stream(
+            self.mass, streamdetails.data["url"], streamdetails
+        ):
+            yield chunk
@use_cache(3600 * 2)
async def __get_data(self, endpoint: str, **kwargs):
--- /dev/null
+"""Basic provider allowing for external URL's to be streamed."""
+from __future__ import annotations
+
+import os
+from typing import AsyncGenerator, List, Optional
+
+from music_assistant.helpers.audio import (
+ get_file_stream,
+ get_http_stream,
+ get_radio_stream,
+)
+from music_assistant.models.config import MusicProviderConfig
+from music_assistant.models.enums import ContentType, MediaType, ProviderType
+from music_assistant.models.media_items import MediaItemType, StreamDetails
+from music_assistant.models.provider import MusicProvider
+
+PROVIDER_CONFIG = MusicProviderConfig(ProviderType.URL)
+
+
+class URLProvider(MusicProvider):
+ """Music Provider for manual URL's/files added to the queue."""
+
+ _attr_name: str = "URL"
+ _attr_type: ProviderType = ProviderType.URL
+ _attr_available: bool = True
+ _attr_supported_mediatypes: List[MediaType] = []
+
+ async def setup(self) -> bool:
+ """
+ Handle async initialization of the provider.
+
+ Called when provider is registered.
+ """
+ return True
+
+ async def search(
+ self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
+ ) -> List[MediaItemType]:
+ """Perform search on musicprovider."""
+ return []
+
+ async def get_stream_details(self, item_id: str) -> StreamDetails | None:
+ """Get streamdetails for a track/radio."""
+ url = item_id
+ return StreamDetails(
+ provider=ProviderType.URL,
+ item_id=item_id,
+ content_type=ContentType.try_parse(url),
+ media_type=MediaType.URL,
+ data=url,
+ )
+
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ if streamdetails.media_type == MediaType.RADIO:
+ # radio stream url
+ async for chunk in get_radio_stream(
+ self.mass, streamdetails.data, streamdetails
+ ):
+ yield chunk
+ elif os.path.isfile(streamdetails.data):
+ # local file
+ async for chunk in get_file_stream(
+ self.mass, streamdetails.data, streamdetails, seek_position
+ ):
+ yield chunk
+ else:
+ # regular stream url (without icy meta and reconnect)
+ async for chunk in get_http_stream(
+ self.mass, streamdetails.data, streamdetails, seek_position
+ ):
+ yield chunk
+++ /dev/null
-"""Controller to stream audio to players."""
-from __future__ import annotations
-
-import asyncio
-import urllib.parse
-from asyncio import Task
-from time import time
-from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Set
-
-from aiohttp import web
-
-from music_assistant.helpers.audio import (
- check_audio_support,
- create_wave_header,
- crossfade_pcm_parts,
- fadein_pcm_part,
- get_media_stream,
- get_preview_stream,
- get_sox_args_for_pcm_stream,
- get_stream_details,
- strip_silence,
-)
-from music_assistant.helpers.process import AsyncProcess
-from music_assistant.models.enums import (
- ContentType,
- CrossFadeMode,
- EventType,
- MediaType,
- ProviderType,
-)
-from music_assistant.models.errors import (
- MediaNotFoundError,
- MusicAssistantError,
- QueueEmpty,
-)
-from music_assistant.models.event import MassEvent
-from music_assistant.models.player_queue import PlayerQueue, QueueItem
-
-if TYPE_CHECKING:
- from music_assistant.mass import MusicAssistant
-
-
-class StreamController:
- """Controller to stream audio to players."""
-
- def __init__(self, mass: MusicAssistant):
- """Initialize instance."""
- self.mass = mass
- self.logger = mass.logger.getChild("stream")
- self._port = mass.config.stream_port
- self._ip = mass.config.stream_ip
- self._subscribers: Dict[str, Set[str]] = {}
- self._client_queues: Dict[str, Dict[str, asyncio.Queue]] = {}
- self._stream_tasks: Dict[str, Task] = {}
- self._time_started: Dict[str, float] = {}
-
- def get_stream_url(
- self,
- queue_id: str,
- child_player: Optional[str] = None,
- content_type: ContentType = ContentType.FLAC,
- ) -> str:
- """Return the full stream url for the PlayerQueue Stream."""
- ext = content_type.value
- if child_player:
- return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{ext}"
- return f"http://{self._ip}:{self._port}/{queue_id}.{ext}"
-
- async def get_preview_url(self, provider: ProviderType, track_id: str) -> str:
- """Return url to short preview sample."""
- track = await self.mass.music.tracks.get_provider_item(track_id, provider)
- if preview := track.metadata.preview:
- return preview
- enc_track_id = urllib.parse.quote(track_id)
- return f"http://{self._ip}:{self._port}/preview?provider_id={provider.value}&item_id={enc_track_id}"
-
- def get_silence_url(self, duration: int = 600) -> str:
- """Return url to silence."""
- return f"http://{self._ip}:{self._port}/silence?duration={duration}"
-
- async def setup(self) -> None:
- """Async initialize of module."""
- app = web.Application()
-
- app.router.add_get("/preview", self.serve_preview)
- app.router.add_get("/silence", self.serve_silence)
- app.router.add_get(
- "/{queue_id}/{player_id}.{format}",
- self.serve_multi_client_queue_stream,
- )
- app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream)
- app.router.add_get("/{queue_id}", self.serve_queue_stream)
-
- runner = web.AppRunner(app, access_log=None)
- await runner.setup()
- # set host to None to bind to all addresses on both IPv4 and IPv6
- http_site = web.TCPSite(runner, host=None, port=self._port)
- await http_site.start()
-
- async def on_shutdown_event(*event: MassEvent):
- """Handle shutdown event."""
- await http_site.stop()
- await runner.cleanup()
- await app.shutdown()
- await app.cleanup()
- self.logger.info("Streamserver exited.")
-
- self.mass.subscribe(on_shutdown_event, EventType.SHUTDOWN)
-
- sox_present, ffmpeg_present = await check_audio_support(True)
- if not ffmpeg_present and not sox_present:
- self.logger.error(
- "SoX or FFmpeg binary not found on your system, "
- "playback will NOT work!."
- )
- elif not ffmpeg_present:
- self.logger.warning(
- "The FFmpeg binary was not found on your system, "
- "you might experience issues with playback. "
- "Please install FFmpeg with your OS package manager.",
- )
- elif not sox_present:
- self.logger.warning(
- "The SoX binary was not found on your system, FFmpeg is used as fallback."
- )
-
- self.logger.info("Started stream server on port %s", self._port)
-
- @staticmethod
- async def serve_silence(request: web.Request):
- """Serve silence."""
- resp = web.StreamResponse(
- status=200, reason="OK", headers={"Content-Type": "audio/wav"}
- )
- await resp.prepare(request)
- duration = int(request.query.get("duration", 600))
- await resp.write(create_wave_header(duration=duration))
- for _ in range(0, duration):
- await resp.write(b"\0" * 1764000)
- return resp
-
- async def serve_preview(self, request: web.Request):
- """Serve short preview sample."""
- provider_id = request.query["provider_id"]
- item_id = urllib.parse.unquote(request.query["item_id"])
- resp = web.StreamResponse(
- status=200, reason="OK", headers={"Content-Type": "audio/mp3"}
- )
- await resp.prepare(request)
- async for _, chunk in get_preview_stream(self.mass, provider_id, item_id):
- await resp.write(chunk)
- return resp
-
- async def serve_queue_stream(self, request: web.Request):
- """Serve queue audio stream to a single player (encoded to fileformat of choice)."""
- queue_id = request.match_info["queue_id"]
- fmt = request.match_info.get("format", "flac")
- queue = self.mass.players.get_player_queue(queue_id)
-
- if queue is None:
- return web.Response(status=404)
-
- # prepare request
- try:
- start_streamdetails = await queue.queue_stream_prepare()
- except QueueEmpty:
- # send stop here to prevent the player from retrying over and over
- await queue.stop()
- # send some silence to allow the player to process the stop request
- return await self.serve_silence(request)
-
- resp = web.StreamResponse(
- status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"}
- )
- await resp.prepare(request)
-
- output_fmt = ContentType(fmt)
- # work out sample rate
- if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
- sample_rate = min(96000, queue.max_sample_rate)
- bit_depth = 24
- channels = 2
- resample = True
- else:
- sample_rate = start_streamdetails.sample_rate
- bit_depth = start_streamdetails.bit_depth
- channels = start_streamdetails.channels
- resample = False
- sox_args = await get_sox_args_for_pcm_stream(
- sample_rate,
- bit_depth,
- channels,
- output_format=output_fmt,
- )
- # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
- # send the compressed/encoded stream to the client.
- async with AsyncProcess(sox_args, True) as sox_proc:
-
- async def writer():
- # task that sends the raw pcm audio to the sox/ffmpeg process
- async for audio_chunk in self._get_queue_stream(
- queue,
- sample_rate=sample_rate,
- bit_depth=bit_depth,
- channels=channels,
- resample=resample,
- ):
- if sox_proc.closed:
- return
- await sox_proc.write(audio_chunk)
- # write eof when last packet is received
- sox_proc.write_eof()
-
- sox_proc.attach_task(writer())
-
- # read bytes from final output
- async for audio_chunk in sox_proc.iterate_chunks():
- await resp.write(audio_chunk)
-
- return resp
-
- async def serve_multi_client_queue_stream(self, request: web.Request):
- """Serve queue audio stream to multiple (group)clients."""
- queue_id = request.match_info["queue_id"]
- player_id = request.match_info["player_id"]
- fmt = request.match_info.get("format", "flac")
- queue = self.mass.players.get_player_queue(queue_id)
- player = self.mass.players.get_player(player_id)
-
- if queue is None or player is None:
- return web.Response(status=404)
-
- # prepare request
- resp = web.StreamResponse(
- status=200,
- reason="OK",
- headers={"Content-Type": f"audio/{fmt}"},
- )
- await resp.prepare(request)
-
- # start delivering audio chunks
- await self.subscribe_client(queue_id, player_id)
- try:
- while True:
- client_queue = self._client_queues.get(queue_id).get(player_id)
- if not client_queue:
- break
- audio_chunk = await client_queue.get()
- if audio_chunk == b"":
- # eof
- break
- await resp.write(audio_chunk)
- client_queue.task_done()
- finally:
- await self.unsubscribe_client(queue_id, player_id)
- return resp
-
- async def subscribe_client(self, queue_id: str, player_id: str) -> None:
- """Subscribe client to queue stream."""
- if queue_id not in self._stream_tasks:
- raise MusicAssistantError(f"No Queue stream available for {queue_id}")
-
- if queue_id not in self._subscribers:
- self._subscribers[queue_id] = set()
- self._subscribers[queue_id].add(player_id)
-
- self.logger.debug(
- "Subscribed player %s to multi queue stream %s",
- player_id,
- queue_id,
- )
-
- async def unsubscribe_client(self, queue_id: str, player_id: str):
- """Unsubscribe client from queue stream."""
- if player_id in self._subscribers[queue_id]:
- self._subscribers[queue_id].remove(player_id)
-
- self.__cleanup_client_queue(queue_id, player_id)
- self.logger.debug(
- "Unsubscribed player %s from multi queue stream %s", player_id, queue_id
- )
- if len(self._subscribers[queue_id]) == 0:
- # no more clients, cancel stream task
- if task := self._stream_tasks.pop(queue_id, None):
- self.logger.debug(
- "Aborted multi queue stream %s due to no more clients", queue_id
- )
- task.cancel()
-
- async def start_multi_client_queue_stream(
- self, queue_id: str, expected_clients: Set[str], output_fmt: ContentType
- ) -> None:
- """Start the Queue stream feeding callbacks of listeners.."""
- assert queue_id not in self._stream_tasks, "already running!"
-
- self._time_started[queue_id] = time()
-
- # create queue for expected clients
- self._client_queues.setdefault(queue_id, {})
- for child_id in expected_clients:
- self._client_queues[queue_id][child_id] = asyncio.Queue(10)
-
- self._stream_tasks[queue_id] = asyncio.create_task(
- self.__multi_client_queue_stream_runner(queue_id, output_fmt)
- )
-
- async def stop_multi_client_queue_stream(self, queue_id: str) -> None:
- """Signal a running queue stream task and its listeners to stop."""
- if queue_id not in self._stream_tasks:
- return
-
- # send stop to child players
- await asyncio.gather(
- *[
- self.mass.players.get_player(client_id).stop()
- for client_id in self._subscribers.get(queue_id, {})
- ]
- )
-
- # stop background task
- if stream_task := self._stream_tasks.pop(queue_id, None):
- stream_task.cancel()
-
- # wait for cleanup
- while len(self._subscribers.get(queue_id, {})) != 0:
- await asyncio.sleep(0.1)
- while len(self._client_queues.get(queue_id, {})) != 0:
- await asyncio.sleep(0.1)
-
- async def __multi_client_queue_stream_runner(
- self, queue_id: str, output_fmt: ContentType
- ):
- """Distribute audio chunks over connected clients in a multi client queue stream."""
- queue = self.mass.players.get_player_queue(queue_id)
-
- start_streamdetails = await queue.queue_stream_prepare()
- # work out sample rate
- if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
- sample_rate = min(96000, queue.max_sample_rate)
- bit_depth = 24
- channels = 2
- resample = True
- else:
- sample_rate = start_streamdetails.sample_rate
- bit_depth = start_streamdetails.bit_depth
- channels = start_streamdetails.channels
- resample = False
- sox_args = await get_sox_args_for_pcm_stream(
- sample_rate,
- bit_depth,
- channels,
- output_format=output_fmt,
- )
- self.logger.debug("Multi client queue stream %s started", queue.queue_id)
- try:
-
- # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
- # send the compressed/endoded stream to the client.
- async with AsyncProcess(sox_args, True) as sox_proc:
-
- async def writer():
- """Task that sends the raw pcm audio to the sox/ffmpeg process."""
- async for audio_chunk in self._get_queue_stream(
- queue,
- sample_rate=sample_rate,
- bit_depth=bit_depth,
- channels=channels,
- resample=resample,
- ):
- if sox_proc.closed:
- return
- await sox_proc.write(audio_chunk)
- # write eof when last packet is received
- sox_proc.write_eof()
-
- async def reader():
- """Read bytes from final output and put chunk on child queues."""
- chunks_sent = 0
- async for chunk in sox_proc.iterate_chunks():
- chunks_sent += 1
- coros = []
- for player_id in list(self._client_queues[queue_id].keys()):
- if (
- self._client_queues[queue_id][player_id].full()
- and chunks_sent >= 10
- and player_id not in self._subscribers[queue_id]
- ):
- # assume client did not connect at all or got disconnected somehow
- self.__cleanup_client_queue(queue_id, player_id)
- self._client_queues[queue_id].pop(player_id, None)
- else:
- coros.append(
- self._client_queues[queue_id][player_id].put(chunk)
- )
- await asyncio.gather(*coros)
-
- # launch the reader and writer
- await asyncio.gather(*[writer(), reader()])
- # wait for all queues to consume their data
- await asyncio.gather(
- *[cq.join() for cq in self._client_queues[queue_id].values()]
- )
- # send empty chunk to inform EOF
- await asyncio.gather(
- *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
- )
-
- finally:
- self.logger.debug("Multi client queue stream %s finished", queue.queue_id)
- # cleanup
- self._stream_tasks.pop(queue_id, None)
- for player_id in list(self._client_queues[queue_id].keys()):
- self.__cleanup_client_queue(queue_id, player_id)
-
- self.logger.debug("Multi client queue stream %s ended", queue.queue_id)
-
- def __cleanup_client_queue(self, queue_id: str, player_id: str):
- """Cleanup a client queue after it completes/disconnects."""
- if client_queue := self._client_queues.get(queue_id, {}).pop(player_id, None):
- for _ in range(client_queue.qsize()):
- client_queue.get_nowait()
- client_queue.task_done()
- client_queue.put_nowait(b"")
-
- async def _get_queue_stream(
- self,
- queue: PlayerQueue,
- sample_rate: int,
- bit_depth: int,
- channels: int = 2,
- resample: bool = False,
- ) -> AsyncGenerator[None, bytes]:
- """Stream the PlayerQueue's tracks as constant feed of PCM raw audio."""
- bytes_written_total = 0
- last_fadeout_data = b""
- queue_index = None
- track_count = 0
- prev_track: Optional[QueueItem] = None
-
- pcm_fmt = ContentType.from_bit_depth(bit_depth)
- self.logger.info(
- "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)",
- queue.player.name,
- pcm_fmt.value,
- sample_rate,
- )
-
- # stream queue tracks one by one
- while True:
- # get the (next) track in queue
- track_count += 1
- if track_count == 1:
- # report start of queue playback so we can calculate current track/duration etc.
- queue_index, seek_position, fade_in = await queue.queue_stream_start()
- else:
- queue_index = await queue.queue_stream_next(queue_index)
- seek_position = 0
- fade_in = 0
- queue_track = queue.get_item(queue_index)
- if not queue_track:
- self.logger.debug(
- "Abort Queue stream %s: no (more) tracks in queue", queue.queue_id
- )
- break
- # get streamdetails
- try:
- streamdetails = await get_stream_details(
- self.mass, queue_track, queue.queue_id
- )
- except MediaNotFoundError as err:
- self.logger.warning(
- "Skip track %s due to missing streamdetails",
- queue_track.name,
- exc_info=err,
- )
- continue
-
- # check the PCM samplerate/bitrate
- if not resample and streamdetails.bit_depth > bit_depth:
- await queue.queue_stream_signal_next()
- self.logger.debug(
- "Abort queue stream %s due to bit depth mismatch", queue.player.name
- )
- break
- if (
- not resample
- and streamdetails.sample_rate > sample_rate
- and streamdetails.sample_rate <= queue.max_sample_rate
- ):
- self.logger.debug(
- "Abort queue stream %s due to sample rate mismatch",
- queue.player.name,
- )
- await queue.queue_stream_signal_next()
- break
-
- # check crossfade ability
- use_crossfade = queue.settings.crossfade_mode != CrossFadeMode.DISABLED
- if (
- prev_track is not None
- and prev_track.media_type == MediaType.TRACK
- and queue_track.media_type == MediaType.TRACK
- ):
- prev_item = await self.mass.music.get_item_by_uri(prev_track.uri)
- new_item = await self.mass.music.get_item_by_uri(queue_track.uri)
- if (
- prev_item.album is not None
- and new_item.album is not None
- and prev_item.album == new_item.album
- ):
- use_crossfade = False
- prev_track = queue_track
-
- sample_size = int(sample_rate * (bit_depth / 8) * channels) # 1 second
- buffer_size = sample_size * (queue.settings.crossfade_duration or 2)
- # force small buffer for radio to prevent too much lag at start
- if queue_track.media_type != MediaType.TRACK:
- use_crossfade = False
- buffer_size = sample_size
-
- self.logger.info(
- "Start Streaming queue track: %s (%s) for queue %s",
- queue_track.uri,
- queue_track.name,
- queue.player.name,
- )
- queue_track.streamdetails.seconds_skipped = seek_position
- fade_in_part = b""
- cur_chunk = 0
- prev_chunk = None
- bytes_written = 0
- # handle incoming audio chunks
- async for is_last_chunk, chunk in get_media_stream(
- self.mass,
- streamdetails,
- pcm_fmt,
- resample=sample_rate,
- chunk_size=buffer_size,
- seek_position=seek_position,
- ):
- cur_chunk += 1
-
- # HANDLE FIRST PART OF TRACK
- if not chunk and bytes_written == 0 and is_last_chunk:
- # stream error: got empy first chunk ?!
- self.logger.warning("Stream error on %s", queue_track.uri)
- elif cur_chunk == 1 and last_fadeout_data:
- prev_chunk = chunk
- del chunk
- elif cur_chunk == 1 and fade_in:
- # fadein first chunk
- fadein_first_part = await fadein_pcm_part(
- chunk, fade_in, pcm_fmt, sample_rate
- )
- yield fadein_first_part
- bytes_written += len(fadein_first_part)
- del chunk
- del fadein_first_part
- elif cur_chunk <= 2 and not last_fadeout_data:
- # no fadeout_part available so just pass it to the output directly
- yield chunk
- bytes_written += len(chunk)
- del chunk
- # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
- elif cur_chunk == 2 and last_fadeout_data:
- # combine the first 2 chunks and strip off silence
- first_part = await strip_silence(
- prev_chunk + chunk, pcm_fmt, sample_rate
- )
- if len(first_part) < buffer_size:
- # part is too short after the strip action?!
- # so we just use the full first part
- first_part = prev_chunk + chunk
- fade_in_part = first_part[:buffer_size]
- remaining_bytes = first_part[buffer_size:]
- del first_part
- # do crossfade
- crossfade_part = await crossfade_pcm_parts(
- fade_in_part,
- last_fadeout_data,
- queue.settings.crossfade_duration,
- pcm_fmt,
- sample_rate,
- )
- # send crossfade_part
- yield crossfade_part
- bytes_written += len(crossfade_part)
- del crossfade_part
- del fade_in_part
- last_fadeout_data = b""
- # also write the leftover bytes from the strip action
- yield remaining_bytes
- bytes_written += len(remaining_bytes)
- del remaining_bytes
- del chunk
- prev_chunk = None # needed to prevent this chunk being sent again
- # HANDLE LAST PART OF TRACK
- elif prev_chunk and is_last_chunk:
- # last chunk received so create the last_part
- # with the previous chunk and this chunk
- # and strip off silence
- last_part = await strip_silence(
- prev_chunk + chunk, pcm_fmt, sample_rate, True
- )
- if len(last_part) < buffer_size:
- # part is too short after the strip action
- # so we just use the entire original data
- last_part = prev_chunk + chunk
- if not use_crossfade or len(last_part) < buffer_size:
- # crossfading is not enabled or not enough data,
- # so just pass the (stripped) audio data
- if use_crossfade:
- self.logger.warning(
- "Not enough data for crossfade: %s", len(last_part)
- )
- yield last_part
- bytes_written += len(last_part)
- del last_part
- del chunk
- else:
- # handle crossfading support
- # store fade section to be picked up for next track
- last_fadeout_data = last_part[-buffer_size:]
- remaining_bytes = last_part[:-buffer_size]
- # write remaining bytes
- if remaining_bytes:
- yield remaining_bytes
- bytes_written += len(remaining_bytes)
- del last_part
- del remaining_bytes
- del chunk
- # MIDDLE PARTS OF TRACK
- else:
- # middle part of the track
- # keep previous chunk in memory so we have enough
- # samples to perform the crossfade
- if prev_chunk:
- yield prev_chunk
- bytes_written += len(prev_chunk)
- prev_chunk = chunk
- else:
- prev_chunk = chunk
- del chunk
- # allow clients to only buffer max ~10 seconds ahead
- queue_track.streamdetails.seconds_played = bytes_written / sample_size
- seconds_buffered = (bytes_written_total + bytes_written) / sample_size
- seconds_needed = queue.player.elapsed_time + 10
- diff = seconds_buffered - seconds_needed
- track_time = queue_track.duration or 0
- if track_time > 10 and diff > 1:
- await asyncio.sleep(diff)
- # end of the track reached
- bytes_written_total += bytes_written
- self.logger.debug(
- "Finished Streaming queue track: %s (%s) on queue %s",
- queue_track.uri,
- queue_track.name,
- queue.player.name,
- )
- # end of queue reached, pass last fadeout bits to final output
- yield last_fadeout_data
- # END OF QUEUE STREAM
- self.logger.info("Queue stream for Queue %s finished.", queue.player.name)
--- /dev/null
+"""Controller to stream audio to players."""
+from __future__ import annotations
+
+import asyncio
+import gc
+import urllib.parse
+from types import CoroutineType
+from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional
+from uuid import uuid4
+
+from aiohttp import web
+
+from music_assistant.helpers.audio import (
+ check_audio_support,
+ create_wave_header,
+ crossfade_pcm_parts,
+ fadein_pcm_part,
+ get_chunksize,
+ get_ffmpeg_args_for_pcm_stream,
+ get_media_stream,
+ get_preview_stream,
+ get_stream_details,
+ strip_silence,
+)
+from music_assistant.helpers.process import AsyncProcess
+from music_assistant.models.enums import (
+ ContentType,
+ CrossFadeMode,
+ EventType,
+ MediaType,
+ ProviderType,
+)
+from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
+from music_assistant.models.event import MassEvent
+from music_assistant.models.player_queue import PlayerQueue
+from music_assistant.models.queue_item import QueueItem
+
+if TYPE_CHECKING:
+ from music_assistant.mass import MusicAssistant
+
+
class StreamsController:
    """Controller to stream audio to players."""

    def __init__(self, mass: MusicAssistant):
        """Initialize instance.

        :param mass: the central MusicAssistant instance.
        """
        self.mass = mass
        self.logger = mass.logger.getChild("stream")
        # host/port of the builtin HTTP stream server, taken from config
        self._port = mass.config.stream_port
        self._ip = mass.config.stream_ip
        # active QueueStream instances, keyed by their unique stream_id
        self.queue_streams: Dict[str, QueueStream] = {}

    @property
    def base_url(self) -> str:
        """Return the base url for the stream engine."""
        return f"http://{self._ip}:{self._port}"

    def get_stream_url(
        self,
        stream_id: str,
        content_type: ContentType = ContentType.FLAC,
    ) -> str:
        """Generate unique stream url for the PlayerQueue Stream.

        :param stream_id: unique id of the (registered) QueueStream.
        :param content_type: output format, used as the url's file extension.
        """
        ext = content_type.value
        return f"{self.base_url}/{stream_id}.{ext}"

    async def get_preview_url(self, provider: ProviderType, track_id: str) -> str:
        """Return url to short preview sample.

        Prefers a provider-supplied preview url when present in the track
        metadata, otherwise returns a url served by our own /preview endpoint.
        """
        track = await self.mass.music.tracks.get_provider_item(track_id, provider)
        if preview := track.metadata.preview:
            return preview
        # quote the item id as it may contain url-unsafe characters
        enc_track_id = urllib.parse.quote(track_id)
        return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}"

    def get_silence_url(self, duration: int = 600) -> str:
        """Return url to silence (served by the /silence endpoint)."""
        return f"{self.base_url}/silence?duration={duration}"

    async def setup(self) -> None:
        """Async initialize of module: start the HTTP stream server."""
        app = web.Application()

        app.router.add_get("/preview", self.serve_preview)
        app.router.add_get("/silence", self.serve_silence)
        app.router.add_get("/{stream_id}.{format}", self.serve_queue_stream)

        runner = web.AppRunner(app, access_log=None)
        await runner.setup()
        # set host to None to bind to all addresses on both IPv4 and IPv6
        http_site = web.TCPSite(runner, host=None, port=self._port)
        await http_site.start()

        async def on_shutdown_event(*event: MassEvent):
            """Handle shutdown event: tear down the webserver."""
            await http_site.stop()
            await runner.cleanup()
            await app.shutdown()
            await app.cleanup()
            self.logger.info("Streamserver exited.")

        self.mass.subscribe(on_shutdown_event, EventType.SHUTDOWN)

        # sanity-check the audio tooling and warn the user about degraded modes
        ffmpeg_present, libsoxr_support = await check_audio_support(True)
        if not ffmpeg_present:
            self.logger.error(
                "FFmpeg binary not found on your system, " "playback will NOT work!."
            )
        elif not libsoxr_support:
            self.logger.warning(
                "FFmpeg version found without libsoxr support, "
                "highest quality audio not available. "
            )

        self.logger.info("Started stream server on port %s", self._port)

    @staticmethod
    async def serve_silence(request: web.Request):
        """Serve silence as a WAV stream (duration in seconds via query arg)."""
        resp = web.StreamResponse(
            status=200, reason="OK", headers={"Content-Type": "audio/wav"}
        )
        await resp.prepare(request)
        duration = int(request.query.get("duration", 600))
        await resp.write(create_wave_header(duration=duration))
        # NOTE(review): each iteration writes 1.764.000 zero-bytes (~10 seconds
        # of 16bit/44.1kHz stereo PCM) but the loop runs `duration` times, so
        # the payload may exceed the header's declared duration — verify intent.
        for _ in range(0, duration):
            await resp.write(b"\0" * 1764000)
        return resp

    async def serve_preview(self, request: web.Request):
        """Serve short preview sample as mp3 stream."""
        provider_id = request.query["provider_id"]
        item_id = urllib.parse.unquote(request.query["item_id"])
        resp = web.StreamResponse(
            status=200, reason="OK", headers={"Content-Type": "audio/mp3"}
        )
        await resp.prepare(request)
        async for chunk in get_preview_stream(self.mass, provider_id, item_id):
            await resp.write(chunk)
        return resp

    async def serve_queue_stream(self, request: web.Request):
        """Serve queue audio stream to a single player (subscriber)."""
        # NOTE(review): full request + headers logged at INFO level —
        # looks like debug leftover; consider downgrading to debug.
        self.logger.info(request)
        self.logger.info(request.headers)

        stream_id = request.match_info["stream_id"]
        queue_stream = self.queue_streams.get(stream_id)

        if queue_stream is None:
            self.logger.warning("Got stream request for unknown id: %s", stream_id)
            return web.Response(status=404)

        # prepare request, add some DLNA/UPNP compatible headers
        headers = {
            "Content-Type": f"audio/{queue_stream.output_format.value}",
            "transferMode.dlna.org": "Streaming",
            "Connection": "Close",
            "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000",
        }
        resp = web.StreamResponse(headers=headers)
        await resp.prepare(request)
        client_id = request.remote
        # subscribe blocks until the stream is finished / client disconnects
        await queue_stream.subscribe(client_id, resp.write)

        return resp

    async def start_queue_stream(
        self,
        queue: PlayerQueue,
        expected_clients: int,
        start_index: int,
        seek_position: int,
        fade_in: bool,
        output_format: ContentType,
    ) -> QueueStream:
        """Start running a queue stream.

        :param queue: the PlayerQueue to stream.
        :param expected_clients: number of clients that will connect.
        :param start_index: queue index to start streaming from.
        :param seek_position: seconds to seek into the first track.
        :param fade_in: fade-in the first track.
        :param output_format: the encoded output format to serve.
        :raises QueueEmpty: if there is no item at start_index.
        """
        # generate unique stream url
        stream_id = uuid4().hex
        # determine the pcm details based on the first track we need to stream
        try:
            first_item = queue.items[start_index]
        except (IndexError, TypeError) as err:
            raise QueueEmpty() from err

        streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id)

        # work out pcm details
        if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
            # crossfade always enabled: use a fixed high-quality PCM format
            # so every track can be resampled into the same stream
            pcm_sample_rate = min(96000, queue.max_sample_rate)
            pcm_bit_depth = 24
            pcm_channels = 2
            pcm_resample = True
        elif streamdetails.sample_rate > queue.max_sample_rate:
            # source exceeds what the player can handle: downsample
            pcm_sample_rate = queue.max_sample_rate
            pcm_bit_depth = streamdetails.bit_depth
            pcm_channels = streamdetails.channels
            pcm_resample = True
        else:
            # passthrough the source format
            pcm_sample_rate = streamdetails.sample_rate
            pcm_bit_depth = streamdetails.bit_depth
            pcm_channels = streamdetails.channels
            pcm_resample = False

        self.queue_streams[stream_id] = stream = QueueStream(
            queue=queue,
            stream_id=stream_id,
            expected_clients=expected_clients,
            start_index=start_index,
            seek_position=seek_position,
            fade_in=fade_in,
            output_format=output_format,
            pcm_sample_rate=pcm_sample_rate,
            pcm_bit_depth=pcm_bit_depth,
            pcm_channels=pcm_channels,
            pcm_resample=pcm_resample,
            autostart=True,
        )
        # cleanup stale previous queue tasks
        # NOTE(review): cleanup_stale is a sync function and is passed
        # uncalled — assumes mass.create_task accepts plain callables; verify.
        self.mass.create_task(self.cleanup_stale)
        return stream

    def cleanup_stale(self) -> None:
        """Cleanup stale/done stream tasks."""
        stale = set()
        for stream_id, stream in self.queue_streams.items():
            # a stream is stale once finished and with no clients attached
            if stream.done.is_set() and not stream.connected_clients:
                stale.add(stream_id)
        for stream_id in stale:
            self.queue_streams.pop(stream_id, None)
+
+
class QueueStream:
    """Representation of a (multisubscriber) Audio Queue stream."""

    def __init__(
        self,
        queue: PlayerQueue,
        stream_id: str,
        expected_clients: int,
        start_index: int,
        seek_position: int,
        fade_in: bool,
        output_format: ContentType,
        pcm_sample_rate: int,
        pcm_bit_depth: int,
        pcm_channels: int = 2,
        pcm_floating_point: bool = False,
        pcm_resample: bool = False,
        autostart: bool = False,
    ):
        """Init QueueStreamJob instance.

        :param queue: the PlayerQueue this stream belongs to.
        :param stream_id: unique id for this stream (used in its url).
        :param expected_clients: number of clients expected to connect.
        :param start_index: queue index to start streaming from.
        :param seek_position: seconds to seek into the first track.
        :param fade_in: fade-in the first track.
        :param output_format: encoded output format served to clients.
        :param pcm_sample_rate: sample rate of the intermediate PCM stream.
        :param pcm_bit_depth: bit depth of the intermediate PCM stream.
        :param pcm_channels: channel count of the intermediate PCM stream.
        :param pcm_floating_point: whether the PCM stream is floating point.
        :param pcm_resample: allow resampling tracks into the PCM format.
        :param autostart: start the stream runner immediately.
        """
        self.queue = queue
        self.stream_id = stream_id
        self.expected_clients = expected_clients
        self.start_index = start_index
        self.seek_position = seek_position
        self.fade_in = fade_in
        self.output_format = output_format
        self.pcm_sample_rate = pcm_sample_rate
        self.pcm_bit_depth = pcm_bit_depth
        self.pcm_channels = pcm_channels
        self.pcm_floating_point = pcm_floating_point
        self.pcm_resample = pcm_resample
        # public url on which this stream can be reached
        self.url = queue.mass.streams.get_stream_url(stream_id, output_format)

        self.mass = queue.mass
        self.logger = self.queue.logger.getChild("stream")
        # connected client write-callbacks, keyed by client id
        self.connected_clients: Dict[str, CoroutineType[bytes]] = {}
        self._runner_task: Optional[asyncio.Task] = None
        # set when the stream is finished (or aborted)
        self.done = asyncio.Event()
        # set once all expected clients have connected
        self.all_clients_connected = asyncio.Event()
        # queue index of the track currently loaded into the buffer
        self.index_in_buffer = start_index
        # set when the queue should restart the stream after this one ends
        # (e.g. on sample rate/bit depth mismatch)
        self.signal_next: bool = False
        if autostart:
            self.mass.create_task(self.start())

    async def start(self) -> None:
        """Start running queue stream."""
        self._runner_task = self.mass.create_task(self._queue_stream_runner())

    async def stop(self) -> None:
        """Stop running queue stream and cleanup."""
        self.done.set()
        if self._runner_task and not self._runner_task.done():
            self._runner_task.cancel()
            # allow some time to cleanup
            await asyncio.sleep(2)

        self._runner_task = None
        self.connected_clients = {}

        # run garbage collection manually due to the high number of
        # processed bytes blocks
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, gc.collect)
        self.logger.debug("Stream job %s cleaned up", self.stream_id)

    async def subscribe(self, client_id: str, callback: CoroutineType[bytes]) -> None:
        """Subscribe callback and wait for completion.

        Blocks until the stream is done; the callback receives every
        encoded audio chunk as it becomes available.
        """
        assert client_id not in self.connected_clients, "Client is already connected"
        assert not self.done.is_set(), "Stream task is already finished"
        self.connected_clients[client_id] = callback
        self.logger.debug("client connected: %s", client_id)
        if len(self.connected_clients) == self.expected_clients:
            # all expected clients present: unblock the runner
            self.all_clients_connected.set()
        try:
            await self.done.wait()
        finally:
            self.connected_clients.pop(client_id, None)
            self.logger.debug("client disconnected: %s", client_id)
            if len(self.connected_clients) == 0:
                # no more clients, perform cleanup
                await self.stop()

    async def _queue_stream_runner(self) -> None:
        """Distribute audio chunks over connected client queues."""
        ffmpeg_args = await get_ffmpeg_args_for_pcm_stream(
            self.pcm_sample_rate,
            self.pcm_bit_depth,
            self.pcm_channels,
            output_format=self.output_format,
        )
        # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
        # send the compressed/encoded stream to the client(s).
        chunk_size = get_chunksize(self.output_format)
        async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc:

            async def writer():
                """Task that sends the raw pcm audio to the ffmpeg process."""
                async for audio_chunk in self._get_queue_stream():
                    await ffmpeg_proc.write(audio_chunk)
                    del audio_chunk
                # write eof when last packet is received
                ffmpeg_proc.write_eof()

            ffmpeg_proc.attach_task(writer())

            # wait max 5 seconds for all client(s) to connect
            try:
                await asyncio.wait_for(self.all_clients_connected.wait(), 5)
            except asyncio.exceptions.TimeoutError:
                self.logger.warning(
                    "Abort: client(s) did not connect within 5 seconds."
                )
                self.done.set()
                return
            self.logger.debug("%s clients connected", len(self.connected_clients))

            # Read bytes from final output and send chunk to child callback.
            async for chunk in ffmpeg_proc.iterate_chunks():
                if len(self.connected_clients) == 0:
                    # no more clients
                    self.done.set()
                    self.logger.debug("Abort: all clients disconnected.")
                    return
                for client_id in set(self.connected_clients.keys()):
                    try:
                        callback = self.connected_clients[client_id]
                        await callback(chunk)
                    except (
                        ConnectionResetError,
                        KeyError,
                        BrokenPipeError,
                    ):
                        # client dropped mid-write: detach it silently
                        self.connected_clients.pop(client_id, None)

                del chunk

        # complete queue streamed
        if self.signal_next:
            # the queue stream was aborted (e.g. because of sample rate mismatch)
            # tell the queue to load the next track (restart stream) as soon
            # as the player finished playing and returns to idle
            self.queue.signal_next = True

        # all queue data has been streamed. Either because the queue is exhausted
        # or we need to restart the stream due to decoder/sample rate mismatch
        # set event that this stream task is finished
        # if the stream is restarted by the queue manager afterwards is controlled
        # by the `signal_next` bool above.
        self.done.set()

    async def _get_queue_stream(
        self,
    ) -> AsyncGenerator[bytes, None]:
        """Stream the PlayerQueue's tracks as constant feed of PCM raw audio."""
        last_fadeout_data = b""
        queue_index = None
        track_count = 0
        prev_track: Optional[QueueItem] = None

        pcm_fmt = ContentType.from_bit_depth(self.pcm_bit_depth)
        self.logger.debug(
            "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)",
            self.queue.player.name,
            pcm_fmt.value,
            self.pcm_sample_rate,
        )

        # stream queue tracks one by one
        while True:
            # get the (next) track in queue
            track_count += 1
            if track_count == 1:
                # first track in this session: honor the start options
                queue_index = self.start_index
                seek_position = self.seek_position
                fade_in = self.fade_in
            else:
                queue_index = self.queue.get_next_index(queue_index)
                seek_position = 0
                fade_in = False
            self.index_in_buffer = queue_index
            # send signal that we've loaded a new track into the buffer
            self.queue.signal_update()
            queue_track = self.queue.get_item(queue_index)
            if not queue_track:
                self.logger.debug(
                    "Abort Queue stream %s: no (more) tracks in queue",
                    self.queue.queue_id,
                )
                break
            # get streamdetails
            try:
                streamdetails = await get_stream_details(
                    self.mass, queue_track, self.queue.queue_id
                )
            except MediaNotFoundError as err:
                self.logger.warning(
                    "Skip track %s due to missing streamdetails",
                    queue_track.name,
                    exc_info=err,
                )
                continue

            if queue_track.name == "alert":
                # NOTE(review): special-cased by track name — alert sounds
                # are always resampled into the stream's PCM format
                self.pcm_resample = True

            # check the PCM samplerate/bitrate
            if not self.pcm_resample and streamdetails.bit_depth > self.pcm_bit_depth:
                self.signal_next = True
                self.logger.debug(
                    "Abort queue stream %s due to bit depth mismatch",
                    self.queue.player.name,
                )
                break
            if (
                not self.pcm_resample
                and streamdetails.sample_rate > self.pcm_sample_rate
                and streamdetails.sample_rate <= self.queue.max_sample_rate
            ):
                self.logger.debug(
                    "Abort queue stream %s due to sample rate mismatch",
                    self.queue.player.name,
                )
                self.signal_next = True
                break

            # check crossfade ability
            use_crossfade = self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED
            if (
                prev_track is not None
                and prev_track.media_type == MediaType.TRACK
                and queue_track.media_type == MediaType.TRACK
            ):
                # skip crossfade for consecutive tracks of the same album
                # (gapless albums should flow into each other naturally)
                prev_item = await self.mass.music.get_item_by_uri(prev_track.uri)
                new_item = await self.mass.music.get_item_by_uri(queue_track.uri)
                if (
                    prev_item.album is not None
                    and new_item.album is not None
                    and prev_item.album == new_item.album
                ):
                    use_crossfade = False
            prev_track = queue_track

            sample_size = int(
                self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels
            )  # 1 second
            buffer_size = sample_size * (self.queue.settings.crossfade_duration or 2)
            # force small buffer for radio to prevent too much lag at start
            if queue_track.media_type != MediaType.TRACK:
                use_crossfade = False
                buffer_size = sample_size

            self.logger.info(
                "Start Streaming queue track: %s (%s) for queue %s",
                queue_track.uri,
                queue_track.name,
                self.queue.player.name,
            )
            queue_track.streamdetails.seconds_skipped = seek_position
            cur_chunk = 0
            prev_chunk = None
            bytes_written = 0
            # handle incoming audio chunks
            async for chunk in get_media_stream(
                self.mass,
                streamdetails,
                pcm_fmt,
                pcm_sample_rate=self.pcm_sample_rate,
                chunk_size=buffer_size,
                seek_position=seek_position,
            ):
                cur_chunk += 1
                # a short chunk means the source is exhausted
                is_last_chunk = len(chunk) < buffer_size

                # HANDLE FIRST PART OF TRACK
                if len(chunk) == 0 and bytes_written == 0 and is_last_chunk:
                    # stream error: got empty first chunk ?!
                    self.logger.warning("Stream error on %s", queue_track.uri)
                elif cur_chunk == 1 and last_fadeout_data:
                    # hold back the first chunk: it is needed for the crossfade
                    prev_chunk = chunk
                    del chunk
                elif cur_chunk == 1 and fade_in:
                    # fadein first chunk
                    # NOTE(review): fade_in (a bool) is passed as the fade
                    # length argument — verify fadein_pcm_part's signature
                    fadein_first_part = await fadein_pcm_part(
                        chunk, fade_in, pcm_fmt, self.pcm_sample_rate
                    )
                    yield fadein_first_part
                    bytes_written += len(fadein_first_part)
                    del chunk
                    del fadein_first_part
                elif cur_chunk <= 2 and not last_fadeout_data:
                    # no fadeout_part available so just pass it to the output directly
                    yield chunk
                    bytes_written += len(chunk)
                    del chunk
                # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
                elif cur_chunk == 2 and last_fadeout_data:
                    # combine the first 2 chunks and strip off silence
                    first_part = await strip_silence(
                        prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate
                    )
                    if len(first_part) < buffer_size:
                        # part is too short after the strip action?!
                        # so we just use the full first part
                        first_part = prev_chunk + chunk
                    fade_in_part = first_part[:buffer_size]
                    remaining_bytes = first_part[buffer_size:]
                    del first_part
                    # do crossfade
                    crossfade_part = await crossfade_pcm_parts(
                        fade_in_part,
                        last_fadeout_data,
                        self.queue.settings.crossfade_duration,
                        pcm_fmt,
                        self.pcm_sample_rate,
                    )
                    # send crossfade_part
                    yield crossfade_part
                    bytes_written += len(crossfade_part)
                    del crossfade_part
                    del fade_in_part
                    last_fadeout_data = b""
                    # also write the leftover bytes from the strip action
                    yield remaining_bytes
                    bytes_written += len(remaining_bytes)
                    del remaining_bytes
                    del chunk
                    prev_chunk = None  # needed to prevent this chunk being sent again
                # HANDLE LAST PART OF TRACK
                elif prev_chunk and is_last_chunk:
                    # last chunk received so create the last_part
                    # with the previous chunk and this chunk
                    # and strip off silence
                    last_part = await strip_silence(
                        prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, True
                    )
                    if len(last_part) < buffer_size:
                        # part is too short after the strip action
                        # so we just use the entire original data
                        last_part = prev_chunk + chunk
                    if not use_crossfade or len(last_part) < buffer_size:
                        if use_crossfade:
                            self.logger.debug("not enough data for crossfade")
                        # crossfading is not enabled or not enough data,
                        # so just pass the (stripped) audio data
                        yield last_part
                        bytes_written += len(last_part)
                        del last_part
                        del chunk
                    else:
                        # handle crossfading support
                        # store fade section to be picked up for next track
                        last_fadeout_data = last_part[-buffer_size:]
                        remaining_bytes = last_part[:-buffer_size]
                        # write remaining bytes
                        if remaining_bytes:
                            yield remaining_bytes
                            bytes_written += len(remaining_bytes)
                        del last_part
                        del remaining_bytes
                        del chunk
                # MIDDLE PARTS OF TRACK
                else:
                    # middle part of the track
                    # keep previous chunk in memory so we have enough
                    # samples to perform the crossfade
                    if prev_chunk:
                        yield prev_chunk
                        bytes_written += len(prev_chunk)
                        prev_chunk = chunk
                    else:
                        prev_chunk = chunk
                    del chunk
            # end of the track reached
            queue_track.streamdetails.seconds_streamed = bytes_written / sample_size
            self.logger.debug(
                "Finished Streaming queue track: %s (%s) on queue %s",
                queue_track.uri,
                queue_track.name,
                self.queue.player.name,
            )
        # end of queue reached, pass last fadeout bits to final output
        yield last_fadeout_data
        del last_fadeout_data
        # END OF QUEUE STREAM
        self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name)
import asyncio
import logging
+import os
+import re
import struct
from io import BytesIO
+from time import time
from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple
import aiofiles
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.util import create_tempfile
-from music_assistant.models.enums import EventType, ProviderType
-from music_assistant.models.errors import AudioError, MediaNotFoundError
-from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import (
- ContentType,
- MediaType,
- StreamDetails,
- StreamType,
+from music_assistant.models.enums import ProviderType
+from music_assistant.models.errors import (
+ AudioError,
+ MediaNotFoundError,
+ MusicAssistantError,
)
+from music_assistant.models.media_items import ContentType, MediaType, StreamDetails
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
from music_assistant.models.player_queue import QueueItem
-LOGGER = logging.getLogger("audio")
+LOGGER = logging.getLogger(__name__)
# pylint:disable=consider-using-f-string
fmt: ContentType,
sample_rate: int,
) -> bytes:
- """Crossfade two chunks of pcm/raw audio using sox."""
- _, ffmpeg_present = await check_audio_support()
-
- # prefer ffmpeg implementation (due to simplicity)
- if ffmpeg_present:
- fadeoutfile = create_tempfile()
- async with aiofiles.open(fadeoutfile.name, "wb") as outfile:
- await outfile.write(fade_out_part)
- # input args
- args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
- args += [
- "-f",
- fmt.value,
- "-ac",
- "2",
- "-ar",
- str(sample_rate),
- "-i",
- fadeoutfile.name,
- ]
- args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
- # filter args
- args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"]
- # output args
- args += ["-f", fmt.value, "-"]
- async with AsyncProcess(args, True) as proc:
- crossfade_data, _ = await proc.communicate(fade_in_part)
- return crossfade_data
-
- # sox based implementation
- sox_args = [fmt.sox_format(), "-c", "2", "-r", str(sample_rate)]
- # create fade-in part
- fadeinfile = create_tempfile()
- args = ["sox", "--ignore-length", "-t"] + sox_args
- args += ["-", "-t"] + sox_args + [fadeinfile.name, "fade", "t", str(fade_length)]
- async with AsyncProcess(args, enable_write=True) as sox_proc:
- await sox_proc.communicate(fade_in_part)
- # create fade-out part
+ """Crossfade two chunks of pcm/raw audio using ffmpeg."""
fadeoutfile = create_tempfile()
- args = ["sox", "--ignore-length", "-t"] + sox_args + ["-", "-t"] + sox_args
- args += [fadeoutfile.name, "reverse", "fade", "t", str(fade_length), "reverse"]
- async with AsyncProcess(args, enable_write=True) as sox_proc:
- await sox_proc.communicate(fade_out_part)
- # create crossfade using sox and some temp files
- # TODO: figure out how to make this less complex and without the tempfiles
- args = ["sox", "-m", "-v", "1.0", "-t"] + sox_args + [fadeoutfile.name, "-v", "1.0"]
- args += ["-t"] + sox_args + [fadeinfile.name, "-t"] + sox_args + ["-"]
- async with AsyncProcess(args, enable_write=False) as sox_proc:
- crossfade_part, _ = await sox_proc.communicate()
- fadeinfile.close()
- fadeoutfile.close()
- del fadeinfile
- del fadeoutfile
- return crossfade_part
+ async with aiofiles.open(fadeoutfile.name, "wb") as outfile:
+ await outfile.write(fade_out_part)
+ # input args
+ args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+ args += [
+ "-f",
+ fmt.value,
+ "-ac",
+ "2",
+ "-ar",
+ str(sample_rate),
+ "-i",
+ fadeoutfile.name,
+ ]
+ args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
+ # filter args
+ args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"]
+ # output args
+ args += ["-f", fmt.value, "-"]
+ async with AsyncProcess(args, True) as proc:
+ crossfade_data, _ = await proc.communicate(fade_in_part)
+ LOGGER.debug(
+ "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s",
+ len(fade_in_part),
+ len(fade_out_part),
+ len(crossfade_data),
+ )
+ return crossfade_data
async def fadein_pcm_part(
audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False
) -> bytes:
"""Strip silence from (a chunk of) pcm audio."""
- _, ffmpeg_present = await check_audio_support()
- # prefer ffmpeg implementation
- if ffmpeg_present:
- # input args
- args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
- args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
- # filter args
- if reverse:
- args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"]
- else:
- args += ["-af", "silenceremove=1:0:-50dB:detection=peak"]
- # output args
- args += ["-f", fmt.value, "-"]
- async with AsyncProcess(args, True) as proc:
- stripped_data, _ = await proc.communicate(audio_data)
- return stripped_data
-
- # sox implementation
- sox_args = [fmt.sox_format(), "-c", "2", "-r", str(sample_rate)]
- args = ["sox", "--ignore-length", "-t"] + sox_args + ["-", "-t"] + sox_args + ["-"]
- if reverse:
- args.append("reverse")
- args += ["silence", "1", "0.1", "1%"]
+ # input args
+ args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+ args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
+ # filter args
if reverse:
- args.append("reverse")
- async with AsyncProcess(args, enable_write=True) as sox_proc:
- stripped_data, _ = await sox_proc.communicate(audio_data)
- return stripped_data
+ args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"]
+ else:
+ args += ["-af", "silenceremove=1:0:-50dB:detection=peak"]
+ # output args
+ args += ["-f", fmt.value, "-"]
+ async with AsyncProcess(args, True) as proc:
+ stripped_data, _ = await proc.communicate(audio_data)
+ LOGGER.debug(
+ "stripped silence of pcm chunk. size before: %s - after: %s",
+ len(audio_data),
+ len(stripped_data),
+ )
+ return stripped_data
async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> None:
# only when needed we do the analyze job
return
- LOGGER.debug(
- "Start analyzing track %s/%s",
- streamdetails.provider.value,
- streamdetails.item_id,
- )
+ LOGGER.debug("Start analyzing track %s", streamdetails.uri)
# calculate BS.1770 R128 integrated loudness with ffmpeg
- if streamdetails.type == StreamType.EXECUTABLE:
- proc_args = (
- "%s | ffmpeg -i pipe: -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'"
- % streamdetails.path
- )
- else:
- proc_args = (
- "ffmpeg -i '%s' -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'"
- % streamdetails.path
- )
- audio_data = b""
- if streamdetails.media_type == MediaType.RADIO:
- proc_args = "ffmpeg -i pipe: -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'"
- # for radio we collect ~10 minutes of audio data to process
- async with mass.http_session.get(streamdetails.path) as response:
- async for chunk, _ in response.content.iter_chunks():
- audio_data += chunk
- if len(audio_data) >= 20000:
+ started = time()
+ proc_args = [
+ "ffmpeg",
+ "-f",
+ streamdetails.content_type.value,
+ "-i",
+ "-",
+ "-af",
+ "ebur128=framelog=verbose",
+ "-f",
+ "null",
+ "-",
+ ]
+ async with AsyncProcess(proc_args, True, use_stderr=True) as ffmpeg_proc:
+
+ async def writer():
+ """Task that grabs the source audio and feeds it to ffmpeg."""
+ music_prov = mass.music.get_provider(streamdetails.provider)
+ async for audio_chunk in music_prov.get_audio_stream(streamdetails):
+ await ffmpeg_proc.write(audio_chunk)
+ if (time() - started) > 300:
+ # just in case of endless radio stream etc
break
+ ffmpeg_proc.write_eof()
- proc = await asyncio.create_subprocess_shell(
- proc_args,
- stdout=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE if audio_data else None,
- )
- stdout, _ = await proc.communicate(audio_data or None)
- try:
- loudness = float(stdout.decode().strip())
- except (ValueError, AttributeError):
- LOGGER.warning(
- "Could not determine integrated loudness of %s/%s - %s",
- streamdetails.provider.value,
- streamdetails.item_id,
- stdout.decode() or "received empty value",
- )
- else:
- await mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness
- )
- LOGGER.debug(
- "Integrated loudness of %s/%s is: %s",
- streamdetails.provider.value,
- streamdetails.item_id,
- loudness,
- )
+ writer_task = ffmpeg_proc.attach_task(writer())
+ # wait for the writer task to finish
+ await writer_task
+
+ _, stderr = await ffmpeg_proc.communicate()
+ try:
+ loudness_str = (
+ stderr.decode()
+ .split("Integrated loudness")[1]
+ .split("I:")[1]
+ .split("LUFS")[0]
+ )
+ loudness = float(loudness_str.strip())
+ except (ValueError, AttributeError):
+ LOGGER.warning(
+ "Could not determine integrated loudness of %s - %s",
+ streamdetails.uri,
+ stderr.decode() or "received empty value",
+ )
+ else:
+ streamdetails.loudness = loudness
+ await mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness
+ )
+ LOGGER.debug(
+ "Integrated loudness of %s is: %s",
+ streamdetails.uri,
+ loudness,
+ )
async def get_stream_details(
param media_item: The MediaItem (track/radio) for which to request the streamdetails for.
param queue_id: Optionally provide the queue_id which will play this stream.
"""
- if not queue_item.media_item:
- # special case: a plain url was added to the queue
- streamdetails = StreamDetails(
- type=StreamType.URL,
- provider=ProviderType.URL,
- item_id=queue_item.item_id,
- path=queue_item.uri,
- content_type=ContentType.try_parse(queue_item.uri),
- )
+ if queue_item.streamdetails and (time() < queue_item.streamdetails.expires):
+ # we already have fresh streamdetails, use these
+ queue_item.streamdetails.seconds_skipped = 0
+ queue_item.streamdetails.seconds_streamed = 0
+ streamdetails = queue_item.streamdetails
+ elif queue_item.media_type == MediaType.URL:
+ # handle URL provider items
+ url_prov = mass.music.get_provider(ProviderType.URL)
+ streamdetails = await url_prov.get_stream_details(queue_item.uri)
else:
+ # media item: fetch streamdetails from provider
# always request the full db track as there might be other qualities available
full_item = await mass.music.get_item_by_uri(queue_item.uri)
# sort by quality and check track availability
music_prov = mass.music.get_provider(prov_media.prov_id)
if not music_prov or not music_prov.available:
continue # provider temporary unavailable ?
-
- streamdetails: StreamDetails = await music_prov.get_stream_details(
- prov_media.item_id
- )
- if streamdetails:
- try:
- streamdetails.content_type = ContentType(streamdetails.content_type)
- except KeyError:
- LOGGER.warning("Invalid content type!")
- else:
- break
+ try:
+ streamdetails: StreamDetails = await music_prov.get_stream_details(
+ prov_media.item_id
+ )
+ streamdetails.content_type = ContentType(streamdetails.content_type)
+ except MusicAssistantError as err:
+ LOGGER.warning(str(err))
+ else:
+ break
if not streamdetails:
raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
- # set player_id on the streamdetails so we know what players stream
+ # set queue_id on the streamdetails so we know what is being streamed
streamdetails.queue_id = queue_id
# get gain correct / replaygain
- loudness, gain_correct = await get_gain_correct(
- mass, queue_id, streamdetails.item_id, streamdetails.provider
- )
- streamdetails.gain_correct = gain_correct
- streamdetails.loudness = loudness
+ if not streamdetails.gain_correct:
+ loudness, gain_correct = await get_gain_correct(mass, streamdetails)
+ streamdetails.gain_correct = gain_correct
+ streamdetails.loudness = loudness
+ if not streamdetails.duration:
+ streamdetails.duration = queue_item.duration
# set streamdetails as attribute on the media_item
# this way the app knows what content is playing
queue_item.streamdetails = streamdetails
async def get_gain_correct(
- mass: MusicAssistant, queue_id: str, item_id: str, provider: ProviderType
+ mass: MusicAssistant, streamdetails: StreamDetails
) -> Tuple[float, float]:
"""Get gain correction for given queue / track combination."""
+ # resolve the queue via the streamdetails (queue_id must be set by the caller)
- queue = mass.players.get_player_queue(queue_id)
+ queue = mass.players.get_player_queue(streamdetails.queue_id)
if not queue or not queue.settings.volume_normalization_enabled:
return (0, 0)
+ # return the cached result when a gain was already computed for these details
+ # NOTE(review): the caller guards with a falsy check while this uses
+ # `is not None` — a cached 0 dB gain takes this early-return path; verify
+ if streamdetails.gain_correct is not None:
+ return (streamdetails.loudness, streamdetails.gain_correct)
target_gain = queue.settings.volume_normalization_target
- track_loudness = await mass.music.get_track_loudness(item_id, provider)
+ track_loudness = await mass.music.get_track_loudness(
+ streamdetails.item_id, streamdetails.provider
+ )
if track_loudness is None:
# fallback to provider average
- fallback_track_loudness = await mass.music.get_provider_loudness(provider)
+ fallback_track_loudness = await mass.music.get_provider_loudness(
+ streamdetails.provider
+ )
if fallback_track_loudness is None:
# fallback to some (hopefully sane) average value for now
fallback_track_loudness = -8.5
return file.getvalue()
-async def get_sox_args(
+async def get_ffmpeg_args(
streamdetails: StreamDetails,
output_format: Optional[ContentType] = None,
- resample: Optional[int] = None,
- seek_position: Optional[int] = None,
+ pcm_sample_rate: Optional[int] = None,
+ pcm_channels: int = 2,
) -> List[str]:
- """Collect all args to send to the sox (or ffmpeg) process."""
- stream_path = streamdetails.path
- stream_type = StreamType(streamdetails.type)
+ """Collect all args to send to the ffmpeg process."""
input_format = streamdetails.content_type
if output_format is None:
output_format = input_format
- sox_present, ffmpeg_present = await check_audio_support()
- use_ffmpeg = not sox_present or not input_format.sox_supported() or seek_position
+ ffmpeg_present, libsoxr_support = await check_audio_support()
- # use ffmpeg if content not supported by SoX (e.g. AAC radio streams)
- if use_ffmpeg:
- if not ffmpeg_present:
- raise AudioError(
- "FFmpeg binary is missing from system."
- "Please install ffmpeg on your OS to enable playback.",
- )
- # collect input args
- if stream_type == StreamType.EXECUTABLE:
- # stream from executable
- input_args = [
- stream_path,
- "|",
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-f",
- input_format.value,
- "-i",
- "-",
- ]
- else:
- input_args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-i",
- stream_path,
- ]
- if seek_position:
- input_args += ["-ss", str(seek_position)]
- # collect output args
- if output_format.is_pcm():
- output_args = [
- "-f",
- output_format.value,
- "-c:a",
- output_format.name.lower(),
- "-",
- ]
- else:
- output_args = ["-f", output_format.value, "-"]
- # collect filter args
- filter_args = []
- if streamdetails.gain_correct:
- filter_args += ["-filter:a", f"volume={streamdetails.gain_correct}dB"]
- if resample or input_format.is_pcm():
- filter_args += ["-ar", str(resample)]
- return input_args + filter_args + output_args
-
- # Prefer SoX for all other (=highest quality)
- if stream_type == StreamType.EXECUTABLE:
- # stream from executable
- input_args = [
- stream_path,
- "|",
- "sox",
- "-t",
- input_format.sox_format(),
- ]
- if input_format.is_pcm():
- input_args += [
- "-r",
- str(streamdetails.sample_rate),
- "-c",
- str(streamdetails.channels),
- ]
- input_args.append("-")
- else:
- input_args = ["sox", "-t", input_format.sox_format(), stream_path]
+ if not ffmpeg_present:
+ raise AudioError(
+ "FFmpeg binary is missing from system. "
+ "Please install ffmpeg on your OS to enable playback.",
+ )
+ # collect input args (source audio is always fed through stdin)
+ input_args = [
+ "ffmpeg",
+ "-hide_banner",
+ "-loglevel",
+ "error",
+ "-ignore_unknown",
+ ]
+ if streamdetails.content_type != ContentType.UNKNOWN:
+ # only force the input demuxer when the content type is known,
+ # otherwise let ffmpeg probe the stream
+ input_args += ["-f", input_format.value]
+ input_args += ["-i", "-"]
# collect output args
if output_format.is_pcm():
- output_args = ["-t", output_format.sox_format(), "-c", "2", "-"]
- elif output_format == ContentType.FLAC:
- output_args = ["-t", "flac", "-C", "0", "-"]
+ # raw pcm output: an explicit sample rate is mandatory, otherwise
+ # str(None) would end up as the -ar value and make ffmpeg bail out
+ if pcm_sample_rate is None:
+ raise AudioError("pcm_sample_rate is required for PCM output")
+ output_args = [
+ "-acodec",
+ output_format.name.lower(),
+ "-f",
+ output_format.value,
+ "-ac",
+ str(pcm_channels),
+ "-ar",
+ str(pcm_sample_rate),
+ "-",
+ ]
else:
- output_args = ["-t", output_format.sox_format(), "-"]
- # collect filter args
- filter_args = []
+ output_args = ["-f", output_format.value, "-"]
+ # collect extra and filter args
+ extra_args = []
+ filter_params = []
if streamdetails.gain_correct:
- filter_args += ["vol", str(streamdetails.gain_correct), "dB"]
- if resample and streamdetails.media_type != MediaType.RADIO:
- # use extra high quality resampler only if it makes sense
- filter_args += ["rate", "-v", str(resample)]
- elif resample:
- filter_args += ["rate", str(resample)]
- return input_args + output_args + filter_args
+ filter_params.append(f"volume={streamdetails.gain_correct}dB")
+ if (
+ pcm_sample_rate is not None
+ and streamdetails.sample_rate != pcm_sample_rate
+ and libsoxr_support
+ and streamdetails.media_type == MediaType.TRACK
+ ):
+ # prefer libsoxr high quality resampler (if present) for sample rate conversions
+ filter_params.append("aresample=resampler=soxr")
+ if filter_params:
+ extra_args += ["-af", ",".join(filter_params)]
+
+ if pcm_sample_rate is not None and not output_format.is_pcm():
+ extra_args += ["-ar", str(pcm_sample_rate)]
+
+ return input_args + extra_args + output_args
async def get_media_stream(
mass: MusicAssistant,
streamdetails: StreamDetails,
output_format: Optional[ContentType] = None,
- resample: Optional[int] = None,
+ pcm_sample_rate: Optional[int] = None,
chunk_size: Optional[int] = None,
- seek_position: Optional[int] = None,
-) -> AsyncGenerator[Tuple[bool, bytes], None]:
+ seek_position: int = 0,
+) -> AsyncGenerator[bytes, None]:
"""Get the audio stream for the given streamdetails."""
- if chunk_size is None:
- if streamdetails.content_type in (
- ContentType.AAC,
- ContentType.M4A,
- ContentType.MP3,
- ContentType.OGG,
- ):
- chunk_size = 32000
- else:
- chunk_size = 256000
-
- mass.signal_event(
- MassEvent(
- EventType.STREAM_STARTED,
- object_id=streamdetails.provider.value,
- data=streamdetails,
- )
- )
- args = await get_sox_args(streamdetails, output_format, resample, seek_position)
- async with AsyncProcess(args) as sox_proc:
+ # ffmpeg reads source audio from stdin (fed by the writer task below)
+ # and emits the converted stream on stdout
+ args = await get_ffmpeg_args(streamdetails, output_format, pcm_sample_rate)
+ async with AsyncProcess(
+ args, enable_write=True, chunk_size=chunk_size
+ ) as ffmpeg_proc:
LOGGER.debug(
- "start media stream for: %s/%s (%s)",
- streamdetails.provider,
- streamdetails.item_id,
- streamdetails.type,
+ "start media stream for: %s, using args: %s", streamdetails.uri, str(args)
)
+ async def writer():
+ """Task that grabs the source audio and feeds it to ffmpeg."""
+ LOGGER.debug("writer started for %s", streamdetails.uri)
+ music_prov = mass.music.get_provider(streamdetails.provider)
+ async for audio_chunk in music_prov.get_audio_stream(
+ streamdetails, seek_position
+ ):
+ await ffmpeg_proc.write(audio_chunk)
+ # write eof when last packet is received
+ ffmpeg_proc.write_eof()
+ LOGGER.debug("writer finished for %s", streamdetails.uri)
+
+ ffmpeg_proc.attach_task(writer())
+
# yield chunks from stdout
- # we keep 1 chunk behind to detect end of stream properly
try:
- prev_chunk = b""
- async for chunk in sox_proc.iterate_chunks(chunk_size):
- if prev_chunk:
- yield (False, prev_chunk)
- prev_chunk = chunk
- # send last chunk
- yield (True, prev_chunk)
+ async for chunk in ffmpeg_proc.iterate_chunks():
+ yield chunk
except (asyncio.CancelledError, GeneratorExit) as err:
- LOGGER.debug(
- "media stream aborted for: %s/%s",
- streamdetails.provider,
- streamdetails.item_id,
- )
+ LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
raise err
else:
- LOGGER.debug(
- "finished media stream for: %s/%s",
- streamdetails.provider,
- streamdetails.item_id,
- )
+ LOGGER.debug("finished media stream for: %s", streamdetails.uri)
+ # mark the item played only on normal completion (not on abort)
await mass.music.mark_item_played(
streamdetails.item_id, streamdetails.provider
)
+ finally:
# send analyze job to background worker
- if (
- streamdetails.loudness is None
- and streamdetails.provider != ProviderType.URL
- ):
- uri = f"{streamdetails.provider.value}://{streamdetails.media_type.value}/{streamdetails.item_id}"
+ # NOTE(review): unlike the previous code, URL provider items are no
+ # longer excluded from loudness analysis — verify that is intended
+ if streamdetails.loudness is None:
mass.add_job(
- analyze_audio(mass, streamdetails), f"Analyze audio for {uri}"
- )
- finally:
- mass.signal_event(
- MassEvent(
- EventType.STREAM_ENDED,
- object_id=streamdetails.provider.value,
- data=streamdetails,
+ analyze_audio(mass, streamdetails),
+ f"Analyze audio for {streamdetails.uri}",
)
+
+
+async def get_radio_stream(
+ mass: MusicAssistant, url: str, streamdetails: StreamDetails
+) -> AsyncGenerator[bytes, None]:
+ """Get radio audio stream from HTTP, including metadata retrieval."""
+ headers = {"Icy-MetaData": "1"}
+ while True:
+ # in loop to reconnect on connection failure
+ LOGGER.debug("radio stream (re)connecting to: %s", url)
+ async with mass.http_session.get(url, headers=headers) as resp:
+ # read the icy metadata interval from the *response* headers;
+ # keep the request-headers dict untouched so a reconnect still
+ # sends Icy-MetaData instead of the previous response's headers
+ resp_headers = resp.headers
+ meta_int = int(resp_headers.get("icy-metaint", "0"))
+ # stream with ICY Metadata
+ if meta_int:
+ while True:
+ audio_chunk = await resp.content.readexactly(meta_int)
+ yield audio_chunk
+ # a single length byte (in 16-byte units) precedes the metadata
+ meta_byte = await resp.content.readexactly(1)
+ meta_length = ord(meta_byte) * 16
+ meta_data = await resp.content.readexactly(meta_length)
+ if not meta_data:
+ continue
+ meta_data = meta_data.rstrip(b"\0")
+ stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
+ if not stream_title:
+ continue
+ stream_title = stream_title.group(1).decode()
+ if stream_title != streamdetails.stream_title:
+ # push the new now-playing title to connected clients
+ streamdetails.stream_title = stream_title
+ if queue := mass.players.get_player_queue(
+ streamdetails.queue_id
+ ):
+ queue.signal_update()
+ # Regular HTTP stream
+ else:
+ async for chunk in resp.content.iter_any():
+ yield chunk
+
+
+async def get_http_stream(
+ mass: MusicAssistant,
+ url: str,
+ streamdetails: StreamDetails,
+ seek_position: int = 0,
+) -> AsyncGenerator[bytes, None]:
+ """Get audio stream from HTTP."""
+ if seek_position:
+ assert streamdetails.duration, "Duration required for seek requests"
+ chunk_size = get_chunksize(streamdetails.content_type)
+ # try to get filesize with a head request
+ if seek_position and not streamdetails.size:
+ async with mass.http_session.head(url) as resp:
+ if size := resp.headers.get("Content-Length"):
+ streamdetails.size = int(size)
+ # headers
+ headers = {}
+ skip_bytes = 0
+ if seek_position and streamdetails.size:
+ skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
+ headers["Range"] = f"bytes={skip_bytes}-"
+
+ # start the streaming from http
+ buffer = b""
+ buffer_all = False
+ bytes_received = 0
+ async with mass.http_session.get(url, headers=headers) as resp:
+ is_partial = resp.status == 206
+ # server ignored our Range request: buffer everything and seek locally
+ buffer_all = seek_position and not is_partial
+ async for chunk in resp.content.iter_chunked(chunk_size):
+ bytes_received += len(chunk)
+ if buffer_all and not skip_bytes:
+ buffer += chunk
+ continue
+ # size known but server sent the full body: discard the seek prefix
+ if not is_partial and skip_bytes and bytes_received < skip_bytes:
+ continue
+ yield chunk
+
+ # store size on streamdetails for later use
+ if not streamdetails.size:
+ streamdetails.size = bytes_received
+ if buffer_all:
+ # size is known now: emit the audio *from* the seek offset onwards
+ # (int() required: a float is not a valid slice index)
+ skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
+ yield buffer[skip_bytes:]
+ del buffer
+
+
+async def get_file_stream(
+ mass: MusicAssistant,
+ filename: str,
+ streamdetails: StreamDetails,
+ seek_position: int = 0,
+) -> AsyncGenerator[bytes, None]:
+ """Get audio stream from local accessible file."""
+ if seek_position:
+ assert streamdetails.duration, "Duration required for seek requests"
+ if not streamdetails.size:
+ # stat() may block: run it in the default executor
+ stat = await mass.loop.run_in_executor(None, os.stat, filename)
+ streamdetails.size = stat.st_size
+ chunk_size = get_chunksize(streamdetails.content_type)
+ # open the explicit filename argument (the same path that was stat'ed
+ # above) rather than reaching into streamdetails.data
+ async with aiofiles.open(filename, "rb") as _file:
+ if seek_position:
+ # approximate the byte offset from duration (constant bitrate assumed)
+ seek_pos = int(
+ (streamdetails.size / streamdetails.duration) * seek_position
+ )
+ await _file.seek(seek_pos)
+ # yield chunks of data from file
+ while True:
+ data = await _file.read(chunk_size)
+ if not data:
+ break
+ yield data
-async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool, bool]:
- """Check if sox and/or ffmpeg are present."""
+async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool]:
+ """Check if ffmpeg is present (with/without libsoxr support)."""
cache_key = "audio_support_cache"
if cache := globals().get(cache_key):
return cache
- # check for SoX presence
- returncode, output = await check_output("sox --version")
- sox_present = returncode == 0 and "SoX" in output.decode()
- if not sox_present and try_install:
- # try a few common ways to install SoX
- # this all assumes we have enough rights and running on a linux based platform (or docker)
- await check_output("apt-get update && apt-get install sox libsox-fmt-all")
- await check_output("apk add sox")
- # test again
- returncode, output = await check_output("sox --version")
- sox_present = returncode == 0 and "SoX" in output.decode()
# check for FFmpeg presence
returncode, output = await check_output("ffmpeg -version")
ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode()
if not ffmpeg_present and try_install:
- # try a few common ways to install SoX
+ # try a few common ways to install ffmpeg
# this all assumes we have enough rights and running on a linux based platform (or docker)
await check_output("apt-get update && apt-get install ffmpeg")
await check_output("apk add ffmpeg")
+ # test again: re-run the version check, otherwise the stale
+ # returncode/output from *before* the install attempt are evaluated
+ # and a freshly installed ffmpeg would never be detected
+ returncode, output = await check_output("ffmpeg -version")
ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode()
# use globals as in-memory cache
+ # NOTE(review): a negative result is cached for the process lifetime — verify
+ libsoxr_support = "enable-libsoxr" in output.decode()
- result = (sox_present, ffmpeg_present)
+ result = (ffmpeg_present, libsoxr_support)
globals()[cache_key] = result
return result
-async def get_sox_args_for_pcm_stream(
+async def get_ffmpeg_args_for_pcm_stream(
sample_rate: int,
bit_depth: int,
channels: int,
floating_point: bool = False,
output_format: ContentType = ContentType.FLAC,
) -> List[str]:
- """Collect args for sox (or ffmpeg) when converting from raw pcm to another contenttype."""
-
- sox_present, ffmpeg_present = await check_audio_support()
+ """Collect args for ffmpeg when converting from raw pcm to another contenttype."""
+ # NOTE(review): unlike get_ffmpeg_args, ffmpeg presence is not checked
+ # here before building the command line — verify callers handle that
input_format = ContentType.from_bit_depth(bit_depth, floating_point)
- sox_present = True
-
- # use ffmpeg if sox is not present
- if not sox_present:
- if not ffmpeg_present:
- raise AudioError(
- "FFmpeg binary is missing from system. "
- "Please install ffmpeg on your OS to enable playback.",
- )
- # collect input args
- input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
- input_args += [
- "-f",
- input_format.value,
- "-ac",
- str(channels),
- "-ar",
- str(sample_rate),
- "-i",
- "-",
- ]
- # collect output args
- output_args = ["-f", output_format.value, "-"]
- return input_args + output_args
-
- # Prefer SoX for all other (=highest quality)
-
# collect input args
- input_args = [
- "sox",
- "-t",
- input_format.sox_format(),
- "-r",
- str(sample_rate),
- "-b",
- str(bit_depth),
- "-c",
+ # pcm input format (s16le/s24le/f32le...) is derived from the bit depth
+ input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-ignore_unknown"]
+ input_args += [
+ "-f",
+ input_format.value,
+ "-ac",
str(channels),
+ "-ar",
+ str(sample_rate),
+ "-i",
"-",
]
- # collect output args
- if output_format == ContentType.FLAC:
- output_args = ["-t", "flac", "-C", "0", "-"]
- else:
- output_args = ["-t", output_format.sox_format(), "-"]
+ # collect output args
+ output_args = ["-f", output_format.value, "-"]
return input_args + output_args
mass: MusicAssistant,
provider_id: str,
track_id: str,
-) -> AsyncGenerator[Tuple[bool, bytes], None]:
- """Get the audio stream for the given streamdetails."""
+) -> AsyncGenerator[bytes, None]:
+ """Create a 30 seconds preview audioclip for the given streamdetails."""
music_prov = mass.music.get_provider(provider_id)
streamdetails = await music_prov.get_stream_details(track_id)
- if streamdetails.type == StreamType.EXECUTABLE:
- # stream from executable
- input_args = [
- streamdetails.path,
- "|",
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-f",
- streamdetails.content_type.value,
- "-i",
- "-",
- ]
- else:
- input_args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-i",
- streamdetails.path,
- ]
- output_args = ["-ss", "30", "-to", "60", "-f", "mp3", "-"]
- async with AsyncProcess(input_args + output_args) as proc:
+ # source audio is fed through stdin; the seek into the track is done by
+ # the provider (see the writer below), so ffmpeg only clips the length
+ input_args = [
+ "ffmpeg",
+ "-hide_banner",
+ "-loglevel",
+ "error",
+ "-f",
+ streamdetails.content_type.value,
+ "-i",
+ "-",
+ ]
+ # keep only the first 30 seconds of the (already seeked) input as mp3
+ output_args = ["-to", "30", "-f", "mp3", "-"]
+ args = input_args + output_args
+ # second positional argument is enable_write=True (stdin pipe enabled)
+ async with AsyncProcess(args, True) as ffmpeg_proc:
+
+ async def writer():
+ """Task that grabs the source audio and feeds it to ffmpeg."""
+ # NOTE(review): resolves the provider again via streamdetails.provider,
+ # which may differ from the provider_id argument — verify intended
+ music_prov = mass.music.get_provider(streamdetails.provider)
+ # start 30 seconds into the track for a more representative preview
+ async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
+ await ffmpeg_proc.write(audio_chunk)
+ # write eof when last packet is received
+ ffmpeg_proc.write_eof()
+
+ ffmpeg_proc.attach_task(writer())
# yield chunks from stdout
- # we keep 1 chunk behind to detect end of stream properly
- try:
- prev_chunk = b""
- async for chunk in proc.iterate_chunks():
- if prev_chunk:
- yield (False, prev_chunk)
- prev_chunk = chunk
- # send last chunk
- yield (True, prev_chunk)
- finally:
- mass.signal_event(
- MassEvent(
- EventType.STREAM_ENDED,
- object_id=streamdetails.provider.value,
- data=streamdetails,
- )
- )
+ async for chunk in ffmpeg_proc.iterate_chunks():
+ yield chunk
+
+
+def get_chunksize(content_type: ContentType) -> int:
+ """Get a default chunksize for given contenttype."""
+ # raw pcm is uncompressed: larger chunks keep the pipe overhead low
+ if content_type.is_pcm():
+ return 512000
+ # strongly compressed codecs: smaller chunks per read
+ if content_type in (
+ ContentType.AAC,
+ ContentType.M4A,
+ ):
+ return 32000
+ if content_type in (
+ ContentType.MP3,
+ ContentType.OGG,
+ ):
+ return 64000
+ # default for anything else (e.g. flac/wav/unknown)
+ return 256000
from async_timeout import timeout as _timeout
-LOGGER = logging.getLogger("AsyncProcess")
+LOGGER = logging.getLogger(__name__)
-DEFAULT_CHUNKSIZE = 512000
+DEFAULT_CHUNKSIZE = 128000
DEFAULT_TIMEOUT = 120
class AsyncProcess:
"""Implementation of a (truly) non blocking subprocess."""
- def __init__(self, args: Union[List, str], enable_write: bool = False):
+ def __init__(
+ self,
+ args: Union[List, str],
+ enable_write: bool = False,
+ chunk_size: int = DEFAULT_CHUNKSIZE,
+ use_stderr: bool = False,
+ ):
"""Initialize."""
self._proc = None
self._args = args
+ # when set, the process output pipe is stderr instead of stdout
+ self._use_stderr = use_stderr
self._enable_write = enable_write
self._attached_task: asyncio.Task = None
self.closed = False
+ # guard against falsy values (0/None) being passed in for chunk_size
+ self.chunk_size = chunk_size or DEFAULT_CHUNKSIZE
async def __aenter__(self) -> "AsyncProcess":
"""Enter context manager."""
self._proc = await asyncio.create_subprocess_shell(
args,
stdin=asyncio.subprocess.PIPE if self._enable_write else None,
- stdout=asyncio.subprocess.PIPE,
- limit=DEFAULT_CHUNKSIZE * 10,
+ stdout=asyncio.subprocess.PIPE if not self._use_stderr else None,
+ stderr=asyncio.subprocess.PIPE if self._use_stderr else None,
+ limit=self.chunk_size * 5,
close_fds=True,
)
else:
self._proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.PIPE if self._enable_write else None,
- stdout=asyncio.subprocess.PIPE,
- limit=DEFAULT_CHUNKSIZE * 10,
+ stdout=asyncio.subprocess.PIPE if not self._use_stderr else None,
+ stderr=asyncio.subprocess.PIPE if self._use_stderr else None,
+ limit=self.chunk_size * 5,
close_fds=True,
)
return self
self.closed = True
if self._attached_task:
# cancel the attached reader/writer task
- self._attached_task.cancel()
- if self._proc.returncode is None:
- # prevent subprocess deadlocking, send terminate and read remaining bytes
try:
- self._proc.terminate()
- # close stdin and let it drain
- if self._enable_write:
- await self._proc.stdin.drain()
- self._proc.stdin.close()
- # read remaining bytes
- await self._proc.stdout.read()
- # we really want to make this thing die ;-)
- self._proc.kill()
- except (
- ProcessLookupError,
- BrokenPipeError,
- RuntimeError,
- ConnectionResetError,
- ):
+ self._attached_task.cancel()
+ await self._attached_task
+ except asyncio.CancelledError:
pass
- del self._proc
+ if self._proc.returncode is None:
+ # prevent subprocess deadlocking, read remaining bytes
+ await self._proc.communicate(b"" if self._enable_write else None)
+ if self._proc.returncode is None:
+ # just in case?
+ self._proc.kill()
- async def iterate_chunks(
- self, chunk_size: int = DEFAULT_CHUNKSIZE, timeout: int = DEFAULT_TIMEOUT
- ) -> AsyncGenerator[bytes, None]:
+ async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
"""Yield chunks from the process stdout. Generator."""
while True:
- chunk = await self.read(chunk_size, timeout)
- if not chunk:
- break
+ chunk = await self._read_chunk()
+ # NOTE(review): a final short (possibly empty) chunk is yielded
+ # before breaking — the old code suppressed empty chunks; verify
+ # that all consumers tolerate b""
yield chunk
- if chunk_size is not None and len(chunk) < chunk_size:
+ # a short read signals eof (readexactly returned a partial chunk)
+ if len(chunk) < self.chunk_size:
+ del chunk
break
+ del chunk
- async def read(
- self, chunk_size: int = DEFAULT_CHUNKSIZE, timeout: int = DEFAULT_TIMEOUT
- ) -> bytes:
- """Read x bytes from the process stdout."""
+ async def _read_chunk(self, timeout: int = DEFAULT_TIMEOUT) -> bytes:
+ """Read chunk_size bytes from the process stdout."""
+ # short-circuit when the process was already closed/cleaned up
+ # NOTE(review): always reads self._proc.stdout; with use_stderr=True
+ # stdout is not a pipe — verify how stderr output is consumed
+ if self.closed:
+ return b""
try:
async with _timeout(timeout):
- if chunk_size is None:
- return await self._proc.stdout.read(DEFAULT_CHUNKSIZE)
- return await self._proc.stdout.readexactly(chunk_size)
+ return await self._proc.stdout.readexactly(self.chunk_size)
except asyncio.IncompleteReadError as err:
+ # eof reached: return whatever partial data was left
return err.partial
- except AttributeError as exc:
- raise asyncio.CancelledError() from exc
except asyncio.TimeoutError:
+ # a stalled process is treated as eof (short chunk ends iteration)
return b""
try:
self._proc.stdin.write(data)
await self._proc.stdin.drain()
- except (AttributeError, AssertionError, BrokenPipeError):
+ except (
+ AttributeError,
+ AssertionError,
+ BrokenPipeError,
+ RuntimeError,
+ ConnectionResetError,
+ ) as err:
# already exited, race condition
- pass
+ raise asyncio.CancelledError() from err
def write_eof(self) -> None:
- """Write end of file to to process stdin."""
+ """Write end of file to the process stdin."""
- try:
- if self._proc.stdin.can_write_eof():
- self._proc.stdin.write_eof()
- except (AttributeError, AssertionError, BrokenPipeError):
- # already exited, race condition
- pass
+ if self.closed:
+ return
+ try:
+ if self._proc.stdin.can_write_eof():
+ self._proc.stdin.write_eof()
+ except (
+ AttributeError,
+ AssertionError,
+ BrokenPipeError,
+ RuntimeError,
+ ConnectionResetError,
+ ) as err:
+ # process already exited (race condition): translate into a
+ # cancellation, consistent with how write() handles this case
+ raise asyncio.CancelledError() from err
async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
"""Write bytes to process and read back results."""
return await self._proc.communicate(input_data)
- def attach_task(self, coro: Coroutine) -> None:
+ def attach_task(self, coro: Coroutine) -> asyncio.Task:
"""Attach given coro func as reader/writer task to properly cancel it when needed."""
+ # only one task slot exists: a second call replaces the reference to
+ # the first task without cancelling it — callers should attach one task
self._attached_task = task = asyncio.create_task(coro)
+ return task
async def check_output(shell_cmd: str) -> Tuple[int, bytes]:
import aiohttp
+from music_assistant.constants import ROOT_LOGGER_NAME
from music_assistant.controllers.metadata import MetaDataController
from music_assistant.controllers.music import MusicController
from music_assistant.controllers.players import PlayerController
-from music_assistant.controllers.stream import StreamController
+from music_assistant.controllers.streams import StreamsController
from music_assistant.helpers.cache import Cache
from music_assistant.helpers.database import Database
from music_assistant.models.background_job import BackgroundJob
self.loop: asyncio.AbstractEventLoop = None
self.http_session: aiohttp.ClientSession = session
self.http_session_provided = session is not None
- self.logger = logging.getLogger(__name__)
+ self.logger = logging.getLogger(ROOT_LOGGER_NAME)
self._listeners = []
self._jobs: Deque[BackgroundJob] = deque()
self.metadata = MetaDataController(self)
self.music = MusicController(self)
self.players = PlayerController(self)
- self.streams = StreamController(self)
+ self.streams = StreamsController(self)
self._tracked_tasks: List[asyncio.Task] = []
self.closed = False
providers: List[MusicProviderConfig] = field(default_factory=list)
# advanced settings
- max_simultaneous_jobs: int = 5
+ max_simultaneous_jobs: int = 2
stream_port: int = select_stream_port()
stream_ip: str = get_ip()
TRACK = "track"
PLAYLIST = "playlist"
RADIO = "radio"
+ URL = "url"
UNKNOWN = "unknown"
UNKNOWN = "unknown"
-class StreamType(Enum):
- """Enum with stream types."""
-
- EXECUTABLE = "executable"
- URL = "url"
- FILE = "file"
- CACHE = "cache"
-
-
class ContentType(Enum):
"""Enum with audio content/container types supported by ffmpeg."""
"""Return if contentype is PCM."""
return self.name.startswith("PCM")
- def sox_supported(self):
- """Return if ContentType is supported by SoX."""
- return self.is_pcm() or self in [
- ContentType.OGG,
- ContentType.FLAC,
- ContentType.MP3,
- ContentType.WAV,
- ContentType.AIFF,
- ]
-
- def sox_format(self):
- """Convert the ContentType to SoX compatible format."""
- if not self.sox_supported():
- raise NotImplementedError
- return self.value.replace("le", "")
-
@classmethod
def from_bit_depth(
cls, bit_depth: int, floating_point: bool = False
PLAYER_ADDED = "player_added"
PLAYER_UPDATED = "player_updated"
- STREAM_STARTED = "streaming_started"
- STREAM_ENDED = "streaming_ended"
QUEUE_ADDED = "queue_added"
QUEUE_UPDATED = "queue_updated"
QUEUE_ITEMS_UPDATED = "queue_items_updated"
from __future__ import annotations
from dataclasses import dataclass, field, fields
+from time import time
from typing import Any, Dict, List, Mapping, Optional, Set, Union
from mashumaro import DataClassDictMixin
MediaQuality,
MediaType,
ProviderType,
- StreamType,
)
MetadataTypes = Union[int, bool, str, List[str]]
class StreamDetails(DataClassDictMixin):
"""Model for streamdetails."""
- type: StreamType
+ # NOTE: the actual provider/itemid of the streamdetails may differ
+ # from the connected media_item due to track linking etc.
+ # the streamdetails are only used to provide details about the content
+ # that is going to be streamed.
+
+ # mandatory fields
provider: ProviderType
item_id: str
- path: str
content_type: ContentType
- player_id: str = ""
- details: Dict[str, Any] = field(default_factory=dict)
- seconds_played: int = 0
- seconds_skipped: int = 0
- gain_correct: float = 0
- loudness: Optional[float] = None
+ media_type: MediaType = MediaType.TRACK
sample_rate: int = 44100
bit_depth: int = 16
channels: int = 2
- media_type: MediaType = MediaType.TRACK
- queue_id: str = None
+ # stream_title: radio streams can optionally set this field
+ stream_title: Optional[str] = None
+ # duration of the item to stream, copied from media_item if omitted
+ duration: Optional[int] = None
+ # total size in bytes of the item, calculated at eof when omitted
+ size: Optional[int] = None
+ # expires: timestamp this streamdetails expire
+ # use a default_factory so the expiry is computed per instance at
+ # creation time: a plain `time() + 3600` default would be evaluated only
+ # once, at class-definition (import) time, so every instance would share
+ # the same, increasingly stale expiry timestamp
+ expires: float = field(default_factory=lambda: time() + 3600)
+ # data: provider specific data (not exposed externally)
+ data: Optional[Any] = None
+
+ # the fields below will be set/controlled by the streamcontroller
+ queue_id: Optional[str] = None
+ seconds_streamed: int = 0
+ seconds_skipped: int = 0
+ gain_correct: Optional[float] = None
+ loudness: Optional[float] = None
def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
"""Exclude internal fields from dict."""
# pylint: disable=no-self-use
- d.pop("path")
- d.pop("details")
+ d.pop("data")
+ d.pop("expires")
+ d.pop("queue_id")
return d
def __str__(self):
"""Return pretty printable string of object."""
- return f"{self.type.value}/{self.content_type.value} - {self.provider.value}/{self.item_id}"
+ return self.uri
+
+ @property
+ def uri(self) -> str:
+ """Return uri representation of item."""
+ return f"{self.provider.value}://{self.media_type.value}/{self.item_id}"
# SOME CONVENIENCE METHODS (may be overridden if needed)
+ @property
+ def stream_type(self) -> ContentType:
+ """Return supported/preferred stream type for playerqueue. Read only."""
+ # determine default stream type from player capabilities
+ # NOTE(review): next() raises StopIteration when none of the candidates
+ # is in supported_content_types — consider a fallback value; verify
+ return next(
+ x
+ for x in (
+ ContentType.FLAC,
+ ContentType.WAV,
+ ContentType.PCM_S16LE,
+ ContentType.MP3,
+ ContentType.MPEG,
+ )
+ if x in self.supported_content_types
+ )
+
async def volume_mute(self, muted: bool) -> None:
"""Send volume mute command to player."""
# for players that do not support mute, we fake mute with volume
)
)
+ @property
+ def supported_sample_rates(self) -> Tuple[int]:
+ """Return the sample rates this player supports."""
+ # presumably a group player: only rates supported by ALL child
+ # players qualify — verify against _get_child_players semantics
+ return tuple(
+ sample_rate
+ for sample_rate in DEFAULT_SUPPORTED_SAMPLE_RATES
+ if all(
+ (
+ sample_rate in child_player.supported_sample_rates
+ for child_player in self._get_child_players(False, False)
+ )
+ )
+ )
+
async def stop(self) -> None:
"""Send STOP command to player."""
if not self.use_multi_stream:
from __future__ import annotations
import asyncio
+import os
import pathlib
import random
from asyncio import Task, TimerHandle
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
-from music_assistant.helpers.audio import get_stream_details
from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode
-from music_assistant.models.errors import (
- MediaNotFoundError,
- MusicAssistantError,
- QueueEmpty,
-)
+from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import StreamDetails
from .player import Player, PlayerGroup, PlayerState
from .queue_item import QueueItem
from .queue_settings import QueueSettings
if TYPE_CHECKING:
+ from music_assistant.controllers.streams import QueueStream
from music_assistant.mass import MusicAssistant
RESOURCES_DIR = (
powered: bool
state: PlayerState
- volume_level: int
items: List[QueueItem]
index: Optional[int]
position: int
self.mass = mass
self.logger = mass.players.logger
self.queue_id = player_id
+ self.signal_next: bool = False
+ self._stream_id: str = ""
self._settings = QueueSettings(self)
self._current_index: Optional[int] = None
- # index_in_buffer: which track is currently (pre)loaded in the streamer
- self._index_in_buffer: Optional[int] = None
self._current_item_elapsed_time: int = 0
self._prev_item: Optional[QueueItem] = None
- # start_index: from which index did the queuestream start playing
- self._start_index: int = 0
- # start_pos: from which position (in seconds) did the first track start playing?
- self._start_pos: int = 0
- self._next_fadein: int = 0
- self._next_start_index: int = 0
- self._next_start_pos: int = 0
- self._last_state = PlayerState.IDLE
+ # NOTE(review): `str` assigns the builtin *type* as a never-equal
+ # sentinel; was PlayerState.IDLE before — possibly meant "" — verify
+ self._last_state = str
self._items: List[QueueItem] = []
self._save_task: TimerHandle = None
self._update_task: Task = None
- self._signal_next: bool = False
self._last_player_update: int = 0
- self._stream_url: str = ""
+ self._last_stream_id: str = ""
self._snapshot: Optional[QueueSnapShot] = None
async def setup(self) -> None:
"""Handle async setup of instance."""
await self._settings.restore()
await self._restore_items()
- self._stream_url: str = self.mass.streams.get_stream_url(
- self.queue_id, content_type=self._settings.stream_type
- )
self.mass.signal_event(
MassEvent(EventType.QUEUE_ADDED, object_id=self.queue_id, data=self)
)
"""Return if player(queue) is available."""
return self.player.available
+ @property
+ def stream(self) -> QueueStream | None:
+ """Return the currently connected/active stream for this queue."""
+ return self.mass.streams.queue_streams.get(self._stream_id)
+
+ @property
+ def index_in_buffer(self) -> int | None:
+ """Return the item that is currently loaded into the buffer."""
+ # same lookup as the `stream` property above, guarded for empty id
+ if not self._stream_id:
+ return None
+ if stream := self.mass.streams.queue_streams.get(self._stream_id):
+ return stream.index_in_buffer
+ return None
+
@property
def active(self) -> bool:
- """Return bool if the queue is currenty active on the player."""
+ """Return bool if the queue is currently active on the player."""
- if self.player.use_multi_stream:
- return self.queue_id in self.player.current_url
- return self._stream_url == self.player.current_url
+ if not self.player.current_url:
+ return False
+ # active when the player is currently playing this queue's stream url
+ if stream := self.stream:
+ return stream.url == self.player.current_url
+ return False
@property
def elapsed_time(self) -> float:
QueueOption.REPLACE -> Replace queue contents with these items
QueueOption.NEXT -> Play item(s) after current playing item
QueueOption.ADD -> Append new items at end of the queue
- :param passive: do not actually start playback.
- Returns: the stream URL for this queue.
+ :param passive: if True, the stream url will not be sent to the player.
"""
# a single item or list of items may be provided
if not isinstance(uris, list):
media_item = await self.mass.music.get_item_by_uri(uri)
except MusicAssistantError as err:
# invalid MA uri or item not found error
- if uri.startswith("http"):
- # a plain url was provided
+ if uri.startswith("http") or os.path.isfile(uri):
+ # a plain url (or local file) was provided
queue_items.append(QueueItem.from_url(uri))
continue
raise MediaNotFoundError(f"Invalid uri: {uri}") from err
if self._current_index and self._current_index >= (len(self._items) - 1):
self._current_index = None
self._items = []
- # clear resume point if any
- self._start_pos = 0
# load items into the queue
if queue_opt == QueueOption.REPLACE:
- await self.load(queue_items, passive=passive)
+ await self.load(queue_items, passive)
elif (
queue_opt in [QueueOption.PLAY, QueueOption.NEXT] and len(queue_items) > 100
):
- await self.load(queue_items, passive=passive)
+ await self.load(queue_items, passive)
elif queue_opt == QueueOption.NEXT:
- await self.insert(queue_items, 1, passive=passive)
+ await self.insert(queue_items, 1, passive)
elif queue_opt == QueueOption.PLAY:
- await self.insert(queue_items, 0, passive=passive)
+ await self.insert(queue_items, 0, passive)
elif queue_opt == QueueOption.ADD:
await self.append(queue_items)
- return self._stream_url
async def play_alert(
self, uri: str, announce: bool = False, volume_adjust: int = 10
"""
if self._snapshot:
self.logger.debug("Ignore play_alert: already in progress")
+ # bail out to protect the existing snapshot from being overwritten
+ # below, matching the "Ignore play_alert" log line above
return
# create snapshot
await self.snapshot_create()
# prepend announce sound if needed
if announce:
- queue_items.append(QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert"))
+ queue_item = QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert")
+ queue_item.streamdetails.gain_correct = 10
+ queue_items.append(queue_item)
# parse provided uri into a MA MediaItem or Basic QueueItem from URL
try:
queue_items.append(QueueItem.from_media_item(media_item))
except MusicAssistantError as err:
# invalid MA uri or item not found error
- if uri.startswith("http"):
+ if uri.startswith("http") or os.path.isfile(uri):
# a plain url was provided
- queue_items.append(QueueItem.from_url(uri, "alert"))
+ queue_item = QueueItem.from_url(uri, "alert")
+ queue_item.streamdetails.gain_correct = 6
+ queue_items.append(queue_item)
else:
raise MediaNotFoundError(f"Invalid uri: {uri}") from err
# append silence track, we use this to reliably detect when the alert is ready
silence_url = self.mass.streams.get_silence_url(600)
- queue_items.append(QueueItem.from_url(silence_url, "alert"))
+ queue_item = QueueItem.from_url(silence_url, "alert")
+ queue_items.append(queue_item)
# load queue with alert sound(s)
await self.load(queue_items)
- # set new volume
- new_volume = self.player.volume_level + (
- self.player.volume_level / 100 * volume_adjust
- )
- await self.player.volume_set(new_volume)
+ # NOTE(review): volume_adjust is now unused after removing the volume
+ # bump above — either drop the parameter or restore gain handling.
# wait for the alert to finish playing
alert_done = asyncio.Event()
async def stop(self) -> None:
"""Stop command on queue player."""
+ # an explicit stop cancels any pending stream restart
+ self.signal_next = False
# redirect to underlying player
await self.player.stop()
resume_pos = 0
if resume_item is not None:
- await self.play_index(resume_item.item_id, resume_pos, 5)
+ # only resume mid-track (with a short fade-in) when more than 10
+ # seconds in, otherwise restart the track from the beginning
+ resume_pos = resume_pos if resume_pos > 10 else 0
+ fade_in = 5 if resume_pos else 0
+ await self.play_index(resume_item.item_id, resume_pos, fade_in)
else:
self.logger.warning(
"resume queue requested for %s but queue is empty", self.queue_id
self._snapshot = QueueSnapShot(
powered=self.player.powered,
state=self.player.state,
- volume_level=self.player.volume_level,
items=self._items,
index=self._current_index,
position=self._current_item_elapsed_time,
assert self._snapshot, "Create snapshot before restoring it."
# clear queue first
await self.clear()
- # restore volume
- await self.player.volume_set(self._snapshot.volume_level)
# restore queue
await self.update(self._snapshot.items)
self._current_index = self._snapshot.index
+ self._current_item_elapsed_time = self._snapshot.position
if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
await self.resume()
if self._snapshot.state == PlayerState.PAUSED:
passive: bool = False,
) -> None:
"""Play item at index (or item_id) X in queue."""
- if self.player.use_multi_stream:
- await self.mass.streams.stop_multi_client_queue_stream(self.queue_id)
if not isinstance(index, int):
index = self.index_by_id(index)
if index is None:
if not len(self.items) > index:
return
self._current_index = index
- self._next_start_index = index
- self._next_start_pos = int(seek_position)
- self._next_fadein = fade_in
- # send stream url to player connected to this queue
- self._stream_url = self.mass.streams.get_stream_url(
- self.queue_id, content_type=self._settings.stream_type
- )
-
- if self.player.use_multi_stream:
- # multi stream enabled, all child players should receive the same audio stream
- # redirect command to all (powered) players
- coros = []
- expected_clients = set()
- for child_id in self.player.group_childs:
- if child_player := self.mass.players.get_player(child_id):
- if child_player.powered:
- # TODO: this assumes that all client players support flac
- player_url = self.mass.streams.get_stream_url(
- self.queue_id, child_id, self._settings.stream_type
- )
- expected_clients.add(child_id)
- coros.append(child_player.play_url(player_url))
- await self.mass.streams.start_multi_client_queue_stream(
- # TODO: this assumes that all client players support flac
- self.queue_id,
- expected_clients,
- self._settings.stream_type,
- )
- await asyncio.gather(*coros)
- elif not passive:
- # regular (single player) request
- await self.player.play_url(self._stream_url)
+ # start the queue stream
+ await self.queue_stream_start(index, int(seek_position), fade_in, passive)
async def move_item(self, queue_item_id: str, pos_shift: int = 1) -> None:
"""
async def delete_item(self, queue_item_id: str) -> None:
"""Delete item (by id or index) from the queue."""
item_index = self.index_by_id(queue_item_id)
- if item_index <= self._index_in_buffer:
+ # NOTE(review): index_in_buffer may be None while a stream is starting —
+ # presumably it is always set once a stream exists, else this int<=None
+ # comparison raises TypeError — TODO confirm.
+ if self.stream and item_index <= self.index_in_buffer:
# ignore request if track already loaded in the buffer
# the frontend should guard so this is just in case
+ self.logger.warning("delete requested for item already loaded in buffer")
return
self._items.pop(item_index)
self.signal_update(True)
:param queue_items: a list of QueueItem
:param offset: offset from current queue position
"""
- if not self.items or self._current_index is None:
- return await self.load(queue_items)
- insert_at_index = self._current_index + offset
+ # prefer the index already loaded in the buffer (when streaming) over
+ # the current play index, so inserts land after buffered content
+ cur_index = self.index_in_buffer or self._current_index
+ if not self.items or cur_index is None:
+ return await self.load(queue_items, passive)
+ insert_at_index = cur_index + offset
for index, item in enumerate(queue_items):
item.sort_index = insert_at_index + index
if self.settings.shuffle_enabled and len(queue_items) > 5:
+ self._items[insert_at_index:]
)
- if offset in (0, self._index_in_buffer):
+ if offset in (0, cur_index):
await self.play_index(insert_at_index, passive=passive)
self.signal_update(True)
def on_player_update(self) -> None:
"""Call when player updates."""
- if self._last_state != self.player.state:
+ # combine playback state and current url so a stream/url switch is
+ # also detected as a state change
+ player_state_str = f"{self.player.state.value}.{self.player.current_url}"
+ if self._last_state != player_state_str:
# playback state changed
- self._last_state = self.player.state
+ self._last_state = player_state_str
# always signal update if playback state changed
self.signal_update()
):
self._current_index += 1
self._current_item_elapsed_time = 0
- # repeat enabled (of whole queue), play queue from beginning
- if self.settings.repeat_mode == RepeatMode.ALL:
- self.mass.create_task(self.play_index(0))
# handle case where stream stopped on purpose and we need to restart it
- elif self._signal_next:
- self._signal_next = False
+ elif self.signal_next:
+ self.signal_next = False
self.mass.create_task(self.resume())
# start poll/updater task if playback starts on player
)
)
- async def queue_stream_prepare(self) -> StreamDetails:
- """Call when queue_streamer is about to start playing."""
- start_from_index = self._next_start_index
- try:
- next_item = self._items[start_from_index]
- except (IndexError, TypeError) as err:
- raise QueueEmpty() from err
- try:
- return await get_stream_details(self.mass, next_item, self.queue_id)
- except MediaNotFoundError as err:
- # something bad happened, try to recover by requesting the next track in the queue
- await self.play_index(self._current_index + 2)
- raise err
-
- async def queue_stream_start(self) -> Tuple[int, int, int]:
- """Call when queue_streamer starts playing the queue stream."""
- start_from_index = self._next_start_index
+ async def queue_stream_start(
+ # fade_in is a duration in seconds (callers pass e.g. 5), not a flag
+ self, start_index: int, seek_position: int, fade_in: int, passive: bool = False
+ ) -> None:
+ """Start the queue stream runner."""
+ players: List[Player] = []
+ output_format = self._settings.stream_type
+ # if multi stream is enabled, all child players should receive the same audio stream
+ if self.player.use_multi_stream:
+ for child_id in self.player.group_childs:
+ child_player = self.mass.players.get_player(child_id)
+ if not child_player or not child_player.powered:
+ continue
+ players.append(child_player)
+ else:
+ # regular (single player) request
+ players.append(self.player)
+
self._current_item_elapsed_time = 0
- self._current_index = start_from_index
- self._start_index = start_from_index
- self._next_start_index = self.get_next_index(start_from_index)
- self._index_in_buffer = start_from_index
- seek_position = self._next_start_pos
- self._next_start_pos = 0
- fade_in = self._next_fadein
- self._next_fadein = 0
- return (start_from_index, seek_position, fade_in)
-
- async def queue_stream_next(self, cur_index: int) -> int | None:
- """Call when queue_streamer loads next track in buffer."""
- next_idx = self._next_start_index
- self._index_in_buffer = next_idx
- self._next_start_index = self.get_next_index(self._next_start_index)
- return next_idx
-
- def get_next_index(self, cur_index: int) -> int:
+ self._current_index = start_index
+
+ # start the queue stream background task
+ stream = await self.mass.streams.start_queue_stream(
+ queue=self,
+ expected_clients=len(players),
+ start_index=start_index,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ output_format=output_format,
+ )
+ self._stream_id = stream.stream_id
+ # execute the play command on the player(s)
+ if not passive:
+ await asyncio.gather(*[x.play_url(stream.url) for x in players])
+
+ def get_next_index(self, cur_index: Optional[int]) -> int:
"""Return the next index for the queue, accounting for repeat settings."""
- if not self._items or cur_index is None:
- raise IndexError("No (more) items in queue")
+ # no reference index yet: start at the beginning of the queue.
+ # this guard must precede the repeat checks, otherwise RepeatMode.ONE
+ # would return None despite the declared int return type.
+ if cur_index is None:
+ return 0
+ # handle repeat single track
if self.settings.repeat_mode == RepeatMode.ONE:
return cur_index
+ # handle repeat all
+ if (
+ self.settings.repeat_mode == RepeatMode.ALL
+ and self._items
+ and cur_index == (len(self._items) - 1)
+ ):
+ return 0
# simply return the next index. other logic is guarded to detect the index
# being higher than the number of items to detect end of queue and/or handle repeat.
return cur_index + 1
- async def queue_stream_signal_next(self):
- """Indicate that queue stream needs to start next index once playback finished."""
- self._signal_next = True
-
def signal_update(self, items_changed: bool = False) -> None:
"""Signal state changed of this queue."""
if items_changed:
"state": self.player.state.value,
"available": self.player.available,
"current_index": self.current_index,
+ "index_in_buffer": self.index_in_buffer,
"current_item": cur_item,
"next_item": next_item,
"settings": self.settings.to_dict(),
elapsed_time_queue = self.player.elapsed_time
total_time = 0
track_time = 0
- if self._items and len(self._items) > self._start_index:
- # start_index: holds the last starting position
- queue_index = self._start_index
+ if self._items and self.stream and len(self._items) > self.stream.start_index:
+ # start_index: holds the position from which the stream started
+ queue_index = self.stream.start_index
queue_track = None
while len(self._items) > queue_index:
# keep enumerating the queue tracks to find current track
if not queue_track.streamdetails:
track_time = elapsed_time_queue - total_time
break
- track_seconds = queue_track.streamdetails.seconds_played
- if elapsed_time_queue > (track_seconds + total_time):
+ # prefer the actually streamed duration; fall back to track metadata
+ duration = (
+ queue_track.streamdetails.seconds_streamed or queue_track.duration
+ )
+ if duration is not None and elapsed_time_queue > (
+ duration + total_time
+ ):
# total elapsed time is more than (streamed) track duration
# move index one up
- total_time += track_seconds
+ total_time += duration
queue_index += 1
else:
# no more seconds left to divide, this is our track
MediaItemType,
Playlist,
Radio,
+ StreamDetails,
Track,
)
-from music_assistant.models.player_queue import StreamDetails
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
if MediaType.PLAYLIST in self.supported_mediatypes:
raise NotImplementedError
- async def get_stream_details(self, item_id: str) -> StreamDetails:
+ async def get_stream_details(self, item_id: str) -> StreamDetails | None:
"""Get streamdetails for a track/radio."""
raise NotImplementedError
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ raise NotImplementedError
+
async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
"""Get single MediaItem from provider."""
if media_type == MediaType.ARTIST:
from mashumaro import DataClassDictMixin
-from music_assistant.models.enums import MediaType
+from music_assistant.models.enums import ContentType, MediaType, ProviderType
from music_assistant.models.media_items import Radio, StreamDetails, Track
return d
@classmethod
- def from_url(cls, url: str, name: Optional[str] = None) -> QueueItem:
- """Create QueueItem from plain url."""
+ def from_url(
+ cls,
+ url: str,
+ name: Optional[str] = None,
+ media_type: MediaType = MediaType.URL,
+ ) -> QueueItem:
+ """Create QueueItem from plain url (or local file)."""
return cls(
uri=url,
name=name or url.split("?")[0],
- media_type=MediaType.UNKNOWN,
- image=None,
- media_item=None,
+ media_type=media_type,
+ # pre-fill basic streamdetails so plain url/file items can play
+ # without a provider lookup
+ streamdetails=StreamDetails(
+ provider=ProviderType.URL,
+ item_id=url,
+ content_type=ContentType.try_parse(url),
+ media_type=media_type,
+ data=url,
+ ),
)
@classmethod
self._crossfade_mode: CrossFadeMode = CrossFadeMode.DISABLED
self._crossfade_duration: int = 6
self._volume_normalization_enabled: bool = True
- self._volume_normalization_target: int = -23
+ # -14 LUFS is the loudness target commonly used by streaming services;
+ # the previous -23 matched the EBU R128 broadcast recommendation
+ self._volume_normalization_target: int = -14
@property
def repeat_mode(self) -> RepeatMode:
@property
def stream_type(self) -> ContentType:
"""Return supported/preferred stream type for playerqueue. Read only."""
- # determine default stream type from player capabilities
- return next(
- x
- for x in (
- ContentType.FLAC,
- ContentType.WAV,
- ContentType.PCM_S16LE,
- ContentType.MP3,
- ContentType.MPEG,
- )
- if x in self._queue.player.supported_content_types
- )
+ # the player model now decides its own preferred stream type
+ return self._queue.player.stream_type
def to_dict(self) -> Dict[str, Any]:
"""Return dict from settings."""