from __future__ import annotations
-import asyncio
-import threading
from asyncio import TaskGroup
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar
-from libopensonic.connection import Connection as SonicConnection
+from libopensonic import AsyncConnection as SonicConnection
from libopensonic.errors import (
AuthError,
CredentialError,
)
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator, Callable
+ from collections.abc import AsyncGenerator
from libopensonic.media import AlbumID3 as SonicAlbum
from libopensonic.media import ArtistID3 as SonicArtist
app_name="Music Assistant",
)
try:
- success = await self._run_async(self.conn.ping)
+ success = await self.conn.ping()
if not success:
raise CredentialError
except (AuthError, CredentialError) as e:
self._enable_podcasts = bool(self.config.get_value(CONF_ENABLE_PODCASTS))
self._ignore_offset = bool(self.config.get_value(CONF_OVERRIDE_OFFSET))
try:
- extensions: list[OpenSubsonicExtension] = await self._run_async(
- self.conn.get_open_subsonic_extensions
- )
+ extensions: list[OpenSubsonicExtension] = await self.conn.get_open_subsonic_extensions()
for entry in extensions:
if entry.name == "transcodeOffset" and not self._ignore_offset:
self._seek_support = True
async def _get_podcast_episode(self, eid: str) -> SonicEpisode:
chan_id, ep_id = eid.split(EP_CHAN_SEP)
- chan = await self._run_async(self.conn.get_podcasts, inc_episodes=True, pid=chan_id)
+ chan = await self.conn.get_podcasts(inc_episodes=True, pid=chan_id)
if not chan[0].episode:
raise MediaNotFoundError(f"Missing episode list for podcast channel '{chan[0].id}'")
msg = f"Can't find episode {ep_id} in podcast {chan_id}"
raise MediaNotFoundError(msg)
- async def _run_async(
- self, call: Callable[Param, RetType], *args: Param.args, **kwargs: Param.kwargs
- ) -> RetType:
- return await asyncio.to_thread(call, *args, **kwargs)
-
def _set_loudness(self, item: SonicItem) -> None:
if item.replay_gain and item.replay_gain.track_gain is not None:
# Convert ReplayGain values (gain in dB) to integrated loudness (LUFS)
"""Return the image."""
self.logger.debug("Requesting cover art for '%s'", path)
- def _get_cover_art() -> bytes | Any:
- try:
- with self.conn.get_cover_art(path) as art:
- return art.content
- except DataNotFoundError:
- self.logger.warning("Unable to locate a cover image for %s", path)
- return None
-
- return await asyncio.to_thread(_get_cover_art)
+ try:
+ art = await self.conn.get_cover_art(path)
+ return await art.content.read()
+ except DataNotFoundError:
+ self.logger.warning("Unable to locate a cover image for %s", path)
+ return None
async def search(
self, search_query: str, media_types: list[MediaType], limit: int = 20
songs = limit if MediaType.TRACK in media_types else 0
if not (artists or albums or songs):
return SearchResults()
- answer = await self._run_async(
- self.conn.search3,
+ answer = await self.conn.search3(
query=search_query,
artist_count=artists,
artist_offset=0,
track_ids.append(prov_item_id)
if favorite:
- await self._run_async(
- self.conn.star, sids=track_ids, album_ids=album_ids, artist_ids=artist_ids
- )
+ await self.conn.star(sids=track_ids, album_ids=album_ids, artist_ids=artist_ids)
else:
- await self._run_async(
- self.conn.unstar, sids=track_ids, album_ids=album_ids, artist_ids=artist_ids
- )
+ await self.conn.unstar(sids=track_ids, album_ids=album_ids, artist_ids=artist_ids)
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Provide a generator for reading all artists."""
- artists = await self._run_async(self.conn.get_artists)
+ artists = await self.conn.get_artists()
if not artists.index:
return
"""
offset = 0
size = self._pagination_size
- albums = await self._run_async(
- self.conn.get_album_list2,
+ albums = await self.conn.get_album_list2(
ltype="alphabeticalByArtist",
size=size,
offset=offset,
for album in albums:
yield parse_album(self.logger, self.instance_id, album)
offset += size
- albums = await self._run_async(
- self.conn.get_album_list2,
+ albums = await self.conn.get_album_list2(
ltype="alphabeticalByArtist",
size=size,
offset=offset,
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Provide a generator for library playlists."""
- results = await self._run_async(self.conn.get_playlists)
+ results = await self.conn.get_playlists()
for entry in results:
yield parse_playlist(self.instance_id, entry)
offset = 0
count = self._pagination_size
try:
- results = await self._run_async(
- self.conn.search3,
+ results = await self.conn.search3(
query=query,
artist_count=0,
album_count=0,
except ParameterError:
# Older Navidrome does not accept an empty string and requires the empty quotes
query = '""'
- results = await self._run_async(
- self.conn.search3,
+ results = await self.conn.search3(
query=query,
artist_count=0,
album_count=0,
self._set_loudness(entry)
yield parse_track(self.logger, self.instance_id, entry, album=album)
offset += count
- results = await self._run_async(
- self.conn.search3,
+ results = await self.conn.search3(
query=query,
artist_count=0,
album_count=0,
async def get_album(self, prov_album_id: str) -> Album:
"""Return the requested Album."""
try:
- sonic_album: SonicAlbum = await self._run_async(self.conn.get_album, prov_album_id)
- sonic_info = await self._run_async(self.conn.get_album_info2, aid=prov_album_id)
+ sonic_album: SonicAlbum = await self.conn.get_album(prov_album_id)
+ sonic_info = await self.conn.get_album_info2(aid=prov_album_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Album {prov_album_id} not found"
raise MediaNotFoundError(msg) from e
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Return a list of tracks on the specified Album."""
try:
- sonic_album: SonicAlbum = await self._run_async(self.conn.get_album, prov_album_id)
+ sonic_album: SonicAlbum = await self.conn.get_album(prov_album_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Album {prov_album_id} not found"
raise MediaNotFoundError(msg) from e
)
try:
- sonic_artist: SonicArtist = await self._run_async(
- self.conn.get_artist, artist_id=prov_artist_id
- )
- sonic_info = await self._run_async(self.conn.get_artist_info2, aid=prov_artist_id)
+ sonic_artist: SonicArtist = await self.conn.get_artist(artist_id=prov_artist_id)
+ sonic_info = await self.conn.get_artist_info2(aid=prov_artist_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Artist {prov_artist_id} not found"
raise MediaNotFoundError(msg) from e
async def get_track(self, prov_track_id: str) -> Track:
"""Return the specified track."""
try:
- sonic_song: SonicItem = await self._run_async(self.conn.get_song, prov_track_id)
+ sonic_song: SonicItem = await self.conn.get_song(prov_track_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg) from e
return []
try:
- sonic_artist: SonicArtist = await self._run_async(self.conn.get_artist, prov_artist_id)
+ sonic_artist: SonicArtist = await self.conn.get_artist(prov_artist_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Album {prov_artist_id} not found"
raise MediaNotFoundError(msg) from e
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Return the specified Playlist."""
try:
- sonic_playlist: SonicPlaylist = await self._run_async(
- self.conn.get_playlist, prov_playlist_id
- )
+ sonic_playlist: SonicPlaylist = await self.conn.get_playlist(prov_playlist_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from e
"""Get all Episodes for given podcast id."""
if not self._enable_podcasts:
return
- channels = await self._run_async(
- self.conn.get_podcasts, inc_episodes=True, pid=prov_podcast_id
- )
+ channels = await self.conn.get_podcasts(inc_episodes=True, pid=prov_podcast_id)
channel = channels[0]
if not channel.episode:
return
msg = "Podcasts are currently disabled in the provider configuration"
raise ActionUnavailable(msg)
- channels = await self._run_async(
- self.conn.get_podcasts, inc_episodes=True, pid=prov_podcast_id
- )
+ channels = await self.conn.get_podcasts(inc_episodes=True, pid=prov_podcast_id)
return parse_podcast(self.instance_id, channels[0])
async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
"""Retrieve library/subscribed podcasts from the provider."""
if self._enable_podcasts:
- channels = await self._run_async(self.conn.get_podcasts, inc_episodes=True)
+ channels = await self.conn.get_podcasts(inc_episodes=True)
for channel in channels:
yield parse_podcast(self.instance_id, channel)
# paging not supported, we always return the whole list at once
return result
try:
- sonic_playlist: SonicPlaylist = await self._run_async(
- self.conn.get_playlist, prov_playlist_id
- )
+ sonic_playlist: SonicPlaylist = await self.conn.get_playlist(prov_playlist_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from e
return []
try:
- sonic_artist: SonicArtist = await self._run_async(self.conn.get_artist, prov_artist_id)
+ sonic_artist: SonicArtist = await self.conn.get_artist(prov_artist_id)
except DataNotFoundError as e:
msg = f"Artist {prov_artist_id} not found"
raise MediaNotFoundError(msg) from e
- songs: list[SonicItem] = await self._run_async(self.conn.get_top_songs, sonic_artist.name)
+ songs: list[SonicItem] = await self.conn.get_top_songs(sonic_artist.name)
tracks = []
for entry in songs:
self._set_loudness(entry)
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
"""Get tracks similar to selected track."""
try:
- songs: list[SonicItem] = await self._run_async(
- self.conn.get_similar_songs, iid=prov_track_id, count=limit
+ songs: list[SonicItem] = await self.conn.get_similar_songs(
+ iid=prov_track_id, count=limit
)
except DataNotFoundError as e:
# Subsonic returns an error here instead of an empty list, I don't think this
async def create_playlist(self, name: str) -> Playlist:
"""Create a new empty playlist on the server."""
- if not await self._run_async(self.conn.create_playlist, name=name):
+ if not await self.conn.create_playlist(name=name):
raise ProviderPermissionDenied(
"Please ensure you have permission to create playlists on your server"
)
- pls: list[SonicPlaylist] = await self._run_async(self.conn.get_playlists)
+ pls: list[SonicPlaylist] = await self.conn.get_playlists()
for pl in pls:
if pl.name == name:
return parse_playlist(self.instance_id, pl)
Note that the configured user must own the playlist to edit this way.
"""
try:
- await self._run_async(
- self.conn.update_playlist,
+ await self.conn.update_playlist(
lid=prov_playlist_id,
song_ids_to_add=prov_track_ids,
)
"""Remove selected positions from the playlist."""
idx_to_remove = [pos - 1 for pos in positions_to_remove]
try:
- await self._run_async(
- self.conn.update_playlist,
+ await self.conn.update_playlist(
lid=prov_playlist_id,
song_indices_to_remove=idx_to_remove,
)
item: SonicItem | SonicEpisode
if media_type == MediaType.TRACK:
try:
- item = await self._run_async(self.conn.get_song, item_id)
+ item = await self.conn.get_song(item_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Item {item_id} not found"
raise MediaNotFoundError(msg) from e
if fully_played:
# We completed the episode and should delete our bookmark
try:
- await self._run_async(self.conn.delete_bookmark, mid=ep_id)
+ await self.conn.delete_bookmark(mid=ep_id)
except DataNotFoundError:
# We probably raced with something else deleting this bookmark, not really a problem
self.logger.info("Bookmark for item '%s' has already been deleted.", ep_id)
# Otherwise, create a new bookmark for this item or update the existing one
# MA provides a position in seconds but expects it back in milliseconds
- await self._run_async(
- self.conn.create_bookmark,
+ await self.conn.create_bookmark(
mid=ep_id,
position=position * 1000,
comment="Music Assistant Bookmark",
_, ep_id = item_id.split(EP_CHAN_SEP)
- bookmarks: list[SonicBookmark] = await self._run_async(self.conn.get_bookmarks)
+ bookmarks: list[SonicBookmark] = await self.conn.get_bookmarks()
for mark in bookmarks:
if mark.entry.id == ep_id:
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
"""Provide a generator for the stream data."""
- audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
# ignore seek position if the server does not support it
# in that case we let the core handle seeking
if not self._seek_support:
seek_position = 0
self.logger.debug("Streaming %s", streamdetails.item_id)
- cancelled = threading.Event()
-
- def _streamer() -> None:
- self.logger.debug("starting stream of item '%s'", streamdetails.item_id)
- try:
- with self.conn.stream(
- streamdetails.item_id,
- time_offset=seek_position,
- estimate_length=True,
- ) as stream:
- for chunk in stream.iter_content(chunk_size=40960):
- # Use put_nowait to avoid blocking and potential duplicate chunks
- # that can occur when using put() with timeouts
- while True:
- if cancelled.is_set():
- self.logger.debug(
- "Stream cancelled for item '%s'", streamdetails.item_id
- )
- return
- try:
- audio_buffer.put_nowait(chunk)
- break # Successfully put chunk, move to next
- except asyncio.QueueFull:
- # Queue is full, wait a bit and check for cancellation
- cancelled.wait(timeout=0.1)
- # send empty chunk when we're done
- if not cancelled.is_set():
- # For EOF, we can wait a bit longer since it's the final message
- for _ in range(50): # Try for up to 5 seconds
- try:
- audio_buffer.put_nowait(b"EOF")
- break
- except asyncio.QueueFull:
- if cancelled.is_set():
- break
- cancelled.wait(timeout=0.1)
- else:
- self.logger.debug(
- "Timeout sending EOF for item '%s'", streamdetails.item_id
- )
- except DataNotFoundError as err:
- msg = f"Item '{streamdetails.item_id}' not found"
- raise MediaNotFoundError(msg) from err
-
- # fire up an executor thread to put the audio chunks (threadsafe) on the audio buffer
- streamer_task = self.mass.loop.run_in_executor(None, _streamer)
try:
- while True:
- # keep reading from the audio buffer until there is no more data
- chunk = await audio_buffer.get()
- if chunk == b"EOF":
- break
- yield chunk
- finally:
- # Signal the streamer thread to stop
- cancelled.set()
- if not streamer_task.done():
- streamer_task.cancel()
+ resp = await self.conn.stream(
+ streamdetails.item_id, time_offset=seek_position, estimate_length=True
+ )
+ except DataNotFoundError as err:
+ msg = f"Item '{streamdetails.item_id}' not found"
+ raise MediaNotFoundError(msg) from err
+ self.logger.debug("starting stream of item '%s'", streamdetails.item_id)
+ async with resp:
+ async for chunk in resp.content.iter_chunked(40960):
+ yield bytes(chunk)
self.logger.debug("Done streaming %s", streamdetails.item_id)
category=CACHE_CATEGORY_PODCAST_CHANNEL,
):
return cache
- if channels := await self._run_async(
- self.conn.get_podcasts, inc_episodes=True, pid=chan_id
- ):
+ if channels := await self.conn.get_podcasts(inc_episodes=True, pid=chan_id):
channel = channels[0]
await self.mass.cache.set(
key=chan_id,
provider=self.domain,
name="Newest Podcast Episodes",
)
- sonic_episodes = await self._run_async(
- self.conn.get_newest_podcasts, count=self._reco_limit
- )
+ sonic_episodes = await self.conn.get_newest_podcasts(count=self._reco_limit)
for ep in sonic_episodes:
if channel_info := await self._get_podcast_channel_async(ep.channel_id):
self._set_loudness(ep)
faves: RecommendationFolder = RecommendationFolder(
item_id="subsonic_starred_albums", provider=self.domain, name="Starred Items"
)
- starred = await self._run_async(self.conn.get_starred2)
+ starred = await self.conn.get_starred2()
if starred.album:
for sonic_album in starred.album[: self._reco_limit]:
faves.items.append(parse_album(self.logger, self.instance_id, sonic_album))
new_stuff: RecommendationFolder = RecommendationFolder(
item_id="subsonic_new_albums", provider=self.domain, name="New Albums"
)
- new_albums = await self._run_async(
- self.conn.get_album_list2, ltype="newest", size=self._reco_limit
- )
+ new_albums = await self.conn.get_album_list2(ltype="newest", size=self._reco_limit)
for sonic_album in new_albums:
new_stuff.items.append(parse_album(self.logger, self.instance_id, sonic_album))
return new_stuff
recent: RecommendationFolder = RecommendationFolder(
item_id="subsonic_most_played", provider=self.domain, name="Most Played Albums"
)
- albums = await self._run_async(
- self.conn.get_album_list2, ltype="frequent", size=self._reco_limit
- )
+ albums = await self.conn.get_album_list2(ltype="frequent", size=self._reco_limit)
for sonic_album in albums:
recent.items.append(parse_album(self.logger, self.instance_id, sonic_album))
return recent