''' remove callback from our event listeners '''
self.event_listeners.pop(cb_id, None)
- def create_task(self, corofcn, wait_for_result=False, ignore_exception=None):
- ''' helper to create a new task on the main event loop '''
+ def run_task(self, corofcn, wait_for_result=False, ignore_exception=None):
+ ''' helper to run a task on the main event loop from another thread '''
if threading.current_thread() is threading.main_thread():
- if wait_for_result:
- raise Exception("can not wait for result in main event loop!")
- return self.event_loop.create_task(corofcn)
- else:
- # threadsafe
- future = asyncio.run_coroutine_threadsafe(corofcn, self.event_loop)
- if wait_for_result:
- try:
- return future.result()
- except Exception as exc:
- if ignore_exception and isinstance(exc, ignore_exception):
- return None
- raise exc
- return future
+ raise RuntimeError("Can not be called from main event loop!")
+ future = asyncio.run_coroutine_threadsafe(corofcn, self.event_loop)
+ if wait_for_result:
+ try:
+ return future.result()
+ except Exception as exc:
+ if ignore_exception and isinstance(exc, ignore_exception):
+ return None
+ raise exc
+ return future
EVENT_PLAYBACK_STARTED = "playback started"
EVENT_PLAYBACK_STOPPED = "playback stopped"
EVENT_HASS_ENTITY_CHANGED = "hass entity changed"
+EVENT_MUSIC_SYNC_STARTED = "music sync started"
+EVENT_MUSIC_SYNC_COMPLETED = "music sync completed"
import copy
import slugify as slug
import json
-from .utils import run_periodic, LOGGER, parse_track_title, try_parse_int
+from .utils import run_periodic, LOGGER, IS_HASSIO, parse_track_title, try_parse_int
from .models.media_types import Track
from .constants import CONF_ENABLED, CONF_URL, CONF_TOKEN, EVENT_PLAYER_CHANGED, EVENT_PLAYER_ADDED, EVENT_HASS_ENTITY_CHANGED
from .cache import use_cache
CONF_PUBLISH_PLAYERS = "publish_players"
### auto detect hassio for auto config ####
-if os.path.isfile('/data/options.json'):
- IS_HASSIO = True
+if IS_HASSIO:
CONFIG_ENTRIES = [
(CONF_ENABLED, False, CONF_ENABLED),
(CONF_PUBLISH_PLAYERS, True, 'hass_publish')]
else:
- IS_HASSIO = False
CONFIG_ENTRIES = [
(CONF_ENABLED, False, CONF_ENABLED),
(CONF_URL, 'localhost', 'hass_url'),
return
self.http_session = aiohttp.ClientSession(
loop=self.mass.event_loop, connector=aiohttp.TCPConnector())
- self.mass.create_task(self.__hass_websocket())
+ self.mass.event_loop.create_task(self.__hass_websocket())
await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_CHANGED)
await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_ADDED)
- self.mass.create_task(self.__get_sources())
+ self.mass.event_loop.create_task(self.__get_sources())
async def get_state_async(self, entity_id, attribute='state'):
''' get state of a hass entity (async)'''
else:
return state_obj
else:
- self.mass.create_task(self.__request_state(entity_id))
+ self.mass.event_loop.create_task(self.__request_state(entity_id))
return None
async def __request_state(self, entity_id):
state_obj = await self.__get_data('states/%s' % entity_id)
if 'state' in state_obj:
self._tracked_entities[entity_id] = state_obj
- self.mass.create_task(
- self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, state_obj))
+ await self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, state_obj)
async def mass_event(self, msg, msg_details):
''' received event from mass '''
if event_type == 'state_changed':
if event_data['entity_id'] in self._tracked_entities:
self._tracked_entities[event_data['entity_id']] = event_data['new_state']
- self.mass.create_task(
+ self.mass.event_loop.create_task(
self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, event_data))
elif event_type == 'call_service' and event_data['domain'] == 'media_player':
await self.__handle_player_command(event_data['service'], event_data['service_data'])
import aiohttp
import subprocess
import gc
+import shlex
from .constants import EVENT_STREAM_STARTED, EVENT_STREAM_ENDED
from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
async def setup(self):
''' async initialize of module '''
- pass
- # self.mass.create_task(
- # asyncio.start_server(self.sockets_streamer, '0.0.0.0', 8093))
+ pass # we have nothing to initialize
async def stream(self, http_request):
'''
# we must consume the data to prevent hanging subprocess instances
continue
# put chunk in buffer
- self.mass.create_task(
+ self.mass.run_task(
buffer.write(audio_chunk), wait_for_result=True,
ignore_exception=(BrokenPipeError,ConnectionResetError))
# all chunks received: streaming finished
LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
else:
# indicate EOF if no more data
- self.mass.create_task(
+ self.mass.run_task(
buffer.write_eof(), wait_for_result=True,
ignore_exception=(BrokenPipeError,ConnectionResetError))
LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
args = 'sox -t %s - -t flac -C 0 -' % pcm_args
# start sox process
- sox_proc = subprocess.Popen(args, shell=True,
+ args = shlex.split(args)
+ sox_proc = subprocess.Popen(args, shell=False,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
def fill_buffer():
if not chunk:
break
if chunk and not cancelled.is_set():
- self.mass.create_task(buffer.write(chunk),
+ self.mass.run_task(buffer.write(chunk),
wait_for_result=True, ignore_exception=(BrokenPipeError,ConnectionResetError))
del chunk
# indicate EOF if no more data
if not cancelled.is_set():
- self.mass.create_task(buffer.write_eof(),
+ self.mass.run_task(buffer.write_eof(),
wait_for_result=True, ignore_exception=(BrokenPipeError,ConnectionResetError))
# start fill buffer task in background
fill_buffer_thread = threading.Thread(target=fill_buffer)
# sort by quality and check track availability
for prov_media in sorted(queue_item.provider_ids,
key=operator.itemgetter('quality'), reverse=True):
- streamdetails = self.mass.create_task(
+ streamdetails = self.mass.run_task(
self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']),
wait_for_result=True)
if streamdetails:
if streamdetails["content_type"] == 'aac':
# support for AAC created with ffmpeg in between
args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_options)
- elif streamdetails['type'] == 'url':
+ process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
+ elif streamdetails['type'] in ['url', 'file']:
args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"],
streamdetails["path"], outputfmt, sox_options)
+ args = shlex.split(args)
+ process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE)
elif streamdetails['type'] == 'executable':
args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"],
streamdetails["content_type"], outputfmt, sox_options)
- # start sox process
- process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
+ process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
+ else:
+ LOGGER.warning(f"no streaming options for {queue_item.name}")
+ yield (True, b'')
+ return
# fire event that streaming has started for this track
- self.mass.create_task(
+ self.mass.run_task(
self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails))
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
bytes_sent += len(chunk)
yield (False, chunk)
# fire event that streaming has ended
- self.mass.create_task(self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails))
+ self.mass.run_task(self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails))
# send task to background to analyse the audio
if queue_item.media_type == MediaType.Track:
self.mass.event_loop.run_in_executor(None, self.__analyze_audio, streamdetails)
''' get player specific sox effect options '''
sox_options = []
# volume normalisation
- gain_correct = self.mass.create_task(
+ gain_correct = self.mass.run_task(
self.mass.players.get_gain_correct(
player.player_id, streamdetails["item_id"], streamdetails["provider"]),
wait_for_result=True)
if item_key in self.analyze_jobs:
return # prevent multiple analyze jobs for same track
self.analyze_jobs[item_key] = True
- track_loudness = self.mass.create_task(self.mass.db.get_track_loudness(
+ track_loudness = self.mass.run_task(self.mass.db.get_track_loudness(
streamdetails["item_id"], streamdetails["provider"]), wait_for_result=True)
if track_loudness == None:
# only when needed we do the analyze stuff
meter = pyloudnorm.Meter(rate) # create BS.1770 meter
loudness = meter.integrated_loudness(data) # measure loudness
del data
- self.mass.create_task(
+ self.mass.run_task(
self.mass.db.set_track_loudness(streamdetails["item_id"], streamdetails["provider"], loudness))
del audio_data
LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness))
# create fade-in part
fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length)
- process = subprocess.Popen(args, shell=True, stdin=subprocess.PIPE)
+ args = shlex.split(args)
+ process = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE)
process.communicate(fade_in_part)
# create fade-out part
fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length)
- process = subprocess.Popen(args, shell=True,
+ args = shlex.split(args)
+ process = subprocess.Popen(args, shell=False,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
process.communicate(fade_out_part)
# create crossfade using sox and some temp files
# TODO: figure out how to make this less complex and without the tempfiles
args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
- process = subprocess.Popen(args, shell=True,
+ args = shlex.split(args)
+ process = subprocess.Popen(args, shell=False,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
crossfade_part, stderr = process.communicate()
fadeinfile.close()
async def search(self, searchstring, media_types=List[MediaType], limit=5):
''' perform search on the provider '''
- raise NotImplementedError
+ return {
+ "artists": [],
+ "albums": [],
+ "tracks": [],
+ "playlists": []
+ }
async def get_library_artists(self) -> List[Artist]:
''' retrieve library artists from the provider '''
- raise NotImplementedError
+ return []
async def get_library_albums(self) -> List[Album]:
''' retrieve library albums from the provider '''
- raise NotImplementedError
+ return []
async def get_library_tracks(self) -> List[Track]:
''' retrieve library tracks from the provider '''
- raise NotImplementedError
+ return []
async def get_playlists(self) -> List[Playlist]:
''' retrieve library/subscribed playlists from the provider '''
- raise NotImplementedError
+ return []
async def get_radios(self) -> List[Radio]:
''' retrieve library/subscribed radio stations from the provider '''
- raise NotImplementedError
+ return []
async def get_artist(self, prov_item_id) -> Artist:
''' get full artist details by id '''
''' return the content type for the given track when it will be streamed'''
raise NotImplementedError
- async def get_stream(self, track_id):
- ''' get audio stream for a track '''
- raise NotImplementedError
-
-
-class PlayerProvider():
- '''
- Model for a Playerprovider
- Common methods usable for every provider
- Provider specific __get methods shoud be overriden in the provider specific implementation
- '''
- name = 'My great Musicplayer provider' # display name
- prov_id = 'my_provider' # used as id
- icon = ''
-
- def __init__(self, mass):
- self.mass = mass
-
- ### Common methods and properties ####
-
- async def players(self):
- ''' return all players for this provider '''
- return await self.mass.provider_players(self.prov_id)
-
- async def get_player(self, player_id):
- ''' return player by id '''
- return await self.mass.get_player(player_id)
-
- async def add_player(self, player_id, name='', is_group=False):
- ''' register a new player '''
- return await self.mass.players.add_player(player_id,
- self.prov_id, name=name, is_group=is_group)
-
- async def remove_player(self, player_id):
- ''' remove a player '''
- return await self.mass.players.remove_player(player_id)
-
- ### Provider specific implementation #####
-
- async def player_config_entries(self):
- ''' get the player config entries for this provider (list with key/value pairs)'''
- return [
- (CONF_ENABLED, True, CONF_ENABLED)
- ]
-
- async def play_media(self, player_id, media_items:List[Track], queue_opt='play'):
- '''
- play media on a player
- params:
- - player_id: id of the player
- - media_items: List of Tracks to play, each Track will contain uri attribute (e.g. spotify:track:1234 or http://pathtostream)
- - queue_opt:
- replace: replace whatever is currently playing with this media
- next: the given media will be played after the currently playing track
- add: add to the end of the queue
- play: keep existing queue but play the given item(s) now first
- '''
- raise NotImplementedError
-
- async def player_command(self, player_id, cmd:str, cmd_args=None):
- ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
+ async def get_stream_details(self, track_id):
+ ''' get streamdetails for a track '''
raise NotImplementedError
-
-
-
+
\ No newline at end of file
''' [PROTECTED] set (real) name of this player '''
if name != self._name:
self._name = name
- self.mass.create_task(self.update())
+ self.mass.event_loop.create_task(self.update())
@property
def is_group(self):
''' [PROTECTED] set group_childs property of this player '''
if group_childs != self._group_childs:
self._group_childs = group_childs
- self.mass.create_task(self.update())
+ self.mass.event_loop.create_task(self.update())
for child_player_id in group_childs:
- self.mass.create_task(
+ self.mass.event_loop.create_task(
self.mass.players.trigger_update(child_player_id))
def add_group_child(self, child_player_id):
''' add player as child to this group player '''
if not child_player_id in self._group_childs:
self._group_childs.append(child_player_id)
- self.mass.create_task(self.update())
- self.mass.create_task(
+ self.mass.event_loop.create_task(self.update())
+ self.mass.event_loop.create_task(
self.mass.players.trigger_update(child_player_id))
def remove_group_child(self, child_player_id):
''' remove player as child from this group player '''
if child_player_id in self._group_childs:
self._group_childs.remove(child_player_id)
- self.mass.create_task(self.update())
- self.mass.create_task(
+ self.mass.event_loop.create_task(self.update())
+ self.mass.event_loop.create_task(
self.mass.players.trigger_update(child_player_id))
@property
''' [PROTECTED] set state property of this player '''
if state != self._state:
self._state = state
- self.mass.create_task(self.update(update_queue=True))
+ self.mass.event_loop.create_task(self.update(update_queue=True))
@property
def powered(self):
''' [PROTECTED] set (real) power state for this player '''
if powered != self._powered:
self._powered = powered
- self.mass.create_task(self.update())
+ self.mass.event_loop.create_task(self.update())
@property
def cur_time(self):
if cur_time != self._cur_time:
self._cur_time = cur_time
self._media_position_updated_at = time.time()
- self.mass.create_task(self.update(update_queue=True))
+ self.mass.event_loop.create_task(self.update(update_queue=True))
@property
def media_position_updated_at(self):
''' [PROTECTED] set cur_uri (uri loaded in player) property of this player '''
if cur_uri != self._cur_uri:
self._cur_uri = cur_uri
- self.mass.create_task(self.update(update_queue=True))
+ self.mass.event_loop.create_task(self.update(update_queue=True))
@property
def volume_level(self):
volume_level = try_parse_int(volume_level)
if volume_level != self._volume_level:
self._volume_level = volume_level
- self.mass.create_task(self.update())
+ self.mass.event_loop.create_task(self.update())
# trigger update on group player
for group_parent_id in self.group_parents:
- self.mass.create_task(
+ self.mass.event_loop.create_task(
self.mass.players.trigger_update(group_parent_id))
@property
is_muted = try_parse_bool(is_muted)
if is_muted != self._muted:
self._muted = is_muted
- self.mass.create_task(self.update())
+ self.mass.event_loop.create_task(self.update())
@property
def enabled(self):
# shuffle requested
self._shuffle_enabled = True
await self.load(self._items)
- self.mass.create_task(self._player.update())
+ self.mass.event_loop.create_task(self._player.update())
elif self._shuffle_enabled and not enable_shuffle:
self._shuffle_enabled = False
# TODO: Unshuffle the list ?
- self.mass.create_task(self._player.update())
+ self.mass.event_loop.create_task(self._player.update())
async def next(self):
''' request next track in queue '''
# account for track changing state so trigger track change after 1 second
if self._last_track and self._last_track.streamdetails:
self._last_track.streamdetails["seconds_played"] = self._last_item_time
- self.mass.create_task(
- self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track.streamdetails))
+ await self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track.streamdetails)
if new_track and new_track.streamdetails:
- self.mass.create_task(
- self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track.streamdetails))
+ await self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track.streamdetails)
self._last_track = new_track
self.mass.event_loop.run_in_executor(None, self.__save_to_file)
if self._last_player_state != self._player.state:
import operator
import os
-from .utils import run_periodic, LOGGER, try_supported, load_provider_modules
+from .utils import run_periodic, LOGGER, load_provider_modules
from .models.media_types import MediaType, Track, Artist, Album, Playlist, Radio
-from .constants import CONF_KEY_MUSICPROVIDERS
+from .constants import CONF_KEY_MUSICPROVIDERS, EVENT_MUSIC_SYNC_STARTED, EVENT_MUSIC_SYNC_COMPLETED
class MusicManager():
for prov in self.providers.values():
await prov.setup()
# schedule sync task
- self.mass.create_task(self.sync_music_providers())
+ self.mass.event_loop.create_task(self.sync_music_providers())
async def item(self, item_id, media_type:MediaType, provider='database', lazy=True):
''' get single music item by id and media type'''
# actually add the tracks to the playlist on the provider
await self.providers[playlist_prov['provider']].add_playlist_tracks(playlist_prov['item_id'], track_ids_to_add)
# schedule sync
- self.mass.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id']))
+ self.mass.event_loop.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id']))
@run_periodic(3600)
async def sync_music_providers(self):
''' periodic sync of all music providers '''
if self.sync_running:
return
+ LOGGER.info("Music provider sync started")
for prov_id in self.providers.keys():
self.sync_running = prov_id
+ await self.mass.signal_event(EVENT_MUSIC_SYNC_STARTED, prov_id)
# sync library items for each provider (if supported)
- await try_supported(self.sync_library_artists(prov_id))
- await try_supported(self.sync_library_albums(prov_id))
- await try_supported(self.sync_library_tracks(prov_id))
- await try_supported(self.sync_playlists(prov_id))
- await try_supported(self.sync_radios(prov_id))
+ await self.sync_library_artists(prov_id)
+ await self.sync_library_albums(prov_id)
+ await self.sync_library_tracks(prov_id)
+ await self.sync_playlists(prov_id)
+ await self.sync_radios(prov_id)
+ LOGGER.info("Music provider sync completed")
+ await self.mass.signal_event(EVENT_MUSIC_SYNC_COMPLETED, None)
self.sync_running = None
async def sync_library_artists(self, prov_id):
''' parse qobuz album object to generic layout '''
album = Album()
if not album_obj.get('id') or not album_obj["streamable"] or not album_obj["displayable"]:
- # some safety checks
- LOGGER.warning("invalid/unavailable album found: %s" % album_obj.get('id'))
+ # do not return unavailable items
return None
album.item_id = album_obj['id']
album.provider = self.prov_id
''' parse qobuz track object to generic layout '''
track = Track()
if not track_obj.get('id') or not track_obj["streamable"] or not track_obj["displayable"]:
- # some safety checks
- LOGGER.warning("invalid/unavailable track found: %s - %s" % (track_obj.get('id'), track_obj.get('name')))
+ # do not return unavailable items
return None
track.item_id = track_obj['id']
track.provider = self.prov_id
import time
import concurrent
from asyncio_throttle import Throttler
-import json
import aiohttp
from ..cache import use_cache
-from ..utils import run_periodic, LOGGER, parse_track_title
+from ..utils import run_periodic, LOGGER, parse_track_title, json
from ..app_vars import get_app_var
from ..models import MusicProvider, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist
from ..constants import CONF_USERNAME, CONF_PASSWORD, CONF_ENABLED, CONF_TYPE_PASSWORD
async def get_stream_details(self, track_id):
''' return the content details for the given track when it will be streamed'''
+ # make sure there is a valid token in cache
+ await self.get_token()
spotty = self.get_spotty_binary()
- spotty_exec = "%s -n temp -u %s -p %s --pass-through --single-track %s" %(spotty, self._username, self._password, track_id)
+ spotty_exec = '%s -n temp -c "%s" --pass-through --single-track %s' %(spotty, self.mass.datapath, track_id)
return {
"type": "executable",
"path": spotty_exec,
if 'track' in track_obj:
track_obj = track_obj['track']
if track_obj['is_local'] or not track_obj['id'] or not track_obj['is_playable']:
- LOGGER.warning("invalid/unavailable track found: %s - %s" % (track_obj.get('id'), track_obj.get('name')))
+ # do not return unavailable items
return None
track = Track()
track.item_id = track_obj['id']
tokeninfo = {}
if not self._username or not self._password:
return tokeninfo
- # try with spotipy-token module first, fallback to spotty
- try:
- import spotify_token as st
- data = st.start_session(self._username, self._password)
- if data and len(data) == 2:
- tokeninfo = {"accessToken": data[0], "expiresIn": data[1] - int(time.time()), "expiresAt":data[1] }
- except Exception as exc:
- LOGGER.debug(exc)
- if not tokeninfo:
- # fallback to spotty approach
- import subprocess
- scopes = [
- "user-read-playback-state",
- "user-read-currently-playing",
- "user-modify-playback-state",
- "playlist-read-private",
- "playlist-read-collaborative",
- "playlist-modify-public",
- "playlist-modify-private",
- "user-follow-modify",
- "user-follow-read",
- "user-library-read",
- "user-library-modify",
- "user-read-private",
- "user-read-email",
- "user-read-birthdate",
- "user-top-read"]
- scope = ",".join(scopes)
- args = [self.get_spotty_binary(), "-t", "--client-id", get_app_var(2), "--scope", scope, "-n", "temp-spotty", "-u", self._username, "-p", self._password, "--disable-discovery"]
- spotty = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- stdout, stderr = spotty.communicate()
- result = json.loads(stdout)
- # transform token info to spotipy compatible format
- if result and "accessToken" in result:
- tokeninfo = result
- tokeninfo['expiresAt'] = tokeninfo['expiresIn'] + int(time.time())
+ # retrieve token with spotty
+ tokeninfo = await self.mass.event_loop.run_in_executor(None, self.__get_token)
if tokeninfo:
self.__auth_token = tokeninfo
self.sp_user = await self.__get_data("me")
raise Exception("Can't get Spotify token for user %s" % self._username)
return tokeninfo
+ def __get_token(self):
+ ''' get spotify auth token with spotty bin '''
+ # get token with spotty
+ scopes = [
+ "user-read-playback-state",
+ "user-read-currently-playing",
+ "user-modify-playback-state",
+ "playlist-read-private",
+ "playlist-read-collaborative",
+ "playlist-modify-public",
+ "playlist-modify-private",
+ "user-follow-modify",
+ "user-follow-read",
+ "user-library-read",
+ "user-library-modify",
+ "user-read-private",
+ "user-read-email",
+ "user-read-birthdate",
+ "user-top-read"]
+ scope = ",".join(scopes)
+ args = [self.get_spotty_binary(), "-t",
+ "--client-id", get_app_var(2),
+ "--scope", scope,
+ "-n", "temp-spotty",
+ "-u", self._username,
+ "-p", self._password,
+ "-c", self.mass.datapath,
+ "--disable-discovery"]
+ import subprocess
+ spotty = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ stdout, stderr = spotty.communicate()
+ result = json.loads(stdout)
+ # transform token info to spotipy compatible format
+ tokeninfo = {}
+ if result and "accessToken" in result:
+ tokeninfo = result
+ tokeninfo['expiresAt'] = tokeninfo['expiresIn'] + int(time.time())
+ return tokeninfo
+
async def __get_all_items(self, endpoint, params={}, limit=0, offset=0, cache_checksum=None):
''' get all items from a paged list '''
if not cache_checksum:
self.cur_uri = mediastatus.content_id
self.cur_time = mediastatus.adjusted_current_time
if self._state == PlayerState.Playing and self.__cc_report_progress_task == None:
- self.__cc_report_progress_task = self.mass.create_task(self.__report_progress())
+ self.__cc_report_progress_task = self.mass.event_loop.create_task(self.__report_progress())
class ChromecastProvider(PlayerProvider):
''' support for ChromeCast Audio '''
async def setup(self):
''' perform async setup '''
- self.mass.create_task(
+ self.mass.event_loop.create_task(
self.__periodic_chromecast_discovery())
async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):
if not player.cc.socket_client or not player.cc.socket_client.is_connected:
# cleanup cast object
del player.cc
- self.mass.create_task(self.remove_player(player.player_id))
+ self.mass.run_task(self.remove_player(player.player_id))
# search for available chromecasts
from pychromecast.discovery import start_discovery, stop_discovery
def discovered_callback(name):
chromecast.register_status_listener(status_listener)
chromecast.media_controller.register_status_listener(status_listener)
player.cc.wait()
- self.mass.create_task(self.add_player(player))
+ self.mass.run_task(self.add_player(player))
if player.mz:
player.mz.update_members()
self.player_id = player_id
def new_cast_status(self, status):
''' chromecast status changed (like volume etc.)'''
- self.mass.create_task(
+ self.mass.run_task(
self.__handle_callback(caststatus=status))
def new_media_status(self, status):
''' mediacontroller has new state '''
- self.mass.create_task(
+ self.mass.run_task(
self.__handle_callback(mediastatus=status))
def new_connection_status(self, status):
''' will be called when the connection changes '''
await asyncio.sleep(1)
self.__sonos_report_progress_task = None
- def _update_state(self, event=None):
+ async def update_state(self, event=None):
''' update state, triggerer by event '''
if event:
variables = event.variables
rel_time = self.__timespan_secs(position_info.get("RelTime"))
self.cur_time = rel_time
if self._state == PlayerState.Playing and self.__sonos_report_progress_task == None:
- self.__sonos_report_progress_task = self.mass.create_task(self.__report_progress())
+ self.__sonos_report_progress_task = self.mass.event_loop.create_task(self.__report_progress())
@staticmethod
def __convert_state(sonos_state):
async def setup(self):
''' perform async setup '''
- self.mass.create_task(
+ self.mass.event_loop.create_task(
self.__periodic_discovery())
@run_periodic(1800)
# remove any disconnected players...
for player in self.players:
if not player.is_group and not player.soco.uid in new_device_ids:
- self.mass.create_task(self.remove_player(player.player_id))
+ self.mass.run_task(self.remove_player(player.player_id))
# process new players
for device in discovered_devices:
if device.uid not in cur_player_ids and device.is_visible:
player._media_position_updated_at = None
# handle subscriptions to events
def subscribe(service, action):
- queue = _ProcessSonosEventQueue(action)
+ queue = _ProcessSonosEventQueue(self.mass, action)
sub = service.subscribe(auto_renew=True, event_queue=queue)
player._subscriptions.append(sub)
- subscribe(soco_device.avTransport, player._update_state)
- subscribe(soco_device.renderingControl, player._update_state)
+ subscribe(soco_device.avTransport, player.update_state)
+ subscribe(soco_device.renderingControl, player.update_state)
subscribe(soco_device.zoneGroupTopology, self.__topology_changed)
- self.mass.create_task(self.add_player(player))
+ self.mass.run_task(self.add_player(player))
return player
def __process_groups(self, sonos_groups):
group_player.name = group.label
group_player.group_childs = [item.uid for item in group.members]
- def __topology_changed(self, event=None):
+ async def __topology_changed(self, event=None):
'''
received topology changed event
from one of the sonos players
class _ProcessSonosEventQueue:
"""Queue like object for dispatching sonos events."""
- def __init__(self, handler):
+ def __init__(self, mass, handler):
"""Initialize Sonos event queue."""
self._handler = handler
+ self.mass = mass
def put(self, item, block=True, timeout=None):
"""Process event."""
try:
- self._handler(item)
+ self.mass.run_task(self._handler(item), wait_for_result=True)
except Exception as ex:
LOGGER.warning("Error calling %s: %s", self._handler, ex)
\ No newline at end of file
async def setup(self):
''' async initialize of module '''
# start slimproto server
- self.mass.create_task(
+ self.mass.event_loop.create_task(
asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483))
# setup discovery
- self.mass.create_task(self.start_discovery())
+ self.mass.event_loop.create_task(self.start_discovery())
async def start_discovery(self):
transport, protocol = await self.mass.event_loop.create_datagram_endpoint(
player_id = str(device_mac).lower()
device_type = devices.get(dev_id, 'unknown device')
player = PySqueezePlayer(self.mass, player_id, self.prov_id, device_type, writer)
- self.mass.create_task(self.mass.players.add_player(player))
+ self.mass.event_loop.create_task(self.mass.players.add_player(player))
elif player != None:
player.process_msg(operation, packet)
self.send_frame(b"setd", struct.pack("B", 4))
# TODO: remember last volume and power state
- self.mass.create_task(self.volume_set(40))
- self.mass.create_task(self.power_off())
+ self.mass.event_loop.create_task(self.volume_set(40))
+ self.mass.event_loop.create_task(self.power_off())
self._heartbeat_task = asyncio.create_task(self.__send_heartbeat())
async def cmd_stop(self):
''' send command to Squeeze player'''
packet = struct.pack('!H', len(data) + 4) + command + data
self._writer.write(packet)
- self.mass.create_task(self._writer.drain())
+ self.mass.event_loop.create_task(self._writer.drain())
def send_version(self):
self.send_frame(b'vers', b'7.8')
LOGGER.debug("Decoder Ready for next track")
next_item = self.queue.next_item
if next_item:
- self.mass.create_task(
+ self.mass.event_loop.create_task(
self.__send_play(next_item.uri))
def stat_STMe(self, data):
''' async initialize of module '''
await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_STATE)
await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_REGISTER)
- self.mass.create_task(self.check_players())
+ self.mass.event_loop.create_task(self.check_players())
async def handle_mass_event(self, msg, msg_details):
''' received event for the webplayer component '''
from .constants import CONF_KEY_MUSICPROVIDERS, CONF_ENABLED
+IS_HASSIO = os.path.isfile('/data/options.json')
def run_periodic(period):
def scheduler(fcn):
return wrapper
return scheduler
-async def try_supported(task):
- ''' try to execute a task and pass NotImplementedError Exception '''
- ret = None
- try:
- ret = await task
- except NotImplementedError:
- pass
- return ret
-
def filename_from_string(string):
''' create filename from unsafe string '''
keepcharacters = (' ','.','_')
import concurrent
import threading
from .models.media_types import MediaItem, MediaType, media_type_from_string
-from .utils import run_periodic, LOGGER, run_async_background_task, get_ip, json_serializer
+from .utils import run_periodic, LOGGER, IS_HASSIO, run_async_background_task, get_ip, json_serializer
CONF_KEY = 'web'
+
CONFIG_ENTRIES = [
('http_port', 8095, 'web_http_port'),
('https_port', 8096, 'web_https_port'),
self.mass = mass
# load/create/update config
config = self.mass.config.create_module_config(CONF_KEY, CONFIG_ENTRIES)
+ enable_ssl = config['ssl_certificate'] and config['ssl_key']
if config['ssl_certificate'] and not os.path.isfile(
config['ssl_certificate']):
- raise FileNotFoundError(
- "SSL certificate file not found: %s" % config['ssl_certificate'])
+ enable_ssl = False
+ LOGGER.warning("SSL certificate file not found: %s" % config['ssl_certificate'])
if config['ssl_key'] and not os.path.isfile(config['ssl_key']):
- raise FileNotFoundError(
- "SSL certificate key file not found: %s" % config['ssl_key'])
- self.local_ip = get_ip()
+ enable_ssl = False
+ LOGGER.warning("SSL certificate key file not found: %s" % config['ssl_key'])
self.http_port = config['http_port']
self.https_port = config['https_port']
- self._enable_ssl = config['ssl_certificate'] and config['ssl_key']
+ self._enable_ssl = enable_ssl
+ self.local_ip = get_ip()
self.config = config
async def setup(self):
await self.runner.setup()
http_site = web.TCPSite(self.runner, '0.0.0.0', self.http_port)
await http_site.start()
+ LOGGER.info("Started HTTP webserver on port %s" % self.http_port)
if self._enable_ssl:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(self.config['ssl_certificate'], self.config['ssl_key'])
- https_site = web.TCPSite(self.runner, '0.0.0.0', self.https_port, ssl_context=ssl_context)
+ https_site = web.TCPSite(self.runner, '0.0.0.0', self.https_port, ssl_context=ssl_context)
await https_site.start()
+ LOGGER.info("Started HTTPS webserver on port %s" % self.https_port)
+ if IS_HASSIO:
+ # host additional http port for hassio ingress
+ headers = {"X-HASSIO-KEY": os.environ["HASSIO_TOKEN"]}
+ url = "http://hassio/addons/self/info"
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, headers=headers, verify_ssl=False) as response:
+ result = await response.json()
+ ingress_port = int(result["ingress_port"])
+ ingress_site = web.TCPSite(self.runner, '0.0.0.0', ingress_port)
+ await ingress_site.start()
+ LOGGER.info("Started INGRESS webserver on port %s" % ingress_port)
+
+
async def get_items(self, request):
''' get multiple library items'''