Buffer optimizations (#431)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 21 Jul 2022 23:31:28 +0000 (01:31 +0200)
committerGitHub <noreply@github.com>
Thu, 21 Jul 2022 23:31:28 +0000 (01:31 +0200)
* make icy metadata controllable

* to not buffer ahead (and skip crossfade) if the player does not have any buffer

* enlarge buffer

music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/models/enums.py
music_assistant/models/queue_settings.py

index 37d9e013ad9f197966fe410c46a12221fc9efeeb..deecdff679794bb773dad4f3b2c64981caea3a68 100644 (file)
@@ -27,6 +27,7 @@ from music_assistant.models.enums import (
     CrossFadeMode,
     EventType,
     MediaType,
+    MetadataMode,
     ProviderType,
 )
 from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
@@ -192,20 +193,12 @@ class StreamsController:
             "Cache-Control": "no-cache",
         }
 
-        # for now, only support icy metadata on MP3 streams to prevent issues
-        # https://github.com/music-assistant/hass-music-assistant/issues/603
-        # in the future we could expand this support:
-        # by making exceptions for players that do also support ICY on other content types
-        # and/or metaint value such as Kodi.
-        # another future expansion is to just get the PCM frames here and encode
-        # for each inidvidual player with or without ICY...
-        if queue_stream.output_format == ContentType.MP3:
-            # use the default/recommended metaint size of 8192
-            # https://cast.readme.io/docs/icy
+        # ICY-metadata headers depend on settings
+        metadata_mode = queue_stream.queue.settings.metadata_mode
+        if metadata_mode != MetadataMode.DISABLED:
             headers["icy-name"] = "Music Assistant"
             headers["icy-pub"] = "1"
-            # use the default/recommended metaint size of 8192
-            headers["icy-metaint"] = str(ICY_CHUNKSIZE)
+            headers["icy-metaint"] = str(queue_stream.output_chunksize)
 
         resp = web.StreamResponse(headers=headers)
         try:
@@ -364,6 +357,13 @@ class QueueStream:
         self.signal_next: bool = False
         self._runner_task: Optional[asyncio.Task] = None
         self._prev_chunk: bytes = b""
+        if queue.settings.metadata_mode == MetadataMode.LEGACY:
+            # use the legacy/recommended metaint size of 8192 bytes
+            self.output_chunksize = ICY_CHUNKSIZE
+        else:
+            self.output_chunksize = get_chunksize(
+                output_format, pcm_sample_rate, pcm_bit_depth
+            )
         if autostart:
             self.mass.create_task(self.start())
 
@@ -473,21 +473,7 @@ class QueueStream:
 
             # Read bytes from final output and send chunk to child callback.
             chunk_num = 0
-            if self.output_format == ContentType.MP3:
-                # use the icy compatible static chunksize (iter_chunks of x size)
-                get_chunks = ffmpeg_proc.iter_chunked(ICY_CHUNKSIZE)
-            else:
-                # all other: prefer chunksize that fits 1 second belonging to output type
-                # but accept less (iter any chunk of max chunk size)
-                get_chunks = ffmpeg_proc.iter_any(
-                    get_chunksize(
-                        self.output_format,
-                        self.pcm_sample_rate,
-                        self.pcm_bit_depth,
-                        self.pcm_channels,
-                    )
-                )
-            async for chunk in get_chunks:
+            async for chunk in ffmpeg_proc.iter_chunked(self.output_chunksize):
                 chunk_num += 1
 
                 if len(self.connected_clients) == 0:
@@ -630,7 +616,7 @@ class QueueStream:
             crossfade_duration = self.queue.settings.crossfade_duration
             crossfade_size = sample_size_per_second * crossfade_duration
             # buffer_duration has some overhead to account for padded silence
-            buffer_duration = (crossfade_duration or 2) * 2 if track_count > 1 else 1
+            buffer_duration = (crossfade_duration or 1) * 2
             # predict total size to expect for this track from duration
             stream_duration = (queue_track.duration or 0) - seek_position
 
@@ -647,6 +633,7 @@ class QueueStream:
             self.queue.signal_update()
             buffer = b""
             bytes_written = 0
+            seconds_streamed = 0
             # handle incoming audio chunks
             async for chunk in get_media_stream(
                 self.mass,
@@ -658,9 +645,13 @@ class QueueStream:
                 chunk_size=sample_size_per_second,
             ):
 
-                seconds_streamed = bytes_written / sample_size_per_second
+                seconds_streamed += 1
+                self.seconds_streamed += 1
                 seconds_in_buffer = len(buffer) / sample_size_per_second
-                queue_track.streamdetails.seconds_streamed = seconds_streamed
+                # try to make a rough assumption of how many seconds the player has in buffer
+                player_in_buffer = self.seconds_streamed - (
+                    time() - self.streaming_started
+                )
 
                 ####  HANDLE FIRST PART OF TRACK
 
@@ -675,6 +666,7 @@ class QueueStream:
                     streamdetails.media_type == MediaType.ANNOUNCEMENT
                     or not stream_duration
                     or stream_duration < buffer_duration
+                    or player_in_buffer < buffer_duration
                 ):
                     # handle edge case where we have a previous chunk in buffer
                     # and the next track is too short
index b913c46103fe6a69ce1d7373e954fa7df9fb3f51..582b9fb21ef7fe66c784cfad4b7d802a4e752222 100644 (file)
@@ -425,7 +425,7 @@ async def get_radio_stream(
 ) -> AsyncGenerator[bytes, None]:
     """Get radio audio stream from HTTP, including metadata retrieval."""
     headers = {"Icy-MetaData": "1"}
-    timeout = ClientTimeout(total=0, connect=30, sock_read=120)
+    timeout = ClientTimeout(total=0, connect=30, sock_read=600)
     async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
         headers = resp.headers
         meta_int = int(headers.get("icy-metaint", "0"))
@@ -479,7 +479,7 @@ async def get_http_stream(
     buffer = b""
     buffer_all = False
     bytes_received = 0
-    timeout = ClientTimeout(total=0, connect=30, sock_read=120)
+    timeout = ClientTimeout(total=0, connect=30, sock_read=600)
     async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
         is_partial = resp.status == 206
         buffer_all = seek_position and not is_partial
index fb1801910b58592ff022b45d3ba10b7fae0253ea..8ec4fae4a0998d0e7a20fe4d37c07787938be184 100644 (file)
@@ -189,6 +189,14 @@ class RepeatMode(Enum):
     ALL = "all"  # repeat entire queue
 
 
+class MetadataMode(Enum):
+    """Enum with stream metadata modes."""
+
+    DISABLED = "disabled"  # do not notify icy support
+    DEFAULT = "default"  # enable icy if player requests it, default chunksize
+    LEGACY = "legacy"  # enable icy but with legacy 8kb chunksize, requires mp3
+
+
 class PlayerState(Enum):
     """Enum for the (playback)state of a player."""
 
index 02e94f48e57f3bfeb94a4503e975ba424355606d..f9d6ef391782e088e8843a789f163945b1b69c10 100644 (file)
@@ -5,7 +5,7 @@ import asyncio
 import random
 from typing import TYPE_CHECKING, Any, Dict, Optional
 
-from .enums import ContentType, CrossFadeMode, RepeatMode
+from .enums import ContentType, CrossFadeMode, MetadataMode, RepeatMode
 
 if TYPE_CHECKING:
     from .player_queue import PlayerQueue
@@ -27,6 +27,7 @@ class QueueSettings:
         self._stream_type: ContentType = queue.player.stream_type
         self._max_sample_rate: int = queue.player.max_sample_rate
         self._announce_volume_increase: int = 15
+        self._metadata_mode: MetadataMode = MetadataMode.DEFAULT
 
     @property
     def repeat_mode(self) -> RepeatMode:
@@ -34,10 +35,10 @@ class QueueSettings:
         return self._repeat_mode
 
     @repeat_mode.setter
-    def repeat_mode(self, enabled: bool) -> None:
+    def repeat_mode(self, mode: RepeatMode) -> None:
         """Set repeat enabled setting."""
-        if self._repeat_mode != enabled:
-            self._repeat_mode = enabled
+        if self._repeat_mode != mode:
+            self._repeat_mode = mode
             self._on_update("repeat_mode")
 
     @property
@@ -59,7 +60,6 @@ class QueueSettings:
                 # for now we use default python random function
                 # can be extended with some more magic based on last_played and stuff
                 next_items = random.sample(next_items, len(next_items))
-
                 items = played_items + [cur_item] + next_items
                 asyncio.create_task(self._queue.update_items(items))
                 self._on_update("shuffle_enabled")
@@ -165,6 +165,18 @@ class QueueSettings:
             self._announce_volume_increase = volume_increase
             self._on_update("announce_volume_increase")
 
+    @property
+    def metadata_mode(self) -> MetadataMode:
+        """Return metadata mode setting."""
+        return self._metadata_mode
+
+    @metadata_mode.setter
+    def metadata_mode(self, mode: MetadataMode) -> None:
+        """Set metadata mode setting."""
+        if self._metadata_mode != mode:
+            self._metadata_mode = mode
+            self._on_update("metadata_mode")
+
     def to_dict(self) -> Dict[str, Any]:
         """Return dict from settings."""
         return {
@@ -177,6 +189,7 @@ class QueueSettings:
             "stream_type": self.stream_type.value,
             "max_sample_rate": self.max_sample_rate,
             "announce_volume_increase": self.announce_volume_increase,
+            "metadata_mode": self.metadata_mode.value,
         }
 
     def from_dict(self, d: Dict[str, Any]) -> None:
@@ -200,6 +213,9 @@ class QueueSettings:
         self._announce_volume_increase = int(
             d.get("announce_volume_increase", self._announce_volume_increase)
         )
+        self._metadata_mode = MetadataMode(
+            d.get("metadata_mode", self._metadata_mode.value)
+        )
 
     async def restore(self) -> None:
         """Restore state from db."""