8000 hls: support routing absolute timestamp (#1300) by aler9 · Pull Request #4372 · bluenviron/mediamtx · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

hls: support routing absolute timestamp (#1300) #4372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [Forward streams to other servers](#forward-streams-to-other-servers)
* [Proxy requests to other servers](#proxy-requests-to-other-servers)
* [On-demand publishing](#on-demand-publishing)
* [Route absolute timestamps](#route-absolute-timestamps)
* [Expose the server in a subfolder](#expose-the-server-in-a-subfolder)
* [Start on boot](#start-on-boot)
* [Linux](#linux)
Expand Down Expand Up @@ -1706,6 +1707,17 @@ paths:

The command inserted into `runOnDemand` will start only when a client requests the path `ondemand`, therefore the file will start streaming only when requested.

### Route absolute timestamps

Some streaming protocols allow routing absolute timestamps, associated with each frame, which are useful for synchronizing several video or data streams together. In particular, _MediaMTX_ supports receiving absolute timestamps with the following protocols:

* HLS (through the `EXT-X-PROGRAM-DATE-TIME` tag in playlists)

and supports sending absolute timestamps with the following protocols:

* HLS (through the `EXT-X-PROGRAM-DATE-TIME` tag in playlists)
* RTSP (through RTCP reports)

### Expose the server in a subfolder

HTTP-based services (WebRTC, HLS, Control API, Playback Server, Metrics, pprof) can be exposed in a subfolder of an existing HTTP server or reverse proxy. The reverse proxy must be able to intercept HTTP requests addressed to MediaMTX and corresponding responses, and perform the following changes:
Expand Down
58 changes: 52 additions & 6 deletions internal/protocols/hls/to_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,20 @@
"github.com/bluenviron/gohlslib/v2/pkg/codecs"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)

// ntpState tracks whether the absolute timestamp provided by the HLS client
// is being used as the NTP of the units written to the stream.
type ntpState int

const (
	// ntpStateInitial is the zero value: no unit has been handled yet, so
	// availability of the absolute timestamp has not been checked.
	ntpStateInitial ntpState = iota
	// ntpStateUnavailable means the absolute timestamp was not available when
	// the first unit was handled; the current time is used instead.
	ntpStateUnavailable
	// ntpStateAvailable means the absolute timestamp was available when the
	// first unit was handled and is used for every unit.
	ntpStateAvailable
	// ntpStateDegraded means the absolute timestamp appeared only after the
	// stream had started; it is ignored and the current time keeps being used.
	ntpStateDegraded
)

func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d
dec := v % d
Expand All @@ -23,6 +33,42 @@
tracks []*gohlslib.Track,
stream **stream.Stream,
) ([]*description.Media, error) {
var state ntpState

handleNTP := func(track *gohlslib.Track) time.Time {
switch state {
case ntpStateInitial:
ntp, avail := c.AbsoluteTime(track)
if !avail {
state = ntpStateUnavailable
return time.Now()
}

Check warning on line 45 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L43-L45

Added lines #L43 - L45 were not covered by tests

state = ntpStateAvailable
return ntp

case ntpStateAvailable:
ntp, avail := c.AbsoluteTime(track)
if !avail {
panic("should not happen")

Check warning on line 53 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L50-L53

Added lines #L50 - L53 were not covered by tests
}

return ntp

Check warning on line 56 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L56

Added line #L56 was not covered by tests

case ntpStateUnavailable:
_, avail := c.AbsoluteTime(track)
if avail {
(*stream).Parent.Log(logger.Warn, "absolute timestamp appeared after stream started, we are not using it")
state = ntpStateDegraded
}

Check warning on line 63 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L58-L63

Added lines #L58 - L63 were not covered by tests

return time.Now()

Check warning on line 65 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L65

Added line #L65 was not covered by tests

default: // ntpStateDegraded
return time.Now()

Check warning on line 68 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L67-L68

Added lines #L67 - L68 were not covered by tests
}
}

var medias []*description.Media //nolint:prealloc

for _, track := range tracks {
Expand All @@ -41,7 +87,7 @@
c.OnDataAV1(track, func(pts int64, tu [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.AV1{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),

Check warning on line 90 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L90

Added line #L90 was not covered by tests
PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
},
TU: tu,
Expand All @@ -59,7 +105,7 @@
c.OnDataVP9(track, func(pts int64, frame []byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.VP9{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),

Check warning on line 108 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L108

Added line #L108 was not covered by tests
PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
},
Frame: frame,
Expand All @@ -80,7 +126,7 @@
c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.H265{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),

Check warning on line 129 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L129

Added line #L129 was not covered by tests
PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
},
AU: au,
Expand All @@ -101,7 +147,7 @@
c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.H264{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),
PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
},
AU: au,
Expand All @@ -120,7 +166,7 @@
c.OnDataOpus(track, func(pts int64, packets [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.Opus{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),

Check warning on line 169 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L169

Added line #L169 was not covered by tests
PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), int64(clockRate)),
},
Packets: packets,
Expand All @@ -142,7 +188,7 @@
c.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG4Audio{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),

Check warning on line 191 in internal/protocols/hls/to_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/hls/to_stream.go#L191

Added line #L191 was not covered by tests
PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), int64(clockRate)),
},
AUs: aus,
Expand Down
126 changes: 126 additions & 0 deletions internal/protocols/hls/to_stream_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package hls

import (
"context"
"net"
"net/http"
"testing"
"time"

"github.com/bluenviron/gohlslib/v2"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/bluenviron/mediamtx/internal/unit"
"github.com/stretchr/testify/require"
)

Expand All @@ -14,3 +24,119 @@ func TestToStreamNoSupportedCodecs(t *testing.T) {

// this is impossible to test since currently we support all gohlslib.Tracks.
// func TestToStreamSkipUnsupportedTracks(t *testing.T)

// TestToStream serves a small HLS playlist carrying an EXT-X-PROGRAM-DATE-TIME
// tag, reads it with a gohlslib client wired through ToStream, and checks that
// the first unit delivered to a stream reader carries that date as its NTP.
func TestToStream(t *testing.T) {
	track1 := &mpegts.Track{
		Codec: &mpegts.CodecH264{},
	}

	s := &http.Server{
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			switch {
			case r.Method == http.MethodGet && r.URL.Path == "/stream.m3u8":
				w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`)
				w.Write([]byte("#EXTM3U\n" +
					"#EXT-X-VERSION:3\n" +
					"#EXT-X-ALLOW-CACHE:NO\n" +
					"#EXT-X-TARGETDURATION:2\n" +
					"#EXT-X-MEDIA-SEQUENCE:0\n" +
					"#EXT-X-PROGRAM-DATE-TIME:2018-05-20T08:17:15Z\n" +
					"#EXTINF:2,\n" +
					"segment1.ts\n" +
					"#EXTINF:2,\n" +
					"segment2.ts\n" +
					"#EXTINF:2,\n" +
					"segment2.ts\n" +
					"#EXT-X-ENDLIST\n"))

			case r.Method == http.MethodGet && r.URL.Path == "/segment1.ts":
				w.Header().Set("Content-Type", `video/MP2T`)

				w := &mpegts.Writer{W: w, Tracks: []*mpegts.Track{track1}}
				err := w.Initialize()
				require.NoError(t, err)

				err = w.WriteH264(track1, 2*90000, 2*90000, [][]byte{
					{7, 1, 2, 3}, // SPS
					{8},          // PPS
				})
				require.NoError(t, err)

			case r.Method == http.MethodGet && r.URL.Path == "/segment2.ts":
				w.Header().Set("Content-Type", `video/MP2T`)

				w := &mpegts.Writer{W: w, Tracks: []*mpegts.Track{track1}}
				err := w.Initialize()
				require.NoError(t, err)

				err = w.WriteH264(track1, 2*90000, 2*90000, [][]byte{
					{5, 1},
				})
				require.NoError(t, err)
			}
		}),
	}

	ln, err := net.Listen("tcp", "localhost:5780")
	require.NoError(t, err)

	go s.Serve(ln)
	defer s.Shutdown(context.Background())

	var strm *stream.Stream

	// done is closed once the first unit has been checked.
	done := make(chan struct{})

	reader := test.NilLogger

	var c *gohlslib.Client
	c = &gohlslib.Client{
		URI: "http://localhost:5780/stream.m3u8",
		OnTracks: func(tracks []*gohlslib.Track) error {
			medias, err2 := ToStream(c, tracks, &strm)
			require.NoError(t, err2)
			require.Equal(t, []*description.Media{{
				Type: description.MediaTypeVideo,
				Formats: []format.Format{&format.H264{
					PayloadTyp:        96,
					PacketizationMode: 1,
				}},
			}}, medias)

			// the stream is created here because it needs the medias
			// returned by ToStream; ToStream only dereferences the
			// pointer when data arrives.
			strm = &stream.Stream{
				WriteQueueSize:     512,
				UDPMaxPayloadSize:  1472,
				Desc:               &description.Session{Medias: medias},
				GenerateRTPPackets: true,
				Parent:             test.NilLogger,
			}
			err2 = strm.Initialize()
			require.NoError(t, err2)

			strm.AddReader(
				reader,
				medias[0],
				medias[0].Formats[0],
				func(u unit.Unit) error {
					// Only the first unit is checked. The playlist lists
					// several segments, so further units may be delivered
					// before the reader is removed; closing done a second
					// time would panic.
					// NOTE(review): assumes callbacks of a single reader are
					// invoked sequentially - confirm against stream.Stream.
					select {
					case <-done:
						return nil
					default:
					}

					require.Equal(t, time.Date(2018, time.May, 20, 8, 17, 15, 0, time.UTC), u.GetNTP())
					close(done)
					return nil
				})

			strm.StartReader(reader)

			return nil
		},
	}
	err = c.Start()
	require.NoError(t, err)
	defer c.Close()

	// wait for the first unit, or fail if the client stops first.
	select {
	case <-done:
	case err := <-c.Wait():
		t.Error(err.Error())
	}

	strm.RemoveReader(reader)
	strm.Close()
}
95 changes: 46 additions & 49 deletions internal/staticsources/hls/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"

"github.com/bluenviron/mediamtx/internal/conf"
Expand Down Expand Up @@ -36,54 +35,52 @@ func TestSource(t *testing.T) {
track2,
}

gin.SetMode(gin.ReleaseMode)
router := gin.New()

router.GET("/stream.m3u8", func(ctx *gin.Context) {
ctx.Header("Content-Type", `application/vnd.apple.mpegurl`)
ctx.Writer.Write([]byte("#EXTM3U\n" +
"#EXT-X-VERSION:3\n" +
"#EXT-X-ALLOW-CACHE:NO\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MEDIA-SEQUENCE:0\n" +
"#EXTINF:2,\n" +
"segment1.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXT-X-ENDLIST\n"))
})

router.GET("/segment1.ts", func(ctx *gin.Context) {
ctx.Header("Content-Type", `video/MP2T`)

w := &mpegts.Writer{W: ctx.Writer, Tracks: tracks}
err := w.Initialize()
require.NoError(t, err)

err = w.WriteMPEG4Audio(track2, 1*90000, [][]byte{{1, 2, 3, 4}})
require.NoError(t, err)

err = w.WriteH264(track1, 2*90000, 2*90000, [][]byte{
{7, 1, 2, 3}, // SPS
{8}, // PPS
})
require.NoError(t, err)
})

router.GET("/segment2.ts", func(ctx *gin.Context) {
ctx.Header("Content-Type", `video/MP2T`)

w := &mpegts.Writer{W: ctx.Writer, Tracks: tracks}
err := w.Initialize()
require.NoError(t, err)

err = w.WriteMPEG4Audio(track2, 3*90000, [][]byte{{1, 2, 3, 4}})
require.NoError(t, err)
})

s := &http.Server{Handler: router}
s := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && r.URL.Path == "/stream.m3u8":
w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`)
w.Write([]byte("#EXTM3U\n" +
"#EXT-X-VERSION:3\n" +
"#EXT-X-ALLOW-CACHE:NO\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MEDIA-SEQUENCE:0\n" +
"#EXTINF:2,\n" +
"segment1.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXT-X-ENDLIST\n"))

case r.Method == http.MethodGet && r.URL.Path == "/segment1.ts":
w.Header().Set("Content-Type", `video/MP2T`)

w := &mpegts.Writer{W: w, Tracks: tracks}
err := w.Initialize()
require.NoError(t, err)

err = w.WriteMPEG4Audio(track2, 1*90000, [][]byte{{1, 2, 3, 4}})
require.NoError(t, err)

err = w.WriteH264(track1, 2*90000, 2*90000, [][]byte{
{7, 1, 2, 3}, // SPS
{8}, // PPS
})
require.NoError(t, err)

case r.Method == http.MethodGet && r.URL.Path == "/segment2.ts":
w.Header().Set("Content-Type", `video/MP2T`)

w := &mpegts.Writer{W: w, Tracks: tracks}
err := w.Initialize()
require.NoError(t, err)

err = w.WriteMPEG4Audio(track2, 3*90000, [][]byte{{1, 2, 3, 4}})
require.NoError(t, err)
}
}),
}

ln, err := net.Listen("tcp", "localhost:5780")
require.NoError(t, err)
Expand Down
Loading
0