Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"icon_url": "https://apps.micropythonos.com/apps/com.micropythonos.scan_bluetooth/icons/com.micropythonos.scan_bluetooth_0.0.1_64x64.png",
"download_url": "https://apps.micropythonos.com/apps/com.micropythonos.scan_bluetooth/mpks/com.micropythonos.scan_bluetooth_0.0.1.mpk",
"fullname": "com.micropythonos.scan_bluetooth",
"version": "0.0.1",
"version": "0.1.0",
"category": "development",
"activities": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,68 +3,45 @@
https://docs.micropython.org/en/latest/library/bluetooth.html
"""

import time

try:
import bluetooth
except ImportError: # Linux test runner may not provide bluetooth module
bluetooth = None

import sys

import lvgl as lv
from micropython import const
from mpos import Activity
from mpos import Activity, TaskManager

SCAN_DURATION = const(1000) # Duration of each BLE scan in milliseconds
_IRQ_SCAN_RESULT = const(5)
# Scan for 5 seconds,
SCAN_DURATION_MS = const(5000) # Duration of each BLE scan in milliseconds
# with very low interval/window (to maximize detection rate):
INTERVAL_US = const(30000)
WINDOW_US = const(30000)

_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)

# BLE Advertising Data Types (Standardized by Bluetooth SIG)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_SHORT_NAME = const(8)
_ADV_TYPE_NAME = const(9)


def decode_field(payload: bytes, adv_type: int) -> list:
results = []
def decode_name(payload: bytes) -> str | None:
i = 0
payload_len = len(payload)
while i < payload_len:
length = payload[i]
if length == 0 or i + length >= payload_len:
break
field_type = payload[i + 1]
if field_type == adv_type:
results.append(payload[i + 2 : i + length + 1])
if field_type in (_ADV_TYPE_SHORT_NAME, _ADV_TYPE_NAME):
if new_name := payload[i + 2 : i + length + 1]:
return str(new_name, "utf-8")
else:
print(f"Unsupported: {field_type=} with {length=}")
i += length + 1
return results


class BluetoothScanner:
def __init__(self, device_callback):
if bluetooth is None:
raise RuntimeError("Bluetooth module not available")
self.device_callback = device_callback
self.ble = bluetooth.BLE()
self.ble.irq(self.ble_irq_handler)

def __enter__(self):
print("Activating BLE")
self.ble.active(True)
return self

def ble_irq_handler(self, event: int, data: tuple) -> None:
if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data
addr = ":".join(f"{b:02x}" for b in addr)
names = decode_field(adv_data, _ADV_TYPE_NAME)
name = str(names[0], "utf-8") if names else "Unknown"
self.device_callback(addr, rssi, name)

def scan(self, duration_ms: int):
print(f"BLE scanning for {duration_ms}ms...")
self.ble.gap_scan(duration_ms, 20000, 10000)

def __exit__(self, exc_type, exc_val, exc_tb):
print("Deactivating BLE")
self.ble.active(False)


def set_dynamic_column_widths(table, font=None, padding=8):
Expand All @@ -85,75 +62,120 @@ def set_cell_value(table, *, row: int, values: tuple):


class ScanBluetooth(Activity):
refresh_timer = None

def onCreate(self):
screen = lv.obj()
screen.set_flex_flow(lv.FLEX_FLOW.COLUMN)
screen.set_style_pad_all(0, 0)
screen.set_size(lv.pct(100), lv.pct(100))
main_content = lv.obj()
main_content.set_flex_flow(lv.FLEX_FLOW.COLUMN)
main_content.set_style_pad_all(0, 0)
main_content.set_size(lv.pct(100), lv.pct(100))

info_column = lv.obj(main_content)
info_column.set_flex_flow(lv.FLEX_FLOW.COLUMN)
info_column.set_style_pad_all(1, 1)
info_column.set_size(lv.pct(100), lv.SIZE_CONTENT)

self.info_label = lv.label(info_column)
self.info_label.set_style_text_font(lv.font_montserrat_14, 0)

if bluetooth is None:
label = lv.label(screen)
label.set_text("Bluetooth not available on this platform")
label.center()
self.setContentView(screen)
self.info("Bluetooth not available on this platform")
self.setContentView(main_content)
return

self.table = lv.table(screen)
tabel_column = lv.obj(main_content)
tabel_column.set_flex_flow(lv.FLEX_FLOW.COLUMN)
tabel_column.set_style_pad_all(0, 0)
tabel_column.set_size(lv.pct(100), lv.SIZE_CONTENT)

self.table = lv.table(tabel_column)
set_cell_value(
self.table,
row=0,
values=("pos", "MAC", "RSSI", "count", "Name"),
)
set_dynamic_column_widths(self.table)

self.scan_count = 0
self.mac2column = {}
self.mac2counts = {}
self.mac2name = {}

self.scanner_cm = BluetoothScanner(device_callback=self.scan_callback)
self.scanner = self.scanner_cm.__enter__() # Activate BLE
self.ble = bluetooth.BLE()

self.setContentView(screen)
self.setContentView(main_content)

def scan_callback(self, addr, rssi, name):
if not (column_index := self.mac2column.get(addr)):
column_index = len(self.mac2column) + 1
self.mac2column[addr] = column_index
self.mac2counts[addr] = 1
else:
self.mac2counts[addr] += 1
def info(self, text):
print(text)
self.info_label.set_text(text)

set_cell_value(
self.table,
row=column_index,
values=(
str(column_index),
addr,
f"{rssi} dBm",
str(self.mac2counts[addr]),
name,
),
)
async def ble_scan(self):
"""Check sensor every second"""
while self.scanning:
print(f"async scan for {SCAN_DURATION_MS}ms...")
self.ble.gap_scan(SCAN_DURATION_MS, INTERVAL_US, WINDOW_US, True)
await TaskManager.sleep_ms(SCAN_DURATION_MS + 100)

def onResume(self, screen):
super().onResume(screen)
if bluetooth is None:
return

def update(timer):
self.scanner.scan(SCAN_DURATION)
set_dynamic_column_widths(self.table)
time.sleep_ms(SCAN_DURATION + 100) # Wait ?
print(f"Scan complete: {len(self.mac2column)} unique devices")
self.info("Activating Bluetooth...")
self.ble.irq(self.ble_irq_handler)
self.ble.active(True)

self.refresh_timer = lv.timer_create(update, SCAN_DURATION + 1000, None)
self.scanning = True
TaskManager.create_task(self.ble_scan())

def onPause(self, screen):
super().onPause(screen)
if bluetooth is None:
return
self.scanner.__exit__(None, None, None) # Deactivate BLE
if self.refresh_timer:
self.refresh_timer.delete()
self.refresh_timer = None

self.scanning = False

self.info("Stop scanning...")
self.ble.gap_scan(None)
self.info("Deactivating BLE...")
self.ble.active(False)
self.info("BLE deactivated")

def ble_irq_handler(self, event: int, data: tuple) -> None:
try:
if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data
addr = ":".join(f"{b:02x}" for b in addr)
print(f"{addr=} {rssi=} {len(adv_data)=}")
if name := decode_name(adv_data):
self.mac2name[addr] = name
else:
name = self.mac2name.get(addr, "Unknown")

if not (column_index := self.mac2column.get(addr)):
column_index = len(self.mac2column) + 1
self.mac2column[addr] = column_index
self.mac2counts[addr] = 1
else:
self.mac2counts[addr] += 1

set_cell_value(
self.table,
row=column_index,
values=(
str(column_index),
addr,
f"{rssi} dBm",
str(self.mac2counts[addr]),
name,
),
)
elif event == _IRQ_SCAN_DONE:
set_dynamic_column_widths(self.table)
self.scan_count += 1
self.info(
f"{len(self.mac2column)} unique devices (Scan {self.scan_count})"
)
else:
print(f"Ignored BLE {event=}")
except Exception as e:
sys.print_exception(e)
print(f"Error in BLE IRQ handler {event=}: {e}")
Loading