From c35b8a14eb46c23da2d79dadcfb40dd098c1b1a6 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 28 Mar 2023 18:20:29 +0200 Subject: [PATCH] Fix memory leaks and some performance tweaks (#587) * add docs for airplay * verify_ssl is deprecated * limit number of workers a bit * change order * reduce chunk sizes * Fix memory leaks in File providers * use taskgroup for poll players * allow max 25mb for reading tags * force close process * bump pychromecast to 13.0.6 fixes a memory leak --- music_assistant/__main__.py | 2 +- music_assistant/server/controllers/players.py | 73 +++++++-------- music_assistant/server/controllers/streams.py | 64 ++++++------- music_assistant/server/helpers/process.py | 70 +++++---------- music_assistant/server/helpers/tags.py | 30 ++++--- music_assistant/server/helpers/util.py | 32 ++++++- .../server/providers/airplay/manifest.json | 2 +- .../server/providers/chromecast/manifest.json | 2 +- .../server/providers/fanarttv/__init__.py | 2 +- .../providers/filesystem_local/__init__.py | 3 +- .../server/providers/filesystem_local/base.py | 4 +- .../providers/filesystem_smb/__init__.py | 89 ++++++++----------- .../server/providers/musicbrainz/__init__.py | 2 +- .../server/providers/qobuz/__init__.py | 4 +- .../server/providers/spotify/__init__.py | 8 +- .../server/providers/theaudiodb/__init__.py | 2 +- .../server/providers/tunein/__init__.py | 2 +- .../server/providers/ytmusic/__init__.py | 2 +- music_assistant/server/server.py | 4 +- requirements_all.txt | 1 + 20 files changed, 202 insertions(+), 196 deletions(-) diff --git a/music_assistant/__main__.py b/music_assistant/__main__.py index 960a487d..9d40c04a 100644 --- a/music_assistant/__main__.py +++ b/music_assistant/__main__.py @@ -108,7 +108,7 @@ def main(): start_mass(), use_uvloop=False, shutdown_callback=on_shutdown, - executor_workers=64, + executor_workers=32, ) diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 4e81031f..8a486081 100755 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -521,40 +521,41 @@ class PlayerController: count = 0 while True: count += 1 - for player in list(self._players.values()): - player_id = player.player_id - # if the player is playing, update elapsed time every tick - # to ensure the queue has accurate details - player_playing = ( - player.active_queue == player.player_id and player.state == PlayerState.PLAYING - ) - if player_playing: - self.mass.loop.call_soon(self.update, player_id) - # Poll player; - # - every 360 seconds if the player if not powered - # - every 30 seconds if the player is powered - # - every 10 seconds if the player is playing - if ( - (player.available and player.powered and count % 30 == 0) - or (player.available and player_playing and count % 10 == 0) - or count == 360 - ): - if player_prov := self.get_player_provider(player_id): - try: - await player_prov.poll_player(player_id) - except PlayerUnavailableError: - player.available = False - player.state = PlayerState.IDLE - player.powered = False - self.mass.loop.call_soon(self.update, player_id) - except Exception as err: # pylint: disable=broad-except - LOGGER.warning( - "Error while requesting latest state from player %s: %s", - player.display_name, - str(err), - exc_info=err, - ) - if count >= 360: - count = 0 - await asyncio.sleep(0) + async with asyncio.TaskGroup() as tg: + for player in list(self._players.values()): + player_id = player.player_id + # if the player is playing, update elapsed time every tick + # to ensure the queue has accurate details + player_playing = ( + player.active_queue == player.player_id + and player.state == PlayerState.PLAYING + ) + if player_playing: + self.mass.loop.call_soon(self.update, player_id) + # Poll player; + # - every 360 seconds if the player if not powered + # - every 30 seconds if the player is powered + # - every 10 seconds if the player is playing + if ( + (player.available and player.powered and count % 30 == 0) + or (player.available and player_playing and count % 10 == 0) + or count == 360 + ): + if player_prov := self.get_player_provider(player_id): + try: + tg.create_task(player_prov.poll_player(player_id)) + except PlayerUnavailableError: + player.available = False + player.state = PlayerState.IDLE + player.powered = False + self.mass.loop.call_soon(self.update, player_id) + except Exception as err: # pylint: disable=broad-except + LOGGER.warning( + "Error while requesting latest state from player %s: %s", + player.display_name, + str(err), + exc_info=err, + ) + if count >= 360: + count = 0 await asyncio.sleep(1) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index eef53b5d..b22885f3 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -397,7 +397,10 @@ class StreamsController: # feed stdin with pcm audio chunks from origin async def read_audio(): async for chunk in stream_job.subscribe(player_id): - await ffmpeg_proc.write(chunk) + try: + await ffmpeg_proc.write(chunk) + except BrokenPipeError: + break ffmpeg_proc.write_eof() ffmpeg_proc.attach_task(read_audio()) @@ -406,7 +409,7 @@ class StreamsController: iterator = ( ffmpeg_proc.iter_chunked(icy_meta_interval) if enable_icy - else ffmpeg_proc.iter_any() + else ffmpeg_proc.iter_chunked(128000) ) bytes_streamed = 0 @@ -414,37 +417,36 @@ class StreamsController: async for chunk in iterator: try: await resp.write(chunk) - bytes_streamed += len(chunk) - - # do not allow the player to prebuffer more than 60 seconds - seconds_streamed = int(bytes_streamed / stream_job.pcm_sample_size) - if ( - seconds_streamed > 120 - and (seconds_streamed - player.corrected_elapsed_time) > 30 - ): - await asyncio.sleep(1) - - if not enable_icy: - continue - - # if icy metadata is enabled, send the icy metadata after the chunk - item_in_buf = stream_job.queue_item - if item_in_buf and item_in_buf.streamdetails.stream_title: - title = item_in_buf.streamdetails.stream_title - elif item_in_buf and item_in_buf.name: - title = item_in_buf.name - else: - title = "Music Assistant" - metadata = f"StreamTitle='{title}';".encode() - while len(metadata) % 16 != 0: - metadata += b"\x00" - length = len(metadata) - length_b = chr(int(length / 16)).encode() - await resp.write(length_b + metadata) - except (BrokenPipeError, ConnectionResetError): - # connection lost + # race condition break + bytes_streamed += len(chunk) + + # do not allow the player to prebuffer more than 60 seconds + seconds_streamed = int(bytes_streamed / stream_job.pcm_sample_size) + if ( + seconds_streamed > 120 + and (seconds_streamed - player.corrected_elapsed_time) > 30 + ): + await asyncio.sleep(1) + + if not enable_icy: + continue + + # if icy metadata is enabled, send the icy metadata after the chunk + item_in_buf = stream_job.queue_item + if item_in_buf and item_in_buf.streamdetails.stream_title: + title = item_in_buf.streamdetails.stream_title + elif item_in_buf and item_in_buf.name: + title = item_in_buf.name + else: + title = "Music Assistant" + metadata = f"StreamTitle='{title}';".encode() + while len(metadata) % 16 != 0: + metadata += b"\x00" + length = len(metadata) + length_b = chr(int(length / 16)).encode() + await resp.write(length_b + metadata) return resp diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index c272426e..e082e5f2 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -12,7 +12,7 @@ from collections.abc import AsyncGenerator, Coroutine LOGGER = logging.getLogger(__name__) DEFAULT_CHUNKSIZE = 128000 -DEFAULT_TIMEOUT = 30 * 60 +DEFAULT_TIMEOUT = 60 # pylint: disable=invalid-name @@ -22,7 +22,7 @@ class AsyncProcess: def __init__( self, - args: list | str, + args: list, enable_stdin: bool = False, enable_stdout: bool = True, enable_stderr: bool = False, @@ -38,55 +38,29 @@ class AsyncProcess: async def __aenter__(self) -> AsyncProcess: """Enter context manager.""" - args = " ".join(self._args) if "|" in self._args else self._args - if isinstance(args, str): - self._proc = await asyncio.create_subprocess_shell( - args, - stdin=asyncio.subprocess.PIPE if self._enable_stdin else None, - stdout=asyncio.subprocess.PIPE if self._enable_stdout else None, - stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, - close_fds=True, - limit=64 * 1024 * 1024, - ) - else: - self._proc = await asyncio.create_subprocess_exec( - *args, - stdin=asyncio.subprocess.PIPE if self._enable_stdin else None, - stdout=asyncio.subprocess.PIPE if self._enable_stdout else None, - stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, - close_fds=True, - limit=64 * 1024 * 1024, - ) - - # Fix BrokenPipeError due to a race condition - # by attaching a default done callback - def _done_cb(fut: asyncio.Future): - fut.exception() - - self._proc._transport._protocol._stdin_closed.add_done_callback(_done_cb) - + self._proc = await asyncio.create_subprocess_exec( + *self._args, + stdin=asyncio.subprocess.PIPE if self._enable_stdin else None, + stdout=asyncio.subprocess.PIPE if self._enable_stdout else None, + stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, + close_fds=True, + ) return self async def __aexit__(self, exc_type, exc_value, traceback) -> bool: """Exit context manager.""" self.closed = True - if self._attached_task: - # cancel the attached reader/writer task - try: - self._attached_task.cancel() - await self._attached_task - except asyncio.CancelledError: - pass + # make sure the process is cleaned up + self.write_eof() if self._proc.returncode is None: - # prevent subprocess deadlocking, read remaining bytes - await self._proc.communicate() - if self._enable_stdout and not self._proc.stdout.at_eof(): - await self._proc.stdout.read() - if self._enable_stderr and not self._proc.stderr.at_eof(): - await self._proc.stderr.read() - if self._proc.returncode is None: - # just in case? + try: + async with asyncio.timeout(10): + await self._proc.communicate() + except TimeoutError: self._proc.kill() + await self._proc.communicate() + if self._proc.returncode is None: + self._proc.kill() async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" @@ -127,15 +101,17 @@ class AsyncProcess: async def write(self, data: bytes) -> None: """Write data to process stdin.""" if self.closed or self._proc.stdin.is_closing(): - raise asyncio.CancelledError() + return self._proc.stdin.write(data) try: await self._proc.stdin.drain() except BrokenPipeError: - raise asyncio.CancelledError() + LOGGER.warning("Attempted write to an already closed process") def write_eof(self) -> None: """Write end of file to to process stdin.""" + if self.closed or self._proc.stdin.is_closing(): + return try: if self._proc.stdin.can_write_eof(): self._proc.stdin.write_eof() @@ -147,7 +123,7 @@ class AsyncProcess: ConnectionResetError, ): # already exited, race condition - return + LOGGER.warning("Attempted write to an already closed process") async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]: """Write bytes to process and read back results.""" diff --git a/music_assistant/server/helpers/tags.py b/music_assistant/server/helpers/tags.py index af557efa..00c2d582 100644 --- a/music_assistant/server/helpers/tags.py +++ b/music_assistant/server/helpers/tags.py @@ -316,17 +316,21 @@ async def parse_tags( async def chunk_feeder(): bytes_read = 0 - async for chunk in input_file: - await proc.write(chunk) - bytes_read += len(chunk) - - if bytes_read > 25 * 1000000: - # this is possibly a m4a file with 'moove atom' metadata at the end of the file - # we'll have to read the entire file to do something with it - # for now we just ignore/deny these files - raise RuntimeError("Tags not present at beginning of file") - - proc.write_eof() + try: + async for chunk in input_file: + if proc.closed: + break + await proc.write(chunk) + bytes_read += len(chunk) + del chunk + if bytes_read > 25 * 1000000: + # this is possibly a m4a file with 'moove atom' metadata at the + # end of the file + # we'll have to read the entire file to do something with it + # for now we just ignore/deny these files + raise RuntimeError("Tags not present at beginning of file") + finally: + proc.write_eof() proc.attach_task(chunk_feeder()) @@ -335,7 +339,11 @@ async def parse_tags( data = json.loads(res) if error := data.get("error"): raise InvalidDataError(error["string"]) + if not data.get("streams") or data["streams"][0].get("codec_type") == "video": + raise InvalidDataError("Not an audio file") tags = AudioTags.parse(data) + del res + del data if not tags.duration and file_size and tags.bit_rate: # estimate duration from filesize/bitrate tags.duration = int((file_size * 8) / tags.bit_rate) diff --git a/music_assistant/server/helpers/util.py b/music_assistant/server/helpers/util.py index 3a6f3b18..b35a9a9a 100644 --- a/music_assistant/server/helpers/util.py +++ b/music_assistant/server/helpers/util.py @@ -4,8 +4,9 @@ from __future__ import annotations import asyncio import importlib import logging +from collections.abc import AsyncGenerator, Iterator from functools import lru_cache -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from music_assistant.server.models import ProviderModuleType @@ -37,3 +38,32 @@ async def get_provider_module(domain: str) -> ProviderModuleType: return importlib.import_module(f".{domain}", "music_assistant.server.providers") return await asyncio.to_thread(_get_provider_module, domain) + + +async def async_iter(sync_iterator: Iterator, *args, **kwargs) -> AsyncGenerator[Any, None]: + """Wrap blocking iterator into an asynchronous one.""" + # inspired by: https://stackoverflow.com/questions/62294385/synchronous-generator-in-asyncio + loop = asyncio.get_running_loop() + queue = asyncio.Queue(1) + _end_ = object() + + def iter_to_queue(): + try: + for item in sync_iterator(*args, **kwargs): + if queue is None: + break + asyncio.run_coroutine_threadsafe(queue.put(item), loop).result() + finally: + asyncio.run_coroutine_threadsafe(queue.put(_end_), loop).result() + + iter_fut = loop.run_in_executor(None, iter_to_queue) + try: + while True: + next_item = await queue.get() + if next_item is _end_: + break + yield next_item + finally: + queue = None + if not iter_fut.done(): + iter_fut.cancel() diff --git a/music_assistant/server/providers/airplay/manifest.json b/music_assistant/server/providers/airplay/manifest.json index 1b7bb7ee..4e26fb48 100644 --- a/music_assistant/server/providers/airplay/manifest.json +++ b/music_assistant/server/providers/airplay/manifest.json @@ -5,7 +5,7 @@ "description": "Support for players that support the Airplay protocol.", "codeowners": ["@music-assistant"], "requirements": [], - "documentation": "", + "documentation": "https://github.com/orgs/music-assistant/discussions/1165", "multi_instance": false, "builtin": false, "load_by_default": true, diff --git a/music_assistant/server/providers/chromecast/manifest.json b/music_assistant/server/providers/chromecast/manifest.json index bc7cc258..9e044cdf 100644 --- a/music_assistant/server/providers/chromecast/manifest.json +++ b/music_assistant/server/providers/chromecast/manifest.json @@ -4,7 +4,7 @@ "name": "Chromecast", "description": "Support for Chromecast based players.", "codeowners": ["@music-assistant"], - "requirements": ["PyChromecast==13.0.5"], + "requirements": ["PyChromecast==13.0.6"], "documentation": "https://github.com/music-assistant/hass-music-assistant/discussions/1138", "multi_instance": false, "builtin": false, diff --git a/music_assistant/server/providers/fanarttv/__init__.py b/music_assistant/server/providers/fanarttv/__init__.py index 62999fda..c0926c06 100644 --- a/music_assistant/server/providers/fanarttv/__init__.py +++ b/music_assistant/server/providers/fanarttv/__init__.py @@ -110,7 +110,7 @@ class FanartTvMetadataProvider(MetadataProvider): url = f"http://webservice.fanart.tv/v3/{endpoint}" kwargs["api_key"] = app_var(4) async with self.throttler: - async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response: + async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response: try: result = await response.json() except ( diff --git a/music_assistant/server/providers/filesystem_local/__init__.py b/music_assistant/server/providers/filesystem_local/__init__.py index af3d6690..8469ce16 100644 --- a/music_assistant/server/providers/filesystem_local/__init__.py +++ b/music_assistant/server/providers/filesystem_local/__init__.py @@ -14,6 +14,7 @@ from music_assistant.common.models.config_entries import ConfigEntry from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.errors import SetupFailedError from music_assistant.constants import CONF_PATH +from music_assistant.server.helpers.util import async_iter from .base import ( CONF_ENTRY_MISSING_ALBUM_ARTIST, @@ -103,7 +104,7 @@ class LocalFileSystemProvider(FileSystemProviderBase): """ abs_path = get_absolute_path(self.config.get_value(CONF_PATH), path) - for entry in await asyncio.to_thread(os.scandir, abs_path): + async for entry in async_iter(os.scandir, abs_path): if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS): # skip invalid/system files and dirs continue diff --git a/music_assistant/server/providers/filesystem_local/base.py b/music_assistant/server/providers/filesystem_local/base.py index 355fe4cd..bd027de5 100644 --- a/music_assistant/server/providers/filesystem_local/base.py +++ b/music_assistant/server/providers/filesystem_local/base.py @@ -38,6 +38,7 @@ from music_assistant.common.models.media_items import ( Track, ) from music_assistant.constants import SCHEMA_VERSION, VARIOUS_ARTISTS, VARIOUS_ARTISTS_ID +from music_assistant.server.controllers.cache import use_cache from music_assistant.server.helpers.compare import compare_strings from music_assistant.server.helpers.playlists import parse_m3u, parse_pls from music_assistant.server.helpers.tags import parse_tags, split_items @@ -659,7 +660,7 @@ class FileSystemProviderBase(MusicProvider): track.metadata.images = [ MediaItemImage(ImageType.THUMB, file_item.path, self.instance_id) ] - elif track.album.image: + elif track.album and track.album.image: track.metadata.images = [track.album.image] if track.album and not track.album.metadata.images: @@ -822,6 +823,7 @@ class FileSystemProviderBase(MusicProvider): return album + @use_cache(120) async def _get_local_images(self, folder: str) -> list[MediaItemImage]: """Return local images found in a given folderpath.""" images = [] diff --git a/music_assistant/server/providers/filesystem_smb/__init__.py b/music_assistant/server/providers/filesystem_smb/__init__.py index 4ffc496f..fa29cc62 100644 --- a/music_assistant/server/providers/filesystem_smb/__init__.py +++ b/music_assistant/server/providers/filesystem_smb/__init__.py @@ -16,6 +16,8 @@ from music_assistant.common.models.config_entries import ConfigEntry from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.errors import LoginFailed from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME +from music_assistant.server.controllers.cache import use_cache +from music_assistant.server.helpers.util import async_iter from music_assistant.server.providers.filesystem_local.base import ( CONF_ENTRY_MISSING_ALBUM_ARTIST, IGNORE_DIRS, @@ -104,17 +106,6 @@ async def get_config_entries( "E.g. 'collections' or 'albums/A-K'.", ), CONF_ENTRY_MISSING_ALBUM_ARTIST, - ConfigEntry( - key=CONF_CONN_LIMIT, - type=ConfigEntryType.INTEGER, - label="Connection limit", - required=False, - default_value=5, - advanced=True, - description="[optional] Limit the number of concurrent connections. " - "Set the value high(er) for more performance but some (Windows) servers " - "may deny requests in that case", - ), ) @@ -147,12 +138,11 @@ class SMBFileSystemProvider(FileSystemProviderBase): server: str = self.config.get_value(CONF_HOST) share: str = self.config.get_value(CONF_SHARE) subfolder: str = self.config.get_value(CONF_SUBFOLDER) - connection_limit: int = self.config.get_value(CONF_CONN_LIMIT) - self.semaphore = asyncio.Semaphore(connection_limit) # create windows like path (\\server\share\subfolder) if subfolder.endswith(os.sep): subfolder = subfolder[:-1] + subfolder = subfolder.replace("\\", os.sep).replace("/", os.sep) self._root_path = f"{os.sep}{os.sep}{server}{os.sep}{share}{os.sep}{subfolder}" self.logger.debug("Using root path: %s", self._root_path) @@ -198,19 +188,14 @@ class SMBFileSystemProvider(FileSystemProviderBase): """ abs_path = get_absolute_path(self._root_path, path) - async with self.semaphore: - entries = await asyncio.to_thread(smbclient.scandir, abs_path) - for entry in entries: + async for entry in async_iter(smbclient.scandir, abs_path): if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS): # skip invalid/system files and dirs continue item = await create_item(self._root_path, entry) if recursive and item.is_dir: - try: - async for subitem in self.listdir(item.absolute_path, True): - yield subitem - except (OSError, PermissionError) as err: - self.logger.warning("Skip folder %s: %s", item.path, str(err)) + async for subitem in self.listdir(item.absolute_path, True): + yield subitem else: yield item @@ -238,51 +223,52 @@ class SMBFileSystemProvider(FileSystemProviderBase): ) # run in thread because strictly taken this may be blocking IO - async with self.semaphore: - return await asyncio.to_thread(_create_item) + return await asyncio.to_thread(_create_item) + @use_cache(120) async def exists(self, file_path: str) -> bool: """Return bool is this FileSystem musicprovider has given file/dir.""" if not file_path: return False # guard file_path = file_path.replace("\\", os.sep) abs_path = get_absolute_path(self._root_path, file_path) - async with self.semaphore: - try: - return await asyncio.to_thread(smbpath.exists, abs_path) - except Exception as err: - if "STATUS_OBJECT_NAME_INVALID" in str(err): - return False - raise err + try: + return await asyncio.to_thread(smbpath.exists, abs_path) + except Exception as err: + if "STATUS_OBJECT_NAME_INVALID" in str(err): + return False + raise err async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]: """Yield (binary) contents of file in chunks of bytes.""" file_path = file_path.replace("\\", os.sep) - abs_path = get_absolute_path(self._root_path, file_path) - chunk_size = 512000 - queue = asyncio.Queue() - self.logger.debug("Reading file contents for %s", abs_path) - - async with self.semaphore: + absolute_path = get_absolute_path(self._root_path, file_path) - def _reader(): - with smbclient.open_file(abs_path, "rb", share_access="r") as _file: + def _reader(): + self.logger.debug("Reading file contents for %s", absolute_path) + try: + chunk_size = 64000 + bytes_sent = 0 + with smbclient.open_file( + absolute_path, "rb", buffering=chunk_size, share_access="r" + ) as _file: if seek: _file.seek(seek) - # yield chunks of data from file while True: - data = _file.read(chunk_size) - if not data: + chunk = _file.read(chunk_size) + if not chunk: break - self.mass.loop.call_soon_threadsafe(queue.put_nowait, data) - self.mass.loop.call_soon_threadsafe(queue.put_nowait, b"") - - self.mass.create_task(_reader) - while True: - chunk = await queue.get() - if chunk == b"": - break - yield chunk + yield chunk + bytes_sent += len(chunk) + finally: + self.logger.debug( + "Finished Reading file contents for %s - bytes transferred: %s", + absolute_path, + bytes_sent, + ) + + async for chunk in async_iter(_reader): + yield chunk async def write_file_content(self, file_path: str, data: bytes) -> None: """Write entire file content as bytes (e.g. for playlists).""" @@ -293,5 +279,4 @@ class SMBFileSystemProvider(FileSystemProviderBase): with smbclient.open_file(abs_path, "wb") as _file: _file.write(data) - async with self.semaphore: - await asyncio.to_thread(_writer) + await asyncio.to_thread(_writer) diff --git a/music_assistant/server/providers/musicbrainz/__init__.py b/music_assistant/server/providers/musicbrainz/__init__.py index 631df430..14ffef43 100644 --- a/music_assistant/server/providers/musicbrainz/__init__.py +++ b/music_assistant/server/providers/musicbrainz/__init__.py @@ -186,7 +186,7 @@ class MusicbrainzProvider(MetadataProvider): kwargs["fmt"] = "json" # type: ignore[assignment] async with self.throttler: async with self.mass.http_session.get( - url, headers=headers, params=kwargs, verify_ssl=False + url, headers=headers, params=kwargs, ssl=False ) as response: try: result = await response.json() diff --git a/music_assistant/server/providers/qobuz/__init__.py b/music_assistant/server/providers/qobuz/__init__.py index 91a00628..98654200 100644 --- a/music_assistant/server/providers/qobuz/__init__.py +++ b/music_assistant/server/providers/qobuz/__init__.py @@ -643,7 +643,7 @@ class QobuzProvider(MusicProvider): kwargs["user_auth_token"] = await self._auth_token() async with self._throttler: async with self.mass.http_session.get( - url, headers=headers, params=kwargs, verify_ssl=False + url, headers=headers, params=kwargs, ssl=False ) as response: try: # make sure status is 200 @@ -677,7 +677,7 @@ class QobuzProvider(MusicProvider): params["app_id"] = app_var(0) params["user_auth_token"] = await self._auth_token() async with self.mass.http_session.post( - url, params=params, json=data, verify_ssl=False + url, params=params, json=data, ssl=False ) as response: try: result = await response.json() diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index a89b5473..c528d7ab 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -670,7 +670,7 @@ class SpotifyProvider(MusicProvider): time_start = time.time() try: async with self.mass.http_session.get( - url, headers=headers, params=kwargs, verify_ssl=False, timeout=120 + url, headers=headers, params=kwargs, ssl=False, timeout=120 ) as response: result = await response.json() if "error" in result or ("status" in result and "error" in result["status"]): @@ -698,7 +698,7 @@ class SpotifyProvider(MusicProvider): return None headers = {"Authorization": f'Bearer {token["accessToken"]}'} async with self.mass.http_session.delete( - url, headers=headers, params=kwargs, json=data, verify_ssl=False + url, headers=headers, params=kwargs, json=data, ssl=False ) as response: return await response.text() @@ -710,7 +710,7 @@ class SpotifyProvider(MusicProvider): return None headers = {"Authorization": f'Bearer {token["accessToken"]}'} async with self.mass.http_session.put( - url, headers=headers, params=kwargs, json=data, verify_ssl=False + url, headers=headers, params=kwargs, json=data, ssl=False ) as response: return await response.text() @@ -722,7 +722,7 @@ class SpotifyProvider(MusicProvider): return None headers = {"Authorization": f'Bearer {token["accessToken"]}'} async with self.mass.http_session.post( - url, headers=headers, params=kwargs, json=data, verify_ssl=False + url, headers=headers, params=kwargs, json=data, ssl=False ) as response: return await response.text() diff --git a/music_assistant/server/providers/theaudiodb/__init__.py b/music_assistant/server/providers/theaudiodb/__init__.py index 6e8236d0..adecc469 100644 --- a/music_assistant/server/providers/theaudiodb/__init__.py +++ b/music_assistant/server/providers/theaudiodb/__init__.py @@ -301,7 +301,7 @@ class AudioDbMetadataProvider(MetadataProvider): """Get data from api.""" url = f"https://theaudiodb.com/api/v1/json/{app_var(3)}/{endpoint}" async with self.throttler: - async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response: + async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response: try: result = await response.json() except ( diff --git a/music_assistant/server/providers/tunein/__init__.py b/music_assistant/server/providers/tunein/__init__.py index bd03fa83..6a7f182a 100644 --- a/music_assistant/server/providers/tunein/__init__.py +++ b/music_assistant/server/providers/tunein/__init__.py @@ -237,7 +237,7 @@ class TuneInProvider(MusicProvider): kwargs["partnerId"] = "1" kwargs["render"] = "json" async with self._throttler: - async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response: + async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response: result = await response.json() if not result or "error" in result: self.logger.error(url) diff --git a/music_assistant/server/providers/ytmusic/__init__.py b/music_assistant/server/providers/ytmusic/__init__.py index 6642b4f3..eee388d6 100644 --- a/music_assistant/server/providers/ytmusic/__init__.py +++ b/music_assistant/server/providers/ytmusic/__init__.py @@ -454,7 +454,7 @@ class YoutubeMusicProvider(MusicProvider): url, headers=self._headers, json=data, - verify_ssl=False, + ssl=False, cookies=self._cookies, ) as response: return await response.json() diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index 7f4589c5..49eb0fce 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -233,10 +233,10 @@ class MusicAssistant: return existing if asyncio.iscoroutinefunction(target): task = self.loop.create_task(target(*args, **kwargs)) - elif isinstance(target, asyncio.Future): - task = target elif asyncio.iscoroutine(target): task = self.loop.create_task(target) + elif isinstance(target, asyncio.Future): + task = target else: # assume normal callable (non coroutine or awaitable) task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs)) diff --git a/requirements_all.txt b/requirements_all.txt index 0b6f1c0b..28bf6500 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -16,6 +16,7 @@ memory-tempfile==2.2.3 music-assistant-frontend==20230327.1 orjson==3.8.7 pillow==9.4.0 +PyChromecast==13.0.6 plexapi==4.13.2 PyChromecast==13.0.5 python-slugify==8.0.1 -- 2.34.1