from music_assistant.helpers.database import UNSET
from music_assistant.helpers.datetime import utc_timestamp
from music_assistant.helpers.json import serialize_to_json
+from music_assistant.helpers.util import parse_optional_bool
from music_assistant.models.music_provider import MusicProvider
if TYPE_CHECKING:
# abort if nothing changed
if (
cur_entry
- and cur_entry["fully_played"] == media_item.fully_played
+ and parse_optional_bool(cur_entry["fully_played"]) == media_item.fully_played
and abs((cur_entry["seconds_played"] or 0) - seconds_played) <= 2
):
return
from music_assistant.helpers.compare import compare_media_item, create_safe_string
from music_assistant.helpers.database import UNSET
from music_assistant.helpers.json import json_loads, serialize_to_json
-from music_assistant.helpers.util import guard_single_request
+from music_assistant.helpers.util import guard_single_request, parse_optional_bool
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Mapping
continue
db_row_dict[key] = json_loads(raw_value)
+ # parse "fully_played" as bool if present in the row
+ if "fully_played" in db_row_dict:
+ db_row_dict["fully_played"] = parse_optional_bool(db_row_dict["fully_played"])
+
# copy track_album --> album
if track_album := db_row_dict.get("track_album"):
db_row_dict["album"] = track_album
if resume_info_db_row["seconds_played"]:
episode.resume_position_ms = int(resume_info_db_row["seconds_played"] * 1000)
if resume_info_db_row["fully_played"] is not None:
- episode.fully_played = resume_info_db_row["fully_played"]
+ episode.fully_played = bool(resume_info_db_row["fully_played"])
# grab the episodes from the provider
# note that we do not cache any of this because its
from music_assistant.helpers.json import json_dumps, json_loads, serialize_to_json
from music_assistant.helpers.tags import split_artists
from music_assistant.helpers.uri import parse_uri
-from music_assistant.helpers.util import TaskManager, parse_title_and_version
+from music_assistant.helpers.util import TaskManager, parse_optional_bool, parse_title_and_version
from music_assistant.models.core_controller import CoreController
from music_assistant.models.music_provider import MusicProvider
from music_assistant.models.smart_fades import SmartFadesAnalysis, SmartFadesAnalysisFragment
params["userid"] = userid
if db_entry := await self.database.get_row(DB_TABLE_PLAYLOG, params):
ma_position_ms = db_entry["seconds_played"] * 1000 if db_entry["seconds_played"] else 0
- ma_fully_played = db_entry["fully_played"]
+ ma_fully_played = parse_optional_bool(db_entry["fully_played"])
# Return the higher position to ensure users never lose progress
if ma_position_ms >= provider_position_ms:
return fallback
+def parse_optional_bool(value: Any) -> bool | None:
+ """Parse an optional boolean value from various input types."""
+ if value is None:
+ return None
+ if isinstance(value, bool):
+ return value
+ if isinstance(value, str):
+ value_lower = value.strip().lower()
+ if value_lower in ("true", "1", "yes", "on"):
+ return True
+ if value_lower in ("false", "0", "no", "off"):
+ return False
+ if isinstance(value, (int, float)):
+ return bool(value)
+ return None
+
+
def merge_dict(
base_dict: dict[Any, Any],
new_dict: dict[Any, Any],