from music_assistant_models.enums import (
CacheCategory,
ConfigEntryType,
+ ContentType,
EventType,
MediaType,
PlayerState,
elapsed_time: int
stream_title: str | None
content_type: str | None
+ codec_type: ContentType | None
output_formats: list[str] | None
content_type=queue.current_item.streamdetails.audio_format.output_format_str
if queue.current_item and queue.current_item.streamdetails
else None,
+ codec_type=queue.current_item.streamdetails.audio_format.codec_type
+ if queue.current_item and queue.current_item.streamdetails
+ else None,
output_formats=output_formats,
)
changed_keys = get_changed_keys(prev_state, new_state)
bytes_sent += len(chunk)
continue
+ if chunk_number == 0:
+ # At this point ffmpeg has started and should now know the codec used
+ # for encoding the audio.
+ streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
+
chunk_number += 1
# determine buffer size dynamically
if chunk_number < 5 and strip_silence_begin:
self.log_history: deque[str] = deque(maxlen=100)
self._stdin_task: asyncio.Task | None = None
self._logger_task: asyncio.Task | None = None
+ self._input_codec_parsed = False
super().__init__(
ffmpeg_args,
stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
# if streamdetails contenttype is unknown, try parse it from the ffmpeg log
if line.startswith("Stream #") and ": Audio: " in line:
- if self.input_format.content_type == ContentType.UNKNOWN:
+ if not self._input_codec_parsed:
content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
+ content_type_raw = content_type_raw.split(",")[0]
content_type = ContentType.try_parse(content_type_raw)
self.logger.debug(
"Detected (input) content type: %s (%s)",
content_type,
content_type_raw,
)
- self.input_format.content_type = content_type
+ if self.input_format.content_type == ContentType.UNKNOWN:
+ self.input_format.content_type = content_type
+ self.input_format.codec_type = content_type
+ self._input_codec_parsed = True
del line
async def _feed_stdin(self) -> None: