8000 EC: close all readers when rebuilding object from slices · NVIDIA/aistore@54ebc11 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 54ebc11

Browse files
VladimirMarkelov and alex-aizman
authored and committed
EC: close all readers when rebuilding object from slices
* fix file descriptor leakage
Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent 26ed156 commit 54ebc11

File tree

2 files changed

+67
-8
lines changed

2 files changed

+67
-8
lines changed

ec/getjogger.go

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ func (c *getJogger) copyMissingReplicas(ctx *restoreCtx, reader cos.ReadOpenClos
202202
nlog.Errorf("%s failed to send %s to %v: %v", core.T, ctx.lom, daemons, err)
203203
}
204204
freeObject(reader)
205+
srcReader.Close()
205206
}
206207
src := &dataSource{
207208
reader: srcReader,
@@ -477,13 +478,21 @@ func cksumSlice(reader io.Reader, recvCksum *cos.Cksum, objName string) error {
477478
return err
478479
}
479480

481+
// closeReaders closes every non-nil reader in objs.
//
// It is a best-effort cleanup helper: errors returned by Close are
// intentionally discarded, and nil slots (slices that were never opened or
// were reset by the caller) are skipped.
//
// NOTE(review): the nil check only detects an untyped nil interface value.
// If a caller stores a nil concrete pointer in the slice (for example the
// result of a constructor that failed), Close is still invoked on it -
// confirm callers never leave such an entry behind.
func closeReaders(objs []io.ReadCloser) {
	for _, obj := range objs {
		if obj == nil {
			continue
		}
		// Nothing useful can be done with a Close failure on a read-only
		// source during cleanup, so the error is dropped explicitly.
		_ = obj.Close()
	}
}
488+
480489
// Reconstruct the main object from slices. Returns the list of reconstructed slices.
481490
func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
482491
var (
483492
err error
484493
sliceCnt = ctx.meta.Data + ctx.meta.Parity
485494
sliceSize = SliceSize(ctx.meta.Size, ctx.meta.Data)
486-
readers = make([]io.Reader, sliceCnt)
495+
readers = make([]io.ReadCloser, sliceCnt)
487496
writers = make([]io.Writer, sliceCnt)
488497
restored = make([]*slice, sliceCnt)
489498
cksums = make([]*cos.CksumHash, sliceCnt)
@@ -511,7 +520,7 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
511520
continue
512521
}
513522

514-
var cksmReader io.Reader
523+
var cksmReader io.ReadCloser
515524
if sgl, ok := sl.writer.(*memsys.SGL); ok {
516525
readers[i] = memsys.NewReader(sgl)
517526
cksmReader = memsys.NewReader(sgl)
@@ -536,9 +545,11 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
536545
}
537546
readers[i] = nil
538547
}
548+
cksmReader.Close()
539549
}
540550

541551
if err != nil {
552+
closeReaders(readers)
542553
return restored, err
543554
}
544555

@@ -547,10 +558,16 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
547558
}
548559
stream, err := reedsolomon.NewStreamC(ctx.meta.Data, ctx.meta.Parity, true, true)
549560
if err != nil {
561+
closeReaders(readers)
550562
return restored, err
551563
}
552564

553-
if err := stream.Reconstruct(readers, writers); err != nil {
565+
rebuildReaders := make([]io.Reader, len(readers))
566+
for i, rdr := range readers {
567+
rebuildReaders[i] = rdr
568+
}
569+
if err := stream.Reconstruct(rebuildReaders, writers); err != nil {
570+
closeReaders(readers)
554571
return restored, err
555572
}
556573

@@ -565,7 +582,7 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
565582
}
566583

567584
version := ""
568-
srcReaders := make([]io.Reader, ctx.meta.Data)
585+
srcReaders := make([]io.ReadCloser, ctx.meta.Data)
569586
for i := range ctx.meta.Data {
570587
if ctx.slices[i] != nil && ctx.slices[i].writer != nil {
571588
if version == "" {
@@ -575,10 +592,12 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
575592
srcReaders[i] = memsys.NewReader(sgl)
576593
} else {
577594
if ctx.slices[i].workFQN == "" {
595+
closeReaders(readers)
578596
return restored, fmt.Errorf("invalid writer: %T", ctx.slices[i].writer)
579597
}
580598
srcReaders[i], err = cos.NewFileHandle(ctx.slices[i].workFQN)
581599
if err != nil {
600+
closeReaders(readers)
582601
return restored, err
583602
}
584603
}
@@ -592,18 +611,22 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
592611
if restored[i].workFQN != "" {
593612
srcReaders[i], err = cos.NewFileHandle(restored[i].workFQN)
594613
if err != nil {
614+
closeReaders(srcReaders)
615+
closeReaders(readers)
595616
return restored, err
596617
}
597618
} else {
598619
sgl, ok := restored[i].obj.(*memsys.SGL)
599620
if !ok {
621+
closeReaders(srcReaders)
622+
closeReaders(readers)
600623
return restored, fmt.Errorf("empty slice %s[%d]", ctx.lom, i)
601624
}
602625
srcReaders[i] = memsys.NewReader(sgl)
603626
}
604627
}
605628

606-
src := io.MultiReader(srcReaders...)
629+
src := newMultiReader(srcReaders...)
607630
if cmn.Rom.FastV(4, cos.SmoduleEC) {
608631
nlog.Infof("Saving main object %s to %q", ctx.lom, ctx.lom.FQN)
609632
}
@@ -615,13 +638,15 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
615638
mainMeta := *ctx.meta
616639
mainMeta.SliceID = 0
617640
args := &WriteArgs{
618-
Reader: src,
641+
Reader: src.mr,
619642
MD: mainMeta.NewPack(),
620643
Cksum: cos.NewCksum(cksumType, ""),
621644
Generation: mainMeta.Generation,
622645
Xact: c.parent,
623646
}
624647
err = WriteReplicaAndMeta(ctx.lom, args)
648+
src.Close()
649+
closeReaders(readers)
625650
return restored, err
626651
}
627652

@@ -732,14 +757,15 @@ func (c *getJogger) uploadRestoredSlices(ctx *restoreCtx, slices []*slice) error
732757
}
733758

734759
// Every slice's SGL is freed upon transfer completion
735-
cb := func(daemonID string, s *slice) transport.ObjSentCB {
760+
cb := func(daemonID string, s *slice, rdr cos.ReadOpenCloser) transport.ObjSentCB {
736761
return func(_ *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
737762
if err != nil {
738763
nlog.Errorf("%s failed to send %s to %v: %v", core.T, ctx.lom, daemonID, err)
739764
}
740765
s.free()
766+
rdr.Close()
741767
}
742-
}(tid, sl)
768+
}(tid, sl, reader)
743769
if err := c.parent.writeRemote([]string{tid}, ctx.lom, dataSrc, cb); err != nil {
744770
remoteErr = err
745771
nlog.Errorf("%s failed to send slice %s[%d] to %s", core.T, ctx.lom, sliceID, tid)

ec/multireader.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Package ec provides erasure coding (EC) based data protection for AIStore.
2+
/*
3+
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
4+
*/
5+
package ec
6+
7+
import "io"
8+
9+
// multiReader presents several io.ReadClosers as one sequential stream and
// keeps references to them so that all can be released with a single Close.
type multiReader struct {
	mr  io.Reader       // standard io.MultiReader built over rcs, in order
	rcs []io.ReadCloser // underlying readers to close
}

// interface guard
var _ io.ReadCloser = (*multiReader)(nil)

// newMultiReader wraps rcs into a multiReader. The readers are consumed in
// the order given; the slice itself is retained (not copied) for Close.
func newMultiReader(rcs ...io.ReadCloser) *multiReader {
	readers := make([]io.Reader, len(rcs))
	for i, rc := range rcs {
		readers[i] = rc
	}
	return &multiReader{
		mr:  io.MultiReader(readers...),
		rcs: rcs,
	}
}

// Read reads from the concatenation of the underlying readers.
func (mr *multiReader) Read(p []byte) (int, error) { return mr.mr.Read(p) }

// Close closes every underlying reader and returns the first error
// encountered, if any. A failure to close one reader does not prevent the
// remaining ones from being closed, and nil entries are skipped.
func (mr *multiReader) Close() (err error) {
	for _, rc := range mr.rcs {
		if rc == nil {
			continue
		}
		// Keep going on error so that no descriptor is leaked; remember
		// only the first failure rather than overwriting it.
		if e := rc.Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

0 commit comments

Comments
 (0)
0