1 | |
---|
2 | # File: ReduceSCD_Parallel.py |
---|
3 | # |
---|
4 | # Version 2.0, modified to work with Mantid's new python interface. |
---|
5 | # |
---|
6 | # This script will run multiple instances of the script ReduceOneSCD_Run.py |
---|
7 | # in parallel, using either local processes or a slurm partition. After |
---|
8 | # using the ReduceOneSCD_Run script to find, index and integrate peaks from |
---|
9 | # multiple runs, this script merges the integrated peaks files and re-indexes |
---|
10 | # them in a consistent way. If desired, the indexing can also be changed to a |
---|
11 | # specified conventional cell. |
---|
12 | # Many intermediate files are generated and saved, so all output is written |
---|
13 | # to a specified output_directory. This output directory must be created |
---|
14 | # before running this script, and must be specified in the configuration file. |
---|
15 | # The user should first make sure that all parameters are set properly in |
---|
16 | # the configuration file for the ReduceOneSCD_Run.py script, and that that |
---|
17 | # script will properly reduce one scd run. Once a single run can be properly |
---|
18 | # reduced, set the additional parameters in the configuration file that specify |
---|
19 | # how the list of runs will be processed in parallel. |
---|
20 | # |
---|
21 | |
---|
22 | # |
---|
23 | # _v1: December 3rd 2013. Mads Joergensen |
---|
24 | # This version now includes the possibility to use the 1D cylindrical integration method |
---|
25 | # and the possibility to load a UB matrix which will be used for integration of the individual |
---|
26 | # runs and to index the combined file (Code from Xiapoing). |
---|
27 | # |
---|
28 | |
---|
29 | import os |
---|
30 | import sys |
---|
31 | import threading |
---|
32 | import time |
---|
33 | import ReduceDictionary |
---|
34 | |
---|
35 | #sys.path.append("/opt/mantidnightly/bin") |
---|
36 | sys.path.append("/opt/Mantid/bin") |
---|
37 | |
---|
38 | from mantid.simpleapi import * |
---|
39 | |
---|
# Report the Mantid API version in use.  The single-argument call form of
# print produces the same output under Python 2 and Python 3.
print("API Version")
print(apiVersion())

# Wall-clock reference for the total-time report printed at the end of the
# script.
start_time = time.time()
44 | |
---|
45 | # ------------------------------------------------------------------------- |
---|
46 | # ProcessThread is a simple local class. Each instance of ProcessThread is |
---|
47 | # a thread that starts a command line process to reduce one run. |
---|
48 | # |
---|
class ProcessThread(threading.Thread):
    """Thread that runs one shell command line and waits for it to finish.

    Set the command with setCommand() before calling start().  Once the
    thread has finished, exit_status holds the value os.system() returned
    for the command; it is None until run() has completed.
    """

    # Command line to execute; empty until setCommand() is called.
    command = ""
    # Value returned by os.system() for the command; None before run().
    exit_status = None

    def setCommand(self, command=""):
        """Store the command line that run() will execute."""
        self.command = command

    def run(self):
        """Announce the command, run it through the shell and keep its status."""
        # The single-argument call form prints identically under Python 2
        # (print statement) and Python 3 (print function).
        print('STARTING PROCESS: ' + self.command)
        self.exit_status = os.system(self.command)
        if self.exit_status != 0:
            # Report the failure here; the status was previously discarded,
            # so a failed command left no trace in this script's output.
            print('WARNING: process returned status ' + str(self.exit_status) +
                  ': ' + self.command)
58 | |
---|
59 | # ------------------------------------------------------------------------- |
---|
60 | |
---|
61 | # |
---|
62 | # Get the config file name from the command line |
---|
63 | # |
---|
if len(sys.argv) < 2:
    print("You MUST give the config file name on the command line")
    # A missing argument is a usage error: leave with a non-zero status so
    # calling shells and batch scripts can detect it.  sys.exit() is used
    # because the bare exit() helper is only installed by the site module.
    sys.exit(1)

# First command-line argument: the configuration file to load.
config_file_name = sys.argv[1]
69 | |
---|
70 | # |
---|
71 | # Load the parameter names and values from the specified configuration file |
---|
72 | # into a dictionary and set all the required parameters from the dictionary. |
---|
73 | # |
---|
74 | |
---|
# Parse the configuration file into a dictionary of parameter names and values.
params_dictionary = ReduceDictionary.LoadDictionary(config_file_name)

# Experiment name, output location and settings for running the per-run script.
exp_name = params_dictionary["exp_name"]
output_directory = params_dictionary["output_directory"]
reduce_one_run_script = params_dictionary["reduce_one_run_script"]
slurm_queue_name = params_dictionary["slurm_queue_name"]
max_processes = int(params_dictionary["max_processes"])

# Values passed to the UB search, peak indexing and cell selection below.
min_d = params_dictionary["min_d"]
max_d = params_dictionary["max_d"]
tolerance = params_dictionary["tolerance"]
cell_type = params_dictionary["cell_type"]
centering = params_dictionary["centering"]

# The runs to reduce; one process is launched per entry.
run_nums = params_dictionary["run_nums"]

# Switch for the cylindrical-integration path, and the instrument name used
# in the per-run *.profiles file names.
use_cylindrical_integration = params_dictionary["use_cylindrical_integration"]
instrument_name = params_dictionary["instrument_name"]

# Whether to load a UB matrix from UB_filename instead of finding one by FFT.
read_UB = params_dictionary["read_UB"]
UB_filename = params_dictionary["UB_filename"]
94 | |
---|
95 | # |
---|
96 | # Make the list of separate process commands. If a slurm queue name |
---|
97 | # was specified, run the processes using slurm, otherwise just use |
---|
98 | # multiple processes on the local machine. |
---|
99 | # |
---|
# One not-yet-started ProcessThread per run, in run order.  (The name `list`
# is no longer used here, so the builtin stays usable in the rest of the
# script.)
pending_threads = []
for r_num in run_nums:
    cmd = 'python ' + reduce_one_run_script + ' ' + config_file_name + ' ' + str(r_num)
    if slurm_queue_name is not None:
        # Under slurm the console output of each run goes to its own file.
        console_file = output_directory + "/" + str(r_num) + "_output.txt"
        cmd = 'srun -p ' + slurm_queue_name + \
              ' --cpus-per-task=3 -J ReduceSCD_Parallel.py -o ' + console_file + ' ' + cmd
    worker = ProcessThread()
    worker.setCommand(cmd)
    pending_threads.append(worker)

#
# Now start a thread for each command to run the commands in parallel,
# keeping at most max_processes of them active at any time.  At most one new
# thread is started per two-second polling interval.
#
active_list = []
while pending_threads or active_list:
    if pending_threads and len(active_list) < max_processes:
        worker = pending_threads.pop(0)
        active_list.append(worker)
        worker.start()
    time.sleep(2)
    # Build a new list rather than removing entries from the list being
    # iterated: removing during iteration skips the element that follows
    # each removed one, so finished threads could linger for extra polls.
    active_list = [running for running in active_list if running.is_alive()]
130 | |
---|
# Banner separating the per-run reduction phase from the merge phase.
for banner_line in (
        "\n**************************************************************************************",
        "************** Completed Individual Runs, Starting to Combine Results ****************",
        "**************************************************************************************\n",
):
    print(banner_line)

#
# Names of the combined output files: the merged peaks file and the UB
# matrix file share one base name inside the output directory.
#
niggli_name = output_directory + "/" + exp_name + "_Niggli"
niggli_integrate_file = niggli_name + ".integrate"
niggli_matrix_file = niggli_name + ".mat"
142 | |
---|
# NOTE(review): the indentation of this section is not recoverable from the
# text as stored.  Everything below is placed under the
# `not use_cylindrical_integration` test because the combined file it loads is
# only written on that path -- confirm against the original script.
if not use_cylindrical_integration:
    #
    # Merge the per-run peaks files.  The first run overwrites the combined
    # file and every later run is appended to it.
    #
    for run_position, r_num in enumerate(run_nums):
        one_run_file = output_directory + '/' + str(r_num) + '_Niggli.integrate'
        peaks_ws = LoadIsawPeaks(Filename=one_run_file)
        SaveIsawPeaks(InputWorkspace=peaks_ws,
                      AppendFile=(run_position > 0),
                      Filename=niggli_integrate_file)

    #
    # Load the combined file and re-index all of the peaks together.
    # Save them back to the combined Niggli file (or selected UB file if in use).
    #
    peaks_ws = LoadIsawPeaks(Filename=niggli_integrate_file)

    #
    # Either load a UB matrix from UB_filename or find a Niggli UB matrix
    # with FFT, then index the combined peaks with it.
    #
    if read_UB:
        LoadIsawUB(InputWorkspace=peaks_ws, Filename=UB_filename)
        #OptimizeCrystalPlacement(PeaksWorkspace=peaks_ws,ModifiedPeaksWorkspace=peaks_ws,FitInfoTable='CrystalPlacement_info',MaxIndexingError=tolerance)
    else:
        FindUBUsingFFT(PeaksWorkspace=peaks_ws, MinD=min_d, MaxD=max_d, Tolerance=tolerance)

    IndexPeaks(PeaksWorkspace=peaks_ws, Tolerance=tolerance)

    # Overwrite the combined peaks file with the re-indexed peaks and save
    # the UB matrix next to it.
    SaveIsawPeaks(InputWorkspace=peaks_ws, AppendFile=False, Filename=niggli_integrate_file)
    SaveIsawUB(InputWorkspace=peaks_ws, Filename=niggli_matrix_file)
173 | |
---|
174 | # |
---|
175 | # If requested, also switch to the specified conventional cell and save the |
---|
176 | # corresponding matrix and integrate file |
---|
177 | # |
---|
# A conventional cell is only produced when both the cell type and the
# centering were given, and never on the cylindrical-integration path.
conventional_cell_requested = cell_type is not None and centering is not None
if conventional_cell_requested and not use_cylindrical_integration:
    conv_name = output_directory + "/" + exp_name + "_" + cell_type + "_" + centering
    conventional_integrate_file = conv_name + ".integrate"
    conventional_matrix_file = conv_name + ".mat"

    # Apply=True changes the indexing held in peaks_ws itself, so the two
    # saves below write the conventional-cell peaks and matrix.
    SelectCellOfType(
        PeaksWorkspace=peaks_ws,
        CellType=cell_type,
        Centering=centering,
        Apply=True,
        Tolerance=tolerance,
    )
    SaveIsawPeaks(
        InputWorkspace=peaks_ws,
        AppendFile=False,
        Filename=conventional_integrate_file,
    )
    SaveIsawUB(InputWorkspace=peaks_ws, Filename=conventional_matrix_file)
188 | |
---|
def _copy_profile(source_name, combined, keep_header):
    """Append the per-run profiles file source_name to the open file combined.

    With keep_header true the whole file is copied.  Otherwise every line
    before the first line that begins with '0' is skipped; if no line begins
    with '0', nothing is copied.
    """
    copying = keep_header
    with open(source_name, 'r') as profile:
        for line in profile:
            if not copying and line.startswith('0'):
                copying = True
            if copying:
                combined.write(line)


if use_cylindrical_integration:
    if cell_type is not None or centering is not None:
        print("WARNING: Cylindrical profiles are NOT transformed!!!")

    # Combine the per-run *.profiles files into one file.  Only the first run
    # contributes its header.  The `with` block guarantees the combined file
    # is flushed and closed, which the script previously never did.
    filename = output_directory + '/' + exp_name + '.profiles'
    with open(filename, 'w') as output:
        for run_position, r_num in enumerate(run_nums):
            # str() matches how the other per-run file names are built and
            # keeps this working when run numbers are not strings.
            run_profile_name = (output_directory + '/' + instrument_name + '_' +
                                str(r_num) + '.profiles')
            _copy_profile(run_profile_name, output, keep_header=(run_position == 0))
            os.remove(run_profile_name)

    # Remove *.integrate file(s) ONLY USED FOR CYLINDRICAL INTEGRATION!
    # os.listdir() returns bare names, so each one is joined to the directory
    # it came from; a bare name would be resolved against the current working
    # directory instead.
    for entry in os.listdir(output_directory):
        if entry.endswith('.integrate'):
            os.remove(os.path.join(output_directory, entry))
221 | |
---|
end_time = time.time()

# Closing banner followed by a short summary of the whole job.
print("\n**************************************************************************************")
print("****************************** DONE PROCESSING ALL RUNS ******************************")
print("**************************************************************************************\n")

print('Total time: ' + str(end_time - start_time) + ' sec')
# Label corrected from the misspelled 'Connfig file: '.
print('Config file: ' + config_file_name)
print('Script file: ' + reduce_one_run_script + '\n')
# Trailing blank line; an explicit empty string prints one empty line under
# both Python 2 and Python 3.
print("")