From: 柴田 <58556078+Shi-553@users.noreply.github.com> Date: Sun, 30 Nov 2025 19:24:33 +0000 (+0900) Subject: Extract HLS parser to shared helpers module (#2715) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=c2d22d091a3927ec4b55c6c2eb47db212e1918a7;p=music-assistant-server.git Extract HLS parser to shared helpers module (#2715) --- diff --git a/music_assistant/helpers/hls.py b/music_assistant/helpers/hls.py new file mode 100644 index 00000000..97baa5b5 --- /dev/null +++ b/music_assistant/helpers/hls.py @@ -0,0 +1,164 @@ +""" +RFC 8216-based HLS utilities. + +For simple variant stream selection from master playlists, use helpers.playlists.parse_m3u. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +from music_assistant_models.errors import InvalidDataError + + +@dataclass +class HLSMediaSegment: + """Single HLS media segment entry with associated metadata.""" + + extinf_line: str = "" + segment_url: str = "" + key_line: str | None = None + byterange_line: str | None = None + discontinuity: bool = False + map_line: str | None = None + program_date_time: str | None = None + + @property + def duration(self) -> float: + """Extract duration in seconds from #EXTINF line.""" + try: + duration_part = self.extinf_line.split("#EXTINF:")[1].split(",", 1)[0] + return float(duration_part.strip()) + except (IndexError, ValueError): + return 0.0 + + @property + def title(self) -> str | None: + """Extract optional title from #EXTINF line.""" + try: + parts = self.extinf_line.split("#EXTINF:")[1].split(",", 1) + if len(parts) == 2: + title = parts[1].strip() + return title if title else None + return None + except IndexError: + return None + + +@dataclass +class HLSMediaPlaylist: + """ + HLS media playlist structure with headers, segments, and footers preserved. + + Note: header_lines excludes EXT-X-KEY and EXT-X-MAP tags. Per RFC 8216, these + tags apply to subsequent segments until overridden, so they're stored per-segment + for easier manipulation. + """ + + header_lines: list[str] = field(default_factory=list) + segments: list[HLSMediaSegment] = field(default_factory=list) + footer_lines: list[str] = field(default_factory=list) + + +class HLSMediaPlaylistParser: + """RFC 8216-based HLS media playlist parser.""" + + def __init__(self, hls_playlist_text: str) -> None: + """Initialize parser with playlist text.""" + self.hls_playlist_text = hls_playlist_text + self.result = HLSMediaPlaylist() + self.working_segment = HLSMediaSegment() + self.segments_started = False + + def parse(self) -> HLSMediaPlaylist: + """Parse HLS media playlist text into structured data. + + Returns: + HLSMediaPlaylist object with extracted structure + + Raises: + InvalidDataError: If playlist doesn't start with #EXTM3U or has invalid format + """ + lines = [line.strip() for line in self.hls_playlist_text.split("\n") if line.strip()] + + if not lines or not lines[0].startswith("#EXTM3U"): + msg = "Invalid HLS playlist: must start with #EXTM3U" + raise InvalidDataError(msg) + + for line in lines: + self.process_line(line) + + if not self.result.segments: + msg = "Invalid HLS playlist: no segments found" + raise InvalidDataError(msg) + + return self.result + + def process_line(self, line: str) -> None: + """Process a single line from the playlist.""" + if line.startswith("#EXTINF:"): + self._on_extinf(line) + elif line.startswith("#EXT-X-KEY:"): + self._on_key_line(line) + elif line.startswith("#EXT-X-MAP:"): + self._on_map_line(line) + elif line.startswith("#EXT-X-PROGRAM-DATE-TIME:"): + self._on_program_date_time(line) + elif line.startswith("#EXT-X-BYTERANGE:"): + self._on_byterange(line) + elif line.startswith("#EXT-X-DISCONTINUITY"): + self._on_discontinuity() + elif line.startswith("#EXT"): + self._on_ext_tag(line) + elif line.startswith("#"): + pass + elif self.working_segment.extinf_line: + self._on_segment_url(line) + + def _on_extinf(self, line: str) -> None: + """Handle #EXTINF tag.""" + if self.working_segment.extinf_line: + msg = ( + f"Malformed HLS playlist: #EXTINF '{line}' found without " + f"preceding segment URL for '{self.working_segment.extinf_line}'" + ) + raise InvalidDataError(msg) + self.segments_started = True + self.working_segment.extinf_line = line + + def _on_key_line(self, line: str) -> None: + """Handle #EXT-X-KEY tag.""" + self.working_segment.key_line = line + + def _on_map_line(self, line: str) -> None: + """Handle #EXT-X-MAP tag.""" + self.working_segment.map_line = line + + def _on_program_date_time(self, line: str) -> None: + """Handle #EXT-X-PROGRAM-DATE-TIME tag.""" + self.working_segment.program_date_time = line + + def _on_byterange(self, line: str) -> None: + """Handle #EXT-X-BYTERANGE tag.""" + self.working_segment.byterange_line = line + + def _on_discontinuity(self) -> None: + """Handle #EXT-X-DISCONTINUITY tag.""" + self.working_segment.discontinuity = True + + def _on_ext_tag(self, line: str) -> None: + """Handle other #EXT tags.""" + if self.segments_started: + self.result.footer_lines.append(line) + else: + self.result.header_lines.append(line) + + def _on_segment_url(self, line: str) -> None: + """Handle segment URL following #EXTINF.""" + self.working_segment.segment_url = line + self.result.segments.append(self.working_segment) + + self.working_segment = HLSMediaSegment( + key_line=self.working_segment.key_line, + map_line=self.working_segment.map_line, + ) diff --git a/music_assistant/helpers/playlists.py b/music_assistant/helpers/playlists.py index aac79966..c66448c6 100644 --- a/music_assistant/helpers/playlists.py +++ b/music_assistant/helpers/playlists.py @@ -48,7 +48,12 @@ class PlaylistItem: def parse_m3u(m3u_data: str) -> list[PlaylistItem]: - """Very simple m3u parser. + """Lightweight M3U/M3U8 parser for playlist URL extraction. + + This parser returns a flat list of playlist items with basic metadata. + Supports HLS master playlist tags (#EXT-X-STREAM-INF, #EXT-X-KEY) for + stream selection and quality sorting, but does not preserve segment-level + details or playlist structure. Based on https://github.com/dvndrsn/M3uParser/blob/master/m3uparser.py """ @@ -75,7 +80,7 @@ def parse_m3u(m3u_data: str) -> list[PlaylistItem]: length = None title = info[1].strip() elif line.startswith("#EXT-X-STREAM-INF:"): - # HLS stream properties + # HLS master playlist variant stream properties (BANDWIDTH, RESOLUTION, etc.) # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10 stream_info = {} for part in line.replace("#EXT-X-STREAM-INF:", "").split(","): @@ -84,7 +89,7 @@ def parse_m3u(m3u_data: str) -> list[PlaylistItem]: kev_value_parts = part.strip().split("=") stream_info[kev_value_parts[0]] = kev_value_parts[1] elif line.startswith("#EXT-X-KEY:"): - # Extract encryption key URI if present + # Extract encryption key URI from master/media playlist # METHOD=NONE means no encryption, so explicitly clear the key if "METHOD=NONE" in line: key = None diff --git a/music_assistant/providers/nicovideo/converters/stream.py b/music_assistant/providers/nicovideo/converters/stream.py index 625b741e..ff51567d 100644 --- a/music_assistant/providers/nicovideo/converters/stream.py +++ b/music_assistant/providers/nicovideo/converters/stream.py @@ -13,9 +13,9 @@ from niconico.objects.video.watch import ( # noqa: TC002 - Using by StreamConve ) from pydantic import BaseModel +from music_assistant.helpers.hls import HLSMediaPlaylist, HLSMediaPlaylistParser from music_assistant.providers.nicovideo.converters.base import NicovideoConverterBase from music_assistant.providers.nicovideo.helpers import create_audio_format -from music_assistant.providers.nicovideo.helpers.hls_models import ParsedHLSPlaylist @dataclass @@ -31,7 +31,7 @@ class NicovideoStreamData: """ domand_bid: str - parsed_hls_playlist: ParsedHLSPlaylist + parsed_hls_playlist: HLSMediaPlaylist class StreamConversionData(BaseModel): @@ -77,7 +77,7 @@ class NicovideoStreamConverter(NicovideoConverterBase): # Do not use album image intentionally image = track.image if track else None - parsed_playlist = ParsedHLSPlaylist.from_text(conversion_data.hls_playlist_text) + parsed_playlist = HLSMediaPlaylistParser(conversion_data.hls_playlist_text).parse() return StreamDetails( provider=self.provider.instance_id, diff --git a/music_assistant/providers/nicovideo/helpers/__init__.py b/music_assistant/providers/nicovideo/helpers/__init__.py index 6809b3bf..d0c7558b 100644 --- a/music_assistant/providers/nicovideo/helpers/__init__.py +++ b/music_assistant/providers/nicovideo/helpers/__init__.py @@ -1,9 +1,5 @@ """Helper functions for nicovideo provider.""" -from music_assistant.providers.nicovideo.helpers.hls_models import ( - HLSSegment, - ParsedHLSPlaylist, -) from music_assistant.providers.nicovideo.helpers.hls_seek_optimizer import ( HLSSeekOptimizer, SeekOptimizedStreamContext, @@ -18,8 +14,6 @@ from music_assistant.providers.nicovideo.helpers.utils import ( __all__ = [ "AlbumWithTracks", "HLSSeekOptimizer", - "HLSSegment", - "ParsedHLSPlaylist", "PlaylistWithTracks", "SeekOptimizedStreamContext", "create_audio_format", diff --git a/music_assistant/providers/nicovideo/helpers/hls_models.py b/music_assistant/providers/nicovideo/helpers/hls_models.py deleted file mode 100644 index 4c10d6c8..00000000 --- a/music_assistant/providers/nicovideo/helpers/hls_models.py +++ /dev/null @@ -1,95 +0,0 @@ -"""HLS data models and parsing for nicovideo provider.""" - -from __future__ import annotations - -import re -from dataclasses import dataclass - - -@dataclass -class HLSSegment: - """Single HLS segment entry. - - Attributes: - duration_line: #EXTINF line with duration (e.g., "#EXTINF:5.967528,") - segment_url: URL to the segment file - """ - - duration_line: str - segment_url: str - - -@dataclass -class ParsedHLSPlaylist: - """Parsed HLS playlist data. - - Attributes: - init_segment_url: URL to the initialization segment (#EXT-X-MAP) - encryption_key_line: Encryption key line (#EXT-X-KEY) if present - segments: List of HLS segments - header_lines: Playlist header lines (#EXTM3U, #EXT-X-VERSION, etc.) - """ - - init_segment_url: str - encryption_key_line: str - segments: list[HLSSegment] - header_lines: list[str] - - @classmethod - def from_text(cls, hls_playlist_text: str) -> ParsedHLSPlaylist: - """Parse HLS playlist text into structured data. - - Args: - hls_playlist_text: HLS playlist text - - Returns: - ParsedHLSPlaylist object with extracted data - """ - lines = [line.strip() for line in hls_playlist_text.split("\n") if line.strip()] - - # Extract header lines (#EXTM3U, #EXT-X-VERSION, etc.) - header_lines = [] - for line in lines: - if line.startswith("#EXT-X-TARGETDURATION"): - break - if line.startswith("#EXT"): - header_lines.append(line) - - # Extract init segment URL from #EXT-X-MAP - init_segment_url = "" - for line in lines: - if line.startswith("#EXT-X-MAP:"): - match = re.search(r'URI="([^"]+)"', line) - if match: - init_segment_url = match.group(1) - break - - # Extract encryption key line - encryption_key_line = "" - for line in lines: - if line.startswith("#EXT-X-KEY:"): - encryption_key_line = line - break - - # Extract segments (duration + URL pairs) - segments: list[HLSSegment] = [] - i = 0 - while i < len(lines): - line = lines[i] - if line.startswith("#EXTINF:"): - duration_line = line - # Next line should be segment URL - if i + 1 < len(lines): - segment_url = lines[i + 1] - if not segment_url.startswith("#"): - segments.append(HLSSegment(duration_line, segment_url)) - i += 2 - continue - i += 1 - - return cls( - init_segment_url=init_segment_url, - encryption_key_line=encryption_key_line, - segments=segments, - header_lines=header_lines, - ) diff --git a/music_assistant/providers/nicovideo/helpers/hls_seek_optimizer.py b/music_assistant/providers/nicovideo/helpers/hls_seek_optimizer.py index f84b9e66..6a84bf30 100644 --- a/music_assistant/providers/nicovideo/helpers/hls_seek_optimizer.py +++ b/music_assistant/providers/nicovideo/helpers/hls_seek_optimizer.py @@ -1,12 +1,19 @@ -"""HLS seek optimizer for nicovideo provider.""" +"""HLS seek optimizer for nicovideo provider. + +This module implements a workaround for FFmpeg's seeking limitations with fragmented MP4 +HLS playlists (see https://trac.ffmpeg.org/ticket/7359). + +NOTE: This entire module can be removed once Music Assistant requires FFmpeg 8.0+, + which fixes the input-side -ss seeking issue (commit 380a518c, 2024-11-10). +""" from __future__ import annotations import logging -import re from dataclasses import dataclass from typing import TYPE_CHECKING +from music_assistant.helpers.hls import HLSMediaPlaylist from music_assistant.providers.nicovideo.constants import ( DOMAND_BID_COOKIE_NAME, NICOVIDEO_USER_AGENT, @@ -15,7 +22,6 @@ from music_assistant.providers.nicovideo.helpers.utils import log_verbose if TYPE_CHECKING: from music_assistant.providers.nicovideo.converters.stream import NicovideoStreamData - from music_assistant.providers.nicovideo.helpers.hls_models import ParsedHLSPlaylist LOGGER = logging.getLogger(__name__) @@ -52,7 +58,7 @@ class HLSSeekOptimizer: Args: hls_data: HLS streaming data containing parsed playlist and authentication info """ - self.parsed_playlist: ParsedHLSPlaylist = hls_data.parsed_hls_playlist + self.parsed_playlist: HLSMediaPlaylist = hls_data.parsed_hls_playlist self.domand_bid = hls_data.domand_bid def _calculate_start_segment(self, seek_position: int) -> tuple[int, float]: @@ -71,10 +77,8 @@ class HLSSeekOptimizer: accumulated_time = 0.0 for idx, segment in enumerate(self.parsed_playlist.segments): - # Extract duration from #EXTINF:5.967528, - match = re.search(r"#EXTINF:([0-9.]+)", segment.duration_line) - if match: - segment_duration = float(match.group(1)) + segment_duration = segment.duration + if segment_duration > 0: if accumulated_time + segment_duration > seek_position: # Found the segment containing seek_position offset = seek_position - accumulated_time @@ -98,38 +102,42 @@ class HLSSeekOptimizer: # Add header lines lines.extend(self.parsed_playlist.header_lines) - # Calculate target duration from segments (rounded up) - max_duration = 6 # Default fallback - for segment in self.parsed_playlist.segments: - match = re.search(r"#EXTINF:([0-9.]+)", segment.duration_line) - if match: - duration = float(match.group(1)) - max_duration = max(max_duration, int(duration) + 1) - - # Add required HLS tags - lines.extend( - [ - f"#EXT-X-TARGETDURATION:{max_duration}", - "#EXT-X-MEDIA-SEQUENCE:1", - "#EXT-X-PLAYLIST-TYPE:VOD", - ] - ) + # Add segments from start_segment_idx onward + # Track previous segment's key_line and map_line to emit only when changed + prev_key_line: str | None = None + prev_map_line: str | None = None - # Add init segment - if self.parsed_playlist.init_segment_url: - lines.append(f'#EXT-X-MAP:URI="{self.parsed_playlist.init_segment_url}"') + for segment in self.parsed_playlist.segments[start_segment_idx:]: + # Add discontinuity marker if present + if segment.discontinuity: + lines.append("#EXT-X-DISCONTINUITY") - # Add encryption key if present - if self.parsed_playlist.encryption_key_line: - lines.append(self.parsed_playlist.encryption_key_line) + # Add program date/time if present + if segment.program_date_time: + lines.append(segment.program_date_time) + + # Add map line only if it changed from previous segment + # Note: MAP must come before KEY according to RFC 8216 + if segment.map_line and segment.map_line != prev_map_line: + lines.append(segment.map_line) + prev_map_line = segment.map_line + + # Add key line only if it changed from previous segment + if segment.key_line and segment.key_line != prev_key_line: + lines.append(segment.key_line) + prev_key_line = segment.key_line + + # Add segment info and URL + lines.append(segment.extinf_line) + + # Add byte range if present + if segment.byterange_line: + lines.append(segment.byterange_line) - # Add segments from start_segment_idx onward - for segment in self.parsed_playlist.segments[start_segment_idx:]: - lines.append(segment.duration_line) lines.append(segment.segment_url) # Add end tag - lines.append("#EXT-X-ENDLIST") + lines.extend(self.parsed_playlist.footer_lines) return "\n".join(lines) diff --git a/tests/core/test_hls.py b/tests/core/test_hls.py new file mode 100644 index 00000000..ca1b1b93 --- /dev/null +++ b/tests/core/test_hls.py @@ -0,0 +1,253 @@ +"""Tests for HLS playlist parsing utilities.""" + +from __future__ import annotations + +import pytest +from music_assistant_models.errors import InvalidDataError + +from music_assistant.helpers.hls import HLSMediaPlaylistParser, HLSMediaSegment + + +def test_basic_vod_playlist() -> None: + """Test parsing basic VOD playlist with encryption (standard fMP4 format).""" + playlist_text = """#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-TARGETDURATION:6 +#EXT-X-MEDIA-SEQUENCE:0 +#EXT-X-PLAYLIST-TYPE:VOD +#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key.bin" +#EXT-X-MAP:URI="init.mp4" +#EXTINF:5.967528, +segment0.m4s +#EXTINF:5.967528, +segment1.m4s +#EXTINF:5.967528, +segment2.m4s +#EXTINF:3.123456, +segment3.m4s +#EXT-X-ENDLIST +""" + result = HLSMediaPlaylistParser(playlist_text).parse() + + assert len(result.header_lines) == 5 + assert len(result.segments) == 4 + assert len(result.footer_lines) == 1 + + # Check total duration + total_duration = sum(segment.duration for segment in result.segments) + assert total_duration == pytest.approx(21.02604, rel=1e-6) + + # Check first segment inherits MAP from header + assert result.segments[0].segment_url == "segment0.m4s" + assert result.segments[0].duration == pytest.approx(5.967528, rel=1e-6) + assert ( + result.segments[0].key_line == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key.bin"' + ) + assert result.segments[0].map_line == '#EXT-X-MAP:URI="init.mp4"' + assert result.segments[0].byterange_line is None + assert result.segments[0].discontinuity is False + assert result.segments[0].program_date_time is None + + # Check last segment - also has MAP + assert result.segments[3].segment_url == "segment3.m4s" + assert result.segments[3].duration == pytest.approx(3.123456, rel=1e-6) + assert result.segments[3].map_line == '#EXT-X-MAP:URI="init.mp4"' + + +def test_live_stream_with_program_date_time() -> None: + """Test parsing live stream with PROGRAM-DATE-TIME tags.""" + playlist_text = """#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-TARGETDURATION:10 +#EXT-X-MEDIA-SEQUENCE:2680 +#EXT-X-PROGRAM-DATE-TIME:2025-11-27T10:15:00.000Z +#EXTINF:9.009, +https://example.com/segment2680.ts +#EXTINF:9.009, +https://example.com/segment2681.ts +#EXT-X-DISCONTINUITY +#EXT-X-PROGRAM-DATE-TIME:2025-11-27T10:15:20.000Z +#EXTINF:9.009, +https://example.com/segment2682.ts +""" + result = HLSMediaPlaylistParser(playlist_text).parse() + + assert len(result.segments) == 3 + + # First segment has PROGRAM-DATE-TIME + assert ( + result.segments[0].program_date_time == "#EXT-X-PROGRAM-DATE-TIME:2025-11-27T10:15:00.000Z" + ) + assert result.segments[0].discontinuity is False + + # Second segment does not have PROGRAM-DATE-TIME (single-use tag) + assert result.segments[1].program_date_time is None + assert result.segments[1].discontinuity is False + + # Third segment has both discontinuity and new PROGRAM-DATE-TIME + assert result.segments[2].discontinuity is True + assert ( + result.segments[2].program_date_time == "#EXT-X-PROGRAM-DATE-TIME:2025-11-27T10:15:20.000Z" + ) + + +def test_byterange_segments() -> None: + """Test parsing playlist with byte-range segments.""" + playlist_text = """#EXTM3U +#EXT-X-VERSION:4 +#EXT-X-TARGETDURATION:10 +#EXT-X-MEDIA-SEQUENCE:0 +#EXT-X-MAP:URI="init.mp4" +#EXTINF:10.0, +#EXT-X-BYTERANGE:1000@0 +video.mp4 +#EXTINF:10.0, +#EXT-X-BYTERANGE:1500 +video.mp4 +#EXTINF:10.0, +#EXT-X-BYTERANGE:1200 +video.mp4 +#EXT-X-ENDLIST +""" + result = HLSMediaPlaylistParser(playlist_text).parse() + + assert len(result.segments) == 3 + assert result.segments[0].byterange_line == "#EXT-X-BYTERANGE:1000@0" + assert result.segments[1].byterange_line == "#EXT-X-BYTERANGE:1500" + assert result.segments[2].byterange_line == "#EXT-X-BYTERANGE:1200" + + # All segments use same file + assert result.segments[0].segment_url == "video.mp4" + assert result.segments[1].segment_url == "video.mp4" + assert result.segments[2].segment_url == "video.mp4" + + +def test_multiple_encryption_keys() -> None: + """Test parsing playlist with multiple encryption keys (ad insertion scenario).""" + playlist_text = """#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-TARGETDURATION:15 +#EXT-X-MEDIA-SEQUENCE:0 +#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin" +#EXTINF:10.0, +segment0.ts +#EXTINF:10.0, +segment1.ts +#EXT-X-DISCONTINUITY +#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key2.bin" +#EXTINF:15.0, +ad_segment.ts +#EXT-X-DISCONTINUITY +#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin" +#EXTINF:10.0, +segment2.ts +#EXT-X-ENDLIST +""" + result = HLSMediaPlaylistParser(playlist_text).parse() + + assert len(result.segments) == 4 + + # First two segments use key1 + assert ( + result.segments[0].key_line + == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin"' + ) + assert ( + result.segments[1].key_line + == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin"' + ) + assert result.segments[1].discontinuity is False + + # Ad segment has discontinuity and key2 + assert result.segments[2].discontinuity is True + assert ( + result.segments[2].key_line + == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key2.bin"' + ) + + # Back to key1 with discontinuity + assert result.segments[3].discontinuity is True + assert ( + result.segments[3].key_line + == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin"' + ) + + +def test_segment_properties() -> None: + """Test HLSMediaSegment properties (duration, title) and comment handling.""" + # Test duration extraction + segment = HLSMediaSegment(extinf_line="#EXTINF:5.967528,", segment_url="test.m4s") + assert segment.duration == pytest.approx(5.967528, rel=1e-6) + + # Test duration with title + segment = HLSMediaSegment(extinf_line="#EXTINF:10.5,Track Title", segment_url="test.m4s") + assert segment.duration == pytest.approx(10.5, rel=1e-6) + assert segment.title == "Track Title" + + # Test malformed EXTINF + segment = HLSMediaSegment(extinf_line="malformed", segment_url="test.m4s") + assert segment.duration == 0.0 + + # Test comment lines and title extraction in playlist + playlist_text = """#EXTM3U +#EXT-X-VERSION:3 +# This is a comment +#EXTINF:10.0,Test Title +segment1.ts +#EXTINF:10.0, +segment2.ts +#EXT-X-ENDLIST +""" + result = HLSMediaPlaylistParser(playlist_text).parse() + assert len(result.segments) == 2 + assert result.segments[0].title == "Test Title" + assert result.segments[1].title is None + assert all("# This" not in line for line in result.header_lines) + + +def test_tag_inheritance() -> None: + """Test that EXT-X-KEY and EXT-X-MAP persist across segments until changed.""" + playlist_text = """#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-KEY:METHOD=AES-128,URI="key1.bin" +#EXT-X-MAP:URI="init.mp4" +#EXTINF:10.0, +segment1.ts +#EXTINF:10.0, +segment2.ts +#EXT-X-KEY:METHOD=AES-128,URI="key2.bin" +#EXT-X-MAP:URI="init2.mp4" +#EXTINF:10.0, +segment3.ts +#EXT-X-ENDLIST +""" + result = HLSMediaPlaylistParser(playlist_text).parse() + + # First two segments inherit key1 and init.mp4 + assert result.segments[0].key_line == '#EXT-X-KEY:METHOD=AES-128,URI="key1.bin"' + assert result.segments[0].map_line == '#EXT-X-MAP:URI="init.mp4"' + assert result.segments[1].key_line == '#EXT-X-KEY:METHOD=AES-128,URI="key1.bin"' + assert result.segments[1].map_line == '#EXT-X-MAP:URI="init.mp4"' + + # Third segment uses key2 and init2.mp4 + assert result.segments[2].key_line == '#EXT-X-KEY:METHOD=AES-128,URI="key2.bin"' + assert result.segments[2].map_line == '#EXT-X-MAP:URI="init2.mp4"' + + +def test_invalid_playlists() -> None: + """Test error handling for invalid playlist formats.""" + # No #EXTM3U header + with pytest.raises(InvalidDataError, match="must start with #EXTM3U"): + HLSMediaPlaylistParser("#EXTINF:10.0\nsegment.ts").parse() + + # Empty playlist + with pytest.raises(InvalidDataError, match="must start with #EXTM3U"): + HLSMediaPlaylistParser("").parse() + + # No segments + with pytest.raises(InvalidDataError, match="no segments found"): + HLSMediaPlaylistParser("#EXTM3U\n#EXT-X-VERSION:3").parse() + + # EXTINF without segment URL + with pytest.raises(InvalidDataError, match="without preceding segment URL"): + HLSMediaPlaylistParser("#EXTM3U\n#EXTINF:10.0,\n#EXTINF:10.0,").parse() diff --git a/tests/providers/nicovideo/fixtures/api_response_converter_mapping.py b/tests/providers/nicovideo/fixtures/api_response_converter_mapping.py index 14cb9eef..886a3eb1 100644 --- a/tests/providers/nicovideo/fixtures/api_response_converter_mapping.py +++ b/tests/providers/nicovideo/fixtures/api_response_converter_mapping.py @@ -166,7 +166,20 @@ API_RESPONSE_CONVERTER_MAPPINGS = ( selected_audio=data.selected_audio, hls_url="https://example.com/stub.m3u8", domand_bid="stub_bid", - hls_playlist_text="#EXTM3U\n#EXT-X-VERSION:3\n", + hls_playlist_text=( + "#EXTM3U\n" + "#EXT-X-VERSION:6\n" + "#EXT-X-TARGETDURATION:6\n" + "#EXT-X-MEDIA-SEQUENCE:1\n" + "#EXT-X-PLAYLIST-TYPE:VOD\n" + '#EXT-X-MAP:URI="https://example.com/init.mp4"\n' + '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key"\n' + "#EXTINF:6.0,\n" + "segment1.m4s\n" + "#EXTINF:6.0,\n" + "segment2.m4s\n" + "#EXT-X-ENDLIST\n" + ), ) ), ),