import boto3 import json import os from botocore.exceptions import ClientError from langfuse import Langfuse from .config import REGION, TABLE, SECRET_NAME dynamodb = boto3.resource("dynamodb", region_name=REGION) def get_secret() -> str: session = boto3.session.Session() client = session.client(service_name="secretsmanager", region_name=REGION) try: response = client.get_secret_value(SecretId=SECRET_NAME) except ClientError as e: raise e return response["SecretString"] secrets = json.loads(get_secret()) langfuse = Langfuse( public_key=secrets["LANGFUSE-PUBLIC-KEY"], secret_key=secrets["LANGFUSE-SECRET-KEY"], host=os.environ["LANGFUSE_HOST"], ) def get_contexto(dashboard: str) -> dict: """ Get contexto, filter, and items_disponiveis from DynamoDB for a given dashboard. Returns: Dict with 'contexto', 'filter', and 'items_disponiveis' keys """ try: table = dynamodb.Table(TABLE) response = table.get_item(Key={"id": dashboard + "_contexto"}) if "Item" not in response: return {"contexto": "", "filter": "", "items_disponiveis": {}} item = response["Item"] return { "contexto": item.get("contexto", ""), "filter": item.get("filter_key", ""), "items_disponiveis": item.get("itens_disponiveis", {}), } except ClientError as e: error_message = e.response["Error"]["Message"] return {"contexto": f"Error: {error_message}", "filter": "", "items_disponiveis": {}}