import asyncio
import logging
import os
-from sys import path
from aiorun import run
-# pylint: disable=wrong-import-position
-from music_assistant.models.player import Player, PlayerState
-
-path.insert(1, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
from music_assistant.mass import MusicAssistant
-from music_assistant.providers.spotify import SpotifyProvider
+from music_assistant.models.player import Player, PlayerState
+from music_assistant.providers.filesystem import FileSystemProvider
from music_assistant.providers.qobuz import QobuzProvider
+from music_assistant.providers.spotify import SpotifyProvider
from music_assistant.providers.tunein import TuneInProvider
-from music_assistant.providers.filesystem import FileSystemProvider
parser = argparse.ArgumentParser(description="MusicAssistant")
parser.add_argument(
class TestPlayer(Player):
+ """Demonstatration player implementation."""
+
def __init__(self):
+ """Init."""
self.player_id = "test"
- self.is_group = False
self._attr_name = "Test player"
- self._attr_powered = False
+ self._attr_powered = True
self._attr_elapsed_time = 0
- self._attr_current_url = None
+ self._attr_current_url = ""
self._attr_state = PlayerState.IDLE
self._attr_available = True
self._attr_volume_level = 100
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
- print("play uri: %s" % url)
+ print("play uri called: {url}")
self._attr_current_url = url
self.update_state()
async def stop(self) -> None:
"""Send STOP command to player."""
- print("STOP CALLED")
+ print("stop called")
self._attr_state = PlayerState.IDLE
self._attr_current_url = None
self._attr_elapsed_time = 0
async def play(self) -> None:
"""Send PLAY/UNPAUSE command to player."""
- print("PLAY CALLED")
+ print("play called")
self._attr_state = PlayerState.PLAYING
self._attr_elapsed_time = 1
self.update_state()
async def pause(self) -> None:
"""Send PAUSE command to player."""
- print("PAUSE CALLED")
+ print("pause called")
self._attr_state = PlayerState.PAUSED
self.update_state()
async def power(self, powered: bool) -> None:
"""Send POWER command to player."""
- print("POWER CALLED - %s" % powered)
+ print(f"POWER CALLED - new power: {powered}")
self._attr_powered = powered
self._attr_current_url = None
self.update_state()
async def volume_set(self, volume_level: int) -> None:
"""Send volume level (0..100) command to player."""
- print("VOLUME SET CALLED - %s" % volume_level)
+ print(f"volume_set called - {volume_level}")
self._attr_volume_level = volume_level
self.update_state()
test_player = TestPlayer()
await mass.players.register_player(test_player)
# try to play some playlist
- await test_player.queue.set_crossfade_duration(10)
- await test_player.queue.set_shuffle_enabled(True)
+ await test_player.active_queue.set_crossfade_duration(10)
+ await test_player.active_queue.set_shuffle_enabled(True)
if len(playlists) > 0:
- await test_player.queue.play_media(playlists[0].uri)
+ await test_player.active_queue.play_media(playlists[0].uri)
def on_shutdown(loop):
loop.run_until_complete(mass.stop())