fix report streaming start/end to streaming providers
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 15 Jun 2019 21:54:49 +0000 (23:54 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 15 Jun 2019 21:54:49 +0000 (23:54 +0200)
music_assistant/modules/http_streamer.py
music_assistant/modules/musicproviders/qobuz.py
music_assistant/modules/musicproviders/spotify.py

index 5ec16d78d87c28ceabbc7edc105a9cd48fa28aca..552d4def2eb40d38b4de264d067bf061cc3707fc 100755 (executable)
@@ -55,10 +55,14 @@ class HTTPStreamer():
 
             async def fill_buffer():
                 ''' fill buffer runs in background process to prevent deadlocks of the sox executable '''
-                audio_stream = self.__get_audio_stream(track_id, provider, player_id)
+                audio_stream = self.__get_audio_stream(track_id, provider, player_id, cancelled)
                 async for is_last_chunk, audio_chunk in audio_stream:
                     if not cancelled.is_set():
                         await queue.put(audio_chunk)
+                    # wait for the queue to consume the data
+                    # this prevents that the entire track is sitting in memory
+                    while queue.qsize() > 1 and not cancelled.is_set():
+                        await asyncio.sleep(1)
                 await queue.put(b'') # EOF
             run_async_background_task(self.mass.bg_executor, fill_buffer)
                
@@ -199,11 +203,10 @@ class HTTPStreamer():
             prev_chunk = None
             bytes_written = 0
             async for is_last_chunk, chunk in self.__get_audio_stream(
-                    track_id, provider, player_id, chunksize=fade_bytes, outputfmt=pcm_args, 
-                    sox_effects='rate -v %s' % sample_rate ):
+                    track_id, provider, player_id, cancelled, chunksize=fade_bytes, resample=sample_rate):
                 cur_chunk += 1
                 if cur_chunk <= 2 and not last_fadeout_data:
-                    # fade-in part but this is the first track so just pass it to the final file
+                    # fade-in part but no fadeout_part available so just pass it to the output directly
                     sox_proc.stdin.write(chunk)
                     await sox_proc.stdin.drain()
                     bytes_written += len(chunk)
@@ -231,6 +234,7 @@ class HTTPStreamer():
                     await sox_proc.stdin.drain()
                     bytes_written += len(remaining_bytes)
                     del remaining_bytes
+                    prev_chunk = None # needed to prevent this chunk being sent again
                 elif prev_chunk and is_last_chunk:
                     # last chunk received so create the fadeout_part with the previous chunk and this chunk
                     # and strip off silence
@@ -238,14 +242,23 @@ class HTTPStreamer():
                     process = await asyncio.create_subprocess_shell(args,
                             stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
                     last_part, stderr = await process.communicate(prev_chunk + chunk)
-                    last_fadeout_data = last_part[-fade_bytes:]
-                    remaining_bytes = last_part[:-fade_bytes]
-                    # write remaining bytes
-                    sox_proc.stdin.write(remaining_bytes)
-                    bytes_written += len(remaining_bytes)
-                    await sox_proc.stdin.drain()
-                    del last_part
-                    del remaining_bytes
+                    if len(last_part) < fade_bytes:
+                        # not enough data for crossfade duration
+                        LOGGER.warning("not enough data for fadeout so skip crossfade... %s" % len(last_part))
+                        sox_proc.stdin.write(last_part)
+                        bytes_written += len(last_part)
+                        await sox_proc.stdin.drain()
+                        del last_part
+                    else:
+                        # store fade section to be picked up for next track
+                        last_fadeout_data = last_part[-fade_bytes:]
+                        remaining_bytes = last_part[:-fade_bytes]
+                        # write remaining bytes
+                        sox_proc.stdin.write(remaining_bytes)
+                        bytes_written += len(remaining_bytes)
+                        await sox_proc.stdin.drain()
+                        del last_part
+                        del remaining_bytes
                 else:
                     # middle part of the track
                     # keep previous chunk in memory so we have enough samples to perform the crossfade
@@ -259,7 +272,7 @@ class HTTPStreamer():
                 # wait for the queue to consume the data
                 # this prevents that the entire track is sitting in memory
                 # and it helps a bit in the quest to follow where we are in the queue
-                while buffer.qsize() > 2 and not cancelled.is_set():
+                while buffer.qsize() > 1 and not cancelled.is_set():
                     await asyncio.sleep(1)
             # end of the track reached
             # WIP: update actual duration to the queue for more accurate now playing info
@@ -282,15 +295,19 @@ class HTTPStreamer():
         await sox_proc.wait()
         LOGGER.info("streaming of queue for player %s completed" % player_id)
 
-    async def __get_audio_stream(self, track_id, provider, player_id,
-                chunksize=512000, outputfmt='flac -C 0', sox_effects=''):
+    async def __get_audio_stream(self, track_id, provider, player_id, cancelled,
+                chunksize=512000, resample=None):
         ''' get audio stream from provider and apply additional effects/processing where/if needed'''
         if self.mass.config['base']['http_streamer']['volume_normalisation']:
             gain_correct = await self.__get_track_gain_correct(track_id, provider)
             gain_correct = 'vol %s dB ' % gain_correct
         else:
             gain_correct = ''
-        sox_effects += await self.__get_player_sox_options(track_id, provider, player_id, False)
+        sox_effects = await self.__get_player_sox_options(track_id, provider, player_id, False)
+        outputfmt = 'flac -C 0'
+        if resample:
+            outputfmt = 'raw -b 32 -c 2 -e signed-integer'
+            sox_effects += ' rate -v %s' % resample
         # stream audio from provider
         streamdetails = asyncio.run_coroutine_threadsafe(
                 self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result()
@@ -304,21 +321,43 @@ class HTTPStreamer():
         LOGGER.debug("Running sox with args: %s" % args)
         process = await asyncio.create_subprocess_shell(args,
                 stdout=asyncio.subprocess.PIPE)
+        # fire event that streaming has started for this track (needed by some streaming providers)
+        streamdetails["provider"] = provider
+        streamdetails["track_id"] = track_id
+        streamdetails["player_id"] = player_id
+        self.mass.event_loop.create_task(self.mass.event('streaming_started', streamdetails))
         # yield chunks from stdout
         # we keep 1 chunk behind to detect end of stream properly
         prev_chunk = b''
+        bytes_sent = 0
         while not process.stdout.at_eof():
             try:
                 chunk = await process.stdout.readexactly(chunksize)
             except asyncio.streams.IncompleteReadError:
                 chunk = await process.stdout.read(chunksize)
-            if prev_chunk:
+            if not chunk:
+                break
+            if prev_chunk and not cancelled.is_set():
                 yield (False, prev_chunk)
+                bytes_sent += len(prev_chunk)
             prev_chunk = chunk
         # yield last chunk
-        yield (True, prev_chunk)
+        if not cancelled.is_set():
+            yield (True, prev_chunk)
+            bytes_sent += len(prev_chunk)
         await process.wait()
-        LOGGER.info("__get_audio_stream for track_id %s completed" % track_id)
+        if cancelled.is_set():
+            LOGGER.warning("__get_audio_stream for track_id %s interrupted" % track_id)
+        else:
+            LOGGER.info("__get_audio_stream for track_id %s completed" % track_id)
+        # fire event that streaming has ended for this track (needed by some streaming providers)
+        if resample:
+            bytes_per_second = resample * (32/8) * 2
+        else:
+            bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2
+        seconds_streamed = int(bytes_sent/bytes_per_second)
+        streamdetails["seconds"] = seconds_streamed
+        self.mass.event_loop.create_task(self.mass.event('streaming_ended', streamdetails))
         # send task to background to analyse the audio
         self.mass.event_loop.create_task(self.__analyze_audio(track_id, provider))
 
@@ -353,7 +392,7 @@ class HTTPStreamer():
         ''' analyze track audio, for now we only calculate EBU R128 loudness '''
         track_key = '%s%s' %(track_id, provider)
         if track_key in self.analyze_jobs:
-            return # prevent multiple analyze jobs for same tracks
+            return # prevent multiple analyze jobs for same track
         self.analyze_jobs[track_key] = True
         streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id)
         track_loudness = await self.mass.db.get_track_loudness(track_id, provider)
index b8dd27537452f4c0a8cc713dbf02d018d806c256..ee0a781ae5966a9bee3f90e41ddfefa9cc3d8f6e 100644 (file)
@@ -40,17 +40,17 @@ class QobuzProvider(MusicProvider):
     def __init__(self, mass, username, password):
         self.name = 'Qobuz'
         self.prov_id = 'qobuz'
-        self._cur_user = None
         self.mass = mass
         self.cache = mass.cache
         self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
         self.__username = username
         self.__password = password
-        self.__user_auth_token = None
+        self.__user_auth_info = None
         self.__app_id = "285473059" # TEMP! Own key requested
         self.__app_secret = "47249d0eaefa6bf43a959c09aacdbce8" # TEMP! Own key requested
         self.__logged_in = False
         self.throttler = Throttler(rate_limit=1, period=1)
+        mass.event_loop.create_task(mass.add_event_listener(self.mass_event))
 
     async def search(self, searchstring, media_types=List[MediaType], limit=5):
         ''' perform search on the provider '''
@@ -269,9 +269,37 @@ class QobuzProvider(MusicProvider):
             "path": streamdetails['url'],
             "content_type": streamdetails['mime_type'].split('/')[1],
             "sample_rate": int(streamdetails['sampling_rate']*1000),
-            "bit_depth": streamdetails['bit_depth']
+            "bit_depth": streamdetails['bit_depth'],
+            "details": streamdetails # we need these details for reporting playback
         }
 
+    async def mass_event(self, msg, msg_details):
+        ''' received event from mass '''
+        if msg == "streaming_started" and msg_details['provider'] == self.prov_id:
+            # report streaming started to qobuz
+            LOGGER.debug("streaming_started %s" % msg_details["track_id"])
+            device_id = self.__user_auth_info["user"]["device"]["id"]
+            credential_id = self.__user_auth_info["user"]["credential"]["id"]
+            user_id = self.__user_auth_info["user"]["id"]
+            format_id = msg_details["details"]["format_id"]
+            timestamp = int(time.time())
+            events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id, 
+                "track_id": msg_details["track_id"], "purchase": False, "date": timestamp,
+                "credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}]
+            await self.__post_data("track/reportStreamingStart", data=events)
+        elif msg == "streaming_ended" and msg_details['provider'] == self.prov_id:
+            # report streaming ended to qobuz
+            LOGGER.debug("streaming_ended %s - seconds played: %s" %(msg_details["track_id"], msg_details["seconds"]) )
+            device_id = self.__user_auth_info["user"]["device"]["id"]
+            credential_id = self.__user_auth_info["user"]["credential"]["id"]
+            user_id = self.__user_auth_info["user"]["id"]
+            format_id = msg_details["details"]["format_id"]
+            timestamp = int(time.time())
+            events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id, 
+                "track_id": msg_details["track_id"], "purchase": False, "date": timestamp, "duration": msg_details["seconds"],
+                "credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}]
+            await self.__post_data("track/reportStreamingStart", data=events)
+    
     async def __parse_artist(self, artist_obj):
         ''' parse spotify artist object to generic layout '''
         artist = Artist()
@@ -430,7 +458,7 @@ class QobuzProvider(MusicProvider):
         })
         playlist.name = playlist_obj['name']
         playlist.owner = playlist_obj['owner']['name']
-        playlist.is_editable = playlist_obj['owner']['id'] == self._cur_user["id"] or playlist_obj['is_collaborative']
+        playlist.is_editable = playlist_obj['owner']['id'] == self.__user_auth_info["user"]["id"] or playlist_obj['is_collaborative']
         if playlist_obj.get('images300'):
             playlist.metadata["image"] = playlist_obj['images300'][0]
         if playlist_obj.get('url'):
@@ -439,13 +467,12 @@ class QobuzProvider(MusicProvider):
 
     async def __auth_token(self):
         ''' login to qobuz and store the token'''
-        if self.__user_auth_token:
-            return self.__user_auth_token
-        params = { "username": self.__username, "password": self.__password}
+        if self.__user_auth_info:
+            return self.__user_auth_info["user_auth_token"]
+        params = { "username": self.__username, "password": self.__password, "device_manufacturer_id": "music_assistant"}
         details = await self.__get_data("user/login", params, ignore_cache=True)
-        self._cur_user = details["user"]
-        self.__user_auth_token = details["user_auth_token"]
-        LOGGER.info("Succesfully logged in to Qobuz as %s" % (self._cur_user["display_name"]))
+        self.__user_auth_info = details
+        LOGGER.info("Succesfully logged in to Qobuz as %s" % (details["user"]["display_name"]))
         return details["user_auth_token"]
 
     async def __get_all_items(self, endpoint, params={}, key="playlists", limit=0, offset=0, cache_checksum=None):
@@ -500,7 +527,7 @@ class QobuzProvider(MusicProvider):
             params["request_ts"] = request_ts
             params["request_sig"] = request_sig
             params["app_id"] = self.__app_id
-            params["user_auth_token"] = self.__user_auth_token
+            params["user_auth_token"] = await self.__auth_token()
         async with self.throttler:
             async with self.http_session.get(url, headers=headers, params=params) as response:
                 result = await response.json()
@@ -511,3 +538,16 @@ class QobuzProvider(MusicProvider):
                     result = None
                 return result
 
+    async def __post_data(self, endpoint, params={}, data={}):
+        ''' post data to api'''
+        url = "http://www.qobuz.com/api.json/0.2/%s" % endpoint
+        params["app_id"] = self.__app_id
+        params["user_auth_token"] = await self.__auth_token()
+        async with self.http_session.post(url, params=params, json=data) as response:
+            result = await response.json()
+            if not result or 'error' in result:
+                LOGGER.error(url)
+                LOGGER.error(params)
+                LOGGER.error(result)
+                result = None
+            return result
\ No newline at end of file
index 587c820a67921f737924f24fd524a06e7587776a..9ac6f6d5650b86b0a83b9ee40237af0c8682ff84 100644 (file)
@@ -241,10 +241,6 @@ class SpotifyProvider(MusicProvider):
         elif offset_uri != None: # only for playlists/albums!
             opts["offset"] = {"uri": offset_uri }
         return await self.__put_data('me/player/play', {"device_id": device_id}, opts)
-    
-    async def get_stream_content_type(self, track_id):
-        ''' return the content type for the given track when it will be streamed'''
-        return 'ogg'
 
     async def get_stream_details(self, track_id):
         ''' return the content details for the given track when it will be streamed'''
@@ -257,19 +253,6 @@ class SpotifyProvider(MusicProvider):
             "sample_rate": 44100,
             "bit_depth": 16
         }
-
-    async def get_audio_stream(self, track_id):
-        ''' get audio stream for a track '''
-        import subprocess
-        spotty = self.get_spotty_binary()
-        args = ['-n', 'temp', '-u', self._username, '-p', self._password, '--pass-through', '--single-track', track_id]
-        process = await asyncio.create_subprocess_exec(spotty, *args, stdout=asyncio.subprocess.PIPE)
-        while not process.stdout.at_eof():
-            chunk = await process.stdout.read(32000)
-            if not chunk:
-                break
-            yield chunk
-        await process.wait()
         
     async def __parse_artist(self, artist_obj):
         ''' parse spotify artist object to generic layout '''