436 lines
19 KiB
Python
436 lines
19 KiB
Python
import asyncio
|
|
import copy
|
|
import datetime
|
|
import logging
|
|
from typing import Type, Dict, List
|
|
|
|
import aiohttp
|
|
from interactions import Extension, SlashContext, component_callback, \
|
|
ComponentContext, StringSelectMenu, Message, Embed, EmbedField, is_owner, check, StringSelectOption
|
|
from interactions import Task, IntervalTrigger
|
|
from interactions import slash_command, listen
|
|
from interactions.api.events import Component
|
|
from interactions.api.events import Startup
|
|
|
|
from token_bot.controller.alerts import AlertsController
|
|
from token_bot.controller.users import UsersController
|
|
from token_bot.history_manager.history_manager import HistoryManager, History
|
|
from token_bot.persistant_database.alert_category import AlertCategory
|
|
from token_bot.persistant_database.alert_schema import Alert
|
|
from token_bot.persistant_database.alert_type import AlertType
|
|
from token_bot.persistant_database.user_schema import User
|
|
from token_bot.token_database import database as tdb
|
|
from token_bot.token_database.flavor import Flavor
|
|
from token_bot.token_database.region import Region
|
|
from token_bot.ui.action_row.tracker import ALERT_TYPE_ROW
|
|
from token_bot.ui.select_menus.alert_menu import HIGH_ALERT_MENU, LOW_ALERT_MENU
|
|
from token_bot.ui.select_menus.flavor_menu import FLAVOR_MENU
|
|
from token_bot.ui.select_menus.region_menu import REGION_MENU
|
|
|
|
#### Static Helper Functions
|
|
|
|
async def gather_alerts_by_flavor(alerts: List[Alert]) -> Dict[Flavor, List[Alert]]:
|
|
alerts_by_flavor = {}
|
|
for alert in alerts:
|
|
if alert.flavor not in alerts_by_flavor:
|
|
alerts_by_flavor[alert.flavor] = [alert]
|
|
else:
|
|
alerts_by_flavor[alert.flavor].append(alert)
|
|
return alerts_by_flavor
|
|
|
|
|
|
|
|
class Tracker(Extension):
|
|
def __init__(self, bot):
|
|
self._users: UsersController | None = None
|
|
self._alerts: AlertsController | None = None
|
|
self._tdb: tdb.Database | None = None
|
|
self._history_manager: HistoryManager | None = None
|
|
|
|
|
|
###################################
|
|
# Task Functions #
|
|
###################################
|
|
|
|
@Task.create(IntervalTrigger(minutes=1))
|
|
async def update_data(self):
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Updating Price")
|
|
users_alerts: Dict[User, List[Alert]] = {}
|
|
for flavor in Flavor:
|
|
for region in Region:
|
|
alerts = await self._history_manager.update_data(flavor, Region(region))
|
|
for alert in alerts:
|
|
users = await self._alerts.get_users(alert)
|
|
for user in users:
|
|
if user not in users_alerts:
|
|
users_alerts[user] = [alert]
|
|
else:
|
|
users_alerts[user].append(alert)
|
|
for user in users_alerts:
|
|
discord_user = await self.bot.fetch_user(user.user_id)
|
|
embeds = [Embed(
|
|
title="TokenBot Tracker Alert Triggered",
|
|
color=0xb10000,
|
|
description=f"Hello, you requested to be sent an alert when the price of the World of Warcraft "
|
|
f"token reaches a certain value.\n\n"
|
|
f"As a reminder, you can remove an alert via ```/remove-alert```\n"
|
|
f"or you can remove all registrations via ```/remove-registration```\n\n"
|
|
)]
|
|
alerts_by_flavor = await gather_alerts_by_flavor(users_alerts[user])
|
|
for flavor in alerts_by_flavor:
|
|
embeds.append(await self.render_alert_flavor(alerts_by_flavor[flavor], user=user))
|
|
|
|
await discord_user.send(embeds=embeds)
|
|
|
|
|
|
###################################
|
|
# Slash Commands #
|
|
###################################
|
|
|
|
@listen(Startup)
|
|
async def on_start(self):
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Initializing")
|
|
self._users = UsersController(aiohttp.ClientSession())
|
|
self._alerts = AlertsController(aiohttp.ClientSession())
|
|
self._tdb = tdb.Database(aiohttp.ClientSession())
|
|
self._history_manager = HistoryManager(self._tdb)
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Initialized")
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Loading Historical Data")
|
|
await self._history_manager.load_data()
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Loading Historical Data Finished")
|
|
self.bot.logger.log(logging.INFO, "TokenBot Tracker: Started")
|
|
self.update_data.start()
|
|
|
|
|
|
@slash_command(
|
|
name="register",
|
|
description="Register with TokenBot for alerts on token price changes."
|
|
)
|
|
async def register(self, ctx: SlashContext):
|
|
text = ("## Select a region to register with \n\n"
|
|
"Please note: \n"
|
|
"* You can only be registered with one region at a time \n"
|
|
"* Changing your region will remove all previous alerts you have signed up for \n"
|
|
"* You can remove all alerts and registration using ```/remove-registration```")
|
|
menu = copy.deepcopy(REGION_MENU)
|
|
await ctx.send(text, components=menu, ephemeral=True)
|
|
|
|
|
|
@slash_command(
|
|
name="remove-registration",
|
|
description="Remove all alerts and registration from TokenBot"
|
|
)
|
|
async def remove_registration(self, ctx: SlashContext):
|
|
if await self._users.exists(ctx.user.id):
|
|
user = await self._users.get(ctx.user.id)
|
|
for alert in user.subscribed_alerts:
|
|
await self._alerts.remove_user(alert, user)
|
|
await self._users.delete(ctx.user.id)
|
|
|
|
await ctx.send("All alert subscriptions and user registration deleted", ephemeral=True)
|
|
|
|
|
|
@slash_command(
|
|
name="exists",
|
|
description="Check if you are registered with TokenBot"
|
|
)
|
|
@check(is_owner())
|
|
async def exists(self, ctx: SlashContext):
|
|
await ctx.send(str(await self._users.exists(ctx.user.id)), ephemeral=True)
|
|
|
|
|
|
@slash_command(
|
|
description="The current retail token cost"
|
|
)
|
|
async def current(self, ctx: SlashContext):
|
|
current_str = await self.get_current_token(ctx, tdb.Flavor.RETAIL)
|
|
await ctx.send(current_str, ephemeral=True)
|
|
|
|
|
|
@slash_command(
|
|
description="The current classic token cost"
|
|
)
|
|
async def current_classic(self, ctx: SlashContext):
|
|
current_str = await self.get_current_token(ctx, tdb.Flavor.CLASSIC)
|
|
await ctx.send(current_str, ephemeral=True)
|
|
|
|
|
|
@slash_command(
|
|
name="add-alert",
|
|
description="List all alerts you have signed up for"
|
|
)
|
|
async def add_alert(self, ctx: SlashContext):
|
|
if not await self._users.exists(ctx.user.id):
|
|
await ctx.send("You are not registered with any region\n"
|
|
"Please register with /register before adding alerts",
|
|
ephemeral=True)
|
|
return
|
|
user = await self._users.get(ctx.user.id)
|
|
|
|
try:
|
|
flavor = await self.flavor_select_menu(ctx)
|
|
alert_category = await self.alert_category_select_menu(ctx)
|
|
match alert_category:
|
|
case AlertCategory.LOW:
|
|
alert_type = await self.low_alert_select_menu(ctx)
|
|
case AlertCategory.HIGH:
|
|
alert_type = await self.high_alert_select_menu(ctx)
|
|
case _:
|
|
raise NotImplementedError
|
|
|
|
except TimeoutError:
|
|
return
|
|
|
|
else:
|
|
alert = Alert(alert_type, flavor, user.region)
|
|
if not await self._users.is_subscribed(user, alert):
|
|
await self._users.add_alert(user, alert)
|
|
await self._alerts.add_user(alert, user)
|
|
await ctx.send("Successfully added alert", ephemeral=True)
|
|
|
|
else:
|
|
await ctx.send("You are already subscribed to this alert", ephemeral=True)
|
|
|
|
|
|
@slash_command(
|
|
name="remove-alert",
|
|
description="Remove an alert you have signed up for"
|
|
)
|
|
async def remove_alert(self, ctx: SlashContext):
|
|
user = await self._users.get(ctx.user.id)
|
|
try:
|
|
alert = await self.remove_alert_select_menu(ctx, user)
|
|
except TimeoutError:
|
|
return
|
|
else:
|
|
await self._users.remove_alert(user, alert)
|
|
await self._alerts.remove_user(alert, user)
|
|
await ctx.send("Successfully removed alert", ephemeral=True)
|
|
|
|
|
|
@slash_command(
|
|
name="list-alerts",
|
|
description="List all alerts you have signed up for"
|
|
)
|
|
async def list_alerts(self, ctx: SlashContext):
|
|
if not await self._users.exists(ctx.user.id):
|
|
await ctx.send("You are not registered with any region\n"
|
|
"Please register with /register before adding alerts",
|
|
ephemeral=True)
|
|
return
|
|
user = await self._users.get(ctx.user.id)
|
|
alerts = await self._users.list_alerts(user)
|
|
if len(alerts) == 0:
|
|
await ctx.send("You do not have any alerts registered", ephemeral=True)
|
|
return
|
|
alerts_str = f"You have {len(alerts)} out of 25 maximum alerts registered"
|
|
embeds = [Embed(
|
|
title="List of TokenBot Tracker Alerts",
|
|
color=0x0000b1,
|
|
description=alerts_str
|
|
)]
|
|
alerts_by_flavor = await gather_alerts_by_flavor(alerts)
|
|
for flavor in alerts_by_flavor:
|
|
embeds.append(await self.render_alert_flavor(alerts_by_flavor[flavor], user=user))
|
|
await ctx.send(embeds=embeds, ephemeral=True)
|
|
|
|
###################################
|
|
# Callbacks Commands #
|
|
###################################
|
|
|
|
@component_callback('flavor_menu')
|
|
async def flavor_menu(self, ctx: ComponentContext):
|
|
await ctx.send(f"Selected Flavor: {ctx.values[0]}", ephemeral=True)
|
|
|
|
@component_callback('high_alert_menu')
|
|
async def alert_menu(self, ctx: ComponentContext):
|
|
await ctx.send(f"Selected Alert: {ctx.values[0]}", ephemeral=True)
|
|
|
|
@component_callback('low_alert_menu')
|
|
async def alert_menu(self, ctx: ComponentContext):
|
|
await ctx.send(f"Selected Alert: {ctx.values[0]}", ephemeral=True)
|
|
|
|
@component_callback('remove_alert_menu')
|
|
async def remove_alert_menu(self, ctx: ComponentContext):
|
|
await ctx.send(f"You have selected to remove the following alert: {ctx.values[0].title()}", ephemeral=True)
|
|
|
|
@component_callback('region_menu')
|
|
async def region_menu(self, ctx: ComponentContext):
|
|
user = User(ctx.user.id, Region(ctx.values[0].lower()), subscribed_alerts=[])
|
|
await self._users.add(user)
|
|
discord_user = await self.bot.fetch_user(user.user_id)
|
|
await discord_user.send("You have successfully registered with TokenBot!\n"
|
|
"Most interactions will happen in direct messages with TokenBot here.\n"
|
|
"You can remove your registration and alerts at any time using ```/remove-registration```\n")
|
|
await ctx.send(f"Successfully registered with the {ctx.values[0]} region", ephemeral=True)
|
|
|
|
@component_callback('high_alert_button')
|
|
async def high_alert_button(self, ctx: ComponentContext):
|
|
await ctx.send("You selected to add a High Price Alert", ephemeral=True)
|
|
|
|
@component_callback('low_alert_button')
|
|
async def low_alert_button(self, ctx: ComponentContext):
|
|
await ctx.send("You selected to add a Low Price Alert", ephemeral=True)
|
|
|
|
@component_callback('custom_alert_button')
|
|
async def custom_alert_button(self, ctx: ComponentContext):
|
|
await ctx.send("You selected to add a Custom Price Alert", ephemeral=True)
|
|
|
|
###################################
|
|
# Helper Functions #
|
|
###################################
|
|
|
|
async def get_current_token(self, ctx: SlashContext, flavor: Flavor) -> str:
|
|
user: User = await self._users.get(ctx.user.id)
|
|
region = user.region.name
|
|
region_history = self._history_manager.get_history(flavor, user.region)
|
|
price_movement_str = format(region_history.last_price_movement, ',')
|
|
if region_history.last_price_movement > 0:
|
|
price_movement_str = f"+{price_movement_str}"
|
|
|
|
return (f"Last Price Value for {region}: {format(region_history.last_price_datum[1], ",")}\n"
|
|
f"Last Update Time: {region_history.last_price_datum[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
|
f"Last Price Movement: {price_movement_str}")
|
|
|
|
|
|
async def remove_alert_select_menu(self, ctx: SlashContext, user: User):
|
|
alerts_by_flavor = await gather_alerts_by_flavor(user.subscribed_alerts)
|
|
select_options: List[StringSelectOption] = []
|
|
for flavor in alerts_by_flavor:
|
|
for alert in alerts_by_flavor[flavor]:
|
|
select_options.append(StringSelectOption(
|
|
label=f"{alert.flavor.name.lower().title()} {alert.to_human_string()}",
|
|
value=f"{alert.flavor.name.lower()} {alert.to_human_string()}"
|
|
))
|
|
menu = StringSelectMenu(
|
|
select_options,
|
|
placeholder="Select an alert to remove",
|
|
custom_id="remove_alert_menu"
|
|
)
|
|
message = await ctx.send("Select an alert to remove", components=menu, ephemeral=True)
|
|
|
|
try:
|
|
alert_component: Component = await self.bot.wait_for_component(messages=message,
|
|
components=menu, timeout=30)
|
|
except TimeoutError:
|
|
menu.disabled = True
|
|
await message.edit(context=ctx, components=menu, content="Timed out")
|
|
raise TimeoutError
|
|
else:
|
|
menu.disabled = True
|
|
await message.edit(context=ctx, components=menu)
|
|
selection_split = alert_component.ctx.values[0].split(" ")
|
|
flavor = Flavor[selection_split[0].upper()]
|
|
alert_type = AlertType.from_str(' '.join(selection_split[1:]))
|
|
return Alert(alert_type, flavor, user.region)
|
|
|
|
|
|
async def flavor_select_menu(self, ctx: SlashContext) -> Type[Flavor]:
|
|
flavor_menu = copy.deepcopy(FLAVOR_MENU)
|
|
|
|
flavor_message = await ctx.send(
|
|
"Select a flavor to add alerts for",
|
|
components=flavor_menu,
|
|
ephemeral=True)
|
|
try:
|
|
flavor_component: Component = await self.bot.wait_for_component(messages=flavor_message,
|
|
components=flavor_menu, timeout=30)
|
|
except TimeoutError:
|
|
flavor_menu.disabled = True
|
|
await flavor_message.edit(context=ctx, components=flavor_menu, content="Timed out")
|
|
raise TimeoutError
|
|
else:
|
|
flavor = Flavor[flavor_component.ctx.values[0].upper()]
|
|
flavor_menu.disabled = True
|
|
await flavor_message.edit(context=ctx, components=flavor_menu)
|
|
return flavor
|
|
|
|
|
|
async def alert_category_select_menu(self, ctx: SlashContext) -> AlertCategory:
|
|
alert_type_button = copy.deepcopy(ALERT_TYPE_ROW)
|
|
alert_type_message = await ctx.send(
|
|
"Select an alert type to add",
|
|
components=alert_type_button,
|
|
ephemeral=True)
|
|
try:
|
|
alert_type_component: Component = await self.bot.wait_for_component(messages=alert_type_message,
|
|
components=alert_type_button,
|
|
timeout=30)
|
|
except TimeoutError:
|
|
for button in alert_type_button[0].components:
|
|
button.disabled = True
|
|
await alert_type_message.edit(context=ctx, components=alert_type_button, content="Timed out")
|
|
raise TimeoutError
|
|
else:
|
|
alert_type = AlertCategory.from_str(alert_type_component.ctx.custom_id)
|
|
for button in alert_type_button[0].components:
|
|
button.disabled = True
|
|
await alert_type_message.edit(context=ctx, components=alert_type_button)
|
|
return alert_type
|
|
|
|
|
|
async def _alert_select_menu_handler(self, ctx: SlashContext, menu: StringSelectMenu, message: Message) -> AlertType:
|
|
try:
|
|
component: Component = await self.bot.wait_for_component(messages=message, components=menu, timeout=30)
|
|
except TimeoutError:
|
|
menu.disabled = True
|
|
await message.edit(context=ctx, components=menu, content="Timed out")
|
|
raise TimeoutError
|
|
else:
|
|
menu.disabled = True
|
|
await message.edit(context=ctx, components=menu)
|
|
return AlertType.from_str(component.ctx.values[0])
|
|
|
|
|
|
async def high_alert_select_menu(self, ctx: SlashContext) -> AlertType:
|
|
high_menu = copy.deepcopy(HIGH_ALERT_MENU)
|
|
high_message = await ctx.send(
|
|
"Select a time range to add a High Alert for",
|
|
components=high_menu,
|
|
ephemeral=True)
|
|
return await self._alert_select_menu_handler(ctx, high_menu, high_message)
|
|
|
|
|
|
async def low_alert_select_menu(self, ctx: SlashContext) -> AlertType:
|
|
low_menu = copy.deepcopy(LOW_ALERT_MENU)
|
|
low_message = await ctx.send(
|
|
"Select a time range to add a Low Alert for",
|
|
components=low_menu,
|
|
ephemeral=True)
|
|
return await self._alert_select_menu_handler(ctx, low_menu, low_message)
|
|
|
|
|
|
async def render_alert_flavor(self, alerts: List[Alert], user: User | None = None) -> Embed:
|
|
region = alerts[0].region
|
|
flavor = alerts[0].flavor
|
|
fields: List[EmbedField] = []
|
|
for alert in alerts:
|
|
history = self._history_manager.get_history(alert.flavor, alert.region)
|
|
trigger = await history.find_update_trigger_from_alert(alert)
|
|
if trigger.last_trigger is not None:
|
|
alert_str = (f"Last Alerting Price Value: {format(trigger.last_alerting[1], ",")}\n"
|
|
f"Last Alerting Time: {trigger.last_alerting[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n")
|
|
if user is not None and user.user_id == 265678699435655169:
|
|
alert_str += (f"\nShowing you some internals since you are the bot owner:\n"
|
|
f"```history.last_price_datum:\n"
|
|
f"\t{history.last_price_datum[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
|
f"\t{history.last_price_datum[1]}\n"
|
|
f"trigger.last_alerting:\n"
|
|
f"\t{trigger.last_alerting[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
|
f"\t{trigger.last_alerting[1]}\n"
|
|
f"trigger.last_trigger:\n"
|
|
f"\t{trigger.last_trigger[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
|
f"\t{trigger.last_trigger[1]}\n"
|
|
f"trigger.squelched:\n\t{trigger.squelched}```")
|
|
else:
|
|
alert_str = "You should only be seeing this if the bot has not finished importing history at startup."
|
|
fields.append(
|
|
EmbedField(
|
|
name=f"{alert.to_human_string()} Alert", value=alert_str, inline=False))
|
|
embed = Embed(
|
|
title=f"Alerts for {region.name} {flavor.name.lower().title()}",
|
|
color=0xb10000,
|
|
fields=fields
|
|
)
|
|
return embed
|