Fix memory leaks and some performance tweaks (#587)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 28 Mar 2023 16:20:29 +0000 (18:20 +0200)
committerGitHub <noreply@github.com>
Tue, 28 Mar 2023 16:20:29 +0000 (18:20 +0200)
* add docs for airplay

* verify_ssl is deprecated

* limit number of workers a bit

* change order

* reduce chunk sizes

* Fix memory leaks in File providers

* use taskgroup for poll players

* allow max 25mb for reading tags

* force close process

* bump pychromecast to 13.0.6

fixes a memory leak

20 files changed:
music_assistant/__main__.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/process.py
music_assistant/server/helpers/tags.py
music_assistant/server/helpers/util.py
music_assistant/server/providers/airplay/manifest.json
music_assistant/server/providers/chromecast/manifest.json
music_assistant/server/providers/fanarttv/__init__.py
music_assistant/server/providers/filesystem_local/__init__.py
music_assistant/server/providers/filesystem_local/base.py
music_assistant/server/providers/filesystem_smb/__init__.py
music_assistant/server/providers/musicbrainz/__init__.py
music_assistant/server/providers/qobuz/__init__.py
music_assistant/server/providers/spotify/__init__.py
music_assistant/server/providers/theaudiodb/__init__.py
music_assistant/server/providers/tunein/__init__.py
music_assistant/server/providers/ytmusic/__init__.py
music_assistant/server/server.py
requirements_all.txt

index 960a487d24a2e85b7a7d7d61b5f1d52409d42167..9d40c04a4395cbc7b9997dcac01e7f11b55b193f 100644 (file)
@@ -108,7 +108,7 @@ def main():
         start_mass(),
         use_uvloop=False,
         shutdown_callback=on_shutdown,
-        executor_workers=64,
+        executor_workers=32,
     )
 
 
index 4e81031f9272a7bc7c7b08b5c69ef0713bde5504..8a48608141255dbfeb76f39391b51fdd334fa132 100755 (executable)
@@ -521,40 +521,41 @@ class PlayerController:
         count = 0
         while True:
             count += 1
-            for player in list(self._players.values()):
-                player_id = player.player_id
-                # if the player is playing, update elapsed time every tick
-                # to ensure the queue has accurate details
-                player_playing = (
-                    player.active_queue == player.player_id and player.state == PlayerState.PLAYING
-                )
-                if player_playing:
-                    self.mass.loop.call_soon(self.update, player_id)
-                # Poll player;
-                # - every 360 seconds if the player if not powered
-                # - every 30 seconds if the player is powered
-                # - every 10 seconds if the player is playing
-                if (
-                    (player.available and player.powered and count % 30 == 0)
-                    or (player.available and player_playing and count % 10 == 0)
-                    or count == 360
-                ):
-                    if player_prov := self.get_player_provider(player_id):
-                        try:
-                            await player_prov.poll_player(player_id)
-                        except PlayerUnavailableError:
-                            player.available = False
-                            player.state = PlayerState.IDLE
-                            player.powered = False
-                            self.mass.loop.call_soon(self.update, player_id)
-                        except Exception as err:  # pylint: disable=broad-except
-                            LOGGER.warning(
-                                "Error while requesting latest state from player %s: %s",
-                                player.display_name,
-                                str(err),
-                                exc_info=err,
-                            )
-                    if count >= 360:
-                        count = 0
-                    await asyncio.sleep(0)
+            async with asyncio.TaskGroup() as tg:
+                for player in list(self._players.values()):
+                    player_id = player.player_id
+                    # if the player is playing, update elapsed time every tick
+                    # to ensure the queue has accurate details
+                    player_playing = (
+                        player.active_queue == player.player_id
+                        and player.state == PlayerState.PLAYING
+                    )
+                    if player_playing:
+                        self.mass.loop.call_soon(self.update, player_id)
+                    # Poll player;
+                    # - every 360 seconds if the player is not powered
+                    # - every 30 seconds if the player is powered
+                    # - every 10 seconds if the player is playing
+                    if (
+                        (player.available and player.powered and count % 30 == 0)
+                        or (player.available and player_playing and count % 10 == 0)
+                        or count == 360
+                    ):
+                        if player_prov := self.get_player_provider(player_id):
+                            try:
+                                tg.create_task(player_prov.poll_player(player_id))
+                            except PlayerUnavailableError:
+                                player.available = False
+                                player.state = PlayerState.IDLE
+                                player.powered = False
+                                self.mass.loop.call_soon(self.update, player_id)
+                            except Exception as err:  # pylint: disable=broad-except
+                                LOGGER.warning(
+                                    "Error while requesting latest state from player %s: %s",
+                                    player.display_name,
+                                    str(err),
+                                    exc_info=err,
+                                )
+                        if count >= 360:
+                            count = 0
             await asyncio.sleep(1)
index eef53b5df284f099e1bb28d186591204021dacc4..b22885f38ed52703c7e9e52f6d20b1d8267801ed 100644 (file)
@@ -397,7 +397,10 @@ class StreamsController:
             # feed stdin with pcm audio chunks from origin
             async def read_audio():
                 async for chunk in stream_job.subscribe(player_id):
-                    await ffmpeg_proc.write(chunk)
+                    try:
+                        await ffmpeg_proc.write(chunk)
+                    except BrokenPipeError:
+                        break
                 ffmpeg_proc.write_eof()
 
             ffmpeg_proc.attach_task(read_audio())
@@ -406,7 +409,7 @@ class StreamsController:
             iterator = (
                 ffmpeg_proc.iter_chunked(icy_meta_interval)
                 if enable_icy
-                else ffmpeg_proc.iter_any()
+                else ffmpeg_proc.iter_chunked(128000)
             )
 
             bytes_streamed = 0
@@ -414,37 +417,36 @@ class StreamsController:
             async for chunk in iterator:
                 try:
                     await resp.write(chunk)
-                    bytes_streamed += len(chunk)
-
-                    # do not allow the player to prebuffer more than 60 seconds
-                    seconds_streamed = int(bytes_streamed / stream_job.pcm_sample_size)
-                    if (
-                        seconds_streamed > 120
-                        and (seconds_streamed - player.corrected_elapsed_time) > 30
-                    ):
-                        await asyncio.sleep(1)
-
-                    if not enable_icy:
-                        continue
-
-                    # if icy metadata is enabled, send the icy metadata after the chunk
-                    item_in_buf = stream_job.queue_item
-                    if item_in_buf and item_in_buf.streamdetails.stream_title:
-                        title = item_in_buf.streamdetails.stream_title
-                    elif item_in_buf and item_in_buf.name:
-                        title = item_in_buf.name
-                    else:
-                        title = "Music Assistant"
-                    metadata = f"StreamTitle='{title}';".encode()
-                    while len(metadata) % 16 != 0:
-                        metadata += b"\x00"
-                    length = len(metadata)
-                    length_b = chr(int(length / 16)).encode()
-                    await resp.write(length_b + metadata)
-
                 except (BrokenPipeError, ConnectionResetError):
-                    # connection lost
+                    # connection lost while writing (client disconnected mid-stream)
                     break
+                bytes_streamed += len(chunk)
+
+                # throttle if the player prebuffers more than 30 seconds ahead (after the first 120s)
+                seconds_streamed = int(bytes_streamed / stream_job.pcm_sample_size)
+                if (
+                    seconds_streamed > 120
+                    and (seconds_streamed - player.corrected_elapsed_time) > 30
+                ):
+                    await asyncio.sleep(1)
+
+                if not enable_icy:
+                    continue
+
+                # if icy metadata is enabled, send the icy metadata after the chunk
+                item_in_buf = stream_job.queue_item
+                if item_in_buf and item_in_buf.streamdetails.stream_title:
+                    title = item_in_buf.streamdetails.stream_title
+                elif item_in_buf and item_in_buf.name:
+                    title = item_in_buf.name
+                else:
+                    title = "Music Assistant"
+                metadata = f"StreamTitle='{title}';".encode()
+                while len(metadata) % 16 != 0:
+                    metadata += b"\x00"
+                length = len(metadata)
+                length_b = chr(int(length / 16)).encode()
+                await resp.write(length_b + metadata)
 
         return resp
 
index c272426e7e3eea472ec5ec1e601bc588b94425a0..e082e5f2f9c577a34fb77db335bfb1961391dc48 100644 (file)
@@ -12,7 +12,7 @@ from collections.abc import AsyncGenerator, Coroutine
 LOGGER = logging.getLogger(__name__)
 
 DEFAULT_CHUNKSIZE = 128000
-DEFAULT_TIMEOUT = 30 * 60
+DEFAULT_TIMEOUT = 60
 
 # pylint: disable=invalid-name
 
@@ -22,7 +22,7 @@ class AsyncProcess:
 
     def __init__(
         self,
-        args: list | str,
+        args: list,
         enable_stdin: bool = False,
         enable_stdout: bool = True,
         enable_stderr: bool = False,
@@ -38,55 +38,29 @@ class AsyncProcess:
 
     async def __aenter__(self) -> AsyncProcess:
         """Enter context manager."""
-        args = " ".join(self._args) if "|" in self._args else self._args
-        if isinstance(args, str):
-            self._proc = await asyncio.create_subprocess_shell(
-                args,
-                stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
-                stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
-                stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
-                close_fds=True,
-                limit=64 * 1024 * 1024,
-            )
-        else:
-            self._proc = await asyncio.create_subprocess_exec(
-                *args,
-                stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
-                stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
-                stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
-                close_fds=True,
-                limit=64 * 1024 * 1024,
-            )
-
-            # Fix BrokenPipeError due to a race condition
-            # by attaching a default done callback
-            def _done_cb(fut: asyncio.Future):
-                fut.exception()
-
-            self._proc._transport._protocol._stdin_closed.add_done_callback(_done_cb)
-
+        self._proc = await asyncio.create_subprocess_exec(
+            *self._args,
+            stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
+            stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
+            stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
+            close_fds=True,
+        )
         return self
 
     async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
         """Exit context manager."""
         self.closed = True
-        if self._attached_task:
-            # cancel the attached reader/writer task
-            try:
-                self._attached_task.cancel()
-                await self._attached_task
-            except asyncio.CancelledError:
-                pass
+        # make sure the process is cleaned up
+        self.write_eof()
         if self._proc.returncode is None:
-            # prevent subprocess deadlocking, read remaining bytes
-            await self._proc.communicate()
-            if self._enable_stdout and not self._proc.stdout.at_eof():
-                await self._proc.stdout.read()
-            if self._enable_stderr and not self._proc.stderr.at_eof():
-                await self._proc.stderr.read()
-            if self._proc.returncode is None:
-                # just in case?
+            try:
+                async with asyncio.timeout(10):
+                    await self._proc.communicate()
+            except TimeoutError:
                 self._proc.kill()
+                await self._proc.communicate()
+        if self._proc.returncode is None:
+            self._proc.kill()
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
@@ -127,15 +101,17 @@ class AsyncProcess:
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
         if self.closed or self._proc.stdin.is_closing():
-            raise asyncio.CancelledError()
+            return
         self._proc.stdin.write(data)
         try:
             await self._proc.stdin.drain()
         except BrokenPipeError:
-            raise asyncio.CancelledError()
+            LOGGER.warning("Attempted write to an already closed process")
 
     def write_eof(self) -> None:
         """Write end of file to to process stdin."""
+        if self.closed or self._proc.stdin.is_closing():
+            return
         try:
             if self._proc.stdin.can_write_eof():
                 self._proc.stdin.write_eof()
@@ -147,7 +123,7 @@ class AsyncProcess:
             ConnectionResetError,
         ):
             # already exited, race condition
-            return
+            LOGGER.warning("Attempted write to an already closed process")
 
     async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
         """Write bytes to process and read back results."""
index af557efa97ae084e073fa824ac1db264aa46b88f..00c2d5828688f06872776ce27c72fe448be1a071 100644 (file)
@@ -316,17 +316,21 @@ async def parse_tags(
 
             async def chunk_feeder():
                 bytes_read = 0
-                async for chunk in input_file:
-                    await proc.write(chunk)
-                    bytes_read += len(chunk)
-
-                if bytes_read > 25 * 1000000:
-                    # this is possibly a m4a file with 'moove atom' metadata at the end of the file
-                    # we'll have to read the entire file to do something with it
-                    # for now we just ignore/deny these files
-                    raise RuntimeError("Tags not present at beginning of file")
-
-                proc.write_eof()
+                try:
+                    async for chunk in input_file:
+                        if proc.closed:
+                            break
+                        await proc.write(chunk)
+                        bytes_read += len(chunk)
+                        del chunk
+                        if bytes_read > 25 * 1000000:
+                            # this is possibly a m4a file with 'moov atom' metadata at the
+                            # end of the file
+                            # we'll have to read the entire file to do something with it
+                            # for now we just ignore/deny these files
+                            raise RuntimeError("Tags not present at beginning of file")
+                finally:
+                    proc.write_eof()
 
             proc.attach_task(chunk_feeder())
 
@@ -335,7 +339,11 @@ async def parse_tags(
             data = json.loads(res)
             if error := data.get("error"):
                 raise InvalidDataError(error["string"])
+            if not data.get("streams") or data["streams"][0].get("codec_type") == "video":
+                raise InvalidDataError("Not an audio file")
             tags = AudioTags.parse(data)
+            del res
+            del data
             if not tags.duration and file_size and tags.bit_rate:
                 # estimate duration from filesize/bitrate
                 tags.duration = int((file_size * 8) / tags.bit_rate)
index 3a6f3b18bca83480af940d025778a1ee58d2363c..b35a9a9ac1fd5a1afb506fd741b0e77f24282b80 100644 (file)
@@ -4,8 +4,9 @@ from __future__ import annotations
 import asyncio
 import importlib
 import logging
+from collections.abc import AsyncGenerator, Iterator
 from functools import lru_cache
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 if TYPE_CHECKING:
     from music_assistant.server.models import ProviderModuleType
@@ -37,3 +38,32 @@ async def get_provider_module(domain: str) -> ProviderModuleType:
         return importlib.import_module(f".{domain}", "music_assistant.server.providers")
 
     return await asyncio.to_thread(_get_provider_module, domain)
+
+
+async def async_iter(sync_iterator: Iterator, *args, **kwargs) -> AsyncGenerator[Any, None]:
+    """Wrap blocking iterator into an asynchronous one."""
+    # inspired by: https://stackoverflow.com/questions/62294385/synchronous-generator-in-asyncio
+    loop = asyncio.get_running_loop()
+    queue = asyncio.Queue(1)
+    _end_ = object()
+
+    def iter_to_queue():
+        try:
+            for item in sync_iterator(*args, **kwargs):
+                if queue is None:
+                    break
+                asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
+        finally:
+            asyncio.run_coroutine_threadsafe(queue.put(_end_), loop).result()
+
+    iter_fut = loop.run_in_executor(None, iter_to_queue)
+    try:
+        while True:
+            next_item = await queue.get()
+            if next_item is _end_:
+                break
+            yield next_item
+    finally:
+        queue = None
+        if not iter_fut.done():
+            iter_fut.cancel()
index 1b7bb7ee61bc40cad530ebfc90ef4e34b9f6d263..4e26fb48a6e12ad0bfe5cab93065581f42a28cd8 100644 (file)
@@ -5,7 +5,7 @@
   "description": "Support for players that support the Airplay protocol.",
   "codeowners": ["@music-assistant"],
   "requirements": [],
-  "documentation": "",
+  "documentation": "https://github.com/orgs/music-assistant/discussions/1165",
   "multi_instance": false,
   "builtin": false,
   "load_by_default": true,
index bc7cc2587a30476e800c5876f6f0614ec1c92c55..9e044cdf785b8bf49328a807bce18371f3255906 100644 (file)
@@ -4,7 +4,7 @@
   "name": "Chromecast",
   "description": "Support for Chromecast based players.",
   "codeowners": ["@music-assistant"],
-  "requirements": ["PyChromecast==13.0.5"],
+  "requirements": ["PyChromecast==13.0.6"],
   "documentation": "https://github.com/music-assistant/hass-music-assistant/discussions/1138",
   "multi_instance": false,
   "builtin": false,
index 62999fda6f151eef90ec044b16bf6fa329af46a6..c0926c0677404e28f502133af998c33dc19045bd 100644 (file)
@@ -110,7 +110,7 @@ class FanartTvMetadataProvider(MetadataProvider):
         url = f"http://webservice.fanart.tv/v3/{endpoint}"
         kwargs["api_key"] = app_var(4)
         async with self.throttler:
-            async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response:
+            async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response:
                 try:
                     result = await response.json()
                 except (
index af3d669086ad91a5ec3e3ecbb1bc5dfd7ec4155c..8469ce16817233fa01e9a95681e1650c5198db7f 100644 (file)
@@ -14,6 +14,7 @@ from music_assistant.common.models.config_entries import ConfigEntry
 from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.errors import SetupFailedError
 from music_assistant.constants import CONF_PATH
+from music_assistant.server.helpers.util import async_iter
 
 from .base import (
     CONF_ENTRY_MISSING_ALBUM_ARTIST,
@@ -103,7 +104,7 @@ class LocalFileSystemProvider(FileSystemProviderBase):
 
         """
         abs_path = get_absolute_path(self.config.get_value(CONF_PATH), path)
-        for entry in await asyncio.to_thread(os.scandir, abs_path):
+        async for entry in async_iter(os.scandir, abs_path):
             if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
                 # skip invalid/system files and dirs
                 continue
index 355fe4cd86b64b547bb6da2dbc8317b427b53f51..bd027de5b2cbd22c1fd5d6960e1d643300f93bb1 100644 (file)
@@ -38,6 +38,7 @@ from music_assistant.common.models.media_items import (
     Track,
 )
 from music_assistant.constants import SCHEMA_VERSION, VARIOUS_ARTISTS, VARIOUS_ARTISTS_ID
+from music_assistant.server.controllers.cache import use_cache
 from music_assistant.server.helpers.compare import compare_strings
 from music_assistant.server.helpers.playlists import parse_m3u, parse_pls
 from music_assistant.server.helpers.tags import parse_tags, split_items
@@ -659,7 +660,7 @@ class FileSystemProviderBase(MusicProvider):
             track.metadata.images = [
                 MediaItemImage(ImageType.THUMB, file_item.path, self.instance_id)
             ]
-        elif track.album.image:
+        elif track.album and track.album.image:
             track.metadata.images = [track.album.image]
 
         if track.album and not track.album.metadata.images:
@@ -822,6 +823,7 @@ class FileSystemProviderBase(MusicProvider):
 
         return album
 
+    @use_cache(120)
     async def _get_local_images(self, folder: str) -> list[MediaItemImage]:
         """Return local images found in a given folderpath."""
         images = []
index 4ffc496f6fce8bdd2376cdeed5fc64fee92600ac..fa29cc6296e494bc97294c141dcdd3b9deafee52 100644 (file)
@@ -16,6 +16,8 @@ from music_assistant.common.models.config_entries import ConfigEntry
 from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.errors import LoginFailed
 from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
+from music_assistant.server.controllers.cache import use_cache
+from music_assistant.server.helpers.util import async_iter
 from music_assistant.server.providers.filesystem_local.base import (
     CONF_ENTRY_MISSING_ALBUM_ARTIST,
     IGNORE_DIRS,
@@ -104,17 +106,6 @@ async def get_config_entries(
             "E.g. 'collections' or 'albums/A-K'.",
         ),
         CONF_ENTRY_MISSING_ALBUM_ARTIST,
-        ConfigEntry(
-            key=CONF_CONN_LIMIT,
-            type=ConfigEntryType.INTEGER,
-            label="Connection limit",
-            required=False,
-            default_value=5,
-            advanced=True,
-            description="[optional] Limit the number of concurrent connections. "
-            "Set the value high(er) for more performance but some (Windows) servers "
-            "may deny requests in that case",
-        ),
     )
 
 
@@ -147,12 +138,11 @@ class SMBFileSystemProvider(FileSystemProviderBase):
         server: str = self.config.get_value(CONF_HOST)
         share: str = self.config.get_value(CONF_SHARE)
         subfolder: str = self.config.get_value(CONF_SUBFOLDER)
-        connection_limit: int = self.config.get_value(CONF_CONN_LIMIT)
-        self.semaphore = asyncio.Semaphore(connection_limit)
 
         # create windows like path (\\server\share\subfolder)
         if subfolder.endswith(os.sep):
             subfolder = subfolder[:-1]
+        subfolder = subfolder.replace("\\", os.sep).replace("/", os.sep)
         self._root_path = f"{os.sep}{os.sep}{server}{os.sep}{share}{os.sep}{subfolder}"
         self.logger.debug("Using root path: %s", self._root_path)
 
@@ -198,19 +188,14 @@ class SMBFileSystemProvider(FileSystemProviderBase):
 
         """
         abs_path = get_absolute_path(self._root_path, path)
-        async with self.semaphore:
-            entries = await asyncio.to_thread(smbclient.scandir, abs_path)
-        for entry in entries:
+        async for entry in async_iter(smbclient.scandir, abs_path):
             if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
                 # skip invalid/system files and dirs
                 continue
             item = await create_item(self._root_path, entry)
             if recursive and item.is_dir:
-                try:
-                    async for subitem in self.listdir(item.absolute_path, True):
-                        yield subitem
-                except (OSError, PermissionError) as err:
-                    self.logger.warning("Skip folder %s: %s", item.path, str(err))
+                async for subitem in self.listdir(item.absolute_path, True):
+                    yield subitem
             else:
                 yield item
 
@@ -238,51 +223,52 @@ class SMBFileSystemProvider(FileSystemProviderBase):
             )
 
         # run in thread because strictly taken this may be blocking IO
-        async with self.semaphore:
-            return await asyncio.to_thread(_create_item)
+        return await asyncio.to_thread(_create_item)
 
+    @use_cache(120)
     async def exists(self, file_path: str) -> bool:
         """Return bool is this FileSystem musicprovider has given file/dir."""
         if not file_path:
             return False  # guard
         file_path = file_path.replace("\\", os.sep)
         abs_path = get_absolute_path(self._root_path, file_path)
-        async with self.semaphore:
-            try:
-                return await asyncio.to_thread(smbpath.exists, abs_path)
-            except Exception as err:
-                if "STATUS_OBJECT_NAME_INVALID" in str(err):
-                    return False
-                raise err
+        try:
+            return await asyncio.to_thread(smbpath.exists, abs_path)
+        except Exception as err:
+            if "STATUS_OBJECT_NAME_INVALID" in str(err):
+                return False
+            raise err
 
     async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
         """Yield (binary) contents of file in chunks of bytes."""
         file_path = file_path.replace("\\", os.sep)
-        abs_path = get_absolute_path(self._root_path, file_path)
-        chunk_size = 512000
-        queue = asyncio.Queue()
-        self.logger.debug("Reading file contents for %s", abs_path)
-
-        async with self.semaphore:
+        absolute_path = get_absolute_path(self._root_path, file_path)
 
-            def _reader():
-                with smbclient.open_file(abs_path, "rb", share_access="r") as _file:
+        def _reader():
+            self.logger.debug("Reading file contents for %s", absolute_path)
+            try:
+                chunk_size = 64000
+                bytes_sent = 0
+                with smbclient.open_file(
+                    absolute_path, "rb", buffering=chunk_size, share_access="r"
+                ) as _file:
                     if seek:
                         _file.seek(seek)
-                    # yield chunks of data from file
                     while True:
-                        data = _file.read(chunk_size)
-                        if not data:
+                        chunk = _file.read(chunk_size)
+                        if not chunk:
                             break
-                        self.mass.loop.call_soon_threadsafe(queue.put_nowait, data)
-                self.mass.loop.call_soon_threadsafe(queue.put_nowait, b"")
-
-            self.mass.create_task(_reader)
-            while True:
-                chunk = await queue.get()
-                if chunk == b"":
-                    break
-                yield chunk
+                        yield chunk
+                        bytes_sent += len(chunk)
+            finally:
+                self.logger.debug(
+                    "Finished Reading file contents for %s - bytes transferred: %s",
+                    absolute_path,
+                    bytes_sent,
+                )
+
+        async for chunk in async_iter(_reader):
+            yield chunk
 
     async def write_file_content(self, file_path: str, data: bytes) -> None:
         """Write entire file content as bytes (e.g. for playlists)."""
@@ -293,5 +279,4 @@ class SMBFileSystemProvider(FileSystemProviderBase):
             with smbclient.open_file(abs_path, "wb") as _file:
                 _file.write(data)
 
-        async with self.semaphore:
-            await asyncio.to_thread(_writer)
+        await asyncio.to_thread(_writer)
index 631df43062d2f15cfdf4b04870235e73ad775dcc..14ffef4348bf21c9370db8b8e8e15ceecf89d148 100644 (file)
@@ -186,7 +186,7 @@ class MusicbrainzProvider(MetadataProvider):
         kwargs["fmt"] = "json"  # type: ignore[assignment]
         async with self.throttler:
             async with self.mass.http_session.get(
-                url, headers=headers, params=kwargs, verify_ssl=False
+                url, headers=headers, params=kwargs, ssl=False
             ) as response:
                 try:
                     result = await response.json()
index 91a0062811e0013ae2fd84d3ed6d696b60d27253..98654200ca329ddd095db2ce84aad2c1dd99e179 100644 (file)
@@ -643,7 +643,7 @@ class QobuzProvider(MusicProvider):
             kwargs["user_auth_token"] = await self._auth_token()
         async with self._throttler:
             async with self.mass.http_session.get(
-                url, headers=headers, params=kwargs, verify_ssl=False
+                url, headers=headers, params=kwargs, ssl=False
             ) as response:
                 try:
                     # make sure status is 200
@@ -677,7 +677,7 @@ class QobuzProvider(MusicProvider):
         params["app_id"] = app_var(0)
         params["user_auth_token"] = await self._auth_token()
         async with self.mass.http_session.post(
-            url, params=params, json=data, verify_ssl=False
+            url, params=params, json=data, ssl=False
         ) as response:
             try:
                 result = await response.json()
index a89b5473d95e6afbc90b8e5a06d2862911e41205..c528d7ab4e35fc4e9fb689d27a2d5fc607b3960b 100644 (file)
@@ -670,7 +670,7 @@ class SpotifyProvider(MusicProvider):
             time_start = time.time()
             try:
                 async with self.mass.http_session.get(
-                    url, headers=headers, params=kwargs, verify_ssl=False, timeout=120
+                    url, headers=headers, params=kwargs, ssl=False, timeout=120
                 ) as response:
                     result = await response.json()
                     if "error" in result or ("status" in result and "error" in result["status"]):
@@ -698,7 +698,7 @@ class SpotifyProvider(MusicProvider):
             return None
         headers = {"Authorization": f'Bearer {token["accessToken"]}'}
         async with self.mass.http_session.delete(
-            url, headers=headers, params=kwargs, json=data, verify_ssl=False
+            url, headers=headers, params=kwargs, json=data, ssl=False
         ) as response:
             return await response.text()
 
@@ -710,7 +710,7 @@ class SpotifyProvider(MusicProvider):
             return None
         headers = {"Authorization": f'Bearer {token["accessToken"]}'}
         async with self.mass.http_session.put(
-            url, headers=headers, params=kwargs, json=data, verify_ssl=False
+            url, headers=headers, params=kwargs, json=data, ssl=False
         ) as response:
             return await response.text()
 
@@ -722,7 +722,7 @@ class SpotifyProvider(MusicProvider):
             return None
         headers = {"Authorization": f'Bearer {token["accessToken"]}'}
         async with self.mass.http_session.post(
-            url, headers=headers, params=kwargs, json=data, verify_ssl=False
+            url, headers=headers, params=kwargs, json=data, ssl=False
         ) as response:
             return await response.text()
 
index 6e8236d07e390568defe562ead4290b678f1d748..adecc469d7013fdcab6b0e3b268b3260d5447862 100644 (file)
@@ -301,7 +301,7 @@ class AudioDbMetadataProvider(MetadataProvider):
         """Get data from api."""
         url = f"https://theaudiodb.com/api/v1/json/{app_var(3)}/{endpoint}"
         async with self.throttler:
-            async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response:
+            async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response:
                 try:
                     result = await response.json()
                 except (
index bd03fa83a871d816a18bcce7411ebbfdd49cd399..6a7f182a37c470aedff5e18164251946345e9d0f 100644 (file)
@@ -237,7 +237,7 @@ class TuneInProvider(MusicProvider):
             kwargs["partnerId"] = "1"
             kwargs["render"] = "json"
         async with self._throttler:
-            async with self.mass.http_session.get(url, params=kwargs, verify_ssl=False) as response:
+            async with self.mass.http_session.get(url, params=kwargs, ssl=False) as response:
                 result = await response.json()
                 if not result or "error" in result:
                     self.logger.error(url)
index 6642b4f3554cbbb670a5c0584e381db3f9c2c46b..eee388d6c8711dc726e206a6242401f20eced762 100644 (file)
@@ -454,7 +454,7 @@ class YoutubeMusicProvider(MusicProvider):
             url,
             headers=self._headers,
             json=data,
-            verify_ssl=False,
+            ssl=False,
             cookies=self._cookies,
         ) as response:
             return await response.json()
index 7f4589c5f5299611c74fa284b136c804577417c7..49eb0fce4098c5fa2435254b32b19bc2dc7f965b 100644 (file)
@@ -233,10 +233,10 @@ class MusicAssistant:
             return existing
         if asyncio.iscoroutinefunction(target):
             task = self.loop.create_task(target(*args, **kwargs))
-        elif isinstance(target, asyncio.Future):
-            task = target
         elif asyncio.iscoroutine(target):
             task = self.loop.create_task(target)
+        elif isinstance(target, asyncio.Future):
+            task = target
         else:
             # assume normal callable (non coroutine or awaitable)
             task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs))
index 0b6f1c0b9a1e79917fc996913d6867362dc08f2e..28bf6500aeab571380c9b4c5c023c96013f8c8fe 100644 (file)
@@ -16,6 +16,7 @@ memory-tempfile==2.2.3
 music-assistant-frontend==20230327.1
 orjson==3.8.7
 pillow==9.4.0
+PyChromecast==13.0.6
 plexapi==4.13.2
 PyChromecast==13.0.5
 python-slugify==8.0.1