async with MusicAssistant(mass_conf) as mass:
# get some data
- ytm = mass.music.get_provider(ProviderType.YTMUSIC)
+ # ytm = mass.music.get_provider(ProviderType.YTMUSIC)
# track = await yt.get_track("pE3ju1qS848")
- album = await ytm.get_album("MPREb_AYetWMZunqA")
- print(album)
+ # album = await ytm.get_album("MPREb_AYetWMZunqA")
+ # print(album)
+ # start sync
+ # await mass.music.start_sync(schedule=3)
+ artists = await mass.music.artists.count()
+ print(f"Got {artists} artists in library")
+ albums = await mass.music.albums.count()
+ print(f"Got {albums} albums in library")
# sd = await yt.get_stream_details(track.item_id)
# print(sd.data)
self, item: Artist, overwrite_existing: bool = False
) -> Artist:
"""Add a new item record to the database."""
- assert item.provider_ids, "Album is missing provider id(s)"
- # always try to grab existing item by musicbrainz_id
- cur_item = None
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.database.get_row(self.db_table, match)
- if not cur_item:
- # fallback to exact name match
- # NOTE: we match an artist by name which could theoretically lead to collisions
- # but the chance is so small it is not worth the additional overhead of grabbing
- # the musicbrainz id upfront
- match = {"sort_name": item.sort_name}
- for row in await self.mass.database.get_rows(self.db_table, match):
- row_artist = Artist.from_db_row(row)
- if row_artist.sort_name == item.sort_name:
- # just to be sure ?!
- cur_item = row_artist
- break
- if cur_item:
- # update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
+ assert item.provider_ids, "Artist is missing provider id(s)"
+ async with self.mass.database.get_db(db) as db:
+ # always try to grab existing item by musicbrainz_id
+ cur_item = None
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
+ if not cur_item:
+ # fallback to matching
+ # NOTE: we match an artist by name which could theoretically lead to collisions
+ # but the chance is so small it is not worth the additional overhead of grabbing
+ # the musicbrainz id upfront
+ match = {"sort_name": item.sort_name}
+ for row in await self.mass.database.get_rows(
+ self.db_table, match, db=db
+ ):
+ row_artist = Artist.from_db_row(row)
+ if row_artist.sort_name == item.sort_name:
+ # just to be sure ?!
+ cur_item = row_artist
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(
+ cur_item.item_id, item, overwrite=overwrite_existing, db=db
+ )
# insert item
new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
ALBUM = "album"
SINGLE = "single"
COMPILATION = "compilation"
+ EP = "ep"
UNKNOWN = "unknown"
print(category)
return parsed_results
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+ """Retrieve all library artists from Youtube Music."""
+ data = {"browseId": "FEmusic_library_corpus_track_artists"}
+ response = await self._post_data(endpoint="browse", data=data)
+ response_artist = response["contents"]["singleColumnBrowseResultsRenderer"][
+ "tabs"
+ ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
+ "itemSectionRenderer"
+ ][
+ "contents"
+ ][
+ 0
+ ][
+ "musicShelfRenderer"
+ ][
+ "contents"
+ ]
+ parsed_artists = ytmusicapi.parsers.library.parse_artists(response_artist)
+ for item in parsed_artists:
+ artist = Artist(
+ item_id=item["browseId"], provider=self.type, name=item["artist"]
+ )
+ artist.metadata.images = [
+ MediaItemImage(ImageType.THUMB, thumb["url"])
+ for thumb in item["thumbnails"]
+ ]
+ artist.add_provider_id(
+ MediaItemProviderId(
+ item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id
+ )
+ )
+ yield artist
+
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+ """Retrieve all library albums from Youtube Music."""
+ data = {"browseId": "FEmusic_liked_albums"}
+ response = await self._post_data(endpoint="browse", data=data)
+ response_albums = response["contents"]["singleColumnBrowseResultsRenderer"][
+ "tabs"
+ ][0]["tabRenderer"]["content"]["sectionListRenderer"]["contents"][1][
+ "itemSectionRenderer"
+ ][
+ "contents"
+ ][
+ 0
+ ][
+ "gridRenderer"
+ ][
+ "items"
+ ]
+ parsed_albums = ytmusicapi.parsers.library.parse_albums(response_albums)
+ for item in parsed_albums:
+ if item["type"] == "Single":
+ album_type = AlbumType.SINGLE
+ elif item["type"] == "EP":
+ album_type = AlbumType.EP
+ elif item["type"] == "Album":
+ album_type = AlbumType.ALBUM
+ else:
+ album_type = AlbumType.UNKNOWN
+ album = Album(
+ item_id=item["browseId"],
+ name=item["title"],
+ year=item["year"],
+ album_type=album_type,
+ provider=self.type,
+ )
+ artists = []
+ for artist in item["artists"]:
+ album_artist = Artist(
+ item_id=artist["id"], name=artist["name"], provider=self.type
+ )
+ album_artist.add_provider_id(
+ MediaItemProviderId(
+ item_id=str(artist["id"]), prov_type=self.type, prov_id=self.id
+ )
+ )
+ artists.append(album_artist)
+ album.artists = artists
+ album.metadata.images = [
+ MediaItemImage(ImageType.THUMB, thumb["url"])
+ for thumb in item["thumbnails"]
+ ]
+ album.add_provider_id(
+ MediaItemProviderId(
+ item_id=str(item["browseId"]), prov_type=self.type, prov_id=self.id
+ )
+ )
+ yield album
+
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
data = {"browseId": prov_album_id}