Multiprocessing in Python

Recently we were given the task of improving the efficiency of our ETL pipeline. Most of the latency comes from I/O and file translation, since we were handling everything with a single thread, so we decided to use multiprocessing to accelerate the job.

Before everything else, I need to introduce the difference between multiprocessing and multithreading in Python. The major difference is that multithreading is not truly parallel: because of the GIL, only one thread can execute Python code at any given time. The key points are summarized below, and a small runnable sketch follows the list.

  • For CPU-heavy tasks, multithreading is essentially useless; however, it is perfect for I/O.
  • Multiprocessing is always faster than serial execution, but don’t spawn more processes than you have cores.
  • Multiprocessing is for increasing speed; multithreading is for hiding latency.
  • Multiprocessing is best for computations; multithreading is best for I/O.
  • If you have CPU-heavy tasks, use multiprocessing with n_process = n_cores and never more. Never!
  • If you have I/O-heavy tasks, use multithreading with n_threads = m * n_cores, where m is a number bigger than 1 that you can tweak on your own. Try many values and choose the one with the best speedup, because there isn’t a general rule. For instance, the default value of m in ThreadPoolExecutor is 5 [Source], which honestly feels quite random to me.
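
To make these points concrete, here is a minimal runnable sketch (not part of our ETL code; the workload sizes are made up for illustration). The CPU-bound loop barely improves with threads because of the GIL but does scale with processes, while the I/O-bound sleep overlaps nicely under threads:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n):
    # pure Python loop: holds the GIL the whole time
    total = 0
    for i in range(n):
        total += i * i
    return total

def io_bound(seconds):
    # sleeping releases the GIL, so threads can overlap the waiting
    time.sleep(seconds)
    return seconds

def timed(executor_cls, func, args):
    start = time.time()
    with executor_cls(max_workers=4) as ex:
        list(ex.map(func, args))
    return time.time() - start

if __name__ == '__main__':
    print('CPU-bound, threads:   %.2fs' % timed(ThreadPoolExecutor, cpu_bound, [10**7] * 4))
    print('CPU-bound, processes: %.2fs' % timed(ProcessPoolExecutor, cpu_bound, [10**7] * 4))
    print('I/O-bound, threads:   %.2fs' % timed(ThreadPoolExecutor, io_bound, [1] * 4))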

For this task, we used multiprocessing.

Pros:

  1. 5x–10x faster than a single core when using 20 cores (6,087 files: 27 s on 20 cores vs. 180 s on a single core).
  2. No need to change the current processing logic; we only adapt it to the multiprocessing pattern.
  3. No effect on upstream or downstream systems.

Cons:

  1. The code has to be changed script by script; there is no general framework or library.
  2. You have to decide manually which parts to parallelize.

How it works:

  • Libraries: you need to import these to enable multiprocessing in Python (numpy and pandas are also needed by the functions below).
from multiprocessing import Pool
from functools import partial
import numpy as np
import pandas as pd
import time # optional, for timing
  • Functions: you need two functions. parallelize_dataframe splits the dataframe and dispatches the chunks to multiple processes; multiprocess is the worker that processes one chunk. Both functions must be defined outside of the main function so they can be pickled. The business-specific parameters (cust_id, prodcut_fk, first_last_data) can be changed as needed.
# split the dataframe into parts, hand them to the worker processes,
# and concatenate the results back into a single dataframe
def parallelize_dataframe(df, func, num_cores, cust_id, prodcut_fk, first_last_data):
    df_split = np.array_split(df, num_cores)
    pool = Pool(num_cores)
    parm = partial(func, cust_id=cust_id, prodcut_fk=prodcut_fk, first_last_data=first_last_data)
    return_df = pool.map(parm, df_split)
    df = pd.DataFrame([item for items in return_df for item in items])
    pool.close()
    pool.join()
    return df

# worker: each process handles one slice of the imported files
def multiprocess(dcu_files_merge_final_val_slice, cust_id, prodcut_fk, first_last_data):
    # ... the per-slice processing logic goes here (omitted) ...
    # it should return a list of records for the slice
    ...
  • Put the rest of the script into the main block and call parallelize_dataframe to execute it.
if __name__ == '__main__':
    .......
    ......
                    # set the number of processes
                    core_num = 20
                    rsr_df = parallelize_dataframe(dcu_files_merge_final_val, multiprocess, core_num, cust_id, prodcut_fk, first_last_data)
                    print(rsr_df.shape)
                    end = time.time()
                    print('Spend:', str(end - start) + 's')

I can’t share the complete code, but the pattern should be clear from the above. There are still some issues I have not figured out; the major one is how to pass parameters to the worker function more flexibly instead of hard-coding them in parallelize_dataframe every time.
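
One direction I am considering is to forward arbitrary keyword arguments instead of hard-coding each parameter name. Below is only a sketch of the idea, not our production code; it reuses the same worker multiprocess and parameters as above:

from multiprocessing import Pool
from functools import partial
import numpy as np
import pandas as pd

def parallelize_dataframe(df, func, num_cores, **kwargs):
    # any extra keyword arguments are forwarded to the worker untouched
    df_split = np.array_split(df, num_cores)
    with Pool(num_cores) as pool:
        return_df = pool.map(partial(func, **kwargs), df_split)
    return pd.DataFrame([item for items in return_df for item in items])

# usage (same call site as before, but no signature change is needed when the parameters change):
# rsr_df = parallelize_dataframe(dcu_files_merge_final_val, multiprocess, 20,
#                                cust_id=cust_id, prodcut_fk=prodcut_fk, first_last_data=first_last_data)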

Update 2019-06-21

We added a feature that unzips a single zip file with multiprocessing, using the concurrent.futures library.

import os
import zipfile
import concurrent.futures

def _count_file_object(f):
    # Note that this iterates on 'f'.
    # You *could* do 'return len(f.read())'
    # which would be faster but potentially memory
    # inefficient and unrealistic in terms of this
    # benchmark experiment.
    total = 0
    for line in f:
        total += len(line)
    return total


def _count_file(fn):
    with open(fn, 'rb') as f:
        return _count_file_object(f)

def unzip_member_f3(zip_filepath, filename, dest):
    with open(zip_filepath, 'rb') as f:
        zf = zipfile.ZipFile(f)
        zf.extract(filename, dest)
    fn = os.path.join(dest, filename)
    return _count_file(fn)



def unzipper(fn, dest):
    with open(fn, 'rb') as f:
        zf = zipfile.ZipFile(f)
        futures = []
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for member in zf.infolist():
                futures.append(
                    executor.submit(
                        unzip_member_f3,
                        fn,
                        member.filename,
                        dest,
                    )
                )
            total = 0
            for future in concurrent.futures.as_completed(futures):
                total += future.result()
    return total
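
A hypothetical way to call it (the archive and destination paths below are made up for illustration; the __main__ guard is needed because ProcessPoolExecutor starts child processes):

if __name__ == '__main__':
    import time
    start = time.time()
    total_bytes = unzipper('./dcu_files.zip', './unzipped')  # hypothetical paths
    print('Extracted and counted %d bytes in %.1fs' % (total_bytes, time.time() - start))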

References:

Multithreading VS Multiprocessing in Python, https://medium.com/contentsquare-engineering-blog/multithreading-vs-multiprocessing-in-python-ece023ad55a

Fastest way to unzip a zip file in Python, https://www.peterbe.com/plog/fastest-way-to-unzip-a-zip-file-in-python

Delta Lake Step by Step (1)

Before we start to talk about Delta Lake, we have to take some time to look at the data lake and understand why we need one. Below is the best definition I have found.

A data lake is a repository for structured, unstructured, and semi-structured data. Data lakes are much different from data warehouses since they allow data to be in its rawest form without needing to be converted and analyzed first.

Devin Pickell 

A data lake stores everything (raw) in every format. In a nutshell, it is a repository; for example, in Azure, Data Lake Storage Gen2 is a repository built on top of Blob storage.

What’s the difference between a data lake and a data warehouse?

Table 1. Difference between data warehouse and data lake

From Table 1 we can see that the data warehouse and the data lake are used in different scenarios. A data warehouse is used for classic data analysis, with the data stored in structured relational tables, while a data lake is used for big data and ELT scenarios: raw data lands in the lake first, and the processing happens afterwards. Since the data lake runs on a cloud cluster, it can handle the raw data very quickly.

What are the downsides of a data lake?

  • No transactions.
  • Schema on read only (no schema enforcement).
  • No version control.

These three downsides are critical when applying a data lake to a real project, because they hurt both data quality and efficiency. This is a big reason why so many big data projects fail: a data lake cannot provide stable data the way a database can.

This is where Delta Lake comes in!

Figure 1. Delta Lake adds ACID transactions on top of Parquet.

Delta Lake has several capabilities that enable a stable data lake for analysis:

  • Open-source storage layer, based on Parquet files
  • ACID transactions: serializable isolation
  • Scalable metadata handling: metadata is processed with Spark’s distributed processing
  • Unified streaming and batch: a table works as a batch table as well as a streaming source and sink
  • Schema enforcement (illustrated right after this list)
  • Time travel: data versioning
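
As a quick illustration of schema enforcement (my own sketch, assuming a Delta table with a different schema already exists at /delta/events), an append with mismatched columns is rejected unless you explicitly opt in to schema evolution:

# sketch: schema enforcement rejects a mismatched append (assumes /delta/events already exists)
new_df = spark.createDataFrame([(1, "click")], ["id", "event_type"])
try:
    new_df.write.format("delta").mode("append").save("/delta/events")
except Exception as err:
    # typically an AnalysisException describing the schema mismatch
    print("Write rejected by schema enforcement:", err)
# schema evolution has to be requested explicitly
new_df.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/events")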

Let’s look at some simple samples. If you are familiar with Spark, you can switch to Delta Lake very quickly.

# set up
# upgrade pyspark
pip install --upgrade pyspark
# launch pyspark with the Delta Lake package
pyspark --packages io.delta:delta-core_2.12:0.1.0


# write a table
df = spark.read.csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header="true", inferSchema="true")
df.write.format("delta").save("/delta/diamonds")
# append
df.write.format("delta").mode("append").save("/delta/events")
# we can also partition the table
df.write.format("delta").partitionBy("date").save("/delta/events")
# update: overwrite only the rows matching the predicate
df.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'").save("/tmp/delta-table")
# by default, overwrite does not change the schema unless option("overwriteSchema", "true") is set
# add new columns automatically
df.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/events")


# read
df = spark.read.format("delta").load("/tmp/delta-table")
# or the SQL equivalent:
SELECT * FROM delta.`/tmp/delta-table`

# time travel: read an older snapshot of the table
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
# for timestamp_string, only date or timestamp strings are accepted
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

# structured streaming
# stream data from a rate source (it generates a "value" column) into a Delta table
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
# limit the micro-batch size when reading a Delta table as a stream
spark.readStream.format("delta").option("maxFilesPerTrigger", 1000).load("/tmp/delta-table")
# ignore updates and deletes with "ignoreDeletes" and "ignoreChanges"
# writeStream supports outputMode("append") (the default) and outputMode("complete") (for aggregation queries)

# stream the Delta table back out to the console
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
stream.stop()

# checkpoint: where Spark stores streaming progress so the query can be restarted
.option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")

In the next article, I will talk about when to use Delta Lake and how to optimize it in Databricks.

Here is one of my examples, which converts three small files into one Delta table: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2499619125013057/2952616815059893/2608746070181068/latest.html

References:

  1. Delta Lake documentation. https://delta.io
  2. What Is a Data Lake and Why Is It Essential for Big Data? https://learn.g2.com/what-is-a-data-lake
  3. Delta Lake documentation for Databricks. https://docs.databricks.com/delta
