"""SMB filesystem provider for Music Assistant."""
from __future__ import annotations
+import asyncio
import logging
import os
from collections.abc import AsyncGenerator
-from contextlib import asynccontextmanager
+from os.path import basename
from typing import TYPE_CHECKING
-from smb.base import SharedFile
+import smbclient
+from smbclient import path as smbpath
-from music_assistant.common.helpers.util import get_ip_from_host
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueOption
+from music_assistant.common.models.config_entries import ConfigEntry
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import LoginFailed
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
-from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.providers.filesystem_local.base import (
CONF_ENTRY_MISSING_ALBUM_ARTIST,
+ IGNORE_DIRS,
FileSystemItem,
FileSystemProviderBase,
)
get_relative_path,
)
-from .helpers import AsyncSMB
-
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
"""Initialize provider(instance) with given configuration."""
+ # silence logging a bit on smbprotocol
+ logging.getLogger("smbprotocol").setLevel("WARNING")
+ logging.getLogger("smbclient").setLevel("INFO")
prov = SMBFileSystemProvider(mass, manifest, config)
await prov.handle_setup()
return prov
description="[optional] Use if your music is stored in a sublevel of the share. "
"E.g. 'collections' or 'albums/A-K'.",
),
- ConfigEntry(
- key="domain",
- type=ConfigEntryType.STRING,
- label="Domain",
- required=False,
- advanced=True,
- default_value="",
- description="The network domain. On windows, it is known as the workgroup. "
- "Usually, it is safe to leave this parameter as an empty string.",
- ),
- ConfigEntry(
- key="use_ntlm_v2",
- type=ConfigEntryType.BOOLEAN,
- label="Use NTLM v2",
- required=False,
- advanced=True,
- default_value=False,
- description="Indicates whether NTLMv1 or NTLMv2 authentication algorithm should "
- "be used for authentication. The choice of NTLMv1 and NTLMv2 is configured on "
- "the remote server, and there is no mechanism to auto-detect which algorithm has "
- "been configured. Hence, we can only “guess” or try both algorithms. On Sambda, "
- "Windows Vista and Windows 7, NTLMv2 is enabled by default. "
- "On Windows XP, we can use NTLMv1 before NTLMv2.",
- ),
- ConfigEntry(
- key="sign_options",
- type=ConfigEntryType.INTEGER,
- label="Sign Options",
- required=False,
- advanced=True,
- default_value=2,
- options=(
- ConfigValueOption("SIGN_NEVER", 0),
- ConfigValueOption("SIGN_WHEN_SUPPORTED", 1),
- ConfigValueOption("SIGN_WHEN_REQUIRED", 2),
- ),
- description="Determines whether SMB messages will be signed. "
- "Default is SIGN_WHEN_REQUIRED. If SIGN_WHEN_REQUIRED (value=2), "
- "SMB messages will only be signed when remote server requires signing. "
- "If SIGN_WHEN_SUPPORTED (value=1), SMB messages will be signed when "
- "remote server supports signing but not requires signing. "
- "If SIGN_NEVER (value=0), SMB messages will never be signed regardless "
- "of remote server’s configurations; access errors will occur if the "
- "remote server requires signing.",
- ),
- ConfigEntry(
- key="is_direct_tcp",
- type=ConfigEntryType.BOOLEAN,
- label="Use Direct TCP",
- required=False,
- advanced=True,
- default_value=False,
- description="Controls whether the NetBIOS over TCP/IP (is_direct_tcp=False) "
- "or the newer Direct hosting of SMB over TCP/IP (is_direct_tcp=True) will "
- "be used for the communication. The default parameter is False which will "
- "use NetBIOS over TCP/IP for wider compatibility (TCP port: 139).",
- ),
CONF_ENTRY_MISSING_ALBUM_ARTIST,
)
-async def create_item(file_path: str, entry: SharedFile, root_path: str) -> FileSystemItem:
- """Create FileSystemItem from smb.SharedFile."""
- rel_path = get_relative_path(root_path, file_path)
- abs_path = get_absolute_path(root_path, file_path)
- return FileSystemItem(
- name=entry.filename,
- path=rel_path,
- absolute_path=abs_path,
- is_file=not entry.isDirectory,
- is_dir=entry.isDirectory,
- checksum=str(int(entry.last_write_time)),
- file_size=entry.file_size,
- )
+async def create_item(base_path: str, entry: smbclient.SMBDirEntry) -> FileSystemItem:
+ """Create FileSystemItem from smbclient.SMBDirEntry."""
+
+ def _create_item():
+ entry_path = entry.path.replace("/\\", os.sep).replace("\\", os.sep)
+ absolute_path = get_absolute_path(base_path, entry_path)
+ stat = entry.stat(follow_symlinks=False)
+ return FileSystemItem(
+ name=entry.name,
+ path=get_relative_path(base_path, entry_path),
+ absolute_path=absolute_path,
+ is_file=entry.is_file(follow_symlinks=False),
+ is_dir=entry.is_dir(follow_symlinks=False),
+ checksum=str(int(stat.st_mtime)),
+ file_size=stat.st_size,
+ )
+
+ # run in thread because strictly taken this may be blocking IO
+ return await asyncio.to_thread(_create_item)
class SMBFileSystemProvider(FileSystemProviderBase):
"""Implementation of an SMB File System Provider."""
- _service_name = ""
- _root_path = "/"
- _remote_name = ""
- _target_ip = ""
-
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
# silence SMB.SMBConnection logger a bit
logging.getLogger("SMB.SMBConnection").setLevel("WARNING")
- self._remote_name = self.config.get_value(CONF_HOST)
- self._service_name = self.config.get_value(CONF_SHARE)
-
- # validate provided path
+ server: str = self.config.get_value(CONF_HOST)
+ share: str = self.config.get_value(CONF_SHARE)
subfolder: str = self.config.get_value(CONF_SUBFOLDER)
- subfolder.replace("\\", "/")
- if not subfolder.startswith("/"):
- subfolder = "/" + subfolder
- if not subfolder.endswith("/"):
- subfolder += "/"
- self._root_path = subfolder
-
- # resolve dns name to IP
- target_ip = await get_ip_from_host(self._remote_name)
- if target_ip is None:
- raise LoginFailed(
- f"Unable to resolve {self._remote_name}, maybe use an IP address as remote host ?"
- )
- self._target_ip = target_ip
- # test connection and return
- # this code will raise if the connection did not succeed
- async with self._get_smb_connection():
- return
+ # register smb session
+ self.logger.info("Connecting to server %s", server)
+ self._session = await asyncio.to_thread(
+ smbclient.register_session,
+ server,
+ username=self.config.get_value(CONF_USERNAME),
+ password=self.config.get_value(CONF_PASSWORD),
+ )
+
+ # create windows like path (\\server\share\subfolder)
+ if subfolder.endswith(os.sep):
+ subfolder = subfolder[:-1]
+ self._root_path = f"{os.sep}{os.sep}{server}{os.sep}{share}{os.sep}{subfolder}"
+ self.logger.debug("Using root path: %s", self._root_path)
+ # validate provided path
+ if not await asyncio.to_thread(smbpath.isdir, self._root_path):
+ raise LoginFailed(f"Invalid share or subfolder given: {self._root_path}")
async def listdir(
- self,
- path: str,
- recursive: bool = False,
+ self, path: str, recursive: bool = False
) -> AsyncGenerator[FileSystemItem, None]:
"""List contents of a given provider directory/path.
----------
- path: path of the directory (relative or absolute) to list contents of.
Empty string for provider's root.
- - recursive: If True will recursively keep unwrapping subdirectories (scandir equivalent)
+ - recursive: If True will recursively keep unwrapping subdirectories (scandir equivalent).
Returns:
-------
"""
abs_path = get_absolute_path(self._root_path, path)
- async with self._get_smb_connection() as smb_conn:
- path_result: list[SharedFile] = await smb_conn.list_path(abs_path)
-
- for entry in path_result:
- if entry.filename.startswith("."):
+ for entry in await asyncio.to_thread(smbclient.scandir, abs_path):
+ if entry.name.startswith(".") or any(x in entry.name for x in IGNORE_DIRS):
# skip invalid/system files and dirs
continue
- file_path = os.path.join(path, entry.filename)
- item = await create_item(file_path, entry, self._root_path)
+ item = await create_item(self._root_path, entry)
if recursive and item.is_dir:
- # yield sublevel recursively
try:
- async for subitem in self.listdir(file_path, True):
+ async for subitem in self.listdir(item.absolute_path, True):
yield subitem
except (OSError, PermissionError) as err:
self.logger.warning("Skip folder %s: %s", item.path, str(err))
else:
- # yield single item (file or directory)
yield item
- async def resolve(self, file_path: str) -> FileSystemItem:
- """Resolve (absolute or relative) path to FileSystemItem."""
- abs_path = get_absolute_path(self._root_path, file_path)
- async with self._get_smb_connection() as smb_conn:
- entry: SharedFile = await smb_conn.get_attributes(abs_path)
+ async def resolve(
+ self, file_path: str, require_local: bool = False # noqa: ARG002
+ ) -> FileSystemItem:
+ """Resolve (absolute or relative) path to FileSystemItem.
+
+ If require_local is True, we prefer to have the `local_path` attribute filled
+ (e.g. with a tempfile), if supported by the provider/item.
+ """
+ file_path = file_path.replace("\\", os.sep)
+ absolute_path = get_absolute_path(self._root_path, file_path)
+
+ def _create_item():
+ stat = smbclient.stat(absolute_path, follow_symlinks=False)
return FileSystemItem(
- name=file_path,
+ name=basename(file_path),
path=get_relative_path(self._root_path, file_path),
- absolute_path=abs_path,
- is_file=not entry.isDirectory,
- is_dir=entry.isDirectory,
- checksum=str(int(entry.last_write_time)),
- file_size=entry.file_size,
+ absolute_path=absolute_path,
+ is_dir=smbpath.isdir(absolute_path),
+ is_file=smbpath.isfile(absolute_path),
+ checksum=str(int(stat.st_mtime)),
+ file_size=stat.st_size,
)
- @use_cache(15 * 60)
+ # run in thread because strictly taken this may be blocking IO
+ return await asyncio.to_thread(_create_item)
+
async def exists(self, file_path: str) -> bool:
- """Return bool if this FileSystem musicprovider has given file/dir."""
+ """Return bool is this FileSystem musicprovider has given file/dir."""
+ if not file_path:
+ return False # guard
+ file_path = file_path.replace("\\", os.sep)
abs_path = get_absolute_path(self._root_path, file_path)
- async with self._get_smb_connection() as smb_conn:
- return await smb_conn.path_exists(abs_path)
+ return await asyncio.to_thread(smbpath.exists, abs_path)
async def read_file_content(self, file_path: str, seek: int = 0) -> AsyncGenerator[bytes, None]:
"""Yield (binary) contents of file in chunks of bytes."""
+ file_path = file_path.replace("\\", os.sep)
abs_path = get_absolute_path(self._root_path, file_path)
-
- async with self._get_smb_connection() as smb_conn:
- async for chunk in smb_conn.retrieve_file(abs_path, seek):
- yield chunk
+ chunk_size = 512000
+ queue = asyncio.Queue()
+ self.logger.debug("Reading file contents for %s", abs_path)
+
+ def _reader():
+ with smbclient.open_file(abs_path, "rb", share_access="r") as _file:
+ if seek:
+ _file.seek(seek)
+ # yield chunks of data from file
+ while True:
+ data = _file.read(chunk_size)
+ if not data:
+ break
+ self.mass.loop.call_soon_threadsafe(queue.put_nowait, data)
+ self.mass.loop.call_soon_threadsafe(queue.put_nowait, b"")
+
+ self.mass.create_task(_reader)
+ while True:
+ chunk = await queue.get()
+ if chunk == b"":
+ break
+ yield chunk
async def write_file_content(self, file_path: str, data: bytes) -> None:
"""Write entire file content as bytes (e.g. for playlists)."""
+ file_path = file_path.replace("\\", os.sep)
abs_path = get_absolute_path(self._root_path, file_path)
- async with self._get_smb_connection() as smb_conn:
- await smb_conn.write_file(abs_path, data)
-
- @asynccontextmanager
- async def _get_smb_connection(self) -> AsyncGenerator[AsyncSMB, None]:
- """Get instance of AsyncSMB."""
- # For now we just create a connection per call
- # as that is the most reliable (but a bit slower)
- # this could be improved by creating a connection pool
- # if really needed
-
- async with AsyncSMB(
- remote_name=self._remote_name,
- service_name=self._service_name,
- username=self.config.get_value(CONF_USERNAME),
- password=self.config.get_value(CONF_PASSWORD),
- target_ip=self._target_ip,
- use_ntlm_v2=self.config.get_value("use_ntlm_v2"),
- sign_options=self.config.get_value("sign_options"),
- is_direct_tcp=self.config.get_value("is_direct_tcp"),
- ) as smb_conn:
- yield smb_conn
+
+ def _writer():
+ with smbclient.open_file(abs_path, "wb") as _file:
+ _file.write(data)
+
+ await asyncio.to_thread(_writer)
+++ /dev/null
-"""Some helpers for Filesystem based Musicproviders."""
-from __future__ import annotations
-
-import asyncio
-from collections.abc import AsyncGenerator
-from io import BytesIO
-
-from smb.base import OperationFailure, SharedFile
-from smb.SMBConnection import SMBConnection
-
-from music_assistant.common.models.errors import LoginFailed
-
-SERVICE_NAME = "music_assistant"
-
-
-class AsyncSMB:
- """Async wrapped pysmb."""
-
- def __init__(
- self,
- remote_name: str,
- service_name: str,
- username: str,
- password: str,
- target_ip: str,
- use_ntlm_v2: bool = True,
- sign_options: int = 2,
- is_direct_tcp: bool = False,
- ) -> None:
- """Initialize instance."""
- self._service_name = service_name
- self._remote_name = remote_name
- self._target_ip = target_ip
- self._username = username
- self._password = password
- self._conn = SMBConnection(
- username=self._username,
- password=self._password,
- my_name=SERVICE_NAME,
- remote_name=self._remote_name,
- use_ntlm_v2=use_ntlm_v2,
- sign_options=sign_options,
- is_direct_tcp=is_direct_tcp,
- )
- # the smb connection may not be used asynchronously and
- # each operation should take sequentially.
- # to support this, we use a Lock and we create a new.
- self._lock = asyncio.Lock()
-
- async def list_path(self, path: str) -> list[SharedFile]:
- """Retrieve a directory listing of files/folders at *path*."""
- async with self._lock:
- return await asyncio.to_thread(self._conn.listPath, self._service_name, path)
-
- async def get_attributes(self, path: str) -> SharedFile:
- """Retrieve information about the file at *path* on the *service_name*."""
- async with self._lock:
- return await asyncio.to_thread(self._conn.getAttributes, self._service_name, path)
-
- async def retrieve_file(self, path: str, offset: int = 0) -> AsyncGenerator[bytes, None]:
- """Retrieve file contents."""
- chunk_size = 256000
- while True:
- async with self._lock:
- with BytesIO() as file_obj:
- await asyncio.to_thread(
- self._conn.retrieveFileFromOffset,
- self._service_name,
- path,
- file_obj,
- offset,
- chunk_size,
- )
- file_obj.seek(0)
- chunk = file_obj.read()
- yield chunk
- offset += len(chunk)
- if len(chunk) < chunk_size:
- break
-
- async def write_file(self, path: str, data: bytes) -> SharedFile:
- """Store the contents to the file at *path*."""
- with BytesIO() as file_obj:
- file_obj.write(data)
- file_obj.seek(0)
- async with self._lock:
- await asyncio.to_thread(
- self._conn.storeFile,
- self._service_name,
- path,
- file_obj,
- )
-
- async def path_exists(self, path: str) -> bool:
- """Return bool is this FileSystem musicprovider has given file/dir."""
- async with self._lock:
- try:
- await asyncio.to_thread(self._conn.getAttributes, self._service_name, path)
- except (OperationFailure,):
- return False
- except IndexError:
- return False
- return True
-
- async def connect(self) -> None:
- """Connect to the SMB server."""
- async with self._lock:
- try:
- assert await asyncio.to_thread(self._conn.connect, self._target_ip) is True
- except Exception as exc:
- raise LoginFailed(f"SMB Connect failed to {self._remote_name}") from exc
-
- async def __aenter__(self) -> AsyncSMB:
- """Enter context manager."""
- # connect
- await self.connect()
- return self
-
- async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
- """Exit context manager."""
- self._conn.close()