Very Initial MVP
There is so much more to do, but I think it is time to commit this to VCS
This commit is contained in:
42
token_bot/token_database/database.py
Normal file
42
token_bot/token_database/database.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from typing import Dict, List, LiteralString
|
||||
|
||||
from token_bot.token_database.endpoints import Endpoints
|
||||
from token_bot.token_database.flavor import Flavor
|
||||
from token_bot.token_database.exceptions import *
|
||||
import aiohttp
|
||||
import json
|
||||
|
||||
from token_bot.token_database.region import Region
|
||||
|
||||
|
||||
class Database:
|
||||
def __init__(self, session: aiohttp.ClientSession):
|
||||
self.data_url = "https://data.wowtoken.app/v2/"
|
||||
self.session = session
|
||||
|
||||
async def _get_data(self, endpoint: str) -> Dict | List:
|
||||
url = f"{self.data_url}{endpoint}"
|
||||
|
||||
success = False
|
||||
tries = 0
|
||||
|
||||
while not success and tries < 3:
|
||||
try:
|
||||
return await self._external_call(url)
|
||||
except TokenHttpException:
|
||||
tries += 1
|
||||
return {}
|
||||
|
||||
async def _external_call(self, url) -> Dict | List:
|
||||
async with self.session.get(url) as resp:
|
||||
await resp.text()
|
||||
if resp.ok:
|
||||
return await resp.json()
|
||||
else:
|
||||
raise TokenHttpException(resp.status)
|
||||
|
||||
async def current(self, flavor: Flavor) -> dict:
|
||||
return await self._get_data(f'current/{flavor.name.lower()}.json')
|
||||
|
||||
async def history(self, flavor: Flavor, region: Region, relative_time: str = 'all'):
|
||||
return await self._get_data(f'relative/{flavor.name.lower()}/{region.value.lower()}/{relative_time}.json')
|
||||
Reference in New Issue
Block a user