Source code for analysis.algorithms.selections.through_going_muons

from collections import OrderedDict
from turtle import update
from sklearn.decomposition import PCA

from analysis.classes.ui import FullChainEvaluator, FullChainPredictor
from analysis.decorator import evaluate
from analysis.algorithms.selections.flash_matching import find_true_time, find_true_x

from pprint import pprint
import time
import numpy as np
from scipy.spatial.distance import cdist


[docs]def must_invert(x, invert_regions): for r in invert_regions: if x >= r[0] and x <= r[1]: return True return False
[docs]@evaluate(['acpt_muons_cells', 'acpt_muons'], mode='per_batch') def through_going_muons(data_blob, res, data_idx, analysis_cfg, cfg): """ Selection of through going muons ================================= 1. Anode-cathode-piercing tracks: The idea is to select muons whose x-projected length matches with the detector size in the x-direction. Then we claim to have found an anode-cathode crossing muon. 2. Anode- or cathode-piercing tracks: TODO describe here how it is done (similar to MicroBooNE). Then for any selected track, we split it into segments using a cubic 3D binning of the detector volume. For each segment we perform PCA to estimate the direction, hence dx, and we sum reco charge to get dQ. This heuristic relies only on semantic segmentation and particle clustering predictions. Note: we might need to use PID to weed out pion tracks. Note: we might exclude muons attached to a Michel for anode-piercing or cathode-piercing tracks. Configuration ============= Under `processor_cfg`, you can specify the following parameters: spatial_size selection_threshold bin_size mode data vdrift drift voxel_size invert_regions limits volume_thresholds Michel_threshold pid_constraint muon_label track_label Output ====== Two CSV files should contain respectively: - array of measurements of dQ/dx for each non-empty cell (`acpt_muons_cells.csv`) - array of detected muons and related information (`acpt_muons.csv`) """ # # ====== Configuration ====== # muon_cells, muons = [], [] deghosting = analysis_cfg['analysis']['deghosting'] processor_cfg = analysis_cfg['analysis'].get('processor_cfg', {}) spatial_size = processor_cfg['spatial_size'] selection_threshold = processor_cfg['selection_threshold'] bin_size = processor_cfg['bin_size'] # 'ac' for anode-cathode piercing tracks mode = processor_cfg.get('mode', ['ac']) # Whether we are running on MC or data data = processor_cfg.get('data', False) # drift velocity of electrons in liquid argon in cm / us vdrift = processor_cfg.get('vdrift', 0.1114) # drift the full length of TPC in cm drift_length = processor_cfg.get('drift', 150) # Voxel size in cm voxel_size = processor_cfg.get('voxel_size', 0.3) # Regions along x-direction where we need to reverse anode/cathode, ie # these regions go cathode -> anode in increasing x # expecting [[xmin, xmax]] format in px invert_regions = processor_cfg.get('invert_regions', []) # YZ plane detector limits - expecting a list of [min_y, max_y, min_z, max_z] limits = processor_cfg.get('limits', []) if len(limits) == 0: min_y, min_z = 0, 0 max_y, max_z = spatial_size, spatial_size elif len(limits) != 4: raise Exception('Limits ill-defined, expected a list of [min_y, max_y, min_z, max_z]') else: min_y, max_y, min_z, max_z = limits # Define "active" volume in YZ for this selection for anode- or cathode-piercing tracks # expecting [bottom_threshold, top_threshold, back_threshold, front_threshold] volume_thresholds = processor_cfg.get('volume_thresholds', []) if len(volume_thresholds) == 0: bottom_threshold, top_threshold, back_threshold, front_threshold = selection_threshold, selection_threshold, selection_threshold, selection_threshold elif len(volume_thresholds) != 4: raise Exception('Volume thresholds in YZ plane ill-defined, expected a list [bottom_threshold, top_threshold, back_threshold, front_threshold]') else: bottom_threshold, top_threshold, back_threshold, front_threshold = volume_thresholds # Whether to exclude tracks that are close to Michel voxels # threshold =-1 to disable, otherwise it is the threshold below which we consider the track # might be attached to a Michel electron. Michel_threshold = processor_cfg.get('Michel_threshold', -1) # Whether to enforce PID constraint (predicted as muon only) pid_constraint = processor_cfg.get('pid_constraint', False) # Avoid hardcoding labels muon_label = processor_cfg.get('muon_label', 2) track_label = processor_cfg.get('track_label', 1) # To correct x coordinates - one cathode per cryostat, in same order as volumes cathode_x = processor_cfg.get('cathode_x', [675.5, 2077]) # # ====== Initialization ====== # # Initialize analysis differently depending on data/MC setting if not data: predictor = FullChainEvaluator(data_blob, res, cfg, processor_cfg, deghosting=deghosting) else: predictor = FullChainPredictor(data_blob, res, cfg, processor_cfg, deghosting=deghosting) image_idxs = data_blob['index'] # x is drift direction # y is vertical direction # z is beam direction x, y, z = 0, 1, 2 pca = PCA(n_components=2) for i, index in enumerate(image_idxs): index_dict = { 'Index': index, 'run': data_blob['run_info'][i][0], 'subrun': data_blob['run_info'][i][1], 'event': data_blob['run_info'][i][2] } print(index_dict) pred_particles = predictor.get_particles(i, only_primaries=False) # Match with true particles if available if not data: true_particles = predictor.get_true_particles(i, only_primaries=False) # Match true particles to predicted particles true_ids = np.array([p.id for p in true_particles]) matched_particles = predictor.match_particles(i, mode='true_to_pred', min_overlap=0.1) # Loop over predicted particles for p in pred_particles: if p.semantic_type != track_label: continue # We found a predicted track particle, examine if it is piercing coords = p.points projected_x_length = coords[:, x].max() - coords[:, x].min() touching_top = max_y - coords[:, y].max() < top_threshold touching_bottom = coords[:, y].min() - min_y < bottom_threshold touching_back = coords[:, z].min() - min_z < back_threshold touching_front = max_z - coords[:, z].max() < front_threshold # decide whether this track is worth keeping depending on mode keep = {'ac': False, 'a': False, 'c': False} keep['ac'] = (projected_x_length > (drift_length/voxel_size) - selection_threshold) piercing_x = -1 if (touching_top ^ touching_bottom ^ touching_back ^ touching_front): # Assuming downward going entering, exiting = None, None if touching_top: entering = coords[coords[:, y].argmax()] exiting = coords[coords[:, y].argmin()] keep['a'] = exiting[x] < entering[x] keep['c'] = not keep['a'] elif touching_bottom: entering = coords[coords[:, y].argmin()] exiting = coords[coords[:, y].argmax()] keep['a'] = exiting[x] > entering[x] keep['c'] = not keep['a'] elif touching_back: # Not really always "entering" - just smallest z. entering = coords[coords[:, z].argmin()] # Not really always "exiting" - just largest z. exiting = coords[coords[:, z].argmax()] keep['a'] = entering[x] > exiting[x] keep['c'] = not keep['a'] elif touching_front: entering = coords[coords[:, z].argmax()] exiting = coords[coords[:, z].argmin()] keep['a'] = exiting[x] < entering[x] keep['c'] = not keep['a'] piercing_x = min(entering[x], exiting[x]) if keep['a'] else max(entering[x], exiting[x]) # Invert if necessary for r in invert_regions: if (entering[x] > r[0] and entering[x] < r[1]) or (exiting[x] > r[0] and exiting[x] < r[1]): temp = keep['c'] keep['c'] = keep['a'] keep['a'] = temp break # FIXME does an ACPT count as an APT or CPT ? # if keep['ac']: # keep['a'] = True # keep['c'] = True # If asked, check for presence of Michel electron if Michel_threshold >= 0: for p2 in pred_particles: if p2.semantic_type != 2: continue if cdist(p.points, p2.points).min() >= Michel_threshold: continue keep['ac'] = False keep['a'] = False keep['c'] = False # If asked to check predicted PID, exclude non-predicted-muons if pid_constraint and p.pid != muon_label: continue if np.any([(keep[key] and key in mode) for key in keep]): # We are keeping this candidate, see if it is a match # for the records keeping # T0 correction, assuming T0 is minimal drift coordinate # keep in mind vdrift is in cm / us, so t0 is in us inverted = False if keep['ac']: inverted = must_invert(coords[:, x].min(), invert_regions) if inverted: piercing_x = coords[:, x].min() #t0 = piercing_x * voxel_size / vdrift else: piercing_x = coords[:, x].max() #t0 = - piercing_x * voxel_size / vdrift t0 = -1 elif keep['a']: # FIXME account for invert_regions t0 = piercing_x * voxel_size / vdrift elif keep['c']: # FIXME account for invert_regions t0 = piercing_x * voxel_size / vdrift - drift_length/vdrift # don't forget coords are in voxel units coords[:, x] = coords[:, x] + (cathode_x[p.volume] - piercing_x) theta_yz = np.arctan2((coords[:, y].max() - coords[:, y].min()),(coords[:, z].max()-coords[:, z].min())) theta_xz = np.arctan2((coords[:, x].max() - coords[:, x].min()),(coords[:, z].max()-coords[:, z].min())) update_dict = { 'index': index, 'pred_particle_type': p.pid, 'pred_particle_is_primary': p.is_primary, 'pred_particle_size': p.size, 'projected_x_length': projected_x_length, 'theta_yz': theta_yz, 'theta_xz': theta_xz, 'coords_min_x': coords[:, x].min(), 'coords_max_x': coords[:, x].max(), 'piercing_x': piercing_x, 'must_invert': inverted, 'matched': len(p.match), 't0': t0, 'true_pdg': -1, 'true_size': -1, 'volume': p.volume } update_dict.update(keep) # Record what kind of piercing track it was update_dict.update(index_dict) if not data and len(p.match)>0: m = np.array(true_particles)[true_ids == p.match[0]][0] update_dict.update({ 'true_pdg': m.pid, 'true_size': m.size }) muons.append(OrderedDict(update_dict)) track_dict = update_dict # Bin track in segments bins = np.arange(0, spatial_size, bin_size) y_inds = np.digitize(coords[:, y], bins) z_inds = np.digitize(coords[:, z], bins) x_inds = np.digitize(coords[:, x], bins) # Assuming x binning is a multiple for y_idx in np.unique(y_inds): for z_idx in np.unique(z_inds): for x_idx in np.unique(x_inds): cell = (y_inds == y_idx) & (z_inds == z_idx) & (x_inds == x_idx) if np.count_nonzero(cell) < 2: continue coords_pca = pca.fit_transform(coords[cell])[:, 0] update_dict = OrderedDict({ 'index': index, 'cell_dQ': p.depositions[cell].sum(), 'cell_dN': np.count_nonzero(cell), 'cell_dx': coords_pca.max() - coords_pca.min(), 'cell_deltax': coords[cell][:, x].max() - coords[cell][:, x].min(), 'cell_deltay': coords[cell][:, y].max() - coords[cell][:, y].min(), 'cell_deltaz': coords[cell][:, z].max() - coords[cell][:, z].min(), 'cell_ybin': y_idx, 'cell_zbin': z_idx, 'cell_xbin': x_idx, 'volume': p.volume, }) # Include parent track information update_dict.update(track_dict) update_dict.update(index_dict) muon_cells.append(update_dict) return [muon_cells, muons]