#
# Functions related to vertex finding heuristic.
#
import numpy as np
import scipy
from mlreco.utils.ppn import uresnet_ppn_type_point_selector
from mlreco.utils.ppn import get_track_endpoints_geo
from sklearn.decomposition import PCA
from mlreco.utils.gnn.evaluation import primary_assignment
from mlreco.utils.groups import type_labels
[docs]def find_closest_points_of_approach(point1, direction1, point2, direction2):
"""
Given two lines in 3D space, find the two points that are closest to
each other on these lines.
See also https://math.stackexchange.com/a/1993990/391047.
Parameters
==========
point1: np.ndarray
Point belonging to first line. Shape (3,)
direction1: np.ndarray
Direction defining the first line with `point1`. Shape (3,)
point2: np.ndarray
Point belonging to second line. Shape (3,)
direction2: np.ndarray
Direction defining the second line with `point2`. Shape (3,)
Output
======
tuple of np.ndarray
Two points of approach, tuple with shape ((3,), (3,))
"""
#print(point1, point2)
common_normal = np.cross(direction1, direction2)
A = np.stack([direction1, - direction2, common_normal], axis=1)
b = point2 - point1
#print(A.shape, b.shape)
x = np.linalg.solve(A, b)
return point1 + x[0] * direction1, point2 + x[1] * direction2
[docs]def get_ppn_points_per_particles(input_data, res,
primary_particles, primary_particles_seg,
data_idx=0, coords_col=(1, 4),
attaching_threshold=10,
track_label=1,
shower_label=0,
unwrapped=False,
apply_deghosting=True,
return_distances=False,
min_voxel_count=10,
min_track_count=2):
"""
Get predicted PPN points
Parameters
----------
input_data: dict
res: dict
primary_particles: list
primary_particles_seg: list
data_idx: int, default 0
coords_col: tuple of int, default (1, 4)
attaching_threshold: float, default 10
Distance (in voxels) to associate PPN candidates with particles.
track_label: int, default 1
Semantic label for track-like particles.
shower_label: int, default 0
Semantic label for shower-like particles.
unwrapped: bool, default False
apply_deghosting: bool, default True
Whether PPN post-processing should consider the output already deghosted or not.
return_distancesL bool, default False
For tuning studies.
min_voxel_count: int, default 10
Any particle with predicted deghosted voxel count below this threshold
will be ignored.
min_track_count: int, default 2
Will ignore shower particles as soon as we have >= min_track_count
track-like particles in the interaction. This can be set to `None`
to disable this behavior completely.
Output
------
- list of N arrays of shape (M_i,f) of M_i PPN candidate points, f corresponds to the number
of feature in the output of uresnet_ppn_type_point_selector.
- array of N lists of voxel indices, corresponding to the particles
whose predicted semantic is track or shower.
N is the number of particles which are either track or shower (predicted).
"""
clusts = res['inter_particles'][data_idx]
ppn_candidates, c_candidates, distances, c_indices = [], [], [], []
#
# 1. Run PPN post-processing first to extract PPN candidates
#
ppn = uresnet_ppn_type_point_selector(input_data[data_idx], res,
entry=data_idx,
score_threshold=0.5,
type_threshold=2,
unwrapped=unwrapped,
apply_deghosting=apply_deghosting)
if ppn.shape[0] == 0:
if return_distances:
return np.empty((0, 3)), np.empty((0,)), np.empty((0, 3)), np.empty((0, 3))
else:
return np.empty((0, 3)), np.empty((0,)), np.empty((0, 3))
ppn_voxels = ppn[:, coords_col[0]:coords_col[1]]
ppn_score = ppn[:, 5]
#ppn_occupancy = ppn[:, 6]
ppn_type = ppn[:, 7:12]
ppn_endpoints = np.argmax(ppn[:, 13:15], axis=1)
no_delta = ppn_type[:, 3] < 0.5
ppn = ppn[no_delta]
ppn_voxels = ppn_voxels[no_delta]
ppn_score = ppn_score[no_delta]
ppn_type = ppn_type[no_delta]
ppn_endpoints = ppn_endpoints[no_delta]
ppn = ppn[(ppn_type[:, track_label] > 0.5) | (ppn_type[:, shower_label] > 0.5)]
#print('\n', ppn_voxels, '\n')
all_voxels = input_data[data_idx]
#
# 2. Find shower primary predictions
#
if 'shower_node_pred' in res:
all_primaries = primary_assignment(res['shower_node_pred'][data_idx], res['shower_group_pred'][data_idx])
else:
all_primaries = []
if 'ghost' in res and apply_deghosting:
mask_ghost = np.argmax(res['ghost'][data_idx], axis=1) == 0
all_voxels = input_data[data_idx][mask_ghost]
#
# 3. Identify predicted particles that we want to keep to
# find the vertex ("preliminary candidates").
#
preliminary_candidates = []
for c_idx, c in enumerate(primary_particles):
if len(c) < min_voxel_count: continue
c_seg = primary_particles_seg[c_idx]
if c_seg == shower_label:
shower_primary = []
# Several shower fragments can be part of the predicted primary
for p in res['shower_fragments'][data_idx][all_primaries]:
if len(np.intersect1d(c, p)):
shower_primary.append(p)
if len(shower_primary):
c = np.hstack(shower_primary)
# If it is not a shower or track particle, ignore
if c_seg not in [track_label, shower_label]:
continue
preliminary_candidates.append((c_idx, c_seg, c))
# If we have >= 2 tracks, just ignore shower particles
# to find the vertex
if min_track_count is not None:
assert type(min_track_count) == int
track_count = np.sum([int(c_seg == track_label) for _, c_seg, _ in preliminary_candidates])
if track_count >= min_track_count:
preliminary_candidates = [(c_idx, c_seg, c) for c_idx, c_seg, c in preliminary_candidates if c_seg == track_label]
#
# 4. Look at PPN predictions for each primary particle
# Keep them when distance is < attaching_threshold.
# This outputs a list of "ppn_candidates" that we will
# use to compute the vertex. `ppn_candidates` has the
# same length as `preliminary_candidates` and lists
# points for each candidate particle of interest.
#
for c_idx, c_seg, c in preliminary_candidates:
c_candidates.append(c)
c_indices.append(c_idx)
d = scipy.spatial.distance.cdist(ppn[:, coords_col[0]:coords_col[1]], all_voxels[c, coords_col[0]:coords_col[1]])
distances.append(d.min(axis=1))
good_ppn_predictions = d.min(axis=1) < attaching_threshold
# If it's a track also use geometry, not just PPN candidates
if c_seg == track_label:
end_points = get_track_endpoints_geo(all_voxels, c, use_numpy=True)
end_points = np.concatenate([end_points, ppn[good_ppn_predictions, coords_col[0]:coords_col[1]]], axis=0)
else:
end_points = ppn[good_ppn_predictions, coords_col[0]:coords_col[1]]
ppn_candidates.append(end_points)
c_indices = np.array(c_indices)
if return_distances:
return ppn_candidates, c_candidates, c_indices, distances
else:
return ppn_candidates, c_candidates, c_indices
[docs]def predict_vertex(inter_idx, data_idx, input_data, res,
coords_col=(1, 4),
primary_label=1,
shower_label=0,
track_label=1,
attaching_threshold=10,
inter_threshold=20,
unwrapped=False,
apply_deghosting=True,
return_distances=False,
other_primaries_threshold=10,
other_primaries_gamma_threshold=100,
pca_radius=28,
min_track_count=2,
min_voxel_count=10):
"""
Heuristic to find the vertex by looking at
- predicted primary particles within predicted interaction
- predicted PPN points for these primary particles
For now, very simple: taking the barycenter of potential candidates.
Parameters
----------
inter_idx: int
Predicted interaction index.
data_idx: int
Batch entry index.
input_data: dict
Input dictionary.
res: dict
Output dictionary.
coords_col: tuple, default (1, 4)
primary_label: int, default 1
In GNN predictions, integer tagging predicted primary particles.
shower_label: int, default 0
Semantic label for shower-like particles.
track_label: int, default 1
Semantic label for track-like particles.
attaching_threshold: float, default 10
See `get_ppn_points_per_particles`.
inter_threshold: float, default 20
PPN candidates need to minimize difference between distance
to closest primary and distance of current primary particle
voxels to closest primary particle.
unwrapped: bool, default False
Whether `input_data` and `res` are already unwrapped or not.
apply_deghosting: bool, default True
Whether to apply deghosting.
return_distances: bool, default False
For tuning studies.
other_primaries_threshold: float, default 10
Primaries too far from the other primaries will be ignored.
other_primaries_gamma_threshold: float, default 100
Same as previous but for photon-like particles exclusively ($T_B$).
Can be -1, then the same threshold as all other primaries will be used.
pca_radius: float, default 28
min_track_count: int, default 2
See `get_ppn_points_per_particles`.
min_voxel_count: int, default 10
See `get_ppn_points_per_particles`.
Output
------
np.ndarray
vtx_candidate with shape (3,)
"""
if other_primaries_gamma_threshold < 0:
other_primaries_gamma_threshold = other_primaries_threshold
pca = PCA(n_components=3)
clusts = res['inter_particles'][data_idx]
if inter_idx not in set(res['inter_group_pred'][data_idx]):
raise ValueError("Interaction ID: {} does not exist for data entry : {}.\n"\
" Available Interactions: {}".format(inter_idx, data_idx, str(np.unique(res['inter_group_pred'][data_idx]))))
inter_mask = res['inter_group_pred'][data_idx] == inter_idx
interaction = clusts[inter_mask]
# Identify predicted primary particles within the interaction
primary_particles = np.argmax(res['node_pred_vtx'][data_idx][inter_mask][:, 3:], axis=1) == primary_label
# Identify PID among primary particles
pid = np.argmax(res['node_pred_type'][data_idx][inter_mask][primary_particles], axis=1)
photon_label = type_labels[22]
#
# Get PPN candidates for vertex, listed per primary particle
#
out = get_ppn_points_per_particles(input_data, res,
clusts[inter_mask][primary_particles],
res['particles_seg'][data_idx][inter_mask][primary_particles],
data_idx=data_idx,
attaching_threshold=attaching_threshold,
track_label=track_label,
shower_label=shower_label,
coords_col=coords_col,
unwrapped=unwrapped,
apply_deghosting=apply_deghosting,
return_distances=return_distances,
min_track_count=min_track_count,
min_voxel_count=min_voxel_count)
if return_distances:
ppn_candidates, c_candidates, c_indices, distances = out
else:
ppn_candidates, c_candidates, c_indices = out
all_voxels = input_data[data_idx]
if 'ghost' in res and apply_deghosting:
if 'input_rescaled' not in res:
mask_ghost = np.argmax(res['ghost'][data_idx], axis=1) == 0
all_voxels = input_data[data_idx][mask_ghost]
else:
all_voxels = res['input_rescaled'][data_idx]
# Handle the case where only a single primary is available
if len(ppn_candidates) == 1:
particle_seg = res['particles_seg'][data_idx][inter_mask][primary_particles][c_indices[0]]
end_points = res['particle_node_features'][data_idx][inter_mask][primary_particles][c_indices[0], -9:-3].reshape(-1,3)
if particle_seg != 1:
# If there's a single shower object, pick the shower start point
return end_points[0]
else:
# If there's a single track, pick the end point with the lowest local charge density
voxels = all_voxels[c_candidates[0], coords_col[0]:coords_col[1]]
dist_mat = scipy.spatial.distance.cdist(end_points, voxels)
mask = dist_mat < 5
charges = all_voxels[c_candidates[0],4]
locald = [np.sum(charges[mask[0]]), np.sum(charges[mask[1]])]
return end_points[np.argmin(locald)]
# Handle all other cases
ppn_candidates2 = []
directions = []
distances_others, distances_primaries = [], []
if len(ppn_candidates) > 1:
# For each primary particle, select the PPN predicted point
# that is closest to other primary particles in the interaction.
for p_idx, points in enumerate(ppn_candidates):
# No PPN point associated with this primary particle
if len(points) == 0:
continue
current_pid = pid[c_indices[p_idx]]
# For tuning purpose only
# compute distance from current particle to primaries that are not the current particle
if return_distances:
other_primaries_coordinates = all_voxels[np.hstack([c for idx, c in enumerate(c_candidates) if idx != p_idx])][:, coords_col[0]:coords_col[1]]
d = scipy.spatial.distance.cdist(all_voxels[c_candidates[p_idx], coords_col[0]:coords_col[1]], other_primaries_coordinates)
distances_others.append(d.min())
distance_to_other_primaries = []
points_to_other_primaries = []
# Ignore photons if
# - at least 3 primary particles involved
# - at least 2 non-photon primary
use_gamma_threshold = (pid[c_indices] != type_labels[22]).sum() <= 1
for c_idx, c2 in enumerate(c_candidates):
if c_idx == p_idx: continue
# Ignore photons
# if no_photon_count > 0 and pid[c_indices[c_idx]] == type_labels[22]: continue
if ~use_gamma_threshold and pid[c_indices[c_idx]] == type_labels[22]: continue
d2 = scipy.spatial.distance.cdist(all_voxels[c_candidates[p_idx], coords_col[0]:coords_col[1]], all_voxels[c2, coords_col[0]:coords_col[1]])
distance_to_other_primaries.append(d2.min())
d3 = scipy.spatial.distance.cdist(points, all_voxels[c2, coords_col[0]:coords_col[1]])
points_to_other_primaries.append((d3.min(axis=1) - d2.min())[:, None])
distance_to_other_primaries = np.array(distance_to_other_primaries)
points_to_other_primaries = np.concatenate(points_to_other_primaries, axis=1)
if len(distance_to_other_primaries) == 0: continue
# Select points that minimize
# - distance from the point to other primaries
# - distance from current primary to other primaries
# (this is a heuristic to select points closest to vertex)
candidate_distances = np.mean(np.abs(points_to_other_primaries), axis=1)
best_candidate_distance = candidate_distances.min()
distances_primaries.append(best_candidate_distance)
#
# Apply T_B threshold
#
use_gamma_threshold = (current_pid == type_labels[22]) or use_gamma_threshold
if use_gamma_threshold and (other_primaries_gamma_threshold > -1) and (distance_to_other_primaries.min() >= other_primaries_gamma_threshold):
#print('Skipping photon')
continue
elif (~use_gamma_threshold or other_primaries_gamma_threshold == -1) and (other_primaries_threshold > -1) and (distance_to_other_primaries.min() >= other_primaries_threshold):
#print("Skipping", p_idx, (distance_to_other_primaries >= other_primaries_threshold).sum(), len(distance_to_other_primaries), distance_to_other_primaries)
continue
if best_candidate_distance < inter_threshold:
# Look at all of the points below threshold, pick first one for which we can make a direction
all_mask = candidate_distances < inter_threshold
best_idx = np.where(all_mask)[0][candidate_distances[all_mask].argsort()]
for b in best_idx:
d = scipy.spatial.distance.cdist(all_voxels[c_candidates[p_idx], coords_col[0]:coords_col[1]], [points[b]])
X = all_voxels[c_candidates[p_idx], coords_col[0]:coords_col[1]][d.reshape((-1,)) < pca_radius]
# We need at least 3 (2?) distinct voxels to compute a PCA
if X.shape[0] >= 3:
ppn_candidates2.append(points[b][None, :])
directions.append(pca.fit(X).components_[0][None, :])
break
# Initialize final output
vtx_std, vtx_candidate = np.array([-1, -1, -1]), np.array([-1, -1, -1])
# If we have many candidates, we will consider closest points of approach
# When we are happy with our candidates we take a barycenter.
if len(ppn_candidates2):
ppn_candidates2 = np.concatenate(ppn_candidates2, axis=0)
directions = np.concatenate(directions, axis=0)
if len(ppn_candidates2) > 1:
closest_points = []
# We want to find the closest points of approach between any pair
# of (vertex candidate, direction of associated primary particle)
for p1_idx in range(0, len(ppn_candidates2)):
for p2_idx in range(p1_idx+1, len(ppn_candidates2)):
closest_points.extend(find_closest_points_of_approach(ppn_candidates2[p1_idx],
directions[p1_idx],
ppn_candidates2[p2_idx],
directions[p2_idx]))
closest_points = np.stack(closest_points)
else:
closest_points = ppn_candidates2
# Refine with dbscan to eliminate ppn candidates that are
# far away (e.g. middle of a track)
# ppn_candidates_group = DBSCAN(eps=7, min_samples=1).fit(ppn_candidates[:, :3]).labels_
# groups, counts = np.unique(ppn_candidates_group, return_counts=True)
# #print(counts)
# ppn_candidates = ppn_candidates[ppn_candidates_group == groups[np.argmax(counts)]]
#print("Selecting %d / %d points after dbscan" % (len(ppn_candidates), len(ppn_candidates_group)))
# This part here was giving a divide by zero RuntimeWarning. Best to avoid if possible
# Now take barycenter
vtx_candidate = np.mean(closest_points, axis=0)
vtx_std = np.std(closest_points, axis=0)
if return_distances:
return ppn_candidates2, c_candidates, vtx_candidate, vtx_std, ppn_candidates, distances, distances_others, distances_primaries
else:
return vtx_candidate
[docs]def get_vertex(kinematics, cluster_label, data_idx, inter_idx,
vtx_col=9, primary_label=1):
"""
Getting true vertex for interaction identified by inter_idx
Look at kinematics label, selecting only primary particles
within this interaction, and get vertex which occurs the most.
Parameters
----------
kinematics: list of np.ndarray
Kinematics labels.
cluster_label: list of np.ndarray
Cluster labels.
data_idx: int
Which entry we are looking at (labels).
inter_idx: int
The true interaction id for which we want the vertex.
vtx_col: int, default 9
First column of vertex coordinates in the kinematics labels.
Coordinates columns go from vtx_col to vtx_col+2.
primary_label: int, default 1
What integer tags primary particles in kinematics labels
("primary particles" ~ particles coming out of the vertex).
Output
------
np.ndarray
True vertex coordinates. Shape (3,)
"""
inter_mask = cluster_label[data_idx][:, 7] == inter_idx
primary_mask = kinematics[data_idx][:, vtx_col+3] == primary_label
mask = inter_mask if (inter_mask & primary_mask).sum() == 0 else inter_mask & primary_mask
vtx, counts = np.unique(kinematics[data_idx][mask][:, [vtx_col, vtx_col+1, vtx_col+2]], axis=0, return_counts=True)
vtx = vtx[np.argmax(counts)]
return vtx