A Brief Analysis of Multitask
I came across multitask while analyzing an open-source project that relied on many Python concepts I either did not know or barely understood. There isn’t much information online about Python multitask, and I’ve always believed that you must truly understand how a tool is implemented to use it effectively. So, once I felt I understood the control flow of multitask, I tried to analyze its source code. Take what follows as my humble opinion.
Prerequisites
python-multitask
Cooperative multitasking
Coroutine
metaclass:
- http://en.wikipedia.org/wiki/Metaclass
- http://www.ibm.com/developerworks/linux/library/l-pymeta/index.html
- http://www.voidspace.org.uk/python/articles/metaclasses.shtml
- http://gnosis.cx/publish/programming/metaclass_1.html
- http://gnosis.cx/publish/programming/metaclass_2.html
generator:
- PEP 255 — Simple Generators
- PEP 288 — Generators Attributes and Exceptions
- PEP 342 — Coroutines via Enhanced Generators
- The C10K problem
How to Use?
The main code for multitask is contained in the multitask.py file. According to its documentation, multitask has the following usage examples:
Example 1. Using multitask to concurrently run two completely unrelated tasks:
>>> def printer(message):
...     while True:
...         print message
...         yield
...
>>> multitask.add(printer('hello'))
>>> multitask.add(printer('goodbye'))
>>> multitask.run()
hello
goodbye
hello
goodbye
hello
goodbye
[and so on ...]
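To make the alternating output less mysterious, here is a minimal round-robin sketch of the idea behind add() and run(). It is not multitask's code: it is written for Python 3, the run() function and the log list are my own hypothetical stand-ins, and printer stops after a fixed number of rounds so that the demo terminates.

```python
from collections import deque

def printer(message, rounds, log):
    # Unlike the original example, stop after `rounds` steps so the demo ends.
    for _ in range(rounds):
        log.append(message)   # stands in for `print message`
        yield                 # hand control back to the scheduler

def run(tasks):
    """Resume each generator in turn until all of them are exhausted."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue          # task finished; do not re-queue it
        queue.append(task)    # task yielded; send it to the end of the queue

log = []
run([printer("hello", 2, log), printer("goodbye", 2, log)])
print(log)
```

Each task runs only until its next yield, which is why neither task can starve the other as long as both keep yielding.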
Example 2. Implement a service program that can handle concurrent client connections using multitask:
def listener(sock):
    while True:
        conn, address = (yield multitask.accept(sock))
        multitask.add(client_handler(conn))

def client_handler(sock):
    while True:
        request = (yield multitask.recv(sock, 1024))
        if not request:
            break
        response = handle_request(request)
        yield multitask.send(sock, response)

multitask.add(listener(sock))
multitask.run()
Example 3. A parent task can yield other tasks (child tasks). A child task keeps executing until it finishes by raising a StopIteration exception, and it can return output data to the parent task through the arguments of that StopIteration:
>>> def parent():
...     print (yield return_none())
...     print (yield return_one())
...     print (yield return_many())
...     try:
...         yield raise_exception()
...     except Exception, e:
...         print 'caught exception: %s' % e
...
>>> def return_none():
...     yield
...     # do nothing
...     # or return
...     # or raise StopIteration
...     # or raise StopIteration(None)
...
>>> def return_one():
...     yield
...     raise StopIteration(1)
...
>>> def return_many():
...     yield
...     raise StopIteration(2, 3) # or raise StopIteration((2, 3))
...
>>> def raise_exception():
...     yield
...     raise RuntimeError('foo')
...
>>> multitask.add(parent())
>>> multitask.run()
None
1
(2, 3)
caught exception: foo
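A note for readers on modern Python: the examples above are Python 2. Since Python 3.7 (PEP 479), raising StopIteration inside a generator body is turned into a RuntimeError, and the usual spelling is a plain return statement, whose value still travels in the StopIteration that the caller sees. The sketch below only illustrates that mechanism; result_of is a hypothetical helper of mine, not part of multitask.

```python
def return_one():
    yield
    return 1              # Python 3 spelling of `raise StopIteration(1)`

def return_many():
    yield
    return (2, 3)

def result_of(gen):
    """Drive a generator to completion and return the value it finished with."""
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        return stop.value  # the value given to `return` inside the generator

one = result_of(return_one())
many = result_of(return_many())
print(one, many)
```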
Why does it work?
– Asynchronous I/O –
Multitask uses select/poll to handle asynchronous I/O and uses coroutines to implement so-called cooperative multitasking. Because of the limitations of select itself and the inefficiency of poll calls on Linux platforms, multitask may not be suitable for implementing a high-performance service program with 100k concurrent requests. Also, since select/poll/epoll/kqueue cannot meaningfully detect state changes on ordinary files, multitask cannot be used for reading and writing disk files (Python v2.7 documentation, 16.1 select: Waiting for I/O completion).
Why doesn’t multitask use epoll on Linux platforms and kqueue on BSD platforms? I don’t have an answer to this question yet. Perhaps it is a cross-platform consideration (although special handling could be added for specific platforms), or perhaps the author wanted the implementation to be as platform-independent as possible. In any case, extending it for specific platforms is not difficult.
Asynchronous I/O processing is implemented in _FDSelector. Besides routine operations such as registering and unregistering file descriptors, let’s take a look at the implementation of its event handler function process() (here we’ve selected the implementation based on select.poll):
def process(self, tm, timeout):
    try:
        ready = self._poller.poll(timeout)
    except (select.error, IOError, OSError), err:
        if err.args[0] != errno.EINTR:
            raise
    else:
        for fd, event in ready:
            fd = self._waits[fd]

            if event & select.POLLNVAL:
                err = errno.EINVAL
            elif event & select.POLLHUP:
                err = errno.ECONNRESET
            else:
                err = 0

            if err == 0:
                fd._reenqueue(tm)
            else:
                fd._reenqueue(tm,
                              exc_info=(_socket_error_from_errno(err),))
To keep the flow clear, we’ll temporarily ignore the error handling in this code. From the code context, we know that _waits is a dictionary of the form {fileno: FDReady()}, that each FDReady corresponds to a task waiting on asynchronous I/O, and that the tm variable is the main object of the multitask system, TaskManager. It drives the operation of each coroutine, which we will discuss later. So the call
fd._reenqueue(tm)
puts the task held by that FDReady back on the TaskManager’s processing queue. Then, within the TaskManager’s processing flow, the read/write function is called to actually receive or send the data.
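The "wait for readiness, then re-enqueue" step can be sketched in a few lines. This is only an illustration under my own assumptions: it is Python 3, it uses select.select instead of select.poll to stay portable, and Waiter, process, waits and queue are hypothetical names, not multitask's.

```python
import select
import socket
from collections import deque

class Waiter:
    """Hypothetical stand-in for FDReady: remembers which task waits on a socket."""
    def __init__(self, sock, task):
        self.sock = sock
        self.task = task

def process(waits, queue, timeout):
    """Move every waiter whose socket is readable onto the run queue."""
    socks = [w.sock for w in waits.values()]
    readable, _, _ = select.select(socks, [], [], timeout)
    for sock in readable:
        waiter = waits.pop(sock.fileno())   # unregister, as _reenqueue() does
        queue.append(waiter.task)           # the task becomes runnable again

a, b = socket.socketpair()
waits = {a.fileno(): Waiter(a, "task-A")}   # {fileno: waiter}, like _waits
queue = deque()

process(waits, queue, 0)        # nothing has been written yet
before = list(queue)
b.sendall(b"ping")
process(waits, queue, 1.0)      # `a` is readable now
after = list(queue)
a.close()
b.close()
print(before, after)
```

Note that process() never reads from the socket. It only reports that a read would not block, which matches the division of labour described above.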
– A read operation –
When we write
def foo():
    data = yield (multitask.recv(sock, 1024))  # *
what actually happens? Let’s look at the details:
@yieldable
def recv(sock, *args, **kwargs):
    # [...]
    return _fdaction(sock, sock.recv, args, kwargs, read=True)

@yieldable
def _fdaction(fd, func, args=(), kwargs={}, read=False, write=False, exc=False):
    timeout = kwargs.pop('timeout', None)

    yield FDReady(fd, read, write, exc, timeout)

    while True:
        try:
            raise StopIteration(func(*(args), **(kwargs)))
        except (socket.error, IOError, OSError), err:
            if err.args[0] != errno.EINTR:
                raise
The yield expression at the asterisk (*) hands over another generator, _fdaction, which is what multitask.recv() returns. On its first call, _fdaction yields the event listener instance
FDReady(fd, True, False, False, timeout)
When a read event occurs on the file descriptor (fd), TaskManager resumes _fdaction, which passes the data read by sock.recv (func) back as the StopIteration argument. TaskManager then delivers that value to the caller of multitask.recv() by calling send() on the parent generator, so the data ends up in the data variable at the *.
Each recv() operation is completed by an FDReady and a sock.recv() call. The non-blocking suspension of execution is accomplished by yield (multitask.recv(sock, 1024)) and yield FDReady(fd, True, False, False, timeout) together.
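The two phases can be seen by stepping such a generator by hand. The sketch below is Python 3 and uses hypothetical names (ReadReady plays the role of FDReady, and this recv is my own simplified stand-in for multitask.recv without timeouts or EINTR handling); here I play the scheduler myself.

```python
import socket

class ReadReady:
    """Hypothetical marker meaning 'resume me when this socket is readable'."""
    def __init__(self, sock):
        self.sock = sock

def recv(sock, size):
    yield ReadReady(sock)       # phase 1: ask the scheduler to wait for readiness
    return sock.recv(size)      # phase 2: the real read, carried by StopIteration

a, b = socket.socketpair()
b.sendall(b"hello")             # make sure data is already waiting on `a`

step = recv(a, 1024)
marker = next(step)             # the generator pauses and hands out the marker
try:
    step.send(None)             # what a scheduler does once the socket is readable
except StopIteration as stop:
    data = stop.value           # the bytes read by sock.recv()
a.close()
b.close()
print(type(marker).__name__, data)
```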
– YieldCondition –
What is FDReady, and how is it combined with FDSelector?
FDReady._waits is a class attribute shared by all FDReady instances: a mapping from each TaskManager to its corresponding _FDSelector. For example, if an FDReady is generated in TaskManager x, it will be added to the monitoring list of the _FDSelector corresponding to x. This is how FDReady and _FDSelector are tied together.
According to the code docstring, YieldCondition is the base class of all objects that a task can yield to a TaskManager and that cause the task to be restarted when certain events occur. YieldCondition has a metaclass, MetaYieldCondition, which tracks the globally alive subclasses of YieldCondition (FDReady, _QueueAction). The FDReady class in turn tracks, through _waits, each TaskManager and the _FDSelector it uses. Therefore, through MetaYieldCondition, all existing TaskManagers can be reached (there may be multiple instances, such as in a multi-threaded model where each thread instantiates its own TaskManager and its corresponding _FDSelector).
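For readers new to metaclasses, the tracking idea can be sketched as follows. This is Python 3 syntax with hypothetical names throughout; unlike multitask, which by the look of __custom_wait_handlers records only classes with their own wait handling, this sketch simply records every subclass.

```python
class MetaCondition(type):
    """Hypothetical metaclass that records every class created with it."""
    registry = []

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        if bases:                        # skip the abstract base class itself
            MetaCondition.registry.append(cls)

class Condition(metaclass=MetaCondition):
    pass

class FDReadyLike(Condition):
    waits = {}     # shared by all instances: manager -> set of waiting conditions

    def handle(self, manager):
        self.waits.setdefault(manager, set()).add(self)

class QueueActionLike(Condition):
    pass

manager = object()                       # stands in for a TaskManager
FDReadyLike().handle(manager)

tracked = [cls.__name__ for cls in MetaCondition.registry]
pending = len(FDReadyLike.waits[manager])
print(tracked, pending)
```

Because the registry lives on the metaclass and the waits mapping lives on the class, a run loop can find every kind of pending wait for a given manager without holding any list of its own.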
– Driving Force –
For a time, my lack of knowledge led me to a series of “ridiculous” questions: What is the Linux kernel, why does it keep running, why do processes get executed, and what drives them? Of course, I now know that the computer’s heartbeat, the clock, is the “culprit” behind all of this. When I read a piece of code, I must first understand the carrier that gives it life (process, thread) and its behavioral patterns. Only then can I understand where the code comes from and where it goes.
The carrier of multitask is the process or thread that calls it, and its behavioral patterns are embodied in TaskManager. When you call multitask.run(), the only blocking function in this event-driven system, this “world” begins to operate.
1106 def run(self):
1117     while (self._queue or
1118            self._timeouts or
1119            MetaYieldCondition._has_waits(self)):
1120
1121         while self._queue:
1122             self._run_next_task()
1123
1124         MetaYieldCondition._handle_waits(self)
1125
1126         if self._timeouts:
1127             self._handle_timeouts(self._get_run_timeout())
Each element of self._queue is a tuple of the form (task, input, exc_info). Every task added via TaskManager.add() is appended to it as (task, None, ()). The queue also holds the intermediate tasks that TaskManager generates at runtime.
MetaYieldCondition._has_waits() and MetaYieldCondition._handle_waits() deal with tasks that are waiting on events, and we will come back to them later; self._timeouts and self._handle_timeouts() are used for timeout handling, which we will not discuss for now. So let’s focus on self._run_next_task() first.
1186 def _run_next_task(self):
1187     task, input, exc_info = self._queue.popleft()
1188     while True:
1189         try:
1190             if exc_info:
1191                 output = task.throw(*exc_info)
1192             else:
1193                 output = task.send(input)
1194         except StopIteration, e:
1195             if not isinstance(task, _ChildTask):
1196                 break
1197             else:
1198                 if not e.args:
1199                     output = None
1200                 elif len(e.args) == 1:
1201                     output = e.args[0]
1202                 else:
1203                     output = e.args
1204                 task, input, exc_info = task.parent, output, ()
1205         except:
1206             if isinstance(task, _ChildTask):
1207                 # Propagate exception to parent
1208                 task, input, exc_info = task.parent, None, sys.exc_info()
1209             else:
1210                 # No parent task, so just die
1211                 raise
1212         else:
1213             if isinstance(output, types.GeneratorType):
1214                 task, input, exc_info = _ChildTask(task, output), None, ()
1215             else:
1216                 if isinstance(output, YieldCondition):
1217                     output.task = task
1218                     output._handle(self)
1219                 else:
1220                     # Return any other output as input and send task to
1221                     # end of queue
1222                     self._enqueue(task, input=output)
1223                 break
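Before tracing the real function, it may help to see the same idea stripped to the bone. The sketch below is my own Python 3 simplification, not multitask's code: it keeps a child-to-parent dictionary instead of a _ChildTask wrapper, ignores exceptions, timeouts and YieldCondition, and the names run, parents, child and parent are all hypothetical.

```python
import types
from collections import deque

def run(root):
    """Drive `root`, descending into yielded generators and returning their results."""
    queue = deque([(root, None)])    # (task, value to send), like (task, input, exc_info)
    parents = {}                     # child generator -> parent generator
    results = []
    while queue:
        task, value = queue.popleft()
        try:
            output = task.send(value)
        except StopIteration as stop:
            parent = parents.pop(task, None)
            if parent is not None:
                queue.append((parent, stop.value))   # child's result resumes the parent
            else:
                results.append(stop.value)           # a top-level task has finished
            continue
        if isinstance(output, types.GeneratorType):
            parents[output] = task
            queue.append((output, None))             # start the child; parent stays parked
        else:
            queue.append((task, output))             # echo plain values back as input
    return results

def child():
    yield
    return 1

def parent():
    got = yield child()      # receives whatever the child returned
    return got + 1

results = run(parent())
print(results)
```

The real _run_next_task() keeps switching between parent and child inside one while loop instead of going back through the queue, but the hand-over of the child's result to the parent is the same idea.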
For simplicity and clarity, let’s assume that TaskManager currently holds only one task, the generator produced by calling the example above:
def foo():
    data = yield (multitask.recv(sock, 1024)) # *
We will follow this generator ‘foo’ step by step through its transformation and execution flow within the TaskManager.
First, when _run_next_task() is executed for the first time, the generator ‘foo’ is retrieved from the queue:
1187 task, input, exc_info = self._queue.popleft()
Here task is generator ‘foo’, input is the initial value None, and exc_info is the initial value ().
1190 if exc_info:
1191     output = task.throw(*exc_info)
1192 else:
1193     output = task.send(input)
The code above is actually executed as output = task.send(None). This is what PEP 342 describes: a generator is started by sending it None. Generator ‘foo’ evaluates multitask.recv(sock, 1024), yields the result, and is suspended before the yield expression itself returns a value. From the analysis above, we know that multitask.recv() returns the generator ‘_fdaction’.
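The PEP 342 rule about the first send() is easy to check in isolation. This is a small Python 3 sketch of mine, unrelated to multitask's own code; the strings and the bytes payload are made up.

```python
def foo():
    data = yield "waiting for data"   # suspended here until someone sends a value
    return data

gen = foo()
try:
    gen.send("too early")             # a fresh generator only accepts None
    error = None
except TypeError as exc:
    error = str(exc)

first = gen.send(None)                # runs up to the yield and returns its operand
try:
    gen.send(b"payload")              # becomes the value of the yield expression
except StopIteration as stop:
    result = stop.value

print(error, first, result)
```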
Assuming there are no exceptions during this process, the code will execute to:
1213 if isinstance(output, types.GeneratorType):
1214     task, input, exc_info = _ChildTask(task, output), None, ()
1215 else:
1216     if isinstance(output, YieldCondition):
1217         output.task = task
1218         output._handle(self)
1219     else:
1220         # Return any other output as input and send task to
1221         # end of queue
1222         self._enqueue(task, input=output)
1223     break
Clearly, the condition on line 1213 is true, since output is the generator ‘_fdaction’. Line 1214 is executed:
1214 task, input, exc_info = _ChildTask(task, output), None, ()
Here, generator ‘foo’ and generator ‘_fdaction’ are treated as parent and child:
class _ChildTask(object):

    __slots__ = ('parent', 'send', 'throw')

    def __init__(self, parent, task):
        self.parent = parent
        self.send = task.send
        self.throw = task.throw
The two generators establish a parent-child relationship through _ChildTask and are temporarily merged into a single task, the _ChildTask wrapper (an ordinary object, not itself a generator, that borrows send and throw from the child). On line 1214, the variables task, input, and exc_info are then reassigned.
Then, returning to the beginning of the loop:
1190 if exc_info:
1191     output = task.throw(*exc_info)
1192 else:
1193     output = task.send(input)
Once again the code block above ends up as output = task.send(None). Remember the PEP 342 rule mentioned earlier: a generator is started by sending it None. This time, however, task is the _ChildTask created in the first pass of the loop, and its send is the send method of generator _fdaction itself. Recalling the code above, generator _fdaction yields an FDReady instance, which is assigned to output.
Then the second pass of the loop continues.
FDReady is a subclass of YieldCondition, and it is not itself a generator. Therefore,
1215 else:
1216     if isinstance(output, YieldCondition):
1217         output.task = task
1218         output._handle(self)
1219     else:
1220         # Return any other output as input and send task to
1221         # end of queue
1222         self._enqueue(task, input=output)
1223     break
Ha, lines 1217 and 1218 are executed. The task attribute of FDReady (inherited from YieldCondition) now holds the _ChildTask wrapper around generator _fdaction, and then the _handle method of FDReady is called:
def _handle(self, tm):
    self._waits[tm].add(self)
    super(FDReady, self)._handle(tm)
The _handle method of FDReady registers this FDReady instance to the _FDSelector corresponding to the current TaskManager, and then calls the _handle method of the FDReady parent class YieldCondition.
def _handle(self, tm):
    if self.expiration is not None:
        tm._add_timeout(self)
Okay, timeout handling. As mentioned above, we are not covering timeouts for now.
The second pass of the loop continues and reaches the break. At this point, TaskManager’s _queue is empty. FDReady is registered in _FDSelector, and FDReady’s task attribute references _ChildTask, the wrapper of generator _fdaction, whose parent member references the original generator foo. Perfect!
Yes, those familiar with asynchronous I/O are probably already thinking of operations like select()/poll()/epoll_wait()/kqueue().
Well, it’s time to collect the event.
Back in the loop of the run() function, _run_next_task() has finished executing, and we’re now at line 1124:
1124 MetaYieldCondition._handle_waits(self)
@staticmethod
def _handle_waits(tm, timeout=None):
    for cls in MetaYieldCondition.__custom_wait_handlers:
        if cls._has_waits(tm):
            cls._handle_waits(tm, tm._get_run_timeout(timeout))
Remember the statement above that “YieldCondition has a metaclass MetaYieldCondition, used to track globally alive classes that inherit from YieldCondition (FDReady, _QueueAction)”? __custom_wait_handlers is the variable used for this tracking. Under all the assumptions above, this variable only contains the class FDReady. Okay, then the above lines of code are equivalent to:
if FDReady._has_waits(tm):
    FDReady._handle_waits(tm, tm._get_run_timeout(timeout))
The implementation is as follows:
@classmethod
def _has_waits(cls, tm):
    return bool(cls._waits[tm])

@classmethod
def _handle_waits(cls, tm, timeout):
    if (timeout is None) or (timeout > 0.0):
        cls._waits[tm].process(tm, timeout)
Now the _FDSelector of the tm object has just had a new FDReady added to it, so _has_waits is True; timeout is assumed to be None. cls._waits[tm].process(tm, timeout) calls the process function of _FDSelector, which we analyzed at the beginning of the article. Everything is now connected.
When data arrives on the file descriptor (remember that our generator ‘foo’ uses multitask.recv()), the _reenqueue() function of FDReady is called:
def _reenqueue(self, tm, input=None, exc_info=()):
    self._waits[tm].remove(self)
    super(FDReady, self)._reenqueue(tm, input, exc_info)
FDReady first unregisters itself from _FDSelector (event-driven, you know), then,
def _reenqueue(self, tm, input=None, exc_info=()):
    tm._enqueue(self.task, input, exc_info)
    if self.expiration is not None:
        tm._remove_timeout(self)
Yes! tm._enqueue(task, None, ()) means that a read event has occurred on the file descriptor (fd) of FDReady, so its task goes back into the _queue of TaskManager. Here, task is the attribute of FDReady that references the _ChildTask wrapper around generator _fdaction.
Don’t worry, at this point there is only a read event; the actual data is still in the TCP/IP stack and has yet to be read.
Let’s jump directly to TaskManager‘s _run_next_task():
1190 if exc_info:
1191     output = task.throw(*exc_info)
1192 else:
1193     output = task.send(input)
Here task is the _ChildTask wrapper around generator ‘_fdaction’, and input is None. After the send() call, the code in _fdaction therefore resumes from the line following the yield and starts reading data.
raise StopIteration(func(*(args), **(kwargs)))
Here func is sock.recv(). The data it reads travels in the StopIteration and becomes output in the except branch above. ………
It’s not over yet; our final destination is the generator ‘foo’. But I think if you understand the process above, the remaining steps shouldn’t be difficult either.
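To see the whole journey in one place, including the last hop back into ‘foo’, here is an end-to-end miniature. It is only a sketch under my own assumptions: Python 3, select.select rather than select.poll, no timeouts, no exception propagation, and hypothetical names (ReadReady, recv, run, parents, waits) that merely mirror the roles of FDReady, multitask.recv, TaskManager.run, _ChildTask.parent and _FDSelector.

```python
import select
import socket
import types
from collections import deque

class ReadReady:
    """Hypothetical FDReady: 'resume my task when this socket is readable'."""
    def __init__(self, sock):
        self.sock = sock
        self.task = None

def recv(sock, size):
    yield ReadReady(sock)            # wait for readiness first
    return sock.recv(size)           # then do the real read

def run(root):
    queue = deque([(root, None)])
    parents = {}                     # child generator -> parent generator
    waits = []                       # conditions waiting for data
    finished = []
    while queue or waits:
        while queue:
            task, value = queue.popleft()
            try:
                output = task.send(value)
            except StopIteration as stop:
                if task in parents:  # hand the child's result to its parent
                    queue.append((parents.pop(task), stop.value))
                else:
                    finished.append(stop.value)
                continue
            if isinstance(output, types.GeneratorType):
                parents[output] = task
                queue.append((output, None))
            elif isinstance(output, ReadReady):
                output.task = task   # park the task on the condition
                waits.append(output)
            else:
                queue.append((task, output))
        if waits:                    # collect events, like _handle_waits()
            readable, _, _ = select.select([w.sock for w in waits], [], [])
            for cond in [w for w in waits if w.sock in readable]:
                waits.remove(cond)
                queue.append((cond.task, None))
    return finished

def foo(sock):
    data = yield recv(sock, 1024)    # *
    return data

a, b = socket.socketpair()
b.sendall(b"hello")
finished = run(foo(a))
a.close()
b.close()
print(finished)
```

The value that ‘foo’ finally receives at the * is the one carried by the child's StopIteration, delivered by the send() on the parent generator.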
– Communication Component Queue – To be discussed later.
Looking back, the analysis above only walked through a simplified version of Example 2. Can you now work out Examples 1 and 3?
Actually, it took me a long time to reach even this superficial understanding. While reading the definitions and related code of coroutines, metaclasses, and multitask itself, I kept repeating the words “brain fuck” in my head. I had never been exposed to Functional Programming before; it is a new concept and a new field for me, so it’s normal to be a little confused at first, right?!
--- Yeah, I smoke a lot, still on top.