@@ -202,6 +202,7 @@ func (c *getJogger) copyMissingReplicas(ctx *restoreCtx, reader cos.ReadOpenClos
202
202
nlog .Errorf ("%s failed to send %s to %v: %v" , core .T , ctx .lom , daemons , err )
203
203
}
204
204
freeObject (reader )
205
+ srcReader .Close ()
205
206
}
206
207
src := & dataSource {
207
208
reader : srcReader ,
@@ -477,13 +478,21 @@ func cksumSlice(reader io.Reader, recvCksum *cos.Cksum, objName string) error {
477
478
return err
478
479
}
479
480
481
+ func closeReaders (objs []io.ReadCloser ) {
482
+ for _ , obj := range objs {
483
+ if obj != nil {
484
+ obj .Close ()
485
+ }
486
+ }
487
+ }
488
+
480
489
// Reconstruct the main object from slices. Returns the list of reconstructed slices.
481
490
func (c * getJogger ) restoreMainObj (ctx * restoreCtx ) ([]* slice , error ) {
482
491
var (
483
492
err error
484
493
sliceCnt = ctx .meta .Data + ctx .meta .Parity
485
494
sliceSize = SliceSize (ctx .meta .Size , ctx .meta .Data )
486
- readers = make ([]io.Reader , sliceCnt )
495
+ readers = make ([]io.ReadCloser , sliceCnt )
487
496
writers = make ([]io.Writer , sliceCnt )
488
497
restored = make ([]* slice , sliceCnt )
489
498
cksums = make ([]* cos.CksumHash , sliceCnt )
@@ -511,7 +520,7 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
511
520
continue
512
521
}
513
522
514
- var cksmReader io.Reader
523
+ var cksmReader io.ReadCloser
515
524
if sgl , ok := sl .writer .(* memsys.SGL ); ok {
516
525
readers [i ] = memsys .NewReader (sgl )
517
526
cksmReader = memsys .NewReader (sgl )
@@ -536,9 +545,11 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
536
545
}
537
546
readers [i ] = nil
538
547
}
548
+ cksmReader .Close ()
539
549
}
540
550
541
551
if err != nil {
552
+ closeReaders (readers )
542
553
return restored , err
543
554
}
544
555
@@ -547,10 +558,16 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
547
558
}
548
559
stream , err := reedsolomon .NewStreamC (ctx .meta .Data , ctx .meta .Parity , true , true )
549
560
if err != nil {
561
+ closeReaders (readers )
550
562
return restored , err
551
563
}
552
564
553
- if err := stream .Reconstruct (readers , writers ); err != nil {
565
+ rebuildReaders := make ([]io.Reader , len (readers ))
566
+ for i , rdr := range readers {
567
+ rebuildReaders [i ] = rdr
568
+ }
569
+ if err := stream .Reconstruct (rebuildReaders , writers ); err != nil {
570
+ closeReaders (readers )
554
571
return restored , err
555
572
}
556
573
@@ -565,7 +582,7 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
565
582
}
566
583
567
584
version := ""
568
- srcReaders := make ([]io.Reader , ctx .meta .Data )
585
+ srcReaders := make ([]io.ReadCloser , ctx .meta .Data )
569
586
for i := range ctx .meta .Data {
570
587
if ctx .slices [i ] != nil && ctx .slices [i ].writer != nil {
571
588
if version == "" {
@@ -575,10 +592,12 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
575
592
srcReaders [i ] = memsys .NewReader (sgl )
576
593
} else {
577
594
if ctx .slices [i ].workFQN == "" {
595
+ closeReaders (readers )
578
596
return restored , fmt .Errorf ("invalid writer: %T" , ctx .slices [i ].writer )
579
597
}
580
598
srcReaders [i ], err = cos .NewFileHandle (ctx .slices [i ].workFQN )
581
599
if err != nil {
600
+ closeReaders (readers )
582
601
return restored , err
583
602
}
584
603
}
@@ -592,18 +611,22 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
592
611
if restored [i ].workFQN != "" {
593
612
srcReaders [i ], err = cos .NewFileHandle (restored [i ].workFQN )
594
613
if err != nil {
614
+ closeReaders (srcReaders )
615
+ closeReaders (readers )
595
616
return restored , err
596
617
}
597
618
} else {
598
619
sgl , ok := restored [i ].obj .(* memsys.SGL )
599
620
if ! ok {
621
+ closeReaders (srcReaders )
622
+ closeReaders (readers )
600
623
return restored , fmt .Errorf ("empty slice %s[%d]" , ctx .lom , i )
601
624
}
602
625
srcReaders [i ] = memsys .NewReader (sgl )
603
626
}
604
627
}
605
628
606
- src := io . MultiReader (srcReaders ... )
629
+ src := newMultiReader (srcReaders ... )
607
630
if cmn .Rom .FastV (4 , cos .SmoduleEC ) {
608
631
nlog .Infof ("Saving main object %s to %q" , ctx .lom , ctx .lom .FQN )
609
632
}
@@ -615,13 +638,15 @@ func (c *getJogger) restoreMainObj(ctx *restoreCtx) ([]*slice, error) {
615
638
mainMeta := * ctx .meta
616
639
mainMeta .SliceID = 0
617
640
args := & WriteArgs {
618
- Reader : src ,
641
+ Reader : src . mr ,
619
642
MD : mainMeta .NewPack (),
620
643
Cksum : cos .NewCksum (cksumType , "" ),
621
644
Generation : mainMeta .Generation ,
622
645
Xact : c .parent ,
623
646
}
624
647
err = WriteReplicaAndMeta (ctx .lom , args )
648
+ src .Close ()
649
+ closeReaders (readers )
625
650
return restored , err
626
651
}
627
652
@@ -732,14 +757,15 @@ func (c *getJogger) uploadRestoredSlices(ctx *restoreCtx, slices []*slice) error
732
757
}
733
758
734
759
// Every slice's SGL is freed upon transfer completion
735
- cb := func (daemonID string , s * slice ) transport.ObjSentCB {
760
+ cb := func (daemonID string , s * slice , rdr cos. ReadOpenCloser ) transport.ObjSentCB {
736
761
return func (_ * transport.ObjHdr , _ io.ReadCloser , _ any , err error ) {
737
762
if err != nil {
738
763
nlog .Errorf ("%s failed to send %s to %v: %v" , core .T , ctx .lom , daemonID , err )
739
764
}
740
765
s .free ()
766
+ rdr .Close ()
741
767
}
742
- }(tid , sl )
768
+ }(tid , sl , reader )
743
769
if err := c .parent .writeRemote ([]string {tid }, ctx .lom , dataSrc , cb ); err != nil {
744
770
remoteErr = err
745
771
nlog .Errorf ("%s failed to send slice %s[%d] to %s" , core .T , ctx .lom , sliceID , tid )
0 commit comments