用Python从零实现一个卡尔曼滤波器(附完整代码与可视化)
·
用Python从零实现一个卡尔曼滤波器(附完整代码与可视化)
卡尔曼滤波是工程领域最经典的状态估计算法之一,广泛应用于机器人导航、自动驾驶、金融预测等领域。本文将带你从零开始,用Python实现一个完整的卡尔曼滤波器,并通过可视化直观展示其工作原理。
1. 卡尔曼滤波基础概念
卡尔曼滤波的核心思想是通过融合预测和观测数据,得到更准确的状态估计。它假设系统状态和观测噪声都服从高斯分布,通过递归计算实现最优估计。
关键变量定义 :
x:状态向量(如位置和速度)P:状态协方差矩阵(不确定性)F:状态转移矩阵H:观测矩阵Q:过程噪声协方差R:观测噪声协方差K:卡尔曼增益
卡尔曼滤波分为两个主要步骤:
- 预测步骤 :根据系统模型预测下一时刻状态
- 更新步骤 :用观测数据修正预测结果
2. 一维卡尔曼滤波器实现
下面我们实现一个简单的一维卡尔曼滤波器,用于跟踪物体的位置和速度。
import numpy as np
class KalmanFilter1D:
def __init__(self, initial_x, initial_v, accel_variance):
# 初始状态 (位置和速度)
self.x = np.array([initial_x, initial_v])
# 初始协方差矩阵
self.P = np.eye(2)
# 状态转移矩阵
self.F = np.array([[1, 1], [0, 1]])
# 过程噪声协方差
self.Q = np.array([[0.25, 0.5], [0.5, 1.0]]) * accel_variance
# 观测矩阵 (只观测位置)
self.H = np.array([[1, 0]])
# 观测噪声
self.R = np.array([[1.0]])
def predict(self):
# 状态预测
self.x = self.F @ self.x
# 协方差预测
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
# 计算卡尔曼增益
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
# 状态更新
y = z - self.H @ self.x
self.x = self.x + K @ y
# 协方差更新
I = np.eye(2)
self.P = (I - K @ self.H) @ self.P
参数说明 :
initial_x: 初始位置initial_v: 初始速度accel_variance: 加速度方差,反映系统不确定性
3. 二维卡尔曼滤波器实现
对于更复杂的场景,我们需要跟踪物体在二维平面中的运动。下面是二维卡尔曼滤波器的实现:
class KalmanFilter2D:
def __init__(self, initial_state, dt=1.0):
# 状态向量: [x, y, vx, vy]
self.x = initial_state
# 状态转移矩阵
self.F = np.array([
[1, 0, dt, 0],
[0, 1, 0, dt],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
# 过程噪声协方差
self.Q = np.eye(4) * 0.1
# 观测矩阵 (只观测x,y位置)
self.H = np.array([
[1, 0, 0, 0],
[0, 1, 0, 0]
])
# 观测噪声
self.R = np.eye(2) * 1.0
# 状态协方差
self.P = np.eye(4)
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
return self.x[:2] # 返回预测的位置
def update(self, z):
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
y = z - self.H @ self.x
self.x = self.x + K @ y
I = np.eye(4)
self.P = (I - K @ self.H) @ self.P
return self.x[:2] # 返回更新后的位置
4. 可视化实现与结果分析
为了直观理解卡尔曼滤波的工作原理,我们使用Matplotlib进行可视化:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
def simulate_and_visualize():
# 真实轨迹 (圆周运动)
t = np.linspace(0, 10, 100)
true_x = 5 * np.cos(t)
true_y = 5 * np.sin(t)
# 添加噪声的观测
np.random.seed(42)
observed_x = true_x + np.random.normal(0, 1, len(t))
observed_y = true_y + np.random.normal(0, 1, len(t))
# 初始化卡尔曼滤波器
kf = KalmanFilter2D(np.array([5, 0, 0, 1]))
# 存储滤波结果
filtered_x = []
filtered_y = []
# 运行滤波
for i in range(len(t)):
kf.predict()
pos = kf.update(np.array([observed_x[i], observed_y[i]]))
filtered_x.append(pos[0])
filtered_y.append(pos[1])
# 绘制结果
fig, ax = plt.subplots(figsize=(10, 8))
ax.plot(true_x, true_y, 'g-', label='真实轨迹')
ax.plot(observed_x, observed_y, 'r.', label='观测值', alpha=0.5)
ax.plot(filtered_x, filtered_y, 'b-', label='卡尔曼滤波')
ax.legend()
ax.set_title('二维卡尔曼滤波效果对比')
ax.set_xlabel('X位置')
ax.set_ylabel('Y位置')
ax.grid(True)
plt.show()
simulate_and_visualize()
可视化结果分析 :
- 绿色曲线 :物体的真实运动轨迹(圆周运动)
- 红色点 :带有噪声的观测值
- 蓝色曲线 :卡尔曼滤波估计的轨迹
从图中可以明显看出,卡尔曼滤波能够有效减少观测噪声的影响,得到接近真实轨迹的估计结果。
5. 参数调优与性能分析
卡尔曼滤波的性能很大程度上取决于参数的选择。以下是关键参数的调优建议:
过程噪声协方差Q :
- 反映系统模型的不确定性
- 值越大,滤波器对预测的信任度越低
- 对于运动平稳的系统,Q应设较小值
观测噪声协方差R :
- 反映测量设备的精度
- 值越大,滤波器对观测的信任度越低
- 对于高精度传感器,R应设较小值
初始协方差P :
- 反映初始状态的不确定性
- 通常可以设为较大的值,让滤波器快速收敛
调优示例 :
# 调优后的参数设置示例
def optimized_kalman_filter():
kf = KalmanFilter2D(np.array([5, 0, 0, 1]))
kf.Q = np.diag([0.05, 0.05, 0.1, 0.1]) # 较小的过程噪声
kf.R = np.diag([0.5, 0.5]) # 适中的观测噪声
kf.P = np.diag([1.0, 1.0, 2.0, 2.0]) # 较大的初始不确定性
return kf
6. 实际应用案例:目标跟踪
卡尔曼滤波在计算机视觉的目标跟踪中有广泛应用。下面是一个简化的目标跟踪实现:
class ObjectTracker:
def __init__(self):
self.tracks = []
self.next_id = 0
def update(self, detections):
# 预测现有轨迹
for track in self.tracks:
track['kf'].predict()
# 数据关联 (简化的最近邻匹配)
if detections and self.tracks:
dists = np.zeros((len(self.tracks), len(detections)))
for i, track in enumerate(self.tracks):
for j, det in enumerate(detections):
pred_pos = track['kf'].x[:2]
dists[i,j] = np.linalg.norm(pred_pos - det)
# 简单的最小距离匹配
rows, cols = linear_sum_assignment(dists)
for r, c in zip(rows, cols):
if dists[r,c] < 50: # 匹配阈值
self.tracks[r]['kf'].update(detections[c])
self.tracks[r]['misses'] = 0
else:
self.tracks[r]['misses'] += 1
# 移除丢失的轨迹
self.tracks = [t for t in self.tracks if t['misses'] < 3]
# 为新检测���建轨迹
for det in detections:
if all(np.linalg.norm(t['kf'].x[:2] - det) > 30 for t in self.tracks):
kf = KalmanFilter2D(np.array([det[0], det[1], 0, 0]))
self.tracks.append({
'id': self.next_id,
'kf': kf,
'misses': 0
})
self.next_id += 1
return self.tracks
使用示例 :
# 模拟检测数据
detections = [
np.array([10, 20]),
np.array([30, 40]),
np.array([50, 60])
]
tracker = ObjectTracker()
tracks = tracker.update(detections)
for track in tracks:
print(f"Track {track['id']}: Position {track['kf'].x[:2]}")
7. 高级话题与扩展
7.1 非线性系统:扩展卡尔曼滤波(EKF)
当系统模型或观测模型为非线性时,可以使用扩展卡尔曼滤波:
class ExtendedKalmanFilter:
def __init__(self, initial_state):
self.x = initial_state
self.P = np.eye(len(initial_state))
self.Q = np.eye(len(initial_state)) * 0.1
self.R = np.eye(2) * 1.0
def predict(self, f, F_jacobian):
# 非线性状态转移
self.x = f(self.x)
# 计算雅可比矩阵
F = F_jacobian(self.x)
self.P = F @ self.P @ F.T + self.Q
def update(self, z, h, H_jacobian):
# 非线性观测
z_pred = h(self.x)
# 计算雅可比矩阵
H = H_jacobian(self.x)
S = H @ self.P @ H.T + self.R
K = self.P @ H.T @ np.linalg.inv(S)
y = z - z_pred
self.x = self.x + K @ y
self.P = (np.eye(len(self.x)) - K @ H) @ self.P
7.2 无迹卡尔曼滤波(UKF)
UKF使用sigma点来近似非线性变换,通常比EKF有更好的性能:
def unscented_transform(sigma_points, mean_weights, cov_weights):
# 计算变换后的均值和协方差
new_mean = np.sum(mean_weights * sigma_points, axis=0)
diff = sigma_points - new_mean
new_cov = np.sum(cov_weights * (diff[:,:,None] @ diff[:,None,:]), axis=0)
return new_mean, new_cov
class UnscentedKalmanFilter:
def __init__(self, initial_state, alpha=1e-3, beta=2, kappa=0):
self.x = initial_state
self.P = np.eye(len(initial_state))
self.n = len(initial_state)
self.lambda_ = alpha**2 * (self.n + kappa) - self.n
self.weights_mean = np.full(2*self.n+1, 1/(2*(self.n+self.lambda_)))
self.weights_mean[0] = self.lambda_/(self.n+self.lambda_)
self.weights_cov = self.weights_mean.copy()
self.weights_cov[0] += (1 - alpha**2 + beta)
def generate_sigma_points(self):
sigma_points = np.zeros((2*self.n+1, self.n))
sigma_points[0] = self.x
sqrt_P = np.linalg.cholesky((self.n + self.lambda_)*self.P)
for i in range(self.n):
sigma_points[i+1] = self.x + sqrt_P[i]
sigma_points[self.n+i+1] = self.x - sqrt_P[i]
return sigma_points
def predict(self, f):
sigma_points = self.generate_sigma_points()
transformed_points = np.array([f(point) for point in sigma_points])
self.x, self.P = unscented_transform(transformed_points,
self.weights_mean,
self.weights_cov)
self.P += self.Q
def update(self, z, h):
sigma_points = self.generate_sigma_points()
transformed_points = np.array([h(point) for point in sigma_points])
z_pred, Pzz = unscented_transform(transformed_points,
self.weights_mean,
self.weights_cov)
Pzz += self.R
# 计算互协方差
diff_x = sigma_points - self.x
diff_z = transformed_points - z_pred
Pxz = np.sum(self.weights_cov * (diff_x[:,:,None] @ diff_z[:,None,:]), axis=0)
# 计算卡尔曼增益
K = Pxz @ np.linalg.inv(Pzz)
# 状态更新
self.x = self.x + K @ (z - z_pred)
self.P = self.P - K @ Pzz @ K.T
8. 性能优化技巧
数值稳定性优化 :
- 使用平方根形式实现协方差更新
- 避免矩阵求逆,使用Cholesky分解
- 定期检查协方差矩阵的正定性
计算效率优化 :
- 预计算不变矩阵
- 利用稀疏矩阵特性
- 并行化计算
代码示例:平方根卡尔曼滤波 :
def sqrt_kalman_update(x, P_sqrt, H, R_sqrt, z):
# 预测残差
y = z - H @ x
# 计算中间矩阵
PHt = P_sqrt @ (P_sqrt.T @ H.T)
S_sqrt = np.linalg.qr(np.vstack([
H @ PHt,
R_sqrt
]), mode='r')
# 计算卡尔曼增益
K = PHt @ np.linalg.inv(S_sqrt.T @ S_sqrt)
# 状态更新
x_new = x + K @ y
# 协方差更新
temp = K @ S_sqrt
P_sqrt_new = np.linalg.qr(np.hstack([
P_sqrt - temp @ H,
temp
]), mode='r')
return x_new, P_sqrt_new
更多推荐

所有评论(0)