Various optimizations for file/smb provider (#577)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 26 Mar 2023 14:11:48 +0000 (16:11 +0200)
committerGitHub <noreply@github.com>
Sun, 26 Mar 2023 14:11:48 +0000 (16:11 +0200)
music_assistant/server/controllers/config.py
music_assistant/server/controllers/media/albums.py
music_assistant/server/controllers/media/base.py
music_assistant/server/controllers/media/tracks.py
music_assistant/server/helpers/tags.py
music_assistant/server/providers/filesystem_local/base.py
music_assistant/server/providers/filesystem_smb/__init__.py
music_assistant/server/providers/sonos/__init__.py

index c3070a10c23382b36fa637c8aee92b12dc4ad6fd..d08d50a49ab6e3afeb3acfd436dd74b3c9b6ded6 100644 (file)
@@ -457,6 +457,8 @@ class ConfigController:
 
     def decrypt_string(self, encrypted_str: str) -> str:
         """Decrypt a (password)string with Fernet."""
+        if not encrypted_str:
+            return encrypted_str
         if not encrypted_str.startswith(ENCRYPT_SUFFIX):
             return encrypted_str
         encrypted_str = encrypted_str.replace(ENCRYPT_SUFFIX, "")
index 1e9a70d06ab145904a425f7ec1cf8407f6d31d6f..ea922976e75928448e90deb6c71c0c178f1c9d82 100644 (file)
@@ -53,7 +53,7 @@ class AlbumsController(MediaControllerBase[Album]):
         force_refresh: bool = False,
         lazy: bool = True,
         details: Album = None,
-        force_provider_item: bool = False,
+        add_to_db: bool = True,
     ) -> Album:
         """Return (full) details for a single media item."""
         album = await super().get(
@@ -63,7 +63,7 @@ class AlbumsController(MediaControllerBase[Album]):
             force_refresh=force_refresh,
             lazy=lazy,
             details=details,
-            force_provider_item=force_provider_item,
+            add_to_db=add_to_db,
         )
         # append full artist details to full album item
         if album.artist:
index 931ee3471376525b3d8bb8de5fdf6f3fbf434c72..074dd42c27743521649f05c46ae6e3d5d07b9f79 100644 (file)
@@ -126,14 +126,14 @@ class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta):
         force_refresh: bool = False,
         lazy: bool = True,
         details: ItemCls = None,
-        force_provider_item: bool = False,
+        add_to_db: bool = True,
     ) -> ItemCls:
         """Return (full) details for a single media item."""
         assert (
             provider_domain or provider_instance
         ), "provider_domain or provider_instance must be supplied"
-        if force_provider_item:
-            return await self.get_provider_item(item_id, provider_instance)
+        if not add_to_db:
+            return await self.get_provider_item(item_id, provider_instance or provider_domain)
         if details and details.provider == "database":
             details = None
         db_item = await self.get_db_item_by_prov_id(
index 2cc027a1b2e1f794b3f6405d2a56b79b2e46bf9d..54ddf48e33b2ca30570a81976e3f09b28211578c 100644 (file)
@@ -51,7 +51,8 @@ class TracksController(MediaControllerBase[Track]):
         force_refresh: bool = False,
         lazy: bool = True,
         details: Track = None,
-        force_provider_item: bool = False,
+        album_uri: str | None = None,
+        add_to_db: bool = True,
     ) -> Track:
         """Return (full) details for a single media item."""
         track = await super().get(
@@ -61,20 +62,27 @@ class TracksController(MediaControllerBase[Track]):
             force_refresh=force_refresh,
             lazy=lazy,
             details=details,
-            force_provider_item=force_provider_item,
+            add_to_db=add_to_db,
         )
         # append full album details to full track item
-        if track.album:
-            try:
+        try:
+            cur_track_album = track.album
+            if album_uri and (album := await self.mass.music.get_item_by_uri(album_uri)):
+                track.album = album
+                # if the track's primary album does not match, copy the image
+                # otherwise it will look weird
+                if album.uri != cur_track_album.uri and album.image:
+                    track.metadata.images = [album.image] + track.metadata.images
+            elif track.album:
                 track.album = await self.mass.music.albums.get(
                     track.album.item_id,
                     track.album.provider,
                     lazy=True,
                     details=track.album,
                 )
-            except MediaNotFoundError:
-                # edge case where playlist track has invalid albumdetails
-                self.logger.warning("Unable to fetch album details %s", track.album.uri)
+        except MediaNotFoundError:
+            # edge case where playlist track has invalid albumdetails
+            self.logger.warning("Unable to fetch album details %s", track.album.uri)
         # append full artist details to full track item
         full_artists = []
         for artist in track.artists:
index ec8fb54c11e46aa47c081266556456df637f5aec..8ea4fd323bbb056bbc36a619664692aecef2376e 100644 (file)
@@ -310,8 +310,16 @@ async def parse_tags(
             # feed the file contents to the process
 
             async def chunk_feeder():
+                bytes_read = 0
                 async for chunk in input_file:
                     await proc.write(chunk)
+                    bytes_read += len(chunk)
+
+                if bytes_read > 25 * 1000000:
+                    # this is possibly a m4a file with 'moove atom' metadata at the end of the file
+                    # we'll have to read the entire file to do something with it
+                    # for now we just ignore/deny these files
+                    raise RuntimeError("Tags not present at beginning of file")
 
                 proc.write_eof()
 
index 283cce5613c1e8b744d90e6559d5e48ea0d05705..f3ab8f31c9d970fd68d9178b577c6758786334eb 100644 (file)
@@ -565,29 +565,9 @@ class FileSystemProviderBase(MusicProvider):
         """Get full track details by id."""
         # ruff: noqa: PLR0915, PLR0912
 
-        # m4a files are nasty because in 99% of the cases the metadata is
-        # at the end of the file (moov atom) so in order to read tags
-        # we need to read the entire file, which is not practically do-able with
-        # remote connections, so we ignore those files
-        large_m4a_file = (
-            file_item.ext in ("m4a", "m4b")
-            and not file_item.local_path
-            and file_item.file_size > 100000000
-        )
-        if large_m4a_file:
-            self.logger.warning(
-                "Large m4a file detected which is unsuitable for remote storage: %s"
-                " - consider converting this file to another file format or make sure "
-                "that `moov atom` metadata is at the beginning of the file. - "
-                "loading info for this file is going to take a long time!",
-                file_item.path,
-            )
-
         # parse tags
         input_file = file_item.local_path or self.read_file_content(file_item.absolute_path)
         tags = await parse_tags(input_file, file_item.file_size)
-        if large_m4a_file:
-            tags.has_cover_image = False
 
         name, version = parse_title_and_version(tags.title, tags.version)
         track = Track(
index 87c685f54ba93fd6fe3f34a9a829922bd5fa8f46..4ffc496f6fce8bdd2376cdeed5fc64fee92600ac 100644 (file)
@@ -11,6 +11,7 @@ from typing import TYPE_CHECKING
 import smbclient
 from smbclient import path as smbpath
 
+from music_assistant.common.helpers.util import get_ip_from_host
 from music_assistant.common.models.config_entries import ConfigEntry
 from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.errors import LoginFailed
@@ -35,6 +36,7 @@ if TYPE_CHECKING:
 CONF_HOST = "host"
 CONF_SHARE = "share"
 CONF_SUBFOLDER = "subfolder"
+CONF_CONN_LIMIT = "connection_limit"
 
 
 async def setup(
@@ -44,6 +46,10 @@ async def setup(
     # silence logging a bit on smbprotocol
     logging.getLogger("smbprotocol").setLevel("WARNING")
     logging.getLogger("smbclient").setLevel("INFO")
+    # check if valid dns name is given
+    server: str = config.get_value(CONF_HOST)
+    if not await get_ip_from_host(server):
+        raise LoginFailed(f"Unable to resolve {server}, make sure the address is resolveable.")
     prov = SMBFileSystemProvider(mass, manifest, config)
     await prov.handle_setup()
     return prov
@@ -55,15 +61,15 @@ async def get_config_entries(
     """Return Config entries to setup this provider."""
     return (
         ConfigEntry(
-            key="host",
+            key=CONF_HOST,
             type=ConfigEntryType.STRING,
-            label="Remote host",
+            label="Server",
             required=True,
-            description="The (fqdn) hostname of the SMB/CIFS server to connect to."
+            description="The (fqdn) hostname of the SMB/CIFS/DFS server to connect to."
             "For example mynas.local.",
         ),
         ConfigEntry(
-            key="share",
+            key=CONF_SHARE,
             type=ConfigEntryType.STRING,
             label="Share",
             required=True,
@@ -71,7 +77,7 @@ async def get_config_entries(
             "the remote host, For example 'media'.",
         ),
         ConfigEntry(
-            key="username",
+            key=CONF_USERNAME,
             type=ConfigEntryType.STRING,
             label="Username",
             required=True,
@@ -80,16 +86,16 @@ async def get_config_entries(
             "For anynymous access you may want to try with the user `guest`.",
         ),
         ConfigEntry(
-            key="password",
+            key=CONF_PASSWORD,
             type=ConfigEntryType.SECURE_STRING,
-            label="Username",
-            required=True,
-            default_value="guest",
+            label="Password",
+            required=False,
+            default_value=None,
             description="The username to authenticate to the remote server. "
             "For anynymous access you may want to try with the user `guest`.",
         ),
         ConfigEntry(
-            key="subfolder",
+            key=CONF_SUBFOLDER,
             type=ConfigEntryType.STRING,
             label="Subfolder",
             required=False,
@@ -98,6 +104,17 @@ async def get_config_entries(
             "E.g. 'collections' or 'albums/A-K'.",
         ),
         CONF_ENTRY_MISSING_ALBUM_ARTIST,
+        ConfigEntry(
+            key=CONF_CONN_LIMIT,
+            type=ConfigEntryType.INTEGER,
+            label="Connection limit",
+            required=False,
+            default_value=5,
+            advanced=True,
+            description="[optional] Limit the number of concurrent connections. "
+            "Set the value high(er) for more performance but some (Windows) servers "
+            "may deny requests in that case",
+        ),
     )
 
 
@@ -127,30 +144,42 @@ class SMBFileSystemProvider(FileSystemProviderBase):
 
     async def handle_setup(self) -> None:
         """Handle async initialization of the provider."""
-        # silence SMB.SMBConnection logger a bit
-        logging.getLogger("SMB.SMBConnection").setLevel("WARNING")
-
         server: str = self.config.get_value(CONF_HOST)
         share: str = self.config.get_value(CONF_SHARE)
         subfolder: str = self.config.get_value(CONF_SUBFOLDER)
-
-        # register smb session
-        self.logger.info("Connecting to server %s", server)
-        self._session = await asyncio.to_thread(
-            smbclient.register_session,
-            server,
-            username=self.config.get_value(CONF_USERNAME),
-            password=self.config.get_value(CONF_PASSWORD),
-        )
+        connection_limit: int = self.config.get_value(CONF_CONN_LIMIT)
+        self.semaphore = asyncio.Semaphore(connection_limit)
 
         # create windows like path (\\server\share\subfolder)
         if subfolder.endswith(os.sep):
             subfolder = subfolder[:-1]
         self._root_path = f"{os.sep}{os.sep}{server}{os.sep}{share}{os.sep}{subfolder}"
         self.logger.debug("Using root path: %s", self._root_path)
-        # validate provided path
-        if not await asyncio.to_thread(smbpath.isdir, self._root_path):
-            raise LoginFailed(f"Invalid share or subfolder given: {self._root_path}")
+
+        # register smb session
+        self.logger.info("Connecting to server %s", server)
+        try:
+            self._session = await asyncio.to_thread(
+                smbclient.register_session,
+                server,
+                username=self.config.get_value(CONF_USERNAME),
+                password=self.config.get_value(CONF_PASSWORD),
+            )
+            # validate provided path
+            if not await asyncio.to_thread(smbpath.isdir, self._root_path):
+                raise LoginFailed(f"Invalid subfolder given: {subfolder}")
+        except Exception as err:
+            if "Unable to negotiate " in str(err):
+                detail = "Invalid credentials"
+            elif "refused " in str(err):
+                detail = "Invalid hostname (or host not reachable)"
+            elif "STATUS_NOT_FOUND" in str(err):
+                detail = "Share does not exist"
+            elif "Invalid argument" in str(err) and "." not in server:
+                detail = "Make sure to enter a FQDN hostname or IP-address"
+            else:
+                detail = str(err)
+            raise LoginFailed(f"Connection failed for the given details: {detail}") from err
 
     async def listdir(
         self, path: str, recursive: bool = False
@@ -169,7 +198,9 @@ class SMBFileSystemProvider(FileSystemProviderBase):
 
         """
         abs_path = get_absolute_path(self._root_path, path)
-        for entry in await asyncio.to_thread(smbclient.scandir, abs_path):
+        async with self.semaphore:
+            entries = await asyncio.to_thread(smbclient.scandir, abs_path)
+        for entry in entries:
             if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
                 # skip invalid/system files and dirs
                 continue
@@ -207,7 +238,8 @@ class SMBFileSystemProvider(FileSystemProviderBase):
             )
 
         # run in thread because strictly taken this may be blocking IO
-        return await asyncio.to_thread(_create_item)
+        async with self.semaphore:
+            return await asyncio.to_thread(_create_item)
 
     async def exists(self, file_path: str) -> bool:
         """Return bool is this FileSystem musicprovider has given file/dir."""
@@ -215,7 +247,13 @@ class SMBFileSystemProvider(FileSystemProviderBase):
             return False  # guard
         file_path = file_path.replace("\\", os.sep)
         abs_path = get_absolute_path(self._root_path, file_path)
-        return await asyncio.to_thread(smbpath.exists, abs_path)
+        async with self.semaphore:
+            try:
+                return await asyncio.to_thread(smbpath.exists, abs_path)
+            except Exception as err:
+                if "STATUS_OBJECT_NAME_INVALID" in str(err):
+                    return False
+                raise err
 
     async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
         """Yield (binary) contents of file in chunks of bytes."""
@@ -225,24 +263,26 @@ class SMBFileSystemProvider(FileSystemProviderBase):
         queue = asyncio.Queue()
         self.logger.debug("Reading file contents for %s", abs_path)
 
-        def _reader():
-            with smbclient.open_file(abs_path, "rb", share_access="r") as _file:
-                if seek:
-                    _file.seek(seek)
-                # yield chunks of data from file
-                while True:
-                    data = _file.read(chunk_size)
-                    if not data:
-                        break
-                    self.mass.loop.call_soon_threadsafe(queue.put_nowait, data)
-            self.mass.loop.call_soon_threadsafe(queue.put_nowait, b"")
-
-        self.mass.create_task(_reader)
-        while True:
-            chunk = await queue.get()
-            if chunk == b"":
-                break
-            yield chunk
+        async with self.semaphore:
+
+            def _reader():
+                with smbclient.open_file(abs_path, "rb", share_access="r") as _file:
+                    if seek:
+                        _file.seek(seek)
+                    # yield chunks of data from file
+                    while True:
+                        data = _file.read(chunk_size)
+                        if not data:
+                            break
+                        self.mass.loop.call_soon_threadsafe(queue.put_nowait, data)
+                self.mass.loop.call_soon_threadsafe(queue.put_nowait, b"")
+
+            self.mass.create_task(_reader)
+            while True:
+                chunk = await queue.get()
+                if chunk == b"":
+                    break
+                yield chunk
 
     async def write_file_content(self, file_path: str, data: bytes) -> None:
         """Write entire file content as bytes (e.g. for playlists)."""
@@ -253,4 +293,5 @@ class SMBFileSystemProvider(FileSystemProviderBase):
             with smbclient.open_file(abs_path, "wb") as _file:
                 _file.write(data)
 
-        await asyncio.to_thread(_writer)
+        async with self.semaphore:
+            await asyncio.to_thread(_writer)
index bcb31effa4e4381995925cd699423af707716b2d..349ea02b80f84634dbefad72efab3b938a8884e7 100644 (file)
@@ -284,7 +284,9 @@ class SonosPlayerProvider(PlayerProvider):
         await asyncio.to_thread(sonos_player.soco.stop)
         await asyncio.to_thread(sonos_player.soco.clear_queue)
 
-        radio_mode = flow_mode or not queue_item.duration
+        radio_mode = (
+            flow_mode or not queue_item.duration or queue_item.media_type == MediaType.RADIO
+        )
         url = await self.mass.streams.resolve_stream_url(
             queue_item=queue_item,
             player_id=sonos_player.player_id,