Spaces:
Running
Running
import gradio as gr | |
import torch | |
from datetime import datetime | |
from dateutil import parser | |
from demo_assets import * | |
import re | |
categories = ['Contact related', 'Gathering additional information', 'Defining problem', | |
'Treatment goal', 'Drug related', 'Therapeutic procedure related', 'Evaluating test result', | |
'Deferment', 'Advice and precaution', 'Legal and insurance related'] | |
unicode_symbols = [ | |
"\U0001F91D", # Handshake | |
"\U0001F50D", # Magnifying glass | |
"\U0001F9E9", # Puzzle piece | |
"\U0001F3AF", # Target | |
"\U0001F48A", # Pill | |
"\U00002702", # Surgical scissors | |
"\U0001F9EA", # Test tube | |
"\U000023F0", # Alarm clock | |
"\U000026A0", # Warning sign | |
"\U0001F4C4" # Document | |
] | |
OTHERS_ID = 18 | |
def postprocess_labels(text, logits, t2c): | |
tags = [None for _ in text] | |
labels = logits.argmax(-1) | |
for i,cat in enumerate(labels): | |
if cat != OTHERS_ID: | |
char_ids = t2c(i) | |
if char_ids is None: | |
continue | |
for idx in range(char_ids.start, char_ids.end): | |
if tags[idx] is None and idx < len(text): | |
tags[idx] = categories[cat // 2] | |
for i in range(len(text)-1): | |
if text[i] == ' ' and (text[i+1] == ' ' or tags[i-1] == tags[i+1]): | |
tags[i] = tags[i-1] | |
return tags | |
def indicators_to_spans(labels, t2c = None): | |
def add_span(c, start, end): | |
if t2c(start) is None or t2c(end) is None: | |
start, end = -1, -1 | |
else: | |
start = t2c(start).start | |
end = t2c(end).end | |
span = (c, start, end) | |
spans.add(span) | |
spans = set() | |
num_tokens = len(labels) | |
num_classes = OTHERS_ID // 2 | |
start = None | |
cls = None | |
for t in range(num_tokens): | |
if start and labels[t] == cls + 1: | |
continue | |
elif start: | |
add_span(cls // 2, start, t - 1) | |
start = None | |
# if not start and labels[t] in [2*x for x in range(num_classes)]: | |
if not start and labels[t] != OTHERS_ID: | |
start = t | |
cls = int(labels[t]) // 2 * 2 | |
return spans | |
def extract_date(text): | |
pattern = r'(?<=Date: )\s*(\[\*\*.*?\*\*\]|\d{1,4}[-/]\d{1,2}[-/]\d{1,4})' | |
match = re.search(pattern, text).group(1) | |
start, end = None, None | |
for i, c in enumerate(match): | |
if start is None and c.isnumeric(): | |
start = i | |
elif c.isnumeric(): | |
end = i + 1 | |
match = match[start:end] | |
return match | |
def run_gradio(model, tokenizer): | |
def predict(text): | |
encoding = tokenizer.encode_plus(text) | |
x = torch.tensor(encoding['input_ids']).unsqueeze(0).to(device) | |
mask = torch.ones_like(x) | |
output = model.generate(x, mask)[0] | |
return output, encoding.token_to_chars | |
def process(text): | |
if text is not None: | |
output, t2c = predict(text) | |
tags = postprocess_labels(text, output, t2c) | |
with open('log.csv', 'a') as f: | |
f.write(f'{datetime.now()},{text}\n') | |
return list(zip(text, tags)) | |
else: | |
return text | |
def process_sum(*inputs): | |
global sum_c | |
dates = {} | |
for i in range(sum_c): | |
text = inputs[i] | |
output, t2c = predict(text) | |
spans = indicators_to_spans(output.argmax(-1), t2c) | |
date = extract_date(text) | |
present_decs = set(cat for cat, _, _ in spans) | |
decs = {k: [] for k in sorted(present_decs)} | |
for c, s, e in spans: | |
decs[c].append(text[s:e]) | |
dates[date] = decs | |
out = "" | |
for date in sorted(dates.keys(), key = lambda x: parser.parse(x)): | |
out += f'## **[{date}]**\n\n' | |
decs = dates[date] | |
for c in decs: | |
out += f'### {unicode_symbols[c]} ***{categories[c]}***\n\n' | |
for dec in decs[c]: | |
out += f'{dec}\n\n' | |
return out | |
global sum_c | |
sum_c = 1 | |
SUM_INPUTS = 20 | |
def update_inputs(inputs): | |
outputs = [] | |
if inputs is None: | |
c = 0 | |
else: | |
inputs = [open(f.name).read() for f in inputs] | |
for i, text in enumerate(inputs): | |
outputs.append(gr.update(value=text, visible=True)) | |
c = len(inputs) | |
n = SUM_INPUTS | |
for i in range(n - c): | |
outputs.append(gr.update(value='', visible=False)) | |
global sum_c; sum_c = c | |
return outputs | |
def add_ex(*inputs): | |
global sum_c | |
new_idx = sum_c | |
if new_idx < SUM_INPUTS: | |
out = inputs[:new_idx] + (gr.update(visible=True),) + inputs[new_idx+1:] | |
sum_c += 1 | |
else: | |
out = inputs | |
return out | |
def sub_ex(*inputs): | |
global sum_c | |
new_idx = sum_c - 1 | |
if new_idx > 0: | |
out = inputs[:new_idx] + (gr.update(visible=False),) + inputs[new_idx+1:] | |
sum_c -= 1 | |
else: | |
out = inputs | |
return out | |
device = model.backbone.device | |
# colors = ['aqua', 'blue', 'fuchsia', 'teal', 'green', 'olive', 'lime', 'silver', 'purple', 'red', | |
# 'yellow', 'navy', 'gray', 'white', 'maroon', 'black'] | |
colors = ['#8dd3c7', '#ffffb3', '#bebada', '#fb8072', '#80b1d3', '#fdb462', '#b3de69', '#fccde5', '#d9d9d9', '#bc80bd'] | |
color_map = {cat: colors[i] for i,cat in enumerate(categories)} | |
det_desc = ['Admit, discharge, follow-up, referral', | |
'Ordering test, consulting colleague, seeking external information', | |
'Diagnostic conclusion, evaluation of health state, etiological inference, prognostic judgment', | |
'Quantitative or qualitative', | |
'Start, stop, alter, maintain, refrain', | |
'Start, stop, alter, maintain, refrain', | |
'Positive, negative, ambiguous test results', | |
'Transfer responsibility, wait and see, change subject', | |
'Advice or precaution', | |
'Sick leave, drug refund, insurance, disability'] | |
desc = '### Zones (categories)\n' | |
desc += '| | |\n| --- | --- |\n' | |
for i,cat in enumerate(categories): | |
desc += f'| {unicode_symbols[i]} **{cat}** | {det_desc[i]}|\n' | |
#colors | |
#markdown labels | |
#legend and desc | |
#css font-size | |
css = '.category-legend {border:1px dashed black;}'\ | |
'.text-sm {font-size: 1.5rem; line-height: 200%;}'\ | |
'.gr-sample-textbox {width: 1000px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis;}'\ | |
'.text-limit label textarea {height: 150px !important; overflow: scroll; }'\ | |
'.text-gray-500 {color: #111827; font-weight: 600; font-size: 1.25em; margin-top: 1.6em; margin-bottom: 0.6em;'\ | |
'line-height: 1.6;}'\ | |
'#sum-out {border: 2px solid #007bff; padding: 20px; border-radius: 10px; box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);' | |
title='Clinical Decision Zoning' | |
with gr.Blocks(title=title, css=css) as demo: | |
gr.Markdown(f'# {title}') | |
with gr.Tab("Label a Clinical Note"): | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("## Enter a Discharge Summary or Clinical Note"), | |
text_input = gr.Textbox( | |
# value=examples[0], | |
label="", | |
placeholder="Enter text here...") | |
text_btn = gr.Button('Run') | |
with gr.Column(): | |
gr.Markdown("## Labeled Summary or Note"), | |
text_out = gr.Highlight(label="", combine_adjacent=True, show_legend=False, color_map=color_map) | |
gr.Examples(text_examples, inputs=text_input) | |
with gr.Tab("Summarize Patient History"): | |
with gr.Row(): | |
with gr.Column(): | |
sum_inputs = [gr.Text(label='Clinical Note 1', elem_classes='text-limit')] | |
sum_inputs.extend([gr.Text(label='Clinical Note %d'%i, visible=False, elem_classes='text-limit') | |
for i in range(2, SUM_INPUTS + 1)]) | |
sum_btn = gr.Button('Run') | |
with gr.Row(): | |
ex_add = gr.Button("+") | |
ex_sub = gr.Button("-") | |
upload = gr.File(label='Upload clinical notes', file_type='text', file_count='multiple') | |
gr.Examples(sum_examples, inputs=upload, | |
fn = update_inputs, outputs=sum_inputs, run_on_click=True) | |
with gr.Column(): | |
gr.Markdown("## Summarized Clinical Decision History") | |
sum_out = gr.Markdown(elem_id='sum-out') | |
gr.Markdown(desc) | |
# Functions | |
text_input.submit(process, inputs=text_input, outputs=text_out) | |
text_btn.click(process, inputs=text_input, outputs=text_out) | |
upload.change(update_inputs, inputs=upload, outputs=sum_inputs) | |
ex_add.click(add_ex, inputs=sum_inputs, outputs=sum_inputs) | |
ex_sub.click(sub_ex, inputs=sum_inputs, outputs=sum_inputs) | |
sum_btn.click(process_sum, inputs=sum_inputs, outputs=sum_out) | |
# demo = gr.TabbedInterface([text_demo, sum_demo], ["Label a Clinical Note", "Summarize Patient History"]) | |
demo.launch(share=False) | |