table: str,
match: dict = None,
order_by: str = None,
+ limit: int = 500,
+ offset: int = 0,
db: Optional[Db] = None,
) -> List[Mapping]:
"""Get all rows for given table."""
return await _db.fetch_all(sql_query, match)
async def get_rows_from_query(
- self, query: str, params: Optional[dict] = None, db: Optional[Db] = None
+ self,
+ query: str,
+ params: Optional[dict] = None,
+ limit: int = 500,
+ offset: int = 0,
+ db: Optional[Db] = None,
) -> List[Mapping]:
"""Get all rows for given custom query."""
async with self.get_db(db) as _db:
match: dict = None,
db: Optional[Db] = None,
) -> AsyncGenerator[Mapping, None]:
- """Iterate rows for given table."""
+ """Iterate (all) rows for given table."""
async with self.get_db(db) as _db:
sql_query = f"SELECT * FROM {table}"
if match is not None:
"""Update record in the database, merging data."""
raise NotImplementedError
- async def library(self) -> List[ItemCls]:
+ async def library(self, limit: int = 500, offset: int = 0) -> List[ItemCls]:
"""Get all in-library items."""
match = {"in_library": True}
return [
self.item_cls.from_db_row(db_row)
- for db_row in await self.mass.database.get_rows(self.db_table, match)
+ for db_row in await self.mass.database.get_rows(
+ self.db_table, match, limit=limit, offset=offset
+ )
]
async def count(self) -> int:
self,
query: Optional[str] = None,
query_params: Optional[dict] = None,
+ limit: int = 500,
+ offset: int = 0,
db: Optional[Db] = None,
) -> List[ItemCls]:
"""Fetch all records from database."""
if query is not None:
- func = self.mass.database.get_rows_from_query(query, query_params, db=db)
+ func = self.mass.database.get_rows_from_query(
+ query, query_params, limit=limit, offset=offset, db=db
+ )
else:
- func = self.mass.database.get_rows(self.db_table, db=db)
+ func = self.mass.database.get_rows(
+ self.db_table, limit=limit, offset=offset, db=db
+ )
return [self.item_cls.from_db_row(db_row) for db_row in await func]
async def get_db_item(self, item_id: int, db: Optional[Db] = None) -> ItemCls:
provider: Optional[ProviderType] = None,
provider_id: Optional[str] = None,
provider_item_ids: Optional[Tuple[str]] = None,
+ limit: int = 500,
+ offset: int = 0,
db: Optional[Db] = None,
) -> List[ItemCls]:
"""Fetch all records from database for given provider."""
assert provider or provider_id, "provider or provider_id must be supplied"
if provider == ProviderType.DATABASE or provider_id == "database":
- return await self.get_db_items(db=db)
+ return await self.get_db_items(limit=limit, offset=offset, db=db)
query = f"SELECT * FROM {self.db_table}, json_each(provider_ids)"
if provider_id is not None:
prov_ids = prov_ids.replace(",)", ")")
query += f" AND json_extract(json_each.value, '$.item_id') in {prov_ids}"
- return await self.get_db_items(query, db=db)
+ return await self.get_db_items(query, limit=limit, offset=offset, db=db)
async def iterate_db_items(
self,