""" checker.py — core logic for Image + Text Compliance Check This module is UI-agnostic (no FastAPI/Gradio). Import its functions from app.py (Gradio) or an API layer. CPU-only; optional tiny HF classifier via env. """ from __future__ import annotations from typing import List, Optional, Dict, Any, Iterable, Union import os import re import json try: from PIL import Image # type: ignore except Exception: Image = None # Allows import without PIL when not doing OCR try: import pytesseract # type: ignore except Exception: pytesseract = None # ----------------------------- # Config & Constants # ----------------------------- COMPANY_NAME_DEFAULT = "Berkshire Hathaway HomeServices Beazley, REALTORS" COMPANY_PHONES_DEFAULT = ["7068631775", "8032337111"] DISCLAIMER_DEFAULT = ( "©2025 BHH Affiliates, LLC. An independently owned and operated franchisee of BHH Affiliates, LLC. " "Berkshire Hathaway HomeServices and the Berkshire Hathaway HomeServices symbol are registered service marks " "of Columbia Insurance Company, a Berkshire Hathaway affiliate. Equal Housing Opportunity." ) # Behavior toggle for social posts requiring disclaimer (choose True/False) REQUIRE_DISCLAIMER_ON_SOCIAL = os.getenv("REQUIRE_DISCLAIMER_ON_SOCIAL", "1") == "1" # Optional HF classifier (tiny) – set USE_TINY_ML=1 to enable USE_TINY_ML = os.getenv("USE_TINY_ML", "0") == "1" HF_REPO = os.getenv("HF_REPO", "tlogandesigns/fairhousing-bert-tiny") HF_THRESH = float(os.getenv("HF_THRESH", "0.75")) # Rule-based phrases file (optional). If present, we use it for flags. PHRASES_PATH = os.getenv("PHRASES_PATH", "phrases.yaml") # ----------------------------- # Utilities # ----------------------------- PHONE_RE = re.compile(r"\+?1?\D*([2-9]\d{2})\D*(\d{3})\D*(\d{4})") def normalize_phone(s: str) -> str: digits = re.sub(r"\D", "", s or "") if len(digits) == 11 and digits.startswith("1"): digits = digits[1:] return digits def count_phone_instances(text: str, target_numbers: Iterable[str]) -> int: targets = {normalize_phone(n) for n in (target_numbers or []) if n} count = 0 for m in PHONE_RE.finditer(text or ""): num = "".join(m.groups()) if num in targets: count += 1 return count def escape_name_regex(name: str) -> str: # Allow flexible whitespace and optional punctuation inside the name parts = [re.escape(p) for p in (name or "").split() if p] if not parts: return r"" # no name # Join with one-or-more whitespace OR punctuation between tokens return r"\b" + r"[\s\-.,]+".join(parts) + r"\b" def count_name_instances(text: str, name: str) -> int: if not (name or "").strip(): return 0 pattern = re.compile(escape_name_regex(name), re.IGNORECASE) return len(pattern.findall(text or "")) def contains_disclaimer(text: str, disclaimer: str) -> bool: if not disclaimer: return False # Relax matching a bit: compress whitespace in both def squeeze(s: str) -> str: return re.sub(r"\s+", " ", s or "").strip().lower() return squeeze(disclaimer) in squeeze(text) # ----------------------------- # Fair Housing Classifier (hybrid) # ----------------------------- try: import yaml # type: ignore except Exception: yaml = None PHRASE_PATTERNS: List[re.Pattern] = [] if yaml and os.path.exists(PHRASES_PATH): try: data = yaml.safe_load(open(PHRASES_PATH, "r", encoding="utf-8").read()) or {} for rx in data.get("patterns", []): # compile as case-insensitive PHRASE_PATTERNS.append(re.compile(rx, re.IGNORECASE)) except Exception as e: print("Failed loading phrases.yaml:", e) # Optional HF pipeline (disabled by default to keep CPU/lightweight) hf_pipe = None if USE_TINY_ML: try: from transformers import pipeline # type: ignore hf_pipe = pipeline("text-classification", model=HF_REPO) except Exception as e: print("HF model unavailable:", e) hf_pipe = None def fair_housing_flags(text: str) -> List[str]: flags: List[str] = [] t = text or "" # Rule-based first for pat in PHRASE_PATTERNS: for m in pat.finditer(t): snippet = t[max(0, m.start() - 30) : m.end() + 30] flags.append( f"RuleFlag: pattern '{pat.pattern}' matched around: {snippet!r}" ) # Optional tiny model if hf_pipe: try: pred = hf_pipe(t[:2000]) # keep it small # Expecting [{'label': 'LABEL_1'/'LABEL_0', 'score': 0.x}] or custom labels lbl = pred[0]["label"] score = float(pred[0]["score"]) # Assume LABEL_1 = potential violation (adjust to your model labels) if (lbl in ("1", "LABEL_1", "violation", "POSITIVE")) and score >= HF_THRESH: flags.append(f"MLFlag: model={HF_REPO} label={lbl} score={score:.2f}") except Exception as e: flags.append(f"MLFlag: inference error: {e}") return flags # ----------------------------- # Core evaluation logic # ----------------------------- def evaluate_section( text: str, social: bool, company_name: str, company_phones: List[str], agent_name: str, agent_phone: str, disclaimer: str, require_disclaimer_on_social: bool, ) -> Dict[str, Any]: flags: List[str] = [] # Counts company_name_count = count_name_instances(text, company_name) agent_name_count = count_name_instances(text, agent_name) office_phone_count = count_phone_instances(text, company_phones) agent_phone_count = count_phone_instances(text, [agent_phone] if agent_phone else []) # Equality checks name_equal = company_name_count == agent_name_count phone_equal = office_phone_count == agent_phone_count # Disclaimer logic disclaimer_ok = True if social and require_disclaimer_on_social: disclaimer_ok = contains_disclaimer(text, disclaimer) if not disclaimer_ok: flags.append("Missing disclaimer on social content") if not name_equal: flags.append( f"Name imbalance: company={company_name_count} vs agent={agent_name_count}" ) if not phone_equal: flags.append( f"Phone imbalance: office={office_phone_count} vs agent={agent_phone_count}" ) compliant = name_equal and phone_equal and disclaimer_ok return { "compliant": compliant, "Flags": flags, } # ----------------------------- # OCR helper (optional) # ----------------------------- def ocr_image(image: Union["Image.Image", bytes, None]) -> str: """OCR a PIL image or raw bytes. Returns empty string if OCR not available.""" if image is None or pytesseract is None: return "" try: if isinstance(image, bytes): if Image is None: return "" from io import BytesIO image = Image.open(BytesIO(image)).convert("RGB") return pytesseract.image_to_string(image) # type: ignore[arg-type] except Exception: return "" # ----------------------------- # Orchestration (UI-agnostic) # ----------------------------- def run_check( image: Optional["Image.Image"], ptxt: str, social: bool, agent_name: str, agent_phone: str, *, company_name: str = COMPANY_NAME_DEFAULT, company_phones: Optional[List[str]] = None, disclaimer: str = DISCLAIMER_DEFAULT, require_disclaimer_on_social: Optional[bool] = None, ) -> Dict[str, Any]: """ Execute full pipeline and return payload dict with keys: - Fair_Housing - img - Ptxt """ company_phones = company_phones or COMPANY_PHONES_DEFAULT if require_disclaimer_on_social is None: require_disclaimer_on_social = REQUIRE_DISCLAIMER_ON_SOCIAL itxt = ocr_image(image) # Compose combined content content = "\n\n".join(x for x in [itxt, ptxt or "", f"Social={social}"] if x) # Fair-housing flags on combined content fh_flags = fair_housing_flags(content) fair_housing_block = {"compliant": len(fh_flags) == 0, "Flags": fh_flags} # Evaluate image text section img_block = evaluate_section( text=itxt, social=social, company_name=company_name, company_phones=company_phones, agent_name=agent_name, agent_phone=agent_phone, disclaimer=disclaimer, require_disclaimer_on_social=require_disclaimer_on_social, ) # Evaluate post text section ptxt_block = evaluate_section( text=ptxt or "", social=social, company_name=company_name, company_phones=company_phones, agent_name=agent_name, agent_phone=agent_phone, disclaimer=disclaimer, require_disclaimer_on_social=require_disclaimer_on_social, ) return { "Fair_Housing": fair_housing_block, "img": img_block, "Ptxt": ptxt_block, } __all__ = [ "COMPANY_NAME_DEFAULT", "COMPANY_PHONES_DEFAULT", "DISCLAIMER_DEFAULT", "REQUIRE_DISCLAIMER_ON_SOCIAL", "USE_TINY_ML", "HF_REPO", "HF_THRESH", "PHRASES_PATH", "count_phone_instances", "count_name_instances", "contains_disclaimer", "fair_housing_flags", "evaluate_section", "ocr_image", "run_check", ]