From: marcelveldt Date: Thu, 24 Oct 2019 22:17:33 +0000 (+0200) Subject: stability and performance fixes X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=ce8835bf046b462d888d0336ee8dc4cce90c2dbb;p=music-assistant-server.git stability and performance fixes --- diff --git a/mass.py b/mass.py index 76472fb6..cd7dd1ed 100755 --- a/mass.py +++ b/mass.py @@ -85,7 +85,7 @@ if __name__ == "__main__": event_loop.set_debug(True) logger.setLevel(logging.DEBUG) logging.getLogger('aiosqlite').setLevel(logging.INFO) - logging.getLogger('asyncio').setLevel(logging.INFO) + logging.getLogger('asyncio').setLevel(logging.WARNING) else: logger.setLevel(logging.INFO) # start music assistant! diff --git a/music_assistant/__init__.py b/music_assistant/__init__.py index 559c74bb..f1bf784c 100644 --- a/music_assistant/__init__.py +++ b/music_assistant/__init__.py @@ -11,6 +11,7 @@ import uuid import json import time import logging +import threading from .database import Database from .config import MassConfig @@ -60,14 +61,13 @@ class MusicAssistant(): def handle_exception(self, loop, context): ''' global exception handler ''' + LOGGER.debug(f"Caught exception: {context}") loop.default_exception_handler(context) - #LOGGER.exception(f"Caught exception: {context}") async def signal_event(self, msg, msg_details:dict): ''' signal (systemwide) event ''' - if not (msg_details == None or isinstance(msg_details, (str, int, dict))): + if not (msg_details == None or isinstance(msg_details, (str, dict))): msg_details = serialize_values(msg_details) - LOGGER.debug("Event: %s" %(msg)) listeners = list(self.event_listeners.values()) for callback, eventfilter in listeners: if not eventfilter or eventfilter in msg: @@ -81,4 +81,22 @@ class MusicAssistant(): async def remove_event_listener(self, cb_id): ''' remove callback from our event listeners ''' - self.event_listeners.pop(cb_id, None) \ No newline at end of file + self.event_listeners.pop(cb_id, None) + + def create_task(self, corofcn, wait_for_result=False, ignore_exception=None): + ''' helper to create a new task on the main event loop ''' + if threading.current_thread() is threading.main_thread(): + if wait_for_result: + raise Exception("can not wait for result in main event loop!") + return self.event_loop.create_task(corofcn) + else: + # threadsafe + future = asyncio.run_coroutine_threadsafe(corofcn, self.event_loop) + if wait_for_result: + try: + return future.result() + except Exception as exc: + if ignore_exception and isinstance(exc, ignore_exception): + return None + raise exc + return future diff --git a/music_assistant/cache.py b/music_assistant/cache.py index e894bfdd..d6d92067 100644 --- a/music_assistant/cache.py +++ b/music_assistant/cache.py @@ -15,6 +15,7 @@ from .utils import run_periodic, LOGGER, parse_track_title class Cache(object): '''basic stateless caching system ''' + # TODO: convert to aiosql _database = None def __init__(self, datapath): @@ -27,7 +28,13 @@ class Cache(object): ''' async initialize of cache module ''' asyncio.create_task(self._do_cleanup()) - async def get(self, endpoint, checksum=""): + async def get_async(self, endpoint, checksum=""): + return await asyncio.get_running_loop().run_in_executor(None, self.get, endpoint, checksum) + + async def set_async(self, endpoint, data, checksum="", expiration=datetime.timedelta(days=14)): + return await asyncio.get_running_loop().run_in_executor(None, self.set, endpoint, data, checksum, expiration) + + def get(self, endpoint, checksum=""): ''' get object from cache and return the results endpoint: the (unique) name of the cache object as reference @@ -44,7 +51,7 @@ class Cache(object): result = eval(cache_data[1]) return result - async def set(self, endpoint, data, checksum="", expiration=datetime.timedelta(days=14)): + def set(self, endpoint, data, checksum="", expiration=datetime.timedelta(days=14)): ''' set data in cache ''' @@ -55,6 +62,10 @@ class Cache(object): self._execute_sql(query, (endpoint, expires, data, checksum)) @run_periodic(3600) + async def auto_cleanup(self): + ''' scheduled auto cleanup task ''' + asyncio.get_running_loop().run_in_executor(None, self._do_cleanup) + async def _do_cleanup(self): '''perform cleanup task''' cur_time = datetime.datetime.now() @@ -166,12 +177,12 @@ def use_cache(cache_days=14, cache_hours=8): else: cache_str += ".%s%s" %(key,value) cache_str = cache_str.lower() - cachedata = await method_class.cache.get(cache_str, checksum=cache_checksum) + cachedata = await method_class.cache.get_async(cache_str, checksum=cache_checksum) if cachedata is not None: return cachedata else: result = await func(*args, **kwargs) - await method_class.cache.set(cache_str, result, checksum=cache_checksum, expiration=datetime.timedelta(days=cache_days, hours=cache_hours)) + await method_class.cache.set_async(cache_str, result, checksum=cache_checksum, expiration=datetime.timedelta(days=cache_days, hours=cache_hours)) return result return wrapped return wrapper diff --git a/music_assistant/homeassistant.py b/music_assistant/homeassistant.py index 6434e875..6c6cdc82 100644 --- a/music_assistant/homeassistant.py +++ b/music_assistant/homeassistant.py @@ -81,10 +81,10 @@ class HomeAssistant(): return self.http_session = aiohttp.ClientSession( loop=self.mass.event_loop, connector=aiohttp.TCPConnector()) - self.mass.event_loop.create_task(self.__hass_websocket()) + self.mass.create_task(self.__hass_websocket()) await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_CHANGED) await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_ADDED) - self.mass.event_loop.create_task(self.__get_sources()) + self.mass.create_task(self.__get_sources()) async def get_state_async(self, entity_id, attribute='state'): ''' get state of a hass entity (async)''' @@ -105,7 +105,7 @@ class HomeAssistant(): else: return state_obj else: - self.mass.event_loop.create_task(self.__request_state(entity_id)) + self.mass.create_task(self.__request_state(entity_id)) return None async def __request_state(self, entity_id): @@ -113,7 +113,7 @@ class HomeAssistant(): state_obj = await self.__get_data('states/%s' % entity_id) if 'state' in state_obj: self._tracked_entities[entity_id] = state_obj - self.mass.event_loop.create_task( + self.mass.create_task( self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, state_obj)) async def mass_event(self, msg, msg_details): @@ -126,7 +126,7 @@ class HomeAssistant(): if event_type == 'state_changed': if event_data['entity_id'] in self._tracked_entities: self._tracked_entities[event_data['entity_id']] = event_data['new_state'] - self.mass.event_loop.create_task( + self.mass.create_task( self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, event_data)) elif event_type == 'call_service' and event_data['domain'] == 'media_player': await self.__handle_player_command(event_data['service'], event_data['service_data']) @@ -301,6 +301,8 @@ class HomeAssistant(): # LOGGER.info(data) elif msg.type == aiohttp.WSMsgType.ERROR: raise Exception("error in websocket") + except asyncio.CancelledError: + raise asyncio.CancelledError() except Exception as exc: LOGGER.exception(exc) await asyncio.sleep(10) diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index c262ef1e..f32925f1 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -32,80 +32,66 @@ class HTTPStreamer(): async def setup(self): ''' async initialize of module ''' pass - # self.mass.event_loop.create_task( + # self.mass.create_task( # asyncio.start_server(self.sockets_streamer, '0.0.0.0', 8093)) async def stream(self, http_request): ''' start stream for a player ''' - # make sure we have a valid player + # make sure we have valid params player_id = http_request.match_info.get('player_id','') player = await self.mass.players.get_player(player_id) - assert(player) + if not player: + return web.Response(status=404, reason="Player not found") + if not player.queue.use_queue_stream: + queue_item_id = http_request.match_info.get('queue_item_id') + queue_item = await player.queue.by_item_id(queue_item_id) + if not queue_item: + return web.Response(status=404, reason="Invalid Queue item Id") # prepare headers as audio/flac content resp = web.StreamResponse(status=200, reason='OK', - headers={ - 'Content-Type': 'audio/flac' - }) + headers={'Content-Type': 'audio/flac'}) await resp.prepare(http_request) - # send content only on GET request - if http_request.method.upper() != 'GET': - return resp - # stream audio + # run the streamer in executor to prevent the subprocess locking up our eventloop cancelled = threading.Event() if player.queue.use_queue_stream: - # use queue stream - bg_task = run_async_background_task( - None, - self.__stream_queue, player, resp, cancelled) + bg_task = self.mass.event_loop.run_in_executor(None, + self.__get_queue_stream, player, resp, cancelled) else: - # single track stream - queue_item_id = http_request.match_info.get('queue_item_id') - queue_item = await player.queue.by_item_id(queue_item_id) - assert(queue_item) - bg_task = run_async_background_task( - None, - self.__stream_single, player, queue_item, resp, cancelled) + bg_task = self.mass.event_loop.run_in_executor(None, + self.__get_queue_item_stream, player, queue_item, resp, cancelled) # let the streaming begin! try: await asyncio.gather(bg_task) except (asyncio.CancelledError, asyncio.TimeoutError): - LOGGER.debug("stream request cancelled") cancelled.set() - # wait for bg_task to finish - await asyncio.gather(bg_task) raise asyncio.CancelledError() return resp - async def __stream_single(self, player, queue_item, buffer, cancelled): + def __get_queue_item_stream(self, player, queue_item, buffer, cancelled): ''' start streaming single queue track ''' LOGGER.debug("stream single queue track started for track %s on player %s" % (queue_item.name, player.name)) - audio_stream = self.__get_audio_stream(player, queue_item, cancelled) - async for is_last_chunk, audio_chunk in audio_stream: + for is_last_chunk, audio_chunk in self.__get_audio_stream(player, queue_item, cancelled): if cancelled.is_set(): # http session ended # we must consume the data to prevent hanging subprocess instances continue # put chunk in buffer - asyncio.run_coroutine_threadsafe( - buffer.write(audio_chunk), - self.mass.event_loop) - # wait for the queue to consume the data - if not cancelled.is_set(): - await asyncio.sleep(0.5) + self.mass.create_task( + buffer.write(audio_chunk), wait_for_result=True, + ignore_exception=BrokenPipeError) # all chunks received: streaming finished - gc.collect() if cancelled.is_set(): LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name)) else: # indicate EOF if no more data - asyncio.run_coroutine_threadsafe( - buffer.write_eof(), - self.mass.event_loop) + self.mass.create_task( + buffer.write_eof(), wait_for_result=True, + ignore_exception=BrokenPipeError) LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name)) - async def __stream_queue(self, player, buffer, cancelled): + def __get_queue_stream(self, player, buffer, cancelled): ''' start streaming all queue tracks ''' sample_rate = try_parse_int(player.settings['max_sample_rate']) fade_length = try_parse_int(player.settings["crossfade_duration"]) @@ -119,8 +105,6 @@ class HTTPStreamer(): pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate args = 'sox -t %s - -t flac -C 0 -' % pcm_args # start sox process - # we use normal subprocess instead of asyncio because of bug with executor - # this should be fixed with python 3.8 sox_proc = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) @@ -131,14 +115,13 @@ class HTTPStreamer(): if not chunk: break if chunk and not cancelled.is_set(): - asyncio.run_coroutine_threadsafe( - buffer.write(chunk), self.mass.event_loop) + self.mass.create_task(buffer.write(chunk), + wait_for_result=True, ignore_exception=BrokenPipeError) del chunk # indicate EOF if no more data if not cancelled.is_set(): - asyncio.run_coroutine_threadsafe( - buffer.write_eof(), self.mass.event_loop) - LOGGER.debug("stream queue player %s: fill buffer completed" % player.name) + self.mass.create_task(buffer.write_eof(), + wait_for_result=True, ignore_exception=BrokenPipeError) # start fill buffer task in background fill_buffer_thread = threading.Thread(target=fill_buffer) fill_buffer_thread.start() @@ -167,7 +150,7 @@ class HTTPStreamer(): prev_chunk = None bytes_written = 0 # handle incoming audio chunks - async for is_last_chunk, chunk in self.__get_audio_stream( + for is_last_chunk, chunk in self.__get_audio_stream( player, queue_track, cancelled, chunksize=fade_bytes, resample=sample_rate): cur_chunk += 1 @@ -188,6 +171,12 @@ class HTTPStreamer(): first_part, std_err = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE).communicate(prev_chunk + chunk) + if len(first_part) < fade_bytes: + # part is too short after the strip action?! + # so we just cut off at the fade position + first_part = prev_chunk+chunk + if len(first_part) >= fade_bytes: + first_part = first_part[fade_bytes:] fade_in_part = first_part[:fade_bytes] remaining_bytes = first_part[fade_bytes:] del first_part @@ -207,12 +196,18 @@ class HTTPStreamer(): prev_chunk = None # needed to prevent this chunk being sent again ### HANDLE LAST PART OF TRACK elif prev_chunk and is_last_chunk: - # last chunk received so create the fadeout_part with the previous chunk and this chunk + # last chunk received so create the last_part with the previous chunk and this chunk # and strip off silence args = 'sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse' % (pcm_args, pcm_args) last_part, stderr = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE).communicate(prev_chunk + chunk) + if len(last_part) < fade_bytes: + # part is too short after the strip action + # so we just cut off at the fade position + last_part = prev_chunk+chunk + if len(last_part) >= fade_bytes: + last_part = last_part[:fade_bytes] if not player.queue.crossfade_enabled: # crossfading is not enabled so just pass the (stripped) audio data sox_proc.stdin.write(last_part) @@ -221,25 +216,15 @@ class HTTPStreamer(): del chunk else: # handle crossfading support - if len(last_part) < fade_bytes: - # not enough data for crossfade duration after the strip action... - last_part = prev_chunk + chunk - if len(last_part) < fade_bytes: - # still not enough data so we'll skip the crossfading - LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part)) - sox_proc.stdin.write(last_part) - bytes_written += len(last_part) - del last_part - else: - # store fade section to be picked up for next track - last_fadeout_data = last_part[-fade_bytes:] - remaining_bytes = last_part[:-fade_bytes] - # write remaining bytes - sox_proc.stdin.write(remaining_bytes) - bytes_written += len(remaining_bytes) - del last_part - del remaining_bytes - del chunk + # store fade section to be picked up for next track + last_fadeout_data = last_part[-fade_bytes:] + remaining_bytes = last_part[:-fade_bytes] + # write remaining bytes + sox_proc.stdin.write(remaining_bytes) + bytes_written += len(remaining_bytes) + del last_part + del remaining_bytes + del chunk ### MIDDLE PARTS OF TRACK else: # middle part of the track @@ -251,9 +236,6 @@ class HTTPStreamer(): else: prev_chunk = chunk del chunk - ### throttle to prevent entire track sitting in memory - if not cancelled.is_set(): - await asyncio.sleep(fade_length) # end of the track reached if cancelled.is_set(): # break out the loop if the http session is cancelled @@ -263,7 +245,6 @@ class HTTPStreamer(): accurate_duration = bytes_written / int(sample_rate * 4 * 2) queue_track.duration = accurate_duration LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) - LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration)) # run garbage collect manually to avoid too much memory fragmentation gc.collect() # end of queue reached, pass last fadeout bits to final output @@ -277,18 +258,21 @@ class HTTPStreamer(): del sox_proc # run garbage collect manually to avoid too much memory fragmentation gc.collect() - LOGGER.info("streaming of queue for player %s completed" % player.name) + if cancelled.is_set(): + LOGGER.info("streaming of queue for player %s interrupted" % player.name) + else: + LOGGER.info("streaming of queue for player %s completed" % player.name) - async def __get_audio_stream(self, player, queue_item, cancelled, + def __get_audio_stream(self, player, queue_item, cancelled, chunksize=128000, resample=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' # get stream details from provider # sort by quality and check track availability for prov_media in sorted(queue_item.provider_ids, key=operator.itemgetter('quality'), reverse=True): - streamdetails = asyncio.run_coroutine_threadsafe( + streamdetails = self.mass.create_task( self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']), - self.mass.event_loop).result() + wait_for_result=True) if streamdetails: streamdetails['player_id'] = player.player_id if not 'item_id' in streamdetails: @@ -304,7 +288,7 @@ class HTTPStreamer(): yield (True, b'') return # get sox effects and resample options - sox_options = await self.__get_player_sox_options(player, streamdetails) + sox_options = self.__get_player_sox_options(player, streamdetails) outputfmt = 'flac -C 0' if resample: outputfmt = 'raw -b 32 -c 2 -e signed-integer' @@ -321,12 +305,10 @@ class HTTPStreamer(): args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, sox_options) # start sox process - # we use normal subprocess instead of asyncio because of bug with executor - # this should be fixed with python 3.8 process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE) # fire event that streaming has started for this track - asyncio.run_coroutine_threadsafe( - self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails), self.mass.event_loop) + self.mass.create_task( + self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails)) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly bytes_sent = 0 @@ -339,32 +321,26 @@ class HTTPStreamer(): chunk = process.stdout.read(chunksize) if len(chunk) < chunksize: # last chunk - LOGGER.debug("last chunk received") bytes_sent += len(chunk) yield (True, chunk) break else: bytes_sent += len(chunk) yield (False, chunk) - # fire event that streaming has ended - asyncio.run_coroutine_threadsafe( - self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop) - # send task to main event loop to analyse the audio + self.mass.create_task(self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)) + # send task to background to analyse the audio if queue_item.media_type == MediaType.Track: - self.mass.event_loop.call_soon_threadsafe( - asyncio.ensure_future, self.__analyze_audio(streamdetails)) - # run garbage collect manually to avoid too much memory fragmentation - gc.collect() + self.mass.event_loop.run_in_executor(None, self.__analyze_audio, streamdetails) - async def __get_player_sox_options(self, player, streamdetails): + def __get_player_sox_options(self, player, streamdetails): ''' get player specific sox effect options ''' sox_options = [] # volume normalisation - gain_correct = asyncio.run_coroutine_threadsafe( + gain_correct = self.mass.create_task( self.mass.players.get_gain_correct( player.player_id, streamdetails["item_id"], streamdetails["provider"]), - self.mass.event_loop).result() + wait_for_result=True) if gain_correct != 0: sox_options.append('vol %s dB ' % gain_correct) # downsample if needed @@ -382,21 +358,20 @@ class HTTPStreamer(): sox_options.append(player.settings['sox_options']) return " ".join(sox_options) - async def __analyze_audio(self, streamdetails): + def __analyze_audio(self, streamdetails): ''' analyze track audio, for now we only calculate EBU R128 loudness ''' item_key = '%s%s' %(streamdetails["item_id"], streamdetails["provider"]) if item_key in self.analyze_jobs: return # prevent multiple analyze jobs for same track self.analyze_jobs[item_key] = True - track_loudness = await self.mass.db.get_track_loudness( - streamdetails["item_id"], streamdetails["provider"]) + track_loudness = self.mass.create_task(self.mass.db.get_track_loudness( + streamdetails["item_id"], streamdetails["provider"]), wait_for_result=True) if track_loudness == None: # only when needed we do the analyze stuff LOGGER.debug('Start analyzing track %s' % item_key) if streamdetails['type'] == 'url': - async with aiohttp.ClientSession() as session: - async with session.get(streamdetails["path"], verify_ssl=False) as resp: - audio_data = await resp.read() + import urllib + audio_data = urllib.request.urlopen(streamdetails["path"]).read() elif streamdetails['type'] == 'executable': audio_data = subprocess.check_output(streamdetails["path"], shell=True) # calculate BS.1770 R128 integrated loudness @@ -405,7 +380,8 @@ class HTTPStreamer(): meter = pyloudnorm.Meter(rate) # create BS.1770 meter loudness = meter.integrated_loudness(data) # measure loudness del data - await self.mass.db.set_track_loudness(streamdetails["item_id"], streamdetails["provider"], loudness) + self.mass.create_task( + self.mass.db.set_track_loudness(streamdetails["item_id"], streamdetails["provider"], loudness)) del audio_data LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness)) self.analyze_jobs.pop(item_key, None) @@ -429,62 +405,8 @@ class HTTPStreamer(): process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) crossfade_part, stderr = process.communicate() - LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part)) fadeinfile.close() fadeoutfile.close() del fadeinfile del fadeoutfile return crossfade_part - - async def start_stream(self, clients_needed): - # wait for clients - print("wait for clients...") - track = asyncio.run_coroutine_threadsafe( - self.mass.music.track('2741', provider='database'), - self.mass.event_loop).result() - player_id = '1523403a-4cc4-f151-29d1-758822807128' - player = self.mass.players._players[player_id] - cancelled = threading.Event() - # wait for clients - while len(self.stream_clients) < clients_needed: - await asyncio.sleep(0.1) - # start streaming - while self.stream_clients: - audio_stream = self.__get_audio_stream(player, track, cancelled) - async for is_last_chunk, audio_chunk in audio_stream: - for client in self.stream_clients: - try: - client.write(audio_chunk) - await client.drain() - except ConnectionResetError: - print('client disconnected') - client.close() - self.stream_clients.remove(client) - await asyncio.sleep(1) - print("all clients disconnected") - return - - async def add_client(self, client_writer, client_msg): - print("new client connected!") - for line in client_msg.decode().split('\r\n'): - print(line) - msg = 'HTTP/1.0 200 OK\r\n' - msg += "Content-Type: audio/flac\r\n" - msg += "Transfer-Encoding: chunked\r\n\r\n" - client_writer.write(msg.encode()) - await client_writer.drain() - self.stream_clients.append(client_writer) - if len(self.stream_clients) == 1: - bg_task = run_async_background_task( - None, - self.start_stream, 2) - - async def sockets_streamer(self, reader, writer): - while True: - request = await reader.read(1024) - if request: - await self.add_client(writer, request) - else: - print("client lost") - break - diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 29806f31..9ee63171 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -102,7 +102,7 @@ class Player(): #### Common implementation, should NOT be overrridden ##### - def __init__(self, mass, player_id, prov_id, is_group=False): + def __init__(self, mass, player_id, prov_id): # private attributes self.mass = mass self._player_id = player_id # unique id for this player @@ -110,7 +110,6 @@ class Player(): self._name = '' self._state = PlayerState.Stopped self._group_childs = [] - self._last_group_parent = None self._powered = False self._cur_time = 0 self._media_position_updated_at = 0 @@ -118,20 +117,13 @@ class Player(): self._volume_level = 0 self._muted = False self._queue = PlayerQueue(mass, self) - self._player_settings = None + self.__update_player_settings() self._initialized = False - self._last_event = 0 - self._update_cur_time_task = None # public attributes self.supports_queue = True # has native support for a queue self.supports_gapless = False # has native gapless support self.supports_crossfade = False # has native crossfading support - - def __del__(self): - if self._update_cur_time_task: - self._update_cur_time_task.cancel() - @property def player_id(self): ''' [PROTECTED] player_id of this player ''' @@ -155,7 +147,7 @@ class Player(): ''' [PROTECTED] set (real) name of this player ''' if name != self._name: self._name = name - self.mass.event_loop.create_task(self.update()) + self.mass.create_task(self.update()) @property def is_group(self): @@ -185,25 +177,25 @@ class Player(): ''' [PROTECTED] set group_childs property of this player ''' if group_childs != self._group_childs: self._group_childs = group_childs - self.mass.event_loop.create_task(self.update()) + self.mass.create_task(self.update()) for child_player_id in group_childs: - self.mass.event_loop.create_task( + self.mass.create_task( self.mass.players.trigger_update(child_player_id)) def add_group_child(self, child_player_id): ''' add player as child to this group player ''' if not child_player_id in self._group_childs: self._group_childs.append(child_player_id) - self.mass.event_loop.create_task(self.update()) - self.mass.event_loop.create_task( + self.mass.create_task(self.update()) + self.mass.create_task( self.mass.players.trigger_update(child_player_id)) def remove_group_child(self, child_player_id): ''' remove player as child from this group player ''' if child_player_id in self._group_childs: self._group_childs.remove(child_player_id) - self.mass.event_loop.create_task(self.update()) - self.mass.event_loop.create_task( + self.mass.create_task(self.update()) + self.mass.create_task( self.mass.players.trigger_update(child_player_id)) @property @@ -215,7 +207,6 @@ class Player(): for group_parent_id in self.group_parents: group_player = self.mass.players.get_player_sync(group_parent_id) if group_player and group_player.state != PlayerState.Off: - self._last_group_parent = group_parent_id return group_player.state return self._state @@ -224,7 +215,7 @@ class Player(): ''' [PROTECTED] set state property of this player ''' if state != self._state: self._state = state - self.mass.event_loop.create_task(self.update(update_queue=True)) + self.mass.create_task(self.update(update_queue=True)) @property def powered(self): @@ -251,7 +242,7 @@ class Player(): ''' [PROTECTED] set (real) power state for this player ''' if powered != self._powered: self._powered = powered - self.mass.event_loop.create_task(self.update()) + self.mass.create_task(self.update()) @property def cur_time(self): @@ -271,7 +262,7 @@ class Player(): if cur_time != self._cur_time: self._cur_time = cur_time self._media_position_updated_at = time.time() - self.mass.event_loop.create_task(self.update(update_queue=True)) + self.mass.create_task(self.update(update_queue=True)) @property def media_position_updated_at(self): @@ -293,7 +284,7 @@ class Player(): ''' [PROTECTED] set cur_uri (uri loaded in player) property of this player ''' if cur_uri != self._cur_uri: self._cur_uri = cur_uri - self.mass.event_loop.create_task(self.update(update_queue=True)) + self.mass.create_task(self.update(update_queue=True)) @property def volume_level(self): @@ -325,10 +316,10 @@ class Player(): volume_level = try_parse_int(volume_level) if volume_level != self._volume_level: self._volume_level = volume_level - self.mass.event_loop.create_task(self.update()) + self.mass.create_task(self.update()) # trigger update on group player for group_parent_id in self.group_parents: - self.mass.event_loop.create_task( + self.mass.create_task( self.mass.players.trigger_update(group_parent_id)) @property @@ -342,7 +333,7 @@ class Player(): is_muted = try_parse_bool(is_muted) if is_muted != self._muted: self._muted = is_muted - self.mass.event_loop.create_task(self.update()) + self.mass.create_task(self.update()) @property def enabled(self): @@ -444,13 +435,18 @@ class Player(): domain = self.settings['hass_power_entity'].split('.')[0] service_data = { 'entity_id': self.settings['hass_power_entity']} await self.mass.hass.call_service(domain, 'turn_on', service_data) - # power on group parent if needed - last_group_player = await self.mass.players.get_player(self._last_group_parent) - if last_group_player: - await last_group_player.power_on() # handle play on power on - elif self.settings.get('play_power_on'): - await self.play() + if self.settings.get('play_power_on'): + # play player's own queue if it has items + if self._queue.items: + await self.play() + # fallback to the first group parent with items + else: + for group_parent_id in self.group_parents: + group_player = await self.mass.players.get_player(group_parent_id) + if group_player and group_player.queue.items: + await group_player.play() + break async def power_off(self): ''' [PROTECTED] send power OFF command to player ''' @@ -551,34 +547,24 @@ class Player(): async def update(self, update_queue=False): ''' [PROTECTED] signal player updated ''' - self.get_player_settings() if not self._initialized: return # update queue state if player state changes if update_queue: await self.queue.update() await self.mass.signal_event(EVENT_PLAYER_CHANGED, self.to_dict()) - if self._state == PlayerState.Playing and not self._update_cur_time_task and (time.time() - self._media_position_updated_at > 2): - self._update_cur_time_task = self.mass.event_loop.create_task(self.__update_cur_time()) - - async def __update_cur_time(self): - ''' background task that keeps updating the current time ''' - while self._state == PlayerState.Playing: - calc_time = self._cur_time + (time.time() - self._media_position_updated_at) - self.cur_time = calc_time - await asyncio.sleep(1) - self._update_cur_time_task = None @property def settings(self): - ''' [PROTECTED] get (or create) player config settings ''' - if self._player_settings: - return self._player_settings + ''' [PROTECTED] get player config settings ''' + if self.player_id in self.mass.config['player_settings']: + return self.mass.config['player_settings'][self.player_id] else: - return self.get_player_settings() + self.__update_player_settings() + return self.mass.config['player_settings'][self.player_id] - def get_player_settings(self): - ''' [PROTECTED] get (or create) player config settings ''' + def __update_player_settings(self): + ''' [PROTECTED] update player config settings ''' player_settings = self.mass.config['player_settings'].get(self.player_id,{}) # generate config for the player config_entries = [ # default config entries for a player @@ -608,8 +594,6 @@ class Player(): player_settings[key] = def_value self.mass.config['player_settings'][self.player_id] = player_settings self.mass.config['player_settings'][self.player_id]['__desc__'] = config_entries - self._player_settings = self.mass.config['player_settings'][self.player_id] - return player_settings def to_dict(self): ''' instance attributes as dict so it can be serialized to json ''' diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 9093f94b..ed90f345 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -48,7 +48,7 @@ class PlayerQueue(): self._save_busy_ = False self._last_track = None # load previous queue settings from disk - self.mass.event_loop.create_task(self.__load_from_file()) + self.mass.event_loop.run_in_executor(None, self.__load_from_file) @property def shuffle_enabled(self): @@ -151,11 +151,11 @@ class PlayerQueue(): # shuffle requested self._shuffle_enabled = True await self.load(self._items) - self.mass.event_loop.create_task(self._player.update()) + self.mass.create_task(self._player.update()) elif self._shuffle_enabled and not enable_shuffle: self._shuffle_enabled = False # TODO: Unshuffle the list ? - self.mass.event_loop.create_task(self._player.update()) + self.mass.create_task(self._player.update()) async def next(self): ''' request next track in queue ''' @@ -220,9 +220,10 @@ class PlayerQueue(): :param queue_items: a list of QueueItem :param offset: offset from current queue position ''' - insert_at_index = self.cur_index + offset - if not self.items or insert_at_index > len(self.items): + + if not self.items or self.cur_index == None or self.cur_index + offset > len(self.items): return await self.load(queue_items) + insert_at_index = self.cur_index + offset if self.shuffle_enabled: queue_items = await self.__shuffle_items(queue_items) self._items = self._items[:insert_at_index] + queue_items + self._items[insert_at_index:] @@ -300,13 +301,13 @@ class PlayerQueue(): # account for track changing state so trigger track change after 1 second if self._last_track and self._last_track.streamdetails: self._last_track.streamdetails["seconds_played"] = self._last_item_time - self.mass.event_loop.create_task( + self.mass.create_task( self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track.streamdetails)) - if new_track: - self.mass.event_loop.create_task( + if new_track and new_track.streamdetails: + self.mass.create_task( self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track.streamdetails)) - self._last_track = new_track - await self.__save_to_file() + self._last_track = new_track + self.mass.event_loop.run_in_executor(None, self.__save_to_file) if self._last_player_state != self._player.state: self._last_player_state = self._player.state if (self._player.cur_time == 0 and @@ -326,7 +327,7 @@ class PlayerQueue(): # can be extended with some more magic last last_played and stuff return random.sample(queue_items, len(queue_items)) - async def __load_from_file(self): + def __load_from_file(self): ''' try to load the saved queue for this player from file ''' player_safe_str = filename_from_string(self._player.player_id) settings_dir = os.path.join(self.mass.datapath, 'queue') @@ -343,7 +344,7 @@ class PlayerQueue(): except Exception as exc: LOGGER.debug("Could not load queue from disk - %s" % str(exc)) - async def __save_to_file(self): + def __save_to_file(self): ''' save current queue settings to file ''' if self._save_busy_: return diff --git a/music_assistant/music_manager.py b/music_assistant/music_manager.py index fc8fa3fd..79a7df00 100755 --- a/music_assistant/music_manager.py +++ b/music_assistant/music_manager.py @@ -27,7 +27,7 @@ class MusicManager(): for prov in self.providers.values(): await prov.setup() # schedule sync task - self.mass.event_loop.create_task(self.sync_music_providers()) + self.mass.create_task(self.sync_music_providers()) async def item(self, item_id, media_type:MediaType, provider='database', lazy=True): ''' get single music item by id and media type''' @@ -254,22 +254,22 @@ class MusicManager(): # actually add the tracks to the playlist on the provider await self.providers[playlist_prov['provider']].add_playlist_tracks(playlist_prov['item_id'], track_ids_to_add) # schedule sync - self.mass.event_loop.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id'])) + self.mass.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id'])) @run_periodic(3600) async def sync_music_providers(self): ''' periodic sync of all music providers ''' if self.sync_running: return - self.sync_running = True for prov_id in self.providers.keys(): - # sync library artists + self.sync_running = prov_id + # sync library items for each provider (if supported) await try_supported(self.sync_library_artists(prov_id)) await try_supported(self.sync_library_albums(prov_id)) await try_supported(self.sync_library_tracks(prov_id)) await try_supported(self.sync_playlists(prov_id)) await try_supported(self.sync_radios(prov_id)) - self.sync_running = False + self.sync_running = None async def sync_library_artists(self, prov_id): ''' sync library artists for given provider''' diff --git a/music_assistant/musicproviders/file.py b/music_assistant/musicproviders/file.py index 2773ef74..10e74071 100644 --- a/music_assistant/musicproviders/file.py +++ b/music_assistant/musicproviders/file.py @@ -248,23 +248,18 @@ class FileProvider(MusicProvider): tracks += await self.get_album_tracks(album.item_id) return tracks[:10] - async def get_stream_content_type(self, track_id): - ''' return the content type for the given track when it will be streamed''' + async def get_stream_details(self, track_id): + ''' return the content details for the given track when it will be streamed''' if not os.sep in track_id: track_id = base64.b64decode(track_id).decode('utf-8') - return track_id.split('.')[-1] - - async def get_audio_stream(self, track_id): - ''' get audio stream for a track ''' - if not os.sep in track_id: - track_id = base64.b64decode(track_id).decode('utf-8') - with open(track_id) as f: - while True: - line = f.readline() - if line: - yield line - else: - break + # TODO: retrieve sanple rate and bitdepth + return { + "type": "file", + "path": track_id, + "content_type": track_id.split('.')[-1], + "sample_rate": 44100, + "bit_depth": 16 + } async def __parse_track(self, filename): ''' try to parse a track from a filename with taglib ''' diff --git a/music_assistant/musicproviders/qobuz.py b/music_assistant/musicproviders/qobuz.py index 587ec2d7..25166cce 100644 --- a/music_assistant/musicproviders/qobuz.py +++ b/music_assistant/musicproviders/qobuz.py @@ -535,13 +535,17 @@ class QobuzProvider(MusicProvider): try: async with self.throttler: async with self.http_session.get(url, headers=headers, params=params, verify_ssl=False) as response: - result = await response.json() - if not result or 'error' in result: - LOGGER.error(url) + try: + result = await response.json() + if "error" in result: + return None + return result + except Exception as exc: + LOGGER.error(exc) + LOGGER.debug(url) LOGGER.debug(params) - LOGGER.debug(result) - return None - return result + result = response + LOGGER.debug(await response.text()) except Exception as exc: LOGGER.exception(exc) return None @@ -554,6 +558,8 @@ class QobuzProvider(MusicProvider): async with self.http_session.post(url, params=params, json=data, verify_ssl=False) as response: try: result = await response.json() + if "error" in result: + return None return result except Exception as exc: LOGGER.error(exc) diff --git a/music_assistant/playerproviders/chromecast.py b/music_assistant/playerproviders/chromecast.py index bee8acdf..e2a01da6 100644 --- a/music_assistant/playerproviders/chromecast.py +++ b/music_assistant/playerproviders/chromecast.py @@ -9,6 +9,7 @@ import pychromecast from pychromecast.controllers.multizone import MultizoneController from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED import types +import time from ..utils import run_periodic, LOGGER, try_parse_int from ..models.playerprovider import PlayerProvider @@ -31,14 +32,14 @@ PLAYER_CONFIG_ENTRIES = [ class ChromecastPlayer(Player): ''' Chromecast player object ''' - + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._poll_task = self.mass.event_loop.create_task(self.__poll_status()) + self.__cc_report_progress_task = None def __del__(self): - if self._poll_task: - self._poll_task.cancel() + if self.__cc_report_progress_task: + self.__cc_report_progress_task.cancel() async def try_chromecast_command(self, cmd:types.MethodType, *args, **kwargs): ''' guard for disconnected socket client ''' @@ -183,22 +184,18 @@ class ChromecastPlayer(Player): else: send_queue() - @run_periodic(10) - async def __poll_status(self): - ''' request actual status from CC ''' - # this is needed to get some accurate media progress info - if self._state == PlayerState.Playing: - await self.try_chromecast_command(self.cc.media_controller.update_status) + async def __report_progress(self): + ''' report current progress while playing ''' + # chromecast does not send updates of the player's progress (cur_time) + # so we need to send it in periodically + while self._state == PlayerState.Playing: + self.cur_time = self.cc.media_controller.status.adjusted_current_time + await asyncio.sleep(1) + self.__cc_report_progress_task = None async def handle_player_state(self, caststatus=None, - mediastatus=None, connection_status=None): + mediastatus=None): ''' handle a player state message from the socket ''' - # handle connection status - if connection_status: - if connection_status.status == CONNECTION_STATUS_DISCONNECTED: - # schedule a new scan which will handle group parent changes - self.mass.event_loop.create_task( - self.mass.players.providers[self.player_provider].start_chromecast_discovery()) # handle generic cast status if caststatus: self.muted = caststatus.volume_muted @@ -215,6 +212,8 @@ class ChromecastPlayer(Player): self.state = PlayerState.Stopped self.cur_uri = mediastatus.content_id self.cur_time = mediastatus.adjusted_current_time + if self._state == PlayerState.Playing and self.__cc_report_progress_task == None: + self.__cc_report_progress_task = self.mass.create_task(self.__report_progress()) class ChromecastProvider(PlayerProvider): ''' support for ChromeCast Audio ''' @@ -229,7 +228,7 @@ class ChromecastProvider(PlayerProvider): async def setup(self): ''' perform async setup ''' - self.mass.event_loop.create_task( + self.mass.create_task( self.__periodic_chromecast_discovery()) async def __handle_group_members_update(self, mz, added_player=None, removed_player=None): @@ -247,24 +246,20 @@ class ChromecastProvider(PlayerProvider): @run_periodic(1800) async def __periodic_chromecast_discovery(self): ''' run chromecast discovery on interval ''' - await self.start_chromecast_discovery() + self.mass.event_loop.run_in_executor(None, self.run_chromecast_discovery) - async def start_chromecast_discovery(self): + def run_chromecast_discovery(self): ''' background non-blocking chromecast discovery and handler ''' if self._discovery_running: return self._discovery_running = True LOGGER.debug("Chromecast discovery started...") # remove any disconnected players... - removed_players = [] for player in self.players: if not player.cc.socket_client or not player.cc.socket_client.is_connected: - removed_players.append(player.player_id) # cleanup cast object del player.cc - # signal removed players - for player_id in removed_players: - await self.remove_player(player_id) + self.mass.create_task(self.remove_player(player.player_id)) # search for available chromecasts from pychromecast.discovery import start_discovery, stop_discovery def discovered_callback(name): @@ -272,19 +267,15 @@ class ChromecastProvider(PlayerProvider): discovery_info = listener.services[name] ip_address, port, uuid, model_name, friendly_name = discovery_info player_id = str(uuid) - player = asyncio.run_coroutine_threadsafe( - self.get_player(player_id), - self.mass.event_loop).result() - if not player: - asyncio.run_coroutine_threadsafe( - self.__chromecast_discovered(player_id, discovery_info), self.mass.event_loop) + if not player_id in self.mass.players._players: + self.__chromecast_discovered(player_id, discovery_info) listener, browser = start_discovery(discovered_callback) - await asyncio.sleep(15) # run discovery for 15 seconds + time.sleep(15) # run discovery for 15 seconds stop_discovery(browser) LOGGER.debug("Chromecast discovery completed...") self._discovery_running = False - async def __chromecast_discovered(self, player_id, discovery_info): + def __chromecast_discovered(self, player_id, discovery_info): ''' callback when a (new) chromecast device is discovered ''' from pychromecast import _get_chromecast_from_host, ChromecastConnectionError try: @@ -300,7 +291,7 @@ class ChromecastProvider(PlayerProvider): self.supports_crossfade = False # register status listeners status_listener = StatusListener(player_id, - player.handle_player_state, self.mass.event_loop) + player.handle_player_state, self.mass) if chromecast.cast_type == 'group': mz = MultizoneController(chromecast.uuid) mz.register_listener(MZListener(mz, @@ -311,11 +302,10 @@ class ChromecastProvider(PlayerProvider): chromecast.register_status_listener(status_listener) chromecast.media_controller.register_status_listener(status_listener) player.cc.wait() - await self.add_player(player) + self.mass.create_task(self.add_player(player)) if player.mz: player.mz.update_members() - def chunks(l, n): """Yield successive n-sized chunks from l.""" for i in range(0, len(l), n): @@ -323,22 +313,24 @@ def chunks(l, n): class StatusListener: - def __init__(self, player_id, status_callback, loop): + def __init__(self, player_id, status_callback, mass): self.__handle_callback = status_callback - self.loop = loop + self.mass = mass self.player_id = player_id def new_cast_status(self, status): ''' chromecast status changed (like volume etc.)''' - asyncio.run_coroutine_threadsafe( - self.__handle_callback(caststatus=status), self.loop) + self.mass.create_task( + self.__handle_callback(caststatus=status)) def new_media_status(self, status): ''' mediacontroller has new state ''' - asyncio.run_coroutine_threadsafe( - self.__handle_callback(mediastatus=status), self.loop) + self.mass.create_task( + self.__handle_callback(mediastatus=status)) def new_connection_status(self, status): ''' will be called when the connection changes ''' - asyncio.run_coroutine_threadsafe( - self.__handle_callback(connection_status=status), self.loop) + if status.status == CONNECTION_STATUS_DISCONNECTED: + # schedule a new scan which will handle reconnects and group parent changes + self.mass.event_loop.run_in_executor(None, + self.mass.players.providers[PROV_ID].run_chromecast_discovery) class MZListener: def __init__(self, mz, callback, loop): diff --git a/music_assistant/playerproviders/sonos.py b/music_assistant/playerproviders/sonos.py index 596ee1dc..ba658f70 100644 --- a/music_assistant/playerproviders/sonos.py +++ b/music_assistant/playerproviders/sonos.py @@ -6,6 +6,7 @@ import aiohttp from typing import List import logging import types +import time from ..utils import run_periodic, LOGGER, try_parse_int from ..models.playerprovider import PlayerProvider @@ -19,14 +20,22 @@ PROV_NAME = 'Sonos' PROV_CLASS = 'SonosProvider' CONFIG_ENTRIES = [ - (CONF_ENABLED, False, CONF_ENABLED), + (CONF_ENABLED, True, CONF_ENABLED), ] PLAYER_CONFIG_ENTRIES = [] class SonosPlayer(Player): ''' Sonos player object ''' - + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__sonos_report_progress_task = None + + def __del__(self): + if self.__sonos_report_progress_task: + self.__sonos_report_progress_task.cancel() + async def cmd_stop(self): ''' send stop command to player ''' self.soco.stop() @@ -94,6 +103,17 @@ class SonosPlayer(Player): for pos, item in enumerate(queue_items): self.soco.add_uri_to_queue(item.uri, last_index+pos) + async def __report_progress(self): + ''' report current progress while playing ''' + # sonos does not send instant updates of the player's progress (cur_time) + # so we need to send it in periodically + while self._state == PlayerState.Playing: + time_diff = time.time() - self.media_position_updated_at + adjusted_current_time = self._cur_time + time_diff + self.cur_time = adjusted_current_time + await asyncio.sleep(1) + self.__sonos_report_progress_task = None + def _update_state(self, event=None): ''' update state, triggerer by event ''' if event: @@ -111,15 +131,17 @@ class SonosPlayer(Player): return if self.soco.is_playing_tv or self.soco.is_playing_line_in: self.powered = False - else: - new_state = self.__convert_state(current_transport_state) - self.state = new_state - track_info = self.soco.get_current_track_info() - self.cur_uri = track_info["uri"] - position_info = self.soco.avTransport.GetPositionInfo( - [("InstanceID", 0), ("Channel", "Master")]) - rel_time = self.__timespan_secs(position_info.get("RelTime")) - self.cur_time = rel_time + return + new_state = self.__convert_state(current_transport_state) + self.state = new_state + track_info = self.soco.get_current_track_info() + self.cur_uri = track_info["uri"] + position_info = self.soco.avTransport.GetPositionInfo( + [("InstanceID", 0), ("Channel", "Master")]) + rel_time = self.__timespan_secs(position_info.get("RelTime")) + self.cur_time = rel_time + if self._state == PlayerState.Playing and self.__sonos_report_progress_task == None: + self.__sonos_report_progress_task = self.mass.create_task(self.__report_progress()) @staticmethod def __convert_state(sonos_state): @@ -151,15 +173,15 @@ class SonosProvider(PlayerProvider): async def setup(self): ''' perform async setup ''' - self.mass.event_loop.create_task( + self.mass.create_task( self.__periodic_discovery()) @run_periodic(1800) async def __periodic_discovery(self): ''' run sonos discovery on interval ''' - await self.run_discovery() + self.mass.event_loop.run_in_executor(None, self.run_discovery) - async def run_discovery(self): + def run_discovery(self): ''' background sonos discovery and handler ''' if self._discovery_running: return @@ -167,24 +189,26 @@ class SonosProvider(PlayerProvider): LOGGER.debug("Sonos discovery started...") import soco discovered_devices = soco.discover() + if discovered_devices == None: + discovered_devices = [] new_device_ids = [item.uid for item in discovered_devices] cur_player_ids = [item.player_id for item in self.players] # remove any disconnected players... for player in self.players: if not player.is_group and not player.soco.uid in new_device_ids: - await self.remove_player(player.player_id) + self.mass.create_task(self.remove_player(player.player_id)) # process new players for device in discovered_devices: if device.uid not in cur_player_ids and device.is_visible: - await self.__device_discovered(device) + self.__device_discovered(device) # handle groups if len(discovered_devices) > 0: - await self.__process_groups(discovered_devices[0].all_groups) + self.__process_groups(discovered_devices[0].all_groups) else: - await self.__process_groups([]) + self.__process_groups([]) - async def __device_discovered(self, soco_device): - '''handle new player ''' + def __device_discovered(self, soco_device): + '''handle new sonos player ''' player = SonosPlayer(self.mass, soco_device.uid, self.prov_id) player.soco = soco_device player.name = soco_device.player_name @@ -201,18 +225,19 @@ class SonosProvider(PlayerProvider): subscribe(soco_device.avTransport, player._update_state) subscribe(soco_device.renderingControl, player._update_state) subscribe(soco_device.zoneGroupTopology, self.__topology_changed) - return await self.add_player(player) + self.mass.create_task(self.add_player(player)) + return player - async def __process_groups(self, sonos_groups): + def __process_groups(self, sonos_groups): ''' process all sonos groups ''' all_group_ids = [] for group in sonos_groups: all_group_ids.append(group.uid) if group.uid not in self.mass.players._players: # new group player - group_player = await self.__device_discovered(group.coordinator) + group_player = self.__device_discovered(group.coordinator) else: - group_player = await self.get_player(group.uid) + group_player = self.mass.players.get_player_sync(group.uid) # check members group_player.name = group.label group_player.group_childs = [item.uid for item in group.members] @@ -223,7 +248,7 @@ class SonosProvider(PlayerProvider): from one of the sonos players schedule discovery to work out the changes ''' - self.mass.event_loop.create_task(self.run_discovery()) + self.mass.event_loop.run_in_executor(None, self.run_discovery) class _ProcessSonosEventQueue: """Queue like object for dispatching sonos events.""" diff --git a/music_assistant/playerproviders/squeezebox.py b/music_assistant/playerproviders/squeezebox.py index f5ff8683..e60d1f2c 100644 --- a/music_assistant/playerproviders/squeezebox.py +++ b/music_assistant/playerproviders/squeezebox.py @@ -41,10 +41,10 @@ class PySqueezeProvider(PlayerProvider): async def setup(self): ''' async initialize of module ''' # start slimproto server - self.mass.event_loop.create_task( + self.mass.create_task( asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483)) # setup discovery - self.mass.event_loop.create_task(self.start_discovery()) + self.mass.create_task(self.start_discovery()) async def start_discovery(self): transport, protocol = await self.mass.event_loop.create_datagram_endpoint( @@ -84,7 +84,7 @@ class PySqueezeProvider(PlayerProvider): player_id = str(device_mac).lower() device_type = devices.get(dev_id, 'unknown device') player = PySqueezePlayer(self.mass, player_id, self.prov_id, device_type, writer) - self.mass.event_loop.create_task(self.mass.players.add_player(player)) + self.mass.create_task(self.mass.players.add_player(player)) elif player != None: player.process_msg(operation, packet) @@ -122,8 +122,8 @@ class PySqueezePlayer(Player): self.send_frame(b"setd", struct.pack("B", 4)) # TODO: remember last volume and power state - self.mass.event_loop.create_task(self.volume_set(40)) - self.mass.event_loop.create_task(self.power_off()) + self.mass.create_task(self.volume_set(40)) + self.mass.create_task(self.power_off()) self._heartbeat_task = asyncio.create_task(self.__send_heartbeat()) async def cmd_stop(self): @@ -243,7 +243,7 @@ class PySqueezePlayer(Player): ''' send command to Squeeze player''' packet = struct.pack('!H', len(data) + 4) + command + data self._writer.write(packet) - self.mass.event_loop.create_task(self._writer.drain()) + self.mass.create_task(self._writer.drain()) def send_version(self): self.send_frame(b'vers', b'7.8') @@ -318,7 +318,7 @@ class PySqueezePlayer(Player): LOGGER.debug("Decoder Ready for next track") next_item = self.queue.next_item if next_item: - self.mass.event_loop.create_task( + self.mass.create_task( self.__send_play(next_item.uri)) def stat_STMe(self, data): @@ -600,7 +600,7 @@ class TLVDiscoveryResponseDatagram(Datagram): if value is None: value = '' elif len(value) > 255: - LOGGER.warning("Response %s too long, truncating to 255 bytes" % typ) + # Response too long, truncating to 255 bytes value = value[:255] parts.extend((typ, chr(len(value)), value)) self.packet = ''.join(parts) @@ -619,7 +619,7 @@ class DiscoveryProtocol(): sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) def connection_lost(self, *args, **kwargs): - LOGGER.warning("Connection lost to discovery") + LOGGER.debug("Connection lost to discovery") def build_TLV_response(self, requestdata): responsedata = OrderedDict() @@ -655,7 +655,6 @@ class DiscoveryProtocol(): try: data = data.decode() dgram = Datagram.decode(data) - LOGGER.debug("Data received from %s: %s" % (addr, dgram)) if isinstance(dgram, ClientDiscoveryDatagram): self.sendDiscoveryResponse(addr) elif isinstance(dgram, TLVDiscoveryRequestDatagram): @@ -666,11 +665,9 @@ class DiscoveryProtocol(): def sendDiscoveryResponse(self, addr): dgram = DiscoveryResponseDatagram(get_hostname(), 3483) - LOGGER.debug("Sending discovery response %r" % (dgram.packet,)) self.transport.sendto(dgram.packet.encode(), addr) def sendTLVDiscoveryResponse(self, resonsedata, addr): dgram = TLVDiscoveryResponseDatagram(resonsedata) - LOGGER.debug("Sending discovery response %r" % (dgram.packet,)) self.transport.sendto(dgram.packet.encode(), addr) diff --git a/music_assistant/playerproviders/web.py b/music_assistant/playerproviders/web.py deleted file mode 100644 index 6b6af1b4..00000000 --- a/music_assistant/playerproviders/web.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding:utf-8 -*- - -import asyncio -import os -import struct -from collections import OrderedDict -import time -import decimal -from typing import List -import random -import sys -import socket -from ..utils import run_periodic, LOGGER, parse_track_title, try_parse_int, get_ip, get_hostname -from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist -from ..constants import CONF_ENABLED - - -PROV_ID = 'web' -PROV_NAME = 'WebPlayer' -PROV_CLASS = 'WebPlayerProvider' - -CONFIG_ENTRIES = [ - (CONF_ENABLED, True, CONF_ENABLED), - ] - -PLAYER_CONFIG_ENTRIES = [] - -EVENT_WEBPLAYER_CMD = 'webplayer command' -EVENT_WEBPLAYER_STATE = 'webplayer state' -EVENT_WEBPLAYER_REGISTER = 'webplayer register' - -class WebPlayerProvider(PlayerProvider): - ''' - Implementation of a player using pure HTML/javascript - used in the front-end. - Communication is handled through the websocket connection - and our internal event bus - ''' - - def __init__(self, mass, conf): - super().__init__(mass, conf) - self.prov_id = PROV_ID - self.name = PROV_NAME - self.player_config_entries = PLAYER_CONFIG_ENTRIES - - ### Provider specific implementation ##### - - async def setup(self): - ''' async initialize of module ''' - await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_STATE) - await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_REGISTER) - self.mass.event_loop.create_task(self.check_players()) - - async def handle_mass_event(self, msg, msg_details): - ''' received event for the webplayer component ''' - #print("%s ---> %s" %(msg, msg_details)) - if msg == EVENT_WEBPLAYER_REGISTER: - # register new player - player_id = msg_details['player_id'] - player = WebPlayer(self.mass, player_id, self.prov_id) - player.supports_crossfade = False - player.supports_gapless = False - player.supports_queue = False - player.name = msg_details['name'] - await self.add_player(player) - elif msg == EVENT_WEBPLAYER_STATE: - player_id = msg_details['player_id'] - player = await self.get_player(player_id) - if player: - await player.handle_state(msg_details) - - @run_periodic(30) - async def check_players(self): - ''' invalidate players that did not send a heartbeat message in a while ''' - cur_time = time.time() - offline_players = [] - for player in self.players: - if cur_time - player._last_message > 30: - offline_players.append(player.player_id) - for player_id in offline_players: - await self.remove_player(player_id) - - -class WebPlayer(Player): - ''' Web player object ''' - - def __init__(self, mass, player_id, prov_id): - self._last_message = time.time() - super().__init__(mass, player_id, prov_id) - - async def cmd_stop(self): - ''' send stop command to player ''' - data = { 'player_id': self.player_id, 'cmd': 'stop'} - await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) - - async def cmd_play(self): - ''' send play command to player ''' - data = { 'player_id': self.player_id, 'cmd': 'play'} - await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) - - async def cmd_pause(self): - ''' send pause command to player ''' - data = { 'player_id': self.player_id, 'cmd': 'pause'} - await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) - - async def cmd_power_on(self): - ''' send power ON command to player ''' - self.powered = True # not supported on webplayer - data = { 'player_id': self.player_id, 'cmd': 'stop'} - await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) - - async def cmd_power_off(self): - ''' send power OFF command to player ''' - self.powered = False - - async def cmd_volume_set(self, volume_level): - ''' send new volume level command to player ''' - data = { 'player_id': self.player_id, 'cmd': 'volume_set', 'volume_level': volume_level} - await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) - - async def cmd_volume_mute(self, is_muted=False): - ''' send mute command to player ''' - data = { 'player_id': self.player_id, 'cmd': 'volume_mute', 'is_muted': is_muted} - await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) - - async def cmd_play_uri(self, uri:str): - ''' play single uri on player ''' - data = { 'player_id': self.player_id, 'cmd': 'play_uri', 'uri': uri} - await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) - - async def handle_state(self, data): - ''' handle state event from player ''' - if 'volume_level' in data: - self.volume_level = data['volume_level'] - if 'muted' in data: - self.muted = data['muted'] - if 'state' in data: - self.state = PlayerState(data['state']) - if 'cur_time' in data: - self.cur_time = data['cur_time'] - if 'cur_uri' in data: - self.cur_uri = data['cur_uri'] - if 'powered' in data: - self.powered = data['powered'] - if 'name' in data: - self.name = data['name'] - self._last_message = time.time() - - diff --git a/music_assistant/playerproviders/webplayer.py b/music_assistant/playerproviders/webplayer.py new file mode 100644 index 00000000..a079b1d2 --- /dev/null +++ b/music_assistant/playerproviders/webplayer.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +# -*- coding:utf-8 -*- + +import asyncio +import os +import struct +from collections import OrderedDict +import time +import decimal +from typing import List +import random +import sys +import socket +from ..utils import run_periodic, LOGGER, parse_track_title, try_parse_int, get_ip, get_hostname +from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist +from ..constants import CONF_ENABLED + + +PROV_ID = 'webplayer' +PROV_NAME = 'WebPlayer' +PROV_CLASS = 'WebPlayerProvider' + +CONFIG_ENTRIES = [ + (CONF_ENABLED, True, CONF_ENABLED), + ] + +PLAYER_CONFIG_ENTRIES = [] + +EVENT_WEBPLAYER_CMD = 'webplayer command' +EVENT_WEBPLAYER_STATE = 'webplayer state' +EVENT_WEBPLAYER_REGISTER = 'webplayer register' + +class WebPlayerProvider(PlayerProvider): + ''' + Implementation of a player using pure HTML/javascript + used in the front-end. + Communication is handled through the websocket connection + and our internal event bus + ''' + + def __init__(self, mass, conf): + super().__init__(mass, conf) + self.prov_id = PROV_ID + self.name = PROV_NAME + self.player_config_entries = PLAYER_CONFIG_ENTRIES + + ### Provider specific implementation ##### + + async def setup(self): + ''' async initialize of module ''' + await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_STATE) + await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_REGISTER) + self.mass.create_task(self.check_players()) + + async def handle_mass_event(self, msg, msg_details): + ''' received event for the webplayer component ''' + if msg == EVENT_WEBPLAYER_REGISTER: + # register new player + player_id = msg_details['player_id'] + player = WebPlayer(self.mass, player_id, self.prov_id) + player.supports_crossfade = False + player.supports_gapless = False + player.supports_queue = False + player.name = msg_details['name'] + await self.add_player(player) + elif msg == EVENT_WEBPLAYER_STATE: + player_id = msg_details['player_id'] + player = await self.get_player(player_id) + if player: + await player.handle_state(msg_details) + + @run_periodic(30) + async def check_players(self): + ''' invalidate players that did not send a heartbeat message in a while ''' + cur_time = time.time() + offline_players = [] + for player in self.players: + if cur_time - player._last_message > 30: + offline_players.append(player.player_id) + for player_id in offline_players: + await self.remove_player(player_id) + + +class WebPlayer(Player): + ''' Web player object ''' + + def __init__(self, mass, player_id, prov_id): + self._last_message = time.time() + super().__init__(mass, player_id, prov_id) + + async def cmd_stop(self): + ''' send stop command to player ''' + data = { 'player_id': self.player_id, 'cmd': 'stop'} + await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) + + async def cmd_play(self): + ''' send play command to player ''' + data = { 'player_id': self.player_id, 'cmd': 'play'} + await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) + + async def cmd_pause(self): + ''' send pause command to player ''' + data = { 'player_id': self.player_id, 'cmd': 'pause'} + await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) + + async def cmd_power_on(self): + ''' send power ON command to player ''' + self.powered = True # not supported on webplayer + data = { 'player_id': self.player_id, 'cmd': 'stop'} + await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) + + async def cmd_power_off(self): + ''' send power OFF command to player ''' + self.powered = False + + async def cmd_volume_set(self, volume_level): + ''' send new volume level command to player ''' + data = { 'player_id': self.player_id, 'cmd': 'volume_set', 'volume_level': volume_level} + await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) + + async def cmd_volume_mute(self, is_muted=False): + ''' send mute command to player ''' + data = { 'player_id': self.player_id, 'cmd': 'volume_mute', 'is_muted': is_muted} + await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) + + async def cmd_play_uri(self, uri:str): + ''' play single uri on player ''' + data = { 'player_id': self.player_id, 'cmd': 'play_uri', 'uri': uri} + await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data) + + async def handle_state(self, data): + ''' handle state event from player ''' + if 'volume_level' in data: + self.volume_level = data['volume_level'] + if 'muted' in data: + self.muted = data['muted'] + if 'state' in data: + self.state = PlayerState(data['state']) + if 'cur_time' in data: + self.cur_time = data['cur_time'] + if 'cur_uri' in data: + self.cur_uri = data['cur_uri'] + if 'powered' in data: + self.powered = data['powered'] + if 'name' in data: + self.name = data['name'] + self._last_message = time.time() + + diff --git a/music_assistant/utils.py b/music_assistant/utils.py index 8df418fc..90a2baee 100755 --- a/music_assistant/utils.py +++ b/music_assistant/utils.py @@ -39,7 +39,7 @@ def filename_from_string(string): keepcharacters = (' ','.','_') return "".join(c for c in string if c.isalnum() or c in keepcharacters).rstrip() -def run_background_task(executor, corofn, *args): +def run_background_task(corofn, *args, executor=None): ''' run non-async task in background ''' return asyncio.get_event_loop().run_in_executor(executor, corofn, *args) diff --git a/music_assistant/web.py b/music_assistant/web.py index cd3e0211..685b846f 100755 --- a/music_assistant/web.py +++ b/music_assistant/web.py @@ -43,7 +43,6 @@ class Web(): async def setup(self): ''' perform async setup ''' - self.http_session = aiohttp.ClientSession() app = web.Application() app.add_routes([web.get('/jsonrpc.js', self.json_rpc)]) app.add_routes([web.post('/jsonrpc.js', self.json_rpc)]) diff --git a/music_assistant/web/components/player.vue.js b/music_assistant/web/components/player.vue.js index 61d2cbc7..30d3502f 100755 --- a/music_assistant/web/components/player.vue.js +++ b/music_assistant/web/components/player.vue.js @@ -253,6 +253,7 @@ Vue.component("player", { }, switchPlayer (new_player_id) { this.active_player_id = new_player_id; + localStorage.setItem('active_player_id', new_player_id); }, setPlayerVolume: function(player_id, new_volume) { this.players[player_id].volume_level = new_volume; @@ -297,8 +298,8 @@ Vue.component("player", { } }, createAudioPlayer(data) { - if (navigator.userAgent.includes("WebKit")) - return // streaming flac not supported on webkit ?! + if (!navigator.userAgent.includes("Chrome")) + return // streaming flac only supported on chrome browser if (localStorage.getItem('audio_player_id')) // get player id from local storage this.audioPlayerId = localStorage.getItem('audio_player_id'); @@ -400,23 +401,30 @@ Vue.component("player", { } // select new active player - // TODO: store previous player in local storage - if (!this.active_player_id || !this.players[this.active_player_id].enabled) - for (var player_id in this.players) - if (this.players[player_id].state == 'playing' && this.players[player_id].enabled) { - // prefer the first playing player - this.active_player_id = player_id; - break; - } - if (!this.active_player_id || !this.players[this.active_player_id].enabled) - for (var player_id in this.players) { - // fallback to just the first player - if (this.players[player_id].enabled) - { - this.active_player_id = player_id; - break; - } + if (!this.active_player_id || !this.players[this.active_player_id].enabled) { + // prefer last selected player + last_player = localStorage.getItem('active_player_id') + if (last_player && this.players[last_player] && this.players[last_player].enabled) + this.active_player_id = last_player; + else + { + // prefer the first playing player + for (var player_id in this.players) + if (this.players[player_id].state == 'playing' && this.players[player_id].enabled && this.players[player_id].group_parents.length == 0) { + this.active_player_id = player_id; + break; + } + // fallback to just the first player + if (!this.active_player_id || !this.players[this.active_player_id].enabled) + for (var player_id in this.players) { + if (this.players[player_id].enabled && this.players[player_id].group_parents.length == 0) + { + this.active_player_id = player_id; + break; + } + } } + } }.bind(this); this.ws.onclose = function(e) { diff --git a/music_assistant/web/images/icons/sonos.png b/music_assistant/web/images/icons/sonos.png new file mode 100644 index 00000000..b0a2c0d0 Binary files /dev/null and b/music_assistant/web/images/icons/sonos.png differ diff --git a/music_assistant/web/images/icons/webplayer.png b/music_assistant/web/images/icons/webplayer.png new file mode 100644 index 00000000..39b73236 Binary files /dev/null and b/music_assistant/web/images/icons/webplayer.png differ diff --git a/music_assistant/web/strings.js b/music_assistant/web/strings.js index ffa09bd5..bd8025d4 100644 --- a/music_assistant/web/strings.js +++ b/music_assistant/web/strings.js @@ -35,6 +35,8 @@ const messages = { file: "Filesystem", chromecast: "Chromecast", squeezebox: "Squeezebox support", + sonos: "Sonos", + webplayer: "Web Player (Chrome browser only)", username: "Username", password: "Password", hostname: "Hostname (or IP)", @@ -125,6 +127,8 @@ const messages = { file: "Bestandssysteem", chromecast: "Chromecast", squeezebox: "Squeezebox ondersteuning", + sonos: "Sonos", + webplayer: "Web Player (alleen Chrome browser)", username: "Gebruikersnaam", password: "Wachtwoord", hostname: "Hostnaam (of IP)",