Skip to content

Commit

Permalink
Fixes #70: leaky bucket overflow bug
Browse files Browse the repository at this point in the history
  • Loading branch information
mennanov committed Jul 17, 2024
1 parent 647439b commit a5665ab
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 11 deletions.
7 changes: 3 additions & 4 deletions leakybucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type LeakyBucket struct {
backend LeakyBucketStateBackend
clock Clock
logger Logger
// Capacity is the maximum allowed number of tockens in the bucket.
// Capacity is the maximum allowed number of tokens in the bucket.
capacity int64
// Rate is the output rate: 1 request per the rate duration (in nanoseconds).
rate int64
Expand All @@ -66,8 +66,7 @@ func NewLeakyBucket(capacity int64, rate time.Duration, locker DistLocker, leaky
}

// Limit returns the time duration to wait before the request can be processed.
// If the last request happened earlier than the rate this method returns zero duration.
// It returns ErrLimitExhausted if the the request overflows the bucket's capacity. In this case the returned duration
// It returns ErrLimitExhausted if the request overflows the bucket's capacity. In this case the returned duration
// means how long it would have taken to wait for the request to be processed if the bucket was not overflowed.
func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error) {
t.mu.Lock()
Expand Down Expand Up @@ -100,7 +99,7 @@ func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error) {
}

wait := state.Last - now
if wait/t.rate > t.capacity {
if wait/t.rate >= t.capacity {
return time.Duration(wait), ErrLimitExhausted
}
if err = t.backend.SetState(ctx, state); err != nil {
Expand Down
21 changes: 14 additions & 7 deletions leakybucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

l "github.com/mennanov/limiters"
)
Expand Down Expand Up @@ -116,32 +117,38 @@ func (s *LimitersTestSuite) TestLeakyBucketOverflow() {
for name, bucket := range s.leakyBuckets(capacity, rate, clock) {
s.Run(name, func() {
clock.reset()
// The first call has no wait since there were no calls before. It does not increment the queue size.
// The first call has no wait since there were no calls before.
wait, err := bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(time.Duration(0), wait)
// The second call increments the queue size by 1.
wait, err = bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(rate, wait)
// The third call increments the queue size by 1.
wait, err = bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(rate*2, wait)
// The third call overflows the bucket capacity.
wait, err = bucket.Limit(context.TODO())
s.Require().Equal(l.ErrLimitExhausted, err)
s.Equal(rate*3, wait)
s.Equal(rate*2, wait)
// Move the Clock 1 position forward.
clock.Sleep(rate)
// Retry the last call. This time it should succeed.
wait, err = bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(rate*2, wait)
s.Equal(rate, wait)
})
}
}

func TestLeakyBucket_ZeroCapacity_ReturnsError(t *testing.T) {
capacity := int64(0)
rate := time.Hour
logger := l.NewStdLogger()
bucket := l.NewLeakyBucket(capacity, rate, l.NewLockNoop(), l.NewLeakyBucketInMemory(), newFakeClock(), logger)
wait, err := bucket.Limit(context.TODO())
require.Equal(t, l.ErrLimitExhausted, err)
require.Equal(t, time.Duration(0), wait)
}

func BenchmarkLeakyBuckets(b *testing.B) {
s := new(LimitersTestSuite)
s.SetT(&testing.T{})
Expand Down

0 comments on commit a5665ab

Please sign in to comment.