Spark Streaming benchmark on Arm64


Introduction


The streaming benchmark consists of the following parts:

  • Data Generation

The data generator produces a steady stream of data and sends it to the Kafka cluster. Each record is labeled with the timestamp at which it was generated.

  • Kafka cluster

Kafka is a messaging system. Each streaming workload saves the data to its own topics.

  • Test cluster

This could be a Spark cluster, Flink cluster, Storm cluster or Gearpump cluster. The streaming application (identity, repartition, wordcount, fixwindow) reads data from Kafka, processes it, and writes the results back to another Kafka topic. Each record in the result is also labeled with a timestamp.

  • Metrics reader

The metrics reader reads the results from Kafka, calculates the latency of each record (record out time - record in time), and generates the report.
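The metrics reader's arithmetic can be sketched as follows. This is a standalone Python 3 illustration, not HiBench's implementation: it assumes each result record yields an (in_ms, out_ms) pair of epoch-millisecond timestamps, and the percentile definition (linear interpolation), population standard deviation and throughput formula (count divided by the time from the first input to the last output) are assumptions. The output keys mirror the column names used in the report tables further down.

```python
import math


def latency_report(records):
    """Summarise end-to-end latency for (in_ms, out_ms) timestamp pairs.

    in_ms  = timestamp attached by the data generator
    out_ms = timestamp attached by the streaming application
    """
    records = list(records)
    if not records:
        raise ValueError("no records")
    lat = sorted(out - inn for inn, out in records)
    n = len(lat)
    mean = float(sum(lat)) / n
    # Population standard deviation (an assumption of this sketch).
    std = math.sqrt(sum((x - mean) ** 2 for x in lat) / n)

    def pct(p):
        # Linear interpolation between closest ranks; the real reader
        # may use a different percentile definition.
        pos = (n - 1) * p
        lo = int(pos)
        hi = min(lo + 1, n - 1)
        return lat[lo] + (lat[hi] - lat[lo]) * (pos - lo)

    # Assumed throughput: records per second between the first input
    # timestamp and the last output timestamp.
    span_s = (max(o for _, o in records) - min(i for i, _ in records)) / 1000.0
    report = {
        "count": n,
        "throughput(msgs/s)": n / span_s if span_s > 0 else float("nan"),
        "max_latency(ms)": lat[-1],
        "mean_latency(ms)": mean,
        "min_latency(ms)": lat[0],
        "stddev_latency(ms)": std,
    }
    for name, p in [("p50", 0.50), ("p75", 0.75), ("p95", 0.95),
                    ("p98", 0.98), ("p99", 0.99), ("p999", 0.999)]:
        report[name + "_latency(ms)"] = pct(p)
    return report
```

For example, latency_report([(0, 10), (0, 20), (0, 30), (0, 40)]) reports a count of 4, a minimum of 10 ms, a maximum of 40 ms and a mean and median of 25 ms.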

 

Setup


  • Python 2.x (>= 2.6) is required.

We use Apache Spark 2.2.1 as the streaming framework.

 

Hadoop Configuration


Hadoop is used to generate the input data of the workloads. Create and edit conf/hadoop.conf:

cp conf/hadoop.conf.template conf/hadoop.conf

Set the following properties:

Property                        Value
hibench.hadoop.home             /root/hadoop
hibench.hadoop.executable       ${hibench.hadoop.home}/bin/hadoop
hibench.hadoop.configure.dir    ${hibench.hadoop.home}/etc/hadoop
hibench.hdfs.master             hdfs://wls-arm-huawei01:9000
hibench.hadoop.release          apache, cdh5 or hdp

Note: CDH and HDP users should update hibench.hadoop.executable, hibench.hadoop.configure.dir and hibench.hadoop.release accordingly. The default values are for the Apache release.
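Values such as ${hibench.hadoop.home}/bin/hadoop refer to other properties. The following standalone Python 3 sketch shows how such a "key value" file with ${...} references can be parsed and expanded; it is an illustration only, HiBench's own parser may handle more cases (for example environment variables), and the function names parse_conf and resolve are hypothetical.

```python
import re

# Matches ${some.property} references inside a value.
_REF = re.compile(r"\$\{([^}]+)\}")


def parse_conf(text):
    """Parse 'key value' lines separated by whitespace; '#' lines are comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props


def resolve(props, key, seen=frozenset()):
    """Return props[key] with ${...} references expanded recursively.

    References to keys that are not defined in props are left untouched.
    """
    if key in seen:
        raise ValueError("circular reference involving " + key)
    seen = seen | {key}

    def expand(match):
        ref = match.group(1)
        return resolve(props, ref, seen) if ref in props else match.group(0)

    return _REF.sub(expand, props[key])
```

With the values from the table above, resolve(parse_conf(text), "hibench.hadoop.executable") yields /root/hadoop/bin/hadoop.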

 

Kafka Configuration


Set the following Kafka properties in conf/hibench.conf and leave the others at their defaults.

Property                                    Value / meaning
hibench.streambench.kafka.home              /root/kafka_2.10-0.8.2.2
hibench.streambench.zkHost                  wls-arm-huawei01:2181
hibench.streambench.kafka.brokerList        wls-arm-huawei01:9092
hibench.streambench.kafka.topicPartitions   Number of partitions of the generated topic (default 20)
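Changing only the listed properties and leaving the rest of conf/hibench.conf as it is can be scripted. The standalone Python 3 sketch below (the function set_properties is hypothetical) rewrites every existing line for each given key, keeps all other lines and comments verbatim, and appends keys that are not in the file yet. It assumes the same whitespace-separated "key value" format shown in this document.

```python
def set_properties(text, overrides):
    """Return conf text with each key in `overrides` set to its new value.

    Every existing line for such a key is rewritten in place, so a duplicate
    later in the file cannot undo the change. Other lines, including
    comments, are kept verbatim. Missing keys are appended at the end.
    """
    out, seen = [], set()
    for line in text.splitlines():
        stripped = line.strip()
        key = None
        if stripped and not stripped.startswith("#"):
            key = stripped.split(None, 1)[0]
        if key in overrides:
            out.append("%s %s" % (key, overrides[key]))
            seen.add(key)
        else:
            out.append(line)
    for key, value in overrides.items():
        if key not in seen:
            out.append("%s %s" % (key, value))
    return "\n".join(out) + "\n"
```

Usage: read conf/hibench.conf, call set_properties(text, {"hibench.streambench.zkHost": "wls-arm-huawei01:2181", "hibench.streambench.kafka.brokerList": "wls-arm-huawei01:9092"}) and write the result back.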

 

Generate the data


Take the identity workload as an example: genSeedDataset.sh generates the seed data on HDFS, and dataGen.sh sends the data to Kafka.

bin/workloads/streaming/identity/prepare/genSeedDataset.sh
bin/workloads/streaming/identity/prepare/dataGen.sh

Run the streaming application


While the data is being sent to Kafka, start the streaming application. Take Spark Streaming as an example:

bin/workloads/streaming/identity/spark/run.sh

 

 

Generate the report


metrics_reader.sh is used to generate the report.

bin/workloads/streaming/identity/common/metrics_reader.sh
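The steps above (seed data, generator, streaming application, report) can be driven from one script. This standalone Python 3 sketch is hypothetical: it assumes the other streaming workloads (for example repartition and wordcount) use the same script layout as identity, that each script can run unattended, and it introduces an invented duration_s parameter because this document does not say how long the application should run. Terminating run.sh may not stop its child processes (such as spark-submit), so a real driver may need process-group handling.

```python
import os
import subprocess
import time


def workload_scripts(workload, framework="spark"):
    """Script paths relative to the HiBench home, following the identity layout."""
    base = os.path.join("bin", "workloads", "streaming", workload)
    return {
        "seed": os.path.join(base, "prepare", "genSeedDataset.sh"),
        "datagen": os.path.join(base, "prepare", "dataGen.sh"),
        "app": os.path.join(base, framework, "run.sh"),
        "report": os.path.join(base, "common", "metrics_reader.sh"),
    }


def run_workload(hibench_home, workload="identity", duration_s=300, dry_run=False):
    s = workload_scripts(workload)
    if dry_run:
        # Return the plan instead of executing it ("&" marks background steps).
        return [s["seed"], s["datagen"] + " &", s["app"] + " &", s["report"]]

    def path(name):
        return os.path.join(hibench_home, s[name])

    # 1. The seed data must exist on HDFS before the generator starts.
    subprocess.check_call([path("seed")], cwd=hibench_home)
    # 2. Start feeding Kafka, then 3. start the application while data flows.
    gen = subprocess.Popen([path("datagen")], cwd=hibench_home)
    app = subprocess.Popen([path("app")], cwd=hibench_home)
    try:
        time.sleep(duration_s)  # hypothetical fixed test duration
    finally:
        for proc in (app, gen):
            if proc.poll() is None:
                proc.terminate()  # may not reach grandchild processes
                proc.wait()
    # 4. Read the result topic and generate the report.
    subprocess.check_call([path("report")], cwd=hibench_home)
    return None
```

Calling run_workload("/root/HiBench", "identity", dry_run=True) only returns the planned command list, which is a safe way to check the paths before a real run.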

 

Debugging the streaming benchmark


We ran into the following issues when running the streaming application:

bin/workloads/streaming/identity/spark/run.sh

  1.  The identity streaming workload fails with "ERROR CheckpointWriter: Could not submit checkpoint task to the thread pool executor"

root@wls-arm-huawei01:~/HiBench# bin/workloads/streaming/identity/spark/run.sh
patching args=
Parsing conf: /root/HiBench/conf/hadoop.conf
Parsing conf: /root/HiBench/conf/hibench.conf
Parsing conf: /root/HiBench/conf/spark.conf
Parsing conf: /root/HiBench/conf/workloads/streaming/identity.conf
probe sleep jar: /root/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.1-tests.jar
start SparkStreamingIdentity bench
Export env: SPARKBENCH_PROPERTIES_FILES=/root/HiBench/report/identity/spark/conf/sparkbench/sparkbench.conf
Export env: HADOOP_CONF_DIR=/root/hadoop/etc/hadoop
Submit Spark job: /root/spark/bin/spark-submit --properties-file /root/HiBench/report/identity/spark/conf/sparkbench/spark.conf --class com.intel.hibench.sparkbench.streaming.RunBench --master yarn-client --num-executors 2 --executor-cores 4 --executor-memory 6g /root/HiBench/sparkbench/assembly/target/sparkbench-assembly-7.1-SNAPSHOT-dist.jar /root/HiBench/report/identity/spark/conf/sparkbench/sparkbench.conf
Warning: Master yarn-client is deprecated since 2.0. Please use master "yarn" with specified deploy mode instead.
metrics is being written to kafka topic SPARK_identity_1_5_50_1528365200903
Reporter Topic: SPARK_identity_1_5_50_1528365200903
18/06/07 17:53:20 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:host.name=wls-x86-hp05
18/06/07 17:53:20 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_171
........ ..... ... ..
18/06/07 17:53:34 INFO cluster.YarnClientSchedulerBackend: Application application_1528275127402_0017 has started running.
18/06/07 17:53:34 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45199.
18/06/07 17:53:34 INFO netty.NettyBlockTransferService: Server created on 10.169.41.13:45199
18/06/07 17:53:34 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/06/07 17:53:34 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.169.41.13:45199 with 3.0 GB RAM, BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:34 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.169.41.13, 45199, None)
18/06/07 17:53:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3fb450d7{/metrics/json,null,AVAILABLE,@Spark}
18/06/07 17:53:37 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.169.41.13:43928) with ID 1
18/06/07 17:53:37 INFO storage.BlockManagerMasterEndpoint: Registering block manager wls-x86-hp05:44621 with 3.0 GB RAM, BlockManagerId(1, wls-x86-hp05, 44621, None)
18/06/07 17:53:52 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@76efdbf9 rejected from java.util.concurrent.ThreadPoolExecut...
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:287)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@54d5eb6a rejected from java.util.concurrent.ThreadPoolExecut...
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:287)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
..... ... .. .

 

Solutions:

  • Turn off checkpointing:

hibench.streambench.spark.checkpointPath false

  • Set hibench.streambench.spark.batchInterval in conf/spark.conf to a larger value.

 

  2.  The data generation process hangs

Solution:

By default the data generator runs in infinite mode (-1). Modify conf/hibench.conf so that it stops after a finite number of rounds and records, changing both values from -1:

# Total round count of data send (default: -1 means infinity)
hibench.streambench.datagen.totalRounds 512
# Number of total records that will be generated (default: -1 means infinity)
hibench.streambench.datagen.totalRecords 1024
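For issue 1, the stack trace shows the checkpoint writer handing tasks to a java.util.concurrent.ThreadPoolExecutor whose AbortPolicy throws RejectedExecutionException. The standalone Python 3 toy model below is not Spark's code; it assumes one checkpoint task per batch, a single writer thread and a bounded waiting queue whose capacity is a free parameter of the model. It illustrates why a larger batchInterval can help: once the interval is at least the checkpoint write time, no backlog builds up and nothing is rejected. Note that a ThreadPoolExecutor also rejects tasks after it has been shut down, so the same exception can have other causes.

```python
from collections import deque


def first_rejection(batch_interval_ms, write_ms, queue_capacity, max_batches=100000):
    """Return the index of the first batch whose checkpoint task is rejected,
    or None if no rejection happens within max_batches batches."""
    in_system = deque()  # completion times of accepted tasks, FIFO order
    for k in range(max_batches):
        t = k * batch_interval_ms  # a new checkpoint task arrives each batch
        while in_system and in_system[0] <= t:
            in_system.popleft()  # finished writes leave the system
        # in_system[0] (if any) is running; the rest wait in the queue.
        waiting = max(len(in_system) - 1, 0)
        if in_system and waiting >= queue_capacity:
            return k  # writer busy and queue full: task rejected
        start = max(in_system[-1], t) if in_system else t
        in_system.append(start + write_ms)
    return None
```

With a 100 ms interval, 150 ms writes and room for 2 waiting tasks, first_rejection(100, 150, 2) returns 7, whereas first_rejection(200, 150, 2) returns None.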

 

 

Spark Streaming benchmark report


Identity:

Metric                Arm64      X86
count                 2015       6115
throughput(msgs/s)    99         100
max_latency(ms)       2339       2269
mean_latency(ms)      338.531    162.566
min_latency(ms)       54         25
stddev_latency(ms)    437.862    349.292
p50_latency(ms)       210        80
p75_latency(ms)       307        87
p95_latency(ms)       1579       974
p98_latency(ms)       1985.32    1687.68
p99_latency(ms)       2148       1817
p999_latency(ms)      2338       2220


Repartition:

Metric                Arm64      X86
count                 2160       2930
throughput(msgs/s)    99         99
max_latency(ms)       2896       2720
mean_latency(ms)      458.714    569.22
min_latency(ms)       89         73
stddev_latency(ms)    573.476    828.736
p50_latency(ms)       257        132
p75_latency(ms)       351        503
p95_latency(ms)       1961.55    2480.45
p98_latency(ms)       2447.12    2549.38
p99_latency(ms)       2678.02    2573.07
p999_latency(ms)      2885.433   2674.243

 

Wordcount:

Metric                Arm64      X86
count                 4145       5674
throughput(msgs/s)    99         92
max_latency(ms)       3495       4825
mean_latency(ms)      967.401    3879.146
min_latency(ms)       48         2534
stddev_latency(ms)    844.909    428.128
p50_latency(ms)       743        3901
p75_latency(ms)       1482.5     4171.25
p95_latency(ms)       2695.7     4586
p98_latency(ms)       3005.36    4682
p99_latency(ms)       3134.54    4731.5
p999_latency(ms)      3445.854   4818