From 36b5de0df27aab5b779e9330bd460ec5b8606d5e Mon Sep 17 00:00:00 2001
From: Emily Doherty
Date: Mon, 2 Oct 2023 01:06:33 -0700
Subject: [PATCH] Update historical to support both Timestream and Dynamo, as
 well as multiple regions

---
 wow-token-historical.py | 206 ++++++++++++++++++++++++++++++++--------
 1 file changed, 167 insertions(+), 39 deletions(-)

diff --git a/wow-token-historical.py b/wow-token-historical.py
index a679f55..eb58710 100644
--- a/wow-token-historical.py
+++ b/wow-token-historical.py
@@ -2,12 +2,34 @@
 import sys
 import boto3
 from boto3.dynamodb.conditions import Key
+from collections import deque
 import datetime
 import calendar
 import json
 import os
 import statistics
 
+timestream_region_map = {
+    'us-west-1': 'us-west-2',
+    'us-west-2': 'us-west-2',
+    'us-east-1': 'us-east-1',
+    'us-east-2': 'us-east-2',
+    'ap-south-1': 'eu-west-1',
+    'ap-northeast-3': 'ap-northeast-1',
+    'ap-northeast-2': 'ap-northeast-1',
+    'ap-southeast-1': 'ap-southeast-2',
+    'ap-southeast-2': 'ap-southeast-2',
+    'ap-northeast-1': 'ap-northeast-1',
+    'ca-central-1': 'us-east-1',
+    'eu-central-1': 'eu-central-1',
+    'eu-west-1': 'eu-west-1',
+    'eu-west-2': 'eu-west-1',
+    'eu-west-3': 'eu-central-1',
+    'eu-north-1': 'eu-central-1',
+    'sa-east-1': 'us-east-1',
+    'eu-south-1': 'eu-west-1'
+}
+
 dynamo_region_map = {
     'us-west-1': 'us-west-1',
     'us-west-2': 'us-west-2',
@@ -16,32 +38,42 @@ dynamo_region_map = {
     'ap-south-1': 'eu-north-1',
     'ap-northeast-3': 'ap-northeast-1',
     'ap-northeast-2': 'ap-northeast-1',
-    'ap-southeast-1': 'ap-southeast-2',
+    'ap-southeast-1': 'ap-southeast-1',
     'ap-southeast-2': 'ap-southeast-2',
     'ap-northeast-1': 'ap-northeast-1',
     'ca-central-1': 'us-east-1',
     'eu-central-1': 'eu-north-1',
-    'eu-west-1': 'eu-west-3',
-    'eu-west-2': 'eu-west-3',
+    'eu-west-1': 'eu-west-1',
+    'eu-west-2': 'eu-west-1',
     'eu-west-3': 'eu-west-3',
     'eu-north-1': 'eu-north-1',
     'sa-east-1': 'sa-east-1',
-    'eu-south-1': 'eu-north-1',
-
+    'eu-south-1': 'eu-north-1'
 }
 
 # This is a rough first pass at an intelligent region selector based on what is replicated
 local_region = ''
 if os.environ['AWS_REGION'] in dynamo_region_map:
-    local_region = dynamo_region_map[os.environ['AWS_REGION']]
+    local_dynamo_region = dynamo_region_map[os.environ['AWS_REGION']]
+    local_timestream_region = timestream_region_map[os.environ['AWS_REGION']]
 else:
-    local_region = 'eu-west-3'
+    local_dynamo_region = 'eu-central-1'
+    local_timestream_region = 'eu-central-1'
-timestream_client = boto3.client('timestream-query', region_name='us-east-1')
-dynamodb_client = boto3.resource('dynamodb', region_name=local_region)
-table = dynamodb_client.Table('wow-token-price-recent')
-
-
-def historical_data(time, region):
+timestream_client = boto3.client('timestream-query', region_name=local_timestream_region)
+dynamodb_client = boto3.resource('dynamodb', region_name=local_dynamo_region)
+tables = {
+    'retail': {
+        'recent': 'wow-token-price-recent',
+        'current': 'wow-token-price',
+        'timestream': 'wow-token-price-history'
+    },
+    'classic': {
+        'recent': 'wow-token-classic-price-recent',
+        'current': 'wow-token-classic-price',
+        'timestream': 'wow-token-classic-price-history'
+    }
+}
+def historical_data(time, region, version):
     # This shim is to permanently change the URL of 30d to 720h for local caching,
     # There seems to be at least 1 person using 30d (strangely with no .json) which was deprecated
     # as the data source for 1 month of data years ago
@@ -49,30 +81,54 @@ def historical_data(time, region):
         time = '720h'
 
     if time[-1] == 'h':
-        return dynamo_data(time, region)
+        return dynamo_data(time, region, version)
     else:
-        return timestream_data(time, region)
+        return timestream_data(time, region, version)
 
 
-def timestream_data(time, region):
-    query_table = {
-        '30d': {'interval': "INTERVAL '1' MONTH", 'bin': '15m'},
-        '90d': {'interval': "INTERVAL '3' MONTH", 'bin': '30m'},
-        '6m': {'interval': "INTERVAL '6' MONTH", 'bin': '1h'},
-        '1y': {'interval': "INTERVAL '1' YEAR", 'bin': '2h'},
-        '2y': {'interval': "INTERVAL '2' YEAR", 'bin': '2h'},
-        'all': {'interval': "INTERVAL '10' YEAR", 'bin': '3h'},
-    }
-    timestream_query = 'WITH binned_query AS (' + \
-                       'SELECT BIN(time, ' + query_table[time]['bin'] + ') AS binned_time,' + \
-                       'CAST(AVG(measure_value::bigint) AS bigint) as average ' + \
-                       'FROM "wow-token-price-history"."' + region + '-price-history" ' + \
-                       "WHERE measure_name = 'price' " + \
-                       'AND time > now() - (' + query_table[time]['interval'] + ') ' + \
-                       'GROUP BY BIN(time, ' + query_table[time]['bin'] + "), 'price') " + \
-                       'SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data ' + \
-                       'FROM binned_query ' + \
-                       'ORDER BY timeseries_data ASC'  # This is so ugly
+def timestream_data(time, region, version):
+    def __interval_bin(__time: str) -> dict:
+        __time = __time.lower()
+
+        def __div_floor(divisor: int, floor: int) -> int:
+            res = int(int(__time[:-1]) / divisor)
+            if res < floor:
+                return floor
+            return res
+
+        if __time == 'all':
+            return {'interval': "INTERVAL '10' YEAR", 'bin': '3h'}
+
+        if __time[-1] == 'd':
+            return {
+                'interval': f"INTERVAL '{__time[:-1]}' DAY",
+                'bin': str(__div_floor(3, 1)) + 'm'
+            }
+
+        if __time[-1] == 'm':
+            return {
+                'interval': f"INTERVAL '{__time[:-1]}' MONTH",
+                'bin': str(__div_floor(6, 1)) + 'h'
+            }
+
+        if __time[-1] == 'y':
+            return {
+                'interval': f"INTERVAL '{__time[:-1]}' YEAR",
+                'bin': str(__div_floor(6, 1)) + 'h'
+            }
+
+    print(f"Function region: {os.environ['AWS_REGION']}\t Timestream Region: {local_timestream_region}")
+    interval_bin = __interval_bin(time)
+    timestream_query = (f"WITH binned_query AS ("
+                        f" SELECT BIN(time, {interval_bin['bin']}) AS binned_time,"
+                        f" CAST(AVG(measure_value::bigint) AS bigint) as average "
+                        f"FROM \"{tables[version]['timestream']}\".\"{region}-price-history\" "
+                        f"WHERE measure_name = 'price' "
+                        f" AND time > now() - ({interval_bin['interval']}) "
+                        f"GROUP BY BIN(time, {interval_bin['bin']}), 'price') "
+                        f"SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data "
+                        f"FROM binned_query ORDER BY timeseries_data ASC")
+
     response = timestream_client.query(QueryString=timestream_query)
     rows = response['Rows'][0]['Data'][0]['TimeSeriesValue']
     data = []
@@ -88,10 +144,12 @@
     return data
 
 
-def dynamo_data(time, region):
+def dynamo_data(time, region, version):
+    print(f"Function region: {os.environ['AWS_REGION']}\t Dynamo Region: {local_dynamo_region}")
     time_stripped = int(time[:-1])
     start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=time_stripped)
     start_time_utc = start_time.replace(tzinfo=datetime.timezone.utc)
+    table = dynamodb_client.Table(tables[version]['recent'])
     response = table.query(
         KeyConditionExpression=(
             Key('region').eq(region) &
@@ -153,6 +211,13 @@ def is_new_bucket(d_datetime: datetime.datetime, current_bucket_day: datetime.da
     return False
 
 
+def __sum_total(__data: list) -> int:
+    __total = 0
+    for __d in __data:
+        __total += __d['value']
+    return __total
+
+
 def max_min(fn: int, bucket_size: int, data: list) -> list:
     new_data = []
    first_date = datetime.datetime.fromisoformat(data[0]['time'])
@@ -210,13 +275,72 @@ def mean(bucket_size: int, data: list) -> list:
     return new_data
 
 
-def validate_path(split_uri: str) -> bool:
+# TODO FIXME
+def simple_moving_average(hours: int, data: list) -> list:
+    # The cyclomatic complexity of this function is getting high, I need to figure out a more elegant solution
+    new_data = []
+    queue = deque()
+    hours_in_queue = 0
+    head_date = datetime.datetime.fromisoformat(data[0]['time'])
+    for datum in data:
+        datum_datetime = datetime.datetime.fromisoformat(datum['time'])
+        if datum_datetime.hour == head_date.hour:
+            queue.append(datum)
+        elif datum_datetime.hour != head_date.hour:
+            if hours_in_queue == hours:
+                q_list = list(queue)
+                total = __sum_total(q_list)
+                new_datum = {
+                    'value': int(total / len(q_list)),
+                    'time': head_date.isoformat()
+                }
+                new_data.append(new_datum)
+                deque_val = 0
+                for d in q_list:
+                    __dt = datetime.datetime.fromisoformat(d['time'])
+                    if __dt.hour == head_date.hour and __dt.day == head_date.day:
+                        deque_val += 1
+                while deque_val != 0:
+                    queue.pop()
+                    deque_val -= 1
+                    hours_in_queue -= 1
+                head_date = datum_datetime
+            elif hours_in_queue < hours:
+                queue.append(datum)
+                hours_in_queue += 1
+    return new_data
+
+
+def moving_weighted_average(days: int, data: list) -> list:
+    pass
+
+
+def validate_path(split_uri: list) -> bool:
+    if not split_uri[-1].endswith('json'):
+        return False
+    return validate_region(split_uri[-2]) and validate_time(split_uri[-1].split('.')[0])
 
 
 def validate_time(time: str) -> bool:
-    valid_times = ['24h', '48h', '72h', '120h', '168h', '336h', '720h', '30d', '2190h', '90d', '6m', '1y', '2y', 'all']
-    return time in valid_times
+    # These could probably be collapsed into a lookup table, but this is just a first pass
+    if time[-1] == 'h':
+        hours = int(time[0:-1])
+        return (hours >= 24) and (hours < 1000)
+
+    if time[-1] == 'd':
+        days = int(time[0:-1])
+        return (days >= 30) and (days <= 100)
+
+    if time[-1] == 'm':
+        months = int(time[0:-1])
+        return (months >= 1) and (months <= 12)
+
+    if time[-1] == 'y':
+        years = int(time[0:-1])
+        return (years >= 1) and (years <= 10)
+
+    return time == 'all'
 
 
 def validate_region(region: str) -> bool:
@@ -233,10 +357,14 @@ def lambda_handler(event, context):
     uri = event['Records'][0]['cf']['request']['uri']
     split_uri = uri.split('/')
     if validate_path(split_uri):
+        if 'classic' in split_uri:
+            version = 'classic'
+        else:
+            version = 'retail'
         time = split_uri[-1].split('.')[0]
         region = split_uri[-2]
         aggregate_function = split_uri[-3]
-        data = historical_data(time, region)
+        data = historical_data(time, region, version)
         if validate_aggregate(aggregate_function):
            data = aggregate_data(aggregate_function, data)
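
Note on the interval/bin change: the patch replaces the hard-coded query_table with the
dynamic __interval_bin helper, so wider time ranges get coarser BIN() widths and the
returned row count stays roughly constant. The sketch below mirrors that helper outside
the patch so the mapping can be checked in isolation; the flush-left name interval_bin,
the ValueError fallback, and the demo loop are illustrative additions, not part of the
commit.

def interval_bin(time: str) -> dict:
    # Mirrors __interval_bin from the patch: map a URL time suffix to a
    # Timestream INTERVAL literal and a BIN() width.
    time = time.lower()

    def div_floor(divisor: int, floor: int) -> int:
        # Scale the numeric prefix down, clamping at a minimum bin width
        return max(int(int(time[:-1]) / divisor), floor)

    if time == 'all':
        return {'interval': "INTERVAL '10' YEAR", 'bin': '3h'}
    if time[-1] == 'd':
        return {'interval': f"INTERVAL '{time[:-1]}' DAY", 'bin': f"{div_floor(3, 1)}m"}
    if time[-1] == 'm':
        return {'interval': f"INTERVAL '{time[:-1]}' MONTH", 'bin': f"{div_floor(6, 1)}h"}
    if time[-1] == 'y':
        return {'interval': f"INTERVAL '{time[:-1]}' YEAR", 'bin': f"{div_floor(6, 1)}h"}
    raise ValueError(f"unsupported time range: {time}")  # not in the patch; it returns None


for t in ('30d', '90d', '6m', '1y', '2y', 'all'):
    print(t, '->', interval_bin(t))
# 30d -> {'interval': "INTERVAL '30' DAY", 'bin': '10m'}
# 90d -> {'interval': "INTERVAL '90' DAY", 'bin': '30m'}
# 6m  -> {'interval': "INTERVAL '6' MONTH", 'bin': '1h'}
# 1y  -> {'interval': "INTERVAL '1' YEAR", 'bin': '1h'}
# 2y  -> {'interval': "INTERVAL '2' YEAR", 'bin': '1h'}
# all -> {'interval': "INTERVAL '10' YEAR", 'bin': '3h'}

Compared with the old table, 30d moves from 15m to 10m bins and 1y/2y from 2h to 1h,
since the bin width is now derived from the numeric prefix rather than listed per range.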
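
Note on simple_moving_average: it lands marked TODO FIXME, and its queue bookkeeping
(which items are evicted, and when hours_in_queue is decremented) is still rough. For
comparison only, here is one way a trailing N-hour average over the same
[{'value': ..., 'time': ...}] records could be structured with a bounded deque of
hour-buckets. hourly_sma is a hypothetical name, and this is a sketch assuming
time-sorted input, not the commit's implementation.

from collections import deque
import datetime


def hourly_sma(hours: int, data: list) -> list:
    # Hypothetical reference implementation: a trailing simple moving
    # average over the last `hours` hour-buckets, emitted once per hour.
    buckets = deque(maxlen=hours)  # maxlen evicts the oldest bucket automatically
    current_hour = None
    new_data = []
    for datum in data:
        ts = datetime.datetime.fromisoformat(datum['time'])
        hour = ts.replace(minute=0, second=0, microsecond=0)
        if hour != current_hour:
            # The previous hour just completed; emit one averaged point
            # once the window holds a full `hours` buckets
            if len(buckets) == hours:
                values = [v for bucket in buckets for v in bucket]
                new_data.append({'value': int(sum(values) / len(values)),
                                 'time': current_hour.isoformat()})
            buckets.append([])
            current_hour = hour
        buckets[-1].append(datum['value'])
    return new_data

Grouping values into per-hour buckets first means the eviction step is a single
deque append instead of counting and popping individual records, which is where the
patched function's complexity (and its off-by-one risks) comes from.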