fix issues with chromecast
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 19 Sep 2020 11:44:32 +0000 (13:44 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 19 Sep 2020 11:44:32 +0000 (13:44 +0200)
music_assistant/config.py
music_assistant/constants.py
music_assistant/models/player_queue.py
music_assistant/player_manager.py
music_assistant/providers/chromecast/__init__.py
music_assistant/providers/chromecast/player.py
music_assistant/stream_manager.py [new file with mode: 0755]

index 1395e2f036dc7f1c3891ee63d892bb6f4c3e7a35..49c1fb339016ed47ffab7576040d8a0d33daaa06 100755 (executable)
@@ -252,7 +252,7 @@ class ConfigItem:
                         self.mass.get_provider(self._parent_item_key).async_on_reload()
                     )
                 if self._base_type == ConfigBaseType.PLAYER:
-
+                    # force update of player if it's config changed
                     player = self.mass.player_manager.get_player(self._parent_item_key)
                     if player:
                         self.mass.add_job(
index 4fe7eb74c6e2f9f953c01016643ab092f1d9f858..df4939ba56f538e32b4be2a9317620822c3d5536 100755 (executable)
@@ -1,6 +1,6 @@
 """All constants for Music Assistant."""
 
-__version__ = "0.0.36"
+__version__ = "0.0.37"
 REQUIRED_PYTHON_VER = "3.7"
 
 CONF_USERNAME = "username"
index c9b9a495901104cf3de6e2b7ae52fd02e78ce055..df9ee7d980595b65fa7407ee43dace836cde09fe 100755 (executable)
@@ -54,10 +54,10 @@ class QueueItem(Track):
 class PlayerQueue:
     """Class that holds the queue items for a player."""
 
-    def __init__(self, mass, player):
+    def __init__(self, mass, player_id: str):
         """Initialize class."""
         self.mass = mass
-        self._player = player
+        self._player_id = player_id
         self._items = []
         self._shuffle_enabled = False
         self._repeat_enabled = False
@@ -78,12 +78,12 @@ class PlayerQueue:
     @property
     def player(self):
         """Return handle to player."""
-        return self._player
+        return self.mass.player_manager.get_player(self._player_id)
 
     @property
     def player_id(self):
         """Return handle to player."""
-        return self._player.player_id
+        return self._player_id
 
     @property
     def shuffle_enabled(self):
@@ -213,9 +213,21 @@ class PlayerQueue:
         For example if crossfading is requested but a player doesn't natively support it
         we will send a constant stream of audio to the player with all tracks.
         """
-        supports_crossfade = PlayerFeature.CROSSFADE in self.player.features
-        supports_queue = PlayerFeature.QUEUE in self.player.features
-        return not supports_crossfade if self.crossfade_enabled else not supports_queue
+        return (
+            not self.supports_crossfade
+            if self.crossfade_enabled
+            else not self.supports_queue
+        )
+
+    @property
+    def supports_queue(self):
+        """Return if this player supports native queue."""
+        return PlayerFeature.QUEUE in self.player.features
+
+    @property
+    def supports_crossfade(self):
+        """Return if this player supports native crossfade."""
+        return PlayerFeature.CROSSFADE in self.player.features
 
     @callback
     def get_item(self, index):
@@ -240,7 +252,6 @@ class PlayerQueue:
             return
         if self.use_queue_stream:
             return await self.async_play_index(self.cur_index + 1)
-        await self.mass.player_manager.async_cmd_power_on(self.player_id)
         return await self.mass.player_manager.get_player_provider(
             self.player_id
         ).async_cmd_next(self.player_id)
@@ -249,7 +260,6 @@ class PlayerQueue:
         """Play the previous track in the queue."""
         if self.cur_index is None:
             return
-        await self.mass.player_manager.async_cmd_power_on(self.player_id)
         if self.use_queue_stream:
             return await self.async_play_index(self.cur_index - 1)
         return await self.mass.player_manager.async_cmd_previous(self.player_id)
@@ -257,10 +267,8 @@ class PlayerQueue:
     async def async_resume(self):
         """Resume previous queue."""
         if self.items:
-            await self.mass.player_manager.async_cmd_power_on(self.player_id)
             prev_index = self.cur_index
-            supports_queue = PlayerFeature.QUEUE in self.player.features
-            if self.use_queue_stream or not supports_queue:
+            if self.use_queue_stream or not self.supports_queue:
                 await self.async_play_index(prev_index)
             else:
                 # at this point we don't know if the queue is synced with the player
@@ -272,14 +280,12 @@ class PlayerQueue:
                 await self.async_play_index(prev_index)
         else:
             LOGGER.warning(
-                "resume queue requested for %s but queue is empty", self.player.name
+                "resume queue requested for %s but queue is empty", self.player_id
             )
 
     async def async_play_index(self, index):
         """Play item at index X in queue."""
-        await self.mass.player_manager.async_cmd_power_on(self.player_id)
         player_prov = self.mass.player_manager.get_player_provider(self.player_id)
-        supports_queue = PlayerFeature.QUEUE in self.player.features
         if not isinstance(index, int):
             index = self.__index_by_id(index)
         if not len(self.items) > index:
@@ -297,7 +303,7 @@ class PlayerQueue:
             return await player_prov.async_cmd_play_uri(
                 self.player_id, queue_stream_uri
             )
-        if supports_queue:
+        if self.supports_queue:
             try:
                 return await player_prov.async_cmd_queue_play_index(
                     self.player_id, index
@@ -342,14 +348,12 @@ class PlayerQueue:
 
     async def async_load(self, queue_items: List[QueueItem]):
         """Load (overwrite) queue with new items."""
-        await self.mass.player_manager.async_cmd_power_on(self.player_id)
-        supports_queue = PlayerFeature.QUEUE in self.player.features
         for index, item in enumerate(queue_items):
             item.sort_index = index
         if self._shuffle_enabled:
             queue_items = self.__shuffle_items(queue_items)
         self._items = queue_items
-        if self.use_queue_stream or not supports_queue:
+        if self.use_queue_stream or not self.supports_queue:
             await self.async_play_index(0)
         else:
             player_prov = self.mass.player_manager.get_player_provider(self.player_id)
@@ -366,7 +370,6 @@ class PlayerQueue:
             :param queue_items: a list of QueueItem
             :param offset: offset from current queue position
         """
-        supports_queue = PlayerFeature.QUEUE in self.player.features
 
         if (
             not self.items
@@ -393,7 +396,7 @@ class PlayerQueue:
                 + queue_items
                 + self._items[insert_at_index:]
             )
-        if self.use_queue_stream or not supports_queue:
+        if self.use_queue_stream:
             if offset == 0:
                 await self.async_play_index(insert_at_index)
         else:
@@ -417,7 +420,6 @@ class PlayerQueue:
 
     async def async_append(self, queue_items: List[QueueItem]):
         """Append new items at the end of the queue."""
-        supports_queue = PlayerFeature.QUEUE in self.player.features
         for index, item in enumerate(queue_items):
             item.sort_index = len(self.items) + index
         if self.shuffle_enabled:
@@ -427,7 +429,7 @@ class PlayerQueue:
             items = played_items + [self.cur_item] + next_items
             return await self.async_update(items)
         self._items = self._items + queue_items
-        if supports_queue and not self.use_queue_stream:
+        if self.supports_queue and not self.use_queue_stream:
             # send queue to player's own implementation
             player_prov = self.mass.player_manager.get_player_provider(self.player_id)
             try:
@@ -446,9 +448,8 @@ class PlayerQueue:
 
     async def async_update(self, queue_items: List[QueueItem]):
         """Update the existing queue items, mostly caused by reordering."""
-        supports_queue = PlayerFeature.QUEUE in self.player.features
         self._items = queue_items
-        if supports_queue and not self.use_queue_stream:
+        if self.supports_queue and not self.use_queue_stream:
             # send queue to player's own implementation
             player_prov = self.mass.player_manager.get_player_provider(self.player_id)
             try:
@@ -467,10 +468,9 @@ class PlayerQueue:
 
     async def async_clear(self):
         """Clear all items in the queue."""
-        supports_queue = PlayerFeature.QUEUE in self.player.features
         await self.mass.player_manager.async_cmd_stop(self.player_id)
         self._items = []
-        if supports_queue:
+        if self.supports_queue:
             # send queue cmd to player's own implementation
             player_prov = self.mass.player_manager.get_player_provider(self.player_id)
             try:
index 119b17b762c5e16cacd4484d7fd2e044749f814f..207b8f529e03c839a0eafd05de8dcb89c44396cb 100755 (executable)
@@ -98,9 +98,8 @@ class PlayerManager:
     def get_player(self, player_id: str) -> Player:
         """Return player by player_id or None if player does not exist."""
         player = self._players.get(player_id)
-        if not player or not player.available:
+        if not player:
             LOGGER.warning("Player %s is not available!", player_id)
-            return None
         return player
 
     @callback
@@ -147,7 +146,9 @@ class PlayerManager:
         if is_new_player:
             # create player queue
             if player.player_id not in self._player_queues:
-                self._player_queues[player.player_id] = PlayerQueue(self.mass, player)
+                self._player_queues[player.player_id] = PlayerQueue(
+                    self.mass, player.player_id
+                )
             # TODO: turn on player if it was previously turned on ?
             LOGGER.info(
                 "New player added: %s/%s",
index ac7177c8b1accca2d1f0ae77c75e8c893fe54420..99cd6066cde9a0ed22d7d897f4f1ffac2107cf23 100644 (file)
@@ -59,9 +59,13 @@ class ChromecastProvider(PlayerProvider):
             self.__chromecast_remove_callback,
             self.__chromecast_add_update_callback,
         )
-        self._browser = pychromecast.discovery.start_discovery(
-            self._listener, self.mass.zeroconf
-        )
+
+        def start_discovery():
+            self._browser = pychromecast.discovery.start_discovery(
+                self._listener, self.mass.zeroconf
+            )
+
+        self.mass.add_job(start_discovery)
         return True
 
     async def async_on_stop(self):
@@ -76,7 +80,7 @@ class ChromecastProvider(PlayerProvider):
 
     async def async_cmd_play_uri(self, player_id: str, uri: str):
         """
-        Play the specified uri/url on the goven player.
+        Play the specified uri/url on the given player.
 
             :param player_id: player_id of the player to handle the command.
         """
@@ -191,15 +195,13 @@ class ChromecastProvider(PlayerProvider):
             port=service[5],
         )
         player_id = cast_info.uuid
-        LOGGER.debug(
-            "Chromecast discovered: %s (%s)", cast_info.friendly_name, player_id
-        )
         if player_id in self._players:
             # player already added, the player will take care of reconnects itself.
+            return
+        else:
             LOGGER.debug(
-                "Player is already added:  %s (%s)", cast_info.friendly_name, player_id
+                "Chromecast discovered: %s (%s)", cast_info.friendly_name, player_id
             )
-        else:
             player = ChromecastPlayer(self.mass, cast_info)
             self._players[player_id] = player
             self.mass.add_job(self.mass.player_manager.async_add_player(player))
index 11b347ae256e4f5c98e82084e1c9271ff4883143..a5d5c6923c73586d6d8e5c92424dc366a8469763 100644 (file)
@@ -210,7 +210,6 @@ class ChromecastPlayer:
 
     def new_cast_status(self, cast_status):
         """Handle updates of the cast status."""
-        LOGGER.debug("received cast status for %s", self.name)
         self.cast_status = cast_status
         self._is_speaker_group = (
             self._cast_info.is_audio_group
@@ -224,7 +223,6 @@ class ChromecastPlayer:
 
     def new_media_status(self, media_status):
         """Handle updates of the media status."""
-        LOGGER.debug("received media_status for %s", self.name)
         self.media_status = media_status
         self.mass.add_job(self.mass.player_manager.async_update_player(self))
         if media_status.player_is_playing:
@@ -232,7 +230,6 @@ class ChromecastPlayer:
 
     def new_connection_status(self, connection_status):
         """Handle updates of connection status."""
-        LOGGER.debug("received connection_status for %s", self._cast_info.friendly_name)
         if connection_status.status == CONNECTION_STATUS_DISCONNECTED:
             self._available = False
             self._invalidate()
@@ -305,7 +302,9 @@ class ChromecastPlayer:
             LOGGER.warning("Ignore player command: Socket client is not connected.")
             return
         if self.media_status and (
-            self.media_status.player_is_playing or self.media_status.player_is_paused
+            self.media_status.player_is_playing
+            or self.media_status.player_is_paused
+            or self.media_status.player_is_idle
         ):
             self._chromecast.media_controller.stop()
         self._powered = False
diff --git a/music_assistant/stream_manager.py b/music_assistant/stream_manager.py
new file mode 100755 (executable)
index 0000000..990d71d
--- /dev/null
@@ -0,0 +1,606 @@
+"""
+StreamManager: handles all audio streaming to players.
+
+Either by sending tracks one by one or send one continuous stream
+of music with crossfade/gapless support (queue stream).
+"""
+import asyncio
+import gc
+import io
+import logging
+import shlex
+import subprocess
+import threading
+import urllib
+from contextlib import suppress
+
+import aiohttp
+import pyloudnorm
+import soundfile
+from aiofile import AIOFile, Reader
+from aiohttp import web
+from music_assistant.constants import EVENT_STREAM_ENDED, EVENT_STREAM_STARTED
+from music_assistant.models.media_types import MediaType
+from music_assistant.models.player_queue import QueueItem
+from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
+from music_assistant.utils import create_tempfile, decrypt_string, get_ip, try_parse_int
+from music_assistant.web import require_local_subnet
+
+LOGGER = logging.getLogger("mass")
+
+MusicAssistantType = "MusicAssistant"
+
+
+class StreamManager:
+    """Built-in streamer utilizing SoX."""
+
+    def __init__(self, mass: MusicAssistantType):
+        """Initialize class."""
+        self.mass = mass
+        self.local_ip = get_ip()
+        self.analyze_jobs = {}
+        self.stream_clients = []
+
+    async def async_get_audio_stream(self, streamdetails: StreamDetails):
+        """Get the (original) audio data for the given streamdetails. Generator."""
+        stream_path = decrypt_string(streamdetails.path)
+        stream_type = StreamType(streamdetails.type)
+
+        if streamdetails.content_type == ContentType.AAC:
+            # support for AAC created with ffmpeg in between
+            stream_type = StreamType.EXECUTABLE
+            streamdetails.content_type = ContentType.FLAC
+            stream_path = f'ffmpeg -v quiet -i "{stream_path}" -f flac -'
+
+        if stream_type == StreamType.URL:
+            async with self.mass.http_session.get(stream_path) as response:
+                async for chunk in response.content.iter_any():
+                    yield chunk
+        elif stream_type == StreamType.FILE:
+            async with AIOFile(stream_path) as afp:
+                async for chunk in Reader(afp):
+                    yield chunk
+        elif stream_type == StreamType.EXECUTABLE:
+            args = shlex.split(stream_path)
+            process = await asyncio.create_subprocess_exec(
+                *args, stdout=asyncio.subprocess.PIPE
+            )
+            try:
+                async for chunk in process.stdout:
+                    yield chunk
+            except (asyncio.CancelledError, StopAsyncIteration, GeneratorExit) as exc:
+                LOGGER.error("process aborted")
+                raise exc
+            finally:
+                process.terminate()
+                while True:
+                    data = await process.stdout.read()
+                    if not data:
+                        break
+                LOGGER.error("process ended")
+
+    async def async_stream_media_item(self, http_request):
+        """Start stream for a single media item, player independent."""
+        # make sure we have valid params
+        media_type = MediaType.from_string(http_request.match_info["media_type"])
+        if media_type not in [MediaType.Track, MediaType.Radio]:
+            return web.Response(status=404, reason="Media item is not playable!")
+        provider = http_request.match_info["provider"]
+        item_id = http_request.match_info["item_id"]
+        player_id = http_request.remote  # fake player id
+        # prepare headers as audio/flac content
+        resp = web.StreamResponse(
+            status=200, reason="OK", headers={"Content-Type": "audio/flac"}
+        )
+        await resp.prepare(http_request)
+        # collect tracks to play
+        media_item = await self.mass.music_manager.async_get_item(
+            item_id, provider, media_type
+        )
+        queue_item = QueueItem(media_item)
+        # run the streamer in executor to prevent the subprocess locking up our eventloop
+        cancelled = threading.Event()
+        bg_task = self.mass.loop.run_in_executor(
+            None,
+            self.__get_queue_item_stream,
+            player_id,
+            queue_item,
+            resp,
+            cancelled,
+        )
+        # let the streaming begin!
+        try:
+            await asyncio.gather(bg_task)
+        except (
+            asyncio.CancelledError,
+            aiohttp.ClientConnectionError,
+            asyncio.TimeoutError,
+        ) as exc:
+            cancelled.set()
+            raise exc  # re-raise
+        return resp
+
+    @require_local_subnet
+    async def async_stream(self, http_request):
+        """Start stream for a player."""
+        # make sure we have valid params
+        player_id = http_request.match_info.get("player_id", "")
+        player_queue = self.mass.player_manager.get_player_queue(player_id)
+        if not player_queue:
+            return web.Response(status=404, reason="Player(queue) not found!")
+        if not player_queue.use_queue_stream:
+            queue_item_id = http_request.match_info.get("queue_item_id")
+            queue_item = player_queue.by_item_id(queue_item_id)
+            if not queue_item:
+                return web.Response(status=404, reason="Invalid Queue item Id")
+        # prepare headers as audio/flac content
+        resp = web.StreamResponse(
+            status=200, reason="OK", headers={"Content-Type": "audio/flac"}
+        )
+        await resp.prepare(http_request)
+        # run the streamer in executor to prevent the subprocess locking up our eventloop
+        cancelled = threading.Event()
+        if player_queue.use_queue_stream:
+            bg_task = self.mass.loop.run_in_executor(
+                None, self.__get_queue_stream, player_id, resp, cancelled
+            )
+        else:
+            bg_task = self.mass.loop.run_in_executor(
+                None,
+                self.__get_queue_item_stream,
+                player_id,
+                queue_item,
+                resp,
+                cancelled,
+            )
+        # let the streaming begin!
+        try:
+            await asyncio.gather(bg_task)
+        except (
+            asyncio.CancelledError,
+            aiohttp.ClientConnectionError,
+            asyncio.TimeoutError,
+        ) as exc:
+            cancelled.set()
+            raise exc  # re-raise
+        return resp
+
+    def __get_queue_item_stream(self, player_id, queue_item, buffer, cancelled):
+        """Start streaming single queue track."""
+        # pylint: disable=unused-variable
+        LOGGER.debug(
+            "stream single queue track started for track %s on player %s",
+            queue_item.name,
+            player_id,
+        )
+        for is_last_chunk, audio_chunk in self.__get_audio_stream(
+            player_id, queue_item, cancelled
+        ):
+            if cancelled.is_set():
+                # http session ended
+                # we must consume the data to prevent hanging subprocess instances
+                continue
+            # put chunk in buffer
+            with suppress((BrokenPipeError, ConnectionResetError)):
+                asyncio.run_coroutine_threadsafe(
+                    buffer.write(audio_chunk), self.mass.loop
+                ).result()
+        # all chunks received: streaming finished
+        if cancelled.is_set():
+            LOGGER.debug(
+                "stream single track interrupted for track %s on player %s",
+                queue_item.name,
+                player_id,
+            )
+        else:
+            # indicate EOF if no more data
+            with suppress((BrokenPipeError, ConnectionResetError)):
+                asyncio.run_coroutine_threadsafe(
+                    buffer.write_eof(), self.mass.loop
+                ).result()
+
+            LOGGER.debug(
+                "stream single track finished for track %s on player %s",
+                queue_item.name,
+                player_id,
+            )
+
+    def __get_queue_stream(self, player_id, buffer, cancelled):
+        """Start streaming all queue tracks."""
+        player_conf = self.mass.config.get_player_config(player_id)
+        player_queue = self.mass.player_manager.get_player_queue(player_id)
+        sample_rate = try_parse_int(player_conf["max_sample_rate"])
+        fade_length = try_parse_int(player_conf["crossfade_duration"])
+        if not sample_rate or sample_rate < 44100:
+            sample_rate = 96000
+        if fade_length:
+            fade_bytes = int(sample_rate * 4 * 2 * fade_length)
+        else:
+            fade_bytes = int(sample_rate * 4 * 2 * 6)
+        pcm_args = "raw -b 32 -c 2 -e signed-integer -r %s" % sample_rate
+        args = "sox -t %s - -t flac -C 0 -" % pcm_args
+        # start sox process
+        args = shlex.split(args)
+        sox_proc = subprocess.Popen(
+            args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE
+        )
+
+        def fill_buffer():
+            while True:
+                chunk = sox_proc.stdout.read(128000)  # noqa
+                if not chunk:
+                    break
+                if chunk and not cancelled.is_set():
+                    with suppress((BrokenPipeError, ConnectionResetError)):
+                        asyncio.run_coroutine_threadsafe(
+                            buffer.write(chunk), self.mass.loop
+                        ).result()
+                del chunk
+            # indicate EOF if no more data
+            if not cancelled.is_set():
+                with suppress((BrokenPipeError, ConnectionResetError)):
+                    asyncio.run_coroutine_threadsafe(
+                        buffer.write_eof(), self.mass.loop
+                    ).result()
+
+        # start fill buffer task in background
+        fill_buffer_thread = threading.Thread(target=fill_buffer)
+        fill_buffer_thread.start()
+
+        LOGGER.info("Start Queue Stream for player %s ", player_id)
+        is_start = True
+        last_fadeout_data = b""
+        while True:
+            if cancelled.is_set():
+                break
+            # get the (next) track in queue
+            if is_start:
+                # report start of queue playback so we can calculate current track/duration etc.
+                queue_track = self.mass.add_job(
+                    player_queue.async_start_queue_stream()
+                ).result()
+                is_start = False
+            else:
+                queue_track = player_queue.next_item
+            if not queue_track:
+                LOGGER.debug("no (more) tracks left in queue")
+                break
+            LOGGER.debug(
+                "Start Streaming queue track: %s (%s) on player %s",
+                queue_track.item_id,
+                queue_track.name,
+                player_id,
+            )
+            fade_in_part = b""
+            cur_chunk = 0
+            prev_chunk = None
+            bytes_written = 0
+            # handle incoming audio chunks
+            for is_last_chunk, chunk in self.__get_audio_stream(
+                player_id,
+                queue_track,
+                cancelled,
+                chunksize=fade_bytes,
+                resample=sample_rate,
+            ):
+                cur_chunk += 1
+
+                # HANDLE FIRST PART OF TRACK
+                if not chunk and cur_chunk == 1 and is_last_chunk:
+                    LOGGER.warning("Stream error, skip track %s", queue_track.item_id)
+                    break
+                if cur_chunk <= 2 and not last_fadeout_data:
+                    # no fadeout_part available so just pass it to the output directly
+                    sox_proc.stdin.write(chunk)
+                    bytes_written += len(chunk)
+                    del chunk
+                elif cur_chunk == 1 and last_fadeout_data:
+                    prev_chunk = chunk
+                    del chunk
+                # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
+                elif cur_chunk == 2 and last_fadeout_data:
+                    # combine the first 2 chunks and strip off silence
+                    args = "sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%" % (
+                        pcm_args,
+                        pcm_args,
+                    )
+                    first_part, _ = subprocess.Popen(
+                        args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE
+                    ).communicate(prev_chunk + chunk)
+                    if len(first_part) < fade_bytes:
+                        # part is too short after the strip action?!
+                        # so we just use the full first part
+                        first_part = prev_chunk + chunk
+                    fade_in_part = first_part[:fade_bytes]
+                    remaining_bytes = first_part[fade_bytes:]
+                    del first_part
+                    # do crossfade
+                    crossfade_part = self.__crossfade_pcm_parts(
+                        fade_in_part, last_fadeout_data, pcm_args, fade_length
+                    )
+                    sox_proc.stdin.write(crossfade_part)
+                    bytes_written += len(crossfade_part)
+                    del crossfade_part
+                    del fade_in_part
+                    last_fadeout_data = b""
+                    # also write the leftover bytes from the strip action
+                    sox_proc.stdin.write(remaining_bytes)
+                    bytes_written += len(remaining_bytes)
+                    del remaining_bytes
+                    del chunk
+                    prev_chunk = None  # needed to prevent this chunk being sent again
+                # HANDLE LAST PART OF TRACK
+                elif prev_chunk and is_last_chunk:
+                    # last chunk received so create the last_part
+                    # with the previous chunk and this chunk
+                    # and strip off silence
+                    args = (
+                        "sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse"
+                        % (pcm_args, pcm_args)
+                    )
+                    last_part, _ = subprocess.Popen(
+                        args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE
+                    ).communicate(prev_chunk + chunk)
+                    if len(last_part) < fade_bytes:
+                        # part is too short after the strip action
+                        # so we just use the entire original data
+                        last_part = prev_chunk + chunk
+                        if len(last_part) < fade_bytes:
+                            LOGGER.warning(
+                                "Not enough data for crossfade: %s", len(last_part)
+                            )
+                    if (
+                        not player_queue.crossfade_enabled
+                        or len(last_part) < fade_bytes
+                    ):
+                        # crossfading is not enabled so just pass the (stripped) audio data
+                        sox_proc.stdin.write(last_part)
+                        bytes_written += len(last_part)
+                        del last_part
+                        del chunk
+                    else:
+                        # handle crossfading support
+                        # store fade section to be picked up for next track
+                        last_fadeout_data = last_part[-fade_bytes:]
+                        remaining_bytes = last_part[:-fade_bytes]
+                        # write remaining bytes
+                        sox_proc.stdin.write(remaining_bytes)
+                        bytes_written += len(remaining_bytes)
+                        del last_part
+                        del remaining_bytes
+                        del chunk
+                # MIDDLE PARTS OF TRACK
+                else:
+                    # middle part of the track
+                    # keep previous chunk in memory so we have enough
+                    # samples to perform the crossfade
+                    if prev_chunk:
+                        sox_proc.stdin.write(prev_chunk)
+                        bytes_written += len(prev_chunk)
+                        prev_chunk = chunk
+                    else:
+                        prev_chunk = chunk
+                    del chunk
+            # end of the track reached
+            if cancelled.is_set():
+                # break out the loop if the http session is cancelled
+                break
+            # update actual duration to the queue for more accurate now playing info
+            accurate_duration = bytes_written / int(sample_rate * 4 * 2)
+            queue_track.duration = accurate_duration
+            LOGGER.debug(
+                "Finished Streaming queue track: %s (%s) on player %s",
+                queue_track.item_id,
+                queue_track.name,
+                player_id,
+            )
+            # run garbage collect manually to avoid too much memory fragmentation
+            gc.collect()
+        # end of queue reached, pass last fadeout bits to final output
+        if last_fadeout_data and not cancelled.is_set():
+            sox_proc.stdin.write(last_fadeout_data)
+            del last_fadeout_data
+        # END OF QUEUE STREAM
+        sox_proc.stdin.close()
+        sox_proc.terminate()
+        sox_proc.communicate()
+        fill_buffer_thread.join()
+        # run garbage collect manually to avoid too much memory fragmentation
+        gc.collect()
+        if cancelled.is_set():
+            LOGGER.info("streaming of queue for player %s interrupted", player_id)
+        else:
+            LOGGER.info("streaming of queue for player %s completed", player_id)
+
+    def __get_audio_stream(
+        self, player_id, queue_item, cancelled, chunksize=128000, resample=None
+    ):
+        """Get audio stream from provider and apply additional effects/processing if needed."""
+        streamdetails = self.mass.add_job(
+            self.mass.music_manager.async_get_stream_details(queue_item, player_id)
+        ).result()
+        if not streamdetails:
+            LOGGER.warning("no stream details for %s", queue_item.name)
+            yield (True, b"")
+            return
+        # get sox effects and resample options
+        sox_options = self.__get_player_sox_options(player_id, streamdetails)
+        outputfmt = "flac -C 0"
+        if resample:
+            outputfmt = "raw -b 32 -c 2 -e signed-integer"
+            sox_options += " rate -v %s" % resample
+        streamdetails.sox_options = sox_options
+        # determine how to proceed based on input file type
+        if streamdetails.content_type == ContentType.AAC:
+            # support for AAC created with ffmpeg in between
+            args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (
+                decrypt_string(streamdetails.path),
+                outputfmt,
+                sox_options,
+            )
+            process = subprocess.Popen(
+                args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize
+            )
+        elif streamdetails.type in [StreamType.URL, StreamType.FILE]:
+            args = 'sox -t %s "%s" -t %s - %s' % (
+                streamdetails.content_type.name,
+                decrypt_string(streamdetails.path),
+                outputfmt,
+                sox_options,
+            )
+            args = shlex.split(args)
+            process = subprocess.Popen(
+                args, shell=False, stdout=subprocess.PIPE, bufsize=chunksize
+            )
+        elif streamdetails.type == StreamType.EXECUTABLE:
+            args = "%s | sox -t %s - -t %s - %s" % (
+                decrypt_string(streamdetails.path),
+                streamdetails.content_type.name,
+                outputfmt,
+                sox_options,
+            )
+            process = subprocess.Popen(
+                args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize
+            )
+        else:
+            LOGGER.warning("no streaming options for %s", queue_item.name)
+            yield (True, b"")
+            return
+        # fire event that streaming has started for this track
+        self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails)
+        # yield chunks from stdout
+        # we keep 1 chunk behind to detect end of stream properly
+        prev_chunk = b""
+        while True:
+            if cancelled.is_set():
+                # http session ended
+                # send terminate and pick up left over bytes
+                process.terminate()
+                chunk, _ = process.communicate()
+            else:
+                # read exactly chunksize of data
+                chunk = process.stdout.read(chunksize)
+            if len(chunk) < chunksize:
+                # last chunk
+                yield (True, prev_chunk + chunk)
+                break
+            if prev_chunk:
+                yield (False, prev_chunk)
+            prev_chunk = chunk
+        # fire event that streaming has ended
+        if not cancelled.is_set():
+            streamdetails.seconds_played = queue_item.duration
+            self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)
+            # send task to background to analyse the audio
+            if queue_item.media_type == MediaType.Track:
+                self.mass.add_job(self.__analyze_audio, streamdetails)
+
+    def __get_player_sox_options(
+        self, player_id: str, streamdetails: StreamDetails
+    ) -> str:
+        """Get player specific sox effect options."""
+        sox_options = []
+        player_conf = self.mass.config.get_player_config(player_id)
+        # volume normalisation
+        gain_correct = self.mass.add_job(
+            self.mass.player_manager.async_get_gain_correct(
+                player_id, streamdetails.item_id, streamdetails.provider
+            )
+        ).result()
+        if gain_correct != 0:
+            sox_options.append("vol %s dB " % gain_correct)
+        # downsample if needed
+        if player_conf["max_sample_rate"]:
+            max_sample_rate = try_parse_int(player_conf["max_sample_rate"])
+            if max_sample_rate < streamdetails.sample_rate:
+                sox_options.append(f"rate -v {max_sample_rate}")
+        if player_conf.get("sox_options"):
+            sox_options.append(player_conf["sox_options"])
+        return " ".join(sox_options)
+
+    def __analyze_audio(self, streamdetails):
+        """Analyze track audio, for now we only calculate EBU R128 loudness."""
+        item_key = "%s%s" % (streamdetails.item_id, streamdetails.provider)
+        if item_key in self.analyze_jobs:
+            return  # prevent multiple analyze jobs for same track
+        self.analyze_jobs[item_key] = True
+        track_loudness = self.mass.add_job(
+            self.mass.database.async_get_track_loudness(
+                streamdetails.item_id, streamdetails.provider
+            )
+        ).result()
+        if track_loudness is None:
+            # only when needed we do the analyze stuff
+            LOGGER.debug("Start analyzing track %s", item_key)
+            if streamdetails.type == StreamType.URL:
+                audio_data = urllib.request.urlopen(
+                    decrypt_string(streamdetails.path)
+                ).read()
+            elif streamdetails.type == StreamType.EXECUTABLE:
+                audio_data = subprocess.check_output(
+                    decrypt_string(streamdetails.path), shell=True
+                )
+            elif streamdetails.type == StreamType.FILE:
+                with open(decrypt_string(streamdetails.path), "rb") as _file:
+                    audio_data = _file.read()
+            # calculate BS.1770 R128 integrated loudness
+            with io.BytesIO(audio_data) as tmpfile:
+                data, rate = soundfile.read(tmpfile)
+            meter = pyloudnorm.Meter(rate)  # create BS.1770 meter
+            loudness = meter.integrated_loudness(data)  # measure loudness
+            del data
+            self.mass.add_job(
+                self.mass.database.async_set_track_loudness(
+                    streamdetails.item_id, streamdetails.provider, loudness
+                )
+            )
+            del audio_data
+            LOGGER.debug("Integrated loudness of track %s is: %s", item_key, loudness)
+        self.analyze_jobs.pop(item_key, None)
+
+    @staticmethod
+    def __crossfade_pcm_parts(fade_in_part, fade_out_part, pcm_args, fade_length):
+        """Crossfade two chunks of audio using sox."""
+        # create fade-in part
+        fadeinfile = create_tempfile()
+        args = "sox --ignore-length -t %s - -t %s %s fade t %s" % (
+            pcm_args,
+            pcm_args,
+            fadeinfile.name,
+            fade_length,
+        )
+        args = shlex.split(args)
+        process = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE)
+        process.communicate(fade_in_part)
+        # create fade-out part
+        fadeoutfile = create_tempfile()
+        args = "sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse" % (
+            pcm_args,
+            pcm_args,
+            fadeoutfile.name,
+            fade_length,
+        )
+        args = shlex.split(args)
+        process = subprocess.Popen(
+            args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE
+        )
+        process.communicate(fade_out_part)
+        # create crossfade using sox and some temp files
+        # TODO: figure out how to make this less complex and without the tempfiles
+        args = "sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -" % (
+            pcm_args,
+            fadeoutfile.name,
+            pcm_args,
+            fadeinfile.name,
+            pcm_args,
+        )
+        args = shlex.split(args)
+        process = subprocess.Popen(
+            args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE
+        )
+        crossfade_part, _ = process.communicate()
+        fadeinfile.close()
+        fadeoutfile.close()
+        del fadeinfile
+        del fadeoutfile
+        return crossfade_part