[BUG] data corruption in GpuBroadcastNestedLoopJoin with empty relations edge case #4188

Closed
abellina opened this issue Nov 22, 2021 · 0 comments · Fixed by #4228
Labels: bug (Something isn't working), P0 (Must have for release)

abellina commented Nov 22, 2021

While testing a patch for empty relations in the broadcast* joins, I ran into an edge case that is a bug in 21.12. For unconditional joins, we return nothing where the CPU returns results, or the other way around (we return results where the CPU returns nothing).

This test can reproduce the issue:

@ignore_order(local=True)
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross'], ids=idfn)
@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn)
def test_right_broadcast_nested_loop_join_without_condition_empty(join_type, batch_size):
    def do_join(spark):
        left, right = create_df(spark, long_gen, 50, 0)
        return left.join(broadcast(right), how=join_type)
    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
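
For reference, here is a minimal sketch of the row counts the CPU should produce for each parametrized join type when there is no join condition. It assumes that `create_df(spark, long_gen, 50, 0)` yields 50 left rows and 0 right rows (inferred from the argument order, not confirmed here). The helper `expected_row_count` is hypothetical and is not part of the test suite; it only encodes standard SQL join semantics.

```python
def expected_row_count(join_type: str, left_rows: int, right_rows: int) -> int:
    """Row count of an unconditional join under standard SQL semantics.

    Hypothetical helper for illustration only; not part of spark-rapids.
    """
    if join_type in ("Inner", "Cross"):
        # Every left row pairs with every right row.
        return left_rows * right_rows
    if join_type == "Left":
        # With an empty right side, each left row is kept once, padded with nulls.
        return left_rows * right_rows if right_rows > 0 else left_rows
    if join_type == "LeftSemi":
        # A left row survives only if at least one right row exists.
        return left_rows if right_rows > 0 else 0
    if join_type == "LeftAnti":
        # A left row survives only if no right row exists.
        return 0 if right_rows > 0 else left_rows
    raise ValueError(f"unsupported join type: {join_type}")


if __name__ == "__main__":
    for jt in ["Left", "Inner", "LeftSemi", "LeftAnti", "Cross"]:
        print(jt, expected_row_count(jt, 50, 0))
```

Under that assumption, only Left and LeftAnti should return rows (50 each); Inner, LeftSemi and Cross should return none.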

This fails for BuildRight with Left, LeftSemi and LeftAnti. I think I have a good idea of where to fix the LeftSemi case, at least. We currently just return the left relation, whereas Spark has other logic here:

  private def leftExistenceJoin(
      relation: Broadcast[Array[InternalRow]],
      exists: Boolean): RDD[InternalRow] = {
    assert(buildSide == BuildRight)
    streamed.execute().mapPartitionsInternal { streamedIter =>
      val buildRows = relation.value
      val joinedRow = new JoinedRow

      if (condition.isDefined) {
        streamedIter.filter(l =>
          buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
        )
      } else if (buildRows.nonEmpty == exists) {
        streamedIter
      } else {
        Iterator.empty
      }
    }
  }

I think this also covers LeftAnti, but I haven't tested it.
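
To make the semi/anti behavior concrete, here is a rough Python model of the Spark logic quoted above. It is a sketch only: plain lists stand in for the streamed iterator and the broadcast build rows, and `left_existence_join` is a hypothetical name, not an API in Spark or spark-rapids. `exists=True` models LeftSemi and `exists=False` models LeftAnti.

```python
from typing import Callable, List, Optional, TypeVar

L = TypeVar("L")
R = TypeVar("R")


def left_existence_join(
    streamed: List[L],
    build: List[R],
    exists: bool,
    condition: Optional[Callable[[L, R], bool]] = None,
) -> List[L]:
    """Model of leftExistenceJoin: exists=True is LeftSemi, False is LeftAnti."""
    if condition is not None:
        # Conditional case: keep a left row when "some build row matches" == exists.
        return [l for l in streamed
                if any(condition(l, r) for r in build) == exists]
    if (len(build) > 0) == exists:
        # Unconditional case: pass the streamed side through unchanged.
        return list(streamed)
    # Unconditional case where no left row can qualify.
    return []


if __name__ == "__main__":
    left = [1, 2, 3]
    print(left_existence_join(left, [], exists=True))   # LeftSemi, empty build
    print(left_existence_join(left, [], exists=False))  # LeftAnti, empty build
```

With an empty build side and no condition, this model returns no rows for LeftSemi and the whole streamed side for LeftAnti, so always returning the left relation matches only the LeftAnti case.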

abellina added the "bug" and "? - Needs Triage" labels on Nov 22, 2021
tgravescs added the "P0" label on Nov 22, 2021
sameerz removed the "? - Needs Triage" label on Nov 23, 2021
sameerz added this to the Nov 30 - Dec 10 milestone on Dec 1, 2021