from .models.media_types import TrackQuality, MediaType
from .models.playerstate import PlayerState
-
class HTTPStreamer():
''' Built-in streamer using sox and webserver '''
self.mass = mass
self.local_ip = get_ip()
self.analyze_jobs = {}
+ self.stream_clients = []
async def setup(self):
''' async initialize of module '''
- # TODO: cleanup of cache files etc.
pass
+ # self.mass.event_loop.create_task(
+ # asyncio.start_server(self.sockets_streamer, '0.0.0.0', 8093))
+
+ async def webplayer(self, http_request):
+ '''
+ start stream for a player
+ '''
+ from .models import Player
+ player_id = http_request.match_info.get('player_id')
+ player = Player(self.mass, player_id, 'web')
+ player.name = player_id
+ await self.mass.players.add_player(player)
+ # wait for queue
+ while not player.queue.items:
+ await asyncio.sleep(0.2)
+ return await self.stream(http_request)
async def stream(self, http_request):
'''
# make sure we have a valid player
player_id = http_request.match_info.get('player_id','')
player = await self.mass.players.get_player(player_id)
- if not player:
- LOGGER.error("Received stream request for non-existing player %s" %(player_id))
- return
- queue_item_id = http_request.match_info.get('queue_item_id')
- queue_item = await player.queue.by_item_id(queue_item_id)
+ assert(player)
# prepare headers as audio/flac content
- resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'})
+ resp = web.StreamResponse(status=200, reason='OK',
+ headers={'Content-Type': 'audio/flac'})
await resp.prepare(http_request)
# send content only on GET request
- if http_request.method.upper() != 'HEAD':
- # stream audio
- buf_queue = asyncio.Queue()
- cancelled = threading.Event()
- if queue_item:
- # single stream requested, run stream in executor
- bg_task = run_async_background_task(
- None,
- self.__stream_single, player, queue_item, buf_queue, cancelled)
- else:
- # no item is given, start queue stream, run stream in executor
- bg_task = run_async_background_task(
- None,
- self.__stream_queue, player, buf_queue, cancelled)
- try:
- while True:
- chunk = await buf_queue.get()
- if not chunk:
- buf_queue.task_done()
- break
- await resp.write(chunk)
+ if http_request.method.upper() != 'GET':
+ return resp
+ # stream audio
+ buf_queue = asyncio.Queue()
+ cancelled = threading.Event()
+ if player.queue.use_queue_stream:
+ # use queue stream
+ bg_task = run_async_background_task(
+ None,
+ self.__stream_queue, player, buf_queue, cancelled)
+ else:
+ # single track stream
+ queue_item_id = http_request.match_info.get('queue_item_id')
+ queue_item = await player.queue.by_item_id(queue_item_id)
+ assert(queue_item)
+ bg_task = run_async_background_task(
+ None,
+ self.__stream_single, player, queue_item, buf_queue, cancelled)
+ # let the streaming begin!
+ try:
+ while True:
+ chunk = await buf_queue.get()
+ if not chunk:
buf_queue.task_done()
- except (asyncio.CancelledError, asyncio.TimeoutError):
- LOGGER.debug("stream interrupted")
- cancelled.set()
- # wait for bg_task
- await asyncio.gather(bg_task)
- del buf_queue
- raise asyncio.CancelledError()
+ break
+ await resp.write(chunk)
+ buf_queue.task_done()
+ except (asyncio.CancelledError, asyncio.TimeoutError):
+ LOGGER.debug("stream interrupted")
+ cancelled.set()
+ # wait for bg_task to finish
+ await asyncio.gather(bg_task)
+ del buf_queue
+ raise asyncio.CancelledError()
return resp
async def __stream_single(self, player, queue_item, buffer, cancelled):
asyncio.run_coroutine_threadsafe(
self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop)
# send task to background to analyse the audio
- asyncio.ensure_future(self.__analyze_audio(queue_item), loop=self.mass.event_loop)
+ self.mass.event_loop.call_soon_threadsafe(
+ asyncio.ensure_future, self.__analyze_audio(queue_item))
async def __get_player_sox_options(self, player, queue_item):
''' get player specific sox effect options '''
fadeinfile.close()
fadeoutfile.close()
return crossfade_part
+
+ async def start_stream(self, clients_needed):
+ # wait for clients
+ print("wait for clients...")
+ track = asyncio.run_coroutine_threadsafe(
+ self.mass.music.track('2741', provider='database'),
+ self.mass.event_loop).result()
+ player_id = '1523403a-4cc4-f151-29d1-758822807128'
+ player = self.mass.players._players[player_id]
+ cancelled = threading.Event()
+ # wait for clients
+ while len(self.stream_clients) < clients_needed:
+ await asyncio.sleep(0.1)
+ # start streaming
+ while self.stream_clients:
+ audio_stream = self.__get_audio_stream(player, track, cancelled)
+ async for is_last_chunk, audio_chunk in audio_stream:
+ for client in self.stream_clients:
+ try:
+ client.write(audio_chunk)
+ await client.drain()
+ except ConnectionResetError:
+ print('client disconnected')
+ client.close()
+ self.stream_clients.remove(client)
+ await asyncio.sleep(1)
+ print("all clients disconnected")
+ return
+
+ async def add_client(self, client_writer, client_msg):
+ print("new client connected!")
+ for line in client_msg.decode().split('\r\n'):
+ print(line)
+ msg = 'HTTP/1.0 200 OK\r\n'
+ msg += "Content-Type: audio/flac\r\n"
+ msg += "Transfer-Encoding: chunked\r\n\r\n"
+ client_writer.write(msg.encode())
+ await client_writer.drain()
+ self.stream_clients.append(client_writer)
+ if len(self.stream_clients) == 1:
+ bg_task = run_async_background_task(
+ None,
+ self.start_stream, 2)
+
+ async def sockets_streamer(self, reader, writer):
+ while True:
+ request = await reader.read(1024)
+ if request:
+ await self.add_client(writer, request)
+ else:
+ print("client lost")
+ break
+