"""Gradio front end for the Fair Housing image + text compliance checker.

The UI collects an optional flyer image, post text and agent/company details,
delegates the actual checks to the local ``checker`` module, and renders the
results as a summary, raw JSON, highlighted HTML and a Markdown report.
"""

import html
import importlib
import json
import os

import gradio as gr

import checker

APP_TITLE = "Fair Housing Image + Text Compliance Checker"
APP_DESC = (
    "Upload an image (flyer) and/or paste text. The tool OCRs the image (if provided), "
    "highlights potential Fair Housing risks, and verifies brand/agent balance and disclaimer requirements. "
    "This is not legal advice."
)

# Background colour used for each finding category in the legend and highlights.
CATEGORY_COLORS = {
    "Familial status": "#e57373",
    "Religion": "#64b5f6",
    "Disability": "#81c784",
    "Sex": "#ba68c8",
    "Race or color": "#4db6ac",
    "National origin": "#ffd54f",
    "Other preference": "#90a4ae",
}

# Colour for categories that are not listed in CATEGORY_COLORS.
_DEFAULT_COLOR = "#bdbdbd"

# NOTE(review): the original <style> rules (and the tags in the helpers below)
# were stripped from the copy of this file that was reviewed. This is a minimal
# reconstruction -- confirm class names and rules against the deployed app.
STYLE_BLOCK = """
<style>
.fh-legend { margin: 0 0 8px 0; }
.fh-legend-item { display: inline-block; padding: 2px 8px; margin: 0 6px 6px 0;
                  border-radius: 4px; color: #111; font-size: 0.85em; }
.fh-marked { white-space: pre-wrap; line-height: 1.6; }
.fh-hit { padding: 1px 3px; border-radius: 3px; color: #111; }
.fh-tag { font-size: 0.7em; font-weight: 600; margin-left: 4px; opacity: 0.8; }
</style>
"""


def _build_legend(categories: set[str]) -> str:
    """Return an HTML legend with one coloured chip per category, sorted by name."""
    parts = ["<div class='fh-legend'>"]
    for cat in sorted(categories):
        color = CATEGORY_COLORS.get(cat, _DEFAULT_COLOR)
        parts.append(
            f"<span class='fh-legend-item' style='background:{color}'>"
            f"{html.escape(cat)}</span>"
        )
    parts.append("</div>")
    return "".join(parts)


def _highlight_html(text: str, spans: list[tuple[int, int, str]], cats: set[str]) -> str:
    """Render *text* as HTML with each ``(start, end, category)`` span highlighted.

    All text is HTML-escaped. When there are no spans the escaped text is
    returned without a legend. Spans are processed in start order; a span that
    overlaps an earlier one is trimmed to the part not yet emitted (and skipped
    if nothing is left) so that no character of *text* is output twice.
    """
    text = text or ""
    if not spans:
        return STYLE_BLOCK + f"<div class='fh-marked'>{html.escape(text)}</div>"

    text_len = len(text)
    cursor = 0
    out = [STYLE_BLOCK, _build_legend(cats), "<div class='fh-marked'>"]
    for start, end, cat in sorted(spans, key=lambda sp: (sp[0], sp[1])):
        # Clamp to the unemitted part of the text; this also guards against
        # negative or past-the-end offsets.
        start = max(start, cursor)
        end = min(end, text_len)
        if end <= start:
            continue
        if start > cursor:
            out.append(html.escape(text[cursor:start]))
        frag = html.escape(text[start:end])
        color = CATEGORY_COLORS.get(cat, _DEFAULT_COLOR)
        out.append(
            f"<mark class='fh-hit' style='background:{color}'>"
            f"{frag}<span class='fh-tag'>{html.escape(cat)}</span></mark>"
        )
        cursor = end
    if cursor < text_len:
        out.append(html.escape(text[cursor:]))
    out.append("</div>")
    return "".join(out)


def _parse_company_phones(s: str):
    """Parse the company-phones field into a list of strings.

    Accepts a JSON array or a comma/newline-separated list. Empty input, or
    input that yields no entries, falls back to
    ``checker.COMPANY_PHONES_DEFAULT``.
    """
    if not s:
        return checker.COMPANY_PHONES_DEFAULT
    try:
        data = json.loads(s)
    except (TypeError, ValueError):
        # Not JSON (json.JSONDecodeError is a ValueError): fall through to the
        # separator-based parsing below.
        data = None
    if isinstance(data, list):
        return [str(x) for x in data]
    parts = [p.strip() for p in s.replace("\n", ",").split(",") if p.strip()]
    return parts or checker.COMPANY_PHONES_DEFAULT


def _ensure_checker_reloaded(
    enable_ml: bool,
    hf_repo: str,
    hf_thresh: float,
    req_disc_non_social: bool,
    phrases_path: str,
) -> bool:
    """Push changed settings into the environment and reload ``checker`` if needed.

    Each setting is compared with the corresponding ``checker`` module
    attribute. Any that differ are written to ``os.environ`` and the module is
    reloaded once at the end.

    Returns:
        True if the module was reloaded, False otherwise.
    """
    # Normalise once so the comparison and the stored value agree; blank input
    # means "keep the current value".
    hf_repo = (hf_repo or "").strip() or checker.HF_REPO
    phrases_path = (phrases_path or "").strip() or str(checker.PHRASES_PATH)

    need_reload = False
    if bool(enable_ml) != bool(checker.USE_TINY_ML):
        os.environ["USE_TINY_ML"] = "1" if enable_ml else "0"
        need_reload = True
    if hf_repo != checker.HF_REPO:
        os.environ["HF_REPO"] = hf_repo
        need_reload = True
    if float(hf_thresh) != float(checker.HF_THRESH):
        os.environ["HF_THRESH"] = str(float(hf_thresh))
        need_reload = True
    if bool(req_disc_non_social) != bool(checker.REQUIRE_DISCLAIMER_ON_NON_SOCIAL):
        os.environ["REQUIRE_DISCLAIMER_ON_NON_SOCIAL"] = "1" if req_disc_non_social else "0"
        need_reload = True
    if phrases_path and phrases_path != str(checker.PHRASES_PATH):
        os.environ["PHRASES_PATH"] = phrases_path
        need_reload = True

    if need_reload:
        importlib.reload(checker)
    return need_reload


def _build_report(findings: list[dict]) -> str:
    """Format rule findings as a Markdown bullet list.

    Each finding is read for its ``category``, ``match`` and ``context`` keys
    and an optional ``suggestions`` list.
    """
    if not findings:
        return "No obvious risk phrases found by the rules engine."

    rows = []
    for f in findings:
        suggestions = f.get("suggestions")
        sug = ", ".join(str(s) for s in suggestions) if suggestions else "N/A"
        # Two trailing spaces + newline is a Markdown hard line break.
        # NOTE(review): the reviewed copy had single spaces here, which looks
        # like whitespace collapsing -- confirm.
        rows.append(
            f"- **{f['category']}** → “{f['match']}”\n"
            f"  \n  _Context_: …{f['context']}…\n"
            f"  \n  _Suggestions_: {sug}\n"
        )
    return "### Potential issues\n" + "\n".join(rows)


def on_run(image, ptxt, social, agent_name, agent_phone, company_name, company_phones_json,
           disclaimer, enable_ml, hf_repo, hf_thresh, req_disc_non_social, phrases_path):
    """Click handler: run the compliance check and build every UI output.

    Returns a tuple of (summary text, results JSON, diagnostics line,
    reloaded flag, highlighted post HTML, post report, highlighted OCR HTML,
    OCR report), matching the ``outputs`` list wired to the run button.
    """
    reloaded = _ensure_checker_reloaded(
        enable_ml=bool(enable_ml),
        hf_repo=hf_repo,
        hf_thresh=float(hf_thresh or checker.HF_THRESH),
        req_disc_non_social=bool(req_disc_non_social),
        phrases_path=phrases_path,
    )

    # Blank (including whitespace-only) fields fall back to the module defaults.
    company_name = (company_name or "").strip() or checker.COMPANY_NAME_DEFAULT
    company_phones = _parse_company_phones(company_phones_json)
    disclaimer = (disclaimer or "").strip() or checker.DISCLAIMER_DEFAULT

    results = checker.run_check(
        image=image,
        ptxt=ptxt or "",
        social=bool(social),
        agent_name=agent_name or "",
        agent_phone=agent_phone or "",
        company_name=company_name,
        company_phones=company_phones,
        disclaimer=disclaimer,
        require_disclaimer_on_non_social=None,
    )

    fh_ok = results.get("Fair_Housing", {}).get("compliant", True)
    img_ok = results.get("img", {}).get("compliant", True)
    ptxt_ok = results.get("Ptxt", {}).get("compliant", True)
    summary = [
        f"Fair Housing (rules+ML): {'OK' if fh_ok else 'Needs review'}",
        f"Image text balance/disclaimer: {'OK' if img_ok else 'Needs review'}",
        f"Post text balance/disclaimer: {'OK' if ptxt_ok else 'Needs review'}",
    ]

    diag = results.get("Diagnostics", {})
    badge = (
        f"Tiny ML: {diag.get('USE_TINY_ML')} | Repo: {diag.get('HF_REPO')} | "
        f"Thresh: {diag.get('HF_THRESH')} | Phrases: {diag.get('PhrasesLoaded')} | "
        f"DisclaimerOnNonSocial: {diag.get('DisclaimerRequiredOnNonSocial')}"
    )

    rm = results.get("RuleMatches", {})

    ptxt_matches = rm.get("ptxt") or {}
    ptxt_findings = ptxt_matches.get("findings") or []
    ptxt_spans = ptxt_matches.get("spans") or []
    ptxt_cats = {f["category"] for f in ptxt_findings}
    marked_html_ptxt = _highlight_html(ptxt or "", ptxt_spans, ptxt_cats)
    report_ptxt = _build_report(ptxt_findings)

    img_matches = rm.get("img") or {}
    img_findings = img_matches.get("findings") or []
    img_spans = img_matches.get("spans") or []
    # The image spans index into the OCR text, which is not read from `results`
    # here, so OCR is run again to obtain it.
    # NOTE(review): this repeats the OCR work; if run_check can expose the
    # text it extracted, reuse that instead.
    ocr_text = checker.ocr_image(image) if image is not None else ""
    img_cats = {f["category"] for f in img_findings}
    marked_html_img = _highlight_html(ocr_text, img_spans, img_cats)
    report_img = _build_report(img_findings)

    return (
        "\n".join(summary),
        json.dumps(results, indent=2),
        badge,
        reloaded,
        marked_html_ptxt,
        report_ptxt,
        marked_html_img,
        report_img,
    )


# NOTE(review): the container nesting below was reconstructed from a flattened
# copy of this file -- confirm it matches the intended layout.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n{APP_DESC}")

    with gr.Row():
        with gr.Column():
            image = gr.Image(label="Image (optional)", type="pil")
        with gr.Column():
            ptxt = gr.Textbox(label="Post / Listing Text", lines=8, placeholder="Paste the text to check…")
            social = gr.Checkbox(label="This is a social post (no long-form disclaimer required)", value=True)
            agent_name = gr.Textbox(label="Agent Name", placeholder="Jane Doe")
            agent_phone = gr.Textbox(label="Agent Phone", placeholder="(706) 555-1234")
            summary_out = gr.Textbox(label="Summary", lines=4)

    with gr.Accordion("Advanced (Tiny ML, Rules, Disclaimer toggle)", open=False):
        with gr.Row():
            company_name = gr.Textbox(label="Company Name", value=checker.COMPANY_NAME_DEFAULT)
            company_phones_json = gr.Textbox(
                label="Company Phones (JSON array or comma-separated)",
                value=json.dumps(checker.COMPANY_PHONES_DEFAULT),
            )
            disclaimer = gr.Textbox(label="Disclaimer (used on non-social)",
                                    value=checker.DISCLAIMER_DEFAULT, lines=4)
        with gr.Row():
            enable_ml = gr.Checkbox(label="Enable Tiny HF Classifier (CPU-friendly)", value=checker.USE_TINY_ML)
            req_disc_non_social = gr.Checkbox(
                label="Require Disclaimer on Non-Social Content",
                value=checker.REQUIRE_DISCLAIMER_ON_NON_SOCIAL,
            )
        with gr.Row():
            hf_repo = gr.Textbox(label="HF_REPO", value=checker.HF_REPO)
            hf_thresh = gr.Slider(label="HF_THRESH", minimum=0.5, maximum=0.99, step=0.01,
                                  value=checker.HF_THRESH)
            phrases_path = gr.Textbox(label="PHRASES_PATH", value=str(checker.PHRASES_PATH))
        with gr.Row():
            results_json = gr.Code(label="Results JSON")
        with gr.Row():
            diag_badge = gr.Textbox(label="Diagnostics", lines=1)
            reloaded_flag = gr.Checkbox(label="Module reloaded this run", interactive=False)
        gr.Markdown(
            "If you change any of these, the backend module will hot‑reload. On Spaces, ensure requirements include transformers + pyyaml."
        )

    with gr.Row():
        marked_html_ptxt = gr.HTML(label="Highlighted text (Post)")
    with gr.Row():
        report_ptxt = gr.Markdown(label="Report (Post)")
    with gr.Row():
        marked_html_img = gr.HTML(label="Highlighted text (OCR Image)")
    with gr.Row():
        report_img = gr.Markdown(label="Report (OCR Image)")

    run_btn = gr.Button("Run Compliance Check", variant="primary")
    run_btn.click(
        fn=on_run,
        inputs=[
            image, ptxt, social, agent_name, agent_phone,
            company_name, company_phones_json, disclaimer,
            enable_ml, hf_repo, hf_thresh, req_disc_non_social, phrases_path,
        ],
        outputs=[summary_out, results_json, diag_badge, reloaded_flag,
                 marked_html_ptxt, report_ptxt, marked_html_img, report_img],
    )

if __name__ == "__main__":
    demo.queue(max_size=16).launch()