TopologicalSorter of graphlib in Python

Overview:

  • TopologicalSorter sorts the nodes of a Directed Acyclic Graph(DAG). 
  • Sorting places the origin node of a directed edge first, followed by the ending node of the edge. In this fashion, the TopologicalSorter sorts all the nodes of a DAG. The process is started at the first predecessor(s).
  • A DAG can be added to a TopologicalSorter object in following ways:
    • By passing the DAG as a Python dictionary in the constructor
    • By calling add() method on a TopologicalSorter object
    • Using the constructor and add() method
  • To get the nodes of a DAG in topological order the method static_sort() is called on the TopologicalSorter instance.

Directed Acyclic Graph

  • Like many other concepts, the DAG is also a mathematical concept which has several applications in real world. When a DAG is thought of as a workflow, each node in the DAG can be visualised as a piece of work. While some of those pieces have one or more dependencies others may not.
  • The pieces of work that do not have dependencies have to be completed first for the workflow to make progress. As the workflow progresses, more and more pieces get to their completion and more pieces of work are ready for execution. In the TopologicalSorter class, the get_ready() method returns the nodes whose predecessors (the dependencies in the workflow parlance) have been already visited.
  • The done() method marks the nodes as visited, which is equivalent to marking a piece of work as done.
  • When all the pieces of work are complete the workflow does not have any other work to perform the workflow is inactive. The method is_active() of the TopologicalSorter class returns this state. It returns True when the DAG is having one or more unvisited nodes (pieces of work yet to be complete in the workflow parlance) and returns False otherwise. 
  • The design and the implementation of the TopologicalSorter class through the interface(in OO sense) of the methods get_ready(), done(), is_active() enables getting the topologicalordering done through multiple threads or through any other parallel execution mechanisms.

Example 1:

import graphlib

graph = {"V4": {"V3", "B3"}, 
         "V3": {"V2"}, 
         "V2": {"V1"},
         "B3": {"B2"}, 
         "B2": {"B1"}, 
         "B1": {"V1"}
         }
ts = graphlib.TopologicalSorter(graph)
sortedNodes  = list(ts.static_order())
print("DAG nodes in topological order:")
print(sortedNodes)

Output:

DAG nodes in topological order:

['V1', 'V2', 'B1', 'V3', 'B2', 'B3', 'V4']

Example 2: Multithreaded topological sorting:

# A Python example doing parallel sorting
# of the nodes of a DAG in topological order
import threading
import queue
from graphlib import TopologicalSorter

# The queues 
workQueue = queue.Queue()
doneQueue = queue.Queue()

# Worker thread function
def worker():
        while True:
            workPiece = workQueue.get()
            workQueue.task_done() 
            doneQueue.put(workPiece)

# Create and start the worker thread
threading.Thread(target=worker, daemon=True).start()

# Define a workflow as a DAG using a Python dictionary
workflow = {"D": {"B", "C"}, 
            "C": {"A"}, 
            "B": {"A"}}
topologicalSorter = TopologicalSorter(workflow)
topologicalSorter.prepare()

# Loop till the workflow is alive
while topologicalSorter.is_active():
    # Get a piece of work that is ready for execution
    for workPiece in topologicalSorter.get_ready():
        workQueue.put(workPiece)

    # Mark the completed workpiece as done
    node = doneQueue.get()
    topologicalSorter.done(node)
    print(node)

# Wait for the worker thread to complete
workQueue.join()

print(topologicalSorter.is_active())

Output:

A

C

B

D

False

 


Copyright 2024 © pythontic.com