start_mass(),
use_uvloop=False,
shutdown_callback=on_shutdown,
- executor_workers=64,
+ executor_workers=32,
)
count = 0
while True:
count += 1
- for player in list(self._players.values()):
- player_id = player.player_id
- # if the player is playing, update elapsed time every tick
- # to ensure the queue has accurate details
- player_playing = (
- player.active_queue == player.player_id and player.state == PlayerState.PLAYING
- )
- if player_playing:
- self.mass.loop.call_soon(self.update, player_id)
- # Poll player;
- # - every 360 seconds if the player if not powered
- # - every 30 seconds if the player is powered
- # - every 10 seconds if the player is playing
- if (
- (player.available and player.powered and count % 30 == 0)
- or (player.available and player_playing and count % 10 == 0)
- or count == 360
- ):
- if player_prov := self.get_player_provider(player_id):
- try:
- await player_prov.poll_player(player_id)
- except PlayerUnavailableError:
- player.available = False
- player.state = PlayerState.IDLE
- player.powered = False
- self.mass.loop.call_soon(self.update, player_id)
- except Exception as err: # pylint: disable=broad-except
- LOGGER.warning(
- "Error while requesting latest state from player %s: %s",
- player.display_name,
- str(err),
- exc_info=err,
- )
- if count >= 360:
- count = 0
- await asyncio.sleep(0)
+ async with asyncio.TaskGroup() as tg:
+ for player in list(self._players.values()):
+ player_id = player.player_id
+ # if the player is playing, update elapsed time every tick
+ # to ensure the queue has accurate details
+ player_playing = (
+ player.active_queue == player.player_id
+ and player.state == PlayerState.PLAYING
+ )
+ if player_playing:
+ self.mass.loop.call_soon(self.update, player_id)
+ # Poll player;
+ # - every 360 seconds if the player is not powered
+ # - every 30 seconds if the player is powered
+ # - every 10 seconds if the player is playing
+ if (
+ (player.available and player.powered and count % 30 == 0)
+ or (player.available and player_playing and count % 10 == 0)
+ or count == 360
+ ):
+ if player_prov := self.get_player_provider(player_id):
+ try:
+ tg.create_task(player_prov.poll_player(player_id))
+ except PlayerUnavailableError:
+ player.available = False
+ player.state = PlayerState.IDLE
+ player.powered = False
+ self.mass.loop.call_soon(self.update, player_id)
+ except Exception as err: # pylint: disable=broad-except
+ LOGGER.warning(
+ "Error while requesting latest state from player %s: %s",
+ player.display_name,
+ str(err),
+ exc_info=err,
+ )
+ if count >= 360:
+ count = 0
await asyncio.sleep(1)
# feed stdin with pcm audio chunks from origin
async def read_audio():
async for chunk in stream_job.subscribe(player_id):
- await ffmpeg_proc.write(chunk)
+ try:
+ await ffmpeg_proc.write(chunk)
+ except BrokenPipeError:
+ break
ffmpeg_proc.write_eof()
ffmpeg_proc.attach_task(read_audio())
iterator = (
ffmpeg_proc.iter_chunked(icy_meta_interval)
if enable_icy
- else ffmpeg_proc.iter_any()
+ else ffmpeg_proc.iter_chunked(128000)
)
bytes_streamed = 0
async for chunk in iterator:
try:
await resp.write(chunk)
- bytes_streamed += len(chunk)
-
- # do not allow the player to prebuffer more than 60 seconds
- seconds_streamed = int(bytes_streamed / stream_job.pcm_sample_size)
- if (
- seconds_streamed > 120
- and (seconds_streamed - player.corrected_elapsed_time) > 30
- ):
- await asyncio.sleep(1)
-
- if not enable_icy:
- continue
-
- # if icy metadata is enabled, send the icy metadata after the chunk
- item_in_buf = stream_job.queue_item
- if item_in_buf and item_in_buf.streamdetails.stream_title:
- title = item_in_buf.streamdetails.stream_title
- elif item_in_buf and item_in_buf.name:
- title = item_in_buf.name
- else:
- title = "Music Assistant"
- metadata = f"StreamTitle='{title}';".encode()
- while len(metadata) % 16 != 0:
- metadata += b"\x00"
- length = len(metadata)
- length_b = chr(int(length / 16)).encode()
- await resp.write(length_b + metadata)
-
except (BrokenPipeError, ConnectionResetError):
- # connection lost
+ # connection lost / closed mid-write (race with client disconnect)
break
+ bytes_streamed += len(chunk)
+
+ # throttle once the player has prebuffered more than 30 seconds ahead (after the first 120 seconds)
+ seconds_streamed = int(bytes_streamed / stream_job.pcm_sample_size)
+ if (
+ seconds_streamed > 120
+ and (seconds_streamed - player.corrected_elapsed_time) > 30
+ ):
+ await asyncio.sleep(1)
+
+ if not enable_icy:
+ continue
+
+ # if icy metadata is enabled, send the icy metadata after the chunk
+ item_in_buf = stream_job.queue_item
+ if item_in_buf and item_in_buf.streamdetails.stream_title:
+ title = item_in_buf.streamdetails.stream_title
+ elif item_in_buf and item_in_buf.name:
+ title = item_in_buf.name
+ else:
+ title = "Music Assistant"
+ metadata = f"StreamTitle='{title}';".encode()
+ while len(metadata) % 16 != 0:
+ metadata += b"\x00"
+ length = len(metadata)
+ length_b = chr(int(length / 16)).encode()
+ await resp.write(length_b + metadata)
return resp
LOGGER = logging.getLogger(__name__)
DEFAULT_CHUNKSIZE = 128000
-DEFAULT_TIMEOUT = 30 * 60
+DEFAULT_TIMEOUT = 60
# pylint: disable=invalid-name
def __init__(
self,
- args: list | str,
+ args: list,
enable_stdin: bool = False,
enable_stdout: bool = True,
enable_stderr: bool = False,
async def __aenter__(self) -> AsyncProcess:
"""Enter context manager."""
- args = " ".join(self._args) if "|" in self._args else self._args
- if isinstance(args, str):
- self._proc = await asyncio.create_subprocess_shell(
- args,
- stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
- stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
- stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
- close_fds=True,
- limit=64 * 1024 * 1024,
- )
- else:
- self._proc = await asyncio.create_subprocess_exec(
- *args,
- stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
- stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
- stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
- close_fds=True,
- limit=64 * 1024 * 1024,
- )
-
- # Fix BrokenPipeError due to a race condition
- # by attaching a default done callback
- def _done_cb(fut: asyncio.Future):
- fut.exception()
-
- self._proc._transport._protocol._stdin_closed.add_done_callback(_done_cb)
-
+ self._proc = await asyncio.create_subprocess_exec(
+ *self._args,
+ stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
+ stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
+ stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
+ close_fds=True,
+ )
return self
async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
"""Exit context manager."""
self.closed = True
- if self._attached_task:
- # cancel the attached reader/writer task
- try:
- self._attached_task.cancel()
- await self._attached_task
- except asyncio.CancelledError:
- pass
+ # make sure the process is cleaned up
+ self.write_eof()
if self._proc.returncode is None:
- # prevent subprocess deadlocking, read remaining bytes
- await self._proc.communicate()
- if self._enable_stdout and not self._proc.stdout.at_eof():
- await self._proc.stdout.read()
- if self._enable_stderr and not self._proc.stderr.at_eof():
- await self._proc.stderr.read()
- if self._proc.returncode is None:
- # just in case?
+ try:
+ async with asyncio.timeout(10):
+ await self._proc.communicate()
+ except TimeoutError:
self._proc.kill()
+ await self._proc.communicate()
+ if self._proc.returncode is None:
+ self._proc.kill()
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
if self.closed or self._proc.stdin.is_closing():
- raise asyncio.CancelledError()
+ return
self._proc.stdin.write(data)
try:
await self._proc.stdin.drain()
except BrokenPipeError:
- raise asyncio.CancelledError()
+ LOGGER.warning("Attempted write to an already closed process")
def write_eof(self) -> None:
"""Write end of file to to process stdin."""
+ if self.closed or self._proc.stdin.is_closing():
+ return
try:
if self._proc.stdin.can_write_eof():
self._proc.stdin.write_eof()
ConnectionResetError,
):
# already exited, race condition
- return
+ LOGGER.warning("Attempted write to an already closed process")
async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
"""Write bytes to process and read back results."""
async def chunk_feeder():
bytes_read = 0
- async for chunk in input_file:
- await proc.write(chunk)
- bytes_read += len(chunk)
-
- if bytes_read > 25 * 1000000:
- # this is possibly a m4a file with 'moove atom' metadata at the end of the file
- # we'll have to read the entire file to do something with it
- # for now we just ignore/deny these files
- raise RuntimeError("Tags not present at beginning of file")
-
- proc.write_eof()
+ try:
+ async for chunk in input_file:
+ if proc.closed:
+ break
+ await proc.write(chunk)
+ bytes_read += len(chunk)
+ del chunk
+ if bytes_read > 25 * 1000000:
+ # this is possibly a m4a file with 'moov atom' metadata at the
+ # end of the file
+ # we'll have to read the entire file to do something with it
+ # for now we just ignore/deny these files
+ raise RuntimeError("Tags not present at beginning of file")
+ finally:
+ proc.write_eof()
proc.attach_task(chunk_feeder())
data = json.loads(res)
if error := data.get("error"):
raise InvalidDataError(error["string"])
+ if not data.get("streams") or data["streams"][0].get("codec_type") == "video":
+ raise InvalidDataError("Not an audio file")
tags = AudioTags.parse(data)
+ del res
+ del data
if not tags.duration and file_size and tags.bit_rate:
# estimate duration from filesize/bitrate
tags.duration = int((file_size * 8) / tags.bit_rate)
import asyncio
import importlib
import logging
+from collections.abc import AsyncGenerator, Iterator
from functools import lru_cache
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from music_assistant.server.models import ProviderModuleType
return importlib.import_module(f".{domain}", "music_assistant.server.providers")
return await asyncio.to_thread(_get_provider_module, domain)
+
+
+async def async_iter(sync_iterator: Iterator, *args, **kwargs) -> AsyncGenerator[Any, None]:
+ """Wrap blocking iterator into an asynchronous one."""
+ # inspired by: https://stackoverflow.com/questions/62294385/synchronous-generator-in-asyncio
+ loop = asyncio.get_running_loop()
+ queue = asyncio.Queue(1)
+ _end_ = object()
+
+ def iter_to_queue():
+ try:
+ for item in sync_iterator(*args, **kwargs):
+ if queue is None:
+ break
+ asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
+ finally:
+ asyncio.run_coroutine_threadsafe(queue.put(_end_), loop).result()
+
+ iter_fut = loop.run_in_executor(None, iter_to_queue)
+ try:
+ while True:
+ next_item = await queue.get()
+ if next_item is _end_:
+ break
+ yield next_item
+ finally:
+ queue = None
+ if not iter_fut.done():
+ iter_fut.cancel()
"description": "Support for players that support the Airplay protocol.",
"codeowners": ["@music-assistant"],
"requirements": [],
- "documentation": "",
+ "documentation": "https://github.com/orgs/music-assistant/discussions/1165",
"multi_instance": false,
"builtin": false,
"load_by_default": true,
"name": "Chromecast",
"description": "Support for Chromecast based players.",
"codeowners": ["@music-assistant"],
- "requirements": ["PyChromecast==13.0.5"],
+ "requirements": ["PyChromecast==13.0.6"],
"documentation": "https://github.com/music-assistant/hass-music-assistant/discussions/1138",
"multi_instance": false,
"builtin": false,
url = f"http://webservice.fanart.tv/v3/{endpoint}"
kwargs["api_key"] = app_var(4)
async with self.throttler:
- async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response:
+ async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response:
try:
result = await response.json()
except (
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import SetupFailedError
from music_assistant.constants import CONF_PATH
+from music_assistant.server.helpers.util import async_iter
from .base import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
"""
abs_path = get_absolute_path(self.config.get_value(CONF_PATH), path)
- for entry in await asyncio.to_thread(os.scandir, abs_path):
+ async for entry in async_iter(os.scandir, abs_path):
if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
# skip invalid/system files and dirs
continue
Track,
)
from music_assistant.constants import SCHEMA_VERSION, VARIOUS_ARTISTS, VARIOUS_ARTISTS_ID
+from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.playlists import parse_m3u, parse_pls
from music_assistant.server.helpers.tags import parse_tags, split_items
track.metadata.images = [
MediaItemImage(ImageType.THUMB, file_item.path, self.instance_id)
]
- elif track.album.image:
+ elif track.album and track.album.image:
track.metadata.images = [track.album.image]
if track.album and not track.album.metadata.images:
return album
+ @use_cache(120)
async def _get_local_images(self, folder: str) -> list[MediaItemImage]:
"""Return local images found in a given folderpath."""
images = []
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import LoginFailed
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
+from music_assistant.server.controllers.cache import use_cache
+from music_assistant.server.helpers.util import async_iter
from music_assistant.server.providers.filesystem_local.base import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
IGNORE_DIRS,
"E.g. 'collections' or 'albums/A-K'.",
),
CONF_ENTRY_MISSING_ALBUM_ARTIST,
- ConfigEntry(
- key=CONF_CONN_LIMIT,
- type=ConfigEntryType.INTEGER,
- label="Connection limit",
- required=False,
- default_value=5,
- advanced=True,
- description="[optional] Limit the number of concurrent connections. "
- "Set the value high(er) for more performance but some (Windows) servers "
- "may deny requests in that case",
- ),
)
server: str = self.config.get_value(CONF_HOST)
share: str = self.config.get_value(CONF_SHARE)
subfolder: str = self.config.get_value(CONF_SUBFOLDER)
- connection_limit: int = self.config.get_value(CONF_CONN_LIMIT)
- self.semaphore = asyncio.Semaphore(connection_limit)
# create windows like path (\\server\share\subfolder)
if subfolder.endswith(os.sep):
subfolder = subfolder[:-1]
+ subfolder = subfolder.replace("\\", os.sep).replace("/", os.sep)
self._root_path = f"{os.sep}{os.sep}{server}{os.sep}{share}{os.sep}{subfolder}"
self.logger.debug("Using root path: %s", self._root_path)
"""
abs_path = get_absolute_path(self._root_path, path)
- async with self.semaphore:
- entries = await asyncio.to_thread(smbclient.scandir, abs_path)
- for entry in entries:
+ async for entry in async_iter(smbclient.scandir, abs_path):
if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
# skip invalid/system files and dirs
continue
item = await create_item(self._root_path, entry)
if recursive and item.is_dir:
- try:
- async for subitem in self.listdir(item.absolute_path, True):
- yield subitem
- except (OSError, PermissionError) as err:
- self.logger.warning("Skip folder %s: %s", item.path, str(err))
+ async for subitem in self.listdir(item.absolute_path, True):
+ yield subitem
else:
yield item
)
# run in thread because strictly taken this may be blocking IO
- async with self.semaphore:
- return await asyncio.to_thread(_create_item)
+ return await asyncio.to_thread(_create_item)
+ @use_cache(120)
async def exists(self, file_path: str) -> bool:
"""Return bool is this FileSystem musicprovider has given file/dir."""
if not file_path:
return False # guard
file_path = file_path.replace("\\", os.sep)
abs_path = get_absolute_path(self._root_path, file_path)
- async with self.semaphore:
- try:
- return await asyncio.to_thread(smbpath.exists, abs_path)
- except Exception as err:
- if "STATUS_OBJECT_NAME_INVALID" in str(err):
- return False
- raise err
+ try:
+ return await asyncio.to_thread(smbpath.exists, abs_path)
+ except Exception as err:
+ if "STATUS_OBJECT_NAME_INVALID" in str(err):
+ return False
+ raise err
async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
"""Yield (binary) contents of file in chunks of bytes."""
file_path = file_path.replace("\\", os.sep)
- abs_path = get_absolute_path(self._root_path, file_path)
- chunk_size = 512000
- queue = asyncio.Queue()
- self.logger.debug("Reading file contents for %s", abs_path)
-
- async with self.semaphore:
+ absolute_path = get_absolute_path(self._root_path, file_path)
- def _reader():
- with smbclient.open_file(abs_path, "rb", share_access="r") as _file:
+ def _reader():
+ self.logger.debug("Reading file contents for %s", absolute_path)
+ try:
+ chunk_size = 64000
+ bytes_sent = 0
+ with smbclient.open_file(
+ absolute_path, "rb", buffering=chunk_size, share_access="r"
+ ) as _file:
if seek:
_file.seek(seek)
- # yield chunks of data from file
while True:
- data = _file.read(chunk_size)
- if not data:
+ chunk = _file.read(chunk_size)
+ if not chunk:
break
- self.mass.loop.call_soon_threadsafe(queue.put_nowait, data)
- self.mass.loop.call_soon_threadsafe(queue.put_nowait, b"")
-
- self.mass.create_task(_reader)
- while True:
- chunk = await queue.get()
- if chunk == b"":
- break
- yield chunk
+ yield chunk
+ bytes_sent += len(chunk)
+ finally:
+ self.logger.debug(
+ "Finished Reading file contents for %s - bytes transferred: %s",
+ absolute_path,
+ bytes_sent,
+ )
+
+ async for chunk in async_iter(_reader):
+ yield chunk
async def write_file_content(self, file_path: str, data: bytes) -> None:
"""Write entire file content as bytes (e.g. for playlists)."""
with smbclient.open_file(abs_path, "wb") as _file:
_file.write(data)
- async with self.semaphore:
- await asyncio.to_thread(_writer)
+ await asyncio.to_thread(_writer)
kwargs["fmt"] = "json" # type: ignore[assignment]
async with self.throttler:
async with self.mass.http_session.get(
- url, headers=headers, params=kwargs, verify_ssl=False
+ url, headers=headers, params=kwargs, ssl=False
) as response:
try:
result = await response.json()
kwargs["user_auth_token"] = await self._auth_token()
async with self._throttler:
async with self.mass.http_session.get(
- url, headers=headers, params=kwargs, verify_ssl=False
+ url, headers=headers, params=kwargs, ssl=False
) as response:
try:
# make sure status is 200
params["app_id"] = app_var(0)
params["user_auth_token"] = await self._auth_token()
async with self.mass.http_session.post(
- url, params=params, json=data, verify_ssl=False
+ url, params=params, json=data, ssl=False
) as response:
try:
result = await response.json()
time_start = time.time()
try:
async with self.mass.http_session.get(
- url, headers=headers, params=kwargs, verify_ssl=False, timeout=120
+ url, headers=headers, params=kwargs, ssl=False, timeout=120
) as response:
result = await response.json()
if "error" in result or ("status" in result and "error" in result["status"]):
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.delete(
- url, headers=headers, params=kwargs, json=data, verify_ssl=False
+ url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
return await response.text()
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.put(
- url, headers=headers, params=kwargs, json=data, verify_ssl=False
+ url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
return await response.text()
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.post(
- url, headers=headers, params=kwargs, json=data, verify_ssl=False
+ url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
return await response.text()
"""Get data from api."""
url = f"https://theaudiodb.com/api/v1/json/{app_var(3)}/{endpoint}"
async with self.throttler:
- async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response:
+ async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response:
try:
result = await response.json()
except (
kwargs["partnerId"] = "1"
kwargs["render"] = "json"
async with self._throttler:
- async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response:
+ async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response:
result = await response.json()
if not result or "error" in result:
self.logger.error(url)
url,
headers=self._headers,
json=data,
- verify_ssl=False,
+ ssl=False,
cookies=self._cookies,
) as response:
return await response.json()
return existing
if asyncio.iscoroutinefunction(target):
task = self.loop.create_task(target(*args, **kwargs))
- elif isinstance(target, asyncio.Future):
- task = target
elif asyncio.iscoroutine(target):
task = self.loop.create_task(target)
+ elif isinstance(target, asyncio.Future):
+ task = target
else:
# assume normal callable (non coroutine or awaitable)
task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs))
music-assistant-frontend==20230327.1
orjson==3.8.7
pillow==9.4.0
+PyChromecast==13.0.6
plexapi==4.13.2
-PyChromecast==13.0.5
python-slugify==8.0.1