event_loop.set_debug(True)
logger.setLevel(logging.DEBUG)
logging.getLogger('aiosqlite').setLevel(logging.INFO)
- logging.getLogger('asyncio').setLevel(logging.INFO)
+ logging.getLogger('asyncio').setLevel(logging.WARNING)
else:
logger.setLevel(logging.INFO)
# start music assistant!
import json
import time
import logging
+import threading
from .database import Database
from .config import MassConfig
def handle_exception(self, loop, context):
''' global exception handler '''
+ LOGGER.debug(f"Caught exception: {context}")
loop.default_exception_handler(context)
- #LOGGER.exception(f"Caught exception: {context}")
async def signal_event(self, msg, msg_details:dict):
''' signal (systemwide) event '''
- if not (msg_details == None or isinstance(msg_details, (str, int, dict))):
+ if not (msg_details == None or isinstance(msg_details, (str, dict))):
msg_details = serialize_values(msg_details)
- LOGGER.debug("Event: %s" %(msg))
listeners = list(self.event_listeners.values())
for callback, eventfilter in listeners:
if not eventfilter or eventfilter in msg:
async def remove_event_listener(self, cb_id):
''' remove callback from our event listeners '''
- self.event_listeners.pop(cb_id, None)
\ No newline at end of file
+ self.event_listeners.pop(cb_id, None)
+
+ def create_task(self, corofcn, wait_for_result=False, ignore_exception=None):
+ ''' helper to create a new task on the main event loop '''
+ if threading.current_thread() is threading.main_thread():
+ if wait_for_result:
+ raise Exception("can not wait for result in main event loop!")
+ return self.event_loop.create_task(corofcn)
+ else:
+ # threadsafe
+ future = asyncio.run_coroutine_threadsafe(corofcn, self.event_loop)
+ if wait_for_result:
+ try:
+ return future.result()
+ except Exception as exc:
+ if ignore_exception and isinstance(exc, ignore_exception):
+ return None
+ raise exc
+ return future
class Cache(object):
'''basic stateless caching system '''
+ # TODO: convert to aiosql
_database = None
def __init__(self, datapath):
''' async initialize of cache module '''
asyncio.create_task(self._do_cleanup())
- async def get(self, endpoint, checksum=""):
+ async def get_async(self, endpoint, checksum=""):
+ return await asyncio.get_running_loop().run_in_executor(None, self.get, endpoint, checksum)
+
+ async def set_async(self, endpoint, data, checksum="", expiration=datetime.timedelta(days=14)):
+ return await asyncio.get_running_loop().run_in_executor(None, self.set, endpoint, data, checksum, expiration)
+
+ def get(self, endpoint, checksum=""):
'''
get object from cache and return the results
endpoint: the (unique) name of the cache object as reference
result = eval(cache_data[1])
return result
- async def set(self, endpoint, data, checksum="", expiration=datetime.timedelta(days=14)):
+ def set(self, endpoint, data, checksum="", expiration=datetime.timedelta(days=14)):
'''
set data in cache
'''
self._execute_sql(query, (endpoint, expires, data, checksum))
@run_periodic(3600)
+ async def auto_cleanup(self):
+ ''' scheduled auto cleanup task '''
+ asyncio.get_running_loop().run_in_executor(None, self._do_cleanup)
+
async def _do_cleanup(self):
'''perform cleanup task'''
cur_time = datetime.datetime.now()
else:
cache_str += ".%s%s" %(key,value)
cache_str = cache_str.lower()
- cachedata = await method_class.cache.get(cache_str, checksum=cache_checksum)
+ cachedata = await method_class.cache.get_async(cache_str, checksum=cache_checksum)
if cachedata is not None:
return cachedata
else:
result = await func(*args, **kwargs)
- await method_class.cache.set(cache_str, result, checksum=cache_checksum, expiration=datetime.timedelta(days=cache_days, hours=cache_hours))
+ await method_class.cache.set_async(cache_str, result, checksum=cache_checksum, expiration=datetime.timedelta(days=cache_days, hours=cache_hours))
return result
return wrapped
return wrapper
return
self.http_session = aiohttp.ClientSession(
loop=self.mass.event_loop, connector=aiohttp.TCPConnector())
- self.mass.event_loop.create_task(self.__hass_websocket())
+ self.mass.create_task(self.__hass_websocket())
await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_CHANGED)
await self.mass.add_event_listener(self.mass_event, EVENT_PLAYER_ADDED)
- self.mass.event_loop.create_task(self.__get_sources())
+ self.mass.create_task(self.__get_sources())
async def get_state_async(self, entity_id, attribute='state'):
''' get state of a hass entity (async)'''
else:
return state_obj
else:
- self.mass.event_loop.create_task(self.__request_state(entity_id))
+ self.mass.create_task(self.__request_state(entity_id))
return None
async def __request_state(self, entity_id):
state_obj = await self.__get_data('states/%s' % entity_id)
if 'state' in state_obj:
self._tracked_entities[entity_id] = state_obj
- self.mass.event_loop.create_task(
+ self.mass.create_task(
self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, state_obj))
async def mass_event(self, msg, msg_details):
if event_type == 'state_changed':
if event_data['entity_id'] in self._tracked_entities:
self._tracked_entities[event_data['entity_id']] = event_data['new_state']
- self.mass.event_loop.create_task(
+ self.mass.create_task(
self.mass.signal_event(EVENT_HASS_ENTITY_CHANGED, event_data))
elif event_type == 'call_service' and event_data['domain'] == 'media_player':
await self.__handle_player_command(event_data['service'], event_data['service_data'])
# LOGGER.info(data)
elif msg.type == aiohttp.WSMsgType.ERROR:
raise Exception("error in websocket")
+ except asyncio.CancelledError:
+ raise asyncio.CancelledError()
except Exception as exc:
LOGGER.exception(exc)
await asyncio.sleep(10)
async def setup(self):
''' async initialize of module '''
pass
- # self.mass.event_loop.create_task(
+ # self.mass.create_task(
# asyncio.start_server(self.sockets_streamer, '0.0.0.0', 8093))
async def stream(self, http_request):
'''
start stream for a player
'''
- # make sure we have a valid player
+ # make sure we have valid params
player_id = http_request.match_info.get('player_id','')
player = await self.mass.players.get_player(player_id)
- assert(player)
+ if not player:
+ return web.Response(status=404, reason="Player not found")
+ if not player.queue.use_queue_stream:
+ queue_item_id = http_request.match_info.get('queue_item_id')
+ queue_item = await player.queue.by_item_id(queue_item_id)
+ if not queue_item:
+ return web.Response(status=404, reason="Invalid Queue item Id")
# prepare headers as audio/flac content
resp = web.StreamResponse(status=200, reason='OK',
- headers={
- 'Content-Type': 'audio/flac'
- })
+ headers={'Content-Type': 'audio/flac'})
await resp.prepare(http_request)
- # send content only on GET request
- if http_request.method.upper() != 'GET':
- return resp
- # stream audio
+    # run the streamer in an executor to prevent the subprocess locking up our event loop
cancelled = threading.Event()
if player.queue.use_queue_stream:
- # use queue stream
- bg_task = run_async_background_task(
- None,
- self.__stream_queue, player, resp, cancelled)
+ bg_task = self.mass.event_loop.run_in_executor(None,
+ self.__get_queue_stream, player, resp, cancelled)
else:
- # single track stream
- queue_item_id = http_request.match_info.get('queue_item_id')
- queue_item = await player.queue.by_item_id(queue_item_id)
- assert(queue_item)
- bg_task = run_async_background_task(
- None,
- self.__stream_single, player, queue_item, resp, cancelled)
+ bg_task = self.mass.event_loop.run_in_executor(None,
+ self.__get_queue_item_stream, player, queue_item, resp, cancelled)
# let the streaming begin!
try:
await asyncio.gather(bg_task)
except (asyncio.CancelledError, asyncio.TimeoutError):
- LOGGER.debug("stream request cancelled")
cancelled.set()
- # wait for bg_task to finish
- await asyncio.gather(bg_task)
raise asyncio.CancelledError()
return resp
- async def __stream_single(self, player, queue_item, buffer, cancelled):
+ def __get_queue_item_stream(self, player, queue_item, buffer, cancelled):
''' start streaming single queue track '''
LOGGER.debug("stream single queue track started for track %s on player %s" % (queue_item.name, player.name))
- audio_stream = self.__get_audio_stream(player, queue_item, cancelled)
- async for is_last_chunk, audio_chunk in audio_stream:
+ for is_last_chunk, audio_chunk in self.__get_audio_stream(player, queue_item, cancelled):
if cancelled.is_set():
# http session ended
# we must consume the data to prevent hanging subprocess instances
continue
# put chunk in buffer
- asyncio.run_coroutine_threadsafe(
- buffer.write(audio_chunk),
- self.mass.event_loop)
- # wait for the queue to consume the data
- if not cancelled.is_set():
- await asyncio.sleep(0.5)
+ self.mass.create_task(
+ buffer.write(audio_chunk), wait_for_result=True,
+ ignore_exception=BrokenPipeError)
# all chunks received: streaming finished
- gc.collect()
if cancelled.is_set():
LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
else:
# indicate EOF if no more data
- asyncio.run_coroutine_threadsafe(
- buffer.write_eof(),
- self.mass.event_loop)
+ self.mass.create_task(
+ buffer.write_eof(), wait_for_result=True,
+ ignore_exception=BrokenPipeError)
LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
- async def __stream_queue(self, player, buffer, cancelled):
+ def __get_queue_stream(self, player, buffer, cancelled):
''' start streaming all queue tracks '''
sample_rate = try_parse_int(player.settings['max_sample_rate'])
fade_length = try_parse_int(player.settings["crossfade_duration"])
pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
args = 'sox -t %s - -t flac -C 0 -' % pcm_args
# start sox process
- # we use normal subprocess instead of asyncio because of bug with executor
- # this should be fixed with python 3.8
sox_proc = subprocess.Popen(args, shell=True,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
if not chunk:
break
if chunk and not cancelled.is_set():
- asyncio.run_coroutine_threadsafe(
- buffer.write(chunk), self.mass.event_loop)
+ self.mass.create_task(buffer.write(chunk),
+ wait_for_result=True, ignore_exception=BrokenPipeError)
del chunk
# indicate EOF if no more data
if not cancelled.is_set():
- asyncio.run_coroutine_threadsafe(
- buffer.write_eof(), self.mass.event_loop)
- LOGGER.debug("stream queue player %s: fill buffer completed" % player.name)
+ self.mass.create_task(buffer.write_eof(),
+ wait_for_result=True, ignore_exception=BrokenPipeError)
# start fill buffer task in background
fill_buffer_thread = threading.Thread(target=fill_buffer)
fill_buffer_thread.start()
prev_chunk = None
bytes_written = 0
# handle incoming audio chunks
- async for is_last_chunk, chunk in self.__get_audio_stream(
+ for is_last_chunk, chunk in self.__get_audio_stream(
player, queue_track, cancelled, chunksize=fade_bytes,
resample=sample_rate):
cur_chunk += 1
first_part, std_err = subprocess.Popen(args, shell=True,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE).communicate(prev_chunk + chunk)
+ if len(first_part) < fade_bytes:
+ # part is too short after the strip action?!
+ # so we just cut off at the fade position
+ first_part = prev_chunk+chunk
+ if len(first_part) >= fade_bytes:
+ first_part = first_part[fade_bytes:]
fade_in_part = first_part[:fade_bytes]
remaining_bytes = first_part[fade_bytes:]
del first_part
prev_chunk = None # needed to prevent this chunk being sent again
### HANDLE LAST PART OF TRACK
elif prev_chunk and is_last_chunk:
- # last chunk received so create the fadeout_part with the previous chunk and this chunk
+ # last chunk received so create the last_part with the previous chunk and this chunk
# and strip off silence
args = 'sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse' % (pcm_args, pcm_args)
last_part, stderr = subprocess.Popen(args, shell=True,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE).communicate(prev_chunk + chunk)
+ if len(last_part) < fade_bytes:
+ # part is too short after the strip action
+ # so we just cut off at the fade position
+ last_part = prev_chunk+chunk
+ if len(last_part) >= fade_bytes:
+ last_part = last_part[:fade_bytes]
if not player.queue.crossfade_enabled:
# crossfading is not enabled so just pass the (stripped) audio data
sox_proc.stdin.write(last_part)
del chunk
else:
# handle crossfading support
- if len(last_part) < fade_bytes:
- # not enough data for crossfade duration after the strip action...
- last_part = prev_chunk + chunk
- if len(last_part) < fade_bytes:
- # still not enough data so we'll skip the crossfading
- LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part))
- sox_proc.stdin.write(last_part)
- bytes_written += len(last_part)
- del last_part
- else:
- # store fade section to be picked up for next track
- last_fadeout_data = last_part[-fade_bytes:]
- remaining_bytes = last_part[:-fade_bytes]
- # write remaining bytes
- sox_proc.stdin.write(remaining_bytes)
- bytes_written += len(remaining_bytes)
- del last_part
- del remaining_bytes
- del chunk
+ # store fade section to be picked up for next track
+ last_fadeout_data = last_part[-fade_bytes:]
+ remaining_bytes = last_part[:-fade_bytes]
+ # write remaining bytes
+ sox_proc.stdin.write(remaining_bytes)
+ bytes_written += len(remaining_bytes)
+ del last_part
+ del remaining_bytes
+ del chunk
### MIDDLE PARTS OF TRACK
else:
# middle part of the track
else:
prev_chunk = chunk
del chunk
- ### throttle to prevent entire track sitting in memory
- if not cancelled.is_set():
- await asyncio.sleep(fade_length)
# end of the track reached
if cancelled.is_set():
# break out the loop if the http session is cancelled
accurate_duration = bytes_written / int(sample_rate * 4 * 2)
queue_track.duration = accurate_duration
LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
- LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
# run garbage collect manually to avoid too much memory fragmentation
gc.collect()
# end of queue reached, pass last fadeout bits to final output
del sox_proc
# run garbage collect manually to avoid too much memory fragmentation
gc.collect()
- LOGGER.info("streaming of queue for player %s completed" % player.name)
+ if cancelled.is_set():
+ LOGGER.info("streaming of queue for player %s interrupted" % player.name)
+ else:
+ LOGGER.info("streaming of queue for player %s completed" % player.name)
- async def __get_audio_stream(self, player, queue_item, cancelled,
+ def __get_audio_stream(self, player, queue_item, cancelled,
chunksize=128000, resample=None):
''' get audio stream from provider and apply additional effects/processing where/if needed'''
# get stream details from provider
# sort by quality and check track availability
for prov_media in sorted(queue_item.provider_ids,
key=operator.itemgetter('quality'), reverse=True):
- streamdetails = asyncio.run_coroutine_threadsafe(
+ streamdetails = self.mass.create_task(
self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']),
- self.mass.event_loop).result()
+ wait_for_result=True)
if streamdetails:
streamdetails['player_id'] = player.player_id
if not 'item_id' in streamdetails:
yield (True, b'')
return
# get sox effects and resample options
- sox_options = await self.__get_player_sox_options(player, streamdetails)
+ sox_options = self.__get_player_sox_options(player, streamdetails)
outputfmt = 'flac -C 0'
if resample:
outputfmt = 'raw -b 32 -c 2 -e signed-integer'
args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"],
streamdetails["content_type"], outputfmt, sox_options)
# start sox process
- # we use normal subprocess instead of asyncio because of bug with executor
- # this should be fixed with python 3.8
process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
# fire event that streaming has started for this track
- asyncio.run_coroutine_threadsafe(
- self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails), self.mass.event_loop)
+ self.mass.create_task(
+ self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails))
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
bytes_sent = 0
chunk = process.stdout.read(chunksize)
if len(chunk) < chunksize:
# last chunk
- LOGGER.debug("last chunk received")
bytes_sent += len(chunk)
yield (True, chunk)
break
else:
bytes_sent += len(chunk)
yield (False, chunk)
-
# fire event that streaming has ended
- asyncio.run_coroutine_threadsafe(
- self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop)
- # send task to main event loop to analyse the audio
+ self.mass.create_task(self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails))
+ # send task to background to analyse the audio
if queue_item.media_type == MediaType.Track:
- self.mass.event_loop.call_soon_threadsafe(
- asyncio.ensure_future, self.__analyze_audio(streamdetails))
- # run garbage collect manually to avoid too much memory fragmentation
- gc.collect()
+ self.mass.event_loop.run_in_executor(None, self.__analyze_audio, streamdetails)
- async def __get_player_sox_options(self, player, streamdetails):
+ def __get_player_sox_options(self, player, streamdetails):
''' get player specific sox effect options '''
sox_options = []
# volume normalisation
- gain_correct = asyncio.run_coroutine_threadsafe(
+ gain_correct = self.mass.create_task(
self.mass.players.get_gain_correct(
player.player_id, streamdetails["item_id"], streamdetails["provider"]),
- self.mass.event_loop).result()
+ wait_for_result=True)
if gain_correct != 0:
sox_options.append('vol %s dB ' % gain_correct)
# downsample if needed
sox_options.append(player.settings['sox_options'])
return " ".join(sox_options)
- async def __analyze_audio(self, streamdetails):
+ def __analyze_audio(self, streamdetails):
''' analyze track audio, for now we only calculate EBU R128 loudness '''
item_key = '%s%s' %(streamdetails["item_id"], streamdetails["provider"])
if item_key in self.analyze_jobs:
return # prevent multiple analyze jobs for same track
self.analyze_jobs[item_key] = True
- track_loudness = await self.mass.db.get_track_loudness(
- streamdetails["item_id"], streamdetails["provider"])
+ track_loudness = self.mass.create_task(self.mass.db.get_track_loudness(
+ streamdetails["item_id"], streamdetails["provider"]), wait_for_result=True)
if track_loudness == None:
# only when needed we do the analyze stuff
LOGGER.debug('Start analyzing track %s' % item_key)
if streamdetails['type'] == 'url':
- async with aiohttp.ClientSession() as session:
- async with session.get(streamdetails["path"], verify_ssl=False) as resp:
- audio_data = await resp.read()
+ import urllib
+ audio_data = urllib.request.urlopen(streamdetails["path"]).read()
elif streamdetails['type'] == 'executable':
audio_data = subprocess.check_output(streamdetails["path"], shell=True)
# calculate BS.1770 R128 integrated loudness
meter = pyloudnorm.Meter(rate) # create BS.1770 meter
loudness = meter.integrated_loudness(data) # measure loudness
del data
- await self.mass.db.set_track_loudness(streamdetails["item_id"], streamdetails["provider"], loudness)
+ self.mass.create_task(
+ self.mass.db.set_track_loudness(streamdetails["item_id"], streamdetails["provider"], loudness))
del audio_data
LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness))
self.analyze_jobs.pop(item_key, None)
process = subprocess.Popen(args, shell=True,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
crossfade_part, stderr = process.communicate()
- LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
fadeinfile.close()
fadeoutfile.close()
del fadeinfile
del fadeoutfile
return crossfade_part
-
- async def start_stream(self, clients_needed):
- # wait for clients
- print("wait for clients...")
- track = asyncio.run_coroutine_threadsafe(
- self.mass.music.track('2741', provider='database'),
- self.mass.event_loop).result()
- player_id = '1523403a-4cc4-f151-29d1-758822807128'
- player = self.mass.players._players[player_id]
- cancelled = threading.Event()
- # wait for clients
- while len(self.stream_clients) < clients_needed:
- await asyncio.sleep(0.1)
- # start streaming
- while self.stream_clients:
- audio_stream = self.__get_audio_stream(player, track, cancelled)
- async for is_last_chunk, audio_chunk in audio_stream:
- for client in self.stream_clients:
- try:
- client.write(audio_chunk)
- await client.drain()
- except ConnectionResetError:
- print('client disconnected')
- client.close()
- self.stream_clients.remove(client)
- await asyncio.sleep(1)
- print("all clients disconnected")
- return
-
- async def add_client(self, client_writer, client_msg):
- print("new client connected!")
- for line in client_msg.decode().split('\r\n'):
- print(line)
- msg = 'HTTP/1.0 200 OK\r\n'
- msg += "Content-Type: audio/flac\r\n"
- msg += "Transfer-Encoding: chunked\r\n\r\n"
- client_writer.write(msg.encode())
- await client_writer.drain()
- self.stream_clients.append(client_writer)
- if len(self.stream_clients) == 1:
- bg_task = run_async_background_task(
- None,
- self.start_stream, 2)
-
- async def sockets_streamer(self, reader, writer):
- while True:
- request = await reader.read(1024)
- if request:
- await self.add_client(writer, request)
- else:
- print("client lost")
- break
-
#### Common implementation, should NOT be overrridden #####
- def __init__(self, mass, player_id, prov_id, is_group=False):
+ def __init__(self, mass, player_id, prov_id):
# private attributes
self.mass = mass
self._player_id = player_id # unique id for this player
self._name = ''
self._state = PlayerState.Stopped
self._group_childs = []
- self._last_group_parent = None
self._powered = False
self._cur_time = 0
self._media_position_updated_at = 0
self._volume_level = 0
self._muted = False
self._queue = PlayerQueue(mass, self)
- self._player_settings = None
+ self.__update_player_settings()
self._initialized = False
- self._last_event = 0
- self._update_cur_time_task = None
# public attributes
self.supports_queue = True # has native support for a queue
self.supports_gapless = False # has native gapless support
self.supports_crossfade = False # has native crossfading support
-
- def __del__(self):
- if self._update_cur_time_task:
- self._update_cur_time_task.cancel()
-
@property
def player_id(self):
''' [PROTECTED] player_id of this player '''
''' [PROTECTED] set (real) name of this player '''
if name != self._name:
self._name = name
- self.mass.event_loop.create_task(self.update())
+ self.mass.create_task(self.update())
@property
def is_group(self):
''' [PROTECTED] set group_childs property of this player '''
if group_childs != self._group_childs:
self._group_childs = group_childs
- self.mass.event_loop.create_task(self.update())
+ self.mass.create_task(self.update())
for child_player_id in group_childs:
- self.mass.event_loop.create_task(
+ self.mass.create_task(
self.mass.players.trigger_update(child_player_id))
def add_group_child(self, child_player_id):
''' add player as child to this group player '''
if not child_player_id in self._group_childs:
self._group_childs.append(child_player_id)
- self.mass.event_loop.create_task(self.update())
- self.mass.event_loop.create_task(
+ self.mass.create_task(self.update())
+ self.mass.create_task(
self.mass.players.trigger_update(child_player_id))
def remove_group_child(self, child_player_id):
''' remove player as child from this group player '''
if child_player_id in self._group_childs:
self._group_childs.remove(child_player_id)
- self.mass.event_loop.create_task(self.update())
- self.mass.event_loop.create_task(
+ self.mass.create_task(self.update())
+ self.mass.create_task(
self.mass.players.trigger_update(child_player_id))
@property
for group_parent_id in self.group_parents:
group_player = self.mass.players.get_player_sync(group_parent_id)
if group_player and group_player.state != PlayerState.Off:
- self._last_group_parent = group_parent_id
return group_player.state
return self._state
''' [PROTECTED] set state property of this player '''
if state != self._state:
self._state = state
- self.mass.event_loop.create_task(self.update(update_queue=True))
+ self.mass.create_task(self.update(update_queue=True))
@property
def powered(self):
''' [PROTECTED] set (real) power state for this player '''
if powered != self._powered:
self._powered = powered
- self.mass.event_loop.create_task(self.update())
+ self.mass.create_task(self.update())
@property
def cur_time(self):
if cur_time != self._cur_time:
self._cur_time = cur_time
self._media_position_updated_at = time.time()
- self.mass.event_loop.create_task(self.update(update_queue=True))
+ self.mass.create_task(self.update(update_queue=True))
@property
def media_position_updated_at(self):
''' [PROTECTED] set cur_uri (uri loaded in player) property of this player '''
if cur_uri != self._cur_uri:
self._cur_uri = cur_uri
- self.mass.event_loop.create_task(self.update(update_queue=True))
+ self.mass.create_task(self.update(update_queue=True))
@property
def volume_level(self):
volume_level = try_parse_int(volume_level)
if volume_level != self._volume_level:
self._volume_level = volume_level
- self.mass.event_loop.create_task(self.update())
+ self.mass.create_task(self.update())
# trigger update on group player
for group_parent_id in self.group_parents:
- self.mass.event_loop.create_task(
+ self.mass.create_task(
self.mass.players.trigger_update(group_parent_id))
@property
is_muted = try_parse_bool(is_muted)
if is_muted != self._muted:
self._muted = is_muted
- self.mass.event_loop.create_task(self.update())
+ self.mass.create_task(self.update())
@property
def enabled(self):
domain = self.settings['hass_power_entity'].split('.')[0]
service_data = { 'entity_id': self.settings['hass_power_entity']}
await self.mass.hass.call_service(domain, 'turn_on', service_data)
- # power on group parent if needed
- last_group_player = await self.mass.players.get_player(self._last_group_parent)
- if last_group_player:
- await last_group_player.power_on()
# handle play on power on
- elif self.settings.get('play_power_on'):
- await self.play()
+ if self.settings.get('play_power_on'):
+ # play player's own queue if it has items
+ if self._queue.items:
+ await self.play()
+ # fallback to the first group parent with items
+ else:
+ for group_parent_id in self.group_parents:
+ group_player = await self.mass.players.get_player(group_parent_id)
+ if group_player and group_player.queue.items:
+ await group_player.play()
+ break
async def power_off(self):
''' [PROTECTED] send power OFF command to player '''
async def update(self, update_queue=False):
''' [PROTECTED] signal player updated '''
- self.get_player_settings()
if not self._initialized:
return
# update queue state if player state changes
if update_queue:
await self.queue.update()
await self.mass.signal_event(EVENT_PLAYER_CHANGED, self.to_dict())
- if self._state == PlayerState.Playing and not self._update_cur_time_task and (time.time() - self._media_position_updated_at > 2):
- self._update_cur_time_task = self.mass.event_loop.create_task(self.__update_cur_time())
-
- async def __update_cur_time(self):
- ''' background task that keeps updating the current time '''
- while self._state == PlayerState.Playing:
- calc_time = self._cur_time + (time.time() - self._media_position_updated_at)
- self.cur_time = calc_time
- await asyncio.sleep(1)
- self._update_cur_time_task = None
@property
def settings(self):
- ''' [PROTECTED] get (or create) player config settings '''
- if self._player_settings:
- return self._player_settings
+ ''' [PROTECTED] get player config settings '''
+ if self.player_id in self.mass.config['player_settings']:
+ return self.mass.config['player_settings'][self.player_id]
else:
- return self.get_player_settings()
+ self.__update_player_settings()
+ return self.mass.config['player_settings'][self.player_id]
- def get_player_settings(self):
- ''' [PROTECTED] get (or create) player config settings '''
+ def __update_player_settings(self):
+ ''' [PROTECTED] update player config settings '''
player_settings = self.mass.config['player_settings'].get(self.player_id,{})
# generate config for the player
config_entries = [ # default config entries for a player
player_settings[key] = def_value
self.mass.config['player_settings'][self.player_id] = player_settings
self.mass.config['player_settings'][self.player_id]['__desc__'] = config_entries
- self._player_settings = self.mass.config['player_settings'][self.player_id]
- return player_settings
def to_dict(self):
''' instance attributes as dict so it can be serialized to json '''
self._save_busy_ = False
self._last_track = None
# load previous queue settings from disk
- self.mass.event_loop.create_task(self.__load_from_file())
+ self.mass.event_loop.run_in_executor(None, self.__load_from_file)
@property
def shuffle_enabled(self):
# shuffle requested
self._shuffle_enabled = True
await self.load(self._items)
- self.mass.event_loop.create_task(self._player.update())
+ self.mass.create_task(self._player.update())
elif self._shuffle_enabled and not enable_shuffle:
self._shuffle_enabled = False
# TODO: Unshuffle the list ?
- self.mass.event_loop.create_task(self._player.update())
+ self.mass.create_task(self._player.update())
async def next(self):
''' request next track in queue '''
:param queue_items: a list of QueueItem
:param offset: offset from current queue position
'''
- insert_at_index = self.cur_index + offset
- if not self.items or insert_at_index > len(self.items):
+
+ if not self.items or self.cur_index == None or self.cur_index + offset > len(self.items):
return await self.load(queue_items)
+ insert_at_index = self.cur_index + offset
if self.shuffle_enabled:
queue_items = await self.__shuffle_items(queue_items)
self._items = self._items[:insert_at_index] + queue_items + self._items[insert_at_index:]
# account for track changing state so trigger track change after 1 second
if self._last_track and self._last_track.streamdetails:
self._last_track.streamdetails["seconds_played"] = self._last_item_time
- self.mass.event_loop.create_task(
+ self.mass.create_task(
self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track.streamdetails))
- if new_track:
- self.mass.event_loop.create_task(
+ if new_track and new_track.streamdetails:
+ self.mass.create_task(
self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track.streamdetails))
- self._last_track = new_track
- await self.__save_to_file()
+ self._last_track = new_track
+ self.mass.event_loop.run_in_executor(None, self.__save_to_file)
if self._last_player_state != self._player.state:
self._last_player_state = self._player.state
if (self._player.cur_time == 0 and
# can be extended with some more magic last last_played and stuff
return random.sample(queue_items, len(queue_items))
- async def __load_from_file(self):
+ def __load_from_file(self):
''' try to load the saved queue for this player from file '''
player_safe_str = filename_from_string(self._player.player_id)
settings_dir = os.path.join(self.mass.datapath, 'queue')
except Exception as exc:
LOGGER.debug("Could not load queue from disk - %s" % str(exc))
- async def __save_to_file(self):
+ def __save_to_file(self):
''' save current queue settings to file '''
if self._save_busy_:
return
for prov in self.providers.values():
await prov.setup()
# schedule sync task
- self.mass.event_loop.create_task(self.sync_music_providers())
+ self.mass.create_task(self.sync_music_providers())
async def item(self, item_id, media_type:MediaType, provider='database', lazy=True):
''' get single music item by id and media type'''
# actually add the tracks to the playlist on the provider
await self.providers[playlist_prov['provider']].add_playlist_tracks(playlist_prov['item_id'], track_ids_to_add)
# schedule sync
- self.mass.event_loop.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id']))
+ self.mass.create_task(self.sync_playlist_tracks(playlist.item_id, playlist_prov['provider'], playlist_prov['item_id']))
@run_periodic(3600)
async def sync_music_providers(self):
''' periodic sync of all music providers '''
if self.sync_running:
return
- self.sync_running = True
for prov_id in self.providers.keys():
- # sync library artists
+ self.sync_running = prov_id
+ # sync library items for each provider (if supported)
await try_supported(self.sync_library_artists(prov_id))
await try_supported(self.sync_library_albums(prov_id))
await try_supported(self.sync_library_tracks(prov_id))
await try_supported(self.sync_playlists(prov_id))
await try_supported(self.sync_radios(prov_id))
- self.sync_running = False
+ self.sync_running = None
async def sync_library_artists(self, prov_id):
''' sync library artists for given provider'''
tracks += await self.get_album_tracks(album.item_id)
return tracks[:10]
- async def get_stream_content_type(self, track_id):
- ''' return the content type for the given track when it will be streamed'''
+ async def get_stream_details(self, track_id):
+ ''' return the content details for the given track when it will be streamed'''
if not os.sep in track_id:
track_id = base64.b64decode(track_id).decode('utf-8')
- return track_id.split('.')[-1]
-
- async def get_audio_stream(self, track_id):
- ''' get audio stream for a track '''
- if not os.sep in track_id:
- track_id = base64.b64decode(track_id).decode('utf-8')
- with open(track_id) as f:
- while True:
- line = f.readline()
- if line:
- yield line
- else:
- break
+ # TODO: retrieve sample rate and bit depth
+ return {
+ "type": "file",
+ "path": track_id,
+ "content_type": track_id.split('.')[-1],
+ "sample_rate": 44100,
+ "bit_depth": 16
+ }
async def __parse_track(self, filename):
''' try to parse a track from a filename with taglib '''
try:
async with self.throttler:
async with self.http_session.get(url, headers=headers, params=params, verify_ssl=False) as response:
- result = await response.json()
- if not result or 'error' in result:
- LOGGER.error(url)
+ try:
+ result = await response.json()
+ if "error" in result:
+ return None
+ return result
+ except Exception as exc:
+ LOGGER.error(exc)
+ LOGGER.debug(url)
LOGGER.debug(params)
- LOGGER.debug(result)
- return None
- return result
+ result = response
+ LOGGER.debug(await response.text())
except Exception as exc:
LOGGER.exception(exc)
return None
async with self.http_session.post(url, params=params, json=data, verify_ssl=False) as response:
try:
result = await response.json()
+ if "error" in result:
+ return None
return result
except Exception as exc:
LOGGER.error(exc)
from pychromecast.controllers.multizone import MultizoneController
from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED
import types
+import time
from ..utils import run_periodic, LOGGER, try_parse_int
from ..models.playerprovider import PlayerProvider
class ChromecastPlayer(Player):
''' Chromecast player object '''
-
+
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- self._poll_task = self.mass.event_loop.create_task(self.__poll_status())
+ self.__cc_report_progress_task = None
def __del__(self):
- if self._poll_task:
- self._poll_task.cancel()
+ if self.__cc_report_progress_task:
+ self.__cc_report_progress_task.cancel()
async def try_chromecast_command(self, cmd:types.MethodType, *args, **kwargs):
''' guard for disconnected socket client '''
else:
send_queue()
- @run_periodic(10)
- async def __poll_status(self):
- ''' request actual status from CC '''
- # this is needed to get some accurate media progress info
- if self._state == PlayerState.Playing:
- await self.try_chromecast_command(self.cc.media_controller.update_status)
+ async def __report_progress(self):
+ ''' report current progress while playing '''
+ # chromecast does not send updates of the player's progress (cur_time)
+ # so we need to update it periodically ourselves
+ while self._state == PlayerState.Playing:
+ self.cur_time = self.cc.media_controller.status.adjusted_current_time
+ await asyncio.sleep(1)
+ self.__cc_report_progress_task = None
async def handle_player_state(self, caststatus=None,
- mediastatus=None, connection_status=None):
+ mediastatus=None):
''' handle a player state message from the socket '''
- # handle connection status
- if connection_status:
- if connection_status.status == CONNECTION_STATUS_DISCONNECTED:
- # schedule a new scan which will handle group parent changes
- self.mass.event_loop.create_task(
- self.mass.players.providers[self.player_provider].start_chromecast_discovery())
# handle generic cast status
if caststatus:
self.muted = caststatus.volume_muted
self.state = PlayerState.Stopped
self.cur_uri = mediastatus.content_id
self.cur_time = mediastatus.adjusted_current_time
+ if self._state == PlayerState.Playing and self.__cc_report_progress_task == None:
+ self.__cc_report_progress_task = self.mass.create_task(self.__report_progress())
class ChromecastProvider(PlayerProvider):
''' support for ChromeCast Audio '''
async def setup(self):
''' perform async setup '''
- self.mass.event_loop.create_task(
+ self.mass.create_task(
self.__periodic_chromecast_discovery())
async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):
@run_periodic(1800)
async def __periodic_chromecast_discovery(self):
''' run chromecast discovery on interval '''
- await self.start_chromecast_discovery()
+ self.mass.event_loop.run_in_executor(None, self.run_chromecast_discovery)
- async def start_chromecast_discovery(self):
+ def run_chromecast_discovery(self):
''' background non-blocking chromecast discovery and handler '''
if self._discovery_running:
return
self._discovery_running = True
LOGGER.debug("Chromecast discovery started...")
# remove any disconnected players...
- removed_players = []
for player in self.players:
if not player.cc.socket_client or not player.cc.socket_client.is_connected:
- removed_players.append(player.player_id)
# cleanup cast object
del player.cc
- # signal removed players
- for player_id in removed_players:
- await self.remove_player(player_id)
+ self.mass.create_task(self.remove_player(player.player_id))
# search for available chromecasts
from pychromecast.discovery import start_discovery, stop_discovery
def discovered_callback(name):
discovery_info = listener.services[name]
ip_address, port, uuid, model_name, friendly_name = discovery_info
player_id = str(uuid)
- player = asyncio.run_coroutine_threadsafe(
- self.get_player(player_id),
- self.mass.event_loop).result()
- if not player:
- asyncio.run_coroutine_threadsafe(
- self.__chromecast_discovered(player_id, discovery_info), self.mass.event_loop)
+ if not player_id in self.mass.players._players:
+ self.__chromecast_discovered(player_id, discovery_info)
listener, browser = start_discovery(discovered_callback)
- await asyncio.sleep(15) # run discovery for 15 seconds
+ time.sleep(15) # run discovery for 15 seconds
stop_discovery(browser)
LOGGER.debug("Chromecast discovery completed...")
self._discovery_running = False
- async def __chromecast_discovered(self, player_id, discovery_info):
+ def __chromecast_discovered(self, player_id, discovery_info):
''' callback when a (new) chromecast device is discovered '''
from pychromecast import _get_chromecast_from_host, ChromecastConnectionError
try:
self.supports_crossfade = False
# register status listeners
status_listener = StatusListener(player_id,
- player.handle_player_state, self.mass.event_loop)
+ player.handle_player_state, self.mass)
if chromecast.cast_type == 'group':
mz = MultizoneController(chromecast.uuid)
mz.register_listener(MZListener(mz,
chromecast.register_status_listener(status_listener)
chromecast.media_controller.register_status_listener(status_listener)
player.cc.wait()
- await self.add_player(player)
+ self.mass.create_task(self.add_player(player))
if player.mz:
player.mz.update_members()
-
def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
class StatusListener:
- def __init__(self, player_id, status_callback, loop):
+ def __init__(self, player_id, status_callback, mass):
self.__handle_callback = status_callback
- self.loop = loop
+ self.mass = mass
self.player_id = player_id
def new_cast_status(self, status):
''' chromecast status changed (like volume etc.)'''
- asyncio.run_coroutine_threadsafe(
- self.__handle_callback(caststatus=status), self.loop)
+ self.mass.create_task(
+ self.__handle_callback(caststatus=status))
def new_media_status(self, status):
''' mediacontroller has new state '''
- asyncio.run_coroutine_threadsafe(
- self.__handle_callback(mediastatus=status), self.loop)
+ self.mass.create_task(
+ self.__handle_callback(mediastatus=status))
def new_connection_status(self, status):
''' will be called when the connection changes '''
- asyncio.run_coroutine_threadsafe(
- self.__handle_callback(connection_status=status), self.loop)
+ if status.status == CONNECTION_STATUS_DISCONNECTED:
+ # schedule a new scan which will handle reconnects and group parent changes
+ self.mass.event_loop.run_in_executor(None,
+ self.mass.players.providers[PROV_ID].run_chromecast_discovery)
class MZListener:
def __init__(self, mz, callback, loop):
from typing import List
import logging
import types
+import time
from ..utils import run_periodic, LOGGER, try_parse_int
from ..models.playerprovider import PlayerProvider
PROV_CLASS = 'SonosProvider'
CONFIG_ENTRIES = [
- (CONF_ENABLED, False, CONF_ENABLED),
+ (CONF_ENABLED, True, CONF_ENABLED),
]
PLAYER_CONFIG_ENTRIES = []
class SonosPlayer(Player):
''' Sonos player object '''
-
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.__sonos_report_progress_task = None
+
+ def __del__(self):
+ if self.__sonos_report_progress_task:
+ self.__sonos_report_progress_task.cancel()
+
async def cmd_stop(self):
''' send stop command to player '''
self.soco.stop()
for pos, item in enumerate(queue_items):
self.soco.add_uri_to_queue(item.uri, last_index+pos)
+ async def __report_progress(self):
+ ''' report current progress while playing '''
+ # sonos does not send instant updates of the player's progress (cur_time)
+ # so we need to update it periodically ourselves
+ while self._state == PlayerState.Playing:
+ time_diff = time.time() - self.media_position_updated_at
+ adjusted_current_time = self._cur_time + time_diff
+ self.cur_time = adjusted_current_time
+ await asyncio.sleep(1)
+ self.__sonos_report_progress_task = None
+
def _update_state(self, event=None):
''' update state, triggerer by event '''
if event:
return
if self.soco.is_playing_tv or self.soco.is_playing_line_in:
self.powered = False
- else:
- new_state = self.__convert_state(current_transport_state)
- self.state = new_state
- track_info = self.soco.get_current_track_info()
- self.cur_uri = track_info["uri"]
- position_info = self.soco.avTransport.GetPositionInfo(
- [("InstanceID", 0), ("Channel", "Master")])
- rel_time = self.__timespan_secs(position_info.get("RelTime"))
- self.cur_time = rel_time
+ return
+ new_state = self.__convert_state(current_transport_state)
+ self.state = new_state
+ track_info = self.soco.get_current_track_info()
+ self.cur_uri = track_info["uri"]
+ position_info = self.soco.avTransport.GetPositionInfo(
+ [("InstanceID", 0), ("Channel", "Master")])
+ rel_time = self.__timespan_secs(position_info.get("RelTime"))
+ self.cur_time = rel_time
+ if self._state == PlayerState.Playing and self.__sonos_report_progress_task == None:
+ self.__sonos_report_progress_task = self.mass.create_task(self.__report_progress())
@staticmethod
def __convert_state(sonos_state):
async def setup(self):
''' perform async setup '''
- self.mass.event_loop.create_task(
+ self.mass.create_task(
self.__periodic_discovery())
@run_periodic(1800)
async def __periodic_discovery(self):
''' run sonos discovery on interval '''
- await self.run_discovery()
+ self.mass.event_loop.run_in_executor(None, self.run_discovery)
- async def run_discovery(self):
+ def run_discovery(self):
''' background sonos discovery and handler '''
if self._discovery_running:
return
LOGGER.debug("Sonos discovery started...")
import soco
discovered_devices = soco.discover()
+ if discovered_devices == None:
+ discovered_devices = []
new_device_ids = [item.uid for item in discovered_devices]
cur_player_ids = [item.player_id for item in self.players]
# remove any disconnected players...
for player in self.players:
if not player.is_group and not player.soco.uid in new_device_ids:
- await self.remove_player(player.player_id)
+ self.mass.create_task(self.remove_player(player.player_id))
# process new players
for device in discovered_devices:
if device.uid not in cur_player_ids and device.is_visible:
- await self.__device_discovered(device)
+ self.__device_discovered(device)
# handle groups
if len(discovered_devices) > 0:
- await self.__process_groups(discovered_devices[0].all_groups)
+ self.__process_groups(discovered_devices[0].all_groups)
else:
- await self.__process_groups([])
+ self.__process_groups([])
- async def __device_discovered(self, soco_device):
- '''handle new player '''
+ def __device_discovered(self, soco_device):
+ '''handle new sonos player '''
player = SonosPlayer(self.mass, soco_device.uid, self.prov_id)
player.soco = soco_device
player.name = soco_device.player_name
subscribe(soco_device.avTransport, player._update_state)
subscribe(soco_device.renderingControl, player._update_state)
subscribe(soco_device.zoneGroupTopology, self.__topology_changed)
- return await self.add_player(player)
+ self.mass.create_task(self.add_player(player))
+ return player
- async def __process_groups(self, sonos_groups):
+ def __process_groups(self, sonos_groups):
''' process all sonos groups '''
all_group_ids = []
for group in sonos_groups:
all_group_ids.append(group.uid)
if group.uid not in self.mass.players._players:
# new group player
- group_player = await self.__device_discovered(group.coordinator)
+ group_player = self.__device_discovered(group.coordinator)
else:
- group_player = await self.get_player(group.uid)
+ group_player = self.mass.players.get_player_sync(group.uid)
# check members
group_player.name = group.label
group_player.group_childs = [item.uid for item in group.members]
from one of the sonos players
schedule discovery to work out the changes
'''
- self.mass.event_loop.create_task(self.run_discovery())
+ self.mass.event_loop.run_in_executor(None, self.run_discovery)
class _ProcessSonosEventQueue:
"""Queue like object for dispatching sonos events."""
async def setup(self):
''' async initialize of module '''
# start slimproto server
- self.mass.event_loop.create_task(
+ self.mass.create_task(
asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483))
# setup discovery
- self.mass.event_loop.create_task(self.start_discovery())
+ self.mass.create_task(self.start_discovery())
async def start_discovery(self):
transport, protocol = await self.mass.event_loop.create_datagram_endpoint(
player_id = str(device_mac).lower()
device_type = devices.get(dev_id, 'unknown device')
player = PySqueezePlayer(self.mass, player_id, self.prov_id, device_type, writer)
- self.mass.event_loop.create_task(self.mass.players.add_player(player))
+ self.mass.create_task(self.mass.players.add_player(player))
elif player != None:
player.process_msg(operation, packet)
self.send_frame(b"setd", struct.pack("B", 4))
# TODO: remember last volume and power state
- self.mass.event_loop.create_task(self.volume_set(40))
- self.mass.event_loop.create_task(self.power_off())
+ self.mass.create_task(self.volume_set(40))
+ self.mass.create_task(self.power_off())
self._heartbeat_task = asyncio.create_task(self.__send_heartbeat())
async def cmd_stop(self):
''' send command to Squeeze player'''
packet = struct.pack('!H', len(data) + 4) + command + data
self._writer.write(packet)
- self.mass.event_loop.create_task(self._writer.drain())
+ self.mass.create_task(self._writer.drain())
def send_version(self):
self.send_frame(b'vers', b'7.8')
LOGGER.debug("Decoder Ready for next track")
next_item = self.queue.next_item
if next_item:
- self.mass.event_loop.create_task(
+ self.mass.create_task(
self.__send_play(next_item.uri))
def stat_STMe(self, data):
if value is None:
value = ''
elif len(value) > 255:
- LOGGER.warning("Response %s too long, truncating to 255 bytes" % typ)
+ # Response too long, truncating to 255 bytes
value = value[:255]
parts.extend((typ, chr(len(value)), value))
self.packet = ''.join(parts)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
def connection_lost(self, *args, **kwargs):
- LOGGER.warning("Connection lost to discovery")
+ LOGGER.debug("Connection lost to discovery")
def build_TLV_response(self, requestdata):
responsedata = OrderedDict()
try:
data = data.decode()
dgram = Datagram.decode(data)
- LOGGER.debug("Data received from %s: %s" % (addr, dgram))
if isinstance(dgram, ClientDiscoveryDatagram):
self.sendDiscoveryResponse(addr)
elif isinstance(dgram, TLVDiscoveryRequestDatagram):
def sendDiscoveryResponse(self, addr):
dgram = DiscoveryResponseDatagram(get_hostname(), 3483)
- LOGGER.debug("Sending discovery response %r" % (dgram.packet,))
self.transport.sendto(dgram.packet.encode(), addr)
def sendTLVDiscoveryResponse(self, resonsedata, addr):
dgram = TLVDiscoveryResponseDatagram(resonsedata)
- LOGGER.debug("Sending discovery response %r" % (dgram.packet,))
self.transport.sendto(dgram.packet.encode(), addr)
+++ /dev/null
-#!/usr/bin/env python3
-# -*- coding:utf-8 -*-
-
-import asyncio
-import os
-import struct
-from collections import OrderedDict
-import time
-import decimal
-from typing import List
-import random
-import sys
-import socket
-from ..utils import run_periodic, LOGGER, parse_track_title, try_parse_int, get_ip, get_hostname
-from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist
-from ..constants import CONF_ENABLED
-
-
-PROV_ID = 'web'
-PROV_NAME = 'WebPlayer'
-PROV_CLASS = 'WebPlayerProvider'
-
-CONFIG_ENTRIES = [
- (CONF_ENABLED, True, CONF_ENABLED),
- ]
-
-PLAYER_CONFIG_ENTRIES = []
-
-EVENT_WEBPLAYER_CMD = 'webplayer command'
-EVENT_WEBPLAYER_STATE = 'webplayer state'
-EVENT_WEBPLAYER_REGISTER = 'webplayer register'
-
-class WebPlayerProvider(PlayerProvider):
- '''
- Implementation of a player using pure HTML/javascript
- used in the front-end.
- Communication is handled through the websocket connection
- and our internal event bus
- '''
-
- def __init__(self, mass, conf):
- super().__init__(mass, conf)
- self.prov_id = PROV_ID
- self.name = PROV_NAME
- self.player_config_entries = PLAYER_CONFIG_ENTRIES
-
- ### Provider specific implementation #####
-
- async def setup(self):
- ''' async initialize of module '''
- await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_STATE)
- await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_REGISTER)
- self.mass.event_loop.create_task(self.check_players())
-
- async def handle_mass_event(self, msg, msg_details):
- ''' received event for the webplayer component '''
- #print("%s ---> %s" %(msg, msg_details))
- if msg == EVENT_WEBPLAYER_REGISTER:
- # register new player
- player_id = msg_details['player_id']
- player = WebPlayer(self.mass, player_id, self.prov_id)
- player.supports_crossfade = False
- player.supports_gapless = False
- player.supports_queue = False
- player.name = msg_details['name']
- await self.add_player(player)
- elif msg == EVENT_WEBPLAYER_STATE:
- player_id = msg_details['player_id']
- player = await self.get_player(player_id)
- if player:
- await player.handle_state(msg_details)
-
- @run_periodic(30)
- async def check_players(self):
- ''' invalidate players that did not send a heartbeat message in a while '''
- cur_time = time.time()
- offline_players = []
- for player in self.players:
- if cur_time - player._last_message > 30:
- offline_players.append(player.player_id)
- for player_id in offline_players:
- await self.remove_player(player_id)
-
-
-class WebPlayer(Player):
- ''' Web player object '''
-
- def __init__(self, mass, player_id, prov_id):
- self._last_message = time.time()
- super().__init__(mass, player_id, prov_id)
-
- async def cmd_stop(self):
- ''' send stop command to player '''
- data = { 'player_id': self.player_id, 'cmd': 'stop'}
- await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def cmd_play(self):
- ''' send play command to player '''
- data = { 'player_id': self.player_id, 'cmd': 'play'}
- await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def cmd_pause(self):
- ''' send pause command to player '''
- data = { 'player_id': self.player_id, 'cmd': 'pause'}
- await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def cmd_power_on(self):
- ''' send power ON command to player '''
- self.powered = True # not supported on webplayer
- data = { 'player_id': self.player_id, 'cmd': 'stop'}
- await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def cmd_power_off(self):
- ''' send power OFF command to player '''
- self.powered = False
-
- async def cmd_volume_set(self, volume_level):
- ''' send new volume level command to player '''
- data = { 'player_id': self.player_id, 'cmd': 'volume_set', 'volume_level': volume_level}
- await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def cmd_volume_mute(self, is_muted=False):
- ''' send mute command to player '''
- data = { 'player_id': self.player_id, 'cmd': 'volume_mute', 'is_muted': is_muted}
- await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def cmd_play_uri(self, uri:str):
- ''' play single uri on player '''
- data = { 'player_id': self.player_id, 'cmd': 'play_uri', 'uri': uri}
- await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def handle_state(self, data):
- ''' handle state event from player '''
- if 'volume_level' in data:
- self.volume_level = data['volume_level']
- if 'muted' in data:
- self.muted = data['muted']
- if 'state' in data:
- self.state = PlayerState(data['state'])
- if 'cur_time' in data:
- self.cur_time = data['cur_time']
- if 'cur_uri' in data:
- self.cur_uri = data['cur_uri']
- if 'powered' in data:
- self.powered = data['powered']
- if 'name' in data:
- self.name = data['name']
- self._last_message = time.time()
-
-
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+
+import asyncio
+import os
+import struct
+from collections import OrderedDict
+import time
+import decimal
+from typing import List
+import random
+import sys
+import socket
+from ..utils import run_periodic, LOGGER, parse_track_title, try_parse_int, get_ip, get_hostname
+from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist
+from ..constants import CONF_ENABLED
+
+
+PROV_ID = 'webplayer'
+PROV_NAME = 'WebPlayer'
+PROV_CLASS = 'WebPlayerProvider'
+
+CONFIG_ENTRIES = [
+ (CONF_ENABLED, True, CONF_ENABLED),
+ ]
+
+PLAYER_CONFIG_ENTRIES = []
+
+EVENT_WEBPLAYER_CMD = 'webplayer command'
+EVENT_WEBPLAYER_STATE = 'webplayer state'
+EVENT_WEBPLAYER_REGISTER = 'webplayer register'
+
+class WebPlayerProvider(PlayerProvider):
+ '''
+ Implementation of a player using pure HTML/javascript
+ used in the front-end.
+ Communication is handled through the websocket connection
+ and our internal event bus
+ '''
+
+ def __init__(self, mass, conf):
+ super().__init__(mass, conf)
+ self.prov_id = PROV_ID
+ self.name = PROV_NAME
+ self.player_config_entries = PLAYER_CONFIG_ENTRIES
+
+ ### Provider specific implementation #####
+
+ async def setup(self):
+ ''' async initialize of module '''
+ await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_STATE)
+ await self.mass.add_event_listener(self.handle_mass_event, EVENT_WEBPLAYER_REGISTER)
+ self.mass.create_task(self.check_players())
+
+ async def handle_mass_event(self, msg, msg_details):
+ ''' received event for the webplayer component '''
+ if msg == EVENT_WEBPLAYER_REGISTER:
+ # register new player
+ player_id = msg_details['player_id']
+ player = WebPlayer(self.mass, player_id, self.prov_id)
+ player.supports_crossfade = False
+ player.supports_gapless = False
+ player.supports_queue = False
+ player.name = msg_details['name']
+ await self.add_player(player)
+ elif msg == EVENT_WEBPLAYER_STATE:
+ player_id = msg_details['player_id']
+ player = await self.get_player(player_id)
+ if player:
+ await player.handle_state(msg_details)
+
+ @run_periodic(30)
+ async def check_players(self):
+ ''' invalidate players that did not send a heartbeat message in a while '''
+ cur_time = time.time()
+ offline_players = []
+ for player in self.players:
+ if cur_time - player._last_message > 30:
+ offline_players.append(player.player_id)
+ for player_id in offline_players:
+ await self.remove_player(player_id)
+
+
+class WebPlayer(Player):
+ ''' Web player object '''
+
+ def __init__(self, mass, player_id, prov_id):
+ self._last_message = time.time()
+ super().__init__(mass, player_id, prov_id)
+
+ async def cmd_stop(self):
+ ''' send stop command to player '''
+ data = { 'player_id': self.player_id, 'cmd': 'stop'}
+ await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def cmd_play(self):
+ ''' send play command to player '''
+ data = { 'player_id': self.player_id, 'cmd': 'play'}
+ await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def cmd_pause(self):
+ ''' send pause command to player '''
+ data = { 'player_id': self.player_id, 'cmd': 'pause'}
+ await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def cmd_power_on(self):
+ ''' send power ON command to player '''
+ self.powered = True # not supported on webplayer
+ data = { 'player_id': self.player_id, 'cmd': 'stop'}
+ await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def cmd_power_off(self):
+ ''' send power OFF command to player '''
+ self.powered = False
+
+ async def cmd_volume_set(self, volume_level):
+ ''' send new volume level command to player '''
+ data = { 'player_id': self.player_id, 'cmd': 'volume_set', 'volume_level': volume_level}
+ await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def cmd_volume_mute(self, is_muted=False):
+ ''' send mute command to player '''
+ data = { 'player_id': self.player_id, 'cmd': 'volume_mute', 'is_muted': is_muted}
+ await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def cmd_play_uri(self, uri:str):
+ ''' play single uri on player '''
+ data = { 'player_id': self.player_id, 'cmd': 'play_uri', 'uri': uri}
+ await self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def handle_state(self, data):
+ ''' handle state event from player '''
+ if 'volume_level' in data:
+ self.volume_level = data['volume_level']
+ if 'muted' in data:
+ self.muted = data['muted']
+ if 'state' in data:
+ self.state = PlayerState(data['state'])
+ if 'cur_time' in data:
+ self.cur_time = data['cur_time']
+ if 'cur_uri' in data:
+ self.cur_uri = data['cur_uri']
+ if 'powered' in data:
+ self.powered = data['powered']
+ if 'name' in data:
+ self.name = data['name']
+ self._last_message = time.time()
+
+
keepcharacters = (' ','.','_')
return "".join(c for c in string if c.isalnum() or c in keepcharacters).rstrip()
-def run_background_task(executor, corofn, *args):
+def run_background_task(corofn, *args, executor=None):
''' run non-async task in background '''
return asyncio.get_event_loop().run_in_executor(executor, corofn, *args)
async def setup(self):
''' perform async setup '''
- self.http_session = aiohttp.ClientSession()
app = web.Application()
app.add_routes([web.get('/jsonrpc.js', self.json_rpc)])
app.add_routes([web.post('/jsonrpc.js', self.json_rpc)])
},
switchPlayer (new_player_id) {
this.active_player_id = new_player_id;
+ localStorage.setItem('active_player_id', new_player_id);
},
setPlayerVolume: function(player_id, new_volume) {
this.players[player_id].volume_level = new_volume;
}
},
createAudioPlayer(data) {
- if (navigator.userAgent.includes("WebKit"))
- return // streaming flac not supported on webkit ?!
+ if (!navigator.userAgent.includes("Chrome"))
+ return // streaming flac only supported on chrome browser
if (localStorage.getItem('audio_player_id'))
// get player id from local storage
this.audioPlayerId = localStorage.getItem('audio_player_id');
}
// select new active player
- // TODO: store previous player in local storage
- if (!this.active_player_id || !this.players[this.active_player_id].enabled)
- for (var player_id in this.players)
- if (this.players[player_id].state == 'playing' && this.players[player_id].enabled) {
- // prefer the first playing player
- this.active_player_id = player_id;
- break;
- }
- if (!this.active_player_id || !this.players[this.active_player_id].enabled)
- for (var player_id in this.players) {
- // fallback to just the first player
- if (this.players[player_id].enabled)
- {
- this.active_player_id = player_id;
- break;
- }
+ if (!this.active_player_id || !this.players[this.active_player_id].enabled) {
+ // prefer last selected player
+ last_player = localStorage.getItem('active_player_id')
+ if (last_player && this.players[last_player] && this.players[last_player].enabled)
+ this.active_player_id = last_player;
+ else
+ {
+ // prefer the first playing player
+ for (var player_id in this.players)
+ if (this.players[player_id].state == 'playing' && this.players[player_id].enabled && this.players[player_id].group_parents.length == 0) {
+ this.active_player_id = player_id;
+ break;
+ }
+ // fallback to just the first player
+ if (!this.active_player_id || !this.players[this.active_player_id].enabled)
+ for (var player_id in this.players) {
+ if (this.players[player_id].enabled && this.players[player_id].group_parents.length == 0)
+ {
+ this.active_player_id = player_id;
+ break;
+ }
+ }
}
+ }
}.bind(this);
this.ws.onclose = function(e) {
file: "Filesystem",
chromecast: "Chromecast",
squeezebox: "Squeezebox support",
+ sonos: "Sonos",
+ webplayer: "Web Player (Chrome browser only)",
username: "Username",
password: "Password",
hostname: "Hostname (or IP)",
file: "Bestandssysteem",
chromecast: "Chromecast",
squeezebox: "Squeezebox ondersteuning",
+ sonos: "Sonos",
+ webplayer: "Web Player (alleen Chrome browser)",
username: "Gebruikersnaam",
password: "Wachtwoord",
hostname: "Hostnaam (of IP)",