EVENT_STREAM_STARTED = "streaming started"
EVENT_STREAM_ENDED = "streaming ended"
EVENT_CONFIG_CHANGED = "config changed"
+EVENT_PLAYBACK_STARTED = "playback started"
+EVENT_PLAYBACK_STOPPED = "playback stopped"
yield (True, b'')
return
# get sox effects and resample options
- sox_effects = await self.__get_player_sox_options(player, queue_item)
+ sox_options = await self.__get_player_sox_options(player, queue_item)
outputfmt = 'flac -C 0'
if resample:
outputfmt = 'raw -b 32 -c 2 -e signed-integer'
- sox_effects += ' rate -v %s' % resample
+ sox_options += ' rate -v %s' % resample
+ streamdetails['sox_options'] = sox_options
# determine how to proceed based on input file ype
if streamdetails["content_type"] == 'aac':
# support for AAC created with ffmpeg in between
- args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects)
+ args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_options)
elif streamdetails['type'] == 'url':
args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"],
- streamdetails["path"], outputfmt, sox_effects)
+ streamdetails["path"], outputfmt, sox_options)
elif streamdetails['type'] == 'executable':
args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"],
- streamdetails["content_type"], outputfmt, sox_effects)
+ streamdetails["content_type"], outputfmt, sox_options)
# start sox process
# we use normal subprocess instead of asyncio because of bug with executor
# this should be fixed with python 3.8
process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
- # fire event that streaming has started for this track (needed by some streaming providers)
+ # fire event that streaming has started for this track
asyncio.run_coroutine_threadsafe(
- self.mass.signal_event(EVENT_STREAM_STARTED,
- streamdetails), self.mass.event_loop)
+ self.mass.signal_event(EVENT_STREAM_STARTED, queue_item), self.mass.event_loop)
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
bytes_sent = 0
else:
buf += data
del buf
- # fire event that streaming has ended for this track (needed by some streaming providers)
- if resample:
- bytes_per_second = resample * (32/8) * 2
- bytes_per_second = (resample * 32 * 2) / 8
- seconds_streamed = int(bytes_sent/bytes_per_second)
- else:
- seconds_streamed = queue_item.duration
- streamdetails["seconds"] = seconds_streamed
+ # fire event that streaming has ended
asyncio.run_coroutine_threadsafe(
- self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop)
+ self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop)
# send task to background to analyse the audio
self.mass.event_loop.call_soon_threadsafe(
asyncio.ensure_future, self.__analyze_audio(queue_item))
async def __get_player_sox_options(self, player, queue_item):
''' get player specific sox effect options '''
- sox_effects = []
+ sox_options = []
# volume normalisation
gain_correct = asyncio.run_coroutine_threadsafe(
self.mass.players.get_gain_correct(
player.player_id, queue_item.item_id, queue_item.provider),
self.mass.event_loop).result()
if gain_correct != 0:
- sox_effects.append('vol %s dB ' % gain_correct)
+ sox_options.append('vol %s dB ' % gain_correct)
# downsample if needed
if player.settings['max_sample_rate']:
max_sample_rate = try_parse_int(player.settings['max_sample_rate'])
if max_sample_rate:
quality = queue_item.quality
if quality > TrackQuality.FLAC_LOSSLESS_HI_RES_3 and max_sample_rate == 192000:
- sox_effects.append('rate -v 192000')
+ sox_options.append('rate -v 192000')
elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_2 and max_sample_rate == 96000:
- sox_effects.append('rate -v 96000')
+ sox_options.append('rate -v 96000')
elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_1 and max_sample_rate == 48000:
- sox_effects.append('rate -v 48000')
- if player.settings.get('sox_effects'):
- sox_effects.append(player.settings['sox_effects'])
- return " ".join(sox_effects)
+ sox_options.append('rate -v 48000')
+ if player.settings.get('sox_options'):
+ sox_options.append(player.settings['sox_options'])
+ return " ".join(sox_options)
async def __analyze_audio(self, queue_item):
''' analyze track audio, for now we only calculate EBU R128 loudness '''
args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length)
process = subprocess.Popen(args, shell=True, stdin=subprocess.PIPE)
process.communicate(fade_in_part)
- fadeinfile.close()
# create fade-out part
fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length)
import pickle
from ..utils import LOGGER, json, filename_from_string
-from ..constants import CONF_ENABLED
+from ..constants import CONF_ENABLED, EVENT_PLAYBACK_STARTED, EVENT_PLAYBACK_STOPPED
from .media_types import Track, TrackQuality
from .playerstate import PlayerState
self._repeat_enabled = False
self._cur_index = 0
self._cur_item_time = 0
- self._last_cur_item_time = 0
- self._last_index = 0
+ self._last_item_time = 0
+ self._last_queue_startindex = 0
self._last_player_state = PlayerState.Stopped
self._save_busy_ = False
+ self._last_track = None
# load previous queue settings from disk
- self.__load_from_file()
-
+ self.mass.event_loop.create_task(self.__load_from_file())
+
@property
def shuffle_enabled(self):
return self._shuffle_enabled
@property
def cur_item_time(self):
- if self.use_queue_stream:
- return self._cur_item_time
- else:
- return self._player._cur_time
+ return self._cur_item_time
@property
def next_index(self):
''' resume previous queue '''
if self.items:
prev_index = self.cur_index
- await self.load(self._items)
- if prev_index:
+ if self.use_queue_stream:
await self.play_index(prev_index)
+ else:
+ # at this point we don't know if the queue is synced with the player
+ # so just to be safe we only send the queue_items from the index
+ await self.load(self._items[prev_index:])
else:
LOGGER.warning("resume queue requested for %s but queue is empty" % self._player.name)
if not len(self.items) > index:
return
if self.use_queue_stream:
- self._cur_index = index
+ self._last_queue_startindex = index
queue_stream_uri = 'http://%s:%s/stream/%s'% (
self.mass.web.local_ip, self.mass.web.http_port, self._player.player_id)
return await self._player.cmd_play_uri(queue_stream_uri)
if self._shuffle_enabled:
queue_items = await self.__shuffle_items(queue_items)
self._items = queue_items
- self._cur_index = 0
if self.use_queue_stream or not self._player.supports_queue:
- return await self.play_index(0)
+ await self.play_index(0)
else:
- return await self._player.cmd_queue_load(queue_items)
+ await self._player.cmd_queue_load(queue_items)
async def insert(self, queue_items:List[QueueItem], offset=0):
'''
async def update(self):
''' update queue details, called when player updates '''
cur_index = self._cur_index
- # determine queue index and cur_time for queue stream
+ track_time = self._cur_item_time
+ # handle queue stream
if self.use_queue_stream and self._player.state == PlayerState.Playing:
- # player is playing a constant stream of the queue so we need to do this the hard way
- cur_time_queue = self._player._cur_time
- total_time = 0
- track_time = 0
- if self.items and len(self.items) > self._last_index:
- queue_index = self._last_index # holds the last starting position
- queue_track = None
- while len(self.items) > queue_index:
- queue_track = self.items[queue_index]
- if cur_time_queue > (queue_track.duration + total_time):
- total_time += queue_track.duration
- queue_index += 1
- else:
- track_time = cur_time_queue - total_time
- break
- cur_index = queue_index
- self._cur_item_time = track_time
+ cur_index, track_time = await self.__get_queue_stream_index()
# normal queue based approach
elif not self.use_queue_stream:
- if 'queue_item_id' in self._player.cur_uri:
- queue_item_id = self._player.cur_uri.split('queue_item_id=')[1]
+ track_time = self._player._cur_time
for index, queue_item in enumerate(self.items):
if queue_item.uri == self._player.cur_uri:
cur_index = index
break
# process new index
- await self.__update_index(cur_index)
+ await self.__process_queue_update(cur_index, track_time)
async def start_queue_stream(self):
''' called by the queue streamer when it starts playing the queue stream '''
- self._last_index = self.cur_index
- return await self.get_item(self.cur_index)
+ return await self.get_item(self._last_queue_startindex)
- async def __update_index(self, new_index):
+ async def __get_queue_stream_index(self):
+ # player is playing a constant stream of the queue so we need to do this the hard way
+ queue_index = 0
+ cur_time_queue = self._player._cur_time
+ total_time = 0
+ track_time = 0
+ if self.items and len(self.items) > self._last_queue_startindex:
+ queue_index = self._last_queue_startindex # holds the last starting position
+ queue_track = None
+ while len(self.items) > queue_index:
+ queue_track = self.items[queue_index]
+ if cur_time_queue > (queue_track.duration + total_time):
+ total_time += queue_track.duration
+ queue_index += 1
+ else:
+ track_time = cur_time_queue - total_time
+ break
+ return queue_index, track_time
+
+ async def __process_queue_update(self, new_index, track_time):
''' compare the queue index to determine if playback changed '''
- if new_index != self._last_index:
- LOGGER.info("new track loaded in queue")
- self._cur_index = new_index
- elif (self._last_player_state == PlayerState.Stopped and
- self._player.state == PlayerState.Playing and
- self.cur_item):
- LOGGER.info("Player %s started playing %s" % self.cur_item.name)
- elif (self._last_player_state == PlayerState.Playing and
- self._player.state == PlayerState.Stopped and
- self.cur_item):
- LOGGER.info("Player %s stopped playing %s" % self.cur_item.name)
- # always update timestamp
- self._last_cur_item_time = self.cur_item_time
-
+ new_track = await self.get_item(new_index)
+ if (not self._last_track and new_track) or self._last_track != new_track:
+ # queue track updated
+ # account for track changing state so trigger track change after 1 second
+ if self._last_track:
+ self._last_track.seconds_played = self._last_item_time
+ self.mass.event_loop.create_task(
+ self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track))
+ if new_track:
+ self.mass.event_loop.create_task(
+ self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track))
+ self._last_track = new_track
+ await self.__save_to_file()
+ if self._last_player_state != self._player.state:
+ self._last_player_state = self._player.state
+ if (self._player.cur_time == 0 and
+ self._player.state in [PlayerState.Stopped, PlayerState.Off]):
+ # player stopped playing
+ self._last_queue_startindex = self.cur_index
+ # update vars
+ if track_time > 2:
+ # account for track changing state so keep this a few seconds behind
+ self._last_item_time = track_time
+ self._cur_item_time = track_time
+ self._cur_index = new_index
+
async def __shuffle_items(self, queue_items):
''' shuffle a list of tracks '''
# for now we use default python random function
# can be extended with some more magic last last_played and stuff
return random.sample(queue_items, len(queue_items))
- def __load_from_file(self):
+ async def __load_from_file(self):
''' try to load the saved queue for this player from file '''
player_safe_str = filename_from_string(self._player.player_id)
settings_dir = os.path.join(self.mass.datapath, 'queue')
player_file = os.path.join(settings_dir, player_safe_str)
if os.path.isfile(player_file):
try:
- with open(player_file) as f:
+ with open(player_file, 'rb') as f:
data = pickle.load(f)
self._shuffle_enabled = data["shuffle_enabled"]
self._repeat_enabled = data["repeat_enabled"]
self._items = data["items"]
self._cur_index = data["cur_item"]
- self._last_index = data["last_index"]
+ self._last_queue_startindex = data["last_index"]
except Exception as exc:
LOGGER.debug("Could not load queue from disk - %s" % str(exc))
- def __save_to_file(self):
+ async def __save_to_file(self):
''' save current queue settings to file '''
if self._save_busy_:
return
"repeat_enabled": self._repeat_enabled,
"items": self._items,
"cur_item": self._cur_index,
- "last_index": self._last_index
+ "last_index": self._cur_index
}
if not os.path.isdir(settings_dir):
os.mkdir(settings_dir)
- with open(player_file, 'w+') as f:
- pickle.dump(data, f)
+ with open(player_file, 'wb') as f:
+ data = pickle.dump(data, f)
self._save_busy_ = False
from ..models import MusicProvider, MediaType, TrackQuality, \
AlbumType, Artist, Album, Track, Playlist
from ..constants import CONF_USERNAME, CONF_PASSWORD, CONF_ENABLED, \
- CONF_TYPE_PASSWORD, EVENT_STREAM_STARTED, EVENT_STREAM_ENDED
+ CONF_TYPE_PASSWORD, EVENT_PLAYBACK_STARTED, EVENT_PLAYBACK_STOPPED
PROV_ID = 'qobuz'
PROV_NAME = 'Qobuz'
self.http_session = aiohttp.ClientSession(
loop=self.mass.event_loop, connector=aiohttp.TCPConnector())
self.throttler = Throttler(rate_limit=2, period=1)
- await self.mass.add_event_listener(self.mass_event, EVENT_STREAM_STARTED)
- await self.mass.add_event_listener(self.mass_event, EVENT_STREAM_ENDED)
+ await self.mass.add_event_listener(self.mass_event, EVENT_PLAYBACK_STARTED)
+ await self.mass.add_event_listener(self.mass_event, EVENT_PLAYBACK_STOPPED)
async def search(self, searchstring, media_types=List[MediaType], limit=5):
''' perform search on the provider '''
async def mass_event(self, msg, msg_details):
''' received event from mass '''
# TODO: need to figure out if the streamed track is purchased
- if msg == "streaming_started" and msg_details['provider'] == self.prov_id:
+ if msg == EVENT_PLAYBACK_STARTED and msg_details.provider == self.prov_id:
# report streaming started to qobuz
- LOGGER.debug("streaming_started %s" % msg_details["track_id"])
device_id = self.__user_auth_info["user"]["device"]["id"]
credential_id = self.__user_auth_info["user"]["credential"]["id"]
user_id = self.__user_auth_info["user"]["id"]
- format_id = msg_details["details"]["format_id"]
+ format_id = msg_details.streamdetails["details"]["format_id"]
timestamp = int(time.time())
events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id,
- "track_id": msg_details["track_id"], "purchase": False, "date": timestamp,
+ "track_id": msg_details.item_id, "purchase": False, "date": timestamp,
"credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}]
await self.__post_data("track/reportStreamingStart", data=events)
- elif msg == "streaming_ended" and msg_details['provider'] == self.prov_id:
+ elif msg == EVENT_PLAYBACK_STOPPED and msg_details.provider == self.prov_id:
# report streaming ended to qobuz
- LOGGER.debug("streaming_ended %s - seconds played: %s" %(msg_details["track_id"], msg_details["seconds"]) )
device_id = self.__user_auth_info["user"]["device"]["id"]
credential_id = self.__user_auth_info["user"]["credential"]["id"]
user_id = self.__user_auth_info["user"]["id"]
- format_id = msg_details["details"]["format_id"]
+ format_id = msg_details.streamdetails["details"]["format_id"]
timestamp = int(time.time())
events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id,
- "track_id": msg_details["track_id"], "purchase": False, "date": timestamp, "duration": msg_details["seconds"],
+ "track_id": msg_details.item_id, "purchase": False, "date": timestamp, "duration": int(msg_details.seconds_played),
"credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}]
- await self.__post_data("track/reportStreamingStart", data=events)
+ await self.__post_data("track/reportStreamingEnd", data=events)
async def __parse_artist(self, artist_obj):
''' parse qobuz artist object to generic layout '''
async def try_chromecast_command(self, cmd:types.MethodType, *args, **kwargs):
''' guard for disconnected socket client '''
- try:
- cmd(*args, **kwargs)
- except (pychromecast.error.NotConnected, AttributeError):
- LOGGER.warning("Chromecast %s is not connected!" % self.name)
- except Exception as exc:
- LOGGER.warning(exc)
+ def _try_chromecast_command(_cmd:types.MethodType, *_args, **_kwargs):
+ try:
+ _cmd(*_args, **_kwargs)
+ except (pychromecast.error.NotConnected, AttributeError):
+ LOGGER.warning("Chromecast %s is not connected!" % self.name)
+ except Exception as exc:
+ LOGGER.warning(exc)
+ return self.mass.event_loop.call_soon_threadsafe(
+ _try_chromecast_command, cmd, *args, **kwargs)
async def cmd_stop(self):
''' send stop command to player '''
else:
send_queue()
+ def __update_group_members(self):
+ ''' update group members '''
+ if not self.mz:
+ return
+ try:
+ self.mz.update_members()
+ except Exception as exc:
+ LOGGER.exception(exc)
+
async def handle_player_state(self, caststatus=None,
mediastatus=None, connection_status=None):
''' handle a player state message from the socket '''
# handle connection status
if connection_status:
if self.mz and connection_status.status == CONNECTION_STATUS_CONNECTED:
- return self.mz.update_members()
+ self.mass.event_loop.call_soon_threadsafe(self.__update_group_members)
elif connection_status.status == CONNECTION_STATUS_DISCONNECTED:
# schedule a new scan which will handle group parent changes
- return self.mass.event_loop.create_task(
+ self.mass.event_loop.create_task(
self.mass.players.providers[self.player_provider].start_chromecast_discovery())
# handle generic cast status
if caststatus:
- self.name = self.cc.name
self.muted = caststatus.volume_muted
self.volume_level = caststatus.volume_level * 100
+ self.name = self.cc.name
# handle media status
if mediastatus:
if mediastatus.player_state in ['PLAYING', 'BUFFERING']:
self.poll_task = False
if not self.poll_task and self.state == PlayerState.Playing:
self.mass.event_loop.create_task(poll_task())
- # we are called from socket client thread so do this threadsafe!
- #asyncio.run_coroutine_threadsafe(self.update(), self.mass.event_loop)
class ChromecastProvider(PlayerProvider):
''' support for ChromeCast Audio '''
player = await self.get_player(added_player)
group_player = await self.get_player(str(mz._uuid))
if player and group_player:
- player.group_parent = str(mz._uuid)
+ player.group_parent = group_player.player_id
LOGGER.debug("player %s added to group %s" %(player.name, group_player.name))
elif removed_player:
player = await self.get_player(added_player)
for member in mz.members:
player = await self.get_player(member)
if player:
+ LOGGER.debug("player %s added to group %s" %(player.name, str(mz._uuid)))
player.group_parent = str(mz._uuid)
@run_periodic(1800)
result = await player_cmd(cmd_args)
elif player_cmd:
result = await player_cmd()
+ except (Exception, AssertionError) as exc:
+ LOGGER.warning("Websocket disconnected - %s" % str(exc))
finally:
await self.mass.remove_event_listener(cb_id)
LOGGER.debug('websocket connection closed')