
Latency Spike during Spark Structured Streaming #2165

Open

sinban04 opened this issue Nov 16, 2023 · 2 comments

sinban04 commented Nov 16, 2023

What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

I'm using elasticsearch-hadoop to stream data into an Elasticsearch server with Spark Structured Streaming.
But during streaming, it shows repetitive, periodic latency spikes in Operation Duration in Spark.

[screenshot: Operation Duration chart showing periodic latency spikes]

Spark

When I tried Structured Streaming w/ a console sink (print to console), it showed stable, low latency.

[screenshot: Operation Duration chart with the console sink, stable low latency]

ES

I dug into ES monitoring w/ Grafana, but it shows nothing unusual on the ES side.
It shows indexing times below 50 ms, which cannot explain the high latency in streaming (about 50 s).

ES-Hadoop connector

I read the elasticsearch-hadoop code and found that it uses the Bulk API to send the DataFrame to ES w/ the Hadoop HTTP client.
At first I believed that ES was not responding fast enough for Spark to commit the operation, even after indexing had finished.
But for now I have no idea what causes this latency spike.

Have you seen any case like this?
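
For context, these are the bulk-related settings the connector exposes, as I read the ES-Hadoop configuration docs. The snippet below is only a sketch: the endpoint and app name are placeholders, not my actual job, and the defaults in the comments are my understanding of the documentation.

    import org.apache.spark.sql.SparkSession

    // Sketch only: bulk flush/refresh knobs of the connector, with (what I understand
    // to be) their documented defaults written out explicitly.
    val sparkSketch =
      SparkSession.builder
        .appName("es-bulk-settings-sketch")          // placeholder
        .config("es.nodes", "es-host")               // placeholder
        .config("es.port", "10200")
        .config("es.nodes.wan.only", "true")
        .config("es.batch.size.entries", "1000")     // flush a bulk request after this many docs per task (default 1000)
        .config("es.batch.size.bytes", "1mb")        // ... or after this much data per task (default 1mb)
        .config("es.batch.write.refresh", "false")   // default is true: an index refresh is invoked after each bulk completes
        .getOrCreate()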

Steps to reproduce

  • Simply use Spark Structured Streaming w/ the Elasticsearch sink (a per-trigger timing sketch follows the snippet):
    // ES configs
    val spark =
      SparkSession.builder
        .appName(s"Mobile Click Streaming => $datetime")
        ...
        .config("es.port", "10200")
        .config("es.nodes.wan.only", "true")
        .config("es.index.auto.create", "true")
        ...
        .getOrCreate

    val esQuery = df
      .writeStream
      .outputMode("append")
      .queryName("writing_to_es")
      .format("org.elasticsearch.spark.sql")
      .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("60 seconds"))
      .start("feedback_latency_debug")

Version Info

OS: CentOS 7

$ lsb_release -d
Description:    CentOS Linux release 7.9.2009 (Core)

JVM : Java 1.8.0_112

$ java -version
java version "1.8.0_112"
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

Hadoop/Spark: Spark 3.1.2-2, Hadoop 3.1.1.3.1.2-39
ES-Hadoop : 7.12.1 (Scala Spark https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30)
ES : 7.10.1 (Elasticsearch Server)


sinban04 commented Nov 22, 2023

PySpark w/ Python Bulk API

I tested the same task w/ the Elasticsearch Python API.
I changed the Spark Structured Streaming implementation from Scala to Python to use the Python API.
I used PySpark w/ the same Spark version (3.1.2-2) and used foreachBatch instead of the Elasticsearch sink.
In order to use the Python Bulk API, I used toJSON() and collect() to build the list of bulk actions:

    import json
    import time

    from elasticsearch import Elasticsearch, helpers

    def updateToESPythonAPI(df, epoch_id):
        es = Elasticsearch([
            'http://esfarm-cluster.~~~~~~~.com:10200',
        ], http_auth=('id', 'pw!'))
        index_name = "structured_python01"
        data = []

        # Collect the micro-batch on the driver and build one bulk action per row
        for row in df.toJSON().collect():
            json_ = json.loads(row)
            data.append(
                {
                    "_index": index_name,
                    "_id": json_['doc_id'],
                    "_source": row,
                }
            )
        count = len(data)
        start = time.time()
        helpers.bulk(es, data)
        end = time.time()
        print("ES bulk sent!")
        print(f"It took {end - start:.5f} sec sending {count} items.\n")

and it showed no latency spike.

[screenshot: stable latency with the Python Bulk API]

I suspect there is some bug in the elasticsearch-spark-30 connector.

Or could the version discrepancy alone cause this unexpected behavior?

PySpark w/ Elasticsearch Sink

And w/ the Elasticsearch connector, it shows the same result as in Scala Spark.

[screenshot: latency spikes with the Elasticsearch sink in PySpark]

@sinban04 (Author) commented:

Scala Spark + foreachBatch + Batch API

Hi guys, I tested again with Scala Spark (3.1.2-2),
using the normal batch API (df.write.format("org.elasticsearch.spark.sql")) w/ foreachBatch.

It's the same as the streaming sink, except that it uses the batch API inside the foreachBatch function.

        val esQuery = df
          .writeStream
          .queryName("ES Bulk Update")
          .option("truncate", "false")
          .format("console")
          .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("5 seconds"))
          .outputMode("append")
          .foreachBatch { 
            (batchDF: DataFrame, batchId: Long) =>
              ...
              batchDF.write
                  .format("org.elasticsearch.spark.sql")
                  .option("es.nodes", endpoint)
                  .option("es.resource", index)
                  .option("es.mapping.id", "doc_id") 
                  .option("es.net.http.auth.user", username)
                  .option("es.net.http.auth.pass", password)
                  .option("es.nodes.wan.only", "true")
                  .option("es.write.operation", "upsert")
                  .mode("append")
                  .save()
          }
          .start()

And it displayed no latency spike

[screenshot: stable latency with foreachBatch + the batch API]

I believe there is some problem in the writeStream API of the connector.
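
One thing I plan to try next, to compare the two paths, is raising the connector's REST-layer logging so each bulk request shows up in the driver/executor logs. This is just a sketch: Spark 3.1 still bundles log4j 1.x, and org.elasticsearch.hadoop.rest is the connector's REST package as I understand it.

    import org.apache.log4j.{Level, Logger}

    // Surface the connector's REST/bulk activity in the logs so the streaming-sink
    // path and the foreachBatch path can be compared side by side.
    Logger.getLogger("org.elasticsearch.hadoop.rest").setLevel(Level.DEBUG)
    // TRACE is far more verbose (request/response details); only for a small run.
    // Logger.getLogger("org.elasticsearch.hadoop.rest.commonshttp").setLevel(Level.TRACE)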
