From: Marcel van der Veldt Date: Sat, 15 Jun 2019 18:42:39 +0000 (+0200) Subject: simplify the streaming X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=284d1bae3de86ea66bc8d998835f22538866dafa;p=music-assistant-server.git simplify the streaming --- diff --git a/Dockerfile b/Dockerfile index cfa440b2..5c1ed6b8 100755 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,13 @@ -FROM python:3.7.3-alpine +FROM alpine:latest # install deps -RUN apk add build-base python-dev flac sox taglib-dev zip curl ffmpeg ffmpeg-dev sox-dev py-numpy +RUN apk add flac sox zip curl wget ffmpeg +RUN apk add --no-cache -X http://dl-cdn.alpinelinux.org/alpine/edge/testing python3 py3-numpy py3-scipy py3-matplotlib py3-aiohttp py3-cairocffi COPY requirements.txt requirements.txt -RUN pip install --upgrade -r requirements.txt +RUN apk --no-cache add --virtual .builddeps build-base python3-dev taglib-dev && \ + pip3 install -r requirements.txt && \ + apk del .builddeps && \ + rm -rf /root/.cache # copy files RUN mkdir -p /usr/src/app diff --git a/music_assistant/modules/http_streamer.py b/music_assistant/modules/http_streamer.py index 529bc790..5ec16d78 100755 --- a/music_assistant/modules/http_streamer.py +++ b/music_assistant/modules/http_streamer.py @@ -5,24 +5,16 @@ import asyncio import os from utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size from models import TrackQuality, MediaType, PlayerState -import shutil -import xml.etree.ElementTree as ET -import random -import base64 import operator from aiohttp import web import threading import urllib -import math from memory_tempfile import MemoryTempfile -import tempfile import io import soundfile as sf import pyloudnorm as pyln import aiohttp -AUDIO_TEMP_DIR = "/tmp/audio_tmp" -AUDIO_CACHE_DIR = "/tmp/audio_cache" class HTTPStreamer(): ''' Built-in streamer using sox and webserver ''' @@ -32,25 +24,13 @@ class HTTPStreamer(): self.create_config_entries() self.local_ip = get_ip() self.analyze_jobs = {} - self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder'] - # create needed temp/cache dirs - if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(self._audio_cache_dir): - self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder'] - os.makedirs(self._audio_cache_dir) - if not os.path.isdir(AUDIO_TEMP_DIR): - os.makedirs(AUDIO_TEMP_DIR) - mass.event_loop.create_task(self.__cache_cleanup()) def create_config_entries(self): ''' sets the config entries for this module (list with key/value pairs)''' config_entries = [ ('volume_normalisation', True, 'enable_r128_volume_normalisation'), ('target_volume', '-23', 'target_volume_lufs'), - ('fallback_gain_correct', '-12', 'fallback_gain_correct'), - ('enable_cache', True, 'enable_audio_cache'), - ('trim_silence', True, 'trim_silence'), - ('audio_cache_folder', '/tmp/audio_cache', 'audio_cache_folder'), - ('audio_cache_max_size_gb', 20, 'audio_cache_max_size_gb') + ('fallback_gain_correct', '-12', 'fallback_gain_correct') ] if not self.mass.config['base'].get('http_streamer'): self.mass.config['base']['http_streamer'] = {} @@ -222,52 +202,50 @@ class HTTPStreamer(): track_id, provider, player_id, chunksize=fade_bytes, outputfmt=pcm_args, sox_effects='rate -v %s' % sample_rate ): cur_chunk += 1 - if cur_chunk == 1 and not last_fadeout_data: + if cur_chunk <= 2 and not last_fadeout_data: # fade-in part but this is the first track so just pass it to the final file sox_proc.stdin.write(chunk) await sox_proc.stdin.drain() bytes_written += len(chunk) elif cur_chunk == 1 and last_fadeout_data: - # create fade-out part - args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length) + prev_chunk = chunk + elif cur_chunk == 2 and last_fadeout_data: + # combine the first 2 chunks and strip off silence + args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - last_fadeout_data, stderr = await process.communicate(last_fadeout_data) - LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data)) - # create fade-in part - args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length) - process = await asyncio.create_subprocess_shell(args, - stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - fade_in_part, stderr = await process.communicate(chunk) - LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part)) - # create crossfade using sox and some temp files - # TODO: figure out how to make this less complex and without the tempfiles - fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) - fadeinfile.write(fade_in_part) - fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) - fadeoutfile.write(last_fadeout_data) - args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args) - process = await asyncio.create_subprocess_shell(args, - stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - crossfade_part, stderr = await process.communicate(fade_in_part) - LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part)) - # write the crossfade part to the sox player + first_part, stderr = await process.communicate(prev_chunk + chunk) + fade_in_part = first_part[:fade_bytes] + remaining_bytes = first_part[fade_bytes:] + del first_part + # do crossfade + crossfade_part = await self.__crossfade_pcm_parts(fade_in_part, last_fadeout_data, pcm_args, fade_length) sox_proc.stdin.write(crossfade_part) await sox_proc.stdin.drain() bytes_written += len(crossfade_part) - fadeinfile.close() - fadeoutfile.close() del crossfade_part - fade_in_part = None + del fade_in_part last_fadeout_data = b'' + # also write the leftover bytes from the strip action + sox_proc.stdin.write(remaining_bytes) + await sox_proc.stdin.drain() + bytes_written += len(remaining_bytes) + del remaining_bytes elif prev_chunk and is_last_chunk: - # last chunk received so create the fadeout with the previous chunk and this chunk - last_part = prev_chunk + chunk + # last chunk received so create the fadeout_part with the previous chunk and this chunk + # and strip off silence + args = 'sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse' % (pcm_args, pcm_args) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + last_part, stderr = await process.communicate(prev_chunk + chunk) last_fadeout_data = last_part[-fade_bytes:] - bytes_remaining = last_part[:-fade_bytes] - sox_proc.stdin.write(bytes_remaining) - bytes_written += len(bytes_remaining) + remaining_bytes = last_part[:-fade_bytes] + # write remaining bytes + sox_proc.stdin.write(remaining_bytes) + bytes_written += len(remaining_bytes) await sox_proc.stdin.drain() + del last_part + del remaining_bytes else: # middle part of the track # keep previous chunk in memory so we have enough samples to perform the crossfade @@ -278,16 +256,6 @@ class HTTPStreamer(): prev_chunk = chunk else: prev_chunk = chunk - # pre-analyse the next track - to ensure smooth transitions - try: - queue_tracks = await self.mass.player.player_queue(player_id, queue_index+1, queue_index+2) - queue_track = queue_tracks[0] - params = urllib.parse.parse_qs(queue_track.uri.split('?')[1]) - track_id = params['track_id'][0] - provider = params['provider'][0] - self.mass.event_loop.create_task(self.__analyze_audio(track_id, provider)) - except: - pass # wait for the queue to consume the data # this prevents that the entire track is sitting in memory # and it helps a bit in the quest to follow where we are in the queue @@ -323,22 +291,16 @@ class HTTPStreamer(): else: gain_correct = '' sox_effects += await self.__get_player_sox_options(track_id, provider, player_id, False) - - cachefile = self.__get_track_cache_filename(track_id, provider) - if os.path.isfile(cachefile): - # stream from cachefile - args = 'sox -t sox "%s" -t %s - %s %s' % (cachefile, outputfmt, gain_correct, sox_effects) - else: - # stream directly from provider - streamdetails = asyncio.run_coroutine_threadsafe( - self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result() - if not streamdetails: - yield b'' - return - if streamdetails['type'] == 'url': - args = 'sox -t %s "%s" -t %s - %s %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, gain_correct, sox_effects) - elif streamdetails['type'] == 'executable': - args = '%s | sox -t %s - -t %s - %s %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, gain_correct, sox_effects) + # stream audio from provider + streamdetails = asyncio.run_coroutine_threadsafe( + self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result() + if not streamdetails: + yield b'' + return + if streamdetails['type'] == 'url': + args = 'sox -t %s "%s" -t %s - %s %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, gain_correct, sox_effects) + elif streamdetails['type'] == 'executable': + args = '%s | sox -t %s - -t %s - %s %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, gain_correct, sox_effects) LOGGER.debug("Running sox with args: %s" % args) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) @@ -394,11 +356,8 @@ class HTTPStreamer(): return # prevent multiple analyze jobs for same tracks self.analyze_jobs[track_key] = True streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id) - cachefile = self.__get_track_cache_filename(track_id, provider) - enable_cache = self.mass.config['base']['http_streamer']['enable_cache'] - needs_cachefile = enable_cache and not os.path.isfile(cachefile) track_loudness = await self.mass.db.get_track_loudness(track_id, provider) - if needs_cachefile or track_loudness == None: + if track_loudness == None: # only when needed we do the analyze stuff LOGGER.info('Start analyzing track %s' % track_id) if streamdetails['type'] == 'url': @@ -418,11 +377,6 @@ class HTTPStreamer(): del data LOGGER.debug("Integrated loudness of track %s is: %s" %(track_id, loudness)) await self.mass.db.set_track_loudness(track_id, provider, loudness) - if needs_cachefile: - # use sox to store cache file (strip silence from start and end for better transitions) - cmd = 'sox -t %s - -t sox %s silence 1 0.1 1%% reverse silence 1 0.1 1%% reverse' %(streamdetails['content_type'], cachefile) - process = await asyncio.create_subprocess_shell(cmd, stdin=asyncio.subprocess.PIPE) - await process.communicate(audio_data) del audio_data LOGGER.info('Finished analyzing track %s' % track_id) self.analyze_jobs.pop(track_key, None) @@ -437,74 +391,24 @@ class HTTPStreamer(): gain_correct = target_gain - track_loudness return round(gain_correct,2) - async def __fill_audio_buffer(self, buf, track_id, provider, content_type): - ''' get audio data from provider and write to buffer''' - # fill the buffer with audio data - # a tempfile is created so we can do audio analysis - try: - tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999))) - fd = open(tmpfile, 'wb') - async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id): - if not chunk: - break - buf.write(chunk) - await buf.drain() - fd.write(chunk) - LOGGER.info("fill_audio_buffer complete for track %s" % track_id) - # successfull completion, process temp file for analysis - self.mass.event_loop.create_task( - self.__analyze_audio(tmpfile, track_id, provider, content_type)) - except Exception as exc: - LOGGER.exception("fill_audio_buffer failed for track %s" % track_id) - finally: - buf.write_eof() - await buf.drain() - fd.close() - - def __get_track_cache_filename(self, track_id, provider): - ''' get filename for a track to use as cache file ''' - filename = '%s%s' %(provider, track_id.split(os.sep)[-1]) - filename = base64.b64encode(filename.encode()).decode() - return os.path.join(self._audio_cache_dir, filename) - - @run_periodic(3600) - async def __cache_cleanup(self): - ''' calculate size of cache folder and cleanup if needed ''' - def cleanup(): - size_limit = self.mass.config['base']['http_streamer']['audio_cache_max_size_gb'] - total_size_gb = get_folder_size(self._audio_cache_dir) - LOGGER.info("current size of cache folder is %s GB" % total_size_gb) - if size_limit and total_size_gb > size_limit: - LOGGER.info("Cache folder size exceeds threshold, start cleanup...") - from pathlib import Path - import time - days = 14 - while total_size_gb > size_limit: - time_in_secs = time.time() - (days * 24 * 60 * 60) - for i in Path(self._audio_cache_dir).iterdir(): - if i.is_file(): - if i.stat().st_atime <= time_in_secs: - total_size_gb -= i.stat().st_size/float(1<<30) - i.unlink() - if total_size_gb < size_limit: - break - days -= 1 - LOGGER.info("Cache folder size cleanup completed") - await self.mass.event_loop.run_in_executor(None, cleanup) - - @staticmethod - def __get_bs1770_binary(): - ''' get the path to the bs1770 binary for the current OS ''' - import platform - bs1770_binary = None - if platform.system() == "Windows": - bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain") - elif platform.system() == "Darwin": - # macos binary is x86_64 intel - bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain") - elif platform.system() == "Linux": - architecture = platform.machine() - if architecture.startswith('AMD64') or architecture.startswith('x86_64'): - bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain") - # TODO: build armhf binary - return bs1770_binary \ No newline at end of file + async def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args, fade_length): + ''' crossfade two chunks of audio using sox ''' + # create fade-in part + fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) + args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length) + process = await asyncio.create_subprocess_shell(args, stdin=asyncio.subprocess.PIPE) + await process.communicate(fade_in_part) + # create fade-out part + fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) + args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + await process.communicate(fade_out_part) + # create crossfade using sox and some temp files + # TODO: figure out how to make this less complex and without the tempfiles + args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args) + process = await asyncio.create_subprocess_shell(args, + stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + crossfade_part, stderr = await process.communicate() + LOGGER.info("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part)) + return crossfade_part diff --git a/music_assistant/modules/musicproviders/qobuz.py b/music_assistant/modules/musicproviders/qobuz.py index 7c152e68..b8dd2753 100644 --- a/music_assistant/modules/musicproviders/qobuz.py +++ b/music_assistant/modules/musicproviders/qobuz.py @@ -47,8 +47,8 @@ class QobuzProvider(MusicProvider): self.__username = username self.__password = password self.__user_auth_token = None - self.__app_id = "285473059" - self.__app_secret = "47249d0eaefa6bf43a959c09aacdbce8" + self.__app_id = "285473059" # TEMP! Own key requested + self.__app_secret = "47249d0eaefa6bf43a959c09aacdbce8" # TEMP! Own key requested self.__logged_in = False self.throttler = Throttler(rate_limit=1, period=1) @@ -252,12 +252,9 @@ class QobuzProvider(MusicProvider): params = {'playlist_id': prov_playlist_id, 'track_ids': ",".join(playlist_track_ids)} return await self.__get_data('playlist/deleteTracks', params) - async def get_stream_content_type(self, track_id): - ''' return the content type for the given track when it will be streamed''' - return 'flac' #TODO handle other file formats on qobuz? - async def get_stream_details(self, track_id): ''' return the content details for the given track when it will be streamed''' + # TODO: Report streaming start and streaming end !! params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'} streamdetails = await self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True) if not streamdetails: @@ -275,31 +272,6 @@ class QobuzProvider(MusicProvider): "bit_depth": streamdetails['bit_depth'] } - async def get_audio_stream(self, track_id): - ''' get audio stream for a track ''' - params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'} - # we are called from other thread - streamdetails_future = asyncio.run_coroutine_threadsafe( - self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True), - self.mass.event_loop - ) - streamdetails = streamdetails_future.result() - print(streamdetails) - if not streamdetails: - # simply retry this request - await asyncio.sleep(1) - streamdetails_future = asyncio.run_coroutine_threadsafe( - self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True), - self.mass.event_loop - ) - streamdetails = streamdetails_future.result() - if not streamdetails: - raise Exception("Unable to retrieve stream url for track %s" % track_id) - async with aiohttp.ClientSession(loop=asyncio.get_event_loop(), connector=aiohttp.TCPConnector(verify_ssl=False)) as session: - async with session.get(streamdetails['url']) as resp: - async for data, end_of_http_chunk in resp.content.iter_chunks(): - yield data - async def __parse_artist(self, artist_obj): ''' parse spotify artist object to generic layout ''' artist = Artist() diff --git a/music_assistant/modules/playerproviders/chromecast.py b/music_assistant/modules/playerproviders/chromecast.py index 05c26d8a..486a5aa4 100644 --- a/music_assistant/modules/playerproviders/chromecast.py +++ b/music_assistant/modules/playerproviders/chromecast.py @@ -92,6 +92,7 @@ class ChromecastProvider(PlayerProvider): else: self._chromecasts[player_id].media_controller.queue_next() elif cmd == 'previous': + enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0 if enable_crossfade: await self.__play_stream_queue(player_id, self._player_queue_index[player_id]-1) else: diff --git a/requirements.txt b/requirements.txt old mode 100644 new mode 100755 index 554af152..397d2107 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ aiohttp spotify_token pychromecast uvloop -slugify asyncio_throttle aiocometd aiosqlite diff --git a/run.sh b/run.sh index 993dbc3e..3e73292e 100755 --- a/run.sh +++ b/run.sh @@ -7,7 +7,6 @@ if [ "$autoupdate" == "true" ]; then cd /tmp curl -LOks "https://github.com/marcelveldt/musicassistant/archive/master.zip" unzip -q master.zip - pip install -q --upgrade -r musicassistant-master/requirements.txt cp -rf musicassistant-master/music_assistant/. /usr/src/app rm -R /tmp/musicassistant-master fi