From 6f18be66186f98633753d38b7d7ff8e4e9dbc8f3 Mon Sep 17 00:00:00 2001 From: Marvin Schenkel Date: Fri, 3 Oct 2025 22:11:32 +0200 Subject: [PATCH] Fix: Smart fades beat alignment (#2477) --- music_assistant/helpers/smart_fades.py | 307 ++++++++++++++++--------- 1 file changed, 200 insertions(+), 107 deletions(-) diff --git a/music_assistant/helpers/smart_fades.py b/music_assistant/helpers/smart_fades.py index a91023d5..a13d072d 100644 --- a/music_assistant/helpers/smart_fades.py +++ b/music_assistant/helpers/smart_fades.py @@ -36,7 +36,7 @@ if TYPE_CHECKING: SMART_CROSSFADE_DURATION = 45 ANALYSIS_FPS = 100 # Only apply time stretching if BPM difference is < this % -TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 8.0 +TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 5.0 class SmartFadesAnalyzer: @@ -427,17 +427,23 @@ class SmartFadesMixer: filters: list[str] = [] + # Calculate initial crossfade duration (may be adjusted later for downbeat alignment) + initial_crossfade_duration = self._calculate_crossfade_duration( + crossfade_bars=crossfade_bars, + fade_in_analysis=fade_in_analysis, + ) + + # Create time stretch filters - needs to know crossfade duration to complete + # tempo ramping before the crossfade starts time_stretch_filters, tempo_factor = self._create_time_stretch_filters( fade_out_analysis=fade_out_analysis, fade_in_analysis=fade_in_analysis, crossfade_bars=crossfade_bars, + crossfade_duration=initial_crossfade_duration, ) filters.extend(time_stretch_filters) - crossfade_duration = self._calculate_crossfade_duration( - crossfade_bars=crossfade_bars, - fade_in_analysis=fade_in_analysis, - ) + crossfade_duration = initial_crossfade_duration # Check if we would have enough audio after beat alignment for the crossfade if ( @@ -453,9 +459,17 @@ class SmartFadesMixer: # Skip beat alignment fadein_start_pos = None - beat_align_filters = self._perform_beat_alignment( + # Adjust crossfade duration to align with outgoing track's downbeats + # This prevents echo-ey sounds when both tracks have kicks during the crossfade + crossfade_duration = self._adjust_crossfade_to_downbeats( + fade_out_analysis=fade_out_analysis, + crossfade_duration=crossfade_duration, fadein_start_pos=fadein_start_pos, tempo_factor=tempo_factor, + ) + + beat_align_filters = self._trim_incoming_track_to_downbeat( + fadein_start_pos=fadein_start_pos, fadeout_input_label="[fadeout_stretched]", fadein_input_label="[1]", ) @@ -509,6 +523,160 @@ class SmartFadesMixer: return actual_duration + def _extrapolate_downbeats( + self, + downbeats: npt.NDArray[np.float64], + tempo_factor: float, + buffer_size: float = SMART_CROSSFADE_DURATION, + ) -> npt.NDArray[np.float64]: + """Extrapolate downbeats based on actual intervals when detection is incomplete. + + This is needed when we want to perform beat alignment in an 'atmospheric' outro + that does not have any detected downbeats. + """ + if len(downbeats) < 3: + # Need at least 3 downbeats to reliably calculate interval + return downbeats / tempo_factor + + # Adjust detected downbeats for time stretching first + adjusted_downbeats = downbeats / tempo_factor + last_downbeat = adjusted_downbeats[-1] + + # If the last downbeat is close to the buffer end, no extrapolation needed + if last_downbeat >= buffer_size - 5: + return adjusted_downbeats + + # Calculate intervals from ORIGINAL downbeats (before time stretching) + intervals = np.diff(downbeats) + median_interval = float(np.median(intervals)) + std_interval = float(np.std(intervals)) + + # Only extrapolate if intervals are consistent (low standard deviation) + if std_interval > 0.2: + self.logger.debug( + "Downbeat intervals too inconsistent (std=%.3fs) for extrapolation", + std_interval, + ) + return adjusted_downbeats + + # Adjust the interval for time stretching + # When slowing down (tempo_factor < 1.0), intervals get longer + adjusted_interval = median_interval / tempo_factor + + # Extrapolate forward from last adjusted downbeat using adjusted interval + extrapolated = [] + current_pos = last_downbeat + adjusted_interval + max_extrapolation_distance = 25.0 # Don't extrapolate more than 25s + + while ( + current_pos < buffer_size + and (current_pos - last_downbeat) <= max_extrapolation_distance + ): + extrapolated.append(current_pos) + current_pos += adjusted_interval + + if extrapolated: + self.logger.debug( + "Extrapolated %d downbeats (adjusted_interval=%.3fs, original=%.3fs) " + "from %.2fs to %.2fs", + len(extrapolated), + adjusted_interval, + median_interval, + last_downbeat, + extrapolated[-1], + ) + # Combine adjusted detected downbeats and extrapolated downbeats + return np.concatenate([adjusted_downbeats, np.array(extrapolated)]) + + return adjusted_downbeats + + def _adjust_crossfade_to_downbeats( + self, + fade_out_analysis: SmartFadesAnalysis, + crossfade_duration: float, + fadein_start_pos: float | None, + tempo_factor: float, + ) -> float: + """Adjust crossfade duration to align with outgoing track's downbeats. + + This ensures the crossfade starts on a downbeat of the outgoing track, + preventing echo-ey sounds when both tracks have kicks during the crossfade. + + The downbeat positions are adjusted for time stretching - when tempo_factor < 1.0 + (slowing down), beats take longer to reach their position in the stretched audio. + """ + # If we don't have downbeats or beat alignment is disabled, return original duration + if len(fade_out_analysis.downbeats) == 0 or fadein_start_pos is None: + return crossfade_duration + + # Extrapolate downbeats if needed (e.g., when beat detection is incomplete) + # This returns downbeats already adjusted for time stretching + adjusted_downbeats = self._extrapolate_downbeats( + fade_out_analysis.downbeats, tempo_factor=tempo_factor + ) + + # Calculate where the crossfade would start in the buffer + ideal_start_pos = SMART_CROSSFADE_DURATION - crossfade_duration + + # Debug: Show all downbeats and the ideal position + self.logger.debug( + "Downbeat adjustment - ideal_start=%.2fs (buffer=%.1fs - crossfade=%.2fs), " + "fadein_start=%.2fs, tempo_factor=%.4f", + ideal_start_pos, + SMART_CROSSFADE_DURATION, + crossfade_duration, + fadein_start_pos, + tempo_factor, + ) + + # Find the closest downbeats (earlier and later) + earlier_downbeat = None + later_downbeat = None + + for downbeat in adjusted_downbeats: + if downbeat <= ideal_start_pos: + earlier_downbeat = downbeat + elif downbeat > ideal_start_pos and later_downbeat is None: + later_downbeat = downbeat + break + + # Try earlier downbeat first (longer crossfade) + if earlier_downbeat is not None: + adjusted_duration = float(SMART_CROSSFADE_DURATION - earlier_downbeat) + # Check if this fits in the buffer + if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION: + if abs(adjusted_duration - crossfade_duration) > 0.1: + self.logger.debug( + "Adjusted crossfade duration from %.2fs to %.2fs to align with " + "downbeat at %.2fs (earlier)", + crossfade_duration, + adjusted_duration, + earlier_downbeat, + ) + return adjusted_duration + + # Try later downbeat (shorter crossfade) + if later_downbeat is not None: + adjusted_duration = float(SMART_CROSSFADE_DURATION - later_downbeat) + # Check if this fits in the buffer + if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION: + if abs(adjusted_duration - crossfade_duration) > 0.1: + self.logger.debug( + "Adjusted crossfade duration from %.2fs to %.2fs to align with " + "downbeat at %.2fs (later)", + crossfade_duration, + adjusted_duration, + later_downbeat, + ) + return adjusted_duration + + # If no suitable downbeat found, return original duration + self.logger.debug( + "Could not adjust crossfade duration to downbeats, using original %.2fs", + crossfade_duration, + ) + return crossfade_duration + def _calculate_optimal_crossfade_bars( self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis ) -> int: @@ -678,14 +846,18 @@ class SmartFadesMixer: ), ] - def _perform_beat_alignment( + def _trim_incoming_track_to_downbeat( self, fadein_start_pos: float | None, - tempo_factor: float, fadeout_input_label: str = "[0]", fadein_input_label: str = "[1]", ) -> list[str]: - """Perform beat alignment preprocessing.""" + """Perform beat alignment preprocessing. + + The incoming track is trimmed to its first downbeat position. + No adjustment is needed for time stretching since the incoming track + is not stretched - it's already at the target BPM. + """ # Just relabel in case we cannot perform beat alignment if fadein_start_pos is None: return [ @@ -693,15 +865,10 @@ class SmartFadesMixer: f"{fadein_input_label}anull[fadein_beatalign]", # codespell:ignore anull ] - # When time stretching is applied, we need to compensate for the timing change - # If tempo_factor < 1.0 (slowing down), beats in fadeout take longer to reach - # If tempo_factor > 1.0 (speeding up), beats in fadeout arrive sooner - adjusted_fadein_start_pos = fadein_start_pos / tempo_factor - - # Apply beat alignment: fadeout passes through, fadein trims to adjusted position + # Trim incoming track to start at first downbeat position return [ f"{fadeout_input_label}anull[fadeout_beatalign]", # codespell:ignore anull - f"{fadein_input_label}atrim=start={adjusted_fadein_start_pos},asetpts=PTS-STARTPTS[fadein_beatalign]", + f"{fadein_input_label}atrim=start={fadein_start_pos},asetpts=PTS-STARTPTS[fadein_beatalign]", ] def _create_time_stretch_filters( @@ -709,8 +876,13 @@ class SmartFadesMixer: fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis, crossfade_bars: int, + crossfade_duration: float, ) -> tuple[list[str], float]: - """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM.""" + """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM. + + The tempo ramping is completed before the crossfade starts to ensure perfect beat alignment + throughout the entire crossfade region. + """ # Check if time stretching should be applied (BPM difference < 3%) original_bpm = fade_out_analysis.bpm target_bpm = fade_in_analysis.bpm @@ -725,105 +897,26 @@ class SmartFadesMixer: # Log that we're applying time stretching self.logger.debug( - "Time stretch: %.1f%% BPM diff, adjusting %.1f -> %.1f BPM over buffer", + "Time stretch: %.1f%% BPM diff, adjusting %.1f -> %.1f BPM, crossfade starts at %.1fs", bpm_diff_percent, original_bpm, target_bpm, + SMART_CROSSFADE_DURATION - crossfade_duration, ) - # Calculate the tempo change factor - # atempo accepts values between 0.5 and 2.0 (can be chained for larger changes) - tempo_factor = bpm_ratio - buffer_duration = SMART_CROSSFADE_DURATION # 45 seconds - - # Calculate expected crossfade duration from bars for comparison - beats_per_bar = 4 - seconds_per_beat = 60.0 / original_bpm - expected_crossfade_duration = crossfade_bars * beats_per_bar * seconds_per_beat - - # For BPM differences < 3%, tempo_factor will be between 0.97 and 1.03 - # This is well within atempo's range - - # Validate tempo factor is within ffmpeg's atempo range - if not 0.5 <= tempo_factor <= 2.0: - self.logger.warning( - "Tempo factor %.4f out of range [0.5, 2.0], skipping time stretch", - tempo_factor, - ) - return ["[0]anull[fadeout_stretched]"], 1.0 # codespell:ignore anull - - # If the crossfade takes up most of the buffer, use simple linear stretch - if buffer_duration - expected_crossfade_duration < 5.0: - self.logger.debug( - "Time stretch filter (linear): %.1f BPM -> %.1f BPM (factor=%.4f)", - original_bpm, - target_bpm, - tempo_factor, - ) - return [f"[0]atempo={tempo_factor:.6f}[fadeout_stretched]"], tempo_factor - - # Implement segmented time stretching with exponential curve - num_segments = 4 # Balance between smoothness and filter complexity - filters = [] - - # Split the input into segments - filters.append( - f"[0]asplit={num_segments}" + "".join(f"[seg{i}]" for i in range(num_segments)) - ) - - # Process each segment with progressively more tempo adjustment - for i in range(num_segments): - # Calculate segment timing - segment_start = (i * buffer_duration) / num_segments - segment_end = ((i + 1) * buffer_duration) / num_segments - - # Calculate progress through the buffer (0 to 1) - progress = (i + 0.5) / num_segments # Use midpoint of segment - - # Apply exponential easing curve (ease-in-out cubic) - # This creates minimal change at start, accelerating in middle, decelerating at end - if progress < 0.5: - # First half: ease in (slow start) - eased_progress = 4 * progress * progress * progress - else: - # Second half: ease out (slow finish) - p = 2 * progress - 2 - eased_progress = 1 + p * p * p / 2 - - # Calculate tempo for this segment - segment_tempo = 1.0 + (tempo_factor - 1.0) * eased_progress - - # Clamp to atempo's valid range (should never exceed for < 3% changes) - segment_tempo = max(0.5, min(2.0, segment_tempo)) - - # Trim segment and apply tempo adjustment - filters.append( - f"[seg{i}]atrim=start={segment_start:.3f}:end={segment_end:.3f}," - f"asetpts=PTS-STARTPTS,atempo={segment_tempo:.6f}[seg{i}_stretched]" - ) - - self.logger.debug( - "Segment %d: %.1f-%.1fs, tempo factor=%.4f (%.1f%% of change)", - i + 1, - segment_start, - segment_end, - segment_tempo, - eased_progress * 100, - ) - - # Concatenate all stretched segments - concat_inputs = "".join(f"[seg{i}_stretched]" for i in range(num_segments)) - filters.append(f"{concat_inputs}concat=n={num_segments}:v=0:a=1[fadeout_stretched]") - + # Use uniform rubberband time stretching for the entire buffer + # This ensures downbeat adjustment calculations are accurate and beat alignment is perfect + # Rubberband is a high-quality music-specific algorithm optimized for music self.logger.debug( - "Time stretch filter (segmented): %.1f BPM -> %.1f BPM (factor=%.4f) with %d segments", + "Time stretch (rubberband uniform): %.1f BPM -> %.1f BPM (factor=%.4f)", original_bpm, target_bpm, - tempo_factor, - num_segments, + bpm_ratio, ) - - return filters, tempo_factor + return [ + f"[0]rubberband=tempo={bpm_ratio:.6f}:transients=mixed:detector=soft:pitchq=quality" + "[fadeout_stretched]" + ], bpm_ratio def _apply_eq_filters( self, -- 2.34.1