Free cookie consent management tool by TermsFeed cancel method of asyncio task | Pythontic.com

Cancel method of asyncio task

Overview:

  • A Task is created and scheduled for its execution through the asyncio.create_task() function.

  • Once scheduled, a Task can be requested for cancellation through task.cancel() method.

  • If successful, a CancelledError is thrown into the coroutine corresponding to the Task.

Example:

# Example Python program that makes a cancellation
# request on a scheduled task
from datetime import date, datetime
import asyncio

# Define a coroutine
async def alarm():
    try:
        print("Task entered:%s"%datetime.now())
        for i in range (1, 10):
            print("Beep...%d"%i)
            await asyncio.sleep(1)
        print(date.today())    
    except asyncio.CancelledError as e:
        print("Stopping the alarm")
    finally:
        print("Alarm stopped")
        print("Task cancelled:%s"%datetime.now())


# A coroutine for creating and cancelling tasks
async def PerformTasks(cancelAfter):
    # Create and schedule a task for execution
    t1 = asyncio.create_task(alarm())
    await asyncio.sleep(cancelAfter)

    # Make a cancellation request
    t1.cancel()

    # Wait for the cancellation of t1 to be complete
    await t1
    print("Hello world")

# Create a new event loop and set to asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Run the coroutine containing tasks
cancelAfter = 5
asyncio.run(PerformTasks(cancelAfter))

 

Output:

Task entered:2022-05-06 12:05:12.124871

Beep...1

Beep...2

Beep...3

Beep...4

Beep...5

Stopping the alarm

Alarm stopped

Task cancelled:2022-05-06 12:05:17.130425

Hello world

 


Copyright 2025 © pythontic.com