# retrieve genres from tracks
# TODO: retrieve style/mood ?
playlist.metadata.genres = set()
- images = set()
+ image_urls = set()
for track in await self.mass.music.playlists.tracks(
playlist.item_id, playlist.provider
):
if not playlist.image and track.image:
- images.add(track.image)
+ image_urls.add(track.image.url)
if track.media_type != MediaType.TRACK:
# filter out radio items
continue
elif track.album and track.album.metadata.genres:
playlist.metadata.genres.update(track.album.metadata.genres)
# create collage thumb/fanart from playlist tracks
- if images:
+ if image_urls:
fake_path = f"playlist_collage.{playlist.provider.value}.{playlist.item_id}"
- collage = await create_collage(self.mass, list(images))
+ collage = await create_collage(self.mass, list(image_urls))
match = {"path": fake_path, "size": 0}
await self.mass.database.insert(
TABLE_THUMBS, {**match, "data": collage}, allow_replace=True
query = f"SELECT * FROM albums WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
query += f" AND provider_ids LIKE '%\"{prov_id}\"%'"
items = await self.mass.music.albums.get_db_items_by_query(query)
+ else:
+ # edge case: no albums found for this provider - fall back to an empty result
+ items = []
# store (serializable items) in cache
self.mass.create_task(
self.mass.cache.set(
MusicProviderFeature,
ProviderType,
)
+from music_assistant.models.errors import MediaNotFoundError
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
track = await super().get(*args, **kwargs)
# append full album details to full track item
if track.album:
- track.album = await self.mass.music.albums.get(
- track.album.item_id, track.album.provider
- )
+ try:
+ track.album = await self.mass.music.albums.get(
+ track.album.item_id, track.album.provider
+ )
+ except MediaNotFoundError:
+ # edge case where a playlist track has invalid album details attached
+ self.logger.warning("Unable to fetch album details %s", track.album.uri)
# append full artist details to full track item
full_artists = []
for artist in track.artists:
"""
if db_track.provider != ProviderType.DATABASE:
return # Matching only supported for database items
- if isinstance(db_track.album, ItemMapping):
- # matching only works if we have a full track object
- db_track = await self.get_db_item(db_track.item_id)
for provider in self.mass.music.providers:
if MusicProviderFeature.SEARCH not in provider.supported_features:
continue
for player in self.players:
if not player.available:
continue
- if cur_tick == interval or (
- player.active_queue.active
- and player.state
- in (
- PlayerState.PLAYING,
- PlayerState.PAUSED,
- )
+ if cur_tick == interval:
+ self.mass.loop.call_soon(player.update_state)
+ elif (
+ player.active_queue.queue_id == player.player_id
+ and player.active_queue.active
+ and player.state == PlayerState.PLAYING
):
- player.update_state()
+ self.mass.loop.call_soon(player.active_queue.on_player_update)
if cur_tick == interval:
cur_tick = 0
else:
@staticmethod
async def serve_silence(request: web.Request):
"""Serve some nice silence."""
- duration = int(request.query.get("duration", 3600))
+ duration = int(request.query.get("duration", 60))
fmt = ContentType.try_parse(request.match_info["fmt"])
resp = web.StreamResponse(
item_in_buf = queue_stream.queue.get_item(queue_stream.index_in_buffer)
if item_in_buf and item_in_buf.name:
title = item_in_buf.name
- image = item_in_buf.image or ""
+ if item_in_buf.image and not item_in_buf.image.is_file:
+ image = item_in_buf.media_item.image.url
+ else:
+ image = ""
else:
title = "Music Assistant"
image = ""
pcm_bit_depth=pcm_bit_depth,
pcm_channels=pcm_channels,
allow_resample=allow_resample,
- autostart=True,
)
# cleanup stale previous queue tasks
- self.mass.create_task(self.cleanup_stale)
+ asyncio.create_task(self.cleanup_stale())
return stream
- def cleanup_stale(self) -> None:
+ async def cleanup_stale(self) -> None:
"""Cleanup stale/done stream tasks."""
stale = set()
for stream_id, stream in self.queue_streams.items():
pcm_channels: int = 2,
pcm_floating_point: bool = False,
allow_resample: bool = False,
- autostart: bool = False,
):
"""Init QueueStreamJob instance."""
self.queue = queue
self.logger = self.queue.logger.getChild("stream")
self.expected_clients = 1
self.connected_clients: Dict[str, CoroutineType[bytes]] = {}
- self.seconds_streamed = 0
+ self.total_seconds_streamed = 0
self.streaming_started = 0
self.done = asyncio.Event()
self.all_clients_connected = asyncio.Event()
self.output_chunksize = get_chunksize(
output_format, pcm_sample_rate, pcm_bit_depth
)
- if autostart:
- self.mass.create_task(self.start())
-
- async def start(self) -> None:
- """Start running queue stream."""
+ self.sample_size_per_second = get_chunksize(
+ ContentType.from_bit_depth(pcm_bit_depth, pcm_floating_point),
+ pcm_sample_rate,
+ pcm_bit_depth,
+ pcm_channels,
+ )
self._runner_task = self.mass.create_task(self._queue_stream_runner())
async def stop(self) -> None:
"0",
"-",
]
- # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
+ # get the raw pcm bytes from the queue stream and on-the-fly encode to wanted format
# send the compressed/encoded stream to the client(s).
async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
"""Task that sends the raw pcm audio to the ffmpeg process."""
async for audio_chunk in self._get_queue_stream():
await ffmpeg_proc.write(audio_chunk)
+ self.total_seconds_streamed += (
+ len(audio_chunk) / self.sample_size_per_second
+ )
# write eof when last packet is received
ffmpeg_proc.write_eof()
):
self.connected_clients.pop(client_id, None)
- # complete queue streamed
- if self.signal_next is not None and not self.queue.announcement_in_progress:
- # the queue stream was aborted (e.g. because of sample rate mismatch)
- # tell the queue to load the next track (restart stream) as soon
- # as the player finished playing and returns to idle
- self.queue.signal_next = self.signal_next
-
# all queue data has been streamed. Either because the queue is exhausted
# or we need to restart the stream due to decoder/sample rate mismatch
# set event that this stream task is finished
if (
use_crossfade
and self.queue.settings.crossfade_mode != CrossFadeMode.ALWAYS
- and prev_track is not None
+ and prev_track
and prev_track.media_type == MediaType.TRACK
and queue_track.media_type == MediaType.TRACK
):
use_crossfade = False
prev_track = queue_track
- # calculate sample_size based on PCM params for 1 second of audio
- input_format = ContentType.from_bit_depth(
- self.pcm_bit_depth, self.pcm_floating_point
- )
- sample_size_per_second = get_chunksize(
- input_format,
- self.pcm_sample_rate,
- self.pcm_bit_depth,
- self.pcm_channels,
- )
- crossfade_duration = self.queue.settings.crossfade_duration
- crossfade_size = sample_size_per_second * crossfade_duration
- # buffer_duration has some overhead to account for padded silence
- buffer_duration = (crossfade_duration or 1) * 2
- # predict total size to expect for this track from duration
- stream_duration = (queue_track.duration or 0) - seek_position
-
self.logger.info(
"Start Streaming queue track: %s (%s) for queue %s - crossfade: %s",
queue_track.uri,
self.queue.player.name,
use_crossfade,
)
+
+ # set some basic vars
+ if last_fadeout_part:
+ crossfade_duration = (
+ len(last_fadeout_part) / self.sample_size_per_second
+ )
+ else:
+ crossfade_duration = self.queue.settings.crossfade_duration
+ crossfade_size = self.sample_size_per_second * crossfade_duration
queue_track.streamdetails.seconds_skipped = seek_position
+ # predict total size to expect for this track from duration
+ stream_duration = (queue_track.duration or 0) - seek_position
# send signal that we've loaded a new track into the buffer
self.index_in_buffer = queue_index
self.queue.signal_update()
+ # precache the streamdetails for the next track
+ self.mass.create_task(self._precache_next_streamdetails())
+
buffer = b""
bytes_written = 0
- seconds_streamed = 0
+ chunk_num = 0
# handle incoming audio chunks
async for chunk in get_media_stream(
self.mass,
sample_rate=self.pcm_sample_rate,
channels=self.pcm_channels,
seek_position=seek_position,
- chunk_size=sample_size_per_second,
+ chunk_size=self.sample_size_per_second,
):
- seconds_streamed += 1
- self.seconds_streamed += 1
- seconds_in_buffer = len(buffer) / sample_size_per_second
- # try to make a rough assumption of how many seconds the player has in buffer
- player_in_buffer = self.seconds_streamed - (
- time() - self.streaming_started
+ chunk_num += 1
+ seconds_in_buffer = len(buffer) / self.sample_size_per_second
+ # try to make a rough assumption of how many seconds is buffered ahead by the player(s)
+ buffered_ahead = (
+ self.total_seconds_streamed - self.queue.player.elapsed_time or 0
)
+ # use dynamic buffer size to account for slow connections (or throttling providers, like YT)
+ # buffer_duration has some overhead to account for padded silence
+ if use_crossfade and buffered_ahead > (crossfade_duration * 4):
+ buffer_duration = crossfade_duration + 6
+ elif use_crossfade and buffered_ahead > (crossfade_duration * 2):
+ buffer_duration = crossfade_duration + 4
+ else:
+ buffer_duration = 2
#### HANDLE FIRST PART OF TRACK
queue_track.streamdetails.seconds_streamed = 0
break
- # bypass any processing for radiostreams and announcements
- if (
- streamdetails.media_type == MediaType.ANNOUNCEMENT
- or not stream_duration
- or stream_duration < buffer_duration
- or player_in_buffer < buffer_duration
- ):
- # handle edge case where we have a previous chunk in buffer
- # and the next track is too short
- if last_fadeout_part:
- yield last_fadeout_part
- last_fadeout_part = b""
- yield chunk
- bytes_written += len(chunk)
- continue
-
# buffer full for crossfade
if last_fadeout_part and (seconds_in_buffer >= buffer_duration):
# strip silence of start
crossfade_part = await crossfade_pcm_parts(
fadein_part,
last_fadeout_part,
- crossfade_duration,
- pcm_fmt,
+ self.pcm_bit_depth,
self.pcm_sample_rate,
)
# send crossfade_part
continue
# last part of track: fill buffer
- if buffer or (seconds_streamed >= (stream_duration - buffer_duration)):
+ if buffer or (chunk_num >= (stream_duration - buffer_duration)):
buffer += chunk
continue
#### HANDLE END OF TRACK
self.logger.debug(
- "end of track reached - seconds_streamed: %s - seconds_in_buffer: %s - stream_duration: %s",
- seconds_streamed,
+ "end of track reached - chunk_num: %s - stream_buffer: %s - stream_duration: %s - player_buffer: %s",
+ chunk_num,
seconds_in_buffer,
stream_duration,
+ buffered_ahead,
)
if buffer:
last_part = await strip_silence(
buffer, pcm_fmt, self.pcm_sample_rate, reverse=True
)
- # if crossfade is enabled, save fadeout part to pickup for next track
- if use_crossfade and len(last_part) > crossfade_size:
- # yield remaining bytes from strip action,
- # we only need the crossfade_size part
- last_fadeout_part = last_part[-crossfade_size:]
- remaining_bytes = last_part[:-crossfade_size]
- yield remaining_bytes
- bytes_written += len(remaining_bytes)
- elif use_crossfade:
- last_fadeout_part = last_part
+ if use_crossfade:
+ # if crossfade is enabled, save fadeout part to pickup for next track
+ if len(last_part) < crossfade_size <= len(buffer):
+ # the chunk length is too short after stripping silence, only use first part
+ last_fadeout_part = buffer[:crossfade_size]
+ elif use_crossfade and len(last_part) > crossfade_size:
+ # yield remaining bytes from strip action,
+ # we only need the crossfade_size part
+ last_fadeout_part = last_part[-crossfade_size:]
+ remaining_bytes = last_part[:-crossfade_size]
+ yield remaining_bytes
+ bytes_written += len(remaining_bytes)
+ elif use_crossfade:
+ last_fadeout_part = last_part
else:
# no crossfade enabled, just yield the stripped audio data
yield last_part
# end of the track reached - store accurate duration
buffer = b""
queue_track.streamdetails.seconds_streamed = (
- bytes_written / sample_size_per_second
+ bytes_written / self.sample_size_per_second
)
self.logger.debug(
"Finished Streaming queue track: %s (%s) on queue %s",
# END OF QUEUE STREAM
self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name)
+ async def _precache_next_streamdetails(self) -> None:
+ """Prefetch the streamdetails for the next track."""
+ next_index = self.queue.get_next_index(self.index_in_buffer)
+ # NOTE(review): when the queue wraps around (e.g. repeat-all), the next
+ # index is lower than the current one and precaching is skipped - confirm intended
+ if next_index <= self.index_in_buffer:
+ return
+ queue_track = self.queue.get_item(next_index)
+ if not queue_track:
+ # no next track available (end of queue)
+ return
+ # resolving the streamdetails now warms the cache so the actual track
+ # transition does not have to wait on a (potentially slow) provider lookup
+ await get_stream_details(self.mass, queue_track, self.queue.queue_id)
+
async def _check_stop(self) -> bool:
"""Schedule stop of queue stream."""
# Stop this queue stream when no clients (re)connected within 5 seconds
if len(self.connected_clients) > 0:
return False
await asyncio.sleep(0.5)
+ # NOTE(review): the comment above mentions 5 seconds but only 0.5s is
+ # awaited here - presumably this method is polled repeatedly; confirm
+ # schedule the stop as a separate task so the caller is not blocked
+ # (and not cancelled) while the stream shuts itself down
- self.mass.create_task(self.stop())
+ asyncio.create_task(self.stop())
return True
async def crossfade_pcm_parts(
fade_in_part: bytes,
fade_out_part: bytes,
- fade_length: int,
- fmt: ContentType,
+ bit_depth: int,
sample_rate: int,
channels: int = 2,
) -> bytes:
"""Crossfade two chunks of pcm/raw audio using ffmpeg."""
+ sample_size = int(sample_rate * (bit_depth / 8) * channels)
+ fmt = ContentType.from_bit_depth(bit_depth)
+ # calculate the fade_length from the smallest chunk
+ fade_length = min(len(fade_in_part), len(fade_out_part)) / sample_size
fadeoutfile = create_tempfile()
async with aiofiles.open(fadeoutfile.name, "wb") as outfile:
await outfile.write(fade_out_part)
crossfade_data, _ = await proc.communicate(fade_in_part)
if crossfade_data:
LOGGER.debug(
- "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s",
+ "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - fade_length: %s seconds",
len(fade_in_part),
len(fade_out_part),
- len(crossfade_data),
+ fade_length,
)
return crossfade_data
# no crossfade_data, return original data instead
LOGGER.debug(
- "crossfade of pcm chunks failed: not enough data. fade_in_part: %s - fade_out_part: %s",
+ "crossfade of pcm chunks failed: not enough data? fade_in_part: %s - fade_out_part: %s",
len(fade_in_part),
len(fade_out_part),
)
from __future__ import annotations
import asyncio
-import functools
import logging
-import threading
from collections import deque
from functools import partial
from time import time
async def setup(self) -> None:
"""Async setup of music assistant."""
# initialize loop
- self.loop = asyncio.get_event_loop()
+ self.loop = asyncio.get_running_loop()
# create shared aiohttp ClientSession
if not self.http_session:
self.http_session = aiohttp.ClientSession(
continue
if not (id_filter is None or event.object_id in id_filter):
continue
- self.create_task(cb_func, event)
+ if asyncio.iscoroutinefunction(cb_func):
+ asyncio.run_coroutine_threadsafe(cb_func(event), self.loop)
+ else:
+ self.loop.call_soon_threadsafe(cb_func, event)
def subscribe(
self,
def create_task(
self,
- target: Callable[..., Any],
+ target: Coroutine,
*args: Any,
**kwargs: Any,
) -> Union[asyncio.Task, asyncio.Future]:
if self.closed:
return
- # Check for partials to properly determine if coroutine function
- check_target = target
- while isinstance(check_target, functools.partial):
- check_target = check_target.func
-
- async def executor_wrapper(_target: Callable, *_args, **_kwargs):
- return await self.loop.run_in_executor(None, _target, *_args, **_kwargs)
-
- # called from other thread
- if threading.current_thread() is not threading.main_thread():
- if asyncio.iscoroutine(check_target):
- task = asyncio.run_coroutine_threadsafe(target, self.loop)
- elif asyncio.iscoroutinefunction(check_target):
- task = asyncio.run_coroutine_threadsafe(target(*args), self.loop)
- else:
- task = asyncio.run_coroutine_threadsafe(
- executor_wrapper(target, *args, **kwargs), self.loop
- )
+ if asyncio.iscoroutinefunction(target):
+ task = self.loop.create_task(target(*args, **kwargs))
else:
- if asyncio.iscoroutine(check_target):
- task = self.loop.create_task(target)
- elif asyncio.iscoroutinefunction(check_target):
- task = self.loop.create_task(target(*args))
- else:
- task = self.loop.create_task(executor_wrapper(target, *args, **kwargs))
+ task = self.loop.create_task(target)
def task_done_callback(*args, **kwargs):
self._tracked_tasks.remove(task)
sql_query, params, limit=limit, offset=offset
)
count = len(items)
- if count < limit:
+ if 0 < count < limit:
total = offset + count
else:
total = await self.mass.database.get_count_from_query(sql_query, params)
# mark as favorite/library item on provider(s)
for prov_id in prov_item.provider_ids:
if prov := self.mass.music.get_provider(prov_id.prov_id):
+ if not prov.library_edit_supported(self.media_type):
+ continue
await prov.library_add(prov_id.item_id, self.media_type)
# mark as library item in internal db if db item
if prov_item.provider == ProviderType.DATABASE:
# unmark as favorite/library item on provider(s)
for prov_id in prov_item.provider_ids:
if prov := self.mass.music.get_provider(prov_id.prov_id):
+ if not prov.library_edit_supported(self.media_type):
+ continue
await prov.library_remove(prov_id.item_id, self.media_type)
# unmark as library item in internal db if db item
if prov_item.provider == ProviderType.DATABASE:
async def remove_prov_mapping(self, item_id: int, prov_id: str) -> None:
"""Remove provider id(s) from item."""
- if db_item := await self.get_db_item(item_id):
- db_item.provider_ids = {
- x for x in db_item.provider_ids if x.prov_id != prov_id
- }
- if not db_item.provider_ids:
- # item has no more provider_ids left, it is completely deleted
- try:
- await self.delete_db_item(db_item.item_id)
- except AssertionError:
- self.logger.debug(
- "Could not delete %s: it has items attached", db_item.item_id
- )
- return
- await self.update_db_item(db_item.item_id, db_item, overwrite=True)
+ try:
+ db_item = await self.get_db_item(item_id)
+ except MediaNotFoundError:
+ # edge case: already deleted / race condition
+ return
+
+ # drop the given provider mapping from the item's provider id set
+ db_item.provider_ids = {x for x in db_item.provider_ids if x.prov_id != prov_id}
+ if not db_item.provider_ids:
+ # item has no more provider_ids left, it is completely deleted
+ try:
+ await self.delete_db_item(db_item.item_id)
+ except AssertionError:
+ # deletion refused while other items still reference this one
+ self.logger.debug(
+ "Could not delete %s: it has items attached", db_item.item_id
+ )
+ return
+ # item still has other provider mappings: persist the reduced set
+ await self.update_db_item(db_item.item_id, db_item, overwrite=True)
self.logger.debug("removed provider %s from item id %s", prov_id, item_id)
setattr(self, fld.name, new_val)
elif cur_val is None or allow_overwrite:
setattr(self, fld.name, new_val)
+ elif new_val and fld.name in ("checksum", "popularity", "last_refresh"):
+ # some fields are always allowed to be overwritten (such as checksum and last_refresh)
+ setattr(self, fld.name, new_val)
return self
return any(x.available for x in self.provider_ids)
@property
- def image(self) -> str | None:
+ def image(self) -> MediaItemImage | None:
"""Return (first/random) image/thumb from metadata (if any)."""
if self.metadata is None or self.metadata.images is None:
return None
+ # return the full image object (instead of only its url) so callers
+ # can also inspect its other attributes (e.g. whether it is a local file)
return next(
- (x.url for x in self.metadata.images if x.type == ImageType.THUMB), None
+ (x for x in self.metadata.images if x.type == ImageType.THUMB), None
)
def add_provider_id(self, prov_id: MediaItemProviderId) -> None:
return hash((self.provider, self.item_id))
@property
- def image(self) -> str | None:
+ def image(self) -> MediaItemImage | None:
"""Return (first/random) image/thumb from metadata (if any)."""
if image := super().image:
return image
if media_type == MediaType.RADIO:
return MusicProviderFeature.LIBRARY_RADIOS in self.supported_features
+ def library_edit_supported(self, media_type: MediaType) -> bool:
+ """Return if Library add/remove is supported for given MediaType on this provider."""
+ if media_type == MediaType.ARTIST:
+ return MusicProviderFeature.LIBRARY_ARTISTS_EDIT in self.supported_features
+ if media_type == MediaType.ALBUM:
+ return MusicProviderFeature.LIBRARY_ALBUMS_EDIT in self.supported_features
+ if media_type == MediaType.TRACK:
+ return MusicProviderFeature.LIBRARY_TRACKS_EDIT in self.supported_features
+ if media_type == MediaType.PLAYLIST:
+ return (
+ MusicProviderFeature.LIBRARY_PLAYLISTS_EDIT in self.supported_features
+ )
+ if media_type == MediaType.RADIO:
+ return MusicProviderFeature.LIBRARY_RADIOS_EDIT in self.supported_features
+
def _get_library_gen(self, media_type: MediaType) -> AsyncGenerator[MediaItemType]:
"""Return library generator for given media_type."""
if media_type == MediaType.ARTIST:
if child_player_id == self.player_id:
continue
if player := self.mass.players.get_player(child_player_id):
- self.mass.create_task(
+ self.mass.loop.call_soon_threadsafe(
player.on_parent_update, self.player_id, changed_keys
)
# update group player(s) when child updates
for group_player in self.get_group_parents():
- self.mass.create_task(
+ self.mass.loop.call_soon_threadsafe(
group_player.on_child_update, self.player_id, changed_keys
)
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
+from music_assistant.helpers.util import try_parse_int
from music_assistant.models.enums import (
ContentType,
EventType,
def create_announcement(_url: str):
return QueueItem(
- uri=_url,
name="announcement",
duration=30,
streamdetails=StreamDetails(
gain_correct=4,
direct=_url,
),
- media_type=MediaType.ANNOUNCEMENT,
)
try:
await self.player.play_url(stream.url)
# wait for the player to finish playing
- await asyncio.sleep(5)
await self._wait_for_state(PlayerState.PLAYING, silence_item.item_id)
except Exception as err: # pylint: disable=broad-except
# save items
self.mass.create_task(
self.mass.cache.set(
- f"queue.{self.queue_id}.items",
+ f"queue.items.{self.queue_id}",
[x.to_dict() for x in self._items],
)
)
async def _restore_items(self) -> None:
"""Try to load the saved state from cache."""
- if queue_cache := await self.mass.cache.get(f"queue.{self.queue_id}.items"):
+ if queue_cache := await self.mass.cache.get(f"queue.items.{self.queue_id}"):
try:
self._items = [QueueItem.from_dict(x) for x in queue_cache]
except (KeyError, AttributeError, TypeError) as err:
# restore state too
db_key = f"queue.{self.queue_id}.current_index"
if db_value := await self.mass.database.get_setting(db_key):
- self._current_index = int(db_value)
+ self._current_index = try_parse_int(db_value)
db_key = f"queue.{self.queue_id}.current_item_elapsed_time"
if db_value := await self.mass.database.get_setting(db_key):
- self._current_item_elapsed_time = int(db_value)
+ self._current_item_elapsed_time = try_parse_int(db_value)
await self.settings.restore()
from mashumaro import DataClassDictMixin
from music_assistant.models.enums import MediaType
-from music_assistant.models.media_items import Radio, StreamDetails, Track
+from music_assistant.models.media_items import (
+ ItemMapping,
+ MediaItemImage,
+ Radio,
+ StreamDetails,
+ Track,
+)
@dataclass
class QueueItem(DataClassDictMixin):
"""Representation of a queue item."""
- uri: str
name: str = ""
duration: Optional[int] = None
item_id: str = ""
sort_index: int = 0
streamdetails: Optional[StreamDetails] = None
- media_type: MediaType = MediaType.UNKNOWN
- image: Optional[str] = None
- available: bool = True
media_item: Union[Track, Radio, None] = None
+ image: Optional[MediaItemImage] = None
def __post_init__(self):
"""Set default values."""
if not self.item_id:
+ # generate a unique (random) id when none was provided
self.item_id = str(uuid4())
+ if self.streamdetails and self.streamdetails.stream_title:
+ # a stream may carry the actual 'now playing' title in its
+ # streamdetails - prefer that over the stored name
+ self.name = self.streamdetails.stream_title
if not self.name:
+ # last resort: fall back to the uri as display name
self.name = self.uri
d.pop("streamdetails", None)
return d
- def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
- """Run actions before serialization."""
- if self.media_type == MediaType.RADIO:
- d.pop("duration")
- return d
+ @property
+ def uri(self) -> str:
+ """Return uri for this QueueItem (for logging purposes)."""
+ if self.media_item:
+ return self.media_item.uri
+ return self.item_id
+
+ @property
+ def media_type(self) -> MediaType:
+ """Return MediaType for this QueueItem (for convenience purposes)."""
+ if self.media_item:
+ return self.media_item.media_type
+ return MediaType.UNKNOWN
@classmethod
def from_media_item(cls, media_item: Track | Radio):
if isinstance(media_item, Track):
artists = "/".join((x.name for x in media_item.artists))
name = f"{artists} - {media_item.name}"
+ # save a lot of data/bandwidth by simplifying nested objects
+ media_item.artists = [ItemMapping.from_item(x) for x in media_item.artists]
+ if media_item.album:
+ media_item.album = ItemMapping.from_item(media_item.album)
+ media_item.albums = []
else:
name = media_item.name
return cls(
- uri=media_item.uri,
name=name,
duration=media_item.duration,
- media_type=media_item.media_type,
media_item=media_item,
image=media_item.image,
- available=media_item.available,
)
import os
import urllib.parse
from contextlib import asynccontextmanager
-from pathlib import Path
from time import time
from typing import AsyncGenerator, List, Optional, Set, Tuple
from music_assistant.constants import VARIOUS_ARTISTS, VARIOUS_ARTISTS_ID
from music_assistant.helpers.compare import compare_strings
+from music_assistant.helpers.playlists import parse_m3u, parse_pls
from music_assistant.helpers.tags import parse_tags, split_items
from music_assistant.helpers.util import create_safe_string, parse_title_and_version
from music_assistant.models.enums import MusicProviderFeature, ProviderType
from music_assistant.models.music_provider import MusicProvider
TRACK_EXTENSIONS = ("mp3", "m4a", "mp4", "flac", "wav", "ogg", "aiff", "wma", "dsf")
-PLAYLIST_EXTENSIONS = ("m3u",)
+PLAYLIST_EXTENSIONS = ("m3u", "pls")
SUPPORTED_EXTENSIONS = TRACK_EXTENSIONS + PLAYLIST_EXTENSIONS
+IMAGE_EXTENSIONS = ("jpg", "jpeg", "JPG", "JPEG", "png", "PNG", "gif", "GIF")
SCHEMA_VERSION = 17
LOGGER = logging.getLogger(__name__)
subitems.append(db_item)
continue
if ext in PLAYLIST_EXTENSIONS:
+ item_id = self._get_item_id(full_path)
if db_item := await self.mass.music.playlists.get_db_item_by_prov_id(
item_id, provider_id=self.id
):
playlist_path = await self._get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
- playlist_base_path = Path(playlist_path).parent
+ parentdir = os.path.dirname(playlist_path)
+ _, ext = playlist_path.rsplit(".", 1)
try:
async with self.open_file(playlist_path, "r") as _file:
- for line_no, line in enumerate(await _file.readlines()):
- line = urllib.parse.unquote(line.strip())
- if line and not line.startswith("#"):
- # TODO: add support for .pls playlist files
- if media_item := await self._parse_playlist_line(
- line, playlist_base_path
- ):
- # use the linenumber as position for easier deletions
- media_item.position = line_no
- result.append(media_item)
+ playlist_data = await _file.read()
+
+ if ext in ("m3u", "m3u8"):
+ playlist_lines = await parse_m3u(playlist_data)
+ else:
+ playlist_lines = await parse_pls(playlist_data)
+
+ for line_no, playlist_line in enumerate(playlist_lines):
+
+ if media_item := await self._parse_playlist_line(
+ playlist_line, parentdir
+ ):
+ # use the linenumber as position for easier deletions
+ media_item.position = line_no
+ result.append(media_item)
+
except Exception as err: # pylint: disable=broad-except
self.logger.warning(
"Error while parsing playlist %s", playlist_path, exc_info=err
) -> Track | Radio | None:
"""Try to parse a track from a playlist line."""
try:
- # try to treat uri as filename first
- if await self.exists(line):
- file_path = await self.resolve(line)
- return await self._parse_track(file_path)
+ # try to treat uri as (relative) filename
+ if "://" not in line:
+ for filename in (line, os.path.join(playlist_path, line)):
+ if not await self.exists(filename):
+ continue
+ file_path = await self.resolve(filename)
+ return await self._parse_track(file_path)
# fallback to generic uri parsing
return await self.mass.music.get_item_by_uri(line)
except MusicAssistantError as err:
# cover image - prefer album image, fallback to embedded
if track.album and track.album.image:
- track.metadata.images = [
- MediaItemImage(ImageType.THUMB, track.album.image, True)
- ]
+ track.metadata.images = [track.album.image]
elif tags.has_cover_image:
# we do not actually embed the image in the metadata because that would consume too
# much space and bandwidth. Instead we set the filename as value so the image can
if genre := info.get("genre"):
artist.metadata.genres = set(split_items(genre))
# find local images
- images = []
- for _path in await self.mass.loop.run_in_executor(
- None, os.scandir, artist_path
- ):
- if "." not in _path.path or _path.is_dir():
- continue
- filename, ext = _path.path.rsplit(os.sep, 1)[-1].split(".", 1)
- if ext not in ("jpg", "png"):
- continue
- try:
- images.append(MediaItemImage(ImageType(filename), _path.path, True))
- except ValueError:
- if "folder" in filename:
- images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
- elif "Artist" in filename:
- images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
- if images:
- artist.metadata.images = images
+ artist.metadata.images = await self._get_local_images(artist_path) or None
return artist
album.name, album.version = parse_title_and_version(album.name)
# find local images
- images = []
- async for _path in scantree(album_path):
- if "." not in _path.path or _path.is_dir():
- continue
- filename, ext = _path.path.rsplit(os.sep, 1)[-1].split(".", 1)
- if ext not in ("jpg", "png"):
- continue
- try:
- images.append(MediaItemImage(ImageType(filename), _path.path, True))
- except ValueError:
- if "folder" in filename:
- images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
- elif "AlbumArt" in filename:
- images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
- if images:
- album.metadata.images = images
+ album.metadata.images = await self._get_local_images(album_path) or None
return album
if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
- name = playlist_path.split(os.sep)[-1].replace(".m3u", "")
+ playlist_path_base, ext = playlist_path.rsplit(".", 1)
+ name = playlist_path_base.split(os.sep)[-1]
playlist = Playlist(playlist_item_id, provider=self.type, name=name)
- playlist.is_editable = True
+ playlist.is_editable = ext != "pls" # can only edit m3u playlists
playlist.add_provider_id(
MediaItemProviderId(
def _get_item_id(self, file_path: str) -> str:
"""Create item id from filename."""
+ # strip the configured base path so the id is relative (and stable across
+ # mounts), then sanitize the remainder into a safe string
return create_safe_string(file_path.replace(self.config.path, ""))
+
+ async def _get_local_images(self, folder: str) -> List[MediaItemImage]:
+ """Return local images found in a given folderpath."""
+ images = []
+ async for _path in scantree(folder):
+ # skip directories and files without an extension
+ if "." not in _path.path or _path.is_dir():
+ continue
+ for ext in IMAGE_EXTENSIONS:
+ if not _path.path.endswith(f".{ext}"):
+ continue
+ # NOTE(review): str.replace removes the extension anywhere in the
+ # filename, not only as a suffix - a suffix-strip would be safer; confirm
+ filename = _path.path.rsplit(os.sep, 1)[-1].replace(f".{ext}", "")
+ try:
+ # filenames matching an ImageType value (e.g. fanart.jpg) map directly
+ images.append(MediaItemImage(ImageType(filename), _path.path, True))
+ except ValueError:
+ # common fallback names written by various tagging/media tools
+ if "folder" in filename:
+ images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
+ elif "AlbumArt" in filename:
+ images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
+ elif "Artist" in filename:
+ images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
+ return images
"""Handle async initialization of the provider."""
if not self.config.enabled:
return False
- if not self.config.username or not self.config.password:
- raise LoginFailed("Invalid login credentials")
# try to get a token, raise if that fails
self._cache_dir = os.path.join(CACHE_DIR, self.id)
- token = await self.get_token()
- if not token:
- try:
- # a spotify free/basic account can be recoognized when
- # the username consists of numbers only - check that here
- int(self.config.username)
- # an integer can be parsed of the username, this is a free account
- raise LoginFailed("Only Spotify Premium accounts are supported")
- except ValueError:
- # pylint: disable=raise-missing-from
- raise LoginFailed(f"Login failed for user {self.config.username}")
+ # try login which will raise if it fails
+ await self.login()
return True
async def search(
if not track:
raise MediaNotFoundError(f"track {item_id} not found")
# make sure that the token is still valid by just requesting it
- await self.get_token()
+ await self.login()
return StreamDetails(
item_id=track.item_id,
provider=self.type,
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
# make sure that the token is still valid by just requesting it
- await self.get_token()
+ await self.login()
librespot = await self.get_librespot_binary()
args = [
librespot,
playlist.metadata.checksum = str(playlist_obj["snapshot_id"])
return playlist
async def login(self) -> dict:
    """Log-in Spotify and return tokeninfo.

    Returns the (cached) librespot token info dict on success.
    Raises LoginFailed when credentials are missing, the account is not
    a Spotify Premium account, or the login ultimately fails.
    """
    # return existing token if we have one in memory that is still valid
    if (
        self._auth_token
        and (self._auth_token["expiresAt"] > int(time.time()) + 20)
    ):
        return self._auth_token
    tokeninfo, userinfo = None, self._sp_user
    if not self.config.username or not self.config.password:
        raise LoginFailed("Invalid login credentials")
    # retrieve token with librespot
    retries = 0
    while retries < 20:
        try:
            retries += 1
            if not tokeninfo:
                tokeninfo = await asyncio.wait_for(self._get_token(), 5)
            if tokeninfo and not userinfo:
                userinfo = await asyncio.wait_for(
                    self._get_data("me", tokeninfo=tokeninfo), 5
                )
            if tokeninfo and userinfo:
                # we have all info we need!
                break
            if retries > 2:
                # switch to ap workaround after 2 retries
                self._ap_workaround = True
        except asyncio.exceptions.TimeoutError:
            await asyncio.sleep(2)
    if tokeninfo and userinfo:
        # success: cache token and user details for subsequent calls
        # (fix: the token was previously assigned twice in this branch)
        self._auth_token = tokeninfo
        self._sp_user = userinfo
        self.mass.metadata.preferred_language = userinfo["country"]
        self.logger.info("Succesfully logged in to Spotify as %s", userinfo["id"])
        return tokeninfo
    if tokeninfo and not userinfo:
        raise LoginFailed(
            "Unable to retrieve userdetails from Spotify API - probably just a temporary error"
        )
    if self.config.username.isnumeric():
        # a spotify free/basic account can be recognized when
        # the username consists of numbers only - check that here
        # an integer can be parsed of the username, this is a free account
        raise LoginFailed("Only Spotify Premium accounts are supported")
    raise LoginFailed(f"Login failed for user {self.config.username}")
async def _get_token(self):
"""Get spotify auth token with librespot bin."""
+ time_start = time.time()
# authorize with username and password (NOTE: this can also be Spotify Connect)
args = [
await self.get_librespot_binary(),
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
)
stdout, _ = await librespot.communicate()
+ duration = round(time.time() - time_start, 2)
try:
result = json.loads(stdout)
except JSONDecodeError:
self.logger.warning(
- "Error while retrieving Spotify token, details: %s",
+ "Error while retrieving Spotify token after %s seconds, details: %s",
+ duration,
stdout.decode(),
)
return None
+ self.logger.debug(
+ "Retrieved Spotify token using librespot in %s seconds",
+ duration,
+ )
# transform token info to spotipy compatible format
if result and "accessToken" in result:
tokeninfo = result
break
return all_items
async def _get_data(self, endpoint, tokeninfo: Optional[dict] = None, **kwargs):
    """Get data from api.

    endpoint: relative Spotify api path (e.g. "me").
    tokeninfo: optional pre-fetched token info; when omitted a (cached)
        token is obtained via login().
    Returns the decoded JSON response, or None on (api/decode) errors.
    """
    url = f"https://api.spotify.com/v1/{endpoint}"
    kwargs["market"] = "from_token"
    kwargs["country"] = "from_token"
    if tokeninfo is None:
        tokeninfo = await self.login()
    headers = {"Authorization": f'Bearer {tokeninfo["accessToken"]}'}
    async with self._throttler:
        time_start = time.time()
        try:
            async with self.mass.http_session.get(
                url,
                headers=headers,
                params=kwargs,
                verify_ssl=False,
                # fix: passing a bare int as timeout is deprecated in aiohttp,
                # use an explicit ClientTimeout instead
                timeout=aiohttp.ClientTimeout(total=120),
            ) as response:
                result = await response.json()
                if "error" in result or (
                    "status" in result and "error" in result["status"]
                ):
                    self.logger.error("%s - %s", endpoint, result)
                    return None
        except (
            aiohttp.ContentTypeError,
            JSONDecodeError,
        ) as err:
            self.logger.error("%s - %s", endpoint, str(err))
            return None
        finally:
            # log request duration (including throttling) for debugging
            self.logger.debug(
                "Processing GET/%s took %s seconds",
                endpoint,
                round(time.time() - time_start, 2),
            )
    return result
async def _delete_data(self, endpoint, data=None, **kwargs):
"""Delete data from api."""
url = f"https://api.spotify.com/v1/{endpoint}"
- token = await self.get_token()
+ token = await self.login()
if not token:
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async def _put_data(self, endpoint, data=None, **kwargs):
"""Put data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
- token = await self.get_token()
+ token = await self.login()
if not token:
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async def _post_data(self, endpoint, data=None, **kwargs):
"""Post data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
- token = await self.get_token()
+ token = await self.login()
if not token:
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}