Some optimizations for Airplay streaming (#1107)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 25 Feb 2024 18:59:57 +0000 (19:59 +0100)
committerGitHub <noreply@github.com>
Sun, 25 Feb 2024 18:59:57 +0000 (19:59 +0100)
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/process.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64
music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64
music_assistant/server/providers/airplay/bin/cliraop-macos-arm64
music_assistant/server/providers/opensubsonic/sonic_provider.py

index 8d46f9591602d7b965d7800ed769dea24e8bf822..03d898d48ebd2cec7ed0d58edc941c24be89d68e 100644 (file)
@@ -137,7 +137,7 @@ class MultiClientStreamJob:
             self._audio_task.cancel()
         for sub_queue in self.subscribed_players.values():
             with suppress(asyncio.QueueFull):
-                sub_queue.put_nowait(b"")
+                sub_queue.put_nowait(b"EOF")
 
     def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
         """Resolve the childplayer specific stream URL to this streamjob."""
@@ -167,7 +167,7 @@ class MultiClientStreamJob:
             and player_id not in self.workaround_players_seen
         ):
             self.workaround_players_seen.add(player_id)
-            yield b""
+            yield b"EOF"
             return
 
         try:
@@ -184,10 +184,10 @@ class MultiClientStreamJob:
                 # so that chunks can be pushed
                 self._all_clients_connected.set()
 
-            # keep reading audio chunks from the queue until we receive an empty one
+            # keep reading audio chunks from the queue until we receive an EOF chunk
             while True:
                 chunk = await sub_queue.get()
-                if chunk == b"":
+                if chunk == b"EOF":
                     # EOF chunk received
                     break
                 yield chunk
@@ -246,8 +246,8 @@ class MultiClientStreamJob:
 
             await self._put_chunk(chunk)
 
-        # mark EOF with empty chunk
-        await self._put_chunk(b"")
+        # mark EOF with EOF chunk
+        await self._put_chunk(b"EOF")
 
 
 def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
@@ -905,7 +905,9 @@ class StreamsController(CoreController):
                 queue.display_name,
                 queue_track.streamdetails.seconds_streamed,
             )
-
+        # end of queue flow: make sure we yield the last_fadeout_part
+        if last_fadeout_part:
+            yield last_fadeout_part
         self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
 
     async def _get_player_ffmpeg_args(
index ea6af71e4cea667f37682bf809153cc158486caa..0b3882a1df275b6d06063b1030eeee86cc4065f4 100644 (file)
@@ -63,8 +63,6 @@ class AsyncProcess:
         """Yield chunks of n size from the process stdout."""
         while True:
             chunk = await self.readexactly(n)
-            if chunk == b"":
-                break
             yield chunk
             if len(chunk) < n:
                 break
index ae299cbac40777fcacd37a3c7e0da097855f6de5..37ad643ec3a58320dd003cdae5a82a805da3310e 100644 (file)
@@ -13,11 +13,11 @@ from dataclasses import dataclass
 from random import randint, randrange
 from typing import TYPE_CHECKING
 
-from zeroconf import ServiceStateChange
+from zeroconf import IPVersion, ServiceStateChange
 from zeroconf.asyncio import AsyncServiceInfo
 
 from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
@@ -48,8 +48,6 @@ if TYPE_CHECKING:
 
 DOMAIN = "airplay"
 
-CONF_LATENCY = "latency"
-DEFAULT_LATENCY = 2000
 CONF_ENCRYPTION = "encryption"
 CONF_ALAC_ENCODE = "alac_encode"
 CONF_VOLUME_START = "volume_start"
@@ -58,18 +56,6 @@ CONF_PASSWORD = "password"
 PLAYER_CONFIG_ENTRIES = (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
-    ConfigEntry(
-        key=CONF_LATENCY,
-        type=ConfigEntryType.INTEGER,
-        range=(500, 4000),
-        default_value=DEFAULT_LATENCY,
-        label="Latency",
-        description="Sets the number of milliseconds of audio buffer in the player. "
-        "This is important to absorb network throughput jitter. \n"
-        "Increase this value if you notice network dropouts at the cost of a slower "
-        "response to commands.",
-        advanced=True,
-    ),
     ConfigEntry(
         key=CONF_ENCRYPTION,
         type=ConfigEntryType.BOOLEAN,
@@ -172,6 +158,13 @@ def get_model_from_am(am_property: str | None) -> tuple[str, str]:
     return (manufacturer, model)
 
 
+def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
+    """Get primary IP address from zeroconf discovery info."""
+    return next(
+        (x for x in discovery_info.parsed_addresses(IPVersion.V4Only) if x != "127.0.0.1"), None
+    )
+
+
 class AirplayStreamJob:
     """Object that holds the details of a stream job."""
 
@@ -184,13 +177,20 @@ class AirplayStreamJob:
         # with the named pipe used to send commands
         self.active_remote_id: str = str(randint(1000, 8000))
         self.start_ntp: int | None = None  # use as checksum
+        self._audio_buffer = asyncio.Queue(2)
         self._log_reader_task: asyncio.Task | None = None
+        self._audio_reader_task: asyncio.Task | None = None
         self._cliraop_proc: asyncio.subprocess.Process | None = None
+        self._stop_requested = False
 
     @property
     def running(self) -> bool:
         """Return bool if we're running."""
-        return self._cliraop_proc and self._cliraop_proc.returncode is None
+        return (
+            not self._stop_requested
+            and self._cliraop_proc
+            and self._cliraop_proc.returncode is None
+        )
 
     async def init_cliraop(self, start_ntp: int) -> None:
         """Initialize CLIRaop process for a player."""
@@ -198,10 +198,6 @@ class AirplayStreamJob:
         extra_args = []
         player_id = self.airplay_player.player_id
         mass_player = self.mass.players.get(player_id)
-        latency = self.mass.config.get_raw_player_config_value(
-            player_id, CONF_LATENCY, DEFAULT_LATENCY
-        )
-        extra_args += ["-l", str(latency)]
         if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
             extra_args += ["-e"]
         if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
@@ -223,7 +219,7 @@ class AirplayStreamJob:
             "-p",
             str(self.airplay_player.discovery_info.port),
             "-w",
-            str(2500 - sync_adjust),
+            str(2000 - sync_adjust),
             "-v",
             str(mass_player.volume_level),
             *extra_args,
@@ -247,26 +243,22 @@ class AirplayStreamJob:
             close_fds=True,
         )
         self._log_reader_task = asyncio.create_task(self._log_watcher())
+        self._audio_reader_task = asyncio.create_task(self._audio_reader())
 
     async def stop(self):
         """Stop playback and cleanup."""
         if not self.running:
             return
-        # prefer interactive command to our streamer
         await self.send_cli_command("ACTION=STOP")
-        # use communicate to clear stdin/stdout and wait for exit
-        try:
-            await asyncio.wait_for(self._cliraop_proc.wait(), 5)
-        except TimeoutError:
-            self.airplay_player.logger.error(  # noqa: TRY400
-                "RAOP process did not stop on time, attempting forced close."
-            )
-            self._cliraop_proc.kill()
-            await asyncio.wait_for(self._cliraop_proc.wait(), 5)
-        finally:
-            # stop background task
-            if self._log_reader_task and not self._log_reader_task.done():
-                self._log_reader_task.cancel()
+        self._stop_requested = True
+        # stop background tasks
+        if self._log_reader_task and not self._log_reader_task.done():
+            self._log_reader_task.cancel()
+        if self._audio_reader_task and not self._audio_reader_task.done():
+            self._audio_reader_task.cancel()
+
+        empty_queue(self._audio_buffer)
+        await asyncio.wait_for(self._cliraop_proc.communicate(), 30)
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
@@ -295,55 +287,82 @@ class AirplayStreamJob:
             if not line:
                 continue
             if "elapsed milliseconds:" in line:
+                # do not log this line, its too verbose
                 millis = int(line.split("elapsed milliseconds: ")[1])
                 mass_player.elapsed_time = millis / 1000
                 mass_player.elapsed_time_last_updated = time.time()
-                continue  # do not log this line, its too verbose
+                continue
             if "set pause" in line or "Pause at" in line:
+                logger.info("raop streaming paused")
                 mass_player.state = PlayerState.PAUSED
                 self.mass.players.update(airplay_player.player_id)
-            elif "Restarted at" in line or "restarting w/ pause" in line:
+                continue
+            if "Restarted at" in line or "restarting w/ pause" in line:
+                logger.info("raop streaming restarted after pause")
                 mass_player.state = PlayerState.PLAYING
                 self.mass.players.update(airplay_player.player_id)
-            elif "Stopped at" in line:
+                continue
+            if "Stopped at" in line:
+                logger.info("raop streaming stopped")
                 mass_player.state = PlayerState.IDLE
                 self.mass.players.update(airplay_player.player_id)
-            elif "restarting w/o pause" in line:
+                continue
+            if "restarting w/o pause" in line:
                 # streaming has started
+                logger.info("raop streaming started")
                 mass_player.state = PlayerState.PLAYING
                 mass_player.elapsed_time = 0
                 mass_player.elapsed_time_last_updated = time.time()
                 self.mass.players.update(airplay_player.player_id)
+                continue
+            if "lost packet out of backlog" in line:
+                logger.warning(line)
+                continue
+            # debug log everything else
             logger.debug(line)
 
         # if we reach this point, the process exited
-        airplay_player.logger.debug("Log watcher task finished...")
-        mass_player.state = PlayerState.IDLE
-        self.mass.players.update(airplay_player.player_id)
         logger.debug(
             "CLIRaop process stopped with errorcode %s",
             self._cliraop_proc.returncode,
         )
+        if (
+            airplay_player.active_stream
+            and airplay_player.active_stream.active_remote_id == self.active_remote_id
+        ):
+            mass_player.state = PlayerState.IDLE
+            self.mass.players.update(airplay_player.player_id)
+
+    async def _audio_reader(self) -> None:
+        """Read audio chunks from buffer and send them to the cliraop process."""
+        logger = self.airplay_player.logger
+        logger.debug("Audio reader started")
+        while self.running:
+            chunk = await self._audio_buffer.get()
+            if chunk == b"EOF":
+                # EOF chunk
+                break
+            self._cliraop_proc.stdin.write(chunk)
+            with suppress(BrokenPipeError):
+                await self._cliraop_proc.stdin.drain()
+        # send EOF
+        if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing():
+            self._cliraop_proc.stdin.write_eof()
+            with suppress(BrokenPipeError):
+                await self._cliraop_proc.stdin.drain()
+        logger.debug("Audio reader finished")
 
     async def write_chunk(self, data: bytes) -> None:
-        """Write a chunk of (pcm) data to the stdin of CLIRaop."""
-        if not self.running or not self._cliraop_proc.stdin.can_write_eof():
-            return
-        self._cliraop_proc.stdin.write(data)
-        if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+        """Write a chunk of (pcm) data to the audio buffer."""
+        if not self.running:
             return
-        with suppress(BrokenPipeError):
-            await self._cliraop_proc.stdin.drain()
+        await self._audio_buffer.put(data)
 
-    async def write_eof(self, data: bytes) -> None:
-        """Write a chunk of (pcm) data to the stdin of CLIRaop."""
-        if not self.running or not self._cliraop_proc.stdin.can_write_eof():
-            return
-        self._cliraop_proc.stdin.write_eof()
-        if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+    async def write_eof(self) -> None:
+        """Write end-of-file chunk to the audo buffer."""
+        if not self.running:
             return
-        with suppress(BrokenPipeError):
-            await self._cliraop_proc.stdin.drain()
+        await self._audio_buffer.put(b"EOF")
 
 
 @dataclass
@@ -352,6 +371,7 @@ class AirPlayPlayer:
 
     player_id: str
     discovery_info: AsyncServiceInfo
+    address: str
     logger: logging.Logger
     active_stream: AirplayStreamJob | None = None
 
@@ -418,11 +438,11 @@ class AirplayProvider(PlayerProvider):
         # handle update for existing device
         if airplay_player := self._players.get(player_id):
             if mass_player := self.mass.players.get(player_id):
-                cur_address = info.parsed_addresses()[0]
-                prev_address = airplay_player.discovery_info.parsed_addresses()[0]
-                if cur_address != prev_address:
+                cur_address = get_primary_ip_address(info)
+                if cur_address and cur_address != airplay_player.address:
+                    airplay_player.address = cur_address
                     airplay_player.logger.info(
-                        "Address updated from %s to %s", prev_address, cur_address
+                        "Address updated from %s to %s", airplay_player.address, cur_address
                     )
                     mass_player.device_info = DeviceInfo(
                         model=mass_player.device_info.model,
@@ -463,6 +483,8 @@ class AirplayProvider(PlayerProvider):
 
         - player_id: player_id of the player to handle the command.
         """
+        if existing_stream := self._stream_tasks.get(player_id):
+            existing_stream.cancel()
 
         async def stop_player(airplay_player: AirPlayPlayer) -> None:
             if airplay_player.active_stream:
@@ -522,7 +544,11 @@ class AirplayProvider(PlayerProvider):
             self._resync_handle.cancel()
             self._resync_handle = None
         # always stop existing stream first
-        await self.cmd_stop(player_id)
+        if existing_stream := self._stream_tasks.get(player_id):
+            existing_stream.cancel()
+        for airplay_player in self._get_sync_clients(player_id):
+            if airplay_player.active_stream and airplay_player.active_stream.running:
+                self.mass.create_task(airplay_player.active_stream.stop())
         # start streaming the queue (pcm) audio in a background task
         queue = self.mass.player_queues.get_active_queue(player_id)
         self._stream_tasks[player_id] = asyncio.create_task(
@@ -554,7 +580,12 @@ class AirplayProvider(PlayerProvider):
             self._resync_handle.cancel()
             self._resync_handle = None
         # always stop existing stream first
-        await self.cmd_stop(player_id)
+        if existing_stream := self._stream_tasks.get(player_id):
+            existing_stream.cancel()
+        async with asyncio.TaskGroup() as tg:
+            for airplay_player in self._get_sync_clients(player_id):
+                if airplay_player.active_stream and airplay_player.active_stream.running:
+                    tg.create_task(airplay_player.active_stream.stop())
         if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
             # TODO: resample on the fly here ?
             raise RuntimeError("Unsupported PCM format")
@@ -605,32 +636,23 @@ class AirplayProvider(PlayerProvider):
         async for pcm_chunk in audio_iterator:
             # send audio chunk to player(s)
             available_clients = 0
-            async with asyncio.TaskGroup() as tg:
-                for airplay_player in self._get_sync_clients(player_id):
-                    if (
-                        not airplay_player.active_stream
-                        or not airplay_player.active_stream.running
-                        or airplay_player.active_stream.start_ntp != start_ntp
-                    ):
-                        # catch when this stream is no longer active on the player
-                        continue
-                    available_clients += 1
-                    tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk))
-                    # send the progress report every 5 seconds
-                    now = time.time()
-                    if now - prev_progress_report >= 5:
-                        prev_progress_report = now
-                        tg.create_task(
-                            airplay_player.active_stream.send_cli_command(
-                                f"PROGRESS={int(queue.elapsed_time)}\n"
-                            )
-                        )
+            for airplay_player in self._get_sync_clients(player_id):
+                if (
+                    not airplay_player.active_stream
+                    or not airplay_player.active_stream.running
+                    or airplay_player.active_stream.start_ntp != start_ntp
+                ):
+                    # catch when this stream is no longer active on the player
+                    continue
+                available_clients += 1
+                await airplay_player.active_stream.write_chunk(pcm_chunk)
             if not available_clients:
                 # this streamjob is no longer active
                 return
 
             # send metadata to player(s) if needed
             # NOTE: this must all be done in separate tasks to not disturb audio
+            now = time.time()
             if queue and queue.current_item and queue.current_item.streamdetails:
                 metadata_checksum = (
                     queue.current_item.streamdetails.stream_title
@@ -638,9 +660,19 @@ class AirplayProvider(PlayerProvider):
                 )
                 if prev_metadata_checksum != metadata_checksum:
                     prev_metadata_checksum = metadata_checksum
+                    prev_progress_report = now
                     self.mass.create_task(self._send_metadata(player_id, queue))
+                # send the progress report every 5 seconds
+                elif now - prev_progress_report >= 5:
+                    prev_progress_report = now
+                    self.mass.create_task(self._send_progress(player_id, queue))
 
         # end of stream reached - write eof
+        self.logger.debug(
+            "Finished RAOP stream for Queue %s to %s",
+            queue.display_name,
+            "/".join(synced_player_ids),
+        )
         for airplay_player in self._get_sync_clients(player_id):
             if (
                 not airplay_player.active_stream
@@ -722,6 +754,9 @@ class AirplayProvider(PlayerProvider):
         group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
         group_leader.group_childs.remove(player_id)
         player.synced_to = None
+        # guard if this was the last sync child of the group player
+        if group_leader.group_childs == {group_leader.player_id}:
+            group_leader.group_childs.remove(group_leader.player_id)
         await self.cmd_stop(player_id)
         self.mass.players.update(player_id)
 
@@ -774,17 +809,17 @@ class AirplayProvider(PlayerProvider):
         self, player_id: str, display_name: str, info: AsyncServiceInfo
     ) -> None:
         """Handle setup of a new player that is discovered using mdns."""
-        address = info.parsed_addresses()[0]
-        # some guards if our info is valid/complete
-        if address == "127.0.0.1":
+        address = get_primary_ip_address(info)
+        if address is None:
             return
+        # some guards if our info is valid/complete
         if "md" not in info.decoded_properties:
             return
         if "et" not in info.decoded_properties:
             return
         self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
         self._players[player_id] = AirPlayPlayer(
-            player_id, discovery_info=info, logger=self.logger.getChild(player_id)
+            player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id)
         )
         manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
         if "apple tv" in model.lower():
@@ -870,16 +905,17 @@ class AirplayProvider(PlayerProvider):
 
             player_id = airplay_player.player_id
             mass_player = self.mass.players.get(player_id)
+            active_queue = self.mass.player_queues.get_active_queue(player_id)
             if path == "/ctrl-int/1/nextitem":
-                self.mass.create_task(self.mass.player_queues.next(player_id))
+                self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
             elif path == "/ctrl-int/1/previtem":
-                self.mass.create_task(self.mass.player_queues.previous(player_id))
+                self.mass.create_task(self.mass.player_queues.previous(active_queue.queue_id))
             elif path == "/ctrl-int/1/play":
-                self.mass.create_task(self.mass.player_queues.play(player_id))
+                self.mass.create_task(self.mass.player_queues.play(active_queue.queue_id))
             elif path == "/ctrl-int/1/playpause":
-                self.mass.create_task(self.mass.player_queues.play_pause(player_id))
+                self.mass.create_task(self.mass.player_queues.play_pause(active_queue.queue_id))
             elif path == "/ctrl-int/1/stop":
-                self.mass.create_task(self.cmd_stop(player_id))
+                self.mass.create_task(self.mass.player_queues.stop(active_queue.queue_id))
             elif path == "/ctrl-int/1/volumeup":
                 self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
             elif path == "/ctrl-int/1/volumedown":
@@ -887,10 +923,12 @@ class AirplayProvider(PlayerProvider):
             elif path == "/ctrl-int/1/shuffle_songs":
                 queue = self.mass.player_queues.get(player_id)
                 self.mass.create_task(
-                    self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled)
+                    self.mass.player_queues.set_shuffle(
+                        active_queue.queue_id, not queue.shuffle_enabled
+                    )
                 )
             elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
-                self.mass.create_task(self.mass.player_queues.pause(player_id))
+                self.mass.create_task(self.mass.player_queues.pause(active_queue.queue_id))
             elif "dmcp.device-volume=" in path:
                 raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
                 volume = convert_airplay_volume(raop_volume)
@@ -944,10 +982,10 @@ class AirplayProvider(PlayerProvider):
                 album = _album.name
 
         cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
-        cmd += f"DURATION={duration}\nACTION=SENDMETA\n"
+        cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
 
         for airplay_player in self._get_sync_clients(player_id):
-            if not airplay_player.active_stream:
+            if not airplay_player.active_stream or not airplay_player.active_stream.running:
                 continue
             await airplay_player.active_stream.send_cli_command(cmd)
 
@@ -960,6 +998,16 @@ class AirplayProvider(PlayerProvider):
             queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
         )
         for airplay_player in self._get_sync_clients(player_id):
-            if not airplay_player.active_stream:
+            if not airplay_player.active_stream or not airplay_player.active_stream.running:
                 continue
             await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n")
+
+    async def _send_progress(self, player_id: str, queue: PlayerQueue) -> None:
+        """Send progress report to player (and connected sync childs)."""
+        if not queue or not queue.current_item:
+            return
+        progress = int(queue.corrected_elapsed_time)
+        for airplay_player in self._get_sync_clients(player_id):
+            if not airplay_player.active_stream or not airplay_player.active_stream.running:
+                continue
+            await airplay_player.active_stream.send_cli_command(f"PROGRESS={progress}\n")
index 09d268195c3277cec67d35b98167bf457143d247..bcfec2c587a9d54181ff53523510ebf9308215cc 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ
index 0ed63539b2ac75f1565946bb504f529f5c655f25..3face4f88e4d90d9cfc27583e05705cdbea196fa 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ
index 20689e636cebef286c4b0957906abdbf75eb986e..9cc60b1bb007daa8fc4a9adddc8bf249656a3467 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ
index bc77f4ca8da5b7e2ee5cb55e5d10b0aef642a888..5c432e7f28a67e03d0bc8f4b5df991c6eea4aaa3 100644 (file)
@@ -683,7 +683,7 @@ class OpenSonicProvider(MusicProvider):
                         audio_buffer.put(chunk), self.mass.loop
                     ).result()
             # send empty chunk when we're done
-            asyncio.run_coroutine_threadsafe(audio_buffer.put(b""), self.mass.loop).result()
+            asyncio.run_coroutine_threadsafe(audio_buffer.put(b"EOF"), self.mass.loop).result()
 
         # fire up an executor thread to put the audio chunks (threadsafe) on the audio buffer
         streamer_task = self.mass.loop.run_in_executor(None, _streamer)
@@ -691,7 +691,7 @@ class OpenSonicProvider(MusicProvider):
             while True:
                 # keep reading from the audio buffer until there is no more data
                 chunk = await audio_buffer.get()
-                if chunk == b"":
+                if chunk == b"EOF":
                     break
                 yield chunk
         finally: