56 lines
1.9 KiB
Python
56 lines
1.9 KiB
Python
|
import datetime
|
||
|
from typing import LiteralString, Set, List, Tuple
|
||
|
|
||
|
from token_bot.history_manager.update_trigger import UpdateTrigger
|
||
|
from token_bot.persistant_database import AlertType, Alert
|
||
|
from token_bot.token_database.flavor import Flavor
|
||
|
from token_bot.token_database.region import Region
|
||
|
|
||
|
|
||
|
class History:
|
||
|
def __init__(self, flavor: Flavor, region: Region):
|
||
|
self._flavor : Flavor = flavor
|
||
|
self._region : Region = region
|
||
|
self._history : List[Tuple[datetime, int]] = []
|
||
|
self._last_price_movement : int = 0
|
||
|
self._latest_price_datum : Tuple[datetime.datetime, int] | None = None
|
||
|
self._update_triggers : List[UpdateTrigger] = []
|
||
|
self._squelched_alerts : List[Alert] = []
|
||
|
|
||
|
for alert_type in AlertType:
|
||
|
self._update_triggers.append(UpdateTrigger(Alert(alert_type, flavor, self._region)))
|
||
|
|
||
|
@property
|
||
|
def flavor(self) -> Flavor:
|
||
|
return self._flavor
|
||
|
|
||
|
@property
|
||
|
def region(self) -> Region:
|
||
|
return self._region
|
||
|
|
||
|
@property
|
||
|
def last_price_datum(self) -> Tuple[datetime.datetime, int] | None:
|
||
|
return self._latest_price_datum
|
||
|
|
||
|
@property
|
||
|
def history(self) -> List[Tuple[datetime.datetime, int]]:
|
||
|
return self._history
|
||
|
|
||
|
async def _process_update_triggers(self) -> List[Alert]:
|
||
|
alerts = []
|
||
|
for trigger in self._update_triggers:
|
||
|
if trigger.check_and_update(self._latest_price_datum, self._history) and trigger.alert not in self._squelched_alerts:
|
||
|
alerts.append(trigger.alert)
|
||
|
|
||
|
return alerts
|
||
|
|
||
|
async def add_price(self, datum: Tuple[datetime.datetime, int]) -> List[Alert]:
|
||
|
if self._latest_price_datum is None:
|
||
|
self._latest_price_datum = datum
|
||
|
self._last_price_movement = datum[1] - self._latest_price_datum[1]
|
||
|
self._latest_price_datum = datum
|
||
|
self._history.append(datum)
|
||
|
return await self._process_update_triggers()
|
||
|
|
||
|
|