54 lines
1.5 KiB
Python
54 lines
1.5 KiB
Python
import boto3
|
|
import json
|
|
import os
|
|
from botocore.exceptions import ClientError
|
|
from langfuse import Langfuse
|
|
|
|
from .config import REGION, TABLE, SECRET_NAME
|
|
|
|
dynamodb = boto3.resource("dynamodb", region_name=REGION)
|
|
|
|
|
|
def get_secret() -> str:
|
|
session = boto3.session.Session()
|
|
client = session.client(service_name="secretsmanager", region_name=REGION)
|
|
try:
|
|
response = client.get_secret_value(SecretId=SECRET_NAME)
|
|
except ClientError as e:
|
|
raise e
|
|
return response["SecretString"]
|
|
|
|
|
|
secrets = json.loads(get_secret())
|
|
langfuse = Langfuse(
|
|
public_key=secrets["LANGFUSE-PUBLIC-KEY"],
|
|
secret_key=secrets["LANGFUSE-SECRET-KEY"],
|
|
host=os.environ["LANGFUSE_HOST"],
|
|
)
|
|
|
|
|
|
def get_contexto(dashboard: str) -> dict:
|
|
"""
|
|
Get contexto, filter, and items_disponiveis from DynamoDB for a given dashboard.
|
|
|
|
Returns:
|
|
Dict with 'contexto', 'filter', and 'items_disponiveis' keys
|
|
"""
|
|
try:
|
|
table = dynamodb.Table(TABLE)
|
|
response = table.get_item(Key={"id": dashboard + "_contexto"})
|
|
|
|
if "Item" not in response:
|
|
return {"contexto": "", "filter": "", "items_disponiveis": {}}
|
|
|
|
item = response["Item"]
|
|
return {
|
|
"contexto": item.get("contexto", ""),
|
|
"filter": item.get("filter_key", ""),
|
|
"items_disponiveis": item.get("itens_disponiveis", {}),
|
|
}
|
|
|
|
except ClientError as e:
|
|
error_message = e.response["Error"]["Message"]
|
|
return {"contexto": f"Error: {error_message}", "filter": "", "items_disponiveis": {}}
|