Skip to content

Commit

Permalink
restabilized test
Browse files Browse the repository at this point in the history
  • Loading branch information
AviaAv committed Feb 29, 2024
1 parent 909ef91 commit c3a48e2
Showing 1 changed file with 73 additions and 93 deletions.
166 changes: 73 additions & 93 deletions unit-tests/live/d400/test-hdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
device.hardware_reset()
time.sleep(5)



# HDR STREAMING TESTS
with test.closure("HDR Streaming - default config"):
Expand Down Expand Up @@ -110,6 +109,7 @@
else:
test.check(frame_exposure == exposure_range.min)
test.check(frame_gain == gain_range.min)
pipe.stop()
depth_sensor.set_option(rs.option.hdr_enabled, 0) # disable hdr before next tests


Expand Down Expand Up @@ -166,37 +166,30 @@
test.check(frame_exposure == second_exposure)
test.check(frame_gain == second_gain)

pipe.stop()
depth_sensor.set_option(rs.option.hdr_enabled, 0) # disable hdr before next tests
device.hardware_reset()
time.sleep(5)



# CHECKING HDR AFTER PIPE RESTART

with test.closure("HDR Running - restart hdr at restream"):
device = test.find_first_device_or_exit()
depth_sensor = device.first_depth_sensor()
if depth_sensor and depth_sensor.supports(rs.option.hdr_enabled):

cfg = rs.config()
cfg.enable_stream(rs.stream.depth)
pipe = rs.pipeline()
pipe.start(cfg)

depth_sensor.set_option(rs.option.hdr_enabled, 1)
test.check(depth_sensor.get_option(rs.option.hdr_enabled) == 1)

for i in range(10):
data = pipe.wait_for_frames()

test.check(depth_sensor.get_option(rs.option.hdr_enabled) == 1)
pipe.stop()

pipe.start(cfg)
test.check(depth_sensor.get_option(rs.option.hdr_enabled) == 1)
pipe.stop()

depth_sensor.set_option(rs.option.hdr_enabled, 0) # disable hdr before next tests


Expand Down Expand Up @@ -229,7 +222,6 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):
# get depth frame data
depth_frame = data.get_depth_frame()
if not test.check(depth_frame.supports_frame_metadata(rs.frame_metadata_value.sequence_id)):
log.e("Frame metadata not supported")
return

depth_counter = depth_frame.get_frame_metadata(rs.frame_metadata_value.frame_counter)
Expand All @@ -239,7 +231,6 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):

# get hdr frame data
if not test.check(merged_frame.supports_frame_metadata(rs.frame_metadata_value.sequence_id)):
log.e("Frame metadata not supported")
return

hdr_counter = merged_frame.get_frame_metadata(rs.frame_metadata_value.frame_counter)
Expand All @@ -254,7 +245,6 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):
prev_depth_counter = depth_counter



# CHECKING HDR MERGE AFTER HDR RESTART
with test.closure("HDR Running - hdr merge after hdr restart"):
device = test.find_first_device_or_exit()
Expand Down Expand Up @@ -289,6 +279,66 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):
pipe.stop()
depth_sensor.set_option(rs.option.hdr_enabled, 0) # disable hdr before next tests


def check_sequence_id_on_frame(frame, prev_frame_counter, old_sequence_id):
"""
given a frame and values from a previous frame
this function checks if frames are sequential, if so test that sequence id is as expected
"""
check_ok = False
frame_counter = frame.get_frame_metadata(rs.frame_metadata_value.frame_counter)
depth_seq_id = frame.get_frame_metadata(rs.frame_metadata_value.sequence_id)
if frame_counter != prev_frame_counter + 1: # can only compare sequential frames
sequence_id = int(depth_seq_id)
else:
sequence_id = 1 if old_sequence_id == 0 else 0

test.info("expected sequence id:", sequence_id)
test.info("frame seq id:", depth_seq_id)
test.info("frame counter:", frame_counter)
test.info("prev frame counter:", prev_frame_counter)
if test.check(sequence_id == depth_seq_id):
check_ok = True

prev_frame_counter = frame_counter

return check_ok, prev_frame_counter, sequence_id


def check_sequence_id(pipe):
"""
given a started pipe, this function is making sure the sequence id for the depth and ir streams is ok.
"""
depth_seq_id = -1
ir_seq_id = -1
iterations_for_preparation = 14
prev_frame_counter = -1
prev_ir_frame_counter = -1
seq_id_check_depth = True
seq_id_check_ir = True
checks_ok = 0
while checks_ok < 50:
data = pipe.wait_for_frames()
if iterations_for_preparation > 0:
iterations_for_preparation -= 1
continue

depth_frame = data.get_depth_frame()
ir_frame = data.get_infrared_frame(1)

if depth_frame.supports_frame_metadata(rs.frame_metadata_value.sequence_id):
seq_id_check_depth, prev_frame_counter, depth_seq_id = (
check_sequence_id_on_frame(depth_frame, prev_frame_counter, depth_seq_id))

if ir_frame.supports_frame_metadata(rs.frame_metadata_value.sequence_id):
seq_id_check_ir, prev_ir_frame_counter, ir_seq_id = (
check_sequence_id_on_frame(depth_frame, prev_ir_frame_counter, ir_seq_id))

# if both frames are sequential (after the previous ones), and sequence id test passes, that check is ok
if seq_id_check_depth and seq_id_check_ir:
checks_ok += 1


# CHECKING SEQUENCE ID WHILE STREAMING
with test.closure("HDR Streaming - checking sequence id"):
device = test.find_first_device_or_exit()
Expand All @@ -303,99 +353,30 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):
depth_sensor.set_option(rs.option.hdr_enabled, 1)
test.check(depth_sensor.get_option(rs.option.hdr_enabled) == 1)

sequence_id = -1
iterations_for_preparation = 6
prev_frame_counter = -1
checks_ok = 0
while checks_ok < 50:
data = pipe.wait_for_frames()
if iterations_for_preparation > 0:
iterations_for_preparation -= 1
continue

depth_frame = data.get_depth_frame()
ir_frame = data.get_infrared_frame(1)

if depth_frame.supports_frame_metadata(rs.frame_metadata_value.sequence_id):
frame_counter = depth_frame.get_frame_metadata(rs.frame_metadata_value.frame_counter)
depth_seq_id = depth_frame.get_frame_metadata(rs.frame_metadata_value.sequence_id)
if frame_counter != prev_frame_counter + 1: # can only compare sequential frames
sequence_id = int(depth_seq_id)
else:
sequence_id = 1 if sequence_id == 0 else 0

test.info("sequence_id", sequence_id)
test.info("depth_seq_id", depth_seq_id)
test.info("frame_counter", frame_counter)
test.info("prev_frame_counter", prev_frame_counter)
if test.check(sequence_id == depth_seq_id):
checks_ok += 1

prev_frame_counter = frame_counter

if ir_frame.supports_frame_metadata(rs.frame_metadata_value.sequence_id):
ir_seq_id = ir_frame.get_frame_metadata(rs.frame_metadata_value.sequence_id)
test.info("sequence_id", sequence_id)
test.info("depth_seq_id", ir_seq_id)
test.info("frame_counter", frame_counter)
test.info("prev_frame_counter", prev_frame_counter)
test.check(sequence_id == ir_seq_id)
check_sequence_id(pipe)

pipe.stop()
depth_sensor.set_option(rs.option.hdr_enabled, 0) # disable hdr before next tests
device.hardware_reset()
time.sleep(5)


with test.closure("Emitter on/off - checking sequence id"):
device = test.find_first_device_or_exit()
depth_sensor = device.first_depth_sensor()

if depth_sensor and depth_sensor.supports(rs.option.hdr_enabled):
if depth_sensor and depth_sensor.supports(rs.option.emitter_on_off):
cfg = rs.config()
cfg.enable_stream(rs.stream.depth)
cfg.enable_stream(rs.stream.infrared, 1)
pipe = rs.pipeline()
pipe.start(cfg)

if depth_sensor.supports(rs.option.emitter_on_off):
depth_sensor.set_option(rs.option.emitter_on_off, 1)
test.check(depth_sensor.get_option(rs.option.emitter_on_off) == 1)

sequence_id = -1

# emitter on/off works with PWM (pulse with modulation) in the hardware
# this takes some time to configure it
iterations_for_preparation = 10
prev_frame_counter = -1
checks_ok = 0
while checks_ok < 50: # Application still alive?
data = pipe.wait_for_frames()
if iterations_for_preparation > 0:
iterations_for_preparation -= 1
continue

depth_frame = data.get_depth_frame()
ir_frame = data.get_infrared_frame(1)
depth_sensor.set_option(rs.option.emitter_on_off, 1)
test.check(depth_sensor.get_option(rs.option.emitter_on_off) == 1)

if depth_frame.supports_frame_metadata(rs.frame_metadata_value.sequence_id):
frame_counter = depth_frame.get_frame_metadata(rs.frame_metadata_value.frame_counter)
depth_seq_id = depth_frame.get_frame_metadata(rs.frame_metadata_value.sequence_id)
check_sequence_id(pipe)

if frame_counter != prev_frame_counter + 1: # can only compare sequential frames
sequence_id = int(depth_seq_id)
else:
sequence_id = 1 if sequence_id == 0 else 0

test.info("sequence_id", sequence_id)
test.info("depth_seq_id", depth_seq_id)
test.info("frame_counter", frame_counter)
test.info("prev_frame_counter", prev_frame_counter)
if test.check(sequence_id == depth_seq_id):
checks_ok += 1

if ir_frame.supports_frame_metadata(rs.frame_metadata_value.sequence_id):
ir_seq_id = ir_frame.get_frame_metadata(rs.frame_metadata_value.sequence_id)
test.check(sequence_id == ir_seq_id)
prev_frame_counter = frame_counter
pipe.stop()
depth_sensor.set_option(rs.option.emitter_on_off, 0)

Expand Down Expand Up @@ -436,7 +417,7 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):

pipe.stop()

if at_least_one_frame_supported_seq_id:
if test.check(at_least_one_frame_supported_seq_id):
test.check(first_series_last_merged_ts != -1)
test.check(depth_sensor.get_option(rs.option.hdr_enabled) == 1)
pipe.start(cfg)
Expand All @@ -453,8 +434,6 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):
frame_ts = merged_depth_frame.get_frame_metadata(rs.frame_metadata_value.frame_timestamp)
test.check(frame_ts > first_series_last_merged_ts)
pipe.stop()
else:
log.e("sequence_id metadata was not supported - skipping test")

depth_sensor.set_option(rs.option.hdr_enabled, 0) # disable hdr before next tests

Expand Down Expand Up @@ -507,11 +486,11 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):
test.info("iteration_to_check_after_disable", iteration_to_check_after_disable)
test.check(frame_exposure == exposure_before_hdr)

pipe.stop()
device.hardware_reset()
time.sleep(5)



# CONTROLS STABILITY WHILE HDR ACTIVE
with test.closure("HDR Active - set locked options"):
device = test.find_first_device_or_exit()
Expand Down Expand Up @@ -552,7 +531,6 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):
time.sleep(5)



with test.closure("HDR Streaming - set locked options"):
device = test.find_first_device_or_exit()
depth_sensor = device.first_depth_sensor()
Expand Down Expand Up @@ -587,6 +565,8 @@ def check_hdr_frame_counter(pipe, num_of_frames, merging_filter):

depth_sensor.set_option(rs.option.laser_power, laser_power_before_hdr - 30)
test.check(depth_sensor.get_option(rs.option.laser_power) == laser_power_before_hdr)

pipe.stop()
depth_sensor.set_option(rs.option.hdr_enabled, 0) # disable hdr before next tests
device.hardware_reset()
time.sleep(5)
Expand Down

0 comments on commit c3a48e2

Please sign in to comment.