import argparse
import asyncio
+from cgi import test
import logging
import os
from os.path import abspath, dirname
from music_assistant.mass import MusicAssistant
from music_assistant.models.config import MassConfig, MusicProviderConfig
from music_assistant.models.enums import ProviderType
+from music_assistant.models.player import Player, PlayerState
+from music_assistant.models.player_queue import RepeatMode
# setup logger
logging.basicConfig(
)
)
+class TestPlayer(Player):
+ """Demonstatration player implementation."""
+
+ def __init__(self, player_id: str):
+ """Init."""
+ self.player_id = player_id
+ self._attr_name = player_id
+ self._attr_powered = True
+ self._attr_elapsed_time = 0
+ self._attr_current_url = ""
+ self._attr_state = PlayerState.IDLE
+ self._attr_available = True
+ self._attr_volume_level = 100
+
+ async def play_url(self, url: str) -> None:
+ """Play the specified url on the player."""
+ print(f"stream url: {url}")
+ self._attr_current_url = url
+ self.update_state()
+
+ async def stop(self) -> None:
+ """Send STOP command to player."""
+ print("stop called")
+ self._attr_state = PlayerState.IDLE
+ self._attr_current_url = None
+ self._attr_elapsed_time = 0
+ self.update_state()
+
+ async def play(self) -> None:
+ """Send PLAY/UNPAUSE command to player."""
+ print("play called")
+ self._attr_state = PlayerState.PLAYING
+ self._attr_elapsed_time = 1
+ self.update_state()
+
+ async def pause(self) -> None:
+ """Send PAUSE command to player."""
+ print("pause called")
+ self._attr_state = PlayerState.PAUSED
+ self.update_state()
+
+ async def power(self, powered: bool) -> None:
+ """Send POWER command to player."""
+ print(f"POWER CALLED - new power: {powered}")
+ self._attr_powered = powered
+ self._attr_current_url = None
+ self.update_state()
+
+ async def volume_set(self, volume_level: int) -> None:
+ """Send volume level (0..100) command to player."""
+ print(f"volume_set called - {volume_level}")
+ self._attr_volume_level = volume_level
+ self.update_state()
+
async def main():
"""Handle main execution."""
# get some data
yt = mass.music.get_provider(ProviderType.YTMUSIC)
await yt.get_album("MPREb_9nqEki4ZDpp")
- #await yt.get_track("f3igK4EDUnk")
- await yt.get_track("pE3ju1qS848")
+ track = await yt.get_track("pE3ju1qS848")
+
+ test_player1 = TestPlayer("test1")
+ await mass.players.register_player(test_player1)
+ await test_player1.active_queue.play_media(track)
+
+ await asyncio.sleep(3600)
+
if __name__ == "__main__":
"""YT Music support for MusicAssistant"""
import json
from requests.structures import CaseInsensitiveDict
-from typing import Dict, List, Optional
+from typing import AsyncGenerator, Dict, List, Optional
import ytmusicapi
+import pytube
from music_assistant.models.enums import ProviderType
-from music_assistant.models.errors import LoginFailed, MediaNotFoundError
+from music_assistant.models.errors import MediaNotFoundError
from music_assistant.models.media_items import (
Album,
AlbumType,
Track,
)
from music_assistant.models.music_provider import MusicProvider
+from music_assistant.helpers.audio import (
+ get_http_stream,
+)
YTM_DOMAIN = "https://music.youtube.com"
YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
- signatureTimestamp = ytmusicapi.mixins._utils.get_datestamp() - 1
+ signature_timestamp = ytmusicapi.mixins._utils.get_datestamp() - 1
data = {
"playbackContext": {
"contentPlaybackContext": {
- "signatureTimestamp": signatureTimestamp
+ "signatureTimestamp": signature_timestamp
}
},
"video_id": prov_track_id
}
- track_obj = await self._post_data(f"player", data=data)
+ track_obj = await self._post_data("player", data=data)
return await self._parse_track(track_obj) if track_obj else None
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
+ """Return the content details for the given track when it will be streamed."""
+ signature_timestamp = ytmusicapi.mixins._utils.get_datestamp() - 1
+ data = {
+ "playbackContext": {
+ "contentPlaybackContext": {
+ "signatureTimestamp": signature_timestamp
+ }
+ },
+ "video_id": item_id
+ }
+ track_obj = await self._post_data("player", data=data)
+ stream_format = await self._parse_stream_format(track_obj)
+ url = await self._parse_stream_url(stream_format=stream_format, item_id=item_id)
+
+ return StreamDetails(
+ provider=self.type,
+ item_id=item_id,
+ data=url,
+ content_type=ContentType.AAC
+ )
+
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Return the audio stream for the provider item."""
+ async for chunk in get_http_stream(
+ self.mass, streamdetails.data, streamdetails, seek_position
+ ):
+ yield chunk
+
async def _post_data(self, endpoint: str, data: Dict[str, str], **kwargs):
url = f"{YTM_BASE_URL}{endpoint}"
data.update(self._context)
return headers
-
async def _initialize_context(self) -> Dict[str, str]:
"""Returns a dict to use as a context in requests"""
return {
for k in list(track_obj.keys()):
if k not in keys:
del track_obj[k]
- print(json.dumps(track_obj))
track = Track(
item_id=track_obj["videoDetails"]["videoId"],
provider=self.type,
name=track_obj["videoDetails"]["title"]
)
- return track
\ No newline at end of file
+ return track
+
+ async def _parse_stream_format(self, track_obj: dict) -> dict:
+ """Grabs the highes available audio stream from available streams"""
+ stream_format = None
+
+ for format in track_obj["streamingData"]["adaptiveFormats"]:
+ if format["mimeType"].startswith("audio") and format["audioQuality"] == "AUDIO_QUALITY_HIGH":
+ stream_format = format
+
+ if stream_format is None:
+ raise MediaNotFoundError("No stream found for this track")
+
+ return stream_format
+
+ async def _parse_stream_url(self, stream_format: dict, item_id: str) -> str:
+ """Figures out the stream URL to use based on the YT track object"""
+ cipherParts = dict()
+ for part in stream_format["signatureCipher"].split("&"):
+ k, v = part.split("=", maxsplit=1)
+ cipherParts[k] = v
+
+ signature = await self._decipher_signature(ciphered_signature=cipherParts["s"], item_id=item_id)
+ url = cipherParts["url"] + "&sig=" + signature
+ return url
+
+ async def _decipher_signature(self, ciphered_signature: str, item_id: str):
+ """Decipher the signature, required to build the Stream URL"""
+ embed_url = f"https://www.youtube.com/embed/{item_id}"
+ embed_html = pytube.request.get(embed_url)
+ js_url = pytube.extract.js_url(embed_html)
+ js = pytube.request.get(js_url)
+ cipher = pytube.cipher.Cipher(js=js)
+ return cipher.get_signature(ciphered_signature)