import platform
import socket
import time
+from collections.abc import AsyncGenerator
from random import randint, randrange
from typing import TYPE_CHECKING, cast
-import aiofiles
-import shortuuid
-from aiofiles.os import wrap
from pyatv import connect, exceptions, interface, scan
from pyatv.conf import AppleTV as ATVConf
from pyatv.const import DeviceModel, DeviceState, PowerState, Protocol
)
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
CONF_ALAC_ENCODE = "alac_encode"
CONF_VOLUME_START = "volume_start"
CONF_SYNC_ADJUST = "sync_adjust"
+CONF_PASSWORD = "password"
PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
"you can shift the audio a bit.",
advanced=True,
),
+ ConfigEntry(
+ key=CONF_PASSWORD,
+ type=ConfigEntryType.STRING,
+ default_value=None,
+ required=False,
+ label="Device password",
+ description="Some devices require a password to connect/play.",
+ advanced=True,
+ ),
)
BACKOFF_TIME_LOWER_LIMIT = 15 # seconds
BACKOFF_TIME_UPPER_LIMIT = 300 # Five minutes
if abs(volume - int(atv_player.atv.audio.volume)) > 2:
self.mass.create_task(self.cmd_volume_set(player_id, volume))
else:
- self.logger.warning(
+ self.logger.debug(
"Unknown DACP request for %s: %s",
atv_player.discovery_info.name,
path,
"""
# stop existing streams first
await self.cmd_stop(player_id)
+ # power on player if needed
+ # start streaming the queue (pcm) audio in a background task
+ queue = self.mass.player_queues.get_active_queue(player_id)
+ self._stream_tasks[player_id] = asyncio.create_task(
+ self._stream_audio(
+ player_id,
+ queue=queue,
+ audio_iterator=self.mass.streams.get_flow_stream(
+ queue,
+ start_queue_item=queue_item,
+ pcm_format=AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ sample_rate=44100,
+ bit_depth=16,
+ channels=2,
+ ),
+ seek_position=seek_position,
+ fade_in=fade_in,
+ ),
+ )
+ )
+
+ async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
+ """Handle PLAY STREAM on given player.
+
+ This is a special feature from the Universal Group provider.
+ """
+ # stop existing streams first
+ await self.cmd_stop(player_id)
+ # power on player if needed
await self.cmd_power(player_id, True)
- atv_player = self._atv_players[player_id]
- player = self.mass.players.get(player_id)
+ if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
+ # TODO: resample on the fly here ?
+ raise RuntimeError("Unsupported PCM format")
+ # start streaming the queue (pcm) audio in a background task
+ queue = self.mass.player_queues.get_active_queue(player_id)
+ self._stream_tasks[player_id] = asyncio.create_task(
+ self._stream_audio(
+ player_id,
+ queue=queue,
+ audio_iterator=stream_job.subscribe(player_id),
+ )
+ )
+ async def _stream_audio(
+ self, player_id: str, queue: PlayerQueue, audio_iterator: AsyncGenerator[bytes, None]
+ ) -> None:
+ """Handle the actual streaming of audio to Airplay."""
+ player = self.mass.players.get(player_id)
if player.synced_to:
# should not happen, but just in case
raise RuntimeError("Player is synced")
-
+ player.elapsed_time = 0
+ player.elapsed_time_last_updated = time.time()
+ player.state = PlayerState.PLAYING
+ self.mass.players.update(player_id)
# NOTE: Although the pyatv library is perfectly capable of playback
# to not only raop targets but also airplay 1 + 2, its not suitable
# for synced playback to multiple clients at once.
# just in case...
await atv_player.connect()
tg.create_task(self._init_cliraop(atv_player, ntp))
-
- async def _streamer() -> None:
- queue = self.mass.player_queues.get(queue_item.queue_id)
- player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
- player.elapsed_time = 0
- player.elapsed_time_last_updated = time.time()
- player.state = PlayerState.PLAYING
- self.mass.players.register_or_update(player)
- prev_metadata_checksum: str = ""
- pcm_format = AudioFormat(
- content_type=ContentType.PCM_S16LE,
- sample_rate=44100,
- bit_depth=16,
- channels=2,
- )
- try:
- async for pcm_chunk in self.mass.streams.get_flow_stream(
- queue,
- start_queue_item=queue_item,
- pcm_format=pcm_format,
- seek_position=seek_position,
- fade_in=fade_in,
- ):
- # send metadata to player(s) if needed
- # NOTE: this must all be done in separate tasks to not disturb audio
- if queue and queue.current_item and queue.current_item.streamdetails:
- metadata_checksum = (
- queue.current_item.streamdetails.stream_title
- or queue.current_item.queue_item_id
- )
- if prev_metadata_checksum != metadata_checksum:
- prev_metadata_checksum = metadata_checksum
- self.mass.create_task(self._send_metadata(player_id))
-
- # send audio chunk to player(s)
- async with asyncio.TaskGroup() as tg:
- available_clients = 0
+ prev_metadata_checksum: str = ""
+ try:
+ async for pcm_chunk in audio_iterator:
+ # send metadata to player(s) if needed
+ # NOTE: this must all be done in separate tasks to not disturb audio
+ if queue and queue.current_item and queue.current_item.streamdetails:
+ metadata_checksum = (
+ queue.current_item.streamdetails.stream_title
+ or queue.current_item.queue_item_id
+ )
+ if prev_metadata_checksum != metadata_checksum:
+ prev_metadata_checksum = metadata_checksum
+ self.mass.create_task(self._send_metadata(player_id))
+
+ async with asyncio.TaskGroup() as tg:
+ # send progress metadata
+ if queue.elapsed_time:
for atv_player in self._get_sync_clients(player_id):
- if not atv_player.cliraop_proc or atv_player.cliraop_proc.closed:
- # this may not happen, but just in case
- continue
- available_clients += 1
- tg.create_task(atv_player.cliraop_proc.write(pcm_chunk))
- if not available_clients:
- return
-
- # send progress metadata
- if queue.elapsed_time:
- for atv_player in self._get_sync_clients(player_id):
- tg.create_task(
- atv_player.send_cli_command(
- f"PROGRESS={int(queue.elapsed_time)}\n"
- )
- )
-
- finally:
- self.logger.debug("Streamer task ended for player %s", queue.display_name)
- for atv_player in self._get_sync_clients(player_id):
- if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
- atv_player.cliraop_proc.write_eof()
-
- # start streaming the queue (pcm) audio in a background task
- self._stream_tasks[player_id] = asyncio.create_task(_streamer())
-
- async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle PLAY STREAM on given player.
-
- This is a special feature from the Universal Group provider.
- """
- raise NotImplementedError
+ tg.create_task(
+ atv_player.send_cli_command(f"PROGRESS={int(queue.elapsed_time)}\n")
+ )
+ # send audio chunk to player(s)
+ available_clients = 0
+ for atv_player in self._get_sync_clients(player_id):
+ if not atv_player.cliraop_proc or atv_player.cliraop_proc.closed:
+ # this may not happen, but just in case
+ continue
+ available_clients += 1
+ tg.create_task(atv_player.cliraop_proc.write(pcm_chunk))
+ if not available_clients:
+ return
+ finally:
+ self.logger.debug("Streaming ended for player %s", player.display_name)
+ for atv_player in self._get_sync_clients(player_id):
+ if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+ atv_player.cliraop_proc.write_eof()
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player.
if atv_player.cliraop_proc:
# prefer interactive command to our streamer
await atv_player.send_cli_command(f"VOLUME={volume_level}\n")
- elif atv := atv_player.atv:
+ if atv := atv_player.atv:
await atv.audio.set_volume(volume_level)
async def cmd_sync(self, player_id: str, target_player: str) -> None:
group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
group_leader.group_childs.remove(player_id)
player.synced_to = None
- if player.state == PlayerState.PLAYING:
- await self.cmd_stop(player_id)
+ await self.cmd_stop(player_id)
self.mass.players.update(player_id)
async def _run_discovery(self) -> None:
logger = self.logger.getChild(atv_player.player_id)
async for line in cliraop_proc._proc.stderr:
line = line.decode().strip() # noqa: PLW2901
+ if not line:
+ continue
if "set pause" in line:
atv_player.optimistic_state = PlayerState.PAUSED
atv_player.update_attributes()
- if "Restarted at" in line:
+ logger.info(line)
+ elif "Restarted at" in line:
atv_player.optimistic_state = PlayerState.PLAYING
atv_player.update_attributes()
+ logger.info(line)
elif "after start), played" in line:
millis = int(line.split("played ")[1].split(" ")[0])
mass_player.elapsed_time = millis / 1000
sync_adjust = self.mass.config.get_raw_player_config_value(
atv_player.player_id, CONF_SYNC_ADJUST, 0
)
+ if device_password := self.mass.config.get_raw_player_config_value(
+ atv_player.player_id, CONF_PASSWORD, None
+ ):
+ extra_args += ["-P", device_password]
if self.logger.level == logging.DEBUG:
extra_args += ["-d", "5"]
# get image
if not queue.current_item.image:
return
- temp_image_path = f"/tmp/{shortuuid.random(12)}" # noqa: S108
- image_data = await self.mass.metadata.get_thumbnail(
- queue.current_item.image.path,
- 512,
- queue.current_item.image.provider,
+
+ image_url = self.mass.metadata.get_image_url(
+ queue.current_item.image, size=512, prefer_proxy=True
)
- if not image_data:
- return
- async with aiofiles.open(temp_image_path, "wb") as outfile:
- await outfile.write(image_data)
for atv_player in self._get_sync_clients(player_id):
- await atv_player.send_cli_command(f"ARTWORK={temp_image_path}\n")
- # make sure the temp file gets deleted again
- await asyncio.sleep(30)
- rm_func = wrap(os.remove)
- await rm_func(temp_image_path)
+ await atv_player.send_cli_command(f"ARTWORK={image_url}\n")