Handling and Sharing Data Between Threads (2024)

When working with threads in Python, you will find very useful to beable to share data between different tasks. One of the advantages ofthreads in Python is that they share the same memory space, and thusexchanging information is relatively easy. However, some structures canhelp you achieve more specific goals.

In the previous article, we have covered how to start and synchronizethreads and now it is time toexpand the toolbox to handle the exchange of information between them.

Shared Memory

The first and most naive approach is to use the same variables indifferent threads. We have already used this feature in the previoustutorial, but without discussingit explicitly. Let's see how we can use shared memory through a verysimple example:

from threading import Thread, Eventfrom time import sleepevent = Event()def modify_variable(var): while True: for i in range(len(var)): var[i] += 1 if event.is_set(): break sleep(.5) print('Stop printing')my_var = [1, 2, 3]t = Thread(target=modify_variable, args=(my_var, ))t.start()while True: try: print(my_var) sleep(1) except KeyboardInterrupt: event.set() breakt.join()print(my_var)

The example above is almost trivial, but it has a very importantfeature. We start a new thread by passing an argument, my_var, whichis a list of numbers. The thread will increase the values of the numbersby one, with a certain delay. In this example we use events tograciously finish the thread, if you are not familiar with them, checkthe previous tutorial.

The important piece of code in this example is the print(my_var) line.That print statement lives in the main thread, however, it has access tothe information being generated within a child thread. This behavior ispossible thanks to memory sharing between different threads. Being ableto access the same memory space is useful, but it can also pose somerisks. In the example above, we have started only one thread, but we arenot limited to that. We could, for example, start several threads:

t = Thread(target=modify_variable, args=(my_var, ))t2 = Thread(target=modify_variable, args=(my_var, ))t.start()t2.start()

And you would see that my_var and its information is shared across allthreads. This is good for applications like the one above, in which itdoesn't matter which thread adds one to the variable. Or does it? Let'sslightly modify the code that runs in the thread. Let's remove thesleep:

def modify_variable(var): while True: for i in range(len(var)): var[i] += 1 if event.is_set(): break # sleep(.5) print('Stop printing')

Now, when we run the code, there will be no sleep in between oneiteration and the next. Let's run it for a short time, let's say 5seconds, we can do the following:

from time import time[...]my_var = [1, 2, 3]t = Thread(target=modify_variable, args=(my_var, ))t.start()t0 = time()while time()-t0 < 5: print(my_var) sleep(1)event.set()t.join()print(my_var)

I've suppressed the parts of the code which repeat. If you run thiscode, you will get as outputs very large numbers. In my case, I got:

[6563461, 6563462, 6563463]

There is, however, a very important feature to notice. The three numbersare consecutive. This is expected because the starting variable was[1, 2, 3] and we are adding one to each variable. Let's start a secondthread this time and see what the output is:

my_var = [1, 2, 3]t = Thread(target=modify_variable, args=(my_var, ))t2 = Thread(target=modify_variable, args=(my_var, ))t.start()t2.start()t0 = time()while time()-t0 < 5: try: print(my_var) sleep(1) except KeyboardInterrupt: event.set() breakevent.set()t.join()t2.join()print(my_var)

I've got as an output the following values:

[5738447, 5686971, 5684220]

You can first note that they are not larger than before, meaning thatrunning two threads instead of one could actually be slower for thisoperation. The other thing to note is that the values are no consecutiveto each other! And this is a very important behavior that can appearwhen working with multiple threads in Python. If you think really hard,can you explain where this issue is coming from?

In the previous tutorial, wediscussed that threads are handled by the operating system, whichdecides when to spin one on or off. We have no control over what theoperating system decides to do. In the example above, since there is nosleep in the loop, the operating system will have to decide when tostop one and start another thread. However, that does not explaincompletely the output we are getting. It doesn't matter if one threadruns first and stops, etc. we are always adding +1 to each element.

The problem with the code above is in the line var[i] += 1, which isactually two operations. First, it copies the value from var[i] andads 1. Then it stores the value back to var[i]. In between these twooperations, the operating system may decide to switch from one task toanother. In such case, the value both tasks see in the list is the same,and therefore instead of adding +1 twice, we do it only once. If youwant to do it even more noticeable, you can start two threads, one thatadds and one that subtracts from a list, and that would give you a quickhint of which thread runs faster. In my case, I got the followingoutput:

[-8832, -168606, 2567]

But if I run it another time, I get:

[97998, 133432, 186591]

Note

You may notice that there is a delay between the start of boththreads, which may give a certain advantage to the first thread started.However, that alone cannot explain the output generated.

How to synchronize data access

To solve the problem we found in the previous examples, we have to besure that no two threads try to write at the same time to the samevariable. For that, we can use a Lock:

from threading import Lock[...]data_lock = Lock()def modify_variable(var): while True: for i in range(len(var)): with data_lock: var[i] += 1 if event.is_set(): break # sleep(.5) print('Stop printing')

Note that we added a line with data_lock: to the function. If you runthe code again, you will see that the values we get are alwaysconsecutive. The lock guarantees that only one thread will access thevariable at a time.

The examples of increasing or decreasing values from a list are almosttrivial, but they point in the direction of understanding thecomplications of memory management when dealing with concurrentprogramming. Memory sharing is a nice feature, but it comes with risksalso.

Queues

One of the common situations in which threads are used is when you havesome slow tasks that you can't optimize. For example, imagine you aredownloading data from a website using. Most of the time the processorwould be idle. This means you could use that time for something else. Ifyou want to download an entire website (also called scraping), it wouldbe a good solution to download several pages at the same time. Imagineyou have a list of pages you want to download, and you start severalthreads, each one to download one page. If you are not careful on how toimplement this, you may end up downloading twice the same, as we saw inthe previous section.

Here is where another object can be very useful when working withthreads: Queues. A queue is an object which accepts data in order,i.e. you put data to it one element at a time. Then, the data can beconsumed in the same order, called First-in-first-out (FIFO). A verysimple example would be:

from queue import Queuequeue = Queue()for i in range(20): queue.put(i)while not queue.empty(): data = queue.get() print(data)

In this example you see that we create a Queue, then we put into thequeue the numbers from 0 to 19. Later, we create a while loop thatgets data out of the queue and prints it. This is the basic behavior ofqueues in Python. You should pay attention to the fact that numbers areprinted in the same order in which they were added to the queue.

Coming back to the examples from the beginning of the article, we canuse queues to share information between threads. We can modify thefunction such that instead of a list as an argument, it accepts a queuefrom which it will read elements. Then, it will output the results to anoutput queue:

from threading import Thread, Eventfrom queue import Queuefrom time import sleep, timeevent = Event()def modify_variable(queue_in, queue_out): while True: if not queue_in.empty(): var = queue_in.get() for i in range(len(var)): var[i] += 1 queue_out.put(var) if event.is_set(): break print('Stop printing')

To use the code above, we will need to create two queues. The idea isthat we can also create two threads, in which the input and output queueare reversed. In that case, on thread puts its output on the queue ofthe second thread and the other way around. This would look like thefollowing:

my_var = [1, 2, 3]queue1 = Queue()queue2 = Queue()queue1.put(my_var)t = Thread(target=modify_variable, args=(queue1, queue2))t2 = Thread(target=modify_variable, args=(queue2, queue1))t.start()t2.start()t0 = time()while time()-t0 < 5: try: sleep(1) except KeyboardInterrupt: event.set() breakevent.set()t.join()t2.join()if not queue1.empty(): print(queue1.get())if not queue2.empty(): print(queue2.get())

In my case, the output I get is:

[871, 872, 873]

Much smaller than everything else we have seen so far, but at least wemanaged to shared data between two different threads, without anyconflicts. Where does this slow speed come from? Let's try with thescientific approach which is to split the problem and look at each part.One of the most interesting things is that we are checking whether thequeue is empty before trying to run the rest of the code. We can monitorhow much time it is actually spent running the important part of ourprogram:

def modify_variable(queue_in: Queue, queue_out: Queue): internal_t = 0 while True: if not queue_in.empty(): t0 = time() var = queue_in.get() for i in range(len(var)): var[i] += 1 queue_out.put(var) internal_t += time()-t0 if event.is_set(): break sleep(0.1) print(f'Running time: {internal_t} seconds\n')

The only changes are the addition of a new variable in the function,called internal_t. Then, we monitor the time spent calculating andputting to the new thread. If we run the code again, the output youshould get is something like:

Running time: 0.0006377696990966797 secondsRunning time: 0.0003573894500732422 seconds

This means that out of the 5 seconds in which our program runs, onlyduring about .9 milliseconds we are actually doing something. This is.01% of the time! Let's quickly see what happens if we change the codefor using only one queue instead of two, i.e. the input and output queuewould be the same:

t = Thread(target=modify_variable, args=(queue1, queue1))t2 = Thread(target=modify_variable, args=(queue1, queue1))

With just that change, I've got the following output:

Running time: 4.290639877319336 secondsRunning time: 4.355865955352783 seconds

That is much better! For the about of 5 seconds in which the programruns, the threads run for a total of 8 seconds. Which is what one wouldexpect of parallelizing. Also, the output of the loops is much larger:

[710779, 710780, 710781]

Can you try to guess what made our program so slow if we use two queuesbut reasonably fast if we use the same queue for output and input? Youhave to remember that when you use threads blindly as we have done inthe previous example, we leave everything in the hands of the operatingsystem.

We have no control of whether the OS decides to switch from a task toanother. In the code above, we check whether the queue is empty. It mayvery well be that the operating system decides to give priority to atask which is basically not doing anything, but waiting until there isan element in the queue. If this happens out of synchronization, most ofthe time the program will be just waiting to have an element in thequeue (it is always prioritizing the wrong task). While when we use thesame task for input and output, it doesn't matter which task it runs,there will always be something to proceed.

If you want to see whether the previous speculation is true or not, wecan measure it. We have only one if statement to checkqueue.empty(), we can add an else to accumulate the time the programis actually not doing anything:

def modify_variable(queue_in: Queue, queue_out: Queue): internal_t = 0 sleeping_t = 0 while True: if not queue_in.empty(): t0 = time() var = queue_in.get() for i in range(len(var)): var[i] += 1 queue_out.put(var) internal_t += time()-t0 else: t0 = time() sleep(0.001) sleeping_t += time()-t0 if event.is_set(): break sleep(0.1) print(f'Running time: {internal_t} seconds') print(f'Sleeping time: {sleeping_t} seconds')

In the code above, if the queue is empty, the program will sleep for 1millisecond. Of course, this is not the best, but we can assume that 1millisecond will have no real impact on the overall performance of theprogram. When I run the program above, using two different queues I getthe following output:

Running time: 0.0 secondsSleeping time: 5.001126289367676 secondsRunning time: 0.00018215179443359375 secondsSleeping time: 5.001835107803345 seconds[4126, 4127, 4128]

Where it is clear that most of the time the program is just waitinguntil more data is available on the queue. Since we are sleeping for 1ms every time there is no data available, we are actually making theprogram much slower. But I think it is a good example. We can compare itwith using the same queue for input and output:

Running time: 3.1206254959106445 secondsSleeping time: 1.3756272792816162 secondsRunning time: 3.253162145614624 secondsSleeping time: 1.136244535446167 seconds

Now you see that even if we are wasting some time because of the sleep,most of the time our routine is actually performing a calculation.

The only thing you have to be careful when using the same queue forinput and output is that between checking whether the queue is empty andactually reading from it, it could happen that the other thread grabbedthe result. This is described in the Queuedocumentation.Unless we include a Lock ourselves, the Queue can be read and writtenby any threads. The Lock only comes into effect for the get or putcommands.

Extra Options of Queues

Queues have some extra options, such as the maximum number of elementsthey can hold. You can also define LIFO (last-in, first-out) typesof queues, which you can read about in thedocumentation.What I find more useful about Queues is that they are written in purePython. If you visit their sourcecode, you canlearn a lot about synchronization in threads, customexceptions, anddocumenting.

What is important to note, is that when you work with multiple Threads,sometimes you want to wait (i.e. block the execution), sometimes youdon't. In the examples above, we have always been checking whether theQueue was empty before reading from it. But what happens if we don'tcheck it? The method get has two options: block and timeout. Thefirst is used to determine whether we want the program to wait until anelement is available. The second is to specify the number of seconds wewant it to wait. After that amount of time, an exception is raised. Ifwe set block to false, and the queue is empty, the exception is raisedimmediately.

We can change the function modify_variable to take advantage of this:

def modify_variable(queue_in: Queue, queue_out: Queue): internal_t = 0 while True: t0 = time() var = queue_in.get() for i in range(len(var)): var[i] += 1 queue_out.put(var) internal_t += time()-t0 if event.is_set(): break sleep(0.1) print(f'Running time: {internal_t} seconds\n')

With this code, using different queues for input and output, I get thefollowing:

Running time: 4.914130210876465 secondsRunning time: 4.937211513519287 seconds[179992, 179993, 179994]

Which is much better than what we were getting before. But, this is notreally fair. A lot of time is spent just waiting in the get function,but we are still counting that time. If we move the line oft0 = time() right below the get, the times the code is actuallyrunning are very different:

Running time: 0.7706246376037598 secondsRunning time: 0.763786792755127 seconds[177807, 177808, 177809]

So now you see, perhaps we should have calculated the time differentlyalso in the previous examples, especially when we were using the samequeue for input and output.

If we don't want to program to block while waiting for a get, we can dothe following:

from queue import Empty[...] try: var = queue_in.get(block=False) except Empty: continue

Or, we could specify a timeout, like this:

try: var = queue_in.get(block=True, timeout=0.001)except Empty: continue

In that case, we either don't wait (block==False) and we catch theexception, or we wait for up to 1 millisecond (timeout=0.001) and wecatch the exception. You can play around with these options to see ifthe performance of your code changes in any way.

Queues to Stop Threads

Up to now, we have always used locks to stop threads, which is, Ibelieve, a very elegant way of doing it. However, there is anotherpossibility, which is to control the flow of threads by appendingspecial information to queues. A very simple example would be to add anelement None to a queue, and when the function gets it, it stops theexecution. The code would look like this:

[...]var = queue_in.get()if var is None: break

And then, in the main part of the script, when we want to stop thethreads, we do the following:

queue1.put(None)queue2.put(None)

If you are wondering why you would choose one or the other option, theanswer is actually quite straightforward. The examples we are workingwith, always have queues with 1 element at most. When we stop theprogram, we know everything in the queue has been processed. Imagine,however, that the program is processing a collection of elements, withno relation between each other. This would be the case if you would bedownloading data from a website, for example, or processing images, etc.You want to be sure you finish processing everything before stopping thethread. In such a case, adding a special value to the queue guaranteesthat all elements will be processed.

Warning

it is a very wise idea to be sure a queue is empty after you stop usingit. If, as before, you interrupt the thread by looking at the status ofa lock, the queue may be left with a lot of data in it, and thus thememory will not be freed. A simple while-loop that gets all the elementsof a queue solves it.

IO Bound threads

The examples in this article are computationally intensive, and thusthey are right on the edge where using multi-threading is not applicableand where all the problems arise (such as concurrency, etc.) We havefocused on the limits of multi-threading because if you understand them,you will program with much more confidence. You won't be on your toeshoping for a problem not to arise.

An area where multi-threading excels is in IO (input-output) tasks. Forexample, if you have a program which writes to the hard drive while itis doing something else, the writing to the hard drive can be safelyoffloaded to a separate thread, while the rest of the program keepsrunning. This is also valid if the program waits for user input ornetwork resources to become available, downloads data from the internet,etc.

Example downloading websites

To close this article, let's see an example of downloading websitesusing threadings, queues, and locks. Even if some performanceimprovements are possible, the example will show the basic buildingblocks of almost any threading application of interest.

First, let's discuss what we want to achieve. To keep the examplesimple, we will download all the websites on a list, and we want to savethe downloaded information to the hard drive. The first approach wouldbe to create a for-loop that goes through the list. This code can befound on the Githubrepository.However, we would like to work with multiple threads.

The architecture we propose therefore is: One Queue that hosts thewebsites we want to download, one queue that hosts the data to be saved.Some threads going to the websites to download, and each one outputs thedata to the other queue. Some threads which read the latter queue andsave the data to disk, taking care of not overwriting files. The moduleswe are going to use for this example are:

import osfrom queue import Queuefrom threading import Lock, Threadfrom urllib import request

Note that we are using urllib to downloading data. We then create thequeues and the lock we are going to use:

website_queue = Queue()data_queue = Queue()file_lock = Lock()

Now we can proceed to define the functions which will run on separatedthreads. For downloading data:

def download_data(): while True: var = website_queue.get() if var is None: break response = request.urlopen(var) data = response.read() data_queue.put(data)

Here you see that we used the strategy of checking whether the queue hasa special element, to be sure that we processed all the websites on thequeue before stopping the thread. We download the data from the websiteand we put it on another queue to be later processed.

The saving requires a bit more care because we have to be sure that notwo threads try to write to the same file:

def save_data(): while True: var = data_queue.get() if var is None: break with file_lock: i = 0 while os.path.exists(f'website_data_{i}.dat'): i += 1 open(f'website_data_{i}.dat', 'w').close() with open(f'website_data_{i}.dat', 'wb') as f: f.write(var)

The approach is similar to the downloading of data. We wait until aspecial element is present to stop the thread. Then we acquire a lock tobe sure no other thread is looking at the available files to write to.The loop just checks which file number is available. We have to use alock here because there is a change two threads run the same lines atthe same time and find the available file to be the same.

When we write to the file, we don't care about the lock, because we knowthat only one thread will write to each file. That is why we create thefile on one line, while the lock is acquired:

open(f'website_data_{i}.dat', 'w').close()

But we write the data on a separate line, without the lock:

with open(f'website_data_{i}.dat', 'wb') as f: f.write(var)

This may seem too convoluted for our purposes, and it is true. However,it shows one possible approach in which several threads could be writingto the hard drive at the same time because they are writing to differentfiles. Note that we have used wb for the opening of the file. The wis because we want to write to the file (not append), and the bbecause the result of reading the response is binary and not a string.Then, we need to trigger the threads we want to download and save thedata. First, we create a list of websites we want to download. In thiscase, Wikipedia homepages in different languages:

website_list = [ 'https://www.wikipedia.org/', 'https://nl.wikipedia.org/', 'https://de.wikipedia.org/', 'https://fr.wikipedia.org/', 'https://pt.wikipedia.org/', 'https://it.wikipedia.org', 'https://ru.wikipedia.org', 'https://es.wikipedia.org', 'https://en.wikipedia.org', 'https://ja.wikipedia.org', 'https://zh.wikipedia.org',]

And then we prepare the queues and trigger the threads:

for ws in website_list: website_queue.put(ws)threads_download = []threads_save = []for i in range(3): t = Thread(target=download_data) t.start() threads_download.append(t) t2 = Thread(target=save_data) t2.start() threads_save.append(t2)

With this, we create lists with the threads running for saving anddownloading. Of course, the numbers could have been different. Then, weneed to be sure we stop the downloading threads:

for i in range(3): website_queue.put(None)

Since we run 3 threads for downloading data, we have to be sure weappend 3 None to the Queue, or some thread won't stop. After we aresure the downloading finished, we can stop the saving:

for t in threads_download: t.join()for i in range(3): data_queue.put(None)

And then we wait for the saving to finish:

for t in threads_save: t.join()print(f'Finished downloading {len(website_list)} websites')

Now we know all the threads have finished and the queues are empty. Ifyou run the program, you can see the list of 10 files created, with theHTML of 10 different Wikipedia homepages.

Conclusions

In the previous article, we haveseen how you can use threading to run different functions at the sametime, and some of the most useful tools you have available to controlthe flow of different threads. In this article we have discussed how youcan share data between threads, exploiting both the fact of the sharedmemory between threads and by using queues.

Having access to shared memory makes programs very quick to develop, butthey can give rise to problems when different threads arereading/writing to the same elements. This was discussed at thebeginning of the article, in which we explored what happens when using asimple operator such as =+ to increase the values of an array by 1.Then we explored how to use Queues to share data between threads, bothbetween the main thread and child threads as between child threads.

To finish, we have shown a very simple example of how to use threads todownload data from a website and save it to disk. The example is verybasic, but we will expand it in the following article. Other IO(input-output) tasks that can be explored are acquisition fromdevices such as a camera, waitingfor user input, reading from disk,etc.

Header Illustration by TsvetelinaStoynova

Handling and Sharing Data Between Threads (2024)
Top Articles
Latest Posts
Article information

Author: Reed Wilderman

Last Updated:

Views: 6310

Rating: 4.1 / 5 (52 voted)

Reviews: 83% of readers found this page helpful

Author information

Name: Reed Wilderman

Birthday: 1992-06-14

Address: 998 Estell Village, Lake Oscarberg, SD 48713-6877

Phone: +21813267449721

Job: Technology Engineer

Hobby: Swimming, Do it yourself, Beekeeping, Lapidary, Cosplaying, Hiking, Graffiti

Introduction: My name is Reed Wilderman, I am a faithful, bright, lucky, adventurous, lively, rich, vast person who loves writing and wants to share my knowledge and understanding with you.