import sys import boto3 from boto3.dynamodb.conditions import Key import datetime import calendar import json import os import statistics dynamo_region_map = { 'us-west-1': 'us-west-1', 'us-west-2': 'us-west-2', 'us-east-1': 'us-east-1', 'us-east-2': 'us-east-2', 'ap-south-1': 'eu-north-1', 'ap-northeast-3': 'ap-northeast-1', 'ap-northeast-2': 'ap-northeast-1', 'ap-southeast-1': 'ap-southeast-2', 'ap-southeast-2': 'ap-southeast-2', 'ap-northeast-1': 'ap-northeast-1', 'ca-central-1': 'us-east-1', 'eu-central-1': 'eu-north-1', 'eu-west-1': 'eu-west-3', 'eu-west-2': 'eu-west-3', 'eu-west-3': 'eu-west-3', 'eu-north-1': 'eu-north-1', 'sa-east-1': 'sa-east-1', 'eu-south-1': 'eu-north-1', } # This is a rough first pass at an intelligent region selector based on what is replicated local_region = '' if os.environ['AWS_REGION'] in dynamo_region_map: local_region = dynamo_region_map[os.environ['AWS_REGION']] else: local_region = 'eu-west-3' timestream_client = boto3.client('timestream-query', region_name='us-east-1') dynamodb_client = boto3.resource('dynamodb', region_name=local_region) table = dynamodb_client.Table('wow-token-price-recent') def historical_data(time, region): # This shim is to permanently change the URL of 30d to 720h for local caching, # There seems to be at least 1 person using 30d (strangely with no .json) which was deprecated # as the data source for 1 month of data years ago if time == '30d': time = '720h' if time[-1] == 'h': return dynamo_data(time, region) else: return timestream_data(time, region) def timestream_data(time, region): query_table = { '30d': {'interval': "INTERVAL '1' MONTH", 'bin': '15m'}, '90d': {'interval': "INTERVAL '3' MONTH", 'bin': '30m'}, '6m': {'interval': "INTERVAL '6' MONTH", 'bin': '1h'}, '1y': {'interval': "INTERVAL '1' YEAR", 'bin': '2h'}, '2y': {'interval': "INTERVAL '2' YEAR", 'bin': '2h'}, 'all': {'interval': "INTERVAL '10' YEAR", 'bin': '3h'}, } timestream_query = 'WITH binned_query AS (' + \ 'SELECT BIN(time, ' + query_table[time]['bin'] + ') AS binned_time,' + \ 'CAST(AVG(measure_value::bigint) AS bigint) as average ' + \ 'FROM "wow-token-price-history"."' + region + '-price-history" ' + \ "WHERE measure_name = 'price' " + \ 'AND time > now() - (' + query_table[time]['interval'] + ') ' + \ 'GROUP BY BIN(time, ' + query_table[time]['bin'] + "), 'price') " + \ 'SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data ' + \ 'FROM binned_query ' + \ 'ORDER BY timeseries_data ASC' # This is so ugly response = timestream_client.query(QueryString=timestream_query) rows = response['Rows'][0]['Data'][0]['TimeSeriesValue'] data = [] # TODO: Should we inject the aggregation functions into this loop to reduce the cost of looping? for row in rows: row_isotime = row['Time'].split('.')[0] row_datetime = datetime.datetime.fromisoformat(row_isotime).replace( tzinfo=datetime.timezone.utc) data.append({ 'time': row_datetime.isoformat(), 'value': int(int(row['Value']['ScalarValue']) / 10000) }) return data def dynamo_data(time, region): time_stripped = int(time[:-1]) start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=time_stripped) start_time_utc = start_time.replace(tzinfo=datetime.timezone.utc) response = table.query( KeyConditionExpression=( Key('region').eq(region) & Key('timestamp').gte(int(start_time_utc.timestamp())))) data = [] for item in response['Items']: item_time = datetime.datetime.utcfromtimestamp(int(item['timestamp'])).replace( tzinfo=datetime.timezone.utc).isoformat() data.append({ 'time': item_time, 'value': int(int(item['price']) / 10000) }) return data def aggregate_data(aggregate_function: str, data: list): if aggregate_function == 'daily_max': return max_min(1, 1, data) elif aggregate_function == 'daily_min': return max_min(-1, 1, data) elif aggregate_function == 'daily_mean': return mean(1, data) elif aggregate_function == 'weekly_max': return max_min(1, 7, data) elif aggregate_function == 'weekly_min': return max_min(-1, 7, data) elif aggregate_function == 'weekly_mean': return mean(7, data) def date_in_range(day_range: tuple, date: datetime.datetime): month_range = calendar.monthrange(date.year, date.month) if day_range[0] <= date.day < day_range[1]: return True elif date.day < day_range[1] and date.day < day_range[0]: # TODO: I am probably missing a sanity check here, come back to it return True else: return False def day_bucket(bucket_size: int, date: datetime.datetime) -> tuple[datetime.datetime, datetime.datetime]: month_range = calendar.monthrange(date.year, date.month) days_to_reset = {0: 1, 1: 0, 2: 6, 3: 5, 4: 4, 5: 3, 6: 2} # We want the bucket boundaries for a bucket size of 7 to fall on # reset day (index 1), and for a month (31) to fall on the actual boundaries of that month # this means month-to-month, there are dynamic sizing of buckets # TODO: Monthly boundaries if bucket_size == 7 and date.weekday() != 1: # This is WoW, the week starts on Tuesday (datetime index 1) bucket_size = days_to_reset[date.weekday()] return tuple((date, date + datetime.timedelta(days=bucket_size))) def is_new_bucket(d_datetime: datetime.datetime, current_bucket_day: datetime.datetime.day, bucket: tuple) -> bool: if d_datetime.day != current_bucket_day and (d_datetime >= bucket[1] or d_datetime.weekday() == 1): return True return False def max_min(fn: int, bucket_size: int, data: list) -> list: new_data = [] first_date = datetime.datetime.fromisoformat(data[0]['time']) current_bucket_day = first_date.day # I hate working with dates bucket = day_bucket(bucket_size, first_date) min_max = {'minimum': 999_999_999, 'maximum': 0} min_max_date = {'minimum_date': datetime.datetime.min, 'maximum_date': datetime.datetime.max} for d in data: d_datetime = datetime.datetime.fromisoformat(d['time']) # current_day is used to check if this 'if' has triggered for a new bucket and bypass if it has if is_new_bucket(d_datetime, current_bucket_day, bucket): current_bucket_day = d_datetime.day bucket = day_bucket(bucket_size, d_datetime) if fn == -1: # Minimum function new_data.append({'time': min_max_date['minimum_date'], 'value': min_max['minimum']}) elif fn == 1: # Maximum function new_data.append({'time': min_max_date['maximum_date'], 'value': min_max['maximum']}) min_max = {'minimum': 999_999_999, 'maximum': 0} min_max_date = { 'minimum_date': datetime.datetime.min.isoformat(), 'maximum_date': datetime.datetime.max.isoformat() } if d['value'] < min_max['minimum']: min_max['minimum'] = d['value'] min_max_date['minimum_date'] = d_datetime.isoformat() if d['value'] > min_max['maximum']: min_max['maximum'] = d['value'] min_max_date['maximum_date'] = d_datetime.isoformat() return new_data def mean(bucket_size: int, data: list) -> list: new_data = [] first_date = datetime.datetime.fromisoformat(data[0]['time']) current_bucket_day = first_date.day bucket = day_bucket(bucket_size, first_date) mean_bucket = [] bucket_date = first_date for d in data: d_datetime = datetime.datetime.fromisoformat(d['time']) if is_new_bucket(d_datetime, current_bucket_day, bucket): current_bucket_day = d_datetime.day bucket = day_bucket(bucket_size, d_datetime) new_data.append({'time': bucket[0].isoformat(), 'value': int(statistics.mean(mean_bucket))}) mean_bucket = [] mean_bucket.append(d['value']) return new_data def validate_path(split_uri: str) -> bool: return validate_region(split_uri[-2]) and validate_time(split_uri[-1].split('.')[0]) def validate_time(time: str) -> bool: valid_times = ['24h', '48h', '72h', '120h', '168h', '336h', '720h', '30d', '2190h', '90d', '6m', '1y', '2y', 'all'] return time in valid_times def validate_region(region: str) -> bool: valid_regions = ['us', 'eu', 'tw', 'kr'] return region in valid_regions def validate_aggregate(aggregate_function: str) -> bool: valid_aggregates = ['daily_max', 'daily_min', 'daily_mean', 'weekly_max', 'weekly_min', 'weekly_mean'] return aggregate_function in valid_aggregates def lambda_handler(event, context): uri = event['Records'][0]['cf']['request']['uri'] split_uri = uri.split('/') if validate_path(split_uri): time = split_uri[-1].split('.')[0] region = split_uri[-2] aggregate_function = split_uri[-3] data = historical_data(time, region) if validate_aggregate(aggregate_function): data = aggregate_data(aggregate_function, data) response = {'status': '200', 'statusDescription': 'OK', 'headers': {}} response['headers']['content-type'] = [{'key': 'Content-Type', 'value': 'application/json'}] response['body'] = json.dumps(data) return response else: return {'status': '404', 'statusDescription': 'NotFound', 'headers': {}}