From cf05d3e1a44541d8b592cc9eb43a6807b308993c Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sun, 27 Sep 2020 03:19:53 +0200 Subject: [PATCH] Refactor streamer (#17) * temporary disable uvloop uvloop is temporary disabled due to a bug with subprocesses * implement streaming as async methods * refactor player state handling, groupplayer support * more rework on player states, improve group sync --- music_assistant/__main__.py | 4 +- music_assistant/config.py | 18 +- music_assistant/constants.py | 4 +- music_assistant/helpers/__init__.py | 1 + music_assistant/helpers/typing.py | 27 + music_assistant/http_streamer.py | 582 ------------- music_assistant/mass.py | 28 +- music_assistant/models/musicprovider.py | 76 +- music_assistant/models/player.py | 282 ++++++- music_assistant/models/player_queue.py | 172 ++-- music_assistant/models/player_state.py | 534 ++++++++++++ music_assistant/models/playerprovider.py | 184 +---- music_assistant/models/provider.py | 27 +- music_assistant/models/streamdetails.py | 1 + music_assistant/player_manager.py | 317 ++----- .../providers/chromecast/__init__.py | 118 +-- .../providers/chromecast/player.py | 310 +++---- .../providers/demo/demo_playerprovider.py | 300 ++++--- music_assistant/providers/file/__init__.py | 2 + .../providers/group_player/__init__.py | 514 ++++++++++++ music_assistant/providers/sonos/sonos.py | 23 +- music_assistant/providers/spotify/__init__.py | 18 +- .../providers/squeezebox/__init__.py | 269 +----- .../providers/squeezebox/socket_client.py | 262 ++++-- .../providers/webplayer/__init__.py | 4 +- music_assistant/stream_manager.py | 771 +++++++++--------- music_assistant/utils.py | 69 ++ music_assistant/web.py | 106 ++- 28 files changed, 2631 insertions(+), 2392 deletions(-) create mode 100644 music_assistant/helpers/__init__.py create mode 100644 music_assistant/helpers/typing.py delete mode 100755 music_assistant/http_streamer.py create mode 100755 music_assistant/models/player_state.py create mode 100644 music_assistant/providers/group_player/__init__.py diff --git a/music_assistant/__main__.py b/music_assistant/__main__.py index b8be5fd7..3712b874 100755 --- a/music_assistant/__main__.py +++ b/music_assistant/__main__.py @@ -60,9 +60,11 @@ def main(): logger.info("shutdown requested!") loop.run_until_complete(mass.async_stop()) + # TODO: uvloop is temporary disabled due to a bug with subprocesses + # https://github.com/MagicStack/uvloop/issues/317 run( mass.async_start(), - use_uvloop=True, + use_uvloop=False, shutdown_callback=on_shutdown, executor_workers=64, ) diff --git a/music_assistant/config.py b/music_assistant/config.py index 49c1fb33..f711b823 100755 --- a/music_assistant/config.py +++ b/music_assistant/config.py @@ -197,7 +197,9 @@ class ConfigItem: # use default value for config entry entry.value = entry.default_value return entry - raise KeyError + raise KeyError( + "%s\\%s has no key %s!" % (self._base_type, self._parent_item_key, key) + ) def __getitem__(self, key) -> ConfigEntry: """Return default value from ConfigEntry if needed.""" @@ -253,11 +255,11 @@ class ConfigItem: ) if self._base_type == ConfigBaseType.PLAYER: # force update of player if it's config changed - player = self.mass.player_manager.get_player(self._parent_item_key) - if player: - self.mass.add_job( - self.mass.player_manager.async_update_player(player) + self.mass.add_job( + self.mass.player_manager.async_trigger_player_update( + self._parent_item_key ) + ) return # raise KeyError if we're trying to set a value not defined as ConfigEntry raise KeyError @@ -342,8 +344,10 @@ class MassConfig: def get_player_config_entries(self, player_id: str) -> List[ConfigEntry]: """Return all config entries for the given player.""" - player_conf = self.mass.player_manager.get_player_config_entries(player_id) - return DEFAULT_PLAYER_CONFIG_ENTRIES + player_conf + player = self.mass.player_manager.get_player(player_id) + if player: + return DEFAULT_PLAYER_CONFIG_ENTRIES + player.config_entries + return DEFAULT_PLAYER_CONFIG_ENTRIES @staticmethod def get_base_config_entries(base_key) -> List[ConfigEntry]: diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 17211c61..379a0fc9 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -13,7 +13,9 @@ CONF_URL = "url" CONF_NAME = "name" CONF_CROSSFADE_DURATION = "crossfade_duration" CONF_FALLBACK_GAIN_CORRECT = "fallback_gain_correct" - +CONF_GROUP_DELAY = "group_delay" +CONF_VOLUME_CONTROL = "volume_control" +CONF_POWER_CONTROL = "power_control" CONF_KEY_BASE = "base" CONF_KEY_PLAYERSETTINGS = "player_settings" diff --git a/music_assistant/helpers/__init__.py b/music_assistant/helpers/__init__.py new file mode 100644 index 00000000..65147294 --- /dev/null +++ b/music_assistant/helpers/__init__.py @@ -0,0 +1 @@ +"""Various utils/helpers.""" diff --git a/music_assistant/helpers/typing.py b/music_assistant/helpers/typing.py new file mode 100644 index 00000000..29f12289 --- /dev/null +++ b/music_assistant/helpers/typing.py @@ -0,0 +1,27 @@ +"""Typing helper.""" + +from typing import TYPE_CHECKING, List, Optional + +# pylint: disable=invalid-name +if TYPE_CHECKING: + from music_assistant.mass import MusicAssistant as MusicAssistantType + from music_assistant.models.player_queue import ( + QueueItem as QueueItemType, + PlayerQueue as PlayerQueueType, + ) + from music_assistant.models.streamdetails import StreamDetails as StreamDetailsType + from music_assistant.models.player import Player as PlayerType + +else: + MusicAssistantType = "MusicAssistant" + QueueItemType = "QueueItem" + PlayerQueueType = "PlayerQueue" + StreamDetailsType = "StreamDetailsType" + PlayerType = "PlayerType" + + +QueueItems = List[QueueItemType] +Players = List[PlayerType] + +OptionalInt = Optional[int] +OptionalStr = Optional[str] diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py deleted file mode 100755 index 6c2a7bb7..00000000 --- a/music_assistant/http_streamer.py +++ /dev/null @@ -1,582 +0,0 @@ -""" -HTTPStreamer: handles all audio streaming to players. - -Either by sending tracks one by one or send one continuous stream -of music with crossfade/gapless support (queue stream). -""" -import asyncio -import gc -import io -import logging -import shlex -import subprocess -import threading -import urllib -from contextlib import suppress - -import aiohttp -import pyloudnorm -import soundfile -from aiohttp import web -from music_assistant.constants import EVENT_STREAM_ENDED, EVENT_STREAM_STARTED -from music_assistant.models.media_types import MediaType -from music_assistant.models.player_queue import QueueItem -from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType -from music_assistant.utils import create_tempfile, decrypt_string, get_ip, try_parse_int -from music_assistant.web import require_local_subnet - -LOGGER = logging.getLogger("mass") - - -class HTTPStreamer: - """Built-in streamer using sox and webserver.""" - - def __init__(self, mass): - """Initialize class.""" - self.mass = mass - self.local_ip = get_ip() - self.analyze_jobs = {} - self.stream_clients = [] - - async def async_stream_media_item(self, http_request): - """Start stream for a single media item, player independent.""" - # make sure we have valid params - media_type = MediaType.from_string(http_request.match_info["media_type"]) - if media_type not in [MediaType.Track, MediaType.Radio]: - return web.Response(status=404, reason="Media item is not playable!") - provider = http_request.match_info["provider"] - item_id = http_request.match_info["item_id"] - player_id = http_request.remote # fake player id - # prepare headers as audio/flac content - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": "audio/flac"} - ) - await resp.prepare(http_request) - # collect tracks to play - media_item = await self.mass.music_manager.async_get_item( - item_id, provider, media_type - ) - queue_item = QueueItem(media_item) - # run the streamer in executor to prevent the subprocess locking up our eventloop - cancelled = threading.Event() - bg_task = self.mass.loop.run_in_executor( - None, - self.__get_queue_item_stream, - player_id, - queue_item, - resp, - cancelled, - ) - # let the streaming begin! - try: - await asyncio.gather(bg_task) - except ( - asyncio.CancelledError, - aiohttp.ClientConnectionError, - asyncio.TimeoutError, - ) as exc: - cancelled.set() - raise exc # re-raise - return resp - - @require_local_subnet - async def async_stream(self, http_request): - """Start stream for a player.""" - # make sure we have valid params - player_id = http_request.match_info.get("player_id", "") - player_queue = self.mass.player_manager.get_player_queue(player_id) - if not player_queue: - return web.Response(status=404, reason="Player(queue) not found!") - if not player_queue.use_queue_stream: - queue_item_id = http_request.match_info.get("queue_item_id") - queue_item = player_queue.by_item_id(queue_item_id) - if not queue_item: - return web.Response(status=404, reason="Invalid Queue item Id") - # prepare headers as audio/flac content - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": "audio/flac"} - ) - await resp.prepare(http_request) - # run the streamer in executor to prevent the subprocess locking up our eventloop - cancelled = threading.Event() - if player_queue.use_queue_stream: - bg_task = self.mass.loop.run_in_executor( - None, self.__get_queue_stream, player_id, resp, cancelled - ) - else: - bg_task = self.mass.loop.run_in_executor( - None, - self.__get_queue_item_stream, - player_id, - queue_item, - resp, - cancelled, - ) - # let the streaming begin! - try: - await asyncio.gather(bg_task) - except ( - asyncio.CancelledError, - aiohttp.ClientConnectionError, - asyncio.TimeoutError, - Exception, - ) as exc: - cancelled.set() - raise exc # re-raise - return resp - - def __get_queue_item_stream(self, player_id, queue_item, buffer, cancelled): - """Start streaming single queue track.""" - # pylint: disable=unused-variable - LOGGER.debug( - "stream single queue track started for track %s on player %s", - queue_item.name, - player_id, - ) - for is_last_chunk, audio_chunk in self.__get_audio_stream( - player_id, queue_item, cancelled - ): - if cancelled.is_set(): - # http session ended - # we must consume the data to prevent hanging subprocess instances - continue - # put chunk in buffer - with suppress((BrokenPipeError, ConnectionResetError)): - asyncio.run_coroutine_threadsafe( - buffer.write(audio_chunk), self.mass.loop - ).result() - # all chunks received: streaming finished - if cancelled.is_set(): - LOGGER.debug( - "stream single track interrupted for track %s on player %s", - queue_item.name, - player_id, - ) - else: - # indicate EOF if no more data - with suppress((BrokenPipeError, ConnectionResetError)): - asyncio.run_coroutine_threadsafe( - buffer.write_eof(), self.mass.loop - ).result() - - LOGGER.debug( - "stream single track finished for track %s on player %s", - queue_item.name, - player_id, - ) - - def __get_queue_stream(self, player_id, buffer, cancelled): - """Start streaming all queue tracks.""" - player_conf = self.mass.config.get_player_config(player_id) - player_queue = self.mass.player_manager.get_player_queue(player_id) - sample_rate = try_parse_int(player_conf["max_sample_rate"]) - fade_length = try_parse_int(player_conf["crossfade_duration"]) - if not sample_rate or sample_rate < 44100: - sample_rate = 96000 - if fade_length: - fade_bytes = int(sample_rate * 4 * 2 * fade_length) - else: - fade_bytes = int(sample_rate * 4 * 2 * 6) - pcm_args = "raw -b 32 -c 2 -e signed-integer -r %s" % sample_rate - args = "sox -t %s - -t flac -C 0 -" % pcm_args - # start sox process - args = shlex.split(args) - sox_proc = subprocess.Popen( - args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - - def fill_buffer(): - while True: - if ( - not sox_proc - or not sox_proc.stdout - or sox_proc.stdout.closed - or sox_proc.poll() is not None - ): - break - chunk = sox_proc.stdout.read(128000) # noqa - if not chunk: - break - if chunk and not cancelled.is_set(): - with suppress((BrokenPipeError, ConnectionResetError)): - asyncio.run_coroutine_threadsafe( - buffer.write(chunk), self.mass.loop - ).result() - del chunk - # indicate EOF if no more data - if not cancelled.is_set(): - with suppress((BrokenPipeError, ConnectionResetError)): - asyncio.run_coroutine_threadsafe( - buffer.write_eof(), self.mass.loop - ).result() - - # start fill buffer task in background - fill_buffer_thread = threading.Thread(target=fill_buffer) - fill_buffer_thread.start() - - LOGGER.info("Start Queue Stream for player %s ", player_id) - is_start = True - last_fadeout_data = b"" - while True: - if cancelled.is_set(): - break - # get the (next) track in queue - if is_start: - # report start of queue playback so we can calculate current track/duration etc. - queue_track = self.mass.add_job( - player_queue.async_start_queue_stream() - ).result() - is_start = False - else: - queue_track = player_queue.next_item - if not queue_track: - LOGGER.debug("no (more) tracks left in queue") - break - LOGGER.debug( - "Start Streaming queue track: %s (%s) on player %s", - queue_track.item_id, - queue_track.name, - player_id, - ) - fade_in_part = b"" - cur_chunk = 0 - prev_chunk = None - bytes_written = 0 - # handle incoming audio chunks - for is_last_chunk, chunk in self.__get_audio_stream( - player_id, - queue_track, - cancelled, - chunksize=fade_bytes, - resample=sample_rate, - ): - cur_chunk += 1 - - # HANDLE FIRST PART OF TRACK - if not chunk and cur_chunk == 1 and is_last_chunk: - LOGGER.warning("Stream error, skip track %s", queue_track.item_id) - break - if cur_chunk <= 2 and not last_fadeout_data: - # no fadeout_part available so just pass it to the output directly - sox_proc.stdin.write(chunk) - bytes_written += len(chunk) - del chunk - elif cur_chunk == 1 and last_fadeout_data: - prev_chunk = chunk - del chunk - # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN - elif cur_chunk == 2 and last_fadeout_data: - # combine the first 2 chunks and strip off silence - args = "sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%" % ( - pcm_args, - pcm_args, - ) - first_part, _ = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ).communicate(prev_chunk + chunk) - if len(first_part) < fade_bytes: - # part is too short after the strip action?! - # so we just use the full first part - first_part = prev_chunk + chunk - fade_in_part = first_part[:fade_bytes] - remaining_bytes = first_part[fade_bytes:] - del first_part - # do crossfade - crossfade_part = self.__crossfade_pcm_parts( - fade_in_part, last_fadeout_data, pcm_args, fade_length - ) - sox_proc.stdin.write(crossfade_part) - bytes_written += len(crossfade_part) - del crossfade_part - del fade_in_part - last_fadeout_data = b"" - # also write the leftover bytes from the strip action - sox_proc.stdin.write(remaining_bytes) - bytes_written += len(remaining_bytes) - del remaining_bytes - del chunk - prev_chunk = None # needed to prevent this chunk being sent again - # HANDLE LAST PART OF TRACK - elif prev_chunk and is_last_chunk: - # last chunk received so create the last_part - # with the previous chunk and this chunk - # and strip off silence - args = ( - "sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse" - % (pcm_args, pcm_args) - ) - last_part, _ = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ).communicate(prev_chunk + chunk) - if len(last_part) < fade_bytes: - # part is too short after the strip action - # so we just use the entire original data - last_part = prev_chunk + chunk - if len(last_part) < fade_bytes: - LOGGER.warning( - "Not enough data for crossfade: %s", len(last_part) - ) - if ( - not player_queue.crossfade_enabled - or len(last_part) < fade_bytes - ): - # crossfading is not enabled so just pass the (stripped) audio data - sox_proc.stdin.write(last_part) - bytes_written += len(last_part) - del last_part - del chunk - else: - # handle crossfading support - # store fade section to be picked up for next track - last_fadeout_data = last_part[-fade_bytes:] - remaining_bytes = last_part[:-fade_bytes] - # write remaining bytes - sox_proc.stdin.write(remaining_bytes) - bytes_written += len(remaining_bytes) - del last_part - del remaining_bytes - del chunk - # MIDDLE PARTS OF TRACK - else: - # middle part of the track - # keep previous chunk in memory so we have enough - # samples to perform the crossfade - if prev_chunk: - sox_proc.stdin.write(prev_chunk) - bytes_written += len(prev_chunk) - prev_chunk = chunk - else: - prev_chunk = chunk - del chunk - # end of the track reached - if cancelled.is_set(): - # break out the loop if the http session is cancelled - break - # update actual duration to the queue for more accurate now playing info - accurate_duration = bytes_written / int(sample_rate * 4 * 2) - queue_track.duration = accurate_duration - LOGGER.debug( - "Finished Streaming queue track: %s (%s) on player %s", - queue_track.item_id, - queue_track.name, - player_id, - ) - # run garbage collect manually to avoid too much memory fragmentation - gc.collect() - # end of queue reached, pass last fadeout bits to final output - if last_fadeout_data and not cancelled.is_set(): - sox_proc.stdin.write(last_fadeout_data) - del last_fadeout_data - # END OF QUEUE STREAM - sox_proc.terminate() - sox_proc.communicate() - fill_buffer_thread.join() - # run garbage collect manually to avoid too much memory fragmentation - gc.collect() - if cancelled.is_set(): - LOGGER.info("streaming of queue for player %s interrupted", player_id) - else: - LOGGER.info("streaming of queue for player %s completed", player_id) - - def __get_audio_stream( - self, player_id, queue_item, cancelled, chunksize=128000, resample=None - ): - """Get audio stream from provider and apply additional effects/processing if needed.""" - streamdetails = self.mass.add_job( - self.mass.music_manager.async_get_stream_details(queue_item, player_id) - ).result() - if not streamdetails: - LOGGER.warning("no stream details for %s", queue_item.name) - yield (True, b"") - return - # get sox effects and resample options - sox_options = self.__get_player_sox_options(player_id, streamdetails) - outputfmt = "flac -C 0" - if resample: - outputfmt = "raw -b 32 -c 2 -e signed-integer" - sox_options += " rate -v %s" % resample - streamdetails.sox_options = sox_options - # determine how to proceed based on input file type - if streamdetails.content_type == ContentType.AAC: - # support for AAC created with ffmpeg in between - args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % ( - decrypt_string(streamdetails.path), - outputfmt, - sox_options, - ) - process = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize - ) - elif streamdetails.type in [StreamType.URL, StreamType.FILE]: - args = 'sox -t %s "%s" -t %s - %s' % ( - streamdetails.content_type.name, - decrypt_string(streamdetails.path), - outputfmt, - sox_options, - ) - args = shlex.split(args) - process = subprocess.Popen( - args, shell=False, stdout=subprocess.PIPE, bufsize=chunksize - ) - elif streamdetails.type == StreamType.EXECUTABLE: - args = "%s | sox -t %s - -t %s - %s" % ( - decrypt_string(streamdetails.path), - streamdetails.content_type.name, - outputfmt, - sox_options, - ) - process = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize - ) - else: - LOGGER.warning("no streaming options for %s", queue_item.name) - yield (True, b"") - return - # fire event that streaming has started for this track - self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails) - # yield chunks from stdout - # we keep 1 chunk behind to detect end of stream properly - prev_chunk = b"" - while True: - if cancelled.is_set(): - # http session ended - # send terminate and pick up left over bytes - process.terminate() - chunk, _ = process.communicate() - LOGGER.warning( - "__get_audio_stream cancelled for track %s on player %s", - queue_item.name, - player_id, - ) - else: - # read exactly chunksize of data - chunk = process.stdout.read(chunksize) - if len(chunk) < chunksize: - # last chunk - yield (True, prev_chunk + chunk) - break - if prev_chunk: - yield (False, prev_chunk) - prev_chunk = chunk - # fire event that streaming has ended - if not cancelled.is_set(): - streamdetails.seconds_played = queue_item.duration - self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails) - # send task to background to analyse the audio - if queue_item.media_type == MediaType.Track: - self.mass.add_job(self.__analyze_audio, streamdetails) - LOGGER.debug( - "__get_audio_stream complete for track %s on player %s", - queue_item.name, - player_id, - ) - - def __get_player_sox_options( - self, player_id: str, streamdetails: StreamDetails - ) -> str: - """Get player specific sox effect options.""" - sox_options = [] - player_conf = self.mass.config.get_player_config(player_id) - # volume normalisation - gain_correct = self.mass.add_job( - self.mass.player_manager.async_get_gain_correct( - player_id, streamdetails.item_id, streamdetails.provider - ) - ).result() - if gain_correct != 0: - sox_options.append("vol %s dB " % gain_correct) - # downsample if needed - if player_conf["max_sample_rate"]: - max_sample_rate = try_parse_int(player_conf["max_sample_rate"]) - if max_sample_rate < streamdetails.sample_rate: - sox_options.append(f"rate -v {max_sample_rate}") - if player_conf.get("sox_options"): - sox_options.append(player_conf["sox_options"]) - return " ".join(sox_options) - - def __analyze_audio(self, streamdetails): - """Analyze track audio, for now we only calculate EBU R128 loudness.""" - item_key = "%s%s" % (streamdetails.item_id, streamdetails.provider) - if item_key in self.analyze_jobs: - return # prevent multiple analyze jobs for same track - self.analyze_jobs[item_key] = True - track_loudness = self.mass.add_job( - self.mass.database.async_get_track_loudness( - streamdetails.item_id, streamdetails.provider - ) - ).result() - if track_loudness is None: - # only when needed we do the analyze stuff - LOGGER.debug("Start analyzing track %s", item_key) - if streamdetails.type == StreamType.URL: - audio_data = urllib.request.urlopen( - decrypt_string(streamdetails.path) - ).read() - elif streamdetails.type == StreamType.EXECUTABLE: - audio_data = subprocess.check_output( - decrypt_string(streamdetails.path), shell=True - ) - elif streamdetails.type == StreamType.FILE: - with open(decrypt_string(streamdetails.path), "rb") as _file: - audio_data = _file.read() - # calculate BS.1770 R128 integrated loudness - with io.BytesIO(audio_data) as tmpfile: - data, rate = soundfile.read(tmpfile) - meter = pyloudnorm.Meter(rate) # create BS.1770 meter - loudness = meter.integrated_loudness(data) # measure loudness - del data - self.mass.add_job( - self.mass.database.async_set_track_loudness( - streamdetails.item_id, streamdetails.provider, loudness - ) - ) - del audio_data - LOGGER.debug("Integrated loudness of track %s is: %s", item_key, loudness) - self.analyze_jobs.pop(item_key, None) - - @staticmethod - def __crossfade_pcm_parts(fade_in_part, fade_out_part, pcm_args, fade_length): - """Crossfade two chunks of audio using sox.""" - # create fade-in part - fadeinfile = create_tempfile() - args = "sox --ignore-length -t %s - -t %s %s fade t %s" % ( - pcm_args, - pcm_args, - fadeinfile.name, - fade_length, - ) - args = shlex.split(args) - process = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE) - process.communicate(fade_in_part) - # create fade-out part - fadeoutfile = create_tempfile() - args = "sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse" % ( - pcm_args, - pcm_args, - fadeoutfile.name, - fade_length, - ) - args = shlex.split(args) - process = subprocess.Popen( - args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - process.communicate(fade_out_part) - # create crossfade using sox and some temp files - # TODO: figure out how to make this less complex and without the tempfiles - args = "sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -" % ( - pcm_args, - fadeoutfile.name, - pcm_args, - fadeinfile.name, - pcm_args, - ) - args = shlex.split(args) - process = subprocess.Popen( - args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - crossfade_part, _ = process.communicate() - fadeinfile.close() - fadeoutfile.close() - del fadeinfile - del fadeoutfile - return crossfade_part diff --git a/music_assistant/mass.py b/music_assistant/mass.py index c512da58..8824502b 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -17,11 +17,11 @@ from music_assistant.constants import ( EVENT_SHUTDOWN, ) from music_assistant.database import Database -from music_assistant.http_streamer import HTTPStreamer from music_assistant.metadata import MetaData from music_assistant.models.provider import Provider, ProviderType from music_assistant.music_manager import MusicManager from music_assistant.player_manager import PlayerManager +from music_assistant.stream_manager import StreamManager from music_assistant.utils import callback, get_ip_pton, is_callback from music_assistant.web import Web from zeroconf import NonUniqueNameException, ServiceInfo, Zeroconf @@ -52,7 +52,7 @@ class MusicAssistant: self.web = Web(self) self.music_manager = MusicManager(self) self.player_manager = PlayerManager(self) - self.http_streamer = HTTPStreamer(self) + self.stream_manager = StreamManager(self) # shared zeroconf instance self.zeroconf = Zeroconf() self._exit = False @@ -73,9 +73,9 @@ class MusicAssistant: await self.cache.async_setup() await self.music_manager.async_setup() await self.player_manager.async_setup() - await self.web.async_setup() await self.async_preload_providers() await self.__async_setup_discovery() + await self.web.async_setup() async def async_stop(self): """Stop running the music assistant server.""" @@ -200,7 +200,7 @@ class MusicAssistant: @callback def add_job( - self, target: Callable[..., Any], *args: Any + self, target: Callable[..., Any], *args: Any, **kwargs: Any ) -> Optional[asyncio.Future]: """Add a job/task to the event loop. @@ -209,34 +209,36 @@ class MusicAssistant: """ task = None - if self._exit: - return - # Check for partials to properly determine if coroutine function check_target = target while isinstance(check_target, functools.partial): check_target = check_target.func + if self._exit: + LOGGER.warning("scheduling job %s while exiting", check_target.__name__) + if threading.current_thread() is not threading.main_thread(): # called from other thread if asyncio.iscoroutine(check_target): task = asyncio.run_coroutine_threadsafe(target, self.loop) # type: ignore elif asyncio.iscoroutinefunction(check_target): - task = asyncio.run_coroutine_threadsafe(target(*args), self.loop) + task = asyncio.run_coroutine_threadsafe( + target(*args, **kwargs), self.loop + ) elif is_callback(check_target): - task = self.loop.call_soon_threadsafe(target, *args) + task = self.loop.call_soon_threadsafe(target, *args, **kwargs) else: - task = self.loop.run_in_executor(None, target, *args) # type: ignore + task = self.loop.run_in_executor(None, target, *args, **kwargs) # type: ignore else: # called from mainthread if asyncio.iscoroutine(check_target): task = self.loop.create_task(target) # type: ignore elif asyncio.iscoroutinefunction(check_target): - task = self.loop.create_task(target(*args)) + task = self.loop.create_task(target(*args, **kwargs)) elif is_callback(check_target): - task = self.loop.call_soon(target, *args) + task = self.loop.call_soon(target, *args, *kwargs) else: - task = self.loop.run_in_executor(None, target, *args) # type: ignore + task = self.loop.run_in_executor(None, target, *args, *kwargs) # type: ignore return task @staticmethod diff --git a/music_assistant/models/musicprovider.py b/music_assistant/models/musicprovider.py index a24db70b..fd3f059a 100755 --- a/music_assistant/models/musicprovider.py +++ b/music_assistant/models/musicprovider.py @@ -1,7 +1,5 @@ """Model and helpers for Music Providers.""" -from abc import abstractmethod -from dataclasses import dataclass from typing import List, Optional from music_assistant.models.media_types import ( @@ -17,7 +15,6 @@ from music_assistant.models.provider import Provider, ProviderType from music_assistant.models.streamdetails import StreamDetails -@dataclass class MusicProvider(Provider): """ Base class for a Musicprovider. @@ -25,7 +22,10 @@ class MusicProvider(Provider): Should be overriden in the provider specific implementation. """ - type: ProviderType = ProviderType.MUSIC_PROVIDER + @property + def type(self) -> ProviderType: + """Return ProviderType.""" + return ProviderType.MUSIC_PROVIDER @property def supported_mediatypes(self) -> List[MediaType]: @@ -38,7 +38,6 @@ class MusicProvider(Provider): MediaType.Track, ] - @abstractmethod async def async_search( self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 ) -> SearchResult: @@ -51,103 +50,100 @@ class MusicProvider(Provider): """ raise NotImplementedError - @abstractmethod async def async_get_library_artists(self) -> List[Artist]: """Retrieve library artists from the provider.""" - raise NotImplementedError + if MediaType.Artist in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_library_albums(self) -> List[Album]: """Retrieve library albums from the provider.""" - raise NotImplementedError + if MediaType.Album in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_library_tracks(self) -> List[Track]: """Retrieve library tracks from the provider.""" - raise NotImplementedError + if MediaType.Track in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_library_playlists(self) -> List[Playlist]: """Retrieve library/subscribed playlists from the provider.""" - raise NotImplementedError + if MediaType.Playlist in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_radios(self) -> List[Radio]: """Retrieve library/subscribed radio stations from the provider.""" - raise NotImplementedError + if MediaType.Radio in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_artist(self, prov_artist_id: str) -> Artist: """Get full artist details by id.""" - raise NotImplementedError + if MediaType.Artist in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_artist_albums(self, prov_artist_id: str) -> List[Album]: """Get a list of all albums for the given artist.""" - raise NotImplementedError + if MediaType.Album in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_artist_toptracks(self, prov_artist_id: str) -> List[Track]: """Get a list of most popular tracks for the given artist.""" - raise NotImplementedError + if MediaType.Track in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_album(self, prov_album_id: str) -> Album: """Get full album details by id.""" - raise NotImplementedError + if MediaType.Album in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_track(self, prov_track_id: str) -> Track: """Get full track details by id.""" - raise NotImplementedError + if MediaType.Track in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_playlist(self, prov_playlist_id: str) -> Playlist: """Get full playlist details by id.""" - raise NotImplementedError + if MediaType.Playlist in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_radio(self, prov_radio_id: str) -> Radio: """Get full radio details by id.""" - raise NotImplementedError + if MediaType.Radio in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_album_tracks(self, prov_album_id: str) -> List[Track]: """Get album tracks for given album id.""" - raise NotImplementedError + if MediaType.Album in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]: """Get all playlist tracks for given playlist id.""" - raise NotImplementedError + if MediaType.Playlist in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_library_add(self, prov_item_id: str, media_type: MediaType) -> bool: """Add item to provider's library. Return true on succes.""" raise NotImplementedError - @abstractmethod async def async_library_remove( self, prov_item_id: str, media_type: MediaType ) -> bool: """Remove item from provider's library. Return true on succes.""" raise NotImplementedError - @abstractmethod async def async_add_playlist_tracks( self, prov_playlist_id: str, prov_track_ids: List[str] ) -> bool: """Add track(s) to playlist. Return true on succes.""" - raise NotImplementedError + if MediaType.Playlist in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_remove_playlist_tracks( self, prov_playlist_id: str, prov_track_ids: List[str] ) -> bool: """Remove track(s) from playlist. Return true on succes.""" - raise NotImplementedError + if MediaType.Playlist in self.supported_mediatypes: + raise NotImplementedError - @abstractmethod async def async_get_stream_details(self, item_id: str) -> StreamDetails: """Get streamdetails for a track/radio.""" raise NotImplementedError diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index d2525462..607ac65a 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -1,17 +1,18 @@ """Models and helpers for a player.""" -from dataclasses import dataclass, field -from datetime import datetime +from abc import abstractmethod +from dataclasses import dataclass from enum import Enum -from typing import Any, List +from typing import Any, List, Optional from mashumaro import DataClassDictMixin from music_assistant.constants import EVENT_SET_PLAYER_CONTROL_STATE +from music_assistant.helpers.typing import MusicAssistantType, QueueItems from music_assistant.models.config_entry import ConfigEntry -from music_assistant.utils import CustomIntEnum +from music_assistant.utils import CustomIntEnum, callback -class PlayerState(Enum): +class PlaybackState(Enum): """Enum for the playstate of a player.""" Stopped = "stopped" @@ -37,43 +38,236 @@ class PlayerFeature(CustomIntEnum): CROSSFADE = 2 -@dataclass -class Player(DataClassDictMixin): - """Model for a MusicPlayer.""" +class Player: + """Model for a music player.""" - player_id: str - provider_id: str - name: str = "" - powered: bool = False - elapsed_time: int = 0 - state: PlayerState = PlayerState.Stopped - available: bool = True - current_uri: str = "" - volume_level: int = 0 - muted: bool = False - is_group_player: bool = False - group_childs: List[str] = field(default_factory=list) - device_info: DeviceInfo = None - should_poll: bool = False - features: List[PlayerFeature] = field(default_factory=list) - config_entries: List[ConfigEntry] = field(default_factory=list) - # below attributes are handled by the player manager. No need to set/override them. - updated_at: datetime = field(default=datetime.utcnow(), init=False) - active_queue: str = field(default="", init=False) - group_parents: List[str] = field(init=False, default_factory=list) - - def __setattr__(self, name, value): - """Watch for attribute updates. Do not override.""" - if name == "updated_at": - # updated at is set by the on_update callback - # make sure we do not hit an endless loop - super().__setattr__(name, value) - return - value_changed = hasattr(self, name) and getattr(self, name) != value - super().__setattr__(name, value) - if value_changed and hasattr(self, "_on_update"): - # pylint: disable=no-member - self._on_update(self.player_id, name) + mass: MusicAssistantType = None # will be set by player manager + + # Public properties: should be overriden with provider specific implementation + + @property + @abstractmethod + def player_id(self) -> str: + """Return player id of this player.""" + return None + + @property + @abstractmethod + def provider_id(self) -> str: + """Return provider id of this player.""" + return None + + @property + def name(self) -> str: + """Return name of the player.""" + return None + + @property + @abstractmethod + def powered(self) -> bool: + """Return current power state of player.""" + return False + + @property + @abstractmethod + def elapsed_time(self) -> int: + """Return elapsed time of current playing media in seconds.""" + return 0 + + @property + def elapsed_milliseconds(self) -> Optional[int]: + """ + Return elapsed time of current playing media in milliseconds. + + This is an optional property. + If provided, the property must return the REALTIME value while playing. + Used for synced playback in player groups. + """ + return None + + @property + @abstractmethod + def state(self) -> PlaybackState: + """Return current PlaybackState of player.""" + return PlaybackState.Stopped + + @property + def available(self) -> bool: + """Return current availablity of player.""" + return True + + @property + @abstractmethod + def current_uri(self) -> Optional[str]: + """Return currently loaded uri of player (if any).""" + return None + + @property + @abstractmethod + def volume_level(self) -> int: + """Return current volume level of player (scale 0..100).""" + return 0 + + @property + @abstractmethod + def muted(self) -> bool: + """Return current mute state of player.""" + return False + + @property + @abstractmethod + def is_group_player(self) -> bool: + """Return True if this player is a group player.""" + return False + + @property + def group_childs(self) -> List[str]: + """Return list of child player id's if player is a group player.""" + return [] + + @property + def device_info(self) -> DeviceInfo: + """Return the device info for this player.""" + return DeviceInfo() + + @property + def should_poll(self) -> bool: + """Return True if this player should be polled for state updates.""" + return False + + @property + def features(self) -> List[PlayerFeature]: + """Return list of features this player supports.""" + return [] + + @property + def config_entries(self) -> List[ConfigEntry]: + """Return player specific config entries (if any).""" + return [] + + # Public methods / player commands: should be overriden with provider specific implementation + + async def async_on_update(self) -> None: + """Call when player is periodically polled by the player manager (should_poll=True).""" + self.update_state() + + async def async_on_remove(self) -> None: + """Call when player is removed from the player manager.""" + + async def async_cmd_play_uri(self, uri: str) -> None: + """ + Play the specified uri/url on the player. + + :param uri: uri/url to send to the player. + """ + raise NotImplementedError + + async def async_cmd_stop(self) -> None: + """Send STOP command to player.""" + raise NotImplementedError + + async def async_cmd_play(self) -> None: + """Send PLAY command to player.""" + raise NotImplementedError + + async def async_cmd_pause(self) -> None: + """Send PAUSE command to player.""" + raise NotImplementedError + + async def async_cmd_next(self) -> None: + """Send NEXT TRACK command to player.""" + raise NotImplementedError + + async def async_cmd_previous(self) -> None: + """Send PREVIOUS TRACK command to player.""" + raise NotImplementedError + + async def async_cmd_power_on(self) -> None: + """Send POWER ON command to player.""" + raise NotImplementedError + + async def async_cmd_power_off(self) -> None: + """Send POWER OFF command to player.""" + raise NotImplementedError + + async def async_cmd_volume_set(self, volume_level: int) -> None: + """ + Send volume level command to player. + + :param volume_level: volume level to set (0..100). + """ + raise NotImplementedError + + async def async_cmd_volume_mute(self, is_muted: bool = False) -> None: + """ + Send volume MUTE command to given player. + + :param is_muted: bool with new mute state. + """ + raise NotImplementedError + + # OPTIONAL: QUEUE SERVICE CALLS/COMMANDS - OVERRIDE ONLY IF SUPPORTED BY PROVIDER + + async def async_cmd_queue_play_index(self, index: int) -> None: + """ + Play item at index X on player's queue. + + :param index: (int) index of the queue item that should start playing + """ + if PlayerFeature.QUEUE in self.features: + raise NotImplementedError + + async def async_cmd_queue_load(self, queue_items: QueueItems) -> None: + """ + Load/overwrite given items in the player's queue implementation. + + :param queue_items: a list of QueueItems + """ + if PlayerFeature.QUEUE in self.features: + raise NotImplementedError + + async def async_cmd_queue_insert( + self, queue_items: QueueItems, insert_at_index: int + ) -> None: + """ + Insert new items at position X into existing queue. + + If insert_at_index 0 or None, will start playing newly added item(s) + :param queue_items: a list of QueueItems + :param insert_at_index: queue position to insert new items + """ + if PlayerFeature.QUEUE in self.features: + raise NotImplementedError + + async def async_cmd_queue_append(self, queue_items: QueueItems) -> None: + """ + Append new items at the end of the queue. + + :param queue_items: a list of QueueItems + """ + if PlayerFeature.QUEUE in self.features: + raise NotImplementedError + + async def async_cmd_queue_update(self, queue_items: QueueItems) -> None: + """ + Overwrite the existing items in the queue, used for reordering. + + :param queue_items: a list of QueueItems + """ + if PlayerFeature.QUEUE in self.features: + raise NotImplementedError + + async def async_cmd_queue_clear(self) -> None: + """Clear the player's queue.""" + if PlayerFeature.QUEUE in self.features: + raise NotImplementedError + + # Do not override below this point + + @callback + def update_state(self) -> None: + """Call to store current player state in the player manager.""" + self.mass.add_job(self.mass.player_manager.async_update_player(self)) class PlayerControlType(CustomIntEnum): @@ -98,20 +292,20 @@ class PlayerControl: provider: str = "" name: str = "" state: Any = None + mass: MusicAssistantType = None # will be set by player manager - async def async_set_state(self, new_state: Any): + async def async_set_state(self, new_state: Any) -> None: """Handle command to set the state for a player control.""" # by default we just signal an event on the eventbus # pickup this event (e.g. from the websocket api) # or override this method with your own implementation. - # pylint: disable=no-member self.mass.signal_event( EVENT_SET_PLAYER_CONTROL_STATE, {"control_id": self.control_id, "state": new_state}, ) - def to_dict(self): + def to_dict(self) -> dict: """Return dict representation of this playercontrol.""" return { "type": int(self.type), diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index df9ee7d9..0372da58 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -2,18 +2,25 @@ import logging import random +import time import uuid from dataclasses import dataclass from enum import Enum -from typing import List +from typing import List, Optional, Tuple from music_assistant.constants import ( EVENT_QUEUE_ITEMS_UPDATED, EVENT_QUEUE_TIME_UPDATED, EVENT_QUEUE_UPDATED, ) +from music_assistant.helpers.typing import ( + MusicAssistantType, + OptionalInt, + OptionalStr, + PlayerType, +) from music_assistant.models.media_types import Track -from music_assistant.models.player import PlayerFeature, PlayerState +from music_assistant.models.player import PlaybackState, PlayerFeature from music_assistant.models.streamdetails import StreamDetails from music_assistant.utils import callback @@ -41,7 +48,7 @@ class QueueItem(Track): uri: str = "" queue_item_id: str = "" - def __init__(self, media_item=None): + def __init__(self, media_item=None) -> None: """Initialize class.""" super().__init__() self.queue_item_id = str(uuid.uuid4()) @@ -54,7 +61,7 @@ class QueueItem(Track): class PlayerQueue: """Class that holds the queue items for a player.""" - def __init__(self, mass, player_id: str): + def __init__(self, mass: MusicAssistantType, player_id: str) -> None: """Initialize class.""" self.mass = mass self._player_id = player_id @@ -66,32 +73,39 @@ class PlayerQueue: self._last_item = None self._next_queue_startindex = 0 self._last_queue_startindex = 0 - self._last_player_state = PlayerState.Stopped + self._last_player_state = PlaybackState.Stopped # load previous queue settings from disk self.mass.add_job(self.__async_restore_saved_state()) - async def async_close(self): + async def async_close(self) -> None: """Handle shutdown/close.""" # pylint: disable=unused-argument await self.__async_save_state() @property - def player(self): + def player(self) -> PlayerType: """Return handle to player.""" return self.mass.player_manager.get_player(self._player_id) @property - def player_id(self): - """Return handle to player.""" + def player_id(self) -> str: + """Return the player's id.""" return self._player_id + def get_stream_url(self) -> str: + """Return the full stream url for this QueueStream.""" + uri = f"{self.mass.web.internal_url}/stream/queue/{self.player_id}" + # we set the checksum just to invalidate cache stuf + uri += f"?checksum={time.time()}" + return uri + @property - def shuffle_enabled(self): + def shuffle_enabled(self) -> bool: """Return shuffle enabled property.""" return self._shuffle_enabled @shuffle_enabled.setter - def shuffle_enabled(self, enable_shuffle: bool): + def shuffle_enabled(self, enable_shuffle: bool) -> None: """Set shuffle.""" if not self._shuffle_enabled and enable_shuffle: # shuffle requested @@ -113,12 +127,12 @@ class PlayerQueue: self.mass.add_job(self.async_update_state()) @property - def repeat_enabled(self): + def repeat_enabled(self) -> bool: """Return if crossfade is enabled for this player.""" return self._repeat_enabled @repeat_enabled.setter - def repeat_enabled(self, enable_repeat: bool): + def repeat_enabled(self, enable_repeat: bool) -> None: """Set the repeat mode for this queue.""" if self._repeat_enabled != enable_repeat: self._repeat_enabled = enable_repeat @@ -126,14 +140,14 @@ class PlayerQueue: self.mass.add_job(self.__async_save_state()) @property - def crossfade_enabled(self): + def crossfade_enabled(self) -> bool: """Return if crossfade is enabled for this player's queue.""" return ( self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0 ) @property - def cur_index(self): + def cur_index(self) -> OptionalInt: """ Return the current index of the queue. @@ -144,7 +158,7 @@ class PlayerQueue: return self._cur_index @property - def cur_item_id(self): + def cur_item_id(self) -> OptionalStr: """ Return the queue item id of the current item in the queue. @@ -155,7 +169,7 @@ class PlayerQueue: return self.items[self.cur_index].queue_item_id @property - def cur_item(self): + def cur_item(self) -> Optional[QueueItem]: """ Return the current item in the queue. @@ -166,12 +180,12 @@ class PlayerQueue: return self.items[self.cur_index] @property - def cur_item_time(self): + def cur_item_time(self) -> int: """Return the time (progress) for current (playing) item.""" return self._cur_item_time @property - def next_index(self): + def next_index(self) -> OptionalInt: """Return the next index for this player's queue. Return None if queue is empty or no more items. @@ -191,7 +205,7 @@ class PlayerQueue: return None @property - def next_item(self): + def next_item(self) -> Optional[QueueItem]: """Return the next item in the queue. Returns None if queue is empty or no more items. @@ -201,12 +215,12 @@ class PlayerQueue: return None @property - def items(self): + def items(self) -> List[QueueItem]: """Return all queue items for this player's queue.""" return self._items @property - def use_queue_stream(self): + def use_queue_stream(self) -> bool: """ Indicate that we need to use the queue stream. @@ -220,24 +234,24 @@ class PlayerQueue: ) @property - def supports_queue(self): + def supports_queue(self) -> bool: """Return if this player supports native queue.""" return PlayerFeature.QUEUE in self.player.features @property - def supports_crossfade(self): + def supports_crossfade(self) -> bool: """Return if this player supports native crossfade.""" return PlayerFeature.CROSSFADE in self.player.features @callback - def get_item(self, index): + def get_item(self, index: int) -> Optional[QueueItem]: """Get item by index from queue.""" if index is not None and len(self.items) > index: return self.items[index] return None @callback - def by_item_id(self, queue_item_id: str): + def by_item_id(self, queue_item_id: str) -> Optional[QueueItem]: """Get item by queue_item_id from queue.""" if not queue_item_id: return None @@ -246,17 +260,15 @@ class PlayerQueue: return item return None - async def async_next(self): + async def async_next(self) -> None: """Play the next track in the queue.""" if self.cur_index is None: return if self.use_queue_stream: return await self.async_play_index(self.cur_index + 1) - return await self.mass.player_manager.get_player_provider( - self.player_id - ).async_cmd_next(self.player_id) + return await self.player.async_cmd_next() - async def async_previous(self): + async def async_previous(self) -> None: """Play the previous track in the queue.""" if self.cur_index is None: return @@ -264,7 +276,7 @@ class PlayerQueue: return await self.async_play_index(self.cur_index - 1) return await self.mass.player_manager.async_cmd_previous(self.player_id) - async def async_resume(self): + async def async_resume(self) -> None: """Resume previous queue.""" if self.items: prev_index = self.cur_index @@ -273,56 +285,37 @@ class PlayerQueue: else: # at this point we don't know if the queue is synced with the player # so just to be safe we send the queue_items to the player - player_provider = self.mass.player_manager.get_player_provider( - self.player_id - ) - await player_provider.async_cmd_queue_load(self.player_id, self.items) + await self.player.async_cmd_queue_load(self.items) await self.async_play_index(prev_index) else: LOGGER.warning( "resume queue requested for %s but queue is empty", self.player_id ) - async def async_play_index(self, index): + async def async_play_index(self, index: int) -> None: """Play item at index X in queue.""" - player_prov = self.mass.player_manager.get_player_provider(self.player_id) if not isinstance(index, int): index = self.__index_by_id(index) if not len(self.items) > index: return if self.use_queue_stream: self._next_queue_startindex = index - self.player.elapsed_time = 0 # set just in case of a race condition - queue_stream_uri = "%s/stream/%s?id=%s" % ( - self.mass.web.internal_url, - self.player.player_id, - self.items[ - index - ].queue_item_id, # just set to invalidate any cache stuff - ) - return await player_prov.async_cmd_play_uri( - self.player_id, queue_stream_uri - ) + queue_stream_uri = self.get_stream_url() + return await self.player.async_cmd_play_uri(queue_stream_uri) if self.supports_queue: try: - return await player_prov.async_cmd_queue_play_index( - self.player_id, index - ) + return await self.player.async_cmd_queue_play_index(index) except NotImplementedError: # not supported by player, use load queue instead LOGGER.debug( "cmd_queue_insert not supported by player, fallback to cmd_queue_load " ) self._items = self._items[index:] - return await player_prov.async_cmd_queue_load( - self.player_id, self._items - ) + return await self.player.async_cmd_queue_load(self._items) else: - return await player_prov.async_cmd_play_uri( - self.player_id, self._items[index].uri - ) + return await self.player.async_cmd_play_uri(self._items[index].uri) - async def async_move_item(self, queue_item_id, pos_shift=1): + async def async_move_item(self, queue_item_id: str, pos_shift: int = 1) -> None: """ Move queue item x up/down the queue. @@ -332,7 +325,7 @@ class PlayerQueue: """ items = self.items.copy() item_index = self.__index_by_id(queue_item_id) - if pos_shift == 0 and self.player.state == PlayerState.Playing: + if pos_shift == 0 and self.player.state == PlaybackState.Playing: new_index = self.cur_index + 1 elif pos_shift == 0: new_index = self.cur_index @@ -346,7 +339,7 @@ class PlayerQueue: if pos_shift == 0: await self.async_play_index(new_index) - async def async_load(self, queue_items: List[QueueItem]): + async def async_load(self, queue_items: List[QueueItem]) -> None: """Load (overwrite) queue with new items.""" for index, item in enumerate(queue_items): item.sort_index = index @@ -356,12 +349,11 @@ class PlayerQueue: if self.use_queue_stream or not self.supports_queue: await self.async_play_index(0) else: - player_prov = self.mass.player_manager.get_player_provider(self.player_id) - await player_prov.async_cmd_queue_load(self.player_id, queue_items) + await self.player.async_cmd_queue_load(queue_items) self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict()) self.mass.add_job(self.__async_save_state()) - async def async_insert(self, queue_items: List[QueueItem], offset=0): + async def async_insert(self, queue_items: List[QueueItem], offset: int = 0) -> None: """ Insert new items at offset x from current position. @@ -401,24 +393,19 @@ class PlayerQueue: await self.async_play_index(insert_at_index) else: # send queue to player's own implementation - player_prov = self.mass.player_manager.get_player_provider(self.player_id) try: - await player_prov.async_cmd_queue_insert( - self.player_id, queue_items, insert_at_index - ) + await self.player.async_cmd_queue_insert(queue_items, insert_at_index) except NotImplementedError: # not supported by player, use load queue instead LOGGER.debug( "cmd_queue_insert not supported by player, fallback to cmd_queue_load " ) self._items = self._items[self.cur_index :] - return await player_prov.async_cmd_queue_load( - self.player_id, self._items - ) + return await self.player.async_cmd_queue_load(self._items) self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict()) self.mass.add_job(self.__async_save_state()) - async def async_append(self, queue_items: List[QueueItem]): + async def async_append(self, queue_items: List[QueueItem]) -> None: """Append new items at the end of the queue.""" for index, item in enumerate(queue_items): item.sort_index = len(self.items) + index @@ -431,67 +418,60 @@ class PlayerQueue: self._items = self._items + queue_items if self.supports_queue and not self.use_queue_stream: # send queue to player's own implementation - player_prov = self.mass.player_manager.get_player_provider(self.player_id) try: - await player_prov.async_cmd_queue_append(self.player_id, queue_items) + await self.player.async_cmd_queue_append(queue_items) except NotImplementedError: # not supported by player, use load queue instead LOGGER.debug( "cmd_queue_append not supported by player, fallback to cmd_queue_load " ) self._items = self._items[self.cur_index :] - return await player_prov.async_cmd_queue_load( - self.player_id, self._items - ) + return await self.player.async_cmd_queue_load(self._items) self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict()) self.mass.add_job(self.__async_save_state()) - async def async_update(self, queue_items: List[QueueItem]): + async def async_update(self, queue_items: List[QueueItem]) -> None: """Update the existing queue items, mostly caused by reordering.""" self._items = queue_items if self.supports_queue and not self.use_queue_stream: # send queue to player's own implementation - player_prov = self.mass.player_manager.get_player_provider(self.player_id) try: - await player_prov.async_cmd_queue_update(self.player_id, queue_items) + await self.player.async_cmd_queue_update(queue_items) except NotImplementedError: # not supported by player, use load queue instead LOGGER.debug( "cmd_queue_update not supported by player, fallback to cmd_queue_load " ) self._items = self._items[self.cur_index :] - return await player_prov.async_cmd_queue_load( - self.player_id, self._items - ) + return await self.player.async_cmd_queue_load(self._items) self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict()) self.mass.add_job(self.__async_save_state()) - async def async_clear(self): + async def async_clear(self) -> None: """Clear all items in the queue.""" await self.mass.player_manager.async_cmd_stop(self.player_id) self._items = [] if self.supports_queue: # send queue cmd to player's own implementation - player_prov = self.mass.player_manager.get_player_provider(self.player_id) try: - await player_prov.async_cmd_queue_clear(self.player_id) + await self.player.async_cmd_queue_clear() except NotImplementedError: # not supported by player, try update instead try: - await player_prov.async_cmd_queue_update(self.player_id, []) + await self.player.async_cmd_queue_update([]) except NotImplementedError: # not supported by player, ignore pass self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict()) - async def async_update_state(self): + async def async_update_state(self) -> None: """Update queue details, called when player updates.""" new_index = self._cur_index track_time = self._cur_item_time # handle queue stream if ( self.use_queue_stream - and self.player.state == PlayerState.Playing + and self.player.state == PlaybackState.Playing and self.player.elapsed_time > 1 ): new_index, track_time = self.__get_queue_stream_index() @@ -527,14 +507,14 @@ class PlayerQueue: {"player_id": self.player_id, "cur_item_time": track_time}, ) - async def async_start_queue_stream(self): + async def async_start_queue_stream(self) -> None: """Call when queue_streamer starts playing the queue stream.""" self._last_queue_startindex = self._next_queue_startindex self._cur_item_time = 0 return self.get_item(self._next_queue_startindex) - def to_dict(self): + def to_dict(self) -> dict: """Instance attributes as dict so it can be serialized to json.""" return { "player_id": self.player.player_id, @@ -552,7 +532,7 @@ class PlayerQueue: } @callback - def __get_queue_stream_index(self): + def __get_queue_stream_index(self) -> Tuple[int, int]: """Get index of queue stream.""" # player is playing a constant stream of the queue so we need to do this the hard way queue_index = 0 @@ -575,13 +555,13 @@ class PlayerQueue: return queue_index, track_time @staticmethod - def __shuffle_items(queue_items): + def __shuffle_items(queue_items) -> List[QueueItem]: """Shuffle a list of tracks.""" # for now we use default python random function # can be extended with some more magic last_played and stuff return random.sample(queue_items, len(queue_items)) - def __index_by_id(self, queue_item_id): + def __index_by_id(self, queue_item_id) -> OptionalInt: """Get index by queue_item_id.""" item_index = None for index, item in enumerate(self.items): @@ -589,7 +569,7 @@ class PlayerQueue: item_index = index return item_index - async def __async_restore_saved_state(self): + async def __async_restore_saved_state(self) -> None: """Try to load the saved queue for this player from cache file.""" cache_str = "queue_state_%s" % self.player.player_id cache_data = await self.mass.cache.async_get(cache_str) @@ -602,7 +582,7 @@ class PlayerQueue: # pylint: enable=unused-argument - async def __async_save_state(self): + async def __async_save_state(self) -> None: """Save current queue settings to file.""" cache_str = "queue_state_%s" % self.player_id cache_data = { diff --git a/music_assistant/models/player_state.py b/music_assistant/models/player_state.py new file mode 100755 index 00000000..3bdbbab3 --- /dev/null +++ b/music_assistant/models/player_state.py @@ -0,0 +1,534 @@ +""" +Models and helpers for the calculated state of a player. + +PlayerProviders send Player objects to us with the raw/untouched player state. +Due to configuration settings and other influences this playerstate needs alteration, +that's why we store the final player state (we present to outside world) +into a PlayerState object. +""" + +import logging +from datetime import datetime +from typing import List, Optional + +from music_assistant.constants import ( + CONF_ENABLED, + CONF_GROUP_DELAY, + CONF_NAME, + CONF_POWER_CONTROL, + CONF_VOLUME_CONTROL, + EVENT_PLAYER_CHANGED, +) +from music_assistant.helpers.typing import MusicAssistantType +from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType +from music_assistant.models.player import ( + DeviceInfo, + PlaybackState, + Player, + PlayerControlType, + PlayerFeature, +) +from music_assistant.utils import callback + +LOGGER = logging.getLogger("mass") + +ATTR_PLAYER_ID = "player_id" +ATTR_PROVIDER_ID = "provider_id" +ATTR_NAME = "name" +ATTR_POWERED = "powered" +ATTR_ELAPSED_TIME = "elapsed_time" +ATTR_STATE = "state" +ATTR_AVAILABLE = "available" +ATTR_CURRENT_URI = "current_uri" +ATTR_VOLUME_LEVEL = "volume_level" +ATTR_MUTED = "muted" +ATTR_IS_GROUP_PLAYER = "is_group_player" +ATTR_GROUP_CHILDS = "group_childs" +ATTR_DEVICE_INFO = "device_info" +ATTR_SHOULD_POLL = "should_poll" +ATTR_FEATURES = "features" +ATTR_CONFIG_ENTRIES = "config_entries" +ATTR_UPDATED_AT = "updated_at" +ATTR_ACTIVE_QUEUE = "active_queue" +ATTR_GROUP_PARENTS = "group_parents" + + +# list of Player attributes that can/will cause a player changed event +UPDATE_ATTRIBUTES = [ + ATTR_NAME, + ATTR_POWERED, + ATTR_STATE, + ATTR_AVAILABLE, + ATTR_CURRENT_URI, + ATTR_VOLUME_LEVEL, + ATTR_MUTED, + ATTR_IS_GROUP_PLAYER, + ATTR_GROUP_CHILDS, + ATTR_DEVICE_INFO, + ATTR_FEATURES, +] + + +class PlayerState: + """ + Model for the calculated state of a player. + + PlayerProviders send Player objects to us with the raw/untouched player state. + Due to configuration settings and other influences this playerstate needs alteration, + that's why we store the final player state (we present to outside world) + into this PlayerState object. + """ + + def __init__(self, mass: MusicAssistantType, player: Player): + """Initialize a PlayerState from a Player object.""" + self.mass = mass + # make sure the MusicAssistant obj is present on the player + player.mass = mass + self._player = player + self._player_id = player.player_id + self._provider_id = player.provider_id + self._features = player.features + self._muted = player.muted + self._is_group_player = player.is_group_player + self._group_childs = player.group_childs + self._device_info = player.device_info + self._elapsed_time = player.elapsed_time + self._current_uri = player.current_uri + self._available = player.available + self._name = player.name + self._powered = player.powered + self._state = player.state + self._volume_level = player.volume_level + self._updated_at = datetime.utcnow() + self._group_parents = self.get_group_parents() + self._active_queue = self.get_active_queue() + self._config_entries = self.get_player_config_entries() + # schedule update to set the transforms + self.mass.add_job(self.async_update(player)) + + @property + def player(self): + """Return the underlying player object.""" + return self._player + + @property + def player_id(self) -> str: + """Return player id of this player.""" + return self._player_id + + @property + def provider_id(self) -> str: + """Return provider id of this player.""" + return self._provider_id + + @property + def name(self) -> str: + """Return name of the player.""" + return self._name + + @property + def powered(self) -> bool: + """Return current power state of player.""" + return self._powered + + @property + def elapsed_time(self) -> int: + """Return elapsed time of current playing media in seconds.""" + return self._elapsed_time + + @property + def elapsed_milliseconds(self) -> Optional[int]: + """ + Return elapsed time of current playing media in milliseconds. + + This is an optional property. + If provided, the property must return the REALTIME value while playing. + Used for synced playback in player groups. + """ + return self.player.elapsed_milliseconds # always realtime returned from player + + @property + def state(self) -> PlaybackState: + """Return current PlaybackState of player.""" + return self._state + + @property + def available(self) -> bool: + """Return current availablity of player.""" + return self._available + + @property + def current_uri(self) -> Optional[str]: + """Return currently loaded uri of player (if any).""" + return self._current_uri + + @property + def volume_level(self) -> int: + """Return current volume level of player (scale 0..100).""" + return self._volume_level + + @property + def muted(self) -> bool: + """Return current mute state of player.""" + return self._muted + + @property + def is_group_player(self) -> bool: + """Return True if this player is a group player.""" + return self._is_group_player + + @property + def group_childs(self) -> List[str]: + """Return list of child player id's if player is a group player.""" + return self._group_childs + + @property + def device_info(self) -> DeviceInfo: + """Return the device info for this player.""" + return self._device_info + + @property + def should_poll(self) -> bool: + """Return True if this player should be polled for state updates.""" + return self._player.should_poll # always realtime returned from player + + @property + def features(self) -> List[PlayerFeature]: + """Return list of features this player supports.""" + return self._features + + @property + def config_entries(self) -> List[ConfigEntry]: + """Return player specific config entries (if any).""" + return self._config_entries + + async def async_update(self, player: Player): + """Update attributes from player object.""" + # detect state changes + changed_keys = set() + for attr in UPDATE_ATTRIBUTES: + new_value = getattr(self._player, attr) + + # handle transformations + if attr == ATTR_NAME: + new_value = self.get_name(new_value) + elif attr == ATTR_POWERED: + new_value = self.get_power(new_value) + elif attr == ATTR_STATE: + new_value = self.get_state(new_value) + elif attr == ATTR_AVAILABLE: + new_value = self.get_available(new_value) + elif attr == ATTR_VOLUME_LEVEL: + new_value = self.get_volume_level(new_value) + + current_value = getattr(self, attr) + + if current_value != new_value: + # value changed + setattr(self, "_" + attr, new_value) + changed_keys.add(attr) + LOGGER.debug("Attribute %s changed on player %s", attr, self.player_id) + + # some attributes are always updated + self._elapsed_time = player.elapsed_time + self._updated_at = datetime.utcnow() + self._group_parents = self.get_group_parents() + self._active_queue = self.get_active_queue() + self._config_entries = self.get_player_config_entries() + + if changed_keys: + self.mass.signal_event(EVENT_PLAYER_CHANGED, self) + # update group player childs when parent updates + if ATTR_GROUP_CHILDS in changed_keys: + for child_player_id in self.group_childs: + self.mass.add_job( + self.mass.player_manager.async_trigger_player_update( + child_player_id + ) + ) + + # always update the player queue + player_queue = self.mass.player_manager.get_player_queue(self.active_queue) + if player_queue: + self.mass.add_job(player_queue.async_update_state()) + + @callback + def get_name(self, name: str) -> str: + """Return final/calculated player name.""" + conf_name = self.mass.config.get_player_config(self.player_id)[CONF_NAME] + return conf_name if conf_name else name + + @callback + def get_power(self, power: bool) -> bool: + """Return final/calculated player's power state.""" + if not self.available: + return False + player_config = self.mass.config.player_settings[self.player_id] + if player_config.get(CONF_POWER_CONTROL): + control = self.mass.player_manager.get_player_control( + player_config[CONF_POWER_CONTROL] + ) + if control: + return control.state + return power + + @callback + def get_state(self, state: PlaybackState) -> PlaybackState: + """Return final/calculated player's playback state.""" + if self.powered and self.active_queue != self.player_id: + # use group state + return self.mass.player_manager.get_player(self.active_queue).state + if state == PlaybackState.Stopped and not self.powered: + return PlaybackState.Off + return state + + @callback + def get_available(self, available: bool) -> bool: + """Return current availablity of player.""" + player_enabled = bool( + self.mass.config.get_player_config(self.player_id)[CONF_ENABLED] + ) + return False if not player_enabled else available + + @callback + def get_volume_level(self, volume_level: int) -> int: + """Return final/calculated player's volume_level.""" + if not self.available: + return 0 + player_config = self.mass.config.player_settings[self.player_id] + if player_config.get(CONF_VOLUME_CONTROL): + control = self.mass.player_manager.get_player_control( + player_config[CONF_VOLUME_CONTROL] + ) + if control: + return control.state + # handle group volume + if self.is_group_player: + group_volume = 0 + active_players = 0 + for child_player_id in self.group_childs: + child_player = self.mass.player_manager.get_player(child_player_id) + if child_player and child_player.available and child_player.powered: + group_volume += child_player.volume_level + active_players += 1 + if active_players: + group_volume = group_volume / active_players + return group_volume + return volume_level + + @property + def group_parents(self) -> List[str]: + """Return all group players this player belongs to.""" + return self._group_parents + + @callback + def get_group_parents(self) -> List[str]: + """Return all group players this player belongs to.""" + if self.is_group_player: + return [] + result = [] + for player in self.mass.player_manager.players: + if not player.is_group_player: + continue + if self.player_id not in player.group_childs: + continue + result.append(player.player_id) + return result + + @property + def active_queue(self) -> str: + """Return the active parent player/queue for a player.""" + return self._active_queue + + @callback + def get_active_queue(self) -> str: + """Return the active parent player/queue for a player.""" + # if a group is powered on, all of it's childs will have/use + # the parent's player's queue. + for group_player_id in self.group_parents: + group_player = self.mass.player_manager.get_player(group_player_id) + if group_player and group_player.powered: + return group_player_id + return self.player_id + + @property + def updated_at(self) -> datetime: + """Return the datetime (UTC) that the player state was last updated.""" + return self._updated_at + + @callback + def get_player_config_entries(self): + """Get final/calculated config entries for a player.""" + entries = [item for item in self.player.config_entries] + # append power control config entries + power_controls = self.mass.player_manager.get_player_controls( + PlayerControlType.POWER + ) + if power_controls: + controls = [ + {"text": f"{item.provider}: {item.name}", "value": item.control_id} + for item in power_controls + ] + entries.append( + ConfigEntry( + entry_key=CONF_POWER_CONTROL, + entry_type=ConfigEntryType.STRING, + description_key=CONF_POWER_CONTROL, + values=controls, + ) + ) + # append volume control config entries + volume_controls = self.mass.player_manager.get_player_controls( + PlayerControlType.VOLUME + ) + if volume_controls: + controls = [ + {"text": f"{item.provider}: {item.name}", "value": item.control_id} + for item in volume_controls + ] + entries.append( + ConfigEntry( + entry_key=CONF_VOLUME_CONTROL, + entry_type=ConfigEntryType.STRING, + description_key=CONF_VOLUME_CONTROL, + values=controls, + ) + ) + # append group player entries + for parent_id in self.group_parents: + parent_player = self.mass.player_manager.get_player(parent_id) + if parent_player and parent_player.provider_id == "group_player": + entries.append( + ConfigEntry( + entry_key=CONF_GROUP_DELAY, + entry_type=ConfigEntryType.INT, + default_value=0, + range=(0, 500), + description_key=CONF_GROUP_DELAY, + ) + ) + break + return entries + + @callback + def to_dict(self): + """Instance attributes as dict so it can be serialized to json.""" + return { + ATTR_PLAYER_ID: self.player_id, + ATTR_PROVIDER_ID: self.provider_id, + ATTR_NAME: self.name, + ATTR_POWERED: self.powered, + ATTR_ELAPSED_TIME: int(self.elapsed_time), + ATTR_STATE: self.state.value, + ATTR_AVAILABLE: self.available, + ATTR_CURRENT_URI: self.current_uri, + ATTR_VOLUME_LEVEL: self.volume_level, + ATTR_MUTED: self.muted, + ATTR_IS_GROUP_PLAYER: self.is_group_player, + ATTR_GROUP_CHILDS: self.group_childs, + ATTR_DEVICE_INFO: self.device_info.to_dict(), + ATTR_UPDATED_AT: self.updated_at.isoformat(), + ATTR_GROUP_PARENTS: self.group_parents, + ATTR_FEATURES: self.features, + ATTR_ACTIVE_QUEUE: self.active_queue, + } + + async def async_cmd_play_uri(self, uri: str) -> None: + """ + Play the specified uri/url on the player. + + :param uri: uri/url to send to the player. + """ + return await self.player.async_cmd_play_uri(uri) + + async def async_cmd_stop(self) -> None: + """Send STOP command to player.""" + return await self.player.async_cmd_stop() + + async def async_cmd_play(self) -> None: + """Send PLAY command to player.""" + return await self.player.async_cmd_play() + + async def async_cmd_pause(self) -> None: + """Send PAUSE command to player.""" + return await self.player.async_cmd_pause() + + async def async_cmd_next(self) -> None: + """Send NEXT TRACK command to player.""" + return await self.player.async_cmd_next() + + async def async_cmd_previous(self) -> None: + """Send PREVIOUS TRACK command to player.""" + return await self.player.async_cmd_previous() + + async def async_cmd_power_on(self) -> None: + """Send POWER ON command to player.""" + return await self.player.async_cmd_power_on() + + async def async_cmd_power_off(self) -> None: + """Send POWER OFF command to player.""" + return await self.player.async_cmd_power_off() + + async def async_cmd_volume_set(self, volume_level: int) -> None: + """ + Send volume level command to player. + + :param volume_level: volume level to set (0..100). + """ + return await self.player.async_cmd_volume_set(volume_level) + + async def async_cmd_volume_mute(self, is_muted: bool = False) -> None: + """ + Send volume MUTE command to given player. + + :param is_muted: bool with new mute state. + """ + return await self.player.async_cmd_volume_mute(is_muted) + + # OPTIONAL: QUEUE SERVICE CALLS/COMMANDS - OVERRIDE ONLY IF SUPPORTED BY PROVIDER + + async def async_cmd_queue_play_index(self, index: int) -> None: + """ + Play item at index X on player's queue. + + :param index: (int) index of the queue item that should start playing + """ + return await self.player.async_cmd_queue_play_index(index) + + async def async_cmd_queue_load(self, queue_items) -> None: + """ + Load/overwrite given items in the player's queue implementation. + + :param queue_items: a list of QueueItems + """ + return await self.player.async_cmd_queue_load(queue_items) + + async def async_cmd_queue_insert(self, queue_items, insert_at_index: int) -> None: + """ + Insert new items at position X into existing queue. + + If insert_at_index 0 or None, will start playing newly added item(s) + :param queue_items: a list of QueueItems + :param insert_at_index: queue position to insert new items + """ + return await self.player.async_cmd_queue_insert(queue_items, insert_at_index) + + async def async_cmd_queue_append(self, queue_items) -> None: + """ + Append new items at the end of the queue. + + :param queue_items: a list of QueueItems + """ + return await self.player.async_cmd_queue_append(queue_items) + + async def async_cmd_queue_update(self, queue_items) -> None: + """ + Overwrite the existing items in the queue, used for reordering. + + :param queue_items: a list of QueueItems + """ + return await self.player.async_cmd_queue_update(queue_items) + + async def async_cmd_queue_clear(self) -> None: + """Clear the player's queue.""" + return await self.player.async_cmd_queue_clear() diff --git a/music_assistant/models/playerprovider.py b/music_assistant/models/playerprovider.py index ed89ecbf..d2bd7722 100755 --- a/music_assistant/models/playerprovider.py +++ b/music_assistant/models/playerprovider.py @@ -1,14 +1,9 @@ """Models and helpers for a player provider.""" -from abc import abstractmethod -from dataclasses import dataclass -from typing import List - -from music_assistant.models.player_queue import QueueItem +from music_assistant.helpers.typing import Players from music_assistant.models.provider import Provider, ProviderType -@dataclass class PlayerProvider(Provider): """ Base class for a Playerprovider. @@ -16,166 +11,17 @@ class PlayerProvider(Provider): Should be overridden/subclassed by provider specific implementation. """ - type: ProviderType = ProviderType.PLAYER_PROVIDER - - # SERVICE CALLS / PLAYER COMMANDS - - @abstractmethod - async def async_cmd_play_uri(self, player_id: str, uri: str): - """ - Play the specified uri/url on the given player. - - :param player_id: player_id of the player to handle the command. - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_stop(self, player_id: str) -> None: - """ - Send STOP command to given player. - - :param player_id: player_id of the player to handle the command. - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_play(self, player_id: str) -> None: - """ - Send PLAY command to given player. - - :param player_id: player_id of the player to handle the command. - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_pause(self, player_id: str): - """ - Send PAUSE command to given player. - - :param player_id: player_id of the player to handle the command. - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_next(self, player_id: str): - """ - Send NEXT TRACK command to given player. - - :param player_id: player_id of the player to handle the command. - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_previous(self, player_id: str): - """ - Send PREVIOUS TRACK command to given player. - - :param player_id: player_id of the player to handle the command. - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_power_on(self, player_id: str) -> None: - """ - Send POWER ON command to given player. - - :param player_id: player_id of the player to handle the command. - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_power_off(self, player_id: str) -> None: - """ - Send POWER OFF command to given player. - - :param player_id: player_id of the player to handle the command. - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """ - Send volume level command to given player. - - :param player_id: player_id of the player to handle the command. - :param volume_level: volume level to set (0..100). - """ - raise NotImplementedError - - @abstractmethod - async def async_cmd_volume_mute(self, player_id: str, is_muted=False): - """ - Send volume MUTE command to given player. - - :param player_id: player_id of the player to handle the command. - :param is_muted: bool with new mute state. - """ - raise NotImplementedError - - # OPTIONAL: QUEUE SERVICE CALLS/COMMANDS - OVERRIDE ONLY IF SUPPORTED BY PROVIDER - # pylint: disable=abstract-method - - async def async_cmd_queue_play_index(self, player_id: str, index: int): - """ - Play item at index X on player's queue. - - :param player_id: player_id of the player to handle the command. - :param index: (int) index of the queue item that should start playing - """ - raise NotImplementedError - - async def async_cmd_queue_load(self, player_id: str, queue_items: List[QueueItem]): - """ - Load/overwrite given items in the player's queue implementation. - - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - """ - raise NotImplementedError - - async def async_cmd_queue_insert( - self, player_id: str, queue_items: List[QueueItem], insert_at_index: int - ): - """ - Insert new items at position X into existing queue. - - If insert_at_index 0 or None, will start playing newly added item(s) - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - :param insert_at_index: queue position to insert new items - """ - # pylint: disable=abstract-method - raise NotImplementedError - - async def async_cmd_queue_append( - self, player_id: str, queue_items: List[QueueItem] - ): - """ - Append new items at the end of the queue. - - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - """ - # pylint: disable=abstract-method - raise NotImplementedError - - async def async_cmd_queue_update( - self, player_id: str, queue_items: List[QueueItem] - ): - """ - Overwrite the existing items in the queue, used for reordering. - - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - """ - # pylint: disable=abstract-method - raise NotImplementedError - - async def async_cmd_queue_clear(self, player_id: str): - """ - Clear the player's queue. - - :param player_id: player_id of the player to handle the command. - """ - # pylint: disable=abstract-method - raise NotImplementedError + @property + def type(self) -> ProviderType: + """Return ProviderType.""" + return ProviderType.PLAYER_PROVIDER + + @property + def players(self, calculated_state=False) -> Players: + """Return all players belonging to this provider.""" + # pylint: disable=no-member + return [ + player + for player in self.mass.player_manager.players + if player.provider_id == self.id + ] diff --git a/music_assistant/models/provider.py b/music_assistant/models/provider.py index 3553f44d..47510742 100644 --- a/music_assistant/models/provider.py +++ b/music_assistant/models/provider.py @@ -1,15 +1,12 @@ -"""Generic Models and helpers for plugins.""" +"""Generic Models and helpers for providers/plugins.""" from abc import abstractmethod -from dataclasses import dataclass from enum import Enum -from typing import TYPE_CHECKING, List +from typing import List +from music_assistant.helpers.typing import MusicAssistantType from music_assistant.models.config_entry import ConfigEntry -if TYPE_CHECKING: - from music_assistant.mass import MusicAssistant - class ProviderType(Enum): """Enum with plugin types.""" @@ -19,13 +16,19 @@ class ProviderType(Enum): GENERIC = "generic" -@dataclass class Provider: """Base model for a provider/plugin.""" - type: ProviderType = ProviderType.GENERIC - mass: "MusicAssistant" = None - available: bool = False + mass: MusicAssistantType = ( + None # will be set automagically while loading the provider + ) + available: bool = False # will be set automagically while loading the provider + + @property + @abstractmethod + def type(self) -> ProviderType: + """Return ProviderType.""" + return ProviderType.GENERIC @property @abstractmethod @@ -52,10 +55,10 @@ class Provider: raise NotImplementedError @abstractmethod - async def async_on_stop(self): + async def async_on_stop(self) -> None: """Handle correct close/cleanup of the provider on exit. Called on shutdown.""" - async def async_on_reload(self): + async def async_on_reload(self) -> None: """Handle configuration changes for this provider. Called on reload.""" await self.async_on_stop() await self.async_on_start() diff --git a/music_assistant/models/streamdetails.py b/music_assistant/models/streamdetails.py index ded98e13..5a5e3cbf 100644 --- a/music_assistant/models/streamdetails.py +++ b/music_assistant/models/streamdetails.py @@ -13,6 +13,7 @@ class StreamType(Enum): EXECUTABLE = "executable" URL = "url" FILE = "file" + CACHE = "cache" class ContentType(Enum): diff --git a/music_assistant/player_manager.py b/music_assistant/player_manager.py index bb036278..fb281c4c 100755 --- a/music_assistant/player_manager.py +++ b/music_assistant/player_manager.py @@ -1,29 +1,28 @@ """PlayerManager: Orchestrates all players from player providers.""" import logging -from datetime import datetime from typing import List, Optional from music_assistant.constants import ( - CONF_ENABLED, - CONF_NAME, + CONF_POWER_CONTROL, + CONF_VOLUME_CONTROL, EVENT_PLAYER_ADDED, - EVENT_PLAYER_CHANGED, EVENT_PLAYER_CONTROL_REGISTERED, EVENT_PLAYER_CONTROL_UPDATED, EVENT_PLAYER_REMOVED, EVENT_REGISTER_PLAYER_CONTROL, EVENT_UNREGISTER_PLAYER_CONTROL, ) -from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType +from music_assistant.helpers.typing import MusicAssistantType from music_assistant.models.media_types import MediaItem, MediaType, Track from music_assistant.models.player import ( + PlaybackState, Player, PlayerControl, PlayerControlType, - PlayerState, ) from music_assistant.models.player_queue import PlayerQueue, QueueItem, QueueOption +from music_assistant.models.player_state import PlayerState from music_assistant.models.playerprovider import PlayerProvider from music_assistant.models.provider import ProviderType from music_assistant.utils import ( @@ -34,8 +33,6 @@ from music_assistant.utils import ( ) POLL_INTERVAL = 10 -CONF_VOLUME_CONTROL = "volume_control" -CONF_POWER_CONTROL = "power_control" LOGGER = logging.getLogger("mass") @@ -43,11 +40,10 @@ LOGGER = logging.getLogger("mass") class PlayerManager: """Several helpers to handle playback through player providers.""" - def __init__(self, mass): + def __init__(self, mass: MusicAssistantType): """Initialize class.""" self.mass = mass - self._players = {} - self._org_players = {} + self._player_states = {} self._providers = {} self._player_queues = {} self._poll_ticks = 0 @@ -73,21 +69,26 @@ class PlayerManager: @run_periodic(1) async def poll_task(self): """Check for updates on players that need to be polled.""" - for player in self._org_players.values(): - if player.should_poll and ( - self._poll_ticks >= POLL_INTERVAL or player.state == PlayerState.Playing + for player_state in self.players: + if player_state.player.should_poll and ( + self._poll_ticks >= POLL_INTERVAL + or player_state.state == PlaybackState.Playing ): - # Just request update, value checking for changes is handled - await self.async_update_player(player) + await player_state.player.async_on_update() if self._poll_ticks >= POLL_INTERVAL: self._poll_ticks = 0 else: self._poll_ticks += 1 @property - def players(self) -> List[Player]: + def players(self) -> List[PlayerState]: """Return all registered players.""" - return list(self._players.values()) + return list(self._player_states.values()) + + @property + def player_queues(self) -> List[PlayerQueue]: + """Return all player queues.""" + return list(self._player_queues.values()) @property def providers(self) -> List[PlayerProvider]: @@ -95,9 +96,9 @@ class PlayerManager: return self.mass.get_providers(ProviderType.PLAYER_PROVIDER) @callback - def get_player(self, player_id: str) -> Player: + def get_player(self, player_id: str) -> PlayerState: """Return player by player_id or None if player does not exist.""" - player = self._players.get(player_id) + player = self._player_states.get(player_id) if not player: LOGGER.warning("Player %s is not available!", player_id) return player @@ -111,9 +112,10 @@ class PlayerManager: @callback def get_player_queue(self, player_id: str) -> PlayerQueue: """Return player's queue by player_id or None if player does not exist.""" - if player_id not in self._players: + player = self.get_player(player_id) + if not player: + LOGGER.warning("Player(queue) %s is not available!", player_id) return None - player = self._players[player_id] return self._player_queues.get(player.active_queue) @callback @@ -141,37 +143,41 @@ class PlayerManager: """Register a new player or update an existing one.""" if not player or not player.available: return - is_new_player = player.player_id not in self._players - await self.__async_create_player_state(player) - if is_new_player: - # create player queue - if player.player_id not in self._player_queues: - self._player_queues[player.player_id] = PlayerQueue( - self.mass, player.player_id - ) - # TODO: turn on player if it was previously turned on ? - LOGGER.info( - "New player added: %s/%s", - player.provider_id, - self._players[player.player_id].name, - ) - self.mass.signal_event(EVENT_PLAYER_ADDED, self._players[player.player_id]) + if player.player_id in self._player_states: + return await self.async_update_player(player) + # create playerstate and queue object + self._player_states[player.player_id] = PlayerState(self.mass, player) + self._player_queues[player.player_id] = PlayerQueue(self.mass, player.player_id) + # TODO: turn on player if it was previously turned on ? + LOGGER.info( + "New player added: %s/%s", + player.provider_id, + self._player_states[player.player_id].name, + ) + self.mass.signal_event( + EVENT_PLAYER_ADDED, self._player_states[player.player_id] + ) async def async_remove_player(self, player_id: str): """Remove a player from the registry.""" - self._players.pop(player_id, None) + player_state = self._player_states.pop(player_id, None) + if player_state: + await player_state.player.async_on_remove() self._player_queues.pop(player_id, None) - self._org_players.pop(player_id, None) LOGGER.info("Player removed: %s", player_id) self.mass.signal_event(EVENT_PLAYER_REMOVED, {"player_id": player_id}) async def async_update_player(self, player: Player): """Update an existing player (or register as new if non existing).""" - if not player: - return - if player.player_id not in self._players: + if player.player_id not in self._player_states: return await self.async_add_player(player) - await self.__async_create_player_state(player) + await self._player_states[player.player_id].async_update(player) + + async def async_trigger_player_update(self, player_id: str): + """Trigger update of an existing player..""" + player = self.get_player(player_id) + if player: + await self._player_states[player.player_id].async_update(player.player) async def async_register_player_control(self, control: PlayerControl): """Register a playercontrol with the player manager.""" @@ -186,7 +192,7 @@ class PlayerManager: control.name, ) # update all players using this playercontrol - for player_id, player in self._players.items(): + for player_id, player in self.players: conf = self.mass.config.player_settings[player_id] if control.control_id in [ conf.get(CONF_POWER_CONTROL), @@ -209,13 +215,13 @@ class PlayerManager: new_state, ) # update all players using this playercontrol - for player_id, player in self._players.items(): + for player_id, player in self.players: conf = self.mass.config.player_settings[player_id] if control.control_id in [ conf.get(CONF_POWER_CONTROL), conf.get(CONF_VOLUME_CONTROL), ]: - self.mass.add_job(self.async_update_player(player)) + player.update_state() # SERVICE CALLS / PLAYER COMMANDS @@ -260,7 +266,7 @@ class PlayerManager: async for track in tracks: queue_item = QueueItem(track) # generate uri for this queue item - queue_item.uri = "%s/stream/%s/%s" % ( + queue_item.uri = "%s/stream/queue/%s/%s" % ( self.mass.web.internal_url, player_id, queue_item.queue_item_id, @@ -327,9 +333,8 @@ class PlayerManager: if not player: return queue_player_id = player.active_queue - return await self.get_player_provider(queue_player_id).async_cmd_stop( - queue_player_id - ) + queue_player = self.get_player(queue_player_id) + return await queue_player.async_cmd_stop() async def async_cmd_play(self, player_id: str) -> None: """ @@ -341,11 +346,10 @@ class PlayerManager: if not player: return queue_player_id = player.active_queue + queue_player = self.get_player(queue_player_id) # unpause if paused else resume queue - if player.state == PlayerState.Paused: - return await self.get_player_provider(queue_player_id).async_cmd_play( - queue_player_id - ) + if queue_player.state == PlaybackState.Paused: + return await queue_player.async_cmd_play() # power on at play request await self.async_cmd_power_on(player_id) return await self._player_queues[queue_player_id].async_resume() @@ -360,9 +364,8 @@ class PlayerManager: if not player: return queue_player_id = player.active_queue - return await self.get_player_provider(queue_player_id).async_cmd_pause( - queue_player_id - ) + queue_player = self.get_player(queue_player_id) + return await queue_player.async_cmd_pause() async def async_cmd_play_pause(self, player_id: str): """ @@ -373,7 +376,7 @@ class PlayerManager: player = self.get_player(player_id) if not player: return - if player.state == PlayerState.Playing: + if player.state == PlaybackState.Playing: return await self.async_cmd_pause(player_id) return await self.async_cmd_play(player_id) @@ -412,7 +415,7 @@ class PlayerManager: return player_config = self.mass.config.player_settings[player.player_id] # turn on player - await self.get_player_provider(player_id).async_cmd_power_on(player_id) + await player.async_cmd_power_on() # player control support if player_config.get(CONF_POWER_CONTROL): control = self.get_player_control(player_config[CONF_POWER_CONTROL]) @@ -433,7 +436,7 @@ class PlayerManager: await self.async_cmd_stop(player_id) player_config = self.mass.config.player_settings[player.player_id] # turn off player - await self.get_player_provider(player_id).async_cmd_power_off(player_id) + await player.async_cmd_power_off() # player control support if player_config.get(CONF_POWER_CONTROL): control = self.get_player_control(player_config[CONF_POWER_CONTROL]) @@ -485,7 +488,6 @@ class PlayerManager: player = self.get_player(player_id) if not player or not player.powered: return - player_prov = self.get_player_provider(player_id) player_config = self.mass.config.player_settings[player.player_id] volume_level = try_parse_int(volume_level) if volume_level < 0: @@ -498,7 +500,7 @@ class PlayerManager: if control: await control.async_set_state(volume_level) # just force full volume on actual player if volume is outsourced to volumecontrol - await player_prov.async_cmd_volume_set(player_id, 100) + await player.async_cmd_volume_set(player_id, 100) # handle group volume elif player.is_group_player: cur_volume = player.volume_level @@ -509,7 +511,7 @@ class PlayerManager: else: volume_dif_percent = volume_dif / cur_volume for child_player_id in player.group_childs: - child_player = self._players.get(child_player_id) + child_player = self.get_player(child_player_id) if child_player and child_player.available and child_player.powered: cur_child_volume = child_player.volume_level new_child_volume = cur_child_volume + ( @@ -518,7 +520,7 @@ class PlayerManager: await self.async_cmd_volume_set(child_player_id, new_child_volume) # regular volume command else: - await player_prov.async_cmd_volume_set(player_id, volume_level) + await player.async_cmd_volume_set(volume_level) async def async_cmd_volume_up(self, player_id: str): """ @@ -558,50 +560,11 @@ class PlayerManager: player = self.get_player(player_id) if not player: return - player_prov = self.get_player_provider(player_id) # TODO: handle mute on volumecontrol? - return await player_prov.async_cmd_volume_mute(player_id, is_muted) + return await player.async_cmd_volume_mute(is_muted) # OTHER/HELPER FUNCTIONS - @callback - def get_player_config_entries(self, player_id: str): - """Get final/calculated config entries for a player.""" - if player_id not in self._org_players: - return [] - entries = self._org_players[player_id].config_entries - # append power control config entries - power_controls = self.get_player_controls(PlayerControlType.POWER) - if power_controls: - controls = [ - {"text": f"{item.provider}: {item.name}", "value": item.control_id} - for item in power_controls - ] - entries.append( - ConfigEntry( - entry_key=CONF_POWER_CONTROL, - entry_type=ConfigEntryType.STRING, - description_key=CONF_POWER_CONTROL, - values=controls, - ) - ) - # append volume control config entries - volume_controls = self.get_player_controls(PlayerControlType.VOLUME) - if volume_controls: - controls = [ - {"text": f"{item.provider}: {item.name}", "value": item.control_id} - for item in volume_controls - ] - entries.append( - ConfigEntry( - entry_key=CONF_VOLUME_CONTROL, - entry_type=ConfigEntryType.STRING, - description_key=CONF_VOLUME_CONTROL, - values=controls, - ) - ) - return entries - async def async_get_gain_correct( self, player_id: str, item_id: str, provider_id: str ): @@ -621,152 +584,6 @@ class PlayerManager: gain_correct = round(gain_correct, 2) return gain_correct - async def __async_create_player_state(self, player: Player): - """Create/update internal Player object with all calculated properties.""" - self._org_players[player.player_id] = player - player_enabled = bool( - self.mass.config.get_player_config(player.player_id)[CONF_ENABLED] - ) - if player.player_id in self._players: - player_state = self._players[player.player_id] - else: - player_state = Player(player.player_id, player.provider_id) - self._players[player.player_id] = player_state - setattr(player_state, "_on_update", self.__player_updated) - player_state.group_parents = self.__get_player_group_parents(player) - active_queue = self.__get_player_active_queue( - player, player_state.group_parents - ) - player_state.name = self.__get_player_name(player) - player_state.powered = self.__get_player_power_state(player) - if active_queue != player.player_id: - player_state.elapsed_time = self._players[active_queue].elapsed_time - player_state.current_uri = self._players[active_queue].current_uri - else: - player_state.elapsed_time = int(player.elapsed_time) - player_state.current_uri = player.current_uri - player_state.state = self.__get_player_state( - player, active_queue, player_state.powered - ) - player_state.available = False if not player_enabled else player.available - player_state.volume_level = self.__get_player_volume_level(player) - player_state.muted = self.__get_player_mute_state(player) - player_state.is_group_player = player.is_group_player - player_state.group_childs = player.group_childs - player_state.device_info = player.device_info - player_state.should_poll = player.should_poll - player_state.features = player.features - player_state.active_queue = active_queue - - @callback - def __get_player_name(self, player: Player): - """Get final/calculated player name.""" - conf_name = self.mass.config.get_player_config(player.player_id)[CONF_NAME] - return conf_name if conf_name else player.name - - @callback - def __get_player_power_state(self, player: Player): - """Get final/calculated player's power state.""" - if not player.available: - return False - player_config = self.mass.config.player_settings[player.player_id] - if player_config.get(CONF_POWER_CONTROL): - control = self.get_player_control(player_config[CONF_POWER_CONTROL]) - if control: - return control.state - return player.powered - - @callback - def __get_player_volume_level(self, player: Player): - """Get final/calculated player's volume_level.""" - if not player.available: - return 0 - player_config = self.mass.config.player_settings[player.player_id] - if player_config.get(CONF_VOLUME_CONTROL): - control = self.get_player_control(player_config[CONF_VOLUME_CONTROL]) - if control: - return control.state - # handle group volume - if player.is_group_player: - group_volume = 0 - active_players = 0 - for child_player_id in player.group_childs: - child_player = self._players.get(child_player_id) - if child_player and child_player.available and child_player.powered: - group_volume += child_player.volume_level - active_players += 1 - if active_players: - group_volume = group_volume / active_players - return group_volume - return player.volume_level - - @callback - def __get_player_state(self, player: Player, active_parent: str, powered: bool): - """Get final/calculated player's state.""" - if powered and active_parent != player.player_id: - # use group state - return self._players[active_parent].state - if PlayerState(player.state) == PlayerState.Stopped and not powered: - return PlayerState.Off - return PlayerState(player.state) - - @callback - @classmethod - def __get_player_mute_state(cls, player: Player): - """Get final/calculated player's mute state.""" - # TODO: Handle VolumeControl plugin for mute state? - return player.muted - - @callback - def __get_player_group_parents(self, player: Player): - """Return all group players this player belongs to.""" - if player.is_group_player: - return [] - result = [] - for group_player in self._players.values(): - if not group_player.is_group_player: - continue - if player.player_id not in group_player.group_childs: - continue - result.append(group_player.player_id) - return result - - @callback - def __get_player_active_queue(self, player: Player, group_parents: List[str]): - """Return the active parent player/queue for a player.""" - # if a group is powered on, all of it's childs will have/use - # the parent's player's queue. - for group_player_id in group_parents: - group_player = self.get_player(group_player_id) - if group_player and group_player.powered: - return group_player_id - return player.player_id - - @callback - def __player_updated(self, player_id: str, changed_value: str): - """Call when player is updated.""" - if player_id not in self._players: - return - player = self._players[player_id] - if not player.available and changed_value != "available": - # ignore updates from unavailable players - return - if changed_value == "config_entries": - return # we can ignore this too - # store datetime the player was last updated - player.updated_at = datetime.utcnow() - # signal player_updated on all state changes except elapsed time - if not changed_value == "elapsed_time": - self.mass.signal_event(EVENT_PLAYER_CHANGED, self._players[player_id]) - # signal child players - if player.is_group_player: - for child_player_id in player.group_childs: - child_player = self.get_player(child_player_id) - if child_player and child_player.available: - self.mass.add_job(self.async_update_player(child_player)) - if player_id in self._player_queues and player.active_queue == player_id: - self.mass.add_job(self._player_queues[player_id].async_update_state()) - async def __handle_websocket_player_control_event(self, msg, msg_details): """Handle player controls over the websockets api.""" if msg in [EVENT_REGISTER_PLAYER_CONTROL, EVENT_PLAYER_CONTROL_UPDATED]: diff --git a/music_assistant/providers/chromecast/__init__.py b/music_assistant/providers/chromecast/__init__.py index 99cd6066..5dc86539 100644 --- a/music_assistant/providers/chromecast/__init__.py +++ b/music_assistant/providers/chromecast/__init__.py @@ -6,7 +6,6 @@ from typing import List import pychromecast from music_assistant.models.config_entry import ConfigEntry -from music_assistant.models.player_queue import QueueItem from music_assistant.models.playerprovider import PlayerProvider from pychromecast.controllers.multizone import MultizoneManager @@ -27,8 +26,6 @@ async def async_setup(mass): class ChromecastProvider(PlayerProvider): """Support for ChromeCast Audio PlayerProvider.""" - # pylint: disable=abstract-method - def __init__(self, *args, **kwargs): """Initialize.""" self.mz_mgr = MultizoneManager() @@ -78,108 +75,6 @@ class ChromecastProvider(PlayerProvider): for player in self._players.values(): player.disconnect() - async def async_cmd_play_uri(self, player_id: str, uri: str): - """ - Play the specified uri/url on the given player. - - :param player_id: player_id of the player to handle the command. - """ - self.mass.add_job(self._players[player_id].play_uri, uri) - - async def async_cmd_stop(self, player_id: str) -> None: - """ - Send STOP command to given player. - - :param player_id: player_id of the player to handle the command. - """ - self.mass.add_job(self._players[player_id].stop) - - async def async_cmd_play(self, player_id: str) -> None: - """ - Send STOP command to given player. - - :param player_id: player_id of the player to handle the command. - """ - self.mass.add_job(self._players[player_id].play) - - async def async_cmd_pause(self, player_id: str): - """ - Send PAUSE command to given player. - - :param player_id: player_id of the player to handle the command. - """ - self.mass.add_job(self._players[player_id].pause) - - async def async_cmd_next(self, player_id: str): - """ - Send NEXT TRACK command to given player. - - :param player_id: player_id of the player to handle the command. - """ - self.mass.add_job(self._players[player_id].next) - - async def async_cmd_previous(self, player_id: str): - """ - Send PREVIOUS TRACK command to given player. - - :param player_id: player_id of the player to handle the command. - """ - self.mass.add_job(self._players[player_id].previous) - - async def async_cmd_power_on(self, player_id: str) -> None: - """ - Send POWER ON command to given player. - - :param player_id: player_id of the player to handle the command. - """ - self.mass.add_job(self._players[player_id].power_on) - - async def async_cmd_power_off(self, player_id: str) -> None: - """ - Send POWER OFF command to given player. - - :param player_id: player_id of the player to handle the command. - """ - self.mass.add_job(self._players[player_id].power_off) - - async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """ - Send volume level command to given player. - - :param player_id: player_id of the player to handle the command. - :param volume_level: volume level to set (0..100). - """ - self.mass.add_job(self._players[player_id].volume_set, volume_level / 100) - - async def async_cmd_volume_mute(self, player_id: str, is_muted=False): - """ - Send volume MUTE command to given player. - - :param player_id: player_id of the player to handle the command. - :param is_muted: bool with new mute state. - """ - self.mass.add_job(self._players[player_id].volume_mute, is_muted) - - async def async_cmd_queue_load(self, player_id: str, queue_items: List[QueueItem]): - """ - Load/overwrite given items in the player's queue implementation. - - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - """ - self.mass.add_job(self._players[player_id].queue_load, queue_items) - - async def async_cmd_queue_append( - self, player_id: str, queue_items: List[QueueItem] - ): - """ - Append new items at the end of the queue. - - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - """ - self.mass.add_job(self._players[player_id].queue_append, queue_items) - def __chromecast_add_update_callback(self, cast_uuid, cast_service_name): """Handle zeroconf discovery of a new or updated chromecast.""" # pylint: disable=unused-argument @@ -198,14 +93,13 @@ class ChromecastProvider(PlayerProvider): if player_id in self._players: # player already added, the player will take care of reconnects itself. return - else: - LOGGER.debug( - "Chromecast discovered: %s (%s)", cast_info.friendly_name, player_id - ) - player = ChromecastPlayer(self.mass, cast_info) - self._players[player_id] = player - self.mass.add_job(self.mass.player_manager.async_add_player(player)) + LOGGER.debug( + "Chromecast discovered: %s (%s)", cast_info.friendly_name, player_id + ) + player = ChromecastPlayer(self.mass, cast_info) + self._players[player_id] = player self.mass.add_job(self._players[player_id].set_cast_info, cast_info) + self.mass.add_job(self.mass.player_manager.async_add_player(player)) def __chromecast_remove_callback(self, cast_uuid, cast_service_name, cast_service): """Handle a Chromecast removed event.""" diff --git a/music_assistant/providers/chromecast/player.py b/music_assistant/providers/chromecast/player.py index fe37788a..520431d7 100644 --- a/music_assistant/providers/chromecast/player.py +++ b/music_assistant/providers/chromecast/player.py @@ -1,14 +1,20 @@ """Representation of a Cast device on the network.""" import logging import uuid -from contextlib import suppress from datetime import datetime from typing import List, Optional import pychromecast -from music_assistant.models.player import DeviceInfo, PlayerFeature, PlayerState +from music_assistant.helpers.typing import MusicAssistantType +from music_assistant.models.config_entry import ConfigEntry +from music_assistant.models.player import ( + DeviceInfo, + PlaybackState, + Player, + PlayerFeature, +) from music_assistant.models.player_queue import QueueItem -from music_assistant.utils import compare_strings +from music_assistant.utils import compare_strings, yield_chunks from pychromecast.controllers.multizone import MultizoneController from pychromecast.socket_client import ( CONNECTION_STATUS_CONNECTED, @@ -22,7 +28,7 @@ LOGGER = logging.getLogger(PROV_ID) PLAYER_FEATURES = [PlayerFeature.QUEUE] -class ChromecastPlayer: +class ChromecastPlayer(Player): """Representation of a Cast device on the network. This class is the holder of the pychromecast.Chromecast object and @@ -30,13 +36,12 @@ class ChromecastPlayer: "elected leader" itself. """ - def __init__(self, mass, cast_info: ChromecastInfo): + def __init__(self, mass: MusicAssistantType, cast_info: ChromecastInfo) -> None: """Initialize the cast device.""" self.mass = mass - self.features = PLAYER_FEATURES - self.config_entries = PLAYER_CONFIG_ENTRIES - self.provider_id = PROV_ID self._cast_info = cast_info + self._player_id = cast_info.uuid + self.services = cast_info.services self._chromecast: Optional[pychromecast.Chromecast] = None self.cast_status = None @@ -48,44 +53,46 @@ class ChromecastPlayer: self._powered = False self._status_listener: Optional[CastStatusListener] = None self._is_speaker_group = False - self.last_updated = datetime.utcnow() @property - def player_id(self): - """Return id of this player.""" - return self._cast_info.uuid + def player_id(self) -> str: + """Return player id of this player.""" + return self._player_id + + @property + def provider_id(self) -> str: + """Return provider id of this player.""" + return PROV_ID @property - def name(self): + def name(self) -> str: """Return name of this player.""" return ( self._chromecast.name if self._chromecast else self._cast_info.friendly_name ) @property - def powered(self): + def powered(self) -> bool: """Return power state of this player.""" return self._powered @property - def should_poll(self): + def should_poll(self) -> bool: """Return bool if this player needs to be polled for state changes.""" - if not self._chromecast or not self._chromecast.media_controller: - return False - return self._chromecast.media_controller.status.player_is_playing + return self.media_status and self.media_status.player_is_playing @property - def state(self) -> PlayerState: + def state(self) -> PlaybackState: """Return the state of the player.""" if self.media_status is None: - return PlayerState.Stopped + return PlaybackState.Stopped if self.media_status.player_is_playing: - return PlayerState.Playing + return PlaybackState.Playing if self.media_status.player_is_paused: - return PlayerState.Paused + return PlaybackState.Paused if self.media_status.player_is_idle: - return PlayerState.Stopped - return PlayerState.Stopped + return PlaybackState.Stopped + return PlaybackState.Stopped @property def elapsed_time(self) -> int: @@ -96,39 +103,63 @@ class ChromecastPlayer: or self.media_status.player_is_idle ): return 0 - if self.media_status.player_is_playing: - return self._chromecast.media_controller.status.adjusted_current_time + # Add time since last update + return self.media_status.adjusted_current_time # Not playing, return last reported seek time return self.media_status.current_time @property - def available(self): + def elapsed_milliseconds(self) -> int: + """Return (realtime) elapsed time of current playing media in milliseconds.""" + if self.media_status is None or not ( + self.media_status.player_is_playing + or self.media_status.player_is_paused + or self.media_status.player_is_idle + ): + return 0 + if self.media_status.player_is_playing: + # Add time since last update + return int( + ( + self.media_status.current_time + + ( + datetime.utcnow().timestamp() + - self.media_status.last_updated.timestamp() + ) + ) + * 1000 + ) + # Not playing, return last reported seek time + return self.media_status.current_time * 1000 + + @property + def available(self) -> bool: """Return availablity state of this player.""" return self._available @property - def current_uri(self): + def current_uri(self) -> str: """Return current_uri of this player.""" return self.media_status.content_id if self.media_status else None @property - def volume_level(self): + def volume_level(self) -> int: """Return volume_level of this player.""" return int(self.cast_status.volume_level * 100 if self.cast_status else 0) @property - def muted(self): + def muted(self) -> bool: """Return mute state of this player.""" return self.cast_status.volume_muted if self.cast_status else False @property - def is_group_player(self): + def is_group_player(self) -> bool: """Return if this player is a group player.""" return self._cast_info.is_audio_group and not self._is_speaker_group @property - def group_childs(self): + def group_childs(self) -> List[str]: """Return group_childs.""" if ( self._cast_info.is_audio_group @@ -141,7 +172,7 @@ class ChromecastPlayer: return [] @property - def device_info(self): + def device_info(self) -> DeviceInfo: """Return deviceinfo.""" return DeviceInfo( model=self._cast_info.model_name, @@ -149,7 +180,17 @@ class ChromecastPlayer: manufacturer=self._cast_info.manufacturer, ) - async def set_cast_info(self, cast_info: ChromecastInfo): + @property + def features(self) -> List[PlayerFeature]: + """Return list of features this player supports.""" + return PLAYER_FEATURES + + @property + def config_entries(self) -> List[ConfigEntry]: + """Return player specific config entries (if any).""" + return PLAYER_CONFIG_ENTRIES + + def set_cast_info(self, cast_info: ChromecastInfo) -> None: """Set the cast information and set up the chromecast object.""" self._cast_info = cast_info if self._chromecast is not None: @@ -178,14 +219,11 @@ class ChromecastPlayer: self.cast_status = chromecast.status self.media_status = chromecast.media_controller.status mz_controller = MultizoneController(chromecast.uuid) - # mz.register_listener( - # MZListener(mz, self.__handle_group_members_update, self.mass.loop) - # ) chromecast.register_handler(mz_controller) chromecast.mz_controller = mz_controller - self._chromecast.start() + self.mass.add_job(self._chromecast.start) - def disconnect(self): + def disconnect(self) -> None: """Disconnect Chromecast object if it is set.""" if self._chromecast is None: return @@ -193,10 +231,10 @@ class ChromecastPlayer: "[%s] Disconnecting from chromecast socket", self._cast_info.friendly_name ) self._available = False - self._chromecast.disconnect() + self.mass.add_job(self._chromecast.disconnect) self._invalidate() - def _invalidate(self): + def _invalidate(self) -> None: """Invalidate some attributes.""" self._chromecast = None self.cast_status = None @@ -207,9 +245,13 @@ class ChromecastPlayer: self._status_listener.invalidate() self._status_listener = None + async def async_on_remove(self) -> None: + """Call when player is removed from the player manager.""" + self.disconnect() + # ========== Callbacks ========== - def new_cast_status(self, cast_status): + def new_cast_status(self, cast_status) -> None: """Handle updates of the cast status.""" self.cast_status = cast_status self._is_speaker_group = ( @@ -220,21 +262,21 @@ class ChromecastPlayer: self._chromecast.mz_controller.members[0], self.player_id ) ) - self.mass.add_job(self.mass.player_manager.async_update_player(self)) + self.update_state() - def new_media_status(self, media_status): + def new_media_status(self, media_status) -> None: """Handle updates of the media status.""" self.media_status = media_status - self.mass.add_job(self.mass.player_manager.async_update_player(self)) + self.update_state() if media_status.player_is_playing: self._powered = True - def new_connection_status(self, connection_status): + def new_connection_status(self, connection_status) -> None: """Handle updates of connection status.""" if connection_status.status == CONNECTION_STATUS_DISCONNECTED: self._available = False self._invalidate() - self.mass.add_job(self.mass.player_manager.async_update_player(self)) + self.update_state() return new_available = connection_status.status == CONNECTION_STATUS_CONNECTED @@ -248,85 +290,71 @@ class ChromecastPlayer: connection_status.status, ) self._available = new_available - self.mass.add_job(self.mass.player_manager.async_update_player(self)) + self.update_state() if self._cast_info.is_audio_group and new_available: self._chromecast.mz_controller.update_members() + async def async_on_update(self) -> None: + """Call when player is periodically polled by the player manager (should_poll=True).""" + if self.mass.player_manager.get_player(self.player_id).active_queue.startswith( + "group_player" + ): + self.mass.add_job(self._chromecast.media_controller.update_status) + self.update_state() + # ========== Service Calls ========== - def stop(self): + async def async_cmd_stop(self) -> None: """Send stop command to player.""" - if not self._chromecast.socket_client.is_connected: - LOGGER.warning("Ignore player command: Socket client is not connected.") - return - with SuppressChromeCastError(self.name): - self._chromecast.media_controller.stop() + if self._chromecast and self._chromecast.media_controller: + self.mass.add_job(self._chromecast.media_controller.stop) - def play(self): + async def async_cmd_play(self) -> None: """Send play command to player.""" - if not self._chromecast.socket_client.is_connected: - LOGGER.warning("Ignore player command: Socket client is not connected.") - return - with SuppressChromeCastError(self.name): - self._chromecast.media_controller.play() + if self._chromecast.media_controller: + self.mass.add_job(self._chromecast.media_controller.play) - def pause(self): + async def async_cmd_pause(self) -> None: """Send pause command to player.""" - if not self._chromecast.socket_client.is_connected: - LOGGER.warning("Ignore player command: Socket client is not connected.") - return - with SuppressChromeCastError(self.name): - self._chromecast.media_controller.pause() + if self._chromecast.media_controller: + self.mass.add_job(self._chromecast.media_controller.pause) - def next(self): + async def async_cmd_next(self) -> None: """Send next track command to player.""" - if not self._chromecast.socket_client.is_connected: - LOGGER.warning("Ignore player command: Socket client is not connected.") - return - with SuppressChromeCastError(self.name): - self._chromecast.media_controller.queue_next() + if self._chromecast.media_controller: + self.mass.add_job(self._chromecast.media_controller.queue_next) - def previous(self): + async def async_cmd_previous(self) -> None: """Send previous track command to player.""" - if not self._chromecast.socket_client.is_connected: - LOGGER.warning("Ignore player command: Socket client is not connected.") - return - with SuppressChromeCastError(self.name): - self._chromecast.media_controller.queue_prev() + if self._chromecast.media_controller: + self.mass.add_job(self._chromecast.media_controller.queue_prev) - def power_on(self): + async def async_cmd_power_on(self) -> None: """Send power ON command to player.""" - if not self._chromecast.socket_client.is_connected: - LOGGER.warning("Ignore player command: Socket client is not connected.") - return self._powered = True - with SuppressChromeCastError(self.name): - self._chromecast.set_volume_muted(False) + self.mass.add_job(self._chromecast.set_volume_muted, False) - def power_off(self): + async def async_cmd_power_off(self) -> None: """Send power OFF command to player.""" - with SuppressChromeCastError(self.name): - if self.media_status and ( - self.media_status.player_is_playing - or self.media_status.player_is_paused - or self.media_status.player_is_idle - ): - self._chromecast.media_controller.stop() - self._powered = False - # chromecast has no real poweroff so we send mute instead - self._chromecast.set_volume_muted(True) - - def volume_set(self, volume_level): + if self.media_status and ( + self.media_status.player_is_playing + or self.media_status.player_is_paused + or self.media_status.player_is_idle + ): + self.mass.add_job(self._chromecast.media_controller.stop) + self._powered = False + # chromecast has no real poweroff so we send mute instead + self.mass.add_job(self._chromecast.set_volume_muted, True) + + async def async_cmd_volume_set(self, volume_level: int) -> None: """Send new volume level command to player.""" - with SuppressChromeCastError(self.name): - self._chromecast.set_volume(volume_level) + self.mass.add_job(self._chromecast.set_volume, volume_level / 100) - def volume_mute(self, is_muted=False): + async def async_cmd_volume_mute(self, is_muted: bool = False) -> None: """Send mute command to player.""" - with SuppressChromeCastError(self.name): - self._chromecast.set_volume_muted(is_muted) + self.mass.add_job(self._chromecast.set_volume_muted, is_muted) - def play_uri(self, uri: str): + async def async_cmd_play_uri(self, uri: str) -> None: """Play single uri on player.""" player_queue = self.mass.player_manager.get_player_queue(self.player_id) if player_queue.use_queue_stream: @@ -334,11 +362,10 @@ class ChromecastPlayer: queue_item = QueueItem() queue_item.name = "Music Assistant" queue_item.uri = uri - return self.queue_load([queue_item, queue_item]) - with SuppressChromeCastError(self.name): - self._chromecast.play_media(uri, "audio/flac") + return await self.async_cmd_queue_load([queue_item, queue_item]) + self.mass.add_job(self._chromecast.play_media, uri, "audio/flac") - def queue_load(self, queue_items: List[QueueItem]): + async def async_cmd_queue_load(self, queue_items: List[QueueItem]) -> None: """Load (overwrite) queue with new items.""" player_queue = self.mass.player_manager.get_player_queue(self.player_id) cc_queue_items = self.__create_queue_items(queue_items[:50]) @@ -351,22 +378,22 @@ class ChromecastPlayer: "startIndex": 0, # Item index to play after this request or keep same item if undefined "items": cc_queue_items, # only load 50 tracks at once or the socket will crash } - self.__send_player_queue(queuedata) + self.mass.add_job(self.__send_player_queue, queuedata) if len(queue_items) > 50: - self.queue_append(queue_items[51:]) + await self.async_cmd_queue_append(queue_items[51:]) - def queue_append(self, queue_items: List[QueueItem]): + async def async_cmd_queue_append(self, queue_items: List[QueueItem]) -> None: """Append new items at the end of the queue.""" cc_queue_items = self.__create_queue_items(queue_items) - for chunk in chunks(cc_queue_items, 50): + for chunk in yield_chunks(cc_queue_items, 50): queuedata = { "type": "QUEUE_INSERT", "insertBefore": None, "items": chunk, } - self.__send_player_queue(queuedata) + self.mass.add_job(self.__send_player_queue, queuedata) - def __create_queue_items(self, tracks): + def __create_queue_items(self, tracks) -> None: """Create list of CC queue items from tracks.""" queue_items = [] for track in tracks: @@ -401,46 +428,21 @@ class ChromecastPlayer: }, } - def __send_player_queue(self, queuedata): + def __send_player_queue(self, queuedata: dict) -> None: """Send new data to the CC queue.""" - with SuppressChromeCastError(self.name): - media_controller = self._chromecast.media_controller - # pylint: disable=protected-access - receiver_ctrl = media_controller._socket_client.receiver_controller - - def send_queue(): - """Plays media after chromecast has switched to requested app.""" - queuedata["mediaSessionId"] = media_controller.status.media_session_id - media_controller.send_message(queuedata, inc_session_id=False) - - if not media_controller.status.media_session_id: - receiver_ctrl.launch_app( - media_controller.app_id, callback_function=send_queue - ) - else: - send_queue() - - -def chunks(_list, chunk_size): - """Yield successive n-sized chunks from list.""" - for i in range(0, len(_list), chunk_size): - yield _list[i : i + chunk_size] - - -class SuppressChromeCastError(suppress): - """Context manager to suppress Chromecast connection error.""" - - def __init__(self, player_id): - """Handle init of the contextmanager.""" - # pylint: disable=super-init-not-called - self.player_id = player_id - - def __exit__(self, exctype, excinst, exctb): - """Handle exit of the contextmanager.""" - if exctype is not None and issubclass(exctype, pychromecast.error.NotConnected): - LOGGER.warning( - "Chromecast client %s is not connected, ignoring command...", - self.player_id, + media_controller = self._chromecast.media_controller + # pylint: disable=protected-access + receiver_ctrl = media_controller._socket_client.receiver_controller + + def send_queue(): + """Plays media after chromecast has switched to requested app.""" + queuedata["mediaSessionId"] = media_controller.status.media_session_id + media_controller.send_message(queuedata, False) + + if not media_controller.status.media_session_id: + receiver_ctrl.launch_app( + media_controller.app_id, + callback_function=send_queue, ) - return True - return exctype is None + else: + send_queue() diff --git a/music_assistant/providers/demo/demo_playerprovider.py b/music_assistant/providers/demo/demo_playerprovider.py index 33fe60f8..6ed6b728 100644 --- a/music_assistant/providers/demo/demo_playerprovider.py +++ b/music_assistant/providers/demo/demo_playerprovider.py @@ -6,7 +6,7 @@ import subprocess from typing import List from music_assistant.models.config_entry import ConfigEntry -from music_assistant.models.player import DeviceInfo, Player, PlayerState +from music_assistant.models.player import DeviceInfo, PlaybackState, Player from music_assistant.models.playerprovider import PlayerProvider PROV_ID = "demo_player" @@ -17,12 +17,6 @@ LOGGER = logging.getLogger(PROV_ID) class DemoPlayerProvider(PlayerProvider): """Demo PlayerProvider which provides fake players.""" - def __init__(self, *args, **kwargs): - """Initialize.""" - self._players = {} - self._progress_tasks = {} - super().__init__(*args, **kwargs) - @property def id(self) -> str: """Return provider ID for this provider.""" @@ -41,192 +35,172 @@ class DemoPlayerProvider(PlayerProvider): async def async_on_start(self) -> bool: """Handle initialization of the provider based on config.""" # create fake/test regular player 1 - player = Player( - player_id="demo_player_1", - provider_id=PROV_ID, - name="Demo player 1", - device_info=DeviceInfo( - model="Demo/Test Player", - address="http://demo_player1:12345", - manufacturer=PROV_ID, - ), - ) - player.sox = None - self._players[player.player_id] = player + player = DemoPlayer("demo_player_1", "Demo player 1") self.mass.add_job(self.mass.player_manager.async_add_player(player)) - # create fake/test regular player 2 - player = Player( - player_id="demo_player_2", - provider_id=PROV_ID, - name="Demo player 2", - device_info=DeviceInfo( - model="Demo/Test Player", - address="http://demo_player2:12345", - manufacturer=PROV_ID, - ), - ) - player.sox = None - self._players[player.player_id] = player + player = DemoPlayer("demo_player_2", "Demo player 2") self.mass.add_job(self.mass.player_manager.async_add_player(player)) - # create fake/test group player - group_player = Player( - player_id="demo_group_player", - is_group_player=True, - group_childs=["demo_player_1", "demo_player_2"], - provider_id=PROV_ID, - name="Demo Group Player", - device_info=DeviceInfo( - model="Demo/Test Group player", - address="http://demo_group_player:12345", - manufacturer=PROV_ID, - ), - ) - group_player.sox = None - self._players[group_player.player_id] = group_player - self.mass.add_job(self.mass.player_manager.async_add_player(group_player)) - return True async def async_on_stop(self): """Handle correct close/cleanup of the provider on exit.""" - for player in self._players.values(): - if player.sox: - player.sox.terminate() - - # SERVICE CALLS / PLAYER COMMANDS + for player in self.players: + await player.async_cmd_stop() + + +class DemoPlayer(Player): + """Representation of a player for the demo provider.""" + + def __init__(self, player_id: str, name: str) -> None: + """Initialize a demo player.""" + self._player_id = player_id + self._name = name + self._powered = False + self._elapsed_time = 0 + self._state = PlaybackState.Stopped + self._current_uri = "" + self._volume_level = 100 + self._muted = False + self._sox = None + self._progress_task = None - async def async_cmd_play_uri(self, player_id: str, uri: str): - """ - Play the specified uri/url on the given player. - - :param player_id: player_id of the player to handle the command. - """ - player = self._players[player_id] - if player.sox: - await self.async_cmd_stop(player_id) - player.current_uri = uri - player.sox = subprocess.Popen(["play", "-q", uri]) - player.state = PlayerState.Playing - player.powered = True - self.mass.add_job(self.mass.player_manager.async_update_player(player)) + @property + def player_id(self) -> str: + """Return player id of this player.""" + return self._player_id - async def report_progress(): - """Report fake progress while sox is playing.""" - LOGGER.info("Playback started on player %s", player_id) - player.elapsed_time = 0 - while player.sox and not player.sox.poll(): - await asyncio.sleep(1) - player.elapsed_time += 1 - self.mass.add_job(self.mass.player_manager.async_update_player(player)) - LOGGER.info("Playback stopped on player %s", player_id) - player.elapsed_time = 0 - player.state = PlayerState.Stopped - self.mass.add_job(self.mass.player_manager.async_update_player(player)) - - if self._progress_tasks.get(player_id): - self._progress_tasks[player_id].cancel() - self._progress_tasks[player_id] = self.mass.add_job(report_progress) - - async def async_cmd_stop(self, player_id: str) -> None: - """ - Send STOP command to given player. + @property + def provider_id(self) -> str: + """Return provider id of this player.""" + return PROV_ID - :param player_id: player_id of the player to handle the command. - """ - player = self._players[player_id] - if player.sox: - player.sox.terminate() - player.sox = None - player.state = PlayerState.Stopped - self.mass.add_job(self.mass.player_manager.async_update_player(player)) - - async def async_cmd_play(self, player_id: str) -> None: - """ - Send PLAY command to given player. + @property + def name(self) -> str: + """Return name of the player.""" + return self._name - :param player_id: player_id of the player to handle the command. - """ - player = self._players[player_id] - if player.sox: - player.sox.send_signal(signal.SIGCONT) - player.state = PlayerState.Playing - self.mass.add_job(self.mass.player_manager.async_update_player(player)) + @property + def powered(self) -> bool: + """Return current power state of player.""" + return self._powered - async def async_cmd_pause(self, player_id: str): - """ - Send PAUSE command to given player. + @property + def elapsed_time(self) -> float: + """Return elapsed_time of current playing uri in (fractions of) seconds.""" + return self._elapsed_time - :param player_id: player_id of the player to handle the command. - """ - player = self._players[player_id] - if player.sox: - player.sox.send_signal(signal.SIGSTOP) - player.state = PlayerState.Paused - self.mass.add_job(self.mass.player_manager.async_update_player(player)) + @property + def state(self) -> PlaybackState: + """Return current PlaybackState of player.""" + return self._state - async def async_cmd_next(self, player_id: str): - """ - Send NEXT TRACK command to given player. + @property + def available(self) -> bool: + """Return current availablity of player.""" + return True - :param player_id: player_id of the player to handle the command. - """ - # this code should never be reached as the player doesn't report queue support - # throw NotImplementedError just in case we've missed a spot - raise NotImplementedError + @property + def current_uri(self) -> str: + """Return currently loaded uri of player (if any).""" + return self._current_uri - async def async_cmd_previous(self, player_id: str): - """ - Send PREVIOUS TRACK command to given player. + @property + def volume_level(self) -> int: + """Return current volume level of player (scale 0..100).""" + return self._volume_level - :param player_id: player_id of the player to handle the command. - """ - # this code should never be reached as the player doesn't report queue support - # throw NotImplementedError just in case we've missed a spot - raise NotImplementedError + @property + def muted(self) -> bool: + """Return current mute state of player.""" + return self._muted - async def async_cmd_power_on(self, player_id: str) -> None: - """ - Send POWER ON command to given player. + @property + def is_group_player(self) -> bool: + """Return True if this player is a group player.""" + return False - :param player_id: player_id of the player to handle the command. - """ - self._players[player_id].powered = True - self.mass.add_job( - self.mass.player_manager.async_update_player(self._players[player_id]) + @property + def device_info(self) -> DeviceInfo: + """Return the device info for this player.""" + return DeviceInfo( + model="Demo", address="http://demo:12345", manufacturer=PROV_NAME ) - async def async_cmd_power_off(self, player_id: str) -> None: - """ - Send POWER OFF command to given player. + # SERVICE CALLS / PLAYER COMMANDS - :param player_id: player_id of the player to handle the command. - """ - await self.async_cmd_stop(player_id) - self._players[player_id].powered = False - self.mass.add_job( - self.mass.player_manager.async_update_player(self._players[player_id]) - ) + async def async_cmd_play_uri(self, uri: str): + """Play the specified uri/url on the player.""" + if self._sox: + await self.async_cmd_stop() + self._current_uri = uri + self._sox = subprocess.Popen(["play", "-t", "flac", "-q", uri]) + self._state = PlaybackState.Playing + self._powered = True + self.update_state() - async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None: + async def report_progress(): + """Report fake progress while sox is playing.""" + LOGGER.info("Playback started on player %s", self.name) + self._elapsed_time = 0 + while self._sox and not self._sox.poll(): + await asyncio.sleep(1) + self._elapsed_time += 1 + self.update_state() + LOGGER.info("Playback stopped on player %s", self.name) + self._elapsed_time = 0 + self._state = PlaybackState.Stopped + self.update_state() + + if self._progress_task: + self._progress_task.cancel() + self._progress_task = self.mass.add_job(report_progress) + + async def async_cmd_stop(self) -> None: + """Send STOP command to player.""" + if self._sox: + self._sox.terminate() + self._sox = None + self._state = PlaybackState.Stopped + self.update_state() + + async def async_cmd_play(self) -> None: + """Send PLAY command to player.""" + if self._sox: + self._sox.send_signal(signal.SIGCONT) + self._state = PlaybackState.Playing + self.update_state() + + async def async_cmd_pause(self): + """Send PAUSE command to given player.""" + if self._sox: + self._sox.send_signal(signal.SIGSTOP) + self._state = PlaybackState.Paused + self.update_state() + + async def async_cmd_power_on(self) -> None: + """Send POWER ON command to player.""" + self._powered = True + self.update_state() + + async def async_cmd_power_off(self) -> None: + """Send POWER OFF command to player.""" + await self.async_cmd_stop() + self._powered = False + self.update_state() + + async def async_cmd_volume_set(self, volume_level: int) -> None: """ Send volume level command to given player. - :param player_id: player_id of the player to handle the command. :param volume_level: volume level to set (0..100). """ - self._players[player_id].volume_level = volume_level - self.mass.add_job( - self.mass.player_manager.async_update_player(self._players[player_id]) - ) + self._volume_level = volume_level + self.update_state() - async def async_cmd_volume_mute(self, player_id: str, is_muted=False): + async def async_cmd_volume_mute(self, is_muted=False): """ Send volume MUTE command to given player. - :param player_id: player_id of the player to handle the command. :param is_muted: bool with new mute state. """ - self._players[player_id].muted = is_muted - self.mass.add_job( - self.mass.player_manager.async_update_player(self._players[player_id]) - ) + self._muted = is_muted + self.update_state() diff --git a/music_assistant/providers/file/__init__.py b/music_assistant/providers/file/__init__.py index f32559b8..fbbc47e7 100644 --- a/music_assistant/providers/file/__init__.py +++ b/music_assistant/providers/file/__init__.py @@ -88,6 +88,8 @@ class FileProvider(MusicProvider): async def async_on_start(self) -> bool: """Handle initialization of the provider based on config.""" conf = self.mass.config.get_provider_config(self.id) + if not conf[CONF_MUSIC_DIR]: + return False if not os.path.isdir(conf[CONF_MUSIC_DIR]): raise FileNotFoundError(f"Directory {conf[CONF_MUSIC_DIR]} does not exist") self._music_dir = conf["music_dir"] diff --git a/music_assistant/providers/group_player/__init__.py b/music_assistant/providers/group_player/__init__.py new file mode 100644 index 00000000..2a922af8 --- /dev/null +++ b/music_assistant/providers/group_player/__init__.py @@ -0,0 +1,514 @@ +"""Group player provider: enables grouping of all playertypes.""" + +import asyncio +import logging +from typing import List + +from music_assistant.constants import CONF_GROUP_DELAY +from music_assistant.helpers.typing import MusicAssistantType +from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType +from music_assistant.models.player import DeviceInfo, PlaybackState, Player +from music_assistant.models.playerprovider import PlayerProvider + +PROV_ID = "group_player" +PROV_NAME = "Group player creator" +LOGGER = logging.getLogger(PROV_ID) + +CONF_PLAYER_COUNT = "group_player_count" +CONF_PLAYERS = "group_player_players" +CONF_MASTER = "group_player_master" + +CONFIG_ENTRIES = [ + ConfigEntry( + entry_key=CONF_PLAYER_COUNT, + entry_type=ConfigEntryType.INT, + description_key=CONF_PLAYER_COUNT, + default_value=1, + range=(0, 10), + ) +] + + +async def async_setup(mass): + """Perform async setup of this Plugin/Provider.""" + prov = GroupPlayerProvider() + await mass.async_register_provider(prov) + + +class GroupPlayerProvider(PlayerProvider): + """PlayerProvider which allows users to group players.""" + + @property + def id(self) -> str: + """Return provider ID for this provider.""" + return PROV_ID + + @property + def name(self) -> str: + """Return provider Name for this provider.""" + return PROV_NAME + + @property + def config_entries(self) -> List[ConfigEntry]: + """Return Config Entries for this provider.""" + return CONFIG_ENTRIES + + async def async_on_start(self) -> bool: + """Handle initialization of the provider based on config.""" + conf = self.mass.config.providers[PROV_ID] + for index in range(conf[CONF_PLAYER_COUNT]): + player = GroupPlayer(self.mass, index) + self.mass.add_job(self.mass.player_manager.async_add_player(player)) + return True + + async def async_on_stop(self): + """Handle correct close/cleanup of the provider on exit. Called on shutdown.""" + for player in self.players: + await player.async_cmd_stop() + + +class GroupPlayer(Player): + """Model for a group player.""" + + def __init__(self, mass: MusicAssistantType, player_index: int): + """Initialize.""" + self.mass = mass + self._player_index = player_index + self._player_id = f"group_player_{player_index}" + self._provider_id = PROV_ID + self._name = f"Group Player {player_index}" + self._powered = False + self._state = PlaybackState.Stopped + self._available = True + self._current_uri = "" + self._volume_level = 0 + self._muted = False + self.connected_clients = {} + self.stream_task = None + self.sync_task = None + + @property + def player_id(self) -> str: + """Return player id of this player.""" + return self._player_id + + @property + def provider_id(self) -> str: + """Return provider id of this player.""" + return self._provider_id + + @property + def name(self) -> str: + """Return name of the player.""" + return self._name + + @property + def powered(self) -> bool: + """Return current power state of player.""" + return self._powered + + @property + def state(self) -> PlaybackState: + """Return current PlaybackState of player.""" + return self._state + + @property + def available(self) -> bool: + """Return current availablity of player.""" + return True + + @property + def current_uri(self) -> str: + """Return currently loaded uri of player (if any).""" + return self._current_uri + + @property + def volume_level(self) -> int: + """Return current volume level of player (scale 0..100).""" + return self._volume_level + + @property + def muted(self) -> bool: + """Return current mute state of player.""" + return self._muted + + @property + def elapsed_time(self): + """Return elapsed timefor first child player.""" + if self.state in [PlaybackState.Playing, PlaybackState.Paused]: + for player_id in self.group_childs: + player = self.mass.player_manager.get_player(player_id) + if player: + return player.elapsed_time + return 0 + + @property + def should_poll(self): + """Return True if this player should be polled for state.""" + return self.state in [PlaybackState.Playing, PlaybackState.Paused] + + @property + def is_group_player(self) -> bool: + """Return True if this player is a group player.""" + return True + + @property + def group_childs(self): + """Return group childs of this group player.""" + player_conf = self.mass.config.get_player_config(self.player_id) + if player_conf and player_conf.get(CONF_PLAYERS): + return player_conf[CONF_PLAYERS] + return [] + + @property + def device_info(self) -> DeviceInfo: + """Return deviceinfo.""" + return DeviceInfo( + model="Group Player", + manufacturer=PROV_ID, + ) + + @property + def config_entries(self): + """Return config entries for this group player.""" + all_players = [ + {"text": item.name, "value": item.player_id} + for item in self.mass.player_manager.players + if item.player_id is not self._player_id + ] + selected_players = self.mass.config.get_player_config(self.player_id).get( + CONF_PLAYERS, [] + ) + default_master = "" + if selected_players: + default_master = selected_players[0] + return [ + ConfigEntry( + entry_key=CONF_PLAYERS, + entry_type=ConfigEntryType.STRING, + default_value=[], + values=all_players, + description_key=CONF_PLAYERS, + multi_value=True, + ), + ConfigEntry( + entry_key=CONF_MASTER, + entry_type=ConfigEntryType.STRING, + default_value=default_master, + values=selected_players, + description_key=CONF_MASTER, + multi_value=False, + depends_on=CONF_MASTER, + ), + ] + + # SERVICE CALLS / PLAYER COMMANDS + + async def async_cmd_play_uri(self, uri: str): + """Play the specified uri/url on the player.""" + await self.async_cmd_stop() + self._current_uri = uri + self._state = PlaybackState.Playing + self._powered = True + # forward this command to each child player + # TODO: Only start playing on powered players ? + # Monitor if a child turns on and join it to the sync ? + for child_player_id in self.group_childs: + child_player = self.mass.player_manager.get_player(child_player_id) + if child_player: + queue_stream_uri = f"{self.mass.web.internal_url}/stream/group/{self.player_id}?player_id={child_player_id}" + await child_player.async_cmd_play_uri(queue_stream_uri) + self.update_state() + self.stream_task = self.mass.add_job(self.async_queue_stream_task()) + + async def async_cmd_stop(self) -> None: + """Send STOP command to player.""" + self._state = PlaybackState.Stopped + if self.stream_task: + # cancel existing stream task if any + self.stream_task.cancel() + self.connected_clients = {} + await asyncio.sleep(0.5) + if self.sync_task: + self.sync_task.cancel() + # forward this command to each child player + # TODO: Only forward to powered child players + for child_player_id in self.group_childs: + child_player = self.mass.player_manager.get_player(child_player_id) + if child_player: + await child_player.async_cmd_stop() + self.update_state() + + async def async_cmd_play(self) -> None: + """Send PLAY command to player.""" + if not self.state == PlaybackState.Paused: + return + # forward this command to each child player + for child_player_id in self.group_childs: + child_player = self.mass.player_manager.get_player(child_player_id) + if child_player: + await child_player.async_cmd_play() + self._state = PlaybackState.Playing + self.update_state() + + async def async_cmd_pause(self): + """Send PAUSE command to player.""" + # forward this command to each child player + for child_player_id in self.group_childs: + child_player = self.mass.player_manager.get_player(child_player_id) + if child_player: + await child_player.async_cmd_pause() + self._state = PlaybackState.Paused + self.update_state() + + async def async_cmd_power_on(self) -> None: + """Send POWER ON command to player.""" + self._powered = True + self.update_state() + + async def async_cmd_power_off(self) -> None: + """Send POWER OFF command to player.""" + await self.async_cmd_stop() + self._powered = False + self.update_state() + + async def async_cmd_volume_set(self, volume_level: int) -> None: + """ + Send volume level command to player. + + :param volume_level: volume level to set (0..100). + """ + for child_player_id in self.group_childs: + child_player = self.mass.player_manager.get_player(child_player_id) + if child_player and child_player.powered: + await child_player.async_cmd_volume_set(volume_level) + + async def async_cmd_volume_mute(self, is_muted=False): + """ + Send volume MUTE command to given player. + + :param is_muted: bool with new mute state. + """ + for child_player_id in self.group_childs: + child_player = self.mass.player_manager.get_player(child_player_id) + if child_player and child_player.powered: + await child_player.async_cmd_volume_mute(is_muted) + self.muted = is_muted + + async def subscribe_stream_client(self, child_player_id): + """Handle streaming to all players of a group. Highly experimental.""" + + # each connected client gets its own sox process to convert incoming pcm samples + # to flac (which is streamed to the player). + args = [ + "sox", + "-t", + "s32", + "-c", + "2", + "-r", + "96000", + "-", + "-t", + "flac", + "-C", + "0", + "-", + ] + sox_proc = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE, + ) + chunk_size = 2880000 # roughly 5 seconds of flac @ 96000/32 + try: + # report this client as connected + self.connected_clients[child_player_id] = sox_proc.stdin + LOGGER.debug( + "[%s] child player connected: %s", + self.player_id, + child_player_id, + ) + # yield flac chunks from stdout to the http streamresponse + while True: + try: + chunk = await sox_proc.stdout.readexactly(chunk_size) + yield chunk + except asyncio.IncompleteReadError as exc: + chunk = exc.partial + yield chunk + break + except (GeneratorExit, Exception): # pylint: disable=broad-except + LOGGER.warning( + "[%s] child player aborted stream: %s", self.player_id, child_player_id + ) + self.connected_clients.pop(child_player_id, None) + sox_proc.terminate() + await sox_proc.communicate() + await sox_proc.wait() + else: + self.connected_clients.pop(child_player_id, None) + LOGGER.debug( + "[%s] child player completed streaming: %s", + self.player_id, + child_player_id, + ) + + async def async_queue_stream_task(self): + """Handle streaming queue to connected child players.""" + ticks = 0 + while ticks < 60 and len(self.connected_clients) != len(self.group_childs): + # TODO: Support situation where not alle clients of the group are powered + await asyncio.sleep(0.1) + ticks += 1 + if not self.connected_clients: + LOGGER.warning("no clients!") + return + LOGGER.debug( + "start queue stream with %s connected clients", len(self.connected_clients) + ) + self.sync_task = asyncio.create_task(self.__synchronize_players()) + + received_milliseconds = 0 + received_seconds = 0 + async for audio_chunk in self.mass.stream_manager.async_queue_stream_pcm( + self.player_id, sample_rate=96000, bit_depth=32 + ): + received_seconds += 1 + received_milliseconds += 1000 + chunk_size = len(audio_chunk) + start_bytes = 0 + + # make sure we still have clients connected + if not self.connected_clients: + LOGGER.warning("no more clients!") + return + + # send the audio chunk to all connected players + for child_player_id, writer in self.connected_clients.items(): + + # work out startdelay + if received_seconds == 1: + player_delay = self.mass.config.player_settings[ + child_player_id + ].get(CONF_GROUP_DELAY, 0) + if player_delay: + start_bytes = int( + ((player_delay - received_milliseconds) / 1000) * chunk_size + ) + else: + start_bytes = 0 + + # send the data to the client + try: + writer.write(audio_chunk[start_bytes:]) + await writer.drain() + except (BrokenPipeError, ConnectionResetError, AssertionError): + pass # happens at client disconnect + + if not self.connected_clients: + LOGGER.warning("no more clients!") + return + + async def __synchronize_players(self): + """Handle drifting/lagging by monitoring progress and compare to master player.""" + + master_player_id = self.mass.config.player_settings[self.player_id].get( + CONF_MASTER + ) + if not master_player_id: + LOGGER.warning("Synchronization of playback aborted: no master player.") + return + else: + LOGGER.debug( + "Synchronize playback of group using master player %s", master_player_id + ) + master_player = self.mass.player_manager.get_player(master_player_id) + + # wait until master is playing + while master_player.state != PlaybackState.Playing: + await asyncio.sleep(0.1) + await asyncio.sleep(0.5) + + prev_lags = {} + prev_drifts = {} + + while self.connected_clients: + + # check every 2 seconds for player sync + await asyncio.sleep(0.5) + + for child_player_id in self.connected_clients: + + if child_player_id == master_player_id: + continue + child_player = self.mass.player_manager.get_player(child_player_id) + + if ( + not child_player + or child_player.state != PlaybackState.Playing + or child_player.elapsed_milliseconds is None + ): + continue + + if child_player_id not in prev_lags: + prev_lags[child_player_id] = [] + if child_player_id not in prev_drifts: + prev_drifts[child_player_id] = [] + + # calculate lag (player is too slow in relation to the master) + lag = ( + master_player.elapsed_milliseconds + - child_player.elapsed_milliseconds + ) + prev_lags[child_player_id].append(lag) + if len(prev_lags[child_player_id]) == 10: + # if we have 10 samples calclate the average lag + avg_lag = sum(prev_lags[child_player_id]) / len( + prev_lags[child_player_id] + ) + prev_lags[child_player_id] = [] + if avg_lag > 50: + LOGGER.debug( + "child player %s is lagging behind with %s milliseconds", + child_player_id, + avg_lag, + ) + # we correct the lag by pausing the master player for a very short time + await master_player.async_cmd_pause() + # sending the command takes some time, account for that too + if avg_lag > 20: + sleep_time = avg_lag - 20 + await asyncio.sleep(sleep_time / 1000) + asyncio.create_task(master_player.async_cmd_play()) + break # no more processing this round if we've just corrected a lag + + # calculate drift (player is going faster in relation to the master) + drift = ( + child_player.elapsed_milliseconds + - master_player.elapsed_milliseconds + ) + prev_drifts[child_player_id].append(drift) + if len(prev_drifts[child_player_id]) == 10: + # if we have 10 samples calculate the average drift + avg_drift = sum(prev_drifts[child_player_id]) / len( + prev_drifts[child_player_id] + ) + prev_drifts[child_player_id] = [] + + if avg_drift > 50: + LOGGER.debug( + "child player %s is drifting ahead with %s milliseconds", + child_player_id, + avg_drift, + ) + # we correct the drift by pausing the player for a very short time + # this is not the best approach but works with all playertypes + # temporary solution until I find something better like sending more/less pcm chunks + await child_player.async_cmd_pause() + # sending the command takes some time, account for that too + if avg_drift > 20: + sleep_time = drift - 20 + await asyncio.sleep(sleep_time / 1000) + await child_player.async_cmd_play() + break # no more processing this round if we've just corrected a lag diff --git a/music_assistant/providers/sonos/sonos.py b/music_assistant/providers/sonos/sonos.py index bd771639..f48b7aba 100644 --- a/music_assistant/providers/sonos/sonos.py +++ b/music_assistant/providers/sonos/sonos.py @@ -7,7 +7,12 @@ from typing import List import soco from music_assistant.models.config_entry import ConfigEntry -from music_assistant.models.player import DeviceInfo, Player, PlayerFeature, PlayerState +from music_assistant.models.player import ( + DeviceInfo, + PlaybackState, + Player, + PlayerFeature, +) from music_assistant.models.player_queue import QueueItem from music_assistant.models.playerprovider import PlayerProvider from music_assistant.utils import run_periodic @@ -353,7 +358,7 @@ class SonosProvider(PlayerProvider): ) rel_time = __timespan_secs(position_info.get("RelTime")) player.elapsed_time = rel_time - if player.state == PlayerState.Playing: + if player.state == PlaybackState.Playing: self.mass.add_job(self.__async_report_progress(player_id)) self.mass.add_job(self.mass.player_manager.async_update_player(player)) @@ -389,7 +394,7 @@ class SonosProvider(PlayerProvider): # so we need to send it in periodically player = self._players[player_id] player.should_poll = True - while player and player.state == PlayerState.Playing: + while player and player.state == PlaybackState.Playing: time_diff = time.time() - player.media_position_updated_at adjusted_current_time = player.elapsed_time + time_diff player.elapsed_time = adjusted_current_time @@ -398,15 +403,15 @@ class SonosProvider(PlayerProvider): self._report_progress_tasks.pop(player_id, None) -def __convert_state(sonos_state: str) -> PlayerState: - """Convert Sonos state to PlayerState.""" +def __convert_state(sonos_state: str) -> PlaybackState: + """Convert Sonos state to PlaybackState.""" if sonos_state == "PLAYING": - return PlayerState.Playing + return PlaybackState.Playing if sonos_state == "TRANSITIONING": - return PlayerState.Playing + return PlaybackState.Playing if sonos_state == "PAUSED_PLAYBACK": - return PlayerState.Paused - return PlayerState.Stopped + return PlaybackState.Paused + return PlaybackState.Stopped def __timespan_secs(timespan): diff --git a/music_assistant/providers/spotify/__init__.py b/music_assistant/providers/spotify/__init__.py index d39926bc..f67fc8e3 100644 --- a/music_assistant/providers/spotify/__init__.py +++ b/music_assistant/providers/spotify/__init__.py @@ -3,7 +3,6 @@ import asyncio import logging import os import platform -import subprocess import time from typing import List, Optional @@ -470,8 +469,7 @@ class SpotifyProvider(MusicProvider): if not self._username or not self._password: return tokeninfo # retrieve token with spotty - task = self.mass.add_job(self.__get_token) - tokeninfo = await task + tokeninfo = await self.__async_get_token() if tokeninfo: self.__auth_token = tokeninfo self.sp_user = await self.__async_get_data("me") @@ -481,7 +479,7 @@ class SpotifyProvider(MusicProvider): raise Exception("Can't get Spotify token for user %s" % self._username) return tokeninfo - def __get_token(self): + async def __async_get_token(self): """Get spotify auth token with spotty bin.""" # get token with spotty scopes = [ @@ -519,11 +517,15 @@ class SpotifyProvider(MusicProvider): self.mass.config.data_path, "--disable-discovery", ] - spotty = subprocess.Popen( - args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT + spotty = await asyncio.create_subprocess_exec( + *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) - stdout, _ = spotty.communicate() - result = json.loads(stdout) + stdout, _ = await spotty.communicate() + try: + result = json.loads(stdout) + except json.decoder.JSONDecodeError: + LOGGER.warning("Error while retrieving Spotify token!") + result = None # transform token info to spotipy compatible format if result and "accessToken" in result: tokeninfo = result diff --git a/music_assistant/providers/squeezebox/__init__.py b/music_assistant/providers/squeezebox/__init__.py index a9336f4a..d0302d0d 100644 --- a/music_assistant/providers/squeezebox/__init__.py +++ b/music_assistant/providers/squeezebox/__init__.py @@ -4,15 +4,12 @@ import asyncio import logging from typing import List -from music_assistant.constants import CONF_CROSSFADE_DURATION from music_assistant.models.config_entry import ConfigEntry -from music_assistant.models.player import DeviceInfo, PlayerFeature -from music_assistant.models.player_queue import QueueItem from music_assistant.models.playerprovider import PlayerProvider from .constants import PROV_ID, PROV_NAME from .discovery import DiscoveryProtocol -from .socket_client import Event, SqueezeSocketClient +from .socket_client import SqueezeSocketClient CONF_LAST_POWER = "last_power" CONF_LAST_VOLUME = "last_volume" @@ -20,8 +17,6 @@ CONF_LAST_VOLUME = "last_volume" LOGGER = logging.getLogger(PROV_ID) CONFIG_ENTRIES = [] # we don't have any provider config entries (for now) -PLAYER_FEATURES = [PlayerFeature.QUEUE, PlayerFeature.CROSSFADE, PlayerFeature.GAPLESS] -PLAYER_CONFIG_ENTRIES = [] # we don't have any player config entries (for now) async def async_setup(mass): @@ -33,7 +28,6 @@ async def async_setup(mass): class PySqueezeProvider(PlayerProvider): """Python implementation of SlimProto server.""" - _socket_clients = {} _tasks = [] @property @@ -66,222 +60,6 @@ class PySqueezeProvider(PlayerProvider): """Handle correct close/cleanup of the provider on exit.""" for task in self._tasks: task.cancel() - for client in self._socket_clients.values(): - client.close() - - async def async_cmd_play_uri(self, player_id: str, uri: str): - """ - Play the specified uri/url on the given player. - - :param player_id: player_id of the player to handle the command. - """ - socket_client = self._socket_clients.get(player_id) - if socket_client: - crossfade = self.mass.config.player_settings[player_id][ - CONF_CROSSFADE_DURATION - ] - await socket_client.async_cmd_play_uri( - uri, send_flush=True, crossfade_duration=crossfade - ) - else: - LOGGER.warning("Received command for unavailable player: %s", player_id) - - async def async_cmd_stop(self, player_id: str) -> None: - """ - Send STOP command to given player. - - :param player_id: player_id of the player to handle the command. - """ - socket_client = self._socket_clients.get(player_id) - if socket_client: - await socket_client.async_cmd_stop() - else: - LOGGER.warning("Received command for unavailable player: %s", player_id) - - async def async_cmd_play(self, player_id: str) -> None: - """ - Send PLAY command to given player. - - :param player_id: player_id of the player to handle the command. - """ - socket_client = self._socket_clients.get(player_id) - if socket_client: - await socket_client.async_cmd_play() - else: - LOGGER.warning("Received command for unavailable player: %s", player_id) - - async def async_cmd_pause(self, player_id: str): - """ - Send PAUSE command to given player. - - :param player_id: player_id of the player to handle the command. - """ - socket_client = self._socket_clients.get(player_id) - if socket_client: - await socket_client.async_cmd_pause() - else: - LOGGER.warning("Received command for unavailable player: %s", player_id) - - async def async_cmd_next(self, player_id: str): - """ - Send NEXT TRACK command to given player. - - :param player_id: player_id of the player to handle the command. - """ - queue = self.mass.player_manager.get_player_queue(player_id) - if queue: - new_track = queue.get_item(queue.cur_index + 1) - if new_track: - await self.async_cmd_play_uri(player_id, new_track.uri) - - async def async_cmd_previous(self, player_id: str): - """ - Send PREVIOUS TRACK command to given player. - - :param player_id: player_id of the player to handle the command. - """ - queue = self.mass.player_manager.get_player_queue(player_id) - if queue: - new_track = queue.get_item(queue.cur_index - 1) - if new_track: - await self.async_cmd_play_uri(player_id, new_track.uri) - - async def async_cmd_power_on(self, player_id: str) -> None: - """ - Send POWER ON command to given player. - - :param player_id: player_id of the player to handle the command. - """ - socket_client = self._socket_clients.get(player_id) - if socket_client: - await socket_client.async_cmd_power(True) - # save power and volume state in cache - cache_str = f"squeezebox_player_state_{player_id}" - await self.mass.cache.async_set( - cache_str, (True, socket_client.volume_level) - ) - else: - LOGGER.warning("Received command for unavailable player: %s", player_id) - - async def async_cmd_power_off(self, player_id: str) -> None: - """ - Send POWER OFF command to given player. - - :param player_id: player_id of the player to handle the command. - """ - socket_client = self._socket_clients.get(player_id) - if socket_client: - await socket_client.async_cmd_power(False) - # store last power state as we need it when the player (re)connects - # save power and volume state in cache - cache_str = f"squeezebox_player_state_{player_id}" - await self.mass.cache.async_set( - cache_str, (False, socket_client.volume_level) - ) - else: - LOGGER.warning("Received command for unavailable player: %s", player_id) - - async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """ - Send volume level command to given player. - - :param player_id: player_id of the player to handle the command. - :param volume_level: volume level to set (0..100). - """ - socket_client = self._socket_clients.get(player_id) - if socket_client: - await socket_client.async_cmd_volume_set(volume_level) - # save power and volume state in cache - cache_str = f"squeezebox_player_state_{player_id}" - await self.mass.cache.async_set( - cache_str, (socket_client.powered, volume_level) - ) - else: - LOGGER.warning("Received command for unavailable player: %s", player_id) - - async def async_cmd_volume_mute(self, player_id: str, is_muted=False): - """ - Send volume MUTE command to given player. - - :param player_id: player_id of the player to handle the command. - :param is_muted: bool with new mute state. - """ - socket_client = self._socket_clients.get(player_id) - if socket_client: - await socket_client.async_cmd_mute(is_muted) - else: - LOGGER.warning("Received command for unavailable player: %s", player_id) - - async def async_cmd_queue_play_index(self, player_id: str, index: int): - """ - Play item at index X on player's queue. - - :param player_id: player_id of the player to handle the command. - :param index: (int) index of the queue item that should start playing - """ - queue = self.mass.player_manager.get_player_queue(player_id) - if queue: - new_track = queue.get_item(index) - if new_track: - await self.async_cmd_play_uri(player_id, new_track.uri) - - async def async_cmd_queue_load(self, player_id: str, queue_items: List[QueueItem]): - """ - Load/overwrite given items in the player's queue implementation. - - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - """ - if queue_items: - await self.async_cmd_play_uri(player_id, queue_items[0].uri) - - async def async_cmd_queue_insert( - self, player_id: str, queue_items: List[QueueItem], insert_at_index: int - ): - """ - Insert new items at position X into existing queue. - - If insert_at_index 0 or None, will start playing newly added item(s) - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - :param insert_at_index: queue position to insert new items - """ - # queue handled by built-in queue controller - # we only check the start index - queue = self.mass.player_manager.get_player_queue(player_id) - if queue and insert_at_index == queue.cur_index: - return await self.async_cmd_queue_play_index(player_id, insert_at_index) - - async def async_cmd_queue_append( - self, player_id: str, queue_items: List[QueueItem] - ): - """ - Append new items at the end of the queue. - - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - """ - # automagically handled by built-in queue controller - - async def async_cmd_queue_update( - self, player_id: str, queue_items: List[QueueItem] - ): - """ - Overwrite the existing items in the queue, used for reordering. - - :param player_id: player_id of the player to handle the command. - :param queue_items: a list of QueueItems - """ - # automagically handled by built-in queue controller - - async def async_cmd_queue_clear(self, player_id: str): - """ - Clear the player's queue. - - :param player_id: player_id of the player to handle the command. - """ - # queue is handled by built-in queue controller but send stop - return await self.async_cmd_stop(player_id) async def async_start_discovery(self): """Start discovery for players.""" @@ -299,46 +77,5 @@ class PySqueezeProvider(PlayerProvider): """Handle a client connection on the socket.""" addr = writer.get_extra_info("peername") LOGGER.debug("New socket client connected: %s", addr) - SqueezeSocketClient(reader, writer, self.__client_event) - - async def __client_event(self, event: str, socket_client: SqueezeSocketClient): - """Received event from a socket client.""" - if event == Event.EVENT_CONNECTED: - # set some attributes to make the socket client compatible with mass player format - socket_client.should_poll = False - socket_client.provider_id = PROV_ID - socket_client.available = True - socket_client.is_group_player = False - socket_client.group_childs = [] - socket_client.device_info = DeviceInfo( - model=socket_client.device_type, address=socket_client.device_address - ) - socket_client.features = PLAYER_FEATURES - socket_client.config_entries = PLAYER_CONFIG_ENTRIES - # restore power/volume states - cache_str = f"squeezebox_player_state_{socket_client.player_id}" - cache_data = await self.mass.cache.async_get(cache_str) - last_power, last_volume = cache_data if cache_data else (False, 40) - await socket_client.async_cmd_volume_set(last_volume) - await socket_client.async_cmd_power(last_power) - await self.mass.player_manager.async_add_player(socket_client) - self._socket_clients[socket_client.player_id] = socket_client - elif event == Event.EVENT_UPDATED: - await self.mass.player_manager.async_update_player(socket_client) - elif event == Event.EVENT_DISCONNECTED: - await self.mass.player_manager.async_remove_player(socket_client.player_id) - self._socket_clients.pop(socket_client.player_id, None) - del socket_client - elif event == Event.EVENT_DECODER_READY: - # player is ready for the next track (if any) - player_id = socket_client.player_id - queue = self.mass.player_manager.get_player_queue(socket_client.player_id) - if queue: - next_item = queue.next_item - if next_item: - crossfade = self.mass.config.player_settings[player_id][ - CONF_CROSSFADE_DURATION - ] - await self._socket_clients[player_id].async_cmd_play_uri( - next_item.uri, send_flush=False, crossfade_duration=crossfade - ) + socket_client = SqueezeSocketClient(reader, writer) + socket_client.mass = self.mass diff --git a/music_assistant/providers/squeezebox/socket_client.py b/music_assistant/providers/squeezebox/socket_client.py index c0053044..dd9b9e6a 100644 --- a/music_assistant/providers/squeezebox/socket_client.py +++ b/music_assistant/providers/squeezebox/socket_client.py @@ -5,16 +5,26 @@ import logging import re import struct import time -from enum import Enum -from typing import Awaitable - -from music_assistant.models.player import PlayerState +from typing import List + +from music_assistant.constants import CONF_CROSSFADE_DURATION +from music_assistant.models.config_entry import ConfigEntry +from music_assistant.models.player import ( + DeviceInfo, + PlaybackState, + Player, + PlayerFeature, +) +from music_assistant.models.player_queue import QueueItem from music_assistant.utils import callback, run_periodic from .constants import PROV_ID LOGGER = logging.getLogger(PROV_ID) +PLAYER_FEATURES = [PlayerFeature.QUEUE, PlayerFeature.CROSSFADE, PlayerFeature.GAPLESS] +PLAYER_CONFIG_ENTRIES = [] # we don't have any player config entries (for now) + # from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO DEVICE_TYPE = { 2: "squeezebox", @@ -31,58 +41,52 @@ DEVICE_TYPE = { } -class Event(Enum): - """Enum with the various events the socket client fires.""" - - EVENT_CONNECTED = "connected" - EVENT_DISCONNECTED = "disconnected" - EVENT_UPDATED = "updated" - EVENT_DECODER_READY = "decoder_ready" - - -class SqueezeSocketClient: +class SqueezeSocketClient(Player): """Squeezebox socket client.""" - def __init__( - self, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter, - event_callback: Awaitable = None, - ): + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): """Initialize the socket client.""" self._reader = reader self._writer = writer - self._event_callback = event_callback self._player_id = "" self._device_type = "" self._device_name = "" self._last_volume = 0 self._last_heartbeat = 0 - self._cur_time_milliseconds = 0 self._volume_control = PySqueezeVolume() self._volume_level = 0 self._powered = False self._muted = False - self._state = PlayerState.Stopped - self._elapsed_time = 0 + self._state = PlaybackState.Stopped + self._elapsed_seconds = 0 + self._elapsed_milliseconds = 0 self._current_uri = "" self._tasks = [ asyncio.create_task(self.__async_socket_reader()), asyncio.create_task(self.__async_send_heartbeat()), ] - def close(self): - """Cleanup when the socket client needs to closed.""" + async def async_close(self): + """Cleanup when the socket client needs to close.""" for task in self._tasks: if not task.cancelled(): task.cancel() - asyncio.create_task(self._event_callback(Event.EVENT_DISCONNECTED, self)) + await self.mass.player_manager.async_remove_player(self.player_id) + + async def async_on_remove(self) -> None: + """Call when player is removed from the player manager.""" + await self.async_close() @property def player_id(self) -> str: """Return player id (=mac address) of the player.""" return self._player_id + @property + def provider_id(self) -> str: + """Return provider id of this player.""" + return PROV_ID + @property def device_type(self) -> str: """Return device type of the player.""" @@ -123,14 +127,36 @@ class SqueezeSocketClient: @property def elapsed_time(self): - """Return elapsed_time of current playing track.""" - return self._elapsed_time + """Return elapsed_time of current playing track in (fractions of) seconds.""" + return self._elapsed_seconds + + @property + def elapsed_milliseconds(self) -> int: + """Return (realtime) elapsed time of current playing media in milliseconds.""" + return self._elapsed_milliseconds + int( + (time.time() * 1000) - (self._last_heartbeat * 1000) + ) @property def current_uri(self): """Return uri of currently loaded track.""" return self._current_uri + @property + def features(self) -> List[PlayerFeature]: + """Return list of features this player supports.""" + return PLAYER_FEATURES + + @property + def config_entries(self) -> List[ConfigEntry]: + """Return player specific config entries (if any).""" + return PLAYER_CONFIG_ENTRIES + + @property + def device_info(self) -> DeviceInfo: + """Return the device info for this player.""" + return DeviceInfo(model=self.device_type, address=self.device_address) + async def __async_initialize_player(self): """Set some startup settings for the player.""" # send version @@ -153,13 +179,25 @@ class SqueezeSocketClient: data = self.__pack_stream(b"p", autostart=b"0", flags=0) await self.__async_send_frame(b"strm", data) + async def async_cmd_power_on(self) -> None: + """Send POWER ON command to player.""" + return await self.async_cmd_power(True) + + async def async_cmd_power_off(self) -> None: + """Send POWER OFF command to player.""" + await self.async_cmd_stop() + return await self.async_cmd_power(False) + async def async_cmd_power(self, powered: bool = True): """Send power command to player.""" # power is not supported so abuse mute instead power_int = 1 if powered else 0 await self.__async_send_frame(b"aude", struct.pack("2B", power_int, 1)) self._powered = powered - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self.update_state() + # save power and volume state in cache + cache_str = f"squeezebox_player_state_{self.player_id}" + await self.mass.cache.async_set(cache_str, (True, self.volume_level)) async def async_cmd_volume_set(self, volume_level: int): """Send new volume level command to player.""" @@ -178,7 +216,16 @@ class SqueezeSocketClient: await self.__async_send_frame(b"aude", struct.pack("2B", muted_int, 0)) self.muted = muted - async def async_cmd_play_uri( + async def async_cmd_play_uri(self, uri: str): + """Request player to start playing a single uri.""" + crossfade = self.mass.config.player_settings[self.player_id][ + CONF_CROSSFADE_DURATION + ] + await self.__async_cmd_handle_play_uri( + uri, send_flush=True, crossfade_duration=crossfade + ) + + async def __async_cmd_handle_play_uri( self, uri: str, send_flush: bool = True, crossfade_duration: int = 0 ): """Request player to start playing a single uri.""" @@ -212,10 +259,84 @@ class SqueezeSocketClient: elif not port: port = 80 headers = f"Connection: close\r\nAccept: */*\r\nHost: {host}:{port}\r\n" - request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers) + request = "GET %s HTTP/1.1\r\n%s\r\n" % (uri, headers) data = data + request.encode("utf-8") await self.__async_send_frame(b"strm", data) + async def async_cmd_next(self): + """Send NEXT TRACK command to player.""" + queue = self.mass.player_manager.get_player_queue(self.player_id) + if queue: + new_track = queue.get_item(queue.cur_index + 1) + if new_track: + await self.__async_cmd_handle_play_uri(new_track.uri) + + async def async_cmd_previous(self): + """Send PREVIOUS TRACK command to player.""" + queue = self.mass.player_manager.get_player_queue(self.player_id) + if queue: + new_track = queue.get_item(queue.cur_index - 1) + if new_track: + await self.async_cmd_play_uri(new_track.uri) + + async def async_cmd_queue_play_index(self, index: int): + """ + Play item at index X on player's queue. + + :param index: (int) index of the queue item that should start playing + """ + queue = self.mass.player_manager.get_player_queue(self.player_id) + if queue: + new_track = queue.get_item(index) + if new_track: + await self.async_cmd_play_uri(new_track.uri) + + async def async_cmd_queue_load(self, queue_items: List[QueueItem]): + """ + Load/overwrite given items in the player's queue implementation. + + :param queue_items: a list of QueueItems + """ + if queue_items: + await self.async_cmd_play_uri(queue_items[0].uri) + + async def async_cmd_queue_insert( + self, queue_items: List[QueueItem], insert_at_index: int + ): + """ + Insert new items at position X into existing queue. + + If insert_at_index 0 or None, will start playing newly added item(s) + :param queue_items: a list of QueueItems + :param insert_at_index: queue position to insert new items + """ + # queue handled by built-in queue controller + # we only check the start index + queue = self.mass.player_manager.get_player_queue(self.player_id) + if queue and insert_at_index == queue.cur_index: + return await self.async_cmd_queue_play_index(insert_at_index) + + async def async_cmd_queue_append(self, queue_items: List[QueueItem]): + """ + Append new items at the end of the queue. + + :param queue_items: a list of QueueItems + """ + # automagically handled by built-in queue controller + + async def async_cmd_queue_update(self, queue_items: List[QueueItem]): + """ + Overwrite the existing items in the queue, used for reordering. + + :param queue_items: a list of QueueItems + """ + # automagically handled by built-in queue controller + + async def async_cmd_queue_clear(self): + """Clear the player's queue.""" + # queue is handled by built-in queue controller but send stop + return await self.async_cmd_stop() + @run_periodic(5) async def __async_send_heartbeat(self): """Send periodic heartbeat message to player.""" @@ -227,10 +348,13 @@ class SqueezeSocketClient: """Send command to Squeeze player.""" if self._reader.at_eof() or self._writer.is_closing(): LOGGER.debug("Socket is disconnected.") - return self.close() + return await self.async_close() packet = struct.pack("!H", len(data) + 4) + command + data - self._writer.write(packet) - await self._writer.drain() + try: + self._writer.write(packet) + await self._writer.drain() + except ConnectionResetError: + pass async def __async_socket_reader(self): """Handle incoming data from socket.""" @@ -255,7 +379,7 @@ class SqueezeSocketClient: handler(packet) # EOF reached: socket is disconnected LOGGER.info("Socket disconnected: %s", self._writer.get_extra_info("peername")) - self.close() + await self.async_close() @callback @staticmethod @@ -304,7 +428,17 @@ class SqueezeSocketClient: self._device_type = DEVICE_TYPE.get(dev_id, "unknown device") LOGGER.info("Player connected: %s", self.name) asyncio.create_task(self.__async_initialize_player()) - asyncio.create_task(self._event_callback(Event.EVENT_CONNECTED, self)) + # add player to player manager + asyncio.create_task(self.mass.player_manager.async_add_player(self)) + asyncio.create_task(self.async_restore_states()) + + async def async_restore_states(self): + """Restore power/volume states.""" + cache_str = f"squeezebox_player_state_{self.player_id}" + cache_data = await self.mass.cache.async_get(cache_str) + last_power, last_volume = cache_data if cache_data else (False, 40) + await self.async_cmd_volume_set(last_volume) + await self.async_cmd_power(last_power) @callback def _process_stat(self, data): @@ -327,7 +461,7 @@ class SqueezeSocketClient: powered = spdif_enable or dac_enable self._powered = powered self._muted = not powered - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self.update_state() @callback def _process_stat_audg(self, data): @@ -335,22 +469,35 @@ class SqueezeSocketClient: # TODO: process volume level LOGGER.debug("AUDg received - Volume level: %s", data) self._volume_level = self._volume_control.volume - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self.update_state() @callback def _process_stat_stmd(self, data): """Process incoming stat STMd message (decoder ready).""" # pylint: disable=unused-argument LOGGER.debug("STMu received - Decoder Ready for next track.") - asyncio.create_task(self._event_callback(Event.EVENT_DECODER_READY, self)) + queue = self.mass.player_manager.get_player_queue(self.player_id) + if queue: + next_item = queue.next_item + if next_item: + crossfade = self.mass.config.player_settings[self.player_id][ + CONF_CROSSFADE_DURATION + ] + asyncio.create_task( + self.__async_cmd_handle_play_uri( + next_item.uri, send_flush=False, crossfade_duration=crossfade + ) + ) @callback def _process_stat_stmf(self, data): """Process incoming stat STMf message (connection closed).""" # pylint: disable=unused-argument LOGGER.debug("STMf received - connection closed.") - self._state = PlayerState.Stopped - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self._state = PlaybackState.Stopped + self._elapsed_milliseconds = 0 + self._elapsed_seconds = 0 + self.update_state() @callback @classmethod @@ -369,31 +516,30 @@ class SqueezeSocketClient: """Process incoming stat STMp message: Pause confirmed.""" # pylint: disable=unused-argument LOGGER.debug("STMp received - pause confirmed.") - self._state = PlayerState.Paused - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self._state = PlaybackState.Paused + self.update_state() @callback def _process_stat_stmr(self, data): """Process incoming stat STMr message: Resume confirmed.""" # pylint: disable=unused-argument LOGGER.debug("STMr received - resume confirmed.") - self._state = PlayerState.Playing - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self._state = PlaybackState.Playing + self.update_state() @callback def _process_stat_stms(self, data): # pylint: disable=unused-argument """Process incoming stat STMs message: Playback of new track has started.""" LOGGER.debug("STMs received - playback of new track has started.") - self._state = PlayerState.Playing - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self._state = PlaybackState.Playing + self.update_state() @callback def _process_stat_stmt(self, data): """Process incoming stat STMt message: heartbeat from client.""" # pylint: disable=unused-variable - timestamp = time.time() - self._last_heartbeat = timestamp + self._last_heartbeat = time.time() ( num_crlf, mas_initialized, @@ -412,17 +558,21 @@ class SqueezeSocketClient: server_timestamp, error_code, ) = struct.unpack("!BBBLLLLHLLLLHLLH", data) - if self._state == PlayerState.Playing and elapsed_seconds != self._elapsed_time: - self._elapsed_time = elapsed_seconds - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + if self.state == PlaybackState.Playing: + # elapsed seconds is weird when player is buffering etc. + # only rely on it if player is playing + self._elapsed_milliseconds = elapsed_milliseconds + if self._elapsed_seconds != elapsed_seconds: + self._elapsed_seconds = elapsed_seconds + self.update_state() @callback def _process_stat_stmu(self, data): """Process incoming stat STMu message: Buffer underrun: Normal end of playback.""" # pylint: disable=unused-argument LOGGER.debug("STMu received - end of playback.") - self.state = PlayerState.Stopped - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self.state = PlaybackState.Stopped + self.update_state() @callback def _process_resp(self, data): @@ -439,7 +589,7 @@ class SqueezeSocketClient: # received player name data = data[1:].decode() self._device_name = data - asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self)) + self.update_state() class PySqueezeVolume: diff --git a/music_assistant/providers/webplayer/__init__.py b/music_assistant/providers/webplayer/__init__.py index c623d26b..c3d93ea8 100644 --- a/music_assistant/providers/webplayer/__init__.py +++ b/music_assistant/providers/webplayer/__init__.py @@ -4,7 +4,7 @@ import time from typing import List from music_assistant.models.config_entry import ConfigEntry -from music_assistant.models.player import Player, PlayerState +from music_assistant.models.player import PlaybackState, Player from music_assistant.models.playerprovider import PlayerProvider from music_assistant.utils import run_periodic @@ -147,7 +147,7 @@ class WebPlayerProvider(PlayerProvider): if "muted" in data: player.muted = data["muted"] if "state" in data: - player.state = PlayerState(data["state"]) + player.state = PlaybackState(data["state"]) if "cur_time" in data: player.elapsed_time = data["elapsed_time"] if "current_uri" in data: diff --git a/music_assistant/stream_manager.py b/music_assistant/stream_manager.py index 990d71d6..2cfb66b6 100755 --- a/music_assistant/stream_manager.py +++ b/music_assistant/stream_manager.py @@ -6,267 +6,252 @@ of music with crossfade/gapless support (queue stream). """ import asyncio import gc +import gzip import io import logging +import os import shlex -import subprocess -import threading -import urllib -from contextlib import suppress +from enum import Enum +from typing import AsyncGenerator, List, Optional, Tuple -import aiohttp import pyloudnorm import soundfile from aiofile import AIOFile, Reader -from aiohttp import web from music_assistant.constants import EVENT_STREAM_ENDED, EVENT_STREAM_STARTED -from music_assistant.models.media_types import MediaType -from music_assistant.models.player_queue import QueueItem +from music_assistant.helpers.typing import MusicAssistantType from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType -from music_assistant.utils import create_tempfile, decrypt_string, get_ip, try_parse_int -from music_assistant.web import require_local_subnet +from music_assistant.utils import ( + create_tempfile, + decrypt_bytes, + decrypt_string, + encrypt_bytes, + get_ip, + try_parse_int, + yield_chunks, +) LOGGER = logging.getLogger("mass") -MusicAssistantType = "MusicAssistant" + +class SoxOutputFormat(Enum): + """Enum representing the various output formats.""" + + MP3 = "mp3" # Lossy mp3 + OGG = "ogg" # Lossy Ogg Vorbis + FLAC = "flac" # Flac (with default compression) + S24 = "s24" # Raw PCM 24bits signed + S32 = "s32" # Raw PCM 32bits signed + S64 = "s64" # Raw PCM 64bits signed class StreamManager: """Built-in streamer utilizing SoX.""" - def __init__(self, mass: MusicAssistantType): + def __init__(self, mass: MusicAssistantType) -> None: """Initialize class.""" self.mass = mass self.local_ip = get_ip() self.analyze_jobs = {} - self.stream_clients = [] - - async def async_get_audio_stream(self, streamdetails: StreamDetails): - """Get the (original) audio data for the given streamdetails. Generator.""" - stream_path = decrypt_string(streamdetails.path) - stream_type = StreamType(streamdetails.type) - - if streamdetails.content_type == ContentType.AAC: - # support for AAC created with ffmpeg in between - stream_type = StreamType.EXECUTABLE - streamdetails.content_type = ContentType.FLAC - stream_path = f'ffmpeg -v quiet -i "{stream_path}" -f flac -' - if stream_type == StreamType.URL: - async with self.mass.http_session.get(stream_path) as response: - async for chunk in response.content.iter_any(): - yield chunk - elif stream_type == StreamType.FILE: - async with AIOFile(stream_path) as afp: - async for chunk in Reader(afp): - yield chunk - elif stream_type == StreamType.EXECUTABLE: - args = shlex.split(stream_path) - process = await asyncio.create_subprocess_exec( - *args, stdout=asyncio.subprocess.PIPE - ) - try: - async for chunk in process.stdout: - yield chunk - except (asyncio.CancelledError, StopAsyncIteration, GeneratorExit) as exc: - LOGGER.error("process aborted") - raise exc - finally: - process.terminate() - while True: - data = await process.stdout.read() - if not data: - break - LOGGER.error("process ended") - - async def async_stream_media_item(self, http_request): - """Start stream for a single media item, player independent.""" - # make sure we have valid params - media_type = MediaType.from_string(http_request.match_info["media_type"]) - if media_type not in [MediaType.Track, MediaType.Radio]: - return web.Response(status=404, reason="Media item is not playable!") - provider = http_request.match_info["provider"] - item_id = http_request.match_info["item_id"] - player_id = http_request.remote # fake player id - # prepare headers as audio/flac content - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": "audio/flac"} + async def async_get_sox_stream( + self, + streamdetails: StreamDetails, + output_format: SoxOutputFormat = SoxOutputFormat.FLAC, + resample: Optional[int] = None, + gain_db_adjust: Optional[float] = None, + chunk_size: int = 128000, + ) -> AsyncGenerator[Tuple[bool, bytes], None]: + """Get the sox manipulated audio data for the given streamdetails.""" + # collect all args for sox + if output_format in [ + SoxOutputFormat.S24, + SoxOutputFormat.S32, + SoxOutputFormat.S64, + ]: + output_format = [output_format.value, "-c", "2"] + else: + output_format = [output_format.value] + args = ( + ["sox", "-t", streamdetails.content_type.value, "-", "-t"] + + output_format + + ["-"] ) - await resp.prepare(http_request) - # collect tracks to play - media_item = await self.mass.music_manager.async_get_item( - item_id, provider, media_type + if gain_db_adjust: + args += ["vol", str(gain_db_adjust), "dB"] + if resample: + args += ["rate", "-v", str(resample)] + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] started using args: %s", + streamdetails.provider, + streamdetails.item_id, + " ".join(args), ) - queue_item = QueueItem(media_item) - # run the streamer in executor to prevent the subprocess locking up our eventloop - cancelled = threading.Event() - bg_task = self.mass.loop.run_in_executor( - None, - self.__get_queue_item_stream, - player_id, - queue_item, - resp, - cancelled, + # init the process with stdin/out pipes + sox_proc = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE, + bufsize=0, ) - # let the streaming begin! + + async def fill_buffer(): + """Forward audio chunks to sox stdin.""" + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] fill_buffer started", + streamdetails.provider, + streamdetails.item_id, + ) + # feed audio data into sox stdin for processing + async for chunk in self.async_get_media_stream(streamdetails): + sox_proc.stdin.write(chunk) + await sox_proc.stdin.drain() + sox_proc.stdin.write_eof() + await sox_proc.stdin.drain() + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] fill_buffer finished", + streamdetails.provider, + streamdetails.item_id, + ) + + fill_buffer_task = self.mass.loop.create_task(fill_buffer()) try: - await asyncio.gather(bg_task) - except ( - asyncio.CancelledError, - aiohttp.ClientConnectionError, - asyncio.TimeoutError, - ) as exc: - cancelled.set() - raise exc # re-raise - return resp - - @require_local_subnet - async def async_stream(self, http_request): - """Start stream for a player.""" - # make sure we have valid params - player_id = http_request.match_info.get("player_id", "") - player_queue = self.mass.player_manager.get_player_queue(player_id) - if not player_queue: - return web.Response(status=404, reason="Player(queue) not found!") - if not player_queue.use_queue_stream: - queue_item_id = http_request.match_info.get("queue_item_id") - queue_item = player_queue.by_item_id(queue_item_id) - if not queue_item: - return web.Response(status=404, reason="Invalid Queue item Id") - # prepare headers as audio/flac content - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": "audio/flac"} - ) - await resp.prepare(http_request) - # run the streamer in executor to prevent the subprocess locking up our eventloop - cancelled = threading.Event() - if player_queue.use_queue_stream: - bg_task = self.mass.loop.run_in_executor( - None, self.__get_queue_stream, player_id, resp, cancelled + # yield chunks from stdout + # we keep 1 chunk behind to detect end of stream properly + prev_chunk = b"" + while True: + # read exactly chunksize of data + try: + chunk = await sox_proc.stdout.readexactly(chunk_size) + except asyncio.IncompleteReadError as exc: + chunk = exc.partial + if len(chunk) < chunk_size: + # last chunk + yield (True, prev_chunk + chunk) + break + if prev_chunk: + yield (False, prev_chunk) + prev_chunk = chunk + + await asyncio.wait([fill_buffer_task]) + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] finished", + streamdetails.provider, + streamdetails.item_id, ) + except (GeneratorExit, Exception): # pylint: disable=broad-except + LOGGER.warning( + "[async_get_sox_stream] [%s/%s] aborted", + streamdetails.provider, + streamdetails.item_id, + ) + if fill_buffer_task and not fill_buffer_task.cancelled(): + fill_buffer_task.cancel() + sox_proc.terminate() + await sox_proc.communicate() + await sox_proc.wait() + # raise GeneratorExit from exc else: - bg_task = self.mass.loop.run_in_executor( - None, - self.__get_queue_item_stream, - player_id, - queue_item, - resp, - cancelled, + LOGGER.debug( + "[async_get_sox_stream] [%s/%s] finished", + streamdetails.provider, + streamdetails.item_id, ) - # let the streaming begin! - try: - await asyncio.gather(bg_task) - except ( - asyncio.CancelledError, - aiohttp.ClientConnectionError, - asyncio.TimeoutError, - ) as exc: - cancelled.set() - raise exc # re-raise - return resp - - def __get_queue_item_stream(self, player_id, queue_item, buffer, cancelled): - """Start streaming single queue track.""" - # pylint: disable=unused-variable + + async def async_queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]: + """Stream the PlayerQueue's tracks as constant feed in flac format.""" + + args = ["sox", "-t", "s32", "-c", "2", "-r", "96000", "-", "-t", "flac", "-"] + sox_proc = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE, + ) LOGGER.debug( - "stream single queue track started for track %s on player %s", - queue_item.name, + "[async_queue_stream_flac] [%s] started using args: %s", player_id, + " ".join(args), ) - for is_last_chunk, audio_chunk in self.__get_audio_stream( - player_id, queue_item, cancelled - ): - if cancelled.is_set(): - # http session ended - # we must consume the data to prevent hanging subprocess instances - continue - # put chunk in buffer - with suppress((BrokenPipeError, ConnectionResetError)): - asyncio.run_coroutine_threadsafe( - buffer.write(audio_chunk), self.mass.loop - ).result() - # all chunks received: streaming finished - if cancelled.is_set(): + chunk_size = 571392 # 74,7% of pcm + + # feed stdin with pcm samples + async def fill_buffer(): + """Feed audio data into sox stdin for processing.""" LOGGER.debug( - "stream single track interrupted for track %s on player %s", - queue_item.name, - player_id, + "[async_queue_stream_flac] [%s] fill buffer started", player_id + ) + async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32): + sox_proc.stdin.write(chunk) + await sox_proc.stdin.drain() + sox_proc.stdin.write_eof() + await sox_proc.stdin.drain() + LOGGER.debug( + "[async_queue_stream_flac] [%s] fill buffer finished", player_id ) - else: - # indicate EOF if no more data - with suppress((BrokenPipeError, ConnectionResetError)): - asyncio.run_coroutine_threadsafe( - buffer.write_eof(), self.mass.loop - ).result() + fill_buffer_task = self.mass.loop.create_task(fill_buffer()) + try: + # yield flac chunks from stdout + while True: + try: + chunk = await sox_proc.stdout.readexactly(chunk_size) + yield chunk + except asyncio.IncompleteReadError as exc: + chunk = exc.partial + yield chunk + break + except (GeneratorExit, Exception): # pylint: disable=broad-except + LOGGER.debug("[async_queue_stream_flac] [%s] aborted", player_id) + if fill_buffer_task and not fill_buffer_task.cancelled(): + fill_buffer_task.cancel() + sox_proc.terminate() + await sox_proc.communicate() + await sox_proc.wait() + else: LOGGER.debug( - "stream single track finished for track %s on player %s", - queue_item.name, + "[async_queue_stream_flac] [%s] finished", player_id, ) - def __get_queue_stream(self, player_id, buffer, cancelled): - """Start streaming all queue tracks.""" - player_conf = self.mass.config.get_player_config(player_id) + async def async_queue_stream_pcm( + self, player_id, sample_rate=96000, bit_depth=32 + ) -> AsyncGenerator[bytes, None]: + """Stream the PlayerQueue's tracks as constant feed in PCM raw audio.""" player_queue = self.mass.player_manager.get_player_queue(player_id) - sample_rate = try_parse_int(player_conf["max_sample_rate"]) - fade_length = try_parse_int(player_conf["crossfade_duration"]) - if not sample_rate or sample_rate < 44100: - sample_rate = 96000 + queue_conf = self.mass.config.get_player_config(player_id) + fade_length = try_parse_int(queue_conf["crossfade_duration"]) + pcm_args = ["s32", "-c", "2", "-r", str(sample_rate)] + chunk_size = int(sample_rate * (bit_depth / 8) * 2) # 1 second if fade_length: - fade_bytes = int(sample_rate * 4 * 2 * fade_length) + buffer_size = chunk_size * fade_length else: - fade_bytes = int(sample_rate * 4 * 2 * 6) - pcm_args = "raw -b 32 -c 2 -e signed-integer -r %s" % sample_rate - args = "sox -t %s - -t flac -C 0 -" % pcm_args - # start sox process - args = shlex.split(args) - sox_proc = subprocess.Popen( - args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - - def fill_buffer(): - while True: - chunk = sox_proc.stdout.read(128000) # noqa - if not chunk: - break - if chunk and not cancelled.is_set(): - with suppress((BrokenPipeError, ConnectionResetError)): - asyncio.run_coroutine_threadsafe( - buffer.write(chunk), self.mass.loop - ).result() - del chunk - # indicate EOF if no more data - if not cancelled.is_set(): - with suppress((BrokenPipeError, ConnectionResetError)): - asyncio.run_coroutine_threadsafe( - buffer.write_eof(), self.mass.loop - ).result() - - # start fill buffer task in background - fill_buffer_thread = threading.Thread(target=fill_buffer) - fill_buffer_thread.start() + buffer_size = chunk_size * 10 LOGGER.info("Start Queue Stream for player %s ", player_id) + is_start = True last_fadeout_data = b"" while True: - if cancelled.is_set(): - break + # get the (next) track in queue if is_start: # report start of queue playback so we can calculate current track/duration etc. - queue_track = self.mass.add_job( - player_queue.async_start_queue_stream() - ).result() + queue_track = await player_queue.async_start_queue_stream() is_start = False else: queue_track = player_queue.next_item if not queue_track: LOGGER.debug("no (more) tracks left in queue") break + # get streamdetails + streamdetails = await self.mass.music_manager.async_get_stream_details( + queue_track, player_id + ) + # get gain correct / replaygain + gain_correct = await self.mass.player_manager.async_get_gain_correct( + player_id, streamdetails.item_id, streamdetails.provider + ) LOGGER.debug( - "Start Streaming queue track: %s (%s) on player %s", + "Start Streaming queue track: %s (%s) for player %s", queue_track.item_id, queue_track.name, player_id, @@ -276,12 +261,12 @@ class StreamManager: prev_chunk = None bytes_written = 0 # handle incoming audio chunks - for is_last_chunk, chunk in self.__get_audio_stream( - player_id, - queue_track, - cancelled, - chunksize=fade_bytes, + async for is_last_chunk, chunk in self.mass.stream_manager.async_get_sox_stream( + streamdetails, + SoxOutputFormat.S32, resample=sample_rate, + gain_db_adjust=gain_correct, + chunk_size=buffer_size, ): cur_chunk += 1 @@ -291,7 +276,8 @@ class StreamManager: break if cur_chunk <= 2 and not last_fadeout_data: # no fadeout_part available so just pass it to the output directly - sox_proc.stdin.write(chunk) + for small_chunk in yield_chunks(chunk, chunk_size): + yield small_chunk bytes_written += len(chunk) del chunk elif cur_chunk == 1 and last_fadeout_data: @@ -300,31 +286,28 @@ class StreamManager: # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN elif cur_chunk == 2 and last_fadeout_data: # combine the first 2 chunks and strip off silence - args = "sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%" % ( - pcm_args, - pcm_args, - ) - first_part, _ = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ).communicate(prev_chunk + chunk) - if len(first_part) < fade_bytes: + first_part = await async_strip_silence(prev_chunk + chunk, pcm_args) + if len(first_part) < buffer_size: # part is too short after the strip action?! # so we just use the full first part first_part = prev_chunk + chunk - fade_in_part = first_part[:fade_bytes] - remaining_bytes = first_part[fade_bytes:] + fade_in_part = first_part[:buffer_size] + remaining_bytes = first_part[buffer_size:] del first_part # do crossfade - crossfade_part = self.__crossfade_pcm_parts( + crossfade_part = await async_crossfade_pcm_parts( fade_in_part, last_fadeout_data, pcm_args, fade_length ) - sox_proc.stdin.write(crossfade_part) + # send crossfade_part + for small_chunk in yield_chunks(crossfade_part, chunk_size): + yield small_chunk bytes_written += len(crossfade_part) del crossfade_part del fade_in_part last_fadeout_data = b"" # also write the leftover bytes from the strip action - sox_proc.stdin.write(remaining_bytes) + for small_chunk in yield_chunks(remaining_bytes, chunk_size): + yield small_chunk bytes_written += len(remaining_bytes) del remaining_bytes del chunk @@ -334,37 +317,35 @@ class StreamManager: # last chunk received so create the last_part # with the previous chunk and this chunk # and strip off silence - args = ( - "sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse" - % (pcm_args, pcm_args) + last_part = await async_strip_silence( + prev_chunk + chunk, pcm_args, True ) - last_part, _ = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ).communicate(prev_chunk + chunk) - if len(last_part) < fade_bytes: + if len(last_part) < buffer_size: # part is too short after the strip action # so we just use the entire original data last_part = prev_chunk + chunk - if len(last_part) < fade_bytes: + if len(last_part) < buffer_size: LOGGER.warning( "Not enough data for crossfade: %s", len(last_part) ) if ( not player_queue.crossfade_enabled - or len(last_part) < fade_bytes + or len(last_part) < buffer_size ): # crossfading is not enabled so just pass the (stripped) audio data - sox_proc.stdin.write(last_part) + for small_chunk in yield_chunks(last_part, chunk_size): + yield small_chunk bytes_written += len(last_part) del last_part del chunk else: # handle crossfading support # store fade section to be picked up for next track - last_fadeout_data = last_part[-fade_bytes:] - remaining_bytes = last_part[:-fade_bytes] + last_fadeout_data = last_part[-buffer_size:] + remaining_bytes = last_part[:-buffer_size] # write remaining bytes - sox_proc.stdin.write(remaining_bytes) + for small_chunk in yield_chunks(remaining_bytes, chunk_size): + yield small_chunk bytes_written += len(remaining_bytes) del last_part del remaining_bytes @@ -375,21 +356,19 @@ class StreamManager: # keep previous chunk in memory so we have enough # samples to perform the crossfade if prev_chunk: - sox_proc.stdin.write(prev_chunk) + for small_chunk in yield_chunks(prev_chunk, chunk_size): + yield small_chunk bytes_written += len(prev_chunk) prev_chunk = chunk else: prev_chunk = chunk del chunk # end of the track reached - if cancelled.is_set(): - # break out the loop if the http session is cancelled - break # update actual duration to the queue for more accurate now playing info - accurate_duration = bytes_written / int(sample_rate * 4 * 2) + accurate_duration = bytes_written / chunk_size queue_track.duration = accurate_duration LOGGER.debug( - "Finished Streaming queue track: %s (%s) on player %s", + "Finished Streaming queue track: %s (%s) on queue %s", queue_track.item_id, queue_track.name, player_id, @@ -397,103 +376,113 @@ class StreamManager: # run garbage collect manually to avoid too much memory fragmentation gc.collect() # end of queue reached, pass last fadeout bits to final output - if last_fadeout_data and not cancelled.is_set(): - sox_proc.stdin.write(last_fadeout_data) - del last_fadeout_data + for small_chunk in yield_chunks(last_fadeout_data, chunk_size): + yield small_chunk + del last_fadeout_data # END OF QUEUE STREAM - sox_proc.stdin.close() - sox_proc.terminate() - sox_proc.communicate() - fill_buffer_thread.join() # run garbage collect manually to avoid too much memory fragmentation gc.collect() - if cancelled.is_set(): - LOGGER.info("streaming of queue for player %s interrupted", player_id) - else: - LOGGER.info("streaming of queue for player %s completed", player_id) - - def __get_audio_stream( - self, player_id, queue_item, cancelled, chunksize=128000, resample=None - ): - """Get audio stream from provider and apply additional effects/processing if needed.""" - streamdetails = self.mass.add_job( - self.mass.music_manager.async_get_stream_details(queue_item, player_id) - ).result() - if not streamdetails: - LOGGER.warning("no stream details for %s", queue_item.name) - yield (True, b"") - return - # get sox effects and resample options - sox_options = self.__get_player_sox_options(player_id, streamdetails) - outputfmt = "flac -C 0" - if resample: - outputfmt = "raw -b 32 -c 2 -e signed-integer" - sox_options += " rate -v %s" % resample - streamdetails.sox_options = sox_options - # determine how to proceed based on input file type + LOGGER.info("streaming of queue for player %s completed", player_id) + + async def async_stream_queue_item( + self, player_id: str, queue_item_id: str + ) -> AsyncGenerator[bytes, None]: + """Stream a single Queue item.""" + # collect streamdetails + player_queue = self.mass.player_manager.get_player_queue(player_id) + if not player_queue: + raise FileNotFoundError("invalid player_id") + queue_item = player_queue.by_item_id(queue_item_id) + if not queue_item: + raise FileNotFoundError("invalid queue_item_id") + streamdetails = await self.mass.music_manager.async_get_stream_details( + queue_item, player_id + ) + + # get gain correct / replaygain + gain_correct = await self.mass.player_manager.async_get_gain_correct( + player_id, streamdetails.item_id, streamdetails.provider + ) + # start streaming + async for _, audio_chunk in self.async_get_sox_stream( + streamdetails, gain_db_adjust=gain_correct + ): + yield audio_chunk + + async def async_get_media_stream( + self, streamdetails: StreamDetails + ) -> AsyncGenerator[bytes, None]: + """Get the (original/untouched) audio data for the given streamdetails. Generator.""" + stream_path = decrypt_string(streamdetails.path) + stream_type = StreamType(streamdetails.type) + audio_data = b"" + + # Handle (optional) caching of audio data + cache_file = "/tmp/" + f"{streamdetails.item_id}{streamdetails.provider}"[::-1] + if os.path.isfile(cache_file): + with gzip.open(cache_file, "rb") as _file: + audio_data = decrypt_bytes(_file.read()) + if audio_data: + stream_type = StreamType.CACHE + + # support for AAC created with ffmpeg in between if streamdetails.content_type == ContentType.AAC: - # support for AAC created with ffmpeg in between - args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % ( - decrypt_string(streamdetails.path), - outputfmt, - sox_options, - ) - process = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize - ) - elif streamdetails.type in [StreamType.URL, StreamType.FILE]: - args = 'sox -t %s "%s" -t %s - %s' % ( - streamdetails.content_type.name, - decrypt_string(streamdetails.path), - outputfmt, - sox_options, - ) - args = shlex.split(args) - process = subprocess.Popen( - args, shell=False, stdout=subprocess.PIPE, bufsize=chunksize - ) - elif streamdetails.type == StreamType.EXECUTABLE: - args = "%s | sox -t %s - -t %s - %s" % ( - decrypt_string(streamdetails.path), - streamdetails.content_type.name, - outputfmt, - sox_options, - ) - process = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize - ) - else: - LOGGER.warning("no streaming options for %s", queue_item.name) - yield (True, b"") - return - # fire event that streaming has started for this track + stream_type = StreamType.EXECUTABLE + streamdetails.content_type = ContentType.FLAC + stream_path = f'ffmpeg -v quiet -i "{stream_path}" -f flac -' + + # signal start of stream event self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails) - # yield chunks from stdout - # we keep 1 chunk behind to detect end of stream properly - prev_chunk = b"" - while True: - if cancelled.is_set(): - # http session ended - # send terminate and pick up left over bytes + LOGGER.debug( + "[async_get_media_stream] [%s/%s] started, using %s", + streamdetails.provider, + streamdetails.item_id, + stream_type, + ) + + if stream_type == StreamType.CACHE: + yield audio_data + elif stream_type == StreamType.URL: + async with self.mass.http_session.get(stream_path) as response: + async for chunk in response.content.iter_any(): + audio_data += chunk + yield chunk + elif stream_type == StreamType.FILE: + async with AIOFile(stream_path) as afp: + async for chunk in Reader(afp): + audio_data += chunk + yield chunk + elif stream_type == StreamType.EXECUTABLE: + args = shlex.split(stream_path) + process = await asyncio.create_subprocess_exec( + *args, stdout=asyncio.subprocess.PIPE + ) + try: + async for chunk in process.stdout: + audio_data += chunk + yield chunk + except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except + LOGGER.warning( + "[async_get_media_stream] [%s/%s] Aborted: %s", + streamdetails.provider, + streamdetails.item_id, + str(exc), + ) + # read remaining bytes process.terminate() - chunk, _ = process.communicate() - else: - # read exactly chunksize of data - chunk = process.stdout.read(chunksize) - if len(chunk) < chunksize: - # last chunk - yield (True, prev_chunk + chunk) - break - if prev_chunk: - yield (False, prev_chunk) - prev_chunk = chunk - # fire event that streaming has ended - if not cancelled.is_set(): - streamdetails.seconds_played = queue_item.duration - self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails) - # send task to background to analyse the audio - if queue_item.media_type == MediaType.Track: - self.mass.add_job(self.__analyze_audio, streamdetails) + await process.communicate() + await process.wait() + + # signal end of stream event + self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails) + + # send analyze job to background worker + self.mass.add_job(self.__analyze_audio, streamdetails, audio_data) + LOGGER.debug( + "[async_get_media_stream] [%s/%s] Finished", + streamdetails.provider, + streamdetails.item_id, + ) def __get_player_sox_options( self, player_id: str, streamdetails: StreamDetails @@ -518,12 +507,18 @@ class StreamManager: sox_options.append(player_conf["sox_options"]) return " ".join(sox_options) - def __analyze_audio(self, streamdetails): + def __analyze_audio(self, streamdetails, audio_data) -> None: """Analyze track audio, for now we only calculate EBU R128 loudness.""" item_key = "%s%s" % (streamdetails.item_id, streamdetails.provider) if item_key in self.analyze_jobs: return # prevent multiple analyze jobs for same track self.analyze_jobs[item_key] = True + # do we need saving to disk ? + cache_file = "/tmp/" + f"{streamdetails.item_id}{streamdetails.provider}"[::-1] + if not os.path.isfile(cache_file): + with gzip.open(cache_file, "wb") as _file: + _file.write(encrypt_bytes(audio_data)) + # get track loudness track_loudness = self.mass.add_job( self.mass.database.async_get_track_loudness( streamdetails.item_id, streamdetails.provider @@ -532,17 +527,6 @@ class StreamManager: if track_loudness is None: # only when needed we do the analyze stuff LOGGER.debug("Start analyzing track %s", item_key) - if streamdetails.type == StreamType.URL: - audio_data = urllib.request.urlopen( - decrypt_string(streamdetails.path) - ).read() - elif streamdetails.type == StreamType.EXECUTABLE: - audio_data = subprocess.check_output( - decrypt_string(streamdetails.path), shell=True - ) - elif streamdetails.type == StreamType.FILE: - with open(decrypt_string(streamdetails.path), "rb") as _file: - audio_data = _file.read() # calculate BS.1770 R128 integrated loudness with io.BytesIO(audio_data) as tmpfile: data, rate = soundfile.read(tmpfile) @@ -554,53 +538,56 @@ class StreamManager: streamdetails.item_id, streamdetails.provider, loudness ) ) - del audio_data LOGGER.debug("Integrated loudness of track %s is: %s", item_key, loudness) + del audio_data self.analyze_jobs.pop(item_key, None) - @staticmethod - def __crossfade_pcm_parts(fade_in_part, fade_out_part, pcm_args, fade_length): - """Crossfade two chunks of audio using sox.""" - # create fade-in part - fadeinfile = create_tempfile() - args = "sox --ignore-length -t %s - -t %s %s fade t %s" % ( - pcm_args, - pcm_args, - fadeinfile.name, - fade_length, - ) - args = shlex.split(args) - process = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE) - process.communicate(fade_in_part) - # create fade-out part - fadeoutfile = create_tempfile() - args = "sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse" % ( - pcm_args, - pcm_args, - fadeoutfile.name, - fade_length, - ) - args = shlex.split(args) - process = subprocess.Popen( - args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - process.communicate(fade_out_part) - # create crossfade using sox and some temp files - # TODO: figure out how to make this less complex and without the tempfiles - args = "sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -" % ( - pcm_args, - fadeoutfile.name, - pcm_args, - fadeinfile.name, - pcm_args, - ) - args = shlex.split(args) - process = subprocess.Popen( - args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - crossfade_part, _ = process.communicate() - fadeinfile.close() - fadeoutfile.close() - del fadeinfile - del fadeoutfile - return crossfade_part + +async def async_crossfade_pcm_parts( + fade_in_part: bytes, fade_out_part: bytes, pcm_args: List[str], fade_length: int +) -> bytes: + """Crossfade two chunks of pcm/raw audio using sox.""" + # create fade-in part + fadeinfile = create_tempfile() + args = ["sox", "--ignore-length", "-t"] + pcm_args + args += ["-", "-t"] + pcm_args + [fadeinfile.name, "fade", "t", str(fade_length)] + process = await asyncio.create_subprocess_exec(*args, stdin=asyncio.subprocess.PIPE) + await process.communicate(fade_in_part) + # create fade-out part + fadeoutfile = create_tempfile() + args = ["sox", "--ignore-length", "-t"] + pcm_args + ["-", "-t"] + pcm_args + args += [fadeoutfile.name, "reverse", "fade", "t", str(fade_length), "reverse"] + process = await asyncio.create_subprocess_exec( + *args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE + ) + await process.communicate(fade_out_part) + # create crossfade using sox and some temp files + # TODO: figure out how to make this less complex and without the tempfiles + args = ["sox", "-m", "-v", "1.0", "-t"] + pcm_args + [fadeoutfile.name, "-v", "1.0"] + args += ["-t"] + pcm_args + [fadeinfile.name, "-t"] + pcm_args + ["-"] + process = await asyncio.create_subprocess_exec( + *args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE + ) + crossfade_part, _ = await process.communicate() + fadeinfile.close() + fadeoutfile.close() + del fadeinfile + del fadeoutfile + return crossfade_part + + +async def async_strip_silence( + audio_data: bytes, pcm_args: List[str], reverse=False +) -> bytes: + """Strip silence from (a chunk of) pcm audio.""" + args = ["sox", "--ignore-length", "-t"] + pcm_args + ["-", "-t"] + pcm_args + ["-"] + if reverse: + args.append("reverse") + args += ["silence", "1", "0.1", "1%"] + if reverse: + args.append("reverse") + process = await asyncio.create_subprocess_exec( + *args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE + ) + stripped_data, _ = await process.communicate(audio_data) + return stripped_data diff --git a/music_assistant/utils.py b/music_assistant/utils.py index 5f3fdd7b..ab041bf7 100755 --- a/music_assistant/utils.py +++ b/music_assistant/utils.py @@ -6,10 +6,12 @@ import os import platform import re import socket +import struct import tempfile import urllib.request from datetime import datetime from enum import Enum +from io import BytesIO from typing import Any, Callable, TypeVar import memory_tempfile @@ -305,6 +307,18 @@ def encrypt_string(str_value): return Fernet(get_app_var(3)).encrypt(str_value.encode()).decode() +def encrypt_bytes(bytes_value): + """Encrypt bytes with Fernet.""" + return Fernet(get_app_var(3)).encrypt(bytes_value) + + +def yield_chunks(_obj, chunk_size): + """Yield successive n-sized chunks from list/str/bytes.""" + chunk_size = int(chunk_size) + for i in range(0, len(_obj), chunk_size): + yield _obj[i : i + chunk_size] + + def decrypt_string(str_value): """Decrypt a string with Fernet.""" try: @@ -313,6 +327,14 @@ def decrypt_string(str_value): return None +def decrypt_bytes(bytes_value): + """Decrypt bytes with Fernet.""" + try: + return Fernet(get_app_var(3)).decrypt(bytes_value) + except InvalidToken: + return None + + class CustomIntEnum(int, Enum): """Base for IntEnum with some helpers.""" @@ -340,3 +362,50 @@ class CustomIntEnum(int, Enum): if key.lower() == string or value == try_parse_int(string): return value return KeyError + + +def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=3600): + """Generate a wave header from given params.""" + file = BytesIO() + numsamples = samplerate * duration + + # Generate format chunk + format_chunk_spec = b"<4sLHHLLHH" + format_chunk = struct.pack( + format_chunk_spec, + b"fmt ", # Chunk id + 16, # Size of this chunk (excluding chunk id and this field) + 1, # Audio format, 1 for PCM + channels, # Number of channels + int(samplerate), # Samplerate, 44100, 48000, etc. + int(samplerate * channels * (bitspersample / 8)), # Byterate + int(channels * (bitspersample / 8)), # Blockalign + bitspersample, # 16 bits for two byte samples, etc. + ) + # Generate data chunk + data_chunk_spec = b"<4sL" + datasize = int(numsamples * channels * (bitspersample / 8)) + data_chunk = struct.pack( + data_chunk_spec, + b"data", # Chunk id + int(datasize), # Chunk size (excluding chunk id and this field) + ) + sum_items = [ + # "WAVE" string following size field + 4, + # "fmt " + chunk size field + chunk size + struct.calcsize(format_chunk_spec), + # Size of data chunk spec + data size + struct.calcsize(data_chunk_spec) + datasize, + ] + # Generate main header + all_chunks_size = int(sum(sum_items)) + main_header_spec = b"<4sL4s" + main_header = struct.pack(main_header_spec, b"RIFF", all_chunks_size, b"WAVE") + # Write all the contents in + file.write(main_header) + file.write(format_chunk) + file.write(data_chunk) + + # return file.getvalue(), all_chunks_size + 8 + return file.getvalue() diff --git a/music_assistant/web.py b/music_assistant/web.py index 43ff0a74..f3c97e9e 100755 --- a/music_assistant/web.py +++ b/music_assistant/web.py @@ -124,21 +124,6 @@ class Web: app.add_routes(routes) app.add_routes( [ - web.get( - "/stream/{player_id}", - self.mass.http_streamer.async_stream, - allow_head=False, - ), - web.get( - "/stream/{player_id}/{queue_item_id}", - self.mass.http_streamer.async_stream, - allow_head=False, - ), - web.get( - "/stream_media/{media_type}/{provider}/{item_id}", - self.mass.http_streamer.async_stream_media_item, - allow_head=False, - ), web.get("/", self.async_index), web.post("/login", self.async_login), web.get("/jsonrpc.js", self.async_json_rpc), @@ -254,6 +239,97 @@ class Web: raise web.HTTPFound("https://music-assistant.github.io/app") return web.FileResponse(os.path.join(webdir, "index.html")) + @routes.get("/stream/media/{media_type}/{item_id}") + async def stream_media(self, request): + """Stream a single audio track.""" + media_type = MediaType.from_string(request.match_info["media_type"]) + if media_type not in [MediaType.Track, MediaType.Radio]: + return web.Response(status=404, reason="Media item is not playable!") + item_id = request.match_info["item_id"] + provider = request.rel_url.query.get("provider", "database") + media_item = await self.mass.music_manager.async_get_item( + item_id, provider, media_type + ) + streamdetails = await self.mass.music_manager.async_get_stream_details( + media_item + ) + + # prepare request + content_type = streamdetails.content_type.value + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": f"audio/{content_type}"} + ) + resp.enable_chunked_encoding() + await resp.prepare(request) + + # stream track + async for audio_chunk in self.mass.stream_manager.async_get_stream( + streamdetails + ): + await resp.write(audio_chunk) + return resp + + @routes.get("/stream/queue/{player_id}") + async def stream_queue(self, request): + """Stream a player's queue.""" + player_id = request.match_info["player_id"] + if not self.mass.player_manager.get_player_queue(player_id): + return web.Response(text="invalid queue", status=404) + + # prepare request + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": "audio/flac"} + ) + resp.enable_chunked_encoding() + await resp.prepare(request) + + # stream queue + async for audio_chunk in self.mass.stream_manager.async_queue_stream_flac( + player_id + ): + await resp.write(audio_chunk) + return resp + + @routes.get("/stream/queue/{player_id}/{queue_item_id}") + async def stream_queue_item(self, request): + """Stream a single queue item.""" + player_id = request.match_info["player_id"] + queue_item_id = request.match_info["queue_item_id"] + + # prepare request + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": "audio/flac"} + ) + resp.enable_chunked_encoding() + await resp.prepare(request) + + async for audio_chunk in self.mass.stream_manager.async_stream_queue_item( + player_id, queue_item_id + ): + await resp.write(audio_chunk) + return resp + + @routes.get("/stream/group/{group_player_id}") + async def stream_group(self, request): + """Handle streaming to all players of a group. Highly experimental.""" + group_player_id = request.match_info["group_player_id"] + if not self.mass.player_manager.get_player_queue(group_player_id): + return web.Response(text="invalid player id", status=404) + child_player_id = request.rel_url.query.get("player_id", request.remote) + + # prepare request + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": "audio/flac"} + ) + resp.enable_chunked_encoding() + await resp.prepare(request) + + # stream queue + player = self.mass.player_manager.get_player(group_player_id) + async for audio_chunk in player.player.subscribe_stream_client(child_player_id): + await resp.write(audio_chunk) + return resp + @login_required @routes.get("/api/library/artists") async def async_library_artists(self, request): -- 2.34.1