Machine Learning on Spark — How it works and why it doesn’t work well

Spark provides MLlib for machine learning in a scalable environment. MLlib is built around three major parts: Transformer, Estimator and Pipeline. Essentially, a transformer takes a DataFrame as input and returns a new DataFrame with more columns; most featurization tasks are transformers. An estimator takes a DataFrame as input and returns a model (which is itself a transformer); these are the ML algorithms as we know them. A pipeline chains transformers and estimators together.
Additionally, the DataFrame-based API is now the primary API for MLlib. The RDD-based API in Spark MLlib no longer receives new features.
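
To make the three roles concrete, here is a plain-Python sketch of the contract that runs without Spark. It is not MLlib code: rows are plain dicts, and the class names `AddLength`, `MeanCenterer`, `Pipeline` and `PipelineModel` are illustrative assumptions that only mimic the shape of the real API.

```python
# Plain-Python sketch of the Transformer / Estimator / Pipeline contract.
# A "DataFrame" here is just a list of dict rows; all names are illustrative.

class AddLength:
    """Transformer: appends a column and needs no fitting."""
    def transform(self, rows):
        return [{**r, "length": len(r["text"])} for r in rows]

class MeanCenterer:
    """Estimator: fit() learns a statistic and returns a model (a transformer)."""
    def fit(self, rows):
        mean = sum(r["length"] for r in rows) / len(rows)
        return MeanCentererModel(mean)

class MeanCentererModel:
    def __init__(self, mean):
        self.mean = mean
    def transform(self, rows):
        return [{**r, "centered": r["length"] - self.mean} for r in rows]

class Pipeline:
    """Runs stages in order; estimators are fitted on the data produced so far."""
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return PipelineModel(fitted)

class PipelineModel:
    """The fitted pipeline is itself a transformer."""
    def __init__(self, stages):
        self.stages = stages
    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

data = [{"text": "ab"}, {"text": "abcd"}]
model = Pipeline([AddLength(), MeanCenterer()]).fit(data)
result = model.transform(data)
```

The point of the design is that fitting a pipeline yields another transformer, so the same object can be reused on training and test data.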

If you have already used high-level machine learning or deep learning frameworks such as scikit-learn, Keras or TensorFlow, everything will look familiar. But when you use Spark MLlib in practice, you still need help from third-party libraries. I will talk about that at the end.

Basic Stats

# Correlation
from pyspark.ml.stat import Correlation
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

# Summarizer
from pyspark.ml.stat import Summarizer
# compute statistics for multiple metrics without weight
summarizer = Summarizer.metrics("mean", "count")
df.select(summarizer.summary(df.features)).show(truncate=False)

# ChiSquare
from pyspark.ml.stat import ChiSquareTest
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

Featurization

# TF-IDF
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram, HashingTF, IDF
# tokenize
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
# stop word removal (works on the tokenized array column)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
filteredData = remover.transform(wordsData)
# n-gram
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordsData)
# term frequency
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# idf
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
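
To see what the term-frequency and IDF stages compute, here is a plain-Python sketch that runs without Spark. It uses exact term counts instead of feature hashing, and the smoothed formula idf = log((m + 1) / (df + 1)) documented for MLlib's IDF, where m is the number of documents and df the number of documents containing the term. The toy `docs` list and the helper names are assumptions for illustration.

```python
import math
from collections import Counter

# Three tiny tokenized "documents" (hypothetical data).
docs = [["spark", "ml", "spark"], ["ml", "pipeline"], ["spark", "sql"]]

def idf(term, docs):
    """Smoothed inverse document frequency: log((m + 1) / (df + 1))."""
    m = len(docs)
    df = sum(1 for d in docs if term in d)
    return math.log((m + 1) / (df + 1))

def tf_idf(doc, docs):
    """Raw term count multiplied by the IDF weight of each term."""
    tf = Counter(doc)
    return {term: count * idf(term, docs) for term, count in tf.items()}

weights = tf_idf(docs[0], docs)
```

Terms that appear in many documents get a weight close to zero, which is why "spark" is only weighted above "ml" here through its higher count in the first document.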

# word2vec
word2Vec = Word2Vec(vectorSize=N, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

# binarizer
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

# PCA
# reduce dimension to 3
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

# StringIndexer
# encodes a string column of labels into a column of label indices, ordered by frequency or alphabetically
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)

# OneHotEncoderEstimator (renamed to OneHotEncoder in Spark 3.0)
# categorical string features must go through StringIndexer first
encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
                                 outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)

# Normalize & Scaler
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
l1NormData = normalizer.transform(dataFrame)
# standard scaler
# withStd=True: scale to unit standard deviation; withMean=True: center on the mean before scaling
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
# min-max scaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# max abs scaler
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
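
The three scalers differ only in the statistic they divide by. The following plain-Python sketch (no Spark needed) shows the arithmetic on a single feature column. It assumes the sample standard deviation (n - 1 denominator), which is my understanding of what MLlib's StandardScaler uses, and it ignores degenerate cases such as a constant column. The function names are illustrative.

```python
from statistics import mean, stdev

def standard_scale(xs, with_std=True, with_mean=False):
    """Optionally center on the mean, optionally divide by the sample std."""
    mu = mean(xs) if with_mean else 0.0
    sigma = stdev(xs) if with_std else 1.0
    return [(x - mu) / sigma for x in xs]

def min_max_scale(xs, lo=0.0, hi=1.0):
    """Rescale linearly so the column spans [lo, hi]."""
    x_min, x_max = min(xs), max(xs)
    return [(x - x_min) / (x_max - x_min) * (hi - lo) + lo for x in xs]

def max_abs_scale(xs):
    """Divide by the largest absolute value; keeps zeros and signs intact."""
    m = max(abs(x) for x in xs)
    return [x / m for x in xs]

values = [1.0, 2.0, 3.0]
scaled_std_only = standard_scale(values)
scaled_centered = standard_scale(values, with_mean=True)
scaled_min_max = min_max_scale(values)
scaled_max_abs = max_abs_scale([-4.0, 2.0, 1.0])
```

Max-abs scaling never shifts the data, so it preserves sparsity, which is one reason to prefer it over centering on sparse feature vectors.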

# bin
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# QuantileDiscretizer
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

# ElementwiseProduct
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")

# SQL Transformer
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

# VectorAssembler
# combine vector together for future model inputs
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"], # the columns we need to combine
    outputCol="features") # output column

# Imputer
# handle missing value
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
imputer.setMissingValue(custom_value)

# slice vector
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

# ChiSqSelector
# use the chi-squared test to select features
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

Classification and Regression

# Linear regression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# logistic regression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) 
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial") # multinomial

# decision tree
# classification
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# regression
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Random forest
# classification
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# regression
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# gradient-boosted 
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# perceptron
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# SVM
# Linear SVM, there is no kernel SVM like RBF
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Naive Bayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# KNN (not part of MLlib; the line below is the Scala API of the third-party spark-knn package)
knn = KNNClassifier().setTopTreeSize(training.count().toInt / 500).setK(10)

Clustering

# k-means
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# GMM
gmm = GaussianMixture().setK(2).setSeed(538009335)

Collaborative Filtering

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

Validation

# split train and test
train, test = data.randomSplit([0.9, 0.1], seed=12345)
# cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

You might have already spotted the problem. The ecosystem of Spark MLlib is not as rich as scikit-learn's, and it lacks deep learning (of course, its name says machine learning). According to the Databricks documentation, we still have some solutions.

  • Use scikit-learn on a single node. This is a very simple solution, but scikit-learn still loads all the data into memory. If the node (the driver) is powerful enough, we can still get good performance.
  • To solve deep learning problems, we have two workarounds.
    • Run Keras or TensorFlow on a single node with GPU acceleration (recommended by Databricks).
    • Distributed training. It might be slower than training on a single node because of communication overhead. Two frameworks are used for distributed training: Horovod and Apache SystemML. I’ve never used Horovod, but you can find information in the Horovod repository listed in the references. As for SystemML, it is more like a wrapper around a high-level API that provides a cluster optimizer, which compiles the code into Spark RDD operations (live variable analysis, statistics propagation, rewrites by matrix decomposition, and runtime instructions). According to its official website, it is much faster than MLlib and native R. The problem is that it has not been updated since 2017.
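from keras.models import Sequential
from keras.layers import Dense
from sparkdl import KerasTransformer  # from the Deep Learning Pipelines (sparkdl) package

# Create and save a single-hidden-layer Keras model for binary classification
# NOTE: In a typical workflow, we'd train the model before exporting it to disk,
# but we skip that step here for brevity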
model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "/tmp/simple-binary-classification"
model.save(model_path)

transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)

It seems there is no perfect solution for machine learning in Spark, right? Don’t forget that we have other time-consuming jobs: hyperparameter tuning and validation. We can fit the same model with different hyperparameters in parallel across the cluster by using a grid of ParamMaps, which is similar to grid search or random search.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()
crossval = CrossValidator(estimator=pipeline,
                      estimatorParamMaps=paramGrid,
                      evaluator=RegressionEvaluator(),
                      numFolds=2)  # use 3+ folds in practice
cvModel = crossval.fit(training)
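
To get a feel for how much work such a grid creates, the following plain-Python sketch expands the same two parameter lists into their combinations, the way a grid builder does conceptually. It does not call Spark; `grid`, `param_maps` and `total_fits` are illustrative names.

```python
from itertools import product

# The same grid as above: 3 values of maxIter x 2 values of regParam.
grid = {"maxIter": [10, 100, 1000], "regParam": [0.1, 0.01]}

# Cartesian product of all parameter values, one dict per combination.
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

num_folds = 2
# Each combination is fitted once per fold during cross-validation.
total_fits = len(param_maps) * num_folds
```

With 6 combinations and 2 folds this is 12 model fits before the best combination is refitted on the full training set, which is why the cost grows quickly as grids get larger.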

Someone might ask: why don’t we use MPI? The answer is simple: it is too complex. Although it can achieve the best performance, and you can do whatever you want with it, even run distributed GPU + CPU code, there are too many things to configure manually through a low-level API, and there is no fault tolerance.

In conclusion, we can use Spark for ELT and for model training and validation to maximize performance (it does really well at these jobs). But for now, we still need third-party frameworks to help us with deep learning or machine learning tasks on a single powerful node.

Reference:

Spark MLlib: http://spark.apache.org/docs/latest/ml-guide.html

Databricks: https://docs.databricks.com/getting-started/index.html

Spark ML Tuning: http://spark.apache.org/docs/latest/ml-tuning.html

Horovod: https://github.com/horovod/horovod

Apache SystemML: https://systemml.apache.org/docs/1.1.0/beginners-guide-keras2dml

How to use Dataframe in pySpark (compared with SQL)

-- version 1.0: initial @20190428
-- version 1.1: add image processing, broadcast and accumulator
-- version 1.2: add ambiguous column handle, maptype

When we work with Spark, there are two ways to manipulate data: RDD and DataFrame. I don’t know why most books start with RDD rather than DataFrame. Since RDD has a more object-oriented and functional style, it is not very friendly to people who are used to SQL, pandas or R. Then DataFrame arrived, like a star in the dark. The advantages of using DataFrame are as follows:

  • Schema awareness and early error detection. A DataFrame carries a schema, so a reference to a missing column or a type mismatch is reported when the query is analyzed, before any data is processed, which saves developers a lot of time. (Full compile-time type safety is only available through the typed Dataset API in Scala and Java.)
  • High-level abstraction: we say what to do rather than how to do it. If you have ever used pandas, you will find the two are very similar.
  • High performance. DataFrame is not only simpler but also much faster than using RDD directly, because the Catalyst optimizer generates an optimized logical and physical query plan.

For more information, see the Databricks article "A Tale of Three Apache Spark APIs" listed in the references.

Now that we know why we should use DataFrame, let’s see how to use it in daily work. To make it easier, I will compare DataFrame operations with SQL.

Initializing Spark Session

from pyspark.sql import SparkSession
spark = (SparkSession
         .builder
         .appName("example project")
         .config("spark.some.config.option", "some-value")  # set parameters for Spark
         .getOrCreate())

Create DataFrames

## From RDDs
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
# Infer Schema
>>> sc = spark.sparkContext
>>> lines = sc.textFile("people.txt")
>>> parts = lines.map(lambda l: l.split(","))
>>> people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
>>> peopledf = spark.createDataFrame(people)
# Specify Schema (plain tuples, so the values line up with the schema by position)
>>> people = parts.map(lambda p: (p[0], int(p[1].strip())))
>>> schema = StructType([StructField("name", StringType(), True),
...                      StructField("age", IntegerType(), True)])
>>> spark.createDataFrame(people, schema).show()

## From Spark Data Source
# JSON
>>> df = spark.read.json("customer.json")

# Use MapType to read dynamic columns from JSON
customSchema = StructType([
    StructField("col1", StringType(), True),
    StructField("event", MapType(StringType(), StringType()))])
spark.read.schema(customSchema).json(path)

# Parquet files
>>> df3 = spark.read.load("users.parquet")
# TXT files
>>> df4 = spark.read.text("people.txt")
# CSV files
>>> df5 = spark.read.format("csv").option("header", True).option("inferSchema", True).load("csvfile.csv")
# MS SQL
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
# or we can use
# collection = spark.read.sqlDB(config)
display(df)

Dataframe Manipulation

from pyspark.sql import functions as F
# select & where
df.where(df["age"] > 24).select("column1", "column2", F.explode("phonenumber").alias("contactInfo")).show()

# join (each comparison needs its own parentheses because & and | bind tighter than ==)
df = dfa.join(dfb, (dfa.id == dfb.id) & (dfa.name == dfb.name), how='left')
df = dfa.join(dfb, (dfa.id == dfb.id) | (dfa.name == dfb.name), how='right')
df = dfa.join(dfb, dfa.id == dfb.id, how='full')
df = dfa.join(dfb, dfa.id == dfb.id)
df = dfa.crossJoin(dfb)

# distinct count
from pyspark.sql.functions import countDistinct
df = df.groupby('col1','col2').agg(countDistinct("col3").alias("others"))

# ambiguous column handle
# both date and endpoint_id exist in two dataframes
df_result = df_result.join(df_result2, ["date","endpoint_id"],how="left") 

# exists and not exists
new_df = df.join(
    spark.table("target"),
    how='left_semi',
    on='id')

new_df = df.join(
    spark.table("target"),
    how='left_anti',
    on='id')
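
For readers coming from SQL, left_semi corresponds to WHERE EXISTS and left_anti to WHERE NOT EXISTS. The plain-Python sketch below (no Spark required) shows the semantics on two small lists of rows; the data and variable names are hypothetical.

```python
# Left side and lookup side as lists of dict rows (hypothetical data).
df_rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 3, "name": "c"}]
target_rows = [{"id": 2, "score": 9}, {"id": 2, "score": 7}, {"id": 4, "score": 5}]

target_ids = {r["id"] for r in target_rows}

# left_semi ~ WHERE EXISTS: keep left rows that have a match.
# Only left columns survive, and a row is never duplicated by multiple matches.
left_semi = [r for r in df_rows if r["id"] in target_ids]

# left_anti ~ WHERE NOT EXISTS: keep left rows that have no match.
left_anti = [r for r in df_rows if r["id"] not in target_ids]
```

Note that id 2 matches two target rows but appears only once in the semi-join result, which is the main difference from an inner join.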

# when
df.select("firstName", F.when(df.age>30,1).otherwise(0))

# like
df.select("firstName", df.lastName.like("Smith"))

# startswith-endswith
df.select("firstName", df.lastName.startswith("Sm"))
df.select("firstName", df.lastName.endswith("th"))

# substring
df.select(df.firstName.substr(1,3).alias("name"))

# between
df.select(df.age.between(22,24))

# add columns
df = df.withColumn('city',df.address.city) \
    .withColumn('postalCode',df.address.postalCode) \
    .withColumn('state',df.address.state) \
    .withColumn('streetAddress',df.address.streetAddress) \
    .withColumn('telePhoneNumber',
    F.explode(df.phoneNumber.number)) \
    .withColumn('telePhoneType',
    F.explode(df.phoneNumber.type))

# update column name
df = df.withColumnRenamed('prename','aftername')

# removing column
df = df.drop("ColumnName1","columnname2")

# group by
df.groupby("groupbycolumn").agg({"salary": "avg", "age": "max"})

# filter
df.filter(df["age"]>24).show()

# Sort
df.sort("age",ascending=False).collect()

# Missing & Replace values
df.na.fill(value)
df.na.drop()
df.na.replace(value1,value2)
df.na.fill(value, subset=["age"])

# repartitioning
df.repartition(10).rdd.getNumPartitions()  # df with 10 partitions
df.coalesce(1).rdd.getNumPartitions()  # df with 1 partition

# union and unionAll
df.union(df2)

# window function
from pyspark.sql.window import Window
import pyspark.sql.functions as func
dataFrame = spark.table("productRevenue")
windowSpec = (
  Window
    .partitionBy(dataFrame['category'])
    .orderBy(dataFrame['revenue'].desc())
    .rangeBetween(-3, 3))  # or a row frame: .rowsBetween(Window.unboundedPreceding, Window.currentRow)
revenue_difference = \
  (func.max(dataFrame['revenue']).over(windowSpec) - dataFrame['revenue'])
dataFrame.select(
  dataFrame['product'],
  dataFrame['category'],
  dataFrame['revenue'],
  revenue_difference.alias("revenue_difference"))
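
A window function computes a value per row from the other rows in the same partition, without collapsing the rows the way GROUP BY does. The plain-Python sketch below shows the idea for the revenue difference, under the simplifying assumption that the frame covers the whole category (an unbounded frame, not the -3 to 3 range used above). The sample rows are hypothetical.

```python
# Sample rows of a productRevenue-like table (hypothetical data).
rows = [
    {"product": "Thin", "category": "Cell phone", "revenue": 6000},
    {"product": "Ultra thin", "category": "Cell phone", "revenue": 5000},
    {"product": "Mini", "category": "Tablet", "revenue": 5500},
    {"product": "Normal", "category": "Tablet", "revenue": 1500},
]

# Step 1: the windowed aggregate, here max(revenue) per category partition.
best = {}
for r in rows:
    best[r["category"]] = max(best.get(r["category"], r["revenue"]), r["revenue"])

# Step 2: every row keeps its identity and gains the derived column.
result = [{**r, "revenue_difference": best[r["category"]] - r["revenue"]} for r in rows]
```

The output has the same number of rows as the input, which is the defining property of a window function compared with an aggregation.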

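from pyspark.sql.functions import percent_rank, ntile
# ranking functions need an ordered window without a custom frame
rankSpec = Window.partitionBy("k").orderBy("v")
df.select(
    "k", "v",
    percent_rank().over(rankSpec).alias("percent_rank"),
    ntile(3).over(rankSpec).alias("ntile3"))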

# pivot & unpivot
(df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(F.count("ship"))
    .show())

df.selectExpr("id", "type", "stack(3, '2010', `2010`, '2011', `2011`, '2012', `2012`) as (date, shipNumber)").where("shipNumber is not null").show()

# Remove Duplicate
df.dropDuplicates()

Running SQL queries

# registering DataFrames as views
>>> peopledf.createGlobalTempView("people")
>>> df.createTempView("customer")
>>> df.createOrReplaceTempView("customer")

# Query view
>>> spark.sql("SELECT * FROM customer").show()
>>> spark.sql("SELECT * FROM global_temp.people").show()
>>> spark.sql("SELECT * FROM customer WHERE v IN {0}".format(("foo", "bar"))).count()

Output

# Data convert
rdd = df.rdd
df.toJSON().first()
df.toPandas()

# write and save
df.select("columnname").write.save("filename",format="json")

Check data

>>> df.dtypes               # Return df column names and data types
>>> df.show()               # Display the content of df
>>> df.head()               # Return the first row (head(n) returns the first n rows)
>>> df.first()              # Return the first row
>>> df.take(2)              # Return the first n rows
>>> df.schema               # Return the schema of df
>>> df.describe().show()    # Compute summary statistics
>>> df.columns              # Return the columns of df
>>> df.count()              # Count the number of rows in df
>>> df.distinct().count()   # Count the number of distinct rows in df
>>> df.printSchema()        # Print the schema of df
>>> df.explain()            # Print the (logical and physical) plans

Image Processing

# Spark 2.3 provides the ImageSchema.readImages API; Spark 2.4+ adds the "image" data source used here
image_df = spark.read.format("image").option("dropInvalid", True).load("/path/to/images")
# the structure of the output dataframe looks like
# image: struct containing all the image data
#  |    |-- origin: string representing the source URI
#  |    |-- height: integer, image height in pixels
#  |    |-- width: integer, image width in pixels
#  |    |-- nChannels: integer, number of color channels
#  |    |-- mode: integer, OpenCV type
#  |    |-- data: binary, the actual image
# Then we can use Spark ML to build and train the model; below is a sample resize and crop process
from mmlspark import ImageTransformer
tr = (ImageTransformer()  # images are resized and then cropped
    .setOutputCol("transformed")
    .resize(height = 200, width = 200)
    .crop(0, 0, height = 180, width = 180))

smallImages = tr.transform(image_df).select("transformed")

Broadcast and Accumulator

# A broadcast is a read-only variable that reduces data transfer; we mostly use it for "lookup" operations. Azure data warehouse has a similar structure named "Replicate".
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast
 
sqlContext = SQLContext(sc)
df_tiny = sqlContext.sql('select * from tiny_table')
df_large = sqlContext.sql('select * from massive_table')
df3 = df_large.join(broadcast(df_tiny), df_large.some_sort_of_key == df_tiny.key)

# An accumulator is write-only for executors (only the Spark driver can read it) and aggregates information across executors. We can think of it as a global variable, but write-only.
from pyspark import SparkContext 

sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(1) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([2,3,4,5]) 
rdd.foreach(f) 
final = num.value 
print("Accumulated value is -> %i" % (final))

Someone might ask: ADF Data Flow can do almost the same thing, so is there any difference? As far as I understand so far, no. An ADF Data Flow has to be translated to Spark SQL, which is the same engine that runs DataFrames. If you like coding and are familiar with Python and pandas, or you want to do data exploration or data science tasks, choose DataFrame; if you prefer a GUI similar to SSIS for ELT-style tasks, choose ADF Data Flow.

Reference:

A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets, https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

PySpark Cheat Sheet: Spark DataFrames in Python, https://www.datacamp.com/community/blog/pyspark-sql-cheat-sheet

pyspark.sql module, http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

Spark Overview, http://spark.apache.org/docs/latest/

Get Rid of ETL, Move to Spark.

ETL tools are the most common tools used when building an EDW, and of course ETL is the first step in data integration. As big data emerges, more and more customers are starting to use Hadoop and Spark. Personally, I agree with the idea that Spark will replace most ETL tools.

Background

  • Business Intelligence -> big data
  • Data warehouse -> data lake
  • Applications -> Micro services

ETL hell

  • Data gets out of sync; each copy is a risk.
  • Performance issues and wasted server resources (servers are sized for peak performance), although ETL tools can do limited parallel work.
  • Plain-text code in hidden stages (typically VB or Java)
  • CSV files are not type safe
  • All-or-nothing approach in batch jobs
  • Legacy code

Spark for ETL

  • Parallel processing is built in
  • Streaming can be used to parallelize ETL
  • Hadoop is the data source, so we don’t need to copy data, which reduces risk
  • Just one code base (Scala or Python)
  • Machine learning included
  • Security, unit testing, performance measurement, exception handling, monitoring

Code Demo

  1. Simple one
(spark.read.json("/sourcepath")  # Extract
    .filter(...)                 # Transform (this line and the next)
    .agg(...)
    .write.mode("append")        # Load
    .parquet("/outputpath"))

  2. Stream

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# @param1: master
# @param2: appname
sc = SparkContext("local[2]", "NetworkWordCount")
# @param1: spark context
# @param2: batch interval in seconds
ssc = StreamingContext(sc, 1)
stream = ssc.textFileStream("path")
# do transform
# do load
ssc.start()
ssc.awaitTermination()

Reference:
1. https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html
2. https://databricks.com/session/get-rid-of-traditional-etl-move-to-spark
3. https://www.slideshare.net/databricks/building-robust-etl-pipelines-with-apache-spark

Simple global warming analysis by pyspark

Purpose

Use PySpark to help analyze global warming. The data comes from NCDC (http://www.ncdc.noaa.gov/) and covers 1980 to 1989 and 2000 to 2009 (except 1984 and 2004).

Two Strategies

  • Get the max/min temperature and max wind speed, filtering out only invalid data (9999). The steps are as follows:
    • Load files into an RDD: sc.textFile("/home/DATA/NOAA_weather/{198[0-3],198[5-9]}/*.gz")
    • Extract fields from the files with a map function, parse_record, which returns key-value (tuple) data.
    • Filter out 9999 data: .filter(lambda x: x[1]!=9999)
    • Use reduceByKey to get the max or min value (the key is the year): .reduceByKey(lambda x,y: max(x,y))
  • Get the average temperature and average wind speed by year, latitude and longitude for stations that are fixed land stations.
    • Load files into an RDD, the same as in the first strategy.
    • Load the RDD into a DataFrame: sqlContext.createDataFrame(all_fields,schema=["date","report_type","lat","lon","wind_speed","wind_qulity","temp"])
    • Filter out error data (9999) and keep only station type FM-12: df.where((df['lat']!='+9999') & (df['lon']!='+9999') & (df['wind_speed']!=9999) & (df['temp']!=9999) & (df['report_type']=='FM-12'))
    • Aggregate the averages by year, latitude and longitude: df.groupBy(['date',"lat","lon"]).agg({"wind_speed":"avg","temp":"avg"})
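
The filter and reduceByKey steps of the first strategy can be sketched in plain Python without a cluster. This is not the code from the repository: `records` is made-up sample data in the same (year, value) shape, and `reduce_by_key` is a hypothetical helper that mimics what reduceByKey does on a single machine.

```python
# (year, temperature x10) pairs; 9999 marks a missing reading (sample data).
records = [(1980, 250), (1980, 9999), (1980, 600), (1981, 580), (1981, -850)]

def reduce_by_key(pairs, func):
    """Combine all values that share a key with a binary function."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out

# Same predicate as .filter(lambda x: x[1] != 9999)
valid = [kv for kv in records if kv[1] != 9999]

# Same reducers as .reduceByKey(lambda x, y: max(x, y)) and the min variant
max_temp = reduce_by_key(valid, max)
min_temp = reduce_by_key(valid, min)
```

Because max and min are associative and commutative, Spark can apply them within each partition first and then merge the partial results, which is what makes this step cheap at scale.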

Result and visualization

  • the max/min temperature and max wind speed (based on strategy 1)
year, max_temp(10x), min_temp(10x), max_wind_speed(10x)
1980,600,-780,617
1981,580,-850,618
1983,616,-931,618
1984,617,-932,618
1982,617,-930,700
1986,607,-901,607
1987,607,-900,602
1985,611,-932,618
1989,606,-900,900
2000,568,-900,900
1988,607,-900,618
2002,568,-932,900
2001,568,-900,900
2003,565,-900,900
2005,610,-925,900
2006,610,-917,900
2007,610,-900,900
2008,610,-900,900
2009,610,-854,900

These extreme values may occur in special areas, like Antarctica or the Sahara Desert.

  • the average temperature and average wind speed by year, latitude and longitude (based on strategy 2)
img
img

From the trend line charts, we can see that the temperature in 2000 to 2009 is significantly higher (1.5-2.0℃) than in 1980 to 1989. The max wind speed is also mostly greater than before.

Let’s create another chart, which shows the temperature difference between the two decades.

img

Red means the weather has become warmer; otherwise it has become colder. In most countries, there are more red dots than cold ones. The most seriously affected area is Europe.
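
One way to compute such a per-location difference is sketched below in plain Python. This is an assumption about the general approach, not the author's weather_gap.py: the function name `decade_gap`, the input layout (year, lat, lon, average temperature) and the sample values are all hypothetical.

```python
from collections import defaultdict
from statistics import mean

# (year, lat, lon, average temperature in degrees C); made-up sample rows.
yearly = [
    (1980, "+52.0", "+004.0", 9.0),
    (1985, "+52.0", "+004.0", 10.0),
    (2000, "+52.0", "+004.0", 11.0),
    (2005, "+52.0", "+004.0", 12.0),
    (1981, "+10.0", "+020.0", 25.0),  # no 2000s data, so it is dropped
]

def decade_gap(rows):
    """Mean 2000-2009 temperature minus mean 1980-1989 temperature per location."""
    old, new = defaultdict(list), defaultdict(list)
    for year, lat, lon, temp in rows:
        if 1980 <= year <= 1989:
            old[(lat, lon)].append(temp)
        elif 2000 <= year <= 2009:
            new[(lat, lon)].append(temp)
    # Only locations observed in both decades can be compared.
    return {loc: mean(new[loc]) - mean(old[loc]) for loc in old.keys() & new.keys()}

gap = decade_gap(yearly)
```

A positive gap corresponds to a red dot on the map, a negative gap to a cold one.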

Files

  • infor_extra.py: main file to execute in spark, output a single csv file in the folder named output1
  • weather_gap.py: calculates the gap between the two decades and exports it to output.csv
  • weather_overview.twb and weather_gap.twb are two Tableau files for visualization.
  • If an out-of-memory or GC error happens, please narrow the year range in infor_extra.py: rdd = sc.textFile("/home/DATA/NOAA_weather/{198[0-3],198[5-9]}/*.gz")

Code: https://github.com/neoaksa/HPC/tree/master/6.Global_Warming