if not self._chromecast.socket_client.is_connected:
LOGGER.warning("Ignore player command: Socket client is not connected.")
return
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
self._chromecast.media_controller.stop()
def play(self):
if not self._chromecast.socket_client.is_connected:
LOGGER.warning("Ignore player command: Socket client is not connected.")
return
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
self._chromecast.media_controller.play()
def pause(self):
if not self._chromecast.socket_client.is_connected:
LOGGER.warning("Ignore player command: Socket client is not connected.")
return
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
self._chromecast.media_controller.pause()
def next(self):
if not self._chromecast.socket_client.is_connected:
LOGGER.warning("Ignore player command: Socket client is not connected.")
return
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
self._chromecast.media_controller.queue_next()
def previous(self):
if not self._chromecast.socket_client.is_connected:
LOGGER.warning("Ignore player command: Socket client is not connected.")
return
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
self._chromecast.media_controller.queue_prev()
def power_on(self):
LOGGER.warning("Ignore player command: Socket client is not connected.")
return
self._powered = True
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
self._chromecast.set_volume_muted(False)
def power_off(self):
"""Send power OFF command to player."""
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
if self.media_status and (
self.media_status.player_is_playing
or self.media_status.player_is_paused
def volume_set(self, volume_level):
"""Send new volume level command to player."""
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
self._chromecast.set_volume(volume_level)
def volume_mute(self, is_muted=False):
"""Send mute command to player."""
- with suppress(pychromecast.error.NotConnected):
+ with SuppressChromeCastError(self.name):
self._chromecast.set_volume_muted(is_muted)
def play_uri(self, uri: str):
"""Play single uri on player."""
- with suppress(pychromecast.error.NotConnected):
- player_queue = self.mass.player_manager.get_player_queue(self.player_id)
- if player_queue.use_queue_stream:
- # create CC queue so that skip and previous will work
- queue_item = QueueItem()
- queue_item.name = "Music Assistant"
- queue_item.uri = uri
- return self.queue_load([queue_item, queue_item])
+ player_queue = self.mass.player_manager.get_player_queue(self.player_id)
+ if player_queue.use_queue_stream:
+ # create CC queue so that skip and previous will work
+ queue_item = QueueItem()
+ queue_item.name = "Music Assistant"
+ queue_item.uri = uri
+ return self.queue_load([queue_item, queue_item])
+ with SuppressChromeCastError(self.name):
self._chromecast.play_media(uri, "audio/flac")
def queue_load(self, queue_items: List[QueueItem]):
"""Load (overwrite) queue with new items."""
- with suppress(pychromecast.error.NotConnected):
- player_queue = self.mass.player_manager.get_player_queue(self.player_id)
- cc_queue_items = self.__create_queue_items(queue_items[:50])
- repeat_enabled = (
- player_queue.use_queue_stream or player_queue.repeat_enabled
- )
- queuedata = {
- "type": "QUEUE_LOAD",
- "repeatMode": "REPEAT_ALL" if repeat_enabled else "REPEAT_OFF",
- "shuffle": False, # handled by our queue controller
- "queueType": "PLAYLIST",
- "startIndex": 0, # Item index to play after this request or keep same item if undefined
- "items": cc_queue_items, # only load 50 tracks at once or the socket will crash
- }
- self.__send_player_queue(queuedata)
- if len(queue_items) > 50:
- self.queue_append(queue_items[51:])
+ player_queue = self.mass.player_manager.get_player_queue(self.player_id)
+ cc_queue_items = self.__create_queue_items(queue_items[:50])
+ repeat_enabled = player_queue.use_queue_stream or player_queue.repeat_enabled
+ queuedata = {
+ "type": "QUEUE_LOAD",
+ "repeatMode": "REPEAT_ALL" if repeat_enabled else "REPEAT_OFF",
+ "shuffle": False, # handled by our queue controller
+ "queueType": "PLAYLIST",
+ "startIndex": 0, # Item index to play after this request or keep same item if undefined
+ "items": cc_queue_items, # only load 50 tracks at once or the socket will crash
+ }
+ self.__send_player_queue(queuedata)
+ if len(queue_items) > 50:
+ self.queue_append(queue_items[51:])
def queue_append(self, queue_items: List[QueueItem]):
"""Append new items at the end of the queue."""
- with suppress(pychromecast.error.NotConnected):
- cc_queue_items = self.__create_queue_items(queue_items)
- for chunk in chunks(cc_queue_items, 50):
- queuedata = {
- "type": "QUEUE_INSERT",
- "insertBefore": None,
- "items": chunk,
- }
- self.__send_player_queue(queuedata)
+ cc_queue_items = self.__create_queue_items(queue_items)
+ for chunk in chunks(cc_queue_items, 50):
+ queuedata = {
+ "type": "QUEUE_INSERT",
+ "insertBefore": None,
+ "items": chunk,
+ }
+ self.__send_player_queue(queuedata)
def __create_queue_items(self, tracks):
"""Create list of CC queue items from tracks."""
def __send_player_queue(self, queuedata):
"""Send new data to the CC queue."""
- media_controller = self._chromecast.media_controller
- # pylint: disable=protected-access
- receiver_ctrl = media_controller._socket_client.receiver_controller
-
- def send_queue():
- """Plays media after chromecast has switched to requested app."""
- queuedata["mediaSessionId"] = media_controller.status.media_session_id
- media_controller.send_message(queuedata, inc_session_id=False)
-
- if not media_controller.status.media_session_id:
- receiver_ctrl.launch_app(
- media_controller.app_id, callback_function=send_queue
- )
- else:
- send_queue()
+ with SuppressChromeCastError(self.name):
+ media_controller = self._chromecast.media_controller
+ # pylint: disable=protected-access
+ receiver_ctrl = media_controller._socket_client.receiver_controller
+
+ def send_queue():
+ """Plays media after chromecast has switched to requested app."""
+ queuedata["mediaSessionId"] = media_controller.status.media_session_id
+ media_controller.send_message(queuedata, inc_session_id=False)
+
+ if not media_controller.status.media_session_id:
+ receiver_ctrl.launch_app(
+ media_controller.app_id, callback_function=send_queue
+ )
+ else:
+ send_queue()
def chunks(_list, chunk_size):
"""Yield successive n-sized chunks from list."""
for i in range(0, len(_list), chunk_size):
yield _list[i : i + chunk_size]
+
+
+class SuppressChromeCastError(suppress):
+ """Context manager to suppress Chromecast connection error."""
+
+ def __init__(self, player_id):
+ """Handle init of the contextmanager."""
+ # pylint: disable=super-init-not-called
+ self.player_id = player_id
+
+ def __exit__(self, exctype, excinst, exctb):
+ """Handle exit of the contextmanager."""
+ if exctype is not None and issubclass(exctype, pychromecast.error.NotConnected):
+ LOGGER.warning(
+ "Chromecast client %s is not connected, ignoring command...",
+ self.player_id,
+ )
+ return exctype is None