How to build a data pipeline in Databricks

For a long term, I thought there was no pipeline concept in Databricks. For the most engineers they will write the whole script into one notebook rather than split into several activities like in Data factory. In this case, we have to rewirte everything in the script when the next pipeline coming.

But recently, I found there was indeed a way to achieve pipeline in databricks. Here is the document. And follows my understanding.

  • Create widgets as parameters in callee notebook
dbutils.widgets.text("input", "default") # define a text widget in callee notebook
......
dbutils.notebook.exit(defaultText) # set return variable
  • Call callee notebook through caller notebook(we can write our pipeline in this caller notebook, then call all following activity notebooks)
status = dbutils.notebook.run("pipeline",30,{"input":"pass success"}) # transfer input = “pass success” to a notebook called pipeline
  • Callee notebook can return a string to indicate the execution statues( you still remember dbutils.notebook.exit in callee notebook right?)
returned_string = dbutils.notebook.run("pipeline2", 60) 
  • The data exchange between two notebooks can be done with global tempview which locates in memory.
# in callee
sqlContext.range(10).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

# in caller
returned_table = dbutils.notebook.run("pipeline2", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
  • We can also call function in another notebook, which means we can build a library for all notebook
%run ./aclaraLibary # in this notebook, there are two functions which are overloaded.
simple(1,2)
simple(1,2,3)

First step into Azure loT Edge

loT + AI + Cloud = Edge computing.

This is my understanding of edge computing. As the raising of loT, more and more AI work will be dedicated to the local machines, they may be weaker than the central cloud, but it provides significantly quicker reaction.

Frankly, I only have experience to install the models on raspberry Pi 3 which can be seem as a standalone system. In the real world, there are hundreds of endpoints, it is impossible to maintain them separately, and there is also existing communication between endpoints and cloud to transmit data.

Azure gives its own solution called Azure loT Edge. It builds on the top of loT hub(an bi-direction instant message control component )and has three major components: loT module, loT runtime and cloud interface. Let’s figure out some concepts first, then we tried to build a real systems.

Diagram - Quickstart architecture for device and cloud
Figure 1: loT edge architecture

loT hub: is a managed service, hosted in the cloud, that acts as a central message hub for bi-directional communication between your IoT application and the devices it manages. It provides a interface to manage all the loT edge devices and all the modules for delivery to devices.

loT runtime: hosted on device, has two responsibilities: communication and manage modules. These two roles are performed by two parts: loT edge hub and loT agent. loT edge hub takes responsibility of communication between loT hub and loT device and between loT modules, so we can consider loT edge hub is a local proxy for loT hub. loT agent runs and monitors modules on the device.

loT module: is the smallest unit to achieve the business logic on the device. It could be Azure service( AI, Azure function etc.) or the custom functions written by C#, java, python, node.js…. Modules are docker images pulled from Azure register service or Docker hub. The maximum number of modules is 20 in a single device. Each modules has four elements: docker image, instance, identity and twin. The instance is a running image on the device, identity is initialed when the instance created and it stored in loT hub(cloud) as well, in case we launch the communication between cloud and device. Twin is Json document contains some state information, like metadata, configuration and conditions. There are two types of properties in twin: desired properties and reported properties. They have different write/read authorities for device and cloud.

Figure 2: device twin. tags is used for grouping devices when we delivery modules.

The last thing is deployment manifest, which is a Json document tells device which modules to install and how to configure them. Please find the detail in the comments(start with “//”) blew.

{
  "modulesContent": {
// $edgeAgent indicates the docker version
    "$edgeAgent": {
      "properties.desired": {
        "schemaVersion": "1.0",
        "runtime": {
          "type": "docker",
          "settings": {
            "minDockerVersion": "v1.25",
            "loggingOptions": "",
            "registryCredentials": {}
          }
        },
// system module includes information of imges of edgeAgent & edgeHub, "createOptions" indicates when to launch these images.  
        "systemModules": {
          "edgeAgent": {
            "type": "docker",
            "settings": {
              "image": "mcr.microsoft.com/azureiotedge-agent:1.0.7",
              "createOptions": "{}"
            }
          },
          "edgeHub": {
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "mcr.microsoft.com/azureiotedge-hub:1.0.7",
              "createOptions": "{\"HostConfig\":{\"PortBindings\":{\"5671/tcp\":[{\"HostPort\":\"5671\"}],\"8883/tcp\":[{\"HostPort\":\"8883\"}],\"443/tcp\":[{\"HostPort\":\"443\"}]}}}"
            }
          }
        },
// modules includes all information about custom modules or Azure service 
        "modules": {
          "camera-capture": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "glovebox/camera-capture-opencv:1.1.45-arm32v7",
              "createOptions": "{\"Env\":[\"Video=0\",\"azureSpeechServicesKey=2f57f2d9f1074faaa0e9484e1f1c08c1\",\"AiEndpoint=http://image-classifier-service:80/image\"],\"HostConfig\":{\"PortBindings\":{\"5678/tcp\":[{\"HostPort\":\"5678\"}]},\"Devices\":[{\"PathOnHost\":\"/dev/video0\",\"PathInContainer\":\"/dev/video0\",\"CgroupPermissions\":\"mrw\"},{\"PathOnHost\":\"/dev/snd\",\"PathInContainer\":\"/dev/snd\",\"CgroupPermissions\":\"mrw\"}]}}"
            }
          },
          "image-classifier-service": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "glovebox/image-classifier-service:1.1.4-arm32v7",
              "createOptions": "{\"HostConfig\":{\"Binds\":[\"/home/pi/images:/images\"],\"PortBindings\":{\"8000/tcp\":[{\"HostPort\":\"80\"}],\"5679/tcp\":[{\"HostPort\":\"5679\"}]}}}"
            }
          }
        }
      }
    },
// $edgeHub includes routers which is a data pipeline
    "$edgeHub": {
      "properties.desired": {
        "schemaVersion": "1.0",
        "routes": {
          "camera-capture": "FROM /messages/modules/camera-capture/outputs/output1 INTO $upstream"
        },
        "storeAndForwardConfiguration": {
          "timeToLiveSecs": 7200
        }
      }
    }
  }
}

Let’s start to do a demo step by step.

  • Create a loT hub. There are three ways to create a loT hub. One is through the Azure portal, the second through the CLI, the third through vs code.
// through CLI
az iot hub create --resource-group {resource group name} --name {hub_name} 
Figure 3: Create loT hub through Azure loT hub toolkit.
  • Register an loT Edge device in the loT hub. In this step, we actually do nothing for the device. We create the device ID and connect string for using later.
// through CLI
az iot hub device-identity create --hub-name {hub_name} --device-id myEdgeDevice --edge-enabled
az iot hub device-identity show-connection-string --device-id myEdgeDevice --hub-name {hub_name}
// connection string will be created
"connectionString": "HostName={host name};DeviceId=loTEdgeTest;SharedAccessKey=Hn/60BimXvYBM16y0hgwZtGtcERG+FQI0FEFUtVCiXA="
Figure 4: Add a loT Edge device through Azure portal. The connection string will automatically created.
Figure 5: Add a loT Edge device through VS code. Click “Get Device Info” to get the connection string.
  • Install loT runtime and manage the device. Depending on the environment, we can install loT runtime on Linux or Linux containers on Windows. (sorry, I think few windows loT device existing). Linux container on windows is usually used for developing and testing in local, it is very easy to configure since everything done in the image, you can find the steps here. We am going through the how to set up in Linux.
// register repository for Ubuntu server 18.04
curl https://packages.microsoft.com/config/ubuntu/18.04/multiarch/prod.list > ./microsoft-prod.list
// copy list to container list
sudo cp ./microsoft-prod.list /etc/apt/sources.list.d/
// install Microsoft GPG public key
curl https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor > microsoft.gpg
sudo cp ./microsoft.gpg /etc/apt/trusted.gpg.d/

//install container runtime
sudo apt-get update
sudo apt-get install moby-engine
sudo apt-get install moby-cli

// install lot edge security daemon
// security daemon is used for launching edge hub and edge agent
sudo apt-get update
sudo apt-get install iotedge

// manual provision
// set up the connection string you got in previous step
sudo nano /etc/iotedge/config.yaml
# Manual provisioning configuration
provisioning:
  source: "manual"
  device_connection_string: "<ADD DEVICE CONNECTION STRING HERE>"

// verify the installation
sudo systemctl restart iotedge
sudo iotedge list

There is another automatic method to manage the devices, called DPS(Device provisioning Service). But I am planing to talk about it as a new topic.

  • Developing module. As we know, each module is an image located on cloud register or local. Then it is deployed through loT hub to loT edge device. Fortunately, Microsoft has provided an integrated solution in Visual Studio Code. We mostly can do everything in vs code, rather than switch back and forth. Before we start, we need install some extensions for vs code: Azure IoT Tools , Docker extension, Azure IoT EdgeHub Dev Tool, and Visual Studio extension(s) specific to the language( I installed Python extension and C# extension ). Don’t forget to install common language environment: .net core or python, pip…..
Run New IoT Edge Solution
Figure 6: create a new loT edge solution

When you creating a module, you have to provide module’s image repository. It could be Azure container registry( <registry name>.azurecr.io/<module name> ) or local address ( localhost:5000/ <module name> )

Figure 7: solution structure on the left Tab

launch.json is debug configuration, config folder is the deployment manifest. modules folder has subfolders for each module, module.json file defines the Docker build process, the module version, and your docker registry, updating the version number, pushing the updated module to an image registry, and updating the deployment manifest for an edge device triggers the Azure IoT Edge runtime to pull down the new module to the edge device; deployment.template.json file is used for build process, define what module to build, what router used and what version of runtime. If you right click this file, you can find the option to create deployment manifest.

Figure 8: create deployment manifest through build template file
  • Test and Deployment. We can deploy the module to local docker container by right click” deployment.template.json ” and choose “build and push loT edge solution”(or simulator)
Figure 9: Test the module

Deployment is easy as well. right click the json file under config folder(this json file is generate by tempalte.json). choose “create deployment for single device”, then choose device ID to finish single device deployment.

Figure 10: select single device deployment or auto scale deployment
  • Manage device and module on Cloud. If you already upload the module to cloud container registry. I can deploy instantly through Azure portal and this process is automatically.
Figure 11: Set modules. we can add an existing module on container registry or add a custom model by click the device ID in Azure portal.

Next time, I will try to deploy an image classification module to raspberry(or Linux virtual machine) to achieve a simple edge machine learning.

Reference:

Quickstart: Deploy your first IoT Edge module to a Linux device, https://docs.microsoft.com/en-us/azure/iot-edge/quickstart-linux

Creating an image recognition solution with Azure IoT Edge and Azure Cognitive Services, https://dev.to/azure/creating-an-image-recognition-solution-with-azure-iot-edge-and-azure-cognitive-services-4n5i

loT EDGE document, https://docs.microsoft.com/en-us/azure/iot-edge/

Multi Processing in Python

Recently we get the work to improve the ETL efficiency. The latency mostly happen in I/O and file translation, since we only use one thread to handle it. So we want to use multi processing to accelerate this task.

Before everything, I think I need to introduce the difference between multi processing and multi threading in python. The major distance is multi threading is not parallel, there can only be one thread running at any given time in python. You can see it in blew.

  • Actually for CPU heavy tasks, multi threading is useless indeed. However it’s perfect for IO
  • Multiprocessing is always faster than serial, but don’t pop more than number of cores
  • Multiprocessing is for increasing speed. Multi threading is for hiding latency
  • Multiprocessing is best for computations. Multi threading is best for IO
  • If you have CPU heavy tasks, use multiprocessing with n_process = n_cores and never more. Never!
  • If you have IO heavy tasks, use multi threading with n_threads = m * n_cores with m a number bigger than 1 that you can tweak on your own. Try many values and choose the one with the best speedup because there isn’t a general rule. For instance the default value of m in ThreadPoolExecutor is set to 5 [Source] which honestly feels quite random in my opinion.

In this task, we utilized multi processing.

Pro

  1. 5x-10x faster than single core when utilizing 20 cores. (6087 files, 27s (20 cores) vs 180s(single core)
  2. no need to change current coding logic, only adapt to multiprocess pattern.
  3. no effect to upstream and down stream.
Image

Corn:

  1. need to change code by scripts, no general framework or library 
  2. manually decide which part to be parallel 

How to work:

  • Library: you need to import these libraries to enable multi processing in python
from multiprocessing import Pool
from functools import partial
import time # optional
  • Function: you need two functions. parallelize_dataframe is used split dataframe and call multi processing, multiprocess is used for multiprocessing. These two function need to put outside of main function. The yellow marked parameters can be changed by needs.
# split dataframe into parts for parallelize tasks and call multiprocess function
def parallelize_dataframe(df, func, num_cores,cust_id,prodcut_fk,first_last_data):
    df_split = np.array_split(df, num_cores)
    pool = Pool(num_cores)
    parm = partial(func, cust_id=cust_id, prodcut_fk=prodcut_fk,first_last_data=first_last_data)
    return_df = pool.map(parm, df_split)
    df = pd.DataFrame([item for items in return_df for item in items])
    pool.close()
    pool.join()
    return df

# multi processing
# each process handles parts of files imported
def multiprocess(dcu_files_merge_final_val_slice, cust_id,prodcut_fk,first_last_data):
  • Put another script into main function, and utilize  parallelize_dataframe  to execute.
if __name__ == '__main__':
.......
......
                   # set the process number
                    core_num = 20
                    rsr_df = parallelize_dataframe(dcu_files_merge_final_val,multiprocess,core_num, cust_id, prodcut_fk,first_last_data)
                    print(rsr_df.shape)
                    end = time.time()
                    print('Spend:',str(end-start)+'s')

I can’t share the completed code, but you should understand it very well. There are some issues that I have not figure out the solution. The major one is how to more flexible to transmit the parameters to the multicore function rather than hard coding each time.

update@20190621

We added the feature that unzip single zip file with multi processing supporting utilizing futures library.

import os
import zipfile
import concurrent

def _count_file_object(f):
    # Note that this iterates on 'f'.
    # You *could* do 'return len(f.read())'
    # which would be faster but potentially memory
    # inefficient and unrealistic in terms of this
    # benchmark experiment.
    total = 0
    for line in f:
        total += len(line)
    return total


def _count_file(fn):
    with open(fn, 'rb') as f:
        return _count_file_object(f)

def unzip_member_f3(zip_filepath, filename, dest):
    with open(zip_filepath, 'rb') as f:
        zf = zipfile.ZipFile(f)
        zf.extract(filename, dest)
    fn = os.path.join(dest, filename)
    return _count_file(fn)



def unzipper(fn, dest):
    with open(fn, 'rb') as f:
        zf = zipfile.ZipFile(f)
        futures = []
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for member in zf.infolist():
                futures.append(
                    executor.submit(
                        unzip_member_f3,
                        fn,
                        member.filename,
                        dest,
                    )
                )
            total = 0
            for future in concurrent.futures.as_completed(futures):
                total += future.result()
    return total

Ref :

Multithreading VS Multiprocessing in Python, https://medium.com/contentsquare-engineering-blog/multithreading-vs-multiprocessing-in-python-ece023ad55a

Fastest way to unzip a zip file in Python, https://www.peterbe.com/plog/fastest-way-to-unzip-a-zip-file-in-python

Delta Lake Step by Step(1)

Before we start to talk about delta lake, we have to take time to deal with data lake and understand why we need to use data lake. Blew is the best definition I think.

A data lake is a repository for structured, unstructured, and semi-structured data. Data lakes are much different from data warehouses since they allow data to be in its rawest form without needing to be converted and analyzed first.

Devin Pickell 

Data Lake stored everything(raw) with every format. In a short nut, it is a repository. e.g. in Azure, Data Lake Gen2 is a repository based on Blob storage.

What’s the difference between data lake and data warehouse?

Table 1. Difference between data warehouse and data lake

From table 1, we can figure out that dw and dl use in the different scenarios. Data warehouse is used for the classic data analysis, and the data stored in structured rational table; while data lake is used for big data, ELT situation, every raw data coming into data lake then doing following actions. Since data lake is on the cluster cloud, it is very faster to handle the raw data.

What is the downside of data lake?

  • No Transaction.
  • Schema on read.
  • No version control.

These three downsides are very critical in applying data lake into real project. It will affect data quality and efficiency both. It is why most big data projects are failed. Because the data lake can not provide stable data like the database.

Delta Lake comes out!

Figure 1. Delta Lake created ACID and Transaction based on Parquet.

Delta Lake has its own abilities which enables a stable data lake for analysis.

  • Open source storage layer, based on parquet file
  • ACID transaction: serialize isolation
  • Scalable meta data handling: spark distribution data processing
  • stream and batch uniform: batch table as well as stream source
  • Schema enforcement
  • Time travel: data version

Let’s see some simple samples. If you are familiar with spark, you can change to delta lake super quick.

# set up
# upgrade pyspark
pip install --upgrade pyspark
# install
pyspark --packages io.delta:delta-core_2.12:0.1.0


# write table
df= spark.read.csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header="true", inferSchema="true")
df.write.format("delta").save("/delta/diamonds")
# append
df.write.format("delta").mode("append").save("/delta/events")
# we can also partition the table
df.write.format("delta").partitionBy("date").save("/delta/events")
# update
data.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'").save("/tmp/delta-table")
# by default overwrite doesn't change schema, only if option("overwriteSchema", "true") exiting
# add column automatically
df.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/events")


# read
df = spark.read.format("delta").load("/tmp/delta-table")
SELECT * FROM delta.`/tmp/delta-table`

# time travel
# create a read version 
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events") 
# For timestamp_string, only date or timestamp strings are accepted
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

# structured stream
streamingDf = spark.readStream.format("delta").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
# set mini-batch
spark.readStream.format("delta").option("maxFilesPerTrigger",1000).load()
# ignore updates and deletes with "ignoreDeletes" and "ignoreChanges"
# append mode and complete mode
streamingDf = spark.writeStream.format("delta").outputMode("append").load() 
streamingDf = spark.writeStream.format("delta").outputMode("complete").load()

stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
stream.stop()

# check point
.option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")

In next article, I will talk about when to use delta lake and how to optimize it in data brick.

Here is one of my example which covert three small files into one delta table: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2499619125013057/2952616815059893/2608746070181068/latest.html

Reference:

  1. Delta Lake Document. https://delta.io
  2. What Is a Data Lake and Why Is It Essential for Big Data? https://learn.g2.com/what-is-a-data-lake
  3. Delta Lake Document for databricks. https://docs.databricks.com/delta

Night Sight with Google Camera

As an amateur photographer, I was believing DSL is better than phone camera since it has a much larger CMOS so that it can receive more photons, until I installed Google Camera on my Galaxy S8. The results surprised me a lot. I mean, it performed better than my Conan 5DIII in the most of case without editing in the software like photoshop.

Except HDR+ and portrait mode, Google camera provides a magic mode called Night Sight. Unfortunately, this can only work on Pixel series (Pixel 3/3a is the best) phone with hardware support. You can find the A/B Testing here.

left: disable Night Sight; right: enable the Night Sight.

How does Night Sight improve the quality of shots in the night?

  • HDR+. HDR+ is the foundational function for Night Sight. It is a computational photography technology that improves this situation by capturing a burst of frames, aligning the frames in software, and merging them together. Since each frame is short enough to prevent the blur caused by hand shake, the result turns out to be sharper and wilder dynamic range than without HDR+.
This image has an empty alt attribute; its file name is IMG_20190518_101130.jpg

with HDR+, we can clear see the background in an indoor place
  • Positive-shutter-lag (PSL) . In the regular mode, Google camera uses zero-shutter-lag (ZSL) protocol which  limits exposures to at most 66ms no matter how dim the scene is, and allows our viewfinder to keep up a display rate of at least 15 frames per second. Using PSL means you need to hold still for a short time after pressing the shutter, but it allows the use of longer exposures, thereby improving SNR at much lower brightness levels. 
  • Motion metering. Optical image stabilization (OIS) is widely used in many devices to prevent hand shake. But it doesn’t help for long exposure and motion object. Pixel 3 adds motion metering to detect the motion object and adjust the exposure time for each frame. For example, if it detects a dog moving in the frame or we are using the tripod, it will increase exposure time.
  • Super Res Zoom.  HDR essentially uses algorithm to aliment and merge the frames to increase the SNR( signal to noise ratio). Pixel 3 provides a new algorithm called Super Res Zoom for super-resolution and reduce noise, since it averages multiple images together. Super Res Zoom produces better results for some nighttime scenes than HDR+, but it requires the faster processor of the Pixel 3.
  • Learning-based AWB algorithm. When it is dim, AWB( auto white balance) is not functional well. And it is an ill-posed problem, which means we cannot inverse the problem (find out the real color of object in the dark). In this case, Google camera utilizes machine learning to “guess” what is the true color and shift the white balance.
  •  S-curve into our tone mapping. As we know, if we exposure a picture for a long time, all the objects become brighter so that we can not figure out when this picture takes. Google uses sigmoid function to adjust the object brightness ( dark objects become darker, light objects become brighter).

Reference: Night Sight: Seeing in the Dark on Pixel Phones, https://ai.googleblog.com/2018/11/night-sight-seeing-in-dark-on-pixel.html

Brief Talk Object Detection Algorithm of YOLO

Figure 1: YOLO: Real-Time Object Detection

YOLO also know as You Only Look Once. Not like R-CNN, YOLO uses single CNN to do the object detection as well as localization which makes it super faster than R-CNN with only losing a little accuracy. From 2016 to 2018, YOLO has been imporved from v1 to v3. In this article, I will use a simple way to explain how YOLO works.

What tasks we need to solve in object detection problem?

Yolo use the same method as human to detect the object. There are three major steps: 1. is it an object? 2. what object is it? 3. where is the position and size of this object. BUT! Through CNN, YOLO can do these three things all together.

How YOLO solve this problem?

First, Let’s introduce Grid Cell in YOLO. The whole input image is divided into S \times S grid. Each grid cell predicts only one objects with fixed boundary boxes( say #B). For each boundary box has its own box confidence score to reflet the possibility of object. For each grid cell it predicts C conditional class probabilities( one per class). so that we will get
S\times S \times (B*5+C) predictions. Here 5 means central location(x,y), size( h,w) and confidence score of each boundary box.

Figure 2: the cell with red mark predicts two boundary boxes for a single object.

Then you will find so many boundary box from output. How we choose of them? Here we need to do Non-max suppression. The step is as blew:

  1. discard all boxes with box confidence less then a threshold. ( say 0.65)
  2. While there are any renaming box(overlapping):
    1. pick the box with the largest confidence that as a prediction
    2. discard any remaining boxes with IoU(intersection over union: you can see it as overlap size between two boundary box) greater than a threshold(say 0.5)
Figure 3: boundary boxes for each grid cell

After Non-max suppression, we need to calculate class confidence score , which equals to box confidence score * conditional class probability. Finally, we get the object with probability and its localization. (see Figure 1)

YOLO Network Design

Let’s see how YOLO v1 looks like. Input = 448*448 image, output = S\times S \times (B*5+C). There are 24 convolutional layers followed by 2 full connected layer for localization. It use sum-squared error between the predictions and ground truth to calculate loss which is consist of classification loss, localization loss and confidence loss.

Figure 4: YOLO v1 architecture

Classification loss:

Localization loss:

Confidence loss:

an object is detected in the box
an object is not detected in the box

The final loss add three components together.

YOLO V2

YOLO v2 improves accuracy compared with YOLO v1.

  1. Add batch normalization on all of the convolutional layers. It get more than 2% improvement in accuracy.
  2. High-resolution classifier. First fine tune the classification network at the full 448 \times 448 resolution for 10 epochs on IMageNet. This gives network time to adjust tis filters to work better on higher resolution input.
  3. Convolutional with Anchor Boxes. YOLO v1 can only predicts 98 boxes per images and it makes arbitrary guesses on the boundary boxes which leads to bad generalization, but with anchor boxes, YOLO v2 predicts more than a thousand. Then it use dimension cluster and direct location prediction to get the boundary box.
  4. Dimension Cluster. use K-mean to get the boundary boxes patterns. Figure 5 might be the most common boundary boxes in spec dataset.
Figure 5: 5 anchor boxes
Figure 6: remove the class prediction from the cell level to the boundary box level

5. Direct location prediction. Since we use anchor boxes, we have to predict on the offsets to these anchors.

Figure 6: Bounding boxes with dimension priors and location prediction

6. Multi-Scale Training. Every 10 batches, YOLOv2 randomly selects another image size to train the model. This acts as data augmentation and forces the network to predict well for different input image dimension and scale. 

YOLO v3

  1. Detection at three scales. YOLOv3 predicts boxes at 3 different scales. Then features are extracted from each scale by using a method similar to that of feature pyramid networks
  2. Bounding box predictions. YOLO v3 predicts the object score using logistic regression.
  3. Class prediction. Use independent logistic classifiers instead of softmax. This is done to make the classification multi-able classification.

Reference:

YOLOv1 : https://arxiv.org/abs/1506.02640

YOLOv2 : https://pjreddie.com/media/files/papers/YOLO9000.pdf

YOLOv3 : https://pjreddie.com/media/files/papers/YOLOv3.pdf

Machine Learning on Spark — How it works and why it doesn’t work well

Spark provides spark MLlib for machine learning in a scalable environment. MLlib includes three major parts: Transformer, Estimator and Pipeline. Essentially, transformer takes a dataframe as an input and returns a new data frame with more columns. Most featurization tasks are transformer. Estimator takes a dataframes as an input and returns a model(transformer), as we know the ML algorithms.. Pipeline combines transformer and estimator together.
Additionally, data frame becomes the primary API for MLlib. There is not any more new features for RDD based API in Spark MLib.

If you already understood or used high level machine learning or deep learning frameworks, like scikit-learning, keras, tersorflow, you will find everything is so familiar with. But when you use spark MLlib in practice, you still need third library’s help. I will talk about it in the end.

Basic Stats

# Corrlation
from pyspark.ml.stat import Correlation
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

# Summarizer
from pyspark.ml.stat import Summarizer
# compute statistics for multiple metrics without weight
df.select(summarizer.summary(df.features)).show(truncate=False)

# ChiSquare
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

Featurization

# TF-IDF
# stop word remove
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData)
# tokenize
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
# n grame
ngram = NGram(n=2, inputCol="wordsData", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
# word frequence
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# idf
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# word2vec
word2Vec = Word2Vec(vectorSize=N, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

# binarizer
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

# PCA
# reduce dimension to 3
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

# StringIndex
# encodes a string column of labels to a column of label of indices order by frequency or alphabet
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)

# OneHotEstimator
# we need to use StringIndex first if apply to categorical feature
encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
                                 outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)encoded = model.transform(df)

# Normalize & Scaler
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
l1NormData = normalizer.transform(dataFrame)
# standard scaler
# withMean=false: standard deviation, withMean=true: mean 
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
# maxmin scaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# max abs scaler
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# bin
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# QuantileDiscretizer
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

# ElementwiseProduct
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")

# SQL Transformer
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

# VectorAssembler
# combine vector together for future model inputs
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"], # the columns we need to combine
    outputCol="features") # output column

# Imputer
# handle missing value
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
imputer.setMissingValue(custom_value)

# slice vector
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

# ChiSqSelector
# use Chisqare to select the features
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

Clarification and Regression

# Linear regression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# logistic regression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) 
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial") # multinomial

# decision tree
# classification
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# regression
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Random forest
# classification
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# regeression
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# gradient-boosted 
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# preceptron
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# SVM
# Linear SVM, there is no kernel SVM like RBF
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Naive Bayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# KNN
knn = KNNClassifier().setTopTreeSize(training.count().toInt / 500).setK(10)

Clustering

# k-means
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# GMM
gmm = GaussianMixture().setK(2).setSeed(538009335)

Collaborative Filtering

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test datapredictions = model.transform(test)evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

Validation

# split train and test
train, test = data.randomSplit([0.9, 0.1], seed=12345)
# cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

You might be already found the problem. The ecosystem of Spark MLlib is not as rich as scikit learning, and it is lack of deep learning (of course, its name is machine learning). According to Databricks documents, We still have the solutions.

  • Use scikit learning on single node. Very simple solution. but since scikit leanring load the data still in memory. If the note is faster enough(driver), we can get a good performance as well.
  • To solve deep learning problem. we have two work around methods.
    • Apply keras, tensorflow on single node with GPU acceleration(Recommend by databricks).
    • Distribute Training. It might be slower than on the single node because of communication overhead. There are two frameworks used for distribute training. Horovod and Apache SystemML. I’ve never use Horovod, but you can find information here. As to SystemML, it is more like a wrapper for high level API and provide cluster optimizer which parses the code into spark RDD(live variable analysis, propagate stats, rewrite by matrix decomposition and runtime instruction). From the official website, we know it is much faster than MLLib and native R. The problem is it didn’t update anymore since 2017.
# Create and save a single-hidden-layer Keras model for binary classification# NOTE: In a typical workflow, we'd train the model before exporting it to disk,# but we skip that step here for brevity
model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "/tmp/simple-binary-classification"
model.save(model_path)

transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)

It seems no perfect solution for machine learning in spark, right? Don’t forget we have other time costing jobs: hyper parameters configuration and validation. We can run same model with different hyper parameters on different nodes using paramMap which similar to grid search or random search.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()
crossval = CrossValidator(estimator=pipeline,
                      estimatorParamMaps=paramGrid,
                      evaluator=RegressionEvaluator(),
                      numFolds=2)  # use 3+ folds in practice
cvModel = crossval.fit(training)

There might be someone saying: why we don’t use MPI? The answer is simple, too complex. Although it can gain the perfect performance, and you can do whatever you want even running distributed GPU + CPU codes, there are too many things we need to manually configuration on low level API without fail tolerance.

In conclusion, we can utilize spark for ELT and training/ validation model to maximize the performance(it did really well for these works). But until now, we still need third frameworks to help us do deep learning or machine learning tasks on single strong node.

Reference:

Spark MLlib: http://spark.apache.org/docs/latest/ml-guide.html

Databrick: https://docs.databricks.com/getting-started/index.html

Spark ML Tuning: http://spark.apache.org/docs/latest/ml-tuning.html

Harovod: https://github.com/horovod/horovod

Apache SystemML: https://systemml.apache.org/docs/1.1.0/beginners-guide-keras2dml

How to use Dataframe in pySpark (compared with SQL)

-- version 1.0: initial @20190428
-- version 1.1: add image processing, broadcast and accumulator
-- version 1.2: add ambiguous column handle, maptype

When we implement spark, there are two ways to manipulate data: RDD and Dataframe. I don’t know why in most of books, they start with RDD rather than Dataframe. Since RDD is more OOP and functional structure, it is not very friendly to the people like SQL, pandas or R. Then Dataframe comes, it looks like a star in the dark. The advantage of using Dataframe can be listed as follows:

  • Static-typing and runtime type-safety. We can know syntax error in compile time, saves developer lots of time.
  • High-level abstraction and tell what to do rather than how to do. If you ever touched pandas, well you will find they are almost same thing.
  • High performance. Yes. Dataframe is not only simple but also much faster than using RDD directly, As the optimization work has been done in the catalyst which generates an optimized logical and physical query plan.

For more information, we can find in this article.

After know why we need to use dataframe, let’s us see how to use it to handle daily work. To make it easier, I will compare dataframe operation with SQL.

Initializing Spark Session

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("example project") \
        .config("spark.some.config.option", "some-value") \ # set paramaters for spark
        .getOrCreate()

Create DataFrames

## From RDDs
>>> from pyspark.sql.types import *
# Infer Schema
>>> sc = spark.sparkContext
>>> lines = sc.textFile("people.txt")
>>> parts = lines.map(lambda l: l.split(","))
>>> people = parts.map(lambda p: Row(name=p[0],age=int(p[1])))
>>> peopledf = spark.createDataFrame(people)
# Specify Schema
>>> people = parts.map(lambda p: Row(name=p[0],
age=int(p[1].strip())))
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)
>>> spark.createDataFrame(people, schema).show()

## From Spark Data Source
# JSON
>>> df = spark.read.json("customer.json")

# Use Maptype to read dynamic columns from JSON
customSchema = StructType([
                StructField("col1", StringType(),True),
                                StructField("event", MapType(StringType(),StringType()))])
spark.read.schema(customSchema).jason(path)

# Parquet files
>>> df3 = spark.read.load("users.parquet")
# TXT files
>>> df4 = spark.read.text("people.txt")
# CSV files
>>> df5 = spark.read.format("csv").option("header", true).option("inferSchema", true).load("csvfile.csv")
# MS SQL
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
# or we can use
# collection = spark.read.sqlDB(config)
display(df)

Dataframe Manipulation

from pyspark.sql import functions as F
# select & where
df.select("column1","column2", explod("phonenumber").alias("contactInfo"), df['age']>24).show()

# join
df = dfa.join(dfb, dfa.id==dfb.id & dfa.name == dfb.name, how ='left') 
df = dfa.join(dfb, dfa.id==dfb.id | dfa.name == dfb.name , how ='right')
df = dfa.join(dfb, dfa.id==dfb.id, how ='full')
df = dfa.join(dfb, dfa.id==dfb.id)
df = dfa.crossjoin(dfb)

# distinct count
from pyspark.sql.functions import countDistinct
df = df.groupby('col1','col2').agg(countDistinct("col3").alias("others"))

# ambiguous column handle
# both date and endpoint_id exist in two dataframes
df_result = df_result.join(df_result2, ["date","endpoint_id"],how="left") 

# exits and not exits
new_df = df.join(
    spark.table("target"),
    how='left_semi',
    on='id')

new_df = df.join(
    spark.table("target"),
    how='left_anti',
    on='id')

# when
df.select("first name", F.when(df.age>30,1).otherwise(0))

# like
df.select("firstName", df.lastName.like("Smith"))

# startwith-endwith
df.select("firstName", df.lastName.like("Smith"))

# substring
df.select(df.firstName.substr(1,3).alias("name"))

# between
df.select(df.age.between(22,24))

# add columns
df = df.withColumn('city',df.address.city) \
    .withColumn('postalCode',df.address.postalCode) \
    .withColumn('state',df.address.state) \
    .withColumn('streetAddress',df.address.streetAddress) \
    .withColumn('telePhoneNumber',
    explode(df.phoneNumber.number)) \
    .withColumn('telePhoneType',
    explode(df.phoneNumber.type))

# update column name
df = df.withColumnRenamed('prename','aftername')

# removing column
df = df.drop("ColumnName1","columnname2")

# group by
df.groupby("groupbycolumn").agg({"salary": "avg", "age": "max"})

# filter
df.filter(df["age"]>24).show()

# Sort
df.sort("age",ascending=False).collect()

# Missing & Replace values
df.na.fill(value)
df.na.drop()
df.na.replace(value1,value2)
df["age"].na.fill(value)

# repartitioning
df.repartition(10)\ df with 10 partitions
    .rdd \
    .getNumPartitions()
df.coalesce(1).rdd.getNumPartitions()

# union and unionAll
df.union(df2)

# windows function
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
windowSpec = \
  Window
    .partitionBy(df['category']) \
    .orderBy(df['revenue'].desc()) \
    .rangeBetween(-3,3) # or rowframe:  .rowBetween(Window.unboundedPreceding, Window.currentRow)
dataFrame = sqlContext.table("productRevenue")
revenue_difference = \
  (func.max(dataFrame['revenue']).over(windowSpec) - dataFrame['revenue'])
dataFrame.select(
  dataFrame['product'],
  dataFrame['category'],
  dataFrame['revenue'],
  revenue_difference.alias("revenue_difference"))

from pyspark.sql.functions import percentRank, ntile
df.select(
    "k", "v",
    percentRank().over(windowSpec).alias("percent_rank"),
    ntile(3).over(windowSpec).alias("ntile3"))

# pivot & unpivot
df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(count("ship"))
    .show())

df.selectExpr(df_data.id, df_data.type, "stack(3, '2010', 2010, '2011', 2011, '2012', 2012) as (date, shipNumber)").where("shipNumber is not null").show()

# Remove Duplicate
df.dropDuplicates()

Running SQL queries

# registering Dataframe as vies
>>> peopledf.createGlobalTempView("people")
>>> df.createTempView("customer")
>>> df.createOrReplaceTempView("customer")

# Query view
>>> df5 = spark.sql("SELECT * FROM customer").show()
>>> peopledf2 = spark.sql("SELECT * FROM global_temp.people")\
.show()
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()

Output

# Data convert
rdd = df.rdd
df.toJSON().first()
df.toPandas()

# write and save
df.select("columnname").write.save("filename",format="jason")

Check data

>>> df.dtypes Return df column names and data types
>>> df.show() Display the content of df
>>> df.head() Return first n rows
>>> df.first() Return first row
>>> df.take(2) Return the first n rows
>>> df.schema Return the schema of df
>>> df.describe().show() Compute summary statistics
>>> df.columns Return the columns of df
>>> df.count() Count the number of rows in df
>>> df.distinct().count() Count the number of distinct rows in df
>>> df.printSchema() Print the schema of df
>>> df.explain() Print the (logical and physical) plans

Image Processing

# spark 2.3 provoid the ImageSchema.readImages API
image_df = spark.read.format("image").option("dropInvalid", true).load("/path/to/images")
# the structure of output dataframe is like
image: struct containing all the image data
 |    |-- origin: string representing the source URI
 |    |-- height: integer, image height in pixels
 |    |-- width: integer, image width in pixels
 |    |-- nChannels: integer, number of color channels
 |    |-- mode: integer, OpenCV type
 |    |-- data: binary, the actual image
# Then we can use sparkML to build and train the model, blew is a sample crop and resize process
from mmlspark import ImageTransformer
tr = (ImageTransformer() # images are resized and then cropped
    .setOutputCol(“transformed”)
    .resize(height = 200, width = 200)
    .crop(0, 0, height = 180, width = 180) )

smallImages = tr.transform(images_df).select(“transformed”)

Broadcast and Accumulator

# Broadcast is a read-only variable to reduce data transfer, mostly we use it for "lookup" operation. In Azure data warehouse, there is a similar structure named "Replicate".
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast
 
sqlContext = SQLContext(sc)
df_tiny = sqlContext.sql('select * from tiny_table')
df_large = sqlContext.sql('select * from massive_table')
df3 = df_large.join(broadcast(df_tiny), df_large.some_sort_of_key == df_tiny.key)

# Accumulator is a write-only(except spark driver) structure to aggregate information across executor. We can understand it as a global variable, but write-only.
from pyspark import SparkContext 

sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(1) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([2,3,4,5]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)

Someone might ask ADF dataflow can do almost same thing, is there any difference? In my understanding till now, NO. ADF dataflow need to translate to spark SQL which is the same engine with dataframe. If you like coding and familiar with python and pandas, or you want to do some data exploration/data science tasks, choose dataframe, if you like GUI similar to SSIS to do something like ELT tasks, choose ADF dataflow.

Reference:

A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets, https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

PySpark Cheat Sheet: Spark DataFrames in Python, https://www.datacamp.com/community/blog/pyspark-sql-cheat-sheet

pyspark.sql module, http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

Spark Overview, http://spark.apache.org/docs/latest/

ADF tends to replace SSIS

Essentially, ADF(Azure Data Factory) only takes responsibility of Extraction and Load in the ELT pipeline. Then we have to use another tool, like databrick to write jupyter notebook to manipulate dataframe or RDD to complete transform activities. However, as Microsoft launched data “Data Flow” in ADF, it becomes more and more similar to SSIS. Most of ETL work can be done in ADF with nicely and friendly GUI. For ETL stuff, I copied a cheat sheet comparing the dataflows between ADF and SSIS. It maybe helpful.

Surprisingly, ADF didn’t provide SCD(slowing changing Dimension). You have to manually work on it. It won’t be hard although. Just set up some start and end time. Here is the article about it.

Anyway, for me. I like this way to complete some simple but repeatable actives with GUI, and let some complex operations in databrick.