diff --git a/code/app.py b/code/app.py
index 40131db..9b16fb0 100644
--- a/code/app.py
+++ b/code/app.py
@@ -1,4 +1,5 @@
-from fastapi import FastAPI
+from fastapi import FastAPI, Security, HTTPException
+from fastapi.security import APIKeyHeader
 from pydantic import BaseModel
 import uvicorn
 import boto3
@@ -9,11 +10,28 @@
 from pathlib import Path
 from urllib.parse import urlparse
 from PyPDF2 import PdfReader
+from datetime import datetime
 from utils.langgraph_agent import RULES, run_agent
+from utils.secrets_manager import SECRETS
 
 app = FastAPI()
 
 AWS_REGION = "us-east-2"
+INPUT_BUCKET = "automated-pre-authorization"
+OUTPUT_BUCKET = "upflux-doc-analyzer"
+VERSION = "v1"
+
+# API Key auth
+_api_key_header = APIKeyHeader(name="X-API-Key")
+API_KEY = SECRETS["API-KEY"]
+
+AWS_ACCESS_KEY = SECRETS["AWS_ACCESS_KEY"]
+AWS_SECRET_KEY = SECRETS["AWS_SECRET_KEY"]
+
+def verify_api_key(api_key: str = Security(_api_key_header)):
+    if api_key != API_KEY:
+        raise HTTPException(status_code=403, detail="Invalid API key")
+    return api_key
 
 
 # --- S3 / Textract helpers ---
@@ -29,7 +47,13 @@ def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
     return bucket, key
 
 
-def get_s3_client():
+def get_s3_input_client():
+    """S3 client with cross-account credentials for INPUT_BUCKET."""
+    return boto3.client("s3", aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=AWS_REGION)
+
+
+def get_s3_output_client():
+    """S3 client using ECS task role for OUTPUT_BUCKET."""
     return boto3.client("s3", region_name=AWS_REGION)
 
 
@@ -53,42 +77,66 @@ def extract_text_from_textract_response(response: dict) -> str:
     )
 
 
-def extract_text_from_s3_document(bucket: str, key: str) -> str:
-    s3 = get_s3_client()
+def extract_text_from_s3_document(bucket: str, key: str) -> tuple[str, int]:
+    """Returns (extracted_text, page_count)."""
+    s3_input = get_s3_input_client()
+    s3_output = get_s3_output_client()
     textract = get_textract_client()
     file_ext = Path(key).suffix.lower()
 
+    # Download file bytes using cross-account S3 credentials
+    obj = s3_input.get_object(Bucket=bucket, Key=key)
+    file_bytes = obj["Body"].read()
+
     if file_ext in [".png", ".jpg", ".jpeg"]:
+        # Pass bytes directly to Textract (avoids Textract needing cross-account S3 access)
         response = textract.detect_document_text(
-            Document={"S3Object": {"Bucket": bucket, "Name": key}}
+            Document={"Bytes": file_bytes}
         )
-        return extract_text_from_textract_response(response)
+        return extract_text_from_textract_response(response), 1
 
     if file_ext == ".pdf":
-        obj = s3.get_object(Bucket=bucket, Key=key)
-        pdf_bytes = obj["Body"].read()
-        page_count = get_pdf_page_count(pdf_bytes)
+        page_count = get_pdf_page_count(file_bytes)
 
         if page_count > 1:
+            # Async API requires S3Object — copy to local bucket Textract can access
+            temp_key = f"temp_textract/{Path(key).name}"
+            s3_output.put_object(Bucket=OUTPUT_BUCKET, Key=temp_key, Body=file_bytes)
+
             response = textract.start_document_text_detection(
-                DocumentLocation={"S3Object": {"Bucket": bucket, "Name": key}}
+                DocumentLocation={"S3Object": {"Bucket": OUTPUT_BUCKET, "Name": temp_key}}
             )
             job_id = response["JobId"]
-            while True:
-                result = textract.get_document_text_detection(JobId=job_id)
-                status = result["JobStatus"]
-                if status == "SUCCEEDED":
-                    return extract_text_from_textract_response(result)
-                elif status == "FAILED":
-                    return ""
-                time.sleep(2)
-        else:
-            response = textract.detect_document_text(
-                Document={"S3Object": {"Bucket": bucket, "Name": key}}
-            )
-            return extract_text_from_textract_response(response)
+            try:
+                # Wait for job to complete
+                while True:
+                    result = textract.get_document_text_detection(JobId=job_id)
+                    status = result["JobStatus"]
+                    if status == "SUCCEEDED":
+                        break
+                    elif status == "FAILED":
+                        return "", page_count
+                    time.sleep(2)
 
-    return ""
+                # Collect all blocks across paginated results
+                all_blocks = result.get("Blocks", [])
+                while "NextToken" in result:
+                    result = textract.get_document_text_detection(
+                        JobId=job_id, NextToken=result["NextToken"]
+                    )
+                    all_blocks.extend(result.get("Blocks", []))
+
+                return extract_text_from_textract_response({"Blocks": all_blocks}), page_count
+            finally:
+                s3_output.delete_object(Bucket=OUTPUT_BUCKET, Key=temp_key)
+        else:
+            # Single-page PDF — pass bytes directly to sync API
+            response = textract.detect_document_text(
+                Document={"Bytes": file_bytes}
+            )
+            return extract_text_from_textract_response(response), page_count
+
+    return "", 0
 
 
 # --- Guia processing ---
@@ -110,12 +158,14 @@ def process_guia(guia: dict) -> dict:
 
         try:
             bucket, key = parse_s3_uri(s3_uri)
-            extracted_text = extract_text_from_s3_document(bucket, key)
+            extracted_text, page_count = extract_text_from_s3_document(bucket, key)
         except Exception as e:
-            print(f" Error extracting text from {nome_arquivo}: {e}")
             extracted_text = ""
+            page_count = 0
+            anexo["error"] = str(e)
 
         anexo["textoExtraido"] = extracted_text
+        anexo["pageCount"] = page_count
         all_extracted_texts.append(f"--- {nome_arquivo} ---\n{extracted_text}")
 
     file_content = "\n\n".join(all_extracted_texts)
@@ -146,15 +196,22 @@ def process_guia(guia: dict) -> dict:
         query = json.dumps(query_data, indent=2, ensure_ascii=False)
 
         try:
-            agent_output = run_agent(query, code, file_content)
+            result = run_agent(query, code, file_content)
+            agent_output = result["response"]
+            input_tokens = result["input_tokens"]
+            output_tokens = result["output_tokens"]
         except Exception as e:
             print(f" Agent error for servico {codigo_servico_raw}: {e}")
             agent_output = f"ERROR: {str(e)}"
+            input_tokens = 0
+            output_tokens = 0
 
         avaliacao_resultados.append({
             "codigoServico": codigo_servico_raw,
             "resultado": "Aprovado" if "aprov" in "".join(c for c in agent_output.lower() if c.isalnum() or c == ' ') else "Reprovado",
-            "agentOutput": agent_output
+            "agentOutput": agent_output,
+            "input_tokens": input_tokens,
+            "output_tokens": output_tokens,
         })
 
     guia["avaliacaoAgente"] = avaliacao_resultados
@@ -170,7 +227,7 @@ class ProcessRequest(BaseModel):
 
 # --- Endpoints ---
 
-@app.post("/process")
+@app.post("/process", dependencies=[Security(verify_api_key)])
 async def process(request: ProcessRequest):
     results = []
     for idx, guia in enumerate(request.guias):
@@ -183,12 +240,30 @@ async def process(request: ProcessRequest):
             "guia": guia.get("guia", {}).get("codigoGuiaLocal", f"index_{idx}")
         })
 
-    return {
+    response_body = {
         "status": "success",
         "operadora": request.operadora,
         "guias": results
     }
 
+    # Save result to S3
+    try:
+        s3 = get_s3_output_client()
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        for guia_result in results:
+            numero_guia = guia_result.get("guia", {}).get("codigoGuiaLocal", "unknown")
+            key = f"{VERSION}/{numero_guia}_{timestamp}.json"
+            s3.put_object(
+                Bucket=OUTPUT_BUCKET,
+                Key=key,
+                Body=json.dumps(guia_result, ensure_ascii=False),
+                ContentType="application/json",
+            )
+    except Exception as e:
+        print(f"Error saving to S3: {e}")
+
+    return response_body
+
 
 @app.get("/health")
 async def health():
diff --git a/code/requirements.txt b/code/requirements.txt
index 9946153..ffd2a94 100644
--- a/code/requirements.txt
+++ b/code/requirements.txt
@@ -5,4 +5,5 @@ langchain-aws
 langchain
 PyPDF2
 pydantic
-boto3
\ No newline at end of file
+boto3
+langfuse
\ No newline at end of file
diff --git a/code/utils/langgraph_agent.py b/code/utils/langgraph_agent.py
index bf497a3..9babaed 100644
--- a/code/utils/langgraph_agent.py
+++ b/code/utils/langgraph_agent.py
@@ -14,9 +14,21 @@
 from langgraph.graph.message import add_messages
 from langchain_aws import ChatBedrock
 from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, SystemMessage
 from langchain_core.tools import tool
+import os
+from langfuse import Langfuse
+from langfuse.langchain import CallbackHandler
+from utils.secrets_manager import SECRETS
 
 CODE=""
 
+langfuse = Langfuse(
+    secret_key=SECRETS["LANGFUSE-SECRET-KEY"],
+    public_key=SECRETS["LANGFUSE-PUBLIC-KEY"],
+    host=os.environ.get("LANGFUSE_HOST", ""),
+)
+
+
 # Base paths
 SCRIPTS_DIR = Path(__file__).parent
 JSON_OUTPUT_DIR = SCRIPTS_DIR / "json_output"
@@ -564,13 +576,26 @@
     print("-" * 50)
 
     # Run the agent
-    final_state = agent.invoke(initial_state)
+    langfuse_handler = CallbackHandler()
+    config = {"callbacks": [langfuse_handler]}
+    final_state = agent.invoke(initial_state, config=config)
 
     # Get the final response
     final_message = final_state["messages"][-1]
     response = final_message.content if hasattr(final_message, "content") else str(final_message)
 
+    # Count tokens from all AI messages
+    input_tokens = 0
+    output_tokens = 0
+    for msg in final_state["messages"]:
+        usage = getattr(msg, "usage_metadata", None)
+        if usage:
+            input_tokens += usage.get("input_tokens", 0)
+            output_tokens += usage.get("output_tokens", 0)
+
+    langfuse.flush()
     print(f"Agent: {response}")
-    return response
+    print(f"Tokens - input: {input_tokens}, output: {output_tokens}")
+    return {"response": response, "input_tokens": input_tokens, "output_tokens": output_tokens}
diff --git a/code/utils/secrets_manager.py b/code/utils/secrets_manager.py
new file mode 100644
index 0000000..275b208
--- /dev/null
+++ b/code/utils/secrets_manager.py
@@ -0,0 +1,6 @@
+import json
+import boto3
+
+_client = boto3.client("secretsmanager", region_name="us-east-2")
+_response = _client.get_secret_value(SecretId="doc-analyzer")
+SECRETS: dict = json.loads(_response["SecretString"])
diff --git a/infra/ecs_alb/main.tf b/infra/ecs_alb/main.tf
index 92f5763..cd7f1b4 100644
--- a/infra/ecs_alb/main.tf
+++ b/infra/ecs_alb/main.tf
@@ -202,13 +202,23 @@ resource "aws_iam_role_policy" "s3_policy" {
 
   policy = jsonencode({
     Version = "2012-10-17"
-    Statement = [{
-      Effect = "Allow"
-      Action = [
-        "s3:GetObject"
-      ]
-      Resource = "arn:aws:s3:::upflux-doc-analyzer/*"
-    }]
+    Statement = [
+      {
+        Effect = "Allow"
+        Action = [
+          "s3:GetObject",
+          "s3:PutObject"
+        ]
+        Resource = "arn:aws:s3:::upflux-doc-analyzer/*"
+      },
+      {
+        Effect = "Allow"
+        Action = [
+          "s3:DeleteObject"
+        ]
+        Resource = "arn:aws:s3:::upflux-doc-analyzer/temp_textract/*"
+      }
+    ]
   })
 }
 
@@ -229,6 +239,23 @@ resource "aws_iam_role_policy" "textract_policy" {
     }]
   })
 }
+
+resource "aws_iam_role_policy" "secrets_manager_policy" {
+  name = "${var.app_name}-secrets-manager-policy"
+  role = aws_iam_role.ecs_task_role.id
+
+  policy = jsonencode({
+    Version = "2012-10-17"
+    Statement = [{
+      Effect = "Allow"
+      Action = [
+        "secretsmanager:GetSecretValue"
+      ]
+      Resource = "*"
+    }]
+  })
+}
+
 # ECS Task Definition
 resource "aws_ecs_task_definition" "app" {
   family                   = var.app_name
@@ -242,6 +269,13 @@ resource "aws_ecs_task_definition" "app" {
     name  = var.app_name
"${data.aws_caller_identity.current.account_id}.dkr.ecr.${var.aws_region}.amazonaws.com/${var.ecr_repository_name}:${var.image_tag}" + environment = [ + { + name = "LANGFUSE_HOST" + value = var.langfuse_host + } + ] + portMappings = [{ containerPort = 8000 hostPort = 8000 diff --git a/infra/ecs_alb/terraform.tfvars b/infra/ecs_alb/terraform.tfvars index a27df00..3a904cd 100644 --- a/infra/ecs_alb/terraform.tfvars +++ b/infra/ecs_alb/terraform.tfvars @@ -20,4 +20,5 @@ ecr_repository_name = "upflux-doc-analyser" image_tag = "latest" fargate_cpu = "256" fargate_memory = "512" -app_count = 1 \ No newline at end of file +app_count = 1 +langfuse_host = "http://10.0.0.12:3000" \ No newline at end of file diff --git a/infra/ecs_alb/variables.tf b/infra/ecs_alb/variables.tf index ad2cb2d..7c9af34 100644 --- a/infra/ecs_alb/variables.tf +++ b/infra/ecs_alb/variables.tf @@ -53,4 +53,9 @@ variable "app_count" { description = "Number of tasks to run" type = number default = 1 +} + +variable "langfuse_host" { + description = "Langfuse host URL" + type = string } \ No newline at end of file diff --git a/infra/langfuse-terraform/data.tf b/infra/langfuse-terraform/data.tf new file mode 100644 index 0000000..62460ae --- /dev/null +++ b/infra/langfuse-terraform/data.tf @@ -0,0 +1,14 @@ +data "aws_ami" "ubuntu" { + most_recent = true + owners = ["099720109477"] # Canonical + + filter { + name = "name" + values = ["ubuntu/images/hvm-ssd/ubuntu-jammy-22.04-amd64-server-*"] + } + + filter { + name = "virtualization-type" + values = ["hvm"] + } +} diff --git a/infra/langfuse-terraform/main.tf b/infra/langfuse-terraform/main.tf new file mode 100644 index 0000000..a215ada --- /dev/null +++ b/infra/langfuse-terraform/main.tf @@ -0,0 +1,57 @@ +# ────────────────────────────────────────────── +# Security Group +# ────────────────────────────────────────────── +resource "aws_security_group" "langfuse" { + name = var.sg_name + description = "Allow defined ports for Langfuse" + vpc_id = var.vpc_id + + dynamic "ingress" { + for_each = var.allowed_ports + content { + from_port = ingress.value + to_port = ingress.value + protocol = "tcp" + cidr_blocks = ["3.14.44.224/32"] + } + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + tags = merge(var.tags, { + Name = var.sg_name + }) +} + +# ────────────────────────────────────────────── +# EC2 Instance +# ────────────────────────────────────────────── +resource "aws_instance" "langfuse" { + ami = data.aws_ami.ubuntu.id + instance_type = var.instance_type + subnet_id = var.subnet_id + vpc_security_group_ids = [aws_security_group.langfuse.id] + associate_public_ip_address = true + key_name = var.key_name != "" ? 
+  key_name                    = var.key_name != "" ? var.key_name : null
+
+  user_data = templatefile("${path.module}/user_data.sh.tftpl", {
+    langfuse_repo_url = var.langfuse_repo_url
+    langfuse_web_port = var.langfuse_web_port
+    ebs_device_name   = var.ebs_device_name
+  })
+
+  root_block_device {
+    volume_size           = var.root_volume_size
+    volume_type           = var.root_volume_type
+    delete_on_termination = true
+  }
+
+  tags = merge(var.tags, {
+    Name = var.instance_name
+  })
+}
diff --git a/infra/langfuse-terraform/outputs.tf b/infra/langfuse-terraform/outputs.tf
new file mode 100644
index 0000000..d437cc9
--- /dev/null
+++ b/infra/langfuse-terraform/outputs.tf
@@ -0,0 +1,9 @@
+output "instance_ip" {
+  description = "Public IP of the Langfuse EC2 instance"
+  value       = aws_instance.langfuse.public_ip
+}
+
+output "url" {
+  description = "Langfuse web UI URL"
+  value       = "http://${aws_instance.langfuse.public_ip}:${var.langfuse_web_port}"
+}
diff --git a/infra/langfuse-terraform/providers.tf b/infra/langfuse-terraform/providers.tf
new file mode 100644
index 0000000..05313ec
--- /dev/null
+++ b/infra/langfuse-terraform/providers.tf
@@ -0,0 +1,14 @@
+terraform {
+  required_version = ">= 1.0"
+
+  required_providers {
+    aws = {
+      source  = "hashicorp/aws"
+      version = "~> 6.0"
+    }
+  }
+}
+
+provider "aws" {
+  region = var.aws_region
+}
diff --git a/infra/langfuse-terraform/terraform.tfvars b/infra/langfuse-terraform/terraform.tfvars
new file mode 100644
index 0000000..e29f65c
--- /dev/null
+++ b/infra/langfuse-terraform/terraform.tfvars
@@ -0,0 +1,36 @@
+# ──────────────────────────────────────────────
+# General
+# ──────────────────────────────────────────────
+aws_region   = "us-east-2"
+project_name = "langfuse"
+environment  = "dev"
+
+tags = {
+  project    = "doc-processor"
+  env        = "dev"
+  costCenter = "AI"
+  owner      = "ai-team"
+}
+
+# ──────────────────────────────────────────────
+# Network
+# ──────────────────────────────────────────────
+vpc_id    = "vpc-0270f02aee3bf1b8d"
+subnet_id = "subnet-088bc49c54ec8f028" # public-us-east-1a-subnet
+
+# ──────────────────────────────────────────────
+# EC2
+# ──────────────────────────────────────────────
+instance_type    = "t3.xlarge"
+instance_name    = "LangfuseEC2"
+sg_name          = "langfuse-sg"
+allowed_ports    = [22, 80, 443, 3000]
+root_volume_size = 100
+root_volume_type = "gp2"
+ebs_device_name  = "/dev/sdf"
+
+# ──────────────────────────────────────────────
+# Langfuse
+# ──────────────────────────────────────────────
+langfuse_repo_url = "https://github.com/langfuse/langfuse.git"
+langfuse_web_port = 3000
diff --git a/infra/langfuse-terraform/user_data.sh.tftpl b/infra/langfuse-terraform/user_data.sh.tftpl
new file mode 100644
index 0000000..156ab10
--- /dev/null
+++ b/infra/langfuse-terraform/user_data.sh.tftpl
@@ -0,0 +1,66 @@
+#!/bin/bash
+set -e
+
+# Install Docker
+sudo apt-get update -y
+sudo apt-get install -y ca-certificates curl gnupg git
+sudo install -m 0755 -d /etc/apt/keyrings
+curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
+sudo chmod a+r /etc/apt/keyrings/docker.gpg
+echo \
+  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
/etc/os-release && echo "$VERSION_CODENAME") stable" | \ + sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +sudo apt-get update -y +sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-compose-plugin +sudo groupadd docker || true +sudo usermod -aG docker ubuntu +sudo chmod 666 /var/run/docker.sock +sudo systemctl enable docker +sudo systemctl restart docker + +# Clone and configure Langfuse +cd /opt +git clone ${langfuse_repo_url} +cd langfuse + +NEXTAUTH_SECRET=$(openssl rand -hex 32) +PUBLIC_IP=$(curl -s http://169.254.169.254/latest/meta-data/public-ipv4) +SALT=$(openssl rand -hex 16) +ENCRYPTION_KEY=$(openssl rand -hex 32) + +cat > .env <