Initial implementation of custom price triggers

- likely to have some bugs, but this is good enough for a preview release.
This commit is contained in:
2025-11-06 02:46:03 -08:00
parent 19eb0a4e24
commit ed79f4b65c
8 changed files with 197 additions and 58 deletions

View File

@@ -41,3 +41,18 @@ class AlertsController:
alert = self._alert_to_obj(alert)
user = self._user_to_obj(user)
await alert.remove_user(self.table, user)
async def get_all_by_type(self, alert_type: int) -> List[Alert]:
"""Query all alerts of a specific type from the database."""
from aiodynamo.expressions import F
alerts = []
async for item in self.table.query(
key_condition=F("alert").equals(alert_type)
):
alert = Alert.from_item(
primary_key=item["alert"],
sort_key=item["flavor-region-price"],
users=item.get("users", [])
)
alerts.append(alert)
return alerts