Building a scalable sequence generator (in Scala)

Building a scalable sequence generator was more difficult than I’d anticipated.

The challenge

  • Build a scalable sequence generator (must scale out and provide resilience)
  • Master sequence number is stored in MongoDB, updated atomically using find and modify (there's a sketch of this call just after this list)
  • Sequence numbers must never be repeated (but strict ordering isn’t required)
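
For reference, the master sequence update is just an atomic increment. I've simulated the MongoDB call in the examples below, but a real version might look something like this sketch using the official MongoDB Scala driver (the client setup, database, collection and field names are placeholder assumptions, not a real schema):

import com.mongodb.client.model.ReturnDocument
import org.mongodb.scala._
import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.Updates.inc
import org.mongodb.scala.model.FindOneAndUpdateOptions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object MongoSequence {
  val client = MongoClient()
  val sequences = client.getDatabase("demo").getCollection("sequences")

  // atomically increment the master sequence document and return the new value
  def nextSeq : Future[Int] =
    sequences.findOneAndUpdate(
      equal("_id", "master"),
      inc("seq", 1),
      FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)
    ).toFuture() map { doc => doc("seq").asNumber().intValue() }
}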

The problem

Since the sequence number is a single value stored in a single document in a single collection, the document gets locked on every request. MongoDB can’t help with scaling:

  • Starting multiple instances of our sequence generator doesn't help - they all need to lock the same document
  • Multiple MongoDB nodes don't help - we'd need a replica acknowledged write concern to avoid duplicate sequence numbers

The solution

The solution is to take batches of sequence numbers from MongoDB, multiplying the scalability - for example, using a batch size of 10 means we can run (approximately) 10 instances of our sequence generator against our 1 MongoDB document, though any instance failure could waste up to 10 sequence numbers.

Using batches also dramatically improves our performance - we make far fewer MongoDB requests, generating less network traffic and reducing service response times.
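
In MongoDB terms, reserving a batch is exactly the same atomic update, just incrementing by the batch size rather than by 1 - the value that comes back is the top of the range we've reserved. Reusing the collection from the sketch above (and again, this is only a sketch):

// reserve batch_size sequence numbers in a single atomic update -
// the value returned is the highest number in the reserved range,
// so this instance can safely hand out (new_max - batch_size, new_max]
def newBatch(batch_size: Int) : Future[Int] =
  sequences.findOneAndUpdate(
    equal("_id", "master"),
    inc("seq", batch_size),
    FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)
  ).toFuture() map { doc => doc("seq").asNumber().intValue() }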

The unscalable sequence generator

Building an unscalable sequence generator is easy. We can just find and modify the next sequence number; MongoDB takes care of the rest.

An implementation might look a bit like this:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

object UnscalableSequenceGenerator extends App {
  // the master sequence number
  var seq = 0

  def nextSeq : Future[Int] = future { blocking {
    // pretend we're doing a find and modify asynchronously
    Thread.sleep(30)
    this.synchronized {
      seq = seq + 1
      seq
    }
  } }

  // simulate calling our HTTP service 100 times
  for(i <- 1 to 100) {
    nextSeq map { j =>
      // pretend we're doing something useful with the sequence number
      print(s"$j ")
      if(i % 10 == 0) println
    }
  }

  Thread.sleep(5000)
}

Running that example produces output like this (the exact ordering of numbers may be different):

2 3 1 4 5 7 6 8 9 10 11 12
14 13 16 17 15 19 18 21 22 20 24 23 25
26 27 28 30 29
31 32 34 33 36 35 37 38 39 40 41 43 42
44 46 45 47 48 49 50 51 52
53 55 54 56 57 58 60 59
62 61 63 64 65 66 67 68 69 70
71 72 74 73 75 76 78 77 80 79 81
82 83 85 84 86 87 89 88 90 91
93 95 92 96 97 94 99 98 100

No duplicates, but it’s not scalable, and the performance is terrible.

Making it scalable

To make it scalable (and get a performance boost), we can use sequence number batches. But that turned out to be more difficult than I’d expected.

The first attempt looked a bit like this:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

object BatchedSequenceGenerator extends App {
  // the master sequence number and batch size
  var seq = 0
  val batch_size = 10

  // our current sequence and maximum sequence numbers
  var local_seq = 0
  var local_max = 0

  def newBatch : Future[Int] = future { blocking {
    // pretend we're doing a find and modify asynchronously
    Thread.sleep(30)
    this.synchronized {
      seq = seq + batch_size
      seq
    }
  } }

  def nextSeq : Future[Int] = {
    if(local_seq >= local_max) {
      // Get a new batch of sequence numbers
      newBatch map { new_max =>
        // Update our local sequence
        local_max = new_max
        local_seq = local_max - batch_size + 1
        local_seq
      }
    } else {
      // Use our local sequence number
      local_seq = local_seq + 1
      val next_seq = local_seq
      future { next_seq }
    }
  }

  // simulate calling our HTTP service 100 times
  for(i <- 1 to 100) nextSeq map { j =>
    // pretend we're doing something useful with the sequence number
    print(s"$j ")
    if(i % 10 == 0) println
  }

  Thread.sleep(5000)
}

While it does at least take batches of sequence numbers, we get the following unexpected but understandable output:

11 1 41 61 71 91 21 31 121 181 191 131 141 151 161 171 201 211 221
101 81 111 51
231 251 241 261 271 281 291 301
311 321 331 341 351 361 371 381
391 401 411 421 441 431 451 461 471
481 491 501 511 521 531 541 551
561 571 581 591 601 611 621 631 641 651 661 671 681 701 701 731 731 721
741 751 761 791 781 771 801 811 821 831 841
861 871 851 881 891 911
901 921 931 951 961 941 971
991 981

We’re only using 1/10th of each batch, and we get to 991 in only 100 requests. It’s no more scalable than the unbatched version.

It should probably have been obvious, but the problem is caused by requests arriving between requesting a new batch and getting a response:

  • The 10th request gives out the last local sequence number
  • The 11th request gets a new batch asynchronously
  • The 12th request arrives before we get a new batch, and requests another new batch asynchronously
  • We get the 11th request batch, reset our sequence numbers and return a sequence
  • We get the 12th request batch, and again reset our sequence numbers and return a sequence, wasting the rest of the previous batch

To fix it, we need the 12th request to wait for the 11th request to complete first.

Making it work

This was the tricky bit - implementing it led me down a road of endless compiler errors, but the idea was simple.

When we call nextSeq, we need to know if a new batch request is pending. If it is, we wait for the existing request to complete rather than requesting another batch; otherwise we handle the request as normal.

We can do this by chaining futures together, keeping track of whether a batch request is currently in progress.

It’s a fairly simple change to our batched sequence generator (or at least, in hindsight it is):

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

object BatchedSequenceGenerator extends App {
  // the master sequence number and batch size
  var seq = 10
  val batch_size = 10

  // our current sequence and maximum sequence numbers
  var local_seq = 0
  var local_max = 10
  var pending : Option[Future[Int]] = None

  def newBatch : Future[Int] = future { blocking {
    // pretend we're doing a find and modify asynchronously
    Thread.sleep(30)
    this.synchronized {
      seq = seq + batch_size
      seq
    }
  } }

  def nextSeq : Future[Int] = this.synchronized {
    pending match {
      case None =>
        if(local_seq >= local_max) {
          // Get a new batch of sequence numbers
          pending = Some(newBatch map { new_max =>
            // Update our local sequence
            local_max = new_max
            local_seq = local_max - batch_size + 1
            local_seq
          })
          // Clear the pending future once we've got the batch
          pending.get andThen { case _ => pending = None }
        } else {
          // Use our local sequence number
          local_seq = local_seq + 1
          val next_seq = local_seq
          future(next_seq)
        }
      case Some(f) =>
        // Wait on the pending future
        f flatMap { _ => nextSeq }
    }
  }

  // simulate calling our HTTP service 100 times
  for(i <- 1 to 100) nextSeq map { j =>
    // pretend we're doing something useful with the sequence number
    print(s"$j ")
    if(i % 10 == 0) println
  }

  Thread.sleep(5000)
}

And running that example generates output like this:

3 5 6 2 7 8 9 10
4 1 13 11 12 14 15 17 19 20
16 18 23 21 24 26 27 28 29 30 22
25 34 35 31 33 37 38 39 40
32 36 45 41 44 46 47 48 43
49 50 42 52 53 55 51 60 54 56
57 58 59 62
64 70 63 61 65 66 67 68 69 72 75 71
73 74 76 77 78 80 79 82 83 85 84 86 87 81
89 88 90 92 95 93 99 98 100 97
96 91 94

The changes we made are straightforward:

  • When we request a new sequence number, check if a pending future exists
    • If it does, wait on that and return a new call to nextSeq
    • If not, check if a new batch is required
      • If it is, store the future before returning
      • If not, use the existing batch as normal

A limitation of this approach - with a sufficiently small batch size and a high volume of requests, enough futures could end up chained together to cause out of memory errors.

Getting it to work felt like an achievement, but I’m still not happy with the code. It looks like there should be a nicer way to do it, and it doesn’t feel all that functional, but I can’t see it yet!