import boto3
import csv
import datetime
from decimal import Decimal
from boto3.dynamodb.conditions import Key
from tools.config import (
    AWS_REGION,
    ACCESS_LOG_DYNAMODB_TABLE_NAME,
    FEEDBACK_LOG_DYNAMODB_TABLE_NAME,
    USAGE_LOG_DYNAMODB_TABLE_NAME,
    OUTPUT_FOLDER,
)

# Pick the table to export; table names and region come from tools/config.py
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME  # Choose as appropriate
REGION = AWS_REGION
CSV_OUTPUT = OUTPUT_FOLDER + 'dynamodb_logs_export.csv'

# Create the DynamoDB resource and table handle
dynamodb = boto3.resource('dynamodb', region_name=REGION)
table = dynamodb.Table(TABLE_NAME)
# Helper function to convert DynamoDB values (Decimals, ISO 8601 date
# strings) into CSV-friendly types
def convert_types(item):
    new_item = {}
    for key, value in item.items():
        # Handle Decimals first (DynamoDB returns all numbers as Decimal)
        if isinstance(value, Decimal):
            new_item[key] = int(value) if value % 1 == 0 else float(value)
        # Handle strings that might be dates
        elif isinstance(value, str):
            try:
                # Attempt to parse a common ISO 8601 format.
                # The .replace() handles the 'Z' suffix for Zulu/UTC time.
                dt_obj = datetime.datetime.fromisoformat(value.replace('Z', '+00:00'))
                # Format the datetime as desired, truncated to milliseconds
                new_item[key] = dt_obj.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            except (ValueError, TypeError):
                # If parsing fails, it's just a regular string
                new_item[key] = value
        # Pass all other types through unchanged
        else:
            new_item[key] = value
    return new_item
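
# Quick illustration (a hand-built item, not one read from the table) of
# what convert_types produces for each handled type:
#
#   convert_types({'count': Decimal('3'), 'latency': Decimal('0.25'),
#                  'timestamp': '2024-01-01T12:00:00Z'})
#   -> {'count': 3, 'latency': 0.25, 'timestamp': '2024-01-01 12:00:00.000'}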
# Paginated scan: a single Scan call returns at most 1 MB of data, so keep
# following LastEvaluatedKey until the table is exhausted
def scan_table():
    items = []
    response = table.scan()
    items.extend(response['Items'])
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])
    return items
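
# Alternative sketch: the low-level client ships a built-in paginator that
# follows LastEvaluatedKey for you. Note that the client API returns items
# in DynamoDB's typed JSON (e.g. {'user': {'S': 'alice'}}), so each item
# would need boto3.dynamodb.types.TypeDeserializer before reuse here.
#
#   client = boto3.client('dynamodb', region_name=REGION)
#   for page in client.get_paginator('scan').paginate(TableName=TABLE_NAME):
#       items.extend(page['Items'])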
# Export to CSV
def export_to_csv(items, output_path, fields_to_drop: list = None):
    if not items:
        print("No items found.")
        return

    # Use a set for efficient lookup
    drop_set = set(fields_to_drop or [])

    # Gather every key that appears in any item: DynamoDB items are
    # schemaless, so not all items share the same attributes
    all_keys = set()
    for item in items:
        all_keys.update(item.keys())

    # Determine the final fieldnames by subtracting the ones to drop
    fieldnames = sorted(all_keys - drop_set)
    print("Final CSV columns will be:", fieldnames)

    with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
        # extrasaction='ignore' silently drops any keys not in fieldnames;
        # restval='' fills in rows that are missing a key
        writer = csv.DictWriter(
            csvfile,
            fieldnames=fieldnames,
            extrasaction='ignore',
            restval='',
        )
        writer.writeheader()
        for item in items:
            # convert_types can return the full dict; the writer simply
            # ignores the dropped fields
            writer.writerow(convert_types(item))

    print(f"Exported {len(items)} items to {output_path}")
# Run export
if __name__ == '__main__':
    items = scan_table()
    export_to_csv(items, CSV_OUTPUT, fields_to_drop=['Query metadata - usage counts and other parameters'])