"""Smart Fades - Object-oriented implementation with intelligent fades and adaptive filtering."""
-# TODO: Figure out if we can achieve shared buffer with StreamController on full
-# current and next track for more EQ options.
-# TODO: Refactor the Analyzer into a metadata controller after we have split the controllers
-# TODO: Refactor the Mixer into a stream controller after we have split the controllers
from __future__ import annotations
import asyncio
import logging
import time
import warnings
+from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
import aiofiles
from music_assistant.constants import VERBOSE_LOG_LEVEL
from music_assistant.helpers.audio import (
align_audio_to_frame_boundary,
- crossfade_pcm_parts,
strip_silence,
)
from music_assistant.helpers.process import communicate
SMART_CROSSFADE_DURATION = 45
ANALYSIS_FPS = 100
-# Only apply time stretching if BPM difference is < this %
-TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 5.0
class SmartFadesAnalyzer:
fragment_duration,
len(audio_data),
)
- # Perform beat analysis
-
# Convert PCM bytes to numpy array and then to mono for analysis
audio_array = np.frombuffer(audio_data, dtype=np.float32)
if pcm_format.channels > 1:
return None
-class SmartFadesMixer:
- """Smart fades mixer class that mixes tracks based on analysis data."""
+#############################
+# SMART FADES EQ LOGIC
+#############################
- def __init__(self, mass: MusicAssistant) -> None:
- """Initialize smart fades mixer."""
- self.mass = mass
- self.logger = logging.getLogger(__name__)
- # TODO: Refactor into stream (or metadata) controller after we have split the controllers
- self.analyzer = SmartFadesAnalyzer(mass)
- async def mix(
class Filter(ABC):
    """Abstract base class for one stage of the smart-fade FFmpeg filter chain.

    Every stage receives the labels of the two streams it works on (the
    outgoing "fadeout" track and the incoming "fadein" track) and returns the
    FFmpeg filter strings that implement it.  The names of the pads it writes
    to are published through the two ``output_*_label`` attributes so that the
    following stage can pick them up.
    """

    # Bare pad names (without the surrounding "[" / "]") that this stage
    # writes its fadeout / fadein result to.
    output_fadeout_label: str
    output_fadein_label: str

    @abstractmethod
    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
        """Apply the filter and return the FFmpeg filter strings.

        Args:
            input_fadein_label: Bracketed input pad of the incoming track, e.g. ``"[1]"``.
            input_fadeout_label: Bracketed input pad of the outgoing track, e.g. ``"[0]"``.

        Returns:
            One FFmpeg filter string per filtergraph segment of this stage.
        """
+
+
class TimeStretchFilter(Filter):
    """Filter that applies time stretching to match BPM using rubberband."""

    output_fadeout_label: str = "fadeout_stretched"
    output_fadein_label: str = "fadein_unchanged"

    def __init__(
        self,
        stretch_ratio: float,
    ) -> None:
        """Initialize time stretch filter.

        Args:
            stretch_ratio: Tempo factor handed to rubberband's ``tempo`` option.
        """
        self.stretch_ratio = stretch_ratio

    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
        """Create FFmpeg filters that time-stretch the outgoing track.

        The outgoing (fadeout) stream is run through ``rubberband`` with one
        constant tempo factor; the incoming (fadein) stream is only relabelled
        so the next stage finds both streams under this filter's output labels.
        """
        return [
            # Constant-ratio stretch of the outgoing track
            f"{input_fadeout_label}rubberband=tempo={self.stretch_ratio:.6f}:transients=mixed:detector=soft:pitchq=quality"
            f"[{self.output_fadeout_label}]",
            # Incoming track passes through untouched
            f"{input_fadein_label}anull[{self.output_fadein_label}]",  # codespell:ignore anull
        ]

    def __repr__(self) -> str:
        """Return string representation of TimeStretchFilter."""
        return f"TimeStretch(ratio={self.stretch_ratio:.2f})"
+
+
class TrimFilter(Filter):
    """Filter that trims incoming track to align with downbeats."""

    output_fadeout_label: str = "fadeout_beatalign"
    output_fadein_label: str = "fadein_beatalign"

    def __init__(self, fadein_start_pos: float) -> None:
        """Initialize trim filter.

        Args:
            fadein_start_pos: Position in seconds to trim the incoming track to
        """
        self.fadein_start_pos = fadein_start_pos

    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
        """Trim the incoming track to align with downbeats.

        The outgoing stream is only relabelled; the incoming stream is cut at
        ``fadein_start_pos`` and its timestamps are reset so it starts at zero.
        """
        return [
            f"{input_fadeout_label}anull[{self.output_fadeout_label}]",  # codespell:ignore anull
            f"{input_fadein_label}atrim=start={self.fadein_start_pos},asetpts=PTS-STARTPTS[{self.output_fadein_label}]",
        ]

    def __repr__(self) -> str:
        """Return string representation of TrimFilter."""
        return f"Trim(trim={self.fadein_start_pos:.2f}s)"
+
+
class FrequencySweepFilter(Filter):
    """Filter that creates frequency sweep effects (lowpass/highpass transitions).

    The processed stream is split into an unfiltered ("dry") and a filtered
    ("wet") copy; time-varying volume ramps blend between the two before they
    are mixed back together.  The other stream is passed through unchanged.
    """

    output_fadeout_label: str = "frequency_sweep"
    output_fadein_label: str = "frequency_sweep"

    def __init__(
        self,
        sweep_type: str,
        target_freq: int,
        duration: float,
        start_time: float,
        sweep_direction: str,
        poles: int,
        curve_type: str,
        stream_type: str = "fadeout",
    ):
        """Initialize frequency sweep filter.

        Args:
            sweep_type: 'lowpass' or 'highpass'
            target_freq: Target frequency for the filter
            duration: Duration of the sweep in seconds
            start_time: When to start the sweep
            sweep_direction: 'fade_in' (unfiltered->filtered) or 'fade_out' (filtered->unfiltered)
            poles: Number of poles for the filter
            curve_type: 'linear', 'exponential', or 'logarithmic'
            stream_type: 'fadeout' or 'fadein' - which stream to process
        """
        self.sweep_type = sweep_type
        self.target_freq = target_freq
        self.duration = duration
        self.start_time = start_time
        self.sweep_direction = sweep_direction
        self.poles = poles
        self.curve_type = curve_type
        self.stream_type = stream_type

        # Instance-level output labels: the processed stream is named after
        # the sweep type, the untouched stream gets a passthrough label.
        processes_fadeout = stream_type == "fadeout"
        self.output_fadeout_label = (
            f"fadeout_{sweep_type}" if processes_fadeout else "fadeout_passthrough"
        )
        self.output_fadein_label = (
            "fadein_passthrough" if processes_fadeout else f"fadein_{sweep_type}"
        )

    def _generate_volume_expr(self, start: float, dur: float, direction: str, curve: str) -> str:
        """Build the argument string for FFmpeg's ``volume`` filter.

        Args:
            start: Time at which the ramp begins.
            dur: Length of the ramp; also used as the divisor when normalizing.
            direction: 'up' ramps 0 -> 1, anything else ramps 1 -> 0.
            curve: 'exponential' squares the ramp, 'logarithmic' takes its
                square root, anything else leaves it linear.

        Returns:
            A quoted expression followed by ``:eval=frame``.
        """
        # Time since `start`, clamped to [0, dur] and normalized to 0-1
        progress = f"min(max(t-{start},0),{dur})/{dur}"

        if curve == "exponential":
            ramp = f"pow({progress},2)"
        elif curve == "logarithmic":
            ramp = f"sqrt({progress})"
        else:
            ramp = progress

        expression = ramp if direction == "up" else f"1-{ramp}"
        return f"'{expression}':eval=frame"

    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
        """Generate FFmpeg filters for frequency sweep effect."""
        # Decide which stream is swept and which one is merely relabelled
        if self.stream_type == "fadeout":
            swept_in, swept_out = input_fadeout_label, self.output_fadeout_label
            bypass_in, bypass_out = input_fadein_label, self.output_fadein_label
        else:
            swept_in, swept_out = input_fadein_label, self.output_fadein_label
            bypass_in, bypass_out = input_fadeout_label, self.output_fadeout_label

        # Intermediate pad names, all derived from the swept output label
        dry = f"{swept_out}_orig"
        wet_src = f"{swept_out}_to{self.sweep_type[:2]}"
        wet = f"{swept_out}_filtered"
        dry_faded = f"{swept_out}_orig_faded"
        wet_faded = f"{swept_out}_filtered_faded"

        # 'fade_in' goes dry -> wet, everything else goes wet -> dry
        if self.sweep_direction == "fade_in":
            dry_ramp, wet_ramp = "down", "up"
        else:
            dry_ramp, wet_ramp = "up", "down"

        dry_volume = self._generate_volume_expr(
            self.start_time, self.duration, dry_ramp, self.curve_type
        )
        wet_volume = self._generate_volume_expr(
            self.start_time, self.duration, wet_ramp, self.curve_type
        )

        return [
            # Pass through the other stream unchanged
            f"{bypass_in}anull[{bypass_out}]",  # codespell:ignore anull
            # Split input into two paths
            f"{swept_in}asplit=2[{dry}][{wet_src}]",
            # Apply frequency filter to one path
            f"[{wet_src}]{self.sweep_type}=f={self.target_freq}:poles={self.poles}[{wet}]",
            # Apply time-varying volume to original path
            f"[{dry}]volume={dry_volume}[{dry_faded}]",
            # Apply time-varying volume to filtered path
            f"[{wet}]volume={wet_volume}[{wet_faded}]",
            # Mix the two paths together
            f"[{dry_faded}][{wet_faded}]amix=inputs=2:duration=longest:normalize=0[{swept_out}]",
        ]

    def __repr__(self) -> str:
        """Return string representation of FrequencySweepFilter."""
        return f"FreqSweep({self.sweep_type}@{self.target_freq}Hz)"
+
+
class CrossfadeFilter(Filter):
    """Filter that applies the final crossfade between fadeout and fadein streams."""

    output_fadeout_label: str = "crossfade"
    output_fadein_label: str = "crossfade"

    def __init__(self, crossfade_duration: float):
        """Initialize crossfade filter."""
        self.crossfade_duration = crossfade_duration

    def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]:
        """Apply the acrossfade filter.

        The outgoing stream is listed first, the incoming stream second.  The
        emitted filter string carries no output label.
        """
        sources = input_fadeout_label + input_fadein_label
        return [f"{sources}acrossfade=d={self.crossfade_duration}"]

    def __repr__(self) -> str:
        """Return string representation of CrossfadeFilter."""
        return f"Crossfade(d={self.crossfade_duration:.1f}s)"
+
+
+class SmartFade(ABC):
+ """Abstract base class for Smart Fades."""
+
+ filters: list[Filter]
- async def _apply_smart_crossfade(
+ def __init__(self) -> None:
+ """Initialize SmartFade base class."""
+ self.logger = logging.getLogger(__name__)
+ self.filters = []
+
+ @abstractmethod
+ def _build(self) -> None:
+ """Build the smart fades filter chain."""
+ ...
+
+ def _get_ffmpeg_filters(
+ self,
+ input_fadein_label: str = "[1]",
+ input_fadeout_label: str = "[0]",
+ ) -> list[str]:
+ """Get FFmpeg filters for smart fades."""
+ if not self.filters:
+ self._build()
+ filters = []
+ _cur_fadein_label = input_fadein_label
+ _cur_fadeout_label = input_fadeout_label
+ for audio_filter in self.filters:
+ filter_strings = audio_filter.apply(_cur_fadein_label, _cur_fadeout_label)
+ filters.extend(filter_strings)
+ _cur_fadein_label = f"[{audio_filter.output_fadein_label}]"
+ _cur_fadeout_label = f"[{audio_filter.output_fadeout_label}]"
+ return filters
+
+ async def apply(
self,
- fade_out_analysis: SmartFadesAnalysis,
- fade_in_analysis: SmartFadesAnalysis,
fade_out_part: bytes,
fade_in_part: bytes,
pcm_format: AudioFormat,
) -> bytes:
- """Apply smart crossfade with beat-perfect timing and adaptive filtering."""
+ """Apply the smart fade to the given PCM audio parts."""
# Write the fade_out_part to a temporary file
fadeout_filename = f"/tmp/{shortuuid.random(20)}.pcm" # noqa: S108
async with aiofiles.open(fadeout_filename, "wb") as outfile:
"-i",
"-",
]
-
- smart_fade_filters = self._create_enhanced_smart_fade_filters(
- fade_out_analysis,
- fade_in_analysis,
+ smart_fade_filters = self._get_ffmpeg_filters()
+ self.logger.debug(
+ "Applying smartfade: %s",
+ self,
)
args.extend(
[
"-",
]
)
-
+ self.logger.debug("FFmpeg smartfade args: %s", " ".join(args))
self.logger.log(VERBOSE_LOG_LEVEL, "FFmpeg command args: %s", " ".join(args))
# Execute the enhanced smart fade with full buffer
stderr_msg = stderr.decode() if stderr else "(no stderr output)"
raise RuntimeError(f"Smart crossfade failed. FFmpeg stderr: {stderr_msg}")
- # SMART FADE HELPER METHODS
- def _create_enhanced_smart_fade_filters(
- self,
- fade_out_analysis: SmartFadesAnalysis,
- fade_in_analysis: SmartFadesAnalysis,
- ) -> list[str]:
- """Create smart fade filters with perfect timing and adaptive filtering."""
- # Calculate optimal crossfade bars that fit in available buffer
- crossfade_bars = self._calculate_optimal_crossfade_bars(fade_out_analysis, fade_in_analysis)
+ def __repr__(self) -> str:
+ """Return string representation of SmartFade showing the filter chain."""
+ if not self.filters:
+ return f"<{self.__class__.__name__}: 0 filters>"
- # Calculate beat positions for the selected bar count
- fadeout_start_pos, fadein_start_pos = self._calculate_optimal_fade_timing(
- fade_out_analysis, fade_in_analysis, crossfade_bars
- )
+ chain = " → ".join(repr(f) for f in self.filters)
+ return f"<{self.__class__.__name__}: {len(self.filters)} filters> {chain}"
- # Log the final selected timing
- if fadeout_start_pos is not None and fadein_start_pos is not None:
- self.logger.debug(
- "Beat timing selected: fadeout=%.2fs, fadein=%.2fs (%d bars)",
- fadeout_start_pos,
- fadein_start_pos,
- crossfade_bars,
- )
- filters: list[str] = []
+class SmartCrossFade(SmartFade):
+ """Smart fades class that implements a Smart Fade mode."""
- # Calculate initial crossfade duration (may be adjusted later for downbeat alignment)
- initial_crossfade_duration = self._calculate_crossfade_duration(
- crossfade_bars=crossfade_bars,
- fade_in_analysis=fade_in_analysis,
- )
+ # Only apply time stretching if the BPM difference is at most this percentage
+ time_stretch_bpm_percentage_threshold: float = 5.0
+
+ def __init__(
+ self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis
+ ) -> None:
+ """Initialize SmartFades with analysis data.
+
+ Args:
+ fade_out_analysis: Analysis data for the outgoing track
+ fade_in_analysis: Analysis data for the incoming track
+ logger: Optional logger for debug output
+ """
+ self.fade_out_analysis = fade_out_analysis
+ self.fade_in_analysis = fade_in_analysis
+ super().__init__()
+
+ def _build(self) -> None:
+ """Build the smart fades filter chain."""
+ # Calculate tempo factor for time stretching
+ bpm_ratio = self.fade_in_analysis.bpm / self.fade_out_analysis.bpm
+ bpm_diff_percent = abs(1.0 - bpm_ratio) * 100
- # Create time stretch filters - needs to know crossfade duration to complete
- # tempo ramping before the crossfade starts
- time_stretch_filters, tempo_factor = self._create_time_stretch_filters(
- fade_out_analysis=fade_out_analysis,
- fade_in_analysis=fade_in_analysis,
- crossfade_bars=crossfade_bars,
- crossfade_duration=initial_crossfade_duration,
+ # Extrapolate downbeats for better bar calculation
+ self.extrapolated_fadeout_downbeats = extrapolate_downbeats(
+ self.fade_out_analysis.downbeats,
+ tempo_factor=1.0,
+ bpm=self.fade_out_analysis.bpm,
)
- filters.extend(time_stretch_filters)
- crossfade_duration = initial_crossfade_duration
+ # Calculate optimal crossfade bars that fit in available buffer
+ crossfade_bars = self._calculate_optimal_crossfade_bars()
- # Check if we would have enough audio after beat alignment for the crossfade
+ # Calculate beat positions for the selected bar count
+ fadein_start_pos = self._calculate_optimal_fade_timing(crossfade_bars)
+
+ # Calculate initial crossfade duration (may be adjusted later for downbeat alignment)
+ crossfade_duration = self._calculate_crossfade_duration(crossfade_bars=crossfade_bars)
+
+ # Add time stretch filter if needed
if (
- fadein_start_pos is not None
- and fadein_start_pos + crossfade_duration > SMART_CROSSFADE_DURATION
+ 0.1 < bpm_diff_percent <= self.time_stretch_bpm_percentage_threshold
+ and crossfade_bars > 4
):
+ self.filters.append(TimeStretchFilter(stretch_ratio=bpm_ratio))
+ # Re-extrapolate downbeats with actual tempo factor for time-stretched audio
+ self.extrapolated_fadeout_downbeats = extrapolate_downbeats(
+ self.fade_out_analysis.downbeats,
+ tempo_factor=bpm_ratio,
+ bpm=self.fade_out_analysis.bpm,
+ )
+
+ # Check if we would have enough audio after beat alignment for the crossfade
+ if fadein_start_pos and fadein_start_pos + crossfade_duration <= SMART_CROSSFADE_DURATION:
+ self.filters.append(TrimFilter(fadein_start_pos=fadein_start_pos))
+ else:
self.logger.debug(
"Skipping beat alignment: not enough audio after trim (%.1fs + %.1fs > %.1fs)",
fadein_start_pos,
crossfade_duration,
SMART_CROSSFADE_DURATION,
)
- # Skip beat alignment
- fadein_start_pos = None
# Adjust crossfade duration to align with outgoing track's downbeats
- # This prevents echo-ey sounds when both tracks have kicks during the crossfade
crossfade_duration = self._adjust_crossfade_to_downbeats(
- fade_out_analysis=fade_out_analysis,
crossfade_duration=crossfade_duration,
fadein_start_pos=fadein_start_pos,
- tempo_factor=tempo_factor,
)
- beat_align_filters = self._trim_incoming_track_to_downbeat(
- fadein_start_pos=fadein_start_pos,
- fadeout_input_label="[fadeout_stretched]",
- fadein_input_label="[1]",
- )
- filters.extend(beat_align_filters)
+ # 90 BPM -> 1500Hz, 140 BPM -> 2500Hz
+ avg_bpm = (self.fade_out_analysis.bpm + self.fade_in_analysis.bpm) / 2
+ crossover_freq = int(np.clip(1500 + (avg_bpm - 90) * 20, 1500, 2500))
- self.logger.debug(
- "Smart fade: out_bpm=%.1f, in_bpm=%.1f, %d bars, crossfade: %.2fs%s",
- fade_out_analysis.bpm,
- fade_in_analysis.bpm,
- crossfade_bars,
- crossfade_duration,
- ", beat-aligned" if fadein_start_pos else "",
- )
- frequency_filters = self._apply_eq_filters(
- fade_out_analysis=fade_out_analysis,
- fade_in_analysis=fade_in_analysis,
- fade_out_label="[fadeout_beatalign]",
- fade_in_label="[fadein_beatalign]",
- crossfade_duration=crossfade_duration,
- crossfade_bars=crossfade_bars,
+ # Adjust for BPM mismatch
+ if abs(bpm_ratio - 1.0) > 0.3:
+ crossover_freq = int(crossover_freq * 0.85)
+
+ # For shorter fades, use exp/exp curves to avoid abruptness
+ if crossfade_bars < 8:
+ fadeout_curve = "exponential"
+ fadein_curve = "exponential"
+ # For long fades, use log/linear curves
+ else:
+ # Use logarithmic curve to give the next track more space
+ fadeout_curve = "logarithmic"
+ # Use linear curve for transition, predictable and not too abrupt
+ fadein_curve = "linear"
+
+ # Create lowpass filter on the outgoing track (unfiltered → low-pass)
+ # Extended lowpass effect to gradually remove bass frequencies
+ fadeout_eq_duration = min(max(crossfade_duration * 2.5, 8.0), SMART_CROSSFADE_DURATION)
+ # The crossfade always happens at the END of the buffer
+ fadeout_eq_start = max(0, SMART_CROSSFADE_DURATION - fadeout_eq_duration)
+ fadeout_sweep = FrequencySweepFilter(
+ sweep_type="lowpass",
+ target_freq=crossover_freq,
+ duration=fadeout_eq_duration,
+ start_time=fadeout_eq_start,
+ sweep_direction="fade_in",
+ poles=1,
+ curve_type=fadeout_curve,
+ stream_type="fadeout",
)
- filters.extend(frequency_filters)
+ self.filters.append(fadeout_sweep)
- # Apply linear crossfade for now since we already use EQ sweeps for smoothness
- filters.append(f"[fadeout_eq][fadein_eq]acrossfade=d={crossfade_duration}")
+ # Create high pass filter on the incoming track (high-pass → unfiltered)
+ # Quicker highpass removal to avoid lingering vocals after crossfade
+ fadein_eq_duration = crossfade_duration / 1.5
+ fadein_sweep = FrequencySweepFilter(
+ sweep_type="highpass",
+ target_freq=crossover_freq,
+ duration=fadein_eq_duration,
+ start_time=0,
+ sweep_direction="fade_out",
+ poles=1,
+ curve_type=fadein_curve,
+ stream_type="fadein",
+ )
+ self.filters.append(fadein_sweep)
- return filters
+ # Add final crossfade filter
+ crossfade_filter = CrossfadeFilter(crossfade_duration=crossfade_duration)
+ self.filters.append(crossfade_filter)
- def _calculate_crossfade_duration(
- self,
- crossfade_bars: int,
- fade_in_analysis: SmartFadesAnalysis,
- ) -> float:
+ def _calculate_crossfade_duration(self, crossfade_bars: int) -> float:
"""Calculate final crossfade duration based on musical bars and BPM."""
# Calculate crossfade duration based on incoming track's BPM
- # This ensures a musically consistent crossfade length regardless of beat positions
beats_per_bar = 4
- seconds_per_beat = 60.0 / fade_in_analysis.bpm
+ seconds_per_beat = 60.0 / self.fade_in_analysis.bpm
musical_duration = crossfade_bars * beats_per_bar * seconds_per_beat
# Apply buffer constraint
return actual_duration
- def _extrapolate_downbeats(
- self,
- downbeats: npt.NDArray[np.float64],
- tempo_factor: float,
- buffer_size: float = SMART_CROSSFADE_DURATION,
- ) -> npt.NDArray[np.float64]:
- """Extrapolate downbeats based on actual intervals when detection is incomplete.
-
- This is needed when we want to perform beat alignment in an 'atmospheric' outro
- that does not have any detected downbeats.
- """
- if len(downbeats) < 3:
- # Need at least 3 downbeats to reliably calculate interval
- return downbeats / tempo_factor
+ def _calculate_optimal_crossfade_bars(self) -> int:
+ """Calculate optimal crossfade bars that fit in available buffer."""
+ bpm_in = self.fade_in_analysis.bpm
+ bpm_out = self.fade_out_analysis.bpm
+ bpm_diff_percent = abs(1.0 - bpm_in / bpm_out) * 100
- # Adjust detected downbeats for time stretching first
- adjusted_downbeats = downbeats / tempo_factor
- last_downbeat = adjusted_downbeats[-1]
+ # Calculate ideal bars based on BPM compatibility
+ ideal_bars = 10 if bpm_diff_percent <= self.time_stretch_bpm_percentage_threshold else 6
- # If the last downbeat is close to the buffer end, no extrapolation needed
- if last_downbeat >= buffer_size - 5:
- return adjusted_downbeats
+ # Reduce bars until it fits in the fadein buffer
+ for bars in [ideal_bars, 8, 6, 4, 2, 1]:
+ if bars > ideal_bars:
+ continue
- # Calculate intervals from ORIGINAL downbeats (before time stretching)
- intervals = np.diff(downbeats)
- median_interval = float(np.median(intervals))
- std_interval = float(np.std(intervals))
+ fadein_start_pos = self._calculate_optimal_fade_timing(bars)
+ if fadein_start_pos is None:
+ continue
- # Only extrapolate if intervals are consistent (low standard deviation)
- if std_interval > 0.2:
- self.logger.debug(
- "Downbeat intervals too inconsistent (std=%.3fs) for extrapolation",
- std_interval,
- )
- return adjusted_downbeats
+ # Calculate what the duration would be
+ test_duration = self._calculate_crossfade_duration(crossfade_bars=bars)
- # Adjust the interval for time stretching
- # When slowing down (tempo_factor < 1.0), intervals get longer
- adjusted_interval = median_interval / tempo_factor
+ # Check if it fits in fadein buffer
+ fadein_buffer = SMART_CROSSFADE_DURATION - fadein_start_pos
+ if test_duration <= fadein_buffer:
+ if bars < ideal_bars:
+ self.logger.debug(
+ "Reduced crossfade from %d to %d bars (fadein buffer=%.1fs, needed=%.1fs)",
+ ideal_bars,
+ bars,
+ fadein_buffer,
+ test_duration,
+ )
+ return bars
- # Extrapolate forward from last adjusted downbeat using adjusted interval
- extrapolated = []
- current_pos = last_downbeat + adjusted_interval
- max_extrapolation_distance = 25.0 # Don't extrapolate more than 25s
+ # Fall back to 1 bar if nothing else fits
+ return 1
- while (
- current_pos < buffer_size
- and (current_pos - last_downbeat) <= max_extrapolation_distance
- ):
- extrapolated.append(current_pos)
- current_pos += adjusted_interval
+ def _calculate_optimal_fade_timing(self, crossfade_bars: int) -> float | None:
+ """Calculate beat positions for alignment."""
+ beats_per_bar = 4
- if extrapolated:
- self.logger.debug(
- "Extrapolated %d downbeats (adjusted_interval=%.3fs, original=%.3fs) "
- "from %.2fs to %.2fs",
- len(extrapolated),
- adjusted_interval,
- median_interval,
- last_downbeat,
- extrapolated[-1],
- )
- # Combine adjusted detected downbeats and extrapolated downbeats
- return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
+ def calculate_beat_positions(
+ fade_out_beats: npt.NDArray[np.float64],
+ fade_in_beats: npt.NDArray[np.float64],
+ num_beats: int,
+ ) -> float | None:
+ """Calculate start positions from beat arrays."""
+ if len(fade_out_beats) < num_beats or len(fade_in_beats) < num_beats:
+ return None
- return adjusted_downbeats
+ fade_in_slice = fade_in_beats[:num_beats]
+ return float(fade_in_slice[0])
+
+ # Try downbeats first for most musical timing
+ downbeat_positions = calculate_beat_positions(
+ self.extrapolated_fadeout_downbeats, self.fade_in_analysis.downbeats, crossfade_bars
+ )
+ if downbeat_positions:
+ return downbeat_positions
+
+ # Try regular beats if downbeats insufficient
+ required_beats = crossfade_bars * beats_per_bar
+ beat_positions = calculate_beat_positions(
+ self.fade_out_analysis.beats, self.fade_in_analysis.beats, required_beats
+ )
+ if beat_positions:
+ return beat_positions
+
+ # Fallback: No beat alignment possible
+ self.logger.debug("No beat alignment possible (insufficient beats)")
+ return None
def _adjust_crossfade_to_downbeats(
self,
- fade_out_analysis: SmartFadesAnalysis,
crossfade_duration: float,
fadein_start_pos: float | None,
- tempo_factor: float,
) -> float:
- """Adjust crossfade duration to align with outgoing track's downbeats.
-
- This ensures the crossfade starts on a downbeat of the outgoing track,
- preventing echo-ey sounds when both tracks have kicks during the crossfade.
-
- The downbeat positions are adjusted for time stretching - when tempo_factor < 1.0
- (slowing down), beats take longer to reach their position in the stretched audio.
- """
+ """Adjust crossfade duration to align with outgoing track's downbeats."""
# If we don't have downbeats or beat alignment is disabled, return original duration
- if len(fade_out_analysis.downbeats) == 0 or fadein_start_pos is None:
+ if len(self.extrapolated_fadeout_downbeats) == 0 or fadein_start_pos is None:
return crossfade_duration
- # Extrapolate downbeats if needed (e.g., when beat detection is incomplete)
- # This returns downbeats already adjusted for time stretching
- adjusted_downbeats = self._extrapolate_downbeats(
- fade_out_analysis.downbeats, tempo_factor=tempo_factor
- )
-
# Calculate where the crossfade would start in the buffer
ideal_start_pos = SMART_CROSSFADE_DURATION - crossfade_duration
- # Debug: Show all downbeats and the ideal position
+ # Debug logging
self.logger.debug(
"Downbeat adjustment - ideal_start=%.2fs (buffer=%.1fs - crossfade=%.2fs), "
- "fadein_start=%.2fs, tempo_factor=%.4f",
+ "fadein_start=%.2fs",
ideal_start_pos,
SMART_CROSSFADE_DURATION,
crossfade_duration,
fadein_start_pos,
- tempo_factor,
)
# Find the closest downbeats (earlier and later)
earlier_downbeat = None
later_downbeat = None
- for downbeat in adjusted_downbeats:
+ for downbeat in self.extrapolated_fadeout_downbeats:
if downbeat <= ideal_start_pos:
earlier_downbeat = downbeat
elif downbeat > ideal_start_pos and later_downbeat is None:
# Try earlier downbeat first (longer crossfade)
if earlier_downbeat is not None:
adjusted_duration = float(SMART_CROSSFADE_DURATION - earlier_downbeat)
- # Check if this fits in the buffer
if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
if abs(adjusted_duration - crossfade_duration) > 0.1:
self.logger.debug(
# Try later downbeat (shorter crossfade)
if later_downbeat is not None:
adjusted_duration = float(SMART_CROSSFADE_DURATION - later_downbeat)
- # Check if this fits in the buffer
if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
if abs(adjusted_duration - crossfade_duration) > 0.1:
self.logger.debug(
)
return crossfade_duration
- def _calculate_optimal_crossfade_bars(
- self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis
- ) -> int:
- """Calculate optimal crossfade bars that fit in available buffer."""
- bpm_in = fade_in_analysis.bpm
- bpm_out = fade_out_analysis.bpm
- bpm_diff_percent = abs(1.0 - bpm_in / bpm_out) * 100
-
- # Calculate ideal bars based on BPM compatibility. We link this to time stretching
- # so we avoid extreme tempo changes over short fades.
- ideal_bars = 10 if bpm_diff_percent <= TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD else 6
-
- # We could encounter songs that have a long athmospheric intro without any downbeats
- # In those cases, we need to reduce the bars until it fits in the fadein buffer.
- for bars in [ideal_bars, 8, 6, 4, 2, 1]:
- if bars > ideal_bars:
- continue # Skip bars longer than optimal
-
- fadeout_start_pos, fadein_start_pos = self._calculate_optimal_fade_timing(
- fade_out_analysis, fade_in_analysis, bars
- )
- if fadeout_start_pos is None or fadein_start_pos is None:
- continue
-
- # Calculate what the duration would be
- test_duration = self._calculate_crossfade_duration(
- crossfade_bars=bars,
- fade_in_analysis=fade_in_analysis,
- )
-
- # Check if it fits in fadein buffer
- fadein_buffer = SMART_CROSSFADE_DURATION - fadein_start_pos
- if test_duration <= fadein_buffer:
- if bars < ideal_bars:
- self.logger.debug(
- "Reduced crossfade from %d to %d bars (fadein buffer=%.1fs, needed=%.1fs)",
- ideal_bars,
- bars,
- fadein_buffer,
- test_duration,
- )
- return bars
-
- # Fall back to 1 bar if nothing else fits
- return 1
-
- def _calculate_optimal_fade_timing(
- self,
- fade_out_analysis: SmartFadesAnalysis,
- fade_in_analysis: SmartFadesAnalysis,
- crossfade_bars: int,
- ) -> tuple[float | None, float | None]:
- """Calculate beat positions for alignment."""
- beats_per_bar = 4
-
- # Helper function to calculate beat positions from beat arrays
- def calculate_beat_positions(
- fade_out_beats: npt.NDArray[np.float64],
- fade_in_beats: npt.NDArray[np.float64],
- num_beats: int,
- ) -> tuple[float, float] | None:
- """Calculate start positions from beat arrays with phantom downbeat support."""
- if len(fade_out_beats) < num_beats or len(fade_in_beats) < num_beats:
- return None
- fade_out_slice = fade_out_beats[-num_beats:]
+class StandardCrossFade(SmartFade):
+ """Standard crossfade class that implements a standard crossfade mode.
+
+ Only the tail of the outgoing part and the head of the incoming part are crossfaded;
+ the remainder of both parts is passed through unchanged.
+ """
- # For fadein, find the earliest downbeat that fits in buffer
- fade_in_slice = fade_in_beats[:num_beats]
- fadein_start_pos = fade_in_slice[0]
+ def __init__(self, crossfade_duration: float = 10.0) -> None:
+ """Initialize StandardCrossFade with crossfade duration."""
+ # Set the duration before calling the base initializer
+ # (presumably the base class calls _build(), which reads it - TODO confirm)
+ self.crossfade_duration = crossfade_duration
+ super().__init__()
- fadeout_start_pos = fade_out_slice[0]
- return fadeout_start_pos, fadein_start_pos
+ def _build(self) -> None:
+ """Build the standard crossfade filter chain."""
+ self.filters = [
+ CrossfadeFilter(crossfade_duration=self.crossfade_duration),
+ ]
- # Try downbeats first for most musical timing
- downbeat_positions = calculate_beat_positions(
- fade_out_analysis.downbeats, fade_in_analysis.downbeats, crossfade_bars
+ async def apply(
+ self, fade_out_part: bytes, fade_in_part: bytes, pcm_format: AudioFormat
+ ) -> bytes:
+ """Apply the standard crossfade to the given PCM audio parts.
+
+ Returns the untouched head of ``fade_out_part``, the crossfaded overlap and the
+ untouched tail of ``fade_in_part``, concatenated. As a side effect this updates
+ ``self.crossfade_duration`` to the duration that is actually available in the parts.
+ """
+ # We need to override the default apply here, since standard crossfade only needs to be
+ # applied to the overlapping parts, not the full buffers.
+ crossfade_size = int(pcm_format.pcm_sample_size * self.crossfade_duration)
+ # NOTE(review): a crossfade_size of 0 would make [:-0] empty and [-0:] the whole
+ # buffer - presumably callers never pass a zero duration; confirm.
+ # Pre-crossfade: outgoing track minus the crossfaded portion
+ pre_crossfade = fade_out_part[:-crossfade_size]
+ # Post-crossfade: incoming track minus the crossfaded portion
+ post_crossfade = fade_in_part[crossfade_size:]
+ # Adjust portions to exact crossfade size
+ adjusted_fade_in_part = fade_in_part[:crossfade_size]
+ adjusted_fade_out_part = fade_out_part[-crossfade_size:]
+ # Adjust the duration to match actual sizes
+ # (a part shorter than crossfade_size yields a shorter slice, so take the minimum)
+ self.crossfade_duration = min(
+ len(adjusted_fade_in_part) / pcm_format.pcm_sample_size,
+ len(adjusted_fade_out_part) / pcm_format.pcm_sample_size,
)
- if downbeat_positions:
- return downbeat_positions
-
- # Try regular beats if downbeats insufficient
- required_beats = crossfade_bars * beats_per_bar
- beat_positions = calculate_beat_positions(
- fade_out_analysis.beats, fade_in_analysis.beats, required_beats
+ # Crossfaded portion: user's configured duration
+ crossfaded_section = await super().apply(
+ adjusted_fade_out_part, adjusted_fade_in_part, pcm_format
)
- if beat_positions:
- return beat_positions
-
- # Fallback: No beat alignment possible
- self.logger.debug("No beat alignment possible (insufficient beats)")
- return None, None
-
- def _create_frequency_sweep_filter(
- self,
- input_label: str,
- output_label: str,
- sweep_type: str, # 'lowpass' or 'highpass'
- target_freq: int,
- duration: float,
- start_time: float = 0.0,
- sweep_direction: str = "fade_in", # 'fade_in' or 'fade_out'
- poles: int = 2,
- curve_type: str = "linear", # 'linear', 'exponential', 'logarithmic'
- ) -> list[str]:
- """Generate FFmpeg filters for frequency sweep effect."""
- orig_label = f"{output_label}_orig"
- filter_label = f"{output_label}_to{sweep_type[:2]}"
- filtered_label = f"{output_label}_filtered"
- orig_faded_label = f"{output_label}_orig_faded"
- filtered_faded_label = f"{output_label}_filtered_faded"
+ # Full result: everything concatenated
+ return pre_crossfade + crossfaded_section + post_crossfade
- # Generate volume expression based on curve type
- def generate_volume_expr(start: float, dur: float, direction: str, curve: str) -> str:
- t_expr = f"t-{start}" # Time relative to start
- norm_t = f"min(max({t_expr},0),{dur})/{dur}" # Normalized 0-1
-
- if curve == "exponential":
- # Exponential curve for smoother transitions
- if direction == "up":
- return f"'pow({norm_t},2)':eval=frame"
- else:
- return f"'1-pow({norm_t},2)':eval=frame"
- elif curve == "logarithmic":
- # Logarithmic curve for more aggressive initial change
- if direction == "up":
- return f"'sqrt({norm_t})':eval=frame"
- else:
- return f"'1-sqrt({norm_t})':eval=frame"
- elif direction == "up":
- return f"'{norm_t}':eval=frame"
- else:
- return f"'1-{norm_t}':eval=frame"
- # Determine volume ramp directions based on sweep direction
- if sweep_direction == "fade_in":
- # Fade from dry to wet (unfiltered to filtered)
- orig_direction = "down"
- filter_direction = "up"
- else: # fade_out
- # Fade from wet to dry (filtered to unfiltered)
- orig_direction = "up"
- filter_direction = "down"
+#############################
+# SMART FADES MIXER LOGIC
+#############################
+class SmartFadesMixer:
+ """Smart fades mixer class that mixes tracks based on analysis data."""
- # Build filter chain
- return [
- # Split input into two paths
- f"{input_label}asplit=2[{orig_label}][{filter_label}]",
- # Apply frequency filter to one path
- f"[{filter_label}]{sweep_type}=f={target_freq}:poles={poles}[{filtered_label}]",
- # Apply time-varying volume to original path
- (
- f"[{orig_label}]volume="
- f"{generate_volume_expr(start_time, duration, orig_direction, curve_type)}"
- f"[{orig_faded_label}]"
- ),
- # Apply time-varying volume to filtered path
- (
- f"[{filtered_label}]volume="
- f"{generate_volume_expr(start_time, duration, filter_direction, curve_type)}"
- f"[{filtered_faded_label}]"
- ),
- # Mix the two paths together
- (
- f"[{orig_faded_label}][{filtered_faded_label}]"
- f"amix=inputs=2:duration=longest:normalize=0[{output_label}]"
- ),
- ]
+ def __init__(self, mass: MusicAssistant) -> None:
+ """Initialize smart fades mixer."""
+ self.mass = mass
+ self.logger = logging.getLogger(__name__)
+ # TODO: Refactor into stream (or metadata) controller after we have split the controllers
+ self.analyzer = SmartFadesAnalyzer(mass)
- def _trim_incoming_track_to_downbeat(
+ async def mix(
self,
- fadein_start_pos: float | None,
- fadeout_input_label: str = "[0]",
- fadein_input_label: str = "[1]",
- ) -> list[str]:
- """Perform beat alignment preprocessing.
-
- The incoming track is trimmed to its first downbeat position.
- No adjustment is needed for time stretching since the incoming track
- is not stretched - it's already at the target BPM.
- """
- # Just relabel in case we cannot perform beat alignment
- if fadein_start_pos is None:
- return [
- f"{fadeout_input_label}anull[fadeout_beatalign]", # codespell:ignore anull
- f"{fadein_input_label}anull[fadein_beatalign]", # codespell:ignore anull
- ]
-
- # Trim incoming track to start at first downbeat position
- return [
- f"{fadeout_input_label}anull[fadeout_beatalign]", # codespell:ignore anull
- f"{fadein_input_label}atrim=start={fadein_start_pos},asetpts=PTS-STARTPTS[fadein_beatalign]",
- ]
+ fade_in_part: bytes,
+ fade_out_part: bytes,
+ fade_in_streamdetails: StreamDetails,
+ fade_out_streamdetails: StreamDetails,
+ pcm_format: AudioFormat,
+ standard_crossfade_duration: int = 10,
+ mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
+ ) -> bytes:
+ """Apply crossfade with internal state management and smart/standard fallback logic.
+
+ Note the argument order: the incoming (fade-in) part comes first here, while the
+ fade classes take the outgoing (fade-out) part first.
+
+ Flow:
+ - DISABLED: both parts are concatenated without any processing.
+ - Otherwise silence is stripped from the end of the outgoing part and from the
+ start of the incoming part, and both are re-aligned to a frame boundary.
+ - STANDARD_CROSSFADE: a StandardCrossFade of ``standard_crossfade_duration`` is applied.
+ - SMART_CROSSFADE: stored analysis is used when available, otherwise the parts are
+ analyzed. The smart crossfade is only applied when both analyses exist with a
+ confidence above 0.3; if it is skipped or raises, the standard crossfade is used.
+ """
+ if mode == SmartFadesMode.DISABLED:
+ # No crossfade, just concatenate
+ # Note that this should not happen since we check this before calling mix()
+ # but just to be sure...
+ return fade_out_part + fade_in_part
- def _create_time_stretch_filters(
- self,
- fade_out_analysis: SmartFadesAnalysis,
- fade_in_analysis: SmartFadesAnalysis,
- crossfade_bars: int,
- crossfade_duration: float,
- ) -> tuple[list[str], float]:
- """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM.
+ # strip silence from end of audio of fade_out_part
+ fade_out_part = await strip_silence(
+ self.mass,
+ fade_out_part,
+ pcm_format=pcm_format,
+ reverse=True,
+ )
+ # Ensure frame alignment after silence stripping
+ fade_out_part = align_audio_to_frame_boundary(fade_out_part, pcm_format)
- The tempo ramping is completed before the crossfade starts to ensure perfect beat alignment
- throughout the entire crossfade region.
- """
- # Check if time stretching should be applied (BPM difference < 3%)
- original_bpm = fade_out_analysis.bpm
- target_bpm = fade_in_analysis.bpm
- bpm_ratio = target_bpm / original_bpm
- bpm_diff_percent = abs(1.0 - bpm_ratio) * 100
+ # strip silence from begin of audio of fade_in_part
+ fade_in_part = await strip_silence(
+ self.mass,
+ fade_in_part,
+ pcm_format=pcm_format,
+ reverse=False,
+ )
+ # Ensure frame alignment after silence stripping
+ fade_in_part = align_audio_to_frame_boundary(fade_in_part, pcm_format)
+ if mode == SmartFadesMode.STANDARD_CROSSFADE:
+ smart_fade: SmartFade = StandardCrossFade(
+ crossfade_duration=standard_crossfade_duration
+ )
+ return await smart_fade.apply(
+ fade_out_part,
+ fade_in_part,
+ pcm_format,
+ )
+ # Attempt smart crossfade with analysis data
+ # Prefer stored analysis of the outgoing track's outro; analyze on demand otherwise
+ fade_out_analysis: SmartFadesAnalysis | None
+ if stored_analysis := await self.mass.music.get_smart_fades_analysis(
+ fade_out_streamdetails.item_id,
+ fade_out_streamdetails.provider,
+ SmartFadesAnalysisFragment.OUTRO,
+ ):
+ fade_out_analysis = stored_analysis
+ else:
+ fade_out_analysis = await self.analyzer.analyze(
+ fade_out_streamdetails.item_id,
+ fade_out_streamdetails.provider,
+ SmartFadesAnalysisFragment.OUTRO,
+ fade_out_part,
+ pcm_format,
+ )
- # If no time stretching needed, return passthrough filter and no tempo change
- if not (
- 0.1 < bpm_diff_percent <= TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD and crossfade_bars > 4
+ # Same lookup for the incoming track's intro
+ fade_in_analysis: SmartFadesAnalysis | None
+ if stored_analysis := await self.mass.music.get_smart_fades_analysis(
+ fade_in_streamdetails.item_id,
+ fade_in_streamdetails.provider,
+ SmartFadesAnalysisFragment.INTRO,
):
- return ["[0]anull[fadeout_stretched]"], 1.0 # codespell:ignore anull
+ fade_in_analysis = stored_analysis
+ else:
+ fade_in_analysis = await self.analyzer.analyze(
+ fade_in_streamdetails.item_id,
+ fade_in_streamdetails.provider,
+ SmartFadesAnalysisFragment.INTRO,
+ fade_in_part,
+ pcm_format,
+ )
+ # Only attempt the smart crossfade when both analyses exist with confidence > 0.3
+ if (
+ fade_out_analysis
+ and fade_in_analysis
+ and fade_out_analysis.confidence > 0.3
+ and fade_in_analysis.confidence > 0.3
+ and mode == SmartFadesMode.SMART_CROSSFADE
+ ):
+ try:
+ smart_fade = SmartCrossFade(fade_out_analysis, fade_in_analysis)
+ return await smart_fade.apply(
+ fade_out_part,
+ fade_in_part,
+ pcm_format,
+ )
+ except Exception as e:
+ # Deliberately broad: any failure of the smart crossfade falls back below
+ self.logger.warning(
+ "Smart crossfade failed: %s, falling back to standard crossfade", e
+ )
- # Log that we're applying time stretching
- self.logger.debug(
- "Time stretch: %.1f%% BPM diff, adjusting %.1f -> %.1f BPM, crossfade starts at %.1fs",
- bpm_diff_percent,
- original_bpm,
- target_bpm,
- SMART_CROSSFADE_DURATION - crossfade_duration,
+ # Always fallback to Standard Crossfade in case something goes wrong
+ smart_fade = StandardCrossFade(crossfade_duration=standard_crossfade_duration)
+ return await smart_fade.apply(
+ fade_out_part,
+ fade_in_part,
+ pcm_format,
)
- # Use uniform rubberband time stretching for the entire buffer
- # This ensures downbeat adjustment calculations are accurate and beat alignment is perfect
- # Rubberband is a high-quality music-specific algorithm optimized for music
- self.logger.debug(
- "Time stretch (rubberband uniform): %.1f BPM -> %.1f BPM (factor=%.4f)",
- original_bpm,
- target_bpm,
- bpm_ratio,
- )
- return [
- f"[0]rubberband=tempo={bpm_ratio:.6f}:transients=mixed:detector=soft:pitchq=quality"
- "[fadeout_stretched]"
- ], bpm_ratio
- def _apply_eq_filters(
- self,
- fade_out_analysis: SmartFadesAnalysis,
- fade_in_analysis: SmartFadesAnalysis,
- fade_out_label: str,
- fade_in_label: str,
- crossfade_duration: float,
- crossfade_bars: int,
- ) -> list[str]:
- """Create LP / HP complementary filters using frequency sweeps for smooth transitions."""
- # Calculate target frequency based on average BPM
- avg_bpm = (fade_out_analysis.bpm + fade_in_analysis.bpm) / 2
- bpm_ratio = fade_in_analysis.bpm / fade_out_analysis.bpm
+# HELPER METHODS
+def get_bpm_diff_percentage(bpm1: float, bpm2: float) -> float:
+ """Calculate BPM difference percentage between two BPM values.
+
+ The difference is expressed relative to ``bpm2`` (``abs(1 - bpm1 / bpm2) * 100``),
+ so the result is not symmetric in its arguments and ``bpm2`` must be non-zero.
+ """
+ return abs(1.0 - bpm1 / bpm2) * 100
+
+
+def extrapolate_downbeats(
+ downbeats: npt.NDArray[np.float64],
+ tempo_factor: float,
+ buffer_size: float = SMART_CROSSFADE_DURATION,
+ bpm: float | None = None,
+) -> npt.NDArray[np.float64]:
+ """Extrapolate downbeats based on actual intervals when detection is incomplete.
+
+ This is needed when we want to perform beat alignment in an 'atmospheric' outro
+ that does not have any detected downbeats.
+
+ Args:
+ downbeats: Array of detected downbeat positions in seconds
+ tempo_factor: Tempo adjustment factor for time stretching
+ buffer_size: Maximum buffer size in seconds
+ bpm: Optional BPM for validation when extrapolating with only 2 downbeats
+
+ Returns:
+ The detected downbeats divided by ``tempo_factor``, followed by any extrapolated
+ downbeats that fall inside the buffer (at most 25s past the last detected one).
+ """
+ # Handle case with exactly 2 downbeats (with BPM validation)
+ if len(downbeats) == 2 and bpm is not None:
+ interval = float(downbeats[1] - downbeats[0])
+
+ # Expected interval for this BPM (assuming 4/4 time signature)
+ expected_interval = (60.0 / bpm) * 4
+
+ # Only extrapolate if interval matches BPM within 15% tolerance
+ if abs(interval - expected_interval) / expected_interval < 0.15:
+ # Adjust detected downbeats for time stretching first
+ adjusted_downbeats = downbeats / tempo_factor
+ last_downbeat = adjusted_downbeats[-1]
+
+ # If the last downbeat is close to the buffer end, no extrapolation needed
+ if last_downbeat >= buffer_size - 5:
+ return adjusted_downbeats
+
+ # Adjust the interval for time stretching
+ adjusted_interval = interval / tempo_factor
+
+ # Extrapolate forward from last adjusted downbeat using adjusted interval
+ extrapolated = []
+ current_pos = last_downbeat + adjusted_interval
+ # Same 25s cap as the generic path below
+ max_extrapolation_distance = 25.0 # Don't extrapolate more than 25s
+
+ while (
+ current_pos < buffer_size
+ and (current_pos - last_downbeat) <= max_extrapolation_distance
+ ):
+ extrapolated.append(current_pos)
+ current_pos += adjusted_interval
+
+ if extrapolated:
+ # Combine adjusted detected downbeats and extrapolated downbeats
+ return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
- # 90 BPM -> 1500Hz, 140 BPM -> 2500Hz
- crossover_freq = int(np.clip(1500 + (avg_bpm - 90) * 20, 1500, 2500))
+ return adjusted_downbeats
+ # NOTE(review): on a BPM mismatch we fall through to the generic path below, which
+ # still extrapolates from the single interval (its std is 0) - confirm this is intended
- # Adjust for BPM mismatch
- if abs(bpm_ratio - 1.0) > 0.3:
- crossover_freq = int(crossover_freq * 0.85)
+ if len(downbeats) < 2:
+ # Need at least 2 downbeats to extrapolate
+ return downbeats / tempo_factor
- # Extended lowpass effect to gradually remove bass frequencies
- fadeout_eq_duration = min(max(crossfade_duration * 2.5, 8.0), SMART_CROSSFADE_DURATION)
+ # Adjust detected downbeats for time stretching first
+ adjusted_downbeats = downbeats / tempo_factor
+ last_downbeat = adjusted_downbeats[-1]
- # Quicker highpass removal to avoid lingering vocals after crossfade
- fadein_eq_duration = crossfade_duration / 1.5
+ # If the last downbeat is close to the buffer end, no extrapolation needed
+ if last_downbeat >= buffer_size - 5:
+ return adjusted_downbeats
- # Calculate when the EQ sweep should start
- # The crossfade always happens at the END of the buffer, regardless of beat alignment
- fadeout_eq_start = max(0, SMART_CROSSFADE_DURATION - fadeout_eq_duration)
+ # Calculate intervals from ORIGINAL downbeats (before time stretching)
+ intervals = np.diff(downbeats)
+ median_interval = float(np.median(intervals))
+ std_interval = float(np.std(intervals))
- # For shorter fades, use exp/exp curves to avoid abruptness
- if crossfade_bars < 8:
- fadeout_curve = "exponential"
- fadein_curve = "exponential"
- # For long fades, use log/linear curves
- else:
- # Use logarithmic curve to give the next track more space
- fadeout_curve = "logarithmic"
- # Use linear curve for transition, predictable and not too abrupt
- fadein_curve = "linear"
+ # Only extrapolate if intervals are consistent (low standard deviation)
+ if std_interval > 0.2:
+ return adjusted_downbeats
- self.logger.debug(
- "EQ: crossover=%dHz, EQ fadeout duration=%.1fs,"
- " EQ fadein duration=%.1fs, BPM=%.1f, BPM ratio=%.2f,"
- " EQ curves: %s/%s",
- crossover_freq,
- fadeout_eq_duration,
- fadein_eq_duration,
- avg_bpm,
- bpm_ratio,
- fadeout_curve,
- fadein_curve,
- )
+ # Adjust the interval for time stretching
+ # When slowing down (tempo_factor < 1.0), intervals get longer
+ adjusted_interval = median_interval / tempo_factor
- # fadeout (unfiltered → low-pass)
- fadeout_filters = self._create_frequency_sweep_filter(
- input_label=fade_out_label,
- output_label="fadeout_eq",
- sweep_type="lowpass",
- target_freq=crossover_freq,
- duration=fadeout_eq_duration,
- start_time=fadeout_eq_start,
- sweep_direction="fade_in",
- poles=1,
- curve_type=fadeout_curve,
- )
+ # Extrapolate forward from last adjusted downbeat using adjusted interval
+ extrapolated = []
+ current_pos = last_downbeat + adjusted_interval
+ max_extrapolation_distance = 25.0 # Don't extrapolate more than 25s
- # fadein (high-pass → unfiltered)
- fadein_filters = self._create_frequency_sweep_filter(
- input_label=fade_in_label,
- output_label="fadein_eq",
- sweep_type="highpass",
- target_freq=crossover_freq,
- duration=fadein_eq_duration,
- start_time=0,
- sweep_direction="fade_out",
- poles=1,
- curve_type=fadein_curve,
- )
+ while current_pos < buffer_size and (current_pos - last_downbeat) <= max_extrapolation_distance:
+ extrapolated.append(current_pos)
+ current_pos += adjusted_interval
- return fadeout_filters + fadein_filters
+ if extrapolated:
+ # Combine adjusted detected downbeats and extrapolated downbeats
+ return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
- # FALLBACK DEFAULT CROSSFADE
- async def _default_crossfade(
- self,
- fade_in_part: bytes,
- fade_out_part: bytes,
- pcm_format: AudioFormat,
- crossfade_duration: int = 10,
- ) -> bytes:
- """Apply a standard crossfade without smart analysis."""
- self.logger.debug("Applying standard crossfade of %ds", crossfade_duration)
- crossfade_size = int(pcm_format.pcm_sample_size * crossfade_duration)
- # Pre-crossfade: outgoing track minus the crossfaded portion
- pre_crossfade = fade_out_part[:-crossfade_size]
- # Crossfaded portion: user's configured duration
- crossfaded_section = await crossfade_pcm_parts(
- fade_in_part[:crossfade_size],
- fade_out_part[-crossfade_size:],
- pcm_format=pcm_format,
- fade_out_pcm_format=pcm_format,
- )
- # Post-crossfade: incoming track minus the crossfaded portion
- post_crossfade = fade_in_part[crossfade_size:]
- # Full result: everything concatenated
- return pre_crossfade + crossfaded_section + post_crossfade
+ return adjusted_downbeats