8000 [config change] add rate-limiting sections to global config and BMD · NVIDIA/aistore@8c49b6b · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content 8000

Commit 8c49b6b

Browse files
committed
[config change] add rate-limiting sections to global config and BMD
Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent f5a8126 commit 8c49b6b

File tree

7 files changed

+209
-66
lines changed

7 files changed

+209
-66
lines changed

cmn/api.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@ type (
5757
BID uint64 `json:"bid,string" list:"omit"` // unique ID
5858
Created int64 `json:"created,string" list:"readonly"` // creation timestamp
5959
Versioning VersionConf `json:"versioning"` // versioning (see "inherit")
60+
RateLimit RateLimitConf `json:"rate_limit"` // adaptive rate limiting (front, back) if enabled
6061
}
6162

6263
ExtraProps struct {
63-
AWS ExtraPropsAWS `json:"aws,omitempty" list:"omitempty"`
6464
HTTP ExtraPropsHTTP `json:"http,omitempty" list:"omitempty"`
6565
HDFS ExtraPropsHDFS `json:"hdfs,omitempty" list:"omitempty"` // NOTE: obsolete; rm with meta-version
66+
AWS ExtraPropsAWS `json:"aws,omitempty" list:"omitempty"`
6667
}
6768
ExtraToSet struct { // ref. bpropsFilterExtra
6869
AWS *ExtraPropsAWSToSet `json:"aws"`
@@ -124,6 +125,7 @@ type (
124125
Mirror *MirrorConfToSet `json:"mirror,omitempty"`
125126
EC *ECConfToSet `json:"ec,omitempty"`
126127
Access *apc.AccessAttrs `json:"access,string,omitempty"`
128+
RateLimit *RateLimitConfToSet `json:"rate_limit,omitempty"`
127129
Features *feat.Flags `json:"features,string,omitempty"`
128130
WritePolicy *WritePolicyConfToSet `json:"write_policy,omitempty"`
129131
Extra *ExtraToSet `json:"extra,omitempty"`
@@ -136,9 +138,9 @@ type (
136138
}
137139
)
138140

139-
/////////////////
140-
// Bprops //
141-
/////////////////
141+
//
142+
// bucket props (Bprops)
143+
//
142144

143145
// By default, created buckets inherit their properties from the cluster (global) configuration.
144146
// Global configuration, in turn, is protected versioned, checksummed, and replicated across the entire cluster.
@@ -168,6 +170,8 @@ func (bck *Bck) DefaultProps(c *ClusterConfig) *Bprops {
168170
if wp.Data.IsImmediate() {
169171
wp.Data = apc.WriteImmediate
170172
}
173+
174+
// inherit cluster defaults (w/ override via api.CreateBucket and api.SetBucketProps)
171175
return &Bprops{
172176
Cksum: cksum,
173177
LRU: lru,
@@ -176,6 +180,7 @@ func (bck *Bck) DefaultProps(c *ClusterConfig) *Bprops {
176180
Access: apc.AccessAll,
177181
EC: c.EC,
178182
WritePolicy: wp,
183+
RateLimit: c.RateLimit,
179184
Features: c.Features,
180185
}
181186
}

cmn/config.go

Lines changed: 157 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"encoding/json"
1111
"errors"
1212
"fmt"
13+
"math"
1314
"net/url"
1415
"os"
1516
"path"
@@ -118,6 +119,7 @@ type (
118119
Periodic PeriodConf `json:"periodic"`
119120
Mirror MirrorConf `json:"mirror" allow:"cluster"`
120121
Downloader DownloaderConf `json:"downloader"`
122+
RateLimit RateLimitConf `json:"rate_limit"`
121123

122124
// standalone enumerated features that can be configured
123125
// to flip assorted global defaults (see cmn/feat/feat.go)
@@ -155,6 +157,7 @@ type (
155157
TCB *TCBConfToSet `json:"tcb,omitempty"`
156158
WritePolicy *WritePolicyConfToSet `json:"write_policy,omitempty"`
157159
Proxy *ProxyConfToSet `json:"proxy,omitempty"`
160+
RateLimit *RateLimitConfToSet `json:"rate_limit"`
158161
Features *feat.Flags `json:"features,string,omitempty"`
159162

160163
// LocalConfig
@@ -670,6 +673,56 @@ type (
670673
}
671674
)
672675

676+
// global config that can be used to manage:
677+
// * adaptive rate limit vis-à-vis Cloud backend
678+
// * rate limiting (bursty) user workloads on the front
679+
type (
680+
RateLimitConf struct {
681+
Backend Adaptive `json:"backend"`
682+
Frontend Bursty `json:"frontend"`
683+
}
684+
RateLimitConfToSet struct {
685+
Backend *AdaptiveToSet `json:"backend,omitempty"`
686+
Frontend *BurstyToSet `json:"frontend,omitempty"`
687+
}
688+
RateLimitBase struct {
689+
Interval cos.Duration `json:"interval"`
690+
MaxTokens int `json:"max_tokens"`
691+
Enabled bool `json:"enabled"`
692+
}
693+
RateLimitBaseToSet struct {
694+
Interval *cos.Duration `json:"interval,omitempty"`
695+
MaxTokens *int `json:"max_tokens,omitempty"`
696+
Enabled *bool `json:"enabled,omitempty"`
697+
}
698+
699+
// Adaptive rate limit (a.k.a. rate shaper):
700+
// usage: handle status 429 and 503 from remote backends
701+
// - up to max tokens (originally, `MaxTokens`) during one Interval with NumRetries and
702+
// - `recompute` between Intervals
703+
// - see cmn/cos/rate_limit
704+
Adaptive struct {
705+
NumRetries int `json:"num_retries"`
706+
RateLimitBase
707+
}
708+
AdaptiveToSet struct {
709+
NumRetries *int `json:"num_retries,omitempty"`
710+
RateLimitBaseToSet
711+
}
712+
713+
// rate limit that fails 'too-many requests' while permitting a certain level of burstiness
714+
// - usage: to restrict the rate of user GET, PUT, and HEAD requests
715+
// - see cmn/cos/rate_limit
716+
Bursty struct {
717+
Size int `json:"burst_size"`
718+
RateLimitBase
719+
}
720+
BurstyToSet struct {
721+
Size *int `json:"burst_size,omitempty"`
722+
RateLimitBaseToSet
723+
}
724+
)
725+
673726
// assorted named fields that require (cluster | node) restart for changes to make an effect
674727
// (used by CLI)
675728
var ConfigRestartRequired = [...]string{"auth.secret", "memsys", "net"}
@@ -733,9 +786,9 @@ var (
733786
_ json.Unmarshaler = (*FSPConf)(nil)
734787
)
735788

736-
/////////////////////////////////////////////
737-
// Config and its nested (Cluster | Local) //
738-
/////////////////////////////////////////////
789+
//
790+
// Config and its nested (Cluster | Local) -------------------------
791+
//
739792

740793
// main config validator
741794
func (c *Config) Validate() error {
@@ -792,6 +845,50 @@ func (c *Config) TestingEnv() bool {
792845
return c.LocalConfig.TestingEnv()
793846
}
794847

848+
/////////////////
849+
// ConfigToSet //
850+
/////////////////
851+
852+
// FillFromQuery populates ConfigToSet from URL query values
853+
func (ctu *ConfigToSet) FillFromQuery(query url.Values) error {
854+
var anyExists bool
855+
for key := range query {
856+
if key == apc.ActTransient {
857+
continue
858+
}
859+
anyExists = true
860+
name, value := strings.ToLower(key), query.Get(key)
861+
if err := UpdateFieldValue(ctu, name, value); err != nil {
862+
return err
863+
}
864+
}
865+
866+
if !anyExists {
867+
return errors.New("no properties to update")
868+
}
869+
return nil
870+
}
871+
872+
func (ctu *ConfigToSet) Merge(update *ConfigToSet) {
873+
mergeProps(update, ctu)
874+
}
875+
876+
// FillFromKVS populates `ConfigToSet` from key value pairs of the form `key=value`
877+
func (ctu *ConfigToSet) FillFromKVS(kvs []string) (err error) {
878+
const format = "failed to parse `-config_custom` flag (invalid entry: %q)"
879+
for _, kv := range kvs {
880+
entry := strings.SplitN(kv, "=", 2)
881+
if len(entry) != 2 {
882+
return fmt.Errorf(format, kv)
883+
}
884+
name, value := entry[0], entry[1]
885+
if err := UpdateFieldValue(ctu, name, value); err != nil {
886+
return fmt.Errorf(format, kv)
887+
}
888+
}
889+
return
890+
}
891+
795892
///////////////////
796893
// ClusterConfig //
797894
///////////////////
@@ -825,6 +922,10 @@ func (c *LocalConfig) DelPath(mpath string) {
825922
c.FSP.Paths.Delete(mpath)
826923
}
827924

925+
//
926+
// config sections: validation, default settings, helpers ------------------------------
927+
//
928+
828929
////////////////
829930
// PeriodConf //
830931
////////////////
@@ -1281,7 +1382,7 @@ func KeepaliveRetryDuration(c *Config) time.Duration {
12811 3D11 382
}
12821383

12831384
/////////////
1284-
// NetConf //
1385+
// NetConf and NetConf.HTTPConf
12851386
/////////////
12861387

12871388
func (c *NetConf) Validate() (err error) {
@@ -1330,8 +1431,8 @@ func (c *HTTPConf) ToTLS() TLSArgs {
13301431
//////////////
13311432

13321433
const (
1333-
IOErrTimeDflt = 10 * time.Second
1334-
IOErrsLimit = 10
1434+
ioErrTimeDflt = 10 * time.Second
1435+
ioErrsLimit = 10
13351436
)
13361437

13371438
func (c *FSHCConf) Validate() error {
@@ -1343,14 +1444,14 @@ func (c *FSHCConf) Validate() error {
13431444
}
13441445

13451446
if c.IOErrs == 0 && c.IOErrTime == 0 {
1346-
c.IOErrs = IOErrsLimit
1347-
c.IOErrTime = cos.Duration(IOErrTimeDflt)
1447+
c.IOErrs = ioErrsLimit
1448+
c.IOErrTime = cos.Duration(ioErrTimeDflt)
13481449
}
13491450
if c.IOErrs == 0 {
1350-
c.IOErrs = IOErrsLimit
1451+
c.IOErrs = ioErrsLimit
13511452
}
13521453
if c.IOErrTime == 0 {
1353-
c.IOErrTime = cos.Duration(IOErrTimeDflt)
1454+
c.IOErrTime = cos.Duration(ioErrTimeDflt)
13541455
}
13551456

13561457
if c.IOErrs < 10 {
@@ -1773,8 +1874,8 @@ func (c *ResilverConf) String() string {
17731874
return confDisabled
17741875
}
17751876

1776-
///////////////////
1777-
// Tracing Conf //
1877+
/////////////////
1878+
// TracingConf //
17781879
/////////////////
17791880

17801881
const defaultSampleProbability = 1.0
@@ -1784,7 +1885,7 @@ func (c *TracingConf) Validate() error {
17841885
return nil
17851886
}
17861887
if c.ExporterEndpoint == "" {
1787-
return errors.New("invalid tracing.exporter_endpoint can't be empty when tracing is enabled")
1888+
return errors.New("tracing.exporter_endpoint can't be empty when tracing enabled")
17881889
}
17891890
if c.SamplerProbabilityStr == "" {
17901891
c.SamplerProbability = defaultSampleProbability
@@ -1798,56 +1899,61 @@ func (c *TracingConf) Validate() error {
17981899
return nil
17991900
}
18001901

1902+
// only with `oteltracing` build tag
18011903
func (tac TraceExporterAuthConf) IsEnabled() bool {
18021904
return tac.TokenFile != "" && tac.TokenHeader != ""
18031905
}
18041906

1805-
////////////////////
1806-
// ConfigToSet //
1807-
////////////////////
1907+
///////////////////
1908+
// RateLimitConf: adaptive (back) and bursty (front)
1909+
///////////////////
18081910

1809-
// FillFromQuery populates ConfigToSet from URL query values
1810-
func (ctu *ConfigToSet) FillFromQuery(query url.Values) error {
1811-
var anyExists bool
1812-
for key := range query {
1813-
if key == apc.ActTransient {
1814-
continue
1815-
}
1816-
anyExists = true
1817-
name, value := strings.ToLower(key), query.Get(key)
1818-
if err := UpdateFieldValue(ctu, name, value); err != nil {
1819-
return err
1820-
}
1821-
}
1911+
const (
1912+
dfltRateIval = time.Minute
1913+
dfltRateMaxTokens = 1000
1914+
dfltRateRetries = 3
1915+
dfltRateBurst = dfltRateMaxTokens>>1 - dfltRateMaxTokens>>3
1916+
)
18221917

1823-
if !anyExists {
1824-
return errors.New("no properties to update")
1918+
func (c *RateLimitConf) Validate() error {
1919+
const tag = "rate limit"
1920+
{
1921+
c.Backend.NumRetries = cos.NonZero(c.Backend.NumRetries, dfltRateRetries)
1922+
c.Backend.Interval = cos.Duration(cos.NonZero(c.Backend.Interval.D(), dfltRateIval))
1923+
c.Backend.MaxTokens = cos.NonZero(c.Backend.MaxTokens, dfltRateMaxTokens)
1924+
}
1925+
{
1926+
c.Frontend.Size = cos.NonZero(c.Frontend.Size, dfltRateBurst)
1927+
c.Frontend.Interval = cos.Duration(cos.NonZero(c.Frontend.Interval.D(), dfltRateIval))
1928+
c.Frontend.MaxTokens = cos.NonZero(c.Frontend.MaxTokens, dfltRateMaxTokens)
18251929
}
1826-
return nil
1827-
}
1828-
1829-
func (ctu *ConfigToSet) Merge(update *ConfigToSet) {
1830-
mergeProps(update, ctu)
1831-
}
18321930

1833-
// FillFromKVS populates `ConfigToSet` from key value pairs of the form `key=value`
1834-
func (ctu *ConfigToSet) FillFromKVS(kvs []string) (err error) {
1835-
const format = "failed to parse `-config_custom` flag (invalid entry: %q)"
1836-
for _, kv := range kvs {
1837-
entry := strings.SplitN(kv, "=", 2)
1838-
if len(entry) != 2 {
1839-
return fmt.Errorf(format, kv)
1840-
}
1841-
name, value := entry[0], entry[1]
1842-
if err := UpdateFieldValue(ctu, name, value); err != nil {
1843-
return fmt.Errorf(format, kv)
1844-
}
1931+
if c.Backend.Interval.D() < cos.DfltRateMinIval || c.Backend.Interval.D() > cos.DfltRateMaxIval {
1932+
return fmt.Errorf("%s: invalid backend.interval %v (min=%v, max=%v)", tag,
1933+
c.Backend.Interval, cos.DfltRateMinIval, cos.DfltRateMaxIval)
18451934
}
1846-
return
1935+
if c.Frontend.Interval.D() < cos.DfltRateMinIval || c.Frontend.Interval.D() > cos.DfltRateMaxIval {
1936+
return fmt.Errorf("%s: invalid frontend.interval %v (min=%v, max=%v)", tag,
1937+
c.Frontend.Interval, cos.DfltRateMinIval, cos.DfltRateMaxIval)
1938+
}
1939+
if c.Backend.MaxTokens <= 0 || c.Backend.MaxTokens >= math.MaxInt32 {
1940+
return fmt.Errorf("%s: invalid backend.max_tokens %d", tag, c.Backend.MaxTokens)
1941+
}
1942+
if c.Frontend.MaxTokens <= 0 || c.Frontend.MaxTokens >= math.MaxInt32 {
1943+
return fmt.Errorf("%s: invalid frontend.max_tokens %d", tag, c.Frontend.MaxTokens)
1944+
}
1945+
if c.Backend.NumRetries <= 0 || c.Backend.NumRetries > cos.DfltRateMaxRetries {
1946+
return fmt.Errorf("%s: invalid backend.num_retries %d", tag, c.Backend.NumRetries)
1947+
}
1948+
if c.Frontend.Size <= 0 || c.Frontend.Size > c.Frontend.MaxTokens*cos.DfltRateMaxBurstPct/100 {
1949+
return fmt.Errorf("%s: invalid frontend.burst_size %d (expecting positive integer <= (%d%% of maxTokens %d)",
1950+
tag, c.Frontend.Size, cos.DfltRateMaxBurstPct, c.Frontend.MaxTokens)
1951+
}
1952+
return nil
18471953
}
18481954

18491955
//
1850-
// misc config utils
1956+
// misc config utilities ---------------------------------------------------------
18511957
//
18521958

18531959
// checks if the two comma-separated IPv4 address lists contain at least one common IPv4

0 commit comments

Comments
 (0)
0