Refactor Smart fades (#2582)
authorMarvin Schenkel <marvinschenkel@gmail.com>
Fri, 7 Nov 2025 16:16:53 +0000 (17:16 +0100)
committerGitHub <noreply@github.com>
Fri, 7 Nov 2025 16:16:53 +0000 (17:16 +0100)
music_assistant/constants.py
music_assistant/controllers/config.py
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/smart_fades.py
music_assistant/models/smart_fades.py

index 35854ac5e130a7de35bc684425e5135e1843fe6e..7aed50f41d9ce45408013fc8ece73b671bc4452a 100644 (file)
@@ -310,12 +310,12 @@ CONF_ENTRY_SMART_FADES_MODE = ConfigEntry(
     label="Enable Smart Fades",
     options=[
         ConfigValueOption("Disabled", "disabled"),
-        ConfigValueOption("Smart Fades", "smart_fades"),
+        ConfigValueOption("Smart Crossfade", "smart_crossfade"),
         ConfigValueOption("Standard Crossfade", "standard_crossfade"),
     ],
     default_value="disabled",
     description="Select the crossfade mode to use when transitioning between tracks.\n\n"
-    "- 'Smart Fades': Uses beat matching and DJ-like EQ filters to create smooth transitions"
+    "- 'Smart Crossfade': Uses beat matching and EQ filters to create smooth transitions"
     " between tracks.\n"
     "- 'Standard Crossfade': Regular crossfade that crossfades the last/first x-seconds of a "
     "track.",
index 9a91a6337811acddbed74ece9fd4929b9a0bfe52..f8dae8ff378d77eae3181a0e63287d0d0d5da732 100644 (file)
@@ -1048,12 +1048,23 @@ class ConfigController:
 
         # Migrate the crossfade setting into Smart Fade Mode = 'crossfade'
         for player_config in self._data.get(CONF_PLAYERS, {}).values():
-            if (crossfade := player_config.pop(CONF_DEPRECATED_CROSSFADE, None)) is None:
+            if not (values := player_config.get("values")):
+                continue
+            if (crossfade := values.pop(CONF_DEPRECATED_CROSSFADE, None)) is None:
                 continue
             # Check if player has old crossfade enabled but no smart fades mode set
-            if crossfade is True and CONF_SMART_FADES_MODE not in player_config:
+            if crossfade is True and CONF_SMART_FADES_MODE not in values:
                 # Set smart fades mode to standard_crossfade
-                player_config[CONF_SMART_FADES_MODE] = "standard_crossfade"
+                values[CONF_SMART_FADES_MODE] = "standard_crossfade"
+                changed = True
+
+        # Migrate smart_fades mode value to smart_crossfade
+        for player_config in self._data.get(CONF_PLAYERS, {}).values():
+            if not (values := player_config.get("values")):
+                continue
+            if values.get(CONF_SMART_FADES_MODE) == "smart_fades":
+                # Update old 'smart_fades' value to new 'smart_crossfade' value
+                values[CONF_SMART_FADES_MODE] = "smart_crossfade"
                 changed = True
 
         # migrate player configs: always use lookup key for provider
index f83514bcf6724c59f87ad82b448e38a641b099a6..8233a21da7e86dc0c1626cb1277aa8f486e380cc 100644 (file)
@@ -998,7 +998,7 @@ class StreamsController(CoreController):
             # calculate crossfade buffer size
             crossfade_buffer_duration = (
                 SMART_CROSSFADE_DURATION
-                if smart_fades_mode == SmartFadesMode.SMART_FADES
+                if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
                 else standard_crossfade_duration
             )
             crossfade_buffer_duration = min(
@@ -1396,7 +1396,7 @@ class StreamsController(CoreController):
         self,
         queue_item: QueueItem,
         pcm_format: AudioFormat,
-        smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_FADES,
+        smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
         standard_crossfade_duration: int = 10,
     ) -> AsyncGenerator[bytes, None]:
         """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
@@ -1434,7 +1434,7 @@ class StreamsController(CoreController):
         # calculate crossfade buffer size
         crossfade_buffer_duration = (
             SMART_CROSSFADE_DURATION
-            if smart_fades_mode == SmartFadesMode.SMART_FADES
+            if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
             else standard_crossfade_duration
         )
         crossfade_buffer_duration = min(
index 9c6b3f51343ddd9a644771fe8828b35830539fda..11a91d7ad44f327d677fbdb259a916f7706d7476 100644 (file)
@@ -98,105 +98,6 @@ def align_audio_to_frame_boundary(audio_data: bytes, pcm_format: AudioFormat) ->
     return audio_data
 
 
-async def crossfade_pcm_parts(
-    fade_in_part: bytes,
-    fade_out_part: bytes,
-    pcm_format: AudioFormat,
-    fade_out_pcm_format: AudioFormat | None = None,
-) -> bytes:
-    """Crossfade two chunks of pcm/raw audio using ffmpeg."""
-    if fade_out_pcm_format is None:
-        fade_out_pcm_format = pcm_format
-
-    # calculate the fade_length from the smallest chunk
-    fade_length = min(
-        len(fade_in_part) / pcm_format.pcm_sample_size,
-        len(fade_out_part) / fade_out_pcm_format.pcm_sample_size,
-    )
-    # write the fade_out_part to a temporary file
-    fadeout_filename = f"/tmp/{shortuuid.random(20)}.pcm"  # noqa: S108
-    async with aiofiles.open(fadeout_filename, "wb") as outfile:
-        await outfile.write(fade_out_part)
-
-    args = [
-        # generic args
-        "ffmpeg",
-        "-hide_banner",
-        "-loglevel",
-        "quiet",
-        # fadeout part (as file)
-        "-acodec",
-        fade_out_pcm_format.content_type.name.lower(),
-        "-ac",
-        str(fade_out_pcm_format.channels),
-        "-ar",
-        str(fade_out_pcm_format.sample_rate),
-        "-channel_layout",
-        "mono" if fade_out_pcm_format.channels == 1 else "stereo",
-        "-f",
-        fade_out_pcm_format.content_type.value,
-        "-i",
-        fadeout_filename,
-        # fade_in part (stdin)
-        "-acodec",
-        pcm_format.content_type.name.lower(),
-        "-ac",
-        str(pcm_format.channels),
-        "-channel_layout",
-        "mono" if pcm_format.channels == 1 else "stereo",
-        "-ar",
-        str(pcm_format.sample_rate),
-        "-f",
-        pcm_format.content_type.value,
-        "-i",
-        "-",
-        # filter args
-        "-filter_complex",
-        f"[0][1]acrossfade=d={fade_length}",
-        # output args
-        "-acodec",
-        pcm_format.content_type.name.lower(),
-        "-ac",
-        str(pcm_format.channels),
-        "-channel_layout",
-        "mono" if pcm_format.channels == 1 else "stereo",
-        "-ar",
-        str(pcm_format.sample_rate),
-        "-f",
-        pcm_format.content_type.value,
-        "-",
-    ]
-    _, crossfaded_audio, _ = await communicate(args, fade_in_part)
-    await remove_file(fadeout_filename)
-    if crossfaded_audio:
-        LOGGER.log(
-            VERBOSE_LOG_LEVEL,
-            "crossfaded 2 pcm chunks. fade_in_part: %s - "
-            "fade_out_part: %s - fade_length: %s seconds",
-            len(fade_in_part),
-            len(fade_out_part),
-            fade_length,
-        )
-        return crossfaded_audio
-    # no crossfade_data, return original data instead
-    LOGGER.debug(
-        "crossfade of pcm chunks failed: not enough data? - fade_in_part: %s - fade_out_part: %s",
-        len(fade_in_part),
-        len(fade_out_part),
-    )
-    if fade_out_pcm_format.sample_rate != pcm_format.sample_rate:
-        # Edge case: the sample rates are different,
-        # we need to resample the fade_out part to the same sample rate as the fade_in part
-        async with FFMpeg(
-            audio_input="-",
-            input_format=fade_out_pcm_format,
-            output_format=pcm_format,
-        ) as ffmpeg:
-            res = await ffmpeg.communicate(fade_out_part)
-            return res[0] + fade_in_part
-    return fade_out_part + fade_in_part
-
-
 async def strip_silence(
     mass: MusicAssistant,  # noqa: ARG001
     audio_data: bytes,
index edb59dbaa1954a112fa5b75b6a4d82db9ee2c8b4..259ea7d1d6ff2b394fa03649e1a7ebbdbc36f72a 100644 (file)
@@ -1,15 +1,12 @@
 """Smart Fades - Object-oriented implementation with intelligent fades and adaptive filtering."""
 
-# TODO: Figure out if we can achieve shared buffer with StreamController on full
-# current and next track for more EQ options.
-# TODO: Refactor the Analyzer into a metadata controller after we have split the controllers
-# TODO: Refactor the Mixer into a stream controller after we have split the controllers
 from __future__ import annotations
 
 import asyncio
 import logging
 import time
 import warnings
+from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING
 
 import aiofiles
@@ -21,7 +18,6 @@ import shortuuid
 from music_assistant.constants import VERBOSE_LOG_LEVEL
 from music_assistant.helpers.audio import (
     align_audio_to_frame_boundary,
-    crossfade_pcm_parts,
     strip_silence,
 )
 from music_assistant.helpers.process import communicate
@@ -40,8 +36,6 @@ if TYPE_CHECKING:
 
 SMART_CROSSFADE_DURATION = 45
 ANALYSIS_FPS = 100
-# Only apply time stretching if BPM difference is < this %
-TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 5.0
 
 
 class SmartFadesAnalyzer:
@@ -78,8 +72,6 @@ class SmartFadesAnalyzer:
                 fragment_duration,
                 len(audio_data),
             )
-            # Perform beat analysis
-
             # Convert PCM bytes to numpy array and then to mono for analysis
             audio_array = np.frombuffer(audio_data, dtype=np.float32)
             if pcm_format.channels > 1:
@@ -270,128 +262,258 @@ class SmartFadesAnalyzer:
             return None
 
 
-class SmartFadesMixer:
-    """Smart fades mixer class that mixes tracks based on analysis data."""
+#############################
+# SMART FADES EQ LOGIC
+#############################
 
-    def __init__(self, mass: MusicAssistant) -> None:
-        """Initialize smart fades mixer."""
-        self.mass = mass
-        self.logger = logging.getLogger(__name__)
-        # TODO: Refactor into stream (or metadata) controller after we have split the controllers
-        self.analyzer = SmartFadesAnalyzer(mass)
 
-    async def mix(
+class Filter(ABC):
+    """Abstract base class for audio filters."""
+
+    output_fadeout_label: str
+    output_fadein_label: str
+
+    @abstractmethod
+    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
+        """Apply the filter and return the FFmpeg filter strings."""
+
+
+class TimeStretchFilter(Filter):
+    """Filter that applies time stretching to match BPM using rubberband."""
+
+    output_fadeout_label: str = "fadeout_stretched"
+    output_fadein_label: str = "fadein_unchanged"
+
+    def __init__(
         self,
-        fade_in_part: bytes,
-        fade_out_part: bytes,
-        fade_in_streamdetails: StreamDetails,
-        fade_out_streamdetails: StreamDetails,
-        pcm_format: AudioFormat,
-        standard_crossfade_duration: int = 10,
-        mode: SmartFadesMode = SmartFadesMode.SMART_FADES,
-    ) -> bytes:
-        """Apply crossfade with internal state management and smart/standard fallback logic."""
-        if mode == SmartFadesMode.DISABLED:
-            # No crossfade, just concatenate
-            # Note that this should not happen since we check this before calling mix()
-            # but just to be sure...
-            return fade_out_part + fade_in_part
+        stretch_ratio: float,
+    ):
+        """Initialize time stretch filter."""
+        self.stretch_ratio = stretch_ratio
 
-        # strip silence from end of audio of fade_out_part
-        fade_out_part = await strip_silence(
-            self.mass,
-            fade_out_part,
-            pcm_format=pcm_format,
-            reverse=True,
-        )
-        # Ensure frame alignment after silence stripping
-        fade_out_part = align_audio_to_frame_boundary(fade_out_part, pcm_format)
+    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
+        """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM."""
+        return [
+            f"{input_fadeout_label}rubberband=tempo={self.stretch_ratio:.6f}:transients=mixed:detector=soft:pitchq=quality"
+            f"[{self.output_fadeout_label}]",
+            f"{input_fadein_label}anull[{self.output_fadein_label}]",  # codespell:ignore anull
+        ]
 
-        # strip silence from begin of audio of fade_in_part
-        fade_in_part = await strip_silence(
-            self.mass,
-            fade_in_part,
-            pcm_format=pcm_format,
-            reverse=False,
-        )
-        # Ensure frame alignment after silence stripping
-        fade_in_part = align_audio_to_frame_boundary(fade_in_part, pcm_format)
-        if mode == SmartFadesMode.STANDARD_CROSSFADE:
-            # crossfade with standard crossfade
-            return await self._default_crossfade(
-                fade_in_part,
-                fade_out_part,
-                pcm_format,
-                standard_crossfade_duration,
-            )
-        # Attempt smart crossfade with analysis data
-        fade_out_analysis: SmartFadesAnalysis | None
-        if stored_analysis := await self.mass.music.get_smart_fades_analysis(
-            fade_out_streamdetails.item_id,
-            fade_out_streamdetails.provider,
-            SmartFadesAnalysisFragment.OUTRO,
-        ):
-            fade_out_analysis = stored_analysis
+    def __repr__(self) -> str:
+        """Return string representation of TimeStretchFilter."""
+        return f"TimeStretch(ratio={self.stretch_ratio:.2f})"
+
+
+class TrimFilter(Filter):
+    """Filter that trims incoming track to align with downbeats."""
+
+    output_fadeout_label: str = "fadeout_beatalign"
+    output_fadein_label: str = "fadein_beatalign"
+
+    def __init__(self, fadein_start_pos: float):
+        """Initialize beat align filter.
+
+        Args:
+            fadein_start_pos: Position in seconds to trim the incoming track to
+        """
+        self.fadein_start_pos = fadein_start_pos
+
+    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
+        """Trim the incoming track to align with downbeats."""
+        return [
+            f"{input_fadeout_label}anull[{self.output_fadeout_label}]",  # codespell:ignore anull
+            f"{input_fadein_label}atrim=start={self.fadein_start_pos},asetpts=PTS-STARTPTS[{self.output_fadein_label}]",
+        ]
+
+    def __repr__(self) -> str:
+        """Return string representation of TrimFilter."""
+        return f"Trim(trim={self.fadein_start_pos:.2f}s)"
+
+
+class FrequencySweepFilter(Filter):
+    """Filter that creates frequency sweep effects (lowpass/highpass transitions)."""
+
+    output_fadeout_label: str = "frequency_sweep"
+    output_fadein_label: str = "frequency_sweep"
+
+    def __init__(
+        self,
+        sweep_type: str,
+        target_freq: int,
+        duration: float,
+        start_time: float,
+        sweep_direction: str,
+        poles: int,
+        curve_type: str,
+        stream_type: str = "fadeout",
+    ):
+        """Initialize frequency sweep filter.
+
+        Args:
+            sweep_type: 'lowpass' or 'highpass'
+            target_freq: Target frequency for the filter
+            duration: Duration of the sweep in seconds
+            start_time: When to start the sweep
+            sweep_direction: 'fade_in' (unfiltered->filtered) or 'fade_out' (filtered->unfiltered)
+            poles: Number of poles for the filter
+            curve_type: 'linear', 'exponential', or 'logarithmic'
+            stream_type: 'fadeout' or 'fadein' - which stream to process
+        """
+        self.sweep_type = sweep_type
+        self.target_freq = target_freq
+        self.duration = duration
+        self.start_time = start_time
+        self.sweep_direction = sweep_direction
+        self.poles = poles
+        self.curve_type = curve_type
+        self.stream_type = stream_type
+
+        # Set output labels based on stream type
+        if stream_type == "fadeout":
+            self.output_fadeout_label = f"fadeout_{sweep_type}"
+            self.output_fadein_label = "fadein_passthrough"
         else:
-            fade_out_analysis = await self.analyzer.analyze(
-                fade_out_streamdetails.item_id,
-                fade_out_streamdetails.provider,
-                SmartFadesAnalysisFragment.OUTRO,
-                fade_out_part,
-                pcm_format,
-            )
+            self.output_fadeout_label = "fadeout_passthrough"
+            self.output_fadein_label = f"fadein_{sweep_type}"
 
-        fade_in_analysis: SmartFadesAnalysis | None
-        if stored_analysis := await self.mass.music.get_smart_fades_analysis(
-            fade_in_streamdetails.item_id,
-            fade_in_streamdetails.provider,
-            SmartFadesAnalysisFragment.INTRO,
-        ):
-            fade_in_analysis = stored_analysis
+    def _generate_volume_expr(self, start: float, dur: float, direction: str, curve: str) -> str:
+        t_expr = f"t-{start}"  # Time relative to start
+        norm_t = f"min(max({t_expr},0),{dur})/{dur}"  # Normalized 0-1
+
+        if curve == "exponential":
+            # Exponential curve for smoother transitions
+            if direction == "up":
+                return f"'pow({norm_t},2)':eval=frame"
+            else:
+                return f"'1-pow({norm_t},2)':eval=frame"
+        elif curve == "logarithmic":
+            # Logarithmic curve for more aggressive initial change
+            if direction == "up":
+                return f"'sqrt({norm_t})':eval=frame"
+            else:
+                return f"'1-sqrt({norm_t})':eval=frame"
+        elif direction == "up":
+            return f"'{norm_t}':eval=frame"
         else:
-            fade_in_analysis = await self.analyzer.analyze(
-                fade_in_streamdetails.item_id,
-                fade_in_streamdetails.provider,
-                SmartFadesAnalysisFragment.INTRO,
-                fade_in_part,
-                pcm_format,
-            )
-        if (
-            fade_out_analysis
-            and fade_in_analysis
-            and fade_out_analysis.confidence > 0.3
-            and fade_in_analysis.confidence > 0.3
-            and mode == SmartFadesMode.SMART_FADES
-        ):
-            try:
-                return await self._apply_smart_crossfade(
-                    fade_out_analysis,
-                    fade_in_analysis,
-                    fade_out_part,
-                    fade_in_part,
-                    pcm_format,
-                )
-            except Exception as e:
-                self.logger.warning(
-                    "Smart crossfade failed: %s, falling back to standard crossfade", e
-                )
+            return f"'1-{norm_t}':eval=frame"
 
-        return await self._default_crossfade(
-            fade_in_part,
-            fade_out_part,
-            pcm_format,
-            standard_crossfade_duration,
+    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
+        """Generate FFmpeg filters for frequency sweep effect."""
+        # Select the correct input based on stream type
+        if self.stream_type == "fadeout":
+            input_label = input_fadeout_label
+            output_label = self.output_fadeout_label
+            passthrough_label = self.output_fadein_label
+            passthrough_input = input_fadein_label
+        else:
+            input_label = input_fadein_label
+            output_label = self.output_fadein_label
+            passthrough_label = self.output_fadeout_label
+            passthrough_input = input_fadeout_label
+
+        orig_label = f"{output_label}_orig"
+        filter_label = f"{output_label}_to{self.sweep_type[:2]}"
+        filtered_label = f"{output_label}_filtered"
+        orig_faded_label = f"{output_label}_orig_faded"
+        filtered_faded_label = f"{output_label}_filtered_faded"
+
+        # Determine volume ramp directions based on sweep direction
+        if self.sweep_direction == "fade_in":
+            # Fade from dry to wet (unfiltered to filtered)
+            orig_direction = "down"
+            filter_direction = "up"
+        else:  # fade_out
+            # Fade from wet to dry (filtered to unfiltered)
+            orig_direction = "up"
+            filter_direction = "down"
+
+        # Build filter chain
+        orig_volume_expr = self._generate_volume_expr(
+            self.start_time, self.duration, orig_direction, self.curve_type
         )
+        filtered_volume_expr = self._generate_volume_expr(
+            self.start_time, self.duration, filter_direction, self.curve_type
+        )
+
+        return [
+            # Pass through the other stream unchanged
+            f"{passthrough_input}anull[{passthrough_label}]",  # codespell:ignore anull
+            # Split input into two paths
+            f"{input_label}asplit=2[{orig_label}][{filter_label}]",
+            # Apply frequency filter to one path
+            f"[{filter_label}]{self.sweep_type}=f={self.target_freq}:poles={self.poles}[{filtered_label}]",
+            # Apply time-varying volume to original path
+            f"[{orig_label}]volume={orig_volume_expr}[{orig_faded_label}]",
+            # Apply time-varying volume to filtered path
+            f"[{filtered_label}]volume={filtered_volume_expr}[{filtered_faded_label}]",
+            # Mix the two paths together
+            f"[{orig_faded_label}][{filtered_faded_label}]amix=inputs=2:duration=longest:normalize=0[{output_label}]",
+        ]
+
+    def __repr__(self) -> str:
+        """Return string representation of FrequencySweepFilter."""
+        return f"FreqSweep({self.sweep_type}@{self.target_freq}Hz)"
+
+
+class CrossfadeFilter(Filter):
+    """Filter that applies the final crossfade between fadeout and fadein streams."""
+
+    output_fadeout_label: str = "crossfade"
+    output_fadein_label: str = "crossfade"
+
+    def __init__(self, crossfade_duration: float):
+        """Initialize crossfade filter."""
+        self.crossfade_duration = crossfade_duration
+
+    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
+        """Apply the acrossfade filter."""
+        return [f"{input_fadeout_label}{input_fadein_label}acrossfade=d={self.crossfade_duration}"]
+
+    def __repr__(self) -> str:
+        """Return string representation of CrossfadeFilter."""
+        return f"Crossfade(d={self.crossfade_duration:.1f}s)"
+
+
+class SmartFade(ABC):
+    """Abstract base class for Smart Fades."""
+
+    filters: list[Filter]
 
-    async def _apply_smart_crossfade(
+    def __init__(self) -> None:
+        """Initialize SmartFade base class."""
+        self.logger = logging.getLogger(__name__)
+        self.filters = []
+
+    @abstractmethod
+    def _build(self) -> None:
+        """Build the smart fades filter chain."""
+        ...
+
+    def _get_ffmpeg_filters(
+        self,
+        input_fadein_label: str = "[1]",
+        input_fadeout_label: str = "[0]",
+    ) -> list[str]:
+        """Get FFmpeg filters for smart fades."""
+        if not self.filters:
+            self._build()
+        filters = []
+        _cur_fadein_label = input_fadein_label
+        _cur_fadeout_label = input_fadeout_label
+        for audio_filter in self.filters:
+            filter_strings = audio_filter.apply(_cur_fadein_label, _cur_fadeout_label)
+            filters.extend(filter_strings)
+            _cur_fadein_label = f"[{audio_filter.output_fadein_label}]"
+            _cur_fadeout_label = f"[{audio_filter.output_fadeout_label}]"
+        return filters
+
+    async def apply(
         self,
-        fade_out_analysis: SmartFadesAnalysis,
-        fade_in_analysis: SmartFadesAnalysis,
         fade_out_part: bytes,
         fade_in_part: bytes,
         pcm_format: AudioFormat,
     ) -> bytes:
-        """Apply smart crossfade with beat-perfect timing and adaptive filtering."""
+        """Apply the smart fade to the given PCM audio parts."""
         # Write the fade_out_part to a temporary file
         fadeout_filename = f"/tmp/{shortuuid.random(20)}.pcm"  # noqa: S108
         async with aiofiles.open(fadeout_filename, "wb") as outfile:
@@ -428,10 +550,10 @@ class SmartFadesMixer:
             "-i",
             "-",
         ]
-
-        smart_fade_filters = self._create_enhanced_smart_fade_filters(
-            fade_out_analysis,
-            fade_in_analysis,
+        smart_fade_filters = self._get_ffmpeg_filters()
+        self.logger.debug(
+            "Applying smartfade: %s",
+            self,
         )
         args.extend(
             [
@@ -451,7 +573,7 @@ class SmartFadesMixer:
                 "-",
             ]
         )
-
+        self.logger.debug("FFmpeg smartfade args: %s", " ".join(args))
         self.logger.log(VERBOSE_LOG_LEVEL, "FFmpeg command args: %s", " ".join(args))
 
         # Execute the enhanced smart fade with full buffer
@@ -464,113 +586,147 @@ class SmartFadesMixer:
             stderr_msg = stderr.decode() if stderr else "(no stderr output)"
             raise RuntimeError(f"Smart crossfade failed. FFmpeg stderr: {stderr_msg}")
 
-    # SMART FADE HELPER METHODS
-    def _create_enhanced_smart_fade_filters(
-        self,
-        fade_out_analysis: SmartFadesAnalysis,
-        fade_in_analysis: SmartFadesAnalysis,
-    ) -> list[str]:
-        """Create smart fade filters with perfect timing and adaptive filtering."""
-        # Calculate optimal crossfade bars that fit in available buffer
-        crossfade_bars = self._calculate_optimal_crossfade_bars(fade_out_analysis, fade_in_analysis)
+    def __repr__(self) -> str:
+        """Return string representation of SmartFade showing the filter chain."""
+        if not self.filters:
+            return f"<{self.__class__.__name__}: 0 filters>"
 
-        # Calculate beat positions for the selected bar count
-        fadeout_start_pos, fadein_start_pos = self._calculate_optimal_fade_timing(
-            fade_out_analysis, fade_in_analysis, crossfade_bars
-        )
+        chain = " → ".join(repr(f) for f in self.filters)
+        return f"<{self.__class__.__name__}: {len(self.filters)} filters> {chain}"
 
-        # Log the final selected timing
-        if fadeout_start_pos is not None and fadein_start_pos is not None:
-            self.logger.debug(
-                "Beat timing selected: fadeout=%.2fs, fadein=%.2fs (%d bars)",
-                fadeout_start_pos,
-                fadein_start_pos,
-                crossfade_bars,
-            )
 
-        filters: list[str] = []
+class SmartCrossFade(SmartFade):
+    """Smart fades class that implements a Smart Fade mode."""
 
-        # Calculate initial crossfade duration (may be adjusted later for downbeat alignment)
-        initial_crossfade_duration = self._calculate_crossfade_duration(
-            crossfade_bars=crossfade_bars,
-            fade_in_analysis=fade_in_analysis,
-        )
+    # Only apply time stretching if BPM difference is < this %
+    time_stretch_bpm_percentage_threshold: float = 5.0
+
+    def __init__(
+        self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis
+    ) -> None:
+        """Initialize SmartFades with analysis data.
+
+        Args:
+            fade_out_analysis: Analysis data for the outgoing track
+            fade_in_analysis: Analysis data for the incoming track
+            logger: Optional logger for debug output
+        """
+        self.fade_out_analysis = fade_out_analysis
+        self.fade_in_analysis = fade_in_analysis
+        super().__init__()
+
+    def _build(self) -> None:
+        """Build the smart fades filter chain."""
+        # Calculate tempo factor for time stretching
+        bpm_ratio = self.fade_in_analysis.bpm / self.fade_out_analysis.bpm
+        bpm_diff_percent = abs(1.0 - bpm_ratio) * 100
 
-        # Create time stretch filters - needs to know crossfade duration to complete
-        # tempo ramping before the crossfade starts
-        time_stretch_filters, tempo_factor = self._create_time_stretch_filters(
-            fade_out_analysis=fade_out_analysis,
-            fade_in_analysis=fade_in_analysis,
-            crossfade_bars=crossfade_bars,
-            crossfade_duration=initial_crossfade_duration,
+        # Extrapolate downbeats for better bar calculation
+        self.extrapolated_fadeout_downbeats = extrapolate_downbeats(
+            self.fade_out_analysis.downbeats,
+            tempo_factor=1.0,
+            bpm=self.fade_out_analysis.bpm,
         )
-        filters.extend(time_stretch_filters)
 
-        crossfade_duration = initial_crossfade_duration
+        # Calculate optimal crossfade bars that fit in available buffer
+        crossfade_bars = self._calculate_optimal_crossfade_bars()
 
-        # Check if we would have enough audio after beat alignment for the crossfade
+        # Calculate beat positions for the selected bar count
+        fadein_start_pos = self._calculate_optimal_fade_timing(crossfade_bars)
+
+        # Calculate initial crossfade duration (may be adjusted later for downbeat alignment)
+        crossfade_duration = self._calculate_crossfade_duration(crossfade_bars=crossfade_bars)
+
+        # Add time stretch filter if needed
         if (
-            fadein_start_pos is not None
-            and fadein_start_pos + crossfade_duration > SMART_CROSSFADE_DURATION
+            0.1 < bpm_diff_percent <= self.time_stretch_bpm_percentage_threshold
+            and crossfade_bars > 4
         ):
+            self.filters.append(TimeStretchFilter(stretch_ratio=bpm_ratio))
+            # Re-extrapolate downbeats with actual tempo factor for time-stretched audio
+            self.extrapolated_fadeout_downbeats = extrapolate_downbeats(
+                self.fade_out_analysis.downbeats,
+                tempo_factor=bpm_ratio,
+                bpm=self.fade_out_analysis.bpm,
+            )
+
+        # Check if we would have enough audio after beat alignment for the crossfade
+        if fadein_start_pos and fadein_start_pos + crossfade_duration <= SMART_CROSSFADE_DURATION:
+            self.filters.append(TrimFilter(fadein_start_pos=fadein_start_pos))
+        else:
             self.logger.debug(
                 "Skipping beat alignment: not enough audio after trim (%.1fs + %.1fs > %.1fs)",
                 fadein_start_pos,
                 crossfade_duration,
                 SMART_CROSSFADE_DURATION,
             )
-            # Skip beat alignment
-            fadein_start_pos = None
 
         # Adjust crossfade duration to align with outgoing track's downbeats
-        # This prevents echo-ey sounds when both tracks have kicks during the crossfade
         crossfade_duration = self._adjust_crossfade_to_downbeats(
-            fade_out_analysis=fade_out_analysis,
             crossfade_duration=crossfade_duration,
             fadein_start_pos=fadein_start_pos,
-            tempo_factor=tempo_factor,
         )
 
-        beat_align_filters = self._trim_incoming_track_to_downbeat(
-            fadein_start_pos=fadein_start_pos,
-            fadeout_input_label="[fadeout_stretched]",
-            fadein_input_label="[1]",
-        )
-        filters.extend(beat_align_filters)
+        # 90 BPM -> 1500Hz, 140 BPM -> 2500Hz
+        avg_bpm = (self.fade_out_analysis.bpm + self.fade_in_analysis.bpm) / 2
+        crossover_freq = int(np.clip(1500 + (avg_bpm - 90) * 20, 1500, 2500))
 
-        self.logger.debug(
-            "Smart fade: out_bpm=%.1f, in_bpm=%.1f, %d bars, crossfade: %.2fs%s",
-            fade_out_analysis.bpm,
-            fade_in_analysis.bpm,
-            crossfade_bars,
-            crossfade_duration,
-            ", beat-aligned" if fadein_start_pos else "",
-        )
-        frequency_filters = self._apply_eq_filters(
-            fade_out_analysis=fade_out_analysis,
-            fade_in_analysis=fade_in_analysis,
-            fade_out_label="[fadeout_beatalign]",
-            fade_in_label="[fadein_beatalign]",
-            crossfade_duration=crossfade_duration,
-            crossfade_bars=crossfade_bars,
+        # Adjust for BPM mismatch
+        if abs(bpm_ratio - 1.0) > 0.3:
+            crossover_freq = int(crossover_freq * 0.85)
+
+        # For shorter fades, use exp/exp curves to avoid abruptness
+        if crossfade_bars < 8:
+            fadeout_curve = "exponential"
+            fadein_curve = "exponential"
+        # For long fades, use log/linear curves
+        else:
+            # Use logarithmic curve to give the next track more space
+            fadeout_curve = "logarithmic"
+            # Use linear curve for transition, predictable and not too abrupt
+            fadein_curve = "linear"
+
+        # Create lowpass filter on the outgoing track (unfiltered → low-pass)
+        # Extended lowpass effect to gradually remove bass frequencies
+        fadeout_eq_duration = min(max(crossfade_duration * 2.5, 8.0), SMART_CROSSFADE_DURATION)
+        # The crossfade always happens at the END of the buffer
+        fadeout_eq_start = max(0, SMART_CROSSFADE_DURATION - fadeout_eq_duration)
+        fadeout_sweep = FrequencySweepFilter(
+            sweep_type="lowpass",
+            target_freq=crossover_freq,
+            duration=fadeout_eq_duration,
+            start_time=fadeout_eq_start,
+            sweep_direction="fade_in",
+            poles=1,
+            curve_type=fadeout_curve,
+            stream_type="fadeout",
         )
-        filters.extend(frequency_filters)
+        self.filters.append(fadeout_sweep)
 
-        # Apply linear crossfade for now since we already use EQ sweeps for smoothness
-        filters.append(f"[fadeout_eq][fadein_eq]acrossfade=d={crossfade_duration}")
+        # Create high pass filter on the incoming track (high-pass → unfiltered)
+        # Quicker highpass removal to avoid lingering vocals after crossfade
+        fadein_eq_duration = crossfade_duration / 1.5
+        fadein_sweep = FrequencySweepFilter(
+            sweep_type="highpass",
+            target_freq=crossover_freq,
+            duration=fadein_eq_duration,
+            start_time=0,
+            sweep_direction="fade_out",
+            poles=1,
+            curve_type=fadein_curve,
+            stream_type="fadein",
+        )
+        self.filters.append(fadein_sweep)
 
-        return filters
+        # Add final crossfade filter
+        crossfade_filter = CrossfadeFilter(crossfade_duration=crossfade_duration)
+        self.filters.append(crossfade_filter)
 
-    def _calculate_crossfade_duration(
-        self,
-        crossfade_bars: int,
-        fade_in_analysis: SmartFadesAnalysis,
-    ) -> float:
+    def _calculate_crossfade_duration(self, crossfade_bars: int) -> float:
         """Calculate final crossfade duration based on musical bars and BPM."""
         # Calculate crossfade duration based on incoming track's BPM
-        # This ensures a musically consistent crossfade length regardless of beat positions
         beats_per_bar = 4
-        seconds_per_beat = 60.0 / fade_in_analysis.bpm
+        seconds_per_beat = 60.0 / self.fade_in_analysis.bpm
         musical_duration = crossfade_bars * beats_per_bar * seconds_per_beat
 
         # Apply buffer constraint
@@ -586,117 +742,106 @@ class SmartFadesMixer:
 
         return actual_duration
 
-    def _extrapolate_downbeats(
-        self,
-        downbeats: npt.NDArray[np.float64],
-        tempo_factor: float,
-        buffer_size: float = SMART_CROSSFADE_DURATION,
-    ) -> npt.NDArray[np.float64]:
-        """Extrapolate downbeats based on actual intervals when detection is incomplete.
-
-        This is needed when we want to perform beat alignment in an 'atmospheric' outro
-        that does not have any detected downbeats.
-        """
-        if len(downbeats) < 3:
-            # Need at least 3 downbeats to reliably calculate interval
-            return downbeats / tempo_factor
+    def _calculate_optimal_crossfade_bars(self) -> int:
+        """Calculate optimal crossfade bars that fit in available buffer."""
+        bpm_in = self.fade_in_analysis.bpm
+        bpm_out = self.fade_out_analysis.bpm
+        bpm_diff_percent = abs(1.0 - bpm_in / bpm_out) * 100
 
-        # Adjust detected downbeats for time stretching first
-        adjusted_downbeats = downbeats / tempo_factor
-        last_downbeat = adjusted_downbeats[-1]
+        # Calculate ideal bars based on BPM compatibility
+        ideal_bars = 10 if bpm_diff_percent <= self.time_stretch_bpm_percentage_threshold else 6
 
-        # If the last downbeat is close to the buffer end, no extrapolation needed
-        if last_downbeat >= buffer_size - 5:
-            return adjusted_downbeats
+        # Reduce bars until it fits in the fadein buffer
+        for bars in [ideal_bars, 8, 6, 4, 2, 1]:
+            if bars > ideal_bars:
+                continue
 
-        # Calculate intervals from ORIGINAL downbeats (before time stretching)
-        intervals = np.diff(downbeats)
-        median_interval = float(np.median(intervals))
-        std_interval = float(np.std(intervals))
+            fadein_start_pos = self._calculate_optimal_fade_timing(bars)
+            if fadein_start_pos is None:
+                continue
 
-        # Only extrapolate if intervals are consistent (low standard deviation)
-        if std_interval > 0.2:
-            self.logger.debug(
-                "Downbeat intervals too inconsistent (std=%.3fs) for extrapolation",
-                std_interval,
-            )
-            return adjusted_downbeats
+            # Calculate what the duration would be
+            test_duration = self._calculate_crossfade_duration(crossfade_bars=bars)
 
-        # Adjust the interval for time stretching
-        # When slowing down (tempo_factor < 1.0), intervals get longer
-        adjusted_interval = median_interval / tempo_factor
+            # Check if it fits in fadein buffer
+            fadein_buffer = SMART_CROSSFADE_DURATION - fadein_start_pos
+            if test_duration <= fadein_buffer:
+                if bars < ideal_bars:
+                    self.logger.debug(
+                        "Reduced crossfade from %d to %d bars (fadein buffer=%.1fs, needed=%.1fs)",
+                        ideal_bars,
+                        bars,
+                        fadein_buffer,
+                        test_duration,
+                    )
+                return bars
 
-        # Extrapolate forward from last adjusted downbeat using adjusted interval
-        extrapolated = []
-        current_pos = last_downbeat + adjusted_interval
-        max_extrapolation_distance = 25.0  # Don't extrapolate more than 25s
+        # Fall back to 1 bar if nothing else fits
+        return 1
 
-        while (
-            current_pos < buffer_size
-            and (current_pos - last_downbeat) <= max_extrapolation_distance
-        ):
-            extrapolated.append(current_pos)
-            current_pos += adjusted_interval
+    def _calculate_optimal_fade_timing(self, crossfade_bars: int) -> float | None:
+        """Calculate beat positions for alignment."""
+        beats_per_bar = 4
 
-        if extrapolated:
-            self.logger.debug(
-                "Extrapolated %d downbeats (adjusted_interval=%.3fs, original=%.3fs) "
-                "from %.2fs to %.2fs",
-                len(extrapolated),
-                adjusted_interval,
-                median_interval,
-                last_downbeat,
-                extrapolated[-1],
-            )
-            # Combine adjusted detected downbeats and extrapolated downbeats
-            return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
+        def calculate_beat_positions(
+            fade_out_beats: npt.NDArray[np.float64],
+            fade_in_beats: npt.NDArray[np.float64],
+            num_beats: int,
+        ) -> float | None:
+            """Calculate start positions from beat arrays."""
+            if len(fade_out_beats) < num_beats or len(fade_in_beats) < num_beats:
+                return None
 
-        return adjusted_downbeats
+            fade_in_slice = fade_in_beats[:num_beats]
+            return float(fade_in_slice[0])
+
+        # Try downbeats first for most musical timing
+        downbeat_positions = calculate_beat_positions(
+            self.extrapolated_fadeout_downbeats, self.fade_in_analysis.downbeats, crossfade_bars
+        )
+        if downbeat_positions:
+            return downbeat_positions
+
+        # Try regular beats if downbeats insufficient
+        required_beats = crossfade_bars * beats_per_bar
+        beat_positions = calculate_beat_positions(
+            self.fade_out_analysis.beats, self.fade_in_analysis.beats, required_beats
+        )
+        if beat_positions:
+            return beat_positions
+
+        # Fallback: No beat alignment possible
+        self.logger.debug("No beat alignment possible (insufficient beats)")
+        return None
 
     def _adjust_crossfade_to_downbeats(
         self,
-        fade_out_analysis: SmartFadesAnalysis,
         crossfade_duration: float,
         fadein_start_pos: float | None,
-        tempo_factor: float,
     ) -> float:
-        """Adjust crossfade duration to align with outgoing track's downbeats.
-
-        This ensures the crossfade starts on a downbeat of the outgoing track,
-        preventing echo-ey sounds when both tracks have kicks during the crossfade.
-
-        The downbeat positions are adjusted for time stretching - when tempo_factor < 1.0
-        (slowing down), beats take longer to reach their position in the stretched audio.
-        """
+        """Adjust crossfade duration to align with outgoing track's downbeats."""
         # If we don't have downbeats or beat alignment is disabled, return original duration
-        if len(fade_out_analysis.downbeats) == 0 or fadein_start_pos is None:
+        if len(self.extrapolated_fadeout_downbeats) == 0 or fadein_start_pos is None:
             return crossfade_duration
 
-        # Extrapolate downbeats if needed (e.g., when beat detection is incomplete)
-        # This returns downbeats already adjusted for time stretching
-        adjusted_downbeats = self._extrapolate_downbeats(
-            fade_out_analysis.downbeats, tempo_factor=tempo_factor
-        )
-
         # Calculate where the crossfade would start in the buffer
         ideal_start_pos = SMART_CROSSFADE_DURATION - crossfade_duration
 
-        # Debug: Show all downbeats and the ideal position
+        # Debug logging
         self.logger.debug(
             "Downbeat adjustment - ideal_start=%.2fs (buffer=%.1fs - crossfade=%.2fs), "
-            "fadein_start=%.2fs, tempo_factor=%.4f",
+            "fadein_start=%.2fs",
             ideal_start_pos,
             SMART_CROSSFADE_DURATION,
             crossfade_duration,
             fadein_start_pos,
-            tempo_factor,
         )
 
         # Find the closest downbeats (earlier and later)
         earlier_downbeat = None
         later_downbeat = None
 
-        for downbeat in adjusted_downbeats:
+        for downbeat in self.extrapolated_fadeout_downbeats:
             if downbeat <= ideal_start_pos:
                 earlier_downbeat = downbeat
             elif downbeat > ideal_start_pos and later_downbeat is None:
@@ -706,7 +851,6 @@ class SmartFadesMixer:
         # Try earlier downbeat first (longer crossfade)
         if earlier_downbeat is not None:
             adjusted_duration = float(SMART_CROSSFADE_DURATION - earlier_downbeat)
-            # Check if this fits in the buffer
             if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
                 if abs(adjusted_duration - crossfade_duration) > 0.1:
                     self.logger.debug(
@@ -721,7 +865,6 @@ class SmartFadesMixer:
         # Try later downbeat (shorter crossfade)
         if later_downbeat is not None:
             adjusted_duration = float(SMART_CROSSFADE_DURATION - later_downbeat)
-            # Check if this fits in the buffer
             if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
                 if abs(adjusted_duration - crossfade_duration) > 0.1:
                     self.logger.debug(
@@ -740,351 +883,264 @@ class SmartFadesMixer:
         )
         return crossfade_duration
 
-    def _calculate_optimal_crossfade_bars(
-        self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis
-    ) -> int:
-        """Calculate optimal crossfade bars that fit in available buffer."""
-        bpm_in = fade_in_analysis.bpm
-        bpm_out = fade_out_analysis.bpm
-        bpm_diff_percent = abs(1.0 - bpm_in / bpm_out) * 100
-
-        # Calculate ideal bars based on BPM compatibility. We link this to time stretching
-        # so we avoid extreme tempo changes over short fades.
-        ideal_bars = 10 if bpm_diff_percent <= TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD else 6
-
-        # We could encounter songs that have a long athmospheric intro without any downbeats
-        # In those cases, we need to reduce the bars until it fits in the fadein buffer.
-        for bars in [ideal_bars, 8, 6, 4, 2, 1]:
-            if bars > ideal_bars:
-                continue  # Skip bars longer than optimal
-
-            fadeout_start_pos, fadein_start_pos = self._calculate_optimal_fade_timing(
-                fade_out_analysis, fade_in_analysis, bars
-            )
-            if fadeout_start_pos is None or fadein_start_pos is None:
-                continue
-
-            # Calculate what the duration would be
-            test_duration = self._calculate_crossfade_duration(
-                crossfade_bars=bars,
-                fade_in_analysis=fade_in_analysis,
-            )
-
-            # Check if it fits in fadein buffer
-            fadein_buffer = SMART_CROSSFADE_DURATION - fadein_start_pos
-            if test_duration <= fadein_buffer:
-                if bars < ideal_bars:
-                    self.logger.debug(
-                        "Reduced crossfade from %d to %d bars (fadein buffer=%.1fs, needed=%.1fs)",
-                        ideal_bars,
-                        bars,
-                        fadein_buffer,
-                        test_duration,
-                    )
-                return bars
-
-        # Fall back to 1 bar if nothing else fits
-        return 1
-
-    def _calculate_optimal_fade_timing(
-        self,
-        fade_out_analysis: SmartFadesAnalysis,
-        fade_in_analysis: SmartFadesAnalysis,
-        crossfade_bars: int,
-    ) -> tuple[float | None, float | None]:
-        """Calculate beat positions for alignment."""
-        beats_per_bar = 4
-
-        # Helper function to calculate beat positions from beat arrays
-        def calculate_beat_positions(
-            fade_out_beats: npt.NDArray[np.float64],
-            fade_in_beats: npt.NDArray[np.float64],
-            num_beats: int,
-        ) -> tuple[float, float] | None:
-            """Calculate start positions from beat arrays with phantom downbeat support."""
-            if len(fade_out_beats) < num_beats or len(fade_in_beats) < num_beats:
-                return None
 
-            fade_out_slice = fade_out_beats[-num_beats:]
+class StandardCrossFade(SmartFade):
+    """Standard crossfade class that implements a standard crossfade mode."""
 
-            # For fadein, find the earliest downbeat that fits in buffer
-            fade_in_slice = fade_in_beats[:num_beats]
-            fadein_start_pos = fade_in_slice[0]
+    def __init__(self, crossfade_duration: float = 10.0) -> None:
+        """Initialize StandardCrossFade with crossfade duration."""
+        self.crossfade_duration = crossfade_duration
+        super().__init__()
 
-            fadeout_start_pos = fade_out_slice[0]
-            return fadeout_start_pos, fadein_start_pos
+    def _build(self) -> None:
+        """Build the standard crossfade filter chain."""
+        self.filters = [
+            CrossfadeFilter(crossfade_duration=self.crossfade_duration),
+        ]
 
-        # Try downbeats first for most musical timing
-        downbeat_positions = calculate_beat_positions(
-            fade_out_analysis.downbeats, fade_in_analysis.downbeats, crossfade_bars
+    async def apply(
+        self, fade_out_part: bytes, fade_in_part: bytes, pcm_format: AudioFormat
+    ) -> bytes:
+        """Apply the standard crossfade to the given PCM audio parts."""
+        # We need to override the default apply here, since standard crossfade only needs to be
+        # applied to the overlapping parts, not the full buffers.
+        crossfade_size = int(pcm_format.pcm_sample_size * self.crossfade_duration)
+        # Pre-crossfade: outgoing track minus the crossfaded portion
+        pre_crossfade = fade_out_part[:-crossfade_size]
+        # Post-crossfade: incoming track minus the crossfaded portion
+        post_crossfade = fade_in_part[crossfade_size:]
+        # Adjust portions to exact crossfade size
+        adjusted_fade_in_part = fade_in_part[:crossfade_size]
+        adjusted_fade_out_part = fade_out_part[-crossfade_size:]
+        # Adjust the duration to match actual sizes
+        self.crossfade_duration = min(
+            len(adjusted_fade_in_part) / pcm_format.pcm_sample_size,
+            len(adjusted_fade_out_part) / pcm_format.pcm_sample_size,
         )
-        if downbeat_positions:
-            return downbeat_positions
-
-        # Try regular beats if downbeats insufficient
-        required_beats = crossfade_bars * beats_per_bar
-        beat_positions = calculate_beat_positions(
-            fade_out_analysis.beats, fade_in_analysis.beats, required_beats
+        # Crossfaded portion: user's configured duration
+        crossfaded_section = await super().apply(
+            adjusted_fade_out_part, adjusted_fade_in_part, pcm_format
         )
-        if beat_positions:
-            return beat_positions
-
-        # Fallback: No beat alignment possible
-        self.logger.debug("No beat alignment possible (insufficient beats)")
-        return None, None
-
-    def _create_frequency_sweep_filter(
-        self,
-        input_label: str,
-        output_label: str,
-        sweep_type: str,  # 'lowpass' or 'highpass'
-        target_freq: int,
-        duration: float,
-        start_time: float = 0.0,
-        sweep_direction: str = "fade_in",  # 'fade_in' or 'fade_out'
-        poles: int = 2,
-        curve_type: str = "linear",  # 'linear', 'exponential', 'logarithmic'
-    ) -> list[str]:
-        """Generate FFmpeg filters for frequency sweep effect."""
-        orig_label = f"{output_label}_orig"
-        filter_label = f"{output_label}_to{sweep_type[:2]}"
-        filtered_label = f"{output_label}_filtered"
-        orig_faded_label = f"{output_label}_orig_faded"
-        filtered_faded_label = f"{output_label}_filtered_faded"
+        # Full result: everything concatenated
+        return pre_crossfade + crossfaded_section + post_crossfade
 
-        # Generate volume expression based on curve type
-        def generate_volume_expr(start: float, dur: float, direction: str, curve: str) -> str:
-            t_expr = f"t-{start}"  # Time relative to start
-            norm_t = f"min(max({t_expr},0),{dur})/{dur}"  # Normalized 0-1
-
-            if curve == "exponential":
-                # Exponential curve for smoother transitions
-                if direction == "up":
-                    return f"'pow({norm_t},2)':eval=frame"
-                else:
-                    return f"'1-pow({norm_t},2)':eval=frame"
-            elif curve == "logarithmic":
-                # Logarithmic curve for more aggressive initial change
-                if direction == "up":
-                    return f"'sqrt({norm_t})':eval=frame"
-                else:
-                    return f"'1-sqrt({norm_t})':eval=frame"
-            elif direction == "up":
-                return f"'{norm_t}':eval=frame"
-            else:
-                return f"'1-{norm_t}':eval=frame"
 
-        # Determine volume ramp directions based on sweep direction
-        if sweep_direction == "fade_in":
-            # Fade from dry to wet (unfiltered to filtered)
-            orig_direction = "down"
-            filter_direction = "up"
-        else:  # fade_out
-            # Fade from wet to dry (filtered to unfiltered)
-            orig_direction = "up"
-            filter_direction = "down"
+#############################
+# SMART FADES MIXER LOGIC
+#############################
+class SmartFadesMixer:
+    """Smart fades mixer class that mixes tracks based on analysis data."""
 
-        # Build filter chain
-        return [
-            # Split input into two paths
-            f"{input_label}asplit=2[{orig_label}][{filter_label}]",
-            # Apply frequency filter to one path
-            f"[{filter_label}]{sweep_type}=f={target_freq}:poles={poles}[{filtered_label}]",
-            # Apply time-varying volume to original path
-            (
-                f"[{orig_label}]volume="
-                f"{generate_volume_expr(start_time, duration, orig_direction, curve_type)}"
-                f"[{orig_faded_label}]"
-            ),
-            # Apply time-varying volume to filtered path
-            (
-                f"[{filtered_label}]volume="
-                f"{generate_volume_expr(start_time, duration, filter_direction, curve_type)}"
-                f"[{filtered_faded_label}]"
-            ),
-            # Mix the two paths together
-            (
-                f"[{orig_faded_label}][{filtered_faded_label}]"
-                f"amix=inputs=2:duration=longest:normalize=0[{output_label}]"
-            ),
-        ]
+    def __init__(self, mass: MusicAssistant) -> None:
+        """Initialize smart fades mixer."""
+        self.mass = mass
+        self.logger = logging.getLogger(__name__)
+        # TODO: Refactor into stream (or metadata) controller after we have split the controllers
+        self.analyzer = SmartFadesAnalyzer(mass)
 
-    def _trim_incoming_track_to_downbeat(
+    async def mix(
         self,
-        fadein_start_pos: float | None,
-        fadeout_input_label: str = "[0]",
-        fadein_input_label: str = "[1]",
-    ) -> list[str]:
-        """Perform beat alignment preprocessing.
-
-        The incoming track is trimmed to its first downbeat position.
-        No adjustment is needed for time stretching since the incoming track
-        is not stretched - it's already at the target BPM.
-        """
-        # Just relabel in case we cannot perform beat alignment
-        if fadein_start_pos is None:
-            return [
-                f"{fadeout_input_label}anull[fadeout_beatalign]",  # codespell:ignore anull
-                f"{fadein_input_label}anull[fadein_beatalign]",  # codespell:ignore anull
-            ]
-
-        # Trim incoming track to start at first downbeat position
-        return [
-            f"{fadeout_input_label}anull[fadeout_beatalign]",  # codespell:ignore anull
-            f"{fadein_input_label}atrim=start={fadein_start_pos},asetpts=PTS-STARTPTS[fadein_beatalign]",
-        ]
+        fade_in_part: bytes,
+        fade_out_part: bytes,
+        fade_in_streamdetails: StreamDetails,
+        fade_out_streamdetails: StreamDetails,
+        pcm_format: AudioFormat,
+        standard_crossfade_duration: int = 10,
+        mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
+    ) -> bytes:
+        """Apply crossfade with internal state management and smart/standard fallback logic."""
+        if mode == SmartFadesMode.DISABLED:
+            # No crossfade, just concatenate
+            # Note that this should not happen since we check this before calling mix()
+            # but just to be sure...
+            return fade_out_part + fade_in_part
 
-    def _create_time_stretch_filters(
-        self,
-        fade_out_analysis: SmartFadesAnalysis,
-        fade_in_analysis: SmartFadesAnalysis,
-        crossfade_bars: int,
-        crossfade_duration: float,
-    ) -> tuple[list[str], float]:
-        """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM.
+        # strip silence from end of audio of fade_out_part
+        fade_out_part = await strip_silence(
+            self.mass,
+            fade_out_part,
+            pcm_format=pcm_format,
+            reverse=True,
+        )
+        # Ensure frame alignment after silence stripping
+        fade_out_part = align_audio_to_frame_boundary(fade_out_part, pcm_format)
 
-        The tempo ramping is completed before the crossfade starts to ensure perfect beat alignment
-        throughout the entire crossfade region.
-        """
-        # Check if time stretching should be applied (BPM difference < 3%)
-        original_bpm = fade_out_analysis.bpm
-        target_bpm = fade_in_analysis.bpm
-        bpm_ratio = target_bpm / original_bpm
-        bpm_diff_percent = abs(1.0 - bpm_ratio) * 100
+        # strip silence from begin of audio of fade_in_part
+        fade_in_part = await strip_silence(
+            self.mass,
+            fade_in_part,
+            pcm_format=pcm_format,
+            reverse=False,
+        )
+        # Ensure frame alignment after silence stripping
+        fade_in_part = align_audio_to_frame_boundary(fade_in_part, pcm_format)
+        if mode == SmartFadesMode.STANDARD_CROSSFADE:
+            smart_fade: SmartFade = StandardCrossFade(
+                crossfade_duration=standard_crossfade_duration
+            )
+            return await smart_fade.apply(
+                fade_out_part,
+                fade_in_part,
+                pcm_format,
+            )
+        # Attempt smart crossfade with analysis data
+        fade_out_analysis: SmartFadesAnalysis | None
+        if stored_analysis := await self.mass.music.get_smart_fades_analysis(
+            fade_out_streamdetails.item_id,
+            fade_out_streamdetails.provider,
+            SmartFadesAnalysisFragment.OUTRO,
+        ):
+            fade_out_analysis = stored_analysis
+        else:
+            fade_out_analysis = await self.analyzer.analyze(
+                fade_out_streamdetails.item_id,
+                fade_out_streamdetails.provider,
+                SmartFadesAnalysisFragment.OUTRO,
+                fade_out_part,
+                pcm_format,
+            )
 
-        # If no time stretching needed, return passthrough filter and no tempo change
-        if not (
-            0.1 < bpm_diff_percent <= TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD and crossfade_bars > 4
+        fade_in_analysis: SmartFadesAnalysis | None
+        if stored_analysis := await self.mass.music.get_smart_fades_analysis(
+            fade_in_streamdetails.item_id,
+            fade_in_streamdetails.provider,
+            SmartFadesAnalysisFragment.INTRO,
         ):
-            return ["[0]anull[fadeout_stretched]"], 1.0  # codespell:ignore anull
+            fade_in_analysis = stored_analysis
+        else:
+            fade_in_analysis = await self.analyzer.analyze(
+                fade_in_streamdetails.item_id,
+                fade_in_streamdetails.provider,
+                SmartFadesAnalysisFragment.INTRO,
+                fade_in_part,
+                pcm_format,
+            )
+        if (
+            fade_out_analysis
+            and fade_in_analysis
+            and fade_out_analysis.confidence > 0.3
+            and fade_in_analysis.confidence > 0.3
+            and mode == SmartFadesMode.SMART_CROSSFADE
+        ):
+            try:
+                smart_fade = SmartCrossFade(fade_out_analysis, fade_in_analysis)
+                return await smart_fade.apply(
+                    fade_out_part,
+                    fade_in_part,
+                    pcm_format,
+                )
+            except Exception as e:
+                self.logger.warning(
+                    "Smart crossfade failed: %s, falling back to standard crossfade", e
+                )
 
-        # Log that we're applying time stretching
-        self.logger.debug(
-            "Time stretch: %.1f%% BPM diff, adjusting %.1f -> %.1f BPM, crossfade starts at %.1fs",
-            bpm_diff_percent,
-            original_bpm,
-            target_bpm,
-            SMART_CROSSFADE_DURATION - crossfade_duration,
+        # Always fallback to Standard Crossfade in case something goes wrong
+        smart_fade = StandardCrossFade(crossfade_duration=standard_crossfade_duration)
+        return await smart_fade.apply(
+            fade_out_part,
+            fade_in_part,
+            pcm_format,
         )
 
-        # Use uniform rubberband time stretching for the entire buffer
-        # This ensures downbeat adjustment calculations are accurate and beat alignment is perfect
-        # Rubberband is a high-quality music-specific algorithm optimized for music
-        self.logger.debug(
-            "Time stretch (rubberband uniform): %.1f BPM -> %.1f BPM (factor=%.4f)",
-            original_bpm,
-            target_bpm,
-            bpm_ratio,
-        )
-        return [
-            f"[0]rubberband=tempo={bpm_ratio:.6f}:transients=mixed:detector=soft:pitchq=quality"
-            "[fadeout_stretched]"
-        ], bpm_ratio
 
-    def _apply_eq_filters(
-        self,
-        fade_out_analysis: SmartFadesAnalysis,
-        fade_in_analysis: SmartFadesAnalysis,
-        fade_out_label: str,
-        fade_in_label: str,
-        crossfade_duration: float,
-        crossfade_bars: int,
-    ) -> list[str]:
-        """Create LP / HP complementary filters using frequency sweeps for smooth transitions."""
-        # Calculate target frequency based on average BPM
-        avg_bpm = (fade_out_analysis.bpm + fade_in_analysis.bpm) / 2
-        bpm_ratio = fade_in_analysis.bpm / fade_out_analysis.bpm
+# HELPER METHODS
+def get_bpm_diff_percentage(bpm1: float, bpm2: float) -> float:
+    """Calculate BPM difference percentage between two BPM values."""
+    return abs(1.0 - bpm1 / bpm2) * 100
+
+
+def extrapolate_downbeats(
+    downbeats: npt.NDArray[np.float64],
+    tempo_factor: float,
+    buffer_size: float = SMART_CROSSFADE_DURATION,
+    bpm: float | None = None,
+) -> npt.NDArray[np.float64]:
+    """Extrapolate downbeats based on actual intervals when detection is incomplete.
+
+    This is needed when we want to perform beat alignment in an 'atmospheric' outro
+    that does not have any detected downbeats.
+
+    Args:
+        downbeats: Array of detected downbeat positions in seconds
+        tempo_factor: Tempo adjustment factor for time stretching
+        buffer_size: Maximum buffer size in seconds
+        bpm: Optional BPM for validation when extrapolating with only 2 downbeats
+    """
+    # Handle case with exactly 2 downbeats (with BPM validation)
+    if len(downbeats) == 2 and bpm is not None:
+        interval = float(downbeats[1] - downbeats[0])
+
+        # Expected interval for this BPM (assuming 4/4 time signature)
+        expected_interval = (60.0 / bpm) * 4
+
+        # Only extrapolate if interval matches BPM within 15% tolerance
+        if abs(interval - expected_interval) / expected_interval < 0.15:
+            # Adjust detected downbeats for time stretching first
+            adjusted_downbeats = downbeats / tempo_factor
+            last_downbeat = adjusted_downbeats[-1]
+
+            # If the last downbeat is close to the buffer end, no extrapolation needed
+            if last_downbeat >= buffer_size - 5:
+                return adjusted_downbeats
+
+            # Adjust the interval for time stretching
+            adjusted_interval = interval / tempo_factor
+
+            # Extrapolate forward from last adjusted downbeat using adjusted interval
+            extrapolated = []
+            current_pos = last_downbeat + adjusted_interval
+            max_extrapolation_distance = 125.0  # Don't extrapolate more than 25s
+
+            while (
+                current_pos < buffer_size
+                and (current_pos - last_downbeat) <= max_extrapolation_distance
+            ):
+                extrapolated.append(current_pos)
+                current_pos += adjusted_interval
+
+            if extrapolated:
+                # Combine adjusted detected downbeats and extrapolated downbeats
+                return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
 
-        # 90 BPM -> 1500Hz, 140 BPM -> 2500Hz
-        crossover_freq = int(np.clip(1500 + (avg_bpm - 90) * 20, 1500, 2500))
+            return adjusted_downbeats
+        # else: interval doesn't match BPM, fall through to return original
 
-        # Adjust for BPM mismatch
-        if abs(bpm_ratio - 1.0) > 0.3:
-            crossover_freq = int(crossover_freq * 0.85)
+    if len(downbeats) < 2:
+        # Need at least 2 downbeats to extrapolate
+        return downbeats / tempo_factor
 
-        # Extended lowpass effect to gradually remove bass frequencies
-        fadeout_eq_duration = min(max(crossfade_duration * 2.5, 8.0), SMART_CROSSFADE_DURATION)
+    # Adjust detected downbeats for time stretching first
+    adjusted_downbeats = downbeats / tempo_factor
+    last_downbeat = adjusted_downbeats[-1]
 
-        # Quicker highpass removal to avoid lingering vocals after crossfade
-        fadein_eq_duration = crossfade_duration / 1.5
+    # If the last downbeat is close to the buffer end, no extrapolation needed
+    if last_downbeat >= buffer_size - 5:
+        return adjusted_downbeats
 
-        # Calculate when the EQ sweep should start
-        # The crossfade always happens at the END of the buffer, regardless of beat alignment
-        fadeout_eq_start = max(0, SMART_CROSSFADE_DURATION - fadeout_eq_duration)
+    # Calculate intervals from ORIGINAL downbeats (before time stretching)
+    intervals = np.diff(downbeats)
+    median_interval = float(np.median(intervals))
+    std_interval = float(np.std(intervals))
 
-        # For shorter fades, use exp/exp curves to avoid abruptness
-        if crossfade_bars < 8:
-            fadeout_curve = "exponential"
-            fadein_curve = "exponential"
-        # For long fades, use log/linear curves
-        else:
-            # Use logarithmic curve to give the next track more space
-            fadeout_curve = "logarithmic"
-            # Use linear curve for transition, predictable and not too abrupt
-            fadein_curve = "linear"
+    # Only extrapolate if intervals are consistent (low standard deviation)
+    if std_interval > 0.2:
+        return adjusted_downbeats
 
-        self.logger.debug(
-            "EQ: crossover=%dHz, EQ fadeout duration=%.1fs,"
-            " EQ fadein duration=%.1fs, BPM=%.1f, BPM ratio=%.2f,"
-            " EQ curves: %s/%s",
-            crossover_freq,
-            fadeout_eq_duration,
-            fadein_eq_duration,
-            avg_bpm,
-            bpm_ratio,
-            fadeout_curve,
-            fadein_curve,
-        )
+    # Adjust the interval for time stretching
+    # When slowing down (tempo_factor < 1.0), intervals get longer
+    adjusted_interval = median_interval / tempo_factor
 
-        # fadeout (unfiltered → low-pass)
-        fadeout_filters = self._create_frequency_sweep_filter(
-            input_label=fade_out_label,
-            output_label="fadeout_eq",
-            sweep_type="lowpass",
-            target_freq=crossover_freq,
-            duration=fadeout_eq_duration,
-            start_time=fadeout_eq_start,
-            sweep_direction="fade_in",
-            poles=1,
-            curve_type=fadeout_curve,
-        )
+    # Extrapolate forward from last adjusted downbeat using adjusted interval
+    extrapolated = []
+    current_pos = last_downbeat + adjusted_interval
+    max_extrapolation_distance = 25.0  # Don't extrapolate more than 25s
 
-        # fadein (high-pass → unfiltered)
-        fadein_filters = self._create_frequency_sweep_filter(
-            input_label=fade_in_label,
-            output_label="fadein_eq",
-            sweep_type="highpass",
-            target_freq=crossover_freq,
-            duration=fadein_eq_duration,
-            start_time=0,
-            sweep_direction="fade_out",
-            poles=1,
-            curve_type=fadein_curve,
-        )
+    while current_pos < buffer_size and (current_pos - last_downbeat) <= max_extrapolation_distance:
+        extrapolated.append(current_pos)
+        current_pos += adjusted_interval
 
-        return fadeout_filters + fadein_filters
+    if extrapolated:
+        # Combine adjusted detected downbeats and extrapolated downbeats
+        return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
 
-    # FALLBACK DEFAULT CROSSFADE
-    async def _default_crossfade(
-        self,
-        fade_in_part: bytes,
-        fade_out_part: bytes,
-        pcm_format: AudioFormat,
-        crossfade_duration: int = 10,
-    ) -> bytes:
-        """Apply a standard crossfade without smart analysis."""
-        self.logger.debug("Applying standard crossfade of %ds", crossfade_duration)
-        crossfade_size = int(pcm_format.pcm_sample_size * crossfade_duration)
-        # Pre-crossfade: outgoing track minus the crossfaded portion
-        pre_crossfade = fade_out_part[:-crossfade_size]
-        # Crossfaded portion: user's configured duration
-        crossfaded_section = await crossfade_pcm_parts(
-            fade_in_part[:crossfade_size],
-            fade_out_part[-crossfade_size:],
-            pcm_format=pcm_format,
-            fade_out_pcm_format=pcm_format,
-        )
-        # Post-crossfade: incoming track minus the crossfaded portion
-        post_crossfade = fade_in_part[crossfade_size:]
-        # Full result: everything concatenated
-        return pre_crossfade + crossfaded_section + post_crossfade
+    return adjusted_downbeats
index 63bd155eb5ac3ceda8361fa51a1d03629767f583..bff3d67ecc2598102b4fbf294bfa73c641e15d5b 100644 (file)
@@ -12,7 +12,7 @@ from mashumaro.config import BaseConfig
 class SmartFadesMode(StrEnum):
     """Smart fades modes."""
 
-    SMART_FADES = "smart_fades"  # Use smart fades with beat matching and EQ filters
+    SMART_CROSSFADE = "smart_crossfade"  # Use smart crossfade with beat matching and EQ filters
     STANDARD_CROSSFADE = "standard_crossfade"  # Use standard crossfade only
     DISABLED = "disabled"  # No crossfade