Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "ESPNowChat",
"publisher": "MicroPythonOS",
"short_description": "ESPNow Chat",
"long_description": "Simple chat app using EspNow protocol for communication between devices.",
"icon_url": "https://apps.micropythonos.com/apps/com.micropythonos.espnow_chat/icons/com.micropythonos.espnow_chat_0.0.1_64x64.png",
"download_url": "https://apps.micropythonos.com/apps/com.micropythonos.espnow_chat/mpks/com.micropythonos.espnow_chat_0.0.1.mpk",
"fullname": "com.micropythonos.espnow_chat",
"version": "0.1.0",
"category": "development",
"activities": [
{
"entrypoint": "assets/espnow_chat.py",
"classname": "EspNowChat",
"intent_filters": [
{
"action": "main",
"category": "launcher"
}
]
}
]
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
https://docs.micropython.org/en/latest/library/espnow.html
"""

from collections import deque

import lvgl as lv
import machine
from micropython import const
from mpos import Activity, MposKeyboard, TaskManager
from mpos.time import localtime

try:
import aioespnow
except ImportError:
aioespnow = None

try:
import network
except ImportError:
network = None

BROADCAST_MAC = const(b"\xbb\xbb\xbb\xbb\xbb\xbb")


def pformat_mac(mac):
if mac:
return ":".join(f"{b:02x}" for b in mac)
else:
return "<no mac>"


class EspNowChat(Activity):
def onCreate(self):
main_content = lv.obj()
main_content.set_flex_flow(lv.FLEX_FLOW.COLUMN)
main_content.set_style_pad_gap(10, 0)

self.input_textarea = lv.textarea(main_content)
self.input_textarea.set_placeholder_text("Message input...")
self.input_textarea.set_one_line(True)
self.input_textarea.set_style_text_font(lv.font_montserrat_16, lv.PART.MAIN)
self.input_textarea.set_width(lv.pct(100))
self.input_textarea.add_event_cb(self.show_keyboard, lv.EVENT.CLICKED, None)

self.keyboard = MposKeyboard(main_content)
self.keyboard.set_textarea(self.input_textarea)
self.keyboard.add_event_cb(self.keyboard_cb, lv.EVENT.READY, None)
self.keyboard.add_flag(lv.obj.FLAG.HIDDEN)

self.messages = lv.label(main_content)
self.messages.set_style_text_font(lv.font_montserrat_14, 0)

# Buffer to store and display the latest 20 messages:
self.messages_buffer = deque((), 20)

self.setContentView(main_content)

if aioespnow and network:
print("Initialize WLAN interface...")
sta = network.WLAN(network.WLAN.IF_STA)
sta.active(True)

self.own_id = pformat_mac(machine.unique_id())

self.info("Initialize ESPNow...")
self.espnow = aioespnow.AIOESPNow()
self.espnow.active(True)
self.espnow.add_peer(BROADCAST_MAC)

if sta.isconnected():
self.info(f"Connected to WiFi: {sta.config('essid')}")
self.info(f"Use WiFi Channel: {sta.config('channel')}")
else:
self.own_id = "<no espnow>"
self.info("ESPNow not available on this platform")

def info(self, text):
now = localtime()
hour, minute, second = now[3], now[4], now[5]
message = f"{hour:02}:{minute:02}:{second:02} {text}"
print(message)
self.messages_buffer.appendleft(message)
self.messages.set_text("\n".join(self.messages_buffer))

def keyboard_cb(self, event):
message = self.input_textarea.get_text()
if not message:
print("Ignore empty input")
else:
self.input_textarea.set_text("")
print(f"Create task to send {message=}...")
TaskManager.create_task(self.send_messages(message))

def show_keyboard(self, event):
print("Show keyboard")
self.keyboard.remove_flag(lv.obj.FLAG.HIDDEN)

async def send_messages(self, message):
self.info(f"Sending: {message} ({self.own_id})")
try:
await self.espnow.asend(BROADCAST_MAC, message.encode())
except OSError as err:
print(f"Error sending message: {err}")
else:
print(f"{message=} sent")

async def receive_messages(self):
await self.send_messages(f"{self.own_id} joins ESPNow chat.")
async for mac, msg in self.espnow:
if not msg:
print("Ignore empty message from", pformat_mac(mac))
continue
try:
msg = msg.decode()
except UnicodeError as err:
msg = f"<invalid message: {err}>"
self.info(f"{msg} ({pformat_mac(mac)})")
raise RuntimeError("ESPNow receive loop exited, which shouldn't happen")

def onResume(self, screen):
super().onResume(screen)
if aioespnow and network:
TaskManager.create_task(self.receive_messages())

def onPause(self, screen):
if aioespnow and network:
self.espnow.send(
BROADCAST_MAC, f"{self.own_id} leaves ESPNow chat.".encode()
)

print("Stop ESPNow...")
self.espnow.active(False)
print("ESPNow deactivated")

super().onPause(screen)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading