#!/usr/bin/env python3
# -*- coding:utf-8 -*-
-# import os, sys; sys.path.append(os.path.dirname(os.path.realpath(__file__)))
-
import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor
import uuid
import json
import time
-# import stackimpact
-
-# __package__ = 'music_assistant'
from .database import Database
from .utils import run_periodic, LOGGER
def handle_exception(loop, context):
# context["message"] will always be there; but context["exception"] may not
msg = context.get("exception", context["message"])
- LOGGER.error(f"Caught exception: {msg}")
+ LOGGER.exception(f"Caught exception: {msg}")
class MusicAssistant():
self.datapath = datapath
self.parse_config()
self.event_loop = asyncio.get_event_loop()
+ self.event_loop.set_debug(True)
self.bg_executor = ThreadPoolExecutor()
self.event_loop.set_default_executor(self.bg_executor)
- self.event_loop.set_exception_handler(handle_exception)
+ #self.event_loop.set_exception_handler(handle_exception)
self.event_listeners = {}
# init database and metadata modules
self.player = PlayerManager(self)
self.http_streamer = HTTPStreamer(self)
- # agent = stackimpact.start(
- # agent_key = '4a00b6f2c7da20f692807d204ab3760318978ba3',
- # app_name = 'MusicAssistant')
- # print("profiler started...")
-
# start the event loop
try:
self.event_loop.run_forever()
except (KeyboardInterrupt, SystemExit):
LOGGER.info('Exit requested!')
- self.signal_event("system_shutdown")
+ self.event_loop.create_task(self.signal_event("system_shutdown"))
self.event_loop.stop()
self.save_config()
time.sleep(5)
self.event_loop.close()
LOGGER.info('Shutdown complete.')
- def signal_event(self, msg, msg_details=None):
+ async def signal_event(self, msg, msg_details=None):
''' signal (systemwide) event '''
LOGGER.debug("Event: %s - %s" %(msg, msg_details))
listeners = list(self.event_listeners.values())
for callback, eventfilter in listeners:
if not eventfilter or eventfilter in msg:
- if not asyncio.iscoroutinefunction(callback):
- callback(msg, msg_details)
- else:
- self.event_loop.create_task(callback(msg, msg_details))
+ self.event_loop.create_task(callback(msg, msg_details))
- def add_event_listener(self, cb, eventfilter=None):
+ async def add_event_listener(self, cb, eventfilter=None):
''' add callback to our event listeners '''
cb_id = str(uuid.uuid4())
self.event_listeners[cb_id] = (cb, eventfilter)
return cb_id
- def remove_event_listener(self, cb_id):
+ async def remove_event_listener(self, cb_id):
''' remove callback from our event listeners '''
self.event_listeners.pop(cb_id, None)
else:
self._use_ssl = False
self._host = url.replace('http://','').split('/')[0]
- self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
self.__send_ws = None
self.__last_id = 10
LOGGER.info('Homeassistant integration is enabled')
+ self.mass.event_loop.create_task(self.setup())
+
+ async def setup(self):
+ ''' perform async setup '''
+ self.http_session = aiohttp.ClientSession(
+ loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
mass.event_loop.create_task(self.__hass_websocket())
- self.mass.add_event_listener(self.mass_event, "player updated")
+ await self.mass.add_event_listener(self.mass_event, "player updated")
mass.event_loop.create_task(self.__get_sources())
async def get_state(self, entity_id, attribute='state', register_listener=None):
import aiohttp
from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
from .models.media_types import TrackQuality, MediaType
-from .models.player import PlayerState
+from .models.playerstate import PlayerState
class HTTPStreamer():
run_async_background_task(
self.mass.bg_executor,
self.__stream_queue, player, queue, cancelled)
- await asyncio.sleep(2)
try:
while True:
chunk = await queue.get()
chunk = await sox_proc.stdout.read(256000)
if not chunk:
break
- await buffer.put(chunk)
- await buffer.put(b'') # indicate EOF
+ asyncio.run_coroutine_threadsafe(
+ buffer.put(chunk),
+ self.mass.event_loop)
+ # indicate EOF if no more data
+ asyncio.run_coroutine_threadsafe(
+ buffer.put(b''),
+ self.mass.event_loop)
asyncio.create_task(fill_buffer())
- LOGGER.info("Start Queue Stream for player %s" %(player.name))
+ LOGGER.info("Start Queue Stream for player %s " %(player.name))
+ is_start = True
last_fadeout_data = b''
- # report start of queue playback so we can calculate current track/duration etc.
- # self.mass.event_loop.create_task(self.mass.player.player_queue_stream_update(player_id, queue_index, True))
while True:
# get the (next) track in queue
- queue_track = player.queue.next_item
- LOGGER.info("got queue track %s" % queue_track.name)
+ if is_start:
+ # report start of queue playback so we can calculate current track/duration etc.
+ queue_track = asyncio.run_coroutine_threadsafe(
+ player.queue.start_queue_stream(),
+ self.mass.event_loop).result()
+ is_start = False
+ else:
+ queue_track = player.queue.next_item
if not queue_track:
+ LOGGER.warning("no (more) tracks left in queue")
break
- LOGGER.debug("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
+ LOGGER.info("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
+ LOGGER.info(player.state)
fade_in_part = b''
cur_chunk = 0
prev_chunk = None
# wait for the queue to consume the data
# this prevents that the entire track is sitting in memory
# and it helps a bit in the quest to follow where we are in the queue
- while buffer.qsize() > 1 and not cancelled.is_set():
+ while buffer.qsize() > 2 and not cancelled.is_set():
await asyncio.sleep(1)
# end of the track reached
if cancelled.is_set():
# WIP: update actual duration to the queue for more accurate now playing info
accurate_duration = bytes_written / int(sample_rate * 4 * 2)
queue_track.duration = accurate_duration
- #self.mass.player.providers[player.player_provider]._player_queue[player_id][queue_index] = queue_track
- # move to next queue index
- #queue_index += 1
- #self.mass.event_loop.create_task(self.mass.player.player_queue_stream_update(player_id, queue_index, False))
LOGGER.info("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
LOGGER.info("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
- break
# end of queue reached, pass last fadeout bits to final output
if last_fadeout_data and not cancelled.is_set():
sox_proc.stdin.write(last_fadeout_data)
streamdetails["provider"] = queue_item.provider
streamdetails["track_id"] = queue_item.item_id
streamdetails["player_id"] = player.player_id
- self.mass.signal_event('streaming_started', streamdetails)
+ asyncio.run_coroutine_threadsafe(
+ self.mass.signal_event('streaming_started', streamdetails), self.mass.event_loop)
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
prev_chunk = b''
bytes_sent = 0
while not process.stdout.at_eof():
- if cancelled.is_set():
- process.terminate()
try:
chunk = await process.stdout.readexactly(chunksize)
except asyncio.streams.IncompleteReadError:
bytes_sent += len(prev_chunk)
await process.wait()
if cancelled.is_set():
+ try:
+ process.terminate()
+ except ProcessLookupError:
+ pass
LOGGER.warning("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent))
else:
LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent))
bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2
seconds_streamed = int(bytes_sent/bytes_per_second)
streamdetails["seconds"] = seconds_streamed
- self.mass.signal_event('streaming_ended', streamdetails)
+ asyncio.run_coroutine_threadsafe(
+ self.mass.signal_event('streaming_ended', streamdetails),
+ self.mass.event_loop)
# send task to background to analyse the audio
- self.mass.event_loop.create_task(self.__analyze_audio(queue_item.item_id, queue_item.provider))
+ asyncio.run_coroutine_threadsafe(
+ self.__analyze_audio(queue_item.item_id, queue_item.provider),
+ self.mass.event_loop)
async def __get_player_sox_options(self, player, queue_item):
''' get player specific sox effect options '''
def __init__(self, event_loop, cache):
self.event_loop = event_loop
self.cache = cache
- self.http_session = aiohttp.ClientSession(loop=event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
+ self.event_loop.create_task(self.setup())
+
+ async def setup(self):
+ ''' perform async setup '''
+ self.http_session = aiohttp.ClientSession(
+ loop=self.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
self.throttler = Throttler(rate_limit=1, period=1)
async def search_artist_by_album(self, artistname, albumname=None, album_upc=None):
result = None
return result
-
class FanartTv():
def __init__(self, event_loop, cache):
self.event_loop = event_loop
self.cache = cache
- self.http_session = aiohttp.ClientSession(loop=event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
+ self.event_loop.create_task(self.setup())
+
+ async def setup(self):
+ ''' perform async setup '''
+ self.http_session = aiohttp.ClientSession(
+ loop=self.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
self.throttler = Throttler(rate_limit=1, period=1)
async def artist_images(self, mb_artist_id):
from .media_types import *
-from .musicprovider import *
-from .player_queue import *
-from .player import *
-from .playerprovider import *
\ No newline at end of file
+from .musicprovider import MusicProvider
+from .player_queue import QueueItem, PlayerQueue
+from .player import Player
+from .playerstate import PlayerState
+from .playerprovider import PlayerProvider
\ No newline at end of file
from ..cache import use_cache
from .media_types import Track, MediaType
from .player_queue import PlayerQueue, QueueItem
+from .playerstate import PlayerState
-class PlayerState(str, Enum):
- Off = "off"
- Stopped = "stopped"
- Paused = "paused"
- Playing = "playing"
-
class Player():
''' representation of a player '''
return hass_state != 'off'
# mute as power
elif self.settings.get('mute_as_power'):
- return self.muted
+ return not self.muted
else:
return self._powered
''' [PROTECTED] cur_time (player's elapsed time) property of this player '''
# handle group player
if self.group_parent:
- group_player = self.mass.bg_executor.submit(asyncio.run,
- self.mass.player.get_player(self.group_parent)).result()
+ group_player = self.mass.player.get_player_sync(self.group_parent)
if group_player:
return group_player.cur_time
- return self._cur_time
+ return self.queue.cur_item_time
@cur_time.setter
def cur_time(self, cur_time:int):
''' [PROTECTED] cur_uri (uri loaded in player) property of this player '''
# handle group player
if self.group_parent:
- group_player = self.mass.bg_executor.submit(asyncio.run,
- self.mass.player.get_player(self.group_parent)).result()
+ group_player = self.mass.player.get_player_sync(self.group_parent)
if group_player:
return group_player.cur_uri
return self._cur_uri
return []
return [item for item in self.mass.player.players if item.group_parent == self.player_id]
- @property
- def settings(self):
- ''' [PROTECTED] get the player config settings '''
- player_settings = self.mass.config['player_settings'].get(self.player_id)
- if not player_settings:
- player_settings = self.mass.bg_executor.submit(asyncio.run,
- self.__update_player_settings()).result()
- return player_settings
-
@property
def enabled(self):
''' [PROTECTED] player enabled config setting '''
''' [PROTECTED] player's queue '''
# handle group player
if self.group_parent:
- group_player = self.mass.bg_executor.submit(asyncio.run,
- self.mass.player.get_player(self.group_parent)).result()
+ group_player = self.mass.player.get_player_sync(self.group_parent)
if group_player:
return group_player.queue
return self._queue
''' [PROTECTED] send stop command to player '''
if self.group_parent:
# redirect playback related commands to parent player
- group_player = await self.mass.player.get(self.group_parent)
+ group_player = await self.mass.player.get_player(self.group_parent)
if group_player:
return await group_player.stop()
else:
async def volume_up(self):
''' [PROTECTED] send volume up command to player '''
new_level = self.volume_level + 1
+ if new_level > 100:
+ new_level = 100
return await self.volume_set(new_level)
async def volume_down(self):
async def update(self):
''' [PROTECTED] signal player updated '''
- await self.__update_player_settings()
- LOGGER.info("player updated: %s" % self.name)
- self.mass.signal_event('player changed', self)
+ await self.queue.update()
+ LOGGER.debug("player updated: %s" % self.name)
+ await self.mass.signal_event('player changed', self)
- async def __update_player_settings(self):
+ @property
+ def settings(self):
''' [PROTECTED] get (or create) player config settings '''
+ player_settings = self.mass.config['player_settings'].get(self.player_id,{})
+ if player_settings:
+ return player_settings
+ # generate config for the player
config_entries = [ # default config entries for a player
("enabled", True, "player_enabled"),
("name", "", "player_name"),
("crossfade_duration", 0, "crossfade_duration"),
]
# append player specific settings
- config_entries += await self.mass.player.providers[self._prov_id].get_player_config_entries()
+ config_entries += self.mass.player.providers[self._prov_id].player_config_entries
if self.is_group or not self.group_parent:
config_entries += [ # play on power on setting
("play_power_on", False, "player_power_play"),
config_entries += [("hass_power_entity", "", "hass_player_power"),
("hass_power_entity_source", "", "hass_player_source"),
("hass_volume_entity", "", "hass_player_volume")]
- player_settings = self.mass.config['player_settings'].get(self.player_id,{})
for key, def_value, desc in config_entries:
if not key in player_settings:
if (isinstance(def_value, str) and def_value.startswith('<')):
self.mass.config['player_settings'][self.player_id]['__desc__'] = config_entries
return player_settings
- @property
- def __dict__(self):
+ def to_dict(self):
''' instance attributes as dict so it can be serialized to json '''
return {
"player_id": self.player_id,
from ..utils import LOGGER
from ..constants import CONF_ENABLED
from .media_types import Track, TrackQuality
+from .playerstate import PlayerState
class QueueItem(Track):
self.queue_item_id = str(uuid.uuid4())
# if existing media_item given, load those values
if media_item:
- for attribute, value in media_item.__dict__.items():
- setattr(self, attribute, value)
+ for key, value in media_item.__dict__.items():
+ setattr(self, key, value)
class PlayerQueue():
'''
self._items = []
self._shuffle_enabled = True
self._repeat_enabled = False
- self._cur_index = None
+ self._cur_index = 0
+ self._cur_item_time = 0
+ self._last_index = 0
@property
def shuffle_enabled(self):
@property
def crossfade_enabled(self):
- return self._player.settings['crossfade_duration']
+ return self._player.settings.get('crossfade_duration', 0) > 0
@property
def gapless_enabled(self):
@property
def cur_index(self):
''' match current uri with queue items to determine queue index '''
- for index, queue_item in enumerate(self.items):
- if queue_item.uri == self._player.cur_uri:
- return index
return self._cur_index
@property
def cur_item(self):
- if self.cur_index == None:
+ if self.cur_index == None or not self.items or len(self.items) < self.cur_index:
return None
- return self.mass.bg_executor.submit(asyncio.run,self.get_item(self.cur_index)).result()
+ return self.items[self.cur_index]
+ @property
+ def cur_item_time(self):
+ if self.use_queue_stream:
+ return self._cur_item_time
+ else:
+ return self._player._cur_time
+
@property
def next_index(self):
'''
'''
return the next item in the queue
'''
- return self.mass.bg_executor.submit(
- asyncio.run, self.get_item(self.next_index)).result()
+ if self.next_index != None:
+ return self.items[self.next_index]
+ return None
@property
def items(self):
if not len(self.items) > index:
return
if self.use_queue_stream:
- self._cur_index = index -1
+ self._cur_index = index
queue_stream_uri = 'http://%s:%s/stream/%s'% (
self.mass.web.local_ip, self.mass.web.http_port, self._player.player_id)
return await self._player.cmd_play_uri(queue_stream_uri)
if self._shuffle_enabled:
queue_items = await self.__shuffle_items(queue_items)
self._items = queue_items
- self._cur_index = None
+ self._cur_index = 0
if self.use_queue_stream or not self._player.supports_queue:
return await self.play_index(0)
else:
if self._player.supports_queue:
return await self._player.cmd_queue_append(queue_items)
+ async def update(self):
+ ''' update queue details, called when player updates '''
+ if self.use_queue_stream and self._player.state == PlayerState.Playing:
+ # determine queue index and cur_time for queue stream
+ # player is playing a constant stream of the queue so we need to do this the hard way
+ cur_time_queue = self._player._cur_time
+ total_time = 0
+ track_time = 0
+ if self.items:
+ queue_index = self._last_index # holds the last starting position
+ queue_track = None
+ while True:
+ queue_track = self.items[queue_index]
+ if cur_time_queue > (queue_track.duration + total_time):
+ total_time += queue_track.duration
+ queue_index += 1
+ else:
+ track_time = cur_time_queue - total_time
+ break
+ self._cur_index = queue_index
+ self._cur_item_time = track_time
+ elif not self.use_queue_stream:
+ # normal queue based approach
+ cur_index = 0
+ for index, queue_item in enumerate(self.items):
+ if queue_item.uri == self._player.cur_uri:
+ cur_index = index
+ break
+ self._cur_index = cur_index
+
+ async def start_queue_stream(self):
+ ''' called by the queue streamer when it starts playing the queue stream '''
+ self._last_index = self.cur_index
+ return await self.get_item(self.cur_index)
+
async def __shuffle_items(self, queue_items):
''' shuffle a list of tracks '''
# for now we use default python random function
self.mass = mass
self.name = 'My great Musicplayer provider' # display name
self.prov_id = 'my_provider' # used as id
+ self.player_config_entries = [] # player specific config entries
### Common methods and properties ####
- async def get_player_config_entries(self):
- ''' [CAN OVERRIDE] get the player-specific config entries for this provider (list with key/value pairs)'''
- return []
@property
def players(self):
''' return all players for this provider '''
- return self.mass.bg_executor.submit(asyncio.run,
- self.mass.player.get_provider_players(self.prov_id)).result()
-
+ return [item for item in self.mass.player.players if item.player_provider == self.prov_id]
+
async def get_player(self, player_id:str):
''' return player by id '''
return await self.mass.player.get_player(player_id)
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+
+from enum import Enum
+
+class PlayerState(str, Enum):
+ Off = "off"
+ Stopped = "stopped"
+ Paused = "paused"
+ Playing = "playing"
self.prov_id = 'qobuz'
self.mass = mass
self.cache = mass.cache
- self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
self.__username = username
self.__password = password
self.__user_auth_info = None
self.__logged_in = False
- self.throttler = Throttler(rate_limit=2, period=1)
- mass.add_event_listener(self.mass_event, 'streaming_started')
- mass.add_event_listener(self.mass_event, 'streaming_ended')
+ self.mass.event_loop.create_task(self.setup())
+ async def setup(self):
+ ''' perform async setup '''
+ self.http_session = aiohttp.ClientSession(
+ loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
+ self.throttler = Throttler(rate_limit=2, period=1)
+ await self.mass.add_event_listener(self.mass_event, 'streaming_started')
+ await self.mass.add_event_listener(self.mass_event, 'streaming_ended')
+
async def search(self, searchstring, media_types=List[MediaType], limit=5):
''' perform search on the provider '''
result = {
self._cur_user = None
self.mass = mass
self.cache = mass.cache
- self.http_session = aiohttp.ClientSession(loop=mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
- self.throttler = Throttler(rate_limit=1, period=1)
self._username = username
self._password = password
self.__auth_token = {}
+ self.mass.event_loop.create_task(self.setup())
+
+ async def setup(self):
+ ''' perform async setup '''
+ self.http_session = aiohttp.ClientSession(
+ loop=self.mass.event_loop, connector=aiohttp.TCPConnector(verify_ssl=False))
+ self.throttler = Throttler(rate_limit=1, period=1)
async def search(self, searchstring, media_types=List[MediaType], limit=5):
''' perform search on the provider '''
from .utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task
from .models.media_types import MediaType, TrackQuality
from .models.player_queue import QueueItem
-from .models.player import PlayerState
+from .models.playerstate import PlayerState
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODULES_PATH = os.path.join(BASE_DIR, "playerproviders" )
self._players = {}
# dynamically load provider modules
self.load_providers()
-
+
@property
def players(self):
- ''' all players as property '''
- return self.mass.bg_executor.submit(asyncio.run,
- self.get_players()).result()
-
- async def get_players(self):
- ''' return all players as a list '''
- items = list(self._players.values())
- items.sort(key=lambda x: x.name, reverse=False)
- return items
+ ''' return list of all players '''
+ return self._players.values()
async def get_player(self, player_id):
''' return player by id '''
return self._players.get(player_id, None)
- async def get_provider_players(self, player_provider):
- ''' return all players for given provider_id '''
- return [item for item in self._players.values() if item.player_provider == player_provider]
+ def get_player_sync(self, player_id):
+ ''' return player by id (non async) '''
+ return self._players.get(player_id, None)
async def add_player(self, player):
''' register a new player '''
self._players[player.player_id] = player
- self.mass.signal_event('player added', player)
+ await self.mass.signal_event('player added', player)
# TODO: turn on player if it was previously turned on ?
return player
async def remove_player(self, player_id):
''' handle a player remove '''
self._players.pop(player_id, None)
- self.mass.signal_event('player removed', player_id)
+ await self.mass.signal_event('player removed', player_id)
async def trigger_update(self, player_id):
''' manually trigger update for a player '''
from ..utils import run_periodic, LOGGER, try_parse_int
from ..models.playerprovider import PlayerProvider
from ..models.player import Player, PlayerState
+from ..models.playerstate import PlayerState
from ..models.player_queue import QueueItem, PlayerQueue
from ..constants import CONF_ENABLED, CONF_HOSTNAME, CONF_PORT
self.prov_id = 'chromecast'
self.name = 'Chromecast'
self._discovery_running = False
+ self.player_config_entries = [("gapless_enabled", False, "gapless_enabled")]
self.mass.event_loop.create_task(self.__periodic_chromecast_discovery())
- async def get_player_config_entries(self):
- ''' get the player config entries for this provider (list with key/value pairs)'''
- return [
- ("gapless_enabled", False, "gapless_enabled")
- ]
-
async def __handle_player_state(self, chromecast, caststatus=None, mediastatus=None):
''' handle a player state message from the socket '''
player_id = str(chromecast.uuid)
player.muted = caststatus.volume_muted
player.volume_level = caststatus.volume_level * 100
if mediastatus:
- # chromecast does not support power on/of so we only set state
if mediastatus.player_state in ['PLAYING', 'BUFFERING']:
player.state = PlayerState.Playing
+ player.powered = True
elif mediastatus.player_state == 'PAUSED':
player.state = PlayerState.Paused
else:
player.state = PlayerState.Stopped
player.cur_uri = mediastatus.content_id
player.cur_time = mediastatus.adjusted_current_time
+ # create update/poll task for the current time
+ async def poll_task():
+ player.poll_task = True
+ while player.state == PlayerState.Playing:
+ player.cur_time = mediastatus.adjusted_current_time
+ await asyncio.sleep(5)
+ player.poll_task = False
+ if not player.poll_task and player.state == PlayerState.Playing:
+ self.mass.event_loop.create_task(poll_task())
+ asyncio.run_coroutine_threadsafe(player.update(), self.mass.event_loop)
async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):
''' callback when cast group members update '''
listenerMedia = StatusMediaListener(chromecast, self.__handle_player_state, self.mass.event_loop)
chromecast.media_controller.register_status_listener(listenerMedia)
player = ChromecastPlayer(self.mass, player_id, self.prov_id)
+ player.poll_task = False
if chromecast.cast_type == 'group':
player.is_group = True
mz = MultizoneController(chromecast.uuid)
''' support for Logitech Media Server '''
def __init__(self, mass, hostname, port):
+ super().__init__(mass)
self.prov_id = 'lms'
self.name = 'Logitech Media Server'
- self.icon = ''
- self.mass = mass
- self._players = {}
self._host = hostname
self._port = port
self.last_msg_received = 0
- self.supported_musicproviders = ['qobuz', 'file', 'spotify', 'http']
self.http_session = aiohttp.ClientSession(loop=mass.event_loop)
# we use a combi of active polling and subscriptions because the cometd implementation of LMS is somewhat unreliable
asyncio.ensure_future(self.__lms_events())
### Provider specific implementation #####
- async def player_config_entries(self):
- ''' get the player config entries for this provider (list with key/value pairs)'''
- return []
async def player_command(self, player_id, cmd:str, cmd_args=None):
''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
''' Python implementation of SlimProto server '''
def __init__(self, mass):
+ super().__init__(mass)
self.prov_id = 'pylms'
self.name = 'Logitech Media Server Emulation'
- self.mass = mass
self._lmsplayers = {}
self.buffer = b''
self.last_msg_received = 0
#json_serializer = partial(json.dumps, default=lambda x: x.__dict__)
def json_serializer(obj):
- # if isinstance(obj, list):
- # lst = []
- # for item in obj:
- # json_obj = json.dumps(item, skipkeys=True, default=lambda x: x.__dict__)
- # lst.append(json_obj)
- # return '[' + ','.join(lst) + ']'
- return json.dumps(obj, skipkeys=True, default=lambda x: x.__dict__)
+ def get_val(val):
+ if isinstance(val, (int, str, bool, float)):
+ return val
+ elif isinstance(val, list):
+ new_list = []
+ for item in val:
+ new_list.append( get_val(item))
+ return new_list
+ elif hasattr(val, 'to_dict'):
+ return get_val(val.to_dict())
+ elif isinstance(val, dict):
+ new_dict = {}
+ for key, value in val.items():
+ new_dict[key] = get_val(value)
+ return new_dict
+ elif hasattr(val, '__dict__'):
+ new_dict = {}
+ for key, value in val.__dict__.items():
+ new_dict[key] = get_val(value)
+ return new_dict
+
+ obj = get_val(obj)
+ return json.dumps(obj, skipkeys=True)
def setup(mass):
''' setup the module and read/apply config'''
self._ssl_cert = ssl_cert
self._ssl_key = ssl_key
self._cert_fqdn_host = cert_fqdn_host
- self.http_session = aiohttp.ClientSession()
- mass.event_loop.create_task(self.setup_web())
+ self.mass.event_loop.create_task(self.setup())
def stop(self):
asyncio.create_task(self.runner.cleanup())
asyncio.create_task(self.http_session.close())
- async def setup_web(self):
+ async def setup(self):
+ ''' perform async setup '''
+ self.http_session = aiohttp.ClientSession()
app = web.Application()
app.add_routes([web.get('/jsonrpc.js', self.json_rpc)])
app.add_routes([web.post('/jsonrpc.js', self.json_rpc)])
app.add_routes([web.get('/api/config', self.get_config)])
app.add_routes([web.post('/api/config', self.save_config)])
app.add_routes([web.get('/api/players', self.players)])
+ app.add_routes([web.get('/api/players/{player_id}', self.player)])
app.add_routes([web.get('/api/players/{player_id}/queue', self.player_queue)])
app.add_routes([web.get('/api/players/{player_id}/cmd/{cmd}', self.player_command)])
app.add_routes([web.get('/api/players/{player_id}/cmd/{cmd}/{cmd_args}', self.player_command)])
async def players(self, request):
''' get all players '''
- return web.json_response(self.mass.player.players, dumps=json_serializer)
+ players = list(self.mass.player.players)
+ players.sort(key=lambda x: x.name, reverse=False)
+ return web.json_response(players, dumps=json_serializer)
+
+ async def player(self, request):
+ ''' get single player '''
+ player_id = request.match_info.get('player_id')
+ player = await self.mass.player.get_player(player_id)
+ return web.json_response(player, dumps=json_serializer)
async def player_command(self, request):
''' issue player command'''
# queue_items = [item.__dict__ for item in queue_items]
# print(queue_items)
# result = queue_items[offset:limit]
- return web.json_response(player.queue.items, dumps=json_serializer)
+ return web.json_response(player.queue.items[offset:limit], dumps=json_serializer)
async def index(self, request):
return web.FileResponse("./web/index.html")
async def send_event(msg, msg_details):
ws_msg = {"message": msg, "message_details": msg_details }
await ws.send_json(ws_msg, dumps=json_serializer)
- cb_id = self.mass.add_event_listener(send_event)
+ cb_id = await self.mass.add_event_listener(send_event)
# process incoming messages
async for msg in ws:
if msg.type != aiohttp.WSMsgType.TEXT:
continue
# for now we only use WS for (simple) player commands
if msg.data == 'players':
- ws_msg = {'message': 'players', 'message_details': self.mass.player.players}
+ players = list(self.mass.player.players)
+ players.sort(key=lambda x: x.name, reverse=False)
+ ws_msg = {'message': 'players', 'message_details': players}
await ws.send_json(ws_msg, dumps=json_serializer)
elif msg.data.startswith('players') and '/cmd/' in msg.data:
# players/{player_id}/cmd/{cmd} or players/{player_id}/cmd/{cmd}/{cmd_args}
except Exception as exc:
LOGGER.exception(exc)
finally:
- self.mass.remove_event_listener(cb_id)
+ await self.mass.remove_event_listener(cb_id)
LOGGER.debug('websocket connection closed')
return ws
self.mass.config[key] = new_config[key]
if config_changed:
self.mass.save_config()
- self.mass.signal_event('config_changed')
+ await self.mass.signal_event('config_changed')
return web.Response(text='success')
async def json_rpc(self, request):