import os.path
import time
import urllib.parse
-from collections.abc import AsyncGenerator, Iterator, Sequence
+from collections.abc import AsyncGenerator, Sequence
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
IsChapterFile,
)
from .helpers import (
- IGNORE_DIRS,
FileSystemItem,
get_absolute_path,
get_album_dir,
get_artist_dir,
get_relative_path,
+ recursive_iter,
sorted_scandir,
)
# NOTE: we do the entire traversing of the directory structure, including parsing tags
# in a single executor thread to save the overhead of having to spin up tons of tasks
- def listdir(path: str) -> Iterator[FileSystemItem]:
- """Recursively traverse directory entries."""
- for item in os.scandir(path):
- # ignore invalid filenames
- if item.name in IGNORE_DIRS or item.name.startswith((".", "_")):
- continue
- if item.is_dir(follow_symlinks=False):
- yield from listdir(item.path)
- elif item.is_file(follow_symlinks=False):
- # skip files without extension
- if "." not in item.name:
- continue
- ext = item.name.rsplit(".", 1)[1].lower()
- if ext not in SUPPORTED_EXTENSIONS:
- # skip unsupported file extension
- continue
- try:
- yield FileSystemItem.from_dir_entry(item, self.base_path)
- except OSError as err:
- # Skip files that cannot be stat'd (e.g., invalid encoding on SMB mounts)
- # This typically happens with emoji or special unicode characters
- self.logger.debug(
- "Skipping file %s due to stat error: %s",
- item.path,
- str(err),
- )
-
def run_sync() -> None:
"""Run the actual sync (in an executor job)."""
self.sync_running = True
try:
- for item in listdir(self.base_path):
+ for item in recursive_iter(
+ self.base_path, self.base_path, SUPPORTED_EXTENSIONS, self.logger
+ ):
prev_checksum = file_checksums.get(item.relative_path)
if self._process_item(item, prev_checksum):
cur_filenames.add(item.relative_path)
from __future__ import annotations
+import errno
+import logging
import os
import re
+from collections.abc import Iterator
from dataclasses import dataclass
from music_assistant.helpers.compare import compare_strings
+logger = logging.getLogger(__name__)
+
IGNORE_DIRS = ("recycle", "Recently-Snaphot", "#recycle", "System Volume Information", "lost+found")
if _dir_contains_album_name(album_name, dirname):
return parentdir
- if compare_strings(album_name.split("(")[0], dirname, False):
+ if compare_strings(album_name.split("(", maxsplit=1)[0], dirname, False):
# account for AlbumName (Version) format in the album name
return parentdir
- if compare_strings(album_name.split("(")[0], dirname.split(" - ")[-1], False):
+ if compare_strings(album_name.split("(", maxsplit=1)[0], dirname.split(" - ")[-1], False):
# account for ArtistName - AlbumName (Version) format
return parentdir
if len(album_name) > 8 and album_name in dirname:
return os.path.join(base_path, path)
+def recursive_iter(
+ path: str,
+ base_path: str,
+ supported_extensions: set[str],
+ log: logging.Logger,
+) -> Iterator[FileSystemItem]:
+ """Recursively traverse directory entries yielding supported files.
+
+ :param path: The directory path to scan.
+ :param base_path: The root base path for constructing relative paths.
+ :param supported_extensions: Set of file extensions to include (lowercase, no dot).
+ :param log: Logger instance to use for warnings/debug messages.
+ """
+ try:
+ scan_iter = os.scandir(path)
+ except OSError as err:
+ if err.errno == errno.EINVAL:
+ log.warning(
+ "Skipping directory '%s' - unsupported characters in path",
+ path,
+ )
+ else:
+ log.warning("Unable to scan directory %s: %s", path, err)
+ return
+ with scan_iter:
+ for item in scan_iter:
+ if item.name in IGNORE_DIRS or item.name.startswith((".", "_")):
+ continue
+ try:
+ is_dir = item.is_dir(follow_symlinks=False)
+ is_file = item.is_file(follow_symlinks=False)
+ except OSError as err:
+ if err.errno == errno.EINVAL:
+ log.warning(
+ "Skipping '%s' - unsupported characters in name",
+ item.name,
+ )
+ continue
+ if is_dir:
+ yield from recursive_iter(item.path, base_path, supported_extensions, log)
+ elif is_file:
+ if "." not in item.name:
+ continue
+ ext = item.name.rsplit(".", 1)[1].lower()
+ if ext not in supported_extensions:
+ continue
+ try:
+ yield FileSystemItem.from_dir_entry(item, base_path)
+ except OSError as err:
+ if err.errno == errno.EINVAL:
+ log.warning(
+ "Skipping '%s' - unsupported characters in name",
+ item.name,
+ )
+ else:
+ log.debug(
+ "Skipping file %s due to OS error: %s",
+ item.path,
+ str(err),
+ )
+
+
def sorted_scandir(base_path: str, sub_path: str, sort: bool = False) -> list[FileSystemItem]:
"""
Implement os.scandir that returns (optionally) sorted entries.
if base_path not in sub_path:
sub_path = os.path.join(base_path, sub_path)
- items = []
- for entry in os.scandir(sub_path):
- # filter out invalid dirs and hidden files
- if not (entry.is_dir(follow_symlinks=False) or entry.is_file(follow_symlinks=False)):
- continue
- if entry.name in IGNORE_DIRS or entry.name.startswith("."):
- continue
- try:
- items.append(FileSystemItem.from_dir_entry(entry, base_path))
- except OSError:
- # Skip files that cannot be stat'd (e.g., invalid encoding on SMB mounts)
- # This typically happens with emoji or special unicode characters
- continue
+ items: list[FileSystemItem] = []
+ try:
+ entries = os.scandir(sub_path)
+ except OSError as err:
+ if err.errno == errno.EINVAL:
+ logger.warning(
+ "Skipping directory '%s' - unsupported characters in path",
+ sub_path,
+ )
+ return items
+ raise
+ with entries:
+ for entry in entries:
+ try:
+ is_dir = entry.is_dir(follow_symlinks=False)
+ is_file = entry.is_file(follow_symlinks=False)
+ except OSError as err:
+ if err.errno == errno.EINVAL:
+ logger.warning(
+ "Skipping '%s' - unsupported characters in name",
+ entry.name,
+ )
+ continue
+ if not (is_dir or is_file):
+ continue
+ if entry.name in IGNORE_DIRS or entry.name.startswith("."):
+ continue
+ try:
+ items.append(FileSystemItem.from_dir_entry(entry, base_path))
+ except OSError as err:
+ if err.errno == errno.EINVAL:
+ logger.warning(
+ "Skipping '%s' - unsupported characters in name",
+ entry.name,
+ )
+ else:
+ logger.debug("Skipping '%s' due to OS error: %s", entry.name, err)
+ continue
if sort:
return sorted(