Refactor streaming (#361)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 10 Jun 2022 21:29:31 +0000 (23:29 +0200)
committerGitHub <noreply@github.com>
Fri, 10 Jun 2022 21:29:31 +0000 (23:29 +0200)
* refactored audio streaming for better (pre)buffering

* Parse ICY metadata from radio stream

* fix alert feature

* set default volume normalisation target to -14

* use ffmpeg with libsoxr

* fix playback on UPNP devices (e.g. Sonos)

* Fix buffering issues

* Optimize memory and cpu consumption while streaming

20 files changed:
music_assistant/constants.py
music_assistant/controllers/music/__init__.py
music_assistant/controllers/music/providers/filesystem.py
music_assistant/controllers/music/providers/qobuz.py
music_assistant/controllers/music/providers/spotify.py
music_assistant/controllers/music/providers/tunein.py
music_assistant/controllers/music/providers/url.py [new file with mode: 0644]
music_assistant/controllers/stream.py [deleted file]
music_assistant/controllers/streams.py [new file with mode: 0644]
music_assistant/helpers/audio.py
music_assistant/helpers/process.py
music_assistant/mass.py
music_assistant/models/config.py
music_assistant/models/enums.py
music_assistant/models/media_items.py
music_assistant/models/player.py
music_assistant/models/player_queue.py
music_assistant/models/provider.py
music_assistant/models/queue_item.py
music_assistant/models/queue_settings.py

index 0d036d30ce1022a642ffac37a70f4271e444b6ca..1e88116e6f2f08b78a72c5fa0efd8aff2e45a76d 100755 (executable)
@@ -21,3 +21,6 @@ ATTR_CONFIG_ENTRIES = "config_entries"
 ATTR_UPDATED_AT = "updated_at"
 ATTR_ACTIVE_QUEUE = "active_queue"
 ATTR_GROUP_PARENTS = "group_parents"
+
+
+ROOT_LOGGER_NAME = "music_assistant"
index 5090a03d3b3b82c068e1fd681b7c3a1b1806e8ee..abd4796136a11a1e2a96b446272731bbe448a573 100755 (executable)
@@ -27,6 +27,8 @@ from .providers.filesystem import FileSystemProvider
 from .providers.qobuz import QobuzProvider
 from .providers.spotify import SpotifyProvider
 from .providers.tunein import TuneInProvider
+from .providers.url import PROVIDER_CONFIG as URL_CONFIG
+from .providers.url import URLProvider
 
 if TYPE_CHECKING:
     from music_assistant.mass import MusicAssistant
@@ -59,6 +61,8 @@ class MusicController:
         for prov_conf in self.mass.config.providers:
             prov_cls = PROV_MAP[prov_conf.type]
             await self._register_provider(prov_cls(self.mass, prov_conf), prov_conf)
+        # always register url provider
+        await self._register_provider(URLProvider(self.mass, URL_CONFIG), URL_CONFIG)
 
     async def start_sync(
         self,
index 904d28a41dc46adcdb3986237f4d2eebb1887bdd..e5ad97dbc2a586456652873d9de524173443a157 100644 (file)
@@ -14,6 +14,7 @@ from aiofiles.os import wrap
 from aiofiles.threadpool.binary import AsyncFileIO
 from tinytag.tinytag import TinyTag
 
+from music_assistant.helpers.audio import get_file_stream
 from music_assistant.helpers.compare import compare_strings
 from music_assistant.helpers.database import SCHEMA_VERSION
 from music_assistant.helpers.util import (
@@ -37,7 +38,6 @@ from music_assistant.models.media_items import (
     MediaType,
     Playlist,
     StreamDetails,
-    StreamType,
     Track,
 )
 from music_assistant.models.provider import MusicProvider
@@ -435,16 +435,29 @@ class FileSystemProvider(MusicProvider):
         _, ext = Path(itempath).name.rsplit(".", 1)
         content_type = CONTENT_TYPE_EXT.get(ext.lower())
 
+        stat = await self.mass.loop.run_in_executor(None, os.stat, itempath)
+
         return StreamDetails(
-            type=StreamType.FILE,
             provider=self.type,
             item_id=item_id,
             content_type=content_type,
-            path=itempath,
+            media_type=MediaType.TRACK,
+            duration=tags.duration,
+            size=stat.st_size,
             sample_rate=tags.samplerate or 44100,
             bit_depth=16,  # TODO: parse bitdepth
+            data=itempath,
         )
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        async for chunk in get_file_stream(
+            self.mass, streamdetails.data, streamdetails, seek_position
+        ):
+            yield chunk
+
     async def _parse_track(self, track_path: str) -> Track | None:
         """Try to parse a track from a filename by reading its tags."""
 
index 8ced8464cd39dd96f19a35e2ae73fe651bee9df3..65bbc48ae7cfc04f51b59cdf67470933f8369d9f 100644 (file)
@@ -13,11 +13,11 @@ from asyncio_throttle import Throttler
 from music_assistant.helpers.app_vars import (  # pylint: disable=no-name-in-module
     app_var,
 )
+from music_assistant.helpers.audio import get_http_stream
 from music_assistant.helpers.cache import use_cache
 from music_assistant.helpers.util import parse_title_and_version, try_parse_int
-from music_assistant.models.enums import EventType, ProviderType
-from music_assistant.models.errors import LoginFailed
-from music_assistant.models.event import MassEvent
+from music_assistant.models.enums import ProviderType
+from music_assistant.models.errors import LoginFailed, MediaNotFoundError
 from music_assistant.models.media_items import (
     Album,
     AlbumType,
@@ -31,7 +31,6 @@ from music_assistant.models.media_items import (
     MediaType,
     Playlist,
     StreamDetails,
-    StreamType,
     Track,
 )
 from music_assistant.models.provider import MusicProvider
@@ -61,12 +60,6 @@ class QobuzProvider(MusicProvider):
         token = await self._auth_token()
         if not token:
             raise LoginFailed(f"Login failed for user {self.config.username}")
-        # subscribe to stream events so we can report playback to Qobuz
-        self.mass.subscribe(
-            self.on_stream_event,
-            (EventType.STREAM_STARTED, EventType.STREAM_ENDED),
-            id_filter=self.type.value,
-        )
         return True
 
     async def search(
@@ -338,69 +331,81 @@ class QobuzProvider(MusicProvider):
                 streamdata = result
                 break
         if not streamdata:
-            self.logger.error("Unable to retrieve stream details for track %s", item_id)
-            return None
+            raise MediaNotFoundError(f"Unable to retrieve stream details for {item_id}")
         if streamdata["mime_type"] == "audio/mpeg":
             content_type = ContentType.MPEG
         elif streamdata["mime_type"] == "audio/flac":
             content_type = ContentType.FLAC
         else:
-            self.logger.error("Unsupported mime type for track %s", item_id)
-            return None
+            raise MediaNotFoundError(f"Unsupported mime type for {item_id}")
         return StreamDetails(
-            type=StreamType.URL,
             item_id=str(item_id),
             provider=self.type,
-            path=streamdata["url"],
             content_type=content_type,
+            duration=streamdata["duration"],
             sample_rate=int(streamdata["sampling_rate"] * 1000),
             bit_depth=streamdata["bit_depth"],
-            details=streamdata,  # we need these details for reporting playback
+            data=streamdata,  # we need these details for reporting playback
+            expires=time.time() + 1800,  # not sure about the real allowed value
         )
 
-    async def on_stream_event(self, event: MassEvent):
-        """
-        Received event from mass.
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        self.mass.create_task(self._report_playback_started(streamdetails))
+        bytes_sent = 0
+        try:
+            url = streamdetails.data["url"]
+            async for chunk in get_http_stream(
+                self.mass, url, streamdetails, seek_position
+            ):
+                yield chunk
+                bytes_sent += len(chunk)
+        finally:
+            if bytes_sent:
+                self.mass.create_task(
+                    self._report_playback_stopped(streamdetails, bytes_sent)
+                )
 
-        We use this to report playback start/stop to qobuz.
-        """
-        if not self._user_auth_info:
-            return
+    async def _report_playback_started(self, streamdetails: StreamDetails) -> None:
+        """Report playback start to qobuz."""
         # TODO: need to figure out if the streamed track is purchased by user
         # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
         # {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}}
-        if event.type == EventType.STREAM_STARTED:
-            # report streaming started to qobuz
-            device_id = self._user_auth_info["user"]["device"]["id"]
-            credential_id = self._user_auth_info["user"]["credential"]["id"]
-            user_id = self._user_auth_info["user"]["id"]
-            format_id = event.data.details["format_id"]
-            timestamp = int(time.time())
-            events = [
-                {
-                    "online": True,
-                    "sample": False,
-                    "intent": "stream",
-                    "device_id": device_id,
-                    "track_id": str(event.data.item_id),
-                    "purchase": False,
-                    "date": timestamp,
-                    "credential_id": credential_id,
-                    "user_id": user_id,
-                    "local": False,
-                    "format_id": format_id,
-                }
-            ]
-            await self._post_data("track/reportStreamingStart", data=events)
-        elif event.type == EventType.STREAM_ENDED:
-            # report streaming ended to qobuz
-            user_id = self._user_auth_info["user"]["id"]
-            await self._get_data(
-                "/track/reportStreamingEnd",
-                user_id=user_id,
-                track_id=str(event.data.item_id),
-                duration=try_parse_int(event.data.seconds_played),
-            )
+        device_id = self._user_auth_info["user"]["device"]["id"]
+        credential_id = self._user_auth_info["user"]["credential"]["id"]
+        user_id = self._user_auth_info["user"]["id"]
+        format_id = streamdetails.data["format_id"]
+        timestamp = int(time.time())
+        events = [
+            {
+                "online": True,
+                "sample": False,
+                "intent": "stream",
+                "device_id": device_id,
+                "track_id": str(streamdetails.item_id),
+                "purchase": False,
+                "date": timestamp,
+                "credential_id": credential_id,
+                "user_id": user_id,
+                "local": False,
+                "format_id": format_id,
+            }
+        ]
+        await self._post_data("track/reportStreamingStart", data=events)
+
+    async def _report_playback_stopped(
+        self, streamdetails: StreamDetails, bytes_sent: int
+    ) -> None:
+        """Report playback stop to qobuz."""
+        user_id = self._user_auth_info["user"]["id"]
+        await self._get_data(
+            "/track/reportStreamingEnd",
+            user_id=user_id,
+            track_id=str(streamdetails.item_id),
+            duration=try_parse_int(streamdetails.seconds_streamed),
+        )
 
     async def _parse_artist(self, artist_obj: dict):
         """Parse qobuz artist object to generic layout."""
index 1ea973815cce4b74e26494e4fc55bda04f081415..83d1f85fe1828e9db1a1b5f862d2c1f66218b075 100644 (file)
@@ -17,9 +17,10 @@ from music_assistant.helpers.app_vars import (  # noqa # pylint: disable=no-name
     app_var,
 )
 from music_assistant.helpers.cache import use_cache
+from music_assistant.helpers.process import AsyncProcess
 from music_assistant.helpers.util import parse_title_and_version
 from music_assistant.models.enums import ProviderType
-from music_assistant.models.errors import LoginFailed
+from music_assistant.models.errors import LoginFailed, MediaNotFoundError
 from music_assistant.models.media_items import (
     Album,
     AlbumType,
@@ -33,7 +34,6 @@ from music_assistant.models.media_items import (
     MediaType,
     Playlist,
     StreamDetails,
-    StreamType,
     Track,
 )
 from music_assistant.models.provider import MusicProvider
@@ -276,21 +276,39 @@ class SpotifyProvider(MusicProvider):
         # make sure a valid track is requested.
         track = await self.get_track(item_id)
         if not track:
-            return None
+            raise MediaNotFoundError(f"track {item_id} not found")
         # make sure that the token is still valid by just requesting it
         await self.get_token()
-        librespot = await self.get_librespot_binary()
-        librespot_exec = f'{librespot} -c "{self._cache_dir}" --pass-through -b 320 --single-track spotify://track:{track.item_id}'
         return StreamDetails(
-            type=StreamType.EXECUTABLE,
             item_id=track.item_id,
             provider=self.type,
-            path=librespot_exec,
             content_type=ContentType.OGG,
-            sample_rate=44100,
-            bit_depth=16,
+            duration=track.duration,
         )
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        # make sure that the token is still valid by just requesting it
+        await self.get_token()
+        librespot = await self.get_librespot_binary()
+        args = [
+            librespot,
+            "-c",
+            self._cache_dir,
+            "--pass-through",
+            "-b",
+            "320",
+            "--single-track",
+            f"spotify://track:{streamdetails.item_id}",
+        ]
+        if seek_position:
+            args += ["--start-position", str(int(seek_position))]
+        async with AsyncProcess(args) as librespot_proc:
+            async for chunk in librespot_proc.iterate_chunks():
+                yield chunk
+
     async def _parse_artist(self, artist_obj):
         """Parse spotify artist object to generic layout."""
         artist = Artist(
index 9769196662d2324f22f6a2bdf92138db284da325..fe6ef60a14dcbdc10ae5aba646a4cc2d7f7926d4 100644 (file)
@@ -5,10 +5,11 @@ from typing import AsyncGenerator, List, Optional
 
 from asyncio_throttle import Throttler
 
+from music_assistant.helpers.audio import get_radio_stream
 from music_assistant.helpers.cache import use_cache
 from music_assistant.helpers.util import create_clean_string
 from music_assistant.models.enums import ProviderType
-from music_assistant.models.errors import LoginFailed
+from music_assistant.models.errors import LoginFailed, MediaNotFoundError
 from music_assistant.models.media_items import (
     ContentType,
     ImageType,
@@ -19,7 +20,6 @@ from music_assistant.models.media_items import (
     MediaType,
     Radio,
     StreamDetails,
-    StreamType,
 )
 from music_assistant.models.provider import MusicProvider
 
@@ -154,17 +154,22 @@ class TuneInProvider(MusicProvider):
         for stream in stream_info["body"]:
             if stream["media_type"] == media_type:
                 return StreamDetails(
-                    type=StreamType.URL,
-                    item_id=item_id,
                     provider=self.type,
-                    path=stream["url"],
+                    item_id=item_id,
                     content_type=ContentType(stream["media_type"]),
-                    sample_rate=44100,
-                    bit_depth=16,
                     media_type=MediaType.RADIO,
-                    details=stream,
+                    data=stream,
                 )
-        return None
+        raise MediaNotFoundError(f"Unable to retrieve stream details for {item_id}")
+
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        async for chunk in get_radio_stream(
+            self.mass, streamdetails.data["url"], streamdetails
+        ):
+            yield chunk
 
     @use_cache(3600 * 2)
     async def __get_data(self, endpoint: str, **kwargs):
diff --git a/music_assistant/controllers/music/providers/url.py b/music_assistant/controllers/music/providers/url.py
new file mode 100644 (file)
index 0000000..1dbb50f
--- /dev/null
@@ -0,0 +1,74 @@
+"""Basic provider allowing for external URL's to be streamed."""
+from __future__ import annotations
+
+import os
+from typing import AsyncGenerator, List, Optional
+
+from music_assistant.helpers.audio import (
+    get_file_stream,
+    get_http_stream,
+    get_radio_stream,
+)
+from music_assistant.models.config import MusicProviderConfig
+from music_assistant.models.enums import ContentType, MediaType, ProviderType
+from music_assistant.models.media_items import MediaItemType, StreamDetails
+from music_assistant.models.provider import MusicProvider
+
+PROVIDER_CONFIG = MusicProviderConfig(ProviderType.URL)
+
+
+class URLProvider(MusicProvider):
+    """Music Provider for manual URL's/files added to the queue."""
+
+    _attr_name: str = "URL"
+    _attr_type: ProviderType = ProviderType.URL
+    _attr_available: bool = True
+    _attr_supported_mediatypes: List[MediaType] = []
+
+    async def setup(self) -> bool:
+        """
+        Handle async initialization of the provider.
+
+        Called when provider is registered.
+        """
+        return True
+
+    async def search(
+        self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
+    ) -> List[MediaItemType]:
+        """Perform search on musicprovider."""
+        return []
+
+    async def get_stream_details(self, item_id: str) -> StreamDetails | None:
+        """Get streamdetails for a track/radio."""
+        url = item_id
+        return StreamDetails(
+            provider=ProviderType.URL,
+            item_id=item_id,
+            content_type=ContentType.try_parse(url),
+            media_type=MediaType.URL,
+            data=url,
+        )
+
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        if streamdetails.media_type == MediaType.RADIO:
+            # radio stream url
+            async for chunk in get_radio_stream(
+                self.mass, streamdetails.data, streamdetails
+            ):
+                yield chunk
+        elif os.path.isfile(streamdetails.data):
+            # local file
+            async for chunk in get_file_stream(
+                self.mass, streamdetails.data, streamdetails, seek_position
+            ):
+                yield chunk
+        else:
+            # regular stream url (without icy meta and reconnect)
+            async for chunk in get_http_stream(
+                self.mass, streamdetails.data, streamdetails, seek_position
+            ):
+                yield chunk
diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py
deleted file mode 100644 (file)
index 12031ec..0000000
+++ /dev/null
@@ -1,663 +0,0 @@
-"""Controller to stream audio to players."""
-from __future__ import annotations
-
-import asyncio
-import urllib.parse
-from asyncio import Task
-from time import time
-from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Set
-
-from aiohttp import web
-
-from music_assistant.helpers.audio import (
-    check_audio_support,
-    create_wave_header,
-    crossfade_pcm_parts,
-    fadein_pcm_part,
-    get_media_stream,
-    get_preview_stream,
-    get_sox_args_for_pcm_stream,
-    get_stream_details,
-    strip_silence,
-)
-from music_assistant.helpers.process import AsyncProcess
-from music_assistant.models.enums import (
-    ContentType,
-    CrossFadeMode,
-    EventType,
-    MediaType,
-    ProviderType,
-)
-from music_assistant.models.errors import (
-    MediaNotFoundError,
-    MusicAssistantError,
-    QueueEmpty,
-)
-from music_assistant.models.event import MassEvent
-from music_assistant.models.player_queue import PlayerQueue, QueueItem
-
-if TYPE_CHECKING:
-    from music_assistant.mass import MusicAssistant
-
-
-class StreamController:
-    """Controller to stream audio to players."""
-
-    def __init__(self, mass: MusicAssistant):
-        """Initialize instance."""
-        self.mass = mass
-        self.logger = mass.logger.getChild("stream")
-        self._port = mass.config.stream_port
-        self._ip = mass.config.stream_ip
-        self._subscribers: Dict[str, Set[str]] = {}
-        self._client_queues: Dict[str, Dict[str, asyncio.Queue]] = {}
-        self._stream_tasks: Dict[str, Task] = {}
-        self._time_started: Dict[str, float] = {}
-
-    def get_stream_url(
-        self,
-        queue_id: str,
-        child_player: Optional[str] = None,
-        content_type: ContentType = ContentType.FLAC,
-    ) -> str:
-        """Return the full stream url for the PlayerQueue Stream."""
-        ext = content_type.value
-        if child_player:
-            return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{ext}"
-        return f"http://{self._ip}:{self._port}/{queue_id}.{ext}"
-
-    async def get_preview_url(self, provider: ProviderType, track_id: str) -> str:
-        """Return url to short preview sample."""
-        track = await self.mass.music.tracks.get_provider_item(track_id, provider)
-        if preview := track.metadata.preview:
-            return preview
-        enc_track_id = urllib.parse.quote(track_id)
-        return f"http://{self._ip}:{self._port}/preview?provider_id={provider.value}&item_id={enc_track_id}"
-
-    def get_silence_url(self, duration: int = 600) -> str:
-        """Return url to silence."""
-        return f"http://{self._ip}:{self._port}/silence?duration={duration}"
-
-    async def setup(self) -> None:
-        """Async initialize of module."""
-        app = web.Application()
-
-        app.router.add_get("/preview", self.serve_preview)
-        app.router.add_get("/silence", self.serve_silence)
-        app.router.add_get(
-            "/{queue_id}/{player_id}.{format}",
-            self.serve_multi_client_queue_stream,
-        )
-        app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream)
-        app.router.add_get("/{queue_id}", self.serve_queue_stream)
-
-        runner = web.AppRunner(app, access_log=None)
-        await runner.setup()
-        # set host to None to bind to all addresses on both IPv4 and IPv6
-        http_site = web.TCPSite(runner, host=None, port=self._port)
-        await http_site.start()
-
-        async def on_shutdown_event(*event: MassEvent):
-            """Handle shutdown event."""
-            await http_site.stop()
-            await runner.cleanup()
-            await app.shutdown()
-            await app.cleanup()
-            self.logger.info("Streamserver exited.")
-
-        self.mass.subscribe(on_shutdown_event, EventType.SHUTDOWN)
-
-        sox_present, ffmpeg_present = await check_audio_support(True)
-        if not ffmpeg_present and not sox_present:
-            self.logger.error(
-                "SoX or FFmpeg binary not found on your system, "
-                "playback will NOT work!."
-            )
-        elif not ffmpeg_present:
-            self.logger.warning(
-                "The FFmpeg binary was not found on your system, "
-                "you might experience issues with playback. "
-                "Please install FFmpeg with your OS package manager.",
-            )
-        elif not sox_present:
-            self.logger.warning(
-                "The SoX binary was not found on your system, FFmpeg is used as fallback."
-            )
-
-        self.logger.info("Started stream server on port %s", self._port)
-
-    @staticmethod
-    async def serve_silence(request: web.Request):
-        """Serve silence."""
-        resp = web.StreamResponse(
-            status=200, reason="OK", headers={"Content-Type": "audio/wav"}
-        )
-        await resp.prepare(request)
-        duration = int(request.query.get("duration", 600))
-        await resp.write(create_wave_header(duration=duration))
-        for _ in range(0, duration):
-            await resp.write(b"\0" * 1764000)
-        return resp
-
-    async def serve_preview(self, request: web.Request):
-        """Serve short preview sample."""
-        provider_id = request.query["provider_id"]
-        item_id = urllib.parse.unquote(request.query["item_id"])
-        resp = web.StreamResponse(
-            status=200, reason="OK", headers={"Content-Type": "audio/mp3"}
-        )
-        await resp.prepare(request)
-        async for _, chunk in get_preview_stream(self.mass, provider_id, item_id):
-            await resp.write(chunk)
-        return resp
-
-    async def serve_queue_stream(self, request: web.Request):
-        """Serve queue audio stream to a single player (encoded to fileformat of choice)."""
-        queue_id = request.match_info["queue_id"]
-        fmt = request.match_info.get("format", "flac")
-        queue = self.mass.players.get_player_queue(queue_id)
-
-        if queue is None:
-            return web.Response(status=404)
-
-        # prepare request
-        try:
-            start_streamdetails = await queue.queue_stream_prepare()
-        except QueueEmpty:
-            # send stop here to prevent the player from retrying over and over
-            await queue.stop()
-            # send some silence to allow the player to process the stop request
-            return await self.serve_silence(request)
-
-        resp = web.StreamResponse(
-            status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"}
-        )
-        await resp.prepare(request)
-
-        output_fmt = ContentType(fmt)
-        # work out sample rate
-        if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
-            sample_rate = min(96000, queue.max_sample_rate)
-            bit_depth = 24
-            channels = 2
-            resample = True
-        else:
-            sample_rate = start_streamdetails.sample_rate
-            bit_depth = start_streamdetails.bit_depth
-            channels = start_streamdetails.channels
-            resample = False
-        sox_args = await get_sox_args_for_pcm_stream(
-            sample_rate,
-            bit_depth,
-            channels,
-            output_format=output_fmt,
-        )
-        # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
-        # send the compressed/encoded stream to the client.
-        async with AsyncProcess(sox_args, True) as sox_proc:
-
-            async def writer():
-                # task that sends the raw pcm audio to the sox/ffmpeg process
-                async for audio_chunk in self._get_queue_stream(
-                    queue,
-                    sample_rate=sample_rate,
-                    bit_depth=bit_depth,
-                    channels=channels,
-                    resample=resample,
-                ):
-                    if sox_proc.closed:
-                        return
-                    await sox_proc.write(audio_chunk)
-                # write eof when last packet is received
-                sox_proc.write_eof()
-
-            sox_proc.attach_task(writer())
-
-            # read bytes from final output
-            async for audio_chunk in sox_proc.iterate_chunks():
-                await resp.write(audio_chunk)
-
-        return resp
-
-    async def serve_multi_client_queue_stream(self, request: web.Request):
-        """Serve queue audio stream to multiple (group)clients."""
-        queue_id = request.match_info["queue_id"]
-        player_id = request.match_info["player_id"]
-        fmt = request.match_info.get("format", "flac")
-        queue = self.mass.players.get_player_queue(queue_id)
-        player = self.mass.players.get_player(player_id)
-
-        if queue is None or player is None:
-            return web.Response(status=404)
-
-        # prepare request
-        resp = web.StreamResponse(
-            status=200,
-            reason="OK",
-            headers={"Content-Type": f"audio/{fmt}"},
-        )
-        await resp.prepare(request)
-
-        # start delivering audio chunks
-        await self.subscribe_client(queue_id, player_id)
-        try:
-            while True:
-                client_queue = self._client_queues.get(queue_id).get(player_id)
-                if not client_queue:
-                    break
-                audio_chunk = await client_queue.get()
-                if audio_chunk == b"":
-                    # eof
-                    break
-                await resp.write(audio_chunk)
-                client_queue.task_done()
-        finally:
-            await self.unsubscribe_client(queue_id, player_id)
-        return resp
-
-    async def subscribe_client(self, queue_id: str, player_id: str) -> None:
-        """Subscribe client to queue stream."""
-        if queue_id not in self._stream_tasks:
-            raise MusicAssistantError(f"No Queue stream available for {queue_id}")
-
-        if queue_id not in self._subscribers:
-            self._subscribers[queue_id] = set()
-        self._subscribers[queue_id].add(player_id)
-
-        self.logger.debug(
-            "Subscribed player %s to multi queue stream %s",
-            player_id,
-            queue_id,
-        )
-
-    async def unsubscribe_client(self, queue_id: str, player_id: str):
-        """Unsubscribe client from queue stream."""
-        if player_id in self._subscribers[queue_id]:
-            self._subscribers[queue_id].remove(player_id)
-
-        self.__cleanup_client_queue(queue_id, player_id)
-        self.logger.debug(
-            "Unsubscribed player %s from multi queue stream %s", player_id, queue_id
-        )
-        if len(self._subscribers[queue_id]) == 0:
-            # no more clients, cancel stream task
-            if task := self._stream_tasks.pop(queue_id, None):
-                self.logger.debug(
-                    "Aborted multi queue stream %s due to no more clients", queue_id
-                )
-                task.cancel()
-
-    async def start_multi_client_queue_stream(
-        self, queue_id: str, expected_clients: Set[str], output_fmt: ContentType
-    ) -> None:
-        """Start the Queue stream feeding callbacks of listeners.."""
-        assert queue_id not in self._stream_tasks, "already running!"
-
-        self._time_started[queue_id] = time()
-
-        # create queue for expected clients
-        self._client_queues.setdefault(queue_id, {})
-        for child_id in expected_clients:
-            self._client_queues[queue_id][child_id] = asyncio.Queue(10)
-
-        self._stream_tasks[queue_id] = asyncio.create_task(
-            self.__multi_client_queue_stream_runner(queue_id, output_fmt)
-        )
-
-    async def stop_multi_client_queue_stream(self, queue_id: str) -> None:
-        """Signal a running queue stream task and its listeners to stop."""
-        if queue_id not in self._stream_tasks:
-            return
-
-        # send stop to child players
-        await asyncio.gather(
-            *[
-                self.mass.players.get_player(client_id).stop()
-                for client_id in self._subscribers.get(queue_id, {})
-            ]
-        )
-
-        # stop background task
-        if stream_task := self._stream_tasks.pop(queue_id, None):
-            stream_task.cancel()
-
-        # wait for cleanup
-        while len(self._subscribers.get(queue_id, {})) != 0:
-            await asyncio.sleep(0.1)
-        while len(self._client_queues.get(queue_id, {})) != 0:
-            await asyncio.sleep(0.1)
-
-    async def __multi_client_queue_stream_runner(
-        self, queue_id: str, output_fmt: ContentType
-    ):
-        """Distribute audio chunks over connected clients in a multi client queue stream."""
-        queue = self.mass.players.get_player_queue(queue_id)
-
-        start_streamdetails = await queue.queue_stream_prepare()
-        # work out sample rate
-        if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
-            sample_rate = min(96000, queue.max_sample_rate)
-            bit_depth = 24
-            channels = 2
-            resample = True
-        else:
-            sample_rate = start_streamdetails.sample_rate
-            bit_depth = start_streamdetails.bit_depth
-            channels = start_streamdetails.channels
-            resample = False
-        sox_args = await get_sox_args_for_pcm_stream(
-            sample_rate,
-            bit_depth,
-            channels,
-            output_format=output_fmt,
-        )
-        self.logger.debug("Multi client queue stream %s started", queue.queue_id)
-        try:
-
-            # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
-            # send the compressed/endoded stream to the client.
-            async with AsyncProcess(sox_args, True) as sox_proc:
-
-                async def writer():
-                    """Task that sends the raw pcm audio to the sox/ffmpeg process."""
-                    async for audio_chunk in self._get_queue_stream(
-                        queue,
-                        sample_rate=sample_rate,
-                        bit_depth=bit_depth,
-                        channels=channels,
-                        resample=resample,
-                    ):
-                        if sox_proc.closed:
-                            return
-                        await sox_proc.write(audio_chunk)
-                    # write eof when last packet is received
-                    sox_proc.write_eof()
-
-                async def reader():
-                    """Read bytes from final output and put chunk on child queues."""
-                    chunks_sent = 0
-                    async for chunk in sox_proc.iterate_chunks():
-                        chunks_sent += 1
-                        coros = []
-                        for player_id in list(self._client_queues[queue_id].keys()):
-                            if (
-                                self._client_queues[queue_id][player_id].full()
-                                and chunks_sent >= 10
-                                and player_id not in self._subscribers[queue_id]
-                            ):
-                                # assume client did not connect at all or got disconnected somehow
-                                self.__cleanup_client_queue(queue_id, player_id)
-                                self._client_queues[queue_id].pop(player_id, None)
-                            else:
-                                coros.append(
-                                    self._client_queues[queue_id][player_id].put(chunk)
-                                )
-                        await asyncio.gather(*coros)
-
-                # launch the reader and writer
-                await asyncio.gather(*[writer(), reader()])
-                # wait for all queues to consume their data
-                await asyncio.gather(
-                    *[cq.join() for cq in self._client_queues[queue_id].values()]
-                )
-                # send empty chunk to inform EOF
-                await asyncio.gather(
-                    *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
-                )
-
-        finally:
-            self.logger.debug("Multi client queue stream %s finished", queue.queue_id)
-            # cleanup
-            self._stream_tasks.pop(queue_id, None)
-            for player_id in list(self._client_queues[queue_id].keys()):
-                self.__cleanup_client_queue(queue_id, player_id)
-
-            self.logger.debug("Multi client queue stream %s ended", queue.queue_id)
-
-    def __cleanup_client_queue(self, queue_id: str, player_id: str):
-        """Cleanup a client queue after it completes/disconnects."""
-        if client_queue := self._client_queues.get(queue_id, {}).pop(player_id, None):
-            for _ in range(client_queue.qsize()):
-                client_queue.get_nowait()
-                client_queue.task_done()
-            client_queue.put_nowait(b"")
-
-    async def _get_queue_stream(
-        self,
-        queue: PlayerQueue,
-        sample_rate: int,
-        bit_depth: int,
-        channels: int = 2,
-        resample: bool = False,
-    ) -> AsyncGenerator[None, bytes]:
-        """Stream the PlayerQueue's tracks as constant feed of PCM raw audio."""
-        bytes_written_total = 0
-        last_fadeout_data = b""
-        queue_index = None
-        track_count = 0
-        prev_track: Optional[QueueItem] = None
-
-        pcm_fmt = ContentType.from_bit_depth(bit_depth)
-        self.logger.info(
-            "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)",
-            queue.player.name,
-            pcm_fmt.value,
-            sample_rate,
-        )
-
-        # stream queue tracks one by one
-        while True:
-            # get the (next) track in queue
-            track_count += 1
-            if track_count == 1:
-                # report start of queue playback so we can calculate current track/duration etc.
-                queue_index, seek_position, fade_in = await queue.queue_stream_start()
-            else:
-                queue_index = await queue.queue_stream_next(queue_index)
-                seek_position = 0
-                fade_in = 0
-            queue_track = queue.get_item(queue_index)
-            if not queue_track:
-                self.logger.debug(
-                    "Abort Queue stream %s: no (more) tracks in queue", queue.queue_id
-                )
-                break
-            # get streamdetails
-            try:
-                streamdetails = await get_stream_details(
-                    self.mass, queue_track, queue.queue_id
-                )
-            except MediaNotFoundError as err:
-                self.logger.warning(
-                    "Skip track %s due to missing streamdetails",
-                    queue_track.name,
-                    exc_info=err,
-                )
-                continue
-
-            # check the PCM samplerate/bitrate
-            if not resample and streamdetails.bit_depth > bit_depth:
-                await queue.queue_stream_signal_next()
-                self.logger.debug(
-                    "Abort queue stream %s due to bit depth mismatch", queue.player.name
-                )
-                break
-            if (
-                not resample
-                and streamdetails.sample_rate > sample_rate
-                and streamdetails.sample_rate <= queue.max_sample_rate
-            ):
-                self.logger.debug(
-                    "Abort queue stream %s due to sample rate mismatch",
-                    queue.player.name,
-                )
-                await queue.queue_stream_signal_next()
-                break
-
-            # check crossfade ability
-            use_crossfade = queue.settings.crossfade_mode != CrossFadeMode.DISABLED
-            if (
-                prev_track is not None
-                and prev_track.media_type == MediaType.TRACK
-                and queue_track.media_type == MediaType.TRACK
-            ):
-                prev_item = await self.mass.music.get_item_by_uri(prev_track.uri)
-                new_item = await self.mass.music.get_item_by_uri(queue_track.uri)
-                if (
-                    prev_item.album is not None
-                    and new_item.album is not None
-                    and prev_item.album == new_item.album
-                ):
-                    use_crossfade = False
-            prev_track = queue_track
-
-            sample_size = int(sample_rate * (bit_depth / 8) * channels)  # 1 second
-            buffer_size = sample_size * (queue.settings.crossfade_duration or 2)
-            # force small buffer for radio to prevent too much lag at start
-            if queue_track.media_type != MediaType.TRACK:
-                use_crossfade = False
-                buffer_size = sample_size
-
-            self.logger.info(
-                "Start Streaming queue track: %s (%s) for queue %s",
-                queue_track.uri,
-                queue_track.name,
-                queue.player.name,
-            )
-            queue_track.streamdetails.seconds_skipped = seek_position
-            fade_in_part = b""
-            cur_chunk = 0
-            prev_chunk = None
-            bytes_written = 0
-            # handle incoming audio chunks
-            async for is_last_chunk, chunk in get_media_stream(
-                self.mass,
-                streamdetails,
-                pcm_fmt,
-                resample=sample_rate,
-                chunk_size=buffer_size,
-                seek_position=seek_position,
-            ):
-                cur_chunk += 1
-
-                # HANDLE FIRST PART OF TRACK
-                if not chunk and bytes_written == 0 and is_last_chunk:
-                    # stream error: got empy first chunk ?!
-                    self.logger.warning("Stream error on %s", queue_track.uri)
-                elif cur_chunk == 1 and last_fadeout_data:
-                    prev_chunk = chunk
-                    del chunk
-                elif cur_chunk == 1 and fade_in:
-                    # fadein first chunk
-                    fadein_first_part = await fadein_pcm_part(
-                        chunk, fade_in, pcm_fmt, sample_rate
-                    )
-                    yield fadein_first_part
-                    bytes_written += len(fadein_first_part)
-                    del chunk
-                    del fadein_first_part
-                elif cur_chunk <= 2 and not last_fadeout_data:
-                    # no fadeout_part available so just pass it to the output directly
-                    yield chunk
-                    bytes_written += len(chunk)
-                    del chunk
-                # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
-                elif cur_chunk == 2 and last_fadeout_data:
-                    # combine the first 2 chunks and strip off silence
-                    first_part = await strip_silence(
-                        prev_chunk + chunk, pcm_fmt, sample_rate
-                    )
-                    if len(first_part) < buffer_size:
-                        # part is too short after the strip action?!
-                        # so we just use the full first part
-                        first_part = prev_chunk + chunk
-                    fade_in_part = first_part[:buffer_size]
-                    remaining_bytes = first_part[buffer_size:]
-                    del first_part
-                    # do crossfade
-                    crossfade_part = await crossfade_pcm_parts(
-                        fade_in_part,
-                        last_fadeout_data,
-                        queue.settings.crossfade_duration,
-                        pcm_fmt,
-                        sample_rate,
-                    )
-                    # send crossfade_part
-                    yield crossfade_part
-                    bytes_written += len(crossfade_part)
-                    del crossfade_part
-                    del fade_in_part
-                    last_fadeout_data = b""
-                    # also write the leftover bytes from the strip action
-                    yield remaining_bytes
-                    bytes_written += len(remaining_bytes)
-                    del remaining_bytes
-                    del chunk
-                    prev_chunk = None  # needed to prevent this chunk being sent again
-                # HANDLE LAST PART OF TRACK
-                elif prev_chunk and is_last_chunk:
-                    # last chunk received so create the last_part
-                    # with the previous chunk and this chunk
-                    # and strip off silence
-                    last_part = await strip_silence(
-                        prev_chunk + chunk, pcm_fmt, sample_rate, True
-                    )
-                    if len(last_part) < buffer_size:
-                        # part is too short after the strip action
-                        # so we just use the entire original data
-                        last_part = prev_chunk + chunk
-                    if not use_crossfade or len(last_part) < buffer_size:
-                        # crossfading is not enabled or not enough data,
-                        # so just pass the (stripped) audio data
-                        if use_crossfade:
-                            self.logger.warning(
-                                "Not enough data for crossfade: %s", len(last_part)
-                            )
-                        yield last_part
-                        bytes_written += len(last_part)
-                        del last_part
-                        del chunk
-                    else:
-                        # handle crossfading support
-                        # store fade section to be picked up for next track
-                        last_fadeout_data = last_part[-buffer_size:]
-                        remaining_bytes = last_part[:-buffer_size]
-                        # write remaining bytes
-                        if remaining_bytes:
-                            yield remaining_bytes
-                            bytes_written += len(remaining_bytes)
-                        del last_part
-                        del remaining_bytes
-                        del chunk
-                # MIDDLE PARTS OF TRACK
-                else:
-                    # middle part of the track
-                    # keep previous chunk in memory so we have enough
-                    # samples to perform the crossfade
-                    if prev_chunk:
-                        yield prev_chunk
-                        bytes_written += len(prev_chunk)
-                        prev_chunk = chunk
-                    else:
-                        prev_chunk = chunk
-                    del chunk
-                # allow clients to only buffer max ~10 seconds ahead
-                queue_track.streamdetails.seconds_played = bytes_written / sample_size
-                seconds_buffered = (bytes_written_total + bytes_written) / sample_size
-                seconds_needed = queue.player.elapsed_time + 10
-                diff = seconds_buffered - seconds_needed
-                track_time = queue_track.duration or 0
-                if track_time > 10 and diff > 1:
-                    await asyncio.sleep(diff)
-            # end of the track reached
-            bytes_written_total += bytes_written
-            self.logger.debug(
-                "Finished Streaming queue track: %s (%s) on queue %s",
-                queue_track.uri,
-                queue_track.name,
-                queue.player.name,
-            )
-        # end of queue reached, pass last fadeout bits to final output
-        yield last_fadeout_data
-        # END OF QUEUE STREAM
-        self.logger.info("Queue stream for Queue %s finished.", queue.player.name)
diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py
new file mode 100644 (file)
index 0000000..96fad46
--- /dev/null
@@ -0,0 +1,620 @@
+"""Controller to stream audio to players."""
+from __future__ import annotations
+
+import asyncio
+import gc
+import urllib.parse
+from types import CoroutineType
+from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional
+from uuid import uuid4
+
+from aiohttp import web
+
+from music_assistant.helpers.audio import (
+    check_audio_support,
+    create_wave_header,
+    crossfade_pcm_parts,
+    fadein_pcm_part,
+    get_chunksize,
+    get_ffmpeg_args_for_pcm_stream,
+    get_media_stream,
+    get_preview_stream,
+    get_stream_details,
+    strip_silence,
+)
+from music_assistant.helpers.process import AsyncProcess
+from music_assistant.models.enums import (
+    ContentType,
+    CrossFadeMode,
+    EventType,
+    MediaType,
+    ProviderType,
+)
+from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
+from music_assistant.models.event import MassEvent
+from music_assistant.models.player_queue import PlayerQueue
+from music_assistant.models.queue_item import QueueItem
+
+if TYPE_CHECKING:
+    from music_assistant.mass import MusicAssistant
+
+
+class StreamsController:
+    """Controller to stream audio to players."""
+
+    def __init__(self, mass: MusicAssistant):
+        """Initialize instance."""
+        self.mass = mass
+        self.logger = mass.logger.getChild("stream")
+        self._port = mass.config.stream_port
+        self._ip = mass.config.stream_ip
+        self.queue_streams: Dict[str, QueueStream] = {}
+
+    @property
+    def base_url(self) -> str:
+        """Return the base url for the stream engine."""
+        return f"http://{self._ip}:{self._port}"
+
+    def get_stream_url(
+        self,
+        stream_id: str,
+        content_type: ContentType = ContentType.FLAC,
+    ) -> str:
+        """Generate unique stream url for the PlayerQueue Stream."""
+        ext = content_type.value
+        return f"{self.base_url}/{stream_id}.{ext}"
+
+    async def get_preview_url(self, provider: ProviderType, track_id: str) -> str:
+        """Return url to short preview sample."""
+        track = await self.mass.music.tracks.get_provider_item(track_id, provider)
+        if preview := track.metadata.preview:
+            return preview
+        enc_track_id = urllib.parse.quote(track_id)
+        return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}"
+
+    def get_silence_url(self, duration: int = 600) -> str:
+        """Return url to silence."""
+        return f"{self.base_url}/silence?duration={duration}"
+
+    async def setup(self) -> None:
+        """Async initialize of module."""
+        app = web.Application()
+
+        app.router.add_get("/preview", self.serve_preview)
+        app.router.add_get("/silence", self.serve_silence)
+        app.router.add_get("/{stream_id}.{format}", self.serve_queue_stream)
+
+        runner = web.AppRunner(app, access_log=None)
+        await runner.setup()
+        # set host to None to bind to all addresses on both IPv4 and IPv6
+        http_site = web.TCPSite(runner, host=None, port=self._port)
+        await http_site.start()
+
+        async def on_shutdown_event(*event: MassEvent):
+            """Handle shutdown event."""
+            await http_site.stop()
+            await runner.cleanup()
+            await app.shutdown()
+            await app.cleanup()
+            self.logger.info("Streamserver exited.")
+
+        self.mass.subscribe(on_shutdown_event, EventType.SHUTDOWN)
+
+        ffmpeg_present, libsoxr_support = await check_audio_support(True)
+        if not ffmpeg_present:
+            self.logger.error(
+                "FFmpeg binary not found on your system, " "playback will NOT work!."
+            )
+        elif not libsoxr_support:
+            self.logger.warning(
+                "FFmpeg version found without libsoxr support, "
+                "highest quality audio not available. "
+            )
+
+        self.logger.info("Started stream server on port %s", self._port)
+
+    @staticmethod
+    async def serve_silence(request: web.Request):
+        """Serve silence."""
+        resp = web.StreamResponse(
+            status=200, reason="OK", headers={"Content-Type": "audio/wav"}
+        )
+        await resp.prepare(request)
+        duration = int(request.query.get("duration", 600))
+        await resp.write(create_wave_header(duration=duration))
+        for _ in range(0, duration):
+            await resp.write(b"\0" * 1764000)
+        return resp
+
+    async def serve_preview(self, request: web.Request):
+        """Serve short preview sample."""
+        provider_id = request.query["provider_id"]
+        item_id = urllib.parse.unquote(request.query["item_id"])
+        resp = web.StreamResponse(
+            status=200, reason="OK", headers={"Content-Type": "audio/mp3"}
+        )
+        await resp.prepare(request)
+        async for chunk in get_preview_stream(self.mass, provider_id, item_id):
+            await resp.write(chunk)
+        return resp
+
+    async def serve_queue_stream(self, request: web.Request):
+        """Serve queue audio stream to a single player."""
+        self.logger.info(request)
+        self.logger.info(request.headers)
+
+        stream_id = request.match_info["stream_id"]
+        queue_stream = self.queue_streams.get(stream_id)
+
+        if queue_stream is None:
+            self.logger.warning("Got stream request for unknown id: %s", stream_id)
+            return web.Response(status=404)
+
+        # prepare request, add some DLNA/UPNP compatible headers
+        headers = {
+            "Content-Type": f"audio/{queue_stream.output_format.value}",
+            "transferMode.dlna.org": "Streaming",
+            "Connection": "Close",
+            "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000",
+        }
+        resp = web.StreamResponse(headers=headers)
+        await resp.prepare(request)
+        client_id = request.remote
+        await queue_stream.subscribe(client_id, resp.write)
+
+        return resp
+
+    async def start_queue_stream(
+        self,
+        queue: PlayerQueue,
+        expected_clients: int,
+        start_index: int,
+        seek_position: int,
+        fade_in: bool,
+        output_format: ContentType,
+    ) -> QueueStream:
+        """Start running a queue stream."""
+        # cleanup stale previous queue tasks
+
+        # generate unique stream url
+        stream_id = uuid4().hex
+        # determine the pcm details based on the first track we need to stream
+        try:
+            first_item = queue.items[start_index]
+        except (IndexError, TypeError) as err:
+            raise QueueEmpty() from err
+
+        streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id)
+
+        # work out pcm details
+        if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
+            pcm_sample_rate = min(96000, queue.max_sample_rate)
+            pcm_bit_depth = 24
+            pcm_channels = 2
+            pcm_resample = True
+        elif streamdetails.sample_rate > queue.max_sample_rate:
+            pcm_sample_rate = queue.max_sample_rate
+            pcm_bit_depth = streamdetails.bit_depth
+            pcm_channels = streamdetails.channels
+            pcm_resample = True
+        else:
+            pcm_sample_rate = streamdetails.sample_rate
+            pcm_bit_depth = streamdetails.bit_depth
+            pcm_channels = streamdetails.channels
+            pcm_resample = False
+
+        self.queue_streams[stream_id] = stream = QueueStream(
+            queue=queue,
+            stream_id=stream_id,
+            expected_clients=expected_clients,
+            start_index=start_index,
+            seek_position=seek_position,
+            fade_in=fade_in,
+            output_format=output_format,
+            pcm_sample_rate=pcm_sample_rate,
+            pcm_bit_depth=pcm_bit_depth,
+            pcm_channels=pcm_channels,
+            pcm_resample=pcm_resample,
+            autostart=True,
+        )
+        self.mass.create_task(self.cleanup_stale)
+        return stream
+
+    def cleanup_stale(self) -> None:
+        """Cleanup stale/done stream tasks."""
+        stale = set()
+        for stream_id, stream in self.queue_streams.items():
+            if stream.done.is_set() and not stream.connected_clients:
+                stale.add(stream_id)
+        for stream_id in stale:
+            self.queue_streams.pop(stream_id, None)
+
+
+class QueueStream:
+    """Representation of a (multisubscriber) Audio Queue stream."""
+
+    def __init__(
+        self,
+        queue: PlayerQueue,
+        stream_id: str,
+        expected_clients: int,
+        start_index: int,
+        seek_position: int,
+        fade_in: bool,
+        output_format: ContentType,
+        pcm_sample_rate: int,
+        pcm_bit_depth: int,
+        pcm_channels: int = 2,
+        pcm_floating_point: bool = False,
+        pcm_resample: bool = False,
+        autostart: bool = False,
+    ):
+        """Init QueueStreamJob instance."""
+        self.queue = queue
+        self.stream_id = stream_id
+        self.expected_clients = expected_clients
+        self.start_index = start_index
+        self.seek_position = seek_position
+        self.fade_in = fade_in
+        self.output_format = output_format
+        self.pcm_sample_rate = pcm_sample_rate
+        self.pcm_bit_depth = pcm_bit_depth
+        self.pcm_channels = pcm_channels
+        self.pcm_floating_point = pcm_floating_point
+        self.pcm_resample = pcm_resample
+        self.url = queue.mass.streams.get_stream_url(stream_id, output_format)
+
+        self.mass = queue.mass
+        self.logger = self.queue.logger.getChild("stream")
+        self.expected_clients = expected_clients
+        self.connected_clients: Dict[str, CoroutineType[bytes]] = {}
+        self._runner_task: Optional[asyncio.Task] = None
+        self.done = asyncio.Event()
+        self.all_clients_connected = asyncio.Event()
+        self.index_in_buffer = start_index
+        self.signal_next: bool = False
+        if autostart:
+            self.mass.create_task(self.start())
+
+    async def start(self) -> None:
+        """Start running queue stream."""
+        self._runner_task = self.mass.create_task(self._queue_stream_runner())
+
+    async def stop(self) -> None:
+        """Stop running queue stream and cleanup."""
+        self.done.set()
+        if self._runner_task and not self._runner_task.done():
+            self._runner_task.cancel()
+            # allow some time to cleanup
+            await asyncio.sleep(2)
+
+        self._runner_task = None
+        self.connected_clients = {}
+
+        # run garbage collection manually due to the high number of
+        # processed bytes blocks
+        loop = asyncio.get_running_loop()
+        await loop.run_in_executor(None, gc.collect)
+        self.logger.debug("Stream job %s cleaned up", self.stream_id)
+
+    async def subscribe(self, client_id: str, callback: CoroutineType[bytes]) -> None:
+        """Subscribe callback and wait for completion."""
+        assert client_id not in self.connected_clients, "Client is already connected"
+        assert not self.done.is_set(), "Stream task is already finished"
+        self.connected_clients[client_id] = callback
+        self.logger.debug("client connected: %s", client_id)
+        if len(self.connected_clients) == self.expected_clients:
+            self.all_clients_connected.set()
+        try:
+            await self.done.wait()
+        finally:
+            self.connected_clients.pop(client_id, None)
+            self.logger.debug("client disconnected: %s", client_id)
+            if len(self.connected_clients) == 0:
+                # no more clients, perform cleanup
+                await self.stop()
+
+    async def _queue_stream_runner(self) -> None:
+        """Distribute audio chunks over connected client queues."""
+        ffmpeg_args = await get_ffmpeg_args_for_pcm_stream(
+            self.pcm_sample_rate,
+            self.pcm_bit_depth,
+            self.pcm_channels,
+            output_format=self.output_format,
+        )
+        # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
+        # send the compressed/encoded stream to the client(s).
+        chunk_size = get_chunksize(self.output_format)
+        async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc:
+
+            async def writer():
+                """Task that sends the raw pcm audio to the ffmpeg process."""
+                async for audio_chunk in self._get_queue_stream():
+                    await ffmpeg_proc.write(audio_chunk)
+                    del audio_chunk
+                # write eof when last packet is received
+                ffmpeg_proc.write_eof()
+
+            ffmpeg_proc.attach_task(writer())
+
+            # wait max 5 seconds for all client(s) to connect
+            try:
+                await asyncio.wait_for(self.all_clients_connected.wait(), 5)
+            except asyncio.exceptions.TimeoutError:
+                self.logger.warning(
+                    "Abort: client(s) did not connect within 5 seconds."
+                )
+                self.done.set()
+                return
+            self.logger.debug("%s clients connected", len(self.connected_clients))
+
+            # Read bytes from final output and send chunk to child callback.
+            async for chunk in ffmpeg_proc.iterate_chunks():
+                if len(self.connected_clients) == 0:
+                    # no more clients
+                    self.done.set()
+                    self.logger.debug("Abort: all clients diconnected.")
+                    return
+                for client_id in set(self.connected_clients.keys()):
+                    try:
+                        callback = self.connected_clients[client_id]
+                        await callback(chunk)
+                    except (
+                        ConnectionResetError,
+                        KeyError,
+                        BrokenPipeError,
+                    ):
+                        self.connected_clients.pop(client_id, None)
+
+                del chunk
+
+            # complete queue streamed
+            if self.signal_next:
+                # the queue stream was aborted (e.g. because of sample rate mismatch)
+                # tell the queue to load the next track (restart stream) as soon
+                # as the player finished playing and returns to idle
+                self.queue.signal_next = True
+
+        # all queue data has been streamed. Either because the queue is exhausted
+        # or we need to restart the stream due to decoder/sample rate mismatch
+        # set event that this stream task is finished
+        # if the stream is restarted by the queue manager afterwards is controlled
+        # by the `signal_next` bool above.
+        self.done.set()
+
+    async def _get_queue_stream(
+        self,
+    ) -> AsyncGenerator[None, bytes]:
+        """Stream the PlayerQueue's tracks as constant feed of PCM raw audio."""
+        last_fadeout_data = b""
+        queue_index = None
+        track_count = 0
+        prev_track: Optional[QueueItem] = None
+
+        pcm_fmt = ContentType.from_bit_depth(self.pcm_bit_depth)
+        self.logger.debug(
+            "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)",
+            self.queue.player.name,
+            pcm_fmt.value,
+            self.pcm_sample_rate,
+        )
+
+        # stream queue tracks one by one
+        while True:
+            # get the (next) track in queue
+            track_count += 1
+            if track_count == 1:
+                queue_index = self.start_index
+                seek_position = self.seek_position
+                fade_in = self.fade_in
+            else:
+                queue_index = self.queue.get_next_index(queue_index)
+                seek_position = 0
+                fade_in = False
+            self.index_in_buffer = queue_index
+            # send signal that we've loaded a new track into the buffer
+            self.queue.signal_update()
+            queue_track = self.queue.get_item(queue_index)
+            if not queue_track:
+                self.logger.debug(
+                    "Abort Queue stream %s: no (more) tracks in queue",
+                    self.queue.queue_id,
+                )
+                break
+            # get streamdetails
+            try:
+                streamdetails = await get_stream_details(
+                    self.mass, queue_track, self.queue.queue_id
+                )
+            except MediaNotFoundError as err:
+                self.logger.warning(
+                    "Skip track %s due to missing streamdetails",
+                    queue_track.name,
+                    exc_info=err,
+                )
+                continue
+
+            if queue_track.name == "alert":
+                self.pcm_resample = True
+
+            # check the PCM samplerate/bitrate
+            if not self.pcm_resample and streamdetails.bit_depth > self.pcm_bit_depth:
+                self.signal_next = True
+                self.logger.debug(
+                    "Abort queue stream %s due to bit depth mismatch",
+                    self.queue.player.name,
+                )
+                break
+            if (
+                not self.pcm_resample
+                and streamdetails.sample_rate > self.pcm_sample_rate
+                and streamdetails.sample_rate <= self.queue.max_sample_rate
+            ):
+                self.logger.debug(
+                    "Abort queue stream %s due to sample rate mismatch",
+                    self.queue.player.name,
+                )
+                self.signal_next = True
+                break
+
+            # check crossfade ability
+            use_crossfade = self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED
+            if (
+                prev_track is not None
+                and prev_track.media_type == MediaType.TRACK
+                and queue_track.media_type == MediaType.TRACK
+            ):
+                prev_item = await self.mass.music.get_item_by_uri(prev_track.uri)
+                new_item = await self.mass.music.get_item_by_uri(queue_track.uri)
+                if (
+                    prev_item.album is not None
+                    and new_item.album is not None
+                    and prev_item.album == new_item.album
+                ):
+                    use_crossfade = False
+            prev_track = queue_track
+
+            sample_size = int(
+                self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels
+            )  # 1 second
+            buffer_size = sample_size * (self.queue.settings.crossfade_duration or 2)
+            # force small buffer for radio to prevent too much lag at start
+            if queue_track.media_type != MediaType.TRACK:
+                use_crossfade = False
+                buffer_size = sample_size
+
+            self.logger.info(
+                "Start Streaming queue track: %s (%s) for queue %s",
+                queue_track.uri,
+                queue_track.name,
+                self.queue.player.name,
+            )
+            queue_track.streamdetails.seconds_skipped = seek_position
+            fade_in_part = b""
+            cur_chunk = 0
+            prev_chunk = None
+            bytes_written = 0
+            # handle incoming audio chunks
+            async for chunk in get_media_stream(
+                self.mass,
+                streamdetails,
+                pcm_fmt,
+                pcm_sample_rate=self.pcm_sample_rate,
+                chunk_size=buffer_size,
+                seek_position=seek_position,
+            ):
+                cur_chunk += 1
+                is_last_chunk = len(chunk) < buffer_size
+
+                # HANDLE FIRST PART OF TRACK
+                if len(chunk) == 0 and bytes_written == 0 and is_last_chunk:
+                    # stream error: got empy first chunk ?!
+                    self.logger.warning("Stream error on %s", queue_track.uri)
+                elif cur_chunk == 1 and last_fadeout_data:
+                    prev_chunk = chunk
+                    del chunk
+                elif cur_chunk == 1 and fade_in:
+                    # fadein first chunk
+                    fadein_first_part = await fadein_pcm_part(
+                        chunk, fade_in, pcm_fmt, self.pcm_sample_rate
+                    )
+                    yield fadein_first_part
+                    bytes_written += len(fadein_first_part)
+                    del chunk
+                    del fadein_first_part
+                elif cur_chunk <= 2 and not last_fadeout_data:
+                    # no fadeout_part available so just pass it to the output directly
+                    yield chunk
+                    bytes_written += len(chunk)
+                    del chunk
+                # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
+                elif cur_chunk == 2 and last_fadeout_data:
+                    # combine the first 2 chunks and strip off silence
+                    first_part = await strip_silence(
+                        prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate
+                    )
+                    if len(first_part) < buffer_size:
+                        # part is too short after the strip action?!
+                        # so we just use the full first part
+                        first_part = prev_chunk + chunk
+                    fade_in_part = first_part[:buffer_size]
+                    remaining_bytes = first_part[buffer_size:]
+                    del first_part
+                    # do crossfade
+                    crossfade_part = await crossfade_pcm_parts(
+                        fade_in_part,
+                        last_fadeout_data,
+                        self.queue.settings.crossfade_duration,
+                        pcm_fmt,
+                        self.pcm_sample_rate,
+                    )
+                    # send crossfade_part
+                    yield crossfade_part
+                    bytes_written += len(crossfade_part)
+                    del crossfade_part
+                    del fade_in_part
+                    last_fadeout_data = b""
+                    # also write the leftover bytes from the strip action
+                    yield remaining_bytes
+                    bytes_written += len(remaining_bytes)
+                    del remaining_bytes
+                    del chunk
+                    prev_chunk = None  # needed to prevent this chunk being sent again
+                # HANDLE LAST PART OF TRACK
+                elif prev_chunk and is_last_chunk:
+                    # last chunk received so create the last_part
+                    # with the previous chunk and this chunk
+                    # and strip off silence
+                    last_part = await strip_silence(
+                        prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, True
+                    )
+                    if len(last_part) < buffer_size:
+                        # part is too short after the strip action
+                        # so we just use the entire original data
+                        last_part = prev_chunk + chunk
+                    if not use_crossfade or len(last_part) < buffer_size:
+                        if use_crossfade:
+                            self.logger.debug("not enough data for crossfade")
+                        # crossfading is not enabled or not enough data,
+                        # so just pass the (stripped) audio data
+                        yield last_part
+                        bytes_written += len(last_part)
+                        del last_part
+                        del chunk
+                    else:
+                        # handle crossfading support
+                        # store fade section to be picked up for next track
+                        last_fadeout_data = last_part[-buffer_size:]
+                        remaining_bytes = last_part[:-buffer_size]
+                        # write remaining bytes
+                        if remaining_bytes:
+                            yield remaining_bytes
+                            bytes_written += len(remaining_bytes)
+                        del last_part
+                        del remaining_bytes
+                        del chunk
+                # MIDDLE PARTS OF TRACK
+                else:
+                    # middle part of the track
+                    # keep previous chunk in memory so we have enough
+                    # samples to perform the crossfade
+                    if prev_chunk:
+                        yield prev_chunk
+                        bytes_written += len(prev_chunk)
+                        prev_chunk = chunk
+                    else:
+                        prev_chunk = chunk
+                    del chunk
+            # end of the track reached
+            queue_track.streamdetails.seconds_streamed = bytes_written / sample_size
+            self.logger.debug(
+                "Finished Streaming queue track: %s (%s) on queue %s",
+                queue_track.uri,
+                queue_track.name,
+                self.queue.player.name,
+            )
+        # end of queue reached, pass last fadeout bits to final output
+        yield last_fadeout_data
+        del last_fadeout_data
+        # END OF QUEUE STREAM
+        self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name)
index d12d5afd9f3ea57df6287bf0c6a2e8ec3968eed3..41adbeb51e7dcc92c038b35cd61e8503b0488220 100644 (file)
@@ -3,29 +3,30 @@ from __future__ import annotations
 
 import asyncio
 import logging
+import os
+import re
 import struct
 from io import BytesIO
+from time import time
 from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple
 
 import aiofiles
 
 from music_assistant.helpers.process import AsyncProcess, check_output
 from music_assistant.helpers.util import create_tempfile
-from music_assistant.models.enums import EventType, ProviderType
-from music_assistant.models.errors import AudioError, MediaNotFoundError
-from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import (
-    ContentType,
-    MediaType,
-    StreamDetails,
-    StreamType,
+from music_assistant.models.enums import ProviderType
+from music_assistant.models.errors import (
+    AudioError,
+    MediaNotFoundError,
+    MusicAssistantError,
 )
+from music_assistant.models.media_items import ContentType, MediaType, StreamDetails
 
 if TYPE_CHECKING:
     from music_assistant.mass import MusicAssistant
     from music_assistant.models.player_queue import QueueItem
 
-LOGGER = logging.getLogger("audio")
+LOGGER = logging.getLogger(__name__)
 
 # pylint:disable=consider-using-f-string
 
@@ -37,60 +38,36 @@ async def crossfade_pcm_parts(
     fmt: ContentType,
     sample_rate: int,
 ) -> bytes:
-    """Crossfade two chunks of pcm/raw audio using sox."""
-    _, ffmpeg_present = await check_audio_support()
-
-    # prefer ffmpeg implementation (due to simplicity)
-    if ffmpeg_present:
-        fadeoutfile = create_tempfile()
-        async with aiofiles.open(fadeoutfile.name, "wb") as outfile:
-            await outfile.write(fade_out_part)
-        # input args
-        args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
-        args += [
-            "-f",
-            fmt.value,
-            "-ac",
-            "2",
-            "-ar",
-            str(sample_rate),
-            "-i",
-            fadeoutfile.name,
-        ]
-        args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
-        # filter args
-        args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"]
-        # output args
-        args += ["-f", fmt.value, "-"]
-        async with AsyncProcess(args, True) as proc:
-            crossfade_data, _ = await proc.communicate(fade_in_part)
-            return crossfade_data
-
-    # sox based implementation
-    sox_args = [fmt.sox_format(), "-c", "2", "-r", str(sample_rate)]
-    # create fade-in part
-    fadeinfile = create_tempfile()
-    args = ["sox", "--ignore-length", "-t"] + sox_args
-    args += ["-", "-t"] + sox_args + [fadeinfile.name, "fade", "t", str(fade_length)]
-    async with AsyncProcess(args, enable_write=True) as sox_proc:
-        await sox_proc.communicate(fade_in_part)
-    # create fade-out part
+    """Crossfade two chunks of pcm/raw audio using ffmpeg."""
     fadeoutfile = create_tempfile()
-    args = ["sox", "--ignore-length", "-t"] + sox_args + ["-", "-t"] + sox_args
-    args += [fadeoutfile.name, "reverse", "fade", "t", str(fade_length), "reverse"]
-    async with AsyncProcess(args, enable_write=True) as sox_proc:
-        await sox_proc.communicate(fade_out_part)
-    # create crossfade using sox and some temp files
-    # TODO: figure out how to make this less complex and without the tempfiles
-    args = ["sox", "-m", "-v", "1.0", "-t"] + sox_args + [fadeoutfile.name, "-v", "1.0"]
-    args += ["-t"] + sox_args + [fadeinfile.name, "-t"] + sox_args + ["-"]
-    async with AsyncProcess(args, enable_write=False) as sox_proc:
-        crossfade_part, _ = await sox_proc.communicate()
-    fadeinfile.close()
-    fadeoutfile.close()
-    del fadeinfile
-    del fadeoutfile
-    return crossfade_part
+    async with aiofiles.open(fadeoutfile.name, "wb") as outfile:
+        await outfile.write(fade_out_part)
+    # input args
+    args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+    args += [
+        "-f",
+        fmt.value,
+        "-ac",
+        "2",
+        "-ar",
+        str(sample_rate),
+        "-i",
+        fadeoutfile.name,
+    ]
+    args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
+    # filter args
+    args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"]
+    # output args
+    args += ["-f", fmt.value, "-"]
+    async with AsyncProcess(args, True) as proc:
+        crossfade_data, _ = await proc.communicate(fade_in_part)
+        LOGGER.debug(
+            "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s",
+            len(fade_in_part),
+            len(fade_out_part),
+            len(crossfade_data),
+        )
+        return crossfade_data
 
 
 async def fadein_pcm_part(
@@ -125,34 +102,24 @@ async def strip_silence(
     audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False
 ) -> bytes:
     """Strip silence from (a chunk of) pcm audio."""
-    _, ffmpeg_present = await check_audio_support()
-    # prefer ffmpeg implementation
-    if ffmpeg_present:
-        # input args
-        args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
-        args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
-        # filter args
-        if reverse:
-            args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"]
-        else:
-            args += ["-af", "silenceremove=1:0:-50dB:detection=peak"]
-        # output args
-        args += ["-f", fmt.value, "-"]
-        async with AsyncProcess(args, True) as proc:
-            stripped_data, _ = await proc.communicate(audio_data)
-            return stripped_data
-
-    # sox implementation
-    sox_args = [fmt.sox_format(), "-c", "2", "-r", str(sample_rate)]
-    args = ["sox", "--ignore-length", "-t"] + sox_args + ["-", "-t"] + sox_args + ["-"]
-    if reverse:
-        args.append("reverse")
-    args += ["silence", "1", "0.1", "1%"]
+    # input args
+    args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+    args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
+    # filter args
     if reverse:
-        args.append("reverse")
-    async with AsyncProcess(args, enable_write=True) as sox_proc:
-        stripped_data, _ = await sox_proc.communicate(audio_data)
-    return stripped_data
+        args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"]
+    else:
+        args += ["-af", "silenceremove=1:0:-50dB:detection=peak"]
+    # output args
+    args += ["-f", fmt.value, "-"]
+    async with AsyncProcess(args, True) as proc:
+        stripped_data, _ = await proc.communicate(audio_data)
+        LOGGER.debug(
+            "stripped silence of pcm chunk. size before: %s - after: %s",
+            len(audio_data),
+            len(stripped_data),
+        )
+        return stripped_data
 
 
 async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> None:
@@ -162,57 +129,62 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N
         # only when needed we do the analyze job
         return
 
-    LOGGER.debug(
-        "Start analyzing track %s/%s",
-        streamdetails.provider.value,
-        streamdetails.item_id,
-    )
+    LOGGER.debug("Start analyzing track %s", streamdetails.uri)
     # calculate BS.1770 R128 integrated loudness with ffmpeg
-    if streamdetails.type == StreamType.EXECUTABLE:
-        proc_args = (
-            "%s | ffmpeg -i pipe: -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'"
-            % streamdetails.path
-        )
-    else:
-        proc_args = (
-            "ffmpeg -i '%s' -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'"
-            % streamdetails.path
-        )
-    audio_data = b""
-    if streamdetails.media_type == MediaType.RADIO:
-        proc_args = "ffmpeg -i pipe: -af ebur128=framelog=verbose -f null - 2>&1 | awk '/I:/{print $2}'"
-        # for radio we collect ~10 minutes of audio data to process
-        async with mass.http_session.get(streamdetails.path) as response:
-            async for chunk, _ in response.content.iter_chunks():
-                audio_data += chunk
-                if len(audio_data) >= 20000:
+    started = time()
+    proc_args = [
+        "ffmpeg",
+        "-f",
+        streamdetails.content_type.value,
+        "-i",
+        "-",
+        "-af",
+        "ebur128=framelog=verbose",
+        "-f",
+        "null",
+        "-",
+    ]
+    async with AsyncProcess(proc_args, True, use_stderr=True) as ffmpeg_proc:
+
+        async def writer():
+            """Task that grabs the source audio and feeds it to ffmpeg."""
+            music_prov = mass.music.get_provider(streamdetails.provider)
+            async for audio_chunk in music_prov.get_audio_stream(streamdetails):
+                await ffmpeg_proc.write(audio_chunk)
+                if (time() - started) > 300:
+                    # just in case of endless radio stream etc
                     break
+            ffmpeg_proc.write_eof()
 
-    proc = await asyncio.create_subprocess_shell(
-        proc_args,
-        stdout=asyncio.subprocess.PIPE,
-        stdin=asyncio.subprocess.PIPE if audio_data else None,
-    )
-    stdout, _ = await proc.communicate(audio_data or None)
-    try:
-        loudness = float(stdout.decode().strip())
-    except (ValueError, AttributeError):
-        LOGGER.warning(
-            "Could not determine integrated loudness of %s/%s - %s",
-            streamdetails.provider.value,
-            streamdetails.item_id,
-            stdout.decode() or "received empty value",
-        )
-    else:
-        await mass.music.set_track_loudness(
-            streamdetails.item_id, streamdetails.provider, loudness
-        )
-        LOGGER.debug(
-            "Integrated loudness of %s/%s is: %s",
-            streamdetails.provider.value,
-            streamdetails.item_id,
-            loudness,
-        )
+        writer_task = ffmpeg_proc.attach_task(writer())
+        # wait for the writer task to finish
+        await writer_task
+
+        _, stderr = await ffmpeg_proc.communicate()
+        try:
+            loudness_str = (
+                stderr.decode()
+                .split("Integrated loudness")[1]
+                .split("I:")[1]
+                .split("LUFS")[0]
+            )
+            loudness = float(loudness_str.strip())
+        except (ValueError, AttributeError):
+            LOGGER.warning(
+                "Could not determine integrated loudness of %s - %s",
+                streamdetails.uri,
+                stderr.decode() or "received empty value",
+            )
+        else:
+            streamdetails.loudness = loudness
+            await mass.music.set_track_loudness(
+                streamdetails.item_id, streamdetails.provider, loudness
+            )
+            LOGGER.debug(
+                "Integrated loudness of %s is: %s",
+                streamdetails.uri,
+                loudness,
+            )
 
 
 async def get_stream_details(
@@ -226,16 +198,17 @@ async def get_stream_details(
         param media_item: The MediaItem (track/radio) for which to request the streamdetails for.
         param queue_id: Optionally provide the queue_id which will play this stream.
     """
-    if not queue_item.media_item:
-        # special case: a plain url was added to the queue
-        streamdetails = StreamDetails(
-            type=StreamType.URL,
-            provider=ProviderType.URL,
-            item_id=queue_item.item_id,
-            path=queue_item.uri,
-            content_type=ContentType.try_parse(queue_item.uri),
-        )
+    if queue_item.streamdetails and (time() < queue_item.streamdetails.expires):
+        # we already have fresh streamdetails, use these
+        queue_item.streamdetails.seconds_skipped = 0
+        queue_item.streamdetails.seconds_streamed = 0
+        streamdetails = queue_item.streamdetails
+    elif queue_item.media_type == MediaType.URL:
+        # handle URL provider items
+        url_prov = mass.music.get_provider(ProviderType.URL)
+        streamdetails = await url_prov.get_stream_details(queue_item.uri)
     else:
+        # media item: fetch streamdetails from provider
         # always request the full db track as there might be other qualities available
         full_item = await mass.music.get_item_by_uri(queue_item.uri)
         # sort by quality and check track availability
@@ -248,29 +221,28 @@ async def get_stream_details(
             music_prov = mass.music.get_provider(prov_media.prov_id)
             if not music_prov or not music_prov.available:
                 continue  # provider temporary unavailable ?
-
-            streamdetails: StreamDetails = await music_prov.get_stream_details(
-                prov_media.item_id
-            )
-            if streamdetails:
-                try:
-                    streamdetails.content_type = ContentType(streamdetails.content_type)
-                except KeyError:
-                    LOGGER.warning("Invalid content type!")
-                else:
-                    break
+            try:
+                streamdetails: StreamDetails = await music_prov.get_stream_details(
+                    prov_media.item_id
+                )
+                streamdetails.content_type = ContentType(streamdetails.content_type)
+            except MusicAssistantError as err:
+                LOGGER.warning(str(err))
+            else:
+                break
 
     if not streamdetails:
         raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
 
-    # set player_id on the streamdetails so we know what players stream
+    # set queue_id on the streamdetails so we know what is being streamed
     streamdetails.queue_id = queue_id
     # get gain correct / replaygain
-    loudness, gain_correct = await get_gain_correct(
-        mass, queue_id, streamdetails.item_id, streamdetails.provider
-    )
-    streamdetails.gain_correct = gain_correct
-    streamdetails.loudness = loudness
+    if not streamdetails.gain_correct:
+        loudness, gain_correct = await get_gain_correct(mass, streamdetails)
+        streamdetails.gain_correct = gain_correct
+        streamdetails.loudness = loudness
+    if not streamdetails.duration:
+        streamdetails.duration = queue_item.duration
     # set streamdetails as attribute on the media_item
     # this way the app knows what content is playing
     queue_item.streamdetails = streamdetails
@@ -278,17 +250,23 @@ async def get_stream_details(
 
 
 async def get_gain_correct(
-    mass: MusicAssistant, queue_id: str, item_id: str, provider: ProviderType
+    mass: MusicAssistant, streamdetails: StreamDetails
 ) -> Tuple[float, float]:
     """Get gain correction for given queue / track combination."""
-    queue = mass.players.get_player_queue(queue_id)
+    queue = mass.players.get_player_queue(streamdetails.queue_id)
     if not queue or not queue.settings.volume_normalization_enabled:
         return (0, 0)
+    if streamdetails.gain_correct is not None:
+        return (streamdetails.loudness, streamdetails.gain_correct)
     target_gain = queue.settings.volume_normalization_target
-    track_loudness = await mass.music.get_track_loudness(item_id, provider)
+    track_loudness = await mass.music.get_track_loudness(
+        streamdetails.item_id, streamdetails.provider
+    )
     if track_loudness is None:
         # fallback to provider average
-        fallback_track_loudness = await mass.music.get_provider_loudness(provider)
+        fallback_track_loudness = await mass.music.get_provider_loudness(
+            streamdetails.provider
+        )
         if fallback_track_loudness is None:
             # fallback to some (hopefully sane) average value for now
             fallback_track_loudness = -8.5
@@ -353,218 +331,249 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=
     return file.getvalue()
 
 
-async def get_sox_args(
+async def get_ffmpeg_args(
     streamdetails: StreamDetails,
     output_format: Optional[ContentType] = None,
-    resample: Optional[int] = None,
-    seek_position: Optional[int] = None,
+    pcm_sample_rate: Optional[int] = None,
+    pcm_channels: int = 2,
 ) -> List[str]:
-    """Collect all args to send to the sox (or ffmpeg) process."""
-    stream_path = streamdetails.path
-    stream_type = StreamType(streamdetails.type)
+    """Collect all args to send to the ffmpeg process."""
     input_format = streamdetails.content_type
     if output_format is None:
         output_format = input_format
 
-    sox_present, ffmpeg_present = await check_audio_support()
-    use_ffmpeg = not sox_present or not input_format.sox_supported() or seek_position
+    ffmpeg_present, libsoxr_support = await check_audio_support()
 
-    # use ffmpeg if content not supported by SoX (e.g. AAC radio streams)
-    if use_ffmpeg:
-        if not ffmpeg_present:
-            raise AudioError(
-                "FFmpeg binary is missing from system."
-                "Please install ffmpeg on your OS to enable playback.",
-            )
-        # collect input args
-        if stream_type == StreamType.EXECUTABLE:
-            # stream from executable
-            input_args = [
-                stream_path,
-                "|",
-                "ffmpeg",
-                "-hide_banner",
-                "-loglevel",
-                "error",
-                "-f",
-                input_format.value,
-                "-i",
-                "-",
-            ]
-        else:
-            input_args = [
-                "ffmpeg",
-                "-hide_banner",
-                "-loglevel",
-                "error",
-                "-i",
-                stream_path,
-            ]
-        if seek_position:
-            input_args += ["-ss", str(seek_position)]
-        # collect output args
-        if output_format.is_pcm():
-            output_args = [
-                "-f",
-                output_format.value,
-                "-c:a",
-                output_format.name.lower(),
-                "-",
-            ]
-        else:
-            output_args = ["-f", output_format.value, "-"]
-        # collect filter args
-        filter_args = []
-        if streamdetails.gain_correct:
-            filter_args += ["-filter:a", f"volume={streamdetails.gain_correct}dB"]
-        if resample or input_format.is_pcm():
-            filter_args += ["-ar", str(resample)]
-        return input_args + filter_args + output_args
-
-    # Prefer SoX for all other (=highest quality)
-    if stream_type == StreamType.EXECUTABLE:
-        # stream from executable
-        input_args = [
-            stream_path,
-            "|",
-            "sox",
-            "-t",
-            input_format.sox_format(),
-        ]
-        if input_format.is_pcm():
-            input_args += [
-                "-r",
-                str(streamdetails.sample_rate),
-                "-c",
-                str(streamdetails.channels),
-            ]
-        input_args.append("-")
-    else:
-        input_args = ["sox", "-t", input_format.sox_format(), stream_path]
+    if not ffmpeg_present:
+        raise AudioError(
+            "FFmpeg binary is missing from system."
+            "Please install ffmpeg on your OS to enable playback.",
+        )
+    # collect input args
+    input_args = [
+        "ffmpeg",
+        "-hide_banner",
+        "-loglevel",
+        "error",
+        "-ignore_unknown",
+    ]
+    if streamdetails.content_type != ContentType.UNKNOWN:
+        input_args += ["-f", input_format.value]
+    input_args += ["-i", "-"]
     # collect output args
     if output_format.is_pcm():
-        output_args = ["-t", output_format.sox_format(), "-c", "2", "-"]
-    elif output_format == ContentType.FLAC:
-        output_args = ["-t", "flac", "-C", "0", "-"]
+        output_args = [
+            "-acodec",
+            output_format.name.lower(),
+            "-f",
+            output_format.value,
+            "-ac",
+            str(pcm_channels),
+            "-ar",
+            str(pcm_sample_rate),
+            "-",
+        ]
     else:
-        output_args = ["-t", output_format.sox_format(), "-"]
-    # collect filter args
-    filter_args = []
+        output_args = ["-f", output_format.value, "-"]
+    # collect extra and filter args
+    extra_args = []
+    filter_params = []
     if streamdetails.gain_correct:
-        filter_args += ["vol", str(streamdetails.gain_correct), "dB"]
-    if resample and streamdetails.media_type != MediaType.RADIO:
-        # use extra high quality resampler only if it makes sense
-        filter_args += ["rate", "-v", str(resample)]
-    elif resample:
-        filter_args += ["rate", str(resample)]
-    return input_args + output_args + filter_args
+        filter_params.append(f"volume={streamdetails.gain_correct}dB")
+    if (
+        pcm_sample_rate is not None
+        and streamdetails.sample_rate != pcm_sample_rate
+        and libsoxr_support
+        and streamdetails.media_type == MediaType.TRACK
+    ):
+        # prefer libsoxr high quality resampler (if present) for sample rate conversions
+        filter_params.append("aresample=resampler=soxr")
+    if filter_params:
+        extra_args += ["-af", ",".join(filter_params)]
+
+    if pcm_sample_rate is not None and not output_format.is_pcm():
+        extra_args += ["-ar", str(pcm_sample_rate)]
+
+    return input_args + extra_args + output_args
 
 
 async def get_media_stream(
     mass: MusicAssistant,
     streamdetails: StreamDetails,
     output_format: Optional[ContentType] = None,
-    resample: Optional[int] = None,
+    pcm_sample_rate: Optional[int] = None,
     chunk_size: Optional[int] = None,
-    seek_position: Optional[int] = None,
-) -> AsyncGenerator[Tuple[bool, bytes], None]:
+    seek_position: int = 0,
+) -> AsyncGenerator[bytes, None]:
     """Get the audio stream for the given streamdetails."""
 
-    if chunk_size is None:
-        if streamdetails.content_type in (
-            ContentType.AAC,
-            ContentType.M4A,
-            ContentType.MP3,
-            ContentType.OGG,
-        ):
-            chunk_size = 32000
-        else:
-            chunk_size = 256000
-
-    mass.signal_event(
-        MassEvent(
-            EventType.STREAM_STARTED,
-            object_id=streamdetails.provider.value,
-            data=streamdetails,
-        )
-    )
-    args = await get_sox_args(streamdetails, output_format, resample, seek_position)
-    async with AsyncProcess(args) as sox_proc:
+    args = await get_ffmpeg_args(streamdetails, output_format, pcm_sample_rate)
+    async with AsyncProcess(
+        args, enable_write=True, chunk_size=chunk_size
+    ) as ffmpeg_proc:
 
         LOGGER.debug(
-            "start media stream for: %s/%s (%s)",
-            streamdetails.provider,
-            streamdetails.item_id,
-            streamdetails.type,
+            "start media stream for: %s, using args: %s", streamdetails.uri, str(args)
         )
 
+        async def writer():
+            """Task that grabs the source audio and feeds it to ffmpeg."""
+            LOGGER.debug("writer started for %s", streamdetails.uri)
+            music_prov = mass.music.get_provider(streamdetails.provider)
+            async for audio_chunk in music_prov.get_audio_stream(
+                streamdetails, seek_position
+            ):
+                await ffmpeg_proc.write(audio_chunk)
+            # write eof when last packet is received
+            ffmpeg_proc.write_eof()
+            LOGGER.debug("writer finished for %s", streamdetails.uri)
+
+        ffmpeg_proc.attach_task(writer())
+
         # yield chunks from stdout
-        # we keep 1 chunk behind to detect end of stream properly
         try:
-            prev_chunk = b""
-            async for chunk in sox_proc.iterate_chunks(chunk_size):
-                if prev_chunk:
-                    yield (False, prev_chunk)
-                prev_chunk = chunk
-            # send last chunk
-            yield (True, prev_chunk)
+            async for chunk in ffmpeg_proc.iterate_chunks():
+                yield chunk
         except (asyncio.CancelledError, GeneratorExit) as err:
-            LOGGER.debug(
-                "media stream aborted for: %s/%s",
-                streamdetails.provider,
-                streamdetails.item_id,
-            )
+            LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
             raise err
         else:
-            LOGGER.debug(
-                "finished media stream for: %s/%s",
-                streamdetails.provider,
-                streamdetails.item_id,
-            )
+            LOGGER.debug("finished media stream for: %s", streamdetails.uri)
             await mass.music.mark_item_played(
                 streamdetails.item_id, streamdetails.provider
             )
+        finally:
             # send analyze job to background worker
-            if (
-                streamdetails.loudness is None
-                and streamdetails.provider != ProviderType.URL
-            ):
-                uri = f"{streamdetails.provider.value}://{streamdetails.media_type.value}/{streamdetails.item_id}"
+            if streamdetails.loudness is None:
                 mass.add_job(
-                    analyze_audio(mass, streamdetails), f"Analyze audio for {uri}"
-                )
-        finally:
-            mass.signal_event(
-                MassEvent(
-                    EventType.STREAM_ENDED,
-                    object_id=streamdetails.provider.value,
-                    data=streamdetails,
+                    analyze_audio(mass, streamdetails),
+                    f"Analyze audio for {streamdetails.uri}",
                 )
+
+
+async def get_radio_stream(
+    mass: MusicAssistant, url: str, streamdetails: StreamDetails
+) -> AsyncGenerator[bytes, None]:
+    """Get radio audio stream from HTTP, including metadata retrieval."""
+    headers = {"Icy-MetaData": "1"}
+    while True:
+        # in loop to reconnect on connection failure
+        LOGGER.debug("radio stream (re)connecting to: %s", url)
+        async with mass.http_session.get(url, headers=headers) as resp:
+            headers = resp.headers
+            meta_int = int(headers.get("icy-metaint", "0"))
+            # stream with ICY Metadata
+            if meta_int:
+                while True:
+                    audio_chunk = await resp.content.readexactly(meta_int)
+                    yield audio_chunk
+                    meta_byte = await resp.content.readexactly(1)
+                    meta_length = ord(meta_byte) * 16
+                    meta_data = await resp.content.readexactly(meta_length)
+                    if not meta_data:
+                        continue
+                    meta_data = meta_data.rstrip(b"\0")
+                    stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
+                    if not stream_title:
+                        continue
+                    stream_title = stream_title.group(1).decode()
+                    if stream_title != streamdetails.stream_title:
+                        streamdetails.stream_title = stream_title
+                        if queue := mass.players.get_player_queue(
+                            streamdetails.queue_id
+                        ):
+                            queue.signal_update()
+            # Regular HTTP stream
+            else:
+                async for chunk in resp.content.iter_any():
+                    yield chunk
+
+
+async def get_http_stream(
+    mass: MusicAssistant,
+    url: str,
+    streamdetails: StreamDetails,
+    seek_position: int = 0,
+) -> AsyncGenerator[bytes, None]:
+    """Get audio stream from HTTP."""
+    if seek_position:
+        assert streamdetails.duration, "Duration required for seek requests"
+    chunk_size = get_chunksize(streamdetails.content_type)
+    # try to get filesize with a head request
+    if seek_position and not streamdetails.size:
+        async with mass.http_session.head(url) as resp:
+            if size := resp.headers.get("Content-Length"):
+                streamdetails.size = int(size)
+    # headers
+    headers = {}
+    skip_bytes = 0
+    if seek_position and streamdetails.size:
+        skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
+        headers["Range"] = f"bytes={skip_bytes}-"
+
+    # start the streaming from http
+    buffer = b""
+    buffer_all = False
+    bytes_received = 0
+    async with mass.http_session.get(url, headers=headers) as resp:
+        is_partial = resp.status == 206
+        buffer_all = seek_position and not is_partial
+        async for chunk in resp.content.iter_chunked(chunk_size):
+            bytes_received += len(chunk)
+            if buffer_all and not skip_bytes:
+                buffer += chunk
+                continue
+            if not is_partial and skip_bytes and bytes_received < skip_bytes:
+                continue
+            yield chunk
+
+    # store size on streamdetails for later use
+    if not streamdetails.size:
+        streamdetails.size = bytes_received
+    if buffer_all:
+        skip_bytes = streamdetails.size / streamdetails.duration * seek_position
+        yield buffer[:skip_bytes]
+        del buffer
+
+
+async def get_file_stream(
+    mass: MusicAssistant,
+    filename: str,
+    streamdetails: StreamDetails,
+    seek_position: int = 0,
+) -> AsyncGenerator[bytes, None]:
+    """Get audio stream from local accessible file."""
+    if seek_position:
+        assert streamdetails.duration, "Duration required for seek requests"
+    if not streamdetails.size:
+        stat = await mass.loop.run_in_executor(None, os.stat, filename)
+        streamdetails.size = stat.st_size
+    chunk_size = get_chunksize(streamdetails.content_type)
+    async with aiofiles.open(streamdetails.data, "rb") as _file:
+        if seek_position:
+            seek_pos = int(
+                (streamdetails.size / streamdetails.duration) * seek_position
             )
+            await _file.seek(seek_pos)
+        # yield chunks of data from file
+        while True:
+            data = await _file.read(chunk_size)
+            if not data:
+                break
+            yield data
 
 
-async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool, bool]:
-    """Check if sox and/or ffmpeg are present."""
+async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool]:
+    """Check if ffmpeg is present (with/without libsoxr support)."""
     cache_key = "audio_support_cache"
     if cache := globals().get(cache_key):
         return cache
-    # check for SoX presence
-    returncode, output = await check_output("sox --version")
-    sox_present = returncode == 0 and "SoX" in output.decode()
-    if not sox_present and try_install:
-        # try a few common ways to install SoX
-        # this all assumes we have enough rights and running on a linux based platform (or docker)
-        await check_output("apt-get update && apt-get install sox libsox-fmt-all")
-        await check_output("apk add sox")
-        # test again
-        returncode, output = await check_output("sox --version")
-        sox_present = returncode == 0 and "SoX" in output.decode()
 
     # check for FFmpeg presence
     returncode, output = await check_output("ffmpeg -version")
     ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode()
     if not ffmpeg_present and try_install:
-        # try a few common ways to install SoX
+        # try a few common ways to install ffmpeg
         # this all assumes we have enough rights and running on a linux based platform (or docker)
         await check_output("apt-get update && apt-get install ffmpeg")
         await check_output("apk add ffmpeg")
@@ -573,67 +582,35 @@ async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool, bo
         ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode()
 
     # use globals as in-memory cache
-    result = (sox_present, ffmpeg_present)
+    libsoxr_support = "enable-libsoxr" in output.decode()
+    result = (ffmpeg_present, libsoxr_support)
     globals()[cache_key] = result
     return result
 
 
-async def get_sox_args_for_pcm_stream(
+async def get_ffmpeg_args_for_pcm_stream(
     sample_rate: int,
     bit_depth: int,
     channels: int,
     floating_point: bool = False,
     output_format: ContentType = ContentType.FLAC,
 ) -> List[str]:
-    """Collect args for sox (or ffmpeg) when converting from raw pcm to another contenttype."""
-
-    sox_present, ffmpeg_present = await check_audio_support()
+    """Collect args for ffmpeg when converting from raw pcm to another contenttype."""
     input_format = ContentType.from_bit_depth(bit_depth, floating_point)
-    sox_present = True
-
-    # use ffmpeg if sox is not present
-    if not sox_present:
-        if not ffmpeg_present:
-            raise AudioError(
-                "FFmpeg binary is missing from system. "
-                "Please install ffmpeg on your OS to enable playback.",
-            )
-        # collect input args
-        input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
-        input_args += [
-            "-f",
-            input_format.value,
-            "-ac",
-            str(channels),
-            "-ar",
-            str(sample_rate),
-            "-i",
-            "-",
-        ]
-        # collect output args
-        output_args = ["-f", output_format.value, "-"]
-        return input_args + output_args
-
-    # Prefer SoX for all other (=highest quality)
-
     # collect input args
-    input_args = [
-        "sox",
-        "-t",
-        input_format.sox_format(),
-        "-r",
-        str(sample_rate),
-        "-b",
-        str(bit_depth),
-        "-c",
+    input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-ignore_unknown"]
+    input_args += [
+        "-f",
+        input_format.value,
+        "-ac",
         str(channels),
+        "-ar",
+        str(sample_rate),
+        "-i",
         "-",
     ]
-    #  collect output args
-    if output_format == ContentType.FLAC:
-        output_args = ["-t", "flac", "-C", "0", "-"]
-    else:
-        output_args = ["-t", output_format.sox_format(), "-"]
+    # collect output args
+    output_args = ["-f", output_format.value, "-"]
     return input_args + output_args
 
 
@@ -641,53 +618,53 @@ async def get_preview_stream(
     mass: MusicAssistant,
     provider_id: str,
     track_id: str,
-) -> AsyncGenerator[Tuple[bool, bytes], None]:
-    """Get the audio stream for the given streamdetails."""
+) -> AsyncGenerator[bytes, None]:
+    """Create a 30 seconds preview audioclip for the given streamdetails."""
     music_prov = mass.music.get_provider(provider_id)
 
     streamdetails = await music_prov.get_stream_details(track_id)
 
-    if streamdetails.type == StreamType.EXECUTABLE:
-        # stream from executable
-        input_args = [
-            streamdetails.path,
-            "|",
-            "ffmpeg",
-            "-hide_banner",
-            "-loglevel",
-            "error",
-            "-f",
-            streamdetails.content_type.value,
-            "-i",
-            "-",
-        ]
-    else:
-        input_args = [
-            "ffmpeg",
-            "-hide_banner",
-            "-loglevel",
-            "error",
-            "-i",
-            streamdetails.path,
-        ]
-    output_args = ["-ss", "30", "-to", "60", "-f", "mp3", "-"]
-    async with AsyncProcess(input_args + output_args) as proc:
+    input_args = [
+        "ffmpeg",
+        "-hide_banner",
+        "-loglevel",
+        "error",
+        "-f",
+        streamdetails.content_type.value,
+        "-i",
+        "-",
+    ]
+    output_args = ["-to", "30", "-f", "mp3", "-"]
+    args = input_args + output_args
+    async with AsyncProcess(args, True) as ffmpeg_proc:
+
+        async def writer():
+            """Task that grabs the source audio and feeds it to ffmpeg."""
+            music_prov = mass.music.get_provider(streamdetails.provider)
+            async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30):
+                await ffmpeg_proc.write(audio_chunk)
+            # write eof when last packet is received
+            ffmpeg_proc.write_eof()
+
+        ffmpeg_proc.attach_task(writer())
 
         # yield chunks from stdout
-        # we keep 1 chunk behind to detect end of stream properly
-        try:
-            prev_chunk = b""
-            async for chunk in proc.iterate_chunks():
-                if prev_chunk:
-                    yield (False, prev_chunk)
-                prev_chunk = chunk
-            # send last chunk
-            yield (True, prev_chunk)
-        finally:
-            mass.signal_event(
-                MassEvent(
-                    EventType.STREAM_ENDED,
-                    object_id=streamdetails.provider.value,
-                    data=streamdetails,
-                )
-            )
+        async for chunk in ffmpeg_proc.iterate_chunks():
+            yield chunk
+
+
+def get_chunksize(content_type: ContentType) -> int:
+    """Get a default chunksize for given contenttype."""
+    if content_type.is_pcm():
+        return 512000
+    if content_type in (
+        ContentType.AAC,
+        ContentType.M4A,
+    ):
+        return 32000
+    if content_type in (
+        ContentType.MP3,
+        ContentType.OGG,
+    ):
+        return 64000
+    return 256000
index 958e64b1d7ec1b4a6e7742ae541e644e3560a982..952637869d93e88c119112af372dc8215a4c0d9a 100644 (file)
@@ -12,22 +12,30 @@ from typing import AsyncGenerator, Coroutine, List, Optional, Tuple, Union
 
 from async_timeout import timeout as _timeout
 
-LOGGER = logging.getLogger("AsyncProcess")
+LOGGER = logging.getLogger(__name__)
 
-DEFAULT_CHUNKSIZE = 512000
+DEFAULT_CHUNKSIZE = 128000
 DEFAULT_TIMEOUT = 120
 
 
 class AsyncProcess:
     """Implementation of a (truly) non blocking subprocess."""
 
-    def __init__(self, args: Union[List, str], enable_write: bool = False):
+    def __init__(
+        self,
+        args: Union[List, str],
+        enable_write: bool = False,
+        chunk_size: int = DEFAULT_CHUNKSIZE,
+        use_stderr: bool = False,
+    ):
         """Initialize."""
         self._proc = None
         self._args = args
+        self._use_stderr = use_stderr
         self._enable_write = enable_write
         self._attached_task: asyncio.Task = None
         self.closed = False
+        self.chunk_size = chunk_size or DEFAULT_CHUNKSIZE
 
     async def __aenter__(self) -> "AsyncProcess":
         """Enter context manager."""
@@ -40,16 +48,18 @@ class AsyncProcess:
             self._proc = await asyncio.create_subprocess_shell(
                 args,
                 stdin=asyncio.subprocess.PIPE if self._enable_write else None,
-                stdout=asyncio.subprocess.PIPE,
-                limit=DEFAULT_CHUNKSIZE * 10,
+                stdout=asyncio.subprocess.PIPE if not self._use_stderr else None,
+                stderr=asyncio.subprocess.PIPE if self._use_stderr else None,
+                limit=self.chunk_size * 5,
                 close_fds=True,
             )
         else:
             self._proc = await asyncio.create_subprocess_exec(
                 *args,
                 stdin=asyncio.subprocess.PIPE if self._enable_write else None,
-                stdout=asyncio.subprocess.PIPE,
-                limit=DEFAULT_CHUNKSIZE * 10,
+                stdout=asyncio.subprocess.PIPE if not self._use_stderr else None,
+                stderr=asyncio.subprocess.PIPE if self._use_stderr else None,
+                limit=self.chunk_size * 5,
                 close_fds=True,
             )
         return self
@@ -59,53 +69,37 @@ class AsyncProcess:
         self.closed = True
         if self._attached_task:
             # cancel the attached reader/writer task
-            self._attached_task.cancel()
-        if self._proc.returncode is None:
-            # prevent subprocess deadlocking, send terminate and read remaining bytes
             try:
-                self._proc.terminate()
-                # close stdin and let it drain
-                if self._enable_write:
-                    await self._proc.stdin.drain()
-                    self._proc.stdin.close()
-                # read remaining bytes
-                await self._proc.stdout.read()
-                # we really want to make this thing die ;-)
-                self._proc.kill()
-            except (
-                ProcessLookupError,
-                BrokenPipeError,
-                RuntimeError,
-                ConnectionResetError,
-            ):
+                self._attached_task.cancel()
+                await self._attached_task
+            except asyncio.CancelledError:
                 pass
-        del self._proc
+        if self._proc.returncode is None:
+            # prevent subprocess deadlocking, read remaining bytes
+            await self._proc.communicate(b"" if self._enable_write else None)
+            if self._proc.returncode is None:
+                # just in case?
+                self._proc.kill()
 
-    async def iterate_chunks(
-        self, chunk_size: int = DEFAULT_CHUNKSIZE, timeout: int = DEFAULT_TIMEOUT
-    ) -> AsyncGenerator[bytes, None]:
+    async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
         """Yield chunks from the process stdout. Generator."""
         while True:
-            chunk = await self.read(chunk_size, timeout)
-            if not chunk:
-                break
+            chunk = await self._read_chunk()
             yield chunk
-            if chunk_size is not None and len(chunk) < chunk_size:
+            if len(chunk) < self.chunk_size:
+                del chunk
                 break
+            del chunk
 
-    async def read(
-        self, chunk_size: int = DEFAULT_CHUNKSIZE, timeout: int = DEFAULT_TIMEOUT
-    ) -> bytes:
-        """Read x bytes from the process stdout."""
+    async def _read_chunk(self, timeout: int = DEFAULT_TIMEOUT) -> bytes:
+        """Read chunk_size bytes from the process stdout."""
+        if self.closed:
+            return b""
         try:
             async with _timeout(timeout):
-                if chunk_size is None:
-                    return await self._proc.stdout.read(DEFAULT_CHUNKSIZE)
-                return await self._proc.stdout.readexactly(chunk_size)
+                return await self._proc.stdout.readexactly(self.chunk_size)
         except asyncio.IncompleteReadError as err:
             return err.partial
-        except AttributeError as exc:
-            raise asyncio.CancelledError() from exc
         except asyncio.TimeoutError:
             return b""
 
@@ -116,26 +110,31 @@ class AsyncProcess:
         try:
             self._proc.stdin.write(data)
             await self._proc.stdin.drain()
-        except (AttributeError, AssertionError, BrokenPipeError):
+        except (
+            AttributeError,
+            AssertionError,
+            BrokenPipeError,
+            RuntimeError,
+            ConnectionResetError,
+        ) as err:
             # already exited, race condition
-            pass
+            raise asyncio.CancelledError() from err
 
     def write_eof(self) -> None:
         """Write end of file to to process stdin."""
-        try:
-            if self._proc.stdin.can_write_eof():
-                self._proc.stdin.write_eof()
-        except (AttributeError, AssertionError, BrokenPipeError):
-            # already exited, race condition
-            pass
+        if self.closed:
+            return
+        if self._proc.stdin.can_write_eof():
+            self._proc.stdin.write_eof()
 
     async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
         """Write bytes to process and read back results."""
         return await self._proc.communicate(input_data)
 
-    def attach_task(self, coro: Coroutine) -> None:
+    def attach_task(self, coro: Coroutine) -> asyncio.Task:
         """Attach given coro func as reader/writer task to properly cancel it when needed."""
-        self._attached_task = asyncio.create_task(coro)
+        self._attached_task = task = asyncio.create_task(coro)
+        return task
 
 
 async def check_output(shell_cmd: str) -> Tuple[int, bytes]:
index e889f92f8377abfe11146e2223296d721debdbe3..d61aeeee832cd7bcc9d76d0e44c07500dad3e5eb 100644 (file)
@@ -14,10 +14,11 @@ from uuid import uuid4
 
 import aiohttp
 
+from music_assistant.constants import ROOT_LOGGER_NAME
 from music_assistant.controllers.metadata import MetaDataController
 from music_assistant.controllers.music import MusicController
 from music_assistant.controllers.players import PlayerController
-from music_assistant.controllers.stream import StreamController
+from music_assistant.controllers.streams import StreamsController
 from music_assistant.helpers.cache import Cache
 from music_assistant.helpers.database import Database
 from music_assistant.models.background_job import BackgroundJob
@@ -51,7 +52,7 @@ class MusicAssistant:
         self.loop: asyncio.AbstractEventLoop = None
         self.http_session: aiohttp.ClientSession = session
         self.http_session_provided = session is not None
-        self.logger = logging.getLogger(__name__)
+        self.logger = logging.getLogger(ROOT_LOGGER_NAME)
 
         self._listeners = []
         self._jobs: Deque[BackgroundJob] = deque()
@@ -63,7 +64,7 @@ class MusicAssistant:
         self.metadata = MetaDataController(self)
         self.music = MusicController(self)
         self.players = PlayerController(self)
-        self.streams = StreamController(self)
+        self.streams = StreamsController(self)
         self._tracked_tasks: List[asyncio.Task] = []
         self.closed = False
 
index 4130970ef7c9bee70c6cf2ee904f530ae7cb55d3..f08b3d408da6abb8e653f559abb82534ff4fc94c 100644 (file)
@@ -44,6 +44,6 @@ class MassConfig:
     providers: List[MusicProviderConfig] = field(default_factory=list)
 
     # advanced settings
-    max_simultaneous_jobs: int = 5
+    max_simultaneous_jobs: int = 2
     stream_port: int = select_stream_port()
     stream_ip: str = get_ip()
index b1495e1dbf8fa72e288a247641b2b625cb01107e..b2be4df5bd48e4f8f60658caaefa83e742204097 100644 (file)
@@ -11,6 +11,7 @@ class MediaType(Enum):
     TRACK = "track"
     PLAYLIST = "playlist"
     RADIO = "radio"
+    URL = "url"
     UNKNOWN = "unknown"
 
 
@@ -68,15 +69,6 @@ class AlbumType(Enum):
     UNKNOWN = "unknown"
 
 
-class StreamType(Enum):
-    """Enum with stream types."""
-
-    EXECUTABLE = "executable"
-    URL = "url"
-    FILE = "file"
-    CACHE = "cache"
-
-
 class ContentType(Enum):
     """Enum with audio content/container types supported by ffmpeg."""
 
@@ -114,22 +106,6 @@ class ContentType(Enum):
         """Return if contentype is PCM."""
         return self.name.startswith("PCM")
 
-    def sox_supported(self):
-        """Return if ContentType is supported by SoX."""
-        return self.is_pcm() or self in [
-            ContentType.OGG,
-            ContentType.FLAC,
-            ContentType.MP3,
-            ContentType.WAV,
-            ContentType.AIFF,
-        ]
-
-    def sox_format(self):
-        """Convert the ContentType to SoX compatible format."""
-        if not self.sox_supported():
-            raise NotImplementedError
-        return self.value.replace("le", "")
-
     @classmethod
     def from_bit_depth(
         cls, bit_depth: int, floating_point: bool = False
@@ -186,8 +162,6 @@ class EventType(Enum):
 
     PLAYER_ADDED = "player_added"
     PLAYER_UPDATED = "player_updated"
-    STREAM_STARTED = "streaming_started"
-    STREAM_ENDED = "streaming_ended"
     QUEUE_ADDED = "queue_added"
     QUEUE_UPDATED = "queue_updated"
     QUEUE_ITEMS_UPDATED = "queue_items_updated"
index bdc3b5dbba46ea20aaab716fcd5e4f904a3b945e..2ae836c02e5fd9a6d2524c94ae782024d2835094 100755 (executable)
@@ -2,6 +2,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass, field, fields
+from time import time
 from typing import Any, Dict, List, Mapping, Optional, Set, Union
 
 from mashumaro import DataClassDictMixin
@@ -17,7 +18,6 @@ from music_assistant.models.enums import (
     MediaQuality,
     MediaType,
     ProviderType,
-    StreamType,
 )
 
 MetadataTypes = Union[int, bool, str, List[str]]
@@ -338,30 +338,50 @@ MediaItemType = Union[Artist, Album, Track, Radio, Playlist]
 class StreamDetails(DataClassDictMixin):
     """Model for streamdetails."""
 
-    type: StreamType
+    # NOTE: the actual provider/itemid of the streamdetails may differ
+    # from the connected media_item due to track linking etc.
+    # the streamdetails are only used to provide details about the content
+    # that is going to be streamed.
+
+    # mandatory fields
     provider: ProviderType
     item_id: str
-    path: str
     content_type: ContentType
-    player_id: str = ""
-    details: Dict[str, Any] = field(default_factory=dict)
-    seconds_played: int = 0
-    seconds_skipped: int = 0
-    gain_correct: float = 0
-    loudness: Optional[float] = None
+    media_type: MediaType = MediaType.TRACK
     sample_rate: int = 44100
     bit_depth: int = 16
     channels: int = 2
-    media_type: MediaType = MediaType.TRACK
-    queue_id: str = None
+    # stream_title: radio streams can optionally set this field
+    stream_title: Optional[str] = None
+    # duration of the item to stream, copied from media_item if omitted
+    duration: Optional[int] = None
+    # total size in bytes of the item, calculated at eof when omitted
+    size: Optional[int] = None
+    # expires: timestamp this streamdetails expire
+    expires: float = time() + 3600
+    # data: provider specific data (not exposed externally)
+    data: Optional[Any] = None
+
+    # the fields below will be set/controlled by the streamcontroller
+    queue_id: Optional[str] = None
+    seconds_streamed: int = 0
+    seconds_skipped: int = 0
+    gain_correct: Optional[float] = None
+    loudness: Optional[float] = None
 
     def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
         """Exclude internal fields from dict."""
         # pylint: disable=no-self-use
-        d.pop("path")
-        d.pop("details")
+        d.pop("data")
+        d.pop("expires")
+        d.pop("queue_id")
         return d
 
     def __str__(self):
         """Return pretty printable string of object."""
-        return f"{self.type.value}/{self.content_type.value} - {self.provider.value}/{self.item_id}"
+        return self.uri
+
+    @property
+    def uri(self) -> str:
+        """Return uri representation of item."""
+        return f"{self.provider.value}://{self.media_type.value}/{self.item_id}"
index 3af07abc3cb0e53c890f073522c2a49ab6c79928..e92160d62579828955ce824d375354214aaf9c2e 100755 (executable)
@@ -195,6 +195,22 @@ class Player(ABC):
 
     # SOME CONVENIENCE METHODS (may be overridden if needed)
 
+    @property
+    def stream_type(self) -> ContentType:
+        """Return supported/preferred stream type for playerqueue. Read only."""
+        # determine default stream type from player capabilities
+        return next(
+            x
+            for x in (
+                ContentType.FLAC,
+                ContentType.WAV,
+                ContentType.PCM_S16LE,
+                ContentType.MP3,
+                ContentType.MPEG,
+            )
+            if x in self.supported_content_types
+        )
+
     async def volume_mute(self, muted: bool) -> None:
         """Send volume mute command to player."""
         # for players that do not support mute, we fake mute with volume
@@ -402,6 +418,20 @@ class PlayerGroup(Player):
             )
         )
 
+    @property
+    def supported_sample_rates(self) -> Tuple[int]:
+        """Return the sample rates this player supports."""
+        return tuple(
+            sample_rate
+            for sample_rate in DEFAULT_SUPPORTED_SAMPLE_RATES
+            if all(
+                (
+                    sample_rate in child_player.supported_sample_rates
+                    for child_player in self._get_child_players(False, False)
+                )
+            )
+        )
+
     async def stop(self) -> None:
         """Send STOP command to player."""
         if not self.use_multi_stream:
index 90f8d6b4ac04dc45c336a80cc4905635b9879d08..9f44e22af76407f9d7c4ce1d7c093cd16ebae1c9 100644 (file)
@@ -2,27 +2,23 @@
 from __future__ import annotations
 
 import asyncio
+import os
 import pathlib
 import random
 from asyncio import Task, TimerHandle
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
-from music_assistant.helpers.audio import get_stream_details
 from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode
-from music_assistant.models.errors import (
-    MediaNotFoundError,
-    MusicAssistantError,
-    QueueEmpty,
-)
+from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
 from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import StreamDetails
 
 from .player import Player, PlayerGroup, PlayerState
 from .queue_item import QueueItem
 from .queue_settings import QueueSettings
 
 if TYPE_CHECKING:
+    from music_assistant.controllers.streams import QueueStream
     from music_assistant.mass import MusicAssistant
 
 RESOURCES_DIR = (
@@ -41,7 +37,6 @@ class QueueSnapShot:
 
     powered: bool
     state: PlayerState
-    volume_level: int
     items: List[QueueItem]
     index: Optional[int]
     position: int
@@ -55,35 +50,24 @@ class PlayerQueue:
         self.mass = mass
         self.logger = mass.players.logger
         self.queue_id = player_id
+        self.signal_next: bool = False
+        self._stream_id: str = ""
         self._settings = QueueSettings(self)
         self._current_index: Optional[int] = None
-        # index_in_buffer: which track is currently (pre)loaded in the streamer
-        self._index_in_buffer: Optional[int] = None
         self._current_item_elapsed_time: int = 0
         self._prev_item: Optional[QueueItem] = None
-        # start_index: from which index did the queuestream start playing
-        self._start_index: int = 0
-        # start_pos: from which position (in seconds) did the first track start playing?
-        self._start_pos: int = 0
-        self._next_fadein: int = 0
-        self._next_start_index: int = 0
-        self._next_start_pos: int = 0
-        self._last_state = PlayerState.IDLE
+        self._last_state = str
         self._items: List[QueueItem] = []
         self._save_task: TimerHandle = None
         self._update_task: Task = None
-        self._signal_next: bool = False
         self._last_player_update: int = 0
-        self._stream_url: str = ""
+        self._last_stream_id: str = ""
         self._snapshot: Optional[QueueSnapShot] = None
 
     async def setup(self) -> None:
         """Handle async setup of instance."""
         await self._settings.restore()
         await self._restore_items()
-        self._stream_url: str = self.mass.streams.get_stream_url(
-            self.queue_id, content_type=self._settings.stream_type
-        )
         self.mass.signal_event(
             MassEvent(EventType.QUEUE_ADDED, object_id=self.queue_id, data=self)
         )
@@ -103,12 +87,28 @@ class PlayerQueue:
         """Return if player(queue) is available."""
         return self.player.available
 
+    @property
+    def stream(self) -> QueueStream | None:
+        """Return the currently connected/active stream for this queue."""
+        return self.mass.streams.queue_streams.get(self._stream_id)
+
+    @property
+    def index_in_buffer(self) -> int | None:
+        """Return the item that is curently loaded into the buffer."""
+        if not self._stream_id:
+            return None
+        if stream := self.mass.streams.queue_streams.get(self._stream_id):
+            return stream.index_in_buffer
+        return None
+
     @property
     def active(self) -> bool:
         """Return bool if the queue is currenty active on the player."""
-        if self.player.use_multi_stream:
-            return self.queue_id in self.player.current_url
-        return self._stream_url == self.player.current_url
+        if not self.player.current_url:
+            return False
+        if stream := self.stream:
+            return stream.url == self.player.current_url
+        return False
 
     @property
     def elapsed_time(self) -> float:
@@ -192,8 +192,7 @@ class PlayerQueue:
                 QueueOption.REPLACE -> Replace queue contents with these items
                 QueueOption.NEXT -> Play item(s) after current playing item
                 QueueOption.ADD -> Append new items at end of the queue
-            :param passive: do not actually start playback.
-        Returns: the stream URL for this queue.
+            :param passive: if passive set to true the stream url will not be sent to the player.
         """
         # a single item or list of items may be provided
         if not isinstance(uris, list):
@@ -205,8 +204,8 @@ class PlayerQueue:
                 media_item = await self.mass.music.get_item_by_uri(uri)
             except MusicAssistantError as err:
                 # invalid MA uri or item not found error
-                if uri.startswith("http"):
-                    # a plain url was provided
+                if uri.startswith("http") or os.path.isfile(uri):
+                    # a plain url (or local file) was provided
                     queue_items.append(QueueItem.from_url(uri))
                     continue
                 raise MediaNotFoundError(f"Invalid uri: {uri}") from err
@@ -247,23 +246,20 @@ class PlayerQueue:
         if self._current_index and self._current_index >= (len(self._items) - 1):
             self._current_index = None
             self._items = []
-        # clear resume point if any
-        self._start_pos = 0
 
         # load items into the queue
         if queue_opt == QueueOption.REPLACE:
-            await self.load(queue_items, passive=passive)
+            await self.load(queue_items, passive)
         elif (
             queue_opt in [QueueOption.PLAY, QueueOption.NEXT] and len(queue_items) > 100
         ):
-            await self.load(queue_items, passive=passive)
+            await self.load(queue_items, passive)
         elif queue_opt == QueueOption.NEXT:
-            await self.insert(queue_items, 1, passive=passive)
+            await self.insert(queue_items, 1, passive)
         elif queue_opt == QueueOption.PLAY:
-            await self.insert(queue_items, 0, passive=passive)
+            await self.insert(queue_items, 0, passive)
         elif queue_opt == QueueOption.ADD:
             await self.append(queue_items)
-        return self._stream_url
 
     async def play_alert(
         self, uri: str, announce: bool = False, volume_adjust: int = 10
@@ -277,7 +273,7 @@ class PlayerQueue:
         """
         if self._snapshot:
             self.logger.debug("Ignore play_alert: already in progress")
-            return
+            return
 
         # create snapshot
         await self.snapshot_create()
@@ -286,7 +282,9 @@ class PlayerQueue:
 
         # prepend annnounce sound if needed
         if announce:
-            queue_items.append(QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert"))
+            queue_item = QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert")
+            queue_item.streamdetails.gain_correct = 10
+            queue_items.append(queue_item)
 
         # parse provided uri into a MA MediaItem or Basic QueueItem from URL
         try:
@@ -294,23 +292,21 @@ class PlayerQueue:
             queue_items.append(QueueItem.from_media_item(media_item))
         except MusicAssistantError as err:
             # invalid MA uri or item not found error
-            if uri.startswith("http"):
+            if uri.startswith("http") or os.path.isfile(uri):
                 # a plain url was provided
-                queue_items.append(QueueItem.from_url(uri, "alert"))
+                queue_item = QueueItem.from_url(uri, "alert")
+                queue_item.streamdetails.gain_correct = 6
+                queue_items.append(queue_item)
             else:
                 raise MediaNotFoundError(f"Invalid uri: {uri}") from err
 
         # append silence track, we use this to reliably detect when the alert is ready
         silence_url = self.mass.streams.get_silence_url(600)
-        queue_items.append(QueueItem.from_url(silence_url, "alert"))
+        queue_item = QueueItem.from_url(silence_url, "alert")
+        queue_items.append(queue_item)
 
         # load queue with alert sound(s)
         await self.load(queue_items)
-        # set new volume
-        new_volume = self.player.volume_level + (
-            self.player.volume_level / 100 * volume_adjust
-        )
-        await self.player.volume_set(new_volume)
 
         # wait for the alert to finish playing
         alert_done = asyncio.Event()
@@ -336,6 +332,7 @@ class PlayerQueue:
 
     async def stop(self) -> None:
         """Stop command on queue player."""
+        self.signal_next = False
         # redirect to underlying player
         await self.player.stop()
 
@@ -399,7 +396,9 @@ class PlayerQueue:
             resume_pos = 0
 
         if resume_item is not None:
-            await self.play_index(resume_item.item_id, resume_pos, 5)
+            resume_pos = resume_pos if resume_pos > 10 else 0
+            fade_in = 5 if resume_pos else 0
+            await self.play_index(resume_item.item_id, resume_pos, fade_in)
         else:
             self.logger.warning(
                 "resume queue requested for %s but queue is empty", self.queue_id
@@ -410,7 +409,6 @@ class PlayerQueue:
         self._snapshot = QueueSnapShot(
             powered=self.player.powered,
             state=self.player.state,
-            volume_level=self.player.volume_level,
             items=self._items,
             index=self._current_index,
             position=self._current_item_elapsed_time,
@@ -421,11 +419,10 @@ class PlayerQueue:
         assert self._snapshot, "Create snapshot before restoring it."
         # clear queue first
         await self.clear()
-        # restore volume
-        await self.player.volume_set(self._snapshot.volume_level)
         # restore queue
         await self.update(self._snapshot.items)
         self._current_index = self._snapshot.index
+        self._current_item_elapsed_time = self._snapshot.position
         if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
             await self.resume()
         if self._snapshot.state == PlayerState.PAUSED:
@@ -443,8 +440,6 @@ class PlayerQueue:
         passive: bool = False,
     ) -> None:
         """Play item at index (or item_id) X in queue."""
-        if self.player.use_multi_stream:
-            await self.mass.streams.stop_multi_client_queue_stream(self.queue_id)
         if not isinstance(index, int):
             index = self.index_by_id(index)
         if index is None:
@@ -452,38 +447,8 @@ class PlayerQueue:
         if not len(self.items) > index:
             return
         self._current_index = index
-        self._next_start_index = index
-        self._next_start_pos = int(seek_position)
-        self._next_fadein = fade_in
-        # send stream url to player connected to this queue
-        self._stream_url = self.mass.streams.get_stream_url(
-            self.queue_id, content_type=self._settings.stream_type
-        )
-
-        if self.player.use_multi_stream:
-            # multi stream enabled, all child players should receive the same audio stream
-            # redirect command to all (powered) players
-            coros = []
-            expected_clients = set()
-            for child_id in self.player.group_childs:
-                if child_player := self.mass.players.get_player(child_id):
-                    if child_player.powered:
-                        # TODO: this assumes that all client players support flac
-                        player_url = self.mass.streams.get_stream_url(
-                            self.queue_id, child_id, self._settings.stream_type
-                        )
-                        expected_clients.add(child_id)
-                        coros.append(child_player.play_url(player_url))
-            await self.mass.streams.start_multi_client_queue_stream(
-                # TODO: this assumes that all client players support flac
-                self.queue_id,
-                expected_clients,
-                self._settings.stream_type,
-            )
-            await asyncio.gather(*coros)
-        elif not passive:
-            # regular (single player) request
-            await self.player.play_url(self._stream_url)
+        # start the queue stream
+        await self.queue_stream_start(index, int(seek_position), fade_in, passive)
 
     async def move_item(self, queue_item_id: str, pos_shift: int = 1) -> None:
         """
@@ -510,9 +475,10 @@ class PlayerQueue:
     async def delete_item(self, queue_item_id: str) -> None:
         """Delete item (by id or index) from the queue."""
         item_index = self.index_by_id(queue_item_id)
-        if item_index <= self._index_in_buffer:
+        if self.stream and item_index <= self.index_in_buffer:
             # ignore request if track already loaded in the buffer
             # the frontend should guard so this is just in case
+            self.logger.warning("delete requested for item already loaded in buffer")
             return
         self._items.pop(item_index)
         self.signal_update(True)
@@ -538,9 +504,10 @@ class PlayerQueue:
             :param queue_items: a list of QueueItem
             :param offset: offset from current queue position
         """
-        if not self.items or self._current_index is None:
-            return await self.load(queue_items)
-        insert_at_index = self._current_index + offset
+        cur_index = self.index_in_buffer or self._current_index
+        if not self.items or cur_index is None:
+            return await self.load(queue_items, passive)
+        insert_at_index = cur_index + offset
         for index, item in enumerate(queue_items):
             item.sort_index = insert_at_index + index
         if self.settings.shuffle_enabled and len(queue_items) > 5:
@@ -559,7 +526,7 @@ class PlayerQueue:
                 + self._items[insert_at_index:]
             )
 
-        if offset in (0, self._index_in_buffer):
+        if offset in (0, cur_index):
             await self.play_index(insert_at_index, passive=passive)
 
         self.signal_update(True)
@@ -592,9 +559,10 @@ class PlayerQueue:
 
     def on_player_update(self) -> None:
         """Call when player updates."""
-        if self._last_state != self.player.state:
+        player_state_str = f"{self.player.state.value}.{self.player.current_url}"
+        if self._last_state != player_state_str:
             # playback state changed
-            self._last_state = self.player.state
+            self._last_state = player_state_str
 
             # always signal update if playback state changed
             self.signal_update()
@@ -606,13 +574,10 @@ class PlayerQueue:
                 ):
                     self._current_index += 1
                     self._current_item_elapsed_time = 0
-                    # repeat enabled (of whole queue), play queue from beginning
-                    if self.settings.repeat_mode == RepeatMode.ALL:
-                        self.mass.create_task(self.play_index(0))
 
                 # handle case where stream stopped on purpose and we need to restart it
-                elif self._signal_next:
-                    self._signal_next = False
+                elif self.signal_next:
+                    self.signal_next = False
                     self.mass.create_task(self.resume())
 
             # start poll/updater task if playback starts on player
@@ -668,55 +633,58 @@ class PlayerQueue:
                 )
             )
 
-    async def queue_stream_prepare(self) -> StreamDetails:
-        """Call when queue_streamer is about to start playing."""
-        start_from_index = self._next_start_index
-        try:
-            next_item = self._items[start_from_index]
-        except (IndexError, TypeError) as err:
-            raise QueueEmpty() from err
-        try:
-            return await get_stream_details(self.mass, next_item, self.queue_id)
-        except MediaNotFoundError as err:
-            # something bad happened, try to recover by requesting the next track in the queue
-            await self.play_index(self._current_index + 2)
-            raise err
-
-    async def queue_stream_start(self) -> Tuple[int, int, int]:
-        """Call when queue_streamer starts playing the queue stream."""
-        start_from_index = self._next_start_index
+    async def queue_stream_start(
+        self, start_index: int, seek_position: int, fade_in: bool, passive: bool = False
+    ) -> None:
+        """Start the queue stream runner."""
+        players: List[Player] = []
+        output_format = self._settings.stream_type
+        # if multi stream is enabled, all child players should receive the same audio stream
+        if self.player.use_multi_stream:
+            for child_id in self.player.group_childs:
+                child_player = self.mass.players.get_player(child_id)
+                if not child_player or not child_player.powered:
+                    continue
+                players.append(child_player)
+        else:
+            # regular (single player) request
+            players.append(self.player)
+
         self._current_item_elapsed_time = 0
-        self._current_index = start_from_index
-        self._start_index = start_from_index
-        self._next_start_index = self.get_next_index(start_from_index)
-        self._index_in_buffer = start_from_index
-        seek_position = self._next_start_pos
-        self._next_start_pos = 0
-        fade_in = self._next_fadein
-        self._next_fadein = 0
-        return (start_from_index, seek_position, fade_in)
-
-    async def queue_stream_next(self, cur_index: int) -> int | None:
-        """Call when queue_streamer loads next track in buffer."""
-        next_idx = self._next_start_index
-        self._index_in_buffer = next_idx
-        self._next_start_index = self.get_next_index(self._next_start_index)
-        return next_idx
-
-    def get_next_index(self, cur_index: int) -> int:
+        self._current_index = start_index
+
+        # start the queue stream background task
+        stream = await self.mass.streams.start_queue_stream(
+            queue=self,
+            expected_clients=len(players),
+            start_index=start_index,
+            seek_position=seek_position,
+            fade_in=fade_in,
+            output_format=output_format,
+        )
+        self._stream_id = stream.stream_id
+        # execute the play command on the player(s)
+        if not passive:
+            await asyncio.gather(*[x.play_url(stream.url) for x in players])
+
+    def get_next_index(self, cur_index: Optional[int]) -> int:
         """Return the next index for the queue, accounting for repeat settings."""
-        if not self._items or cur_index is None:
-            raise IndexError("No (more) items in queue")
+        # handle repeat single track
         if self.settings.repeat_mode == RepeatMode.ONE:
             return cur_index
+        # handle repeat all
+        if (
+            self.settings.repeat_mode == RepeatMode.ALL
+            and self._items
+            and cur_index == (len(self._items) - 1)
+        ):
+            return 0
         # simply return the next index. other logic is guarded to detect the index
         # being higher than the number of items to detect end of queue and/or handle repeat.
+        if cur_index is None:
+            return 0
         return cur_index + 1
 
-    async def queue_stream_signal_next(self):
-        """Indicate that queue stream needs to start next index once playback finished."""
-        self._signal_next = True
-
     def signal_update(self, items_changed: bool = False) -> None:
         """Signal state changed of this queue."""
         if items_changed:
@@ -744,6 +712,7 @@ class PlayerQueue:
             "state": self.player.state.value,
             "available": self.player.available,
             "current_index": self.current_index,
+            "index_in_buffer": self.index_in_buffer,
             "current_item": cur_item,
             "next_item": next_item,
             "settings": self.settings.to_dict(),
@@ -756,9 +725,9 @@ class PlayerQueue:
         elapsed_time_queue = self.player.elapsed_time
         total_time = 0
         track_time = 0
-        if self._items and len(self._items) > self._start_index:
-            # start_index: holds the last starting position
-            queue_index = self._start_index
+        if self._items and self.stream and len(self._items) > self.stream.start_index:
+            # start_index: holds the position from which the stream started
+            queue_index = self.stream.start_index
             queue_track = None
             while len(self._items) > queue_index:
                 # keep enumerating the queue tracks to find current track
@@ -767,11 +736,15 @@ class PlayerQueue:
                 if not queue_track.streamdetails:
                     track_time = elapsed_time_queue - total_time
                     break
-                track_seconds = queue_track.streamdetails.seconds_played
-                if elapsed_time_queue > (track_seconds + total_time):
+                duration = (
+                    queue_track.streamdetails.seconds_streamed or queue_track.duration
+                )
+                if duration is not None and elapsed_time_queue > (
+                    duration + total_time
+                ):
                     # total elapsed time is more than (streamed) track duration
                     # move index one up
-                    total_time += track_seconds
+                    total_time += duration
                     queue_index += 1
                 else:
                     # no more seconds left to divide, this is our track
index 62ba1f4e759806f8a82f65564879b3ec39f74280..3c6038fd11c17c7d7e15a63673570b42a0630d0e 100644 (file)
@@ -12,9 +12,9 @@ from music_assistant.models.media_items import (
     MediaItemType,
     Playlist,
     Radio,
+    StreamDetails,
     Track,
 )
-from music_assistant.models.player_queue import StreamDetails
 
 if TYPE_CHECKING:
     from music_assistant.mass import MusicAssistant
@@ -170,10 +170,16 @@ class MusicProvider:
         if MediaType.PLAYLIST in self.supported_mediatypes:
             raise NotImplementedError
 
-    async def get_stream_details(self, item_id: str) -> StreamDetails:
+    async def get_stream_details(self, item_id: str) -> StreamDetails | None:
         """Get streamdetails for a track/radio."""
         raise NotImplementedError
 
+    async def get_audio_stream(
+        self, streamdetails: StreamDetails, seek_position: int = 0
+    ) -> AsyncGenerator[bytes, None]:
+        """Return the audio stream for the provider item."""
+        raise NotImplementedError
+
     async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
         """Get single MediaItem from provider."""
         if media_type == MediaType.ARTIST:
index 6dc2045eaed85da07dfd3219763fefb0f89f57fa..878270edf9bb4123f44c01113182b9366762affd 100644 (file)
@@ -7,7 +7,7 @@ from uuid import uuid4
 
 from mashumaro import DataClassDictMixin
 
-from music_assistant.models.enums import MediaType
+from music_assistant.models.enums import ContentType, MediaType, ProviderType
 from music_assistant.models.media_items import Radio, StreamDetails, Track
 
 
@@ -46,14 +46,24 @@ class QueueItem(DataClassDictMixin):
         return d
 
     @classmethod
-    def from_url(cls, url: str, name: Optional[str] = None) -> QueueItem:
-        """Create QueueItem from plain url."""
+    def from_url(
+        cls,
+        url: str,
+        name: Optional[str] = None,
+        media_type: MediaType = MediaType.URL,
+    ) -> QueueItem:
+        """Create QueueItem from plain url (or local file)."""
         return cls(
             uri=url,
             name=name or url.split("?")[0],
-            media_type=MediaType.UNKNOWN,
-            image=None,
-            media_item=None,
+            media_type=media_type,
+            streamdetails=StreamDetails(
+                provider=ProviderType.URL,
+                item_id=url,
+                content_type=ContentType.try_parse(url),
+                media_type=media_type,
+                data=url,
+            ),
         )
 
     @classmethod
index e69b52203525574cc148552fdacce87d50031320..62cac2aa1b6684a6ddc1a46becff03c3e7cdaed9 100644 (file)
@@ -23,7 +23,7 @@ class QueueSettings:
         self._crossfade_mode: CrossFadeMode = CrossFadeMode.DISABLED
         self._crossfade_duration: int = 6
         self._volume_normalization_enabled: bool = True
-        self._volume_normalization_target: int = -23
+        self._volume_normalization_target: int = -14
 
     @property
     def repeat_mode(self) -> RepeatMode:
@@ -124,18 +124,7 @@ class QueueSettings:
     @property
     def stream_type(self) -> ContentType:
         """Return supported/preferred stream type for playerqueue. Read only."""
-        # determine default stream type from player capabilities
-        return next(
-            x
-            for x in (
-                ContentType.FLAC,
-                ContentType.WAV,
-                ContentType.PCM_S16LE,
-                ContentType.MP3,
-                ContentType.MPEG,
-            )
-            if x in self._queue.player.supported_content_types
-        )
+        return self._queue.player.stream_type
 
     def to_dict(self) -> Dict[str, Any]:
         """Return dict from settings."""