Spark Streaming benchmark on Arm64


Introduction


The streaming benchmark consists of the following parts:

  • Data Generation

The data generator produces a steady stream of data and sends it to the Kafka cluster. Each record is labeled with the timestamp at which it was generated.

  • Kafka cluster

Kafka is a messaging system. Each streaming workload saves the data to its own topics.

  • Test cluster

This could be a Spark cluster, Flink cluster, Storm cluster or Gearpump cluster. The streaming application (identity, repartition, wordcount, fixwindow) reads data from Kafka, processes the data and writes the results back to another topic in Kafka. Each record in the result is also labeled with a timestamp.

  • Metrics reader

It reads the results from Kafka, calculates the time difference (record out time - record in time) for each record and generates the report.
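
The latency calculation described for the metrics reader can be sketched as follows. This is a minimal illustration, not HiBench's own implementation: the function names, the (in time, out time) pair layout and the nearest-rank percentile method are assumptions, and the sketch works on an in-memory list instead of reading from Kafka.

```python
import math


def percentile(sorted_values, fraction):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, int(math.ceil(fraction * len(sorted_values))))
    return sorted_values[rank - 1]


def summarize_latency(records):
    """Summarize latency for (in_time_ms, out_time_ms) pairs.

    The pair layout is a hypothetical stand-in for the timestamps
    carried by each input and result record.
    """
    # Latency of one record = record out time - record in time.
    latencies = sorted(out_ms - in_ms for in_ms, out_ms in records)
    count = len(latencies)
    if count == 0:
        raise ValueError("no records to summarize")
    mean = float(sum(latencies)) / count
    variance = sum((x - mean) ** 2 for x in latencies) / count
    return {
        "count": count,
        "min_latency(ms)": latencies[0],
        "max_latency(ms)": latencies[-1],
        "mean_latency(ms)": mean,
        "stddev_latency(ms)": math.sqrt(variance),
        "p50_latency(ms)": percentile(latencies, 0.50),
        "p95_latency(ms)": percentile(latencies, 0.95),
        "p99_latency(ms)": percentile(latencies, 0.99),
    }


# Four made-up records, timestamps in milliseconds.
sample = [(1000, 1040), (1010, 1030), (1020, 1120), (1030, 1090)]
report = summarize_latency(sample)
for name in sorted(report):
    print("{0}: {1}".format(name, report[name]))
```

The keys mirror the column names used in the report tables, so the output can be compared with a real report by eye; the exact percentile values may differ from HiBench's because the percentile method here is an assumption.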


Setup


  • Python 2.x (>= 2.6) is required.

We use Apache Spark 2.2.1 as the streaming framework.


Hadoop Configuration


Hadoop is used to generate the input data of the workloads. Create and edit conf/hadoop.conf:

cp conf/hadoop.conf.template conf/hadoop.conf

Set the following properties to match your environment:

Property                        Meaning
hibench.hadoop.home             /root/hadoop
hibench.hadoop.executable       ${hibench.hadoop.home}/bin/hadoop
hibench.hadoop.configure.dir    ${hibench.hadoop.home}/etc/hadoop
hibench.hdfs.master             hdfs://wls-arm-huawei01:9000
hibench.hadoop.release          apache, cdh5, hdp

Note: For CDH and HDP users, please update hibench.hadoop.executable, hibench.hadoop.configure.dir and hibench.hadoop.release accordingly.

The default values are for the Apache release.
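
To check how the ${...} references in these properties expand, a small script can help. The sketch below is not HiBench's own configuration loader: it assumes the file holds whitespace-separated "key value" lines with "#" comments, and the function names parse_conf and resolve are hypothetical.

```python
import re

# Matches ${some.property.name} references inside a value.
REF = re.compile(r"\$\{([^}]+)\}")


def parse_conf(text):
    """Parse 'key value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props


def resolve(props, max_passes=10):
    """Expand ${name} references using other properties in the same dict.

    Unknown names are left untouched; max_passes bounds circular references.
    """
    resolved = dict(props)
    for _ in range(max_passes):
        changed = False
        for key, value in list(resolved.items()):
            new_value = REF.sub(
                lambda m: resolved.get(m.group(1), m.group(0)), value)
            if new_value != value:
                resolved[key] = new_value
                changed = True
        if not changed:
            break
    return resolved


conf_text = """
# Values taken from the table above
hibench.hadoop.home           /root/hadoop
hibench.hadoop.executable     ${hibench.hadoop.home}/bin/hadoop
hibench.hadoop.configure.dir  ${hibench.hadoop.home}/etc/hadoop
hibench.hdfs.master           hdfs://wls-arm-huawei01:9000
hibench.hadoop.release        apache
"""

settings = resolve(parse_conf(conf_text))
for key in sorted(settings):
    print("{0} = {1}".format(key, settings[key]))
```

Running it against a real conf/hadoop.conf (read with open(...).read()) gives a quick view of the paths the benchmark scripts are expected to use, provided the file follows the assumed format.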


Kafka Configuration


Set the following Kafka properties in conf/hibench.conf and leave the others at their defaults.

Property                                     Meaning
hibench.streambench.kafka.home               /root/kafka_2.10-0.8.2.2
hibench.streambench.zkHost                   wls-arm-huawei01:2181
hibench.streambench.kafka.brokerList         wls-arm-huawei01:9092
hibench.streambench.kafka.topicPartitions    Number of partitions of generated topic (default 20)


Generate the data


Take the identity workload as an example. genSeedDataset.sh generates the seed data on HDFS, and dataGen.sh sends the data to Kafka.

bin/workloads/streaming/identity/prepare/genSeedDataset.sh
bin/workloads/streaming/identity/prepare/dataGen.sh

Run the streaming application


While the data is being sent to Kafka, start the streaming application. Take Spark Streaming as an example.

bin/workloads/streaming/identity/spark/run.sh



Generate the report


metrics_reader.sh is used to generate the report.

bin/workloads/streaming/identity/common/metrics_reader.sh


Debugging the Streaming bench


When running the streaming application:

bin/workloads/streaming/identity/spark/run.sh


The following issues were encountered:

  1. The streaming identity workload runs into ERROR CheckpointWriter: Could not submit checkpoint task to the thread pool executor

root@wls-arm-huawei01:~/HiBench# bin/workloads/streaming/identity/spark/run.sh
patching args=
Parsing conf: /root/HiBench/conf/hadoop.conf
Parsing conf: /root/HiBench/conf/hibench.conf
Parsing conf: /root/HiBench/conf/spark.conf
Parsing conf: /root/HiBench/conf/workloads/streaming/identity.conf
probe sleep jar: /root/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.1-tests.jar
start SparkStreamingIdentity bench
Export env: SPARKBENCH_PROPERTIES_FILES=/root/HiBench/report/identity/spark/conf/sparkbench/sparkbench.conf
Export env: HADOOP_CONF_DIR=/root/hadoop/etc/hadoop
Submit Spark job: /root/spark/bin/spark-submit  --properties-file /root/HiBench/report/identity/spark/conf/sparkbench/spark.conf --class com.intel.hibench.sparkbench.streaming.RunBench --master yarn-client --num-executors 2 --executor-cores 4 --executor-memory 6g /root/HiBench/sparkbench/assembly/target/sparkbench-assembly-7.1-SNAPSHOT-dist.jar /root/HiBench/report/identity/spark/conf/sparkbench/sparkbench.conf
Warning: Master yarn-client is deprecated since 2.0. Please use master "yarn" with specified deploy mode instead.
metrics is being written to kafka topic SPARK_identity_1_5_50_1528365200903
Reporter Topic: SPARK_identity_1_5_50_1528365200903
18/06/07 17:53:20 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:host.name=wls-x86-hp05
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_171
........
.....
...
..


18/06/07 17:53:34 INFO cluster.YarnClientSchedulerBackend: Application application_1528275127402_0017 has started running.
18/06/07 17:53:34 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45199.
18/06/07 17:53:34 INFO netty.NettyBlockTransferService: Server created on 10.169.41.13:45199
18/06/07 17:53:34 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/06/07 17:53:34 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.169.41.13:45199 with 3.0 GB RAM, BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3fb450d7{/metrics/json,null,AVAILABLE,@Spark}
18/06/07 17:53:37 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.169.41.13:43928) with ID 1
18/06/07 17:53:37 INFO storage.BlockManagerMasterEndpoint: Registering block manager wls-x86-hp05:44621 with 3.0 GB RAM, BlockManagerId(1, wls-x86-hp05, 44621, None)
18/06/07 17:53:52 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@76efdbf9 rejected from java.util.concurrent.ThreadPoolExecut...
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:287)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@54d5eb6a rejected from java.util.concurrent.ThreadPoolExecut...
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:287)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
.....
...
..
.


The solutions:

  • Turn off checkpointing:
hibench.streambench.spark.checkpointPath false
  • Set hibench.streambench.spark.batchInterval in conf/spark.conf to a larger value


  2. The data generation process hangs

The solution:

Modify conf/hibench.conf so that data generation does not run in infinite mode:

# Total round count of data send (default: -1 means infinity)
hibench.streambench.datagen.totalRounds         -1   ->  512
# Number of total records that will be generated (default: -1 means infinity)
hibench.streambench.datagen.totalRecords        -1   ->  1024
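
If this change has to be applied repeatedly, it can be scripted. The following is a minimal sketch under assumptions: the helper name set_property is hypothetical, and the file is assumed to hold whitespace-separated "key value" lines as in the excerpt above. It works on the text of the file and leaves comments and other properties untouched.

```python
def set_property(conf_text, key, value):
    """Return conf_text with `key` set to `value`, appending it if absent."""
    out = []
    found = False
    for line in conf_text.splitlines():
        parts = line.split(None, 1)
        if parts and parts[0] == key:
            # Replace the whole line, keeping the 'key value' layout.
            out.append("{0}\t{1}".format(key, value))
            found = True
        else:
            out.append(line)
    if not found:
        out.append("{0}\t{1}".format(key, value))
    return "\n".join(out) + "\n"


original = (
    "# Total round count of data send (default: -1 means infinity)\n"
    "hibench.streambench.datagen.totalRounds         -1\n"
    "# Number of total records that will be generated (default: -1 means infinity)\n"
    "hibench.streambench.datagen.totalRecords        -1\n"
)

updated = set_property(original, "hibench.streambench.datagen.totalRounds", 512)
updated = set_property(updated, "hibench.streambench.datagen.totalRecords", 1024)
print(updated)
```

To apply it to the real file, read conf/hibench.conf into a string, pass it through set_property and write the result back; keeping a backup copy first is advisable.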



Spark streaming bench report


Identity:

Arch   count  throughput(msgs/s)  max_latency(ms)  mean_latency(ms)  min_latency(ms)  stddev_latency(ms)  p50_latency(ms)  p75_latency(ms)  p95_latency(ms)  p98_latency(ms)  p99_latency(ms)  p999_latency(ms)
Arm64  2015   99                  2339             338.531           54               437.862             210              307              1579             1985.32          2148             2338
X86    6115   100                 2269             162.566           25               349.292             80               87               974              1687.68          1817             2220


Repartition:

Arch   count  throughput(msgs/s)  max_latency(ms)  mean_latency(ms)  min_latency(ms)  stddev_latency(ms)  p50_latency(ms)  p75_latency(ms)  p95_latency(ms)  p98_latency(ms)  p99_latency(ms)  p999_latency(ms)
Arm64  2160   99                  2896             458.714           89               573.476             257              351              1961.55          2447.12          2678.02          2885.433
X86    2930   99                  2720             569.227           3                828.736             132              503              2480.45          2549.38          2573.07          2674.243


Wordcount:

Arch   count  throughput(msgs/s)  max_latency(ms)  mean_latency(ms)  min_latency(ms)  stddev_latency(ms)  p50_latency(ms)  p75_latency(ms)  p95_latency(ms)  p98_latency(ms)  p99_latency(ms)  p999_latency(ms)
Arm64  4145   99                  3495             967.401           48               844.909             743              1482.5           2695.7           3005.36          3134.54          3445.854
X86    5674   92                  4825             3879.146          2534             428.128             3901             4171.25          4586             4682             4731.5           4818