Initial implementations
Okay, not really, but I have lost the original git repo, so this is where we're at. qq
commit 8936cccd29
wow-token-current.py (new file, 62 lines)
@@ -0,0 +1,62 @@
import boto3
import datetime
import json
import os

# Rough first pass at an intelligent region selector: map the Lambda's
# execution region to the nearest region the DynamoDB table is replicated to.
dynamo_region_map = {
    'us-west-1': 'us-west-1',
    'us-west-2': 'us-west-2',
    'us-east-1': 'us-east-1',
    'us-east-2': 'us-east-2',
    'ap-south-1': 'eu-north-1',
    'ap-northeast-3': 'ap-northeast-1',
    'ap-northeast-2': 'ap-northeast-1',
    'ap-southeast-1': 'ap-southeast-2',
    'ap-southeast-2': 'ap-southeast-2',
    'ap-northeast-1': 'ap-northeast-1',
    'ca-central-1': 'us-east-1',
    'eu-central-1': 'eu-north-1',
    'eu-west-1': 'eu-north-1',
    'eu-west-2': 'eu-north-1',
    'eu-west-3': 'eu-north-1',
    'eu-north-1': 'eu-north-1',
    'sa-east-1': 'sa-east-1',
    'eu-south-1': 'eu-north-1'
}

# Fall back to eu-north-1 when running in a region that is not in the map.
local_region = dynamo_region_map.get(os.environ['AWS_REGION'], 'eu-north-1')

dynamodb_client = boto3.resource('dynamodb', region_name=local_region)
table = dynamodb_client.Table('wow-token-price')

regions = ['us', 'eu', 'tw', 'kr']
regional_data = {
    'us': {'current_time': 0, 'price': 0},
    'eu': {'current_time': 0, 'price': 0},
    'tw': {'current_time': 0, 'price': 0},
    'kr': {'current_time': 0, 'price': 0}
}

def token_data():
    items = table.scan()['Items']
    data = {
        'current_time': datetime.datetime.utcfromtimestamp(int(items[0]['current_time'])).replace(
            tzinfo=datetime.timezone.utc).isoformat(),
        'price_data': {}
    }
    for item in items:
        # Prices are stored in copper; 10,000 copper = 1 gold.
        data['price_data'][item['region']] = int(int(item['price']) / 10000)
    return data
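
# A hypothetical payload produced by token_data(), assuming one item per
# region in the table (shapes match the code above; values are made up):
# {
#     "current_time": "2021-06-01T12:00:00+00:00",
#     "price_data": {"us": 135000, "eu": 198000, "kr": 310000, "tw": 250000}
# }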

def lambda_handler(event, context):
    data = token_data()
    # Response in the CloudFront (Lambda@Edge) format.
    response = {'status': '200', 'statusDescription': 'OK', 'headers': {}}
    response['headers']['content-type'] = [{'key': 'Content-Type', 'value': 'application/json'}]
    response['body'] = json.dumps(data)
    print('AWS Region: ' + os.environ['AWS_REGION'] + '\tdynamodb_connect_region: ' + local_region)
    return response
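Both handlers return CloudFront-style responses, so they can be smoke-tested locally. A minimal sketch for the handler above, assuming AWS credentials are configured and the wow-token-price table is reachable; the module reads AWS_REGION at import time, and the hyphenated filename needs importlib:

import os
os.environ.setdefault('AWS_REGION', 'us-east-1')  # must be set before the import below

import importlib
current = importlib.import_module('wow-token-current')  # hyphenated module name

# wow-token-current ignores the incoming event entirely.
print(current.lambda_handler({}, None)['body'])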
wow-token-historical.py (new file, 249 lines)
@@ -0,0 +1,249 @@
import boto3
from boto3.dynamodb.conditions import Key
import datetime
import calendar
import json
import os
import statistics

# Rough first pass at an intelligent region selector: map the Lambda's
# execution region to the nearest region the DynamoDB table is replicated to.
dynamo_region_map = {
    'us-west-1': 'us-west-1',
    'us-west-2': 'us-west-2',
    'us-east-1': 'us-east-1',
    'us-east-2': 'us-east-2',
    'ap-south-1': 'eu-north-1',
    'ap-northeast-3': 'ap-northeast-1',
    'ap-northeast-2': 'ap-northeast-1',
    'ap-southeast-1': 'ap-southeast-2',
    'ap-southeast-2': 'ap-southeast-2',
    'ap-northeast-1': 'ap-northeast-1',
    'ca-central-1': 'us-east-1',
    'eu-central-1': 'eu-north-1',
    'eu-west-1': 'eu-west-3',
    'eu-west-2': 'eu-west-3',
    'eu-west-3': 'eu-west-3',
    'eu-north-1': 'eu-north-1',
    'sa-east-1': 'sa-east-1',
    'eu-south-1': 'eu-north-1'
}

# Fall back to eu-west-3 when running in a region that is not in the map.
local_region = dynamo_region_map.get(os.environ['AWS_REGION'], 'eu-west-3')

# Timestream is only queried in us-east-1; DynamoDB follows the region map.
timestream_client = boto3.client('timestream-query', region_name='us-east-1')
dynamodb_client = boto3.resource('dynamodb', region_name=local_region)
table = dynamodb_client.Table('wow-token-price-recent')
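
# For example (hypothetical execution regions):
#   AWS_REGION=eu-central-1 -> reads the eu-north-1 replica
#   AWS_REGION=af-south-1   -> not in the map, falls back to eu-west-3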

def historical_data(time, region):
    # This shim permanently rewrites the '30d' URL to '720h' for local caching.
    # At least one client still requests 30d (strangely, with no .json), even
    # though it was deprecated as the source for one month of data years ago.
    if time == '30d':
        time = '720h'

    # Hour-granularity windows ('24h' ... '720h') are served from DynamoDB;
    # everything longer comes from Timestream.
    if time[-1] == 'h':
        return dynamo_data(time, region)
    else:
        return timestream_data(time, region)
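
# Routing examples (hypothetical calls):
#   historical_data('72h', 'us') -> dynamo_data('72h', 'us')
#   historical_data('30d', 'eu') -> dynamo_data('720h', 'eu')  (via the shim)
#   historical_data('1y', 'kr')  -> timestream_data('1y', 'kr')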

def timestream_data(time, region):
    # Bin width scales with the window so longer ranges return fewer points.
    query_table = {
        '30d': {'interval': "INTERVAL '1' MONTH", 'bin': '15m'},
        '90d': {'interval': "INTERVAL '3' MONTH", 'bin': '30m'},
        '6m': {'interval': "INTERVAL '6' MONTH", 'bin': '1h'},
        '1y': {'interval': "INTERVAL '1' YEAR", 'bin': '2h'},
        '2y': {'interval': "INTERVAL '2' YEAR", 'bin': '2h'},
        'all': {'interval': "INTERVAL '10' YEAR", 'bin': '3h'},
    }
    bin_ = query_table[time]['bin']
    interval = query_table[time]['interval']
    timestream_query = (
        f"WITH binned_query AS ("
        f"SELECT BIN(time, {bin_}) AS binned_time, "
        f"CAST(AVG(measure_value::bigint) AS bigint) AS average "
        f'FROM "wow-token-price-history"."{region}-price-history" '
        f"WHERE measure_name = 'price' "
        f"AND time > now() - ({interval}) "
        f"GROUP BY BIN(time, {bin_}), 'price') "
        f"SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data "
        f"FROM binned_query "
        f"ORDER BY timeseries_data ASC"
    )
    response = timestream_client.query(QueryString=timestream_query)
    rows = response['Rows'][0]['Data'][0]['TimeSeriesValue']
    data = []
    # TODO: Should we inject the aggregation functions into this loop to reduce the cost of looping?
    for row in rows:
        # Timestream timestamps carry nanoseconds; strip them for fromisoformat().
        row_isotime = row['Time'].split('.')[0]
        row_datetime = datetime.datetime.fromisoformat(row_isotime).replace(
            tzinfo=datetime.timezone.utc)
        data.append({
            'time': row_datetime.isoformat(),
            'value': int(int(row['Value']['ScalarValue']) / 10000)
        })
    return data
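
# The Timestream response is deeply nested; the shape consumed above is roughly
# (illustrative values):
#   response['Rows'][0]['Data'][0]['TimeSeriesValue'] == [
#       {'Time': '2021-06-01 12:00:00.000000000', 'Value': {'ScalarValue': '1350000000'}},
#       ...
#   ]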

def dynamo_data(time, region):
    # time arrives as e.g. '72h'; strip the unit and look back that many hours.
    time_stripped = int(time[:-1])
    start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=time_stripped)
    start_time_utc = start_time.replace(tzinfo=datetime.timezone.utc)
    response = table.query(
        KeyConditionExpression=(
            Key('region').eq(region) &
            Key('timestamp').gte(int(start_time_utc.timestamp()))))
    data = []
    for item in response['Items']:
        item_time = datetime.datetime.utcfromtimestamp(int(item['timestamp'])).replace(
            tzinfo=datetime.timezone.utc).isoformat()
        data.append({
            'time': item_time,
            'value': int(int(item['price']) / 10000)
        })
    return data
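
# Items in wow-token-price-recent are assumed to look roughly like this
# (hypothetical values; the boto3 resource API hands numbers back as Decimal):
#   {'region': 'us', 'timestamp': Decimal('1622548800'), 'price': Decimal('1350000000')}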

def aggregate_data(aggregate_function: str, data: list):
    # fn=1 selects the maximum, fn=-1 the minimum; bucket size is in days.
    if aggregate_function == 'daily_max':
        return max_min(1, 1, data)
    elif aggregate_function == 'daily_min':
        return max_min(-1, 1, data)
    elif aggregate_function == 'daily_mean':
        return mean(1, data)
    elif aggregate_function == 'weekly_max':
        return max_min(1, 7, data)
    elif aggregate_function == 'weekly_min':
        return max_min(-1, 7, data)
    elif aggregate_function == 'weekly_mean':
        return mean(7, data)
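
# e.g. aggregate_data('weekly_min', data) buckets the series into reset-aligned
# weeks and keeps the lowest price (and its timestamp) from each bucket.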

def date_in_range(day_range: tuple, date: datetime.datetime):
    # Note: not referenced anywhere else in this file yet.
    if day_range[0] <= date.day < day_range[1]:
        return True
    elif date.day < day_range[1] and date.day < day_range[0]:
        # The range wrapped past the end of the month and `date` falls in the
        # early days of the next month.
        # TODO: I am probably missing a sanity check here, come back to it
        return True
    else:
        return False
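
# Example of the wrap case (hypothetical range): day_range=(28, 3) spans a month
# boundary, so a date on the 1st passes (1 < 3 and 1 < 28) and is in range.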

def day_bucket(bucket_size: int, date: datetime.datetime) -> tuple[datetime.datetime, datetime.datetime]:
    # Days from each weekday (Mon=0 .. Sun=6) until the next Tuesday.
    days_to_reset = {0: 1, 1: 0, 2: 6, 3: 5, 4: 4, 5: 3, 6: 2}
    # We want the bucket boundaries for a bucket size of 7 to fall on the reset
    # day (index 1), and for a month (31) to fall on the actual boundaries of
    # that month; this means bucket sizes vary dynamically month to month.
    # TODO: Monthly boundaries
    if bucket_size == 7 and date.weekday() != 1:
        # This is WoW: the week starts on Tuesday (datetime index 1), so shorten
        # the first bucket to realign on the next reset.
        bucket_size = days_to_reset[date.weekday()]

    return (date, date + datetime.timedelta(days=bucket_size))
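
# e.g. for a hypothetical Friday 2021-06-04, day_bucket(7, date) shortens the
# bucket to (2021-06-04, 2021-06-08) so the next bucket opens on Tuesday's reset.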

def is_new_bucket(d_datetime: datetime.datetime, current_bucket_day: int, bucket: tuple) -> bool:
    # A new bucket starts once the date leaves the current bucket's day and
    # either passes the bucket's end or lands on a Tuesday reset.
    return d_datetime.day != current_bucket_day and (d_datetime >= bucket[1] or d_datetime.weekday() == 1)

def max_min(fn: int, bucket_size: int, data: list) -> list:
    new_data = []
    first_date = datetime.datetime.fromisoformat(data[0]['time'])
    current_bucket_day = first_date.day
    # I hate working with dates
    bucket = day_bucket(bucket_size, first_date)
    min_max = {'minimum': 999_999_999, 'maximum': 0}
    min_max_date = {
        'minimum_date': datetime.datetime.min.isoformat(),
        'maximum_date': datetime.datetime.max.isoformat()
    }

    for d in data:
        d_datetime = datetime.datetime.fromisoformat(d['time'])
        # current_bucket_day tracks whether this 'if' has already fired for the
        # current bucket, so it is bypassed until the next boundary.
        if is_new_bucket(d_datetime, current_bucket_day, bucket):
            current_bucket_day = d_datetime.day
            bucket = day_bucket(bucket_size, d_datetime)
            if fn == -1:  # Minimum function
                new_data.append({'time': min_max_date['minimum_date'], 'value': min_max['minimum']})
            elif fn == 1:  # Maximum function
                new_data.append({'time': min_max_date['maximum_date'], 'value': min_max['maximum']})
            min_max = {'minimum': 999_999_999, 'maximum': 0}
            min_max_date = {
                'minimum_date': datetime.datetime.min.isoformat(),
                'maximum_date': datetime.datetime.max.isoformat()
            }

        if d['value'] < min_max['minimum']:
            min_max['minimum'] = d['value']
            min_max_date['minimum_date'] = d_datetime.isoformat()

        if d['value'] > min_max['maximum']:
            min_max['maximum'] = d['value']
            min_max_date['maximum_date'] = d_datetime.isoformat()

    # Note: the final, still-open bucket is never flushed, so the trailing
    # partial day/week is dropped from the output.
    return new_data
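
# Sketch of the behavior: with fn=1 and bucket_size=1, three same-day points
# valued [100, 140, 120] contribute a single output point with value 140,
# stamped with the time the 140 was observed, emitted once the next day opens.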

def mean(bucket_size: int, data: list) -> list:
    new_data = []
    first_date = datetime.datetime.fromisoformat(data[0]['time'])
    current_bucket_day = first_date.day
    bucket = day_bucket(bucket_size, first_date)
    mean_bucket = []

    for d in data:
        d_datetime = datetime.datetime.fromisoformat(d['time'])
        if is_new_bucket(d_datetime, current_bucket_day, bucket):
            current_bucket_day = d_datetime.day
            # Stamp the averaged point with the opening time of the bucket it
            # summarizes, then start the next bucket.
            new_data.append({'time': bucket[0].isoformat(), 'value': int(statistics.mean(mean_bucket))})
            bucket = day_bucket(bucket_size, d_datetime)
            mean_bucket = []

        mean_bucket.append(d['value'])

    return new_data

def validate_path(split_uri: list) -> bool:
    return validate_region(split_uri[-2]) and validate_time(split_uri[-1].split('.')[0])


def validate_time(time: str) -> bool:
    valid_times = ['24h', '48h', '72h', '120h', '168h', '336h', '720h', '30d', '2190h', '90d', '6m', '1y', '2y', 'all']
    return time in valid_times


def validate_region(region: str) -> bool:
    valid_regions = ['us', 'eu', 'tw', 'kr']
    return region in valid_regions


def validate_aggregate(aggregate_function: str) -> bool:
    valid_aggregates = ['daily_max', 'daily_min', 'daily_mean', 'weekly_max', 'weekly_min', 'weekly_mean']
    return aggregate_function in valid_aggregates
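
# e.g. a request for '/weekly_mean/us/720h.json' splits into
# ['', 'weekly_mean', 'us', '720h.json']: 'us' and '720h' validate, and
# 'weekly_mean' is picked up as the aggregate function in lambda_handler below.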

def lambda_handler(event, context):
    # Lambda@Edge: the requested path arrives in the CloudFront request record.
    uri = event['Records'][0]['cf']['request']['uri']
    split_uri = uri.split('/')
    if validate_path(split_uri):
        time = split_uri[-1].split('.')[0]
        region = split_uri[-2]
        aggregate_function = split_uri[-3]
        data = historical_data(time, region)

        # Aggregation is optional; an unrecognized segment returns the raw series.
        if validate_aggregate(aggregate_function):
            data = aggregate_data(aggregate_function, data)

        response = {'status': '200', 'statusDescription': 'OK', 'headers': {}}
        response['headers']['content-type'] = [{'key': 'Content-Type', 'value': 'application/json'}]
        response['body'] = json.dumps(data)
        return response
    else:
        return {'status': '404', 'statusDescription': 'NotFound', 'headers': {}}
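Likewise, the historical handler can be driven end to end with a hand-rolled CloudFront event. A minimal sketch, assuming credentials plus the wow-token-price-recent table and the wow-token-price-history Timestream database exist; the event carries only the one field the handler reads:

import os
os.environ.setdefault('AWS_REGION', 'us-east-1')  # read at import time by the module

import importlib
historical = importlib.import_module('wow-token-historical')

event = {'Records': [{'cf': {'request': {'uri': '/weekly_max/us/720h.json'}}}]}
response = historical.lambda_handler(event, None)
print(response['status'], response['body'][:80])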