fix(xstream): fix memory leak in Limit by chenquan · Pull Request #31 · chenquan/go-pkg · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(xstream): fix memory leak in Limit #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 51 additions & 22 deletions xstream/xstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ func Range(source <-chan interface{}) *Stream {

// Of Returns a Stream based on the given elements.
func Of(items ...interface{}) *Stream {
source := make(chan interface{}, len(items))
n := len(items)
if n == 0 {
return Empty()
}

source := make(chan interface{}, n)
go func() {
for _, item := range items {
source <- item
Expand Down Expand Up @@ -191,20 +196,21 @@ func (s *Stream) Split(n int) *Stream {
// SplitSteam Returns a split Stream that contains multiple streams, each of chunk size n.
func (s *Stream) SplitSteam(n int) *Stream {
if n < 1 {
go drain(s.source)
panic("n should be greater than 0")
}
source := make(chan interface{})

var chunkSource = make(chan interface{}, n)
go func() {

var chunkSource = make(chan interface{}, n)
for item := range s.source {
chunkSource <- item
if len(chunkSource) == n {

source <- Range(chunkSource)
close(chunkSource)
chunkSource = nil

chunkSource = make(chan interface{}, n)
}
}
Expand Down Expand Up @@ -232,21 +238,25 @@ func (s *Stream) Sort(less LessFunc) *Stream {
}

// Tail Returns a Stream that has the last n elements.
func (s *Stream) Tail(n uint32) *Stream {

if n < 1 {
func (s *Stream) Tail(n int) *Stream {
if n <= 0 {
go drain(s.source)
if n == 0 {
return empty
}
panic("n should be greater than 0")
}

source := make(chan interface{})

go func() {
defer close(source)

ring := newRing(uint(n))
r := newRing(uint(n))
for item := range s.source {
ring.add(item)
r.add(item)
}
for _, item := range ring.take() {
for _, item := range r.take() {
source <- item
}
}()
Expand All @@ -255,7 +265,12 @@ func (s *Stream) Tail(n uint32) *Stream {
}

// Skip Returns a Stream that skips size elements.
func (s *Stream) Skip(size uint32) *Stream {
func (s *Stream) Skip(size int) *Stream {
if size < 0 {
go drain(s.source)
panic("size should be greater than 0")
}

if size == 0 {
return s
}
Expand All @@ -265,7 +280,7 @@ func (s *Stream) Skip(size uint32) *Stream {
go func() {
defer close(source)

i := uint32(0)
i := 0
for item := range s.source {
if i >= size {
source <- item
Expand All @@ -280,6 +295,7 @@ func (s *Stream) Skip(size uint32) *Stream {
// Limit Returns a Stream that contains size elements.
func (s *Stream) Limit(size int) *Stream {
if size == 0 {
go drain(s.source)
return Empty()
}
if size < 0 {
Expand All @@ -304,7 +320,6 @@ func (s *Stream) Limit(size int) *Stream {
if size > 0 {
close(source)
}

}()

return Range(source)
Expand Down Expand Up @@ -524,8 +539,7 @@ func (s *Stream) AllMach(f func(item interface{}) bool) (isFind bool) {
// If the stream has no encounter order, then any element may be returned
func (s *Stream) FindFirst() (result interface{}, err error) {

for item := range s.source {
result = item
for result = range s.source {
go drain(s.source)
return
}
Expand All @@ -534,6 +548,21 @@ func (s *Stream) FindFirst() (result interface{}, err error) {
return
}

// FindLast consumes the whole stream and returns the final element it received.
// When the stream yields no element at all, it returns nil and a non-nil error.
// If the stream has no encounter order, then any element may be returned.
func (s *Stream) FindLast() (result interface{}, err error) {
	seen := false
	for item := range s.source {
		// Keep overwriting so that only the most recent element survives the loop.
		result = item
		seen = true
	}

	if !seen {
		return nil, errors.New("no element")
	}

	return result, nil
}

// Peek Returns a Stream consisting of the elements of this stream,
// additionally performing the provided action on each element as elements are consumed from the resulting stream.
func (s *Stream) Peek(f ForEachFunc) *Stream {
Expand All @@ -556,19 +585,19 @@ func (s *Stream) Copy() *Stream {
data = append(data, v)
}

c1 := make(chan interface{}, len(data))
c2 := make(chan interface{}, len(data))
originChan := make(chan interface{}, len(data))
copyChan := make(chan interface{}, len(data))

go func() {
for v := range data {
c1 <- v
c2 <- v
originChan <- v
copyChan <- v
}
close(c1)
close(c2)
close(originChan)
close(copyChan)
}()
s.source = c1
return Range(c2)
s.source = originChan
return Range(copyChan)
}

func drain(channel <-chan interface{}) {
Expand Down
22 changes: 21 additions & 1 deletion xstream/xstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ func TestStream_Tail(t *testing.T) {
equal(t, Of(1, 232, 3, 2, 3).Tail(8), []interface{}{1, 232, 3, 2, 3})
}
// TestTailZero exercises Tail with non-positive sizes.
func TestTailZero(t *testing.T) {
// Called outside assert.Panics, so a panic raised here would fail the test.
Of(1, 2, 3, 4).Tail(0).Done()

assert.Panics(t, func() {
// NOTE(review): this listing looks like a flattened diff, so the Tail(0)
// call below may be the removed pre-change line — confirm against the merged file.
Of(1, 2, 3, 4).Tail(0).Done()
Of(1, 2, 3, 4).Tail(-1).Done()

})
}

Expand All @@ -179,6 +182,9 @@ func TestStream_Skip(t *testing.T) {
assertEqual(t, 1, Of(1, 2, 3, 4).Skip(3).Count())
equal(t, Of(1, 2, 3, 4).Skip(3), []interface{}{4})
equal(t, Of(1, 2, 3).Skip(0), []interface{}{1, 2, 3})
assert.Panics(t, func() {
Of(1, 2, 3).Skip(-1)
})

}
func TestStream_Limit(t *testing.T) {
Expand Down Expand Up @@ -389,3 +395,17 @@ func TestStream_Collection(t *testing.T) {
assert.EqualValues(t, []int{1, 2, 3}, ints)
})
}

// TestStream_FindLast checks FindLast on a populated stream and on an empty one.
func TestStream_FindLast(t *testing.T) {
	cases := []struct {
		name    string
		items   []interface{}
		want    interface{}
		wantErr bool
	}{
		{name: "has value", items: []interface{}{1, 2, 3}, want: 3},
		{name: "hasn't value", wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := Of(tc.items...).FindLast()
			if tc.wantErr {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
			assert.EqualValues(t, tc.want, got)
		})
	}
}
0