[RSS] Provisional tag qualification
This commit is contained in:
145
rss/rss.py
145
rss/rss.py
@@ -15,7 +15,7 @@ from types import MappingProxyType, SimpleNamespace
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from redbot.core import checks, commands, Config
|
||||
from redbot.core.utils.chat_formatting import bold, box, escape, pagify
|
||||
from redbot.core.utils.chat_formatting import bold, box, escape, humanize_list, pagify
|
||||
|
||||
from .color import Color
|
||||
from .quiet_template import QuietTemplate
|
||||
@@ -25,7 +25,7 @@ from .tag_type import INTERNAL_TAGS, VALID_IMAGES, TagType
|
||||
log = logging.getLogger("red.aikaterna.rss")
|
||||
|
||||
|
||||
__version__ = "1.1.22"
|
||||
__version__ = "1.2.0"
|
||||
|
||||
|
||||
class RSS(commands.Cog):
|
||||
@@ -147,6 +147,9 @@ class RSS(commands.Cog):
|
||||
rss_object[f"{tag_name}_plaintext"] = self._add_generic_html_plaintext(soup)
|
||||
|
||||
if tag_content_check == TagType.LIST:
|
||||
tags_list = []
|
||||
tags_content_counter = 0
|
||||
|
||||
for list_item in tag_content:
|
||||
list_item_check = await self._get_tag_content_type(list_item)
|
||||
|
||||
@@ -171,18 +174,34 @@ class RSS(commands.Cog):
|
||||
except (KeyError, TypeError):
|
||||
pass
|
||||
|
||||
# common "author" tag format
|
||||
list_dict_content_counter = 0
|
||||
if list_item_check == TagType.DICT:
|
||||
list_tags = ["name"]
|
||||
for tag in list_tags:
|
||||
try:
|
||||
list_dict_content_counter += 1
|
||||
name = f"{tag_name}_plaintext{str(list_dict_content_counter).zfill(2)}"
|
||||
rss_object[name] = list_item[tag]
|
||||
rss_object["is_special"].append(name)
|
||||
except (KeyError, TypeError):
|
||||
pass
|
||||
authors_content_counter = 0
|
||||
|
||||
# common "authors" tag format
|
||||
try:
|
||||
authors_content_counter += 1
|
||||
name = f"{tag_name}_plaintext{str(authors_content_counter).zfill(2)}"
|
||||
rss_object[name] = list_item["name"]
|
||||
rss_object["is_special"].append(name)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# common "tags" tag format
|
||||
try:
|
||||
tag = list_item["term"]
|
||||
tags_content_counter += 1
|
||||
name = f"{tag_name}_plaintext{str(tags_content_counter).zfill(2)}"
|
||||
rss_object[name] = tag
|
||||
rss_object["is_special"].append(name)
|
||||
tags_list.append(tag)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if len(tags_list) > 0:
|
||||
rss_object["tags_list"] = tags_list
|
||||
rss_object["tags_plaintext_list"] = humanize_list(tags_list)
|
||||
rss_object["is_special"].append("tags_list")
|
||||
rss_object["is_special"].append("tags_plaintext_list")
|
||||
|
||||
# if media_thumbnail or media_content exists, return the first friendly url
|
||||
try:
|
||||
@@ -383,6 +402,14 @@ class RSS(commands.Cog):
|
||||
return int(entry_time)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
async def _title_case(phrase: str):
|
||||
exceptions = ["a", "and", "in", "of", "or", "on", "the"]
|
||||
lowercase_words = re.split(" ", phrase.lower())
|
||||
final_words = [lowercase_words[0].capitalize()]
|
||||
final_words += [word if word in exceptions else word.capitalize() for word in lowercase_words[1:]]
|
||||
return " ".join(final_words)
|
||||
|
||||
async def _update_last_scraped(
|
||||
self,
|
||||
channel: discord.TextChannel,
|
||||
@@ -768,10 +795,89 @@ class RSS(commands.Cog):
|
||||
else f"[X] Embed hex color:{space*6}{hex_color} ({color_name})"
|
||||
)
|
||||
|
||||
allowed_tags = rss_feed.get("allowed_tags", [])
|
||||
if not allowed_tags:
|
||||
tag_msg = "[ ] No restrictions\n\tAll tags are allowed."
|
||||
else:
|
||||
tag_msg = "[X] Feed is restricted to posts that include:\n\t"
|
||||
for tag in allowed_tags:
|
||||
tag_msg += f"{await self._title_case(tag)}\n\t"
|
||||
|
||||
embed_settings = f"{embed_toggle}\n{embed_color}\n{embed_image}\n{embed_thumbnail}"
|
||||
rss_template = rss_feed["template"].replace("\n", "\\n").replace("\t", "\\t")
|
||||
|
||||
await ctx.send(f"Template for {bold(feed_name)}:\n\n`{rss_template}`\n{box(embed_settings, lang='ini')}")
|
||||
await ctx.send(f"Template for {bold(feed_name)}:\n\n`{rss_template}`\n\n{box(embed_settings, lang='ini')}\n{box(tag_msg, lang='ini')}")
|
||||
|
||||
@rss.group(name="tag")
|
||||
async def _rss_tag(self, ctx):
|
||||
"""RSS post tag qualification."""
|
||||
pass
|
||||
|
||||
@_rss_tag.command(name="allow")
|
||||
async def _rss_tag_allow(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None, *, tag: str = None):
|
||||
"""
|
||||
Set an allowed tag for a feed to be posted. The tag must match exactly (without regard to title casing).
|
||||
No regex or placeholder qualification.
|
||||
|
||||
Tags can be found in `[p]rss listtags` under `$tags` or `$tags_list` (if tags are present in the feed - not all feeds have tags).
|
||||
"""
|
||||
channel = channel or ctx.channel
|
||||
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
|
||||
if not rss_feed:
|
||||
await ctx.send("That feed name doesn't exist in this channel.")
|
||||
return
|
||||
|
||||
async with self.config.channel(channel).feeds() as feed_data:
|
||||
allowed_tags = feed_data[feed_name].get("allowed_tags", [])
|
||||
if tag.lower() in [x.lower() for x in allowed_tags]:
|
||||
return await ctx.send(f"{bold(await self._title_case(tag))} is already in the allowed list for {bold(feed_name)}.")
|
||||
allowed_tags.append(tag.lower())
|
||||
feed_data[feed_name]["allowed_tags"] = allowed_tags
|
||||
|
||||
await ctx.send(f"{bold(await self._title_case(tag))} was added to the list of allowed tags for {bold(feed_name)}. "
|
||||
"If a feed post's `$tags` does not include this value, the feed will not post.")
|
||||
|
||||
@_rss_tag.command(name="allowlist")
|
||||
async def _rss_tag_allowlist(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None):
|
||||
"""
|
||||
List allowed tags for feed post qualification.
|
||||
"""
|
||||
channel = channel or ctx.channel
|
||||
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
|
||||
if not rss_feed:
|
||||
await ctx.send("That feed name doesn't exist in this channel.")
|
||||
return
|
||||
|
||||
msg = f"[ Allowed Tags for {feed_name} ]\n\n\t"
|
||||
allowed_tags = rss_feed.get("allowed_tags", [])
|
||||
if not allowed_tags:
|
||||
msg += "All tags are allowed."
|
||||
else:
|
||||
for tag in allowed_tags:
|
||||
msg += f"{await self._title_case(tag)}\n"
|
||||
|
||||
await ctx.send(box(msg, lang="ini"))
|
||||
|
||||
@_rss_tag.command(name="remove", aliases=["delete"])
|
||||
async def _rss_tag_remove(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None, *, tag: str = None):
|
||||
"""
|
||||
Remove a tag from the allow list. The tag must match exactly (without regard to title casing).
|
||||
No regex or placeholder qualification.
|
||||
"""
|
||||
channel = channel or ctx.channel
|
||||
rss_feed = await self.config.channel(channel).feeds.get_raw(feed_name, default=None)
|
||||
if not rss_feed:
|
||||
await ctx.send("That feed name doesn't exist in this channel.")
|
||||
return
|
||||
|
||||
async with self.config.channel(channel).feeds() as feed_data:
|
||||
allowed_tags = feed_data[feed_name].get("allowed_tags", [])
|
||||
try:
|
||||
allowed_tags.remove(tag.lower())
|
||||
feed_data[feed_name]["allowed_tags"] = allowed_tags
|
||||
await ctx.send(f"{bold(await self._title_case(tag))} was removed from the list of allowed tags for {bold(feed_name)}.")
|
||||
except ValueError:
|
||||
await ctx.send(f"{bold(await self._title_case(tag))} was not found in the allow list for {bold(feed_name)}.")
|
||||
|
||||
@rss.command(name="template")
|
||||
async def _rss_template(self, ctx, feed_name: str, channel: Optional[discord.TextChannel] = None, *, template: str = None):
|
||||
@@ -890,6 +996,17 @@ class RSS(commands.Cog):
|
||||
log.debug(f"No entries found for feed {name} on cid {channel.id}")
|
||||
return
|
||||
|
||||
# allowed tag verification section
|
||||
allowed_tags = rss_feed.get("allowed_tags", [])
|
||||
if len(allowed_tags) > 0:
|
||||
allowed_post_tags = [x.lower() for x in allowed_tags]
|
||||
feed_tag_list = [x.lower() for x in feedparser_plus_obj.get("tags_list", [])]
|
||||
intersection = list(set(feed_tag_list).intersection(allowed_post_tags))
|
||||
if len(intersection) == 0:
|
||||
log.debug(f"{name} feed post in {channel.name} ({channel.id}) was denied because of an allowed tag mismatch.")
|
||||
continue
|
||||
|
||||
# starting to fill out the template for feeds that passed tag verification (if present)
|
||||
to_fill = QuietTemplate(template)
|
||||
message = to_fill.quiet_safe_substitute(name=bold(name), **feedparser_plus_obj)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user