1 """The tasks module provides a simple light-weight alternative to threads.
2
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
5
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
9 - Use this module.
10
11 Using threads causes a number of problems: they introduce race conditions,
12 often lead to many subtle bugs, and they require lots of resources (you
13 probably wouldn't want 10,000 threads running at once). In particular, two
14 threads can run at exactly the same time (perhaps on different processors), so
15 you have to be really careful that they don't both try to update the same
16 variables at the same time. This requires lots of messy locking, which is hard
17 to get right.
18
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
26
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished), so it is not appropriate for long-running jobs.
30
31 Tasks use Python's generator API to provide a more pleasant interface to
32 callbacks. See the Task class (below) for more information.
33 """
34
35
36
37
38 from zeroinstall import _, support, logger, gobject
39 import sys
40
41 if gobject:
44 if self.tag is not None:
45 gobject.source_remove(self.tag)
46 self.tag = None
47
49 @staticmethod
51 def wrapper():
52 cb()
53 return False
54 gobject.idle_add(wrapper)
55
56 call_soon = call_soon_threadsafe
57
58 @staticmethod
60 h = _Handler()
61 def wrapper():
62 assert h.tag is not None
63 cb()
64 return True
65 h.tag = gobject.timeout_add(int(interval * 1000), wrapper)
66 return h
67
68 @staticmethod
70 """@type delay: float"""
71 def wrapper():
72 cb()
73 return False
74 gobject.timeout_add(int(delay * 1000), wrapper)
75
76 @staticmethod
78 """@type fd: int
79 @rtype: L{_Handler}"""
80 h = _Handler()
81 def wrapper(src, cond):
82 cb(*args)
83 return True
84 h.tag = gobject.io_add_watch(fd, gobject.IO_IN | gobject.IO_HUP, wrapper)
85 return h
86
87 @staticmethod
89 h = _Handler()
90 def wrapper(src, cond):
91 cb(*args)
92 return True
93 h.tag = gobject.io_add_watch(fd, gobject.IO_OUT | gobject.IO_HUP, wrapper)
94 return h
95 else:
96 try:
97 import tulip
98 except ImportError:
99
100 - class Fail(object):
102 raise Exception("No mainloop available: install python3-gi or tulip")
103 tulip = loop = Fail()
104 else:
105 loop = tulip.get_event_loop()
106
107
108
109 _run_queue = []
110
def check(blockers, reporter = None):
    """Inspect blockers for stored exceptions and report or raise them.

    Every blocker that carries an exception has its exception_read flag
    set to True. When a reporter is supplied, each stored exception is
    handed to it as reporter(*exception); if the reporter itself fails,
    a warning is logged and the reporter's error propagates. Without a
    reporter, the first stored exception is re-raised (with its
    traceback) once all blockers have been scanned, and every later one
    is only logged as a warning.
    @param blockers: a single Blocker, or an iterable of them
    @type blockers: L{Blocker} | [L{Blocker}]
    @param reporter: invoke this function on each error"""
    # Accept a lone Blocker as shorthand for a one-element collection.
    if isinstance(blockers, Blocker):
        blockers = (blockers,)

    first = None
    for blocker in blockers:
        pending = blocker.exception
        if not pending:
            continue

        blocker.exception_read = True

        if reporter:
            try:
                reporter(*pending)
            except:
                # Deliberately broad: log what we were trying to report,
                # then let the reporter's own failure propagate.
                logger.warning("Failure reporting error! Error was: %s", repr(pending[0]))
                raise
            continue

        if first is None:
            first = pending
        else:
            logger.warning(_("Multiple exceptions waiting; skipping %s"), pending[0])

    if first:
        support.raise_with_traceback(first[0], first[1])
134
136 """A Blocker object starts life with 'happened = False'. Tasks can
137 ask to be suspended until 'happened = True'. The value is changed
138 by a call to trigger().
139
140 Example:
141
142 >>> kettle_boiled = tasks.Blocker()
143 >>> def make_tea():
144 print "Get cup"
145 print "Add tea leaves"
146 yield kettle_boiled
147 print "Pour water into cup"
148 print "Brew..."
149 yield tasks.TimeoutBlocker(120, "Brewing")
150 print "Add milk"
151 print "Ready!"
152 >>> tasks.Task(make_tea())
153
154 Then elsewhere, later::
155
156 print "Kettle boiled!"
157 kettle_boiled.trigger()
158
159 You can also yield a list of Blockers. Your function will resume
160 after any one of them is triggered. Use blocker.happened to
161 find out which one(s). Yielding a Blocker that has already
162 happened is the same as yielding None (gives any other Tasks a
163 chance to run, and then continues).
164 """
165
166 exception = None
167
169 """@type name: str"""
170 self.happened = False
171 self._zero_lib_tasks = set()
172 self.name = name
173
def trigger(self, exception = None):
    """Mark the event as having happened and queue this blocker.

    Triggering is one-shot: calling this on a blocker that has already
    happened does nothing. To handle the next occurrence of the event,
    create a new Blocker. The blocker is appended to the module's run
    queue; if the queue was empty beforehand, _schedule() is called
    first. When an exception is supplied but no task is currently
    registered on this blocker, that fact is logged at info level and
    the formatted traceback at debug level.
    @param exception: exception to raise in waiting tasks
    @type exception: (Exception, traceback)"""
    if self.happened:
        return  # already triggered

    self.happened = True
    self.exception = exception
    self.exception_read = False

    # _schedule() is only needed when the queue goes from empty to
    # non-empty.
    queue_was_empty = not _run_queue
    if queue_was_empty:
        _schedule()
    _run_queue.append(self)

    if not exception:
        return

    assert isinstance(exception, tuple), exception
    if self._zero_lib_tasks:
        return

    # Nothing is waiting, so record the failure in the log.
    logger.info(_("Exception from '%s', but nothing is waiting for it"), self)
    import traceback
    formatted = traceback.format_exception(type(exception[0]), exception[0], exception[1])
    logger.debug(''.join(formatted))
195
196
197
198
199
200
202 """Called by the scheduler when a Task yields this
203 Blocker. If you override this method, be sure to still
204 call this method with Blocker.add_task(self)!
205 @type task: L{Task}"""
206 assert task not in self._zero_lib_tasks, "Blocking on a single task twice: %s (%s)" % (task, self)
207 self._zero_lib_tasks.add(task)
208
210 """Called by the scheduler when a Task that was waiting for
211 this blocker is resumed.
212 @type task: L{Task}"""
213 self._zero_lib_tasks.remove(task)
214
216 return "<Blocker:%s>" % self
217
220
222 """An IdleBlocker blocks until a task starts waiting on it, then
223 immediately triggers. An instance of this class is used internally
224 when a Task yields None."""
230
232 """Triggers after a set number of seconds."""
234 """Trigger after 'timeout' seconds (may be a fraction).
235 @type timeout: float
236 @type name: str"""
237 Blocker.__init__(self, name)
238 loop.call_later(timeout, self._timeout)
239
242
244 """@type blocker: L{InputBlocker} | L{OutputBlocker}"""
245 blocker._tag.cancel()
246 blocker.trigger()
247
270
273 """Triggers when os.write(stream) would not block."""
274 _tag = None
275 _stream = None
279
284
290
291 _idle_blocker = IdleBlocker("(idle)")
292
293 -class Task(object):
294 """Create a new Task when you have some long running function to
295 run in the background, but which needs to do work in 'chunks'.
296 Example:
297
298 >>> from zeroinstall import tasks
299 >>> def my_task(start):
300 for x in range(start, start + 5):
301 print "x =", x
302 yield None
303
304 >>> tasks.Task(my_task(0))
305 >>> tasks.Task(my_task(10))
306 >>> mainloop()
307
308 Yielding None gives up control of the processor to another Task,
309 causing the sequence printed to be interleaved. You can also yield a
310 Blocker (or a list of Blockers) if you want to wait for some
311 particular event before resuming (see the Blocker class for details).
312 """
313
315 """Call next(iterator) from a glib idle function. This function
316 can yield Blocker() objects to suspend processing while waiting
317 for events. name is used only for debugging.
318 @type name: str"""
319 self.iterator = iterator
320 self.finished = Blocker(name)
321
322 _idle_blocker.add_task(self)
323 self._zero_blockers = (_idle_blocker,)
324 logger.info(_("Scheduling new task: %s"), self)
325
327
328 for blocker in self._zero_blockers:
329 blocker.remove_task(self)
330
331 try:
332 new_blockers = next(self.iterator)
333 except StopIteration:
334
335 self.finished.trigger()
336 return
337 except SystemExit:
338 raise
339 except (Exception, KeyboardInterrupt) as ex:
340
341 logger.info(_("Exception from '%(name)s': %(exception)s"), {'name': self.finished.name, 'exception': ex})
342
343
344 tb = sys.exc_info()[2]
345 self.finished.trigger(exception = (ex, tb))
346 return
347 if new_blockers is None:
348
349 new_blockers = (_idle_blocker,)
350 else:
351 if isinstance(new_blockers, Blocker):
352
353 new_blockers = (new_blockers,)
354
355 for blocker in new_blockers:
356 assert hasattr(blocker, 'happened'), "Not a Blocker: %s from %s" % (blocker, self)
357 if blocker.happened:
358 new_blockers = (_idle_blocker,)
359 logger.info(_("Task '%(task)s' waiting on ready blocker %(blocker)s!"), {'task': self, 'blocker': blocker})
360 break
361 else:
362 logger.info(_("Task '%(task)s' stopping and waiting for '%(new_blockers)s'"), {'task': self, 'new_blockers': new_blockers})
363
364 for blocker in new_blockers:
365 blocker.add_task(self)
366 self._zero_blockers = new_blockers
367
369 return "Task(%s)" % self.finished.name
370
372 return self.finished.name
373
378
407
409 """Decorator that turns a generator function into a function that runs the
410 generator as a Task and returns the Task's finished blocker.
411 @param name: the name for the Task
412 @type name: str"""
413 def deco(fn):
414 def run(*args, **kwargs):
415 return Task(fn(*args, **kwargs), name).finished
416 run.__name__ = fn.__name__
417 return run
418 return deco
419
421 """Decorator that turns a generator function into a function that runs the
422 generator as a Task and returns the Task's finished blocker."""
423 def run(*args, **kwargs):
424 return Task(fn(*args, **kwargs), fn.__name__).finished
425 run.__name__ = fn.__name__
426 return run
427
429 """Run a recursive mainloop until blocker is triggered.
430 @param blocker: event to wait on
431 @type blocker: L{Blocker}
432 @since: 0.53"""
433 assert wait_for_blocker.x is None
434
435 if not blocker.happened:
436 def quitter():
437 yield blocker
438 if gobject:
439 wait_for_blocker.x.quit()
440 else:
441 wait_for_blocker.x.set_result(None)
442 Task(quitter(), "quitter")
443
444 wait_for_blocker.x = gobject.MainLoop() if gobject else tulip.Future()
445 try:
446 logger.debug(_("Entering mainloop, waiting for %s"), blocker)
447 if gobject:
448 wait_for_blocker.x.run()
449 else:
450 loop.run_until_complete(wait_for_blocker.x)
451 finally:
452 wait_for_blocker.x = None
453
454 assert blocker.happened, "Someone quit the main loop!"
455
456 check(blocker)
457 wait_for_blocker.x = None
458