SMART_CROSSFADE_DURATION = 45
ANALYSIS_FPS = 100
# Only apply time stretching if BPM difference is < this %
-TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 8.0
+TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 5.0
class SmartFadesAnalyzer:
filters: list[str] = []
+ # Calculate initial crossfade duration (may be adjusted later for downbeat alignment)
+ initial_crossfade_duration = self._calculate_crossfade_duration(
+ crossfade_bars=crossfade_bars,
+ fade_in_analysis=fade_in_analysis,
+ )
+
+ # Create time stretch filters - needs to know crossfade duration to complete
+ # tempo ramping before the crossfade starts
time_stretch_filters, tempo_factor = self._create_time_stretch_filters(
fade_out_analysis=fade_out_analysis,
fade_in_analysis=fade_in_analysis,
crossfade_bars=crossfade_bars,
+ crossfade_duration=initial_crossfade_duration,
)
filters.extend(time_stretch_filters)
- crossfade_duration = self._calculate_crossfade_duration(
- crossfade_bars=crossfade_bars,
- fade_in_analysis=fade_in_analysis,
- )
+ crossfade_duration = initial_crossfade_duration
# Check if we would have enough audio after beat alignment for the crossfade
if (
# Skip beat alignment
fadein_start_pos = None
- beat_align_filters = self._perform_beat_alignment(
+ # Adjust crossfade duration to align with outgoing track's downbeats
+ # This prevents echo-ey sounds when both tracks have kicks during the crossfade
+ crossfade_duration = self._adjust_crossfade_to_downbeats(
+ fade_out_analysis=fade_out_analysis,
+ crossfade_duration=crossfade_duration,
fadein_start_pos=fadein_start_pos,
tempo_factor=tempo_factor,
+ )
+
+ beat_align_filters = self._trim_incoming_track_to_downbeat(
+ fadein_start_pos=fadein_start_pos,
fadeout_input_label="[fadeout_stretched]",
fadein_input_label="[1]",
)
return actual_duration
+ def _extrapolate_downbeats(
+ self,
+ downbeats: npt.NDArray[np.float64],
+ tempo_factor: float,
+ buffer_size: float = SMART_CROSSFADE_DURATION,
+ ) -> npt.NDArray[np.float64]:
+ """Extrapolate downbeats based on actual intervals when detection is incomplete.
+
+ This is needed when we want to perform beat alignment in an 'atmospheric' outro
+ that does not have any detected downbeats.
+ """
+ if len(downbeats) < 3:
+ # Need at least 3 downbeats to reliably calculate interval
+ return downbeats / tempo_factor
+
+ # Adjust detected downbeats for time stretching first
+ adjusted_downbeats = downbeats / tempo_factor
+ last_downbeat = adjusted_downbeats[-1]
+
+ # If the last downbeat is close to the buffer end, no extrapolation needed
+ if last_downbeat >= buffer_size - 5:
+ return adjusted_downbeats
+
+ # Calculate intervals from ORIGINAL downbeats (before time stretching)
+ intervals = np.diff(downbeats)
+ median_interval = float(np.median(intervals))
+ std_interval = float(np.std(intervals))
+
+ # Only extrapolate if intervals are consistent (low standard deviation)
+ if std_interval > 0.2:
+ self.logger.debug(
+ "Downbeat intervals too inconsistent (std=%.3fs) for extrapolation",
+ std_interval,
+ )
+ return adjusted_downbeats
+
+ # Adjust the interval for time stretching
+ # When slowing down (tempo_factor < 1.0), intervals get longer
+ adjusted_interval = median_interval / tempo_factor
+
+ # Extrapolate forward from last adjusted downbeat using adjusted interval
+ extrapolated = []
+ current_pos = last_downbeat + adjusted_interval
+ max_extrapolation_distance = 25.0 # Don't extrapolate more than 25s
+
+ while (
+ current_pos < buffer_size
+ and (current_pos - last_downbeat) <= max_extrapolation_distance
+ ):
+ extrapolated.append(current_pos)
+ current_pos += adjusted_interval
+
+ if extrapolated:
+ self.logger.debug(
+ "Extrapolated %d downbeats (adjusted_interval=%.3fs, original=%.3fs) "
+ "from %.2fs to %.2fs",
+ len(extrapolated),
+ adjusted_interval,
+ median_interval,
+ last_downbeat,
+ extrapolated[-1],
+ )
+ # Combine adjusted detected downbeats and extrapolated downbeats
+ return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
+
+ return adjusted_downbeats
+
+ def _adjust_crossfade_to_downbeats(
+ self,
+ fade_out_analysis: SmartFadesAnalysis,
+ crossfade_duration: float,
+ fadein_start_pos: float | None,
+ tempo_factor: float,
+ ) -> float:
+ """Adjust crossfade duration to align with outgoing track's downbeats.
+
+ This ensures the crossfade starts on a downbeat of the outgoing track,
+ preventing echo-ey sounds when both tracks have kicks during the crossfade.
+
+ The downbeat positions are adjusted for time stretching - when tempo_factor < 1.0
+ (slowing down), beats take longer to reach their position in the stretched audio.
+ """
+ # If we don't have downbeats or beat alignment is disabled, return original duration
+ if len(fade_out_analysis.downbeats) == 0 or fadein_start_pos is None:
+ return crossfade_duration
+
+ # Extrapolate downbeats if needed (e.g., when beat detection is incomplete)
+ # This returns downbeats already adjusted for time stretching
+ adjusted_downbeats = self._extrapolate_downbeats(
+ fade_out_analysis.downbeats, tempo_factor=tempo_factor
+ )
+
+ # Calculate where the crossfade would start in the buffer
+ ideal_start_pos = SMART_CROSSFADE_DURATION - crossfade_duration
+
+ # Debug: Show all downbeats and the ideal position
+ self.logger.debug(
+ "Downbeat adjustment - ideal_start=%.2fs (buffer=%.1fs - crossfade=%.2fs), "
+ "fadein_start=%.2fs, tempo_factor=%.4f",
+ ideal_start_pos,
+ SMART_CROSSFADE_DURATION,
+ crossfade_duration,
+ fadein_start_pos,
+ tempo_factor,
+ )
+
+ # Find the closest downbeats (earlier and later)
+ earlier_downbeat = None
+ later_downbeat = None
+
+ for downbeat in adjusted_downbeats:
+ if downbeat <= ideal_start_pos:
+ earlier_downbeat = downbeat
+ elif downbeat > ideal_start_pos and later_downbeat is None:
+ later_downbeat = downbeat
+ break
+
+ # Try earlier downbeat first (longer crossfade)
+ if earlier_downbeat is not None:
+ adjusted_duration = float(SMART_CROSSFADE_DURATION - earlier_downbeat)
+ # Check if this fits in the buffer
+ if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
+ if abs(adjusted_duration - crossfade_duration) > 0.1:
+ self.logger.debug(
+ "Adjusted crossfade duration from %.2fs to %.2fs to align with "
+ "downbeat at %.2fs (earlier)",
+ crossfade_duration,
+ adjusted_duration,
+ earlier_downbeat,
+ )
+ return adjusted_duration
+
+ # Try later downbeat (shorter crossfade)
+ if later_downbeat is not None:
+ adjusted_duration = float(SMART_CROSSFADE_DURATION - later_downbeat)
+ # Check if this fits in the buffer
+ if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
+ if abs(adjusted_duration - crossfade_duration) > 0.1:
+ self.logger.debug(
+ "Adjusted crossfade duration from %.2fs to %.2fs to align with "
+ "downbeat at %.2fs (later)",
+ crossfade_duration,
+ adjusted_duration,
+ later_downbeat,
+ )
+ return adjusted_duration
+
+ # If no suitable downbeat found, return original duration
+ self.logger.debug(
+ "Could not adjust crossfade duration to downbeats, using original %.2fs",
+ crossfade_duration,
+ )
+ return crossfade_duration
+
def _calculate_optimal_crossfade_bars(
self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis
) -> int:
),
]
- def _perform_beat_alignment(
+ def _trim_incoming_track_to_downbeat(
self,
fadein_start_pos: float | None,
- tempo_factor: float,
fadeout_input_label: str = "[0]",
fadein_input_label: str = "[1]",
) -> list[str]:
- """Perform beat alignment preprocessing."""
+ """Perform beat alignment preprocessing.
+
+ The incoming track is trimmed to its first downbeat position.
+ No adjustment is needed for time stretching since the incoming track
+ is not stretched - it's already at the target BPM.
+ """
# Just relabel in case we cannot perform beat alignment
if fadein_start_pos is None:
return [
f"{fadein_input_label}anull[fadein_beatalign]", # codespell:ignore anull
]
- # When time stretching is applied, we need to compensate for the timing change
- # If tempo_factor < 1.0 (slowing down), beats in fadeout take longer to reach
- # If tempo_factor > 1.0 (speeding up), beats in fadeout arrive sooner
- adjusted_fadein_start_pos = fadein_start_pos / tempo_factor
-
- # Apply beat alignment: fadeout passes through, fadein trims to adjusted position
+ # Trim incoming track to start at first downbeat position
return [
f"{fadeout_input_label}anull[fadeout_beatalign]", # codespell:ignore anull
- f"{fadein_input_label}atrim=start={adjusted_fadein_start_pos},asetpts=PTS-STARTPTS[fadein_beatalign]",
+ f"{fadein_input_label}atrim=start={fadein_start_pos},asetpts=PTS-STARTPTS[fadein_beatalign]",
]
def _create_time_stretch_filters(
fade_out_analysis: SmartFadesAnalysis,
fade_in_analysis: SmartFadesAnalysis,
crossfade_bars: int,
+ crossfade_duration: float,
) -> tuple[list[str], float]:
- """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM."""
+ """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM.
+
+ The tempo ramping is completed before the crossfade starts to ensure perfect beat alignment
+ throughout the entire crossfade region.
+ """
# Check if time stretching should be applied (BPM difference < 3%)
original_bpm = fade_out_analysis.bpm
target_bpm = fade_in_analysis.bpm
# Log that we're applying time stretching
self.logger.debug(
- "Time stretch: %.1f%% BPM diff, adjusting %.1f -> %.1f BPM over buffer",
+ "Time stretch: %.1f%% BPM diff, adjusting %.1f -> %.1f BPM, crossfade starts at %.1fs",
bpm_diff_percent,
original_bpm,
target_bpm,
+ SMART_CROSSFADE_DURATION - crossfade_duration,
)
- # Calculate the tempo change factor
- # atempo accepts values between 0.5 and 2.0 (can be chained for larger changes)
- tempo_factor = bpm_ratio
- buffer_duration = SMART_CROSSFADE_DURATION # 45 seconds
-
- # Calculate expected crossfade duration from bars for comparison
- beats_per_bar = 4
- seconds_per_beat = 60.0 / original_bpm
- expected_crossfade_duration = crossfade_bars * beats_per_bar * seconds_per_beat
-
- # For BPM differences < 3%, tempo_factor will be between 0.97 and 1.03
- # This is well within atempo's range
-
- # Validate tempo factor is within ffmpeg's atempo range
- if not 0.5 <= tempo_factor <= 2.0:
- self.logger.warning(
- "Tempo factor %.4f out of range [0.5, 2.0], skipping time stretch",
- tempo_factor,
- )
- return ["[0]anull[fadeout_stretched]"], 1.0 # codespell:ignore anull
-
- # If the crossfade takes up most of the buffer, use simple linear stretch
- if buffer_duration - expected_crossfade_duration < 5.0:
- self.logger.debug(
- "Time stretch filter (linear): %.1f BPM -> %.1f BPM (factor=%.4f)",
- original_bpm,
- target_bpm,
- tempo_factor,
- )
- return [f"[0]atempo={tempo_factor:.6f}[fadeout_stretched]"], tempo_factor
-
- # Implement segmented time stretching with exponential curve
- num_segments = 4 # Balance between smoothness and filter complexity
- filters = []
-
- # Split the input into segments
- filters.append(
- f"[0]asplit={num_segments}" + "".join(f"[seg{i}]" for i in range(num_segments))
- )
-
- # Process each segment with progressively more tempo adjustment
- for i in range(num_segments):
- # Calculate segment timing
- segment_start = (i * buffer_duration) / num_segments
- segment_end = ((i + 1) * buffer_duration) / num_segments
-
- # Calculate progress through the buffer (0 to 1)
- progress = (i + 0.5) / num_segments # Use midpoint of segment
-
- # Apply exponential easing curve (ease-in-out cubic)
- # This creates minimal change at start, accelerating in middle, decelerating at end
- if progress < 0.5:
- # First half: ease in (slow start)
- eased_progress = 4 * progress * progress * progress
- else:
- # Second half: ease out (slow finish)
- p = 2 * progress - 2
- eased_progress = 1 + p * p * p / 2
-
- # Calculate tempo for this segment
- segment_tempo = 1.0 + (tempo_factor - 1.0) * eased_progress
-
- # Clamp to atempo's valid range (should never exceed for < 3% changes)
- segment_tempo = max(0.5, min(2.0, segment_tempo))
-
- # Trim segment and apply tempo adjustment
- filters.append(
- f"[seg{i}]atrim=start={segment_start:.3f}:end={segment_end:.3f},"
- f"asetpts=PTS-STARTPTS,atempo={segment_tempo:.6f}[seg{i}_stretched]"
- )
-
- self.logger.debug(
- "Segment %d: %.1f-%.1fs, tempo factor=%.4f (%.1f%% of change)",
- i + 1,
- segment_start,
- segment_end,
- segment_tempo,
- eased_progress * 100,
- )
-
- # Concatenate all stretched segments
- concat_inputs = "".join(f"[seg{i}_stretched]" for i in range(num_segments))
- filters.append(f"{concat_inputs}concat=n={num_segments}:v=0:a=1[fadeout_stretched]")
-
+ # Use uniform rubberband time stretching for the entire buffer
+ # This ensures downbeat adjustment calculations are accurate and beat alignment is perfect
+ # Rubberband is a high-quality music-specific algorithm optimized for music
self.logger.debug(
- "Time stretch filter (segmented): %.1f BPM -> %.1f BPM (factor=%.4f) with %d segments",
+ "Time stretch (rubberband uniform): %.1f BPM -> %.1f BPM (factor=%.4f)",
original_bpm,
target_bpm,
- tempo_factor,
- num_segments,
+ bpm_ratio,
)
-
- return filters, tempo_factor
+ return [
+ f"[0]rubberband=tempo={bpm_ratio:.6f}:transients=mixed:detector=soft:pitchq=quality"
+ "[fadeout_stretched]"
+ ], bpm_ratio
def _apply_eq_filters(
self,