:param media_types: A list of media_types to include.
:param limit: number of items to return in the search (per type).
"""
+ if not media_types:
+ media_types = MediaType.ALL
# Check if the search query is a streaming provider public shareable URL
try:
media_type, provider_instance_id_or_domain, item_id = await parse_uri(
for sublist in zip_longest(*[x.artists for x in results_per_provider])
for item in sublist
if item is not None
- ],
+ ][:limit],
albums=[
item
for sublist in zip_longest(*[x.albums for x in results_per_provider])
for item in sublist
if item is not None
- ],
+ ][:limit],
tracks=[
item
for sublist in zip_longest(*[x.tracks for x in results_per_provider])
for sublist in zip_longest(*[x.playlists for x in results_per_provider])
for item in sublist
if item is not None
- ],
+ ][:limit],
radio=[
item
for sublist in zip_longest(*[x.radio for x in results_per_provider])
for item in sublist
if item is not None
- ],
+ ][:limit],
)
async def search_provider(
self,
search_query: str,
provider_instance_id_or_domain: str,
- media_types: list[MediaType] = MediaType.ALL,
+ media_types: list[MediaType],
limit: int = 10,
) -> SearchResults:
"""Perform search on given provider.
:param provider_instance_id_or_domain: instance_id or domain of the provider
to perform the search on.
:param provider_instance: instance id of the provider to perform the search on.
- :param media_types: A list of media_types to include. All types if None.
+ :param media_types: A list of media_types to include.
:param limit: number of items to return in the search (per type).
"""
prov = self.mass.get_provider(provider_instance_id_or_domain)
)
from music_assistant.common.models.errors import (
AlreadyRegisteredError,
+ PlayerCommandFailed,
PlayerUnavailableError,
ProviderUnavailableError,
UnsupportedFeaturedException,
player = self.get(player_id, True)
if player.announcement_in_progress:
return
+ if not url.startswith("http"):
+ raise PlayerCommandFailed("Only URLs are supported for announcements")
try:
# mark announcement_in_progress on player
player.announcement_in_progress = True
from music_assistant.constants import MASS_LOGGER_NAME
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")
-
+PRIVILEGED = True
DEFAULT_CHUNKSIZE = 64000
async def start(self) -> None:
"""Perform Async init of process."""
- self.proc = await asyncio.create_subprocess_exec(
- *self._args,
- stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
- stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
- stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
- # because we're exchanging big amounts of (audio) data with pipes
- # it makes sense to extend the pipe size and (buffer) limits a bit
- limit=1000000,
- pipesize=1000000,
- )
+ for attempt in range(2):
+ try:
+ self.proc = await asyncio.create_subprocess_exec(
+ *self._args,
+ stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
+ stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
+ stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
+ # because we're exchanging big amounts of (audio) data with pipes
+ # it makes sense to extend the pipe size and (buffer) limits a bit
+ limit=1000000 if attempt == 0 else 65536,
+ pipesize=1000000 if attempt == 0 else -1,
+ )
+ except PermissionError:
+ if attempt > 0:
+ raise
+ LOGGER.error(
+ "Detected that you are running the (docker) container without "
+ "permissive access rights. This will impact performance !"
+ )
+
self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid)
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
await self._apprunner.setup()
# set host to None to bind to all addresses on both IPv4 and IPv6
host = None if bind_ip == "0.0.0.0" else bind_ip
- self._tcp_site = web.TCPSite(self._apprunner, host=host, port=bind_port)
- await self._tcp_site.start()
+ try:
+ self._tcp_site = web.TCPSite(self._apprunner, host=host, port=bind_port)
+ await self._tcp_site.start()
+ except OSError:
+ if host is None:
+ raise
+ # the configured interface is not available, retry on all interfaces
+ self.logger.error(
+ "Could not bind to %s, will start on all interfaces as fallback!", host
+ )
+ self._tcp_site = web.TCPSite(self._apprunner, host=None, port=bind_port)
+ await self._tcp_site.start()
async def close(self) -> None:
"""Cleanup on exit."""
async def search(
self,
search_query: str,
- media_types: list[MediaType] | None = None,
+ media_types: list[MediaType],
limit: int = 5,
) -> SearchResults:
"""Perform search on musicprovider.
:param search_query: Search query.
- :param media_types: A list of media_types to include. All types if None.
+ :param media_types: A list of media_types to include.
:param limit: Number of items to return in the search (per type).
"""
if ProviderFeature.SEARCH in self.supported_features:
return SUPPORTED_FEATURES
async def search(
- self, search_query: str, media_types=list[MediaType] | None, limit: int = 5
+ self, search_query: str, media_types=list[MediaType], limit: int = 5
) -> SearchResults:
"""Perform search on music provider.
:param search_query: Search query.
:param media_types: A list of media_types to include. All types if None.
"""
- # If no media_types are provided, search for all types
- if not media_types:
- media_types = [
- MediaType.ARTIST,
- MediaType.ALBUM,
- MediaType.TRACK,
- MediaType.PLAYLIST,
- ]
-
# Create a task for each media_type
tasks = {}
async def search(\r
self,\r
search_query: str,\r
- media_types: list[MediaType] | None = None,\r
+ media_types: list[MediaType],\r
limit: int = 20,\r
) -> SearchResults:\r
"""Perform search on the plex library.\r
:param media_types: A list of media_types to include. All types if None.\r
:param limit: Number of items to return in the search (per type).\r
"""\r
- if not media_types:\r
- media_types = [MediaType.ARTIST, MediaType.ALBUM, MediaType.TRACK, MediaType.PLAYLIST]\r
-\r
tasks = {}\r
\r
async with TaskGroup() as tg:\r
return await asyncio.to_thread(_get_cover_art)
async def search(
- self, search_query: str, media_types: list[MediaType] | None = None, limit: int = 20
+ self, search_query: str, media_types: list[MediaType], limit: int = 20
) -> SearchResults:
"""Search the sonic library."""
- artists = limit
- albums = limit
- songs = limit
- if media_types:
- if MediaType.ARTIST not in media_types:
- artists = 0
- if MediaType.ALBUM not in media_types:
- albums = 0
- if MediaType.TRACK not in media_types:
- songs = 0
+ artists = limit if MediaType.ARTIST in media_types else 0
+ albums = limit if MediaType.ALBUM in media_types else 0
+ songs = limit if MediaType.TRACK in media_types else 0
+ if not (artists or albums or songs):
+ return SearchResults()
answer = await self._run_async(
self._conn.search3,
query=search_query,
async def search(
self,
search_query: str,
- media_types: list[MediaType] | None = None,
+ media_types: list[MediaType],
limit: int = 20,
) -> SearchResults:
"""Perform search on the plex library.
:param search_query: Search query.
- :param media_types: A list of media_types to include. All types if None.
+ :param media_types: A list of media_types to include.
:param limit: Number of items to return in the search (per type).
"""
- if not media_types:
- media_types = [
- MediaType.ARTIST,
- MediaType.ALBUM,
- MediaType.TRACK,
- MediaType.PLAYLIST,
- ]
-
tasks = {}
async with TaskGroup() as tg:
return SUPPORTED_FEATURES
async def search(
- self, search_query: str, media_types=list[MediaType] | None, limit: int = 5
+ self, search_query: str, media_types=list[MediaType], limit: int = 5
) -> SearchResults:
"""Perform search on musicprovider.
:param limit: Number of items to return in the search (per type).
"""
result = SearchResults()
+ media_types = [
+ x
+ for x in media_types
+ if x in (MediaType.ARTIST, MediaType.ALBUM, MediaType.TRACK, MediaType.PLAYLIST)
+ ]
+ if not media_types:
+ return result
params = {"query": search_query, "limit": limit}
if len(media_types) == 1:
# qobuz does not support multiple searchtypes, falls back to all if no type given
if media_types[0] == MediaType.PLAYLIST:
params["type"] = "playlists"
if searchresult := await self._get_data("catalog/search", **params):
- if "artists" in searchresult:
+ if "artists" in searchresult and MediaType.ARTIST in media_types:
result.artists += [
self._parse_artist(item)
for item in searchresult["artists"]["items"]
if (item and item["id"])
]
- if "albums" in searchresult:
+ if "albums" in searchresult and MediaType.ALBUM in media_types:
result.albums += [
await self._parse_album(item)
for item in searchresult["albums"]["items"]
if (item and item["id"])
]
- if "tracks" in searchresult:
+ if "tracks" in searchresult and MediaType.TRACK in media_types:
result.tracks += [
await self._parse_track(item)
for item in searchresult["tracks"]["items"]
if (item and item["id"])
]
- if "playlists" in searchresult:
+ if "playlists" in searchresult and MediaType.PLAYLIST in media_types:
result.playlists += [
self._parse_playlist(item)
for item in searchresult["playlists"]["items"]
from __future__ import annotations
-from time import time
from typing import TYPE_CHECKING
from radios import FilterBy, Order, RadioBrowser, RadioBrowserError
self.logger.exception("%s", err)
async def search(
- self, search_query: str, media_types=list[MediaType] | None, limit: int = 10
+ self, search_query: str, media_types=list[MediaType], limit: int = 10
) -> SearchResults:
"""Perform search on musicprovider.
:param limit: Number of items to return in the search (per type).
"""
result = SearchResults()
- searchtypes = []
- if MediaType.RADIO in media_types:
- searchtypes.append("radio")
-
- time_start = time()
+ if MediaType.RADIO not in media_types:
+ return result
searchresult = await self.radios.search(name=search_query, limit=limit)
- self.logger.debug(
- "Processing RadioBrowser search took %s seconds",
- round(time() - time_start, 2),
- )
for item in searchresult:
result.radio.append(await self._parse_radio(item))
return await asyncio.to_thread(call, *args, **kwargs)
async def search(
- self, search_query: str, media_types=list[MediaType] | None, limit: int = 10
+ self, search_query: str, media_types=list[MediaType], limit: int = 10
) -> SearchResults:
"""Perform search on musicprovider.
:param search_query: Search query.
- :param media_types: A list of media_types to include. All types if None.
+ :param media_types: A list of media_types to include.
:param limit: Number of items to return in the search (per type).
"""
result = SearchResults()
if MediaType.PLAYLIST in media_types:
searchtypes.append("playlist")
- time_start = time.time()
+ media_types = [
+ x for x in media_types if x in (MediaType.ARTIST, MediaType.TRACK, MediaType.PLAYLIST)
+ ]
+ if not media_types:
+ return result
searchresult = await self._soundcloud.search(search_query, limit)
- self.logger.debug(
- "Processing Soundcloud search took %s seconds",
- round(time.time() - time_start, 2),
- )
-
for item in searchresult["collection"]:
media_type = item["kind"]
if media_type == "user":
- result.artists.append(await self._parse_artist(item))
+ if MediaType.ARTIST in media_types:
+ result.artists.append(await self._parse_artist(item))
elif media_type == "track":
- result.tracks.append(await self._parse_track(item))
+ if MediaType.TRACK in media_types:
+ result.tracks.append(await self._parse_track(item))
elif media_type == "playlist":
- result.playlists.append(await self._parse_playlist(item))
+ if MediaType.PLAYLIST in media_types:
+ result.playlists.append(await self._parse_playlist(item))
return result
)
async def search(
- self, search_query: str, media_types=list[MediaType] | None, limit: int = 5
+ self, search_query: str, media_types=list[MediaType], limit: int = 5
) -> SearchResults:
"""Perform search on musicprovider.
:param search_query: Search query.
- :param media_types: A list of media_types to include. All types if None.
+ :param media_types: A list of media_types to include.
:param limit: Number of items to return in the search (per type).
"""
- result = SearchResults()
+ searchresult = SearchResults()
searchtypes = []
if MediaType.ARTIST in media_types:
searchtypes.append("artist")
searchtypes.append("track")
if MediaType.PLAYLIST in media_types:
searchtypes.append("playlist")
+ if not searchtypes:
+ return searchresult
searchtype = ",".join(searchtypes)
search_query = search_query.replace("'", "")
- searchresult = await self._get_data("search", q=search_query, type=searchtype, limit=limit)
- if "artists" in searchresult:
- result.artists += [
+ api_result = await self._get_data("search", q=search_query, type=searchtype, limit=limit)
+ if "artists" in api_result:
+ searchresult.artists += [
self._parse_artist(item)
- for item in searchresult["artists"]["items"]
+ for item in api_result["artists"]["items"]
if (item and item["id"] and item["name"])
]
- if "albums" in searchresult:
- result.albums += [
+ if "albums" in api_result:
+ searchresult.albums += [
self._parse_album(item)
- for item in searchresult["albums"]["items"]
+ for item in api_result["albums"]["items"]
if (item and item["id"])
]
- if "tracks" in searchresult:
- result.tracks += [
+ if "tracks" in api_result:
+ searchresult.tracks += [
self._parse_track(item)
- for item in searchresult["tracks"]["items"]
+ for item in api_result["tracks"]["items"]
if (item and item["id"])
]
- if "playlists" in searchresult:
- result.playlists += [
+ if "playlists" in api_result:
+ searchresult.playlists += [
self._parse_playlist(item)
- for item in searchresult["playlists"]["items"]
+ for item in api_result["playlists"]["items"]
if (item and item["id"])
]
- return result
+ return searchresult
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve library artists from spotify."""
async def search(
self,
search_query: str,
- media_types: list[MediaType] | None = None,
+ media_types: list[MediaType],
limit: int = 5,
) -> SearchResults:
"""Perform search on musicprovider.
:param search_query: Search query.
- :param media_types: A list of media_types to include. All types if None.
+ :param media_types: A list of media_types to include.
:param limit: Number of items to return in the search (per type).
"""
+ parsed_results = SearchResults()
+ media_types = [
+ x
+ for x in media_types
+ if x in (MediaType.ARTIST, MediaType.ALBUM, MediaType.TRACK, MediaType.PLAYLIST)
+ ]
+ if not media_types:
+ return parsed_results
+
tidal_session = await self._get_tidal_session()
search_query = search_query.replace("'", "")
results = await search(tidal_session, search_query, media_types, limit)
- parsed_results = SearchResults()
+
if results["artists"]:
for artist in results["artists"]:
parsed_results.artists.append(self._parse_artist(artist))
async def search(
session: TidalSession,
query: str,
- media_types: list[MediaType] | None = None,
+ media_types: list[MediaType],
limit: int = 50,
offset: int = 0,
) -> dict[str, str]:
def inner() -> dict[str, str]:
search_types = []
- if media_types and MediaType.ARTIST in media_types:
+ if MediaType.ARTIST in media_types:
search_types.append(TidalArtist)
- if media_types and MediaType.ALBUM in media_types:
+ if MediaType.ALBUM in media_types:
search_types.append(TidalAlbum)
- if media_types and MediaType.TRACK in media_types:
+ if MediaType.TRACK in media_types:
search_types.append(TidalTrack)
- if media_types and MediaType.PLAYLIST in media_types:
+ if MediaType.PLAYLIST in media_types:
search_types.append(TidalPlaylist)
- models = search_types if search_types else None
+ models = search_types
results: dict[str, str] = session.search(query, models, limit, offset)
return results
return SUPPORTED_FEATURES
async def search(
- self, search_query: str, media_types=list[MediaType] | None, limit: int = 5
+ self, search_query: str, media_types=list[MediaType], limit: int = 5
) -> SearchResults:
"""Perform search on musicprovider.
:param media_types: A list of media_types to include. All types if None.
:param limit: Number of items to return in the search (per type).
"""
+ parsed_results = SearchResults()
ytm_filter = None
if len(media_types) == 1:
# YTM does not support multiple searchtypes, falls back to all if no type given
ytm_filter = "songs"
if media_types[0] == MediaType.PLAYLIST:
ytm_filter = "playlists"
+ if media_types[0] == MediaType.RADIO:
+ # bit of an edge case but still good to handle
+ return parsed_results
results = await search(
query=search_query, ytm_filter=ytm_filter, limit=limit, language=self.language
)
parsed_results = SearchResults()
for result in results:
try:
- if result["resultType"] == "artist":
+ if result["resultType"] == "artist" and MediaType.ARTIST in media_types:
parsed_results.artists.append(await self._parse_artist(result))
- elif result["resultType"] == "album":
+ elif result["resultType"] == "album" and MediaType.ALBUM in media_types:
parsed_results.albums.append(await self._parse_album(result))
- elif result["resultType"] == "playlist":
+ elif result["resultType"] == "playlist" and MediaType.PLAYLIST in media_types:
parsed_results.playlists.append(await self._parse_playlist(result))
- elif result["resultType"] in ("song", "video") and (
- track := await self._parse_track(result)
+ elif (
+ result["resultType"] in ("song", "video")
+ and MediaType.TRACK in media_types
+ and (track := await self._parse_track(result))
):
parsed_results.tracks.append(track)
except InvalidDataError:
"asyncio-throttle==1.0.2",
"aiofiles==23.2.1",
"aiorun==2023.7.2",
+ "certifi==2024.2.2",
"colorlog==6.8.2",
"aiosqlite==0.20.0",
"python-slugify==8.0.4",
"memory-tempfile==2.2.3",
"music-assistant-frontend==2.5.2",
"pillow==10.3.0",
- "pyatv==0.14.5",
- "soundcloudpy==0.1.0",
"unidecode==1.3.8",
"xmltodict==0.13.0",
"orjson==3.10.3",
"ifaddr==0.2.0",
]
test = [
- "black==24.4.2",
"codespell==2.2.6",
"isort==5.13.2",
"mypy==1.10.0",
"pytest==8.2.0",
"pytest-aiohttp==1.0.5",
"pytest-cov==5.0.0",
+ "tomli==2.0.1",
"ruff==0.4.3",
]
aiosqlite==0.20.0
async-upnp-client==0.38.3
asyncio-throttle==1.0.2
+certifi==2024.2.2
colorlog==6.8.2
cryptography==42.0.7
deezer-python-async==0.3.0
pillow==10.3.0
plexapi==4.15.12
py-opensonic==5.0.5
-pyatv==0.14.5
PyChromecast==14.0.1
pycryptodome==3.20.0
python-fullykiosk==0.0.12