simplify the streaming
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 15 Jun 2019 18:42:39 +0000 (20:42 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 15 Jun 2019 18:42:39 +0000 (20:42 +0200)
Dockerfile
music_assistant/modules/http_streamer.py
music_assistant/modules/musicproviders/qobuz.py
music_assistant/modules/playerproviders/chromecast.py
requirements.txt [changed mode: 0644->0755]
run.sh

index cfa440b202cdabe0c9dbf3b3bfc8aae15237762c..5c1ed6b8413ee8f08cef1ade2c6d9bb9ab97521e 100755 (executable)
@@ -1,9 +1,13 @@
-FROM python:3.7.3-alpine
+FROM alpine:latest
 
 # install deps
-RUN apk add build-base python-dev flac sox taglib-dev zip curl ffmpeg ffmpeg-dev sox-dev py-numpy
+RUN apk add flac sox zip curl wget ffmpeg
+RUN apk add --no-cache -X http://dl-cdn.alpinelinux.org/alpine/edge/testing python3 py3-numpy py3-scipy py3-matplotlib py3-aiohttp py3-cairocffi
 COPY requirements.txt requirements.txt
-RUN pip install --upgrade -r requirements.txt
+RUN apk --no-cache add --virtual .builddeps build-base python3-dev taglib-dev && \
+    pip3 install -r requirements.txt && \
+    apk del .builddeps && \
+    rm -rf /root/.cache
 
 # copy files
 RUN mkdir -p /usr/src/app
index 529bc7902c78ef04760ae2a065f70d95bc89a12f..5ec16d78d87c28ceabbc7edc105a9cd48fa28aca 100755 (executable)
@@ -5,24 +5,16 @@ import asyncio
 import os
 from utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
 from models import TrackQuality, MediaType, PlayerState
-import shutil
-import xml.etree.ElementTree as ET
-import random
-import base64
 import operator
 from aiohttp import web
 import threading
 import urllib
-import math
 from memory_tempfile import MemoryTempfile
-import tempfile
 import io
 import soundfile as sf
 import pyloudnorm as pyln
 import aiohttp
 
-AUDIO_TEMP_DIR = "/tmp/audio_tmp"
-AUDIO_CACHE_DIR = "/tmp/audio_cache"
 
 class HTTPStreamer():
     ''' Built-in streamer using sox and webserver '''
@@ -32,25 +24,13 @@ class HTTPStreamer():
         self.create_config_entries()
         self.local_ip = get_ip()
         self.analyze_jobs = {}
-        self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
-        # create needed temp/cache dirs
-        if self.mass.config['base']['http_streamer']['enable_cache'] and not os.path.isdir(self._audio_cache_dir):
-            self._audio_cache_dir = self.mass.config['base']['http_streamer']['audio_cache_folder']
-            os.makedirs(self._audio_cache_dir)
-        if not os.path.isdir(AUDIO_TEMP_DIR):
-            os.makedirs(AUDIO_TEMP_DIR)
-        mass.event_loop.create_task(self.__cache_cleanup())
 
     def create_config_entries(self):
         ''' sets the config entries for this module (list with key/value pairs)'''
         config_entries = [
             ('volume_normalisation', True, 'enable_r128_volume_normalisation'), 
             ('target_volume', '-23', 'target_volume_lufs'),
-            ('fallback_gain_correct', '-12', 'fallback_gain_correct'),
-            ('enable_cache', True, 'enable_audio_cache'),
-            ('trim_silence', True, 'trim_silence'),
-            ('audio_cache_folder', '/tmp/audio_cache', 'audio_cache_folder'),
-            ('audio_cache_max_size_gb', 20, 'audio_cache_max_size_gb')
+            ('fallback_gain_correct', '-12', 'fallback_gain_correct')
             ]
         if not self.mass.config['base'].get('http_streamer'):
             self.mass.config['base']['http_streamer'] = {}
@@ -222,52 +202,50 @@ class HTTPStreamer():
                     track_id, provider, player_id, chunksize=fade_bytes, outputfmt=pcm_args, 
                     sox_effects='rate -v %s' % sample_rate ):
                 cur_chunk += 1
-                if cur_chunk == 1 and not last_fadeout_data:
+                if cur_chunk <= 2 and not last_fadeout_data:
                     # fade-in part but this is the first track so just pass it to the final file
                     sox_proc.stdin.write(chunk)
                     await sox_proc.stdin.drain()
                     bytes_written += len(chunk)
                 elif cur_chunk == 1 and last_fadeout_data:
-                    # create fade-out part
-                    args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length)
+                    prev_chunk = chunk
+                elif cur_chunk == 2 and last_fadeout_data:
+                    # combine the first 2 chunks and strip off silence
+                    args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args)
                     process = await asyncio.create_subprocess_shell(args,
                             stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-                    last_fadeout_data, stderr = await process.communicate(last_fadeout_data)
-                    LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
-                    # create fade-in part
-                    args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
-                    process = await asyncio.create_subprocess_shell(args,
-                            stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-                    fade_in_part, stderr = await process.communicate(chunk)
-                    LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
-                    # create crossfade using sox and some temp files
-                    # TODO: figure out how to make this less complex and without the tempfiles
-                    fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
-                    fadeinfile.write(fade_in_part)
-                    fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
-                    fadeoutfile.write(last_fadeout_data)
-                    args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
-                    process = await asyncio.create_subprocess_shell(args,
-                            stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-                    crossfade_part, stderr = await process.communicate(fade_in_part)
-                    LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
-                    # write the crossfade part to the sox player
+                    first_part, stderr = await process.communicate(prev_chunk + chunk)
+                    fade_in_part = first_part[:fade_bytes]
+                    remaining_bytes = first_part[fade_bytes:]
+                    del first_part
+                    # do crossfade
+                    crossfade_part = await self.__crossfade_pcm_parts(fade_in_part, last_fadeout_data, pcm_args, fade_length) 
                     sox_proc.stdin.write(crossfade_part)
                     await sox_proc.stdin.drain()
                     bytes_written += len(crossfade_part)
-                    fadeinfile.close()
-                    fadeoutfile.close()
                     del crossfade_part
-                    fade_in_part = None
+                    del fade_in_part
                     last_fadeout_data = b''
+                    # also write the leftover bytes from the strip action
+                    sox_proc.stdin.write(remaining_bytes)
+                    await sox_proc.stdin.drain()
+                    bytes_written += len(remaining_bytes)
+                    del remaining_bytes
                 elif prev_chunk and is_last_chunk:
-                    # last chunk received so create the fadeout with the previous chunk and this chunk
-                    last_part = prev_chunk + chunk
+                    # last chunk received so create the fadeout_part with the previous chunk and this chunk
+                    # and strip off silence
+                    args = 'sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse' % (pcm_args, pcm_args)
+                    process = await asyncio.create_subprocess_shell(args,
+                            stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+                    last_part, stderr = await process.communicate(prev_chunk + chunk)
                     last_fadeout_data = last_part[-fade_bytes:]
-                    bytes_remaining = last_part[:-fade_bytes]
-                    sox_proc.stdin.write(bytes_remaining)
-                    bytes_written += len(bytes_remaining)
+                    remaining_bytes = last_part[:-fade_bytes]
+                    # write remaining bytes
+                    sox_proc.stdin.write(remaining_bytes)
+                    bytes_written += len(remaining_bytes)
                     await sox_proc.stdin.drain()
+                    del last_part
+                    del remaining_bytes
                 else:
                     # middle part of the track
                     # keep previous chunk in memory so we have enough samples to perform the crossfade
@@ -278,16 +256,6 @@ class HTTPStreamer():
                         prev_chunk = chunk
                     else:
                         prev_chunk = chunk
-                # pre-analyse the next track - to ensure smooth transitions
-                try:
-                    queue_tracks = await self.mass.player.player_queue(player_id, queue_index+1, queue_index+2)
-                    queue_track = queue_tracks[0]
-                    params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
-                    track_id = params['track_id'][0]
-                    provider = params['provider'][0]
-                    self.mass.event_loop.create_task(self.__analyze_audio(track_id, provider))
-                except:
-                    pass
                 # wait for the queue to consume the data
                 # this prevents that the entire track is sitting in memory
                 # and it helps a bit in the quest to follow where we are in the queue
@@ -323,22 +291,16 @@ class HTTPStreamer():
         else:
             gain_correct = ''
         sox_effects += await self.__get_player_sox_options(track_id, provider, player_id, False)
-
-        cachefile = self.__get_track_cache_filename(track_id, provider)
-        if os.path.isfile(cachefile):
-            # stream from cachefile
-            args = 'sox -t sox "%s" -t %s - %s %s' % (cachefile, outputfmt, gain_correct, sox_effects)
-        else:
-            # stream directly from provider
-            streamdetails = asyncio.run_coroutine_threadsafe(
-                    self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result()
-            if not streamdetails:
-                yield b''
-                return
-            if streamdetails['type'] == 'url':
-                args = 'sox -t %s "%s" -t %s - %s %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, gain_correct, sox_effects)
-            elif streamdetails['type'] == 'executable':
-                args = '%s | sox -t %s - -t %s - %s %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, gain_correct, sox_effects)
+        # stream audio from provider
+        streamdetails = asyncio.run_coroutine_threadsafe(
+                self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result()
+        if not streamdetails:
+            yield b''
+            return
+        if streamdetails['type'] == 'url':
+            args = 'sox -t %s "%s" -t %s - %s %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, gain_correct, sox_effects)
+        elif streamdetails['type'] == 'executable':
+            args = '%s | sox -t %s - -t %s - %s %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, gain_correct, sox_effects)
         LOGGER.debug("Running sox with args: %s" % args)
         process = await asyncio.create_subprocess_shell(args,
                 stdout=asyncio.subprocess.PIPE)
@@ -394,11 +356,8 @@ class HTTPStreamer():
             return # prevent multiple analyze jobs for same tracks
         self.analyze_jobs[track_key] = True
         streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id)
-        cachefile = self.__get_track_cache_filename(track_id, provider)
-        enable_cache = self.mass.config['base']['http_streamer']['enable_cache']
-        needs_cachefile = enable_cache and not os.path.isfile(cachefile)
         track_loudness = await self.mass.db.get_track_loudness(track_id, provider)
-        if needs_cachefile or track_loudness == None:
+        if track_loudness == None:
             # only when needed we do the analyze stuff
             LOGGER.info('Start analyzing track %s' % track_id)
             if streamdetails['type'] == 'url':
@@ -418,11 +377,6 @@ class HTTPStreamer():
                 del data
                 LOGGER.debug("Integrated loudness of track %s is: %s" %(track_id, loudness))
                 await self.mass.db.set_track_loudness(track_id, provider, loudness)
-            if needs_cachefile:
-                # use sox to store cache file (strip silence from start and end for better transitions)
-                cmd = 'sox -t %s - -t sox %s silence 1 0.1 1%% reverse silence 1 0.1 1%% reverse' %(streamdetails['content_type'], cachefile)
-                process = await asyncio.create_subprocess_shell(cmd, stdin=asyncio.subprocess.PIPE)
-                await process.communicate(audio_data)
             del audio_data
             LOGGER.info('Finished analyzing track %s' % track_id)
         self.analyze_jobs.pop(track_key, None)
@@ -437,74 +391,24 @@ class HTTPStreamer():
         gain_correct = target_gain - track_loudness
         return round(gain_correct,2)
 
-    async def __fill_audio_buffer(self, buf, track_id, provider, content_type):
-        ''' get audio data from provider and write to buffer'''
-        # fill the buffer with audio data
-        # a tempfile is created so we can do audio analysis
-        try:
-            tmpfile = os.path.join(AUDIO_TEMP_DIR, '%s%s%s.tmp' % (random.randint(0, 999), track_id, random.randint(0, 999)))
-            fd = open(tmpfile, 'wb')
-            async for chunk in self.mass.music.providers[provider].get_audio_stream(track_id):
-                if not chunk:
-                    break
-                buf.write(chunk)
-                await buf.drain()
-                fd.write(chunk)
-            LOGGER.info("fill_audio_buffer complete for track %s" % track_id)
-            # successfull completion, process temp file for analysis
-            self.mass.event_loop.create_task(
-                    self.__analyze_audio(tmpfile, track_id, provider, content_type))
-        except Exception as exc:
-            LOGGER.exception("fill_audio_buffer failed for track %s" % track_id)
-        finally:
-            buf.write_eof()
-            await buf.drain()
-            fd.close()
-
-    def __get_track_cache_filename(self, track_id, provider):
-        ''' get filename for a track to use as cache file '''
-        filename = '%s%s' %(provider, track_id.split(os.sep)[-1])
-        filename = base64.b64encode(filename.encode()).decode()
-        return os.path.join(self._audio_cache_dir, filename)
-
-    @run_periodic(3600)
-    async def __cache_cleanup(self):
-        ''' calculate size of cache folder and cleanup if needed '''
-        def cleanup():
-            size_limit = self.mass.config['base']['http_streamer']['audio_cache_max_size_gb']
-            total_size_gb = get_folder_size(self._audio_cache_dir)
-            LOGGER.info("current size of cache folder is %s GB" % total_size_gb)
-            if size_limit and total_size_gb > size_limit:
-                LOGGER.info("Cache folder size exceeds threshold, start cleanup...")
-                from pathlib import Path
-                import time
-                days = 14
-                while total_size_gb > size_limit:
-                    time_in_secs = time.time() - (days * 24 * 60 * 60)
-                    for i in Path(self._audio_cache_dir).iterdir():
-                        if i.is_file():
-                            if i.stat().st_atime <= time_in_secs:
-                                total_size_gb -= i.stat().st_size/float(1<<30)
-                                i.unlink()
-                        if total_size_gb < size_limit:
-                            break
-                    days -= 1
-                LOGGER.info("Cache folder size cleanup completed")
-        await self.mass.event_loop.run_in_executor(None, cleanup)
-
-    @staticmethod
-    def __get_bs1770_binary():
-        ''' get the path to the bs1770 binary for the current OS '''
-        import platform
-        bs1770_binary = None
-        if platform.system() == "Windows":
-            bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "win64", "bs1770gain")
-        elif platform.system() == "Darwin":
-            # macos binary is x86_64 intel
-            bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "osx", "bs1770gain")
-        elif platform.system() == "Linux":
-            architecture = platform.machine()
-            if architecture.startswith('AMD64') or architecture.startswith('x86_64'):
-                bs1770_binary = os.path.join(os.path.dirname(__file__), "bs1770gain", "linux64", "bs1770gain")
-            # TODO: build armhf binary
-        return bs1770_binary
\ No newline at end of file
+    async def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args, fade_length):
+        ''' crossfade two chunks of audio using sox '''
+        # create fade-in part
+        fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+        args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length)
+        process = await asyncio.create_subprocess_shell(args, stdin=asyncio.subprocess.PIPE)
+        await process.communicate(fade_in_part)
+        # create fade-out part
+        fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+        args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length)
+        process = await asyncio.create_subprocess_shell(args,
+                stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+        await process.communicate(fade_out_part)
+        # create crossfade using sox and some temp files
+        # TODO: figure out how to make this less complex and without the tempfiles
+        args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
+        process = await asyncio.create_subprocess_shell(args,
+                stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+        crossfade_part, stderr = await process.communicate()
+        LOGGER.info("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+        return crossfade_part
index 7c152e682d78bd49ac834a5b672d8db5d091274b..b8dd27537452f4c0a8cc713dbf02d018d806c256 100644 (file)
@@ -47,8 +47,8 @@ class QobuzProvider(MusicProvider):
         self.__username = username
         self.__password = password
         self.__user_auth_token = None
-        self.__app_id = "285473059"
-        self.__app_secret = "47249d0eaefa6bf43a959c09aacdbce8"
+        self.__app_id = "285473059" # TEMP! Own key requested
+        self.__app_secret = "47249d0eaefa6bf43a959c09aacdbce8" # TEMP! Own key requested
         self.__logged_in = False
         self.throttler = Throttler(rate_limit=1, period=1)
 
@@ -252,12 +252,9 @@ class QobuzProvider(MusicProvider):
         params = {'playlist_id': prov_playlist_id, 'track_ids': ",".join(playlist_track_ids)}
         return await self.__get_data('playlist/deleteTracks', params)
     
-    async def get_stream_content_type(self, track_id):
-        ''' return the content type for the given track when it will be streamed'''
-        return 'flac' #TODO handle other file formats on qobuz?
-
     async def get_stream_details(self, track_id):
         ''' return the content details for the given track when it will be streamed'''
+        # TODO: Report streaming start and streaming end !!
         params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'}
         streamdetails = await self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True)
         if not streamdetails:
@@ -275,31 +272,6 @@ class QobuzProvider(MusicProvider):
             "bit_depth": streamdetails['bit_depth']
         }
 
-    async def get_audio_stream(self, track_id):
-        ''' get audio stream for a track '''
-        params = {'format_id': 27, 'track_id': track_id, 'intent': 'stream'}
-        # we are called from other thread
-        streamdetails_future = asyncio.run_coroutine_threadsafe(
-            self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True),
-            self.mass.event_loop
-        )
-        streamdetails = streamdetails_future.result()
-        print(streamdetails)
-        if not streamdetails:
-            # simply retry this request
-            await asyncio.sleep(1)
-            streamdetails_future = asyncio.run_coroutine_threadsafe(
-                self.__get_data('track/getFileUrl', params, sign_request=True, ignore_cache=True),
-                self.mass.event_loop
-            )
-            streamdetails = streamdetails_future.result()
-        if not streamdetails:
-            raise Exception("Unable to retrieve stream url for track %s" % track_id)
-        async with aiohttp.ClientSession(loop=asyncio.get_event_loop(), connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
-            async with session.get(streamdetails['url']) as resp:
-                async for data, end_of_http_chunk in resp.content.iter_chunks():
-                    yield data
-    
     async def __parse_artist(self, artist_obj):
         ''' parse spotify artist object to generic layout '''
         artist = Artist()
index 05c26d8a2775217dd14b8fae994562f5d1e28f1f..486a5aa43b8e4a23959f8570f38ddf825d06b750 100644 (file)
@@ -92,6 +92,7 @@ class ChromecastProvider(PlayerProvider):
             else:
                 self._chromecasts[player_id].media_controller.queue_next()
         elif cmd == 'previous':
+            enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0
             if enable_crossfade:
                 await self.__play_stream_queue(player_id, self._player_queue_index[player_id]-1)
             else:
old mode 100644 (file)
new mode 100755 (executable)
index 554af15..397d210
@@ -3,7 +3,6 @@ aiohttp
 spotify_token
 pychromecast
 uvloop
-slugify
 asyncio_throttle
 aiocometd
 aiosqlite
diff --git a/run.sh b/run.sh
index 993dbc3e4ddd8f3282654e571992b3c836e60ae6..3e73292e5bc3122f4cd87314588c3712581e13a1 100755 (executable)
--- a/run.sh
+++ b/run.sh
@@ -7,7 +7,6 @@ if [ "$autoupdate" == "true" ]; then
     cd /tmp
     curl -LOks "https://github.com/marcelveldt/musicassistant/archive/master.zip"
     unzip -q master.zip
-    pip install -q --upgrade -r musicassistant-master/requirements.txt
     cp -rf musicassistant-master/music_assistant/. /usr/src/app
     rm -R /tmp/musicassistant-master
 fi