Adds final version with new prompt

This commit is contained in:
2026-02-11 14:45:00 -03:00
parent c31d089efb
commit 82ac556ecc
9 changed files with 283 additions and 92 deletions

35
code/app/api.py Normal file
View File

@@ -0,0 +1,35 @@
from fastapi import FastAPI
from pydantic import BaseModel
from .backend import BDAgent
app = FastAPI()
@app.get("/")
def health():
return {"status": "ok"}
class QueryRequest(BaseModel):
query: str
history: str = ""
model: str = "anthropic.claude-haiku-4-5-20251001-v1:0"
base: str = "bacio_transacional_loja_app"
class QueryResponse(BaseModel):
response: str
input_tokens: int
output_tokens: int
total_tokens: int
@app.post("/agent", response_model=QueryResponse)
def run_agent(request: QueryRequest):
result = BDAgent.main(request.query, request.history, request.model, request.base)
return QueryResponse(
response=result["response"],
input_tokens=result["input_tokens"],
output_tokens=result["output_tokens"],
total_tokens=result["total_tokens"],
)

View File

@@ -4,7 +4,6 @@ LangGraph Agent using AWS Bedrock Cross-Region Inference Profile with Tools
This script demonstrates how to create a LangGraph agent that uses
an AWS Bedrock inference profile with custom tools (add and multiply).
"""
import boto3
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
@@ -18,9 +17,109 @@ from langfuse import Langfuse
from langfuse.langchain import CallbackHandler
from botocore.exceptions import ClientError
import os
from backend.utils import dynamodb_read_table as drt
WORKGROUP = "iceberg-workgroup"
DATABASE = "dnx_warehouse"
TABLE = "poc_dnx_monthly_summary"
REGION = "us-east-1"
# DynamoDB client
dynamodb = boto3.resource("dynamodb", region_name=REGION)
@tool
def get_monthly_report(id: str, variable: str) -> str:
"""
Get a specific variable's data from DynamoDB for a specific id.
Args:
id: The id of the data
variable: The variable/column name to retrieve from the table
Returns:
The content of the specified variable for the given id
"""
print(f"\n🔧 [TOOL CALLED] get_monthly_report for month: {id}, variable: {variable}")
try:
table = dynamodb.Table(TABLE)
response = table.get_item(Key={"id": id})
if "Item" not in response:
return f"No report found for month: {id}"
item = response["Item"]
content = item.get(variable, "")
if not content:
return f"Variable '{variable}' not found for month: {id}"
result = f"<{id}>\n{content}\n</{id}>"
return result
except ClientError as e:
error_message = e.response["Error"]["Message"]
return f"Error fetching report: {error_message}"
@tool
def get_consolidated_keys(id: str) -> str:
"""
Get the list of consolidated keys (variables) available in the table for a specific month.
Args:
id: The id of the data
Returns:
The list of available variables/keys for the specified data
"""
print(f"\n🔧 [TOOL CALLED] get_consolidated_keys for id: {id}")
try:
table = dynamodb.Table(TABLE)
response = table.get_item(Key={"id": id})
if "Item" not in response:
return f"No data found for month: {id}"
item = response["Item"]
chaves_consolidadas = item.get("chaves_consolidadas", "")
if not chaves_consolidadas:
return f"No consolidated keys found for id: {id}"
return chaves_consolidadas
except ClientError as e:
error_message = e.response["Error"]["Message"]
return f"Error fetching consolidated keys: {error_message}"
def get_contexto() -> dict:
"""
Get contexto, filter, and items_disponiveis from DynamoDB where id=DASHBOARD+'_contexto'.
Returns:
Dict with 'contexto', 'filter', and 'items_disponiveis' keys
"""
try:
table = dynamodb.Table(TABLE)
response = table.get_item(Key={"id": DASHBOARD + "_contexto"})
if "Item" not in response:
return {"contexto": "", "filter": "", "items_disponiveis": {}}
item = response["Item"]
return {
"contexto": item.get("contexto", ""),
"filter": item.get("filter_key", ""),
"items_disponiveis": item.get("itens_disponiveis", {}),
}
except ClientError as e:
error_message = e.response["Error"]["Message"]
return {"contexto": f"Error: {error_message}", "filter": "", "items_disponiveis": {}}
def get_secret():
secret_name = "assistente-db-secrets-manager"
@@ -112,7 +211,7 @@ def create_bedrock_llm(model_id: str, region: str = "us-east-1"):
"""
# Determine provider and model_kwargs based on model ID
MODEL_ARNS = {
"anthropic.claude-haiku-4-5-20251001-v1:0": "arn:aws:bedrock:us-east-1:305427701314:inference-profile/global.anthropic.claude-haiku-4-5-20251001-v1:0",
"anthropic.claude-haiku-4-5-20251001-v1:0": "arn:aws:bedrock:us-east-1:305427701314:inference-profile/us.anthropic.claude-haiku-4-5-20251001-v1:0",
"anthropic.claude-sonnet-4-5-20250929-v1:0": "arn:aws:bedrock:us-east-1:305427701314:inference-profile/global.anthropic.claude-sonnet-4-5-20250929-v1:0",
"meta.llama4-maverick-17b-instruct-v1:0": "arn:aws:bedrock:us-east-1:305427701314:inference-profile/us.meta.llama4-maverick-17b-instruct-v1:0",
"meta.llama4-scout-17b-instruct-v1:0": "arn:aws:bedrock:us-east-1:305427701314:inference-profile/us.meta.llama4-scout-17b-instruct-v1:0",
@@ -130,7 +229,7 @@ def create_bedrock_llm(model_id: str, region: str = "us-east-1"):
"amazon.nova-2-lite-v1:0": "amazon"
}
prefix={
"anthropic.claude-haiku-4-5-20251001-v1:0": "global",
"anthropic.claude-haiku-4-5-20251001-v1:0": "us",
"anthropic.claude-sonnet-4-5-20250929-v1:0": "global",
"meta.llama4-maverick-17b-instruct-v1:0": "us",
"meta.llama4-scout-17b-instruct-v1:0": "us",
@@ -147,8 +246,7 @@ def create_bedrock_llm(model_id: str, region: str = "us-east-1"):
)
# Bind tools to the LLM
#tools = [consult_answers,count_table_rows]
tools=[]
tools = [get_monthly_report, get_consolidated_keys]
llm_with_tools = llm.bind_tools(tools)
return llm_with_tools
@@ -160,9 +258,7 @@ def call_model(state: AgentState, llm) -> AgentState:
print(f"[MODEL] Calling Bedrock inference profile...")
messages = state["messages"]
langfuse_handler = CallbackHandler()
config = {"configurable": {"thread_id": "abc123"},"callbacks": [langfuse_handler]}
response = llm.invoke(messages,config=config)
response = llm.invoke(messages)
state["current_step"] = "model_called"
return {"messages": [response]}
@@ -180,6 +276,8 @@ def call_tools(state: AgentState) -> AgentState:
tool_messages = []
tools_map = {
"get_monthly_report": get_monthly_report,
"get_consolidated_keys": get_consolidated_keys
}
# Execute each tool call
@@ -267,24 +365,52 @@ def create_agent(inference_profile_arn: str, region: str = "us-east-1"):
return app
def main(user_query,history,model):
def main(user_query,history,model,base):
"""Main execution function."""
global DASHBOARD
DASHBOARD = base
# Configuration - Update with your actual inference profile ARN
INFERENCE_PROFILE_ARN = model
REGION = "us-east-1"
# System prompt for the agent
SYSTEM_PROMPT=""" You are a analitical agent, with acess to monthly reports about Bacio di latte
contexto_data = get_contexto()
if contexto_data["filter"]=="period":
CONSULT_RULES="""To use the tools you must give the id of the correspondant data, which can be associated to a given month and year in the following format year_month, which:
-Year is the year in 4 digits (2025,2024,2023,2022,2021,...)
-Month is th two digit representation: 01,02,03,04,05,06,07,08,09,10,11,12
The format of the dict is: {id1:year_month1,id2:year_month2...}
Choose the correct id based on the following dict:
"""
elif contexto_data["filter"]=="event":
CONSULT_RULES="""To use the tools you must give the id of the correspondant data, which can be associated to a event, which is in the format "Name - City DD/MM/YYYY", where the last is a date in the format day/month/year. Theformat of elements in dict is {id1:event_description1,id2:event_description2...}"""
else:
CONSULT_RULES="""Wrong filter value, you must terminate the workflow and ask the user to contact the technical team"""
SYSTEM_PROMPT=""" You are a analitical agent in Brazilian Portuguese, with acess to monthly reports about a specific company, specified in the context. You have access to tools that lets you consult present variables in table, you always have access to "context", which keeps inside answers to different questions, that you may consult as you desire.
Do not access other variables besides the ones reported by the tool and "context".
You currently have access to data in a period specified in the context, so only answer questions inside the time window.
<context>
A Bacio di Latte é uma rede de gelaterias artesanais fundada em São Paulo, Brasil, em 2011, pelos irmãos milaneses Edoardo e Luigi Tonolli, que trouxeram a tradição do gelato italiano com ingredientes de alta qualidade, resultando em um sorvete cremoso e fresco, produzido diariamente, sem gordura hidrogenada ou trans, e que se tornou popular não só no Brasil, mas também nos EUA, representando uma experiência autêntica de gelato.
<\context>
<reports>
"""+drt.read_table_as_xml("poc_dnx_monthly_summary","us-east-1")+""""
<\reports>
"""+contexto_data["contexto"]+"""
</context>
"""+CONSULT_RULES+"""
<correlation>
"""+str(contexto_data["items_disponiveis"])+"""
</correlation>
Here is the chat history:"""+history+"""
Aswer the user the best you can with the given information, if you don't know the answer or how to answer say so, only answer from what you know."""
Inside the "NPS" in data is some useful values to calculate the NPS, which includes "distribuicao".
Inside of it are grades and the amount of people who given that grade.
Grades from 0 to 6 are detractors.
Grades from 7 to 8 are neutral.
Grades from 9 to 10 are promoters.
Calculate the percentage of them when prompted about NPS and then calculate the nps using the following formula: NPS = %promoter - %detractor, never use the medium of the notes.
You have access to the tools:
-get_consolidated_keys: Given a id returns the column names inside of a entity of a given table element.
- get_monthly_report: given a id and a variable name, either one listed in the previous tool output or "context", returns its value. Using "context" gives you a summarization of many answers of questions asked to the customers.
Answer, in Brazilian Portuguese, to the user the best you can with the given information, if you don't know the answer or how to answer say so, only answer from what you know.
Always consult the most recent information when a date is not given, like questions "Quanto é meu nps?" """
print("=" * 60)
print("LangGraph Agent with AWS Bedrock Inference Profile + Tools")
@@ -297,11 +423,10 @@ def main(user_query,history,model):
print("\nSystem Prompt: Configured ✓")
print("=" * 60)
# Create the agent
# Create the agent with a unique session_id to group all steps
langfuse_handler = CallbackHandler()
agent = create_agent(INFERENCE_PROFILE_ARN, REGION)
# Example query that requires tools
# Initialize state with system prompt
initial_state = {
"messages": [
@@ -314,8 +439,9 @@ def main(user_query,history,model):
print(f"\nUser Query: {user_query}\n")
print("-" * 60)
# Run the agent
final_state = agent.invoke(initial_state)
# Run the agent with callbacks at graph level
config = {"callbacks": [langfuse_handler], "tags": [DASHBOARD]}
final_state = agent.invoke(initial_state, config=config)
# Display results
print("-" * 60)
@@ -336,7 +462,21 @@ def main(user_query,history,model):
print("\n" + "=" * 60)
print(f"Agent completed successfully. Final step: {final_state['current_step']}")
# Aggregate token usage from all AIMessage objects
total_input_tokens = 0
total_output_tokens = 0
for msg in final_state["messages"]:
if isinstance(msg, AIMessage) and hasattr(msg, 'usage_metadata') and msg.usage_metadata:
total_input_tokens += msg.usage_metadata.get("input_tokens", 0)
total_output_tokens += msg.usage_metadata.get("output_tokens", 0)
langfuse.flush()
return final_state['messages'][-1].content
return {
"response": final_state['messages'][-1].content,
"input_tokens": total_input_tokens,
"output_tokens": total_output_tokens,
"total_tokens": total_input_tokens + total_output_tokens,
}
if __name__=="__main__":
main("oi","ancar_nps_tradicional","","")
main("Liste o nps mês a mês desde maio 2025 até dezembro 2025","","anthropic.claude-sonnet-4-5-20250929-v1:0")

View File

@@ -1,209 +0,0 @@
"""
DynamoDB Table Reader Script
This script connects to AWS DynamoDB and reads all entries from a specified table.
Outputs data in XML format with <period> tags containing the context XML content.
Usage:
from dynamodb_read_table import read_table_as_xml
xml_content = read_table_as_xml("my-table-name")
"""
import re
import boto3
from botocore.exceptions import ClientError
def clean_context_xml(context: str) -> str:
"""
Remove XML declaration and <relatorio> tags from context content.
Args:
context: Raw XML content from DynamoDB
Returns:
Cleaned XML content without declaration and relatorio tags
"""
# Remove XML declaration (e.g., <?xml version="1.0" encoding="UTF-8"?>)
context = re.sub(r'<\?xml[^?]*\?>\s*', '', context)
# Remove opening <relatorio> tag (with any attributes)
context = re.sub(r'<relatorio[^>]*>\s*', '', context)
# Remove closing </relatorio> tag
context = re.sub(r'\s*</relatorio>', '', context)
return context.strip()
def remove_xml_declaration(content: str) -> str:
"""
Remove only the XML declaration from content.
Args:
content: Raw XML content
Returns:
Content without XML declaration (keeps relatorio tags)
"""
content = re.sub(r'<\?xml[^?]*\?>\s*', '', content)
return content.strip()
def format_items_to_xml(items: list) -> str:
"""
Format all DynamoDB items to XML format.
Each item's 'period' field becomes a <period> tag,
and the 'context' and 'dados_consolidados' fields are placed inside it.
Args:
items: List of DynamoDB items
Returns:
Complete XML formatted string with all items
"""
xml_parts = []
for item in items:
period = item.get("period", "unknown")
context = item.get("context", "")
dados_consolidados = item.get("dados_consolidados", "")
# Clean the XML content
cleaned_context = clean_context_xml(context)
cleaned_dados = remove_xml_declaration(dados_consolidados)
xml_parts.append(f"<{period}>")
xml_parts.append(cleaned_context)
if cleaned_dados:
xml_parts.append(cleaned_dados)
xml_parts.append(f"</{period}>")
xml_parts.append("") # Empty line between entries
return "\n".join(xml_parts)
def get_dynamodb_client(region_name: str = "us-east-1"):
"""Create and return a DynamoDB client."""
session = boto3.Session()
return session.client("dynamodb", region_name=region_name)
def get_dynamodb_resource(region_name: str = "us-east-1"):
"""Create and return a DynamoDB resource for higher-level operations."""
session = boto3.Session()
return session.resource("dynamodb", region_name=region_name)
def scan_table(table_name: str, region_name: str = "us-east-1") -> list:
"""
Scan a DynamoDB table and return all items.
Uses pagination to handle tables larger than 1MB response limit.
Args:
table_name: Name of the DynamoDB table to scan
region_name: AWS region where the table is located
Returns:
List of all items in the table
"""
dynamodb = get_dynamodb_resource(region_name)
table = dynamodb.Table(table_name)
items = []
last_evaluated_key = None
try:
while True:
if last_evaluated_key:
response = table.scan(ExclusiveStartKey=last_evaluated_key)
else:
response = table.scan()
items.extend(response.get("Items", []))
last_evaluated_key = response.get("LastEvaluatedKey")
if not last_evaluated_key:
break
print(f"Successfully scanned {len(items)} items from table '{table_name}'")
return items
except ClientError as e:
error_code = e.response["Error"]["Code"]
error_message = e.response["Error"]["Message"]
print(f"Error scanning table: {error_code} - {error_message}")
raise
def list_tables(region_name: str = "us-east-1") -> list:
"""List all DynamoDB tables in the specified region."""
client = get_dynamodb_client(region_name)
tables = []
last_evaluated_table_name = None
try:
while True:
if last_evaluated_table_name:
response = client.list_tables(ExclusiveStartTableName=last_evaluated_table_name)
else:
response = client.list_tables()
tables.extend(response.get("TableNames", []))
last_evaluated_table_name = response.get("LastEvaluatedTableName")
if not last_evaluated_table_name:
break
return tables
except ClientError as e:
error_code = e.response["Error"]["Code"]
error_message = e.response["Error"]["Message"]
print(f"Error listing tables: {error_code} - {error_message}")
raise
def get_table_info(table_name: str, region_name: str = "us-east-1") -> dict:
"""Get metadata information about a DynamoDB table."""
client = get_dynamodb_client(region_name)
try:
response = client.describe_table(TableName=table_name)
table_info = response.get("Table", {})
return {
"TableName": table_info.get("TableName"),
"TableStatus": table_info.get("TableStatus"),
"ItemCount": table_info.get("ItemCount"),
"TableSizeBytes": table_info.get("TableSizeBytes"),
"KeySchema": table_info.get("KeySchema"),
"AttributeDefinitions": table_info.get("AttributeDefinitions"),
"CreationDateTime": str(table_info.get("CreationDateTime")),
}
except ClientError as e:
error_code = e.response["Error"]["Code"]
error_message = e.response["Error"]["Message"]
print(f"Error describing table: {error_code} - {error_message}")
raise
def read_table_as_xml(table_name: str, region_name: str = "us-east-1") -> str:
"""
Read all entries from a DynamoDB table and return as XML string.
Args:
table_name: Name of the DynamoDB table to read
region_name: AWS region where the table is located (default: us-east-1)
Returns:
XML formatted string with all items wrapped in <period> tags
"""
items = scan_table(table_name, region_name)
return format_items_to_xml(items)
if __name__=="__main__":
print(read_table_as_xml("poc_dnx_monthly_summary","us-east-1"))

View File

@@ -2,6 +2,7 @@ import streamlit as st
import time
from backend import BDAgent
import boto3
from boto3.dynamodb.conditions import Key
# Configure the page - MUST BE FIRST
st.set_page_config(
@@ -11,9 +12,19 @@ st.set_page_config(
)
session = boto3.Session()
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
def list_bases():
table = dynamodb.Table("poc_dnx_monthly_summary")
response = table.query(
IndexName="item_type_index",
KeyConditionExpression=Key("item_type").eq("contexto")
)
return [item["id"].removesuffix("_contexto") for item in response.get("Items", [])]
# Model selection with session state persistence
MODELS = [
"anthropic.claude-haiku-4-5-20251001-v1:0",
"anthropic.claude-sonnet-4-5-20250929-v1:0",
"meta.llama4-maverick-17b-instruct-v1:0",
"meta.llama4-scout-17b-instruct-v1:0",
@@ -27,7 +38,13 @@ selected_value = st.selectbox(
MODELS,
key="selected_model"
)
BASES = list_bases()
BASES.sort()
base = st.selectbox(
"Selecione a base:",
BASES,
key="selected_base"
)
# Initialize chat history in session state
@@ -57,17 +74,19 @@ if prompt := st.chat_input("Type your message here..."):
# Simulate streaming response (replace with actual API call)
full_response = BDAgent.main(prompt,str(st.session_state.messages),selected_value)
result = BDAgent.main(prompt,str(st.session_state.messages),selected_value,base)
full_response = result["response"]
# Simulate typing effect
displayed_response = ""
for char in full_response:
displayed_response += char
message_placeholder.markdown(displayed_response + "")
time.sleep(0.01)
message_placeholder.markdown(full_response)
st.caption(f"Tokens: {result['input_tokens']} in / {result['output_tokens']} out / {result['total_tokens']} total")
# Add assistant response to chat history
st.session_state.messages.append({"role": "assistant", "content": full_response})