Ticket #11554: PostProcessAdmin_two_archieves.py

File PostProcessAdmin_two_archieves.py, 16.7 KB (added by Anders Markvardsen, 5 years ago)
Line 
1#!/usr/bin/env python
2"""
3Post Process Administrator. It kicks off cataloging and reduction jobs.
4"""
5import logging, json, socket, os, sys, subprocess, time, shutil, imp, stomp, re
6import logging.handlers
7
8logger = logging.getLogger(__name__)
9logger.setLevel(logging.DEBUG)
10handler = logging.handlers.RotatingFileHandler('/var/log/autoreduction.log', maxBytes=104857600, backupCount=20)
11handler.setLevel(logging.DEBUG)
12formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
13handler.setFormatter(formatter)
14logger.addHandler(handler)
15# Quite the Stomp logs as they are quite chatty
16logging.getLogger('stomp').setLevel(logging.DEBUG)
17
18
19# Currently, depending on the instrument, two different archives are used
20ARCHIVE_DIRECTORY = '/isis/NDX%s/Instrument/data/cycle_%s/autoreduced/RB%s/%s' # %(instrument, cycle, experiment_number, run_number)
21ARCHIVE_DIRECTORY_CEPT = '/instrument/%s/CYCLE20%s/RB%s/autoreduced/%s' # %(instrument, cycle, experiment_number, run_number)
22
23# where data are temporarily stored during reduction
24TEMP_ROOT_DIRECTORY = '/autoreducetmp'
25
26def copytree(src, dst):
27    if not os.path.exists(dst):
28        os.makedirs(dst)
29    for item in os.listdir(src):
30        s = os.path.join(src, item)
31        d = os.path.join(dst, item)
32        if os.path.isdir(s):
33            copytree(s, d)
34        else:
35            if not os.path.exists(d) or os.stat(src).st_mtime - os.stat(dst).st_mtime > 1:
36                shutil.copy2(s, d)
37
38def linux_to_windows_path(path):
39    path = path.replace('/', '\\')
40    # '/isis/' maps to '\\isis\inst$\'
41    path = path.replace('\\isis\\', '\\\\isis\\inst$\\')
42    return path
43
44def windows_to_linux_path(path):
45    # '\\isis\inst$\' maps to '/isis/'
46    path = path.replace('\\\\isis\\inst$\\', '/isis/')
47    path = path.replace('\\\\autoreduce\\data\\', TEMP_ROOT_DIRECTORY+'/data/')
48    path = path.replace('\\', '/')
49    return path
50
51class PostProcessAdmin:
52    def __init__(self, data, conf, connection):
53
54        logger.debug("json data: " + str(data))
55        data["information"] = socket.gethostname()
56        self.data = data
57        self.conf = conf
58        self.client = connection
59
60        try:
61            if data.has_key('data'):
62                self.data_file = windows_to_linux_path(str(data['data']))
63                logger.debug("data_file: %s" % self.data_file)
64            else:
65                raise ValueError("data is missing")
66
67            if data.has_key('facility'):
68                self.facility = str(data['facility']).upper()
69                logger.debug("facility: %s" % self.facility)
70            else: 
71                raise ValueError("facility is missing")
72
73            if data.has_key('instrument'):
74                self.instrument = str(data['instrument']).upper()
75                logger.debug("instrument: %s" % self.instrument)
76            else:
77                raise ValueError("instrument is missing")
78
79            if data.has_key('rb_number'):
80                self.proposal = str(data['rb_number']).upper()
81                logger.debug("rb_number: %s" % self.proposal)
82            else:
83                raise ValueError("rb_number is missing")
84               
85            if data.has_key('run_number'):
86                self.run_number = str(data['run_number'])
87                logger.debug("run_number: %s" % self.run_number)
88            else:
89                raise ValueError("run_number is missing")
90               
91            if data.has_key('reduction_script'):
92                self.reduction_script = windows_to_linux_path(str(data['reduction_script']))
93                logger.debug("reduction_script: %s" % str(self.reduction_script))
94            else:
95                raise ValueError("reduction_script is missing")
96               
97            if data.has_key('reduction_arguments'):
98                self.reduction_arguments = data['reduction_arguments']
99                logger.debug("reduction_arguments: %s" % self.reduction_arguments)
100            else:
101                raise ValueError("reduction_arguments is missing")
102
103        except ValueError:
104            logger.error('JSON data error', exc_info=True)
105            raise
106
107    def parse_input_variable(self, default, value):
108        varType = type(default)
109        if varType.__name__ == "str":
110            return str(value)
111        if varType.__name__ == "int":
112            return int(value)
113        if varType.__name__ == "list":
114            return value.split(',')
115        if varType.__name__ == "bool":
116            return (value.lower() is 'true')
117        if varType.__name__ == "float":
118            return float(value)
119
120    def replace_variables(self, reduce_script):
121        if hasattr(reduce_script, 'web_var'):
122            if hasattr(reduce_script.web_var, 'standard_vars'):
123                for key in reduce_script.web_var.standard_vars:
124                    if 'standard_vars' in self.reduction_arguments and key in self.reduction_arguments['standard_vars']:
125                        if type(self.reduction_arguments['standard_vars'][key]).__name__ == 'unicode':
126                            self.reduction_arguments['standard_vars'][key] = self.reduction_arguments['standard_vars'][key].encode('ascii','ignore')
127                        reduce_script.web_var.standard_vars[key] = self.reduction_arguments['standard_vars'][key]
128            if hasattr(reduce_script.web_var, 'advanced_vars'):
129                for key in reduce_script.web_var.advanced_vars:
130                    if 'advanced_vars' in self.reduction_arguments and key in self.reduction_arguments['advanced_vars']:
131                        if type(self.reduction_arguments['advanced_vars'][key]).__name__ == 'unicode':
132                            self.reduction_arguments['advanced_vars'][key] = self.reduction_arguments['advanced_vars'][key].encode('ascii','ignore')
133                        reduce_script.web_var.advanced_vars[key] = self.reduction_arguments['advanced_vars'][key]
134        return reduce_script
135
136    def reduce(self):
137        print "\n> In reduce()\n"
138        try:         
139            print "\nCalling: " + self.conf['reduction_started'] + "\n" + json.dumps(self.data) + "\n"
140            logger.debug("Calling: " + self.conf['reduction_started'] + "\n" + json.dumps(self.data))
141            self.client.send(self.conf['reduction_started'], json.dumps(self.data))
142
143            # specify instrument directory
144            CEPH_INSTRUMENTS = ['LET', 'MARI', 'MER', 'MERLIN', 'MAPS']
145            instrument_dir = ''
146            if self.instrument.upper() in CEPH_INSTRUMENTS: 
147                cycle = re.sub('[_]','',(re.match('.*cycle_(\d\d_\d).*', self.data['data'].lower()).group(1)))
148                instrument_dir = ARCHIVE_DIRECTORY_CEPH % (self.instrument.upper(), cycle, self.data['rb_number'], self.data['run_number'])
149            else:
150                cycle = re.match('.*cycle_(\d\d_\d).*', self.data['data'].lower()).group(1)
151                instrument_dir = ARCHIVE_DIRECTORY % (self.instrument.upper(), cycle, self.data['rb_number'], self.data['run_number'])
152           
153            # specify script to run and directory
154            if os.path.exists(os.path.join(self.reduction_script, "reduce.py")) == False:
155                self.data['message'] = "Reduce script doesn't exist within %s" % self.reduction_script
156                logger.error("Reduction script not found within %s" % self.reduction_script)
157                self.client.send(self.conf['reduction_error'] , json.dumps(self.data)) 
158                print "\nCalling: "+self.conf['reduction_error'] + "\n" + json.dumps(self.data) + "\n"
159                logger.debug("Calling: "+self.conf['reduction_error'] + "\n" + json.dumps(self.data))
160                return
161           
162            # specify directory where autoreduction output goes
163            run_output_dir = TEMP_ROOT_DIRECTORY + instrument_dir[:instrument_dir.find('/'+ str(self.data['run_number']))+1]
164            reduce_result_dir = TEMP_ROOT_DIRECTORY + instrument_dir + "/results/"
165            reduce_result_dir_tail_length = len("/results")
166            if not os.path.isdir(reduce_result_dir):
167                os.makedirs(reduce_result_dir)
168
169            log_dir = reduce_result_dir + "reduction_log/"
170            if not os.path.exists(log_dir):
171                os.makedirs(log_dir)
172
173            # Load reduction script
174            logger.debug("Paths current %s" % str(sys.path))   
175            sys.path.append(self.reduction_script)
176            reduce_script = imp.load_source('reducescript', os.path.join(self.reduction_script, "reduce.py"))
177            out_log = os.path.join(log_dir, self.data['rb_number'] + ".log")
178            out_err = os.path.join(reduce_result_dir, self.data['rb_number'] + ".err")
179
180            logger.info("----------------")
181            logger.info("Reduction script: %s" % self.reduction_script)
182            logger.info("Result dir: %s" % reduce_result_dir)
183            logger.info("Run Output dir: %s" % run_output_dir)
184            logger.info("Log dir: %s" % log_dir)
185            logger.info("Out log: %s" % out_log)
186            logger.info("Error log: %s" % out_err)
187            logger.info("----------------")
188
189            logger.info("Reduction subprocess started.")
190            logFile=open(out_log, "w")
191            errFile=open(out_err, "w")
192            # Set the output to be the logfile
193            sys.stdout = logFile
194            sys.stderr = errFile
195
196            reduce_script = self.replace_variables(reduce_script)
197
198            out_directories = reduce_script.main(input_file=str(self.data_file), output_dir=str(reduce_result_dir))
199            logger.info("this is the reduce results directory %s" % str(reduce_result_dir))
200            logger.info("this is the entire outdirectories %s" % str(out_directories))
201
202            # Reset outputs back to default
203            sys.stdout = sys.__stdout__
204            sys.stderr = sys.__stderr__
205            logFile.close()
206            errFile.close()
207
208            # if errFile is empty don't output it to the user
209            #if (os.stat(out_err).st_size == 0):
210            #    logger.debug("No output errors from reduce.py")
211            #    os.remove(out_err)
212            #else:
213            #    logger.debug("Output errors from reduce.py")
214
215            logger.info("Reduction subprocess completed.")
216            logger.info("Additional save directories: %s" % out_directories)
217           
218            self.data['reduction_data'] = []
219            if "message" not in self.data:
220                self.data["message"] = ""
221
222            # If the reduce script specified some additional save directories, copy to there first
223            if out_directories:
224                if type(out_directories) is str and os.access(out_directories, os.R_OK):
225                    self.data['reduction_data'].append(linux_to_windows_path(out_directories))
226                    if not os.path.exists(out_directories):
227                        os.makedirs(out_directories)
228                    try:
229                        copytree(run_output_dir[:-1], out_directories)
230                    except Exception, e:
231                        logger.error("Unable to copy %s to %s - %s" % (run_output_dir[:-1], out_directories, e))
232                        self.data["message"] += "Unable to copy to %s - %s. " % (out_directories, e)
233                elif type(out_directories) is list:
234                    for out_dir in out_directories:
235                        self.data['reduction_data'].append(linux_to_windows_path(out_dir))
236                        if not os.path.exists(out_dir):
237                            os.makedirs(out_dir)
238                        if type(out_dir) is str and os.access(out_dir, os.R_OK):
239                            try:
240                                copytree(run_output_dir[:-1], out_dir)
241                            except Exception, e:
242                                logger.error("Unable to copy %s to %s - %s" % (run_output_dir[:-1], out_dir, e))
243                                self.data["message"] += "Unable to copy to %s - %s. " % (out_dir, e)
244                        else:
245                            logger.error("Unable to access directory: %s" % out_dir)
246
247            # Move from tmp directory to actual directory (remove /tmp from
248            # replace old data if they exist
249            if os.path.isdir(reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length]):
250                try:
251                    shutil.rmtree(reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length], ignore_errors=True)
252                except Exception, e:
253                    logger.error("Unable to remove existing directory %s - %s" % (reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length], e))
254            try:
255                os.makedirs(reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length])
256            except Exception, e:
257                logger.error("Unable to create %s - %s" % (reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length], e))
258                self.data["message"] += "Unable to create %s - %s. " % (reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length], e)
259           
260            # [4,-8] is used to remove the prepending '/tmp' and the trailing 'results/' from the destination
261            self.data['reduction_data'].append(linux_to_windows_path(reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length]))
262            logger.info("Moving %s to %s" % (reduce_result_dir[:-1], reduce_result_dir[len(TEMP_ROOT_DIRECTORY):-reduce_result_dir_tail_length]))
263            try:
264                shutil.copytree(reduce_result_dir[:-1], reduce_result_dir[len(TEMP_ROOT_DIRECTORY):])
265            except Exception, e:
266                logger.error("Unable to copy to %s - %s" % (reduce_result_dir[len(TEMP_ROOT_DIRECTORY):], e))
267                self.data["message"] += "Unable to copy to %s - %s. " % (reduce_result_dir[len(TEMP_ROOT_DIRECTORY):], e)
268           
269            if os.stat(out_err).st_size == 0:
270                os.remove(out_err)
271                self.client.send(self.conf['reduction_complete'] , json.dumps(self.data)) 
272                print "\nCalling: "+self.conf['reduction_complete'] + "\n" + json.dumps(self.data) + "\n"
273            else:
274                maxLineLength=80
275                fp=file(out_err, "r")
276                fp.seek(-maxLineLength-1, 2) # 2 means "from the end of the file"
277                lastLine = fp.readlines()[-1]
278                errMsg = lastLine.strip() + ", see reduction_log/" + os.path.basename(out_log) + " or " + os.path.basename(out_err) + " for details."
279                self.data["message"] = "REDUCTION: %s" % errMsg
280                self.client.send(self.conf['reduction_error'] , json.dumps(self.data))
281                logger.error("Called "+self.conf['reduction_error']  + " --- " + json.dumps(self.data))       
282           
283            # Remove temporary working directory
284            try:
285                shutil.rmtree(reduce_result_dir[:-reduce_result_dir_tail_length], ignore_errors=True)
286            except Exception, e:
287                logger.error("Unable to remove temporary directory %s - %s" % reduce_result_dir)
288
289            logger.info("Reduction job complete")
290        except Exception, e:
291            try:
292                self.data["message"] = "REDUCTION Error: %s " % e
293                logger.exception("Called "+self.conf['reduction_error']  + "\nException: " + str(e) + "\nJSON: " + json.dumps(self.data))
294                self.client.send(self.conf['reduction_error'] , json.dumps(self.data))
295            except BaseException, e:
296                print "\nFailed to send to queue!\n%s\n%s" % (e, repr(e))
297                logger.error("Failed to send to queue! - %s - %s" % (e, repr(e)))
298         
299if __name__ == "__main__":
300    logger.debug("Paths current %s" % str(sys.path))   
301    print "\n> In PostProcessAdmin.py\n"
302
303    try:
304        conf = json.load(open('/etc/autoreduce/post_process_consumer.conf'))
305
306        brokers = []
307        brokers.append((conf['brokers'].split(':')[0],int(conf['brokers'].split(':')[1])))
308        connection = stomp.Connection(host_and_ports=brokers, use_ssl=True, ssl_version=3 )
309        connection.start()
310        connection.connect(conf['amq_user'], conf['amq_pwd'], wait=True, header={'activemq.prefetchSize': '1',})
311
312        destination, message = sys.argv[1:3]
313        print("destination: " + destination)
314        print("message: " + message)
315        data = json.loads(message)
316       
317        try: 
318            pp = PostProcessAdmin(data, conf, connection)
319            if destination == '/queue/ReductionPending':
320                pp.reduce()
321
322        except ValueError as e:
323            data["error"] = str(e)
324            logger.error("JSON data error: " + json.dumps(data))
325
326            connection.send(conf['postprocess_error'], json.dumps(data))
327            print("Called " + conf['postprocess_error'] + "----" + json.dumps(data))
328            raise
329       
330        except:
331            raise
332       
333    except Error as er:
334        logger.error("Something went wrong: " + str(er))
335        sys.exit()
336
337