enc_track_id = urllib.parse.quote(track_id)
return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}"
- def get_control_url(self, queue_id: str, control: str = "next") -> str:
- """Return url to control endpoint."""
- return f"{self.base_url}/{queue_id}/{control}"
-
def get_silence_url(
self,
content_type: ContentType = ContentType.WAV,
app.router.add_get("/preview", self.serve_preview)
app.router.add_get("/silence.{fmt}", self.serve_silence)
- app.router.add_get("/{queue_id}/{control}", self.serve_control)
app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream)
runner = web.AppRunner(app, access_log=None)
self.logger.info("Started stream server on port %s", self._port)
- async def serve_control(self, request: web.Request):
- """Server player control endpoint."""
- queue_id = request.match_info["queue_id"]
- control = request.match_info["control"]
- if queue := self.mass.players.get_player_queue(queue_id):
- if control == "next" and queue.signal_next is None:
- await queue.next()
-
- resp = web.StreamResponse(
- status=200, reason="OK", headers={"Content-Type": "audio/wav"}
- )
- await resp.prepare(request)
- if request.method == "GET":
- # service 1 second of silence while player is processing request
- async for chunk in get_silence(1, ContentType.WAV):
- await resp.write(chunk)
- return resp
-
async def serve_preview(self, request: web.Request):
"""Serve short preview sample."""
provider_id = request.query["provider_id"]
self.done = asyncio.Event()
self.all_clients_connected = asyncio.Event()
self.index_in_buffer = start_index
- self.signal_next: bool = False
+ self.signal_next: Optional[int] = None
self._runner_task: Optional[asyncio.Task] = None
self._prev_chunk: bytes = b""
if queue.settings.metadata_mode == MetadataMode.LEGACY:
self.mass = mass
self.logger = mass.players.logger
self.queue_id = player_id
- self.signal_next: Optional[int] = 0
self._stream_id: str = ""
self._settings = QueueSettings(self)
self._current_index: Optional[int] = None
self._current_item_elapsed_time: int = 0
self._prev_item: Optional[QueueItem] = None
- self._last_player_state: Tuple[str, str] = ("", "")
+ self._last_player_state: Tuple[PlayerState, str] = (PlayerState.OFF, "")
self._items: List[QueueItem] = []
self._save_task: TimerHandle = None
self._last_player_update: int = 0
if self.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
- self.signal_next = None
+ if stream := self.stream:
+ stream.signal_next = None
# redirect to underlying player
await self.player.stop()
async def next(self) -> None:
"""Play the next track in the queue."""
- next_index = self.get_next_index(self._current_index)
+ next_index = self.get_next_index(self._current_index, True)
if next_index is None:
return None
await self.play_index(next_index)
await self.player.play_url(last_player_url)
return
resume_item = self.current_item
+ next_item = self.next_item
resume_pos = self._current_item_elapsed_time
if (
resume_item
+ and next_item
and resume_item.duration
and resume_pos > (resume_item.duration * 0.9)
):
# track is already played for > 90% - skip to next
- resume_item = self.next_item
+ resume_item = next_item
+ resume_pos = 0
+ elif self._current_index is None and len(self._items) > 0:
+ # items available in queue but no previous track, start at 0
+ resume_item = self.get_item(0)
resume_pos = 0
if resume_item is not None:
resume_pos = resume_pos if resume_pos > 10 else 0
fade_in = resume_pos > 0
await self.play_index(resume_item.item_id, resume_pos, fade_in)
- elif len(self._items) > 0:
- # items available in queue but no previous track, start at 0
- await self.play_index(0)
else:
self.logger.warning(
"resume queue requested for %s but queue is empty", self.queue_id
self._current_index = self._snapshot.index
self._current_item_elapsed_time = self._snapshot.position
self._last_player_state = (
- self._snapshot.state.value,
+ self._snapshot.state,
self._snapshot.player_url,
)
if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
if self.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
+ if stream := self.stream:
+ # make sure that the previous stream is not auto restarted (race condition)
+ stream.signal_next = None
if not isinstance(index, int):
index = self.index_by_id(index)
if index is None:
def on_player_update(self) -> None:
"""Call when player updates."""
- cur_player_state = (self.player.state.value, self.player.current_url)
- if self._last_player_state != cur_player_state:
- # playback state changed
+ prev_state = self._last_player_state
+ new_state = (self.player.state, self.player.current_url)
+
+ # handle PlayerState changed
+ if new_state[0] != prev_state[0]:
+
+ # store previous state
if self.announcement_in_progress:
# while announcement in progress dont update the last url
# to allow us to resume from 3rd party sources
# https://github.com/music-assistant/hass-music-assistant/issues/697
- self._last_player_state = (
- cur_player_state[0],
- self._last_player_state[1],
- )
+ self._last_player_state = (new_state[0], prev_state[1])
else:
- self._last_player_state = cur_player_state
-
- # always signal update if playback state changed
- self.signal_update()
- if self.player.state == PlayerState.IDLE:
+ self._last_player_state = new_state
+
+ # the queue stream was aborted on purpose and needs to restart
+ if (
+ prev_state[0] == PlayerState.PLAYING
+ and new_state[0] == PlayerState.IDLE
+ and self.stream
+ and self.stream.signal_next is not None
+ ):
+ # the queue stream was aborted on purpose (e.g. because of sample rate mismatch)
+ # we need to restart the stream with the next index
+ self._current_item_elapsed_time = 0
+ self.mass.create_task(self.play_index(self.stream.signal_next))
+ return
- # handle case where stream stopped on purpose and we need to restart it
- if self.signal_next is not None:
- self._current_item_elapsed_time = 0
- next_idx = self.signal_next
- self.signal_next = None
- self.mass.create_task(self.play_index(next_idx))
+ # queue exhausted or player turned off/stopped
+ if (
+ self.stream
+ and new_state[0] in (PlayerState.IDLE, PlayerState.OFF)
+ or not self.player.available
+ ):
+ self.stream.signal_next = None
+ # handle last track of the queue, set the index to index that is out of range
+ if self._current_index >= (len(self._items) - 1):
+ self._current_index += 1
- self.update_state()
+ # always signal update if the PlayerState changed
+ if new_state != prev_state:
+ self.signal_update()
- def update_state(self) -> None:
- """Update queue details, called when player updates."""
- if self.player.active_queue != self:
- return
- if not self.active:
+ # update queue details if we're the active queue for the attached player
+ if self.player.active_queue != self or not self.active:
return
+
new_index = self._current_index
track_time = self._current_item_elapsed_time
new_item_loaded = False
self._current_index = start_index
return stream
- def get_next_index(self, cur_index: Optional[int]) -> int:
+ def get_next_index(self, cur_index: Optional[int], is_skip: bool = False) -> int:
"""Return the next index for the queue, accounting for repeat settings."""
# handle repeat single track
- if self.settings.repeat_mode == RepeatMode.ONE:
+ if self.settings.repeat_mode == RepeatMode.ONE and not is_skip:
return cur_index
# handle repeat all
if (
def signal_update(self, items_changed: bool = False) -> None:
"""Signal state changed of this queue."""
if items_changed:
- self.mass.create_task(self._save_items())
self.mass.signal_event(
MassEvent(
EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self
)
)
+ # save items
+ self.mass.create_task(
+ self.mass.cache.set(
+ f"queue.{self.queue_id}.items",
+ [x.to_dict() for x in self._items],
+ )
+ )
+
# always send the base event
self.mass.signal_event(
MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
)
+ # save state
+ self.mass.create_task(
+ self.mass.database.set_setting(
+ f"queue.{self.queue_id}.current_index", self._current_index
+ )
+ )
+ self.mass.create_task(
+ self.mass.database.set_setting(
+ f"queue.{self.queue_id}.current_item_elapsed_time",
+ self._current_item_elapsed_time,
+ )
+ )
def to_dict(self) -> Dict[str, Any]:
"""Export object to dict."""
async def _restore_items(self) -> None:
"""Try to load the saved state from cache."""
- if queue_cache := await self.mass.cache.get(f"queue_items.{self.queue_id}"):
+ if queue_cache := await self.mass.cache.get(f"queue.{self.queue_id}.items"):
try:
- self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]]
- self._current_index = queue_cache["current_index"]
- self._current_item_elapsed_time = queue_cache.get(
- "current_item_elapsed_time", 0
- )
+ self._items = [QueueItem.from_dict(x) for x in queue_cache]
except (KeyError, AttributeError, TypeError) as err:
self.logger.warning(
"Unable to restore queue state for queue %s",
self.queue_id,
exc_info=err,
)
- await self.settings.restore()
+ else:
+ # restore state too
+ db_key = f"queue.{self.queue_id}.current_index"
+ if db_value := await self.mass.database.get_setting(db_key):
+ self._current_index = int(db_value)
+ db_key = f"queue.{self.queue_id}.current_item_elapsed_time"
+ if db_value := await self.mass.database.get_setting(db_key):
+ self._current_item_elapsed_time = int(db_value)
- async def _save_items(self) -> None:
- """Save current queue items/state in cache."""
- await self.mass.cache.set(
- f"queue_items.{self.queue_id}",
- {
- "items": [x.to_dict() for x in self._items],
- "current_index": self._current_index,
- "current_item_elapsed_time": self._current_item_elapsed_time,
- },
- )
+ await self.settings.restore()
async def _wait_for_state(
self,