From 8c3c23c65f4f5038de13243c43b784d0868f6f4b Mon Sep 17 00:00:00 2001 From: DNXBrasil Date: Thu, 12 Mar 2026 15:02:26 -0300 Subject: [PATCH] Feat: Desacoplamento --- code/app.py | 174 +++------------------------- code/dockerfile | 1 + code/services/__init__.py | 0 code/services/authorization.py | 46 ++++++++ code/services/document_extractor.py | 75 ++++++++++++ code/services/result_store.py | 29 +++++ code/utils/config.py | 12 ++ code/utils/langgraph_agent.py | 11 +- infra/ecs_alb/main.tf | 16 +++ infra/ecs_alb/terraform.tfvars | 7 +- infra/ecs_alb/variables.tf | 23 ++++ 11 files changed, 231 insertions(+), 163 deletions(-) create mode 100644 code/services/__init__.py create mode 100644 code/services/authorization.py create mode 100644 code/services/document_extractor.py create mode 100644 code/services/result_store.py create mode 100644 code/utils/config.py diff --git a/code/app.py b/code/app.py index 6475a19..480bf68 100644 --- a/code/app.py +++ b/code/app.py @@ -1,101 +1,32 @@ +import asyncio +import time + +import uvicorn from fastapi import FastAPI, Security, HTTPException from fastapi.security import APIKeyHeader from pydantic import BaseModel -import uvicorn -import boto3 -import asyncio -import json -import time -import io -from pathlib import Path -from urllib.parse import urlparse -from PyPDF2 import PdfReader,PdfWriter -from datetime import datetime -from utils.langgraph_agent import RULES, run_agent + +from utils.langgraph_agent import RULES from utils.secrets_manager import SECRETS +from services.document_extractor import parse_s3_uri, extract_text_from_s3_document +from services.authorization import evaluate_servico +from services.result_store import save_results + app = FastAPI() -AWS_REGION = "us-east-2" -OUTPUT_BUCKET = "upflux-doc-analyzer" -VERSION = "v1" - -# API Key auth _api_key_header = APIKeyHeader(name="X-API-Key") API_KEY = SECRETS["API-KEY"] -AWS_ACCESS_KEY =SECRETS["AWS_ACCESS_KEY"] -AWS_SECRET_KEY = SECRETS["AWS_SECRET_KEY"] -_s3_input = boto3.client("s3",aws_access_key_id=AWS_ACCESS_KEY,aws_secret_access_key=AWS_SECRET_KEY,region_name=AWS_REGION) -_s3_output = boto3.client("s3", region_name=AWS_REGION) -_textract = boto3.client("textract", region_name=AWS_REGION) + def verify_api_key(api_key: str = Security(_api_key_header)): if api_key != API_KEY: raise HTTPException(status_code=403, detail="Invalid API key") return api_key -# --- S3 / Textract helpers --- - -def parse_s3_uri(s3_uri: str) -> tuple[str, str]: - parsed = urlparse(s3_uri) - if parsed.scheme != "s3": - raise ValueError(f"Not an S3 URI: {s3_uri}") - bucket = parsed.netloc - key = parsed.path.lstrip("/") - if not bucket or not key: - raise ValueError(f"Invalid S3 URI: {s3_uri}") - return bucket, key - - -def extract_text_from_textract_response(response: dict) -> str: - if not response: - return "" - return "\n".join( - block["Text"] for block in response.get("Blocks", []) - if block["BlockType"] == "LINE" - ) - - -def _split_pdf_pages(pdf_bytes: bytes) -> list[bytes]: - reader = PdfReader(io.BytesIO(pdf_bytes)) - pages = [] - for page in reader.pages: - writer = PdfWriter() - writer.add_page(page) - buf = io.BytesIO() - writer.write(buf) - pages.append(buf.getvalue()) - return pages - -def _textract_detect_bytes(file_bytes: bytes) -> str: - response = _textract.detect_document_text(Document={"Bytes": file_bytes}) - return extract_text_from_textract_response(response) - -async def extract_text_from_s3_document(bucket: str, key: str) -> tuple[str, int]: - file_bytes = await asyncio.to_thread( - lambda: _s3_input.get_object(Bucket=bucket, Key=key)["Body"].read() - ) - file_ext = Path(key).suffix.lower() - - if file_ext in [".png", ".jpg", ".jpeg"]: - text = await asyncio.to_thread(_textract_detect_bytes, file_bytes) - return text, 1 - - if file_ext == ".pdf": - page_bytes_list = await asyncio.to_thread(_split_pdf_pages, file_bytes) - texts = await asyncio.gather(*[ - asyncio.to_thread(_textract_detect_bytes, p) for p in page_bytes_list - ]) - return "\n".join(texts), len(page_bytes_list) - - return "", 0 async def process_guia(guia: dict) -> dict: - guia_code = guia.get("guia", {}).get("codigoGuiaLocal", "unknown") t_start = time.time() - - # Step 1: Extract text from all anexos anexos = guia.get("anexos", []) - all_extracted_texts = [] async def _extract_anexo(anexo_idx: int, anexo: dict): s3_uri = anexo.get("urlAnexo") or anexo.get("URLAnexo", "") @@ -121,54 +52,11 @@ async def process_guia(guia: dict) -> dict: file_content = "\n\n".join(p for p in parts if p) t_extracao = round(time.time() - t_extracao_start, 2) - # Step 2: For each servico, run the agent servicos = guia.get("servicos", []) - avaliacao_resultados = [] - - async def _run_servico(servico: dict) -> dict: - codigo_servico_raw = str(servico.get("codigoServico", "")) - code = "".join(c for c in codigo_servico_raw if c.isdigit()) - - if code not in RULES: - return { - "codigoServico": codigo_servico_raw, - "resultado": "SKIPPED", - "motivo": f"Codigo '{code}' nao encontrado nas regras", - "agentOutput": "", - "tempoAgentSegundos": 0, - } - - query_data = { - "atendimento": guia.get("atendimento", {}), - "guia": guia.get("guia", {}), - "servico": servico, - "historico": guia.get("historico", {}) - } - query = json.dumps(query_data, indent=2, ensure_ascii=False) - - t0 = time.time() - try: - result = await run_agent(query, code, file_content) - agent_output = result["response"] - input_tokens = result["input_tokens"] - output_tokens = result["output_tokens"] - except Exception as e: - print(f" Agent error for servico {codigo_servico_raw}: {e}") - agent_output = f"ERROR: {str(e)}" - input_tokens = 0 - output_tokens = 0 - - return { - "codigoServico": codigo_servico_raw, - "resultado": "Aprovado" if "".join(c for c in agent_output.lower() if c.isalpha()).startswith("aprov") else "Reprovado", - "agentOutput": agent_output, - "input_tokens": input_tokens, - "output_tokens": output_tokens, - "tempoAgentSegundos": round(time.time() - t0, 2), - } - t_agent_start = time.time() - guia["avaliacaoAgente"] = list(await asyncio.gather(*[_run_servico(s) for s in servicos])) + guia["avaliacaoAgente"] = list( + await asyncio.gather(*[evaluate_servico(s, guia, file_content) for s in servicos]) + ) t_agent = round(time.time() - t_agent_start, 2) guia["tempoProcessamento"] = { @@ -176,24 +64,19 @@ async def process_guia(guia: dict) -> dict: "agentSegundos": t_agent, "totalSegundos": round(time.time() - t_start, 2), } - return guia -# --- API models --- - class ProcessRequest(BaseModel): operadora: dict guias: list[dict] -# --- Endpoints --- - @app.post("/process", dependencies=[Security(verify_api_key)]) async def process(request: ProcessRequest): raw_results = await asyncio.gather( *[process_guia(guia) for guia in request.guias], - return_exceptions=True + return_exceptions=True, ) results = [ {"error": str(r), "guia": request.guias[i].get("guia", {}).get("codigoGuiaLocal", f"index_{i}")} @@ -201,32 +84,9 @@ async def process(request: ProcessRequest): for i, r in enumerate(raw_results) ] - response_body = { - "status": "success", - "operadora": request.operadora, - "guias": results - } + await save_results(results) - # Save result to S3 - # Save result to S3 - async def _save_guia(guia_result: dict): - numero_guia = guia_result.get("guia", {}).get("codigoGuiaLocal", "unknown") - key = f"{VERSION}/{numero_guia}_{timestamp}.json" - await asyncio.to_thread( - _s3_output.put_object, - Bucket=OUTPUT_BUCKET, - Key=key, - Body=json.dumps(guia_result, ensure_ascii=False), - ContentType="application/json", - ) - - try: - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - await asyncio.gather(*[_save_guia(g) for g in results]) - except Exception as e: - print(f"Error saving to S3: {e}") - - return response_body + return {"status": "success", "operadora": request.operadora, "guias": results} @app.get("/health") diff --git a/code/dockerfile b/code/dockerfile index 06e7dfe..84dca34 100644 --- a/code/dockerfile +++ b/code/dockerfile @@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* COPY app.py . COPY utils/ ./utils/ +COPY services/ ./services/ EXPOSE 8000 diff --git a/code/services/__init__.py b/code/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/services/authorization.py b/code/services/authorization.py new file mode 100644 index 0000000..8b428a9 --- /dev/null +++ b/code/services/authorization.py @@ -0,0 +1,46 @@ +import json +import time + +from utils.langgraph_agent import RULES, run_agent + + +async def evaluate_servico(servico: dict, guia: dict, file_content: str) -> dict: + codigo_servico_raw = str(servico.get("codigoServico", "")) + code = "".join(c for c in codigo_servico_raw if c.isdigit()) + + if code not in RULES: + return { + "codigoServico": codigo_servico_raw, + "resultado": "SKIPPED", + "motivo": f"Codigo '{code}' nao encontrado nas regras", + "agentOutput": "", + "tempoAgentSegundos": 0, + } + + query_data = { + "atendimento": guia.get("atendimento", {}), + "guia": guia.get("guia", {}), + "servico": servico, + "historico": guia.get("historico", {}), + } + query = json.dumps(query_data, indent=2, ensure_ascii=False) + + t0 = time.time() + try: + result = await run_agent(query, code, file_content) + agent_output = result["response"] + input_tokens = result["input_tokens"] + output_tokens = result["output_tokens"] + except Exception as e: + agent_output = f"ERROR: {str(e)}" + input_tokens = 0 + output_tokens = 0 + + return { + "codigoServico": codigo_servico_raw, + "resultado": "Aprovado" if "".join(c for c in agent_output.lower() if c.isalpha()).startswith("aprov") else "Reprovado", + "agentOutput": agent_output, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "tempoAgentSegundos": round(time.time() - t0, 2), + } diff --git a/code/services/document_extractor.py b/code/services/document_extractor.py new file mode 100644 index 0000000..657c978 --- /dev/null +++ b/code/services/document_extractor.py @@ -0,0 +1,75 @@ +import asyncio +import io +from pathlib import Path +from urllib.parse import urlparse + +import boto3 +from PyPDF2 import PdfReader, PdfWriter + +from utils.config import AWS_REGION +from utils.secrets_manager import SECRETS + +_s3_input = boto3.client( + "s3", + aws_access_key_id=SECRETS["AWS_ACCESS_KEY"], + aws_secret_access_key=SECRETS["AWS_SECRET_KEY"], + region_name=AWS_REGION, +) +_textract = boto3.client("textract", region_name=AWS_REGION) + + +def parse_s3_uri(s3_uri: str) -> tuple[str, str]: + parsed = urlparse(s3_uri) + if parsed.scheme != "s3": + raise ValueError(f"Not an S3 URI: {s3_uri}") + bucket = parsed.netloc + key = parsed.path.lstrip("/") + if not bucket or not key: + raise ValueError(f"Invalid S3 URI: {s3_uri}") + return bucket, key + + +def _extract_text_from_textract_response(response: dict) -> str: + if not response: + return "" + return "\n".join( + block["Text"] for block in response.get("Blocks", []) + if block["BlockType"] == "LINE" + ) + + +def _split_pdf_pages(pdf_bytes: bytes) -> list[bytes]: + reader = PdfReader(io.BytesIO(pdf_bytes)) + pages = [] + for page in reader.pages: + writer = PdfWriter() + writer.add_page(page) + buf = io.BytesIO() + writer.write(buf) + pages.append(buf.getvalue()) + return pages + + +def _textract_detect_bytes(file_bytes: bytes) -> str: + response = _textract.detect_document_text(Document={"Bytes": file_bytes}) + return _extract_text_from_textract_response(response) + + +async def extract_text_from_s3_document(bucket: str, key: str) -> tuple[str, int]: + file_bytes = await asyncio.to_thread( + lambda: _s3_input.get_object(Bucket=bucket, Key=key)["Body"].read() + ) + file_ext = Path(key).suffix.lower() + + if file_ext in [".png", ".jpg", ".jpeg"]: + text = await asyncio.to_thread(_textract_detect_bytes, file_bytes) + return text, 1 + + if file_ext == ".pdf": + page_bytes_list = await asyncio.to_thread(_split_pdf_pages, file_bytes) + texts = await asyncio.gather(*[ + asyncio.to_thread(_textract_detect_bytes, p) for p in page_bytes_list + ]) + return "\n".join(texts), len(page_bytes_list) + + return "", 0 diff --git a/code/services/result_store.py b/code/services/result_store.py new file mode 100644 index 0000000..95dbcff --- /dev/null +++ b/code/services/result_store.py @@ -0,0 +1,29 @@ +import asyncio +import json +from datetime import datetime + +import boto3 + +from utils.config import AWS_REGION, OUTPUT_BUCKET, API_VERSION + +_s3_output = boto3.client("s3", region_name=AWS_REGION) + + +async def save_results(results: list[dict]) -> None: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + async def _save_one(guia_result: dict): + numero_guia = guia_result.get("guia", {}).get("codigoGuiaLocal", "unknown") + key = f"{API_VERSION}/{numero_guia}_{timestamp}.json" + await asyncio.to_thread( + _s3_output.put_object, + Bucket=OUTPUT_BUCKET, + Key=key, + Body=json.dumps(guia_result, ensure_ascii=False), + ContentType="application/json", + ) + + try: + await asyncio.gather(*[_save_one(g) for g in results]) + except Exception: + pass diff --git a/code/utils/config.py b/code/utils/config.py new file mode 100644 index 0000000..bf9f05d --- /dev/null +++ b/code/utils/config.py @@ -0,0 +1,12 @@ +import os + +# AWS +AWS_REGION = os.environ.get("AWS_REGION", "us-east-2") +BEDROCK_MODEL_ARN = os.environ["BEDROCK_MODEL_ARN"] + +# S3 +OUTPUT_BUCKET = os.environ.get("OUTPUT_BUCKET", "upflux-doc-analyzer") +API_VERSION = os.environ.get("API_VERSION", "v1") + +# Langfuse +LANGFUSE_HOST = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com") diff --git a/code/utils/langgraph_agent.py b/code/utils/langgraph_agent.py index fe9c524..ba45ff2 100644 --- a/code/utils/langgraph_agent.py +++ b/code/utils/langgraph_agent.py @@ -17,15 +17,16 @@ from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, System from langchain_core.tools import tool import os from langfuse import Langfuse -from langfuse.langchain import CallbackHandler +from langfuse.langchain import CallbackHandler from utils.secrets_manager import SECRETS +from utils.config import AWS_REGION, BEDROCK_MODEL_ARN, LANGFUSE_HOST CODE="" langfuse = Langfuse( secret_key=SECRETS["LANGFUSE-SECRET-KEY"], public_key=SECRETS["LANGFUSE-PUBLIC-KEY"], - host=os.environ.get("LANGFUSE_HOST", ""), + host=LANGFUSE_HOST, ) @@ -436,14 +437,14 @@ class AgentState(TypedDict): def get_bedrock_client(): """Initialize and return AWS Bedrock runtime client.""" - return boto3.client("bedrock-runtime", region_name="us-east-2") + return boto3.client("bedrock-runtime", region_name=AWS_REGION) def create_llm(): """Create and return the Bedrock LLM.""" return ChatBedrock( - model_id="arn:aws:bedrock:us-east-2:232048051668:application-inference-profile/uy4xskop19zn", - region_name="us-east-2", + model_id=BEDROCK_MODEL_ARN, + region_name=AWS_REGION, provider="anthropic" ) diff --git a/infra/ecs_alb/main.tf b/infra/ecs_alb/main.tf index b6be068..292754d 100644 --- a/infra/ecs_alb/main.tf +++ b/infra/ecs_alb/main.tf @@ -273,6 +273,22 @@ resource "aws_ecs_task_definition" "app" { { name = "LANGFUSE_HOST" value = var.langfuse_host + }, + { + name = "AWS_REGION" + value = var.aws_region_app + }, + { + name = "BEDROCK_MODEL_ARN" + value = var.bedrock_model_arn + }, + { + name = "OUTPUT_BUCKET" + value = var.output_bucket + }, + { + name = "API_VERSION" + value = var.api_version } ] diff --git a/infra/ecs_alb/terraform.tfvars b/infra/ecs_alb/terraform.tfvars index 3a904cd..ea1fa8a 100644 --- a/infra/ecs_alb/terraform.tfvars +++ b/infra/ecs_alb/terraform.tfvars @@ -21,4 +21,9 @@ image_tag = "latest" fargate_cpu = "256" fargate_memory = "512" app_count = 1 -langfuse_host = "http://10.0.0.12:3000" \ No newline at end of file +langfuse_host = "http://10.0.0.12:3000" + +aws_region_app = "us-east-2" +bedrock_model_arn = "arn:aws:bedrock:us-east-2:232048051668:application-inference-profile/uy4xskop19zn" +output_bucket = "upflux-doc-analyzer" +api_version = "v1" \ No newline at end of file diff --git a/infra/ecs_alb/variables.tf b/infra/ecs_alb/variables.tf index 5878032..91360cc 100644 --- a/infra/ecs_alb/variables.tf +++ b/infra/ecs_alb/variables.tf @@ -58,4 +58,27 @@ variable "app_count" { variable "langfuse_host" { description = "Langfuse host URL" type = string +} + +variable "aws_region_app" { + description = "AWS region passed to the application container" + type = string + default = "us-east-2" +} + +variable "bedrock_model_arn" { + description = "ARN of the AWS Bedrock application inference profile" + type = string +} + +variable "output_bucket" { + description = "S3 bucket name for processed results" + type = string + default = "upflux-doc-analyzer" +} + +variable "api_version" { + description = "API version prefix used in S3 output paths" + type = string + default = "v1" } \ No newline at end of file