From b70d07b85646c0cbd640b2385b78a1d995ed415f Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sat, 19 Sep 2020 13:44:32 +0200 Subject: [PATCH] fix issues with chromecast --- music_assistant/config.py | 2 +- music_assistant/constants.py | 2 +- music_assistant/models/player_queue.py | 54 +- music_assistant/player_manager.py | 7 +- .../providers/chromecast/__init__.py | 20 +- .../providers/chromecast/player.py | 7 +- music_assistant/stream_manager.py | 606 ++++++++++++++++++ 7 files changed, 653 insertions(+), 45 deletions(-) create mode 100755 music_assistant/stream_manager.py diff --git a/music_assistant/config.py b/music_assistant/config.py index 1395e2f0..49c1fb33 100755 --- a/music_assistant/config.py +++ b/music_assistant/config.py @@ -252,7 +252,7 @@ class ConfigItem: self.mass.get_provider(self._parent_item_key).async_on_reload() ) if self._base_type == ConfigBaseType.PLAYER: - + # force update of player if it's config changed player = self.mass.player_manager.get_player(self._parent_item_key) if player: self.mass.add_job( diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 4fe7eb74..df4939ba 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -1,6 +1,6 @@ """All constants for Music Assistant.""" -__version__ = "0.0.36" +__version__ = "0.0.37" REQUIRED_PYTHON_VER = "3.7" CONF_USERNAME = "username" diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index c9b9a495..df9ee7d9 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -54,10 +54,10 @@ class QueueItem(Track): class PlayerQueue: """Class that holds the queue items for a player.""" - def __init__(self, mass, player): + def __init__(self, mass, player_id: str): """Initialize class.""" self.mass = mass - self._player = player + self._player_id = player_id self._items = [] self._shuffle_enabled = False self._repeat_enabled = False @@ -78,12 +78,12 @@ class PlayerQueue: @property def player(self): """Return handle to player.""" - return self._player + return self.mass.player_manager.get_player(self._player_id) @property def player_id(self): """Return handle to player.""" - return self._player.player_id + return self._player_id @property def shuffle_enabled(self): @@ -213,9 +213,21 @@ class PlayerQueue: For example if crossfading is requested but a player doesn't natively support it we will send a constant stream of audio to the player with all tracks. """ - supports_crossfade = PlayerFeature.CROSSFADE in self.player.features - supports_queue = PlayerFeature.QUEUE in self.player.features - return not supports_crossfade if self.crossfade_enabled else not supports_queue + return ( + not self.supports_crossfade + if self.crossfade_enabled + else not self.supports_queue + ) + + @property + def supports_queue(self): + """Return if this player supports native queue.""" + return PlayerFeature.QUEUE in self.player.features + + @property + def supports_crossfade(self): + """Return if this player supports native crossfade.""" + return PlayerFeature.CROSSFADE in self.player.features @callback def get_item(self, index): @@ -240,7 +252,6 @@ class PlayerQueue: return if self.use_queue_stream: return await self.async_play_index(self.cur_index + 1) - await self.mass.player_manager.async_cmd_power_on(self.player_id) return await self.mass.player_manager.get_player_provider( self.player_id ).async_cmd_next(self.player_id) @@ -249,7 +260,6 @@ class PlayerQueue: """Play the previous track in the queue.""" if self.cur_index is None: return - await self.mass.player_manager.async_cmd_power_on(self.player_id) if self.use_queue_stream: return await self.async_play_index(self.cur_index - 1) return await self.mass.player_manager.async_cmd_previous(self.player_id) @@ -257,10 +267,8 @@ class PlayerQueue: async def async_resume(self): """Resume previous queue.""" if self.items: - await self.mass.player_manager.async_cmd_power_on(self.player_id) prev_index = self.cur_index - supports_queue = PlayerFeature.QUEUE in self.player.features - if self.use_queue_stream or not supports_queue: + if self.use_queue_stream or not self.supports_queue: await self.async_play_index(prev_index) else: # at this point we don't know if the queue is synced with the player @@ -272,14 +280,12 @@ class PlayerQueue: await self.async_play_index(prev_index) else: LOGGER.warning( - "resume queue requested for %s but queue is empty", self.player.name + "resume queue requested for %s but queue is empty", self.player_id ) async def async_play_index(self, index): """Play item at index X in queue.""" - await self.mass.player_manager.async_cmd_power_on(self.player_id) player_prov = self.mass.player_manager.get_player_provider(self.player_id) - supports_queue = PlayerFeature.QUEUE in self.player.features if not isinstance(index, int): index = self.__index_by_id(index) if not len(self.items) > index: @@ -297,7 +303,7 @@ class PlayerQueue: return await player_prov.async_cmd_play_uri( self.player_id, queue_stream_uri ) - if supports_queue: + if self.supports_queue: try: return await player_prov.async_cmd_queue_play_index( self.player_id, index @@ -342,14 +348,12 @@ class PlayerQueue: async def async_load(self, queue_items: List[QueueItem]): """Load (overwrite) queue with new items.""" - await self.mass.player_manager.async_cmd_power_on(self.player_id) - supports_queue = PlayerFeature.QUEUE in self.player.features for index, item in enumerate(queue_items): item.sort_index = index if self._shuffle_enabled: queue_items = self.__shuffle_items(queue_items) self._items = queue_items - if self.use_queue_stream or not supports_queue: + if self.use_queue_stream or not self.supports_queue: await self.async_play_index(0) else: player_prov = self.mass.player_manager.get_player_provider(self.player_id) @@ -366,7 +370,6 @@ class PlayerQueue: :param queue_items: a list of QueueItem :param offset: offset from current queue position """ - supports_queue = PlayerFeature.QUEUE in self.player.features if ( not self.items @@ -393,7 +396,7 @@ class PlayerQueue: + queue_items + self._items[insert_at_index:] ) - if self.use_queue_stream or not supports_queue: + if self.use_queue_stream: if offset == 0: await self.async_play_index(insert_at_index) else: @@ -417,7 +420,6 @@ class PlayerQueue: async def async_append(self, queue_items: List[QueueItem]): """Append new items at the end of the queue.""" - supports_queue = PlayerFeature.QUEUE in self.player.features for index, item in enumerate(queue_items): item.sort_index = len(self.items) + index if self.shuffle_enabled: @@ -427,7 +429,7 @@ class PlayerQueue: items = played_items + [self.cur_item] + next_items return await self.async_update(items) self._items = self._items + queue_items - if supports_queue and not self.use_queue_stream: + if self.supports_queue and not self.use_queue_stream: # send queue to player's own implementation player_prov = self.mass.player_manager.get_player_provider(self.player_id) try: @@ -446,9 +448,8 @@ class PlayerQueue: async def async_update(self, queue_items: List[QueueItem]): """Update the existing queue items, mostly caused by reordering.""" - supports_queue = PlayerFeature.QUEUE in self.player.features self._items = queue_items - if supports_queue and not self.use_queue_stream: + if self.supports_queue and not self.use_queue_stream: # send queue to player's own implementation player_prov = self.mass.player_manager.get_player_provider(self.player_id) try: @@ -467,10 +468,9 @@ class PlayerQueue: async def async_clear(self): """Clear all items in the queue.""" - supports_queue = PlayerFeature.QUEUE in self.player.features await self.mass.player_manager.async_cmd_stop(self.player_id) self._items = [] - if supports_queue: + if self.supports_queue: # send queue cmd to player's own implementation player_prov = self.mass.player_manager.get_player_provider(self.player_id) try: diff --git a/music_assistant/player_manager.py b/music_assistant/player_manager.py index 119b17b7..207b8f52 100755 --- a/music_assistant/player_manager.py +++ b/music_assistant/player_manager.py @@ -98,9 +98,8 @@ class PlayerManager: def get_player(self, player_id: str) -> Player: """Return player by player_id or None if player does not exist.""" player = self._players.get(player_id) - if not player or not player.available: + if not player: LOGGER.warning("Player %s is not available!", player_id) - return None return player @callback @@ -147,7 +146,9 @@ class PlayerManager: if is_new_player: # create player queue if player.player_id not in self._player_queues: - self._player_queues[player.player_id] = PlayerQueue(self.mass, player) + self._player_queues[player.player_id] = PlayerQueue( + self.mass, player.player_id + ) # TODO: turn on player if it was previously turned on ? LOGGER.info( "New player added: %s/%s", diff --git a/music_assistant/providers/chromecast/__init__.py b/music_assistant/providers/chromecast/__init__.py index ac7177c8..99cd6066 100644 --- a/music_assistant/providers/chromecast/__init__.py +++ b/music_assistant/providers/chromecast/__init__.py @@ -59,9 +59,13 @@ class ChromecastProvider(PlayerProvider): self.__chromecast_remove_callback, self.__chromecast_add_update_callback, ) - self._browser = pychromecast.discovery.start_discovery( - self._listener, self.mass.zeroconf - ) + + def start_discovery(): + self._browser = pychromecast.discovery.start_discovery( + self._listener, self.mass.zeroconf + ) + + self.mass.add_job(start_discovery) return True async def async_on_stop(self): @@ -76,7 +80,7 @@ class ChromecastProvider(PlayerProvider): async def async_cmd_play_uri(self, player_id: str, uri: str): """ - Play the specified uri/url on the goven player. + Play the specified uri/url on the given player. :param player_id: player_id of the player to handle the command. """ @@ -191,15 +195,13 @@ class ChromecastProvider(PlayerProvider): port=service[5], ) player_id = cast_info.uuid - LOGGER.debug( - "Chromecast discovered: %s (%s)", cast_info.friendly_name, player_id - ) if player_id in self._players: # player already added, the player will take care of reconnects itself. + return + else: LOGGER.debug( - "Player is already added: %s (%s)", cast_info.friendly_name, player_id + "Chromecast discovered: %s (%s)", cast_info.friendly_name, player_id ) - else: player = ChromecastPlayer(self.mass, cast_info) self._players[player_id] = player self.mass.add_job(self.mass.player_manager.async_add_player(player)) diff --git a/music_assistant/providers/chromecast/player.py b/music_assistant/providers/chromecast/player.py index 11b347ae..a5d5c692 100644 --- a/music_assistant/providers/chromecast/player.py +++ b/music_assistant/providers/chromecast/player.py @@ -210,7 +210,6 @@ class ChromecastPlayer: def new_cast_status(self, cast_status): """Handle updates of the cast status.""" - LOGGER.debug("received cast status for %s", self.name) self.cast_status = cast_status self._is_speaker_group = ( self._cast_info.is_audio_group @@ -224,7 +223,6 @@ class ChromecastPlayer: def new_media_status(self, media_status): """Handle updates of the media status.""" - LOGGER.debug("received media_status for %s", self.name) self.media_status = media_status self.mass.add_job(self.mass.player_manager.async_update_player(self)) if media_status.player_is_playing: @@ -232,7 +230,6 @@ class ChromecastPlayer: def new_connection_status(self, connection_status): """Handle updates of connection status.""" - LOGGER.debug("received connection_status for %s", self._cast_info.friendly_name) if connection_status.status == CONNECTION_STATUS_DISCONNECTED: self._available = False self._invalidate() @@ -305,7 +302,9 @@ class ChromecastPlayer: LOGGER.warning("Ignore player command: Socket client is not connected.") return if self.media_status and ( - self.media_status.player_is_playing or self.media_status.player_is_paused + self.media_status.player_is_playing + or self.media_status.player_is_paused + or self.media_status.player_is_idle ): self._chromecast.media_controller.stop() self._powered = False diff --git a/music_assistant/stream_manager.py b/music_assistant/stream_manager.py new file mode 100755 index 00000000..990d71d6 --- /dev/null +++ b/music_assistant/stream_manager.py @@ -0,0 +1,606 @@ +""" +StreamManager: handles all audio streaming to players. + +Either by sending tracks one by one or send one continuous stream +of music with crossfade/gapless support (queue stream). +""" +import asyncio +import gc +import io +import logging +import shlex +import subprocess +import threading +import urllib +from contextlib import suppress + +import aiohttp +import pyloudnorm +import soundfile +from aiofile import AIOFile, Reader +from aiohttp import web +from music_assistant.constants import EVENT_STREAM_ENDED, EVENT_STREAM_STARTED +from music_assistant.models.media_types import MediaType +from music_assistant.models.player_queue import QueueItem +from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType +from music_assistant.utils import create_tempfile, decrypt_string, get_ip, try_parse_int +from music_assistant.web import require_local_subnet + +LOGGER = logging.getLogger("mass") + +MusicAssistantType = "MusicAssistant" + + +class StreamManager: + """Built-in streamer utilizing SoX.""" + + def __init__(self, mass: MusicAssistantType): + """Initialize class.""" + self.mass = mass + self.local_ip = get_ip() + self.analyze_jobs = {} + self.stream_clients = [] + + async def async_get_audio_stream(self, streamdetails: StreamDetails): + """Get the (original) audio data for the given streamdetails. Generator.""" + stream_path = decrypt_string(streamdetails.path) + stream_type = StreamType(streamdetails.type) + + if streamdetails.content_type == ContentType.AAC: + # support for AAC created with ffmpeg in between + stream_type = StreamType.EXECUTABLE + streamdetails.content_type = ContentType.FLAC + stream_path = f'ffmpeg -v quiet -i "{stream_path}" -f flac -' + + if stream_type == StreamType.URL: + async with self.mass.http_session.get(stream_path) as response: + async for chunk in response.content.iter_any(): + yield chunk + elif stream_type == StreamType.FILE: + async with AIOFile(stream_path) as afp: + async for chunk in Reader(afp): + yield chunk + elif stream_type == StreamType.EXECUTABLE: + args = shlex.split(stream_path) + process = await asyncio.create_subprocess_exec( + *args, stdout=asyncio.subprocess.PIPE + ) + try: + async for chunk in process.stdout: + yield chunk + except (asyncio.CancelledError, StopAsyncIteration, GeneratorExit) as exc: + LOGGER.error("process aborted") + raise exc + finally: + process.terminate() + while True: + data = await process.stdout.read() + if not data: + break + LOGGER.error("process ended") + + async def async_stream_media_item(self, http_request): + """Start stream for a single media item, player independent.""" + # make sure we have valid params + media_type = MediaType.from_string(http_request.match_info["media_type"]) + if media_type not in [MediaType.Track, MediaType.Radio]: + return web.Response(status=404, reason="Media item is not playable!") + provider = http_request.match_info["provider"] + item_id = http_request.match_info["item_id"] + player_id = http_request.remote # fake player id + # prepare headers as audio/flac content + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": "audio/flac"} + ) + await resp.prepare(http_request) + # collect tracks to play + media_item = await self.mass.music_manager.async_get_item( + item_id, provider, media_type + ) + queue_item = QueueItem(media_item) + # run the streamer in executor to prevent the subprocess locking up our eventloop + cancelled = threading.Event() + bg_task = self.mass.loop.run_in_executor( + None, + self.__get_queue_item_stream, + player_id, + queue_item, + resp, + cancelled, + ) + # let the streaming begin! + try: + await asyncio.gather(bg_task) + except ( + asyncio.CancelledError, + aiohttp.ClientConnectionError, + asyncio.TimeoutError, + ) as exc: + cancelled.set() + raise exc # re-raise + return resp + + @require_local_subnet + async def async_stream(self, http_request): + """Start stream for a player.""" + # make sure we have valid params + player_id = http_request.match_info.get("player_id", "") + player_queue = self.mass.player_manager.get_player_queue(player_id) + if not player_queue: + return web.Response(status=404, reason="Player(queue) not found!") + if not player_queue.use_queue_stream: + queue_item_id = http_request.match_info.get("queue_item_id") + queue_item = player_queue.by_item_id(queue_item_id) + if not queue_item: + return web.Response(status=404, reason="Invalid Queue item Id") + # prepare headers as audio/flac content + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": "audio/flac"} + ) + await resp.prepare(http_request) + # run the streamer in executor to prevent the subprocess locking up our eventloop + cancelled = threading.Event() + if player_queue.use_queue_stream: + bg_task = self.mass.loop.run_in_executor( + None, self.__get_queue_stream, player_id, resp, cancelled + ) + else: + bg_task = self.mass.loop.run_in_executor( + None, + self.__get_queue_item_stream, + player_id, + queue_item, + resp, + cancelled, + ) + # let the streaming begin! + try: + await asyncio.gather(bg_task) + except ( + asyncio.CancelledError, + aiohttp.ClientConnectionError, + asyncio.TimeoutError, + ) as exc: + cancelled.set() + raise exc # re-raise + return resp + + def __get_queue_item_stream(self, player_id, queue_item, buffer, cancelled): + """Start streaming single queue track.""" + # pylint: disable=unused-variable + LOGGER.debug( + "stream single queue track started for track %s on player %s", + queue_item.name, + player_id, + ) + for is_last_chunk, audio_chunk in self.__get_audio_stream( + player_id, queue_item, cancelled + ): + if cancelled.is_set(): + # http session ended + # we must consume the data to prevent hanging subprocess instances + continue + # put chunk in buffer + with suppress((BrokenPipeError, ConnectionResetError)): + asyncio.run_coroutine_threadsafe( + buffer.write(audio_chunk), self.mass.loop + ).result() + # all chunks received: streaming finished + if cancelled.is_set(): + LOGGER.debug( + "stream single track interrupted for track %s on player %s", + queue_item.name, + player_id, + ) + else: + # indicate EOF if no more data + with suppress((BrokenPipeError, ConnectionResetError)): + asyncio.run_coroutine_threadsafe( + buffer.write_eof(), self.mass.loop + ).result() + + LOGGER.debug( + "stream single track finished for track %s on player %s", + queue_item.name, + player_id, + ) + + def __get_queue_stream(self, player_id, buffer, cancelled): + """Start streaming all queue tracks.""" + player_conf = self.mass.config.get_player_config(player_id) + player_queue = self.mass.player_manager.get_player_queue(player_id) + sample_rate = try_parse_int(player_conf["max_sample_rate"]) + fade_length = try_parse_int(player_conf["crossfade_duration"]) + if not sample_rate or sample_rate < 44100: + sample_rate = 96000 + if fade_length: + fade_bytes = int(sample_rate * 4 * 2 * fade_length) + else: + fade_bytes = int(sample_rate * 4 * 2 * 6) + pcm_args = "raw -b 32 -c 2 -e signed-integer -r %s" % sample_rate + args = "sox -t %s - -t flac -C 0 -" % pcm_args + # start sox process + args = shlex.split(args) + sox_proc = subprocess.Popen( + args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + + def fill_buffer(): + while True: + chunk = sox_proc.stdout.read(128000) # noqa + if not chunk: + break + if chunk and not cancelled.is_set(): + with suppress((BrokenPipeError, ConnectionResetError)): + asyncio.run_coroutine_threadsafe( + buffer.write(chunk), self.mass.loop + ).result() + del chunk + # indicate EOF if no more data + if not cancelled.is_set(): + with suppress((BrokenPipeError, ConnectionResetError)): + asyncio.run_coroutine_threadsafe( + buffer.write_eof(), self.mass.loop + ).result() + + # start fill buffer task in background + fill_buffer_thread = threading.Thread(target=fill_buffer) + fill_buffer_thread.start() + + LOGGER.info("Start Queue Stream for player %s ", player_id) + is_start = True + last_fadeout_data = b"" + while True: + if cancelled.is_set(): + break + # get the (next) track in queue + if is_start: + # report start of queue playback so we can calculate current track/duration etc. + queue_track = self.mass.add_job( + player_queue.async_start_queue_stream() + ).result() + is_start = False + else: + queue_track = player_queue.next_item + if not queue_track: + LOGGER.debug("no (more) tracks left in queue") + break + LOGGER.debug( + "Start Streaming queue track: %s (%s) on player %s", + queue_track.item_id, + queue_track.name, + player_id, + ) + fade_in_part = b"" + cur_chunk = 0 + prev_chunk = None + bytes_written = 0 + # handle incoming audio chunks + for is_last_chunk, chunk in self.__get_audio_stream( + player_id, + queue_track, + cancelled, + chunksize=fade_bytes, + resample=sample_rate, + ): + cur_chunk += 1 + + # HANDLE FIRST PART OF TRACK + if not chunk and cur_chunk == 1 and is_last_chunk: + LOGGER.warning("Stream error, skip track %s", queue_track.item_id) + break + if cur_chunk <= 2 and not last_fadeout_data: + # no fadeout_part available so just pass it to the output directly + sox_proc.stdin.write(chunk) + bytes_written += len(chunk) + del chunk + elif cur_chunk == 1 and last_fadeout_data: + prev_chunk = chunk + del chunk + # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN + elif cur_chunk == 2 and last_fadeout_data: + # combine the first 2 chunks and strip off silence + args = "sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%" % ( + pcm_args, + pcm_args, + ) + first_part, _ = subprocess.Popen( + args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ).communicate(prev_chunk + chunk) + if len(first_part) < fade_bytes: + # part is too short after the strip action?! + # so we just use the full first part + first_part = prev_chunk + chunk + fade_in_part = first_part[:fade_bytes] + remaining_bytes = first_part[fade_bytes:] + del first_part + # do crossfade + crossfade_part = self.__crossfade_pcm_parts( + fade_in_part, last_fadeout_data, pcm_args, fade_length + ) + sox_proc.stdin.write(crossfade_part) + bytes_written += len(crossfade_part) + del crossfade_part + del fade_in_part + last_fadeout_data = b"" + # also write the leftover bytes from the strip action + sox_proc.stdin.write(remaining_bytes) + bytes_written += len(remaining_bytes) + del remaining_bytes + del chunk + prev_chunk = None # needed to prevent this chunk being sent again + # HANDLE LAST PART OF TRACK + elif prev_chunk and is_last_chunk: + # last chunk received so create the last_part + # with the previous chunk and this chunk + # and strip off silence + args = ( + "sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse" + % (pcm_args, pcm_args) + ) + last_part, _ = subprocess.Popen( + args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ).communicate(prev_chunk + chunk) + if len(last_part) < fade_bytes: + # part is too short after the strip action + # so we just use the entire original data + last_part = prev_chunk + chunk + if len(last_part) < fade_bytes: + LOGGER.warning( + "Not enough data for crossfade: %s", len(last_part) + ) + if ( + not player_queue.crossfade_enabled + or len(last_part) < fade_bytes + ): + # crossfading is not enabled so just pass the (stripped) audio data + sox_proc.stdin.write(last_part) + bytes_written += len(last_part) + del last_part + del chunk + else: + # handle crossfading support + # store fade section to be picked up for next track + last_fadeout_data = last_part[-fade_bytes:] + remaining_bytes = last_part[:-fade_bytes] + # write remaining bytes + sox_proc.stdin.write(remaining_bytes) + bytes_written += len(remaining_bytes) + del last_part + del remaining_bytes + del chunk + # MIDDLE PARTS OF TRACK + else: + # middle part of the track + # keep previous chunk in memory so we have enough + # samples to perform the crossfade + if prev_chunk: + sox_proc.stdin.write(prev_chunk) + bytes_written += len(prev_chunk) + prev_chunk = chunk + else: + prev_chunk = chunk + del chunk + # end of the track reached + if cancelled.is_set(): + # break out the loop if the http session is cancelled + break + # update actual duration to the queue for more accurate now playing info + accurate_duration = bytes_written / int(sample_rate * 4 * 2) + queue_track.duration = accurate_duration + LOGGER.debug( + "Finished Streaming queue track: %s (%s) on player %s", + queue_track.item_id, + queue_track.name, + player_id, + ) + # run garbage collect manually to avoid too much memory fragmentation + gc.collect() + # end of queue reached, pass last fadeout bits to final output + if last_fadeout_data and not cancelled.is_set(): + sox_proc.stdin.write(last_fadeout_data) + del last_fadeout_data + # END OF QUEUE STREAM + sox_proc.stdin.close() + sox_proc.terminate() + sox_proc.communicate() + fill_buffer_thread.join() + # run garbage collect manually to avoid too much memory fragmentation + gc.collect() + if cancelled.is_set(): + LOGGER.info("streaming of queue for player %s interrupted", player_id) + else: + LOGGER.info("streaming of queue for player %s completed", player_id) + + def __get_audio_stream( + self, player_id, queue_item, cancelled, chunksize=128000, resample=None + ): + """Get audio stream from provider and apply additional effects/processing if needed.""" + streamdetails = self.mass.add_job( + self.mass.music_manager.async_get_stream_details(queue_item, player_id) + ).result() + if not streamdetails: + LOGGER.warning("no stream details for %s", queue_item.name) + yield (True, b"") + return + # get sox effects and resample options + sox_options = self.__get_player_sox_options(player_id, streamdetails) + outputfmt = "flac -C 0" + if resample: + outputfmt = "raw -b 32 -c 2 -e signed-integer" + sox_options += " rate -v %s" % resample + streamdetails.sox_options = sox_options + # determine how to proceed based on input file type + if streamdetails.content_type == ContentType.AAC: + # support for AAC created with ffmpeg in between + args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % ( + decrypt_string(streamdetails.path), + outputfmt, + sox_options, + ) + process = subprocess.Popen( + args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize + ) + elif streamdetails.type in [StreamType.URL, StreamType.FILE]: + args = 'sox -t %s "%s" -t %s - %s' % ( + streamdetails.content_type.name, + decrypt_string(streamdetails.path), + outputfmt, + sox_options, + ) + args = shlex.split(args) + process = subprocess.Popen( + args, shell=False, stdout=subprocess.PIPE, bufsize=chunksize + ) + elif streamdetails.type == StreamType.EXECUTABLE: + args = "%s | sox -t %s - -t %s - %s" % ( + decrypt_string(streamdetails.path), + streamdetails.content_type.name, + outputfmt, + sox_options, + ) + process = subprocess.Popen( + args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize + ) + else: + LOGGER.warning("no streaming options for %s", queue_item.name) + yield (True, b"") + return + # fire event that streaming has started for this track + self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails) + # yield chunks from stdout + # we keep 1 chunk behind to detect end of stream properly + prev_chunk = b"" + while True: + if cancelled.is_set(): + # http session ended + # send terminate and pick up left over bytes + process.terminate() + chunk, _ = process.communicate() + else: + # read exactly chunksize of data + chunk = process.stdout.read(chunksize) + if len(chunk) < chunksize: + # last chunk + yield (True, prev_chunk + chunk) + break + if prev_chunk: + yield (False, prev_chunk) + prev_chunk = chunk + # fire event that streaming has ended + if not cancelled.is_set(): + streamdetails.seconds_played = queue_item.duration + self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails) + # send task to background to analyse the audio + if queue_item.media_type == MediaType.Track: + self.mass.add_job(self.__analyze_audio, streamdetails) + + def __get_player_sox_options( + self, player_id: str, streamdetails: StreamDetails + ) -> str: + """Get player specific sox effect options.""" + sox_options = [] + player_conf = self.mass.config.get_player_config(player_id) + # volume normalisation + gain_correct = self.mass.add_job( + self.mass.player_manager.async_get_gain_correct( + player_id, streamdetails.item_id, streamdetails.provider + ) + ).result() + if gain_correct != 0: + sox_options.append("vol %s dB " % gain_correct) + # downsample if needed + if player_conf["max_sample_rate"]: + max_sample_rate = try_parse_int(player_conf["max_sample_rate"]) + if max_sample_rate < streamdetails.sample_rate: + sox_options.append(f"rate -v {max_sample_rate}") + if player_conf.get("sox_options"): + sox_options.append(player_conf["sox_options"]) + return " ".join(sox_options) + + def __analyze_audio(self, streamdetails): + """Analyze track audio, for now we only calculate EBU R128 loudness.""" + item_key = "%s%s" % (streamdetails.item_id, streamdetails.provider) + if item_key in self.analyze_jobs: + return # prevent multiple analyze jobs for same track + self.analyze_jobs[item_key] = True + track_loudness = self.mass.add_job( + self.mass.database.async_get_track_loudness( + streamdetails.item_id, streamdetails.provider + ) + ).result() + if track_loudness is None: + # only when needed we do the analyze stuff + LOGGER.debug("Start analyzing track %s", item_key) + if streamdetails.type == StreamType.URL: + audio_data = urllib.request.urlopen( + decrypt_string(streamdetails.path) + ).read() + elif streamdetails.type == StreamType.EXECUTABLE: + audio_data = subprocess.check_output( + decrypt_string(streamdetails.path), shell=True + ) + elif streamdetails.type == StreamType.FILE: + with open(decrypt_string(streamdetails.path), "rb") as _file: + audio_data = _file.read() + # calculate BS.1770 R128 integrated loudness + with io.BytesIO(audio_data) as tmpfile: + data, rate = soundfile.read(tmpfile) + meter = pyloudnorm.Meter(rate) # create BS.1770 meter + loudness = meter.integrated_loudness(data) # measure loudness + del data + self.mass.add_job( + self.mass.database.async_set_track_loudness( + streamdetails.item_id, streamdetails.provider, loudness + ) + ) + del audio_data + LOGGER.debug("Integrated loudness of track %s is: %s", item_key, loudness) + self.analyze_jobs.pop(item_key, None) + + @staticmethod + def __crossfade_pcm_parts(fade_in_part, fade_out_part, pcm_args, fade_length): + """Crossfade two chunks of audio using sox.""" + # create fade-in part + fadeinfile = create_tempfile() + args = "sox --ignore-length -t %s - -t %s %s fade t %s" % ( + pcm_args, + pcm_args, + fadeinfile.name, + fade_length, + ) + args = shlex.split(args) + process = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE) + process.communicate(fade_in_part) + # create fade-out part + fadeoutfile = create_tempfile() + args = "sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse" % ( + pcm_args, + pcm_args, + fadeoutfile.name, + fade_length, + ) + args = shlex.split(args) + process = subprocess.Popen( + args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + process.communicate(fade_out_part) + # create crossfade using sox and some temp files + # TODO: figure out how to make this less complex and without the tempfiles + args = "sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -" % ( + pcm_args, + fadeoutfile.name, + pcm_args, + fadeinfile.name, + pcm_args, + ) + args = shlex.split(args) + process = subprocess.Popen( + args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + crossfade_part, _ = process.communicate() + fadeinfile.close() + fadeoutfile.close() + del fadeinfile + del fadeoutfile + return crossfade_part -- 2.34.1