runs-on: ubuntu-latest
strategy:
matrix:
- python-version: [3.7, 3.8]
+ python-version: [3.8, 3.9]
steps:
- uses: actions/checkout@v2
if not os.path.isdir(data_dir):
os.makedirs(data_dir)
# config debug settings if needed
- if args.debug:
+ if args.debug or bool(os.environ.get("DEBUG")):
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
"""All constants for Music Assistant."""
-__version__ = "0.1.2"
-REQUIRED_PYTHON_VER = "3.7"
+__version__ = "0.1.3"
+REQUIRED_PYTHON_VER = "3.8"
# configuration keys/attributes
CONF_USERNAME = "username"
CONF_URL = "url"
CONF_NAME = "name"
CONF_CROSSFADE_DURATION = "crossfade_duration"
-CONF_FALLBACK_GAIN_CORRECT = "fallback_gain_correct"
CONF_GROUP_DELAY = "group_delay"
CONF_VOLUME_CONTROL = "volume_control"
CONF_POWER_CONTROL = "power_control"
self._proc = None
self._process_args = process_args
self._enable_write = enable_write
- self._cancelled = False
async def __aenter__(self) -> "AsyncProcess":
"""Enter context manager."""
self._proc = await asyncio.create_subprocess_exec(
*self._process_args,
stdin=asyncio.subprocess.PIPE if self._enable_write else None,
- stdout=asyncio.subprocess.PIPE,
- limit=4000000
+ stdout=asyncio.subprocess.PIPE
)
return self
async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
"""Exit context manager."""
- self._cancelled = True
if self._proc.returncode is None:
# prevent subprocess deadlocking, send terminate and read remaining bytes
- if self._enable_write and self._proc.stdin.can_write_eof():
- self._proc.stdin.write_eof()
- self._proc.terminate()
- await self._proc.stdout.read()
+ await self.write_eof()
+ try:
+ self._proc.terminate()
+ await self._proc.stdout.read()
+ self._proc.kill()
+ except (ProcessLookupError, BrokenPipeError):
+ pass
del self._proc
async def iterate_chunks(
async def read(self, chunk_size: int = DEFAULT_CHUNKSIZE) -> bytes:
"""Read x bytes from the process stdout."""
- if self._cancelled:
- raise asyncio.CancelledError()
try:
return await self._proc.stdout.readexactly(chunk_size)
except asyncio.IncompleteReadError as err:
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
- if self._cancelled or not self._proc:
- raise asyncio.CancelledError()
- if self._proc.stdin.is_closing():
- raise asyncio.CancelledError()
- self._proc.stdin.write(data)
try:
+ self._proc.stdin.write(data)
await self._proc.stdin.drain()
except BrokenPipeError:
pass
async def write_eof(self) -> None:
"""Write eof to process."""
- if self._cancelled:
- raise asyncio.CancelledError()
- if self._proc.stdin.can_write_eof():
+ if not (self._enable_write and self._proc.stdin.can_write_eof()):
+ return
+ try:
self._proc.stdin.write_eof()
+ except BrokenPipeError:
+ pass
async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
"""Write bytes to process and read back results."""
- if self._cancelled:
- raise asyncio.CancelledError()
return await self._proc.communicate(input_data)
from music_assistant.constants import (
CONF_CROSSFADE_DURATION,
CONF_ENABLED,
- CONF_FALLBACK_GAIN_CORRECT,
CONF_GROUP_DELAY,
CONF_KEY_BASE,
CONF_KEY_METADATA_PROVIDERS,
description="desc_target_volume",
depends_on=CONF_VOLUME_NORMALISATION,
),
- ConfigEntry(
- entry_key=CONF_FALLBACK_GAIN_CORRECT,
- entry_type=ConfigEntryType.INT,
- range=(-20, 0),
- default_value=-12,
- label=CONF_FALLBACK_GAIN_CORRECT,
- description="desc_gain_correct",
- depends_on=CONF_VOLUME_NORMALISATION,
- ),
ConfigEntry(
entry_key=CONF_CROSSFADE_DURATION,
entry_type=ConfigEntryType.INT,
# pylint: disable=too-many-lines
import logging
import os
+import statistics
from datetime import datetime
from typing import List, Optional, Set, Union
return result[0]
return None
+    async def get_provider_loudness(self, provider) -> Optional[float]:
+        """Get average integrated loudness for tracks of given provider."""
+        # NOTE(review): the previous revision of this hunk did a fetchone()
+        # first and returned the FIRST matching row, which made the averaging
+        # below dead code — average over all rows instead.
+        sql_query = """SELECT loudness FROM track_loudness WHERE provider = ?"""
+        async with aiosqlite.connect(self._dbfile, timeout=120) as db_conn:
+            all_items = [
+                db_row[0]
+                for db_row in await db_conn.execute_fetchall(sql_query, (provider,))
+            ]
+        if all_items:
+            # fmean requires Python 3.8+, matching REQUIRED_PYTHON_VER
+            return statistics.fmean(all_items)
+        return None
+
async def mark_item_played(self, item_id: str, provider: str):
"""Mark item as played in playlog."""
timestamp = datetime.utcnow().timestamp()
self._players = {}
self._providers = {}
self._player_queues = {}
- self._poll_ticks = 0
self._controls = {}
async def setup(self) -> None:
for player in self:
await player.on_remove()
- @run_periodic(1)
+ @run_periodic(30)
async def poll_task(self):
"""Check for updates on players that need to be polled."""
for player in self:
if not player.player_state.available:
continue
- if player.should_poll and (
- self._poll_ticks >= POLL_INTERVAL
- or player.state == PlaybackState.Playing
- ):
- await player.on_poll()
- if self._poll_ticks >= POLL_INTERVAL:
- self._poll_ticks = 0
- else:
- self._poll_ticks += 1
+ if not player.should_poll:
+ continue
+ await player.on_poll()
@property
def players(self) -> Dict[str, Player]:
if not player_conf["volume_normalisation"]:
return 0
target_gain = int(player_conf["target_volume"])
- fallback_gain = int(player_conf["fallback_gain_correct"])
track_loudness = await self.mass.database.get_track_loudness(
item_id, provider_id
)
if track_loudness is None:
- gain_correct = fallback_gain
- else:
- gain_correct = target_gain - track_loudness
+ # fallback to provider average
+ track_loudness = await self.mass.database.get_provider_loudness(provider_id)
+ if track_loudness is None:
+ # fallback to some (hopefully sane) average value for now
+ track_loudness = -8.5
+ gain_correct = target_gain - track_loudness
gain_correct = round(gain_correct, 2)
return gain_correct
All audio is processed by the SoX executable, using various subprocess streams.
"""
import asyncio
-import gc
import logging
import shlex
import subprocess
output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
resample: Optional[int] = None,
gain_db_adjust: Optional[float] = None,
- chunk_size: int = 4000000,
+ chunk_size: int = 512000,
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the sox manipulated audio data for the given streamdetails."""
# collect all args for sox
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
- prev_chunk = b""
- async for chunk in sox_proc.iterate_chunks(chunk_size):
- if len(chunk) < chunk_size:
- # last chunk
- yield (True, prev_chunk + chunk)
- break
- if prev_chunk:
- yield (False, prev_chunk)
- prev_chunk = chunk
- await asyncio.wait([fill_buffer_task])
- LOGGER.debug(
- "finished sox stream for: %s/%s",
- streamdetails.provider,
- streamdetails.item_id,
- )
+        try:
+            prev_chunk = b""
+            async for chunk in sox_proc.iterate_chunks(chunk_size):
+                if prev_chunk:
+                    yield (False, prev_chunk)
+                prev_chunk = chunk
+            # send last chunk
+            yield (True, prev_chunk)
+        except (asyncio.CancelledError, GeneratorExit) as err:
+            LOGGER.debug(
+                "get_sox_stream aborted for: %s/%s",
+                streamdetails.provider,
+                streamdetails.item_id,
+            )
+            fill_buffer_task.cancel()
+            raise err
+        else:
+            # await the fill buffer task so it is not left dangling and
+            # any exception it raised is surfaced here
+            await asyncio.wait([fill_buffer_task])
+            LOGGER.debug(
+                "finished sox stream for: %s/%s",
+                streamdetails.provider,
+                streamdetails.item_id,
+            )
async def queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]:
"""Stream the PlayerQueue's tracks as constant feed in flac format."""
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
# start yielding audio chunks
- async for chunk in sox_proc.iterate_chunks():
- yield chunk
- await asyncio.wait([fill_buffer_task])
+        try:
+            async for chunk in sox_proc.iterate_chunks():
+                yield chunk
+        except (asyncio.CancelledError, GeneratorExit) as err:
+            LOGGER.debug(
+                "queue_stream_flac aborted for: %s",
+                player_id,
+            )
+            fill_buffer_task.cancel()
+            raise err
+        else:
+            # await the fill buffer task so it is not left dangling and
+            # any exception it raised is surfaced here
+            await asyncio.wait([fill_buffer_task])
+            LOGGER.debug(
+                "finished queue_stream_flac for: %s",
+                player_id,
+            )
async def queue_stream_pcm(
self, player_id, sample_rate=96000, bit_depth=32
# HANDLE FIRST PART OF TRACK
if not chunk and bytes_written == 0:
# stream error: got empy first chunk
- # prevent player queue get stuck by sending next track command
- self.mass.add_job(player_queue.next())
LOGGER.error("Stream error on track %s", queue_track.item_id)
- return
+ # prevent player queue get stuck by just skipping to the next track
+ queue_track.duration = 0
+ continue
if cur_chunk <= 2 and not last_fadeout_data:
# no fadeout_part available so just pass it to the output directly
yield chunk
# part is too short after the strip action
# so we just use the entire original data
last_part = prev_chunk + chunk
- if len(last_part) < buffer_size:
- LOGGER.warning(
- "Not enough data for crossfade: %s", len(last_part)
- )
if (
not player_queue.crossfade_enabled
or len(last_part) < buffer_size
):
- # crossfading is not enabled so just pass the (stripped) audio data
+ # crossfading is not enabled or not enough data,
+ # so just pass the (stripped) audio data
+ if not player_queue.crossfade_enabled:
+ LOGGER.warning(
+ "Not enough data for crossfade: %s", len(last_part)
+ )
+
yield last_part
bytes_written += len(last_part)
del last_part
last_fadeout_data = last_part[-buffer_size:]
remaining_bytes = last_part[:-buffer_size]
# write remaining bytes
- yield remaining_bytes
- bytes_written += len(remaining_bytes)
+ if remaining_bytes:
+ yield remaining_bytes
+ bytes_written += len(remaining_bytes)
del last_part
del remaining_bytes
del chunk
queue_track.name,
player_id,
)
- # run garbage collect manually to avoid too much memory fragmentation
- self.mass.add_job(gc.collect)
# end of queue reached, pass last fadeout bits to final output
- yield last_fadeout_data
+ if last_fadeout_data:
+ yield last_fadeout_data
del last_fadeout_data
# END OF QUEUE STREAM
- # run garbage collect manually to avoid too much memory fragmentation
- self.mass.add_job(gc.collect)
LOGGER.info("streaming of queue for player %s completed", player_id)
async def stream_queue_item(
# stream from URL
if stream_type == StreamType.URL:
async with self.mass.http_session.get(stream_path) as response:
- async for chunk in response.content.iter_any():
+ async for chunk, _ in response.content.iter_chunks():
yield chunk
if needs_analyze and len(audio_data) < 100000000:
audio_data += chunk
def global_exception_handler(loop: asyncio.AbstractEventLoop, context: Dict) -> None:
"""Global exception handler."""
- LOGGER.exception(
- "Caught exception: %s", context.get("exception", context["message"])
- )
+ LOGGER.debug("Caught exception: %s", context.get("exception", context["message"]))
if "Broken pipe" in str(context.get("exception")):
# fix for the spamming subprocess
return
include_unavailable: bool = False,
) -> Tuple[Provider]:
"""Return all providers, optionally filtered by type."""
- return (
+ return tuple(
item
for item in self._providers.values()
if (filter_type is None or item.type == filter_type)
is_group_player: bool = False
group_childs: Set[str] = field(default_factory=set)
device_info: DeviceInfo = field(default_factory=DeviceInfo)
- updated_at: datetime = None
+ updated_at: datetime = datetime.now()
group_parents: Set[str] = field(default_factory=set)
features: Set[PlayerFeature] = field(default_factory=set)
active_queue: str = None
setattr(self, key, new_val)
if key != "updated_at":
changed_keys.add(key)
+ if changed_keys:
+ self.updated_at = datetime.now()
return changed_keys
provider_id=self.provider_id,
name=self._get_name(),
powered=self._get_powered(),
- state=self.state,
+ state=self._get_state(),
available=self._get_available(),
volume_level=self._get_volume_level(),
elapsed_time=self.elapsed_time,
group_parents=self._get_group_parents(),
features=self.features,
active_queue=self._get_active_queue(),
- updated_at=datetime.now(),
)
def to_dict(self) -> dict:
"""Instance attributes as dict so it can be serialized to json."""
return {
"queue_id": self.player.player_id,
+ "queue_name": self.player.player_state.name,
"shuffle_enabled": self.shuffle_enabled,
"repeat_enabled": self.repeat_enabled,
"crossfade_enabled": self.crossfade_enabled,
"""Representation of a Cast device on the network."""
import logging
import uuid
-from datetime import datetime
from typing import List, Optional
import pychromecast
# Not playing, return last reported seek time
return self.media_status.current_time
- @property
- def elapsed_milliseconds(self) -> int:
- """Return (realtime) elapsed time of current playing media in milliseconds."""
- if self.media_status is None or not (
- self.media_status.player_is_playing
- or self.media_status.player_is_paused
- or self.media_status.player_is_idle
- ):
- return 0
- if self.media_status.player_is_playing:
- # Add time since last update
- return int(
- (
- self.media_status.current_time
- + (
- datetime.utcnow().timestamp()
- - self.media_status.last_updated.timestamp()
- )
- )
- * 1000
- )
- # Not playing, return last reported seek time
- return self.media_status.current_time * 1000
-
@property
def available(self) -> bool:
"""Return availablity state of this player."""
if self._cast_info.is_audio_group and new_available:
self.mass.add_job(self._chromecast.mz_controller.update_members)
- async def on_poll(self) -> None:
- """Call when player is periodically polled by the player manager (should_poll=True)."""
- if self.active_queue.startswith("group_player"):
- # the group player wants very accurate elapsed_time state so we request it very often
- await self.chromecast_command(
- self._chromecast.media_controller.update_status
- )
- self.update_state()
-
# ========== Service Calls ==========
async def cmd_stop(self) -> None:
async def queue_stream_task(self):
"""Handle streaming queue to connected child players."""
ticks = 0
- while ticks < 60 and len(self.connected_clients) != len(self.group_childs):
+ while ticks < 60 and (len(self.connected_clients) != len(self.group_childs)):
# TODO: Support situation where not all clients of the group are powered
await asyncio.sleep(0.1)
ticks += 1
"max_sample_rate": "Maximum sample rate",
"volume_normalisation": "Enable Volume normalisation",
"target_volume": "Target Volume level",
- "fallback_gain_correct": "Fallback gain correction level",
"desc_player_name": "Set a custom name for this player.",
"crossfade_duration": "Enable crossfade",
"group_delay": "Correction of groupdelay",
"desc_sample_rate": "Set the maximum sample rate this player can handle.",
"desc_volume_normalisation": "Enable R128 volume normalisation to play music at an equally loud volume.",
"desc_target_volume": "Set the preferred target volume level in LUFS. The R128 default is -22 LUFS.",
- "desc_gain_correct": "Set a fallback gain correction when there is no R128 measurement available.",
"desc_crossfade": "Enable crossfading of Queue tracks by setting a crossfade duration in seconds.",
"desc_enable_provider": "Enable this provider.",
"desc_base_username": "Username to access this Music Assistant server.",
"max_sample_rate": "Maximale sample rate",
"volume_normalisation": "Volume normalisering inschakelen",
"target_volume": "Doel volume",
- "fallback_gain_correct": "Fallback gain correctie niveau",
"desc_player_name": "Stel een aangepaste naam in voor deze speler.",
"crossfade_duration": "Crossfade inschakelen",
"security": "Beveiliging",
"desc_sample_rate": "Stel de maximale sample rate in die deze speler aankan.",
"desc_volume_normalisation": "R128 volume normalisatie inschakelen om muziek altijd op een gelijk volume af te spelen.",
"desc_target_volume": "Selecteer het gewenste doelvolume in LUFS. De R128 standaard is -22 LUFS.",
- "desc_gain_correct": "Stel een fallback gain correctie in als er geen R128 meting beschikbaar is.",
"desc_crossfade": "Crossfade inschakelen door het instellen van een crossfade duur in seconden.",
"desc_enable_provider": "Deze provider inschakelen.",
"desc_base_username": "Gebruikersnaam waarmee deze server beveiligd moet worden.",