8000 Restart ICE only on failed connection states by andresuribe87 · Pull Request #3899 · bluenviron/mediamtx · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Restart ICE only on failed connection states #3899

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions internal/protocols/webrtc/peer_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ type PeerConnection struct {
wr *webrtc.PeerConnection
stateChangeMutex sync.Mutex
newLocalCandidate chan *webrtc.ICECandidateInit
connected chan struct{}
disconnected chan struct{}
ready chan struct{}
failed chan struct{}
done chan struct{}
gatheringDone chan struct{}
incomingTrack chan trackRecvPair
Expand Down Expand Up @@ -213,8 +213,8 @@ func (co *PeerConnection) Start() error {
}

co.newLocalCandidate = make(chan *webrtc.ICECandidateInit)
co.connected = make(chan struct{})
co.disconnected = make(chan struct{})
co.ready = make(chan struct{})
co.failed = make(chan struct{})
co.done = make(chan struct{})
co.gatheringDone = make(chan struct{})
co.incomingTrack = make(chan trackRecvPair)
Expand Down Expand Up @@ -268,21 +268,22 @@ func (co *PeerConnection) Start() error {

switch state {
case webrtc.PeerConnectionStateConnected:
// for some reasons, PeerConnectionStateConnected can arrive twice.
// https://github.com/bluenviron/mediamtx/issues/3813
// PeerConnectionStateConnected can arrive twice, since state can
// switch from "disconnected" to "connected".
// contrarily, we're interested into emitting "ready" once.
select {
case <-co.connected:
case <-co.ready:
return
default:
}

co.Log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v",
co.LocalCandidate(), co.RemoteCandidate())

close(co.connected)
close(co.ready)

case webrtc.PeerConnectionStateDisconnected:
close(co.disconnected)
case webrtc.PeerConnectionStateFailed:
close(co.failed)

case webrtc.PeerConnectionStateClosed:
close(co.done)
Expand All @@ -294,7 +295,7 @@ func (co *PeerConnection) Start() error {
v := i.ToJSON()
select {
case co.newLocalCandidate <- &v:
case <-co.connected:
case <-co.ready:
case <-co.ctx.Done():
}
} else {
Expand Down Expand Up @@ -380,8 +381,8 @@ func (co *PeerConnection) waitGatheringDone(ctx context.Context) error {
}
}

// WaitUntilConnected waits until connection is established.
func (co *PeerConnection) WaitUntilConnected(
// WaitUntilReady waits until connection is established.
func (co *PeerConnection) WaitUntilReady(
ctx context.Context,
) error {
t := time.NewTimer(time.Duration(co.HandshakeTimeout))
Expand All @@ -393,7 +394,7 @@ outer:
case <-t.C:
return fmt.Errorf("deadline exceeded while waiting connection")

case <-co.connected:
case <-co.ready:
break outer

case <-ctx.Done():
Expand Down Expand Up @@ -436,7 +437,7 @@ func (co *PeerConnection) GatherIncomingTracks(ctx context.Context) ([]*Incoming
return co.incomingTracks, nil
}

case <-co.Disconnected():
case <-co.Failed():
return nil, fmt.Errorf("peer connection closed")

case <-ctx.Done():
Expand All @@ -445,14 +446,14 @@ func (co *PeerConnection) GatherIncomingTracks(ctx context.Context) ([]*Incoming
}
}

// Connected returns when connected.
func (co *PeerConnection) Connected() <-chan struct{} {
return co.connected
// Ready returns when ready.
func (co *PeerConnection) Ready() <-chan struct{} {
return co.ready
}

// Disconnected returns when disconnected.
func (co *PeerConnection) Disconnected() <-chan struct{} {
return co.disconnected
// Failed returns when failed.
func (co *PeerConnection) Failed() <-chan struct{} {
return co.failed
}

// NewLocalCandidate returns when there's a new local candidate.
Expand Down
6 changes: 3 additions & 3 deletions internal/protocols/webrtc/to_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,16 +375,16 @@ func TestToStream(t *testing.T) {
err2 := pc2.AddRemoteCandidate(cnd)
require.NoError(t, err2)

case <-pc1.Connected():
case <-pc1.Ready():
return
}
}
}()

err = pc1.WaitUntilConnected(context.Background())
err = pc1.WaitUntilReady(context.Background())
require.NoError(t, err)

err = pc2.WaitUntilConnected(context.Background())
err = pc2.WaitUntilReady(context.Background())
require.NoError(t, err)

err = pc1.OutgoingTracks[0].WriteRTP(&rtp.Packet{
Expand Down
6 changes: 3 additions & 3 deletions internal/protocols/whip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ outer:

case <-c.pc.GatheringDone():

case <-c.pc.Connected():
case <-c.pc.Ready():
break outer

case <-t.C:
Expand Down Expand Up @@ -190,7 +190,7 @@ outer:

case <-c.pc.GatheringDone():

case <-c.pc.Connected():
case <-c.pc.Ready():
break outer

case <-t.C:
Expand Down Expand Up @@ -230,7 +230,7 @@ func (c *Client) Close() error {
// Wait waits for client errors.
func (c *Client) Wait(ctx context.Context) error {
select {
case <-c.pc.Disconnected():
case <-c.pc.Failed():
return fmt.Errorf("peer connection closed")

case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion internal/servers/webrtc/publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@
return;
}

if (this.pc.iceConnectionState === 'disconnected') {
if (this.pc.iceConnectionState === 'failed') {
this.handleError('peer connection closed');
} else if (this.pc.iceConnectionState === 'connected') {
if (this.conf.onConnected !== undefined) {
Expand Down
2 changes: 1 addition & 1 deletion internal/servers/webrtc/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@
return;
}

if (this.pc.iceConnectionState === 'disconnected') {
if (this.pc.iceConnectionState === 'failed') {
this.handleError('peer connection closed');
}
};
Expand Down
8 changes: 4 additions & 4 deletions internal/servers/webrtc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *session) runPublish() (int, error) {

go s.readRemoteCandidates(pc)

err = pc.WaitUntilConnected(s.ctx)
err = pc.WaitUntilReady(s.ctx)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s *session) runPublish() (int, error) {
pc.StartReading()

select {
case <-pc.Disconnected():
case <-pc.Failed():
return 0, fmt.Errorf("peer connection closed")

case <-s.ctx.Done():
Expand Down Expand Up @@ -300,7 +300,7 @@ func (s *session) runRead() (int, error) {

go s.readRemoteCandidates(pc)

err = pc.WaitUntilConnected(s.ctx)
err = pc.WaitUntilReady(s.ctx)
if err != nil {
stream.RemoveReader(s)
return 0, err
Expand All @@ -327,7 +327,7 @@ func (s *session) runRead() (int, error) {
defer stream.RemoveReader(s)

select {
case <-pc.Disconnected():
case <-pc.Failed():
return 0, fmt.Errorf("peer connection closed")

case err := <-stream.ReaderError(s):
Expand Down
2 changes: 1 addition & 1 deletion internal/staticsources/webrtc/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestSou 5EF7 rce(t *testing.T) {
w.Write([]byte(answer.SDP))

go func() {
err3 := pc.WaitUntilConnected(context.Background())
err3 := pc.WaitUntilReady(context.Background())
require.NoError(t, err3)

err3 = outgoingTracks[0].WriteRTP(&rtp.Packet{
Expand Down
Loading
0