-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathbrooks_s_protocol.py
More file actions
148 lines (135 loc) · 5.33 KB
/
brooks_s_protocol.py
File metadata and controls
148 lines (135 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
""" Driver for Brooks s-protocol """
from __future__ import print_function
import time
import struct
import logging
import serial
from six import b, indexbytes
from PyExpLabSys.common.supported_versions import python2_and_3
# Configure logger as library logger and set supported python versions.
# The NullHandler is attached so that this module emits nothing by itself;
# output only appears once the importing application configures logging.
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
# NOTE(review): python2_and_3 comes from PyExpLabSys.common.supported_versions;
# presumably it records/checks that this file supports both Python 2 and 3 -
# confirm against that helper.
python2_and_3(__file__)
class Brooks(object):
    """ Driver for Brooks s-protocol

    Frames are assembled and parsed as hexadecimal strings and are only
    converted to raw bytes right at the serial port. The byte handling is
    done with ``bytearray`` so the same code path works on Python 2 and 3.
    """

    # Unit code the driver expects in every flow reply (mL/min)
    FLOW_UNIT_ML_MIN = 171
    # Number of characters consumed by pack(); 8 chars * 6 bit = 6 bytes
    PACKED_CHARS = 8
    # Maximum number of times comm() transmits a command before giving up
    MAX_ATTEMPTS = 9

    def __init__(self, device, port='/dev/ttyUSB0'):
        """ Open the serial port and look up the long address of the device

        Args:
            device (str): Identifier of the device. Only the last eight
                characters are used; they are packed and sent in the initial
                identification request.
            port (str): Serial device to open (19200 baud, odd parity,
                eight data bits, one stop bit).
        """
        self.ser = serial.Serial(port, 19200)
        self.ser.parity = serial.PARITY_ODD
        self.ser.bytesize = serial.EIGHTBITS
        self.ser.stopbits = serial.STOPBITS_ONE
        # Identification request: command 0x0b with a 6 byte payload holding
        # the packed identifier
        deviceid = self.comm('8280000000000b06' + self.pack(device[-8:]))
        manufactor_code = '0a'
        device_type = deviceid[12:14]
        # The long address is used as the address field of all later commands
        self.long_address = manufactor_code + device_type + deviceid[-6:]

    @staticmethod
    def _decode_float(hex_string):
        """ Decode the first 8 hex digits as a big-endian 32 bit float

        Raises ValueError if fewer than four bytes can be parsed.
        """
        raw = bytearray(int(hex_string[i:i + 2], 16) for i in (0, 2, 4, 6))
        return struct.unpack('>f', bytes(raw))[0]

    @staticmethod
    def _encode_float(value):
        """ Encode value as 8 hex digits of a big-endian 32 bit float """
        raw = bytearray(struct.pack('>f', value))
        return ''.join('{:02x}'.format(byte) for byte in raw)

    def pack(self, input_string):
        """ Turns a string in packed-ascii format

        Each character contributes its lowest six bits; the first eight
        characters are concatenated into 48 bits.

        Args:
            input_string (str): At least eight characters; characters after
                the eighth are ignored.

        Returns:
            str: Twelve lowercase hex digits (six bytes).

        Raises:
            ValueError: If fewer than eight characters are given.
        """
        if len(input_string) < self.PACKED_CHARS:
            raise ValueError(
                'pack() needs at least {} characters, got {!r}'.format(
                    self.PACKED_CHARS, input_string
                )
            )
        packed = 0
        for char in input_string[:self.PACKED_CHARS]:
            # ord & 0x3F keeps the six low bits, i.e. (ord % 128) % 64
            packed = (packed << 6) | (ord(char) & 0x3F)
        return '{:012x}'.format(packed)

    def crc(self, command):
        """ Calculate crc value of command

        The value is the XOR of all bytes of the hex string ``command``,
        ignoring any leading (uppercase) 'FF' preamble bytes.

        Returns:
            str: The checksum as produced by ``hex()``, e.g. '0xf'.
        """
        start = 0
        while command[start:start + 2] == 'FF':
            start += 2
        command = command[start:]
        result = 0
        for i in range(len(command) // 2):
            result ^= int(command[i * 2:i * 2 + 2], 16)
        return hex(result)

    def comm(self, command):
        """ Implements low-level details of the s-protocol

        Adds preamble and checksum to ``command``, transmits it and parses
        the reply. The command is re-sent (up to MAX_ATTEMPTS transmissions
        in total) as long as no byte count can be read from the reply.

        Args:
            command (str): Hex string with delimiter, address, command
                number, byte count and payload - without preamble/checksum.

        Returns:
            str: The payload of the reply as a hex string, or the string
            'Error' if no parsable reply was received.
        """
        check = self.crc(command)[2:].zfill(2)
        final_com = 'FFFFFFFF' + command + check
        frame = bytearray(
            int(final_com[i * 2:i * 2 + 2], 16)
            for i in range(len(final_com) // 2)
        )
        frame.append(0)
        bytes_for_serial = bytes(frame)

        for _ in range(self.MAX_ATTEMPTS):
            self.ser.write(bytes_for_serial)
            time.sleep(0.2)
            received = bytearray(self.ser.read(self.ser.inWaiting()))
            # Skip the preamble only. 0xFF bytes further into the frame are
            # real data (e.g. part of a float) and must be kept, otherwise
            # every field after them is shifted.
            start = 0
            while start < len(received) and received[start] == 0xFF:
                start += 1
            reply = ''.join('{:02x}'.format(byte) for byte in received[start:])
            # Layout of reply: delimiter [0:2], address [2:12],
            # command [12:14], byte count [14:16], payload [16:...]
            try:
                byte_count = int(reply[14:16], 16)
            except ValueError:
                # Nothing (or too little) received - transmit again
                continue
            return reply[16:16 + 2 * byte_count]
        return 'Error'

    def read_flow(self):
        """ Read the current flow-rate

        Returns:
            float: The flow, or -1 if the reply could not be decoded.

        Raises:
            AssertionError: If the device reports a unit other than mL/min.
        """
        response = self.comm('82' + self.long_address + '0100')
        # Reply payload: status [0:4], unit code [4:6], float value [6:14]
        try:  # TODO: This should be handled by re-sending command
            unit_code = int(response[4:6], 16)
            value = self._decode_float(response[6:])
        except ValueError:
            # Unparsable reply; the unit check is skipped, we know what is wrong
            return -1
        assert unit_code == self.FLOW_UNIT_ML_MIN  # Flow unit should always be mL/min
        return value

    def read_full_range(self):
        """
        Report the full range of the device
        Apparently this does not work for SLA-series...

        Returns:
            float: The full range of the device.

        Raises:
            ValueError: If the reply could not be decoded.
            AssertionError: If the device reports a unit other than mL/min.
        """
        response = self.comm('82' + self.long_address + '980106')  # Command 152
        print(response)
        # Double check what gas-selection code really means...
        # currently 01 is used
        # Reply payload: status [0:4], unit code [4:6], float value [6:14]
        unit_code = int(response[4:6], 16)
        assert unit_code == self.FLOW_UNIT_ML_MIN  # Flow controller should always be set to mL/min
        return self._decode_float(response[6:])

    def set_flow(self, flowrate):
        """ Set the setpoint of the flow

        NOTE: transmitting the setpoint is currently disabled in this
        driver. The value is encoded (which validates that it is a number)
        and True is returned, but nothing is sent to the device.

        Args:
            flowrate (float): Wanted setpoint.

        Returns:
            bool: Always True.
        """
        ieee_flowrate = self._encode_float(flowrate)
        # 39 = unit code for percent
        # FA = unit code for 'same unit as flowrate measurement'
        # Disabled transmit, kept for reference:
        # response = self.comm('82' + self.long_address +
        #                      'ec05' + 'FA' + ieee_flowrate)
        # status_code = response[0:4]
        # unit_code = int(response[4:6], 16)
        return True
if __name__ == '__main__':
    # Small manual check against a connected device: show the resolved
    # address, then the full range and the current flow, in that order.
    BROOKS = Brooks('3F2320902001')
    for fetch in (
            lambda: BROOKS.long_address,
            BROOKS.read_full_range,
            BROOKS.read_flow,
    ):
        # Each value is fetched right before it is printed
        print(fetch())