Package zeroinstall :: Package injector :: Module sat
[frames] | no frames]

Source Code for Module zeroinstall.injector.sat

  1  """ 
  2  Internal implementation of a SAT solver, used by L{solver.SATSolver}. 
  3  This is not part of the public API. 
  4  """ 
  5   
  6  from __future__ import print_function 
7 8 # Copyright (C) 2010, Thomas Leonard 9 # See the README file for details, or visit http://0install.net. 10 11 # The design of this solver is very heavily based on the one described in 12 # the MiniSat paper "An Extensible SAT-solver [extended version 1.2]" 13 # http://minisat.se/Papers.html 14 # 15 # The main differences are: 16 # 17 # - We care about which solution we find (not just "satisfiable" or "not"). 18 # - We take care to be deterministic (always select the same versions given 19 # the same input). We do not do random restarts, etc. 20 # - We add an AtMostOneClause (the paper suggests this in the Excercises, and 21 # it's very useful for our purposes). 22 23 -def debug(msg, *args):
24 """@type msg: str""" 25 return 26 print("SAT:", msg % args)
27
28 # variables are numbered from 0 29 # literals have the same number as the corresponding variable, 30 # except they for negatives they are (-1-v): 31 # 32 # Variable Literal not(Literal) 33 # 0 0 -1 34 # 1 1 -2 35 -def neg(lit):
36 """@type lit: int 37 @rtype: int""" 38 return -1 - lit
39
40 -def watch_index(lit):
41 """@type lit: int 42 @rtype: int""" 43 if lit >= 0: 44 return lit * 2 45 return neg(lit) * 2 + 1
46
47 -class AtMostOneClause(object):
48 - def __init__(self, solver, lits):
49 """Preferred literals come first. 50 @type solver: L{SATProblem}""" 51 self.solver = solver 52 self.lits = lits 53 54 # The single literal from our set that is True. 55 # We store this explicitly because the decider needs to know quickly. 56 self.current = None
57
58 - def propagate(self, lit):
59 # Re-add ourselves to the watch list. 60 # (we we won't get any more notifications unless we backtrack, 61 # in which case we'd need to get back on the list anyway) 62 self.solver.watch_lit(lit, self) 63 64 # value[lit] has just become True 65 assert self.solver.lit_value(lit) == True 66 assert lit >= 0 67 68 #debug("%s: noticed %s has become True" % (self, self.solver.name_lit(lit))) 69 70 # If we already propagated successfully when the first 71 # one was set then we set all the others to False and 72 # anyone trying to set one True will get rejected. And 73 # if we didn't propagate yet, current will still be 74 # None, even if we now have a conflict (which we'll 75 # detect below). 76 assert self.current is None 77 78 self.current = lit 79 80 # If we later backtrace, call our undo function to unset current 81 self.solver.get_varinfo_for_lit(lit).undo.append(self) 82 83 for l in self.lits: 84 value = self.solver.lit_value(l) 85 #debug("Value of %s is %s" % (self.solver.name_lit(l), value)) 86 if value is True and l is not lit: 87 # Due to queuing, we might get called with current = None 88 # and two versions already selected. 89 debug("CONFLICT: already selected %s" % self.solver.name_lit(l)) 90 return False 91 if value is None: 92 # Since one of our lits is already true, all unknown ones 93 # can be set to False. 94 if not self.solver.enqueue(neg(l), self): 95 debug("CONFLICT: enqueue failed for %s", self.solver.name_lit(neg(l))) 96 return False # Conflict; abort 97 98 return True
99
100 - def undo(self, lit):
101 debug("(backtracking: no longer selected %s)" % self.solver.name_lit(lit)) 102 assert lit == self.current 103 self.current = None
104 105 # Why is lit True? 106 # Or, why are we causing a conflict (if lit is None)?
107 - def calc_reason(self, lit):
108 if lit is None: 109 # Find two True literals 110 trues = [] 111 for l in self.lits: 112 if self.solver.lit_value(l) is True: 113 trues.append(l) 114 if len(trues) == 2: return trues 115 else: 116 for l in self.lits: 117 if l is not lit and self.solver.lit_value(l) is True: 118 return [l] 119 # Find one True literal 120 assert 0 # don't know why!
121
122 - def best_undecided(self):
123 debug("best_undecided: %s" % (self.solver.name_lits(self.lits))) 124 for lit in self.lits: 125 #debug("%s = %s" % (self.solver.name_lit(lit), self.solver.lit_value(lit))) 126 if self.solver.lit_value(lit) is None: 127 return lit 128 return None
129
130 - def __repr__(self):
131 return "<at most one: %s>" % (', '.join(self.solver.name_lits(self.lits)))
132
133 -class UnionClause(object):
134 - def __init__(self, solver, lits):
135 """@type solver: L{SATProblem}""" 136 self.solver = solver 137 self.lits = lits
138 139 # Try to infer new facts. 140 # We can do this only when all of our literals are False except one, 141 # which is undecided. That is, 142 # False... or X or False... = True => X = True 143 # 144 # To get notified when this happens, we tell the solver to 145 # watch two of our undecided literals. Watching two undecided 146 # literals is sufficient. When one changes we check the state 147 # again. If we still have two or more undecided then we switch 148 # to watching them, otherwise we propagate. 149 # 150 # Returns False on conflict.
151 - def propagate(self, lit):
152 # value[get(lit)] has just become False 153 154 #debug("%s: noticed %s has become False" % (self, self.solver.name_lit(neg(lit)))) 155 156 # For simplicity, only handle the case where self.lits[1] 157 # is the one that just got set to False, so that: 158 # - value[lits[0]] = None | True 159 # - value[lits[1]] = False 160 # If it's the other way around, just swap them before we start. 161 if self.lits[0] == neg(lit): 162 self.lits[0], self.lits[1] = self.lits[1], self.lits[0] 163 164 if self.solver.lit_value(self.lits[0]) == True: 165 # We're already satisfied. Do nothing. 166 self.solver.watch_lit(lit, self) 167 return True 168 169 assert self.solver.lit_value(self.lits[1]) == False 170 171 # Find a new literal to watch now that lits[1] is resolved, 172 # swap it with lits[1], and start watching it. 173 for i in range(2, len(self.lits)): 174 value = self.solver.lit_value(self.lits[i]) 175 if value != False: 176 # Could be None or True. If it's True then we've already done our job, 177 # so this means we don't get notified unless we backtrack, which is fine. 178 self.lits[1], self.lits[i] = self.lits[i], self.lits[1] 179 self.solver.watch_lit(neg(self.lits[1]), self) 180 return True 181 182 # Only lits[0], is now undefined. 183 self.solver.watch_lit(lit, self) 184 return self.solver.enqueue(self.lits[0], self)
185
186 - def undo(self, lit): pass
187 188 # Why is lit True? 189 # Or, why are we causing a conflict (if lit is None)?
190 - def calc_reason(self, lit):
191 assert lit is None or lit is self.lits[0] 192 193 # The cause is everything except lit. 194 return [neg(l) for l in self.lits if l is not lit]
195
196 - def __repr__(self):
197 return "<some: %s>" % (', '.join(self.solver.name_lits(self.lits)))
198
199 # Using an array of VarInfo objects is less efficient than using multiple arrays, but 200 # easier for me to understand. 201 -class VarInfo(object):
202 __slots__ = ['value', 'reason', 'level', 'undo', 'obj']
203 - def __init__(self, obj):
204 self.value = None # True/False/None 205 self.reason = None # The constraint that implied our value, if True or False 206 self.level = -1 # The decision level at which we got a value (when not None) 207 self.undo = [] # Constraints to update if we become unbound (by backtracking) 208 self.obj = obj # The object this corresponds to (for our caller and for debugging)
209
210 - def __repr__(self):
211 return '%s=%s' % (self.name, self.value)
212 213 @property
214 - def name(self):
215 return str(self.obj)
216
217 -class SATProblem(object):
218 - def __init__(self):
219 # Propagation 220 self.watches = [] # watches[2i,2i+1] = constraints to check when literal[i] becomes True/False 221 self.propQ = [] # propagation queue 222 223 # Assignments 224 self.assigns = [] # [VarInfo] 225 self.trail = [] # order of assignments [lit] 226 self.trail_lim = [] # decision levels (len(trail) at each decision) 227 228 self.toplevel_conflict = False
229
230 - def get_decision_level(self):
231 """@rtype: int""" 232 return len(self.trail_lim)
233
234 - def add_variable(self, obj):
235 """@rtype: int""" 236 debug("add_variable('%s')", obj) 237 index = len(self.assigns) 238 239 self.watches += [[], []] # Add watch lists for X and not(X) 240 self.assigns.append(VarInfo(obj)) 241 return index
242 243 # lit is now True 244 # reason is the clause that is asserting this 245 # Returns False if this immediately causes a conflict.
246 - def enqueue(self, lit, reason):
247 """@type lit: int 248 @rtype: bool""" 249 debug("%s => %s" % (reason, self.name_lit(lit))) 250 old_value = self.lit_value(lit) 251 if old_value is not None: 252 if old_value is False: 253 # Conflict 254 return False 255 else: 256 # Already set (shouldn't happen) 257 return True 258 259 if lit < 0: 260 var_info = self.assigns[neg(lit)] 261 var_info.value = False 262 else: 263 var_info = self.assigns[lit] 264 var_info.value = True 265 var_info.level = self.get_decision_level() 266 var_info.reason = reason 267 268 self.trail.append(lit) 269 self.propQ.append(lit) 270 271 return True
272 273 # Pop most recent assignment from self.trail
274 - def undo_one(self):
275 lit = self.trail[-1] 276 debug("(pop %s)", self.name_lit(lit)) 277 var_info = self.get_varinfo_for_lit(lit) 278 var_info.value = None 279 var_info.reason = None 280 var_info.level = -1 281 self.trail.pop() 282 283 while var_info.undo: 284 var_info.undo.pop().undo(lit)
285
286 - def cancel(self):
287 n_this_level = len(self.trail) - self.trail_lim[-1] 288 debug("backtracking from level %d (%d assignments)" % 289 (self.get_decision_level(), n_this_level)) 290 while n_this_level != 0: 291 self.undo_one() 292 n_this_level -= 1 293 self.trail_lim.pop()
294
295 - def cancel_until(self, level):
296 """@type level: int""" 297 while self.get_decision_level() > level: 298 self.cancel()
299 300 # Process the propQ. 301 # Returns None when done, or the clause that caused a conflict.
302 - def propagate(self):
303 #debug("propagate: queue length = %d", len(self.propQ)) 304 while self.propQ: 305 lit = self.propQ[0] 306 del self.propQ[0] 307 wi = watch_index(lit) 308 watches = self.watches[wi] 309 self.watches[wi] = [] 310 311 debug("%s -> True : watches: %s" % (self.name_lit(lit), watches)) 312 313 # Notifiy all watchers 314 for i in range(len(watches)): 315 clause = watches[i] 316 if not clause.propagate(lit): 317 # Conflict 318 319 # Re-add remaining watches 320 self.watches[wi] += watches[i+1:] 321 322 # No point processing the rest of the queue as 323 # we'll have to backtrack now. 324 self.propQ = [] 325 326 return clause 327 return None
328
329 - def impossible(self):
330 self.toplevel_conflict = True
331
332 - def get_varinfo_for_lit(self, lit):
333 """@type lit: int 334 @rtype: L{VarInfo}""" 335 if lit >= 0: 336 return self.assigns[lit] 337 else: 338 return self.assigns[neg(lit)]
339
340 - def lit_value(self, lit):
341 """@type lit: int 342 @rtype: bool | None""" 343 if lit >= 0: 344 value = self.assigns[lit].value 345 return value 346 else: 347 v = -1 - lit 348 value = self.assigns[v].value 349 if value is None: 350 return None 351 else: 352 return not value
353 354 # Call cb when lit becomes True
355 - def watch_lit(self, lit, cb):
356 #debug("%s is watching for %s to become True" % (cb, self.name_lit(lit))) 357 """@type lit: int""" 358 self.watches[watch_index(lit)].append(cb)
359 360 # Returns the new clause if one was added, True if none was added 361 # because this clause is trivially True, or False if the clause is 362 # False.
363 - def _add_clause(self, lits, learnt, reason):
364 """@type lits: [int] 365 @type learnt: bool 366 @type reason: str 367 @rtype: L{zeroinstall.injector.sat.UnionClause} | bool""" 368 if not lits: 369 assert not learnt 370 self.toplevel_conflict = True 371 return False 372 elif len(lits) == 1: 373 # A clause with only a single literal is represented 374 # as an assignment rather than as a clause. 375 return self.enqueue(lits[0], reason) 376 377 clause = UnionClause(self, lits) 378 379 if learnt: 380 # lits[0] is None because we just backtracked. 381 # Start watching the next literal that we will 382 # backtrack over. 383 best_level = -1 384 best_i = 1 385 for i in range(1, len(lits)): 386 level = self.get_varinfo_for_lit(lits[i]).level 387 if level > best_level: 388 best_level = level 389 best_i = i 390 lits[1], lits[best_i] = lits[best_i], lits[1] 391 392 # Watch the first two literals in the clause (both must be 393 # undefined at this point). 394 for lit in lits[:2]: 395 self.watch_lit(neg(lit), clause) 396 397 return clause
398
399 - def name_lits(self, lst):
400 """@type lst: [int] 401 @rtype: [str]""" 402 return [self.name_lit(l) for l in lst]
403 404 # For nicer debug messages
405 - def name_lit(self, lit):
406 """@type lit: int 407 @rtype: str""" 408 if lit >= 0: 409 return self.assigns[lit].name 410 return "not(%s)" % self.assigns[neg(lit)].name
411
412 - def add_clause(self, lits):
413 # Public interface. Only used before the solve starts. 414 """@type lits: [int] 415 @rtype: L{zeroinstall.injector.sat.UnionClause} | bool""" 416 assert lits 417 418 debug("add_clause([%s])" % ', '.join(self.name_lits(lits))) 419 420 if any(self.lit_value(l) == True for l in lits): 421 # Trivially true already. 422 return True 423 lit_set = set(lits) 424 for l in lits: 425 if neg(l) in lit_set: 426 # X or not(X) is always True. 427 return True 428 # Remove duplicates and values known to be False 429 lits = [l for l in lit_set if self.lit_value(l) != False] 430 431 retval = self._add_clause(lits, learnt = False, reason = "input fact") 432 if not retval: 433 self.toplevel_conflict = True 434 return retval
435
436 - def at_most_one(self, lits):
437 """@type lits: [int] 438 @rtype: L{zeroinstall.injector.sat.AtMostOneClause}""" 439 assert lits 440 441 debug("at_most_one(%s)" % ', '.join(self.name_lits(lits))) 442 443 # If we have zero or one literals then we're trivially true 444 # and not really needed for the solve. However, Zero Install 445 # monitors these objects to find out what was selected, so 446 # keep even trivial ones around for that. 447 # 448 #if len(lits) < 2: 449 # return True # Trivially true 450 451 # Ensure no duplicates 452 assert len(set(lits)) == len(lits), lits 453 454 # Ignore any literals already known to be False. 455 # If any are True then they're enqueued and we'll process them 456 # soon. 457 lits = [l for l in lits if self.lit_value(l) != False] 458 459 clause = AtMostOneClause(self, lits) 460 461 for lit in lits: 462 self.watch_lit(lit, clause) 463 464 return clause
465
466 - def analyse(self, cause):
467 # After trying some assignments, we've discovered a conflict. 468 # e.g. 469 # - we selected A then B then C 470 # - from A, B, C we got X, Y 471 # - we have a rule: not(A) or not(X) or not(Y) 472 # 473 # The simplest thing to do would be: 474 # 1. add the rule "not(A) or not(B) or not(C)" 475 # 2. unassign C 476 # 477 # Then we we'd deduce not(C) and we could try something else. 478 # However, that would be inefficient. We want to learn a more 479 # general rule that will help us with the rest of the problem. 480 # 481 # We take the clause that caused the conflict ("cause") and 482 # ask it for its cause. In this case: 483 # 484 # A and X and Y => conflict 485 # 486 # Since X and Y followed logically from A, B, C there's no 487 # point learning this rule; we need to know to avoid A, B, C 488 # *before* choosing C. We ask the two variables deduced at the 489 # current level (X and Y) what caused them, and work backwards. 490 # e.g. 491 # 492 # X: A and C => X 493 # Y: C => Y 494 # 495 # Combining these, we get the cause of the conflict in terms of 496 # things we knew before the current decision level: 497 # 498 # A and X and Y => conflict 499 # A and (A and C) and (C) => conflict 500 # A and C => conflict 501 # 502 # We can then learn (record) the more general rule: 503 # 504 # not(A) or not(C) 505 # 506 # Then, in future, whenever A is selected we can remove C and 507 # everything that depends on it from consideration. 508 509 510 learnt = [None] # The general rule we're learning 511 btlevel = 0 # The deepest decision in learnt 512 p = None # The literal we want to expand now 513 seen = set() # The variables involved in the conflict 514 515 counter = 0 516 517 while True: 518 # cause is the reason why p is True (i.e. it enqueued it). 519 # The first time, p is None, which requests the reason 520 # why it is conflicting. 521 if p is None: 522 debug("Why did %s make us fail?" % cause) 523 p_reason = cause.calc_reason(p) 524 debug("Because: %s => conflict" % (' and '.join(self.name_lits(p_reason)))) 525 else: 526 debug("Why did %s lead to %s?" % (cause, self.name_lit(p))) 527 p_reason = cause.calc_reason(p) 528 debug("Because: %s => %s" % (' and '.join(self.name_lits(p_reason)), self.name_lit(p))) 529 530 # p_reason is in the form (A and B and ...) 531 # p_reason => p 532 533 # Check each of the variables in p_reason that we haven't 534 # already considered: 535 # - if the variable was assigned at the current level, 536 # mark it for expansion 537 # - otherwise, add it to learnt 538 539 for lit in p_reason: 540 var_info = self.get_varinfo_for_lit(lit) 541 if var_info not in seen: 542 seen.add(var_info) 543 if var_info.level == self.get_decision_level(): 544 # We deduced this var since the last decision. 545 # It must be in self.trail, so we'll get to it 546 # soon. Remember not to stop until we've processed it. 547 counter += 1 548 elif var_info.level > 0: 549 # We won't expand lit, just remember it. 550 # (we could expand it if it's not a decision, but 551 # apparently not doing so is useful) 552 learnt.append(neg(lit)) 553 btlevel = max(btlevel, var_info.level) 554 # else we already considered the cause of this assignment 555 556 # At this point, counter is the number of assigned 557 # variables in self.trail at the current decision level that 558 # we've seen. That is, the number left to process. Pop 559 # the next one off self.trail (as well as any unrelated 560 # variables before it; everything up to the previous 561 # decision has to go anyway). 562 563 # On the first time round the loop, we must find the 564 # conflict depends on at least one assignment at the 565 # current level. Otherwise, simply setting the decision 566 # variable caused a clause to conflict, in which case 567 # the clause should have asserted not(decision-variable) 568 # before we ever made the decision. 569 # On later times round the loop, counter was already > 570 # 0 before we started iterating over p_reason. 571 assert counter > 0 572 573 while True: 574 p = self.trail[-1] 575 var_info = self.get_varinfo_for_lit(p) 576 cause = var_info.reason 577 self.undo_one() 578 if var_info in seen: 579 break 580 debug("(irrelevant)") 581 counter -= 1 582 583 if counter <= 0: 584 assert counter == 0 585 # If counter = 0 then we still have one more 586 # literal (p) at the current level that we 587 # could expand. However, apparently it's best 588 # to leave this unprocessed (says the minisat 589 # paper). 590 break 591 592 # p is the literal we decided to stop processing on. It's either 593 # a derived variable at the current level, or the decision that 594 # led to this level. Since we're not going to expand it, add it 595 # directly to the learnt clause. 596 learnt[0] = neg(p) 597 598 debug("Learnt: %s" % (' or '.join(self.name_lits(learnt)))) 599 600 return learnt, btlevel
601
602 - def run_solver(self, decide):
603 """@rtype: bool""" 604 605 # Check whether we detected a trivial problem 606 # during setup. 607 if self.toplevel_conflict: 608 debug("FAIL: toplevel_conflict before starting solve!") 609 return False 610 611 while True: 612 # Use logical deduction to simplify the clauses 613 # and assign literals where there is only one possibility. 614 conflicting_clause = self.propagate() 615 if not conflicting_clause: 616 debug("new state: %s", self.assigns) 617 if all(info.value != None for info in self.assigns): 618 # Everything is assigned without conflicts 619 debug("SUCCESS!") 620 return True 621 else: 622 # Pick a variable and try assigning it one way. 623 # If it leads to a conflict, we'll backtrack and 624 # try it the other way. 625 lit = decide() 626 #print "TRYING:", self.name_lit(lit) 627 assert lit is not None, "decide function returned None!" 628 assert self.lit_value(lit) is None 629 self.trail_lim.append(len(self.trail)) 630 r = self.enqueue(lit, reason = "considering") 631 assert r is True 632 else: 633 if self.get_decision_level() == 0: 634 debug("FAIL: conflict found at top level") 635 return False 636 else: 637 # Figure out the root cause of this failure. 638 learnt, backtrack_level = self.analyse(conflicting_clause) 639 640 self.cancel_until(backtrack_level) 641 642 c = self._add_clause(learnt, learnt = True, reason = conflicting_clause) 643 644 if c is not True: 645 # Everything except the first literal in learnt is known to 646 # be False, so the first must be True. 647 e = self.enqueue(learnt[0], c) 648 assert e is True
649