-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpyMonitor.py
More file actions
161 lines (128 loc) · 5.29 KB
/
pyMonitor.py
File metadata and controls
161 lines (128 loc) · 5.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
pyMonitor first Version
Written By :Ahmed Alkabir
"""
#!/usr/bin/python3
# Library
import serial
import sys,queue,time,os
class pyMonitor(object):
#baud rate of Serial communication
baud_rate = [4800,9600,14400,19200,28800,38400,57600,115200]
# Parity Check list
Parity = {'PARITY_NONE':serial.PARITY_NONE,'PARITY_EVEN':serial.PARITY_EVEN,'PARITY_ODD':serial.PARITY_ODD}
# Bit Width List or ByteSize
ByteSize = {'FIVEBITS':serial.FIVEBITS,'SIXBITS':serial.SIXBITS,'SEVENBITS':serial.SEVENBITS,'EIGHTBITS':serial.EIGHTBITS}
# stopBits List
stopbits = {'STOPBITS_ONE':serial.STOPBITS_ONE,'STOPBITS_ONE_POINT_FIVE':serial.STOPBITS_ONE_POINT_FIVE,'STOPBITS_TWO':serial.STOPBITS_TWO}
# serial object
__main_conn = None
dataQueue = None
checkStatusQueue = None
out = None
__flag_out = None
try:
# Constructor
def __init__(self, port, baud_rate, byte_size=serial.EIGHTBITS, parity=serial.PARITY_NONE, stop_bit=serial.STOPBITS_ONE):
# initialize the connection and if anything ok open the port
self.__main_conn = serial.Serial(port, baud_rate, bytesize=byte_size, parity=parity, stopbits=stop_bit)
# Queue
self.dataQueue = queue.Queue()
self.checkStatusQueue = queue.Queue()
# Define Variables
self.out = ''
self.__flag_out = True
except serial.SerialException:
raise serial.SerialException
# Close Connection of Serial Communication
def close_connection(self):
if self.__main_conn is not None:
del self.__main_conn
# Get Name of Current Port
def get_name(self):
return self.__main_conn.name
# Transmit Data
def transmit_data(self,data):
# pass data to __main_conn pySerial object write method
self.__main_conn.write(data)
# Receive Data from Devices
def receive_data(self):
# And operation with __flag_out to check if want to close this thread
while True and self.__flag_out:
# to store received value into out variable
# and Make __flag_out = True
self.out = ''
self.__flag_out = True
# waiting for data to receive
while self.__main_conn.inWaiting() == 0 and self.__flag_out:
try:
if self.checkStatusQueue.get_nowait() == 'Stop':
self.__flag_out = False
except queue.Empty:
pass
# Sleep for 0.25 second
time.sleep(0.05)
# receive data from the target device
while self.__main_conn.inWaiting() > 0 and self.__flag_out:
try:
self.out += self.__main_conn.read(1).decode("utf-8")
except UnicodeDecodeError:
pass
if self.__flag_out:
# add to out variable
self.dataQueue.put(self.out)
# Ports of Computers and it depends on os system
@staticmethod
def get_port():
if sys.platform.startswith('win'): # For Windows Platform
ports = ['COM%s' %(i + 1) for i in range(256)]
win_ports = []
for port in ports:
try:
# Check if port it works if work add to results list otherwise ignore it
temp_port = serial.Serial(port)
temp_port.close()
win_ports.append(port)
except (OSError, serial.SerialException):
pass
# return ports
return win_ports
elif sys.platform.startswith('linux'): # For Linux Platform
# list of devices
list_dev_linux = os.listdir('/dev/')
list_linux = []
# but we need to select right devices
# to handle with it
for list_ in list_dev_linux:
if list_.startswith('tty'):
port = '/dev/' + list_
try:
# Check ports
temp_port = serial.Serial(port)
temp_port.close()
list_linux.append(port)
except(OSError, serial.SerialException):
# debug: just want to check available devices
print('-->{} - {}\n'.format(port, serial.SerialException.__str__))
pass
# return ports
return list_linux
# For Mac Devices
elif sys.platform.startswith('darwin'):
# list of devices
list_dev_mac = os.listdir('/dev/')
list_mac = []
# but we need to select right devices
# to handle with it
for list_ in list_dev_mac :
if list_.startswith('cu.'):
port = '/dev/' + list_
try:
# Check ports
temp_port = serial.Serial(port)
temp_port.close()
list_mac.append(port)
except(OSError, serial.SerialException):
pass
# return ports
return list_mac