550 lines
23 KiB
Python
550 lines
23 KiB
Python
import asyncio
|
|
import copy
|
|
import datetime
|
|
import logging
|
|
from typing import Type, Dict, List
|
|
|
|
import aiohttp
|
|
from interactions import (
|
|
Extension,
|
|
SlashContext,
|
|
component_callback,
|
|
ComponentContext,
|
|
StringSelectMenu,
|
|
Message,
|
|
Embed,
|
|
EmbedField,
|
|
is_owner,
|
|
check,
|
|
StringSelectOption,
|
|
)
|
|
from interactions import Task, IntervalTrigger
|
|
from interactions import slash_command, listen
|
|
from interactions.api.events import Component
|
|
from interactions.api.events import Startup
|
|
|
|
from token_bot.controller.alerts import AlertsController
|
|
from token_bot.controller.users import UsersController
|
|
from token_bot.history_manager.history_manager import HistoryManager, History
|
|
from token_bot.persistant_database.alert_category import AlertCategory
|
|
from token_bot.persistant_database.alert_schema import Alert
|
|
from token_bot.persistant_database.alert_type import AlertType
|
|
from token_bot.persistant_database.user_schema import User
|
|
from token_bot.token_database import database as tdb
|
|
from token_bot.token_database.flavor import Flavor
|
|
from token_bot.token_database.region import Region
|
|
from token_bot.ui.action_row.tracker import ALERT_TYPE_ROW
|
|
from token_bot.ui.select_menus.alert_menu import HIGH_ALERT_MENU, LOW_ALERT_MENU
|
|
from token_bot.ui.select_menus.flavor_menu import FLAVOR_MENU
|
|
from token_bot.ui.select_menus.region_menu import REGION_MENU
|
|
|
|
|
|
#### Static Helper Functions
|
|
|
|
|
|
async def gather_alerts_by_flavor(alerts: List[Alert]) -> Dict[Flavor, List[Alert]]:
|
|
alerts_by_flavor = {}
|
|
for alert in alerts:
|
|
if alert.flavor not in alerts_by_flavor:
|
|
alerts_by_flavor[alert.flavor] = [alert]
|
|
else:
|
|
alerts_by_flavor[alert.flavor].append(alert)
|
|
return alerts_by_flavor
|
|
|
|
|
|
class Tracker(Extension):
|
|
def __init__(self, bot):
|
|
self._users: UsersController | None = None
|
|
self._alerts: AlertsController | None = None
|
|
self._tdb: tdb.Database | None = None
|
|
self._history_manager: HistoryManager | None = None
|
|
|
|
###################################
|
|
# Task Functions #
|
|
###################################
|
|
|
|
@Task.create(IntervalTrigger(minutes=1))
|
|
async def update_data(self):
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Updating Price")
|
|
users_alerts: Dict[User, List[Alert]] = {}
|
|
for flavor in Flavor:
|
|
for region in Region:
|
|
alerts = await self._history_manager.update_data(flavor, Region(region))
|
|
for alert in alerts:
|
|
users = await self._alerts.get_users(alert)
|
|
for user in users:
|
|
if user not in users_alerts:
|
|
users_alerts[user] = [alert]
|
|
else:
|
|
users_alerts[user].append(alert)
|
|
if users_alerts:
|
|
self.bot.logger.log(
|
|
logging.INFO, "TokenBot Tracker: Processing User Alerts"
|
|
)
|
|
for user in users_alerts:
|
|
discord_user = await self.bot.fetch_user(user.user_id)
|
|
embeds = [
|
|
Embed(
|
|
title="GoblinBot Tracker Alert Triggered",
|
|
color=0xB10000,
|
|
description=f"Hello, you requested to be sent an alert when the price of the World of Warcraft "
|
|
f"token reaches a certain value.\n\n"
|
|
f"As a reminder, you can remove an alert via ```/remove-alert```\n"
|
|
f"or you can remove all alerts and user data via ```/remove-registration```\n\n",
|
|
)
|
|
]
|
|
alerts_by_flavor = await gather_alerts_by_flavor(users_alerts[user])
|
|
for flavor in alerts_by_flavor:
|
|
embeds.append(
|
|
await self._render_alert_flavor(
|
|
alerts_by_flavor[flavor], user=user
|
|
)
|
|
)
|
|
|
|
await discord_user.send(embeds=embeds)
|
|
self.bot.logger.log(
|
|
logging.INFO, "TokenBot Tracker: Done Processing User Alerts"
|
|
)
|
|
|
|
###################################
|
|
# Slash Commands #
|
|
###################################
|
|
|
|
@listen(Startup)
|
|
async def on_start(self):
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Initializing")
|
|
self._users = UsersController(aiohttp.ClientSession())
|
|
self._alerts = AlertsController(aiohttp.ClientSession())
|
|
self._tdb = tdb.Database(aiohttp.ClientSession())
|
|
self._history_manager = HistoryManager(self._tdb)
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Initialized")
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Loading Historical Data")
|
|
await self._history_manager.load_data()
|
|
self.bot.logger.log(
|
|
logging.INFO, "TokenBot Tracker: Loading Historical Data Finished"
|
|
)
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Started")
|
|
self.update_data.start()
|
|
|
|
@slash_command(
|
|
name="register",
|
|
description="Register with a new GoblinBot Region for alerts on token price changes.",
|
|
)
|
|
async def register(self, ctx: SlashContext):
|
|
text = (
|
|
"## Select a region to register with \n\n"
|
|
"Please note: \n"
|
|
"* You can only be registered with one region at a time \n"
|
|
"* Changing your region will remove all previous alerts you have signed up for \n"
|
|
"* You can remove all alerts and user data using ```/remove-registration```"
|
|
)
|
|
menu = copy.deepcopy(REGION_MENU)
|
|
await ctx.send(text, components=menu, ephemeral=True)
|
|
|
|
@slash_command(
|
|
name="remove-registration",
|
|
description="Remove all alerts and registration from GoblinBot",
|
|
)
|
|
async def remove_registration(self, ctx: SlashContext):
|
|
if await self._users.exists(ctx.user.id):
|
|
user = await self._users.get(ctx.user.id)
|
|
for alert in user.subscribed_alerts:
|
|
await self._alerts.remove_user(alert, user)
|
|
await self._users.delete(ctx.user.id)
|
|
|
|
await ctx.send("All alert subscriptions and user data deleted", ephemeral=True)
|
|
|
|
@slash_command(
|
|
name="exists", description="Check if you are registered with GoblinBot" ""
|
|
)
|
|
@check(is_owner())
|
|
async def exists(self, ctx: SlashContext):
|
|
await ctx.send(str(await self._users.exists(ctx.user.id)), ephemeral=True)
|
|
|
|
@slash_command(description="The current retail token cost")
|
|
async def current(self, ctx: SlashContext):
|
|
current_str = await self.get_current_token(ctx, tdb.Flavor.RETAIL)
|
|
await ctx.send(current_str, ephemeral=True)
|
|
|
|
@slash_command(description="The current classic token cost")
|
|
async def current_classic(self, ctx: SlashContext):
|
|
current_str = await self.get_current_token(ctx, tdb.Flavor.CLASSIC)
|
|
await ctx.send(current_str, ephemeral=True)
|
|
|
|
@slash_command(name="add-alert", description="Add an alert listener")
|
|
async def add_alert(self, ctx: SlashContext):
|
|
if not await self._users.exists(ctx.user.id):
|
|
try:
|
|
await self.region_select_menu(ctx)
|
|
except TimeoutError:
|
|
return
|
|
|
|
user = await self._users.get(ctx.user.id)
|
|
|
|
try:
|
|
flavor = await self.flavor_select_menu(ctx)
|
|
alert_category = await self.alert_category_select_menu(ctx)
|
|
match alert_category:
|
|
case AlertCategory.LOW:
|
|
alert_type = await self.low_alert_select_menu(ctx)
|
|
case AlertCategory.HIGH:
|
|
alert_type = await self.high_alert_select_menu(ctx)
|
|
case _:
|
|
raise NotImplementedError
|
|
|
|
except TimeoutError:
|
|
return
|
|
|
|
else:
|
|
alert = Alert(alert_type, flavor, user.region)
|
|
if not await self._users.is_subscribed(user, alert):
|
|
await asyncio.gather(
|
|
self._users.add_alert(user, alert),
|
|
self._alerts.add_user(alert, user),
|
|
)
|
|
await ctx.send("Successfully added alert", ephemeral=True)
|
|
|
|
else:
|
|
await ctx.send(
|
|
"You are already subscribed to this alert", ephemeral=True
|
|
)
|
|
|
|
@slash_command(
|
|
name="remove-alert", description="Remove an alert you have signed up for"
|
|
)
|
|
async def remove_alert(self, ctx: SlashContext):
|
|
if not await self._user_is_registered(ctx):
|
|
return
|
|
user = await self._users.get(ctx.user.id)
|
|
alerts = await self._users.list_alerts(user)
|
|
if len(alerts) == 0:
|
|
await ctx.send("You do not have any alerts registered", ephemeral=True)
|
|
return
|
|
|
|
try:
|
|
alert = await self.remove_alert_select_menu(ctx, user)
|
|
except TimeoutError:
|
|
return
|
|
else:
|
|
await asyncio.gather(
|
|
self._users.remove_alert(user, alert),
|
|
self._alerts.remove_user(alert, user),
|
|
)
|
|
await ctx.send("Successfully removed alert", ephemeral=True)
|
|
|
|
@slash_command(
|
|
name="list-alerts", description="List all alerts you have signed up for"
|
|
)
|
|
async def list_alerts(self, ctx: SlashContext):
|
|
if not await self._user_is_registered(ctx):
|
|
return
|
|
user = await self._users.get(ctx.user.id)
|
|
alerts = await self._users.list_alerts(user)
|
|
if len(alerts) == 0:
|
|
await ctx.send("You do not have any alerts registered", ephemeral=True)
|
|
return
|
|
alerts_str = f"You have {len(alerts)} out of 25 maximum alerts registered"
|
|
embeds = [
|
|
Embed(
|
|
title="List of GoblinBot Tracker Alerts",
|
|
color=0x0000B1,
|
|
description=alerts_str,
|
|
)
|
|
]
|
|
alerts_by_flavor = await gather_alerts_by_flavor(alerts)
|
|
for flavor in alerts_by_flavor:
|
|
embeds.append(
|
|
await self._render_alert_flavor(alerts_by_flavor[flavor], user=user)
|
|
)
|
|
await ctx.send(embeds=embeds, ephemeral=True)
|
|
|
|
###################################
|
|
# Callbacks Commands #
|
|
###################################
|
|
|
|
@component_callback("flavor_menu")
|
|
async def flavor_menu(self, ctx: ComponentContext):
|
|
await ctx.send(f"Selected Flavor: {ctx.values[0]}", ephemeral=True)
|
|
|
|
@component_callback("high_alert_menu")
|
|
async def alert_menu(self, ctx: ComponentContext):
|
|
await ctx.send(f"Selected Alert: {ctx.values[0]}", ephemeral=True)
|
|
|
|
@component_callback("low_alert_menu")
|
|
async def alert_menu(self, ctx: ComponentContext):
|
|
await ctx.send(f"Selected Alert: {ctx.values[0]}", ephemeral=True)
|
|
|
|
@component_callback("remove_alert_menu")
|
|
async def remove_alert_menu(self, ctx: ComponentContext):
|
|
await ctx.send(
|
|
f"You have selected to remove the following alert: {ctx.values[0].title()}",
|
|
ephemeral=True,
|
|
)
|
|
|
|
@component_callback("region_menu")
|
|
async def region_menu_cb(self, ctx: ComponentContext):
|
|
discord_user = await self.bot.fetch_user(ctx.user.id)
|
|
await discord_user.send(
|
|
"You have successfully registered your region with GoblinBot!\n"
|
|
"Most interactions will happen in direct messages with GoblinBot here.\n"
|
|
"You can remove your user data and alerts at any time using ```/remove-registration```\n"
|
|
)
|
|
await ctx.defer(edit_origin=True, suppress_error=True)
|
|
|
|
@component_callback("high_alert_button")
|
|
async def high_alert_button(self, ctx: ComponentContext):
|
|
await ctx.send("You selected to add a High Price Alert", ephemeral=True)
|
|
|
|
@component_callback("low_alert_button")
|
|
async def low_alert_button(self, ctx: ComponentContext):
|
|
await ctx.send("You selected to add a Low Price Alert", ephemeral=True)
|
|
|
|
@component_callback("custom_alert_button")
|
|
async def custom_alert_button(self, ctx: ComponentContext):
|
|
await ctx.send("You selected to add a Custom Price Alert", ephemeral=True)
|
|
|
|
###################################
|
|
# Helper Functions #
|
|
###################################
|
|
|
|
async def get_current_token(self, ctx: SlashContext, flavor: Flavor) -> str:
|
|
user: User = await self._users.get(ctx.user.id)
|
|
region = user.region.name
|
|
region_history = self._history_manager.get_history(flavor, user.region)
|
|
price_movement_str = format(region_history.last_price_movement, ",")
|
|
if region_history.last_price_movement > 0:
|
|
price_movement_str = f"+{price_movement_str}"
|
|
|
|
return (
|
|
f"Last Price Value for {region}: {format(region_history.last_price_datum[1], ",")}\n"
|
|
f"Last Update Time: <t:{int(region_history.last_price_datum[0].timestamp())}:F> local time\n"
|
|
f"Last Price Movement: {price_movement_str}"
|
|
)
|
|
|
|
async def remove_alert_select_menu(self, ctx: SlashContext, user: User):
|
|
alerts_by_flavor = await gather_alerts_by_flavor(user.subscribed_alerts)
|
|
select_options: List[StringSelectOption] = []
|
|
for flavor in alerts_by_flavor:
|
|
for alert in alerts_by_flavor[flavor]:
|
|
select_options.append(
|
|
StringSelectOption(
|
|
label=f"{alert.flavor.name.lower().title()} {alert.to_human_string()}",
|
|
value=f"{alert.flavor.name.lower()} {alert.to_human_string()}",
|
|
)
|
|
)
|
|
menu = StringSelectMenu(
|
|
select_options,
|
|
placeholder="Select an alert to remove",
|
|
custom_id="remove_alert_menu",
|
|
)
|
|
message = await ctx.send(
|
|
"Select an alert to remove", components=menu, ephemeral=True
|
|
)
|
|
|
|
try:
|
|
alert_component: Component = await self.bot.wait_for_component(
|
|
messages=message, components=menu, timeout=30
|
|
)
|
|
except TimeoutError:
|
|
menu.disabled = True
|
|
await message.edit(context=ctx, components=menu, content="Timed out")
|
|
raise TimeoutError
|
|
else:
|
|
menu.disabled = True
|
|
await message.edit(context=ctx, components=menu)
|
|
selection_split = alert_component.ctx.values[0].split(" ")
|
|
flavor = Flavor[selection_split[0].upper()]
|
|
alert_type = AlertType.from_str(" ".join(selection_split[1:]))
|
|
return Alert(alert_type, flavor, user.region)
|
|
|
|
async def region_select_menu(self, ctx: SlashContext, user: User | None = None):
|
|
region_menu = copy.deepcopy(REGION_MENU)
|
|
region_menu_str = str()
|
|
|
|
if user is None:
|
|
region_menu_str += "You are not currently registered with a region, please select a region to register with.\n"
|
|
region_menu_str += (
|
|
"* You can only be registered with one region at a time.\n"
|
|
"* Registering for a new region will remove your old region's registration.\n"
|
|
)
|
|
region_message = await ctx.send(
|
|
region_menu_str,
|
|
components=region_menu,
|
|
ephemeral=True,
|
|
)
|
|
try:
|
|
region_component = await self.bot.wait_for_component(
|
|
messages=region_message, components=region_menu, timeout=30
|
|
)
|
|
except TimeoutError:
|
|
region_menu.disabled = True
|
|
await region_message.edit(
|
|
context=ctx, components=region_menu, content="Timed out"
|
|
)
|
|
raise TimeoutError
|
|
else:
|
|
region_menu.disabled = True
|
|
region = Region(region_component.ctx.values[0].lower())
|
|
user = User(ctx.user.id, region, subscribed_alerts=[])
|
|
await asyncio.gather(
|
|
self._users.add(user),
|
|
region_message.edit(context=ctx, components=region_menu),
|
|
)
|
|
return region
|
|
|
|
async def flavor_select_menu(self, ctx: SlashContext) -> Type[Flavor]:
|
|
flavor_menu = copy.deepcopy(FLAVOR_MENU)
|
|
|
|
flavor_message = await ctx.send(
|
|
"Select a flavor to add alerts for", components=flavor_menu, ephemeral=True
|
|
)
|
|
try:
|
|
flavor_component: Component = await self.bot.wait_for_component(
|
|
messages=flavor_message, components=flavor_menu, timeout=30
|
|
)
|
|
except TimeoutError:
|
|
flavor_menu.disabled = True
|
|
await flavor_message.edit(
|
|
context=ctx, components=flavor_menu, content="Timed out"
|
|
)
|
|
raise TimeoutError
|
|
else:
|
|
flavor = Flavor[flavor_component.ctx.values[0].upper()]
|
|
flavor_menu.disabled = True
|
|
await flavor_message.edit(context=ctx, components=flavor_menu)
|
|
return flavor
|
|
|
|
async def alert_category_select_menu(self, ctx: SlashContext) -> AlertCategory:
|
|
alert_type_button = copy.deepcopy(ALERT_TYPE_ROW)
|
|
alert_type_message = await ctx.send(
|
|
"Select an alert type to add", components=alert_type_button, ephemeral=True
|
|
)
|
|
try:
|
|
alert_type_component: Component = await self.bot.wait_for_component(
|
|
messages=alert_type_message, components=alert_type_button, timeout=30
|
|
)
|
|
except TimeoutError:
|
|
for button in alert_type_button[0].components:
|
|
button.disabled = True
|
|
await alert_type_message.edit(
|
|
context=ctx, components=alert_type_button, content="Timed out"
|
|
)
|
|
raise TimeoutError
|
|
else:
|
|
alert_type = AlertCategory.from_str(alert_type_component.ctx.custom_id)
|
|
for button in alert_type_button[0].components:
|
|
button.disabled = True
|
|
await alert_type_message.edit(context=ctx, components=alert_type_button)
|
|
return alert_type
|
|
|
|
async def _alert_select_menu_handler(
|
|
self, ctx: SlashContext, menu: StringSelectMenu, message: Message
|
|
) -> AlertType:
|
|
try:
|
|
component: Component = await self.bot.wait_for_component(
|
|
messages=message, components=menu, timeout=30
|
|
)
|
|
except TimeoutError:
|
|
menu.disabled = True
|
|
await message.edit(context=ctx, components=menu, content="Timed out")
|
|
raise TimeoutError
|
|
else:
|
|
menu.disabled = True
|
|
await component.ctx.defer(edit_origin=True, suppress_error=True)
|
|
await message.edit(context=ctx, components=menu)
|
|
return AlertType.from_str(component.ctx.values[0])
|
|
|
|
async def high_alert_select_menu(self, ctx: SlashContext) -> AlertType:
|
|
high_menu = copy.deepcopy(HIGH_ALERT_MENU)
|
|
high_message = await ctx.send(
|
|
"Select a time range to add a High Alert for",
|
|
components=high_menu,
|
|
ephemeral=True,
|
|
)
|
|
return await self._alert_select_menu_handler(ctx, high_menu, high_message)
|
|
|
|
async def low_alert_select_menu(self, ctx: SlashContext) -> AlertType:
|
|
low_menu = copy.deepcopy(LOW_ALERT_MENU)
|
|
low_message = await ctx.send(
|
|
"Select a time range to add a Low Alert for",
|
|
components=low_menu,
|
|
ephemeral=True,
|
|
)
|
|
return await self._alert_select_menu_handler(ctx, low_menu, low_message)
|
|
|
|
async def _user_is_registered(self, ctx: SlashContext) -> bool:
|
|
if not await self._users.exists(ctx.user.id):
|
|
await ctx.send(
|
|
"You are not registered with any region\n"
|
|
"Please add an alert to get started ```/add-alert```",
|
|
ephemeral=True,
|
|
)
|
|
return False
|
|
return True
|
|
|
|
async def _render_alert_flavor(
|
|
self, alerts: List[Alert], user: User | None = None
|
|
) -> Embed:
|
|
region = alerts[0].region
|
|
flavor = alerts[0].flavor
|
|
fields: List[EmbedField] = []
|
|
for alert in alerts:
|
|
history = self._history_manager.get_history(alert.flavor, alert.region)
|
|
trigger = await history.find_update_trigger_from_alert(alert)
|
|
if trigger.last_trigger is not None:
|
|
alert_str = (
|
|
f"Last Alerting Price Value: {format(trigger.last_alerting[1], ",")}\n"
|
|
f"Last Alerting Time: <t:{int(trigger.last_alerting[0].timestamp())}:F> local time\n"
|
|
f"[Link to this Chart]({self._render_token_url(alert)})\n"
|
|
)
|
|
if user is not None and user.user_id == 265678699435655169:
|
|
alert_str += (
|
|
f"\nShowing you some internals since you are the bot owner:\n"
|
|
f"```history.last_price_datum:\n"
|
|
f"\t{history.last_price_datum[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
|
f"\t{history.last_price_datum[1]}\n"
|
|
f"trigger.last_alerting:\n"
|
|
f"\t{trigger.last_alerting[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
|
f"\t{trigger.last_alerting[1]}\n"
|
|
f"trigger.last_trigger:\n"
|
|
f"\t{trigger.last_trigger[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
|
f"\t{trigger.last_trigger[1]}\n"
|
|
f"trigger.squelched:\n\t{trigger.squelched}```"
|
|
)
|
|
else:
|
|
alert_str = "You should only be seeing this if the bot has not finished importing history at startup."
|
|
fields.append(
|
|
EmbedField(
|
|
name=f"{alert.to_human_string()} Alert",
|
|
value=alert_str,
|
|
inline=False,
|
|
)
|
|
)
|
|
embed = Embed(
|
|
title=f"Alerts for {region.name} {flavor.name.lower().title()}",
|
|
color=0xB10000,
|
|
fields=fields,
|
|
)
|
|
return embed
|
|
|
|
def _render_token_url(self, alert: Alert) -> str:
|
|
match alert.flavor:
|
|
case Flavor.CLASSIC:
|
|
url = "https://classic.wowtoken.app/?"
|
|
case Flavor.RETAIL:
|
|
url = "https://wowtoken.app/?"
|
|
case _:
|
|
raise NotImplementedError
|
|
url += f"region={alert.region.value}&"
|
|
match alert.alert_type:
|
|
case AlertType.WEEKLY_LOW | AlertType.WEEKLY_HIGH:
|
|
url += "time=168h&"
|
|
case AlertType.MONTHLY_LOW | AlertType.MONTHLY_HIGH:
|
|
url += "time=720h&"
|
|
case AlertType.YEARLY_LOW | AlertType.YEARLY_HIGH:
|
|
url += "time=1y&"
|
|
case AlertType.ALL_TIME_LOW | AlertType.ALL_TIME_HIGH:
|
|
url += "time=all&"
|
|
|
|
return url
|