Synchronized queue implementations provided by the Python Standard Library

The queue module:

  • The queue module in the Python Standard Library provides several queue classes. All of these queue variants are thread-safe, although most of their operations are not reentrant (the documentation notes the C implementation of SimpleQueue.put() as an exception). They can be shared between producer and consumer threads without any additional synchronization code, because the locking is built into the queue implementations.
  • The queue classes provided by the queue module are SimpleQueue, Queue, LifoQueue and PriorityQueue.
  • The Python example provided here uses instances of the queue.Queue class, which is a FIFO (first-in, first-out) queue.
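
The difference between the queue classes is the order in which items come back out. The following is a minimal single-threaded sketch of that difference; the helper name drain is hypothetical and not part of the queue module.

```python
import queue

def drain(q):
    """Remove and return every item currently in q, in retrieval order.

    Checking empty() is only reliable here because a single thread is
    using the queue.
    """
    items = []
    while not q.empty():
        items.append(q.get())
    return items

fifo = queue.Queue()            # first in, first out
lifo = queue.LifoQueue()        # last in, first out
prio = queue.PriorityQueue()    # smallest item first
simple = queue.SimpleQueue()    # unbounded FIFO with a smaller API

# Put the same items into every queue, in the same order
for item in (3, 1, 2):
    fifo.put(item)
    lifo.put(item)
    prio.put(item)
    simple.put(item)

fifo_order = drain(fifo)
lifo_order = drain(lifo)
prio_order = drain(prio)
simple_order = drain(simple)

print("Queue:        ", fifo_order)
print("LifoQueue:    ", lifo_order)
print("PriorityQueue:", prio_order)
print("SimpleQueue:  ", simple_order)
```

Queue and SimpleQueue return the items in insertion order, LifoQueue returns them in reverse insertion order, and PriorityQueue returns them in sorted order.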

Producer-Consumer pattern:

  • The producer threads put messages into a shared queue. In the Python example provided here, this queue is named rawQueue.
  • The consumer threads remove the raw messages from the shared queue, transform them, and put the results into a second queue. In the Python example, this second queue is named transformedQueue.
  • The raw queue is shared between multiple producer threads, which put raw messages into it.
  • The raw queue is also shared between multiple consumer threads, which remove messages from it and process them.
  • The transformed queue is shared between the consumer threads, which write the transformed messages to it.
  • This arrangement can be extended into a workflow, in which one group of threads works on one kind of activity and another group of threads works on the next. The queue shared between two adjacent groups carries the data from one activity to the next.
  • In the final activity, the shared queue is written only by the threads that belong to that last activity. There is no further set of consumer threads beyond this terminal point of the workflow.

[Figure: Producer-consumer pattern using threads and queues]
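
The workflow described above can be sketched as a small pipeline in which each group of threads is connected to the next by a shared queue. This is only an illustration under stated assumptions: the names STOP, produce, transform and run_pipeline are hypothetical, and the STOP sentinel is one common way to tell consumers to exit, not something the queue module provides.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel: tells one consumer thread to exit

def produce(raw_queue, count):
    # First activity: put raw messages into the shared raw queue
    for i in range(count):
        raw_queue.put("Raw %d" % i)

def transform(raw_queue, transformed_queue):
    # Second activity: read raw messages until a STOP sentinel arrives
    while True:
        message = raw_queue.get()
        if message is STOP:
            break
        transformed_queue.put("%s transformed" % message)

def run_pipeline(producer_count=2, consumer_count=2, per_producer=5):
    raw_queue = queue.Queue()
    transformed_queue = queue.Queue()

    producers = [threading.Thread(target=produce, args=(raw_queue, per_producer))
                 for _ in range(producer_count)]
    consumers = [threading.Thread(target=transform,
                                  args=(raw_queue, transformed_queue))
                 for _ in range(consumer_count)]

    for thread in producers + consumers:
        thread.start()

    # Once every producer has finished, queue one STOP per consumer.
    # The queue is FIFO, so the sentinels come out after all raw messages.
    for thread in producers:
        thread.join()
    for _ in consumers:
        raw_queue.put(STOP)
    for thread in consumers:
        thread.join()

    # Terminal point of the workflow: no consumer threads remain, so the
    # main thread can safely drain the transformed queue.
    results = []
    while not transformed_queue.empty():
        results.append(transformed_queue.get())
    return results

results = run_pipeline()
print(len(results), "messages transformed")
```

Because the consumers loop until they see a sentinel, the number of messages each consumer handles no longer has to be fixed in advance.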

Example:

# Example Python program that runs two producer threads and two consumer
# threads to transform messages from one queue (the raw queue) and write
# them to another queue (the transformed queue)
import queue
import threading

# Function run by each producer thread: puts 10 raw messages into the raw queue
def producerThreadFx(rawQueue):
    for i in range(10):
        rawQueue.put("Raw", timeout=5)
        # qsize() is only approximate while other threads use the queue
        print("PT:PQ:%d, ThreadId: %d " % (rawQueue.qsize(), threading.get_native_id()))

# Function run by each consumer thread: transforms 10 raw messages
def consumerThreadFx(rawQueue, transformedQueue):
    for i in range(10):
        # Transform the message read from the raw queue
        message = rawQueue.get()
        tmessage = "%s transformed" % message

        # Write the transformed message to the transformed queue
        transformedQueue.put(tmessage, timeout=5)

        print("CT:PQ:%d, ThreadId:%d" % (rawQueue.qsize(), threading.get_native_id()))
        print("CT:TQ:%d, ThreadId:%d" % (transformedQueue.qsize(), threading.get_native_id()))

# Create a queue for raw messages
rawQueue = queue.Queue()

# Create a queue for transformed messages
transformedQueue = queue.Queue()

# Create two producer threads
producerThread1 = threading.Thread(target=producerThreadFx, args=(rawQueue,))
producerThread2 = threading.Thread(target=producerThreadFx, args=(rawQueue,))

# Create two consumer threads
consumerThread1 = threading.Thread(target=consumerThreadFx, args=(rawQueue, transformedQueue))
consumerThread2 = threading.Thread(target=consumerThreadFx, args=(rawQueue, transformedQueue))

# Start the producer and consumer threads
producerThread1.start()
producerThread2.start()

consumerThread1.start()
consumerThread2.start()

# Wait for all the threads to complete
producerThread1.join()
producerThread2.join()
consumerThread1.join()
consumerThread2.join()

 

Output:

PT:PQ:1, ThreadId: 116127
PT:PQ:2, ThreadId: 116127
PT:PQ:3, ThreadId: 116127
...
...
PT:PQ:19, ThreadId: 116128
PT:PQ:20, ThreadId: 116128
CT:PQ:19, ThreadId:116129
CT:TQ:1, ThreadId:116129
CT:PQ:18, ThreadId:116130
CT:PQ:17, ThreadId:116129
CT:TQ:3, ThreadId:116130
...
...
CT:PQ:2, ThreadId:116129
CT:PQ:1, ThreadId:116130
CT:TQ:19, ThreadId:116129
CT:TQ:19, ThreadId:116130
CT:PQ:0, ThreadId:116130
CT:TQ:20, ThreadId:116130
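
The example passes timeout = 5 to put(), but both queues are created without a maxsize, so they are unbounded and put() never has to wait. The timeout only matters when the queue is bounded. The following is a minimal sketch of that behaviour, assuming a hypothetical queue named bounded with maxsize=2 and short timeouts so that it finishes quickly.

```python
import queue

# A bounded queue: put() blocks once two items are waiting
bounded = queue.Queue(maxsize=2)
bounded.put("Raw", timeout=0.1)
bounded.put("Raw", timeout=0.1)

# The queue is full and nothing is consuming, so the third put() waits
# for the timeout and then raises queue.Full
try:
    bounded.put("Raw", timeout=0.1)
    third_put_succeeded = True
except queue.Full:
    third_put_succeeded = False

# get() accepts a timeout as well and raises queue.Empty when it expires
bounded.get()
bounded.get()
try:
    bounded.get(timeout=0.1)
    got_item = True
except queue.Empty:
    got_item = False

print("third put succeeded:", third_put_succeeded)
print("get on empty queue succeeded:", got_item)
```

A bounded queue is one way to keep fast producers from running far ahead of slow consumers, at the cost of having to handle queue.Full and queue.Empty.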


Copyright 2019 © pythontic.com