Feat: Add parallel and async processing to agent calls and OCR

This commit is contained in:
2026-03-11 17:14:47 -03:00
parent b7c0b92fa3
commit 08be8e314d
8 changed files with 193 additions and 1507 deletions

View File

@@ -3,21 +3,19 @@ from fastapi.security import APIKeyHeader
from pydantic import BaseModel
import uvicorn
import boto3
import asyncio
import json
import time
import io
from pathlib import Path
from urllib.parse import urlparse
from PyPDF2 import PdfReader
from PyPDF2 import PdfReader,PdfWriter
from datetime import datetime
from utils.langgraph_agent import RULES, run_agent
from utils.secrets_manager import SECRETS
app = FastAPI()
AWS_REGION = "us-east-2"
INPUT_BUCKET="automated-pre-authorization"
OUTPUT_BUCKET = "upflux-doc-analyzer"
VERSION = "v1"
@@ -27,7 +25,9 @@ API_KEY = SECRETS["API-KEY"]
AWS_ACCESS_KEY =SECRETS["AWS_ACCESS_KEY"]
AWS_SECRET_KEY = SECRETS["AWS_SECRET_KEY"]
_s3_input = boto3.client("s3",aws_access_key_id=AWS_ACCESS_KEY,aws_secret_access_key=AWS_SECRET_KEY,region_name=AWS_REGION)
_s3_output = boto3.client("s3", region_name=AWS_REGION)
_textract = boto3.client("textract", region_name=AWS_REGION)
def verify_api_key(api_key: str = Security(_api_key_header)):
    """FastAPI dependency: allow the request only when the header key matches API_KEY.

    Raises HTTPException(403) for any mismatching key.
    """
    if api_key == API_KEY:
        return
    raise HTTPException(status_code=403, detail="Invalid API key")
@@ -47,27 +47,6 @@ def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
return bucket, key
def get_s3_input_client():
    """S3 client with cross-account credentials for INPUT_BUCKET."""
    return boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
def get_s3_output_client():
    """S3 client using ECS task role for OUTPUT_BUCKET."""
    client = boto3.client("s3", region_name=AWS_REGION)
    return client
def get_textract_client():
    """Textract client in AWS_REGION, using the default (task-role) credentials."""
    return boto3.client("textract", region_name=AWS_REGION)
def get_pdf_page_count(pdf_bytes: bytes) -> int:
    """Best-effort page count for a PDF supplied as raw bytes.

    Any parse failure falls back to 1 so callers always receive a usable
    count (an unreadable document is treated as single-page).
    """
    buffer = io.BytesIO(pdf_bytes)
    try:
        return len(PdfReader(buffer).pages)
    except Exception:
        # Deliberate best-effort: corrupt/unsupported PDFs count as one page.
        return 1
def extract_text_from_textract_response(response: dict) -> str:
if not response:
return ""
@@ -77,115 +56,87 @@ def extract_text_from_textract_response(response: dict) -> str:
)
def extract_text_from_s3_document(bucket: str, key: str) -> tuple[str, int]:
"""Returns (extracted_text, page_count)."""
s3_input = get_s3_input_client()
s3_output = get_s3_output_client()
textract = get_textract_client()
def _split_pdf_pages(pdf_bytes: bytes) -> list[bytes]:
    """Split a multi-page PDF into one single-page PDF (as bytes) per page."""

    def _page_to_bytes(page) -> bytes:
        # Serialize a single page through a fresh writer.
        writer = PdfWriter()
        writer.add_page(page)
        sink = io.BytesIO()
        writer.write(sink)
        return sink.getvalue()

    reader = PdfReader(io.BytesIO(pdf_bytes))
    return [_page_to_bytes(page) for page in reader.pages]
def _textract_detect_bytes(file_bytes: bytes) -> str:
    """Run Textract's synchronous text detection on raw document bytes.

    Returns the extracted text (empty string when Textract returns nothing).
    """
    raw_response = _textract.detect_document_text(Document={"Bytes": file_bytes})
    return extract_text_from_textract_response(raw_response)
async def extract_text_from_s3_document(bucket: str, key: str) -> tuple[str, int]:
file_bytes = await asyncio.to_thread(
lambda: _s3_input.get_object(Bucket=bucket, Key=key)["Body"].read()
)
file_ext = Path(key).suffix.lower()
# Download file bytes using cross-account S3 credentials
obj = s3_input.get_object(Bucket=bucket, Key=key)
file_bytes = obj["Body"].read()
if file_ext in [".png", ".jpg", ".jpeg"]:
# Pass bytes directly to Textract (avoids Textract needing cross-account S3 access)
response = textract.detect_document_text(
Document={"Bytes": file_bytes}
)
return extract_text_from_textract_response(response), 1
text = await asyncio.to_thread(_textract_detect_bytes, file_bytes)
return text, 1
if file_ext == ".pdf":
page_count = get_pdf_page_count(file_bytes)
if page_count > 1:
# Async API requires S3Object — copy to local bucket Textract can access
temp_key = f"temp_textract/{Path(key).name}"
s3_output.put_object(Bucket=OUTPUT_BUCKET, Key=temp_key, Body=file_bytes)
response = textract.start_document_text_detection(
DocumentLocation={"S3Object": {"Bucket": OUTPUT_BUCKET, "Name": temp_key}}
)
job_id = response["JobId"]
try:
# Wait for job to complete
while True:
result = textract.get_document_text_detection(JobId=job_id)
status = result["JobStatus"]
if status == "SUCCEEDED":
break
elif status == "FAILED":
return "", page_count
time.sleep(2)
# Collect all blocks across paginated results
all_blocks = result.get("Blocks", [])
while "NextToken" in result:
result = textract.get_document_text_detection(
JobId=job_id, NextToken=result["NextToken"]
)
all_blocks.extend(result.get("Blocks", []))
return extract_text_from_textract_response({"Blocks": all_blocks}), page_count
finally:
s3_output.delete_object(Bucket=OUTPUT_BUCKET, Key=temp_key)
else:
# Single-page PDF — pass bytes directly to sync API
response = textract.detect_document_text(
Document={"Bytes": file_bytes}
)
return extract_text_from_textract_response(response), page_count
page_bytes_list = await asyncio.to_thread(_split_pdf_pages, file_bytes)
texts = await asyncio.gather(*[
asyncio.to_thread(_textract_detect_bytes, p) for p in page_bytes_list
])
return "\n".join(texts), len(page_bytes_list)
return "", 0
# --- Guia processing ---
def process_guia(guia: dict) -> dict:
async def process_guia(guia: dict) -> dict:
guia_code = guia.get("guia", {}).get("codigoGuiaLocal", "unknown")
t_start = time.time()
# Step 1: Extract text from all anexos
anexos = guia.get("anexos", [])
all_extracted_texts = []
for anexo_idx, anexo in enumerate(anexos):
async def _extract_anexo(anexo_idx: int, anexo: dict):
s3_uri = anexo.get("urlAnexo") or anexo.get("URLAnexo", "")
nome_arquivo = anexo.get("nomeArquivo", f"attachment_{anexo_idx}")
if not s3_uri or not s3_uri.startswith("s3://"):
anexo["textoExtraido"] = ""
continue
return None
t0 = time.time()
try:
bucket, key = parse_s3_uri(s3_uri)
extracted_text, page_count = extract_text_from_s3_document(bucket, key)
extracted_text, page_count = await extract_text_from_s3_document(bucket, key)
except Exception as e:
extracted_text = ""
page_count = 0
anexo["error"] = str(e)
anexo["textoExtraido"] = extracted_text
anexo["pageCount"] = page_count
all_extracted_texts.append(f"--- {nome_arquivo} ---\n{extracted_text}")
anexo["tempoExtracaoSegundos"] = round(time.time() - t0, 2)
return f"--- {nome_arquivo} ---\n{extracted_text}"
file_content = "\n\n".join(all_extracted_texts)
t_extracao_start = time.time()
parts = await asyncio.gather(*[_extract_anexo(i, a) for i, a in enumerate(anexos)])
file_content = "\n\n".join(p for p in parts if p)
t_extracao = round(time.time() - t_extracao_start, 2)
# Step 2: For each servico, run the agent
servicos = guia.get("servicos", [])
avaliacao_resultados = []
for servico in servicos:
async def _run_servico(servico: dict) -> dict:
codigo_servico_raw = str(servico.get("codigoServico", ""))
code = "".join(c for c in codigo_servico_raw if c.isdigit())
if code not in RULES:
avaliacao_resultados.append({
return {
"codigoServico": codigo_servico_raw,
"resultado": "SKIPPED",
"motivo": f"Codigo '{code}' nao encontrado nas regras",
"agentOutput": ""
})
continue
"agentOutput": "",
"tempoAgentSegundos": 0,
}
query_data = {
"atendimento": guia.get("atendimento", {}),
@@ -195,8 +146,9 @@ def process_guia(guia: dict) -> dict:
}
query = json.dumps(query_data, indent=2, ensure_ascii=False)
t0 = time.time()
try:
result = run_agent(query, code, file_content)
result = await run_agent(query, code, file_content)
agent_output = result["response"]
input_tokens = result["input_tokens"]
output_tokens = result["output_tokens"]
@@ -206,15 +158,25 @@ def process_guia(guia: dict) -> dict:
input_tokens = 0
output_tokens = 0
avaliacao_resultados.append({
return {
"codigoServico": codigo_servico_raw,
"resultado": "Aprovado" if "aprov" in "".join(c for c in agent_output.lower() if c.isalnum() or c == ' ') else "Reprovado",
"resultado": "Aprovado" if "".join(c for c in agent_output.lower() if c.isalpha()).startswith("aprov") else "Reprovado",
"agentOutput": agent_output,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
})
"tempoAgentSegundos": round(time.time() - t0, 2),
}
t_agent_start = time.time()
guia["avaliacaoAgente"] = list(await asyncio.gather(*[_run_servico(s) for s in servicos]))
t_agent = round(time.time() - t_agent_start, 2)
guia["tempoProcessamento"] = {
"extracaoSegundos": t_extracao,
"agentSegundos": t_agent,
"totalSegundos": round(time.time() - t_start, 2),
}
guia["avaliacaoAgente"] = avaliacao_resultados
return guia
@@ -229,16 +191,15 @@ class ProcessRequest(BaseModel):
@app.post("/process", dependencies=[Security(verify_api_key)])
async def process(request: ProcessRequest):
results = []
for idx, guia in enumerate(request.guias):
try:
enriched = process_guia(guia)
results.append(enriched)
except Exception as e:
results.append({
"error": str(e),
"guia": guia.get("guia", {}).get("codigoGuiaLocal", f"index_{idx}")
})
raw_results = await asyncio.gather(
*[process_guia(guia) for guia in request.guias],
return_exceptions=True
)
results = [
{"error": str(r), "guia": request.guias[i].get("guia", {}).get("codigoGuiaLocal", f"index_{i}")}
if isinstance(r, Exception) else r
for i, r in enumerate(raw_results)
]
response_body = {
"status": "success",
@@ -247,18 +208,21 @@ async def process(request: ProcessRequest):
}
# Save result to S3
# Save result to S3
async def _save_guia(guia_result: dict):
numero_guia = guia_result.get("guia", {}).get("codigoGuiaLocal", "unknown")
key = f"{VERSION}/{numero_guia}_{timestamp}.json"
await asyncio.to_thread(
_s3_output.put_object,
Bucket=OUTPUT_BUCKET,
Key=key,
Body=json.dumps(guia_result, ensure_ascii=False),
ContentType="application/json",
)
try:
s3 = get_s3_output_client()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
for guia_result in results:
numero_guia = guia_result.get("guia", {}).get("codigoGuiaLocal", "unknown")
key = f"{VERSION}/{numero_guia}_{timestamp}.json"
s3.put_object(
Bucket=OUTPUT_BUCKET,
Key=key,
Body=json.dumps(guia_result, ensure_ascii=False),
ContentType="application/json",
)
await asyncio.gather(*[_save_guia(g) for g in results])
except Exception as e:
print(f"Error saving to S3: {e}")