mass.async_start(),
use_uvloop=True,
shutdown_callback=on_shutdown,
- executor_workers=32,
+ executor_workers=64,
)
from enum import Enum
from typing import List
-from cryptography.fernet import Fernet, InvalidToken
-from music_assistant.app_vars import get_app_var # noqa # pylint: disable=all
from music_assistant.constants import (
CONF_CROSSFADE_DURATION,
CONF_ENABLED,
EVENT_CONFIG_CHANGED,
)
from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
-from music_assistant.utils import get_external_ip, json, try_load_json_file
+from music_assistant.utils import (
+ decrypt_string,
+ encrypt_string,
+ get_external_ip,
+ json,
+ try_load_json_file,
+)
from passlib.hash import pbkdf2_sha256
LOGGER = logging.getLogger("mass")
entry = self.get_entry(key)
if entry.entry_type == ConfigEntryType.PASSWORD:
# decrypted password is only returned if explicitly asked for this key
- try:
- return Fernet(get_app_var(3)).decrypt(entry.value.encode()).decode()
- except InvalidToken:
- pass
+ decrypted_value = decrypt_string(entry.value)
+ if decrypted_value:
+ return decrypted_value
return entry.value
def __setitem__(self, key, value):
if entry.store_hashed:
value = pbkdf2_sha256.hash(value)
if entry.entry_type == ConfigEntryType.PASSWORD:
- value = Fernet(get_app_var(3)).encrypt(value.encode()).decode()
+ value = encrypt_string(value)
self.stored_config[key] = value
self.mass.signal_event(
EVENT_CONFIG_CHANGED, (self._base_type, self._parent_item_key)
import gc
import io
import logging
-import os
import shlex
-import signal
import subprocess
import threading
import urllib
from music_assistant.models.media_types import MediaType
from music_assistant.models.player_queue import QueueItem
from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
-from music_assistant.utils import create_tempfile, get_ip, try_parse_int
+from music_assistant.utils import create_tempfile, decrypt_string, get_ip, try_parse_int
from music_assistant.web import require_local_subnet
LOGGER = logging.getLogger("mass")
asyncio.CancelledError,
aiohttp.ClientConnectionError,
asyncio.TimeoutError,
+ Exception,
) as exc:
cancelled.set()
raise exc # re-raise
player_queue = self.mass.player_manager.get_player_queue(player_id)
sample_rate = try_parse_int(player_conf["max_sample_rate"])
fade_length = try_parse_int(player_conf["crossfade_duration"])
- if not sample_rate or sample_rate < 44100 or sample_rate > 384000:
+ if not sample_rate or sample_rate < 44100:
sample_rate = 96000
if fade_length:
fade_bytes = int(sample_rate * 4 * 2 * fade_length)
sox_proc.stdin.write(last_fadeout_data)
del last_fadeout_data
# END OF QUEUE STREAM
- sox_proc.stdin.close()
sox_proc.terminate()
+ sox_proc.communicate()
fill_buffer_thread.join()
- del sox_proc
# run garbage collect manually to avoid too much memory fragmentation
gc.collect()
if cancelled.is_set():
self, player_id, queue_item, cancelled, chunksize=128000, resample=None
):
"""Get audio stream from provider and apply additional effects/processing if needed."""
- # pylint: disable=subprocess-popen-preexec-fn
streamdetails = self.mass.add_job(
self.mass.music_manager.async_get_stream_details(queue_item, player_id)
).result()
if streamdetails.content_type == ContentType.AAC:
# support for AAC created with ffmpeg in between
args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (
- streamdetails.path,
+ decrypt_string(streamdetails.path),
outputfmt,
sox_options,
)
process = subprocess.Popen(
- args,
- shell=True,
- stdout=subprocess.PIPE,
- bufsize=chunksize,
- preexec_fn=os.setsid,
+ args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize
)
elif streamdetails.type in [StreamType.URL, StreamType.FILE]:
args = 'sox -t %s "%s" -t %s - %s' % (
streamdetails.content_type.name,
- streamdetails.path,
+ decrypt_string(streamdetails.path),
outputfmt,
sox_options,
)
args = shlex.split(args)
process = subprocess.Popen(
- args,
- shell=False,
- stdout=subprocess.PIPE,
- bufsize=chunksize,
- preexec_fn=os.setsid,
+ args, shell=False, stdout=subprocess.PIPE, bufsize=chunksize
)
elif streamdetails.type == StreamType.EXECUTABLE:
args = "%s | sox -t %s - -t %s - %s" % (
- streamdetails.path,
+ decrypt_string(streamdetails.path),
streamdetails.content_type.name,
outputfmt,
sox_options,
)
process = subprocess.Popen(
- args,
- shell=True,
- stdout=subprocess.PIPE,
- bufsize=chunksize,
- preexec_fn=os.setsid,
+ args, shell=True, stdout=subprocess.PIPE, bufsize=chunksize
)
else:
LOGGER.warning("no streaming options for %s", queue_item.name)
yield (True, b"")
return
# fire event that streaming has started for this track
- streamdetails.path = "" # invalidate
self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails)
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
if cancelled.is_set():
# http session ended
# send terminate and pick up left over bytes
- # process.terminate()
- os.killpg(os.getpgid(process.pid), signal.SIGHUP)
- os.killpg(os.getpgid(process.pid), signal.SIGTERM)
- # read exactly chunksize of data
- chunk = process.stdout.read(chunksize)
+ process.terminate()
+ chunk, _ = process.communicate()
+ LOGGER.warning(
+ "__get_audio_stream cancelled for track %s on player %s",
+ queue_item.name,
+ player_id,
+ )
+ else:
+ # read exactly chunksize of data
+ chunk = process.stdout.read(chunksize)
if len(chunk) < chunksize:
# last chunk
yield (True, prev_chunk + chunk)
# send task to background to analyse the audio
if queue_item.media_type == MediaType.Track:
self.mass.add_job(self.__analyze_audio, streamdetails)
+ LOGGER.warning(
+ "__get_audio_stream complete for track %s on player %s",
+ queue_item.name,
+ player_id,
+ )
def __get_player_sox_options(
self, player_id: str, streamdetails: StreamDetails
# only when needed we do the analyze stuff
LOGGER.debug("Start analyzing track %s", item_key)
if streamdetails.type == StreamType.URL:
- audio_data = urllib.request.urlopen(streamdetails.path).read()
+ audio_data = urllib.request.urlopen(
+ decrypt_string(streamdetails.path)
+ ).read()
elif streamdetails.type == StreamType.EXECUTABLE:
- audio_data = subprocess.check_output(streamdetails.path, shell=True)
+ audio_data = subprocess.check_output(
+ decrypt_string(streamdetails.path), shell=True
+ )
elif streamdetails.type == StreamType.FILE:
- with open(streamdetails.path, "rb") as _file:
+ with open(decrypt_string(streamdetails.path), "rb") as _file:
audio_data = _file.read()
# calculate BS.1770 R128 integrated loudness
with io.BytesIO(audio_data) as tmpfile:
if self.config.providers[provider.id][CONF_ENABLED]:
if await provider.async_on_start():
provider.available = True
- LOGGER.debug("New provider registered: %s", provider.name)
+ LOGGER.debug("Provider registered: %s", provider.name)
self.signal_event(EVENT_PROVIDER_REGISTERED, provider.id)
else:
- LOGGER.debug("Not loading provider %s as it is disabled:", provider.name)
+ LOGGER.debug("Not loading provider %s as it is disabled", provider.name)
async def register_provider(self, provider: Provider):
"""Register a new Provider/Plugin."""
except Exception as exc:
LOGGER.exception("Error preloading module %s: %s", module_name, exc)
else:
- LOGGER.info("Successfully preloaded module %s", module_name)
+ LOGGER.debug("Successfully preloaded module %s", module_name)
@callback
def signal_event(self, event_msg: str, event_details: Any = None):
self._repeat_enabled = False
self._cur_index = 0
self._cur_item_time = 0
- self._last_item_time = 0
- self._last_queue_startindex = 0
+ self._last_item = None
self._next_queue_startindex = 0
- self._last_track = None
+ self._last_queue_startindex = 0
+ self._last_player_state = PlayerState.Stopped
# load previous queue settings from disk
self.mass.add_job(self.__async_restore_saved_state())
Indicate that we need to use the queue stream.
For example if crossfading is requested but a player doesn't natively support it
- it will send a constant stream of audio to the player with all tracks.
+ we will send a constant stream of audio to the player with all tracks.
"""
supports_crossfade = PlayerFeature.CROSSFADE in self.player.features
supports_queue = PlayerFeature.QUEUE in self.player.features
return
if self.use_queue_stream:
self._next_queue_startindex = index
- queue_stream_uri = "%s/stream/%s" % (
+ self.player.elapsed_time = 0 # set just in case of a race condition
+ queue_stream_uri = "%s/stream/%s?id=%s" % (
self.mass.web.internal_url,
self.player.player_id,
+ self.items[
+ index
+ ].queue_item_id, # just set to invalidate any cache stuff
)
return await player_prov.async_cmd_play_uri(
self.player_id, queue_stream_uri
"cmd_queue_insert not supported by player, fallback to cmd_queue_load "
)
self._items = self._items[index:]
- await player_prov.async_cmd_queue_load(self.player_id, self._items)
+ return await player_prov.async_cmd_queue_load(
+ self.player_id, self._items
+ )
else:
return await player_prov.async_cmd_play_uri(
self.player_id, self._items[index].uri
item.sort_index = insert_at_index + index
if self.shuffle_enabled:
queue_items = self.__shuffle_items(queue_items)
- self._items = (
- self._items[:insert_at_index] + queue_items + self._items[insert_at_index:]
- )
+ if offset == 0:
+ # replace current item with new
+ self._items = (
+ self._items[:insert_at_index]
+ + queue_items
+ + self._items[insert_at_index + 1 :]
+ )
+ else:
+ self._items = (
+ self._items[:insert_at_index]
+ + queue_items
+ + self._items[insert_at_index:]
+ )
if self.use_queue_stream or not supports_queue:
if offset == 0:
await self.async_play_index(insert_at_index)
"cmd_queue_insert not supported by player, fallback to cmd_queue_load "
)
self._items = self._items[self.cur_index :]
- await player_prov.async_cmd_queue_load(self.player_id, self._items)
+ return await player_prov.async_cmd_queue_load(
+ self.player_id, self._items
+ )
self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict())
self.mass.add_job(self.__async_save_state())
item.sort_index = len(self.items) + index
if self.shuffle_enabled:
played_items = self.items[: self.cur_index]
- next_items = self.items[self.cur_index :] + queue_items
+ next_items = self.items[self.cur_index + 1 :] + queue_items
next_items = self.__shuffle_items(next_items)
- items = played_items + next_items
+ items = played_items + [self.cur_item] + next_items
return await self.async_update(items)
self._items = self._items + queue_items
if supports_queue and not self.use_queue_stream:
"cmd_queue_append not supported by player, fallback to cmd_queue_load "
)
self._items = self._items[self.cur_index :]
- await player_prov.async_cmd_queue_load(self.player_id, self._items)
+ return await player_prov.async_cmd_queue_load(
+ self.player_id, self._items
+ )
self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict())
self.mass.add_job(self.__async_save_state())
"cmd_queue_update not supported by player, fallback to cmd_queue_load "
)
self._items = self._items[self.cur_index :]
- await player_prov.async_cmd_queue_load(self.player_id, self._items)
+ return await player_prov.async_cmd_queue_load(
+ self.player_id, self._items
+ )
self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self.to_dict())
self.mass.add_job(self.__async_save_state())
async def async_update_state(self):
"""Update queue details, called when player updates."""
- cur_index = self._cur_index
+ new_index = self._cur_index
track_time = self._cur_item_time
# handle queue stream
- if self.use_queue_stream and self.player.state == PlayerState.Playing:
- cur_index, track_time = self.__get_queue_stream_index()
+ if (
+ self.use_queue_stream
+ and self.player.state == PlayerState.Playing
+ and self.player.elapsed_time > 1
+ ):
+ new_index, track_time = self.__get_queue_stream_index()
# normal queue based approach
elif not self.use_queue_stream:
track_time = self.player.elapsed_time
for index, queue_item in enumerate(self.items):
if queue_item.uri == self.player.current_uri:
- cur_index = index
+ new_index = index
break
# process new index
- await self.async_process_queue_update(cur_index, track_time)
+ if self._cur_index != new_index:
+ # queue track updated
+ self._next_queue_startindex = self.next_index
+ self._cur_index = new_index
+ # check if a new track is loaded, wait for the streamdetails
+ if self._last_item != self.cur_item and self.cur_item.streamdetails:
+ # new active item in queue
+ self.mass.signal_event(EVENT_QUEUE_UPDATED, self.to_dict())
+ # invalidate previous streamdetails
+ if self._last_item:
+ self._last_item.streamdetails = None
+ self._last_item = self.cur_item
+ # update vars
+ if self._cur_item_time != track_time:
+ self._cur_item_time = track_time
+ self.mass.signal_event(
+ EVENT_QUEUE_TIME_UPDATED,
+ {"player_id": self.player_id, "cur_item_time": track_time},
+ )
async def async_start_queue_stream(self):
"""Call when queue_streamer starts playing the queue stream."""
self._last_queue_startindex = self._next_queue_startindex
+
+ self._cur_item_time = 0
return self.get_item(self._next_queue_startindex)
def to_dict(self):
else:
track_time = elapsed_time_queue - total_time
break
- self._next_queue_startindex = queue_index + 1
return queue_index, track_time
- async def async_process_queue_update(self, new_index, track_time):
- """Compare the queue index to determine if playback changed."""
- new_track = self.get_item(new_index)
- self._cur_item_time = track_time
- self._cur_index = new_index
- if self._last_track != new_track:
- # queue track updated
- self._last_track = new_track
- self.mass.signal_event(EVENT_QUEUE_UPDATED, self.to_dict())
- if self._last_track:
- self._last_track.streamdetails = None # invalidate streamdetails
- # update vars
- if self._last_item_time != track_time:
- self._last_item_time = track_time
- self.mass.signal_event(
- EVENT_QUEUE_TIME_UPDATED,
- {"player_id": self.player_id, "cur_item_time": track_time},
- )
-
@staticmethod
def __shuffle_items(queue_items):
"""Shuffle a list of tracks."""
from music_assistant.models.musicprovider import MusicProvider
from music_assistant.models.provider import ProviderType
from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
-from music_assistant.utils import compare_strings, run_periodic
+from music_assistant.utils import compare_strings, encrypt_string, run_periodic
from PIL import Image
LOGGER = logging.getLogger("mass")
full_track = media_item
else:
full_track = await self.async_get_track(
- media_item.item_id, media_item.provider, lazy=True, refresh=True
+ media_item.item_id, media_item.provider, lazy=True, refresh=False
)
# sort by quality and check track availability
for prov_media in sorted(
streamdetails = await music_prov.async_get_stream_details(
prov_media.item_id
)
+ if streamdetails:
+ break
- if streamdetails:
- streamdetails.player_id = player_id
- # set streamdetails as attribute on the media_item
- media_item.streamdetails = streamdetails
- return streamdetails
+ if streamdetails:
+ # set player_id on the streamdetails so we know what players stream
+ streamdetails.player_id = player_id
+ # store the path encrypted as we do not want it to be visible in the api
+ streamdetails.path = encrypt_string(streamdetails.path)
+ # set streamdetails as attribute on the media_item
+ # this way the app knows what content is playing
+ media_item.streamdetails = streamdetails
+ return streamdetails
return None
################ Library synchronization logic ################
else:
gain_correct = target_gain - track_loudness
gain_correct = round(gain_correct, 2)
- LOGGER.debug(
- "Loudness level for track %s/%s is %s - calculated replayGain is %s",
- provider_id,
- item_id,
- track_loudness,
- gain_correct,
- )
return gain_correct
async def __async_create_player_state(self, player: Player):
"""Demo/test providers."""
import asyncio
+import logging
import signal
import subprocess
from typing import List
PROV_ID = "demo_player"
PROV_NAME = "Demo/Test players"
+LOGGER = logging.getLogger(PROV_ID)
class DemoPlayerProvider(PlayerProvider):
def __init__(self, *args, **kwargs):
"""Initialize."""
self._players = {}
+ self._progress_tasks = {}
super().__init__(*args, **kwargs)
@property
if player.sox:
await self.async_cmd_stop(player_id)
player.current_uri = uri
- player.sox = subprocess.Popen(["play", uri])
+ player.sox = subprocess.Popen(["play", "-q", uri])
player.state = PlayerState.Playing
player.powered = True
self.mass.add_job(self.mass.player_manager.async_update_player(player))
async def report_progress():
"""Report fake progress while sox is playing."""
+ LOGGER.info("Playback started on player %s", player_id)
player.elapsed_time = 0
- while (
- player.state == PlayerState.Playing
- and player.sox
- and not player.sox.poll()
- ):
+ while player.sox and not player.sox.poll():
await asyncio.sleep(1)
player.elapsed_time += 1
self.mass.add_job(self.mass.player_manager.async_update_player(player))
+ LOGGER.info("Playback stopped on player %s", player_id)
player.elapsed_time = 0
player.state = PlayerState.Stopped
self.mass.add_job(self.mass.player_manager.async_update_player(player))
- self.mass.add_job(report_progress)
+ if self._progress_tasks.get(player_id):
+ self._progress_tasks[player_id].cancel()
+ self._progress_tasks[player_id] = self.mass.add_job(report_progress)
async def async_cmd_stop(self, player_id: str) -> None:
"""
self.__user_auth_info = None
self.__logged_in = False
self._throttler = Throttler(rate_limit=4, period=1)
- self.mass.add_event_listener(self.async_mass_event, EVENT_STREAM_STARTED)
- self.mass.add_event_listener(self.async_mass_event, EVENT_STREAM_ENDED)
+ self.mass.add_event_listener(
+ self.async_mass_event, [EVENT_STREAM_STARTED, EVENT_STREAM_ENDED]
+ )
return True
async def async_search(
if not self.__user_auth_info:
return
# TODO: need to figure out if the streamed track is purchased by user
- if msg == EVENT_STREAM_STARTED and msg_details["provider"] == PROV_ID:
+ if msg == EVENT_STREAM_STARTED and msg_details.provider == PROV_ID:
# report streaming started to qobuz
device_id = self.__user_auth_info["user"]["device"]["id"]
credential_id = self.__user_auth_info["user"]["credential"]["id"]
import memory_tempfile
import unidecode
+from cryptography.fernet import Fernet, InvalidToken
+from music_assistant.app_vars import get_app_var # noqa # pylint: disable=all
try:
import simplejson as json
return tempfile.NamedTemporaryFile(buffering=0)
+def encrypt_string(str_value):
+    """Encrypt a string with Fernet symmetric encryption.
+
+    The key comes from get_app_var(3) (app-level secret; value is opaque
+    here but is used as a Fernet key). The plaintext is UTF-8 encoded,
+    encrypted, and the Fernet token is returned as a str.
+    """
+    return Fernet(get_app_var(3)).encrypt(str_value.encode()).decode()
+
+
+def decrypt_string(str_value):
+    """Decrypt a Fernet-encrypted string.
+
+    Counterpart of encrypt_string: decrypts with the same app-level key.
+    Returns the decrypted str, or None when the input is not a valid
+    token for this key (InvalidToken) — callers use None to detect
+    values that were never encrypted.
+    """
+    try:
+        return Fernet(get_app_var(3)).decrypt(str_value.encode()).decode()
+    except InvalidToken:
+        return None
+
+
class CustomIntEnum(int, Enum):
"""Base for IntEnum with some helpers."""
player_id = request.match_info.get("player_id")
player_queue = self.mass.player_manager.get_player_queue(player_id)
cmd = request.match_info.get("cmd")
- cmd_args = await request.json()
+ try:
+ cmd_args = await request.json()
+ except json.decoder.JSONDecodeError:
+ cmd_args = None
if cmd == "repeat_enabled":
player_queue.repeat_enabled = cmd_args
elif cmd == "shuffle_enabled":
await player_queue.async_move_item(cmd_args, 1)
elif cmd == "next":
await player_queue.async_move_item(cmd_args, 0)
- return web.json_response(player_queue, dumps=json_serializer)
+ return web.json_response(player_queue.to_dict(), dumps=json_serializer)
@login_required
@routes.get("/api/players/{player_id}")