diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index fa3cf1311..223668474 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -37,6 +37,7 @@ except ImportError as e: have_test = False +import meshtastic.file_transfer_cli as file_transfer_cli import meshtastic.ota import meshtastic.util import meshtastic.serial_interface @@ -443,6 +444,116 @@ def onConnected(interface): # Must turn off encryption on primary channel interface.getNode(args.dest, **getNode_kwargs).turnOffEncryptionOnPrimaryChannel() + if args.ls is not None: + closeNow = True + remote_dir = args.ls + depth = int(getattr(args, "ls_depth", 0) or 0) + node = interface.localNode + rows = node.listDir(remote_dir, depth=depth) + if rows is None: + meshtastic.util.our_exit("listDir failed", 1) + for path, sz in rows: + print(f"{sz}\t{path}") + + if args.upload is not None: + closeNow = True + node = interface.localNode + local_tokens = list(args.upload[:-1]) + remote_base = args.upload[-1] + try: + upload_pairs = file_transfer_cli.plan_upload(local_tokens, remote_base) + except file_transfer_cli.FileTransferCliError as e: + meshtastic.util.our_exit(str(e), 1) + + def _upload_progress(sent, total): + pct = 100 * sent // total if total else 0 + bar = '#' * (pct // 5) + '.' * (20 - pct // 5) + print(f"\r [{bar}] {pct}%", end="", flush=True) + + for i, (lp, devp) in enumerate(upload_pairs, start=1): + print(f"Uploading ({i}/{len(upload_pairs)}) {lp} → {devp}") + ok = node.uploadFile(lp, devp, on_progress=_upload_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {lp} → {devp} ") + if not ok: + meshtastic.util.our_exit("Upload failed", 1) + + if args.download is not None: + closeNow = True + node = interface.localNode + rpath, lpath = args.download + lpath_abs = os.path.abspath(os.path.expanduser(lpath)) + if os.path.isdir(lpath_abs): + meshtastic.util.our_exit( + "ERROR: --download LOCAL must be a file path, not a directory (use --download-tree or --download-glob).", + 1, + ) + parent = os.path.dirname(lpath_abs) + if parent: + os.makedirs(parent, exist_ok=True) + print(f"Downloading {rpath} → {lpath}") + def _download_progress(received, _total): + print(f"\r {received} bytes received...", end="", flush=True) + ok = node.downloadFile(rpath, lpath_abs, on_progress=_download_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {rpath} → {lpath_abs} ") + if not ok: + meshtastic.util.our_exit("Download failed", 1) + + if args.download_tree is not None: + closeNow = True + node = interface.localNode + rdir, ldir = args.download_tree + depth = 255 + rows = node.listDir(rdir.rstrip("/") or "/", depth=depth) + if rows is None: + meshtastic.util.our_exit("listDir failed", 1) + try: + tree_pairs = file_transfer_cli.plan_download_tree(rdir, ldir, rows) + except file_transfer_cli.FileTransferCliError as e: + meshtastic.util.our_exit(str(e), 1) + if not tree_pairs: + meshtastic.util.our_exit("No files matched for --download-tree", 1) + + def _download_progress(received, _total): + print(f"\r {received} bytes received...", end="", flush=True) + + for i, (devp, locp) in enumerate(tree_pairs, start=1): + os.makedirs(os.path.dirname(locp) or ".", exist_ok=True) + print(f"Downloading ({i}/{len(tree_pairs)}) {devp} → {locp}") + ok = node.downloadFile(devp, locp, on_progress=_download_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {devp} → {locp} ") + if not ok: + meshtastic.util.our_exit("Download failed", 1) + + if args.download_glob is not None: + closeNow = True + node = interface.localNode + pattern, ldir = args.download_glob + try: + base, _rel = file_transfer_cli.split_remote_glob_pattern(pattern) + except file_transfer_cli.FileTransferCliError as e: + meshtastic.util.our_exit(str(e), 1) + depth = 255 + rows = node.listDir(base, depth=depth) + if rows is None: + meshtastic.util.our_exit("listDir failed", 1) + try: + glob_pairs = file_transfer_cli.plan_download_glob(pattern, ldir, rows) + except file_transfer_cli.FileTransferCliError as e: + meshtastic.util.our_exit(str(e), 1) + if not glob_pairs: + meshtastic.util.our_exit("No files matched for --download-glob", 1) + + def _download_progress(received, _total): + print(f"\r {received} bytes received...", end="", flush=True) + + for i, (devp, locp) in enumerate(glob_pairs, start=1): + os.makedirs(os.path.dirname(locp) or ".", exist_ok=True) + print(f"Downloading ({i}/{len(glob_pairs)}) {devp} → {locp}") + ok = node.downloadFile(devp, locp, on_progress=_download_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {devp} → {locp} ") + if not ok: + meshtastic.util.our_exit("Download failed", 1) + if args.reboot: closeNow = True waitForAckNak = True @@ -1876,6 +1987,74 @@ def addLocalActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars default=None ) + xfer = group.add_mutually_exclusive_group() + xfer.add_argument( + "--upload", + help=( + "Upload local files to the device via XModem (requires matching firmware). " + "Usage: --upload LOCAL [LOCAL ...] REMOTE. " + "Last argument is the device path: for a single plain file LOCAL, REMOTE is the exact destination file path; " + "otherwise REMOTE is a directory prefix and relative paths are preserved. " + "LOCAL may be files, directories (recursive), or globs (quote patterns with **). " + "Each device path must be <= 128 UTF-8 bytes." + ), + nargs="+", + metavar="SPEC", + default=None, + ) + xfer.add_argument( + "--download", + help=( + "Download one file from the device. " + "Usage: --download REMOTE_FILE LOCAL_FILE. " + "For a directory tree use --download-tree; for remote globs use --download-glob." + ), + nargs=2, + metavar=("REMOTE", "LOCAL"), + default=None, + ) + xfer.add_argument( + "--download-tree", + help=( + "Download a full remote directory tree via MFLIST + XModem. " + "Usage: --download-tree REMOTE_DIR LOCAL_DIR. " + "Only rows with size > 0 are treated as files." + ), + nargs=2, + metavar=("REMOTE_DIR", "LOCAL_DIR"), + default=None, + ) + xfer.add_argument( + "--download-glob", + help=( + "Download remote files matching a glob (MFLIST at the literal base + filter). " + "Usage: --download-glob 'REMOTE_PATTERN' LOCAL_DIR. " + "Pattern must include * ? or [; ** matches across / (relative to the literal base)." + ), + nargs=2, + metavar=("REMOTE_PATTERN", "LOCAL_DIR"), + default=None, + ) + + group.add_argument( + "--ls", + help=( + "List files on the device under REMOTE_DIR via XMODEM MFLIST (requires matching firmware). " + "Output: size_bytespath (one per line)." + ), + nargs="?", + const="/", + default=None, + metavar="REMOTE_DIR", + ) + + group.add_argument( + "--ls-depth", + help="Max directory depth for --ls (0 = files in REMOTE_DIR only).", + type=int, + default=0, + ) + return parser def addRemoteActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: diff --git a/meshtastic/file_transfer_cli.py b/meshtastic/file_transfer_cli.py new file mode 100644 index 000000000..cb8730453 --- /dev/null +++ b/meshtastic/file_transfer_cli.py @@ -0,0 +1,275 @@ +"""CLI helpers for XModem upload/download (multi-file, globs, listDir-based downloads).""" + +from __future__ import annotations + +import glob +import os +import posixpath +import re +from typing import Iterable, List, Optional, Sequence, Tuple + +# Must match meshtastic.node.MeshInterface._XMODEM_BUFFER_MAX (path in first packet). +XMODEM_DEVICE_PATH_UTF8_MAX = 128 + + +def device_posix_join(base: str, *parts: str) -> str: + """Join device path segments with forward slashes; collapse duplicate slashes.""" + segs: List[str] = [] + for p in (base,) + parts: + if not p: + continue + for seg in p.replace("\\", "/").split("/"): + if seg == "" or seg == ".": + continue + if seg == "..": + if segs: + segs.pop() + continue + segs.append(seg) + return "/" + "/".join(segs) if segs else "/" + + +def device_path_utf8_len(path: str) -> int: + return len(path.encode("utf-8")) + + +def check_device_paths(paths: Iterable[str]) -> Optional[str]: + """Return first error message if any path exceeds XModem limit, else None.""" + for p in paths: + n = device_path_utf8_len(p) + if n > XMODEM_DEVICE_PATH_UTF8_MAX: + return ( + f"Device path exceeds {XMODEM_DEVICE_PATH_UTF8_MAX} UTF-8 bytes ({n}): {p!r}" + ) + return None + + +def _first_glob_magic_index(s: str) -> int: + for i, c in enumerate(s): + if c in "*?[": + return i + return -1 + + +def _expand_local_token(token: str) -> Tuple[List[str], str]: + """ + Expand one local CLI token to absolute file paths and a strip_prefix for relpath. + + Returns (sorted_unique_files, strip_prefix). + """ + exp = os.path.expanduser(token) + idx = _first_glob_magic_index(exp) + + if idx >= 0: + raw = sorted(set(glob.glob(exp, recursive=True))) + files = [os.path.normpath(os.path.abspath(p)) for p in raw if os.path.isfile(p)] + if not files: + raise FileTransferCliError(f"Glob matched no files: {token!r}") + literal = exp[:idx] + if not literal.strip(): + anchor = os.path.abspath(".") + else: + anchor = os.path.normpath(os.path.abspath(literal)) + return files, anchor + + if os.path.isfile(exp): + p = os.path.normpath(os.path.abspath(exp)) + return [p], os.path.dirname(p) + + if os.path.isdir(exp): + root = os.path.normpath(os.path.abspath(exp)) + out: List[str] = [] + for dirpath, _dirnames, filenames in os.walk(root): + for name in filenames: + fp = os.path.join(dirpath, name) + if os.path.isfile(fp): + out.append(os.path.normpath(os.path.abspath(fp))) + return sorted(set(out)), root + + raise FileTransferCliError(f"Not a file, directory, or glob: {token!r}") + + +class FileTransferCliError(Exception): + pass + + +def plan_upload(local_tokens: Sequence[str], remote_base: str) -> List[Tuple[str, str]]: + """ + Build (local_abs_path, device_path) for each upload. + + Rules: + - Exactly one local path token that is a plain file, and expansion yields one file: + device path is ``remote_base`` as given. + - Otherwise: device path is device_posix_join(remote_base, relpath) where relpath uses + os.path.relpath(local_file, strip_prefix) with forward slashes. + """ + if not local_tokens: + raise FileTransferCliError("--upload requires at least LOCAL and REMOTE") + remote_base = remote_base.replace("\\", "/") + if remote_base and not remote_base.startswith("/"): + remote_base = "/" + remote_base.lstrip("/") + + single_token_plain_file = ( + len(local_tokens) == 1 + and _first_glob_magic_index(local_tokens[0]) < 0 + and os.path.isfile(os.path.expanduser(local_tokens[0])) + ) + + entries: List[Tuple[str, str]] = [] + for tok in local_tokens: + files, strip = _expand_local_token(tok) + for f in files: + entries.append((f, strip)) + + if not entries: + raise FileTransferCliError("No files to upload") + + seen: set = set() + deduped: List[Tuple[str, str]] = [] + for f, strip in entries: + if f in seen: + continue + seen.add(f) + deduped.append((f, strip)) + + use_exact_remote = len(deduped) == 1 and single_token_plain_file + out: List[Tuple[str, str]] = [] + for local_path, strip_prefix in deduped: + if use_exact_remote: + dev = remote_base + else: + rel = os.path.relpath(local_path, strip_prefix) + rel_posix = rel.replace(os.sep, "/") + dev = device_posix_join(remote_base, rel_posix) + out.append((local_path, dev)) + + err = check_device_paths(dev for _loc, dev in out) + if err: + raise FileTransferCliError(err) + return out + + +def split_remote_glob_pattern(pattern: str) -> Tuple[str, str]: + """ + Split ``pattern`` into (list_dir_base, relative_glob) for MFLIST + filtering. + + ``list_dir_base`` is the longest leading substring with no glob metacharacters, + normalized to a POSIX path without trailing slash (except root ``/``). + ``relative_glob`` is the remainder (may include ``**``). + """ + pattern = pattern.replace("\\", "/") + idx = _first_glob_magic_index(pattern) + if idx < 0: + raise FileTransferCliError( + "--download-glob pattern must contain at least one of * ? [" + ) + if idx == 0: + base = "/" + rel = pattern.lstrip("/") + else: + literal = pattern[:idx] + rel = pattern[idx:].lstrip("/") + if not rel: + raise FileTransferCliError("Invalid --download-glob pattern") + base = posixpath.normpath(literal.rstrip("/") or "/") + if not base.startswith("/"): + base = "/" + base + return base, rel + + +def remote_rel_glob_to_regex(rel_glob: str) -> re.Pattern[str]: + """ + Match a path relative to list base, using ``/`` separators. + ``**`` matches across directories; ``*`` and ``?`` do not cross ``/``. + """ + rel_glob = rel_glob.replace("\\", "/") + out: List[str] = ["\\A"] + i = 0 + while i < len(rel_glob): + if rel_glob[i : i + 2] == "**": + if i + 2 < len(rel_glob) and rel_glob[i + 2] == "/": + out.append("(?:.*/)?") + i += 3 + else: + out.append(".*") + i += 2 + elif rel_glob[i] == "*": + out.append("[^/]*") + i += 1 + elif rel_glob[i] == "?": + out.append("[^/]") + i += 1 + elif rel_glob[i] in r".^$+{}[]|()\\": + out.append(re.escape(rel_glob[i])) + i += 1 + else: + out.append(re.escape(rel_glob[i])) + i += 1 + out.append("\\Z") + return re.compile("".join(out)) + + +def plan_download_tree( + remote_dir: str, local_dir: str, rows: Sequence[Tuple[str, int]] +) -> List[Tuple[str, str]]: + """From listDir rows, build (device_path, local_abs_path) for every file.""" + remote_dir = remote_dir.rstrip("/").replace("\\", "/") + if not remote_dir: + remote_dir = "/" + if not remote_dir.startswith("/"): + remote_dir = "/" + remote_dir + + local_root = os.path.abspath(os.path.expanduser(local_dir)) + out: List[Tuple[str, str]] = [] + + for path, sz in rows: + if sz <= 0: + continue + path = path.replace("\\", "/") + if not path.startswith(remote_dir): + continue + tail = path[len(remote_dir) :].lstrip("/") + if not tail: + continue + local_path = os.path.join(local_root, *tail.split("/")) + out.append((path, os.path.normpath(local_path))) + + err = check_device_paths(dev for dev, _l in out) + if err: + raise FileTransferCliError(err) + return out + + +def plan_download_glob( + pattern: str, local_dir: str, rows: Sequence[Tuple[str, int]] +) -> List[Tuple[str, str]]: + base, rel_pat = split_remote_glob_pattern(pattern) + rx = remote_rel_glob_to_regex(rel_pat) + base_n = base.rstrip("/") or "/" + + local_root = os.path.abspath(os.path.expanduser(local_dir)) + out: List[Tuple[str, str]] = [] + + for path, sz in rows: + if sz <= 0: + continue + path = path.replace("\\", "/") + if base_n == "/": + if not path.startswith("/"): + continue + rel = path.lstrip("/") + else: + if not (path == base_n or path.startswith(base_n + "/")): + continue + rel = path[len(base_n) :].lstrip("/") + if not rel: + continue + if not rx.match(rel): + continue + local_path = os.path.join(local_root, *rel.split("/")) + out.append((path, os.path.normpath(local_path))) + + err = check_device_paths(dev for dev, _l in out) + if err: + raise FileTransferCliError(err) + return out diff --git a/meshtastic/node.py b/meshtastic/node.py index 66b6312ff..3bda70fba 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -3,11 +3,12 @@ import base64 import logging +import threading import time from typing import Optional, Union, List -from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, config_pb2, localonly_pb2, mesh_pb2, portnums_pb2 +from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, config_pb2, localonly_pb2, mesh_pb2, portnums_pb2, xmodem_pb2 from meshtastic.util import ( Timeout, camel_to_snake, @@ -1076,3 +1077,319 @@ def get_channels_with_hash(self): "hash": hash_val, }) return result + + # ── XModem file transfer ─────────────────────────────────────────────────── + + _XMODEM_BUFFER_MAX = 128 # meshtastic_XModem_buffer_t::bytes + _XMODEM_MAX_RETRY = 10 + _XMODEM_TIMEOUT_S = 5.0 + _MFLIST_PREFIX = "MFLIST " + + @staticmethod + def _xmodem_crc16(data: bytes) -> int: + """CRC-16-CCITT matching the firmware's XModemAdapter::crc16_ccitt.""" + crc = 0 + for b in data: + crc = ((crc >> 8) | (crc << 8)) & 0xFFFF + crc ^= b + crc ^= ((crc & 0xFF) >> 4) & 0xFFFF + crc ^= ((crc << 8) << 4) & 0xFFFF + crc ^= (((crc & 0xFF) << 4) << 1) & 0xFFFF + return crc & 0xFFFF + + def _xmodem_send(self, xm: xmodem_pb2.XModem) -> None: + """Wrap an XModem protobuf in ToRadio and send to the device.""" + tr = mesh_pb2.ToRadio() + tr.xmodemPacket.CopyFrom(xm) + self.iface._sendToRadio(tr) + + def _xmodem_roundtrip(self, xm: xmodem_pb2.XModem, + timeout_s: float = _XMODEM_TIMEOUT_S) -> Optional[xmodem_pb2.XModem]: + """Subscribe to xmodempacket, send, then wait for response (subscribe-first to avoid race).""" + from pubsub import pub # type: ignore[import-untyped] + event = threading.Event() + result: list = [None] + + def _on_xmodem(packet, interface): + result[0] = packet + event.set() + + # Subscribe BEFORE sending so we don't miss a fast response + pub.subscribe(_on_xmodem, "meshtastic.xmodempacket") + try: + self._xmodem_send(xm) + event.wait(timeout=timeout_s) + finally: + try: + pub.unsubscribe(_on_xmodem, "meshtastic.xmodempacket") + except Exception: + pass + + return result[0] + + def uploadFile(self, local_path: str, device_path: str, + on_progress=None, timeout_s: float = _XMODEM_TIMEOUT_S) -> bool: + """Upload a local file to the device via XModem. + + Args: + local_path: Path to the local file to upload. + device_path: Absolute path on the device filesystem (what the firmware + resolves for XModem open/write). + on_progress: Optional callback ``fn(bytes_sent, total_bytes)``. + timeout_s: Per-packet ACK timeout in seconds. + + Returns: + True on success, False on failure. + + Example:: + + iface.localNode.uploadFile("wordle.bin", "/bbs/kb/wordle.bin") + """ + if self.noProto: + logger.warning("uploadFile: protocol disabled (noProto)") + return False + + try: + data = open(local_path, "rb").read() + except OSError as e: + logger.error(f"uploadFile: cannot read {local_path}: {e}") + return False + + XC = xmodem_pb2.XModem + + # SOH seq=0 — filename handshake + xm = xmodem_pb2.XModem() + xm.control = XC.SOH + xm.seq = 0 + xm.buffer = device_path.encode("utf-8")[: self._XMODEM_BUFFER_MAX] + + for attempt in range(self._XMODEM_MAX_RETRY): + resp = self._xmodem_roundtrip(xm, timeout_s) + logger.debug(f"uploadFile: OPEN attempt {attempt+1} resp={resp.control if resp else None}") + if resp and resp.control == XC.ACK: + break + if attempt == self._XMODEM_MAX_RETRY - 1: + logger.error(f"uploadFile: OPEN rejected for {device_path}") + return False + + # STX data packets + seq = 1 + offset = 0 + total = len(data) + while offset < total: + chunk = data[offset: offset + self._XMODEM_BUFFER_MAX] + crc = self._xmodem_crc16(chunk) + + xm = xmodem_pb2.XModem() + xm.control = XC.STX + xm.seq = seq + xm.crc16 = crc + xm.buffer = bytes(chunk) + + acked = False + for retry in range(self._XMODEM_MAX_RETRY): + resp = self._xmodem_roundtrip(xm, timeout_s) + if resp and resp.control == XC.ACK: + acked = True + break + if resp and resp.control == XC.CAN: + logger.error(f"uploadFile: transfer cancelled at offset {offset}") + return False + if not acked: + logger.error(f"uploadFile: no ACK for seq {seq} at offset {offset}") + return False + + offset += len(chunk) + # Firmware uses monotonic uint16 packet numbers (not 8-bit XMODEM wrap). + seq += 1 + if on_progress: + on_progress(offset, total) + + # EOT + xm = xmodem_pb2.XModem() + xm.control = XC.EOT + for attempt in range(self._XMODEM_MAX_RETRY): + resp = self._xmodem_roundtrip(xm, timeout_s) + if resp and resp.control == XC.ACK: + logger.debug(f"uploadFile: {local_path} → {device_path} complete ({total} bytes)") + return True + logger.error(f"uploadFile: EOT not acknowledged for {device_path}") + return False + + def _xmodem_wait_next(self, timeout_s: float = _XMODEM_TIMEOUT_S) -> Optional[xmodem_pb2.XModem]: + """Wait for the next xmodem packet without sending anything first.""" + from pubsub import pub # type: ignore[import-untyped] + event = threading.Event() + result: list = [None] + + def _cb(packet, interface): + result[0] = packet + event.set() + + pub.subscribe(_cb, "meshtastic.xmodempacket") + try: + event.wait(timeout=timeout_s) + finally: + try: + pub.unsubscribe(_cb, "meshtastic.xmodempacket") + except Exception: + pass + return result[0] + + def downloadFile(self, device_path: str, local_path: str, + on_progress=None, timeout_s: float = _XMODEM_TIMEOUT_S) -> bool: + """Download a file from the device via XModem. + + Args: + device_path: Absolute path on the device (same path rules as ``uploadFile``). + local_path: Destination path on the local filesystem. + on_progress: Optional callback ``fn(bytes_received, total_bytes)``. + ``total_bytes`` is -1 (unknown) during transfer. + timeout_s: Per-packet response timeout in seconds. + + Returns: + True on success, False on failure. + + Example:: + + iface.localNode.downloadFile("/bbs/kb/wordle.bin", "wordle.bin") + """ + if self.noProto: + logger.warning("downloadFile: protocol disabled (noProto)") + return False + + XC = xmodem_pb2.XModem + + # STX seq=0 — request device to transmit the file + xm = xmodem_pb2.XModem() + xm.control = XC.STX + xm.seq = 0 + xm.buffer = device_path.encode("utf-8")[: self._XMODEM_BUFFER_MAX] + + chunks: list = [] + expected_seq = 1 + + # Subscribe first, then send, so we don't miss the first response + resp = self._xmodem_roundtrip(xm, timeout_s) + + while True: + if resp is None: + logger.error(f"downloadFile: timeout waiting for data from {device_path}") + return False + + if resp.control == XC.EOT: + # Final ACK — no more packets expected after this + ack = xmodem_pb2.XModem() + ack.control = XC.ACK + self._xmodem_send(ack) + break + + if resp.control in (XC.NAK, XC.CAN): + logger.error(f"downloadFile: device sent error control for {device_path}") + return False + + if resp.control in (XC.SOH, XC.STX): + chunk = bytes(resp.buffer) + if resp.seq == expected_seq and self._xmodem_crc16(chunk) == resp.crc16: + chunks.append(chunk) + if on_progress: + on_progress(sum(len(c) for c in chunks), -1) + ack = xmodem_pb2.XModem() + ack.control = XC.ACK + expected_seq += 1 + else: + ack = xmodem_pb2.XModem() + ack.control = XC.NAK + + # Subscribe BEFORE sending ACK/NAK so we don't miss the next packet + resp = self._xmodem_roundtrip(ack, timeout_s) + continue + + # Unexpected control — skip + resp = self._xmodem_wait_next(timeout_s) + + data = b"".join(chunks) + try: + with open(local_path, "wb") as f: + f.write(data) + except OSError as e: + logger.error(f"downloadFile: cannot write {local_path}: {e}") + return False + + logger.debug(f"downloadFile: {device_path} → {local_path} complete ({len(data)} bytes)") + return True + + def listDir(self, device_path: str, depth: int = 0, timeout_s: float = _XMODEM_TIMEOUT_S): + """List files on the device under ``device_path`` via XMODEM ``MFLIST`` (matching firmware). + + Args: + device_path: Directory path on the device to list. + depth: Recursion depth (0 = files in that directory only; each increment adds one tree level). + timeout_s: Per-packet timeout. + + Returns: + List of ``(path, size_bytes)`` for each file, or ``None`` on failure. + Lines starting with ``#`` in the payload are ignored (comments / truncation markers). + """ + if self.noProto: + logger.warning("listDir: protocol disabled (noProto)") + return None + + d = max(0, min(255, int(depth))) + cmd = f"{self._MFLIST_PREFIX}{device_path} {d}".encode("utf-8")[: self._XMODEM_BUFFER_MAX] + XC = xmodem_pb2.XModem + + xm = xmodem_pb2.XModem() + xm.control = XC.SOH + xm.seq = 0 + xm.buffer = bytes(cmd) + + chunks: list = [] + expected_seq = 1 + resp = self._xmodem_roundtrip(xm, timeout_s) + + while True: + if resp is None: + logger.error(f"listDir: timeout waiting for data from {device_path}") + return None + + if resp.control == XC.EOT: + ack = xmodem_pb2.XModem() + ack.control = XC.ACK + self._xmodem_send(ack) + break + + if resp.control in (XC.NAK, XC.CAN): + logger.error(f"listDir: device rejected or cancelled for {device_path}") + return None + + if resp.control in (XC.SOH, XC.STX): + chunk = bytes(resp.buffer) + if resp.seq == expected_seq and self._xmodem_crc16(chunk) == resp.crc16: + chunks.append(chunk) + ack = xmodem_pb2.XModem() + ack.control = XC.ACK + expected_seq += 1 + else: + ack = xmodem_pb2.XModem() + ack.control = XC.NAK + + resp = self._xmodem_roundtrip(ack, timeout_s) + continue + + resp = self._xmodem_wait_next(timeout_s) + + raw = b"".join(chunks).decode("utf-8", errors="replace") + out: list = [] + for line in raw.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + if "\t" not in line: + continue + path, sz = line.split("\t", 1) + try: + out.append((path, int(sz))) + except ValueError: + logger.debug("listDir: skip unparsable line %r", line) + return out diff --git a/meshtastic/tests/test_file_transfer_cli.py b/meshtastic/tests/test_file_transfer_cli.py new file mode 100644 index 000000000..429d356fc --- /dev/null +++ b/meshtastic/tests/test_file_transfer_cli.py @@ -0,0 +1,129 @@ +"""Tests for meshtastic.file_transfer_cli.""" + +import os +import tempfile + +import pytest + +from meshtastic.file_transfer_cli import ( + FileTransferCliError, + XMODEM_DEVICE_PATH_UTF8_MAX, + check_device_paths, + device_posix_join, + plan_download_glob, + plan_download_tree, + plan_upload, + remote_rel_glob_to_regex, + split_remote_glob_pattern, +) + + +def test_device_posix_join(): + assert device_posix_join("/mnt/d", "a/b") == "/mnt/d/a/b" + assert device_posix_join("/mnt/d/", "a", "b") == "/mnt/d/a/b" + assert device_posix_join("/", "x") == "/x" + + +def test_check_device_paths(): + assert check_device_paths(["/short"]) is None + longp = "/" + "a" * (XMODEM_DEVICE_PATH_UTF8_MAX + 1) + assert check_device_paths([longp]) is not None + + +def test_plan_upload_single_plain_file(): + with tempfile.TemporaryDirectory() as d: + f = os.path.join(d, "one.bin") + with open(f, "wb") as fp: + fp.write(b"x") + pairs = plan_upload([f], "/mnt/out.bin") + assert pairs == [(f, "/mnt/out.bin")] + + +def test_plan_upload_glob_recursive(): + with tempfile.TemporaryDirectory() as d: + os.makedirs(os.path.join(d, "p", "q")) + f1 = os.path.join(d, "p", "a.txt") + f2 = os.path.join(d, "p", "q", "b.txt") + for fp in (f1, f2): + with open(fp, "w") as fh: + fh.write("x") + pat = os.path.join(d, "p", "**", "*.txt") + pairs = plan_upload([pat], "/mnt/out") + assert len(pairs) == 2 + devs = sorted(dev for _loc, dev in pairs) + assert devs == ["/mnt/out/a.txt", "/mnt/out/q/b.txt"] + + +def test_plan_upload_directory_preserves_layout(): + with tempfile.TemporaryDirectory() as d: + sub = os.path.join(d, "a", "b") + os.makedirs(sub) + f1 = os.path.join(sub, "f.txt") + with open(f1, "w") as fp: + fp.write("hi") + pairs = plan_upload([d], "/mnt/dst") + assert len(pairs) == 1 + loc, dev = pairs[0] + assert loc == f1 + assert dev == "/mnt/dst/a/b/f.txt" + + +def test_plan_upload_dedupe(): + with tempfile.TemporaryDirectory() as d: + f = os.path.join(d, "x.bin") + with open(f, "wb") as fp: + fp.write(b"x") + pairs = plan_upload([f, f], "/mnt/d") + assert len(pairs) == 1 + + +def test_plan_upload_path_too_long(): + with tempfile.TemporaryDirectory() as d: + f = os.path.join(d, "x.bin") + with open(f, "wb") as fp: + fp.write(b"x") + long_base = "/" + "z" * 200 + with pytest.raises(FileTransferCliError): + plan_upload([f], long_base) + + +def test_split_remote_glob_pattern(): + assert split_remote_glob_pattern("/mnt/bbs/*.md") == ("/mnt/bbs", "*.md") + base, rel = split_remote_glob_pattern("*.md") + assert base == "/" + assert rel == "*.md" + + +def test_remote_rel_glob_to_regex(): + rx = remote_rel_glob_to_regex("**/*.png") + assert rx.match("a/b/c.png") + assert rx.match("x.png") + assert not rx.match("a/b/c.jpg") + + +def test_plan_download_tree(): + rows = [ + ("/mnt/d/a.txt", 3), + ("/mnt/d/sub/b.bin", 2), + ("/mnt/d/emptydir", 0), + ] + with tempfile.TemporaryDirectory() as ld: + pairs = plan_download_tree("/mnt/d", ld, rows) + assert len(pairs) == 2 + by = {os.path.basename(lp): dp for dp, lp in pairs} + assert by["a.txt"] == "/mnt/d/a.txt" + assert by["b.bin"] == "/mnt/d/sub/b.bin" + + +def test_plan_download_glob(): + rows = [ + ("/mnt/bbs/kb/one.md", 10), + ("/mnt/bbs/kb/two.txt", 5), + ("/mnt/bbs/other/x.md", 3), + ] + with tempfile.TemporaryDirectory() as ld: + pairs = plan_download_glob("/mnt/bbs/**/*.md", ld, rows) + assert len(pairs) == 2 + locs = sorted(lp for _dp, lp in pairs) + assert locs[0].endswith(os.path.join("kb", "one.md")) + assert locs[1].endswith(os.path.join("other", "x.md")) diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index 986c1783c..8e072ddce 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -7,7 +7,7 @@ import pytest -from ..protobuf import admin_pb2, localonly_pb2, config_pb2 +from ..protobuf import admin_pb2, localonly_pb2, config_pb2, xmodem_pb2 from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611 from ..node import Node from ..serial_interface import SerialInterface @@ -1610,6 +1610,287 @@ def test_start_ota_remote_node_raises_error(): ) +@pytest.mark.unit +def test_node_xmodem_crc16_known_vectors(): + """Regression vectors for CRC-16-CCITT matching firmware XModem.""" + assert Node._xmodem_crc16(b"") == 0 + assert Node._xmodem_crc16(b"a") == 31879 + assert Node._xmodem_crc16(b"hello") == 50018 + assert Node._xmodem_crc16(b"x" * 128) == 33239 + + +@pytest.mark.unit +def test_node_upload_file_xmodem_happy_path(tmp_path): + """uploadFile: OPEN (SOH/0), one STX data packet, EOT — all ACKed.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + device_path = "/t.bin" + payload = b"hello" + src = tmp_path / "src.bin" + src.write_bytes(payload) + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + assert bytes(xm.buffer) == device_path.encode("utf-8") + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.STX and xm.seq == 1: + assert bytes(xm.buffer) == payload + assert xm.crc16 == Node._xmodem_crc16(payload) + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.EOT: + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.uploadFile(str(src), device_path) is True + + +@pytest.mark.unit +def test_node_upload_file_xmodem_two_chunks(tmp_path): + """uploadFile spans two STX packets when payload is larger than buffer max (128).""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + device_path = "/big.bin" + payload = b"Z" * 129 + c0, c1 = payload[:128], payload[128:] + src = tmp_path / "src.bin" + src.write_bytes(payload) + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.STX and xm.seq == 1: + assert bytes(xm.buffer) == c0 + assert xm.crc16 == Node._xmodem_crc16(c0) + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.STX and xm.seq == 2: + assert bytes(xm.buffer) == c1 + assert xm.crc16 == Node._xmodem_crc16(c1) + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.EOT: + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.uploadFile(str(src), device_path) is True + + +@pytest.mark.unit +def test_node_upload_file_open_rejected(tmp_path): + """uploadFile returns False when device never ACKs OPEN.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + src = tmp_path / "src.bin" + src.write_bytes(b"x") + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + p = xmodem_pb2.XModem() + p.control = XC.NAK + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.uploadFile(str(src), "/x.bin") is False + + +@pytest.mark.unit +def test_node_download_file_xmodem_happy_path(tmp_path): + """downloadFile: request STX/0, receive data STX/1..n, then EOT.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + device_path = "/r.bin" + payload = b"hi" + dst = tmp_path / "out.bin" + phase = 0 + + def fake_roundtrip(xm, timeout_s=5.0): + nonlocal phase + if phase == 0: + phase += 1 + assert xm.control == XC.STX and xm.seq == 0 + assert bytes(xm.buffer) == device_path.encode("utf-8") + p = xmodem_pb2.XModem() + p.control = XC.STX + p.seq = 1 + p.buffer = payload + p.crc16 = Node._xmodem_crc16(payload) + return p + if phase == 1: + phase += 1 + assert xm.control == XC.ACK + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.downloadFile(device_path, str(dst)) is True + assert dst.read_bytes() == payload + + +@pytest.mark.unit +def test_node_download_file_two_chunks(tmp_path): + """downloadFile reassembles multiple STX payloads before EOT.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + device_path = "/p.bin" + c0, c1 = b"A" * 128, b"B" * 10 + payload = c0 + c1 + dst = tmp_path / "out.bin" + phase = 0 + + def fake_roundtrip(xm, timeout_s=5.0): + nonlocal phase + if phase == 0: + phase += 1 + assert xm.control == XC.STX and xm.seq == 0 + p = xmodem_pb2.XModem() + p.control = XC.STX + p.seq = 1 + p.buffer = c0 + p.crc16 = Node._xmodem_crc16(c0) + return p + if phase == 1: + phase += 1 + assert xm.control == XC.ACK + p = xmodem_pb2.XModem() + p.control = XC.STX + p.seq = 2 + p.buffer = c1 + p.crc16 = Node._xmodem_crc16(c1) + return p + if phase == 2: + phase += 1 + assert xm.control == XC.ACK + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.downloadFile(device_path, str(dst)) is True + assert dst.read_bytes() == payload + + +@pytest.mark.unit +def test_node_listdir_parses_mflist_payload(): + """listDir sends MFLIST, collects SOH chunks, parses path\\tsize lines.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + assert bytes(xm.buffer) == b"MFLIST / 0" + chunk = b"/a.txt\t10\n/b.bin\t3\n" + p = xmodem_pb2.XModem() + p.control = XC.SOH + p.seq = 1 + p.buffer = chunk + p.crc16 = Node._xmodem_crc16(chunk) + return p + if xm.control == XC.ACK: + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + rows = anode.listDir("/", depth=0) + assert rows == [("/a.txt", 10), ("/b.bin", 3)] + + +@pytest.mark.unit +def test_node_listdir_skips_comments_and_bad_lines(): + """listDir ignores # lines, lines without tab, and non-integer sizes.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + chunk = ( + b"# meta\n" + b"/ok.txt\t1\n" + b"no-tab-field\n" + b"/badsz\txx\n" + b"/good.bin\t99\n" + ) + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + p = xmodem_pb2.XModem() + p.control = XC.SOH + p.seq = 1 + p.buffer = chunk + p.crc16 = Node._xmodem_crc16(chunk) + return p + if xm.control == XC.ACK: + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + rows = anode.listDir("/mnt", depth=0) + assert rows == [("/ok.txt", 1), ("/good.bin", 99)] + + +@pytest.mark.unit +def test_node_listdir_depth_clamped_to_byte_range(): + """listDir clamps depth to 0..255 in the MFLIST command.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + first_cmd: list[bytes] = [] + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + first_cmd.append(bytes(xm.buffer)) + p = xmodem_pb2.XModem() + p.control = XC.SOH + p.seq = 1 + p.buffer = b"/x\t0\n" + p.crc16 = Node._xmodem_crc16(p.buffer) + return p + if xm.control == XC.ACK: + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + anode.listDir("/mount", depth=300) + anode.listDir("/mount", depth=-5) + assert first_cmd[0] == b"MFLIST /mount 255" + assert first_cmd[1] == b"MFLIST /mount 0" + + # TODO # @pytest.mark.unitslow # def test_waitForConfig():