wowtoken.app-backend/wow-token-historical.py

378 lines
14 KiB
Python

import sys
import boto3
from boto3.dynamodb.conditions import Key
from collections import deque
import datetime
import calendar
import json
import os
import statistics
timestream_region_map = {
'us-west-1': 'us-west-2',
'us-west-2': 'us-west-2',
'us-east-1': 'us-east-1',
'us-east-2': 'us-east-2',
'ap-south-1': 'eu-west-1',
'ap-northeast-3': 'ap-northeast-1',
'ap-northeast-2': 'ap-northeast-1',
'ap-southeast-1': 'ap-southeast-2',
'ap-southeast-2': 'ap-southeast-2',
'ap-northeast-1': 'ap-northeast-1',
'ca-central-1': 'us-east-1',
'eu-central-1': 'eu-central-1',
'eu-west-1': 'eu-west-1',
'eu-west-2': 'eu-west-1',
'eu-west-3': 'eu-central-1',
'eu-north-1': 'eu-central-1',
'sa-east-1': 'us-east-1',
'eu-south-1': 'eu-west-1'
}
dynamo_region_map = {
'us-west-1': 'us-west-1',
'us-west-2': 'us-west-2',
'us-east-1': 'us-east-1',
'us-east-2': 'us-east-2',
'ap-south-1': 'eu-north-1',
'ap-northeast-3': 'ap-northeast-1',
'ap-northeast-2': 'ap-northeast-1',
'ap-southeast-1': 'ap-southeast-1',
'ap-southeast-2': 'ap-southeast-2',
'ap-northeast-1': 'ap-northeast-1',
'ca-central-1': 'us-east-1',
'eu-central-1': 'eu-north-1',
'eu-west-1': 'eu-west-1',
'eu-west-2': 'eu-west-1',
'eu-west-3': 'eu-west-3',
'eu-north-1': 'eu-north-1',
'sa-east-1': 'sa-east-1',
'eu-south-1': 'eu-north-1'
} # This is a rough first pass at an intelligent region selector based on what is replicated
local_region = ''
if os.environ['AWS_REGION'] in dynamo_region_map:
local_dynamo_region = dynamo_region_map[os.environ['AWS_REGION']]
local_timestream_region = timestream_region_map[os.environ['AWS_REGION']]
else:
local_dynamo_region = 'eu-central-1'
local_timestream_region = 'eu-central-1'
timestream_client = boto3.client('timestream-query', region_name=local_timestream_region)
dynamodb_client = boto3.resource('dynamodb', region_name=local_dynamo_region)
tables = {
'retail': {
'recent': 'wow-token-price-recent',
'current': 'wow-token-price',
'timestream': 'wow-token-price-history'
},
'classic': {
'recent': 'wow-token-classic-price-recent',
'current': 'wow-token-classic-price',
'timestream': 'wow-token-classic-price-history'
}
}
def historical_data(time, region, version):
# This shim is to permanently change the URL of 30d to 720h for local caching,
# There seems to be at least 1 person using 30d (strangely with no .json) which was deprecated
# as the data source for 1 month of data years ago
if time == '30d':
time = '720h'
if time[-1] == 'h':
return dynamo_data(time, region, version)
else:
return timestream_data(time, region, version)
def timestream_data(time, region, version):
def __interval_bin(__time: str) -> dict:
__time = __time.lower()
def __div_floor(divisor: int, floor: int) -> int:
res = int(int(__time[:-1]) / divisor)
if res < floor:
return floor
return res
if __time == 'all':
return {'interval': "INTERVAL '10' YEAR", 'bin': '3h'}
if __time[-1] == 'd':
# I don't believe fstrings would work here
return {
'interval': "INTERVAL '" + __time[:-1] + "' DAY",
'bin': str(__div_floor(3, 1)) + 'm'
}
if __time[-1] == 'm':
return {
'interval': "INTERVAL '" + __time[:-1] + "' MONTH",
'bin': str(__div_floor(6, 1)) + 'h'
}
if __time[-1] == 'y':
return {
'interval': "INTERVAL '" + __time[:-1] + "' YEAR",
'bin': str(__div_floor(6, 1)) + 'h'
}
print(f"Function region: {os.environ['AWS_REGION']}\t Timestream Region: {local_region}")
timestream_query = (f"WITH binned_query AS ("
f" SELECT BIN(time, {__interval_bin(time)['bin']}) AS binned_time,"
f" CAST(AVG(measure_value::bigint) AS bigint) as average "
f"FROM \"{tables[version]['timestream']}\".\"{region}-price-history\" "
f"WHERE measure_name = 'price' "
f" AND time > now() - ({__interval_bin(time)['interval']}) "
f"GROUP BY BIN(time, {__interval_bin(time)['bin']}), 'price') "
f"SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data "
f"FROM binned_query ORDER BY timeseries_data ASC")
response = timestream_client.query(QueryString=timestream_query)
rows = response['Rows'][0]['Data'][0]['TimeSeriesValue']
data = []
# TODO: Should we inject the aggregation functions into this loop to reduce the cost of looping?
for row in rows:
row_isotime = row['Time'].split('.')[0]
row_datetime = datetime.datetime.fromisoformat(row_isotime).replace(
tzinfo=datetime.timezone.utc)
data.append({
'time': row_datetime.isoformat(),
'value': int(int(row['Value']['ScalarValue']) / 10000)
})
return data
def dynamo_data(time, region, version):
print(f"Function region: {os.environ['AWS_REGION']}\t Dynamo Region: {local_region}")
time_stripped = int(time[:-1])
start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=time_stripped)
start_time_utc = start_time.replace(tzinfo=datetime.timezone.utc)
table = dynamodb_client.Table(tables[version]['recent'])
response = table.query(
KeyConditionExpression=(
Key('region').eq(region) &
Key('timestamp').gte(int(start_time_utc.timestamp()))))
data = []
for item in response['Items']:
item_time = datetime.datetime.utcfromtimestamp(int(item['timestamp'])).replace(
tzinfo=datetime.timezone.utc).isoformat()
data.append({
'time': item_time,
'value': int(int(item['price']) / 10000)
})
return data
def aggregate_data(aggregate_function: str, data: list):
if aggregate_function == 'daily_max':
return max_min(1, 1, data)
elif aggregate_function == 'daily_min':
return max_min(-1, 1, data)
elif aggregate_function == 'daily_mean':
return mean(1, data)
elif aggregate_function == 'weekly_max':
return max_min(1, 7, data)
elif aggregate_function == 'weekly_min':
return max_min(-1, 7, data)
elif aggregate_function == 'weekly_mean':
return mean(7, data)
def date_in_range(day_range: tuple, date: datetime.datetime):
month_range = calendar.monthrange(date.year, date.month)
if day_range[0] <= date.day < day_range[1]:
return True
elif date.day < day_range[1] and date.day < day_range[0]:
# TODO: I am probably missing a sanity check here, come back to it
return True
else:
return False
def day_bucket(bucket_size: int, date: datetime.datetime) -> tuple[datetime.datetime, datetime.datetime]:
month_range = calendar.monthrange(date.year, date.month)
days_to_reset = {0: 1, 1: 0, 2: 6, 3: 5, 4: 4, 5: 3, 6: 2}
# We want the bucket boundaries for a bucket size of 7 to fall on
# reset day (index 1), and for a month (31) to fall on the actual boundaries of that month
# this means month-to-month, there are dynamic sizing of buckets
# TODO: Monthly boundaries
if bucket_size == 7 and date.weekday() != 1:
# This is WoW, the week starts on Tuesday (datetime index 1)
bucket_size = days_to_reset[date.weekday()]
return tuple((date, date + datetime.timedelta(days=bucket_size)))
def is_new_bucket(d_datetime: datetime.datetime, current_bucket_day: datetime.datetime.day, bucket: tuple) -> bool:
if d_datetime.day != current_bucket_day and (d_datetime >= bucket[1] or d_datetime.weekday() == 1):
return True
return False
def __sum_total(__data: list) -> int:
__total = 0
for __d in __data:
__total += __d['value']
return __total
def max_min(fn: int, bucket_size: int, data: list) -> list:
new_data = []
first_date = datetime.datetime.fromisoformat(data[0]['time'])
current_bucket_day = first_date.day
# I hate working with dates
bucket = day_bucket(bucket_size, first_date)
min_max = {'minimum': 999_999_999, 'maximum': 0}
min_max_date = {'minimum_date': datetime.datetime.min, 'maximum_date': datetime.datetime.max}
for d in data:
d_datetime = datetime.datetime.fromisoformat(d['time'])
# current_day is used to check if this 'if' has triggered for a new bucket and bypass if it has
if is_new_bucket(d_datetime, current_bucket_day, bucket):
current_bucket_day = d_datetime.day
bucket = day_bucket(bucket_size, d_datetime)
if fn == -1: # Minimum function
new_data.append({'time': min_max_date['minimum_date'], 'value': min_max['minimum']})
elif fn == 1: # Maximum function
new_data.append({'time': min_max_date['maximum_date'], 'value': min_max['maximum']})
min_max = {'minimum': 999_999_999, 'maximum': 0}
min_max_date = {
'minimum_date': datetime.datetime.min.isoformat(),
'maximum_date': datetime.datetime.max.isoformat()
}
if d['value'] < min_max['minimum']:
min_max['minimum'] = d['value']
min_max_date['minimum_date'] = d_datetime.isoformat()
if d['value'] > min_max['maximum']:
min_max['maximum'] = d['value']
min_max_date['maximum_date'] = d_datetime.isoformat()
return new_data
def mean(bucket_size: int, data: list) -> list:
new_data = []
first_date = datetime.datetime.fromisoformat(data[0]['time'])
current_bucket_day = first_date.day
bucket = day_bucket(bucket_size, first_date)
mean_bucket = []
bucket_date = first_date
for d in data:
d_datetime = datetime.datetime.fromisoformat(d['time'])
if is_new_bucket(d_datetime, current_bucket_day, bucket):
current_bucket_day = d_datetime.day
bucket = day_bucket(bucket_size, d_datetime)
new_data.append({'time': bucket[0].isoformat(), 'value': int(statistics.mean(mean_bucket))})
mean_bucket = []
mean_bucket.append(d['value'])
return new_data
# TODO FIXME
def simple_moving_average(hours: int, data: list) -> list:
# The cyclomatic complexity of this function is getting high, I need to figure out a more elegant solution
new_data = []
queue = deque()
hours_in_queue = 0
head_date = datetime.datetime.fromisoformat(data[8]['time'])
for datum in data:
datum_datetime = datetime.datetime.fromisoformat(datum['time'])
if datum_datetime.hour == head_date.hour:
queue.append(datum)
elif datum_datetime.hour != head_date.hour:
if hours_in_queue == hours:
q_list = list(queue)
total = __sum_total(q_list)
new_datum = {
'value': int(total / len(q_list)),
'time': head_date.isoformat()
}
new_data.append(new_datum)
deque_val = 0
for d in q_list:
__dt = datetime.datetime.fromisoformat(d['time'])
if __dt.hour == head_date.hour and __dt.day == __dt.day:
deque_val += 1
while deque_val != 0:
queue.pop()
deque_val -= 1
hours_in_queue -= 1
head_date = datum_datetime
elif hours_in_queue < 5:
queue.append(datum)
hours_in_queue += 1
return new_data
def moving_weighted_average(days: int, data: list) -> list:
pass
def validate_path(split_uri: list) -> bool:
if not split_uri[-1].endswith('json'):
return False
return validate_region(split_uri[-2]) and validate_time(split_uri[-1].split('.')[0])
def validate_time(time: str) -> bool:
# These can probably be rewritten as a lambda but at the time I am writing this I am just doing a first pass
if time[-1] == 'h':
hours = int(time[0:-1])
return (hours >= 24) and (hours < 1000)
if time[-1] == 'd':
days = int(time[0:-1])
return (days >= 30) and (days <= 100)
if time[-1] == 'm':
months = int(time[0:-1])
return (months >= 1) and (months <= 12)
if time[-1] == 'y':
years = int(time[0:-1])
return (years >= 1) and (years <= 10)
return time == 'all'
def validate_region(region: str) -> bool:
valid_regions = ['us', 'eu', 'tw', 'kr']
return region in valid_regions
def validate_aggregate(aggregate_function: str) -> bool:
valid_aggregates = ['daily_max', 'daily_min', 'daily_mean', 'weekly_max', 'weekly_min', 'weekly_mean']
return aggregate_function in valid_aggregates
def lambda_handler(event, context):
uri = event['Records'][0]['cf']['request']['uri']
split_uri = uri.split('/')
if validate_path(split_uri):
if 'classic' in split_uri:
version = 'classic'
else:
version = 'retail'
time = split_uri[-1].split('.')[0]
region = split_uri[-2]
aggregate_function = split_uri[-3]
data = historical_data(time, region, version)
if validate_aggregate(aggregate_function):
data = aggregate_data(aggregate_function, data)
response = {'status': '200', 'statusDescription': 'OK', 'headers': {}}
response['headers']['content-type'] = [{'key': 'Content-Type', 'value': 'application/json'}]
response['body'] = json.dumps(data)
return response
else:
return {'status': '404', 'statusDescription': 'NotFound', 'headers': {}}