from libopensonic.media import AlbumID3 as SonicAlbum
from libopensonic.media import ArtistID3 as SonicArtist
from libopensonic.media import Bookmark as SonicBookmark
- from libopensonic.media import Child as SonicSong
+ from libopensonic.media import Child as SonicItem
from libopensonic.media import OpenSubsonicExtension, PodcastChannel
from libopensonic.media import Playlist as SonicPlaylist
from libopensonic.media import PodcastEpisode as SonicEpisode
) -> RetType:
return await asyncio.to_thread(call, *args, **kwargs)
+ def _set_loudness(self, item: SonicItem) -> None:
+ if item.replay_gain and item.replay_gain.track_gain:
+ self.mass.create_task(
+ self.mass.music.set_loudness(
+ item.id,
+ self.instance_id,
+ item.replay_gain.track_gain,
+ item.replay_gain.album_gain,
+ )
+ )
+
async def resolve_image(self, path: str) -> bytes | Any:
"""Return the image."""
self.logger.debug("Requesting cover art for '%s'", path)
al = []
if answer.song:
- tr = [parse_track(self.logger, self.instance_id, entry) for entry in answer.song]
+ tr = []
+ for entry in answer.song:
+ self._set_loudness(entry)
+ tr.append(parse_track(self.logger, self.instance_id, entry))
else:
tr = []
aid = entry.album_id if entry.album_id else entry.parent
if aid is not None and (album is None or album.item_id != aid):
album = await self.get_album(prov_album_id=aid)
+ self._set_loudness(entry)
yield parse_track(self.logger, self.instance_id, entry, album=album)
offset += count
results = await self._run_async(
tracks = []
if sonic_album.song:
for sonic_song in sonic_album.song:
+ self._set_loudness(sonic_song)
tracks.append(parse_track(self.logger, self.instance_id, sonic_song))
return tracks
async def get_track(self, prov_track_id: str) -> Track:
"""Return the specified track."""
try:
- sonic_song: SonicSong = await self._run_async(self.conn.get_song, prov_track_id)
+ sonic_song: SonicItem = await self._run_async(self.conn.get_song, prov_track_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg) from e
self.logger.warning("Unable to find album id for track %s", sonic_song.id)
else:
album = await self.get_album(prov_album_id=aid)
+ self._set_loudness(sonic_song)
return parse_track(self.logger, self.instance_id, sonic_song, album=album)
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
return
for episode in channel.episode:
+ self._set_loudness(episode)
yield parse_epsiode(self.instance_id, episode, channel)
async def get_podcast(self, prov_podcast_id: str) -> Podcast:
self.logger.warning("Unable to find album for track %s", sonic_song.id)
if aid is not None and (not album or album.item_id != aid):
album = await self.get_album(prov_album_id=aid)
+ self._set_loudness(sonic_song)
track = parse_track(self.logger, self.instance_id, sonic_song, album=album)
track.position = index
result.append(track)
except DataNotFoundError as e:
msg = f"Artist {prov_artist_id} not found"
raise MediaNotFoundError(msg) from e
- songs: list[SonicSong] = await self._run_async(self.conn.get_top_songs, sonic_artist.name)
- return [parse_track(self.logger, self.instance_id, entry) for entry in songs]
+ songs: list[SonicItem] = await self._run_async(self.conn.get_top_songs, sonic_artist.name)
+ tracks = []
+ for entry in songs:
+ self._set_loudness(entry)
+ tracks.append(parse_track(self.logger, self.instance_id, entry))
+ return tracks
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
"""Get tracks similar to selected track."""
try:
- songs: list[SonicSong] = await self._run_async(
+ songs: list[SonicItem] = await self._run_async(
self.conn.get_similar_songs, iid=prov_track_id, count=limit
)
except DataNotFoundError as e:
# exception means we didn't find anything similar.
self.logger.info(e)
return []
- return [parse_track(self.logger, self.instance_id, entry) for entry in songs]
+ tracks = []
+ for entry in songs:
+ self._set_loudness(entry)
+ tracks.append(parse_track(self.logger, self.instance_id, entry))
+ return tracks
async def create_playlist(self, name: str) -> Playlist:
"""Create a new empty playlist on the server."""
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get the details needed to process a specified track."""
- item: SonicSong | SonicEpisode
+ item: SonicItem | SonicEpisode
if media_type == MediaType.TRACK:
try:
item = await self._run_async(self.conn.get_song, item_id)
)
for ep in sonic_episodes:
if channel_info := await self._get_podcast_channel_async(ep.channel_id):
+ self._set_loudness(ep)
podcasts.items.append(parse_epsiode(self.instance_id, ep, channel_info))
return podcasts
faves.items.append(parse_artist(self.instance_id, sonic_artist))
if starred.song:
for sonic_song in starred.song[: self._reco_limit]:
+ self._set_loudness(sonic_song)
faves.items.append(parse_track(self.logger, self.instance_id, sonic_song))
return faves