Spark vs Essentia: Runtime Performance Comparison

Apache Spark and Essentia are both scalable, distributed processing platforms for big data workloads. Both let you transform data quickly at scales larger than plain Python can handle effectively. Apache Spark is supported on Amazon EMR, and Essentia is offered through the AWS Marketplace.

Performance differences between the two platforms are not well documented, so I tested both, comparing speed across the entire process from dataset transformation to exporting the results to S3.

 

Goal

Compare the performance of creating a pivot table from preprocessed Twitter data (like the dataset below) and exporting it as a compressed csv to Amazon S3, using:

  1. PySpark on EMR
  2. Essentia

The transformed data is sparse, so compression efficiently reduces the size of the final result.

Before:

After (pivoted with key: id, label: word, value: sum of count):
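To make the transformation concrete, here is a minimal local PySpark sketch on toy data (not the real Twitter dataset); the tw_id, word, and count column names follow the script in the Appendix:

# Minimal sketch on toy data illustrating the pivot: key = tw_id, label = word,
# value = sum of count. Only shows the shape of the result, not the real dataset.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("pivot-demo").getOrCreate()

before = spark.createDataFrame(
    [(1, "apple", 2), (1, "banana", 1), (2, "apple", 3)],
    ["tw_id", "word", "count"],
)

# One row per tw_id, one column per word, missing combinations filled with 0
after = before.groupBy("tw_id").pivot("word").sum("count").fillna(0)
after.show()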

 

Environments

PySpark on Amazon EMR

    1. Instance Type: m4.large (4 vCore, 8 GiB memory, 2 vCPU)
    2. Number of Instances: 1 (master node only)
    3. Hadoop version: 2.8.5-amzn-4
      PySpark: Python version 3.6.8
      mapreduce.fileoutputcommitter.algorithm.version: 2 (the default value since Spark 2.0.1; it avoids temporary writes to S3. To learn more about this setting, please read XXX. See also the sketch after this list.)
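The following is a minimal sketch, assuming a PySpark session, of one way to pin this committer setting explicitly and confirm the value the running job sees; the spark.hadoop.* prefix is Spark's standard mechanism for forwarding properties to the underlying Hadoop configuration. It is not part of the benchmark itself.

# Sketch: pin and verify the committer algorithm version (optional on EMR, where 2
# was already the default in this environment).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("committer-check")
         # spark.hadoop.* properties are passed through to the Hadoop configuration
         .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
         .getOrCreate())

# Print the value the running job actually sees (uses a private handle; debug aid only)
print(spark.sparkContext._jsc.hadoopConfiguration()
      .get("mapreduce.fileoutputcommitter.algorithm.version"))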

Essentia

    1. Instance Type: m4.large (4 vCore, 8 GiB memory, 2 vCPU)
    2. Number of Instances: 1 (master node only)
    3. Essentia: 4.0.0.2
      AQ Tools: 2.0.1-2 — Wed Aug 14 17:00:00 PDT 2019

 

Data

A. id-word-count data from Twitter

    • Text cleaning and preprocessing already completed
    • Top 20,000 words selected by frequency of occurrence
    • 1,696,011 rows x 3 columns (see the quick check below)

B. Size: 43.8 MB
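As a quick sanity check of the dataset shape, here is a minimal sketch assuming a running Spark session and the same S3 path used in the Appendix script:

# Confirm the shape of the input data: expected 1,696,011 rows x 3 columns
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("s3://pivot-spark-test/before/only_top_words/twitter_word_top20000.csv"))
print(df.count(), len(df.columns))
print(df.columns)   # the id, word, and count columns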

 

Test Result

PySpark on Amazon EMR

    • Total Runtime: Could not complete the job due to insufficient memory

    • Error Log:

Essentia

    • Total Runtime: 120 secs (average of 3 runs: 120, 119, and 122 secs)
      Pivot execution and export of data to S3: 120 secs
    • Output result:
      Compressed size: 21.7 MB / Decompressed size: 12.4 GB
      ※ For reference, the scripts used are included in the Appendix at the end of this post.

As the results above show, Spark could not finish the job on an m4.large instance with 8 GiB of memory, while Essentia finished in about 2 minutes. Essentia uses very little memory when creating the pivot table and saving files to S3 because it streams results directly to S3, compressing as it writes. In fact, memory usage stayed below 10% for the entire job. Spark, by contrast, has to allocate memory for the data during execution.
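For illustration only, here is a rough Python sketch of the general "compress while streaming" pattern described above. It is not Essentia's implementation; the stream_gzip_to_s3 helper is hypothetical, and it assumes boto3 is installed with AWS credentials configured.

# Rough illustration of streaming rows through gzip into S3: only the compressed
# bytes are buffered, never the full decompressed pivot table.
import gzip
import io

import boto3  # assumption: boto3 installed and AWS credentials configured


def stream_gzip_to_s3(lines, bucket, key):
    """Write an iterator of CSV lines to s3://bucket/key as a single gzip object."""
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        for line in lines:  # lines can be generated lazily, row by row
            gz.write((line + "\n").encode("utf-8"))
    buf.seek(0)
    boto3.client("s3").upload_fileobj(buf, bucket, key)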

For a second test, we added one more instance as a worker node to both the Spark and Essentia clusters and evaluated performance again.

PySpark on Amazon EMR

a. Directly save compressed csv to S3
Total Runtime: 124 mins 21 secs

      • Assign pivot: 21 secs
      • Export data to S3: 124 mins

b. Save compressed csv to HDFS, then copy to S3
Total Runtime:

      • Assign pivot: 21 secs
      • Export data to HDFS: 125 mins
      • Copy to S3: 1 min 49 secs

Essentia

a. Total Runtime: 119 secs
Pivot + Export data to S3

In the results above, we see that PySpark was able to complete the operation successfully with the additional worker node, but it was still about 60x slower than Essentia.

 

Appendix

1-a. Performance Notes of Additional Test (Save in S3/Spark on EMR)

  1. Assign pivot transformation
  2. Pivot execution and save compressed csv to S3

1-b. Performance Notes of Additional Test (Save in HDFS/Spark on EMR)

  1. Assign pivot transformation
  2. Pivot execution and save compressed csv to HDFS (then copy to S3)

2. Performance Notes of Additional Test (Essentia)

3. Scripts

  • PySpark
%%info

# Import SQLContext
from pyspark.sql import SQLContext

# sc is created automatically when the notebook is initialized on EMR
sql = SQLContext(sc)

# Read the preprocessed id-word-count data from S3
df_top_only = (sql.read
         .format("com.databricks.spark.csv")
         .option("header", "true").option("inferSchema", "true")
         .load("s3://pivot-spark-test/before/only_top_words/twitter_word_top20000.csv"))

# Raise the limit on distinct pivot values (default is 10,000; we have 20,000 words)
spark.conf.set("spark.sql.pivotMaxValues", 100000)

# Assign the pivot transformation (lazy; nothing is executed yet)
pivot_df = df_top_only.groupBy("tw_id").pivot("word").sum("count").fillna(0)

# A) Execute the pivot and write gzip-compressed csv directly to Amazon S3
pivot_df.write.csv("s3://pivot-spark-test/after/pyspark/", compression="gzip", header=True, mode="overwrite")

# B) Execute the pivot and write gzip-compressed csv to HDFS
pivot_df.write.csv("hdfs:///output/pivot", compression="gzip", header=True, mode="overwrite")
  • Essentia
#!/bin/bash

OUTPUT="--debug --s3out pivot-spark-test:/after"
COL="s:id s:word i:count"
DB_SPEC="s,pkey:id s,+key:word i,+add:count"
OPT="--debug"


#Set the input data path in S3
ess select s3://pivot-spark-test
ess category add twitter "before/only_top_words/twitter_word_top20000.csv" --noprobe --overwrite

#Create database
ess server reset
ess create database test --port 0
ess create table before $DB_SPEC
ess server commit

#Import data into database
ess stream twitter "*" "*" "aq_pp -f,+1 - -d $COL -imp test:before" $OPT

pivot_count() {
	start_time=`date +%s`
	#Pivot and Export the compressed file to s3
	ess exec "aq_udb -exp test:before -o,aq - | aq_rst -f,aq - -key id -lab word -val count -o - | gzip" --master $OUTPUT/after_pivot_with_ess.csv.gz
	end_time=`date +%s`
	run_time=$((end_time - start_time))
	echo "Run time is $run_time"
}
#Execute function
pivot_count