import time from decimal import Decimal from functools import cache from typing import Tuple, List, Dict import boto3 import json import os import datetime from boto3.dynamodb.conditions import Key dynamo_region_map = { 'us-west-1': 'us-west-1', 'us-west-2': 'us-west-2', 'us-east-1': 'us-east-1', 'us-east-2': 'us-east-2', 'ap-south-1': 'eu-north-1', 'ap-northeast-3': 'ap-northeast-1', 'ap-northeast-2': 'ap-northeast-1', 'ap-southeast-1': 'ap-southeast-1', 'ap-southeast-2': 'ap-southeast-2', 'ap-northeast-1': 'ap-northeast-1', 'ca-central-1': 'us-east-1', 'eu-central-1': 'eu-north-1', 'eu-west-1': 'eu-west-1', 'eu-west-2': 'eu-west-1', 'eu-west-3': 'eu-west-3', 'eu-north-1': 'eu-north-1', 'sa-east-1': 'sa-east-1', 'eu-south-1': 'eu-north-1' } # This is a rough first pass at an intelligent region selector based on what is replicated if os.environ['AWS_REGION'] in dynamo_region_map: local_dynamo_region = dynamo_region_map[os.environ['AWS_REGION']] else: local_dynamo_region = 'eu-central-1' DYNAMO_CLIENT = boto3.resource('dynamodb', region_name=local_dynamo_region) class PriceHistory: region: str flavor: str _prices: Dict[str, List[Tuple[int, int]]] def __init__(self, region: str, flavor: str): self.region = region self.flavor = flavor self._prices = dict() def _retrieve_compacted_prices_from_dynamo(self, year, month) -> None: table = DYNAMO_CLIENT.Table('wow-token-compacted') pk = f'{self.region}-{self.flavor}-{year}-{month}' data = [] response = table.query( KeyConditionExpression=Key('region-flavor-timestamp').eq(pk) ) if response['Items']: self._prices[f'{year}-{month}'] = [] for timestamp, price in response['Items'][0]['data'].items(): data.append((int(timestamp), int(price))) self._prices[f'{year}-{month}'] = sorted(data, key=lambda x: x[0]) def _retrieve_time_bin(self, start_time: datetime.datetime, end_time: datetime.datetime) -> List[Tuple[int, int]]: scan_data = self.get_month_prices(start_time.month, start_time.year) if end_time.year != start_time.year or end_time.month != start_time.month: scan_data += self.get_month_prices(end_time.month, end_time.year) + scan_data high_tuple = (0,0) low_tuple = (0,0) for item in scan_data: if start_time.timestamp() <= item[0] < end_time.timestamp(): if item[1] > high_tuple[1]: high_tuple = item if item[1] < low_tuple[1] or low_tuple[0] == 0: low_tuple = item if high_tuple[0] == 0 or low_tuple[0] == 0: return [] else: if high_tuple[0] == low_tuple[0]: return [high_tuple] elif low_tuple[0] > high_tuple[0]: return [high_tuple, low_tuple] else: return [low_tuple, high_tuple] def request_time_to_datetime_pair(self, time_str: str) -> Tuple[datetime.datetime, datetime.datetime]: end_time = datetime.datetime.now(datetime.timezone.utc) if time_str == 'all': if self.flavor == 'retail': start_time = datetime.datetime.fromisoformat('2020-11-15 00:00:01.000000000+00:00') else: start_time = datetime.datetime.fromisoformat('2023-05-23 00:00:01.000000000+00:00') elif time_str[-1] == 'd': days = int(time_str[:-1]) start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days) elif time_str[-1] == 'm': months = int(time_str[:-1]) start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=int(30.437*months)) elif time_str[-1] == 'y': years = int(time_str[:-1]) start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=int(365.25*years)) else: raise ValueError return start_time, end_time def binned_time(self, start_time: datetime.datetime, end_time: datetime.datetime) -> List[Tuple[int, int]]: time_delta = end_time - start_time hours = time_delta.days * 24 + time_delta.seconds // 3600 if hours > 8800: # Above a year bin_size = 12 elif hours > 4400: # Above 6 months bin_size = 6 elif hours > 2112: # 3 months bin_size = 2 else: bin_size = 1 _bin_start = start_time _bin_end = _bin_start + datetime.timedelta(hours=bin_size, seconds=-1) data = [] while _bin_start < end_time: data += self._retrieve_time_bin(_bin_start, _bin_end) _bin_start = _bin_end + datetime.timedelta(hours=1) _bin_end = _bin_start + datetime.timedelta(hours=bin_size, seconds=-1) return data def get_month_prices(self, month: int|str, year: int|str) -> List[Tuple[int, int]]: if isinstance(month, int): month = str(month) if isinstance(year, int): year = str(year) if f'{year}-{month}' not in self._prices: self._retrieve_compacted_prices_from_dynamo(year, month) return self._prices[f'{year}-{month}'] def retrieve_binned_prices(self, time_str: str) -> List[Tuple[int, int]]: table = DYNAMO_CLIENT.Table('wow-token-compacted') pk = f'{self.region}-{self.flavor}-{time_str}' response = table.get_item( Key={ 'region-flavor-timestamp': pk } ) if 'Item' not in response: return [] data = [] for _time, _price in response['Item']['data'].items(): data.append((int(_time), int(_price))) return sorted(data) def write_binned_if_updated(self, time_str: str) -> bool: current_binned_prices = self.retrieve_binned_prices(time_str) start, end = self.request_time_to_datetime_pair(time_str) binned_data = sorted(self.binned_time(start, end)) if all(item in current_binned_prices for item in binned_data) and all(item in binned_data for item in current_binned_prices): print(f"{time_str} No update needed") return False else: self.write_binned_prices(time_str, binned_data) return True def write_binned_prices(self, time_str: str, binned_data: List[Tuple[int, int]] = None) -> None: print(f'Writing compacted bin data for {self.region}-{self.flavor}-{time_str}') table = DYNAMO_CLIENT.Table('wow-token-compacted') pk = f'{self.region}-{self.flavor}-{time_str}' dynamo_data: Dict[str, Dict[str, str]] = {} for item in binned_data: dynamo_data[str(item[0])] = item[1] response = table.put_item( Item={ 'region-flavor-timestamp': pk, 'data': dynamo_data } ) @cache def retrieve_prices_from_s3(region: str, flavor: str) -> List[Tuple[datetime.datetime, int]]: s3 = boto3.client('s3') key = f"{flavor}-{region}-price-history.json" obj = s3.get_object( Bucket=os.environ['S3_BUCKET'], Key=key ) raw_data = json.loads(obj['Body'].read()) data = [] for datum in raw_data: date = datetime.datetime.fromisoformat(f"{datum[2]['ScalarValue']}+00:00") price = int(int(datum[3]['ScalarValue']) / 10000) data.append((date, price)) return data def retrieve_recent_prices(region: str, flavor: str) -> List[Tuple[datetime.datetime, int]]: start_time = (datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=95)) if flavor == 'retail': table = DYNAMO_CLIENT.Table('wow-token-price-recent') else: table = DYNAMO_CLIENT.Table('wow-token-classic-price-recent') response = table.query( KeyConditionExpression=( Key('region').eq(region) & Key('timestamp').gte(int(start_time.timestamp())))) data = [] last_price = 0 for item in response['Items']: price = int(int(item['price']) / 10000) if last_price != price: last_price = price item_time = datetime.datetime.fromtimestamp(int(item['timestamp']), tz=datetime.timezone.utc) data.append((item_time, price)) return data def write_compacted_month(region: str, flavor: str, year: int, month: int, data: List[Tuple[datetime.datetime, int]])-> None: table = DYNAMO_CLIENT.Table('wow-token-compacted') pk = f'{region}-{flavor}-{year}-{month}' dynamo_data: Dict[str, Dict[str, str]] = {} for item in data: dynamo_data[str(int(item[0].timestamp()))] = item[1] response = table.put_item( Item={ 'region-flavor-timestamp': pk, 'data': dynamo_data } ) def write_compacted_month_if_updated(region: str, flavor: str, year: int, month: int, data: List[Tuple[datetime.datetime, int]]) -> bool: current_compacted_data = sorted(retrieve_compacted_prices(region, flavor, year, month).items()) calculated_data: List[Tuple[str, Decimal]]= [] for item in data: calculated_data.append((str(int(item[0].timestamp())), Decimal(item[1]))) if all(item in current_compacted_data for item in calculated_data) and all(item in calculated_data for item in current_compacted_data): print(f"{region} {flavor} {year} {month} No compaction update needed") return False else: print(f"{region} {flavor} {year} {month} Compaction needed") write_compacted_month(region, flavor, year, month, data) return True def retrieve_compacted_prices(region, flavor, year, month) -> dict: table = DYNAMO_CLIENT.Table('wow-token-compacted') pk = f'{region}-{flavor}-{year}-{month}' response = table.query( KeyConditionExpression=Key('region-flavor-timestamp').eq(pk) ) if response['Items']: return response['Items'][0]['data'] return {} def _is_recent_month(current_time: datetime.datetime, given_time: datetime.datetime) -> bool: difference = abs(current_time - given_time) return difference.days <= 93 def _is_current_month(current_time: datetime.datetime, given_time: datetime.datetime) -> bool: return current_time.month == given_time.month and current_time.year == given_time.year def monthly_compact(region, flavor, year, month, current_time) -> None: compacted_prices = retrieve_compacted_prices(region, flavor, year, month) given_time = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc) recent = _is_recent_month(current_time, given_time) if not compacted_prices and not recent: prices = retrieve_prices_from_s3(region, flavor) elif _is_recent_month(current_time, given_time): prices = retrieve_recent_prices(region, flavor) else: return compacted_prices = [] for price in prices: if price[0].month == month and price[0].year == year: compacted_prices.append(price) if compacted_prices: print(f'Writing compacted data for {region} {flavor} {year} {month}') write_compacted_month_if_updated(region, flavor, year, month, compacted_prices) def compactor(region, flavor, compact_all: bool = False): if compact_all: if flavor == 'retail': start_date = datetime.datetime.fromisoformat('2020-11-15 00:00:01.000000000+00:00') else: start_date = datetime.datetime.fromisoformat('2023-05-23 00:00:01.000000000+00:00') else: start_date = datetime.datetime.now(tz=datetime.UTC) - datetime.timedelta(days=62) _current_time = datetime.datetime.now(tz=datetime.UTC) _date = start_date _month = start_date.month _year = start_date.year if compact_all: monthly_compact(region, flavor, _year, _month, _current_time) while _date <= _current_time: _current_time - datetime.timedelta(days=1) if _month != _date.month or _year != _date.year: _month = _date.month _year = _date.year time.sleep(1) monthly_compact(region, flavor, _year, _month, _current_time) _date += datetime.timedelta(days=1) def lambda_handler(event, context): regions = ['us', 'eu', 'tw', 'kr'] flavors = ['retail', 'classic'] times = ['30d', '90d', '6m', '1y', '2y', 'all'] for region in regions: for flavor in flavors: history = PriceHistory(region, flavor) compactor(region, flavor) for t in times: if not (flavor == 'classic' and t == '2y'): history.write_binned_if_updated(t) def main(): #print(retrieve_prices_from_s3('us', 'retail')) # retrieve_recent_prices('us', 'retail') # retrieve_compacted_prices('us', 'retail', '2024', '1') regions = ['us', 'eu', 'tw', 'kr'] flavors = ['retail', 'classic'] times = ['30d', '90d', '6m', '1y', '2y', 'all'] for region in regions: for flavor in flavors: compactor(region, flavor) history = PriceHistory(region, flavor) #for t in times: # if not (flavor == 'classic' and t == '2y'): # history.write_binned_if_updated(t) #history.write_binned_prices(t) if __name__ == '__main__': main()