Source code for mlreco.iotools.parsers.particles

import numpy as np
from larcv import larcv
from mlreco.utils.ppn import get_ppn_info
from mlreco.utils.groups import type_labels as TYPE_LABELS


[docs]def parse_particles(particle_event, cluster_event, voxel_coordinates=True): """ A function to copy construct & return an array of larcv::Particle. If `voxel_coordinates` is set to `True`, the parser rescales the truth positions (start, end, etc.) to voxel coordinates. .. code-block:: yaml schema: particles: parser: parse_particles args: particle_event: particle_pcluster cluster_event: cluster3d_pcluster voxel_coordinates: True Configuration ------------- particle_event: larcv::EventParticle cluster_event: larcv::EventClusterVoxel3D to translate coordinates voxel_coordinates: bool Returns ------- list a python list of larcv::Particle objects """ particles = [larcv.Particle(p) for p in particle_event.as_vector()] if voxel_coordinates: meta = cluster_event.meta() funcs = ['first_step', 'last_step', 'position', 'end_position', 'ancestor_position'] for p in particles: for f in funcs: pos = getattr(p,f)() x = (pos.x() - meta.min_x()) / meta.size_voxel_x() y = (pos.y() - meta.min_y()) / meta.size_voxel_y() z = (pos.z() - meta.min_z()) / meta.size_voxel_z() # x = (pos.x() - meta.origin().x) / meta.size_voxel_x() # y = (pos.y() - meta.origin().y) / meta.size_voxel_y() # z = (pos.z() - meta.origin().z) / meta.size_voxel_z() # x = pos.x() * meta.size_voxel_x() + meta.origin().x # y = pos.y() * meta.size_voxel_y() + meta.origin().y # z = pos.z() * meta.size_voxel_z() + meta.origin().z getattr(p,f)(x,y,z,pos.t()) return particles
[docs]def parse_neutrinos(neutrino_event, cluster_event, voxel_coordinates=True): """ A function to copy construct & return an array of larcv::Neutrino. If `voxel_coordinates` is set to `True`, the parser rescales the truth position information to voxel coordinates. .. code-block:: yaml schema: neutrinos: parser: parse_neutrinos args: neutrino_event: neutrino_mpv cluster_event: cluster3d_pcluster voxel_coordinates: True Configuration ------------- neutrino_pcluster: larcv::EventNeutrino cluster3d_pcluster: larcv::EventClusterVoxel3D to translate coordinates voxel_coordinates: bool Returns ------- list a python list of larcv::Neutrino objects """ neutrinos = [larcv.Neutrino(p) for p in neutrino_event.as_vector()] if voxel_coordinates: meta = cluster_event.meta() funcs = ['position'] for p in neutrinos: for f in funcs: pos = getattr(p,f)() x = (pos.x() - meta.min_x()) / meta.size_voxel_x() y = (pos.y() - meta.min_y()) / meta.size_voxel_y() z = (pos.z() - meta.min_z()) / meta.size_voxel_z() # x = (pos.x() - meta.origin().x) / meta.size_voxel_x() # y = (pos.y() - meta.origin().y) / meta.size_voxel_y() # z = (pos.z() - meta.origin().z) / meta.size_voxel_z() # x = pos.x() * meta.size_voxel_x() + meta.origin().x # y = pos.y() * meta.size_voxel_y() + meta.origin().y # z = pos.z() * meta.size_voxel_z() + meta.origin().z getattr(p,f)(x,y,z,pos.t()) return neutrinos
[docs]def parse_particle_points(sparse_event, particle_event, include_point_tagging=True): """ A function to retrieve particles ground truth points tensor, returns points coordinates, types and particle index. If include_point_tagging is true, it includes start vs end point tagging. .. code-block:: yaml schema: points: parser: parse_particle_points args: sprase_event: sparse3d_pcluster particle_event: particle_pcluster include_point_tagging: True Configuration ------------- sparse3d_pcluster: larcv::EventSparseTensor3D particle_pcluster: larcv::EventParticle include_point_tagging: bool Returns ------- np_voxels: np.ndarray a numpy array with the shape (N,3) where 3 represents (x,y,z) coordinate np_values: np.ndarray a numpy array with the shape (N, 2) where 2 represents the class of the ground truth point and the particle data index in this order. (optionally: end/start tagging) """ particles_v = particle_event.as_vector() part_info = get_ppn_info(particles_v, sparse_event.meta()) # For open data - to reproduce # part_info = get_ppn_info(particles_v, sparse_event.meta(), min_voxel_count=7, min_energy_deposit=10, use_particle_shape=False) # part_info = get_ppn_info(particles_v, sparse_event.meta(), min_voxel_count=5, min_energy_deposit=10, use_particle_shape=False) np_values = np.column_stack([part_info[:, 3], part_info[:, 8]]) if part_info.shape[0] > 0 else np.empty(shape=(0, 2), dtype=np.float32) if include_point_tagging: np_values = np.column_stack([part_info[:, 3], part_info[:, 8], part_info[:, 9]]) if part_info.shape[0] > 0 else np.empty(shape=(0, 3), dtype=np.float32) if part_info.shape[0] > 0: #return part_info[:, :3], part_info[:, 3][:, None] return part_info[:, :3], np_values else: #return np.empty(shape=(0, 3), dtype=np.int32), np.empty(shape=(0, 1), dtype=np.float32) return np.empty(shape=(0, 3), dtype=np.int32), np_values
[docs]def parse_particle_coords(particle_event, cluster_event): ''' Function that returns particle coordinates (start and end) and start time. This is used for particle clustering into interactions .. code-block:: yaml schema: coords: parser: parse_particle_coords args: particle_event: particle_pcluster cluster_event: cluster3d_pcluster Configuration ------------- particle_pcluster: larcv::EventParticle cluster3d_pcluster: larcv::EventClusterVoxel3D to translate coordinates Returns ------- numpy.ndarray Shape (N,8) containing: [first_step_x, first_step_y, first_step_z, last_step_x, last_step_y, last_step_z, first_step_t, shape_id] ''' # Scale particle coordinates to image size particles = parse_particles(particle_event, cluster_event) # Make features particle_feats = [] for i, p in enumerate(particles): start_point = last_point = [p.first_step().x(), p.first_step().y(), p.first_step().z()] if p.shape() == 1: # End point only meaningful and thought out for tracks last_point = [p.last_step().x(), p.last_step().y(), p.last_step().z()] particle_feats.append(np.concatenate((start_point, last_point, [p.first_step().t(), p.shape()]))) particle_feats = np.vstack(particle_feats) return particle_feats[:,:3], particle_feats[:,3:]
[docs]def parse_particle_graph(particle_event, cluster_event=None): """ A function to parse larcv::EventParticle to construct edges between particles (i.e. clusters) If cluster_event is provided, it also removes edges to clusters that have a zero pixel count and patches subsequently broken parentage. .. code-block:: yaml schema: graph: parser: parse_particle_graph args: particle_event: particle_pcluster cluster_event: cluster3d_pcluster Configuration ------------- particle_pcluster: larcv::EventParticle Returns ------- np.ndarray a numpy array of directed edges where each edge is (parent,child) batch index ID. See Also -------- parse_particle_graph_corrected: in addition, remove empty clusters. """ particles_v = particle_event.as_vector() num_particles = particles_v.size() if cluster_event is None: # Fill edges (directed [parent,child] pair) edges = np.empty((0,2), dtype = np.int32) for cluster_id in range(num_particles): p = particles_v[cluster_id] if p.parent_id() != p.id(): edges = np.vstack((edges, [int(p.parent_id()), cluster_id])) if p.parent_id() == p.id() and p.group_id() != p.id(): edges = np.vstack((edges, [int(p.group_id()), cluster_id])) else: # Check that the cluster and particle objects are consistent num_clusters = cluster_event.size() assert num_clusters == num_particles # Fill edges (directed [parent,child] pair) zero_nodes, zero_nodes_pid = [], [] edges = np.empty((0,2), dtype = np.int32) for cluster_id in range(num_particles): cluster = cluster_event.as_vector()[cluster_id] num_points = cluster.as_vector().size() p = particles_v[cluster_id] if p.id() != p.group_id(): continue if p.parent_id() != p.group_id(): edges = np.vstack((edges, [int(p.parent_id()),p.group_id()])) if num_points == 0: zero_nodes.append(p.group_id()) zero_nodes_pid.append(cluster_id) # Remove zero pixel nodes for i, zn in enumerate(zero_nodes): children = np.where(edges[:, 0] == zn)[0] if len(children) == 0: edges = edges[edges[:, 0] != zn] edges = edges[edges[:, 1] != zn] continue parent = np.where(edges[:, 1] == zn)[0] assert len(parent) <= 1 # If zero node has a parent, then assign children to that parent if len(parent) == 1: parent_id = edges[parent][0][0] edges[:, 0][children] = parent_id else: edges = edges[edges[:, 0] != zn] edges = edges[edges[:, 1] != zn] return edges
[docs]def parse_particle_singlep_pdg(particle_event): """ Get each true particle's PDG code. .. code-block:: yaml schema: pdg_list: parser: parse_particle_singlep_pdg args: particle_event: particle_pcluster Configuration ---------- particle_event : larcv::EventParticle Returns ------- np.ndarray List of PDG codes for each particle in TTree. """ pdgs = [] pdg = -1 for p in particle_event.as_vector(): if not p.track_id() == 1: continue if int(p.pdg_code()) in TYPE_LABELS.keys(): pdg = TYPE_LABELS[int(p.pdg_code())] else: pdg = -1 return np.asarray([pdg]) return np.asarray([pdg])
[docs]def parse_particle_singlep_einit(particle_event): """ Get each true particle's true initial energy. .. code-block:: yaml schema: einit_list: parser: parse_particle_singlep_pdg args: particle_event: particle_pcluster Configuration ---------- particle_event : larcv::EventParticle Returns ------- np.ndarray List of true initial energy for each particle in TTree. """ for p in particle_event.as_vector(): is_primary = p.track_id() == p.parent_track_id() if not p.track_id() == 1: continue return p.energy_init() return -1