| 1 | import icat, time, stomp, json, sys, re |
|---|
| 2 | |
|---|
class Client(object):
    """Small wrapper around a single STOMP connection to the message broker.

    The client lazily (re)creates its connection whenever it is asked to
    connect or send while no live connection exists.

    Parameters
    ----------
    brokers : list of (host, port) tuples handed to ``stomp.Connection``.
    user, password : credentials used when logging in to the broker.
    topics : stored on the instance; not used by any method in this class.
    consumer_name : stored on the instance; not used by any method in this
        class.
    """

    def __init__(self, brokers, user, password, topics=None, consumer_name='QueueProcessor'):
        self._brokers = brokers
        self._user = user
        self._password = password
        self._connection = None
        self._topics = topics
        self._consumer_name = consumer_name
        self._listener = None

    def get_connection(self):
        """Create, start and log in a new ``stomp.Connection`` and return it."""
        # NOTE(review): ssl_version=3 is passed straight through to stomp;
        # presumably it is the numeric value of an ``ssl.PROTOCOL_*``
        # constant -- confirm which protocol the broker expects.
        connection = stomp.Connection(host_and_ports=self._brokers, use_ssl=True, ssl_version=3)
        connection.start()
        connection.connect(self._user, self._password, wait=False)

        # connect() was issued with wait=False, so pause briefly to give the
        # broker time to answer before the connection is handed back.
        time.sleep(0.5)
        return connection

    def connect(self):
        """Ensure a live connection exists, replacing a missing or dead one."""
        if self._connection is None or not self._connection.is_connected():
            self._disconnect()
            self._connection = self.get_connection()

    def _disconnect(self):
        """Disconnect the current connection (if connected) and forget it."""
        if self._connection is not None and self._connection.is_connected():
            self._connection.disconnect()
        self._connection = None

    def stop(self):
        """Disconnect from the broker and stop the underlying connection.

        The connection reference is taken *before* it is cleared so that
        ``stop()`` is really invoked on it; previously the reference was
        reset to ``None`` first, which made the ``stop()`` call unreachable.
        Calling this on a client that never connected is a no-op.
        """
        connection, self._connection = self._connection, None
        if connection is None:
            return
        try:
            if connection.is_connected():
                connection.disconnect()
        finally:
            # Always stop, even if the disconnect failed or the connection
            # was started but never became connected.
            connection.stop()

    def send(self, destination, message, persistent='true'):
        """Send ``message`` to ``destination``, reconnecting first if needed.

        ``persistent`` is forwarded to the connection's ``send`` as a keyword
        argument.
        """
        self.connect()
        self._connection.send(destination, message, persistent=persistent)
|---|
| 42 | |
|---|
| 43 | |
|---|
# Backlog submission script: for every run number in [min_run, max_run] build a
# "data ready" message for the WISH instrument and post it to /queue/DataReady.
#
# Usage: <script> <min_run> <max_run> <rb_number> <cycle>
# When fewer than four arguments are supplied, all values are prompted for.

# NOTE(review): broker credentials are hard-coded in source; consider loading
# them from configuration or the environment instead.
activemq_client = Client([("autoreduce.isis.cclrc.ac.uk", 61613)], 'autoreduce', '1^G8r2b$(6', 'RUN_BACKLOG')
activemq_client.connect()

try:
    # The command-line branch reads sys.argv[1]..sys.argv[4], so argv must hold
    # at least five entries (script name + four arguments). The previous
    # check (< 4) raised IndexError when exactly three arguments were given.
    if len(sys.argv) < 5:
        min_run = int(raw_input('Start run number: '))
        max_run = int(raw_input('End run number: '))
        rename = raw_input('Use .nxs file? [Y/N]: ')
        rbnum = int(raw_input('RB Number: '))
        cycle = raw_input('Enter cycle number in format [14_3]: ')
    else:
        min_run = int(sys.argv[1])
        max_run = int(sys.argv[2])
        rename = 'n'  # change to 'y' to use the .nxs file
        rbnum = int(sys.argv[3])  # test RB
        cycle = str(sys.argv[4])

    instrument = 'WISH'
    start = time.clock()
    print("Start: %0.3f" % start)

    # Loop-invariant: strips a literal '$' that directly follows "cycle_NN_N".
    pattern = re.compile(r'(?P<before>cycle_\d\d_\d)\$', re.IGNORECASE)

    for x in range(min_run, max_run + 1):
        # Backslashes are written out explicitly; the resulting string is
        # identical to the one produced before.
        # NOTE(review): the path starts with a single backslash ("\isis\...").
        # If a UNC path ("\\isis\...") was intended, the literal needs four
        # backslashes -- confirm with the consumer of this message.
        # NOTE(review): the "WISH000" prefix presumably assumes five-digit
        # run numbers -- confirm.
        location = "\\isis\\NDXWISH\\Instrument\\data\\cycle_" + str(cycle) + "\\WISH000" + str(x) + ".raw"
        if rename.lower() == 'y':
            location = location.replace('.raw', '.nxs')
        location = pattern.sub(r'\g<before>', location)
        data_dict = {
            "rb_number": str(rbnum),
            "instrument": instrument,
            "data": location,
            "run_number": str(int(x)),
            "facility": "ISIS"
        }
        activemq_client.send('/queue/DataReady', json.dumps(data_dict))
        print(data_dict)
        # Pause between submissions.
        time.sleep(2)
finally:
    # Release the broker connection whether the run succeeded or failed
    # (e.g. on invalid numeric input).
    activemq_client.stop()
|---|