-
Notifications
You must be signed in to change notification settings - Fork 0
/
knn.py
138 lines (106 loc) · 4.52 KB
/
knn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import collections
import operator
from creme import base
__all__ = ["KNeighborsRegressor", "KNeighborsClassifier"]
def minkowski_distance(a: dict, b: dict, p: int):
    """Minkowski distance between two feature dicts, without the final root.

    Keys missing from either dict are treated as 0. Note the ``1/p`` root is
    deliberately not applied: the raw ``sum(|a_k - b_k| ** p)`` is returned,
    which preserves ordering for nearest-neighbour ranking.

    Parameters:
        a: First feature dict.
        b: Second feature dict.
        p: Power parameter for the Minkowski distance. When `p=1`, this is
            equivalent (up to the missing root) to using the Manhattan
            distance. When `p=2`, this is equivalent to using the Euclidean
            distance.
    """
    total = 0
    for key in set(a) | set(b):
        total += abs(a.get(key, 0.0) - b.get(key, 0.0)) ** p
    return total
class NearestNeighbours:
    """Sliding-window store of `(x, y, identifier)` points with brute-force search.

    Parameters:
        window_size: Maximum number of points kept; when full, the oldest
            point is evicted first (FIFO).
        p: Power parameter for the Minkowski distance.
        min_distance_keep: A new point whose distance to any existing
            neighbour is below this threshold is considered a near-duplicate
            and is not added to the window.
    """

    def __init__(self, window_size, p, min_distance_keep=0.05):
        self.window_size = window_size
        self.p = p
        self.window = []
        # Quick lookup of keys already stored, so exact duplicates can be
        # skipped without computing any distances.
        self.lookup = set()
        self.min_distance_keep = min_distance_keep

    @staticmethod
    def _key(x):
        """Stable string key for a feature dict.

        Includes both keys AND values (the original joined dict keys only,
        so distinct points with the same feature names collided), and sorts
        so dict insertion order doesn't matter.
        """
        return " ".join(f"{k}={x[k]}" for k in sorted(x))

    def update(self, x, y, identifier, k):
        """Add `(x, y, identifier)` to the window unless `x` is an exact or
        near duplicate of a stored point. Returns self.

        Parameters:
            x: Feature dict.
            y: Target value stored alongside `x`.
            identifier: Caller-supplied id stored alongside `x`.
            k: Number of neighbours to inspect for the near-duplicate check.
        """
        key = self._key(x)
        # Exact duplicate: skip without computing distances.
        if key in self.lookup:
            return self

        # Near duplicate: any stored point closer than the threshold means
        # we don't keep x. (n[3] is the distance — see find_nearest.)
        nearest = self.find_nearest(x, k)
        if any(n[3] < self.min_distance_keep for n in nearest):
            return self

        # Evict the oldest point once the window is full. The same _key is
        # used for add and remove, so the lookup stays consistent (the
        # original added a differently-built key and raised KeyError here).
        if len(self.window) >= self.window_size:
            removed = self.window.pop(0)
            self.lookup.discard(self._key(removed[0]))

        self.window.append((x, y, identifier))
        self.lookup.add(key)
        return self

    def find_nearest(self, x, k):
        """Returns the `k` closest points to `x`, along with their distances."""
        # Compute the distances to each point in the window.
        points = ((*p, minkowski_distance(a=x, b=p[0], p=self.p)) for p in self.window)
        # Return the k closest points; index 3 is the distance (preceded by
        # x, y, identifier).
        return sorted(points, key=operator.itemgetter(3))[:k]
class KNeighborsClassifier(base.Classifier):
    """K-Nearest Neighbors (KNN) over a sliding window, modified to return neighbours.

    This works by storing a buffer with the `window_size` most recent observations. A brute-force
    search is used to find the `n_neighbors` nearest observations in the buffer.

    NOTE(review): unlike a standard creme classifier, `predict_one` and
    `predict_proba_one` here return the raw list of nearest points
    `(x, y, identifier, distance)` rather than a label or a probability
    mapping, and `fit_one` takes an extra `identifier` argument — so this
    class is not drop-in compatible with `evaluate.progressive_val_score`.

    Parameters:
        n_neighbors: Number of neighbors to use.
        window_size: Size of the sliding window used to search neighbors with.
        p: Power parameter for the Minkowski metric. When `p=1`, this corresponds to the
            Manhattan distance, while `p=2` corresponds to the Euclidean distance.
        min_distance_keep: Minimum distance below which a new point is treated as a
            near-duplicate of a stored point and is not added to the window.
    """
    def __init__(self, n_neighbors=5, window_size=50, p=2, min_distance_keep=0.05):
        self.n_neighbors = n_neighbors
        self.window_size = window_size
        self.p = p
        # Class labels seen so far via `fit_one`.
        self.classes = set()
        # Backing store: sliding window + brute-force neighbour search.
        self._nn = NearestNeighbours(
            window_size=window_size, p=p, min_distance_keep=min_distance_keep
        )
    @property
    def _multiclass(self):
        # More than two classes are supported.
        return True
    def fit_one(self, x, y, identifier):
        """Record the label and store `(x, y, identifier)` in the window.

        Parameters:
            x: A dictionary of features.
            y: The class label.
            identifier: Caller-supplied id stored alongside the point.
        Returns:
            self
        """
        self.classes.add(y)
        self._nn.update(x, y, identifier, k=self.n_neighbors)
        return self
    def predict_one(self, x: dict):
        """Return the nearest neighbors of `x` (NOT a predicted label).

        Parameters:
            x: A dictionary of features.
        Returns:
            The `n_neighbors` nearest stored points as
            `(x, y, identifier, distance)` tuples.
        """
        return self.predict_proba_one(x)
    def predict_proba_one(self, x):
        """
        This is modified to just return the nearest, not try to calculate
        a prediction because we just want the points back.
        """
        return self._nn.find_nearest(x=x, k=self.n_neighbors)