diff --git a/wow-token-compactor.py b/wow-token-compactor.py new file mode 100644 index 0000000..6851a6d --- /dev/null +++ b/wow-token-compactor.py @@ -0,0 +1,357 @@ +import time +from decimal import Decimal +from functools import cache +from typing import Tuple, List, Dict + +import boto3 +import json +import os +import datetime + +from boto3.dynamodb.conditions import Key + +dynamo_region_map = { + 'us-west-1': 'us-west-1', + 'us-west-2': 'us-west-2', + 'us-east-1': 'us-east-1', + 'us-east-2': 'us-east-2', + 'ap-south-1': 'eu-north-1', + 'ap-northeast-3': 'ap-northeast-1', + 'ap-northeast-2': 'ap-northeast-1', + 'ap-southeast-1': 'ap-southeast-1', + 'ap-southeast-2': 'ap-southeast-2', + 'ap-northeast-1': 'ap-northeast-1', + 'ca-central-1': 'us-east-1', + 'eu-central-1': 'eu-north-1', + 'eu-west-1': 'eu-west-1', + 'eu-west-2': 'eu-west-1', + 'eu-west-3': 'eu-west-3', + 'eu-north-1': 'eu-north-1', + 'sa-east-1': 'sa-east-1', + 'eu-south-1': 'eu-north-1' +} # This is a rough first pass at an intelligent region selector based on what is replicated +if os.environ['AWS_REGION'] in dynamo_region_map: + local_dynamo_region = dynamo_region_map[os.environ['AWS_REGION']] +else: + local_dynamo_region = 'eu-central-1' + +DYNAMO_CLIENT = boto3.resource('dynamodb', region_name=local_dynamo_region) + + + +class PriceHistory: + region: str + flavor: str + _prices: Dict[str, List[Tuple[int, int]]] + + def __init__(self, region: str, flavor: str): + self.region = region + self.flavor = flavor + self._prices = dict() + + def _retrieve_compacted_prices_from_dynamo(self, year, month) -> None: + table = DYNAMO_CLIENT.Table('wow-token-compacted') + pk = f'{self.region}-{self.flavor}-{year}-{month}' + data = [] + response = table.query( + KeyConditionExpression=Key('region-flavor-timestamp').eq(pk) + ) + if response['Items']: + self._prices[f'{year}-{month}'] = [] + for timestamp, price in response['Items'][0]['data'].items(): + data.append((int(timestamp), int(price))) + self._prices[f'{year}-{month}'] = sorted(data, key=lambda x: x[0]) + + + def _retrieve_time_bin(self, start_time: datetime.datetime, end_time: datetime.datetime) -> List[Tuple[int, int]]: + scan_data = self.get_month_prices(start_time.month, start_time.year) + if end_time.year != start_time.year or end_time.month != start_time.month: + scan_data += self.get_month_prices(end_time.month, end_time.year) + scan_data + + high_tuple = (0,0) + low_tuple = (0,0) + for item in scan_data: + if start_time.timestamp() <= item[0] < end_time.timestamp(): + if item[1] > high_tuple[1]: + high_tuple = item + if item[1] < low_tuple[1] or low_tuple[0] == 0: + low_tuple = item + if high_tuple[0] == 0 or low_tuple[0] == 0: + return [] + else: + if high_tuple[0] == low_tuple[0]: + return [high_tuple] + elif low_tuple[0] > high_tuple[0]: + return [high_tuple, low_tuple] + else: + return [low_tuple, high_tuple] + + + def request_time_to_datetime_pair(self, time_str: str) -> Tuple[datetime.datetime, datetime.datetime]: + end_time = datetime.datetime.now(datetime.timezone.utc) + if time_str == 'all': + if self.flavor == 'retail': + start_time = datetime.datetime.fromisoformat('2020-11-15 00:00:01.000000000+00:00') + else: + start_time = datetime.datetime.fromisoformat('2023-05-23 00:00:01.000000000+00:00') + elif time_str[-1] == 'd': + days = int(time_str[:-1]) + start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days) + elif time_str[-1] == 'm': + months = int(time_str[:-1]) + start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=int(30.437*months)) + elif time_str[-1] == 'y': + years = int(time_str[:-1]) + start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=int(365.25*years)) + else: + raise ValueError + return start_time, end_time + + + def binned_time(self, start_time: datetime.datetime, end_time: datetime.datetime) -> List[Tuple[int, int]]: + time_delta = end_time - start_time + hours = time_delta.days * 24 + time_delta.seconds // 3600 + if hours > 8800: # Above a year + bin_size = 12 + elif hours > 4400: # Above 6 months + bin_size = 6 + elif hours > 2112: # 3 months + bin_size = 2 + else: + bin_size = 1 + + _bin_start = start_time + _bin_end = _bin_start + datetime.timedelta(hours=bin_size, seconds=-1) + data = [] + + while _bin_start < end_time: + data += self._retrieve_time_bin(_bin_start, _bin_end) + _bin_start = _bin_end + datetime.timedelta(hours=1) + _bin_end = _bin_start + datetime.timedelta(hours=bin_size, seconds=-1) + + return data + + + def get_month_prices(self, month: int|str, year: int|str) -> List[Tuple[int, int]]: + if isinstance(month, int): + month = str(month) + + if isinstance(year, int): + year = str(year) + + if f'{year}-{month}' not in self._prices: + self._retrieve_compacted_prices_from_dynamo(year, month) + return self._prices[f'{year}-{month}'] + + + def retrieve_binned_prices(self, time_str: str) -> List[Tuple[int, int]]: + table = DYNAMO_CLIENT.Table('wow-token-compacted') + pk = f'{self.region}-{self.flavor}-{time_str}' + response = table.get_item( + Key={ + 'region-flavor-timestamp': pk + } + ) + if 'Item' not in response: + return [] + data = [] + for _time, _price in response['Item']['data'].items(): + data.append((int(_time), int(_price))) + return sorted(data) + + + def write_binned_if_updated(self, time_str: str) -> bool: + current_binned_prices = self.retrieve_binned_prices(time_str) + start, end = self.request_time_to_datetime_pair(time_str) + binned_data = sorted(self.binned_time(start, end)) + if all(item in current_binned_prices for item in binned_data) and all(item in binned_data for item in current_binned_prices): + print(f"{time_str} No update needed") + return False + else: + self.write_binned_prices(time_str, binned_data) + return True + + + def write_binned_prices(self, time_str: str, binned_data: List[Tuple[int, int]] = None) -> None: + print(f'Writing compacted bin data for {self.region}-{self.flavor}-{time_str}') + table = DYNAMO_CLIENT.Table('wow-token-compacted') + pk = f'{self.region}-{self.flavor}-{time_str}' + dynamo_data: Dict[str, Dict[str, str]] = {} + for item in binned_data: + dynamo_data[str(item[0])] = item[1] + + response = table.put_item( + Item={ + 'region-flavor-timestamp': pk, + 'data': dynamo_data + } + ) + + +@cache +def retrieve_prices_from_s3(region: str, flavor: str) -> List[Tuple[datetime.datetime, int]]: + s3 = boto3.client('s3') + key = f"{flavor}-{region}-price-history.json" + obj = s3.get_object( + Bucket=os.environ['S3_BUCKET'], + Key=key + ) + raw_data = json.loads(obj['Body'].read()) + data = [] + for datum in raw_data: + date = datetime.datetime.fromisoformat(f"{datum[2]['ScalarValue']}+00:00") + price = int(int(datum[3]['ScalarValue']) / 10000) + data.append((date, price)) + return data + + +def retrieve_recent_prices(region: str, flavor: str) -> List[Tuple[datetime.datetime, int]]: + start_time = (datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=95)) + if flavor == 'retail': + table = DYNAMO_CLIENT.Table('wow-token-price-recent') + else: + table = DYNAMO_CLIENT.Table('wow-token-classic-price-recent') + response = table.query( + KeyConditionExpression=( + Key('region').eq(region) & + Key('timestamp').gte(int(start_time.timestamp())))) + data = [] + last_price = 0 + for item in response['Items']: + price = int(int(item['price']) / 10000) + if last_price != price: + last_price = price + item_time = datetime.datetime.fromtimestamp(int(item['timestamp']), tz=datetime.timezone.utc) + data.append((item_time, price)) + return data + + +def write_compacted_month(region: str, flavor: str, year: int, month: int, data: List[Tuple[datetime.datetime, int]])-> None: + table = DYNAMO_CLIENT.Table('wow-token-compacted') + pk = f'{region}-{flavor}-{year}-{month}' + dynamo_data: Dict[str, Dict[str, str]] = {} + for item in data: + dynamo_data[str(int(item[0].timestamp()))] = item[1] + + response = table.put_item( + Item={ + 'region-flavor-timestamp': pk, + 'data': dynamo_data + } + ) + +def write_compacted_month_if_updated(region: str, flavor: str, year: int, month: int, data: List[Tuple[datetime.datetime, int]]) -> bool: + current_compacted_data = sorted(retrieve_compacted_prices(region, flavor, year, month).items()) + calculated_data: List[Tuple[str, Decimal]]= [] + for item in data: + calculated_data.append((str(int(item[0].timestamp())), Decimal(item[1]))) + + if all(item in current_compacted_data for item in calculated_data) and all(item in calculated_data for item in current_compacted_data): + print(f"{region} {flavor} {year} {month} No compaction update needed") + return False + else: + print(f"{region} {flavor} {year} {month} Compaction needed") + write_compacted_month(region, flavor, year, month, data) + return True + + +def retrieve_compacted_prices(region, flavor, year, month) -> dict: + table = DYNAMO_CLIENT.Table('wow-token-compacted') + pk = f'{region}-{flavor}-{year}-{month}' + response = table.query( + KeyConditionExpression=Key('region-flavor-timestamp').eq(pk) + ) + if response['Items']: + return response['Items'][0]['data'] + return {} + + +def _is_recent_month(current_time: datetime.datetime, given_time: datetime.datetime) -> bool: + difference = abs(current_time - given_time) + return difference.days <= 93 + + +def _is_current_month(current_time: datetime.datetime, given_time: datetime.datetime) -> bool: + return current_time.month == given_time.month and current_time.year == given_time.year + + +def monthly_compact(region, flavor, year, month, current_time) -> None: + compacted_prices = retrieve_compacted_prices(region, flavor, year, month) + given_time = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc) + recent = _is_recent_month(current_time, given_time) + + if not compacted_prices and not recent: + prices = retrieve_prices_from_s3(region, flavor) + elif _is_recent_month(current_time, given_time): + prices = retrieve_recent_prices(region, flavor) + else: + return + + compacted_prices = [] + for price in prices: + if price[0].month == month and price[0].year == year: + compacted_prices.append(price) + if compacted_prices: + print(f'Writing compacted data for {region} {flavor} {year} {month}') + write_compacted_month_if_updated(region, flavor, year, month, compacted_prices) + + +def compactor(region, flavor, compact_all: bool = False): + if compact_all: + if flavor == 'retail': + start_date = datetime.datetime.fromisoformat('2020-11-15 00:00:01.000000000+00:00') + else: + start_date = datetime.datetime.fromisoformat('2023-05-23 00:00:01.000000000+00:00') + else: + start_date = datetime.datetime.now(tz=datetime.UTC) - datetime.timedelta(days=62) + + _current_time = datetime.datetime.now(tz=datetime.UTC) + _date = start_date + _month = start_date.month + _year = start_date.year + if compact_all: + monthly_compact(region, flavor, _year, _month, _current_time) + while _date <= _current_time: + _current_time - datetime.timedelta(days=1) + if _month != _date.month or _year != _date.year: + _month = _date.month + _year = _date.year + time.sleep(1) + monthly_compact(region, flavor, _year, _month, _current_time) + + _date += datetime.timedelta(days=1) + + +def lambda_handler(event, context): + regions = ['us', 'eu', 'tw', 'kr'] + flavors = ['retail', 'classic'] + times = ['30d', '90d', '6m', '1y', '2y', 'all'] + for region in regions: + for flavor in flavors: + history = PriceHistory(region, flavor) + compactor(region, flavor) + for t in times: + if not (flavor == 'classic' and t == '2y'): + history.write_binned_if_updated(t) + + +def main(): + #print(retrieve_prices_from_s3('us', 'retail')) + # retrieve_recent_prices('us', 'retail') + # retrieve_compacted_prices('us', 'retail', '2024', '1') + regions = ['us', 'eu', 'tw', 'kr'] + flavors = ['retail', 'classic'] + times = ['30d', '90d', '6m', '1y', '2y', 'all'] + for region in regions: + for flavor in flavors: + compactor(region, flavor) + history = PriceHistory(region, flavor) + #for t in times: + # if not (flavor == 'classic' and t == '2y'): + # history.write_binned_if_updated(t) + #history.write_binned_prices(t) + + + +if __name__ == '__main__': + main() diff --git a/wow-token-historical.py b/wow-token-historical.py index 49b7094..bf66656 100644 --- a/wow-token-historical.py +++ b/wow-token-historical.py @@ -1,4 +1,5 @@ import sys +from typing import List, Dict import boto3 from boto3.dynamodb.conditions import Key @@ -9,27 +10,6 @@ import json import os import statistics -timestream_region_map = { - 'us-west-1': 'us-west-2', - 'us-west-2': 'us-west-2', - 'us-east-1': 'us-east-1', - 'us-east-2': 'us-east-2', - 'ap-south-1': 'eu-west-1', - 'ap-northeast-3': 'ap-northeast-1', - 'ap-northeast-2': 'ap-northeast-1', - 'ap-southeast-1': 'ap-southeast-2', - 'ap-southeast-2': 'ap-southeast-2', - 'ap-northeast-1': 'ap-northeast-1', - 'ca-central-1': 'us-east-1', - 'eu-central-1': 'eu-central-1', - 'eu-west-1': 'eu-west-1', - 'eu-west-2': 'eu-west-1', - 'eu-west-3': 'eu-central-1', - 'eu-north-1': 'eu-central-1', - 'sa-east-1': 'us-east-1', - 'eu-south-1': 'eu-west-1' -} - dynamo_region_map = { 'us-west-1': 'us-west-1', 'us-west-2': 'us-west-2', @@ -53,23 +33,24 @@ dynamo_region_map = { local_region = '' if os.environ['AWS_REGION'] in dynamo_region_map: local_dynamo_region = dynamo_region_map[os.environ['AWS_REGION']] - local_timestream_region = timestream_region_map[os.environ['AWS_REGION']] else: local_dynamo_region = 'eu-central-1' local_timestream_region = 'eu-central-1' -timestream_client = boto3.client('timestream-query', region_name=local_timestream_region) +timestream_client = boto3.client('timestream-query', region_name='us-east-1') dynamodb_client = boto3.resource('dynamodb', region_name=local_dynamo_region) tables = { 'retail': { 'recent': 'wow-token-price-recent', 'current': 'wow-token-price', + 'compacted': 'wow-token-compacted', 'timestream': 'wow-token-price-history' }, 'classic': { 'recent': 'wow-token-classic-price-recent', 'current': 'wow-token-classic-price', + 'compacted': 'wow-token-compacted', 'timestream': 'wow-token-classic-price-history' } } @@ -85,67 +66,33 @@ def historical_data(time, region, version): if time[-1] == 'h': return dynamo_data(time, region, version) else: - return timestream_data(time, region, version) + return dynamo_compacted(time, region, version) -def timestream_data(time, region, version): - def __interval_bin(__time: str) -> dict: - __time = __time.lower() - - def __div_floor(divisor: int, floor: int) -> int: - res = int(int(__time[:-1]) / divisor) - if res < floor: - return floor - return res - - if __time == 'all': - return {'interval': "INTERVAL '10' YEAR", 'bin': '3h'} - - if __time[-1] == 'd': - # I don't believe fstrings would work here - return { - 'interval': "INTERVAL '" + __time[:-1] + "' DAY", - 'bin': str(__div_floor(3, 1)) + 'm' - } - - if __time[-1] == 'm': - return { - 'interval': "INTERVAL '" + __time[:-1] + "' MONTH", - 'bin': str(__div_floor(6, 1)) + 'h' - } - - if __time[-1] == 'y': - return { - 'interval': "INTERVAL '" + __time[:-1] + "' YEAR", - 'bin': str(__div_floor(6, 1)) + 'h' - } - - print(f"Function region: {os.environ['AWS_REGION']}\t Timestream Region: {local_region}") - timestream_query = (f"WITH binned_query AS (" - f" SELECT BIN(time, {__interval_bin(time)['bin']}) AS binned_time," - f" CAST(AVG(measure_value::bigint) AS bigint) as average " - f"FROM \"{tables[version]['timestream']}\".\"{region}-price-history\" " - f"WHERE measure_name = 'price' " - f" AND time > now() - ({__interval_bin(time)['interval']}) " - f"GROUP BY BIN(time, {__interval_bin(time)['bin']}), 'price') " - f"SELECT CREATE_TIME_SERIES(binned_time, average) AS timeseries_data " - f"FROM binned_query ORDER BY timeseries_data ASC") - - response = timestream_client.query(QueryString=timestream_query) - rows = response['Rows'][0]['Data'][0]['TimeSeriesValue'] +def _get_dynamo_compacted(time: str, region: str, version: str) -> List[Dict[str, int|str]]: + table = dynamodb_client.Table(tables[version]['compacted']) + pk = f'{region}-{version}-{time}' + response = table.query( + KeyConditionExpression=( + Key('region-flavor-timestamp').eq(pk) + ) + ) + response_data = sorted(response['Items'][0]['data'].items()) data = [] - # TODO: Should we inject the aggregation functions into this loop to reduce the cost of looping? - for row in rows: - row_isotime = row['Time'].split('.')[0] - row_datetime = datetime.datetime.fromisoformat(row_isotime).replace( - tzinfo=datetime.timezone.utc) + for item in response_data: data.append({ - 'time': row_datetime.isoformat(), - 'value': int(int(row['Value']['ScalarValue']) / 10000) + 'time': datetime.datetime.fromtimestamp( + int(item[0]), + tz=datetime.UTC).isoformat(), + 'value': int(item[1]) }) return data +def dynamo_compacted(time: str, region: str, version: str) -> List[Dict[str, int]]: + return _get_dynamo_compacted(time, region, version) + + def dynamo_data(time, region, version): print(f"Function region: {os.environ['AWS_REGION']}\t Dynamo Region: {local_region}") time_stripped = int(time[:-1]) @@ -387,3 +334,12 @@ def lambda_handler(event, context): return response else: return {'status': '404', 'statusDescription': 'NotFound', 'headers': {}} + + +def main(): + pass + #data = dynamo_compacted('1y', 'us', 'retail') + #print(data) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/wow-token-updater.py b/wow-token-updater.py index 6c8f717..92f8b6e 100644 --- a/wow-token-updater.py +++ b/wow-token-updater.py @@ -9,7 +9,6 @@ import requests local_region = os.environ['AWS_REGION'] dynamo_client = boto3.client('dynamodb', region_name=local_region) -timestream_client = boto3.client('timestream-write', region_name=local_region) tables = { 'retail': { 'recent': 'wow-token-price-recent', @@ -47,23 +46,13 @@ def flavor_handler(flavor: str, regions: list, timestamp: int) -> None: def update_token_price(flavor: str, region: str, current_time_epoch: int, live_price: int) -> None: stored_price = get_stored_price(flavor, region) - regional_item = get_regional_update_item(flavor, region) print(f'Current live price {live_price}') print(f'Current stored price {stored_price}') - regional_price = int(regional_item['price']['S']) - print(f'Current regional price {regional_price}') if stored_price != live_price: - # If the stored price is not the same as the live price, nor the regional price - # assume no other Lambda has updated it and update it - print(f"Stored price is differing from the live price, updating all databases") + # update the stored price and the recent price + print(f"Stored price differs from the live price, updating databases") update_stored_token_price(flavor, region, live_price, current_time_epoch) update_recent_token_price(flavor, region, live_price, current_time_epoch) - update_regional_price(flavor, region, live_price, current_time_epoch) - update_timestream_token_price(flavor, region, live_price, current_time_epoch) - if (stored_price == live_price) and (regional_price != stored_price): - print(f"Stored price differs from regional price but not live price, updating regional databases") - update_regional_price(flavor, region, live_price, current_time_epoch) - update_timestream_token_price(flavor, region, live_price, current_time_epoch) else: print(f"Price hasn't changed for {flavor} {region.upper()}") @@ -90,55 +79,6 @@ def update_stored_token_price(flavor: str, region: str, price: int, current_time ) print(f'Updated {flavor} {region.upper()} price to {price}') - -def update_regional_price(flavor: str, region: str, price: int, current_time_epoch: int) -> None: - if flavor == 'retail': - key = region - else: - key = f"{flavor}-{region}" - dynamo_client.update_item( - TableName='wow-token-regional', - Key={ - 'region': { - 'S': key - } - }, - UpdateExpression='SET price = :p, current_time = :t', - ExpressionAttributeValues={ - ':p': { - 'S': str(price) - }, - ':t': { - 'S': str(current_time_epoch) - } - } - # ReturnValues="UPDATED_NEW" - ) - print(f'Updated regional {flavor} {region.upper()} price to {price}') - - -def create_regional_item(flavor: str, region: str) -> None: - print(f"Creating default regional item in {flavor} {region}") - if flavor == 'retail': - key = region - else: - key = f"{flavor}-{region}" - dynamo_client.put_item( - TableName='wow-token-regional', - Item={ - 'region': { - 'S': key - }, - 'price': { - 'S': str(1) - }, - 'timestamp': { - 'N': str(1) - } - } - ) - - # add a record to the recent token price table in DynamoDB def update_recent_token_price(flavor: str, region: str, price: int, current_time_epoch: int) -> None: dynamo_client.put_item( @@ -161,43 +101,6 @@ def update_recent_token_price(flavor: str, region: str, price: int, current_time ) print(f'Added {region.upper()} price {price} to {tables[flavor]["recent"]} table') - -def update_timestream_token_price(flavor: str, region: str, price: int, current_time_epoch: int) -> None: - record_inserted = False - while not record_inserted: - try: - print('Attempting to write to Timestream') - timestream_client.write_records( - DatabaseName=tables[flavor]['timestream'], - TableName=f'{region}-price-history', - Records=[ - build_timestream_record(region, price, current_time_epoch), - ] - ) - record_inserted = True - except Exception as e: - print(f'Error writing to Timestream: {e}') - time.sleep(2) - print(f'Updated {flavor} {region.upper()} price to {price} in Timestream') - - -def build_timestream_record(region: str, price: int, current_time_epoch: int) -> dict: - return { - 'Dimensions': [ - { - 'Name': 'region', - 'Value': region, - 'DimensionValueType': 'VARCHAR' - } - ], - 'MeasureName': 'price', - 'MeasureValue': str(price), - 'MeasureValueType': 'BIGINT', - 'Time': str(current_time_epoch), - 'TimeUnit': 'SECONDS', - } - - # get the current stored token price from dynamodb def get_stored_price(flavor: str, region: str) -> int: response = dynamo_client.get_item( @@ -211,27 +114,6 @@ def get_stored_price(flavor: str, region: str) -> int: return int(response['Item']['price']['S']) -def get_regional_update_item(flavor: str, region: str) -> dict: - if flavor == 'retail': - key = region - else: - key = f"{flavor}-{region}" - response = dynamo_client.get_item( - TableName='wow-token-regional', - Key={ - 'region': { - 'S': key - } - } - ) - if 'Item' in response: - return response['Item'] - else: - create_regional_item(flavor, region) - time.sleep(5) - return get_regional_update_item(flavor, region) - - def get_combined_live_price(game_flavor: str) -> dict: if game_flavor == 'retail': namespace = 'dynamic'