From dbfb26a92c9f0f43e477cdefe5a568994db1783e Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Sun, 13 Oct 2019 17:09:04 +0200 Subject: [PATCH] fix issues with queue streaming --- main.py | 4 +- music_assistant/__init__.py | 13 +- music_assistant/http_streamer.py | 184 +++++----- music_assistant/models/player.py | 2 +- music_assistant/models/player_queue.py | 4 +- music_assistant/playerproviders/chromecast.py | 6 + music_assistant/playerproviders/lms.py | 316 ------------------ .../{pylms.py => squeezebox.py} | 74 ++-- music_assistant/utils.py | 8 +- .../icons/{pylms.png => squeezebox.png} | Bin 10 files changed, 161 insertions(+), 450 deletions(-) delete mode 100644 music_assistant/playerproviders/lms.py rename music_assistant/playerproviders/{pylms.py => squeezebox.py} (92%) rename web/images/icons/{pylms.png => squeezebox.png} (100%) diff --git a/main.py b/main.py index 084bc50e..7cb98257 100755 --- a/main.py +++ b/main.py @@ -13,9 +13,9 @@ if __name__ == "__main__": else: datapath = os.path.dirname(os.path.abspath(__file__)) if len(sys.argv) > 2: - debug = sys.argv[2] + debug = sys.argv[2] == "debug" else: - debug = True + debug = False MusicAssistant(datapath, debug) \ No newline at end of file diff --git a/music_assistant/__init__.py b/music_assistant/__init__.py index 6de619a7..9d08c84b 100644 --- a/music_assistant/__init__.py +++ b/music_assistant/__init__.py @@ -13,6 +13,7 @@ import slugify as unicode_slug import uuid import json import time +import logging from .database import Database from .utils import run_periodic, LOGGER, try_parse_bool @@ -33,12 +34,22 @@ class MusicAssistant(): def __init__(self, datapath, debug=False): debug = try_parse_bool(debug) + logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s') + consolehandler = logging.StreamHandler() + consolehandler.setFormatter(logformat) + LOGGER.addHandler(consolehandler) + if debug: + LOGGER.setLevel(logging.DEBUG) + logging.getLogger('aiosqlite').setLevel(logging.INFO) + logging.getLogger('asyncio').setLevel(logging.INFO) + else: + LOGGER.setLevel(logging.INFO) uvloop.install() self.datapath = datapath self.parse_config() self.event_loop = asyncio.get_event_loop() self.event_loop.set_debug(debug) - self.bg_executor = ThreadPoolExecutor() + self.bg_executor = ThreadPoolExecutor() self.event_loop.set_default_executor(self.bg_executor) #self.event_loop.set_exception_handler(handle_exception) self.event_listeners = {} diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 1e181a27..f9cac55a 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -43,26 +43,27 @@ class HTTPStreamer(): # send content only on GET request if http_request.method.upper() != 'HEAD': # stream audio - queue = asyncio.Queue() + buf_queue = asyncio.Queue() cancelled = threading.Event() if queue_item: # single stream requested + #asyncio.create_task(self.__stream_single(player, queue_item, buf_queue, cancelled)) run_async_background_task( self.mass.bg_executor, - self.__stream_single, player, queue_item, queue, cancelled) + self.__stream_single, player, queue_item, buf_queue, cancelled) else: # no item is given, start queue stream run_async_background_task( self.mass.bg_executor, - self.__stream_queue, player, queue, cancelled) + self.__stream_queue, player, buf_queue, cancelled) try: while True: - chunk = await queue.get() + chunk = await buf_queue.get() if not chunk: - queue.task_done() + buf_queue.task_done() break await resp.write(chunk) - queue.task_done() + buf_queue.task_done() LOGGER.info("stream fininished for player %s" % player.name) except asyncio.CancelledError: cancelled.set() @@ -72,42 +73,45 @@ class HTTPStreamer(): async def __stream_single(self, player, queue_item, buffer, cancelled): ''' start streaming single track from provider ''' + LOGGER.info("stream single track started for track %s on player %s" % (queue_item.name, player.name)) try: audio_stream = self.__get_audio_stream(player, queue_item, cancelled) async for is_last_chunk, audio_chunk in audio_stream: asyncio.run_coroutine_threadsafe( buffer.put(audio_chunk), self.mass.event_loop) - # wait for the queue to consume the data - # this prevents that the entire track is sitting in memory - while buffer.qsize() > 2 and not cancelled.is_set(): - await asyncio.sleep(1) # indicate EOF if no more data asyncio.run_coroutine_threadsafe( buffer.put(b''), self.mass.event_loop) except (asyncio.CancelledError, asyncio.TimeoutError): cancelled.set() - LOGGER.info("stream_track interrupted for %s" % queue_item.name) + LOGGER.info("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name)) raise asyncio.CancelledError() else: - LOGGER.info("stream_track fininished for %s" % queue_item.name) + LOGGER.info("stream single track finished for track %s on player %s" % (queue_item.name, player.name)) async def __stream_queue(self, player, buffer, cancelled): ''' start streaming all queue tracks ''' - sample_rate = player.settings['max_sample_rate'] - fade_length = player.settings["crossfade_duration"] - fade_bytes = int(sample_rate * 4 * 2 * fade_length) + sample_rate = try_parse_int(player.settings['max_sample_rate']) + fade_length = try_parse_int(player.settings["crossfade_duration"]) + if not sample_rate or sample_rate < 44100 or sample_rate > 384000: + sample_rate = 96000 + if fade_length: + fade_bytes = int(sample_rate * 4 * 2 * fade_length) + else: + fade_bytes = int(sample_rate * 4 * 2) pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate - args = 'sox -t %s - -t flac -C 0 -' % pcm_args + args = 'sox -V3 -t %s - -t flac -C 0 -' % pcm_args sox_proc = await asyncio.create_subprocess_shell(args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) async def fill_buffer(): - while not sox_proc.stdout.at_eof(): - chunk = await sox_proc.stdout.read(256000) - if not chunk: - break + while not sox_proc.stdout.at_eof() and not sox_proc.returncode: + try: + chunk = await sox_proc.stdout.readexactly(256000) + except asyncio.streams.IncompleteReadError as err: + chunk = err.partial asyncio.run_coroutine_threadsafe( buffer.put(chunk), self.mass.event_loop) @@ -131,24 +135,27 @@ class HTTPStreamer(): else: queue_track = player.queue.next_item if not queue_track: - LOGGER.warning("no (more) tracks left in queue") + LOGGER.debug("no (more) tracks left in queue") break - LOGGER.info("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) + LOGGER.debug("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) fade_in_part = b'' cur_chunk = 0 prev_chunk = None bytes_written = 0 + # handle incoming audio chunks async for is_last_chunk, chunk in self.__get_audio_stream( player, queue_track, cancelled, chunksize=fade_bytes, resample=sample_rate): cur_chunk += 1 + + ### HANDLE FIRST PART OF TRACK if cur_chunk <= 2 and not last_fadeout_data: - # fade-in part but no fadeout_part available so just pass it to the output directly + # no fadeout_part available so just pass it to the output directly sox_proc.stdin.write(chunk) await sox_proc.stdin.drain() bytes_written += len(chunk) - print(chunk) elif cur_chunk == 1 and last_fadeout_data: prev_chunk = chunk + ### HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN elif cur_chunk == 2 and last_fadeout_data: # combine the first 2 chunks and strip off silence args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args) @@ -172,6 +179,7 @@ class HTTPStreamer(): bytes_written += len(remaining_bytes) del remaining_bytes prev_chunk = None # needed to prevent this chunk being sent again + ### HANDLE LAST PART OF TRACK elif prev_chunk and is_last_chunk: # last chunk received so create the fadeout_part with the previous chunk and this chunk # and strip off silence @@ -179,26 +187,35 @@ class HTTPStreamer(): process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) last_part, stderr = await process.communicate(prev_chunk + chunk) - if len(last_part) < fade_bytes: - # not enough data for crossfade duration after the strip action... - last_part = prev_chunk + chunk - if len(last_part) < fade_bytes: - # still not enough data so we'll skip the crossfading - LOGGER.warning("not enough data for fadeout so skip crossfade... %s" % len(last_part)) + if not player.queue.crossfade_enabled: + # crossfading is not enabled so just pass the (stripped) audio data sox_proc.stdin.write(last_part) bytes_written += len(last_part) await sox_proc.stdin.drain() del last_part else: - # store fade section to be picked up for next track - last_fadeout_data = last_part[-fade_bytes:] - remaining_bytes = last_part[:-fade_bytes] - # write remaining bytes - sox_proc.stdin.write(remaining_bytes) - bytes_written += len(remaining_bytes) - await sox_proc.stdin.drain() - del last_part - del remaining_bytes + # handle crossfading support + if len(last_part) < fade_bytes: + # not enough data for crossfade duration after the strip action... + last_part = prev_chunk + chunk + if len(last_part) < fade_bytes: + # still not enough data so we'll skip the crossfading + LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part)) + sox_proc.stdin.write(last_part) + bytes_written += len(last_part) + await sox_proc.stdin.drain() + del last_part + else: + # store fade section to be picked up for next track + last_fadeout_data = last_part[-fade_bytes:] + remaining_bytes = last_part[:-fade_bytes] + # write remaining bytes + sox_proc.stdin.write(remaining_bytes) + bytes_written += len(remaining_bytes) + await sox_proc.stdin.drain() + del last_part + del remaining_bytes + ### MIDDLE PARTS OF TRACK else: # middle part of the track # keep previous chunk in memory so we have enough samples to perform the crossfade @@ -223,22 +240,24 @@ class HTTPStreamer(): # WIP: update actual duration to the queue for more accurate now playing info accurate_duration = bytes_written / int(sample_rate * 4 * 2) queue_track.duration = accurate_duration - LOGGER.info("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) + LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration)) # end of queue reached, pass last fadeout bits to final output if last_fadeout_data and not cancelled.is_set(): sox_proc.stdin.write(last_fadeout_data) await sox_proc.stdin.drain() sox_proc.stdin.close() + sox_proc.terminate() await sox_proc.wait() LOGGER.info("streaming of queue for player %s completed" % player.name) async def __get_audio_stream(self, player, queue_item, cancelled, - chunksize=512000, resample=None): + chunksize=128000, resample=None): ''' get audio stream from provider and apply additional effects/processing where/if needed''' # get stream details from provider # sort by quality and check track availability - for prov_media in sorted(queue_item.provider_ids, key=operator.itemgetter('quality'), reverse=True): + for prov_media in sorted(queue_item.provider_ids, + key=operator.itemgetter('quality'), reverse=True): streamdetails = asyncio.run_coroutine_threadsafe( self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']), self.mass.event_loop).result() @@ -249,10 +268,9 @@ class HTTPStreamer(): queue_item.quality = prov_media['quality'] break if not streamdetails: - LOGGER.warning("no stream details!") + LOGGER.warning(f"no stream details for {queue_item.name}") yield (True, b'') return - print(streamdetails) # get sox effects and resample options sox_effects = await self.__get_player_sox_options(player, queue_item) outputfmt = 'flac -C 0' @@ -264,14 +282,15 @@ class HTTPStreamer(): # support for AAC created with ffmpeg in between args = 'ffmpeg -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects) elif streamdetails['type'] == 'url': - args = 'sox -V3 -t %s "%s" -t %s - %s' % (streamdetails["content_type"], + args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"], streamdetails["path"], outputfmt, sox_effects) elif streamdetails['type'] == 'executable': - args = '%s | sox -V3 -t %s - -t %s - %s' % (streamdetails["path"], + args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, sox_effects) # start sox process process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) + # fire event that streaming has started for this track (needed by some streaming providers) streamdetails["provider"] = queue_item.provider streamdetails["track_id"] = queue_item.item_id @@ -282,49 +301,43 @@ class HTTPStreamer(): # we keep 1 chunk behind to detect end of stream properly prev_chunk = b'' bytes_sent = 0 - first_chunk = False - while not process.stdout.at_eof(): + while not process.stdout.at_eof() and not process.returncode: + if cancelled.is_set(): + try: + process.terminate() + except ProcessLookupError: + pass try: chunk = await process.stdout.readexactly(chunksize) - except asyncio.streams.IncompleteReadError: - chunk = await process.stdout.read(chunksize) - if first_chunk: - print(len(chunk)) - if not chunk: - break + except asyncio.streams.IncompleteReadError as err: + chunk = err.partial if prev_chunk: yield (False, prev_chunk) bytes_sent += len(prev_chunk) prev_chunk = chunk # yield last chunk - if not cancelled.is_set(): - yield (True, prev_chunk) - bytes_sent += len(prev_chunk) + yield (True, prev_chunk) + bytes_sent += len(prev_chunk) await process.wait() if cancelled.is_set(): - try: - process.terminate() - except ProcessLookupError: - pass - LOGGER.debug("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent)) + LOGGER.info("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent)) else: - LOGGER.debug("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent)) + LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent)) # fire event that streaming has ended for this track (needed by some streaming providers) if resample: bytes_per_second = resample * (32/8) * 2 + bytes_per_second = (resample * 32 * 2) / 8 + seconds_streamed = int(bytes_sent/bytes_per_second) else: - bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2 - seconds_streamed = int(bytes_sent/bytes_per_second) + seconds_streamed = queue_item.duration streamdetails["seconds"] = seconds_streamed asyncio.run_coroutine_threadsafe( self.mass.signal_event('streaming_ended', streamdetails), self.mass.event_loop) # send task to background to analyse the audio - # TODO: send audio data completely - if not queue_item.media_type == MediaType.Radio: - asyncio.run_coroutine_threadsafe( - self.__analyze_audio(queue_item.item_id, queue_item.provider), - self.mass.event_loop) + asyncio.run_coroutine_threadsafe( + self.__analyze_audio(queue_item), + self.mass.event_loop) async def __get_player_sox_options(self, player, queue_item): ''' get player specific sox effect options ''' @@ -333,8 +346,9 @@ class HTTPStreamer(): if not player.supports_replay_gain and player.settings['volume_normalisation']: target_gain = int(player.settings['target_volume']) fallback_gain = int(player.settings['fallback_gain_correct']) - track_loudness = await self.mass.db.get_track_loudness( - queue_item.item_id, queue_item.provider) + track_loudness = asyncio.run_coroutine_threadsafe( + self.mass.db.get_track_loudness(queue_item.item_id, queue_item.provider), + self.mass.event_loop).result() if track_loudness == None: gain_correct = fallback_gain else: @@ -358,17 +372,21 @@ class HTTPStreamer(): sox_effects.append(player.settings['sox_effects']) return " ".join(sox_effects) - async def __analyze_audio(self, track_id, provider): + async def __analyze_audio(self, queue_item): ''' analyze track audio, for now we only calculate EBU R128 loudness ''' - track_key = '%s%s' %(track_id, provider) - if track_key in self.analyze_jobs: + if queue_item.media_type != MediaType.Track: + # TODO: calculate loudness average for web radio ? + return + item_key = '%s%s' %(queue_item.item_id, queue_item.provider) + if item_key in self.analyze_jobs: return # prevent multiple analyze jobs for same track - self.analyze_jobs[track_key] = True - streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id) - track_loudness = await self.mass.db.get_track_loudness(track_id, provider) + self.analyze_jobs[item_key] = True + streamdetails = queue_item.stream_details + track_loudness = await self.mass.db.get_track_loudness( + queue_item.item_id, queue_item.provider) if track_loudness == None: # only when needed we do the analyze stuff - LOGGER.debug('Start analyzing track %s' % track_id) + LOGGER.debug('Start analyzing track %s' % item_key) if streamdetails['type'] == 'url': async with aiohttp.ClientSession() as session: async with session.get(streamdetails["path"]) as resp: @@ -384,11 +402,11 @@ class HTTPStreamer(): meter = pyln.Meter(rate) # create BS.1770 meter loudness = meter.integrated_loudness(data) # measure loudness del data - LOGGER.debug("Integrated loudness of track %s is: %s" %(track_id, loudness)) - await self.mass.db.set_track_loudness(track_id, provider, loudness) + LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness)) + await self.mass.db.set_track_loudness(queue_item.item_id, queue_item.provider, loudness) del audio_data - LOGGER.debug('Finished analyzing track %s' % track_id) - self.analyze_jobs.pop(track_key, None) + LOGGER.debug('Finished analyzing track %s' % item_key) + self.analyze_jobs.pop(item_key, None) async def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args, fade_length): ''' crossfade two chunks of audio using sox ''' diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 320fcfd2..3216a18f 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -114,7 +114,7 @@ class Player(): self._player_settings = None # public attributes self.supports_queue = True # has native support for a queue - self.supports_gapless = True # has native gapless support + self.supports_gapless = False # has native gapless support self.supports_crossfade = False # has native crossfading support self.supports_replay_gain = False # has native support for replaygain volume leveling # if home assistant support is enabled, register state listener diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 89fddb11..d2b67567 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -207,7 +207,7 @@ class PlayerQueue(): :param offset: offset from current queue position ''' insert_at_index = self.cur_index + offset - if not self.items or insert_at_index >= len(self.items): + if not self.items or insert_at_index > len(self.items): return await self.load(queue_items) if self.shuffle_enabled: queue_items = await self.__shuffle_items(queue_items) @@ -239,7 +239,7 @@ class PlayerQueue(): if self.items and len(self.items) > self._last_index: queue_index = self._last_index # holds the last starting position queue_track = None - while True: + while len(self.items) > queue_index: queue_track = self.items[queue_index] if cur_time_queue > (queue_track.duration + total_time): total_time += queue_track.duration diff --git a/music_assistant/playerproviders/chromecast.py b/music_assistant/playerproviders/chromecast.py index 14f16578..f301374b 100644 --- a/music_assistant/playerproviders/chromecast.py +++ b/music_assistant/playerproviders/chromecast.py @@ -4,6 +4,7 @@ import asyncio import aiohttp from typing import List +import logging import pychromecast from pychromecast.controllers.multizone import MultizoneController from pychromecast.controllers import BaseController @@ -181,6 +182,7 @@ class ChromecastProvider(PlayerProvider): self.prov_id = 'chromecast' self.name = 'Chromecast' self._discovery_running = False + logging.getLogger('pychromecast').setLevel(logging.WARNING) self.player_config_entries = [("gapless_enabled", False, "gapless_enabled")] self.mass.event_loop.create_task(self.__periodic_chromecast_discovery()) @@ -293,6 +295,10 @@ class ChromecastProvider(PlayerProvider): chromecast.media_controller.register_status_listener(listenerMedia) player = ChromecastPlayer(self.mass, player_id, self.prov_id) player.poll_task = False + self.supports_queue = True + self.supports_gapless = False + self.supports_crossfade = False + self.supports_replay_gain = False if chromecast.cast_type == 'group': player.is_group = True mz = MultizoneController(chromecast.uuid) diff --git a/music_assistant/playerproviders/lms.py b/music_assistant/playerproviders/lms.py deleted file mode 100644 index cc1b55fd..00000000 --- a/music_assistant/playerproviders/lms.py +++ /dev/null @@ -1,316 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding:utf-8 -*- - -import asyncio -import os -from typing import List -import random -import sys -import json -import aiohttp -import time -import datetime -import hashlib -from asyncio_throttle import Throttler -from aiocometd import Client, ConnectionType, Extension -import copy -import urllib - -from ..cache import use_cache -from ..utils import run_periodic, LOGGER, parse_track_title -from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist -from ..constants import CONF_ENABLED, CONF_HOSTNAME, CONF_PORT - - -def setup(mass): - ''' setup the provider''' - enabled = mass.config["playerproviders"]['lms'].get(CONF_ENABLED) - hostname = mass.config["playerproviders"]['lms'].get(CONF_HOSTNAME) - port = mass.config["playerproviders"]['lms'].get(CONF_PORT) - if enabled and hostname and port: - provider = LMSProvider(mass, hostname, port) - return provider - return False - -def config_entries(): - ''' get the config entries for this provider (list with key/value pairs)''' - return [ - (CONF_ENABLED, False, CONF_ENABLED), - (CONF_HOSTNAME, 'localhost', CONF_HOSTNAME), - (CONF_PORT, 9000, CONF_PORT) - ] - -class LMSProvider(PlayerProvider): - ''' support for Logitech Media Server ''' - - def __init__(self, mass, hostname, port): - super().__init__(mass) - self.prov_id = 'lms' - self.name = 'Logitech Media Server' - self._host = hostname - self._port = port - self.last_msg_received = 0 - self.http_session = aiohttp.ClientSession(loop=mass.event_loop) - # we use a combi of active polling and subscriptions because the cometd implementation of LMS is somewhat unreliable - asyncio.ensure_future(self.__lms_events()) - asyncio.ensure_future(self.__get_players()) - - ### Provider specific implementation ##### - - - async def player_command(self, player_id, cmd:str, cmd_args=None): - ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' - lms_commands = [] - if cmd == 'play': - lms_commands = ['play'] - elif cmd == 'pause': - lms_commands = ['pause', '1'] - elif cmd == 'stop': - lms_commands = ['stop'] - elif cmd == 'next': - lms_commands = ['playlist', 'index', '+1'] - elif cmd == 'previous': - lms_commands = ['playlist', 'index', '-1'] - elif cmd == 'stop': - lms_commands = ['playlist', 'stop'] - elif cmd == 'power' and cmd_args == 'off': - lms_commands = ['power', '0'] - elif cmd == 'power': - lms_commands = ['power', '1'] - elif cmd == 'volume': - lms_commands = ['mixer', 'volume', cmd_args] - elif cmd == 'mute' and cmd_args == 'off': - lms_commands = ['mixer', 'muting', '0'] - elif cmd == 'mute': - lms_commands = ['mixer', 'muting', '1'] - return await self.__get_data(lms_commands, player_id=player_id) - - async def play_media(self, player_id, media_items, queue_opt='play'): - ''' - play media on a player - ''' - if queue_opt == 'play': - cmd = ['playlist', 'insert', media_items[0].uri] - await self.__get_data(cmd, player_id=player_id) - cmd = ['playlist', 'index', '+1'] - await self.__get_data(cmd, player_id=player_id) - for track in media_items[1:]: - cmd = ['playlist', 'insert', track.uri] - await self.__get_data(cmd, player_id=player_id) - elif queue_opt == 'replace': - cmd = ['playlist', 'play', media_items[0].uri] - await self.__get_data(cmd, player_id=player_id) - for track in media_items[1:]: - cmd = ['playlist', 'add', track.uri] - await self.__get_data(cmd, player_id=player_id) - elif queue_opt == 'next': - for track in media_items: - cmd = ['playlist', 'insert', track.uri] - await self.__get_data(cmd, player_id=player_id) - else: - for track in media_items: - cmd = ['playlist', 'add', track.uri] - await self.__get_data(cmd, player_id=player_id) - - async def player_queue(self, player_id, offset=0, limit=50): - ''' return the items in the player's queue ''' - items = [] - player_details = await self.__get_data(["status", offset, limit, "tags:aAcCdegGijJKlostuxyRwk"], player_id=player_id) - if 'playlist_loop' in player_details: - for item in player_details['playlist_loop']: - track = await self.__parse_track(item) - items.append(track) - return items - - ### Provider specific (helper) methods ##### - - async def __get_players(self): - ''' update all players, used as fallback if cometd is failing and to detect removed players''' - server_info = await self.__get_data(['players', 0, 1000]) - player_ids = await self.__process_serverstatus(server_info) - for player_id in player_ids: - player_details = await self.__get_data(["status", "-","1", "tags:aAcCdegGijJKlostuxyRwk"], player_id=player_id) - await self.__process_player_details(player_id, player_details) - - async def __process_player_details(self, player_id, player_details): - ''' get state of a given player ''' - if player_id not in self._players: - return - player = self._players[player_id] - volume = player_details.get('mixer volume',0) - player.muted = volume < 0 - if volume >= 0: - player.volume_level = player_details.get('mixer volume',0) - player.shuffle_enabled = player_details.get('playlist shuffle',0) != 0 - player.repeat_enabled = player_details.get('playlist repeat',0) != 0 - # player state - if 'power' in player_details: - player.powered = player_details['power'] == 1 - else: - print(player_details) # DEBUG - if player_details['mode'] == 'play': - player.state = PlayerState.Playing - elif player_details['mode'] == 'pause': - player.state = PlayerState.Paused - else: - player.state = PlayerState.Stopped - # current track - if player_details.get('playlist_loop'): - player.cur_item = await self.__parse_track(player_details['playlist_loop'][0]) - player.cur_time = player_details.get('time',0) - else: - player.cur_item = None - player.cur_time = 0 - await self.mass.player.update_player(player) - - async def __process_serverstatus(self, server_status): - ''' process players from server state msg (players_loop) ''' - cur_player_ids = [] - for lms_player in server_status['players_loop']: - if lms_player['isplayer'] != 1: - continue - player_id = lms_player['playerid'] - cur_player_ids.append(player_id) - if not player_id in self._players: - # new player - self._players[player_id] = MusicPlayer() - player = self._players[player_id] - player.player_id = player_id - player.player_provider = self.prov_id - else: - # existing player - player = self._players[player_id] - # always update player details that may change - player.name = lms_player['name'] - if lms_player['model'] == "group": - player.is_group = True - # player is a groupplayer, retrieve childs - group_player_child_ids = await self.__get_group_childs(player_id) - for child_player_id in group_player_child_ids: - if child_player_id in self._players: - self._players[child_player_id].group_parent = player_id - elif player.group_parent: - # check if player parent is still correct - group_player_child_ids = await self.__get_group_childs(player.group_parent) - if not player_id in group_player_child_ids: - player.group_parent = None - # process update - await self.mass.player.update_player(player) - # process removed players... - for player_id, player in self._players.items(): - if player_id not in cur_player_ids: - await self.mass.player.remove_player(player_id) - return cur_player_ids - - async def __parse_track(self, track_details): - ''' parse track in LMS to our internal format ''' - track_url = track_details.get('url','') - if track_url.startswith('qobuz://') and 'qobuz' in self.mass.music.providers: - # qobuz track! - try: - track_id = track_url.replace('qobuz://','').replace('.flac','') - return await self.mass.music.providers['qobuz'].track(track_id) - except Exception as exc: - LOGGER.error(exc) - elif track_url.startswith('spotify://track:') and 'spotify' in self.mass.music.providers: - # spotify track! - try: - track_id = track_url.replace('spotify://track:','') - return await self.mass.music.providers['spotify'].track(track_id) - except Exception as exc: - LOGGER.error(exc) - elif track_url.startswith('http') and '/stream' in track_url: - params = urllib.parse.parse_qs(track_url.split('?')[1]) - track_id = params['track_id'][0] - provider = params['provider'][0] - return await self.mass.music.providers[provider].track(track_id) - # fallback to a generic track - track = Track() - track.name = track_details['title'] - track.duration = int(track_details['duration']) - if 'artwork_url' in track_details: - image = "http://%s:%s%s" % (self._host, self._port, track_details['artwork_url']) - track.metadata['image'] = image - return track - - async def __get_group_childs(self, group_player_id): - ''' get child players for groupplayer ''' - group_childs = [] - result = await self.__get_data('playergroup', player_id=group_player_id) - if result and 'players_loop' in result: - group_childs = [item['id'] for item in result['players_loop']] - return group_childs - - async def __lms_events(self): - # Receive events from LMS through CometD socket - while self.mass.event_loop.is_running(): - try: - last_msg_received = 0 - async with Client("http://%s:%s/cometd" % (self._host, self._port), - connection_types=ConnectionType.LONG_POLLING, - extensions=[LMSExtension()]) as client: - # subscribe - watched_players = [] - await client.subscribe("/slim/subscribe/serverstatus") - - # listen for incoming messages - async for message in client: - last_msg_received = int(time.time()) - if 'playerstatus' in message['channel']: - # player state - player_id = message['channel'].split('playerstatus/')[1] - asyncio.ensure_future(self.__process_player_details(player_id, message['data'])) - elif '/slim/serverstatus' in message['channel']: - # server state with all players - player_ids = await self.__process_serverstatus(message['data']) - for player_id in player_ids: - if player_id not in watched_players: - # subscribe to player change events - watched_players.append(player_id) - await client.subscribe("/slim/subscribe/playerstatus/%s" % player_id) - except Exception as exc: - LOGGER.exception(exc) - - async def __get_data(self, cmds:List, player_id=''): - ''' get data from api''' - if not isinstance(cmds, list): - cmds = [cmds] - cmd = [player_id, cmds] - url = "http://%s:%s/jsonrpc.js" % (self._host, self._port) - params = {"id": 1, "method": "slim.request", "params": cmd} - try: - async with self.http_session.post(url, json=params) as response: - result = await response.json() - return result['result'] - except Exception as exc: - LOGGER.exception('Error executing LMS command %s' % params) - return None - - -class LMSExtension(Extension): - ''' Extension for the custom cometd implementation of LMS''' - - async def incoming(self, payload, headers=None): - pass - - async def outgoing(self, payload, headers): - ''' override outgoing messages to fit LMS custom implementation''' - - # LMS does not need/want id for the connect and handshake message - if payload[0]['channel'] == '/meta/handshake' or payload[0]['channel'] == '/meta/connect': - del payload[0]['id'] - - # handle subscriptions - if 'subscribe' in payload[0]['channel']: - client_id = payload[0]['clientId'] - if payload[0]['subscription'] == '/slim/subscribe/serverstatus': - # append additional request data to the request - payload[0]['data'] = {'response':'/%s/slim/serverstatus' % client_id, - 'request':['', ['serverstatus', 0, 100, 'subscribe:60']]} - payload[0]['channel'] = '/slim/subscribe' - if payload[0]['subscription'].startswith('/slim/subscribe/playerstatus'): - # append additional request data to the request - player_id = payload[0]['subscription'].split('/')[-1] - payload[0]['data'] = {'response':'/%s/slim/playerstatus/%s' % (client_id, player_id), - 'request':[player_id, ["status", "-", 1, "tags:aAcCdegGijJKlostuxyRwk", "subscribe:60"]]} - payload[0]['channel'] = '/slim/subscribe' \ No newline at end of file diff --git a/music_assistant/playerproviders/pylms.py b/music_assistant/playerproviders/squeezebox.py similarity index 92% rename from music_assistant/playerproviders/pylms.py rename to music_assistant/playerproviders/squeezebox.py index 99cfb590..bbe93941 100644 --- a/music_assistant/playerproviders/pylms.py +++ b/music_assistant/playerproviders/squeezebox.py @@ -18,9 +18,9 @@ from ..constants import CONF_ENABLED def setup(mass): ''' setup the provider''' - enabled = mass.config["playerproviders"]['pylms'].get(CONF_ENABLED) + enabled = mass.config["playerproviders"]['squeezebox'].get(CONF_ENABLED) if enabled: - provider = PyLMSServer(mass) + provider = PySqueezeServer(mass) return provider return False @@ -31,14 +31,14 @@ def config_entries(): ] -class PyLMSServer(PlayerProvider): +class PySqueezeServer(PlayerProvider): ''' Python implementation of SlimProto server ''' def __init__(self, mass): super().__init__(mass) - self.prov_id = 'pylms' - self.name = 'Logitech Media Server Emulation' - self._lmsplayers = {} + self.prov_id = 'squeezebox' + self.name = 'Squeezebox' + self._squeeze_players = {} self.buffer = b'' self.last_msg_received = 0 @@ -65,25 +65,25 @@ class PyLMSServer(PlayerProvider): if self._players[player_id].state == PlayerState.Stopped: await self.__queue_play(player_id, None) else: - self._lmsplayers[player_id].unpause() + self._squeeze_players[player_id].unpause() elif cmd == 'pause': - self._lmsplayers[player_id].pause() + self._squeeze_players[player_id].pause() elif cmd == 'stop': - self._lmsplayers[player_id].stop() + self._squeeze_players[player_id].stop() elif cmd == 'next': - self._lmsplayers[player_id].next() + self._squeeze_players[player_id].next() elif cmd == 'previous': await self.__queue_previous(player_id) elif cmd == 'power' and cmd_args == 'off': - self._lmsplayers[player_id].power_off() + self._squeeze_players[player_id].power_off() elif cmd == 'power': - self._lmsplayers[player_id].power_on() + self._squeeze_players[player_id].power_on() elif cmd == 'volume': - self._lmsplayers[player_id].volume_set(try_parse_int(cmd_args)) + self._squeeze_players[player_id].volume_set(try_parse_int(cmd_args)) elif cmd == 'mute' and cmd_args == 'off': - self._lmsplayers[player_id].unmute() + self._squeeze_players[player_id].unmute() elif cmd == 'mute': - self._lmsplayers[player_id].mute() + self._squeeze_players[player_id].mute() async def play_media(self, player_id, media_items, queue_opt='play'): ''' @@ -120,8 +120,8 @@ class PyLMSServer(PlayerProvider): if len(player.queue[player_id]) >= index: track = player.queue[player_id][index] if send_flush: - self._lmsplayers[player_id].flush() - self._lmsplayers[player_id].play(track.uri) + self._squeeze_players[player_id].flush() + self._squeeze_players[player_id].play(track.uri) player.queue_index[player_id] = index async def __queue_next(self, player_id): @@ -157,7 +157,7 @@ class PyLMSServer(PlayerProvider): if not player_id: return LOGGER.debug("Event from player %s: %s - event_data: %s" %(player_id, event, str(event_data))) - lms_player = self._lmsplayers[player_id] + Squeeze_player = self._squeeze_players[player_id] if event == "next_track": return await self.__queue_next(player_id) player @@ -173,9 +173,9 @@ class PyLMSServer(PlayerProvider): else: player = self._players[player_id] # update player properties - player.name = lms_player.player_name - player.volume_level = lms_player.volume_level - player.cur_time = lms_player._elapsed_seconds + player.name = Squeeze_player.player_name + player.volume_level = Squeeze_player.volume_level + player.cur_time = Squeeze_player._elapsed_seconds if event == "disconnected": return await self.mass.player.remove_player(player_id) elif event == "power": @@ -191,38 +191,36 @@ class PyLMSServer(PlayerProvider): async def __handle_socket_client(self, reader, writer): ''' handle a client connection on the socket''' LOGGER.debug("new socket client connected") - stream_host = get_ip() - stream_port = self.mass.config['base']['web']['http_port'] - lms_player = PyLMSPlayer(stream_host, stream_port) + Squeeze_player = PySqueezePlayer(stream_host, stream_port) def send_frame(command, data): - ''' send command to lms player''' + ''' send command to Squeeze player''' packet = struct.pack('!H', len(data) + 4) + command + data writer.write(packet) def handle_event(event, event_data=None): ''' handle events from player''' if event == "connected": - self._lmsplayers[lms_player.player_id] = lms_player - lms_player.player_settings = self.mass.config['player_settings'][lms_player.player_id] - asyncio.create_task(self.__handle_player_event(lms_player.player_id, event, event_data)) + self._squeeze_players[Squeeze_player.player_id] = Squeeze_player + Squeeze_player.player_settings = self.mass.config['player_settings'][Squeeze_player.player_id] + asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, event, event_data)) try: @run_periodic(5) async def send_heartbeat(): timestamp = int(time.time()) - data = lms_player.pack_stream(b"t", replayGain=timestamp, flags=0) - lms_player.send_frame(b"strm", data) + data = Squeeze_player.pack_stream(b"t", replayGain=timestamp, flags=0) + Squeeze_player.send_frame(b"strm", data) - lms_player.send_frame = send_frame - lms_player.send_event = handle_event + Squeeze_player.send_frame = send_frame + Squeeze_player.send_event = handle_event heartbeat_task = asyncio.create_task(send_heartbeat()) # keep reading bytes from the socket while True: data = await reader.read(64) if data: - lms_player.dataReceived(data) + Squeeze_player.dataReceived(data) else: break except Exception as exc: @@ -230,11 +228,11 @@ class PyLMSServer(PlayerProvider): LOGGER.warning(exc) # disconnect heartbeat_task.cancel() - asyncio.create_task(self.__handle_player_event(lms_player.player_id, 'disconnected')) + asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, 'disconnected')) -class PyLMSPlayer(object): - ''' very basic Python implementation of SlimProto ''' +class PySqueezePlayer(Player): + ''' Squeezebox socket client ''' def __init__(self, stream_host, stream_port): self.buffer = b'' @@ -245,7 +243,7 @@ class PyLMSPlayer(object): self.stream_port = stream_port self.player_settings = {} self.playback_millis = 0 - self._volume = PyLMSVolume() + self._volume = PySqueezeVolume() self._device_type = None self._mac_address = None self._player_name = None @@ -570,7 +568,7 @@ devices = { } -class PyLMSVolume(object): +class PySqueezeVolume(object): """ Represents a sound volume. This is an awful lot more complex than it sounds. """ diff --git a/music_assistant/utils.py b/music_assistant/utils.py index e40de73d..3713253a 100755 --- a/music_assistant/utils.py +++ b/music_assistant/utils.py @@ -6,13 +6,7 @@ import logging from concurrent.futures import ThreadPoolExecutor import socket import os -logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s') -consolehandler = logging.StreamHandler() -consolehandler.setFormatter(logformat) -LOGGER = logging.getLogger(__package__) -LOGGER.setLevel(logging.INFO) -LOGGER.addHandler(consolehandler) - +LOGGER = logging.getLogger() def run_periodic(period): diff --git a/web/images/icons/pylms.png b/web/images/icons/squeezebox.png similarity index 100% rename from web/images/icons/pylms.png rename to web/images/icons/squeezebox.png -- 2.34.1