Warning
This is a revival of one of my lost blog posts from 2018. There may be much better ways to do rate limiting in Kotlin or Java today.
I came across a unique problem while working on a project back in 2018. My team was responsible for building the guts of a CDP (Customer Data Platform). One of its data pipelines had a step that called some GCP APIs. Like all public APIs, these GCP APIs had rate limits. However, the SDK didn’t read the headers returned by the API, so I had to write some client-side rate-limiting logic around the SDK-provided functions (I was too lazy to implement the client from scratch). So far, nothing unique. What jumped out at me was that this rate limit had two parts:
- The client can make n requests per second
- The client can have n requests in flight (concurrency)
The problem was that I couldn’t find a rate-limiting library in Java or Kotlin that enforced both. The pipeline also handled a lot of data, so I needed to make sure the producer didn’t overwhelm the consumer while also saving on compute and memory.
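To make the two constraints concrete, here is a small sketch of a check that enforces both. It is a hypothetical illustration, not the limiter built later in this post: the class name DualLimiter is made up, it is not thread-safe, and timestamps are passed in as plain millisecond values so the behavior is deterministic.

```kotlin
// Hypothetical illustration of a two-part limit: at most `maxPerWindow` starts in any
// sliding window of `windowMillis`, and at most `maxInFlight` unfinished requests.
// Timestamps are passed in explicitly so the logic can be checked without real time.
// Not thread-safe: callers must synchronize externally.
class DualLimiter(
    private val maxPerWindow: Int,
    private val maxInFlight: Int,
    private val windowMillis: Long,
) {
    private val recentStarts = ArrayDeque<Long>() // start times still inside the window
    private var inFlight = 0

    /** Records a new request and returns true if both limits allow it at [now]. */
    fun tryStart(now: Long): Boolean {
        // Forget starts that have slid out of the window.
        while (recentStarts.isNotEmpty() && now - recentStarts.first() >= windowMillis) {
            recentStarts.removeFirst()
        }
        if (recentStarts.size >= maxPerWindow || inFlight >= maxInFlight) return false
        recentStarts.addLast(now)
        inFlight++
        return true
    }

    /** Marks one in-flight request as finished. */
    fun finish() {
        check(inFlight > 0) { "finish() called without a matching tryStart()" }
        inFlight--
    }
}
```

With 2 requests per second, 2 in flight, and requests that take a few seconds, a per-second limit alone would admit two new requests at t = 1000 ms. The in-flight limit rejects them until an earlier request calls finish().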
Long story short, I needed to write this client-side rate limiter for this specific use case.
What is a RateLimiter?
I am no expert on rate limiters. There are far better resources you can use to learn about the different types. Go read those!
Seriously!
Great! You are back.
General Design
Since the data processing pipeline ran on a single host and only used the very lowest tier of the API (to save costs), there was no point in getting fancy and implementing a distributed rate limiter. Given that the pipeline processed gigabytes of data, I built it to run on streams from end to end. And since Kotlin’s coroutine-based streams provide backpressure by default, it made sense to build the rate-limiting logic on top of the Kotlin coroutine primitives.
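The backpressure comes from suspension: in a plain Flow without operators like buffer(), emit() does not return until the collector has finished with the value, so a slow consumer automatically slows the producer down. A minimal sketch, assuming kotlinx.coroutines is on the classpath; the function name backpressureDemo is made up for illustration:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Records the order in which a producer emits and a slow consumer processes values.
fun backpressureDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val producer = flow {
        for (i in 1..3) {
            events += "produce $i"
            emit(i) // suspends until the collector below has finished with i
        }
    }
    producer.collect { i ->
        delay(10) // simulate a slow consumer
        events += "consume $i"
    }
    events
}
```

The producer never gets ahead of the consumer, so no extra queue is needed between them. Adding buffer() would change this by letting the producer run ahead.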
Info
Initially I used Kotlin Channels; Flows were still being designed around that time. I have since updated the code to use Flows.

Show me the code
A few assumptions:
- The producer stream is implemented as a Flow.
- The consumer function expects the pipe to return a Flow.
With that said, let’s define some interfaces first. They will come in handy later to support different types of storage, although the only implementation we’ll focus on here is in-memory.
Since we need some storage to keep track of the requests in flight, we’ll create an interface that represents a tracked request with its data, startTime and endTime:
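interface RateLimitingItem<T> : Comparable<RateLimitingItem<T>> {
    val startTime: Long // System.nanoTime() when the item was admitted into the buffer
    var endTime: Long? // System.nanoTime() when the request completed; null while in flight
    val data: T
    fun isFulfilled(): Boolean // Returns true if endTime is not null
}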
Next, let’s define the interface that represents the buffer of tracked items and the window duration. Here, data is a fixed-size array of items. For example, if you can make n requests per 10 seconds, then data should have a capacity of n and duration should be Duration.ofSeconds(10).
import java.time.Duration

interface RateLimitBuffer<T> {
    val data: Array<RateLimitingItem<T>?>
    val duration: Duration

    // Inserts item into `data`. Returns true if the insertion succeeded, false otherwise.
    // `false` tells the caller to call `causeDelay` and try again.
    fun insert(item: RateLimitingItem<T>): Boolean

    // Suspends the calling coroutine for some time (without blocking the thread)
    suspend fun causeDelay()
}
Now let’s define the output of the throttled pipe:
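// item: the original element from the producer
// acknowledgement: the buffer entry; the consumer sets its endTime once the request completes
data class Throttled<T>(
    val item: T,
    val acknowledgement: RateLimitingItem<T>
)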
And here’s the rateLimiter function:
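import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}

fun <T> rateLimiter(
    receiveFlow: Flow<T>,
    buffer: RateLimitBuffer<T>,
    // Factory that wraps an item as a RateLimitingItem: (startTime, endTime, item)
    pushItemToBuffer: (Long, Long?, T) -> RateLimitingItem<T>
): Flow<Throttled<T>> = flow {
    receiveFlow.collect { item ->
        // A fresh startTime on every attempt, so it reflects when the item was actually admitted
        var acknowledgement = pushItemToBuffer(System.nanoTime(), null, item)
        while (!buffer.insert(acknowledgement)) {
            logger.debug { "Causing delay" }
            buffer.causeDelay()
            acknowledgement = pushItemToBuffer(System.nanoTime(), null, item)
        }
        // emit() returns only after the downstream collector has processed the value,
        // so a slow consumer also slows down this loop (backpressure)
        emit(Throttled(item, acknowledgement))
    }
}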
This is all well and good, but what about the implementations of RateLimitBuffer and RateLimitingItem? Here they are:
package me.sumit.ratelimiter

import kotlinx.coroutines.delay
import mu.KotlinLogging
import java.time.Duration

private val logger = KotlinLogging.logger {}

class RateLimitBufferImpl<T>(private val capacity: Int, override val duration: Duration) : RateLimitBuffer<T> {
    override val data: Array<RateLimitingItem<T>?> = arrayOfNulls(capacity)

    @Synchronized
    override fun insert(item: RateLimitingItem<T>): Boolean {
        // Find a writable position: either an empty (null) slot, or the slot of the
        // earliest-started item once it is fulfilled and `duration` has passed since it ended
        val writePosition = findWritablePosition()
        if (writePosition < 0) return false
        data[writePosition]?.let { previous ->
            logger.debug { "Overwriting ${previous.data} with ${item.data}" }
        }
        data[writePosition] = item
        return true
    }

    // Only called from insert(), so it already runs while holding the buffer's lock
    private fun findWritablePosition(): Int {
        val emptyPosition = data.indexOfFirst { it == null }
        if (emptyPosition >= 0) return emptyPosition

        // The buffer is full: only the item that started earliest is a candidate for reuse
        val oldestIndex = data.indices.minByOrNull { data[it]!!.startTime } ?: return -1
        val endTime = data[oldestIndex]!!.endTime ?: return -1 // still in flight
        val timeElapsed = System.nanoTime() - endTime
        logger.debug { "Elapsed Time: $timeElapsed" }
        return if (timeElapsed >= duration.toNanos()) oldestIndex else -1
    }

    // Returns the item that finished earliest, or the first item in the buffer if none has finished
    @Synchronized
    fun peekOldest(): RateLimitingItem<T>? {
        val sortedNonNullList = data.filterNotNull().sorted()
        return sortedNonNullList.find { it.endTime != null } ?: sortedNonNullList.firstOrNull()
    }

    // Polling interval while the buffer is full, e.g. 500 ms for 2 requests per second
    override suspend fun causeDelay() {
        delay(duration.toMillis() / capacity)
    }
}

class RateLimitingItemImpl<T>(
    override val startTime: Long,
    // @Volatile: the consumer may set endTime on a different thread than the one calling insert()
    @Volatile override var endTime: Long?,
    override val data: T
) : RateLimitingItem<T> {
    override fun isFulfilled(): Boolean = endTime != null

    // Fulfilled items are ordered by endTime; unfulfilled items sort after all fulfilled ones
    override fun compareTo(other: RateLimitingItem<T>): Int {
        val thisEnd = endTime
        val otherEnd = other.endTime
        return when {
            thisEnd == null && otherEnd == null -> 0
            thisEnd == null -> 1
            otherEnd == null -> -1
            else -> thisEnd.compareTo(otherEnd)
        }
    }
}
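Because findWritablePosition reads System.nanoTime() directly, the only way to exercise it is with real delays. One option is to pull the slot-selection rule out into a pure function that takes the clock as a parameter. The sketch below is a hypothetical refactoring (Slot and writableSlot are made-up names) that mirrors the rule in RateLimitBufferImpl: use an empty slot if there is one, otherwise reuse the slot of the earliest-started item only if it has finished and at least one window has passed since it ended.

```kotlin
// Hypothetical snapshot of one occupied buffer slot (times in nanoseconds).
data class Slot(val startTime: Long, val endTime: Long?)

// Returns the index to write to, or -1 if the caller should wait and retry.
// Mirrors the rule in RateLimitBufferImpl.findWritablePosition, with `now` injected.
fun writableSlot(slots: List<Slot?>, now: Long, windowNanos: Long): Int {
    val empty = slots.indexOfFirst { it == null }
    if (empty >= 0) return empty

    // Only the earliest-started item is a candidate for reuse.
    val oldest = slots.indices.minByOrNull { slots[it]!!.startTime } ?: return -1
    val end = slots[oldest]!!.endTime ?: return -1 // still in flight
    return if (now - end >= windowNanos) oldest else -1
}
```

Note a consequence of this rule: a slot whose request finished long ago is not reused while the earliest-started request is still in flight. That is conservative, since it can only make the limiter wait longer, never exceed either limit.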
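We can verify the behavior with a test case. With a capacity of 2, a duration of 1 second, and a consumer that marks each item as done as soon as it receives it, the 11 items come out in pairs at roughly 0, 1, 2, 3 and 4 seconds, with the last one at about 5 seconds. The whole run should therefore take a little over 5 seconds: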
package me.sumit.ratelimiter

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import java.time.Duration
import kotlin.system.measureTimeMillis

private val logger = KotlinLogging.logger {}

internal class RateLimiterKtTest {
    // Declared as `: Unit` because JUnit 5 only runs @Test methods that return void
    @Test
    fun rateLimiterFlowTest(): Unit = runBlocking {
        // 2 requests per second
        val buffer = RateLimitBufferImpl<Int>(2, Duration.ofSeconds(1))
        val throttledFlow = rateLimiter(flowCreator(), buffer) { startTime, endTime, item ->
            RateLimitingItemImpl(startTime, endTime, item)
        }
        logger.info { "[${Thread.currentThread().name}]: Let's start our jobs!" }
        val totalTime = measureTimeMillis {
            throttledFlow.collect { item ->
                logger.debug { "Received $item" }
                // Mark the request as completed so its slot can be reused after `duration`
                item.acknowledgement.endTime = System.nanoTime()
            }
        }
        logger.info { "Total time: $totalTime" }
        // 11 items at 2 per second: the last item is released after roughly 5 seconds
        assertTrue(totalTime > 5000, "Expected more than 5000 ms, got $totalTime")
        assertTrue(totalTime < 6000, "Expected less than 6000 ms, got $totalTime")
        logger.info { "All Done!" }
    }

    private fun flowCreator(): Flow<Int> = (1..11).asFlow()
}
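In the test, the collector handles one item at a time and marks it done immediately, so there is never more than one request in flight. In a real pipeline, the consumer would start API calls without waiting for the previous one to finish, and it should set endTime even when a call fails; otherwise that slot is never freed and the limiter stalls. Because the buffer refuses new items while all slots hold unfinished requests, this is where the in-flight limit actually kicks in. A minimal sketch, assuming kotlinx.coroutines; Ticket is a hypothetical stand-in for Throttled/RateLimitingItem and dispatchConcurrently is a made-up name:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch

// Hypothetical stand-in for Throttled<T>: the item plus its completion timestamp.
class Ticket<T>(val item: T) {
    @Volatile
    var endTime: Long? = null
}

// Starts one coroutine per ticket so calls can overlap, and stamps endTime in `finally`
// so a failed call still frees its slot. Returns once every launched call has finished.
suspend fun <T> dispatchConcurrently(tickets: Flow<Ticket<T>>, call: suspend (T) -> Unit): Unit =
    coroutineScope {
        tickets.collect { ticket ->
            launch {
                try {
                    call(ticket.item)
                } finally {
                    ticket.endTime = System.nanoTime()
                }
            }
        }
    }
```

If a call throws, coroutineScope cancels the remaining calls and rethrows; their finally blocks still run, so every ticket ends up marked as done.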
Conclusion
With Kotlin coroutines, writing safe code that handles concurrency and sequential data flow has become easier than ever. I suspect battle-tested libraries like Resilience4j, which has separate RateLimiter and Bulkhead modules, may already offer ready-made building blocks that can be composed to do this kind of client-side rate limiting.
All the code is available here: https://github.com/sumitsarkar/ratelimiter