From: marcelveldt Date: Sun, 13 Oct 2019 15:09:04 +0000 (+0200) Subject: fix issues with queue streaming X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=dbfb26a92c9f0f43e477cdefe5a568994db1783e;p=music-assistant-server.git fix issues with queue streaming --- diff --git a/main.py b/main.py index 084bc50e..7cb98257 100755 --- a/main.py +++ b/main.py @@ -13,9 +13,9 @@ if __name__ == "__main__": else: datapath = os.path.dirname(os.path.abspath(__file__)) if len(sys.argv) > 2: - debug = sys.argv[2] + debug = sys.argv[2] == "debug" else: - debug = True + debug = False MusicAssistant(datapath, debug) \ No newline at end of file diff --git a/music_assistant/__init__.py b/music_assistant/__init__.py index 6de619a7..9d08c84b 100644 --- a/music_assistant/__init__.py +++ b/music_assistant/__init__.py @@ -13,6 +13,7 @@ import slugify as unicode_slug import uuid import json import time +import logging from .database import Database from .utils import run_periodic, LOGGER, try_parse_bool @@ -33,12 +34,22 @@ class MusicAssistant(): def __init__(self, datapath, debug=False): debug = try_parse_bool(debug) + logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s') + consolehandler = logging.StreamHandler() + consolehandler.setFormatter(logformat) + LOGGER.addHandler(consolehandler) + if debug: + LOGGER.setLevel(logging.DEBUG) + logging.getLogger('aiosqlite').setLevel(logging.INFO) + logging.getLogger('asyncio').setLevel(logging.INFO) + else: + LOGGER.setLevel(logging.INFO) uvloop.install() self.datapath = datapath self.parse_config() self.event_loop = asyncio.get_event_loop() self.event_loop.set_debug(debug) - self.bg_executor = ThreadPoolExecutor() + self.bg_executor = ThreadPoolExecutor() self.event_loop.set_default_executor(self.bg_executor) #self.event_loop.set_exception_handler(handle_exception) self.event_listeners = {} diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 1e181a27..f9cac55a 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -43,26 +43,27 @@ class HTTPStreamer(): # send content only on GET request if http_request.method.upper() != 'HEAD': # stream audio - queue = asyncio.Queue() + buf_queue = asyncio.Queue() cancelled = threading.Event() if queue_item: # single stream requested + #asyncio.create_task(self.__stream_single(player, queue_item, buf_queue, cancelled)) run_async_background_task( self.mass.bg_executor, - self.__stream_single, player, queue_item, queue, cancelled) + self.__stream_single, player, queue_item, buf_queue, cancelled) else: # no item is given, start queue stream run_async_background_task( self.mass.bg_executor, - self.__stream_queue, player, queue, cancelled) + self.__stream_queue, player, buf_queue, cancelled) try: while True: - chunk = await queue.get() + chunk = await buf_queue.get() if not chunk: - queue.task_done() + buf_queue.task_done() break await resp.write(chunk) - queue.task_done() + buf_queue.task_done() LOGGER.info("stream fininished for player %s" % player.name) except asyncio.CancelledError: cancelled.set() @@ -72,42 +73,45 @@ class HTTPStreamer(): async def __stream_single(self, player, queue_item, buffer, cancelled): ''' start streaming single track from provider ''' + LOGGER.info("stream single track started for track %s on player %s" % (queue_item.name, player.name)) try: audio_stream = self.__get_audio_stream(player, queue_item, cancelled) async for is_last_chunk, audio_chunk in audio_stream: asyncio.run_coroutine_threadsafe( buffer.put(audio_chunk), self.mass.event_loop) - # wait for the queue to consume the data - # this prevents that the entire track is sitting in memory - while buffer.qsize() > 2 and not cancelled.is_set(): - await asyncio.sleep(1) # indicate EOF if no more data asyncio.run_coroutine_threadsafe( buffer.put(b''), self.mass.event_loop) except (asyncio.CancelledError, asyncio.TimeoutError): cancelled.set() - LOGGER.info("stream_track interrupted for %s" % queue_item.name) + LOGGER.info("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name)) raise asyncio.CancelledError() else: - LOGGER.info("stream_track fininished for %s" % queue_item.name) + LOGGER.info("stream single track finished for track %s on player %s" % (queue_item.name, player.name)) async def __stream_queue(self, player, buffer, cancelled): ''' start streaming all queue tracks ''' - sample_rate = player.settings['max_sample_rate'] - fade_length = player.settings["crossfade_duration"] - fade_bytes = int(sample_rate * 4 * 2 * fade_length) + sample_rate = try_parse_int(player.settings['max_sample_rate']) + fade_length = try_parse_int(player.settings["crossfade_duration"]) + if not sample_rate or sample_rate < 44100 or sample_rate > 384000: + sample_rate = 96000 + if fade_length: + fade_bytes = int(sample_rate * 4 * 2 * fade_length) + else: + fade_bytes = int(sample_rate * 4 * 2) pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate - args = 'sox -t %s - -t flac -C 0 -' % pcm_args + args = 'sox -V3 -t %s - -t flac -C 0 -' % pcm_args sox_proc = await asyncio.create_subprocess_shell(args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) async def fill_buffer(): - while not sox_proc.stdout.at_eof(): - chunk = await sox_proc.stdout.read(256000) - if not chunk: - break + while not sox_proc.stdout.at_eof() and not sox_proc.returncode: + try: + chunk = await sox_proc.stdout.readexactly(256000) + except asyncio.streams.IncompleteReadError as err: + chunk = err.partial asyncio.run_coroutine_threadsafe( buffer.put(chunk), self.mass.event_loop) @@ -131,24 +135,27 @@ class HTTPStreamer(): else: queue_track = player.queue.next_item if not queue_track: - LOGGER.warning("no (more) tracks left in queue") + LOGGER.debug("no (more) tracks left in queue") break - LOGGER.info("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) + LOGGER.debug("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) fade_in_part = b'' cur_chunk = 0 prev_chunk = None bytes_written = 0 + # handle incoming audio chunks async for is_last_chunk, chunk in self.__get_audio_stream( player, queue_track, cancelled, chunksize=fade_bytes, resample=sample_rate): cur_chunk += 1 + + ### HANDLE FIRST PART OF TRACK if cur_chunk <= 2 and not last_fadeout_data: - # fade-in part but no fadeout_part available so just pass it to the output directly + # no fadeout_part available so just pass it to the output directly sox_proc.stdin.write(chunk) await sox_proc.stdin.drain() bytes_written += len(chunk) - print(chunk) elif cur_chunk == 1 and last_fadeout_data: prev_chunk = chunk + ### HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN elif cur_chunk == 2 and last_fadeout_data: # combine the first 2 chunks and strip off silence args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args) @@ -172,6 +179,7 @@ class HTTPStreamer(): bytes_written += len(remaining_bytes) del remaining_bytes prev_chunk = None # needed to prevent this chunk being sent again + ### HANDLE LAST PART OF TRACK elif prev_chunk and is_last_chunk: # last chunk received so create the fadeout_part with the previous chunk and this chunk # and strip off silence @@ -179,26 +187,35 @@ class HTTPStreamer(): process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) last_part, stderr = await process.communicate(prev_chunk + chunk) - if len(last_part) < fade_bytes: - # not enough data for crossfade duration after the strip action... - last_part = prev_chunk + chunk - if len(last_part) < fade_bytes: - # still not enough data so we'll skip the crossfading - LOGGER.warning("not enough data for fadeout so skip crossfade... %s" % len(last_part)) + if not player.queue.crossfade_enabled: + # crossfading is not enabled so just pass the (stripped) audio data sox_proc.stdin.write(last_part) bytes_written += len(last_part) await sox_proc.stdin.drain() del last_part else: - # store fade section to be picked up for next track - last_fadeout_data = last_part[-fade_bytes:] - remaining_bytes = last_part[:-fade_bytes] - # write remaining bytes - sox_proc.stdin.write(remaining_bytes) - bytes_written += len(remaining_bytes) - await sox_proc.stdin.drain() - del last_part - del remaining_bytes + # handle crossfading support + if len(last_part) < fade_bytes: + # not enough data for crossfade duration after the strip action... + last_part = prev_chunk + chunk + if len(last_part) < fade_bytes: + # still not enough data so we'll skip the crossfading + LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part)) + sox_proc.stdin.write(last_part) + bytes_written += len(last_part) + await sox_proc.stdin.drain() + del last_part + else: + # store fade section to be picked up for next track + last_fadeout_data = last_part[-fade_bytes:] + remaining_bytes = last_part[:-fade_bytes] + # write remaining bytes + sox_proc.stdin.write(remaining_bytes) + bytes_written += len(remaining_bytes) + await sox_proc.stdin.drain() + del last_part + del remaining_bytes + ### MIDDLE PARTS OF TRACK else: # middle part of the track # keep previous chunk in memory so we have enough samples to perform the crossfade @@ -223,22 +240,24 @@ class HTTPStreamer(): # WIP: update actual duration to the queue for more accurate now playing info accurate_duration = bytes_written / int(sample_rate * 4 * 2) queue_track.duration = accurate_duration - LOGGER.info("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) + LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration)) # end of queue reached, pass last fadeout bits to final output if last_fadeout_data and not cancelled.is_set(): sox_proc.stdin.write(last_fadeout_data) await sox_proc.stdin.drain() sox_proc.stdin.close() + sox_proc.terminate() await sox_proc.wait() LOGGER.info("streaming of queue for player %s completed" % player.name) async def __get_audio_stream(self, player, queue_item, cancelled, - chunksize=512000, resample=None): + chunksize=128000, resample=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' # get stream details from provider # sort by quality and check track availability - for prov_media in sorted(queue_item.provider_ids, key=operator.itemgetter('quality'), reverse=True): + for prov_media in sorted(queue_item.provider_ids, + key=operator.itemgetter('quality'), reverse=True): streamdetails = asyncio.run_coroutine_threadsafe( self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']), self.mass.event_loop).result() @@ -249,10 +268,9 @@ class HTTPStreamer(): queue_item.quality = prov_media['quality'] break if not streamdetails: - LOGGER.warning("no stream details!") + LOGGER.warning(f"no stream details for {queue_item.name}") yield (True, b'') return - print(streamdetails) # get sox effects and resample options sox_effects = await self.__get_player_sox_options(player, queue_item) outputfmt = 'flac -C 0' @@ -264,14 +282,15 @@ class HTTPStreamer(): # support for AAC created with ffmpeg in between args = 'ffmpeg -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects) elif streamdetails['type'] == 'url': - args = 'sox -V3 -t %s "%s" -t %s - %s' % (streamdetails["content_type"], + args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, sox_effects) elif streamdetails['type'] == 'executable': - args = '%s | sox -V3 -t %s - -t %s - %s' % (streamdetails["path"], + args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, sox_effects) # start sox process process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) + # fire event that streaming has started for this track (needed by some streaming providers) streamdetails["provider"] = queue_item.provider streamdetails["track_id"] = queue_item.item_id @@ -282,49 +301,43 @@ class HTTPStreamer(): # we keep 1 chunk behind to detect end of stream properly prev_chunk = b'' bytes_sent = 0 - first_chunk = False - while not process.stdout.at_eof(): + while not process.stdout.at_eof() and not process.returncode: + if cancelled.is_set(): + try: + process.terminate() + except ProcessLookupError: + pass try: chunk = await process.stdout.readexactly(chunksize) - except asyncio.streams.IncompleteReadError: - chunk = await process.stdout.read(chunksize) - if first_chunk: - print(len(chunk)) - if not chunk: - break + except asyncio.streams.IncompleteReadError as err: + chunk = err.partial if prev_chunk: yield (False, prev_chunk) bytes_sent += len(prev_chunk) prev_chunk = chunk # yield last chunk - if not cancelled.is_set(): - yield (True, prev_chunk) - bytes_sent += len(prev_chunk) + yield (True, prev_chunk) + bytes_sent += len(prev_chunk) await process.wait() if cancelled.is_set(): - try: - process.terminate() - except ProcessLookupError: - pass - LOGGER.debug("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent)) + LOGGER.info("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent)) else: - LOGGER.debug("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent)) + LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent)) # fire event that streaming has ended for this track (needed by some streaming providers) if resample: bytes_per_second = resample * (32/8) * 2 + bytes_per_second = (resample * 32 * 2) / 8 + seconds_streamed = int(bytes_sent/bytes_per_second) else: - bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2 - seconds_streamed = int(bytes_sent/bytes_per_second) + seconds_streamed = queue_item.duration streamdetails["seconds"] = seconds_streamed asyncio.run_coroutine_threadsafe( self.mass.signal_event('streaming_ended', streamdetails), self.mass.event_loop) # send task to background to analyse the audio - # TODO: send audio data completely - if not queue_item.media_type == MediaType.Radio: - asyncio.run_coroutine_threadsafe( - self.__analyze_audio(queue_item.item_id, queue_item.provider), - self.mass.event_loop) + asyncio.run_coroutine_threadsafe( + self.__analyze_audio(queue_item), + self.mass.event_loop) async def __get_player_sox_options(self, player, queue_item): ''' get player specific sox effect options ''' @@ -333,8 +346,9 @@ class HTTPStreamer(): if not player.supports_replay_gain and player.settings['volume_normalisation']: target_gain = int(player.settings['target_volume']) fallback_gain = int(player.settings['fallback_gain_correct']) - track_loudness = await self.mass.db.get_track_loudness( - queue_item.item_id, queue_item.provider) + track_loudness = asyncio.run_coroutine_threadsafe( + self.mass.db.get_track_loudness(queue_item.item_id, queue_item.provider), + self.mass.event_loop).result() if track_loudness == None: gain_correct = fallback_gain else: @@ -358,17 +372,21 @@ class HTTPStreamer(): sox_effects.append(player.settings['sox_effects']) return " ".join(sox_effects) - async def __analyze_audio(self, track_id, provider): + async def __analyze_audio(self, queue_item): ''' analyze track audio, for now we only calculate EBU R128 loudness ''' - track_key = '%s%s' %(track_id, provider) - if track_key in self.analyze_jobs: + if queue_item.media_type != MediaType.Track: + # TODO: calculate loudness average for web radio ? + return + item_key = '%s%s' %(queue_item.item_id, queue_item.provider) + if item_key in self.analyze_jobs: return # prevent multiple analyze jobs for same track - self.analyze_jobs[track_key] = True - streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id) - track_loudness = await self.mass.db.get_track_loudness(track_id, provider) + self.analyze_jobs[item_key] = True + streamdetails = queue_item.stream_details + track_loudness = await self.mass.db.get_track_loudness( + queue_item.item_id, queue_item.provider) if track_loudness == None: # only when needed we do the analyze stuff - LOGGER.debug('Start analyzing track %s' % track_id) + LOGGER.debug('Start analyzing track %s' % item_key) if streamdetails['type'] == 'url': async with aiohttp.ClientSession() as session: async with session.get(streamdetails["path"]) as resp: @@ -384,11 +402,11 @@ class HTTPStreamer(): meter = pyln.Meter(rate) # create BS.1770 meter loudness = meter.integrated_loudness(data) # measure loudness del data - LOGGER.debug("Integrated loudness of track %s is: %s" %(track_id, loudness)) - await self.mass.db.set_track_loudness(track_id, provider, loudness) + LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness)) + await self.mass.db.set_track_loudness(queue_item.item_id, queue_item.provider, loudness) del audio_data - LOGGER.debug('Finished analyzing track %s' % track_id) - self.analyze_jobs.pop(track_key, None) + LOGGER.debug('Finished analyzing track %s' % item_key) + self.analyze_jobs.pop(item_key, None) async def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args, fade_length): ''' crossfade two chunks of audio using sox ''' diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 320fcfd2..3216a18f 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -114,7 +114,7 @@ class Player(): self._player_settings = None # public attributes self.supports_queue = True # has native support for a queue - self.supports_gapless = True # has native gapless support + self.supports_gapless = False # has native gapless support self.supports_crossfade = False # has native crossfading support self.supports_replay_gain = False # has native support for replaygain volume leveling # if home assistant support is enabled, register state listener diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 89fddb11..d2b67567 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -207,7 +207,7 @@ class PlayerQueue(): :param offset: offset from current queue position ''' insert_at_index = self.cur_index + offset - if not self.items or insert_at_index >= len(self.items): + if not self.items or insert_at_index > len(self.items): return await self.load(queue_items) if self.shuffle_enabled: queue_items = await self.__shuffle_items(queue_items) @@ -239,7 +239,7 @@ class PlayerQueue(): if self.items and len(self.items) > self._last_index: queue_index = self._last_index # holds the last starting position queue_track = None - while True: + while len(self.items) > queue_index: queue_track = self.items[queue_index] if cur_time_queue > (queue_track.duration + total_time): total_time += queue_track.duration diff --git a/music_assistant/playerproviders/chromecast.py b/music_assistant/playerproviders/chromecast.py index 14f16578..f301374b 100644 --- a/music_assistant/playerproviders/chromecast.py +++ b/music_assistant/playerproviders/chromecast.py @@ -4,6 +4,7 @@ import asyncio import aiohttp from typing import List +import logging import pychromecast from pychromecast.controllers.multizone import MultizoneController from pychromecast.controllers import BaseController @@ -181,6 +182,7 @@ class ChromecastProvider(PlayerProvider): self.prov_id = 'chromecast' self.name = 'Chromecast' self._discovery_running = False + logging.getLogger('pychromecast').setLevel(logging.WARNING) self.player_config_entries = [("gapless_enabled", False, "gapless_enabled")] self.mass.event_loop.create_task(self.__periodic_chromecast_discovery()) @@ -293,6 +295,10 @@ class ChromecastProvider(PlayerProvider): chromecast.media_controller.register_status_listener(listenerMedia) player = ChromecastPlayer(self.mass, player_id, self.prov_id) player.poll_task = False + self.supports_queue = True + self.supports_gapless = False + self.supports_crossfade = False + self.supports_replay_gain = False if chromecast.cast_type == 'group': player.is_group = True mz = MultizoneController(chromecast.uuid) diff --git a/music_assistant/playerproviders/lms.py b/music_assistant/playerproviders/lms.py deleted file mode 100644 index cc1b55fd..00000000 --- a/music_assistant/playerproviders/lms.py +++ /dev/null @@ -1,316 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding:utf-8 -*- - -import asyncio -import os -from typing import List -import random -import sys -import json -import aiohttp -import time -import datetime -import hashlib -from asyncio_throttle import Throttler -from aiocometd import Client, ConnectionType, Extension -import copy -import urllib - -from ..cache import use_cache -from ..utils import run_periodic, LOGGER, parse_track_title -from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist -from ..constants import CONF_ENABLED, CONF_HOSTNAME, CONF_PORT - - -def setup(mass): - ''' setup the provider''' - enabled = mass.config["playerproviders"]['lms'].get(CONF_ENABLED) - hostname = mass.config["playerproviders"]['lms'].get(CONF_HOSTNAME) - port = mass.config["playerproviders"]['lms'].get(CONF_PORT) - if enabled and hostname and port: - provider = LMSProvider(mass, hostname, port) - return provider - return False - -def config_entries(): - ''' get the config entries for this provider (list with key/value pairs)''' - return [ - (CONF_ENABLED, False, CONF_ENABLED), - (CONF_HOSTNAME, 'localhost', CONF_HOSTNAME), - (CONF_PORT, 9000, CONF_PORT) - ] - -class LMSProvider(PlayerProvider): - ''' support for Logitech Media Server ''' - - def __init__(self, mass, hostname, port): - super().__init__(mass) - self.prov_id = 'lms' - self.name = 'Logitech Media Server' - self._host = hostname - self._port = port - self.last_msg_received = 0 - self.http_session = aiohttp.ClientSession(loop=mass.event_loop) - # we use a combi of active polling and subscriptions because the cometd implementation of LMS is somewhat unreliable - asyncio.ensure_future(self.__lms_events()) - asyncio.ensure_future(self.__get_players()) - - ### Provider specific implementation ##### - - - async def player_command(self, player_id, cmd:str, cmd_args=None): - ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' - lms_commands = [] - if cmd == 'play': - lms_commands = ['play'] - elif cmd == 'pause': - lms_commands = ['pause', '1'] - elif cmd == 'stop': - lms_commands = ['stop'] - elif cmd == 'next': - lms_commands = ['playlist', 'index', '+1'] - elif cmd == 'previous': - lms_commands = ['playlist', 'index', '-1'] - elif cmd == 'stop': - lms_commands = ['playlist', 'stop'] - elif cmd == 'power' and cmd_args == 'off': - lms_commands = ['power', '0'] - elif cmd == 'power': - lms_commands = ['power', '1'] - elif cmd == 'volume': - lms_commands = ['mixer', 'volume', cmd_args] - elif cmd == 'mute' and cmd_args == 'off': - lms_commands = ['mixer', 'muting', '0'] - elif cmd == 'mute': - lms_commands = ['mixer', 'muting', '1'] - return await self.__get_data(lms_commands, player_id=player_id) - - async def play_media(self, player_id, media_items, queue_opt='play'): - ''' - play media on a player - ''' - if queue_opt == 'play': - cmd = ['playlist', 'insert', media_items[0].uri] - await self.__get_data(cmd, player_id=player_id) - cmd = ['playlist', 'index', '+1'] - await self.__get_data(cmd, player_id=player_id) - for track in media_items[1:]: - cmd = ['playlist', 'insert', track.uri] - await self.__get_data(cmd, player_id=player_id) - elif queue_opt == 'replace': - cmd = ['playlist', 'play', media_items[0].uri] - await self.__get_data(cmd, player_id=player_id) - for track in media_items[1:]: - cmd = ['playlist', 'add', track.uri] - await self.__get_data(cmd, player_id=player_id) - elif queue_opt == 'next': - for track in media_items: - cmd = ['playlist', 'insert', track.uri] - await self.__get_data(cmd, player_id=player_id) - else: - for track in media_items: - cmd = ['playlist', 'add', track.uri] - await self.__get_data(cmd, player_id=player_id) - - async def player_queue(self, player_id, offset=0, limit=50): - ''' return the items in the player's queue ''' - items = [] - player_details = await self.__get_data(["status", offset, limit, "tags:aAcCdegGijJKlostuxyRwk"], player_id=player_id) - if 'playlist_loop' in player_details: - for item in player_details['playlist_loop']: - track = await self.__parse_track(item) - items.append(track) - return items - - ### Provider specific (helper) methods ##### - - async def __get_players(self): - ''' update all players, used as fallback if cometd is failing and to detect removed players''' - server_info = await self.__get_data(['players', 0, 1000]) - player_ids = await self.__process_serverstatus(server_info) - for player_id in player_ids: - player_details = await self.__get_data(["status", "-","1", "tags:aAcCdegGijJKlostuxyRwk"], player_id=player_id) - await self.__process_player_details(player_id, player_details) - - async def __process_player_details(self, player_id, player_details): - ''' get state of a given player ''' - if player_id not in self._players: - return - player = self._players[player_id] - volume = player_details.get('mixer volume',0) - player.muted = volume < 0 - if volume >= 0: - player.volume_level = player_details.get('mixer volume',0) - player.shuffle_enabled = player_details.get('playlist shuffle',0) != 0 - player.repeat_enabled = player_details.get('playlist repeat',0) != 0 - # player state - if 'power' in player_details: - player.powered = player_details['power'] == 1 - else: - print(player_details) # DEBUG - if player_details['mode'] == 'play': - player.state = PlayerState.Playing - elif player_details['mode'] == 'pause': - player.state = PlayerState.Paused - else: - player.state = PlayerState.Stopped - # current track - if player_details.get('playlist_loop'): - player.cur_item = await self.__parse_track(player_details['playlist_loop'][0]) - player.cur_time = player_details.get('time',0) - else: - player.cur_item = None - player.cur_time = 0 - await self.mass.player.update_player(player) - - async def __process_serverstatus(self, server_status): - ''' process players from server state msg (players_loop) ''' - cur_player_ids = [] - for lms_player in server_status['players_loop']: - if lms_player['isplayer'] != 1: - continue - player_id = lms_player['playerid'] - cur_player_ids.append(player_id) - if not player_id in self._players: - # new player - self._players[player_id] = MusicPlayer() - player = self._players[player_id] - player.player_id = player_id - player.player_provider = self.prov_id - else: - # existing player - player = self._players[player_id] - # always update player details that may change - player.name = lms_player['name'] - if lms_player['model'] == "group": - player.is_group = True - # player is a groupplayer, retrieve childs - group_player_child_ids = await self.__get_group_childs(player_id) - for child_player_id in group_player_child_ids: - if child_player_id in self._players: - self._players[child_player_id].group_parent = player_id - elif player.group_parent: - # check if player parent is still correct - group_player_child_ids = await self.__get_group_childs(player.group_parent) - if not player_id in group_player_child_ids: - player.group_parent = None - # process update - await self.mass.player.update_player(player) - # process removed players... - for player_id, player in self._players.items(): - if player_id not in cur_player_ids: - await self.mass.player.remove_player(player_id) - return cur_player_ids - - async def __parse_track(self, track_details): - ''' parse track in LMS to our internal format ''' - track_url = track_details.get('url','') - if track_url.startswith('qobuz://') and 'qobuz' in self.mass.music.providers: - # qobuz track! - try: - track_id = track_url.replace('qobuz://','').replace('.flac','') - return await self.mass.music.providers['qobuz'].track(track_id) - except Exception as exc: - LOGGER.error(exc) - elif track_url.startswith('spotify://track:') and 'spotify' in self.mass.music.providers: - # spotify track! - try: - track_id = track_url.replace('spotify://track:','') - return await self.mass.music.providers['spotify'].track(track_id) - except Exception as exc: - LOGGER.error(exc) - elif track_url.startswith('http') and '/stream' in track_url: - params = urllib.parse.parse_qs(track_url.split('?')[1]) - track_id = params['track_id'][0] - provider = params['provider'][0] - return await self.mass.music.providers[provider].track(track_id) - # fallback to a generic track - track = Track() - track.name = track_details['title'] - track.duration = int(track_details['duration']) - if 'artwork_url' in track_details: - image = "http://%s:%s%s" % (self._host, self._port, track_details['artwork_url']) - track.metadata['image'] = image - return track - - async def __get_group_childs(self, group_player_id): - ''' get child players for groupplayer ''' - group_childs = [] - result = await self.__get_data('playergroup', player_id=group_player_id) - if result and 'players_loop' in result: - group_childs = [item['id'] for item in result['players_loop']] - return group_childs - - async def __lms_events(self): - # Receive events from LMS through CometD socket - while self.mass.event_loop.is_running(): - try: - last_msg_received = 0 - async with Client("http://%s:%s/cometd" % (self._host, self._port), - connection_types=ConnectionType.LONG_POLLING, - extensions=[LMSExtension()]) as client: - # subscribe - watched_players = [] - await client.subscribe("/slim/subscribe/serverstatus") - - # listen for incoming messages - async for message in client: - last_msg_received = int(time.time()) - if 'playerstatus' in message['channel']: - # player state - player_id = message['channel'].split('playerstatus/')[1] - asyncio.ensure_future(self.__process_player_details(player_id, message['data'])) - elif '/slim/serverstatus' in message['channel']: - # server state with all players - player_ids = await self.__process_serverstatus(message['data']) - for player_id in player_ids: - if player_id not in watched_players: - # subscribe to player change events - watched_players.append(player_id) - await client.subscribe("/slim/subscribe/playerstatus/%s" % player_id) - except Exception as exc: - LOGGER.exception(exc) - - async def __get_data(self, cmds:List, player_id=''): - ''' get data from api''' - if not isinstance(cmds, list): - cmds = [cmds] - cmd = [player_id, cmds] - url = "http://%s:%s/jsonrpc.js" % (self._host, self._port) - params = {"id": 1, "method": "slim.request", "params": cmd} - try: - async with self.http_session.post(url, json=params) as response: - result = await response.json() - return result['result'] - except Exception as exc: - LOGGER.exception('Error executing LMS command %s' % params) - return None - - -class LMSExtension(Extension): - ''' Extension for the custom cometd implementation of LMS''' - - async def incoming(self, payload, headers=None): - pass - - async def outgoing(self, payload, headers): - ''' override outgoing messages to fit LMS custom implementation''' - - # LMS does not need/want id for the connect and handshake message - if payload[0]['channel'] == '/meta/handshake' or payload[0]['channel'] == '/meta/connect': - del payload[0]['id'] - - # handle subscriptions - if 'subscribe' in payload[0]['channel']: - client_id = payload[0]['clientId'] - if payload[0]['subscription'] == '/slim/subscribe/serverstatus': - # append additional request data to the request - payload[0]['data'] = {'response':'/%s/slim/serverstatus' % client_id, - 'request':['', ['serverstatus', 0, 100, 'subscribe:60']]} - payload[0]['channel'] = '/slim/subscribe' - if payload[0]['subscription'].startswith('/slim/subscribe/playerstatus'): - # append additional request data to the request - player_id = payload[0]['subscription'].split('/')[-1] - payload[0]['data'] = {'response':'/%s/slim/playerstatus/%s' % (client_id, player_id), - 'request':[player_id, ["status", "-", 1, "tags:aAcCdegGijJKlostuxyRwk", "subscribe:60"]]} - payload[0]['channel'] = '/slim/subscribe' \ No newline at end of file diff --git a/music_assistant/playerproviders/pylms.py b/music_assistant/playerproviders/pylms.py deleted file mode 100644 index 99cfb590..00000000 --- a/music_assistant/playerproviders/pylms.py +++ /dev/null @@ -1,799 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding:utf-8 -*- - -import asyncio -import os -import struct -from collections import OrderedDict -import time -import decimal -from typing import List -import random -import sys -import socket -from ..utils import run_periodic, LOGGER, parse_track_title, try_parse_int, get_ip, get_hostname -from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist -from ..constants import CONF_ENABLED - - -def setup(mass): - ''' setup the provider''' - enabled = mass.config["playerproviders"]['pylms'].get(CONF_ENABLED) - if enabled: - provider = PyLMSServer(mass) - return provider - return False - -def config_entries(): - ''' get the config entries for this provider (list with key/value pairs)''' - return [ - (CONF_ENABLED, True, CONF_ENABLED) - ] - - -class PyLMSServer(PlayerProvider): - ''' Python implementation of SlimProto server ''' - - def __init__(self, mass): - super().__init__(mass) - self.prov_id = 'pylms' - self.name = 'Logitech Media Server Emulation' - self._lmsplayers = {} - self.buffer = b'' - self.last_msg_received = 0 - - # start slimproto server - mass.event_loop.create_task(asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483)) - # setup discovery - mass.event_loop.create_task(self.start_discovery()) - - ### Provider specific implementation ##### - - async def start_discovery(self): - transport, protocol = await self.mass.event_loop.create_datagram_endpoint( - lambda: DiscoveryProtocol(self.mass.web._http_port), - local_addr=('0.0.0.0', 3483)) - try: - while True: - await asyncio.sleep(60) # serve forever - finally: - transport.close() - - async def player_command(self, player_id, cmd:str, cmd_args=None): - ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' - if cmd == 'play': - if self._players[player_id].state == PlayerState.Stopped: - await self.__queue_play(player_id, None) - else: - self._lmsplayers[player_id].unpause() - elif cmd == 'pause': - self._lmsplayers[player_id].pause() - elif cmd == 'stop': - self._lmsplayers[player_id].stop() - elif cmd == 'next': - self._lmsplayers[player_id].next() - elif cmd == 'previous': - await self.__queue_previous(player_id) - elif cmd == 'power' and cmd_args == 'off': - self._lmsplayers[player_id].power_off() - elif cmd == 'power': - self._lmsplayers[player_id].power_on() - elif cmd == 'volume': - self._lmsplayers[player_id].volume_set(try_parse_int(cmd_args)) - elif cmd == 'mute' and cmd_args == 'off': - self._lmsplayers[player_id].unmute() - elif cmd == 'mute': - self._lmsplayers[player_id].mute() - - async def play_media(self, player_id, media_items, queue_opt='play'): - ''' - play media on a player - ''' - player = await self.get_player(player_id) - cur_index = player.cur_queue_index - - if queue_opt == 'replace' or not player.queue: - # overwrite queue with new items - player.queue = media_items - await self.__queue_play(player_id, 0, send_flush=True) - elif queue_opt == 'play': - # replace current item with new item(s) - player.queue = player.queue[player_id][:cur_index] + media_items + player.queue[player_id][cur_index+1:] - await self.__queue_play(player_id, cur_index, send_flush=True) - elif queue_opt == 'next': - # insert new items at current index +1 - player.queue[player_id] = player.queue[player_id][:cur_index+1] + media_items + player.queue[player_id][cur_index+1:] - elif queue_opt == 'add': - # add new items at end of queue - player.queue[player_id] = player.queue[player_id] + media_items - - ### Provider specific (helper) methods ##### - - async def __queue_play(self, player_id, index, send_flush=False): - ''' send play command to player ''' - if not player_id in player.queue or not player_id in player.queue_index: - return - if not player.queue[player_id]: - return - if index == None: - index = player.queue_index[player_id] - if len(player.queue[player_id]) >= index: - track = player.queue[player_id][index] - if send_flush: - self._lmsplayers[player_id].flush() - self._lmsplayers[player_id].play(track.uri) - player.queue_index[player_id] = index - - async def __queue_next(self, player_id): - ''' request next track from queue ''' - if not player_id in player.queue or not player_id in player.queue: - return - cur_queue_index = player.queue_index[player_id] - if len(player.queue[player_id]) > cur_queue_index: - new_queue_index = cur_queue_index + 1 - elif self._players[player_id].repeat_enabled: - new_queue_index = 0 - else: - LOGGER.warning("next track requested but no more tracks in queue") - return - return await self.__queue_play(player_id, new_queue_index) - - async def __queue_previous(self, player_id): - ''' request previous track from queue ''' - if not player_id in player.queue: - return - cur_queue_index = player.queue_index[player_id] - if cur_queue_index == 0 and len(player.queue[player_id]) > 1: - new_queue_index = len(player.queue[player_id]) -1 - elif cur_queue_index == 0: - new_queue_index = cur_queue_index - else: - new_queue_index -= 1 - player.queue_index[player_id] = new_queue_index - return await self.__queue_play(player_id, new_queue_index) - - async def __handle_player_event(self, player_id, event, event_data=None): - ''' handle event from player ''' - if not player_id: - return - LOGGER.debug("Event from player %s: %s - event_data: %s" %(player_id, event, str(event_data))) - lms_player = self._lmsplayers[player_id] - if event == "next_track": - return await self.__queue_next(player_id) - player - if not player_id in self._players: - player = MusicPlayer() - player.player_id = player_id - player.player_provider = self.prov_id - self._players[player_id] = player - if not player_id in player.queue: - player.queue[player_id] = [] - if not player_id in player.queue_index: - player.queue_index[player_id] = 0 - else: - player = self._players[player_id] - # update player properties - player.name = lms_player.player_name - player.volume_level = lms_player.volume_level - player.cur_time = lms_player._elapsed_seconds - if event == "disconnected": - return await self.mass.player.remove_player(player_id) - elif event == "power": - player.powered = event_data - elif event == "state": - player.state = event_data - if player.queue[player_id]: - cur_queue_index = player.queue_index[player_id] - player.cur_item = player.queue[player_id][cur_queue_index] - # update player details - await self.mass.player.update_player(player) - - async def __handle_socket_client(self, reader, writer): - ''' handle a client connection on the socket''' - LOGGER.debug("new socket client connected") - stream_host = get_ip() - stream_port = self.mass.config['base']['web']['http_port'] - lms_player = PyLMSPlayer(stream_host, stream_port) - - def send_frame(command, data): - ''' send command to lms player''' - packet = struct.pack('!H', len(data) + 4) + command + data - writer.write(packet) - - def handle_event(event, event_data=None): - ''' handle events from player''' - if event == "connected": - self._lmsplayers[lms_player.player_id] = lms_player - lms_player.player_settings = self.mass.config['player_settings'][lms_player.player_id] - asyncio.create_task(self.__handle_player_event(lms_player.player_id, event, event_data)) - - try: - @run_periodic(5) - async def send_heartbeat(): - timestamp = int(time.time()) - data = lms_player.pack_stream(b"t", replayGain=timestamp, flags=0) - lms_player.send_frame(b"strm", data) - - lms_player.send_frame = send_frame - lms_player.send_event = handle_event - heartbeat_task = asyncio.create_task(send_heartbeat()) - - # keep reading bytes from the socket - while True: - data = await reader.read(64) - if data: - lms_player.dataReceived(data) - else: - break - except Exception as exc: - # connection lost ? - LOGGER.warning(exc) - # disconnect - heartbeat_task.cancel() - asyncio.create_task(self.__handle_player_event(lms_player.player_id, 'disconnected')) - - -class PyLMSPlayer(object): - ''' very basic Python implementation of SlimProto ''' - - def __init__(self, stream_host, stream_port): - self.buffer = b'' - #self.display = Display() - self.send_frame = None - self.send_event = None - self.stream_host = stream_host - self.stream_port = stream_port - self.player_settings = {} - self.playback_millis = 0 - self._volume = PyLMSVolume() - self._device_type = None - self._mac_address = None - self._player_name = None - self._last_volume = 0 - self._last_heartbeat = 0 - self._elapsed_seconds = 0 - self._elapsed_milliseconds = 0 - - @property - def player_name(self): - if self._player_name: - return self._player_name - return "%s - %s" %(self._device_type, self._mac_address) - - @property - def player_id(self): - return self._mac_address - - @property - def volume_level(self): - return self._volume.volume - - def dataReceived(self, data): - self.buffer = self.buffer + data - if len(self.buffer) > 8: - operation, length = self.buffer[:4], self.buffer[4:8] - length = struct.unpack('!I', length)[0] - plen = length + 8 - if len(self.buffer) >= plen: - packet, self.buffer = self.buffer[8:plen], self.buffer[plen:] - operation = operation.strip(b"!").strip().decode() - #LOGGER.info("operation: %s" % operation) - handler = getattr(self, "process_%s" % operation, None) - if handler is None: - raise NotImplementedError - handler(packet) - - def send_version(self): - self.send_frame(b'vers', b'7.8') - - def pack_stream(self, command, autostart=b"1", formatbyte = b'o', pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200, - spdif = b'0', transDuration = 0, transType = b'0', flags = 0x40, outputThreshold = 0, - replayGain=0, serverPort = 8095, serverIp = 0): - return struct.pack("!cccccccBcBcBBBLHL", - command, autostart, formatbyte, *pcmargs, - threshold, spdif, transDuration, transType, - flags, outputThreshold, 0, replayGain, serverPort, serverIp) - - def stop(self): - data = self.pack_stream(b"q", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - - def flush(self): - data = self.pack_stream(b"f", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - - def pause(self): - data = self.pack_stream(b"p", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - LOGGER.info("Sending pause request") - - def unpause(self): - data = self.pack_stream(b"u", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - LOGGER.info("Sending unpause request") - - def next(self): - data = self.pack_stream(b"f", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - self.send_event("next_track") - - def previous(self): - data = self.pack_stream(b"f", autostart=b"0", flags=0) - self.send_frame(b"strm", data) - self.send_event("previous_track") - - def power_on(self): - self.send_frame(b"aude", struct.pack("2B", 1, 1)) - self.send_event("power", True) - - def power_off(self): - self.stop() - self.send_frame(b"aude", struct.pack("2B", 0, 0)) - self.send_event("power", False) - - def mute_on(self): - self.send_frame(b"aude", struct.pack("2B", 0, 0)) - self.send_event("mute", True) - - def mute_off(self): - self.send_frame(b"aude", struct.pack("2B", 1, 1)) - self.send_event("mute", False) - - def volume_up(self): - self._volume.increment() - self.send_volume() - - def volume_down(self): - self._volume.decrement() - self.send_volume() - - def volume_set(self, new_vol): - self._volume.volume = new_vol - self.send_volume() - - def play(self, uri): - enable_crossfade = self.player_settings["crossfade_duration"] > 0 - command = b's' - autostart = b'3' # we use direct stream for now so let the player do the messy work with buffers - transType= b'1' if enable_crossfade else b'0' - transDuration = self.player_settings["crossfade_duration"] - formatbyte = b'f' # fixed to flac - uri = '/stream' + uri.split('/stream')[1] - data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration) - headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.stream_host, self.stream_port) - request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers) - data = data + request.encode("utf-8") - self.send_frame(b'strm', data) - LOGGER.info("Requesting play from squeezebox" ) - - def displayTrack(self, track): - self.render("%s by %s" % (track.title, track.artist)) - - def process_HELO(self, data): - (devId, rev, mac) = struct.unpack('BB6s', data[:8]) - device_mac = ':'.join("%02x" % x for x in mac) - self._device_type = devices.get(devId, 'unknown device') - self._mac_address = str(device_mac).lower() - LOGGER.debug("HELO received from %s %s" % (self._mac_address, self._device_type)) - self.init_client() - - def init_client(self): - ''' initialize a new connected client ''' - self.send_event("connected") - self.send_version() - self.stop() - self.setBrightness() - #self.set_visualisation(SpectrumAnalyser()) - self.send_frame(b"setd", struct.pack("B", 0)) - self.send_frame(b"setd", struct.pack("B", 4)) - self.power_on() - self.volume_set(40) # TODO: remember last volume - - def send_volume(self): - og = self._volume.old_gain() - ng = self._volume.new_gain() - LOGGER.info("Volume set to %d (%d/%d)" % (self._volume.volume, og, ng)) - d = self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng)) - self.send_event("volume", self._volume.volume) - - def setBrightness(self, level=4): - assert 0 <= level <= 4 - self.send_frame(b"grfb", struct.pack("!H", level)) - - def set_visualisation(self, visualisation): - self.send_frame(b"visu", visualisation.pack()) - - def render(self, text): - #self.display.clear() - #self.display.renderText(text, "DejaVu-Sans", 16, (0,0)) - #self.updateDisplay(self.display.frame()) - pass - - def updateDisplay(self, bitmap, transition = 'c', offset=0, param=0): - frame = struct.pack("!Hcb", offset, transition, param) + bitmap - self.send_frame(b"grfe", frame) - - def process_STAT(self, data): - ev = data[:4] - if ev == b'\x00\x00\x00\x00': - LOGGER.info("Presumed informational stat message") - else: - handler = getattr(self, 'stat_%s' % ev.decode(), None) - if handler is None: - raise NotImplementedError("Stat message %r not known" % ev) - handler(data[4:]) - - def stat_aude(self, data): - (spdif_enable, dac_enable) = struct.unpack("2B", data[:4]) - powered = spdif_enable or dac_enable - self.send_event("power", powered) - LOGGER.debug("ACK aude - Received player power: %s" % powered) - - def stat_audg(self, data): - LOGGER.info("Received volume_level from player %s" % data) - self.send_event("volume", self._volume.volume) - - def stat_strm(self, data): - LOGGER.debug("ACK strm") - #self.send_frame(b"cont", b"0") - - def stat_STMc(self, data): - LOGGER.debug("Status Message: Connect") - - def stat_STMd(self, data): - LOGGER.debug("Decoder Ready for next track") - self.send_event("next_track") - - def stat_STMe(self, data): - LOGGER.info("Connection established") - - def stat_STMf(self, data): - LOGGER.info("Status Message: Connection closed") - self.send_event("state", PlayerState.Stopped) - - def stat_STMh(self, data): - LOGGER.info("Status Message: End of headers") - - def stat_STMn(self, data): - LOGGER.error("Decoder does not support file format") - - def stat_STMo(self, data): - ''' No more decoded (uncompressed) data to play; triggers rebuffering. ''' - LOGGER.debug("Output Underrun") - - def stat_STMp(self, data): - '''Pause confirmed''' - self.send_event("state", PlayerState.Paused) - - def stat_STMr(self, data): - '''Resume confirmed''' - self.send_event("state", PlayerState.Playing) - - def stat_STMs(self, data): - '''Playback of new track has started''' - self.send_event("state", PlayerState.Playing) - - def stat_STMt(self, data): - """ heartbeat from client """ - timestamp = time.time() - self._last_heartbeat = timestamp - (num_crlf, mas_initialized, mas_mode, rptr, wptr, - bytes_received_h, bytes_received_l, signal_strength, - jiffies, output_buffer_size, output_buffer_fullness, - elapsed_seconds, voltage, elapsed_milliseconds, - server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data) - if elapsed_seconds != self._elapsed_seconds: - self.send_event("progress") - self._elapsed_seconds = elapsed_seconds - self._elapsed_milliseconds = elapsed_milliseconds - - def stat_STMu(self, data): - '''Normal end of playback''' - LOGGER.info("End of playback - Underrun") - self.send_event("state", PlayerState.Stopped) - - def process_BYE(self, data): - LOGGER.info("BYE received") - self.send_event("disconnected") - - def process_RESP(self, data): - LOGGER.info("RESP received") - self.send_frame(b"cont", b"0") - - def process_BODY(self, data): - LOGGER.info("BODY received") - - def process_META(self, data): - LOGGER.info("META received") - - def process_DSCO(self, data): - LOGGER.info("Data Stream Disconnected") - - def process_DBUG(self, data): - LOGGER.info("DBUG received") - - def process_IR(self, data): - """ Slightly involved codepath here. This raises an event, which may - be picked up by the service and then the process_remote_* function in - this player will be called. This is mostly relevant for volume changes - - most other button presses will require some context to operate. """ - (time, code) = struct.unpack("!IxxI", data) - LOGGER.info("IR code %s" % code) - # command = Remote.codes.get(code, None) - # if command is not None: - # LOGGER.info("IR received: %r, %r" % (code, command)) - # #self.service.evreactor.fireEvent(RemoteButtonPressed(self, command)) - # else: - # LOGGER.info("Unknown IR received: %r, %r" % (time, code)) - - def process_RAWI(self, data): - LOGGER.info("RAWI received") - - def process_ANIC(self, data): - LOGGER.info("ANIC received") - - def process_BUTN(self, data): - LOGGER.info("BUTN received") - - def process_KNOB(self, data): - ''' Transporter only, knob-related ''' - LOGGER.info("KNOB received") - - def process_SETD(self, data): - ''' Get/set player firmware settings ''' - LOGGER.debug("SETD received %s" % data) - cmd_id = data[0] - if cmd_id == 0: - # received player name - data = data[1:].decode() - self._player_name = data - self.send_event("name") - - def process_UREQ(self, data): - LOGGER.info("UREQ received") - - - -# from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO -devices = { - 2: 'squeezebox', - 3: 'softsqueeze', - 4: 'squeezebox2', - 5: 'transporter', - 6: 'softsqueeze3', - 7: 'receiver', - 8: 'squeezeslave', - 9: 'controller', - 10: 'boom', - 11: 'softboom', - 12: 'squeezeplay', - } - - -class PyLMSVolume(object): - - """ Represents a sound volume. This is an awful lot more complex than it - sounds. """ - - minimum = 0 - maximum = 100 - step = 1 - - # this map is taken from Slim::Player::Squeezebox2 in the squeezecenter source - # i don't know how much magic it contains, or any way I can test it - old_map = [ - 0, 1, 1, 1, 2, 2, 2, 3, 3, 4, - 5, 5, 6, 6, 7, 8, 9, 9, 10, 11, - 12, 13, 14, 15, 16, 16, 17, 18, 19, 20, - 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, - 33, 34, 35, 37, 38, 39, 40, 42, 43, 44, - 46, 47, 48, 50, 51, 53, 54, 56, 57, 59, - 60, 61, 63, 65, 66, 68, 69, 71, 72, 74, - 75, 77, 79, 80, 82, 84, 85, 87, 89, 90, - 92, 94, 96, 97, 99, 101, 103, 104, 106, 108, 110, - 112, 113, 115, 117, 119, 121, 123, 125, 127, 128 - ]; - - # new gain parameters, from the same place - total_volume_range = -50 # dB - step_point = -1 # Number of steps, up from the bottom, where a 2nd volume ramp kicks in. - step_fraction = 1 # fraction of totalVolumeRange where alternate volume ramp kicks in. - - def __init__(self): - self.volume = 50 - - def increment(self): - """ Increment the volume """ - self.volume += self.step - if self.volume > self.maximum: - self.volume = self.maximum - - def decrement(self): - """ Decrement the volume """ - self.volume -= self.step - if self.volume < self.minimum: - self.volume = self.minimum - - def old_gain(self): - """ Return the "Old" gain value as required by the squeezebox """ - return self.old_map[self.volume] - - def decibels(self): - """ Return the "new" gain value. """ - - step_db = self.total_volume_range * self.step_fraction - max_volume_db = 0 # different on the boom? - - # Equation for a line: - # y = mx+b - # y1 = mx1+b, y2 = mx2+b. - # y2-y1 = m(x2 - x1) - # y2 = m(x2 - x1) + y1 - slope_high = max_volume_db - step_db / (100.0 - self.step_point) - slope_low = step_db - self.total_volume_range / (self.step_point - 0.0) - x2 = self.volume - if (x2 > self.step_point): - m = slope_high - x1 = 100 - y1 = max_volume_db - else: - m = slope_low - x1 = 0 - y1 = self.total_volume_range - return m * (x2 - x1) + y1 - - def new_gain(self): - db = self.decibels() - floatmult = 10 ** (db/20.0) - # avoid rounding errors somehow - if -30 <= db <= 0: - return int(floatmult * (1 << 8) + 0.5) * (1<<8) - else: - return int((floatmult * (1<<16)) + 0.5) - - -##### UDP DISCOVERY STUFF ############# - -class Datagram(object): - - @classmethod - def decode(self, data): - if data[0] == 'e': - return TLVDiscoveryRequestDatagram(data) - elif data[0] == 'E': - return TLVDiscoveryResponseDatagram(data) - elif data[0] == 'd': - return ClientDiscoveryDatagram(data) - elif data[0] == 'h': - pass # Hello! - elif data[0] == 'i': - pass # IR - elif data[0] == '2': - pass # i2c? - elif data[0] == 'a': - pass # ack! - -class ClientDiscoveryDatagram(Datagram): - - device = None - firmware = None - client = None - - def __init__(self, data): - s = struct.unpack('!cxBB8x6B', data.encode()) - assert s[0] == 'd' - self.device = s[1] - self.firmware = hex(s[2]) - self.client = ":".join(["%02x" % (x,) for x in s[3:]]) - - def __repr__(self): - return "<%s device=%r firmware=%r client=%r>" % (self.__class__.__name__, self.device, self.firmware, self.client) - -class DiscoveryResponseDatagram(Datagram): - - def __init__(self, hostname, port): - hostname = hostname[:16].encode("UTF-8") - hostname += (16 - len(hostname)) * '\x00' - self.packet = struct.pack('!c16s', 'D', hostname).decode() - -class TLVDiscoveryRequestDatagram(Datagram): - - def __init__(self, data): - requestdata = OrderedDict() - assert data[0] == 'e' - idx = 1 - length = len(data)-5 - while idx <= length: - typ, l = struct.unpack_from("4sB", data.encode(), idx) - if l: - val = data[idx+5:idx+5+l] - idx += 5+l - else: - val = None - idx += 5 - typ = typ.decode() - requestdata[typ] = val - self.data = requestdata - - def __repr__(self): - return "<%s data=%r>" % (self.__class__.__name__, self.data.items()) - -class TLVDiscoveryResponseDatagram(Datagram): - - def __init__(self, responsedata): - parts = ['E'] # new discovery format - for typ, value in responsedata.items(): - if value is None: - value = '' - elif len(value) > 255: - LOGGER.warning("Response %s too long, truncating to 255 bytes" % typ) - value = value[:255] - parts.extend((typ, chr(len(value)), value)) - self.packet = ''.join(parts) - -class DiscoveryProtocol(): - - def __init__(self, web_port): - self.web_port = web_port - - def connection_made(self, transport): - self.transport = transport - # Allow receiving multicast broadcasts - sock = self.transport.get_extra_info('socket') - group = socket.inet_aton('239.255.255.250') - mreq = struct.pack('4sL', group, socket.INADDR_ANY) - sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) - - def build_TLV_response(self, requestdata): - responsedata = OrderedDict() - for typ, value in requestdata.items(): - if typ == 'NAME': - # send full host name - no truncation - value = get_hostname() - elif typ == 'IPAD': - # send ipaddress as a string only if it is set - value = get_ip() - # :todo: IPv6 - if value == '0.0.0.0': - # do not send back an ip address - typ = None - elif typ == 'JSON': - # send port as a string - json_port = self.web_port - value = str(json_port) - elif typ == 'VERS': - # send server version - value = '7.9' - elif typ == 'UUID': - # send server uuid - value = 'musicassistant' - else: - LOGGER.debug('Unexpected information request: %r', typ) - typ = None - if typ: - responsedata[typ] = value - return responsedata - - def datagram_received(self, data, addr): - try: - data = data.decode() - dgram = Datagram.decode(data) - LOGGER.debug("Data received from %s: %s" % (addr, dgram)) - if isinstance(dgram, ClientDiscoveryDatagram): - self.sendDiscoveryResponse(addr) - elif isinstance(dgram, TLVDiscoveryRequestDatagram): - resonsedata = self.build_TLV_response(dgram.data) - self.sendTLVDiscoveryResponse(resonsedata, addr) - except Exception as exc: - LOGGER.exception(exc) - - def sendDiscoveryResponse(self, addr): - dgram = DiscoveryResponseDatagram(get_hostname(), 3483) - LOGGER.debug("Sending discovery response %r" % (dgram.packet,)) - self.transport.sendto(dgram.packet.encode(), addr) - - def sendTLVDiscoveryResponse(self, resonsedata, addr): - dgram = TLVDiscoveryResponseDatagram(resonsedata) - LOGGER.debug("Sending discovery response %r" % (dgram.packet,)) - self.transport.sendto(dgram.packet.encode(), addr) - diff --git a/music_assistant/playerproviders/squeezebox.py b/music_assistant/playerproviders/squeezebox.py new file mode 100644 index 00000000..bbe93941 --- /dev/null +++ b/music_assistant/playerproviders/squeezebox.py @@ -0,0 +1,797 @@ +#!/usr/bin/env python3 +# -*- coding:utf-8 -*- + +import asyncio +import os +import struct +from collections import OrderedDict +import time +import decimal +from typing import List +import random +import sys +import socket +from ..utils import run_periodic, LOGGER, parse_track_title, try_parse_int, get_ip, get_hostname +from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist +from ..constants import CONF_ENABLED + + +def setup(mass): + ''' setup the provider''' + enabled = mass.config["playerproviders"]['squeezebox'].get(CONF_ENABLED) + if enabled: + provider = PySqueezeServer(mass) + return provider + return False + +def config_entries(): + ''' get the config entries for this provider (list with key/value pairs)''' + return [ + (CONF_ENABLED, True, CONF_ENABLED) + ] + + +class PySqueezeServer(PlayerProvider): + ''' Python implementation of SlimProto server ''' + + def __init__(self, mass): + super().__init__(mass) + self.prov_id = 'squeezebox' + self.name = 'Squeezebox' + self._squeeze_players = {} + self.buffer = b'' + self.last_msg_received = 0 + + # start slimproto server + mass.event_loop.create_task(asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483)) + # setup discovery + mass.event_loop.create_task(self.start_discovery()) + + ### Provider specific implementation ##### + + async def start_discovery(self): + transport, protocol = await self.mass.event_loop.create_datagram_endpoint( + lambda: DiscoveryProtocol(self.mass.web._http_port), + local_addr=('0.0.0.0', 3483)) + try: + while True: + await asyncio.sleep(60) # serve forever + finally: + transport.close() + + async def player_command(self, player_id, cmd:str, cmd_args=None): + ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' + if cmd == 'play': + if self._players[player_id].state == PlayerState.Stopped: + await self.__queue_play(player_id, None) + else: + self._squeeze_players[player_id].unpause() + elif cmd == 'pause': + self._squeeze_players[player_id].pause() + elif cmd == 'stop': + self._squeeze_players[player_id].stop() + elif cmd == 'next': + self._squeeze_players[player_id].next() + elif cmd == 'previous': + await self.__queue_previous(player_id) + elif cmd == 'power' and cmd_args == 'off': + self._squeeze_players[player_id].power_off() + elif cmd == 'power': + self._squeeze_players[player_id].power_on() + elif cmd == 'volume': + self._squeeze_players[player_id].volume_set(try_parse_int(cmd_args)) + elif cmd == 'mute' and cmd_args == 'off': + self._squeeze_players[player_id].unmute() + elif cmd == 'mute': + self._squeeze_players[player_id].mute() + + async def play_media(self, player_id, media_items, queue_opt='play'): + ''' + play media on a player + ''' + player = await self.get_player(player_id) + cur_index = player.cur_queue_index + + if queue_opt == 'replace' or not player.queue: + # overwrite queue with new items + player.queue = media_items + await self.__queue_play(player_id, 0, send_flush=True) + elif queue_opt == 'play': + # replace current item with new item(s) + player.queue = player.queue[player_id][:cur_index] + media_items + player.queue[player_id][cur_index+1:] + await self.__queue_play(player_id, cur_index, send_flush=True) + elif queue_opt == 'next': + # insert new items at current index +1 + player.queue[player_id] = player.queue[player_id][:cur_index+1] + media_items + player.queue[player_id][cur_index+1:] + elif queue_opt == 'add': + # add new items at end of queue + player.queue[player_id] = player.queue[player_id] + media_items + + ### Provider specific (helper) methods ##### + + async def __queue_play(self, player_id, index, send_flush=False): + ''' send play command to player ''' + if not player_id in player.queue or not player_id in player.queue_index: + return + if not player.queue[player_id]: + return + if index == None: + index = player.queue_index[player_id] + if len(player.queue[player_id]) >= index: + track = player.queue[player_id][index] + if send_flush: + self._squeeze_players[player_id].flush() + self._squeeze_players[player_id].play(track.uri) + player.queue_index[player_id] = index + + async def __queue_next(self, player_id): + ''' request next track from queue ''' + if not player_id in player.queue or not player_id in player.queue: + return + cur_queue_index = player.queue_index[player_id] + if len(player.queue[player_id]) > cur_queue_index: + new_queue_index = cur_queue_index + 1 + elif self._players[player_id].repeat_enabled: + new_queue_index = 0 + else: + LOGGER.warning("next track requested but no more tracks in queue") + return + return await self.__queue_play(player_id, new_queue_index) + + async def __queue_previous(self, player_id): + ''' request previous track from queue ''' + if not player_id in player.queue: + return + cur_queue_index = player.queue_index[player_id] + if cur_queue_index == 0 and len(player.queue[player_id]) > 1: + new_queue_index = len(player.queue[player_id]) -1 + elif cur_queue_index == 0: + new_queue_index = cur_queue_index + else: + new_queue_index -= 1 + player.queue_index[player_id] = new_queue_index + return await self.__queue_play(player_id, new_queue_index) + + async def __handle_player_event(self, player_id, event, event_data=None): + ''' handle event from player ''' + if not player_id: + return + LOGGER.debug("Event from player %s: %s - event_data: %s" %(player_id, event, str(event_data))) + Squeeze_player = self._squeeze_players[player_id] + if event == "next_track": + return await self.__queue_next(player_id) + player + if not player_id in self._players: + player = MusicPlayer() + player.player_id = player_id + player.player_provider = self.prov_id + self._players[player_id] = player + if not player_id in player.queue: + player.queue[player_id] = [] + if not player_id in player.queue_index: + player.queue_index[player_id] = 0 + else: + player = self._players[player_id] + # update player properties + player.name = Squeeze_player.player_name + player.volume_level = Squeeze_player.volume_level + player.cur_time = Squeeze_player._elapsed_seconds + if event == "disconnected": + return await self.mass.player.remove_player(player_id) + elif event == "power": + player.powered = event_data + elif event == "state": + player.state = event_data + if player.queue[player_id]: + cur_queue_index = player.queue_index[player_id] + player.cur_item = player.queue[player_id][cur_queue_index] + # update player details + await self.mass.player.update_player(player) + + async def __handle_socket_client(self, reader, writer): + ''' handle a client connection on the socket''' + LOGGER.debug("new socket client connected") + Squeeze_player = PySqueezePlayer(stream_host, stream_port) + + def send_frame(command, data): + ''' send command to Squeeze player''' + packet = struct.pack('!H', len(data) + 4) + command + data + writer.write(packet) + + def handle_event(event, event_data=None): + ''' handle events from player''' + if event == "connected": + self._squeeze_players[Squeeze_player.player_id] = Squeeze_player + Squeeze_player.player_settings = self.mass.config['player_settings'][Squeeze_player.player_id] + asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, event, event_data)) + + try: + @run_periodic(5) + async def send_heartbeat(): + timestamp = int(time.time()) + data = Squeeze_player.pack_stream(b"t", replayGain=timestamp, flags=0) + Squeeze_player.send_frame(b"strm", data) + + Squeeze_player.send_frame = send_frame + Squeeze_player.send_event = handle_event + heartbeat_task = asyncio.create_task(send_heartbeat()) + + # keep reading bytes from the socket + while True: + data = await reader.read(64) + if data: + Squeeze_player.dataReceived(data) + else: + break + except Exception as exc: + # connection lost ? + LOGGER.warning(exc) + # disconnect + heartbeat_task.cancel() + asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, 'disconnected')) + + +class PySqueezePlayer(Player): + ''' Squeezebox socket client ''' + + def __init__(self, stream_host, stream_port): + self.buffer = b'' + #self.display = Display() + self.send_frame = None + self.send_event = None + self.stream_host = stream_host + self.stream_port = stream_port + self.player_settings = {} + self.playback_millis = 0 + self._volume = PySqueezeVolume() + self._device_type = None + self._mac_address = None + self._player_name = None + self._last_volume = 0 + self._last_heartbeat = 0 + self._elapsed_seconds = 0 + self._elapsed_milliseconds = 0 + + @property + def player_name(self): + if self._player_name: + return self._player_name + return "%s - %s" %(self._device_type, self._mac_address) + + @property + def player_id(self): + return self._mac_address + + @property + def volume_level(self): + return self._volume.volume + + def dataReceived(self, data): + self.buffer = self.buffer + data + if len(self.buffer) > 8: + operation, length = self.buffer[:4], self.buffer[4:8] + length = struct.unpack('!I', length)[0] + plen = length + 8 + if len(self.buffer) >= plen: + packet, self.buffer = self.buffer[8:plen], self.buffer[plen:] + operation = operation.strip(b"!").strip().decode() + #LOGGER.info("operation: %s" % operation) + handler = getattr(self, "process_%s" % operation, None) + if handler is None: + raise NotImplementedError + handler(packet) + + def send_version(self): + self.send_frame(b'vers', b'7.8') + + def pack_stream(self, command, autostart=b"1", formatbyte = b'o', pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200, + spdif = b'0', transDuration = 0, transType = b'0', flags = 0x40, outputThreshold = 0, + replayGain=0, serverPort = 8095, serverIp = 0): + return struct.pack("!cccccccBcBcBBBLHL", + command, autostart, formatbyte, *pcmargs, + threshold, spdif, transDuration, transType, + flags, outputThreshold, 0, replayGain, serverPort, serverIp) + + def stop(self): + data = self.pack_stream(b"q", autostart=b"0", flags=0) + self.send_frame(b"strm", data) + + def flush(self): + data = self.pack_stream(b"f", autostart=b"0", flags=0) + self.send_frame(b"strm", data) + + def pause(self): + data = self.pack_stream(b"p", autostart=b"0", flags=0) + self.send_frame(b"strm", data) + LOGGER.info("Sending pause request") + + def unpause(self): + data = self.pack_stream(b"u", autostart=b"0", flags=0) + self.send_frame(b"strm", data) + LOGGER.info("Sending unpause request") + + def next(self): + data = self.pack_stream(b"f", autostart=b"0", flags=0) + self.send_frame(b"strm", data) + self.send_event("next_track") + + def previous(self): + data = self.pack_stream(b"f", autostart=b"0", flags=0) + self.send_frame(b"strm", data) + self.send_event("previous_track") + + def power_on(self): + self.send_frame(b"aude", struct.pack("2B", 1, 1)) + self.send_event("power", True) + + def power_off(self): + self.stop() + self.send_frame(b"aude", struct.pack("2B", 0, 0)) + self.send_event("power", False) + + def mute_on(self): + self.send_frame(b"aude", struct.pack("2B", 0, 0)) + self.send_event("mute", True) + + def mute_off(self): + self.send_frame(b"aude", struct.pack("2B", 1, 1)) + self.send_event("mute", False) + + def volume_up(self): + self._volume.increment() + self.send_volume() + + def volume_down(self): + self._volume.decrement() + self.send_volume() + + def volume_set(self, new_vol): + self._volume.volume = new_vol + self.send_volume() + + def play(self, uri): + enable_crossfade = self.player_settings["crossfade_duration"] > 0 + command = b's' + autostart = b'3' # we use direct stream for now so let the player do the messy work with buffers + transType= b'1' if enable_crossfade else b'0' + transDuration = self.player_settings["crossfade_duration"] + formatbyte = b'f' # fixed to flac + uri = '/stream' + uri.split('/stream')[1] + data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration) + headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.stream_host, self.stream_port) + request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers) + data = data + request.encode("utf-8") + self.send_frame(b'strm', data) + LOGGER.info("Requesting play from squeezebox" ) + + def displayTrack(self, track): + self.render("%s by %s" % (track.title, track.artist)) + + def process_HELO(self, data): + (devId, rev, mac) = struct.unpack('BB6s', data[:8]) + device_mac = ':'.join("%02x" % x for x in mac) + self._device_type = devices.get(devId, 'unknown device') + self._mac_address = str(device_mac).lower() + LOGGER.debug("HELO received from %s %s" % (self._mac_address, self._device_type)) + self.init_client() + + def init_client(self): + ''' initialize a new connected client ''' + self.send_event("connected") + self.send_version() + self.stop() + self.setBrightness() + #self.set_visualisation(SpectrumAnalyser()) + self.send_frame(b"setd", struct.pack("B", 0)) + self.send_frame(b"setd", struct.pack("B", 4)) + self.power_on() + self.volume_set(40) # TODO: remember last volume + + def send_volume(self): + og = self._volume.old_gain() + ng = self._volume.new_gain() + LOGGER.info("Volume set to %d (%d/%d)" % (self._volume.volume, og, ng)) + d = self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng)) + self.send_event("volume", self._volume.volume) + + def setBrightness(self, level=4): + assert 0 <= level <= 4 + self.send_frame(b"grfb", struct.pack("!H", level)) + + def set_visualisation(self, visualisation): + self.send_frame(b"visu", visualisation.pack()) + + def render(self, text): + #self.display.clear() + #self.display.renderText(text, "DejaVu-Sans", 16, (0,0)) + #self.updateDisplay(self.display.frame()) + pass + + def updateDisplay(self, bitmap, transition = 'c', offset=0, param=0): + frame = struct.pack("!Hcb", offset, transition, param) + bitmap + self.send_frame(b"grfe", frame) + + def process_STAT(self, data): + ev = data[:4] + if ev == b'\x00\x00\x00\x00': + LOGGER.info("Presumed informational stat message") + else: + handler = getattr(self, 'stat_%s' % ev.decode(), None) + if handler is None: + raise NotImplementedError("Stat message %r not known" % ev) + handler(data[4:]) + + def stat_aude(self, data): + (spdif_enable, dac_enable) = struct.unpack("2B", data[:4]) + powered = spdif_enable or dac_enable + self.send_event("power", powered) + LOGGER.debug("ACK aude - Received player power: %s" % powered) + + def stat_audg(self, data): + LOGGER.info("Received volume_level from player %s" % data) + self.send_event("volume", self._volume.volume) + + def stat_strm(self, data): + LOGGER.debug("ACK strm") + #self.send_frame(b"cont", b"0") + + def stat_STMc(self, data): + LOGGER.debug("Status Message: Connect") + + def stat_STMd(self, data): + LOGGER.debug("Decoder Ready for next track") + self.send_event("next_track") + + def stat_STMe(self, data): + LOGGER.info("Connection established") + + def stat_STMf(self, data): + LOGGER.info("Status Message: Connection closed") + self.send_event("state", PlayerState.Stopped) + + def stat_STMh(self, data): + LOGGER.info("Status Message: End of headers") + + def stat_STMn(self, data): + LOGGER.error("Decoder does not support file format") + + def stat_STMo(self, data): + ''' No more decoded (uncompressed) data to play; triggers rebuffering. ''' + LOGGER.debug("Output Underrun") + + def stat_STMp(self, data): + '''Pause confirmed''' + self.send_event("state", PlayerState.Paused) + + def stat_STMr(self, data): + '''Resume confirmed''' + self.send_event("state", PlayerState.Playing) + + def stat_STMs(self, data): + '''Playback of new track has started''' + self.send_event("state", PlayerState.Playing) + + def stat_STMt(self, data): + """ heartbeat from client """ + timestamp = time.time() + self._last_heartbeat = timestamp + (num_crlf, mas_initialized, mas_mode, rptr, wptr, + bytes_received_h, bytes_received_l, signal_strength, + jiffies, output_buffer_size, output_buffer_fullness, + elapsed_seconds, voltage, elapsed_milliseconds, + server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data) + if elapsed_seconds != self._elapsed_seconds: + self.send_event("progress") + self._elapsed_seconds = elapsed_seconds + self._elapsed_milliseconds = elapsed_milliseconds + + def stat_STMu(self, data): + '''Normal end of playback''' + LOGGER.info("End of playback - Underrun") + self.send_event("state", PlayerState.Stopped) + + def process_BYE(self, data): + LOGGER.info("BYE received") + self.send_event("disconnected") + + def process_RESP(self, data): + LOGGER.info("RESP received") + self.send_frame(b"cont", b"0") + + def process_BODY(self, data): + LOGGER.info("BODY received") + + def process_META(self, data): + LOGGER.info("META received") + + def process_DSCO(self, data): + LOGGER.info("Data Stream Disconnected") + + def process_DBUG(self, data): + LOGGER.info("DBUG received") + + def process_IR(self, data): + """ Slightly involved codepath here. This raises an event, which may + be picked up by the service and then the process_remote_* function in + this player will be called. This is mostly relevant for volume changes + - most other button presses will require some context to operate. """ + (time, code) = struct.unpack("!IxxI", data) + LOGGER.info("IR code %s" % code) + # command = Remote.codes.get(code, None) + # if command is not None: + # LOGGER.info("IR received: %r, %r" % (code, command)) + # #self.service.evreactor.fireEvent(RemoteButtonPressed(self, command)) + # else: + # LOGGER.info("Unknown IR received: %r, %r" % (time, code)) + + def process_RAWI(self, data): + LOGGER.info("RAWI received") + + def process_ANIC(self, data): + LOGGER.info("ANIC received") + + def process_BUTN(self, data): + LOGGER.info("BUTN received") + + def process_KNOB(self, data): + ''' Transporter only, knob-related ''' + LOGGER.info("KNOB received") + + def process_SETD(self, data): + ''' Get/set player firmware settings ''' + LOGGER.debug("SETD received %s" % data) + cmd_id = data[0] + if cmd_id == 0: + # received player name + data = data[1:].decode() + self._player_name = data + self.send_event("name") + + def process_UREQ(self, data): + LOGGER.info("UREQ received") + + + +# from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO +devices = { + 2: 'squeezebox', + 3: 'softsqueeze', + 4: 'squeezebox2', + 5: 'transporter', + 6: 'softsqueeze3', + 7: 'receiver', + 8: 'squeezeslave', + 9: 'controller', + 10: 'boom', + 11: 'softboom', + 12: 'squeezeplay', + } + + +class PySqueezeVolume(object): + + """ Represents a sound volume. This is an awful lot more complex than it + sounds. """ + + minimum = 0 + maximum = 100 + step = 1 + + # this map is taken from Slim::Player::Squeezebox2 in the squeezecenter source + # i don't know how much magic it contains, or any way I can test it + old_map = [ + 0, 1, 1, 1, 2, 2, 2, 3, 3, 4, + 5, 5, 6, 6, 7, 8, 9, 9, 10, 11, + 12, 13, 14, 15, 16, 16, 17, 18, 19, 20, + 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, + 33, 34, 35, 37, 38, 39, 40, 42, 43, 44, + 46, 47, 48, 50, 51, 53, 54, 56, 57, 59, + 60, 61, 63, 65, 66, 68, 69, 71, 72, 74, + 75, 77, 79, 80, 82, 84, 85, 87, 89, 90, + 92, 94, 96, 97, 99, 101, 103, 104, 106, 108, 110, + 112, 113, 115, 117, 119, 121, 123, 125, 127, 128 + ]; + + # new gain parameters, from the same place + total_volume_range = -50 # dB + step_point = -1 # Number of steps, up from the bottom, where a 2nd volume ramp kicks in. + step_fraction = 1 # fraction of totalVolumeRange where alternate volume ramp kicks in. + + def __init__(self): + self.volume = 50 + + def increment(self): + """ Increment the volume """ + self.volume += self.step + if self.volume > self.maximum: + self.volume = self.maximum + + def decrement(self): + """ Decrement the volume """ + self.volume -= self.step + if self.volume < self.minimum: + self.volume = self.minimum + + def old_gain(self): + """ Return the "Old" gain value as required by the squeezebox """ + return self.old_map[self.volume] + + def decibels(self): + """ Return the "new" gain value. """ + + step_db = self.total_volume_range * self.step_fraction + max_volume_db = 0 # different on the boom? + + # Equation for a line: + # y = mx+b + # y1 = mx1+b, y2 = mx2+b. + # y2-y1 = m(x2 - x1) + # y2 = m(x2 - x1) + y1 + slope_high = max_volume_db - step_db / (100.0 - self.step_point) + slope_low = step_db - self.total_volume_range / (self.step_point - 0.0) + x2 = self.volume + if (x2 > self.step_point): + m = slope_high + x1 = 100 + y1 = max_volume_db + else: + m = slope_low + x1 = 0 + y1 = self.total_volume_range + return m * (x2 - x1) + y1 + + def new_gain(self): + db = self.decibels() + floatmult = 10 ** (db/20.0) + # avoid rounding errors somehow + if -30 <= db <= 0: + return int(floatmult * (1 << 8) + 0.5) * (1<<8) + else: + return int((floatmult * (1<<16)) + 0.5) + + +##### UDP DISCOVERY STUFF ############# + +class Datagram(object): + + @classmethod + def decode(self, data): + if data[0] == 'e': + return TLVDiscoveryRequestDatagram(data) + elif data[0] == 'E': + return TLVDiscoveryResponseDatagram(data) + elif data[0] == 'd': + return ClientDiscoveryDatagram(data) + elif data[0] == 'h': + pass # Hello! + elif data[0] == 'i': + pass # IR + elif data[0] == '2': + pass # i2c? + elif data[0] == 'a': + pass # ack! + +class ClientDiscoveryDatagram(Datagram): + + device = None + firmware = None + client = None + + def __init__(self, data): + s = struct.unpack('!cxBB8x6B', data.encode()) + assert s[0] == 'd' + self.device = s[1] + self.firmware = hex(s[2]) + self.client = ":".join(["%02x" % (x,) for x in s[3:]]) + + def __repr__(self): + return "<%s device=%r firmware=%r client=%r>" % (self.__class__.__name__, self.device, self.firmware, self.client) + +class DiscoveryResponseDatagram(Datagram): + + def __init__(self, hostname, port): + hostname = hostname[:16].encode("UTF-8") + hostname += (16 - len(hostname)) * '\x00' + self.packet = struct.pack('!c16s', 'D', hostname).decode() + +class TLVDiscoveryRequestDatagram(Datagram): + + def __init__(self, data): + requestdata = OrderedDict() + assert data[0] == 'e' + idx = 1 + length = len(data)-5 + while idx <= length: + typ, l = struct.unpack_from("4sB", data.encode(), idx) + if l: + val = data[idx+5:idx+5+l] + idx += 5+l + else: + val = None + idx += 5 + typ = typ.decode() + requestdata[typ] = val + self.data = requestdata + + def __repr__(self): + return "<%s data=%r>" % (self.__class__.__name__, self.data.items()) + +class TLVDiscoveryResponseDatagram(Datagram): + + def __init__(self, responsedata): + parts = ['E'] # new discovery format + for typ, value in responsedata.items(): + if value is None: + value = '' + elif len(value) > 255: + LOGGER.warning("Response %s too long, truncating to 255 bytes" % typ) + value = value[:255] + parts.extend((typ, chr(len(value)), value)) + self.packet = ''.join(parts) + +class DiscoveryProtocol(): + + def __init__(self, web_port): + self.web_port = web_port + + def connection_made(self, transport): + self.transport = transport + # Allow receiving multicast broadcasts + sock = self.transport.get_extra_info('socket') + group = socket.inet_aton('239.255.255.250') + mreq = struct.pack('4sL', group, socket.INADDR_ANY) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + + def build_TLV_response(self, requestdata): + responsedata = OrderedDict() + for typ, value in requestdata.items(): + if typ == 'NAME': + # send full host name - no truncation + value = get_hostname() + elif typ == 'IPAD': + # send ipaddress as a string only if it is set + value = get_ip() + # :todo: IPv6 + if value == '0.0.0.0': + # do not send back an ip address + typ = None + elif typ == 'JSON': + # send port as a string + json_port = self.web_port + value = str(json_port) + elif typ == 'VERS': + # send server version + value = '7.9' + elif typ == 'UUID': + # send server uuid + value = 'musicassistant' + else: + LOGGER.debug('Unexpected information request: %r', typ) + typ = None + if typ: + responsedata[typ] = value + return responsedata + + def datagram_received(self, data, addr): + try: + data = data.decode() + dgram = Datagram.decode(data) + LOGGER.debug("Data received from %s: %s" % (addr, dgram)) + if isinstance(dgram, ClientDiscoveryDatagram): + self.sendDiscoveryResponse(addr) + elif isinstance(dgram, TLVDiscoveryRequestDatagram): + resonsedata = self.build_TLV_response(dgram.data) + self.sendTLVDiscoveryResponse(resonsedata, addr) + except Exception as exc: + LOGGER.exception(exc) + + def sendDiscoveryResponse(self, addr): + dgram = DiscoveryResponseDatagram(get_hostname(), 3483) + LOGGER.debug("Sending discovery response %r" % (dgram.packet,)) + self.transport.sendto(dgram.packet.encode(), addr) + + def sendTLVDiscoveryResponse(self, resonsedata, addr): + dgram = TLVDiscoveryResponseDatagram(resonsedata) + LOGGER.debug("Sending discovery response %r" % (dgram.packet,)) + self.transport.sendto(dgram.packet.encode(), addr) + diff --git a/music_assistant/utils.py b/music_assistant/utils.py index e40de73d..3713253a 100755 --- a/music_assistant/utils.py +++ b/music_assistant/utils.py @@ -6,13 +6,7 @@ import logging from concurrent.futures import ThreadPoolExecutor import socket import os -logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s') -consolehandler = logging.StreamHandler() -consolehandler.setFormatter(logformat) -LOGGER = logging.getLogger(__package__) -LOGGER.setLevel(logging.INFO) -LOGGER.addHandler(consolehandler) - +LOGGER = logging.getLogger() def run_periodic(period): diff --git a/web/images/icons/pylms.png b/web/images/icons/pylms.png deleted file mode 100644 index 18531d79..00000000 Binary files a/web/images/icons/pylms.png and /dev/null differ diff --git a/web/images/icons/squeezebox.png b/web/images/icons/squeezebox.png new file mode 100644 index 00000000..18531d79 Binary files /dev/null and b/web/images/icons/squeezebox.png differ