Spark Streaming benchmark on Arm64
Introduction
The streaming benchmark consists of the following parts:
- Data Generation
The data generator produces a steady stream of data and sends it to the Kafka cluster. Each record is labeled with the timestamp at which it was generated.
- Kafka cluster
Kafka is a messaging system. Each streaming workload saves the data to its own topics.
- Test cluster
This could be a Spark cluster, Flink cluster, Storm cluster or Gearpump cluster. The streaming application (identity, repartition, wordcount, fixwindow) reads data from Kafka, processes the data and writes the results back to another topic in Kafka. Each record in the result is also labeled with a timestamp.
- Metrics reader
It reads the results from Kafka, calculates the time difference (record out time - record in time) for each record and generates the report.
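The latency calculation described for the metrics reader can be illustrated with a minimal Python 3 sketch. It is not HiBench's implementation: the record layout (pairs of in/out timestamps in milliseconds), the function names and the nearest-rank percentile method are assumptions chosen for illustration.

```python
import math
import statistics

def latencies_ms(records):
    """Per-record latency: record out time minus record in time."""
    return [out_ms - in_ms for in_ms, out_ms in records]

def percentile(sorted_values, pct):
    """Nearest-rank percentile of an ascending list (assumed method)."""
    rank = max(1, math.ceil(pct / 100.0 * len(sorted_values)))
    return sorted_values[rank - 1]

def summarize(records):
    """Build a small report with a few of the columns used in the benchmark report."""
    lat = sorted(latencies_ms(records))
    return {
        "count": len(lat),
        "min_latency(ms)": lat[0],
        "max_latency(ms)": lat[-1],
        "mean_latency(ms)": statistics.mean(lat),
        "p50_latency(ms)": percentile(lat, 50),
        "p95_latency(ms)": percentile(lat, 95),
    }

# Hypothetical (in_time, out_time) pairs in epoch milliseconds.
sample = [(1000, 1054), (1010, 1220), (1020, 1327), (1030, 2609)]
report = summarize(sample)
for name, value in report.items():
    print(name, value)
```

The real reader consumes the timestamps from the result topic in Kafka; the sketch takes them as an in-memory list so that it stays self-contained.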
Setup
- Python 2.x (>=2.6) is required.
- Supported Hadoop versions: Apache Hadoop 2.x, CDH5.x, HDP
- Supported Spark versions: 1.6.x, 2.0.x, 2.1.x, 2.2.x
- Build HiBench according to the Build HiBench instructions.
- Start HDFS and Yarn in the cluster.
- Set up Apache Kafka (0.8.2.2 with Scala 2.10 is preferred) and start Kafka.
- Set up one of the streaming frameworks that you want to test:
  - Apache Spark (1.6.1 is preferred)
  - Apache Storm (1.0.1 is preferred)
  - Apache Flink (1.0.3 is preferred)
  - Apache Gearpump (0.8.1 is preferred)
We use Apache Spark 2.2.1 as the streaming framework in this benchmark.
Hadoop Configuration
Hadoop is used to generate the input data of the workloads. Create and edit conf/hadoop.conf:
cp conf/hadoop.conf.template conf/hadoop.conf
Set the following properties:
Property | Meaning |
---|---|
hibench.hadoop.home | /root/hadoop |
hibench.hadoop.executable | ${hibench.hadoop.home}/bin/hadoop |
hibench.hadoop.configure.dir | ${hibench.hadoop.home}/etc/hadoop |
hibench.hdfs.master | hdfs://wls-arm-huawei01:9000 |
hibench.hadoop.release | apache, cdh5, hdp |
Note: For CDH and HDP users, please update hibench.hadoop.executable, hibench.hadoop.configure.dir and hibench.hadoop.release accordingly. The default values are for the Apache release.
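To make the ${...} references in the table concrete, the following Python 3 sketch parses property lines of the form "key value" and expands the references. It is a simplified illustration, not HiBench's own configuration loader; the function names are hypothetical.

```python
import re

def parse_conf(text):
    """Parse 'key value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        if len(parts) == 2:
            props[parts[0]] = parts[1].strip()
    return props

def resolve(props):
    """Expand ${other.key} references; unknown references are left untouched."""
    pattern = re.compile(r"\$\{([^}]+)\}")
    resolved = dict(props)
    for _ in range(len(resolved) + 1):  # bounded passes guard against cycles
        changed = False
        for key, value in resolved.items():
            new_value = pattern.sub(
                lambda m: resolved.get(m.group(1), m.group(0)), value)
            if new_value != value:
                resolved[key] = new_value
                changed = True
        if not changed:
            break
    return resolved

# Values taken from the table above; "apache" is one of the listed releases.
conf_text = """
hibench.hadoop.home           /root/hadoop
hibench.hadoop.executable     ${hibench.hadoop.home}/bin/hadoop
hibench.hadoop.configure.dir  ${hibench.hadoop.home}/etc/hadoop
hibench.hdfs.master           hdfs://wls-arm-huawei01:9000
hibench.hadoop.release        apache
"""
settings = resolve(parse_conf(conf_text))
print(settings["hibench.hadoop.executable"])
print(settings["hibench.hadoop.configure.dir"])
```

With hibench.hadoop.home set to /root/hadoop, the two derived properties expand to paths under that directory, so only the home directory needs to change for an Apache release installed elsewhere.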
Kafka Configuration
Set the following Kafka properties in conf/hibench.conf and leave the others at their defaults.
Property | Meaning |
---|---|
hibench.streambench.kafka.home | /root/kafka_2.10-0.8.2.2 |
hibench.streambench.zkHost | wls-arm-huawei01:2181 |
hibench.streambench.kafka.brokerList | wls-arm-huawei01:9092 |
hibench.streambench.kafka.topicPartitions | Number of partitions of generated topic (default 20) |
Generate the data
Take the identity workload as an example. genSeedDataset.sh generates the seed data on HDFS. dataGen.sh sends the data to Kafka.
bin/workloads/streaming/identity/prepare/genSeedDataset.sh
bin/workloads/streaming/identity/prepare/dataGen.sh
Run the streaming application
While the data is being sent to Kafka, start the streaming application. Take Spark Streaming as an example.
bin/workloads/streaming/identity/spark/run.sh
Generate the report
metrics_reader.sh is used to generate the report.
bin/workloads/streaming/identity/common/metrics_reader.sh
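The three stages (generate the data, run the streaming application, generate the report) use scripts that share one path pattern for the identity workload. The Python 3 sketch below only assembles those paths in the order they are started; it does not execute anything. The function name is hypothetical, and applying the same pattern to other workloads or frameworks is an assumption based on the identity example.

```python
def streaming_scripts(workload="identity", framework="spark"):
    """Return (stage, script path) pairs in the order they are started."""
    base = "bin/workloads/streaming/" + workload
    return [
        ("generate seed data on HDFS", base + "/prepare/genSeedDataset.sh"),
        ("send data to Kafka", base + "/prepare/dataGen.sh"),
        ("run streaming application", base + "/" + framework + "/run.sh"),
        ("generate report", base + "/common/metrics_reader.sh"),
    ]

for stage, script in streaming_scripts():
    print("{:<28} {}".format(stage, script))
```

Note that the streaming application is started while dataGen.sh is still sending data, so the second and third scripts run concurrently rather than strictly one after the other.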
Debugging the streaming benchmark
When running the streaming application:
bin/workloads/streaming/identity/spark/run.sh
The following issues were encountered:
1. Streaming identity runs into ERROR CheckpointWriter: Could not submit checkpoint task to the thread pool executor
root@wls-arm-huawei01:~/HiBench# bin/workloads/streaming/identity/spark/run.sh
patching args=
Parsing conf: /root/HiBench/conf/hadoop.conf
Parsing conf: /root/HiBench/conf/hibench.conf
Parsing conf: /root/HiBench/conf/spark.conf
Parsing conf: /root/HiBench/conf/workloads/streaming/identity.conf
probe sleep jar: /root/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.1-tests.jar
start SparkStreamingIdentity bench
Export env: SPARKBENCH_PROPERTIES_FILES=/root/HiBench/report/identity/spark/conf/sparkbench/sparkbench.conf
Export env: HADOOP_CONF_DIR=/root/hadoop/etc/hadoop
Submit Spark job: /root/spark/bin/spark-submit --properties-file /root/HiBench/report/identity/spark/conf/sparkbench/spark.conf --class com.intel.hibench.sparkbench.streaming.RunBench --master yarn-client --num-executors 2 --executor-cores 4 --executor-memory 6g /root/HiBench/sparkbench/assembly/target/sparkbench-assembly-7.1-SNAPSHOT-dist.jar /root/HiBench/report/identity/spark/conf/sparkbench/sparkbench.conf
Warning: Master yarn-client is deprecated since 2.0. Please use master "yarn" with specified deploy mode instead.
metrics is being written to kafka topic SPARK_identity_1_5_50_1528365200903
Reporter Topic: SPARK_identity_1_5_50_1528365200903
18/06/07 17:53:20 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:host.name=wls-x86-hp05
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_171
........ ..... ... ..
18/06/07 17:53:34 INFO cluster.YarnClientSchedulerBackend: Application application_1528275127402_0017 has started running.
18/06/07 17:53:34 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45199.
18/06/07 17:53:34 INFO netty.NettyBlockTransferService: Server created on 10.169.41.13:45199
18/06/07 17:53:34 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/06/07 17:53:34 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.169.41.13:45199 with 3.0 GB RAM, BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3fb450d7{/metrics/json,null,AVAILABLE,@Spark}
18/06/07 17:53:37 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.169.41.13:43928) with ID 1
18/06/07 17:53:37 INFO storage.BlockManagerMasterEndpoint: Registering block manager wls-x86-hp05:44621 with 3.0 GB RAM, BlockManagerId(1, wls-x86-hp05, 44621, None)
18/06/07 17:53:52 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@76efdbf9 rejected from java.util.concurrent.ThreadPoolExecut...
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:287)
at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@54d5eb6a rejected from java.util.concurrent.ThreadPoolExecut...
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:287)
at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
..... ... .. .
Possible solutions:
- Turn off checkpointing:
hibench.streambench.spark.checkpointPath false
- Set hibench.streambench.spark.batchInterval in conf/spark.conf to a larger value.
2. The data generation process hangs
The solution:
Modify conf/hibench.conf so that data generation does not run in infinite mode:
# Total round count of data send (default: -1 means infinity)
hibench.streambench.datagen.totalRounds -1 -> 512
# Number of total records that will be generated (default: -1 means infinity)
hibench.streambench.datagen.totalRecords -1 -> 1024
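The same change can be scripted. The Python 3 sketch below rewrites the two properties in the text of a configuration file; the helper set_property is hypothetical and assumes the "key value" line layout shown above. It works on a string so that it stays self-contained, and writing the result back to conf/hibench.conf is left to the caller.

```python
def set_property(conf_text, key, value):
    """Return conf_text with the line for key set to value (appended if absent)."""
    lines = conf_text.splitlines()
    found = False
    for i, line in enumerate(lines):
        parts = line.split(None, 1)
        if parts and parts[0] == key:
            lines[i] = "{} {}".format(key, value)
            found = True
    if not found:
        lines.append("{} {}".format(key, value))
    return "\n".join(lines) + "\n"

# Excerpt with the default (infinite) settings.
original = (
    "# Total round count of data send (default: -1 means infinity)\n"
    "hibench.streambench.datagen.totalRounds -1\n"
    "# Number of total records that will be generated (default: -1 means infinity)\n"
    "hibench.streambench.datagen.totalRecords -1\n"
)
updated = set_property(original, "hibench.streambench.datagen.totalRounds", 512)
updated = set_property(updated, "hibench.streambench.datagen.totalRecords", 1024)
print(updated)
```

Comment lines are left unchanged because their first token is "#", which never matches a property name.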
Spark streaming bench report
Identity:
Arch | count | throughput(msgs/s) | max_latency(ms) | mean_latency(ms) | min_latency(ms) | stddev_latency(ms) | p50_latency(ms) | p75_latency(ms) | p95_latency(ms) | p98_latency(ms) | p99_latency(ms) | p999_latency(ms) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
Arm64 | 2015 | 99 | 2339 | 338.531 | 54 | 437.862 | 210 | 307 | 1579 | 1985.32 | 2148 | 2338 |
X86 | 6115 | 100 | 2269 | 162.566 | 25 | 349.292 | 80 | 87 | 974 | 1687.68 | 1817 | 2220 |
Repartition:
Arch | count | throughput(msgs/s) | max_latency(ms) | mean_latency(ms) | min_latency(ms) | stddev_latency(ms) | p50_latency(ms) | p75_latency(ms) | p95_latency(ms) | p98_latency(ms) | p99_latency(ms) | p999_latency(ms) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
Arm64 | 2160 | 99 | 2896 | 458.714 | 89 | 573.476 | 257 | 351 | 1961.55 | 2447.12 | 2678.02 | 2885.433 |
X86 | 2930 | 99 | 2720 | 569.22 | 73 | 828.736 | 132 | 503 | 2480.45 | 2549.38 | 2573.07 | 2674.243 |
Wordcount:
Arch | count | throughput(msgs/s) | max_latency(ms) | mean_latency(ms) | min_latency(ms) | stddev_latency(ms) | p50_latency(ms) | p75_latency(ms) | p95_latency(ms) | p98_latency(ms) | p99_latency(ms) | p999_latency(ms) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
Arm64 | 4145 | 99 | 3495 | 967.401 | 48 | 844.909 | 743 | 1482.5 | 2695.7 | 3005.36 | 3134.54 | 3445.854 |
X86 | 5674 | 92 | 4825 | 3879.146 | 2534 | 428.128 | 3901 | 4171.25 | 4586 | 4682 | 4731.5 | 4818 |
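To compare the two architectures at a glance, the mean latencies from the three tables can be turned into Arm64/X86 ratios. The Python 3 sketch below only does that arithmetic; the function name is hypothetical, and because the record counts (and, for wordcount, the throughput) differ between the runs, the ratios are a rough indication rather than a like-for-like comparison.

```python
# mean_latency(ms) values copied from the Identity, Repartition and Wordcount tables.
mean_latency_ms = {
    "identity": {"Arm64": 338.531, "X86": 162.566},
    "repartition": {"Arm64": 458.714, "X86": 569.22},
    "wordcount": {"Arm64": 967.401, "X86": 3879.146},
}

def arm_to_x86_ratio(table):
    """Ratio of Arm64 to X86 mean latency per workload, rounded to two decimals."""
    return {
        workload: round(values["Arm64"] / values["X86"], 2)
        for workload, values in table.items()
    }

ratios = arm_to_x86_ratio(mean_latency_ms)
for workload, ratio in ratios.items():
    print("{:<12} {:.2f}".format(workload, ratio))
```

A ratio above 1 means the Arm64 run had the higher mean latency. With these numbers the ratio is about 2.08 for identity, 0.81 for repartition and 0.25 for wordcount.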