from libopensonic.media import AlbumID3 as SonicAlbum
from libopensonic.media import AlbumInfo as SonicAlbumInfo
from libopensonic.media import ArtistID3 as SonicArtist
- from libopensonic.media import ArtistInfo as SonicArtistInfo
+ from libopensonic.media import ArtistInfo2 as SonicArtistInfo
from libopensonic.media import Child as SonicSong
from libopensonic.media import Playlist as SonicPlaylist
from libopensonic.media import PodcastChannel as SonicPodcast
provider=instance_id,
name=sonic_song.title,
album=album,
- duration=sonic_song.duration if sonic_song.duration is not None else 0,
+ duration=sonic_song.duration or 0,
disc_number=sonic_song.disc_number or 0,
favorite=bool(sonic_song.starred),
metadata=metadata,
provider_instance=instance_id,
available=True,
audio_format=AudioFormat(
- content_type=ContentType.try_parse(sonic_song.content_type),
+ content_type=ContentType.try_parse(sonic_song.content_type or "?"),
sample_rate=sonic_song.sampling_rate if sonic_song.sampling_rate else 44100,
bit_depth=sonic_song.bit_depth if sonic_song.bit_depth else 16,
channels=sonic_song.channel_count if sonic_song.channel_count else 2,
def parse_artist(
- instance_id: str, sonic_artist: SonicArtist, sonic_info: SonicArtistInfo = None
+ instance_id: str, sonic_artist: SonicArtist, sonic_info: SonicArtistInfo | None = None
) -> Artist:
"""Parse artist and artistInfo into a Music Assistant Artist."""
metadata: MediaItemMetadata = MediaItemMetadata()
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Callable
- from libopensonic.media import Album as SonicAlbum
- from libopensonic.media import Artist as SonicArtist
+ from libopensonic.media import AlbumID3 as SonicAlbum
+ from libopensonic.media import ArtistID3 as SonicArtist
from libopensonic.media import Bookmark as SonicBookmark
from libopensonic.media import Child as SonicSong
from libopensonic.media import OpenSubsonicExtension
class OpenSonicProvider(MusicProvider):
"""Provider for Open Subsonic servers."""
- conn: SonicConnection = None
+ conn: SonicConnection
_enable_podcasts: bool = True
_seek_support: bool = False
_ignore_offset: bool = False
album_offset=0,
song_count=songs,
song_offset=0,
- music_folder_id=None,
- )
- return SearchResults(
- artists=[parse_artist(self.instance_id, entry) for entry in answer.artist]
- if answer.artist
- else [],
- albums=[parse_album(self.logger, self.instance_id, entry) for entry in answer.album]
- if answer.album
- else [],
- tracks=[parse_track(self.logger, self.instance_id, entry) for entry in answer.song]
- if answer.song
- else [],
)
+ if answer.artist:
+ ar = [parse_artist(self.instance_id, entry) for entry in answer.artist]
+ else:
+ ar = []
+
+ if answer.album:
+ al = [parse_album(self.logger, self.instance_id, entry) for entry in answer.album]
+ else:
+ al = []
+
+ if answer.song:
+ tr = [parse_track(self.logger, self.instance_id, entry) for entry in answer.song]
+ else:
+ tr = []
+
+ return SearchResults(artists=ar, albums=al, tracks=tr)
+
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Provide a generator for reading all artists."""
artists = await self._run_async(self.conn.get_artists)
async def create_playlist(self, name: str) -> Playlist:
"""Create a new empty playlist on the server."""
- playlist: SonicPlaylist = await self._run_async(self.conn.create_playlist, name=name)
- return parse_playlist(self.instance_id, playlist)
+ if not await self._run_async(self.conn.create_playlist, name=name):
+ raise ProviderPermissionDenied(
+ "Please ensure you have permission to create playlists on your server"
+ )
+ pls: list[SonicPlaylist] = await self._run_async(self.conn.get_playlists)
+ for pl in pls:
+ if pl.name == name:
+ return parse_playlist(self.instance_id, pl)
+ raise MediaNotFoundError(f"Failed to create podcast with name '{name}'")
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Append the listed tracks to the selected playlist.
raise UnsupportedFeaturedException(msg)
# For mp4 or m4a files, better to let ffmpeg detect the codec in use so mark them unknown
- if mime_type.endswith("mp4"):
+ if mime_type and mime_type.endswith("mp4"):
self.logger.warning(
"Due to the streaming method used by the subsonic API, M4A files "
"may fail. See provider documentation for more information."
if fully_played:
# We completed the episode and should delete our bookmark
try:
- await self._run_async(self.conn.delete_bookmark, id=ep_id)
+ await self._run_async(self.conn.delete_bookmark, mid=ep_id)
except DataNotFoundError:
# We probably raced with something else deleting this bookmark, not really a problem
return
# MA provides a position in seconds but expects it back in milliseconds, while
# the Open Subsonic spec expects a position in milliseconds but returns it in
# seconds, go figure.
- await self._run_async(self.conn.create_bookmark, id=ep_id, position=(position * 1000))
+ await self._run_async(self.conn.create_bookmark, mid=ep_id, position=position * 1000)
async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
"""