Merge "improve performance of blockimgdiff"

This commit is contained in:
Doug Zongker 2016-02-10 16:52:10 +00:00 committed by Gerrit Code Review
commit ae91d54e89
1 changed files with 106 additions and 33 deletions

View File

@ -16,7 +16,9 @@ from __future__ import print_function
from collections import deque, OrderedDict from collections import deque, OrderedDict
from hashlib import sha1 from hashlib import sha1
import array
import common import common
import functools
import heapq import heapq
import itertools import itertools
import multiprocessing import multiprocessing
@ -24,6 +26,7 @@ import os
import re import re
import subprocess import subprocess
import threading import threading
import time
import tempfile import tempfile
from rangelib import RangeSet from rangelib import RangeSet
@ -204,6 +207,23 @@ class Transfer(object):
" to " + str(self.tgt_ranges) + ">") " to " + str(self.tgt_ranges) + ">")
@functools.total_ordering
class HeapItem(object):
def __init__(self, item):
self.item = item
# Negate the score since python's heap is a min-heap and we want
# the maximum score.
self.score = -item.score
def clear(self):
self.item = None
def __bool__(self):
return self.item is None
def __eq__(self, other):
return self.score == other.score
def __le__(self, other):
return self.score <= other.score
# BlockImageDiff works on two image objects. An image object is # BlockImageDiff works on two image objects. An image object is
# anything that provides the following attributes: # anything that provides the following attributes:
# #
@ -734,7 +754,7 @@ class BlockImageDiff(object):
# - we write every block we care about exactly once. # - we write every block we care about exactly once.
# Start with no blocks having been touched yet. # Start with no blocks having been touched yet.
touched = RangeSet() touched = array.array("B", "\0" * self.tgt.total_blocks)
# Imagine processing the transfers in order. # Imagine processing the transfers in order.
for xf in self.transfers: for xf in self.transfers:
@ -745,14 +765,22 @@ class BlockImageDiff(object):
for _, sr in xf.use_stash: for _, sr in xf.use_stash:
x = x.subtract(sr) x = x.subtract(sr)
assert not touched.overlaps(x) for s, e in x:
# Check that the output blocks for this transfer haven't yet been touched. for i in range(s, e):
assert not touched.overlaps(xf.tgt_ranges) assert touched[i] == 0
# Touch all the blocks written by this transfer.
touched = touched.union(xf.tgt_ranges) # Check that the output blocks for this transfer haven't yet
# been touched, and touch all the blocks written by this
# transfer.
for s, e in xf.tgt_ranges:
for i in range(s, e):
assert touched[i] == 0
touched[i] = 1
# Check that we've written every target block. # Check that we've written every target block.
assert touched == self.tgt.care_map for s, e in self.tgt.care_map:
for i in range(s, e):
assert touched[i] == 1
def ImproveVertexSequence(self): def ImproveVertexSequence(self):
print("Improving vertex order...") print("Improving vertex order...")
@ -889,6 +917,7 @@ class BlockImageDiff(object):
for xf in self.transfers: for xf in self.transfers:
xf.incoming = xf.goes_after.copy() xf.incoming = xf.goes_after.copy()
xf.outgoing = xf.goes_before.copy() xf.outgoing = xf.goes_before.copy()
xf.score = sum(xf.outgoing.values()) - sum(xf.incoming.values())
# We use an OrderedDict instead of just a set so that the output # We use an OrderedDict instead of just a set so that the output
# is repeatable; otherwise it would depend on the hash values of # is repeatable; otherwise it would depend on the hash values of
@ -899,52 +928,67 @@ class BlockImageDiff(object):
s1 = deque() # the left side of the sequence, built from left to right s1 = deque() # the left side of the sequence, built from left to right
s2 = deque() # the right side of the sequence, built from right to left s2 = deque() # the right side of the sequence, built from right to left
while G: heap = []
for xf in self.transfers:
xf.heap_item = HeapItem(xf)
heap.append(xf.heap_item)
heapq.heapify(heap)
sinks = set(u for u in G if not u.outgoing)
sources = set(u for u in G if not u.incoming)
def adjust_score(iu, delta):
iu.score += delta
iu.heap_item.clear()
iu.heap_item = HeapItem(iu)
heapq.heappush(heap, iu.heap_item)
while G:
# Put all sinks at the end of the sequence. # Put all sinks at the end of the sequence.
while True: while sinks:
sinks = [u for u in G if not u.outgoing] new_sinks = set()
if not sinks:
break
for u in sinks: for u in sinks:
if u not in G: continue
s2.appendleft(u) s2.appendleft(u)
del G[u] del G[u]
for iu in u.incoming: for iu in u.incoming:
del iu.outgoing[u] adjust_score(iu, -iu.outgoing.pop(u))
if not iu.outgoing: new_sinks.add(iu)
sinks = new_sinks
# Put all the sources at the beginning of the sequence. # Put all the sources at the beginning of the sequence.
while True: while sources:
sources = [u for u in G if not u.incoming] new_sources = set()
if not sources:
break
for u in sources: for u in sources:
if u not in G: continue
s1.append(u) s1.append(u)
del G[u] del G[u]
for iu in u.outgoing: for iu in u.outgoing:
del iu.incoming[u] adjust_score(iu, +iu.incoming.pop(u))
if not iu.incoming: new_sources.add(iu)
sources = new_sources
if not G: if not G: break
break
# Find the "best" vertex to put next. "Best" is the one that # Find the "best" vertex to put next. "Best" is the one that
# maximizes the net difference in source blocks saved we get by # maximizes the net difference in source blocks saved we get by
# pretending it's a source rather than a sink. # pretending it's a source rather than a sink.
max_d = None while True:
best_u = None u = heapq.heappop(heap)
for u in G: if u and u.item in G:
d = sum(u.outgoing.values()) - sum(u.incoming.values()) u = u.item
if best_u is None or d > max_d: break
max_d = d
best_u = u
u = best_u
s1.append(u) s1.append(u)
del G[u] del G[u]
for iu in u.outgoing: for iu in u.outgoing:
del iu.incoming[u] adjust_score(iu, +iu.incoming.pop(u))
if not iu.incoming: sources.add(iu)
for iu in u.incoming: for iu in u.incoming:
del iu.outgoing[u] adjust_score(iu, -iu.outgoing.pop(u))
if not iu.outgoing: sinks.add(iu)
# Now record the sequence in the 'order' field of each transfer, # Now record the sequence in the 'order' field of each transfer,
# and by rearranging self.transfers to be in the chosen sequence. # and by rearranging self.transfers to be in the chosen sequence.
@ -960,10 +1004,38 @@ class BlockImageDiff(object):
def GenerateDigraph(self): def GenerateDigraph(self):
print("Generating digraph...") print("Generating digraph...")
# Each item of source_ranges will be:
# - None, if that block is not used as a source,
# - a transfer, if one transfer uses it as a source, or
# - a set of transfers.
source_ranges = []
for b in self.transfers:
for s, e in b.src_ranges:
if e > len(source_ranges):
source_ranges.extend([None] * (e-len(source_ranges)))
for i in range(s, e):
if source_ranges[i] is None:
source_ranges[i] = b
else:
if not isinstance(source_ranges[i], set):
source_ranges[i] = set([source_ranges[i]])
source_ranges[i].add(b)
for a in self.transfers: for a in self.transfers:
for b in self.transfers: intersections = set()
if a is b: for s, e in a.tgt_ranges:
continue for i in range(s, e):
if i >= len(source_ranges): break
b = source_ranges[i]
if b is not None:
if isinstance(b, set):
intersections.update(b)
else:
intersections.add(b)
for b in intersections:
if a is b: continue
# If the blocks written by A are read by B, then B needs to go before A. # If the blocks written by A are read by B, then B needs to go before A.
i = a.tgt_ranges.intersect(b.src_ranges) i = a.tgt_ranges.intersect(b.src_ranges)
@ -1092,6 +1164,7 @@ class BlockImageDiff(object):
"""Assert that all the RangeSets in 'seq' form a partition of the """Assert that all the RangeSets in 'seq' form a partition of the
'total' RangeSet (ie, they are nonintersecting and their union 'total' RangeSet (ie, they are nonintersecting and their union
equals 'total').""" equals 'total')."""
so_far = RangeSet() so_far = RangeSet()
for i in seq: for i in seq:
assert not so_far.overlaps(i) assert not so_far.overlaps(i)