Three-Tier Safety Architecture
The platform separates flight control into three strictly isolated tiers. Each tier communicates only downward through well-defined interfaces — no tier can bypass the one below it. The AI advisory layer sits entirely outside the command path.
Reactive Layer — 1 kHz
PX4 firmware on NuttX RTOS. Attitude quaternions, motor mixing matrices, hard real-time servo control. No user code runs here — this layer is treated as a certified black box.
Executive Layer — 10–50 Hz
Zig 0.14.0 onboard companion. Safety governor, formal shield (5 geometric invariants), deadman switch with NTP-attack detection, 13-state EKF. DAL-A discipline: no eval, no dynamic dispatch in safety paths.
Advisory Layer — Async
Claude Sonnet / Ollama mistral:7b AI router with $5/day cap and circuit breaker. Produces waypoint suggestions and mission analysis. Never touches actuators directly.
Safety Principle
The AI never writes to the flight command path. Every AI suggestion passes through the formal shield. On exception, the governor returns NO_GO immediately.
SE(3) Mathematics & 13-State EKF
The state vector encodes full rigid-body pose with IMU bias compensation:
// x[13] = [ px py pz | vx vy vz | qw qx qy qz | bgx bgy bgz ] // position(m) velocity(m/s) attitude(unit quat) gyro bias(rad/s)
The math library is built on SE(3) Lie groups using Zig comptime generics: Vec3(T), Mat3(T), Mat4(T), Quaternion(T), SE3(T) — all instantiated at compile time for f32 and f64 with zero runtime cost.
SO(3) exp/log maps convert angular velocity to quaternion increments using Taylor series near zero to avoid the sinc singularity. Mahalanobis distance gates GPS measurements: innovation y, d² = yT·S-1·y, reject if d² > 9.21 (chi²₃ 95th percentile).
EKF Predict — 100 Hz IMU integration
// EKF predict — called at 100 Hz from IMU interrupt pub fn predict(s: *EkfState, dt: f64, gyro: Vec3f64, accel: Vec3f64) void { const q = Quaternion(f64){ .w=s.x[6], .x=s.x[7], .y=s.x[8], .z=s.x[9] }; const R = q.to_rotation_matrix(); const bias = Vec3f64{ .x=s.x[10], .y=s.x[11], .z=s.x[12] }; const w_corrected = gyro.sub(bias); // State propagation s.x[0] += s.x[3]*dt; s.x[1] += s.x[4]*dt; s.x[2] += s.x[5]*dt; const a_world = R.mul_vec(accel); s.x[3] += a_world.x*dt; s.x[4] += a_world.y*dt; s.x[5] += (a_world.z - 9.81)*dt; const dq = Quaternion(f64).exp_so3(w_corrected.scale(dt)); _ = dq; // quaternion applied via Hamilton product in propagate_covariance // P propagation: P = F·P·Fᵀ + Q (Jacobian F computed inline) propagate_covariance(s, dt, R, w_corrected); }
GPS Update — Mahalanobis-gated measurement
pub fn update_gps(s: *EkfState, z: Vec3f64) void { const y = Vec3f64{ .x=z.x-s.x[0], .y=z.y-s.x[1], .z=z.z-s.x[2] }; const S = compute_innovation_cov(s); // S = H·P·Hᵀ + R_gps const d_sq = mahalanobis_sq(y, S); if (d_sq > 9.21) return; // Mahalanobis gate: chi²₃ at 95% const K = kalman_gain(s, S); apply_update(s, K, y); }
AES-GCM-256 Per-Packet Cryptography
Every MAVLink 2 packet is encrypted with AES-GCM-256 before transmission over NATS. A counter-based nonce (u96, incremented per packet) avoids a CSPRNG call in the hot loop while guaranteeing nonce uniqueness under the same key.
The NATS subject (e.g. "uav.v1.telemetry.attitude") is bound as Additional Data in the AEAD construction. A tampered routing label causes decryption to fail — routing attacks are cryptographically prevented, not just detected.
const Aes256Gcm = std.crypto.aead.aes_gcm.Aes256Gcm; pub fn seal(key: [32]u8, plaintext: []const u8, subject: []const u8, nonce_ctr: *u96) AuthPacket { var nonce: [12]u8 = undefined; std.mem.writeInt(u96, &nonce, nonce_ctr.*, .little); nonce_ctr.* += 1; // counter never reuses nonce under same key var pkt = AuthPacket{ .nonce = nonce, .len = @intCast(plaintext.len) }; Aes256Gcm.encrypt( pkt.ciphertext[0..plaintext.len], &pkt.tag, plaintext, subject, // subject = additional data (AEAD) nonce, key, ); return pkt; }
Decryption verifies the GCM tag before any plaintext is released. A mismatched subject — even a single byte different — produces a tag failure. Packets from a compromised relay that attempts to re-route telemetry to a different NATS subject are silently dropped.
NATS Edge Gateway & Ring Buffer
The onboard companion connects to a NATS JetStream broker over the RF link. RF dropout is expected — the 4096-slot ring buffer absorbs the gap with zero heap allocation: the backing array is a fixed-size comptime parameter, stack-allocated at startup.
On disconnect, encrypted packets are enqueued to the ring. On reconnect, the buffer drains in FIFO order before live telemetry resumes. Reconnect uses exponential backoff: 100ms → 200ms → 400ms → … capped at 30s. Cross-compiles to aarch64-linux-musl for Jetson Orin and Raspberry Pi 5 with a single zig build -Dtarget=aarch64-linux-musl.
pub fn RingBuffer(comptime T: type, comptime N: usize) type { return struct { data: [N]T = undefined, head: usize = 0, tail: usize = 0, count: usize = 0, // push returns false when full (caller chooses: drop or backpressure) pub fn push(self: *@This(), item: T) bool { if (self.count == N) return false; // full self.data[self.head] = item; self.head = (self.head + 1) % N; self.count += 1; return true; } pub fn pop(self: *@This()) ?T { if (self.count == 0) return null; // empty const item = self.data[self.tail]; self.tail = (self.tail + 1) % N; self.count -= 1; return item; } pub fn is_empty(self: @This()) bool { return self.count == 0; } pub fn is_full(self: @This()) bool { return self.count == N; } }; } // Telemetry buffer: RingBuffer(AuthPacket, 4096) // ≈ 4096 × 308 bytes = 1.2 MB stack-allocated
MAVLink 2 Zero-Allocation Parser
The onboard companion parses MAVLink 2 frames directly from the UART byte stream with no heap allocation. The parser uses packed Zig structs that match the wire layout exactly — @bitCast to decode, zero-copy. A 280-byte stack buffer holds the maximum frame size.
MAVLink 2 uses CRC-16/X.25 (poly=0x1021, init=0xFFFF). The CRC covers the header + payload + a per-message CRC_EXTRA byte (seeded from the message definition — prevents cross-message injection). Known CRC_EXTRA values: HEARTBEAT(0)=50, SYS_STATUS(1)=124, ATTITUDE(30)=39, GLOBAL_POS_INT(33)=104, COMMAND_LONG(76)=152, COMMAND_ACK(77)=143.
// telemetry/mavlink.zig const STX: u8 = 0xFD; // MAVLink 2 start byte const MAX_PAYLOAD: usize = 253; pub const MavlinkHeader = packed struct { stx: u8, payload_len: u8, incompat_flags: u8, compat_flags: u8, seq: u8, sysid: u8, compid: u8, msgid: u24, // 3-byte little-endian message ID }; pub const Attitude = packed struct { time_boot_ms: u32, roll: f32, pitch: f32, yaw: f32, rollspeed: f32, pitchspeed: f32, yawspeed: f32, }; pub const GlobalPositionInt = packed struct { time_boot_ms: u32, lat: i32, lon: i32, alt: i32, relative_alt: i32, vx: i16, vy: i16, vz: i16, hdg: u16, }; // X.25 CRC — poly 0x1021, init 0xFFFF fn crc16_x25_update(crc: u16, byte: u8) u16 { var c = crc ^ (@as(u16, byte) << 8); var i: u4 = 0; while (i < 8) : (i += 1) { c = if (c & 0x8000 != 0) (c << 1) ^ 0x1021 else c << 1; } return c; }
pub const ParseResult = union(enum) { need_more, crc_error, unknown_msg_id: u24, heartbeat: Heartbeat, attitude: Attitude, global_pos: GlobalPositionInt, }; pub fn parse_packet(buf: []const u8) ParseResult { if (buf.len < @sizeOf(MavlinkHeader) + 2) return .need_more; const hdr = @as(*const MavlinkHeader, @ptrCast(buf.ptr)).*; if (hdr.stx != STX) return .crc_error; const payload_end = @sizeOf(MavlinkHeader) + hdr.payload_len; if (buf.len < payload_end + 2) return .need_more; // Verify CRC over header[1..] + payload + CRC_EXTRA[msgid] var crc: u16 = 0xFFFF; for (buf[1..payload_end]) |b| crc = crc16_x25_update(crc, b); const extra = CRC_EXTRA_TABLE[hdr.msgid] orelse return .{ .unknown_msg_id = hdr.msgid }; crc = crc16_x25_update(crc, extra); const wire_crc = std.mem.readInt(u16, buf[payload_end..][0..2], .little); if (crc != wire_crc) return .crc_error; const payload = buf[@sizeOf(MavlinkHeader)..payload_end]; return switch (hdr.msgid) { 30 => .{ .attitude = @as(*const Attitude, @ptrCast(payload.ptr)).* }, 33 => .{ .global_pos = @as(*const GlobalPositionInt, @ptrCast(payload.ptr)).* }, 0 => .{ .heartbeat = @as(*const Heartbeat, @ptrCast(payload.ptr)).* }, else => .{ .unknown_msg_id = hdr.msgid }, }; }
The CRC_EXTRA_TABLE is a comptime array keyed by message ID. Unknown message IDs return .unknown_msg_id without error — the gateway logs and drops them. A crc_error triggers a metric increment and frame re-sync (scan forward for next 0xFD byte).
Formal Safety Shield
The executive governor enforces 5 geometric invariants before any setpoint reaches PX4. Invariant checks are inlined into a single function with no allocations and no dynamic dispatch — the compiler can verify the path is branch-free on the happy path.
-
1
Altitude floor:
pos.z >= FLOOR_M— never fly below the hard deck -
2
Geofence:
distance_from_home(pos) <= MAX_RADIUS_M— hard cylinder boundary -
3
Velocity limit:
vel.norm() <= MAX_VEL_MS— catches runaway EKF divergence -
4
Attitude limit:
2·acos(|q.w|) <= MAX_TILT_RAD— prevents inverted flight -
5
Deadman switch:
monotonic_clock_delta() <= HEARTBEAT_PERIOD_S— detects NTP spoofing and operator link loss
On any invariant violation the governor returns NO_GO immediately. Decision variables are pre-bound before the try block so an unexpected exception can never leave the return value unset. The shield is the last line of defence — it cannot be bypassed by the advisory layer.
Math library — SE(3) comptime generics
Vec3/Mat3/Mat4/Quaternion/SE3 with exp/log maps and full unit test coverage across f32 and f64 instantiations.
13-state EKF — IMU + GPS + barometer fusion
Mahalanobis gating on GPS and barometer updates, covariance propagation with analytic Jacobian, gyro bias convergence tests.
MAVLink 2 parser — zero allocation
X.25 CRC, packed structs, message dispatch table. No heap in the hot path — all parsing in caller-provided fixed buffers.
AES-GCM-256 + NATS edge gateway
Counter nonce, AEAD subject binding, 4096-slot ring buffer, exponential backoff reconnect with ordered replay.
Formal shield + 58 tests passing
5 geometric invariants verified at every governor tick. CI gate enforces DAL-A rules: no dynamic dispatch, no heap in safety paths, all 58 tests green.
Single Command: Native + Jetson Cross-Compile
The companion builds to two targets from a single build.zig. No Docker, no cross-toolchain setup — Zig ships its own linker and libc. The aarch64 musl target produces a statically linked binary with zero shared library dependencies.
// build.zig pub fn build(b: *std.Build) void { // Native build (x86_64 dev machine) const exe = b.addExecutable(.{ .name = "companion", .root_source_file = b.path("src/main.zig"), .target = b.host, .optimize = .ReleaseSafe, }); b.installArtifact(exe); // Cross-compile: Jetson Orin / Raspberry Pi 5 const arm_target = b.resolveTargetQuery(.{ .cpu_arch = .aarch64, .os_tag = .linux, .abi = .musl, // static libc — no runtime deps }); const arm_exe = b.addExecutable(.{ .name = "companion-arm64", .root_source_file = b.path("src/main.zig"), .target = arm_target, .optimize = .ReleaseSafe, }); b.installArtifact(arm_exe); // Test step — runs all unit tests on host const test_step = b.step("test", "Run all unit tests"); for (&[_][]const u8{ "src/math/vec3.zig", "src/math/quaternion.zig", "src/math/lie_groups.zig", "src/fusion/ekf.zig", "src/telemetry/mavlink.zig", "src/telemetry/ring_buffer.zig", "src/telemetry/crypto.zig", }) |src| { const t = b.addTest(.{ .root_source_file = b.path(src), .target = b.host }); test_step.dependOn(&b.addRunArtifact(t).step); } }
DAL-A CI gate: a Python script (scripts/check_dal_a_rules.py) scans all safety-critical Zig files for prohibited patterns — @import("std").heap, dynamic dispatch via anyopaque, and any catch that returns non-NO_GO. The test step refuses to pass if violations are found.
# scripts/check_dal_a_rules.py FORBIDDEN = [ r'std\.heap\.', # no heap allocator in safety paths r'anyopaque', # no dynamic dispatch r'@import\("builtin"\)', # no runtime OS detection in governor ] SAFETY_PATHS = ["src/fusion/", "src/telemetry/mavlink.zig"] for path in SAFETY_PATHS: for f in glob(path + "**/*.zig", recursive=True): src = open(f).read() for pattern in FORBIDDEN: if re.search(pattern, src): print(f"DAL-A VIOLATION in {f}: {pattern}") sys.exit(1) print("DAL-A gate: PASS")
Need a safety-critical system built right?
I design aerospace-grade software that separates concerns by hard boundary — not by convention. Engagements start with an architecture review.