Expose rate limit bucket tokens as metric (#502)
This commit is contained in:
+19
-3
@@ -111,13 +111,13 @@ func (c *Connector) UseClientFilter(filter *ClientFilter) {
|
||||
c.clientFilter = filter
|
||||
}
|
||||
|
||||
func (c *Connector) StartAcceptingConnections(listenAddress string, connRateLimit int) error {
|
||||
func (c *Connector) StartAcceptingConnections(listenAddress string, connRateLimit int, metricsPeriod time.Duration) error {
|
||||
ln, err := c.createListener(listenAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go c.acceptConnections(ln, connRateLimit)
|
||||
go c.acceptConnections(ln, connRateLimit, metricsPeriod)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -201,11 +201,14 @@ func (c *Connector) AcceptConnection(conn net.Conn) {
|
||||
go c.HandleConnection(conn)
|
||||
}
|
||||
|
||||
func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) {
|
||||
func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int, metricsPeriod time.Duration) {
|
||||
//noinspection GoUnhandledErrorResult
|
||||
defer ln.Close()
|
||||
|
||||
bucket := ratelimit.NewBucketWithRate(float64(connRateLimit), int64(connRateLimit*2))
|
||||
if metricsPeriod > 0 {
|
||||
go c.bucketMetrics(bucket, metricsPeriod)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -223,6 +226,19 @@ func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connector) bucketMetrics(bucket *ratelimit.Bucket, period time.Duration) {
|
||||
ticker := time.NewTicker(period)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.metrics.RateLimitAvailable.Set(float64(bucket.Available()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connector) HandleConnection(frontendConn net.Conn) {
|
||||
c.metrics.ConnectionsFrontend.Add(1)
|
||||
//noinspection GoUnhandledErrorResult
|
||||
|
||||
Reference in New Issue
Block a user