else:
datapath = os.path.dirname(os.path.abspath(__file__))
if len(sys.argv) > 2:
- debug = sys.argv[2]
+ debug = sys.argv[2] == "debug"
else:
- debug = True
+ debug = False
MusicAssistant(datapath, debug)
\ No newline at end of file
import uuid
import json
import time
+import logging
from .database import Database
from .utils import run_periodic, LOGGER, try_parse_bool
def __init__(self, datapath, debug=False):
debug = try_parse_bool(debug)
+ logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s')
+ consolehandler = logging.StreamHandler()
+ consolehandler.setFormatter(logformat)
+ LOGGER.addHandler(consolehandler)
+ if debug:
+ LOGGER.setLevel(logging.DEBUG)
+ logging.getLogger('aiosqlite').setLevel(logging.INFO)
+ logging.getLogger('asyncio').setLevel(logging.INFO)
+ else:
+ LOGGER.setLevel(logging.INFO)
uvloop.install()
self.datapath = datapath
self.parse_config()
self.event_loop = asyncio.get_event_loop()
self.event_loop.set_debug(debug)
- self.bg_executor = ThreadPoolExecutor()
+ self.bg_executor = ThreadPoolExecutor()
self.event_loop.set_default_executor(self.bg_executor)
#self.event_loop.set_exception_handler(handle_exception)
self.event_listeners = {}
# send content only on GET request
if http_request.method.upper() != 'HEAD':
# stream audio
- queue = asyncio.Queue()
+ buf_queue = asyncio.Queue()
cancelled = threading.Event()
if queue_item:
# single stream requested
+ #asyncio.create_task(self.__stream_single(player, queue_item, buf_queue, cancelled))
run_async_background_task(
self.mass.bg_executor,
- self.__stream_single, player, queue_item, queue, cancelled)
+ self.__stream_single, player, queue_item, buf_queue, cancelled)
else:
# no item is given, start queue stream
run_async_background_task(
self.mass.bg_executor,
- self.__stream_queue, player, queue, cancelled)
+ self.__stream_queue, player, buf_queue, cancelled)
try:
while True:
- chunk = await queue.get()
+ chunk = await buf_queue.get()
if not chunk:
- queue.task_done()
+ buf_queue.task_done()
break
await resp.write(chunk)
- queue.task_done()
+ buf_queue.task_done()
LOGGER.info("stream fininished for player %s" % player.name)
except asyncio.CancelledError:
cancelled.set()
async def __stream_single(self, player, queue_item, buffer, cancelled):
''' start streaming single track from provider '''
+ LOGGER.info("stream single track started for track %s on player %s" % (queue_item.name, player.name))
try:
audio_stream = self.__get_audio_stream(player, queue_item, cancelled)
async for is_last_chunk, audio_chunk in audio_stream:
asyncio.run_coroutine_threadsafe(
buffer.put(audio_chunk),
self.mass.event_loop)
- # wait for the queue to consume the data
- # this prevents that the entire track is sitting in memory
- while buffer.qsize() > 2 and not cancelled.is_set():
- await asyncio.sleep(1)
# indicate EOF if no more data
asyncio.run_coroutine_threadsafe(
buffer.put(b''),
self.mass.event_loop)
except (asyncio.CancelledError, asyncio.TimeoutError):
cancelled.set()
- LOGGER.info("stream_track interrupted for %s" % queue_item.name)
+ LOGGER.info("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
raise asyncio.CancelledError()
else:
- LOGGER.info("stream_track fininished for %s" % queue_item.name)
+ LOGGER.info("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
async def __stream_queue(self, player, buffer, cancelled):
''' start streaming all queue tracks '''
- sample_rate = player.settings['max_sample_rate']
- fade_length = player.settings["crossfade_duration"]
- fade_bytes = int(sample_rate * 4 * 2 * fade_length)
+ sample_rate = try_parse_int(player.settings['max_sample_rate'])
+ fade_length = try_parse_int(player.settings["crossfade_duration"])
+ if not sample_rate or sample_rate < 44100 or sample_rate > 384000:
+ sample_rate = 96000
+ if fade_length:
+ fade_bytes = int(sample_rate * 4 * 2 * fade_length)
+ else:
+ fade_bytes = int(sample_rate * 4 * 2)
pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
- args = 'sox -t %s - -t flac -C 0 -' % pcm_args
+ args = 'sox -V3 -t %s - -t flac -C 0 -' % pcm_args
sox_proc = await asyncio.create_subprocess_shell(args,
stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
async def fill_buffer():
- while not sox_proc.stdout.at_eof():
- chunk = await sox_proc.stdout.read(256000)
- if not chunk:
- break
+ while not sox_proc.stdout.at_eof() and not sox_proc.returncode:
+ try:
+ chunk = await sox_proc.stdout.readexactly(256000)
+ except asyncio.streams.IncompleteReadError as err:
+ chunk = err.partial
asyncio.run_coroutine_threadsafe(
buffer.put(chunk),
self.mass.event_loop)
else:
queue_track = player.queue.next_item
if not queue_track:
- LOGGER.warning("no (more) tracks left in queue")
+ LOGGER.debug("no (more) tracks left in queue")
break
- LOGGER.info("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
+ LOGGER.debug("Start Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
fade_in_part = b''
cur_chunk = 0
prev_chunk = None
bytes_written = 0
+ # handle incoming audio chunks
async for is_last_chunk, chunk in self.__get_audio_stream(
player, queue_track, cancelled, chunksize=fade_bytes, resample=sample_rate):
cur_chunk += 1
+
+ ### HANDLE FIRST PART OF TRACK
if cur_chunk <= 2 and not last_fadeout_data:
- # fade-in part but no fadeout_part available so just pass it to the output directly
+ # no fadeout_part available so just pass it to the output directly
sox_proc.stdin.write(chunk)
await sox_proc.stdin.drain()
bytes_written += len(chunk)
- print(chunk)
elif cur_chunk == 1 and last_fadeout_data:
prev_chunk = chunk
+ ### HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
elif cur_chunk == 2 and last_fadeout_data:
# combine the first 2 chunks and strip off silence
args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args)
bytes_written += len(remaining_bytes)
del remaining_bytes
prev_chunk = None # needed to prevent this chunk being sent again
+ ### HANDLE LAST PART OF TRACK
elif prev_chunk and is_last_chunk:
# last chunk received so create the fadeout_part with the previous chunk and this chunk
# and strip off silence
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
last_part, stderr = await process.communicate(prev_chunk + chunk)
- if len(last_part) < fade_bytes:
- # not enough data for crossfade duration after the strip action...
- last_part = prev_chunk + chunk
- if len(last_part) < fade_bytes:
- # still not enough data so we'll skip the crossfading
- LOGGER.warning("not enough data for fadeout so skip crossfade... %s" % len(last_part))
+ if not player.queue.crossfade_enabled:
+ # crossfading is not enabled so just pass the (stripped) audio data
sox_proc.stdin.write(last_part)
bytes_written += len(last_part)
await sox_proc.stdin.drain()
del last_part
else:
- # store fade section to be picked up for next track
- last_fadeout_data = last_part[-fade_bytes:]
- remaining_bytes = last_part[:-fade_bytes]
- # write remaining bytes
- sox_proc.stdin.write(remaining_bytes)
- bytes_written += len(remaining_bytes)
- await sox_proc.stdin.drain()
- del last_part
- del remaining_bytes
+ # handle crossfading support
+ if len(last_part) < fade_bytes:
+ # not enough data for crossfade duration after the strip action...
+ last_part = prev_chunk + chunk
+ if len(last_part) < fade_bytes:
+ # still not enough data so we'll skip the crossfading
+ LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part))
+ sox_proc.stdin.write(last_part)
+ bytes_written += len(last_part)
+ await sox_proc.stdin.drain()
+ del last_part
+ else:
+ # store fade section to be picked up for next track
+ last_fadeout_data = last_part[-fade_bytes:]
+ remaining_bytes = last_part[:-fade_bytes]
+ # write remaining bytes
+ sox_proc.stdin.write(remaining_bytes)
+ bytes_written += len(remaining_bytes)
+ await sox_proc.stdin.drain()
+ del last_part
+ del remaining_bytes
+ ### MIDDLE PARTS OF TRACK
else:
# middle part of the track
# keep previous chunk in memory so we have enough samples to perform the crossfade
# WIP: update actual duration to the queue for more accurate now playing info
accurate_duration = bytes_written / int(sample_rate * 4 * 2)
queue_track.duration = accurate_duration
- LOGGER.info("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
+ LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
# end of queue reached, pass last fadeout bits to final output
if last_fadeout_data and not cancelled.is_set():
sox_proc.stdin.write(last_fadeout_data)
await sox_proc.stdin.drain()
sox_proc.stdin.close()
+ sox_proc.terminate()
await sox_proc.wait()
LOGGER.info("streaming of queue for player %s completed" % player.name)
async def __get_audio_stream(self, player, queue_item, cancelled,
- chunksize=512000, resample=None):
+ chunksize=128000, resample=None):
''' get audio stream from provider and apply additional effects/processing where/if needed'''
# get stream details from provider
# sort by quality and check track availability
- for prov_media in sorted(queue_item.provider_ids, key=operator.itemgetter('quality'), reverse=True):
+ for prov_media in sorted(queue_item.provider_ids,
+ key=operator.itemgetter('quality'), reverse=True):
streamdetails = asyncio.run_coroutine_threadsafe(
self.mass.music.providers[prov_media['provider']].get_stream_details(prov_media['item_id']),
self.mass.event_loop).result()
queue_item.quality = prov_media['quality']
break
if not streamdetails:
- LOGGER.warning("no stream details!")
+ LOGGER.warning(f"no stream details for {queue_item.name}")
yield (True, b'')
return
- print(streamdetails)
# get sox effects and resample options
sox_effects = await self.__get_player_sox_options(player, queue_item)
outputfmt = 'flac -C 0'
# support for AAC created with ffmpeg in between
args = 'ffmpeg -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects)
elif streamdetails['type'] == 'url':
- args = 'sox -V3 -t %s "%s" -t %s - %s' % (streamdetails["content_type"],
+ args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"],
streamdetails["path"], outputfmt, sox_effects)
elif streamdetails['type'] == 'executable':
- args = '%s | sox -V3 -t %s - -t %s - %s' % (streamdetails["path"],
+ args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"],
streamdetails["content_type"], outputfmt, sox_effects)
# start sox process
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE)
+
# fire event that streaming has started for this track (needed by some streaming providers)
streamdetails["provider"] = queue_item.provider
streamdetails["track_id"] = queue_item.item_id
# we keep 1 chunk behind to detect end of stream properly
prev_chunk = b''
bytes_sent = 0
- first_chunk = False
- while not process.stdout.at_eof():
+ while not process.stdout.at_eof() and not process.returncode:
+ if cancelled.is_set():
+ try:
+ process.terminate()
+ except ProcessLookupError:
+ pass
try:
chunk = await process.stdout.readexactly(chunksize)
- except asyncio.streams.IncompleteReadError:
- chunk = await process.stdout.read(chunksize)
- if first_chunk:
- print(len(chunk))
- if not chunk:
- break
+ except asyncio.streams.IncompleteReadError as err:
+ chunk = err.partial
if prev_chunk:
yield (False, prev_chunk)
bytes_sent += len(prev_chunk)
prev_chunk = chunk
# yield last chunk
- if not cancelled.is_set():
- yield (True, prev_chunk)
- bytes_sent += len(prev_chunk)
+ yield (True, prev_chunk)
+ bytes_sent += len(prev_chunk)
await process.wait()
if cancelled.is_set():
- try:
- process.terminate()
- except ProcessLookupError:
- pass
- LOGGER.debug("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent))
+ LOGGER.info("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent))
else:
- LOGGER.debug("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent))
+ LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent))
# fire event that streaming has ended for this track (needed by some streaming providers)
if resample:
bytes_per_second = resample * (32/8) * 2
+ bytes_per_second = (resample * 32 * 2) / 8
+ seconds_streamed = int(bytes_sent/bytes_per_second)
else:
- bytes_per_second = streamdetails["sample_rate"] * (streamdetails["bit_depth"]/8) * 2
- seconds_streamed = int(bytes_sent/bytes_per_second)
+ seconds_streamed = queue_item.duration
streamdetails["seconds"] = seconds_streamed
asyncio.run_coroutine_threadsafe(
self.mass.signal_event('streaming_ended', streamdetails),
self.mass.event_loop)
# send task to background to analyse the audio
- # TODO: send audio data completely
- if not queue_item.media_type == MediaType.Radio:
- asyncio.run_coroutine_threadsafe(
- self.__analyze_audio(queue_item.item_id, queue_item.provider),
- self.mass.event_loop)
+ asyncio.run_coroutine_threadsafe(
+ self.__analyze_audio(queue_item),
+ self.mass.event_loop)
async def __get_player_sox_options(self, player, queue_item):
''' get player specific sox effect options '''
if not player.supports_replay_gain and player.settings['volume_normalisation']:
target_gain = int(player.settings['target_volume'])
fallback_gain = int(player.settings['fallback_gain_correct'])
- track_loudness = await self.mass.db.get_track_loudness(
- queue_item.item_id, queue_item.provider)
+ track_loudness = asyncio.run_coroutine_threadsafe(
+ self.mass.db.get_track_loudness(queue_item.item_id, queue_item.provider),
+ self.mass.event_loop).result()
if track_loudness == None:
gain_correct = fallback_gain
else:
sox_effects.append(player.settings['sox_effects'])
return " ".join(sox_effects)
- async def __analyze_audio(self, track_id, provider):
+ async def __analyze_audio(self, queue_item):
''' analyze track audio, for now we only calculate EBU R128 loudness '''
- track_key = '%s%s' %(track_id, provider)
- if track_key in self.analyze_jobs:
+ if queue_item.media_type != MediaType.Track:
+ # TODO: calculate loudness average for web radio ?
+ return
+ item_key = '%s%s' %(queue_item.item_id, queue_item.provider)
+ if item_key in self.analyze_jobs:
return # prevent multiple analyze jobs for same track
- self.analyze_jobs[track_key] = True
- streamdetails = await self.mass.music.providers[provider].get_stream_details(track_id)
- track_loudness = await self.mass.db.get_track_loudness(track_id, provider)
+ self.analyze_jobs[item_key] = True
+ streamdetails = queue_item.stream_details
+ track_loudness = await self.mass.db.get_track_loudness(
+ queue_item.item_id, queue_item.provider)
if track_loudness == None:
# only when needed we do the analyze stuff
- LOGGER.debug('Start analyzing track %s' % track_id)
+ LOGGER.debug('Start analyzing track %s' % item_key)
if streamdetails['type'] == 'url':
async with aiohttp.ClientSession() as session:
async with session.get(streamdetails["path"]) as resp:
meter = pyln.Meter(rate) # create BS.1770 meter
loudness = meter.integrated_loudness(data) # measure loudness
del data
- LOGGER.debug("Integrated loudness of track %s is: %s" %(track_id, loudness))
- await self.mass.db.set_track_loudness(track_id, provider, loudness)
+ LOGGER.debug("Integrated loudness of track %s is: %s" %(item_key, loudness))
+ await self.mass.db.set_track_loudness(queue_item.item_id, queue_item.provider, loudness)
del audio_data
- LOGGER.debug('Finished analyzing track %s' % track_id)
- self.analyze_jobs.pop(track_key, None)
+ LOGGER.debug('Finished analyzing track %s' % item_key)
+ self.analyze_jobs.pop(item_key, None)
async def __crossfade_pcm_parts(self, fade_in_part, fade_out_part, pcm_args, fade_length):
''' crossfade two chunks of audio using sox '''
self._player_settings = None
# public attributes
self.supports_queue = True # has native support for a queue
- self.supports_gapless = True # has native gapless support
+ self.supports_gapless = False # has native gapless support
self.supports_crossfade = False # has native crossfading support
self.supports_replay_gain = False # has native support for replaygain volume leveling
# if home assistant support is enabled, register state listener
:param offset: offset from current queue position
'''
insert_at_index = self.cur_index + offset
- if not self.items or insert_at_index >= len(self.items):
+ if not self.items or insert_at_index > len(self.items):
return await self.load(queue_items)
if self.shuffle_enabled:
queue_items = await self.__shuffle_items(queue_items)
if self.items and len(self.items) > self._last_index:
queue_index = self._last_index # holds the last starting position
queue_track = None
- while True:
+ while len(self.items) > queue_index:
queue_track = self.items[queue_index]
if cur_time_queue > (queue_track.duration + total_time):
total_time += queue_track.duration
import asyncio
import aiohttp
from typing import List
+import logging
import pychromecast
from pychromecast.controllers.multizone import MultizoneController
from pychromecast.controllers import BaseController
self.prov_id = 'chromecast'
self.name = 'Chromecast'
self._discovery_running = False
+ logging.getLogger('pychromecast').setLevel(logging.WARNING)
self.player_config_entries = [("gapless_enabled", False, "gapless_enabled")]
self.mass.event_loop.create_task(self.__periodic_chromecast_discovery())
chromecast.media_controller.register_status_listener(listenerMedia)
player = ChromecastPlayer(self.mass, player_id, self.prov_id)
player.poll_task = False
+ self.supports_queue = True
+ self.supports_gapless = False
+ self.supports_crossfade = False
+ self.supports_replay_gain = False
if chromecast.cast_type == 'group':
player.is_group = True
mz = MultizoneController(chromecast.uuid)
+++ /dev/null
-#!/usr/bin/env python3
-# -*- coding:utf-8 -*-
-
-import asyncio
-import os
-from typing import List
-import random
-import sys
-import json
-import aiohttp
-import time
-import datetime
-import hashlib
-from asyncio_throttle import Throttler
-from aiocometd import Client, ConnectionType, Extension
-import copy
-import urllib
-
-from ..cache import use_cache
-from ..utils import run_periodic, LOGGER, parse_track_title
-from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist
-from ..constants import CONF_ENABLED, CONF_HOSTNAME, CONF_PORT
-
-
-def setup(mass):
- ''' setup the provider'''
- enabled = mass.config["playerproviders"]['lms'].get(CONF_ENABLED)
- hostname = mass.config["playerproviders"]['lms'].get(CONF_HOSTNAME)
- port = mass.config["playerproviders"]['lms'].get(CONF_PORT)
- if enabled and hostname and port:
- provider = LMSProvider(mass, hostname, port)
- return provider
- return False
-
-def config_entries():
- ''' get the config entries for this provider (list with key/value pairs)'''
- return [
- (CONF_ENABLED, False, CONF_ENABLED),
- (CONF_HOSTNAME, 'localhost', CONF_HOSTNAME),
- (CONF_PORT, 9000, CONF_PORT)
- ]
-
-class LMSProvider(PlayerProvider):
- ''' support for Logitech Media Server '''
-
- def __init__(self, mass, hostname, port):
- super().__init__(mass)
- self.prov_id = 'lms'
- self.name = 'Logitech Media Server'
- self._host = hostname
- self._port = port
- self.last_msg_received = 0
- self.http_session = aiohttp.ClientSession(loop=mass.event_loop)
- # we use a combi of active polling and subscriptions because the cometd implementation of LMS is somewhat unreliable
- asyncio.ensure_future(self.__lms_events())
- asyncio.ensure_future(self.__get_players())
-
- ### Provider specific implementation #####
-
-
- async def player_command(self, player_id, cmd:str, cmd_args=None):
- ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
- lms_commands = []
- if cmd == 'play':
- lms_commands = ['play']
- elif cmd == 'pause':
- lms_commands = ['pause', '1']
- elif cmd == 'stop':
- lms_commands = ['stop']
- elif cmd == 'next':
- lms_commands = ['playlist', 'index', '+1']
- elif cmd == 'previous':
- lms_commands = ['playlist', 'index', '-1']
- elif cmd == 'stop':
- lms_commands = ['playlist', 'stop']
- elif cmd == 'power' and cmd_args == 'off':
- lms_commands = ['power', '0']
- elif cmd == 'power':
- lms_commands = ['power', '1']
- elif cmd == 'volume':
- lms_commands = ['mixer', 'volume', cmd_args]
- elif cmd == 'mute' and cmd_args == 'off':
- lms_commands = ['mixer', 'muting', '0']
- elif cmd == 'mute':
- lms_commands = ['mixer', 'muting', '1']
- return await self.__get_data(lms_commands, player_id=player_id)
-
- async def play_media(self, player_id, media_items, queue_opt='play'):
- '''
- play media on a player
- '''
- if queue_opt == 'play':
- cmd = ['playlist', 'insert', media_items[0].uri]
- await self.__get_data(cmd, player_id=player_id)
- cmd = ['playlist', 'index', '+1']
- await self.__get_data(cmd, player_id=player_id)
- for track in media_items[1:]:
- cmd = ['playlist', 'insert', track.uri]
- await self.__get_data(cmd, player_id=player_id)
- elif queue_opt == 'replace':
- cmd = ['playlist', 'play', media_items[0].uri]
- await self.__get_data(cmd, player_id=player_id)
- for track in media_items[1:]:
- cmd = ['playlist', 'add', track.uri]
- await self.__get_data(cmd, player_id=player_id)
- elif queue_opt == 'next':
- for track in media_items:
- cmd = ['playlist', 'insert', track.uri]
- await self.__get_data(cmd, player_id=player_id)
- else:
- for track in media_items:
- cmd = ['playlist', 'add', track.uri]
- await self.__get_data(cmd, player_id=player_id)
-
- async def player_queue(self, player_id, offset=0, limit=50):
- ''' return the items in the player's queue '''
- items = []
- player_details = await self.__get_data(["status", offset, limit, "tags:aAcCdegGijJKlostuxyRwk"], player_id=player_id)
- if 'playlist_loop' in player_details:
- for item in player_details['playlist_loop']:
- track = await self.__parse_track(item)
- items.append(track)
- return items
-
- ### Provider specific (helper) methods #####
-
- async def __get_players(self):
- ''' update all players, used as fallback if cometd is failing and to detect removed players'''
- server_info = await self.__get_data(['players', 0, 1000])
- player_ids = await self.__process_serverstatus(server_info)
- for player_id in player_ids:
- player_details = await self.__get_data(["status", "-","1", "tags:aAcCdegGijJKlostuxyRwk"], player_id=player_id)
- await self.__process_player_details(player_id, player_details)
-
- async def __process_player_details(self, player_id, player_details):
- ''' get state of a given player '''
- if player_id not in self._players:
- return
- player = self._players[player_id]
- volume = player_details.get('mixer volume',0)
- player.muted = volume < 0
- if volume >= 0:
- player.volume_level = player_details.get('mixer volume',0)
- player.shuffle_enabled = player_details.get('playlist shuffle',0) != 0
- player.repeat_enabled = player_details.get('playlist repeat',0) != 0
- # player state
- if 'power' in player_details:
- player.powered = player_details['power'] == 1
- else:
- print(player_details) # DEBUG
- if player_details['mode'] == 'play':
- player.state = PlayerState.Playing
- elif player_details['mode'] == 'pause':
- player.state = PlayerState.Paused
- else:
- player.state = PlayerState.Stopped
- # current track
- if player_details.get('playlist_loop'):
- player.cur_item = await self.__parse_track(player_details['playlist_loop'][0])
- player.cur_time = player_details.get('time',0)
- else:
- player.cur_item = None
- player.cur_time = 0
- await self.mass.player.update_player(player)
-
- async def __process_serverstatus(self, server_status):
- ''' process players from server state msg (players_loop) '''
- cur_player_ids = []
- for lms_player in server_status['players_loop']:
- if lms_player['isplayer'] != 1:
- continue
- player_id = lms_player['playerid']
- cur_player_ids.append(player_id)
- if not player_id in self._players:
- # new player
- self._players[player_id] = MusicPlayer()
- player = self._players[player_id]
- player.player_id = player_id
- player.player_provider = self.prov_id
- else:
- # existing player
- player = self._players[player_id]
- # always update player details that may change
- player.name = lms_player['name']
- if lms_player['model'] == "group":
- player.is_group = True
- # player is a groupplayer, retrieve childs
- group_player_child_ids = await self.__get_group_childs(player_id)
- for child_player_id in group_player_child_ids:
- if child_player_id in self._players:
- self._players[child_player_id].group_parent = player_id
- elif player.group_parent:
- # check if player parent is still correct
- group_player_child_ids = await self.__get_group_childs(player.group_parent)
- if not player_id in group_player_child_ids:
- player.group_parent = None
- # process update
- await self.mass.player.update_player(player)
- # process removed players...
- for player_id, player in self._players.items():
- if player_id not in cur_player_ids:
- await self.mass.player.remove_player(player_id)
- return cur_player_ids
-
- async def __parse_track(self, track_details):
- ''' parse track in LMS to our internal format '''
- track_url = track_details.get('url','')
- if track_url.startswith('qobuz://') and 'qobuz' in self.mass.music.providers:
- # qobuz track!
- try:
- track_id = track_url.replace('qobuz://','').replace('.flac','')
- return await self.mass.music.providers['qobuz'].track(track_id)
- except Exception as exc:
- LOGGER.error(exc)
- elif track_url.startswith('spotify://track:') and 'spotify' in self.mass.music.providers:
- # spotify track!
- try:
- track_id = track_url.replace('spotify://track:','')
- return await self.mass.music.providers['spotify'].track(track_id)
- except Exception as exc:
- LOGGER.error(exc)
- elif track_url.startswith('http') and '/stream' in track_url:
- params = urllib.parse.parse_qs(track_url.split('?')[1])
- track_id = params['track_id'][0]
- provider = params['provider'][0]
- return await self.mass.music.providers[provider].track(track_id)
- # fallback to a generic track
- track = Track()
- track.name = track_details['title']
- track.duration = int(track_details['duration'])
- if 'artwork_url' in track_details:
- image = "http://%s:%s%s" % (self._host, self._port, track_details['artwork_url'])
- track.metadata['image'] = image
- return track
-
- async def __get_group_childs(self, group_player_id):
- ''' get child players for groupplayer '''
- group_childs = []
- result = await self.__get_data('playergroup', player_id=group_player_id)
- if result and 'players_loop' in result:
- group_childs = [item['id'] for item in result['players_loop']]
- return group_childs
-
- async def __lms_events(self):
- # Receive events from LMS through CometD socket
- while self.mass.event_loop.is_running():
- try:
- last_msg_received = 0
- async with Client("http://%s:%s/cometd" % (self._host, self._port),
- connection_types=ConnectionType.LONG_POLLING,
- extensions=[LMSExtension()]) as client:
- # subscribe
- watched_players = []
- await client.subscribe("/slim/subscribe/serverstatus")
-
- # listen for incoming messages
- async for message in client:
- last_msg_received = int(time.time())
- if 'playerstatus' in message['channel']:
- # player state
- player_id = message['channel'].split('playerstatus/')[1]
- asyncio.ensure_future(self.__process_player_details(player_id, message['data']))
- elif '/slim/serverstatus' in message['channel']:
- # server state with all players
- player_ids = await self.__process_serverstatus(message['data'])
- for player_id in player_ids:
- if player_id not in watched_players:
- # subscribe to player change events
- watched_players.append(player_id)
- await client.subscribe("/slim/subscribe/playerstatus/%s" % player_id)
- except Exception as exc:
- LOGGER.exception(exc)
-
- async def __get_data(self, cmds:List, player_id=''):
- ''' get data from api'''
- if not isinstance(cmds, list):
- cmds = [cmds]
- cmd = [player_id, cmds]
- url = "http://%s:%s/jsonrpc.js" % (self._host, self._port)
- params = {"id": 1, "method": "slim.request", "params": cmd}
- try:
- async with self.http_session.post(url, json=params) as response:
- result = await response.json()
- return result['result']
- except Exception as exc:
- LOGGER.exception('Error executing LMS command %s' % params)
- return None
-
-
-class LMSExtension(Extension):
- ''' Extension for the custom cometd implementation of LMS'''
-
- async def incoming(self, payload, headers=None):
- pass
-
- async def outgoing(self, payload, headers):
- ''' override outgoing messages to fit LMS custom implementation'''
-
- # LMS does not need/want id for the connect and handshake message
- if payload[0]['channel'] == '/meta/handshake' or payload[0]['channel'] == '/meta/connect':
- del payload[0]['id']
-
- # handle subscriptions
- if 'subscribe' in payload[0]['channel']:
- client_id = payload[0]['clientId']
- if payload[0]['subscription'] == '/slim/subscribe/serverstatus':
- # append additional request data to the request
- payload[0]['data'] = {'response':'/%s/slim/serverstatus' % client_id,
- 'request':['', ['serverstatus', 0, 100, 'subscribe:60']]}
- payload[0]['channel'] = '/slim/subscribe'
- if payload[0]['subscription'].startswith('/slim/subscribe/playerstatus'):
- # append additional request data to the request
- player_id = payload[0]['subscription'].split('/')[-1]
- payload[0]['data'] = {'response':'/%s/slim/playerstatus/%s' % (client_id, player_id),
- 'request':[player_id, ["status", "-", 1, "tags:aAcCdegGijJKlostuxyRwk", "subscribe:60"]]}
- payload[0]['channel'] = '/slim/subscribe'
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env python3
-# -*- coding:utf-8 -*-
-
-import asyncio
-import os
-import struct
-from collections import OrderedDict
-import time
-import decimal
-from typing import List
-import random
-import sys
-import socket
-from ..utils import run_periodic, LOGGER, parse_track_title, try_parse_int, get_ip, get_hostname
-from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist
-from ..constants import CONF_ENABLED
-
-
-def setup(mass):
- ''' setup the provider'''
- enabled = mass.config["playerproviders"]['pylms'].get(CONF_ENABLED)
- if enabled:
- provider = PyLMSServer(mass)
- return provider
- return False
-
-def config_entries():
- ''' get the config entries for this provider (list with key/value pairs)'''
- return [
- (CONF_ENABLED, True, CONF_ENABLED)
- ]
-
-
-class PyLMSServer(PlayerProvider):
- ''' Python implementation of SlimProto server '''
-
- def __init__(self, mass):
- super().__init__(mass)
- self.prov_id = 'pylms'
- self.name = 'Logitech Media Server Emulation'
- self._lmsplayers = {}
- self.buffer = b''
- self.last_msg_received = 0
-
- # start slimproto server
- mass.event_loop.create_task(asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483))
- # setup discovery
- mass.event_loop.create_task(self.start_discovery())
-
- ### Provider specific implementation #####
-
- async def start_discovery(self):
- transport, protocol = await self.mass.event_loop.create_datagram_endpoint(
- lambda: DiscoveryProtocol(self.mass.web._http_port),
- local_addr=('0.0.0.0', 3483))
- try:
- while True:
- await asyncio.sleep(60) # serve forever
- finally:
- transport.close()
-
- async def player_command(self, player_id, cmd:str, cmd_args=None):
- ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
- if cmd == 'play':
- if self._players[player_id].state == PlayerState.Stopped:
- await self.__queue_play(player_id, None)
- else:
- self._lmsplayers[player_id].unpause()
- elif cmd == 'pause':
- self._lmsplayers[player_id].pause()
- elif cmd == 'stop':
- self._lmsplayers[player_id].stop()
- elif cmd == 'next':
- self._lmsplayers[player_id].next()
- elif cmd == 'previous':
- await self.__queue_previous(player_id)
- elif cmd == 'power' and cmd_args == 'off':
- self._lmsplayers[player_id].power_off()
- elif cmd == 'power':
- self._lmsplayers[player_id].power_on()
- elif cmd == 'volume':
- self._lmsplayers[player_id].volume_set(try_parse_int(cmd_args))
- elif cmd == 'mute' and cmd_args == 'off':
- self._lmsplayers[player_id].unmute()
- elif cmd == 'mute':
- self._lmsplayers[player_id].mute()
-
- async def play_media(self, player_id, media_items, queue_opt='play'):
- '''
- play media on a player
- '''
- player = await self.get_player(player_id)
- cur_index = player.cur_queue_index
-
- if queue_opt == 'replace' or not player.queue:
- # overwrite queue with new items
- player.queue = media_items
- await self.__queue_play(player_id, 0, send_flush=True)
- elif queue_opt == 'play':
- # replace current item with new item(s)
- player.queue = player.queue[player_id][:cur_index] + media_items + player.queue[player_id][cur_index+1:]
- await self.__queue_play(player_id, cur_index, send_flush=True)
- elif queue_opt == 'next':
- # insert new items at current index +1
- player.queue[player_id] = player.queue[player_id][:cur_index+1] + media_items + player.queue[player_id][cur_index+1:]
- elif queue_opt == 'add':
- # add new items at end of queue
- player.queue[player_id] = player.queue[player_id] + media_items
-
- ### Provider specific (helper) methods #####
-
- async def __queue_play(self, player_id, index, send_flush=False):
- ''' send play command to player '''
- if not player_id in player.queue or not player_id in player.queue_index:
- return
- if not player.queue[player_id]:
- return
- if index == None:
- index = player.queue_index[player_id]
- if len(player.queue[player_id]) >= index:
- track = player.queue[player_id][index]
- if send_flush:
- self._lmsplayers[player_id].flush()
- self._lmsplayers[player_id].play(track.uri)
- player.queue_index[player_id] = index
-
- async def __queue_next(self, player_id):
- ''' request next track from queue '''
- if not player_id in player.queue or not player_id in player.queue:
- return
- cur_queue_index = player.queue_index[player_id]
- if len(player.queue[player_id]) > cur_queue_index:
- new_queue_index = cur_queue_index + 1
- elif self._players[player_id].repeat_enabled:
- new_queue_index = 0
- else:
- LOGGER.warning("next track requested but no more tracks in queue")
- return
- return await self.__queue_play(player_id, new_queue_index)
-
- async def __queue_previous(self, player_id):
- ''' request previous track from queue '''
- if not player_id in player.queue:
- return
- cur_queue_index = player.queue_index[player_id]
- if cur_queue_index == 0 and len(player.queue[player_id]) > 1:
- new_queue_index = len(player.queue[player_id]) -1
- elif cur_queue_index == 0:
- new_queue_index = cur_queue_index
- else:
- new_queue_index -= 1
- player.queue_index[player_id] = new_queue_index
- return await self.__queue_play(player_id, new_queue_index)
-
- async def __handle_player_event(self, player_id, event, event_data=None):
- ''' handle event from player '''
- if not player_id:
- return
- LOGGER.debug("Event from player %s: %s - event_data: %s" %(player_id, event, str(event_data)))
- lms_player = self._lmsplayers[player_id]
- if event == "next_track":
- return await self.__queue_next(player_id)
- player
- if not player_id in self._players:
- player = MusicPlayer()
- player.player_id = player_id
- player.player_provider = self.prov_id
- self._players[player_id] = player
- if not player_id in player.queue:
- player.queue[player_id] = []
- if not player_id in player.queue_index:
- player.queue_index[player_id] = 0
- else:
- player = self._players[player_id]
- # update player properties
- player.name = lms_player.player_name
- player.volume_level = lms_player.volume_level
- player.cur_time = lms_player._elapsed_seconds
- if event == "disconnected":
- return await self.mass.player.remove_player(player_id)
- elif event == "power":
- player.powered = event_data
- elif event == "state":
- player.state = event_data
- if player.queue[player_id]:
- cur_queue_index = player.queue_index[player_id]
- player.cur_item = player.queue[player_id][cur_queue_index]
- # update player details
- await self.mass.player.update_player(player)
-
- async def __handle_socket_client(self, reader, writer):
- ''' handle a client connection on the socket'''
- LOGGER.debug("new socket client connected")
- stream_host = get_ip()
- stream_port = self.mass.config['base']['web']['http_port']
- lms_player = PyLMSPlayer(stream_host, stream_port)
-
- def send_frame(command, data):
- ''' send command to lms player'''
- packet = struct.pack('!H', len(data) + 4) + command + data
- writer.write(packet)
-
- def handle_event(event, event_data=None):
- ''' handle events from player'''
- if event == "connected":
- self._lmsplayers[lms_player.player_id] = lms_player
- lms_player.player_settings = self.mass.config['player_settings'][lms_player.player_id]
- asyncio.create_task(self.__handle_player_event(lms_player.player_id, event, event_data))
-
- try:
- @run_periodic(5)
- async def send_heartbeat():
- timestamp = int(time.time())
- data = lms_player.pack_stream(b"t", replayGain=timestamp, flags=0)
- lms_player.send_frame(b"strm", data)
-
- lms_player.send_frame = send_frame
- lms_player.send_event = handle_event
- heartbeat_task = asyncio.create_task(send_heartbeat())
-
- # keep reading bytes from the socket
- while True:
- data = await reader.read(64)
- if data:
- lms_player.dataReceived(data)
- else:
- break
- except Exception as exc:
- # connection lost ?
- LOGGER.warning(exc)
- # disconnect
- heartbeat_task.cancel()
- asyncio.create_task(self.__handle_player_event(lms_player.player_id, 'disconnected'))
-
-
-class PyLMSPlayer(object):
- ''' very basic Python implementation of SlimProto '''
-
- def __init__(self, stream_host, stream_port):
- self.buffer = b''
- #self.display = Display()
- self.send_frame = None
- self.send_event = None
- self.stream_host = stream_host
- self.stream_port = stream_port
- self.player_settings = {}
- self.playback_millis = 0
- self._volume = PyLMSVolume()
- self._device_type = None
- self._mac_address = None
- self._player_name = None
- self._last_volume = 0
- self._last_heartbeat = 0
- self._elapsed_seconds = 0
- self._elapsed_milliseconds = 0
-
- @property
- def player_name(self):
- if self._player_name:
- return self._player_name
- return "%s - %s" %(self._device_type, self._mac_address)
-
- @property
- def player_id(self):
- return self._mac_address
-
- @property
- def volume_level(self):
- return self._volume.volume
-
- def dataReceived(self, data):
- self.buffer = self.buffer + data
- if len(self.buffer) > 8:
- operation, length = self.buffer[:4], self.buffer[4:8]
- length = struct.unpack('!I', length)[0]
- plen = length + 8
- if len(self.buffer) >= plen:
- packet, self.buffer = self.buffer[8:plen], self.buffer[plen:]
- operation = operation.strip(b"!").strip().decode()
- #LOGGER.info("operation: %s" % operation)
- handler = getattr(self, "process_%s" % operation, None)
- if handler is None:
- raise NotImplementedError
- handler(packet)
-
- def send_version(self):
- self.send_frame(b'vers', b'7.8')
-
- def pack_stream(self, command, autostart=b"1", formatbyte = b'o', pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200,
- spdif = b'0', transDuration = 0, transType = b'0', flags = 0x40, outputThreshold = 0,
- replayGain=0, serverPort = 8095, serverIp = 0):
- return struct.pack("!cccccccBcBcBBBLHL",
- command, autostart, formatbyte, *pcmargs,
- threshold, spdif, transDuration, transType,
- flags, outputThreshold, 0, replayGain, serverPort, serverIp)
-
- def stop(self):
- data = self.pack_stream(b"q", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
-
- def flush(self):
- data = self.pack_stream(b"f", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
-
- def pause(self):
- data = self.pack_stream(b"p", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
- LOGGER.info("Sending pause request")
-
- def unpause(self):
- data = self.pack_stream(b"u", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
- LOGGER.info("Sending unpause request")
-
- def next(self):
- data = self.pack_stream(b"f", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
- self.send_event("next_track")
-
- def previous(self):
- data = self.pack_stream(b"f", autostart=b"0", flags=0)
- self.send_frame(b"strm", data)
- self.send_event("previous_track")
-
- def power_on(self):
- self.send_frame(b"aude", struct.pack("2B", 1, 1))
- self.send_event("power", True)
-
- def power_off(self):
- self.stop()
- self.send_frame(b"aude", struct.pack("2B", 0, 0))
- self.send_event("power", False)
-
- def mute_on(self):
- self.send_frame(b"aude", struct.pack("2B", 0, 0))
- self.send_event("mute", True)
-
- def mute_off(self):
- self.send_frame(b"aude", struct.pack("2B", 1, 1))
- self.send_event("mute", False)
-
- def volume_up(self):
- self._volume.increment()
- self.send_volume()
-
- def volume_down(self):
- self._volume.decrement()
- self.send_volume()
-
- def volume_set(self, new_vol):
- self._volume.volume = new_vol
- self.send_volume()
-
- def play(self, uri):
- enable_crossfade = self.player_settings["crossfade_duration"] > 0
- command = b's'
- autostart = b'3' # we use direct stream for now so let the player do the messy work with buffers
- transType= b'1' if enable_crossfade else b'0'
- transDuration = self.player_settings["crossfade_duration"]
- formatbyte = b'f' # fixed to flac
- uri = '/stream' + uri.split('/stream')[1]
- data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration)
- headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.stream_host, self.stream_port)
- request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers)
- data = data + request.encode("utf-8")
- self.send_frame(b'strm', data)
- LOGGER.info("Requesting play from squeezebox" )
-
- def displayTrack(self, track):
- self.render("%s by %s" % (track.title, track.artist))
-
- def process_HELO(self, data):
- (devId, rev, mac) = struct.unpack('BB6s', data[:8])
- device_mac = ':'.join("%02x" % x for x in mac)
- self._device_type = devices.get(devId, 'unknown device')
- self._mac_address = str(device_mac).lower()
- LOGGER.debug("HELO received from %s %s" % (self._mac_address, self._device_type))
- self.init_client()
-
- def init_client(self):
- ''' initialize a new connected client '''
- self.send_event("connected")
- self.send_version()
- self.stop()
- self.setBrightness()
- #self.set_visualisation(SpectrumAnalyser())
- self.send_frame(b"setd", struct.pack("B", 0))
- self.send_frame(b"setd", struct.pack("B", 4))
- self.power_on()
- self.volume_set(40) # TODO: remember last volume
-
- def send_volume(self):
- og = self._volume.old_gain()
- ng = self._volume.new_gain()
- LOGGER.info("Volume set to %d (%d/%d)" % (self._volume.volume, og, ng))
- d = self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng))
- self.send_event("volume", self._volume.volume)
-
- def setBrightness(self, level=4):
- assert 0 <= level <= 4
- self.send_frame(b"grfb", struct.pack("!H", level))
-
- def set_visualisation(self, visualisation):
- self.send_frame(b"visu", visualisation.pack())
-
- def render(self, text):
- #self.display.clear()
- #self.display.renderText(text, "DejaVu-Sans", 16, (0,0))
- #self.updateDisplay(self.display.frame())
- pass
-
- def updateDisplay(self, bitmap, transition = 'c', offset=0, param=0):
- frame = struct.pack("!Hcb", offset, transition, param) + bitmap
- self.send_frame(b"grfe", frame)
-
- def process_STAT(self, data):
- ev = data[:4]
- if ev == b'\x00\x00\x00\x00':
- LOGGER.info("Presumed informational stat message")
- else:
- handler = getattr(self, 'stat_%s' % ev.decode(), None)
- if handler is None:
- raise NotImplementedError("Stat message %r not known" % ev)
- handler(data[4:])
-
- def stat_aude(self, data):
- (spdif_enable, dac_enable) = struct.unpack("2B", data[:4])
- powered = spdif_enable or dac_enable
- self.send_event("power", powered)
- LOGGER.debug("ACK aude - Received player power: %s" % powered)
-
- def stat_audg(self, data):
- LOGGER.info("Received volume_level from player %s" % data)
- self.send_event("volume", self._volume.volume)
-
- def stat_strm(self, data):
- LOGGER.debug("ACK strm")
- #self.send_frame(b"cont", b"0")
-
- def stat_STMc(self, data):
- LOGGER.debug("Status Message: Connect")
-
- def stat_STMd(self, data):
- LOGGER.debug("Decoder Ready for next track")
- self.send_event("next_track")
-
- def stat_STMe(self, data):
- LOGGER.info("Connection established")
-
- def stat_STMf(self, data):
- LOGGER.info("Status Message: Connection closed")
- self.send_event("state", PlayerState.Stopped)
-
- def stat_STMh(self, data):
- LOGGER.info("Status Message: End of headers")
-
- def stat_STMn(self, data):
- LOGGER.error("Decoder does not support file format")
-
- def stat_STMo(self, data):
- ''' No more decoded (uncompressed) data to play; triggers rebuffering. '''
- LOGGER.debug("Output Underrun")
-
- def stat_STMp(self, data):
- '''Pause confirmed'''
- self.send_event("state", PlayerState.Paused)
-
- def stat_STMr(self, data):
- '''Resume confirmed'''
- self.send_event("state", PlayerState.Playing)
-
- def stat_STMs(self, data):
- '''Playback of new track has started'''
- self.send_event("state", PlayerState.Playing)
-
- def stat_STMt(self, data):
- """ heartbeat from client """
- timestamp = time.time()
- self._last_heartbeat = timestamp
- (num_crlf, mas_initialized, mas_mode, rptr, wptr,
- bytes_received_h, bytes_received_l, signal_strength,
- jiffies, output_buffer_size, output_buffer_fullness,
- elapsed_seconds, voltage, elapsed_milliseconds,
- server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data)
- if elapsed_seconds != self._elapsed_seconds:
- self.send_event("progress")
- self._elapsed_seconds = elapsed_seconds
- self._elapsed_milliseconds = elapsed_milliseconds
-
- def stat_STMu(self, data):
- '''Normal end of playback'''
- LOGGER.info("End of playback - Underrun")
- self.send_event("state", PlayerState.Stopped)
-
- def process_BYE(self, data):
- LOGGER.info("BYE received")
- self.send_event("disconnected")
-
- def process_RESP(self, data):
- LOGGER.info("RESP received")
- self.send_frame(b"cont", b"0")
-
- def process_BODY(self, data):
- LOGGER.info("BODY received")
-
- def process_META(self, data):
- LOGGER.info("META received")
-
- def process_DSCO(self, data):
- LOGGER.info("Data Stream Disconnected")
-
- def process_DBUG(self, data):
- LOGGER.info("DBUG received")
-
- def process_IR(self, data):
- """ Slightly involved codepath here. This raises an event, which may
- be picked up by the service and then the process_remote_* function in
- this player will be called. This is mostly relevant for volume changes
- - most other button presses will require some context to operate. """
- (time, code) = struct.unpack("!IxxI", data)
- LOGGER.info("IR code %s" % code)
- # command = Remote.codes.get(code, None)
- # if command is not None:
- # LOGGER.info("IR received: %r, %r" % (code, command))
- # #self.service.evreactor.fireEvent(RemoteButtonPressed(self, command))
- # else:
- # LOGGER.info("Unknown IR received: %r, %r" % (time, code))
-
- def process_RAWI(self, data):
- LOGGER.info("RAWI received")
-
- def process_ANIC(self, data):
- LOGGER.info("ANIC received")
-
- def process_BUTN(self, data):
- LOGGER.info("BUTN received")
-
- def process_KNOB(self, data):
- ''' Transporter only, knob-related '''
- LOGGER.info("KNOB received")
-
- def process_SETD(self, data):
- ''' Get/set player firmware settings '''
- LOGGER.debug("SETD received %s" % data)
- cmd_id = data[0]
- if cmd_id == 0:
- # received player name
- data = data[1:].decode()
- self._player_name = data
- self.send_event("name")
-
- def process_UREQ(self, data):
- LOGGER.info("UREQ received")
-
-
-
-# from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO
-devices = {
- 2: 'squeezebox',
- 3: 'softsqueeze',
- 4: 'squeezebox2',
- 5: 'transporter',
- 6: 'softsqueeze3',
- 7: 'receiver',
- 8: 'squeezeslave',
- 9: 'controller',
- 10: 'boom',
- 11: 'softboom',
- 12: 'squeezeplay',
- }
-
-
-class PyLMSVolume(object):
-
- """ Represents a sound volume. This is an awful lot more complex than it
- sounds. """
-
- minimum = 0
- maximum = 100
- step = 1
-
- # this map is taken from Slim::Player::Squeezebox2 in the squeezecenter source
- # i don't know how much magic it contains, or any way I can test it
- old_map = [
- 0, 1, 1, 1, 2, 2, 2, 3, 3, 4,
- 5, 5, 6, 6, 7, 8, 9, 9, 10, 11,
- 12, 13, 14, 15, 16, 16, 17, 18, 19, 20,
- 22, 23, 24, 25, 26, 27, 28, 29, 30, 32,
- 33, 34, 35, 37, 38, 39, 40, 42, 43, 44,
- 46, 47, 48, 50, 51, 53, 54, 56, 57, 59,
- 60, 61, 63, 65, 66, 68, 69, 71, 72, 74,
- 75, 77, 79, 80, 82, 84, 85, 87, 89, 90,
- 92, 94, 96, 97, 99, 101, 103, 104, 106, 108, 110,
- 112, 113, 115, 117, 119, 121, 123, 125, 127, 128
- ];
-
- # new gain parameters, from the same place
- total_volume_range = -50 # dB
- step_point = -1 # Number of steps, up from the bottom, where a 2nd volume ramp kicks in.
- step_fraction = 1 # fraction of totalVolumeRange where alternate volume ramp kicks in.
-
- def __init__(self):
- self.volume = 50
-
- def increment(self):
- """ Increment the volume """
- self.volume += self.step
- if self.volume > self.maximum:
- self.volume = self.maximum
-
- def decrement(self):
- """ Decrement the volume """
- self.volume -= self.step
- if self.volume < self.minimum:
- self.volume = self.minimum
-
- def old_gain(self):
- """ Return the "Old" gain value as required by the squeezebox """
- return self.old_map[self.volume]
-
- def decibels(self):
- """ Return the "new" gain value. """
-
- step_db = self.total_volume_range * self.step_fraction
- max_volume_db = 0 # different on the boom?
-
- # Equation for a line:
- # y = mx+b
- # y1 = mx1+b, y2 = mx2+b.
- # y2-y1 = m(x2 - x1)
- # y2 = m(x2 - x1) + y1
- slope_high = max_volume_db - step_db / (100.0 - self.step_point)
- slope_low = step_db - self.total_volume_range / (self.step_point - 0.0)
- x2 = self.volume
- if (x2 > self.step_point):
- m = slope_high
- x1 = 100
- y1 = max_volume_db
- else:
- m = slope_low
- x1 = 0
- y1 = self.total_volume_range
- return m * (x2 - x1) + y1
-
- def new_gain(self):
- db = self.decibels()
- floatmult = 10 ** (db/20.0)
- # avoid rounding errors somehow
- if -30 <= db <= 0:
- return int(floatmult * (1 << 8) + 0.5) * (1<<8)
- else:
- return int((floatmult * (1<<16)) + 0.5)
-
-
-##### UDP DISCOVERY STUFF #############
-
-class Datagram(object):
-
- @classmethod
- def decode(self, data):
- if data[0] == 'e':
- return TLVDiscoveryRequestDatagram(data)
- elif data[0] == 'E':
- return TLVDiscoveryResponseDatagram(data)
- elif data[0] == 'd':
- return ClientDiscoveryDatagram(data)
- elif data[0] == 'h':
- pass # Hello!
- elif data[0] == 'i':
- pass # IR
- elif data[0] == '2':
- pass # i2c?
- elif data[0] == 'a':
- pass # ack!
-
-class ClientDiscoveryDatagram(Datagram):
-
- device = None
- firmware = None
- client = None
-
- def __init__(self, data):
- s = struct.unpack('!cxBB8x6B', data.encode())
- assert s[0] == 'd'
- self.device = s[1]
- self.firmware = hex(s[2])
- self.client = ":".join(["%02x" % (x,) for x in s[3:]])
-
- def __repr__(self):
- return "<%s device=%r firmware=%r client=%r>" % (self.__class__.__name__, self.device, self.firmware, self.client)
-
-class DiscoveryResponseDatagram(Datagram):
-
- def __init__(self, hostname, port):
- hostname = hostname[:16].encode("UTF-8")
- hostname += (16 - len(hostname)) * '\x00'
- self.packet = struct.pack('!c16s', 'D', hostname).decode()
-
-class TLVDiscoveryRequestDatagram(Datagram):
-
- def __init__(self, data):
- requestdata = OrderedDict()
- assert data[0] == 'e'
- idx = 1
- length = len(data)-5
- while idx <= length:
- typ, l = struct.unpack_from("4sB", data.encode(), idx)
- if l:
- val = data[idx+5:idx+5+l]
- idx += 5+l
- else:
- val = None
- idx += 5
- typ = typ.decode()
- requestdata[typ] = val
- self.data = requestdata
-
- def __repr__(self):
- return "<%s data=%r>" % (self.__class__.__name__, self.data.items())
-
-class TLVDiscoveryResponseDatagram(Datagram):
-
- def __init__(self, responsedata):
- parts = ['E'] # new discovery format
- for typ, value in responsedata.items():
- if value is None:
- value = ''
- elif len(value) > 255:
- LOGGER.warning("Response %s too long, truncating to 255 bytes" % typ)
- value = value[:255]
- parts.extend((typ, chr(len(value)), value))
- self.packet = ''.join(parts)
-
-class DiscoveryProtocol():
-
- def __init__(self, web_port):
- self.web_port = web_port
-
- def connection_made(self, transport):
- self.transport = transport
- # Allow receiving multicast broadcasts
- sock = self.transport.get_extra_info('socket')
- group = socket.inet_aton('239.255.255.250')
- mreq = struct.pack('4sL', group, socket.INADDR_ANY)
- sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
-
- def build_TLV_response(self, requestdata):
- responsedata = OrderedDict()
- for typ, value in requestdata.items():
- if typ == 'NAME':
- # send full host name - no truncation
- value = get_hostname()
- elif typ == 'IPAD':
- # send ipaddress as a string only if it is set
- value = get_ip()
- # :todo: IPv6
- if value == '0.0.0.0':
- # do not send back an ip address
- typ = None
- elif typ == 'JSON':
- # send port as a string
- json_port = self.web_port
- value = str(json_port)
- elif typ == 'VERS':
- # send server version
- value = '7.9'
- elif typ == 'UUID':
- # send server uuid
- value = 'musicassistant'
- else:
- LOGGER.debug('Unexpected information request: %r', typ)
- typ = None
- if typ:
- responsedata[typ] = value
- return responsedata
-
- def datagram_received(self, data, addr):
- try:
- data = data.decode()
- dgram = Datagram.decode(data)
- LOGGER.debug("Data received from %s: %s" % (addr, dgram))
- if isinstance(dgram, ClientDiscoveryDatagram):
- self.sendDiscoveryResponse(addr)
- elif isinstance(dgram, TLVDiscoveryRequestDatagram):
- resonsedata = self.build_TLV_response(dgram.data)
- self.sendTLVDiscoveryResponse(resonsedata, addr)
- except Exception as exc:
- LOGGER.exception(exc)
-
- def sendDiscoveryResponse(self, addr):
- dgram = DiscoveryResponseDatagram(get_hostname(), 3483)
- LOGGER.debug("Sending discovery response %r" % (dgram.packet,))
- self.transport.sendto(dgram.packet.encode(), addr)
-
- def sendTLVDiscoveryResponse(self, resonsedata, addr):
- dgram = TLVDiscoveryResponseDatagram(resonsedata)
- LOGGER.debug("Sending discovery response %r" % (dgram.packet,))
- self.transport.sendto(dgram.packet.encode(), addr)
-
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+
+import asyncio
+import os
+import struct
+from collections import OrderedDict
+import time
+import decimal
+from typing import List
+import random
+import sys
+import socket
+from ..utils import run_periodic, LOGGER, parse_track_title, try_parse_int, get_ip, get_hostname
+from ..models import PlayerProvider, Player, PlayerState, MediaType, TrackQuality, AlbumType, Artist, Album, Track, Playlist
+from ..constants import CONF_ENABLED
+
+
+def setup(mass):
+ ''' setup the provider'''
+ enabled = mass.config["playerproviders"]['squeezebox'].get(CONF_ENABLED)
+ if enabled:
+ provider = PySqueezeServer(mass)
+ return provider
+ return False
+
+def config_entries():
+ ''' get the config entries for this provider (list with key/value pairs)'''
+ return [
+ (CONF_ENABLED, True, CONF_ENABLED)
+ ]
+
+
+class PySqueezeServer(PlayerProvider):
+ ''' Python implementation of SlimProto server '''
+
+ def __init__(self, mass):
+ super().__init__(mass)
+ self.prov_id = 'squeezebox'
+ self.name = 'Squeezebox'
+ self._squeeze_players = {}
+ self.buffer = b''
+ self.last_msg_received = 0
+
+ # start slimproto server
+ mass.event_loop.create_task(asyncio.start_server(self.__handle_socket_client, '0.0.0.0', 3483))
+ # setup discovery
+ mass.event_loop.create_task(self.start_discovery())
+
+ ### Provider specific implementation #####
+
+ async def start_discovery(self):
+ transport, protocol = await self.mass.event_loop.create_datagram_endpoint(
+ lambda: DiscoveryProtocol(self.mass.web._http_port),
+ local_addr=('0.0.0.0', 3483))
+ try:
+ while True:
+ await asyncio.sleep(60) # serve forever
+ finally:
+ transport.close()
+
+ async def player_command(self, player_id, cmd:str, cmd_args=None):
+ ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
+ if cmd == 'play':
+ if self._players[player_id].state == PlayerState.Stopped:
+ await self.__queue_play(player_id, None)
+ else:
+ self._squeeze_players[player_id].unpause()
+ elif cmd == 'pause':
+ self._squeeze_players[player_id].pause()
+ elif cmd == 'stop':
+ self._squeeze_players[player_id].stop()
+ elif cmd == 'next':
+ self._squeeze_players[player_id].next()
+ elif cmd == 'previous':
+ await self.__queue_previous(player_id)
+ elif cmd == 'power' and cmd_args == 'off':
+ self._squeeze_players[player_id].power_off()
+ elif cmd == 'power':
+ self._squeeze_players[player_id].power_on()
+ elif cmd == 'volume':
+ self._squeeze_players[player_id].volume_set(try_parse_int(cmd_args))
+ elif cmd == 'mute' and cmd_args == 'off':
+ self._squeeze_players[player_id].unmute()
+ elif cmd == 'mute':
+ self._squeeze_players[player_id].mute()
+
+ async def play_media(self, player_id, media_items, queue_opt='play'):
+ '''
+ play media on a player
+ '''
+ player = await self.get_player(player_id)
+ cur_index = player.cur_queue_index
+
+ if queue_opt == 'replace' or not player.queue:
+ # overwrite queue with new items
+ player.queue = media_items
+ await self.__queue_play(player_id, 0, send_flush=True)
+ elif queue_opt == 'play':
+ # replace current item with new item(s)
+ player.queue = player.queue[player_id][:cur_index] + media_items + player.queue[player_id][cur_index+1:]
+ await self.__queue_play(player_id, cur_index, send_flush=True)
+ elif queue_opt == 'next':
+ # insert new items at current index +1
+ player.queue[player_id] = player.queue[player_id][:cur_index+1] + media_items + player.queue[player_id][cur_index+1:]
+ elif queue_opt == 'add':
+ # add new items at end of queue
+ player.queue[player_id] = player.queue[player_id] + media_items
+
+ ### Provider specific (helper) methods #####
+
+ async def __queue_play(self, player_id, index, send_flush=False):
+ ''' send play command to player '''
+ if not player_id in player.queue or not player_id in player.queue_index:
+ return
+ if not player.queue[player_id]:
+ return
+ if index == None:
+ index = player.queue_index[player_id]
+ if len(player.queue[player_id]) >= index:
+ track = player.queue[player_id][index]
+ if send_flush:
+ self._squeeze_players[player_id].flush()
+ self._squeeze_players[player_id].play(track.uri)
+ player.queue_index[player_id] = index
+
+ async def __queue_next(self, player_id):
+ ''' request next track from queue '''
+ if not player_id in player.queue or not player_id in player.queue:
+ return
+ cur_queue_index = player.queue_index[player_id]
+ if len(player.queue[player_id]) > cur_queue_index:
+ new_queue_index = cur_queue_index + 1
+ elif self._players[player_id].repeat_enabled:
+ new_queue_index = 0
+ else:
+ LOGGER.warning("next track requested but no more tracks in queue")
+ return
+ return await self.__queue_play(player_id, new_queue_index)
+
+ async def __queue_previous(self, player_id):
+ ''' request previous track from queue '''
+ if not player_id in player.queue:
+ return
+ cur_queue_index = player.queue_index[player_id]
+ if cur_queue_index == 0 and len(player.queue[player_id]) > 1:
+ new_queue_index = len(player.queue[player_id]) -1
+ elif cur_queue_index == 0:
+ new_queue_index = cur_queue_index
+ else:
+ new_queue_index -= 1
+ player.queue_index[player_id] = new_queue_index
+ return await self.__queue_play(player_id, new_queue_index)
+
+ async def __handle_player_event(self, player_id, event, event_data=None):
+ ''' handle event from player '''
+ if not player_id:
+ return
+ LOGGER.debug("Event from player %s: %s - event_data: %s" %(player_id, event, str(event_data)))
+ Squeeze_player = self._squeeze_players[player_id]
+ if event == "next_track":
+ return await self.__queue_next(player_id)
+ player
+ if not player_id in self._players:
+ player = MusicPlayer()
+ player.player_id = player_id
+ player.player_provider = self.prov_id
+ self._players[player_id] = player
+ if not player_id in player.queue:
+ player.queue[player_id] = []
+ if not player_id in player.queue_index:
+ player.queue_index[player_id] = 0
+ else:
+ player = self._players[player_id]
+ # update player properties
+ player.name = Squeeze_player.player_name
+ player.volume_level = Squeeze_player.volume_level
+ player.cur_time = Squeeze_player._elapsed_seconds
+ if event == "disconnected":
+ return await self.mass.player.remove_player(player_id)
+ elif event == "power":
+ player.powered = event_data
+ elif event == "state":
+ player.state = event_data
+ if player.queue[player_id]:
+ cur_queue_index = player.queue_index[player_id]
+ player.cur_item = player.queue[player_id][cur_queue_index]
+ # update player details
+ await self.mass.player.update_player(player)
+
+ async def __handle_socket_client(self, reader, writer):
+ ''' handle a client connection on the socket'''
+ LOGGER.debug("new socket client connected")
+ Squeeze_player = PySqueezePlayer(stream_host, stream_port)
+
+ def send_frame(command, data):
+ ''' send command to Squeeze player'''
+ packet = struct.pack('!H', len(data) + 4) + command + data
+ writer.write(packet)
+
+ def handle_event(event, event_data=None):
+ ''' handle events from player'''
+ if event == "connected":
+ self._squeeze_players[Squeeze_player.player_id] = Squeeze_player
+ Squeeze_player.player_settings = self.mass.config['player_settings'][Squeeze_player.player_id]
+ asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, event, event_data))
+
+ try:
+ @run_periodic(5)
+ async def send_heartbeat():
+ timestamp = int(time.time())
+ data = Squeeze_player.pack_stream(b"t", replayGain=timestamp, flags=0)
+ Squeeze_player.send_frame(b"strm", data)
+
+ Squeeze_player.send_frame = send_frame
+ Squeeze_player.send_event = handle_event
+ heartbeat_task = asyncio.create_task(send_heartbeat())
+
+ # keep reading bytes from the socket
+ while True:
+ data = await reader.read(64)
+ if data:
+ Squeeze_player.dataReceived(data)
+ else:
+ break
+ except Exception as exc:
+ # connection lost ?
+ LOGGER.warning(exc)
+ # disconnect
+ heartbeat_task.cancel()
+ asyncio.create_task(self.__handle_player_event(Squeeze_player.player_id, 'disconnected'))
+
+
+class PySqueezePlayer(Player):
+ ''' Squeezebox socket client '''
+
+ def __init__(self, stream_host, stream_port):
+ self.buffer = b''
+ #self.display = Display()
+ self.send_frame = None
+ self.send_event = None
+ self.stream_host = stream_host
+ self.stream_port = stream_port
+ self.player_settings = {}
+ self.playback_millis = 0
+ self._volume = PySqueezeVolume()
+ self._device_type = None
+ self._mac_address = None
+ self._player_name = None
+ self._last_volume = 0
+ self._last_heartbeat = 0
+ self._elapsed_seconds = 0
+ self._elapsed_milliseconds = 0
+
+ @property
+ def player_name(self):
+ if self._player_name:
+ return self._player_name
+ return "%s - %s" %(self._device_type, self._mac_address)
+
+ @property
+ def player_id(self):
+ return self._mac_address
+
+ @property
+ def volume_level(self):
+ return self._volume.volume
+
+ def dataReceived(self, data):
+ self.buffer = self.buffer + data
+ if len(self.buffer) > 8:
+ operation, length = self.buffer[:4], self.buffer[4:8]
+ length = struct.unpack('!I', length)[0]
+ plen = length + 8
+ if len(self.buffer) >= plen:
+ packet, self.buffer = self.buffer[8:plen], self.buffer[plen:]
+ operation = operation.strip(b"!").strip().decode()
+ #LOGGER.info("operation: %s" % operation)
+ handler = getattr(self, "process_%s" % operation, None)
+ if handler is None:
+ raise NotImplementedError
+ handler(packet)
+
+ def send_version(self):
+ self.send_frame(b'vers', b'7.8')
+
+ def pack_stream(self, command, autostart=b"1", formatbyte = b'o', pcmargs = (b'?',b'?',b'?',b'?'), threshold = 200,
+ spdif = b'0', transDuration = 0, transType = b'0', flags = 0x40, outputThreshold = 0,
+ replayGain=0, serverPort = 8095, serverIp = 0):
+ return struct.pack("!cccccccBcBcBBBLHL",
+ command, autostart, formatbyte, *pcmargs,
+ threshold, spdif, transDuration, transType,
+ flags, outputThreshold, 0, replayGain, serverPort, serverIp)
+
+ def stop(self):
+ data = self.pack_stream(b"q", autostart=b"0", flags=0)
+ self.send_frame(b"strm", data)
+
+ def flush(self):
+ data = self.pack_stream(b"f", autostart=b"0", flags=0)
+ self.send_frame(b"strm", data)
+
+ def pause(self):
+ data = self.pack_stream(b"p", autostart=b"0", flags=0)
+ self.send_frame(b"strm", data)
+ LOGGER.info("Sending pause request")
+
+ def unpause(self):
+ data = self.pack_stream(b"u", autostart=b"0", flags=0)
+ self.send_frame(b"strm", data)
+ LOGGER.info("Sending unpause request")
+
+ def next(self):
+ data = self.pack_stream(b"f", autostart=b"0", flags=0)
+ self.send_frame(b"strm", data)
+ self.send_event("next_track")
+
+ def previous(self):
+ data = self.pack_stream(b"f", autostart=b"0", flags=0)
+ self.send_frame(b"strm", data)
+ self.send_event("previous_track")
+
+ def power_on(self):
+ self.send_frame(b"aude", struct.pack("2B", 1, 1))
+ self.send_event("power", True)
+
+ def power_off(self):
+ self.stop()
+ self.send_frame(b"aude", struct.pack("2B", 0, 0))
+ self.send_event("power", False)
+
+ def mute_on(self):
+ self.send_frame(b"aude", struct.pack("2B", 0, 0))
+ self.send_event("mute", True)
+
+ def mute_off(self):
+ self.send_frame(b"aude", struct.pack("2B", 1, 1))
+ self.send_event("mute", False)
+
+ def volume_up(self):
+ self._volume.increment()
+ self.send_volume()
+
+ def volume_down(self):
+ self._volume.decrement()
+ self.send_volume()
+
+ def volume_set(self, new_vol):
+ self._volume.volume = new_vol
+ self.send_volume()
+
+ def play(self, uri):
+ enable_crossfade = self.player_settings["crossfade_duration"] > 0
+ command = b's'
+ autostart = b'3' # we use direct stream for now so let the player do the messy work with buffers
+ transType= b'1' if enable_crossfade else b'0'
+ transDuration = self.player_settings["crossfade_duration"]
+ formatbyte = b'f' # fixed to flac
+ uri = '/stream' + uri.split('/stream')[1]
+ data = self.pack_stream(command, autostart=autostart, flags=0x00, formatbyte=formatbyte, transType=transType, transDuration=transDuration)
+ headers = "Connection: close\r\nAccept: */*\r\nHost: %s:%s\r\n" %(self.stream_host, self.stream_port)
+ request = "GET %s HTTP/1.0\r\n%s\r\n" % (uri, headers)
+ data = data + request.encode("utf-8")
+ self.send_frame(b'strm', data)
+ LOGGER.info("Requesting play from squeezebox" )
+
+ def displayTrack(self, track):
+ self.render("%s by %s" % (track.title, track.artist))
+
+ def process_HELO(self, data):
+ (devId, rev, mac) = struct.unpack('BB6s', data[:8])
+ device_mac = ':'.join("%02x" % x for x in mac)
+ self._device_type = devices.get(devId, 'unknown device')
+ self._mac_address = str(device_mac).lower()
+ LOGGER.debug("HELO received from %s %s" % (self._mac_address, self._device_type))
+ self.init_client()
+
+ def init_client(self):
+ ''' initialize a new connected client '''
+ self.send_event("connected")
+ self.send_version()
+ self.stop()
+ self.setBrightness()
+ #self.set_visualisation(SpectrumAnalyser())
+ self.send_frame(b"setd", struct.pack("B", 0))
+ self.send_frame(b"setd", struct.pack("B", 4))
+ self.power_on()
+ self.volume_set(40) # TODO: remember last volume
+
+ def send_volume(self):
+ og = self._volume.old_gain()
+ ng = self._volume.new_gain()
+ LOGGER.info("Volume set to %d (%d/%d)" % (self._volume.volume, og, ng))
+ d = self.send_frame(b"audg", struct.pack("!LLBBLL", og, og, 1, 255, ng, ng))
+ self.send_event("volume", self._volume.volume)
+
+ def setBrightness(self, level=4):
+ assert 0 <= level <= 4
+ self.send_frame(b"grfb", struct.pack("!H", level))
+
+ def set_visualisation(self, visualisation):
+ self.send_frame(b"visu", visualisation.pack())
+
+ def render(self, text):
+ #self.display.clear()
+ #self.display.renderText(text, "DejaVu-Sans", 16, (0,0))
+ #self.updateDisplay(self.display.frame())
+ pass
+
+ def updateDisplay(self, bitmap, transition = 'c', offset=0, param=0):
+ frame = struct.pack("!Hcb", offset, transition, param) + bitmap
+ self.send_frame(b"grfe", frame)
+
+ def process_STAT(self, data):
+ ev = data[:4]
+ if ev == b'\x00\x00\x00\x00':
+ LOGGER.info("Presumed informational stat message")
+ else:
+ handler = getattr(self, 'stat_%s' % ev.decode(), None)
+ if handler is None:
+ raise NotImplementedError("Stat message %r not known" % ev)
+ handler(data[4:])
+
+ def stat_aude(self, data):
+ (spdif_enable, dac_enable) = struct.unpack("2B", data[:4])
+ powered = spdif_enable or dac_enable
+ self.send_event("power", powered)
+ LOGGER.debug("ACK aude - Received player power: %s" % powered)
+
+ def stat_audg(self, data):
+ LOGGER.info("Received volume_level from player %s" % data)
+ self.send_event("volume", self._volume.volume)
+
+ def stat_strm(self, data):
+ LOGGER.debug("ACK strm")
+ #self.send_frame(b"cont", b"0")
+
+ def stat_STMc(self, data):
+ LOGGER.debug("Status Message: Connect")
+
+ def stat_STMd(self, data):
+ LOGGER.debug("Decoder Ready for next track")
+ self.send_event("next_track")
+
+ def stat_STMe(self, data):
+ LOGGER.info("Connection established")
+
+ def stat_STMf(self, data):
+ LOGGER.info("Status Message: Connection closed")
+ self.send_event("state", PlayerState.Stopped)
+
+ def stat_STMh(self, data):
+ LOGGER.info("Status Message: End of headers")
+
+ def stat_STMn(self, data):
+ LOGGER.error("Decoder does not support file format")
+
+ def stat_STMo(self, data):
+ ''' No more decoded (uncompressed) data to play; triggers rebuffering. '''
+ LOGGER.debug("Output Underrun")
+
+ def stat_STMp(self, data):
+ '''Pause confirmed'''
+ self.send_event("state", PlayerState.Paused)
+
+ def stat_STMr(self, data):
+ '''Resume confirmed'''
+ self.send_event("state", PlayerState.Playing)
+
+ def stat_STMs(self, data):
+ '''Playback of new track has started'''
+ self.send_event("state", PlayerState.Playing)
+
+ def stat_STMt(self, data):
+ """ heartbeat from client """
+ timestamp = time.time()
+ self._last_heartbeat = timestamp
+ (num_crlf, mas_initialized, mas_mode, rptr, wptr,
+ bytes_received_h, bytes_received_l, signal_strength,
+ jiffies, output_buffer_size, output_buffer_fullness,
+ elapsed_seconds, voltage, elapsed_milliseconds,
+ server_timestamp, error_code) = struct.unpack("!BBBLLLLHLLLLHLLH", data)
+ if elapsed_seconds != self._elapsed_seconds:
+ self.send_event("progress")
+ self._elapsed_seconds = elapsed_seconds
+ self._elapsed_milliseconds = elapsed_milliseconds
+
+ def stat_STMu(self, data):
+ '''Normal end of playback'''
+ LOGGER.info("End of playback - Underrun")
+ self.send_event("state", PlayerState.Stopped)
+
+ def process_BYE(self, data):
+ LOGGER.info("BYE received")
+ self.send_event("disconnected")
+
+ def process_RESP(self, data):
+ LOGGER.info("RESP received")
+ self.send_frame(b"cont", b"0")
+
+ def process_BODY(self, data):
+ LOGGER.info("BODY received")
+
+ def process_META(self, data):
+ LOGGER.info("META received")
+
+ def process_DSCO(self, data):
+ LOGGER.info("Data Stream Disconnected")
+
+ def process_DBUG(self, data):
+ LOGGER.info("DBUG received")
+
+ def process_IR(self, data):
+ """ Slightly involved codepath here. This raises an event, which may
+ be picked up by the service and then the process_remote_* function in
+ this player will be called. This is mostly relevant for volume changes
+ - most other button presses will require some context to operate. """
+ (time, code) = struct.unpack("!IxxI", data)
+ LOGGER.info("IR code %s" % code)
+ # command = Remote.codes.get(code, None)
+ # if command is not None:
+ # LOGGER.info("IR received: %r, %r" % (code, command))
+ # #self.service.evreactor.fireEvent(RemoteButtonPressed(self, command))
+ # else:
+ # LOGGER.info("Unknown IR received: %r, %r" % (time, code))
+
+ def process_RAWI(self, data):
+ LOGGER.info("RAWI received")
+
+ def process_ANIC(self, data):
+ LOGGER.info("ANIC received")
+
+ def process_BUTN(self, data):
+ LOGGER.info("BUTN received")
+
+ def process_KNOB(self, data):
+ ''' Transporter only, knob-related '''
+ LOGGER.info("KNOB received")
+
+ def process_SETD(self, data):
+ ''' Get/set player firmware settings '''
+ LOGGER.debug("SETD received %s" % data)
+ cmd_id = data[0]
+ if cmd_id == 0:
+ # received player name
+ data = data[1:].decode()
+ self._player_name = data
+ self.send_event("name")
+
+ def process_UREQ(self, data):
+ LOGGER.info("UREQ received")
+
+
+
+# from http://wiki.slimdevices.com/index.php/SlimProtoTCPProtocol#HELO
+devices = {
+ 2: 'squeezebox',
+ 3: 'softsqueeze',
+ 4: 'squeezebox2',
+ 5: 'transporter',
+ 6: 'softsqueeze3',
+ 7: 'receiver',
+ 8: 'squeezeslave',
+ 9: 'controller',
+ 10: 'boom',
+ 11: 'softboom',
+ 12: 'squeezeplay',
+ }
+
+
+class PySqueezeVolume(object):
+
+ """ Represents a sound volume. This is an awful lot more complex than it
+ sounds. """
+
+ minimum = 0
+ maximum = 100
+ step = 1
+
+ # this map is taken from Slim::Player::Squeezebox2 in the squeezecenter source
+ # i don't know how much magic it contains, or any way I can test it
+ old_map = [
+ 0, 1, 1, 1, 2, 2, 2, 3, 3, 4,
+ 5, 5, 6, 6, 7, 8, 9, 9, 10, 11,
+ 12, 13, 14, 15, 16, 16, 17, 18, 19, 20,
+ 22, 23, 24, 25, 26, 27, 28, 29, 30, 32,
+ 33, 34, 35, 37, 38, 39, 40, 42, 43, 44,
+ 46, 47, 48, 50, 51, 53, 54, 56, 57, 59,
+ 60, 61, 63, 65, 66, 68, 69, 71, 72, 74,
+ 75, 77, 79, 80, 82, 84, 85, 87, 89, 90,
+ 92, 94, 96, 97, 99, 101, 103, 104, 106, 108, 110,
+ 112, 113, 115, 117, 119, 121, 123, 125, 127, 128
+ ];
+
+ # new gain parameters, from the same place
+ total_volume_range = -50 # dB
+ step_point = -1 # Number of steps, up from the bottom, where a 2nd volume ramp kicks in.
+ step_fraction = 1 # fraction of totalVolumeRange where alternate volume ramp kicks in.
+
+ def __init__(self):
+ self.volume = 50
+
+ def increment(self):
+ """ Increment the volume """
+ self.volume += self.step
+ if self.volume > self.maximum:
+ self.volume = self.maximum
+
+ def decrement(self):
+ """ Decrement the volume """
+ self.volume -= self.step
+ if self.volume < self.minimum:
+ self.volume = self.minimum
+
+ def old_gain(self):
+ """ Return the "Old" gain value as required by the squeezebox """
+ return self.old_map[self.volume]
+
+ def decibels(self):
+ """ Return the "new" gain value. """
+
+ step_db = self.total_volume_range * self.step_fraction
+ max_volume_db = 0 # different on the boom?
+
+ # Equation for a line:
+ # y = mx+b
+ # y1 = mx1+b, y2 = mx2+b.
+ # y2-y1 = m(x2 - x1)
+ # y2 = m(x2 - x1) + y1
+ slope_high = max_volume_db - step_db / (100.0 - self.step_point)
+ slope_low = step_db - self.total_volume_range / (self.step_point - 0.0)
+ x2 = self.volume
+ if (x2 > self.step_point):
+ m = slope_high
+ x1 = 100
+ y1 = max_volume_db
+ else:
+ m = slope_low
+ x1 = 0
+ y1 = self.total_volume_range
+ return m * (x2 - x1) + y1
+
+ def new_gain(self):
+ db = self.decibels()
+ floatmult = 10 ** (db/20.0)
+ # avoid rounding errors somehow
+ if -30 <= db <= 0:
+ return int(floatmult * (1 << 8) + 0.5) * (1<<8)
+ else:
+ return int((floatmult * (1<<16)) + 0.5)
+
+
+##### UDP DISCOVERY STUFF #############
+
+class Datagram(object):
+
+ @classmethod
+ def decode(self, data):
+ if data[0] == 'e':
+ return TLVDiscoveryRequestDatagram(data)
+ elif data[0] == 'E':
+ return TLVDiscoveryResponseDatagram(data)
+ elif data[0] == 'd':
+ return ClientDiscoveryDatagram(data)
+ elif data[0] == 'h':
+ pass # Hello!
+ elif data[0] == 'i':
+ pass # IR
+ elif data[0] == '2':
+ pass # i2c?
+ elif data[0] == 'a':
+ pass # ack!
+
+class ClientDiscoveryDatagram(Datagram):
+
+ device = None
+ firmware = None
+ client = None
+
+ def __init__(self, data):
+ s = struct.unpack('!cxBB8x6B', data.encode())
+ assert s[0] == 'd'
+ self.device = s[1]
+ self.firmware = hex(s[2])
+ self.client = ":".join(["%02x" % (x,) for x in s[3:]])
+
+ def __repr__(self):
+ return "<%s device=%r firmware=%r client=%r>" % (self.__class__.__name__, self.device, self.firmware, self.client)
+
+class DiscoveryResponseDatagram(Datagram):
+
+ def __init__(self, hostname, port):
+ hostname = hostname[:16].encode("UTF-8")
+ hostname += (16 - len(hostname)) * '\x00'
+ self.packet = struct.pack('!c16s', 'D', hostname).decode()
+
+class TLVDiscoveryRequestDatagram(Datagram):
+
+ def __init__(self, data):
+ requestdata = OrderedDict()
+ assert data[0] == 'e'
+ idx = 1
+ length = len(data)-5
+ while idx <= length:
+ typ, l = struct.unpack_from("4sB", data.encode(), idx)
+ if l:
+ val = data[idx+5:idx+5+l]
+ idx += 5+l
+ else:
+ val = None
+ idx += 5
+ typ = typ.decode()
+ requestdata[typ] = val
+ self.data = requestdata
+
+ def __repr__(self):
+ return "<%s data=%r>" % (self.__class__.__name__, self.data.items())
+
+class TLVDiscoveryResponseDatagram(Datagram):
+
+ def __init__(self, responsedata):
+ parts = ['E'] # new discovery format
+ for typ, value in responsedata.items():
+ if value is None:
+ value = ''
+ elif len(value) > 255:
+ LOGGER.warning("Response %s too long, truncating to 255 bytes" % typ)
+ value = value[:255]
+ parts.extend((typ, chr(len(value)), value))
+ self.packet = ''.join(parts)
+
+class DiscoveryProtocol():
+
+ def __init__(self, web_port):
+ self.web_port = web_port
+
+ def connection_made(self, transport):
+ self.transport = transport
+ # Allow receiving multicast broadcasts
+ sock = self.transport.get_extra_info('socket')
+ group = socket.inet_aton('239.255.255.250')
+ mreq = struct.pack('4sL', group, socket.INADDR_ANY)
+ sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+
+ def build_TLV_response(self, requestdata):
+ responsedata = OrderedDict()
+ for typ, value in requestdata.items():
+ if typ == 'NAME':
+ # send full host name - no truncation
+ value = get_hostname()
+ elif typ == 'IPAD':
+ # send ipaddress as a string only if it is set
+ value = get_ip()
+ # :todo: IPv6
+ if value == '0.0.0.0':
+ # do not send back an ip address
+ typ = None
+ elif typ == 'JSON':
+ # send port as a string
+ json_port = self.web_port
+ value = str(json_port)
+ elif typ == 'VERS':
+ # send server version
+ value = '7.9'
+ elif typ == 'UUID':
+ # send server uuid
+ value = 'musicassistant'
+ else:
+ LOGGER.debug('Unexpected information request: %r', typ)
+ typ = None
+ if typ:
+ responsedata[typ] = value
+ return responsedata
+
+ def datagram_received(self, data, addr):
+ try:
+ data = data.decode()
+ dgram = Datagram.decode(data)
+ LOGGER.debug("Data received from %s: %s" % (addr, dgram))
+ if isinstance(dgram, ClientDiscoveryDatagram):
+ self.sendDiscoveryResponse(addr)
+ elif isinstance(dgram, TLVDiscoveryRequestDatagram):
+ resonsedata = self.build_TLV_response(dgram.data)
+ self.sendTLVDiscoveryResponse(resonsedata, addr)
+ except Exception as exc:
+ LOGGER.exception(exc)
+
+ def sendDiscoveryResponse(self, addr):
+ dgram = DiscoveryResponseDatagram(get_hostname(), 3483)
+ LOGGER.debug("Sending discovery response %r" % (dgram.packet,))
+ self.transport.sendto(dgram.packet.encode(), addr)
+
+ def sendTLVDiscoveryResponse(self, resonsedata, addr):
+ dgram = TLVDiscoveryResponseDatagram(resonsedata)
+ LOGGER.debug("Sending discovery response %r" % (dgram.packet,))
+ self.transport.sendto(dgram.packet.encode(), addr)
+
from concurrent.futures import ThreadPoolExecutor
import socket
import os
-logformat = logging.Formatter('%(asctime)-15s %(levelname)-5s %(name)s.%(module)s -- %(message)s')
-consolehandler = logging.StreamHandler()
-consolehandler.setFormatter(logformat)
-LOGGER = logging.getLogger(__package__)
-LOGGER.setLevel(logging.INFO)
-LOGGER.addHandler(consolehandler)
-
+LOGGER = logging.getLogger()
def run_periodic(period):