Warning

This is a revival of one of my older lost blog posts from back in 2018. There may be much better ways of doing rate limiting in Kotlin or Java.

I came across a unique problem while working on a project back in 2018. My team was responsible for building the guts of a CDP (Customer Data Platform). One of its data pipelines had to call some APIs on the GCP platform. Like all public APIs, these GCP APIs had rate limits. The issue was that the SDK had no code to read the rate-limit headers returned by the API, which meant I had to write some client-side rate-limiting logic on top of the SDK-provided function (I was too lazy to implement the client from scratch). So far, nothing unique. What jumped out at me was that this rate limit had two parts:

  1. The client can make at most n requests per second
  2. The client can have at most m requests in flight (concurrency)

The problem was that I couldn’t find a rate-limiting library in Java or Kotlin that would let me enforce both. The pipeline also dealt with a lot of data, so I needed to make sure the producer didn’t overwhelm the consumer while also saving on compute and memory.

Long story short, I needed to write this client-side rate limiter for this specific use case.

What is a RateLimiter?

I am no expert on rate limiters. There are far better resources you can use to educate yourself on the different types. Go read those!

Seriously!

Great! You are back.

General Design

Since the data processing pipeline ran within a single host and only used the lowest tier of the API (to save costs), there was no point in being super fancy and implementing a distributed rate limiter. Given that it was a data pipeline processing gigabytes of data, I built it to run on streams from end to end. And since such streams in Kotlin provide backpressure by default, it made sense to build the rate-limiting logic on top of Kotlin coroutine primitives.

Info

I initially used Kotlin Channels, since Flows were still being designed around that time. I have since updated the code to use Flows.

(Diagram: rate limiting with backpressure)

Show me the code

A few assumptions:

  1. The producer stream is implemented as a Flow
  2. The consumer function expects a Flow to be returned from the pipe.

With that said, let’s implement some interfaces first. These will come in handy later to allow for different types of storage. Today the only implementation we’ll focus on is in-memory.

Since we’ll need some sort of storage to keep track of the requests in flight, we’ll create an interface representing an in-flight request, with its data, startTime, and endTime:

interface RateLimitingItem<T> : Comparable<RateLimitingItem<T>> {
	val startTime: Long
	var endTime: Long?
	val data: T
	fun isFulfilled(): Boolean  // Returns true if endTime is not null
}
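An item’s lifecycle follows from this: it is created with a null endTime when the request is scheduled, and the consumer sets endTime once the request completes, which is what isFulfilled checks. A stand-alone sketch of that lifecycle, with a simplified Item standing in for the real implementation:

```kotlin
// Simplified stand-in for the RateLimitingItem implementation,
// repeated here so this sketch runs on its own.
class Item<T>(val startTime: Long, var endTime: Long?, val data: T) {
    fun isFulfilled(): Boolean = endTime != null
}

fun main() {
    val item = Item(System.nanoTime(), null, "request-1")
    println(item.isFulfilled())  // false: the request is still in flight
    item.endTime = System.nanoTime()
    println(item.isFulfilled())  // true: the slot can eventually be reclaimed
}
```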

Next, let’s define the interface that represents the buffer: a fixed-capacity collection of items plus a duration. For example, if you can make n requests per 10 seconds, then the capacity of data should be n and duration should be Duration.ofSeconds(10).

interface RateLimitBuffer<T> {
	val data: Array<RateLimitingItem<T>?>
	val duration: Duration
	fun insert(item: RateLimitingItem<T>): Boolean  // Inserts item into `data`; returns true on success, false when the buffer is full and the caller should call `causeDelay` and retry
	suspend fun causeDelay()  // Suspends the caller for a while before the next attempt
}
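To make those numbers concrete: for the hypothetical limit of 5 requests per 10 seconds, the in-memory implementation shown later spreads retries evenly across the window, so each call to causeDelay sleeps for duration divided by capacity:

```kotlin
import java.time.Duration

fun main() {
    // Hypothetical limit: at most 5 requests per 10 seconds.
    val capacity = 5
    val duration = Duration.ofSeconds(10)

    // The in-memory buffer waits duration / capacity between attempts,
    // i.e. 10_000 ms / 5 = 2_000 ms.
    val delayMillis = duration.toMillis() / capacity
    println(delayMillis)  // 2000
}
```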

Now to define the output from the throttled pipe:

data class Throttled<T>(
	val item: T,
	val acknowledgement: RateLimitingItem<T>
)

And here’s the rateLimiter function:

fun <T> CoroutineScope.rateLimiter(
	receiveFlow: Flow<T>,
	buffer: RateLimitBuffer<T>,
	pushItemToBuffer: (Long, Long?, T) -> RateLimitingItem<T>
): Flow<Throttled<T>> = flow {
	receiveFlow.collect { item ->
		var acknowledgement: RateLimitingItem<T>
		scheduler@ while (true) {
			acknowledgement = pushItemToBuffer(System.nanoTime(), null, item)
			if (!buffer.insert(acknowledgement)) {
				logger.debug { "Causing Delay" }
				buffer.causeDelay()
				continue@scheduler
			} else break@scheduler
		}
 
		emit(Throttled(item, acknowledgement))
	}
}

Note that the consumer of the throttled flow is responsible for setting endTime on each acknowledgement once its downstream call completes; until then the buffer slot is never considered fulfilled and cannot be reclaimed. This is all well and good, but what about the implementations of RateLimitBuffer and RateLimitingItem? Here they are:

package me.sumit.ratelimiter
 
import kotlinx.coroutines.delay
import mu.KotlinLogging
import java.time.Duration
 
private val logger = KotlinLogging.logger {}
 
class RateLimitBufferImpl<T>(private val capacity: Int, override val duration: Duration) : RateLimitBuffer<T> {
    override val data: Array<RateLimitingItem<T>?> = arrayOfNulls(capacity)
 
    @Synchronized
    override fun insert(item: RateLimitingItem<T>): Boolean {
        // Find write-able position
        // Either find a null position in the array or find an item that satisfies the isFulfilled for nullability
        val writePosition = findWritablePosition()
 
        return when {
            writePosition < 0 -> false
            else -> {
                if (data[writePosition] != null)
                    logger.debug { "Overwriting ${data[writePosition]?.data} with ${item.data}" }
                data[writePosition] = item
                true
            }
        }
    }
 
    @Synchronized
    private fun findWritablePosition(): Int {
        val indexedData = data.withIndex()
        val nullPosition = indexedData.find { it.value == null }
        if (nullPosition != null)
            return nullPosition.index
 
        val oldestItem = indexedData.filter { it.value != null }
            .minBy { (_, value) -> value!!.startTime }
 
        return if (oldestItem.value!!.isFulfilled()) {
            val timeElapsed = System.nanoTime().minus(oldestItem.value!!.endTime!!)
            logger.debug { "Elapsed Time: $timeElapsed" }
            if (timeElapsed >= duration.toNanos())
                oldestItem.index
            else
                -1
        } else {
            -1
        }
    }
 
    fun peekOldest(): RateLimitingItem<T>? {
        val sortedNonNullList = data.filterNotNull().sorted()
        return when {
            sortedNonNullList.isNotEmpty() -> sortedNonNullList.find { it.endTime != null } ?: sortedNonNullList[0]
            else -> null
        }
    }
 
    override suspend fun causeDelay() {
        delay(duration.toMillis() / capacity)
    }
 
}
 
 
class RateLimitingItemImpl<T>(override val startTime: Long, override var endTime: Long?, override val data: T) :
    RateLimitingItem<T> {
    override fun isFulfilled(): Boolean {
        return endTime != null
    }
 
    override fun compareTo(other: RateLimitingItem<T>): Int {
        // Fulfilled items order by endTime; an item with a null endTime
        // (still in flight) sorts after fulfilled ones.
        return (other.endTime?.let { this.endTime?.compareTo(it) ?: 1 }) ?: -1
    }
}
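The ordering deserves a note: fulfilled items compare by endTime, while an unfulfilled item (null endTime) sorts after everything else, which is what peekOldest leans on when it calls sorted(). A self-contained sketch of that ordering, with a simplified Item in place of RateLimitingItemImpl:

```kotlin
// Simplified stand-in for RateLimitingItemImpl, repeated so this compiles on its own.
data class Item(val startTime: Long, var endTime: Long?, val label: String) : Comparable<Item> {
    // Same logic as RateLimitingItemImpl.compareTo: fulfilled items order by
    // endTime; an unfulfilled item (null endTime) sorts after fulfilled ones.
    override fun compareTo(other: Item): Int =
        other.endTime?.let { endTime?.compareTo(it) ?: 1 } ?: -1
}

fun main() {
    val items = listOf(
        Item(1, null, "in-flight"),
        Item(2, 30L, "done-late"),
        Item(3, 10L, "done-early"),
    )
    println(items.sorted().map { it.label })  // [done-early, done-late, in-flight]
}
```

One caveat: when both endTimes are null the comparison always returns -1, so this is not a strict total order; that is fine for the small arrays used here, but worth knowing before reusing it elsewhere.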
 

We can verify the correctness by running a test case:

package me.sumit.ratelimiter
 
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.junit.jupiter.api.Test
import java.time.Duration
import kotlin.system.measureTimeMillis
 
private val logger = KotlinLogging.logger {}
 
internal class RateLimiterKtTest {
    @Test
    fun rateLimiterFlowTest() = runBlocking {
 
        val buffer = RateLimitBufferImpl<Int>(2, Duration.ofSeconds(1))
 
        val throttledFlow = rateLimiter(flowCreator(), buffer) { startTime, endTime, item ->
            RateLimitingItemImpl(startTime, endTime, item)
        }
        logger.info { "[${Thread.currentThread().name}]: Let's start our jobs!" }
 
        launch {
            val totalTime = measureTimeMillis {
                throttledFlow.collect { item ->
                    logger.debug { "Received $item" }
                    item.acknowledgement.endTime = System.nanoTime()
                }
            }
            logger.info { "Total time: $totalTime" }
            assert(totalTime > 5000)
            assert(totalTime < 6000)
            logger.info { "All Done!" }
        }
    }
 
    private fun flowCreator(): Flow<Int> = flow {
        for (x in 1..11) {
            emit(x)
        }
    }
}

Conclusion

With Kotlin coroutines, writing safe code to handle concurrency and sequential data flow has become easier than ever before. I suspect there are battle-tested libraries like Resilience4j which may already have ready-made functions that can be composed to do this kind of client-side rate limiting.
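For instance, the two constraints from the beginning of the post map naturally onto two Resilience4j primitives: RateLimiter for requests per period and Bulkhead for concurrent calls in flight. A rough, untested sketch, assuming the resilience4j-ratelimiter and resilience4j-bulkhead modules are on the classpath; callSomeApi is a hypothetical stand-in for the real SDK call:

```kotlin
import io.github.resilience4j.bulkhead.Bulkhead
import io.github.resilience4j.bulkhead.BulkheadConfig
import io.github.resilience4j.ratelimiter.RateLimiter
import io.github.resilience4j.ratelimiter.RateLimiterConfig
import java.time.Duration

// Hypothetical stand-in for the SDK-provided call.
fun callSomeApi(): String = "ok"

fun main() {
    // Requests-per-period limit.
    val rateLimiter = RateLimiter.of(
        "gcp-api",
        RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(10)                      // at most 10 calls per refresh period
            .timeoutDuration(Duration.ofSeconds(5))  // how long a call waits for a permit
            .build()
    )
    // Concurrency (in-flight) limit.
    val bulkhead = Bulkhead.of(
        "gcp-api",
        BulkheadConfig.custom()
            .maxConcurrentCalls(5)
            .build()
    )
    // Compose both guards around the actual call.
    val guarded = Bulkhead.decorateSupplier(
        bulkhead,
        RateLimiter.decorateSupplier(rateLimiter) { callSomeApi() }
    )
    guarded.get()
}
```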

All the code is available here: https://github.com/sumitsarkar/ratelimiter