A simple mistaken occurred leveraging spark in python multiprocessing

Look at this snippet first:

def processEachCustomer(client_id):
    df_test = sql('')
    df_test.write.format('delta').save(path)

num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as executor:
  executor.map(processEachCustomer,customer_list)

It looks fine at the first glance. However, after the validation, the output was incomplete in delta table. At the end, the issue happens in the df_test, which is not a local variable in a function. So, when it ran as multicore, df_test was overwritten.

The best way to avoid this issue is using pyspark code only. If you have to combine them within the same notebook. Here is maybe a work around.

df_test = {}
def processEachCustomer(client_id):
    df_test[client_id] = sql('')
    df_test[client_id].write.format('delta').save(path)

num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as executor:
  executor.map(processEachCustomer,customer_list)

Read More

Leave a Reply

Your email address will not be published. Required fields are marked *