1 | import icat, time, stomp, json, sys, re |
---|
2 | |
---|
class Client(object):
    """Small wrapper around a STOMP connection used to publish messages.

    The connection is opened on demand by connect() / send() and released
    by stop().
    """

    def __init__(self, brokers, user, password, topics=None, consumer_name='QueueProcessor'):
        """Store the connection settings; nothing is connected yet.

        :param brokers: value passed as ``host_and_ports`` to ``stomp.Connection``.
        :param user: first argument given to the connection's ``connect`` call.
        :param password: second argument given to the connection's ``connect`` call.
        :param topics: stored on the instance; not read by any method of this class.
        :param consumer_name: stored on the instance; not read by any method of this class.
        """
        self._brokers = brokers
        self._user = user
        self._password = password
        self._connection = None
        self._topics = topics
        self._consumer_name = consumer_name
        self._listener = None

    def get_connection(self):
        """Create, start and return a new STOMP connection to the brokers.

        ``connect`` is issued with ``wait=False``, so a short fixed sleep
        follows; the returned connection is not guaranteed to be connected.
        """
        # NOTE(review): ssl_version=3 presumably selects ssl.PROTOCOL_TLSv1 -- confirm.
        connection = stomp.Connection(host_and_ports=self._brokers, use_ssl=True, ssl_version=3)
        connection.start()
        connection.connect(self._user, self._password, wait=False)

        time.sleep(0.5)
        return connection

    def connect(self):
        """Make sure a connection exists, replacing a missing or dropped one."""
        if self._connection is None or not self._connection.is_connected():
            self._disconnect()
            self._connection = self.get_connection()

    def _disconnect(self):
        """Disconnect if currently connected, then forget the connection."""
        if self._connection is not None and self._connection.is_connected():
            self._connection.disconnect()
        self._connection = None

    def stop(self):
        """Disconnect and stop the underlying connection, if there is one."""
        # Keep a reference: _disconnect() clears self._connection, which
        # previously made the stop() call below unreachable.
        connection = self._connection
        self._disconnect()
        if connection is not None:
            connection.stop()
        self._connection = None

    def send(self, destination, message, persistent='true'):
        """Send ``message`` to ``destination``, (re)connecting first if needed.

        :param destination: destination name handed to the connection's ``send``.
        :param message: message body handed to the connection's ``send``.
        :param persistent: forwarded as the ``persistent`` keyword argument.
        """
        self.connect()
        self._connection.send(destination, message, persistent=persistent)
42 | |
---|
43 | |
---|
# Submit a range of WISH runs to the '/queue/DataReady' queue, one message per run.
# Run settings come from four command-line arguments
# (start run, end run, RB number, cycle) or, failing that, from prompts.

# NOTE(review): broker credentials are hard-coded in source; consider moving
# them to configuration that is not checked in.
activemq_client = Client([("autoreduce.isis.cclrc.ac.uk", 61613)], 'autoreduce', '1^G8r2b$(6', 'RUN_BACKLOG')
activemq_client.connect()

# The command-line branch reads sys.argv[1] .. sys.argv[4], so it needs at
# least five entries in sys.argv; with fewer, fall back to prompting.
if len(sys.argv) < 5:
    min_run = int(raw_input('Start run number: '))
    max_run = int(raw_input('End run number: '))
    rename = raw_input('Use .nxs file? [Y/N]: ')
    rbnum = int(raw_input('RB Number: '))
    cycle = raw_input('Enter cycle number in format [14_3]: ')
else:
    min_run = int(sys.argv[1])
    max_run = int(sys.argv[2])
    rename = 'n'  # change to use nxs file
    rbnum = int(sys.argv[3])  # test RB
    cycle = str(sys.argv[4])


instrument = 'WISH'
start = time.clock()
print("Start: %0.3f" % start)

# Strips a '$' that directly follows the 'cycle_NN_N' path component.
# Compiled once here rather than on every loop iteration.
pattern = re.compile(r'(?P<before>cycle_\d\d_\d)\$', re.IGNORECASE)

for x in range(min_run, max_run + 1):
    # Backslashes are written out explicitly; the resulting text is
    # \isis\NDXWISH\Instrument\data\cycle_<cycle>\WISH000<run>.raw
    location = "\\isis\\NDXWISH\\Instrument\\data\\cycle_" + str(cycle) + "\\WISH000" + str(x) + ".raw"
    if rename.lower() == 'y':
        location = location.replace('.raw', '.nxs')
        # NOTE(review): assumed to belong to the .nxs branch -- confirm.
        location = pattern.sub(r'\g<before>', location)
    data_dict = {
        "rb_number": str(rbnum),
        "instrument": instrument,
        "data": location,
        "run_number": str(x),
        "facility": "ISIS"
    }
    activemq_client.send('/queue/DataReady', json.dumps(data_dict))
    print(data_dict)
    # Pause between submissions.
    time.sleep(2)