# Step 6 — PII scrubbing + style tags (no Harmony here)
import json, re, unicodedata
from pathlib import Path
# --- Normalization helpers ---
# Dash look-alikes that should collapse to ASCII "-" before pattern matching.
# Spelled as explicit code points so the table survives editors and pastes
# that silently fold exotic dashes into a plain hyphen.
HYPHENS = dict.fromkeys(
    (
        0x2010,  # HYPHEN
        0x2011,  # NON-BREAKING HYPHEN
        0x2012,  # FIGURE DASH
        0x2013,  # EN DASH
        0x2014,  # EM DASH
        0x2015,  # HORIZONTAL BAR
        0x2212,  # MINUS SIGN (NFKC leaves it alone, so it must be mapped here)
        0xFE58,  # SMALL EM DASH
        0xFE63,  # SMALL HYPHEN-MINUS
        0xFF0D,  # FULLWIDTH HYPHEN-MINUS
    ),
    ord("-"),
)


def normalize(s: str) -> str:
    """Return *s* in NFKC form with dash look-alikes replaced by ASCII "-".

    NFKC folds compatibility characters (for example fullwidth digits) into
    their plain equivalents; the HYPHENS table then maps the remaining dash
    variants to "-" so digit-group patterns see one separator character.

    Non-string input is returned unchanged.
    """
    if not isinstance(s, str):
        return s
    return unicodedata.normalize("NFKC", s).translate(HYPHENS)
# --- PII patterns (illustrative; tune for production) ---
# E-mail address: local part, "@", domain, then a dot and a TLD of at least
# two ASCII letters.
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# KR mobile numbers: prefix 010/011/016/017/018/019, then 3-4 digits, then
# 4 digits. Each group may be separated by one hyphen or whitespace character,
# or by nothing at all: 010-1234-5678, 010 1234 5678, 01012345678.
RE_PHONE = re.compile(r"\b01[016789][-\s]?\d{3,4}[-\s]?\d{4}\b")
# Korean resident registration number (RRN), basic shape only:
# six digits, a hyphen, seven digits. No checksum or date validation.
RE_RRN = re.compile(r"\b\d{6}-\d{7}\b")
# Bank-ish account numbers: three hyphen-separated digit groups of
# 2-3, 2-4 and 3-6 digits. Digits only, so codes with letters inside the
# match are excluded. NOTE: a hyphenated mobile number such as 010-1234-5678
# also fits this shape.
RE_ACCOUNT = re.compile(r"\b\d{2,3}-\d{2,4}-\d{3,6}\b")
# Very simple postal-address cue: the full official names of top-level regions
# (special/metropolitan cities and provinces). Conservative: only the listed
# literal names match, and only that token is redacted (optional).
RE_CITY = re.compile(r"(서울특별시|부산광역시|대구광역시|인천광역시|광주광역시|대전광역시|울산광역시|세종특별자치시|경기도|강원도|충청북도|충청남도|전라북도|전라남도|경상북도|경상남도|제주특별자치도)")
# Allowlist helper: some strings look like PII but are not
# (e.g. bill/order codes that contain letters).
def looks_like_code(s: str) -> bool:
    """Return True when *s* contains at least one ASCII letter (A-Z or a-z).

    Used to tell letter-bearing codes apart from account or phone numbers.
    Non-ASCII letters (for example Hangul) do not count.
    """
    return re.search(r"[A-Za-z]", s) is not None
# (label, pattern) pairs, applied top to bottom. Each pass rewrites the text
# that the next pass sees, so order matters.
# [PHONE] must stay ahead of [ACCOUNT]: a hyphenated mobile number such as
# 010-1234-5678 also fits the three-group RE_ACCOUNT shape and would otherwise
# be labelled [ACCOUNT].
SCRUBBERS = [
    ("[RRN]", RE_RRN),
    ("[EMAIL]", RE_EMAIL),
    ("[PHONE]", RE_PHONE),
    ("[ACCOUNT]", RE_ACCOUNT),
    ("[CITY]", RE_CITY), # optional; comment out if you don't want to redact city tokens
]
def _code_token(text: str, start: int, end: int) -> str:
    """Return the unbroken run of ASCII letters, digits and hyphens around text[start:end]."""

    def joins(ch: str) -> bool:
        return ch == "-" or (ch.isascii() and ch.isalnum())

    while start > 0 and joins(text[start - 1]):
        start -= 1
    while end < len(text) and joins(text[end]):
        end += 1
    return text[start:end]


def scrub_text(text: str) -> tuple[str, dict]:
    """Replace PII in *text* with placeholder labels.

    The text is first passed through normalize(), then every (label, pattern)
    pair in SCRUBBERS is applied in order, each pass working on the output of
    the previous one.

    Allowlisting: RE_ACCOUNT and RE_PHONE can only match digits and
    separators, so a match by itself never contains letters. To spare
    order/bill codes such as "ORD-123-4567-890", each account/phone match is
    widened to the surrounding run of ASCII letters, digits and hyphens; if
    that run contains a letter, the match is treated as part of a code and
    left alone. The check is made against the current text for every match,
    so earlier replacements cannot invalidate it.

    Args:
        text: The string to scrub. Non-string or empty input is returned
            unchanged with an empty hits dict.

    Returns:
        (scrubbed_text, hits), where hits maps each label that fired to its
        number of replacements. The returned text is the normalized form even
        when nothing was redacted.
    """
    if not isinstance(text, str) or not text:
        return text, {}
    text = normalize(text)
    hits = {}
    for label, pattern in SCRUBBERS:
        # Only the digit-group patterns can collide with letter-bearing codes.
        check_codes = pattern in (RE_ACCOUNT, RE_PHONE)
        pieces = []
        cursor = 0
        count = 0
        for m in pattern.finditer(text):
            start, end = m.span()
            if check_codes and looks_like_code(_code_token(text, start, end)):
                continue
            pieces.append(text[cursor:start])
            pieces.append(label)
            cursor = end
            count += 1
        if count:
            pieces.append(text[cursor:])
            text = "".join(pieces)
            hits[label] = hits.get(label, 0) + count
    return text, hits
def scrub_record(rec: dict, kind: str) -> tuple[dict, dict]:
    """Scrub the text fields of a news/chat record.

    Args:
        rec: The input record. It is shallow-copied; the caller's top-level
            dict and its "dialog" list are not modified.
        kind: "news" scrubs "title", "summary" and "topic"; "chat" scrubs
            "style" and every string element of the "dialog" list. Any other
            value leaves the record untouched.

    Returns:
        (new_rec, hits), where hits maps each PII label to the total number
        of replacements across all scrubbed fields.

    Fields that are absent from *rec* stay absent: scrubbing never adds keys.
    """
    rec = dict(rec)  # shallow copy
    total_hits = {}

    def scrub_value(val):
        """Scrub *val* if it is a string, tally its hits, and return the result."""
        if not isinstance(val, str):
            return val
        new, hits = scrub_text(val)
        for label, n in hits.items():
            total_hits[label] = total_hits.get(label, 0) + n
        return new

    if kind == "news":
        for key in ("title", "summary", "topic"):
            if key in rec:  # do not invent `key: None` for missing fields
                rec[key] = scrub_value(rec[key])
    elif kind == "chat":
        if "style" in rec:
            rec["style"] = scrub_value(rec["style"])
        dialog = rec.get("dialog")
        if isinstance(dialog, list):
            # NOTE(review): non-string turns (e.g. dicts) pass through
            # unscrubbed - confirm the dialog schema is a list of strings.
            rec["dialog"] = [scrub_value(turn) for turn in dialog]
    return rec, total_hits
# --- Style tagger (lightweight labels for later routing/metrics) ---
def build_style_tags(rec: dict, kind: str) -> list[str]:
    """Build lightweight "key:value" labels for later routing/metrics.

    For kind "news" the tags are domain (from "topic", default "unknown"),
    style (from "style", default "news"), tone:formal and medium:news.
    For kind "chat" the tags are style (lower-cased "style", default "chat"),
    tone (formal if the style contains "formal", otherwise casual) and
    medium:kakao. Any other kind yields an empty list.

    Spaces inside every tag are replaced with underscores.
    """
    if kind == "news":
        domain = rec.get("topic") or "unknown"
        news_style = rec.get("style") or "news"
        raw_tags = [
            "domain:" + domain,
            "style:" + news_style,
            "tone:formal",
            "medium:news",
        ]
    elif kind == "chat":
        chat_style = (rec.get("style") or "").lower()
        tone = "formal" if "formal" in chat_style else "casual"
        raw_tags = [
            "style:" + (chat_style or "chat"),
            "tone:" + tone,
            "medium:kakao",
        ]
    else:
        raw_tags = []
    return [tag.replace(" ", "_") for tag in raw_tags]
# --- Process files ---
def process_file(src: str, dst: str, kind: str):
    """Scrub and tag every JSONL record in *src*, writing the results to *dst*.

    Blank lines are skipped. Every other line is parsed as JSON, passed
    through scrub_record, and written out as one JSON line (non-ASCII kept
    as-is) with two extra keys:

    * "style_tags": output of build_style_tags on the scrubbed record
    * "_pii_hits": per-label redaction counts for that record (kept for
      inspection; drop later if you want)

    A one-line summary (row count, rows with at least one redaction, and
    per-label totals) is printed once the file is done.
    """
    row_count = 0
    redacted_count = 0
    label_totals = {}
    with open(src, encoding="utf-8") as reader, open(dst, "w", encoding="utf-8") as writer:
        for raw_line in reader:
            if raw_line.strip() == "":
                continue
            record = json.loads(raw_line)
            row_count += 1
            scrubbed, row_hits = scrub_record(record, kind)
            scrubbed["style_tags"] = build_style_tags(scrubbed, kind)
            scrubbed["_pii_hits"] = row_hits
            if row_hits:
                redacted_count += 1
                for label, n in row_hits.items():
                    label_totals[label] = label_totals.get(label, 0) + n
            payload = json.dumps(scrubbed, ensure_ascii=False)
            writer.write(f"{payload}\n")
    print(f"{src} -> {dst} | rows: {row_count}, redacted_rows: {redacted_count}, hits: {label_totals}")
# Run the scrub + tagging pass over both corpora: news first, then chat.
for _src, _dst, _kind in (
    ("data/news.jsonl", "data/news_clean.jsonl", "news"),
    ("data/chat.jsonl", "data/chat_clean.jsonl", "chat"),
):
    process_file(_src, _dst, kind=_kind)