import asyncio from typing import Dict, List import aiohttp from token_bot.token_database.exceptions import * from token_bot.token_database.flavor import Flavor from token_bot.token_database.region import Region class Database: def __init__(self, session: aiohttp.ClientSession): self.data_url = "https://data.wowtoken.app/v2/" self.session = session async def _get_data(self, endpoint: str) -> Dict | List: url = f"{self.data_url}{endpoint}" success = False tries = 0 backoff = 0.1 while not success and tries <= 5: if tries > 1: await asyncio.sleep(backoff * tries) try: return await self._external_call(url) except TokenHttpException: tries += 1 raise TokenHttpException async def _external_call(self, url) -> Dict | List: async with self.session.get(url) as resp: await resp.text() if resp.ok: return await resp.json() else: raise TokenHttpException(resp.status) async def current(self, flavor: Flavor) -> dict: return await self._get_data(f"current/{flavor.name.lower()}.json") async def history(self, flavor: Flavor, region: Region, relative_time: str = "all"): return await self._get_data( f"relative/{flavor.name.lower()}/{region.value.lower()}/{relative_time}.json" )