# handle a second connection for the same player
# this means either that the player itself want to skip to the next track
- # or a misbehaving client which reconnects multiple times (e.g. Kodi)
+ # or a misbehaving client which reconnects multiple times (e.g. Kodi, Vlc)
if client_id in queue_stream.connected_clients:
self.logger.warning(
"Simultanuous connections detected from %s, playback may be disturbed!",
client_id,
)
client_id += uuid4().hex
- elif queue_stream.all_clients_connected.is_set():
+ elif (
+ queue_stream.all_clients_connected.is_set()
+ and queue_stream.total_seconds_streamed
+ ):
self.logger.debug(
"Got stream request for running stream: %s, assuming next", stream_id
)
self.index_in_buffer = start_index
self.signal_next: Optional[int] = None
self._runner_task: Optional[asyncio.Task] = None
- self._prev_chunk: bytes = b""
if queue.settings.metadata_mode == MetadataMode.LEGACY:
# use the legacy/recommended metaint size of 8192 bytes
self.output_chunksize = ICY_CHUNKSIZE
async def subscribe(self, client_id: str, callback: CoroutineType[bytes]) -> None:
"""Subscribe callback and wait for completion."""
assert not self.done.is_set(), "Stream task is already finished"
- if client_id in self.connected_clients:
- self.logger.warning(
- "Simultanuous connections detected from %s, playback may be disturbed",
- client_id,
- )
- client_id += uuid4().hex
+ # assert client_id not in self.connected_clients, "Client already connected"
self.connected_clients[client_id] = callback
self.logger.debug("client connected: %s", client_id)
try:
await self.done.wait()
finally:
- self.connected_clients.pop(client_id, None)
self.logger.debug("client disconnected: %s", client_id)
+ self.connected_clients.pop(client_id, None)
await self._check_stop()
async def _queue_stream_runner(self) -> None:
self.done.set()
return
self.logger.debug("%s clients connected", len(self.connected_clients))
+ # TEMP fix - to remove in stream refactor
+ # some clients connect twice (browser, vlc, kodi) and we are not prepared for that
+ await asyncio.sleep(0.2)
self.streaming_started = time()
# Read bytes from final output and send chunk to child callback.
if await self._check_stop():
return
for client_id in set(self.connected_clients.keys()):
- self._prev_chunk = chunk
try:
callback = self.connected_clients[client_id]
await callback(chunk)
self.total_seconds_streamed - self.queue.player.elapsed_time or 0
)
if player_buffered > 30:
- await asyncio.sleep(1)
+ await asyncio.sleep(0.5)
# all queue data has been streamed. Either because the queue is exhausted
# or we need to restart the stream due to decoder/sample rate mismatch
# send signal that we've loaded a new track into the buffer
self.index_in_buffer = queue_index
self.queue.signal_update()
- # precache the streamdetails for the next track
- self.mass.create_task(self._precache_next_streamdetails())
buffer = b""
bytes_written = 0
# END OF QUEUE STREAM
self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name)
- async def _precache_next_streamdetails(self) -> None:
- """Prefetch the streamdetails for the next track."""
- next_index = self.queue.get_next_index(self.index_in_buffer)
- if next_index <= self.index_in_buffer:
- return
- queue_track = self.queue.get_item(next_index)
- if not queue_track:
- return
- await get_stream_details(self.mass, queue_track, self.queue.queue_id)
-
async def _check_stop(self) -> bool:
"""Schedule stop of queue stream."""
# Stop this queue stream when no clients (re)connected within 5 seconds