From: Marcel van der Veldt Date: Sat, 15 Jun 2019 21:54:49 +0000 (+0200) Subject: fix report streaming start/end to streaming providers X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=9853233ee3d0a672c50212535a551ef563d140cc;p=music-assistant-server.git fix report streaming start/end to streaming providers --- diff --git a/music_assistant/modules/http_streamer.py b/music_assistant/modules/http_streamer.py index 5ec16d78..552d4def 100755 --- a/music_assistant/modules/http_streamer.py +++ b/music_assistant/modules/http_streamer.py @@ -55,10 +55,14 @@ class HTTPStreamer(): async def fill_buffer(): ''' fill buffer runs in background process to prevent deadlocks of the sox executable ''' - audio_stream = self.__get_audio_stream(track_id, provider, player_id) + audio_stream = self.__get_audio_stream(track_id, provider, player_id, cancelled) async for is_last_chunk, audio_chunk in audio_stream: if not cancelled.is_set(): await queue.put(audio_chunk) + # wait for the queue to consume the data + # this prevents that the entire track is sitting in memory + while queue.qsize() > 1 and not cancelled.is_set(): + await asyncio.sleep(1) await queue.put(b'') # EOF run_async_background_task(self.mass.bg_executor, fill_buffer) @@ -199,11 +203,10 @@ class HTTPStreamer(): prev_chunk = None bytes_written = 0 async for is_last_chunk, chunk in self.__get_audio_stream( - track_id, provider, player_id, chunksize=fade_bytes, outputfmt=pcm_args, - sox_effects='rate -v %s' % sample_rate ): + track_id, provider, player_id, cancelled, chunksize=fade_bytes, resample=sample_rate): cur_chunk += 1 if cur_chunk <= 2 and not last_fadeout_data: - # fade-in part but this is the first track so just pass it to the final file + # fade-in part but no fadeout_part available so just pass it to the output directly sox_proc.stdin.write(chunk) await sox_proc.stdin.drain() bytes_written += len(chunk) @@ -231,6 +234,7 @@ class HTTPStreamer(): await sox_proc.stdin.drain() bytes_written += len(remaining_bytes) del remaining_bytes + prev_chunk = None # needed to prevent this chunk being sent again elif prev_chunk and is_last_chunk: # last chunk received so create the fadeout_part with the previous chunk and this chunk # and strip off silence @@ -238,14 +242,23 @@ class HTTPStreamer(): process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) last_part, stderr = await process.communicate(prev_chunk + chunk) - last_fadeout_data = last_part[-fade_bytes:] - remaining_bytes = last_part[:-fade_bytes] - # write remaining bytes - sox_proc.stdin.write(remaining_bytes) - bytes_written += len(remaining_bytes) - await sox_proc.stdin.drain() - del last_part - del remaining_bytes + if len(last_part) < fade_bytes: + # not enough data for crossfade duration + LOGGER.warning("not enough data for fadeout so skip crossfade... %s" % len(last_part)) + sox_proc.stdin.write(last_part) + bytes_written += len(last_part) + await sox_proc.stdin.drain() + del last_part + else: + # store fade section to be picked up for next track + last_fadeout_data = last_part[-fade_bytes:] + remaining_bytes = last_part[:-fade_bytes] + # write remaining bytes + sox_proc.stdin.write(remaining_bytes) + bytes_written += len(remaining_bytes) + await sox_proc.stdin.drain() + del last_part + del remaining_bytes else: # middle part of the track # keep previous chunk in memory so we have enough samples to perform the crossfade @@ -259,7 +272,7 @@ class HTTPStreamer(): # wait for the queue to consume the data # this prevents that the entire track is sitting in memory # and it helps a bit in the quest to follow where we are in the queue - while buffer.qsize() > 2 and not cancelled.is_set(): + while buffer.qsize() > 1 and not cancelled.is_set(): await asyncio.sleep(1) # end of the track reached # WIP: update actual duration to the queue for more accurate now playing info @@ -282,15 +295,19 @@ class HTTPStreamer(): await sox_proc.wait() LOGGER.info("streaming of queue for player %s completed" % player_id) - async def __get_audio_stream(self, track_id, provider, player_id, - chunksize=512000, outputfmt='flac -C 0', sox_effects=''): + async def __get_audio_stream(self, track_id, provider, player_id, cancelled, + chunksize=512000, resample=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' if self.mass.config['base']['http_streamer']['volume_normalisation']: gain_correct = await self.__get_track_gain_correct(track_id, provider) gain_correct = 'vol %s dB ' % gain_correct else: gain_correct = '' - sox_effects += await self.__get_player_sox_options(track_id, provider, player_id, False) + sox_effects = await self.__get_player_sox_options(track_id, provider, player_id, False) + outputfmt = 'flac -C 0' + if resample: + outputfmt = 'raw -b 32 -c 2 -e signed-integer' + sox_effects += ' rate -v %s' % resample # stream audio from provider streamdetails = asyncio.run_coroutine_threadsafe( self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result() @@ -304,21 +321,43 @@ class HTTPStreamer(): LOGGER.debug("Running sox with args: %s" % args) process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) + # fire event that streaming has started for this track (needed by some streaming providers) + streamdetails["provider"] = provider + streamdetails["track_id"] = track_id + streamdetails["player_id"] = player_id + self.mass.event_loop.create_task(self.mass.event('streaming_started', streamdetails)) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly prev_chunk = b'' + bytes_sent = 0 while not process.stdout.at_eof(): try: chunk = await process.stdout.readexactly(chunksize) except asyncio.streams.IncompleteReadError: chunk = await process.stdout.read(chunksize) - if prev_chunk: + if not chunk: + break + if prev_chunk and not cancelled.is_set(): yield (False, prev_chunk) + bytes_sent += len(prev_chunk) prev_chunk = chunk # yield last chunk - yield (True, prev_chunk) + if not cancelled.is_set(): + yield (True, prev_chunk) + bytes_sent += len(prev_chunk) await process.wait() - LOGGER.info("__get_audio_stream for track_id %s completed" % track_id) + if cancelled.is_set(): + LOGGER.warning("__get_audio_stream for track_id %s interrupted" % track_id) + else: + LOGGER.info("__get_audio_stream for track_id %s completed" % track_id) + # fire event that streaming has ended for this track (needed by some streaming providers) + if resample: + bytes_per_second = resample * (32/8) * 2 + else: + bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2 + seconds_streamed = int(bytes_sent/bytes_per_second) + streamdetails["seconds"] = seconds_streamed + self.mass.event_loop.create_task(self.mass.event('streaming_ended', streamdetails)) # send task to background to analyse the audio self.mass.event_loop.create_task(self.__analyze_audio(track_id, provider)) @@ -353,7 +392,7 @@ class HTTPStreamer(): ''' analyze track audio, for now we only calculate EBU R128 loudness ''' track_key = '%s%s' %(track_id, provider) if track_key in self.analyze_jobs: - return # prevent multiple analyze jobs for same tracks + return # prevent multiple analyze jobs for same track self.analyze_jobs[track_key] = True streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id) track_loudness = await self.mass.db.get_track_loudness(track_id, provider) diff --git a/music_assistant/modules/musicproviders/qobuz.py b/music_assistant/modules/musicproviders/qobuz.py index b8dd2753..ee0a781a 100644 --- a/music_assistant/modules/musicproviders/qobuz.py +++ b/music_assistant/modules/musicproviders/qobuz.py @@ -40,17 +40,17 @@ class QobuzProvider(MusicProvider): def __init__(self, mass, username, password): self.name = 'Qobuz' self.prov_id = 'qobuz' - self._cur_user = None self.mass = mass self.cache = mass.cache self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) self.__username = username self.__password = password - self.__user_auth_token = None + self.__user_auth_info = None self.__app_id = "285473059" # TEMP! Own key requested self.__app_secret = "47249d0eaefa6bf43a959c09aacdbce8" # TEMP! Own key requested self.__logged_in = False self.throttler = Throttler(rate_limit=1, period=1) + mass.event_loop.create_task(mass.add_event_listener(self.mass_event)) async def search(self, searchstring, media_types=List[MediaType], limit=5): ''' perform search on the provider ''' @@ -269,9 +269,37 @@ class QobuzProvider(MusicProvider): "path": streamdetails['url'], "content_type": streamdetails['mime_type'].split('/')[1], "sample_rate": int(streamdetails['sampling_rate']*1000), - "bit_depth": streamdetails['bit_depth'] + "bit_depth": streamdetails['bit_depth'], + "details": streamdetails # we need these details for reporting playback } + async def mass_event(self, msg, msg_details): + ''' received event from mass ''' + if msg == "streaming_started" and msg_details['provider'] == self.prov_id: + # report streaming started to qobuz + LOGGER.debug("streaming_started %s" % msg_details["track_id"]) + device_id = self.__user_auth_info["user"]["device"]["id"] + credential_id = self.__user_auth_info["user"]["credential"]["id"] + user_id = self.__user_auth_info["user"]["id"] + format_id = msg_details["details"]["format_id"] + timestamp = int(time.time()) + events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id, + "track_id": msg_details["track_id"], "purchase": False, "date": timestamp, + "credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}] + await self.__post_data("track/reportStreamingStart", data=events) + elif msg == "streaming_ended" and msg_details['provider'] == self.prov_id: + # report streaming ended to qobuz + LOGGER.debug("streaming_ended %s - seconds played: %s" %(msg_details["track_id"], msg_details["seconds"]) ) + device_id = self.__user_auth_info["user"]["device"]["id"] + credential_id = self.__user_auth_info["user"]["credential"]["id"] + user_id = self.__user_auth_info["user"]["id"] + format_id = msg_details["details"]["format_id"] + timestamp = int(time.time()) + events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id, + "track_id": msg_details["track_id"], "purchase": False, "date": timestamp, "duration": msg_details["seconds"], + "credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}] + await self.__post_data("track/reportStreamingStart", data=events) + async def __parse_artist(self, artist_obj): ''' parse spotify artist object to generic layout ''' artist = Artist() @@ -430,7 +458,7 @@ class QobuzProvider(MusicProvider): }) playlist.name = playlist_obj['name'] playlist.owner = playlist_obj['owner']['name'] - playlist.is_editable = playlist_obj['owner']['id'] == self._cur_user["id"] or playlist_obj['is_collaborative'] + playlist.is_editable = playlist_obj['owner']['id'] == self.__user_auth_info["user"]["id"] or playlist_obj['is_collaborative'] if playlist_obj.get('images300'): playlist.metadata["image"] = playlist_obj['images300'][0] if playlist_obj.get('url'): @@ -439,13 +467,12 @@ class QobuzProvider(MusicProvider): async def __auth_token(self): ''' login to qobuz and store the token''' - if self.__user_auth_token: - return self.__user_auth_token - params = { "username": self.__username, "password": self.__password} + if self.__user_auth_info: + return self.__user_auth_info["user_auth_token"] + params = { "username": self.__username, "password": self.__password, "device_manufacturer_id": "music_assistant"} details = await self.__get_data("user/login", params, ignore_cache=True) - self._cur_user = details["user"] - self.__user_auth_token = details["user_auth_token"] - LOGGER.info("Succesfully logged in to Qobuz as %s" % (self._cur_user["display_name"])) + self.__user_auth_info = details + LOGGER.info("Succesfully logged in to Qobuz as %s" % (details["user"]["display_name"])) return details["user_auth_token"] async def __get_all_items(self, endpoint, params={}, key="playlists", limit=0, offset=0, cache_checksum=None): @@ -500,7 +527,7 @@ class QobuzProvider(MusicProvider): params["request_ts"] = request_ts params["request_sig"] = request_sig params["app_id"] = self.__app_id - params["user_auth_token"] = self.__user_auth_token + params["user_auth_token"] = await self.__auth_token() async with self.throttler: async with self.http_session.get(url, headers=headers, params=params) as response: result = await response.json() @@ -511,3 +538,16 @@ class QobuzProvider(MusicProvider): result = None return result + async def __post_data(self, endpoint, params={}, data={}): + ''' post data to api''' + url = "http://www.qobuz.com/api.json/0.2/%s" % endpoint + params["app_id"] = self.__app_id + params["user_auth_token"] = await self.__auth_token() + async with self.http_session.post(url, params=params, json=data) as response: + result = await response.json() + if not result or 'error' in result: + LOGGER.error(url) + LOGGER.error(params) + LOGGER.error(result) + result = None + return result \ No newline at end of file diff --git a/music_assistant/modules/musicproviders/spotify.py b/music_assistant/modules/musicproviders/spotify.py index 587c820a..9ac6f6d5 100644 --- a/music_assistant/modules/musicproviders/spotify.py +++ b/music_assistant/modules/musicproviders/spotify.py @@ -241,10 +241,6 @@ class SpotifyProvider(MusicProvider): elif offset_uri != None: # only for playlists/albums! opts["offset"] = {"uri": offset_uri } return await self.__put_data('me/player/play', {"device_id": device_id}, opts) - - async def get_stream_content_type(self, track_id): - ''' return the content type for the given track when it will be streamed''' - return 'ogg' async def get_stream_details(self, track_id): ''' return the content details for the given track when it will be streamed''' @@ -257,19 +253,6 @@ class SpotifyProvider(MusicProvider): "sample_rate": 44100, "bit_depth": 16 } - - async def get_audio_stream(self, track_id): - ''' get audio stream for a track ''' - import subprocess - spotty = self.get_spotty_binary() - args = ['-n', 'temp', '-u', self._username, '-p', self._password, '--pass-through', '--single-track', track_id] - process = await asyncio.create_subprocess_exec(spotty, *args, stdout=asyncio.subprocess.PIPE) - while not process.stdout.at_eof(): - chunk = await process.stdout.read(32000) - if not chunk: - break - yield chunk - await process.wait() async def __parse_artist(self, artist_obj): ''' parse spotify artist object to generic layout '''