-
Notifications
You must be signed in to change notification settings - Fork 0
/
plotting.py
138 lines (109 loc) · 4.17 KB
/
plotting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import picoS2000aRealtimeStreaming as pico
from PySide6 import QtWidgets
from PySide6.QtCore import QThread, Signal
import pyqtgraph as pg
class DataFetcher(QThread):
    """Worker thread that polls a set of channels and publishes the readings.

    On every polling cycle one value per channel is read through
    ``pico.get_value`` and the resulting list is emitted via ``data_fetched``.
    """

    # Emitted once per polling cycle with one value per channel, in the same
    # order as ``channels``.
    data_fetched = Signal(list)

    # Pause between two polling cycles, in milliseconds. Without it the loop
    # spins flat out and floods the receiver with signals.
    POLL_INTERVAL_MS = 25

    def __init__(self, channels):
        """
        Initialize the data fetcher.

        Args:
            channels (list): The channels to poll, passed one by one to
                ``pico.get_value``.

        Attributes:
            channels (list): The channels to poll.
            running (bool): Loop flag; the polling loop exits once it is False.
        """
        super().__init__()
        self.channels = channels
        self.running = True

    def run(self):
        """
        Poll every channel in a loop and emit the readings.

        The loop runs until ``running`` becomes False. Each cycle reads one
        value per channel with ``pico.get_value``, emits the list through
        ``data_fetched`` and then sleeps for ``POLL_INTERVAL_MS`` milliseconds.

        If reading or emitting raises, the error message is printed and
        ``running`` is set to False, which terminates the loop.
        """
        while self.running:
            try:
                values = [pico.get_value(channel) for channel in self.channels]
                self.data_fetched.emit(values)
            except Exception as e:
                # Thread boundary: report the failure and end the loop instead
                # of letting the exception escape from the thread.
                print(f"Error fetching data: {e}")
                self.running = False
            else:
                # Only pause after a successful cycle; on failure we are
                # leaving the loop anyway.
                self.msleep(self.POLL_INTERVAL_MS)

    def stop(self):
        """
        Ask the polling loop to stop.

        Sets ``running`` to False; the loop in ``run`` exits the next time it
        checks the flag. This call does not wait for the thread to finish.
        """
        self.running = False
class PicoPlotter(QtWidgets.QMainWindow):
    """Live plot of the values produced by a ``DataFetcher``, one curve per channel."""

    def __init__(self, channels, title, parent):
        """
        Initialize the plotter, build its UI and start fetching data.

        Args:
            channels (list): The channels to plot; one curve is created per
                channel.
            title (str): The title shown on the plot.
            parent (QWidget): The parent widget; the plot widget is laid out
                inside it.

        Returns:
            None
        """
        super().__init__(parent)
        self.channels = channels
        self.title = title
        self.widgetParent = parent
        self.initUI(parent)
        # One growing list of samples per channel, index-aligned with
        # self.curves.
        self.data = [[] for _ in channels]
        self.data_fetcher = DataFetcher(channels)
        self.data_fetcher.data_fetched.connect(self.update_plot)
        self.data_fetcher.start()

    def initUI(self, parent):
        """
        Build the plot widget and create one curve per channel.

        The plot widget is added to a new ``QVBoxLayout`` created on
        ``parent``.

        Args:
            parent: The parent widget that receives the layout.

        Returns:
            None
        """
        layout = QtWidgets.QVBoxLayout(parent)
        self.plotWidget = pg.PlotWidget(title=self.title, parent=parent)
        layout.addWidget(self.plotWidget)

        self.plotWidget.setTitle(self.title, color="k", size="18pt")
        self.plotWidget.showGrid(x=True, y=True)
        self.plotWidget.addLegend()
        styles = {"color": "black", "font-size": "18px"}
        self.plotWidget.setLabel('left', 'Tension (mV)', **styles)
        self.plotWidget.setLabel('bottom', 'Temps (tick)', **styles)
        self.plotWidget.setBackground('w')

        lineThickness = 2
        colors = ['r', 'g', 'b', 'y', 'm', 'c']
        self.curves = []
        for i, channel in enumerate(self.channels):
            # Cycle through the palette so that more channels than colours
            # reuse colours instead of raising IndexError.
            pen = pg.mkPen(colors[i % len(colors)], width=lineThickness)
            curve = self.plotWidget.plot(pen=pen, name=f"{channel}")
            self.curves.append(curve)

    def update_plot(self, values):
        """
        Append the new readings and redraw the curves.

        Args:
            values (list): The new data values, one per channel, in channel
                order.

        Returns:
            None
        """
        for i, value in enumerate(values):
            self.data[i].append(value)
            self.curves[i].setData(self.data[i])

    def closeEvent(self, event):
        """
        Handle the close event by stopping the data fetching thread.

        The fetcher is asked to stop and then waited for before the event is
        accepted.

        Args:
            event: The close event.

        Returns:
            None
        """
        self.data_fetcher.stop()
        self.data_fetcher.wait()
        event.accept()
# © AIMA DEVELOPPEMENT 2024