Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions internal_filesystem/lib/mpos/board/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,9 @@ def adc_to_voltage(adc_value):
# LightsManager will not be initialized (functions will return False)

# === SENSOR HARDWARE ===
# Note: Desktop builds have no sensor hardware
from mpos import SensorManager

# Initialize with no I2C bus - will detect MCU temp if available
# (On Linux desktop, this will fail gracefully but set _initialized flag)
SensorManager.init(None)
SensorManager.init_iio()

# === CAMERA HARDWARE ===

Expand Down
236 changes: 206 additions & 30 deletions internal_filesystem/lib/mpos/sensor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import time
import os
try:
import _thread
_lock = _thread.allocate_lock()
Expand All @@ -28,6 +29,7 @@

# Sensor type constants (matching Android SensorManager)
TYPE_ACCELEROMETER = 1 # Units: m/s² (meters per second squared)
TYPE_MAGNETIC_FIELD = 2 # Units: μT (micro teslas)
TYPE_GYROSCOPE = 4 # Units: deg/s (degrees per second)
TYPE_TEMPERATURE = 13 # Units: °C (generic, returns first available - deprecated)
TYPE_IMU_TEMPERATURE = 14 # Units: °C (IMU chip temperature)
Expand Down Expand Up @@ -101,6 +103,7 @@ class SensorManager:

# Class-level constants
TYPE_ACCELEROMETER = TYPE_ACCELEROMETER
TYPE_MAGNETIC_FIELD = TYPE_MAGNETIC_FIELD
TYPE_GYROSCOPE = TYPE_GYROSCOPE
TYPE_TEMPERATURE = TYPE_TEMPERATURE
TYPE_IMU_TEMPERATURE = TYPE_IMU_TEMPERATURE
Expand All @@ -120,7 +123,7 @@ def get(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance

def init(self, i2c_bus, address=0x6B, mounted_position=FACING_SKY):
"""Initialize SensorManager. MCU temperature initializes immediately, IMU initializes on first use.

Expand All @@ -146,6 +149,43 @@ def init(self, i2c_bus, address=0x6B, mounted_position=FACING_SKY):

self._initialized = True
return True

def init_iio(self):
self._imu_driver = _IIODriver()
self._sensor_list = [
Sensor(
name="Accelerometer",
sensor_type=TYPE_ACCELEROMETER,
vendor="Linux IIO",
version=1,
max_range="?",
resolution="?",
power_ma=10
),
Sensor(
name="Gyroscope",
sensor_type=TYPE_GYROSCOPE,
vendor="Linux IIO",
version=1,
max_range="?",
resolution="?",
power_ma=10
),
Sensor(
name="Temperature",
sensor_type=TYPE_IMU_TEMPERATURE,
vendor="Linux IIO",
version=1,
max_range="?",
resolution="?",
power_ma=10
)
]

self._load_calibration()

self._initialized = True
return True

def _ensure_imu_initialized(self):
"""Perform IMU initialization on first use (lazy initialization).
Expand Down Expand Up @@ -227,7 +267,35 @@ def get_default_sensor(self, sensor_type):
if sensor.type == sensor_type:
return sensor
return None


def read_sensor_once(self, sensor):
if sensor.type == TYPE_ACCELEROMETER:
if self._imu_driver:
ax, ay, az = self._imu_driver.read_acceleration()
if self._mounted_position == FACING_EARTH:
az *= -1
return (ax, ay, az)
elif sensor.type == TYPE_GYROSCOPE:
if self._imu_driver:
return self._imu_driver.read_gyroscope()
elif sensor.type == TYPE_IMU_TEMPERATURE:
if self._imu_driver:
return self._imu_driver.read_temperature()
elif sensor.type == TYPE_SOC_TEMPERATURE:
if self._has_mcu_temperature:
import esp32
return esp32.mcu_temperature()
elif sensor.type == TYPE_TEMPERATURE:
# Generic temperature - return first available (backward compatibility)
if self._imu_driver:
temp = self._imu_driver.read_temperature()
if temp is not None:
return temp
if self._has_mcu_temperature:
import esp32
return esp32.mcu_temperature()
return None

def read_sensor(self, sensor):
"""Read sensor data synchronously.

Expand Down Expand Up @@ -258,32 +326,7 @@ def read_sensor(self, sensor):

for attempt in range(max_retries):
try:
if sensor.type == TYPE_ACCELEROMETER:
if self._imu_driver:
ax, ay, az = self._imu_driver.read_acceleration()
if self._mounted_position == FACING_EARTH:
az *= -1
return (ax, ay, az)
elif sensor.type == TYPE_GYROSCOPE:
if self._imu_driver:
return self._imu_driver.read_gyroscope()
elif sensor.type == TYPE_IMU_TEMPERATURE:
if self._imu_driver:
return self._imu_driver.read_temperature()
elif sensor.type == TYPE_SOC_TEMPERATURE:
if self._has_mcu_temperature:
import esp32
return esp32.mcu_temperature()
elif sensor.type == TYPE_TEMPERATURE:
# Generic temperature - return first available (backward compatibility)
if self._imu_driver:
temp = self._imu_driver.read_temperature()
if temp is not None:
return temp
if self._has_mcu_temperature:
import esp32
return esp32.mcu_temperature()
return None
return self.read_sensor_once(sensor)
except Exception as e:
error_msg = str(e)
# Retry if sensor data not ready, otherwise fail immediately
Expand All @@ -292,6 +335,7 @@ def read_sensor(self, sensor):
time.sleep_ms(retry_delay_ms)
continue
else:
print("Exception reading sensor:", error_msg)
return None

return None
Expand Down Expand Up @@ -717,6 +761,138 @@ def set_calibration(self, accel_offsets, gyro_offsets):
raise NotImplementedError


class _IIODriver(_IMUDriver):
"""
Read sensor data via Linux IIO sysfs.

Typical base path:
/sys/bus/iio/devices/iio:device0
"""
accel_path: str

def __init__(self):
self.accel_path = self.find_iio_device_with_file("in_accel_x_raw")
print("path:", self.accel_path)

def _p(self, name: str):
return self.accel_path + "/" + name

def _exists(self, name):
try:
os.stat(name)
return True
except OSError:
return False

def _is_dir(self, path):
# MicroPython: stat tuple, mode is [0]
try:
st = os.stat(path)
mode = st[0]
# directory bit (POSIX): 0o040000
return (mode & 0o170000) == 0o040000
except OSError:
return False

def find_iio_device_with_file(self, filename, base_dir="/sys/bus/iio/devices/"):
"""
Returns full path to iio:deviceX that contains given filename,
e.g. "/sys/bus/iio/devices/iio:device0"

Returns None if not found.
"""

print("Is dir? ", self._is_dir(base_dir), base_dir)
try:
entries = os.listdir(base_dir)
except OSError:
print("Error listing dir")
return None

for e in entries:
print("Entry:", e)
if not e.startswith("iio:device"):
continue

print("Entry:", e)

dev_path = base_dir + "/" + e
if not self._is_dir(dev_path):
continue

if self._exists(dev_path + "/" + filename):
return dev_path

return None

def _read_text(self, name: str) -> str:
p = name
print("Read: ", p)
f = open(p, "r")
try:
return f.readline().strip()
finally:
f.close()

def _read_float(self, name: str) -> float:
return float(self._read_text(name))

def _read_int(self, name: str) -> int:
return int(self._read_text(name), 10)

def _read_raw_scaled(self, raw_name: str, scale_name: str) -> float:
raw = self._read_int(raw_name)
scale = self._read_float(scale_name)
return raw * scale

# ----------------------------
# Public API (replacing I2C)
# ----------------------------

def read_temperature(self) -> float:
"""
Tries common IIO patterns:
- in_temp_input (already scaled, usually millidegree C)
- in_temp_raw + in_temp_scale
"""
if False: # os.path.exists(self._p("in_temp_input")):
v = self._read_float(self.accel_path + "/" + "in_temp_input")
# Many drivers expose millidegree Celsius here.
if abs(v) > 200: # heuristic: 25000 means 25°C
return v / 1000.0
return v

# Fallback: raw + scale
return self._read_raw_scaled(self.accel_path + "/" + "in_temp_raw", self.accel_path + "/" + "in_temp_scale")

def read_acceleration(self) -> tuple[float, float, float]:
"""
Returns acceleration in m/s^2 if the kernel driver uses standard IIO scale.
Common names:
in_accel_{x,y,z}_raw + in_accel_scale
"""
scale_name = self.accel_path + "/" + "in_accel_scale"

ax = self._read_raw_scaled(self.accel_path + "/" + "in_accel_x_raw", scale_name)
ay = self._read_raw_scaled(self.accel_path + "/" + "in_accel_y_raw", scale_name)
az = self._read_raw_scaled(self.accel_path + "/" + "in_accel_z_raw", scale_name)

return (ax, ay, az)

def read_gyroscope(self) -> tuple[float, float, float]:
"""
Returns angular velocity in rad/s if the kernel driver uses standard IIO scale.
Common names:
in_anglvel_{x,y,z}_raw + in_anglvel_scale
"""
scale_name = self.accel_path + "/" + "in_anglvel_scale"

gx = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_x_raw", scale_name)
gy = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_y_raw", scale_name)
gz = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_z_raw", scale_name)

return (gx, gy, gz)

class _QMI8658Driver(_IMUDriver):
"""Wrapper for QMI8658 IMU (Waveshare board)."""

Expand Down Expand Up @@ -918,8 +1094,8 @@ def set_calibration(self, accel_offsets, gyro_offsets):

_original_methods = {}
_methods_to_delegate = [
'init', 'is_available', 'get_sensor_list', 'get_default_sensor',
'read_sensor', 'calibrate_sensor', 'check_calibration_quality',
'init', 'init_iio', 'is_available', 'get_sensor_list', 'get_default_sensor',
'read_sensor', 'read_sensor_once', 'calibrate_sensor', 'check_calibration_quality',
'check_stationarity'
]

Expand Down
Loading