| 1 | |
|---|
| 2 | # File: ReduceSCD_Parallel.py |
|---|
| 3 | # |
|---|
| 4 | # Version 2.0, modified to work with Mantid's new python interface. |
|---|
| 5 | # |
|---|
| 6 | # This script will run multiple instances of the script ReduceOneSCD_Run.py |
|---|
| 7 | # in parallel, using either local processes or a slurm partition. After |
|---|
| 8 | # using the ReduceOneSCD_Run script to find, index and integrate peaks from |
|---|
| 9 | # multiple runs, this script merges the integrated peaks files and re-indexes |
|---|
| 10 | # them in a consistent way. If desired, the indexing can also be changed to a |
|---|
| 11 | # specified conventional cell. |
|---|
| 12 | # Many intermediate files are generated and saved, so all output is written |
|---|
| 13 | # to a specified output_directory. This output directory must be created |
|---|
| 14 | # before running this script, and must be specified in the configuration file. |
|---|
| 15 | # The user should first make sure that all parameters are set properly in |
|---|
| 16 | # the configuration file for the ReduceOneSCD_Run.py script, and that that |
|---|
| 17 | # script will properly reduce one scd run. Once a single run can be properly |
|---|
| 18 | # reduced, set the additional parameters in the configuration file that specify |
|---|
| 19 | # how the list of runs will be processed in parallel. |
|---|
| 20 | # |
|---|
| 21 | |
|---|
| 22 | # |
|---|
| 23 | # _v1: December 3rd 2013. Mads Joergensen |
|---|
| 24 | # This version now includes the possibility to use the 1D cylindrical integration method |
|---|
| 25 | # and the possibility to load a UB matrix which will be used for integration of the individual |
|---|
| 26 | # runs and to index the combined file (Code from Xiapoing). |
|---|
| 27 | # |
|---|
| 28 | |
|---|
| 29 | import os |
|---|
| 30 | import sys |
|---|
| 31 | import threading |
|---|
| 32 | import time |
|---|
| 33 | import ReduceDictionary |
|---|
| 34 | |
|---|
| 35 | #sys.path.append("/opt/mantidnightly/bin") |
|---|
| 36 | sys.path.append("/opt/Mantid/bin") |
|---|
| 37 | |
|---|
| 38 | from mantid.simpleapi import * |
|---|
| 39 | |
|---|
# Report which Mantid API version is in use.  The call form of print with a
# single argument behaves the same on Python 2 and Python 3.
print("API Version")
print(apiVersion())

# Wall-clock start, used for the total-time report at the end of the script.
start_time = time.time()
| 44 | |
|---|
| 45 | # ------------------------------------------------------------------------- |
|---|
| 46 | # ProcessThread is a simple local class. Each instance of ProcessThread is |
|---|
| 47 | # a thread that starts a command line process to reduce one run. |
|---|
| 48 | # |
|---|
class ProcessThread(threading.Thread):
    """Thread that runs one shell command line and waits for it to finish.

    Set the command with setCommand() before calling start().  Once run()
    has completed, exit_status holds the value returned by os.system() for
    the command; it is None until then.
    """

    # Command line executed by run(); empty until setCommand() is called.
    command = ""
    # Value returned by os.system() for the command, None if not yet run.
    exit_status = None

    def setCommand(self, command=""):
        """Store the command line that run() will execute."""
        self.command = command

    def run(self):
        """Announce the command, execute it through os.system and keep its status."""
        print('STARTING PROCESS: ' + self.command)
        # Keep the status instead of discarding it so a failed run can be detected.
        self.exit_status = os.system(self.command)
| 58 | |
|---|
| 59 | # ------------------------------------------------------------------------- |
|---|
| 60 | |
|---|
#
# Get the config file name from the command line
#
if len(sys.argv) < 2:
    print("You MUST give the config file name on the command line")
    # A missing argument is a usage error, so report failure to the caller
    # instead of exiting with a success status.
    sys.exit(1)

config_file_name = sys.argv[1]
| 69 | |
|---|
#
# Read the configuration file into a dictionary of parameter names and
# values, then pull out every parameter this script needs.
#
params_dictionary = ReduceDictionary.LoadDictionary(config_file_name)

# Experiment name and the directory that receives all output files.
exp_name = params_dictionary["exp_name"]
output_directory = params_dictionary["output_directory"]

# How the individual runs are launched.
reduce_one_run_script = params_dictionary["reduce_one_run_script"]
slurm_queue_name = params_dictionary["slurm_queue_name"]
max_processes = int(params_dictionary["max_processes"])

# Parameters passed to the UB-finding, indexing and cell-selection steps.
min_d = params_dictionary["min_d"]
max_d = params_dictionary["max_d"]
tolerance = params_dictionary["tolerance"]
cell_type = params_dictionary["cell_type"]
centering = params_dictionary["centering"]

# The runs to process.
run_nums = params_dictionary["run_nums"]

# Cylindrical integration switch and the instrument name used in the
# names of the per-run .profiles files.
use_cylindrical_integration = params_dictionary["use_cylindrical_integration"]
instrument_name = params_dictionary["instrument_name"]

# Optional pre-determined UB matrix.
read_UB = params_dictionary["read_UB"]
UB_filename = params_dictionary["UB_filename"]
| 94 | |
|---|
#
# Make the list of separate process commands.  If a slurm queue name
# was specified, run the processes using slurm, otherwise just use
# multiple processes on the local machine.
#
pending_threads = []
for r_num in run_nums:
    cmd = 'python ' + reduce_one_run_script + ' ' + config_file_name + ' ' + str(r_num)
    if slurm_queue_name is not None:
        # Under slurm the console output of each run goes to its own file.
        console_file = output_directory + "/" + str(r_num) + "_output.txt"
        cmd = ('srun -p ' + slurm_queue_name +
               ' --cpus-per-task=3 -J ReduceSCD_Parallel.py -o ' + console_file + ' ' + cmd)
    process_thread = ProcessThread()
    process_thread.setCommand(cmd)
    pending_threads.append(process_thread)

#
# Now start a thread for each command to run the commands in parallel,
# keeping at most max_processes of them active at the same time.  At most
# one new thread is started per two-second polling interval.
#
active_list = []
while pending_threads or active_list:
    if pending_threads and len(active_list) < max_processes:
        thread = pending_threads.pop(0)
        active_list.append(thread)
        thread.start()
    time.sleep(2)
    # Rebuild the list rather than removing entries while iterating over it,
    # which would skip the element following each removed one.
    active_list = [running for running in active_list if running.is_alive()]
| 130 | |
|---|
print("\n**************************************************************************************")
print("************** Completed Individual Runs, Starting to Combine Results ****************")
print("**************************************************************************************\n")

#
# First combine all of the integrated files, by reading the separate files and
# appending them to a combined output file.
#
niggli_name = output_directory + "/" + exp_name + "_Niggli"
niggli_integrate_file = niggli_name + ".integrate"
niggli_matrix_file = niggli_name + ".mat"

if not use_cylindrical_integration:
    for run_index, r_num in enumerate(run_nums):
        one_run_file = output_directory + '/' + str(r_num) + '_Niggli.integrate'
        peaks_ws = LoadIsawPeaks(Filename=one_run_file)
        # The first run creates (or overwrites) the combined file; every
        # later run is appended to it.
        SaveIsawPeaks(InputWorkspace=peaks_ws, AppendFile=(run_index > 0),
                      Filename=niggli_integrate_file)
| 153 | |
|---|
#
# Load the combined file and re-index all of the peaks together.
# Save them back to the combined Niggli file (or the selected UB file, if in use).
#
# The combined Niggli file is only written when cylindrical integration is
# NOT in use, so this whole step is guarded by the same condition.
#
if not use_cylindrical_integration:
    peaks_ws = LoadIsawPeaks(Filename=niggli_integrate_file)

    #
    # Either load a UB matrix (from the initial orientation run, or from the
    # combined runs of a first iteration of crystal orientation refinement),
    # or find a Niggli UB matrix that indexes the peaks using FFT.
    #
    if read_UB:
        LoadIsawUB(InputWorkspace=peaks_ws, Filename=UB_filename)
        #OptimizeCrystalPlacement(PeaksWorkspace=peaks_ws,ModifiedPeaksWorkspace=peaks_ws,FitInfoTable='CrystalPlacement_info',MaxIndexingError=tolerance)
    else:
        FindUBUsingFFT(PeaksWorkspace=peaks_ws, MinD=min_d, MaxD=max_d, Tolerance=tolerance)

    IndexPeaks(PeaksWorkspace=peaks_ws, Tolerance=tolerance)
    SaveIsawPeaks(InputWorkspace=peaks_ws, AppendFile=False, Filename=niggli_integrate_file)
    SaveIsawUB(InputWorkspace=peaks_ws, Filename=niggli_matrix_file)
| 173 | |
|---|
#
# When both a cell type and a centering were configured (and cylindrical
# integration is off), transform to that conventional cell and write the
# resulting integrate and matrix files next to the Niggli ones.
#
if (not use_cylindrical_integration
        and cell_type is not None
        and centering is not None):
    conv_name = output_directory + "/" + exp_name + "_" + cell_type + "_" + centering
    conventional_integrate_file = conv_name + ".integrate"
    conventional_matrix_file = conv_name + ".mat"

    SelectCellOfType(PeaksWorkspace=peaks_ws,
                     CellType=cell_type,
                     Centering=centering,
                     Apply=True,
                     Tolerance=tolerance)
    SaveIsawPeaks(InputWorkspace=peaks_ws,
                  AppendFile=False,
                  Filename=conventional_integrate_file)
    SaveIsawUB(InputWorkspace=peaks_ws, Filename=conventional_matrix_file)
| 188 | |
|---|
if use_cylindrical_integration:
    if cell_type is not None or centering is not None:
        print("WARNING: Cylindrical profiles are NOT transformed!!!")

    # Combine the per-run *.profiles files into one file for the experiment.
    # Each per-run file is deleted once it has been copied.
    combined_name = output_directory + '/' + exp_name + '.profiles'
    with open(combined_name, 'w') as combined:
        for run_index, r_num in enumerate(run_nums):
            # str() keeps this consistent with the other file names built from
            # r_num and avoids a TypeError if the run numbers are not strings.
            filename = output_directory + '/' + instrument_name + '_' + str(r_num) + '.profiles'
            with open(filename, 'r') as run_profiles:
                if run_index == 0:
                    # The first run is copied completely, header included.
                    combined.write(run_profiles.read())
                else:
                    # Later runs: skip the header, i.e. everything before the
                    # first line that starts with '0', then copy the rest.
                    past_header = False
                    for line in run_profiles:
                        if not past_header and line.startswith('0'):
                            past_header = True
                        if past_header:
                            combined.write(line)
            os.remove(filename)

    # Remove *.integrate file(s) ONLY USED FOR CYLINDRICAL INTEGRATION!
    # os.listdir returns bare names, so join them with the directory;
    # otherwise the removal would be attempted in the current directory.
    for entry in os.listdir(output_directory):
        if entry.endswith('.integrate'):
            os.remove(os.path.join(output_directory, entry))
| 221 | |
|---|
end_time = time.time()

print("\n**************************************************************************************")
print("****************************** DONE PROCESSING ALL RUNS ******************************")
print("**************************************************************************************\n")

# Summary: elapsed wall-clock time and the files that drove this reduction.
print('Total time: ' + str(end_time - start_time) + ' sec')
print('Config file: ' + config_file_name)
print('Script file: ' + reduce_one_run_script + '\n')
# Trailing blank line; a bare `print` would be a no-op on Python 3.
print("")