-
Notifications
You must be signed in to change notification settings - Fork 0
/
dht11.py
124 lines (105 loc) · 3.48 KB
/
dht11.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import utime
import array
import micropython
import utime
from machine import Pin
from micropython import const
class InvalidChecksum(Exception):
pass
class InvalidPulseCount(Exception):
pass
MAX_UNCHANGED = const(100)
MIN_INTERVAL_US = const(200000)
HIGH_LEVEL = const(50)
EXPECTED_PULSES = const(84)
class DHT11:
_temperature: float
_humidity: float
def __init__(self, pin):
self._pin = pin
self._last_measure = utime.ticks_us()
self._temperature = -1
self._humidity = -1
def measure(self):
current_ticks = utime.ticks_us()
if utime.ticks_diff(current_ticks, self._last_measure) < MIN_INTERVAL_US and (
self._temperature > -1 or self._humidity > -1
):
# Less than a second since last read, which is too soon according
# to the datasheet
return
self._send_init_signal()
pulses = self._capture_pulses()
buffer = self._convert_pulses_to_buffer(pulses)
self._verify_checksum(buffer)
self._humidity = buffer[0] + buffer[1] / 10
self._temperature = buffer[2] + buffer[3] / 10
self._last_measure = utime.ticks_us()
@property
def humidity(self):
self.measure()
return self._humidity
@property
def temperature(self):
self.measure()
return self._temperature
def _send_init_signal(self):
self._pin.init(Pin.OUT, Pin.PULL_DOWN)
self._pin.value(1)
utime.sleep_ms(50)
self._pin.value(0)
utime.sleep_ms(18)
# @micropython.native
def _capture_pulses(self):
pin = self._pin
pin.init(Pin.IN, Pin.PULL_UP)
val = 1
idx = 0
transitions = bytearray(EXPECTED_PULSES)
unchanged = 0
timestamp = utime.ticks_us()
while unchanged < MAX_UNCHANGED:
if val != pin.value():
if idx >= EXPECTED_PULSES:
raise InvalidPulseCount(
"Got more than {} pulses".format(EXPECTED_PULSES)
)
now = utime.ticks_us()
transitions[idx] = now - timestamp
timestamp = now
idx += 1
val = 1 - val
unchanged = 0
else:
unchanged += 1
pin.init(Pin.OUT, Pin.PULL_DOWN)
if idx != EXPECTED_PULSES:
raise InvalidPulseCount(
"Expected {} but got {} pulses".format(EXPECTED_PULSES, idx)
)
return transitions[4:]
def _convert_pulses_to_buffer(self, pulses):
"""Convert a list of 80 pulses into a 5 byte buffer
The resulting 5 bytes in the buffer will be:
0: Integral relative humidity data
1: Decimal relative humidity data
2: Integral temperature data
3: Decimal temperature data
4: Checksum
"""
# Convert the pulses to 40 bits
binary = 0
for idx in range(0, len(pulses), 2):
binary = binary << 1 | int(pulses[idx] > HIGH_LEVEL)
# Split into 5 bytes
buffer = array.array("B")
for shift in range(4, -1, -1):
buffer.append(binary >> shift * 8 & 0xFF)
return buffer
def _verify_checksum(self, buffer):
# Calculate checksum
checksum = 0
for buf in buffer[0:4]:
checksum += buf
if checksum & 0xFF != buffer[4]:
raise InvalidChecksum()