return await self.get_db_item(item_id)
async def _update_db_item(
- self, item_id: int, item: Playlist, overwrite: bool = True
+ self, item_id: int, item: Playlist, overwrite: bool = False
) -> Playlist:
"""Update Playlist record in the database."""
cur_item = await self.get_db_item(item_id)
{"item_id": item_id},
{
# always prefer name/owner from updated item here
- "name": item.name,
- "sort_name": item.sort_name,
- "owner": item.owner,
+ "name": item.name or cur_item.name,
+ "sort_name": item.sort_name or cur_item.sort_name,
+ "owner": item.owner or cur_item.sort_name,
"is_editable": item.is_editable,
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
# return created object
return await self.get_db_item(item_id)
- async def _update_db_item(self, item_id: int, item: Radio, overwrite: bool = True) -> Radio:
+ async def _update_db_item(self, item_id: int, item: Radio, overwrite: bool = False) -> Radio:
"""Update Radio record in the database."""
cur_item = await self.get_db_item(item_id)
metadata = cur_item.metadata.update(getattr(item, "metadata", None), overwrite)
match,
{
# always prefer name from updated item here
- "name": item.name,
- "sort_name": item.sort_name,
+ "name": item.name or cur_item.name,
+ "sort_name": item.sort_name or cur_item.sort_name,
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
"timestamp_modified": int(utc_timestamp()),
raise NotImplementedError
return []
- async def sync_library(self, media_types: tuple[MediaType, ...] | None = None) -> None:
+ async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
"""Run library sync for this provider."""
# this reference implementation can be overridden
# with a provider specific approach if needed
- media_types = tuple(x for x in MediaType)
for media_type in media_types:
if not self.library_supported(media_type):
continue
items=sorted(subitems, key=lambda x: (x.name.casefold(), x.name)),
)
- async def sync_library(
- self, media_types: tuple[MediaType, ...] | None = None # noqa: ARG002
- ) -> None:
+ async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
"""Run library sync for this provider."""
+ if MediaType.TRACK not in media_types or MediaType.PLAYLIST not in media_types:
+ return
cache_key = f"{self.instance_id}.checksums"
prev_checksums = await self.mass.cache.get(cache_key, SCHEMA_VERSION)
save_checksum_interval = 0
def get_playlist_checksum(playlist_obj: dict) -> str:
"""Try to calculate a checksum so we can detect changes in a playlist."""
- for key in ("duration_seconds", "trackCount"):
+ for key in ("duration_seconds", "trackCount", "count"):
if key in playlist_obj:
return playlist_obj[key]
return str(int(time()))
"""Signal event to subscribers."""
if self.closing:
return
+ if (
+ event
+ in (
+ EventType.MEDIA_ITEM_ADDED,
+ EventType.MEDIA_ITEM_DELETED,
+ EventType.MEDIA_ITEM_UPDATED,
+ )
+ and self.music.in_progress_syncs
+ ):
+ # ignore media item events while sync is running because it clutters too much
+ return
if LOGGER.isEnabledFor(logging.DEBUG) and event != EventType.QUEUE_TIME_UPDATED:
# do not log queue time updated events because that is too chatty