From 83d05650b152409baeea80e0bce467b426a32e71 Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Sat, 12 Oct 2019 17:45:47 +0200 Subject: [PATCH] finished refactor --- music_assistant/__init__.py | 28 +++----- music_assistant/homeassistant.py | 9 ++- music_assistant/http_streamer.py | 55 +++++++++------ music_assistant/metadata.py | 15 +++- music_assistant/models/__init__.py | 9 +-- music_assistant/models/player.py | 52 ++++++-------- music_assistant/models/player_queue.py | 69 +++++++++++++++---- music_assistant/models/playerprovider.py | 9 +-- music_assistant/models/playerstate.py | 10 +++ music_assistant/musicproviders/qobuz.py | 13 ++-- music_assistant/musicproviders/spotify.py | 9 ++- music_assistant/player_manager.py | 25 +++---- music_assistant/playerproviders/chromecast.py | 21 ++++-- music_assistant/playerproviders/lms.py | 8 +-- music_assistant/playerproviders/pylms.py | 2 +- music_assistant/web.py | 60 +++++++++++----- 16 files changed, 240 insertions(+), 154 deletions(-) create mode 100755 music_assistant/models/playerstate.py diff --git a/music_assistant/__init__.py b/music_assistant/__init__.py index 984e33b8..23907716 100644 --- a/music_assistant/__init__.py +++ b/music_assistant/__init__.py @@ -1,8 +1,6 @@ #!/usr/bin/env python3 # -*- coding:utf-8 -*- -# import os, sys; sys.path.append(os.path.dirname(os.path.realpath(__file__))) - import sys import asyncio from concurrent.futures import ThreadPoolExecutor @@ -15,9 +13,6 @@ import slugify as unicode_slug import uuid import json import time -# import stackimpact - -# __package__ = 'music_assistant' from .database import Database from .utils import run_periodic, LOGGER @@ -32,7 +27,7 @@ from .web import setup as web_setup def handle_exception(loop, context): # context["message"] will always be there; but context["exception"] may not msg = context.get("exception", context["message"]) - LOGGER.error(f"Caught exception: {msg}") + LOGGER.exception(f"Caught exception: {msg}") class MusicAssistant(): @@ -41,9 +36,10 @@ class MusicAssistant(): self.datapath = datapath self.parse_config() self.event_loop = asyncio.get_event_loop() + self.event_loop.set_debug(True) self.bg_executor = ThreadPoolExecutor() self.event_loop.set_default_executor(self.bg_executor) - self.event_loop.set_exception_handler(handle_exception) + #self.event_loop.set_exception_handler(handle_exception) self.event_listeners = {} # init database and metadata modules @@ -61,41 +57,33 @@ class MusicAssistant(): self.player = PlayerManager(self) self.http_streamer = HTTPStreamer(self) - # agent = stackimpact.start( - # agent_key = '4a00b6f2c7da20f692807d204ab3760318978ba3', - # app_name = 'MusicAssistant') - # print("profiler started...") - # start the event loop try: self.event_loop.run_forever() except (KeyboardInterrupt, SystemExit): LOGGER.info('Exit requested!') - self.signal_event("system_shutdown") + self.event_loop.create_task(self.signal_event("system_shutdown")) self.event_loop.stop() self.save_config() time.sleep(5) self.event_loop.close() LOGGER.info('Shutdown complete.') - def signal_event(self, msg, msg_details=None): + async def signal_event(self, msg, msg_details=None): ''' signal (systemwide) event ''' LOGGER.debug("Event: %s - %s" %(msg, msg_details)) listeners = list(self.event_listeners.values()) for callback, eventfilter in listeners: if not eventfilter or eventfilter in msg: - if not asyncio.iscoroutinefunction(callback): - callback(msg, msg_details) - else: - self.event_loop.create_task(callback(msg, msg_details)) + self.event_loop.create_task(callback(msg, msg_details)) - def add_event_listener(self, cb, eventfilter=None): + async def add_event_listener(self, cb, eventfilter=None): ''' add callback to our event listeners ''' cb_id = str(uuid.uuid4()) self.event_listeners[cb_id] = (cb, eventfilter) return cb_id - def remove_event_listener(self, cb_id): + async def remove_event_listener(self, cb_id): ''' remove callback from our event listeners ''' self.event_listeners.pop(cb_id, None) diff --git a/music_assistant/homeassistant.py b/music_assistant/homeassistant.py index ae30d1f6..9f76f1ce 100644 --- a/music_assistant/homeassistant.py +++ b/music_assistant/homeassistant.py @@ -68,12 +68,17 @@ class HomeAssistant(): else: self._use_ssl = False self._host = url.replace('http://','').split('/')[0] - self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) self.__send_ws = None self.__last_id = 10 LOGGER.info('Homeassistant integration is enabled') + self.mass.event_loop.create_task(self.setup()) + + async def setup(self): + ''' perform async setup ''' + self.http_session = aiohttp.ClientSession( + loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) mass.event_loop.create_task(self.__hass_websocket()) - self.mass.add_event_listener(self.mass_event, "player updated") + await self.mass.add_event_listener(self.mass_event, "player updated") mass.event_loop.create_task(self.__get_sources()) async def get_state(self, entity_id, attribute='state', register_listener=None): diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 0c5a3433..9fcf3717 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -14,7 +14,7 @@ import pyloudnorm as pyln import aiohttp from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size from .models.media_types import TrackQuality, MediaType -from .models.player import PlayerState +from .models.playerstate import PlayerState class HTTPStreamer(): @@ -55,7 +55,6 @@ class HTTPStreamer(): run_async_background_task( self.mass.bg_executor, self.__stream_queue, player, queue, cancelled) - await asyncio.sleep(2) try: while True: chunk = await queue.get() @@ -104,21 +103,33 @@ class HTTPStreamer(): chunk = await sox_proc.stdout.read(256000) if not chunk: break - await buffer.put(chunk) - await buffer.put(b'') # indicate EOF + asyncio.run_coroutine_threadsafe( + buffer.put(chunk), + self.mass.event_loop) + # indicate EOF if no more data + asyncio.run_coroutine_threadsafe( + buffer.put(b''), + self.mass.event_loop) asyncio.create_task(fill_buffer()) - LOGGER.info("Start Queue Stream for player %s" %(player.name)) + LOGGER.info("Start Queue Stream for player %s " %(player.name)) + is_start = True last_fadeout_data = b'' - # report start of queue playback so we can calculate current track/duration etc. - # self.mass.event_loop.create_task(self.mass.player.player_queue_stream_update(player_id, queue_index, True)) while True: # get the (next) track in queue - queue_track = player.queue.next_item - LOGGER.info("got queue track %s" % queue_track.name) + if is_start: + # report start of queue playback so we can calculate current track/duration etc. + queue_track = asyncio.run_coroutine_threadsafe( + player.queue.start_queue_stream(), + self.mass.event_loop).result() + is_start = False + else: + queue_track = player.queue.next_item if not queue_track: + LOGGER.warning("no (more) tracks left in queue") break - LOGGER.debug("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) + LOGGER.info("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) + LOGGER.info(player.state) fade_in_part = b'' cur_chunk = 0 prev_chunk = None @@ -196,7 +207,7 @@ class HTTPStreamer(): # wait for the queue to consume the data # this prevents that the entire track is sitting in memory # and it helps a bit in the quest to follow where we are in the queue - while buffer.qsize() > 1 and not cancelled.is_set(): + while buffer.qsize() > 2 and not cancelled.is_set(): await asyncio.sleep(1) # end of the track reached if cancelled.is_set(): @@ -207,13 +218,8 @@ class HTTPStreamer(): # WIP: update actual duration to the queue for more accurate now playing info accurate_duration = bytes_written / int(sample_rate * 4 * 2) queue_track.duration = accurate_duration - #self.mass.player.providers[player.player_provider]._player_queue[player_id][queue_index] = queue_track - # move to next queue index - #queue_index += 1 - #self.mass.event_loop.create_task(self.mass.player.player_queue_stream_update(player_id, queue_index, False)) LOGGER.info("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) LOGGER.info("bytes written: %s - duration: %s" % (bytes_written, accurate_duration)) - break # end of queue reached, pass last fadeout bits to final output if last_fadeout_data and not cancelled.is_set(): sox_proc.stdin.write(last_fadeout_data) @@ -255,14 +261,13 @@ class HTTPStreamer(): streamdetails["provider"] = queue_item.provider streamdetails["track_id"] = queue_item.item_id streamdetails["player_id"] = player.player_id - self.mass.signal_event('streaming_started', streamdetails) + asyncio.run_coroutine_threadsafe( + self.mass.signal_event('streaming_started', streamdetails), self.mass.event_loop) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly prev_chunk = b'' bytes_sent = 0 while not process.stdout.at_eof(): - if cancelled.is_set(): - process.terminate() try: chunk = await process.stdout.readexactly(chunksize) except asyncio.streams.IncompleteReadError: @@ -279,6 +284,10 @@ class HTTPStreamer(): bytes_sent += len(prev_chunk) await process.wait() if cancelled.is_set(): + try: + process.terminate() + except ProcessLookupError: + pass LOGGER.warning("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent)) else: LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent)) @@ -289,9 +298,13 @@ class HTTPStreamer(): bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2 seconds_streamed = int(bytes_sent/bytes_per_second) streamdetails["seconds"] = seconds_streamed - self.mass.signal_event('streaming_ended', streamdetails) + asyncio.run_coroutine_threadsafe( + self.mass.signal_event('streaming_ended', streamdetails), + self.mass.event_loop) # send task to background to analyse the audio - self.mass.event_loop.create_task(self.__analyze_audio(queue_item.item_id, queue_item.provider)) + asyncio.run_coroutine_threadsafe( + self.__analyze_audio(queue_item.item_id, queue_item.provider), + self.mass.event_loop) async def __get_player_sox_options(self, player, queue_item): ''' get player specific sox effect options ''' diff --git a/music_assistant/metadata.py b/music_assistant/metadata.py index 0f087890..2e0c14b1 100755 --- a/music_assistant/metadata.py +++ b/music_assistant/metadata.py @@ -61,7 +61,12 @@ class MusicBrainz(): def __init__(self, event_loop, cache): self.event_loop = event_loop self.cache = cache - self.http_session = aiohttp.ClientSession(loop=event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) + self.event_loop.create_task(self.setup()) + + async def setup(self): + ''' perform async setup ''' + self.http_session = aiohttp.ClientSession( + loop=self.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) self.throttler = Throttler(rate_limit=1, period=1) async def search_artist_by_album(self, artistname, albumname=None, album_upc=None): @@ -131,13 +136,17 @@ class MusicBrainz(): result = None return result - class FanartTv(): def __init__(self, event_loop, cache): self.event_loop = event_loop self.cache = cache - self.http_session = aiohttp.ClientSession(loop=event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) + self.event_loop.create_task(self.setup()) + + async def setup(self): + ''' perform async setup ''' + self.http_session = aiohttp.ClientSession( + loop=self.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) self.throttler = Throttler(rate_limit=1, period=1) async def artist_images(self, mb_artist_id): diff --git a/music_assistant/models/__init__.py b/music_assistant/models/__init__.py index 05b72219..3518fe87 100644 --- a/music_assistant/models/__init__.py +++ b/music_assistant/models/__init__.py @@ -1,5 +1,6 @@ from .media_types import * -from .musicprovider import * -from .player_queue import * -from .player import * -from .playerprovider import * \ No newline at end of file +from .musicprovider import MusicProvider +from .player_queue import QueueItem, PlayerQueue +from .player import Player +from .playerstate import PlayerState +from .playerprovider import PlayerProvider \ No newline at end of file diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 4ae2c4f6..e7d1b3b2 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -10,14 +10,9 @@ from ..constants import CONF_ENABLED from ..cache import use_cache from .media_types import Track, MediaType from .player_queue import PlayerQueue, QueueItem +from .playerstate import PlayerState -class PlayerState(str, Enum): - Off = "off" - Stopped = "stopped" - Paused = "paused" - Playing = "playing" - class Player(): ''' representation of a player ''' @@ -198,7 +193,7 @@ class Player(): return hass_state != 'off' # mute as power elif self.settings.get('mute_as_power'): - return self.muted + return not self.muted else: return self._powered @@ -214,11 +209,10 @@ class Player(): ''' [PROTECTED] cur_time (player's elapsed time) property of this player ''' # handle group player if self.group_parent: - group_player = self.mass.bg_executor.submit(asyncio.run, - self.mass.player.get_player(self.group_parent)).result() + group_player = self.mass.player.get_player_sync(self.group_parent) if group_player: return group_player.cur_time - return self._cur_time + return self.queue.cur_item_time @cur_time.setter def cur_time(self, cur_time:int): @@ -232,8 +226,7 @@ class Player(): ''' [PROTECTED] cur_uri (uri loaded in player) property of this player ''' # handle group player if self.group_parent: - group_player = self.mass.bg_executor.submit(asyncio.run, - self.mass.player.get_player(self.group_parent)).result() + group_player = self.mass.player.get_player_sync(self.group_parent) if group_player: return group_player.cur_uri return self._cur_uri @@ -310,15 +303,6 @@ class Player(): return [] return [item for item in self.mass.player.players if item.group_parent == self.player_id] - @property - def settings(self): - ''' [PROTECTED] get the player config settings ''' - player_settings = self.mass.config['player_settings'].get(self.player_id) - if not player_settings: - player_settings = self.mass.bg_executor.submit(asyncio.run, - self.__update_player_settings()).result() - return player_settings - @property def enabled(self): ''' [PROTECTED] player enabled config setting ''' @@ -329,8 +313,7 @@ class Player(): ''' [PROTECTED] player's queue ''' # handle group player if self.group_parent: - group_player = self.mass.bg_executor.submit(asyncio.run, - self.mass.player.get_player(self.group_parent)).result() + group_player = self.mass.player.get_player_sync(self.group_parent) if group_player: return group_player.queue return self._queue @@ -344,7 +327,7 @@ class Player(): ''' [PROTECTED] send stop command to player ''' if self.group_parent: # redirect playback related commands to parent player - group_player = await self.mass.player.get(self.group_parent) + group_player = await self.mass.player.get_player(self.group_parent) if group_player: return await group_player.stop() else: @@ -501,6 +484,8 @@ class Player(): async def volume_up(self): ''' [PROTECTED] send volume up command to player ''' new_level = self.volume_level + 1 + if new_level > 100: + new_level = 100 return await self.volume_set(new_level) async def volume_down(self): @@ -516,12 +501,17 @@ class Player(): async def update(self): ''' [PROTECTED] signal player updated ''' - await self.__update_player_settings() - LOGGER.info("player updated: %s" % self.name) - self.mass.signal_event('player changed', self) + await self.queue.update() + LOGGER.debug("player updated: %s" % self.name) + await self.mass.signal_event('player changed', self) - async def __update_player_settings(self): + @property + def settings(self): ''' [PROTECTED] get (or create) player config settings ''' + player_settings = self.mass.config['player_settings'].get(self.player_id,{}) + if player_settings: + return player_settings + # generate config for the player config_entries = [ # default config entries for a player ("enabled", True, "player_enabled"), ("name", "", "player_name"), @@ -533,7 +523,7 @@ class Player(): ("crossfade_duration", 0, "crossfade_duration"), ] # append player specific settings - config_entries += await self.mass.player.providers[self._prov_id].get_player_config_entries() + config_entries += self.mass.player.providers[self._prov_id].player_config_entries if self.is_group or not self.group_parent: config_entries += [ # play on power on setting ("play_power_on", False, "player_power_play"), @@ -543,7 +533,6 @@ class Player(): config_entries += [("hass_power_entity", "", "hass_player_power"), ("hass_power_entity_source", "", "hass_player_source"), ("hass_volume_entity", "", "hass_player_volume")] - player_settings = self.mass.config['player_settings'].get(self.player_id,{}) for key, def_value, desc in config_entries: if not key in player_settings: if (isinstance(def_value, str) and def_value.startswith('<')): @@ -554,8 +543,7 @@ class Player(): self.mass.config['player_settings'][self.player_id]['__desc__'] = config_entries return player_settings - @property - def __dict__(self): + def to_dict(self): ''' instance attributes as dict so it can be serialized to json ''' return { "player_id": self.player_id, diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index ccf495d8..de7d555c 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -10,6 +10,7 @@ import uuid from ..utils import LOGGER from ..constants import CONF_ENABLED from .media_types import Track, TrackQuality +from .playerstate import PlayerState class QueueItem(Track): @@ -21,8 +22,8 @@ class QueueItem(Track): self.queue_item_id = str(uuid.uuid4()) # if existing media_item given, load those values if media_item: - for attribute, value in media_item.__dict__.items(): - setattr(self, attribute, value) + for key, value in media_item.__dict__.items(): + setattr(self, key, value) class PlayerQueue(): ''' @@ -37,7 +38,9 @@ class PlayerQueue(): self._items = [] self._shuffle_enabled = True self._repeat_enabled = False - self._cur_index = None + self._cur_index = 0 + self._cur_item_time = 0 + self._last_index = 0 @property def shuffle_enabled(self): @@ -49,7 +52,7 @@ class PlayerQueue(): @property def crossfade_enabled(self): - return self._player.settings['crossfade_duration'] + return self._player.settings.get('crossfade_duration', 0) > 0 @property def gapless_enabled(self): @@ -58,17 +61,21 @@ class PlayerQueue(): @property def cur_index(self): ''' match current uri with queue items to determine queue index ''' - for index, queue_item in enumerate(self.items): - if queue_item.uri == self._player.cur_uri: - return index return self._cur_index @property def cur_item(self): - if self.cur_index == None: + if self.cur_index == None or not self.items or len(self.items) < self.cur_index: return None - return self.mass.bg_executor.submit(asyncio.run,self.get_item(self.cur_index)).result() + return self.items[self.cur_index] + @property + def cur_item_time(self): + if self.use_queue_stream: + return self._cur_item_time + else: + return self._player._cur_time + @property def next_index(self): ''' @@ -94,8 +101,9 @@ class PlayerQueue(): ''' return the next item in the queue ''' - return self.mass.bg_executor.submit( - asyncio.run, self.get_item(self.next_index)).result() + if self.next_index != None: + return self.items[self.next_index] + return None @property def items(self): @@ -170,7 +178,7 @@ class PlayerQueue(): if not len(self.items) > index: return if self.use_queue_stream: - self._cur_index = index -1 + self._cur_index = index queue_stream_uri = 'http://%s:%s/stream/%s'% ( self.mass.web.local_ip, self.mass.web.http_port, self._player.player_id) return await self._player.cmd_play_uri(queue_stream_uri) @@ -184,7 +192,7 @@ class PlayerQueue(): if self._shuffle_enabled: queue_items = await self.__shuffle_items(queue_items) self._items = queue_items - self._cur_index = None + self._cur_index = 0 if self.use_queue_stream or not self._player.supports_queue: return await self.play_index(0) else: @@ -223,6 +231,41 @@ class PlayerQueue(): if self._player.supports_queue: return await self._player.cmd_queue_append(queue_items) + async def update(self): + ''' update queue details, called when player updates ''' + if self.use_queue_stream and self._player.state == PlayerState.Playing: + # determine queue index and cur_time for queue stream + # player is playing a constant stream of the queue so we need to do this the hard way + cur_time_queue = self._player._cur_time + total_time = 0 + track_time = 0 + if self.items: + queue_index = self._last_index # holds the last starting position + queue_track = None + while True: + queue_track = self.items[queue_index] + if cur_time_queue > (queue_track.duration + total_time): + total_time += queue_track.duration + queue_index += 1 + else: + track_time = cur_time_queue - total_time + break + self._cur_index = queue_index + self._cur_item_time = track_time + elif not self.use_queue_stream: + # normal queue based approach + cur_index = 0 + for index, queue_item in enumerate(self.items): + if queue_item.uri == self._player.cur_uri: + cur_index = index + break + self._cur_index = cur_index + + async def start_queue_stream(self): + ''' called by the queue streamer when it starts playing the queue stream ''' + self._last_index = self.cur_index + return await self.get_item(self.cur_index) + async def __shuffle_items(self, queue_items): ''' shuffle a list of tracks ''' # for now we use default python random function diff --git a/music_assistant/models/playerprovider.py b/music_assistant/models/playerprovider.py index 2b45955f..0dce5c33 100755 --- a/music_assistant/models/playerprovider.py +++ b/music_assistant/models/playerprovider.py @@ -24,19 +24,16 @@ class PlayerProvider(): self.mass = mass self.name = 'My great Musicplayer provider' # display name self.prov_id = 'my_provider' # used as id + self.player_config_entries = [] # player specific config entries ### Common methods and properties #### - async def get_player_config_entries(self): - ''' [CAN OVERRIDE] get the player-specific config entries for this provider (list with key/value pairs)''' - return [] @property def players(self): ''' return all players for this provider ''' - return self.mass.bg_executor.submit(asyncio.run, - self.mass.player.get_provider_players(self.prov_id)).result() - + return [item for item in self.mass.player.players if item.player_provider == self.prov_id] + async def get_player(self, player_id:str): ''' return player by id ''' return await self.mass.player.get_player(player_id) diff --git a/music_assistant/models/playerstate.py b/music_assistant/models/playerstate.py new file mode 100755 index 00000000..34336119 --- /dev/null +++ b/music_assistant/models/playerstate.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python3 +# -*- coding:utf-8 -*- + +from enum import Enum + +class PlayerState(str, Enum): + Off = "off" + Stopped = "stopped" + Paused = "paused" + Playing = "playing" diff --git a/music_assistant/musicproviders/qobuz.py b/music_assistant/musicproviders/qobuz.py index 40bab64d..0e4ba3cf 100644 --- a/music_assistant/musicproviders/qobuz.py +++ b/music_assistant/musicproviders/qobuz.py @@ -44,15 +44,20 @@ class QobuzProvider(MusicProvider): self.prov_id = 'qobuz' self.mass = mass self.cache = mass.cache - self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) self.__username = username self.__password = password self.__user_auth_info = None self.__logged_in = False - self.throttler = Throttler(rate_limit=2, period=1) - mass.add_event_listener(self.mass_event, 'streaming_started') - mass.add_event_listener(self.mass_event, 'streaming_ended') + self.mass.event_loop.create_task(self.setup()) + async def setup(self): + ''' perform async setup ''' + self.http_session = aiohttp.ClientSession( + loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) + self.throttler = Throttler(rate_limit=2, period=1) + await self.mass.add_event_listener(self.mass_event, 'streaming_started') + await self.mass.add_event_listener(self.mass_event, 'streaming_ended') + async def search(self, searchstring, media_types=List[MediaType], limit=5): ''' perform search on the provider ''' result = { diff --git a/music_assistant/musicproviders/spotify.py b/music_assistant/musicproviders/spotify.py index 43a56fc8..50f4001d 100644 --- a/music_assistant/musicproviders/spotify.py +++ b/music_assistant/musicproviders/spotify.py @@ -45,11 +45,16 @@ class SpotifyProvider(MusicProvider): self._cur_user = None self.mass = mass self.cache = mass.cache - self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) - self.throttler = Throttler(rate_limit=1, period=1) self._username = username self._password = password self.__auth_token = {} + self.mass.event_loop.create_task(self.setup()) + + async def setup(self): + ''' perform async setup ''' + self.http_session = aiohttp.ClientSession( + loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False)) + self.throttler = Throttler(rate_limit=1, period=1) async def search(self, searchstring, media_types=List[MediaType], limit=5): ''' perform search on the provider ''' diff --git a/music_assistant/player_manager.py b/music_assistant/player_manager.py index de6026d8..324c4823 100755 --- a/music_assistant/player_manager.py +++ b/music_assistant/player_manager.py @@ -13,7 +13,7 @@ import importlib from .utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task from .models.media_types import MediaType, TrackQuality from .models.player_queue import QueueItem -from .models.player import PlayerState +from .models.playerstate import PlayerState BASE_DIR = os.path.dirname(os.path.abspath(__file__)) MODULES_PATH = os.path.join(BASE_DIR, "playerproviders" ) @@ -29,38 +29,31 @@ class PlayerManager(): self._players = {} # dynamically load provider modules self.load_providers() - + @property def players(self): - ''' all players as property ''' - return self.mass.bg_executor.submit(asyncio.run, - self.get_players()).result() - - async def get_players(self): - ''' return all players as a list ''' - items = list(self._players.values()) - items.sort(key=lambda x: x.name, reverse=False) - return items + ''' return list of all players ''' + return self._players.values() async def get_player(self, player_id): ''' return player by id ''' return self._players.get(player_id, None) - async def get_provider_players(self, player_provider): - ''' return all players for given provider_id ''' - return [item for item in self._players.values() if item.player_provider == player_provider] + def get_player_sync(self, player_id): + ''' return player by id (non async) ''' + return self._players.get(player_id, None) async def add_player(self, player): ''' register a new player ''' self._players[player.player_id] = player - self.mass.signal_event('player added', player) + await self.mass.signal_event('player added', player) # TODO: turn on player if it was previously turned on ? return player async def remove_player(self, player_id): ''' handle a player remove ''' self._players.pop(player_id, None) - self.mass.signal_event('player removed', player_id) + await self.mass.signal_event('player removed', player_id) async def trigger_update(self, player_id): ''' manually trigger update for a player ''' diff --git a/music_assistant/playerproviders/chromecast.py b/music_assistant/playerproviders/chromecast.py index 7441b6c4..cf1434db 100644 --- a/music_assistant/playerproviders/chromecast.py +++ b/music_assistant/playerproviders/chromecast.py @@ -13,6 +13,7 @@ import types from ..utils import run_periodic, LOGGER, try_parse_int from ..models.playerprovider import PlayerProvider from ..models.player import Player, PlayerState +from ..models.playerstate import PlayerState from ..models.player_queue import QueueItem, PlayerQueue from ..constants import CONF_ENABLED, CONF_HOSTNAME, CONF_PORT @@ -180,14 +181,9 @@ class ChromecastProvider(PlayerProvider): self.prov_id = 'chromecast' self.name = 'Chromecast' self._discovery_running = False + self.player_config_entries = [("gapless_enabled", False, "gapless_enabled")] self.mass.event_loop.create_task(self.__periodic_chromecast_discovery()) - async def get_player_config_entries(self): - ''' get the player config entries for this provider (list with key/value pairs)''' - return [ - ("gapless_enabled", False, "gapless_enabled") - ] - async def __handle_player_state(self, chromecast, caststatus=None, mediastatus=None): ''' handle a player state message from the socket ''' player_id = str(chromecast.uuid) @@ -198,15 +194,25 @@ class ChromecastProvider(PlayerProvider): player.muted = caststatus.volume_muted player.volume_level = caststatus.volume_level * 100 if mediastatus: - # chromecast does not support power on/of so we only set state if mediastatus.player_state in ['PLAYING', 'BUFFERING']: player.state = PlayerState.Playing + player.powered = True elif mediastatus.player_state == 'PAUSED': player.state = PlayerState.Paused else: player.state = PlayerState.Stopped player.cur_uri = mediastatus.content_id player.cur_time = mediastatus.adjusted_current_time + # create update/poll task for the current time + async def poll_task(): + player.poll_task = True + while player.state == PlayerState.Playing: + player.cur_time = mediastatus.adjusted_current_time + await asyncio.sleep(5) + player.poll_task = False + if not player.poll_task and player.state == PlayerState.Playing: + self.mass.event_loop.create_task(poll_task()) + asyncio.run_coroutine_threadsafe(player.update(), self.mass.event_loop) async def __handle_group_members_update(self, mz, added_player=None, removed_player=None): ''' callback when cast group members update ''' @@ -286,6 +292,7 @@ class ChromecastProvider(PlayerProvider): listenerMedia = StatusMediaListener(chromecast, self.__handle_player_state, self.mass.event_loop) chromecast.media_controller.register_status_listener(listenerMedia) player = ChromecastPlayer(self.mass, player_id, self.prov_id) + player.poll_task = False if chromecast.cast_type == 'group': player.is_group = True mz = MultizoneController(chromecast.uuid) diff --git a/music_assistant/playerproviders/lms.py b/music_assistant/playerproviders/lms.py index ad5b5e38..cc1b55fd 100644 --- a/music_assistant/playerproviders/lms.py +++ b/music_assistant/playerproviders/lms.py @@ -44,15 +44,12 @@ class LMSProvider(PlayerProvider): ''' support for Logitech Media Server ''' def __init__(self, mass, hostname, port): + super().__init__(mass) self.prov_id = 'lms' self.name = 'Logitech Media Server' - self.icon = '' - self.mass = mass - self._players = {} self._host = hostname self._port = port self.last_msg_received = 0 - self.supported_musicproviders = ['qobuz', 'file', 'spotify', 'http'] self.http_session = aiohttp.ClientSession(loop=mass.event_loop) # we use a combi of active polling and subscriptions because the cometd implementation of LMS is somewhat unreliable asyncio.ensure_future(self.__lms_events()) @@ -60,9 +57,6 @@ class LMSProvider(PlayerProvider): ### Provider specific implementation ##### - async def player_config_entries(self): - ''' get the player config entries for this provider (list with key/value pairs)''' - return [] async def player_command(self, player_id, cmd:str, cmd_args=None): ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' diff --git a/music_assistant/playerproviders/pylms.py b/music_assistant/playerproviders/pylms.py index 0a83f533..78ad61a7 100644 --- a/music_assistant/playerproviders/pylms.py +++ b/music_assistant/playerproviders/pylms.py @@ -35,9 +35,9 @@ class PyLMSServer(PlayerProvider): ''' Python implementation of SlimProto server ''' def __init__(self, mass): + super().__init__(mass) self.prov_id = 'pylms' self.name = 'Logitech Media Server Emulation' - self.mass = mass self._lmsplayers = {} self.buffer = b'' self.last_msg_received = 0 diff --git a/music_assistant/web.py b/music_assistant/web.py index 739e894d..83bac063 100755 --- a/music_assistant/web.py +++ b/music_assistant/web.py @@ -17,14 +17,30 @@ from .utils import run_periodic, LOGGER, run_async_background_task, get_ip #json_serializer = partial(json.dumps, default=lambda x: x.__dict__) def json_serializer(obj): - # if isinstance(obj, list): - # lst = [] - # for item in obj: - # json_obj = json.dumps(item, skipkeys=True, default=lambda x: x.__dict__) - # lst.append(json_obj) - # return '[' + ','.join(lst) + ']' - return json.dumps(obj, skipkeys=True, default=lambda x: x.__dict__) + def get_val(val): + if isinstance(val, (int, str, bool, float)): + return val + elif isinstance(val, list): + new_list = [] + for item in val: + new_list.append( get_val(item)) + return new_list + elif hasattr(val, 'to_dict'): + return get_val(val.to_dict()) + elif isinstance(val, dict): + new_dict = {} + for key, value in val.items(): + new_dict[key] = get_val(value) + return new_dict + elif hasattr(val, '__dict__'): + new_dict = {} + for key, value in val.__dict__.items(): + new_dict[key] = get_val(value) + return new_dict + + obj = get_val(obj) + return json.dumps(obj, skipkeys=True) def setup(mass): ''' setup the module and read/apply config''' @@ -70,14 +86,15 @@ class Web(): self._ssl_cert = ssl_cert self._ssl_key = ssl_key self._cert_fqdn_host = cert_fqdn_host - self.http_session = aiohttp.ClientSession() - mass.event_loop.create_task(self.setup_web()) + self.mass.event_loop.create_task(self.setup()) def stop(self): asyncio.create_task(self.runner.cleanup()) asyncio.create_task(self.http_session.close()) - async def setup_web(self): + async def setup(self): + ''' perform async setup ''' + self.http_session = aiohttp.ClientSession() app = web.Application() app.add_routes([web.get('/jsonrpc.js', self.json_rpc)]) app.add_routes([web.post('/jsonrpc.js', self.json_rpc)]) @@ -89,6 +106,7 @@ class Web(): app.add_routes([web.get('/api/config', self.get_config)]) app.add_routes([web.post('/api/config', self.save_config)]) app.add_routes([web.get('/api/players', self.players)]) + app.add_routes([web.get('/api/players/{player_id}', self.player)]) app.add_routes([web.get('/api/players/{player_id}/queue', self.player_queue)]) app.add_routes([web.get('/api/players/{player_id}/cmd/{cmd}', self.player_command)]) app.add_routes([web.get('/api/players/{player_id}/cmd/{cmd}/{cmd_args}', self.player_command)]) @@ -194,7 +212,15 @@ class Web(): async def players(self, request): ''' get all players ''' - return web.json_response(self.mass.player.players, dumps=json_serializer) + players = list(self.mass.player.players) + players.sort(key=lambda x: x.name, reverse=False) + return web.json_response(players, dumps=json_serializer) + + async def player(self, request): + ''' get single player ''' + player_id = request.match_info.get('player_id') + player = await self.mass.player.get_player(player_id) + return web.json_response(player, dumps=json_serializer) async def player_command(self, request): ''' issue player command''' @@ -237,7 +263,7 @@ class Web(): # queue_items = [item.__dict__ for item in queue_items] # print(queue_items) # result = queue_items[offset:limit] - return web.json_response(player.queue.items, dumps=json_serializer) + return web.json_response(player.queue.items[offset:limit], dumps=json_serializer) async def index(self, request): return web.FileResponse("./web/index.html") @@ -253,14 +279,16 @@ class Web(): async def send_event(msg, msg_details): ws_msg = {"message": msg, "message_details": msg_details } await ws.send_json(ws_msg, dumps=json_serializer) - cb_id = self.mass.add_event_listener(send_event) + cb_id = await self.mass.add_event_listener(send_event) # process incoming messages async for msg in ws: if msg.type != aiohttp.WSMsgType.TEXT: continue # for now we only use WS for (simple) player commands if msg.data == 'players': - ws_msg = {'message': 'players', 'message_details': self.mass.player.players} + players = list(self.mass.player.players) + players.sort(key=lambda x: x.name, reverse=False) + ws_msg = {'message': 'players', 'message_details': players} await ws.send_json(ws_msg, dumps=json_serializer) elif msg.data.startswith('players') and '/cmd/' in msg.data: # players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args} @@ -277,7 +305,7 @@ class Web(): except Exception as exc: LOGGER.exception(exc) finally: - self.mass.remove_event_listener(cb_id) + await self.mass.remove_event_listener(cb_id) LOGGER.debug('websocket connection closed') return ws @@ -303,7 +331,7 @@ class Web(): self.mass.config[key] = new_config[key] if config_changed: self.mass.save_config() - self.mass.signal_event('config_changed') + await self.mass.signal_event('config_changed') return web.Response(text='success') async def json_rpc(self, request): -- 2.34.1