Machine Learning on Spark — How it works and why it doesn’t work well

Spark provides MLlib for machine learning in a scalable environment. MLlib is built around three major abstractions: Transformer, Estimator and Pipeline. A Transformer takes a DataFrame as input and returns a new DataFrame with more columns; most featurization tasks are Transformers. An Estimator takes a DataFrame as input and returns a model (which is itself a Transformer); these are the ML algorithms we know. A Pipeline chains Transformers and Estimators together.
Additionally, the DataFrame-based API is now the primary API for MLlib. The RDD-based API in Spark MLlib no longer receives new features.
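
To make the three roles concrete, here is a minimal pure-Python sketch of the contract, with no Spark involved. The class names `AddLength`, `MeanCenter`, `MeanCenterModel` and `MiniPipeline` are hypothetical, and rows are plain dicts standing in for DataFrame rows; MLlib's real classes carry much more machinery (params, persistence, lazy evaluation).

```python
# Toy illustration of the Transformer / Estimator / Pipeline contract.
# Rows are plain dicts standing in for DataFrame rows (an assumption for brevity).

class AddLength:
    """Transformer: returns new rows with one extra column."""
    def transform(self, rows):
        return [{**r, "length": len(r["text"])} for r in rows]

class MeanCenterModel:
    """Model produced by the estimator below; it is itself a transformer."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**r, "centered": r["length"] - self.mean} for r in rows]

class MeanCenter:
    """Estimator: learns a statistic from the data and returns a model."""
    def fit(self, rows):
        mean = sum(r["length"] for r in rows) / len(rows)
        return MeanCenterModel(mean)

class MiniPipeline:
    """Pipeline: runs stages in order, fitting estimators along the way."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # estimator: fit it, then use its model
                stage = stage.fit(rows)
            rows = stage.transform(rows)   # feed the output to the next stage
            fitted.append(stage)
        return fitted

rows = [{"text": "ab"}, {"text": "abcd"}]
fitted_stages = MiniPipeline([AddLength(), MeanCenter()]).fit(rows)
out = rows
for stage in fitted_stages:
    out = stage.transform(out)
print(out)
```

The point of the sketch is only the shape of the API: `fit` turns an estimator into a transformer, and a pipeline is a list of stages applied in order.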

If you already know or have used high-level machine learning or deep learning frameworks such as scikit-learn, Keras or TensorFlow, everything here will look familiar. But when you use Spark MLlib in practice, you still need help from third-party libraries. I will come back to this at the end.

Basic Stats

# Correlation
from pyspark.ml.stat import Correlation
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

# Summarizer
from pyspark.ml.stat import Summarizer
# compute statistics for multiple metrics without weight
summarizer = Summarizer.metrics("mean", "count")
df.select(summarizer.summary(df.features)).show(truncate=False)

# ChiSquare
from pyspark.ml.stat import ChiSquareTest
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

Featurization

# TF-IDF
from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, HashingTF, IDF
# stop word removal
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData)
# tokenize
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
# n-gram
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordsData)
# term frequency
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# idf
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
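
To show what the HashingTF and IDF stages compute, here is a small pure-Python sketch on three toy documents. It counts terms directly instead of hashing them into buckets (a simplification), and it uses the smoothed formula documented for Spark MLlib, idf = log((m + 1) / (df + 1)), where m is the number of documents. The names `docs`, `doc_freq` and `tfidf` are made up for this example.

```python
import math
from collections import Counter

docs = [["spark", "is", "fast"], ["spark", "ml"], ["ml", "is", "fun"]]

# Term frequency per document (HashingTF would hash terms into buckets instead).
tf = [Counter(doc) for doc in docs]

# Document frequency: in how many documents does each term appear?
doc_freq = Counter(term for doc in docs for term in set(doc))

# Smoothed IDF as documented for Spark MLlib: log((m + 1) / (df + 1)).
m = len(docs)
idf = {term: math.log((m + 1) / (count + 1)) for term, count in doc_freq.items()}

# TF-IDF is the product of the two.
tfidf = [{term: freq * idf[term] for term, freq in doc_tf.items()} for doc_tf in tf]
print(tfidf[0])
```

Terms that appear in many documents ("spark", "is", "ml") get a lower weight than terms that appear in only one ("fast", "fun").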

# word2vec
word2Vec = Word2Vec(vectorSize=N, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

# binarizer
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

# PCA
# reduce dimension to 3
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

# StringIndexer
# encodes a string column of labels to a column of label indices, ordered by frequency (default) or alphabetically
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)

# OneHotEncoderEstimator (renamed to OneHotEncoder in Spark 3.0)
# apply StringIndexer first when encoding categorical string features
encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
                                 outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)

# Normalize & Scaler
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
l1NormData = normalizer.transform(dataFrame)
# standard scaler
# withStd=True scales to unit standard deviation; withMean=True centers the data on the mean before scaling
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
# min-max scaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# max abs scaler
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# bin
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# QuantileDiscretizer
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

# ElementwiseProduct
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")

# SQL Transformer
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

# VectorAssembler
# combine vector together for future model inputs
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"], # the columns we need to combine
    outputCol="features") # output column

# Imputer
# handle missing value
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
imputer.setMissingValue(custom_value)

# slice vector
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

# ChiSqSelector
# use the chi-squared test to select features
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

Classification and Regression

# Linear regression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# logistic regression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) 
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial") # multinomial

# decision tree
# classification
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# regression
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Random forest
# classification
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# regression
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# gradient-boosted 
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# multilayer perceptron
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# SVM
# Linear SVM, there is no kernel SVM like RBF
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Naive Bayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

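# KNN
# not part of MLlib: KNNClassifier comes from a third-party package, and this line uses Scala syntax
knn = KNNClassifier().setTopTreeSize(training.count().toInt / 500).setK(10)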

Clustering

# k-means
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# GMM
gmm = GaussianMixture().setK(2).setSeed(538009335)

Collaborative Filtering

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

Validation

# split train and test
train, test = data.randomSplit([0.9, 0.1], seed=12345)
# cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

You might have already spotted the problem. The ecosystem of Spark MLlib is not as rich as scikit-learn's, and it lacks deep learning (of course, its name is machine learning). According to the Databricks documentation, we still have some options.

  • Use scikit-learn on a single node. This is a very simple solution, but scikit-learn still loads all the data into memory. If the node (the driver) is powerful enough, we can get good performance as well.
  • To solve deep learning problems, we have two workarounds.
    • Apply Keras or TensorFlow on a single node with GPU acceleration (recommended by Databricks).
    • Distributed training. It might be slower than a single node because of communication overhead. Two frameworks are used for distributed training: Horovod and Apache SystemML. I have never used Horovod, but you can find information in the Horovod repository listed in the references. SystemML is more like a wrapper around a high-level API; it provides a cluster optimizer that compiles the code down to Spark RDD operations (live variable analysis, statistics propagation, rewrites by matrix decomposition, and runtime instructions). According to the official website, it is much faster than MLlib and native R. The problem is that it has not been updated since 2017.
# Create and save a single-hidden-layer Keras model for binary classification
# NOTE: In a typical workflow, we'd train the model before exporting it to disk,
# but we skip that step here for brevity
from keras.models import Sequential
from keras.layers import Dense
model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "/tmp/simple-binary-classification"
model.save(model_path)

transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)

It seems there is no perfect solution for machine learning on Spark, right? Don't forget we have other time-consuming jobs: hyperparameter tuning and validation. We can run the same model with different hyperparameters on different nodes using a param map, which is similar to grid search or random search.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()
crossval = CrossValidator(estimator=pipeline,
                      estimatorParamMaps=paramGrid,
                      evaluator=RegressionEvaluator(),
                      numFolds=2)  # use 3+ folds in practice
cvModel = crossval.fit(training)
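
To see how much work such a grid implies, here is a rough pure-Python sketch of what the builder above expands to: the Cartesian product of all candidate values. The dict `grid` is a hypothetical stand-in for `ParamGridBuilder`, not its real internal representation.

```python
from itertools import product

# Hypothetical stand-in for ParamGridBuilder: parameter name -> candidate values.
grid = {"maxIter": [10, 100, 1000], "regParam": [0.1, 0.01]}

# Every combination becomes one param map, i.e. one model to fit per fold.
names = list(grid)
param_maps = [dict(zip(names, values)) for values in product(*grid.values())]

num_folds = 2
total_fits = len(param_maps) * num_folds
print(len(param_maps), "param maps,", total_fits, "model fits")
```

With 3 x 2 = 6 param maps and 2 folds, cross validation trains 12 models before the best setting is refit on the full training set, which is why distributing these fits across nodes pays off.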

Someone might ask: why don't we use MPI? The answer is simple: it is too complex. Although it can deliver excellent performance, and you can do whatever you want, even run distributed GPU + CPU code, there are too many things to configure manually through a low-level API, and there is no fault tolerance.

In conclusion, we can use Spark for ELT and for training and validating models to maximize performance (it does really well at this work). But for now, we still need third-party frameworks to do deep learning or other machine learning tasks on a single powerful node.

Reference:

Spark MLlib: http://spark.apache.org/docs/latest/ml-guide.html

Databricks: https://docs.databricks.com/getting-started/index.html

Spark ML Tuning: http://spark.apache.org/docs/latest/ml-tuning.html

Horovod: https://github.com/horovod/horovod

Apache SystemML: https://systemml.apache.org/docs/1.1.0/beginners-guide-keras2dml

How to use Dataframe in pySpark (compared with SQL)

-- version 1.0: initial @20190428
-- version 1.1: add image processing, broadcast and accumulator
-- version 1.2: add ambiguous column handle, maptype

When we work with Spark, there are two ways to manipulate data: RDD and DataFrame. I don't know why most books start with RDD rather than DataFrame. Since RDD is a more object-oriented and functional structure, it is not very friendly to people who come from SQL, pandas or R. Then the DataFrame arrived, like a star in the dark. The advantages of using DataFrames are as follows:

  • Static typing and runtime type safety. Errors are caught early, which saves developers lots of time. (Compile-time checking applies mainly to the typed Dataset API in Scala and Java; in PySpark, errors such as a missing column surface when the query is analyzed, before the job runs.)
  • High-level abstraction: we say what to do rather than how to do it. If you have ever touched pandas, you will find the two are almost the same thing.
  • High performance. Yes, the DataFrame is not only simpler but also much faster than using RDDs directly, because the optimization work is done by Catalyst, which generates an optimized logical and physical query plan.

More information can be found in the Databricks article listed in the references.

Now that we know why we need DataFrames, let's see how to use them for daily work. To make it easier, I will compare DataFrame operations with SQL.

Initializing Spark Session

from pyspark.sql import SparkSession
spark = (SparkSession
         .builder
         .appName("example project")
         .config("spark.some.config.option", "some-value")  # set parameters for Spark
         .getOrCreate())

Create DataFrames

## From RDDs
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
# Infer Schema
>>> sc = spark.sparkContext
>>> lines = sc.textFile("people.txt")
>>> parts = lines.map(lambda l: l.split(","))
>>> people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
>>> peopledf = spark.createDataFrame(people)
# Specify Schema (every field is declared as a string, so keep the values as strings)
>>> people = parts.map(lambda p: (p[0], p[1].strip()))
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)
>>> spark.createDataFrame(people, schema).show()

## From Spark Data Source
# JSON
>>> df = spark.read.json("customer.json")

# Use MapType to read dynamic columns from JSON
customSchema = StructType([
    StructField("col1", StringType(), True),
    StructField("event", MapType(StringType(), StringType()))])
spark.read.schema(customSchema).json(path)

# Parquet files
>>> df3 = spark.read.load("users.parquet")
# TXT files
>>> df4 = spark.read.text("people.txt")
# CSV files
>>> df5 = spark.read.format("csv").option("header", True).option("inferSchema", True).load("csvfile.csv")
# MS SQL
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
# or we can use
# collection = spark.read.sqlDB(config)
display(df)

Dataframe Manipulation

from pyspark.sql import functions as F
# select & where
df.select("column1", "column2", F.explode("phonenumber").alias("contactInfo"), df['age'] > 24).show()

# join
# wrap each comparison in parentheses: & and | bind tighter than ==
df = dfa.join(dfb, (dfa.id == dfb.id) & (dfa.name == dfb.name), how='left')
df = dfa.join(dfb, (dfa.id == dfb.id) | (dfa.name == dfb.name), how='right')
df = dfa.join(dfb, dfa.id == dfb.id, how='full')
df = dfa.join(dfb, dfa.id == dfb.id)
df = dfa.crossJoin(dfb)

# distinct count
from pyspark.sql.functions import countDistinct
df = df.groupby('col1','col2').agg(countDistinct("col3").alias("others"))

# ambiguous column handle
# both date and endpoint_id exist in two dataframes
df_result = df_result.join(df_result2, ["date","endpoint_id"],how="left") 

# exists and not exists
new_df = df.join(
    spark.table("target"),
    how='left_semi',
    on='id')

new_df = df.join(
    spark.table("target"),
    how='left_anti',
    on='id')
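
For readers coming from SQL, the two join types above behave like WHERE EXISTS and WHERE NOT EXISTS. A minimal pure-Python sketch of the semantics, using made-up rows and a made-up set of ids in place of the `target` table:

```python
df_rows = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}]
target_ids = {2, 3, 4}   # the "id" column of the hypothetical target table

# left_semi ~ WHERE EXISTS: keep left rows that have a match, left columns only.
semi = [r for r in df_rows if r["id"] in target_ids]

# left_anti ~ WHERE NOT EXISTS: keep left rows that have no match.
anti = [r for r in df_rows if r["id"] not in target_ids]

print(semi)
print(anti)
```

Unlike an inner join, neither variant adds columns from the right side or duplicates left rows when several right rows match.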

# when
df.select("first name", F.when(df.age>30,1).otherwise(0))

# like
df.select("firstName", df.lastName.like("Smith"))

# startswith - endswith
df.select("firstName", df.lastName.startswith("Sm"))
df.select(df.lastName.endswith("th"))

# substring
df.select(df.firstName.substr(1,3).alias("name"))

# between
df.select(df.age.between(22,24))

# add columns
df = df.withColumn('city',df.address.city) \
    .withColumn('postalCode',df.address.postalCode) \
    .withColumn('state',df.address.state) \
    .withColumn('streetAddress',df.address.streetAddress) \
    .withColumn('telePhoneNumber',
    F.explode(df.phoneNumber.number)) \
    .withColumn('telePhoneType',
    F.explode(df.phoneNumber.type))

# update column name
df = df.withColumnRenamed('prename','aftername')

# removing column
df = df.drop("ColumnName1","columnname2")

# group by
df.groupby("groupbycolumn").agg({"salary": "avg", "age": "max"})

# filter
df.filter(df["age"]>24).show()

# Sort
df.sort("age",ascending=False).collect()

# Missing & Replace values
df.na.fill(value)
df.na.drop()
df.na.replace(value1,value2)
df.na.fill(value, subset=["age"])

# repartitioning
df.repartition(10) \
    .rdd \
    .getNumPartitions()  # df with 10 partitions
df.coalesce(1).rdd.getNumPartitions()

# union and unionAll
df.union(df2)

# window functions
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
windowSpec = \
  Window \
    .partitionBy(df['category']) \
    .orderBy(df['revenue'].desc()) \
    .rangeBetween(-3, 3) # or a row frame: .rowsBetween(Window.unboundedPreceding, Window.currentRow)
dataFrame = sqlContext.table("productRevenue")
revenue_difference = \
  (func.max(dataFrame['revenue']).over(windowSpec) - dataFrame['revenue'])
dataFrame.select(
  dataFrame['product'],
  dataFrame['category'],
  dataFrame['revenue'],
  revenue_difference.alias("revenue_difference"))

from pyspark.sql.functions import percent_rank, ntile
df.select(
    "k", "v",
    percent_rank().over(windowSpec).alias("percent_rank"),
    ntile(3).over(windowSpec).alias("ntile3"))

# pivot & unpivot
(df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(F.count("ship"))
    .show())

# unpivot: selectExpr takes SQL strings; backticks refer to the pivoted columns named 2010, 2011, 2012
df.selectExpr("id", "type", "stack(3, '2010', `2010`, '2011', `2011`, '2012', `2012`) as (date, shipNumber)").where("shipNumber is not null").show()

# Remove Duplicate
df.dropDuplicates()

Running SQL queries

# registering DataFrames as views
>>> peopledf.createGlobalTempView("people")
>>> df.createTempView("customer")
>>> df.createOrReplaceTempView("customer")

# Query view
>>> spark.sql("SELECT * FROM customer").show()
>>> spark.sql("SELECT * FROM global_temp.people")\
.show()
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()

Output

# Data convert
rdd = df.rdd
df.toJSON().first()
df.toPandas()

# write and save
df.select("columnname").write.save("filename", format="json")

Check data

>>> df.dtypes              # Return df column names and data types
>>> df.show()              # Display the content of df
>>> df.head()              # Return the first row (head(n) returns the first n rows)
>>> df.first()             # Return the first row
>>> df.take(2)             # Return the first n rows
>>> df.schema              # Return the schema of df
>>> df.describe().show()   # Compute summary statistics
>>> df.columns             # Return the columns of df
>>> df.count()             # Count the number of rows in df
>>> df.distinct().count()  # Count the number of distinct rows in df
>>> df.printSchema()       # Print the schema of df
>>> df.explain()           # Print the (logical and physical) plans

Image Processing

# Spark 2.3 provides the ImageSchema.readImages API; Spark 2.4+ also offers the "image" data source
image_df = spark.read.format("image").option("dropInvalid", True).load("/path/to/images")
# the structure of the output dataframe looks like
# image: struct containing all the image data
#  |    |-- origin: string representing the source URI
#  |    |-- height: integer, image height in pixels
#  |    |-- width: integer, image width in pixels
#  |    |-- nChannels: integer, number of color channels
#  |    |-- mode: integer, OpenCV type
#  |    |-- data: binary, the actual image
# Then we can use Spark ML to build and train the model; below is a sample resize and crop process
from mmlspark import ImageTransformer
tr = (ImageTransformer() # images are resized and then cropped
    .setOutputCol("transformed")
    .resize(height = 200, width = 200)
    .crop(0, 0, height = 180, width = 180) )

smallImages = tr.transform(image_df).select("transformed")

Broadcast and Accumulator

# Broadcast is a read-only variable that reduces data transfer; we mostly use it for "lookup" operations. In Azure data warehouse, there is a similar structure named "Replicate".
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast
 
sqlContext = SQLContext(sc)
df_tiny = sqlContext.sql('select * from tiny_table')
df_large = sqlContext.sql('select * from massive_table')
df3 = df_large.join(broadcast(df_tiny), df_large.some_sort_of_key == df_tiny.key)
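
The idea behind the broadcast join above can be sketched in plain Python: the small table is copied to every worker as a lookup, so each partition of the large table is joined locally without shuffling. The data and the helper `join_partition` are made up for illustration; Spark's actual implementation is far more involved.

```python
# Small table: shipped whole to every worker (the "broadcast").
tiny = {1: "red", 2: "blue"}                      # key -> attribute

# Large table: split into partitions that stay where they are.
partitions = [[(1, "x"), (3, "y")], [(2, "z"), (1, "w")]]

def join_partition(rows, lookup):
    """Inner join one partition against the broadcast lookup, with no shuffle."""
    return [(key, value, lookup[key]) for key, value in rows if key in lookup]

joined = [row for part in partitions for row in join_partition(part, tiny)]
print(joined)
```

This only pays off when the small side really fits in memory on every executor.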

# Accumulator is a write-only (except for the Spark driver) structure used to aggregate information across executors. We can think of it as a global variable, but write-only.
from pyspark import SparkContext 

sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(1) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([2,3,4,5]) 
rdd.foreach(f) 
final = num.value 
print("Accumulated value is -> %i" % (final))

Someone might ask: ADF Data Flow can do almost the same thing, so is there any difference? As far as I understand so far, no. ADF Data Flow has to be translated to Spark SQL, which is the same engine that runs DataFrames. If you like coding and are familiar with Python and pandas, or you want to do some data exploration or data science tasks, choose DataFrames; if you like a GUI similar to SSIS for ELT-style tasks, choose ADF Data Flow.

Reference:

A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets, https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

PySpark Cheat Sheet: Spark DataFrames in Python, https://www.datacamp.com/community/blog/pyspark-sql-cheat-sheet

pyspark.sql module, http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

Spark Overview, http://spark.apache.org/docs/latest/

ADF tends to replace SSIS

Essentially, ADF (Azure Data Factory) only takes responsibility for Extract and Load in the ELT pipeline. We then have to use another tool, like Databricks, and write a notebook that manipulates DataFrames or RDDs to complete the transform activities. However, since Microsoft launched “Data Flow” in ADF, it has become more and more similar to SSIS. Most ETL work can be done in ADF with a nice and friendly GUI. For ETL work, I copied a cheat sheet comparing the data flows of ADF and SSIS. It may be helpful.

Surprisingly, ADF does not provide an SCD (slowly changing dimension) component. You have to build it manually. It is not hard, though: just set up start and end times. Here is the article about it.

Anyway, I like this way of working: complete the simple but repeatable activities with a GUI, and leave the complex operations to Databricks.

Share some useful/special MS SQL tips as a data engineer





If you are a data scientist, you may never need to do data preprocessing work such as ETL/ELT, performance tuning or OLTP database design. Everything is already prepared in a structured data warehouse or flat file, and it is clean and tidy. As for data quality, all a data scientist needs to do is handle some missing or wrong values, then clarify the relationships and do the analysis. I am not saying it is easy after preprocessing; what I mean is that data engineers do a lot of time-consuming work toward the final success. So I want to summarize and share some of my experience, which may save data engineers a lot of time. Corrections and additions are welcome; please send me an email: neo_aksa@hotmail.com

1. Incremental Loading. We have three methods to do incremental loading.

A. MERGE statement. It is very simple: just the combination of update and insert.

MERGE INTO
target_table tg_table
USING source_table src_table
ON ( src_table.id = tg_table.id )
WHEN MATCHED
THEN UPDATE SET tg_table.name = src_table.name
WHEN NOT MATCHED
THEN INSERT ( id, name ) VALUES ( src_table.id, src_table.name );

B. CDC (change data capture) in SSIS. For more information, see my other post: “Incremental Load DW by using CDC in SSIS”.

C. Lookup + conditional split in SSIS. Essentially it is the same as method A: rows that are not found go to “Insert”, rows that are found go to “Update”.
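
The "update if found, insert if not found" logic shared by methods A and C can be tried out locally. The sketch below uses Python's built-in sqlite3 module purely as a stand-in: SQLite has no MERGE statement, so the two branches are written as a separate UPDATE and INSERT. The table contents are made up for illustration.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE target_table (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE source_table (id INTEGER PRIMARY KEY, name TEXT);
    INSERT INTO target_table VALUES (1, 'old'), (2, 'keep');
    INSERT INTO source_table VALUES (1, 'new'), (3, 'added');
""")

# WHEN MATCHED: update target rows whose id exists in the source.
con.execute("""
    UPDATE target_table
    SET name = (SELECT s.name FROM source_table s WHERE s.id = target_table.id)
    WHERE id IN (SELECT id FROM source_table)
""")

# WHEN NOT MATCHED: insert source rows whose id is missing from the target.
con.execute("""
    INSERT INTO target_table (id, name)
    SELECT s.id, s.name FROM source_table s
    WHERE NOT EXISTS (SELECT 1 FROM target_table t WHERE t.id = s.id)
""")

result = con.execute("SELECT id, name FROM target_table ORDER BY id").fetchall()
print(result)
```

Row 1 is updated, row 2 is left alone because the source does not mention it, and row 3 is inserted.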

2. CTE. Before CTEs came out, we wrote SQL with many subqueries, which is a little hard to read since the logic is reversed. Now, with the help of CTEs, we can make our code more readable and get rid of functions in GROUP BY.

-- return the customers who had over $10,000 in purchase for their first three transactions.
with OrderRank
as
( 
select custID, row_number() over(partition by custID order by orderID) as Rank, amount from SalesOrder
),
OrderOver
as
(
select custID, sum(amount) as totalAmount from OrderRank where rank<=3 group by custID
)
select custID, totalAmount from OrderOver where totalAmount>10000
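
The same two-step logic, written as a pure-Python sketch over made-up rows, shows how each CTE corresponds to one readable step. The sample data and variable names are assumptions for illustration only.

```python
from collections import defaultdict

# Hypothetical SalesOrder rows: (custID, orderID, amount).
sales_order = [
    ("A", 1, 6000), ("A", 2, 3000), ("A", 3, 2000), ("A", 4, 9000),
    ("B", 5, 4000), ("B", 6, 1000),
]

# Step 1 (OrderRank): order each customer's transactions by orderID.
orders_by_customer = defaultdict(list)
for cust_id, order_id, amount in sorted(sales_order, key=lambda r: (r[0], r[1])):
    orders_by_customer[cust_id].append(amount)

# Step 2 (OrderOver): sum the first three transactions per customer.
total_first_three = {c: sum(a[:3]) for c, a in orders_by_customer.items()}

# Final select: keep customers above the threshold.
big_customers = {c: t for c, t in total_first_three.items() if t > 10000}
print(big_customers)
```

Customer A's fourth order is ignored, exactly as the `rank<=3` filter does in the SQL.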

3. Delete duplicate rows. This is a very common job, as lots of data is entered manually. Here we have two simple ways to handle it.
A. Use the “Sort” component in SSIS and check the box that removes rows with duplicate sort values.
B. Write a script. Use a CTE to assign row numbers, then delete the rows whose row number is greater than 1.

WITH RemoveDuplicate
AS
(
-- partition and order by the columns that decide duplication
select ROW_NUMBER() over (partition by id, name, .. order by id, name) as row_id, column ....... from tablename
)
delete from RemoveDuplicate where row_id > 1
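
What the ROW_NUMBER approach does can be sketched in a few lines of plain Python: count rows per duplicate key and keep only the first one. The sample rows are made up, and the key is assumed to be (id, name).

```python
rows = [(1, "ann", "2019-01-01"), (1, "ann", "2019-02-01"), (2, "bob", "2019-01-05")]

seen = {}
deduplicated = []
for row in rows:
    key = (row[0], row[1])               # PARTITION BY id, name
    seen[key] = seen.get(key, 0) + 1     # ROW_NUMBER() within the partition
    if seen[key] == 1:                   # rows with row_id > 1 are deleted
        deduplicated.append(row)

print(deduplicated)
```

Which duplicate survives depends on the ORDER BY inside the window; here it is simply the first row encountered.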

4. Faster Loading. SQL Server's default isolation level is “Read committed”. But in most cases we don't need it, as we only need to load all the data from OLTP. There are two ways to make loading faster without blocking other jobs. Both allow dirty reads, so use them only when that is acceptable.
A. Use “WITH(NOLOCK)” at the statement level.

SELECT FirstName, LastName
FROM EmployeeInfo WITH(NOLOCK)
WHERE EmpID = 1;

B. Use “SET TRANSACTION ISOLATION LEVEL” to read uncommitted data for the whole session.

SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

5. Use columnstore indexes in the data warehouse. A columnstore index greatly improves select performance because the values of a column are stored together in the same pages, but it is bad for inserts and updates. For a fact table with many different values, it is very good for full table scans. Just remember: in versions before SQL Server 2016, if we create a clustered columnstore index we cannot create a primary key, and all columns are included in the clustered columnstore index.

--BASIC EXAMPLE: Create a nonclustered index on a clustered columnstore table.  
--Create the table  
CREATE TABLE t_account (  
    AccountKey int NOT NULL,  
    AccountDescription nvarchar (50),  
    AccountType nvarchar(50),  
    UnitSold int  
);  
GO  
  
--Store the table as a clustered columnstore.  
CREATE CLUSTERED COLUMNSTORE INDEX taccount_cci ON t_account;  
GO  
  
--Add a nonclustered index for table seek.
CREATE UNIQUE INDEX taccount_nc1 ON t_account (AccountKey);

6. Bulk data load. We may find it very slow to bulk load huge tables into the data warehouse. This is usually because we missed some steps around the load.
A. Drop the clustered index on the large table before loading.
B. Recreate the index on the large table after loading.
C. Update statistics.

7. Update view. Typically, we use views to hide the logic and the tables behind them, and to make loading easier. But in some cases we need to update through a view (we don't want to know the details of the view, we just need to update some data). SQL Server provides the ability to update views directly and indirectly.
A. If the view meets the following restrictions, you can run DML against it directly:
a. no subquery, only a plain select
b. no DISTINCT or GROUP BY (no aggregation)
c. no ORDER BY
d. if the view contains multiple tables, you can only insert into or update one table at a time
e. use ‘WITH CHECK OPTION’; otherwise you may change data in a way that goes beyond what you expected
B. Use an INSTEAD OF trigger to update the tables related to the view.

CREATE TRIGGER trigUnion ON vwUnionCustomerSupplier
INSTEAD OF UPDATE
AS
BEGIN
SET NOCOUNT ON
DECLARE @DelName nvarchar(50)

IF (SELECT inserted.Type FROM inserted) Is Null
RETURN

SELECT @DelName = deleted.CompanyName FROM deleted

IF (SELECT inserted.Type FROM inserted) = 'Company'
UPDATE Customers
SET CompanyName =
  (SELECT CompanyName
  FROM inserted)
  WHERE Customers.CompanyName =
  @DelName
ELSE
UPDATE Suppliers
SET CompanyName =
  (SELECT CompanyName
  FROM inserted)
  WHERE Suppliers.CompanyName =
  @DelName
END

8. Deadlocks or long-running queries. It is not normal, but if you find your ELT or ETL running for a long time, it may be caused by a deadlock. Check it with sys.dm_tran_locks, or use sys.dm_exec_query_stats to get information about running queries.

9. Use window functions for rolling aggregation. We can set the ROWS or RANGE option to achieve running aggregation in MS SQL. RANGE is the default option.

-- running total (default RANGE frame, in tempDB)
select customerId, orderId, amount, sum(amount) over (order by orderId) as runningTotal from sales_order
-- revised running total (ROWS frame, in memory)
select customerId, orderId, amount, sum(amount) over (order by orderId rows unbounded preceding) as runningTotal from sales_order
-- sum over all rows (in memory), very useful with partition for subtotals
select customerId, orderId, amount, sum(amount) over (order by orderId rows between unbounded preceding and unbounded following) as total from sales_order
-- rolling 3-row total: previous, current and next row (3 months if there is one row per month; in memory)
select customerId, orderId, amount, sum(amount) over (order by orderId rows between 1 preceding and 1 following) as rollingTotal from sales_order
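
The three ROWS frames used above can be mimicked in plain Python to check what each one returns. The list `amounts` is made-up data, assumed to be already ordered by orderId.

```python
from itertools import accumulate

amounts = [10, 20, 30, 40]   # already ordered by orderId

# ROWS UNBOUNDED PRECEDING: running total up to the current row.
running_total = list(accumulate(amounts))

# ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: grand total on every row.
grand_total = [sum(amounts)] * len(amounts)

# ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: previous, current and next row.
rolling_3 = [sum(amounts[max(i - 1, 0):i + 2]) for i in range(len(amounts))]

print(running_total, grand_total, rolling_3)
```

Note how the rolling frame simply shrinks at the edges, where there is no previous or next row.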

10. Covering Index. An index that contains all the information required to resolve the query is known as a “Covering Index”. If the fields in the “select” are not in the nonclustered or clustered index, a “key lookup” will appear in the execution plan.

To get a covering index without adding new key columns to the nonclustered index, we can use “Included columns”. They keep non-key columns in the leaf nodes of the index.

CREATE NONCLUSTERED INDEX [ix_Customer_Email] ON [dbo].[Customers]
(
            [Last_Name] ASC,
            [First_Name] ASC
)
INCLUDE ( [Email_Address]) WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]

11. Schema Binding. Schema binding is used for views and functions.
Objects that are referenced by schema-bound objects cannot have their definitions changed. It can also significantly increase the performance of user-defined functions.

CREATE FUNCTION dbo.GetProductStatusLabel
(
  @StatusID tinyint
)
RETURNS nvarchar(32)
WITH SCHEMABINDING
AS
BEGIN
  RETURN (SELECT Label FROM dbo.ProductStatus WHERE StatusID = @StatusID);
END

12. Table/Index partitioning. If you are working on Azure or a cluster platform, please skip this: HDFS already does a similar thing for you. But if you still work on-prem, table partitioning will help improve performance a lot. Essentially, table partitioning spreads a table over more than one filegroup to improve its I/O. There are four steps to create partitions for a table or index.
A. Add filegroups and files

-- Adds two new filegroups to the AdventureWorks2012 database
ALTER DATABASE AdventureWorks2012  
ADD FILEGROUP test1fg;  
GO  
ALTER DATABASE AdventureWorks2012  
ADD FILEGROUP test2fg;  
-- Adds one file for each filegroup.  
ALTER DATABASE AdventureWorks2012   
ADD FILE   
(  
    NAME = test1dat1,  
    FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t1dat1.ndf',  
    SIZE = 5MB,  
    MAXSIZE = 100MB,  
    FILEGROWTH = 5MB  
)  
TO FILEGROUP test1fg;  
ALTER DATABASE AdventureWorks2012   
ADD FILE   
(  
    NAME = test2dat2,  
    FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t2dat2.ndf',  
    SIZE = 5MB,  
    MAXSIZE = 100MB,  
    FILEGROWTH = 5MB  
)  
TO FILEGROUP test2fg;  
GO  

B. Add a partition function: how rows map to partitions based on a column's value

-- Creates a partition function called myRangePF1 that will partition a table into two partitions
CREATE PARTITION FUNCTION myRangePF1 (int)  
    AS RANGE LEFT FOR VALUES (100) ;  
GO  

C. Add a partition scheme: map the partitions of the partition function to filegroups.

-- Creates a partition scheme called myRangePS1 that applies myRangePF1 to the two filegroups created above
CREATE PARTITION SCHEME myRangePS1  
    AS PARTITION myRangePF1  
    TO (test1fg, test2fg) ;  
GO  

D. Choose the partitioning column: the partition function uses it to assign rows to partitions

-- Creates a partitioned table called PartitionTable that uses myRangePS1 to partition col1  
CREATE TABLE PartitionTable (col1 int PRIMARY KEY, col2 char(10))  
    ON myRangePS1 (col1) ;  
GO
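
How a partition function maps a column value to a partition number can be sketched with Python's bisect module. This is an illustration of the RANGE LEFT and RANGE RIGHT rules only, not of how SQL Server implements them; the function names are made up.

```python
from bisect import bisect_left, bisect_right

boundaries = [100]   # FOR VALUES (100): one boundary value gives two partitions

def partition_range_left(value):
    """RANGE LEFT: a boundary value belongs to the partition on its left."""
    return bisect_left(boundaries, value) + 1

def partition_range_right(value):
    """RANGE RIGHT: a boundary value belongs to the partition on its right."""
    return bisect_right(boundaries, value) + 1

for v in (99, 100, 101):
    print(v, partition_range_left(v), partition_range_right(v))
```

With RANGE LEFT, as in myRangePF1, col1 values up to and including 100 land in partition 1 (test1fg) and larger values in partition 2 (test2fg).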

13. Defragmentation. According to Mircorsoft suggestion, if fragment greater than 30%, we need to rebuild index, if between 5% – 30%, we need to reorganize index. We can use sys.dm_db_index_physical_stats to check the avg_fragmentation_in_percent.

-- use sys.dm_db_index_physical_stats to check fragmentation
SELECT * FROM sys.dm_db_index_physical_stats(DB_ID(N'AdventureWorks2017'), OBJECT_ID(N'AdventureWorks2017.Person.Person'), -1, NULL, 'DETAILED');
-- check avg_fragmentation_in_percent in the result
-- if it is > 5% and <= 30%, reorganize the index
ALTER INDEX index_name ON table_name REORGANIZE;
-- if it is > 30%, rebuild the index (ONLINE = ON is not available in every edition)
ALTER INDEX index_name ON table_name REBUILD WITH (ONLINE = ON);
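
The thresholds can also be written as a small decision helper. This is only a sketch in Python: the function name and the idea of generating the statement as a string are my own assumptions, and the index and table names must come from a trusted source because they are interpolated directly.

```python
def index_maintenance_statement(index_name, table_name, avg_fragmentation_in_percent):
    """Return the ALTER INDEX statement suggested by the thresholds, or None."""
    if avg_fragmentation_in_percent > 30:
        return f"ALTER INDEX {index_name} ON {table_name} REBUILD;"
    if avg_fragmentation_in_percent > 5:
        return f"ALTER INDEX {index_name} ON {table_name} REORGANIZE;"
    return None  # 5% or less: leave the index alone

# Hypothetical index name and fragmentation value, for illustration only.
print(index_maintenance_statement("IX_Example", "Person.Person", 12.5))
```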

14. Other convenient code.

-- get all column names of spec table
sp_columns table_name, table_owner

-- Object Dependencies
sp_depends table_name, table_owner

-- convert; returns NULL if the conversion fails
TRY_CONVERT(data_type[(length)], expression [, style])

-- split a string by a delimiter
SELECT * FROM STRING_SPLIT('A,B,B', ',')
select column1, column2 from table1
cross apply string_split(column3,',')

--  returns the last day of the month containing a specified date, with an optional offset.
EOMONTH ( start_date [, month_to_add ] ) 

-- check whether an object exists (here a stored procedure, type 'P')
IF OBJECT_ID('Sales.uspGetEmployeeSalesYTD', 'P') IS NOT NULL
    PRINT 'Sales.uspGetEmployeeSalesYTD exists';

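-- dynamic SQL
-- use sp_executesql (parameterized)
DECLARE @IntVariable int, @SQLString nvarchar(500), @ParmDefinition nvarchar(500);
-- example statement (an assumption); any query that uses @BusinessEntityID works here
SET @SQLString = N'SELECT BusinessEntityID, JobTitle FROM HumanResources.Employee WHERE BusinessEntityID = @BusinessEntityID';
SET @ParmDefinition = N'@BusinessEntityID tinyint';
/* Execute the string with the first parameter value. */
SET @IntVariable = 197;
EXECUTE sp_executesql @SQLString, @ParmDefinition, @BusinessEntityID = @IntVariable;
-- use EXEC (plain string concatenation, so only use it with trusted input)
DECLARE @columnList varchar(100), @city varchar(100), @sqlCommand varchar(1000);
SET @columnList = 'AddressID, AddressLine1, City';
SET @city = '''London''';
SET @sqlCommand = 'SELECT ' + @columnList + ' FROM Person.Address WHERE City = ' + @city;
EXEC (@sqlCommand);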

A Failed Text Classification

  • version 1@20190401
  • version 2@20190402: changed the number of categories to 2

Today I tried a text classification task. The data consists of messages about flights, each labeled with one of 5 levels, so it is clearly a supervised problem. I thought there were already plenty of solutions for this kind of problem, so I started full of confidence. But the result was bad: no more than 35% accuracy for 5 classes, only somewhat better than guessing.

Model | Acc%
word tf-idf with kernel SVM | 32.6
word tf-idf with random forest | 31.3
Naïve Bayes | 35.3
word embedding (FastText) with GRU | 30.3

The details can be found in Google Colaboratory.

I am considering the following reasons for this failure:

  1. The models overfit easily. For example, while the GRU model’s training loss kept decreasing, the validation loss decreased in the beginning, but after 40 epochs it started to increase or jump up and down.
  2. I used a pre-trained embedding model (FastText) that is based on Wikipedia, but the dataset is about civil aviation. The words in the data and the available word vectors may be far apart.
  3. The data source may lack significant or clear rules for classifying the messages into 5 categories. If we just label them as a binary “attention/no worry”, the result may be better.

@20190402: I changed the number of categories from 5 to 2, hoping the result would be better. But there was NO real improvement: only 63.5% accuracy for 2 categories.
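
To judge how far these numbers are from guessing, it helps to compute two simple baselines: uniform random guessing and always predicting the most frequent class. The sketch below uses a made-up label distribution, NOT the real flight-message data, so the printed values only illustrate the calculation.

```python
from collections import Counter

def baseline_accuracies(labels):
    """Return (uniform-guess accuracy, majority-class accuracy) for a label list."""
    counts = Counter(labels)
    uniform = 1.0 / len(counts)                    # pick one of the classes at random
    majority = max(counts.values()) / len(labels)  # always predict the biggest class
    return uniform, majority

# Hypothetical label distribution for 5 levels (an assumption for illustration).
labels = [1] * 30 + [2] * 25 + [3] * 20 + [4] * 15 + [5] * 10
uniform, majority = baseline_accuracies(labels)
print(f"uniform guess: {uniform:.1%}, majority class: {majority:.1%}")
```

If the real classes are imbalanced, the majority-class baseline is the fairer comparison for the accuracies in the table.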

From Qlikview to Tableau, a comparison from developer’s viewpoint

— 03/26/2019 version 0.1

In the field of BI tools, the three most widely used are Tableau, PowerBI and Qlik. According to Gartner’s report, all of them are leaders in the market. Although there are some differences in features and operation, I think each of them can do most typical visualization work. (Don’t compare them with D3 or matplotlib; the complexity and target customers are totally different.)

Tableau, Qlikview and PowerBI are leaders in the market

The downside of this situation for a BI developer or data analyst is that you have to learn all of them, since you never know your company’s or your clients’ environment in advance. So, as Einstein said:

“The measure of intelligence is the ability to change.”

Albert Einstein

I first touched Qlikview almost 7 years ago. Compared with SSRS, it brought new ideas and quick access to BI. Three years ago I started to use Tableau, and I never thought development could be so easy and fast. In this post, as a developer who has used both, I will compare Qlikview and Tableau on performance, visualization, suitability, ETL, LOD or set analysis, table calculation, tooltips, and sets/filters/groups.

  • Performance. Both Qlikview and Tableau perform well. They use in-memory technology to speed things up. You won’t feel much difference when you handle small or medium-sized data (less than 1 million). Above this size, Tableau is slower than Qlikview, since Qlikview works only in RAM while Tableau uses a cube and RAM. The speed of Qlikview depends more on the design of your model: the more sync tables, the more computation is needed to refresh the data, and that is the usual reason for a slow Qlikview.
  • Visualization. Tableau provides a stylish, simple, drag-and-drop way to build dashboards. It is very quick to develop a new dashboard without doing much modeling work. Qlikview provides relatively complex but flexible dashboards and tables. Yes, you are right, “Table” is much better in Qlikview. Another advantage is drill-down: the cyclic drill-down feature is convenient for managers who want to find the key points at a lower level. However, as to the map function, I have to say Qlikview has much to learn from Tableau. In Tableau, you can: 1. set a map by country, state, or latitude and longitude; 2. import a spatial file; 3. set a custom image as the map and set x, y coordinates.
Use x, y coordinate in a custom image
  • Suitability. In a nutshell, Tableau is good for both end users and IT; Qlikview is good for IT. Tableau is better for building dashboards and for rapid development for specific purposes, like sales growth analysis. Qlikview is better for developing an enterprise BI solution (all in one). This is easy to understand, since Qlikview is based on its data model, which connects everything together and exposes it in the UI.
  • ETL. Both companies say their tools can do some part of the ETL work. In my opinion, neither of them can replace ETL tools; their ETL features are simple and amateur. Maybe Qlikview does a little better with incremental loading using QVD files. Without incremental loading, Tableau cannot handle large data. I hope it solves this soon.
use QVD files for incremental loading
  • LOD or Set analysis. Level of Detail (Tableau) and Set analysis (Qlikview) are my favorite features. They allow us to control one or more dimensions in a calculation. The only difference is that set analysis allows the developer to set a value for a specific dimension.
LOD in Tableau
Set analysis in Qlikview
  • Table calculation. This feature and Tooltips are my two favorites in Tableau. In Qlikview, except for simple percentages and cumulative sums, you have to code it yourself, like the rolling sum “sum(aggr(rangesum(above(total sum({<Month=>} Amount),0,3)),Month))”. In Tableau, however, Table calculation gives a more convenient experience. You can also choose whether the calculation is scoped to the table or to the pane.
logic of custom table calculation in Tableau
  • Tooltips. This is my second favorite feature in Tableau. A long time ago, I hoped Qlikview would provide a subchart in tooltips. But until now, it can still only show text in tooltips.
A subchart in a tooltip gives the end user more relevant information than a text-only one.
  • Filter, Set, Group. In Qlikview, there are no directly corresponding concepts: a filter is just a field in the control panel, a set is much like a bookmark, and grouping is mostly done in the script. In Tableau, you need to set each filter repeatedly along with its working scope; a set is a dynamic sub-dataset, and you can define a computed set or an in/out set at the global or region level; a group is a static sub-dataset at the region level. Coming from Qlikview, “Set” is hard to understand, but it is essentially just a True/False flag.
Set detail members of in/out in set with parameters
  • Others. Qlikview can do lots of work in its script, I mean everything you can imagine, since it includes VBScript in the module function. It seems Tableau can use JScript as well, but you won’t want to use it. Tableau provides the “Story” feature, so the user doesn’t need to export to PPT for further editing.

I didn’t mention some software or hardware aspects, like rapid prototyping ability or device support. I will cover them in a future post.

Brief analysis on recommendation system of Netflix & YouTube

–03/19/2019 version 0.1


Last week, my wife told me she had logged into my Netflix account and immediately realized it was not hers, since the items did not match her tastes. This sparked my interest in the recommendation systems of Netflix and YouTube, which are the most watched channels in the US (maybe Spotify works the same way). Here I want to give a brief analysis of how they work.

Basic

First, let’s quickly look at the different approaches (that I know of) used in recommendation systems.

Popularity. This is the simplest way: rank items in terms of page views (PV). It works very well for new users and avoids the “cold start” problem. The downside is that this method cannot provide personalized recommendations. One way to improve it is to add some categories at the beginning so that users can filter by category themselves.
Collaborative filtering (CF). CF algorithms are based on the idea that if two clients have similar rating histories then they will behave similarly in the future (Breese, Heckerman, and Kadie, 1998). CF splits into two subcategories: memory-based and model-based.

  • Memory-based approach can be divided into User-based and Item-based. They find similar users or similar items, respectively, in terms of Pearson correlation.
    • User-based.
      1. Build correlation matrix S which is symmetric.

            \[S(i,k)=\frac{\sum_j (v_{ij}-\bar{v}_i)(v_{kj}-\bar{v}_k)}{\sqrt{\sum_j (v_{ij}-\bar{v}_i)^2 \sum_j (v_{kj}-\bar{v}_k)^2}}\]

         , where v_{ij} is the rating user i gave to item j, \bar{v}_i is the mean rating of user i, and the sums run over the items j rated by both users.
      2. Select the top n users who have the largest scores.
      3. Identify items that similar users like but the target user has not seen before. The predicted rating of user i for item j is based on the weighted combination of the selected neighbors’ ratings, where k runs over the n selected neighbors:

            \[p(i,j)=\bar{v}_i+\frac{\sum_{k=1}^{n}(v_{kj}-\bar{v}_k)\times S(i,k)}{\sum_{k=1}^{n}|S(i,k)|}\]

      4. Pick the top N movies based on the predicted rating.
    • Item-based.
      1. Build correlation matrix S based on items. (similar to user-based)
      2. Get the top n movies that the target user has watched and rated before.
      3. Return the movies that are most related to these n movies and that the target user has never watched.
    • In the real world. The number of users grows faster than the number of items, and users’ tastes change easily. So item-based is the most frequently used.
  • Model-based approach is based on matrix factorization, which is popular in dimension reduction. Here we use singular value decomposition (SVD) to explain.

        \[X_{n\times m}=U_{n\times r}\cdot S_{r\times r}\cdot V_{m\times r}^T\]

    , where X is the rating matrix of n users and m items, U represents the feature vectors corresponding to the users in the latent space with dimension r, and V represents the feature vectors corresponding to the items in the same latent space. Once we find U and V (with the singular values in S absorbed into them), we can calculate any p(i,j) by U_i \cdot V_j.
  • CF is based on historical data, so it has the “cold start” problem. The accuracy of prediction depends on the amount of data, since the CF matrix is sparse; e.g., a few mistaken ratings can affect the prediction seriously.
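
The user-based steps above can be sketched in a few lines of plain Python. This is a toy illustration under my own assumptions: ratings are stored as a dict of dicts, the user names and ratings are invented, each user’s mean is taken over all items that user rated, and the weights are normalized by the sum of their absolute values.

```python
from math import sqrt

def mean_rating(ratings):
    return sum(ratings.values()) / len(ratings)

def pearson(a, b):
    """Pearson correlation over the items both users rated."""
    common = set(a) & set(b)
    if len(common) < 2:
        return 0.0
    mean_a, mean_b = mean_rating(a), mean_rating(b)
    num = sum((a[j] - mean_a) * (b[j] - mean_b) for j in common)
    den = sqrt(sum((a[j] - mean_a) ** 2 for j in common)
               * sum((b[j] - mean_b) ** 2 for j in common))
    return num / den if den else 0.0

def predict(ratings, user, item, n=2):
    """Predict user's rating of item from the n most similar users who rated it."""
    neighbours = [(pearson(ratings[user], r), other)
                  for other, r in ratings.items()
                  if other != user and item in r]
    top = sorted(neighbours, reverse=True)[:n]
    norm = sum(abs(s) for s, _ in top)
    base = mean_rating(ratings[user])
    if norm == 0:
        return base  # no usable neighbours: fall back to the user's mean
    offset = sum(s * (ratings[o][item] - mean_rating(ratings[o])) for s, o in top)
    return base + offset / norm

# Invented toy ratings, for illustration only.
ratings = {
    "ann": {"A": 5, "B": 3, "C": 4},
    "bob": {"A": 4, "B": 2, "C": 3, "D": 5},
    "cat": {"A": 1, "B": 5, "C": 2, "D": 1},
}
print(predict(ratings, "ann", "D"))
```

Here bob rates like ann and liked D, while cat rates the opposite way and disliked D, so both neighbours push the prediction for ann above her mean rating. The prediction is not clipped to the rating scale.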

Content-based (CB). This approach is based on information about the item itself rather than only on ratings as in CF. We need to create metadata for the items. The metadata can be tagged manually, or we can use TF-IDF to extract keywords automatically. Then we build the connection between the items that the target user liked and the items with similar metadata. CB avoids the “cold start” and “over-recommendation” problems; however, it is hard to maintain the metadata and keep it accurate.

Hybrid. It combines CF and CB. We can merge the predictions together or set different weights in different scenarios.

Deep Learning. With large-scale datasets, it is hard to use traditional recommendation systems because of the 4 Vs (volume, variety, velocity, and veracity). Deep learning models are good at solving complex problems (see “A review on deep learning for recommender systems: challenges and remedies”). We will introduce the deep learning model used by YouTube in the YouTube section below.

Netflix

I first logged into Netflix to find information provided by the official website. Fortunately, there was a topic called “How Netflix’s Recommendations System Works”. They didn’t give much detail about the algorithms, but they provide clues about which information they use to predict users’ choices. Below is their explanation:

We estimate the likelihood that you will watch a particular title in our catalog based on a number of factors including:

  • your interactions with our service (such as your viewing history and how you rated other titles),

  • other members with similar tastes and preferences on our service (more info here), and

  • information about the titles, such as their genre, categories, actors, release year, etc.

So we can guess it is a hybrid approach combining CF (item-based and user-based) and CB. But we don’t know how they designed it at this point. Let’s keep reading from the official website.

In addition to knowing what you have watched on Netflix, to best personalize the recommendations we also look at things like:

  • the time of day you watch,

  • the devices you are watching Netflix on, and

  • how long you watch.

These activities are not mentioned in the Basic section. They can all be used as input features for a deep learning model like the one we will see in the YouTube section.

It also mentions the “cold start” problem:

When you create your Netflix account, or add a new profile in your account, we ask you to choose a few titles that you like. We use these titles to “jump start” your recommendations. Choosing a few titles you like is optional. If you choose to forego this step then we will start you off with a diverse and popular set of titles to get you going.

It’s clear they use the popularity approach with categories to solve the “cold start” problem. As the user accumulates more history, Netflix uses other approaches to replace the initial one.

They also personalize the rows and the titles inside them:

In addition to choosing which titles to include in the rows on your Netflix homepage, our system also ranks each title within the row, and then ranks the rows themselves, using algorithms and complex systems to provide a personalized experience. …. In each row there are three layers of personalization:

  • the choice of row (e.g. Continue Watching, Trending Now, Award-Winning Comedies, etc.)
  • which titles appear in the row, and
  • the ranking of those titles.

My guess is that they calculate a score for each item for each user, then sum these scores within each category to decide the order of the rows. As I said, we don’t yet know how they mix CB and CF to get the score of each item. But they are surely mixed.

 

YouTube

As YouTube is a Google product, it is not surprising that it uses deep learning for its recommendation system. It is too large in both users and items for a simple statistical model to handle well. In the paper “Deep Neural Networks for YouTube Recommendations”, the authors explain how they apply DL to YouTube.

 

It has two parts: Candidate Generation and Ranking. The first filters a few hundred candidates from millions of videos; the second sorts them by adding more scenario and video feature information. Let’s see how they work:

  • Candidate generation. Because it filters from millions of videos, this stage only uses user activity and scenario information. The basic idea is to get the probability of watching a specific video i at time t, given user U and context C:

        \[P(w_t=i|U,C)=\frac{e^{v_i u}}{\sum_{j\in V}e^{v_j u}}\]

    , where V is the set of all videos. The key point is to get the user vector u and the video vectors v. To get u, the authors embed the watched videos and the search tokens, average them into a watch vector and a search vector, then concatenate these with other features such as geographic, video age and gender vectors and feed them through 3 fully connected ReLU layers. The output is the user vector u. To get the video vectors v, the network is trained to use u to predict probabilities over all videos through a softmax. After training, the learned video vectors v are what we want. At serving time, we only need to put u and v together and pick the top N videos with the highest scores.
  • Ranking. Compared with Candidate Generation, the number of videos is much smaller, so we can put more video features into the input vectors. These features mostly focus on the scenario, like the topic of the video, how many videos the user has watched under each topic, and the time since the last watch. The model embeds categorical features with shared embeddings and feeds in continuous features after normalization, together with powers of the normalized values.
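
The softmax in the candidate generation step can be illustrated with a tiny pure-Python sketch. The vectors below are invented 2-dimensional toys, and the function names are my own; the real system learns high-dimensional vectors, and scoring every video like this would be far too slow at that scale.

```python
from math import exp

def watch_probabilities(user_vec, video_vecs):
    """Softmax over the dot products v_j . u for every video j."""
    scores = {vid: sum(a * b for a, b in zip(vec, user_vec))
              for vid, vec in video_vecs.items()}
    top = max(scores.values())  # subtract the max for numerical stability
    exps = {vid: exp(s - top) for vid, s in scores.items()}
    total = sum(exps.values())
    return {vid: e / total for vid, e in exps.items()}

def top_n(user_vec, video_vecs, n=2):
    """Return the ids of the n videos with the highest watch probability."""
    probs = watch_probabilities(user_vec, video_vecs)
    return sorted(probs, key=probs.get, reverse=True)[:n]

# Invented toy vectors, for illustration only.
u = [1.0, 0.5]
videos = {"v1": [0.9, 0.1], "v2": [-0.5, 0.2], "v3": [0.3, 0.8]}
print(top_n(u, videos))
```

Because the softmax is monotonic in the dot product, ranking by probability gives the same order as ranking by the raw scores v_j . u.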

Reference:

  • Deep Neural Networks for YouTube Recommendations, Paul Covington, Jay Adams, Emre Sargin, https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/45530.pdf
  • How Netflix’s Recommendations System Works, n/a, https://help.netflix.com/en/node/100639
  • Finding the Latent Factors | Stanford University, https://www.youtube.com/watch?v=GGWBMg0i9d4&index=56&list=PLLssT5z_DsK9JDLcT8T62VtzwyW9LNepV
  • Recommendation System for Netflix, Leidy Esperanza Molina Fernández, https://beta.vu.nl/nl/Images/werkstuk-fernandez_tcm235-874624.pdf
  • How far have recommendation algorithms come? Take a look at this and you will know! (in Chinese), 章华燕, https://mp.weixin.qq.com/s?__biz=MzIzNzA4NDk3Nw==&mid=2457737060&idx=1&sn=88ef898f5054ae9b8cb005c31b65ee2d&chksm=ff44bf3ac833362c2436002be265c390b033d3e7709553fd8b603e6269d58f366f689beb2639&mpshare=1&scene=1&srcid=#

Love, Death & Robots

More than 80 per cent of the TV shows and movies people watch on Netflix are discovered through the platform’s recommendation system. We do not choose what to watch; the algorithm does.
Among all the services that I use or have used, including YouTube, Spotify, Netflix and Hulu, the best and stickiest ones are Spotify and Netflix. But why? Personally, I think these may be the reasons:
1. The algorithm combines the recommendation system with user preference. YouTube, by contrast, applies its automatic recommendation system too aggressively, which means it always gives me some things I don’t like. Netflix and Spotify are smarter: we can choose the categories we like at the beginning, or they give us the main categories, which makes it easier to choose. In other words, they offer a new user something rather than nothing.
2. Netflix has hired real-life humans to categorize every TV show and movie and apply tags to each of them in order to create hyperspecific micro-genres such as “Visually-striking nostalgic dramas” or “Understated romantic road trip movies”. Since the size of the catalog is controlled by Netflix itself, the manual tagging is more precise than automatic tags.
3. They create original movies based on big data. Okay, maybe that’s not good. But they did, and we love the movies.

Text Auto Summarization(Extraction)

Recently I was given the task of researching ways to summarize text automatically. Here I share some of my findings; I hope they are helpful.

Summarization Methods

We can classify summarization methods into different types by input type, purpose and output type. Typically, extractive and abstractive are the most common approaches.


Here, we would like to introduce two extractive methods. One is stats-based, the other is deep learning-based.

Stats-based

  1. Idea: give each word a weighted frequency. For each sentence, sum the weighted frequencies of the words inside it. Then pick the sentences with the highest sums.
  2. Steps
    2.1 Preprocessing: replace extra whitespace characters or delete the parts we do not need to analyze.
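# map form feed, tab and newline to a space; drop carriage returns
replace = {
    ord('\f'): ' ',
    ord('\t'): ' ',
    ord('\n'): ' ',
    ord('\r'): None
}
# str.translate returns a new string, so keep the result
content = data.translate(replace)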

2.2 Tokenizing the sentence

import nltk

# requires NLTK's sentence tokenizer data to be downloaded first
sentence_list = nltk.sent_tokenize(content)

2.3 Get frequency of each word

stopwords = set(nltk.corpus.stopwords.words('english'))
word_frequencies = {}
# lowercase the text so the counts match the lowercased words used for scoring
for word in nltk.word_tokenize(content.lower()):
    if word not in stopwords:
        word_frequencies[word] = word_frequencies.get(word, 0) + 1

2.4 Weighted frequency of occurrence

# divide every count by the largest count so weights fall in (0, 1]
maximum_frequency = max(word_frequencies.values())
for word in word_frequencies:
    word_frequencies[word] = word_frequencies[word] / maximum_frequency

2.5 Calculate the sum of weight frequency for each sentence

sentence_scores = {}
for sent in sentence_list:
    # skip long sentences so the summary stays short
    if len(sent.split(' ')) >= 30:
        continue
    for word in nltk.word_tokenize(sent.lower()):
        if word in word_frequencies:
            sentence_scores[sent] = sentence_scores.get(sent, 0) + word_frequencies[word]

2.6 sort sentences in descending order of sum

import heapq
summary_sentences = heapq.nlargest(7, sentence_scores, key=sentence_scores.get)
summary = ' '.join(summary_sentences)
print(summary)
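
For readers who want to try the whole pipeline without installing NLTK, here is a self-contained sketch of steps 2.1 to 2.6 using only the standard library. The regex-based sentence splitting and the tiny stop-word list are my own simplifications, so results will differ from the NLTK version.

```python
import heapq
import re
from collections import Counter

# Tiny illustrative stop-word list (an assumption); NLTK's English list is far longer.
STOPWORDS = {"a", "an", "and", "the", "is", "are", "of", "to", "in", "it", "on", "for"}

def summarize(text, n_sentences=2, max_words=30):
    # 2.1 normalise whitespace
    text = re.sub(r"\s+", " ", text).strip()
    # 2.2 naive sentence split after ., ! or ?
    sentences = re.split(r"(?<=[.!?])\s+", text)
    # 2.3 word frequencies without stop words
    words = [w for w in re.findall(r"[a-z']+", text.lower()) if w not in STOPWORDS]
    freq = Counter(words)
    if not freq:
        return ""
    # 2.4 weighted frequency: divide by the largest count
    top = max(freq.values())
    weight = {w: c / top for w, c in freq.items()}
    # 2.5 score each sentence, skipping very long ones
    scores = {}
    for sent in sentences:
        if len(sent.split(" ")) >= max_words:
            continue
        tokens = re.findall(r"[a-z']+", sent.lower())
        scores[sent] = sum(weight.get(w, 0.0) for w in tokens)
    # 2.6 keep the highest scoring sentences
    best = heapq.nlargest(n_sentences, scores, key=scores.get)
    return " ".join(best)

text = "Spark is fast. Spark runs machine learning on Spark clusters. Cats sleep."
print(summarize(text, n_sentences=1))
```

Like the NLTK version, this returns sentences in score order rather than in their original order in the text.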

Deep Learning-based

  1. Idea: vectorize each sentence into a high-dimensional space, cluster the vectors using k-means, then pick the sentence closest to the center of each cluster to form the summary of the text.
  2. Steps:
    2.1 Preprocessing and tokenizing the sentences (same as the stats-based method)
    2.2 Skip-Thought Encoder


Encoder Network: The encoder is typically a GRU-RNN which generates a fixed length vector representation h(i) for each sentence S(i) in the input.
Decoder Network: The decoder network takes this vector representation h(i) as input and tries to generate two sentences — S(i-1) and S(i+1), which could occur before and after the input sentence respectively.

These learned representations h(i) are such that embeddings of semantically similar sentences are closer to each other in vector space, and therefore are suitable for clustering.


Skip-Thoughts Architecture

import skipthoughts
# You would need to download pre-trained models first
model = skipthoughts.load_model()
encoder = skipthoughts.Encoder(model)
encoded =  encoder.encode(sentences)

2.3 Clustering

import numpy as np
from sklearn.cluster import KMeans

# KMeans and range() need an integer number of clusters
n_clusters = int(np.ceil(len(encoded) ** 0.5))
kmeans = KMeans(n_clusters=n_clusters)
kmeans = kmeans.fit(encoded)

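2.4 Summarization

from sklearn.metrics import pairwise_distances_argmin_min

# average position of each cluster's sentences in the original text
avg = []
for j in range(n_clusters):
    idx = np.where(kmeans.labels_ == j)[0]
    avg.append(np.mean(idx))
# index of the sentence closest to each cluster center
closest, _ = pairwise_distances_argmin_min(kmeans.cluster_centers_, encoded)
# order the chosen sentences by where their cluster appears in the text
ordering = sorted(range(n_clusters), key=lambda k: avg[k])
summary = ' '.join([sentences[closest[idx]] for idx in ordering])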

Reference:

  1. Unsupervised Text Summarization using Sentence Embeddings,https://medium.com/jatana/unsupervised-text-summarization-using-sentence-embeddings-adb15ce83db1
  2. Skip-Thought Vectors, https://arxiv.org/abs/1506.06726
  3. Text Summarization with NLTK in Python, https://stackabuse.com/text-summarization-with-nltk-in-python/

Math in Machine Learning

Linear Algebra

  • mathematics of data: multivariate, least squares, variance, covariance, PCA
  • equation: y = A \cdot b, where A is the data matrix, b is the vector of coefficients and y is the vector of the dependent variable
  • application in ML
    1. Dataset and Data Files
    2. Images and Photographs
    3. One Hot Encoding: a one hot encoding is a representation of categorical variables as binary vectors. python (Keras): encoded = to_categorical(data)
    4. Linear Regression
    5. Regularization. L1 and L2
    6. Principal Component Analysis. PCA
    7. Singular-Value Decomposition. SVD. M = U \cdot S \cdot V^T
    8. Latent Semantic Analysis. LSA. Typically we use tf-idf rather than raw term counts. Through SVD, we can find the different documents that share a topic or the different terms that share a topic
    9. Recommender Systems.
    10. Deep Learning
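
As a small illustration of item 3, here is one hot encoding written in plain Python. The function name `one_hot` and the choice to order categories with `sorted()` are my own; Keras’ `to_categorical` expects integer class indices instead of strings.

```python
def one_hot(values):
    """Encode a list of categorical values as binary vectors.

    Categories are ordered with sorted() so the column order is deterministic.
    """
    categories = sorted(set(values))
    index = {c: i for i, c in enumerate(categories)}
    encoded = [[1 if index[v] == i else 0 for i in range(len(categories))]
               for v in values]
    return categories, encoded

categories, encoded = one_hot(["red", "green", "blue", "green"])
print(categories)
for row in encoded:
    print(row)
```

Each row contains exactly one 1, in the column of its category.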

Numpy

  • array broadcasting
    1. add a scalar or a one-dimensional array to a matrix: y = A + b, where b is broadcast.
    2. it only works when the size of each dimension in the arrays is equal or one of them has size 1.
    3. the dimensions are compared in reverse order, starting with the trailing dimension.
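
Rules 2 and 3 can be checked with a short function that computes the broadcast shape of two arrays from their shapes alone. This is a sketch of the rule in plain Python, with a function name of my own choosing; it does not call NumPy.

```python
from itertools import zip_longest

def broadcast_shape(shape_a, shape_b):
    """Return the broadcast result shape, or raise ValueError if incompatible."""
    result = []
    # Walk both shapes from the trailing dimension; a missing dimension counts as 1.
    for a, b in zip_longest(reversed(shape_a), reversed(shape_b), fillvalue=1):
        if a == b or b == 1:
            result.append(a)
        elif a == 1:
            result.append(b)
        else:
            raise ValueError(f"shapes {shape_a} and {shape_b} cannot be broadcast")
    return tuple(reversed(result))

print(broadcast_shape((2, 3), (3,)))    # matrix + row vector
print(broadcast_shape((2, 1), (1, 3)))  # column vector + row vector
```

A (2, 3) matrix and a length-2 vector fail, because the trailing dimensions 3 and 2 differ and neither is 1.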

Matrices

  • Vector
    1. lowercase letter. \upsilon = (\upsilon_1, \upsilon_2, \upsilon_3)
    2. Addition, Subtraction
    3. Multiplication, Division (same length, element-wise): a*b or a/b
    4. Dot product: a \cdot b
  • Vector Norm
    1. Definition: the length of a vector
    2. L1. Manhattan Norm. L_1(\upsilon)=|\upsilon_1| + |\upsilon_2| + |\upsilon_3| python: norm(vector, 1). Used in regularization to keep the coefficients of a model small
    3. L2. Euclidean Norm. L_2(\upsilon)=\sqrt{\upsilon_1^2+\upsilon_2^2+\upsilon_3^2} python: norm(vector)
    4. Max Norm. L_{max}(\upsilon)=max(|\upsilon_1|,|\upsilon_2|,|\upsilon_3|) python: norm(vector, inf)
  • Matrices
    1. uppercase letter. A=((a_{1,1},a_{1,2}),(a_{2,1},a_{2,2}))
    2. Addition, Subtraction (same dimensions)
    3. Multiplication, Division (same dimensions, element-wise)
    4. Matrix dot product. If C=A\cdot B, the number of columns (n) in A must equal the number of rows (m) in B. python: A.dot(B) or A@B
    5. Matrix-Vector dot product. C=A\cdot \upsilon
    6. Matrix-Scalar. element-wise multiplication
    7. Type of Matrix
      1. square matrix. m=n. easy to add, multiply and rotate
      2. symmetric matrix. M=M^T
      3. triangular matrix. python: tril(matrix) or triu(matrix) for the lower or upper triangular matrix
      4. diagonal matrix. only the main diagonal has values; it does not have to be a square matrix. python: diag(vector)
      5. identity matrix. does not change a vector when multiplied with it. notation: I^n. python: identity(dimension)
      6. orthogonal matrix. Two vectors are orthogonal when their dot product is zero: \upsilon \cdot \omega = 0 or \upsilon \cdot \omega^T = 0, which means the projection of \upsilon onto \omega is zero. An orthogonal matrix is a matrix where Q^T \cdot Q = I
    8. Matrix Operation
      1. Transpose. A^T, rows and columns flipped. python: A.T
      2. Inverse. A^{-1} where A \cdot A^{-1}=I^n. python: inv(A)
      3. Trace. tr(A), the sum of the values on the main diagonal of the matrix. python: trace(A)
      4. Determinant. det(A) or |A|, a scalar representation of the volume of a square matrix. It tells whether the matrix is invertible: a matrix with determinant zero has no inverse. python: det(A)
      5. Rank. The number of linearly independent rows or columns (whichever is less), i.e. the number of dimensions spanned by all vectors in the matrix. python: matrix_rank(A)
    9. Sparse matrix
      1. sparsity score = \frac{count of zero elements}{total elements}
      2. example: word2vector
      3. space and time complexity
      4. Data and preparation
        1. records counting an activity: watch a movie, listen to a song, buy a product. These are usually encoded as one hot, count encoding, or TF-IDF
      5. Areas: NLP, recommender systems, computer vision with lots of black pixels.
      6. Solution to represent sparse matrix. reference
        1. Dictionary of keys:  (row, column)-pairs to the value of the elements.
        2. List of Lists: stores one list per row, with each entry containing the column index and the value.
        3. Coordinate List: a list of (row, column, value) tuples.
        4. Compressed Sparse Row: three (one-dimensional) arrays (A, IA, JA).
        5. Compressed Sparse Column: same as CSR, except that it is compressed by column instead of by row
      7. example
        1. convert to sparse matrix python: csr_matrix(dense_matrix)
        2. convert to dense matrix python: sparse_matrix.todense()
        3. sparsity = 1.0 - count_nonzero(A) / A.size
    10. Tensor
      1. multidimensional array.
      2. the arithmetic is similar to matrix arithmetic
      3. dot product: python: tensordot()
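
To show what the three Compressed Sparse Row arrays (A, IA, JA) contain, here is a small pure-Python conversion from a dense list of lists, plus the sparsity score. The helper names are my own; in practice `scipy.sparse.csr_matrix` does this for you.

```python
def to_csr(dense):
    """Convert a dense row-major matrix (list of lists) to CSR arrays (A, IA, JA).

    A  : the non-zero values, row by row
    IA : IA[i] is the index in A where row i starts; the last entry is len(A)
    JA : the column index of each value in A
    """
    A, IA, JA = [], [0], []
    for row in dense:
        for col, value in enumerate(row):
            if value != 0:
                A.append(value)
                JA.append(col)
        IA.append(len(A))
    return A, IA, JA

def sparsity(dense):
    """Fraction of elements that are zero."""
    total = sum(len(row) for row in dense)
    zeros = sum(1 for row in dense for value in row if value == 0)
    return zeros / total

dense = [
    [1, 0, 0, 2],
    [0, 0, 3, 0],
    [0, 0, 0, 0],
]
A, IA, JA = to_csr(dense)
print(A, IA, JA, sparsity(dense))
```

Row i of the original matrix is recovered from the slice A[IA[i]:IA[i+1]] together with the matching column indices in JA, so an empty row simply repeats the previous IA value.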

Factorization

  • Matrix Decompositions
    1. LU Decomposition
      1. square matrix
      2. A = P \cdot L \cdot U, where L is a lower triangular matrix, U is an upper triangular matrix, and P is a permutation matrix used to permute the result back to the original order.
      3. python: lu(square_matrix)
    2. QR Decomposition
      1. m*n matrix
      2. A = Q \cdot R, where Q is a matrix of size m*m and R is an upper triangular matrix of size m*n.
      3. python: qr(matrix)
    3. Cholesky Decomposition
      1. square symmetric matrix that is positive definite (all eigenvalues greater than zero)
      2. A = L\cdot L^T=U^T\cdot U, where L is a lower triangular matrix and U is an upper triangular matrix.
      3. about twice as efficient as LU decomposition.
      4. python: cholesky(matrix)
    4. Eigendecomposition
      1. eigenvector: A\cdot \upsilon = \lambda\cdot \upsilon, where A is the matrix we want to decompose, \upsilon is an eigenvector and \lambda is its eigenvalue (a scalar)
      2. a matrix can have one eigenvector and eigenvalue for each dimension, so the matrix A can be written as a product of its eigenvectors and eigenvalues: A = Q \cdot \Lambda \cdot Q^{-1}, where Q is the matrix of eigenvectors and \Lambda is the diagonal matrix of eigenvalues (Q^{-1} = Q^T when A is symmetric). This equation also means that if we know the eigenvalues and eigenvectors we can reconstruct the original matrix.
      3. python: eig(matrix)
    5. SVD (singular value decomposition)
      1. A = U\cdot \Sigma \cdot V^T, where A is m*n, U is an m*m matrix, \Sigma is an m*n diagonal matrix holding the singular values, and V^T is an n*n matrix.
      2. python: svd(matrix)
      3. reduce dimension
        1. select the k largest singular values in \Sigma
        2. B = U\cdot \Sigma_k \cdot V_k^T, where \Sigma_k keeps the first k columns of \Sigma and V_k^T keeps the first k rows of V^T; B is an approximation of the original matrix A.
        3. python: TruncatedSVD(n_components=2)
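
The dimension reduction step can be sketched with NumPy’s `np.linalg.svd`. This assumes NumPy is installed; the function name `rank_k_approximation` and the example matrix are my own. The example matrix has rank 2, so keeping k=2 singular values reproduces it up to rounding error.

```python
import numpy as np

def rank_k_approximation(A, k):
    """Return the rank-k approximation of A built from its k largest singular values."""
    # full_matrices=False gives the compact form; s is sorted in descending order
    U, s, Vt = np.linalg.svd(A, full_matrices=False)
    return U[:, :k] @ np.diag(s[:k]) @ Vt[:k, :]

A = np.array([[1.0, 2.0, 3.0],
              [4.0, 5.0, 6.0],
              [7.0, 8.0, 9.0],
              [10.0, 11.0, 12.0]])
B = rank_k_approximation(A, 2)
print(np.round(B, 6))
```

With k=1 the result is only a rough approximation of A, which is the trade-off between compression and accuracy that the choice of k controls.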

Stats

  • Multivariate stats
    1. variance: \sigma^2 = \frac{1}{n-1} \sum_{i=1}^{n}(x_i-\bar{x})^2, python: var(vector, ddof=1)
    2. standard deviation: s = \sqrt{\sigma^2}, python: std(M, ddof=1, axis=0)
    3. covariance: cov(x,y) = \frac{1}{n-1}\sum_{i=1}^{n}(x_i-\bar{x})(y_i-\bar{y}), python: cov(x,y)[0,1]
    4. correlation: cor(x,y) = \frac{cov(x,y)}{s_x*s_y}, normalized to a value between -1 and 1. python: corrcoef(x,y)[0,1]
    5. PCA
      1. project high-dimensional data onto a lower-dimensional subspace
      2. steps:
        1. M = mean(A)
        2. C = A-M
        3. V = cov(C)
        4. values, vectors = eig(V)
        5. B = select(values, vectors), ordered by eigenvalue
      3. scikit learn

        from sklearn.decomposition import PCA

        pca = PCA(2) # get two components
        pca.fit(A)
        print(pca.components_) # principal axes (eigenvectors)
        print(pca.explained_variance_) # variance along each axis (eigenvalues)
        B = pca.transform(A) # transform to new matrix
  • Linear Regression
    1. y = X \cdot b, where b is the vector of coefficients and is unknown
    2. linear least squares (similar to MSE): minimize ||X\cdot b - y||^2 = \sum_{i=1}^{m}(\sum_{j=1}^{n}X_{i,j}\cdot b_j - y_i)^2, which gives b = (X^T\cdot X)^{-1} \cdot X^T \cdot y. Issue: computing the inverse is very slow for large matrices
    3. MSE with SGD
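
For a single feature plus an intercept, the closed-form solution b = (X^T X)^{-1} X^T y can be worked out by hand, because X^T X is only a 2*2 matrix. The sketch below does this in plain Python; the function name `fit_line` and the toy data are my own.

```python
def fit_line(xs, ys):
    """Solve b = (X^T X)^{-1} X^T y for X = [1, x]; return (intercept, slope)."""
    n = len(xs)
    sx, sxx = sum(xs), sum(x * x for x in xs)
    sy, sxy = sum(ys), sum(x * y for x, y in zip(xs, ys))
    # X^T X = [[n, sx], [sx, sxx]] and X^T y = [sy, sxy]
    det = n * sxx - sx * sx
    if det == 0:
        raise ValueError("X^T X is singular; all x values are identical")
    # the inverse of [[a, b], [c, d]] is 1/det * [[d, -b], [-c, a]]
    intercept = (sxx * sy - sx * sxy) / det
    slope = (n * sxy - sx * sy) / det
    return intercept, slope

# Toy data lying exactly on y = 1 + 2x, for illustration only.
intercept, slope = fit_line([0, 1, 2, 3], [1, 3, 5, 7])
print(intercept, slope)
```

With many features, the 2*2 inverse becomes an n*n inverse, which is where the cost mentioned in item 2 comes from.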

Reference: Basics of Linear Algebra for Machine Learning, Jason Brownlee, https://machinelearningmastery.com/linear_algebra_for_machine_learning/