self._log_reader_task = asyncio.create_task(self._log_watcher())
self._audio_reader_task = asyncio.create_task(self._audio_reader())
- async def stop(self):
+ async def stop(self, wait: bool = True):
"""Stop playback and cleanup."""
if not self.running:
return
- # always stop the audio feeder
- if self._audio_reader_task and not self._audio_reader_task.done():
- with suppress(asyncio.CancelledError):
- self._audio_reader_task.cancel()
- await self._audio_reader_task
- await self.send_cli_command("ACTION=STOP")
self._stop_requested = True
- with suppress(TimeoutError):
- await asyncio.wait_for(self._cliraop_proc.wait(), 5)
- if self._cliraop_proc.returncode is None:
- self._cliraop_proc.kill()
+ # send stop with cli command
+ await self.send_cli_command("ACTION=STOP")
+
+ async def wait_for_stop() -> None:
+ # always stop the audio feeder
+ if self._audio_reader_task and not self._audio_reader_task.done():
+ with suppress(asyncio.CancelledError):
+ self._audio_reader_task.cancel()
+ # make sure stdin is drained (otherwise we'll deadlock)
+ if self._cliraop_proc and self._cliraop_proc.returncode is None:
+ if self._cliraop_proc.stdin.can_write_eof():
+ self._cliraop_proc.stdin.write_eof()
+ with suppress(BrokenPipeError):
+ await self._cliraop_proc.stdin.drain()
+ await asyncio.wait_for(self._cliraop_proc.wait(), 30)
+
+ task = self.mass.create_task(wait_for_stop())
+ if wait:
+ await task
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
- if not self.running:
+ if not (self._cliraop_proc and self._cliraop_proc.returncode is None):
return
named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
logger = airplay_player.logger
- airplay_player.logger.debug("Starting log watcher task...")
lost_packets = 0
async for line in self._cliraop_proc.stderr:
line = line.decode().strip() # noqa: PLW2901
mass_player.state = PlayerState.PLAYING
self.mass.players.update(airplay_player.player_id)
continue
- if "Stopped at" in line:
- logger.debug("raop streaming stopped")
- mass_player.state = PlayerState.IDLE
- self.mass.players.update(airplay_player.player_id)
- continue
if "restarting w/o pause" in line:
# streaming has started
logger.debug("raop streaming started")
if "lost packet out of backlog" in line:
lost_packets += 1
if lost_packets == 50:
- logger.warning("High packet loss detected, restart playback...")
+ logger.warning("High packet loss detected, stopping playback...")
queue = self.mass.player_queues.get_active_queue(mass_player.player_id)
- await self.mass.player_queues.resume(queue.queue_id)
+ await self.mass.player_queues.stop(queue.queue_id)
else:
logger.debug(line)
continue
"CLIRaop process stopped with errorcode %s",
self._cliraop_proc.returncode,
)
- if not airplay_player.active_stream or (
- airplay_player.active_stream
- and airplay_player.active_stream.active_remote_id == self.active_remote_id
- ):
+ if airplay_player.active_stream == self:
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
await self._cliraop_proc.stdin.drain()
except (BrokenPipeError, ConnectionResetError):
break
+ if not self.running:
+ return
# send metadata to player(s) if needed
# NOTE: this must all be done in separate tasks to not disturb audio
now = time.time()
async def _send_metadata(self, queue: PlayerQueue) -> None:
"""Send metadata to player (and connected sync childs)."""
+ if not self.running:
+ return
if not queue or not queue.current_item:
return
duration = min(queue.current_item.duration or 0, 3600)
async def _send_progress(self, queue: PlayerQueue) -> None:
"""Send progress report to player (and connected sync childs)."""
+ if not self.running:
+ return
if not queue or not queue.current_item:
return
progress = int(queue.corrected_elapsed_time)
# always stop existing stream first
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
- await airplay_player.active_stream.stop()
+ await airplay_player.active_stream.stop(wait=False)
pcm_format = AudioFormat(
content_type=ContentType.PCM_S16LE,
sample_rate=44100,
from tempfile import gettempdir
from typing import TYPE_CHECKING, Any
-import aiohttp
from asyncio_throttle import Throttler
from music_assistant.common.helpers.json import json_loads
searchtypes.append("playlist")
searchtype = ",".join(searchtypes)
search_query = search_query.replace("'", "")
- if searchresult := await self._get_data(
- "search", q=search_query, type=searchtype, limit=limit
- ):
- if "artists" in searchresult:
- result.artists += [
- await self._parse_artist(item)
- for item in searchresult["artists"]["items"]
- if (item and item["id"])
- ]
- if "albums" in searchresult:
- result.albums += [
- await self._parse_album(item)
- for item in searchresult["albums"]["items"]
- if (item and item["id"])
- ]
- if "tracks" in searchresult:
- result.tracks += [
- await self._parse_track(item)
- for item in searchresult["tracks"]["items"]
- if (item and item["id"])
- ]
- if "playlists" in searchresult:
- result.playlists += [
- await self._parse_playlist(item)
- for item in searchresult["playlists"]["items"]
- if (item and item["id"])
- ]
+ searchresult = await self._get_data("search", q=search_query, type=searchtype, limit=limit)
+ if "artists" in searchresult:
+ result.artists += [
+ await self._parse_artist(item)
+ for item in searchresult["artists"]["items"]
+ if (item and item["id"])
+ ]
+ if "albums" in searchresult:
+ result.albums += [
+ await self._parse_album(item)
+ for item in searchresult["albums"]["items"]
+ if (item and item["id"])
+ ]
+ if "tracks" in searchresult:
+ result.tracks += [
+ await self._parse_track(item)
+ for item in searchresult["tracks"]["items"]
+ if (item and item["id"])
+ ]
+ if "playlists" in searchresult:
+ result.playlists += [
+ await self._parse_playlist(item)
+ for item in searchresult["playlists"]["items"]
+ if (item and item["id"])
+ ]
return result
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
artist_obj = await self._get_data(f"artists/{prov_artist_id}")
- return await self._parse_artist(artist_obj) if artist_obj else None
+ return await self._parse_artist(artist_obj)
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
- if album_obj := await self._get_data(f"albums/{prov_album_id}"):
- return await self._parse_album(album_obj)
- msg = f"Item {prov_album_id} not found"
- raise MediaNotFoundError(msg)
+ album_obj = await self._get_data(f"albums/{prov_album_id}")
+ return await self._parse_album(album_obj)
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
- if track_obj := await self._get_data(f"tracks/{prov_track_id}"):
- return await self._parse_track(track_obj)
- msg = f"Item {prov_track_id} not found"
- raise MediaNotFoundError(msg)
+ track_obj = await self._get_data(f"tracks/{prov_track_id}")
+ return await self._parse_track(track_obj)
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
- if playlist_obj := await self._get_data(f"playlists/{prov_playlist_id}"):
- return await self._parse_playlist(playlist_obj)
- msg = f"Item {prov_playlist_id} not found"
- raise MediaNotFoundError(msg)
+ playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}")
+ return await self._parse_playlist(playlist_obj)
async def get_album_tracks(self, prov_album_id) -> list[AlbumTrack]:
"""Get all album tracks for given album id."""
return [
await self._parse_track(item)
for item in await self._get_all_items(f"albums/{prov_album_id}/tracks")
- if (item and item["id"])
+ if item["id"]
]
async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[PlaylistTrack, None]:
break
return all_items
- async def _get_data(self, endpoint, **kwargs):
+ async def _get_data(self, endpoint, **kwargs) -> dict[str, Any]:
"""Get data from api."""
url = f"https://api.spotify.com/v1/{endpoint}"
kwargs["market"] = "from_token"
if tokeninfo is None:
tokeninfo = await self.login()
headers = {"Authorization": f'Bearer {tokeninfo["accessToken"]}'}
- async with self._throttler:
- time_start = time.time()
- result = None
- try:
- async with self.mass.http_session.get(
- url, headers=headers, params=kwargs, ssl=False, timeout=120
- ) as response:
- # handle spotify rate limiter
- if response.status == 429:
- backoff_time = int(response.headers["Retry-After"])
- self.logger.debug(
- "Waiting %s seconds on Spotify rate limiter", backoff_time
- )
- await asyncio.sleep(backoff_time)
- return await self._get_data(endpoint, **kwargs)
- # get text before json so we can log the body in case of errors
- result = await response.text()
- result = json_loads(result)
- if "error" in result or ("status" in result and "error" in result["status"]):
- self.logger.error("%s - %s", endpoint, result)
- return None
- except (
- aiohttp.ContentTypeError,
- JSONDecodeError,
- ):
- self.logger.error("Error while processing %s: %s", endpoint, result)
- return None
- self.logger.debug(
- "Processing GET/%s took %s seconds",
- endpoint,
- round(time.time() - time_start, 2),
- )
- return result
-
- async def _delete_data(self, endpoint, data=None, **kwargs):
+ async with self._throttler, self.mass.http_session.get(
+ url, headers=headers, params=kwargs, ssl=True, timeout=120
+ ) as response:
+ # handle spotify rate limiter
+ if response.status == 429:
+ backoff_time = int(response.headers["Retry-After"])
+ self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
+ await asyncio.sleep(backoff_time)
+ return await self._get_data(endpoint, **kwargs)
+ # handle temporary server error
+ if response.status == 503:
+ self.logger.debug(
+ "Request to %s failed with 503 error, retrying in 30 seconds...",
+ endpoint,
+ )
+ await asyncio.sleep(30)
+ return await self._get_data(endpoint, **kwargs)
+ # handle 404 not found, convert to MediaNotFoundError
+ if response.status == 404:
+ raise MediaNotFoundError(f"{endpoint} not found")
+ response.raise_for_status()
+ return await response.json(loads=json_loads)
+
+ async def _delete_data(self, endpoint, data=None, **kwargs) -> str:
"""Delete data from api."""
url = f"https://api.spotify.com/v1/{endpoint}"
token = await self.login()
- if not token:
- return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.delete(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
+ # handle spotify rate limiter
+ if response.status == 429:
+ backoff_time = int(response.headers["Retry-After"])
+ self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
+ await asyncio.sleep(backoff_time)
+ return await self._delete_data(endpoint, data=data, **kwargs)
+ response.raise_for_status()
return await response.text()
- async def _put_data(self, endpoint, data=None, **kwargs):
+ async def _put_data(self, endpoint, data=None, **kwargs) -> str:
"""Put data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
token = await self.login()
- if not token:
- return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.put(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
+ # handle spotify rate limiter
+ if response.status == 429:
+ backoff_time = int(response.headers["Retry-After"])
+ self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
+ await asyncio.sleep(backoff_time)
+ return await self._put_data(endpoint, data=data, **kwargs)
+ response.raise_for_status()
return await response.text()
- async def _post_data(self, endpoint, data=None, **kwargs):
+ async def _post_data(self, endpoint, data=None, **kwargs) -> str:
"""Post data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
token = await self.login()
- if not token:
- return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.post(
url, headers=headers, params=kwargs, json=data, ssl=False
) as response:
+ # handle spotify rate limiter
+ if response.status == 429:
+ backoff_time = int(response.headers["Retry-After"])
+ self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
+ await asyncio.sleep(backoff_time)
+ return await self._post_data(endpoint, data=data, **kwargs)
+ response.raise_for_status()
return await response.text()
async def get_librespot_binary(self):