python multithreading wait till all threads finished

Posted on

Question :

python multithreading wait till all threads finished

This may have been asked in a similar context but I was unable to find an answer after about 20 minutes of searching, so I will ask.

I have written a Python script (lets say: scriptA.py) and a script (lets say scriptB.py)

In scriptB I want to call scriptA multiple times with different arguments, each time takes about an hour to run, (its a huge script, does lots of stuff.. don’t worry about it) and I want to be able to run the scriptA with all the different arguments simultaneously, but I need to wait till ALL of them are done before continuing; my code:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

I want to do run all the subprocess.call() at the same time, and then wait till they are all done, how should I do this?

I tried to use threading like the example here:

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

But I do not think this is right.

How do I know they have all finished running before going to my do_finish()?

Answer #1:

You need to use join method of Thread object in the end of the script.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Thus the main thread will wait till t1, t2 and t3 finish execution.

Answered By: Inbar Rose

Answer #2:

Put the threads in a list and then use the Join method

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
Answered By: Maksim Skurydzin

Answer #3:

In Python3, since Python 3.2 there is a new approach to reach the same result, that I personally prefer to the traditional thread creation/start/join, package concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html

Using a ThreadPoolExecutor the code would be:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

The output of the previous code is something like:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

One of the advantages is that you can control the throughput setting the max concurrent workers.

Answered By: Aaron Digulla

Answer #4:

I prefer using list comprehension based on an input list:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Answered By: Roberto

Answer #5:

You can have class something like below from which you can add ‘n’ number of functions or console_scripts you want to execute in parallel passion and start the execution and wait for all jobs to complete..

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
Answered By: Adam Matan

Answer #6:

From the threading module documentation

There is a “main thread” object; this corresponds to the initial
thread of control in the Python program. It is not a daemon thread.

There is the possibility that “dummy thread objects” are created.
These are thread objects corresponding to “alien threads”, which are
threads of control started outside the threading module, such as
directly from C code. Dummy thread objects have limited functionality;
they are always considered alive and daemonic, and cannot be join()ed.
They are never deleted, since it is impossible to detect the
termination of alien threads.

So, to catch those two cases when you are not interested in keeping a list of the threads you create:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Whereupon:

>>> print(data)
[0, 4, 12, 40]
Answered By: PBD

Answer #7:

Maybe, something like

for t in threading.enumerate():
    if t.daemon:
        t.join()
Answered By: berna1111

Answer #8:

I just came across the same problem where I needed to wait for all the threads which were created using the for loop.I just tried out the following piece of code.It may not be the perfect solution but I thought it would be a simple solution to test:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise
Answered By: jno

Leave a Reply

Your email address will not be published. Required fields are marked *