Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer fix: didn't release frames with inactive stream #9593

Merged
merged 5 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 69 additions & 31 deletions src/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ namespace librealsense
}
update_next_expected( matcher, f );

auto const last_arrived = f.frame;
if( ! _frames_queue[matcher.get()].enqueue( std::move( f ) ) )
// If we get stopped, nothing to do!
return;
Expand Down Expand Up @@ -383,7 +384,7 @@ namespace librealsense
LOG_IF_ENABLE( "... missing " << i->get_name() << ", next expected "
<< _next_expected[i],
env );
if( skip_missing_stream( synced_frames, i, env ) )
if( skip_missing_stream( *curr_sync, i, last_arrived, env ) )
{
LOG_IF_ENABLE( "... ignoring it", env );
continue;
Expand Down Expand Up @@ -485,20 +486,17 @@ namespace librealsense
}

bool
frame_number_composite_matcher::skip_missing_stream( std::vector< matcher * > const & synced,
frame_number_composite_matcher::skip_missing_stream( frame_interface const * const synced_frame,
matcher * missing,
frame_interface const * last_arrived,
const syncronization_environment & env )
{
frame_holder* synced_frame;

if(!missing->get_active())
return true;

_frames_queue[synced[0]].peek( [&]( frame_holder & fh ) { synced_frame = &fh; } );

auto next_expected = _next_expected[missing];

if((*synced_frame)->get_frame_number() - next_expected > 4 || (*synced_frame)->get_frame_number() < next_expected)
if(synced_frame->get_frame_number() - next_expected > 4 || synced_frame->get_frame_number() < next_expected)
{
return true;
}
Expand Down Expand Up @@ -563,20 +561,20 @@ namespace librealsense
_last_arrived[m] = now;
}

unsigned int timestamp_composite_matcher::get_fps(const frame_holder & f)
unsigned int timestamp_composite_matcher::get_fps( frame_interface const * f )
{
uint32_t fps = 0;
if(f.frame->supports_frame_metadata(RS2_FRAME_METADATA_ACTUAL_FPS))
if(f->supports_frame_metadata(RS2_FRAME_METADATA_ACTUAL_FPS))
{
fps = (uint32_t)f.frame->get_frame_metadata(RS2_FRAME_METADATA_ACTUAL_FPS);
fps = (uint32_t)f->get_frame_metadata(RS2_FRAME_METADATA_ACTUAL_FPS);
}
if( fps )
{
//LOG_DEBUG( "fps " << fps << " from metadata" );
}
else
{
fps = f.frame->get_stream()->get_framerate();
fps = f->get_stream()->get_framerate();
//LOG_DEBUG( "fps " << fps << " from stream framerate" );
}
return fps;
Expand All @@ -601,8 +599,9 @@ namespace librealsense
// We let skip_missing_stream clean any inactive missing streams
}

bool timestamp_composite_matcher::skip_missing_stream( std::vector< matcher * > const & synced,
bool timestamp_composite_matcher::skip_missing_stream( frame_interface const * waiting_to_be_released,
matcher * missing,
frame_interface const * last_arrived,
const syncronization_environment & env )
{
// true : frameset is ready despite the missing stream (no use waiting) -- "skip" it
Expand All @@ -612,42 +611,78 @@ namespace librealsense
return true;

//LOG_IF_ENABLE( "... matcher " << synced[0]->get_name(), env );
rs2_time_t timestamp;
rs2_timestamp_domain domain;
unsigned int fps;
_frames_queue[synced[0]].peek( [&]( frame_holder & fh ) {
// LOG_IF_ENABLE( "... frame " << fh->frame, env );

timestamp = fh->get_frame_timestamp();
domain = fh->get_frame_timestamp_domain();
fps = get_fps( fh );
} );

auto next_expected = _next_expected[missing];
// LOG_IF_ENABLE( "... next " << std::fixed << next_expected, env );

auto it = _next_expected_domain.find( missing );
if( it != _next_expected_domain.end() )
{
if( it->second != domain )
if( it->second != last_arrived->get_frame_timestamp_domain() )
{
// LOG_IF_ENABLE( "... not the same domain: frameset not ready!", env );
return false;
}
}
// next expected of the missing stream didn't updated yet
if( timestamp > next_expected )

// We want to calculate a cutout for inactive stream detection: if we wait too long past
// this cutout, then the missing stream is inactive and we no longer wait for it.
//
// cutout = next expected + threshold
// threshold = 7 * gap = 7 * (1000 / FPS)
//
//
// E.g.:
//
// D: 100 fps -> 10 ms gap
// C: 10 fps -> 100 ms gap
//
// D C @timestamp
// -- -- ----------
// 1 0 -> release (we don't know about a missing stream yet, so don't get here)
// ...
// 6 50 -> release (D6); next expected (NE) -> 60
// 1 0 -> release (C1); NE -> 100 <----- LATENCY
// 7 ? 60 -> release (D7) (not comparable to NE because we use fps of 100!)
// ...
// 11 ? 100 -> wait for C (now comparable); cutout is 100+7*10 = 170
// 12 ? 110 -> wait (> NE, < cutout)
// ...
// 19 ? 180 > 170 -> release (D11); mark C inactive (> cutout)
// release (D12) ... (D19) (no missing streams, so don't get here)
// 20 190 -> release (D20) (no missing streams, so don't get here)
// ...
//
// But, if C had arrived before D19:
//
// ...
// 2 100 -> release (D11, C2) (nothing missing); NE -> 200
// ? release (D12) ... (D18) (C missing but 100 not comparable to NE 200)
// 19 ? 180 -> release (D19)
// 20 ? 190 -> release (D20)
// 21 ? 200 -> wait for C (comparable to NE 200)
//
//
// The threshold is a function of the FPS, but note we can't keep more frames than
// our queue size and per-stream archive size allow.
auto const fps = get_fps( waiting_to_be_released );

rs2_time_t now = last_arrived->get_frame_timestamp();;
maloel marked this conversation as resolved.
Show resolved Hide resolved
if( now > next_expected )
{
// Wait up to 10*gap for the missing stream frame to arrive -- anything more and we
// Wait for the missing stream frame to arrive -- up to a cutout: anything more and we
// let the frameset be ready without it...
auto gap = 1000.f / fps;
auto threshold = 10 * gap;
if( timestamp - next_expected < threshold )
// NOTE: the threshold is a function of the gap; the bigger it is, the more latency
// between the streams we're willing to live with. Each gap is a frame so we are limited
// by the number of frames we're willing to keep (which is our queue limit)
auto threshold = 7 * gap; // really 7+1 because NE is already 1 away
if( now - next_expected < threshold )
{
// LOG_IF_ENABLE( "... next expected of the missing stream didn't updated yet", env );
//LOG_IF_ENABLE( "... still below threshold of {10*gap}" << threshold, env );
return false;
}
LOG_IF_ENABLE( "... exceeded threshold of {10*gap}" << threshold << "; deactivating matcher!", env );
LOG_IF_ENABLE( "... exceeded cutout of {NE+7*gap}" << ( next_expected + threshold ) << "; deactivating matcher!", env );

auto const q_it = _frames_queue.find( missing );
if( q_it != _frames_queue.end() )
Expand All @@ -656,9 +691,12 @@ namespace librealsense
_frames_queue.erase( q_it );
}
missing->set_active( false );
return true;
}

return ! are_equivalent( timestamp, next_expected, fps );
return ! are_equivalent( waiting_to_be_released->get_frame_timestamp(),
next_expected,
fps ); // should be min fps to match behavior elsewhere?
}

bool timestamp_composite_matcher::are_equivalent( double a, double b, unsigned int fps )
Expand Down
14 changes: 9 additions & 5 deletions src/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ namespace librealsense

virtual bool are_equivalent(frame_holder& a, frame_holder& b) = 0;
virtual bool is_smaller_than(frame_holder& a, frame_holder& b) = 0;
virtual bool skip_missing_stream( std::vector< matcher * > const & synced,
virtual bool skip_missing_stream( frame_interface const * waiting_to_be_released,
matcher * missing,
frame_interface const * last_arrived,
const syncronization_environment & env )
= 0;
virtual void clean_inactive_streams(frame_holder& f) = 0;
Expand Down Expand Up @@ -165,8 +166,9 @@ namespace librealsense
void sync(frame_holder f, const syncronization_environment& env) override;
virtual bool are_equivalent(frame_holder& a, frame_holder& b) override { return false; }
virtual bool is_smaller_than(frame_holder& a, frame_holder& b) override { return false; }
virtual bool skip_missing_stream( std::vector< matcher * > const & synced,
virtual bool skip_missing_stream( frame_interface const * waiting_to_be_released,
matcher * missing,
frame_interface const * last_arrived,
const syncronization_environment & env ) override
{
return false;
Expand All @@ -189,8 +191,9 @@ namespace librealsense
virtual void update_last_arrived(frame_holder& f, matcher* m) override;
bool are_equivalent(frame_holder& a, frame_holder& b) override;
bool is_smaller_than(frame_holder& a, frame_holder& b) override;
bool skip_missing_stream( std::vector< matcher * > const & synced,
bool skip_missing_stream( frame_interface const * waiting_to_be_released,
matcher * missing,
frame_interface const * last_arrived,
const syncronization_environment & env ) override;
void clean_inactive_streams(frame_holder& f) override;
void update_next_expected( std::shared_ptr< matcher > const & matcher,
Expand All @@ -208,14 +211,15 @@ namespace librealsense
bool is_smaller_than(frame_holder& a, frame_holder& b) override;
virtual void update_last_arrived(frame_holder& f, matcher* m) override;
void clean_inactive_streams(frame_holder& f) override;
bool skip_missing_stream( std::vector< matcher * > const & synced,
bool skip_missing_stream( frame_interface const * waiting_to_be_released,
matcher * missing,
frame_interface const * last_arrived,
const syncronization_environment & env ) override;
void update_next_expected( std::shared_ptr< matcher > const & matcher,
const frame_holder & f ) override;

private:
unsigned int get_fps(const frame_holder & f);
unsigned int get_fps( frame_interface const * f );
bool are_equivalent( double a, double b, unsigned int fps );
std::map<matcher*, double> _last_arrived;
std::map<matcher*, unsigned int> _fps;
Expand Down