Delta Lake Step by Step(1)

Before we talk about Delta Lake, we need to spend some time on data lakes and understand why we use them. Below is the best definition I have found.

A data lake is a repository for structured, unstructured, and semi-structured data. Data lakes are much different from data warehouses since they allow data to be in its rawest form without needing to be converted and analyzed first.

Devin Pickell 

A data lake stores everything, raw and in any format. In a nutshell, it is a repository. For example, in Azure, Data Lake Storage Gen2 is a repository built on Blob storage.

What’s the difference between data lake and data warehouse?

Table 1. Difference between data warehouse and data lake

From Table 1, we can see that the data warehouse and the data lake serve different scenarios. A data warehouse is used for classic data analysis, with data stored in structured relational tables, while a data lake is used for big data and ELT situations: raw data lands in the lake first and is processed afterwards. Since the data lake sits on a cloud cluster, it can process the raw data very quickly.

What is the downside of data lake?

  • No Transaction.
  • Schema on read.
  • No version control.

These three downsides are critical when applying a data lake to a real project; they affect both data quality and efficiency. This is one reason why many big data projects fail: the data lake cannot provide stable data the way a database can.

Delta Lake comes out!

Figure 1. Delta Lake adds ACID transactions on top of Parquet.

Delta Lake provides several capabilities that enable a stable data lake for analysis.

  • Open-source storage layer, based on Parquet files
  • ACID transactions: serializable isolation
  • Scalable metadata handling: Spark distributed data processing
  • Stream and batch unification: a table is both a batch table and a streaming source/sink
  • Schema enforcement
  • Time travel: data versioning

Let's see some simple samples. If you are familiar with Spark, you can switch to Delta Lake very quickly.

# set up
# upgrade pyspark
pip install --upgrade pyspark
# install
pyspark --packages io.delta:delta-core_2.12:0.1.0


# write table
df= spark.read.csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header="true", inferSchema="true")
df.write.format("delta").save("/delta/diamonds")
# append
df.write.format("delta").mode("append").save("/delta/events")
# we can also partition the table
df.write.format("delta").partitionBy("date").save("/delta/events")
# update
data.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'").save("/tmp/delta-table")
# by default overwrite doesn't change the schema, unless option("overwriteSchema", "true") is set
# add column automatically
df.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/events")


# read
df = spark.read.format("delta").load("/tmp/delta-table")
# or with SQL:
# SELECT * FROM delta.`/tmp/delta-table`

# time travel
# create a read version 
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events") 
# For timestamp_string, only date or timestamp strings are accepted
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

# structured stream
streamingDf = spark.readStream.format("delta").load("/delta/events")
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
# set mini-batch size
spark.readStream.format("delta").option("maxFilesPerTrigger", 1000).load("/delta/events")
# ignore updates and deletes with "ignoreDeletes" and "ignoreChanges"
# append mode and complete mode
df.writeStream.format("delta").outputMode("append").start("/delta/events")
df.writeStream.format("delta").outputMode("complete").start("/delta/events")

stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
stream.stop()

# check point
.option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")

In the next article, I will talk about when to use Delta Lake and how to optimize it in Databricks.

Here is one of my examples, which converts three small files into one Delta table: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2499619125013057/2952616815059893/2608746070181068/latest.html

Reference:

  1. Delta Lake Document. https://delta.io
  2. What Is a Data Lake and Why Is It Essential for Big Data? https://learn.g2.com/what-is-a-data-lake
  3. Delta Lake Document for databricks. https://docs.databricks.com/delta

Night Sight with Google Camera

As an amateur photographer, I used to believe a DSLR was better than a phone camera, since its much larger sensor can receive more photons, until I installed Google Camera on my Galaxy S8. The results surprised me a lot. It performed better than my Canon 5D Mark III in most cases, without any editing in software like Photoshop.

Besides HDR+ and portrait mode, Google Camera provides a magic mode called Night Sight. Unfortunately, this can only work on Pixel series phones (Pixel 3/3a is the best) with hardware support. You can find the A/B testing here.

left: disable Night Sight; right: enable the Night Sight.

How does Night Sight improve the quality of shots in the night?

  • HDR+. HDR+ is the foundational function for Night Sight. It is a computational photography technique that improves low-light shots by capturing a burst of frames, aligning the frames in software, and merging them together. Since each frame is short enough to prevent the blur caused by hand shake, the result turns out to be sharper and to have a wider dynamic range than without HDR+ (a toy frame-averaging sketch follows this list).

With HDR+, we can clearly see the background in an indoor scene.
  • Positive-shutter-lag (PSL). In the regular mode, Google Camera uses a zero-shutter-lag (ZSL) protocol, which limits exposures to at most 66ms no matter how dim the scene is, and allows the viewfinder to keep up a display rate of at least 15 frames per second. Using PSL means you need to hold still for a short time after pressing the shutter, but it allows the use of longer exposures, thereby improving SNR at much lower brightness levels.
  • Motion metering. Optical image stabilization (OIS) is widely used in many devices to prevent hand shake, but it doesn't help with long exposures and moving subjects. Pixel 3 adds motion metering to detect motion in the scene and adjust the per-frame exposure time accordingly: for example, if it detects a dog moving in the frame it keeps exposures short, and if it detects that we are using a tripod it increases the exposure time.
  • Super Res Zoom. HDR+ essentially uses an algorithm to align and merge frames to increase the SNR (signal-to-noise ratio). Pixel 3 provides a new algorithm called Super Res Zoom for super-resolution, which also reduces noise since it averages multiple images together. Super Res Zoom produces better results for some nighttime scenes than HDR+, but it requires the faster processor of the Pixel 3.
  • Learning-based AWB algorithm. When it is dim, AWB (auto white balance) does not function well. It is an ill-posed problem, which means we cannot simply invert it (recover the true color of an object in the dark). In this case, Google Camera uses machine learning to "guess" the true color and shift the white balance.
  • S-curve tone mapping. As we know, if we expose a picture for a long time, all objects become brighter, so we can no longer tell when the picture was taken. Google applies a sigmoid-shaped tone curve to adjust object brightness (dark objects become darker, light objects become brighter).
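To see why merging a burst helps, here is a toy numpy sketch, not Google's pipeline, just the averaging idea: shot noise is roughly independent across frames, so averaging N frames improves SNR by about sqrt(N).

import numpy as np

rng = np.random.default_rng(0)
signal = 10.0                                    # true brightness of one pixel
noise_sigma = 5.0
frames = signal + rng.normal(0, noise_sigma, size=15)   # 15 short, noisy exposures of the same pixel

single_snr = signal / noise_sigma                # SNR of a single frame
merged = frames.mean()                           # align-and-merge reduces noise by ~sqrt(15)
merged_snr = signal / (noise_sigma / np.sqrt(len(frames)))
print(single_snr, merged_snr, merged)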

Reference: Night Sight: Seeing in the Dark on Pixel Phones, https://ai.googleblog.com/2018/11/night-sight-seeing-in-dark-on-pixel.html

A Brief Talk on the YOLO Object Detection Algorithm

Figure 1: YOLO: Real-Time Object Detection

YOLO is also known as You Only Look Once. Unlike R-CNN, YOLO uses a single CNN to do object detection as well as localization, which makes it much faster than R-CNN while losing only a little accuracy. From 2016 to 2018, YOLO was improved from v1 to v3. In this article, I will explain in a simple way how YOLO works.

What tasks do we need to solve in an object detection problem?

YOLO uses the same method as humans to detect objects. There are three major steps: 1. is there an object? 2. what object is it? 3. what are the position and size of this object? But through a single CNN, YOLO does these three things all together.

How does YOLO solve this problem?

First, let's introduce the grid cell in YOLO. The whole input image is divided into an S \times S grid. Each grid cell predicts only one object with a fixed number of boundary boxes (say B). Each boundary box has its own box confidence score that reflects the probability that it contains an object. Each grid cell also predicts C conditional class probabilities (one per class), so we get S\times S \times (B*5+C) predictions. Here 5 means the center location (x, y), the size (w, h), and the confidence score of each boundary box. For YOLO v1, S=7, B=2 and C=20, so the output tensor is 7 \times 7 \times 30.

Figure 2: the cell with red mark predicts two boundary boxes for a single object.

Then you will find many boundary boxes in the output. How do we choose among them? Here we need to apply Non-max suppression. The steps are as below (a minimal Python sketch follows Figure 3):

  1. Discard all boxes with a box confidence less than a threshold (say 0.65).
  2. While there are any remaining (overlapping) boxes:
    1. pick the box with the largest confidence as a prediction;
    2. discard any remaining boxes with an IoU (intersection over union: think of it as the overlap between two boundary boxes) greater than a threshold (say 0.5).
Figure 3: boundary boxes for each grid cell
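A minimal NumPy sketch of this non-max suppression procedure (illustrative only; boxes are assumed to be [x1, y1, x2, y2] arrays with a confidence score each):

import numpy as np

def iou(box, boxes):
    # intersection over union between one box and an array of boxes, format [x1, y1, x2, y2]
    x1 = np.maximum(box[0], boxes[:, 0]); y1 = np.maximum(box[1], boxes[:, 1])
    x2 = np.minimum(box[2], boxes[:, 2]); y2 = np.minimum(box[3], boxes[:, 3])
    inter = np.clip(x2 - x1, 0, None) * np.clip(y2 - y1, 0, None)
    area = lambda b: (b[..., 2] - b[..., 0]) * (b[..., 3] - b[..., 1])
    return inter / (area(box) + area(boxes) - inter)

def non_max_suppression(boxes, scores, conf_thresh=0.65, iou_thresh=0.5):
    keep_mask = scores >= conf_thresh                 # step 1: drop low-confidence boxes
    boxes, scores = boxes[keep_mask], scores[keep_mask]
    order = np.argsort(scores)[::-1]                  # highest confidence first
    kept = []
    while order.size > 0:                             # step 2: greedily keep the best box
        best = order[0]
        kept.append(best)
        rest = order[1:]
        overlaps = iou(boxes[best], boxes[rest])
        order = rest[overlaps <= iou_thresh]          # discard boxes overlapping it too much
    return boxes[kept], scores[kept]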

After non-max suppression, we calculate the class confidence score, which equals the box confidence score * the conditional class probability. Finally, we get the object with its probability and localization (see Figure 1).

YOLO Network Design

Let's see what YOLO v1 looks like. Input = a 448*448 image, output = S\times S \times (B*5+C). There are 24 convolutional layers followed by 2 fully connected layers for localization. It uses the sum-squared error between the predictions and the ground truth to calculate the loss, which consists of classification loss, localization loss and confidence loss.

Figure 4: YOLO v1 architecture

Classification loss:

\[\sum_{i=0}^{S^2}\mathbb{1}_i^{obj}\sum_{c\in classes}\left(p_i(c)-\hat{p}_i(c)\right)^2\]

Localization loss:

\[\lambda_{coord}\sum_{i=0}^{S^2}\sum_{j=0}^{B}\mathbb{1}_{ij}^{obj}\left[(x_i-\hat{x}_i)^2+(y_i-\hat{y}_i)^2+(\sqrt{w_i}-\sqrt{\hat{w}_i})^2+(\sqrt{h_i}-\sqrt{\hat{h}_i})^2\right]\]

Confidence loss:

\[\sum_{i=0}^{S^2}\sum_{j=0}^{B}\mathbb{1}_{ij}^{obj}\left(C_i-\hat{C}_i\right)^2+\lambda_{noobj}\sum_{i=0}^{S^2}\sum_{j=0}^{B}\mathbb{1}_{ij}^{noobj}\left(C_i-\hat{C}_i\right)^2\]

Here \mathbb{1}_{ij}^{obj}=1 when an object is detected in box j of cell i, \mathbb{1}_{ij}^{noobj} is its complement (no object detected in the box), and the paper uses \lambda_{coord}=5 and \lambda_{noobj}=0.5.

The final loss adds the three components together.

YOLO V2

YOLO v2 improves accuracy compared with YOLO v1.

  1. Add batch normalization to all of the convolutional layers. It gives more than a 2% improvement in accuracy.
  2. High-resolution classifier. First fine-tune the classification network at the full 448 \times 448 resolution for 10 epochs on ImageNet. This gives the network time to adjust its filters to work better on higher-resolution input.
  3. Convolutional with anchor boxes. YOLO v1 predicts only 98 boxes per image and makes arbitrary guesses on the boundary boxes, which leads to poor generalization; with anchor boxes, YOLO v2 predicts more than a thousand. It then uses dimension clusters and direct location prediction to get the boundary boxes.
  4. Dimension clusters. Use k-means to find the boundary-box patterns. Figure 5 shows what might be the most common boundary boxes in a specific dataset.
Figure 5: 5 anchor boxes
Figure 6: remove the class prediction from the cell level to the boundary box level

5. Direct location prediction. Since we use anchor boxes, we predict offsets relative to these anchors; the equations from the YOLOv2 paper are shown after Figure 6.

Figure 6: Bounding boxes with dimension priors and location prediction
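From the YOLOv2 paper, the network outputs offsets (t_x, t_y, t_w, t_h) for each anchor (prior) of width p_w and height p_h in a cell whose top-left corner is (c_x, c_y), and the predicted box is:

\[b_x=\sigma(t_x)+c_x,\quad b_y=\sigma(t_y)+c_y,\quad b_w=p_w e^{t_w},\quad b_h=p_h e^{t_h}\]

The sigmoid constrains the predicted center to stay inside its grid cell, which stabilizes training compared with unconstrained offsets.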

6. Multi-scale training. Every 10 batches, YOLOv2 randomly selects another image size to train the model. This acts as data augmentation and forces the network to predict well across different input image dimensions and scales.

YOLO v3

  1. Detection at three scales. YOLOv3 predicts boxes at 3 different scales, and features are extracted from each scale using a method similar to feature pyramid networks.
  2. Bounding box prediction. YOLO v3 predicts the objectness score of each box using logistic regression.
  3. Class prediction. It uses independent logistic classifiers instead of a softmax, which makes the classification multi-label.

Reference:

YOLOv1 : https://arxiv.org/abs/1506.02640

YOLOv2 : https://pjreddie.com/media/files/papers/YOLO9000.pdf

YOLOv3 : https://pjreddie.com/media/files/papers/YOLOv3.pdf

Machine Learning on Spark — How it works and why it doesn’t work well

Spark provides MLlib for machine learning in a scalable environment. MLlib includes three major parts: Transformer, Estimator and Pipeline. Essentially, a transformer takes a dataframe as input and returns a new dataframe with more columns; most featurization tasks are transformers. An estimator takes a dataframe as input and returns a model (which is itself a transformer); these are the ML algorithms as we know them. A pipeline chains transformers and estimators together.
Additionally, the dataframe has become the primary API for MLlib; there are no new features for the RDD-based API in Spark MLlib.
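As a quick illustration of how the three parts fit together, here is a minimal sketch (the column names and the train_df/test_df dataframes are hypothetical) that chains a Tokenizer and HashingTF (transformers) with LogisticRegression (an estimator) into a Pipeline:

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

# assume train_df/test_df have a string column "text" and a numeric column "label"
tokenizer = Tokenizer(inputCol="text", outputCol="words")          # transformer
hashingTF = HashingTF(inputCol="words", outputCol="features")      # transformer
lr = LogisticRegression(maxIter=10, regParam=0.01)                 # estimator

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(train_df)          # fitting returns a PipelineModel (a transformer)
predictions = model.transform(test_df)  # adds prediction columns to the dataframe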

If you have already understood or used high-level machine learning or deep learning frameworks like scikit-learn, Keras or TensorFlow, you will find everything very familiar. But when you use Spark MLlib in practice, you still need help from third-party libraries. I will talk about that at the end.

Basic Stats

# Correlation
from pyspark.ml.stat import Correlation
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

# Summarizer
from pyspark.ml.stat import Summarizer
# compute statistics for multiple metrics without weight
summarizer = Summarizer.metrics("mean", "count")
df.select(summarizer.summary(df.features)).show(truncate=False)

# ChiSquare
from pyspark.ml.stat import ChiSquareTest
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

Featurization

# TF-IDF
from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, HashingTF, IDF
# stop word removal
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData)
# tokenize
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
# n-gram
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordsData)
# word frequency (term frequency via hashing)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# idf
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# word2vec
word2Vec = Word2Vec(vectorSize=N, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

# binarizer
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

# PCA
# reduce dimension to 3
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

# StringIndex
# encodes a string column of labels into a column of label indices, ordered by frequency or alphabetically
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)

# OneHotEstimator
# we need to use StringIndex first if apply to categorical feature
encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
                                 outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)

# Normalize & Scaler
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
l1NormData = normalizer.transform(dataFrame)
# standard scaler
# withStd=True scales to unit standard deviation; withMean=True centers the data before scaling
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
# maxmin scaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# max abs scaler
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# bin
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# QuantileDiscretizer
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

# ElementwiseProduct
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")

# SQL Transformer
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

# VectorAssembler
# combine vector together for future model inputs
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"], # the columns we need to combine
    outputCol="features") # output column

# Imputer
# handle missing value
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
imputer.setMissingValue(custom_value)

# slice vector
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

# ChiSqSelector
# use the Chi-square test to select features
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

Classification and Regression

# Linear regression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# logistic regression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) 
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial") # multinomial

# decision tree
# classification
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# regression
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Random forest
# classification
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# regression
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# gradient-boosted 
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# multilayer perceptron
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# SVM
# Linear SVM, there is no kernel SVM like RBF
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Naive Bayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# KNN
# note: KNN is not part of Spark MLlib; this snippet uses the third-party spark-knn package (Scala API shown)
knn = KNNClassifier().setTopTreeSize(training.count().toInt / 500).setK(10)

Clustering

# k-means
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# GMM
gmm = GaussianMixture().setK(2).setSeed(538009335)

Collaborative Filtering

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

Validation

# split train and test
train, test = data.randomSplit([0.9, 0.1], seed=12345)
# cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

You might already have noticed the problem. The ecosystem of Spark MLlib is not as rich as scikit-learn's, and it lacks deep learning (of course, its name is machine learning). According to the Databricks documentation, we still have solutions.

  • Use scikit-learn on a single node. A very simple solution, but scikit-learn still loads the data into memory, so if the node (the driver) is large and fast enough, we can get good performance as well (a minimal sketch follows this list).
  • To solve the deep learning problem, we have two workaround methods:
    • Apply Keras or TensorFlow on a single node with GPU acceleration (recommended by Databricks).
    • Distributed training. It might be slower than on a single node because of communication overhead. There are two frameworks used for distributed training: Horovod and Apache SystemML. I've never used Horovod, but you can find information here. As to SystemML, it is more like a wrapper for a high-level API and provides a cluster optimizer which parses the code into Spark RDDs (live variable analysis, propagating stats, rewriting via matrix decomposition and runtime instructions). From the official website, we know it is much faster than MLlib and native R. The problem is that it has not been updated since 2017.
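As a sketch of the first option (hypothetical table and column names; assumes the data fits in the driver's memory), we can collect a Spark dataframe to pandas and train with scikit-learn on the driver node:

from sklearn.ensemble import RandomForestClassifier

# pull the (small enough) training data onto the driver
pdf = spark.table("training_data").toPandas()
X, y = pdf.drop(columns=["label"]), pdf["label"]

clf = RandomForestClassifier(n_estimators=100, n_jobs=-1)  # scikit-learn runs on the driver only
clf.fit(X, y)
print(clf.score(X, y))

The Keras example that follows illustrates the single-node deep learning option.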
# Create and save a single-hidden-layer Keras model for binary classification
# NOTE: In a typical workflow, we'd train the model before exporting it to disk,
# but we skip that step here for brevity
from keras.models import Sequential
from keras.layers import Dense
model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "/tmp/simple-binary-classification"
model.save(model_path)

from sparkdl import KerasTransformer  # from the Databricks Deep Learning Pipelines package
transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)

It seems there is no perfect solution for machine learning in Spark, right? Don't forget we have other time-consuming jobs: hyperparameter configuration and validation. We can run the same model with different hyperparameters on different nodes using a paramMap, which is similar to grid search or random search.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()
crossval = CrossValidator(estimator=pipeline,
                      estimatorParamMaps=paramGrid,
                      evaluator=RegressionEvaluator(),
                      numFolds=2)  # use 3+ folds in practice
cvModel = crossval.fit(training)

Someone might ask: why don't we use MPI? The answer is simple: it is too complex. Although it can achieve perfect performance, and you can do whatever you want, even running distributed GPU + CPU code, there are too many things we need to configure manually on a low-level API without fault tolerance.

In conclusion, we can use Spark for ELT and for training/validating models to maximize performance (it does these jobs really well). But for now, we still need third-party frameworks to help us do deep learning or machine learning tasks on a single strong node.

Reference:

Spark MLlib: http://spark.apache.org/docs/latest/ml-guide.html

Databrick: https://docs.databricks.com/getting-started/index.html

Spark ML Tuning: http://spark.apache.org/docs/latest/ml-tuning.html

Horovod: https://github.com/horovod/horovod

Apache SystemML: https://systemml.apache.org/docs/1.1.0/beginners-guide-keras2dml

How to use Dataframe in pySpark (compared with SQL)

-- version 1.0: initial @20190428
-- version 1.1: add image processing, broadcast and accumulator
-- version 1.2: add ambiguous column handle, maptype

When we use Spark, there are two ways to manipulate data: RDD and DataFrame. I don't know why most books start with RDD rather than DataFrame. Since the RDD API is more OOP- and functional-style, it is not very friendly to people coming from SQL, pandas or R. Then the DataFrame came along, like a star in the dark. The advantages of using DataFrames can be listed as follows:

  • Static typing and runtime type safety. Syntax errors can be caught at compile time, which saves developers lots of time.
  • High-level abstraction: you tell Spark what to do rather than how to do it. If you have ever touched pandas, you will find they are almost the same thing.
  • High performance. The DataFrame is not only simpler but also much faster than using RDDs directly, since the optimization work has been done by the Catalyst optimizer, which generates an optimized logical and physical query plan.

For more information, see this article.

Now that we know why we need the DataFrame, let's see how to use it to handle daily work. To make it easier, I will compare DataFrame operations with SQL.

Initializing Spark Session

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("example project") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()  # set parameters for spark with .config()

Create DataFrames

## From RDDs
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
# Infer Schema
>>> sc = spark.sparkContext
>>> lines = sc.textFile("people.txt")
>>> parts = lines.map(lambda l: l.split(","))
>>> people = parts.map(lambda p: Row(name=p[0],age=int(p[1])))
>>> peopledf = spark.createDataFrame(people)
# Specify Schema
>>> people = parts.map(lambda p: Row(name=p[0],
age=int(p[1].strip())))
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)
>>> spark.createDataFrame(people, schema).show()

## From Spark Data Source
# JSON
>>> df = spark.read.json("customer.json")

# Use MapType to read dynamic columns from JSON
customSchema = StructType([
    StructField("col1", StringType(), True),
    StructField("event", MapType(StringType(), StringType()))])
spark.read.schema(customSchema).json(path)

# Parquet files
>>> df3 = spark.read.load("users.parquet")
# TXT files
>>> df4 = spark.read.text("people.txt")
# CSV files
>>> df5 = spark.read.format("csv").option("header", true).option("inferSchema", true).load("csvfile.csv")
# MS SQL
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
# or we can use
# collection = spark.read.sqlDB(config)
display(df)

Dataframe Manipulation

from pyspark.sql import functions as F
# select & where
df.select("column1","column2", explod("phonenumber").alias("contactInfo"), df['age']>24).show()

# join
df = dfa.join(dfb, (dfa.id == dfb.id) & (dfa.name == dfb.name), how='left')
df = dfa.join(dfb, (dfa.id == dfb.id) | (dfa.name == dfb.name), how='right')
df = dfa.join(dfb, dfa.id == dfb.id, how='full')
df = dfa.join(dfb, dfa.id == dfb.id)
df = dfa.crossJoin(dfb)

# distinct count
from pyspark.sql.functions import countDistinct
df = df.groupby('col1','col2').agg(countDistinct("col3").alias("others"))

# ambiguous column handle
# both date and endpoint_id exist in two dataframes
df_result = df_result.join(df_result2, ["date","endpoint_id"],how="left") 

# exists and not exists (semi/anti joins)
new_df = df.join(
    spark.table("target"),
    how='left_semi',
    on='id')

new_df = df.join(
    spark.table("target"),
    how='left_anti',
    on='id')

# when
df.select("first name", F.when(df.age>30,1).otherwise(0))

# like
df.select("firstName", df.lastName.like("Smith"))

# startswith - endswith
df.select("firstName", df.lastName.startswith("Sm"), df.lastName.endswith("th"))

# substring
df.select(df.firstName.substr(1,3).alias("name"))

# between
df.select(df.age.between(22,24))

# add columns
df = df.withColumn('city', df.address.city) \
    .withColumn('postalCode', df.address.postalCode) \
    .withColumn('state', df.address.state) \
    .withColumn('streetAddress', df.address.streetAddress) \
    .withColumn('telePhoneNumber', F.explode(df.phoneNumber.number)) \
    .withColumn('telePhoneType', F.explode(df.phoneNumber.type))

# update column name
df = df.withColumnRenamed('prename','aftername')

# removing column
df = df.drop("ColumnName1","columnname2")

# group by
df.groupby("groupbycolumn").agg({"salary": "avg", "age": "max"})

# filter
df.filter(df["age"]>24).show()

# Sort
df.sort("age",ascending=False).collect()

# Missing & Replace values
df.na.fill(value)
df.na.drop()
df.na.replace(value1,value2)
df["age"].na.fill(value)

# repartitioning
df.repartition(10).rdd.getNumPartitions()  # df with 10 partitions
df.coalesce(1).rdd.getNumPartitions()

# union and unionAll
df.union(df2)

# windows function
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
windowSpec = \
  Window \
    .partitionBy(df['category']) \
    .orderBy(df['revenue'].desc()) \
    .rangeBetween(-3, 3)  # or a row frame: .rowsBetween(Window.unboundedPreceding, Window.currentRow)
dataFrame = sqlContext.table("productRevenue")
revenue_difference = \
  (func.max(dataFrame['revenue']).over(windowSpec) - dataFrame['revenue'])
dataFrame.select(
  dataFrame['product'],
  dataFrame['category'],
  dataFrame['revenue'],
  revenue_difference.alias("revenue_difference"))

from pyspark.sql.functions import percentRank, ntile
df.select(
    "k", "v",
    percentRank().over(windowSpec).alias("percent_rank"),
    ntile(3).over(windowSpec).alias("ntile3"))

# pivot & unpivot
(df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(F.count("ship"))
    .show())

df_data.selectExpr("id", "type",
    "stack(3, '2010', 2010, '2011', 2011, '2012', 2012) as (date, shipNumber)") \
    .where("shipNumber is not null").show()

# Remove Duplicate
df.dropDuplicates()

Running SQL queries

# registering a DataFrame as a view
>>> peopledf.createGlobalTempView("people")
>>> df.createTempView("customer")
>>> df.createOrReplaceTempView("customer")

# Query view
>>> df5 = spark.sql("SELECT * FROM customer").show()
>>> peopledf2 = spark.sql("SELECT * FROM global_temp.people")\
.show()
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()

Output

# Data convert
rdd = df.rdd
df.toJSON().first()
df.toPandas()

# write and save
df.select("columnname").write.save("filename",format="jason")

Check data

>>> df.dtypes             # Return df column names and data types
>>> df.show()             # Display the content of df
>>> df.head()             # Return the first n rows
>>> df.first()            # Return the first row
>>> df.take(2)            # Return the first n rows
>>> df.schema             # Return the schema of df
>>> df.describe().show()  # Compute summary statistics
>>> df.columns            # Return the columns of df
>>> df.count()            # Count the number of rows in df
>>> df.distinct().count() # Count the number of distinct rows in df
>>> df.printSchema()      # Print the schema of df
>>> df.explain()          # Print the (logical and physical) plans

Image Processing

# Spark 2.3 provides the ImageSchema.readImages API; since Spark 2.4 we can use the image data source
image_df = spark.read.format("image").option("dropInvalid", "true").load("/path/to/images")
# the structure of output dataframe is like
image: struct containing all the image data
 |    |-- origin: string representing the source URI
 |    |-- height: integer, image height in pixels
 |    |-- width: integer, image width in pixels
 |    |-- nChannels: integer, number of color channels
 |    |-- mode: integer, OpenCV type
 |    |-- data: binary, the actual image
# Then we can use Spark ML to build and train the model; below is a sample crop-and-resize process
from mmlspark import ImageTransformer
tr = (ImageTransformer()  # images are resized and then cropped
    .setOutputCol("transformed")
    .resize(height=200, width=200)
    .crop(0, 0, height=180, width=180))

smallImages = tr.transform(image_df).select("transformed")

Broadcast and Accumulator

# Broadcast is a read-only variable used to reduce data transfer; mostly we use it for "lookup" operations. In Azure SQL Data Warehouse, there is a similar structure named "Replicate".
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast
 
sqlContext = SQLContext(sc)
df_tiny = sqlContext.sql('select * from tiny_table')
df_large = sqlContext.sql('select * from massive_table')
df3 = df_large.join(broadcast(df_tiny), df_large.some_sort_of_key == df_tiny.key)

# Accumulator is a write-only (except for the Spark driver) structure used to aggregate information across executors. We can understand it as a global variable, but write-only.
from pyspark import SparkContext 

sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(1) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([2,3,4,5]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)

Someone might ask: ADF Data Flow can do almost the same thing, so is there any difference? In my understanding so far, no. ADF Data Flow is translated to Spark SQL, which is the same engine as the DataFrame. If you like coding and are familiar with Python and pandas, or you want to do some data exploration/data science tasks, choose the DataFrame; if you like a GUI similar to SSIS for ELT-style tasks, choose ADF Data Flow.

Reference:

A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets, https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

PySpark Cheat Sheet: Spark DataFrames in Python, https://www.datacamp.com/community/blog/pyspark-sql-cheat-sheet

pyspark.sql module, http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

Spark Overview, http://spark.apache.org/docs/latest/

ADF tends to replace SSIS

Essentially, ADF (Azure Data Factory) only takes responsibility for Extraction and Load in the ELT pipeline. Then we have to use another tool, like Databricks, to write notebooks that manipulate DataFrames or RDDs to complete the transform activities. However, since Microsoft launched "Data Flow" in ADF, it has become more and more similar to SSIS. Most ETL work can be done in ADF with a nice and friendly GUI. For the ETL stuff, I copied a cheat sheet comparing the data flows between ADF and SSIS. It may be helpful.

Surprisingly, ADF doesn't provide SCD (slowly changing dimension) out of the box. You have to work on it manually, although it isn't hard: just set up some start and end dates. Here is an article about it.

Anyway, for me, I like to complete simple but repeatable activities with the GUI, and leave the complex operations to Databricks.

Share some useful/special MS SQL tips as a data engineer





If you are a data scientist, you may never need to do data preprocessing work like ETL/ELT, performance tuning or OLTP database design. Everything is already prepared in a structured data warehouse or flat files; it is beautiful and nice. Regarding data quality, all a data scientist needs to do is handle some missing or wrong values, then clarify the relationships and do the analysis. I didn't say it is easy after preprocessing; what I mean is that data engineers really do a lot of time-consuming work for the final success. So I want to summarize and share some of my experience, which may save data engineers a lot of time. I welcome corrections and additions; please send me an email: neo_aksa@hotmail.com

1. Incremental loading. We have three methods to do incremental loading.

A. Merge clause. It's very simple: just the combination of update and insert.

MERGE INTO
target_table tg_table
USING source_table src_table
ON ( src_table.id = tg_table.id )
WHEN MATCHED
THEN UPDATE SET tg_table.name = src_table.name
WHEN NOT MATCHED
THEN INSERT ( id, name ) VALUES ( src_table.id, src_table.name );

B. CDC (change data capture) in SSIS. For more information, see my other topic: "Incremental Load DW by using CDC in SSIS".

C. Lookup + conditional split in SSIS. Essentially it is the same as method A: "not found" goes to "Insert", "found" goes to "Update".

2. CTE. Before CTEs came out, we wrote SQL with many subqueries, which is a little hard to read since the logic is reversed. Now, with the help of CTEs, we can make our code more readable and get rid of functions in GROUP BY.

-- return the customers who had over $10,000 in purchase for their first three transactions.
with OrderRank
as
( 
select custID, row_number() over(partition by custID order by orderID) as Rank, amount from SalesOrder
),
OrderOver
as
(
select custID, sum(amount) as totalAmount from OrderRank where rank<=3 group by custID
)
select custID, totalAmount from OrderOver where totalAmount>10000

3. Delete duplicate rows. This is a very common job, as lots of data are entered manually. Here we have two simple ways to handle it.
A. Use the "Sort" component in SSIS and check the box that removes duplicates.
B. Write a script: use a CTE to mark the row numbers, then delete the rows whose row number is greater than 1.

WITH RemoveDuplicate
AS
(
-- partition and order by the columns which determine duplication
select ROW_NUMBER() over (partition by id, name order by id, name) as row_id, column1, ... from tablename
)
delete from RemoveDuplicate where row_id > 1

4. Faster loading. SQL Server's default isolation level is "read committed". But in most cases we don't need it, as we only need to load all the data from OLTP. There are two ways to make loading faster and avoid blocking other jobs.
A. use “WITH(NOLOCK)” in statement level.

SELECT FirstName, LastName
FROM EmployeeInfo WITH(NOLOCK)
WHERE EmpID = 1;

B. use “Set Transaction ISOLATION LEVEL” to read uncommitted.

SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

5. Use a columnstore index in the data warehouse. A columnstore index is very helpful for select performance since values of the same column sit in the same pages, but it is bad for inserts or updates. For a fact table with many distinct values, it is very good for full table scans. Just remember: if we create a clustered columnstore index, we cannot create a primary key, and all columns are included in the clustered columnstore index.

--BASIC EXAMPLE: Create a nonclustered index on a clustered columnstore table.  
--Create the table  
CREATE TABLE t_account (  
    AccountKey int NOT NULL,  
    AccountDescription nvarchar (50),  
    AccountType nvarchar(50),  
    UnitSold int  
);  
GO  
  
--Store the table as a clustered columnstore.  
CREATE CLUSTERED COLUMNSTORE INDEX taccount_cci ON t_account;  
GO  
  
--Add a nonclustered index for table seek.
CREATE UNIQUE INDEX taccount_nc1 ON t_account (AccountKey);

6. Bulk data load. We may find it very slow to bulk load huge tables into the data warehouse. This is usually because we missed some steps around loading.
A. Drop the clustered index on the large table before loading.
B. Recreate the index on the large table after loading.
C. Update statistics.

7. Updatable views. Typically, we use a view to hide the logic and tables behind it and make loading easier. But in some cases we need to update the view (yes, we don't want to know the details of the view, we just need to update some data). SQL Server provides the ability to update a view directly and indirectly.
A. If the view meets the following limitations, you can do DML operations directly: a. no subqueries and only SELECT; b. no DISTINCT or GROUP BY (no aggregation); c. no ORDER BY; d. if the view contains multiple tables, you can only insert/update one table; e. use 'WITH CHECK OPTION', otherwise you may update data outside of your expectation.
B. Use an INSTEAD OF trigger to update the tables related to the view.

CREATE TRIGGER trigUnion ON vwUnionCustomerSupplier
INSTEAD OF UPDATE
AS
BEGIN
SET NOCOUNT ON
DECLARE @DelName nvarchar(50)

IF (SELECT inserted.Type FROM inserted) Is Null
RETURN

SELECT @DelName = deleted.CompanyName FROM deleted

IF (SELECT inserted.Type FROM inserted) = 'Company'
UPDATE Customers
SET CompanyName =
  (SELECT CompanyName
  FROM inserted)
  WHERE Customers.CompanyName =
  @DelName
ELSE
UPDATE Suppliers
SET CompanyName =
  (SELECT CompanyName
  FROM inserted)
  WHERE Suppliers.CompanyName =
  @DelName
END

8. Deadlocks or long-running queries. It's not normal, but if you find your ELT or ETL running for a long time, it may be caused by a deadlock. Check it with sys.dm_tran_locks, or use sys.dm_exec_query_stats to get the query runtime information.

9. Use window functions for rolling aggregation. We can set the ROWS or RANGE option to achieve running aggregation in MS SQL. By default, RANGE is used.

-- running total (RANGE by default, spools to tempDB)
select customerId, orderId, amount, sum(amount) over (order by orderId) as runningTotal from sales_order
-- revised running total (ROWS frame, computed in memory)
select customerId, orderId, amount, sum(amount) over (order by orderId rows unbounded preceding) as runningTotal from sales_order
-- total over the whole window, very useful for subtotals within a partition
select customerId, orderId, amount, sum(amount) over (order by orderId rows between unbounded preceding and unbounded following) as totalAmount from sales_order
-- running 3-row total (previous, current and next row)
select customerId, orderId, amount, sum(amount) over (order by orderId rows between 1 preceding and 1 following) as running3 from sales_order

10. Covering index. An index that contains all information required to resolve the query is known as a "covering index". If the fields in the SELECT are not in the nonclustered or clustered index, a "key lookup" will appear in the execution plan.

To get a covering index without moving the new columns into the nonclustered index key, we can use "included columns". Included columns are stored only in the leaf nodes of the index.

CREATE NONCLUSTERED INDEX [ix_Customer_Email] ON [dbo].[Customers]
(
            [Last_Name] ASC,
            [First_Name] ASC
)
INCLUDE ( [Email_Address]) WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]

11. Schema binding. Schema binding is used for views and functions.
Objects that are referenced by schema-bound objects cannot have their definitions changed. It can also significantly increase the performance of user-defined functions.

CREATE FUNCTION dbo.GetProductStatusLabel
(
  @StatusID tinyint
)
RETURNS nvarchar(32)
WITH SCHEMABINDING
AS
BEGIN
  RETURN (SELECT Label FROM dbo.ProductStatus WHERE StatusID = @StatusID);
END

12. Table/index partitioning. If you are working on Azure or a cluster platform, please skip this; HDFS already does something similar for you. But if you still work on-prem, table partitioning helps improve performance a lot. Essentially, table partitioning means creating more than one filegroup to improve I/O. There are four steps to create a partition for a table or an index.
A. Add filegroups and files

-- Adds four new filegroups to the AdventureWorks2012 database  
ALTER DATABASE AdventureWorks2012  
ADD FILEGROUP test1fg;  
GO  
ALTER DATABASE AdventureWorks2012  
ADD FILEGROUP test2fg;  
-- Adds one file for each filegroup.  
ALTER DATABASE AdventureWorks2012   
ADD FILE   
(  
    NAME = test1dat1,  
    FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t1dat1.ndf',  
    SIZE = 5MB,  
    MAXSIZE = 100MB,  
    FILEGROWTH = 5MB  
)  
TO FILEGROUP test1fg;  
ALTER DATABASE AdventureWorks2012   
ADD FILE   
(  
    NAME = test2dat2,  
    FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t2dat2.ndf',  
    SIZE = 5MB,  
    MAXSIZE = 100MB,  
    FILEGROWTH = 5MB  
)  
TO FILEGROUP test2fg;  
GO  

B. Add a partition function: defines how rows map to the partitions based on a column's value

-- Creates a partition function called myRangePF1 that splits a table into two partitions at the boundary value 100
CREATE PARTITION FUNCTION myRangePF1 (int)  
    AS RANGE LEFT FOR VALUES (100) ;  
GO  

C. Add a partition scheme: maps the partition function to filegroups.

-- Creates a partition scheme called myRangePS1 that applies myRangePF1 to the two filegroups created above
CREATE PARTITION SCHEME myRangePS1  
    AS PARTITION myRangePF1  
    TO (test1fg, test2fg) ;  
GO  

D. Partitioning column: the partition function uses it to perform the partitioning

-- Creates a partitioned table called PartitionTable that uses myRangePS1 to partition col1  
CREATE TABLE PartitionTable (col1 int PRIMARY KEY, col2 char(10))  
    ON myRangePS1 (col1) ;  
GO

13. Defragmentation. According to Microsoft's suggestion, if fragmentation is greater than 30%, we need to rebuild the index; if it is between 5% and 30%, we need to reorganize the index. We can use sys.dm_db_index_physical_stats to check avg_fragmentation_in_percent.

-- use sys.dm_db_index_physical_stats to check fragmentation
select * from sys.dm_db_index_physical_stats(DB_ID(N'AdventureWorks2017'), OBJECT_ID(N'AdventureWorks2017.Person.Person'), -1, null, 'detailed')
-- check avg_fragmentation_in_percent:
-- > 5% and <= 30%  ->  ALTER INDEX ... REORGANIZE
-- > 30%            ->  ALTER INDEX ... REBUILD WITH (ONLINE = ON)

14. Other convenient code.

-- get all column names of a specific table
sp_columns table_name, table_owner

-- object dependencies
sp_depends table_name, table_owner

-- convert without failing (returns NULL if the conversion fails)
TRY_CONVERT(data_type(length), expression, style)

-- split a string by delimiter
SELECT * FROM STRING_SPLIT('A,B,B', ',')
select column1, column2 from table1
cross apply string_split(column3,',')

--  returns the last day of the month containing a specified date, with an optional offset.
EOMONTH ( start_date [, month_to_add ] ) 

-- check the object
IF OBJECT_ID('Sales.uspGetEmployeeSalesYTD', 'P') IS NOT NULL

-- dynamic SQL
-- use sp_executesql
SET @ParmDefinition = N'@BusinessEntityID tinyint'; /* Execute the string with the first parameter value. */ 
SET @IntVariable = 197; 
EXECUTE sp_executesql @SQLString, @ParmDefinition, @BusinessEntityID = @IntVariable;
-- use exec
SET @columnList = 'AddressID, AddressLine1, City'
SET @city = '''London'''
SET @sqlCommand = 'SELECT ' + @columnList + ' FROM Person.Address WHERE City = ' + @city
EXEC (@sqlCommand)

A Failed Text Classification

  • — version 1@20190401
  • — version 2@20190402: change the number of categories to 2

Today I tried a text classification task where the data is messages about flights, labeled into 5 levels. Obviously, it is a supervised problem, and I thought there were already plenty of solutions for this kind of problem, so I started with full confidence. But... the result was very bad: no more than 35% accuracy for 5-class classification, only a little better than guessing.

Model | Acc%
word tf-idf with kernel SVM | 32.6
word tf-idf with random forest | 31.3
Naïve Bayes | 35.3
word embedding (FastText) with GRU | 30.3

The details can be found in Google Colaboratory.
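For reference, a minimal sketch of the tf-idf + kernel SVM baseline from the table (assuming scikit-learn; the texts and labels lists of messages and their 5 levels are placeholders):

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

# texts: list of flight messages, labels: their 5-level annotations (placeholders here)
X_train, X_test, y_train, y_test = train_test_split(texts, labels, test_size=0.2, random_state=42)

vectorizer = TfidfVectorizer(max_features=20000, ngram_range=(1, 2))
X_train_tfidf = vectorizer.fit_transform(X_train)
X_test_tfidf = vectorizer.transform(X_test)

clf = SVC(kernel="rbf", C=1.0, gamma="scale")   # kernel SVM on tf-idf features
clf.fit(X_train_tfidf, y_train)
print(clf.score(X_test_tfidf, y_test))          # around 32.6% in the experiment above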

I think the following reasons led to this failure:

  1. The model overfits easily. For example, while the GRU model's training loss kept decreasing, the validation loss decreased at the beginning, but after 40 epochs it started to increase or jump up and down.
  2. I used a pretrained embedding model (FastText) which is based on Wikipedia, while the dataset is about civil aviation. The vocabulary and word vectors may be far apart.
  3. The data source might lack significant or clear rules for classifying messages into 5 categories. If we just labeled it as binary "attention/no worry", the result would be better.

@20190402: I changed the number of categories from 5 to 2, hoping the result would be better. But there was no improvement: only 63.5% accuracy for 2 categories.

From Qlikview to Tableau, a comparison from developer’s viewpoint

— 03/26/2019 version 0.1

In the field of BI tools, the three most widely used are Tableau, Power BI and Qlik. According to Gartner's report, all of them are leaders in the market. Although there are some differences in features and operations, I think they can all do most typical visualization work. (Don't compare them with D3 or matplotlib; the complexity and target customers are totally different.)

Tableau, Qlikview and PowerBI are leaders in the market

The downside of this situation for a BI developer or data analyst is that you have to learn all of them, since you never know your company's or clients' environments in advance. So, as Einstein said:

“The measure of intelligence is the ability to change.”

Albert Einstein

I touched Qlikview almost 7 years ago; compared with SSRS, it brought new ideas and quick-access capability to BI. 3 years ago, I started to use Tableau, and I never thought development could be so easy and fast. In this post, as a developer who has used both, I will share some ideas by comparing performance, visualization, suitability, ETL, LOD vs. set analysis, table calculations, tooltips, and sets/filters/groups between Qlikview and Tableau.

  • Performance. Qlikview and Tableau both perform well. They use in-memory technology to accelerate queries. You won't feel much difference when using them on small or medium-sized data (less than 1 million rows). Above this size, Tableau is slower than Qlikview, since Qlikview relies only on your RAM while Tableau uses a cube plus RAM. The speed of Qlikview depends more on the design of your model: the more synchronized tables, the more computation is needed to refresh the data, which is what makes a Qlikview app slow.
  • Visualization. Tableau provides a modern, simple, drag-and-drop way to build dashboards. It is very quick to develop a new dashboard without much modeling work. Qlikview provides relatively complex but flexible dashboards and tables. Yes, you are right, "tables" are much better in Qlikview. Another advantage is drill-down: the cyclic drill-down feature makes it convenient for managers to find the key points at lower levels. However, as for the map function, I have to say Qlikview has much to learn from Tableau. In Tableau, you can: 1. set a map by country, state or latitude/longitude; 2. import a spatial file; 3. set a custom image as a map and set x, y coordinates.
Use x, y coordinate in a custom image
  • Suitability: in a nutshell, Tableau is good for both end users and IT; Qlikview is good for IT. Tableau is better for dashboards and rapid development for specific purposes, like sales growth analysis. Qlikview is better for developing an enterprise BI solution (all in one). This is easy to understand, since Qlikview is based on its data model, which connects everything together and surfaces it in the UI.
  • ETL. Both companies claim they can do part of the ETL work. In my opinion, neither of them can replace ETL tools; their ETL features are simple and immature. Maybe Qlikview does a little better with incremental loading via QVD files. Without incremental loading, Tableau cannot handle large data volumes; I hope that gets solved soon.
use QVD files for incremental loading
  • LOD or set analysis. Level of Detail (Tableau) and set analysis (Qlikview) are my favorite features. They allow us to control one or more dimensions. The only difference is that set analysis allows the developer to set a value for a specific dimension.
LOD in Tableau
Set analysis in Qlikview
  • Table calculations. This feature and tooltips are my two favorites in Tableau. In Qlikview, except for simple percentages and cumulative sums, you have to code it yourself, for example a rolling sum: sum(aggr(rangesum(above(total sum({<Month=>} Amount), 0, 3)), Month)). In Tableau, table calculations give a much more convenient experience, and you can choose whether they apply to the table or the pane.
logic of custom table calculation in Tableau
  • Tooltips. This is my second favorite feature in Tableau. A long time ago I hoped Qlikview would provide subcharts in tooltips, but until now it still supports only text in tooltips.
A subchart in a tooltip gives the end user more relevant information than a text-only one.
  • Filters, sets, groups: In Qlikview there are no exactly corresponding concepts: a filter is only a field in the control panel, a set is much like a bookmark, and grouping is mostly done in the script. In Tableau, you set filters together with their working scope; a set is a dynamic sub-dataset (you can define a computed set or an in/out set at the global or regional level); a group is a static sub-dataset at the regional level. From a Qlikview point of view it is hard to understand "set", but essentially it is just a True/False flag.
Set the detailed in/out members of a set with parameters
  • Others. Qlikview can do lots of work in its script, I mean everything you can imagine, since it includes VBScript in the module functions. It seems Tableau can run JavaScript as well, but you won't want to use it. Tableau provides a "Story" feature, so users don't need to export to PPT for secondary development.

I didn't mention some software or hardware features, like rapid prototyping ability or device support. I will cover them in a future post.

Brief analysis on recommendation system of Netflix & YouTube

–03/19/2019 version 0.1


Last week, my wife told me she logged into my Netflix account, and she immediately realized it was not hers since the items did not match her taste. This sparked my interest in the recommendation systems of Netflix and YouTube, which are the most-watched channels in the US (maybe Spotify works the same way). Here I want to give a brief analysis of how they work.

Basic

Let's first take a quick look at the different approaches (that I know of) used in recommendation systems.

Popularity. This is the simplest approach, based on page views. It works very well for new users and avoids the "cold start" problem. However, the downside is that this method cannot provide personalized recommendations. One way to improve it is to add some categories at the beginning so that users can filter by category themselves.
Collaborative filtering (CF). Collaborative filtering algorithms are based on the idea that if two clients have similar rating histories then they will behave similarly in the future (Breese, Heckerman, and Kadie, 1998). It splits into two subcategories: memory-based and model-based.

  • Memory-based approaches can be divided into user-based and item-based. They find similar users or similar items, respectively, in terms of the Pearson correlation.
    • User-based.
      1. Build correlation matrix S which is symmetric.

            \[S(i,k)=\frac{\sum_j (v_{ij}-\bar{v_i})(v_{kj}-\bar{v_k})}{\sqrt{\sum_j (v_{ij}-\bar{v_i})^2}\sqrt{\sum_j (v_{kj}-\bar{v_k})^2}}\]

      2. Select the top k users who have the largest similarity scores.
      3. Identify items that similar users like but the target user has not seen before. The predicted rating is based on a weighted combination of the selected neighbors' ratings:

            \[p(i,j)=\bar{v}_i+\frac{\sum_{k=1}^{n}S(i,k)\,(v_{kj}-\bar{v}_k)}{\sum_{k=1}^{n}|S(i,k)|}\]

      4. Pick the top N movies based on the predicted ratings.
    • Item-based.
      1. Build correlation matrix S based on items. (similar to user-based)
      2. Get the top n movies that the target user has watched and rated before.
      3. Return the movies most related to these n movies that the target user has never watched.
    • In the real world, the number of users grows faster than the number of items and user preferences change more easily, so the item-based approach is used most frequently.
  • Model-based approaches are based on matrix factorization, which is popular for dimensionality reduction. Here we use singular value decomposition (SVD) to explain it (a minimal numpy sketch follows this list):

        \[X_{n*m}=U_{n*r}\cdot S_{r*r}\cdot V_{r*m}^T\]

    , where U represents the feature vectors corresponding to the users in the latent space with dimension r, and V represents the feature vectors corresponding to the items in the latent space with dimension r. Once we find U and V, we can calculate any p(i,j) as U_i \cdot V_j.
  • CF is based on historical data, so it has the "cold start" problem, and the accuracy of the prediction depends on the amount of data, since the CF matrix is sparse: e.g., a few mistaken ratings can seriously affect the prediction.
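A minimal numpy sketch of the model-based idea (a toy rating matrix with truncated SVD and r latent factors; illustrative only, not Netflix's actual implementation):

import numpy as np

# toy user x item rating matrix (0 = not rated)
X = np.array([[5, 4, 0, 1],
              [4, 5, 1, 0],
              [1, 0, 5, 4],
              [0, 1, 4, 5]], dtype=float)

U, s, Vt = np.linalg.svd(X, full_matrices=False)
r = 2                                            # keep r latent dimensions
U_r, S_r, Vt_r = U[:, :r], np.diag(s[:r]), Vt[:r, :]

X_hat = U_r @ S_r @ Vt_r                         # low-rank approximation of the ratings
print(round(X_hat[0, 2], 2))                     # predicted score of user 0 for item 2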

Content-based (CB). This approach uses information about the item itself rather than only the ratings used in CF. We need to create metadata for the items; this metadata can be tagged manually or extracted automatically as keywords with TF-IDF techniques. Then we build connections between the items the target user liked and items with similar metadata. CB avoids the "cold start" and "over-recommendation" problems; however, it is hard to maintain the metadata and keep it accurate.

Hybrid. This combines CF and CB. We can merge the predictions together or set different weights in different scenarios.

Deep learning. On large-scale datasets, it is hard to use traditional recommendation systems because of the 4 Vs (volume, variety, velocity, and veracity). Deep learning models are good at solving complex problems (see "A review on deep learning for recommender systems: challenges and remedies"). We will introduce the deep learning model used by YouTube in the next section.

Netflix

I first logged into Netflix to find information provided by the official website. Fortunately, there is a topic called How Netflix's Recommendations System Works. It doesn't give much detail about the algorithms, but it provides clues about which information is used to predict users' choices. Below is their explanation:

We estimate the likelihood that you will watch a particular title in our catalog based on a number of factors including:

  • your interactions with our service (such as your viewing history and how you rated other titles),

  • other members with similar tastes and preferences on our service (more info here), and

  • information about the titles, such as their genre, categories, actors, release year, etc.

So we can guess it is a hybrid approach combining CF (item-based and user-based) and CB. But we don't know how they design it at this moment. Let's keep reading the official website.

In addition to knowing what you have watched on Netflix, to best personalize the recommendations we also look at things like:

  • the time of day you watch,

  • the devices you are watching Netflix on, and

  • how long you watch.

These activities are not mentioned in the Basics section. They are all used as input vectors for the deep learning model, which we will see in the YouTube section.

It also mentioned “Cold start” problem:

When you create your Netflix account, or add a new profile in your account, we ask you to choose a few titles that you like. We use these titles to “jump start” your recommendations. Choosing a few titles you like is optional. If you choose to forego this step then we will start you off with a diverse and popular set of titles to get you going.

It is clear they use the popularity approach with categories to solve the "cold start" problem. As the user accumulates more history, Netflix replaces the initial approach with the others.

They also personalize the rows and the titles inside them:

In addition to choosing which titles to include in the rows on your Netflix homepage, our system also ranks each title within the row, and then ranks the rows themselves, using algorithms and complex systems to provide a personalized experience. …. In each row there are three layers of personalization:

  • the choice of row (e.g. Continue Watching, Trending Now, Award-Winning Comedies, etc.)
  • which titles appear in the row, and
  • the ranking of those titles.

They calculate a score of each item for each user, then aggregate these scores per category to decide the order of the rows. As I said, we don't know exactly how they mix CB and CF to get the score of each item yet, but they are mixed for sure.

 

YouTube

As a Google product, it is not surprising that YouTube uses deep learning as the solution for its recommendation system. It is too large in both the user and item dimensions, and a simple statistical model cannot handle it well. In the paper "Deep Neural Networks for YouTube Recommendations", they explain how they apply DL to YouTube.

 

It has two parts: candidate generation and ranking. The first filters a few hundred candidates from millions; the second sorts them using additional scenario and video-feature information. Let's see how they work:

  • Candidate generation. For candidate generation, the model filters from millions of videos, so it only uses user activity and scenario information. The basic idea is to compute the probability of watching a specific video v_i given user U and context C:

        \[P(w_t=i|U,C)=\frac{e^{v_iu}}{\sum_{j\in V}e^{v_ju}}\]

    The key point is to get the user vector u and the video vectors v. To get the user vector u, the authors embed the video watches and search tokens, average them into a watch vector and a search vector, combine these with other vectors (geographic, video age and gender), and pass them through 3 fully connected ReLU layers; the output is the user vector u. To get the video vectors v, they use u to predict the probabilities of all videos through a softmax; after training, the video vectors v are what we want. At serving time, we only need to combine u and v to compute the top N highest-probability videos.
  • Ranking. Compared with candidate generation, the number of videos is much smaller, so we can put more video features into the embedding vectors. These features mostly describe the scenario, like the topic of the video, how many videos the user has watched under each topic, and the time since the last watch. Categorical features are embedded with shared embeddings, and continuous features are normalized (powers of the normalized values are also fed in).

Reference:

  • Deep Neural Networks for YouTube Recommendations, Paul Covington, Jay Adams, Emre Sargin, https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/45530.pdf
  • How Netflix’s Recommendations System Works, n/a, https://help.netflix.com/en/node/100639
  • Finding the Latent Factors | Stanford University, https://www.youtube.com/watch?v=GGWBMg0i9d4&index=56&list=PLLssT5z_DsK9JDLcT8T62VtzwyW9LNepV
  • Recommendation System for Netflix, Leidy Esperanza MOLINA FERNÁNDEZ, https://beta.vu.nl/nl/Images/werkstuk-fernandez_tcm235-874624.pdf
  • How far have recommendation algorithms evolved? Take a look at this and you will know! (in Chinese), 章华燕, https://mp.weixin.qq.com/s?__biz=MzIzNzA4NDk3Nw==&mid=2457737060&idx=1&sn=88ef898f5054ae9b8cb005c31b65ee2d&chksm=ff44bf3ac833362c2436002be265c390b033d3e7709553fd8b603e6269d58f366f689beb2639&mpshare=1&scene=1&srcid=#