[config change] add cluster config (tco, arch) sections; tcb burst · NVIDIA/aistore@fc3d8f3 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit fc3d8f3

Browse files
committed
[config change] add cluster config (tco, arch) sections; tcb burst
* add `TCOConf` and `ArchConf` (new)
* add TCB (channel size) burst
* all three sections (`TCBConf`, `TCOConf`, `ArchConf`) will now have same knobs
  - embed the same struct
  - honor configured compression
  - i.e., utilize transport to compress in flight
  - share the same config validation
* burst (chan size): 256 minimum w/ validated upper limit 10,000
* max parallelism += 4

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent a419869 commit fc3d8f3

File tree

10 files changed

+99
-48
lines changed

10 files changed

+99
-48
lines changed

cmd/aisinit/config.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@ var (
3737
}
3838

3939
defaultTCB = aiscmn.TCBConf{
40-
Compression: aisapc.CompressNever,
41-
SbundleMult: 2,
40+
XactConf: aiscmn.XactConf{
41+
Compression: aisapc.CompressNever,
42+
SbundleMult: 2,
43+
Burst: 512,
44+
},
4245
}
4346

4447
defaultDisk = aiscmn.DiskConf{

cmn/config.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ type (
102102
Proxy ProxyConf `json:"proxy" allow:"cluster"`
103103
Auth AuthConf `json:"auth"`
104104
Cksum CksumConf `json:"checksum"`
105-
TCB TCBConf `json:"tcb"` // transform (offline) or copy src bucket => dst bucket
105+
TCB TCBConf `json:"tcb"`
106+
TCO TCOConf `json:"tco"`
107+
Arch ArchConf `json:"arch"`
106108
Tracing TracingConf `json:"tracing"`
107109
Keepalive KeepaliveConf `json:"keepalivetracker"`
108110
Rebalance RebalanceConf `json:"rebalance" allow:"cluster"`
@@ -655,15 +657,30 @@ type (
655657
MinPctFree *int `json:"min_pct_free,omitempty"`
656658
}
657659

658-
TCBConf struct {
660+
// generic xaction --
661+
XactConf struct {
659662
Compression string `json:"compression"` // enum { CompressAlways, ... } in api/apc/compression.go
660663
SbundleMult int `json:"bundle_multiplier"` // stream-bundle multiplier: num streams to destination
664+
Burst int `json:"burst_buffer"` // xaction channel (buffer) size
661665
}
662-
TCBConfToSet struct {
666+
XactConfToSet struct {
663667
Compression *string `json:"compression,omitempty"`
664668
SbundleMult *int `json:"bundle_multiplier,omitempty"`
669+
Burst *int `json:"burst_buffer,omitempty"`
665670
}
666671

672+
// bucket-to-bucket copy/transform
673+
TCBConf struct{ XactConf }
674+
TCBConfToSet struct{ XactConfToSet }
675+
676+
// multi-object copy/transform
677+
TCOConf struct{ XactConf }
678+
TCOConfToSet struct{ XactConfToSet }
679+
680+
// multi-object archive (multiple objects => shard)
681+
ArchConf struct{ XactConf }
682+
ArchConfToSet struct{ XactConfToSet }
683+
667684
WritePolicyConf struct {
668685
Data apc.WritePolicy `json:"data"`
669686
MD apc.WritePolicy `json:"md"`
@@ -1779,18 +1796,21 @@ func (c *TransportConf) Validate() (err error) {
17791796
return nil
17801797
}
17811798

1782-
/////////////
1783-
// TCBConf //
1784-
/////////////
1799+
//////////////
1800+
// XactConf //
1801+
//////////////
17851802

1786-
func (c *TCBConf) Validate() error {
1787-
if c.SbundleMult < 0 || c.SbundleMult > 16 {
1788-
return fmt.Errorf("invalid tcb.bundle_multiplier: %v (expected range [0, 16])", c.SbundleMult)
1789-
}
1803+
func (c *XactConf) Validate() error {
17901804
if !apc.IsValidCompression(c.Compression) {
1791-
return fmt.Errorf("invalid tcb.compression: %q (expecting one of: %v)",
1805+
return fmt.Errorf("invalid compression: %q (expecting one of: %v)",
17921806
c.Compression, apc.SupportedCompression)
17931807
}
1808+
if c.SbundleMult < 0 || c.SbundleMult > 16 {
1809+
return fmt.Errorf("invalid bundle_multiplier: %v (expected range [0, 16])", c.SbundleMult)
1810+
}
1811+
if c.Burst < 0 || c.Burst > 10_000 {
1812+
return fmt.Errorf("invalid burst_buffer: %v (expected range [0, 10000])", c.Burst)
1813+
}
17941814
return nil
17951815
}
17961816

xact/xs/archive.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/NVIDIA/aistore/core/meta"
2929
"github.com/NVIDIA/aistore/fs"
3030
"github.com/NVIDIA/aistore/transport"
31+
"github.com/NVIDIA/aistore/transport/bundle"
3132
"github.com/NVIDIA/aistore/xact"
3233
"github.com/NVIDIA/aistore/xact/xreg"
3334
)
@@ -114,18 +115,31 @@ func (p *archFactory) Start() (err error) {
114115
if err != nil {
115116
return err
116117
}
117-
//
118+
118119
// new x-archive
119-
//
120-
r := &XactArch{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}}
121-
r.pending.m = make(map[string]*archwi, maxNumInParallel)
120+
var (
121+
config = cmn.GCO.Get()
122+
burst = max(256, config.Arch.Burst)
123+
r = &XactArch{streamingX: streamingX{p: &p.streamingF, config: config}}
124+
)
125+
r.pending.m = make(map[string]*archwi, burst)
126+
122127
avail := fs.GetAvail()
123128
r.joggers.m = make(map[string]*jogger, len(avail))
124129
r.smap = core.T.Sowner().Get()
125130
p.xctn = r
126-
r.DemandBase.Init(p.UUID(), p.kind, "" /*delay ctlmsg until Do()*/, p.Bck /*from*/, xact.IdleDefault)
127131

128-
if err := p.newDM(p.Args.UUID /*trname*/, r.recv, r.config, r.smap, cmn.OwtPut, 0 /*pdu*/); err != nil {
132+
// delay ctlmsg until DoMsg()
133+
r.DemandBase.Init(p.UUID(), p.kind, "" /*ctlmsg*/, p.Bck /*from*/, xact.IdleDefault)
134+
135+
dmxtra := bundle.Extra{
136+
RecvAck: nil, // no ACKs
137+
Config: r.config,
138+
Compression: r.config.Arch.Compression,
139+
Multiplier: r.config.Arch.SbundleMult,
140+
SizePDU: 0,
141+
}
142+
if err := p.newDM(p.Args.UUID /*trname*/, r.recv, r.smap, dmxtra, cmn.OwtPut); err != nil {
129143
return err
130144
}
131145

@@ -163,6 +177,7 @@ func (r *XactArch) BeginMsg(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err erro
163177

164178
// bind a new/existing jogger to this archwi based on archlom's mountpath
165179
var (
180+
burst = max(256, r.config.Arch.Burst)
166181
mpath = archlom.Mountpath()
167182
exists bool
168183
)
@@ -171,7 +186,7 @@ func (r *XactArch) BeginMsg(msg *cmn.ArchiveBckMsg, archlom *core.LOM) (err erro
171186
r.joggers.m[mpath.Path] = &jogger{
172187
r: r,
173188
mpath: mpath,
174-
workCh: make(chan *archtask, maxNumInParallel*2), // TODO -- FIXME: unify
189+
workCh: make(chan *archtask, burst),
175190
}
176191
wi.j = r.joggers.m[mpath.Path]
177192
r.joggers.wg.Add(1)
@@ -253,7 +268,7 @@ func (r *XactArch) DoMsg(msg *cmn.ArchiveBckMsg) {
253268
// lrpWorkersNone since we need a single writer to serialize adding files
254269
// into an eventual `archlom`
255270
lrit := &lrit{}
256-
err := lrit.init(r, &msg.ListRange, r.Bck(), nwpNone)
271+
err := lrit.init(r, &msg.ListRange, r.Bck(), nwpNone, r.config.Arch.Burst)
257272
if err != nil {
258273
r.Abort(err)
259274
r.DecPending()

xact/xs/evict.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func newEvictDelete(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.ListR
7676

7777
// default num-workers hardcoded
7878
// (currently, always num mountpaths)
79-
if err := r.lrit.init(r, msg, bck, nwpDflt); err != nil {
79+
if err := r.lrit.init(r, msg, bck, nwpDflt, 0 /*burst*/); err != nil {
8080
return nil, err
8181
}
8282

xact/xs/lrit.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ type (
9191
// lrit //
9292
//////////
9393

94-
func (r *lrit) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, numWorkers int) error {
94+
func (r *lrit) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, numWorkers, confBurst int) error {
9595
l := len(fs.GetAvail())
9696
if l == 0 {
9797
xctn.Abort(cmn.ErrNoMountpaths)
@@ -138,18 +138,18 @@ func (r *lrit) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, numWorkers i
138138
return err
139139
}
140140

141-
r._iniNwp(numWorkers)
141+
r._iniNwp(numWorkers, confBurst)
142142
return nil
143143
}
144144

145-
func (r *lrit) _iniNwp(numWorkers int) {
145+
func (r *lrit) _iniNwp(numWorkers, confBurst int) {
146146
r.nwp.workers = make([]*lrworker, 0, numWorkers)
147147
for range numWorkers {
148148
r.nwp.workers = append(r.nwp.workers, &lrworker{r})
149149
}
150150

151151
// [burst] work channel capacity: up to 4 pending work items per
152-
r.nwp.workCh = make(chan lrpair, numWorkers*nwpBurst)
152+
r.nwp.workCh = make(chan lrpair, max(numWorkers*nwpBurst, confBurst))
153153
nlog.Infoln(r.parent.Name(), "workers:", numWorkers)
154154
}
155155

xact/xs/prefetch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (*prfFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {
105105
func newPrefetch(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.PrefetchMsg) (r *prefetch, err error) {
106106
r = &prefetch{config: cmn.GCO.Get(), msg: msg}
107107

108-
err = r.lrit.init(r, &msg.ListRange, bck, msg.NumWorkers)
108+
err = r.lrit.init(r, &msg.ListRange, bck, msg.NumWorkers, 0 /*burst*/)
109109
if err != nil {
110110
return nil, err
111111
}

xact/xs/streaming.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ const (
3131
waitRegRecv = 4 * time.Second
3232
waitUnregRecv = 2 * waitRegRecv
3333
waitUnregMax = 2 * waitUnregRecv
34-
35-
// TODO -- FIXME: grow 2x upon chanFull
36-
maxNumInParallel = 256
3734
)
3835

3936
type (
@@ -106,16 +103,14 @@ func (p *streamingF) genBEID(fromBck, toBck *meta.Bck) (string, error) {
106103
return "", err
107104
}
108105

109-
func (p *streamingF) newDM(trname string, recv transport.RecvObj, config *cmn.Config, smap *meta.Smap, owt cmn.OWT, sizePDU int32) error {
106+
func (p *streamingF) newDM(trname string, recv transport.RecvObj, smap *meta.Smap, dmxtra bundle.Extra, owt cmn.OWT) error {
110107
if err := core.InMaintOrDecomm(smap, core.T.Snode(), p.xctn); err != nil {
111108
return err
112109
}
113110
if smap.CountActiveTs() <= 1 {
114111
return nil
115112
}
116113

117-
// consider adding config.X.Compression, config.X.SbundleMult (currently, always 1), etc.
118-
dmxtra := bundle.Extra{Config: config, Multiplier: 1, SizePDU: sizePDU}
119114
p.dm = bundle.NewDM(trname, recv, owt, dmxtra)
120115

121116
err := p.dm.RegRecv()

xact/xs/tcb.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (r *XactTCB) _iniNwp(numWorkers int) {
165165
for range numWorkers {
166166
r.numwp.workers = append(r.numwp.workers, tcbworker{r})
167167
}
168-
r.numwp.workCh = make(chan core.LIF, numWorkers*nwpBurst)
168+
r.numwp.workCh = make(chan core.LIF, max(numWorkers*nwpBurst, r.Config.TCB.Burst))
169169
r.numwp.stopCh = cos.NewStopCh()
170170
nlog.Infoln(r.Name(), "workers:", numWorkers)
171171
}
@@ -333,6 +333,7 @@ func (r *XactTCB) Run(wg *sync.WaitGroup) {
333333
r.dm.UnregRecv()
334334
}
335335
if r.args.Msg.Sync {
336+
// TODO -- FIXME: revisit stopCh and related
336337
r.prune.wait()
337338
}
338339

xact/xs/tcobjs.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/NVIDIA/aistore/memsys"
2727
"github.com/NVIDIA/aistore/stats"
2828
"github.com/NVIDIA/aistore/transport"
29+
"github.com/NVIDIA/aistore/transport/bundle"
2930
"github.com/NVIDIA/aistore/xact"
3031
"github.com/NVIDIA/aistore/xact/xreg"
3132
)
@@ -100,9 +101,16 @@ func (p *tcoFactory) Start() error {
100101
p.Args.UUID = PrefixTcoID + uuid
101102

102103
// new x-tco
103-
workCh := make(chan *cmn.TCOMsg, maxNumInParallel)
104-
r := &XactTCO{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, args: p.args, workCh: workCh}
105-
r.pend.m = make(map[string]*tcowi, maxNumInParallel)
104+
var (
105+
config = cmn.GCO.Get()
106+
burst = max(256, config.TCO.Burst)
107+
r = &XactTCO{
108+
streamingX: streamingX{p: &p.streamingF, config: config},
109+
args: p.args,
110+
workCh: make(chan *cmn.TCOMsg, burst),
111+
}
112+
)
113+
r.pend.m = make(map[string]*tcowi, burst)
106114
r.owt = cmn.OwtCopy
107115

108116
if p.kind == apc.ActETLObjects {
@@ -131,7 +139,14 @@ func (p *tcoFactory) Start() error {
131139

132140
// TODO: sentinels require DM; no-DM still requires sentinels
133141
if useDM := !r.args.DisableDM; useDM && nat > 1 {
134-
if err := p.newDM(p.Args.UUID /*trname*/, r.recv, r.config, smap, r.owt, sizePDU); err != nil {
142+
dmxtra := bundle.Extra{
143+
RecvAck: nil, // no ACKs
144+
Config: r.config,
145+
Compression: r.config.TCO.Compression,
146+
Multiplier: r.config.TCO.SbundleMult,
147+
SizePDU: sizePDU,
148+
}
149+
if err := p.newDM(p.Args.UUID /*trname*/, r.recv, smap, dmxtra, r.owt); err != nil {
135150
return err
136151
}
137152
}
@@ -231,7 +246,7 @@ func (r *XactTCO) doMsg(msg *cmn.TCOMsg) (stop bool) {
231246
wi.pend.n.Store(int64(r.sntl.nat - 1)) // must dec down to zero
232247

233248
lrit := &lrit{}
234-
if err := lrit.init(r, &msg.ListRange, r.Bck(), msg.NumWorkers); err != nil {
249+
if err := lrit.init(r, &msg.ListRange, r.Bck(), msg.NumWorkers, r.config.TCO.Burst); err != nil {
235250
r.AddErr(err)
236251
return !msg.ContinueOnError // stop?
237252
}
@@ -248,6 +263,7 @@ func (r *XactTCO) doMsg(msg *cmn.TCOMsg) (stop bool) {
248263
}
249264
// run
250265
if msg.Sync && lrit.lrp != lrpList {
266+
// TODO -- FIXME: revisit stopCh and related
251267
wg = &sync.WaitGroup{}
252268
wg.Add(1)
253269
go func(pt *cos.ParsedTemplate, wg *sync.WaitGroup) {
@@ -256,11 +272,11 @@ func (r *XactTCO) doMsg(msg *cmn.TCOMsg) (stop bool) {
256272
}(lrit.pt.Clone(), wg)
257273
}
258274
err := lrit.run(wi, smap, true /*prealloc buf*/)
275+
276+
lrit.wait()
259277
if wg != nil {
260278
wg.Wait()
261279
}
262-
263-
lrit.wait()
264280
if r.IsAborted() {
265281
return true // stop
266282
}
@@ -437,7 +453,7 @@ func (r *XactTCO) prune(pruneit *lrit, smap *meta.Smap, pt *cos.ParsedTemplate)
437453
var syncit lrit
438454
debug.Assert(pruneit.lrp == lrpRange)
439455

440-
err := syncit.init(pruneit.parent, pruneit.msg, rp.bckTo, nwpDflt) // TODO -- FIXME: stopCh
456+
err := syncit.init(pruneit.parent, pruneit.msg, rp.bckTo, nwpDflt, r.config.TCO.Burst)
441457
debug.AssertNoErr(err)
442458
syncit.pt = pt
443459
syncwi := &syncwi{&rp} // reusing only prune.do (and not init/run/wait)

xact/xs/xutils.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,11 @@ func (rate *tcrate) acquire() {
6666
//
6767

6868
const (
69-
nwpBurst = 4 // burst multiplier (channel size)
70-
nwpNone = -1 // no workers at all - iterated LOMs get executed by the (iterating) goroutine
71-
nwpMin = 2 // throttled
72-
nwpDflt = 0 // (number of mountpaths)
69+
nwpBurst = 32 // work channel burst multiplier (channel size = burst * num-workers)
70+
71+
nwpNone = -1 // no workers at all - iterated LOMs get executed by the (iterating) goroutine
72+
nwpMin = 2 // throttled
73+
nwpDflt = 0 // (number of mountpaths)
7374
)
7475

7576
// strict rules
@@ -82,8 +83,8 @@ func throttleNwp(xname string, n int) (int, error) {
8283
return nwpNone, nil
8384
}
8485

85-
// 2. do not exceed num available cores
86-
n = min(sys.MaxParallelism(), n)
86+
// 2. number of available cores(*)
87+
n = min(sys.MaxParallelism()+4, n)
8788

8889
// 3. factor in memory pressure
8990
var (

0 commit comments

Comments
 (0)
0