Files
AI-coodex-rekog-image-labeling/label/infra/code/diagram_processor.py
2026-05-14 14:07:04 -03:00

1262 lines
53 KiB
Python

import boto3
import json
import os
from PIL import Image, ImageDraw
import numpy as np
from scipy.optimize import linear_sum_assignment
import re
# Configuration
REGION = 'us-east-1'
CUSTOM_LABELS_PROJECT_ARN = 'arn:aws:rekognition:us-east-1:173378533286:project/labels-valvula/version/labels-valvula.2025-11-24T15.44.16/1764009856090'
CONFIDENCE_THRESHOLD = 80 # Minimum confidence for custom labels detection
class DiagramProcessor:
    """Process a single diagram: segment, detect text, clean, and recognize objects"""

    def __init__(self, region=REGION, custom_labels_arn=CUSTOM_LABELS_PROJECT_ARN):
        """
        Create the AWS service clients used by the pipeline.

        Args:
            region: AWS region used for both Textract and Rekognition clients
            custom_labels_arn: ARN of the trained Rekognition Custom Labels
                project version used for object detection
        """
        # Both clients are created eagerly so a misconfigured region/credential
        # setup fails at construction time, not mid-pipeline.
        self.textract_client = boto3.client('textract', region_name=region)
        self.rekognition_client = boto3.client('rekognition', region_name=region)
        self.custom_labels_arn = custom_labels_arn
        self.region = region
def segment_image(self, image_path, output_dir, grid_size=(5, 5), overlap_percent=10):
"""
Segment an image into a grid with overlap
Args:
image_path: Path to input diagram image
output_dir: Directory to save segments
grid_size: Tuple (rows, cols) for grid dimensions
overlap_percent: Percentage of overlap between segments (0-100)
Returns:
List of tuples: [(segment_path, position_info), ...]
"""
os.makedirs(output_dir, exist_ok=True)
# Load image
img = Image.open(image_path)
img_width, img_height = img.size
rows, cols = grid_size
# Calculate segment dimensions with overlap
overlap_factor = overlap_percent / 100.0
segment_width = img_width / cols
segment_height = img_height / rows
# Calculate step size (distance between segment starts)
step_width = segment_width * (1 - overlap_factor)
step_height = segment_height * (1 - overlap_factor)
segments = []
segment_idx = 0
print(f"\nSegmenting image: {image_path}")
print(f"Image size: {img_width}x{img_height}")
print(f"Grid: {rows}x{cols} with {overlap_percent}% overlap")
for row in range(rows):
for col in range(cols):
# Calculate segment boundaries
left = int(col * step_width)
top = int(row * step_height)
right = int(min(left + segment_width, img_width))
bottom = int(min(top + segment_height, img_height))
# Crop segment
segment = img.crop((left, top, right, bottom))
# Save segment
segment_filename = f"segment_{row}_{col}.png"
segment_path = os.path.join(output_dir, segment_filename)
segment.save(segment_path)
# Store segment info
position_info = {
'row': row,
'col': col,
'left': left,
'top': top,
'right': right,
'bottom': bottom,
'width': right - left,
'height': bottom - top
}
segments.append((segment_path, position_info))
segment_idx += 1
print(f" Created: {segment_filename} at position ({row}, {col})")
print(f"\nTotal segments created: {len(segments)}")
return segments
def detect_text_segment(self, segment_path):
"""
Detect text in a segment using Textract
Args:
segment_path: Path to segment image
Returns:
Textract results dictionary
"""
with open(segment_path, 'rb') as image_file:
image_bytes = image_file.read()
result = self.textract_client.detect_document_text(
Document={'Bytes': image_bytes}
)
return result
def clean_text_from_segment(self, segment_path, textract_data, output_path,
shrink_percent=8.5, keep_regex_list=None, min_confidence=80):
"""
Remove text from a segment
Args:
segment_path: Path to input segment
textract_data: Textract results
output_path: Path to save cleaned segment
shrink_percent: Percentage to shrink bounding boxes
keep_regex_list: List of regex patterns to keep
min_confidence: Minimum confidence to remove text
Returns:
Statistics dictionary
"""
# Compile regex patterns
compiled_patterns = []
if keep_regex_list:
for pattern in keep_regex_list:
try:
compiled_patterns.append(re.compile(pattern))
except re.error as e:
print(f"Warning: Invalid regex pattern '{pattern}': {e}")
# Load image
img = Image.open(segment_path)
width, height = img.size
draw = ImageDraw.Draw(img)
words_removed = 0
words_kept = 0
# Process each word
for block in textract_data['Blocks']:
if block['BlockType'] == 'WORD':
text = block['Text']
confidence = block['Confidence']
# Check if word should be kept
should_keep = False
if confidence < min_confidence:
should_keep = True
words_kept += 1
if compiled_patterns:
for pattern in compiled_patterns:
if pattern.match(text):
should_keep = True
words_kept += 1
break
if should_keep:
continue
# Remove text
bbox = block['Geometry']['BoundingBox']
left = int(bbox['Left'] * width)
top = int(bbox['Top'] * height)
box_width = int(bbox['Width'] * width)
box_height = int(bbox['Height'] * height)
# Apply shrink
if shrink_percent > 0:
shrink_factor = shrink_percent / 100
width_reduction = int(box_width * shrink_factor / 2)
height_reduction = int(box_height * shrink_factor / 2)
left += width_reduction
top += height_reduction
box_width -= width_reduction * 2
box_height -= height_reduction * 2
# Draw white rectangle
draw.rectangle(
[(left, top), (left + box_width, top + box_height)],
fill='white'
)
words_removed += 1
# Save cleaned image
img.save(output_path)
return {
'words_removed': words_removed,
'words_kept': words_kept
}
def recognize_objects_segment(self, segment_path, min_confidence=CONFIDENCE_THRESHOLD):
"""
Recognize objects in a cleaned segment using Custom Labels
Args:
segment_path: Path to cleaned segment
min_confidence: Minimum confidence threshold
Returns:
Dictionary with detection results
"""
with open(segment_path, 'rb') as image_file:
image_bytes = image_file.read()
try:
response = self.rekognition_client.detect_custom_labels(
ProjectVersionArn=self.custom_labels_arn,
Image={'Bytes': image_bytes},
MinConfidence=min_confidence
)
return {
'custom_labels': response.get('CustomLabels', []),
'success': True
}
except Exception as e:
print(f"Error detecting custom labels: {e}")
return {
'custom_labels': [],
'success': False,
'error': str(e)
}
def calculate_iou(self, box1, box2):
"""
Calculate Intersection over Union (IoU) between two bounding boxes
Args:
box1, box2: Bounding boxes in global coordinates {left, top, right, bottom}
Returns:
IoU value (0 to 1)
"""
# Calculate intersection
x_left = max(box1['left'], box2['left'])
y_top = max(box1['top'], box2['top'])
x_right = min(box1['right'], box2['right'])
y_bottom = min(box1['bottom'], box2['bottom'])
if x_right < x_left or y_bottom < y_top:
return 0.0
intersection_area = (x_right - x_left) * (y_bottom - y_top)
# Calculate union
box1_area = (box1['right'] - box1['left']) * (box1['bottom'] - box1['top'])
box2_area = (box2['right'] - box2['left']) * (box2['bottom'] - box2['top'])
union_area = box1_area + box2_area - intersection_area
if union_area == 0:
return 0.0
return intersection_area / union_area
def merge_bounding_boxes(self, boxes):
"""
Merge multiple bounding boxes into one by computing their union
Args:
boxes: List of bounding box dicts with {left, top, right, bottom}
Returns:
Merged bounding box
"""
if not boxes:
return None
min_left = min(box['left'] for box in boxes)
min_top = min(box['top'] for box in boxes)
max_right = max(box['right'] for box in boxes)
max_bottom = max(box['bottom'] for box in boxes)
return {
'left': min_left,
'top': min_top,
'right': max_right,
'bottom': max_bottom,
'width': max_right - min_left,
'height': max_bottom - min_top
}
    def deduplicate_detections(self, all_detections, iou_threshold=0.3):
        """
        Remove duplicate detections across overlapping segments using
        Non-Maximum-Suppression-style grouping.

        Same-label detections whose boxes overlap the group anchor above
        iou_threshold are merged into a single detection whose box is the
        union of the group and whose confidence is the group average.

        Args:
            all_detections: List of detection dicts with global coordinates
            iou_threshold: IoU threshold for considering boxes as duplicates

        Returns:
            List of deduplicated detections
        """
        if not all_detections:
            return []
        print(f"\n[DEDUPLICATION] Processing {len(all_detections)} detections...")
        # Group detections by label name — only same-label boxes can merge.
        detections_by_label = {}
        for det in all_detections:
            label = det['Name']
            if label not in detections_by_label:
                detections_by_label[label] = []
            detections_by_label[label].append(det)
        deduplicated = []
        # Process each label separately
        for label, detections in detections_by_label.items():
            print(f"\n Processing label: {label} ({len(detections)} detections)")
            # Sort by confidence (highest first) so each group is anchored at
            # its most confident detection.
            detections = sorted(detections, key=lambda x: x['Confidence'], reverse=True)
            # Group overlapping detections. NOTE: grouping is greedy and
            # single-pass — overlap is tested against the anchor only, not
            # transitively through already-grouped members.
            groups = []
            used = set()
            for i, det in enumerate(detections):
                if i in used:
                    continue
                # Start a new group with this detection
                group = [det]
                used.add(i)
                # Find all overlapping detections
                for j, other_det in enumerate(detections):
                    if j in used or j == i:
                        continue
                    iou = self.calculate_iou(det['global_bbox'], other_det['global_bbox'])
                    if iou > iou_threshold:
                        group.append(other_det)
                        used.add(j)
                groups.append(group)
            print(f" Found {len(groups)} unique objects (merged from {len(detections)} detections)")
            # Merge each group into a single detection
            for group in groups:
                if len(group) == 1:
                    # No duplicates, keep as is (original dict, all keys intact)
                    deduplicated.append(group[0])
                else:
                    # Merge multiple detections: union box, averaged confidence.
                    # The merged dict keeps only the keys downstream code reads
                    # (Name, Confidence, global_bbox) plus merge provenance.
                    merged_bbox = self.merge_bounding_boxes([d['global_bbox'] for d in group])
                    avg_confidence = sum(d['Confidence'] for d in group) / len(group)
                    merged_detection = {
                        'Name': label,
                        'Confidence': avg_confidence,
                        'global_bbox': merged_bbox,
                        'merged_from': len(group),
                        'source_segments': [d['segment_name'] for d in group]
                    }
                    deduplicated.append(merged_detection)
                    print(f" Merged {len(group)} detections into one")
        print(f"\n[DEDUPLICATION] Complete: {len(all_detections)} -> {len(deduplicated)} detections")
        return deduplicated
def get_bbox_center(self, bbox):
"""Get center point of bounding box"""
if 'left' in bbox:
center_x = bbox['left'] + bbox['width'] / 2
center_y = bbox['top'] + bbox['height'] / 2
else:
center_x = bbox['Left'] + bbox['Width'] / 2
center_y = bbox['Top'] + bbox['Height'] / 2
return (center_x, center_y)
def calculate_distance(self, center1, center2):
"""Calculate Euclidean distance between two centers"""
return np.sqrt(
(center1[0] - center2[0])**2 +
(center1[1] - center2[1])**2
)
    def match_objects_to_text_hungarian(self, objects, all_text_detections,
                                        pattern_regex, max_distance=None,
                                        img_width=1.0, img_height=1.0):
        r"""
        Match deduplicated objects to VM-#### text labels with the Hungarian
        algorithm (scipy's linear_sum_assignment) over a center-to-center
        pixel-distance cost matrix.

        Args:
            objects: List of deduplicated object detections with global_bbox
            all_text_detections: Combined text detections from all segments
            pattern_regex: Compiled regex pattern (e.g., r'VM-\d{4}')
            max_distance: Maximum allowed distance in pixels; falsy values
                (None or 0) mean unlimited
            img_width: Image width (not used by the matching itself)
            img_height: Image height (not used by the matching itself)

        Returns:
            Dict with matches, unmatched_objects, unmatched_texts; when both
            sides are non-empty it also carries n_objects, n_texts and
            matching_rate.
        """
        # Only texts matching the pattern participate in the assignment.
        matching_texts = []
        for text_data in all_text_detections:
            text = text_data['text']
            if pattern_regex.search(text):
                matching_texts.append(text_data)
        # Early exits: nothing to assign on one side.
        if not objects:
            print("\n[MATCHING] No objects to match")
            return {
                'matches': [],
                'unmatched_objects': [],
                'unmatched_texts': matching_texts
            }
        if not matching_texts:
            print("\n[MATCHING] No matching text found")
            return {
                'matches': [],
                'unmatched_objects': objects,
                'unmatched_texts': []
            }
        n_objects = len(objects)
        n_texts = len(matching_texts)
        print(f"\n{'='*80}")
        print(f"HUNGARIAN ALGORITHM MATCHING")
        print(f"{'='*80}")
        print(f"Objects to match: {n_objects}")
        print(f"Text labels (VM-####): {n_texts}")
        print(f"Max distance: {max_distance if max_distance else 'unlimited'} pixels")
        # Build a square cost matrix padded to max(n_objects, n_texts);
        # 1e10 is the "forbidden/padding" sentinel cost.
        max_dim = max(n_objects, n_texts)
        cost_matrix = np.full((max_dim, max_dim), 1e10)
        for i, obj in enumerate(objects):
            obj_bbox = obj['global_bbox']
            obj_center = self.get_bbox_center(obj_bbox)
            for j, text_data in enumerate(matching_texts):
                text_bbox = text_data['global_bbox']
                text_center = self.get_bbox_center(text_bbox)
                # Cost = Euclidean distance between centers (pixels).
                distance = self.calculate_distance(obj_center, text_center)
                # Pairs beyond max_distance keep the sentinel cost and are
                # filtered out after assignment.
                if max_distance and distance > max_distance:
                    cost_matrix[i, j] = 1e10
                else:
                    cost_matrix[i, j] = distance
        # Solve the assignment problem (minimum total cost).
        row_indices, col_indices = linear_sum_assignment(cost_matrix)
        # Build matches from the assignment.
        matches = []
        matched_obj_indices = set()
        matched_text_indices = set()
        for obj_idx, text_idx in zip(row_indices, col_indices):
            # Skip padding rows/columns and forbidden (sentinel-cost) pairs.
            if (obj_idx >= n_objects or text_idx >= n_texts or
                    cost_matrix[obj_idx, text_idx] >= 1e10):
                continue
            distance = cost_matrix[obj_idx, text_idx]
            match = {
                'object': objects[obj_idx],
                'object_name': objects[obj_idx]['Name'],
                'object_bbox': objects[obj_idx]['global_bbox'],
                'object_confidence': objects[obj_idx]['Confidence'],
                'text': matching_texts[text_idx]['text'],
                'text_bbox': matching_texts[text_idx]['global_bbox'],
                'text_confidence': matching_texts[text_idx]['confidence'],
                'distance': distance,
                'distance_pixels': distance
            }
            matches.append(match)
            matched_obj_indices.add(obj_idx)
            matched_text_indices.add(text_idx)
            print(f"\n ✓ Match: {objects[obj_idx]['Name']} -> {matching_texts[text_idx]['text']}")
            print(f" Distance: {distance:.2f} pixels")
            print(f" Object confidence: {objects[obj_idx]['Confidence']:.2f}%")
            print(f" Text confidence: {matching_texts[text_idx]['confidence']:.2f}%")
        # Anything not assigned (or filtered by distance) is unmatched.
        unmatched_objects = [
            objects[i] for i in range(n_objects)
            if i not in matched_obj_indices
        ]
        unmatched_texts = [
            matching_texts[j] for j in range(n_texts)
            if j not in matched_text_indices
        ]
        # Print summary
        print(f"\n{'='*80}")
        print(f"MATCHING SUMMARY")
        print(f"{'='*80}")
        print(f"Successful matches: {len(matches)}")
        print(f"Unmatched objects: {len(unmatched_objects)}")
        print(f"Unmatched VM-#### labels: {len(unmatched_texts)}")
        if unmatched_objects:
            print(f"\nUnmatched objects:")
            for obj in unmatched_objects:
                print(f" - {obj['Name']} (confidence: {obj['Confidence']:.2f}%)")
        if unmatched_texts:
            print(f"\nUnmatched text labels:")
            for text_data in unmatched_texts:
                print(f" - {text_data['text']} (confidence: {text_data['confidence']:.2f}%)")
        return {
            'matches': matches,
            'unmatched_objects': unmatched_objects,
            'unmatched_texts': unmatched_texts,
            'n_objects': n_objects,
            'n_texts': n_texts,
            # Rate is relative to the larger side, so 1.0 means a perfect
            # one-to-one cover of whichever side is bigger.
            'matching_rate': len(matches) / max(n_objects, n_texts) if max(n_objects, n_texts) > 0 else 0
        }
    def match_objects_to_text_by_type(self, objects, all_text_detections,
                                      max_distance=200, img_width=1.0, img_height=1.0):
        """
        Match objects to text based on object type:
        - globo, gaveta, retencao, espera -> Match to VM-#### labels (using Hungarian algorithm)
        - sis_con_dist, instrumento_local -> Match to 2 text labels inside (top and bottom)
        - Other objects -> Match to single text inside their bounding box

        Args:
            objects: List of deduplicated object detections
            all_text_detections: List of deduplicated text detections
            max_distance: Maximum distance for VM-#### matching (pixels)
            img_width: Image width (forwarded to the Hungarian matcher)
            img_height: Image height (forwarded to the Hungarian matcher)

        Returns:
            Dict with matches, unmatched_objects, unmatched_texts plus
            n_objects, n_texts and matching_rate.
        """
        # Object types tagged with an external VM-#### label.
        VM_LABEL_OBJECTS = ['globo', 'gaveta', 'retencao', 'espera']
        # Object types whose symbol encloses two stacked text labels.
        TWO_LABEL_OBJECTS = ['sis_con_dist', 'instrumento_local']
        # Separate objects by matching type
        vm_label_objects = []
        two_label_objects = []
        single_label_objects = []
        for obj in objects:
            obj_name = obj['Name'].lower()
            if obj_name in VM_LABEL_OBJECTS:
                vm_label_objects.append(obj)
            elif obj_name in TWO_LABEL_OBJECTS:
                two_label_objects.append(obj)
            else:
                single_label_objects.append(obj)
        print(f"\n{'='*80}")
        print(f"OBJECT-TEXT MATCHING BY TYPE")
        print(f"{'='*80}")
        print(f"Objects matching to VM-#### labels: {len(vm_label_objects)}")
        if vm_label_objects:
            print(f" Types: {', '.join([obj['Name'] for obj in vm_label_objects])}")
        print(f"Objects with 2 internal labels: {len(two_label_objects)}")
        if two_label_objects:
            print(f" Types: {', '.join([obj['Name'] for obj in two_label_objects])}")
        print(f"Objects with 1 internal label: {len(single_label_objects)}")
        if single_label_objects:
            print(f" Types: {', '.join([obj['Name'] for obj in single_label_objects])}")
        # Split texts into VM-#### tags vs. everything else.
        vm_pattern = re.compile(r'VM-\d{4}')
        vm_texts = [t for t in all_text_detections if vm_pattern.search(t['text'])]
        other_texts = [t for t in all_text_detections if not vm_pattern.search(t['text'])]
        print(f"\nVM-#### labels available: {len(vm_texts)}")
        print(f"Other text available: {len(other_texts)}")
        all_matches = []
        all_unmatched_objects = []
        all_unmatched_texts = []
        # Texts consumed by Parts 2/3 are tracked by dict identity (id()),
        # so each text dict can be claimed by at most one object.
        used_texts = set()
        # Part 1: Match VM-#### objects to VM-#### labels using Hungarian algorithm
        if vm_label_objects:
            print(f"\n{'='*80}")
            print(f"PART 1: Matching VM-#### label objects to VM-#### text")
            print(f"{'='*80}")
            vm_matching_results = self.match_objects_to_text_hungarian(
                objects=vm_label_objects,
                all_text_detections=vm_texts,
                pattern_regex=vm_pattern,
                max_distance=max_distance,
                img_width=img_width,
                img_height=img_height
            )
            # Add match type identifier
            for match in vm_matching_results['matches']:
                match['match_type'] = 'vm_label'
            all_matches.extend(vm_matching_results['matches'])
            all_unmatched_objects.extend(vm_matching_results['unmatched_objects'])
            all_unmatched_texts.extend(vm_matching_results['unmatched_texts'])
        # Part 2: Match objects with 2 internal labels (top/bottom text inside
        # the symbol's bounding box)
        if two_label_objects:
            print(f"\n{'='*80}")
            print(f"PART 2: Matching objects to 2 internal labels")
            print(f"{'='*80}")
            for obj in two_label_objects:
                obj_bbox = obj['global_bbox']
                obj_name = obj['Name']
                # Calculate object center
                obj_center_x = obj_bbox['left'] + obj_bbox['width'] / 2
                obj_center_y = obj_bbox['top'] + obj_bbox['height'] / 2
                # Collect all unused texts whose center lies inside this
                # object's bounding box.
                texts_inside = []
                for text_data in other_texts:
                    text_id = id(text_data)
                    if text_id in used_texts:
                        continue
                    text_bbox = text_data['global_bbox']
                    # Check if text center is inside object bbox
                    text_center_x = text_bbox['left'] + text_bbox['width'] / 2
                    text_center_y = text_bbox['top'] + text_bbox['height'] / 2
                    if (obj_bbox['left'] <= text_center_x <= obj_bbox['right'] and
                        obj_bbox['top'] <= text_center_y <= obj_bbox['bottom']):
                        # Calculate distance from text center to object center
                        distance_to_center = self.calculate_distance(
                            (obj_center_x, obj_center_y),
                            (text_center_x, text_center_y)
                        )
                        texts_inside.append({
                            'text_data': text_data,
                            'distance_to_center': distance_to_center,
                            'y_position': text_center_y
                        })
                if len(texts_inside) >= 2:
                    # Take the 2 texts closest to the object's center, then
                    # order those two vertically (top label / bottom label).
                    texts_inside.sort(key=lambda t: t['distance_to_center'])
                    closest_two = texts_inside[:2]
                    closest_two.sort(key=lambda t: t['y_position'])
                    top_text = closest_two[0]['text_data']
                    bottom_text = closest_two[1]['text_data']
                    # Create match with both labels
                    match = {
                        'object': obj,
                        'object_name': obj_name,
                        'object_bbox': obj_bbox,
                        'object_confidence': obj['Confidence'],
                        'text': f"{top_text['text']} / {bottom_text['text']}",
                        'text_top': top_text['text'],
                        'text_bottom': bottom_text['text'],
                        'text_bbox_top': top_text['global_bbox'],
                        'text_bbox_bottom': bottom_text['global_bbox'],
                        'text_confidence_top': top_text['confidence'],
                        'text_confidence_bottom': bottom_text['confidence'],
                        # Internal labels have no meaningful distance metric.
                        'distance': 0,
                        'distance_pixels': 0,
                        'match_type': 'two_labels',
                        'texts_found_inside': len(texts_inside)
                    }
                    all_matches.append(match)
                    # Mark texts as used
                    used_texts.add(id(top_text))
                    used_texts.add(id(bottom_text))
                    print(f"\n ✓ Match: {obj_name} -> '{top_text['text']}' (top) / '{bottom_text['text']}' (bottom)")
                    print(f" Object confidence: {obj['Confidence']:.2f}%")
                    print(f" Top text confidence: {top_text['confidence']:.2f}%")
                    print(f" Bottom text confidence: {bottom_text['confidence']:.2f}%")
                    print(f" Top text distance to center: {closest_two[0]['distance_to_center']:.2f}px")
                    print(f" Bottom text distance to center: {closest_two[1]['distance_to_center']:.2f}px")
                    if len(texts_inside) > 2:
                        print(f" Note: {len(texts_inside)} texts found inside, used 2 closest to center")
                elif len(texts_inside) == 1:
                    # Only found 1 text, but expected 2 — the object stays
                    # unmatched and the text stays available for Part 3.
                    print(f"\n ⚠ Partial match: {obj_name} - Found only 1 text inside (expected 2)")
                    print(f" Text: '{texts_inside[0]['text_data']['text']}'")
                    all_unmatched_objects.append(obj)
                else:
                    # No text inside
                    all_unmatched_objects.append(obj)
                    print(f"\n ✗ No match: {obj_name} - No text found inside bounding box (expected 2)")
        # Part 3: Match other objects to single text inside their bounding boxes
        if single_label_objects:
            print(f"\n{'='*80}")
            print(f"PART 3: Matching objects to single internal text")
            print(f"{'='*80}")
            for obj in single_label_objects:
                obj_bbox = obj['global_bbox']
                obj_name = obj['Name']
                # Calculate object center
                obj_center_x = obj_bbox['left'] + obj_bbox['width'] / 2
                obj_center_y = obj_bbox['top'] + obj_bbox['height'] / 2
                # Find all unused texts inside this object's bounding box
                texts_inside = []
                for text_data in other_texts:
                    text_id = id(text_data)
                    if text_id in used_texts:
                        continue
                    text_bbox = text_data['global_bbox']
                    # Check if text center is inside object bbox
                    text_center_x = text_bbox['left'] + text_bbox['width'] / 2
                    text_center_y = text_bbox['top'] + text_bbox['height'] / 2
                    if (obj_bbox['left'] <= text_center_x <= obj_bbox['right'] and
                        obj_bbox['top'] <= text_center_y <= obj_bbox['bottom']):
                        texts_inside.append(text_data)
                if texts_inside:
                    # Choose the text closest to object center
                    closest_text = min(texts_inside, key=lambda t: self.calculate_distance(
                        (obj_center_x, obj_center_y),
                        (t['global_bbox']['left'] + t['global_bbox']['width'] / 2,
                         t['global_bbox']['top'] + t['global_bbox']['height'] / 2)
                    ))
                    # Calculate distance for reporting
                    text_center_x = closest_text['global_bbox']['left'] + closest_text['global_bbox']['width'] / 2
                    text_center_y = closest_text['global_bbox']['top'] + closest_text['global_bbox']['height'] / 2
                    distance_to_center = self.calculate_distance(
                        (obj_center_x, obj_center_y),
                        (text_center_x, text_center_y)
                    )
                    # Create match
                    match = {
                        'object': obj,
                        'object_name': obj_name,
                        'object_bbox': obj_bbox,
                        'object_confidence': obj['Confidence'],
                        'text': closest_text['text'],
                        'text_bbox': closest_text['global_bbox'],
                        'text_confidence': closest_text['confidence'],
                        'distance': distance_to_center,
                        'distance_pixels': distance_to_center,
                        'match_type': 'single_label',
                        'texts_found_inside': len(texts_inside)
                    }
                    all_matches.append(match)
                    # Mark text as used
                    used_texts.add(id(closest_text))
                    print(f"\n ✓ Match: {obj_name} -> '{closest_text['text']}' (internal)")
                    print(f" Object confidence: {obj['Confidence']:.2f}%")
                    print(f" Text confidence: {closest_text['confidence']:.2f}%")
                    print(f" Distance to center: {distance_to_center:.2f}px")
                    if len(texts_inside) > 1:
                        print(f" Note: {len(texts_inside)} texts found inside, chose closest to center")
                else:
                    # No text inside
                    all_unmatched_objects.append(obj)
                    print(f"\n ✗ No match: {obj_name} - No text found inside bounding box")
        # Part 4: any non-VM text never claimed by Parts 2/3 is reported as
        # unmatched (VM texts were handled inside Part 1's result).
        for text_data in other_texts:
            if id(text_data) not in used_texts:
                all_unmatched_texts.append(text_data)
        # Summary
        print(f"\n{'='*80}")
        print(f"MATCHING SUMMARY")
        print(f"{'='*80}")
        print(f"Total matches: {len(all_matches)}")
        print(f" - VM-#### label matches: {sum(1 for m in all_matches if m['match_type'] == 'vm_label')}")
        print(f" - Two-label matches: {sum(1 for m in all_matches if m['match_type'] == 'two_labels')}")
        print(f" - Single-label matches: {sum(1 for m in all_matches if m['match_type'] == 'single_label')}")
        print(f"Unmatched objects: {len(all_unmatched_objects)}")
        print(f"Unmatched texts: {len(all_unmatched_texts)}")
        if all_unmatched_objects:
            print(f"\nUnmatched objects:")
            for obj in all_unmatched_objects:
                print(f" - {obj['Name']} (confidence: {obj['Confidence']:.2f}%)")
        if all_unmatched_texts:
            print(f"\nUnmatched texts:")
            for text_data in all_unmatched_texts:
                print(f" - '{text_data['text']}' (confidence: {text_data['confidence']:.2f}%)")
        return {
            'matches': all_matches,
            'unmatched_objects': all_unmatched_objects,
            'unmatched_texts': all_unmatched_texts,
            'n_objects': len(objects),
            'n_texts': len(all_text_detections),
            # Rate is relative to the object count here (unlike the
            # Hungarian matcher, which divides by the larger side).
            'matching_rate': len(all_matches) / len(objects) if objects else 0
        }
def deduplicate_text_detections(self, all_text_detections, iou_threshold=0.5):
"""
Remove duplicate text detections across overlapping segments
Args:
all_text_detections: List of text detection dicts with global coordinates
iou_threshold: IoU threshold for considering text as duplicates
Returns:
List of deduplicated text detections
"""
if not all_text_detections:
return []
print(f"\n[TEXT DEDUPLICATION] Processing {len(all_text_detections)} text detections...")
# Sort by confidence (highest first)
all_text_detections = sorted(all_text_detections, key=lambda x: x['confidence'], reverse=True)
deduplicated = []
used = set()
for i, text_det in enumerate(all_text_detections):
if i in used:
continue
# Start a new group
group = [text_det]
used.add(i)
# Find overlapping text with same content
for j, other_det in enumerate(all_text_detections):
if j in used or j == i:
continue
# Check if text is the same (case-insensitive)
if text_det['text'].lower() == other_det['text'].lower():
iou = self.calculate_iou(text_det['global_bbox'], other_det['global_bbox'])
if iou > iou_threshold:
group.append(other_det)
used.add(j)
# Take the one with highest confidence (already sorted)
if len(group) > 1:
print(f" Merged {len(group)} duplicates of '{text_det['text']}'")
deduplicated.append(text_det)
print(f"[TEXT DEDUPLICATION] Complete: {len(all_text_detections)} -> {len(deduplicated)} text detections")
return deduplicated
    def process_single_diagram(self, diagram_path, output_base_dir,
                               grid_size=(5, 5), overlap_percent=10,
                               keep_regex_list=None, min_confidence=80,
                               custom_labels_confidence=80, iou_threshold=0.3,
                               matching_max_distance=200):
        """
        Complete pipeline: segment, detect text, clean, recognize objects, and match to labels.

        Writes per-segment artifacts (segments, Textract JSON, cleaned
        segments, detection JSON) plus processing_summary.json,
        object_label_matches.json and matches_report.txt under
        output_base_dir.

        Args:
            diagram_path: Path to input diagram
            output_base_dir: Base directory for all outputs
            grid_size: Tuple (rows, cols) for segmentation
            overlap_percent: Overlap percentage for segments
            keep_regex_list: Regex patterns for text to keep
            min_confidence: Minimum confidence for text removal
            custom_labels_confidence: Minimum confidence for object detection
            iou_threshold: IoU threshold for deduplication (0.3 = 30% overlap)
            matching_max_distance: Maximum distance for matching objects to text (pixels)

        Returns:
            Dictionary with complete results including matches (same content
            as processing_summary.json)
        """
        # Create output directories
        segments_dir = os.path.join(output_base_dir, 'segments')
        text_json_dir = os.path.join(output_base_dir, 'text_detections')
        cleaned_dir = os.path.join(output_base_dir, 'cleaned_segments')
        detections_dir = os.path.join(output_base_dir, 'object_detections')
        for dir_path in [segments_dir, text_json_dir, cleaned_dir, detections_dir]:
            os.makedirs(dir_path, exist_ok=True)
        print("="*80)
        print("DIAGRAM PROCESSING PIPELINE")
        print("="*80)
        # Step 1: Segment the diagram
        print("\n[STEP 1] Segmenting diagram...")
        segments = self.segment_image(diagram_path, segments_dir, grid_size, overlap_percent)
        # Get original image dimensions (needed for matching/reporting)
        original_img = Image.open(diagram_path)
        img_width, img_height = original_img.size
        # Steps 2-4: per-segment text detection, cleaning, object detection
        all_results = []
        all_global_detections = []
        all_text_detections = []
        for idx, (segment_path, position_info) in enumerate(segments):
            segment_name = os.path.basename(segment_path)
            base_name = os.path.splitext(segment_name)[0]
            print(f"\n{'='*80}")
            print(f"Processing segment {idx+1}/{len(segments)}: {segment_name}")
            print(f"{'='*80}")
            # Step 2: Detect text
            print("\n[STEP 2] Detecting text with Textract...")
            textract_data = self.detect_text_segment(segment_path)
            # Save text detection JSON
            json_path = os.path.join(text_json_dir, f"{base_name}.json")
            with open(json_path, 'w') as f:
                json.dump(textract_data, f, indent=2)
            word_count = sum(1 for b in textract_data['Blocks'] if b['BlockType'] == 'WORD')
            print(f" Detected {word_count} words")
            # Convert each WORD's normalized segment-local box to global
            # pixel coordinates using the segment's position in the diagram.
            for block in textract_data['Blocks']:
                if block['BlockType'] == 'WORD':
                    bbox = block['Geometry']['BoundingBox']
                    # Convert to global coordinates
                    seg_left = position_info['left']
                    seg_top = position_info['top']
                    seg_width = position_info['width']
                    seg_height = position_info['height']
                    global_left = seg_left + int(bbox['Left'] * seg_width)
                    global_top = seg_top + int(bbox['Top'] * seg_height)
                    global_width = int(bbox['Width'] * seg_width)
                    global_height = int(bbox['Height'] * seg_height)
                    all_text_detections.append({
                        'text': block['Text'],
                        'confidence': block['Confidence'],
                        'segment_name': segment_name,
                        'global_bbox': {
                            'left': global_left,
                            'top': global_top,
                            'right': global_left + global_width,
                            'bottom': global_top + global_height,
                            'width': global_width,
                            'height': global_height
                        }
                    })
            # Step 3: Clean text from segment (white-out words so they don't
            # confuse the object detector)
            print("\n[STEP 3] Cleaning text from segment...")
            cleaned_path = os.path.join(cleaned_dir, segment_name)
            clean_stats = self.clean_text_from_segment(
                segment_path, textract_data, cleaned_path,
                keep_regex_list=keep_regex_list, min_confidence=min_confidence
            )
            print(f" Removed: {clean_stats['words_removed']} words")
            print(f" Kept: {clean_stats['words_kept']} words")
            # Step 4: Recognize objects with Custom Labels (on the CLEANED image)
            print("\n[STEP 4] Recognizing objects with Custom Labels...")
            detection_results = self.recognize_objects_segment(
                cleaned_path, min_confidence=custom_labels_confidence
            )
            # Save detection results JSON
            detection_json_path = os.path.join(detections_dir, f"{base_name}_detections.json")
            with open(detection_json_path, 'w') as f:
                json.dump(detection_results, f, indent=2)
            if detection_results['success']:
                labels = detection_results['custom_labels']
                print(f" Detected {len(labels)} objects:")
                # Convert to global coordinates and store
                for label in labels:
                    print(f" - {label['Name']}: {label['Confidence']:.2f}%")
                    # Labels without geometry (image-level labels) are
                    # printed but not tracked further.
                    if 'Geometry' in label and 'BoundingBox' in label['Geometry']:
                        bbox = label['Geometry']['BoundingBox']
                        # Convert segment-local to global coordinates
                        seg_left = position_info['left']
                        seg_top = position_info['top']
                        seg_width = position_info['width']
                        seg_height = position_info['height']
                        global_left = seg_left + int(bbox['Left'] * seg_width)
                        global_top = seg_top + int(bbox['Top'] * seg_height)
                        global_width = int(bbox['Width'] * seg_width)
                        global_height = int(bbox['Height'] * seg_height)
                        global_detection = {
                            'Name': label['Name'],
                            'Confidence': label['Confidence'],
                            'segment_name': segment_name,
                            'global_bbox': {
                                'left': global_left,
                                'top': global_top,
                                'right': global_left + global_width,
                                'bottom': global_top + global_height,
                                'width': global_width,
                                'height': global_height
                            }
                        }
                        all_global_detections.append(global_detection)
            else:
                print(f" Error: {detection_results.get('error', 'Unknown error')}")
            # Store per-segment results for the summary
            segment_result = {
                'segment_name': segment_name,
                'segment_path': segment_path,
                'position': position_info,
                'cleaned_path': cleaned_path,
                'text_detection': {
                    'total_words': word_count,
                    'words_removed': clean_stats['words_removed'],
                    'words_kept': clean_stats['words_kept']
                },
                'object_detection': detection_results
            }
            all_results.append(segment_result)
        # Step 5: Deduplicate detections (overlapping segments see the same
        # objects/text more than once)
        print("\n" + "="*80)
        print("[STEP 5] Deduplicating detections across segments")
        print("="*80)
        deduplicated_detections = self.deduplicate_detections(
            all_global_detections,
            iou_threshold=iou_threshold
        )
        print("\n[STEP 5b] Deduplicating text detections")
        # Text uses a fixed, stricter IoU threshold than objects.
        deduplicated_text = self.deduplicate_text_detections(
            all_text_detections,
            iou_threshold=0.5
        )
        # Step 6: Match objects to text based on object type
        print("\n" + "="*80)
        print("[STEP 6] Matching objects to text (by type)")
        print("="*80)
        matching_results = self.match_objects_to_text_by_type(
            objects=deduplicated_detections,
            all_text_detections=deduplicated_text,
            max_distance=matching_max_distance,
            img_width=img_width,
            img_height=img_height
        )
        # Generate summary
        print("\n" + "="*80)
        print("PROCESSING COMPLETE - SUMMARY")
        print("="*80)
        total_objects_raw = len(all_global_detections)
        total_objects_deduplicated = len(deduplicated_detections)
        total_words_detected = sum(r['text_detection']['total_words']
                                   for r in all_results)
        total_words_removed = sum(r['text_detection']['words_removed']
                                  for r in all_results)
        print(f"\nSegments processed: {len(segments)}")
        print(f"Total words detected (raw): {len(all_text_detections)}")
        print(f"Total words after deduplication: {len(deduplicated_text)}")
        print(f"Total words removed: {total_words_removed}")
        print(f"Total objects detected (raw): {total_objects_raw}")
        print(f"Total objects after deduplication: {total_objects_deduplicated}")
        print(f"Total VM-#### labels found: {matching_results.get('n_texts', 0)}")
        print(f"Successful matches: {len(matching_results.get('matches', []))}")
        print(f"Matching rate: {matching_results.get('matching_rate', 0)*100:.1f}%")
        # Save complete results
        summary_path = os.path.join(output_base_dir, 'processing_summary.json')
        summary = {
            'input_diagram': diagram_path,
            'image_dimensions': {'width': img_width, 'height': img_height},
            'grid_size': grid_size,
            'overlap_percent': overlap_percent,
            'iou_threshold': iou_threshold,
            'matching_max_distance': matching_max_distance,
            'total_segments': len(segments),
            'total_words_detected': total_words_detected,
            'total_words_removed': total_words_removed,
            'total_objects_raw': total_objects_raw,
            'total_objects_deduplicated': total_objects_deduplicated,
            'total_vm_labels': matching_results['n_texts'],
            'total_matches': len(matching_results['matches']),
            'matching_rate': matching_results['matching_rate'],
            'segments': all_results,
            'deduplicated_detections': deduplicated_detections,
            'matching_results': matching_results
        }
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)
        # Save matched pairs to separate file
        matches_path = os.path.join(output_base_dir, 'object_label_matches.json')
        with open(matches_path, 'w') as f:
            json.dump(matching_results, f, indent=2)
        # Create human-readable matches report
        report_path = os.path.join(output_base_dir, 'matches_report.txt')
        with open(report_path, 'w') as f:
            f.write("="*80 + "\n")
            f.write("OBJECT-TO-LABEL MATCHING REPORT\n")
            f.write("="*80 + "\n\n")
            f.write(f"Total Objects: {matching_results.get('n_objects', 0)}\n")
            f.write(f"Total Text Labels: {matching_results.get('n_texts', 0)}\n")
            f.write(f"Successful Matches: {len(matching_results.get('matches', []))}\n")
            f.write(f"Matching Rate: {matching_results.get('matching_rate', 0)*100:.1f}%\n\n")
            f.write("="*80 + "\n")
            f.write("MATCHED PAIRS\n")
            f.write("="*80 + "\n\n")
            for i, match in enumerate(matching_results.get('matches', []), 1):
                match_type = match.get('match_type', 'vm_label')
                f.write(f"{i}. {match['object_name']} -> {match['text']}\n")
                f.write(f" Match Type: {match_type}\n")
                f.write(f" Object Confidence: {match['object_confidence']:.2f}%\n")
                # two_labels matches carry split top/bottom fields instead of
                # a single text_confidence.
                if match_type == 'two_labels':
                    f.write(f" Top Text: {match['text_top']}\n")
                    f.write(f" Top Text Confidence: {match['text_confidence_top']:.2f}%\n")
                    f.write(f" Bottom Text: {match['text_bottom']}\n")
                    f.write(f" Bottom Text Confidence: {match['text_confidence_bottom']:.2f}%\n")
                else:
                    f.write(f" Text Confidence: {match['text_confidence']:.2f}%\n")
                f.write(f" Distance: {match['distance']:.2f} pixels\n")
                f.write("\n")
            if matching_results.get('unmatched_objects'):
                f.write("="*80 + "\n")
                f.write("UNMATCHED OBJECTS\n")
                f.write("="*80 + "\n\n")
                for obj in matching_results['unmatched_objects']:
                    f.write(f"- {obj['Name']} (Confidence: {obj['Confidence']:.2f}%)\n\n")
            if matching_results.get('unmatched_texts'):
                f.write("="*80 + "\n")
                f.write("UNMATCHED TEXT LABELS\n")
                f.write("="*80 + "\n\n")
                for text_data in matching_results['unmatched_texts']:
                    f.write(f"- {text_data['text']} (Confidence: {text_data['confidence']:.2f}%)\n\n")
        print(f"\nResults saved to: {output_base_dir}")
        print(f"Summary: {summary_path}")
        print(f"Matches JSON: {matches_path}")
        print(f"Matches Report: {report_path}")
        return summary
def visualize_detections(self, summary_data, output_path, show_duplicates=False):
"""Create visualization of detections"""
diagram_path = summary_data['input_diagram']
img = Image.open(diagram_path)
draw = ImageDraw.Draw(img)
if not show_duplicates:
deduplicated = summary_data.get('deduplicated_detections', [])
for detection in deduplicated:
bbox = detection['global_bbox']
draw.rectangle(
[(bbox['left'], bbox['top']), (bbox['right'], bbox['bottom'])],
outline='green',
width=3
)
label = f"{detection['Name']} ({detection['Confidence']:.1f}%)"
draw.text((bbox['left'], bbox['top'] - 15), label, fill='green')
img.save(output_path)
print(f"Visualization saved to: {output_path}")
    def visualize_text_detections(self, summary_data, output_path, show_duplicates=False):
        """Create visualization of text detections.

        NOTE(review): this is currently a stub — it copies the input diagram
        to output_path without drawing anything (show_duplicates is ignored).
        The summary produced by process_single_diagram does not store the
        deduplicated text detections, so there is nothing here to draw from;
        confirm whether text-box overlays are still wanted.
        """
        diagram_path = summary_data['input_diagram']
        img = Image.open(diagram_path)
        img.save(output_path)
        print(f"Text visualization saved to: {output_path}")
def visualize_matches(self, summary_data, output_path):
"""Create visualization of matches"""
diagram_path = summary_data['input_diagram']
img = Image.open(diagram_path)
draw = ImageDraw.Draw(img)
matching_results = summary_data.get('matching_results', {})
matches = matching_results.get('matches', [])
for match in matches:
obj_bbox = match['object_bbox']
match_type = match.get('match_type', 'vm_label')
color = 'blue' if match_type == 'vm_label' else 'green'
draw.rectangle(
[(obj_bbox['left'], obj_bbox['top']),
(obj_bbox['right'], obj_bbox['bottom'])],
outline=color,
width=3
)
img.save(output_path)
print(f"Match visualization saved to: {output_path}")