import numpy as np |
from numba import jit |
from tqdm import tqdm |
from scipy.optimize import linear_sum_assignment |
from skimage.measure import regionprops |
from collections import namedtuple |
from csbdeep.utils import _raise |
matching_criteria = dict() |
def label_are_sequential(y): |
""" returns true if y has only sequential labels from 1... """ |
labels = np.unique(y) |
return (set(labels)-{0}) == set(range(1,1+labels.max())) |
def is_array_of_integers(y): |
return isinstance(y,np.ndarray) and np.issubdtype(y.dtype, np.integer) |
def _check_label_array(y, name=None, check_sequential=False): |
err = ValueError("{label} must be an array of {integers}.".format( |
label = 'labels' if name is None else name, |
integers = ('sequential ' if check_sequential else '') + 'non-negative integers', |
)) |
is_array_of_integers(y) or _raise(err) |
if len(y) == 0: |
return True |
if check_sequential: |
label_are_sequential(y) or _raise(err) |
else: |
y.min() >= 0 or _raise(err) |
return True |
def label_overlap(x, y, check=True): |
if check: |
_check_label_array(x,'x',True) |
_check_label_array(y,'y',True) |
x.shape == y.shape or _raise(ValueError("x and y must have the same shape")) |
return _label_overlap(x, y) |
@jit(nopython=True) |
def _label_overlap(x, y): |
x = x.ravel() |
y = y.ravel() |
overlap = np.zeros((1+x.max(),1+y.max()), dtype=np.uint) |
for i in range(len(x)): |
overlap[x[i],y[i]] += 1 |
return overlap |
def _safe_divide(x,y, eps=1e-10): |
"""computes a safe divide which returns 0 if y is zero""" |
if np.isscalar(x) and np.isscalar(y): |
return x/y if np.abs(y)>eps else 0.0 |
else: |
out = np.zeros(np.broadcast(x,y).shape, np.float32) |
np.divide(x,y, out=out, where=np.abs(y)>eps) |
return out |
def intersection_over_union(overlap): |
_check_label_array(overlap,'overlap') |
if np.sum(overlap) == 0: |
return overlap |
n_pixels_pred = np.sum(overlap, axis=0, keepdims=True) |
n_pixels_true = np.sum(overlap, axis=1, keepdims=True) |
return _safe_divide(overlap, (n_pixels_pred + n_pixels_true - overlap)) |
matching_criteria['iou'] = intersection_over_union |
def intersection_over_true(overlap): |
_check_label_array(overlap,'overlap') |
if np.sum(overlap) == 0: |
return overlap |
n_pixels_true = np.sum(overlap, axis=1, keepdims=True) |
return _safe_divide(overlap, n_pixels_true) |
matching_criteria['iot'] = intersection_over_true |
def intersection_over_pred(overlap): |
_check_label_array(overlap,'overlap') |
if np.sum(overlap) == 0: |
return overlap |
n_pixels_pred = np.sum(overlap, axis=0, keepdims=True) |
return _safe_divide(overlap, n_pixels_pred) |
matching_criteria['iop'] = intersection_over_pred |
def precision(tp,fp,fn): |
return tp/(tp+fp) if tp > 0 else 0 |
def recall(tp,fp,fn): |
return tp/(tp+fn) if tp > 0 else 0 |
def accuracy(tp,fp,fn): |
return tp/(tp+fp+fn) if tp > 0 else 0 |
def f1(tp,fp,fn): |
return (2*tp)/(2*tp+fp+fn) if tp > 0 else 0 |
def matching(y_true, y_pred, thresh=0.5, criterion='iou', report_matches=False): |
"""Calculate detection/instance segmentation metrics between ground truth and predicted label images. |
Currently, the following metrics are implemented: |
'fp', 'tp', 'fn', 'precision', 'recall', 'accuracy', 'f1', 'criterion', 'thresh', 'n_true', 'n_pred', 'mean_true_score', 'mean_matched_score', 'panoptic_quality' |
Corresponding objects of y_true and y_pred are counted as true positives (tp), false positives (fp), and false negatives (fn) |
whether their intersection over union (IoU) >= thresh (for criterion='iou', which can be changed) |
* mean_matched_score is the mean IoUs of matched true positives |
* mean_true_score is the mean IoUs of matched true positives but normalized by the total number of GT objects |
* panoptic_quality defined as in Eq. 1 of Kirillov et al. "Panoptic Segmentation", CVPR 2019 |
Parameters |
---------- |
y_true: ndarray |
ground truth label image (integer valued) |
y_pred: ndarray |
predicted label image (integer valued) |
thresh: float |
threshold for matching criterion (default 0.5) |
criterion: string |
matching criterion (default IoU) |
report_matches: bool |
if True, additionally calculate matched_pairs and matched_scores (note, that this returns even gt-pred pairs whose scores are below 'thresh') |
Returns |
------- |
Matching object with different metrics as attributes |
Examples |
-------- |
>>> y_true = np.zeros((100,100), np.uint16) |
>>> y_true[10:20,10:20] = 1 |
>>> y_pred = np.roll(y_true,5,axis = 0) |
>>> stats = matching(y_true, y_pred) |
>>> print(stats) |
Matching(criterion='iou', thresh=0.5, fp=1, tp=0, fn=1, precision=0, recall=0, accuracy=0, f1=0, n_true=1, n_pred=1, mean_true_score=0.0, mean_matched_score=0.0, panoptic_quality=0.0) |
""" |
_check_label_array(y_true,'y_true') |
_check_label_array(y_pred,'y_pred') |
y_true.shape == y_pred.shape or _raise(ValueError("y_true ({y_true.shape}) and y_pred ({y_pred.shape}) have different shapes".format(y_true=y_true, y_pred=y_pred))) |
criterion in matching_criteria or _raise(ValueError("Matching criterion '%s' not supported." % criterion)) |
if thresh is None: thresh = 0 |
thresh = float(thresh) if np.isscalar(thresh) else map(float,thresh) |
y_true, _, map_rev_true = relabel_sequential(y_true) |
y_pred, _, map_rev_pred = relabel_sequential(y_pred) |
overlap = label_overlap(y_true, y_pred, check=False) |
scores = matching_criteria[criterion](overlap) |
assert 0 <= np.min(scores) <= np.max(scores) <= 1 |
scores = scores[1:,1:] |
n_true, n_pred = scores.shape |
n_matched = min(n_true, n_pred) |
def _single(thr): |
not_trivial = n_matched > 0 |
if not_trivial: |
costs = -(scores >= thr).astype(float) - scores / (2*n_matched) |
true_ind, pred_ind = linear_sum_assignment(costs) |
assert n_matched == len(true_ind) == len(pred_ind) |
match_ok = scores[true_ind,pred_ind] >= thr |
tp = np.count_nonzero(match_ok) |
else: |
tp = 0 |
fp = n_pred - tp |
fn = n_true - tp |
sum_matched_score = np.sum(scores[true_ind,pred_ind][match_ok]) if not_trivial else 0.0 |
mean_matched_score = _safe_divide(sum_matched_score, tp) |
mean_true_score = _safe_divide(sum_matched_score, n_true) |
panoptic_quality = _safe_divide(sum_matched_score, tp+fp/2+fn/2) |
stats_dict = dict ( |
criterion = criterion, |
thresh = thr, |
fp = fp, |
tp = tp, |
fn = fn, |
precision = precision(tp,fp,fn), |
recall = recall(tp,fp,fn), |
accuracy = accuracy(tp,fp,fn), |
f1 = f1(tp,fp,fn), |
n_true = n_true, |
n_pred = n_pred, |
mean_true_score = mean_true_score, |
mean_matched_score = mean_matched_score, |
panoptic_quality = panoptic_quality, |
) |
if bool(report_matches): |
if not_trivial: |
stats_dict.update ( |
matched_pairs = tuple((int(map_rev_true[i]),int(map_rev_pred[j])) for i,j in zip(1+true_ind,1+pred_ind)), |
matched_scores = tuple(scores[true_ind,pred_ind]), |
matched_tps = tuple(map(int,np.flatnonzero(match_ok))), |
) |
else: |
stats_dict.update ( |
matched_pairs = (), |
matched_scores = (), |
matched_tps = (), |
) |
return namedtuple('Matching',stats_dict.keys())(*stats_dict.values()) |
return _single(thresh) if np.isscalar(thresh) else tuple(map(_single,thresh)) |
def matching_dataset(y_true, y_pred, thresh=0.5, criterion='iou', by_image=False, show_progress=True, parallel=False): |
"""matching metrics for list of images, see `stardist.matching.matching` |
""" |
len(y_true) == len(y_pred) or _raise(ValueError("y_true and y_pred must have the same length.")) |
return matching_dataset_lazy ( |
tuple(zip(y_true,y_pred)), thresh=thresh, criterion=criterion, by_image=by_image, show_progress=show_progress, parallel=parallel, |
) |
def matching_dataset_lazy(y_gen, thresh=0.5, criterion='iou', by_image=False, show_progress=True, parallel=False): |
expected_keys = set(('fp', 'tp', 'fn', 'precision', 'recall', 'accuracy', 'f1', 'criterion', 'thresh', 'n_true', 'n_pred', 'mean_true_score', 'mean_matched_score', 'panoptic_quality')) |
single_thresh = False |
if np.isscalar(thresh): |
single_thresh = True |
thresh = (thresh,) |
tqdm_kwargs = {} |
tqdm_kwargs['disable'] = not bool(show_progress) |
if int(show_progress) > 1: |
tqdm_kwargs['total'] = int(show_progress) |
if parallel: |
from concurrent.futures import ThreadPoolExecutor |
fn = lambda pair: matching(*pair, thresh=thresh, criterion=criterion, report_matches=False) |
with ThreadPoolExecutor() as pool: |
stats_all = tuple(pool.map(fn, tqdm(y_gen,**tqdm_kwargs))) |
else: |
stats_all = tuple ( |
matching(y_t, y_p, thresh=thresh, criterion=criterion, report_matches=False) |
for y_t,y_p in tqdm(y_gen,**tqdm_kwargs) |
) |
n_images, n_threshs = len(stats_all), len(thresh) |
accumulate = [{} for _ in range(n_threshs)] |
for stats in stats_all: |
for i,s in enumerate(stats): |
acc = accumulate[i] |
for k,v in s._asdict().items(): |
if k == 'mean_true_score' and not bool(by_image): |
acc[k] = acc.setdefault(k,0) + v * s.n_true |
else: |
try: |
acc[k] = acc.setdefault(k,0) + v |
except TypeError: |
pass |
for thr,acc in zip(thresh,accumulate): |
set(acc.keys()) == expected_keys or _raise(ValueError("unexpected keys")) |
acc['criterion'] = criterion |
acc['thresh'] = thr |
acc['by_image'] = bool(by_image) |
if bool(by_image): |
for k in ('precision', 'recall', 'accuracy', 'f1', 'mean_true_score', 'mean_matched_score', 'panoptic_quality'): |
acc[k] /= n_images |
else: |
tp, fp, fn, n_true = acc['tp'], acc['fp'], acc['fn'], acc['n_true'] |
sum_matched_score = acc['mean_true_score'] |
mean_matched_score = _safe_divide(sum_matched_score, tp) |
mean_true_score = _safe_divide(sum_matched_score, n_true) |
panoptic_quality = _safe_divide(sum_matched_score, tp+fp/2+fn/2) |
acc.update( |
precision = precision(tp,fp,fn), |
recall = recall(tp,fp,fn), |
accuracy = accuracy(tp,fp,fn), |
f1 = f1(tp,fp,fn), |
mean_true_score = mean_true_score, |
mean_matched_score = mean_matched_score, |
panoptic_quality = panoptic_quality, |
) |
accumulate = tuple(namedtuple('DatasetMatching',acc.keys())(*acc.values()) for acc in accumulate) |
return accumulate[0] if single_thresh else accumulate |
def relabel_sequential(label_field, offset=1): |
"""Relabel arbitrary labels to {`offset`, ... `offset` + number_of_labels}. |
This function also returns the forward map (mapping the original labels to |
the reduced labels) and the inverse map (mapping the reduced labels back |
to the original ones). |
Parameters |
---------- |
label_field : numpy array of int, arbitrary shape |
An array of labels, which must be non-negative integers. |
offset : int, optional |
The return labels will start at `offset`, which should be |
strictly positive. |
Returns |
------- |
relabeled : numpy array of int, same shape as `label_field` |
The input label field with labels mapped to |
{offset, ..., number_of_labels + offset - 1}. |
The data type will be the same as `label_field`, except when |
offset + number_of_labels causes overflow of the current data type. |
forward_map : numpy array of int, shape ``(label_field.max() + 1,)`` |
The map from the original label space to the returned label |
space. Can be used to re-apply the same mapping. See examples |
for usage. The data type will be the same as `relabeled`. |
inverse_map : 1D numpy array of int, of length offset + number of labels |
The map from the new label space to the original space. This |
can be used to reconstruct the original label field from the |
relabeled one. The data type will be the same as `relabeled`. |
Notes |
----- |
The label 0 is assumed to denote the background and is never remapped. |
The forward map can be extremely big for some inputs, since its |
length is given by the maximum of the label field. However, in most |
situations, ``label_field.max()`` is much smaller than |
``label_field.size``, and in these cases the forward map is |
guaranteed to be smaller than either the input or output images. |
Examples |
-------- |
>>> from skimage.segmentation import relabel_sequential |
>>> label_field = np.array([1, 1, 5, 5, 8, 99, 42]) |
>>> relab, fw, inv = relabel_sequential(label_field) |
>>> relab |
array([1, 1, 2, 2, 3, 5, 4]) |
>>> fw |
array([0, 1, 0, 0, 0, 2, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, |
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, |
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, |
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, |
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5]) |
>>> inv |
array([ 0, 1, 5, 8, 42, 99]) |
>>> (fw[label_field] == relab).all() |
True |
>>> (inv[relab] == label_field).all() |
True |
>>> relab, fw, inv = relabel_sequential(label_field, offset=5) |
>>> relab |
array([5, 5, 6, 6, 7, 9, 8]) |
""" |
offset = int(offset) |
if offset <= 0: |
raise ValueError("Offset must be strictly positive.") |
if np.min(label_field) < 0: |
raise ValueError("Cannot relabel array that contains negative values.") |
max_label = int(label_field.max()) |
if not np.issubdtype(label_field.dtype, np.integer): |
new_type = np.min_scalar_type(max_label) |
label_field = label_field.astype(new_type) |
labels = np.unique(label_field) |
labels0 = labels[labels != 0] |
new_max_label = offset - 1 + len(labels0) |
new_labels0 = np.arange(offset, new_max_label + 1) |
output_type = label_field.dtype |
required_type = np.min_scalar_type(new_max_label) |
if np.dtype(required_type).itemsize > np.dtype(label_field.dtype).itemsize: |
output_type = required_type |
forward_map = np.zeros(max_label + 1, dtype=output_type) |
forward_map[labels0] = new_labels0 |
inverse_map = np.zeros(new_max_label + 1, dtype=output_type) |
inverse_map[offset:] = labels0 |
relabeled = forward_map[label_field] |
return relabeled, forward_map, inverse_map |
def group_matching_labels(ys, thresh=1e-10, criterion='iou'): |
""" |
Group matching objects (i.e. assign the same label id) in a |
list of label images (e.g. consecutive frames of a time-lapse). |
Uses function `matching` (with provided `criterion` and `thresh`) to |
iteratively/greedily match and group objects/labels in consecutive images of `ys`. |
To that end, matching objects are grouped together by assigning the same label id, |
whereas unmatched objects are assigned a new label id. |
At the end of this process, each label group will have been assigned a unique id. |
Note that the label images `ys` will not be modified. Instead, they will initially |
be duplicated and converted to data type `np.int32` before objects are grouped and the result |
is returned. (Note that `np.int32` limits the number of label groups to at most 2147483647.) |
Example |
------- |
import numpy as np |
from stardist.data import test_image_nuclei_2d |
from stardist.matching import group_matching_labels |
_y = test_image_nuclei_2d(return_mask=True)[1] |
labels = np.stack([_y, 2*np.roll(_y,10)], axis=0) |
labels_new = group_matching_labels(labels) |
Parameters |
---------- |
ys : np.ndarray or list/tuple of np.ndarray |
list/array of integer labels (2D or 3D) |
""" |
len(ys) > 1 or _raise(ValueError("'ys' must have 2 or more entries")) |
if isinstance(ys, np.ndarray): |
_check_label_array(ys, 'ys') |
ys.ndim > 1 or _raise(ValueError("'ys' must be at least 2-dimensional")) |
ys_grouped = np.empty_like(ys, dtype=np.int32) |
else: |
all(_check_label_array(y, 'ys') for y in ys) or _raise(ValueError("'ys' must be a list of label images")) |
all(y.shape==ys[0].shape for y in ys) or _raise(ValueError("all label images must have the same shape")) |
ys_grouped = np.empty((len(ys),)+ys[0].shape, dtype=np.int32) |
def _match_single(y_prev, y, next_id): |
y = y.astype(np.int32, copy=False) |
res = matching(y_prev, y, report_matches=True, thresh=thresh, criterion=criterion) |
relabel = dict(reversed(res.matched_pairs[i]) for i in res.matched_tps) |
y_grouped = np.zeros_like(y) |
for r in regionprops(y): |
m = (y[r.slice] == r.label) |
if r.label in relabel: |
y_grouped[r.slice][m] = relabel[r.label] |
else: |
y_grouped[r.slice][m] = next_id |
next_id += 1 |
return y_grouped, next_id |
ys_grouped[0] = ys[0] |
next_id = ys_grouped[0].max() + 1 |
for i in range(len(ys)-1): |
ys_grouped[i+1], next_id = _match_single(ys_grouped[i], ys[i+1], next_id) |
return ys_grouped |
def _shuffle_labels(y): |
_check_label_array(y, 'y') |
y2 = np.zeros_like(y) |
ids = tuple(set(np.unique(y)) - {0}) |
relabel = dict(zip(ids,np.random.permutation(ids))) |
for r in regionprops(y): |
m = (y[r.slice] == r.label) |
y2[r.slice][m] = relabel[r.label] |
return y2 |