Spaces:
Sleeping
Sleeping
| """ | |
| checker.py — core logic for Image + Text Compliance Check | |
| This module is UI-agnostic (no FastAPI/Gradio). Import its functions from | |
| app.py (Gradio) or an API layer. CPU-only; optional tiny HF classifier via env. | |
| """ | |
| from __future__ import annotations | |
| from typing import List, Optional, Dict, Any, Iterable, Union | |
| import os | |
| import re | |
| import json | |
| try: | |
| from PIL import Image # type: ignore | |
| except Exception: | |
| Image = None # Allows import without PIL when not doing OCR | |
| try: | |
| import pytesseract # type: ignore | |
| except Exception: | |
| pytesseract = None | |
| # ----------------------------- | |
| # Config & Constants | |
| # ----------------------------- | |
# Brokerage identity used when callers do not supply their own values.
COMPANY_NAME_DEFAULT = "Berkshire Hathaway HomeServices Beazley, REALTORS"
COMPANY_PHONES_DEFAULT = ["7068631775", "8032337111"]

# Legal disclaimer text that contains_disclaimer() looks for
# (whitespace-insensitive, case-insensitive).
DISCLAIMER_DEFAULT = (
    "©2025 BHH Affiliates, LLC. An independently owned and operated franchisee of BHH Affiliates, LLC. "
    "Berkshire Hathaway HomeServices and the Berkshire Hathaway HomeServices symbol are registered service marks "
    "of Columbia Insurance Company, a Berkshire Hathaway affiliate. Equal Housing Opportunity."
)

# Social posts must carry the disclaimer unless the env var is set to
# anything other than the exact string "1" (default: required).
REQUIRE_DISCLAIMER_ON_SOCIAL = os.environ.get("REQUIRE_DISCLAIMER_ON_SOCIAL", "1") == "1"

# Optional tiny Hugging Face classifier; only the exact string "1" enables it
# (default: disabled).
USE_TINY_ML = os.environ.get("USE_TINY_ML", "0") == "1"
HF_REPO = os.environ.get("HF_REPO", "tlogandesigns/fairhousing-bert-tiny")
# Minimum model score for a positive label to be reported as a flag.
HF_THRESH = float(os.environ.get("HF_THRESH", "0.75"))

# Optional YAML file of rule-based regex patterns; used for flags when present.
PHRASES_PATH = os.environ.get("PHRASES_PATH", "phrases.yaml")
| # ----------------------------- | |
| # Utilities | |
| # ----------------------------- | |
# Loose matcher for 10-digit phone numbers: optional "+", optional leading "1",
# then a 3-digit area code starting with 2-9, a 3-digit group and a 4-digit
# group, with any run of non-digit characters allowed between the parts.
PHONE_RE = re.compile(r"\+?1?\D*([2-9]\d{2})\D*(\d{3})\D*(\d{4})")


def normalize_phone(s: str) -> str:
    """Return only the digits of *s*, without a leading "1" country code.

    The leading "1" is dropped only when exactly eleven digits remain after
    stripping non-digits. A falsy input (``None`` or ``""``) yields ``""``.
    """
    only_digits = re.sub(r"\D", "", s or "")
    has_country_code = len(only_digits) == 11 and only_digits.startswith("1")
    return only_digits[1:] if has_country_code else only_digits
def count_phone_instances(text: str, target_numbers: Iterable[str]) -> int:
    """Count phone numbers in *text* that equal one of *target_numbers*.

    Targets are normalized with ``normalize_phone`` and falsy entries are
    ignored. Each ``PHONE_RE`` match in *text* is reduced to its ten captured
    digits and counted when it is one of the normalized targets. A falsy
    *text* or *target_numbers* gives a count of 0.
    """
    wanted = {normalize_phone(raw) for raw in (target_numbers or []) if raw}
    haystack = text or ""
    return sum(
        1
        for hit in PHONE_RE.finditer(haystack)
        if "".join(hit.groups()) in wanted
    )
def escape_name_regex(name: str) -> str:
    """Build a regex source string that matches *name* with flexible separators.

    The name is split on whitespace and each token is escaped literally.
    Tokens are joined by ``[\\s\\-.,]+`` so that any run of whitespace,
    hyphens, dots or commas may separate them in the searched text.

    The pattern is anchored with ``(?<!\\w)`` / ``(?!\\w)`` lookarounds rather
    than ``\\b``. ``\\b`` needs a word character on one side, so a name ending
    in punctuation (for example "Jr." or "Inc.") could never match when
    followed by a space or the end of the text. For names whose first and
    last characters are word characters the two forms are equivalent.

    Returns an empty string when *name* is empty, ``None`` or whitespace-only.
    """
    # str.split() with no argument never yields empty tokens.
    tokens = [re.escape(tok) for tok in (name or "").split()]
    if not tokens:
        return r""  # no name
    return r"(?<!\w)" + r"[\s\-.,]+".join(tokens) + r"(?!\w)"
def count_name_instances(text: str, name: str) -> int:
    """Count case-insensitive occurrences of *name* in *text*.

    The search pattern comes from ``escape_name_regex(name)``. An empty,
    ``None`` or whitespace-only *name* always counts as 0, and a falsy *text*
    is searched as the empty string.
    """
    if (name or "").strip() == "":
        return 0
    matcher = re.compile(escape_name_regex(name), flags=re.IGNORECASE)
    return sum(1 for _ in matcher.finditer(text or ""))
def contains_disclaimer(text: str, disclaimer: str) -> bool:
    """Report whether *disclaimer* appears in *text*, ignoring case and spacing.

    Both strings are normalized the same way before the substring test: runs
    of whitespace collapse to one space, the ends are stripped, and the
    result is lower-cased. A falsy *disclaimer* never matches.
    """
    if not disclaimer:
        return False
    needle, haystack = (
        re.sub(r"\s+", " ", part or "").strip().lower()
        for part in (disclaimer, text)
    )
    return needle in haystack
| # ----------------------------- | |
| # Fair Housing Classifier (hybrid) | |
| # ----------------------------- | |
| try: | |
| import yaml # type: ignore | |
| except Exception: | |
| yaml = None | |
# Compiled, case-insensitive rule patterns loaded once at import time from the
# optional YAML file at PHRASES_PATH (top-level key "patterns"). Stays empty
# when PyYAML is not installed, the file is missing, or loading fails.
PHRASE_PATTERNS: List[re.Pattern] = []
if yaml and os.path.exists(PHRASES_PATH):
    try:
        # The context manager closes the handle even if parsing raises.
        with open(PHRASES_PATH, "r", encoding="utf-8") as _phrases_file:
            data = yaml.safe_load(_phrases_file.read()) or {}
        # "patterns: null" in the file is treated as an empty list.
        for rx in data.get("patterns") or []:
            try:
                # compile as case-insensitive
                PHRASE_PATTERNS.append(re.compile(rx, re.IGNORECASE))
            except (re.error, TypeError) as e:
                # One malformed or non-string entry must not discard the
                # remaining valid patterns.
                print(f"Skipping invalid phrase pattern {rx!r}:", e)
    except Exception as e:
        # Best-effort load: the module must stay importable without rules.
        print("Failed loading phrases.yaml:", e)
| # Optional HF pipeline (disabled by default to keep CPU/lightweight) | |
# Text-classification pipeline for HF_REPO, or None when USE_TINY_ML is off
# or the model / transformers package cannot be loaded.
hf_pipe = None
if USE_TINY_ML:
    try:
        from transformers import pipeline  # type: ignore

        _classifier = pipeline("text-classification", model=HF_REPO)
    except Exception as load_error:
        # Any import or model-loading failure leaves the classifier disabled.
        print("HF model unavailable:", load_error)
    else:
        hf_pipe = _classifier
def fair_housing_flags(text: str) -> List[str]:
    """Collect fair-housing warnings for *text* from rules and the optional model.

    Every match of every pattern in ``PHRASE_PATTERNS`` yields one "RuleFlag"
    entry quoting up to 30 characters of context on each side of the match.
    When ``hf_pipe`` is set, the first 2000 characters are classified. An
    "MLFlag" entry is added if the top prediction carries one of the
    positive labels with a score of at least ``HF_THRESH``. An exception
    during inference is reported as an "MLFlag: inference error" entry
    instead of being raised.
    """
    body = text or ""

    def _context(hit) -> str:
        # Slice of the text surrounding a match, clamped at the start.
        lo = max(0, hit.start() - 30)
        return body[lo : hit.end() + 30]

    # Rule-based pass
    found: List[str] = [
        f"RuleFlag: pattern '{rule.pattern}' matched around: {_context(hit)!r}"
        for rule in PHRASE_PATTERNS
        for hit in rule.finditer(body)
    ]

    # Optional model pass
    if not hf_pipe:
        return found
    try:
        top = hf_pipe(body[:2000])[0]  # keep the model input small
        label = top["label"]
        confidence = float(top["score"])
        # These label names are assumed to mean "potential violation";
        # adjust them to the labels of the configured model.
        is_positive = label in ("1", "LABEL_1", "violation", "POSITIVE")
        if is_positive and confidence >= HF_THRESH:
            found.append(f"MLFlag: model={HF_REPO} label={label} score={confidence:.2f}")
    except Exception as err:
        found.append(f"MLFlag: inference error: {err}")
    return found
| # ----------------------------- | |
| # Core evaluation logic | |
| # ----------------------------- | |
def evaluate_section(
    text: str,
    social: bool,
    company_name: str,
    company_phones: List[str],
    agent_name: str,
    agent_phone: str,
    disclaimer: str,
    require_disclaimer_on_social: bool,
) -> Dict[str, Any]:
    """Check one piece of text for name/phone balance and the disclaimer.

    The text is compliant when the company name appears as many times as the
    agent name, the office phones appear as many times as the agent phone,
    and the disclaimer is present. The disclaimer is checked only when both
    *social* and *require_disclaimer_on_social* are true.

    Returns a dict with ``"compliant"`` (bool) and ``"Flags"`` (list of
    human-readable messages, one per failed check).
    """
    # Occurrence counts for both parties
    n_company_names = count_name_instances(text, company_name)
    n_agent_names = count_name_instances(text, agent_name)
    n_office_phones = count_phone_instances(text, company_phones)
    agent_numbers = [agent_phone] if agent_phone else []
    n_agent_phones = count_phone_instances(text, agent_numbers)

    names_balanced = n_company_names == n_agent_names
    phones_balanced = n_office_phones == n_agent_phones

    # The disclaimer is mandatory only for social content when the toggle is on.
    has_disclaimer = True
    if social and require_disclaimer_on_social:
        has_disclaimer = contains_disclaimer(text, disclaimer)

    issues: List[str] = []
    if not has_disclaimer:
        issues.append("Missing disclaimer on social content")
    if not names_balanced:
        issues.append(
            f"Name imbalance: company={n_company_names} vs agent={n_agent_names}"
        )
    if not phones_balanced:
        issues.append(
            f"Phone imbalance: office={n_office_phones} vs agent={n_agent_phones}"
        )

    return {
        "compliant": names_balanced and phones_balanced and has_disclaimer,
        "Flags": issues,
    }
| # ----------------------------- | |
| # OCR helper (optional) | |
| # ----------------------------- | |
def ocr_image(image: Union["Image.Image", bytes, None]) -> str:
    """Extract text from an image using pytesseract.

    Accepts an image object or raw encoded bytes. Bytes are opened with PIL
    and converted to RGB first. Returns ``""`` when *image* is ``None``,
    when pytesseract is unavailable, when bytes are given but PIL is
    unavailable, or when any step raises.
    """
    if image is None or pytesseract is None:
        return ""
    try:
        picture = image
        if isinstance(picture, bytes):
            if Image is None:
                return ""
            import io

            picture = Image.open(io.BytesIO(picture)).convert("RGB")
        return pytesseract.image_to_string(picture)  # type: ignore[arg-type]
    except Exception:
        # OCR is best-effort: a failure is reported as "no text".
        return ""
| # ----------------------------- | |
| # Orchestration (UI-agnostic) | |
| # ----------------------------- | |
def run_check(
    image: Optional["Image.Image"],
    ptxt: str,
    social: bool,
    agent_name: str,
    agent_phone: str,
    *,
    company_name: str = COMPANY_NAME_DEFAULT,
    company_phones: Optional[List[str]] = None,
    disclaimer: str = DISCLAIMER_DEFAULT,
    require_disclaimer_on_social: Optional[bool] = None,
) -> Dict[str, Any]:
    """Run the whole compliance pipeline for an image plus its post text.

    Steps: OCR the image, scan the combined content for fair-housing flags,
    then evaluate the image text and the post text separately with
    ``evaluate_section``.

    A falsy *company_phones* falls back to ``COMPANY_PHONES_DEFAULT``. A
    *require_disclaimer_on_social* of ``None`` falls back to
    ``REQUIRE_DISCLAIMER_ON_SOCIAL``.

    Returns a dict with three blocks, each holding ``"compliant"`` and
    ``"Flags"``:
    - Fair_Housing
    - img
    - Ptxt
    """
    if not company_phones:
        company_phones = COMPANY_PHONES_DEFAULT
    if require_disclaimer_on_social is None:
        require_disclaimer_on_social = REQUIRE_DISCLAIMER_ON_SOCIAL

    image_text = ocr_image(image)
    post_text = ptxt or ""

    # Combined content: non-empty OCR text, non-empty post text, and a
    # "Social=..." marker, separated by blank lines.
    pieces = [image_text, post_text, f"Social={social}"]
    combined = "\n\n".join(piece for piece in pieces if piece)

    # Fair-housing scan runs once over the combined content.
    fh_flags = fair_housing_flags(combined)

    # Both sections are evaluated with identical settings; only the text differs.
    shared = dict(
        social=social,
        company_name=company_name,
        company_phones=company_phones,
        agent_name=agent_name,
        agent_phone=agent_phone,
        disclaimer=disclaimer,
        require_disclaimer_on_social=require_disclaimer_on_social,
    )
    img_block = evaluate_section(text=image_text, **shared)
    ptxt_block = evaluate_section(text=post_text, **shared)

    return {
        "Fair_Housing": {"compliant": not fh_flags, "Flags": fh_flags},
        "img": img_block,
        "Ptxt": ptxt_block,
    }
# Names exported by `from checker import *`.
__all__ = [
    # Configuration constants
    "COMPANY_NAME_DEFAULT",
    "COMPANY_PHONES_DEFAULT",
    "DISCLAIMER_DEFAULT",
    "REQUIRE_DISCLAIMER_ON_SOCIAL",
    "USE_TINY_ML",
    "HF_REPO",
    "HF_THRESH",
    "PHRASES_PATH",
    # Text checks
    "count_phone_instances",
    "count_name_instances",
    "contains_disclaimer",
    "fair_housing_flags",
    "evaluate_section",
    # OCR and orchestration
    "ocr_image",
    "run_check",
]