Adds starting files

This commit is contained in:
2026-01-16 17:45:22 -03:00
parent da8f2984c2
commit c31d089efb
37 changed files with 2560 additions and 0 deletions

View File

@@ -0,0 +1,209 @@
"""
DynamoDB Table Reader Script
This script connects to AWS DynamoDB and reads all entries from a specified table.
Outputs data in XML format: each item is wrapped in a tag named after its 'period' value, containing the item's context (and, when present, dados_consolidados) XML content.
Usage:
from dynamodb_read_table import read_table_as_xml
xml_content = read_table_as_xml("my-table-name")
"""
import re
import boto3
from botocore.exceptions import ClientError
def clean_context_xml(context: str) -> str:
    """
    Remove the XML declaration and the <relatorio> wrapper tags from context content.

    Only the tags of the <relatorio> element itself are stripped. Elements whose
    names merely start with "relatorio" (e.g. <relatorio_item>) are preserved,
    as is everything that sits between the wrapper tags.

    Args:
        context: Raw XML content from DynamoDB

    Returns:
        Cleaned XML content without declaration and relatorio tags,
        with leading/trailing whitespace stripped
    """
    # Remove XML declaration (e.g., <?xml version="1.0" encoding="UTF-8"?>)
    context = re.sub(r'<\?xml[^?]*\?>\s*', '', context)
    # Remove opening <relatorio> tag (with any attributes). The lookahead
    # requires the tag name to end right after "relatorio" (whitespace, "/" or
    # ">"), so longer names such as <relatorio_item> are not stripped by mistake.
    context = re.sub(r'<relatorio(?=[\s/>])[^>]*>\s*', '', context)
    # Remove closing </relatorio> tag; whitespace before ">" is tolerated
    context = re.sub(r'\s*</relatorio\s*>', '', context)
    return context.strip()
def remove_xml_declaration(content: str) -> str:
    """
    Strip the XML declaration from content, leaving all other markup alone.

    Args:
        content: Raw XML content

    Returns:
        The content with every ``<?xml ...?>`` declaration (and the whitespace
        following it) removed and surrounding whitespace trimmed; relatorio
        tags are kept
    """
    declaration_pattern = re.compile(r'<\?xml[^?]*\?>\s*')
    without_declaration = declaration_pattern.sub('', content)
    return without_declaration.strip()
def format_items_to_xml(items: list) -> str:
    """
    Format all DynamoDB items to XML format.

    For each item, the value of its 'period' field is used as the name of the
    wrapping tag, and the cleaned 'context' and 'dados_consolidados' fields are
    placed inside it. A missing, None or empty 'period' falls back to "unknown";
    a missing, None or empty 'context' / 'dados_consolidados' is treated as an
    empty string.

    Args:
        items: List of DynamoDB items (dict-like objects)

    Returns:
        Complete XML formatted string with all items, one line per part and an
        empty line after each entry
    """
    xml_parts = []
    for item in items:
        # `or` (rather than a .get() default) also covers attributes that are
        # present but hold None or an empty value, which would otherwise reach
        # re.sub() and raise TypeError, or render as a <None> tag.
        period = item.get("period") or "unknown"
        context = item.get("context") or ""
        dados_consolidados = item.get("dados_consolidados") or ""

        # Clean the XML content
        cleaned_context = clean_context_xml(context)
        cleaned_dados = remove_xml_declaration(dados_consolidados)

        # NOTE(review): the period value is used verbatim as the tag name and is
        # not validated as a legal XML name - confirm consumers tolerate this.
        xml_parts.append(f"<{period}>")
        xml_parts.append(cleaned_context)
        if cleaned_dados:
            xml_parts.append(cleaned_dados)
        xml_parts.append(f"</{period}>")
        xml_parts.append("")  # Empty line between entries
    return "\n".join(xml_parts)
def get_dynamodb_client(region_name: str = "us-east-1"):
    """
    Build a DynamoDB client from a fresh boto3 session.

    Args:
        region_name: AWS region the client is bound to (default: us-east-1)

    Returns:
        A boto3 "dynamodb" client for the given region
    """
    return boto3.Session().client("dynamodb", region_name=region_name)
def get_dynamodb_resource(region_name: str = "us-east-1"):
    """
    Build a DynamoDB resource (higher-level API) from a fresh boto3 session.

    Args:
        region_name: AWS region the resource is bound to (default: us-east-1)

    Returns:
        A boto3 "dynamodb" resource for the given region
    """
    return boto3.Session().resource("dynamodb", region_name=region_name)
def scan_table(table_name: str, region_name: str = "us-east-1") -> list:
    """
    Read every item of a DynamoDB table with a paginated scan.

    The scan is repeated, resuming from the previous response's
    ``LastEvaluatedKey``, until a response no longer carries one.

    Args:
        table_name: Name of the DynamoDB table to scan
        region_name: AWS region where the table is located

    Returns:
        List of all items in the table

    Raises:
        ClientError: Re-raised after its code and message have been printed
    """
    table = get_dynamodb_resource(region_name).Table(table_name)
    collected = []
    # Empty for the first page; carries ExclusiveStartKey for later pages.
    scan_kwargs = {}
    try:
        while True:
            page = table.scan(**scan_kwargs)
            collected.extend(page.get("Items", []))
            resume_key = page.get("LastEvaluatedKey")
            if not resume_key:
                break
            scan_kwargs = {"ExclusiveStartKey": resume_key}
    except ClientError as e:
        error_details = e.response["Error"]
        error_code = error_details["Code"]
        error_message = error_details["Message"]
        print(f"Error scanning table: {error_code} - {error_message}")
        raise
    else:
        print(f"Successfully scanned {len(collected)} items from table '{table_name}'")
        return collected
def list_tables(region_name: str = "us-east-1") -> list:
    """
    Collect the names of all DynamoDB tables in a region.

    The listing is repeated, resuming from the previous response's
    ``LastEvaluatedTableName``, until a response no longer carries one.

    Args:
        region_name: AWS region to list tables from (default: us-east-1)

    Returns:
        List of table names

    Raises:
        ClientError: Re-raised after its code and message have been printed
    """
    client = get_dynamodb_client(region_name)
    table_names = []
    # Empty for the first page; carries ExclusiveStartTableName afterwards.
    list_kwargs = {}
    try:
        while True:
            page = client.list_tables(**list_kwargs)
            table_names.extend(page.get("TableNames", []))
            resume_name = page.get("LastEvaluatedTableName")
            if not resume_name:
                break
            list_kwargs = {"ExclusiveStartTableName": resume_name}
    except ClientError as e:
        error_details = e.response["Error"]
        error_code = error_details["Code"]
        error_message = error_details["Message"]
        print(f"Error listing tables: {error_code} - {error_message}")
        raise
    else:
        return table_names
def get_table_info(table_name: str, region_name: str = "us-east-1") -> dict:
    """
    Return a summary of a DynamoDB table's description.

    Args:
        table_name: Name of the DynamoDB table to describe
        region_name: AWS region where the table is located (default: us-east-1)

    Returns:
        Dict with the keys TableName, TableStatus, ItemCount, TableSizeBytes,
        KeySchema and AttributeDefinitions copied from the table description
        (None when absent), plus CreationDateTime converted with ``str()``

    Raises:
        ClientError: Re-raised after its code and message have been printed
    """
    client = get_dynamodb_client(region_name)
    try:
        description = client.describe_table(TableName=table_name).get("Table", {})
    except ClientError as e:
        error_details = e.response["Error"]
        error_code = error_details["Code"]
        error_message = error_details["Message"]
        print(f"Error describing table: {error_code} - {error_message}")
        raise

    # Fields copied through unchanged, in the order they appear in the result.
    copied_fields = (
        "TableName",
        "TableStatus",
        "ItemCount",
        "TableSizeBytes",
        "KeySchema",
        "AttributeDefinitions",
    )
    summary = {field: description.get(field) for field in copied_fields}
    summary["CreationDateTime"] = str(description.get("CreationDateTime"))
    return summary
def read_table_as_xml(table_name: str, region_name: str = "us-east-1") -> str:
    """
    Scan a DynamoDB table and render all of its items as one XML string.

    Args:
        table_name: Name of the DynamoDB table to read
        region_name: AWS region where the table is located (default: us-east-1)

    Returns:
        XML formatted string produced by format_items_to_xml for the scanned items
    """
    return format_items_to_xml(scan_table(table_name, region_name))
# Script entry point: dump the monthly summary table as XML to stdout.
if __name__ == "__main__":
    xml_output = read_table_as_xml("poc_dnx_monthly_summary", "us-east-1")
    print(xml_output)