from __future__ import annotations
+import asyncio
from typing import TYPE_CHECKING, Any
from music_assistant_models.enums import MediaType, ProviderFeature
from music_assistant_models.errors import InvalidDataError
from music_assistant_models.media_items import Artist, Audiobook, Chapter, UniqueList
-from music_assistant.constants import DB_TABLE_AUDIOBOOKS
+from music_assistant.constants import DB_TABLE_AUDIOBOOKS, DB_TABLE_PLAYLOG
from music_assistant.controllers.media.base import MediaControllerBase
from music_assistant.helpers.compare import (
compare_audiobook,
provider_instance_id_or_domain: str,
) -> UniqueList[Chapter]:
"""Return audiobook chapters for the given provider audiobook id."""
- # always check if we have a library item for this audiobook
- library_audiobook = await self.get_library_item_by_prov_id(
+ if library_audiobook := await self.get_library_item_by_prov_id(
item_id, provider_instance_id_or_domain
- )
- if not library_audiobook:
- return await self._get_provider_audiobook_chapters(
- item_id, provider_instance_id_or_domain
- )
- # return items from first/only provider
- for provider_mapping in library_audiobook.provider_mappings:
- return await self._get_provider_audiobook_chapters(
- provider_mapping.item_id, provider_mapping.provider_instance
- )
- return UniqueList()
+ ):
+ # return items from first/only provider
+ for provider_mapping in library_audiobook.provider_mappings:
+ return await self._get_provider_audiobook_chapters(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
+ return await self._get_provider_audiobook_chapters(item_id, provider_instance_id_or_domain)
async def versions(
self,
prov: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
- # prefer cache items (if any) - for streaming providers only
- cache_base_key = prov.lookup_key
- cache_key = f"audiobook.{item_id}"
- if (
- prov.is_streaming_provider
- and (cache := await self.mass.cache.get(cache_key, base_key=cache_base_key)) is not None
- ):
- return [Chapter.from_dict(x) for x in cache]
- # no items in cache - get listing from provider
+ # grab the chapters from the provider
+ # note that we do not cache any of this because its
+ # always a rather small list and we want fresh resume info
items = await prov.get_audiobook_chapters(item_id)
- # store (serializable items) in cache
- if prov.is_streaming_provider:
- self.mass.create_task(
- self.mass.cache.set(
- cache_key,
- [x.to_dict() for x in items],
- expiration=3600,
- base_key=cache_base_key,
- ),
+
+ async def set_resume_position(chapter: Chapter) -> None:
+ if chapter.resume_position_ms is not None:
+ return
+ if chapter.fully_played is not None:
+ return
+ # TODO: inject resume position info here for providers that do not natively provide it
+ resume_info_db_row = await self.mass.music.database.get_row(
+ DB_TABLE_PLAYLOG,
+ {
+ "item_id": chapter.item_id,
+ "provider": prov.lookup_key,
+ "media_type": MediaType.CHAPTER,
+ },
)
+ if resume_info_db_row is None:
+ return
+ if resume_info_db_row["seconds_played"] is not None:
+ chapter.resume_position_ms = resume_info_db_row["seconds_played"] * 1000
+ if resume_info_db_row["fully_played"] is not None:
+ chapter.fully_played = resume_info_db_row["fully_played"]
+ await asyncio.gather(*[set_resume_position(chapter) for chapter in items])
return items
async def _get_provider_dynamic_base_tracks(
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant_models.media_items import Playlist, PlaylistTrack, Track
+from music_assistant_models.media_items import Playlist, Track
from music_assistant.constants import DB_TABLE_PLAYLISTS
from music_assistant.helpers.json import serialize_to_json
item_id: str,
provider_instance_id_or_domain: str,
force_refresh: bool = False,
- ) -> AsyncGenerator[PlaylistTrack, None]:
+ ) -> AsyncGenerator[Track, None]:
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(
item_id,
cache_checksum: Any = None,
page: int = 0,
force_refresh: bool = False,
- ) -> list[PlaylistTrack]:
+ ) -> list[Track]:
"""Return playlist tracks for the given provider playlist id."""
assert provider_instance_id_or_domain != "library"
provider: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
)
is not None
):
- return [PlaylistTrack.from_dict(x) for x in cache]
+ return [Track.from_dict(x) for x in cache]
# no items in cache (or force_refresh) - get listing from provider
items = await provider.get_playlist_tracks(item_id, page=page)
# store (serializable items) in cache
from __future__ import annotations
+import asyncio
from typing import TYPE_CHECKING, Any
from music_assistant_models.enums import MediaType, ProviderFeature
from music_assistant_models.errors import InvalidDataError
from music_assistant_models.media_items import Artist, Episode, Podcast, UniqueList
-from music_assistant.constants import DB_TABLE_PODCASTS
+from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PODCASTS
from music_assistant.controllers.media.base import MediaControllerBase
from music_assistant.helpers.compare import (
compare_media_item,
) -> UniqueList[Episode]:
"""Return podcast episodes for the given provider podcast id."""
# always check if we have a library item for this podcast
- library_podcast = await self.get_library_item_by_prov_id(
+ if library_podcast := await self.get_library_item_by_prov_id(
item_id, provider_instance_id_or_domain
- )
- if not library_podcast:
- return await self._get_provider_podcast_episodes(
- item_id, provider_instance_id_or_domain
- )
- # return items from first/only provider
- for provider_mapping in library_podcast.provider_mappings:
- return await self._get_provider_podcast_episodes(
- provider_mapping.item_id, provider_mapping.provider_instance
- )
- return UniqueList()
+ ):
+ # return items from first/only provider
+ for provider_mapping in library_podcast.provider_mappings:
+ return await self._get_provider_podcast_episodes(
+ provider_mapping.item_id, provider_mapping.provider_instance
+ )
+ return await self._get_provider_podcast_episodes(item_id, provider_instance_id_or_domain)
async def versions(
self,
prov: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
- # prefer cache items (if any) - for streaming providers only
- cache_base_key = prov.lookup_key
- cache_key = f"podcast.{item_id}"
- if (
- prov.is_streaming_provider
- and (cache := await self.mass.cache.get(cache_key, base_key=cache_base_key)) is not None
- ):
- return [Episode.from_dict(x) for x in cache]
- # no items in cache - get listing from provider
- items = await prov.get_podcast_episodes(item_id)
- # store (serializable items) in cache
- if prov.is_streaming_provider:
- self.mass.create_task(
- self.mass.cache.set(
- cache_key,
- [x.to_dict() for x in items],
- expiration=3600,
- base_key=cache_base_key,
- ),
+ # grab the episodes from the provider
+ # note that we do not cache any of this because its
+ # always a rather small list and we want fresh resume info
+ items = await prov.get_audiobook_chapters(item_id)
+
+ async def set_resume_position(episode: Episode) -> None:
+ if episode.resume_position_ms is not None:
+ return
+ if episode.fully_played is not None:
+ return
+ # TODO: inject resume position info here for providers that do not natively provide it
+ resume_info_db_row = await self.mass.music.database.get_row(
+ DB_TABLE_PLAYLOG,
+ {
+ "item_id": episode.item_id,
+ "provider": prov.lookup_key,
+ "media_type": MediaType.CHAPTER,
+ },
)
+ if resume_info_db_row is None:
+ return
+ if resume_info_db_row["seconds_played"] is not None:
+ episode.resume_position_ms = resume_info_db_row["seconds_played"] * 1000
+ if resume_info_db_row["fully_played"] is not None:
+ episode.fully_played = resume_info_db_row["fully_played"]
+ await asyncio.gather(*[set_resume_position(chapter) for chapter in items])
+ return items
return items
async def _get_provider_dynamic_base_tracks(
CONF_SYNC_INTERVAL = "sync_interval"
CONF_DELETED_PROVIDERS = "deleted_providers"
CONF_ADD_LIBRARY_ON_PLAY = "add_library_on_play"
-DB_SCHEMA_VERSION: Final[int] = 10
+DB_SCHEMA_VERSION: Final[int] = 11
class MusicController(CoreController):
return None
+ @api_command("music/mark_played")
async def mark_item_played(
- self, media_type: MediaType, item_id: str, provider_instance_id_or_domain: str
+ self,
+ media_type: MediaType,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ fully_played: bool | None = None,
+ seconds_played: int | None = None,
) -> None:
"""Mark item as played in playlog."""
timestamp = utc_timestamp()
"item_id": item_id,
"provider": prov_key,
"media_type": media_type.value,
+ "fully_played": fully_played,
+ "seconds_played": seconds_played,
"timestamp": timestamp,
},
allow_replace=True,
)
await self.database.commit()
+ @api_command("music/mark_unplayed")
+ async def mark_item_unplayed(
+ self, media_type: MediaType, item_id: str, provider_instance_id_or_domain: str
+ ) -> None:
+ """Mark item as unplayed in playlog."""
+ if provider_instance_id_or_domain == "library":
+ prov_key = "library"
+ elif prov := self.mass.get_provider(provider_instance_id_or_domain):
+ prov_key = prov.lookup_key
+ else:
+ prov_key = provider_instance_id_or_domain
+ # update generic playlog table
+ await self.database.delete(
+ DB_TABLE_PLAYLOG,
+ {
+ "item_id": item_id,
+ "provider": prov_key,
+ "media_type": media_type.value,
+ },
+ )
+ # also update playcount in library table
+ ctrl = self.get_controller(media_type)
+ db_item = await ctrl.get_library_item_by_prov_id(item_id, provider_instance_id_or_domain)
+ if db_item:
+ await self.database.execute(f"UPDATE {ctrl.db_table} SET play_count = play_count - 1")
+ await self.database.commit()
+
def get_controller(
self, media_type: MediaType
) -> (
return self.playlists
if media_type == MediaType.AUDIOBOOK:
return self.audiobooks
+ if media_type == MediaType.CHAPTER:
+ return self.audiobooks
+ if media_type == MediaType.EPISODE:
+ return self.podcasts
if media_type == MediaType.PODCAST:
return self.podcasts
+ if media_type == MediaType.EPISODE:
+ return self.podcasts
return None
def get_unique_providers(self) -> set[str]:
)
await self.database.execute("DROP TABLE IF EXISTS track_loudness")
- if prev_version <= 9:
+ if prev_version <= 10:
# recreate db tables for audiobooks and podcasts due to some mistakes in early version
await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_AUDIOBOOKS}")
await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_PODCASTS}")
await self.__create_database_tables()
+ try:
+ await self.database.execute(
+ f"ALTER TABLE {DB_TABLE_PLAYLOG} ADD COLUMN fully_played BOOLEAN"
+ )
+ await self.database.execute(
+ f"ALTER TABLE {DB_TABLE_PLAYLOG} ADD COLUMN seconds_played INTEGER"
+ )
+ except Exception as err:
+ if "duplicate column" not in str(err):
+ raise
# save changes
await self.database.commit()
[provider] TEXT NOT NULL,
[media_type] TEXT NOT NULL DEFAULT 'track',
[timestamp] INTEGER DEFAULT 0,
+ [fully_played] BOOLEAN,
+ [seconds_played] INTEGER,
UNIQUE(item_id, provider, media_type));"""
)
await self.database.execute(
import asyncio
import random
import time
+from types import NoneType
from typing import TYPE_CHECKING, Any, TypedDict
from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
)
from music_assistant_models.errors import (
InvalidCommand,
+ InvalidDataError,
MediaNotFoundError,
MusicAssistantError,
PlayerUnavailableError,
QueueEmpty,
UnsupportedFeaturedException,
)
-from music_assistant_models.media_items import AudioFormat, MediaItemType, Playlist, media_from_dict
+from music_assistant_models.media_items import (
+ AudioFormat,
+ Chapter,
+ Episode,
+ MediaItemType,
+ PlayableMediaItemType,
+ Playlist,
+ media_from_dict,
+)
from music_assistant_models.player import PlayerMedia
from music_assistant_models.player_queue import PlayerQueue
from music_assistant_models.queue_item import QueueItem
if TYPE_CHECKING:
from collections.abc import Iterator
- from music_assistant_models.media_items import Album, Artist, Track
+ from music_assistant_models.media_items import (
+ Album,
+ Artist,
+ Audiobook,
+ Podcast,
+ Track,
+ UniqueList,
+ )
from music_assistant_models.player import Player
CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
+CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
+CONF_DEFAULT_ENQUEUE_OPTION_CHAPTER = "default_enqueue_option_chapter"
+CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
+CONF_DEFAULT_ENQUEUE_OPTION_EPISODE = "default_enqueue_option_episode"
+CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
+CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
options=enqueue_options,
description="Define the default enqueue action for this mediatype.",
),
+ ConfigEntry(
+ key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
+ type=ConfigEntryType.STRING,
+ default_value=QueueOption.REPLACE.value,
+ label="Default enqueue option for Audiobook item(s).",
+ options=enqueue_options,
+ hidden=True,
+ ),
+ ConfigEntry(
+ key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
+ type=ConfigEntryType.STRING,
+ default_value=QueueOption.REPLACE.value,
+ label="Default enqueue option for Podcast item(s).",
+ options=enqueue_options,
+ hidden=True,
+ ),
+ ConfigEntry(
+ key=CONF_DEFAULT_ENQUEUE_OPTION_CHAPTER,
+ type=ConfigEntryType.STRING,
+ default_value=QueueOption.REPLACE.value,
+ label="Default enqueue option for Audiobook-chapter item(s).",
+ options=enqueue_options,
+ hidden=True,
+ ),
+ ConfigEntry(
+ key=CONF_DEFAULT_ENQUEUE_OPTION_EPISODE,
+ type=ConfigEntryType.STRING,
+ default_value=QueueOption.REPLACE.value,
+ label="Default enqueue option for Podcast-episode item(s).",
+ options=enqueue_options,
+ hidden=True,
+ ),
+ ConfigEntry(
+ key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
+ type=ConfigEntryType.STRING,
+ default_value=QueueOption.REPLACE.value,
+ label="Default enqueue option for Folder item(s).",
+ options=enqueue_options,
+ hidden=True,
+ ),
)
def __iter__(self) -> Iterator[PlayerQueue]:
media: MediaItemType | list[MediaItemType] | str | list[str],
option: QueueOption | None = None,
radio_mode: bool = False,
- start_item: str | None = None,
+ start_item: PlayableMediaItemType | str | None = None,
) -> None:
"""Play media item(s) on the given queue.
if option not in (QueueOption.ADD, QueueOption.NEXT):
queue.enqueued_media_items.clear()
- tracks: list[MediaItemType] = []
+ media_items: list[MediaItemType] = []
radio_source: list[MediaItemType] = []
- first_track_seen: bool = False
+ # resolve all media items
for item in media:
try:
# parse provided uri into a MA MediaItem or Basic QueueItem from URL
media_item = media_from_dict(item)
else:
media_item = item
-
# Save requested media item to play on the queue so we can use it as a source
# for Don't stop the music. Use FIFO list to keep track of the last 10 played items
if media_item.media_type in (
queue.enqueued_media_items.append(media_item)
if len(queue.enqueued_media_items) > 10:
queue.enqueued_media_items.pop(0)
-
# handle default enqueue option if needed
if option is None:
option = QueueOption(
)
if option == QueueOption.REPLACE:
self.clear(queue_id)
-
- # collect tracks to play
+ # collect media_items to play
if radio_mode:
radio_source.append(media_item)
- elif media_item.media_type == MediaType.PLAYLIST:
- tracks += await self.get_playlist_tracks(media_item, start_item)
- self.mass.create_task(
- self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- )
- elif media_item.media_type == MediaType.ARTIST:
- tracks += await self.get_artist_tracks(media_item)
- self.mass.create_task(
- self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- )
- elif media_item.media_type == MediaType.ALBUM:
- tracks += await self.get_album_tracks(media_item, start_item)
- self.mass.create_task(
- self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
- )
- )
else:
- # single track or radio item
- tracks += [media_item]
+ media_items += await self._resolve_media_items(media_item, start_item)
except MusicAssistantError as err:
# invalid MA uri or item not found error
queue.radio_source += radio_source
# Use collected media items to calculate the radio if radio mode is on
if radio_mode:
- tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=True)
+ media_items = await self._get_radio_tracks(
+ queue_id=queue_id, is_initial_radio_mode=True
+ )
# only add valid/available items
- queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x and x.available]
+ queue_items = [
+ QueueItem.from_media_item(queue_id, x) for x in media_items if x and x.available
+ ]
if not queue_items:
- if first_track_seen:
- # edge case: playlist with only one track
- return
raise MediaNotFoundError("No playable items found")
# load the items into the queue
queue.next_track_enqueued = None
self.signal_update(queue_id)
+ # handle resume point of audiobook(chapter) or podcast(episode)
+ if not seek_position and (
+ resume_position := getattr(queue_item.media_item, "resume_position", 0)
+ ):
+ seek_position = resume_position
+
# work out if we are playing an album and if we should prefer album loudness
if (
next_index is not None
and (queue_item := self.get_item(queue_id, prev_state["current_index"]))
and (stream_details := queue_item.streamdetails)
):
- seconds_streamed = prev_state["elapsed_time"]
+ seconds_played = prev_state["elapsed_time"]
+ fully_played = seconds_played >= (stream_details.duration or 3600) - 5
if music_prov := self.mass.get_provider(stream_details.provider):
- if seconds_streamed > 10:
- self.mass.create_task(music_prov.on_streamed(stream_details, seconds_streamed))
- if queue_item.media_item and seconds_streamed > 10:
+ if fully_played or (seconds_played > 10):
+ self.mass.create_task(music_prov.on_streamed(stream_details, seconds_played))
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ stream_details.media_type,
+ stream_details.item_id,
+ stream_details.provider,
+ fully_played=fully_played,
+ seconds_played=seconds_played,
+ )
+ )
+ if queue_item.media_item and (fully_played or seconds_played > 10):
# signal 'media item played' event,
# which is useful for plugins that want to do scrobbling
self.mass.signal_event(
EventType.MEDIA_ITEM_PLAYED,
object_id=queue_item.media_item.uri,
- data=round(seconds_streamed, 2),
+ data=round(seconds_played, 2),
)
if end_of_queue_reached:
result.append(playlist_track)
return result
+ async def get_next_audio_book_chapters(
+ self, audio_book: Audiobook | None, chapter: Chapter | str | None
+ ) -> UniqueList[Chapter]:
+ """Return (next) chapter(s) and resume point for given audio book."""
+ if audio_book is None and isinstance(chapter, str | NoneType):
+ raise InvalidDataError("Either audio_book or chapter must be provided")
+ if audio_book is None:
+ audio_book = chapter.audiobook
+ self.logger.debug(
+ "Fetching chapter(s) and resume point to play for audio book %s",
+ audio_book.name,
+ )
+ all_chapters = await self.mass.music.audiobooks.chapters(
+ audio_book.item_id, audio_book.provider
+ )
+ # if a chapter was provided, a user explicitly selected a chapter to play
+ # so we need to find the index of the chapter in the list
+ if isinstance(chapter, Chapter):
+ chapter = next((x for x in all_chapters if x.uri == chapter.uri), None)
+ elif isinstance(chapter, str):
+ chapter = next((x for x in all_chapters if x.uri == chapter), None)
+ else:
+ # get first chapter that is not fully played
+ chapter = next((x for x in all_chapters if not x.fully_played), None)
+ if chapter is None:
+ # no chapters found that are not fully played, so we start at the beginning
+ chapter = next((x for x in all_chapters), None)
+ if chapter is None:
+ raise InvalidDataError(
+ f"Unable to resolve chapter to play for Audio Book {audio_book.name}"
+ )
+ # get the index of the chapter
+ chapter_index = all_chapters.index(chapter)
+ # return the (remaining) chapter(s) to play
+ return all_chapters[chapter_index:]
+
+ async def get_next_podcast_episodes(
+ self, podcast: Podcast | None, episode: Episode | str | None
+ ) -> UniqueList[Episode]:
+ """Return (next) episode(s) and resume point for given podcast."""
+ if podcast is None and isinstance(episode, str | NoneType):
+ raise InvalidDataError("Either podcast or episode must be provided")
+ if podcast is None:
+ podcast = episode.podcast
+ self.logger.debug(
+ "Fetching episode(s) and resume point to play for Podcast %s",
+ podcast.name,
+ )
+ all_episodes = await self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
+ # if a episode was provided, a user explicitly selected a episode to play
+ # so we need to find the index of the episode in the list
+ if isinstance(episode, Episode):
+ episode = next((x for x in all_episodes if x.uri == episode.uri), None)
+ elif isinstance(episode, str):
+ episode = next((x for x in all_episodes if x.uri == episode), None)
+ else:
+ # get first episode that is not fully played
+ episode = next((x for x in all_episodes if not x.fully_played), None)
+ if episode is None:
+ # no episodes found that are not fully played, so we start at the beginning
+ episode = next((x for x in all_episodes), None)
+ if episode is None:
+ raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
+ # get the index of the episode
+ episode_index = all_episodes.index(episode)
+ # return the (remaining) episode(s) to play
+ return all_episodes[episode_index:]
+
def _get_next_index(
self, queue_id: str, cur_index: int | None, is_skip: bool = False, allow_repeat: bool = True
) -> int | None:
self.mass.create_task(_enqueue_next())
+ async def _resolve_media_items(
+ self, media_item: MediaItemType, start_item: str | None = None
+ ) -> list[MediaItemType]:
+ """Resolve/unwrap media items to enqueue."""
+ if media_item.media_type == MediaType.PLAYLIST:
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
+ )
+ return await self.get_playlist_tracks(media_item, start_item)
+ if media_item.media_type == MediaType.ARTIST:
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
+ )
+ return await self.get_artist_tracks(media_item)
+ if media_item.media_type == MediaType.ALBUM:
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
+ )
+ return await self.get_album_tracks(media_item, start_item)
+ if media_item.media_type == MediaType.AUDIOBOOK:
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
+ )
+ return await self.get_next_audio_book_chapters(media_item, start_item)
+ if media_item.media_type == MediaType.CHAPTER:
+ return await self.get_next_audio_book_chapters(None, media_item)
+ if media_item.media_type == MediaType.PODCAST:
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
+ )
+ return await self.get_next_podcast_episodes(media_item, start_item or media_item)
+ if media_item.media_type == MediaType.EPISODE:
+ return await self.get_next_podcast_episodes(None, media_item)
+ # all other: single track or radio item
+ return [media_item]
+
async def _get_radio_tracks(
self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]:
)
or queue_item.media_item
)
- # sort by quality and check track availability
+ # sort by quality and check item's availability
for prov_media in sorted(
media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
):
continue # provider not available ?
# get streamdetails from provider
try:
- streamdetails: StreamDetails = await music_prov.get_stream_details(prov_media.item_id)
+ streamdetails: StreamDetails = await music_prov.get_stream_details(
+ prov_media.item_id, media_item.media_type
+ )
except MusicAssistantError as err:
LOGGER.warning(str(err))
else:
task_id = f"analyze_loudness_{streamdetails.uri}"
mass.create_task(analyze_loudness, mass, streamdetails, task_id=task_id)
- # mark item as played in db if finished or streamed for 30 seconds
- # NOTE that this is not the actual played time but the buffered time
- # the queue controller will update the actual played time when the item is played
- if finished or seconds_streamed > 30:
- mass.create_task(
- mass.music.mark_item_played(
- streamdetails.media_type,
- streamdetails.item_id,
- streamdetails.provider,
- )
- )
-
def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None):
"""Generate a wave header from given params."""
if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
raise NotImplementedError
- async def get_chapter(self, prov_chapter_id: str) -> Chapter: # type: ignore[return]
- """Get (full) audiobook chapter details by id."""
- if ProviderFeature.LIBRARY_AUDIOBOOKS in self.supported_features:
- raise NotImplementedError
-
- async def get_episode(self, prov_episode_id: str) -> Episode: # type: ignore[return]
- """Get (full) podcast episode details by id."""
- if ProviderFeature.LIBRARY_PODCASTS in self.supported_features:
- raise NotImplementedError
-
async def get_album_tracks(
self,
prov_album_id: str, # type: ignore[return]
pos += 1
return episodes
- async def get_episode(self, prov_episode_id: str) -> Episode:
- """Get (full) podcast episode details by id."""
- if not self._enable_podcasts:
- return None
- if EP_CHAN_SEP not in prov_episode_id:
- return None
-
- eid, chan_id = prov_episode_id.split(EP_CHAN_SEP)
- channels = await self._run_async(self._conn.getPodcasts, incEpisodes=True, pid=chan_id)
-
- sonic_podcast = channels[0]
- sonic_episode = None
- for ep in sonic_podcast.episodes:
- if ep.id == eid:
- sonic_episode = ep
- break
-
- return self._parse_epsiode(sonic_episode, sonic_podcast)
-
async def get_podcast(self, prov_podcast_id: str) -> Podcast:
"""Get full Podcast details by id."""
if not self._enable_podcasts:
from __future__ import annotations
from collections.abc import AsyncGenerator
-from random import randint
from typing import TYPE_CHECKING
from music_assistant_models.config_entries import ConfigEntry
)
from music_assistant_models.streamdetails import StreamDetails
-from music_assistant.constants import MASS_LOGO, VARIOUS_ARTISTS_FANART
+from music_assistant.constants import MASS_LOGO, SILENCE_FILE, VARIOUS_ARTISTS_FANART
from music_assistant.models.music_provider import MusicProvider
if TYPE_CHECKING:
total_chapters=10,
)
- async def get_chapter(self, prov_chapter_id: str) -> Chapter:
- """Get (full) audiobook chapter details by id."""
- prov_audiobook_id, chapter_idx = prov_chapter_id.split("_", 2)
- return Chapter(
- item_id=prov_chapter_id,
- provider=self.instance_id,
- name=f"Test Chapter {prov_audiobook_id}-{prov_chapter_id}",
- duration=5,
- audiobook=ItemMapping(
- item_id=prov_audiobook_id,
- provider=self.instance_id,
- name=f"Test Audiobook {prov_audiobook_id}",
- media_type=MediaType.AUDIOBOOK,
- ),
- )
-
- async def get_episode(self, prov_episode_id: str) -> Episode:
- """Get (full) podcast episode details by id."""
- prov_podcast_id, episode_idx = prov_episode_id.split("_", 2)
- return Episode(
- item_id=f"{prov_podcast_id}_{episode_idx}",
- provider=self.instance_id,
- name=f"Test Episode {prov_podcast_id}-{episode_idx}",
- duration=5,
- podcast=ItemMapping(
- item_id=prov_podcast_id,
- provider=self.instance_id,
- name=f"Test Podcast {prov_podcast_id}",
- media_type=MediaType.PODCAST,
- ),
- provider_mappings={
- ProviderMapping(
- item_id=f"{prov_podcast_id}_{episode_idx}",
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
- },
- metadata=MediaItemMetadata(images=UniqueList([DEFAULT_THUMB])),
- episode_number=episode_idx,
- )
-
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve library artists from the provider."""
num_artists = self.config.get_value(CONF_KEY_NUM_ARTISTS)
prov_audiobook_id: str,
) -> list[Chapter]:
"""Get all Chapters for given audiobook id."""
- num_chapters = randint(5, 75)
+ num_chapters = 25
return [
- await self.get_chapter(f"{prov_audiobook_id}_{chapter_idx}")
+ Chapter(
+ item_id=f"{prov_audiobook_id}_{chapter_idx}",
+ provider=self.instance_id,
+ name=f"Test Chapter {prov_audiobook_id}-{chapter_idx}",
+ duration=5,
+ audiobook=ItemMapping(
+ item_id=prov_audiobook_id,
+ provider=self.instance_id,
+ name=f"Test Audiobook {prov_audiobook_id}",
+ media_type=MediaType.AUDIOBOOK,
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=f"{prov_audiobook_id}_{chapter_idx}",
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
for chapter_idx in range(num_chapters)
]
prov_podcast_id: str,
) -> list[Episode]:
"""Get all Episodes for given podcast id."""
- num_episodes = randint(5, 75)
+ num_episodes = 25
return [
- await self.get_episode(f"{prov_podcast_id}_{episode_idx}")
+ Episode(
+ item_id=f"{prov_podcast_id}_{episode_idx}",
+ provider=self.instance_id,
+ name=f"Test Episode {prov_podcast_id}-{episode_idx}",
+ duration=5,
+ podcast=ItemMapping(
+ item_id=prov_podcast_id,
+ provider=self.instance_id,
+ name=f"Test Podcast {prov_podcast_id}",
+ media_type=MediaType.PODCAST,
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=f"{prov_podcast_id}_{episode_idx}",
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ metadata=MediaItemMetadata(images=UniqueList([DEFAULT_THUMB])),
+ episode_number=episode_idx,
+ )
for episode_idx in range(num_episodes)
]
bit_depth=16,
channels=2,
),
- media_type=MediaType.TRACK,
+ media_type=media_type,
stream_type=StreamType.HTTP,
- path=item_id,
+ path=SILENCE_FILE,
can_seek=True,
)