0

Tracking motifs in changing graphs

 3 years ago
source link: https://github.com/frankmcsherry/blog/blob/master/posts/2016-09-17.md
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Tracking motifs in changing graphs

Imagine you are the sort of person who gets to sit and watch high-throughput streams of social signals fly past. There is probably lots of really interesting information here, but how can you find it? Maybe you've realized that PageRank is kinda silly, and what you want to find are instances of surprising, non-trivial structure in your graphs.

In this post we are going to look at the problem of finding and tracking motifs in graphs that change.

A "motif" is a small connected subgraph whose nodes are variables. For example, the "triangle motif" is a graph that looks like so:

(x0, x1), (x1, x2), (x0, x2)

A motif "exists" in a larger graph whenever we can assign actual node identifiers to the node variables so that all edges identified by the motif exist in the graph. There is some debate about whether the non-edges should non-exist; we'll talk about both. Each assignment of these variables is an instance of the motif.

Returning to the example of a triangle, we will write it the way we might write a Datalog rule, announcing some variables on one side and what needs to be true about the relationships between them on the other.

triangle(x0, x1, x2) := edge(x0, x1), edge(x0, x2), edge(x1, x2)

A triangle is a small graph, but we are interested in the sets of assignments to x0, x1, and x2 so that all three edges exist. There could be lots of these in a larger graph, and they could also change substantially as we add and remove edges. It would be great to have all of them brought to our attention.

Perhaps you aren't as interested in abstract "triangles", because seriously who cares? In which case, the motif above applied to a directed graph is an instance of a feed-forward loop, where signal from x0 gets to x2 both directly and through x1. There are lots of other motifs that are plausibly interesting, a subject on which I have only limited credibility (determining motifs which are meaningful, if any, seems to be open research).

For the busy people among you, we are going to end up describing a data-parallel system that tracks all occurrences of arbitrary motifs in arbitrarily changing graphs, with (i) low latency, (ii) worst-case optimal throughput, (iii) a memory footprint linear in the size of the graph. I'm also experimenting with bolding important things, rather than writing paragraphs explaining them.

This is all joint work with Khaled Ammar and Semih Salihoglu, both at Waterloo. It is an instance of a more general approach to efficiently maintaining relational algebra queries over continually updating relations, if you prefer to think of things that way. We are writing it up now, and experimenting with the question of "what happens to academics when they tell people what they are working on before they publish it".

An outline

There are some steps to follow to get where we need to go, so we will do this in parts.

I'm going to use the livejournal graph dataset for most of the experiments, so if you want to grab that and follow along, sweet. I'll also report some statistics for the twitter_rv graph dataset, though mainly to demonstrate the asymptotic difference between various algorithms, as it is just too painful to run the slower algorithms on this larger (and more skewed) dataset.

I'm going to use the directed version of these graph, and all of the work we will develop apply to motifs in directed graphs. This means that "triangles"

triangle(x0, x1, x2) := edge(x0, x1), edge(x0, x2), edge(x1, x2)

are really the feed-forward loops, because the edges have directions and we need to respect them. In this setting, a feed-forward loop is totally different from a directed 3-cycle (here written oddly),

3cycle(x0, x1, x2) := edge(x0, x1), edge(x2, x0), edge(x1, x2)

Independently, there are a parade of tricks one can apply to motifs in undirected graphs, from only representing the edges once to observing symmetries in the motif to re-labelling graph identifiers to smooth out some issue we will run into with directed motifs. Our take is that you can and should (i) develop general infrastructure for directed graphs and relations generally, and (ii) then apply your collection of cute tricks. This isn't going to be the place to learn about the absolute best way to determine triangles in an undirected graph, but it will be pretty good nonetheless.

1. Motifs in graphs.

One way to look for meaning in graphs is to look for interesting repeated structural patterns.

Triangles are an interesting subgraph because they appear relatively infrequently by chance. In a graph with m edges placed randomly among n nodes, there are n^3 possible directed triangles each of which has a (roughly) (m/n^2)^3 chance of existing. This gives us roughly (m/n)^3 directed triangles, in expectation. In real graphs triangles appear much more often than this and act as signals of structure, places where things are not completely random.

More generally, specific subgraphs on k vertices with at least k edges are unlikely to appear by chance. When they do occur, it is usually a sign that something interesting is going on in the graph. One can project various semantic interpretations of each of these subgraphs, but several lines of work are interested in tracking the arrival and departure of these subgraphs, or "motifs" as we will call them here, as graph edges arrive and depart.

There are several ways to go about finding motifs in a graph, so let's start with the simplest and get progressively smarter.

Enumerate everything

From a graph on n nodes, we could just enumerate all n choose k possible assignments of nodes identifiers to the k variables, and see if the corresponding edges exist. Even for small n this could be an enormous amount of work. We want to compute the answer that this would compute, we just don't want to compute it this way.

Concretely, the livejournal graph has some 4.8 million vertices, in which the number of possible directed triangles is

4.8m^3 = 1.10592x10^20

The twitter_rv graph has some 42 million vertices, and so has about 670x as many possible directed triangles as does livejournal.

Explicitly enumerating these possibilities and checking them is not reasonable, and we aren't going to implement and evaluate that for obvious reasons. It is nonetheless good to understand that what we want can be computed, and we just need to optimize the implementation.

Tracking changes explicitly might be easier: if an edge (x0, x1) changes (i.e. arrives or departs), two of the variables are already bound and we only need to look at all of the x2. There are as many settings of x2 as nodes in the graph, which is still a lot but less crazy than when we had to cube that number. However, if we go to motifs on more variables the craziness returns. Let's hold off on this for now.

Relational joins

One way to represent the triangle motif is by a relational query, representing the set of edges by a relation edge(x,y) and joining three instances of the edge relation. We've actually been writing things this way already.

triangle(x0, x1, x2) := edge(x0, x1), edge(x0, x2), edge(x1, x2)

This notation says that if we have a relation (set) of pairs edge(x0, x1) then we can define triangles by the triples (x0, x1, x2) so that each of the corresponding pairs exists in edge.

More generally, if we have a motif on k variables with edges (xi, xj) we can reframe the motif as a relational query (like, SQL) where we use the edge relation once for each edge we require between variables.

motif(x0, x1, ..) := edge(xi0, xj0), edge(xi1, xj0), ..

This may seem like we've just re-though how we write things down, but once we write the motifs as relational queries over an edge relation, we can evaluate each query as if we were a vanilla database system.

The typical approach to evaluating relational joins is to create a tree of binary joins, where the leaves are relations in the join and each internal node corresponds to the relational join of whatever relations are produced by its two children. This approach has a certain simplicity, and has been the standard way that databases have resolved joins for quite some time. If your motif doesn't have any cycles (i.e. has k-1 edges on k variables), you can even implement this optimally, in the sense that the amount of work done is at worst linear in the size of your input (number of edges) plus the output (number of motif instances found).

At the same time, if your motif has cycles (and the most non-trivial ones will) this approach can be much worse. Let's look at the triangles example, which uses the edge relation three times, and choose as the first join to perform:

candidates(x0, x1, x2) := edge(x0, x1), edge(x0, x2)

Our next step will join candidates(x0, x1, x2) with edge(x1, x2), an intersection, but as part of doing this we end up enumerating at least the contents of candidates; how big could it possibly be?

Each graph node x0 contributes each pair of its out-neighbors to candidates. The maximum degrees in livejournal and twitter_rv are

max_deg(livejournal) = 20,293
max_deg(twitter_rv)  = 2,997,469

which mean that they could produce lots of candidates, especially twitter_rv. In fact we can just evaluate the size of candidates for each by summing the squares of out-degrees, giving

candidates(livejournal) = 7,292,467,251
candidates(twitter_rv)  = 243,932,072,881,466

The livejournal graph has quite a few candidates, but I can't even remember how to pronounce how many candidates twitter_rv has.

Of pedagogical import, there are more candidates in twitter_rv than could possibly exist with its 1,468,365,182 edges. If we used all of those edges to make a complete directed graph on sqrt(1.5b) nodes (we are a few edges short), we would have only as many directed triangles as the cube of that, or

tri_max(1.5b) = 58,094,750,193,111

So, we are looking at at least 4x as many candidates as there could possibly be triangles in an adversarially arranged graph. We can probably do better than this (and we will, because others before us already have).

Let's write up some code to go and compute the number of triangles using a standard relational join. We are only going to run it on the livejournal graph, for obvious reasons. Rather than actually materialize the seven billion triples (x0, x1, x2), we will enumerate them and check each to see if the edge (x1, x2) exists.

let mut triangles = 0;
for x0 in 0 .. graph.nodes() {
	for x1 in graph.edges(x0) {
		for x2 in graph.edges(x0)
			// check if edge (x1, x2) exists.
			if graph.edges(x1).contains(x2) {
				triangles += 1;
			}
		}
	}
}

The contains method could be written several ways. If the data were hashsets, we would just do a hash lookup. In my code, edges are stored as a sorted list of identifiers, so it would be binary search. If we were smarter, we could note that the x2 are themselves sorted, and that means that we could probably do a bulk intersect a bit faster. Hold that thought for a moment.

triangles: 946400853 in: Duration { secs: 41, nanos: 538378761 }

This is ... a time. Cool. It's actually not even a bad time, relative to the smarter approaches we are going to look at next.

If we were to run this computation on the twitter_rv graph, imagining we could enumerate and check edges at the same rate (say, 200m candidates / sec) it would take only 1219660.36s, roughly 338 hours or just over two weeks.

So, this vanilla approach using relational joins might work ok on smaller graphs, but probably isn't so great on some larger graphs.

Being smarter about triangles

A certain smartness (who's lineage I don't know) has developed for computing triangles, which can substantially improve on the performance of the approach we discussed above. There are a few ways to think about the smartness, but the one I like most (for pedagogical reasons) starts by re-writing our code above to use intersect rather than contains:

let mut triangles = 0;
for x0 in 0 .. graph.nodes() {
	for x1 in graph.edges(x0) {
		// check all x2 at once ...
		let edges0 = graph.edges(x0);
		let edges1 = graph.edges(x1);
		triangles += intersect(edges0, edges1);
	}
}

How is this different from the version above? It depends on how we implement intersect. Here is one implementation that you might have though of, paralleling the code we wrote above.

// counts number of element of `a` also in `b`.
fn intersect1(a: &[u32], b: &[u32]) -> usize {
	let mut count = 0;
	for x in a {
		if b.contains(x) { count += 1; }
	}
	count
}

This intersect1 corresponds to the query plan where we first join edge(x0, x1) with edge(x0, x2) and then with edge(x1, x2), as the candidates x2 come from their relationship to x0. It takes time linear in the size of a, and if contains is fast then roughly independent of the size of b.

Here is another pretty similar implementation, nonetheless different:

// counts number of element of `b` also in `a`.
fn intersect2(a: &[u32], b: &[u32]) -> usize {
	let mut count = 0;
	for x in b {
		if a.contains(x) { count += 1; }
	}
	count
}

This intersect2 corresponds to the query plan where we first join edge(x0, x1) with edge(x1, x2) and then with edge(x0, x2), as the candidates x2 come from their relationship to x1. It takes time linear in the size of b, and if contains is fast then roughly independent of the size of a.

These two implementations vary only in which argument supplies the candidates and which filters the candidates, just like the two join orders we discussed. Each implementation takes an amount of time that depends mainly on how long their first (intersect1) or second (intersect2) arguments are. So which one should we use?

I mean, we don't really have to choose do we? We can just use the right one each time:

fn intersect(a: &[u32], b: &[u32]) -> usize {
	if a.len() < b.len() {
		intersect1(a, b);
	}
	else {
		intersect2(a, b);
	}
}

It turns out that this obviously hack-y change does something amazing. Using this "optimized" intersect takes the worst-case running time of the triangle finding computation down from O(edges^2) to O(edges^{3/2}). Boom, theory happened.

Ok slow down. What actually happened here?

Our new implementation no longer corresponds to a standard implementation of a relational join, in which we would have to pick two relations to pair up. Rather, we are choosing which relation proposes candidates and which validates them on a record-by-record basis. That is a pretty big departure from the standard relational approach, and it makes all the difference.

Let's run our modified implementation on livejournal

triangles: 946400853 in: Duration { secs: 40, nanos: 173807791 }

Compared to the run above this is ... one second faster. Not exactly the revolution we were promised. Let's dive a bit deeper and see what happened.

The first implementation considers each pair (x0, x1) and for each proposes degree(x0) candidates, which we computed up above as the sum of squared degrees. Our second implementation considers each pair (x0, x1) and for each proposes min(degree(x0), degree(x1)) candidates, which we can also compute:

candidates_1(livejournal) = 7,292,467,251
candidates_2(livejournal) = 3,251,016,553

This doesn't seem to be a massive reduction, and we've made the code a bit more complicated so perhaps .. sucks. Let's look at the numbers for twitter_rv

candidates_1(twitter_rv) = 243,932,072,881,466
candidates_2(twitter_rv) = 754,462,139,483

That is a much more substantial difference. Indeed, our estimated time of two weeks for the first approach actually runs in just about three and a half hours using the second approach.

triangles: 143011093363 in: Duration { secs: 12977, nanos: 592176810 }

The out-degrees of the livejournal graph just aren't as skewed as the twitter_rv graph, so smartly choosing between proposers doesn't help much (or at all, really). On the other hand, smartly choosing is pretty much mandatory for the twitter_rv graph.

Ok, cute hack. But does it apply to anything other than triangles? It does.

2. Worst-case optimal relational joins.

Some recent very nice research by Ngo et al shook up the world of relational joins, or at least my view of them. There is a lot of crazy math and stuff, but the short version is that the neat trick up above with triangles applies to any old relational join you might like.

When evaluating a join, one can repeatedly introduce attributes rather than repeatedly introducing relations, and thereby substantially improve the worst-case work required.

What does this mean? In the triangles example we started with x0 and x1 and introduced x2. We did this by thinking holistically about x2 rather than just by picking some relation that had an x2 in it. We acted using both edges(x0, x2) and edges(x1, x2) at the same time, rather than one after the other.

This idea generalizes to any relational query where we order the attributes, introduce values for each attribute by considering the candidates from each relation, and start from the relation proposing the smallest set of candidates.

To demonstrate the ideas more clearly, let's think about directed four-cliques: sets of four nodes where all six edges exist.

four(x0, x1, x2, x3) := edge(x0, x1), edge(x0, x2), edge(x0, x3),
                        edge(x1, x2), edge(x1, x3), edge(x2, x3)

This query is the same as the triangles query on the first three attributes: x0, x1, and x2.

The fourth attribute x3 is different and has three relations that propose candidates based on the current values of x0, x1, and x2. This isn't fundamentally more complicated than with triangles, which has two sets of candidates, we just need a new intersect method that takes any number of lists of proposals (rather than just two). Here is a hypothetical implementation:

// returns the intersection of the supplied relations.
fn intersect(mut candidates: &[&[u32]]) -> Vec<u32> {

	// order `candidates` by number of candidates.
	candidates.sort_by(|x,y| x.len().cmp(&y.len()));

	// start with smallest number of candidates, intersect up.
	let mut result = candidates[0].to_vec();
	for other in 1 .. candidates.len() {
		result.retain(|x| candidates[other].contains(x));
	}

	result
}

The important property Ngo et al need intersect to have is that it should take time proportional to the smallest of the candidate sets. We do this by finding the smallest set, and then using it to drive intersection tests with the other sets of candidates. It is important that contains runs efficiently; in the Ngo et al paper they were cool with logarithmic, which works for me too.

There are even smarter ways of doing this, like what the LogicBlox folks did in "leapfrog join", where you repeatedly advance the smallest of the heads of each of the lists, as long as some head is larger.

Now with this intersect at hand, we can write logic to count the number of directed four-cliques, which looks a lot like the code we used for directed triangles just with another nested loop, as:

let mut cliques = 0;
for x0 in 0 .. graph.nodes {
	for x1 in graph.edges(x0) {
		let edges0 = graph.edges(x0);
		let edges1 = graph.edges(x1);
		for x2 in intersect(&[edges0, edges1]) {
			let edges2 = graph.edges(x2);
			for x3 in intersect(&[edges0, edges1, edges2]) {
				cliques += 1;
			}
		}
	}
}

Each time we introduced a new attribute, here just x2 and x3, we took each of the relations which constrain the new attribute based on existing attributes, looked up their candidate sets, and passed them as arguments to intersect. As long as intersect is smart and uses the smallest candidate set first, we get Ngo et al's sweet mathematical guarantees (time bounded by the most four-cliques there could be with as many edges as we have).

General graph motifs

Up until this point we've been using motifs with some pleasant properties that made our life easy, without bringing these properties to light. Let's change the four query slightly and see how we might need to do a little more work for general queries:

four(x0, x1, x2, x3) :=               edge(x0, x2), edge(x0, x3),
                        edge(x1, x2), edge(x1, x3), edge(x2, x3)

This is nearly identical to the four-cliques query, but we have removed the edge(x0, x1) edge. This .. complicates our "algorithm" as written above, because we can no longer just say "for each x0, for each x1" unless we want to consider all x1 for each x0, and we don't. At the same time this is a totally legit graph motif / relational join, so how can we sort this out?

There are two things we need to be able to do to deal with general motifs:

  1. We may need to re-order the variables.

    The variable x1 isn't constrained by x0, so we can't just add it after x0 despite its name suggesting it should come next. Only the variables x2 or x3 are constrained by x0, and we will want to add one of them next, after which we could add either of the others.

    If the motif is connected then we can always order the variables so that when each is introduced it is constrained by at least one prior variable, using any graph traversal (e.g. DFS or BFS). If the motif isn't connected, we should instead find instances of its connected components and take their cross products.

  2. We may need to index edges by destination.

    When we add x1 it is constrained by x2 and x3, but the constraint is not that both of them have an edge to x1, but rather that there is an edge from x1 to each of them. This means that to get a list of candidates from x2 and x3, we will need an index on the second element of the edge relation. The constraint on a prior variable may be in either the source or target position, and we need to index the relation in both directions (unless we know we don't need one direction).

    Each relation may need to be indexed by any proper non-empty subset of its attributes. In the case of graphs this is pretty easy, because there are just two coordinates, and so two ways to index the graph. More generally we may need to build more indices.

What's a better way to do things than simply speculating about what we might need to do? Actually doing it. Then we can see: does it work?

Re-ordering variables

Let's imagine that the query is presented to us as a list description of pairs of attribute identifiers (i,j) each indicating the constraint edges(x_i, x_j), as well as the index start of the attribute we'd like to start from. To order the variables we just need to repeatedly look at pairs in description and add any un-added variables once they become constrained:

// orders attributes from `start` so that each is constrained by a prior attribute.
fn order_attributes(start: usize, description: &[(usize, usize)]) -> Vec<usize> {

    let mut order = vec![start];
    let mut done = false;
    while !done {
        done = true;
        for &(src, dst) in description {
            if order.contains(&src) && !order.contains(&dst) {
                order.push(dst);
                done = false;
            }
            if order.contains(&dst) && !order.contains(&src) {
                order.push(src);
                done = false;
            }
        }
    }

    order
}

This allows us to re-map the attributes to their position in this list, so that we may safely just add them in order of the identifiers, 0 ... For what follows, let's imagine we've done that.

Planning the query

Our next step is to take our list description and turn it in to a list of constraints on each attribute, so that we can just read out which prior attributes to use to propose candidates (and whether they should use the forward or reverse index to do so). We will return this as list (one entry for each attribute) of a list of pairs of (i) attribute identifiers and (ii) bools which indicate forward index or not (else, the reverse index).

// indicates for each attribute which prior attributes constrain it, and how.
fn plan_query(description: &[(usize, usize)]) -> Vec<Vec<(usize, bool)>> {

	let mut plan = vec![];

	// for each attribute, determine relations constraining that attribute.
	for attribute in 1 .. {
		let mut constraints = vec![];
		for &(src, dst) in description {
			if src == attribute && dst < attribute {
				constraints.push((dst, false));
			}
			if dst == attribute && src < attribute {
				constraints.push((src, true));
			}
		}
		// add the constraints; return when done.
		if constraints.len() > 0 {
			plan.push(constraints);
		}
		else {
			return plan;
		}
	}
}

This is great! Now we have a "query plan", which tells us as we extend some binding of a prefix of the attributes, which of these bindings we need to use to produce candidates for the subsequent attributes. This is similar to a traditional relational query plan but different in some important ways, for example it will let us compute directed triangles in less than two weeks.

Suturing things together

Let's put these bits of code together into a general motif finding framework. We will start by planning out the query (step 1). We then loop through each value for x0 to initialize our working set bindings of solutions (step 2). Finally, and the hardest part, for each attribute we extend each element of our working set of solutions by looking up candidate extension sets and intersecting them (step 3).

// reports all bindings of node ids to variables in `description`.
fn motifs(forward: &[&[u32]], reverse: &[&[u32]], description: &[(usize, usize)]) -> Vec<Vec<u32>> {

	// 1. determine how to extend each attribute
	let plan = plan_query(description);

	// 2. start things out with `x0`
	let mut bindings = vec![];
	for x0 in 0 .. graph.len() {
		bindings.push(vec![x0]);
	}

	// 3. extend each attribute (starting at `x2`).
	for constraints in plan {
		// assemble extended bindings
		let mut new_bindings = vec![];
		for binding in &bindings {

			// a. assemble candidate extensions
		 	let mut candidates = vec![];
		 	for (idx, is_forward) in &constraints {
		 		if is_forward {
			 		candidates.push(forward[binding[idx]]);
			 	}
			 	else {
			 		candidates.push(reverse[binding[idx]]);
			 	}
		 	}

		 	// b. intersect candidates to find valid extensions
		 	let extensions = intersect(&candidates);

		 	// c. create an extended binding for each extension.
		 	for extension in extensions {
		 		let mut clone = binding.clone();
		 		clone.push(extension)
		 		new_bindings.push(clone);
		 	}
		}
	}
}

That is a bit of an eye-full, but I hope the first two steps at least should be clear.

The third step proceeds through each of the attributes, and takes each of the proposed bindings to the attributes so far and tries to extend them. The extension process finds all of the candidates indicated by the plan (step 3a) and uses our intersect method to find their intersection (step 3b). For each extension, we create a totally new and independent binding and stash it in our working list (step 3c).

General relational joins

We also took advantage of the fact that graphs have only two directions to index: forward and reverse. More general relations may require more indices based on the order of attributes. For example:

query(x0, x1, x2, x3) := edge(x0, x1), edge(x1, x2), edge(x2, x3),
                         other(x0, x1, x2), other(x3, x2, x1)

When we get around to extending x2 the other relation needs to be indexed by its first two coordinates for the first use, and by the third coordinate for the second use. When we extend x3 we need it indexed by its last two coordinates. If we had re-labeled some variables, we could switch these requirements around however we like. The other relation can be indexed six different ways (each attribute and pair of attributes), although we may not need so many. This isn't too different than relational queries, which also keep multiple indices of relations if you use multiple fields as keys, except that the traditional approach needs at most one index per use of a relation, whereas we might revisit the relation multiple times (e.g. our three indices for other while only using it twice).

Measurements

The code as written up above produces all of the extensions for each attribute in turn, so we don't actually see any results until we are done. Even for directed four-cliques, the most constrained motif on four attributes, the number of results are pretty epic and the time correspondingly large. That .. sucks for trying to figure out whether we are doing a good job or even computing the correct answer or anything like that. So we are going to put that off for the moment.

We should probably change the program somehow so that we produce results in a more continuous fashion...

3. Streaming, data-parallel on static data.

In this section we will re-organize how we write our motif finding programs to both (i) produce motifs in a continuous manner, and (ii) to partition responsibility for examining the data across multiple workers.

To achieve these changes we will start using timely dataflow, which is great but not strictly necessary. However, once you start working with streaming inputs and data-parallelism your program is sufficiently different from a 20-30 code line snippet that it helps to take advantage of frameworks which hide some of the complexity. Not all frameworks are equally good, and I will try and point out reasons why towards the end.

Organizing computation

We now have two types of computation for finding motifs. When we started, with triangles and four-cliques, we wrote programs like so:

for x0 in 0 .. {
    for x1 in graph.edges(x0) {
    	for x2 in intersect( ... )
	    	for x3 in intersect( ... ) {
	    		// do something
	    	}
	    }
	}
}

We then moved to a more general motif finding framework where the number of attributes is not statically known, and re-organized the code like so:

let mut bindings = vec![];
for attr in 0 .. attributes {
	for binding in bindings {
		// extend `binding` with x_attr values
	}
}

Our first approach walks sequentially through each of the candidates, and keeps a quite small amount of information on the stack: current x0, x1, x2, x3 values, and our progress through each's intersect iterators. Our second approach performs all of the work for each attribute before advancing to the next, and requires a relatively large amount of intermediate storage. This change seemed necessary in order to use a data-dependent number of attributes, short of doing code generation, but it does change the behavior of our program.

These two approaches compute the same thing, and really do the same elemental computations, but just in a different order. They are roughly analogous to exploring the set of motifs depth-first (the first approach) and breadth first (the second approach). Our transition to "streaming" and "parallel" is going to have some of the same flavor: doing the same computations, just in different (or in some cases unspecified) orders.

Data-parallel dataflow programming is largely about expressing your program to reveal these opportunities to re-order computation.

Streaming

In the first program above, we have an outer loop over the values of x0. We process all x0 bindings with some value before advancing to any of the bindings with the next value. Moveover, the processing of each value is independent from that of other values. We could stop working on any one binding and start working on another, as we see fit.

One way to express this flexibility is through dataflow, a style of programming in which availability of data drive the computation. All bits of code express what should happen when data arrive, and these bits of code are connected by queues of data. Computation happens as the data "flow" through the directed graph. However, the program does not specify which bits of code run when, which gives a system the flexibility to start and stop bits of code as it deeps appropriate (for example, some code produces too much data and we need to run the consumer).

Let's try to rewrite our simplest triangles computation using dataflow. Recall that the program looks something like

// for each (x0, x1) pair, find and intersect candidates
for (x0, x1) in graph.edges() {
	let candidates0 = forward(x0);
	let candidates1 = forward(x1);
	for x2 in intersect(candidates0, candidates1) {
		// do something ...
	}
}

We can re-imagine this computation as a stream of (x0, x1) values, where each time we receive a new pair we need to perform some more work. Let's sketch the computation using timely dataflow streams, and its built-in operator map which applies a one-to-one function independently on records.

let edges = graph.edges();

// for each (x0, x1) pair, find and intersect candidates
let result = edges.map(|(x0, x1)| {
	let candidates0 = forward(x0);
	let candidates1 = forward(x1);
	// return their intersection, as data
	(x0, x1, intersect(candidates0, candidates1))
});

result.map(|(x0, x1, x2s)| {
	for x2 in x2s {
		// do something ...
	}
});

This change seems relatively minor. We have taken the body of the for loop and put it in the map operator. However, we have made a strong statement to the timely dataflow runtime that it has the flexibility to run the computation as it sees fit. For example, it could pick up batches of 1,000 pairs and do the work for each, retiring the pairs with a bounded memory footprint. Timely dataflow is also able to start up multiple worker threads and have them produce triangles concurrently.

The code above does well at breaking down the work of triangle computation into smaller pieces, which can result in more efficient execution. However, it still has some short-comings we will want to fix, whose improvement will let us scale to larger graphs and support graphs whose edges come and go.

Data Parallelism

Data parallelism is used to describe techniques that process disjoint sets of data, allowing them to both read and write concurrently without complicated coordination. Our dataflow program above is data-parallel in the sense that each edge is processed independently, but not in the sense that each worker requires full access to the forward index (and would require the full reverse index had we used it).

The most common way to achieve data-parallelism is by upgrading the data queues between dataflow operators so that they "exchange" data rather than just enqueue it. Where one worker may push some data into the queue, another worker may be the one that recovers it, depending on the data. Most commonly, one supplies a function that is applied to each record, whose results indicate which worker should receive the record. These exchange queues allow us to route data to specific workers so that each can avoid maintaining redundant copies of, for example, the forward index.

This sound like fun. Let's use timely dataflow's exchange operator to shuffle the edges!

This is a bit harder than it sounds. If we hope to partition the data of forward so that the neighbors of each vertex live at exactly one worker, then there is no routing function that lets us route (x0, x1) to one specific worker. The two values of x0 and x1 may have their data at separate workers, which means .. is data-parallelism not going to work?

Of course it will work! We just need to reduce our computation even further. Rather than fully processing (x0, x1) in a single map statement, which we will find difficult due to the partitioning of forward data, we can break it into two steps: first look up the edges of x0 (by exchanging keyed on x0) and second look up the edges of x1 (by exchanging keyed on x1).

let stream = graph.edges();

// exchange (x0, x1) pairs to look up x0 candidates.
let stage0 = stream.exchange(|&(x0, _)| x0 % num_workers)
                   .map(|(x0, x1)| (x0, x1, forward(x0)));

// exchange (x0, x1, c0) pairs to look up x1 candidates.
let stage1 = stream.exchange(|&(_, x1, _)| x1 % num_workers)
                   .map(|(x0, x1, c0)| (x0, x1, c0, forward(x1)));

// intersect candidates
let result = stage1.map(|(x0, x1, c0, c1)|
	intersect(candidates0, candidates1).map(|x2| (x0, x1, x2))
)
result.map(|(x0, x1, x2)| /* do something ... */);

This code exchanges (x0, x1) pairs using x0 as a key and then looks up forward(x0). The code adds these edges as data and exchanges the result using x1 as a key, then looking up and intersecting with forward(x1). The code has the property that if there were multiple workers, each worker only has to look up keys in forward with the same modulus as the worker's index. This partitions the use of forward between all of the workers: each worker maintains a roughly 1/num_workers fraction of the index.

This is nice. However, it still has an issue. We were very careful to write intersect so that it took time proportional to the smaller candidate set, which it did by looking up each set and checking out their lengths before walking through either. In the code above we grab the candidates for x0 and then ship all of them to another worker. If this were just a pointer and length, no harm right?* On the other hand, if the data need to go to a worker on a remote machine we would need to serialize the data, and may have just performed enough work to screw up the mathematical properties we relied on.

*: Even if it is just a pointer and a length handed to a worker on the same machine, Rust is going to (rightly) flip out and complain that you have no reason to believe that the memory will still be valid when the other worker gets around to processing it. You somehow need to prove that what you are communicating will be valid when received, which typically involves one of (i) copying it, (ii) some reference counting, (iii) lying and hoping that you don't get caught out by something you didn't understand. As someone who works with data-driven programming I really like Rust's commitment to ownership, as it makes it very clear when you are attempting something naughty and you feel relatively safe that your code behaves as written, once it builds at all.

Doing it right

Let's recall what we actually wanted our computer to do with the candidate sets. Although we get handles to the candidate sets earlier, we are really interested in: first the length of each, and second the contents of the shorter. We should make that explicit, rather than a fortunate consequence of how our program was interpreted (e.g. that forward(x) only returns a pointer and length).

let edges = graph.edges();

// first determine the sizes of `forward(x0)` and `forward(x1)`.
let sizes0 = edges.exchange(|&(x0, _)| x0 % num_workers)
                  .map(|(x0, x1)| (x0, x1, forward(x0).len()));

let sizes1 = edges.exchange(|&(x0, _, _)| x1 % num_workers)
                  .map(|(x0, x1, s0)| (x0, x1, s0, forward(x1).len()));

// partition the data based on which attribute will propose.
let route0 = sizes1.filter(|&(_, _, s0, s1)| s0 < s1);
let route1 = sizes1.filter(|&(_, _, s0, s1)| s0 >= s1);

// handle route0 records by x0 proposing then x1 intersecting
let stage00 = route0.map(|(x0, x1, _, _)| (x0, x1));
					.exchange(|&(x0, _)| x0 % num_workers)
                    .map(|(x0, x1)| (x0, x1, forward(x0)))
                    .exchange(|&(_, x1, _)| x1 % num_workers)
                    .map(|(x0, x1, c0)| (x0, x1, intersect(c0, forward(x1))));

// handle route1 records by x1 proposing then x0 intersecting
let stage10 = route1.map(|(x0, x1, _, _)| (x0, x1));
					.exchange(|&(x0, _)| x1 % num_workers)
                    .map(|(x0, x1)| (x0, x1, forward(x1)))
                    .exchange(|&(_, x1, _)| x0 % num_workers)
                    .map(|(x0, x1, c1)| (x0, x1, intersect(c1, forward(x0))));

// stitch the two routes back together.
let result = stage01.concat(stage11).flat_map(|(x0, x1, x2s)|
	x2s.map(|x2| (x0, x1, x2))
)
result.map(|(x0, x1, x2)| /* do something ... */);

This may look pretty beastly, but it is what we want our computer to do, right?

Some comments

Let me take a moment to try and argue that writing your programs like this, perhaps not exactly in this language with this many symbols, is a healthy thing to do. Even with just one worker and no parallelism, we have explained the available concurrency: opportunities to re-order how things are run. This flexibility can have some serious potential upsides. Here are some examples:

  1. Our vanilla triangle finding code decided which of x0 and x1 it should use to propose candidates as it goes, meaning there is a conditional in the middle of the main loop. We are able to separate edges using x0 from edges using x1, and run code fragments without a conditional test in their heart. In each of these fragments we are now doing the same thing each iteration, which makes caching and re-using results a great deal simpler. To the extent that branch mis-prediction was hurting pre-fetching that should be improved now too.

  2. Each batch of requests for forward(x0) and forward(x1) can be re-ordered to improve locality. We can get "more sequential" memory access rather than accesses all over the place, which can really help. In the surprisingly common case where we have many of the same values for x0 and x1, for example we are processing all edges to or from one node, then putting these requests in sequence allows us to re-use their results (we do so explicitly in the fully dynamic case).

  3. The amount of work we want to process at a time (batch size) may change with the data. Some times we may want to grab 1,000 edges and work with them, for the reasons above, and some times 1,000 edges may result in 1,000,000,000 triangles and so we'd like to work with slightly fewer. We have the ability to modulate the amount of work we do in each block of code, rather than committing to either no batching (very first approach) or batch everything (second approach we wrote).

I think it is great for us to have explained all of these aspects of our program, though it is a different question whether a system can meaningfully exploit this information. Also, could we have done it with fewer symbols, maybe?

Trying out triangles

There is a handy data-parallel dataflow implementation that has been up for a while, at frankmcsherry/dataflow-join. We can use it with our livejournal and twitter_rv graph datasets to find directed triangles (or four-cliques, five-cliques, etc-cliques). It isn't written to do general motifs, but we will get there in the next section. The program takes a forward index (as a file) and a batch size, and introduces all nodes in batches of this size.

Recall from above that the single threaded version finds all directed triangles in livejournal in about 40 seconds. Let's just run the dataflow-join example program triangles on livejournal using 1,000 nodes at a time.

Echidnatron% cargo run --release --example triangles -- ~/Projects/Datasets/soc-LiveJournal1 inspect 1000
     Running `target/release/examples/triangles /Users/mcsherry/Projects/Datasets/soc-LiveJournal1 inspect 1000`
...
count: 946400853 @ Duration { secs: 66, nanos: 781511131 }
Echidnatron%

The number is correct! It printed a great deal of other things too, over 4,800 batches. Let's look at it again without all the printing (which also disables the counting, which is where some of the time is spent):

Echidnatron% time cargo run --release --example triangles -- ~/Projects/Datasets/soc-LiveJournal1 no 1000
     Running `target/release/examples/triangles /Users/mcsherry/Projects/Datasets/soc-LiveJournal1 no 1000`
cargo run --release --example triangles --  no 1000  54.42s user 2.61s system 100% cpu 57.024 total
Echidnatron%

That is 57 seconds; not quite 40 seconds yet. That makes some sense, because we are doing more things than just blasting through all triangles with a few compiled loops (we are buffering things up, moving control around between the operators, and other horrible things). I made some funny sounds just above about how life would automatically be better by writing your program this way, and apparently that wasn't exactly true as written.

Let's add a second worker, because we did parallelism for a reason.

Echidnatron% time cargo run --release --example triangles -- ~/Projects/Datasets/soc-LiveJournal1 no 1000 -w2
     Running `target/release/examples/triangles /Users/mcsherry/Projects/Datasets/soc-LiveJournal1 no 1000 -w2`
cargo run --release --example triangles --  no 1000 -w2  59.63s user 2.38s system 196% cpu 31.481 total
Echidnatron%

Ok great, 31.5 seconds. Nearly twice as fast. The COST here is just two threads, and by a pretty safe margin.

What about four workers? I only have two cores, but with the random accesses into the indices, hyper-threading could help.

Echidnatron% time cargo run --release --example triangles -- ~/Projects/Datasets/soc-LiveJournal1 no 1000 -w4
     Running `target/release/examples/triangles /Users/mcsherry/Projects/Datasets/soc-LiveJournal1 no 1000 -w4`
cargo run --release --example triangles --  no 1000 -w4  85.55s user 2.54s system 369% cpu 23.816 total
Echidnatron%

Down to 23.8 seconds. Not exactly linear scaling, but we didn't really add twice the resources either.


I should come clean and say that scaling in this sort of computation is a real beast of a problem. We are used to thinking about problems like pagerank and wordcount, where the amount of work for each worker is pretty easily understood (proportional to the number of edges / words each worker manages). Motif finding, even triangles, can have weirdly unfair skew: high degree nodes participate in many triangles, and in the case of our algorithm end up doing lots of intersect computations because they lose all the "candidate proposal" fights due to their long lengths. We do less work over all, but it may tend to occur where the highest degree vertices live.

Just because something is written as data-parallel dataflow doesn't mean it will have linear scaling. We may need to re-think the program to give it better scaling properties. We have done a bit of work on this, and I'll get to it later on, but be aware that actual performance may or may not be greatly improved as the number of workers increases.


We can also do the measurements for twitter_rv, which originally produced (after 3.6 hours):

triangles: 143011093363 in: Duration { secs: 12977, nanos: 592176810 }

Here are the reports on one, two, and four threads:

** TODO: DO THESE, EVEN THOUGH THEY TAKE A WHILE **

Streaming?

The batch size argument controls how many x0 values our program introduces before it sits back and waits. This in turn controls how long we wait before we see any output, and to some degree how long the whole computation takes (small batches introduce overhead, but large batches risk running us out of memory). Let's re-examine the output from up above, where I replace slightly less of the output with ...:

Echidnatron% cargo run --release --example triangles -- ~/Projects/Datasets/soc-LiveJournal1 inspect 1000
     Running `target/release/examples/triangles /Users/mcsherry/Projects/Datasets/soc-LiveJournal1 inspect 1000`
count: 385726 @ Duration { secs: 0, nanos: 288497368 }
count: 785827 @ Duration { secs: 0, nanos: 363069351 }
count: 1071377 @ Duration { secs: 0, nanos: 437563966 }
count: 1369000 @ Duration { secs: 0, nanos: 532887637 }
count: 1743370 @ Duration { secs: 0, nanos: 590076839 }
count: 2032510 @ Duration { secs: 0, nanos: 684989135 }
count: 2607015 @ Duration { secs: 0, nanos: 759143628 }
count: 2957975 @ Duration { secs: 0, nanos: 834344081 }
count: 3348587 @ Duration { secs: 0, nanos: 889717561 }
count: 3635621 @ Duration { secs: 1, nanos: 51602924 }
count: 4230639 @ Duration { secs: 1, nanos: 139536392 }
count: 4682313 @ Duration { secs: 1, nanos: 202587083 }
...
count: 946400853 @ Duration { secs: 66, nanos: 781511131 }
Echidnatron%

Our computation produces accumulated triangle counts for each batch of 1,000 vertices. It is doing this without waiting for all of the input, but is nonetheless clear about the partial information it is showing us: exact accumulated counts for each batch of 1,000 vertices. Although not complete we can do whatever we like with this partial information, for example get excited about how the numbers look good.

We get the sense from these numbers that we can plow through 1,000 vertices in a fraction of a second, on one just thread. There are about 68 million edges in livejournal, which means that over the course of the 66 seconds above we are grinding through about one million (x0, x1) pairs per second, again with one thread. With two threads, in half the time, we process more than two million edges.

These numbers depend entirely on the structure of the graph, which determines how many triangles there are in each part of the graph. For example, the twitter_rv graph has some 1.5 billion edges and our vanilla code took three and a half hours, which is a less impressive rate of about 115,589 edges per second. Here is the beginning of our computation on twitter_rv, using batch sizes of one vertex at a time (a much smaller batch size!):

Echidnatron% cargo run --release --example triangles -- ~/Projects/Datasets/twitter_rv inspect 1
     Running `target/release/examples/triangles /Users/mcsherry/Projects/Datasets/twitter_rv inspect 1`
count: 18570438 @ Duration { secs: 12, nanos: 461604308 }
count: 45653891 @ Duration { secs: 25, nanos: 969997334 }
count: 45923382 @ Duration { secs: 26, nanos: 108039502 }
count: 46603571 @ Duration { secs: 26, nanos: 394974822 }
count: 46608327 @ Duration { secs: 26, nanos: 397154766 }
count: 46843428 @ Duration { secs: 26, nanos: 503820835 }
count: 47036670 @ Duration { secs: 26, nanos: 599466048 }
count: 89762911 @ Duration { secs: 48, nanos: 158951288 }
count: 90126309 @ Duration { secs: 48, nanos: 331441480 }
...

This looks (and is) a lot less fluid, in part because we are introducing data in larger bursts. The degrees in the twitter_rv data can be much larger (up to three million), and a single vertex can result in a great number of new triangles. The very first node, for example, appears as x0 in 18,570,438 directed triangles, the second node in 27,083,453 directed triangles. The third node appears as x0 in only 269,491 directed triangles, the discovery of which takes proportionately less time.


Remember that comment about skew, and how things may not go faster just because we wrote them in a distributed framework? This is totally demonstrated if we add a second worker to the setting just above:

Echidnatron% cargo run --release --example triangles -- ~/Projects/Datasets/twitter_rv inspect 1 -w2
     Running `target/release/examples/triangles /Users/mcsherry/Projects/Datasets/twitter_rv inspect 1 -w2`
count: 1939869 @ Duration { secs: 10, nanos: 766975396 }
count: 16630569 @ Duration { secs: 10, nanos: 768873286 }
count: 19238588 @ Duration { secs: 23, nanos: 667188763 }
count: 26415303 @ Duration { secs: 23, nanos: 670505219 }
count: 26454233 @ Duration { secs: 23, nanos: 803005841 }
count: 19469149 @ Duration { secs: 23, nanos: 803266431 }
count: 19570120 @ Duration { secs: 24, nanos: 54505172 }
count: 27033451 @ Duration { secs: 24, nanos: 54924556 }
...

There are two lines for each round (two workers) but the point is that things aren't going much faster. The first round took 12 seconds with one worker thread, and now takes 10 seconds. Well done.

If we look at the first two lines, the counts for the triangles of that first vertex produced by the two workers, we see a huge difference between the number of triangles produced at each worker:

count: 1939869 @ Duration { secs: 10, nanos: 766975396 }
count: 16630569 @ Duration { secs: 10, nanos: 768873286 }

This makes some sense. With a batch size of one, in each round there is only one value of x0. This means that roughly half of the work to do (related to x0 proposals) is pegged to one worker. If we think of x1 as evenly distributed across workers, we have one worker doing 75% of the work and the other worker doing 25%. More generally one worker out of n would do a 1/2 + 1/2n fraction of the work, and all other workers would do a 1/2n fraction of the work. That sucks!

And, in this case x0 had a stupidly high degree (it must be at least sqrt(18570438), right?) so that worker ends up doing most of the intersection work. The other worker will only do intersection work for neighbors with higher degrees (and only those it hosts). As it turns out, intersection is much more expensive than proposing (which ends up being a memcpy in this implementation). The only positive is that because "streaming" the system can overlap one worker proposing with the other worker intersecting, an instance of a third form of parallelism called "pipeline parallelism".

The numbers get a bit better if we increase the batch size:

Echidnatron% cargo run --release --example triangles -- ~/Projects/Datasets/twitter_rv inspect 2 -w2
     Running `target/release/examples/triangles /Users/mcsherry/Projects/Datasets/twitter_rv inspect 2 -w2`
count: 19238588 @ Duration { secs: 18, nanos: 421999694 }
count: 26415303 @ Duration { secs: 18, nanos: 424508728 }
count: 19570120 @ Duration { secs: 18, nanos: 691085756 }
count: 27033451 @ Duration { secs: 18, nanos: 691177402 }
...

The 26 seconds for one worker to finish the first two rounds, which dropped to only 23 seconds with batch size one, now drops to 18 seconds. It's still not close to perfect, with smaller skew between reported triangle counts still clearly present. We will probably have to change the algorithm if we really want to get decent scaling properties.

Skew is a much different beast in streaming computations than in batch computations, because you have much shorter timescales over which to hide any imbalances.


Implementation details

I have talked through how the code we just evaluated is implemented before, and it has not changed much since then. Nonetheless, it is worth reviewing, because we are going to be tweaking it to get our fully dynamic motif tracking off the ground.

The rough structure of the code follows the lines of our second "more-general" single-threaded approach, applied to the dataflow pattern of "count first, then propose, then intersect" we used for triangles counting. For each attribute, starting from a stream of bindings to prior attributes, we build the dataflow graph that elicits counts of candidate set sizes, partitions the bindings by the index that should propose the candidates, and the intersects each using the other indices.

Here is the actual code used to extend a binding with an additional attribute. This method adds takes a stream of "prefixes" of type P and turns each into a pair (P, Vec<E>) of the prefix and its "extensions": valid settings of the next attribute. It uses a trait StreamPrefixExtender that wraps some methods (count, propose, and intersect) lifted to streams of bindings, which is how we wrap up the constraints on the extensions in terms of the data in each prefix.

// A layer of GenericJoin, in which a collection of prefixes are extended by one attribute
impl<G: Scope, P: Data> GenericJoinExt<G, P> for Stream<G, P> {
    fn extend<E: Data>(self, extenders: Vec<&StreamPrefixExtender<G, Prefix=P, Extension=E>>)
        -> Stream<G, (P, Vec<E>)> {

        // for each prefix, determine extender with fewest extensions.
        let mut counts = self.map(|p| (p, 1 << 31, 0));
        for (index, extender) in extenders.iter().enumerate() {
            counts = extender.count(counts, index as u64);
        }

        // partition prefixes by the index of smallest extender.
        let parts = counts.partition(extenders.len() as u64, |(p, _, i)| (i, p));

        let mut results = Vec::new();
        for (index, nominations) in parts.into_iter().enumerate() {

        	// propose extensions, and then validate against other extenders.
            let mut extensions = extenders[index].propose(nominations);
            for other in (0..extenders.len()).filter(|&x| x != index) {
                extensions = extenders[other].intersect(extensions);
            }

            results.push(extensions);    // save extensions
        }

        self.scope().concatenate(results)
    }
}

More concretely, the code first tracks for each prefix a triple (prefix, count, index) of the smallest count for each prefix and the index of the corresponding extender. These values are initially junk, with a badly chosen large count (shush!), but by passing through each extender's count method they are improved to track the smallest values. The result is the stream of (prefix, count, index) triples containing the smallest counts and corresponding indices.

As in our triangles example, this stream is partitioned by the index identifying the constraint with the smallest candidate set size. For each of these parts, the extender proposes candidates in propose, and we pass the lists of candidates through the other relations in intersect. We then concatenate everything back together and return the result.

Each application of this method, extend, produces a stream of elements of type (P, Vec<E>): pairs of prefixes and each of the extensions they yielded. If we flatten these down into say pairs (P, E) we can repeat the process and produce more complicated motifs.


Here is the first half of triangles.rs where we build up a dataflow graph that extends initial x0 settings twice: first to pairs (x0, x1) and then to triples (x0, x1, x2). It uses as extenders pairs of the forward index graph and functions to indicate which field of the prefix they want to use as a key. I've trimmed out some argument parsing and output printing and stuff like that.

fn main () {
    timely::execute_from_args(std::env::args(), |root| {
        let graph = Rc::new(GraphMMap::<u32>::new(&filename));
        let (mut input, probe) = root.scoped(|builder| {

        	// exogenous input of `x0` variables.
            let (input, nodes) = builder.new_input::<u32>();

            // extend to pairs, then to triangles, perhaps beyond ...
            let pairs = nodes.extend(vec![&graph.extend_using(|&a| a as u64)]);
            let trigs = pairs.flat_map(|(p, es)| es.into_iter().map(move |e| (p, e)))
                             .extend(vec![&graph.extend_using(|&(a,_)| a as u64),
                                          &graph.extend_using(|&(_,b)| b as u64)]);

            // // four cliques?
            // let quads = trigs.flat_map(|(p,es)| es.into_iter().map(move |e| (p, e)))
            //                  .extend(vec![&graph.extend_using(|&((a,_),_)| a as u64),
            //                               &graph.extend_using(|&((_,b),_)| b as u64),
            //                               &graph.extend_using(|&((_,_),c)| c as u64)]);

            (input, trigs.probe().0)
        });

This starts with some standard (though non-obvious) timely dataflow boilerplate, where the important bits from our point of view are: We create a new input nodes that supplies a stream of x0 values. We then use our extend functionality to turn nodes into pairs, and pairs into trigs, and optionally trigs into quads. We return the other end of the nodes input as well as a probe at the end of trigs which will tell us whether there is still data in flight. We don't actually do anything with the data, but we do produce it (we count it in most examples, but I snipped that code out as unhelpful).

One sneaky thing occurs when we use the extend_using method. This is what takes an index, here the forward edge index, and a function to extract a field from whatever tuples are flying by. For example, when we have pairs (x0, x1) the code

    .extend(vec![&graph.extend_using(|&(a,_)| a as u64),
                 &graph.extend_using(|&(_,b)| b as u64)]);

puts together two extenders, both using the graph index, but varying in which field each uses in order to produce candidates (respectively x0 and x1). For some reason they need to be u64s. Anyhow, this is handy in that it lets us define the index once and re-use it throughout the computation, rather than creating a new copy of the index each time we need a new instance of it (three times in this example). This is going to be a big deal when we get to the fully dynamic case, where things get a bit more complicated.

Having defined the computation, we run it by repeatedly introducing step_size many nodes identifiers and then running the computation until it quiets down:

        for node in 0 .. (graph.nodes() - 1) {
        	if root.index() == 0 {
	        	input.send(candidate as u32);
	        }
        	if (node % step_size) == step_size - 1 {
	            input.advance_to(round as u64 + 1);
	            root.step_while(|| probe.lt(&input.time()));
	        }
	    }
    }).unwrap();
}

This has worker 0 introduce all the identifiers (we only want one instance of each node id, not one from each worker). Every step_size steps we advance the input epoch and wait for records to drain. Timely also automatically does this for us when it shuts down, so we don't need a second copy of that code outside the loop.

That's pretty much it. It's not wildly complicated, but will certainly get a bit more so when we start trying to change the contents of forward, which we will do in the very next post!

Differences between frameworks

I promised I would say something about different data-parallel frameworks, and while I'm generally opposed to making limiting statements about other folks (they might have improved while I was napping!), there are some qualitative differences between approaches that make each more or less suitable than the others.

Batch processors

Systems like Hadoop and Spark operate in batch. You point them at a large pile of input data, describe what you'd like done, and wait for them to finish up. This isn't so bad if the computation either (i) reduces the input data or (ii) does work linear in the input data. When you have a computation like motif finding, where the output can be quadratic or beyond in the size of the input, these systems start to fall over.

These systems generally assume that it is safe to materialize intermediate state, because .. I'm not sure. Because that is how word count works, I guess. As we saw way back when, triangles on twitter_rv considers some 754,462,139,483 candidate triples. That could be as large as

  754,462,139,483 x 12 bytes
= 9,053,545,673,796 bytes

Nine terabytes just to write down the candidates to visit.

And yet, this ran just fine on my laptop, using at most a few GB of memory.

There is a serious difference between batch processors and stream processors on computations that can produce and retire work without synchronizing. It is much, much, much faster to produce and consume data without it leaving your processor core or cache than to spill it all to main memory or disk. Streaming data across the network (and consuming it) is much more likely to happen successfull than accumulating all of the data in local memory first, especially in the multi-terabyte range. The flexibility of stream processing can make your computations much faster, and much less likely to fall over due to resource exhaustion.

This is why you should be considering stream processors rather than batch processors.

Stream processors

The good news is that there are stream processors. Apache Flink is a good example. Spark Streaming is a less-good example. Timely dataflow is a !@#$ing amazing example. Each of these is slightly different from the other. Now isn't the right time to get in to the differences between these systems, because (i) I'm not really an expert on most of them (not having infinity machines to run the JVM-based approaches), and (ii) they each have their own thing they try to excel at. I will say that something like motif finding, with its non-trivial dataflow and where the sizes of intermediate data vary amazingly, is probably a good test to separate the stream processors from the mini-batch processor.

4. Streaming, data-parallel on dynamic data.

And now, on to the main event: tracking motifs in a changing graph.

Specifically, we are going to tackle the question:

Given an initial graph and a sequence of sets of edge changes (additions, deletions), what is the corresponding sequence of sets of changes (additions, deletions) in the instances of a given motif in the graph?

Rather than wax philosophical about this, let's just look at the answer.

Delta-queries

In the relational database world, folks update relational joins in response to changes in their inputs by forming the "delta query": the relational join describing changes in the output relation in terms of a query over the input relations, and the relation containing the changes.

Recall the triangles query, which we write

triangles(x0, x1, x2) := edges(x0, x1), edges(x0, x2), edges(x1, x2)

Imagine for the moment that we've rewritten this using three different relation names, each of which will eventually be instantiated as edges:

triangles(x0, x1, x2) := A(x0, x1), B(x0, x2), C(x1, x2)

Why do this? We can now examine the change in the output relation triangles as a function of each of the relations A, B, and C independently. For example, if relation A changes, perhaps by the tuples in a relation dA, the change to triangles equals

dTdA(x0, x1, x2) := dA(x0, x1), B(x0, x2), C(x1, x2)

How did we determine this? Just by using the distributive property of the relational algebra.

  (A + dA) x B x C
= (A x B x C) + (dA x B x C)

Amazing. We can do the same thing with B and C, forming delta queries

dTdB(x0, x1, x2) := A(x0, x1), dB(x0, x2), C(x1, x2)
dTdC(x0, x1, x2) := A(x0, x1), B(x0, x2), dC(x1, x2)

Now any changes to the input relations can be resolved by computing first dTdA then dTdB then dTdC.

Important: In the case of graph motifs A, B, and C are all instantiated by the same relation, edges, and so they always receive updates at the same time. For the delta queries to be correct, we must imagine the relations A, B, C to be updated in sequence, and use the old value of B in dTdA and the new value of B in dTdC.

Alternately, we could perform simultaneous updates and correct with the higher-order terms dA x dB, dA x dC, dB x dC, dA x dB x dC, but in more general motifs this ends up with an exponential increase in the number of terms. We will stick with the multi-versioned approach here.

Are we done? Streaming redux.

We now have queries describing the quantities we require (changes in triangles as a function of changes in the inputs). We can generalize these delta-queries to arbitrary motifs. We also have all this great pre-existing "worst-case optimal" join infrastructure. Can we just apply this infrastructure to our queries and announce success? Unfortunately, no.

The existing worst-case optimal join techniques permit pre-processing time linear in the size of the input relations. This makes sense if you are doing just one join query, because you might expect to need to read each tuple at least once. In our case the inputs to the delta query are a mix of small and large: dA is small but B and C may be large. We do not want to pay |A| + |B| + |C| for each update to the relations. We would much rather have a bound that looks like |dA| + |dB| + |dC|, plus the worst-case optimal "largest number of output tuples that could possibly be produced" term.

So we need to try a little harder.

The good news is that, based on the framework we've written so far, we just need to look a little harder. It mostly does the right things already. Other implementations of worst-case join computation may also mostly do the right things already, so the point is more that we need to pay attention to the details, not so much that we must actively reject other approaches (ideally they will tell us if their infrastructure accommodates this approach).

Looking harder

Recall how we perform triangle computation (and motifs generally) in dataflow: we start from a stream of initial (x0, x1) bindings, and for each binding consider how to extend each of the attributes x2, x3, .. in turn.

Each (x0, x1) pair we introduce produces all output bindings that derive from the (x0, x1) pair. It is, in essence, computing the delta query for the edges(x0, x1) relation. However, it is computing this delta query only for this first relation, and with all other relations (e.g. edges(x0, x2) and edges(x1, x2)) fully loaded. If for each batch dA_t of (x0, x1) pairs the dataflow used intermediate relations forward_t and reverse_t, rather than static indices forward and reverse, it would compute

dTdA_t(x0, x1, x2) := dA_t(x0, x1), B_t(x0, x2), C_t(x1, x2)

Our implementation is already very nearly the delta-query processor that performs work linear in the input differences and worst-case output differences. We just need to generalize it to work against dynamic relations, to reflect the fact that we do not have the final relations for B and C (nor A) at hand, and make sure that it works for each of the delta queries.

Fixing things up

The main change we must perform is to support our dataflow operations count, propose, and intersect against a dynamically changing index. Moreover, to get correct answers out we will want the index to go through well defined versions that we can query against, rather than some asynchronously updated mess.

Fortunately, timely dataflow is awesome at maintaining an explicit understanding of time in a dataflow graph. Each circulated tuple has a logical timestamp, and each of our count, propose, and intersect operators can use this timestamp to look at the "right version" of the forward and reverse indices. We must still say a bit about how we write a multi-version graph index, but it isn't fundamentally complicated (we only need to do a few things well). The most important features are that timely dataflow is explicit about (i) which version of the index a record requires, and (ii) when no further records will require some version (allowing index compaction).

From the top, with more detail

To sketch our plan in a bit more detail, for a query

Q(x0, x1, ..) := A(x0, x1), .. R(xi, xj), ..
  1. Impose an arbitrary ordering on the input relations. Let's just take them in the order they are presented. Our computation will produce deltas as if the relations were updated in this order. The relations may name the same underlying source of data, e.g. edges, and we will need to make sure the updates happen in a sequence, rather than all at once.
  2. For each relation R, determine dataflow graph for dQdR starting from pairs (xi, xj) in dR. This dataflow graph must use the "new" version of indices for relations A preceding R, and the "old" version of indices for relations following R. This gives us the result we would get from updating the relations in sequence, even though some of the indices may be shared and so not be in an "old" or "new" state.
  3. Update our implementations of count, propose, and intersect to be mindful of times, both waiting until an index contains all updates for a time, and then interrogating the index with that time as an explicit argument.

Here is the update code for the count method, which takes mainly a stream of tuples containing the triple (prefix, count, index) as before, with a fourth i32 field indicating change (e.g. +2 or -1 in addition to the implicit +1 from before), and updates each tuple if the index it uses would propose fewer extensions for the prefix. The binary operator has a second input, which comes as an output from the index it uses; the operator uses the "progress" on this input to know whether the index has processed all of its updates for a given time and is ready to report correct counts for that time.

fn count(&self, prefixes: Stream<G, (Self::Prefix, u64, u64, i32)>, ident: u64)
-> Stream<G, (Self::Prefix, u64, u64, i32)> {

    let clone = self.helper.clone();
    let logic = self.helper.logic();
    let exch = Exchange::new(move |&(ref x,_,_,_)| (*logic)(x));

    let mut blocked = vec![];

    prefixes.binary_notify(&self.stream, exch, Pipeline, "Count", vec![],
        move |input1, input2, output, notificator| {

        // The logic in this operator should only be applied to data inputs at `time` once we are
        // certain that the second input has also advanced to `time`. The shared index `clone` is
        // only guaranteed to be up to date once that has happened. So, if we receive data inputs
        // for a time that has not also been achieved in the other input, we must delay the input.
        //
        // The same structure applies to `propose` and `intersect`, so these comments apply there too.

        // stash all (time, data) pairs into a temporary list, ignore data in the second input.
        input1.for_each(|time, data| blocked.push((time, data.take())));
        input2.for_each(|_,_| {});

        // scan each stashed element and see if it is time to process it.
		for &mut (ref time, ref mut data) in &mut blocked {

            // ok to process if no further updates less or equal to `time` on the second input.
			if !notificator.frontier(1).iter().any(|t| t.le(&time.time())) {

                // pop the data out of the list; we'll clean up the entry later.
                let mut data = data.take();
                (*clone).count(&mut data, &time.time(), ident);
                data.retain(|x| x.1 > 0);

                output.session(time).give_content(&mut data);
			}
		}

        // discard any data we processed up above.
        blocked.retain(|&(_, ref data)| data.len() > 0);
	})
}

This code block mostly does what it says, holding back blocks of data until their indicated time is ready for the index. Not shown is the code that maintains an index, which both pretends that it might send data for any incomplete time (as a warning to its consumers) and which awaits information about who still needs access to different versions so that it can compact its representation as appropriate.

The other operators are similar in spirit, but each do a few clever things. We will get back to them and their cleverness once we've explained the higher level points.

Multiversion graph indices

You could make this arbitrarily complicated, but we didn't. It seems to work fine.

pub struct Index<T> {
    edges: Vec<EdgeList>,
    diffs: Vec<(u32, u32, T, i32)>,
    peers: usize,
}

Our multi-version index keeps in edges for each node a list of "committed" updates that everyone should use (those with timestamp less than those of all in-flight tuples), and a list diffs of outstanding diffs that may be interesting depending on what timestamp you are interested in. It also tracks the number peers of other workers so that it knows what it should div indices by to densely index into edges. We could just have a HashMap instead, but we don't.

The entries of diffs are just (src, dst, time, delta), and are maintained sorted in that order. If you want to know something about src, we look up self.edges[src] and do a binary search into self.diffs.

The EdgeList struct contains the committed per-node state (neighbors) and looks like

struct EdgeList {
    pub edge_list: Vec<(u32, i32)>,
    pub elsewhere: u32,
    pub effort: u32,
}

The edge_list field keeps (dst, delta) updates in an at-most-logarithmic number of sorted ranges, concatenated, so that we can both quickly (i) do things like intersection testing (at most a logarithmic number of look-ups), and (ii) add few elements with low amortized overhead. The elsewhere field just tracks the number of diffs in diffs so that we can skip the search for something like count, which we do a lot. The effort field tracks the amount of work we have done with the entry, and once this gets large we may start merging the edge_list sorted ranges to simplify their representation.

New updates land in the diffs field of the index, and as times are determined to be complete get migrated to the end of the corresponding edges[src].edge_list sorted ranges. If there are enough updates we may merge some of the sorted ranges.

Generally speaking, all of the data for a node is found in at most two places: the corresponding edge_list array or in a contiguous section of the sorted diffs field of the index. All of the data for a (src, dst) pair is in at most a number of places logarithmic in the degree of src, which allows for relatively brisk intersection testing, especially against batches of candidates.

Overhead

The memory overhead for this representation initially worried me, but it seems not horrible. If we load up the graph in batches of one thousand nodes at a time, the livejournal graph ends up taking around 2.19GB (the process ends up at this in stable state). The graph has about 69 million edges, which means just about 32 bytes for each edge. This can be accounted as

  1. We use 4 bytes to represent the destination.
  2. We use an additional 4 bytes to represent the change in count.
  3. We index the graph twice (once in each direction).

This would give us 16 bytes per edge, and so we have about 2x lost due to slop: things like 24 bytes for each Vec not counting the data, allocations being bigger than we needed, fields used to describe the boundaries of the sorted ranges. We could certainly compact this, for example by observing that the largest oldest sorted range for each edge_list doesn't require counts (each entry should be a positive accumulation of weights). We could do even more, and you are totally welcome to try out other strategies and send some pull requests once the code is out.

Trying it out

I've taken the livejournal graph and permuted its edges randomly. We will load up some fraction (most) of the edges, and then add the remaining edges in batches of various sizes. The twitter_rv graph would be fun to play with, but with our overhead it clocks in at about 2x my laptop's memory. We will get you numbers using a larger machine.

There are 68,993,773 edges in the directed livejournal graph that I have. We will first load up the first 68,000,000 of these, in batches of 1,000,000 edges. We will then walk through the remaining edges as updates, in batches of 1,000. We should get about 994 lines of output indicating the numbers of new instances of the motifs.

Directed triangles

Let's start with the directed triangles query we have used so far.

Echidnatron% cargo run --release --example motif -- 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000000 1000 inspect
     Running `target/release/examples/motif 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000000 1000 inspect`
motif:	[(0, 1), (0, 2), (1, 2)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 63, nanos: 399875208 }	[worker 0]	data loaded
Duration { secs: 63, nanos: 399912670 }	[worker 0]	indices merged
(Root, 70): [37695]
(Root, 71): [36745]
(Root, 72): [42000]
(Root, 73): [37232]
(Root, 74): [43110]
(Root, 75): [37599]
...
(Root, 1061): [37462]
(Root, 1062): [44505]
(Root, 1063): [30189]
elapsed: 86.67318760592025	total motifs at this process: 39772836
Echidnatron%

So, it takes 63 seconds to load up the first 68,000,000 edges, presented in random order. This includes parsing stuff from text, sorting using Rust's sort, and likely several other inefficiencies. It is a little over one million edges per second, which isn't great but works for now.

Once we get started, it takes 23 seconds to process the remaining nearly one million edges, in batches of one thousand edges at a time. This means about 23 milliseconds per batch, and about 1.7 million observed directed triangles per second.

Of course, this is just one thread. Let's try two.

Echidnatron% cargo run --release --example motif -- 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000000 1000 inspect -w2
     Running `target/release/examples/motif 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000000 1000 inspect -w2`
motif:	[(0, 1), (0, 2), (1, 2)]
filename:	"./soc-LiveJournal1.random.txt"
motif:	[(0, 1), (0, 2), (1, 2)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 35, nanos: 540201480 }	[worker 0]	data loaded
Duration { secs: 35, nanos: 540201433 }	[worker 1]	data loaded
Duration { secs: 35, nanos: 540309928 }	[worker 1]	indices merged
Duration { secs: 35, nanos: 540321775 }	[worker 0]	indices merged
(Root, 70): [16492]
(Root, 70): [21203]
(Root, 71): [16693]
(Root, 71): [20052]
(Root, 72): [19135]
(Root, 72): [22865]
(Root, 73): [17004]
(Root, 73): [20228]
...
(Root, 1061): [17022]
(Root, 1061): [20440]
(Root, 1062): [22421]
(Root, 1062): [22084]
(Root, 1063): [13953]
(Root, 1063): [16236]
elapsed: 51.03766422404442	total motifs at this process: 39772836
Echidnatron%

The loading time has dropped from 63 seconds to 35 seconds, and the update processing time has dropped from 23 seconds to 16 seconds.

It turns out that a fair hunk of this time is in the counting up of motifs. If we ditch the inspect argument, which detatches the code that counts all the updates, we get improvements like

Echidnatron% cargo run --release --example motif -- 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000000 1000
     Running `target/release/examples/motif 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000000 1000`
...
elapsed: 80.91951578599401	total motifs at this process: 0
Echidnatron%
Echidnatron% cargo run --release --example motif -- 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000000 1000 -w2
     Running `target/release/examples/motif 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000000 1000 -w2`
...
elapsed: 45.1404357770225	total motifs at this process: 0
Echidnatron%

The loading times have stayed the same, but the update processing has dropped from 23 seconds to 17 seconds for one thread, and from 16 seconds to 10 seconds for two threads. That brings us to about 100,000 edge updates per second, and four million observed motifs per second.

Directed four-cliques

Let's move on to a slightly more interesting motif, a directed four clique. Recall that we write this motif like so:

quads(x0, x1, x2) := edges(x0, x1), edges(x0, x2), edges(x0, x3),
                     edges(x1, x2), edges(x1, x3), edges(x2, x3)

This is likely going to produce more results than triangles, but let's just run it and see

Echidnatron% cargo run --release --example motif -- 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68000000 1000000 1000 inspect
     Running `target/release/examples/motif 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68000000 1000000 1000 inspect`
motif:	[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 66, nanos: 146885879 }	[worker 0]	data loaded
Duration { secs: 66, nanos: 146974141 }	[worker 0]	indices merged
(Root, 70): [5467256]
...
(Root, 1063): [3938957]
elapsed: 1899.4709296720102	total motifs at this process: 5676359086
Echidnatron%

Gaaah... that took forever. Like, half an hour. We can conclude that the rate at which we can process edge updates depends on the complexity of the query; we managed about 525 edges per second, compared with 100x that for triangles.

On the other hand, we did produce 5,676,359,086 motif instances, at a rate of about 3 million per second. That is an improvement on the number for the corresponding triangle experiment (about 1.7 million per second).

In the interest of showing you a variety of numbers, let's change the experiment so that we load 68,900,000 edges and then process the remaining 90,000-ish as updates, a 10x reduction in the number of updates.

Echidnatron% cargo run --release --example motif -- 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 inspect
     Running `target/release/examples/motif 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 inspect`
motif:	[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 63, nanos: 652378537 }	[worker 0]	data loaded
Duration { secs: 64, nanos: 82679176 }	[worker 0]	indices merged
(Root, 70): [6239159]
...
(Root, 163): [3938957]
elapsed: 247.77742787706666	total motifs at this process: 550759817
Echidnatron%

Ok, this was much more manageable. We produced again about 3 million motifs per second, once the graph was loaded.

With two workers we get:

Echidnatron% cargo run --release --example motif -- 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 inspect -w2
     Running `target/release/examples/motif 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 inspect -w2`
motif:	[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
motif:	[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 36, nanos: 6450156 }	[worker 1]	data loaded
Duration { secs: 36, nanos: 6456809 }	[worker 0]	data loaded
Duration { secs: 36, nanos: 241080660 }	[worker 1]	indices merged
Duration { secs: 36, nanos: 241102454 }	[worker 0]	indices merged
(Root, 70): [2912750]
(Root, 70): [3326409]
...
(Root, 163): [2405342]
(Root, 163): [1533615]
elapsed: 139.3537110920297	total motifs at this process: 550759817

which is the usual reduction in graph loading, and then a reduction from 183 seconds of update processing to 103 seconds. If we scotch the inspect counting and printing we see for one thread

Echidnatron% cargo run --release --example motif -- 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000
     Running `target/release/examples/motif 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000`
motif:	[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 63, nanos: 975649581 }	[worker 0]	data loaded
Duration { secs: 64, nanos: 410750017 }	[worker 0]	indices merged
elapsed: 197.72248245810624	total motifs at this process: 0
Echidnatron%

and for two threads

Echidnatron% cargo run --release --example motif -- 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 -w2
     Running `target/release/examples/motif 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 -w2`
motif:	[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
motif:	[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 36, nanos: 71786237 }	[worker 1]	data loaded
Duration { secs: 36, nanos: 71718054 }	[worker 0]	data loaded
Duration { secs: 36, nanos: 287560833 }	[worker 1]	indices merged
Duration { secs: 36, nanos: 287496086 }	[worker 0]	indices merged
elapsed: 117.44219726102892	total motifs at this process: 0
Echidnatron%

We do see a fairly decent reduction from not counting the motifs, due to not slogging around so much data. For two threads, we end up processing just over 1,000 updates per second, and enumerating roughly 6.7 million motifs per second.

Twitter doo-dads

There is a recent twitter paper where they are interested in recommending content based on the graph structure of ... engagement? likes? Something. They have a graph that evolves and a query that look like

rec(x0, x1, x2, x3) := edge(x0, x1), edge(x0, x2), edge(x1, x3), edge(x2, x3)

roughly this is a diamond, from x0 to both x1 and x2, and from each of them to x3. This is a good indicator that we should tell x0 about x3.

We can totally write down this motif, but notice that every directed four-clique is an instance of this motif, so we are going to produce at least as many outputs. We might produce a lot more.

Echidnatron% cargo run --release --example motif -- 4 0 1 0 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 inspect
     Running `target/release/examples/motif 4 0 1 0 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 inspect`
motif:	[(0, 1), (0, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 63, nanos: 831294181 }	[worker 0]	data loaded
Duration { secs: 64, nanos: 258243081 }	[worker 0]	indices merged
(Root, 70): [7430177]
...
(Root, 163): [4758367]
elapsed: 342.3828599349363	total motifs at this process: 652802206
Echidnatron%

Hrm. Not that many more (the total number of four-cliques for the same parameters was 550,759,817). However, the running time did go up by a bit (from 247.7 seconds). The computations are pretty similar, except at each extension we have fewer restrictions we get to apply, likely just resulting in larger candidate sets. We could go and measure the details to try and find out, which might be a smart thing to do in the future.

Let's grab some numbers for multiple threads, and without the counting / printing overhead. Here is one thread without counting, printing:

Echidnatron% cargo run --release --example motif -- 4 0 1 0 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000
     Running `target/release/examples/motif 4 0 1 0 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000`
motif:	[(0, 1), (0, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 64, nanos: 146720869 }	[worker 0]	data loaded
Duration { secs: 64, nanos: 573365193 }	[worker 0]	indices merged
elapsed: 284.03139675501734	total motifs at this process: 0
Echidnatron%

And two threads without counting, printing:

Echidnatron% cargo run --release --example motif -- 4 0 1 0 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 -w2
     Running `target/release/examples/motif 4 0 1 0 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000000 1000 -w2`
motif:	[(0, 1), (0, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
motif:	[(0, 1), (0, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
Duration { secs: 36, nanos: 135581789 }	[worker 1]	data loaded
Duration { secs: 36, nanos: 135566269 }	[worker 0]	data loaded
Duration { secs: 36, nanos: 377814133 }	[worker 0]	indices merged
Duration { secs: 36, nanos: 377841251 }	[worker 1]	indices merged
elapsed: 158.41133950592484	total motifs at this process: 0
Echidnatron%

Well, those times sure are exciting. Of course, we've computed totally the wrong thing, I think.

Oh geez. Now what?

The Twitter computation has a few other constraints that we ignored when we wrote down the motif. For example, we probably only want tuples where the elements are all distinct; having x0 == x3 would be cheating. Having x1 == x2 is even more obviously broken. We probably also want x0 and x3 that are not already connected, right?

The good news is that these sorts of constraints can be easily integrated into our dataflow.

  1. The "distinctness" constraint is a per-tuple filter, and we can just apply this filter after each extension.

  2. The "non-presence" requirement is nearly identical to the presence requirement: we go and look things up in an adjacency list and make a decision based on what we find. In fact, we could present it as an intersect implementation if we wanted (though we cannot use this constraint to count or propose extensions).

These are relatively easy conceptual changes, though they require a bit of tap-dancing with the code. They are important to support, because they can make the computations much more efficient. For example, we could add the constraint x1 < x2 to the Twitter query, and remove duplicates of the same motif (each would otherwise be present twice, with x1 and x2 swapped). Similar tricks show up when looking for motifs in undirected graphs, where

5. Discussion, cool things!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK