Spark 3.0 new features – Learning from Dr. Kazuaki Ishizaki

Dr. Kazuaki Ishizaki gives a great summary of the Spark 3.0 features in his presentation “SQL Performance Improvements at a Glance in Apache Spark 3.0”. It is very helpful for understanding how these new features work and where we can use them.

New explain format

Spark 3.0 provides a new EXPLAIN output that combines a terse plan outline with detailed node information.

EXPLAIN [ EXTENDED | CODEGEN | COST | FORMATTED ] statement

There are five formats:

  1. default. Physical plan only.
  2. extended. Equivalent to df.explain(true) in Spark 2.4; it generates the parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
  3. codegen. Generates the Java code for the statement.
  4. cost. If plan stats are available, it generates the logical plan and the stats.
  5. formatted. The most useful one in my mind. It has two sections: a physical plan outline in a simple tree format, and node details.
-- example from spark document
-- Using Formatted
EXPLAIN FORMATTED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;
+----------------------------------------------------+
|                                                plan|
+----------------------------------------------------+
| == Physical Plan ==
 * HashAggregate (4)
 +- Exchange (3)
    +- * HashAggregate (2)
       +- * LocalTableScan (1)
   
   
 (1) LocalTableScan [codegen id : 1]
 Output: [k#19, v#20]
        
 (2) HashAggregate [codegen id : 1]
 Input: [k#19, v#20]
        
 (3) Exchange
 Input: [k#19, sum#24L]
        
 (4) HashAggregate [codegen id : 2]
 Input: [k#19, sum#24L]
|
+----------------------------------------------------+

Since these plan node names do not exactly match the SQL syntax, the following list helps explain the meaning of the Spark explain output (see the sketch after this list for how the scan-level optimizations show up in practice):

  • scan. Basic file access. In Spark 3.0 it can perform some predicate work before loading the data in:
    • ColumnPruning: select only the columns needed.
    • PartitionFilters: only grab data from certain partitions.
    • PushedFilters: filters on fields that can be pushed directly into the file scan (predicate pushdown).
  • filter. Due to predicate pushdown, a lot of the filter work has moved to the scan stage, so you may not find a filter node in the explain output matching your query. But some operations, like first and last, still need to be done in a filter:
    • Pushdown predicates.
    • Combine filters: combines two neighboring filter operations into one.
    • Infer filters from constraints: creates a new filter from a join condition. We will talk about it in the next section, “dynamic partitioning pruning”.
    • Prune filters.
  • project. Column selection operations, like select, drop, withColumn.
  • exchange. Shuffle operations, like sort merge and shuffle hash.
  • HashAggregate. Data aggregation.
  • BroadcastHashJoin & BroadcastExchange. Broadcast join and its exchange.
  • ColumnarToRow. A transition between columnar and row-based execution.
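As a quick sketch (the file path and the columns year, amount and order_id are made up for illustration), this is how those scan-level optimizations typically show up in a formatted plan:

# assume a Parquet table partitioned by `year` (hypothetical data)
df = spark.read.parquet("/data/sales")
df.filter("year = 2020 AND amount > 100") \
  .select("order_id", "amount") \
  .explain(mode="formatted")
# the Scan node should list only the pruned columns in Output,
# PartitionFilters such as [year = 2020], and PushedFilters such as [GreaterThan(amount,100)]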

All types of join hints

Spark 2.4 only supports the broadcast join hint, while Spark 3.0 supports all types of join hints.

Spark uses two kinds of hints: partition hints and join hints. Since Spark 3.0, join hints cover every join strategy (see the examples after the list below).

  • Broadcast join. A well-known join for joining a small table (dimension table) with a big table (fact table) while avoiding costly data shuffling.
    • A table smaller than 10MB (by default) is broadcast across all nodes to avoid shuffling.
    • Two steps: broadcast –> hash join.
    • spark.sql.autoBroadcastJoinThreshold
  • Shuffle merge join
    • Sort merge join performs the sort operation first and then merges the datasets.
    • Steps:
      • shuffle: the two big tables are partitioned across the cluster by the join keys.
      • sort: sort the data within each partition.
      • merge: join the two sorted and partitioned datasets.
    • Works well when:
      • joining two big tables, as it doesn’t need to load all data into memory like a hash join.
      • a highly scalable approach is needed.
  • Shuffle hash join
    • Shuffle hash join shuffles the data based on the join key, so that rows with the same keys from both tables are moved to the same node, and then performs the join.
    • Works well when:
      • the dataframes are distributed evenly by the keys.
      • the dataframes have enough distinct keys for parallelism.
      • there is enough memory for the hash join.
    • Supported for all join types except full outer join.
    • spark.sql.join.preferSortMergeJoin = false
  • Shuffle replicate NL
    • The Cartesian product (similar to SQL) of the two relations is computed to evaluate the join.
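A small sketch of the hint syntax (the fact/dim tables and the key column are placeholders, not real tables):

# the four join hints available since Spark 3.0
spark.sql("SELECT /*+ BROADCAST(d) */ * FROM fact f JOIN dim d ON f.key = d.key")
spark.sql("SELECT /*+ MERGE(f) */ * FROM fact f JOIN dim d ON f.key = d.key")
spark.sql("SELECT /*+ SHUFFLE_HASH(d) */ * FROM fact f JOIN dim d ON f.key = d.key")
spark.sql("SELECT /*+ SHUFFLE_REPLICATE_NL(d) */ * FROM fact f JOIN dim d ON f.key = d.key")
# DataFrame API equivalent of the broadcast hint
fact_df.join(dim_df.hint("broadcast"), "key")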

Adaptive query execution (AQE)

AQE is a feature that automatically chooses better execution strategies at runtime (the related switches are collected in a short sketch after the list below).

  • Set the number of reducers to avoid wasting memory and I/O resources: dynamically coalescing shuffle partitions.
    • spark.sql.adaptive.enabled=true
    • spark.sql.adaptive.coalescePartitions.enabled=true
AQE can merge several small partitions into one reducer to even out the pressure.
  • Select a better join strategy to improve performance.
    • Dynamically choose from the three join strategies. Broadcast has the best performance, but the static choice is sometimes not accurate.
    • spark.sql.adaptive.enabled=true
AQE gets the size of the join tables at runtime, so it can choose broadcast rather than a shuffle operation.
  • Optimize skewed joins to avoid imbalanced workloads.
    • The large partition is split into multiple partitions.
    • spark.sql.adaptive.skewJoin.enabled=true
AQE splits a skewed partition into multiple partitions.
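For reference, the switches mentioned above can be set at runtime (a minimal sketch; in Spark 3.0 AQE is off by default):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")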

Dynamic partitioning pruning

We already had a peek at part of this in the explain format section. Spark 3.0 is smart enough to avoid reading unnecessary partitions in a join operation by using the results of filter operations on another table. For example,

SELECT * FROM dim_iteblog
JOIN fact_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol)
WHERE dim_iteblog.othercol > 10

In this case, Spark will prune the partitions and add a new filter to the join table “fact_iteblog”.
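This behavior is on by default in Spark 3.0; the configuration switch is shown below for reference (a minimal sketch):

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")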

Enhanced nested column pruning & pushdown

  • Nested column pruning can be applied to all operators, like limit and repartition (see the sketch below).
    • select col2._1 from (select col2 from tp limit 1000)
  • Parquet can apply pushdown filters and read only part of the columns.
    • spark.read.parquet("filename").filter("col2._1 = 100")
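A PySpark sketch of the same idea (the path and the struct column col2 with a field _1 are assumptions):

df = spark.read.parquet("/data/events")       # col2 is assumed to be a struct with a field _1
df.select("col2._1").limit(1000).explain()    # the scan should read only col2._1
df.filter("col2._1 = 100").explain()          # the filter on the nested field is pushed down to the scan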

Improved aggregation code generation

  • Catalyst translates a given query into Java code; the HotSpot compiler in OpenJDK then translates the Java code into native code.
  • The HotSpot compiler gives up generating native code for methods with more than 8000 Java bytecode instructions.
  • Catalyst splits a large Java method into smaller ones to allow HotSpot to generate native code (you can inspect the generated code as shown below).
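If you want to see what Catalyst actually generates, the codegen explain mode prints the Java code for a query; here is a tiny sketch:

# print the whole-stage codegen output for a simple aggregation
spark.range(1000).selectExpr("sum(id)").explain(mode="codegen")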

New Scala and Java (infrastructure updates)

  • Java 11
  • Scala 2.12

Summary

I think it is better to take a screenshot from Dr. Kazuaki Ishizaki’s presentation to do the summary.

Reference

SQL Performance Improvements at a Glance in Apache Spark 3.0, https://www.iteblog.com/ppt/sparkaisummit-north-america-2020-iteblog/sql-performance-improvements-at-a-glance-in-apache-spark-30-iteblog.com.pdf

Spark 3.0.1 – Explain, http://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html

Mastering Query Plans in Spark 3.0, https://towardsdatascience.com/mastering-query-plans-in-spark-3-0-f4c334663aa4

Fast Filtering with Spark PartitionFilters and PushedFilters, https://mungingdata.com/apache-spark/partition-filters-pushed-filters/

Spark 3.0.1 – Hints, https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html

Machine Learning on Spark — How it works and why it doesn’t work well

Spark provides MLlib for machine learning in a scalable environment. MLlib includes three major parts: Transformer, Estimator and Pipeline. Essentially, a transformer takes a dataframe as input and returns a new dataframe with more columns; most featurization tasks are transformers. An estimator takes a dataframe as input and returns a model (which is itself a transformer); these are the ML algorithms as we know them. A pipeline combines transformers and estimators together.
Additionally, the dataframe is now the primary API for MLlib. There are no new features for the RDD-based API in Spark MLlib anymore.

If you have already understood or used high-level machine learning or deep learning frameworks, like scikit-learn, Keras or TensorFlow, you will find everything very familiar. But when you use Spark MLlib in practice, you still need help from third-party libraries; I will talk about that at the end. A minimal pipeline sketch follows below.
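Here is a minimal pipeline sketch tying the three parts together (the text/label columns and the training/test dataframes are assumptions for illustration):

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="text", outputCol="words")       # transformer
hashingTF = HashingTF(inputCol="words", outputCol="features")   # transformer
lr = LogisticRegression(maxIter=10, regParam=0.01)              # estimator
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)        # fit() returns a PipelineModel, which is a transformer
predictions = model.transform(test)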

Basic Stats

# Correlation
from pyspark.ml.stat import Correlation
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

# Summarizer
from pyspark.ml.stat import Summarizer
# compute statistics for multiple metrics without weight
summarizer = Summarizer.metrics("mean", "count")
df.select(summarizer.summary(df.features)).show(truncate=False)

# ChiSquare
from pyspark.ml.stat import ChiSquareTest
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

Featurization

# All featurizers below come from pyspark.ml.feature
from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, HashingTF, IDF

# TF-IDF
# stop word removal
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData)
# tokenize
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
# n-gram
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordsData)
# word frequency (term frequency)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# idf
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# word2vec
word2Vec = Word2Vec(vectorSize=N, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

# binarizer
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

# PCA
# reduce dimension to 3
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

# StringIndexer
# encodes a string column of labels into a column of label indices, ordered by frequency or alphabetically
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)

# OneHotEncoderEstimator
# we need to use StringIndexer first if applying it to a categorical feature
encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
                                 outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)

# Normalize & Scaler
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
l1NormData = normalizer.transform(dataFrame)
# standard scaler
# withStd=True: scale to unit standard deviation; withMean=True: center with the mean before scaling
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
# maxmin scaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# max abs scaler
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# bin
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# QuantileDiscretizer
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

# ElementwiseProduct
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")

# SQL Transformer
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

# VectorAssembler
# combine vector together for future model inputs
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"], # the columns we need to combine
    outputCol="features") # output column

# Imputer
# handle missing value
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
imputer.setMissingValue(custom_value)

# slice vector
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

# ChiSqSelector
# use the Chi-Square test to select the top features
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

Classification and Regression

# Linear regression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# logistic regression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) 
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial") # multinomial

# decision tree
# classification
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# regression
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Random forest
# classification
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# regression
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# gradient-boosted trees
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# multilayer perceptron
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# SVM
# Linear SVM, there is no kernel SVM like RBF
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Naive Bayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# KNN (not part of Spark MLlib; this example uses the third-party spark-knn package, Scala API shown)
knn = KNNClassifier().setTopTreeSize(training.count().toInt / 500).setK(10)

Clustering

# k-means
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# GMM
gmm = GaussianMixture().setK(2).setSeed(538009335)

Collaborative Filtering

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

Validation

# split train and test
train, test = data.randomSplit([0.9, 0.1], seed=12345)
# cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

You might have already spotted the problem. The ecosystem of Spark MLlib is not as rich as scikit-learn's, and it lacks deep learning (of course, its name is machine learning). According to the Databricks documentation, we still have solutions.

  • Use scikit-learn on a single node. A very simple solution, but scikit-learn still loads the data into memory. If the node (driver) is fast enough, we can get good performance as well.
  • To solve deep learning problems, we have two workarounds:
    • Apply Keras or TensorFlow on a single node with GPU acceleration (recommended by Databricks).
    • Distributed training. It might be slower than on a single node because of communication overhead. There are two frameworks used for distributed training: Horovod and Apache SystemML. I’ve never used Horovod, but you can find information here. As to SystemML, it is more like a wrapper for high-level APIs and provides a cluster optimizer which parses the code into Spark RDDs (live variable analysis, propagating stats, rewriting by matrix decomposition, and runtime instructions). From the official website, we know it is much faster than MLlib and native R. The problem is that it has not been updated since 2017.
# Create and save a single-hidden-layer Keras model for binary classification
# NOTE: In a typical workflow, we'd train the model before exporting it to disk,
# but we skip that step here for brevity
from keras.models import Sequential
from keras.layers import Dense
from sparkdl import KerasTransformer   # Databricks Deep Learning Pipelines

model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "/tmp/simple-binary-classification"
model.save(model_path)

transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)

It seems there is no perfect solution for machine learning in Spark, right? Don’t forget we have other time-consuming jobs: hyperparameter tuning and validation. We can run the same model with different hyperparameters on different nodes using a param grid, which is similar to grid search or random search.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()
crossval = CrossValidator(estimator=pipeline,
                      estimatorParamMaps=paramGrid,
                      evaluator=RegressionEvaluator(),
                      numFolds=2)  # use 3+ folds in practice
cvModel = crossval.fit(training)

Someone might ask: why don’t we use MPI? The answer is simple: it is too complex. Although it can achieve excellent performance, and you can do whatever you want, even running distributed GPU + CPU code, there are too many things that need to be configured manually through low-level APIs, with no fault tolerance.

In conclusion, we can utilize Spark for ELT and for model training/validation to maximize performance (it does really well for these jobs). But for now, we still need third-party frameworks to help us do deep learning or some machine learning tasks on a single strong node.

Reference:

Spark MLlib: http://spark.apache.org/docs/latest/ml-guide.html

Databricks: https://docs.databricks.com/getting-started/index.html

Spark ML Tuning: http://spark.apache.org/docs/latest/ml-tuning.html

Horovod: https://github.com/horovod/horovod

Apache SystemML: https://systemml.apache.org/docs/1.1.0/beginners-guide-keras2dml

How to use Dataframe in pySpark (compared with SQL)

-- version 1.0: initial @20190428
-- version 1.1: add image processing, broadcast and accumulator
-- version 1.2: add ambiguous column handle, maptype

When we work with Spark, there are two ways to manipulate data: RDDs and DataFrames. I don’t know why most books start with RDDs rather than DataFrames. Since the RDD API is more OOP- and functional-programming oriented, it is not very friendly to people coming from SQL, pandas or R. Then the DataFrame comes along, and it looks like a star in the dark. The advantages of using DataFrames can be listed as follows:

  • Static typing and runtime type safety. We can catch errors early (at analysis/compile time), which saves developers a lot of time.
  • High-level abstraction: you say what to do rather than how to do it. If you have ever touched pandas, you will find they are almost the same thing.
  • High performance. Yes, the DataFrame is not only simpler but also much faster than using RDDs directly, because the optimization work is done by Catalyst, which generates optimized logical and physical query plans.

For more information, see the article linked in the references.

After knowing why we need to use DataFrames, let’s see how to use them to handle daily work. To make it easier, I will compare DataFrame operations with SQL, starting with a quick side-by-side example below.
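As a quick taste of that comparison (the employee table and its name/salary columns are made up), the same aggregation looks like this in both styles:

from pyspark.sql import functions as F

# SQL style
spark.sql("SELECT name, AVG(salary) AS avg_salary FROM employee GROUP BY name")
# DataFrame style, same result
df.groupBy("name").agg(F.avg("salary").alias("avg_salary"))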

Initializing Spark Session

from pyspark.sql import SparkSession
# .config() sets parameters for Spark
spark = SparkSession \
        .builder \
        .appName("example project") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

Create DataFrames

## From RDDs
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
# Infer Schema
>>> sc = spark.sparkContext
>>> lines = sc.textFile("people.txt")
>>> parts = lines.map(lambda l: l.split(","))
>>> people = parts.map(lambda p: Row(name=p[0],age=int(p[1])))
>>> peopledf = spark.createDataFrame(people)
# Specify Schema
>>> people = parts.map(lambda p: Row(name=p[0],
age=int(p[1].strip())))
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)
>>> spark.createDataFrame(people, schema).show()

## From Spark Data Source
# JSON
>>> df = spark.read.json("customer.json")

# Use MapType to read dynamic columns from JSON
customSchema = StructType([
    StructField("col1", StringType(), True),
    StructField("event", MapType(StringType(), StringType()))])
spark.read.schema(customSchema).json(path)

# Parquet files
>>> df3 = spark.read.load("users.parquet")
# TXT files
>>> df4 = spark.read.text("people.txt")
# CSV files
>>> df5 = spark.read.format("csv").option("header", true).option("inferSchema", true).load("csvfile.csv")
# MS SQL
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
# or we can use
# collection = spark.read.sqlDB(config)
display(df)

Dataframe Manipulation

from pyspark.sql import functions as F
# select & where
df.select("column1","column2", explod("phonenumber").alias("contactInfo"), df['age']>24).show()

# join
df = dfa.join(dfb, (dfa.id == dfb.id) & (dfa.name == dfb.name), how='left')
df = dfa.join(dfb, (dfa.id == dfb.id) | (dfa.name == dfb.name), how='right')
df = dfa.join(dfb, dfa.id == dfb.id, how='full')
df = dfa.join(dfb, dfa.id == dfb.id)
df = dfa.crossJoin(dfb)

# distinct count
from pyspark.sql.functions import countDistinct
df = df.groupby('col1','col2').agg(countDistinct("col3").alias("others"))

# ambiguous column handle
# both date and endpoint_id exist in two dataframes
df_result = df_result.join(df_result2, ["date","endpoint_id"],how="left") 

# exits and not exits
new_df = df.join(
    spark.table("target"),
    how='left_semi',
    on='id')

new_df = df.join(
    spark.table("target"),
    how='left_anti',
    on='id')

# when
df.select("first name", F.when(df.age>30,1).otherwise(0))

# like
df.select("firstName", df.lastName.like("Smith"))

# startswith / endswith
df.select("firstName", df.lastName.startswith("Sm"), df.lastName.endswith("th"))

# substring
df.select(df.firstName.substr(1,3).alias("name"))

# between
df.select(df.age.between(22,24))

# add columns
df = df.withColumn('city',df.address.city) \
    .withColumn('postalCode',df.address.postalCode) \
    .withColumn('state',df.address.state) \
    .withColumn('streetAddress',df.address.streetAddress) \
    .withColumn('telePhoneNumber',
    F.explode(df.phoneNumber.number)) \
    .withColumn('telePhoneType',
    F.explode(df.phoneNumber.type))

# update column name
df = df.withColumnRenamed('prename','aftername')

# removing column
df = df.drop("ColumnName1","columnname2")

# group by
df.groupby("groupbycolumn").agg({"salary": "avg", "age": "max"})

# filter
df.filter(df["age"]>24).show()

# Sort
df.sort("age",ascending=False).collect()

# Missing & Replace values
df.na.fill(value)
df.na.drop()
df.na.replace(value1,value2)
df["age"].na.fill(value)

# repartitioning
df.repartition(10) \
    .rdd \
    .getNumPartitions()   # df now has 10 partitions
df.coalesce(1).rdd.getNumPartitions()

# union and unionAll
df.union(df2)

# windows function
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
windowSpec = \
  Window \
    .partitionBy(df['category']) \
    .orderBy(df['revenue'].desc()) \
    .rangeBetween(-3, 3)  # or row frame: .rowsBetween(Window.unboundedPreceding, Window.currentRow)
dataFrame = sqlContext.table("productRevenue")
revenue_difference = \
  (func.max(dataFrame['revenue']).over(windowSpec) - dataFrame['revenue'])
dataFrame.select(
  dataFrame['product'],
  dataFrame['category'],
  dataFrame['revenue'],
  revenue_difference.alias("revenue_difference"))

from pyspark.sql.functions import percentRank, ntile
df.select(
    "k", "v",
    percentRank().over(windowSpec).alias("percent_rank"),
    ntile(3).over(windowSpec).alias("ntile3"))

# pivot & unpivot
(df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(func.count("ship"))
    .show())

df_data.selectExpr("id", "type",
    "stack(3, '2010', 2010, '2011', 2011, '2012', 2012) as (date, shipNumber)") \
    .where("shipNumber is not null").show()

# Remove Duplicate
df.dropDuplicates()

Running SQL queries

# registering a DataFrame as a view
>>> peopledf.createGlobalTempView("people")
>>> df.createTempView("customer")
>>> df.createOrReplaceTempView("customer")

# Query view
>>> df5 = spark.sql("SELECT * FROM customer").show()
>>> peopledf2 = spark.sql("SELECT * FROM global_temp.people")\
.show()
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()

Output

# Data convert
rdd = df.rdd
df.toJSON().first()
df.toPandas()

# write and save
df.select("columnname").write.save("filename",format="jason")

Check data

>>> df.dtypes               # Return df column names and data types
>>> df.show()               # Display the content of df
>>> df.head()               # Return the first row (head(n) returns the first n rows)
>>> df.first()              # Return the first row
>>> df.take(2)              # Return the first 2 rows
>>> df.schema               # Return the schema of df
>>> df.describe().show()    # Compute summary statistics
>>> df.columns              # Return the columns of df
>>> df.count()              # Count the number of rows in df
>>> df.distinct().count()   # Count the number of distinct rows in df
>>> df.printSchema()        # Print the schema of df
>>> df.explain()            # Print the (logical and physical) plans

Image Processing

# Spark 2.3 provides the ImageSchema.readImages API; the "image" data source below is the newer way
image_df = spark.read.format("image").option("dropInvalid", True).load("/path/to/images")
# the structure of output dataframe is like
image: struct containing all the image data
 |    |-- origin: string representing the source URI
 |    |-- height: integer, image height in pixels
 |    |-- width: integer, image width in pixels
 |    |-- nChannels: integer, number of color channels
 |    |-- mode: integer, OpenCV type
 |    |-- data: binary, the actual image
# Then we can use Spark ML to build and train the model; below is a sample crop-and-resize process
from mmlspark import ImageTransformer
tr = (ImageTransformer() # images are resized and then cropped
    .setOutputCol("transformed")
    .resize(height = 200, width = 200)
    .crop(0, 0, height = 180, width = 180) )

smallImages = tr.transform(image_df).select("transformed")

Broadcast and Accumulator

# Broadcast is a read-only variable used to reduce data transfer, mostly for "lookup" operations. In Azure data warehouse, there is a similar structure named "Replicate".
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast
 
sqlContext = SQLContext(sc)
df_tiny = sqlContext.sql('select * from tiny_table')
df_large = sqlContext.sql('select * from massive_table')
df3 = df_large.join(broadcast(df_tiny), df_large.some_sort_of_key == df_tiny.key)

# An accumulator is a write-only (except on the Spark driver) structure used to aggregate information across executors. We can understand it as a global variable, but write-only.
from pyspark import SparkContext 

sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(1) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([2,3,4,5]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)

Someone might ask: ADF Data Flow can do almost the same thing, so is there any difference? In my understanding so far, NO. ADF Data Flow is translated into Spark SQL, which is the same engine as the DataFrame. If you like coding and are familiar with Python and pandas, or you want to do some data exploration/data science tasks, choose the DataFrame; if you like a GUI similar to SSIS for ELT-style tasks, choose ADF Data Flow.

Reference:

A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets, https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

PySpark Cheat Sheet: Spark DataFrames in Python, https://www.datacamp.com/community/blog/pyspark-sql-cheat-sheet

pyspark.sql module, http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

Spark Overview, http://spark.apache.org/docs/latest/

Get Rid of ETL, Move to Spark

ETL is the most common tool in the process of building an EDW, and of course the first step in data integration. As big data emerges, we find more and more customers starting to use Hadoop and Spark. Personally, I agree with the idea that Spark will replace most ETL tools.

Background

  • Business Intelligence -> big data
  • Data warehouse -> data lake
  • Applications -> Micro services

ETL hell

  • Data getting out of sync; each copy is a risk.
  • Performance issues and wasted server resources (peak performance), although ETL can do limited parallel work.
  • Plain-text code in hidden stages (typically VB or Java).
  • CSV files are not type safe.
  • All-or-nothing approach in batch jobs.
  • Legacy code.

Spark for ETL

  • Parallel processing is built in.
  • Streaming can be used to parallelize ETL.
  • Hadoop is the data source, so we don’t need copies, which reduces risk.
  • Just one codebase (Scala or Python).
  • Machine learning included.
  • Security, unit testing, performance measurement, exception handling, monitoring.

Code Demo

  1. Simple one
(spark.read.json("/sourcepath")    # Extract
    .filter(...)                   # Transform (this line and below)
    .agg(...)
    .write.mode("append")          # Load
    .parquet("/outputpath"))

2. Stream

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# @param1: master
# @param2: appname
sc = SparkContext("local[2]", "NetworkWordCount")
# @param1: spark context
# @param2: batch interval in seconds
ssc = StreamingContext(sc, 1)
stream = ssc.textFileStream("path")
# do transform
# do load
ssc.start()
ssc.awaitTermination()

reference:
1. https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html
2. https://databricks.com/session/get-rid-of-traditional-etl-move-to-spark
3. https://www.slideshare.net/databricks/building-robust-etl-pipelines-with-apache-spark

Simple global warming analysis by pyspark

Purpose

Using PySpark to help analyze the situation of global warming. The data is from NCDC (http://www.ncdc.noaa.gov/), covering 1980 to 1989 and 2000 to 2009 (except 1984 and 2004).

Two Strategies

  • Get the max/min temperature and max wind speed, filtering out only mistaken data (9999). Steps are as follows (see the sketch after this list):
    • Load files into an RDD: sc.textFile("/home/DATA/NOAA_weather/{198[0-3],198[5-9]}/*.gz")
    • Extract fields from the files through a map function, parse_record, which returns key-value (tuple) data.
    • Filter out 9999 data: .filter(lambda x: x[1]!=9999)
    • reduceByKey to get the max or min value (the key is the year): .reduceByKey(lambda x,y: max(x,y))
  • Get the average temperature and average wind speed by year, latitude and longitude of the station, restricted to fixed land stations.
    • Load files into an RDD, same as above.
    • Load the RDD into a DataFrame: sqlContext.createDataFrame(all_fields,schema=["date","report_type","lat","lon","wind_speed","wind_qulity","temp"])
    • Filter out error data (9999) and keep station type FM-12: df.where((df['lat']!='+9999') & (df['lon']!='+9999') & (df['wind_speed']!=9999) & (df['temp']!=9999) & (df['report_type']=='FM-12'))
    • Aggregate averages by year, latitude and longitude: df.groupBy(['date',"lat","lon"]).agg({"wind_speed":"avg","temp":"avg"})
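A condensed sketch of strategy 1 (parse_record is the author's helper; the fixed-width offsets below follow the common NCDC record layout and are only an assumption):

sc = spark.sparkContext
rdd = sc.textFile("/home/DATA/NOAA_weather/{198[0-3],198[5-9]}/*.gz")

def parse_record(line):
    # assumed layout: year at characters 16-19, air temperature (x10) at characters 88-92
    year = line[15:19]
    temp = int(line[87:92])
    return (year, temp)

max_temp_by_year = (rdd.map(parse_record)
                       .filter(lambda x: x[1] != 9999)        # drop the 9999 sentinel values
                       .reduceByKey(lambda x, y: max(x, y)))  # max temperature per year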

Result and visualization

  • The max/min temperature and max wind speed (based on strategy 1):
year, max_temp(10x), min_temp(10x), max_wind_speed(10x)
1980,600,-780,617
1981,580,-850,618
1983,616,-931,618
1984,617,-932,618
1982,617,-930,700
1986,607,-901,607
1987,607,-900,602
1985,611,-932,618
1989,606,-900,900
2000,568,-900,900
1988,607,-900,618
2002,568,-932,900
2001,568,-900,900
2003,565,-900,900
2005,610,-925,900
2006,610,-917,900
2007,610,-900,900
2008,610,-900,900
2009,610,-854,900

These extreme values may occur in special areas, like Antarctica or the Sahara Desert.

  • The average temperature and average wind speed by year, latitude and longitude (based on strategy 2):
[figure: average temperature by year and location]
[figure: average wind speed by year and location]

From the trend line charts, we can see that the temperature in 2000-2009 is significantly higher (1.5-2.0℃) than in 1980-1989. Also, the max wind speed is mostly greater than before.

Let’s create another chart, which shows the temperature difference between the two decades.

[figure: temperature difference between the two decades]

Red means the weather has become warmer; the other color means it has become colder. In most countries, the warm (red) dots outnumber the cold ones. The most serious area is Europe.

Files

  • infor_extra.py: the main file to execute in Spark; it outputs a single csv file into the folder named output1.
  • weather_gap.py: calculates the gap between the two decades and exports it to output.csv.
  • weather_overview.twb and weather_gap.twb are two Tableau files for visualization.
  • If an out-of-memory or GC error happens, please squeeze the year range in infor_extra.py: rdd = sc.textFile("/home/DATA/NOAA_weather/{198[0-3],198[5-9]}/*.gz")

Code: https://github.com/neoaksa/HPC/tree/master/6.Global_Warming