import asyncio
import random
+import time
import urllib.parse
from contextlib import suppress
from typing import TYPE_CHECKING, cast
self._is_streaming = False
self._restart_requested: bool = False
self._stop_requested: bool = False
+ self._streaming_started_at: float | None = None
self._socket_server: SnapcastSocketServer | None = None
self._socket_path: str | None = None
"""Return True if the FFmpeg streaming task is currently running."""
return self._is_streaming
+ @property
+ def playback_started_at(self) -> float | None:
+ """Return when the playback started at the clients.
+
+ return The (UTC) timestamp when the playback was started on the client
+ or None if not started yet or not streaming.
+ """
+ if self._streaming_started_at is None:
+ return None
+ if self._provider._use_builtin_server:
+ buffer_ms = self._provider._snapcast_server_buffer_size
+ if time.time() - self._streaming_started_at < buffer_ms / 1000.0:
+ return None
+ return self._streaming_started_at + buffer_ms / 1000.0
+ return self._streaming_started_at
+
async def setup(self) -> None:
"""Prepare the Snapcast stream resources.
) as ffmpeg_proc:
wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
+ self._streaming_started_at = time.time()
self._streamer_started_evt.set()
self._is_streaming = True
await asyncio.sleep(0.25)
await asyncio.wait_for(wait_until_idle(), timeout=10.0)
-
except TimeoutError:
self._logger.warning(
"Timeout waiting for stream %s to become idle",
self.stream_name,
)
+ finally:
+ self._streaming_started_at = None
def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
"""Handle streamer task completion and optional cleanup."""
return PlaybackState.PLAYING
+ @property
+ def elapsed_time(self) -> float | None:
+ """Return the elapsed time in (fractional) seconds of the current track (if any)."""
+ # using flow-mode, elapsed time will be estimated upstream from 'elapsed_time_last_updated'
+ return 0 if self.active_snap_ma_stream else None
+
+ @property
+ def elapsed_time_last_updated(self) -> float | None:
+ """
+ Return when the elapsed time was last updated.
+
+ return: The (UTC) timestamp when the elapsed time was last updated,
+ or None if it was never updated (or unknown).
+ """
+ # we only update on playback starts
+ if snap_ma_stream := self.active_snap_ma_stream:
+ return snap_ma_stream.playback_started_at
+ return None
+
def setup(self) -> None:
"""Set up player."""
self._attr_name = self.snap_client.friendly_name