commit 8936cccd29becbbcc7dbd99f4d38693537feb3ec
Author: Emily Doherty
Date:   Fri Sep 8 23:20:13 2023 -0700

    Initial implementations

    Okay not really, but I have lost the original git repo so this is where we're at qq

diff --git a/wow-token-current.py b/wow-token-current.py
new file mode 100644
index 0000000..61fddf8
--- /dev/null
+++ b/wow-token-current.py
@@ -0,0 +1,62 @@
+import boto3
+import datetime
+import json
+import os
+
+dynamo_region_map = {
+    'us-west-1': 'us-west-1',
+    'us-west-2': 'us-west-2',
+    'us-east-1': 'us-east-1',
+    'us-east-2': 'us-east-2',
+    'ap-south-1': 'eu-north-1',
+    'ap-northeast-3': 'ap-northeast-1',
+    'ap-northeast-2': 'ap-northeast-1',
+    'ap-southeast-1': 'ap-southeast-2',
+    'ap-southeast-2': 'ap-southeast-2',
+    'ap-northeast-1': 'ap-northeast-1',
+    'ca-central-1': 'us-east-1',
+    'eu-central-1': 'eu-north-1',
+    'eu-west-1': 'eu-north-1',
+    'eu-west-2': 'eu-north-1',
+    'eu-west-3': 'eu-north-1',
+    'eu-north-1': 'eu-north-1',
+    'sa-east-1': 'sa-east-1',
+    'eu-south-1': 'eu-north-1'
+}  # This is a rough first pass at an intelligent region selector based on what is replicated
+local_region = ''
+if os.environ['AWS_REGION'] in dynamo_region_map:
+    local_region = dynamo_region_map[os.environ['AWS_REGION']]
+else:
+    local_region = 'eu-north-1'
+
+dynamodb_client = boto3.resource('dynamodb', region_name=local_region)
+table = dynamodb_client.Table('wow-token-price')
+
+regions = ['us', 'eu', 'tw', 'kr']
+regional_data = {
+    'us': {'current_time': 0, 'price': 0},
+    'eu': {'current_time': 0, 'price': 0},
+    'tw': {'current_time': 0, 'price': 0},
+    'kr': {'current_time': 0, 'price': 0}
+}
+
+
+def token_data():
+    items = table.scan()['Items']
+    data = {
+        'current_time': datetime.datetime.utcfromtimestamp(int(items[0]['current_time'])).replace(
+            tzinfo=datetime.timezone.utc).isoformat(),
+        'price_data': {}
+    }
+    for item in items:
+        data['price_data'][item['region']] = int(int(item['price']) / 10000)
+    return data
+
+
+def lambda_handler(event, context):
+    data = token_data()
+    response = {'status': '200', 'statusDescription': 'OK', 'headers': {}}
+    response['headers']['content-type'] = [{'key': 'Content-Type', 'value': 'application/json'}]
+    response['body'] = json.dumps(data)
+    print('AWS Region:' + os.environ['AWS_REGION'] + '\tdynamodb_connect_region: ' + local_region)
+    return response
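
Note: both files are Lambda@Edge-style handlers that build the HTTP response at the edge instead of forwarding to an origin. As a rough sketch of what the current-price handler above produces (field values below are made up for illustration, not real data; the stored price appears to be in copper, so the division by 10000 yields whole gold):

    import json

    # Illustrative shape of the response returned by wow-token-current.py's lambda_handler
    response = {
        'status': '200',
        'statusDescription': 'OK',
        'headers': {'content-type': [{'key': 'Content-Type', 'value': 'application/json'}]},
        'body': json.dumps({
            'current_time': '2023-09-09T06:20:13+00:00',   # example value only
            'price_data': {'us': 123456, 'eu': 234567, 'tw': 345678, 'kr': 456789},
        }),
    }
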
diff --git a/wow-token-historical.py b/wow-token-historical.py
new file mode 100644
index 0000000..a679f55
--- /dev/null
+++ b/wow-token-historical.py
@@ -0,0 +1,249 @@
+import sys
+
+import boto3
+from boto3.dynamodb.conditions import Key
+import datetime
+import calendar
+import json
+import os
+import statistics
+
+dynamo_region_map = {
+    'us-west-1': 'us-west-1',
+    'us-west-2': 'us-west-2',
+    'us-east-1': 'us-east-1',
+    'us-east-2': 'us-east-2',
+    'ap-south-1': 'eu-north-1',
+    'ap-northeast-3': 'ap-northeast-1',
+    'ap-northeast-2': 'ap-northeast-1',
+    'ap-southeast-1': 'ap-southeast-2',
+    'ap-southeast-2': 'ap-southeast-2',
+    'ap-northeast-1': 'ap-northeast-1',
+    'ca-central-1': 'us-east-1',
+    'eu-central-1': 'eu-north-1',
+    'eu-west-1': 'eu-west-3',
+    'eu-west-2': 'eu-west-3',
+    'eu-west-3': 'eu-west-3',
+    'eu-north-1': 'eu-north-1',
+    'sa-east-1': 'sa-east-1',
+    'eu-south-1': 'eu-north-1',
+
+}  # This is a rough first pass at an intelligent region selector based on what is replicated
+local_region = ''
+if os.environ['AWS_REGION'] in dynamo_region_map:
+    local_region = dynamo_region_map[os.environ['AWS_REGION']]
+else:
+    local_region = 'eu-west-3'
+
+
+timestream_client = boto3.client('timestream-query', region_name='us-east-1')
+dynamodb_client = boto3.resource('dynamodb', region_name=local_region)
+table = dynamodb_client.Table('wow-token-price-recent')
+
+
+def historical_data(time, region):
+    # This shim permanently changes the URL of 30d to 720h for local caching;
+    # there seems to be at least one person still using 30d (strangely with no .json), which was
+    # deprecated as the data source for 1 month of data years ago
+    if time == '30d':
+        time = '720h'
+
+    if time[-1] == 'h':
+        return dynamo_data(time, region)
+    else:
+        return timestream_data(time, region)
+
+
+def timestream_data(time, region):
+    query_table = {
+        '30d': {'interval': "INTERVAL '1' MONTH", 'bin': '15m'},
+        '90d': {'interval': "INTERVAL '3' MONTH", 'bin': '30m'},
+        '6m': {'interval': "INTERVAL '6' MONTH", 'bin': '1h'},
+        '1y': {'interval': "INTERVAL '1' YEAR", 'bin': '2h'},
+        '2y': {'interval': "INTERVAL '2' YEAR", 'bin': '2h'},
+        'all': {'interval': "INTERVAL '10' YEAR", 'bin': '3h'},
+    }
+    timestream_query = 'WITH binned_query AS (' + \
+                       'SELECT BIN(time, ' + query_table[time]['bin'] + ') AS binned_time,' + \
+                       'CAST(AVG(measure_value::bigint) AS bigint) as average ' + \
+                       'FROM "wow-token-price-history"."' + region + '-price-history" ' + \
+                       "WHERE measure_name = 'price' " + \
+                       'AND time > now() - (' + query_table[time]['interval'] + ') ' + \
+                       'GROUP BY BIN(time, ' + query_table[time]['bin'] + "), 'price') " + \
+                       'SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data ' + \
+                       'FROM binned_query ' + \
+                       'ORDER BY timeseries_data ASC'  # This is so ugly
+    response = timestream_client.query(QueryString=timestream_query)
+    rows = response['Rows'][0]['Data'][0]['TimeSeriesValue']
+    data = []
+    # TODO: Should we inject the aggregation functions into this loop to reduce the cost of looping?
+    for row in rows:
+        row_isotime = row['Time'].split('.')[0]
+        row_datetime = datetime.datetime.fromisoformat(row_isotime).replace(
+            tzinfo=datetime.timezone.utc)
+        data.append({
+            'time': row_datetime.isoformat(),
+            'value': int(int(row['Value']['ScalarValue']) / 10000)
+        })
+    return data
+
+
+def dynamo_data(time, region):
+    time_stripped = int(time[:-1])
+    start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=time_stripped)
+    start_time_utc = start_time.replace(tzinfo=datetime.timezone.utc)
+    response = table.query(
+        KeyConditionExpression=(
+            Key('region').eq(region) &
+            Key('timestamp').gte(int(start_time_utc.timestamp()))))
+    data = []
+    for item in response['Items']:
+        item_time = datetime.datetime.utcfromtimestamp(int(item['timestamp'])).replace(
+            tzinfo=datetime.timezone.utc).isoformat()
+        data.append({
+            'time': item_time,
+            'value': int(int(item['price']) / 10000)
+        })
+    return data
+
+
+def aggregate_data(aggregate_function: str, data: list):
+    if aggregate_function == 'daily_max':
+        return max_min(1, 1, data)
+    elif aggregate_function == 'daily_min':
+        return max_min(-1, 1, data)
+    elif aggregate_function == 'daily_mean':
+        return mean(1, data)
+    elif aggregate_function == 'weekly_max':
+        return max_min(1, 7, data)
+    elif aggregate_function == 'weekly_min':
+        return max_min(-1, 7, data)
+    elif aggregate_function == 'weekly_mean':
+        return mean(7, data)
+
+
+def date_in_range(day_range: tuple, date: datetime.datetime):
+    month_range = calendar.monthrange(date.year, date.month)
+    if day_range[0] <= date.day < day_range[1]:
+        return True
+    elif date.day < day_range[1] and date.day < day_range[0]:
+        # TODO: I am probably missing a sanity check here, come back to it
+        return True
+    else:
+        return False
+
+
+def day_bucket(bucket_size: int, date: datetime.datetime) -> tuple[datetime.datetime, datetime.datetime]:
+    month_range = calendar.monthrange(date.year, date.month)
+    days_to_reset = {0: 1, 1: 0, 2: 6, 3: 5, 4: 4, 5: 3, 6: 2}
+    # We want the bucket boundaries for a bucket size of 7 to fall on
+    # reset day (index 1), and for a month (31) to fall on the actual boundaries of that month.
+    # This means that month-to-month, buckets are dynamically sized.
+    # TODO: Monthly boundaries
+    if bucket_size == 7 and date.weekday() != 1:
+        # This is WoW, the week starts on Tuesday (datetime index 1)
+        bucket_size = days_to_reset[date.weekday()]
+
+    return tuple((date, date + datetime.timedelta(days=bucket_size)))
+
+
+def is_new_bucket(d_datetime: datetime.datetime, current_bucket_day: int, bucket: tuple) -> bool:
+    if d_datetime.day != current_bucket_day and (d_datetime >= bucket[1] or d_datetime.weekday() == 1):
+        return True
+    return False
+
+
+def max_min(fn: int, bucket_size: int, data: list) -> list:
+    new_data = []
+    first_date = datetime.datetime.fromisoformat(data[0]['time'])
+    current_bucket_day = first_date.day
+    # I hate working with dates
+    bucket = day_bucket(bucket_size, first_date)
+    min_max = {'minimum': 999_999_999, 'maximum': 0}
+    min_max_date = {'minimum_date': datetime.datetime.min.isoformat(), 'maximum_date': datetime.datetime.max.isoformat()}
+
+    for d in data:
+        d_datetime = datetime.datetime.fromisoformat(d['time'])
+        # current_day is used to check if this 'if' has triggered for a new bucket and bypass if it has
+        if is_new_bucket(d_datetime, current_bucket_day, bucket):
+            current_bucket_day = d_datetime.day
+            bucket = day_bucket(bucket_size, d_datetime)
+            if fn == -1:  # Minimum function
+                new_data.append({'time': min_max_date['minimum_date'], 'value': min_max['minimum']})
+            elif fn == 1:  # Maximum function
+                new_data.append({'time': min_max_date['maximum_date'], 'value': min_max['maximum']})
+            min_max = {'minimum': 999_999_999, 'maximum': 0}
+            min_max_date = {
+                'minimum_date': datetime.datetime.min.isoformat(),
+                'maximum_date': datetime.datetime.max.isoformat()
+            }
+
+        if d['value'] < min_max['minimum']:
+            min_max['minimum'] = d['value']
+            min_max_date['minimum_date'] = d_datetime.isoformat()
+
+        if d['value'] > min_max['maximum']:
+            min_max['maximum'] = d['value']
+            min_max_date['maximum_date'] = d_datetime.isoformat()
+
+    return new_data
+
+
+def mean(bucket_size: int, data: list) -> list:
+    new_data = []
+    first_date = datetime.datetime.fromisoformat(data[0]['time'])
+    current_bucket_day = first_date.day
+    bucket = day_bucket(bucket_size, first_date)
+    mean_bucket = []
+    bucket_date = first_date
+
+    for d in data:
+        d_datetime = datetime.datetime.fromisoformat(d['time'])
+        if is_new_bucket(d_datetime, current_bucket_day, bucket):
+            current_bucket_day = d_datetime.day
+            bucket = day_bucket(bucket_size, d_datetime)
+            new_data.append({'time': bucket[0].isoformat(), 'value': int(statistics.mean(mean_bucket))})
+            mean_bucket = []
+
+        mean_bucket.append(d['value'])
+
+    return new_data
+
+
+def validate_path(split_uri: list) -> bool:
+    return validate_region(split_uri[-2]) and validate_time(split_uri[-1].split('.')[0])
+
+
+def validate_time(time: str) -> bool:
+    valid_times = ['24h', '48h', '72h', '120h', '168h', '336h', '720h', '30d', '2190h', '90d', '6m', '1y', '2y', 'all']
+    return time in valid_times
+
+
+def validate_region(region: str) -> bool:
+    valid_regions = ['us', 'eu', 'tw', 'kr']
+    return region in valid_regions
+
+
+def validate_aggregate(aggregate_function: str) -> bool:
+    valid_aggregates = ['daily_max', 'daily_min', 'daily_mean', 'weekly_max', 'weekly_min', 'weekly_mean']
+    return aggregate_function in valid_aggregates
+
+
+def lambda_handler(event, context):
+    uri = event['Records'][0]['cf']['request']['uri']
+    split_uri = uri.split('/')
+    if validate_path(split_uri):
+        time = split_uri[-1].split('.')[0]
+        region = split_uri[-2]
+        aggregate_function = split_uri[-3]
+        data = historical_data(time, region)
+
+        if validate_aggregate(aggregate_function):
+            data = aggregate_data(aggregate_function, data)
+
+        response = {'status': '200', 'statusDescription': 'OK', 'headers': {}}
+        response['headers']['content-type'] = [{'key': 'Content-Type', 'value': 'application/json'}]
+        response['body'] = json.dumps(data)
+        return response
+    else:
+        return {'status': '404', 'statusDescription': 'NotFound', 'headers': {}}
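
Note: the historical handler reads everything from the request path: the last three segments are treated as aggregate/region/time (the aggregate is effectively optional, since an unrecognized third-from-last segment just skips aggregation). A minimal local smoke test might look like the sketch below. It is illustrative only: the URI prefix is made up (only the trailing .../{aggregate}/{region}/{time}.json segments matter to the validators), the importlib loading is only needed because of the hyphenated filename, and it assumes AWS_REGION is set plus credentials with read access to the wow-token-price-recent DynamoDB table and the wow-token-price-history Timestream database.

    # Hypothetical local invocation of wow-token-historical.py (not part of the deployment)
    import importlib.util
    import json
    import os

    os.environ.setdefault('AWS_REGION', 'us-west-2')  # must be set before the module is loaded

    spec = importlib.util.spec_from_file_location('wow_token_historical', 'wow-token-historical.py')
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    # Minimal CloudFront-style event; only the request URI is read by the handler
    event = {'Records': [{'cf': {'request': {'uri': '/token/history/weekly_mean/us/720h.json'}}}]}
    response = module.lambda_handler(event, None)
    print(response['status'], json.loads(response['body'])[:2])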