"""All constants for Music Assistant."""
+import pathlib
+
ROOT_LOGGER_NAME = "music_assistant"
UNKNOWN_ARTIST = "Unknown Artist"
VARIOUS_ARTISTS = "Various Artists"
VARIOUS_ARTISTS_ID = "89ad4ac3-39f7-470e-963a-56509c546377"
+
+
+RESOURCES_DIR = pathlib.Path(__file__).parent.resolve().joinpath("helpers/resources")
+
+ANNOUNCE_ALERT_FILE = str(RESOURCES_DIR.joinpath("announce.mp3"))
+SILENCE_FILE = str(RESOURCES_DIR.joinpath("silence.mp3"))
+
+# if duration is None (e.g. radio stream) = 48 hours
+FALLBACK_DURATION = 172800
from aiohttp import web
+from music_assistant.constants import FALLBACK_DURATION, SILENCE_FILE
from music_assistant.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
request.remote,
request.headers,
)
+ client_id = request.match_info.get("player_id", request.remote)
stream_id = request.match_info["stream_id"]
queue_stream = self.queue_streams.get(stream_id)
+ # try to recover from the situation where the player itself requests
+ # a stream that is already done
if queue_stream is None:
- self.logger.warning("Got stream request for unknown id: %s", stream_id)
+ self.logger.warning(
+ "Got stream request for unknown or finished id: %s, trying resume",
+ stream_id,
+ )
+ if player := self.mass.players.get_player(client_id):
+ self.mass.create_task(player.active_queue.resume())
+ return web.FileResponse(SILENCE_FILE)
return web.Response(status=404)
+ if queue_stream.done.is_set():
+ self.logger.warning(
+ "Got stream request for finished stream: %s, assuming resume", stream_id
+ )
+ self.mass.create_task(queue_stream.queue.resume())
+ return web.FileResponse(SILENCE_FILE)
+
+ # handle a second connection for the same player
+ # this means either that the player itself want to skip to the next track
+ # or a misbehaving client which reconnects multiple times (e.g. Kodi)
+ if queue_stream.all_clients_connected.is_set():
+ self.logger.warning(
+ "Got stream request for running stream: %s, assuming next", stream_id
+ )
+ self.mass.create_task(queue_stream.queue.next())
+ return web.FileResponse(SILENCE_FILE)
+
+ if client_id in queue_stream.connected_clients:
+ self.logger.warning(
+ "Simultanuous connections detected from %s, playback may be disturbed",
+ client_id,
+ )
+ client_id += uuid4().hex
# prepare request, add some DLNA/UPNP compatible headers
headers = {
# do not start stream on HEAD request
return resp
- client_id = request.remote
enable_icy = request.headers.get("Icy-MetaData", "") == "1"
# regular streaming - each chunk is sent to the callback here
crossfade_size = int(self.sample_size_per_second * crossfade_duration)
queue_track.streamdetails.seconds_skipped = seek_position
# predict total size to expect for this track from duration
- stream_duration = (queue_track.duration or 48 * 3600) - seek_position
+ stream_duration = (
+ queue_track.duration or FALLBACK_DURATION
+ ) - seek_position
# buffer_duration has some overhead to account for padded silence
buffer_duration = (crossfade_duration + 4) if use_crossfade else 4
# send signal that we've loaded a new track into the buffer
from __future__ import annotations
import asyncio
-import pathlib
import random
from asyncio import TimerHandle
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
+from music_assistant.constants import ANNOUNCE_ALERT_FILE, FALLBACK_DURATION
from music_assistant.helpers.tags import parse_tags
from music_assistant.helpers.util import try_parse_int
from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode
from music_assistant.controllers.streams import QueueStream
from music_assistant.mass import MusicAssistant
-RESOURCES_DIR = (
- pathlib.Path(__file__)
- .parent.resolve()
- .parent.resolve()
- .joinpath("helpers/resources")
-)
-
-ANNOUNCE_ALERT_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
-
-FALLBACK_DURATION = 172800 # if duration is None (e.g. radio stream) = 48 hours
-
@dataclass
class QueueSnapShot:
async def seek(self, position: int) -> None:
"""Seek to a specific position in the track (given in seconds)."""
assert self.current_item, "No item loaded"
- assert position < self.current_item.duration, "Position exceeds track duration"
+ assert self.current_item.media_item.media_type == MediaType.TRACK
+ assert self.current_item.duration
+ assert position < self.current_item.duration
await self.play_index(self._current_index, position)
async def resume(self) -> None:
duration = (
queue_track.streamdetails.seconds_streamed
or queue_track.duration
- or 48 * 3600
+ or FALLBACK_DURATION
)
if duration is not None and elapsed_time_queue > (
duration + total_time
self._current_item_elapsed_time = try_parse_int(db_value)
await self.settings.restore()
-
- async def _wait_for_state(
- self,
- state: Union[None, PlayerState, Tuple[PlayerState]],
- queue_item_id: Optional[str] = None,
- timeout: int = 120,
- ) -> None:
- """Wait for player(queue) to reach a specific state."""
- if state is not None and not isinstance(state, tuple):
- state = (state,)
-
- count = 0
- while count < timeout * 10:
-
- if (state is None or self.player.state in state) and (
- queue_item_id is None
- or self.current_item
- and self.current_item.item_id == queue_item_id
- ):
- return
-
- count += 1
- await asyncio.sleep(0.1)
-
- raise TimeoutError(f"Timeout while waiting on state(s) {state}")