"""Compliance checks for marketing content (free text and OCR'd images).

The module counts company/agent name and phone mentions, looks for the
required disclaimer, applies regex phrase rules loaded from a YAML file and,
optionally, scores the text with a Hugging Face text-classification model.
``run_check`` is the main entry point.
"""
from __future__ import annotations

import json
import logging
import os
import re
import smtplib
import threading
from dataclasses import dataclass
from email.mime.text import MIMEText
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

# Optional third-party dependencies: each one degrades to ``None`` when it
# cannot be imported, and the code below checks for that before using it.
try:
    from PIL import Image  # type: ignore
except Exception:
    Image = None

try:
    import pytesseract  # type: ignore
except Exception:
    pytesseract = None

try:
    import yaml  # type: ignore
except Exception:
    yaml = None

logger = logging.getLogger(__name__)

COMPANY_NAME_DEFAULT = "Berkshire Hathaway HomeServices Beazley, REALTORS"
COMPANY_PHONES_DEFAULT = ["7068631775", "8032337111"]
DISCLAIMER_DEFAULT = (
    "©2025 BHH Affiliates, LLC. An independently owned and operated franchisee of BHH Affiliates, LLC. "
    "Berkshire Hathaway HomeServices and the Berkshire Hathaway HomeServices symbol are registered service marks "
    "of Columbia Insurance Company, a Berkshire Hathaway affiliate. Equal Housing Opportunity."
)

# Environment-driven configuration, read once at import time.
REQUIRE_DISCLAIMER_ON_NON_SOCIAL = os.getenv("REQUIRE_DISCLAIMER_ON_NON_SOCIAL", "1") == "1"
USE_TINY_ML = os.getenv("USE_TINY_ML", "1") == "1"
HF_REPO = os.getenv("HF_REPO", "tlogandesigns/fairhousing-bert-tiny")
HF_THRESH = float(os.getenv("HF_THRESH", "0.75"))
# Lower-cased classifier labels that count as "violation".
ML_POSITIVE_LABELS = {
    s.strip().lower()
    for s in re.split(
        r"\s*,\s*",
        os.getenv("ML_POSITIVE_LABELS", "Potential Violation,violation,positive,LABEL_1,1"),
    )
    if s.strip()
}

BASE_DIR = Path(__file__).parent
PHRASES_PATH = Path(os.getenv("PHRASES_PATH", str(BASE_DIR / "phrases.yaml")))

EMAIL_ON_FAILURE = os.getenv("EMAIL_ON_FAILURE", "0") == "1"
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 587))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
EMAIL_RECIPIENT = os.getenv("EMAIL_RECIPIENT")

# Socket timeout for the notification mail so the worker thread cannot block
# indefinitely on an unresponsive server.
_SMTP_TIMEOUT_SECONDS = 30

# Optional "+"/"1" prefix, then 3-3-4 digit groups separated by any run of
# non-digits; the first digit of the area code must be 2-9.
PHONE_RE = re.compile(r"\+?1?\D*([2-9]\d{2})\D*(\d{3})\D*(\d{4})")


def normalize_phone(s: str) -> str:
    """Return only the digits of *s*, dropping a leading ``1`` from 11-digit input."""
    digits = re.sub(r"\D", "", s or "")
    if len(digits) == 11 and digits.startswith("1"):
        digits = digits[1:]
    return digits


def count_phone_instances(text: str, target_numbers: Iterable[str]) -> int:
    """Count phone numbers in *text* that equal one of *target_numbers*.

    Targets are normalised with ``normalize_phone``; numbers found in *text*
    are compared by their ten captured digits.
    """
    targets = {normalize_phone(n) for n in (target_numbers or []) if n}
    return sum(
        1 for m in PHONE_RE.finditer(text or "") if "".join(m.groups()) in targets
    )


def escape_name_regex(name: str) -> str:
    """Build a regex source string that matches *name* with flexible separators.

    The whitespace-separated words of *name* are escaped and may be separated
    in the text by any run of whitespace, hyphens, periods or commas. Returns
    an empty pattern for an empty name.
    """
    parts = [re.escape(p) for p in (name or "").split() if p]
    if not parts:
        return r""
    # Lookarounds instead of ``\b``: they agree with ``\b`` when the name
    # starts/ends with a word character, and still match when it starts or
    # ends with punctuation (e.g. "Acme Inc."), where ``\b`` cannot.
    return r"(?<!\w)" + r"[\s\-.,]+".join(parts) + r"(?!\w)"


def count_name_instances(text: str, name: str) -> int:
    """Count case-insensitive occurrences of *name* in *text* (0 for a blank name)."""
    if not (name or "").strip():
        return 0
    pattern = re.compile(escape_name_regex(name), re.IGNORECASE)
    return len(pattern.findall(text or ""))


def contains_disclaimer(text: str, disclaimer: str) -> bool:
    """Return True if *disclaimer* occurs in *text*, ignoring case and whitespace runs.

    An empty disclaimer is never considered present.
    """
    if not disclaimer:
        return False

    def squeeze(s: str) -> str:
        return re.sub(r"\s+", " ", s or "").strip().lower()

    return squeeze(disclaimer) in squeeze(text)


@dataclass
class Rule:
    """A compiled phrase pattern with its category and suggested alternatives."""

    regex: re.Pattern
    category: str
    suggests: list[str]


def _load_phrase_rules(path: Path) -> Tuple[List[Rule], Optional[str]]:
    """Load phrase rules from the YAML file at *path*.

    Two layouts are accepted: a ``categories`` mapping whose values carry
    ``patterns`` and ``suggest`` lists, or a flat top-level ``patterns`` list
    (filed under ``"Uncategorized"``). Non-string patterns are skipped.

    Returns ``(rules, error)``. On failure ``error`` is a message and
    ``rules`` holds whatever was compiled before the failure.
    """
    rules: List[Rule] = []
    try:
        data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
        if isinstance(data, dict) and "categories" in data:
            for cat_name, cfg in (data["categories"] or {}).items():
                if not isinstance(cfg, dict):
                    continue
                suggests = cfg.get("suggest") or []
                for rx in cfg.get("patterns") or []:
                    if isinstance(rx, str):
                        rules.append(
                            Rule(
                                regex=re.compile(rx, re.IGNORECASE),
                                category=str(cat_name),
                                suggests=[str(s) for s in suggests if isinstance(s, str)],
                            )
                        )
        else:
            for rx in (data or {}).get("patterns") or []:
                if isinstance(rx, str):
                    rules.append(
                        Rule(
                            regex=re.compile(rx, re.IGNORECASE),
                            category="Uncategorized",
                            suggests=[],
                        )
                    )
    except FileNotFoundError:
        return rules, f"phrases.yaml not found at {path}"
    except Exception as e:
        return rules, f"phrases.yaml load/parse error: {e}"
    return rules, None


PHRASE_RULES: list[Rule] = []
PHRASES_ERROR: Optional[str] = None
if yaml:
    PHRASE_RULES, PHRASES_ERROR = _load_phrase_rules(PHRASES_PATH)


def _optimize_inference(pipe) -> None:
    """Best-effort torch tuning for *pipe*; every step is optional.

    Disables autograd, sets the torch thread count to half of
    ``os.cpu_count()`` (at least 1) and dynamically quantizes the model's
    ``Linear`` layers to int8. Any failure leaves the pipeline as it was.
    """
    try:
        import torch

        torch.set_grad_enabled(False)
        try:
            threads = max(1, (os.cpu_count() or 2) // 2)
            torch.set_num_threads(threads)
        except Exception:
            logger.debug("Could not set torch thread count", exc_info=True)
        try:
            from torch.ao.quantization import quantize_dynamic

            pipe.model.eval()
            pipe.model = quantize_dynamic(pipe.model, {torch.nn.Linear}, dtype=torch.qint8)
        except Exception:
            logger.debug("Dynamic quantization skipped", exc_info=True)
    except Exception:
        logger.debug("torch tuning skipped", exc_info=True)


def _load_hf_pipeline():
    """Create the text-classification pipeline for ``HF_REPO``.

    Raises:
        RuntimeError: if ``transformers`` cannot be imported or the pipeline
            cannot be constructed.
    """
    try:
        os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
        from transformers import pipeline  # type: ignore

        pipe = pipeline("text-classification", model=HF_REPO)
    except Exception as e:
        raise RuntimeError(
            f"USE_TINY_ML=1 but Transformers/model failed to load: {e}. "
            "Check requirements.txt, apt.txt, HF_REPO, and network."
        ) from e

    _optimize_inference(pipe)
    try:
        # One throwaway call at import time; a failure here is ignored.
        _ = pipe("warmup")
    except Exception:
        logger.debug("Model warmup failed", exc_info=True)
    return pipe


# Loaded eagerly at import time; a load failure is deliberately fatal when
# USE_TINY_ML is enabled.
hf_pipe = _load_hf_pipeline() if USE_TINY_ML else None


def _top1_violation_score(pipe, text: str) -> float:
    """Score *text* from a top-1 prediction only.

    Returns the prediction's score when its label is in
    ``ML_POSITIVE_LABELS``; otherwise returns ``1 - score``, the same
    complement rule used for the ``non-violation`` label elsewhere, so that a
    confident negative prediction is not reported as a violation.
    """
    preds = pipe(text)
    if not (isinstance(preds, list) and preds):
        return 0.0
    top = preds[0]
    label = str(top.get("label", "")).lower()
    score = float(top.get("score", 0.0))
    if label in ML_POSITIVE_LABELS:
        return score
    return 1.0 - score


def _violation_score(pipe, text: str) -> float:
    """Return the model's violation score for *text* in ``[0, 1]``.

    All label scores are requested first. A ``TypeError`` (e.g. the pipeline
    rejecting the keyword) falls back to a top-1 prediction; any other error
    yields ``0.0``. The per-label scores are then resolved in order:

    1. the first label found in ``ML_POSITIVE_LABELS``;
    2. ``1 - score`` of a ``non-violation`` label;
    3. the highest score among labels containing a risk-like token;
    4. the highest score overall, or ``0.0`` when there are no scores.
    """
    try:
        # NOTE(review): recent transformers releases warn that
        # ``return_all_scores`` is deprecated in favour of ``top_k=None``;
        # kept here for compatibility with older releases.
        preds = pipe(text, return_all_scores=True)
        # Accept both the nested ``[[{...}, ...]]`` and the flat
        # ``[{...}, ...]`` result shapes.
        entries = preds if (preds and isinstance(preds[0], dict)) else preds[0]
        scores = {str(d["label"]).lower(): float(d["score"]) for d in entries}
    except TypeError:
        return _top1_violation_score(pipe, text)
    except Exception:
        return 0.0

    for name in ML_POSITIVE_LABELS:
        if name in scores:
            return scores[name]
    if "non-violation" in scores:
        return 1.0 - scores["non-violation"]

    risk_tokens = ("violat", "posit", "flag", "risk", "unsafe", "toxic")
    candidates = {
        k: v for k, v in scores.items() if any(tok in k for tok in risk_tokens)
    }
    if candidates:
        return max(candidates.values())
    return max(scores.values()) if scores else 0.0


def fair_housing_flags(text: str) -> List[str]:
    """Return fair-housing flags for the first 1500 characters of *text*.

    Every phrase-rule match adds one flag per suggestion
    (``"<category>: <suggestion>"``) or the bare category when the rule has
    no suggestions. When the ML pipeline is loaded, a score at or above
    ``HF_THRESH`` adds an ``MLFlag`` entry; an inference error is reported as
    a flag instead of being raised.
    """
    flags: List[str] = []
    t = (text or "")[:1500]

    for rule in PHRASE_RULES:
        for _m in rule.regex.finditer(t):
            if rule.suggests:
                flags.extend(f"{rule.category}: {s}" for s in rule.suggests)
            else:
                flags.append(rule.category)

    if hf_pipe:
        try:
            score = _violation_score(hf_pipe, t)
            if score >= HF_THRESH:
                flags.append(f"MLFlag: model={HF_REPO} score={score:.2f}")
        except Exception as e:
            flags.append(f"MLFlag: inference error: {e}")
    return flags


def evaluate_section(
    text: str,
    social: bool,
    company_name: str,
    company_phones: List[str],
    agent_name: str,
    agent_phone: str,
    disclaimer: str,
    require_disclaimer_on_non_social: bool,
) -> Dict[str, Any]:
    """Check one piece of content for name/phone balance and the disclaimer.

    The section is compliant when the company name appears as often as the
    agent name, office phone numbers appear as often as the agent phone, and,
    for non-social content when required, the disclaimer is present.

    Returns:
        ``{"compliant": bool, "Flags": list[str]}``.
    """
    flags: List[str] = []

    company_name_count = count_name_instances(text, company_name)
    agent_name_count = count_name_instances(text, agent_name)
    office_phone_count = count_phone_instances(text, company_phones)
    agent_phone_count = count_phone_instances(text, [agent_phone] if agent_phone else [])

    name_equal = company_name_count == agent_name_count
    phone_equal = office_phone_count == agent_phone_count

    disclaimer_ok = True
    if (not social) and require_disclaimer_on_non_social:
        disclaimer_ok = contains_disclaimer(text, disclaimer)
        if not disclaimer_ok:
            flags.append("Missing disclaimer on non-social content")

    if not name_equal:
        flags.append(
            f"Name imbalance: company={company_name_count} vs agent={agent_name_count}"
        )
    if not phone_equal:
        flags.append(
            f"Phone imbalance: office={office_phone_count} vs agent={agent_phone_count}"
        )

    compliant = name_equal and phone_equal and disclaimer_ok
    return {
        "compliant": compliant,
        "Flags": flags,
    }


def ocr_image(image: Union["Image.Image", bytes, None]) -> str:
    """Extract text from *image* with pytesseract; return ``""`` on any failure.

    Accepts an image object or raw bytes (decoded with PIL and converted to
    RGB). When PIL is available, a copy shrunk to fit within 1600x1600 is
    recognised, first with ``--psm 6 -l eng`` and then with pytesseract's
    defaults if that call fails.
    """
    if image is None or pytesseract is None:
        return ""
    try:
        if isinstance(image, bytes):
            if Image is None:
                return ""
            from io import BytesIO

            image = Image.open(BytesIO(image)).convert("RGB")

        if Image is not None:
            # Work on a copy so the caller's image is not resized in place.
            img = image.copy()
            try:
                img.thumbnail((1600, 1600))
            except Exception:
                pass  # best effort: fall through with the unresized copy
            try:
                return pytesseract.image_to_string(img, config="--psm 6 -l eng")  # type: ignore[arg-type]
            except Exception:
                return pytesseract.image_to_string(img)  # type: ignore[arg-type]
        return pytesseract.image_to_string(image)  # type: ignore[arg-type]
    except Exception:
        return ""


def find_rule_matches(text: str) -> Tuple[List[Dict[str, Any]], List[Tuple[int, int, str]]]:
    """Locate every phrase-rule match in *text*.

    Returns:
        ``(findings, spans)``: ``findings`` holds one dict per match with its
        category, matched text, offsets, up to 40 characters of context on
        each side and at most three suggestions; ``spans`` holds the matching
        ``(start, end, category)`` tuples in the same order.
    """
    text = text or ""
    findings: List[Dict[str, Any]] = []
    spans: List[Tuple[int, int, str]] = []
    for rule in PHRASE_RULES:
        for m in rule.regex.finditer(text):
            s, e = m.span()
            snippet = text[max(0, s - 40): min(len(text), e + 40)]
            findings.append({
                "category": rule.category,
                "match": m.group(0),
                "start": s,
                "end": e,
                "context": snippet,
                "suggestions": (rule.suggests or [])[:3],
            })
            spans.append((s, e, rule.category))
    return findings, spans


def send_email_notification(results: Dict[str, Any]):
    """Email *results* when any section is non-compliant.

    Does nothing unless ``EMAIL_ON_FAILURE`` is enabled and both
    ``SMTP_SERVER`` and ``EMAIL_RECIPIENT`` are set. The message is sent from
    a daemon thread so the caller is not blocked; a delivery failure is
    logged and never raised.
    """
    if not EMAIL_ON_FAILURE or not SMTP_SERVER or not EMAIL_RECIPIENT:
        return

    is_compliant = (
        results.get("Fair_Housing", {}).get("compliant", True)
        and results.get("img", {}).get("compliant", True)
        and results.get("Ptxt", {}).get("compliant", True)
    )
    if is_compliant:
        return

    sender = SMTP_USER or "noreply@example.com"
    body = (
        "A compliance check has failed.\n\n"
        "Results:\n"
        f"{json.dumps(results, indent=2)}\n"
    )
    msg = MIMEText(body)
    msg["Subject"] = "Compliance Check Failed"
    msg["From"] = sender
    msg["To"] = EMAIL_RECIPIENT

    def _worker():
        try:
            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=_SMTP_TIMEOUT_SECONDS) as server:
                server.starttls()
                if SMTP_USER and SMTP_PASSWORD:
                    server.login(SMTP_USER, SMTP_PASSWORD)
                server.sendmail(sender, [EMAIL_RECIPIENT], msg.as_string())
        except Exception:
            # Best effort: record the failure rather than dropping it silently.
            logger.warning("Failed to send compliance notification email", exc_info=True)

    threading.Thread(target=_worker, daemon=True).start()


def run_check(
    image: Optional["Image.Image"],
    ptxt: str,
    social: bool,
    agent_name: str,
    agent_phone: str,
    *,
    company_name: str = COMPANY_NAME_DEFAULT,
    company_phones: Optional[List[str]] = None,
    disclaimer: str = DISCLAIMER_DEFAULT,
    require_disclaimer_on_non_social: Optional[bool] = None,
) -> Dict[str, Any]:
    """Run every compliance check on an image and its accompanying text.

    Args:
        image: Image to OCR, or ``None``.
        ptxt: Post text; only its first 1500 characters are checked.
        social: Whether the content is for social media.
        agent_name: Agent name to balance against the company name.
        agent_phone: Agent phone to balance against the office phones.
        company_name: Company name to count.
        company_phones: Office numbers; defaults to ``COMPANY_PHONES_DEFAULT``.
        disclaimer: Disclaimer text required on non-social content.
        require_disclaimer_on_non_social: Overrides
            ``REQUIRE_DISCLAIMER_ON_NON_SOCIAL`` when not ``None``.

    Returns:
        A dict with ``Fair_Housing``, ``img`` and ``Ptxt`` result blocks,
        ``RuleMatches`` for both texts, and ``Diagnostics``. A notification
        email may be triggered as a side effect.
    """
    company_phones = company_phones or COMPANY_PHONES_DEFAULT
    if require_disclaimer_on_non_social is None:
        require_disclaimer_on_non_social = REQUIRE_DISCLAIMER_ON_NON_SOCIAL

    itxt = ocr_image(image)
    ptxt = (ptxt or "")[:1500]

    # The "Social=..." marker is always appended, so it is part of the text
    # the phrase rules and the classifier see.
    # NOTE(review): fair_housing_flags only reads the first 1500 characters
    # and the OCR text comes first untruncated, so a long OCR result pushes
    # ptxt out of this particular check — confirm this is intended.
    content = "\n\n".join(x for x in [itxt, ptxt, f"Social={social}"] if x)
    fh_flags = fair_housing_flags(content)
    fair_housing_block = {"compliant": len(fh_flags) == 0, "Flags": fh_flags}

    section_kwargs = dict(
        social=social,
        company_name=company_name,
        company_phones=company_phones,
        agent_name=agent_name,
        agent_phone=agent_phone,
        disclaimer=disclaimer,
        require_disclaimer_on_non_social=require_disclaimer_on_non_social,
    )
    img_block = evaluate_section(text=itxt, **section_kwargs)
    ptxt_block = evaluate_section(text=ptxt, **section_kwargs)

    img_findings, img_spans = find_rule_matches(itxt)
    ptxt_findings, ptxt_spans = find_rule_matches(ptxt)

    model_labels = []
    try:
        if hf_pipe is not None and hasattr(hf_pipe, "model") and hasattr(hf_pipe.model, "config"):
            labels_map = getattr(hf_pipe.model.config, "id2label", {}) or {}
            model_labels = list(labels_map.values())
    except Exception:
        model_labels = []

    results = {
        "Fair_Housing": fair_housing_block,
        "img": img_block,
        "Ptxt": ptxt_block,
        "RuleMatches": {
            "img": {"findings": img_findings, "spans": img_spans},
            "ptxt": {"findings": ptxt_findings, "spans": ptxt_spans},
        },
        "Diagnostics": {
            "USE_TINY_ML": USE_TINY_ML,
            "HF_REPO": HF_REPO,
            "HF_THRESH": HF_THRESH,
            "PhrasesLoaded": len(PHRASE_RULES),
            "PhrasesPath": str(PHRASES_PATH),
            "PhrasesError": PHRASES_ERROR,
            "OCR": pytesseract is not None,
            "Categories": sorted({r.category for r in PHRASE_RULES}),
            "DisclaimerRequiredOnNonSocial": REQUIRE_DISCLAIMER_ON_NON_SOCIAL,
            "ModelLabels": model_labels,
            "MLPositiveLabels": sorted(list(ML_POSITIVE_LABELS)),
        },
    }

    send_email_notification(results)
    return results


__all__ = [
    "COMPANY_NAME_DEFAULT",
    "COMPANY_PHONES_DEFAULT",
    "DISCLAIMER_DEFAULT",
    "REQUIRE_DISCLAIMER_ON_NON_SOCIAL",
    "USE_TINY_ML",
    "HF_REPO",
    "HF_THRESH",
    "PHRASES_PATH",
    "count_phone_instances",
    "count_name_instances",
    "contains_disclaimer",
    "fair_housing_flags",
    "evaluate_section",
    "ocr_image",
    "find_rule_matches",
    "run_check",
    "send_email_notification",
]