async def fill_buffer():
    '''Producer task: feed audio chunks into the queue from the background
    executor so the sox executable never deadlocks on a full pipe.'''
    stream = self.__get_audio_stream(track_id, provider, player_id, cancelled)
    async for _is_last_chunk, data in stream:
        if cancelled.is_set():
            continue
        await queue.put(data)
        # Throttle the producer: keep at most one chunk queued ahead so the
        # entire track is never sitting in memory at once.
        while not cancelled.is_set() and queue.qsize() > 1:
            await asyncio.sleep(1)
    # an empty bytes object signals end-of-stream to the consumer
    await queue.put(b'')
run_async_background_task(self.mass.bg_executor, fill_buffer)
prev_chunk = None
bytes_written = 0
async for is_last_chunk, chunk in self.__get_audio_stream(
- track_id, provider, player_id, chunksize=fade_bytes, outputfmt=pcm_args,
- sox_effects='rate -v %s' % sample_rate ):
+ track_id, provider, player_id, cancelled, chunksize=fade_bytes, resample=sample_rate):
cur_chunk += 1
if cur_chunk <= 2 and not last_fadeout_data:
- # fade-in part but this is the first track so just pass it to the final file
+ # fade-in part but no fadeout_part available so just pass it to the output directly
sox_proc.stdin.write(chunk)
await sox_proc.stdin.drain()
bytes_written += len(chunk)
await sox_proc.stdin.drain()
bytes_written += len(remaining_bytes)
del remaining_bytes
+ prev_chunk = None # needed to prevent this chunk being sent again
elif prev_chunk and is_last_chunk:
# last chunk received so create the fadeout_part with the previous chunk and this chunk
# and strip off silence
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
last_part, stderr = await process.communicate(prev_chunk + chunk)
- last_fadeout_data = last_part[-fade_bytes:]
- remaining_bytes = last_part[:-fade_bytes]
- # write remaining bytes
- sox_proc.stdin.write(remaining_bytes)
- bytes_written += len(remaining_bytes)
- await sox_proc.stdin.drain()
- del last_part
- del remaining_bytes
+ if len(last_part) < fade_bytes:
+ # not enough data for crossfade duration
+ LOGGER.warning("not enough data for fadeout so skip crossfade... %s" % len(last_part))
+ sox_proc.stdin.write(last_part)
+ bytes_written += len(last_part)
+ await sox_proc.stdin.drain()
+ del last_part
+ else:
+ # store fade section to be picked up for next track
+ last_fadeout_data = last_part[-fade_bytes:]
+ remaining_bytes = last_part[:-fade_bytes]
+ # write remaining bytes
+ sox_proc.stdin.write(remaining_bytes)
+ bytes_written += len(remaining_bytes)
+ await sox_proc.stdin.drain()
+ del last_part
+ del remaining_bytes
else:
# middle part of the track
# keep previous chunk in memory so we have enough samples to perform the crossfade
# wait for the queue to consume the data
# this prevents that the entire track is sitting in memory
# and it helps a bit in the quest to follow where we are in the queue
- while buffer.qsize() > 2 and not cancelled.is_set():
+ while buffer.qsize() > 1 and not cancelled.is_set():
await asyncio.sleep(1)
# end of the track reached
# WIP: update actual duration to the queue for more accurate now playing info
await sox_proc.wait()
LOGGER.info("streaming of queue for player %s completed" % player_id)
# Reviewed diff hunk: signature gains a `cancelled` event (presumably a
# threading/asyncio Event set by the consumer -- TODO confirm) and replaces the
# free-form outputfmt/sox_effects kwargs with a single `resample` rate.
- async def __get_audio_stream(self, track_id, provider, player_id,
-         chunksize=512000, outputfmt='flac -C 0', sox_effects=''):
+ async def __get_audio_stream(self, track_id, provider, player_id, cancelled,
+             chunksize=512000, resample=None):
''' get audio stream from provider and apply additional effects/processing where/if needed'''
# optional EBU R128 volume normalisation, expressed as a sox 'vol' effect
if self.mass.config['base']['http_streamer']['volume_normalisation']:
gain_correct = await self.__get_track_gain_correct(track_id, provider)
gain_correct = 'vol %s dB ' % gain_correct
else:
gain_correct = ''
# NOTE(review): '+=' changed to '=' -- callers can no longer pass in extra
# sox effects; the per-player options now fully define the effect chain.
- sox_effects += await self.__get_player_sox_options(track_id, provider, player_id, False)
+ sox_effects = await self.__get_player_sox_options(track_id, provider, player_id, False)
+ outputfmt = 'flac -C 0'
+ if resample:
# raw 32-bit signed stereo PCM when a resample rate is requested
+ outputfmt = 'raw -b 32 -c 2 -e signed-integer'
+ sox_effects += ' rate -v %s' % resample
# stream audio from provider
# blocks this (bg-executor) thread until the coroutine on the main event
# loop resolves the stream details
streamdetails = asyncio.run_coroutine_threadsafe(
self.mass.music.providers[provider].get_stream_details(track_id), self.mass.event_loop).result()
# NOTE(review): `args` (the sox command line built from streamdetails,
# gain_correct, outputfmt and sox_effects) is constructed in lines omitted
# from this hunk -- verify against the full file.
LOGGER.debug("Running sox with args: %s" % args)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE)
+ # fire event that streaming has started for this track (needed by some streaming providers)
+ streamdetails["provider"] = provider
+ streamdetails["track_id"] = track_id
+ streamdetails["player_id"] = player_id
+ self.mass.event_loop.create_task(self.mass.event('streaming_started', streamdetails))
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
prev_chunk = b''
+ bytes_sent = 0
while not process.stdout.at_eof():
try:
chunk = await process.stdout.readexactly(chunksize)
except asyncio.streams.IncompleteReadError:
# short read at end of stream: take whatever is left
chunk = await process.stdout.read(chunksize)
- if prev_chunk:
+ if not chunk:
+ break
# suppress output once the consumer cancelled, but keep draining sox
+ if prev_chunk and not cancelled.is_set():
yield (False, prev_chunk)
+ bytes_sent += len(prev_chunk)
prev_chunk = chunk
# yield last chunk
- yield (True, prev_chunk)
+ if not cancelled.is_set():
+ yield (True, prev_chunk)
+ bytes_sent += len(prev_chunk)
await process.wait()
- LOGGER.info("__get_audio_stream for track_id %s completed" % track_id)
+ if cancelled.is_set():
+ LOGGER.warning("__get_audio_stream for track_id %s interrupted" % track_id)
+ else:
+ LOGGER.info("__get_audio_stream for track_id %s completed" % track_id)
+ # fire event that streaming has ended for this track (needed by some streaming providers)
# derive seconds played from bytes sent: rate * bytes-per-sample * 2 channels
# (32/8 = 4 bytes matches the 'raw -b 32 -c 2' output format above)
+ if resample:
+ bytes_per_second = resample * (32/8) * 2
+ else:
# NOTE(review): flac output is compressed, so bytes_sent/bytes_per_second
# underestimates the played duration in the non-resample path -- confirm
+ bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2
+ seconds_streamed = int(bytes_sent/bytes_per_second)
+ streamdetails["seconds"] = seconds_streamed
+ self.mass.event_loop.create_task(self.mass.event('streaming_ended', streamdetails))
# send task to background to analyse the audio
self.mass.event_loop.create_task(self.__analyze_audio(track_id, provider))
''' analyze track audio, for now we only calculate EBU R128 loudness '''
track_key = '%s%s' %(track_id, provider)
if track_key in self.analyze_jobs:
- return # prevent multiple analyze jobs for same tracks
+ return # prevent multiple analyze jobs for same track
self.analyze_jobs[track_key] = True
streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id)
track_loudness = await self.mass.db.get_track_loudness(track_id, provider)
def __init__(self, mass, username, password):
    ''' Qobuz music provider: set up state, the shared http session and the
        mass event listener used for playback reporting. '''
    self.name = 'Qobuz'
    self.prov_id = 'qobuz'
    self.mass = mass
    self.cache = mass.cache
    # NOTE(review): verify_ssl=False disables certificate validation for all
    # qobuz api traffic -- confirm this is intentional
    self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
    self.__username = username
    self.__password = password
    # full user/login response; populated on first successful auth
    self.__user_auth_info = None
    self.__app_id = "285473059" # TEMP! Own key requested
    self.__app_secret = "47249d0eaefa6bf43a959c09aacdbce8" # TEMP! Own key requested
    self.__logged_in = False
    # rate-limit api calls to ~1 request per second
    self.throttler = Throttler(rate_limit=1, period=1)
    # subscribe to mass events so stream start/stop can be reported to qobuz
    mass.event_loop.create_task(mass.add_event_listener(self.mass_event))
# NOTE(review): `media_types=List[MediaType]` sets the DEFAULT VALUE of the
# parameter to the typing construct itself rather than annotating it; it was
# probably meant to be `media_types: List[MediaType] = None` (or a list of all
# media types). Confirm against callers before changing -- body not visible here.
async def search(self, searchstring, media_types=List[MediaType], limit=5):
''' perform search on the provider '''
"path": streamdetails['url'],
"content_type": streamdetails['mime_type'].split('/')[1],
"sample_rate": int(streamdetails['sampling_rate']*1000),
- "bit_depth": streamdetails['bit_depth']
+ "bit_depth": streamdetails['bit_depth'],
+ "details": streamdetails # we need these details for reporting playback
}
async def mass_event(self, msg, msg_details):
    ''' Receive an event from mass and report playback to the Qobuz API.

        Qobuz requires clients to report streaming start/stop so plays are
        counted correctly.

        :param msg: event name ('streaming_started' / 'streaming_ended'
                    are handled, everything else is ignored)
        :param msg_details: dict with at least 'provider', 'track_id' and
                    'details' (the streamdetails, incl. format_id); the
                    ended event also carries 'seconds' played.
    '''
    if msg not in ("streaming_started", "streaming_ended"):
        return
    if msg_details.get('provider') != self.prov_id:
        return  # event belongs to another provider
    if not self.__user_auth_info:
        # events may fire before the first login completed; nothing to report
        LOGGER.warning("can't report streaming event for track %s: not logged in" % msg_details.get("track_id"))
        return
    # both report calls share the same event payload
    user = self.__user_auth_info["user"]
    event = {
        "online": True,
        "sample": False,
        "intent": "stream",
        "device_id": user["device"]["id"],
        "track_id": msg_details["track_id"],
        "purchase": False,
        "date": int(time.time()),
        "credential_id": user["credential"]["id"],
        "user_id": user["id"],
        "local": False,
        "format_id": msg_details["details"]["format_id"],
    }
    if msg == "streaming_started":
        LOGGER.debug("streaming_started %s" % msg_details["track_id"])
        await self.__post_data("track/reportStreamingStart", data=[event])
    else:
        LOGGER.debug("streaming_ended %s - seconds played: %s" %(msg_details["track_id"], msg_details["seconds"]) )
        event["duration"] = msg_details["seconds"]
        # BUGFIX: the end event must go to reportStreamingEnd; the original
        # posted it to reportStreamingStart, double-counting the play
        await self.__post_data("track/reportStreamingEnd", data=[event])
+
async def __parse_artist(self, artist_obj):
''' parse qobuz artist object to generic layout '''
# (docstring previously said "spotify" -- a copy/paste from the spotify
# provider; this module is the qobuz provider, see prov_id above)
artist = Artist()
})
playlist.name = playlist_obj['name']
playlist.owner = playlist_obj['owner']['name']
- playlist.is_editable = playlist_obj['owner']['id'] == self._cur_user["id"] or playlist_obj['is_collaborative']
+ playlist.is_editable = playlist_obj['owner']['id'] == self.__user_auth_info["user"]["id"] or playlist_obj['is_collaborative']
if playlist_obj.get('images300'):
playlist.metadata["image"] = playlist_obj['images300'][0]
if playlist_obj.get('url'):
async def __auth_token(self):
    ''' Login to qobuz (if not done yet) and return the user auth token.

        The full user/login response is cached in self.__user_auth_info so
        the device/credential ids are available for playback reporting.

        :returns: the user_auth_token string
        :raises RuntimeError: when the login request fails
    '''
    if self.__user_auth_info:
        # already logged in: reuse the cached token
        return self.__user_auth_info["user_auth_token"]
    params = {
        "username": self.__username,
        "password": self.__password,
        "device_manufacturer_id": "music_assistant",
    }
    details = await self.__get_data("user/login", params, ignore_cache=True)
    if not details or "user_auth_token" not in details:
        # __get_data returns None on api errors; fail loudly here instead of
        # crashing later with a confusing TypeError on the None result
        raise RuntimeError("Login to Qobuz failed, check username/password")
    self.__user_auth_info = details
    LOGGER.info("Successfully logged in to Qobuz as %s" % (details["user"]["display_name"]))
    return details["user_auth_token"]
async def __get_all_items(self, endpoint, params={}, key="playlists", limit=0, offset=0, cache_checksum=None):
params["request_ts"] = request_ts
params["request_sig"] = request_sig
params["app_id"] = self.__app_id
- params["user_auth_token"] = self.__user_auth_token
+ params["user_auth_token"] = await self.__auth_token()
async with self.throttler:
async with self.http_session.get(url, headers=headers, params=params) as response:
result = await response.json()
result = None
return result
async def __post_data(self, endpoint, params=None, data=None):
    ''' Post data to the qobuz api.

        :param endpoint: api endpoint path, e.g. 'track/reportStreamingStart'
        :param params: optional dict of query string parameters (a fresh dict
                       is created when omitted -- backward compatible with the
                       old `params={}` default)
        :param data: optional JSON body
        :returns: the decoded JSON response, or None on error
    '''
    # BUGFIX: the defaults were mutable dicts ({}), which Python shares
    # across calls; since `params` is mutated below, state leaked between
    # requests. Use None sentinels and build fresh containers per call.
    if params is None:
        params = {}
    if data is None:
        data = {}
    url = "http://www.qobuz.com/api.json/0.2/%s" % endpoint
    params["app_id"] = self.__app_id
    params["user_auth_token"] = await self.__auth_token()
    async with self.http_session.post(url, params=params, json=data) as response:
        result = await response.json()
        if not result or 'error' in result:
            # log full request context to aid debugging, then signal failure
            LOGGER.error(url)
            LOGGER.error(params)
            LOGGER.error(result)
            result = None
        return result
\ No newline at end of file