"""Extract text from S3-hosted images and PDFs using Amazon Textract."""

import asyncio
import io
from pathlib import Path
from urllib.parse import urlparse

import boto3
from PyPDF2 import PdfReader, PdfWriter

from utils.config import AWS_REGION
from utils.secrets_manager import SECRETS

# File extensions that are sent to Textract as a single image.
_IMAGE_EXTENSIONS = frozenset({".png", ".jpg", ".jpeg"})

_s3_input = boto3.client(
    "s3",
    aws_access_key_id=SECRETS["AWS_ACCESS_KEY"],
    aws_secret_access_key=SECRETS["AWS_SECRET_KEY"],
    region_name=AWS_REGION,
)
# NOTE(review): unlike the S3 client above, no explicit credentials are
# passed here -- confirm that this difference is intended.
_textract = boto3.client("textract", region_name=AWS_REGION)


def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
    """Split an ``s3://bucket/key`` URI into ``(bucket, key)``.

    Args:
        s3_uri: URI of the form ``s3://<bucket>/<key>``.

    Returns:
        The bucket name and the object key with leading slashes removed.

    Raises:
        ValueError: If the scheme is not ``s3`` or if the bucket or key
            is missing.
    """
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {s3_uri}")

    bucket = parsed.netloc
    # urlparse() moves anything after "?" or "#" into query/fragment, but
    # both characters are legal inside S3 object keys.  Read the key from
    # the raw text after the first "/" so such keys are not truncated.
    _, _, remainder = s3_uri.partition("://")
    authority, _, key = remainder.partition("/")
    key = key.lstrip("/")

    # authority != bucket means "?" or "#" appeared before any "/", i.e.
    # the URI has no real key path (for example "s3://bucket?x=1").
    if not bucket or authority != bucket or not key:
        raise ValueError(f"Invalid S3 URI: {s3_uri}")
    return bucket, key


def _extract_text_from_textract_response(response: dict) -> str:
    """Join the text of every ``LINE`` block in a Textract response.

    Args:
        response: Response dict from Textract; may be empty or ``None``.

    Returns:
        The ``Text`` of each block whose ``BlockType`` is ``"LINE"``,
        joined with newlines, or ``""`` when the response is falsy or has
        no such blocks.
    """
    if not response:
        return ""
    # .get() keeps a block that lacks "BlockType"/"Text" from raising
    # KeyError and discarding the text of the whole document.
    return "\n".join(
        block.get("Text", "")
        for block in response.get("Blocks", [])
        if block.get("BlockType") == "LINE"
    )


def _split_pdf_pages(pdf_bytes: bytes) -> list[bytes]:
    """Split a PDF into standalone single-page PDF documents.

    Args:
        pdf_bytes: Raw bytes of the source PDF.

    Returns:
        One serialized single-page PDF per page, in document order.
    """
    reader = PdfReader(io.BytesIO(pdf_bytes))
    pages = []
    for page in reader.pages:
        writer = PdfWriter()
        writer.add_page(page)
        buf = io.BytesIO()
        writer.write(buf)
        pages.append(buf.getvalue())
    return pages


def _textract_detect_bytes(file_bytes: bytes) -> str:
    """Run Textract ``detect_document_text`` on raw bytes and return the text.

    Args:
        file_bytes: Raw bytes of a single image or single-page PDF.

    Returns:
        The detected lines joined with newlines.
    """
    response = _textract.detect_document_text(Document={"Bytes": file_bytes})
    return _extract_text_from_textract_response(response)


def _download_s3_object(bucket: str, key: str) -> bytes:
    """Download an S3 object and return its full body as bytes (blocking)."""
    return _s3_input.get_object(Bucket=bucket, Key=key)["Body"].read()


async def extract_text_from_s3_document(bucket: str, key: str) -> tuple[str, int]:
    """Download a document from S3 and extract its text with Textract.

    Blocking boto3 and PyPDF2 calls are run via ``asyncio.to_thread``.

    Args:
        bucket: Name of the S3 bucket holding the document.
        key: Object key; its (lower-cased) extension selects the handling.

    Returns:
        ``(text, page_count)``. Images (``.png``, ``.jpg``, ``.jpeg``)
        yield a page count of 1. PDFs are split into pages, each page is
        sent to Textract separately, and the page texts are joined with
        newlines. Any other extension yields ``("", 0)``.
    """
    file_bytes = await asyncio.to_thread(_download_s3_object, bucket, key)
    file_ext = Path(key).suffix.lower()

    if file_ext in _IMAGE_EXTENSIONS:
        text = await asyncio.to_thread(_textract_detect_bytes, file_bytes)
        return text, 1

    if file_ext == ".pdf":
        page_bytes_list = await asyncio.to_thread(_split_pdf_pages, file_bytes)
        # gather() returns results in argument order, so the joined text
        # follows the page order of the PDF.
        texts = await asyncio.gather(
            *(
                asyncio.to_thread(_textract_detect_bytes, page_bytes)
                for page_bytes in page_bytes_list
            )
        )
        return "\n".join(texts), len(page_bytes_list)

    # Unsupported extension: report "no text" rather than raising.
    return "", 0