# Welcome!

I help people and companies solve difficult problems using Machine Learning and Optimization. Read about my services here.

I write about themes in ML:

- Iterative algorithms are often given in papers in a form that is easy to understand and copy & paste into an implementation, but hard to maintain and reuse. I wrote some posts about implementing them in Rust via a streaming abstraction.
- Sensitivity of algorithms to even a few outliers is a common ML/statistics foot-gun. It hurts generalization, makes in-sample performance unstable (and therefore harder to debug), and creates data-quality work. The culprit is often the ubiquitous squared error loss, which is hard to give up because it leads to such nice algorithms! We proposed an approach to reweighting data (compatible with many algorithms) that enjoys provably good statistical and algorithmic properties.

# Make the most of Data and AI in your business

Whether your company is initiating a new AI project, taking an existing one to the next level or preparing to scale up, we can help you take ideas from the whiteboard to production systems. Our first priority is the intended business result, whether it is improving addressable markets, sales, time to market or unit economics.

# Services

- **Roadmap**: we conduct a joint deep dive into your data, environment, and goals. Deliverable: a brief written report describing one or two priority projects, the top risks to mitigate, and promising methods. This aims to be concrete enough to start a project.
- **Methods review**: a one-day study of the algorithmic approaches currently in practice. Deliverable: a report outlining easy improvements, directions for future improvement, and risks/blind spots.
- **Ongoing expert support**: for a fixed cost per month, we guide key research processes: review experimental results, suggest next directions, and answer questions.
- **Bespoke contracting**: implement advanced AI methods, dive into data, or coach your staff.

# About me

My approach is based on years of experience. I have:

- Helped startups begin their ML journeys, including one later acquired by Microsoft.
- Helped an executive team quantify the financial impact of their service, leveraging data to drive adoption.
- Improved prediction and planning in automated trading systems.
- Mentored individuals, founded teams, and created courseware to improve organizational performance.
- Invented, implemented and analyzed algorithms in the areas of robust ML, stochastic optimization, sparse representations, and published this work in top tier ML venues.

See my LinkedIn profile for more information about my professional experience, or my Publications for the public research.

# Testimonials

"What concerns were top of mind when we contacted Daniel for help applying ML at Quest Mindshare? We were looking for someone with a lot of experience who could also teach, so that we could start to onboard the knowledge, rather than someone who would build it for us and bail, leaving us not knowing how it works or, more importantly, why it works.

After Daniel and I did a methods review session, where we looked at our efforts so far, I had a better idea of the types of questions that I need to ask, and had learned about a whole set of unknown unknowns. This helped me direct my own learning going forward.

Would I recommend Daniel's services to others in a similar position? Yes, definitely. Especially for companies like ours that don't have the need for a full-time ML expert, this is the perfect solution. Get someone in to help you understand what questions to ask about the data etc and get you going on the path, with an option for hands-on follow up sessions as needed." Jayd Lawrence, Staff Software Engineer at Quest Global Research Corp

“Having an expert there to help guide us to understand if we were thinking about something the right way was hugely valuable”: Jason Maier, Head of Research at Uneven Labs, about a project I partnered with Erik and Tribe.ai on.

"I happened to know Daniel from the Technion where he was part of the Excellence program. It took some time to convince him to join the team, but it was worth it. He is one of the most technically brilliant people I have worked with, fast-learning, meticulous, responsible, and also good at working remotely. Daniel is a send-and-forget type of person, understanding the task on the fly and coming back with results quickly." Michael Bronstein, DeepMind Professor of AI, Oxford / Serial startupper

# Go ahead and get in touch!

Email me, schedule an initial meeting, or leave me a note below and sign up to hear more.

# Design of iterative methods

This series introduces iterative methods (an important class of algorithms that progressively improve solutions) and discusses reusable components for working with them in Rust.

# Iterative methods in Rust: Conjugate Gradient

## Introduction: the Beauty and the Trap

Iterative methods (IM) power many magical-seeming uses of computers, from deep learning to PageRank. An IM repeatedly applies a simple recipe which improves an approximate solution to a problem. As a block of wood in the right hands gradually transforms into a figurine, the right method will produce a sequence of solutions approaching perfection from an arbitrarily bad initial value. IM are quite unlike most CS algorithms, and take some getting used to, but for many problems they are the only game in town. Unfortunately, implementations of IM are often hard to maintain (read, test, benchmark) because they mix the recipe with other concerns.

We start with a simple concrete example. Implementing an iterative method often starts as a `for` loop around the recipe (to try it out, press the play button):

```
#![allow(unused)]
fn main() {
    // Problem: minimize the convex parabola f(x) = x^2 + x
    // An iterative solution by gradient descent
    let mut x = 2.0;
    for i in 0..10 {
        // "2.0*x + 1.0" is the derivative of f(x),
        // so moving a little bit in the opposite direction
        x -= 0.2 * (2.0 * x + 1.0);
        // should reduce the value of f(x)!
        println!("x_{} = {:.2}; f(x_{}) = {:.4}", i, x, i, x * x + x);
    }

    // An alternative that does not scale well to harder problems is to
    // use the fact `f` does not decrease/increase at the minimum to
    // find its location directly:
    //
    // `f` at a minimum zeros its derivative: 2*x + 1 = 0.
    // Rearranging terms gives the solution x = -1/2.
}
```

... but that loop body already mixes multiple concerns. If it didn't `println` progress, we wouldn't even see it is working. If it didn't stop after 10 iterations, it would go on forever. But a reader might justly be confused why that limit is there: does the method work only for 10 iterations? And if a different user does not want any intermediate output, must they reimplement the recipe? How should they ensure the reimplementation is also correct? The approach taken in this example does not allow additional functionality to be composed cleanly, so features build up like barnacles on a ship. Is there a way to keep the recipe separate from other concerns?

The answer is yes. This series of posts is about such a way to implement IM in Rust using iterators. This design, realized for Rust in the `iterative_methods` crate (repo), follows (and expands on) the approach of "Iterative Methods Done Right" (IMDR) by Lorenzo Stella, demonstrated with Julia iterables. The main idea is that each IM is an iterator over algorithm states, and each reusable "utility" is an iterator adaptor augmenting or processing them. These components are highly reusable, composable, and can be tailored using closures. This is basically an extension to this domain of an approach the Rust standard library already follows. But if it seems a bit dense, that's fine; we'll unpack it over this post and the next, introducing along the way a common iterative method as well as measures of IM quality.

Beyond reuse of methods and utilities, there is another reason to separate them: iterative methods are often highly sensitive beasts. Small changes can cause subtle numerical and/or dynamic effects that are very difficult to trace and resolve. Thus a primary design goal is to minimize the need for modifications to the method implementation, even when attempting to study/debug it.
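To make the main idea concrete before diving into a real method, here is a minimal sketch of the gradient descent recipe from above recast as an iterator over algorithm states. The trait here is a stand-in written for this sketch (the real `StreamingIterator` lives in the `streaming_iterator` crate), and the names are illustrative:

```rust
// A minimal stand-in for the StreamingIterator trait; the real one
// lives in the `streaming_iterator` crate.
trait StreamingIter {
    type Item;
    fn advance(&mut self);
    fn get(&self) -> Option<&Self::Item>;
    fn next(&mut self) -> Option<&Self::Item> {
        self.advance();
        self.get()
    }
}

// The method's state: just the current solution.
struct GradientDescent {
    x: f64,
}

impl StreamingIter for GradientDescent {
    type Item = f64;
    // The recipe, and nothing else: one gradient step on f(x) = x^2 + x.
    fn advance(&mut self) {
        self.x -= 0.2 * (2.0 * self.x + 1.0);
    }
    fn get(&self) -> Option<&f64> {
        Some(&self.x)
    }
}

fn main() {
    let mut gd = GradientDescent { x: 2.0 };
    // Reporting and stopping now live in the caller, not in the recipe.
    for i in 0..10 {
        let x = *gd.next().unwrap();
        println!("x_{} = {:.2}; f(x_{}) = {:.4}", i, x, i, x * x + x);
    }
}
```

The recipe lives entirely in `advance`; the caller decides how long to run and what to print, which is exactly the separation argued for above.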

## Our first full iterative method

Our main guest IM for this post and the next will be Conjugate
Gradient
(CG). What is CG for? Here is one example: to simulate physics like the
spread of heat (or deformation, fluid flows,
etc)
over a complex shape, we break it up into simpler pieces to apply the
Finite Element Method
(FEM). In FEM,
the relations between temperatures at different elements implied by
the heat equation are encoded into a matrix \(A\) and a vector
\(b\). To find the vector of temperatures \(x\), it is enough to
solve the matrix equation \(Ax = b\). CG is an IM for solving such
equations when \(A\) is positive definite (PD)^{1}, which it is for
a wide variety of domains.

Why the conjugate gradient method works is beyond the scope of this post, but good sources include the Wikipedia exposition and also these lecture notes.

^{1} **Positive Definite matrices**: a positive definite matrix \(A\) only scales \(x\)'s differently in different directions; no rotation or flipping allowed.
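A hedged numeric illustration of the footnote (the matrix and test vectors below are made up for this example): for a symmetric positive definite \(A\), the quadratic form \(x^\top A x\) is strictly positive for every nonzero \(x\).

```rust
// Illustration only: the quadratic form x^T A x for a 2x2 matrix.
fn quad_form(a: [[f64; 2]; 2], x: [f64; 2]) -> f64 {
    let ax = [
        a[0][0] * x[0] + a[0][1] * x[1],
        a[1][0] * x[0] + a[1][1] * x[1],
    ];
    x[0] * ax[0] + x[1] * ax[1]
}

fn main() {
    // A symmetric matrix with eigenvalues 1 and 3, hence positive definite.
    let a = [[2.0, 1.0], [1.0, 2.0]];
    for x in [[1.0, 0.0], [0.0, 1.0], [1.0, -1.0], [-0.3, 0.7]] {
        // Every value printed is strictly positive.
        println!("x^T A x = {:.2}", quad_form(a, x));
    }
}
```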

## Implementation and a general interface

To store the state our method maintains, we define a struct `ConjugateGradient`, for which we implement the StreamingIterator trait. This trait is simple, and requires us to implement two methods:

- `advance` applies one iteration of the algorithm, updating state.
- `get` returns a borrow of the `Item` type, generally some part of its state.

The benefit of the StreamingIterator trait over the ubiquitous Iterator is that `get` exposes information by reference; this leaves decisions to copy state up to the implementor.

The signatures for implementing an iterative method in this style are as follows:

```
#[derive(Clone, Debug)]
pub struct ConjugateGradient {
    // State of the algorithm
}

impl ConjugateGradient {
    /// Initialize a conjugate gradient iterative solver to solve linear system `p`.
    pub fn for_problem(p: &LinearSystem) -> ConjugateGradient {
        // Problem data such as the LinearSystem is often large, and should not be
        // duplicated. This implementation uses ndarray's ArcArrays, which are cheap
        // to clone as they share the data they point to.
    }
}

impl StreamingIterator for ConjugateGradient {
    type Item = Self;
    fn advance(&mut self) {
        // the improvement recipe goes here
    }
    fn get(&self) -> Option<&Self::Item> {
        // Return self, immutably borrowed. This allows callers read-only access
        // to method state. The following is a bit simplified:
        Some(self)
    }
}
```

Note a few design decisions in the above:

- The *problem* is a distinct concept from any *method* for solving it. The same problem representation (here, `LinearSystem`) often can and should be reused to initialize different methods.
- The constructor method `for_problem` is responsible for setting up the initial state for the first iteration, and so is part of the method definition.
- Another constructor responsibility is to perform applicable and cheap checks of the input problem; expensive initialization is a bad fit for an iterative method.
- `Item` is set to the whole `ConjugateGradient`, all algorithm state. We could instead set the `Item` type returned by the `get` method to be only a result field, thus hiding implementation details from downstream. Similarly, there is some flexibility in defining the iterable struct: beyond a minimal representation of the state required for the next iteration, should we add fields to store intermediate steps of calculations? How about auxiliary information not needed at all in the method itself? Consider the following excerpt from the implementation of `advance`:

```
// while r_k != 0:
//     alpha_k = ||r_k||^2 / ||p_k||^2_A
self.alpha_k = self.r_k2 / self.pap_k;
if (!too_small(self.r_k2)) && (!too_small(self.pap_k)) {
    // x_{k+1} = x_k + alpha_k*p_k
    ...
```

Here `self.alpha_k` is only read in the remainder of the recipe. So why not make it a temporary, instead of a field of `ConjugateGradient`? This would seem to shrink the struct, saving memory, and hide an unnecessary detail, generally positive outcomes, right? But soon after implementing this code I found myself wanting to print `alpha_k`, which is impossible for a local without modifying the `advance` method! By storing more intermediate state in the iterator state, exposing all of it via `get`, and inspecting it externally, we avoid modifying the method for our inspection and the dreaded Heisenbugs that could ensue. On top of a solid whitebox implementation, we can always build an interface that abstracts away some aspects.

## Running an Iterative Method

How do we call such an implementation? The example below illustrates a common workflow:

```
// First we generate a problem, which consists of the pair (A,b).
let p = make_3x3_pd_system_2();
// Next convert it into an iterator
let mut cg_iter = ConjugateGradient::for_problem(&p);
// and loop over intermediate solutions.
// Note `next` is provided by the StreamingIterator trait using
// `advance` then `get`.
while let Some(result) = cg_iter.next() {
    // We want to find x such that a.dot(x) = b,
    // so the difference between the two sides (called the residual)
    // is a good measure of the error in a solution.
    let res = result.a.dot(&result.solution) - &result.b;
    // The (squared) length of the residual is a cost, a number
    // summarizing how bad a solution is. When working on iterative
    // methods, we want to see these numbers decrease quickly.
    let res_squared_length = res.dot(&res);
    // || ... ||_2 is notation for euclidean length of what
    // lies between the vertical lines.
    println!(
        "||Ax - b||_2 = {:.5}, for x = {:.4}, residual = {:.7}",
        res_squared_length.sqrt(),
        result.solution,
        res
    );
    // Stop if residual is small enough
    if res_squared_length < 1e-3 {
        break;
    }
}
```

Indeed the output shows nice convergence, with the residual \(Ax - b\) tending quickly to zero:

```
||Ax - b||_2 = 1.00000, for x = [+0.000, +0.000, +0.000], residual = [+0.000, -1.000, +0.000]
||Ax - b||_2 = 0.94281, for x = [+0.000, +0.667, +0.000], residual = [+0.667, +0.000, +0.667]
||Ax - b||_2 = 0.00000, for x = [-4.000, +6.000, -4.000], residual = [+0.000, +0.000, +0.000]
```

In terms of the code, notice the algorithm is now out of the loop! We do not modify it merely to report progress, nor even to decide when to stop. But we do change that loop body, which gets a bit messy. Once we start looking for such niceties, we'll soon want to:

- look at only every Nth iteration,
- measure the runtime of an iteration (excluding the cost of reporting itself),
- plot progress over time,
- save progress in case of a power failure...

for basically every method we work on, and we certainly don't want all of those tangled up in our loops. We will want reusable components, named to convey intention! As mentioned above, the idea of representing processes with streaming iterators extends cleanly and orthogonally to utilities as well. We demonstrate this in the next post.
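As a taste of the first item on that wish list, here is a minimal sketch of an "every Nth iteration" adaptor. It wraps a plain `Iterator` to keep the sketch short (the crate's utilities adapt `StreamingIterator`s), and the names are illustrative rather than the crate's API:

```rust
// Sketch of the adaptor idea: `EveryNth` wraps any iterator and
// yields only every nth item.
struct EveryNth<I> {
    inner: I,
    n: usize,
}

impl<I: Iterator> Iterator for EveryNth<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<I::Item> {
        // Return one item, then silently consume the next n - 1.
        let item = self.inner.next()?;
        for _ in 1..self.n {
            self.inner.next();
        }
        Some(item)
    }
}

fn main() {
    let every_third = EveryNth { inner: 0..10, n: 3 };
    let kept: Vec<i32> = every_third.collect();
    println!("{:?}", kept); // [0, 3, 6, 9]
}
```

The underlying iterator (here a plain range, but equally a method producing solutions) is untouched; only the view of its output changes.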

Looking beyond design for code reuse, IM also put a new twist on benchmarking and testing. How does one time or test code that doesn't really want to stop, and for which solutions only approach correctness? We'll get to those questions as well.

Thanks to Daniel Fox (a collaborator on this project) and Yevgenia Vainsencher for feedback on early versions of this post.

# Using iterative methods and reusable utilities

## Means and ends

In our previous post, we introduced iterative methods, implemented one (conjugate gradient) in Rust as a StreamingIterator, and saw a "cost" of its solutions decrease towards zero. The StreamingIterator interface allowed us to separate two concerns:

- produce a sequence of solutions
- use the solutions

In this post we focus on the second part, using as our running example the workflow of tracking solution quality. This is a handy tool for evaluating methods, from ensuring the initial implementation is complete, through validating on a real problem, to a plot in a research paper demonstrating an advantage (hopefully) over the state of the art.

But what do we mean by quality? And how do we decompose the current problem into reusable parts?

Before diving into those questions, we recall the end goal: users eventually want to call a function, pass it a problem, and receive one final good-enough solution (or an indication of error). The existence of the sequence of worse preceding approximations is an implementation detail to them. However, this kind of interface is also easy to build on top of a stream-based implementation and a few of the same utilities.

## Measures of progress

Let's generalize and nail down the terms we used loosely before. Let
\(x_0\) be an initial solution to a problem \(p\),
\(x_1,x_2\dots\) the sequence of solutions produced by an iterative
method, and \(f(p,x)\) a real function measuring a *cost* of using
solution \(x\) for problem \(p\).

In the workflows we are considering, we show \(f(p,x_t)\) as a function of either the iteration number \(t\) or elapsed time. Going back to the example of finding solutions for a matrix equality \(Ax=b\), we would like to have the vector of errors \(Ax-b\) to be all zero, so a natural measure of cost is the Euclidean length (aka \(l_2\) norm) of the error vector: \(\sqrt{\sum_{i=1}^n\left(Ax-b\right)_i^2}\), as we printed last time.

There are different useful cost functions, not just when considering different problem types, but even for one particular problem. Other measures of the size of a vector sometimes make more sense; the sum of absolute values (\(l_1\)) or their maximum (\(l_\infty\), the worst error in any dimension) are two very common examples. Another source of variants is to give some error components a larger weight than others, for example based on their roles in an application. Finding a cost function that models the desiderata of a particular application well is a bit of an art, but a critical step in selecting among distinct methods.
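For concreteness, here is a sketch of the three norms as cost functions over a residual vector, using plain slices rather than the post's ndarray types:

```rust
// Sketch: three common vector norms as cost functions over a residual.
fn l2(r: &[f64]) -> f64 {
    r.iter().map(|e| e * e).sum::<f64>().sqrt()
}

fn l1(r: &[f64]) -> f64 {
    r.iter().map(|e| e.abs()).sum()
}

fn linf(r: &[f64]) -> f64 {
    r.iter().fold(0.0, |m, e| m.max(e.abs()))
}

fn main() {
    let residual = [0.3, -0.4, 0.0];
    println!("l2 = {}", l2(&residual)); // ~0.5
    println!("l1 = {}", l1(&residual)); // ~0.7
    println!("linf = {}", linf(&residual)); // ~0.4
}
```

The three functions disagree on which of two residuals is "worse" in general, which is exactly why the choice matters.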

We have defined an iterative method as one that improves a
solution. More precisely, this means there exists a particular cost
function (also known as a Lyapunov function or energy function), which
the improvement recipe *can be proven* to reduce (perhaps in
expectation). This energy function is generally tailored to the
problem data, method, assumptions and proof method, and so not to an
application, but it has a special role to play in validation.

## Implementing benchmarking

Time to get dirty with some implementation. First let us review how we used `ConjugateGradient` last time (with the comments removed), and then improve on it:

```
let p = make_3x3_pd_system_2();
let mut cg_iter = ConjugateGradient::for_problem(&p);
while let Some(result) = cg_iter.next() {
    let res = result.a.dot(&result.solution) - &result.b;
    let res_squared_length = res.dot(&res);
    println!(
        "||Ax - b||_2 = {:.5}, for x = {:+.3}, residual = {:+.3}",
        res_squared_length.sqrt(),
        result.solution,
        res
    );
    if res_squared_length < 1e-3 {
        break;
    }
}
```

To start with, our computation of the \(l_2\) residual cost is mixed in with the I/O in the loop. We extract a reusable function:

```
fn residual_l2(result: &ConjugateGradient) -> f64 {
    let res = result.a.dot(&result.solution) - &result.b;
    res.dot(&res).sqrt()
}
```

We can now separate the application of the cost function out of the loop body:

```
let p = make_3x3_pd_system_2();
let cg_iter = ConjugateGradient::for_problem(&p);
// Annotate each approximate solution with its cost
let cg_iter = assess(cg_iter, residual_l2);
// and use it in stopping condition
let mut cg_iter = take_until(cg_iter, |ar| ar.annotation < 1e-3);
// Deconstruct to break out the result and cost
while let Some(AnnotatedResult {
    result: cgi,
    annotation: euc,
}) = cg_iter.next()
{
    // Now the loop body is I/O only as it should be!
    println!("||Ax - b||_2 = {:.5}, for x = {:.4}", euc, cgi.solution);
}
```

What is happening here? `assess` composes onto ConjugateGradient a StreamingIterator adaptor (just like Iterator adaptors), so below that statement, `cg_iter` is a new kind of StreamingIterator whose items pair the underlying items with their cost, as evaluated by the given function. Similarly, `take_until` adapts the stream to return elements and stop after the first satisfying a predicate (`take_while` is similar but stops before the desired element). Lastly, the loop header now deconstructs the struct, and its body does only I/O. Ok, cute trick, but does this scale to more general scenarios?

## A larger scenario, or: more realistic benchmarking

Suppose that:

- Having seen `residual_l2` converge, we decide that for our application the \(l_\infty\) norm of the residual matters more, so we want to track convergence in that metric as well.
- We also learn that an energy function for the conjugate gradient method is \(||x-x^*||_A\) (where \(||a||_A^2 = a^\top A a\)), and \(x^*\) is the optimal solution (usually not known in advance in applications, but available in benchmarks). We decide to track that as well.

```
fn residual_linf(result: &ConjugateGradient) -> f64 {
    (result.a.dot(&result.solution) - &result.b).fold(0.0, |m, e| m.max(e.abs()))
}

fn a_distance(result: &ConjugateGradient, optimum: V) -> f64 {
    let error = &result.solution - &optimum;
    error.dot(&result.a.dot(&error)).sqrt()
}
```

- We want to compare performance to a different algorithm solving the same problems. Since the algorithms are different, comparing progress per iteration is apples to oranges, so we want to measure progress by time.
- We want to stop at the earliest of reaching a sufficiently good solution on both effective measures (useful for a solid method) or the iteration count exceeding N (especially useful for a method still under development).

These requirements can all be fulfilled using a readable composition of pre-existing components:

```
// Set up a problem for which we happen to know the solution
let p = make_3x3_pd_system_2();
let optimum = rcarr1(&[-4.0, 6., -4.]);
let cg_iter = ConjugateGradient::for_problem(&p);
// Cap the number of iterations.
let cg_iter = cg_iter.take(80);
// Time each iteration, only of preceding steps (the method)
// excluding downstream evaluation and I/O (tracking overhead), as
// well as elapsed clocktime (combining both).
let cg_iter = time(cg_iter);
// Record multiple measures of quality
let cg_iter = assess(cg_iter, |TimedResult { result, .. }| {
    (
        residual_l2(result),
        residual_linf(result),
        a_distance(result, optimum.clone()),
    )
});
// Stop if converged by both criteria
fn small_residual((euc, linf, _): &(f64, f64, f64)) -> bool {
    euc < &1e-3 && linf < &1e-3
}
let mut cg_iter = take_until(cg_iter, |ar| small_residual(&ar.annotation));
// Output progress
while let Some(AnnotatedResult {
    annotation: (euc, linf, a_dist),
    result:
        TimedResult {
            result,
            start_time,
            duration,
        },
}) = cg_iter.next()
{
    println!(
        "{:8} : {:6}, \
         ||Ax - b||_2 = {:.3}, ||Ax - b||_inf = {:.3}, ||x-x*||_A = {:.3}, \
         for x = {:+.3}",
        start_time.as_nanos(),
        duration.as_nanos(),
        euc,
        linf,
        a_dist,
        result.solution,
    );
}
```

The result happily shows steady improvement on the energy function from the very first iterations:

```
651 : 30304, ||Ax - b||_2 = 1.000, ||Ax - b||_inf = 1.000, ||x-x*||_A = 2.449, for x = [+0.000, +0.000, +0.000]
75431 : 31863, ||Ax - b||_2 = 0.943, ||Ax - b||_inf = 0.667, ||x-x*||_A = 2.309, for x = [+0.000, +0.667, +0.000]
148849 : 31173, ||Ax - b||_2 = 0.000, ||Ax - b||_inf = 0.000, ||x-x*||_A = 0.000, for x = [-4.000, +6.000, -4.000]
```

So far the components have been pretty general purpose, like `time` and `take_until`, even when parametrized like `assess`. In the next post Daniel Fox describes some more substantial adaptors he has contributed that I find pretty exciting!

### Contributed by Daniel Fox

# Iterative Methods in Rust

## Reservoir Sampling

This is the third post in the series presenting the `Iterative Methods` Rust crate. Thanks to Daniel Vainsencher for inviting me to post. As discussed in the earlier posts, the `Iterative Methods` crate aims to expand the repertoire of iterative methods readily available in the Rust ecosystem.

This post describes how the `Iterative Methods` crate facilitates easy reservoir sampling of a `StreamingIterator`. Reservoir sampling produces an up-to-date and relatively low-cost random sample of a large stream of data. For example, suppose you want to maintain an up-to-date sample of \(k\) tweets from a Twitter feed. At any moment, a reservoir sample of the tweets is equivalent to a random sample of \(k\) items from the portion of the stream that has been processed so far. The reservoir sampling algorithm accomplishes this without needing to know the total number of items in the stream, and it updates the sample to take into account new behavior in the data that may not have been present initially. Below we'll see how to use reservoir sampling for a `StreamingIterator` in Rust. Using animations, we can see how the reservoir samples stay up-to-date as the data stream exhibits new behavior.

## Outline of the Post

- The UI for Reservoir Sampling
- Visualizing the Evolving Distribution
- Reservoir Means Approximate Stream Means
- Exporting Data for Visualizations Using Adaptors

## The UI for Reservoir Sampling

The UI uses adaptors to transform the behavior of a `StreamingIterator`. Suppose that `stream` is a `StreamingIterator` with items of type `T`. We adapt that iterator using `reservoir_sample(stream, capacity, rng)`, whose arguments are 1) a streaming iterator, 2) the capacity or sample size of the reservoir, and 3) a choice of random number generator. The default is to set `rng` to `None`, so that `rand_pcg::Pcg64` is used. You also have the option of using a seedable `rng`, which might be useful for debugging or testing. Each item of the returned `StreamingIterator` is a `Vec<T>`, where the vector holds a reservoir sample.

```
let mut stream = reservoir_sample(stream, capacity, None);
while let Some(item) = stream.next() {
    // do stuff with each reservoir;
    // each item of the iterator is now a reservoir sample,
    // not a single item of the original stream
}
```
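For intuition about what happens inside the adaptor, here is a hedged sketch of the classic reservoir algorithm (Algorithm R) over a plain `Iterator`, with a caller-supplied random-index function standing in for a real `rand` generator; the crate's implementation differs in its details:

```rust
// Keep `capacity` items; after i items have streamed past, the kept
// items are a uniform sample of those i items.
fn reservoir<I: Iterator>(
    stream: I,
    capacity: usize,
    mut rand_below: impl FnMut(usize) -> usize,
) -> Vec<I::Item> {
    let mut sample = Vec::with_capacity(capacity);
    for (i, item) in stream.enumerate() {
        if sample.len() < capacity {
            // Fill the reservoir with the first `capacity` items.
            sample.push(item);
        } else {
            // Keep the new item with probability capacity / (i + 1),
            // evicting a uniformly random slot.
            let j = rand_below(i + 1);
            if j < capacity {
                sample[j] = item;
            }
        }
    }
    sample
}

fn main() {
    // A tiny linear congruential generator stands in for a real rng.
    let mut state: u64 = 42;
    let sample = reservoir(0..1000, 5, |bound| {
        state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        (state >> 33) as usize % bound
    });
    println!("{:?}", sample); // 5 items drawn from 0..1000
}
```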

Let's look at an example demonstrating the utility of both the UI and reservoir sampling.

## Visualizing the Evolving Distribution

Suppose that we have a stream of data whose distribution changes. For example, suppose that `stream` is a `StreamingIterator` of floats for which the first quarter of the stream is generated by a normal distribution with mean \( \mu = 0.25\) and standard deviation \(\sigma = 0.15\), but the final three-quarters of the stream is generated by a normal distribution with \( \mu = 0.75\), \( \sigma = 0.15\): the mean jumps from \( 0.25\) to \( 0.75\). Here are histograms showing the initial and final distributions of the stream:

The reservoir sample starts off approximating the initial normal distribution centered at \(\mu = 0.25\). Gradually it shifts to approximate the distribution of the portion of the stream that has been processed. Here is an animation showing how the reservoir distribution evolves compared to the distribution of the portion of the stream that has been processed.

## Reservoir Means vs. Stream Means

While the animation in Figure 2 *visually* demonstrates how the reservoir distribution tracks the stream distribution, we can also track the performance of the reservoir sampling by generating metrics with adaptors. For example, we can check that the mean of the reservoir sample is close to the mean of the portion of the stream that has been processed. So let's plot the reservoir mean vs. the mean of the portion of the stream that was sampled to produce the reservoir. The `Iterative Methods` crate allows you to accomplish this through adaptors as if you were using a Rust `Iterator`. (If you only want to compute streaming means, reservoir samples are not an efficient way to do it. Rather, what follows is useful for checking that the means of the reservoirs are behaving as expected.)

In order to know which portion of the stream has been sampled for each reservoir, we'll prepare the stream by enumerating its items with the `enumerate()` adaptor. This wraps each item of a `StreamingIterator` in a `Numbered{count, item}` struct that contains the original item and its index. All of the adaptors are lazy, so the enumeration is added on the fly as the stream is processed. With the enumeration added in, for each reservoir we can find the item with the largest index, which we'll name `max_index`. We compare the reservoir mean to the mean of the stream up to and including that index.

Here is the code that accomplishes this. The code is modular; once the data stream exists, we adapt, adapt, adapt in whichever sequence is currently useful. As before, `stream` is our `StreamingIterator` full of float samples from the pair of distributions described above. The `stream` starts off with each item an `f64`;

- After the `enumerate` adaptor, the items are `Numbered<f64>`, with indices that will allow us to calculate `max_index` for each reservoir;
- After `reservoir_sample()`, each item is a reservoir sample in a `Vec<Numbered<f64>>`;
- The `map` adaptor uses the named closure `reservoir_mean_and_max_index` to compute the mean and maximum index for each reservoir. After the `map` adaptor, each item is a `Numbered` struct containing the mean of the reservoir and the `max_index` indicating how much of the stream was sampled to obtain that reservoir. See the source code for the wheels within wheels.

```
let stream = enumerate(stream);
let stream = reservoir_sample(stream, capacity, None);
let mut stream = stream.map(reservoir_mean_and_max_index);
while let Some(item) = stream.next() {
    // you could do things here, but probably it is more
    // convenient to use adaptors to accomplish your goals
}
```
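To make concrete what a closure like `reservoir_mean_and_max_index` might compute, here is a sketch over plain `(index, value)` pairs standing in for `Vec<Numbered<f64>>`; this is illustrative, not the crate's actual code:

```rust
// Sketch: compute the reservoir mean and the largest stream index
// present in the reservoir.
fn mean_and_max_index(reservoir: &[(usize, f64)]) -> (f64, usize) {
    let mean = reservoir.iter().map(|(_, v)| v).sum::<f64>() / reservoir.len() as f64;
    let max_index = reservoir.iter().map(|(i, _)| *i).max().unwrap();
    (mean, max_index)
}

fn main() {
    // A toy reservoir: items 0, 7, and 3 of some stream.
    let reservoir = [(0, 0.2), (7, 0.4), (3, 0.6)];
    let (mean, max_index) = mean_and_max_index(&reservoir);
    println!("mean = {:.2}, max_index = {}", mean, max_index);
}
```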

Now let's visually compare the means of the reservoirs and the means of the portions of the stream from which the reservoir samples were drawn. In the figure below, we see that, informally speaking, the mean of the reservoir does a nice job of approximating the mean of the portion of the stream that has been sampled.

What we see in these examples is that the idiomatic use of adaptors for a Rust `Iterator` that you know and love can be applied to more complex iterative methods, such as reservoir sampling: we abstract all the complexities into adaptors and closures, keeping the workflow of the iterative method clear.

## Exporting Data for Visualizations Using Adaptors

How did we make the visualizations, you ask? Why, by exporting the data with more adaptors! Let's take a look at how we adapt the stream to write the data needed for the visualizations in this blog post. The data needed for all three visualizations was written to YAML files using only a single pass through the stream. Here is what we need to accomplish in order to obtain all the data needed for the visualizations:

- enumerate the stream of floats (the enumeration is needed to compute `max_index`, as described above)
- write the enumerated stream to YAML (used in Figure 1)
- generate reservoir samples
- write reservoirs to YAML for the histogram animations (used in Figure 2)
- calculate the mean and `max_index` for each reservoir
- write the `(mean, max_index)` pair to YAML (used in Figure 3)

Finally, the only thing we do inside the loop is count the total number of reservoir samples that were made. This is helpful for initializing array sizes when making the visualizations. Here is the code:

```
// Enumerate the items in the stream; item type is now
// Numbered<f64>{count: index, item: f64 value}
let stream = enumerate(stream);
// Write the enumerated stream to YAML as a side effect,
// passing through the enumerated items
let stream = write_yaml_documents(stream, population_file.to_string())
    .expect("Create File and initialize yaml iter failed.");
// Convert items to reservoir samples of type Vec<Numbered<f64>>
let stream = reservoir_sample(stream, capacity, None);
// Write the reservoirs to YAML as a side effect
let stream = write_yaml_documents(
    stream,
    reservoir_samples_file.to_string(),
).expect("Create File and initialize yaml iter failed.");
// Convert items to
// Numbered<f64>{count: max_index, item: reservoir mean}
// using the named closure reservoir_mean_and_max_index
let stream = stream.map(reservoir_mean_and_max_index);
// Write these new items to YAML as a side effect
let mut stream = write_yaml_documents(
    stream,
    reservoir_means_file.to_string(),
).expect("Create File and initialize yaml iter failed.");
// num_res is used in the Python script for visualizations to
// initialize array sizes
let mut num_res = 0;
while let Some(_item) = stream.next() {
    num_res += 1
}
```

The head of the YAML file for the reservoir samples is shown below in Code Block 4. Each reservoir sample is in its own YAML document within a single file.

```
---
- - 0
- 0.07243419605614634
- - 1
- 0.10434526397435201
- - 2
- 0.29291775753278054
- - 3
- 0.5312252557573507
```

The visualizations were generated from the data in the YAML files using the Plotly module in Python. In the future we hope to switch to an entirely Rusty solution using the Plotters crate. Currently, our Rust program runs Python scripts using `std::process::Command` and writes any errors they throw to `stderr`.

The iterator-adaptor idiom popular in Rust and other modern languages provides an ergonomic way to write code. As we build a repertoire of adaptors that implement useful iterative methods, we can easily deploy them in myriad combinations to meet our engineering needs. For example, the YAML adaptor can also be used for checkpoints or logs. So go ahead and run the examples. If you try out the `Iterative Methods` crate, please send us feedback!

*Thanks to Daniel Vainsencher for inviting me to contribute to the Iterative Methods crate and for helpful feedback about this blog post and the implementation of reservoir sampling.*