Ticket #8554: ReduceSCD_Parallel_v1.py

File ReduceSCD_Parallel_v1.py, 8.9 KB (added by Peter Peterson, 7 years ago)
# File: ReduceSCD_Parallel.py
#
# Version 2.0, modified to work with Mantid's new python interface.
#
# This script will run multiple instances of the script ReduceOneSCD_Run.py
# in parallel, using either local processes or a slurm partition.  After
# using the ReduceOneSCD_Run script to find, index and integrate peaks from
# multiple runs, this script merges the integrated peaks files and re-indexes
# them in a consistent way.  If desired, the indexing can also be changed to a
# specified conventional cell.
# Many intermediate files are generated and saved, so all output is written
# to a specified output_directory.  This output directory must be created
# before running this script, and must be specified in the configuration file.
# The user should first make sure that all parameters are set properly in
# the configuration file for the ReduceOneSCD_Run.py script, and that that
# script will properly reduce one SCD run.  Once a single run can be properly
# reduced, set the additional parameters in the configuration file that specify
# how the list of runs will be processed in parallel.
#

#
# _v1: December 3rd 2013. Mads Joergensen
# This version now includes the possibility to use the 1D cylindrical integration method
# and the possibility to load a UB matrix which will be used for integration of the individual
# runs and to index the combined file (Code from Xiaoping).
#

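#
# The parameters this script reads from the configuration file (through
# ReduceDictionary.LoadDictionary) are listed below.  This is only an
# illustrative sketch: the values are hypothetical placeholders and the exact
# file syntax is whatever ReduceDictionary.LoadDictionary expects.
#
#   exp_name                      my_experiment
#   output_directory              /some/existing/output/directory
#   reduce_one_run_script         ReduceOneSCD_Run.py
#   slurm_queue_name              None
#   max_processes                 8
#   min_d                         4
#   max_d                         12
#   tolerance                     0.12
#   cell_type                     None
#   centering                     None
#   run_nums                      1234,1235,1236
#   use_cylindrical_integration   False
#   instrument_name               TOPAZ
#   read_UB                       False
#   UB_filename                   None
#
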
import os
import sys
import threading
import time
import ReduceDictionary

#sys.path.append("/opt/mantidnightly/bin")
sys.path.append("/opt/Mantid/bin")

from mantid.simpleapi import *

print "API Version"
print apiVersion()

start_time = time.time()

# -------------------------------------------------------------------------
# ProcessThread is a simple local class.  Each instance of ProcessThread is
# a thread that starts a command line process to reduce one run.
#
class ProcessThread ( threading.Thread ):
   command = ""

   def setCommand( self, command="" ):
      self.command = command

   def run ( self ):
      print 'STARTING PROCESS: ' + self.command
      os.system( self.command )

# -------------------------------------------------------------------------

#
# Get the config file name from the command line
#
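# The script takes the configuration file as its only argument, for example
# (file name is hypothetical):
#
#   python ReduceSCD_Parallel.py my_experiment.config
#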
if (len(sys.argv) < 2):
  print "You MUST give the config file name on the command line"
  exit(1)

config_file_name = sys.argv[1]

#
# Load the parameter names and values from the specified configuration file
# into a dictionary and set all the required parameters from the dictionary.
#

params_dictionary = ReduceDictionary.LoadDictionary( config_file_name )

exp_name              = params_dictionary[ "exp_name" ]
output_directory      = params_dictionary[ "output_directory" ]
reduce_one_run_script = params_dictionary[ "reduce_one_run_script" ]
slurm_queue_name      = params_dictionary[ "slurm_queue_name" ]
max_processes         = int(params_dictionary[ "max_processes" ])
min_d                 = params_dictionary[ "min_d" ]
max_d                 = params_dictionary[ "max_d" ]
tolerance             = params_dictionary[ "tolerance" ]
cell_type             = params_dictionary[ "cell_type" ]
centering             = params_dictionary[ "centering" ]
run_nums              = params_dictionary[ "run_nums" ]

use_cylindrical_integration = params_dictionary[ "use_cylindrical_integration" ]
instrument_name       = params_dictionary[ "instrument_name" ]

read_UB               = params_dictionary[ "read_UB" ]
UB_filename           = params_dictionary[ "UB_filename" ]

#
# Make the list of separate process commands.  If a slurm queue name
# was specified, run the processes using slurm, otherwise just use
# multiple processes on the local machine.
#
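#
# For illustration, the commands built below look roughly like this (run
# number, queue name and config file name are hypothetical):
#
#   local machine:  python ReduceOneSCD_Run.py my_experiment.config 1234
#   slurm:          srun -p some_queue --cpus-per-task=3 -J ReduceSCD_Parallel.py \
#                     -o <output_directory>/1234_output.txt python ReduceOneSCD_Run.py my_experiment.config 1234
#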
list=[]
index = 0
for r_num in run_nums:
  list.append( ProcessThread() )
  cmd = 'python ' + reduce_one_run_script + ' ' + config_file_name + ' ' + str(r_num)
  if slurm_queue_name is not None:
    console_file = output_directory + "/" + str(r_num) + "_output.txt"
    cmd =  'srun -p ' + slurm_queue_name + \
           ' --cpus-per-task=3 -J ReduceSCD_Parallel.py -o ' + console_file + ' ' + cmd
  list[index].setCommand( cmd )
  index = index + 1

#
# Now create and start a thread for each command to run the commands in parallel,
# starting up to max_processes simultaneously.
#
all_done = False
active_list=[]
while not all_done:
  if ( len(list) > 0 and len(active_list) < max_processes ):
    thread = list[0]
    list.remove(thread)
    active_list.append( thread )
    thread.start()
  time.sleep(2)
  # Drop finished threads from the active list (don't remove while iterating).
  active_list = [ thread for thread in active_list if thread.isAlive() ]
  if len(list) == 0 and len(active_list) == 0 :
    all_done = True

print "\n**************************************************************************************"
print   "************** Completed Individual Runs, Starting to Combine Results ****************"
print   "**************************************************************************************\n"

#
# First combine all of the integrated files, by reading the separate files and
# appending them to a combined output file.
#
niggli_name = output_directory + "/" + exp_name + "_Niggli"
niggli_integrate_file = niggli_name + ".integrate"
niggli_matrix_file = niggli_name + ".mat"

first_time = True
if not use_cylindrical_integration:
  for r_num in run_nums:
    one_run_file = output_directory + '/' + str(r_num) + '_Niggli.integrate'
    peaks_ws = LoadIsawPeaks( Filename=one_run_file )
    if first_time:
      SaveIsawPeaks( InputWorkspace=peaks_ws, AppendFile=False, Filename=niggli_integrate_file )
      first_time = False
    else:
      SaveIsawPeaks( InputWorkspace=peaks_ws, AppendFile=True, Filename=niggli_integrate_file )

#
# Load the combined file and re-index all of the peaks together.
# Save them back to the combined Niggli file (or the selected UB file, if in use).
#
  peaks_ws = LoadIsawPeaks( Filename=niggli_integrate_file )

#
# Find a Niggli UB matrix that indexes the peaks in this run.
# If read_UB is set, load the UB from the initial orientation run (or from the
# combined runs of a first iteration of crystal orientation refinement) and use
# it for indexing, instead of finding one with FFT.
#
  if read_UB:
    LoadIsawUB(InputWorkspace=peaks_ws, Filename=UB_filename)
  #OptimizeCrystalPlacement(PeaksWorkspace=peaks_ws,ModifiedPeaksWorkspace=peaks_ws,FitInfoTable='CrystalPlacement_info',MaxIndexingError=tolerance)
  else:
    FindUBUsingFFT( PeaksWorkspace=peaks_ws, MinD=min_d, MaxD=max_d, Tolerance=tolerance )

  IndexPeaks( PeaksWorkspace=peaks_ws, Tolerance=tolerance )
  SaveIsawPeaks( InputWorkspace=peaks_ws, AppendFile=False, Filename=niggli_integrate_file )
  SaveIsawUB( InputWorkspace=peaks_ws, Filename=niggli_matrix_file )

#
# If requested, also switch to the specified conventional cell and save the
# corresponding matrix and integrate file.
#
if not use_cylindrical_integration:
  if (cell_type is not None) and (centering is not None) :
    conv_name = output_directory + "/" + exp_name + "_" + cell_type + "_" + centering
    conventional_integrate_file = conv_name + ".integrate"
    conventional_matrix_file = conv_name + ".mat"

    SelectCellOfType( PeaksWorkspace=peaks_ws, CellType=cell_type, Centering=centering,
                      Apply=True, Tolerance=tolerance )
    SaveIsawPeaks( InputWorkspace=peaks_ws, AppendFile=False, Filename=conventional_integrate_file )
    SaveIsawUB( InputWorkspace=peaks_ws, Filename=conventional_matrix_file )

if use_cylindrical_integration:
  if (cell_type is not None) or (centering is not None):
    print "WARNING: Cylindrical profiles are NOT transformed!!!"
  # Combine *.profiles files
  filename = output_directory + '/' + exp_name + '.profiles'
  output = open( filename, 'w' )

  # Read and write the first run profile file with header.
  r_num = run_nums[0]
  filename = output_directory + '/' + instrument_name + '_' + str(r_num) + '.profiles'
  input = open( filename, 'r' )
  file_all_lines = input.read()
  output.write(file_all_lines)
  input.close()
  os.remove(filename)

  # Read and write the rest of the runs without the header.
  for r_num in run_nums[1:]:
      filename = output_directory + '/' + instrument_name + '_' + str(r_num) + '.profiles'
      input = open(filename, 'r')
      # Skip the header: read until the first line that starts with '0'.
      for line in input:
          if line[0] == '0': break
      output.write(line)
      for line in input:
          output.write(line)
      input.close()
      os.remove(filename)

  # Remove *.integrate file(s) ONLY USED FOR CYLINDRICAL INTEGRATION!
  for file in os.listdir(output_directory):
    if file.endswith('.integrate'):
      os.remove(os.path.join(output_directory, file))

end_time = time.time()

print "\n**************************************************************************************"
print   "****************************** DONE PROCESSING ALL RUNS ******************************"
print   "**************************************************************************************\n"

print 'Total time:   ' + str(end_time - start_time) + ' sec'
print 'Config file:  ' + config_file_name
print 'Script file:  ' + reduce_one_run_script + '\n'
print