Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single serial port object #99

Merged
merged 4 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ venv
venus-data.tar.gz
BMS-trials
.DS_Store
.project
.pydevproject
*.prefs
117 changes: 86 additions & 31 deletions etc/dbus-serialbattery/daly.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ def __init__(self, port,baud,address):
TEMP_ZERO_CONSTANT = 40

def test_connection(self):
return self.read_status_data()
# return self.read_status_data()
result = False
ser = open_serial_port(self.port, self.baud_rate)
if ser is not None:
result = self.read_status_data(ser)
ser.close()

return result

def get_settings(self):
self.capacity = BATTERY_CAPACITY
Expand All @@ -46,27 +53,59 @@ def get_settings(self):
return True

def refresh_data(self):
result = self.read_soc_data()
result = result and self.read_fed_data()
if self.poll_step == 0:
# result = self.read_soc_data()
# result = result and self.read_fed_data()
# if self.poll_step == 0:
# This must be listed in step 0 as get_min_cell_voltage and get_max_cell_voltage in battery.py needs it at first cycle for publish_dbus in dbushelper.py
result = result and self.read_cell_voltage_range_data()
elif self.poll_step == 1:
result = result and self.read_alarm_data()
elif self.poll_step == 2:
result = result and self.read_cells_volts()
elif self.poll_step == 3:
result = result and self.read_temperature_range_data()
# result = result and self.read_cell_voltage_range_data()
# elif self.poll_step == 1:
# result = result and self.read_alarm_data()
# elif self.poll_step == 2:
# result = result and self.read_cells_volts()
# elif self.poll_step == 3:
# result = result and self.read_temperature_range_data()
#else: # A placeholder to remind this is the last step. Add any additional steps before here
# This is last step so reset poll_step
self.poll_step = -1

self.poll_step += 1
# self.poll_step = -1

# self.poll_step += 1

result = False
# Open serial port to be used for all data reads instead of openning multiple times
ser = open_serial_port(self.port, self.baud_rate)
if ser is not None:
#logger.warning("read_soc_data")
result = self.read_soc_data(ser)
#logger.warning("read_fed_data")
result = result and self.read_fed_data(ser)
#logger.warning("poll_step: " + str(self.poll_step))
if self.poll_step == 0:
# This must be listed in step 0 as get_min_cell_voltage and get_max_cell_voltage in battery.py needs it at first cycle for publish_dbus in dbushelper.py
#logger.warning("read_cell_voltage_range_data")
result = result and self.read_cell_voltage_range_data(ser)
elif self.poll_step == 1:
#logger.warning("read_alarm_data")
result = result and self.read_alarm_data(ser)
elif self.poll_step == 2:
#logger.warning("read_cells_volts")
result = result and self.read_cells_volts(ser)
elif self.poll_step == 3:
#logger.warning("read_temperature_range_data")
result = result and self.read_temperature_range_data(ser)
#else: # A placeholder to remind this is the last step. Add any additional steps before here
# This is last step so reset poll_step
self.poll_step = -1

self.poll_step += 1

ser.close()

return result

def read_status_data(self):
status_data = self.read_serial_data_daly(self.command_status)
# def read_status_data(self):
def read_status_data(self, ser):
# status_data = self.read_serial_data_daly(self.command_status)
status_data = self.read_serial_data_daly(ser, self.command_status)
# check if connection success
if status_data is False:
logger.warning("read_status_data")
Expand All @@ -82,7 +121,8 @@ def read_status_data(self):
logger.info(self.hardware_version)
return True

def read_soc_data(self):
# def read_soc_data(self):
def read_soc_data(self, ser):
# Ensure data received is valid
crntMinValid = -(MAX_BATTERY_DISCHARGE_CURRENT * 2.1)
crntMaxValid = (MAX_BATTERY_CURRENT * 1.3)
Expand All @@ -92,11 +132,13 @@ def read_soc_data(self):
# Try up to 3 times to get data. This greatly increases soc_data collection with Daly
triesData = 3
while soc_data is False and triesData > 0:
soc_data = self.read_serial_data_daly(self.command_soc)
# soc_data = self.read_serial_data_daly(self.command_soc)
soc_data = self.read_serial_data_daly(ser, self.command_soc)
#if soc_data is False:
# logger.warning("read_soc_data - triesData " + str(triesData))
triesData -= 1
# check if connection success
if soc_data is False:
logger.warning("read_soc_data")
return False

voltage, tmp, current, soc = unpack_from('>hhhh', soc_data)
Expand All @@ -107,12 +149,15 @@ def read_soc_data(self):
self.soc = (soc / 10)
return True

logger.warning("read_soc_data - triesValid " + str(triesValid))
triesValid -= 1

return False

def read_alarm_data(self):
alarm_data = self.read_serial_data_daly(self.command_alarm)
# def read_alarm_data(self):
def read_alarm_data(self, ser):
# alarm_data = self.read_serial_data_daly(self.command_alarm)
alarm_data = self.read_serial_data_daly(ser, self.command_alarm)
# check if connection success
if alarm_data is False:
logger.warning("read_alarm_data")
Expand Down Expand Up @@ -214,7 +259,8 @@ def read_alarm_data(self):

return True

def read_cells_volts(self):
# def read_cells_volts(self):
def read_cells_volts(self, ser):
if self.cell_count is not None:
buffer = bytearray(self.cellvolt_buffer)
buffer[1] = self.command_address[0] # Always serial 40 or 80
Expand All @@ -223,7 +269,8 @@ def read_cells_volts(self):
maxFrame = (int(self.cell_count / 3) + 1)
lenFixed = (maxFrame * 12)

cells_volts_data = read_serial_data(buffer, self.port, self.baud_rate, self.LENGTH_POS, self.LENGTH_CHECK, lenFixed)
# cells_volts_data = read_serial_data(buffer, self.port, self.baud_rate, self.LENGTH_POS, self.LENGTH_CHECK, lenFixed)
cells_volts_data = read_serialport_data(ser, buffer, self.LENGTH_POS, self.LENGTH_CHECK, lenFixed)
if cells_volts_data is False:
logger.warning("read_cells_volts")
return False
Expand All @@ -246,8 +293,10 @@ def read_cells_volts(self):

return True

def read_cell_voltage_range_data(self):
minmax_data = self.read_serial_data_daly(self.command_minmax_cell_volts)
# def read_cell_voltage_range_data(self):
def read_cell_voltage_range_data(self, ser):
# minmax_data = self.read_serial_data_daly(self.command_minmax_cell_volts)
minmax_data = self.read_serial_data_daly(ser, self.command_minmax_cell_volts)
# check if connection success
if minmax_data is False:
logger.warning("read_cell_voltage_range_data")
Expand All @@ -262,8 +311,10 @@ def read_cell_voltage_range_data(self):
self.cell_min_voltage = cell_min_voltage / 1000
return True

def read_temperature_range_data(self):
minmax_data = self.read_serial_data_daly(self.command_minmax_temp)
# def read_temperature_range_data(self):
def read_temperature_range_data(self, ser):
# minmax_data = self.read_serial_data_daly(self.command_minmax_temp)
minmax_data = self.read_serial_data_daly(ser, self.command_minmax_temp)
# check if connection success
if minmax_data is False:
logger.warning("read_temperature_range_data")
Expand All @@ -274,8 +325,10 @@ def read_temperature_range_data(self):
self.temp2 = max_temp - self.TEMP_ZERO_CONSTANT
return True

def read_fed_data(self):
fed_data = self.read_serial_data_daly(self.command_fet)
# def read_fed_data(self):
def read_fed_data(self, ser):
# fed_data = self.read_serial_data_daly(self.command_fet)
fed_data = self.read_serial_data_daly(ser, self.command_fet)
# check if connection success
if fed_data is False:
logger.warning("read_fed_data")
Expand All @@ -292,8 +345,10 @@ def generate_command(self, command):
buffer[12] = sum(buffer[:12]) & 0xFF #checksum calc
return buffer

def read_serial_data_daly(self, command):
data = read_serial_data(self.generate_command(command), self.port, self.baud_rate, self.LENGTH_POS, self.LENGTH_CHECK)
# def read_serial_data_daly(self, command):
def read_serial_data_daly(self, ser, command):
# data = read_serial_data(self.generate_command(command), self.port, self.baud_rate, self.LENGTH_POS, self.LENGTH_CHECK)
data = read_serialport_data(ser, self.generate_command(command), self.LENGTH_POS, self.LENGTH_CHECK)
if data is False:
return False

Expand Down
72 changes: 72 additions & 0 deletions etc/dbus-serialbattery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,76 @@ def read_serial_data(command, port, baud, length_pos, length_check, length_fixed
except serial.SerialException as e:
logger.error(e)
return False


# Open the serial port
# Return variable for the openned port
def open_serial_port(port, baud):
ser = None
tries = 3
while tries > 0:
try:
ser = serial.Serial(port, baudrate=baud, timeout=0.1)
tries = 0
except serial.SerialException as e:
logger.error(e)
tries -= 1

return ser

# Read data from previously openned serial port
def read_serialport_data(ser, command, length_pos, length_check, length_fixed=None, length_size=None):
try:
ser.flushOutput()
ser.flushInput()
ser.write(command)

length_byte_size = 1
if length_size is not None:
if length_size.upper() == 'H':
length_byte_size = 2
elif length_size.upper() == 'I' or length_size.upper() == 'L':
length_byte_size = 4

count = 0
toread = ser.inWaiting()

while toread < (length_pos+length_byte_size):
sleep(0.005)
toread = ser.inWaiting()
count += 1
if count > 50:
#logger.error(">>> ERROR: No reply - returning")
return False

#logger.info('serial data toread ' + str(toread))
res = ser.read(toread)
if length_fixed is not None:
length = length_fixed
else:
if len(res) < (length_pos+length_byte_size):
logger.error(">>> ERROR: No reply - returning [len:" + str(len(res)) + "]")
return False
length_size = length_size if length_size is not None else 'B'
length = unpack_from('>'+length_size, res,length_pos)[0]

#logger.info('serial data length ' + str(length))

count = 0
data = bytearray(res)
while len(data) <= length + length_check:
res = ser.read(length + length_check)
data.extend(res)
#logger.info('serial data length ' + str(len(data)))
sleep(0.005)
count += 1
if count > 150:
logger.error(">>> ERROR: No reply - returning [len:" + str(len(data)) + "/" + str(length + length_check) + "]")
return False

return data

except serial.SerialException as e:
logger.error(e)
return False