Feat: Pulumi S3

This commit is contained in:
2026-03-16 10:59:51 -03:00
parent 624f5dc7e6
commit a4d9583821
11 changed files with 74 additions and 157 deletions

View File

@@ -1,6 +1,5 @@
import os
TABLE = os.environ["TABLE"]
REGION = os.environ["REGION"]
AWS_ACCOUNT = os.environ["AWS_ACCOUNT"]
SECRET_NAME = os.environ["SECRET_NAME"]
KNOWLEDGE_BASE_ID = os.environ["KNOWLEDGE_BASE_ID"]

View File

@@ -4,9 +4,8 @@ import os
from botocore.exceptions import ClientError
from langfuse import Langfuse
from .config import REGION, TABLE, SECRET_NAME
from .config import REGION, SECRET_NAME
dynamodb = boto3.resource("dynamodb", region_name=REGION)
def get_secret() -> str:
@@ -19,35 +18,9 @@ def get_secret() -> str:
return response["SecretString"]
secrets = json.loads(get_secret())
langfuse = Langfuse(
public_key=secrets["LANGFUSE-PUBLIC-KEY"],
secret_key=secrets["LANGFUSE-SECRET-KEY"],
host=os.environ["LANGFUSE_HOST"],
)
def get_contexto(dashboard: str) -> dict:
"""
Get contexto, filter, and items_disponiveis from DynamoDB for a given dashboard.
Returns:
Dict with 'contexto', 'filter', and 'items_disponiveis' keys
"""
try:
table = dynamodb.Table(TABLE)
response = table.get_item(Key={"id": dashboard + "_contexto"})
if "Item" not in response:
return {"contexto": "", "filter": "", "items_disponiveis": {}}
item = response["Item"]
return {
"contexto": item.get("contexto", ""),
"filter": item.get("filter_key", ""),
"items_disponiveis": item.get("itens_disponiveis", {}),
}
except ClientError as e:
error_message = e.response["Error"]["Message"]
return {"contexto": f"Error: {error_message}", "filter": "", "items_disponiveis": {}}
#secrets = json.loads(get_secret())
#langfuse = Langfuse(
# public_key=secrets["LANGFUSE-PUBLIC-KEY"],
# secret_key=secrets["LANGFUSE-SECRET-KEY"],
# host=os.environ["LANGFUSE_HOST"],
#)

View File

@@ -2,17 +2,16 @@ from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langfuse.langchain import CallbackHandler
from .config import REGION
from .dynamo import langfuse, get_contexto
from .agent_bedrock import create_agent
from .tools import ReportTools
from .tools import build_knowledge_base_tool
def main(user_query, history, model, base):
def main(user_query, history, model="anthropic.claude-sonnet-4-5-20250929-v1:0"):
"""Main execution function."""
report_tools = []
report_tools = [build_knowledge_base_tool()]
SYSTEM_PROMPT = """"""
SYSTEM_PROMPT = """Você é um assistente de matrículas para o campus capivari do instituo federal de são paulo, tem acesso a uma tool que acessa uma knowledge base com informações sobre tanto a matricula dos alunos do técnico quanto superior do procedimento iterno, não responda perguntas sobre o meio de ingresso SISU."""
langfuse_handler = CallbackHandler()
agent = create_agent(model, REGION, tools=report_tools)
@@ -24,7 +23,7 @@ def main(user_query, history, model, base):
"current_step": "init",
}
config = {"callbacks": [langfuse_handler], "tags": [base]}
config = {"callbacks": [langfuse_handler]}
final_state = agent.invoke(initial_state, config=config)
total_input_tokens = 0
@@ -33,8 +32,6 @@ def main(user_query, history, model, base):
if isinstance(msg, AIMessage) and hasattr(msg, "usage_metadata") and msg.usage_metadata:
total_input_tokens += msg.usage_metadata.get("input_tokens", 0)
total_output_tokens += msg.usage_metadata.get("output_tokens", 0)
langfuse.flush()
return {
"response": final_state["messages"][-1].content,
"input_tokens": total_input_tokens,

View File

@@ -1,85 +1,21 @@
from botocore.exceptions import ClientError
from langchain_core.tools import StructuredTool
from langchain_aws import AmazonKnowledgeBasesRetriever
from langchain_core.tools.retriever import create_retriever_tool
from .config import TABLE
from .dynamo import dynamodb
from .config import REGION, KNOWLEDGE_BASE_ID
class ReportTools:
def __init__(self, id_mapping: dict[str, str]):
self.id_mapping = id_mapping
def get_variable_value(self, id: str, variable: str) -> str:
"""
Get a specific variable's value from DynamoDB for a specific id.
Args:
id: The id of the data
variable: The variable/column name to retrieve from the table
Returns:
The content of the specified variable for the given id
"""
real_id = self.id_mapping.get(id, id)
try:
table = dynamodb.Table(TABLE)
response = table.get_item(Key={"id": real_id})
if "Item" not in response:
return f"No report found for month: {id}"
item = response["Item"]
content = item.get(variable, "")
if not content:
return f"Variable '{variable}' not found for month: {id}"
return f"<{id}>\n{content}\n</{id}>"
except ClientError as e:
error_message = e.response["Error"]["Message"]
return f"Error fetching report: {error_message}"
def get_variables_list(self, id: str) -> str:
"""
Get the list of variables available in the table for a specific month.
Args:
id: The id of the data
Returns:
The list of available variables/keys for the specified data
"""
real_id = self.id_mapping.get(id, id)
try:
table = dynamodb.Table(TABLE)
response = table.get_item(Key={"id": real_id})
if "Item" not in response:
return f"No data found for month: {id}"
item = response["Item"]
chaves_consolidadas = item.get("chaves_consolidadas", "")
if not chaves_consolidadas:
return f"No consolidated keys found for id: {id}"
return chaves_consolidadas
except ClientError as e:
error_message = e.response["Error"]["Message"]
return f"Error fetching consolidated keys: {error_message}"
def as_tools(self) -> list:
return [
StructuredTool.from_function(
self.get_variable_value,
name="get_variable_value",
description="Get a specific variable's data from DynamoDB for a specific id.",
),
StructuredTool.from_function(
self.get_variables_list,
name="get_variable_list",
description="Get the list of variables available in the table for a specific id.",
),
]
def build_knowledge_base_tool():
retriever = AmazonKnowledgeBasesRetriever(
knowledge_base_id=KNOWLEDGE_BASE_ID,
retrieval_config={"vectorSearchConfiguration": {"numberOfResults": 5}},
region_name=REGION,
)
return create_retriever_tool(
retriever,
name="consultar_base_conhecimento",
description=(
"Consulta a base de conhecimento com informações sobre procedimentos internos de matrícula "
"para cursos técnicos e superiores do campus Capivari do IFSP. "
"Use esta ferramenta para responder dúvidas sobre matrícula."
),
)

View File

@@ -41,7 +41,7 @@ if prompt := st.chat_input("Type your message here..."):
# Simulate streaming response (replace with actual API call)
result = orquestrador.main(prompt,str(st.session_state.messages),selected_value,base)
result = orquestrador.main(prompt,str(st.session_state.messages))
full_response = result["response"]
# Simulate typing effect

View File

@@ -1,8 +1,8 @@
boto3==1.42.10
langchain-aws==1.1.0
langgraph==1.0.5
langchain-aws==1.3.0
langgraph==1.0.9
langchain==1.2.0
streamlit==1.52.2
langfuse==3.11.2
fastapi==0.129.0
streamlit==1.54.0
langfuse==3.14.5
fastapi==0.133.0
uvicorn==0.41.0