Multitask Analysis


A Brief Analysis of Multitask

I came across multitask while analyzing an open-source project that involved many Python concepts I was completely unfamiliar with or barely understood. There isn’t much information online about Python’s multitask library, and I’ve always believed that you must truly understand how a tool is implemented to use it effectively. So, once I felt I had understood multitask’s control flow, I tried to analyze its source code. What follows is just my humble opinion.

Prerequisites

python-multitask
Cooperative multitasking
Coroutine
Metaclass
Generator

How to Use It

The core of multitask lives in a single file, multitask.py. According to its documentation, multitask can be used as in the following examples:

Example 1. Using multitask to run two completely unrelated tasks concurrently:

>>> def printer(message):
...     while True:
...         print message
...         yield
... 
>>> multitask.add(printer('hello'))
>>> multitask.add(printer('goodbye'))
>>> multitask.run()
hello
goodbye
hello
goodbye
hello
goodbye
[and so on ...]
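multitask itself only runs on Python 2. To see why the output alternates, here is a minimal Python 3 sketch of the round-robin idea behind Example 1. It is not multitask’s code: printer is a finite variant of the example’s function, and run and log are hypothetical names. Each bare yield hands control back to the scheduler, which moves the task to the back of its queue.

```python
from collections import deque

log = []  # records printed messages so the interleaving can be checked

def printer(message, times):
    # A finite version of Example 1's printer: print, then yield control.
    for _ in range(times):
        print(message)
        log.append(message)
        yield

def run(tasks):
    # Toy round-robin scheduler (hypothetical, not multitask's API):
    # resume the task at the head of the queue, then move it to the tail
    # unless it has finished.
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # finished tasks are simply dropped
        queue.append(task)

run([printer('hello', 3), printer('goodbye', 3)])
```

Because a task only gives up control at a yield, a task that never yields would starve all the others; that is the essence of cooperative multitasking.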

Example 2. Using multitask to implement a server that handles concurrent client connections (handle_request stands for your application logic):

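import socket
import multitask

def handle_request(request):
    # Hypothetical placeholder (not part of multitask): echo the request back.
    return request

def listener(sock):
    while True:
        conn, address = (yield multitask.accept(sock))
        multitask.add(client_handler(conn))

def client_handler(sock):
    while True:
        request = (yield multitask.recv(sock, 1024))
        if not request:
            break
        response = handle_request(request)
        yield multitask.send(sock, response)
    sock.close()  # the client hung up: release the connection

# Hypothetical listening socket; the address and port are arbitrary.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 8000))
sock.listen(5)

multitask.add(listener(sock))
multitask.run()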

Example 3. A parent task can yield other tasks (child tasks). A child task keeps running until it finishes and raises StopIteration. It can also return data to the parent task through the arguments of StopIteration; that data becomes the value of the parent’s yield expression.


>>> def parent():
...     print (yield return_none())
...     print (yield return_one())
...     print (yield return_many())
...     try:
...         yield raise_exception()
...     except Exception, e:
...         print 'caught exception: %s' % e
... 
>>> def return_none():
...     yield
...     # do nothing
...     # or return
...     # or raise StopIteration
...     # or raise StopIteration(None)
... 
>>> def return_one():
...     yield
...     raise StopIteration(1)
... 
>>> def return_many():
...     yield
...     raise StopIteration(2, 3)  # or raise StopIteration((2, 3))
... 
>>> def raise_exception():
...     yield
...     raise RuntimeError('foo')
... 
>>> multitask.add(parent())
>>> multitask.run()
None
1
(2, 3)
caught exception: foo
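Example 3 is Python 2 code. Since Python 3.7 (PEP 479), a StopIteration raised inside a generator body is converted into a RuntimeError, so in Python 3 a child task would deliver its result with return instead; the value still travels on the StopIteration that ends the generator, in its value attribute. The sketch below mirrors the example’s children (result_of and old_style are hypothetical helpers) and shows both behaviours.

```python
def return_none():
    yield                   # falling off the end (or a bare return) gives None

def return_one():
    yield
    return 1                # Python 3 spelling of `raise StopIteration(1)`

def return_many():
    yield
    return 2, 3             # one tuple, like `raise StopIteration((2, 3))`

def old_style():
    yield
    raise StopIteration(1)  # Python 3.7+: turned into RuntimeError (PEP 479)

def result_of(gen):
    # Drive a generator to completion and hand back its return value,
    # which Python 3 stores on StopIteration.value.
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        return stop.value

results = [result_of(return_none()), result_of(return_one()),
           result_of(return_many())]
print(results)  # [None, 1, (2, 3)]

old_style_error = None
try:
    result_of(old_style())
except RuntimeError as err:
    old_style_error = err
print(type(old_style_error).__name__)  # RuntimeError
```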

Why does it work?

– Asynchronous I/O – Multitask uses select/poll to handle asynchronous I/O and uses coroutines to implement so-called cooperative multitasking. Because of select’s own limitations and poll’s inefficiency on Linux (its cost grows with the number of watched descriptors), multitask is probably not suitable for a high-performance server handling 100k concurrent connections. Also, since select/poll/epoll/kqueue cannot meaningfully detect state changes on regular files, multitask cannot be used to read and write disk files (Python v2.7 documentation. 16.1 select — Waiting for I/O completion).

Why doesn’t multitask use epoll on Linux and kqueue on BSD? I don’t have an answer to this question yet. Perhaps it’s for portability (although platform-specific code paths could still be added), or perhaps the author wanted an implementation that is as platform-independent as possible. In any case, extending it for specific platforms is not difficult.

Asynchronous I/O is handled in _FDSelector. Setting aside routine operations such as registering and unregistering file descriptors, let’s look at the implementation of its event-processing method, process() (here we pick the implementation based on select.poll):

354         def process(self, tm, timeout):
355             try:
356                 ready = self._poller.poll(timeout)
357             except (select.error, IOError, OSError), err:
358                 if err.args[0] != errno.EINTR:
359                     raise
360             else:
361                 for fd, event in ready:
362                     fd = self._waits[fd]
363 
364                     if event & select.POLLNVAL:
365                         err = errno.EINVAL
366                     elif event & select.POLLHUP:
367                         err = errno.ECONNRESET
368                     else:
369                         err = 0
370 
371                     if err == 0:
372                         fd._reenqueue(tm)
373                     else:
374                         fd._reenqueue(tm,
375                            exc_info=(_socket_error_from_errno(err),))

To keep the flow clear, we’ll ignore the error handling for now. From the surrounding code we can tell that self._waits here is a dict of the form {fileno: FDReady()} (note that line 362 rebinds the name fd to that FDReady instance), that each FDReady represents a task waiting on asynchronous I/O, and that tm is TaskManager, the central object of multitask, which drives every coroutine and which we’ll discuss later. So,

fd._reenqueue(tm)

This puts the task waiting on that FDReady back into TaskManager’s run queue. Later, when TaskManager resumes that task, the actual read/write function is called to receive or send the data.
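A toy Python 3 analogue of process() plus _reenqueue() may help make this step concrete. It is a sketch under stated assumptions, not multitask’s code: Waiter, process and run_queue are hypothetical names, a string stands in for the task, and a socket pair stands in for a client connection. It uses the standard selectors module, whose DefaultSelector picks the most efficient mechanism the platform offers (for example epoll on Linux or kqueue on BSD), whereas multitask carries its own select/poll code. Error events such as POLLHUP and POLLNVAL are ignored here.

```python
import selectors
import socket
from collections import deque

class Waiter:
    # Hypothetical stand-in for FDReady: which task waits on which socket.
    def __init__(self, sock, task):
        self.sock = sock
        self.task = task

    def reenqueue(self, selector, run_queue):
        # Like FDReady._reenqueue: stop watching the socket, then put the
        # waiting task back on the run queue as (task, input, exc_info).
        selector.unregister(self.sock)
        run_queue.append((self.task, None, ()))

def process(selector, run_queue, timeout=None):
    # Like _FDSelector.process(): wait for readiness, then wake the waiters.
    for key, _events in selector.select(timeout):
        key.data.reenqueue(selector, run_queue)

selector = selectors.DefaultSelector()
run_queue = deque()
a, b = socket.socketpair()               # `a` plays the client connection
selector.register(a, selectors.EVENT_READ, Waiter(a, task='reader-task'))

process(selector, run_queue, timeout=0)  # nothing readable yet
print(len(run_queue))                    # 0
b.sendall(b'ping')
process(selector, run_queue, timeout=1)  # `a` is readable now
print(run_queue[0])                      # ('reader-task', None, ())
data = a.recv(1024)                      # only now is the data actually read
print(data)                              # b'ping'

selector.close()
a.close()
b.close()
```

Note that waking the task and reading the data are separate steps, exactly as in multitask: the selector only reports readiness.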

– A Read Operation – What actually happens when we write the following?

def foo():
    data = yield (multitask.recv(sock, 1024))    # *

Let’s take a closer look:

714 @yieldable
715 def recv(sock, *args, **kwargs):
733     return _fdaction(sock, sock.recv, args, kwargs, read=True)

575 @yieldable
576 def _fdaction(fd, func, args=(), kwargs={}, read=False, write=False, exc=False):
577     timeout = kwargs.pop('timeout', None)
578 
579     yield FDReady(fd, read, write, exc, timeout)
580 
581     while True:
582         try:
583             raise StopIteration(func(*(args), **(kwargs)))
584         except (socket.error, IOError, OSError), err:
585             if err.args[0] != errno.EINTR:
586                 raise

The yield at the asterisk (*) hands TaskManager another generator: multitask.recv() returns the generator created by calling _fdaction. When that generator is resumed for the first time, it yields

FDReady(fd, True, False, False, timeout)

an event-listener instance that waits for fd to become readable. When a read event occurs on the file descriptor fd, TaskManager resumes _fdaction, which calls func (here sock.recv) and passes the data it read back to the caller of multitask.recv() as the argument of StopIteration. That is, the data ends up in the data variable at the asterisk (*); TaskManager completes this handover by calling send() on the parent generator foo with that data.

Each recv() is therefore carried out by one FDReady plus one call to sock.recv(). Suspending the task without blocking is achieved by two yields working together: yield multitask.recv(sock, 1024) in foo, and yield FDReady(fd, True, False, False, timeout) in _fdaction.
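The same two-step pattern can be reproduced in a few lines of Python 3, with the scheduler’s part played by hand so that each step is visible. This is a hypothetical sketch: ReadReady and fd_recv stand in for FDReady and multitask.recv()/_fdaction, the result is delivered with return (Python 3 no longer allows raising StopIteration inside a generator, see PEP 479), and select.select plays the role of _FDSelector.

```python
import select
import socket

class ReadReady:
    # Hypothetical stand-in for FDReady(fd, read=True, ...).
    def __init__(self, sock):
        self.sock = sock

def fd_recv(sock, size):
    # Hypothetical stand-in for multitask.recv() / _fdaction.
    yield ReadReady(sock)     # step 1: suspend until sock is readable
    return sock.recv(size)    # step 2: the read no longer blocks

a, b = socket.socketpair()
b.sendall(b'hello')

child = fd_recv(a, 1024)
condition = child.send(None)  # first resume: the ReadReady comes out
# Play the selector: wait (up to 1 s) until the socket is readable.
readable, _, _ = select.select([condition.sock], [], [], 1.0)
print(readable == [a])        # True
try:
    child.send(None)          # second resume: performs sock.recv()
except StopIteration as stop:
    data = stop.value         # what the scheduler would hand to the parent
print(data)                   # b'hello'

a.close()
b.close()
```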

– YieldCondition –

What is FDReady, and how is it tied to _FDSelector?

FDReady._waits is a class attribute shared by all FDReady instances: a mapping from each TaskManager to its corresponding _FDSelector. For example, if an FDReady is handled by TaskManager x, it is added to the watch list of x’s _FDSelector. This is how FDReady and _FDSelector are tied together.

According to its docstring, YieldCondition is the base class for all objects that a task yields to a TaskManager in order to be resumed when some event occurs. YieldCondition has a metaclass, MetaYieldCondition, used to track globally alive classes that inherit from YieldCondition (FDReady, _QueueAction). Through _waits, the FDReady class also keeps track of each TaskManager and the _FDSelector it uses. Therefore, starting from MetaYieldCondition, every existing TaskManager can be reached (there may be several, for example in a multithreaded program where each thread creates its own TaskManager and corresponding _FDSelector).
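The registration trick can be illustrated with a small Python 3 metaclass. This is a hypothetical sketch of the idea, not MetaYieldCondition’s actual code: for illustration, a class counts as a “custom wait handler” if it defines its own _has_waits and _handle_waits (how multitask decides is not shown in the excerpts here), and a static method loops over the recorded classes in the way MetaYieldCondition._handle_waits does in the run loop.

```python
class MetaCondition(type):
    # Hypothetical analogue of MetaYieldCondition.
    custom_wait_handlers = []

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        # Assumption for illustration: register classes that define their
        # own _has_waits and _handle_waits.
        if '_has_waits' in namespace and '_handle_waits' in namespace:
            MetaCondition.custom_wait_handlers.append(cls)

    @staticmethod
    def handle_waits(tm):
        # Let every registered class service the waiters of manager `tm`.
        for cls in MetaCondition.custom_wait_handlers:
            if cls._has_waits(tm):
                cls._handle_waits(tm)

class Condition(metaclass=MetaCondition):
    pass  # base class: defines no wait handling, so it is not registered

class FDCondition(Condition):
    waits = {}  # manager -> set of pending waiters (stand-in for _FDSelector)

    @classmethod
    def _has_waits(cls, tm):
        return bool(cls.waits.get(tm))

    @classmethod
    def _handle_waits(cls, tm):
        print('servicing', len(cls.waits[tm]), 'waiter(s) for', tm)
        cls.waits[tm].clear()

print([c.__name__ for c in MetaCondition.custom_wait_handlers])  # ['FDCondition']
FDCondition.waits['tm-1'] = {'waiter'}
MetaCondition.handle_waits('tm-1')  # servicing 1 waiter(s) for tm-1
MetaCondition.handle_waits('tm-1')  # no pending waiters: prints nothing
```

Keying the class-level waits dict by manager is what lets several managers (one per thread, say) share the same condition classes.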

– Driving Force – For a while, my lack of knowledge led me to a series of “ridiculous” questions: What is the Linux kernel? Why does it keep running? Why do processes get executed, and what drives them? Of course, I now know that the computer’s heartbeat, the clock, is the “culprit” behind all of this. Whenever I look at a piece of code, I first need to understand the carrier that gives it life (a process or thread) and how that carrier behaves. Only then can I understand where the code comes from and where it goes.

The carrier of multitask is the process or thread that calls it, and its behavior is embodied in TaskManager. Once you call multitask.run(), the only blocking function in this event-driven engine, this “world” starts to run.

1106     def run(self):
1117         while (self._queue or
1118                self._timeouts or
1119                MetaYieldCondition._has_waits(self)):
1120 
1121             while self._queue:
1122                 self._run_next_task()
1123 
1124             MetaYieldCondition._handle_waits(self)
1125 
1126             if self._timeouts:
1127                 self._handle_timeouts(self._get_run_timeout())

Each element of self._queue is a tuple (task, input, exc_info). A task added via TaskManager.add() is appended as (task, None, ()). The queue also receives tasks that TaskManager re-enqueues at runtime, such as tasks woken up by an I/O event or tasks that yielded a plain value.

MetaYieldCondition._has_waits() and MetaYieldCondition._handle_waits() deal with tasks waiting on events such as I/O readiness, and _handle_timeouts() deals with timeouts. We’ll come back to the waits below and leave timeouts aside for now, so let’s first focus on self._run_next_task().

1186     def _run_next_task(self):
1187         task, input, exc_info = self._queue.popleft()
1188         while True:
1189             try:
1190                 if exc_info:
1191                     output = task.throw(*exc_info)
1192                 else:
1193                     output = task.send(input)
1194             except StopIteration, e:
1195                 if not isinstance(task, _ChildTask):
1196                     break
1197                 else:
1198                     if not e.args:
1199                         output = None
1200                     elif len(e.args) == 1:
1201                         output = e.args[0]
1202                     else:
1203                         output = e.args
1204                     task, input, exc_info = task.parent, output, ()
1205             except:
1206                 if isinstance(task, _ChildTask):
1207                     # Propagate exception to parent
1208                     task, input, exc_info = task.parent, None, sys.exc_info()
1209                 else:
1210                     # No parent task, so just die
1211                     raise
1212             else:
1213                 if isinstance(output, types.GeneratorType):
1214                     task, input, exc_info = _ChildTask(task, output), None, ()
1215                 else:
1216                     if isinstance(output, YieldCondition):
1217                         output.task = task
1218                         output._handle(self)
1219                     else:
1220                         # Return any other output as input and send task to
1221                         # end of queue
1222                         self._enqueue(task, input=output)
1223                     break

To keep things simple and clear, assume that TaskManager currently holds only one task, created by the example call above:

def foo():
    data = yield (multitask.recv(sock, 1024))    # *

Let’s follow, step by step, how the generator ‘foo’ is transformed and executed inside TaskManager.

First, when _run_next_task() is executed for the first time, the generator ‘foo’ is retrieved from the queue:

1187         task, input, exc_info = self._queue.popleft()

Here task is the generator ‘foo’, input is its initial value None, and exc_info is its initial value ().

1190                 if exc_info:
1191                     output = task.throw(*exc_info)
1192                 else:
1193                     output = task.send(input)

In practice the code above executes output = task.send(None). This is the rule from PEP 342: a freshly created generator must first be resumed with None. The generator ‘foo’ runs up to its yield, yields the value of multitask.recv(sock, 1024), and is suspended before the yield expression returns. As shown above, multitask.recv() returns the generator ‘_fdaction’.
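The PEP 342 rule is easy to check directly (in Python 3 here; Python 2.5+ behaves the same): a just-started generator can only be resumed with send(None) or next(), and sending any other value raises TypeError. The foo below is a hypothetical stand-in that yields a plain string instead of calling multitask.recv().

```python
received = []

def foo():
    # Hypothetical stand-in: yields a string instead of multitask.recv(...).
    data = yield 'request to read'
    received.append(data)

early_error = None
g = foo()
try:
    g.send(b'too early')        # not started yet: only None is accepted
except TypeError as err:
    early_error = err

g = foo()
first = g.send(None)            # runs foo up to its yield
print(first)                    # request to read
try:
    g.send(b'payload')          # resumes foo: `data` becomes b'payload'
except StopIteration:
    pass                        # foo finished after storing the value
print(received)                 # [b'payload']
```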

Assuming there are no exceptions during this process, the code will execute to:

1213                 if isinstance(output, types.GeneratorType):
1214                     task, input, exc_info = _ChildTask(task, output), None, ()
1215                 else:
1216                     if isinstance(output, YieldCondition):
1217                         output.task = task
1218                         output._handle(self)
1219                     else:
1220                         # Return any other output as input and send task to
1221                         # end of queue
1222                         self._enqueue(task, input=output)
1223                     break

Clearly, the condition on line 1213 is true, since output is the generator ‘_fdaction’. So line 1214 executes:

1214 task, input, exc_info = _ChildTask(task, output), None, ()

Here, generator ‘foo’ and generator ‘_fdaction’ are treated as parent and child:

1029 class _ChildTask(object):
1030 
1031     __slots__ = ('parent', 'send', 'throw')
1032 
1033     def __init__(self, parent, task):
1034         self.parent = parent
1035         self.send = task.send
1036         self.throw = task.throw

Through _ChildTask, the two generators become parent and child and are temporarily treated as a single task, a _ChildTask object. It is not itself a generator: it exposes the child’s send and throw methods and remembers the parent. Line 1214 then rebinds task, input, and exc_info accordingly.

Then, returning to the beginning of the loop:

1190                 if exc_info:
1191                     output = task.throw(*exc_info)
1192                 else:
1193                     output = task.send(input)

This time the code above again executes as output = task.send(None). Remember the PEP 342 rule mentioned earlier? A newly created generator must be started with None, and that is exactly the situation here: task is now the _ChildTask created in the first pass, whose send method is the send method of the _fdaction generator itself. Looking back at the _fdaction code above, the generator yields an FDReady instance, which is assigned to output.

The second pass through the loop then continues.

FDReady is a subclass of YieldCondition, and it is not itself a generator. Therefore,

1215                 else:
1216                     if isinstance(output, YieldCondition):
1217                         output.task = task
1218                         output._handle(self)
1219                     else:
1220                         # Return any other output as input and send task to
1221                         # end of queue
1222                         self._enqueue(task, input=output)
1223                     break

Ha, lines 1217 and 1218 are executed. FDReady’s task attribute (inherited from YieldCondition) now stores the _ChildTask wrapper around the _fdaction generator, and then FDReady’s _handle method is called:

508     def _handle(self, tm):
509         self._waits[tm].add(self)
510         super(FDReady, self)._handle(tm)

The _handle method of FDReady registers this FDReady instance with the _FDSelector belonging to the current TaskManager, and then calls the _handle method of its parent class, YieldCondition.

297     def _handle(self, tm):
298         if self.expiration is not None:
299             tm._add_timeout(self)

Okay, that’s timeout handling, which, as mentioned above, we are setting aside for now.

Back in the second pass, the loop now hits break. At this point TaskManager’s _queue is empty, the FDReady is registered with _FDSelector, and FDReady’s task attribute references the _ChildTask wrapper around _fdaction, whose parent attribute references the original generator foo. Perfect!

Yes, those familiar with asynchronous I/O might be thinking about operations like select()/poll()/epoll_wait()/kqueue().

Well, it’s time to collect the event.

Back in the loop of the run() function, _run_next_task() has finished executing, and we’re now at line 1124:

1124             MetaYieldCondition._handle_waits(self)

248     @staticmethod
249     def _handle_waits(tm, timeout=None):
250         for cls in MetaYieldCondition.__custom_wait_handlers:
251             if cls._has_waits(tm):
252                 cls._handle_waits(tm, tm._get_run_timeout(timeout))

Remember the statement above that “YieldCondition has a metaclass MetaYieldCondition, used to track globally alive classes that inherit from YieldCondition (FDReady, _QueueAction)”? __custom_wait_handlers is the variable used for this tracking. Under all the assumptions above, this variable only contains the class FDReady. Okay, then the above lines of code are equivalent to:

if FDReady._has_waits(tm):
    FDReady._handle_waits(tm, tm._get_run_timeout(timeout))

The implementation is as follows:

520     @classmethod
521     def _has_waits(cls, tm):
522         return bool(cls._waits[tm])
523 
524     @classmethod
525     def _handle_waits(cls, tm, timeout):
526         if (timeout is None) or (timeout > 0.0):
527             cls._waits[tm].process(tm, timeout)

The _FDSelector belonging to tm has just registered a new FDReady, so _has_waits returns True; assume timeout is None. cls._waits[tm].process(tm, timeout) then calls the process() method of _FDSelector analyzed earlier in this article. Everything is now connected.

When data arrives on the file descriptor (remember that our generator ‘foo’ uses multitask.recv()), the _reenqueue() function of FDReady is called:

516     def _reenqueue(self, tm, input=None, exc_info=()):
517         self._waits[tm].remove(self)
518         super(FDReady, self)._reenqueue(tm, input, exc_info)

FDReady first unregisters itself from _FDSelector (event-driven, you know), and then:

304     def _reenqueue(self, tm, input=None, exc_info=()):
305         tm._enqueue(self.task, input, exc_info)
306         if self.expiration is not None:
307             tm._remove_timeout(self)

Yes! Since a read event has occurred on FDReady’s file descriptor, tm._enqueue(self.task, None, ()) puts the waiting task back into TaskManager’s _queue. Here self.task is FDReady’s reference to the _ChildTask wrapper around the _fdaction generator.

Hold on: at this point we only know that a read event has occurred; the actual data is still sitting in the kernel’s TCP/IP stack and has yet to be read.

Let’s jump directly to TaskManager‘s _run_next_task():

1190                 if exc_info:
1191                     output = task.throw(*exc_info)
1192                 else:
1193                     output = task.send(input)

Here task is the _ChildTask wrapper around the generator ‘_fdaction’, and input is None. So send() resumes _fdaction at the line following its yield, and it starts reading data:

583             raise StopIteration(func(*(args), **(kwargs)))

Here func is sock.recv. The data it reads leaves _fdaction as the argument of StopIteration, which _run_next_task catches on line 1194 and unpacks into output (lines 1198-1203)...

It’s not over yet; our final destination is the generator ‘foo’. But I think if you understand the above process, the following process shouldn’t be difficult either.
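For readers who want to check the remaining steps, here is a compact, self-contained Python 3 toy that strings the pieces together: foo yields a child generator, the child yields a readiness condition, a selector wakes it, and the child’s return value lands in foo’s data variable. Every name in it (FDWait, ChildTask, recv, TinyManager) is hypothetical; it mirrors the flow described above rather than multitask’s actual code, and it leaves out timeouts, error events and exception propagation.

```python
import selectors
import socket
import types
from collections import deque

class FDWait:
    # Hypothetical stand-in for FDReady: resume self.task when sock is readable.
    def __init__(self, sock):
        self.sock = sock
        self.task = None

class ChildTask:
    # Hypothetical stand-in for _ChildTask.
    __slots__ = ('parent', 'send')

    def __init__(self, parent, gen):
        self.parent = parent
        self.send = gen.send

def recv(sock, size):
    # Hypothetical stand-in for multitask.recv() / _fdaction.
    yield FDWait(sock)
    return sock.recv(size)

class TinyManager:
    # Hypothetical stand-in for TaskManager: no timeouts, no error handling.
    def __init__(self):
        self.queue = deque()                    # (task, input) pairs
        self.selector = selectors.DefaultSelector()

    def add(self, gen):
        self.queue.append((gen, None))

    def run_next_task(self):
        task, value = self.queue.popleft()
        while True:
            try:
                output = task.send(value)
            except StopIteration as stop:
                if not isinstance(task, ChildTask):
                    return                      # top-level task finished
                task, value = task.parent, stop.value  # result goes to parent
                continue
            if isinstance(output, types.GeneratorType):
                task, value = ChildTask(task, output), None
            elif isinstance(output, FDWait):
                output.task = task              # like `output.task = task`
                self.selector.register(output.sock, selectors.EVENT_READ, output)
                return
            else:
                self.queue.append((task, output))
                return

    def run(self):
        while self.queue or self.selector.get_map():
            while self.queue:
                self.run_next_task()
            if self.selector.get_map():
                # Like _FDSelector.process() followed by FDReady._reenqueue().
                for key, _events in self.selector.select():
                    self.selector.unregister(key.fileobj)
                    self.queue.append((key.data.task, None))

results = []

def foo(sock):
    data = yield recv(sock, 1024)              # *
    results.append(data)

a, b = socket.socketpair()
b.sendall(b'hello from the peer')
tm = TinyManager()
tm.add(foo(a))
tm.run()
print(results)                                 # [b'hello from the peer']
tm.selector.close()
a.close()
b.close()
```

Tracing it by hand gives the same sequence as above: two passes to register the wait, one select(), then one pass in which the child returns and the parent receives the data.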

– Communication Component Queue – To be discussed later.

Looking back, the analysis above only traced a simplified version of Example 2. Can you work through Examples 1 and 3 on your own now?

Honestly, it took me a long time to reach even this superficial understanding. While reading the definitions and related code for coroutines, metaclasses, and multitask itself, I kept repeating the words “brain fuck” in my head. I had never been exposed to functional programming before; it was a new concept and a new field, so it’s normal to be a little confused at first, right?!

                    --- Yeah, I smoke a lot, still on top.
