Source code for mlreco.utils.track_clustering

# Utils to do track clustering
import numpy as np
import scipy
import sklearn
from scipy.spatial.distance import cdist

[docs]def track_clustering(voxels, points, method='masked_dbscan', **kwargs): if method == 'masked_dbscan': pair_mat = cdist(points, voxels, metric=kwargs['metric']) dist_mask = np.all((pair_mat > kwargs['mask_radius']), axis=0) labels = sklearn.cluster.DBSCAN(eps=kwargs['eps'], min_samples=kwargs['min_samples'], metric=kwargs['metric']).fit(voxels).labels_ for i in np.unique(labels): global_mask = labels == i active_mask = dist_mask & global_mask passive_mask = ~dist_mask & global_mask if np.sum(active_mask): res = sklearn.cluster.DBSCAN(eps=kwargs['eps'], min_samples=kwargs['min_samples'], metric=kwargs['metric']).fit(voxels[active_mask]) labels[active_mask] = np.max(labels)+1+res.labels_ if np.sum(passive_mask): dist_mat = cdist(voxels[active_mask], voxels[passive_mask], metric=kwargs['metric']) args = np.argmin(dist_mat, axis=0) labels[passive_mask] = labels[active_mask][args] return labels elif method == 'closest_path': pair_mat = cdist(voxels, points, metric=kwargs['metric']) labels = sklearn.cluster.DBSCAN(eps=kwargs['eps'], min_samples=kwargs['min_samples'], metric=kwargs['metric']).fit(voxels).labels_ for l in np.unique(labels): group_mask = labels == l point_mask = np.min(pair_mat[group_mask], axis=0) < kwargs['eps'] point_ids = np.unique(np.argmin(pair_mat[np.ix_(group_mask, point_mask)], axis=0)) if len(point_ids) > 2: # Build a graph on the group voxels that respect the DBSCAN distance scale dist_mat = cdist(voxels[group_mask], voxels[group_mask], metric=kwargs['metric']) graph = dist_mat * (dist_mat < kwargs['eps']) cs_graph = scipy.sparse.csr_matrix(graph) # Find the shortest between each of the breaking points, identify segments that minimize absolute excursion graph_mat, predecessors = scipy.sparse.csgraph.shortest_path(csgraph=cs_graph, directed=False, return_predecessors=True) break_ix = np.ix_(point_ids, point_ids) chord_mat = dist_mat[break_ix] mst_mat = scipy.sparse.csgraph.minimum_spanning_tree(graph_mat[break_ix]-chord_mat+1e-6).toarray() mst_edges = np.vstack(np.where(mst_mat > 0)).T # Construct graph paths along the tree paths = [[] for _ in range(len(mst_edges))] for i, e in enumerate(mst_edges): k, l = point_ids[e] paths[i].append(l) while l != k: l = predecessors[k,l] paths[i].append(l) # Find the path closest to each of the voxels in the group. If a path does not improve reachability, remove mindists = np.vstack([np.min(graph_mat[:,p],axis=1) for p in paths]) mst_tort = mst_mat[mst_mat > 0]/chord_mat[mst_mat > 0] # tau - 1 tort_ord = np.argsort(-mst_tort) least_r = np.max(np.min(mindists, axis=0)) # Least reachable point distance path_mask = np.ones(len(mst_tort), dtype=np.bool) for i in range(len(mst_tort)): if np.sum(path_mask) == 1: break path_mask[tort_ord[i]] = False reach = np.max(np.min(mindists[path_mask], axis=0)) if reach > least_r: path_mask[tort_ord[i]] = True # Associate voxels with closest remaining path mindists = np.vstack([np.min(graph_mat[:,p[min(1,len(p)-2):max(len(p)-1,2)]],axis=1) for i, p in enumerate(paths) if path_mask[i]]) sublabels = np.argmin(mindists, axis=0) labels[group_mask] = max(labels)+1+sublabels return np.unique(labels, return_inverse=True)[1] else: raise ValueError('Track clustering method not recognized:', method)