8000 enable/disable rate-limited backends · NVIDIA/aistore@c4b796a · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit c4b796a

Browse files
committed
enable/disable rate-limited backends
* part two, prev. commit: 666796f Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent 666796f commit c4b796a

File tree

3 files changed

+88
-66
lines changed

3 files changed

+88
-66
lines changed

ais/rlbackend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (bp *rlbackend) retry(ctx context.Context, err error, bck *meta.Bck, verb,
103103

104104
_ = ctx // TODO -- FIXME: in progress
105105
vlabs := map[string]string{stats.VarlabBucket: bck.Cname(""), stats.VarlabXactKind: ""}
106-
bp.t.StatsUpdater().AddWith(
106+
bp.t.statsT.AddWith(
107107
cos.NamedVal64{Name: metric, Value: int64(sleep), VarLabs: vlabs},
108108
)
109109
return sleep

ais/target.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ func (*target) interruptedRestarted() (i, r bool) {
106106

107107
func (t *target) Backend(bck *meta.Bck) core.Backend { // as core.Target
108108
if bck.IsRemoteAIS() {
109-
return t._rlbp(t.bps[apc.AIS], bck.Props, apc.AIS)
109+
aisbp := t.bps[apc.AIS]
110+
return t._rlbp(aisbp, bck.Props, apc.AIS)
110111
}
111112
provider := bck.Provider
112113
if bck.Props != nil {
@@ -133,12 +134,12 @@ func (t *target) _rlbp(bp core.Backend, bprops *cmn.Bprops, provider string) cor
133134
return t.rlbps[provider]
134135
}
135136

136-
func (t *target) initBackends(tstats *stats.Trunner) {
137+
func (t *target) initBackends() {
137138
t.bps = make(backends, 8)
138139
t.rlbps = make(rlbackends, 8)
139140

140141
config := cmn.GCO.Get()
141-
aisbp := backend.NewAIS(t, tstats, true)
142+
aisbp := backend.NewAIS(t, t.statsT, true)
142143
t.bps[apc.AIS] = aisbp // always present
143144

144145
if aisConf := config.Backend.Get(apc.AIS); aisConf != nil {
@@ -149,7 +150,7 @@ func (t *target) initBackends(tstats *stats.Trunner) {
149150
}
150151
}
151152

152-
if err := t.initBuiltTagged(tstats, config, true /*starting up*/); err != nil {
153+
if err := t.initBuiltTagged(config, true /*starting up*/); err != nil {
153154
cos.ExitLog(err)
154155
}
155156

@@ -161,31 +162,35 @@ func (t *target) initBackends(tstats *stats.Trunner) {
161162

162163
// - remote (e.g. cloud) backends w/ empty stubs unless populated via build tags
163164
// - enabled/disabled via config.Backend
164-
func (t *target) initBuiltTagged(tstats *stats.Trunner, config *cmn.Config, startingUp bool) error {
165-
var enabled, disabled, notlinked []string
166-
165+
func (t *target) initBuiltTagged(config *cmn.Config, startingUp bool) error {
166+
var (
167+
enabled []string
168+
disabled []string
169+
notlinked []string
170+
tstats = t.statsT
171+
)
167172
for provider := range apc.Providers {
168173
var (
169-
add core.Backend
174+
bp core.Backend
170175
err error
171176
)
172177
switch provider {
173178
case apc.AWS:
174-
add, err = backend.NewAWS(t, tstats, startingUp)
179+
bp, err = backend.NewAWS(t, tstats, startingUp)
175180
case apc.GCP:
176-
add, err = backend.NewGCP(t, tstats, startingUp)
181+
bp, err = backend.NewGCP(t, tstats, startingUp)
177182
case apc.Azure:
178-
add, err = backend.NewAzure(t, tstats, startingUp)
183+
bp, err = backend.NewAzure(t, tstats, startingUp)
179184
case apc.OCI:
180-
add, err = backend.NewOCI(t, tstats, startingUp)
185+
bp, err = backend.NewOCI(t, tstats, startingUp)
181186
case apc.HT:
182-
add, err = backend.NewHT(t, config, tstats, startingUp)
187+
bp, err = backend.NewHT(t, config, tstats, startingUp)
183188
case apc.AIS:
184189
continue
185190
default:
186191
return fmt.Errorf("unknown backend provider %q", provider)
187192
}
188-
t.bps[provider] = add
193+
t.bps[provider] = bp
189194

190195
configured := config.Backend.Get(provider) != nil
191196
switch {
@@ -429,7 +434,7 @@ func (t *target) Run() error {
429434

430435
tstats.RegMetrics(t.si)
431436

432-
t.initBackends(tstats) // (+ reg backend metrics)
437+
t.initBackends() // (+ reg backend metrics)
433438

434439
// end target metrics -----------------------
435440

ais/tgtcp.go

Lines changed: 67 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"github.com/NVIDIA/aistore/nl"
3030
"github.com/NVIDIA/aistore/reb"
3131
"github.com/NVIDIA/aistore/res"
32-
"github.com/NVIDIA/aistore/stats"
3332
"github.com/NVIDIA/aistore/xact"
3433
"github.com/NVIDIA/aistore/xact/xreg"
3534
jsoniter "github.com/json-iterator/go"
@@ -183,34 +182,38 @@ func (t *target) daeputMsg(w http.ResponseWriter, r *http.Request) {
183182
errorsOnly := msg.Value.(bool)
184183
t.statsT.ResetStats(errorsOnly)
185184
case apc.ActReloadBackendCreds:
186-
var (
187-
tstats = t.statsT.(*stats.Trunner)
188-
provider = msg.Name
189-
)
190-
if provider == "" { // all
191-
if err := t.initBuiltTagged(t.statsT.(*stats.Trunner), cmn.GCO.Get(), false); err != nil {
185+
provider := msg.Name
186+
187+
// all
188+
if provider == "" {
189+
if err := t.initBuiltTagged(cmn.GCO.Get(), false); err != nil {
192190
t.writeErr(w, r, err)
193191
}
192+
// rate-limited wrappers, respectively
193+
for provider, bp := range t.bps {
194+
t.rlbps[provider] = &rlbackend{Backend: bp, t: t}
195+
}
194196
return
195197
}
196-
197198
// one
198-
var add core.Backend
199+
var bp core.Backend
199200
switch provider {
200201
case apc.AWS:
201-
add, err = backend.NewAWS(t, tstats, false /*starting up*/)
202+
bp, err = backend.NewAWS(t, t.statsT, false /*starting up*/)
202203
case apc.GCP:
203-
add, err = backend.NewGCP(t, tstats, false)
204+
bp, err = backend.NewGCP(t, t.statsT, false)
204205
case apc.Azure:
205-
add, err = backend.NewAzure(t, tstats, false)
206+
bp, err = backend.NewAzure(t, t.statsT, false)
206207
case apc.OCI:
207-
add, err = backend.NewOCI(t, tstats, false)
208+
bp, err = backend.NewOCI(t, t.statsT, false)
208209
}
209210
if err != nil {
210211
t.writeErr(w, r, err)
211212
return
212213
}
213-
t.bps[provider] = add
214+
debug.Assert(bp != nil)
215+
t.bps[provider] = bp
216+
t.rlbps[provider] = &rlbackend{Backend: bp, t: t}
214217
case apc.ActStartMaintenance:
215218
if !t.ensureIntraControl(w, r, true /* from primary */) {
216219
return
@@ -246,7 +249,7 @@ func (t *target) daeputMsg(w http.ResponseWriter, r *http.Request) {
246249
}
247250

248251
func (t *target) daeputItems(w http.ResponseWriter, r *http.Request, apiItems []string) {
249-
switch apiItems[0] {
252+
switch act := apiItems[0]; act {
250253
case apc.Proxy:
251254
// PUT /v1/daemon/proxy/newprimaryproxyid
252255
t.daeSetPrimary(w, r, apiItems)
@@ -263,41 +266,53 @@ func (t *target) daeputItems(w http.ResponseWriter, r *http.Request, apiItems []
263266
t.handleMpathReq(w, r)
264267
case apc.ActSetConfig: // set-config #1 - via query parameters and "?n1=v1&n2=v2..."
265268
t.setDaemonConfigQuery(w, r)
266-
case apc.ActEnableBackend:
267-
t.regstate.mu.Lock()
268-
t.enableBackend(w, r, apiItems)
269-
t.regstate.mu.Unlock()
270-
case apc.ActDisableBackend:
269+
case apc.ActEnableBackend, apc.ActDisableBackend:
271270
t.regstate.mu.Lock()
272-
t.disableBackend(w, r, apiItems)
273-
t.regstate.mu.Unlock()
271+
defer t.regstate.mu.Unlock()
272+
if len(apiItems) < 3 { // act, provider, phase
273+
t.writeErrURL(w, r)
274+
return
275+
}
276+
var (
277+
provider = apiItems[1]
278+
phase = apiItems[2]
279+
)
280+
if !apc.IsCloudProvider(provider) {
281+
t.writeErrf(w, r, "expecting cloud storage provider (have %q)", provider)
282+
return
283+
}
284+
if phase != apc.ActBegin && phase != apc.ActCommit {
285+
t.writeErrf(w, r, "expecting 'begin' or 'commit' phase (have %q)", phase)
286+
return
287+
}
288+
if act == apc.ActEnableBackend {
289+
t.enableBackend(w, r, provider, phase)
290+
} else {
291+
t.disableBackend(w, r, provider, phase)
292+
}
274293
case apc.LoadX509:
275294
t.daeLoadX509(w, r)
276295
}
277296
}
278297

279-
func (t *target) enableBackend(w http.ResponseWriter, r *http.Request, items []string) {
298+
func (t *target) enableBackend(w http.ResponseWriter, r *http.Request, provider, phase string) {
280299
var (
281-
provider = items[1]
282-
phase = items[2]
283-
config = cmn.GCO.Get()
300+
config = cmn.GCO.Get()
284301
)
285-
debug.Assert(apc.IsCloudProvider(provider), provider)
286-
debug.Assert(phase == apc.ActBegin || phase == apc.ActCommit, phase)
287-
288302
_, ok := config.Backend.Providers[provider]
289303
if !ok {
290304
t.writeErrf(w, r, "backend %q is not configured, cannot enable", provider)
291305
return
292306
}
293307
bp, k := t.bps[provider]
294308
debug.Assert(k, provider)
295-
if bp != nil {
296-
// TODO: return http.StatusNoContent
309+
310+
switch {
311+
case bp != nil:
297312
t.writeErrf(w, r, "backend %q is already enabled, nothing to do", provider)
298-
return
299-
}
300-
if phase == apc.ActCommit {
313+
case phase == apc.ActBegin:
314+
nlog.Infof("ready to enable backend %q", provider)
315+
default:
301316
var err error
302317
switch provider {
303318
case apc.AWS:
@@ -314,37 +329,37 @@ func (t *target) enableBackend(w http.ResponseWriter, r *http.Request, items []s
314329
t.writeErr(w, r, err)
315330
return
316331
}
332+
333+
debug.Assert(bp != nil)
317334
t.bps[provider] = bp
335+
t.rlbps[provider] = &rlbackend{Backend: bp, t: t}
336+
337+
nlog.Infof("enabled backend %q", provider)
318338
}
319-
nlog.Infoln(phase+":", "enable", provider)
320339
}
321340

322-
func (t *target) disableBackend(w http.ResponseWriter, r *http.Request, items []string) {
341+
func (t *target) disableBackend(w http.ResponseWriter, r *http.Request, provider, phase string) {
323342
var (
324-
provider = items[1]
325-
phase = items[2]
326-
config = cmn.GCO.Get()
343+
config = cmn.GCO.Get()
327344
)
328-
debug.Assert(apc.IsCloudProvider(provider), provider)
329-
debug.Assert(phase == apc.ActBegin || phase == apc.ActCommit, phase)
330-
331345
_, ok := config.Backend.Providers[provider]
332346
if !ok {
333-
// TODO: return http.StatusNoContent
334347
t.writeErrf(w, r, "backend %q is not configured, nothing to do", provider)
335348
return
336349
}
337350
bp, k := t.bps[provider]
338351
debug.Assert(k, provider)
339-
if bp == nil {
340-
// TODO: return http.StatusNoContent
352+
353+
switch {
354+
case bp == nil:
341355
t.writeErrf(w, r, "backend %q is already disabled, nothing to do", provider)
342-
return
343-
}
344-
if phase == apc.ActCommit {
356+
case phase == apc.ActBegin:
357+
nlog.Infof("ready to disable backend %q", provider)
358+
default:
359+
// NOTE: not locking bp := t.Backend()
345360
t.bps[provider] = nil
361+
nlog.Infof("disabled backend %q", provider)
346362
}
347-
nlog.Infoln(phase+":", "disable", provider)
348363
}
349364

350365
func (t *target) daeSetPrimary(w http.ResponseWriter, r *http.Request, apiItems []string) {
@@ -1205,7 +1220,9 @@ func (t *target) receiveConfig(newConfig *globalConfig, msg *actMsgExt, payload
12051220
if aisConf := newConfig.Backend.Get(apc.AIS); aisConf != nil {
12061221
err = t.attachDetachRemAis(newConfig, msg)
12071222
} else {
1208-
t.bps[apc.AIS] = backend.NewAIS(t, t.statsT, false)
1223+
aisbp := backend.NewAIS(t, t.statsT, false)
1224+
t.bps[apc.AIS] = aisbp
1225+
t.rlbps[apc.AIS] = &rlbackend{Backend: aisbp, t: t}
12091226
}
12101227
}
12111228
return

0 commit comments

Comments
 (0)
0