[RSS] added support for targeted channels (#158)

* rss support for targeted channels

* process review

* remove style and revert black

* Check for permissions

Co-authored-by: aikaterna <20862007+aikaterna@users.noreply.github.com>
This commit is contained in:
Jyu Viole Grace
2020-09-28 02:24:02 +05:30
committed by GitHub
parent c4492abbe8
commit 2bfd00236a

View File

@@ -10,6 +10,7 @@ import io
import logging
import re
import time
from typing import Optional
from types import MappingProxyType, SimpleNamespace
from urllib.parse import urlparse
@@ -24,7 +25,7 @@ from .tag_type import INTERNAL_TAGS, VALID_IMAGES, TagType
log = logging.getLogger("red.aikaterna.rss")
__version__ = "1.1.9"
__version__ = "1.1.10"
class RSS(commands.Cog):
@@ -61,9 +62,9 @@ class RSS(commands.Cog):
rss_object["is_special"].append(tag_name)
return rss_object
async def _add_feed(self, ctx, feed_name: str, url: str):
async def _add_feed(self, ctx, feed_name: str, channel: discord.TextChannel, url: str):
"""Helper for rss add."""
rss_exists = await self._check_feed_existing(ctx, feed_name)
rss_exists = await self._check_feed_existing(ctx, feed_name, channel)
if not rss_exists:
feedparser_obj = await self._fetch_feedparser_object(url)
if not feedparser_obj:
@@ -72,15 +73,16 @@ class RSS(commands.Cog):
feedparser_plus_obj = await self._add_to_feedparser_object(feedparser_obj[0], url)
rss_object = await self._convert_feedparser_to_rssfeed(feed_name, feedparser_plus_obj, url)
async with self.config.channel(ctx.channel).feeds() as feed_data:
async with self.config.channel(channel).feeds() as feed_data:
feed_data[feed_name] = rss_object.to_json()
msg = (
f'Feed "{feed_name}" added. List the template tags with `{ctx.prefix}rss listtags` '
f"Feed `{feed_name}` added in channel: {channel.mention}\n"
f"List the template tags with `{ctx.prefix}rss listtags` "
f"and modify the template using `{ctx.prefix}rss template`."
)
await ctx.send(msg)
else:
await ctx.send(f"There is already an existing feed named {bold(feed_name)}.")
await ctx.send(f"There is already an existing feed named {bold(feed_name)} in {channel.mention}.")
return
def _add_generic_html_plaintext(self, bs4_soup: BeautifulSoup):
@@ -161,29 +163,48 @@ class RSS(commands.Cog):
return rss_object
async def _check_feed_existing(self, ctx, feed_name: str):
async def _check_channel_permissions(self, ctx, channel: discord.TextChannel, addl_send_messages_check=True):
"""Helper for rss functions."""
rss_feed = await self.config.channel(ctx.channel).feeds.get_raw(feed_name, default=None)
if not channel.permissions_for(ctx.me).read_messages:
await ctx.send("I don't have permissions to read that channel.")
return False
elif not channel.permissions_for(ctx.author).read_messages:
await ctx.send("You don't have permissions to read that channel.")
return False
elif addl_send_messages_check:
# check for send messages perm if needed, like on an rss add
# not needed on something like rss delete
if not channel.permissions_for(ctx.me).send_messages:
await ctx.send("I don't have permissions to send messages in that channel.")
return False
else:
return True
else:
return True
async def _check_feed_existing(self, ctx, feed_name: str, channel: discord.TextChannel):
"""Helper for rss functions."""
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
if not rss_feed:
return False
return True
async def _delete_feed(self, ctx, feed_name: str):
async def _delete_feed(self, ctx, feed_name: str, channel: discord.TextChannel):
"""Helper for rss delete."""
rss_exists = await self._check_feed_existing(ctx, feed_name)
rss_exists = await self._check_feed_existing(ctx, feed_name, channel)
if rss_exists:
async with self.config.channel(ctx.channel).feeds() as rss_data:
async with self.config.channel(channel).feeds() as rss_data:
rss_data.pop(feed_name, None)
return True
return False
async def _edit_template(self, ctx, feed_name: str, template: str):
async def _edit_template(self, ctx, feed_name: str, channel: discord.TextChannel, template: str):
"""Helper for rss template."""
rss_exists = await self._check_feed_existing(ctx, feed_name)
rss_exists = await self._check_feed_existing(ctx, feed_name, channel)
if rss_exists:
async with self.config.channel(ctx.channel).feeds.all() as feed_data:
async with self.config.channel(channel).feeds.all() as feed_data:
if feed_name not in feed_data:
feed_data[feed_name] = {}
feed_data[feed_name]["template"] = template
@@ -316,8 +337,8 @@ class RSS(commands.Cog):
"""Helper for rss add."""
try:
result = urlparse(url)
except:
log.debug(f"failed to resolve {url}")
except Exception as e:
log.exception(e, exc_info=e)
return False
if all([result.scheme, result.netloc, result.path]):
@@ -359,13 +380,27 @@ class RSS(commands.Cog):
pass
@rss.command(name="add")
async def _rss_add(self, ctx, feed_name: str, url: str):
"""Add an RSS feed to the current channel."""
async def _rss_add(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None, url: str = None):
"""
Add an RSS feed to a channel.
Defaults to the current channel if no channel is specified.
"""
no_url = "Invalid or unavailable URL."
if not url:
await ctx.send(no_url)
return
channel = channel or ctx.channel
channel_permission_check = await self._check_channel_permissions(ctx, channel)
if not channel_permission_check:
return
valid_url = await self._valid_url(url)
if valid_url:
await self._add_feed(ctx, feed_name.lower(), url)
await self._add_feed(ctx, feed_name.lower(), channel, url)
else:
await ctx.send("Invalid or unavailable URL.")
await ctx.send(no_url)
@rss.group(name="embed")
async def _rss_embed(self, ctx):
@@ -373,7 +408,7 @@ class RSS(commands.Cog):
pass
@_rss_embed.command(name="color", aliases=["colour"])
async def _rss_embed_color(self, ctx, feed_name: str, *, color: str):
async def _rss_embed_color(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None, *, color: str = None):
"""
Set an embed color for a feed.
@@ -381,7 +416,8 @@ class RSS(commands.Cog):
`color` must be a hex code like #990000, a [Discord color name](https://discordpy.readthedocs.io/en/latest/api.html#colour),
or a [CSS3 color name](https://www.w3.org/TR/2018/REC-css-color-3-20180619/#svg-color).
"""
rss_feed = await self.config.channel(ctx.channel).feeds.get_raw(feed_name, default=None)
channel = channel or ctx.channel
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
if not rss_feed:
await ctx.send("That feed name doesn't exist in this channel.")
return
@@ -395,7 +431,7 @@ class RSS(commands.Cog):
)
if not color:
async with self.config.channel(ctx.channel).feeds() as feed_data:
async with self.config.channel(channel).feeds() as feed_data:
feed_data[feed_name]["embed_color"] = None
await ctx.send(
f"{embed_state_message}The color for {bold(feed_name)} has been reset. "
@@ -421,21 +457,22 @@ class RSS(commands.Cog):
if hex_code == "0xFFFFFF":
hex_code = "0xFFFFFE"
async with self.config.channel(ctx.channel).feeds() as feed_data:
async with self.config.channel(channel).feeds() as feed_data:
# data is always a 0xFFFFFF style value
feed_data[feed_name]["embed_color"] = hex_code
await ctx.send(f"Embed color for {bold(feed_name)} set to {user_facing_hex} ({color_name}).")
@_rss_embed.command(name="image")
async def _rss_embed_image(self, ctx, feed_name: str, image_tag_name: str = None):
async def _rss_embed_image(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None, image_tag_name: str = None):
"""
Set a tag to be a large embed image.
This image will be applied to the last embed in the paginated list.
Use this command with no image_tag_name to clear the embed image.
"""
rss_feed = await self.config.channel(ctx.channel).feeds.get_raw(feed_name, default=None)
channel = channel or ctx.channel
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
if not rss_feed:
await ctx.send("That feed name doesn't exist in this channel.")
return
@@ -452,7 +489,7 @@ class RSS(commands.Cog):
if image_tag_name.startswith("$"):
image_tag_name = image_tag_name.strip("$")
async with self.config.channel(ctx.channel).feeds() as feed_data:
async with self.config.channel(channel).feeds() as feed_data:
feed_data[feed_name]["embed_image"] = image_tag_name
if image_tag_name:
@@ -463,14 +500,15 @@ class RSS(commands.Cog):
)
@_rss_embed.command(name="thumbnail")
async def _rss_embed_thumbnail(self, ctx, feed_name: str, thumbnail_tag_name: str = None):
async def _rss_embed_thumbnail(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None, thumbnail_tag_name: str = None):
"""
Set a tag to be a thumbnail image.
This thumbnail will be applied to the first embed in the paginated list.
Use this command with no thumbnail_tag_name to clear the embed thumbnail.
"""
rss_feed = await self.config.channel(ctx.channel).feeds.get_raw(feed_name, default=None)
channel = channel or ctx.channel
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
if not rss_feed:
await ctx.send("That feed name doesn't exist in this channel.")
return
@@ -487,7 +525,7 @@ class RSS(commands.Cog):
if thumbnail_tag_name.startswith("$"):
thumbnail_tag_name = thumbnail_tag_name.strip("$")
async with self.config.channel(ctx.channel).feeds() as feed_data:
async with self.config.channel(channel).feeds() as feed_data:
feed_data[feed_name]["embed_thumbnail"] = thumbnail_tag_name
if thumbnail_tag_name:
@@ -499,15 +537,16 @@ class RSS(commands.Cog):
)
@_rss_embed.command(name="toggle")
async def _rss_embed_toggle(self, ctx, feed_name: str):
async def _rss_embed_toggle(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None):
"""
Toggle whether a feed is sent in an embed or not.
If the bot doesn't have permissions to post embeds,
the feed will always be plain text, even if the embed
toggle is set.
"""
rss_feed = await self.config.channel(ctx.channel).feeds.get_raw(feed_name, default=None)
channel = channel or ctx.channel
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
if not rss_feed:
await ctx.send("That feed name doesn't exist in this channel.")
return
@@ -515,33 +554,41 @@ class RSS(commands.Cog):
embed_toggle = rss_feed["embed"]
toggle_text = "disabled" if embed_toggle else "enabled"
async with self.config.channel(ctx.channel).feeds() as feed_data:
async with self.config.channel(channel).feeds() as feed_data:
feed_data[feed_name]["embed"] = not embed_toggle
await ctx.send(f"Embeds for {bold(feed_name)} are {toggle_text}.")
@rss.command(name="force")
async def _rss_force(self, ctx, feed_name: str):
async def _rss_force(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None):
"""Forces a feed alert."""
channel = channel or ctx.channel
channel_permission_check = await self._check_channel_permissions(ctx, channel)
if not channel_permission_check:
return
feeds = await self.config.all_channels()
try:
feeds[ctx.channel.id]
feeds[channel.id]
except KeyError:
await ctx.send("There are no feeds in this channel.")
return
if feed_name not in feeds[ctx.channel.id]["feeds"]:
if feed_name not in feeds[channel.id]["feeds"]:
await ctx.send("That feed name doesn't exist in this channel.")
return
rss_feed = feeds[ctx.channel.id]["feeds"][feed_name]
await self.get_current_feed(ctx.channel, feed_name, rss_feed, force=True)
rss_feed = feeds[channel.id]["feeds"][feed_name]
await self.get_current_feed(channel, feed_name, rss_feed, force=True)
@rss.command(name="list")
async def _rss_list(self, ctx, channel: discord.TextChannel = None):
"""List currently available feeds for this channel, or a specific channel."""
if not channel:
channel = ctx.channel
"""List saved feeds for this channel or a specific channel."""
channel = channel or ctx.channel
channel_permission_check = await self._check_channel_permissions(ctx, channel)
if not channel_permission_check:
return
feeds = await self._get_feed_names(channel)
msg = f"[ Available Feeds for #{channel.name} ]\n\n\t"
if feeds:
@@ -552,9 +599,14 @@ class RSS(commands.Cog):
await ctx.send(box(page, lang="ini"))
@rss.command(name="listtags")
async def _rss_list_tags(self, ctx, feed_name: str):
async def _rss_list_tags(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None):
"""List the tags available from a specific feed."""
rss_feed = await self.config.channel(ctx.channel).feeds.get_raw(feed_name, default=None)
channel = channel or ctx.channel
channel_permission_check = await self._check_channel_permissions(ctx, channel)
if not channel_permission_check:
return
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
if not rss_feed:
await ctx.send("No feed with that name in this channel.")
@@ -594,19 +646,32 @@ class RSS(commands.Cog):
await ctx.send(box(msg, lang="ini"))
@rss.command(name="remove", aliases=["delete", "del"])
async def _rss_remove(self, ctx, name: str):
"""Removes a feed from this channel."""
success = await self._delete_feed(ctx, name)
async def _rss_remove(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None):
"""
Removes a feed from a channel.
Defaults to the current channel if no channel is specified.
"""
channel = channel or ctx.channel
channel_permission_check = await self._check_channel_permissions(ctx, channel, addl_send_messages_check=False)
if not channel_permission_check:
return
success = await self._delete_feed(ctx, feed_name, channel)
if success:
await ctx.send("Feed deleted.")
else:
await ctx.send("Feed not found!")
@rss.command(name="showtemplate")
async def _rss_show_template(self, ctx, feed_name: str):
async def _rss_show_template(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None):
"""Show the template in use for a specific feed."""
rss_feed = await self.config.channel(ctx.channel).feeds.get_raw(feed_name, default=None)
channel = channel or ctx.channel
channel_permission_check = await self._check_channel_permissions(ctx, channel)
if not channel_permission_check:
return
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
if not rss_feed:
await ctx.send("No feed with that name in this channel.")
return
@@ -639,15 +704,22 @@ class RSS(commands.Cog):
await ctx.send(f"Template for {bold(feed_name)}:\n\n`{rss_template}`\n{box(embed_settings, lang='ini')}")
@rss.command(name="template")
async def _rss_template(self, ctx, feed_name: str, *, template: str):
async def _rss_template(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None, *, template: str = None):
"""
Set a template for the feed alert.
Each variable must start with $, valid variables can be found with `[p]rss listtags`.
"""
channel = channel or ctx.channel
channel_permission_check = await self._check_channel_permissions(ctx, channel)
if not channel_permission_check:
return
if not template:
await ctx.send_help()
return
template = template.replace("\\t", "\t")
template = template.replace("\\n", "\n")
success = await self._edit_template(ctx, feed_name, template)
success = await self._edit_template(ctx, feed_name, channel, template)
if success:
await ctx.send("Template added successfully.")
else:
@@ -703,7 +775,7 @@ class RSS(commands.Cog):
if (last_title != entry.title) and (last_link != entry.link) and (last_time < entry_time):
log.debug(f"New entry found via time validation for feed {name} on cid {channel.id}")
feedparser_plus_obj = await self._add_to_feedparser_object(entry, url)
feedparser_plus_objects.append(feedparser_plus_obj)
feedparser_plus_objects.append(feedparser_plus_obj)
# this is a post that has no time information attached to it and we can only
# verify that the title and link did not match the previously posted entry