用Python从零实现一个卡尔曼滤波器(附完整代码与可视化)

卡尔曼滤波是工程领域最经典的状态估计算法之一,广泛应用于机器人导航、自动驾驶、金融预测等领域。本文将带你从零开始,用Python实现一个完整的卡尔曼滤波器,并通过可视化直观展示其工作原理。

1. 卡尔曼滤波基础概念

卡尔曼滤波的核心思想是通过融合预测和观测数据,得到更准确的状态估计。它假设系统状态和观测噪声都服从高斯分布,通过递归计算实现最优估计。

关键变量定义

  • x :状态向量(如位置和速度)
  • P :状态协方差矩阵(不确定性)
  • F :状态转移矩阵
  • H :观测矩阵
  • Q :过程噪声协方差
  • R :观测噪声协方差
  • K :卡尔曼增益

卡尔曼滤波分为两个主要步骤:

  1. 预测步骤 :根据系统模型预测下一时刻状态
  2. 更新步骤 :用观测数据修正预测结果

2. 一维卡尔曼滤波器实现

下面我们实现一个简单的一维卡尔曼滤波器,用于跟踪物体的位置和速度。

import numpy as np

class KalmanFilter1D:
    def __init__(self, initial_x, initial_v, accel_variance):
        # 初始状态 (位置和速度)
        self.x = np.array([initial_x, initial_v])
        
        # 初始协方差矩阵
        self.P = np.eye(2)
        
        # 状态转移矩阵
        self.F = np.array([[1, 1], [0, 1]])
        
        # 过程噪声协方差
        self.Q = np.array([[0.25, 0.5], [0.5, 1.0]]) * accel_variance
        
        # 观测矩阵 (只观测位置)
        self.H = np.array([[1, 0]])
        
        # 观测噪声
        self.R = np.array([[1.0]])
        
    def predict(self):
        # 状态预测
        self.x = self.F @ self.x
        # 协方差预测
        self.P = self.F @ self.P @ self.F.T + self.Q
        
    def update(self, z):
        # 计算卡尔曼增益
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        
        # 状态更新
        y = z - self.H @ self.x
        self.x = self.x + K @ y
        
        # 协方差更新
        I = np.eye(2)
        self.P = (I - K @ self.H) @ self.P

参数说明

  • initial_x : 初始位置
  • initial_v : 初始速度
  • accel_variance : 加速度方差,反映系统不确定性

3. 二维卡尔曼滤波器实现

对于更复杂的场景,我们需要跟踪物体在二维平面中的运动。下面是二维卡尔曼滤波器的实现:

class KalmanFilter2D:
    def __init__(self, initial_state, dt=1.0):
        # 状态向量: [x, y, vx, vy]
        self.x = initial_state
        
        # 状态转移矩阵
        self.F = np.array([
            [1, 0, dt, 0],
            [0, 1, 0, dt],
            [0, 0, 1, 0],
            [0, 0, 0, 1]
        ])
        
        # 过程噪声协方差
        self.Q = np.eye(4) * 0.1
        
        # 观测矩阵 (只观测x,y位置)
        self.H = np.array([
            [1, 0, 0, 0],
            [0, 1, 0, 0]
        ])
        
        # 观测噪声
        self.R = np.eye(2) * 1.0
        
        # 状态协方差
        self.P = np.eye(4)
        
    def predict(self):
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        return self.x[:2]  # 返回预测的位置
        
    def update(self, z):
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        
        y = z - self.H @ self.x
        self.x = self.x + K @ y
        
        I = np.eye(4)
        self.P = (I - K @ self.H) @ self.P
        return self.x[:2]  # 返回更新后的位置

4. 可视化实现与结果分析

为了直观理解卡尔曼滤波的工作原理,我们使用Matplotlib进行可视化:

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

def simulate_and_visualize():
    # 真实轨迹 (圆周运动)
    t = np.linspace(0, 10, 100)
    true_x = 5 * np.cos(t)
    true_y = 5 * np.sin(t)
    
    # 添加噪声的观测
    np.random.seed(42)
    observed_x = true_x + np.random.normal(0, 1, len(t))
    observed_y = true_y + np.random.normal(0, 1, len(t))
    
    # 初始化卡尔曼滤波器
    kf = KalmanFilter2D(np.array([5, 0, 0, 1]))
    
    # 存储滤波结果
    filtered_x = []
    filtered_y = []
    
    # 运行滤波
    for i in range(len(t)):
        kf.predict()
        pos = kf.update(np.array([observed_x[i], observed_y[i]]))
        filtered_x.append(pos[0])
        filtered_y.append(pos[1])
    
    # 绘制结果
    fig, ax = plt.subplots(figsize=(10, 8))
    ax.plot(true_x, true_y, 'g-', label='真实轨迹')
    ax.plot(observed_x, observed_y, 'r.', label='观测值', alpha=0.5)
    ax.plot(filtered_x, filtered_y, 'b-', label='卡尔曼滤波')
    ax.legend()
    ax.set_title('二维卡尔曼滤波效果对比')
    ax.set_xlabel('X位置')
    ax.set_ylabel('Y位置')
    ax.grid(True)
    plt.show()

simulate_and_visualize()

可视化结果分析

  1. 绿色曲线 :物体的真实运动轨迹(圆周运动)
  2. 红色点 :带有噪声的观测值
  3. 蓝色曲线 :卡尔曼滤波估计的轨迹

从图中可以明显看出,卡尔曼滤波能够有效减少观测噪声的影响,得到接近真实轨迹的估计结果。

5. 参数调优与性能分析

卡尔曼滤波的性能很大程度上取决于参数的选择。以下是关键参数的调优建议:

过程噪声协方差Q

  • 反映系统模型的不确定性
  • 值越大,滤波器对预测的信任度越低
  • 对于运动平稳的系统,Q应设较小值

观测噪声协方差R

  • 反映测量设备的精度
  • 值越大,滤波器对观测的信任度越低
  • 对于高精度传感器,R应设较小值

初始协方差P

  • 反映初始状态的不确定性
  • 通常可以设为较大的值,让滤波器快速收敛

调优示例

# 调优后的参数设置示例
def optimized_kalman_filter():
    kf = KalmanFilter2D(np.array([5, 0, 0, 1]))
    kf.Q = np.diag([0.05, 0.05, 0.1, 0.1])  # 较小的过程噪声
    kf.R = np.diag([0.5, 0.5])             # 适中的观测噪声
    kf.P = np.diag([1.0, 1.0, 2.0, 2.0])   # 较大的初始不确定性
    return kf

6. 实际应用案例:目标跟踪

卡尔曼滤波在计算机视觉的目标跟踪中有广泛应用。下面是一个简化的目标跟踪实现:

class ObjectTracker:
    def __init__(self):
        self.tracks = []
        self.next_id = 0
        
    def update(self, detections):
        # 预测现有轨迹
        for track in self.tracks:
            track['kf'].predict()
            
        # 数据关联 (简化的最近邻匹配)
        if detections and self.tracks:
            dists = np.zeros((len(self.tracks), len(detections)))
            for i, track in enumerate(self.tracks):
                for j, det in enumerate(detections):
                    pred_pos = track['kf'].x[:2]
                    dists[i,j] = np.linalg.norm(pred_pos - det)
                    
            # 简单的最小距离匹配
            rows, cols = linear_sum_assignment(dists)
            for r, c in zip(rows, cols):
                if dists[r,c] < 50:  # 匹配阈值
                    self.tracks[r]['kf'].update(detections[c])
                    self.tracks[r]['misses'] = 0
                else:
                    self.tracks[r]['misses'] += 1
                    
        # 移除丢失的轨迹
        self.tracks = [t for t in self.tracks if t['misses'] < 3]
        
        # 为新检测���建轨迹
        for det in detections:
            if all(np.linalg.norm(t['kf'].x[:2] - det) > 30 for t in self.tracks):
                kf = KalmanFilter2D(np.array([det[0], det[1], 0, 0]))
                self.tracks.append({
                    'id': self.next_id,
                    'kf': kf,
                    'misses': 0
                })
                self.next_id += 1
                
        return self.tracks

使用示例

# 模拟检测数据
detections = [
    np.array([10, 20]),
    np.array([30, 40]),
    np.array([50, 60])
]

tracker = ObjectTracker()
tracks = tracker.update(detections)

for track in tracks:
    print(f"Track {track['id']}: Position {track['kf'].x[:2]}")

7. 高级话题与扩展

7.1 非线性系统:扩展卡尔曼滤波(EKF)

当系统模型或观测模型为非线性时,可以使用扩展卡尔曼滤波:

class ExtendedKalmanFilter:
    def __init__(self, initial_state):
        self.x = initial_state
        self.P = np.eye(len(initial_state))
        self.Q = np.eye(len(initial_state)) * 0.1
        self.R = np.eye(2) * 1.0
        
    def predict(self, f, F_jacobian):
        # 非线性状态转移
        self.x = f(self.x)
        # 计算雅可比矩阵
        F = F_jacobian(self.x)
        self.P = F @ self.P @ F.T + self.Q
        
    def update(self, z, h, H_jacobian):
        # 非线性观测
        z_pred = h(self.x)
        # 计算雅可比矩阵
        H = H_jacobian(self.x)
        
        S = H @ self.P @ H.T + self.R
        K = self.P @ H.T @ np.linalg.inv(S)
        
        y = z - z_pred
        self.x = self.x + K @ y
        self.P = (np.eye(len(self.x)) - K @ H) @ self.P

7.2 无迹卡尔曼滤波(UKF)

UKF使用sigma点来近似非线性变换,通常比EKF有更好的性能:

def unscented_transform(sigma_points, mean_weights, cov_weights):
    # 计算变换后的均值和协方差
    new_mean = np.sum(mean_weights * sigma_points, axis=0)
    diff = sigma_points - new_mean
    new_cov = np.sum(cov_weights * (diff[:,:,None] @ diff[:,None,:]), axis=0)
    return new_mean, new_cov

class UnscentedKalmanFilter:
    def __init__(self, initial_state, alpha=1e-3, beta=2, kappa=0):
        self.x = initial_state
        self.P = np.eye(len(initial_state))
        self.n = len(initial_state)
        self.lambda_ = alpha**2 * (self.n + kappa) - self.n
        self.weights_mean = np.full(2*self.n+1, 1/(2*(self.n+self.lambda_)))
        self.weights_mean[0] = self.lambda_/(self.n+self.lambda_)
        self.weights_cov = self.weights_mean.copy()
        self.weights_cov[0] += (1 - alpha**2 + beta)
        
    def generate_sigma_points(self):
        sigma_points = np.zeros((2*self.n+1, self.n))
        sigma_points[0] = self.x
        sqrt_P = np.linalg.cholesky((self.n + self.lambda_)*self.P)
        for i in range(self.n):
            sigma_points[i+1] = self.x + sqrt_P[i]
            sigma_points[self.n+i+1] = self.x - sqrt_P[i]
        return sigma_points
    
    def predict(self, f):
        sigma_points = self.generate_sigma_points()
        transformed_points = np.array([f(point) for point in sigma_points])
        self.x, self.P = unscented_transform(transformed_points, 
                                           self.weights_mean, 
                                           self.weights_cov)
        self.P += self.Q
        
    def update(self, z, h):
        sigma_points = self.generate_sigma_points()
        transformed_points = np.array([h(point) for point in sigma_points])
        z_pred, Pzz = unscented_transform(transformed_points,
                                        self.weights_mean,
                                        self.weights_cov)
        Pzz += self.R
        
        # 计算互协方差
        diff_x = sigma_points - self.x
        diff_z = transformed_points - z_pred
        Pxz = np.sum(self.weights_cov * (diff_x[:,:,None] @ diff_z[:,None,:]), axis=0)
        
        # 计算卡尔曼增益
        K = Pxz @ np.linalg.inv(Pzz)
        
        # 状态更新
        self.x = self.x + K @ (z - z_pred)
        self.P = self.P - K @ Pzz @ K.T

8. 性能优化技巧

数值稳定性优化

  1. 使用平方根形式实现协方差更新
  2. 避免矩阵求逆,使用Cholesky分解
  3. 定期检查协方差矩阵的正定性

计算效率优化

  1. 预计算不变矩阵
  2. 利用稀疏矩阵特性
  3. 并行化计算

代码示例:平方根卡尔曼滤波

def sqrt_kalman_update(x, P_sqrt, H, R_sqrt, z):
    # 预测残差
    y = z - H @ x
    
    # 计算中间矩阵
    PHt = P_sqrt @ (P_sqrt.T @ H.T)
    S_sqrt = np.linalg.qr(np.vstack([
        H @ PHt,
        R_sqrt
    ]), mode='r')
    
    # 计算卡尔曼增益
    K = PHt @ np.linalg.inv(S_sqrt.T @ S_sqrt)
    
    # 状态更新
    x_new = x + K @ y
    
    # 协方差更新
    temp = K @ S_sqrt
    P_sqrt_new = np.linalg.qr(np.hstack([
        P_sqrt - temp @ H,
        temp
    ]), mode='r')
    
    return x_new, P_sqrt_new

更多推荐