import pathlib
from typing import Final
-from music_assistant_models.config_entries import (
- ConfigEntry,
- ConfigEntryType,
- ConfigValueOption,
-)
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
+from music_assistant_models.enums import ConfigEntryType
API_SCHEMA_VERSION: Final[int] = 26
MIN_SCHEMA_VERSION: Final[int] = 24
from music_assistant.models.music_provider import MusicProvider
-class AlbumsController(MediaControllerBase[Album]):
+class AlbumsController(MediaControllerBase[Album, Album]):
"""Controller managing MediaItems of type Album."""
db_table = DB_TABLE_ALBUMS
from music_assistant.models.music_provider import MusicProvider
-class ArtistsController(MediaControllerBase[Artist]):
+class ArtistsController(MediaControllerBase[Artist, Artist | ItemMapping]):
"""Controller managing MediaItems of type Artist."""
db_table = DB_TABLE_ARTISTS
from music_assistant.models.music_provider import MusicProvider
-class AudiobooksController(MediaControllerBase[Audiobook]):
+class AudiobooksController(MediaControllerBase[Audiobook, Audiobook]):
"""Controller managing MediaItems of type Audiobook."""
db_table = DB_TABLE_AUDIOBOOKS
from music_assistant import MusicAssistant
+MediaItemTypeBound = MediaItemType | ItemMapping
ItemCls = TypeVar("ItemCls", bound="MediaItemType")
+LibraryUpdate = TypeVar("LibraryUpdate", bound="MediaItemTypeBound")
JSON_KEYS = (
"artists",
}
-class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta):
+class MediaControllerBase(Generic[ItemCls, LibraryUpdate], metaclass=ABCMeta):
"""Base model for controller managing a MediaType."""
media_type: MediaType
return None
async def update_item_in_library(
- self, item_id: str | int, update: ItemCls, overwrite: bool = False
+ self, item_id: str | int, update: LibraryUpdate, overwrite: bool = False
) -> ItemCls:
"""Update existing library record in the library database."""
await self._update_library_item(item_id, update, overwrite=overwrite)
from .base import MediaControllerBase
-class PlaylistController(MediaControllerBase[Playlist]):
+class PlaylistController(MediaControllerBase[Playlist, Playlist]):
"""Controller managing MediaItems of type Playlist."""
db_table = DB_TABLE_PLAYLISTS
from music_assistant.models.music_provider import MusicProvider
-class PodcastsController(MediaControllerBase[Podcast]):
+class PodcastsController(MediaControllerBase[Podcast, Podcast]):
"""Controller managing MediaItems of type Podcast."""
db_table = DB_TABLE_PODCASTS
from .base import MediaControllerBase
-class RadioController(MediaControllerBase[Radio]):
+class RadioController(MediaControllerBase[Radio, Radio]):
"""Controller managing MediaItems of type Radio."""
db_table = DB_TABLE_RADIOS
from .base import MediaControllerBase
-class TracksController(MediaControllerBase[Track]):
+class TracksController(MediaControllerBase[Track, Track]):
"""Controller managing MediaItems of type Track."""
db_table = DB_TABLE_TRACKS
self._dacp_info = AsyncServiceInfo(
zeroconf_type,
name=server_id,
- addresses=[await get_ip_pton(self.mass.streams.publish_ip)],
+ addresses=[await get_ip_pton(str(self.mass.streams.publish_ip))],
port=dacp_port,
properties={
"txtvers": "1",
# select audio source
if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
+ assert media.custom_data
input_format = AIRPLAY_PCM_FORMAT
audio_source = self.mass.streams.get_announcement_stream(
media.custom_data["url"],
elif media.queue_id and media.queue_item_id:
# regular queue (flow) stream request
input_format = AIRPLAY_FLOW_PCM_FORMAT
+ queue = self.mass.player_queues.get(media.queue_id)
+ assert queue
+ start_queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
+ assert start_queue_item
audio_source = self.mass.streams.get_flow_stream(
- queue=self.mass.player_queues.get(media.queue_id),
- start_queue_item=self.mass.player_queues.get_item(
- media.queue_id, media.queue_item_id
- ),
+ queue=queue,
+ start_queue_item=start_queue_item,
pcm_format=input_format,
)
else:
# to prevent an endless pingpong of volume changes
raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
volume = convert_airplay_volume(raop_volume)
+ assert mass_player.volume_level
if (
abs(mass_player.volume_level - volume) > 5
or (time.time() - airplay_player.last_command_sent) < 2
# device switched to another source (or is powered off)
if raop_stream := airplay_player.raop_stream:
# ignore this if we just started playing to prevent false positives
+ assert mass_player.elapsed_time
if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
raop_stream.prevent_playback = True
self.mass.create_task(self.monitor_prevent_playback(player_id))
async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
"""Initialize CLIRaop process for a player."""
- extra_args = []
+ assert self.prov.cliraop_bin
+ extra_args: list[str] = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
if not mass_player:
return
- bind_ip = await self.mass.config.get_provider_config_value(
- self.prov.instance_id, CONF_BIND_INTERFACE
+ bind_ip = str(
+ await self.mass.config.get_provider_config_value(
+ self.prov.instance_id, CONF_BIND_INTERFACE
+ )
)
extra_args += ["-if", bind_ip]
if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop):
extra_args += [f"-{prop}", prop_value]
sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
+ assert isinstance(sync_adjust, int)
if device_password := self.mass.config.get_raw_player_config_value(
player_id, CONF_PASSWORD, None
):
- extra_args += ["-password", device_password]
+ extra_args += ["-password", str(device_password)]
if self.prov.logger.isEnabledFor(logging.DEBUG):
extra_args += ["-debug", "5"]
elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
"""Monitor stderr for the running CLIRaop process."""
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
- if not mass_player:
+ if not mass_player or not mass_player.active_source:
return
queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
logger = airplay_player.logger
ContentType,
ExternalID,
ImageType,
+ MediaType,
ProviderFeature,
StreamType,
)
ItemMapping,
MediaItemImage,
MediaItemType,
- MediaType,
Playlist,
ProviderMapping,
SearchResults,
media: PlexMedia = plex_track.media[0]
- media_type = (
+ content_type = (
ContentType.try_parse(media.container) if media.container else ContentType.UNKNOWN
)
media_part: PlexMediaPart = media.parts[0]
item_id=plex_track.key,
provider=self.instance_id,
audio_format=AudioFormat(
- content_type=media_type,
+ content_type=content_type,
channels=media.audioChannels,
),
stream_type=StreamType.HTTP,
data=plex_track,
)
- if media_type != ContentType.M4A:
+ if content_type != ContentType.M4A:
stream_details.path = self._plex_server.url(media_part.key, True)
if audio_stream.samplingRate:
stream_details.audio_format.sample_rate = audio_stream.samplingRate
ContentType,
ImageType,
LinkType,
+ MediaType,
ProviderFeature,
StreamType,
)
MediaItemImage,
MediaItemLink,
MediaItemType,
- MediaType,
ProviderMapping,
Radio,
SearchResults,
metadata=MediaItemMetadata(images=UniqueList([DEFAULT_THUMB])),
)
- async def get_podcast(self, prov_podcast_id: str) -> Album:
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
"""Get full podcast details by id."""
return Podcast(
item_id=prov_podcast_id,
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve library artists from the provider."""
num_artists = self.config.get_value(CONF_KEY_NUM_ARTISTS)
+ assert isinstance(num_artists, int)
for artist_idx in range(num_artists):
yield await self.get_artist(str(artist_idx))
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve library albums from the provider."""
num_artists = self.config.get_value(CONF_KEY_NUM_ARTISTS) or 5
+ assert isinstance(num_artists, int)
num_albums = self.config.get_value(CONF_KEY_NUM_ALBUMS)
+ assert isinstance(num_albums, int)
for artist_idx in range(num_artists):
for album_idx in range(num_albums):
album_item_id = f"{artist_idx}_{album_idx}"
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from the provider."""
num_artists = self.config.get_value(CONF_KEY_NUM_ARTISTS) or 5
+ assert isinstance(num_artists, int)
num_albums = self.config.get_value(CONF_KEY_NUM_ALBUMS) or 5
+ assert isinstance(num_albums, int)
num_tracks = self.config.get_value(CONF_KEY_NUM_TRACKS)
+ assert isinstance(num_tracks, int)
for artist_idx in range(num_artists):
for album_idx in range(num_albums):
for track_idx in range(num_tracks):
async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
"""Retrieve library tracks from the provider."""
num_podcasts = self.config.get_value(CONF_KEY_NUM_PODCASTS)
+ assert isinstance(num_podcasts, int)
for podcast_idx in range(num_podcasts):
yield await self.get_podcast(str(podcast_idx))
async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
"""Retrieve library audiobooks from the provider."""
num_audiobooks = self.config.get_value(CONF_KEY_NUM_AUDIOBOOKS)
+ assert isinstance(num_audiobooks, int)
for audiobook_idx in range(num_audiobooks):
yield await self.get_audiobook(str(audiobook_idx))
"mashumaro==3.14",
"memory-tempfile==2.2.3",
"music-assistant-frontend==2.10.4",
- "music-assistant-models==1.1.9",
+ "music-assistant-models==1.1.10",
"orjson==3.10.12",
"pillow==11.0.0",
"podcastparser==0.6.10",
mashumaro==3.14
memory-tempfile==2.2.3
music-assistant-frontend==2.10.4
-music-assistant-models==1.1.9
+music-assistant-models==1.1.10
orjson==3.10.12
pillow==11.0.0
pkce==1.0.3
"""Tests for utility/helper functions."""
import pytest
-from music_assistant_models import media_items
+from music_assistant_models.enums import MediaType
from music_assistant_models.errors import MusicAssistantError
from music_assistant.helpers import uri, util
# test regular uri
test_uri = "spotify://track/123456789"
media_type, provider, item_id = await uri.parse_uri(test_uri)
- assert media_type == media_items.MediaType.TRACK
+ assert media_type == MediaType.TRACK
assert provider == "spotify"
assert item_id == "123456789"
# test spotify uri
test_uri = "spotify:track:123456789"
media_type, provider, item_id = await uri.parse_uri(test_uri)
- assert media_type == media_items.MediaType.TRACK
+ assert media_type == MediaType.TRACK
assert provider == "spotify"
assert item_id == "123456789"
# test public play/open url
test_uri = "https://open.spotify.com/playlist/5lH9NjOeJvctAO92ZrKQNB?si=04a63c8234ac413e"
media_type, provider, item_id = await uri.parse_uri(test_uri)
- assert media_type == media_items.MediaType.PLAYLIST
+ assert media_type == MediaType.PLAYLIST
assert provider == "spotify"
assert item_id == "5lH9NjOeJvctAO92ZrKQNB"
# test filename with slashes as item_id
test_uri = "filesystem://track/Artist/Album/Track.flac"
media_type, provider, item_id = await uri.parse_uri(test_uri)
- assert media_type == media_items.MediaType.TRACK
+ assert media_type == MediaType.TRACK
assert provider == "filesystem"
assert item_id == "Artist/Album/Track.flac"
# test regular url to builtin provider
test_uri = "http://radiostream.io/stream.mp3"
media_type, provider, item_id = await uri.parse_uri(test_uri)
- assert media_type == media_items.MediaType.UNKNOWN
+ assert media_type == MediaType.UNKNOWN
assert provider == "builtin"
assert item_id == "http://radiostream.io/stream.mp3"
# test local file to builtin provider
test_uri = __file__
media_type, provider, item_id = await uri.parse_uri(test_uri)
- assert media_type == media_items.MediaType.UNKNOWN
+ assert media_type == MediaType.UNKNOWN
assert provider == "builtin"
assert item_id == __file__
# test invalid uri