Removing the usage of Timestream

Moving to using a DynamoDB global table with compacted writing
This commit is contained in:
Emily Doherty 2024-10-29 15:13:41 -07:00
parent 5032d2b974
commit 27cd98ee52
3 changed files with 391 additions and 196 deletions

357
wow-token-compactor.py Normal file
View File

@ -0,0 +1,357 @@
import time
from decimal import Decimal
from functools import cache
from typing import Tuple, List, Dict
import boto3
import json
import os
import datetime
from boto3.dynamodb.conditions import Key
# Maps the AWS region this code runs in (AWS_REGION) to the region whose
# DynamoDB endpoint it should use.
# This is a rough first pass at an intelligent region selector based on what is replicated
dynamo_region_map = {
    'us-west-1': 'us-west-1',
    'us-west-2': 'us-west-2',
    'us-east-1': 'us-east-1',
    'us-east-2': 'us-east-2',
    'ap-south-1': 'eu-north-1',
    'ap-northeast-3': 'ap-northeast-1',
    'ap-northeast-2': 'ap-northeast-1',
    'ap-southeast-1': 'ap-southeast-1',
    'ap-southeast-2': 'ap-southeast-2',
    'ap-northeast-1': 'ap-northeast-1',
    'ca-central-1': 'us-east-1',
    'eu-central-1': 'eu-north-1',
    'eu-west-1': 'eu-west-1',
    'eu-west-2': 'eu-west-1',
    'eu-west-3': 'eu-west-3',
    'eu-north-1': 'eu-north-1',
    'sa-east-1': 'sa-east-1',
    'eu-south-1': 'eu-north-1',
}

# Regions missing from the map fall back to 'eu-central-1'. A missing
# AWS_REGION environment variable still raises KeyError at import time.
# NOTE(review): the map itself redirects 'eu-central-1' to 'eu-north-1', so
# this fallback may name a region without a replica -- confirm.
local_dynamo_region = dynamo_region_map.get(os.environ['AWS_REGION'], 'eu-central-1')

# Shared boto3 DynamoDB resource used by everything in this module.
DYNAMO_CLIENT = boto3.resource('dynamodb', region_name=local_dynamo_region)
class PriceHistory:
region: str
flavor: str
_prices: Dict[str, List[Tuple[int, int]]]
def __init__(self, region: str, flavor: str):
self.region = region
self.flavor = flavor
self._prices = dict()
def _retrieve_compacted_prices_from_dynamo(self, year, month) -> None:
table = DYNAMO_CLIENT.Table('wow-token-compacted')
pk = f'{self.region}-{self.flavor}-{year}-{month}'
data = []
response = table.query(
KeyConditionExpression=Key('region-flavor-timestamp').eq(pk)
)
if response['Items']:
self._prices[f'{year}-{month}'] = []
for timestamp, price in response['Items'][0]['data'].items():
data.append((int(timestamp), int(price)))
self._prices[f'{year}-{month}'] = sorted(data, key=lambda x: x[0])
def _retrieve_time_bin(self, start_time: datetime.datetime, end_time: datetime.datetime) -> List[Tuple[int, int]]:
scan_data = self.get_month_prices(start_time.month, start_time.year)
if end_time.year != start_time.year or end_time.month != start_time.month:
scan_data += self.get_month_prices(end_time.month, end_time.year) + scan_data
high_tuple = (0,0)
low_tuple = (0,0)
for item in scan_data:
if start_time.timestamp() <= item[0] < end_time.timestamp():
if item[1] > high_tuple[1]:
high_tuple = item
if item[1] < low_tuple[1] or low_tuple[0] == 0:
low_tuple = item
if high_tuple[0] == 0 or low_tuple[0] == 0:
return []
else:
if high_tuple[0] == low_tuple[0]:
return [high_tuple]
elif low_tuple[0] > high_tuple[0]:
return [high_tuple, low_tuple]
else:
return [low_tuple, high_tuple]
def request_time_to_datetime_pair(self, time_str: str) -> Tuple[datetime.datetime, datetime.datetime]:
end_time = datetime.datetime.now(datetime.timezone.utc)
if time_str == 'all':
if self.flavor == 'retail':
start_time = datetime.datetime.fromisoformat('2020-11-15 00:00:01.000000000+00:00')
else:
start_time = datetime.datetime.fromisoformat('2023-05-23 00:00:01.000000000+00:00')
elif time_str[-1] == 'd':
days = int(time_str[:-1])
start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
elif time_str[-1] == 'm':
months = int(time_str[:-1])
start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=int(30.437*months))
elif time_str[-1] == 'y':
years = int(time_str[:-1])
start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=int(365.25*years))
else:
raise ValueError
return start_time, end_time
def binned_time(self, start_time: datetime.datetime, end_time: datetime.datetime) -> List[Tuple[int, int]]:
time_delta = end_time - start_time
hours = time_delta.days * 24 + time_delta.seconds // 3600
if hours > 8800: # Above a year
bin_size = 12
elif hours > 4400: # Above 6 months
bin_size = 6
elif hours > 2112: # 3 months
bin_size = 2
else:
bin_size = 1
_bin_start = start_time
_bin_end = _bin_start + datetime.timedelta(hours=bin_size, seconds=-1)
data = []
while _bin_start < end_time:
data += self._retrieve_time_bin(_bin_start, _bin_end)
_bin_start = _bin_end + datetime.timedelta(hours=1)
_bin_end = _bin_start + datetime.timedelta(hours=bin_size, seconds=-1)
return data
def get_month_prices(self, month: int|str, year: int|str) -> List[Tuple[int, int]]:
if isinstance(month, int):
month = str(month)
if isinstance(year, int):
year = str(year)
if f'{year}-{month}' not in self._prices:
self._retrieve_compacted_prices_from_dynamo(year, month)
return self._prices[f'{year}-{month}']
def retrieve_binned_prices(self, time_str: str) -> List[Tuple[int, int]]:
table = DYNAMO_CLIENT.Table('wow-token-compacted')
pk = f'{self.region}-{self.flavor}-{time_str}'
response = table.get_item(
Key={
'region-flavor-timestamp': pk
}
)
if 'Item' not in response:
return []
data = []
for _time, _price in response['Item']['data'].items():
data.append((int(_time), int(_price)))
return sorted(data)
def write_binned_if_updated(self, time_str: str) -> bool:
current_binned_prices = self.retrieve_binned_prices(time_str)
start, end = self.request_time_to_datetime_pair(time_str)
binned_data = sorted(self.binned_time(start, end))
if all(item in current_binned_prices for item in binned_data) and all(item in binned_data for item in current_binned_prices):
print(f"{time_str} No update needed")
return False
else:
self.write_binned_prices(time_str, binned_data)
return True
def write_binned_prices(self, time_str: str, binned_data: List[Tuple[int, int]] = None) -> None:
print(f'Writing compacted bin data for {self.region}-{self.flavor}-{time_str}')
table = DYNAMO_CLIENT.Table('wow-token-compacted')
pk = f'{self.region}-{self.flavor}-{time_str}'
dynamo_data: Dict[str, Dict[str, str]] = {}
for item in binned_data:
dynamo_data[str(item[0])] = item[1]
response = table.put_item(
Item={
'region-flavor-timestamp': pk,
'data': dynamo_data
}
)
@cache
def retrieve_prices_from_s3(region: str, flavor: str) -> List[Tuple[datetime.datetime, int]]:
    """Load the full price history of a region/flavor from S3.

    Reads ``<flavor>-<region>-price-history.json`` from the bucket named by
    the ``S3_BUCKET`` environment variable. In every row, element 2 carries
    the timestamp (ISO text without offset, read as UTC) and element 3 the
    raw price, which is divided by 10000.

    The result is memoised per ``(region, flavor)`` by ``functools.cache``,
    so callers share one list object and must not mutate it.
    """
    client = boto3.client('s3')
    s3_object = client.get_object(
        Bucket=os.environ['S3_BUCKET'],
        Key=f"{flavor}-{region}-price-history.json",
    )
    rows = json.loads(s3_object['Body'].read())
    return [
        (
            datetime.datetime.fromisoformat(f"{row[2]['ScalarValue']}+00:00"),
            int(int(row[3]['ScalarValue']) / 10000),
        )
        for row in rows
    ]
def retrieve_recent_prices(region: str, flavor: str) -> List[Tuple[datetime.datetime, int]]:
    """Return the price changes of the last 95 days from the "recent" table.

    Consecutive records with the same price are collapsed, so each entry of
    the result marks a change. Raw prices are divided by 10000 and timestamps
    are returned as UTC datetimes.

    A single DynamoDB Query response is size-limited, so the query is
    repeated with ``ExclusiveStartKey`` until ``LastEvaluatedKey`` is no
    longer returned; otherwise later records could be silently dropped.
    """
    start_time = datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=95)
    if flavor == 'retail':
        table = DYNAMO_CLIENT.Table('wow-token-price-recent')
    else:
        table = DYNAMO_CLIENT.Table('wow-token-classic-price-recent')

    query_kwargs = {
        'KeyConditionExpression': (
            Key('region').eq(region) &
            Key('timestamp').gte(int(start_time.timestamp()))
        )
    }
    data = []
    last_price = 0
    while True:
        response = table.query(**query_kwargs)
        for item in response['Items']:
            price = int(int(item['price']) / 10000)
            if last_price != price:
                last_price = price
                item_time = datetime.datetime.fromtimestamp(
                    int(item['timestamp']), tz=datetime.timezone.utc)
                data.append((item_time, price))
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        # Continue the same query from where this page stopped.
        query_kwargs['ExclusiveStartKey'] = last_key
    return data
def write_compacted_month(region: str, flavor: str, year: int, month: int, data: List[Tuple[datetime.datetime, int]])-> None:
    """Store one month of price points as a single compacted DynamoDB item.

    The item key is ``<region>-<flavor>-<year>-<month>``; its ``data``
    attribute maps the epoch-second timestamp (as a string) to the price.
    Any existing item with the same key is replaced.
    """
    payload = {
        str(int(moment.timestamp())): price
        for moment, price in data
    }
    compacted_table = DYNAMO_CLIENT.Table('wow-token-compacted')
    compacted_table.put_item(
        Item={
            'region-flavor-timestamp': f'{region}-{flavor}-{year}-{month}',
            'data': payload,
        }
    )
def write_compacted_month_if_updated(region: str, flavor: str, year: int, month: int, data: List[Tuple[datetime.datetime, int]]) -> bool:
    """Write the month's compacted item only when its content would change.

    The stored ``timestamp -> price`` pairs are compared with the pairs
    derived from ``data``. Returns ``True`` when a write was issued and
    ``False`` when the stored item already matched.
    """
    # Sets give the same "each side contains the other" comparison as the
    # nested membership scans, without the quadratic cost.
    # NOTE(review): assumes stored prices are hashable numbers (Decimal) -- confirm.
    current_compacted_data = set(retrieve_compacted_prices(region, flavor, year, month).items())
    calculated_data = {
        (str(int(moment.timestamp())), Decimal(price))
        for moment, price in data
    }
    if calculated_data == current_compacted_data:
        print(f"{region} {flavor} {year} {month} No compaction update needed")
        return False
    print(f"{region} {flavor} {year} {month} Compaction needed")
    write_compacted_month(region, flavor, year, month, data)
    return True
def retrieve_compacted_prices(region, flavor, year, month) -> dict:
    """Return the stored ``data`` map of a compacted month, or ``{}`` if none.

    Queries ``wow-token-compacted`` for the key
    ``<region>-<flavor>-<year>-<month>`` and uses the first matching item.
    """
    partition_key = f'{region}-{flavor}-{year}-{month}'
    result = DYNAMO_CLIENT.Table('wow-token-compacted').query(
        KeyConditionExpression=Key('region-flavor-timestamp').eq(partition_key)
    )
    matches = result['Items']
    return matches[0]['data'] if matches else {}
def _is_recent_month(current_time: datetime.datetime, given_time: datetime.datetime) -> bool:
difference = abs(current_time - given_time)
return difference.days <= 93
def _is_current_month(current_time: datetime.datetime, given_time: datetime.datetime) -> bool:
return current_time.month == given_time.month and current_time.year == given_time.year
def monthly_compact(region, flavor, year, month, current_time) -> None:
    """Compact one calendar month of prices into its DynamoDB item.

    Source selection:
      * a recent month (per ``_is_recent_month`` against the first of the
        month) is rebuilt from the "recent" DynamoDB table;
      * an older month that has no compacted item yet is built from S3;
      * an older month that is already compacted is left alone.
    """
    already_compacted = retrieve_compacted_prices(region, flavor, year, month)
    month_start = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc)

    if _is_recent_month(current_time, month_start):
        prices = retrieve_recent_prices(region, flavor)
    elif not already_compacted:
        prices = retrieve_prices_from_s3(region, flavor)
    else:
        return

    # Keep only the points that belong to the requested month.
    month_prices = [
        point for point in prices
        if point[0].month == month and point[0].year == year
    ]
    if not month_prices:
        return
    print(f'Writing compacted data for {region} {flavor} {year} {month}')
    write_compacted_month_if_updated(region, flavor, year, month, month_prices)
def compactor(region, flavor, compact_all: bool = False):
    """Run ``monthly_compact`` for every month that may need (re)compaction.

    With ``compact_all`` the walk starts at the flavor's fixed first month
    and that first month is compacted too. Otherwise it starts 62 days ago
    and only the months after the starting month, up to and including the
    current one, are compacted. A one-second pause precedes each of those
    later months.
    """
    _current_time = datetime.datetime.now(tz=datetime.UTC)
    if compact_all:
        if flavor == 'retail':
            start_date = datetime.datetime.fromisoformat('2020-11-15 00:00:01.000000000+00:00')
        else:
            start_date = datetime.datetime.fromisoformat('2023-05-23 00:00:01.000000000+00:00')
    else:
        start_date = _current_time - datetime.timedelta(days=62)

    _year = start_date.year
    _month = start_date.month
    if compact_all:
        monthly_compact(region, flavor, _year, _month, _current_time)

    # Step month by month rather than day by day; the visited months are the same.
    while (_year, _month) < (_current_time.year, _current_time.month):
        _month += 1
        if _month > 12:
            _month = 1
            _year += 1
        time.sleep(1)
        monthly_compact(region, flavor, _year, _month, _current_time)
def lambda_handler(event, context):
    """Lambda entry point: compact and re-bin every region/flavor pair.

    For each pair the monthly compaction runs first, then every binned range
    is refreshed. The '2y' range is skipped for the classic flavor.
    ``event`` and ``context`` are not used.
    """
    for region in ('us', 'eu', 'tw', 'kr'):
        for flavor in ('retail', 'classic'):
            history = PriceHistory(region, flavor)
            compactor(region, flavor)
            for t in ('30d', '90d', '6m', '1y', '2y', 'all'):
                if flavor == 'classic' and t == '2y':
                    continue
                history.write_binned_if_updated(t)
def main():
    """Manual entry point: run only the monthly compaction for every pair.

    Unlike ``lambda_handler`` the binned ranges are not rewritten here; the
    ``history.write_binned_if_updated(t)`` loop is deliberately left out.
    """
    # Handy one-off checks while debugging:
    #   retrieve_prices_from_s3('us', 'retail')
    #   retrieve_recent_prices('us', 'retail')
    #   retrieve_compacted_prices('us', 'retail', '2024', '1')
    for region in ('us', 'eu', 'tw', 'kr'):
        for flavor in ('retail', 'classic'):
            compactor(region, flavor)
            # Constructed but currently unused (see docstring).
            history = PriceHistory(region, flavor)


if __name__ == '__main__':
    main()

View File

@ -1,4 +1,5 @@
import sys
from typing import List, Dict
import boto3
from boto3.dynamodb.conditions import Key
@ -9,27 +10,6 @@ import json
import os
import statistics
timestream_region_map = {
'us-west-1': 'us-west-2',
'us-west-2': 'us-west-2',
'us-east-1': 'us-east-1',
'us-east-2': 'us-east-2',
'ap-south-1': 'eu-west-1',
'ap-northeast-3': 'ap-northeast-1',
'ap-northeast-2': 'ap-northeast-1',
'ap-southeast-1': 'ap-southeast-2',
'ap-southeast-2': 'ap-southeast-2',
'ap-northeast-1': 'ap-northeast-1',
'ca-central-1': 'us-east-1',
'eu-central-1': 'eu-central-1',
'eu-west-1': 'eu-west-1',
'eu-west-2': 'eu-west-1',
'eu-west-3': 'eu-central-1',
'eu-north-1': 'eu-central-1',
'sa-east-1': 'us-east-1',
'eu-south-1': 'eu-west-1'
}
dynamo_region_map = {
'us-west-1': 'us-west-1',
'us-west-2': 'us-west-2',
@ -53,23 +33,24 @@ dynamo_region_map = {
local_region = ''
if os.environ['AWS_REGION'] in dynamo_region_map:
local_dynamo_region = dynamo_region_map[os.environ['AWS_REGION']]
local_timestream_region = timestream_region_map[os.environ['AWS_REGION']]
else:
local_dynamo_region = 'eu-central-1'
local_timestream_region = 'eu-central-1'
timestream_client = boto3.client('timestream-query', region_name=local_timestream_region)
timestream_client = boto3.client('timestream-query', region_name='us-east-1')
dynamodb_client = boto3.resource('dynamodb', region_name=local_dynamo_region)
tables = {
'retail': {
'recent': 'wow-token-price-recent',
'current': 'wow-token-price',
'compacted': 'wow-token-compacted',
'timestream': 'wow-token-price-history'
},
'classic': {
'recent': 'wow-token-classic-price-recent',
'current': 'wow-token-classic-price',
'compacted': 'wow-token-compacted',
'timestream': 'wow-token-classic-price-history'
}
}
@ -85,67 +66,33 @@ def historical_data(time, region, version):
if time[-1] == 'h':
return dynamo_data(time, region, version)
else:
return timestream_data(time, region, version)
return dynamo_compacted(time, region, version)
def timestream_data(time, region, version):
def __interval_bin(__time: str) -> dict:
__time = __time.lower()
def __div_floor(divisor: int, floor: int) -> int:
res = int(int(__time[:-1]) / divisor)
if res < floor:
return floor
return res
if __time == 'all':
return {'interval': "INTERVAL '10' YEAR", 'bin': '3h'}
if __time[-1] == 'd':
# I don't believe fstrings would work here
return {
'interval': "INTERVAL '" + __time[:-1] + "' DAY",
'bin': str(__div_floor(3, 1)) + 'm'
}
if __time[-1] == 'm':
return {
'interval': "INTERVAL '" + __time[:-1] + "' MONTH",
'bin': str(__div_floor(6, 1)) + 'h'
}
if __time[-1] == 'y':
return {
'interval': "INTERVAL '" + __time[:-1] + "' YEAR",
'bin': str(__div_floor(6, 1)) + 'h'
}
print(f"Function region: {os.environ['AWS_REGION']}\t Timestream Region: {local_region}")
timestream_query = (f"WITH binned_query AS ("
f" SELECT BIN(time, {__interval_bin(time)['bin']}) AS binned_time,"
f" CAST(AVG(measure_value::bigint) AS bigint) as average "
f"FROM \"{tables[version]['timestream']}\".\"{region}-price-history\" "
f"WHERE measure_name = 'price' "
f" AND time > now() - ({__interval_bin(time)['interval']}) "
f"GROUP BY BIN(time, {__interval_bin(time)['bin']}), 'price') "
f"SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data "
f"FROM binned_query ORDER BY timeseries_data ASC")
response = timestream_client.query(QueryString=timestream_query)
rows = response['Rows'][0]['Data'][0]['TimeSeriesValue']
def _get_dynamo_compacted(time: str, region: str, version: str) -> List[Dict[str, int|str]]:
table = dynamodb_client.Table(tables[version]['compacted'])
pk = f'{region}-{version}-{time}'
response = table.query(
KeyConditionExpression=(
Key('region-flavor-timestamp').eq(pk)
)
)
response_data = sorted(response['Items'][0]['data'].items())
data = []
# TODO: Should we inject the aggregation functions into this loop to reduce the cost of looping?
for row in rows:
row_isotime = row['Time'].split('.')[0]
row_datetime = datetime.datetime.fromisoformat(row_isotime).replace(
tzinfo=datetime.timezone.utc)
for item in response_data:
data.append({
'time': row_datetime.isoformat(),
'value': int(int(row['Value']['ScalarValue']) / 10000)
'time': datetime.datetime.fromtimestamp(
int(item[0]),
tz=datetime.UTC).isoformat(),
'value': int(item[1])
})
return data
def dynamo_compacted(time: str, region: str, version: str) -> List[Dict[str, int|str]]:
    """Return the compacted price series for a time range.

    Thin wrapper that forwards all arguments unchanged to
    ``_get_dynamo_compacted`` and returns its result.
    """
    return _get_dynamo_compacted(time, region, version)
def dynamo_data(time, region, version):
print(f"Function region: {os.environ['AWS_REGION']}\t Dynamo Region: {local_region}")
time_stripped = int(time[:-1])
@ -387,3 +334,12 @@ def lambda_handler(event, context):
return response
else:
return {'status': '404', 'statusDescription': 'NotFound', 'headers': {}}
def main():
    """Manual-run entry point; currently a no-op."""
    pass
    # Example local check, left disabled:
    #data = dynamo_compacted('1y', 'us', 'retail')
    #print(data)
if __name__ == '__main__':
    main()

View File

@ -9,7 +9,6 @@ import requests
local_region = os.environ['AWS_REGION']
dynamo_client = boto3.client('dynamodb', region_name=local_region)
timestream_client = boto3.client('timestream-write', region_name=local_region)
tables = {
'retail': {
'recent': 'wow-token-price-recent',
@ -47,23 +46,13 @@ def flavor_handler(flavor: str, regions: list, timestamp: int) -> None:
def update_token_price(flavor: str, region: str, current_time_epoch: int, live_price: int) -> None:
stored_price = get_stored_price(flavor, region)
regional_item = get_regional_update_item(flavor, region)
print(f'Current live price {live_price}')
print(f'Current stored price {stored_price}')
regional_price = int(regional_item['price']['S'])
print(f'Current regional price {regional_price}')
if stored_price != live_price:
# If the stored price is not the same as the live price, nor the regional price
# assume no other Lambda has updated it and update it
print(f"Stored price is differing from the live price, updating all databases")
# update the stored price and the recent price
print(f"Stored price differs from the live price, updating databases")
update_stored_token_price(flavor, region, live_price, current_time_epoch)
update_recent_token_price(flavor, region, live_price, current_time_epoch)
update_regional_price(flavor, region, live_price, current_time_epoch)
update_timestream_token_price(flavor, region, live_price, current_time_epoch)
if (stored_price == live_price) and (regional_price != stored_price):
print(f"Stored price differs from regional price but not live price, updating regional databases")
update_regional_price(flavor, region, live_price, current_time_epoch)
update_timestream_token_price(flavor, region, live_price, current_time_epoch)
else:
print(f"Price hasn't changed for {flavor} {region.upper()}")
@ -90,55 +79,6 @@ def update_stored_token_price(flavor: str, region: str, price: int, current_time
)
print(f'Updated {flavor} {region.upper()} price to {price}')
def update_regional_price(flavor: str, region: str, price: int, current_time_epoch: int) -> None:
    """Set the price and update time on the item in ``wow-token-regional``.

    The item key is the bare region for retail and ``<flavor>-<region>`` for
    any other flavor. Both values are stored as DynamoDB strings.
    """
    # NOTE(review): this writes attribute 'current_time' while
    # create_regional_item seeds 'timestamp' -- confirm which is intended.
    item_key = region if flavor == 'retail' else f"{flavor}-{region}"
    new_values = {
        ':p': {'S': str(price)},
        ':t': {'S': str(current_time_epoch)},
    }
    dynamo_client.update_item(
        TableName='wow-token-regional',
        Key={'region': {'S': item_key}},
        UpdateExpression='SET price = :p, current_time = :t',
        ExpressionAttributeValues=new_values,
    )
    print(f'Updated regional {flavor} {region.upper()} price to {price}')
def create_regional_item(flavor: str, region: str) -> None:
    """Insert a placeholder item for a flavor/region in ``wow-token-regional``.

    The placeholder carries price ``'1'`` (string) and timestamp ``1``
    (number). The key is the bare region for retail and
    ``<flavor>-<region>`` otherwise.
    """
    print(f"Creating default regional item in {flavor} {region}")
    item_key = region if flavor == 'retail' else f"{flavor}-{region}"
    placeholder = {
        'region': {'S': item_key},
        'price': {'S': str(1)},
        'timestamp': {'N': str(1)},
    }
    dynamo_client.put_item(TableName='wow-token-regional', Item=placeholder)
# add a record to the recent token price table in DynamoDB
def update_recent_token_price(flavor: str, region: str, price: int, current_time_epoch: int) -> None:
dynamo_client.put_item(
@ -161,43 +101,6 @@ def update_recent_token_price(flavor: str, region: str, price: int, current_time
)
print(f'Added {region.upper()} price {price} to {tables[flavor]["recent"]} table')
def update_timestream_token_price(flavor: str, region: str, price: int, current_time_epoch: int) -> None:
    """Write one price record to Timestream, retrying until it succeeds.

    Any exception from the write is printed and followed by a two-second
    pause before the next attempt; there is no upper bound on attempts.
    """
    while True:
        try:
            print('Attempting to write to Timestream')
            timestream_client.write_records(
                DatabaseName=tables[flavor]['timestream'],
                TableName=f'{region}-price-history',
                Records=[build_timestream_record(region, price, current_time_epoch)],
            )
        except Exception as e:
            print(f'Error writing to Timestream: {e}')
            time.sleep(2)
        else:
            break
    print(f'Updated {flavor} {region.upper()} price to {price} in Timestream')
def build_timestream_record(region: str, price: int, current_time_epoch: int) -> dict:
    """Build the record dict passed to the Timestream ``write_records`` call.

    The region becomes the only dimension, the price a BIGINT measure named
    ``price``, and the time is given in seconds. Price and time are
    stringified.
    """
    region_dimension = {
        'Name': 'region',
        'Value': region,
        'DimensionValueType': 'VARCHAR',
    }
    record = {'Dimensions': [region_dimension]}
    record['MeasureName'] = 'price'
    record['MeasureValue'] = str(price)
    record['MeasureValueType'] = 'BIGINT'
    record['Time'] = str(current_time_epoch)
    record['TimeUnit'] = 'SECONDS'
    return record
# get the current stored token price from dynamodb
def get_stored_price(flavor: str, region: str) -> int:
response = dynamo_client.get_item(
@ -211,27 +114,6 @@ def get_stored_price(flavor: str, region: str) -> int:
return int(response['Item']['price']['S'])
def get_regional_update_item(flavor: str, region: str) -> dict:
    """Fetch the flavor/region item from ``wow-token-regional``.

    When the item is missing, a default one is created, the call waits five
    seconds and the lookup is tried again; this repeats until an item is
    returned. The key is the bare region for retail and
    ``<flavor>-<region>`` otherwise.
    """
    while True:
        item_key = region if flavor == 'retail' else f"{flavor}-{region}"
        lookup = dynamo_client.get_item(
            TableName='wow-token-regional',
            Key={'region': {'S': item_key}},
        )
        if 'Item' in lookup:
            return lookup['Item']
        create_regional_item(flavor, region)
        time.sleep(5)
def get_combined_live_price(game_flavor: str) -> dict:
if game_flavor == 'retail':
namespace = 'dynamic'