Free cookie consent management tool by TermsFeed get_coro() method of asyncio.Task class | Pythontic.com

Get_coro() method of asyncio.Task class

Overview:

  • The method get_coro() returns the coroutine object wrapped by an asyncio.Task object.

  • In Python, a coroutine object cannot be reused once it has exited. The example below compares the coroutine object returned by the get_coro() and checks whether it is indeed the original coroutine object passed initially to the asyncio.create_task() function.

Example:

# Example Python program that retrieves
# the wrapped coroutine from an asyncio.Task
import asyncio

tickerOn = True

# Coroutine to be wrapped as an asyncio.Task 
async def ticker():
    try:
        global tickerOn    
        while(tickerOn):
            print("tick")
            await asyncio.sleep(1)    
    except asyncio.CancelledError:
        print("Ticker being stopped")
        tickerOn = False
        raise
    finally:
        print("Ticker stopped...")
        tickerOn = False

# Coroutine that creates the asyncio.Task
async def main(ticker1):
    try:
        # Create a task and schedule it
        tickerTask = asyncio.create_task(ticker1)

        print("Making sure create_task() used the same coroutine instance...")
        print("Result:%s"%(ticker1 == tickerTask.get_coro()))
        await asyncio.sleep(2)
        
        # Cancel the task after sometime
        tickerTask.cancel()
        await tickerTask

    except asyncio.CancelledError:
        print("Ticker stopped.")

# Create an event loop and run the task
eventloop = asyncio.new_event_loop()
asyncio.set_event_loop(eventloop)

ticker1 = ticker()
asyncio.run(main(ticker1))

 

Output:

Making sure create_task() used the same coroutine instance...

Result:True

tick

tick

Ticker being stopped

Ticker stopped...

Ticker stopped.


Copyright 2025 © pythontic.com