[IcyParser] Switch back to aiohttp and add M3U/8

This commit is contained in:
aikaterna
2022-04-18 17:49:03 -07:00
committed by GitHub
parent 30994d3d23
commit 4cbce0e538

View File

@@ -1,13 +1,15 @@
import aiohttp
from aiohttp.client_proto import ResponseHandler
from aiohttp.http_parser import HttpResponseParserPy
import discord
import functools
import io
import lavalink
from lavalink import get_player, PlayerNotFound
import logging
import struct
import re
from types import SimpleNamespace
from typing import List, Pattern, Optional
import urllib.error as urllib_error
import urllib.request as urllib_request
from typing import List, Pattern, Optional, Union
from redbot.core import commands
from redbot.core.utils.chat_formatting import pagify
@@ -17,32 +19,61 @@ from redbot.core.utils.menus import menu, DEFAULT_CONTROLS
# Module-level logger for this cog.
log = logging.getLogger("red.aikaterna.icyparser")
# Guard flag checked before the http.client status-line patch is installed,
# so the patch is applied at most once.
RUN_ONCE: bool = False
# Matches HTML tags and character entities (named, decimal and hex forms);
# used with re.sub to strip markup from text.
HTML_CLEANUP: Pattern = re.compile("<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});")
def nice_to_icy(self):
    """
    Replacement for ``HTTPResponse._read_status`` that tolerates ICY status lines.

    Reads the status line from ``self.fp``, rewrites an Icecast/Shoutcast
    "ICY 200 OK" line into "HTTP/1.0 200 OK", and hands a stand-in object
    carrying the rewritten line to the original ``_read_status``.

    Based on dingles' answer on:
    https://stackoverflow.com/questions/4247248/record-streaming-and-saving-internet-radio-in-python/5465831
    """
    status_line = self.fp.readline()
    rewritten = status_line.replace(b"ICY 200 OK\r\n", b"HTTP/1.0 200 OK\r\n")

    class InterceptedHTTPResponse:
        """Bare attribute holder passed to the original status reader."""

    # The stand-in only carries the attributes copied below.
    shim = InterceptedHTTPResponse()
    shim.fp = io.BufferedReader(io.BytesIO(rewritten))
    shim.debuglevel = self.debuglevel
    shim._close_conn = self._close_conn
    return ORIGINAL_HTTP_CLIENT_READ_STATUS(shim)
# Now utilizing Jack1142's answer for ICY 200 OK -> 200 OK at
# https://stackoverflow.com/questions/4247248/record-streaming-and-saving-internet-radio-in-python/71890980
# Install the ICY status-line shim on http.client exactly once.
if not RUN_ONCE:
    # Capture the stock implementation before it is replaced; nice_to_icy
    # delegates to it after rewriting the status line.
    ORIGINAL_HTTP_CLIENT_READ_STATUS, RUN_ONCE = urllib_request.http.client.HTTPResponse._read_status, True
    urllib_request.http.client.HTTPResponse._read_status = nice_to_icy
class ICYHttpResponseParser(HttpResponseParserPy):
    """HTTP response parser that accepts status lines starting with ``ICY ``."""

    def parse_message(self, lines):
        """Rewrite a leading ``ICY `` status prefix to ``HTTP/1.0 `` and parse as usual.

        ``lines`` is modified in place when the first line carries the ICY prefix.
        """
        icy_prefix = b"ICY "
        status_line = lines[0]
        if status_line.startswith(icy_prefix):
            lines[0] = b"HTTP/1.0 " + status_line[len(icy_prefix):]
        return super().parse_message(lines)
class ICYResponseHandler(ResponseHandler):
    """Response handler that builds an ``ICYHttpResponseParser`` as its parser."""

    def set_response_params(
        self,
        *,
        timer=None,
        skip_payload=False,
        read_until_eof=False,
        auto_decompress=True,
        read_timeout=None,
        read_bufsize=2 ** 16,
        timeout_ceil_threshold=5,
    ) -> None:
        """Store the response parameters and create the ICY-aware parser.

        All arguments are keyword-only. ``skip_payload``, ``read_timeout`` and
        ``timeout_ceil_threshold`` are stored on the handler; the remaining
        arguments are forwarded to ``ICYHttpResponseParser``.
        """
        # this is a copy of the implementation from here:
        # https://github.com/aio-libs/aiohttp/blob/v3.8.1/aiohttp/client_proto.py#L137-L165
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout
        self._reschedule_timeout()

        self._timeout_ceil_threshold = timeout_ceil_threshold

        # The only visible difference from the referenced upstream code is the
        # parser class constructed here.
        self._parser = ICYHttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=aiohttp.ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
        )

        # Feed any buffered bytes in _tail to the new parser and clear the buffer.
        # NOTE(review): presumably _tail holds data received before the parser
        # existed -- confirm against the aiohttp base class.
        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)
class ICYConnector(aiohttp.TCPConnector):
    """TCP connector whose protocol factory produces ``ICYResponseHandler`` objects."""

    def __init__(self, *args, **kwargs):
        """Initialise the base connector, then replace its protocol factory."""
        super().__init__(*args, **kwargs)
        event_loop = self._loop
        self._factory = functools.partial(ICYResponseHandler, loop=event_loop)
class IcyParser(commands.Cog):
@@ -54,90 +85,23 @@ class IcyParser(commands.Cog):
def __init__(self, bot):
    """Keep a reference to the bot and open the shared HTTP session.

    The session uses the ICY-aware connector, sends ``Icy-MetaData: 1`` with
    every request, and applies a 20 second total timeout.
    """
    self.bot = bot
    self.timeout = aiohttp.ClientTimeout(total=20)
    icy_headers = {"Icy-MetaData": "1"}
    self.session = aiohttp.ClientSession(
        connector=ICYConnector(),
        headers=icy_headers,
        timeout=self.timeout,
    )
async def _icyparser(self, url: Optional[str]) -> Optional[SimpleNamespace]:
"""
Icecast/Shoutcast metadata reader.
"""
# Catch for any playlist reader functions returning None back to the _icyparser function
if not url:
error = SimpleNamespace(error="That url didn't seem to contain any valid Icecast or Shoutcast links.")
return error
# Fetch the radio url
try:
request = urllib_request.Request(url, headers={"Icy-MetaData": 1})
except ValueError:
error = SimpleNamespace(
error="Make sure you are using a full url formatted like `https://www.site.com/stream.mp3`."
)
return error
try:
resp = await self.bot.loop.run_in_executor(None, urllib_request.urlopen, request)
except urllib_error.HTTPError as e:
error = SimpleNamespace(
error=f"There was an HTTP error returned while trying to access that url: {e.code} {e.reason}"
)
return error
except urllib_error.URLError as e:
error = SimpleNamespace(error=f"There was a timeout while trying to access that url.")
return error
except Exception:
log.error(f"Icyparser encountered an unhandled error while trying to read a stream at {url}", exc_info=True)
error = SimpleNamespace(error=f"There was an unexpected error while trying to fetch that url.")
return error
if url.endswith(".pls"):
url = await self._pls_reader(resp.readlines())
return await self._icyparser(url)
metaint = resp.headers.get("icy-metaint", None)
if not metaint:
error = SimpleNamespace(
error=f"The url provided doesn't seem like an Icecast or Shoutcast direct stream link: couldn't read the metadata length."
)
return error
# Metadata reading
try:
for _ in range(5):
resp.read(int(metaint))
metadata_length = struct.unpack("B", resp.read(1))[0] * 16
metadata = resp.read(metadata_length).rstrip(b"\0")
m = re.search(br"StreamTitle='([^']*)';", metadata)
if m:
title = m.group(1)
if len(title) > 0:
title = title.decode("utf-8", errors="replace")
else:
title = None
else:
title = None
image = False
t = re.search(br"StreamUrl='([^']*)';", metadata)
if t:
streamurl = t.group(1)
if streamurl:
streamurl = streamurl.decode("utf-8", errors="replace")
image_ext = ["webp", "png", "jpg", "gif"]
if streamurl.split(".")[-1] in image_ext:
image = True
else:
streamurl = None
radio_obj = SimpleNamespace(title=title, image=streamurl, resp_headers=resp.headers.items())
return radio_obj
except Exception:
log.error(f"Icyparser encountered an error while trying to read a stream at {url}", exc_info=True)
return None
def cog_unload(self):
self.bot.loop.create_task(self.session.close())
@commands.guild_only()
@commands.command(aliases=["icynp"])
async def icyparser(self, ctx, url=None):
"""Show Icecast or Shoutcast stream information, if any."""
"""Show Icecast or Shoutcast stream information, if any.
Supported link formats:
\tDirect links to MP3, AAC, or OGG/Opus encoded Icecast or Shoutcast streams
\tLinks to PLS, M3U, or M3U8 files that contain said stream types
"""
if not url:
audiocog = self.bot.get_cog("Audio")
if not audiocog:
@@ -145,33 +109,26 @@ class IcyParser(commands.Cog):
"The Audio cog is not loaded. Provide a url with this command instead, to read from an online Icecast or Shoutcast stream."
)
try:
player = lavalink.get_player(ctx.guild.id)
except KeyError:
player = get_player(ctx.guild.id)
except PlayerNotFound:
return await ctx.send("The bot is not playing any music.")
if not player.current:
return await ctx.send("The bot is not playing any music.")
if not player.current.is_stream:
return await ctx.send("The bot is not playing a stream.")
async with ctx.typing():
radio_obj = await self._icyparser(player.current.uri)
radio_obj = await self._icyreader(ctx, player.current.uri)
else:
async with ctx.typing():
radio_obj = await self._icyparser(url)
radio_obj = await self._icyreader(ctx, url)
if not radio_obj:
return await ctx.send(
f"Can't read the stream information for <{player.current.uri if not url else url}>, it may not be an Icecast or Shoutcast "
"radio station or there may be no stream information available.\n"
"This command needs a direct link to a MP3 or AAC encoded stream, or a PLS file that contains MP3 or AAC encoded streams."
)
if hasattr(radio_obj, "error"):
return await ctx.send(radio_obj.error)
return
embed_menu_list = []
# Now Playing embed
title = radio_obj.title if radio_obj.title is not None else "No stream title availible"
title = radio_obj.title if radio_obj.title is not None else "No stream title available"
song = f"**[{title}]({player.current.uri if not url else url})**\n"
embed = discord.Embed(colour=await ctx.embed_colour(), title="Now Playing", description=song)
@@ -182,6 +139,10 @@ class IcyParser(commands.Cog):
icylogo = dict(radio_obj.resp_headers).get("icy-logo", None)
if icylogo:
embed.set_thumbnail(url=icylogo)
else:
icyfavicon = dict(radio_obj.resp_headers).get("icy-favicon", None)
if icyfavicon:
embed.set_thumbnail(url=icyfavicon)
# Set radio description if present
radio_station_description = dict(radio_obj.resp_headers).get("icy-description", None)
@@ -213,6 +174,115 @@ class IcyParser(commands.Cog):
await menu(ctx, embed_menu_list, DEFAULT_CONTROLS)
async def _icyreader(self, ctx: commands.Context, url: Optional[str]) -> Optional[SimpleNamespace]:
    """
    Icecast/Shoutcast stream reader.

    For a url ending in ``.pls``, ``.m3u`` or ``.m3u8`` the playlist is
    downloaded, the first stream url is extracted and that url is read
    instead. Otherwise the url is opened as a stream and its ICY metadata is
    read.

    Returns the metadata namespace produced by ``_metadata_read``, or ``None``
    when nothing could be read. Known failures are reported to ``ctx``;
    unexpected ones are logged.
    """
    # Imported here so the timeout handler below always has the name in scope.
    import asyncio

    if not url:
        await ctx.send("That url didn't seem to contain any valid Icecast or Shoutcast links.")
        return None

    try:
        playlist_extensions = (".pls", ".m3u", ".m3u8")
        if url.endswith(playlist_extensions):
            async with self.session.get(url) as resp:
                lines = [line async for line in resp.content]

            if url.endswith(".pls"):
                stream_url = await self._pls_reader(lines)
            else:
                stream_url = await self._m3u_reader(lines)

            if not stream_url:
                await ctx.send("That url didn't seem to contain any valid Icecast or Shoutcast links.")
                return None
            # Return the nested result directly so the stream is fetched once
            # and any failure is reported once.
            return await self._icyreader(ctx, stream_url)

        async with self.session.get(url) as resp:
            metaint = await self._metaint_read(ctx, resp)
            if metaint:
                return await self._metadata_read(int(metaint), resp)
            return None

    except aiohttp.client_exceptions.InvalidURL:
        await ctx.send(f"{url} is not a valid url.")
        return None
    except aiohttp.client_exceptions.ClientConnectorError:
        await ctx.send("The connection failed.")
        return None
    except aiohttp.client_exceptions.ClientPayloadError as e:
        friendly_msg = "The website closed the connection prematurely or the response was malformed.\n"
        friendly_msg += f"The error returned was: `{str(e)}`\n"
        await ctx.send(friendly_msg)
        return None
    except asyncio.TimeoutError:
        await ctx.send("The bot timed out while trying to access that url.")
        return None
    except aiohttp.client_exceptions.ServerDisconnectedError:
        await ctx.send("The target server disconnected early without a response.")
        return None
    except Exception:
        log.error(
            f"Icyparser's _icyreader encountered an error while trying to read a stream at {url}", exc_info=True
        )
        return None
@staticmethod
async def _metaint_read(ctx: commands.Context, resp: aiohttp.client_reqrep.ClientResponse) -> Optional[int]:
    """Fetch the metaint value to know how much of the stream to skip before each metadata block.

    Reads the ``icy-metaint`` response header and returns it as a positive
    integer. When the header is missing, not a number, or not positive, an
    explanatory message is sent to ``ctx`` and ``None`` is returned.
    """
    raw_metaint = resp.headers.get("icy-metaint", None)

    metaint = None
    if raw_metaint:
        try:
            metaint = int(raw_metaint)
        except ValueError:
            metaint = None

    # A malformed or non-positive interval is as unusable as a missing one,
    # so the user gets the same explanation instead of silence.
    if metaint is None or metaint <= 0:
        error_msg = (
            "The url provided doesn't seem like an Icecast or Shoutcast direct stream link, "
            "or doesn't contain a supported format stream link: couldn't read the metadata length."
        )
        await ctx.send(error_msg)
        return None

    return metaint
@staticmethod
async def _metadata_read(metaint: int, resp: aiohttp.client_reqrep.ClientResponse) -> Optional[SimpleNamespace]:
    """Read one ICY metadata block from the stream.

    Skips ``metaint`` bytes of stream data, reads the one-byte length prefix
    (counted in 16-byte units) and then the metadata block itself, and
    extracts ``StreamTitle`` and ``StreamUrl`` from it.

    Returns a namespace with ``title`` (str or None), ``image`` (the
    StreamUrl value as str, or None) and ``resp_headers`` (the response
    header items), or ``None`` if reading or parsing fails.
    """
    try:
        # NOTE(review): the return at the end of the body exits on the first
        # pass, so this loop never repeats -- confirm the intent.
        for _ in range(5):
            await resp.content.readexactly(metaint)
            (length_units,) = struct.unpack("B", await resp.content.readexactly(1))
            metadata = (await resp.content.readexactly(length_units * 16)).rstrip(b"\0")

            title = None
            title_match = re.search(br"StreamTitle='([^']*)';", metadata)
            if title_match and title_match.group(1):
                title = title_match.group(1).decode("utf-8", errors="replace")

            # An empty StreamUrl is reported as None rather than empty bytes.
            streamurl = None
            url_match = re.search(br"StreamUrl='([^']*)';", metadata)
            if url_match and url_match.group(1):
                streamurl = url_match.group(1).decode("utf-8", errors="replace")

            return SimpleNamespace(title=title, image=streamurl, resp_headers=resp.headers.items())

    except Exception:
        # Only `resp` is in scope here, so the url is taken from the response.
        log.error(
            f"Icyparser's _metadata_read encountered an error while trying to read a stream at {resp.url}",
            exc_info=True,
        )
        return None
@staticmethod
def _clean_html(html: str) -> str:
"""
@@ -221,6 +291,24 @@ class IcyParser(commands.Cog):
plain_text = re.sub(HTML_CLEANUP, "", html)
return plain_text
@staticmethod
async def _m3u_reader(readlines: List[bytes]) -> Optional[str]:
"""
Helper function for a quick and dirty M3U or M3U8 file read.
M3U8's will most likely contain .ts files, which are not readable by this cog.
Some M3Us seem to follow the standard M3U format, some only have a bare url in
the file, so let's just return the very first url with an http or https prefix
found, if it's formatted like a real url and not a relative url, and is not a .ts chunk.
"""
for text_line in readlines:
text_line_str = text_line.decode()
if text_line_str.startswith("http"):
if not text_line_str.endswith(".ts"):
return text_line_str
return None
@staticmethod
async def _pls_reader(readlines: List[bytes]) -> Optional[str]:
"""