A simple mistake when using Spark with Python multiprocessing

Look at this snippet first:

import multiprocessing
from concurrent.futures import ThreadPoolExecutor

def processEachCustomer(client_id):
    df_test = sql('')
    df_test.write.format('delta').save(path)

num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as executor:
    executor.map(processEachCustomer, customer_list)

It looks fine at first glance. However, validation showed that the output in the Delta table was incomplete. In the end, the issue traced back to df_test, which was not behaving as a variable local to the function. So when the function ran concurrently on several threads, df_test was overwritten.

The best way to avoid this issue is to use PySpark code only. If you have to combine the two within the same notebook, here is a possible workaround:

df_test = {}
def processEachCustomer(client_id):
    df_test[client_id] = sql('')
    df_test[client_id].write.format('delta').save(path)

num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as executor:
  executor.map(processEachCustomer,customer_list)
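
As a plain-Python sketch of the same pattern (no Spark involved), the snippet below keeps one result per customer in a dictionary. It also consumes the iterator returned by executor.map, because that is the point where an exception raised inside a worker is re-raised. fake_query, process_each_customer and the customer ids are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

customer_list = ["c1", "c2", "c3", "c4"]  # hypothetical customer ids
results = {}  # one slot per customer, so threads never share a key


def fake_query(client_id):
    # Stand-in for sql(...); returns something unique to the customer.
    return f"rows-for-{client_id}"


def process_each_customer(client_id):
    results[client_id] = fake_query(client_id)
    return client_id


with ThreadPoolExecutor(max_workers=4) as executor:
    # list(...) retrieves every result, so an exception raised in a worker
    # is re-raised here instead of going unnoticed.
    done = list(executor.map(process_each_customer, customer_list))
```

If a worker fails, the list(...) call raises, which makes an incomplete output much easier to spot than a silent partial write.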

Multi Processing in Python

Recently we were asked to improve our ETL efficiency. Most of the latency comes from I/O and file translation, since we only used one thread to handle them. So we wanted to use multiprocessing to speed up this task.

Before anything else, I need to introduce the difference between multiprocessing and multithreading in Python. The major difference is that multithreading is not truly parallel: because of CPython’s global interpreter lock (GIL), only one thread can execute Python code at any given time. The main takeaways are summarized below.

  • For CPU-heavy tasks, multithreading is of little use. However, it’s perfect for I/O
  • Multiprocessing is usually faster than serial execution for CPU-heavy work, but don’t start more processes than the number of cores
  • Multiprocessing is for increasing speed. Multithreading is for hiding latency
  • Multiprocessing is best for computations. Multithreading is best for I/O
  • If you have CPU-heavy tasks, use multiprocessing with n_process = n_cores and never more. Never!
  • If you have I/O-heavy tasks, use multithreading with n_threads = m * n_cores, with m a number bigger than 1 that you can tweak on your own. Try many values and choose the one with the best speedup, because there isn’t a general rule. For instance, the default value of m in ThreadPoolExecutor was 5 in Python 3.5 to 3.7 (since Python 3.8 the default is min(32, n_cores + 4) threads), which honestly feels quite random in my opinion.
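
To illustrate the I/O rule of thumb above, here is a minimal sketch that sizes a thread pool as m * n_cores. fake_io_task, run_threaded and the value m=4 are assumptions for the example; time.sleep stands in for a blocking read or network call.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_task(n):
    # time.sleep releases the GIL, so other threads can run while
    # this one "waits for I/O".
    time.sleep(0.05)
    return n * 2


def run_threaded(items, m=4):
    # n_threads = m * n_cores, with m a tunable multiplier (assumed value).
    n_threads = m * (os.cpu_count() or 1)
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        return list(executor.map(fake_io_task, items))


results = run_threaded(range(8))
```

Timing run_threaded for a few values of m on a real workload is the practical way to pick the multiplier.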

In this task, we utilized multi processing.

Pros:

  1. 5x-10x faster than a single core when using 20 cores (6087 files: 27s on 20 cores vs 180s on a single core).
  2. No need to change the current coding logic; it only has to be adapted to the multiprocessing pattern.
  3. No effect on upstream or downstream.

Cons:

  1. The code has to be changed script by script; there is no general framework or library
  2. You have to decide manually which part to parallelize

How it works:

  • Library: you need to import these libraries to enable multiprocessing in Python
from multiprocessing import Pool
from functools import partial
import numpy as np
import pandas as pd
import time # optional
  • Function: you need two functions. parallelize_dataframe splits the dataframe and starts the worker processes; multiprocess does the actual work in each process. Both functions need to be defined outside the main block. The extra parameters can be changed as needed.
# split the dataframe into parts for parallel tasks and call the multiprocess function
def parallelize_dataframe(df, func, num_cores, cust_id, prodcut_fk, first_last_data):
    df_split = np.array_split(df, num_cores)
    pool = Pool(num_cores)
    parm = partial(func, cust_id=cust_id, prodcut_fk=prodcut_fk, first_last_data=first_last_data)
    return_df = pool.map(parm, df_split)
    df = pd.DataFrame([item for items in return_df for item in items])
    pool.close()
    pool.join()
    return df

# multi processing
# each process handles parts of files imported
def multiprocess(dcu_files_merge_final_val_slice, cust_id,prodcut_fk,first_last_data):
  • Put the rest of the script inside the main block, and use parallelize_dataframe to execute it.
if __name__ == '__main__':
.......
......
                    # set the process number
                    core_num = 20
                    rsr_df = parallelize_dataframe(dcu_files_merge_final_val,multiprocess,core_num, cust_id, prodcut_fk,first_last_data)
                    print(rsr_df.shape)
                    end = time.time()
                    print('Spend:',str(end-start)+'s')

I can’t share the complete code, but this should be enough to understand the pattern. There are some issues for which I have not figured out a solution yet. The major one is how to pass parameters to the worker function more flexibly, rather than hard-coding them each time.
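
One possible way to avoid hard-coding the parameter list, sketched under the assumption that the worker accepts its extra arguments by keyword: collect them with **params and bind them with functools.partial. The names parallelize, tag_rows and pool_factory are hypothetical and not part of the original code.

```python
from functools import partial
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def parallelize(chunks, func, num_workers, pool_factory=Pool, **params):
    """Run func(chunk, **params) for every chunk and flatten the results."""
    worker = partial(func, **params)  # bind whatever keywords were passed
    with pool_factory(num_workers) as pool:
        nested = pool.map(worker, chunks)
    return [item for items in nested for item in items]


def tag_rows(rows, cust_id, suffix=""):
    # Hypothetical worker: any keyword arguments can be added here
    # without touching parallelize().
    return [f"{cust_id}:{row}{suffix}" for row in rows]


# ThreadPool exposes the same map() API and is used here only for a quick
# check; keep the default Pool for real multiprocessing.
demo = parallelize([[1, 2], [3]], tag_rows, 2, pool_factory=ThreadPool, cust_id="c1")
```

With this shape, adding a new parameter only changes the worker and the call site, not the parallelizing helper.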

update@20190621

We added a feature that unzips a single zip file with multiprocessing, using the concurrent.futures library.

import os
import zipfile
import concurrent.futures

def _count_file_object(f):
    # Note that this iterates on 'f'.
    # You *could* do 'return len(f.read())'
    # which would be faster but potentially memory
    # inefficient and unrealistic in terms of this
    # benchmark experiment.
    total = 0
    for line in f:
        total += len(line)
    return total


def _count_file(fn):
    with open(fn, 'rb') as f:
        return _count_file_object(f)

def unzip_member_f3(zip_filepath, filename, dest):
    with open(zip_filepath, 'rb') as f:
        zf = zipfile.ZipFile(f)
        zf.extract(filename, dest)
    fn = os.path.join(dest, filename)
    return _count_file(fn)



def unzipper(fn, dest):
    with open(fn, 'rb') as f:
        zf = zipfile.ZipFile(f)
        futures = []
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for member in zf.infolist():
                futures.append(
                    executor.submit(
                        unzip_member_f3,
                        fn,
                        member.filename,
                        dest,
                    )
                )
            total = 0
            for future in concurrent.futures.as_completed(futures):
                total += future.result()
    return total
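
A quick way to sanity-check the number returned by unzipper is to compare it with the uncompressed sizes stored in the archive itself. The sketch below assumes that check is useful in your setup; expected_total and the sample file names are made up for illustration.

```python
import os
import tempfile
import zipfile


def expected_total(zip_filepath):
    # Sum of the uncompressed member sizes recorded in the archive.
    with zipfile.ZipFile(zip_filepath) as zf:
        return sum(member.file_size for member in zf.infolist())


# Build a tiny throwaway archive to try it on.
tmp_dir = tempfile.mkdtemp()
zip_path = os.path.join(tmp_dir, "sample.zip")
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("a.txt", "hello\n")
    zf.writestr("b.txt", "world!\n")

total = expected_total(zip_path)
```

If unzipper(zip_path, dest) returns a different total, some member was probably not extracted completely.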

Ref :

Multithreading VS Multiprocessing in Python, https://medium.com/contentsquare-engineering-blog/multithreading-vs-multiprocessing-in-python-ece023ad55a

Fastest way to unzip a zip file in Python, https://www.peterbe.com/plog/fastest-way-to-unzip-a-zip-file-in-python

Learn Django with me (part 3)

Handle view and templates

A view consists of a set of functions that handle the different URL requests matching specific URL patterns.
Each view either returns an HttpResponse or raises Http404.

Firstly, let’s update webapp/views.py:

from django.http import HttpResponse
from django.shortcuts import get_object_or_404, render

from .models import Question

def index(request):
    # get the latest 5 questions
    latest_question_list = Question.objects.order_by('-pub_date')[:5]
    # create context
    context = {'latest_question_list': latest_question_list}
    # shortcut for rendering the request with a template
    return render(request, 'webapp/index.html', context)

def detail(request, question_id):
    question = get_object_or_404(Question, pk=question_id)
    return render(request, 'webapp/detail.html', {'question': question})

def results(request, question_id):
    response = "You're looking at the results of question %s."
    return HttpResponse(response % question_id)

def vote(request, question_id):
    return HttpResponse("You're voting on question %s." % question_id)

Here we used the template webapp/index.html, which is located at webapp/templates/webapp/index.html. So let’s create a folder templates and its subfolder webapp. The code of index.html:

{# list of all items from question object #}
{% if latest_question_list %}
<ul>
    {% for question in latest_question_list %}
    {# webapp is the namespace, detail is the name #}
    <li><a href="{% url 'webapp:detail' question.id %}">{{ question.question_text }}</a></li>
    {% endfor %}
</ul>
{% else %}
<p>No polls are available.</p>
{% endif %}

The tricky point is that when we refer to the detail page, we use {% url 'webapp:detail' question.id %} instead of an absolute path. Here webapp is the namespace and detail is the name; both can be found in the updated webapp/urls.py:

from django.urls import path

from . import views

# namespace
app_name = 'webapp'
urlpatterns = [
    # ex: /webapp/
    path('', views.index, name='index'),
    # ex: /webapp/5/
    path('<int:question_id>/', views.detail, name='detail'),
    # ex: /webapp/5/results/
    path('<int:question_id>/results/', views.results, name='results'),
    # ex: /webapp/5/vote/
    path('<int:question_id>/vote/', views.vote, name='vote'),
]

Similar to the template index.html, we should add the template webapp/templates/webapp/detail.html:
<h1>{{ question.question_text }}</h1>
<ul>
{% for choice in question.choice_set.all %}
    <li>{{ choice.choice_text }}</li>
{% endfor %}</ul>

The dynamic parts of the HTML are easy to understand, so I won’t spend time explaining them.

Now you can open http://localhost:8000/webapp/ to see the results. The whole process can be described like this:
1. The client sends a request to the server.
2. Django parses the URL using ROOT_URLCONF = 'mysite.urls' in mysite/settings.py, which points to the mysite.urls module.
3. Based on urlpatterns in mysite/urls.py, the request is forwarded to the webapp folder.
4. The request is handled by webapp/urls.py, which maps it to one of the functions in webapp/views.py. Here the second parameter of a function such as results comes from the URL pattern.
5. The view handles the request, then retrieves the template from templates/webapp/.
6. An HttpResponse or Http404 is sent back to the client.

In a nutshell, urls.py handles URL patterns and sends the request to views.py; views.py calls models.py and the templates to send the response back.
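
To make this flow concrete, here is a toy dispatcher in plain Python. It is not how Django implements routing; it only mimics the idea that a pattern such as <int:question_id>/results/ is matched against the path and the captured value is passed to the view as its second argument. All names (URLPATTERNS, dispatch) are illustrative.

```python
import re


def index(request):
    return "index"


def results(request, question_id):
    return f"results of question {question_id}"


# (regex, view) pairs standing in for the urlpatterns list.
URLPATTERNS = [
    (re.compile(r"^$"), index),
    (re.compile(r"^(?P<question_id>\d+)/results/$"), results),
]


def dispatch(request, path):
    for pattern, view in URLPATTERNS:
        match = pattern.match(path)
        if match:
            # Captured groups become keyword arguments, converted to int
            # in the spirit of the <int:...> converter.
            kwargs = {k: int(v) for k, v in match.groupdict().items()}
            return view(request, **kwargs)
    return "404 Not Found"
```

Calling dispatch(None, "5/results/") reaches the results view with question_id=5, while an unknown path falls through to the 404 branch.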

Learn Django with me (part 2)

Change Database Setting

Open up mysite/settings.py and find the snippet below:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
    }
}

Here you can change your database to another one if needed:
* ENGINE – one of 'django.db.backends.sqlite3', 'django.db.backends.postgresql', 'django.db.backends.mysql', or 'django.db.backends.oracle'.
* NAME – the name of the database
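
For example, a PostgreSQL configuration might look like the sketch below. The keys are standard Django DATABASES settings; every value (database name, user, password, host, port) is a placeholder to replace with your own.

```python
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'mydatabase',        # placeholder database name
        'USER': 'mydatabaseuser',    # placeholder user
        'PASSWORD': 'mypassword',    # placeholder password
        'HOST': '127.0.0.1',         # placeholder host
        'PORT': '5432',              # placeholder port
    }
}
```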

We can also change the time zone in the settings file

TIME_ZONE = 'America/Chicago'

To create the related tables in the database, we need to execute
python manage.py migrate. It creates the tables required by the apps listed in INSTALLED_APPS in settings.py.

To read the recently created tables in sqlite:

python manage.py dbshell
# into sqlite shell
.tables

The result will look like this:

sqlite> .tables
auth_group                  auth_user_user_permissions
auth_group_permissions      django_admin_log
auth_permission             django_content_type
auth_user                   django_migrations
auth_user_groups            django_session

Create a model

In the official definition, a model is “the single, definitive source of truth about your data”. In my opinion, the model keeps the data definition in a single place, rather than in the database as well as in your code.

Let’s copy this into webapp/models.py. We create two classes, which are also two tables in the database. Each class variable is a field name with its data type: for example, models.CharField is a character field and models.DateTimeField is a datetime. We can also see a ForeignKey in class Choice which points to Question.

from django.db import models

class Question(models.Model):
    question_text = models.CharField(max_length=200)
    pub_date = models.DateTimeField('date published')

    def __str__(self):
        return self.question_text

class Choice(models.Model):
    question = models.ForeignKey(Question, on_delete=models.CASCADE)
    choice_text = models.CharField(max_length=200)
    votes = models.IntegerField(default=0)

    def __str__(self):
        return self.choice_text

To activate the model, we need to add its config class to INSTALLED_APPS. webapp.apps.WebappConfig refers to the WebappConfig class in apps.py in the webapp folder.

INSTALLED_APPS = [
    'webapp.apps.WebappConfig',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

Then we run makemigrations to create migration files.

python manage.py makemigrations webapp
# then you will see something like the following:
Migrations for 'webapp':
  webapp/migrations/0001_initial.py
    - Create model Choice
    - Create model Question
    - Add field question to choice

We can find the migration operations in webapp/migrations/. After running python manage.py migrate, we can find two new tables in the database (use .schema {tablename} to inspect them).

In summary, there are 3 steps for making model changes:

  • Change your models (in models.py).
  • Run python manage.py makemigrations to create migrations for those changes
  • Run python manage.py migrate to apply those changes to the database.

Play with Shell to add some records into db

To get into the shell, we need to execute

python manage.py shell

Then add some questions and choices

from webapp.models import Choice, Question
# show all question
Question.objects.all()

# add a new question
from django.utils import timezone
q = Question(question_text="What's new?", pub_date=timezone.now())

# save into database
q.save()
q.id

# search in database, similar to where in sql
Question.objects.filter(id=1) # id=1
Question.objects.filter(question_text__startswith='What')
Question.objects.get(pk=1) # filter with pk

# add some choices, here Django creates a set to hold the "other side" of ForeignKey relation
q.choice_set.create(choice_text='Not much', votes=0)
q.choice_set.create(choice_text='The sky', votes=0)
q.choice_set.create(choice_text='The moon', votes=0)

# delete records
d = q.choice_set.filter(choice_text__startswith='The moon')
d.delete()

Django Admin

Create an admin user with python manage.py createsuperuser; the system will then ask you to enter the username, email and password. After this, we can access the admin website at http://localhost:8000/admin/

If the admin account should be able to add new questions on the website, we need to add the following snippet to admin.py

from django.contrib import admin

from .models import Question

admin.site.register(Question)

Some issues

When I tried to save a question in the admin webpage, an error popped up: no such table: main.auth_user__old. I am noting it here until I find the reason.

Learn Django with me (part 1)

Although I have been using Python for a while, most of the time I use it only for data analysis with pandas or some machine learning packages. Django, one of the most famous web frameworks, has existed for over 12 years. So I decided to learn it step by step with the official tutorial and share my experience with you.

Prepare Django

# install django
sudo pip install Django
# build project
django-admin startproject {name of site}
# run test
# goto project folder, you will find manage.py, run
python manage.py runserver {port}

A quick explanation of some of the files:

  • manage.py: A command-line utility that lets you interact with this Django project in various ways.
  • {name of site}: the Python package for the project
  • mysite/__init__.py: An empty file that tells Python that this directory should be considered a Python package. If you’re a Python beginner, read more about packages in the official Python docs.
  • mysite/settings.py: Settings/configuration for this Django project. Django settings will tell you all about how settings work.
  • mysite/urls.py: The URL declarations for this Django project; a “table of contents” of your Django-powered site. You can read more about URLs in URL dispatcher.
  • mysite/wsgi.py: An entry-point for WSGI-compatible web servers to serve your project. See How to deploy with WSGI for more details.

Create a new app

# add a new app
python manage.py startapp {name of app}
In each app folder, there are three important Python files:
  • urls.py: controls what is served based on url patterns
  • models.py: database structures and metadata
  • views.py: handles what the end-user “views” or interacts with

Then we need to add the app to settings.py under the site folder {name of site}

INSTALLED_APPS = [
    '{name of app}',
]

And update the urls.py under the same folder

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('webapp/', include('webapp.urls')),
]

We have completed all file modifications under the site folder. Next we go to the app folder to create urls.py and change views.py.

# create a file name `urls.py` under app folder
touch urls.py

# copy this to the file, which directs the request to views.py
# path(route, view, kwargs,name)
# @route: URL pattern
# @view: function name to be called
# @kwargs: argument to be passed in a dictionary
# @name: refer URL with the name
from django.urls import path
from . import views
urlpatterns = [
    path('', views.index, name='index'),
]

# change views.py to
from django.http import HttpResponse
def index(request):
    return HttpResponse("Hello, world. You're at the polls index.")

After all of this is done, we can access the app at http://localhost:8000/{name of app}/

Interesting Python I ( function )

how to pass-by-reference?

Many languages, like C/C++, support both pass by value and pass by reference. Pass by reference copies the address of an argument into the formal parameter. Inside the function, the address is used to access the actual argument used in the call, which means changes made to the parameter affect the passed argument. In Python, pass by reference is trickier. There are two kinds of objects: mutable and immutable. Strings, tuples and numbers are immutable; lists, dicts and sets are mutable. When we assign a new value to a parameter that refers to an immutable object, Python binds the name to a different object rather than changing the original one. Let us see the code:

    def ref_demo(x):
        print("x=", x, " id=", id(x))
        x = 42
        print("x=", x, " id=", id(x))

    >>> x = 9
    >>> id(x)
    41902552
    >>> ref_demo(x)
    x= 9  id= 41902552
    x= 42  id= 41903752
    >>> id(x)
    41902552
    >>> 

We can see that after x = 42, the id (address) of x inside the function has changed, while the caller’s x is untouched.

Conversely, if we pass a mutable object into a function, we can change its value in place, which behaves like pass-by-reference.
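
A minimal sketch of the two cases side by side; rebind and mutate are illustrative names.

```python
def rebind(x):
    # Binds the local name x to a new object; the caller's variable
    # still refers to the old one.
    x = 42


def mutate(items):
    # Changes the list object itself, which the caller also refers to.
    items.append(42)


n = 9
rebind(n)

values = [1, 2]
mutate(values)
```

After running this, n is still 9, while values has gained the extra element.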

*args and **kwargs

Before I explain them, I want to mention that * is used to unpack a tuple or list into positional arguments, and ** is used to unpack a dictionary into named arguments.

* defines a variable number of arguments. The asterisk character has to precede a variable identifier in the parameter list.

>>> def print_everything(*args):
...     for count, thing in enumerate(args):
...         print('{0}. {1}'.format(count, thing))
...
>>> print_everything('apple', 'banana', 'cabbage')
0. apple
1. banana
2. cabbage

** defines an arbitrary number of keyword parameters.

>>> def table_things(**kwargs):
...     for name, value in kwargs.items():
...         print('{0} = {1}'.format(name, value))
...
>>> table_things(apple = 'fruit', cabbage = 'vegetable')
apple = fruit
cabbage = vegetable

A * can appear in function calls as well. The semantics in this case is the inverse of a star in a function definition: the argument is unpacked rather than packed. In other words, the elements of the list or tuple become separate positional arguments:

>>> def f(x,y,z):
...     print(x,y,z)
... 
>>> p = (47,11,12)
>>> f(*p)
47 11 12

The same mechanism exists for keyword arguments in a call. To unpack a dictionary into keyword arguments, we use the double asterisk “**” notation:

>>> def f(a,b,x,y):
...     print(a,b,x,y)
...
>>> d = {'a':'append', 'b':'block','x':'extract','y':'yes'}
>>> f(**d)
append block extract yes
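
Packing and unpacking can also be combined. The short sketch below returns values instead of printing them so the behaviour is easy to check; describe is a made-up helper name.

```python
def describe(*args, **kwargs):
    # args is a tuple of the positional arguments,
    # kwargs a dict of the keyword arguments.
    return args, kwargs


def f(a, b, x, y):
    return (a, b, x, y)


packed = describe(1, 2, color="red")

# * spreads the tuple over a and b, ** spreads the dict over x and y.
unpacked = f(*(47, 11), **{"x": "extract", "y": "yes"})
```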
