"""
DynamoDB Table Reader Script

This script connects to AWS DynamoDB and reads all entries from a specified
table. Outputs data in XML format: each item becomes one element named after
the item's 'period' attribute, containing the item's context XML content.

Usage:
    from dynamodb_read_table import read_table_as_xml

    xml_content = read_table_as_xml("my-table-name")
"""

import re

import boto3
from botocore.exceptions import ClientError

# Compiled once at import time; these run once per item in format_items_to_xml.
# Matches an XML declaration such as <?xml version="1.0" encoding="UTF-8"?>.
_XML_DECLARATION_RE = re.compile(r'<\?xml[^?]*\?>\s*')
# Matches the opening <relatorio ...> wrapper tag, with or without attributes.
# \b keeps longer names such as <relatorio_x> from being treated as the wrapper.
_RELATORIO_OPEN_RE = re.compile(r'<relatorio\b[^>]*>\s*')
# Matches the closing </relatorio> wrapper tag.
_RELATORIO_CLOSE_RE = re.compile(r'</relatorio\s*>\s*')


def clean_context_xml(context: str) -> str:
    """
    Remove the XML declaration and the <relatorio> wrapper tags from context content.

    Only the wrapper tags themselves are removed; whatever they enclose is kept.

    Args:
        context: Raw XML content from DynamoDB

    Returns:
        Cleaned XML content without declaration and relatorio tags,
        stripped of leading/trailing whitespace
    """
    # Remove XML declaration (e.g., <?xml version="1.0" encoding="UTF-8"?>)
    context = _XML_DECLARATION_RE.sub('', context)
    # Remove opening <relatorio> tag (with any attributes)
    context = _RELATORIO_OPEN_RE.sub('', context)
    # Remove closing </relatorio> tag
    context = _RELATORIO_CLOSE_RE.sub('', context)
    return context.strip()


def remove_xml_declaration(content: str) -> str:
    """
    Remove only the XML declaration from content.

    Args:
        content: Raw XML content

    Returns:
        Content without XML declaration (keeps relatorio tags),
        stripped of leading/trailing whitespace
    """
    return _XML_DECLARATION_RE.sub('', content).strip()


def format_items_to_xml(items: list) -> str:
    """
    Format all DynamoDB items to XML format.

    Each item's 'period' field becomes a tag, and the 'context' and
    'dados_consolidados' fields are placed inside it. Entries are separated
    by an empty line.

    Args:
        items: List of DynamoDB items (mappings supporting ``.get``)

    Returns:
        Complete XML formatted string with all items
    """
    xml_parts = []

    for item in items:
        period = item.get("period", "unknown")
        # `or ""` also covers attributes that are present but None, which
        # would otherwise make re.sub raise TypeError.
        context = item.get("context") or ""
        dados_consolidados = item.get("dados_consolidados") or ""

        # Clean the XML content
        cleaned_context = clean_context_xml(context)
        cleaned_dados = remove_xml_declaration(dados_consolidados)

        xml_parts.append(f"<{period}>")
        xml_parts.append(cleaned_context)
        if cleaned_dados:
            xml_parts.append(cleaned_dados)
        # Close the element opened above so every entry is well delimited.
        xml_parts.append(f"</{period}>")
        xml_parts.append("")  # Empty line between entries

    return "\n".join(xml_parts)


def get_dynamodb_client(region_name: str = "us-east-1"):
    """Create and return a DynamoDB client for the given region."""
    session = boto3.Session()
    return session.client("dynamodb", region_name=region_name)


def get_dynamodb_resource(region_name: str = "us-east-1"):
    """Create and return a DynamoDB resource for higher-level operations."""
    session = boto3.Session()
    return session.resource("dynamodb", region_name=region_name)


def scan_table(table_name: str, region_name: str = "us-east-1") -> list:
    """
    Scan a DynamoDB table and return all items.

    Keeps requesting pages until the response no longer carries a
    'LastEvaluatedKey', so tables larger than a single scan response are
    read completely.

    Args:
        table_name: Name of the DynamoDB table to scan
        region_name: AWS region where the table is located

    Returns:
        List of all items in the table

    Raises:
        ClientError: Re-raised after printing the error code and message.
    """
    dynamodb = get_dynamodb_resource(region_name)
    table = dynamodb.Table(table_name)

    items = []
    # Empty for the first page; carries ExclusiveStartKey for later pages.
    scan_kwargs = {}

    try:
        while True:
            response = table.scan(**scan_kwargs)
            items.extend(response.get("Items", []))

            last_evaluated_key = response.get("LastEvaluatedKey")
            if not last_evaluated_key:
                break
            scan_kwargs["ExclusiveStartKey"] = last_evaluated_key
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        print(f"Error scanning table: {error_code} - {error_message}")
        raise

    print(f"Successfully scanned {len(items)} items from table '{table_name}'")
    return items


def list_tables(region_name: str = "us-east-1") -> list:
    """
    List all DynamoDB tables in the specified region.

    Follows 'LastEvaluatedTableName' until the listing is exhausted.

    Args:
        region_name: AWS region to list tables from

    Returns:
        List of table names

    Raises:
        ClientError: Re-raised after printing the error code and message.
    """
    client = get_dynamodb_client(region_name)

    tables = []
    # Empty for the first page; carries ExclusiveStartTableName afterwards.
    list_kwargs = {}

    try:
        while True:
            response = client.list_tables(**list_kwargs)
            tables.extend(response.get("TableNames", []))

            last_evaluated_table_name = response.get("LastEvaluatedTableName")
            if not last_evaluated_table_name:
                break
            list_kwargs["ExclusiveStartTableName"] = last_evaluated_table_name
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        print(f"Error listing tables: {error_code} - {error_message}")
        raise

    return tables


def get_table_info(table_name: str, region_name: str = "us-east-1") -> dict:
    """
    Get metadata information about a DynamoDB table.

    Args:
        table_name: Name of the DynamoDB table to describe
        region_name: AWS region where the table is located

    Returns:
        Dict with the keys TableName, TableStatus, ItemCount, TableSizeBytes,
        KeySchema, AttributeDefinitions and CreationDateTime (the latter
        converted with ``str``), taken from the describe_table response

    Raises:
        ClientError: Re-raised after printing the error code and message.
    """
    client = get_dynamodb_client(region_name)

    try:
        response = client.describe_table(TableName=table_name)
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        print(f"Error describing table: {error_code} - {error_message}")
        raise

    table_info = response.get("Table", {})
    return {
        "TableName": table_info.get("TableName"),
        "TableStatus": table_info.get("TableStatus"),
        "ItemCount": table_info.get("ItemCount"),
        "TableSizeBytes": table_info.get("TableSizeBytes"),
        "KeySchema": table_info.get("KeySchema"),
        "AttributeDefinitions": table_info.get("AttributeDefinitions"),
        "CreationDateTime": str(table_info.get("CreationDateTime")),
    }


def read_table_as_xml(table_name: str, region_name: str = "us-east-1") -> str:
    """
    Read all entries from a DynamoDB table and return as XML string.

    Args:
        table_name: Name of the DynamoDB table to read
        region_name: AWS region where the table is located (default: us-east-1)

    Returns:
        XML formatted string with each item wrapped in a tag named after
        its 'period' attribute
    """
    items = scan_table(table_name, region_name)
    return format_items_to_xml(items)


if __name__ == "__main__":
    print(read_table_as_xml("poc_dnx_monthly_summary", "us-east-1"))