import asyncio import copy import datetime import logging from typing import Type, Dict, List import aiohttp from interactions import ( Extension, SlashContext, component_callback, ComponentContext, StringSelectMenu, Message, Embed, EmbedField, is_owner, check, StringSelectOption, ) from interactions import Task, IntervalTrigger from interactions import slash_command, listen from interactions.api.events import Component from interactions.api.events import Startup from token_bot.controller.alerts import AlertsController from token_bot.controller.users import UsersController from token_bot.history_manager.history_manager import HistoryManager, History from token_bot.persistant_database.alert_category import AlertCategory from token_bot.persistant_database.alert_schema import Alert from token_bot.persistant_database.alert_type import AlertType from token_bot.persistant_database.user_schema import User from token_bot.token_database import database as tdb from token_bot.token_database.flavor import Flavor from token_bot.token_database.region import Region from token_bot.ui.action_row.tracker import ALERT_TYPE_ROW from token_bot.ui.select_menus.alert_menu import HIGH_ALERT_MENU, LOW_ALERT_MENU from token_bot.ui.select_menus.flavor_menu import FLAVOR_MENU from token_bot.ui.select_menus.region_menu import REGION_MENU #### Static Helper Functions async def gather_alerts_by_flavor(alerts: List[Alert]) -> Dict[Flavor, List[Alert]]: alerts_by_flavor = {} for alert in alerts: if alert.flavor not in alerts_by_flavor: alerts_by_flavor[alert.flavor] = [alert] else: alerts_by_flavor[alert.flavor].append(alert) return alerts_by_flavor class Tracker(Extension): def __init__(self, bot): self._users: UsersController | None = None self._alerts: AlertsController | None = None self._tdb: tdb.Database | None = None self._history_manager: HistoryManager | None = None ################################### # Task Functions # ################################### @Task.create(IntervalTrigger(minutes=1)) async def update_data(self): self.bot.logger.log(logging.INFO, "TokenBot Tracker: Updating Price") users_alerts: Dict[User, List[Alert]] = {} for flavor in Flavor: for region in Region: alerts = await self._history_manager.update_data(flavor, Region(region)) for alert in alerts: users = await self._alerts.get_users(alert) for user in users: if user not in users_alerts: users_alerts[user] = [alert] else: users_alerts[user].append(alert) for user in users_alerts: discord_user = await self.bot.fetch_user(user.user_id) embeds = [ Embed( title="TokenBot Tracker Alert Triggered", color=0xB10000, description=f"Hello, you requested to be sent an alert when the price of the World of Warcraft " f"token reaches a certain value.\n\n" f"As a reminder, you can remove an alert via ```/remove-alert```\n" f"or you can remove all alerts and user data via ```/remove-registration```\n\n", ) ] alerts_by_flavor = await gather_alerts_by_flavor(users_alerts[user]) for flavor in alerts_by_flavor: embeds.append( await self._render_alert_flavor(alerts_by_flavor[flavor], user=user) ) await discord_user.send(embeds=embeds) ################################### # Slash Commands # ################################### @listen(Startup) async def on_start(self): self.bot.logger.log(logging.INFO, "TokenBot Tracker: Initializing") self._users = UsersController(aiohttp.ClientSession()) self._alerts = AlertsController(aiohttp.ClientSession()) self._tdb = tdb.Database(aiohttp.ClientSession()) self._history_manager = HistoryManager(self._tdb) self.bot.logger.log(logging.INFO, "TokenBot Tracker: Initialized") self.bot.logger.log(logging.INFO, "TokenBot Tracker: Loading Historical Data") await self._history_manager.load_data() self.bot.logger.log( logging.INFO, "TokenBot Tracker: Loading Historical Data Finished" ) self.bot.logger.log(logging.INFO, "TokenBot Tracker: Started") self.update_data.start() @slash_command( name="register", description="Register with a new GoblinBot Region for alerts on token price changes.", ) async def register(self, ctx: SlashContext): text = ( "## Select a region to register with \n\n" "Please note: \n" "* You can only be registered with one region at a time \n" "* Changing your region will remove all previous alerts you have signed up for \n" "* You can remove all alerts and user data using ```/remove-registration```" ) menu = copy.deepcopy(REGION_MENU) await ctx.send(text, components=menu, ephemeral=True) @slash_command( name="remove-registration", description="Remove all alerts and registration from GoblinBot", ) async def remove_registration(self, ctx: SlashContext): if await self._users.exists(ctx.user.id): user = await self._users.get(ctx.user.id) for alert in user.subscribed_alerts: await self._alerts.remove_user(alert, user) await self._users.delete(ctx.user.id) await ctx.send("All alert subscriptions and user data deleted", ephemeral=True) @slash_command( name="exists", description="Check if you are registered with GoblinBot" "" ) @check(is_owner()) async def exists(self, ctx: SlashContext): await ctx.send(str(await self._users.exists(ctx.user.id)), ephemeral=True) @slash_command(description="The current retail token cost") async def current(self, ctx: SlashContext): current_str = await self.get_current_token(ctx, tdb.Flavor.RETAIL) await ctx.send(current_str, ephemeral=True) @slash_command(description="The current classic token cost") async def current_classic(self, ctx: SlashContext): current_str = await self.get_current_token(ctx, tdb.Flavor.CLASSIC) await ctx.send(current_str, ephemeral=True) @slash_command(name="add-alert", description="Add an alert listener") async def add_alert(self, ctx: SlashContext): if not await self._users.exists(ctx.user.id): try: await self.region_select_menu(ctx) except TimeoutError: return user = await self._users.get(ctx.user.id) try: flavor = await self.flavor_select_menu(ctx) alert_category = await self.alert_category_select_menu(ctx) match alert_category: case AlertCategory.LOW: alert_type = await self.low_alert_select_menu(ctx) case AlertCategory.HIGH: alert_type = await self.high_alert_select_menu(ctx) case _: raise NotImplementedError except TimeoutError: return else: alert = Alert(alert_type, flavor, user.region) if not await self._users.is_subscribed(user, alert): await asyncio.gather( self._users.add_alert(user, alert), self._alerts.add_user(alert, user), ) await ctx.send("Successfully added alert", ephemeral=True) else: await ctx.send( "You are already subscribed to this alert", ephemeral=True ) @slash_command( name="remove-alert", description="Remove an alert you have signed up for" ) async def remove_alert(self, ctx: SlashContext): if not self._user_is_registered(ctx): return user = await self._users.get(ctx.user.id) alerts = await self._users.list_alerts(user) if len(alerts) == 0: await ctx.send("You do not have any alerts registered", ephemeral=True) return try: alert = await self.remove_alert_select_menu(ctx, user) except TimeoutError: return else: await asyncio.gather( self._users.remove_alert(user, alert), self._alerts.remove_user(alert, user), ) await ctx.send("Successfully removed alert", ephemeral=True) @slash_command( name="list-alerts", description="List all alerts you have signed up for" ) async def list_alerts(self, ctx: SlashContext): if not await self._user_is_registered(ctx): return user = await self._users.get(ctx.user.id) alerts = await self._users.list_alerts(user) if len(alerts) == 0: await ctx.send("You do not have any alerts registered", ephemeral=True) return alerts_str = f"You have {len(alerts)} out of 25 maximum alerts registered" embeds = [ Embed( title="List of GoblinBot Tracker Alerts", color=0x0000B1, description=alerts_str, ) ] alerts_by_flavor = await gather_alerts_by_flavor(alerts) for flavor in alerts_by_flavor: embeds.append( await self._render_alert_flavor(alerts_by_flavor[flavor], user=user) ) await ctx.send(embeds=embeds, ephemeral=True) ################################### # Callbacks Commands # ################################### @component_callback("flavor_menu") async def flavor_menu(self, ctx: ComponentContext): await ctx.send(f"Selected Flavor: {ctx.values[0]}", ephemeral=True) @component_callback("high_alert_menu") async def alert_menu(self, ctx: ComponentContext): await ctx.send(f"Selected Alert: {ctx.values[0]}", ephemeral=True) @component_callback("low_alert_menu") async def alert_menu(self, ctx: ComponentContext): await ctx.send(f"Selected Alert: {ctx.values[0]}", ephemeral=True) @component_callback("remove_alert_menu") async def remove_alert_menu(self, ctx: ComponentContext): await ctx.send( f"You have selected to remove the following alert: {ctx.values[0].title()}", ephemeral=True, ) @component_callback("region_menu") async def region_menu_cb(self, ctx: ComponentContext): discord_user = await self.bot.fetch_user(ctx.user.id) await discord_user.send( "You have successfully registered your region with GoblinBot!\n" "Most interactions will happen in direct messages with GoblinBot here.\n" "You can remove your user data and alerts at any time using ```/remove-registration```\n" ) await ctx.defer(edit_origin=True) @component_callback("high_alert_button") async def high_alert_button(self, ctx: ComponentContext): await ctx.send("You selected to add a High Price Alert", ephemeral=True) @component_callback("low_alert_button") async def low_alert_button(self, ctx: ComponentContext): await ctx.send("You selected to add a Low Price Alert", ephemeral=True) @component_callback("custom_alert_button") async def custom_alert_button(self, ctx: ComponentContext): await ctx.send("You selected to add a Custom Price Alert", ephemeral=True) ################################### # Helper Functions # ################################### async def get_current_token(self, ctx: SlashContext, flavor: Flavor) -> str: user: User = await self._users.get(ctx.user.id) region = user.region.name region_history = self._history_manager.get_history(flavor, user.region) price_movement_str = format(region_history.last_price_movement, ",") if region_history.last_price_movement > 0: price_movement_str = f"+{price_movement_str}" return ( f"Last Price Value for {region}: {format(region_history.last_price_datum[1], ",")}\n" f"Last Update Time: {region_history.last_price_datum[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n" f"Last Price Movement: {price_movement_str}" ) async def remove_alert_select_menu(self, ctx: SlashContext, user: User): alerts_by_flavor = await gather_alerts_by_flavor(user.subscribed_alerts) select_options: List[StringSelectOption] = [] for flavor in alerts_by_flavor: for alert in alerts_by_flavor[flavor]: select_options.append( StringSelectOption( label=f"{alert.flavor.name.lower().title()} {alert.to_human_string()}", value=f"{alert.flavor.name.lower()} {alert.to_human_string()}", ) ) menu = StringSelectMenu( select_options, placeholder="Select an alert to remove", custom_id="remove_alert_menu", ) message = await ctx.send( "Select an alert to remove", components=menu, ephemeral=True ) try: alert_component: Component = await self.bot.wait_for_component( messages=message, components=menu, timeout=30 ) except TimeoutError: menu.disabled = True await message.edit(context=ctx, components=menu, content="Timed out") raise TimeoutError else: menu.disabled = True await message.edit(context=ctx, components=menu) selection_split = alert_component.ctx.values[0].split(" ") flavor = Flavor[selection_split[0].upper()] alert_type = AlertType.from_str(" ".join(selection_split[1:])) return Alert(alert_type, flavor, user.region) async def region_select_menu(self, ctx: SlashContext, user: User | None = None): region_menu = copy.deepcopy(REGION_MENU) region_menu_str = str() if user is None: region_menu_str += "You are not currently registered with a region, please select a region to register with.\n" region_menu_str += ( "* You can only be registered with one region at a time.\n" "* Registering for a new region will remove your old region's registration.\n" ) region_message = await ctx.send( region_menu_str, components=region_menu, ephemeral=True, ) try: region_component = await self.bot.wait_for_component( messages=region_message, components=region_menu, timeout=30 ) except TimeoutError: region_menu.disabled = True await region_message.edit( context=ctx, components=region_menu, content="Timed out" ) raise TimeoutError else: region_menu.disabled = True region = Region(region_component.ctx.values[0].lower()) user = User(ctx.user.id, region, subscribed_alerts=[]) await asyncio.gather( self._users.add(user), region_message.edit(context=ctx, components=region_menu), ) return region async def flavor_select_menu(self, ctx: SlashContext) -> Type[Flavor]: flavor_menu = copy.deepcopy(FLAVOR_MENU) flavor_message = await ctx.send( "Select a flavor to add alerts for", components=flavor_menu, ephemeral=True ) try: flavor_component: Component = await self.bot.wait_for_component( messages=flavor_message, components=flavor_menu, timeout=30 ) except TimeoutError: flavor_menu.disabled = True await flavor_message.edit( context=ctx, components=flavor_menu, content="Timed out" ) raise TimeoutError else: flavor = Flavor[flavor_component.ctx.values[0].upper()] flavor_menu.disabled = True await flavor_message.edit(context=ctx, components=flavor_menu) return flavor async def alert_category_select_menu(self, ctx: SlashContext) -> AlertCategory: alert_type_button = copy.deepcopy(ALERT_TYPE_ROW) alert_type_message = await ctx.send( "Select an alert type to add", components=alert_type_button, ephemeral=True ) try: alert_type_component: Component = await self.bot.wait_for_component( messages=alert_type_message, components=alert_type_button, timeout=30 ) except TimeoutError: for button in alert_type_button[0].components: button.disabled = True await alert_type_message.edit( context=ctx, components=alert_type_button, content="Timed out" ) raise TimeoutError else: alert_type = AlertCategory.from_str(alert_type_component.ctx.custom_id) for button in alert_type_button[0].components: button.disabled = True await alert_type_message.edit(context=ctx, components=alert_type_button) return alert_type async def _alert_select_menu_handler( self, ctx: SlashContext, menu: StringSelectMenu, message: Message ) -> AlertType: try: component: Component = await self.bot.wait_for_component( messages=message, components=menu, timeout=30 ) except TimeoutError: menu.disabled = True await message.edit(context=ctx, components=menu, content="Timed out") raise TimeoutError else: menu.disabled = True await component.ctx.defer(edit_origin=True) await message.edit(context=ctx, components=menu) return AlertType.from_str(component.ctx.values[0]) async def high_alert_select_menu(self, ctx: SlashContext) -> AlertType: high_menu = copy.deepcopy(HIGH_ALERT_MENU) high_message = await ctx.send( "Select a time range to add a High Alert for", components=high_menu, ephemeral=True, ) return await self._alert_select_menu_handler(ctx, high_menu, high_message) async def low_alert_select_menu(self, ctx: SlashContext) -> AlertType: low_menu = copy.deepcopy(LOW_ALERT_MENU) low_message = await ctx.send( "Select a time range to add a Low Alert for", components=low_menu, ephemeral=True, ) return await self._alert_select_menu_handler(ctx, low_menu, low_message) async def _user_is_registered(self, ctx: SlashContext) -> bool: if not await self._users.exists(ctx.user.id): await ctx.send( "You are not registered with any region\n" "Please add an alert to get started ```/add-alert```", ephemeral=True, ) return False return True async def _render_alert_flavor( self, alerts: List[Alert], user: User | None = None ) -> Embed: region = alerts[0].region flavor = alerts[0].flavor fields: List[EmbedField] = [] for alert in alerts: history = self._history_manager.get_history(alert.flavor, alert.region) trigger = await history.find_update_trigger_from_alert(alert) if trigger.last_trigger is not None: alert_str = ( f"Last Alerting Price Value: {format(trigger.last_alerting[1], ",")}\n" f"Last Alerting Time: {trigger.last_alerting[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n" f"[Link to this Chart]({self._render_token_url(alert)})\n" ) if user is not None and user.user_id == 265678699435655169: alert_str += ( f"\nShowing you some internals since you are the bot owner:\n" f"```history.last_price_datum:\n" f"\t{history.last_price_datum[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n" f"\t{history.last_price_datum[1]}\n" f"trigger.last_alerting:\n" f"\t{trigger.last_alerting[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n" f"\t{trigger.last_alerting[1]}\n" f"trigger.last_trigger:\n" f"\t{trigger.last_trigger[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n" f"\t{trigger.last_trigger[1]}\n" f"trigger.squelched:\n\t{trigger.squelched}```" ) else: alert_str = "You should only be seeing this if the bot has not finished importing history at startup." fields.append( EmbedField( name=f"{alert.to_human_string()} Alert", value=alert_str, inline=False, ) ) embed = Embed( title=f"Alerts for {region.name} {flavor.name.lower().title()}", color=0xB10000, fields=fields, ) return embed def _render_token_url(self, alert: Alert) -> str: match alert.flavor: case Flavor.CLASSIC: url = "https://classic.wowtoken.app/?" case Flavor.RETAIL: url = "https://wowtoken.app/?" case _: raise NotImplementedError url += f"region={alert.region.value}&" match alert.alert_type: case AlertType.WEEKLY_LOW | AlertType.WEEKLY_HIGH: url += "time=168h&" case AlertType.MONTHLY_LOW | AlertType.MONTHLY_HIGH: url += "time=720h&" case AlertType.YEARLY_LOW | AlertType.YEARLY_HIGH: url += "time=1y&" case AlertType.ALL_TIME_LOW | AlertType.ALL_TIME_HIGH: url += "time=all&" return url