1 | #!/usr/bin/env python |
---|
2 | """ |
---|
3 | Post Process Administrator. It kicks off cataloging and reduction jobs. |
---|
4 | """ |
---|
5 | import logging, json, socket, os, sys, subprocess, time, shutil, imp, stomp, re |
---|
6 | import logging.handlers |
---|
7 | |
---|
# Module-wide rotating file logger (100 MB per file, 20 backups).
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.handlers.RotatingFileHandler('/var/log/autoreduction.log', maxBytes=104857600, backupCount=20)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Quiet the stomp logs as they are quite chatty.
# Bug fix: this was previously set to DEBUG which, contrary to the comment,
# made stomp maximally verbose; WARNING actually suppresses the chatter.
logging.getLogger('stomp').setLevel(logging.WARNING)


# Currently, depending on the instrument, two different archives are used.
ARCHIVE_DIRECTORY = '/isis/NDX%s/Instrument/data/cycle_%s/autoreduced/RB%s/%s'  # % (instrument, cycle, experiment_number, run_number)
# Bug fix: renamed from the misspelled ARCHIVE_DIRECTORY_CEPT.
# PostProcessAdmin.reduce() references ARCHIVE_DIRECTORY_CEPH, which
# previously raised a NameError for every CEPH-archived instrument.
ARCHIVE_DIRECTORY_CEPH = '/instrument/%s/CYCLE20%s/RB%s/autoreduced/%s'  # % (instrument, cycle, experiment_number, run_number)

# Where data are temporarily stored during reduction.
TEMP_ROOT_DIRECTORY = '/autoreducetmp'
---|
25 | |
---|
def copytree(src, dst):
    """Recursively copy the tree at *src* into *dst*.

    Unlike shutil.copytree, the destination may already exist; existing
    files are only overwritten when the source file is more than one
    second newer than the destination file.
    """
    if not os.path.exists(dst):
        os.makedirs(dst)
    for item in os.listdir(src):
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if os.path.isdir(s):
            copytree(s, d)
        else:
            # Bug fix: the freshness check previously compared the mtimes of
            # the parent directories (src, dst) rather than the individual
            # files (s, d), so per-file staleness was never actually tested.
            if not os.path.exists(d) or os.stat(s).st_mtime - os.stat(d).st_mtime > 1:
                shutil.copy2(s, d)
---|
37 | |
---|
def linux_to_windows_path(path):
    """Translate a POSIX archive path into its Windows UNC equivalent.

    Forward slashes become backslashes, and the '/isis/' mount point is
    mapped onto the '\\\\isis\\inst$\\' network share.
    """
    converted = '\\'.join(path.split('/'))
    return converted.replace('\\isis\\', '\\\\isis\\inst$\\')
---|
43 | |
---|
def windows_to_linux_path(path):
    """Translate a Windows UNC path into its POSIX equivalent.

    The '\\\\isis\\inst$\\' share maps back to '/isis/', the autoreduce
    data share maps into the temporary reduction area, and any remaining
    backslashes become forward slashes.
    """
    mappings = [
        ('\\\\isis\\inst$\\', '/isis/'),
        ('\\\\autoreduce\\data\\', TEMP_ROOT_DIRECTORY + '/data/'),
    ]
    for prefix, replacement in mappings:
        path = path.replace(prefix, replacement)
    return '/'.join(path.split('\\'))
---|
50 | |
---|
class PostProcessAdmin:
    """Drives a single autoreduction job.

    Validates the incoming ActiveMQ message, locates the instrument's
    reduce.py, runs it in a temporary working area with stdout/stderr
    captured to log files, copies the results into the archive (and any
    additional directories the script requests), and reports progress or
    errors back over the message broker.
    """

    def __init__(self, data, conf, connection):
        """Validate the job message and stash configuration.

        data       -- dict decoded from the broker message; mutated in place
                      (adds 'information', later 'message'/'reduction_data').
        conf       -- consumer configuration dict (queue names, credentials).
        connection -- stomp connection used to send status messages.

        Raises ValueError when any required key is missing from *data*.
        """
        logger.debug("json data: " + str(data))
        data["information"] = socket.gethostname()
        self.data = data
        self.conf = conf
        self.client = connection

        try:
            # 'has_key' is Python-2-only; the 'in' operator is equivalent
            # and works on both major versions.
            if 'data' in data:
                self.data_file = windows_to_linux_path(str(data['data']))
                logger.debug("data_file: %s" % self.data_file)
            else:
                raise ValueError("data is missing")

            if 'facility' in data:
                self.facility = str(data['facility']).upper()
                logger.debug("facility: %s" % self.facility)
            else:
                raise ValueError("facility is missing")

            if 'instrument' in data:
                self.instrument = str(data['instrument']).upper()
                logger.debug("instrument: %s" % self.instrument)
            else:
                raise ValueError("instrument is missing")

            if 'rb_number' in data:
                self.proposal = str(data['rb_number']).upper()
                logger.debug("rb_number: %s" % self.proposal)
            else:
                raise ValueError("rb_number is missing")

            if 'run_number' in data:
                self.run_number = str(data['run_number'])
                logger.debug("run_number: %s" % self.run_number)
            else:
                raise ValueError("run_number is missing")

            if 'reduction_script' in data:
                self.reduction_script = windows_to_linux_path(str(data['reduction_script']))
                logger.debug("reduction_script: %s" % str(self.reduction_script))
            else:
                raise ValueError("reduction_script is missing")

            if 'reduction_arguments' in data:
                self.reduction_arguments = data['reduction_arguments']
                logger.debug("reduction_arguments: %s" % self.reduction_arguments)
            else:
                raise ValueError("reduction_arguments is missing")

        except ValueError:
            logger.error('JSON data error', exc_info=True)
            raise

    def parse_input_variable(self, default, value):
        """Coerce *value* (a string from the web form) to the type of *default*.

        Supported default types: str, int, list (comma-separated), bool
        ('true'/'false', case-insensitive) and float. Returns None for any
        other type (unchanged behaviour).
        """
        varType = type(default)
        if varType.__name__ == "str":
            return str(value)
        if varType.__name__ == "int":
            return int(value)
        if varType.__name__ == "list":
            return value.split(',')
        if varType.__name__ == "bool":
            # Bug fix: this used "value.lower() is 'true'", an identity
            # comparison against a freshly built string, which is not a
            # reliable equality test and in practice always returned False.
            return value.lower() == 'true'
        if varType.__name__ == "float":
            return float(value)

    def replace_variables(self, reduce_script):
        """Overlay web-supplied variables onto the reduce script's defaults.

        For each key in reduce_script.web_var.standard_vars /
        advanced_vars, if the incoming reduction_arguments carries a value
        for it, the script's default is replaced. Unicode values are
        down-converted to ASCII bytes (Python 2 scripts expect str).
        Returns the (mutated) module object.
        """
        if hasattr(reduce_script, 'web_var'):
            if hasattr(reduce_script.web_var, 'standard_vars'):
                for key in reduce_script.web_var.standard_vars:
                    if 'standard_vars' in self.reduction_arguments and key in self.reduction_arguments['standard_vars']:
                        if type(self.reduction_arguments['standard_vars'][key]).__name__ == 'unicode':
                            self.reduction_arguments['standard_vars'][key] = self.reduction_arguments['standard_vars'][key].encode('ascii', 'ignore')
                        reduce_script.web_var.standard_vars[key] = self.reduction_arguments['standard_vars'][key]
            if hasattr(reduce_script.web_var, 'advanced_vars'):
                for key in reduce_script.web_var.advanced_vars:
                    if 'advanced_vars' in self.reduction_arguments and key in self.reduction_arguments['advanced_vars']:
                        if type(self.reduction_arguments['advanced_vars'][key]).__name__ == 'unicode':
                            self.reduction_arguments['advanced_vars'][key] = self.reduction_arguments['advanced_vars'][key].encode('ascii', 'ignore')
                        reduce_script.web_var.advanced_vars[key] = self.reduction_arguments['advanced_vars'][key]
        return reduce_script

    def reduce(self):
        """Run the reduction job end to end.

        Sends 'reduction_started', runs reduce.py with output captured,
        copies results from the temporary area into the archive and any
        extra output directories, then sends 'reduction_complete' or
        'reduction_error'. All failures are reported over the broker
        rather than propagated.
        """
        print("\n> In reduce()\n")
        try:
            print("\nCalling: " + self.conf['reduction_started'] + "\n" + json.dumps(self.data) + "\n")
            logger.debug("Calling: " + self.conf['reduction_started'] + "\n" + json.dumps(self.data))
            self.client.send(self.conf['reduction_started'], json.dumps(self.data))

            # Specify instrument directory; these instruments live on the
            # CEPH archive, everything else on the classic ISIS archive.
            CEPH_INSTRUMENTS = ['LET', 'MARI', 'MER', 'MERLIN', 'MAPS']
            instrument_dir = ''
            if self.instrument.upper() in CEPH_INSTRUMENTS:
                # CEPH paths want the cycle digits without the underscore.
                cycle = re.sub('[_]', '', (re.match(r'.*cycle_(\d\d_\d).*', self.data['data'].lower()).group(1)))
                instrument_dir = ARCHIVE_DIRECTORY_CEPH % (self.instrument.upper(), cycle, self.data['rb_number'], self.data['run_number'])
            else:
                cycle = re.match(r'.*cycle_(\d\d_\d).*', self.data['data'].lower()).group(1)
                instrument_dir = ARCHIVE_DIRECTORY % (self.instrument.upper(), cycle, self.data['rb_number'], self.data['run_number'])

            # Specify script to run and directory.
            if not os.path.exists(os.path.join(self.reduction_script, "reduce.py")):
                self.data['message'] = "Reduce script doesn't exist within %s" % self.reduction_script
                logger.error("Reduction script not found within %s" % self.reduction_script)
                self.client.send(self.conf['reduction_error'], json.dumps(self.data))
                print("\nCalling: " + self.conf['reduction_error'] + "\n" + json.dumps(self.data) + "\n")
                logger.debug("Calling: " + self.conf['reduction_error'] + "\n" + json.dumps(self.data))
                return

            # Specify directory where autoreduction output goes.
            run_output_dir = TEMP_ROOT_DIRECTORY + instrument_dir[:instrument_dir.find('/' + str(self.data['run_number'])) + 1]
            reduce_result_dir = TEMP_ROOT_DIRECTORY + instrument_dir + "/results/"
            reduce_result_dir_tail_length = len("/results")
            if not os.path.isdir(reduce_result_dir):
                os.makedirs(reduce_result_dir)

            log_dir = reduce_result_dir + "reduction_log/"
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)

            # Load reduction script (imp is the Python 2 import machinery;
            # this file targets Python 2).
            logger.debug("Paths current %s" % str(sys.path))
            sys.path.append(self.reduction_script)
            reduce_script = imp.load_source('reducescript', os.path.join(self.reduction_script, "reduce.py"))
            out_log = os.path.join(log_dir, self.data['rb_number'] + ".log")
            out_err = os.path.join(reduce_result_dir, self.data['rb_number'] + ".err")

            logger.info("----------------")
            logger.info("Reduction script: %s" % self.reduction_script)
            logger.info("Result dir: %s" % reduce_result_dir)
            logger.info("Run Output dir: %s" % run_output_dir)
            logger.info("Log dir: %s" % log_dir)
            logger.info("Out log: %s" % out_log)
            logger.info("Error log: %s" % out_err)
            logger.info("----------------")

            logger.info("Reduction subprocess started.")
            logFile = open(out_log, "w")
            errFile = open(out_err, "w")
            # Capture everything the reduce script prints into the log files.
            sys.stdout = logFile
            sys.stderr = errFile
            # Bug fix: restore stdout/stderr and close the log files even when
            # the reduce script raises; previously an exception left the
            # process with its output redirected to the (leaked) log files.
            try:
                reduce_script = self.replace_variables(reduce_script)
                out_directories = reduce_script.main(input_file=str(self.data_file), output_dir=str(reduce_result_dir))
            finally:
                # Reset outputs back to default.
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
                logFile.close()
                errFile.close()

            logger.info("this is the reduce results directory %s" % str(reduce_result_dir))
            logger.info("this is the entire outdirectories %s" % str(out_directories))

            logger.info("Reduction subprocess completed.")
            logger.info("Additional save directories: %s" % out_directories)

            self.data['reduction_data'] = []
            if "message" not in self.data:
                self.data["message"] = ""

            # If the reduce script specified some additional save
            # directories, copy to there first.
            if out_directories:
                if type(out_directories) is str and os.access(out_directories, os.R_OK):
                    self.data['reduction_data'].append(linux_to_windows_path(out_directories))
                    if not os.path.exists(out_directories):
                        os.makedirs(out_directories)
                    try:
                        copytree(run_output_dir[:-1], out_directories)
                    except Exception as e:
                        logger.error("Unable to copy %s to %s - %s" % (run_output_dir[:-1], out_directories, e))
                        self.data["message"] += "Unable to copy to %s - %s. " % (out_directories, e)
                elif type(out_directories) is list:
                    for out_dir in out_directories:
                        self.data['reduction_data'].append(linux_to_windows_path(out_dir))
                        if not os.path.exists(out_dir):
                            os.makedirs(out_dir)
                        if type(out_dir) is str and os.access(out_dir, os.R_OK):
                            try:
                                copytree(run_output_dir[:-1], out_dir)
                            except Exception as e:
                                logger.error("Unable to copy %s to %s - %s" % (run_output_dir[:-1], out_dir, e))
                                self.data["message"] += "Unable to copy to %s - %s. " % (out_dir, e)
                        else:
                            logger.error("Unable to access directory: %s" % out_dir)

            # Move from the tmp directory to the actual archive directory,
            # replacing old data if it exists. Stripping the leading
            # TEMP_ROOT_DIRECTORY and the trailing '/results' from
            # reduce_result_dir yields the final archive destination.
            final_result_dir = reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length]
            if os.path.isdir(final_result_dir):
                try:
                    shutil.rmtree(final_result_dir, ignore_errors=True)
                except Exception as e:
                    logger.error("Unable to remove existing directory %s - %s" % (final_result_dir, e))
            try:
                os.makedirs(final_result_dir)
            except Exception as e:
                logger.error("Unable to create %s - %s" % (final_result_dir, e))
                self.data["message"] += "Unable to create %s - %s. " % (final_result_dir, e)

            self.data['reduction_data'].append(linux_to_windows_path(final_result_dir))
            logger.info("Moving %s to %s" % (reduce_result_dir[:-1], final_result_dir))
            try:
                shutil.copytree(reduce_result_dir[:-1], reduce_result_dir[len(TEMP_ROOT_DIRECTORY):])
            except Exception as e:
                logger.error("Unable to copy to %s - %s" % (reduce_result_dir[len(TEMP_ROOT_DIRECTORY):], e))
                self.data["message"] += "Unable to copy to %s - %s. " % (reduce_result_dir[len(TEMP_ROOT_DIRECTORY):], e)

            # If the error file is empty, report success and hide it from the
            # user; otherwise surface its last line as the error message.
            if os.stat(out_err).st_size == 0:
                os.remove(out_err)
                self.client.send(self.conf['reduction_complete'], json.dumps(self.data))
                print("\nCalling: " + self.conf['reduction_complete'] + "\n" + json.dumps(self.data) + "\n")
            else:
                maxLineLength = 80
                fp = open(out_err, "r")
                # Bug fix: only seek backwards when the file is long enough;
                # seeking before the start of the file raises IOError.
                if os.stat(out_err).st_size > maxLineLength:
                    fp.seek(-maxLineLength - 1, 2)  # 2 means "from the end of the file"
                lastLine = fp.readlines()[-1]
                fp.close()
                errMsg = lastLine.strip() + ", see reduction_log/" + os.path.basename(out_log) + " or " + os.path.basename(out_err) + " for details."
                self.data["message"] = "REDUCTION: %s" % errMsg
                self.client.send(self.conf['reduction_error'], json.dumps(self.data))
                logger.error("Called " + self.conf['reduction_error'] + " --- " + json.dumps(self.data))

            # Remove temporary working directory.
            try:
                shutil.rmtree(reduce_result_dir[:-reduce_result_dir_tail_length], ignore_errors=True)
            except Exception as e:
                # Bug fix: the format string had two placeholders but was
                # given only the directory, raising TypeError in the handler.
                logger.error("Unable to remove temporary directory %s - %s" % (reduce_result_dir, e))

            logger.info("Reduction job complete")
        except Exception as e:
            # Last-resort handler: report the failure over the broker; if even
            # that fails, log it locally rather than crash silently.
            try:
                self.data["message"] = "REDUCTION Error: %s " % e
                logger.exception("Called " + self.conf['reduction_error'] + "\nException: " + str(e) + "\nJSON: " + json.dumps(self.data))
                self.client.send(self.conf['reduction_error'], json.dumps(self.data))
            except BaseException as e:
                print("\nFailed to send to queue!\n%s\n%s" % (e, repr(e)))
                logger.error("Failed to send to queue! - %s - %s" % (e, repr(e)))
---|
298 | |
---|
if __name__ == "__main__":
    # Entry point: invoked by the queue consumer with (destination, message)
    # on the command line. Connects to ActiveMQ and dispatches the job.
    logger.debug("Paths current %s" % str(sys.path))
    print("\n> In PostProcessAdmin.py\n")

    try:
        with open('/etc/autoreduce/post_process_consumer.conf') as conf_file:
            conf = json.load(conf_file)

        # conf['brokers'] is a single 'host:port' string.
        broker_host, broker_port = conf['brokers'].split(':')
        brokers = [(broker_host, int(broker_port))]
        connection = stomp.Connection(host_and_ports=brokers, use_ssl=True, ssl_version=3)
        connection.start()
        # prefetchSize 1: take one message at a time so long reductions
        # don't starve other consumers.
        connection.connect(conf['amq_user'], conf['amq_pwd'], wait=True, header={'activemq.prefetchSize': '1'})

        destination, message = sys.argv[1:3]
        print("destination: " + destination)
        print("message: " + message)
        data = json.loads(message)

        try:
            pp = PostProcessAdmin(data, conf, connection)
            if destination == '/queue/ReductionPending':
                pp.reduce()

        except ValueError as e:
            # Malformed job message: report it on the error queue and re-raise
            # so the outer handler logs and exits.
            data["error"] = str(e)
            logger.error("JSON data error: " + json.dumps(data))

            connection.send(conf['postprocess_error'], json.dumps(data))
            print("Called " + conf['postprocess_error'] + "----" + json.dumps(data))
            raise

    except Exception as er:
        # Bug fix: this previously caught the undefined name 'Error', which
        # raised a NameError whenever any exception actually reached here.
        logger.error("Something went wrong: " + str(er))
        sys.exit()
---|
336 | |
---|
337 | |
---|