So far, we have discussed the hardware and the general direction of the software. It’s now time to describe the code itself.
Apologies… This post is full of code extracts and weird bash commands
You’ve been warned :0P
As you probably already know, BAC0 is based on bacpypes. Coding with BAC0 is therefore very similar in concept to coding with bacpypes, except that a lot of the work is already done. Before going too far: I always use berryconda to run Python on a Pi. I find this distribution easy to install, and you get Python 3 with everything you need, like NumPy or pandas, with no effort.
When you initialize your BAC0 app:
import BAC0
new_device = BAC0.lite(
    ip=ip, deviceId=deviceId, localObjName="BAC0_Fireplace"
)
every detail of the BACnet basics is taken care of. With just that, your implementation is already discoverable on the network. The “app” can read from or write to other devices; it has a device ID, a vendor ID and an object name. It responds to Who-Is requests with an I-Am. You would have to think about all those details if you were starting from scratch.
So, knowing that, what do we need to create for our app to work? I see four parts:
- A serial driver that will take care of all communication with the fireplace
- A local variable definition file that will add objects to the “app”
- An application file that will serve as a bridge between BACnet and serial
- A main file that will be used to initialize everything (and will be defined as a service). We’ll see this later.
Serial communication
I used pyserial to implement serial communication with the fireplace. It is the “de facto” module for working with serial ports in Python. As usual, I didn’t want to reinvent the wheel, so I stuck to the examples I found in the documentation, and they served me well.
It even provides a little terminal tool for simple tests:
python -m serial.tools.miniterm /dev/ttyS0 115200
The terminal picture from the last post shows the output of this terminal app.
# Really simple example to open a serial port with pyserial
import serial

def open_serial_port():
    print('Opening COM port 1')
    # open serial port (1 second read timeout)
    ser = serial.Serial('/dev/ttyS0', 115200, timeout=1)
    # check which port was really used
    print(ser.name)
    return ser
The first version of my driver worked fine. It was based on simple “on-demand” requests: I issued a request and waited for the response, every time.
# You write to the serial port
ser.write(b'GET LED')
# You read the answer
response = ser.readline()
# b'ON'
The problem came when I tried to integrate the touch screen interface into the driver. My driver was not constantly listening on the serial port for requests issued by something other than the “app”. This was a big flaw.
I was really inspired by this thread when defining my reading thread.
I also used some regex to process the responses I got.
import re
from collections import deque

class ReadLine:
    def __init__(self, s):
        self.buf = bytearray()
        self.s = s  # the open serial port
        self.need_to_write = False
        self.can_write = False
        self.write_responses = deque()  # answers to our own requests
        self.display_commands = deque()  # what comes from the display

    def readline(self):
        # A complete line may already be sitting in the buffer
        i = self.buf.find(b"\n")
        if i >= 0:
            r = self.buf[: i + 1]
            self.buf = self.buf[i + 1 :]
            self.process_data(r)
            # Note: this reset drops anything received after the newline
            self.buf = bytearray()
            return
        while True:
            # Read whatever is waiting (at least 1 byte, at most 2048)
            i = max(1, min(2048, self.s.in_waiting))
            data = self.s.read(i)
            i = data.find(b"\n")
            if i >= 0:
                r = self.buf + data[: i + 1]
                self.buf[0:] = data[i + 1 :]
                self.process_data(r)
                self.buf = bytearray()
                return
            else:
                self.buf.extend(data)

    def process_data(self, response):
        if response == b"\n":
            return
        # One pattern for answers (text between \r and \n),
        # one for display commands (text after "HEY ")
        response_to_write = re.compile(r"(\r)([\w :.]*)(\n)")
        display_write = re.compile(r"(HEY )([\w :.]*)(\n)?")
        if response:
            data = response.decode()
            if response_to_write.search(data):
                data = response_to_write.search(data)[2]
                self.write_responses.append(data)
            elif display_write.search(data):
                data = display_write.search(data)[2]
                self.display_commands.append(data)
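To make the parsing rules concrete, here is a small self-contained sketch that feeds a few chunks of bytes through a line splitter and then applies the same two regular expressions. The sample traffic and the `split_lines` and `classify` helpers are made up for illustration; the real fireplace protocol may well look different.

```python
import re
from collections import deque

# Same patterns as in process_data above
RESPONSE_TO_WRITE = re.compile(r"(\r)([\w :.]*)(\n)")
DISPLAY_WRITE = re.compile(r"(HEY )([\w :.]*)(\n)?")


def split_lines(buf, chunk):
    """Append chunk to buf and return every complete line (ending in a newline).

    Whatever follows the last newline stays in buf for the next call.
    """
    buf.extend(chunk)
    lines = []
    while True:
        i = buf.find(b"\n")
        if i < 0:
            return lines
        lines.append(bytes(buf[: i + 1]))
        del buf[: i + 1]


def classify(line, write_responses, display_commands):
    """Route one line to the matching deque (hypothetical helper)."""
    data = line.decode()
    match = RESPONSE_TO_WRITE.search(data)
    if match:
        write_responses.append(match[2])
        return
    match = DISPLAY_WRITE.search(data)
    if match:
        display_commands.append(match[2])


write_responses = deque()
display_commands = deque()
buf = bytearray()

# Made-up traffic, deliberately split in awkward places
for chunk in (b"\rO", b"N\nHEY LED", b" OFF\n", b"\n"):
    for line in split_lines(buf, chunk):
        classify(line, write_responses, display_commands)

print(list(write_responses))   # ['ON']
print(list(display_commands))  # ['LED OFF']
```

Unlike the class above, this helper keeps the bytes that follow a newline, so two lines arriving in the same read are both processed.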
The ReadLine class runs in a thread that calls readline all the time. The heart of the request handling is:
def _request(self, req):
    self.ser.write(bytes("{}\r\n".format(req), "UTF-8"))  # write a string
    self._log.debug("{} sent to COM1".format(req))
    time.sleep(0.2)
    try:
        while True:
            try:
                _result = self.rl.write_responses.pop()
                result = self.decode_result(_result)
                break
            except IndexError:
                time.sleep(0.1)
    except EmptyResult:
        self._log.warning("No result to request {}".format(req))
        result = None
    except ErrorResponse:
        raise ErrorResponse("Error received for request {}".format(req))
    self._log.debug("Response : {}".format(result))
    if not result:
        raise CommunicationFailure("No response to request : {}".format(req))
    return result
When needed, we send something on the serial interface, then we wait for something to appear in the deque of write responses. We can then decode it and proceed to the next thing.
Using this, I can also monitor the ser.rl.display_commands deque and see what is going on with the remote display. Case closed.
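The send-then-wait pattern can be tried without any hardware. The sketch below replaces the serial port with a fake device that drops its answer into a deque a moment later, the way the reading thread would. `FakeDevice`, the `GET LED` exchange and the timeout are assumptions added for the example; the driver shown above polls until something arrives.

```python
import threading
import time
from collections import deque


class FakeDevice:
    """Stand-in for the serial port: answers known requests after a short delay."""

    def __init__(self, responses):
        self.responses = responses  # deque normally filled by the reading thread
        self.answers = {"GET LED": "ON"}  # made-up protocol

    def write(self, request):
        # Simulate the reading thread receiving the answer a bit later
        answer = self.answers.get(request)
        if answer is not None:
            threading.Timer(0.05, self.responses.append, args=(answer,)).start()


def request(device, responses, req, timeout=1.0, poll=0.01):
    """Send req, then poll the deque until an answer shows up or time runs out."""
    device.write(req)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            return responses.pop()
        except IndexError:
            time.sleep(poll)  # nothing yet, try again shortly
    raise TimeoutError("No response to request: {}".format(req))


responses = deque()
device = FakeDevice(responses)
print(request(device, responses, "GET LED"))  # ON
```

Bounding the wait with a deadline is a design choice of this sketch: a request that never gets an answer ends with an exception instead of polling forever.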
Defining BACnet objects
I’ve already said that BAC0 makes things easy. This is the file named device.py, which creates all the objects I needed to define the BACnet side of the app. An important thing is to register the classes with your vendor ID (see the register_object_type calls).
import BAC0
from BAC0.core.utils.notes import note_and_log
from bacpypes.local.object import (
    AnalogOutputCmdObject,
    AnalogValueCmdObject,
    BinaryOutputCmdObject,
    BinaryValueCmdObject,
)
from bacpypes.object import AnalogInputObject, BinaryInputObject, register_object_type
from bacpypes.basetypes import EngineeringUnits
from bacpypes.primitivedata import CharacterString
import time

def start_device(ip, deviceId):
    try:
        new_device = BAC0.lite(
            ip=ip, deviceId=deviceId, localObjName="BAC0_Fireplace"
        )
    except Exception:
        new_device = BAC0.lite(deviceId=deviceId, localObjName="BAC0_Fireplace")
    time.sleep(1)

    # Register class to activate behaviours
    register_object_type(AnalogOutputCmdObject, vendor_id=842)
    register_object_type(AnalogValueCmdObject, vendor_id=842)
    register_object_type(BinaryOutputCmdObject, vendor_id=842)
    register_object_type(BinaryValueCmdObject, vendor_id=842)

    online = BinaryValueCmdObject(
        objectIdentifier=("binaryValue", 1),
        objectName="Fireplace Online",
        presentValue="inactive",
        description=CharacterString("Communication status on the serial interface"),
    )
    ledpulse_bv = BinaryValueCmdObject(
        objectIdentifier=("binaryValue", 2),
        objectName="LEDPULSE",
        presentValue="inactive",
        description=CharacterString(
            "Set Led to pulse between current color and color set by LEDCOLOR after turning on LEDPULSE on"
        ),
    )
    flameenable_bv = BinaryValueCmdObject(
        objectIdentifier=("binaryValue", 3),
        objectName="FLAME-EN",
        presentValue="inactive",
        description=CharacterString(
            "If set to false, will prevent FLAME to be turned ON"
        ),
    )
    flamealarm_bv = BinaryValueCmdObject(
        objectIdentifier=("binaryValue", 4),
        objectName="FLAME-ALARM",
        presentValue="inactive",
        minimumOnTime=300,
        description=CharacterString(
            "Will be On if there is a discrepancy between command and status of flame"
        ),
    )
    led_bo = BinaryOutputCmdObject(
        objectIdentifier=("binaryOutput", 1),
        objectName="LED",
        presentValue="inactive",
        description=CharacterString("Turns the LED on or off"),
    )
    flame_bo = BinaryOutputCmdObject(
        objectIdentifier=("binaryOutput", 2),
        objectName="FLAME",
        presentValue="inactive",
        description=CharacterString("Turns flame relay on or off"),
    )
    heatfan_bo = BinaryOutputCmdObject(
        objectIdentifier=("binaryOutput", 3),
        objectName="HEATFAN",
        presentValue="inactive",
        description=CharacterString(
            "Turns the heat exchanger blower on or off, if present"
        ),
    )
    lamp_bo = BinaryOutputCmdObject(
        objectIdentifier=("binaryOutput", 4),
        objectName="LAMP",
        presentValue="inactive",
        description=CharacterString("Lamp on or off (non LED)"),
    )
    auxburner_bo = BinaryOutputCmdObject(
        objectIdentifier=("binaryOutput", 5),
        objectName="AUXBURNER",
        presentValue="inactive",
        description=CharacterString("Auxiliary burner or valve, if present"),
    )
    ledfadetime_av = AnalogValueCmdObject(
        objectIdentifier=("analogValue", 1),
        objectName="LEDFADETIME",
        presentValue=0,
        units=EngineeringUnits("milliseconds"),
        description=CharacterString("Sets fade time between led colors (0-32767)"),
    )
    leddwelltime_av = AnalogValueCmdObject(
        objectIdentifier=("analogValue", 2),
        objectName="LEDDWELLTIME",
        presentValue=0,
        units=EngineeringUnits("milliseconds"),
        description=CharacterString(
            "Sets how long a color will remain before it transitions to the newer color"
        ),
    )
    ledcolorR_av = AnalogValueCmdObject(
        objectIdentifier=("analogValue", 3),
        objectName="LEDCOLOR_R",
        relinquishDefault=100,
        presentValue=100,
        description=CharacterString("RED value of LEDCOLOR (0-255)"),
    )
    ledcolorG_av = AnalogValueCmdObject(
        objectIdentifier=("analogValue", 4),
        objectName="LEDCOLOR_G",
        relinquishDefault=100,
        presentValue=100,
        description=CharacterString("GREEN value of LEDCOLOR (0-255)"),
    )
    ledcolorB_av = AnalogValueCmdObject(
        objectIdentifier=("analogValue", 5),
        objectName="LEDCOLOR_B",
        relinquishDefault=100,
        presentValue=100,
        description=CharacterString("BLUE value of LEDCOLOR (0-255)"),
    )
    ledcolorW_av = AnalogValueCmdObject(
        objectIdentifier=("analogValue", 6),
        objectName="LEDCOLOR_W",
        relinquishDefault=100,
        presentValue=100,
        description=CharacterString("WHITE value of LEDCOLOR (0-255)"),
    )
    humidity_ai = AnalogInputObject(
        objectIdentifier=("analogInput", 1),
        objectName="HUMIDITY",
        presentValue=0,
        units=EngineeringUnits("percentRelativeHumidity"),
        description=CharacterString("Reading of humidity sensor"),
    )
    temperature_ai = AnalogInputObject(
        objectIdentifier=("analogInput", 2),
        objectName="TEMPERATURE",
        presentValue=0,
        units=EngineeringUnits("degreesCelsius"),
        description=CharacterString("Reading of temperature sensor"),
    )
    dewpoint_ai = AnalogInputObject(
        objectIdentifier=("analogInput", 3),
        objectName="DEWPOINT",
        presentValue=0,
        units=EngineeringUnits("degreesCelsius"),
        description=CharacterString(
            "Calculated dewpoint based on temperature and humidity readings"
        ),
    )
    heatfanspeed_ao = AnalogOutputCmdObject(
        objectIdentifier=("analogOutput", 1),
        objectName="HEATFANSPEED",
        presentValue=0,
        description=CharacterString("Sets speed of heat exchanger"),
    )
    flamelevel_ao = AnalogOutputCmdObject(
        objectIdentifier=("analogOutput", 2),
        objectName="FLAMELEVEL",
        presentValue=0,
        description=CharacterString("Level of flame, if present (1-10)"),
    )
    lamplevel_ao = AnalogOutputCmdObject(
        objectIdentifier=("analogOutput", 3),
        objectName="LAMPLEVEL",
        presentValue=6,
        description=CharacterString("Level of lamp (1-10)"),
    )

    # BV
    new_device.this_application.add_object(online)
    new_device.this_application.add_object(ledpulse_bv)
    new_device.this_application.add_object(flameenable_bv)
    new_device.this_application.add_object(flamealarm_bv)
    # BO
    new_device.this_application.add_object(led_bo)
    new_device.this_application.add_object(flame_bo)
    new_device.this_application.add_object(heatfan_bo)
    new_device.this_application.add_object(lamp_bo)
    new_device.this_application.add_object(auxburner_bo)
    # AI
    new_device.this_application.add_object(humidity_ai)
    new_device.this_application.add_object(temperature_ai)
    new_device.this_application.add_object(dewpoint_ai)
    # AO
    new_device.this_application.add_object(heatfanspeed_ao)
    new_device.this_application.add_object(flamelevel_ao)
    new_device.this_application.add_object(lamplevel_ao)
    # AV
    new_device.this_application.add_object(ledfadetime_av)
    new_device.this_application.add_object(leddwelltime_av)
    new_device.this_application.add_object(ledcolorR_av)
    new_device.this_application.add_object(ledcolorG_av)
    new_device.this_application.add_object(ledcolorB_av)
    new_device.this_application.add_object(ledcolorW_av)
    return new_device
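The DEWPOINT object is described as calculated from the temperature and humidity readings. The post does not say whether the fireplace controller or the gateway does that calculation, nor which formula is used. Purely as an illustration, the Magnus approximation is one common way to get a dew point from those two readings; the function name and coefficients below are choices made for this sketch.

```python
import math


def dew_point_celsius(temperature_c, relative_humidity):
    """Approximate dew point (°C) with the Magnus formula.

    temperature_c: air temperature in °C
    relative_humidity: relative humidity in percent (0 < RH <= 100)
    """
    a, b = 17.62, 243.12  # a commonly used pair of Magnus coefficients
    gamma = math.log(relative_humidity / 100.0) + a * temperature_c / (b + temperature_c)
    return b * gamma / (a - gamma)


print(round(dew_point_celsius(20.0, 50.0), 1))  # 9.3
```

At 100 % relative humidity the formula returns the air temperature itself, which is a quick sanity check.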
The application itself
What I call the application here is the part of the code that glues the BACnet side and the serial driver together. From the names of the functions, I think you will understand what is going on in this part of the code. I removed a lot of code to emphasize the flow of the application. I kept the parts that are directly tied to BAC0 itself, for example how I write to BACnet objects.
from BAC0.tasks.RecurringTask import RecurringTask
from BAC0.tasks.DoOnce import DoOnce
from BAC0.tasks.TaskManager import Manager
from BAC0.core.utils.notes import note_and_log
from bacpypes.primitivedata import Real, Null
import time
from beat import led_beat
from serial_driver import Fireplace
from device import start_device

@note_and_log
class App(object):
    def __init__(self, ip, deviceId):
        self.dev = start_device(ip=ip, deviceId=deviceId)
        self.fireplace = Fireplace()
        self.process_permission = True

    def update_online_status(self):
        online = self.dev.this_application.get_object_name("Fireplace Online")
        if self.fireplace.online:
            online.presentValue = "active"
        else:
            online.presentValue = "inactive"

    def ping_serial(self):
        """
        This function asks the serial driver to ping the device.
        Depending on the response, update the bacnet object Fireplace Online
        """
        self._log.debug(
            "Validating Online Status of Fireplace : {}".format(self.fireplace.online)
        )
        self.fireplace.driver.ping()
        self.update_online_status()

    def process(self):
        while self.process_permission:
            try:
                if self.fireplace.online:
                    self.update_present_values()
                    self.make_display_write_to_priority_15()
                    self.process_write_requests_to_device()
                else:
                    self.ping_serial()
                    if not self.fireplace.online:
                        self._log.warning("Device offline... will pause for 5 seconds")
                        time.sleep(5)
                        self.ping_serial()
            except Exception as e:
                self._log.error(
                    "Error happened, trying to recover in 1 min.\n{}".format(e),
                    exc_info=True,
                )
                time.sleep(60)

    def update_present_values(self):
        """
        Read all variables from the serial interface and update bacnet presentValues
        accordingly
        """

    def decode_ledcolor(self, result):
        # You get a list of R G B W... decode and return values
        # (decoding code removed from this extract)
        return (r, g, b, w)

    def make_display_write_to_priority_15(self):
        """
        If something happens on the display, make that change appear in the priority
        array of the corresponding BACnet object... and update the present value as it
        may have changed.
        """

    def process_write_requests_to_device(self):
        """
        Compare the BACnet objects with what the device reports and send the
        values that differ to the serial interface
        """
        # Write if necessary (if priority array != presentValue)
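Writing display changes at priority 15 relies on how BACnet commandable objects work: each one carries a 16-slot priority array, priority 1 is the highest, and the present value is whatever sits in the highest-priority slot that is not empty, falling back to the relinquish default. Here is a toy model of that rule in plain Python. It does not use bacpypes, and the class is invented for the example.

```python
class PriorityArray:
    """Toy model of a BACnet commandable object's 16-slot priority array."""

    def __init__(self, relinquish_default):
        self.relinquish_default = relinquish_default
        self.slots = [None] * 16  # index 0 is priority 1 (highest)

    def write(self, value, priority):
        """Command a value at a priority (1-16); None relinquishes that slot."""
        if not 1 <= priority <= 16:
            raise ValueError("priority must be between 1 and 16")
        self.slots[priority - 1] = value

    @property
    def present_value(self):
        """First non-empty slot wins; otherwise fall back to the default."""
        for value in self.slots:
            if value is not None:
                return value
        return self.relinquish_default


flame_level = PriorityArray(relinquish_default=0)
flame_level.write(5, priority=15)     # change made on the touch screen
print(flame_level.present_value)      # 5
flame_level.write(8, priority=8)      # command from a BACnet operator
print(flame_level.present_value)      # 8
flame_level.write(None, priority=8)   # operator relinquishes
print(flame_level.present_value)      # 5
```

With the display sitting at priority 15, anything commanded from the BACnet side at a higher priority wins, and the display setting comes back once that command is relinquished.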
And at the end, a main file
The main file calls everything and deals with exceptions, because I don’t want the app to stop; it should just try to restart.
import time

from BAC0.tasks.DoOnce import DoOnce
from BAC0.tasks.RecurringTask import RecurringTask
from beat import led_beat
from app import App  # assumption: the application file above is saved as app.py

def main():
    # BAC0.log_level("debug")
    app = App(ip="172.16.3.246/22", deviceId=3246)
    beat_task = DoOnce(led_beat)
    process = DoOnce(app.process)
    ping = RecurringTask(app.ping_serial, name="ping", delay=300)
    app._log.debug("Starting leds blinking")
    beat_task.start()
    app._log.debug("Starting processing values task")
    process.start()
    while True:
        time.sleep(10)
        # If the processing thread died, create a new one and start it
        if not process.is_alive():
            app._log.error("Problem with process thread...trying to join")
            process.join()
            process = DoOnce(app.process)
            app._log.error("Process thread...restarting")
            process.start()

if __name__ == "__main__":
    main()
Let’s see what happens from there. As usual, there will probably be some bugs to fix…
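The watchdog idea in the main loop can be tested on its own with plain threads. This is a minimal sketch, assuming a worker that ends quickly to stand in for a crashed task. `supervise` and `short_lived_job` are hypothetical names, and the loop is bounded only so that the example terminates.

```python
import threading
import time


def supervise(target, checks, interval=0.05):
    """Run target in a thread and start a new one whenever it has died.

    checks bounds the loop so the example ends; a service would loop forever.
    Returns how many times the worker was started.
    """
    starts = 1
    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    for _ in range(checks):
        time.sleep(interval)
        if not worker.is_alive():
            worker.join()  # returns at once: the thread has already finished
            worker = threading.Thread(target=target, daemon=True)
            worker.start()
            starts += 1
    return starts


def short_lived_job():
    """Hypothetical worker that ends quickly, standing in for a crashed task."""
    time.sleep(0.01)


print(supervise(short_lived_job, checks=3) > 1)  # True
```

A Python thread object can only be started once, which is why a dead worker is replaced by a new object instead of being restarted.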
Running as a service?
[ref : https://tecadmin.net/setup-autorun-python-script-using-systemd/ ]
To run the Python script as a service (each time the Pi restarts), I used systemd. I had to create one unit file and put it under /lib/systemd/system:
sudo nano /lib/systemd/system/fireplace.service
[Unit]
Description=BACnet Gateway
After=multi-user.target
Conflicts=getty@tty1.service
[Service]
Type=simple
ExecStart=/home/pi/berryconda3/bin/python /home/pi/dev/Fireplace/main.py
StandardInput=tty-force
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
Once that’s done, reload systemd so it finds the new file, enable the service so it starts at boot, and start it.
sudo systemctl daemon-reload
sudo systemctl enable fireplace.service
sudo systemctl start fireplace.service
That’s it. Now the Python script will run as soon as the Pi is up. You can check the status of the service like any other:
sudo systemctl status fireplace.service
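When the service is stopped, systemd sends the process a SIGTERM by default. Below is a small sketch of how a script could catch it and leave its loops cleanly. This is not part of the code shown above: the event and function names are made up, and wiring it to `process_permission` is only a suggestion.

```python
import signal
import threading

# Set when systemd (or anyone else) asks the process to terminate
stop_requested = threading.Event()


def handle_sigterm(signum, frame):
    """Ask the loops to finish instead of dying in the middle of a write."""
    stop_requested.set()


def install_handler():
    """Register the handler; must be called from the main thread."""
    signal.signal(signal.SIGTERM, handle_sigterm)
```

In main(), the `while True` loop with its `time.sleep(10)` could then become `while not stop_requested.wait(10):`, followed by `app.process_permission = False` once the loop exits.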
Now, what do you get? I mean, does it work in Niagara? We’ll see that in the next post.










