Introduction
The streaming benchmark consists of the following parts:
- Data Generation
The data generator produces a steady stream of data and sends it to the Kafka cluster. Each record is labeled with the timestamp at which it was generated.
- Kafka cluster
Kafka is a messaging system. Each streaming workload stores its data in its own topics.
- Test cluster
This can be a Spark, Flink, Storm or Gearpump cluster. The streaming application (identity, repartition, wordcount, fixwindow) reads data from Kafka, processes it, and writes the results back to another Kafka topic. Each record in the result is also labeled with a timestamp.
- Metrics reader
It reads the results from Kafka, calculates the time difference (record out time - record in time), and generates the report.
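As a rough sketch of what the metrics reader computes (the function names and record layout below are illustrative assumptions, not HiBench's actual code), per-record latency is simply the out timestamp minus the in timestamp, and the report percentiles are taken over the sorted latencies:

```python
# Illustrative sketch of the metrics reader's latency computation.
# Function names and the (in_ts, out_ts) record layout are hypothetical,
# not HiBench's actual implementation.

def latencies(records):
    """Each record carries the generator timestamp (in_ts) and the
    timestamp added by the streaming application (out_ts), in ms."""
    return [out_ts - in_ts for in_ts, out_ts in records]

def percentile(sorted_vals, p):
    """Nearest-rank percentile over a pre-sorted list."""
    idx = max(0, int(round(p / 100.0 * len(sorted_vals))) - 1)
    return sorted_vals[idx]

records = [(100, 180), (105, 160), (110, 300), (120, 170)]
lats = sorted(latencies(records))
print(min(lats), max(lats), sum(lats) / len(lats))  # 50 190 93.75
print(percentile(lats, 50))  # 55
```

The report columns (mean, min, max, p50 ... p999) are all derived from this per-record latency distribution.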
Setup
- Python 2.x (>= 2.6) is required.
- Supported Hadoop versions: Apache Hadoop 2.x, CDH5.x, HDP
- Supported Spark versions: 1.6.x, 2.0.x, 2.1.x, 2.2.x
- Build HiBench (see Build HiBench).
- Start HDFS and YARN in the cluster.
- Set up Apache Kafka (0.8.2.2, Scala version 2.10 is preferred); see Start Kafka.
- Set up one of the streaming frameworks that you want to test:
  - Apache Spark (1.6.1 is preferred)
  - Apache Storm (1.0.1 is preferred)
  - Apache Flink (1.0.3 is preferred)
  - Apache Gearpump (0.8.1 is preferred)

We use Apache Spark 2.2.1 as the streaming framework.
Hadoop Configuration
Hadoop is used to generate the input data for the workloads. Create and edit conf/hadoop.conf:
```
cp conf/hadoop.conf.template conf/hadoop.conf
```
Set the following properties appropriately:

Property | Example value |
---|---|
hibench.hadoop.home | /root/hadoop |
hibench.hadoop.executable | ${hibench.hadoop.home}/bin/hadoop |
hibench.hadoop.configure.dir | ${hibench.hadoop.home}/etc/hadoop |
hibench.hdfs.master | hdfs://wls-arm-huawei01:9000 |
hibench.hadoop.release | apache, cdh5 or hdp |
Note: For CDH and HDP users, please set hibench.hadoop.executable, hibench.hadoop.configure.dir and hibench.hadoop.release appropriately. The default values are for the Apache release.
Kafka Configuration
Set the Kafka properties below in conf/hibench.conf and leave the others at their defaults.

Property | Example value / meaning |
---|---|
hibench.streambench.kafka.home | /root/kafka_2.10-0.8.2.2 |
hibench.streambench.zkHost | wls-arm-huawei01:2181 |
hibench.streambench.kafka.brokerList | wls-arm-huawei01:9092 |
hibench.streambench.kafka.topicPartitions | Number of partitions of the generated topic (default 20) |
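HiBench conf files are plain whitespace-separated key/value pairs with `#` comments. A minimal parser sketch (a hypothetical helper, not part of HiBench) illustrates the format:

```python
def parse_hibench_conf(text):
    """Parse HiBench-style conf text: one 'key value' pair per line,
    blank lines and '#' comment lines ignored."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition(" ")
        props[key] = value.strip()
    return props

sample = """
# Kafka settings (example values)
hibench.streambench.kafka.home            /root/kafka_2.10-0.8.2.2
hibench.streambench.zkHost                wls-arm-huawei01:2181
hibench.streambench.kafka.brokerList      wls-arm-huawei01:9092
hibench.streambench.kafka.topicPartitions 20
"""
props = parse_hibench_conf(sample)
print(props["hibench.streambench.kafka.topicPartitions"])  # 20
```

Any amount of whitespace may separate a key from its value, which is why the parser splits only at the first space and strips the remainder.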
Generate the data

Take the identity workload as an example: genSeedDataset.sh generates the seed data on HDFS, and dataGen.sh sends the data to Kafka.
```
bin/workloads/streaming/identity/prepare/genSeedDataset.sh
bin/workloads/streaming/identity/prepare/dataGen.sh
```
Run the streaming application
While the data are being sent to Kafka, start the streaming application. Take Spark Streaming as an example.
```
bin/workloads/streaming/identity/spark/run.sh
```
Generate the report
metrics_reader.sh is used to generate the report.
```
bin/workloads/streaming/identity/common/metrics_reader.sh
```
Debugging the streaming bench

When running the streaming application:
```
bin/workloads/streaming/identity/spark/run.sh
```
The following issues may occur:

1. The streaming identity workload runs into "ERROR CheckpointWriter: Could not submit checkpoint task to the thread pool executor"
```
root@wls-arm-huawei01:~/HiBench# bin/workloads/streaming/identity/spark/run.sh
patching args=
Parsing conf: /root/HiBench/conf/hadoop.conf
Parsing conf: /root/HiBench/conf/hibench.conf
Parsing conf: /root/HiBench/conf/spark.conf
Parsing conf: /root/HiBench/conf/workloads/streaming/identity.conf
probe sleep jar: /root/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.1-tests.jar
start SparkStreamingIdentity bench
Export env: SPARKBENCH_PROPERTIES_FILES=/root/HiBench/report/identity/spark/conf/sparkbench/sparkbench.conf
Export env: HADOOP_CONF_DIR=/root/hadoop/etc/hadoop
Submit Spark job: /root/spark/bin/spark-submit --properties-file /root/HiBench/report/identity/spark/conf/sparkbench/spark.conf --class com.intel.hibench.sparkbench.streaming.RunBench --master yarn-client --num-executors 2 --executor-cores 4 --executor-memory 6g /root/HiBench/sparkbench/assembly/target/sparkbench-assembly-7.1-SNAPSHOT-dist.jar /root/HiBench/report/identity/spark/conf/sparkbench/sparkbench.conf
Warning: Master yarn-client is deprecated since 2.0. Please use master "yarn" with specified deploy mode instead.
metrics is being written to kafka topic SPARK_identity_1_5_50_1528365200903
Reporter Topic: SPARK_identity_1_5_50_1528365200903
18/06/07 17:53:20 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:host.name=wls-x86-hp05
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_171
........
.....
...
..
18/06/07 17:53:34 INFO cluster.YarnClientSchedulerBackend: Application application_1528275127402_0017 has started running.
18/06/07 17:53:34 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45199.
18/06/07 17:53:34 INFO netty.NettyBlockTransferService: Server created on 10.169.41.13:45199
18/06/07 17:53:34 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/06/07 17:53:34 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.169.41.13:45199 with 3.0 GB RAM, BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3fb450d7{/metrics/json,null,AVAILABLE,@Spark}
18/06/07 17:53:37 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.169.41.13:43928) with ID 1
18/06/07 17:53:37 INFO storage.BlockManagerMasterEndpoint: Registering block manager wls-x86-hp05:44621 with 3.0 GB RAM, BlockManagerId(1, wls-x86-hp05, 44621, None)
18/06/07 17:53:52 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@76efdbf9 rejected from java.util.concurrent.ThreadPoolExecut...
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:287)
at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@54d5eb6a rejected from java.util.concurrent.ThreadPoolExecut...
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:287)
at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
.....
...
..
.
```
The solutions:
- Turn off checkpointing:
```
hibench.streambench.spark.checkpointPath false
```
- Set hibench.streambench.spark.batchInterval in conf/spark.conf to a larger value
2. The data-generating process hangs

The solution:
Modify conf/hibench.conf so that data generation does not run in infinite mode:
```
# Total round count of data sent (default -1 means infinite); changed from -1:
hibench.streambench.datagen.totalRounds  512
# Total number of records to generate (default -1 means infinite); changed from -1:
hibench.streambench.datagen.totalRecords 1024
```
Spark streaming bench report
Identity:
Arch | count | throughput(msgs/s) | max_latency(ms) | mean_latency(ms) | min_latency(ms) | stddev_latency(ms) | p50_latency(ms) | p75_latency(ms) | p95_latency(ms) | p98_latency(ms) | p99_latency(ms) | p999_latency(ms) |
Arm64 | 2015 | 99 | 2339 | 338.531 | 54 | 437.862 | 210 | 307 | 1579 | 1985.32 | 2148 | 2338 |
X86 | 6115 | 100 | 2269 | 162.566 | 25 | 349.292 | 80 | 87 | 974 | 1687.68 | 1817 | 2220 |
Repartition:
Arch | count | throughput(msgs/s) | max_latency(ms) | mean_latency(ms) | min_latency(ms) | stddev_latency(ms) | p50_latency(ms) | p75_latency(ms) | p95_latency(ms) | p98_latency(ms) | p99_latency(ms) | p999_latency(ms) |
Arm64 | 2160 | 99 | 2896 | 458.714 | 89 | 573.476 | 257 | 351 | 1961.55 | 2447.12 | 2678.02 | 2885.433 |
X86 | 2930 | 99 | 2720 | 569.22 | 73 | 828.736 | 132 | 503 | 2480.45 | 2549.38 | 2573.07 | 2674.243 |
Wordcount:
Arch | count | throughput(msgs/s) | max_latency(ms) | mean_latency(ms) | min_latency(ms) | stddev_latency(ms) | p50_latency(ms) | p75_latency(ms) | p95_latency(ms) | p98_latency(ms) | p99_latency(ms) | p999_latency(ms) |
Arm64 | 4145 | 99 | 3495 | 967.401 | 48 | 844.909 | 743 | 1482.5 | 2695.7 | 3005.36 | 3134.54 | 3445.854 |
X86 | 5674 | 92 | 4825 | 3879.146 | 2534 | 428.128 | 3901 | 4171.25 | 4586 | 4682 | 4731.5 | 4818 |
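For a quick comparison of the tables above, the Arm64/X86 mean-latency ratio can be computed directly from the reported values (the numbers below are copied from the mean_latency(ms) columns; a ratio above 1 means Arm64 was slower on that workload):

```python
# mean_latency(ms) values copied from the report tables above
means = {
    "identity":    {"Arm64": 338.531, "X86": 162.566},
    "repartition": {"Arm64": 458.714, "X86": 569.220},
    "wordcount":   {"Arm64": 967.401, "X86": 3879.146},
}

for workload, m in means.items():
    ratio = m["Arm64"] / m["X86"]
    print(workload, round(ratio, 2))
# identity 2.08
# repartition 0.81
# wordcount 0.25
```

Note that the record counts differ between the two rows of each table, so these ratios are only a rough comparison of the runs, not a controlled benchmark result.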