import asyncio
import logging
-from asyncio import TaskGroup
+from asyncio import Task, TaskGroup
+from collections.abc import Awaitable
from contextlib import suppress
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
import plexapi.exceptions
import requests
from plexapi.audio import Artist as PlexArtist
from plexapi.audio import Playlist as PlexPlaylist
from plexapi.audio import Track as PlexTrack
+from plexapi.base import PlexObject
from plexapi.myplex import MyPlexAccount, MyPlexPinLogin
from plexapi.server import PlexServer
ProviderMapping,
SearchResults,
Track,
+ UniqueList,
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import UNKNOWN_ARTIST
if action == CONF_ACTION_GDM:
server_details = await discover_local_servers()
if server_details and server_details[0] and server_details[1]:
+ assert values
values[CONF_LOCAL_SERVER_IP] = server_details[0]
values[CONF_LOCAL_SERVER_PORT] = server_details[1]
values[CONF_LOCAL_SERVER_SSL] = False
values[CONF_LOCAL_SERVER_VERIFY_CERT] = False
else:
+ assert values
values[CONF_LOCAL_SERVER_IP] = "Discovery failed, please add IP manually"
values[CONF_LOCAL_SERVER_PORT] = 32400
values[CONF_LOCAL_SERVER_SSL] = False
# handle action clear authentication
if action == CONF_ACTION_CLEAR_AUTH:
+ assert values
values[CONF_AUTH_TOKEN] = None
values[CONF_LOCAL_SERVER_IP] = None
values[CONF_LOCAL_SERVER_PORT] = 32400
# handle action MyPlex auth
if action == CONF_ACTION_AUTH_MYPLEX:
+ assert values
values[CONF_AUTH_TOKEN] = None
- async with AuthenticationHelper(mass, values["session_id"]) as auth_helper:
+ async with AuthenticationHelper(mass, str(values["session_id"])) as auth_helper:
plex_auth = MyPlexPinLogin(headers={"X-Plex-Product": "Music Assistant"}, oauth=True)
auth_url = plex_auth.oauthUrl(auth_helper.callback_url)
await auth_helper.authenticate(auth_url)
# handle action Local auth (no MyPlex)
if action == CONF_ACTION_AUTH_LOCAL:
+ assert values
values[CONF_AUTH_TOKEN] = AUTH_TOKEN_UNAUTH
# collect all config entries to show
action_label="Select Plex Music Library",
)
if action in (CONF_ACTION_LIBRARY, CONF_ACTION_AUTH_MYPLEX, CONF_ACTION_AUTH_LOCAL):
- token = mass.config.decrypt_string(values.get(CONF_AUTH_TOKEN))
- server_http_ip = values.get(CONF_LOCAL_SERVER_IP)
- server_http_port = values.get(CONF_LOCAL_SERVER_PORT)
- server_http_ssl = values.get(CONF_LOCAL_SERVER_SSL)
- server_http_verify_cert = values.get(CONF_LOCAL_SERVER_VERIFY_CERT)
+ token = mass.config.decrypt_string(str(values.get(CONF_AUTH_TOKEN)))
+ server_http_ip = str(values.get(CONF_LOCAL_SERVER_IP))
+ server_http_port = str(values.get(CONF_LOCAL_SERVER_PORT))
+ server_http_ssl = bool(values.get(CONF_LOCAL_SERVER_SSL))
+ server_http_verify_cert = bool(values.get(CONF_LOCAL_SERVER_VERIFY_CERT))
if not (
libraries := await get_libraries(
mass,
return tuple(entries)
+Param = ParamSpec("Param")
+RetType = TypeVar("RetType")
+PlexObjectT = TypeVar("PlexObjectT", bound=PlexObject)
+MediaItemT = TypeVar("MediaItemT", bound=MediaItem)
+
+
class PlexProvider(MusicProvider):
"""Provider for a plex music library."""
_plex_server: PlexServer = None
_plex_library: PlexMusicSection = None
_myplex_account: MyPlexAccount = None
- _baseurl: str = None
+ _baseurl: str
async def handle_async_init(self) -> None:
"""Set up the music provider by connecting to the server."""
# silence loggers
logging.getLogger("plexapi").setLevel(self.logger.level + 10)
- _, library_name = self.config.get_value(CONF_LIBRARY_ID).split(" / ", 1)
+ _, library_name = str(self.config.get_value(CONF_LIBRARY_ID)).split(" / ", 1)
def connect() -> PlexServer:
try:
except plexapi.exceptions.BadRequest as err:
if "Invalid token" in str(err):
# token invalid, invalidate the config
- self.mass.config.remove_provider_config_value(self.instance_id, CONF_AUTH_TOKEN)
+ self.mass.call_later(
+ 0,
+ self.mass.config.remove_provider_config_value(
+ self.instance_id, CONF_AUTH_TOKEN
+ ),
+ )
msg = "Authentication failed"
raise LoginFailed(msg)
raise LoginFailed from err
return plex_server
self._myplex_account = await self.get_myplex_account_and_refresh_token(
- self.config.get_value(CONF_AUTH_TOKEN)
+ str(self.config.get_value(CONF_AUTH_TOKEN))
)
try:
self._plex_server = await self._run_async(connect)
async def resolve_image(self, path: str) -> str | bytes:
"""Return the full image URL including the auth token."""
- return self._plex_server.url(path, True)
+ return str(self._plex_server.url(path, True))
- async def _run_async(self, call: Callable, *args, **kwargs):
- await self.get_myplex_account_and_refresh_token(self.config.get_value(CONF_AUTH_TOKEN))
+ async def _run_async(
+ self, call: Callable[Param, RetType], *args: Param.args, **kwargs: Param.kwargs
+ ) -> RetType:
+ await self.get_myplex_account_and_refresh_token(str(self.config.get_value(CONF_AUTH_TOKEN)))
return await asyncio.to_thread(call, *args, **kwargs)
- async def _get_data(self, key, cls=None):
- return await self._run_async(self._plex_library.fetchItem, key, cls)
+ async def _get_data(self, key: str, cls: type[PlexObjectT]) -> PlexObjectT:
+ results = await self._run_async(self._plex_library.fetchItem, key, cls)
+ return cast(PlexObjectT, results)
def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
name, version = parse_title_and_version(name)
version=version,
)
- async def _get_or_create_artist_by_name(self, artist_name) -> Artist:
+ async def _get_or_create_artist_by_name(self, artist_name: str) -> Artist | ItemMapping:
subquery = (
"WHERE provider_mappings.media_type = 'artist' "
"AND provider_mappings.provider_instance = :provider_instance"
},
)
- async def _parse(self, plex_media) -> MediaItem | None:
+ async def _parse(self, plex_media: PlexObject) -> MediaItem | None:
if plex_media.type == "artist":
return await self._parse_artist(plex_media)
elif plex_media.type == "album":
return await self._parse_playlist(plex_media)
return None
- async def _search_track(self, search_query, limit) -> list[PlexTrack]:
- return await self._run_async(
- self._plex_library.searchTracks, title=search_query, limit=limit
+ async def _search_track(self, search_query: str | None, limit: int) -> list[PlexTrack]:
+ return cast(
+ list[PlexTrack],
+ await self._run_async(self._plex_library.searchTracks, title=search_query, limit=limit),
)
- async def _search_album(self, search_query, limit) -> list[PlexAlbum]:
- return await self._run_async(
- self._plex_library.searchAlbums, title=search_query, limit=limit
+ async def _search_album(self, search_query: str, limit: int) -> list[PlexAlbum]:
+ return cast(
+ list[PlexAlbum],
+ await self._run_async(self._plex_library.searchAlbums, title=search_query, limit=limit),
)
- async def _search_artist(self, search_query, limit) -> list[PlexArtist]:
- return await self._run_async(
- self._plex_library.searchArtists, title=search_query, limit=limit
+ async def _search_artist(self, search_query: str, limit: int) -> list[PlexArtist]:
+ return cast(
+ list[PlexArtist],
+ await self._run_async(
+ self._plex_library.searchArtists, title=search_query, limit=limit
+ ),
)
- async def _search_playlist(self, search_query, limit) -> list[PlexPlaylist]:
- return await self._run_async(self._plex_library.playlists, title=search_query, limit=limit)
+ async def _search_playlist(self, search_query: str, limit: int) -> list[PlexPlaylist]:
+ return cast(
+ list[PlexPlaylist],
+ await self._run_async(self._plex_library.playlists, title=search_query, limit=limit),
+ )
- async def _search_track_advanced(self, limit, **kwargs) -> list[PlexTrack]:
- return await self._run_async(self._plex_library.searchTracks, filters=kwargs, limit=limit)
+ async def _search_track_advanced(self, limit: int, **kwargs: Any) -> list[PlexTrack]:
+ return cast(
+ list[PlexPlaylist],
+ await self._run_async(self._plex_library.searchTracks, filters=kwargs, limit=limit),
+ )
- async def _search_album_advanced(self, limit, **kwargs) -> list[PlexAlbum]:
- return await self._run_async(self._plex_library.searchAlbums, filters=kwargs, limit=limit)
+ async def _search_album_advanced(self, limit: int, **kwargs: Any) -> list[PlexAlbum]:
+ return cast(
+ list[PlexPlaylist],
+ await self._run_async(self._plex_library.searchAlbums, filters=kwargs, limit=limit),
+ )
- async def _search_artist_advanced(self, limit, **kwargs) -> list[PlexArtist]:
- return await self._run_async(self._plex_library.searchArtists, filters=kwargs, limit=limit)
+ async def _search_artist_advanced(self, limit: int, **kwargs: Any) -> list[PlexArtist]:
+ return cast(
+ list[PlexPlaylist],
+ await self._run_async(self._plex_library.searchArtists, filters=kwargs, limit=limit),
+ )
- async def _search_playlist_advanced(self, limit, **kwargs) -> list[PlexPlaylist]:
- return await self._run_async(self._plex_library.playlists, filters=kwargs, limit=limit)
+ async def _search_playlist_advanced(self, limit: int, **kwargs: Any) -> list[PlexPlaylist]:
+ return cast(
+ list[PlexPlaylist],
+ await self._run_async(self._plex_library.playlists, filters=kwargs, limit=limit),
+ )
async def _search_and_parse(
- self, search_coro: Coroutine, parse_coro: Callable
- ) -> list[MediaItem]:
- task_results = []
+ self,
+ search_coro: Awaitable[list[PlexObjectT]],
+ parse_coro: Callable[[PlexObjectT], Coroutine[Any, Any, MediaItemT]],
+ ) -> list[MediaItemT]:
+ task_results: list[Task[MediaItemT]] = []
async with TaskGroup() as tg:
for item in await search_coro:
task_results.append(tg.create_task(parse_coro(item)))
- results = []
+ results: list[MediaItemT] = []
for task in task_results:
results.append(task.result())
if plex_album.year:
album.year = plex_album.year
if thumb := plex_album.firstAttr("thumb", "parentThumb", "grandparentThumb"):
- album.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=thumb,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ album.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
if plex_album.summary:
album.metadata.description = plex_album.summary
if plex_artist.summary:
artist.metadata.description = plex_artist.summary
if thumb := plex_artist.firstAttr("thumb", "parentThumb", "grandparentThumb"):
- artist.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=thumb,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ artist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
return artist
async def _parse_playlist(self, plex_playlist: PlexPlaylist) -> Playlist:
if plex_playlist.summary:
playlist.metadata.description = plex_playlist.summary
if thumb := plex_playlist.firstAttr("thumb", "parentThumb", "grandparentThumb"):
- playlist.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=thumb,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ playlist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
playlist.is_editable = not plex_playlist.smart
playlist.cache_checksum = str(plex_playlist.updatedAt.timestamp())
raise InvalidDataError(msg)
if thumb := plex_track.firstAttr("thumb", "parentThumb", "grandparentThumb"):
- track.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path=thumb,
- provider=self.instance_id,
- remotely_accessible=False,
- )
- ]
+ track.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ )
if plex_track.parentKey:
track.album = self._get_item_mapping(
MediaType.ALBUM, plex_track.parentKey, plex_track.parentTitle
if plex_track.duration:
track.duration = int(plex_track.duration / 1000)
if plex_track.chapters:
- track.metadata.chapters = [
- MediaItemChapter(
- chapter_id=plex_chapter.id,
- position_start=plex_chapter.start,
- position_end=plex_chapter.end,
- title=plex_chapter.title,
- )
- for plex_chapter in plex_track.chapters
- ]
+ track.metadata.chapters = UniqueList(
+ [
+ MediaItemChapter(
+ chapter_id=plex_chapter.id,
+ position_start=plex_chapter.start,
+ position_end=plex_chapter.end,
+ title=plex_chapter.title,
+ )
+ for plex_chapter in plex_track.chapters
+ ]
+ )
return track
:param media_types: A list of media_types to include.
:param limit: Number of items to return in the search (per type).
"""
- tasks = {}
+ artists = None
+ albums = None
+ tracks = None
+ playlists = None
async with TaskGroup() as tg:
- for media_type in media_types:
- if media_type == MediaType.ARTIST:
- tasks[MediaType.ARTIST] = tg.create_task(
- self._search_and_parse(
- self._search_artist(search_query, limit), self._parse_artist
- )
+ if MediaType.ARTIST in media_types:
+ artists = tg.create_task(
+ self._search_and_parse(
+ self._search_artist(search_query, limit), self._parse_artist
)
- elif media_type == MediaType.ALBUM:
- tasks[MediaType.ARTIST] = tg.create_task(
- self._search_and_parse(
- self._search_album(search_query, limit), self._parse_album
- )
+ )
+
+ if MediaType.ALBUM in media_types:
+ albums = tg.create_task(
+ self._search_and_parse(
+ self._search_album(search_query, limit), self._parse_album
)
- elif media_type == MediaType.TRACK:
- tasks[MediaType.ARTIST] = tg.create_task(
- self._search_and_parse(
- self._search_track(search_query, limit), self._parse_track
- )
+ )
+
+ if MediaType.TRACK in media_types:
+ tracks = tg.create_task(
+ self._search_and_parse(
+ self._search_track(search_query, limit), self._parse_track
)
- elif media_type == MediaType.PLAYLIST:
- tasks[MediaType.ARTIST] = tg.create_task(
- self._search_and_parse(
- self._search_playlist(search_query, limit),
- self._parse_playlist,
- )
+ )
+
+ if MediaType.PLAYLIST in media_types:
+ playlists = tg.create_task(
+ self._search_and_parse(
+ self._search_playlist(search_query, limit),
+ self._parse_playlist,
)
+ )
search_results = SearchResults()
- for media_type, task in tasks.items():
- if media_type == MediaType.ARTIST:
- search_results.artists = task.result()
- elif media_type == MediaType.ALBUM:
- search_results.albums = task.result()
- elif media_type == MediaType.TRACK:
- search_results.tracks = task.result()
- elif media_type == MediaType.PLAYLIST:
- search_results.playlists = task.result()
+ if artists:
+ search_results.artists = artists.result()
+
+ if albums:
+ search_results.albums = albums.result()
+
+ if tracks:
+ search_results.tracks = tracks.result()
+
+ if playlists:
+ search_results.playlists = playlists.result()
return search_results
for track in tracks_obj:
yield await self._parse_track(track)
- async def get_album(self, prov_album_id) -> Album:
+ async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
if plex_album := await self._get_data(prov_album_id, PlexAlbum):
return await self._parse_album(plex_album)
tracks.append(track)
return tracks
- async def get_artist(self, prov_artist_id) -> Artist:
+ async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
# This artist does not exist in plex, so we can just load it from DB.
msg = f"Item {prov_artist_id} not found"
raise MediaNotFoundError(msg)
- async def get_track(self, prov_track_id) -> Track:
+ async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
if plex_track := await self._get_data(prov_track_id, PlexTrack):
return await self._parse_track(plex_track)
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg)
- async def get_playlist(self, prov_playlist_id) -> Playlist:
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
if plex_playlist := await self._get_data(prov_playlist_id, PlexPlaylist):
return await self._parse_playlist(plex_playlist)
result.append(track)
return result
- async def get_artist_albums(self, prov_artist_id) -> list[Album]:
+ async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of albums for the given artist."""
if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
plex_artist = await self._get_data(prov_artist_id, PlexArtist)
- plex_albums = await self._run_async(plex_artist.albums)
+ plex_albums = cast(list[PlexAlbum], await self._run_async(plex_artist.albums))
if plex_albums:
albums = []
for album_obj in plex_albums:
async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
"""Handle callback when an item completed streaming."""
- def mark_played():
+ def mark_played() -> None:
item = streamdetails.data
params = {"key": str(item.ratingKey), "identifier": "com.plexapp.plugins.library"}
self._plex_server.query("/:/scrobble", params=params)
if auth_token == AUTH_TOKEN_UNAUTH:
return self._myplex_account
- def _refresh_plex_token():
+ def _refresh_plex_token() -> MyPlexAccount:
if self._myplex_account is None:
myplex_account = MyPlexAccount(token=auth_token)
self._myplex_account = myplex_account