Source code for mlreco.main_funcs

import os, time, datetime, glob, sys, yaml
import numpy as np
try:
    if os.environ.get('OMP_NUM_THREADS') is None:
        os.environ['OMP_NUM_THREADS'] = '16'
    import MinkowskiEngine as ME
except ImportError:
    pass

from mlreco.iotools.factories import loader_factory
# Important: do not import here anything that might
# trigger cuda initialization through PyTorch.
# We need to set CUDA_VISIBLE_DEVICES first, which
# happens in the process_config function before anything
# else is allowed to happen.


[docs]class Handlers: cfg = None data_io = None data_io_iter = None csv_logger = None weight_io = None train_logger = None watch = None iteration = 0
[docs] def keys(self): return list(self.__dict__.keys())
# Use this function instead of itertools.cycle to avoid creating a memory leak. # (itertools.cycle attempts to save all outputs in order to re-cycle through them)
[docs]def cycle(data_io): while True: for x in data_io: yield x
[docs]def train(cfg, event_list=None): handlers = prepare(cfg, event_list=event_list) train_loop(handlers)
[docs]def inference(cfg, event_list=None): handlers = prepare(cfg, event_list=event_list) inference_loop(handlers)
[docs]def process_config(cfg, verbose=True): if 'trainval' in cfg: # Set GPUs to be used os.environ['CUDA_VISIBLE_DEVICES'] = cfg['trainval']['gpus'] cfg['trainval']['gpus'] = list(range(len([int(a) for a in cfg['trainval']['gpus'].split(',') if a.isdigit()]))) # Update seed if cfg['trainval']['seed'] < 0: import time cfg['trainval']['seed'] = int(time.time()) else: cfg['trainval']['seed'] = int(cfg['trainval']['seed']) # Set MinkowskiEngine number of threads os.environ['OMP_NUM_THREADS'] = '16' # default value # Set default concat_result default_concat_result = ['input_edge_features', 'input_node_features','points', 'coordinates', 'particle_node_features', 'particle_edge_features', 'track_node_features', 'shower_node_features', 'ppn_coords', 'mask_ppn', 'ppn_layers', 'classify_endpoints', 'vertex_layers', 'vertex_coords', 'primary_label_scales', 'segment_label_scales', 'seediness', 'margins', 'embeddings', 'fragments', 'fragments_seg', 'shower_fragments', 'shower_edge_index', 'shower_edge_pred','shower_node_pred','shower_group_pred','track_fragments', 'track_edge_index', 'track_node_pred', 'track_edge_pred', 'track_group_pred', 'particle_fragments', 'particle_edge_index', 'particle_node_pred', 'particle_edge_pred', 'particle_group_pred', 'particles', 'inter_edge_index', 'inter_node_pred', 'inter_edge_pred', 'inter_group_pred', 'inter_particles', 'node_pred_p', 'node_pred_type', 'vtx_labels', 'vtx_anchors', 'grappa_inter_vtx_labels', 'grappa_inter_vtx_anchors', 'kinematics_node_pred_p', 'kinematics_node_pred_type', 'flow_edge_pred', 'kinematics_particles', 'kinematics_edge_index', 'clust_fragments', 'clust_frag_seg', 'interactions', 'inter_cosmic_pred', 'node_pred_vtx', 'total_num_points', 'total_nonghost_points', 'spatial_embeddings', 'occupancy', 'hypergraph_features', 'features', 'feature_embeddings', 'covariance', 'clusts','edge_index','edge_pred','node_pred'] if 'concat_result' not in cfg['trainval']: cfg['trainval']['concat_result'] = default_concat_result if 'iotool' in cfg: # Update IO seed if 'sampler' in cfg['iotool']: if 'seed' not in cfg['iotool']['sampler'] or cfg['iotool']['sampler']['seed'] < 0: import time cfg['iotool']['sampler']['seed'] = int(time.time()) else: cfg['iotool']['sampler']['seed'] = int(cfg['iotool']['sampler']['seed']) # Batch size checker if cfg['iotool'].get('minibatch_size',None) is None: cfg['iotool']['minibatch_size'] = -1 if cfg['iotool']['batch_size'] < 0 and cfg['iotool']['minibatch_size'] < 0: raise ValueError('Cannot have both BATCH_SIZE (-bs) and MINIBATCH_SIZE (-mbs) negative values!') # Assign non-default values num_gpus = 1 if 'trainval' in cfg: num_gpus = max(1,len(cfg['trainval']['gpus'])) if cfg['iotool']['batch_size'] < 0: cfg['iotool']['batch_size'] = int(cfg['iotool']['minibatch_size'] * num_gpus) if cfg['iotool']['minibatch_size'] < 0: cfg['iotool']['minibatch_size'] = int(cfg['iotool']['batch_size'] / num_gpus) # Check consistency if not (cfg['iotool']['batch_size'] % (cfg['iotool']['minibatch_size'] * num_gpus)) == 0: raise ValueError('BATCH_SIZE (-bs) must be multiples of MINIBATCH_SIZE (-mbs) and GPU count (--gpus)!') # Report where config processed import subprocess as sc print('\nConfig processed at:',sc.getstatusoutput('uname -a')[1]) print('\n$CUDA_VISIBLE_DEVICES="%s"\n' % os.environ.get('CUDA_VISIBLE_DEVICES',None)) # Report GPUs to be used (if any) # Report configuations if verbose: from warnings import filterwarnings filterwarnings('once', message='Deprecated', category=DeprecationWarning) print(yaml.dump(cfg, default_flow_style=None))
[docs]def make_directories(cfg, loaded_iteration, handlers=None): from mlreco.utils import CSVData if 'trainval' in cfg: # Weight save directory if cfg['trainval']['weight_prefix']: save_dir = cfg['trainval']['weight_prefix'][0:cfg['trainval']['weight_prefix'].rfind('/')] if save_dir and not os.path.isdir(save_dir): os.makedirs(save_dir) # Log save directory if cfg['trainval']['log_dir']: if not os.path.exists(cfg['trainval']['log_dir']): os.makedirs(cfg['trainval']['log_dir']) logname = '%s/train_log-%07d.csv' % (cfg['trainval']['log_dir'], loaded_iteration) if not cfg['trainval']['train']: logname = '%s/inference_log-%07d.csv' % (cfg['trainval']['log_dir'], loaded_iteration) if handlers is not None: if hasattr(handlers,'csv_logger') and handlers.csv_logger: handlers.csv_logger.close() handlers.csv_logger = CSVData(logname)
[docs]def prepare(cfg, event_list=None): """ Prepares high level API handlers, namely trainval instance and torch DataLoader (w/ iterator) INPUT - cfg is a full configuration block after pre-processed by process_config function OUTPUT - Handler instance attached with trainval/DataLoader instances (if in config) """ import torch from mlreco.trainval import trainval handlers = Handlers() handlers.cfg = cfg # Instantiate DataLoader handlers.data_io = loader_factory(cfg, event_list=event_list) # IO iterator handlers.data_io_iter = iter(cycle(handlers.data_io)) if 'trainval' in cfg: # Set random seed for reproducibility np.random.seed(cfg['trainval']['seed']) torch.manual_seed(cfg['trainval']['seed']) # Set primary device if len(cfg['trainval']['gpus']) > 0: torch.cuda.set_device(cfg['trainval']['gpus'][0]) # Trainer configuration handlers.trainer = trainval(cfg) # set the shared clock handlers.watch = handlers.trainer._watch # Restore weights if necessary loaded_iteration = handlers.trainer.initialize() if cfg['trainval']['train']: handlers.iteration = loaded_iteration make_directories(cfg, loaded_iteration, handlers=handlers) return handlers
[docs]def apply_event_filter(handlers,event_list=None): """ Reconfigures IO to apply an event filter INPUT: - handlers is Handlers instance generated by prepare() function - event_list is an array of integers """ # Instantiate DataLoader handlers.data_io = loader_factory(handlers.cfg,event_list) # IO iterator handlers.data_io_iter = iter(cycle(handlers.data_io))
[docs]def log(handlers, tstamp_iteration, #tspent_io, tspent_iteration, tsum, res, cfg, epoch, first_id): """ Log relevant information to CSV files and stdout. """ import torch from mlreco.utils import utils report_step = cfg['trainval']['report_step'] and \ ((handlers.iteration+1) % cfg['trainval']['report_step'] == 0) res_dict = {} for key in res: # Average loss and acc over all the events in this batch # Keys of format %s_count are special and used as counters # e.g. for PPN when there are no particle labels in event #if 'analysis_keys' not in cfg['model'] or key not in cfg['model']['analysis_keys']: if len(res[key]) == 0: continue if isinstance(res[key][0], float) or isinstance(res[key][0], int): if "count" not in key: res_dict[key] = np.mean([np.array(t).mean() for t in res[key]]) else: res_dict[key] = np.sum(np.sum([np.array(t).sum() for t in res[key]])) mem = 0. if torch.cuda.is_available(): mem = utils.round_decimals(torch.cuda.max_memory_allocated()/1.e9, 3) # Organize time info t_iter = handlers.watch.time('iteration') t_io = handlers.watch.time('io') t_save = handlers.watch.time('save') t_net = handlers.watch.time('train' if cfg['trainval']['train'] else 'forward') t_forward_cpu = handlers.watch.time_cputime('forward_cpu') t_backward_cpu = handlers.watch.time_cputime('backward_cpu') # Report (logger) if handlers.csv_logger: tsum_map = handlers.trainer.tspent_sum handlers.csv_logger.record(('iter', 'first_id', 'epoch', 'titer', 'tsumiter'), (handlers.iteration, first_id, epoch, t_iter, tsum)) handlers.csv_logger.record(('tio', 'tsumio'), (t_io,tsum_map['io'])) handlers.csv_logger.record(('mem', ), (mem, )) handlers.csv_logger.record(('tforwardcpu', ), (t_forward_cpu, )) handlers.csv_logger.record(('tbackwardcpu', ), (t_backward_cpu, )) if cfg['trainval']['train']: handlers.csv_logger.record(('ttrain', 'tsave', 'tsumtrain', 'tsumsave'), (t_net, t_save, tsum_map['train'], tsum_map['save'])) else: handlers.csv_logger.record(('tforward', 'tsumforward'), (t_net, tsum_map['forward'])) for key in res_dict: handlers.csv_logger.record((key,), (res_dict[key],)) handlers.csv_logger.write() # Report (stdout) if report_step: acc = utils.round_decimals(np.mean(res.get('accuracy',-1)), 4) loss = utils.round_decimals(np.mean(res.get('loss', -1)), 4) tfrac = utils.round_decimals(t_net/t_iter*100., 2) tabs = utils.round_decimals(t_net, 3) epoch = utils.round_decimals(epoch, 2) if cfg['trainval']['train']: msg = 'Iter. %d (epoch %g) @ %s ... train time %g%% (%g [s]) mem. %g GB \n' msg = msg % (handlers.iteration, epoch, tstamp_iteration, tfrac, tabs, mem) else: msg = 'Iter. %d (epoch %g) @ %s ... forward time %g%% (%g [s]) mem. %g GB \n' msg = msg % (handlers.iteration, epoch, tstamp_iteration, tfrac, tabs, mem) msg += ' loss %g accuracy %g\n' % (loss, acc) print(msg) sys.stdout.flush() if handlers.csv_logger: handlers.csv_logger.flush() if handlers.train_logger: handlers.train_logger.flush()
[docs]def train_loop(handlers): """ Trainval loop. With optional minibatching as determined by the parameters cfg['iotool']['batch_size'] vs cfg['iotool']['minibatch_size']. """ import mlreco.post_processing as post_processing cfg=handlers.cfg tsum = 0. while handlers.iteration < cfg['trainval']['iterations']: epoch = handlers.iteration / float(len(handlers.data_io)) tstamp_iteration = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S') handlers.watch.start('iteration') checkpt_step = cfg['trainval']['checkpoint_step'] and \ cfg['trainval']['weight_prefix'] and \ ((handlers.iteration+1) % cfg['trainval']['checkpoint_step'] == 0) # Train step if handlers.trainer._time_dependent: data_blob, result_blob = handlers.trainer.train_step(handlers.data_io_iter, iteration=handlers.iteration) else: data_blob, result_blob = handlers.trainer.train_step(handlers.data_io_iter) # Save snapshot if checkpt_step: handlers.trainer.save_state(handlers.iteration) # Store output if requested if 'post_processing' in cfg: for processor_name,processor_cfg in cfg['post_processing'].items(): processor_name = processor_name.split('+')[0] processor = getattr(post_processing,str(processor_name)) processor(cfg,processor_cfg,data_blob,result_blob,cfg['trainval']['log_dir'],handlers.iteration) handlers.watch.stop('iteration') tsum += handlers.watch.time('iteration') log(handlers, tstamp_iteration, tsum, result_blob, cfg, epoch, data_blob['index'][0]) # Increment iteration counter handlers.iteration += 1 # Finalize if handlers.csv_logger: handlers.csv_logger.close()
[docs]def inference_loop(handlers): """ Inference loop. Loops over weight files specified in cfg['trainval']['model_path']. For each weight file, runs the inference cfg['trainval']['iterations'] times. Note: Accuracy/loss will be per batch in the CSV log file, not per event. Write an analysis function to do per-event analysis (TODO). """ import mlreco.post_processing as post_processing tsum = 0. # Metrics for each event # global_metrics = {} weights = glob.glob(handlers.cfg['trainval']['model_path']) # if len(weights) > 0: print("Looping over weights: ", len(weights)) for w in weights: print(' -',w) for weight in weights: print('Setting weights',weight) handlers.cfg['trainval']['model_path'] = weight loaded_iteration = handlers.trainer.initialize() make_directories(handlers.cfg,loaded_iteration,handlers) handlers.iteration = 0 handlers.data_io_iter = iter(cycle(handlers.data_io)) while handlers.iteration < handlers.cfg['trainval']['iterations']: epoch = handlers.iteration / float(len(handlers.data_io)) tstamp_iteration = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S') handlers.watch.start('iteration') checkpt_step = handlers.cfg['trainval']['checkpoint_step'] and \ handlers.cfg['trainval']['weight_prefix'] and \ ((handlers.iteration+1) % handlers.cfg['trainval']['checkpoint_step'] == 0) # Run inference data_blob, result_blob = handlers.trainer.forward(handlers.data_io_iter) # Store output if requested if 'post_processing' in handlers.cfg: for processor_name,processor_cfg in handlers.cfg['post_processing'].items(): processor_name = processor_name.split('+')[0] processor = getattr(post_processing,str(processor_name)) processor(handlers.cfg,processor_cfg,data_blob,result_blob,handlers.cfg['trainval']['log_dir'],handlers.iteration) handlers.watch.stop('iteration') tsum += handlers.watch.time('iteration') log(handlers, tstamp_iteration, tsum, result_blob, handlers.cfg, epoch, data_blob['index'][0]) handlers.iteration += 1 # Metrics # TODO # Finalize if handlers.csv_logger: handlers.csv_logger.close()