}
resp = web.StreamResponse(headers=headers)
- await resp.prepare(request)
+ try:
+ await resp.prepare(request)
+ except ConnectionResetError:
+ return resp
if request.method != "GET":
# do not start stream on HEAD request
self.signal_next: bool = False
self.chunk_size = get_chunksize(output_format)
self._runner_task: Optional[asyncio.Task] = None
- self._prev_chunks: Dict[str, bytes] = {}
+ self._prev_chunk: bytes = b""
if autostart:
self.mass.create_task(self.start())
self._runner_task = None
self.connected_clients = {}
- self._prev_chunks = {}
+ self._prev_chunk = b""
# run garbage collection manually due to the high number of
# processed bytes blocks
async def subscribe(self, client_id: str, callback: CoroutineType[bytes]) -> None:
"""Subscribe callback and wait for completion."""
- assert client_id not in self.connected_clients, "Client is already connected"
assert not self.done.is_set(), "Stream task is already finished"
+ if client_id in self.connected_clients:
+ self.logger.warning(
+ "Simultanuous connections detected from %s, playback may be disturbed",
+ client_id,
+ )
+ client_id += uuid4().hex
+
self.connected_clients[client_id] = callback
self.logger.debug("client connected: %s", client_id)
if len(self.connected_clients) == self.expected_clients:
self.all_clients_connected.set()
- if client_id in self._prev_chunks:
- self.logger.warning(
- "Reconnect of player %s detected, playback may be disturbed", client_id
- )
- await callback(self._prev_chunks[client_id])
+ # workaround for reconnecting clients (such as kodi)
+ # send the previous chunk if we have one
+ if self._prev_chunk:
+ await callback(self._prev_chunk)
try:
await self.done.wait()
finally:
if await self._check_stop():
return
for client_id in set(self.connected_clients.keys()):
- self._prev_chunks[client_id] = chunk
+ self._prev_chunk = chunk
try:
callback = self.connected_clients[client_id]
await callback(chunk)
# start queue with alert sound(s)
self._items = queue_items
- await self.queue_stream_start(0, 0, False, is_alert=True)
+ stream = await self.queue_stream_start(
+ start_index=0, seek_position=0, fade_in=False, is_alert=True
+ )
+ # execute the play command on the player(s)
+ await self.player.play_url(stream.url)
# wait for the player to finish playing
play_started = asyncio.Event()
return
self._current_index = index
# start the queue stream
- await self.queue_stream_start(index, int(seek_position), fade_in, passive)
+ stream = await self.queue_stream_start(
+ start_index=index,
+ seek_position=int(seek_position),
+ fade_in=fade_in,
+ passive=passive,
+ )
+ # execute the play command on the player(s)
+ if not passive:
+ await self.player.play_url(stream.url)
async def move_item(self, queue_item_id: str, pos_shift: int = 1) -> None:
"""
is_alert=is_alert,
)
self._stream_id = stream.stream_id
- # execute the play command on the player(s)
- if not passive:
- await self.player.play_url(stream.url)
return stream
def get_next_index(self, cur_index: Optional[int]) -> int: