import operator
import random
import uuid
+import os
+import pickle
-from ..utils import LOGGER
+from ..utils import LOGGER, json, filename_from_string
from ..constants import CONF_ENABLED
from .media_types import Track, TrackQuality
from .playerstate import PlayerState
self._repeat_enabled = False
self._cur_index = 0
self._cur_item_time = 0
- self._last_index = 0
+ self._last_cur_item_time = 0
+ self._last_index = 0
+ self._last_player_state = PlayerState.Stopped
+ self._save_busy_ = False
+ # load previous queue settings from disk
+ self.__load_from_file()
@property
def shuffle_enabled(self):
'''
insert new items at offset x from current position
keeps remaining items in queue
- if offset 0 or None, will start playing newly added item(s)
+ if offset 0, will start playing newly added item(s)
:param queue_items: a list of QueueItem
:param offset: offset from current queue position
'''
if offset == 0:
return await self.play_index(insert_at_index)
else:
- return await self._player.cmd_queue_insert(queue_items, offset)
+ try:
+ await self._player.cmd_queue_insert(queue_items, insert_at_index)
+ except NotImplementedError:
+ # not supported by player, use load queue instead
+ LOGGER.debug("cmd_queue_insert not supported by player, fallback to cmd_queue_load ")
+ await self._player.cmd_queue_load(self._items[insert_at_index:])
async def append(self, queue_items:List[QueueItem]):
'''
queue_items = await self.__shuffle_items(queue_items)
self._items = self._items + queue_items
if self._player.supports_queue:
- return await self._player.cmd_queue_append(queue_items)
+ try:
+ return await self._player.cmd_queue_append(queue_items)
+ except NotImplementedError:
+ # not supported by player, use load queue instead
+ LOGGER.debug("cmd_queue_append not supported by player, fallback to cmd_queue_load ")
+ await self._player.cmd_queue_load(self._items[self.cur_index:])
async def update(self):
''' update queue details, called when player updates '''
+ # determine queue index and cur_time for queue stream
if self.use_queue_stream and self._player.state == PlayerState.Playing:
- # determine queue index and cur_time for queue stream
# player is playing a constant stream of the queue so we need to do this the hard way
+ cur_index = self._cur_index
cur_time_queue = self._player._cur_time
total_time = 0
track_time = 0
else:
track_time = cur_time_queue - total_time
break
- self._cur_index = queue_index
+ cur_index = queue_index
self._cur_item_time = track_time
+ # normal queue based approach
elif not self.use_queue_stream:
- # normal queue based approach
- cur_index = 0
+ if 'queue_item_id' in self._player.cur_uri:
+ queue_item_id = self._player.cur_uri.split('queue_item_id=')[1]
for index, queue_item in enumerate(self.items):
if queue_item.uri == self._player.cur_uri:
cur_index = index
break
- self._cur_index = cur_index
+ # process new index
+ await self.__update_index(cur_index)
async def start_queue_stream(self):
''' called by the queue streamer when it starts playing the queue stream '''
self._last_index = self.cur_index
return await self.get_item(self.cur_index)
+ async def __update_index(self, new_index):
+ ''' compare the queue index to determine if playback changed '''
+ if new_index != self._last_index:
+ LOGGER.info("new track loaded in queue")
+ self._cur_index = new_index
+ elif (self._last_player_state == PlayerState.Stopped and
+ self._player.state == PlayerState.Playing and
+ self.cur_item):
+ LOGGER.info("Player %s started playing %s" % self.cur_item.name)
+ elif (self._last_player_state == PlayerState.Playing and
+ self._player.state == PlayerState.Stopped and
+ self.cur_item):
+ LOGGER.info("Player %s stopped playing %s" % self.cur_item.name)
+ # always update timestamp
+ self._last_cur_item_time = self.cur_item_time
+
async def __shuffle_items(self, queue_items):
''' shuffle a list of tracks '''
# for now we use default python random function
# can be extended with some more magic last last_played and stuff
- return random.sample(queue_items, len(queue_items))
\ No newline at end of file
+ return random.sample(queue_items, len(queue_items))
+
+ def __load_from_file(self):
+ ''' try to load the saved queue for this player from file '''
+ player_safe_str = filename_from_string(self._player.player_id)
+ settings_dir = os.path.join(self.mass.datapath, 'queue')
+ player_file = os.path.join(settings_dir, player_safe_str)
+ if os.path.isfile(player_file):
+ try:
+ with open(player_file) as f:
+ data = pickle.load(f)
+ self._shuffle_enabled = data["shuffle_enabled"]
+ self._repeat_enabled = data["repeat_enabled"]
+ self._items = data["items"]
+ self._cur_index = data["cur_item"]
+ self._last_index = data["last_index"]
+ except Exception as exc:
+ LOGGER.debug("Could not load queue from disk - %s" % str(exc))
+
+ def __save_to_file(self):
+ ''' save current queue settings to file '''
+ if self._save_busy_:
+ return
+ self._save_busy_ = True
+ player_safe_str = filename_from_string(self._player.player_id)
+ settings_dir = os.path.join(self.mass.datapath, 'queue')
+ player_file = os.path.join(settings_dir, player_safe_str)
+ data = {
+ "shuffle_enabled": self._shuffle_enabled,
+ "repeat_enabled": self._repeat_enabled,
+ "items": self._items,
+ "cur_item": self._cur_index,
+ "last_index": self._last_index
+ }
+ if not os.path.isdir(settings_dir):
+ os.mkdir(settings_dir)
+ with open(player_file, 'w+') as f:
+ pickle.dump(data, f)
+ self._save_busy_ = False
+
+
import logging
import pychromecast
from pychromecast.controllers.multizone import MultizoneController
-from pychromecast.controllers import BaseController
-from pychromecast.controllers.media import MediaController
+from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED
import types
from ..utils import run_periodic, LOGGER, try_parse_int
class ChromecastPlayer(Player):
''' Chromecast player object '''
+
+ async def try_chromecast_command(self, cmd:types.MethodType, *args, **kwargs):
+ ''' guard for disconnected socket client '''
+ try:
+ cmd(*args, **kwargs)
+ except (pychromecast.error.NotConnected, AttributeError):
+ LOGGER.warning("Chromecast %s is not connected!" % self.name)
+ except Exception as exc:
+ LOGGER.warning(exc)
async def cmd_stop(self):
''' send stop command to player '''
- self.cc.media_controller.stop()
+ await self.try_chromecast_command(self.cc.media_controller.stop)
async def cmd_play(self):
''' send play command to player '''
- self.cc.media_controller.play()
+ await self.try_chromecast_command(self.cc.media_controller.play)
async def cmd_pause(self):
''' send pause command to player '''
- self.cc.media_controller.pause()
+ await self.try_chromecast_command(self.cc.media_controller.pause)
async def cmd_next(self):
''' send next track command to player '''
- self.cc.media_controller.queue_next()
+ await self.try_chromecast_command(self.cc.media_controller.queue_next)
async def cmd_previous(self):
''' [CAN OVERRIDE] send previous track command to player '''
- self.cc.media_controller.queue_prev()
+ await self.try_chromecast_command(self.cc.media_controller.queue_prev)
async def cmd_power_on(self):
''' send power ON command to player '''
self.powered = False
# power is not supported so send quit_app instead
if not self.group_parent:
- self.cc.quit_app()
+ await self.try_chromecast_command(self.cc.quit_app)
async def cmd_volume_set(self, volume_level):
''' send new volume level command to player '''
- self.cc.set_volume(volume_level/100)
+ await self.try_chromecast_command(self.cc.set_volume, volume_level/100)
self.volume_level = volume_level
async def cmd_volume_mute(self, is_muted=False):
''' send mute command to player '''
- self.cc.set_volume_muted(is_muted)
+ await self.try_chromecast_command(self.cc.set_volume_muted, is_muted)
async def cmd_play_uri(self, uri:str):
''' play single uri on player '''
- self.cc.play_media(uri, 'audio/flac')
+ if self.queue.use_queue_stream:
+ # create CC queue so that skip and previous will work
+ queue_item = QueueItem()
+ queue_item.name = "Music Assistant"
+ queue_item.uri = uri
+ return await self.cmd_queue_load([queue_item, queue_item])
+ else:
+ await self.try_chromecast_command(self.cc.play_media, uri, 'audio/flac')
async def cmd_queue_load(self, queue_items:List[QueueItem]):
''' load (overwrite) queue with new items '''
queuedata = {
"type": 'QUEUE_LOAD',
"repeatMode": "REPEAT_ALL" if self.queue.repeat_enabled else "REPEAT_OFF",
- "shuffle": self.queue.shuffle_enabled,
+ "shuffle": False, # handled by our queue controller
"queueType": "PLAYLIST",
"startIndex": 0, # Item index to play after this request or keep same item if undefined
"items": cc_queue_items # only load 50 tracks at once or the socket will crash
}
- await self.__send_player_queue(queuedata)
+ await self.try_chromecast_command(self.__send_player_queue, queuedata)
await asyncio.sleep(0.2)
if len(queue_items) > 50:
await self.cmd_queue_append(queue_items[51:])
await asyncio.sleep(0.2)
- async def cmd_queue_insert(self, queue_items:List[QueueItem], offset=0):
- '''
- insert new items at offset x from current position
- keeps remaining items in queue
- if offset 0 or None, will start playing newly added item(s)
- :param queue_items: a list of QueueItem
- :param offset: offset from current queue position
- '''
- insert_before = self.queue.cur_index + offset
- cc_queue_items = await self.__create_queue_items(queue_items)
- for chunk in chunks(cc_queue_items, 50):
- queuedata = {
- "type": 'QUEUE_INSERT',
- "insertBefore": insert_before,
- "items": chunk
- }
- await self.__send_player_queue(queuedata)
+ async def cmd_queue_insert(self, queue_items:List[QueueItem], insert_at_index):
+ # for now we don't support this as google requires a special internal id
+ # as item id to determine the insert position
+ # https://developers.google.com/cast/docs/reference/caf_receiver/cast.framework.QueueManager#insertItems
+ raise NotImplementedError
async def cmd_queue_append(self, queue_items:List[QueueItem]):
'''
"insertBefore": None,
"items": chunk
}
- await self.__send_player_queue(queuedata)
+ await self.try_chromecast_command(self.__send_player_queue, queuedata)
async def __create_queue_items(self, tracks):
''' create list of CC queue items from tracks '''
async def __create_queue_item(self, track):
'''create CC queue item from track info '''
return {
+ 'opt_itemId': track.queue_item_id,
'autoplay' : True,
'preloadTime' : 10,
'playbackDuration': int(track.duration),
'customData': {
'provider': track.provider,
'uri': track.uri,
- 'item_id': track.item_id
+ 'item_id': track.queue_item_id
},
'contentType': "audio/flac",
- 'streamType': 'BUFFERED',
+ 'streamType': 'LIVE' if self.queue.use_queue_stream else 'BUFFERED',
'metadata': {
'title': track.name,
'artist': track.artists[0].name if track.artists else "",
}
}
- async def __send_player_queue(self, queuedata):
+ def __send_player_queue(self, queuedata):
'''send new data to the CC queue'''
media_controller = self.cc.media_controller
receiver_ctrl = media_controller._socket_client.receiver_controller
def send_queue():
- """Plays media after chromecast has switched to requested app."""
- queuedata['mediaSessionId'] = media_controller.status.media_session_id
- media_controller.send_message(queuedata, inc_session_id=False)
+ """Plays media after chromecast has switched to requested app."""
+ queuedata['mediaSessionId'] = media_controller.status.media_session_id
+ media_controller.send_message(queuedata, inc_session_id=False)
if not media_controller.status.media_session_id:
receiver_ctrl.launch_app(media_controller.app_id, callback_function=send_queue)
else:
send_queue()
- await asyncio.sleep(0.2)
+
+ async def handle_player_state(self, caststatus=None,
+ mediastatus=None, connection_status=None):
+ ''' handle a player state message from the socket '''
+ # handle connection status
+ if connection_status:
+ if self.mz and connection_status.status == CONNECTION_STATUS_CONNECTED:
+ return self.mz.update_members()
+ elif connection_status.status == CONNECTION_STATUS_DISCONNECTED:
+ # schedule a new scan which will handle group parent changes
+ return self.mass.event_loop.create_task(
+ self.mass.players.providers[self.player_provider].start_chromecast_discovery())
+ # handle generic cast status
+ if caststatus:
+ self.name = self.cc.name
+ self.muted = caststatus.volume_muted
+ self.volume_level = caststatus.volume_level * 100
+ # handle media status
+ if mediastatus:
+ if mediastatus.player_state in ['PLAYING', 'BUFFERING']:
+ self.state = PlayerState.Playing
+ self.powered = True
+ elif mediastatus.player_state == 'PAUSED':
+ self.state = PlayerState.Paused
+ else:
+ self.state = PlayerState.Stopped
+ self.cur_uri = mediastatus.content_id
+ self.cur_time = mediastatus.adjusted_current_time
+ # create update/poll task for the current time
+ async def poll_task():
+ self.poll_task = True
+ while self.state == PlayerState.Playing:
+ self.cur_time = mediastatus.adjusted_current_time
+ await asyncio.sleep(1)
+ self.poll_task = False
+ if not self.poll_task and self.state == PlayerState.Playing:
+ self.mass.event_loop.create_task(poll_task())
+ # we are called from socket client thread so do this threadsafe!
+ #asyncio.run_coroutine_threadsafe(self.update(), self.mass.event_loop)
class ChromecastProvider(PlayerProvider):
''' support for ChromeCast Audio '''
self.mass.event_loop.create_task(
self.__periodic_chromecast_discovery())
- async def __handle_player_state(self, chromecast, caststatus=None, mediastatus=None):
- ''' handle a player state message from the socket '''
- player_id = str(chromecast.uuid)
- player = await self.get_player(player_id)
- # always update player details that may change
- player.name = chromecast.name
- if caststatus:
- player.muted = caststatus.volume_muted
- player.volume_level = caststatus.volume_level * 100
- if mediastatus:
- if mediastatus.player_state in ['PLAYING', 'BUFFERING']:
- player.state = PlayerState.Playing
- player.powered = True
- elif mediastatus.player_state == 'PAUSED':
- player.state = PlayerState.Paused
- else:
- player.state = PlayerState.Stopped
- player.cur_uri = mediastatus.content_id
- player.cur_time = mediastatus.adjusted_current_time
- # create update/poll task for the current time
- async def poll_task():
- player.poll_task = True
- while player.state == PlayerState.Playing:
- player.cur_time = mediastatus.adjusted_current_time
- await asyncio.sleep(1)
- player.poll_task = False
- if not player.poll_task and player.state == PlayerState.Playing:
- self.mass.event_loop.create_task(poll_task())
- asyncio.run_coroutine_threadsafe(player.update(), self.mass.event_loop)
-
async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):
- ''' callback when cast group members update '''
+ ''' handle callback from multizone manager '''
if added_player:
player = await self.get_player(added_player)
group_player = await self.get_player(str(mz._uuid))
@run_periodic(1800)
async def __periodic_chromecast_discovery(self):
''' run chromecast discovery on interval '''
- await self.__chromecast_discovery()
+ await self.start_chromecast_discovery()
- async def __chromecast_discovery(self):
+ async def start_chromecast_discovery(self):
''' background non-blocking chromecast discovery and handler '''
if self._discovery_running:
return
removed_players = []
for player in self.players:
if not player.cc.socket_client or not player.cc.socket_client.is_connected:
- LOGGER.warning("%s is disconnected" % player.name)
+ removed_players.append(player.player_id)
+ for child_player in player.group_childs:
+ # update childs
+ child_player.group_parent = None
# cleanup cast object
del player.cc
- removed_players.append(player.player_id)
# signal removed players
for player_id in removed_players:
await self.remove_player(player_id)
''' callback when a (new) chromecast device is discovered '''
from pychromecast import _get_chromecast_from_host, ChromecastConnectionError
try:
- chromecast = _get_chromecast_from_host(discovery_info, tries=2, retry_wait=5)
+ chromecast = _get_chromecast_from_host(discovery_info, tries=2, timeout=5, retry_wait=5)
except ChromecastConnectionError:
LOGGER.warning("Could not connect to device %s" % player_id)
return
- # patch the receive message method for handling queue status updates
- chromecast.media_controller.queue_items = []
- chromecast.media_controller.queue_cur_id = None
- chromecast.media_controller.receive_message = types.MethodType(receive_message, chromecast.media_controller)
- listenerCast = StatusListener(chromecast, self.__handle_player_state, self.mass.event_loop)
- chromecast.register_status_listener(listenerCast)
- listenerMedia = StatusMediaListener(chromecast, self.__handle_player_state, self.mass.event_loop)
- chromecast.media_controller.register_status_listener(listenerMedia)
player = ChromecastPlayer(self.mass, player_id, self.prov_id)
+ player.cc = chromecast
+ player.mz = None
player.poll_task = False
self.supports_queue = True
self.supports_gapless = False
self.supports_crossfade = False
+ # register status listeners
+ status_listener = StatusListener(player_id,
+ player.handle_player_state, self.mass.event_loop)
if chromecast.cast_type == 'group':
player.is_group = True
mz = MultizoneController(chromecast.uuid)
mz.register_listener(MZListener(mz, self.__handle_group_members_update, self.mass.event_loop))
chromecast.register_handler(mz)
- chromecast.register_connection_listener(MZConnListener(mz))
- chromecast.mz = mz
- player.cc = chromecast
+ player.mz = mz
+ chromecast.register_connection_listener(status_listener)
+ chromecast.register_status_listener(status_listener)
+ chromecast.media_controller.register_status_listener(status_listener)
player.cc.wait()
await self.add_player(player)
- await self.update_all_group_members()
-
- async def update_all_group_members(self):
- ''' force member update of all cast groups '''
- for player in self.players:
- if player.cc.cast_type == 'group':
- player.cc.mz.update_members()
def chunks(l, n):
class StatusListener:
- def __init__(self, chromecast, callback, loop):
- self.chromecast = chromecast
- self.__handle_player_state = callback
+ def __init__(self, player_id, status_callback, loop):
+ self.__handle_callback = status_callback
self.loop = loop
+ self.player_id = player_id
def new_cast_status(self, status):
+ ''' chromecast status changed (like volume etc.)'''
asyncio.run_coroutine_threadsafe(
- self.__handle_player_state(self.chromecast, caststatus=status), self.loop)
-
-class StatusMediaListener:
- def __init__(self, chromecast, callback, loop):
- self.chromecast= chromecast
- self.__handle_player_state = callback
- self.loop = loop
+ self.__handle_callback(caststatus=status), self.loop)
def new_media_status(self, status):
+ ''' mediacontroller has new state '''
asyncio.run_coroutine_threadsafe(
- self.__handle_player_state(self.chromecast, mediastatus=status), self.loop)
-
-class MZConnListener:
- def __init__(self, mz):
- self._mz=mz
- def new_connection_status(self, connection_status):
- """Handle reception of a new ConnectionStatus."""
- if connection_status.status == 'CONNECTED':
- self._mz.update_members()
+ self.__handle_callback(mediastatus=status), self.loop)
+ def new_connection_status(self, status):
+ ''' will be called when the connection changes '''
+ asyncio.run_coroutine_threadsafe(
+ self.__handle_callback(connection_status=status), self.loop)
class MZListener:
def __init__(self, mz, callback, loop):
def multizone_status_received(self):
asyncio.run_coroutine_threadsafe(
self.__handle_group_members_update(self._mz), self._loop)
-
-def receive_message(self, message, data):
- """ Called when a media message is received. """
- #LOGGER.info('message: %s - data: %s'%(message, data))
- if data['type'] == 'MEDIA_STATUS':
- try:
- self.queue_items = data['status'][0]['items']
- except:
- pass
- try:
- self.queue_cur_id = data['status'][0]['currentItemId']
- except:
- pass
- self._process_media_status(data)
- return True
- return False
\ No newline at end of file