Extract HLS parser to shared helpers module (#2715)
author柴田 <58556078+Shi-553@users.noreply.github.com>
Sun, 30 Nov 2025 19:24:33 +0000 (04:24 +0900)
committerGitHub <noreply@github.com>
Sun, 30 Nov 2025 19:24:33 +0000 (20:24 +0100)
music_assistant/helpers/hls.py [new file with mode: 0644]
music_assistant/helpers/playlists.py
music_assistant/providers/nicovideo/converters/stream.py
music_assistant/providers/nicovideo/helpers/__init__.py
music_assistant/providers/nicovideo/helpers/hls_models.py [deleted file]
music_assistant/providers/nicovideo/helpers/hls_seek_optimizer.py
tests/core/test_hls.py [new file with mode: 0644]
tests/providers/nicovideo/fixtures/api_response_converter_mapping.py

diff --git a/music_assistant/helpers/hls.py b/music_assistant/helpers/hls.py
new file mode 100644 (file)
index 0000000..97baa5b
--- /dev/null
@@ -0,0 +1,164 @@
+"""
+RFC 8216-based HLS utilities.
+
+For simple variant stream selection from master playlists, use helpers.playlists.parse_m3u.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+
+from music_assistant_models.errors import InvalidDataError
+
+
+@dataclass
+class HLSMediaSegment:
+    """Single HLS media segment entry with associated metadata."""
+
+    extinf_line: str = ""
+    segment_url: str = ""
+    key_line: str | None = None
+    byterange_line: str | None = None
+    discontinuity: bool = False
+    map_line: str | None = None
+    program_date_time: str | None = None
+
+    @property
+    def duration(self) -> float:
+        """Extract duration in seconds from #EXTINF line."""
+        try:
+            duration_part = self.extinf_line.split("#EXTINF:")[1].split(",", 1)[0]
+            return float(duration_part.strip())
+        except (IndexError, ValueError):
+            return 0.0
+
+    @property
+    def title(self) -> str | None:
+        """Extract optional title from #EXTINF line."""
+        try:
+            parts = self.extinf_line.split("#EXTINF:")[1].split(",", 1)
+            if len(parts) == 2:
+                title = parts[1].strip()
+                return title if title else None
+            return None
+        except IndexError:
+            return None
+
+
+@dataclass
+class HLSMediaPlaylist:
+    """
+    HLS media playlist structure with headers, segments, and footers preserved.
+
+    Note: header_lines excludes EXT-X-KEY and EXT-X-MAP tags. Per RFC 8216, these
+    tags apply to subsequent segments until overridden, so they're stored per-segment
+    for easier manipulation.
+    """
+
+    header_lines: list[str] = field(default_factory=list)
+    segments: list[HLSMediaSegment] = field(default_factory=list)
+    footer_lines: list[str] = field(default_factory=list)
+
+
+class HLSMediaPlaylistParser:
+    """RFC 8216-based HLS media playlist parser."""
+
+    def __init__(self, hls_playlist_text: str) -> None:
+        """Initialize parser with playlist text."""
+        self.hls_playlist_text = hls_playlist_text
+        self.result = HLSMediaPlaylist()
+        self.working_segment = HLSMediaSegment()
+        self.segments_started = False
+
+    def parse(self) -> HLSMediaPlaylist:
+        """Parse HLS media playlist text into structured data.
+
+        Returns:
+            HLSMediaPlaylist object with extracted structure
+
+        Raises:
+            InvalidDataError: If playlist doesn't start with #EXTM3U or has invalid format
+        """
+        lines = [line.strip() for line in self.hls_playlist_text.split("\n") if line.strip()]
+
+        if not lines or not lines[0].startswith("#EXTM3U"):
+            msg = "Invalid HLS playlist: must start with #EXTM3U"
+            raise InvalidDataError(msg)
+
+        for line in lines:
+            self.process_line(line)
+
+        if not self.result.segments:
+            msg = "Invalid HLS playlist: no segments found"
+            raise InvalidDataError(msg)
+
+        return self.result
+
+    def process_line(self, line: str) -> None:
+        """Process a single line from the playlist."""
+        if line.startswith("#EXTINF:"):
+            self._on_extinf(line)
+        elif line.startswith("#EXT-X-KEY:"):
+            self._on_key_line(line)
+        elif line.startswith("#EXT-X-MAP:"):
+            self._on_map_line(line)
+        elif line.startswith("#EXT-X-PROGRAM-DATE-TIME:"):
+            self._on_program_date_time(line)
+        elif line.startswith("#EXT-X-BYTERANGE:"):
+            self._on_byterange(line)
+        elif line.startswith("#EXT-X-DISCONTINUITY"):
+            self._on_discontinuity()
+        elif line.startswith("#EXT"):
+            self._on_ext_tag(line)
+        elif line.startswith("#"):
+            pass
+        elif self.working_segment.extinf_line:
+            self._on_segment_url(line)
+
+    def _on_extinf(self, line: str) -> None:
+        """Handle #EXTINF tag."""
+        if self.working_segment.extinf_line:
+            msg = (
+                f"Malformed HLS playlist: #EXTINF '{line}' found without "
+                f"preceding segment URL for '{self.working_segment.extinf_line}'"
+            )
+            raise InvalidDataError(msg)
+        self.segments_started = True
+        self.working_segment.extinf_line = line
+
+    def _on_key_line(self, line: str) -> None:
+        """Handle #EXT-X-KEY tag."""
+        self.working_segment.key_line = line
+
+    def _on_map_line(self, line: str) -> None:
+        """Handle #EXT-X-MAP tag."""
+        self.working_segment.map_line = line
+
+    def _on_program_date_time(self, line: str) -> None:
+        """Handle #EXT-X-PROGRAM-DATE-TIME tag."""
+        self.working_segment.program_date_time = line
+
+    def _on_byterange(self, line: str) -> None:
+        """Handle #EXT-X-BYTERANGE tag."""
+        self.working_segment.byterange_line = line
+
+    def _on_discontinuity(self) -> None:
+        """Handle #EXT-X-DISCONTINUITY tag."""
+        self.working_segment.discontinuity = True
+
+    def _on_ext_tag(self, line: str) -> None:
+        """Handle other #EXT tags."""
+        if self.segments_started:
+            self.result.footer_lines.append(line)
+        else:
+            self.result.header_lines.append(line)
+
+    def _on_segment_url(self, line: str) -> None:
+        """Handle segment URL following #EXTINF."""
+        self.working_segment.segment_url = line
+        self.result.segments.append(self.working_segment)
+
+        self.working_segment = HLSMediaSegment(
+            key_line=self.working_segment.key_line,
+            map_line=self.working_segment.map_line,
+        )
index aac79966998e6daca73a303a6a145391cf807271..c66448c636a8af941ab7cf315827f321045fbcf3 100644 (file)
@@ -48,7 +48,12 @@ class PlaylistItem:
 
 
 def parse_m3u(m3u_data: str) -> list[PlaylistItem]:
-    """Very simple m3u parser.
+    """Lightweight M3U/M3U8 parser for playlist URL extraction.
+
+    This parser returns a flat list of playlist items with basic metadata.
+    Supports HLS master playlist tags (#EXT-X-STREAM-INF, #EXT-X-KEY) for
+    stream selection and quality sorting, but does not preserve segment-level
+    details or playlist structure.
 
     Based on https://github.com/dvndrsn/M3uParser/blob/master/m3uparser.py
     """
@@ -75,7 +80,7 @@ def parse_m3u(m3u_data: str) -> list[PlaylistItem]:
                 length = None
             title = info[1].strip()
         elif line.startswith("#EXT-X-STREAM-INF:"):
-            # HLS stream properties
+            # HLS master playlist variant stream properties (BANDWIDTH, RESOLUTION, etc.)
             # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
             stream_info = {}
             for part in line.replace("#EXT-X-STREAM-INF:", "").split(","):
@@ -84,7 +89,7 @@ def parse_m3u(m3u_data: str) -> list[PlaylistItem]:
                 kev_value_parts = part.strip().split("=")
                 stream_info[kev_value_parts[0]] = kev_value_parts[1]
         elif line.startswith("#EXT-X-KEY:"):
-            # Extract encryption key URI if present
+            # Extract encryption key URI from master/media playlist
             # METHOD=NONE means no encryption, so explicitly clear the key
             if "METHOD=NONE" in line:
                 key = None
index 625b741edb355ae3817860f74ad784f12273d0a2..ff51567def9094c215d8dce768aac9638f51cc80 100644 (file)
@@ -13,9 +13,9 @@ from niconico.objects.video.watch import (  # noqa: TC002 - Using by StreamConve
 )
 from pydantic import BaseModel
 
+from music_assistant.helpers.hls import HLSMediaPlaylist, HLSMediaPlaylistParser
 from music_assistant.providers.nicovideo.converters.base import NicovideoConverterBase
 from music_assistant.providers.nicovideo.helpers import create_audio_format
-from music_assistant.providers.nicovideo.helpers.hls_models import ParsedHLSPlaylist
 
 
 @dataclass
@@ -31,7 +31,7 @@ class NicovideoStreamData:
     """
 
     domand_bid: str
-    parsed_hls_playlist: ParsedHLSPlaylist
+    parsed_hls_playlist: HLSMediaPlaylist
 
 
 class StreamConversionData(BaseModel):
@@ -77,7 +77,7 @@ class NicovideoStreamConverter(NicovideoConverterBase):
         # Do not use album image intentionally
         image = track.image if track else None
 
-        parsed_playlist = ParsedHLSPlaylist.from_text(conversion_data.hls_playlist_text)
+        parsed_playlist = HLSMediaPlaylistParser(conversion_data.hls_playlist_text).parse()
 
         return StreamDetails(
             provider=self.provider.instance_id,
index 6809b3bf844cf95e2ddf13e6e21d51c393d5f6d0..d0c7558b0cb91636bc6f13d8f76136ae86898f67 100644 (file)
@@ -1,9 +1,5 @@
 """Helper functions for nicovideo provider."""
 
-from music_assistant.providers.nicovideo.helpers.hls_models import (
-    HLSSegment,
-    ParsedHLSPlaylist,
-)
 from music_assistant.providers.nicovideo.helpers.hls_seek_optimizer import (
     HLSSeekOptimizer,
     SeekOptimizedStreamContext,
@@ -18,8 +14,6 @@ from music_assistant.providers.nicovideo.helpers.utils import (
 __all__ = [
     "AlbumWithTracks",
     "HLSSeekOptimizer",
-    "HLSSegment",
-    "ParsedHLSPlaylist",
     "PlaylistWithTracks",
     "SeekOptimizedStreamContext",
     "create_audio_format",
diff --git a/music_assistant/providers/nicovideo/helpers/hls_models.py b/music_assistant/providers/nicovideo/helpers/hls_models.py
deleted file mode 100644 (file)
index 4c10d6c..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-"""HLS data models and parsing for nicovideo provider."""
-
-from __future__ import annotations
-
-import re
-from dataclasses import dataclass
-
-
-@dataclass
-class HLSSegment:
-    """Single HLS segment entry.
-
-    Attributes:
-        duration_line: #EXTINF line with duration (e.g., "#EXTINF:5.967528,")
-        segment_url: URL to the segment file
-    """
-
-    duration_line: str
-    segment_url: str
-
-
-@dataclass
-class ParsedHLSPlaylist:
-    """Parsed HLS playlist data.
-
-    Attributes:
-        init_segment_url: URL to the initialization segment (#EXT-X-MAP)
-        encryption_key_line: Encryption key line (#EXT-X-KEY) if present
-        segments: List of HLS segments
-        header_lines: Playlist header lines (#EXTM3U, #EXT-X-VERSION, etc.)
-    """
-
-    init_segment_url: str
-    encryption_key_line: str
-    segments: list[HLSSegment]
-    header_lines: list[str]
-
-    @classmethod
-    def from_text(cls, hls_playlist_text: str) -> ParsedHLSPlaylist:
-        """Parse HLS playlist text into structured data.
-
-        Args:
-            hls_playlist_text: HLS playlist text
-
-        Returns:
-            ParsedHLSPlaylist object with extracted data
-        """
-        lines = [line.strip() for line in hls_playlist_text.split("\n") if line.strip()]
-
-        # Extract header lines (#EXTM3U, #EXT-X-VERSION, etc.)
-        header_lines = []
-        for line in lines:
-            if line.startswith("#EXT-X-TARGETDURATION"):
-                break
-            if line.startswith("#EXT"):
-                header_lines.append(line)
-
-        # Extract init segment URL from #EXT-X-MAP
-        init_segment_url = ""
-        for line in lines:
-            if line.startswith("#EXT-X-MAP:"):
-                match = re.search(r'URI="([^"]+)"', line)
-                if match:
-                    init_segment_url = match.group(1)
-                break
-
-        # Extract encryption key line
-        encryption_key_line = ""
-        for line in lines:
-            if line.startswith("#EXT-X-KEY:"):
-                encryption_key_line = line
-                break
-
-        # Extract segments (duration + URL pairs)
-        segments: list[HLSSegment] = []
-        i = 0
-        while i < len(lines):
-            line = lines[i]
-            if line.startswith("#EXTINF:"):
-                duration_line = line
-                # Next line should be segment URL
-                if i + 1 < len(lines):
-                    segment_url = lines[i + 1]
-                    if not segment_url.startswith("#"):
-                        segments.append(HLSSegment(duration_line, segment_url))
-                        i += 2
-                        continue
-            i += 1
-
-        return cls(
-            init_segment_url=init_segment_url,
-            encryption_key_line=encryption_key_line,
-            segments=segments,
-            header_lines=header_lines,
-        )
index f84b9e66c292402ab973c8ed8cf3f77004fd1b7d..6a84bf3057cc93fbc05dc08f343ef433aa6aca9e 100644 (file)
@@ -1,12 +1,19 @@
-"""HLS seek optimizer for nicovideo provider."""
+"""HLS seek optimizer for nicovideo provider.
+
+This module implements a workaround for FFmpeg's seeking limitations with fragmented MP4
+HLS playlists (see https://trac.ffmpeg.org/ticket/7359).
+
+NOTE: This entire module can be removed once Music Assistant requires FFmpeg 8.0+,
+      which fixes the input-side -ss seeking issue (commit 380a518c, 2024-11-10).
+"""
 
 from __future__ import annotations
 
 import logging
-import re
 from dataclasses import dataclass
 from typing import TYPE_CHECKING
 
+from music_assistant.helpers.hls import HLSMediaPlaylist
 from music_assistant.providers.nicovideo.constants import (
     DOMAND_BID_COOKIE_NAME,
     NICOVIDEO_USER_AGENT,
@@ -15,7 +22,6 @@ from music_assistant.providers.nicovideo.helpers.utils import log_verbose
 
 if TYPE_CHECKING:
     from music_assistant.providers.nicovideo.converters.stream import NicovideoStreamData
-    from music_assistant.providers.nicovideo.helpers.hls_models import ParsedHLSPlaylist
 
 LOGGER = logging.getLogger(__name__)
 
@@ -52,7 +58,7 @@ class HLSSeekOptimizer:
         Args:
             hls_data: HLS streaming data containing parsed playlist and authentication info
         """
-        self.parsed_playlist: ParsedHLSPlaylist = hls_data.parsed_hls_playlist
+        self.parsed_playlist: HLSMediaPlaylist = hls_data.parsed_hls_playlist
         self.domand_bid = hls_data.domand_bid
 
     def _calculate_start_segment(self, seek_position: int) -> tuple[int, float]:
@@ -71,10 +77,8 @@ class HLSSeekOptimizer:
 
         accumulated_time = 0.0
         for idx, segment in enumerate(self.parsed_playlist.segments):
-            # Extract duration from #EXTINF:5.967528,
-            match = re.search(r"#EXTINF:([0-9.]+)", segment.duration_line)
-            if match:
-                segment_duration = float(match.group(1))
+            segment_duration = segment.duration
+            if segment_duration > 0:
                 if accumulated_time + segment_duration > seek_position:
                     # Found the segment containing seek_position
                     offset = seek_position - accumulated_time
@@ -98,38 +102,42 @@ class HLSSeekOptimizer:
         # Add header lines
         lines.extend(self.parsed_playlist.header_lines)
 
-        # Calculate target duration from segments (rounded up)
-        max_duration = 6  # Default fallback
-        for segment in self.parsed_playlist.segments:
-            match = re.search(r"#EXTINF:([0-9.]+)", segment.duration_line)
-            if match:
-                duration = float(match.group(1))
-                max_duration = max(max_duration, int(duration) + 1)
-
-        # Add required HLS tags
-        lines.extend(
-            [
-                f"#EXT-X-TARGETDURATION:{max_duration}",
-                "#EXT-X-MEDIA-SEQUENCE:1",
-                "#EXT-X-PLAYLIST-TYPE:VOD",
-            ]
-        )
+        # Add segments from start_segment_idx onward
+        # Track previous segment's key_line and map_line to emit only when changed
+        prev_key_line: str | None = None
+        prev_map_line: str | None = None
 
-        # Add init segment
-        if self.parsed_playlist.init_segment_url:
-            lines.append(f'#EXT-X-MAP:URI="{self.parsed_playlist.init_segment_url}"')
+        for segment in self.parsed_playlist.segments[start_segment_idx:]:
+            # Add discontinuity marker if present
+            if segment.discontinuity:
+                lines.append("#EXT-X-DISCONTINUITY")
 
-        # Add encryption key if present
-        if self.parsed_playlist.encryption_key_line:
-            lines.append(self.parsed_playlist.encryption_key_line)
+            # Add program date/time if present
+            if segment.program_date_time:
+                lines.append(segment.program_date_time)
+
+            # Add map line only if it changed from previous segment
+            # Note: MAP must come before KEY according to RFC 8216
+            if segment.map_line and segment.map_line != prev_map_line:
+                lines.append(segment.map_line)
+                prev_map_line = segment.map_line
+
+            # Add key line only if it changed from previous segment
+            if segment.key_line and segment.key_line != prev_key_line:
+                lines.append(segment.key_line)
+                prev_key_line = segment.key_line
+
+            # Add segment info and URL
+            lines.append(segment.extinf_line)
+
+            # Add byte range if present
+            if segment.byterange_line:
+                lines.append(segment.byterange_line)
 
-        # Add segments from start_segment_idx onward
-        for segment in self.parsed_playlist.segments[start_segment_idx:]:
-            lines.append(segment.duration_line)
             lines.append(segment.segment_url)
 
         # Add end tag
-        lines.append("#EXT-X-ENDLIST")
+        lines.extend(self.parsed_playlist.footer_lines)
 
         return "\n".join(lines)
 
diff --git a/tests/core/test_hls.py b/tests/core/test_hls.py
new file mode 100644 (file)
index 0000000..ca1b1b9
--- /dev/null
@@ -0,0 +1,253 @@
+"""Tests for HLS playlist parsing utilities."""
+
+from __future__ import annotations
+
+import pytest
+from music_assistant_models.errors import InvalidDataError
+
+from music_assistant.helpers.hls import HLSMediaPlaylistParser, HLSMediaSegment
+
+
+def test_basic_vod_playlist() -> None:
+    """Test parsing basic VOD playlist with encryption (standard fMP4 format)."""
+    playlist_text = """#EXTM3U
+#EXT-X-VERSION:3
+#EXT-X-TARGETDURATION:6
+#EXT-X-MEDIA-SEQUENCE:0
+#EXT-X-PLAYLIST-TYPE:VOD
+#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key.bin"
+#EXT-X-MAP:URI="init.mp4"
+#EXTINF:5.967528,
+segment0.m4s
+#EXTINF:5.967528,
+segment1.m4s
+#EXTINF:5.967528,
+segment2.m4s
+#EXTINF:3.123456,
+segment3.m4s
+#EXT-X-ENDLIST
+"""
+    result = HLSMediaPlaylistParser(playlist_text).parse()
+
+    assert len(result.header_lines) == 5
+    assert len(result.segments) == 4
+    assert len(result.footer_lines) == 1
+
+    # Check total duration
+    total_duration = sum(segment.duration for segment in result.segments)
+    assert total_duration == pytest.approx(21.02604, rel=1e-6)
+
+    # Check first segment inherits MAP from header
+    assert result.segments[0].segment_url == "segment0.m4s"
+    assert result.segments[0].duration == pytest.approx(5.967528, rel=1e-6)
+    assert (
+        result.segments[0].key_line == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key.bin"'
+    )
+    assert result.segments[0].map_line == '#EXT-X-MAP:URI="init.mp4"'
+    assert result.segments[0].byterange_line is None
+    assert result.segments[0].discontinuity is False
+    assert result.segments[0].program_date_time is None
+
+    # Check last segment - also has MAP
+    assert result.segments[3].segment_url == "segment3.m4s"
+    assert result.segments[3].duration == pytest.approx(3.123456, rel=1e-6)
+    assert result.segments[3].map_line == '#EXT-X-MAP:URI="init.mp4"'
+
+
+def test_live_stream_with_program_date_time() -> None:
+    """Test parsing live stream with PROGRAM-DATE-TIME tags."""
+    playlist_text = """#EXTM3U
+#EXT-X-VERSION:3
+#EXT-X-TARGETDURATION:10
+#EXT-X-MEDIA-SEQUENCE:2680
+#EXT-X-PROGRAM-DATE-TIME:2025-11-27T10:15:00.000Z
+#EXTINF:9.009,
+https://example.com/segment2680.ts
+#EXTINF:9.009,
+https://example.com/segment2681.ts
+#EXT-X-DISCONTINUITY
+#EXT-X-PROGRAM-DATE-TIME:2025-11-27T10:15:20.000Z
+#EXTINF:9.009,
+https://example.com/segment2682.ts
+"""
+    result = HLSMediaPlaylistParser(playlist_text).parse()
+
+    assert len(result.segments) == 3
+
+    # First segment has PROGRAM-DATE-TIME
+    assert (
+        result.segments[0].program_date_time == "#EXT-X-PROGRAM-DATE-TIME:2025-11-27T10:15:00.000Z"
+    )
+    assert result.segments[0].discontinuity is False
+
+    # Second segment does not have PROGRAM-DATE-TIME (single-use tag)
+    assert result.segments[1].program_date_time is None
+    assert result.segments[1].discontinuity is False
+
+    # Third segment has both discontinuity and new PROGRAM-DATE-TIME
+    assert result.segments[2].discontinuity is True
+    assert (
+        result.segments[2].program_date_time == "#EXT-X-PROGRAM-DATE-TIME:2025-11-27T10:15:20.000Z"
+    )
+
+
+def test_byterange_segments() -> None:
+    """Test parsing playlist with byte-range segments."""
+    playlist_text = """#EXTM3U
+#EXT-X-VERSION:4
+#EXT-X-TARGETDURATION:10
+#EXT-X-MEDIA-SEQUENCE:0
+#EXT-X-MAP:URI="init.mp4"
+#EXTINF:10.0,
+#EXT-X-BYTERANGE:1000@0
+video.mp4
+#EXTINF:10.0,
+#EXT-X-BYTERANGE:1500
+video.mp4
+#EXTINF:10.0,
+#EXT-X-BYTERANGE:1200
+video.mp4
+#EXT-X-ENDLIST
+"""
+    result = HLSMediaPlaylistParser(playlist_text).parse()
+
+    assert len(result.segments) == 3
+    assert result.segments[0].byterange_line == "#EXT-X-BYTERANGE:1000@0"
+    assert result.segments[1].byterange_line == "#EXT-X-BYTERANGE:1500"
+    assert result.segments[2].byterange_line == "#EXT-X-BYTERANGE:1200"
+
+    # All segments use same file
+    assert result.segments[0].segment_url == "video.mp4"
+    assert result.segments[1].segment_url == "video.mp4"
+    assert result.segments[2].segment_url == "video.mp4"
+
+
+def test_multiple_encryption_keys() -> None:
+    """Test parsing playlist with multiple encryption keys (ad insertion scenario)."""
+    playlist_text = """#EXTM3U
+#EXT-X-VERSION:3
+#EXT-X-TARGETDURATION:15
+#EXT-X-MEDIA-SEQUENCE:0
+#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin"
+#EXTINF:10.0,
+segment0.ts
+#EXTINF:10.0,
+segment1.ts
+#EXT-X-DISCONTINUITY
+#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key2.bin"
+#EXTINF:15.0,
+ad_segment.ts
+#EXT-X-DISCONTINUITY
+#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin"
+#EXTINF:10.0,
+segment2.ts
+#EXT-X-ENDLIST
+"""
+    result = HLSMediaPlaylistParser(playlist_text).parse()
+
+    assert len(result.segments) == 4
+
+    # First two segments use key1
+    assert (
+        result.segments[0].key_line
+        == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin"'
+    )
+    assert (
+        result.segments[1].key_line
+        == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin"'
+    )
+    assert result.segments[1].discontinuity is False
+
+    # Ad segment has discontinuity and key2
+    assert result.segments[2].discontinuity is True
+    assert (
+        result.segments[2].key_line
+        == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key2.bin"'
+    )
+
+    # Back to key1 with discontinuity
+    assert result.segments[3].discontinuity is True
+    assert (
+        result.segments[3].key_line
+        == '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key1.bin"'
+    )
+
+
+def test_segment_properties() -> None:
+    """Test HLSMediaSegment properties (duration, title) and comment handling."""
+    # Test duration extraction
+    segment = HLSMediaSegment(extinf_line="#EXTINF:5.967528,", segment_url="test.m4s")
+    assert segment.duration == pytest.approx(5.967528, rel=1e-6)
+
+    # Test duration with title
+    segment = HLSMediaSegment(extinf_line="#EXTINF:10.5,Track Title", segment_url="test.m4s")
+    assert segment.duration == pytest.approx(10.5, rel=1e-6)
+    assert segment.title == "Track Title"
+
+    # Test malformed EXTINF
+    segment = HLSMediaSegment(extinf_line="malformed", segment_url="test.m4s")
+    assert segment.duration == 0.0
+
+    # Test comment lines and title extraction in playlist
+    playlist_text = """#EXTM3U
+#EXT-X-VERSION:3
+# This is a comment
+#EXTINF:10.0,Test Title
+segment1.ts
+#EXTINF:10.0,
+segment2.ts
+#EXT-X-ENDLIST
+"""
+    result = HLSMediaPlaylistParser(playlist_text).parse()
+    assert len(result.segments) == 2
+    assert result.segments[0].title == "Test Title"
+    assert result.segments[1].title is None
+    assert all("# This" not in line for line in result.header_lines)
+
+
+def test_tag_inheritance() -> None:
+    """Test that EXT-X-KEY and EXT-X-MAP persist across segments until changed."""
+    playlist_text = """#EXTM3U
+#EXT-X-VERSION:3
+#EXT-X-KEY:METHOD=AES-128,URI="key1.bin"
+#EXT-X-MAP:URI="init.mp4"
+#EXTINF:10.0,
+segment1.ts
+#EXTINF:10.0,
+segment2.ts
+#EXT-X-KEY:METHOD=AES-128,URI="key2.bin"
+#EXT-X-MAP:URI="init2.mp4"
+#EXTINF:10.0,
+segment3.ts
+#EXT-X-ENDLIST
+"""
+    result = HLSMediaPlaylistParser(playlist_text).parse()
+
+    # First two segments inherit key1 and init.mp4
+    assert result.segments[0].key_line == '#EXT-X-KEY:METHOD=AES-128,URI="key1.bin"'
+    assert result.segments[0].map_line == '#EXT-X-MAP:URI="init.mp4"'
+    assert result.segments[1].key_line == '#EXT-X-KEY:METHOD=AES-128,URI="key1.bin"'
+    assert result.segments[1].map_line == '#EXT-X-MAP:URI="init.mp4"'
+
+    # Third segment uses key2 and init2.mp4
+    assert result.segments[2].key_line == '#EXT-X-KEY:METHOD=AES-128,URI="key2.bin"'
+    assert result.segments[2].map_line == '#EXT-X-MAP:URI="init2.mp4"'
+
+
+def test_invalid_playlists() -> None:
+    """Test error handling for invalid playlist formats."""
+    # No #EXTM3U header
+    with pytest.raises(InvalidDataError, match="must start with #EXTM3U"):
+        HLSMediaPlaylistParser("#EXTINF:10.0\nsegment.ts").parse()
+
+    # Empty playlist
+    with pytest.raises(InvalidDataError, match="must start with #EXTM3U"):
+        HLSMediaPlaylistParser("").parse()
+
+    # No segments
+    with pytest.raises(InvalidDataError, match="no segments found"):
+        HLSMediaPlaylistParser("#EXTM3U\n#EXT-X-VERSION:3").parse()
+
+    # EXTINF without segment URL
+    with pytest.raises(InvalidDataError, match="without preceding segment URL"):
+        HLSMediaPlaylistParser("#EXTM3U\n#EXTINF:10.0,\n#EXTINF:10.0,").parse()
index 14cb9eef275dd7099221d8cd8ca3a3e820f03266..886a3eb13dce1ecf90b88c3dfec9ec24363120ca 100644 (file)
@@ -166,7 +166,20 @@ API_RESPONSE_CONVERTER_MAPPINGS = (
                 selected_audio=data.selected_audio,
                 hls_url="https://example.com/stub.m3u8",
                 domand_bid="stub_bid",
-                hls_playlist_text="#EXTM3U\n#EXT-X-VERSION:3\n",
+                hls_playlist_text=(
+                    "#EXTM3U\n"
+                    "#EXT-X-VERSION:6\n"
+                    "#EXT-X-TARGETDURATION:6\n"
+                    "#EXT-X-MEDIA-SEQUENCE:1\n"
+                    "#EXT-X-PLAYLIST-TYPE:VOD\n"
+                    '#EXT-X-MAP:URI="https://example.com/init.mp4"\n'
+                    '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key"\n'
+                    "#EXTINF:6.0,\n"
+                    "segment1.m4s\n"
+                    "#EXTINF:6.0,\n"
+                    "segment2.m4s\n"
+                    "#EXT-X-ENDLIST\n"
+                ),
             )
         ),
     ),