Execute multiple threads concurrently

Question :

Current code is:

import logging
import threading

def export_data(file):
    # Placeholder: runs the db2 database command to export tables to file
    pass

def export_to_files(out_file1, out_file2):
    logger = logging.getLogger("export_to_files")
    thread1 = threading.Thread(target=export_data, args=[out_file1])
    thread1.start()
    thread2 = threading.Thread(target=export_data, args=[out_file2])
    thread2.start()
    thread1.join()
    thread2.join()

def main():
    export_to_files("out_file1", "out_file2")

if __name__ == "__main__":
    main()

My understanding was that join() only blocks the calling thread. However, I did not realize that thread1.join() would also block thread2 from executing, so that in effect only one thread (thread1) runs.

How can I execute both threads concurrently while having the main thread wait for both to complete?

EDIT: I stand corrected. The 2 threads do run, but it seems that only 1 thread is actually “doing” anything at any given point in time.

To elaborate further, the thread's target function reads data from the database and writes it to a file. I can now see 2 files being updated (each thread writes to a separate file), but one of the files has not been updated for quite some time, while the other is up to date.

There is no connection object being used. The queries are run from the db2 command line interface.

Answer #1:

You could use the largely undocumented ThreadPool class in multiprocessing.pool to do something along these lines:

from multiprocessing.pool import ThreadPool
import random
import threading
import time

MAX_THREADS = 2
print_lock = threading.Lock()

def export_data(fileName):
    # simulate writing to file
    runtime = random.randint(1, 10)
    while runtime:
        with print_lock: # prevent overlapped printing
            print('[{:2d}] Writing to {}...'.format(runtime, fileName))
        time.sleep(1)
        runtime -= 1

def export_to_files(filenames):
    pool = ThreadPool(processes=MAX_THREADS)
    pool.map_async(export_data, filenames)
    pool.close()
    pool.join()  # block until all threads exit

def main():
    export_to_files(['out_file1', 'out_file2', 'out_file3'])

if __name__ == "__main__":
    main()

Example output:

[ 9] Writing to out_file1...
[ 6] Writing to out_file2...
[ 5] Writing to out_file2...
[ 8] Writing to out_file1...
[ 4] Writing to out_file2...
[ 7] Writing to out_file1...
[ 3] Writing to out_file2...
[ 6] Writing to out_file1...
[ 2] Writing to out_file2...
[ 5] Writing to out_file1...
[ 1] Writing to out_file2...
[ 4] Writing to out_file1...
[ 8] Writing to out_file3...
[ 3] Writing to out_file1...
[ 7] Writing to out_file3...
[ 2] Writing to out_file1...
[ 6] Writing to out_file3...
[ 1] Writing to out_file1...
[ 5] Writing to out_file3...
[ 4] Writing to out_file3...
[ 3] Writing to out_file3...
[ 2] Writing to out_file3...
[ 1] Writing to out_file3...
Answered By: martineau
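
For comparison, here is a minimal sketch of the same idea using concurrent.futures.ThreadPoolExecutor from the standard library, which is documented. The time.sleep call is only a stand-in for the real export, and max_threads is an illustrative parameter name that does not come from the question.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def export_data(file_name):
    # Stand-in for the real export; sleeping releases the GIL,
    # much as waiting on blocking I/O would.
    time.sleep(0.2)
    return file_name

def export_to_files(file_names, max_threads=2):
    # Leaving the with-block waits for all submitted work to finish.
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # list() consumes the results, so an exception raised in a
        # worker is re-raised here instead of being silently dropped.
        return list(executor.map(export_data, file_names))

if __name__ == "__main__":
    start = time.perf_counter()
    finished = export_to_files(["out_file1", "out_file2"])
    print(finished, "in {:.2f}s".format(time.perf_counter() - start))
```

With two workers and two files, the elapsed time should be close to one sleep rather than two if the threads really overlap.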

Answer #2:

This illustrates a runnable version of your example code:

import time
import threading

def export_data(fileName):
    # runs the db2 database command to export tables to file
    while True:
        print('If I were the real function, I would be writing to ' + fileName)
        time.sleep(1)

thread1 = threading.Thread(target=export_data, args=[ 'out_file1' ])
thread2 = threading.Thread(target=export_data, args=[ 'out_file2' ])

thread1.start()
thread2.start()

thread1.join()
thread2.join()
Answered By: Joseph Dunn
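
Since the question says the queries are run from the db2 command line interface, each thread would presumably launch an external process. The sketch below shows that pattern under stated assumptions: the actual db2 invocation is not given in the question, so a child Python process that writes one line to the output file stands in for it.

```python
import subprocess
import sys
import threading

def export_data(out_file):
    # Hypothetical stand-in for the db2 CLI call: a child Python
    # process that writes a single line to out_file.
    cmd = [
        sys.executable, "-c",
        "import sys; open(sys.argv[1], 'w').write('exported\\n')",
        out_file,
    ]
    # The thread blocks here until the child exits; other threads keep running.
    subprocess.run(cmd, check=True)

def export_to_files(out_files):
    threads = [threading.Thread(target=export_data, args=(name,))
               for name in out_files]
    for thread in threads:
        thread.start()
    # join() blocks only the caller, so all children are already running.
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    export_to_files(["out_file1", "out_file2"])
```

Starting every thread before joining any of them is what lets the external commands overlap.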

Answer #3:

The code you have shown is fine. However, some code that we cannot see does use locking; the locking may even happen in the database itself.

Answered By: sanyi
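
The effect described in Answer #3 can be reproduced with a small sketch. Here shared_lock is a hypothetical lock standing in for whatever hidden locking might exist (in a library or in the database), and time.sleep stands in for the export work. Both names are assumptions for illustration only.

```python
import threading
import time

# Hypothetical lock standing in for locking hidden from the caller.
shared_lock = threading.Lock()

def export_data(delay, lock=None):
    if lock is not None:
        # Only one thread at a time can hold the lock, so the work is serialized.
        with lock:
            time.sleep(delay)
    else:
        time.sleep(delay)

def timed_run(lock=None, delay=0.2, count=2):
    # Start `count` threads, wait for all of them, return the elapsed seconds.
    threads = [threading.Thread(target=export_data, args=(delay, lock))
               for _ in range(count)]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    print("without lock: {:.2f}s".format(timed_run()))
    print("with lock:    {:.2f}s".format(timed_run(shared_lock)))
```

Both threads are started and joined the same way in each run; only the shared lock makes one of them sit idle while the other works.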
