"""Catch UpnpError errors."""
@functools.wraps(func)
- async def wrapper(
- self: _DLNAPlayerProviderT, *args: _P.args, **kwargs: _P.kwargs
- ) -> _R | None:
+ async def wrapper(self: _DLNAPlayerProviderT, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
"""Catch UpnpError errors and check availability before and after request."""
player_id = kwargs["player_id"] if "player_id" in kwargs else args[0]
dlna_player = self.dlnaplayers[player_id]
dlna_player.player.display_name,
)
if not dlna_player.available:
- self.logger.warning(
- "Device disappeared when trying to call %s", func.__name__
- )
+ self.logger.warning("Device disappeared when trying to call %s", func.__name__)
return None
try:
return await func(self, *args, **kwargs)
"""Handle async initialization of the provider."""
self.dlnaplayers = {}
self.lock = asyncio.Lock()
- self.requester = AiohttpSessionRequester(
- self.mass.http_session, with_sleep=True
- )
+ self.requester = AiohttpSessionRequester(self.mass.http_session, with_sleep=True)
self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
self.notify_server = DLNANotifyServer(self.requester, self.mass)
self.mass.create_task(self._run_discovery())
- seek_position: Optional seek to this position.
- fade_in: Optionally fade in the item at playback start.
"""
- use_flow_mode = await self.mass.config.get_player_config_value(
- player_id, CONF_FLOW_MODE
- )
- enforce_mp3 = await self.mass.config.get_player_config_value(
- player_id, CONF_ENFORCE_MP3
- )
+ use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
+ enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
await self.poll_player(dlna_player.udn)
@catch_request_errors
- async def play_stream(
- self, player_id: str, stream_job: MultiClientStreamJob
- ) -> None:
+ async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
"""Handle PLAY STREAM on given player.
This is a special feature from the Universal Group provider.
"""
- enforce_mp3 = await self.mass.config.get_player_config_value(
- player_id, CONF_ENFORCE_MP3
- )
+ enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC
url = stream_job.resolve_stream_url(player_id, output_codec)
dlna_player = self.dlnaplayers[player_id]
if dlna_player.device.can_stop:
await self.cmd_stop(player_id)
didl_metadata = create_didl_metadata(self.mass, url, None)
- await dlna_player.device.async_set_transport_uri(
- url, "Music Assistant", didl_metadata
- )
+ await dlna_player.device.async_set_transport_uri(url, "Music Assistant", didl_metadata)
# Play it
await dlna_player.device.async_wait_for_can_play(10)
# optimistically set this timestamp to help in case of a player
await self.poll_player(dlna_player.udn)
@catch_request_errors
- async def enqueue_next_queue_item(
- self, player_id: str, queue_item: QueueItem
- ) -> None:
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle enqueuing of the next queue item on the player."""
dlna_player = self.dlnaplayers[player_id]
url = await self.mass.streams.resolve_stream_url(
# we iterate between using a regular and multicast search (if enabled)
if allow_network_scan and use_multicast:
- await async_search(
- on_response, target=(str(IPv4Address("255.255.255.255")), 1900)
- )
+ await async_search(on_response, target=(str(IPv4Address("255.255.255.255")), 1900))
else:
await async_search(on_response)
async with self.lock:
if dlna_player := self.dlnaplayers.get(udn):
# existing player
- if (
- dlna_player.description_url == description_url
- and dlna_player.player.available
- ):
+ if dlna_player.description_url == description_url and dlna_player.player.available:
# nothing to do, device is already connected
return
# update description url to newly discovered one
return
# Connect to the base UPNP device
- upnp_device = await self.upnp_factory.async_create_device(
- dlna_player.description_url
- )
+ upnp_device = await self.upnp_factory.async_create_device(dlna_player.description_url)
# Create profile wrapper
- dlna_player.device = DmrDevice(
- upnp_device, self.notify_server.event_handler
- )
+ dlna_player.device = DmrDevice(upnp_device, self.notify_server.event_handler)
# Subscribe to event notifications
try:
# Don't leave the device half-constructed
dlna_player.device.on_event = None
dlna_player.device = None
- self.logger.debug(
- "Error while subscribing during device connect: %r", err
- )
+ self.logger.debug("Error while subscribing during device connect: %r", err)
raise
else:
# connect was successful, update device info
for state_variable in state_variables:
# Force a state refresh when player begins or pauses playback
# to update the position info.
- if (
- state_variable.name == "TransportState"
- and state_variable.value
- in (
- TransportState.PLAYING,
- TransportState.PAUSED_PLAYBACK,
- )
+ if state_variable.name == "TransportState" and state_variable.value in (
+ TransportState.PLAYING,
+ TransportState.PAUSED_PLAYBACK,
):
dlna_player.force_poll = True
self.mass.create_task(self.poll_player(dlna_player.udn))
"""Set Player Features based on config values and capabilities."""
dlna_player.player.supported_features = BASE_PLAYER_FEATURES
player_id = dlna_player.player.player_id
- if self.mass.config.get_raw_player_config_value(
- player_id, CONF_ENQUEUE_NEXT, False
- ):
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENQUEUE_NEXT, False):
dlna_player.player.supported_features = (
*dlna_player.player.supported_features,
PlayerFeature.ENQUEUE_NEXT,