from plexapi.media import AudioStream as PlexAudioStream
from plexapi.media import Media as PlexMedia
from plexapi.media import MediaPart as PlexMediaPart
-from plexapi.myplex import MyPlexAccount, MyPlexPinLogin
+from plexapi.myplex import MyPlexPinLogin
from plexapi.server import PlexServer
from music_assistant.common.models.config_entries import (
# use the same value for both the value and the title
# until we find out what plex uses as stable identifiers
ConfigValueOption(
- title=x,
- value=x,
+ title=key,
+ value=f"{value} / {key}",
)
- for x in libraries
+ for key, value in libraries.items()
)
# select first library as (default) value
- conf_libraries.default_value = libraries[0]
- conf_libraries.value = libraries[0]
+
+ conf_libraries.default_value = conf_libraries.options[0]
+ conf_libraries.value = conf_libraries.options[0]
# return the collected config entries
return (
ConfigEntry(
"""Set up the music provider by connecting to the server."""
# silence urllib logger
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
- server_name, library_name = self.config.get_value(CONF_LIBRARY_ID).split(" / ", 1)
+ base_url, server_name, library_name = self.config.get_value(CONF_LIBRARY_ID).split(" / ")
def connect() -> PlexServer:
try:
- plex_account = MyPlexAccount(token=self.config.get_value(CONF_AUTH_TOKEN))
+ plex_server = PlexServer(
+ baseurl=base_url, token=self.config.get_value(CONF_AUTH_TOKEN)
+ )
except plexapi.exceptions.BadRequest as err:
if "Invalid token" in str(err):
- # token invalid, invaidate the config
+ # token invalid, invalidate the config
self.mass.config.remove_provider_config_value(self.instance_id, CONF_AUTH_TOKEN)
raise LoginFailed("Authentication failed")
raise LoginFailed() from err
- return plex_account.resource(server_name).connect(None, 10)
+ return plex_server
self._plex_server = await self._run_async(connect)
self._plex_library = await self._run_async(self._plex_server.library.section, library_name)
"""Get album tracks for given album id."""
plex_album: PlexAlbum = await self._get_data(prov_album_id, PlexAlbum)
tracks = []
- for plex_track in await self._run_async(plex_album.tracks):
+ for idx, plex_track in enumerate(await self._run_async(plex_album.tracks), 1):
track = await self._parse_track(
plex_track,
- {"disc_number": plex_track.parentIndex, "track_number": plex_track.trackNumber},
+ {
+ "disc_number": plex_track.parentIndex,
+ "track_number": plex_track.trackNumber or idx,
+ },
)
tracks.append(track)
return tracks
from music_assistant.server import MusicAssistant
-async def get_libraries(mass: MusicAssistant, auth_token: str) -> set[str]:
+async def get_libraries(mass: MusicAssistant, auth_token: str) -> dict[str, PlexServer]:
"""
Get all music libraries for all plex servers.
- Returns a set of Library names in format 'servername / library name'
+ Returns a dict of Library names in format {'servername / library name':'baseurl'}
"""
cache_key = "plex_libraries"
def _get_libraries():
# create a listing of available music libraries on all servers
- all_libraries: list[str] = []
+ all_libraries: dict[str, PlexServer] = {}
plex_account = MyPlexAccount(token=auth_token)
+ connected_servers = []
for resource in plex_account.resources():
if "server" not in resource.provides:
continue
try:
plex_server: PlexServer = resource.connect(None, 10)
+ connected_servers.append(plex_server)
except plexapi.exceptions.NotFound:
continue
for media_section in plex_server.library.sections():
if media_section.type != PlexMusicSection.TYPE:
continue
# TODO: figure out what plex uses as stable id and use that instead of names
- all_libraries.append(f"{resource.name} / {media_section.title}")
+ all_libraries[f"{resource.name} / {media_section.title}"] = plex_server._baseurl
return all_libraries
if cache := await mass.cache.get(cache_key, checksum=auth_token):