cur_chunk += 1
# HANDLE FIRST PART OF TRACK
- if cur_chunk == 1 and is_last_chunk:
+ if not chunk and cur_chunk == 1 and is_last_chunk:
LOGGER.warning("Stream error, skip track %s", queue_track.item_id)
break
if cur_chunk <= 2 and not last_fadeout_data:
import threading
from typing import Any, Awaitable, Callable, List, Optional, Union
+import aiohttp
from music_assistant.cache import Cache
from music_assistant.config import MassConfig
from music_assistant.constants import (
"""
self.loop = None
+ self._http_session = None
self._event_listeners = []
self._providers = {}
self.config = MassConfig(self, datapath)
async def async_start(self):
"""Start running the music assistant server."""
+ # initialize loop
self.loop = asyncio.get_event_loop()
self.loop.set_exception_handler(self.__handle_exception)
if LOGGER.level == logging.DEBUG:
self.loop.set_debug(True)
+ # create shared aiohttp ClientSession
+ self._http_session = aiohttp.ClientSession(
+ loop=self.loop,
+ connector=aiohttp.TCPConnector(enable_cleanup_closed=True, ssl=False),
+ )
await self.database.async_setup()
await self.cache.async_setup()
- await self.metadata.async_setup()
await self.music_manager.async_setup()
await self.player_manager.async_setup()
await self.web.async_setup()
for prov in self._providers.values():
await prov.async_on_stop()
await self.player_manager.async_close()
+ await self._http_session.connector.close()
+ self._http_session.detach()
+
+ @property
+ def http_session(self):
+ """Return the default http session."""
+ return self._http_session
async def async_register_provider(self, provider: Provider):
"""Register a new Provider/Plugin."""
return remove_listener
+ @callback
def add_job(
self, target: Callable[..., Any], *args: Any
) -> Optional[asyncio.Future]:
if threading.current_thread() is not threading.main_thread():
# called from other thread
if asyncio.iscoroutine(check_target):
- task = asyncio.run_coroutine_threadsafe(
- target, self.loop
- ) # type: ignore
+ task = asyncio.run_coroutine_threadsafe(target, self.loop) # type: ignore
elif asyncio.iscoroutinefunction(check_target):
task = asyncio.run_coroutine_threadsafe(target(*args), self.loop)
elif is_callback(check_target):
self.musicbrainz = MusicBrainz(mass)
self.fanarttv = FanartTv(mass)
- async def async_setup(self):
- """Async setup of metadata module."""
- await self.musicbrainz.async_setup()
- await self.fanarttv.async_setup()
-
async def async_get_artist_metadata(self, mb_artist_id, cur_metadata):
"""Get/update rich metadata for an artist by providing the musicbrainz artist id."""
metadata = cur_metadata
"""Initialize class."""
self.mass = mass
self.cache = mass.cache
- self.throttler = None
- self._http_session = None
-
- async def async_setup(self):
- """Perform async setup."""
- self._http_session = aiohttp.ClientSession(
- loop=self.mass.loop, connector=aiohttp.TCPConnector()
- )
self.throttler = Throttler(rate_limit=1, period=1)
async def async_search_artist_by_album(
headers = {"User-Agent": "Music Assistant/1.0.0 https://github.com/marcelveldt"}
params["fmt"] = "json"
async with self.throttler:
- async with self._http_session.get(
+ async with self.mass.http_session.get(
url, headers=headers, params=params, verify_ssl=False
) as response:
try:
"""Initialize class."""
self.mass = mass
self.cache = mass.cache
- self._http_session = None
- self.throttler = None
-
- async def async_setup(self):
- """Perform async setup."""
- self._http_session = aiohttp.ClientSession(
- loop=self.mass.loop, connector=aiohttp.TCPConnector()
- )
self.throttler = Throttler(rate_limit=1, period=2)
async def async_get_artist_images(self, mb_artist_id):
url = "http://webservice.fanart.tv/v3/%s" % endpoint
params["api_key"] = "639191cb0774661597f28a47e7e2bad5"
async with self.throttler:
- async with self._http_session.get(
+ async with self.mass.http_session.get(
url, params=params, verify_ssl=False
) as response:
try:
it will send a constant stream of audio to the player with all tracks.
"""
supports_crossfade = PlayerFeature.CROSSFADE in self.player.features
- return self.crossfade_enabled and not supports_crossfade
+ supports_queue = PlayerFeature.QUEUE in self.player.features
+ return not supports_crossfade if self.crossfade_enabled else not supports_queue
@callback
def get_item(self, index):
@abstractmethod
async def async_on_stop(self):
"""Handle correct close/cleanup of the provider on exit. Called on shutdown."""
- raise NotImplementedError
async def async_on_reload(self):
"""Handle configuration changes for this provider. Called on reload."""
EVENT_PLAYER_REMOVED,
)
from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
-from music_assistant.models.media_types import MediaItem, MediaType
+from music_assistant.models.media_types import MediaItem, MediaType, Track
from music_assistant.models.player import (
Player,
PlayerControl,
from music_assistant.models.player_queue import PlayerQueue, QueueItem, QueueOption
from music_assistant.models.playerprovider import PlayerProvider
from music_assistant.models.provider import ProviderType
+from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
from music_assistant.utils import (
async_iter_items,
callback,
if queue_opt == QueueOption.Add:
return await player_queue.async_append(queue_items)
+ async def async_cmd_play_uri(self, player_id: str, uri: str):
+ """
+ Play the specified uri/url on the given player.
+
+ Will create a fake track on the queue.
+
+ :param player_id: player_id of the player to handle the command.
+ :param uri: Url/Uri that can be played by a player.
+ :param queue_opt:
+ QueueOption.Play -> Insert new items in queue and start playing at inserted position
+ QueueOption.Replace -> Replace queue contents with these items
+ QueueOption.Next -> Play item(s) after current playing item
+ QueueOption.Add -> Append new items at end of the queue
+ """
+ player = self._players[player_id]
+ if not player:
+ return
+ queue_item = QueueItem(
+ Track(
+ item_id=uri,
+ provider="",
+ name="uri",
+ )
+ )
+ queue_item.streamdetails = StreamDetails(
+ type=StreamType.URL,
+ provider="",
+ item_id=uri,
+ path=uri,
+ content_type=ContentType(uri.split(".")[-1]),
+ sample_rate=44100,
+ bit_depth=16,
+ )
+ # turn on player
+ await self.async_cmd_power_on(player_id)
+ # load item into the queue
+ player_queue = self.get_player_queue(player_id)
+ return await player_queue.async_insert([queue_item], 0)
+
async def async_cmd_stop(self, player_id: str) -> None:
"""
Send STOP command to given player.
"""Demo/test providers."""
-
-import functools
+import asyncio
+import signal
+import subprocess
from typing import List
-import vlc
from music_assistant.models.config_entry import ConfigEntry
-from music_assistant.models.player import DeviceInfo, Player, PlayerFeature, PlayerState
-from music_assistant.models.player_queue import QueueItem
+from music_assistant.models.player import DeviceInfo, Player, PlayerState
from music_assistant.models.playerprovider import PlayerProvider
PROV_ID = "demo_player"
async def async_on_start(self) -> bool:
"""Handle initialization of the provider based on config."""
- # create some fake players
- for count in range(5)[1:]:
- player_id = f"demo_{count}"
- player = Player(
- player_id=player_id,
- provider_id=PROV_ID,
- name=f"Demo player {count}",
- should_poll=False,
- available=True,
- )
- model_name = "Base"
- if count == 1:
- # player 1 has no support for special features
- model_name = "Basic"
- if count == 2:
- # player 2 has QUEUE support feature but no crossfade
- player.features = [PlayerFeature.QUEUE]
- model_name = "QUEUE support"
- if count == 3:
- # player 3 has support for all features
- player.features = [
- PlayerFeature.QUEUE,
- PlayerFeature.GAPLESS,
- PlayerFeature.CROSSFADE,
- ]
- if count == 4:
- # player 4 is a group player
- player.is_group_player = True
- player.group_childs = ["demo_1", "demo_2", "demo_8"]
- player.blaat = True
- player.device_info = DeviceInfo(
- model=model_name, address=f"http://demo:{count}", manufacturer=PROV_ID
- )
- player.vlc_instance = vlc.Instance()
- player.vlc_player = player.vlc_instance.media_player_new()
- events = player.vlc_player.event_manager()
- player_event_cb = functools.partial(self.player_event, player_id)
- events.event_attach(vlc.EventType.MediaPlayerEndReached, player_event_cb)
- events.event_attach(vlc.EventType.MediaPlayerMediaChanged, player_event_cb)
- events.event_attach(vlc.EventType.MediaPlayerPlaying, player_event_cb)
- events.event_attach(vlc.EventType.MediaPlayerPaused, player_event_cb)
- events.event_attach(vlc.EventType.MediaPlayerStopped, player_event_cb)
- events.event_attach(vlc.EventType.MediaPlayerTimeChanged, player_event_cb)
- events.event_attach(vlc.EventType.MediaPlayerMuted, player_event_cb)
- events.event_attach(vlc.EventType.MediaPlayerUnmuted, player_event_cb)
- events.event_attach(vlc.EventType.MediaPlayerAudioVolume, player_event_cb)
- self._players[player_id] = player
- self.mass.add_job(self.mass.player_manager.async_add_player(player))
+ # create fake/test regular player 1
+ player = Player(
+ player_id="demo_player_1",
+ provider_id=PROV_ID,
+ name="Demo player 1",
+ device_info=DeviceInfo(
+ model="Demo/Test Player",
+ address="http://demo_player1:12345",
+ manufacturer=PROV_ID,
+ ),
+ )
+ player.sox = None
+ self._players[player.player_id] = player
+ self.mass.add_job(self.mass.player_manager.async_add_player(player))
+ # create fake/test regular player 2
+ player = Player(
+ player_id="demo_player_2",
+ provider_id=PROV_ID,
+ name="Demo player 2",
+ device_info=DeviceInfo(
+ model="Demo/Test Player",
+ address="http://demo_player2:12345",
+ manufacturer=PROV_ID,
+ ),
+ )
+ player.sox = None
+ self._players[player.player_id] = player
+ self.mass.add_job(self.mass.player_manager.async_add_player(player))
+ # create fake/test group player
+ group_player = Player(
+ player_id="demo_group_player",
+ is_group_player=True,
+ group_childs=["demo_player_1", "demo_player_2"],
+ provider_id=PROV_ID,
+ name="Demo Group Player",
+ device_info=DeviceInfo(
+ model="Demo/Test Group player",
+ address="http://demo_group_player:12345",
+ manufacturer=PROV_ID,
+ ),
+ )
+ group_player.sox = None
+ self._players[group_player.player_id] = group_player
+ self.mass.add_job(self.mass.player_manager.async_add_player(group_player))
return True
async def async_on_stop(self):
"""Handle correct close/cleanup of the provider on exit."""
- for player_id, player in self._players.items():
- player.vlc_player.release()
- player.vlc_instance.release()
- del player
- await self.mass.player_manager.async_remove_player(player_id)
- self._players = {}
-
- def player_event(self, player_id, event):
- """Call on vlc player events."""
- # pylint: disable = unused-argument
- vlc_player: vlc.MediaPlayer = self._players[player_id].vlc_player
- self._players[player_id].muted = vlc_player.audio_get_mute()
- self._players[player_id].volume_level = vlc_player.audio_get_volume()
- if vlc_player.is_playing():
- self._players[player_id].state = PlayerState.Playing
- self._players[player_id].powered = True
- elif vlc_player.get_media():
- self._players[player_id].state = PlayerState.Paused
- else:
- self._players[player_id].state = PlayerState.Stopped
- self._players[player_id].elapsed_time = int(vlc_player.get_time() / 1000)
- self.mass.add_job(
- self.mass.player_manager.async_update_player(self._players[player_id])
- )
+ for player in self._players.values():
+ if player.sox:
+ player.sox.terminate()
# SERVICE CALLS / PLAYER COMMANDS
:param player_id: player_id of the player to handle the command.
"""
- # self._players[player_id].current_uri = uri
- media = self._players[player_id].vlc_instance.media_new_location(uri)
- self.mass.add_job(self._players[player_id].vlc_player.set_media, media)
- self.mass.add_job(self._players[player_id].vlc_player.play)
+ player = self._players[player_id]
+ if player.sox:
+ await self.async_cmd_stop(player_id)
+ player.current_uri = uri
+ player.sox = subprocess.Popen(["play", uri])
+ player.state = PlayerState.Playing
+ self.mass.add_job(self.mass.player_manager.async_update_player(player))
+
+ async def report_progress():
+ """Report fake progress while sox is playing."""
+ player.elapsed_time = 0
+ while player.sox and not player.sox.poll():
+ await asyncio.sleep(1)
+ player.elapsed_time += 1
+ self.mass.add_job(self.mass.player_manager.async_update_player(player))
+ player.elapsed_time = 0
+ player.state = PlayerState.Stopped
+ self.mass.add_job(self.mass.player_manager.async_update_player(player))
+
+ self.mass.add_job(report_progress)
async def async_cmd_stop(self, player_id: str) -> None:
"""
:param player_id: player_id of the player to handle the command.
"""
- self.mass.add_job(self._players[player_id].vlc_player.stop)
+ player = self._players[player_id]
+ if player.sox:
+ player.sox.terminate()
+ player.sox = None
+ player.state = PlayerState.Stopped
+ self.mass.add_job(self.mass.player_manager.async_update_player(player))
async def async_cmd_play(self, player_id: str) -> None:
"""
:param player_id: player_id of the player to handle the command.
"""
- if self._players[player_id].vlc_player.get_media():
- self.mass.add_job(self._players[player_id].vlc_player.play)
+ player = self._players[player_id]
+ if player.sox:
+ player.sox.send_signal(signal.SIGCONT)
+ player.state = PlayerState.Playing
+ self.mass.add_job(self.mass.player_manager.async_update_player(player))
async def async_cmd_pause(self, player_id: str):
"""
:param player_id: player_id of the player to handle the command.
"""
- self.mass.add_job(self._players[player_id].vlc_player.pause)
+ player = self._players[player_id]
+ if player.sox:
+ player.sox.send_signal(signal.SIGSTOP)
+ player.state = PlayerState.Paused
+ self.mass.add_job(self.mass.player_manager.async_update_player(player))
async def async_cmd_next(self, player_id: str):
"""
:param player_id: player_id of the player to handle the command.
"""
- self.mass.add_job(self._players[player_id].vlc_player.next_chapter)
+ # this code should never be reached as the player doesn't report queue support
+ # throw NotImplementedError just in case we've missed a spot
+ raise NotImplementedError
async def async_cmd_previous(self, player_id: str):
"""
:param player_id: player_id of the player to handle the command.
"""
- self.mass.add_job(self._players[player_id].vlc_player.previous_chapter)
+ # this code should never be reached as the player doesn't report queue support
+ # throw NotImplementedError just in case we've missed a spot
+ raise NotImplementedError
async def async_cmd_power_on(self, player_id: str) -> None:
"""
:param player_id: player_id of the player to handle the command.
"""
- self.mass.add_job(self._players[player_id].vlc_player.stop)
+ await self.async_cmd_stop(player_id)
self._players[player_id].powered = False
self.mass.add_job(
self.mass.player_manager.async_update_player(self._players[player_id])
:param player_id: player_id of the player to handle the command.
:param volume_level: volume level to set (0..100).
"""
+ self._players[player_id].volume_level = volume_level
self.mass.add_job(
- self._players[player_id].vlc_player.audio_set_volume, volume_level
+ self.mass.player_manager.async_update_player(self._players[player_id])
)
async def async_cmd_volume_mute(self, player_id: str, is_muted=False):
:param player_id: player_id of the player to handle the command.
:param is_muted: bool with new mute state.
"""
- self.mass.add_job(self._players[player_id].vlc_player.audio_set_mute, is_muted)
-
- # OPTIONAL: QUEUE SERVICE CALLS/COMMANDS - OVERRIDE ONLY IF SUPPORTED BY PROVIDER
- # pylint: disable=abstract-method
-
- async def async_cmd_queue_play_index(self, player_id: str, index: int):
- """
- Play item at index X on player's queue.
-
- :param player_id: player_id of the player to handle the command.
- :param index: (int) index of the queue item that should start playing
- """
- raise NotImplementedError
-
- async def async_cmd_queue_load(self, player_id: str, queue_items: List[QueueItem]):
- """
- Load/overwrite given items in the player's queue implementation.
-
- :param player_id: player_id of the player to handle the command.
- :param queue_items: a list of QueueItems
- """
- raise NotImplementedError
-
- async def async_cmd_queue_insert(
- self, player_id: str, queue_items: List[QueueItem], insert_at_index: int
- ):
- """
- Insert new items at position X into existing queue.
-
- If insert_at_index 0 or None, will start playing newly added item(s)
- :param player_id: player_id of the player to handle the command.
- :param queue_items: a list of QueueItems
- :param insert_at_index: queue position to insert new items
- """
- raise NotImplementedError
-
- async def async_cmd_queue_append(
- self, player_id: str, queue_items: List[QueueItem]
- ):
- """
- Append new items at the end of the queue.
-
- :param player_id: player_id of the player to handle the command.
- :param queue_items: a list of QueueItems
- """
- raise NotImplementedError
-
- async def async_cmd_queue_update(
- self, player_id: str, queue_items: List[QueueItem]
- ):
- """
- Overwrite the existing items in the queue, used for reordering.
-
- :param player_id: player_id of the player to handle the command.
- :param queue_items: a list of QueueItems
- """
- raise NotImplementedError
-
- async def async_cmd_queue_clear(self, player_id: str):
- """
- Clear the player's queue.
-
- :param player_id: player_id of the player to handle the command.
- """
- raise NotImplementedError
+ self._players[player_id].muted = is_muted
+ self.mass.add_job(
+ self.mass.player_manager.async_update_player(self._players[player_id])
+ )
import time
from typing import List, Optional
-import aiohttp
from asyncio_throttle import Throttler
from music_assistant.app_vars import get_app_var # noqa # pylint: disable=all
from music_assistant.constants import (
# pylint: disable=abstract-method
- _http_session = None
__user_auth_info = None
@property
async def async_on_start(self) -> bool:
"""Handle initialization of the provider based on config."""
# pylint: disable=attribute-defined-outside-init
- self._http_session = aiohttp.ClientSession(
- loop=self.mass.loop, connector=aiohttp.TCPConnector()
- )
config = self.mass.config.get_provider_config(self.id)
if not config[CONF_USERNAME] or not config[CONF_PASSWORD]:
LOGGER.debug("Username and password not set. Abort load of provider.")
self.mass.add_event_listener(self.async_mass_event, EVENT_PLAYBACK_STOPPED)
return True
- async def async_on_stop(self):
- """Handle correct close/cleanup of the provider on exit."""
- if self._http_session:
- await self._http_session.close()
-
async def async_search(
self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
) -> SearchResult:
params["app_id"] = get_app_var(0)
params["user_auth_token"] = await self.__async_auth_token()
async with self._throttler:
- async with self._http_session.get(
+ async with self.mass.http_session.get(
url, headers=headers, params=params, verify_ssl=False
) as response:
result = await response.json()
url = "http://www.qobuz.com/api.json/0.2/%s" % endpoint
params["app_id"] = get_app_var(0)
params["user_auth_token"] = await self.__async_auth_token()
- async with self._http_session.post(
+ async with self.mass.http_session.post(
url, params=params, json=data, verify_ssl=False
) as response:
result = await response.json()
import time
from typing import List, Optional
-import aiohttp
from asyncio_throttle import Throttler
from music_assistant.app_vars import get_app_var # noqa # pylint: disable=all
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
# pylint: disable=abstract-method
- _http_session = None
__auth_token = None
sp_user = None
config = self.mass.config.get_provider_config(self.id)
# pylint: disable=attribute-defined-outside-init
self._cur_user = None
- self._http_session = aiohttp.ClientSession(
- loop=self.mass.loop, connector=aiohttp.TCPConnector()
- )
self.sp_user = None
if not config[CONF_USERNAME] or not config[CONF_PASSWORD]:
LOGGER.debug("Username and password not set. Abort load of provider.")
return token is not None
- async def async_on_stop(self):
- """Handle correct close/cleanup of the provider on exit."""
- if self._http_session:
- await self._http_session.close()
-
async def async_search(
self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
) -> SearchResult:
token = await self.async_get_token()
headers = {"Authorization": "Bearer %s" % token["accessToken"]}
async with self._throttler:
- async with self._http_session.get(
+ async with self.mass.http_session.get(
url, headers=headers, params=params, verify_ssl=False
) as response:
result = await response.json()
url = "https://api.spotify.com/v1/%s" % endpoint
token = await self.async_get_token()
headers = {"Authorization": "Bearer %s" % token["accessToken"]}
- async with self._http_session.delete(
+ async with self.mass.http_session.delete(
url, headers=headers, params=params, json=data, verify_ssl=False
) as response:
return await response.text()
url = "https://api.spotify.com/v1/%s" % endpoint
token = await self.async_get_token()
headers = {"Authorization": "Bearer %s" % token["accessToken"]}
- async with self._http_session.put(
+ async with self.mass.http_session.put(
url, headers=headers, params=params, json=data, verify_ssl=False
) as response:
return await response.text()
url = "https://api.spotify.com/v1/%s" % endpoint
token = await self.async_get_token()
headers = {"Authorization": "Bearer %s" % token["accessToken"]}
- async with self._http_session.post(
+ async with self.mass.http_session.post(
url, headers=headers, params=params, json=data, verify_ssl=False
) as response:
return await response.text()
import logging
from typing import List, Optional
-import aiohttp
from asyncio_throttle import Throttler
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
_username = None
_password = None
- _http_session = None
_throttler = None
@property
async def async_on_start(self) -> bool:
"""Handle initialization of the provider based on config."""
# pylint: disable=attribute-defined-outside-init
- self._http_session = aiohttp.ClientSession(
- loop=self.mass.loop, connector=aiohttp.TCPConnector()
- )
config = self.mass.config.get_provider_config(self.id)
if not config[CONF_USERNAME] or not config[CONF_PASSWORD]:
LOGGER.debug("Username and password not set. Abort load of provider.")
self._password = config[CONF_PASSWORD]
self._throttler = Throttler(rate_limit=1, period=1)
- async def async_on_stop(self):
- """Handle correct close/cleanup of the provider on exit."""
- if self._http_session:
- await self._http_session.close()
-
async def async_search(
self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
) -> SearchResult:
params["username"] = self._username
params["partnerId"] = "1"
async with self._throttler:
- async with self._http_session.get(
+ async with self.mass.http_session.get(
url, params=params, verify_ssl=False
) as response:
result = await response.json()
zeroconf==0.28.4
passlib==1.7.2
cryptography==3.1
-python-vlc==3.0.11115
mashumaro==1.12