Update historical to support both Timestream and Dynamo, as well as multi-region support

This commit is contained in:
Emily Doherty 2023-10-02 01:06:33 -07:00
parent 3e7aa429fa
commit 36b5de0df2

View File

@ -2,12 +2,34 @@ import sys
import boto3 import boto3
from boto3.dynamodb.conditions import Key from boto3.dynamodb.conditions import Key
from collections import deque
import datetime import datetime
import calendar import calendar
import json import json
import os import os
import statistics import statistics
# Maps the Lambda's home region (AWS_REGION) to the region its Timestream
# queries should be routed to -- presumably the nearest region where the
# Timestream tables are available/replicated; confirm pairings against the
# actual replication setup before changing them.
timestream_region_map = {
'us-west-1': 'us-west-2',
'us-west-2': 'us-west-2',
'us-east-1': 'us-east-1',
'us-east-2': 'us-east-2',
'ap-south-1': 'eu-west-1',
'ap-northeast-3': 'ap-northeast-1',
'ap-northeast-2': 'ap-northeast-1',
'ap-southeast-1': 'ap-southeast-2',
'ap-southeast-2': 'ap-southeast-2',
'ap-northeast-1': 'ap-northeast-1',
'ca-central-1': 'us-east-1',
'eu-central-1': 'eu-central-1',
'eu-west-1': 'eu-west-1',
'eu-west-2': 'eu-west-1',
'eu-west-3': 'eu-central-1',
'eu-north-1': 'eu-central-1',
'sa-east-1': 'us-east-1',
'eu-south-1': 'eu-west-1'
}
dynamo_region_map = { dynamo_region_map = {
'us-west-1': 'us-west-1', 'us-west-1': 'us-west-1',
'us-west-2': 'us-west-2', 'us-west-2': 'us-west-2',
@ -16,32 +38,42 @@ dynamo_region_map = {
'ap-south-1': 'eu-north-1', 'ap-south-1': 'eu-north-1',
'ap-northeast-3': 'ap-northeast-1', 'ap-northeast-3': 'ap-northeast-1',
'ap-northeast-2': 'ap-northeast-1', 'ap-northeast-2': 'ap-northeast-1',
'ap-southeast-1': 'ap-southeast-2', 'ap-southeast-1': 'ap-southeast-1',
'ap-southeast-2': 'ap-southeast-2', 'ap-southeast-2': 'ap-southeast-2',
'ap-northeast-1': 'ap-northeast-1', 'ap-northeast-1': 'ap-northeast-1',
'ca-central-1': 'us-east-1', 'ca-central-1': 'us-east-1',
'eu-central-1': 'eu-north-1', 'eu-central-1': 'eu-north-1',
'eu-west-1': 'eu-west-3', 'eu-west-1': 'eu-west-1',
'eu-west-2': 'eu-west-3', 'eu-west-2': 'eu-west-1',
'eu-west-3': 'eu-west-3', 'eu-west-3': 'eu-west-3',
'eu-north-1': 'eu-north-1', 'eu-north-1': 'eu-north-1',
'sa-east-1': 'sa-east-1', 'sa-east-1': 'sa-east-1',
'eu-south-1': 'eu-north-1', 'eu-south-1': 'eu-north-1'
} # This is a rough first pass at an intelligent region selector based on what is replicated } # This is a rough first pass at an intelligent region selector based on what is replicated
local_region = '' local_region = ''
if os.environ['AWS_REGION'] in dynamo_region_map: if os.environ['AWS_REGION'] in dynamo_region_map:
local_region = dynamo_region_map[os.environ['AWS_REGION']] local_dynamo_region = dynamo_region_map[os.environ['AWS_REGION']]
local_timestream_region = timestream_region_map[os.environ['AWS_REGION']]
else: else:
local_region = 'eu-west-3' local_dynamo_region = 'eu-central-1'
local_timestream_region = 'eu-central-1'
timestream_client = boto3.client('timestream-query', region_name=local_timestream_region)
dynamodb_client = boto3.resource('dynamodb', region_name=local_dynamo_region)
timestream_client = boto3.client('timestream-query', region_name='us-east-1') tables = {
dynamodb_client = boto3.resource('dynamodb', region_name=local_region) 'retail': {
table = dynamodb_client.Table('wow-token-price-recent') 'recent': 'wow-token-price-recent',
'current': 'wow-token-price',
'timestream': 'wow-token-price-history'
def historical_data(time, region): },
'classic': {
'recent': 'wow-token-classic-price-recent',
'current': 'wow-token-classic-price',
'timestream': 'wow-token-classic-price-history'
}
}
def historical_data(time, region, version):
# This shim is to permanently change the URL of 30d to 720h for local caching, # This shim is to permanently change the URL of 30d to 720h for local caching,
# There seems to be at least 1 person using 30d (strangely with no .json) which was deprecated # There seems to be at least 1 person using 30d (strangely with no .json) which was deprecated
# as the data source for 1 month of data years ago # as the data source for 1 month of data years ago
@ -49,30 +81,54 @@ def historical_data(time, region):
time = '720h' time = '720h'
if time[-1] == 'h': if time[-1] == 'h':
return dynamo_data(time, region) return dynamo_data(time, region, version)
else: else:
return timestream_data(time, region) return timestream_data(time, region, version)
def timestream_data(time, region): def timestream_data(time, region, version):
query_table = { def __interval_bin(__time: str) -> dict:
'30d': {'interval': "INTERVAL '1' MONTH", 'bin': '15m'}, __time = __time.lower()
'90d': {'interval': "INTERVAL '3' MONTH", 'bin': '30m'},
'6m': {'interval': "INTERVAL '6' MONTH", 'bin': '1h'}, def __div_floor(divisor: int, floor: int) -> int:
'1y': {'interval': "INTERVAL '1' YEAR", 'bin': '2h'}, res = int(int(__time[:-1]) / divisor)
'2y': {'interval': "INTERVAL '2' YEAR", 'bin': '2h'}, if res < floor:
'all': {'interval': "INTERVAL '10' YEAR", 'bin': '3h'}, return floor
return res
if __time == 'all':
return {'interval': "INTERVAL '10' YEAR", 'bin': '3h'}
if __time[-1] == 'd':
# I don't believe fstrings would work here
return {
'interval': "INTERVAL '" + __time[:-1] + "' DAY",
'bin': str(__div_floor(3, 1)) + 'm'
} }
timestream_query = 'WITH binned_query AS (' + \
'SELECT BIN(time, ' + query_table[time]['bin'] + ') AS binned_time,' + \ if __time[-1] == 'm':
'CAST(AVG(measure_value::bigint) AS bigint) as average ' + \ return {
'FROM "wow-token-price-history"."' + region + '-price-history" ' + \ 'interval': "INTERVAL '" + __time[:-1] + "' MONTH",
"WHERE measure_name = 'price' " + \ 'bin': str(__div_floor(6, 1)) + 'h'
'AND time > now() - (' + query_table[time]['interval'] + ') ' + \ }
'GROUP BY BIN(time, ' + query_table[time]['bin'] + "), 'price') " + \
'SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data ' + \ if __time[-1] == 'y':
'FROM binned_query ' + \ return {
'ORDER BY timeseries_data ASC' # This is so ugly 'interval': "INTERVAL '" + __time[:-1] + "' YEAR",
'bin': str(__div_floor(6, 1)) + 'h'
}
print(f"Function region: {os.environ['AWS_REGION']}\t Timestream Region: {local_region}")
timestream_query = (f"WITH binned_query AS ("
f" SELECT BIN(time, {__interval_bin(time)['bin']}) AS binned_time,"
f" CAST(AVG(measure_value::bigint) AS bigint) as average "
f"FROM \"{tables[version]['timestream']}\".\"{region}-price-history\" "
f"WHERE measure_name = 'price' "
f" AND time > now() - ({__interval_bin(time)['interval']}) "
f"GROUP BY BIN(time, {__interval_bin(time)['bin']}), 'price') "
f"SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data "
f"FROM binned_query ORDER BY timeseries_data ASC")
response = timestream_client.query(QueryString=timestream_query) response = timestream_client.query(QueryString=timestream_query)
rows = response['Rows'][0]['Data'][0]['TimeSeriesValue'] rows = response['Rows'][0]['Data'][0]['TimeSeriesValue']
data = [] data = []
@ -88,10 +144,12 @@ def timestream_data(time, region):
return data return data
def dynamo_data(time, region): def dynamo_data(time, region, version):
print(f"Function region: {os.environ['AWS_REGION']}\t Dynamo Region: {local_region}")
time_stripped = int(time[:-1]) time_stripped = int(time[:-1])
start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=time_stripped) start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=time_stripped)
start_time_utc = start_time.replace(tzinfo=datetime.timezone.utc) start_time_utc = start_time.replace(tzinfo=datetime.timezone.utc)
table = dynamodb_client.Table(tables[version]['recent'])
response = table.query( response = table.query(
KeyConditionExpression=( KeyConditionExpression=(
Key('region').eq(region) & Key('region').eq(region) &
@ -153,6 +211,13 @@ def is_new_bucket(d_datetime: datetime.datetime, current_bucket_day: datetime.da
return False return False
def __sum_total(__data: list) -> int:
    """Total of the 'value' field over every datum in __data (0 if empty)."""
    return sum(datum['value'] for datum in __data)
def max_min(fn: int, bucket_size: int, data: list) -> list: def max_min(fn: int, bucket_size: int, data: list) -> list:
new_data = [] new_data = []
first_date = datetime.datetime.fromisoformat(data[0]['time']) first_date = datetime.datetime.fromisoformat(data[0]['time'])
@ -210,13 +275,72 @@ def mean(bucket_size: int, data: list) -> list:
return new_data return new_data
def validate_path(split_uri: str) -> bool: # TODO FIXME
def simple_moving_average(hours: int, data: list) -> list:
    """Compute a simple moving average of 'value' over hourly buckets.

    data: chronologically ordered dicts with an ISO-8601 'time' string and an
    int 'value'. Returns a list of {'value': int average, 'time': iso string}
    points, one per emitted bucket.

    The cyclomatic complexity of this function is getting high; a more elegant
    solution is still TODO.
    """
    new_data = []
    queue = deque()
    hours_in_queue = 0
    # NOTE(review): index 8 looks arbitrary (warm-up window?) -- confirm it
    # shouldn't be data[0]. As written this raises IndexError when len(data) < 9.
    head_date = datetime.datetime.fromisoformat(data[8]['time'])
    for datum in data:
        datum_datetime = datetime.datetime.fromisoformat(datum['time'])
        if datum_datetime.hour == head_date.hour:
            queue.append(datum)
        else:
            if hours_in_queue == hours:
                q_list = list(queue)
                # Average everything currently queued into one output point,
                # stamped with the head bucket's time.
                total = sum(d['value'] for d in q_list)
                new_data.append({
                    'value': int(total / len(q_list)),
                    'time': head_date.isoformat()
                })
                # Count the queued entries belonging to the head bucket so they
                # can be evicted.
                # BUG FIX: the original tested `__dt.day == __dt.day`, which is
                # always true; the intent is to match the head bucket's day as
                # well as its hour, so same-hour data from a different day is
                # not counted for eviction.
                deque_val = 0
                for d in q_list:
                    __dt = datetime.datetime.fromisoformat(d['time'])
                    if __dt.hour == head_date.hour and __dt.day == head_date.day:
                        deque_val += 1
                # NOTE(review): pop() removes from the *right* (newest) end,
                # while the head-bucket entries counted above sit at the left
                # (oldest) end -- popleft() may have been intended; confirm.
                while deque_val != 0:
                    queue.pop()
                    deque_val -= 1
                hours_in_queue -= 1
                head_date = datum_datetime
            elif hours_in_queue < 5:
                # NOTE(review): the literal 5 looks like it should be `hours` --
                # confirm before relying on windows larger than 5 hours.
                queue.append(datum)
                hours_in_queue += 1
    return new_data
def moving_weighted_average(days: int, data: list) -> list:
    # TODO: not implemented yet -- placeholder aggregate; currently returns
    # None (implicitly) rather than the annotated list.
    pass
def validate_path(split_uri: list) -> bool:
    """True when the URI components end in <region>/<time>.json with a valid
    region and time span."""
    filename = split_uri[-1]
    if not filename.endswith('json'):
        return False
    # Short-circuit: only consult validate_time when the region checks out.
    return validate_region(split_uri[-2]) and validate_time(filename.split('.')[0])
def validate_time(time: str) -> bool:
    """Validate a time-span path component such as '72h', '30d', '6m', '1y'
    or 'all'.

    Accepted ranges: 24-999 hours, 30-100 days, 1-12 months, 1-10 years, or
    the literal 'all'. Everything else is rejected.

    This value comes straight from an untrusted request URI, so malformed
    input must return False rather than raise: the original int(time[:-1])
    raised ValueError on non-numeric text (e.g. 'xh') and time[-1] raised
    IndexError on an empty string.
    """
    if time == 'all':
        return True
    if len(time) < 2:
        return False
    unit, magnitude = time[-1], time[:-1]
    if not magnitude.isdigit():
        return False
    value = int(magnitude)
    if unit == 'h':
        return 24 <= value < 1000
    if unit == 'd':
        return 30 <= value <= 100
    if unit == 'm':
        return 1 <= value <= 12
    if unit == 'y':
        return 1 <= value <= 10
    return False
def validate_region(region: str) -> bool: def validate_region(region: str) -> bool:
@ -233,10 +357,14 @@ def lambda_handler(event, context):
uri = event['Records'][0]['cf']['request']['uri'] uri = event['Records'][0]['cf']['request']['uri']
split_uri = uri.split('/') split_uri = uri.split('/')
if validate_path(split_uri): if validate_path(split_uri):
if 'classic' in split_uri:
version = 'classic'
else:
version = 'retail'
time = split_uri[-1].split('.')[0] time = split_uri[-1].split('.')[0]
region = split_uri[-2] region = split_uri[-2]
aggregate_function = split_uri[-3] aggregate_function = split_uri[-3]
data = historical_data(time, region) data = historical_data(time, region, version)
if validate_aggregate(aggregate_function): if validate_aggregate(aggregate_function):
data = aggregate_data(aggregate_function, data) data = aggregate_data(aggregate_function, data)