{
- "python.linting.pylintEnabled": false,
+ "python.linting.pylintEnabled": true,
"python.linting.enabled": true,
"python.pythonPath": "venv/bin/python3",
- "python.linting.flake8Enabled": true
+ "python.linting.flake8Enabled": false
}
\ No newline at end of file
music_assistant/providers/demo/__init__.py
music_assistant/providers/demo/demo_musicprovider.py
music_assistant/providers/demo/demo_playerprovider.py
-music_assistant/providers/file/file.py
+music_assistant/providers/file/__init__.py
music_assistant/providers/home_assistant/__init__.py
music_assistant/providers/qobuz/__init__.py
music_assistant/providers/sonos/__init__.py
music_assistant/providers/squeezebox/discovery.py
music_assistant/providers/squeezebox/socket_client.py
music_assistant/providers/tunein/__init__.py
-music_assistant/providers/webplayer/todo.py
+music_assistant/providers/webplayer/__init__.py
venv/lib/python3.7/site-packages/Pillow-7.2.0.dist-info/top_level.txt
venv/lib/python3.7/site-packages/PyChromecast-7.2.1.dist-info/top_level.txt
venv/lib/python3.7/site-packages/PyJWT-1.7.1.dist-info/entry_points.txt
venv/lib/python3.7/site-packages/memory_tempfile-2.2.3.dist-info/LICENSE.txt
venv/lib/python3.7/site-packages/more_itertools-8.5.0.dist-info/top_level.txt
venv/lib/python3.7/site-packages/multidict-4.7.6.dist-info/top_level.txt
-venv/lib/python3.7/site-packages/music_assistant-1.0.0-py3.7.egg/EGG-INFO/SOURCES.txt
-venv/lib/python3.7/site-packages/music_assistant-1.0.0-py3.7.egg/EGG-INFO/dependency_links.txt
-venv/lib/python3.7/site-packages/music_assistant-1.0.0-py3.7.egg/EGG-INFO/entry_points.txt
-venv/lib/python3.7/site-packages/music_assistant-1.0.0-py3.7.egg/EGG-INFO/requires.txt
-venv/lib/python3.7/site-packages/music_assistant-1.0.0-py3.7.egg/EGG-INFO/top_level.txt
venv/lib/python3.7/site-packages/mypy-0.770.dist-info/entry_points.txt
venv/lib/python3.7/site-packages/mypy-0.770.dist-info/top_level.txt
venv/lib/python3.7/site-packages/mypy_extensions-0.4.3.dist-info/top_level.txt
LOGGER = logging.getLogger("mass")
-class Cache(object):
+class Cache:
"""Basic stateless caching system."""
_db = None
"""Get int checksum from string."""
if not stringinput:
return 0
- else:
- stringinput = str(stringinput)
+ stringinput = str(stringinput)
return reduce(lambda x, y: x + y, map(ord, stringinput))
cachedata = await method_class.cache.async_get(cache_str)
if cachedata is not None:
return cachedata
- else:
- result = await func(*args, **kwargs)
- await method_class.cache.async_set(
- cache_str,
- result,
- checksum=cache_checksum,
- expiration=(86400 * cache_days),
- )
- return result
+ result = await func(*args, **kwargs)
+ await method_class.cache.async_set(
+ cache_str,
+ result,
+ checksum=cache_checksum,
+ expiration=(86400 * cache_days),
+ )
+ return result
return async_wrapped
from typing import List
from cryptography.fernet import Fernet, InvalidToken
-from music_assistant.app_vars import get_app_var # noqa
+from music_assistant.app_vars import get_app_var # noqa # pylint: disable=all
from music_assistant.constants import (
CONF_CROSSFADE_DURATION,
CONF_ENABLED,
import signal
import subprocess
import threading
+import urllib
from contextlib import suppress
import pyloudnorm
if cancelled.is_set():
# break out the loop if the http session is cancelled
break
- else:
- # update actual duration to the queue for more accurate now playing info
- accurate_duration = bytes_written / int(sample_rate * 4 * 2)
- queue_track.duration = accurate_duration
- LOGGER.debug(
- "Finished Streaming queue track: %s (%s) on player %s",
- queue_track.item_id,
- queue_track.name,
- player_id,
- )
- # run garbage collect manually to avoid too much memory fragmentation
- gc.collect()
+ # update actual duration to the queue for more accurate now playing info
+ accurate_duration = bytes_written / int(sample_rate * 4 * 2)
+ queue_track.duration = accurate_duration
+ LOGGER.debug(
+ "Finished Streaming queue track: %s (%s) on player %s",
+ queue_track.item_id,
+ queue_track.name,
+ player_id,
+ )
+ # run garbage collect manually to avoid too much memory fragmentation
+ gc.collect()
# end of queue reached, pass last fadeout bits to final output
if last_fadeout_data and not cancelled.is_set():
sox_proc.stdin.write(last_fadeout_data)
# last chunk
yield (True, prev_chunk + chunk)
break
- else:
- if prev_chunk:
- yield (False, prev_chunk)
- prev_chunk = chunk
+ if prev_chunk:
+ yield (False, prev_chunk)
+ prev_chunk = chunk
# fire event that streaming has ended
self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails)
# send task to background to analyse the audio
# only when needed we do the analyze stuff
LOGGER.debug("Start analyzing track %s", item_key)
if streamdetails.type == StreamType.URL:
- import urllib
-
audio_data = urllib.request.urlopen(streamdetails.path).read()
elif streamdetails.type == StreamType.EXECUTABLE:
audio_data = subprocess.check_output(streamdetails.path, shell=True)
task = self.loop.run_in_executor(None, target, *args) # type: ignore
return task
- def __handle_exception(self, loop, context):
+ @staticmethod
+ def __handle_exception(loop, context):
"""Global exception handler."""
LOGGER.error("Caught exception: %s", context)
loop.default_exception_handler(context)
@dataclass
-class MediaItem(object):
+class MediaItem:
"""Representation of a media item."""
item_id: str = ""
if self.cur_index is None:
# playback started
return 0
- else:
- # player already playing (or paused) so return the next item
- if len(self.items) > (self.cur_index + 1):
- return self.cur_index + 1
- if self._repeat_enabled:
- # repeat enabled, start queue at beginning
- return 0
+ # player already playing (or paused) so return the next item
+ if len(self.items) > (self.cur_index + 1):
+ return self.cur_index + 1
+ if self._repeat_enabled:
+ # repeat enabled, start queue at beginning
+ return 0
return None
@property
return
if self.use_queue_stream:
return await self.async_play_index(self.cur_index + 1)
- else:
- return await self.mass.player_manager.get_player_provider(
- self.player_id
- ).async_cmd_next(self.player_id)
+ return await self.mass.player_manager.get_player_provider(
+ self.player_id
+ ).async_cmd_next(self.player_id)
async def async_previous(self):
"""Play the previous track in the queue."""
return
if self.use_queue_stream:
return await self.async_play_index(self.cur_index - 1)
- else:
- return await self.mass.player_manager.async_cmd_previous(self.player_id)
+ return await self.mass.player_manager.async_cmd_previous(self.player_id)
async def async_resume(self):
"""Resume previous queue."""
return await player_prov.async_cmd_play_uri(
self.player_id, queue_stream_uri
)
- elif supports_queue:
+ if supports_queue:
try:
return await player_prov.async_cmd_queue_play_index(
self.player_id, index
if not music_prov:
continue # provider temporary unavailable ?
- streamdetails = await music_prov.async_get_stream_details(
+ streamdetails: StreamDetails = await music_prov.async_get_stream_details(
prov_media.item_id
)
if streamdetails:
+ streamdetails.player_id = player_id
# set streamdetails as attribute on the queue_item
queue_item.streamdetails = streamdetails
return streamdetails
raise NotImplementedError
@abstractmethod
- async def async_cmd_stop(self, player_id: str):
+ async def async_cmd_stop(self, player_id: str) -> None:
"""
Send STOP command to given player.
raise NotImplementedError
@abstractmethod
- async def async_cmd_play(self, player_id: str):
+ async def async_cmd_play(self, player_id: str) -> None:
"""
Send PLAY command to given player.
raise NotImplementedError
@abstractmethod
- async def async_cmd_power_on(self, player_id: str):
+ async def async_cmd_power_on(self, player_id: str) -> None:
"""
Send POWER ON command to given player.
raise NotImplementedError
@abstractmethod
- async def async_cmd_power_off(self, player_id: str):
+ async def async_cmd_power_off(self, player_id: str) -> None:
"""
Send POWER OFF command to given player.
raise NotImplementedError
@abstractmethod
- async def async_cmd_volume_set(self, player_id: str, volume_level: int):
+ async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""
Send volume level command to given player.
if track_version.provider == playlist_prov.provider:
track_ids_to_add.append(track_version.item_id)
break
- elif playlist_prov.provider == "file":
+ if playlist_prov.provider == "file":
# the file provider can handle uri's from all providers so simply add the uri
uri = f"{track_version.provider}://{track_version.item_id}"
track_ids_to_add.append(uri)
if queue_opt == QueueOption.Add:
return await player_queue.async_append(queue_items)
- async def async_cmd_stop(self, player_id: str):
+ async def async_cmd_stop(self, player_id: str) -> None:
"""
Send STOP command to given player.
# TODO: redirect playback related commands to parent player?
return await self.get_player_provider(player_id).async_cmd_stop(player_id)
- async def async_cmd_play(self, player_id: str):
+ async def async_cmd_play(self, player_id: str) -> None:
"""
Send PLAY command to given player.
"""
return await self.get_player_queue(player_id).async_previous()
- async def async_cmd_power_on(self, player_id: str):
+ async def async_cmd_power_on(self, player_id: str) -> None:
"""
Send POWER ON command to given player.
if control:
self.mass.add_job(control.set_state, control.id, True)
- async def async_cmd_power_off(self, player_id: str):
+ async def async_cmd_power_off(self, player_id: str) -> None:
"""
Send POWER OFF command to given player.
return await self.async_cmd_power_off(player_id)
return await self.async_cmd_power_on(player_id)
- async def async_cmd_volume_set(self, player_id: str, volume_level: int):
+ async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""
Send volume level command to given player.
return player.state
@callback
- def __get_player_mute_state(self, player: Player):
+ @classmethod
+ def __get_player_mute_state(cls, player: Player):
"""Get final/calculated player's mute state."""
# TODO: Handle VolumeControl plugin for mute state?
return player.muted
"""
self.mass.add_job(self._players[player_id].play_uri, uri)
- async def async_cmd_stop(self, player_id: str):
+ async def async_cmd_stop(self, player_id: str) -> None:
"""
Send STOP command to given player.
"""
self.mass.add_job(self._players[player_id].stop)
- async def async_cmd_play(self, player_id: str):
+ async def async_cmd_play(self, player_id: str) -> None:
"""
Send STOP command to given player.
"""
self.mass.add_job(self._players[player_id].previous)
- async def async_cmd_power_on(self, player_id: str):
+ async def async_cmd_power_on(self, player_id: str) -> None:
"""
Send POWER ON command to given player.
"""
self.mass.add_job(self._players[player_id].power_on)
- async def async_cmd_power_off(self, player_id: str):
+ async def async_cmd_power_off(self, player_id: str) -> None:
"""
Send POWER OFF command to given player.
"""
self.mass.add_job(self._players[player_id].power_off)
- async def async_cmd_volume_set(self, player_id: str, volume_level: int):
+ async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""
Send volume level command to given player.
queue_item.name = "Music Assistant"
queue_item.uri = uri
return self.queue_load([queue_item, queue_item])
- else:
- self._chromecast.play_media(uri, "audio/flac")
+ self._chromecast.play_media(uri, "audio/flac")
def queue_load(self, queue_items: List[QueueItem]):
"""Load (overwrite) queue with new items."""
self.mass.add_job(self._players[player_id].vlc_player.set_media, media)
self.mass.add_job(self._players[player_id].vlc_player.play)
- async def async_cmd_stop(self, player_id: str):
+ async def async_cmd_stop(self, player_id: str) -> None:
"""
Send STOP command to given player.
"""
self.mass.add_job(self._players[player_id].vlc_player.stop)
- async def async_cmd_play(self, player_id: str):
+ async def async_cmd_play(self, player_id: str) -> None:
"""
Send PLAY command to given player.
"""
self.mass.add_job(self._players[player_id].vlc_player.previous_chapter)
- async def async_cmd_power_on(self, player_id: str):
+ async def async_cmd_power_on(self, player_id: str) -> None:
"""
Send POWER ON command to given player.
self.mass.player_manager.async_update_player(self._players[player_id])
)
- async def async_cmd_power_off(self, player_id: str):
+ async def async_cmd_power_off(self, player_id: str) -> None:
"""
Send POWER OFF command to given player.
self.mass.player_manager.async_update_player(self._players[player_id])
)
- async def async_cmd_volume_set(self, player_id: str, volume_level: int):
+ async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""
Send volume level command to given player.
Should be compatible with LMS
"""
+ # pylint chokes on taglib so ignore these
+ # pylint: disable=unsubscriptable-object,unsupported-membership-test
+
_music_dir = None
_playlists_dir = None
async def async_get_library_artists(self) -> List[Artist]:
"""Retrieve all library artists."""
if not os.path.isdir(self._music_dir):
- LOGGER.error("music path does not exist: %s" % self._music_dir)
+ LOGGER.error("music path does not exist: %s", self._music_dir)
+ yield None
return
- yield
for dirname in os.listdir(self._music_dir):
dirpath = os.path.join(self._music_dir, dirname)
if os.path.isdir(dirpath) and not dirpath.startswith("."):
- artist = await self.get_artist(dirpath)
+ artist = await self.async_get_artist(dirpath)
if artist:
yield artist
async def async_get_library_albums(self) -> List[Album]:
"""Get album folders recursively."""
- async for artist in self.get_library_artists():
- async for album in self.get_artist_albums(artist.item_id):
+ async for artist in self.async_get_library_artists():
+ async for album in self.async_get_artist_albums(artist.item_id):
yield album
async def async_get_library_tracks(self) -> List[Track]:
"""Get all tracks recursively."""
# TODO: support disk subfolders
- async for album in self.get_library_albums():
- async for track in self.get_album_tracks(album.item_id):
+ async for album in self.async_get_library_albums():
+ async for track in self.async_get_album_tracks(album.item_id):
yield track
async def async_get_library_playlists(self) -> List[Playlist]:
"""Retrieve playlists from disk."""
if not self._playlists_dir:
+ yield None
return
- yield
for filename in os.listdir(self._playlists_dir):
filepath = os.path.join(self._playlists_dir, filename)
if (
and not filename.startswith(".")
and filename.lower().endswith(".m3u")
):
- playlist = await self.get_playlist(filepath)
+ playlist = await self.async_get_playlist(filepath)
if playlist:
yield playlist
- async def async_get_artist(self, prov_item_id: str) -> Artist:
+ async def async_get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- if os.sep not in prov_item_id:
- itempath = base64.b64decode(prov_item_id).decode("utf-8")
+ if os.sep not in prov_artist_id:
+ itempath = base64.b64decode(prov_artist_id).decode("utf-8")
else:
- itempath = prov_item_id
- prov_item_id = base64.b64encode(itempath.encode("utf-8")).decode("utf-8")
+ itempath = prov_artist_id
+ prov_artist_id = base64.b64encode(itempath.encode("utf-8")).decode("utf-8")
if not os.path.isdir(itempath):
- LOGGER.error("Artist path does not exist: %s" % itempath)
+ LOGGER.error("Artist path does not exist: %s", itempath)
return None
name = itempath.split(os.sep)[-1]
artist = Artist()
- artist.item_id = prov_item_id
+ artist.item_id = prov_artist_id
artist.provider = PROV_ID
artist.name = name
artist.provider_ids.append(
)
return artist
- async def async_get_album(self, prov_item_id: str) -> Album:
+ async def async_get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- if os.sep not in prov_item_id:
- itempath = base64.b64decode(prov_item_id).decode("utf-8")
+ if os.sep not in prov_album_id:
+ itempath = base64.b64decode(prov_album_id).decode("utf-8")
else:
- itempath = prov_item_id
- prov_item_id = base64.b64encode(itempath.encode("utf-8")).decode("utf-8")
+ itempath = prov_album_id
+ prov_album_id = base64.b64encode(itempath.encode("utf-8")).decode("utf-8")
if not os.path.isdir(itempath):
- LOGGER.error("album path does not exist: %s" % itempath)
+ LOGGER.error("album path does not exist: %s", itempath)
return None
name = itempath.split(os.sep)[-1]
artistpath = itempath.rsplit(os.sep, 1)[0]
album = Album()
- album.item_id = prov_item_id
- album.provider = self.prov_id
+ album.item_id = prov_album_id
+ album.provider = PROV_ID
album.name, album.version = parse_title_and_version(name)
- album.artist = await self.get_artist(artistpath)
+ album.artist = await self.async_get_artist(artistpath)
if not album.artist:
raise Exception("No album artist ! %s" % artistpath)
album.provider_ids.append(
- MediaItemProviderId(provider=PROV_ID, item_id=prov_item_id)
+ MediaItemProviderId(provider=PROV_ID, item_id=prov_album_id)
)
return album
- async def async_get_track(self, prov_item_id: str) -> Track:
+ async def async_get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- if os.sep not in prov_item_id:
- itempath = base64.b64decode(prov_item_id).decode("utf-8")
+ if os.sep not in prov_track_id:
+ itempath = base64.b64decode(prov_track_id).decode("utf-8")
else:
- itempath = prov_item_id
+ itempath = prov_track_id
if not os.path.isfile(itempath):
LOGGER.error("track path does not exist: %s", itempath)
return None
- return await self.__parse_track(itempath)
+ return await self.__async_parse_track(itempath)
- async def async_get_playlist(self, prov_item_id: str) -> Playlist:
+ async def async_get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- if os.sep not in prov_item_id:
- itempath = base64.b64decode(prov_item_id).decode("utf-8")
+ if os.sep not in prov_playlist_id:
+ itempath = base64.b64decode(prov_playlist_id).decode("utf-8")
else:
- itempath = prov_item_id
- prov_item_id = base64.b64encode(itempath.encode("utf-8")).decode("utf-8")
+ itempath = prov_playlist_id
+ prov_playlist_id = base64.b64encode(itempath.encode("utf-8")).decode(
+ "utf-8"
+ )
if not os.path.isfile(itempath):
- LOGGER.error("playlist path does not exist: %s" % itempath)
+ LOGGER.error("playlist path does not exist: %s", itempath)
return None
playlist = Playlist()
- playlist.item_id = prov_item_id
- playlist.provider = self.prov_id
+ playlist.item_id = prov_playlist_id
+ playlist.provider = PROV_ID
playlist.name = itempath.split(os.sep)[-1].replace(".m3u", "")
playlist.is_editable = True
playlist.provider_ids.append(
- MediaItemProviderId(provider=PROV_ID, item_id=prov_item_id)
+ MediaItemProviderId(provider=PROV_ID, item_id=prov_playlist_id)
)
playlist.owner = "disk"
playlist.checksum = os.path.getmtime(itempath)
else:
albumpath = prov_album_id
if not os.path.isdir(albumpath):
- LOGGER.error("album path does not exist: %s" % albumpath)
+ LOGGER.error("album path does not exist: %s", albumpath)
return
- album = await self.get_album(albumpath)
+ album = await self.async_get_album(albumpath)
for filename in os.listdir(albumpath):
filepath = os.path.join(albumpath, filename)
if os.path.isfile(filepath) and not filepath.startswith("."):
- track = await self.__parse_track(filepath)
+ track = await self.__async_parse_track(filepath)
if track:
track.album = album
yield track
- async def async_get_playlist_tracks(
- self, prov_playlist_id: str, limit: int = 50, offset: int = 0
- ) -> List[Track]:
+ async def async_get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
if os.sep not in prov_playlist_id:
itempath = base64.b64decode(prov_playlist_id).decode("utf-8")
else:
itempath = prov_playlist_id
if not os.path.isfile(itempath):
- LOGGER.error("playlist path does not exist: %s" % itempath)
+ LOGGER.error("playlist path does not exist: %s", itempath)
return
- counter = 0
index = 0
- with open(itempath) as f:
- for line in f.readlines():
+ with open(itempath) as _file:
+ for line in _file.readlines():
line = line.strip()
if line and not line.startswith("#"):
- counter += 1
- if counter > offset:
- track = await self.__parse_track_from_uri(line)
- if track:
- yield track
- index += 1
- if limit and index == limit:
- break
+ track = await self.__async_parse_track_from_uri(line)
+ if track:
+ yield track
+ index += 1
async def async_get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
else:
artistpath = prov_artist_id
if not os.path.isdir(artistpath):
- LOGGER.error("artist path does not exist: %s" % artistpath)
+ LOGGER.error("artist path does not exist: %s", artistpath)
return
for dirname in os.listdir(artistpath):
dirpath = os.path.join(artistpath, dirname)
if os.path.isdir(dirpath) and not dirpath.startswith("."):
- album = await self.get_album(dirpath)
+ album = await self.async_get_album(dirpath)
if album:
yield album
async def async_get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of random tracks as we have no clue about preference."""
- async for album in self.get_artist_albums(prov_artist_id):
- async for track in self.get_album_tracks(album.item_id):
+ async for album in self.async_get_artist_albums(prov_artist_id):
+ async for track in self.async_get_album_tracks(album.item_id):
yield track
- async def async_get_stream_details(self, track_id):
+ async def async_get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- if os.sep not in track_id:
- track_id = base64.b64decode(track_id).decode("utf-8")
+ if os.sep not in item_id:
+ track_id = base64.b64decode(item_id).decode("utf-8")
if not os.path.isfile(track_id):
return None
# TODO: retrieve sanple rate and bitdepth
return StreamDetails(
type=StreamType.FILE,
provider=PROV_ID,
- item_id=track_id,
- content_type=ContentType(track_id.split(".")[-1]),
- path=track_id,
+ item_id=item_id,
+ content_type=ContentType(item_id.split(".")[-1]),
+ path=item_id,
sample_rate=44100,
bit_depth=16,
)
prov_item_id = base64.b64encode(filename.encode("utf-8")).decode("utf-8")
track.duration = song.length
track.item_id = prov_item_id
- track.provider = self.prov_id
+ track.provider = PROV_ID
name = song.tags["TITLE"][0]
track.name, track.version = parse_title_and_version(name)
albumpath = filename.rsplit(os.sep, 1)[0]
- track.album = await self.get_album(albumpath)
+ track.album = await self.async_get_album(albumpath)
artists = []
for artist_str in song.tags["ARTIST"]:
local_artist_path = os.path.join(self._music_dir, artist_str)
if os.path.isfile(local_artist_path):
- artist = await self.get_artist(local_artist_path)
+ artist = await self.async_get_artist(local_artist_path)
else:
artist = Artist()
artist.name = artist_str
async def __async_parse_track_from_uri(self, uri):
"""Try to parse a track from an uri found in playlist."""
+ # pylint: disable=broad-except
if "://" in uri:
# track is uri from external provider?
prov_id = uri.split("://")[0]
prov_item_id, prov_id, lazy=False
)
except Exception as exc:
- LOGGER.warning("Could not parse uri %s to track: %s" % (uri, str(exc)))
+ LOGGER.warning("Could not parse uri %s to track: %s", uri, str(exc))
return None
# try to treat uri as filename
# TODO: filename could be related to musicdir or full path
return await self.mass.player_manager.async_play_media(
player_id, media_items, queue_opt
)
- elif "spotify://playlist" in media_content_id:
+ if "spotify://playlist" in media_content_id:
# TODO: handle parsing of other uri's here
playlist = await self.mass.music_manager.async_getplaylist(
"spotify", media_content_id.split(":")[-1]
import aiohttp
from asyncio_throttle import Throttler
-from music_assistant.app_vars import get_app_var # noqa
+from music_assistant.app_vars import get_app_var # noqa # pylint: disable=all
from music_assistant.constants import (
CONF_PASSWORD,
CONF_USERNAME,
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_stop(self, player_id: str):
+ async def async_cmd_stop(self, player_id: str) -> None:
"""
Send STOP command to given player.
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_play(self, player_id: str):
+ async def async_cmd_play(self, player_id: str) -> None:
"""
Send STOP command to given player.
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_power_on(self, player_id: str):
+ async def async_cmd_power_on(self, player_id: str) -> None:
"""
Send POWER ON command to given player.
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_power_off(self, player_id: str):
+ async def async_cmd_power_off(self, player_id: str) -> None:
"""
Send POWER OFF command to given player.
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_volume_set(self, player_id: str, volume_level: int):
+ async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""
Send volume level command to given player.
return await self.async_cmd_queue_insert(
player_id, queue_items, len(player_queue.items)
)
- else:
- LOGGER.warning("Received command for unavailable player: %s", player_id)
+ LOGGER.warning("Received command for unavailable player: %s", player_id)
async def async_cmd_queue_clear(self, player_id: str):
"""
import aiohttp
from asyncio_throttle import Throttler
-from music_assistant.app_vars import get_app_var # noqa
+from music_assistant.app_vars import get_app_var # noqa # pylint: disable=all
from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
from music_assistant.models.media_types import (
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_stop(self, player_id: str):
+ async def async_cmd_stop(self, player_id: str) -> None:
"""
Send STOP command to given player.
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_play(self, player_id: str):
+ async def async_cmd_play(self, player_id: str) -> None:
"""
Send PLAY command to given player.
if new_track:
await self.async_cmd_play_uri(player_id, new_track.uri)
- async def async_cmd_power_on(self, player_id: str):
+ async def async_cmd_power_on(self, player_id: str) -> None:
"""
Send POWER ON command to given player.
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_power_off(self, player_id: str):
+ async def async_cmd_power_off(self, player_id: str) -> None:
"""
Send POWER OFF command to given player.
else:
LOGGER.warning("Received command for unavailable player: %s", player_id)
- async def async_cmd_volume_set(self, player_id: str, volume_level: int):
+ async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""
Send volume level command to given player.
:param player_id: player_id of the player to handle the command.
:param queue_items: a list of QueueItems
"""
- pass # automagically handled by built-in queue controller
+ # automagically handled by built-in queue controller
async def async_cmd_queue_update(
self, player_id: str, queue_items: List[QueueItem]
:param player_id: player_id of the player to handle the command.
:param queue_items: a list of QueueItems
"""
- pass # automagically handled by built-in queue controller
+ # automagically handled by built-in queue controller
async def async_cmd_queue_clear(self, player_id: str):
"""
async def async_start_discovery(self):
"""Start discovery for players."""
- transport, protocol = await self.mass.loop.create_datagram_endpoint(
+ transport, _ = await self.mass.loop.create_datagram_endpoint(
lambda: DiscoveryProtocol(self.mass.web.http_port),
local_addr=("0.0.0.0", 3483),
)
"""Decode a datagram message."""
if data[0] == "e":
return TLVDiscoveryRequestDatagram(data)
- elif data[0] == "E":
+ if data[0] == "E":
return TLVDiscoveryResponseDatagram(data)
- elif data[0] == "d":
+ if data[0] == "d":
return ClientDiscoveryDatagram(data)
- elif data[0] == "h":
+ if data[0] == "h":
pass # Hello!
- elif data[0] == "i":
+ if data[0] == "i":
pass # IR
- elif data[0] == "2":
+ if data[0] == "2":
pass # i2c?
- elif data[0] == "a":
+ if data[0] == "a":
pass # ack!
mreq = struct.pack("4sL", group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
- def error_received(self, exc):
+ @classmethod
+ def error_received(cls, exc):
"""Call on Error."""
LOGGER.error(exc)
- def connection_lost(self, *args, **kwargs):
+ @classmethod
+ def connection_lost(cls, *args, **kwargs):
"""Call on Connection lost."""
# pylint: disable=unused-argument
LOGGER.debug("Connection lost to discovery")
self.close()
@callback
+ @staticmethod
def __pack_stream(
- self,
command,
autostart=b"1",
formatbyte=b"o",
asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
@callback
- def _process_stat_stmo(self, data):
+ @classmethod
+ def _process_stat_stmo(cls, data):
"""
Process incoming stat STMo message.
asyncio.create_task(self._event_callback(Event.EVENT_UPDATED, self))
-class PySqueezeVolume(object):
+class PySqueezeVolume:
"""Represents a sound volume. This is an awful lot more complex than it sounds."""
minimum = 0
await self.mass.player_manager.async_add_player(player)
elif msg == EVENT_WEBPLAYER_STATE:
- await self.__handle_player_state(msg_details)
+ await self.__async_handle_player_state(msg_details)
@run_periodic(30)
- async def async_check_players(self):
+ async def async_check_players(self) -> None:
"""Invalidate players that did not send a heartbeat message in a while."""
cur_time = time.time()
offline_players = []
for player in self._players.values():
- if cur_time - player._last_message > 30:
+ if cur_time - player.last_message > 30:
offline_players.append(player.player_id)
for player_id in offline_players:
await self.mass.player_manager.async_remove_player(player_id)
self._players.pop(player_id, None)
- async def async_cmd_stop(self, player_id: str):
+ async def async_cmd_stop(self, player_id: str) -> None:
"""Send stop command to player."""
data = {"player_id": player_id, "cmd": "stop"}
self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
- async def async_cmd_play(self, player_id: str):
+ async def async_cmd_play(self, player_id: str) -> None:
"""Send play command to player."""
data = {"player_id": player_id, "cmd": "play"}
self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
data = {"player_id": player_id, "cmd": "pause"}
self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
- async def async_cmd_power_on(self, player_id: str):
+ async def async_cmd_power_on(self, player_id: str) -> None:
"""Send power ON command to player."""
self._players[player_id].powered = True # not supported on webplayer
data = {"player_id": player_id, "cmd": "stop"}
self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
- async def async_cmd_power_off(self, player_id: str):
+ async def async_cmd_power_off(self, player_id: str) -> None:
"""Send power OFF command to player."""
await self.async_cmd_stop(player_id)
self._players[player_id].powered = False
- async def async_cmd_volume_set(self, volume_level, player_id: str):
+ async def async_cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send new volume level command to player."""
data = {
"player_id": player_id,
player.powered = data["powered"]
if "name" in data:
player.name = data["name"]
- player._last_message = time.time()
+ player.last_message = time.time()
self.mass.add_job(self.mass.player_manager.async_update_player(player))
"""Try to parse a bool."""
if isinstance(possible_bool, bool):
return possible_bool
- else:
- return possible_bool in ["true", "True", "1", "on", "ON", 1]
+ return possible_bool in ["true", "True", "1", "on", "ON", 1]
def parse_title_and_version(track_title, track_version=None):
[MASTER]
ignore=tests
+ignore-patterns=app_vars
# Use a conservative default here; 2 should speed up most setups and not hurt
# any too bad. Override on command line as appropriate.
jobs=2
persistent=no
+suggestion-mode=yes
+extension-pkg-whitelist=taglib
[BASIC]
good-names=id,i,j,k,ex,Run,_,fp,T,ev
# inconsistent-return-statements - doesn't handle raise
# too-many-ancestors - it's too strict.
# wrong-import-order - isort guards this
+# fixme - project is in development phase
disable=
format,
abstract-class-little-used,
too-many-statements,
too-many-boolean-expressions,
unused-argument,
- wrong-import-order
+ wrong-import-order,
+ fixme
# enable useless-suppression temporarily every now and then to clean them up
enable=
use-symbolic-message-instead
[REPORTS]
score=no
+[REFACTORING]
+
+# Maximum number of nested blocks for function / method body
+max-nested-blocks=15
+
[TYPECHECK]
# For attrs
ignored-classes=_CountingAttr
E266
[isort]
+profile = black
multi_line_output = 3
include_trailing_comma = True
force_grid_wrap = 0
line_length = 88
[mypy]
-follow_imports = skip
+python_version = 3.7
+ignore_errors = true
+follow_imports = silent
ignore_missing_imports = true
-check_untyped_defs = true
-disallow_incomplete_defs = true
-disallow_untyped_calls = true
-disallow_untyped_defs = true
-warn_return_any = true
-warn_unreachable = true
-warn_unused_ignores = true
warn_incomplete_stub = true
warn_redundant_casts = true
warn_unused_configs = true