Data Factory CI/CD in Azure DevOps

Azure Pipelines consists of two parts: the build pipeline and the release pipeline. They implement CI and CD respectively.

  1. Build Pipeline – to build and test the code. The build creates an artifact that’s used by the rest of your pipeline to run tasks such as deploying to staging or production. 
  2. Release Pipeline – once the code is updated, built and packaged, it can be deployed to target services using Release Pipelines. 
(Figure: the top half is the CI pipeline; the bottom half is the CD pipeline.)

Let’s talk about how to implement this process for Azure Data Factory. Before you start, I assume your current ADF setup already meets these requirements (if not, please refer to the references at the end of this section):

  • You already have an Azure Repos repository.
  • ADF is integrated with this Git repository.
  • A Key Vault, which is used to store database and data lake connection information, plus all configuration parameters for the release.

The whole ADF CI/CD pipeline is like this:

Building Pipeline (CI)

  • Create a new pipeline and choose “adf_publish” as the default branch.
    • The “adf_publish” branch is created automatically by ADF after you click “Publish” in the ADF GUI.
(Screenshot: the three JSON files used to set the parameters for the different environments.)
  • Add a new agent job.
  • Search for “Data factory”, then add a new “Publish artifacts” task.
    • Set the path to publish (the Git path).
    • Set the artifact publish location to “Azure Pipelines”.

Release Pipeline (CD)

(Figure: the left part is the CI pipeline; the right part is the CD pipeline.)

So the next step is deploying the artifact into three environments: DEV, QA and PROD.

(Screenshot: using Key Vault to set the SQL Server connection in the pipeline.)
  • Import the Azure Key Vault where you stored all the connection information. We will use this information in two areas:
    • Release pipeline: to get the basic information for each environment, e.g. database name, data lake name, etc. (see the screenshot above).
    • Template parameters used in adf_publish.
  • Set up the Data Factory deployment.
    • Add an “Azure Resource Group Deployment” task, where we need to focus on the template path.
(Screenshot: the ARM template and parameters paths point to the adf_publish branch.)
  • How to set up adf_publish:
    • Go back to Azure Repos.
    • Switch to the adf_publish branch.
    • Create a corresponding ARM parameter template for each environment. You can find an example below.
(The three JSON files are used to set the parameters for the different environments; here is one of them.)
{
	"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
	"contentVersion": "1.0.0.0",
	"parameters": {
		"factoryName": {
			"value": "datafactory name"
		},
		"LS_AKV_KeyVault_properties_typeProperties_baseUrl": {
			"value": "https://XXXXkeyvault.vault.azure.net/"
		},
		"LS_SQL_ConfigDb_properties_typeProperties_connectionString_secretName": {
			"value": "sqlconnection"
		},
		"LS_DLS_Datalake_properties_typeProperties_url": {
			"value": "https://XXXXXdatalake.dfs.core.windows.net/"
		}
	}
}

If we need to enable the continuous deployment trigger, we have to link it to the adf_publish branch as well.

(Screenshots: click the trigger icon on the release pipeline page and set the branch to adf_publish.)

Reference:

Azure DevOps Pipelines. https://docs.microsoft.com/en-us/azure/devops/pipelines/?view=azure-devops

Continuous integration and delivery in Azure Data Factory. https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment

Use Azure Key Vault to pass secure parameter value during deployment. https://docs.microsoft.com/en-us/azure/azure-resource-manager/templates/key-vault-parameter?tabs=azure-cli

Apache Kafka in Practice – 1

First things first, I should remind all visitors that I am not a Kafka master. I am just a beginner learning through the official Apache Kafka website and some free Udemy classes. There may be some mistakes; I will fix them once I find them.

I call this “Kafka in Practice” because I want to introduce some basic skills, such as launching a Kafka service, creating producers and consumers, UI tools, and the basic Python library.

Install and Launch Kafka

Kafka depends on Java and ZooKeeper (which is used to manage the cluster). I recommend using Linux or WSL on Windows.

sudo apt update
sudo apt install openjdk-8-jdk

# test 
java -version

Download kafka, and unzip it

# download from website
wget <URL of kafka.tgz>

# unzip
tar -xvf kafka.****.tgz
cd <kafka_folder>

# test, run following
bin/kafka-topics.sh

Add the Kafka commands to PATH (optional)

# you need to restart the shell after these steps
nano ~/.bashrc
# add this line to the end of .bashrc:
PATH="$PATH:/home/<your name>/<kafka folder>/bin"

# check from any folder
kafka-topics.sh

Start Zookeeper

# change the zookeeper data directory
mkdir data
mkdir data/zookeeper
nano config/zookeeper.properties
# add the line below to zookeeper.properties:
dataDir=/home/name/data/zookeeper

# run zookeeper:
zookeeper-server-start.sh config/zookeeper.properties

Start a Kafka broker

mkdir data/kafka
nano config/server.properties

# modify server.properties to change log folder:
log.dirs=/home/name/data/kafka

# start kafka
kafka-server-start.sh config/server.properties

Right now you should have ZooKeeper and a broker running. Next, let’s do some operations with topics, producers and consumers.

Topic operations

# create a topic
# you cannot have a replication factor greater than the number of available brokers
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --replication-factor 1


# list all topics
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe


# delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete

# recovery, in case the broker fails to start after deleting a topic:
# 1. delete the topic folder under the broker log directory (see log.dirs in server.properties)
# 2. start zookeeper
# 3. enter the zookeeper shell: zookeeper-shell.sh host:port
#    3.1 list the topics: ls /brokers/topics
#    3.2 remove the topic: rmr /brokers/topics/yourtopic
# 4. restart the kafka server

Create a console producer

# console producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

# add some properties
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all

# if you produce to a topic that does not exist, kafka creates it with one partition and a replication factor of one by default; this can be changed via num.partitions and default.replication.factor in config/server.properties

Create a console consumer and Set consumer groups

# read from now on
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
# read all messages
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

# set consumers in a group
# messages sent to this topic will be split among the consumers in the group
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group group1

# consumer groups
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group group1
# Lag: shows how many messages have not been consumed yet
# reset offsets ---> offsets are decided by the consumer
# --to-earliest / --shift-by n [offsets]
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --group group1 --reset-offsets --to-earliest --execute  --topic first_topic
# when a consumer joins or leaves, a rebalance happens

Add Keys to producer

# keys
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,

UI Tools

Kafka Tool is an easy-to-use UI tool for managing topics, brokers and consumers. You can install it on Linux or Windows. If you use WSL on Windows, the ZooKeeper port is also reachable from Windows.

# how to install on Linux
wget http://www.kafkatool.com/download2/kafkatool.sh
chmod +x kafkatool.sh
# run the installer script, then after installation:
cd ~/kafkatool2/
./kafkatool

Developing in Python

kafka-python provides the common client functions for Kafka. You can find more information in the references below.

Consumer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# basic
from kafka import KafkaConsumer

consumer = KafkaConsumer('first_topic',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)

# other key parameters:
# group_id, key_deserializer, value_deserializer, auto_offset_reset ('earliest', 'latest', None)

# assign and seek offsets (partitions is a list of TopicPartition objects)
consumer.assign(partitions)
consumer.seek(partition, offset)        # manually specify the fetch offset for a single TopicPartition
consumer.assignment()                   # read the assigned partitions
consumer.beginning_offsets(partitions)  # earliest available offset per partition
consumer.end_offsets(partitions)        # latest offset per partition

Producer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# basic
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],acks = 1)

for _ in range(10):
    future = producer.send('test-topic', b'some_message_bytes')
    # or add a key: records with the same key always go to the same partition, preserving their order
    future = producer.send('test-topic', key=b'key', value=b'some_message_bytes')
    # result = future.get(timeout=60)  # blocking on every send kills throughput
    # print(result)
# producer.flush()  # block until all pending messages are at least put on the network
producer.close()  # make sure to close() your producer before shutting down your application

Serialization and Deserialization

  • Serialization: the process of converting an object into a stream of bytes for the purpose of transmission
  • Deserialization: the opposite of Serialization
# Serialize json messages
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})

# Deserialize msgpack-encoded values
import msgpack
consumer = KafkaConsumer(value_deserializer=msgpack.loads)  # same idea for key_deserializer
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
     assert isinstance(msg.value, dict)

# Serialize string keys
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')

# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
     producer.send('foobar', b'msg %d' % i)

Thread

The producer is thread safe, but the consumer is not; it is recommended to use multiprocessing instead, with one consumer per process (see the sketch below).
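
As a minimal sketch of that approach (my own example, not from the course), each process runs its own KafkaConsumer in the same consumer group, so the topic's partitions are split between them. It assumes the local broker and the first_topic topic created earlier:

from multiprocessing import Process
from kafka import KafkaConsumer

def consume(worker_id):
    # one consumer per process; all consumers share the same group,
    # so Kafka assigns each partition to exactly one of them
    consumer = KafkaConsumer(
        'first_topic',
        bootstrap_servers=['localhost:9092'],
        group_id='group1',
        auto_offset_reset='earliest',
    )
    for msg in consumer:
        print(worker_id, msg.partition, msg.offset, msg.value)

if __name__ == '__main__':
    workers = [Process(target=consume, args=(i,)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()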

Client Compatibility

Always use the latest client library version: since Kafka 0.10.2, clients and brokers are bidirectionally compatible, so an older or newer client can talk to any broker.

Reference:

kafka-python: https://github.com/dpkp/kafka-python

Apache Kafka Series – Learn Apache Kafka for Beginners v2

Apache Kafka Website, https://kafka.apache.org/

Apache Kafka Concepts and Theory.

It’s a little bit late to talk about Kafka, since this technology has been widely used for a long time. These days I finally have time to learn it and summarize the major concepts. In this nutshell, I will split the page into three parts: why we need it, basic concepts, and how it works.

Why do we need Kafka?

(Figure: system integration before Kafka.)

As more and more systems are merged into one big network structure, and each system has its own protocols and communication methods, we spend more and more time maintaining an increasingly complex structure. In the end, the cost rises as well.

(Figure: Kafka as a central messaging bus.)

Kafka provides a central messaging bus that is distributed, resilient and fault tolerant. Each source and target connects to the Kafka cluster as a producer or consumer, and they all use the same protocol to send and receive messages (key, value and timestamp). We can see it as a hybrid solution combining a messaging system with distributed retention.

Concepts

Producers publish data into Kafka; they are the source of the data, which could be sensors, laptops, IoT devices, logs, etc.

  • Producers choose which records to assign to which partition within the topic (a topic is a stream of data, like a table in a database; a partition is a part of a topic; we will talk about them soon).
  • Producers use round-robin or a hash function on the key to choose the partition, to achieve load balancing across the nodes (brokers) in the cluster. A sketch of the idea follows this list.
    • targetPartition = utils.abs(utils.murmur2(record.key())) % numPartitions
    • i.e. the murmur2 hashing algorithm
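
To illustrate the idea (this is only a sketch: Kafka's real default partitioner hashes the key bytes with murmur2, while here Python's hashlib stands in for it), a record with a given key always lands on the same partition:

import hashlib

def target_partition(key: bytes, num_partitions: int) -> int:
    # hash the key, then take the result modulo the partition count
    digest = int.from_bytes(hashlib.md5(key).digest()[:4], 'big')
    return digest % num_partitions

# the same key always maps to the same partition
print(target_partition(b'sensor-42', 3))
print(target_partition(b'sensor-42', 3))  # identical result every time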

Brokers are the nodes/servers in the cluster. They are key to Kafka's fault tolerance and high performance.

  • each broker is identified by an ID
  • each broker contains certain topic partitions
  • connecting to any broker = connecting to the entire cluster
  • typically 3 or more brokers are used to achieve redundancy
  • messages in a topic are spread across partitions on different brokers, and each partition can be replicated across multiple brokers

Consumers subscribe to messages from the Kafka cluster.

  • a consumer can be a single receiver or a distributed cluster called a consumer group
  • a consumer group can read from a topic in parallel

Topics are particular streams of data, identified by name. Each topic is split into partitions, and we can set the partition count and replication factor for each topic. A partition is an ordered, immutable sequence of records (messages), each identified by an id called its offset.

  • offset order is only guaranteed within one partition
  • data is kept only for a limited time (the default is one week)
  • partitions are immutable
  • a consumer can choose any offset as its starting point, so each consumer does not impact the others

Connectors are used to connect topics with apps/databases. A source connector (with the help of source tasks) is responsible for getting data into Kafka, while a sink connector (with the help of sink tasks) is responsible for getting data out of Kafka.

  • stream an entire SQL database into Kafka
  • stream Kafka topics into HDFS
  • it is recommended to leverage the built-in connectors

How does it work?

Producer to Kafka

  • The producer chooses the topic and partition to send each inbound message to.
  • A topic is split into several partitions. Each partition has one broker as the leader and zero or more brokers as followers, called ISRs (in-sync replicas). Only the leader can receive and serve data for a partition; the other brokers sync the data from the leader.
  • A topic with replication factor N can tolerate up to N-1 server failures.

Kafka to consumers

  • As mentioned above, consumers can be single nodes or consumer groups.
  • If the number of consumers is greater than the number of partitions, some consumers will be idle.
  • If the number of consumers is less than the number of partitions, some consumers will receive messages from multiple partitions.
  • If the number of consumers equals the number of partitions, each consumer reads messages in order from exactly one partition.

Data consistency and availability

  • Partition
    • Messages sent to a topic partition will be appended to the commit log in the order they are sent
    • a single consumer instance will see messages in the order they appear in the log (message order only guaranteed in a partition)
    • a message is ‘committed’ when all in sync replicas have applied it to their log
    • any committed message will not be lost, as long as at least one in sync replica is alive.
  • Producer options
    • wait for all in sync replicas to acknowledge the message
    • wait for only the leader to acknowledge the message
    • do not wait for acknowledgement
  • Consumer options (see the kafka-python sketch after this list)
    • receive each message at most once
      • restart from the next offset without ever having processed the message
      • potential message loss
    • receive each message at least once
      • restart and process the message again; duplicate messages may appear downstream
      • no data loss
      • (recommended, as long as the downstream handles duplicate messages)
    • receive each message exactly once
      • transactional level
      • re-read the last committed transaction
      • no data loss and no data duplication
      • using transactions significantly decreases throughput
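
To make the at-least-once option concrete, here is a minimal kafka-python sketch (my own illustration, assuming the local broker and the first_topic topic from the previous post): auto-commit is disabled and the offset is committed only after the message has been processed, so a crash before the commit means the message is re-read instead of lost.

from kafka import KafkaConsumer

def process(value):
    # placeholder for the real processing work
    print(value)

consumer = KafkaConsumer(
    'first_topic',
    bootstrap_servers=['localhost:9092'],
    group_id='group1',
    enable_auto_commit=False,   # commit offsets manually, only after processing
)

for msg in consumer:
    process(msg.value)
    consumer.commit()  # a crash before this line means the message is redelivered: at least once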

Architecture

(Figures: the Kafka message structure, the fundamental data flow, and Kafka in Azure through HDInsight.)

Reference

Apache Kafka official website. https://kafka.apache.org/intro.

Kafka in a Nutshell. https://sookocheff.com/post/kafka/kafka-in-a-nutshell/

Apache Kafka Series – Learn Apache Kafka for Beginners v2, https://tpl.udemy.com/course/apache-kafka/learn/lecture/11566878?start=180#overview

What is Apache Kafka in Azure HDInsight, https://docs.microsoft.com/en-us/azure/hdinsight/kafka/apache-kafka-introduction

Our new cloud architecture launched!

After much discussion, evaluation and testing, we finally launched a basic architecture on the Azure cloud. I hid some keywords that explain the underlying business flows, but basically it works for all similar scenarios.

We tried to use Tableau to read data from the data lake directly, but neither Spark SQL nor the native Databricks JDBC driver is stable for large data sizes (over 10,000,000 rows). So we used an RDBMS instead. However, if you already use Power BI, we tried it and you can extract data from the data lake directly without any problem.

Another thing is standards. Since we have lots of pipelines developed by one team, we use Data Factory to standardize our components. But you can totally take a code-only approach in Databricks.

Git is very helpful for version control, and both ADF and Databricks provide a GUI and API to connect to Git.

One problem we have not solved yet is the storage lifecycle, which works fine in Blob Storage but does not seem ready in Data Lake Gen2 yet. I think Microsoft will fix it soon.

Set up MySQL on Azure Ubuntu and compare with Azure SQL

I will combine three parts: creating an Ubuntu VM and attaching a data disk, installing and configuring MySQL, and a performance comparison with Azure SQL.

Create Ubuntu VM

  • Choose your VM size. Here I used D4s_v3, which has 4 cores and 16 GB of memory. You also need to choose a disk for storage and set the initial admin password; I recommend a premium SSD.
  • Open SSH access. Go to Networking and add SSH port 22 to your inbound port rules. Later we will add the MySQL port 3306 as well.
  • Mount the data disk. Remember the storage disk you chose in step 1? It will not be mounted automatically, so you need to do the following steps over SSH.
dmesg | grep SCSI
sudo fdisk /dev/sdc
---------------------------------------
Warning: invalid flag 0x0000 of partition table 4 will be corrected by w(rite)

Command (m for help): n
Partition type:
   p   primary (0 primary, 0 extended, 4 free)
   e   extended
Select (default p): p
Partition number (1-4, default 1): 1
First sector (2048-10485759, default 2048):
Using default value 2048
Last sector, +sectors or +size{K,M,G} (2048-10485759, default 10485759):
Using default value 10485759
Command (m for help): p

Disk /dev/sdc: 5368 MB, 5368709120 bytes
255 heads, 63 sectors/track, 652 cylinders, total 10485760 sectors
Units = sectors of 1 * 512 = 512 bytes
Sector size (logical/physical): 512 bytes / 512 bytes
I/O size (minimum/optimal): 512 bytes / 512 bytes
Disk identifier: 0x2a59b123

   Device Boot      Start         End      Blocks   Id  System
/dev/sdc1            2048    10485759     5241856   83  Linux

Command (m for help): w
The partition table has been altered!

Calling ioctl() to re-read partition table.
Syncing disks.
------------------------------------------------
sudo mkfs -t ext4 /dev/sdc1
sudo mkdir /datadrive
sudo mount /dev/sdc1 /datadrive
------------------------------------------------
# if you want to automount the disk, you need to edit the fstab file
# retrieve the UUID
ls -al /dev/disk/by-uuid/
# edit fstab and add the line below (replace <ID> with the UUID of /dev/sdc1)
sudo nano /etc/fstab
UUID=<ID> /datadrive auto defaults 0 0

After these steps, you have finished the Ubuntu configuration. You can check the mounted disk with the df -H command.

Install and configure MySQL

  • Install MySQL.
sudo apt-get update
sudo apt-get install mysql-server
  • Allow remote access
sudo ufw enable
sudo ufw allow mysql

Then edit “/etc/mysql/mysql.conf.d/mysqld.cnf” and change bind-address to 0.0.0.0, which allows all IPs to connect to MySQL remotely.

nano /etc/mysql/mysql.conf.d/mysqld.cnf
  • Start MySQL
sudo systemctl start mysql
  • Add a new root user. You can replace % below with a specific IP address to restrict access.
CREATE USER '<username>'@'%' IDENTIFIED BY '<user password>';
  • Change the data directory. By default the VM only provides 30 GB, so you have to use your extra disk to store the database.
# stop service
sudo systemctl stop mysql
# sync to new path
sudo rsync -av /var/lib/mysql /datadrive
# backup 
sudo mv /var/lib/mysql /var/lib/mysql.bak
# change configure files
sudo nano /etc/mysql/mysql.conf.d/mysqld.cnf
-----------------------
datadir=/datadrive/mysql
--------------------
# configure AppArmor Access Control
sudo nano /etc/apparmor.d/tunables/alias
-----------------------
alias /var/lib/mysql/ -> /datadrive/mysql/,
----------------------
sudo systemctl restart apparmor
# dummy file
sudo mkdir /var/lib/mysql/mysql -p
# restart service
sudo systemctl start mysql

Then you can run SELECT @@datadir to check the data directory, as sketched below.
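
If you prefer to verify it remotely, a quick Python check could look like this sketch (assuming the pymysql package is installed; the host, user and password are placeholders for the VM and the user created above):

import pymysql

conn = pymysql.connect(host='<vm public ip>', user='<username>', password='<user password>')
try:
    with conn.cursor() as cur:
        cur.execute("SELECT @@datadir")
        print(cur.fetchone()[0])  # expect /datadrive/mysql/
finally:
    conn.close()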

Performance comparison with Azure SQL

Benchmark on 40,000,000 rows:

                          AzureSQL   Ubuntu + MySQL
Write (from Databricks)   25 mins    31 mins
Read (to Tableau)         44 mins    12 mins

What a surprise: MySQL on the VM reads much faster than Azure SQL (though writes are slightly slower).

Tips: How to write data into AzureSQL and mySQL through Databricks.

To SQL server:

# you have to load com.microsoft.azure:azure-sqldb-spark:1.0.2 into library first
%scala
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._


val config = Config(Map(
  "url"          -> "<accountname>.database.windows.net",
  "databaseName" -> "<dbname>",
  "dbTable"      -> "<tablename>",
  "user"         -> "<admin name>",
  "password"     -> "<password name>"
))

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Overwrite).sqlDB(config)

To mySQL:

%scala
val jdbcHostname = "<mysql address>"
val jdbcPort = 3306
val jdbcDatabase = "<dbname>"
val jdbcUsername = "<user name>"
val jdbcPassword ="<password>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

import org.apache.spark.sql.SaveMode


df.write
  .mode(SaveMode.Overwrite) // <--- overwrite the existing table (use SaveMode.Append to append)
  .jdbc(jdbcUrl, "<table name>", connectionProperties)
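
If you work in Python notebooks instead of Scala, a rough PySpark equivalent of the MySQL write might look like this (same placeholders as above; the MySQL JDBC driver must be available on the cluster):

# PySpark sketch of the same JDBC write; placeholders as in the Scala version
jdbc_url = "jdbc:mysql://<mysql address>:3306/<dbname>"

(df.write
   .format("jdbc")
   .option("url", jdbc_url)
   .option("dbtable", "<table name>")
   .option("user", "<user name>")
   .option("password", "<password>")
   .mode("overwrite")
   .save())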

Tableau 2019.3 starts to support Databricks

Since version 2019.3, Tableau supports Databricks with a native connector, so we no longer have to use some weird third-party product or custom API to connect them together. Here are the steps to use it.

  • Download the new version of Tableau through the pre-release page (it will reach the official release soon).
  • Download the supported Simba ODBC driver and install it.
  • After these two steps, you will find the Databricks option in your Tableau connection list.
(Screenshot: selecting the Databricks connector.)

The next step is figuring out the authorization information for Databricks.

  • Login Databricks.
  • Go to Clusters > <your cluster name> > Advanced Options > JDBC/ODBC

Here you can find the Server Hostname and HTTP Path; copy them to the Tableau authorization page.

For the username and password, you have to use a token.

  • Find the little logo in the upper right corner, click it, and choose User Settings.
  • Click “Generate New Token”. A popup window appears; save the generated code, which is your PASSWORD.
  • Enter “token” as the username and the code you just got as the password. Then you can see the tables you created in Databricks.

Saving tables in Databricks is also simple:

df_test = spark.read.format("parquet").load("/mnt/aclaradls01/test/dcu_health.parquet")
df_test.write.mode('overwrite').saveAsTable("df_test") 

Tips:

We can also use Spark SQL to connect to Databricks, and even use Databricks as an API layer on top of the data lake; it is pretty much like a cluster running Spark. Here are the steps:

  1. Create a view in Databricks which links to the mounted data lake path, e.g. spark.sql("create view df_test_view as SELECT * FROM parquet.`/mnt/<mounted_name>/<folder_name>/`") (see the snippet after this list).
  2. Add “spark.hadoop.hive.server2.enable.doAs false” to your cluster Spark config.
  3. If you try to load large data that takes a long time, also add spark.executor.heartbeatInterval 10s and spark.network.timeout 9999s to your cluster Spark config.
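
For readability, here is step 1 from the list above written out as a notebook cell (the mounted path is a placeholder; replace it with your own mount):

spark.sql("""
    CREATE VIEW df_test_view AS
    SELECT * FROM parquet.`/mnt/<mounted_name>/<folder_name>/`
""")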

Reference:

Tableau with databricks. https://docs.databricks.com/user-guide/bi/tableau.html

How to connect parquet files from Azure through Apache Drill

The Parquet format is a very good choice for big data: it is fast, compact and distributed. But not all software supports this format, so we have to use Apache Drill as an intermediate layer to connect the source and the target.

(Figure: the Drill query flow.)

Today we are going to take Tableau as an example and explain how to read Parquet files from Azure Blob Storage into a Tableau dashboard.

  • Install the Java JDK.
  • Install 7-Zip.
  • Set up the Windows environment.
    1. Add JAVA_HOME to your environment variables and set the value to ‘C:\progra~1\Java\jdk1.8.0_221‘. Please change the path if you have a different version.
    2. Add %JAVA_HOME%\bin to the Path variable.
  • Create the UDF directories manually in a command prompt.
    mkdir "%userprofile%\drill"
    mkdir "%userprofile%\drill\udf"
    mkdir "%userprofile%\drill\udf\registry"
    mkdir "%userprofile%\drill\udf\tmp"
    mkdir "%userprofile%\drill\udf\staging"
    takeown /R /F "%userprofile%\drill"
  • Download and unzip Apache Drill into your install folder.
  • Use cmd to start Drill:
cd  \apache-drill-1.16.0\bin
drill-embedded.bat

Now you can see that Apache Drill is running. If you have data files on your local machine, you can simply retrieve data by running SQL such as select * from dfs.`<file path>`. (The sketch below shows the same kind of query issued through Drill’s REST API.)
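
Besides the embedded shell, Drill also exposes a REST endpoint on the WebUI port, so the same query can be run from Python; a small sketch (the file path is a placeholder, and the requests package is assumed):

import requests

resp = requests.post(
    "http://localhost:8047/query.json",
    json={"queryType": "SQL", "query": "select * from dfs.`C:/data/sample.parquet` limit 10"},
)
for row in resp.json().get("rows", []):
    print(row)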

  • Download the supporting JARs for Azure and put them into the folder ‘apache-drill-1.16.0\jars\3rdparty’:
    1. hadoop-azure-2.7.7.jar
    2. azure-storage-8.0.0.jar
  • Configure the connection information.
    • Go to ‘apache-drill-1.16.0\conf’, create a copy of core-site-example.xml in the same folder, and rename it to core-site.xml. Modify the contents as below, remembering to replace STORAGE_ACCOUNT_NAME and AUTHENTICATION_KEY with the values for your Azure Blob Storage account:
<property>
  <name>fs.azure.account.key.STORAGE_ACCOUNT_NAME.blob.core.windows.net</name>
   <value>AUTHENTICATION_KEY</value>
</property>
  • Configure the connection information (cont’d).
    • Go to ‘http://localhost:8047/storage‘ to open the WebUI. Create a new plugin called ‘AZure’ with the same contents as the cp plugin; change the connection as below and set config to null, since we already set it up in core-site.xml:
 "type": "file",
 "connection":"wasbs://container_name@STORAGE_ACCOUNT_NAME.blob.core.windows.net",
    "config": null,
  • Try to read the Parquet files into Tableau.
    1. Open Tableau and choose the Apache Drill connection. Enter localhost for Server and click Sign In.
    2. Create a new custom SQL: select * from `az.default`.`parquet file name`.
  • Create a view.
    1. Install the Drill JDBC driver.
    2. Go to Start and find Drill Explorer.
    3. Connect to the server, choose the SQL tab, write your SQL there, then click Create As View. Now you can see the view under dfs.temp.
    4. Tableau can read this view at any time.

Plus: currently Drill does not support Data Lake Gen2, since it only supports the Blob Storage API, which has no hierarchical namespace.

How to build a data pipeline in Databricks

For a long time, I thought there was no pipeline concept in Databricks. Most engineers write the whole script in one notebook rather than splitting it into several activities as in Data Factory. In that case, we have to rewrite everything in the script when the next pipeline comes along.

But recently I found there is indeed a way to build pipelines in Databricks. Here is the document, and what follows is my understanding.

  • Create widgets as parameters in the callee notebook
dbutils.widgets.text("input", "default") # define a text widget in the callee notebook
......
dbutils.notebook.exit(defaultText) # set the return value
  • Call the callee notebook from a caller notebook (we can write our pipeline in this caller notebook, then call all the following activity notebooks)
status = dbutils.notebook.run("pipeline",30,{"input":"pass success"}) # pass input = "pass success" to a notebook called "pipeline", with a 30-second timeout
  • The callee notebook can return a string to indicate the execution status (you still remember dbutils.notebook.exit in the callee notebook, right?)
returned_string = dbutils.notebook.run("pipeline2", 60)
  • The data exchange between two notebooks can be done with a global temp view, which lives in memory.
# in callee
sqlContext.range(10).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

# in caller
returned_table = dbutils.notebook.run("pipeline2", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
  • We can also call functions defined in another notebook, which means we can build a library shared by all notebooks
%run ./aclaraLibary # in this notebook, there are two functions which are overloaded.
simple(1,2)
simple(1,2,3)
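
Putting it all together, the caller notebook can serve as the pipeline definition. Here is a minimal sketch (the notebook names step1_ingest and step2_transform and the "input" widget are placeholders for your own activity notebooks):

# run each activity notebook in order, stop on the first failure,
# and pass parameters through widgets
steps = [
    ("step1_ingest",    {"input": "2019-10-01"}),
    ("step2_transform", {"input": "2019-10-01"}),
]

for name, params in steps:
    try:
        status = dbutils.notebook.run(name, 600, params)  # 600-second timeout per step
    except Exception as e:
        raise Exception("pipeline stopped: notebook {} failed".format(name))
    print("{} finished with status: {}".format(name, status))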