ruk·si

๐Ÿ Python
๐Ÿ”„ asyncio (old)

Updated at 2024-04-14 12:29

asyncio introduces concurrency with coroutines and event loops. It was introduced into the standard library in Python 3.4.

Python 3.8+ asyncio uses the async/await syntax, not this decorator syntax (the @asyncio.coroutine decorator was deprecated in Python 3.8 and removed in Python 3.11).

import asyncio
import numbers
import typing


@asyncio.coroutine
def progress_logger(
        message: str,
        interval: numbers.Real = .5
) -> typing.Coroutine:
    """ Prints the given message on the given interval.

    Keeps printing until the task running this coroutine is cancelled.

    :param message: text printed on every tick.
    :param interval: seconds to sleep between prints; may be fractional,
        which is why it is annotated as a real number and not an integer.
    """
    while True:
        print(message)
        try:
            yield from asyncio.sleep(interval)
        except asyncio.CancelledError:
            # cancellation is thrown in at the yield; leave the loop quietly
            break


@asyncio.coroutine
def slow_function() -> typing.Coroutine[None, None, numbers.Integral]:
    """ Stands in for slow work: waits two seconds, then yields the answer. """
    # always asyncio.sleep() inside a coroutine; time.sleep() would block the
    # main thread and with it every other coroutine
    pause = asyncio.sleep(2)
    yield from pause
    answer = 42
    return answer


@asyncio.coroutine
def supervisor() -> typing.Coroutine[None, None, numbers.Integral]:
    """ Logs progress in the background while waiting for slow_function.

    :return: whatever slow_function returns.
    """
    # create a running Task from a coroutine; asyncio.async() was deprecated
    # in 3.4.4 and is a SyntaxError since 3.7, where `async` became a keyword
    logger = asyncio.ensure_future(progress_logger('loading...'))
    assert isinstance(logger, asyncio.Task)
    assert isinstance(logger, asyncio.Future)
    try:
        result = yield from slow_function()
    finally:
        # stop the logger even when slow_function fails
        logger.cancel()  # raises CancelledError at the yield
    return result


def main() -> None:
    """ Runs supervisor to completion on the event loop and checks its result. """
    loop = asyncio.get_event_loop()
    try:
        result = loop.run_until_complete(supervisor())  # takes coroutine or future
    finally:
        # close the loop even when the coroutine raised
        loop.close()
    assert result == 42


if __name__ == '__main__':
    main()
    # => prints "loading..." 4 times (every .5 s while slow_function sleeps 2 s)
    # simple control flow graph:
    #                     ┌─► progress_logger
    # main ─► supervisor ─┤
    #                     └─► slow_function

asyncio.Future and concurrent.futures.Future are not the same. They are not interchangeable even though they partially implement the same interface. The main difference is that with asyncio.Future you use result = yield from future to get the return value from the coroutine.

asyncio.Semaphore is useful for limiting the number of coroutines running at once. A semaphore is an object with an internal counter that is decremented with .acquire() and incremented with .release(). If the counter is zero, .acquire() blocks until another coroutine calls .release().

import asyncio
import typing


@asyncio.coroutine
def log(name: str, semaphore: asyncio.Semaphore) -> typing.Coroutine[None, None, str]:
    """ Prints start and end markers for name while holding the semaphore.

    :param name: label included in every printed line and in the result.
    :param semaphore: limits how many of these coroutines run at once.
    :return: the string '==RETURN <name>'.
    """
    # `with (yield from semaphore):` was deprecated in 3.7 and removed in 3.9,
    # so acquire and release explicitly
    yield from semaphore.acquire()
    try:
        print(f'++START {name}')
        yield from asyncio.sleep(.5)
        print(f'--END {name}')
        return f'==RETURN {name}'
    finally:
        # always hand the slot back, also on error or cancellation
        semaphore.release()


@asyncio.coroutine
def log_all() -> typing.Coroutine:
    """ Runs three log coroutines, at most two at a time, printing each result
    as soon as it is available. """
    limiter = asyncio.Semaphore(2)
    pending = [log(label, limiter) for label in ('1', '2', '3')]
    for next_done in asyncio.as_completed(pending):
        print((yield from next_done))


def main() -> None:
    """ Runs log_all to completion on the event loop. """
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(log_all())
    finally:
        # close the loop even when the coroutine raised
        loop.close()


if __name__ == '__main__':
    main()

You can use loop.run_in_executor to run blocking code asynchronously. asyncio's event loop has a default thread pool executor that can be used for this. This is especially useful when writing large files.

import asyncio


def log(message: str) -> None:
    """ Prints the given message; stands in for any blocking call. """
    print(message)


def main() -> None:
    """ Runs log in the loop's default executor and waits for it to finish. """
    loop = asyncio.get_event_loop()
    try:
        # run_in_executor only schedules the call and returns a future; the
        # loop has to run until that future is done, otherwise the loop is
        # closed without ever waiting for the call to finish
        pending = loop.run_in_executor(None, log, 'Hello World!')
        loop.run_until_complete(pending)
    finally:
        loop.close()


if __name__ == '__main__':
    main()

Coroutines help to avoid callback hell and pyramid of doom.

import asyncio
import typing


@asyncio.coroutine
def lowerize(message: str) -> typing.Coroutine[None, None, str]:
    """ Waits a quarter of a second, then gives back message in lower case. """
    pause = asyncio.sleep(.25)
    yield from pause
    lowered = message.lower()
    return lowered


@asyncio.coroutine
def leetize(message: str) -> typing.Coroutine[None, None, str]:
    """ Waits a quarter of a second, then gives back message with every 'i'
    swapped for '1'. """
    pause = asyncio.sleep(.25)
    yield from pause
    leet = message.replace('i', '1')
    return leet


@asyncio.coroutine
def consonantize(message: str) -> typing.Coroutine[None, None, str]:
    """ Waits a quarter of a second, then gives back message with every 'u'
    removed. """
    pause = asyncio.sleep(.25)
    yield from pause
    stripped = message.replace('u', '')
    return stripped


@asyncio.coroutine
def plain_of_serenity(message: str) -> typing.Coroutine[None, None, str]:
    """ Pipes message through lowerize, leetize and consonantize in order.

    Each step waits for the previous one, yet the code stays flat instead of
    nesting one callback inside another.

    :param message: text to transform.
    :return: the transformed text.
    """
    result = yield from lowerize(message)
    result = yield from leetize(result)
    result = yield from consonantize(result)
    return result


def main() -> None:
    """ Runs plain_of_serenity on the event loop and checks its result. """
    loop = asyncio.get_event_loop()
    try:
        result = loop.run_until_complete(plain_of_serenity('RuKsI'))
    finally:
        # close the loop even when the coroutine raised
        loop.close()
    assert result == 'rks1'


if __name__ == '__main__':
    main()

Source

  • Fluent Python, Luciano Ramalho