-
Notifications
You must be signed in to change notification settings - Fork 1
/
wrappedkalman.py
83 lines (71 loc) · 2.7 KB
/
wrappedkalman.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""
Python implementation of the wrapped kalman filter for bearing tracking
The wrapped Kalman filter infers the hidden state of a linear
determinate system where the state lies on the unit circle.
author: Alex Pollara
Algorithm is described in:
"A Wrapped Kalman Filter for Azimuthal Speaker Tracking"
IEEE Signal Processing Letters, December 2013, Volume: 20 Issue: 12
Pages 1257-1260 (Johannes Traa and Paris Smaragdis)
"""
from numpy import matrix, square, exp, sqrt, pi, array, eye
from numpy.linalg import inv
def wrap(x):
"""
:param x: float
:return: float
"""
# returns the value of an angle wrapped around the unit circle
return ((x + pi) % (2 * pi)) - pi
class WrappedKalman:
def __init__(self, x0, dt, cov, var, l=1, v0=0.0):
"""
:param x0: float
:param dt: float
:param cov: numpy.matrixlib.defmatrix.matrix
:param var: float
:param l: int
:param v0: float
"""
self.A = matrix([[1, dt], [0, 1]]) # state transition matrix
self.C = cov # state covariance matrix
self.cov = cov # process noise
self.B = matrix([1, 0]) # measurement matrix
self.var = var # measurement noise
self.L = range(-l, l + 1) # replicates
self.state = matrix([[x0], [v0]]) # current state
self.predictions = [self.state] # historical states
def step(self, y):
"""
:param y: float
:return: int
"""
# prediction step
# state prediction
state_est = self.A * self.state
# wrap predicted state
state_est[0, 0] = wrap(state_est[0, 0])
# estimate covariance
c_est = self.A * self.C * self.A.T + self.cov
# innovation step
# formulate normal pdf
N = lambda x: (1 / sqrt(2 * pi * self.var)) * exp(-square(x - state_est[0, 0]) / (2 * self.var))
# calculate partial probabilities
p_yl = array([N(y + 2 * pi * l) for l in self.L])
# calculate conditional probabilities
p_y = p_yl / sum(p_yl)
# calculate innovation conditional on l
g_l = array([((y + 2 * pi * l)-state_est[0, 0]) for l in self.L])
# calculate weighted innovation
g = sum(g_l * p_y)
# correction step
# calculate Kalman gain
K = (c_est * self.B.T) * inv(self.B * c_est * self.B.T + self.var)
# update state
self.state = state_est + K * g
# wrap predicted state
self.state[0, 0] = wrap(self.state[0, 0])
# update covariance
self.cov = (eye(self.cov.shape[0]) - K * self.B) * c_est
# add current state to predictions
self.predictions.append(self.state)