-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlora.py
More file actions
89 lines (77 loc) · 2.46 KB
/
lora.py
File metadata and controls
89 lines (77 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from time import sleep
import config_lora
from sx127x import SX127x
from controller_esp32 import ESP32Controller
import _thread
class LoRa:
#Iniciamos el objeto de lora de acuerdo a los pines establecidos para las placas
#TTGO y Heltec con el chip sx127x
def __init__(self, header=None, period=0):
#Se especifica la cabecera en caso de que solo se quiera procesar algo en especifico
#y tambien un periodo por si se quiere hacer cierta accion de manera periodica
self.header = header
self.period = float(period)
self.status = 1
self.cb = None
self.controller = ESP32Controller()
self.lora = self.controller.add_transceiver(SX127x(name = 'LoRa'),
pin_id_ss = ESP32Controller.PIN_ID_FOR_LORA_SS,
pin_id_RxDone = ESP32Controller.PIN_ID_FOR_LORA_DIO0)
def __del__(self):
print("LoRa object deleted")
def stop(self):
self.status = 0
def go_on(self):
self.status = 1
#permite enviar un dato con el header especificado en la construccion del objeto
#o con un header distinto
def send(self, data, spheader=None):
if spheader is not None:
self.lora.println(str(spheader)+'@'+str(data))
elif self.header is not None:
self.lora.println(str(self.header)+'@'+str(data))
else:
self.lora.println(str(data))
#metodo para escuchar algun mensaje y ejecutar el callback
def wait_msg(self):
assert self.cb is not None, "Subscribe callback is not set"
def inc_msg():
while 1:
if self.status:
if self.lora.receivedPacket():
try:
payload = self.lora.read_payload().decode()
data = payload.split('@')
if len(data)==2:
if self.header is not None:
payload = payload.split('@')
if payload[0] == str(self.header):
self.cb(payload[1])
else:
pass
elif self.header is None:
self.cb(payload)
sleep(self.period)
except Exception as e:
print(e)
_thread.start_new_thread(inc_msg, ())
def set_callback(self, f):
self.cb = f
#metodo para el recibimiento de paquetes
def receive_msg(self):
assert self.cb is not None, "Subscribe callback is not set"
if self.lora.receivedPacket():
try:
payload = self.lora.read_payload().decode()
data = payload.split('@')
if len(data)==2:
if self.header is not None:
payload = payload.split('@')
if payload[0] == str(self.header):
self.cb(payload[1])
else:
pass
elif self.header is None:
self.cb(payload)
except Exception as e:
print(e)