continue
if playlist_prov.prov_type.is_file():
# the file provider can handle uri's from all providers so simply add the uri
- track_id_to_add = track.uri
+ track_id_to_add = track_version.url
break
if track_version.prov_id == playlist_prov.prov_id:
track_id_to_add = track_version.item_id
itempath = await self.get_filepath(prov_playlist_id)
if not self.exists(itempath):
raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
- async with self.open_file(itempath, "a") as _file:
+ async with self.open_file(itempath, "r") as _file:
+ cur_data = await _file.read()
+ async with self.open_file(itempath, "w") as _file:
+ await _file.write(cur_data)
for uri in prov_track_ids:
- await _file.writeline(uri)
+ await _file.write(f"\n{uri}")
async def remove_playlist_tracks(
self, prov_playlist_id: str, prov_track_ids: List[str]
) -> None:
"""Remove track(s) from playlist."""
- # TODO !
- if MediaType.PLAYLIST in self.supported_mediatypes:
- raise NotImplementedError
+ itempath = await self.get_filepath(prov_playlist_id)
+ if not self.exists(itempath):
+ raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
+ cur_lines = []
+ async with self.open_file(itempath, "r") as _file:
+ for line in await _file.readlines():
+ line = urllib.parse.unquote(line.strip())
+ if line not in prov_track_ids:
+ cur_lines.append(line)
+ async with self.open_file(itempath, "w") as _file:
+ for uri in cur_lines:
+ await _file.write(f"{uri}\n")
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""