multi-object copy/transform: op-code 'abort'; refactoring · NVIDIA/aistore@e5ec51a · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit e5ec51a

Browse files
committed
multi-object copy/transform: op-code 'abort'; refactoring
* tco to broadcast and receive (and handle) 'abort' * rw mutex not needed (fix) * DM not needed when single-node (fix) * tcb and tco, both: - prevent re-broadcasting op-code 'abort' (fix) * sentinels: - limited usage: broadcast 'abort' and 'done', the latter on a per message basis - consistently use header fields for opcode messaging * TODO: 1) quiescing logic; 2) progress and respective timeout * check membership change upon every multi-obj control message Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent cce5f41 commit e5ec51a

File tree

8 files changed

+153
-77
lines changed

8 files changed

+153
-77
lines changed

ais/tgttxn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ func (t *target) tcobjs(c *txnSrv, msg *cmn.TCOMsg, disableDM bool) (xid string,
671671
return xid, cmn.NewErrBckNotFound(bckFrom.Bucket())
672672
}
673673
// begin
674-
custom := &xreg.TCObjsArgs{BckFrom: bckFrom, BckTo: bckTo, Msg: &msg.TCOMsg, DisableDM: disableDM}
674+
custom := &xreg.TCOArgs{BckFrom: bckFrom, BckTo: bckTo, Msg: &msg.TCOMsg, DisableDM: disableDM}
675675
rns := xreg.RenewTCObjs(c.msg.Action /*kind*/, custom)
676676
if rns.Err != nil {
677677
nlog.Errorf("%s: %q %+v %v", t, c.uuid, c.msg, rns.Err)

xact/xreg/custom.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type (
2020
Phase string
2121
DisableDM bool
2222
}
23-
TCObjsArgs struct {
23+
TCOArgs struct {
2424
BckFrom *meta.Bck
2525
BckTo *meta.Bck
2626
Msg *apc.TCOMsg

xact/xreg/multiobj.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,6 @@ func RenewPrefetch(uuid string, bck *meta.Bck, msg *apc.PrefetchMsg) RenewRes {
2222
}
2323

2424
// kind: (apc.ActCopyObjects | apc.ActETLObjects)
25-
func RenewTCObjs(kind string, custom *TCObjsArgs) RenewRes {
25+
func RenewTCObjs(kind string, custom *TCOArgs) RenewRes {
2626
return RenewBucketXact(kind, custom.BckFrom, Args{Custom: custom}, custom.BckFrom, custom.BckTo)
2727
}

xact/xs/sentinel.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ import (
2121
"github.com/NVIDIA/aistore/transport/bundle"
2222
)
2323

24-
// TODO:
25-
// - tco to include/support sentinel
26-
// - request progress only when not having timely update
27-
2824
// sentinel values
2925
const (
3026
opDone = iota + 27182
@@ -71,12 +67,18 @@ func (s *sentinel) cleanup() {
7167
s.pend.p = s.pend.p[:0]
7268
}
7369

74-
func (s *sentinel) bcast(dm *bundle.DM, abortErr error) {
70+
func (s *sentinel) bcast(uuid string, dm *bundle.DM, abortErr error) {
7571
o := transport.AllocSend()
7672
o.Hdr.Opcode = opDone
73+
if uuid != "" {
74+
o.Hdr.Opaque = cos.UnsafeB(uuid)
75+
}
7776
if abortErr != nil {
77+
if _, ok := abortErr.(*recvAbortErr); ok {
78+
return // do nothing
79+
}
7880
o.Hdr.Opcode = opAbort
79-
o.Hdr.Opaque = cos.UnsafeB(abortErr.Error()) // abort error via opaque
81+
o.Hdr.ObjName = abortErr.Error() // (compare w/ sendTerm)
8082
}
8183

8284
err := dm.Bcast(o, nil /*roc*/)
@@ -119,7 +121,8 @@ func (s *sentinel) qcb(dm *bundle.DM, tot, ival, progressTimeout time.Duration,
119121
}
120122

121123
// 2. check Smap; abort if membership changed
122-
if err := s.checkSmap(s.pend.p); err != nil {
124+
smap := core.T.Sowner().Get()
125+
if err := s.checkSmap(smap, s.pend.p); err != nil {
123126
return s._qabort(err)
124127
}
125128

@@ -152,8 +155,7 @@ func (s *sentinel) _qabort(err error) core.QuiRes {
152155
return core.QuiAborted
153156
}
154157

155-
func (s *sentinel) checkSmap(pending []string) error {
156-
smap := core.T.Sowner().Get()
158+
func (s *sentinel) checkSmap(smap *meta.Smap, pending []string) error {
157159
if nat := smap.CountActiveTs(); nat != s.nat {
158160
return cmn.NewErrMembershipChanges(fmt.Sprint(s.r.Name(), smap.String(), nat, s.nat))
159161
}
@@ -198,12 +200,15 @@ func (s *sentinel) rxDone(hdr *transport.ObjHdr) {
198200
}
199201

200202
func (s *sentinel) rxAbort(hdr *transport.ObjHdr) {
201-
if s.r.IsAborted() || s.r.Finished() {
203+
r := s.r
204+
if r.IsAborted() || r.Finished() {
202205
return
203206
}
204-
msg := cos.UnsafeS(hdr.Opaque)
205-
err := fmt.Errorf("%s: %s aborted, err: %s", s.r.Name(), meta.Tname(hdr.SID), msg)
206-
s.r.Abort(err)
207+
msg := hdr.ObjName
208+
err := &recvAbortErr{
209+
err: fmt.Errorf("%s: %s aborted, err: %s", r.Name(), meta.Tname(hdr.SID), msg),
210+
}
211+
r.Abort(err)
207212
nlog.WarningDepth(1, "recv 'abort':", err)
208213
}
209214

xact/xs/streaming.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ func (p *streamingF) WhenPrevIsRunning(xprev xreg.Renewable) (xreg.WPR, error) {
6464
return xreg.WprUse, nil
6565
}
6666

67-
// NOTE: transport endpoint (aka "trname") identifies the flow and MUST be identical
67+
// [NOTE]
68+
// transport endpoint (aka "trname") identifies the flow and MUST be identical
6869
// across all participating targets. The mechanism involves generating so-called "best-effort UUID"
6970
// independently on (by) all targets and using the latter as both xaction ID and receive endpoint (trname)
7071
// for target=>target streams.
@@ -135,6 +136,10 @@ func (p *streamingF) newDM(trname string, recv transport.RecvObj, config *cmn.Co
135136
return nil
136137
}
137138

139+
////////////////
140+
// streamingX //
141+
////////////////
142+
138143
func (r *streamingX) String() (s string) {
139144
s = r.DemandBase.String()
140145
if r.p.dm == nil {
@@ -154,23 +159,37 @@ func (r *streamingX) TxnAbort(err error) {
154159
r.Base.Finish()
155160
}
156161

157-
func (r *streamingX) sendTerm(uuid string, tsi *meta.Snode, err error) {
162+
// TODO: dup sentinel.bcast
163+
func (r *streamingX) sendTerm(uuid string, tsi *meta.Snode, abortErr error) {
158164
if r.p.dm == nil { // single target
159165
return
160166
}
161167
o := transport.AllocSend()
162168
o.Hdr.SID = core.T.SID()
163-
o.Hdr.Opaque = []byte(uuid)
164-
if err == nil {
169+
o.Hdr.Opaque = cos.UnsafeB(uuid)
170+
if abortErr == nil {
165171
o.Hdr.Opcode = opDone
166172
} else {
167173
o.Hdr.Opcode = opAbort
168-
o.Hdr.ObjName = err.Error()
174+
o.Hdr.ObjName = abortErr.Error()
169175
}
176+
177+
var err error
170178
if tsi != nil {
171-
r.p.dm.Send(o, nil, tsi) // to the responsible target
179+
err = r.p.dm.Send(o, nil, tsi) // to the responsible target
172180
} else {
173-
r.p.dm.Bcast(o, nil) // to all
181+
err = r.p.dm.Bcast(o, nil) // to all
182+
}
183+
184+
switch {
185+
case abortErr != nil:
186+
nlog.WarningDepth(1, r.String(), "aborted [", abortErr, err, "]")
187+
case err != nil:
188+
nlog.WarningDepth(1, r.String(), err)
189+
default:
190+
if cmn.Rom.FastV(4, cos.SmoduleXs) {
191+
nlog.Infoln(r.Name(), "done")
192+
}
174193
}
175194
}
176195

@@ -186,11 +205,11 @@ func (r *streamingX) fin(unreg bool) {
186205
r.Finish()
187206
if unreg && r.p.dm != nil {
188207
r.maxWt = 0
189-
hk.Reg(r.ID()+hk.NameSuffix, r.wurr, waitUnregRecv) // compare w/ lso
208+
hk.Reg(r.ID()+hk.NameSuffix, r._wurr, waitUnregRecv) // compare w/ lso
190209
}
191210
}
192211

193-
func (r *streamingX) wurr(int64) time.Duration {
212+
func (r *streamingX) _wurr(int64) time.Duration {
194213
if cnt := r.wiCnt.Load(); cnt > 0 {
195214
r.maxWt += waitUnregRecv
196215
if r.maxWt < waitUnregMax {

xact/xs/tcb.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package xs
77

88
import (
99
"encoding/binary"
10-
"fmt"
1110
"io"
1211
"strconv"
1312
"strings"
@@ -94,7 +93,6 @@ func (p *tcbFactory) Start() error {
9493

9594
r.owt = cmn.OwtCopy
9695
if p.kind == apc.ActETLBck {
97-
// TODO upon abort: call r.transform.Finish() to cleanup communicator's state
9896
r.owt = cmn.OwtTransform
9997
r.copier.getROC, r.transform, err = etl.GetOfflineTransform(args.Msg.Transform.Name, r)
10098
if err != nil {
@@ -111,11 +109,13 @@ func (p *tcbFactory) Start() error {
111109
return nil // ---->
112110
}
113111

112+
// TODO: sentinels require DM; no-DM still requires sentinels
114113
if r.args.DisableDM {
115114
return nil
116115
}
117116

118117
// data mover and sentinel
118+
// TODO: add ETL capability to provide Size(transformed-result)
119119
var sizePDU int32
120120
if p.kind == apc.ActETLBck {
121121
sizePDU = memsys.DefaultBufSize // `transport` to generate PDU-based traffic
@@ -274,7 +274,8 @@ func (r *XactTCB) init(p *tcbFactory, slab *memsys.Slab, config *cmn.Config, sma
274274
func (r *XactTCB) Run(wg *sync.WaitGroup) {
275275
// make sure `nat` hasn't changed between Start and now (highly unlikely)
276276
if r.dm != nil {
277-
if err := r.sntl.checkSmap(nil); err != nil {
277+
smap := core.T.Sowner().Get()
278+
if err := r.sntl.checkSmap(smap, nil); err != nil {
278279
r.Abort(err)
279280
wg.Done()
280281
return
@@ -291,6 +292,7 @@ func (r *XactTCB) Run(wg *sync.WaitGroup) {
291292
go worker.run(buf, slab)
292293
}
293294

295+
// run
294296
r.BckJog.Run()
295297
if r.args.Msg.Sync {
296298
r.prune.run() // the 2nd jgroup
@@ -303,14 +305,14 @@ func (r *XactTCB) Run(wg *sync.WaitGroup) {
303305
}
304306

305307
if r.dm != nil {
306-
r.sntl.bcast(r.dm, err) // broadcast: done | abort
308+
r.sntl.bcast("", r.dm, err) // broadcast: done | abort
307309
if !r.IsAborted() {
308310
r.sntl.initLast(mono.NanoTime())
309311
qui := r.Base.Quiesce(r.qival(), r.qcb) // when done: wait for others
310312
if qui == core.QuiAborted {
311313
err := r.AbortErr()
312314
debug.Assert(err != nil)
313-
r.sntl.bcast(r.dm, err) // broadcast: abort
315+
r.sntl.bcast("", r.dm, err) // broadcast: abort
314316
}
315317
}
316318
// close
@@ -384,10 +386,7 @@ func (r *XactTCB) recv(hdr *transport.ObjHdr, objReader io.Reader, err error) er
384386
case opResponse:
385387
r.sntl.rxProgress(hdr) // handle response: progress by others
386388
default:
387-
err := fmt.Errorf("invalid header opcode %d", hdr.Opcode)
388-
debug.AssertNoErr(err)
389-
r.Abort(err)
390-
return err
389+
return abortOpcode(r, hdr.Opcode)
391390
}
392391
return nil
393392
}

0 commit comments

Comments
 (0)
0