Feat: Desacoplamento

This commit is contained in:
2026-03-12 15:02:26 -03:00
parent 08be8e314d
commit 8c3c23c65f
11 changed files with 231 additions and 163 deletions

View File

@@ -1,101 +1,32 @@
import asyncio
import time
import uvicorn
from fastapi import FastAPI, Security, HTTPException
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
import uvicorn
import boto3
import asyncio
import json
import time
import io
from pathlib import Path
from urllib.parse import urlparse
from PyPDF2 import PdfReader,PdfWriter
from datetime import datetime
from utils.langgraph_agent import RULES, run_agent
from utils.langgraph_agent import RULES
from utils.secrets_manager import SECRETS
from services.document_extractor import parse_s3_uri, extract_text_from_s3_document
from services.authorization import evaluate_servico
from services.result_store import save_results
app = FastAPI()
AWS_REGION = "us-east-2"
OUTPUT_BUCKET = "upflux-doc-analyzer"
VERSION = "v1"
# API Key auth
_api_key_header = APIKeyHeader(name="X-API-Key")
API_KEY = SECRETS["API-KEY"]
AWS_ACCESS_KEY =SECRETS["AWS_ACCESS_KEY"]
AWS_SECRET_KEY = SECRETS["AWS_SECRET_KEY"]
_s3_input = boto3.client("s3",aws_access_key_id=AWS_ACCESS_KEY,aws_secret_access_key=AWS_SECRET_KEY,region_name=AWS_REGION)
_s3_output = boto3.client("s3", region_name=AWS_REGION)
_textract = boto3.client("textract", region_name=AWS_REGION)
def verify_api_key(api_key: str = Security(_api_key_header)):
    """FastAPI dependency: accept the request only when X-API-Key matches."""
    if api_key == API_KEY:
        return api_key
    raise HTTPException(status_code=403, detail="Invalid API key")
# --- S3 / Textract helpers ---
def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
    """Split an ``s3://bucket/key`` URI into its (bucket, key) pair.

    Raises ValueError when the scheme is not ``s3`` or either part is empty.
    """
    parts = urlparse(s3_uri)
    if parts.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    bucket, key = parts.netloc, parts.path.lstrip("/")
    if bucket and key:
        return bucket, key
    raise ValueError(f"Invalid S3 URI: {s3_uri}")
def extract_text_from_textract_response(response: dict) -> str:
    """Concatenate the text of every LINE block in a Textract response."""
    if not response:
        return ""
    lines = []
    for block in response.get("Blocks", []):
        if block["BlockType"] == "LINE":
            lines.append(block["Text"])
    return "\n".join(lines)
def _split_pdf_pages(pdf_bytes: bytes) -> list[bytes]:
    """Re-serialize each page of a PDF as its own single-page PDF byte string."""
    reader = PdfReader(io.BytesIO(pdf_bytes))
    result: list[bytes] = []
    for page in reader.pages:
        single = PdfWriter()
        single.add_page(page)
        out = io.BytesIO()
        single.write(out)
        result.append(out.getvalue())
    return result
def _textract_detect_bytes(file_bytes: bytes) -> str:
    """OCR raw document bytes via synchronous Textract text detection."""
    return extract_text_from_textract_response(
        _textract.detect_document_text(Document={"Bytes": file_bytes})
    )
async def extract_text_from_s3_document(bucket: str, key: str) -> tuple[str, int]:
    """Download an S3 object and OCR it with Textract.

    Returns (extracted_text, page_count). PDFs are split and OCR'd one page
    per worker thread; unsupported extensions yield ("", 0).
    """
    data = await asyncio.to_thread(
        lambda: _s3_input.get_object(Bucket=bucket, Key=key)["Body"].read()
    )
    suffix = Path(key).suffix.lower()
    if suffix == ".pdf":
        pages = await asyncio.to_thread(_split_pdf_pages, data)
        page_texts = await asyncio.gather(
            *(asyncio.to_thread(_textract_detect_bytes, p) for p in pages)
        )
        return "\n".join(page_texts), len(pages)
    if suffix in (".png", ".jpg", ".jpeg"):
        return await asyncio.to_thread(_textract_detect_bytes, data), 1
    return "", 0
async def process_guia(guia: dict) -> dict:
guia_code = guia.get("guia", {}).get("codigoGuiaLocal", "unknown")
t_start = time.time()
# Step 1: Extract text from all anexos
anexos = guia.get("anexos", [])
all_extracted_texts = []
async def _extract_anexo(anexo_idx: int, anexo: dict):
s3_uri = anexo.get("urlAnexo") or anexo.get("URLAnexo", "")
@@ -121,54 +52,11 @@ async def process_guia(guia: dict) -> dict:
file_content = "\n\n".join(p for p in parts if p)
t_extracao = round(time.time() - t_extracao_start, 2)
# Step 2: For each servico, run the agent
servicos = guia.get("servicos", [])
avaliacao_resultados = []
async def _run_servico(servico: dict) -> dict:
codigo_servico_raw = str(servico.get("codigoServico", ""))
code = "".join(c for c in codigo_servico_raw if c.isdigit())
if code not in RULES:
return {
"codigoServico": codigo_servico_raw,
"resultado": "SKIPPED",
"motivo": f"Codigo '{code}' nao encontrado nas regras",
"agentOutput": "",
"tempoAgentSegundos": 0,
}
query_data = {
"atendimento": guia.get("atendimento", {}),
"guia": guia.get("guia", {}),
"servico": servico,
"historico": guia.get("historico", {})
}
query = json.dumps(query_data, indent=2, ensure_ascii=False)
t0 = time.time()
try:
result = await run_agent(query, code, file_content)
agent_output = result["response"]
input_tokens = result["input_tokens"]
output_tokens = result["output_tokens"]
except Exception as e:
print(f" Agent error for servico {codigo_servico_raw}: {e}")
agent_output = f"ERROR: {str(e)}"
input_tokens = 0
output_tokens = 0
return {
"codigoServico": codigo_servico_raw,
"resultado": "Aprovado" if "".join(c for c in agent_output.lower() if c.isalpha()).startswith("aprov") else "Reprovado",
"agentOutput": agent_output,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"tempoAgentSegundos": round(time.time() - t0, 2),
}
t_agent_start = time.time()
guia["avaliacaoAgente"] = list(await asyncio.gather(*[_run_servico(s) for s in servicos]))
guia["avaliacaoAgente"] = list(
await asyncio.gather(*[evaluate_servico(s, guia, file_content) for s in servicos])
)
t_agent = round(time.time() - t_agent_start, 2)
guia["tempoProcessamento"] = {
@@ -176,24 +64,19 @@ async def process_guia(guia: dict) -> dict:
"agentSegundos": t_agent,
"totalSegundos": round(time.time() - t_start, 2),
}
return guia
# --- API models ---
class ProcessRequest(BaseModel):
    # Request body for POST /process.
    operadora: dict  # operator metadata, echoed back unchanged in the response
    guias: list[dict]  # one dict per guia to process; shape defined by caller — confirm schema upstream
# --- Endpoints ---
@app.post("/process", dependencies=[Security(verify_api_key)])
async def process(request: ProcessRequest):
raw_results = await asyncio.gather(
*[process_guia(guia) for guia in request.guias],
return_exceptions=True
return_exceptions=True,
)
results = [
{"error": str(r), "guia": request.guias[i].get("guia", {}).get("codigoGuiaLocal", f"index_{i}")}
@@ -201,32 +84,9 @@ async def process(request: ProcessRequest):
for i, r in enumerate(raw_results)
]
response_body = {
"status": "success",
"operadora": request.operadora,
"guias": results
}
await save_results(results)
# Save result to S3
# Save result to S3
async def _save_guia(guia_result: dict):
numero_guia = guia_result.get("guia", {}).get("codigoGuiaLocal", "unknown")
key = f"{VERSION}/{numero_guia}_{timestamp}.json"
await asyncio.to_thread(
_s3_output.put_object,
Bucket=OUTPUT_BUCKET,
Key=key,
Body=json.dumps(guia_result, ensure_ascii=False),
ContentType="application/json",
)
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
await asyncio.gather(*[_save_guia(g) for g in results])
except Exception as e:
print(f"Error saving to S3: {e}")
return response_body
return {"status": "success", "operadora": request.operadora, "guias": results}
@app.get("/health")

View File

@@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
COPY app.py .
COPY utils/ ./utils/
COPY services/ ./services/
EXPOSE 8000

View File

View File

@@ -0,0 +1,46 @@
import json
import time
from utils.langgraph_agent import RULES, run_agent
async def evaluate_servico(servico: dict, guia: dict, file_content: str) -> dict:
    """Evaluate a single servico of a guia with the LangGraph agent.

    Args:
        servico: servico entry; its "codigoServico" selects the rule to apply.
        guia: full guia dict; "atendimento", "guia" and "historico" are
            forwarded to the agent as JSON context.
        file_content: text extracted from the guia's attached documents.

    Returns:
        A result dict: "resultado" is "SKIPPED" when no rule matches the
        servico code, otherwise "Aprovado"/"Reprovado" based on the agent's
        verdict. Agent failures are captured as "ERROR: ..." in "agentOutput"
        instead of being raised.
    """
    codigo_servico_raw = str(servico.get("codigoServico", ""))
    # Rules are keyed by the digits-only form of the servico code.
    code = "".join(c for c in codigo_servico_raw if c.isdigit())
    if code not in RULES:
        return {
            "codigoServico": codigo_servico_raw,
            "resultado": "SKIPPED",
            "motivo": f"Codigo '{code}' nao encontrado nas regras",
            "agentOutput": "",
            "tempoAgentSegundos": 0,
        }
    query_data = {
        "atendimento": guia.get("atendimento", {}),
        "guia": guia.get("guia", {}),
        "servico": servico,
        "historico": guia.get("historico", {}),
    }
    query = json.dumps(query_data, indent=2, ensure_ascii=False)
    t0 = time.time()
    try:
        result = await run_agent(query, code, file_content)
        agent_output = result["response"]
        input_tokens = result["input_tokens"]
        output_tokens = result["output_tokens"]
    except Exception as e:
        # Log the failure (the pre-refactor code did; it was lost in the move)
        # so errors are diagnosable; the caller still sees it via agentOutput.
        print(f"  Agent error for servico {codigo_servico_raw}: {e}")
        agent_output = f"ERROR: {str(e)}"
        input_tokens = 0
        output_tokens = 0
    # Letters-only prefix check tolerates punctuation/whitespace around the
    # agent's verdict (e.g. " Aprovado.").
    normalized_verdict = "".join(c for c in agent_output.lower() if c.isalpha())
    return {
        "codigoServico": codigo_servico_raw,
        "resultado": "Aprovado" if normalized_verdict.startswith("aprov") else "Reprovado",
        "agentOutput": agent_output,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "tempoAgentSegundos": round(time.time() - t0, 2),
    }

View File

@@ -0,0 +1,75 @@
import asyncio
import io
from pathlib import Path
from urllib.parse import urlparse
import boto3
from PyPDF2 import PdfReader, PdfWriter
from utils.config import AWS_REGION
from utils.secrets_manager import SECRETS
# Module-level AWS clients, created once at import time.
# NOTE(review): explicit credentials here suggest the input bucket lives
# outside the default credential chain's account — confirm with ops.
_s3_input = boto3.client(
    "s3",
    aws_access_key_id=SECRETS["AWS_ACCESS_KEY"],
    aws_secret_access_key=SECRETS["AWS_SECRET_KEY"],
    region_name=AWS_REGION,
)
# Textract uses the default credential chain in the configured region.
_textract = boto3.client("textract", region_name=AWS_REGION)
def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
    """Return the (bucket, key) pair encoded in an ``s3://`` URI.

    Raises ValueError for non-s3 schemes and for URIs missing bucket or key.
    """
    pieces = urlparse(s3_uri)
    if pieces.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    bucket = pieces.netloc
    key = pieces.path.lstrip("/")
    if not (bucket and key):
        raise ValueError(f"Invalid S3 URI: {s3_uri}")
    return bucket, key
def _extract_text_from_textract_response(response: dict) -> str:
if not response:
return ""
return "\n".join(
block["Text"] for block in response.get("Blocks", [])
if block["BlockType"] == "LINE"
)
def _split_pdf_pages(pdf_bytes: bytes) -> list[bytes]:
    """Split a PDF into per-page byte strings, one single-page PDF each."""
    source = PdfReader(io.BytesIO(pdf_bytes))
    single_pages: list[bytes] = []
    for page in source.pages:
        writer = PdfWriter()
        writer.add_page(page)
        sink = io.BytesIO()
        writer.write(sink)
        single_pages.append(sink.getvalue())
    return single_pages
def _textract_detect_bytes(file_bytes: bytes) -> str:
    """Detect text in raw document bytes using the synchronous Textract API."""
    raw = _textract.detect_document_text(Document={"Bytes": file_bytes})
    return _extract_text_from_textract_response(raw)
async def extract_text_from_s3_document(bucket: str, key: str) -> tuple[str, int]:
    """Fetch an S3 object and extract its text with Textract.

    Supports images (single page) and PDFs (OCR'd page-by-page, concurrently
    in worker threads). Returns (text, page_count); anything else is ("", 0).
    """
    body = await asyncio.to_thread(
        lambda: _s3_input.get_object(Bucket=bucket, Key=key)["Body"].read()
    )
    ext = Path(key).suffix.lower()
    if ext in (".png", ".jpg", ".jpeg"):
        return await asyncio.to_thread(_textract_detect_bytes, body), 1
    if ext == ".pdf":
        per_page = await asyncio.to_thread(_split_pdf_pages, body)
        ocr_tasks = [asyncio.to_thread(_textract_detect_bytes, pb) for pb in per_page]
        page_texts = await asyncio.gather(*ocr_tasks)
        return "\n".join(page_texts), len(per_page)
    # Unsupported extension: nothing extracted.
    return "", 0

View File

@@ -0,0 +1,29 @@
import asyncio
import json
from datetime import datetime
import boto3
from utils.config import AWS_REGION, OUTPUT_BUCKET, API_VERSION
_s3_output = boto3.client("s3", region_name=AWS_REGION)
async def save_results(results: list[dict]) -> None:
    """Persist each guia result as a JSON object in the output S3 bucket.

    Objects are keyed "{API_VERSION}/{codigoGuiaLocal}_{timestamp}.json".
    Persistence is best-effort: failures are logged and never raised, so the
    API response is not affected by S3 errors.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    async def _save_one(guia_result: dict):
        # boto3 is blocking; run the upload in a worker thread so the event
        # loop is not stalled.
        numero_guia = guia_result.get("guia", {}).get("codigoGuiaLocal", "unknown")
        key = f"{API_VERSION}/{numero_guia}_{timestamp}.json"
        await asyncio.to_thread(
            _s3_output.put_object,
            Bucket=OUTPUT_BUCKET,
            Key=key,
            Body=json.dumps(guia_result, ensure_ascii=False),
            ContentType="application/json",
        )

    try:
        await asyncio.gather(*[_save_one(g) for g in results])
    except Exception as e:
        # Was `pass`, which hid every persistence failure; the pre-refactor
        # code logged it. Keep best-effort semantics but make failures visible.
        print(f"Error saving to S3: {e}")

12
code/utils/config.py Normal file
View File

@@ -0,0 +1,12 @@
import os
# Centralized environment-driven configuration for the service.
# AWS
AWS_REGION = os.environ.get("AWS_REGION", "us-east-2")
# Required: raises KeyError at import time when unset, failing fast on
# misconfigured deployments.
BEDROCK_MODEL_ARN = os.environ["BEDROCK_MODEL_ARN"]
# S3
OUTPUT_BUCKET = os.environ.get("OUTPUT_BUCKET", "upflux-doc-analyzer")
# Version prefix used in output S3 object keys.
API_VERSION = os.environ.get("API_VERSION", "v1")
# Langfuse
LANGFUSE_HOST = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")

View File

@@ -17,15 +17,16 @@ from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, System
from langchain_core.tools import tool
import os
from langfuse import Langfuse
from langfuse.langchain import CallbackHandler
from langfuse.langchain import CallbackHandler
from utils.secrets_manager import SECRETS
from utils.config import AWS_REGION, BEDROCK_MODEL_ARN, LANGFUSE_HOST
CODE=""
langfuse = Langfuse(
secret_key=SECRETS["LANGFUSE-SECRET-KEY"],
public_key=SECRETS["LANGFUSE-PUBLIC-KEY"],
host=os.environ.get("LANGFUSE_HOST", ""),
host=LANGFUSE_HOST,
)
@@ -436,14 +437,14 @@ class AgentState(TypedDict):
def get_bedrock_client():
"""Initialize and return AWS Bedrock runtime client."""
return boto3.client("bedrock-runtime", region_name="us-east-2")
return boto3.client("bedrock-runtime", region_name=AWS_REGION)
def create_llm():
"""Create and return the Bedrock LLM."""
return ChatBedrock(
model_id="arn:aws:bedrock:us-east-2:232048051668:application-inference-profile/uy4xskop19zn",
region_name="us-east-2",
model_id=BEDROCK_MODEL_ARN,
region_name=AWS_REGION,
provider="anthropic"
)