From aaec0cb542ee38cfbdf404326148ee9397fcc7a8 Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Fri, 25 Oct 2019 13:39:11 +0200 Subject: [PATCH] some code cleanup --- music_assistant/__init__.py | 28 +++--- music_assistant/constants.py | 2 + music_assistant/homeassistant.py | 17 ++-- music_assistant/http_streamer.py | 49 ++++++----- music_assistant/models/musicprovider.py | 84 +++--------------- music_assistant/models/player.py | 28 +++--- music_assistant/models/player_queue.py | 10 +-- music_assistant/music_manager.py | 22 +++-- music_assistant/musicproviders/qobuz.py | 6 +- music_assistant/musicproviders/spotify.py | 85 ++++++++++--------- music_assistant/playerproviders/chromecast.py | 12 +-- music_assistant/playerproviders/sonos.py | 23 ++--- music_assistant/playerproviders/squeezebox.py | 14 +-- music_assistant/playerproviders/webplayer.py | 2 +- music_assistant/utils.py | 10 +-- music_assistant/web.py | 32 +++++-- 16 files changed, 193 insertions(+), 231 deletions(-) diff --git a/music_assistant/__init__.py b/music_assistant/__init__.py index f1bf784c..d42c2fe7 100644 --- a/music_assistant/__init__.py +++ b/music_assistant/__init__.py @@ -83,20 +83,16 @@ class MusicAssistant(): ''' remove callback from our event listeners ''' self.event_listeners.pop(cb_id, None) - def create_task(self, corofcn, wait_for_result=False, ignore_exception=None): - ''' helper to create a new task on the main event loop ''' + def run_task(self, corofcn, wait_for_result=False, ignore_exception=None): + ''' helper to run a task on the main event loop from another thread ''' if threading.current_thread() is threading.main_thread(): - if wait_for_result: - raise Exception("can not wait for result in main event loop!") - return self.event_loop.create_task(corofcn) - else: - # threadsafe - future = asyncio.run_coroutine_threadsafe(corofcn, self.event_loop) - if wait_for_result: - try: - return future.result() - except Exception as exc: - if ignore_exception and isinstance(exc, ignore_exception): - return None - raise exc - return future + raise Exception("Can not be called from main event loop!") + future = asyncio.run_coroutine_threadsafe(corofcn, self.event_loop) + if wait_for_result: + try: + return future.result() + except Exception as exc: + if ignore_exception and isinstance(exc, ignore_exception): + return None + raise exc + return future diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 9c438c89..4edf53a7 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -25,3 +25,5 @@ EVENT_CONFIG_CHANGED = "config changed" EVENT_PLAYBACK_STARTED = "playback started" EVENT_PLAYBACK_STOPPED = "playback stopped" EVENT_HASS_ENTITY_CHANGED = "hass entity changed" +EVENT_MUSIC_SYNC_STARTED = "music sync started" +EVENT_MUSIC_SYNC_COMPLETED = "music sync completed" diff --git a/music_assistant/homeassistant.py b/music_assistant/homeassistant.py index 6c6cdc82..6508639c 100644 --- a/music_assistant/homeassistant.py +++ b/music_assistant/homeassistant.py @@ -14,7 +14,7 @@ from aiocometd import Client, ConnectionType, Extension import copy import slugify as slug import json -from .utils import run_periodic, LOGGER, parse_track_title, try_parse_int +from .utils import run_periodic, LOGGER, IS_HASSIO, parse_track_title, try_parse_int from .models.media_types import Track from .constants import CONF_ENABLED, CONF_URL, CONF_TOKEN, EVENT_PLAYER_CHANGED, EVENT_PLAYER_ADDED, EVENT_HASS_ENTITY_CHANGED from .cache import use_cache @@ -23,13 +23,11 @@ CONF_KEY = 'homeassistant' CONF_PUBLISH_PLAYERS = "publish_players" ### auto detect hassio for auto config #### -if os.path.isfile('/data/options.json'): - IS_HASSIO = True +if IS_HASSIO: CONFIG_ENTRIES = [ (CONF_ENABLED, False, CONF_ENABLED), (CONF_PUBLISH_PLAYERS, True, 'hass_publish')] else: - IS_HASSIO = False CONFIG_ENTRIES = [ (CONF_ENABLED, False, CONF_ENABLED), (CONF_URL, 'localhost', 'hass_url'), @@ -81,10 +79,10 @@ class HomeAssistant(): return self.http_session = aiohttp.ClientSession( loop=self.mass.event_loop, connector=aiohttp.TCPConnector()) - self.mass.create_task(self.__hass_websocket()) + self.mass.event_loop.create_task(self.__hass_websocket()) await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_CHANGED) await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_ADDED) - self.mass.create_task(self.__get_sources()) + self.mass.event_loop.create_task(self.__get_sources()) async def get_state_async(self, entity_id, attribute='state'): ''' get state of a hass entity (async)''' @@ -105,7 +103,7 @@ class HomeAssistant(): else: return state_obj else: - self.mass.create_task(self.__request_state(entity_id)) + self.mass.event_loop.create_task(self.__request_state(entity_id)) return None async def __request_state(self, entity_id): @@ -113,8 +111,7 @@ class HomeAssistant(): state_obj = await self.__get_data('states/%s' % entity_id) if 'state' in state_obj: self._tracked_entities[entity_id] = state_obj - self.mass.create_task( - self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, state_obj)) + await self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, state_obj) async def mass_event(self, msg, msg_details): ''' received event from mass ''' @@ -126,7 +123,7 @@ class HomeAssistant(): if event_type == 'state_changed': if event_data['entity_id'] in self._tracked_entities: self._tracked_entities[event_data['entity_id']] = event_data['new_state'] - self.mass.create_task( + self.mass.event_loop.create_task( self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, event_data)) elif event_type == 'call_service' and event_data['domain'] == 'media_player': await self.__handle_player_command(event_data['service'], event_data['service_data']) diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 412bc6ec..ca19b193 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -14,6 +14,7 @@ import io import aiohttp import subprocess import gc +import shlex from .constants import EVENT_STREAM_STARTED, EVENT_STREAM_ENDED from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size @@ -31,9 +32,7 @@ class HTTPStreamer(): async def setup(self): ''' async initialize of module ''' - pass - # self.mass.create_task( - # asyncio.start_server(self.sockets_streamer, '0.0.0.0', 8093)) + pass # we have nothing to initialize async def stream(self, http_request): ''' @@ -78,7 +77,7 @@ class HTTPStreamer(): # we must consume the data to prevent hanging subprocess instances continue # put chunk in buffer - self.mass.create_task( + self.mass.run_task( buffer.write(audio_chunk), wait_for_result=True, ignore_exception=(BrokenPipeError,ConnectionResetError)) # all chunks received: streaming finished @@ -86,7 +85,7 @@ class HTTPStreamer(): LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name)) else: # indicate EOF if no more data - self.mass.create_task( + self.mass.run_task( buffer.write_eof(), wait_for_result=True, ignore_exception=(BrokenPipeError,ConnectionResetError)) LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name)) @@ -105,7 +104,8 @@ class HTTPStreamer(): pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate args = 'sox -t %s - -t flac -C 0 -' % pcm_args # start sox process - sox_proc = subprocess.Popen(args, shell=True, + args = shlex.split(args) + sox_proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE) def fill_buffer(): @@ -115,12 +115,12 @@ class HTTPStreamer(): if not chunk: break if chunk and not cancelled.is_set(): - self.mass.create_task(buffer.write(chunk), + self.mass.run_task(buffer.write(chunk), wait_for_result=True, ignore_exception=(BrokenPipeError,ConnectionResetError)) del chunk # indicate EOF if no more data if not cancelled.is_set(): - self.mass.create_task(buffer.write_eof(), + self.mass.run_task(buffer.write_eof(), wait_for_result=True, ignore_exception=(BrokenPipeError,ConnectionResetError)) # start fill buffer task in background fill_buffer_thread = threading.Thread(target=fill_buffer) @@ -270,7 +270,7 @@ class HTTPStreamer(): # sort by quality and check track availability for prov_media in sorted(queue_item.provider_ids, key=operator.itemgetter('quality'), reverse=True): - streamdetails = self.mass.create_task( + streamdetails = self.mass.run_task( self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']), wait_for_result=True) if streamdetails: @@ -298,16 +298,22 @@ class HTTPStreamer(): if streamdetails["content_type"] == 'aac': # support for AAC created with ffmpeg in between args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_options) - elif streamdetails['type'] == 'url': + process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE) + elif streamdetails['type'] in ['url', 'file']: args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, sox_options) + args = shlex.split(args) + process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE) elif streamdetails['type'] == 'executable': args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, sox_options) - # start sox process - process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE) + process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE) + else: + LOGGER.warning(f"no streaming options for {queue_item.name}") + yield (True, b'') + return # fire event that streaming has started for this track - self.mass.create_task( + self.mass.run_task( self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails)) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly @@ -328,7 +334,7 @@ class HTTPStreamer(): bytes_sent += len(chunk) yield (False, chunk) # fire event that streaming has ended - self.mass.create_task(self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)) + self.mass.run_task(self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)) # send task to background to analyse the audio if queue_item.media_type == MediaType.Track: self.mass.event_loop.run_in_executor(None, self.__analyze_audio, streamdetails) @@ -337,7 +343,7 @@ class HTTPStreamer(): ''' get player specific sox effect options ''' sox_options = [] # volume normalisation - gain_correct = self.mass.create_task( + gain_correct = self.mass.run_task( self.mass.players.get_gain_correct( player.player_id, streamdetails["item_id"], streamdetails["provider"]), wait_for_result=True) @@ -364,7 +370,7 @@ class HTTPStreamer(): if item_key in self.analyze_jobs: return # prevent multiple analyze jobs for same track self.analyze_jobs[item_key] = True - track_loudness = self.mass.create_task(self.mass.db.get_track_loudness( + track_loudness = self.mass.run_task(self.mass.db.get_track_loudness( streamdetails["item_id"], streamdetails["provider"]), wait_for_result=True) if track_loudness == None: # only when needed we do the analyze stuff @@ -380,7 +386,7 @@ class HTTPStreamer(): meter = pyloudnorm.Meter(rate) # create BS.1770 meter loudness = meter.integrated_loudness(data) # measure loudness del data - self.mass.create_task( + self.mass.run_task( self.mass.db.set_track_loudness(streamdetails["item_id"], streamdetails["provider"], loudness)) del audio_data LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness)) @@ -391,18 +397,21 @@ class HTTPStreamer(): # create fade-in part fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length) - process = subprocess.Popen(args, shell=True, stdin=subprocess.PIPE) + args = shlex.split(args) + process = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE) process.communicate(fade_in_part) # create fade-out part fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length) - process = subprocess.Popen(args, shell=True, + args = shlex.split(args) + process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE) process.communicate(fade_out_part) # create crossfade using sox and some temp files # TODO: figure out how to make this less complex and without the tempfiles args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args) - process = subprocess.Popen(args, shell=True, + args = shlex.split(args) + process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE) crossfade_part, stderr = process.communicate() fadeinfile.close() diff --git a/music_assistant/models/musicprovider.py b/music_assistant/models/musicprovider.py index 0ed1376b..67192c74 100755 --- a/music_assistant/models/musicprovider.py +++ b/music_assistant/models/musicprovider.py @@ -278,27 +278,32 @@ class MusicProvider(): async def search(self, searchstring, media_types=List[MediaType], limit=5): ''' perform search on the provider ''' - raise NotImplementedError + return { + "artists": [], + "albums": [], + "tracks": [], + "playlists": [] + } async def get_library_artists(self) -> List[Artist]: ''' retrieve library artists from the provider ''' - raise NotImplementedError + return [] async def get_library_albums(self) -> List[Album]: ''' retrieve library albums from the provider ''' - raise NotImplementedError + return [] async def get_library_tracks(self) -> List[Track]: ''' retrieve library tracks from the provider ''' - raise NotImplementedError + return [] async def get_playlists(self) -> List[Playlist]: ''' retrieve library/subscribed playlists from the provider ''' - raise NotImplementedError + return [] async def get_radios(self) -> List[Radio]: ''' retrieve library/subscribed radio stations from the provider ''' - raise NotImplementedError + return [] async def get_artist(self, prov_item_id) -> Artist: ''' get full artist details by id ''' @@ -356,68 +361,7 @@ class MusicProvider(): ''' return the content type for the given track when it will be streamed''' raise NotImplementedError - async def get_stream(self, track_id): - ''' get audio stream for a track ''' - raise NotImplementedError - - -class PlayerProvider(): - ''' - Model for a Playerprovider - Common methods usable for every provider - Provider specific __get methods shoud be overriden in the provider specific implementation - ''' - name = 'My great Musicplayer provider' # display name - prov_id = 'my_provider' # used as id - icon = '' - - def __init__(self, mass): - self.mass = mass - - ### Common methods and properties #### - - async def players(self): - ''' return all players for this provider ''' - return await self.mass.provider_players(self.prov_id) - - async def get_player(self, player_id): - ''' return player by id ''' - return await self.mass.get_player(player_id) - - async def add_player(self, player_id, name='', is_group=False): - ''' register a new player ''' - return await self.mass.players.add_player(player_id, - self.prov_id, name=name, is_group=is_group) - - async def remove_player(self, player_id): - ''' remove a player ''' - return await self.mass.players.remove_player(player_id) - - ### Provider specific implementation ##### - - async def player_config_entries(self): - ''' get the player config entries for this provider (list with key/value pairs)''' - return [ - (CONF_ENABLED, True, CONF_ENABLED) - ] - - async def play_media(self, player_id, media_items:List[Track], queue_opt='play'): - ''' - play media on a player - params: - - player_id: id of the player - - media_items: List of Tracks to play, each Track will contain uri attribute (e.g. spotify:track:1234 or http://pathtostream) - - queue_opt: - replace: replace whatever is currently playing with this media - next: the given media will be played after the currently playing track - add: add to the end of the queue - play: keep existing queue but play the given item(s) now first - ''' - raise NotImplementedError - - async def player_command(self, player_id, cmd:str, cmd_args=None): - ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' + async def get_stream_details(self, track_id): + ''' get streamdetails for a track ''' raise NotImplementedError - - - + \ No newline at end of file diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 9ee63171..be284b97 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -147,7 +147,7 @@ class Player(): ''' [PROTECTED] set (real) name of this player ''' if name != self._name: self._name = name - self.mass.create_task(self.update()) + self.mass.event_loop.create_task(self.update()) @property def is_group(self): @@ -177,25 +177,25 @@ class Player(): ''' [PROTECTED] set group_childs property of this player ''' if group_childs != self._group_childs: self._group_childs = group_childs - self.mass.create_task(self.update()) + self.mass.event_loop.create_task(self.update()) for child_player_id in group_childs: - self.mass.create_task( + self.mass.event_loop.create_task( self.mass.players.trigger_update(child_player_id)) def add_group_child(self, child_player_id): ''' add player as child to this group player ''' if not child_player_id in self._group_childs: self._group_childs.append(child_player_id) - self.mass.create_task(self.update()) - self.mass.create_task( + self.mass.event_loop.create_task(self.update()) + self.mass.event_loop.create_task( self.mass.players.trigger_update(child_player_id)) def remove_group_child(self, child_player_id): ''' remove player as child from this group player ''' if child_player_id in self._group_childs: self._group_childs.remove(child_player_id) - self.mass.create_task(self.update()) - self.mass.create_task( + self.mass.event_loop.create_task(self.update()) + self.mass.event_loop.create_task( self.mass.players.trigger_update(child_player_id)) @property @@ -215,7 +215,7 @@ class Player(): ''' [PROTECTED] set state property of this player ''' if state != self._state: self._state = state - self.mass.create_task(self.update(update_queue=True)) + self.mass.event_loop.create_task(self.update(update_queue=True)) @property def powered(self): @@ -242,7 +242,7 @@ class Player(): ''' [PROTECTED] set (real) power state for this player ''' if powered != self._powered: self._powered = powered - self.mass.create_task(self.update()) + self.mass.event_loop.create_task(self.update()) @property def cur_time(self): @@ -262,7 +262,7 @@ class Player(): if cur_time != self._cur_time: self._cur_time = cur_time self._media_position_updated_at = time.time() - self.mass.create_task(self.update(update_queue=True)) + self.mass.event_loop.create_task(self.update(update_queue=True)) @property def media_position_updated_at(self): @@ -284,7 +284,7 @@ class Player(): ''' [PROTECTED] set cur_uri (uri loaded in player) property of this player ''' if cur_uri != self._cur_uri: self._cur_uri = cur_uri - self.mass.create_task(self.update(update_queue=True)) + self.mass.event_loop.create_task(self.update(update_queue=True)) @property def volume_level(self): @@ -316,10 +316,10 @@ class Player(): volume_level = try_parse_int(volume_level) if volume_level != self._volume_level: self._volume_level = volume_level - self.mass.create_task(self.update()) + self.mass.event_loop.create_task(self.update()) # trigger update on group player for group_parent_id in self.group_parents: - self.mass.create_task( + self.mass.event_loop.create_task( self.mass.players.trigger_update(group_parent_id)) @property @@ -333,7 +333,7 @@ class Player(): is_muted = try_parse_bool(is_muted) if is_muted != self._muted: self._muted = is_muted - self.mass.create_task(self.update()) + self.mass.event_loop.create_task(self.update()) @property def enabled(self): diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index ed90f345..a04f2612 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -151,11 +151,11 @@ class PlayerQueue(): # shuffle requested self._shuffle_enabled = True await self.load(self._items) - self.mass.create_task(self._player.update()) + self.mass.event_loop.create_task(self._player.update()) elif self._shuffle_enabled and not enable_shuffle: self._shuffle_enabled = False # TODO: Unshuffle the list ? - self.mass.create_task(self._player.update()) + self.mass.event_loop.create_task(self._player.update()) async def next(self): ''' request next track in queue ''' @@ -301,11 +301,9 @@ class PlayerQueue(): # account for track changing state so trigger track change after 1 second if self._last_track and self._last_track.streamdetails: self._last_track.streamdetails["seconds_played"] = self._last_item_time - self.mass.create_task( - self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track.streamdetails)) + await self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track.streamdetails) if new_track and new_track.streamdetails: - self.mass.create_task( - self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track.streamdetails)) + await self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track.streamdetails) self._last_track = new_track self.mass.event_loop.run_in_executor(None, self.__save_to_file) if self._last_player_state != self._player.state: diff --git a/music_assistant/music_manager.py b/music_assistant/music_manager.py index 79a7df00..641eab9a 100755 --- a/music_assistant/music_manager.py +++ b/music_assistant/music_manager.py @@ -7,9 +7,9 @@ import toolz import operator import os -from .utils import run_periodic, LOGGER, try_supported, load_provider_modules +from .utils import run_periodic, LOGGER, load_provider_modules from .models.media_types import MediaType, Track, Artist, Album, Playlist, Radio -from .constants import CONF_KEY_MUSICPROVIDERS +from .constants import CONF_KEY_MUSICPROVIDERS, EVENT_MUSIC_SYNC_STARTED, EVENT_MUSIC_SYNC_COMPLETED class MusicManager(): @@ -27,7 +27,7 @@ class MusicManager(): for prov in self.providers.values(): await prov.setup() # schedule sync task - self.mass.create_task(self.sync_music_providers()) + self.mass.event_loop.create_task(self.sync_music_providers()) async def item(self, item_id, media_type:MediaType, provider='database', lazy=True): ''' get single music item by id and media type''' @@ -254,21 +254,25 @@ class MusicManager(): # actually add the tracks to the playlist on the provider await self.providers[playlist_prov['provider']].add_playlist_tracks(playlist_prov['item_id'], track_ids_to_add) # schedule sync - self.mass.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id'])) + self.mass.event_loop.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id'])) @run_periodic(3600) async def sync_music_providers(self): ''' periodic sync of all music providers ''' if self.sync_running: return + LOGGER.info("Music provider sync started") for prov_id in self.providers.keys(): self.sync_running = prov_id + await self.mass.signal_event(EVENT_MUSIC_SYNC_STARTED, prov_id) # sync library items for each provider (if supported) - await try_supported(self.sync_library_artists(prov_id)) - await try_supported(self.sync_library_albums(prov_id)) - await try_supported(self.sync_library_tracks(prov_id)) - await try_supported(self.sync_playlists(prov_id)) - await try_supported(self.sync_radios(prov_id)) + await self.sync_library_artists(prov_id) + await self.sync_library_albums(prov_id) + await self.sync_library_tracks(prov_id) + await self.sync_playlists(prov_id) + await self.sync_radios(prov_id) + LOGGER.info("Music provider sync completed") + await self.mass.signal_event(EVENT_MUSIC_SYNC_COMPLETED, None) self.sync_running = None async def sync_library_artists(self, prov_id): diff --git a/music_assistant/musicproviders/qobuz.py b/music_assistant/musicproviders/qobuz.py index b1c850da..9fae439f 100644 --- a/music_assistant/musicproviders/qobuz.py +++ b/music_assistant/musicproviders/qobuz.py @@ -332,8 +332,7 @@ class QobuzProvider(MusicProvider): ''' parse qobuz album object to generic layout ''' album = Album() if not album_obj.get('id') or not album_obj["streamable"] or not album_obj["displayable"]: - # some safety checks - LOGGER.warning("invalid/unavailable album found: %s" % album_obj.get('id')) + # do not return unavailable items return None album.item_id = album_obj['id'] album.provider = self.prov_id @@ -378,8 +377,7 @@ class QobuzProvider(MusicProvider): ''' parse qobuz track object to generic layout ''' track = Track() if not track_obj.get('id') or not track_obj["streamable"] or not track_obj["displayable"]: - # some safety checks - LOGGER.warning("invalid/unavailable track found: %s - %s" % (track_obj.get('id'), track_obj.get('name'))) + # do not return unavailable items return None track.item_id = track_obj['id'] track.provider = self.prov_id diff --git a/music_assistant/musicproviders/spotify.py b/music_assistant/musicproviders/spotify.py index 866f0526..0e2a2662 100644 --- a/music_assistant/musicproviders/spotify.py +++ b/music_assistant/musicproviders/spotify.py @@ -8,11 +8,10 @@ import sys import time import concurrent from asyncio_throttle import Throttler -import json import aiohttp from ..cache import use_cache -from ..utils import run_periodic, LOGGER, parse_track_title +from ..utils import run_periodic, LOGGER, parse_track_title, json from ..app_vars import get_app_var from ..models import MusicProvider, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist from ..constants import CONF_USERNAME, CONF_PASSWORD, CONF_ENABLED, CONF_TYPE_PASSWORD @@ -246,8 +245,10 @@ class SpotifyProvider(MusicProvider): async def get_stream_details(self, track_id): ''' return the content details for the given track when it will be streamed''' + # make sure there is a valid token in cache + await self.get_token() spotty = self.get_spotty_binary() - spotty_exec = "%s -n temp -u %s -p %s --pass-through --single-track %s" %(spotty, self._username, self._password, track_id) + spotty_exec = '%s -n temp -c "%s" --pass-through --single-track %s' %(spotty, self.mass.datapath, track_id) return { "type": "executable", "path": spotty_exec, @@ -328,7 +329,7 @@ class SpotifyProvider(MusicProvider): if 'track' in track_obj: track_obj = track_obj['track'] if track_obj['is_local'] or not track_obj['id'] or not track_obj['is_playable']: - LOGGER.warning("invalid/unavailable track found: %s - %s" % (track_obj.get('id'), track_obj.get('name'))) + # do not return unavailable items return None track = Track() track.item_id = track_obj['id'] @@ -388,42 +389,8 @@ class SpotifyProvider(MusicProvider): tokeninfo = {} if not self._username or not self._password: return tokeninfo - # try with spotipy-token module first, fallback to spotty - try: - import spotify_token as st - data = st.start_session(self._username, self._password) - if data and len(data) == 2: - tokeninfo = {"accessToken": data[0], "expiresIn": data[1] - int(time.time()), "expiresAt":data[1] } - except Exception as exc: - LOGGER.debug(exc) - if not tokeninfo: - # fallback to spotty approach - import subprocess - scopes = [ - "user-read-playback-state", - "user-read-currently-playing", - "user-modify-playback-state", - "playlist-read-private", - "playlist-read-collaborative", - "playlist-modify-public", - "playlist-modify-private", - "user-follow-modify", - "user-follow-read", - "user-library-read", - "user-library-modify", - "user-read-private", - "user-read-email", - "user-read-birthdate", - "user-top-read"] - scope = ",".join(scopes) - args = [self.get_spotty_binary(), "-t", "--client-id", get_app_var(2), "--scope", scope, "-n", "temp-spotty", "-u", self._username, "-p", self._password, "--disable-discovery"] - spotty = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = spotty.communicate() - result = json.loads(stdout) - # transform token info to spotipy compatible format - if result and "accessToken" in result: - tokeninfo = result - tokeninfo['expiresAt'] = tokeninfo['expiresIn'] + int(time.time()) + # retrieve token with spotty + tokeninfo = await self.mass.event_loop.run_in_executor(None, self.__get_token) if tokeninfo: self.__auth_token = tokeninfo self.sp_user = await self.__get_data("me") @@ -433,6 +400,44 @@ class SpotifyProvider(MusicProvider): raise Exception("Can't get Spotify token for user %s" % self._username) return tokeninfo + def __get_token(self): + ''' get spotify auth token with spotty bin ''' + # get token with spotty + scopes = [ + "user-read-playback-state", + "user-read-currently-playing", + "user-modify-playback-state", + "playlist-read-private", + "playlist-read-collaborative", + "playlist-modify-public", + "playlist-modify-private", + "user-follow-modify", + "user-follow-read", + "user-library-read", + "user-library-modify", + "user-read-private", + "user-read-email", + "user-read-birthdate", + "user-top-read"] + scope = ",".join(scopes) + args = [self.get_spotty_binary(), "-t", + "--client-id", get_app_var(2), + "--scope", scope, + "-n", "temp-spotty", + "-u", self._username, + "-p", self._password, + "-c", self.mass.datapath, + "--disable-discovery"] + import subprocess + spotty = subprocess.Popen(args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) + stdout, stderr = spotty.communicate() + result = json.loads(stdout) + # transform token info to spotipy compatible format + if result and "accessToken" in result: + tokeninfo = result + tokeninfo['expiresAt'] = tokeninfo['expiresIn'] + int(time.time()) + return tokeninfo + async def __get_all_items(self, endpoint, params={}, limit=0, offset=0, cache_checksum=None): ''' get all items from a paged list ''' if not cache_checksum: diff --git a/music_assistant/playerproviders/chromecast.py b/music_assistant/playerproviders/chromecast.py index e2a01da6..eb5ad69e 100644 --- a/music_assistant/playerproviders/chromecast.py +++ b/music_assistant/playerproviders/chromecast.py @@ -213,7 +213,7 @@ class ChromecastPlayer(Player): self.cur_uri = mediastatus.content_id self.cur_time = mediastatus.adjusted_current_time if self._state == PlayerState.Playing and self.__cc_report_progress_task == None: - self.__cc_report_progress_task = self.mass.create_task(self.__report_progress()) + self.__cc_report_progress_task = self.mass.event_loop.create_task(self.__report_progress()) class ChromecastProvider(PlayerProvider): ''' support for ChromeCast Audio ''' @@ -228,7 +228,7 @@ class ChromecastProvider(PlayerProvider): async def setup(self): ''' perform async setup ''' - self.mass.create_task( + self.mass.event_loop.create_task( self.__periodic_chromecast_discovery()) async def __handle_group_members_update(self, mz, added_player=None, removed_player=None): @@ -259,7 +259,7 @@ class ChromecastProvider(PlayerProvider): if not player.cc.socket_client or not player.cc.socket_client.is_connected: # cleanup cast object del player.cc - self.mass.create_task(self.remove_player(player.player_id)) + self.mass.run_task(self.remove_player(player.player_id)) # search for available chromecasts from pychromecast.discovery import start_discovery, stop_discovery def discovered_callback(name): @@ -302,7 +302,7 @@ class ChromecastProvider(PlayerProvider): chromecast.register_status_listener(status_listener) chromecast.media_controller.register_status_listener(status_listener) player.cc.wait() - self.mass.create_task(self.add_player(player)) + self.mass.run_task(self.add_player(player)) if player.mz: player.mz.update_members() @@ -319,11 +319,11 @@ class StatusListener: self.player_id = player_id def new_cast_status(self, status): ''' chromecast status changed (like volume etc.)''' - self.mass.create_task( + self.mass.run_task( self.__handle_callback(caststatus=status)) def new_media_status(self, status): ''' mediacontroller has new state ''' - self.mass.create_task( + self.mass.run_task( self.__handle_callback(mediastatus=status)) def new_connection_status(self, status): ''' will be called when the connection changes ''' diff --git a/music_assistant/playerproviders/sonos.py b/music_assistant/playerproviders/sonos.py index ba658f70..b77f32be 100644 --- a/music_assistant/playerproviders/sonos.py +++ b/music_assistant/playerproviders/sonos.py @@ -114,7 +114,7 @@ class SonosPlayer(Player): await asyncio.sleep(1) self.__sonos_report_progress_task = None - def _update_state(self, event=None): + async def update_state(self, event=None): ''' update state, triggerer by event ''' if event: variables = event.variables @@ -141,7 +141,7 @@ class SonosPlayer(Player): rel_time = self.__timespan_secs(position_info.get("RelTime")) self.cur_time = rel_time if self._state == PlayerState.Playing and self.__sonos_report_progress_task == None: - self.__sonos_report_progress_task = self.mass.create_task(self.__report_progress()) + self.__sonos_report_progress_task = self.mass.event_loop.create_task(self.__report_progress()) @staticmethod def __convert_state(sonos_state): @@ -173,7 +173,7 @@ class SonosProvider(PlayerProvider): async def setup(self): ''' perform async setup ''' - self.mass.create_task( + self.mass.event_loop.create_task( self.__periodic_discovery()) @run_periodic(1800) @@ -196,7 +196,7 @@ class SonosProvider(PlayerProvider): # remove any disconnected players... for player in self.players: if not player.is_group and not player.soco.uid in new_device_ids: - self.mass.create_task(self.remove_player(player.player_id)) + self.mass.run_task(self.remove_player(player.player_id)) # process new players for device in discovered_devices: if device.uid not in cur_player_ids and device.is_visible: @@ -219,13 +219,13 @@ class SonosProvider(PlayerProvider): player._media_position_updated_at = None # handle subscriptions to events def subscribe(service, action): - queue = _ProcessSonosEventQueue(action) + queue = _ProcessSonosEventQueue(self.mass, action) sub = service.subscribe(auto_renew=True, event_queue=queue) player._subscriptions.append(sub) - subscribe(soco_device.avTransport, player._update_state) - subscribe(soco_device.renderingControl, player._update_state) + subscribe(soco_device.avTransport, player.update_state) + subscribe(soco_device.renderingControl, player.update_state) subscribe(soco_device.zoneGroupTopology, self.__topology_changed) - self.mass.create_task(self.add_player(player)) + self.mass.run_task(self.add_player(player)) return player def __process_groups(self, sonos_groups): @@ -242,7 +242,7 @@ class SonosProvider(PlayerProvider): group_player.name = group.label group_player.group_childs = [item.uid for item in group.members] - def __topology_changed(self, event=None): + async def __topology_changed(self, event=None): ''' received topology changed event from one of the sonos players @@ -253,13 +253,14 @@ class SonosProvider(PlayerProvider): class _ProcessSonosEventQueue: """Queue like object for dispatching sonos events.""" - def __init__(self, handler): + def __init__(self, mass, handler): """Initialize Sonos event queue.""" self._handler = handler + self.mass = mass def put(self, item, block=True, timeout=None): """Process event.""" try: - self._handler(item) + self.mass.run_task(self._handler(item), wait_for_result=True) except Exception as ex: LOGGER.warning("Error calling %s: %s", self._handler, ex) \ No newline at end of file diff --git a/music_assistant/playerproviders/squeezebox.py b/music_assistant/playerproviders/squeezebox.py index e60d1f2c..cf012eb9 100644 --- a/music_assistant/playerproviders/squeezebox.py +++ b/music_assistant/playerproviders/squeezebox.py @@ -41,10 +41,10 @@ class PySqueezeProvider(PlayerProvider): async def setup(self): ''' async initialize of module ''' # start slimproto server - self.mass.create_task( + self.mass.event_loop.create_task( asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483)) # setup discovery - self.mass.create_task(self.start_discovery()) + self.mass.event_loop.create_task(self.start_discovery()) async def start_discovery(self): transport, protocol = await self.mass.event_loop.create_datagram_endpoint( @@ -84,7 +84,7 @@ class PySqueezeProvider(PlayerProvider): player_id = str(device_mac).lower() device_type = devices.get(dev_id, 'unknown device') player = PySqueezePlayer(self.mass, player_id, self.prov_id, device_type, writer) - self.mass.create_task(self.mass.players.add_player(player)) + self.mass.event_loop.create_task(self.mass.players.add_player(player)) elif player != None: player.process_msg(operation, packet) @@ -122,8 +122,8 @@ class PySqueezePlayer(Player): self.send_frame(b"setd", struct.pack("B", 4)) # TODO: remember last volume and power state - self.mass.create_task(self.volume_set(40)) - self.mass.create_task(self.power_off()) + self.mass.event_loop.create_task(self.volume_set(40)) + self.mass.event_loop.create_task(self.power_off()) self._heartbeat_task = asyncio.create_task(self.__send_heartbeat()) async def cmd_stop(self): @@ -243,7 +243,7 @@ class PySqueezePlayer(Player): ''' send command to Squeeze player''' packet = struct.pack('!H', len(data) + 4) + command + data self._writer.write(packet) - self.mass.create_task(self._writer.drain()) + self.mass.event_loop.create_task(self._writer.drain()) def send_version(self): self.send_frame(b'vers', b'7.8') @@ -318,7 +318,7 @@ class PySqueezePlayer(Player): LOGGER.debug("Decoder Ready for next track") next_item = self.queue.next_item if next_item: - self.mass.create_task( + self.mass.event_loop.create_task( self.__send_play(next_item.uri)) def stat_STMe(self, data): diff --git a/music_assistant/playerproviders/webplayer.py b/music_assistant/playerproviders/webplayer.py index a079b1d2..6af0fb7d 100644 --- a/music_assistant/playerproviders/webplayer.py +++ b/music_assistant/playerproviders/webplayer.py @@ -50,7 +50,7 @@ class WebPlayerProvider(PlayerProvider): ''' async initialize of module ''' await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_STATE) await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_REGISTER) - self.mass.create_task(self.check_players()) + self.mass.event_loop.create_task(self.check_players()) async def handle_mass_event(self, msg, msg_details): ''' received event for the webplayer component ''' diff --git a/music_assistant/utils.py b/music_assistant/utils.py index 90a2baee..18232ef5 100755 --- a/music_assistant/utils.py +++ b/music_assistant/utils.py @@ -15,6 +15,7 @@ LOGGER = logging.getLogger('music_assistant') from .constants import CONF_KEY_MUSICPROVIDERS, CONF_ENABLED +IS_HASSIO = os.path.isfile('/data/options.json') def run_periodic(period): def scheduler(fcn): @@ -25,15 +26,6 @@ def run_periodic(period): return wrapper return scheduler -async def try_supported(task): - ''' try to execute a task and pass NotImplementedError Exception ''' - ret = None - try: - ret = await task - except NotImplementedError: - pass - return ret - def filename_from_string(string): ''' create filename from unsafe string ''' keepcharacters = (' ','.','_') diff --git a/music_assistant/web.py b/music_assistant/web.py index 685b846f..5abe5c7f 100755 --- a/music_assistant/web.py +++ b/music_assistant/web.py @@ -10,9 +10,10 @@ import ssl import concurrent import threading from .models.media_types import MediaItem, MediaType, media_type_from_string -from .utils import run_periodic, LOGGER, run_async_background_task, get_ip, json_serializer +from .utils import run_periodic, LOGGER, IS_HASSIO, run_async_background_task, get_ip, json_serializer CONF_KEY = 'web' + CONFIG_ENTRIES = [ ('http_port', 8095, 'web_http_port'), ('https_port', 8096, 'web_https_port'), @@ -28,17 +29,18 @@ class Web(): self.mass = mass # load/create/update config config = self.mass.config.create_module_config(CONF_KEY, CONFIG_ENTRIES) + enable_ssl = config['ssl_certificate'] and config['ssl_key'] if config['ssl_certificate'] and not os.path.isfile( config['ssl_certificate']): - raise FileNotFoundError( - "SSL certificate file not found: %s" % config['ssl_certificate']) + enable_ssl = False + LOGGER.warning("SSL certificate file not found: %s" % config['ssl_certificate']) if config['ssl_key'] and not os.path.isfile(config['ssl_key']): - raise FileNotFoundError( - "SSL certificate key file not found: %s" % config['ssl_key']) - self.local_ip = get_ip() + enable_ssl = False + LOGGER.warning( "SSL certificate key file not found: %s" % config['ssl_key']) self.http_port = config['http_port'] self.https_port = config['https_port'] - self._enable_ssl = config['ssl_certificate'] and config['ssl_key'] + self._enable_ssl = enable_ssl + self.local_ip = get_ip() self.config = config async def setup(self): @@ -74,11 +76,25 @@ class Web(): await self.runner.setup() http_site = web.TCPSite(self.runner, '0.0.0.0', self.http_port) await http_site.start() + LOGGER.info("Started HTTP webserver on port %s" % self.http_port) if self._enable_ssl: ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain(self.config['ssl_certificate'], self.config['ssl_key']) - https_site = web.TCPSite(self.runner, '0.0.0.0', self.https_port, ssl_context=ssl_context) + https_site = web.TCPSite(self.runner, '0.0.0.0', self.config['https_port'], ssl_context=ssl_context) await https_site.start() + LOGGER.info("Started HTTPS webserver on port %s" % self.config['https_port']) + if IS_HASSIO: + # host additional http port for hassio ingress + headers = {"X-HASSIO-KEY": os.environ["HASSIO_TOKEN"]} + url = "http://hassio/addons/self/info" + async with aiohttp.ClientSession().get(url, headers=headers, verify_ssl=False) as response: + result = await response.json() + ingress_port = int(result["ingress_port"]) + ingress_site = web.TCPSite(self.runner, '0.0.0.0', ingress_port) + await ingress_site.start() + LOGGER.info("Started INGRESS webserver on port %s" % ingress_port) + + async def get_items(self, request): ''' get multiple library items''' -- 2.34.1