Package zeroinstall :: Package support :: Module tasks
[frames] | no frames]

Source Code for Module zeroinstall.support.tasks

  1  """The tasks module provides a simple light-weight alternative to threads. 
  2   
  3  When you have a long-running job you will want to run it in the background, 
  4  while the user does other things. There are four ways to do this: 
  5   
  6   - Use a new thread for each task. 
  7   - Use callbacks from an idle handler. 
  8   - Use a recursive mainloop. 
  9   - Use this module. 
 10   
 11  Using threads causes a number of problems: they introduce race conditions, 
 12  often lead to many subtle bugs, and they require lots of resources (you 
 13  probably wouldn't want 10,000 threads running at once). In particular, two 
 14  threads can run at exactly the same time (perhaps on different processors), so 
 15  you have to be really careful that they don't both try to update the same 
 16  variables at the same time. This requires lots of messy locking, which is hard 
 17  to get right. 
 18   
 19  Callbacks work within a single thread. For example, you open a dialog box and 
 20  then tell the system to call one function if it's closed, and another if the 
 21  user clicks OK, etc. The function that opened the box then returns, and the 
 22  system calls one of the given callback functions later. Callbacks only 
 23  execute one at a time, so you don't have to worry about race conditions. 
 24  However, they are often very awkward to program with, because you have to 
 25  save state somewhere and then pass it to the functions when they're called. 
 26   
 27  A recursive mainloop only works with nested tasks (you can create a 
 28  sub-task, but the main task can't continue until the sub-task has 
 29  finished), so they are not appropriate for long-running jobs. 
 30   
 31  Tasks use Python's generator API to provide a more pleasant interface to 
 32  callbacks. See the Task class (below) for more information. 
 33  """ 
 34   
 35  # Copyright (C) 2009, Thomas Leonard 
 36  # See the README file for details, or visit http://0install.net. 
 37   
 38  from zeroinstall import _, support, logger, gobject 
 39  import sys 
 40   
 41  if gobject: 
42 - class _Handler(object):
43 - def cancel(self):
44 if self.tag is not None: 45 gobject.source_remove(self.tag) 46 self.tag = None
47
48 - class loop(object):
49 @staticmethod
50 - def call_soon_threadsafe(cb):
51 def wrapper(): 52 cb() 53 return False
54 gobject.idle_add(wrapper)
55 56 call_soon = call_soon_threadsafe 57 58 @staticmethod
59 - def call_repeatedly(interval, cb):
60 h = _Handler() 61 def wrapper(): 62 assert h.tag is not None 63 cb() 64 return True
65 h.tag = gobject.timeout_add(int(interval * 1000), wrapper) 66 return h 67 68 @staticmethod
69 - def call_later(delay, cb):
70 """@type delay: float""" 71 def wrapper(): 72 cb() 73 return False
74 gobject.timeout_add(int(delay * 1000), wrapper) 75 76 @staticmethod
77 - def add_reader(fd, cb, *args):
78 """@type fd: int 79 @rtype: L{_Handler}""" 80 h = _Handler() 81 def wrapper(src, cond): 82 cb(*args) 83 return True
84 h.tag = gobject.io_add_watch(fd, gobject.IO_IN | gobject.IO_HUP, wrapper) 85 return h 86 87 @staticmethod
88 - def add_writer(fd, cb, *args):
89 h = _Handler() 90 def wrapper(src, cond): 91 cb(*args) 92 return True
93 h.tag = gobject.io_add_watch(fd, gobject.IO_OUT | gobject.IO_HUP, wrapper) 94 return h 95 else: 96 try: 97 import tulip 98 except ImportError:
99 # Delay the error until we actually need a mainloop 100 - class Fail(object):
101 - def __getattr__(self, x):
102 raise Exception("No mainloop available: install python3-gi or tulip")
103 tulip = loop = Fail() 104 else: 105 loop = tulip.get_event_loop() 106 107 # The list of Blockers whose event has happened, in the order they were 108 # triggered 109 _run_queue = []
110 111 -def check(blockers, reporter = None):
112 """See if any of the blockers have pending exceptions. 113 If reporter is None, raise the first and log the rest. 114 @type blockers: [L{Blocker}] 115 @param reporter: invoke this function on each error""" 116 ex = None 117 if isinstance(blockers, Blocker): 118 blockers = (blockers,) 119 for b in blockers: 120 if b.exception: 121 b.exception_read = True 122 if reporter: 123 try: 124 reporter(*b.exception) 125 except: 126 logger.warning("Failure reporting error! Error was: %s", repr(b.exception[0])) 127 raise 128 elif ex is None: 129 ex = b.exception 130 else: 131 logger.warning(_("Multiple exceptions waiting; skipping %s"), b.exception[0]) 132 if ex: 133 support.raise_with_traceback(ex[0], ex[1])
134
135 -class Blocker(object):
136 """A Blocker object starts life with 'happened = False'. Tasks can 137 ask to be suspended until 'happened = True'. The value is changed 138 by a call to trigger(). 139 140 Example: 141 142 >>> kettle_boiled = tasks.Blocker() 143 >>> def make_tea(): 144 print "Get cup" 145 print "Add tea leaves" 146 yield kettle_boiled 147 print "Pour water into cup" 148 print "Brew..." 149 yield tasks.TimeoutBlocker(120, "Brewing") 150 print "Add milk" 151 print "Ready!" 152 >>> tasks.Task(make_tea()) 153 154 Then elsewhere, later:: 155 156 print "Kettle boiled!" 157 kettle_boiled.trigger() 158 159 You can also yield a list of Blockers. Your function will resume 160 after any one of them is triggered. Use blocker.happened to 161 find out which one(s). Yielding a Blocker that has already 162 happened is the same as yielding None (gives any other Tasks a 163 chance to run, and then continues). 164 """ 165 166 exception = None 167
168 - def __init__(self, name):
169 """@type name: str""" 170 self.happened = False # False until event triggered 171 self._zero_lib_tasks = set() # Tasks waiting on this blocker 172 self.name = name
173
174 - def trigger(self, exception = None):
175 """The event has happened. Note that this cannot be undone; 176 instead, create a new Blocker to handle the next occurance 177 of the event. 178 @param exception: exception to raise in waiting tasks 179 @type exception: (Exception, traceback)""" 180 if self.happened: return # Already triggered 181 self.happened = True 182 self.exception = exception 183 self.exception_read = False 184 #assert self not in _run_queue # Slow 185 if not _run_queue: 186 _schedule() 187 _run_queue.append(self) 188 189 if exception: 190 assert isinstance(exception, tuple), exception 191 if not self._zero_lib_tasks: 192 logger.info(_("Exception from '%s', but nothing is waiting for it"), self) 193 import traceback 194 logger.debug(''.join(traceback.format_exception(type(exception[0]), exception[0], exception[1])))
195 196 # (causes leaks by preventing blockers from being GC'd if in cycles) 197 #def __del__(self): 198 # if self.exception and not self.exception_read: 199 # warn(_("Blocker %(blocker)s garbage collected without having it's exception read: %(exception)s"), {'blocker': self, 'exception': self.exception}) 200
201 - def add_task(self, task):
202 """Called by the schedular when a Task yields this 203 Blocker. If you override this method, be sure to still 204 call this method with Blocker.add_task(self)! 205 @type task: L{Task}""" 206 assert task not in self._zero_lib_tasks, "Blocking on a single task twice: %s (%s)" % (task, self) 207 self._zero_lib_tasks.add(task)
208
209 - def remove_task(self, task):
210 """Called by the schedular when a Task that was waiting for 211 this blocker is resumed. 212 @type task: L{Task}""" 213 self._zero_lib_tasks.remove(task)
214
215 - def __repr__(self):
216 return "<Blocker:%s>" % self
217
218 - def __str__(self):
219 return self.name
220
221 -class IdleBlocker(Blocker):
222 """An IdleBlocker blocks until a task starts waiting on it, then 223 immediately triggers. An instance of this class is used internally 224 when a Task yields None."""
225 - def add_task(self, task):
226 """Also calls trigger. 227 @type task: L{Task}""" 228 Blocker.add_task(self, task) 229 self.trigger()
230
231 -class TimeoutBlocker(Blocker):
232 """Triggers after a set number of seconds."""
233 - def __init__(self, timeout, name):
234 """Trigger after 'timeout' seconds (may be a fraction). 235 @type timeout: float 236 @type name: str""" 237 Blocker.__init__(self, name) 238 loop.call_later(timeout, self._timeout)
239
240 - def _timeout(self):
241 self.trigger()
242
243 -def _io_callback(blocker):
244 """@type blocker: L{InputBlocker} | L{OutputBlocker}""" 245 blocker._tag.cancel() 246 blocker.trigger()
247
248 -class InputBlocker(Blocker):
249 """Triggers when os.read(stream) would not block.""" 250 _tag = None 251 _stream = None
252 - def __init__(self, stream, name):
253 """@type stream: int 254 @type name: str""" 255 Blocker.__init__(self, name) 256 self._stream = stream
257
258 - def add_task(self, task):
259 """@type task: L{Task}""" 260 Blocker.add_task(self, task) 261 if self._tag is None: 262 self._tag = loop.add_reader(self._stream, _io_callback, self)
263
264 - def remove_task(self, task):
265 """@type task: L{Task}""" 266 Blocker.remove_task(self, task) 267 if not self._zero_lib_tasks: 268 self._tag.cancel() 269 self._tag = None
270
271 # Note: this isn't used within 0install 272 -class OutputBlocker(Blocker):
273 """Triggers when os.write(stream) would not block.""" 274 _tag = None 275 _stream = None
276 - def __init__(self, stream, name):
277 Blocker.__init__(self, name) 278 self._stream = stream
279
280 - def add_task(self, task):
281 Blocker.add_task(self, task) 282 if self._tag is None: 283 self._tag = loop.add_writer(self._stream, _io_callback, self)
284
285 - def remove_task(self, task):
286 Blocker.remove_task(self, task) 287 if not self._zero_lib_tasks: 288 self._tag.cancel() 289 self._tag = None
290 291 _idle_blocker = IdleBlocker("(idle)")
292 293 -class Task(object):
294 """Create a new Task when you have some long running function to 295 run in the background, but which needs to do work in 'chunks'. 296 Example: 297 298 >>> from zeroinstall import tasks 299 >>> def my_task(start): 300 for x in range(start, start + 5): 301 print "x =", x 302 yield None 303 304 >>> tasks.Task(my_task(0)) 305 >>> tasks.Task(my_task(10)) 306 >>> mainloop() 307 308 Yielding None gives up control of the processor to another Task, 309 causing the sequence printed to be interleaved. You can also yield a 310 Blocker (or a list of Blockers) if you want to wait for some 311 particular event before resuming (see the Blocker class for details). 312 """ 313
314 - def __init__(self, iterator, name):
315 """Call next(iterator) from a glib idle function. This function 316 can yield Blocker() objects to suspend processing while waiting 317 for events. name is used only for debugging. 318 @type name: str""" 319 self.iterator = iterator 320 self.finished = Blocker(name) 321 # Block new task on the idle handler... 322 _idle_blocker.add_task(self) 323 self._zero_blockers = (_idle_blocker,) 324 logger.info(_("Scheduling new task: %s"), self)
325
326 - def _resume(self):
327 # Remove from our blockers' queues 328 for blocker in self._zero_blockers: 329 blocker.remove_task(self) 330 # Resume the task 331 try: 332 new_blockers = next(self.iterator) 333 except StopIteration: 334 # Task ended 335 self.finished.trigger() 336 return 337 except SystemExit: 338 raise 339 except (Exception, KeyboardInterrupt) as ex: 340 # Task crashed 341 logger.info(_("Exception from '%(name)s': %(exception)s"), {'name': self.finished.name, 'exception': ex}) 342 #import traceback 343 #debug(''.join(traceback.format_exception(*sys.exc_info()))) 344 tb = sys.exc_info()[2] 345 self.finished.trigger(exception = (ex, tb)) 346 return 347 if new_blockers is None: 348 # Just give up control briefly 349 new_blockers = (_idle_blocker,) 350 else: 351 if isinstance(new_blockers, Blocker): 352 # Wrap a single yielded blocker into a list 353 new_blockers = (new_blockers,) 354 # Are we blocking on something that already happened? 355 for blocker in new_blockers: 356 assert hasattr(blocker, 'happened'), "Not a Blocker: %s from %s" % (blocker, self) 357 if blocker.happened: 358 new_blockers = (_idle_blocker,) 359 logger.info(_("Task '%(task)s' waiting on ready blocker %(blocker)s!"), {'task': self, 'blocker': blocker}) 360 break 361 else: 362 logger.info(_("Task '%(task)s' stopping and waiting for '%(new_blockers)s'"), {'task': self, 'new_blockers': new_blockers}) 363 # Add to new blockers' queues 364 for blocker in new_blockers: 365 blocker.add_task(self) 366 self._zero_blockers = new_blockers
367
368 - def __repr__(self):
369 return "Task(%s)" % self.finished.name
370
371 - def __str__(self):
372 return self.finished.name
373
374 # Must append to _run_queue right after calling this! 375 -def _schedule():
376 assert not _run_queue 377 loop.call_soon(_handle_run_queue)
378
379 -def _handle_run_queue():
380 global _idle_blocker 381 assert _run_queue 382 383 next = _run_queue[0] 384 assert next.happened 385 386 if next is _idle_blocker: 387 # Since this blocker will never run again, create a 388 # new one for future idling. 389 _idle_blocker = IdleBlocker("(idle)") 390 elif next._zero_lib_tasks: 391 logger.info(_("Running %(task)s due to triggering of '%(next)s'"), {'task': next._zero_lib_tasks, 'next': next}) 392 else: 393 logger.info(_("Running %s"), next) 394 395 tasks = frozenset(next._zero_lib_tasks) 396 if tasks: 397 next.noticed = True 398 399 for task in tasks: 400 # Run 'task'. 401 task._resume() 402 403 del _run_queue[0] 404 405 if _run_queue: 406 loop.call_soon(_handle_run_queue)
407
408 -def named_async(name):
409 """Decorator that turns a generator function into a function that runs the 410 generator as a Task and returns the Task's finished blocker. 411 @param name: the name for the Task 412 @type name: str""" 413 def deco(fn): 414 def run(*args, **kwargs): 415 return Task(fn(*args, **kwargs), name).finished
416 run.__name__ = fn.__name__ 417 return run 418 return deco 419
420 -def async(fn):
421 """Decorator that turns a generator function into a function that runs the 422 generator as a Task and returns the Task's finished blocker.""" 423 def run(*args, **kwargs): 424 return Task(fn(*args, **kwargs), fn.__name__).finished
425 run.__name__ = fn.__name__ 426 return run 427
428 -def wait_for_blocker(blocker):
429 """Run a recursive mainloop until blocker is triggered. 430 @param blocker: event to wait on 431 @type blocker: L{Blocker} 432 @since: 0.53""" 433 assert wait_for_blocker.x is None # Avoid recursion 434 435 if not blocker.happened: 436 def quitter(): 437 yield blocker 438 if gobject: 439 wait_for_blocker.x.quit() 440 else: 441 wait_for_blocker.x.set_result(None)
442 Task(quitter(), "quitter") 443 444 wait_for_blocker.x = gobject.MainLoop() if gobject else tulip.Future() 445 try: 446 logger.debug(_("Entering mainloop, waiting for %s"), blocker) 447 if gobject: 448 wait_for_blocker.x.run() 449 else: 450 loop.run_until_complete(wait_for_blocker.x) 451 finally: 452 wait_for_blocker.x = None 453 454 assert blocker.happened, "Someone quit the main loop!" 455 456 check(blocker) 457 wait_for_blocker.x = None 458