-
Notifications
You must be signed in to change notification settings - Fork 1
/
metrics.py
147 lines (112 loc) · 5.02 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import torch
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import f1_score, confusion_matrix
import core.base as base
import constants
from collections import Counter
class Metric:
    """Abstract contract for evaluation metrics used by the training loop.

    Subclasses must override every hook below:
      * ``__str__``        — human-readable metric name for logging.
      * ``fit``            — prepare the metric from training embeddings/labels.
      * ``calculate_batch``— accumulate predictions for one evaluation batch.
      * ``get``            — return the final value and reset internal state.
    """

    @staticmethod
    def _require_override(method_name):
        # Single shared failure path for every unimplemented hook.
        raise NotImplementedError(f"A metric must implement the method '{method_name}'")

    def __str__(self):
        self._require_override('__str__')

    def fit(self, embeddings, y):
        self._require_override('fit')

    def calculate_batch(self, embeddings, logits, y):
        self._require_override('calculate_batch')

    def get(self):
        self._require_override('get')
class KNNF1ScoreMetric(Metric):
    """Macro F1-score computed with a k-nearest-neighbors classifier.

    For ``neighbors == 1`` the prediction is sklearn's plain k-NN prediction.
    For ``neighbors > 1`` a class-frequency-normalized vote is used: each
    neighbor's vote is divided by its class count in the training set, which
    reduces the bias toward majority classes.
    """

    def __init__(self, distance, neighbors: int = 1):
        # `distance` is a project object exposing `to_sklearn_metric()` —
        # presumably returning a metric name/callable sklearn accepts (TODO confirm).
        self.neighbors = neighbors
        self.knn = KNeighborsClassifier(n_neighbors=neighbors, metric=distance.to_sklearn_metric())
        self.preds, self.y = [], []
        self.train_class_counter = None  # label -> count in the training set
        self.train_y = None              # training labels as an ndarray

    def __str__(self):
        return 'KNN Macro F1-Score'

    def fit(self, embeddings, y):
        """Fit the underlying k-NN on training embeddings and labels."""
        self.knn.fit(embeddings, y)
        # Store as ndarray: `calculate_batch` fancy-indexes this with the 2-D
        # neighbor index matrix, which a plain Python list does not support.
        self.train_y = np.asarray(y)
        self.train_class_counter = Counter(y)

    def calculate_batch(self, embeddings, logits, y):
        """Predict labels for one batch and accumulate them for `get`.

        `logits` is accepted for interface compatibility but unused here.
        """
        if self.neighbors == 1:
            predicted = self.knn.predict(embeddings)
        else:
            _, idx = self.knn.kneighbors(embeddings)
            predicted = []
            for neigh_labels in self.train_y[idx]:
                counter = Counter(neigh_labels)
                # Frequency-normalized vote: rarer training classes weigh more.
                max_vote, max_label = 0, -1
                for label in counter:
                    vote = counter[label] / self.train_class_counter[label]
                    if vote > max_vote:
                        max_vote = vote
                        max_label = label
                predicted.append(max_label)
            predicted = np.array(predicted)
        self.preds.extend(predicted)
        self.y.extend(y)

    def get(self):
        """Return macro F1 over all accumulated batches, then reset state.

        Also prints the confusion matrix as a side effect.
        """
        metric = f1_score(self.y, self.preds, average='macro')
        print(f"Confusion Matrix:\n{confusion_matrix(self.y, self.preds)}")
        self.preds, self.y = [], []
        return metric
class Evaluator(base.TrainingListener):
    """Training listener that evaluates the model on a held-out partition.

    After every epoch it fits `metric` on the features collected during
    training, runs the model over `loader`, logs the metric value, and
    notifies `callbacks` when a new best score is reached.
    """
    def __init__(self, loader, metric, partition_name, callbacks=None):
        # loader: batch source exposing nbatches() and __next__ -> (x, y)
        # metric: object following the Metric protocol (fit / calculate_batch / get)
        # partition_name: label used in printed messages (e.g. 'dev', 'test')
        # callbacks: optional listeners notified around evaluation events
        self.loader = loader
        self.metric = metric
        self.partition_name = partition_name
        self.callbacks = callbacks if callbacks is not None else []
        self.feat_train, self.y_train = None, None
        self.results = []
        # Higher metric value is treated as better (see the `>` comparison below);
        # best_epoch == -1 means no epoch has been evaluated yet.
        self.best_metric, self.best_epoch, self.last_metric = 0, -1, 0
    def eval(self, model):
        """Run `model` over the whole loader, feeding each batch to the metric.

        Returns (feat_test, y_test): concatenated embeddings and labels
        as numpy arrays. Raises if the loader yields no batches
        (np.concatenate on empty lists).
        """
        model.eval()
        feat_test, logits_test, y_test = [], [], []
        for cb in self.callbacks:
            cb.on_before_test()
        with torch.no_grad():
            for i in range(self.loader.nbatches()):
                x, y = next(self.loader)
                # Move only tensors to the configured device; other input
                # structures are passed through unchanged.
                if isinstance(x, torch.Tensor):
                    x = x.to(constants.DEVICE)
                if isinstance(y, torch.Tensor):
                    y = y.to(constants.DEVICE)
                # Feed Forward
                feat, logits = model(x, y)
                feat = feat.detach().cpu().numpy()
                if logits is not None:
                    logits = logits.detach().cpu().numpy()
                # NOTE(review): y.detach() is unconditional although the guard
                # above implies y may not be a tensor — confirm the loader
                # always yields tensor labels.
                y = y.detach().cpu().numpy()
                # Track accuracy
                feat_test.append(feat)
                if logits is not None:
                    logits_test.append(logits)
                y_test.append(y)
                self.metric.calculate_batch(feat, logits, y)
                for cb in self.callbacks:
                    cb.on_batch_tested(i, feat)
        feat_test, y_test = np.concatenate(feat_test), np.concatenate(y_test)
        return feat_test, y_test
    def on_before_epoch(self, epoch):
        # Reset the per-epoch buffers of training features/labels.
        self.feat_train, self.y_train = [], []
    def on_after_gradients(self, epoch, ibatch, feat, logits, y, loss):
        # Accumulate training features/labels batch by batch; the metric is
        # fitted on the concatenation at the end of the epoch.
        self.feat_train.append(feat.detach().cpu().numpy())
        self.y_train.append(y.detach().cpu().numpy())
    def on_after_epoch(self, epoch, model, loss_fn, optim):
        """Fit the metric on this epoch's training data, evaluate, and report.

        Appends the metric value to `self.results`, updates the best-so-far
        tracking, and fires `on_after_test` / `on_best_accuracy` callbacks.
        """
        feat_train = np.concatenate(self.feat_train)
        y_train = np.concatenate(self.y_train)
        self.metric.fit(feat_train, y_train)
        feat_test, y_test = self.eval(model)
        self.last_metric = self.metric.get()
        self.results.append(self.last_metric)
        for cb in self.callbacks:
            cb.on_after_test(epoch, feat_test, y_test, self.last_metric)
        print(f"[{self.partition_name.capitalize()} {self.metric}: {self.last_metric:.6f}]")
        if self.best_epoch != -1:
            print(f"Best until now: {self.best_metric:.6f}, at epoch {self.best_epoch}")
        # Strictly-greater comparison: ties do not count as a new best.
        if self.last_metric > self.best_metric:
            self.best_metric = self.last_metric
            self.best_epoch = epoch
            print(f'New Best {self.partition_name.capitalize()} {self.metric}!')
            for cb in self.callbacks:
                cb.on_best_accuracy(epoch, model, loss_fn, optim, self.last_metric, feat_test, y_test)