18.5.3. Tasks and coroutines¶
Coroutines used with asyncio may be implemented using the async def statement, or by using generators . The async def type of coroutine was added in Python 3.5, and is recommended if there is no need to support older Python versions.
Generator-based coroutines should be decorated with @asyncio.coroutine , although this is not strictly enforced. The decorator enables compatibility with async def coroutines, and also serves as documentation. Generator-based coroutines use the yield from syntax introduced in PEP 380, instead of the original yield syntax.
The word “coroutine”, like the word “generator”, is used for two different (though related) concepts:
- The function that defines a coroutine (a function definition using async def or decorated with @asyncio.coroutine ). If disambiguation is needed we will call this a coroutine function ( iscoroutinefunction() returns True ).
- The object obtained by calling a coroutine function. This object represents a computation or an I/O operation (usually a combination) that will complete eventually. If disambiguation is needed we will call it a coroutine object ( iscoroutine() returns True ).
Things a coroutine can do:
- result = await future or result = yield from future – suspends the coroutine until the future is done, then returns the future’s result, or raises an exception, which will be propagated. (If the future is cancelled, it will raise a CancelledError exception.) Note that tasks are futures, and everything said about futures also applies to tasks.
- result = await coroutine or result = yield from coroutine – wait for another coroutine to produce a result (or raise an exception, which will be propagated). The coroutine expression must be a call to another coroutine.
- return expression – produce a result to the coroutine that is waiting for this one using await or yield from .
- raise exception – raise an exception in the coroutine that is waiting for this one using await or yield from .
Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method.
Coroutines (and tasks) can only run when the event loop is running.
Decorator to mark generator-based coroutines. This enables the generator use yield from to call async def coroutines, and also enables the generator to be called by async def coroutines, for instance using an await expression.
There is no need to decorate async def coroutines themselves.
If the generator is not yielded from before it is destroyed, an error message is logged. See Detect coroutines never scheduled .
In this documentation, some methods are documented as coroutines, even if they are plain Python functions returning a Future . This is intentional to have a freedom of tweaking the implementation of these functions in the future. If such a function is needed to be used in a callback-style code, wrap its result with ensure_future() .
18.5.3.1.1. Example: Hello World coroutine¶
Example of coroutine displaying "Hello World" :
The Hello World with call_soon() example uses the AbstractEventLoop.call_soon() method to schedule a callback.
18.5.3.1.2. Example: Coroutine displaying the current date¶
Example of coroutine displaying the current date every second during 5 seconds using the sleep() function:
18.5.3.1.3. Example: Chain coroutines¶
Example chaining coroutines:
compute() is chained to print_sum() : print_sum() coroutine waits until compute() is completed before returning its result.
Sequence diagram of the example:
The “Task” is created by the AbstractEventLoop.run_until_complete() method when it gets a coroutine object instead of a task.
The diagram shows the control flow, it does not describe exactly how things work internally. For example, the sleep coroutine creates an internal future which uses AbstractEventLoop.call_later() to wake up the task in 1 second.
18.5.3.2. InvalidStateError¶
The operation is not allowed in this state.
18.5.3.3. TimeoutError¶
The operation exceeded the given deadline.
This exception is different from the builtin TimeoutError exception!
18.5.3.4. Future¶
-
and exception() do not take a timeout argument and raise an exception when the future isn’t done yet.
- Callbacks registered with add_done_callback() are always called via the event loop’s call_soon_threadsafe() .
- This class is not compatible with the wait() and as_completed() functions in the concurrent.futures package.
Cancel the future and schedule callbacks.
If the future is already done or cancelled, return False . Otherwise, change the future’s state to cancelled, schedule the callbacks and return True .
Return True if the future was cancelled.
Return True if the future is done.
Done means either that a result / exception are available, or that the future was cancelled.
Return the result this future represents.
If the future has been cancelled, raises CancelledError . If the future’s result isn’t yet available, raises InvalidStateError . If the future is done and has an exception set, this exception is raised.
Return the exception that was set on this future.
The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError . If the future isn’t done yet, raises InvalidStateError .
Add a callback to be run when the future becomes done.
The callback is called with a single argument — the future object. If the future is already done when this is called, the callback is scheduled with call_soon() .
Use functools.partial to pass parameters to the callback . For example, fut.add_done_callback(functools.partial(print, "Future:", flush=True)) will call print("Future:", fut, flush=True) .
Remove all instances of a callback from the “call when done” list.
Returns the number of callbacks removed.
Mark the future done and set its result.
If the future is already done when this method is called, raises InvalidStateError .
Mark the future done and set an exception.
If the future is already done when this method is called, raises InvalidStateError .
18.5.3.4.1. Example: Future with run_until_complete()¶
Example combining a Future and a coroutine function :
The coroutine function is responsible for the computation (which takes 1 second) and it stores the result into the future. The run_until_complete() method waits for the completion of the future.
The run_until_complete() method uses internally the add_done_callback() method to be notified when the future is done.
18.5.3.4.2. Example: Future with run_forever()¶
The previous example can be written differently using the Future.add_done_callback() method to describe explicitly the control flow:
In this example, the future is used to link slow_operation() to got_result() : when slow_operation() is done, got_result() is called with the result.
18.5.3.5. Task¶
Schedule the execution of a coroutine : wrap it in a future. A task is a subclass of Future .
A task is responsible for executing a coroutine object in an event loop. If the wrapped coroutine yields from a future, the task suspends the execution of the wrapped coroutine and waits for the completion of the future. When the future is done, the execution of the wrapped coroutine restarts with the result or the exception of the future.
Event loops use cooperative scheduling: an event loop only runs one task at a time. Other tasks may run in parallel if other event loops are running in different threads. While a task waits for the completion of a future, the event loop executes a new task.
The cancellation of a task is different from the cancelation of a future. Calling cancel() will throw a CancelledError to the wrapped coroutine. cancelled() only returns True if the wrapped coroutine did not catch the CancelledError exception, or raised a CancelledError exception.
If a pending task is destroyed, the execution of its wrapped coroutine did not complete. It is probably a bug and a warning is logged: see Pending task destroyed .
Don’t directly create Task instances: use the ensure_future() function or the AbstractEventLoop.create_task() method.
classmethod all_tasks ( loop=None ) ¶
Return a set of all tasks for an event loop.
By default all tasks for the current event loop are returned.
classmethod current_task ( loop=None ) ¶
Return the currently running task in an event loop or None .
By default the current task for the current event loop is returned.
None is returned when called not in the context of a Task .
Request that this task cancel itself.
This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop. The coroutine then has a chance to clean up or even deny the request using try/except/finally.
Unlike Future.cancel() , this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.
Immediately after this method is called, cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).
Return the list of stack frames for this task’s coroutine.
If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback frames.
The frames are always ordered from oldest to newest.
The optional limit gives the maximum number of frames to return; by default all available frames are returned. Its meaning differs depending on whether a stack or a traceback is returned: the newest frames of a stack are returned, but the oldest frames of a traceback are returned. (This matches the behavior of the traceback module.)
For reasons beyond our control, only one stack frame is returned for a suspended coroutine.
Корутины в Python
Когда говорят «написать корутину», обычно подразумевают асинхронную функцию. Корутины можно ставить на паузу, чтобы дать другим функциям немного поработать. В этом заключается принцип асинхронности. О нём мы рассказывали в этой статье.
Давайте сразу рассмотрим пример асинхронной функции:
Очень похоже на обычную функцию, однако здесь есть два новых слова: async и await .
async говорит Питону о том, что мы пишем не просто функцию, а асинхронную функцию. Просто добавили async и всё, функция теперь асинхронная.
Второе слово — await. Оно прерывает исполнение функции, и возвращает управление программой наружу. После этого корутину можно запустить повторно, а затем еще и еще, и каждый раз она будет продолжать работу с того await , на котором прервалась ранее. Например, в функции count_to_three команда await встречается три раза, значит корутину можно вызвать четыре раза (да, не три!). Корутина будет работать до первого await, затем до второго, до третьего и на четвёртый раз выполнит остатки до конца.
Нельзя делать await None или await "Hello, World!" . Можно await только то, что так и называют — «awaitable».
await asyncio.sleep(0) — это команда корутине «Дай поработать другим!»
Сразу покажем, как это выглядит на практике:
Мы вызываем асинхронную функцию count_to_three , однако она не выводит на экран цифру 1, а возвращает корутину. Все асинхронные функции так делают. Это сделано для того, чтобы у вас был объект этой корутины в переменной. Теперь корутину можно запускать раз за разом, а она раз за разом будет делать кусочек и останавливаться на следующем await .
Чтобы запустить корутину, используют метод send() . При каждом запуске корутины этим методом она продолжает исполняться с последнего await , на котором она остановилась. Поэтому при новом запуске той же корутины срабатывает не тот же print , а следующий.
Нельзя просто .send() . Всегда нужно передавать какое-то значение. Об этом тоже расскажем позже. Пока что воспринимайте .send(None) как команду «продолжи выполнять корутину».
Когда корутина закончится?
Она остановится навсегда, когда закончатся все await или встретится return . Когда корутина заканчивается — она истощается и вызов .send() выдаёт ошибку:
Если мы хотим запустить наш счётчик сначала, придётся создать новую корутину, вызвав count_to_three() :
Обычно заранее не известно сколько await будет до момента «истощения», поэтому исключение приходится «перехватывать»:
Исключение StopIteration возникает всего один раз. Если после него попробовать запустить корутину ещё раз, то поднимется другое исключение — RuntimeError , и оно уже будет считаться ошибкой. О том как работать с исключениями читайте в статье про try except.
Нельзя запускать истощённую корутину.
Добиваемся асинхронности
С корутинами разобрались, останавливать их научились. А зачем.
Корутины позволят вашему коду работать асинхронно, т.е. делать несколько вещей одновременно. Допустим, вы решили скачать несколько файлов. Обычный, синхронный код скачивает файлы по-очереди. Сначала первый файл целиком, затем второй, тоже целиком. Асинхронный код качает файлы одновременно, по кусочкам. Приведём пример скачивания двух файлов:
Разберём как работает код:
Мы создали 2 корутины: image_downloader и music_downloader . Первая качает картинку по ссылке https://www.some-images.com/image1.jpg , вторая — музыку по ссыке https://www.music-site.com/artist/album/song5.mp3 .
Мы положили их в список coroutines
В бесконечном цикле мы по очереди запускаем все корутины из списка. Если вышла ошибка StopIteration — корутина истощилась, т.е. файл скачан. Убираем её из списка, корутина больше запускаться не будет.
Чтобы итерация по списку coroutines не сбивалась после удаления элемента из него итерируем не по оригиналу, а по копии coroutines.copy() .
Если список с корутинами закончился (его длина равна нулю), пора заканчивать и бесконечный цикл, потому что все файлы скачаны.
Передать параметры в асинхронную функцию
В плане аргументов асинхронные функции ничем не отличаются от обычных. Доработаем пример со счетчиком и вместо async def count_to_three напишем универсальную функцию async def count :
Попробуйте бесплатные уроки по Python
Получите крутое код-ревью от практикующих программистов с разбором ошибок и рекомендациями, на что обратить внимание — бесплатно.
Корутины и задачи¶
В этом разделе приведено высокоуровневое API asyncio для работы с корутинами и задачами.
Корутины¶
Корутины , объявляемые с помощью async/await синтаксиса, является предпочтительным способом написания asyncio приложений. Например, следующий фрагмент кода (требует Python 3.7) напечатает «hello», ожидает 1 секунду, а затем печатает «world»:
Заметим, что простой вызов корутины не приведёт к её выполнению:
Чтобы фактически запустить корутину, asyncio предоставляет три основных механизма:
Функция asyncio.run() для запуска функции точки входа верхнего уровня «main()» (см. приведённый пример выше)
Ожидающая корутина. Следующий фрагмент кода напечатает «hello» после ожидания в 1 секунду, а затем напечатает «world» после ожидания в течении ещё 2х секунд
Функция asyncio.create_task() используется для конкурентного запуска корутин как asyncio: Tasks .
Давайте изменим приведённый выше пример и запустим две say_after корутины конкурентно:
Обратите внимание, что ожидаемые выходные данные теперь показывают, что фрагмент выполняется на 1 секунду быстрее, чем ранее:
Ожидаемые объекты¶
Мы говорим, что объект является ожидаемым объектом, если его можно использовать в await выражении. Многие API-интерфейсы asyncio предназначены для приёма ожидаемых. Существует три основных типа ожидаемых объектов: корутины, задачи и футуры.
Python корутины являются ожидаемыми и поэтому могут ожидаться из других корутин:
В этой документации термин «корутина» может использоваться для двух тесно связанных понятий:
- Функция корутина: функция async def ;
- Объект корутины: возвращенный объект после вызова функции корутины.
asyncio также поддерживает устаревшие основанные на генераторах корутины.
Задачи используются для конкурентного планирования корутин.
Когда корутина обвёрнута в Задачу с такими функциями, как asyncio.create_task() , то автоматически планируется запуск корутины в ближайшее время:
Объект Future — это специальный ожидаемый (await) объект низкого уровня , представляющий конечный результат асинхронной операции.
Когда объект Футуры ожидается, это означает, что корутина будет ждать, пока Футура будет решена в каком-то другом месте.
Объекты Футуры в asyncio нужны, чтобы позволить основанному на колбэках коду использоваться с async/await.
Обычно нет нужды создавать объекты Футуры на уровне кода приложения.
Объекты Футуры, иногда раскрываемые библиотеками и некоторыми asyncio API, могут быть ожидаемыми:
Хорошим примером низкоуровневой функции, возвращающей объект Футуры, является loop.run_in_executor() .
Запуск asyncio программы¶
Выполняет корутину coro и возвращает результат.
Функция управляет переданной корутиной, заботясь об управлении asyncio событийного цикла и завершения асинхронных генераторов.
Функция не может быть вызвана, когда в том же потоке выполняется другой asyncio событийный цикл.
Если debug — True , событийный цикл будет выполняться в режиме отладки.
Функция всегда создаёт новый событийный цикл и закрывает его в конце. Его следует использовать в качестве основной точки входа для asyncio программ, и в идеале его следует вызывать только один раз.
Добавлено в версии 3.7.
Исходный код asyncio.run() можно найти в Lib/asyncio/runners.py.
Создание задач¶
Обёртывание coro корутины в Task и запланировать её выполнение. Возвращает объект задачи.
Если name не None , он задаётся как имя задачи с помощью Task.set_name() .
Задача выполняется в цикле, возвращенного get_running_loop() . Вызывает RuntimeError , если в текущем потоке нет запущенного цикла.
Функция была добавлена в Python 3.7. Ранее Python 3.7 вместо неё можно использовать низкоуровневую функцию asyncio.ensure_future() :
Добавлено в версии 3.7.
Изменено в версии 3.8: Добавлен параметр name .
Блокировка на delay секунд.
Если result предоставляется, он возвращается вызывающему после завершения корутины.
sleep() всегда приостанавливает выполнение текущей задачи, позволяя выполнять другие задачи.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Пример корутины, отображающей текущую дату каждую секунду в течение 5 секунд:
Конкурентный запуск задач¶
Запускает ожидаемые объекты в последовательности aws конкурентно.
Если какой-либо ожидаемый объект в aws является корутиной, он автоматически назначается как задача.
Если все await объекты выполнены успешно, результатом является сводный список возвращенных значений. Порядок значений результата соответствует порядку await в aws.
Если return_exceptions является False (по умолчанию), первое вызванное исключение немедленно распространяется на задачу, которая ожидает на gather() . Другие await объекты в aws последовательности не будут отменены и продолжат работу.
При return_exceptions True исключения обрабатываются так же, как и успешные результаты, и агрегируются в списке результатов.
Если gather() отменён, все представленные ожидаемые (которые ещё не завершены) также будут отменены.
Если какая-либо задача или футура в последовательности aws отменена, это рассматривается, как будто сработало исключение CancelledError — вызов gather() не отменяется в этом случае. Это необходимо для предотвращения отмены одной отправленной задачи/футуры, чтобы привести к отмене других задач/футур.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Если return_exceptions содержит значение False, отмена gather() после того, как он был помечен как выполненный, не отменит ни одного отправленного ожидаемого объекта. Например, gather может быть помечена как выполненная после передачи исключения вызывающей стороне, поэтому вызов gather.cancel() после перехвата исключения (вызванного одним из ожидаемых объектов) из gather не отменяет другие ожидаемые объекты.
Изменено в версии 3.7: Если gather отменяется, отмена распространяется независимо от return_exceptions.
Защита от отмены¶
Если aw корутина, она автоматически назначается как задача.
кроме того, что если корутина, содержащая её, отменяется, задача, выполняемая в something() , не отменяется. С точки зрения something() отмены не произошло. Хотя его вызывающий объект всё ещё отменён, «await» выражение по-прежнему вызывает CancelledError .
Если something() отменяется другими средствами (т.е. изнутри), которые также отменяют shield() .
Если требуется полностью игнорировать отмену (не рекомендуется), функция shield() должна быть объединена с предложением try/except следующим образом:
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Таймауты¶
Дождаться завершения aw ожидаемого с таймаутом.
Если aw корутина, она автоматически назначается как задача.
timeout может быть либо None , либо числом секунд ожидания с плавающей запятой, либо int числом. Если timeout None , блокировать до завершения футуры.
Если завершается таймаут, задача отменяется и вызывается asyncio.TimeoutError .
Чтобы избежать отмены задачи, оберните её в shield() .
Функция будет ждать, пока футура будет фактически отменена, поэтому общее время ожидания может превысить timeout.
Если ожидание отменяется, то также отменяется и будущий aw.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Изменено в версии 3.7: Когда aw отменяется из-за тайм-аута, wait_for ожидает отмены aw. Ранее она сразу вызывала asyncio.TimeoutError .
Примитивы ожидания¶
Конкурентный запуск ожидаемых объектов в итерации aws и блокировка, пока не будет выполнено условие, указанное в return_when.
Возвращает два множества задачи/футуры: (done, pending) .
timeout (float или int), если он указан, можно использовать для управления максимальным количеством секунд ожидания перед возвращением.
Обратите внимание, что функция не вызывает asyncio.TimeoutError . Футуры или задачи, которые не были выполнены при наступлении тайм-аута, просто возвращаются во втором множестве.
return_when указывает, когда функция должна возвращать. Она должна быть одной из следующих констант:
Константа | Описание |
---|---|
FIRST_COMPLETED | Функция возвращает, когда любая футура завершится или отменится. |
FIRST_EXCEPTION | Функция возвращает после завершения любого процесса футуры путём создания исключения. Если в футуре исключение не вызывается, то оно эквивалентно ALL_COMPLETED . |
ALL_COMPLETED | Функция возвращает после завершения или отмены всех футур. |
В отличие от wait_for() , wait() не отменяет футуры при наступлении тайм-аута.
Не рекомендуется, начиная с версии 3.8: Если какой-либо ожидаемый в aws является корутиной, он автоматически назначается как задача. Непосредственная передача объектов корутине в wait() является устаревшей практикой, т. к. приводит к запутанному поведению .
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
wait() автоматическое планирование корутины как задачи, затем возвращает создаваемые объекты задачи в множестве (done, pending) . Поэтому следующий код не будет работать так, как ожидалось:
Вот как можно зафиксировать вышеуказанный фрагмент:
Не рекомендуется, начиная с версии 3.8: Передача корутиновых объектов непосредственно в wait() устарела.
Запуск ожидаемых объектов в aws итеративно и конкурентно. Возвращает итератор корутины. Каждую возвращенную корутину можно ожидать, чтобы получить самый ранний следующий результат от итерируемого из оставшихся ожидаемых.
Вызывает asyncio.TimeoutError , если тайм-аут наступает до выполнения всех футур.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Планирование из других потоков¶
Отправить корутину в событийный цикл. Потокобезопасный.
Возвращает concurrent.futures.Future дожидаясь результата из другого потока ОС.
Функция вызывается из потока операционной системы, отличного от того, где выполняется событийный цикл. Пример:
Если в корутине вызывается исключение, возвращенная футура будет уведомлена. Также можно использовать отмену задачи в событийном цикле:
В отличие от других asyncio функций данная функция требует явной передачи loop аргумента.
Добавлено в версии 3.5.1.
Интроспекция¶
Возвращает текущую Task сущность или None , если задача не выполняется.
Если loop — None , используется get_running_loop() для получения текущего цикла.
Добавлено в версии 3.7.
Возвращает множество ещё не завершенных объектов Task , запущенных в цикле.
Если loop — None , используется get_running_loop() для получения текущего цикла.
Добавлено в версии 3.7.
Объект задачи¶
Футуроподобный объект, запускающий Python корутину . Не потокобезопасной.
Задачи используются для запуска корутин в событийных циклах. Если корутина ожидает футуры, задача приостанавливает выполнение корутины и ожидает завершения футуры. Когда выполнено, футура возобновляет исполнение обёрнутой корутины.
Событийный цикл использует совместное планирование: событийный цикл выполняет одну задачу одновременно. В то время как задача ожидает для завершения футуры, событийный цикл выполняет другие задачи, колбэки или выполняет операции ввода-вывода.
Используйте высокоуровневую функцию asyncio.create_task() , чтобы создать задачи или низкоуровневые функции loop.create_task() или ensure_future() . Не рекомендуется создавать экземпляры задач вручную.
Для отмены выполняемой задачи используйте метод cancel() . Этот вызов приведёт к тому, что задача бросит CancelledError исключение в обернутую корутину. Если корутина ожидает в объекте футуры во время отмены, объект футуры будет отменён.
Можно использовать cancelled() , чтобы проверить, была ли задача отменена. Метод возвращает True , если обёрнутая корутина не подавила CancelledError исключение и фактически была отменена.
Задачи поддерживают модуль contextvars . При создании задачи она копирует текущий контекст, а затем запускает корутину в скопированном контексте.
Изменено в версии 3.7: Добавлена поддержка модуля contextvars .
Изменено в версии 3.8: Добавлен параметр name .
Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.
Запрос отмены задачи.
Это позволяет создать CancelledError исключение для обернутой корутины на следующем цикле событийного цикла.
После этого у корутины есть шанс очистить или даже отклонить просьбу, подавив исключение в блоке try … except CancelledError … finally . Поэтому, в отличие от Future.cancel() , Task.cancel() не гарантирует, что задача будет отменена, хотя подавление отмены полностью не распространено и активно отговаривается.
В следующем примере показано, как корутины могут перехватывать запрос на отмену:
Возвращает True , если задача отменена.
Задача отменена, когда запрашивалась отмена с cancel() и обернутая корутина распространила в неё CancelledError исключение.
Возвращает True , если задача завершена.
Задача завершена, когда обернутая корутина либо возвратила значение, либо вызвала исключение, либо задача была отменена.
Возвращает результат выполнения задачи.
Если задача является завершенной, то результатом обернутой корутины является возвращаемое (или если корутина вызвала исключение, то это исключение возникает повторно.)
Если задача была отменена, это метод вызывает CancelledError исключение.
Если результат задачи ещё не доступен, это метод вызывает InvalidStateError исключение.
Возвращает исключение задачи.
Если у обернутой корутины возникло исключение, то возвращается это исключение. Если обернутая корутина возвращается нормально, то этот метод возвращает None .
Если задача была отменена, это метод вызывает CancelledError исключение.
Если задача еще не завершена, это метод вызывает InvalidStateError исключение.
add_done_callback ( callback, *, context=None ) ¶
Добавление колбэка для выполнения при выполнении задачи.
Этот метод должен быть использован только в низкоуровневом основанном на колбэках коде.
Для получения дополнительной информации см. документацию Future.add_done_callback() .
Удалить callback из списка колбэков.
Этот метод должен быть использован только в низкоуровневом основанном на колбэках коде.
Для получения дополнительной информации см. документацию Future.remove_done_callback() .
Возвращает список фреймов стека для этой задачи.
Если обёрнутая корутина не выполнена, он возвращает стек, где он приостановлен. Если корутина успешно завершена или отменена, возвращает пустой список. Если корутина была прервана исключением, возвращает список кадров трейсбэка.
Фреймы всегда упорядочиваются от самых старых до самых новых.
Для приостановленной корутины возвращается только одни фрейм стека.
Необязательный аргумент limit устанавливает максимальное количество возвращаемых кадров; по умолчанию возвращаются все доступные фреймы. Порядок возвращаемого списка различается в зависимости от того, возвращается ли стек или трассировка: возвращаются самые новые фреймы стека, но возвращаются самые старые фреймы трассировки. (Это соответствует поведению модуля трассировки.)
Печать стека или трейсбэка для этой задачи.
При этом выводятся выходные данные, аналогичные данным модуля traceback для фреймов, извлекаемых get_stack() .
Аргумент limit передается непосредственно get_stack() .
Аргумент file представляет собой поток I/O, в который записываются выходные данные; по умолчанию выходные данные записываются в sys.stderr .
Возвращает объект корутины, обернутый Task .
Добавлено в версии 3.8.
Возвращает имя задачи.
Если ни одно имя не было явно назначено задаче, реализация задачи asyncio по умолчанию создаёт имя по умолчанию во время создания экземпляра.
Добавлено в версии 3.8.
Задание имя задачи.
Аргументом value может быть любой объект, который затем преобразуется в строку.
В реализации задачи по умолчанию имя будет отображаться в repr() выходных данных объекта задачи.
Добавлено в версии 3.8.
Возвращает множество всех задач для событийного цикла.
По умолчанию возвращаются все задачи для текущего событийного цикла. Если loop None , то используется get_event_loop() функция для получения текущего цикла.
Устарело с версии 3.7, будет удалено в 3.9 версии.: Не вызывайте этот метод задач. Вместо этого используйте функцию asyncio.all_tasks() .
Возвращает текущую запущенную задачу или None .
Если loop — None , используется функция get_event_loop() для получения текущего цикла.
Устарело с версии 3.7, будет удалено в 3.9 версии.: Не вызывайте этот метод задач. Вместо него используйте функцию asyncio.current_task() .
Основанные на генераторах корутины¶
Поддержка основанных на генераторах корутин запрещено и планируется к удалению в Python 3.10.
Корутины на основе генератора предшествовали синтаксису async/await. Они представляют собой Python генераторы, которые используют yield from выражения для ожидания футур и других корутин.
Генераторные корутины должны быть задекорированы @asyncio.coroutine , хотя это не применяется.
Декоратор для маркировки основанных на генераторах корутин.
Этот декоратор обеспечивает совместимость устаревших основанных на генераторах корутин с async/await кодом:
Этот декоратор не должен использоваться для async def корутин.
Устарело с версии 3.8, будет удалено в 3.10 версии.: Используйте async def вместо этого.
Что такое сопрограммы (coroutines) в Python
Посвященное сопрограммам в Python выступление Джона Риза (John Reese), инженера-технолога Facebook, на North Bay Python 2019. Докладчик удачно начинает с азов (функции, стек) и постепенно приближается к асинхронному программированию, проходя через генераторы и доступные виды параллелизма. В материал включены все примеры кода, большинство слайдов выступления и почти весь текст (кроме некоторых замечаний, которые не относились к теме).
- Вступление
- Функции
- Взаимосовместимость (concurrency)
- Реализация взаимосовместимости
- Кооперативная многозадачность (cooperative multitasking)
- Генераторы (Generators)
- Сопрограммы (coroutines)
- Пример программы с кооперативной многозадачностью
- Сопрограммы со стандартной библиотекой Python
- Заключение
Вступление
Всем привет, я работаю с Лизой, с которой вы все познакомились вчера утром. У нас своя внутренняя команда разработчиков на Python, и мы стремимся сосредоточиться на создании инструментов и инфраструктуры для Python в Facebook. Также я стараюсь копаться во многих моментах, которые не обязательно всем интересны, но интересны мне лично. Мне нравится все узнавать.
Итак, 10 лет назад я получил степень инженера ПО в школе, которая была слишком дорогой и находилась в северной части штата Нью-Йорк, где было слишком холодно. Было много снега, поэтому, естественно, мне нравилось задавать вопросы, например, зачем я сюда приехал и сколько я должен.
Одна из причин, почему я люблю Python — это легкость поиска ответов на вопросы о том, как и почему все работает.
Итак, сопрограммы (coroutines). Это основа асинхронного программирования на Python и фундаментальный стройматериал инфраструктуры асинхронного ввода-вывода (async i/o). Но эти моменты тоже могут оказаться окружены большой тайной.
Что такое сопрограммы, как они работают, почему они так важны. Я сейчас дам небольшой спойлер насчет финального ответа. Давайте посмотрим, что говорит источник всей правды и знаний в Интернете.
Википедия пишет, что сопрограммы – компоненты компьютерной программы, которые обобщают подпрограммы для реализации не вытесняющей многозадачности (non pre-emptive multitasking), позволяя приостанавливать и возобновлять исполнение.
Если среди слушателей есть роботы, то они наверное будут удовлетворены этим ответом. Постичь смысл этой фразы невозможно.
Давайте попробуем разбить ее на понятные человеку элементы. Сопрограмма – особый вид функции, которая реализует взаимосовместимость через кооперативную многозадачность. Мы изучим, откуда взялись сопрограммы, и разберемся, как они работают и зачем они нам нужны.
Придется охватить много важных концепций. Поэтому я сосредоточусь на базовых идеях, а затем на том, что лежит в их основе, и как это все соотносится с Python.
Функции
Начнем с самого начала. Что такое функция. На этот раз не буду тратить ваше время на то самое единственное правильное определение, так как мы все здесь сегодня люди, за исключением, может быть, вчерашнего бота skull.
Функция — последовательность инструкций, которая принимает входные данные и возвращает выходные данные. Практически любая полезная функция на любом языке программирования представляет собой фундаментальный строительный блок. Она позволяет организовать код во многоразовые компоненты, которые принимают входные данные, что-то делают с ними и выдают выходные данные.
Мы можем исполнить функцию main() , и рабочая среда (runtime) проходит через все операторы, а дойдя до вызова функции, приостанавливает исполнение текущей функции, направляет входные данные вызываемой функции, исполняет тело данной функции, затем направляет выходные данные обратно вызывающему объекту и возобновляет исполнение исходной функции с того места, на котором вышла.
На данный момент все кажется достаточно простым, но есть много деталей, которые мы принимаем как должное. Можно пойти глубже.
Как вы, наверное, знаете, CPython использует виртуальную машину для исполнения наших программ. В нем есть замечательная утилита позволяющая заглянуть под покровы. Модуль dis , сокращенно от «дизассемблировать», может дать нам понятный для человека список скомпилированного байткода для каждой функции.
Данный байткод представляет собой точные инструкции, используемые виртуальной машиной или рабочей средой для исполнения нашего кода Python.
Это выходные данные, которые мы получаем при дизассемблировании функций square() и main() .
В данном виде уже выглядит сложнее, но если мы продолжим задавать вопросы и копать глубже, все начнет обретать смысл. Нам просто нужно понять контекст того, как функционирует рабочая среда.
Как вы, наверное, тоже знаете, рабочая среда CPython использует виртуальную машину на основе стека для исполнения инструкций. Нет регистров, которые можно встретить или использовать в программировании на ассемблере.
Если инструкциям нужны какие-то фрагменты данных, то эти данные нужно сначала поместить в стек. Сам по себе, стек – просто линейный блок памяти, содержащий данные или ссылки на них. Рабочая среда использует так называемый указатель стека, чтобы отслеживать местонахождеине вершины стека при исполнении инструкций.
Можно помещать данные на вершину стека. Например, у инструкции LOAD_FAST задача одна: поместить одно значение в стек. Каждый раз, когда это происходит, указатель вершины стека поднимается, чтобы указывать на новую вершину стека.
Другие инструкции могут затем потреблять (consume) или извлекать (pop) элементы из стека и даже добавлять новые элементы на вершину.
В то же время, например, математическая операция, скажем, умножение, извлекает из стека два верхних элемента. В результате указатель стека сбрасывается.
Затем она исполняет вычисление и помещает полученное значение обратно в стек для использования в будущих инструкциях.
Но не все живет только в стеке. Данные, которые переживают создавшую их функцию, должны храниться в другом месте, чтобы не исчезнуть после возврата из функции.
Поэтому, как и большинство языков, Python использует динамическую память (heap, «куча») для хранения объектов в долговременной памяти. В отличие от стека, это просто неупорядоченное пространство, в котором объекты могут быть выделены и освобождены в любой момент во время исполнения.
Часто элементы в стеке представляют собой просто ссылки на реальные объекты в куче. Но, опять-таки, сейчас нам менее важны конкретные детали, данная связь, чем понимание того, что она существуют.
Если мы вернемся к вышеприведенным дизассемблированным инструкциям, то сможем разобраться в том, что происходит при исполнении функции main() .
Каждая строка представляет собой одну инструкцию для виртуальной машины CPython. Этот процесс немного похоже на инструкции в ассемблерном коде, но он разработан для идеализированной, но не настоящей архитектуры ЦП.
Если мы посмотрим на отдельную инструкцию, то увидим четыре части информации:
— номер строки исходного кода (10), которая отображается группой инструкций;
— адрес инструкции (0) относительно вершины класса функции или модуля в зависимости от того, что дизассемблировано;
— код операции (LOAD_GLOBAL), который представляет собой конкретную операцию виртуальной машины на исполнение;
— числовой параметр (0 (square)) для операции, часто в виде исходного значения или индекса в некотором наборе данных, например в списке глобальных или локальных переменных; значение в скобках нужно только для того, чтобы мы, люди, знали, что представляет собой данный параметр.
Если сравнить это с нашим исходным кодом, то можно увидеть, как все это связано с функциональностью.
Эти инструкции исполняют умножение и возвращают результат:
Эти инструкции вызывают функцию square() и сохраняют результат в X :
Эти выводят значение, хранящееся в X :
Наконец, эти последние две инструкции представляют собой неявный оператор возврата:
Теперь перейдем к тому, что при исполнении кода рабочей среде нужно отслеживать свое положение в наборе инструкций. Каждой инструкции выделяется заранее определенный зафиксированный объем памяти. Следовательно, у нее свой адрес в памяти. Рабочая среда будет просто отслеживать адрес в памяти для следующей инструкции, которую она должна исполнить.
Это — указатель инструкции.
Каждый раз, когда рабочая среда исполняет инструкцию, она автоматически переводит указатель инструкции на следующий шаг. Затем специальные инструкции могут менять указатель инструкции, позволяя рабочей среде переключаться между различными разделами кода.
Именно это лежит в основе управления последовательностью операций (flow control), в том числе условными блоками if-else , циклами for и while , а также вызовами функций.
Теперь давайте вернемся к прошлому стеку и концептуально рассмотрим, как все это может исполняться рабочей средой.
Сначала мы добавляем ссылку на функцию square() , которую мы хотим вызвать. Она находится в глобальном контексте (global scope).
Далее после каждой операции мы поднимаем указатель инструкции. Мы загружаем постоянное значение параметра (LOAD_CONST), который мы передаем нашей функции square() .
Теперь мы можем запустить операцию для реального вызова функции. Параметром данной инструкции является количество передаваемых нами аргументов, чтобы операция вызова функции (CALL_FUNCTION), знала, что нужно потребить данное количество элементов из стека, прежде чем искать функцию для вызова.
Параметры, которые она потребляет, будут затем использоваться для создания набора локальных переменных, доступных внутри данной функции.
После извлечения аргументов функции из стека и повышения указателя инструкции операция вызова функции теперь может извлечь саму ссылку на функцию и использовать значение для обновления позиции указателя инструкции, перемещая его в первый адрес данной функции, сохраняя при этом предыдущее значение указателя инструкции на стеке, чтобы мы знали, куда вернуться позже.
Теперь наше исполнение находится в функции square() :
Первым делом мы загружаем X в стек, и, поскольку мы умножаем X на самого себя, то мы загружаем X в стек и второй раз.
Теперь мы можем по-настоящему умножить два значения между собой, и благодаря этому обе стороны умножения будут извлечены из стека. Четыре будет умножено на четыре, а полученное значение будет сохранено обратно в стек.
Теперь у нас есть результат, и мы готовы вернуть его вызывающему объекту.
Операция возврата значения извлекает результат из стека, затем восстанавливает указатель инструкции со следующим значением в стеке:
Далее она снова добавляет фактический результат обратно в стек:
Мы вернулись к нашей функции main() с того места, на котором остановились, но с результатом функции square() , который остается на верхушке стека. Мы можем сохранить данное значение в локальной переменной X .
Следующая строка кода должна вывести значение X . Поэтому мы добавляем ссылку на функцию print() :
А затем ссылку на X благодаря стеку. Теперь мы снова исполняем операцию вызова функции (CALL_FUNCTION), и снова это будет с одним параметром, равным единице, так как в наличии только один аргумент.
Благодаря этому ссылка на X будет потреблена и помещена в локальный контекст print . Затем снова ссылка на функцию извлекается, указатель инструкции обновляется, а в стеке сохраняется закладка для возврата из окна.
Теперь такой момент. Поскольку функция print() реализована на C, нет байт-кода, за которым мы могли бы следовать. По сути, исполнение будет происходить за пределами обычной виртуальной машины, но результаты все так же будут переноситься в стек при возврате.
Кроме того, поскольку у нас функция print() , при возврате мы получаем None .
Указатель инструкции снова там, где мы остановились. Но, так как мы не присваиваем никаких значений, следующая инструкция просто извлекает значение из стека и отбрасывает его.
Теперь, когда функция main() завершена, мы можем подготовиться к возврату неявного значения None , которое присваивается постоянной величине (LOAD_CONST), помещенной обратно в стек.
Получилось немного забавно, так как мы только что извлекли None из стека, но в ходе компиляции байт-кода не было возможности узнать это или рассчитывать на возможность повторного использования величины.
Наконец, мы можем вернуть управление той инструкции, которая находится в стеке под возвращаемым значением, и функция main() будет завершена.
На самом деле, наш стек – не просто горстка значений, а потенциально сотни или тысячи элементов в зависимости от того, насколько глубоким будет стек вызовов и насколько сложными становятся сигнатуры функций.
Сложные функции вполне могут содержать десятки или сотни инструкций. Приложение или сервис как единое целое может выходить за пределы сотни тысяч инструкций и иметь огромные стеки. При этом рабочая среда CPython должна все отслеживать.
Думаю, что для одних выходных байт-кода хватит, так что давайте поговорим о взаимосовместимости.
Взаимосовместимость (concurrency)
Это простая идея, но ее часто путают с другими концепциями. На самом базовом уровне взаимосовместимость заключается в исполнении нескольких задач. Задача может быть, в принцие, чем угодно. Изобразим задачу в виде полоски.
Представьте, что, исполняя задачу, мы двигались слева направо.
Но ни у кого никогда не бывает только одной задачи. У нас будет несколько задач, которые нам нужно исполнить, часто задач бывает на несколько порядков больше.
Наивное решение – просто обработать их по порядку. По завершении одной мы приступаем к следующей. Если у нас четыре задачи, для их исполнения требуется в четыре раза больше времени, чем на одну задачу.
Это все легко, имеет смысл и подходит для традиционных вычислений, но не так выглядит взаимосовместимость для многих реальных задач. Еще они похожи не на однородные блоки вычислений, а скорее вот на это с большими периодами простоя в ожидании чего-нибудь. Например, выборки данных с диска или исполнения сетевого запроса.
Если применить наш наивный метод и исполнять их последовательно, получится, что мы проводим много времени в бездействии. Все равно у нас уходит в четыре раза больше времени, чем на исполнение одной задачи.
Вместо этого мы можем в полной мере воспользоваться моментами простоя, чтобы начать исполнение других задач.
Мы можем исполнить все четыре задачи гораздо быстрее. Чем больше времени мы тратим на исполнение какой-то отдельной задачи, тем больше задач мы можем совмещать с ней для экономии времени.
В этом основной принцип и цель взаимосовместимости.
Реализация взаимосовместимости
Существует несколько различных стратегий для реализации взаимосовместимости.
Очевидный подход – несколько исполнителей (worker, «работник»). Каждый исполнитель мог бы обрабатывать одну задачу за раз.
Один из вариантов реализации – мультипроцессорная обработка (multiprocessing), где каждый процесс-исполнитель представляет собой отдельный процесс.
То есть, у каждого исполнителя есть своя рабочая среда CPython, свой стек, своя куча и свой набор скомпилированного байт-кода. Если мы хотим обрабатывать больше задач одновременно, мы просто добавляем больше исполнителей.
Но с этим вариантом связаны некоторые издержки. Конечно, прежде всего, это дублирование памяти, используемой для каждой рабочей среды. Во-вторых, любое взаимодействие между процессами должно происходить путем сериализации и десериализации данных, добавляя дополнительную работу для каждой задачи.
С положительной стороны это означает, что каждый исполнитель может обрабатывать задачи параллельно, обеспечивая для нас более высокий потенциал использования многозадачного курса.
Это — параллелизм (parallelism), младший и более популярный аналог взаимосовместимости.
Но, пусть даже общая пропускная способность обычно масштабируется с количеством исполнителей, каждый исполнитель и, следовательно, каждый процесс по-прежнему простаивает, пока задачи ожидают ресурсов.
Другой альтернативой является использование потоков (threads) для каждого исполнителя, а не процессов с одним исполнителем.
Мы начинаем с той же точки, что и с процессами, но на этот раз, когда мы добавляем больше исполнителей, нам нужно просто добавить новый стек вызовов для каждого потока, уменьшая дублирование куч и байт-кода.
Благодаря этому также устраняется необходимость в сериализации данных между исполнителями, так как теперь все они используют один и тот же объем памяти.
Но, как всегда, данный вариант создает свои издержки и компромиссы вследствие уникального метода разработки CPython.
У нас есть хороший друг «Глобальный блок интерпретатора» (Global Interpreter Lock, GIL). Мы не можем запускать несколько потоков кода Python одновременно, поэтому все остальные потоки должны бездействовать и ждать своей очереди, пока исполняется текущий поток.
Кроме того, рабочая среда отвечает за планирование потоков, не имея представления о том, что они делают. Каждый раз, когда выбирается новый поток, мы должны ждать влекущего свои издержки переключения контактов, которое включает в себя сохранение стека исполнения предыдущего потока и восстановление состояния для нового.
Именно это известно как вытесняющая многозадачность (pre-emptive multitasking). Она гарантирует справедливый доступ к ЦП и упрощает полное использование отдельного ядра ЦП несколькими потоками.
Но здесь повышается вероятность того, что мы будем переключать потоки в неподходящее время. Именно это причиной неоптимального поведения, например, задержки при запуске сетевых запросов.
Это может значительно увеличить время отдельной задачи, как, например, у верхней задачи из четырех вышеприведенных, исполнение которой занимает в два раза больше времени, чем следовало бы.
Возможно, это пример самого плохого сценария. Но, в любом случае, чем больше потоков, тем чаще происходит переключение контекста. Это повышает вероятность того, что ваши задачи будут прерваны в критические моменты.
Так что, и многопроцессорность, и многопоточность имеют разные преимущества и недостатки, но никто из них не приближается к вот этой идеальной форме параллелизма, которую я показывал раньше.
Кооперативная многозадачность (cooperative multitasking)
На самом деле, мы хотим получить систему, в которой задачи могут проходить через свои критические фазы и переключаться на следующую задачу только тогда, когда они ожидают внешних ресурсов. Это — кооперативная многозадачность (cooperative multitasking), простая форма взаимосовместимости без больших издержек.
Компромисс заключается в том, есть зависимость от хороших манер каждой задачи и ее кооперации со всеми остальными задачами для совместного использования времени ЦП и общих ресурсов. Но в приложении, где мы контролируем все, это может дать большие преимущества.
На практике это означает, что каждая задача должна либо завершиться, либо явно передать контроль, прежде чем сможет исполняться другая задача. Очевидный момент для передачи управления: когда задача ожидает внешних ресурсов, ведь других действий у нее пока нет.
Когда мы получим от задачи контроль на данном уровне, то сможем планировать более оптимально, сможем лучше использовать ресурсы и запускать все задачи в одном потоке и в одном процессе. Поэтому у нас будет только один общий стек вызовов и одна общая куча.
Так что если мы думаем о том, как создать подобные кооперирующиеся задачи, то можно найти несколько простых решений.
Одним из них может стать создание задач, которые следуют простому шаблону: по возможности продвигаются, по возможности прогрессируют, выдают (yield), когда это не удается, и, в конечном итоге, завершаются и возвращают.
Если бы мы хотели реализовать нечто подобное в каком-нибудь наивном коде Python, мы могли бы начать с чего-то вроде этого:
Мы просто исполняем метод run() несколько раз и рассчитываем, что в конечном итоге у него будет возврат (return) или выдача (yield) в тот момент, когда это наиболее удобно для отдельных задач.
В какой-то момент метод установит для атрибута ready значение True . Тогда мы сможем проверить результат на предмет конечного значения переменной.
Еще мы можем реализовать что-нибудь совсем бесполезное само по себе, например задачу Sleep с заданной продолжительностью и конечным значением.
При каждом запуске данная задача проверяет время. Когда пройдет достаточно времени, она помечает себя как завершенную, указывая на то, что результат стал доступен.
Но сама по себе задача бесполезна. Нам нужно что-нибудь, что будет исполнять много задач одновременно. Нам нужен цикл с ожиданием событий (event loop).
Мы будем использовать это как основу для исполнения задач. Она берет список объектов задач и многократно вызывает метод run() для ожидающих задач, когда каждая задача помечает себя как завершенную.
Цикл событий в конечном итоге будет исполнять меньше задач на каждой итерации. После завершения всех задач он возвращает список конечных результатов.
Теперь, раз мы создаем несколько задач, в данном случае спящие случайный объем времени, мы можем запустить для них цикл событий. Цикл завершит все 10 задач, а результаты вернутся, когда самая долгая задача будет окончательно завершена.
Если мы дадим ему тысячу таких задач, даже этот наивный цикл событий все равно сможет завершить их быстрее, чем могло бы потребоваться для запуска run() и компиляции результатов из тысячи потоков или тысячи процессов.
Но данная реализация Task не идеальна. В лучшем случае каждая задача должна вручную отслеживать свой прогресс, и каждая задача должна создавать свой метод run() , чтобы каждый раз запускаться сначала.
Наш фреймворк также не предлагает задачам легкий способ для вызова других функций, которым также может потребоваться подождать своих ресурсов.
Хорошо бы у нас был способ написать какой-нибудь код, который мог бы выдавать (yield) несколько значений с течением времени и иметь возможность начать исполнение с того места, где ранее произошла остановка. Некоторые из вас, возможно, уже поняли, к чему я веду.
Генераторы (Generators)
Генераторы идеально подходят под данное описание. Давайте посмотрим, как они работают.
Последовательность Фибоначчи — один из основных примеров генераторов и концептуально проста. Каждый раз по ходу цикла мы складываем два предыдущих числа и выдаем значение. В результате чего получается последовательность 1 1 2 3 5 и так далее.
Но когда мы вызываем данную функцию, мы не получаем ни одного из данных значений напрямую. Вместо это мы на самом деле получаем объект-генератор.
Вот скомпилированная версия нашей функции генератора. Фактический код, внутренняя функция даже не начала исполняться, но по объекту-генератору можно проводить итерацию, совсем как по списку.
Cтандартную функцию next() из стандартной библиотеки можно использовать для итерации только один раз.
Каждый раз, когда мы вызываем next() для своего объекта-генератора, он повторно входит в функцию на том месте, где остановился с полным сохранением состояния. И, если функция выдает другое значение, мы получаем данное значение как результат или возвращаемое значение от вызова next() .
Когда функция генератора завершается или возвращает, объект-генератор поднимает значение остановки итерации StopIteration , как и любой другой итератор, когда вы достигнете его конца.
Довольно часто встречаются генераторы, которые выдают значения, но еще могут передавать или направлять значения обратно в генератор извне. Когда это происходит после передачи исполнения в функцию генератора, оператор yield сам дает значение, которое было направлено в генератор.
Для этого нам нужно заменить функцию next() методом send() для генераторов. Благодаря этому функция исполняется с того места, где она остановилась. Как и с next() , будут возвращаться все выданные значения генератора, пока функция не будет запущена по-настоящему.
Несмотря на то, что после этого можно направить только None , мы можем и дальше направлять и получать значения, пока генератор не завершится.
Ура, мы только что нашли сопрограммы (coroutines). Python годами прятал их на виду у всех.
Сопрограммы (coroutines)
Как в реальности использовать все это для исполнения параллельных задач. Оказывается, мы можем адаптировать наш цикл с ожиданием событий и улучшить его по ходу дела.
Как он будет выглядеть:
Вместо вызова метода run() для каждой задачи мы вызываем метод send() для каждого объекта генератора. Вместо того, чтобы искать флаг, мы перехватываем ошибку StopIteration и помечаем данные генераторы или задачи как завершенные.
Начиная с Python 3.3, StopIteration содержит значение, возвращаемое данными генераторами. Поэтому мы сохраняем их для добавления в конечный результат.
Наконец, мы также захватываем промежуточные выданные (yielded) значения и направляем их обратно на следующей итерации, что позволяет сопрограммам вызывать другие сопрограммы. То есть, теперь мы можем получать выдачу из другой сопрограммы, чтобы вызывать ее, и наша позиция в стеке вызовов сопрограмм будет сохраняться при всех выдачах.
В общем итоге наше использование сопрограмм становится более похожим на обычные функции.
Но они по-прежнему уступают контроль на своих условиях и могут продолжить с того места, на котором остановились, когда снова придет их очередь.
Поэтому, если мы создадим пару сопрограмм из нашей функции foo() и передадим их нашему циклу событий, он будет следовать за исполнением через foo() в bar() , а затем в сопрограмму sleep() .
Там он будет и дальше делать выдачи обратно в цикл событий, пока не истечет время. Затем на следующей итерации через возврат контроля в bar , который возвращает значение обратно в foo , который, наконец, завершается и возвращает значение.
Уточню, что при каждой выдаче наш цикл с ожиданием событий циклически переходит к следующей ожидающей задаче, что дает нам кооперативную многозадачность, которую мы искали.
Пример программы с кооперативной многозадачностью
Теперь давайте сделаем что-нибудь более полезное, например, загрузим контент c нескольких URL-адресов.
Мы напишем сопрограмму fetch() , которая инициирует соединение для одного URL, и сопрограмму read() , которая отправляет в буфер данные из этого подключения по мере их доступности. Это делается через выдачу из сопрограммы read() после прочтения каждого фрагмента. Благодаря этому другие задачи могут исполняться, пока ожидается поступление новых данных.
Затем мы вызовем fetch() несколько раз, чтобы создать сопрограммы генератора. Наш цикл событий будет запускать каждую из них и исполнять их одновременно, пока не завершатся все запросы. Тогда у нас будут необработанные данные ответа для каждого запроса, и мы сможем поступать с ними по своему усмотрению.
Снова ура, вы только что изобрели асинхронный ввод-вывод (async i/o). То, что мы получили, на самом деле является очень примитивной версией асинхронного ввода-вывода с использованием того же синтаксиса, который был доступен в Python 3.4.
Но у нас пока нет абсолютно никаких наворотов.
Отмечу, что данный пример кода был информативным, и с ним было весело поиграться. Но он не очень гибкий и не будет полезным, если мы используем его с библиотеками, которые для него не предназначены. Поэтому очень прошу вас не использовать вот это все в продуктиве. Ведь неспроста этот код не выложили на PyPI.
Сопрограммы со стандартной библиотекой Python
Итак, теперь, когда мы поняли процесс создания сопрограмм из различных функций Python, давайте посмотрим, как реализована поддержка сопрограмм непосредственно в Python.
Начиная с версии 3.5, стандартная библиотека поддерживает синтаксис async def для декларирования нативных сопрограмм наподобие наших игрушечных сопрограмм-генераторов.
Это уже не стандартная функция, которая исполняется сразу после вызова. Если вызвать ее, то будет возвращен объект сопрограммы, который затем можно запускать в цикле событий.
Данный цикл событий создается, среди прочего, фреймворком асинхронного ввода-вывода и исполняет задачи циклически, как и наша функция wait() .
Мы используем нового помощника из Python 3.7, который создаст цикл, исполняющий нашу сопрограмму, а затем закроет цикл событий, когда сопрограмма завершится.
Давайте попробуем. Мы вызываем функцию сопрограммы foo() , выводим данный объект через print() , затем запускаем сопрограмму в цикле событий, получаем результат и выводим его.
После завершения мы увидим сам объект сопрограммы, который мы получили при вызове foo() . Он начнет исполняться только после того как мы передадим его в asyncio.run() , запущенный внутри сопрограмм.
Еще нам стала доступна новая сила — способность ожидать объекты.
Она немного похожа на то, как мы использовали сопрограммы-генераторы. Но в целом она лучше, в данном примере asyncio.sleep() исполняет ту же роль, что и наш генератор sleep .
Так создается простой способ передачи управления циклом событий, причем задержка при передаче равна нулю.
Но на самом деле у нас еще больше возможностей. Нам не обязательно ждать только другие сопрограммы, мы можем ожидать самые разные асинхронные объекты, в том числе объекты ожидания (awaitables) и фьючерсы (futures).
Данная гибкость упрощает совмещение синхронных библиотек старого стиля с базами асинхронного кода, а также дает нам более выразительные формы для встраивания параллелизма в приложения.
Итак, возьмем наши сопрограммы, которые мы сделали в начале. Мы можем вызывать и ожидать сопрограмму sleepy() . Сама sleepy() может что-то делать, ждать чего-то другого и в конечном итоге возвращать значение.
Данное значение затем возвращается благодаря ключевому слову await в foo() и может использоваться традиционными способами.
Одним из распространенных рабочих процессов в параллельном программировании является создание нескольких подзадач.
Если мы просто используем ключевое слово await напрямую на каждой задаче по порядку, мы не получим никакого параллелизма.
Вместо этого, если мы хотим ожидать несколько вещей одновременно и фактически запускать их одновременно, нам нужно использовать помощника asyncio.gather().
Он берет несколько сопрограмм-фьючерсов или объектов ожидания, продвигает их в задачи и запускает их одновременно. В конечном итоге он возвращает результаты в том же порядке, в котором они были даны. Это прямой эквивалент в асинхронном вводе-выводе для нашей функции wait() из начала.
Еще у нас есть есть поддержка для асинхронных итерируемых объектов и менеджеров контекста.
Асинхронные итерируемые объекты похожи на стандартные, но используют сопрограммы для извлечения следующего элемента.
Аналогичным образом асинхронные менеджеры контекста используют сопрограммы вместо традиционных функций при входе в контекст и выходе из него.
Наконец, у нас есть поддержка асинхронных генераторов.
Они построены на основе генераторов и сопрограмм, чтобы дать нам, как вы уже догадались, больше генераторов. На самом деле, они представляют собой асинхронные итерируемые объекты. Благодаря этому, они фантастически полезны для создания выразительных асинхронных интерфейсов без ущерба для удобочитаемости или сопровождения.
Итак, давайте объединим все это в одну финальную демонстрацию и создадим вариант извлечения URL с асинхронным вводом-выводом.
В async def fetch() мы реализуем извлечение одного URL-адреса с использованием библиотеки aiohttp . Запрос создает для нас асинхронный контекст, который устанавливает соединение и дает нам объект ответа. Затем мы можем подождать метод text() и получить полное тело ответа от нашей основной сопрограммы.
В async def main() мы можем вызвать fetch() для каждого URL. Так мы получим список объектов сопрограмм.
Мы передадим их в asyncio.gather() , который будет исполнять каждую из них как параллельную задачу. Затем мы возьмем результаты ожидания gather и выведем ответы в консоль.
Когда мы запускаемся с помощью asyncio.run() , то увидим, что результаты соответствуют тем, что мы получили ранее с помощью сопрограмм-генераторов и настраиваемого цикла событий.
Заключение
Кооперативная многозадачность используется уже давно. Она даже была базовой функциональностью ранних версий Mac OS и Windows. Сопрограммы развиваются в Python более десяти лет. Async I/O как фреймворк в той или иной форме существует уже более шести лет.
Никакой магии здесь нет. Все, что мы видим, является результатом итеративной структуры и разработки на основе предыдущих функций или абстракций.
Просто нужно немного любопытства, чтобы продолжать чистить лук, расшифровать мотивы и понимать, как и почему принимались решения и как они повлияли на будущее.
Если вам понравилась какая-нибудь часть данного выступления, вам понравятся данные PEP:
Они на удивление хорошо читаются. Даже если вы уже знакомы с сопрограммами или Async I/O , готов поспорить, что вы узнаете что-то, что поможет вам или повлияет на решения, которые вы будете принимать в будущем. Возможно, в следующий раз, когда вы увидите, что кто-то работает с асинхронным вводом-выводом, вы наконец сможете сказать, что это сопрограмма.
Если вы хотите поиграть с какими-нибудь примерами кода, все они доступны в моем репозитории на github: