Fix: Smart fades beat alignment (#2477)
authorMarvin Schenkel <marvinschenkel@gmail.com>
Fri, 3 Oct 2025 20:11:32 +0000 (22:11 +0200)
committerGitHub <noreply@github.com>
Fri, 3 Oct 2025 20:11:32 +0000 (22:11 +0200)
music_assistant/helpers/smart_fades.py

index a91023d508de48564bb43f7d9c47cb5191988945..a13d072d88466ed9f9da7208a92def2f399ab08d 100644 (file)
@@ -36,7 +36,7 @@ if TYPE_CHECKING:
 SMART_CROSSFADE_DURATION = 45
 ANALYSIS_FPS = 100
 # Only apply time stretching if BPM difference is < this %
-TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 8.0
+TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 5.0
 
 
 class SmartFadesAnalyzer:
@@ -427,17 +427,23 @@ class SmartFadesMixer:
 
         filters: list[str] = []
 
+        # Calculate initial crossfade duration (may be adjusted later for downbeat alignment)
+        initial_crossfade_duration = self._calculate_crossfade_duration(
+            crossfade_bars=crossfade_bars,
+            fade_in_analysis=fade_in_analysis,
+        )
+
+        # Create time stretch filters - needs to know crossfade duration to complete
+        # tempo ramping before the crossfade starts
         time_stretch_filters, tempo_factor = self._create_time_stretch_filters(
             fade_out_analysis=fade_out_analysis,
             fade_in_analysis=fade_in_analysis,
             crossfade_bars=crossfade_bars,
+            crossfade_duration=initial_crossfade_duration,
         )
         filters.extend(time_stretch_filters)
 
-        crossfade_duration = self._calculate_crossfade_duration(
-            crossfade_bars=crossfade_bars,
-            fade_in_analysis=fade_in_analysis,
-        )
+        crossfade_duration = initial_crossfade_duration
 
         # Check if we would have enough audio after beat alignment for the crossfade
         if (
@@ -453,9 +459,17 @@ class SmartFadesMixer:
             # Skip beat alignment
             fadein_start_pos = None
 
-        beat_align_filters = self._perform_beat_alignment(
+        # Adjust crossfade duration to align with outgoing track's downbeats
+        # This prevents echo-ey sounds when both tracks have kicks during the crossfade
+        crossfade_duration = self._adjust_crossfade_to_downbeats(
+            fade_out_analysis=fade_out_analysis,
+            crossfade_duration=crossfade_duration,
             fadein_start_pos=fadein_start_pos,
             tempo_factor=tempo_factor,
+        )
+
+        beat_align_filters = self._trim_incoming_track_to_downbeat(
+            fadein_start_pos=fadein_start_pos,
             fadeout_input_label="[fadeout_stretched]",
             fadein_input_label="[1]",
         )
@@ -509,6 +523,160 @@ class SmartFadesMixer:
 
         return actual_duration
 
+    def _extrapolate_downbeats(
+        self,
+        downbeats: npt.NDArray[np.float64],
+        tempo_factor: float,
+        buffer_size: float = SMART_CROSSFADE_DURATION,
+    ) -> npt.NDArray[np.float64]:
+        """Extrapolate downbeats based on actual intervals when detection is incomplete.
+
+        This is needed when we want to perform beat alignment in an 'atmospheric' outro
+        that does not have any detected downbeats.
+        """
+        if len(downbeats) < 3:
+            # Need at least 3 downbeats to reliably calculate interval
+            return downbeats / tempo_factor
+
+        # Adjust detected downbeats for time stretching first
+        adjusted_downbeats = downbeats / tempo_factor
+        last_downbeat = adjusted_downbeats[-1]
+
+        # If the last downbeat is close to the buffer end, no extrapolation needed
+        if last_downbeat >= buffer_size - 5:
+            return adjusted_downbeats
+
+        # Calculate intervals from ORIGINAL downbeats (before time stretching)
+        intervals = np.diff(downbeats)
+        median_interval = float(np.median(intervals))
+        std_interval = float(np.std(intervals))
+
+        # Only extrapolate if intervals are consistent (low standard deviation)
+        if std_interval > 0.2:
+            self.logger.debug(
+                "Downbeat intervals too inconsistent (std=%.3fs) for extrapolation",
+                std_interval,
+            )
+            return adjusted_downbeats
+
+        # Adjust the interval for time stretching
+        # When slowing down (tempo_factor < 1.0), intervals get longer
+        adjusted_interval = median_interval / tempo_factor
+
+        # Extrapolate forward from last adjusted downbeat using adjusted interval
+        extrapolated = []
+        current_pos = last_downbeat + adjusted_interval
+        max_extrapolation_distance = 25.0  # Don't extrapolate more than 25s
+
+        while (
+            current_pos < buffer_size
+            and (current_pos - last_downbeat) <= max_extrapolation_distance
+        ):
+            extrapolated.append(current_pos)
+            current_pos += adjusted_interval
+
+        if extrapolated:
+            self.logger.debug(
+                "Extrapolated %d downbeats (adjusted_interval=%.3fs, original=%.3fs) "
+                "from %.2fs to %.2fs",
+                len(extrapolated),
+                adjusted_interval,
+                median_interval,
+                last_downbeat,
+                extrapolated[-1],
+            )
+            # Combine adjusted detected downbeats and extrapolated downbeats
+            return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
+
+        return adjusted_downbeats
+
+    def _adjust_crossfade_to_downbeats(
+        self,
+        fade_out_analysis: SmartFadesAnalysis,
+        crossfade_duration: float,
+        fadein_start_pos: float | None,
+        tempo_factor: float,
+    ) -> float:
+        """Adjust crossfade duration to align with outgoing track's downbeats.
+
+        This ensures the crossfade starts on a downbeat of the outgoing track,
+        preventing echo-ey sounds when both tracks have kicks during the crossfade.
+
+        The downbeat positions are adjusted for time stretching - when tempo_factor < 1.0
+        (slowing down), beats take longer to reach their position in the stretched audio.
+        """
+        # If we don't have downbeats or beat alignment is disabled, return original duration
+        if len(fade_out_analysis.downbeats) == 0 or fadein_start_pos is None:
+            return crossfade_duration
+
+        # Extrapolate downbeats if needed (e.g., when beat detection is incomplete)
+        # This returns downbeats already adjusted for time stretching
+        adjusted_downbeats = self._extrapolate_downbeats(
+            fade_out_analysis.downbeats, tempo_factor=tempo_factor
+        )
+
+        # Calculate where the crossfade would start in the buffer
+        ideal_start_pos = SMART_CROSSFADE_DURATION - crossfade_duration
+
+        # Debug: Show all downbeats and the ideal position
+        self.logger.debug(
+            "Downbeat adjustment - ideal_start=%.2fs (buffer=%.1fs - crossfade=%.2fs), "
+            "fadein_start=%.2fs, tempo_factor=%.4f",
+            ideal_start_pos,
+            SMART_CROSSFADE_DURATION,
+            crossfade_duration,
+            fadein_start_pos,
+            tempo_factor,
+        )
+
+        # Find the closest downbeats (earlier and later)
+        earlier_downbeat = None
+        later_downbeat = None
+
+        for downbeat in adjusted_downbeats:
+            if downbeat <= ideal_start_pos:
+                earlier_downbeat = downbeat
+            elif downbeat > ideal_start_pos and later_downbeat is None:
+                later_downbeat = downbeat
+                break
+
+        # Try earlier downbeat first (longer crossfade)
+        if earlier_downbeat is not None:
+            adjusted_duration = float(SMART_CROSSFADE_DURATION - earlier_downbeat)
+            # Check if this fits in the buffer
+            if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
+                if abs(adjusted_duration - crossfade_duration) > 0.1:
+                    self.logger.debug(
+                        "Adjusted crossfade duration from %.2fs to %.2fs to align with "
+                        "downbeat at %.2fs (earlier)",
+                        crossfade_duration,
+                        adjusted_duration,
+                        earlier_downbeat,
+                    )
+                return adjusted_duration
+
+        # Try later downbeat (shorter crossfade)
+        if later_downbeat is not None:
+            adjusted_duration = float(SMART_CROSSFADE_DURATION - later_downbeat)
+            # Check if this fits in the buffer
+            if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
+                if abs(adjusted_duration - crossfade_duration) > 0.1:
+                    self.logger.debug(
+                        "Adjusted crossfade duration from %.2fs to %.2fs to align with "
+                        "downbeat at %.2fs (later)",
+                        crossfade_duration,
+                        adjusted_duration,
+                        later_downbeat,
+                    )
+                return adjusted_duration
+
+        # If no suitable downbeat found, return original duration
+        self.logger.debug(
+            "Could not adjust crossfade duration to downbeats, using original %.2fs",
+            crossfade_duration,
+        )
+        return crossfade_duration
+
     def _calculate_optimal_crossfade_bars(
         self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis
     ) -> int:
@@ -678,14 +846,18 @@ class SmartFadesMixer:
             ),
         ]
 
-    def _perform_beat_alignment(
+    def _trim_incoming_track_to_downbeat(
         self,
         fadein_start_pos: float | None,
-        tempo_factor: float,
         fadeout_input_label: str = "[0]",
         fadein_input_label: str = "[1]",
     ) -> list[str]:
-        """Perform beat alignment preprocessing."""
+        """Perform beat alignment preprocessing.
+
+        The incoming track is trimmed to its first downbeat position.
+        No adjustment is needed for time stretching since the incoming track
+        is not stretched - it's already at the target BPM.
+        """
         # Just relabel in case we cannot perform beat alignment
         if fadein_start_pos is None:
             return [
@@ -693,15 +865,10 @@ class SmartFadesMixer:
                 f"{fadein_input_label}anull[fadein_beatalign]",  # codespell:ignore anull
             ]
 
-        # When time stretching is applied, we need to compensate for the timing change
-        # If tempo_factor < 1.0 (slowing down), beats in fadeout take longer to reach
-        # If tempo_factor > 1.0 (speeding up), beats in fadeout arrive sooner
-        adjusted_fadein_start_pos = fadein_start_pos / tempo_factor
-
-        # Apply beat alignment: fadeout passes through, fadein trims to adjusted position
+        # Trim incoming track to start at first downbeat position
         return [
             f"{fadeout_input_label}anull[fadeout_beatalign]",  # codespell:ignore anull
-            f"{fadein_input_label}atrim=start={adjusted_fadein_start_pos},asetpts=PTS-STARTPTS[fadein_beatalign]",
+            f"{fadein_input_label}atrim=start={fadein_start_pos},asetpts=PTS-STARTPTS[fadein_beatalign]",
         ]
 
     def _create_time_stretch_filters(
@@ -709,8 +876,13 @@ class SmartFadesMixer:
         fade_out_analysis: SmartFadesAnalysis,
         fade_in_analysis: SmartFadesAnalysis,
         crossfade_bars: int,
+        crossfade_duration: float,
     ) -> tuple[list[str], float]:
-        """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM."""
+        """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM.
+
+        The tempo ramping is completed before the crossfade starts to ensure perfect beat alignment
+        throughout the entire crossfade region.
+        """
         # Check if time stretching should be applied (BPM difference < 3%)
         original_bpm = fade_out_analysis.bpm
         target_bpm = fade_in_analysis.bpm
@@ -725,105 +897,26 @@ class SmartFadesMixer:
 
         # Log that we're applying time stretching
         self.logger.debug(
-            "Time stretch: %.1f%% BPM diff, adjusting %.1f -> %.1f BPM over buffer",
+            "Time stretch: %.1f%% BPM diff, adjusting %.1f -> %.1f BPM, crossfade starts at %.1fs",
             bpm_diff_percent,
             original_bpm,
             target_bpm,
+            SMART_CROSSFADE_DURATION - crossfade_duration,
         )
 
-        # Calculate the tempo change factor
-        # atempo accepts values between 0.5 and 2.0 (can be chained for larger changes)
-        tempo_factor = bpm_ratio
-        buffer_duration = SMART_CROSSFADE_DURATION  # 45 seconds
-
-        # Calculate expected crossfade duration from bars for comparison
-        beats_per_bar = 4
-        seconds_per_beat = 60.0 / original_bpm
-        expected_crossfade_duration = crossfade_bars * beats_per_bar * seconds_per_beat
-
-        # For BPM differences < 3%, tempo_factor will be between 0.97 and 1.03
-        # This is well within atempo's range
-
-        # Validate tempo factor is within ffmpeg's atempo range
-        if not 0.5 <= tempo_factor <= 2.0:
-            self.logger.warning(
-                "Tempo factor %.4f out of range [0.5, 2.0], skipping time stretch",
-                tempo_factor,
-            )
-            return ["[0]anull[fadeout_stretched]"], 1.0  # codespell:ignore anull
-
-        # If the crossfade takes up most of the buffer, use simple linear stretch
-        if buffer_duration - expected_crossfade_duration < 5.0:
-            self.logger.debug(
-                "Time stretch filter (linear): %.1f BPM -> %.1f BPM (factor=%.4f)",
-                original_bpm,
-                target_bpm,
-                tempo_factor,
-            )
-            return [f"[0]atempo={tempo_factor:.6f}[fadeout_stretched]"], tempo_factor
-
-        # Implement segmented time stretching with exponential curve
-        num_segments = 4  # Balance between smoothness and filter complexity
-        filters = []
-
-        # Split the input into segments
-        filters.append(
-            f"[0]asplit={num_segments}" + "".join(f"[seg{i}]" for i in range(num_segments))
-        )
-
-        # Process each segment with progressively more tempo adjustment
-        for i in range(num_segments):
-            # Calculate segment timing
-            segment_start = (i * buffer_duration) / num_segments
-            segment_end = ((i + 1) * buffer_duration) / num_segments
-
-            # Calculate progress through the buffer (0 to 1)
-            progress = (i + 0.5) / num_segments  # Use midpoint of segment
-
-            # Apply exponential easing curve (ease-in-out cubic)
-            # This creates minimal change at start, accelerating in middle, decelerating at end
-            if progress < 0.5:
-                # First half: ease in (slow start)
-                eased_progress = 4 * progress * progress * progress
-            else:
-                # Second half: ease out (slow finish)
-                p = 2 * progress - 2
-                eased_progress = 1 + p * p * p / 2
-
-            # Calculate tempo for this segment
-            segment_tempo = 1.0 + (tempo_factor - 1.0) * eased_progress
-
-            # Clamp to atempo's valid range (should never exceed for < 3% changes)
-            segment_tempo = max(0.5, min(2.0, segment_tempo))
-
-            # Trim segment and apply tempo adjustment
-            filters.append(
-                f"[seg{i}]atrim=start={segment_start:.3f}:end={segment_end:.3f},"
-                f"asetpts=PTS-STARTPTS,atempo={segment_tempo:.6f}[seg{i}_stretched]"
-            )
-
-            self.logger.debug(
-                "Segment %d: %.1f-%.1fs, tempo factor=%.4f (%.1f%% of change)",
-                i + 1,
-                segment_start,
-                segment_end,
-                segment_tempo,
-                eased_progress * 100,
-            )
-
-        # Concatenate all stretched segments
-        concat_inputs = "".join(f"[seg{i}_stretched]" for i in range(num_segments))
-        filters.append(f"{concat_inputs}concat=n={num_segments}:v=0:a=1[fadeout_stretched]")
-
+        # Use uniform rubberband time stretching for the entire buffer
+        # This ensures downbeat adjustment calculations are accurate and beat alignment is perfect
+        # Rubberband is a high-quality music-specific algorithm optimized for music
         self.logger.debug(
-            "Time stretch filter (segmented): %.1f BPM -> %.1f BPM (factor=%.4f) with %d segments",
+            "Time stretch (rubberband uniform): %.1f BPM -> %.1f BPM (factor=%.4f)",
             original_bpm,
             target_bpm,
-            tempo_factor,
-            num_segments,
+            bpm_ratio,
         )
-
-        return filters, tempo_factor
+        return [
+            f"[0]rubberband=tempo={bpm_ratio:.6f}:transients=mixed:detector=soft:pitchq=quality"
+            "[fadeout_stretched]"
+        ], bpm_ratio
 
     def _apply_eq_filters(
         self,