Skip to content

Robot Data Pipeline

How sensor data flows from hardware to analysis, with practical examples.

System Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        ROBOT (Pico)                             │
│  ┌─────────┐    ┌──────────┐    ┌──────────┐    ┌────────────┐  │
│  │ BMI160  │───→│ Raw Data │───→│ Streamer │───→│ UDP Packet │  │
│  │ Sensor  │    │  ax,ay,  │    │ Bundle   │    │ to WiFi    │  │
│  │ 800 Hz  │    │  az,gx,  │    │ samples  │    │            │  │
│  │         │    │  gy,gz   │    │          │    │            │  │
│  └─────────┘    └──────────┘    └──────────┘    └────────────┘  │
└───────────────────────────────────────────────│────────────────┘
                                    UDP (port 5005)
┌─────────────────────────────────────────────────────────────────┐
│                        HOST (PC)                                │
│  ┌────────────┐    ┌──────────┐    ┌──────────┐    ┌─────────┐  │
│  │ UDP Socket │───→│ Parse    │───→│ Buffers  │───→│ Charts  │  │
│  │ Receive    │    │ Packets  │    │ (deque)  │    │ (live)  │  │
│  └────────────┘    └──────────┘    └────┬─────┘    └─────────┘  │
│                                         │                       │
│                                         ↓                       │
│                                    ┌──────────┐    ┌─────────┐  │
│                                    │ Record   │───→│ CSV     │  │
│                                    │ Buffer   │    │ Export  │  │
│                                    └────┬─────┘    └─────────┘  │
│                                         │                       │
│                                         ↓                       │
│                                    ┌──────────────────────────┐ │
│                                    │ ANALYZERS                │ │
│                                    │ ├── BumpDetector         │ │
│                                    │ ├── VibrationAnalyzer    │ │
│                                    │ ├── FFTAnalyzer          │ │
│                                    │ └── MotionClassifier     │ │
│                                    └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

Step 1: Sensor Reading (Robot)

BMI160 Hardware

The BMI160 IMU measures: - Accelerometer: Force in X, Y, Z (including gravity) - Gyroscope: Rotation rate around X, Y, Z

# From pico_collector.py

class BMI160:
    """6-axis IMU (Accelerometer + Gyroscope)"""

    def read_raw(self):
        """Read raw 16-bit values from sensor."""
        # BMI160 data format: little-endian, gyro first
        data = self.i2c.readfrom_mem(self.addr, 0x0C, 12)
        gx = struct.unpack_from('<h', data, 0)[0]
        gy = struct.unpack_from('<h', data, 2)[0]
        gz = struct.unpack_from('<h', data, 4)[0]
        ax = struct.unpack_from('<h', data, 6)[0]
        ay = struct.unpack_from('<h', data, 8)[0]
        az = struct.unpack_from('<h', data, 10)[0]
        return (ax, ay, az, gx, gy, gz)

Converting Raw to Physical Units

# Raw values → Physical units
# Accelerometer: ±2g range at 16384 LSB/g
ax_g = ax_raw / 16384.0  # Result in g (9.81 m/s²)

# Gyroscope: ±250°/s range at 131.2 LSB/(°/s)
gx_dps = gx_raw / 131.2  # Result in degrees per second
Understanding g-force
  • Robot stationary: az ≈ 1.0g (gravity pointing down)
  • Robot tilted 45°: ax ≈ 0.7g, az ≈ 0.7g
  • Robot accelerating: ax shows thrust force

Step 2: Data Streaming (Robot → Host)

Packet Format

# Each sample: timestamp + 7 floats
# <Q7f = little-endian: uint64 + 7 × float32
sample = struct.pack('<Q7f',
    timestamp_us,  # Microseconds since boot
    ax, ay, az,    # Accelerometer (g)
    gx, gy, gz,    # Gyroscope (°/s)
    distance       # Ultrasonic (cm)
)

# Packet: header + multiple samples
# [0x01][count][sample1][sample2]...
packet = bytes([0x01, batch_size]) + samples_data

Why Batch Samples?

Without batching:           With batching (10 samples):
┌──────┐                    ┌──────────────────────────┐
│ S1   │ → network latency  │ S1 S2 S3 S4 S5 S6 S7... │ → 1 network latency
└──────┘                    └──────────────────────────┘
┌──────┐
│ S2   │ → network latency  10x fewer packets
└──────┘                    More efficient WiFi usage
   ...                      Smoother data flow

Step 3: Parsing on Host

# From host_collector.py

def parse_sensor_packet(data):
    """Parse UDP packet from robot into samples."""
    if len(data) < 3:
        return []

    packet_type = data[0]
    count = data[1]
    samples = []

    if packet_type == 0x01:  # Standard IMU packet
        sample_size = 36  # <Q7f = 8 + 7*4 = 36 bytes

        for i in range(count):
            offset = 2 + i * sample_size
            sample_data = data[offset:offset + sample_size]

            if len(sample_data) == sample_size:
                values = struct.unpack('<Q7f', sample_data)
                samples.append({
                    'timestamp_us': values[0],
                    'ax': values[1], 'ay': values[2], 'az': values[3],
                    'gx': values[4], 'gy': values[5], 'gz': values[6],
                    'us': values[7]
                })

    return samples

Step 4: Buffering for Charts

from collections import deque

# Ring buffers for chart data
CHART_HISTORY = 500  # 1 second at 500 Hz

time_buffer = deque(maxlen=CHART_HISTORY)
data_buffers = {
    'ax': deque(maxlen=CHART_HISTORY),
    'ay': deque(maxlen=CHART_HISTORY),
    'az': deque(maxlen=CHART_HISTORY),
    # ... etc
}

def process_sample(sample):
    time_s = (sample['timestamp_us'] - first_timestamp) / 1_000_000
    time_buffer.append(time_s)

    for key in data_buffers:
        if key in sample:
            data_buffers[key].append(sample[key])
Why deque with maxlen?
  • Automatically removes oldest data when full
  • Constant O(1) append and pop
  • No manual memory management needed

Step 5: Analysis

Bump Detection Pipeline

# analyzers/bump_detector.py

class BumpDetector:
    """Counts wheel bumps using high-pass filter."""

    def __init__(self, threshold=0.015, hp_alpha=0.95):
        self.threshold = threshold
        self.hp_alpha = hp_alpha

    def analyze(self, data):
        """
        Pipeline:
        1. Calculate magnitude: sqrt(ax² + ay² + az²) - 1
        2. High-pass filter to remove DC/slow drift
        3. Count threshold crossings
        """
        magnitudes = []
        for row in data:
            mag = math.sqrt(row['ax']**2 + row['ay']**2 + row['az']**2) - 1.0
            magnitudes.append(mag)

        # High-pass filter
        filtered = []
        prev_in, prev_out = 0, 0
        for m in magnitudes:
            out = self.hp_alpha * (prev_out + m - prev_in)
            filtered.append(out)
            prev_in, prev_out = m, out

        # Count bumps
        bumps = 0
        was_above = False
        for f in filtered:
            is_above = f > self.threshold
            if is_above and not was_above:
                bumps += 1
            was_above = is_above

        return bumps

FFT Analysis Pipeline

# analyzers/fft_analyzer.py

class FFTAnalyzer:
    """Frequency domain analysis."""

    def __init__(self, sample_rate=500, window_type='hann'):
        self.sample_rate = sample_rate
        self.window_type = window_type

    def analyze(self, signal):
        """
        Pipeline:
        1. Remove DC offset (mean)
        2. Apply window function (Hann)
        3. Compute FFT
        4. Find peaks in magnitude spectrum
        """
        import numpy as np

        # Remove DC
        signal = np.array(signal) - np.mean(signal)

        # Apply window
        if self.window_type == 'hann':
            window = np.hanning(len(signal))
        else:
            window = np.hamming(len(signal))
        windowed = signal * window

        # FFT
        fft = np.fft.fft(windowed)
        freqs = np.fft.fftfreq(len(signal), 1/self.sample_rate)

        # Get positive frequencies only
        n = len(signal) // 2
        magnitudes = np.abs(fft[:n]) / n
        freqs = freqs[:n]

        # Find peaks
        peaks = self._find_peaks(magnitudes, freqs)

        return {
            'frequencies': freqs,
            'magnitudes': magnitudes,
            'peaks': peaks
        }

Vibration Analysis Pipeline

# analyzers/vibration_analyzer.py

class VibrationAnalyzer:
    """Sliding window variance for motion intensity."""

    def __init__(self, window_size=50):
        self.window_size = window_size

    def analyze(self, data):
        """
        Pipeline:
        1. Calculate acceleration magnitude
        2. Compute variance in sliding window
        3. Classify motion state by variance level
        """
        magnitudes = [
            math.sqrt(r['ax']**2 + r['ay']**2 + r['az']**2)
            for r in data
        ]

        variances = []
        for i in range(len(magnitudes) - self.window_size):
            window = magnitudes[i:i + self.window_size]
            var = sum((x - sum(window)/len(window))**2
                     for x in window) / len(window)
            variances.append(var)

        # Classify by variance
        states = []
        for v in variances:
            if v < 0.001:
                states.append('STOPPED')
            elif v < 0.01:
                states.append('SLOW')
            elif v < 0.1:
                states.append('MEDIUM')
            else:
                states.append('FAST')

        return states, variances

Putting It Together

Complete Workflow

# 1. Deploy robot code
ampy --port /dev/ttyACM0 put pico_collector.py main.py

# 2. Start host (robot must be on same WiFi)
python host_collector.py

# 3. Drive and record (press R)
# WASD to control, R to record, E to export

# 4. Analyze recorded data
python analyzers/bump_detector.py data/recording.csv
python analyzers/fft_analyzer.py data/recording.csv --bump
python analyzers/vibration_analyzer.py data/recording.csv

Real-Time vs Batch Analysis

Analysis Real-Time (on robot) Batch (on host)
Bump counting HP filter + threshold Same, higher accuracy
Speed Bump rate × calibration FFT peak detection
Motion state Simple thresholds ML classifier
Vibration Variance accumulator Full windowed analysis
Educational Approach

Robot does simple, reliable calculations in real-time. Host does complex analysis on recorded data.

This separation makes both sides easier to understand and debug.

Calibration

Bump-to-Distance Calibration

# Drive robot a known distance, count bumps
known_distance_cm = 100  # Measure with ruler
total_bumps = 523        # From analysis

bumps_per_cm = total_bumps / known_distance_cm
# = 5.23 bumps/cm

# Later: estimate distance from bumps
estimated_distance = new_bump_count / bumps_per_cm

Speed Calibration

# Correlate bump frequency with known speed
known_speed_cm_s = 20    # Measure with timer
bump_frequency_hz = 25   # From FFT analysis

calibration_factor = known_speed_cm_s / bump_frequency_hz
# = 0.8 cm/s per Hz

# Later: estimate speed from FFT
estimated_speed = new_bump_freq * calibration_factor

Common Issues and Solutions

Issue: No Data in Charts

Check:
1. Robot connected to WiFi?
2. Correct IP address/port?
3. Firewall blocking UDP?
4. Robot code running? (LED blinking?)

Issue: Noisy Bump Detection

Adjust threshold:
- Too many false positives: increase threshold (0.015 → 0.025)
- Missing real bumps: decrease threshold (0.015 → 0.010)

Or adjust filter:
- Too sensitive: increase hp_alpha (0.95 → 0.98)
- Too sluggish: decrease hp_alpha (0.95 → 0.90)

Issue: Wrong FFT Frequencies

Check sample rate:
- If FFT shows 50Hz but expect 25Hz, sample rate might be wrong
- Verify: count samples per second from recorded CSV
- Actual Fs = len(data) / (last_time - first_time)

Next Steps

  • [[Reference/extras/signal-processing/basics|Signal Processing Basics]] - Mathematical foundations
  • [[Reference/07-data-analysis/analyzers-reference|Analyzers Reference]] - CLI tool reference
  • [[Reference/07-data-analysis/overview|Data Analysis Overview]] - Workflow overview