Look at this snippet first:
import multiprocessing
from concurrent.futures import ThreadPoolExecutor

def processEachCustomer(client_id):
    df_test = sql('')  # build the per-customer DataFrame (query omitted in the original)
    df_test.write.format('delta').save(path)  # path and customer_list are defined elsewhere in the notebook

num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as executor:
    executor.map(processEachCustomer, customer_list)
It looks fine at first glance. However, after validation the output in the Delta table turned out to be incomplete. The root cause was df_test: in the notebook it was not a local variable inside the function, so when the function ran concurrently on multiple threads, one thread's df_test was overwritten by another's before it was written out.
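The failure mode is easy to reproduce without Spark. In the following sketch (all names and values are made up for illustration), shared_df plays the role of df_test and a plain list stands in for the Delta table; because every thread reassigns the same shared variable before writing it out, the final output usually contains duplicates of one customer and nothing for the others.

import time
from concurrent.futures import ThreadPoolExecutor

shared_df = None   # shared by every thread, like df_test in the snippet above
written = []       # stands in for the Delta table

def process_customer(client_id):
    global shared_df
    shared_df = f"rows for {client_id}"  # step 1: build this customer's "DataFrame"
    time.sleep(0.01)                     # another thread can reassign shared_df here
    written.append(shared_df)            # step 2: write whatever shared_df holds *now*

with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(process_customer, ['a', 'b', 'c', 'd']))

print(sorted(written))  # typically duplicates of one customer, the rest missing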
The best way to avoid this issue is to use pure PySpark code only. If you have to combine Python multithreading with PySpark in the same notebook, here is a possible workaround.
df_test = {}

def processEachCustomer(client_id):
    # Each customer gets its own dictionary slot, so the threads no longer share one variable
    df_test[client_id] = sql('')
    df_test[client_id].write.format('delta').save(path)

num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as executor:
    executor.map(processEachCustomer, customer_list)
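Because the dictionary is keyed by client_id, each thread builds and writes its own entry and can no longer clobber another thread's DataFrame between the sql() call and the save. One extra precaution that may help when checking completeness: executor.map returns a lazy iterator, and exceptions raised inside the workers only surface when that iterator is consumed (for example by wrapping the call in list()), so consuming it makes silent per-customer failures visible.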