From 4cbce0e538d6d1986266b9c0debcd583863fc958 Mon Sep 17 00:00:00 2001 From: aikaterna <20862007+aikaterna@users.noreply.github.com> Date: Mon, 18 Apr 2022 17:49:03 -0700 Subject: [PATCH] [IcyParser] Switch back to aiohttp and add M3U/8 --- icyparser/icyparser.py | 322 ++++++++++++++++++++++++++--------------- 1 file changed, 205 insertions(+), 117 deletions(-) diff --git a/icyparser/icyparser.py b/icyparser/icyparser.py index 5e5f269..d0174ca 100644 --- a/icyparser/icyparser.py +++ b/icyparser/icyparser.py @@ -1,13 +1,15 @@ +import aiohttp +from aiohttp.client_proto import ResponseHandler +from aiohttp.http_parser import HttpResponseParserPy import discord +import functools import io -import lavalink +from lavalink import get_player, PlayerNotFound import logging import struct import re from types import SimpleNamespace -from typing import List, Pattern, Optional -import urllib.error as urllib_error -import urllib.request as urllib_request +from typing import List, Pattern, Optional, Union from redbot.core import commands from redbot.core.utils.chat_formatting import pagify @@ -17,32 +19,61 @@ from redbot.core.utils.menus import menu, DEFAULT_CONTROLS log = logging.getLogger("red.aikaterna.icyparser") -RUN_ONCE: bool = False HTML_CLEANUP: Pattern = re.compile("<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});") -def nice_to_icy(self): - """ - Converts an Icecast/Shoutcast HTTP v0.9 response of "ICY 200 OK" to "200 OK" thanks to the power of monkeypatching - dingles' answer on: - https://stackoverflow.com/questions/4247248/record-streaming-and-saving-internet-radio-in-python/5465831 - """ - - class InterceptedHTTPResponse: - pass - - line = self.fp.readline().replace(b"ICY 200 OK\r\n", b"HTTP/1.0 200 OK\r\n") - InterceptedSelf = InterceptedHTTPResponse() - InterceptedSelf.fp = io.BufferedReader(io.BytesIO(line)) - InterceptedSelf.debuglevel = self.debuglevel - InterceptedSelf._close_conn = self._close_conn - return ORIGINAL_HTTP_CLIENT_READ_STATUS(InterceptedSelf) +# Now utilizing Jack1142's answer for ICY 200 OK -> 200 OK at +# https://stackoverflow.com/questions/4247248/record-streaming-and-saving-internet-radio-in-python/71890980 -if not RUN_ONCE: - ORIGINAL_HTTP_CLIENT_READ_STATUS = urllib_request.http.client.HTTPResponse._read_status - urllib_request.http.client.HTTPResponse._read_status = nice_to_icy - RUN_ONCE = True +class ICYHttpResponseParser(HttpResponseParserPy): + def parse_message(self, lines): + if lines[0].startswith(b"ICY "): + lines[0] = b"HTTP/1.0 " + lines[0][4:] + return super().parse_message(lines) + + +class ICYResponseHandler(ResponseHandler): + def set_response_params( + self, + *, + timer=None, + skip_payload=False, + read_until_eof=False, + auto_decompress=True, + read_timeout=None, + read_bufsize=2 ** 16, + timeout_ceil_threshold=5, + ) -> None: + # this is a copy of the implementation from here: + # https://github.com/aio-libs/aiohttp/blob/v3.8.1/aiohttp/client_proto.py#L137-L165 + self._skip_payload = skip_payload + + self._read_timeout = read_timeout + self._reschedule_timeout() + + self._timeout_ceil_threshold = timeout_ceil_threshold + + self._parser = ICYHttpResponseParser( + self, + self._loop, + read_bufsize, + timer=timer, + payload_exception=aiohttp.ClientPayloadError, + response_with_body=not skip_payload, + read_until_eof=read_until_eof, + auto_decompress=auto_decompress, + ) + + if self._tail: + data, self._tail = self._tail, b"" + self.data_received(data) + + +class ICYConnector(aiohttp.TCPConnector): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._factory = functools.partial(ICYResponseHandler, loop=self._loop) class IcyParser(commands.Cog): @@ -54,90 +85,23 @@ class IcyParser(commands.Cog): def __init__(self, bot): self.bot = bot + self.timeout = aiohttp.ClientTimeout(total=20) + self.session = session = aiohttp.ClientSession( + connector=ICYConnector(), headers={"Icy-MetaData": "1"}, timeout=self.timeout + ) - async def _icyparser(self, url: Optional[str]) -> Optional[SimpleNamespace]: - """ - Icecast/Shoutcast metadata reader. - """ - # Catch for any playlist reader functions returning None back to the _icyparser function - if not url: - error = SimpleNamespace(error="That url didn't seem to contain any valid Icecast or Shoutcast links.") - return error - - # Fetch the radio url - try: - request = urllib_request.Request(url, headers={"Icy-MetaData": 1}) - except ValueError: - error = SimpleNamespace( - error="Make sure you are using a full url formatted like `https://www.site.com/stream.mp3`." - ) - return error - - try: - resp = await self.bot.loop.run_in_executor(None, urllib_request.urlopen, request) - except urllib_error.HTTPError as e: - error = SimpleNamespace( - error=f"There was an HTTP error returned while trying to access that url: {e.code} {e.reason}" - ) - return error - except urllib_error.URLError as e: - error = SimpleNamespace(error=f"There was a timeout while trying to access that url.") - return error - except Exception: - log.error(f"Icyparser encountered an unhandled error while trying to read a stream at {url}", exc_info=True) - error = SimpleNamespace(error=f"There was an unexpected error while trying to fetch that url.") - return error - - if url.endswith(".pls"): - url = await self._pls_reader(resp.readlines()) - return await self._icyparser(url) - - metaint = resp.headers.get("icy-metaint", None) - if not metaint: - error = SimpleNamespace( - error=f"The url provided doesn't seem like an Icecast or Shoutcast direct stream link: couldn't read the metadata length." - ) - return error - - # Metadata reading - try: - for _ in range(5): - resp.read(int(metaint)) - metadata_length = struct.unpack("B", resp.read(1))[0] * 16 - metadata = resp.read(metadata_length).rstrip(b"\0") - m = re.search(br"StreamTitle='([^']*)';", metadata) - if m: - title = m.group(1) - if len(title) > 0: - title = title.decode("utf-8", errors="replace") - else: - title = None - else: - title = None - - image = False - t = re.search(br"StreamUrl='([^']*)';", metadata) - if t: - streamurl = t.group(1) - if streamurl: - streamurl = streamurl.decode("utf-8", errors="replace") - image_ext = ["webp", "png", "jpg", "gif"] - if streamurl.split(".")[-1] in image_ext: - image = True - else: - streamurl = None - - radio_obj = SimpleNamespace(title=title, image=streamurl, resp_headers=resp.headers.items()) - return radio_obj - - except Exception: - log.error(f"Icyparser encountered an error while trying to read a stream at {url}", exc_info=True) - return None + def cog_unload(self): + self.bot.loop.create_task(self.session.close()) @commands.guild_only() @commands.command(aliases=["icynp"]) async def icyparser(self, ctx, url=None): - """Show Icecast or Shoutcast stream information, if any.""" + """Show Icecast or Shoutcast stream information, if any. + + Supported link formats: + \tDirect links to MP3, AAC, or OGG/Opus encoded Icecast or Shoutcast streams + \tLinks to PLS, M3U, or M3U8 files that contain said stream types + """ if not url: audiocog = self.bot.get_cog("Audio") if not audiocog: @@ -145,33 +109,26 @@ class IcyParser(commands.Cog): "The Audio cog is not loaded. Provide a url with this command instead, to read from an online Icecast or Shoutcast stream." ) try: - player = lavalink.get_player(ctx.guild.id) - except KeyError: + player = get_player(ctx.guild.id) + except PlayerNotFound: return await ctx.send("The bot is not playing any music.") if not player.current: return await ctx.send("The bot is not playing any music.") if not player.current.is_stream: return await ctx.send("The bot is not playing a stream.") async with ctx.typing(): - radio_obj = await self._icyparser(player.current.uri) + radio_obj = await self._icyreader(ctx, player.current.uri) else: async with ctx.typing(): - radio_obj = await self._icyparser(url) + radio_obj = await self._icyreader(ctx, url) if not radio_obj: - return await ctx.send( - f"Can't read the stream information for <{player.current.uri if not url else url}>, it may not be an Icecast or Shoutcast " - "radio station or there may be no stream information available.\n" - "This command needs a direct link to a MP3 or AAC encoded stream, or a PLS file that contains MP3 or AAC encoded streams." - ) - - if hasattr(radio_obj, "error"): - return await ctx.send(radio_obj.error) + return embed_menu_list = [] # Now Playing embed - title = radio_obj.title if radio_obj.title is not None else "No stream title availible" + title = radio_obj.title if radio_obj.title is not None else "No stream title available" song = f"**[{title}]({player.current.uri if not url else url})**\n" embed = discord.Embed(colour=await ctx.embed_colour(), title="Now Playing", description=song) @@ -182,6 +139,10 @@ class IcyParser(commands.Cog): icylogo = dict(radio_obj.resp_headers).get("icy-logo", None) if icylogo: embed.set_thumbnail(url=icylogo) + else: + icyfavicon = dict(radio_obj.resp_headers).get("icy-favicon", None) + if icyfavicon: + embed.set_thumbnail(url=icyfavicon) # Set radio description if present radio_station_description = dict(radio_obj.resp_headers).get("icy-description", None) @@ -213,6 +174,115 @@ class IcyParser(commands.Cog): await menu(ctx, embed_menu_list, DEFAULT_CONTROLS) + async def _icyreader(self, ctx: commands.Context, url: Optional[str]) -> Optional[SimpleNamespace]: + """ + Icecast/Shoutcast stream reader. + """ + try: + extensions = [".pls", ".m3u", ".m3u8"] + if any(url.endswith(x) for x in extensions): + async with self.session.get(url) as resp: + lines = [] + async for line in resp.content: + lines.append(line) + + if url.endswith(".pls"): + url = await self._pls_reader(lines) + else: + url = await self._m3u_reader(lines) + + if url: + await self._icyreader(ctx, url) + else: + await ctx.send("That url didn't seem to contain any valid Icecast or Shoutcast links.") + return + + async with self.session.get(url) as resp: + metaint = await self._metaint_read(ctx, resp) + if metaint: + radio_obj = await self._metadata_read(int(metaint), resp) + return radio_obj + + except aiohttp.client_exceptions.InvalidURL: + await ctx.send(f"{url} is not a valid url.") + return None + except aiohttp.client_exceptions.ClientConnectorError: + await ctx.send("The connection failed.") + return None + except aiohttp.client_exceptions.ClientPayloadError as e: + friendly_msg = "The website closed the connection prematurely or the response was malformed.\n" + friendly_msg += f"The error returned was: `{str(e)}`\n" + await ctx.send(friendly_msg) + return None + except asyncio.exceptions.TimeoutError: + await ctx.send("The bot timed out while trying to access that url.") + return None + except aiohttp.client_exceptions.ServerDisconnectedError: + await ctx.send("The target server disconnected early without a response.") + return None + except Exception: + log.error( + f"Icyparser's _icyreader encountered an error while trying to read a stream at {url}", exc_info=True + ) + return None + + @staticmethod + async def _metaint_read(ctx: commands.Context, resp: aiohttp.client_reqrep.ClientResponse) -> Optional[int]: + """Fetch the metaint value to know how much of the stream header to read, for metadata.""" + metaint = resp.headers.get("icy-metaint", None) + if not metaint: + error_msg = ( + "The url provided doesn't seem like an Icecast or Shoutcast direct stream link, " + "or doesn't contain a supported format stream link: couldn't read the metadata length." + ) + await ctx.send(error_msg) + return None + + try: + metaint = int(metaint) + return metaint + except ValueError: + return None + + @staticmethod + async def _metadata_read(metaint: int, resp: aiohttp.client_reqrep.ClientResponse) -> Optional[SimpleNamespace]: + """Read the metadata at the beginning of the stream chunk.""" + try: + for _ in range(5): + await resp.content.readexactly(metaint) + metadata_length = struct.unpack("B", await resp.content.readexactly(1))[0] * 16 + metadata = await resp.content.readexactly(metadata_length) + m = re.search(br"StreamTitle='([^']*)';", metadata.rstrip(b"\0")) + if m: + title = m.group(1) + if len(title) > 0: + title = title.decode("utf-8", errors="replace") + else: + title = None + else: + title = None + + image = False + t = re.search(br"StreamUrl='([^']*)';", metadata.rstrip(b"\0")) + if t: + streamurl = t.group(1) + if streamurl: + streamurl = streamurl.decode("utf-8", errors="replace") + image_ext = ["webp", "png", "jpg", "gif"] + if streamurl.split(".")[-1] in image_ext: + image = True + else: + streamurl = None + + radio_obj = SimpleNamespace(title=title, image=streamurl, resp_headers=resp.headers.items()) + return radio_obj + + except Exception: + log.error( + f"Icyparser's _metadata_read encountered an error while trying to read a stream at {url}", exc_info=True + ) + return None + @staticmethod def _clean_html(html: str) -> str: """ @@ -221,6 +291,24 @@ class IcyParser(commands.Cog): plain_text = re.sub(HTML_CLEANUP, "", html) return plain_text + @staticmethod + async def _m3u_reader(readlines: List[bytes]) -> Optional[str]: + """ + Helper function for a quick and dirty M3U or M3U8 file read. + M3U8's will most likely contain .ts files, which are not readable by this cog. + + Some M3Us seem to follow the standard M3U format, some only have a bare url in + the file, so let's just return the very first url with an http or https prefix + found, if it's formatted like a real url and not a relative url, and is not a .ts chunk. + """ + for text_line in readlines: + text_line_str = text_line.decode() + if text_line_str.startswith("http"): + if not text_line_str.endswith(".ts"): + return text_line_str + + return None + @staticmethod async def _pls_reader(readlines: List[bytes]) -> Optional[str]: """