Files
ExperionCrawler/dxf-graph/pid_extractor.py
2026-05-08 17:22:10 +09:00

613 lines
21 KiB
Python

"""
P&ID Extractor - DXF/PDF → Claude Vision API → CSV → PostgreSQL
Extracts: Equipment Name, Tag No., Instrument Type, Line Number, P&ID Drawing No.
"""
import os
import json
import csv
import base64
import re
import logging
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Optional
import anthropic
# logging.FileHandler opens its target file as soon as it is constructed, so
# the "logs" directory has to exist before basicConfig() runs at import time.
Path("logs").mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        # Explicit UTF-8 so non-ASCII text in log messages is written the same
        # way regardless of the platform's default encoding.
        logging.FileHandler("logs/extractor.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)

# Module-wide logger used by every function in this file.
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────
# Data Model
# ─────────────────────────────────────────────
def _extraction_timestamp() -> str:
    """Return the current local time as an ISO-8601 string."""
    return datetime.now().isoformat()


@dataclass
class PIDItem:
    """One record extracted from a P&ID drawing.

    Every text field defaults to an empty string and ``confidence`` to 0.0,
    so an instance can be built from whatever subset of fields is known.
    """

    # Drawing / sheet identifier the item was found on.
    pid_drawing_no: str = ""
    # Instrument or equipment tag.
    tag_no: str = ""
    # Descriptive name of the equipment or instrument.
    equipment_name: str = ""
    # Instrument type abbreviation.
    instrument_type: str = ""
    # Associated pipe line number, if any.
    line_number: str = ""
    # Free-text service description, if any.
    service_description: str = ""
    # Extraction confidence score.
    confidence: float = 0.0
    # Name of the file the item was extracted from.
    source_file: str = ""
    # Creation time of this record; filled in automatically when omitted.
    extracted_at: str = field(default_factory=_extraction_timestamp)

    def to_dict(self) -> dict:
        """Return the item as a plain ``dict`` keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
# ─────────────────────────────────────────────
# File Converters
# ─────────────────────────────────────────────
def dxf_to_image(dxf_path: str, output_dir: str = "output") -> list[str]:
    """
    Convert a DXF file to a PNG image using ezdxf + matplotlib.

    The modelspace is rendered onto a single 24x18 inch figure at 150 dpi and
    saved as ``<output_dir>/<dxf stem>.png``.

    Args:
        dxf_path: Path of the DXF file to render.
        output_dir: Directory the PNG is written to; created when missing.

    Returns:
        A one-element list with the PNG path, or an empty list when the
        optional dependencies are missing or the conversion fails (the
        failure is logged, never raised).
    """
    fig = None
    plt = None
    try:
        # savefig() does not create missing directories, and callers such as
        # the text-mode fallback rely on the default "output" location.
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        import ezdxf
        from ezdxf.addons.drawing import RenderContext, Frontend
        from ezdxf.addons.drawing.matplotlib import MatplotlibBackend
        import matplotlib.pyplot as plt

        doc = ezdxf.readfile(dxf_path)
        msp = doc.modelspace()

        fig = plt.figure(figsize=(24, 18), dpi=150)
        # One axes object spanning the whole figure, i.e. no margins.
        ax = fig.add_axes([0, 0, 1, 1])
        ctx = RenderContext(doc)
        out = MatplotlibBackend(ax)
        Frontend(ctx, out).draw_layout(msp, finalize=True)

        output_path = Path(output_dir) / (Path(dxf_path).stem + ".png")
        fig.savefig(output_path, dpi=150, bbox_inches="tight",
                    facecolor="white", edgecolor="none")
        log.info(f"DXF converted: {output_path}")
        return [str(output_path)]
    except ImportError:
        log.warning("ezdxf/matplotlib not installed. Run: pip install ezdxf matplotlib")
        return []
    except Exception as e:
        log.error(f"DXF conversion failed: {e}")
        return []
    finally:
        # Release the figure even when rendering or saving raised; pyplot
        # keeps figures alive until they are closed explicitly.
        if fig is not None:
            plt.close(fig)
def pdf_to_images(pdf_path: str, output_dir: str = "output",
                  dpi: int = 200) -> list[str]:
    """
    Convert every page of a PDF to a PNG image using pdf2image.

    Pages are written as ``<output_dir>/<pdf stem>_pageNNN.png`` with a
    1-based, zero-padded page number.

    Args:
        pdf_path: Path of the PDF file to convert.
        output_dir: Directory the PNGs are written to; created when missing.
        dpi: Rendering resolution passed to pdf2image.

    Returns:
        The image paths in page order, or an empty list when pdf2image is
        missing or the conversion fails (the failure is logged, never raised).
    """
    try:
        # page.save() does not create missing directories.
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        from pdf2image import convert_from_path

        pages = convert_from_path(pdf_path, dpi=dpi)
        stem = Path(pdf_path).stem
        paths = []
        for page_no, page in enumerate(pages, start=1):
            out_path = Path(output_dir) / f"{stem}_page{page_no:03d}.png"
            page.save(str(out_path), "PNG")
            paths.append(str(out_path))
            log.info(f"PDF page {page_no} saved: {out_path}")
        return paths
    except ImportError:
        log.warning("pdf2image not installed. Run: pip install pdf2image")
        return []
    except Exception as e:
        log.error(f"PDF conversion failed: {e}")
        return []
# DXF entity types that carry human-readable text.
_DXF_TEXT_TYPES = ("TEXT", "MTEXT", "ATTRIB", "ATTDEF")


def _dxf_entity_text(entity) -> str:
    """Return the text content of one text-like DXF entity ("" when none)."""
    if entity.dxftype() == "MTEXT":
        # ezdxf keeps MTEXT content on the entity itself, not in the ``dxf``
        # namespace. Prefer plain_text(), which drops inline formatting
        # codes, and fall back to the raw content when it is unavailable.
        plain_text = getattr(entity, "plain_text", None)
        if callable(plain_text):
            return plain_text() or ""
        return getattr(entity, "text", "") or ""
    return entity.dxf.text or ""


def dxf_text_extract(dxf_path: str) -> str:
    """
    Directly extract all text entities from a DXF file (no image needed).

    Collects TEXT, MTEXT and ATTDEF entities found in the modelspace plus the
    ATTRIB values attached to block references (INSERT entities), which is
    where tag values stored as block attributes live.

    Args:
        dxf_path: Path of the DXF file to read.

    Returns:
        The non-empty, stripped text values joined by newlines, or an empty
        string when the file cannot be read (the failure is logged, never
        raised).
    """
    try:
        import ezdxf

        doc = ezdxf.readfile(dxf_path)
        texts = []
        for entity in doc.modelspace():
            if entity.dxftype() == "INSERT":
                # ATTRIB entities are attached to their INSERT and are not
                # yielded when iterating the layout itself.
                candidates = list(getattr(entity, "attribs", ()))
            else:
                candidates = [entity]

            for candidate in candidates:
                if candidate.dxftype() not in _DXF_TEXT_TYPES:
                    continue
                try:
                    txt = _dxf_entity_text(candidate).strip()
                except Exception as exc:
                    # Best effort: one unreadable entity must not abort the
                    # whole extraction, but leave a trace for debugging.
                    log.debug(f"Skipping unreadable {candidate.dxftype()} entity: {exc}")
                    continue
                if txt:
                    texts.append(txt)
        return "\n".join(texts)
    except Exception as e:
        log.error(f"DXF text extraction failed: {e}")
        return ""
# ─────────────────────────────────────────────
# Claude Vision Analyzer
# ─────────────────────────────────────────────
# Prompt sent together with the drawing image by PIDAnalyzer.analyze_image.
# It asks the model for a bare JSON array whose object keys are the ones
# PIDAnalyzer._parse_response reads back into PIDItem fields.
# The text is used verbatim (no str.format), so its braces are literal.
EXTRACTION_PROMPT = """You are an expert P&ID (Piping and Instrumentation Diagram) engineer.
Analyze this P&ID drawing image and extract ALL of the following items:
1. **P&ID Drawing Number** (도면번호) - usually found in title block
2. **Tag Numbers** (태그번호) - e.g. FT-1001, PT-2003, LT-1005, E-101, V-201
3. **Equipment Names** (장비명) - e.g. Heat Exchanger, Pump, Vessel, Compressor
4. **Instrument Types** (계기타입) - e.g. Flow Transmitter, Pressure Indicator, Level Controller
5. **Line Numbers** (라인번호) - e.g. 6\"-P-1001-A1A, 3\"-IA-2001
For each item found, return a JSON array with this exact structure:
[
{
"pid_drawing_no": "P&ID drawing number or sheet number",
"tag_no": "instrument or equipment tag (e.g. FT-1001)",
"equipment_name": "descriptive name in English (e.g. Flow Transmitter)",
"instrument_type": "ISA instrument type abbreviation (e.g. FT, PT, LT, E, V, P)",
"line_number": "pipe line number if associated",
"service_description": "brief service description if visible",
"confidence": 0.0 to 1.0 confidence score
}
]
Rules:
- Extract EVERY tag and instrument visible, do not skip any
- If a field is not visible/applicable, use empty string ""
- Return ONLY valid JSON array, no markdown, no explanation
- confidence: 1.0 = clearly readable, 0.5 = partially legible, 0.2 = guessed
"""
# Prompt template used by PIDAnalyzer.analyze_dxf_text. It is filled in with
# str.format(text_content=...), which is why the JSON example uses doubled
# braces ({{ and }}): they are the escaped form of literal braces.
TEXT_EXTRACTION_PROMPT = """You are an expert P&ID engineer.
Below is raw text extracted from a DXF P&ID file.
Parse and extract ALL instrument tags, equipment tags, line numbers, and drawing info.
Text content:
{text_content}
Return a JSON array with this exact structure:
[
{{
"pid_drawing_no": "drawing number if found",
"tag_no": "tag number (e.g. FT-1001, E-101)",
"equipment_name": "equipment or instrument name",
"instrument_type": "ISA type abbreviation",
"line_number": "line number if found",
"service_description": "service description if found",
"confidence": 0.8
}}
]
Return ONLY valid JSON, no markdown.
"""
class PIDAnalyzer:
    """Claude-powered P&ID analyzer supporting both image and text modes.

    ``analyze_image`` sends a rendered drawing to the model, while
    ``analyze_dxf_text`` sends the text entities pulled out of a DXF file.
    Both return the model's JSON answer parsed into ``PIDItem`` objects.
    """

    # Model used when the caller does not choose one.
    DEFAULT_MODEL = "claude-opus-4-20250514"
    # Upper bound on generated tokens per request.
    MAX_TOKENS = 4096
    # Maximum number of DXF text characters embedded into the text prompt.
    MAX_TEXT_CHARS = 8000
    # File suffix -> media type declared for the uploaded image.
    _MEDIA_TYPES = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }

    def __init__(self, api_key: Optional[str] = None,
                 model: Optional[str] = None):
        """Create the API client.

        Args:
            api_key: API key; falls back to the ANTHROPIC_API_KEY environment
                variable when omitted.
            model: Model identifier; falls back to ``DEFAULT_MODEL``.
        """
        self.client = anthropic.Anthropic(
            api_key=api_key or os.environ.get("ANTHROPIC_API_KEY")
        )
        self.model = model or self.DEFAULT_MODEL

    def analyze_image(self, image_path: str) -> list[PIDItem]:
        """Analyze a P&ID image using Claude Vision.

        Args:
            image_path: Path of a PNG/JPEG/GIF/WebP image. Unknown suffixes
                are declared as PNG.

        Returns:
            The items parsed from the model's answer (possibly empty).
        """
        log.info(f"Analyzing image: {image_path}")
        with open(image_path, "rb") as f:
            image_data = base64.standard_b64encode(f.read()).decode("utf-8")

        suffix = Path(image_path).suffix.lower()
        media_type = self._MEDIA_TYPES.get(suffix, "image/png")

        response = self.client.messages.create(
            model=self.model,
            max_tokens=self.MAX_TOKENS,
            messages=[{
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": media_type,
                            "data": image_data,
                        },
                    },
                    {"type": "text", "text": EXTRACTION_PROMPT},
                ],
            }],
        )
        raw = self._response_text(response)
        return self._parse_response(raw, source_file=image_path)

    def analyze_dxf_text(self, dxf_path: str) -> list[PIDItem]:
        """Analyze a DXF by sending its text entities to Claude.

        Falls back to rendering the DXF and analyzing the image when the
        file contains no extractable text.

        Args:
            dxf_path: Path of the DXF file.

        Returns:
            The items parsed from the model's answer (possibly empty).
        """
        log.info(f"Analyzing DXF text: {dxf_path}")
        text_content = dxf_text_extract(dxf_path)
        if not text_content.strip():
            log.warning("No text found in DXF, falling back to image mode")
            results = []
            for img in dxf_to_image(dxf_path):
                results.extend(self.analyze_image(img))
            return results

        if len(text_content) > self.MAX_TEXT_CHARS:
            # The cut below silently drops everything past the limit, so make
            # the loss visible to whoever reads the log.
            log.warning(
                f"DXF text truncated from {len(text_content)} to "
                f"{self.MAX_TEXT_CHARS} characters; some tags may be missed"
            )
        prompt = TEXT_EXTRACTION_PROMPT.format(
            text_content=text_content[:self.MAX_TEXT_CHARS]  # token limit guard
        )
        response = self.client.messages.create(
            model=self.model,
            max_tokens=self.MAX_TOKENS,
            messages=[{"role": "user", "content": prompt}],
        )
        raw = self._response_text(response)
        return self._parse_response(raw, source_file=dxf_path)

    @staticmethod
    def _response_text(response) -> str:
        """Return the concatenated text blocks of an API response.

        An empty response yields "" instead of raising IndexError, and a
        response cut off by the token limit is reported because its JSON is
        likely to be incomplete.
        """
        if getattr(response, "stop_reason", None) == "max_tokens":
            log.warning("Response hit the max_tokens limit; JSON may be truncated")
        blocks = getattr(response, "content", None) or []
        return "".join(getattr(block, "text", "") or "" for block in blocks)

    @staticmethod
    def _as_text(value) -> str:
        """Coerce a JSON value to str, mapping null to an empty string."""
        return "" if value is None else str(value)

    @staticmethod
    def _as_confidence(value, default: float = 0.5) -> float:
        """Coerce a JSON value to float, using *default* when it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _parse_response(self, raw: str, source_file: str) -> list[PIDItem]:
        """Parse Claude's JSON response into a PIDItem list.

        Markdown code fences are stripped and the outermost ``[...]`` span is
        decoded as JSON. Array entries that are not objects are skipped.
        Null or non-string field values are coerced so that one malformed
        entry cannot discard the whole response.

        Args:
            raw: Raw text returned by the model.
            source_file: Path of the analyzed file; only its name is stored.

        Returns:
            The parsed items, or an empty list when no JSON array is found or
            it cannot be decoded.
        """
        # Strip markdown fences if present
        clean = re.sub(r"```(?:json)?|```", "", raw).strip()
        # Find JSON array
        match = re.search(r"\[.*\]", clean, re.DOTALL)
        if not match:
            log.warning("No JSON array found in response")
            return []
        try:
            data = json.loads(match.group())
        except json.JSONDecodeError as e:
            log.error(f"JSON parse error: {e}\nRaw: {raw[:500]}")
            return []

        source_name = Path(source_file).name
        items = []
        for d in data:
            if not isinstance(d, dict):
                continue
            items.append(PIDItem(
                pid_drawing_no=self._as_text(d.get("pid_drawing_no")),
                tag_no=self._as_text(d.get("tag_no")),
                equipment_name=self._as_text(d.get("equipment_name")),
                instrument_type=self._as_text(d.get("instrument_type")),
                line_number=self._as_text(d.get("line_number")),
                service_description=self._as_text(d.get("service_description")),
                confidence=self._as_confidence(d.get("confidence", 0.5)),
                source_file=source_name,
            ))
        log.info(f"Extracted {len(items)} items from {source_name}")
        return items
# ─────────────────────────────────────────────
# CSV Exporter
# ─────────────────────────────────────────────
# Column order of the exported CSV; the names match the PIDItem fields.
CSV_COLUMNS = [
    "pid_drawing_no",
    "tag_no",
    "equipment_name",
    "instrument_type",
    "line_number",
    "service_description",
    "confidence",
    "source_file",
    "extracted_at",
]
def export_csv(items: list[PIDItem], output_path: str) -> str:
    """Write the extracted items to a CSV file and return its path.

    The parent directory is created when missing. The file starts with a
    header row of ``CSV_COLUMNS`` followed by one row per item.

    Args:
        items: Items to export.
        output_path: Destination CSV path.

    Returns:
        ``output_path`` unchanged.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    # "utf-8-sig" prefixes the file with a BOM (presumably so spreadsheet
    # tools detect the encoding); newline="" is what the csv module expects.
    with open(output_path, "w", newline="", encoding="utf-8-sig") as handle:
        writer = csv.DictWriter(handle, fieldnames=CSV_COLUMNS)
        writer.writeheader()
        writer.writerows(entry.to_dict() for entry in items)

    log.info(f"CSV saved: {output_path} ({len(items)} rows)")
    return output_path
# ─────────────────────────────────────────────
# PostgreSQL Loader
# ─────────────────────────────────────────────
# DDL executed by load_to_postgres before inserting: creates the
# pid_equipment table and its lookup indexes when they do not exist yet.
# Apart from the serial primary key and created_at, the columns correspond
# to the PIDItem fields.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS pid_equipment (
id SERIAL PRIMARY KEY,
pid_drawing_no VARCHAR(100),
tag_no VARCHAR(100),
equipment_name VARCHAR(255),
instrument_type VARCHAR(50),
line_number VARCHAR(100),
service_description TEXT,
confidence FLOAT,
source_file VARCHAR(255),
extracted_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Useful indexes
CREATE INDEX IF NOT EXISTS idx_pid_tag_no ON pid_equipment(tag_no);
CREATE INDEX IF NOT EXISTS idx_pid_drawing_no ON pid_equipment(pid_drawing_no);
CREATE INDEX IF NOT EXISTS idx_pid_instrument_type ON pid_equipment(instrument_type);
"""
# Parameterized insert used by load_to_postgres; the %(name)s placeholders
# are filled from the dict returned by PIDItem.to_dict().
# NOTE(review): pid_equipment as created by CREATE_TABLE_SQL has no unique
# constraint other than the serial id, so ON CONFLICT DO NOTHING presumably
# never skips a row and re-loading the same items would insert duplicates.
# Confirm whether a unique key (e.g. drawing + tag) is intended.
INSERT_SQL = """
INSERT INTO pid_equipment
(pid_drawing_no, tag_no, equipment_name, instrument_type,
line_number, service_description, confidence, source_file, extracted_at)
VALUES
(%(pid_drawing_no)s, %(tag_no)s, %(equipment_name)s, %(instrument_type)s,
%(line_number)s, %(service_description)s, %(confidence)s,
%(source_file)s, %(extracted_at)s)
ON CONFLICT DO NOTHING;
"""
def load_to_postgres(items: list[PIDItem], dsn: str) -> int:
    """
    Load extracted items into PostgreSQL.

    Creates the target table when needed and inserts all items in a single
    transaction: either every row is committed or, on any error, the
    transaction is rolled back. The connection is always closed.

    Args:
        items: Items to insert.
        dsn: Connection string, e.g. postgresql://user:password@host:5432/dbname

    Returns:
        The cursor's row count after the batch insert, or 0 when psycopg2 is
        missing or a database error occurs (the failure is logged, never
        raised).
    """
    try:
        import psycopg2
    except ImportError:
        log.error("psycopg2 not installed. Run: pip install psycopg2-binary")
        return 0

    conn = None
    try:
        rows = [item.to_dict() for item in items]
        conn = psycopg2.connect(dsn)
        # "with conn" commits on success and rolls back on an exception;
        # "with conn.cursor()" closes the cursor. Neither closes the
        # connection, which is handled in the finally clause below.
        with conn, conn.cursor() as cur:
            # Create table if needed
            cur.execute(CREATE_TABLE_SQL)
            # Insert rows
            cur.executemany(INSERT_SQL, rows)
            count = cur.rowcount
        log.info(f"Inserted {count} rows into PostgreSQL")
        return count
    except Exception as e:
        log.error(f"PostgreSQL error: {e}")
        return 0
    finally:
        if conn is not None:
            conn.close()
# ─────────────────────────────────────────────
# AX (Asset Excellence) Formatter
# ─────────────────────────────────────────────
# PIDItem field name -> column header used in the AX import workbook.
# Insertion order is the left-to-right column order of the sheet.
AX_COLUMN_MAP = dict(
    tag_no="Tag Number",
    equipment_name="Asset Description",
    instrument_type="Equipment Class",
    pid_drawing_no="P&ID Reference",
    line_number="Line Reference",
    service_description="Service",
)
def export_ax_excel(items: list[PIDItem], output_path: str) -> str:
    """
    Export data in AX (Asset Excellence / Hexagon) compatible Excel format.

    The sheet "AX_Import" gets one styled header row followed by one row per
    item. Columns follow ``AX_COLUMN_MAP`` (field -> header, in map order)
    plus a trailing "Confidence" column that is colour coded: green for
    >= 0.8, yellow for >= 0.5, red otherwise.

    Args:
        items: Items to export.
        output_path: Destination .xlsx path; its parent directory is created
            when missing.

    Returns:
        ``output_path`` on success, or an empty string when openpyxl is
        missing or the export fails (the failure is logged, never raised).
    """
    try:
        import openpyxl
        from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
        from openpyxl.utils import get_column_letter

        # Same guarantee export_csv gives: the target directory exists.
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "AX_Import"

        # Style objects are built once and shared by all cells.
        header_fill = PatternFill("solid", fgColor="1F4E79")
        header_font = Font(color="FFFFFF", bold=True, size=11)
        header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
        cell_align = Alignment(vertical="center")
        thin = Side(style="thin", color="CCCCCC")
        border = Border(left=thin, right=thin, top=thin, bottom=thin)
        fill_high = PatternFill("solid", fgColor="C6EFCE")  # green
        fill_mid = PatternFill("solid", fgColor="FFEB9C")   # yellow
        fill_low = PatternFill("solid", fgColor="FFC7CE")   # red

        # Derive both headers and row values from AX_COLUMN_MAP so they can
        # never drift apart when the map is edited.
        item_fields = list(AX_COLUMN_MAP)
        ax_columns = [AX_COLUMN_MAP[name] for name in item_fields]
        ax_columns.append("Confidence")
        confidence_col = len(ax_columns)

        # Write header
        for col_idx, col_name in enumerate(ax_columns, start=1):
            cell = ws.cell(row=1, column=col_idx, value=col_name)
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = header_align
            cell.border = border

        # Write data rows
        for row_idx, item in enumerate(items, start=2):
            row_data = [getattr(item, name) for name in item_fields]
            row_data.append(item.confidence)
            for col_idx, value in enumerate(row_data, start=1):
                cell = ws.cell(row=row_idx, column=col_idx, value=value)
                cell.border = border
                cell.alignment = cell_align
                # Confidence color coding (ints count as numbers too)
                if col_idx == confidence_col and isinstance(value, (int, float)):
                    if value >= 0.8:
                        cell.fill = fill_high
                    elif value >= 0.5:
                        cell.fill = fill_mid
                    else:
                        cell.fill = fill_low

        # Column widths
        col_widths = [20, 35, 25, 20, 20, 30, 12]
        for i, w in enumerate(col_widths, start=1):
            ws.column_dimensions[get_column_letter(i)].width = w
        ws.row_dimensions[1].height = 35
        # Keep the header row visible while scrolling.
        ws.freeze_panes = "A2"

        wb.save(output_path)
        log.info(f"AX Excel saved: {output_path} ({len(items)} rows)")
        return output_path
    except ImportError:
        log.error("openpyxl not installed. Run: pip install openpyxl")
        return ""
    except Exception as e:
        log.error(f"Excel export error: {e}")
        return ""
# ─────────────────────────────────────────────
# Main Pipeline
# ─────────────────────────────────────────────
# Raster formats accepted directly; the same set PIDAnalyzer.analyze_image
# maps to a media type.
_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".gif", ".webp")


def _analyze_images(analyzer: "PIDAnalyzer", image_paths: list[str]) -> list:
    """Run image analysis on every path and return the combined items."""
    found = []
    for img in image_paths:
        found.extend(analyzer.analyze_image(img))
    return found


def run_pipeline(
    input_files: list[str],
    output_dir: str = "output",
    db_dsn: Optional[str] = None,
    use_image_mode: bool = False,
) -> dict:
    """
    Full pipeline: files → AI extraction → CSV + Excel + optional DB load.

    A failure while processing one input file is logged and does not stop
    the remaining files.

    Args:
        input_files: List of DXF, PDF or image file paths
        output_dir: Directory for output files
        db_dsn: PostgreSQL DSN (optional)
        use_image_mode: Force image conversion even for DXF

    Returns:
        Summary dict with the keys "total", "csv", "excel", "db_rows" and
        "files_processed". "csv" and "excel" are None when nothing was
        extracted; "excel" is also None when the Excel export failed.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    Path("logs").mkdir(exist_ok=True)

    analyzer = PIDAnalyzer()
    all_items: list[PIDItem] = []

    for file_path in input_files:
        suffix = Path(file_path).suffix.lower()
        log.info(f"Processing: {file_path}")
        try:
            if suffix == ".pdf":
                image_paths = pdf_to_images(file_path, output_dir)
                all_items.extend(_analyze_images(analyzer, image_paths))
            elif suffix == ".dxf":
                if use_image_mode:
                    image_paths = dxf_to_image(file_path, output_dir)
                    all_items.extend(_analyze_images(analyzer, image_paths))
                else:
                    all_items.extend(analyzer.analyze_dxf_text(file_path))
            elif suffix in _IMAGE_SUFFIXES:
                all_items.extend(analyzer.analyze_image(file_path))
            else:
                log.warning(f"Unsupported file type: {suffix}")
        except Exception as e:
            log.error(f"Failed processing {file_path}: {e}")

    if not all_items:
        log.warning("No items extracted from any file")
        # Same keys as the success summary so callers can read it uniformly.
        return {
            "total": 0,
            "csv": None,
            "excel": None,
            "db_rows": 0,
            "files_processed": len(input_files),
        }

    # Sort by drawing + tag; coerce to str so a missing or non-string value
    # coming back from the model cannot break the comparison.
    all_items.sort(key=lambda x: (str(x.pid_drawing_no or ""), str(x.tag_no or "")))

    # Export CSV
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = str(Path(output_dir) / f"pid_extracted_{ts}.csv")
    export_csv(all_items, csv_path)

    # Export AX Excel; the exporter returns "" on failure, which is reported
    # as None instead of a path to a file that was never written.
    excel_target = str(Path(output_dir) / f"pid_AX_import_{ts}.xlsx")
    excel_path = export_ax_excel(all_items, excel_target) or None

    # Load to DB
    db_rows = 0
    if db_dsn:
        db_rows = load_to_postgres(all_items, db_dsn)

    summary = {
        "total": len(all_items),
        "csv": csv_path,
        "excel": excel_path,
        "db_rows": db_rows,
        "files_processed": len(input_files),
    }
    log.info(f"Pipeline complete: {summary}")
    return summary
# ─────────────────────────────────────────────
# CLI Entry Point
# ─────────────────────────────────────────────
def build_arg_parser():
    """Build the command-line parser for the extractor."""
    import argparse

    parser = argparse.ArgumentParser(
        description="P&ID Extractor: DXF/PDF → CSV/Excel/PostgreSQL"
    )
    # The pipeline also accepts raster images, so say so in the help text.
    parser.add_argument("files", nargs="+",
                        help="DXF, PDF or image (PNG/JPG) file paths")
    parser.add_argument("--output-dir", default="output", help="Output directory")
    parser.add_argument("--db-dsn", help="PostgreSQL DSN (optional)")
    parser.add_argument("--image-mode", action="store_true",
                        help="Force DXF → image conversion (slower but more accurate)")
    return parser


def main(argv=None) -> dict:
    """Parse *argv* (default: sys.argv[1:]), run the pipeline, print the summary.

    Returns:
        The summary dict produced by ``run_pipeline``.
    """
    args = build_arg_parser().parse_args(argv)
    result = run_pipeline(
        input_files=args.files,
        output_dir=args.output_dir,
        db_dsn=args.db_dsn,
        use_image_mode=args.image_mode,
    )
    print("\n===== Extraction Summary =====")
    for k, v in result.items():
        print(f" {k}: {v}")
    return result


if __name__ == "__main__":
    main()