From 30994d3d2344afd49265af286ba68f41201a637a Mon Sep 17 00:00:00 2001 From: aikaterna <20862007+aikaterna@users.noreply.github.com> Date: Fri, 15 Apr 2022 19:49:25 -0700 Subject: [PATCH] [IcyParser] Read ICY 200 OK streams and PLS links --- icyparser/icyparser.py | 232 ++++++++++++++++++++++++++++++++++------- 1 file changed, 192 insertions(+), 40 deletions(-) diff --git a/icyparser/icyparser.py b/icyparser/icyparser.py index fbc0a87..5e5f269 100644 --- a/icyparser/icyparser.py +++ b/icyparser/icyparser.py @@ -1,56 +1,138 @@ -import aiohttp import discord +import io import lavalink +import logging import struct import re +from types import SimpleNamespace +from typing import List, Pattern, Optional +import urllib.error as urllib_error +import urllib.request as urllib_request + from redbot.core import commands +from redbot.core.utils.chat_formatting import pagify +from redbot.core.utils.menus import menu, DEFAULT_CONTROLS + + +log = logging.getLogger("red.aikaterna.icyparser") + + +RUN_ONCE: bool = False +HTML_CLEANUP: Pattern = re.compile("<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});") + + +def nice_to_icy(self): + """ + Converts an Icecast/Shoutcast HTTP v0.9 response of "ICY 200 OK" to "200 OK" thanks to the power of monkeypatching + dingles' answer on: + https://stackoverflow.com/questions/4247248/record-streaming-and-saving-internet-radio-in-python/5465831 + """ + + class InterceptedHTTPResponse: + pass + + line = self.fp.readline().replace(b"ICY 200 OK\r\n", b"HTTP/1.0 200 OK\r\n") + InterceptedSelf = InterceptedHTTPResponse() + InterceptedSelf.fp = io.BufferedReader(io.BytesIO(line)) + InterceptedSelf.debuglevel = self.debuglevel + InterceptedSelf._close_conn = self._close_conn + return ORIGINAL_HTTP_CLIENT_READ_STATUS(InterceptedSelf) + + +if not RUN_ONCE: + ORIGINAL_HTTP_CLIENT_READ_STATUS = urllib_request.http.client.HTTPResponse._read_status + urllib_request.http.client.HTTPResponse._read_status = nice_to_icy + RUN_ONCE = True class IcyParser(commands.Cog): """Icyparser/Shoutcast stream reader.""" async def red_delete_data_for_user(self, **kwargs): - """ Nothing to delete """ + """Nothing to delete.""" return def __init__(self, bot): self.bot = bot - self.session = aiohttp.ClientSession() - async def _icyparser(self, url: str): + async def _icyparser(self, url: Optional[str]) -> Optional[SimpleNamespace]: + """ + Icecast/Shoutcast metadata reader. + """ + # Catch for any playlist reader functions returning None back to the _icyparser function + if not url: + error = SimpleNamespace(error="That url didn't seem to contain any valid Icecast or Shoutcast links.") + return error + + # Fetch the radio url try: - async with self.session.get(url, headers={"Icy-MetaData": "1"}) as resp: - metaint = int(resp.headers["icy-metaint"]) - for _ in range(5): - await resp.content.readexactly(metaint) - metadata_length = struct.unpack("B", await resp.content.readexactly(1))[0] * 16 - metadata = await resp.content.readexactly(metadata_length) - m = re.search(br"StreamTitle='([^']*)';", metadata.rstrip(b"\0")) - if m: - title = m.group(1) - if title: - title = title.decode("utf-8", errors="replace") + request = urllib_request.Request(url, headers={"Icy-MetaData": 1}) + except ValueError: + error = SimpleNamespace( + error="Make sure you are using a full url formatted like `https://www.site.com/stream.mp3`." + ) + return error + + try: + resp = await self.bot.loop.run_in_executor(None, urllib_request.urlopen, request) + except urllib_error.HTTPError as e: + error = SimpleNamespace( + error=f"There was an HTTP error returned while trying to access that url: {e.code} {e.reason}" + ) + return error + except urllib_error.URLError as e: + error = SimpleNamespace(error=f"There was a timeout while trying to access that url.") + return error + except Exception: + log.error(f"Icyparser encountered an unhandled error while trying to read a stream at {url}", exc_info=True) + error = SimpleNamespace(error=f"There was an unexpected error while trying to fetch that url.") + return error + + if url.endswith(".pls"): + url = await self._pls_reader(resp.readlines()) + return await self._icyparser(url) + + metaint = resp.headers.get("icy-metaint", None) + if not metaint: + error = SimpleNamespace( + error=f"The url provided doesn't seem like an Icecast or Shoutcast direct stream link: couldn't read the metadata length." + ) + return error + + # Metadata reading + try: + for _ in range(5): + resp.read(int(metaint)) + metadata_length = struct.unpack("B", resp.read(1))[0] * 16 + metadata = resp.read(metadata_length).rstrip(b"\0") + m = re.search(br"StreamTitle='([^']*)';", metadata) + if m: + title = m.group(1) + if len(title) > 0: + title = title.decode("utf-8", errors="replace") else: title = None - image = False - t = re.search(br"StreamUrl='([^']*)';", metadata.rstrip(b"\0")) - if t: - streamurl = t.group(1) - if streamurl: - streamurl = streamurl.decode("utf-8", errors="replace") - image_ext = ["webp", "png", "jpg", "gif"] - if streamurl.split(".")[-1] in image_ext: - image = True - else: - streamurl = None + else: + title = None - return title, streamurl, image + image = False + t = re.search(br"StreamUrl='([^']*)';", metadata) + if t: + streamurl = t.group(1) + if streamurl: + streamurl = streamurl.decode("utf-8", errors="replace") + image_ext = ["webp", "png", "jpg", "gif"] + if streamurl.split(".")[-1] in image_ext: + image = True + else: + streamurl = None - except (KeyError, aiohttp.client_exceptions.ClientConnectionError, aiohttp.client_exceptions.ClientResponseError): - return None, None, None + radio_obj = SimpleNamespace(title=title, image=streamurl, resp_headers=resp.headers.items()) + return radio_obj - def cog_unload(self): - self.bot.loop.create_task(self.session.close()) + except Exception: + log.error(f"Icyparser encountered an error while trying to read a stream at {url}", exc_info=True) + return None @commands.guild_only() @commands.command(aliases=["icynp"]) @@ -59,7 +141,9 @@ class IcyParser(commands.Cog): if not url: audiocog = self.bot.get_cog("Audio") if not audiocog: - return await ctx.send("Audio is not loaded.") + return await ctx.send( + "The Audio cog is not loaded. Provide a url with this command instead, to read from an online Icecast or Shoutcast stream." + ) try: player = lavalink.get_player(ctx.guild.id) except KeyError: @@ -68,15 +152,83 @@ class IcyParser(commands.Cog): return await ctx.send("The bot is not playing any music.") if not player.current.is_stream: return await ctx.send("The bot is not playing a stream.") - icy = await self._icyparser(player.current.uri) + async with ctx.typing(): + radio_obj = await self._icyparser(player.current.uri) else: - icy = await self._icyparser(url) - if not icy[0]: + async with ctx.typing(): + radio_obj = await self._icyparser(url) + + if not radio_obj: return await ctx.send( - f"Can't read the stream information for <{player.current.uri if not url else url}>, it may not be an Icecast or Shoutcast radio station or there may be no stream information available." + f"Can't read the stream information for <{player.current.uri if not url else url}>, it may not be an Icecast or Shoutcast " + "radio station or there may be no stream information available.\n" + "This command needs a direct link to a MP3 or AAC encoded stream, or a PLS file that contains MP3 or AAC encoded streams." ) - song = f"**[{icy[0]}]({player.current.uri if not url else url})**\n" + + if hasattr(radio_obj, "error"): + return await ctx.send(radio_obj.error) + + embed_menu_list = [] + + # Now Playing embed + title = radio_obj.title if radio_obj.title is not None else "No stream title availible" + song = f"**[{title}]({player.current.uri if not url else url})**\n" embed = discord.Embed(colour=await ctx.embed_colour(), title="Now Playing", description=song) - if icy[2]: - embed.set_thumbnail(url=icy[1]) - await ctx.send(embed=embed) + + # Set radio image if scraped or provided by the Icy headers + if radio_obj.image: + embed.set_thumbnail(url=radio_obj.image) + else: + icylogo = dict(radio_obj.resp_headers).get("icy-logo", None) + if icylogo: + embed.set_thumbnail(url=icylogo) + + # Set radio description if present + radio_station_description = dict(radio_obj.resp_headers).get("icy-description", None) + if radio_station_description == "Unspecified description": + radio_station_description = None + if radio_station_description: + embed.set_footer(text=radio_station_description) + + embed_menu_list.append(embed) + + # Metadata info embed(s) + stream_info_text = "" + sorted_radio_obj_dict = dict(sorted(radio_obj.resp_headers)) + for k, v in sorted_radio_obj_dict.items(): + v = self._clean_html(v) + stream_info_text += f"**{k}**: {v}\n" + + if len(stream_info_text) > 1950: + for page in pagify(stream_info_text, delims=["\n"], page_length=1950): + info_embed = discord.Embed( + colour=await ctx.embed_colour(), title="Radio Station Metadata", description=page + ) + embed_menu_list.append(info_embed) + else: + info_embed = discord.Embed( + colour=await ctx.embed_colour(), title="Radio Station Metadata", description=stream_info_text + ) + embed_menu_list.append(info_embed) + + await menu(ctx, embed_menu_list, DEFAULT_CONTROLS) + + @staticmethod + def _clean_html(html: str) -> str: + """ + Strip out any html, as subtle as a hammer. + """ + plain_text = re.sub(HTML_CLEANUP, "", html) + return plain_text + + @staticmethod + async def _pls_reader(readlines: List[bytes]) -> Optional[str]: + """ + Helper function for a quick and dirty PLS file read. + """ + for text_line in readlines: + text_line_str = text_line.decode() + if text_line_str.startswith("File1="): + return text_line_str[6:] + + return None