zl程序教程

您现在的位置是:首页 >  其它

当前栏目

pion实现录制WebRTC流

实现 WebRTC 录制
时间:2023-09-11 14:16:28

推荐一个零声学院免费公开课程,个人觉得老师讲得不错,分享给大家:Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习
stream_test.go

package stream

import (
	"testing"
	"time"
)

// TestStream is an integration test: it pulls a stream for ten seconds and
// then stops the recording. It needs the media server at the hard-coded
// address to be reachable and currently publishing.
func TestStream(t *testing.T) {
	stream, err := NewStream("172.24.116.214", "5314d4e", "8402180", "D:\\workspace\\biz\\record\\")
	if err != nil {
		// Without this check a failed connection turns into a nil-pointer
		// panic on stream.Stop() below.
		t.Fatalf("NewStream: %v", err)
	}

	// Record for ten seconds.
	time.Sleep(10 * time.Second)

	stream.Stop()
}

stream.go

package stream

import (
	"context"
	"fmt"
	"github.com/pion/interceptor"
	"github.com/pion/sdp/v3"
	"github.com/pion/webrtc/v3"
	"github.com/pion/webrtc/v3/pkg/media"
	"github.com/pion/webrtc/v3/pkg/media/h264writer"
	"github.com/pion/webrtc/v3/pkg/media/oggwriter"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"os/exec"
	"sync"
)

// Streams is the package-wide registry of streams. NewStream stores each
// *Stream under the key "webrtc://<host>/<room>"; Find and LoadAndDelStream
// look entries up with the same key.
var Streams sync.Map

// Find reports whether a stream is registered in Streams for the given host
// and room.
func Find(host, room string) bool {
	_, found := Streams.Load("webrtc://" + host + "/" + room)
	return found
}

// LoadAndDelStream removes the entry registered for host and room from
// Streams and returns it. It returns nil when no entry exists or when the
// stored value is not a *Stream.
func LoadAndDelStream(host, room string) *Stream {
	value, loaded := Streams.LoadAndDelete("webrtc://" + host + "/" + room)
	if !loaded {
		return nil
	}
	// A failed assertion leaves stream as a nil *Stream, which is what we return.
	stream, _ := value.(*Stream)
	return stream
}

// Stream is one recording session: a receive-only peer connection whose
// audio and video tracks are written to files under savePath.
type Stream struct {
	// Host, Room and Display are the three components NewStream joins into
	// rtcUrl ("webrtc://Host/Room/Display").
	Host          string
	Room          string
	Display       string
	// rtcUrl is the stream URL sent to the signalling API.
	rtcUrl        string
	// savePath is prepended verbatim to the output file names, so it is
	// expected to end with a path separator.
	savePath      string
	// ctx is cancelled (through cancel) when the stream is stopped or fails;
	// the write loop polls it.
	ctx           context.Context
	cancel        context.CancelFunc
	// pc is the peer connection created by NewStream.
	pc            *webrtc.PeerConnection
	// hasAudioTrack / hasVideoTrack are set by onTrack once the matching
	// writer has been opened; Stop reads them to decide what to wait for.
	hasAudioTrack bool
	hasVideoTrack bool
	// videoFinish / audioFinish are the channels Stop receives from before
	// merging the recordings. NewStream creates both with a buffer of one.
	videoFinish   chan struct{}
	audioFinish   chan struct{}
}

// onTrack records one remote track to disk and blocks until the stream is
// stopped or the track fails.
//
// Opus audio is written to "<savePath><Display>_audio.ogg" and H264 video to
// "<savePath><Display>_video.h264"; tracks with any other codec are logged
// and ignored. Once a writer has been opened, the matching finish channel is
// always signalled on return (after the writer is closed), so Stop never
// waits on a track that has already ended.
//
// It returns an error when a writer cannot be created or when reading the
// track fails while the stream is still running. Ending because the stream
// context was cancelled is not an error.
func (self *Stream) onTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
	codec := track.Codec()

	trackDesc := fmt.Sprintf("channels=%v", codec.Channels)
	if track.Kind() == webrtc.RTPCodecTypeVideo {
		trackDesc = fmt.Sprintf("fmtp=%v", codec.SDPFmtpLine)
	}
	logrus.Infof("Got track %v, pt=%v tbn=%v, %v", codec.MimeType, codec.PayloadType, codec.ClockRate, trackDesc)

	var (
		writer media.Writer
		finish chan struct{}
		err    error
	)
	switch codec.MimeType {
	case "audio/opus":
		audiopath := self.savePath + self.Display + "_audio.ogg"
		if writer, err = oggwriter.New(audiopath, codec.ClockRate, codec.Channels); err != nil {
			return errors.Wrapf(err, "创建%v失败", audiopath)
		}
		self.hasAudioTrack = true
		finish = self.audioFinish
		logrus.Infof("Open ogg writer file=%v , tbn=%v, channels=%v", audiopath, codec.ClockRate, codec.Channels)
	case "video/H264":
		videopath := self.savePath + self.Display + "_video.h264"
		if writer, err = h264writer.New(videopath); err != nil {
			return errors.Wrapf(err, "创建%v失败", videopath)
		}
		self.hasVideoTrack = true
		finish = self.videoFinish
		logrus.Infof("Open h264 writer file=%v", videopath)
	default:
		logrus.Warnf("Ignore track %v pt=%v", codec.MimeType, codec.PayloadType)
		return nil
	}

	// Deferred calls run last-in-first-out: the writer is closed (and its
	// file flushed) first, and only then is Stop told this track is done.
	defer func() {
		// Non-blocking so a track that ends when nobody calls Stop, or a
		// second track of the same kind, can never wedge this goroutine.
		select {
		case finish <- struct{}{}:
		default:
		}
	}()
	defer func() {
		if cerr := writer.Close(); cerr != nil {
			logrus.Warnf("Close writer for %v err %+v", codec.MimeType, cerr)
		}
	}()

	// An error that surfaces after cancellation is just the shutdown path.
	if err = self.writeTrackToDisk(writer, track); err != nil && self.ctx.Err() == nil {
		return err
	}
	return nil
}

// writeTrackToDisk copies RTP packets from track into w until the stream
// context is cancelled or reading fails.
//
// It returns nil when the loop ends because the context was cancelled,
// whether that is noticed between reads or through a failing read, so callers
// can tell an orderly stop from a real failure. A read error while the
// context is still live is returned as is. Write errors never end the loop:
// they are logged, except for packets with a payload of two bytes or fewer,
// which are skipped silently. A nil w drains the track without writing.
func (self *Stream) writeTrackToDisk(w media.Writer, track *webrtc.TrackRemote) error {
	for self.ctx.Err() == nil {
		pkt, _, err := track.ReadRTP()
		if err != nil {
			if self.ctx.Err() != nil {
				// The read failed because we are shutting down.
				return nil
			}
			return err
		}

		if w == nil {
			continue
		}

		if err := w.WriteRTP(pkt); err != nil {
			if len(pkt.Payload) <= 2 {
				continue
			}
			logrus.Warnf("Ignore write RTP %vB err %+v\n", len(pkt.Payload), err)
		}
	}

	// Cancellation is the normal way for a recording to end, not an error.
	return nil
}

// Stop ends the recording and, when both an audio and a video track were
// recorded, runs ffmpeg to merge them into "<savePath><Display>.ts".
//
// It cancels the stream context, closes the peer connection so that track
// reads blocked on a silent stream return, and then waits for every track
// that started recording to signal that it has finished. It returns true only
// when the merge ran and succeeded; it returns false when either track is
// missing or ffmpeg fails.
func (self *Stream) Stop() bool {
	self.cancel()
	// Cancelling the context alone does not wake a ReadRTP that is waiting
	// for a packet; closing the connection does.
	if self.pc != nil {
		if err := self.pc.Close(); err != nil {
			logrus.Warnf("Close peer connection err %v", err)
		}
	}

	if self.hasAudioTrack {
		<-self.audioFinish
	}
	if self.hasVideoTrack {
		<-self.videoFinish
	}

	if !self.hasVideoTrack || !self.hasAudioTrack {
		return false
	}

	audiopath := self.savePath + self.Display + "_audio.ogg"
	videopath := self.savePath + self.Display + "_video.h264"

	cmd := exec.Command("ffmpeg",
		"-i",
		audiopath,
		"-i",
		videopath,
		self.savePath+self.Display+".ts",
		"-y")
	if err := cmd.Run(); err != nil {
		logrus.Errorf("拼接音频和视频失败:%v", err)
		return false
	}
	return true
}

// NewStream connects to the stream "webrtc://host/room/display", starts
// recording its tracks under savePath and registers the stream in Streams
// under "webrtc://host/room".
//
// It creates a receive-only peer connection for one audio and one video
// track, exchanges the SDP offer/answer through the "/rtc/v1/play" API and
// installs the track and ICE-state handlers. savePath is prepended verbatim
// to the output file names.
//
// On any failure it returns a nil stream and an error, after cancelling the
// stream context and closing the peer connection so nothing is leaked.
func NewStream(host, room, display, savePath string) (*Stream, error) {
	stream := &Stream{
		Host:          host,
		Room:          room,
		Display:       display,
		rtcUrl:        "webrtc://" + host + "/" + room + "/" + display,
		savePath:      savePath,
		hasAudioTrack: false,
		hasVideoTrack: false,
		videoFinish:   make(chan struct{}, 1),
		audioFinish:   make(chan struct{}, 1),
	}
	stream.ctx, stream.cancel = context.WithCancel(context.Background())

	// Release the context and the peer connection on every error return.
	ready := false
	defer func() {
		if ready {
			return
		}
		stream.cancel()
		if stream.pc != nil {
			stream.pc.Close()
		}
	}()

	// Create the PeerConnection.
	var err error
	stream.pc, err = newPeerConnection(webrtc.Configuration{})
	if err != nil {
		return nil, errors.Wrapf(err, "创建PeerConnection失败")
	}

	// Install the handlers before negotiating so no event can be missed.
	stream.pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
		// Each track runs in its own goroutine, so keep the error local
		// instead of sharing one variable between them.
		if trackErr := stream.onTrack(track, receiver); trackErr != nil {
			codec := track.Codec()
			logrus.Errorf("Handle  track %v, pt=%v\nerr %v", codec.MimeType, codec.PayloadType, trackErr)
			stream.cancel()
		}
		// Only tear the connection down when the stream is stopping or has
		// failed; an ignored track must not end the other track's recording.
		if stream.ctx.Err() != nil {
			stream.pc.Close()
		}
	})

	stream.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
		logrus.Infof("ICE state %v", state)

		if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
			if stream.ctx.Err() != nil {
				return
			}

			logrus.Warnf("Close for ICE state %v", state)
			stream.cancel()
			stream.pc.Close()
		}
	})

	// Receive-only transceivers for audio and video.
	recvOnly := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}
	if _, err = stream.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, recvOnly); err != nil {
		return nil, errors.Wrap(err, "添加音频Transceiver失败")
	}
	if _, err = stream.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, recvOnly); err != nil {
		return nil, errors.Wrap(err, "添加视频Transceiver失败")
	}

	// Create the offer.
	offer, err := stream.pc.CreateOffer(nil)
	if err != nil {
		return nil, errors.Wrap(err, "创建Local offer失败")
	}

	// Set the local SDP.
	if err = stream.pc.SetLocalDescription(offer); err != nil {
		return nil, errors.Wrap(err, "设置Local SDP失败")
	}

	// Exchange the offer for the server's answer and set the remote SDP.
	answer, err := apiRtcRequest(stream.ctx, "/rtc/v1/play", stream.rtcUrl, offer.SDP)
	if err != nil {
		return nil, errors.Wrap(err, "SDP协商失败")
	}

	if err = stream.pc.SetRemoteDescription(webrtc.SessionDescription{
		Type: webrtc.SDPTypeAnswer, SDP: answer,
	}); err != nil {
		return nil, errors.Wrap(err, "设置Remote SDP失败")
	}

	key := "webrtc://" + host + "/" + room
	Streams.Store(key, stream)
	ready = true
	return stream, nil
}

// newPeerConnection builds a PeerConnection from a media engine that has the
// default codecs, the SDES mid and RTP-stream-id header extensions (for video
// first, then audio) and the default interceptors registered.
//
// The transport-cc (video) and audio-level (audio) header extensions are
// intentionally not registered here. See:
// https://github.com/pion/ion/issues/130
// https://github.com/pion/ion-sfu/pull/373/files#diff-6f42c5ac6f8192dd03e5a17e9d109e90cb76b1a4a7973be6ce44a89ffd1b5d18R73
func newPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
	engine := &webrtc.MediaEngine{}
	if err := engine.RegisterDefaultCodecs(); err != nil {
		return nil, err
	}

	uris := []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI}
	kinds := []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio}
	for _, kind := range kinds {
		for _, uri := range uris {
			capability := webrtc.RTPHeaderExtensionCapability{URI: uri}
			if err := engine.RegisterHeaderExtension(capability, kind); err != nil {
				return nil, err
			}
		}
	}

	registry := &interceptor.Registry{}
	if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil {
		return nil, err
	}

	return webrtc.NewAPI(
		webrtc.WithMediaEngine(engine),
		webrtc.WithInterceptorRegistry(registry),
	).NewPeerConnection(configuration)
}

api.go

package stream

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"
)

// apiRtcRequest performs the SDP exchange with the SRS RTC HTTP API.
//
// apiPath is the API path such as "/rtc/v1/play", rtcurl is the WebRTC URL
// such as "webrtc://localhost/live/livestream", and offer is the local SDP
// offer as a string. The request is sent over HTTP to the host of rtcurl; when
// rtcurl carries no port, 1985 is used. This also holds for bracketed IPv6
// hosts such as "webrtc://[::1]/live/livestream". Any query string of rtcurl
// is forwarded to the API URL.
//
// It returns the answer SDP from the response, or an error when the URL
// cannot be parsed, the request fails, or the server reports a non-zero code.
func apiRtcRequest(ctx context.Context, apiPath, rtcurl, offer string) (string, error) {
	u, err := url.Parse(rtcurl)
	if err != nil {
		return "", errors.Wrapf(err, "Parse url %v", rtcurl)
	}

	// Build api url. Decide on the parsed port rather than on the presence of
	// a colon, which is also part of every IPv6 literal.
	host := u.Host
	if u.Port() == "" {
		host = strings.TrimSuffix(host, ":") + ":1985"
	}

	api := fmt.Sprintf("http://%v", host)
	if !strings.HasPrefix(apiPath, "/") {
		api += "/"
	}
	api += apiPath

	if !strings.HasSuffix(apiPath, "/") {
		api += "/"
	}
	if u.RawQuery != "" {
		api += "?" + u.RawQuery
	}

	// Build JSON body.
	reqBody := struct {
		Api       string `json:"api"`
		ClientIP  string `json:"clientip"`
		SDP       string `json:"sdp"`
		StreamURL string `json:"streamurl"`
	}{
		api, "", offer, rtcurl,
	}

	resBody := struct {
		Code    int    `json:"code"`
		Session string `json:"sessionid"`
		SDP     string `json:"sdp"`
	}{}

	if err := apiRequest(ctx, api, reqBody, &resBody); err != nil {
		return "", errors.Wrapf(err, "request api=%v", api)
	}

	if resBody.Code != 0 {
		return "", errors.Errorf("Server fail code=%v", resBody.Code)
	}
	logrus.Infof("Parse response to code=%v, session=%v, sdp=%v",
		resBody.Code, resBody.Session, escapeSDP(resBody.SDP))
	logrus.Infof("Parse response to code=%v, session=%v, sdp=%v bytes",
		resBody.Code, resBody.Session, len(resBody.SDP))

	return resBody.SDP, nil
}

// escapeSDP renders an SDP on a single line for logging by replacing every
// carriage return with the two characters `\r` and every line feed with `\n`.
func escapeSDP(sdp string) string {
	oneLine := strings.NewReplacer("\r", "\\r", "\n", "\\n")
	return oneLine.Replace(sdp)
}

// apiRequest sends one JSON request to the SRS HTTP API and decodes the JSON
// response.
//
// r is the API URL, like "http://localhost:1985/rtc/v1/play". req is
// marshalled to JSON and sent with POST; a nil req sends a GET with an empty
// body. res must be a pointer and receives the unmarshalled response body.
//
// It returns an error when marshalling, the HTTP round trip, reading the body
// or unmarshalling fails, and when the response's "code" field is non-zero.
func apiRequest(ctx context.Context, r string, req interface{}, res interface{}) error {
	var b []byte
	method := "GET"
	if req != nil {
		encoded, err := json.Marshal(req)
		if err != nil {
			return errors.Wrapf(err, "Marshal body %v", req)
		}
		b = encoded
		method = "POST"
	}
	logrus.Infof("Request url api=%v with %v", r, string(b))
	logrus.Infof("Request url api=%v with %v bytes", r, len(b))

	reqObj, err := http.NewRequestWithContext(ctx, method, r, strings.NewReader(string(b)))
	if err != nil {
		return errors.Wrapf(err, "HTTP request %v", string(b))
	}

	resObj, err := http.DefaultClient.Do(reqObj)
	if err != nil {
		return errors.Wrapf(err, "Do HTTP request %v", string(b))
	}
	// The body must be closed or the underlying connection is leaked.
	defer resObj.Body.Close()

	b2, err := ioutil.ReadAll(resObj.Body)
	if err != nil {
		return errors.Wrapf(err, "Read response for %v", string(b))
	}
	logrus.Infof("Response from %v is %v", r, string(b2))
	logrus.Infof("Response from %v is %v bytes", r, len(b2))

	// Look at the status code first so a server-side failure is reported
	// with the raw body rather than as a half-filled res.
	errorCode := struct {
		Code int `json:"code"`
	}{}
	if err := json.Unmarshal(b2, &errorCode); err != nil {
		return errors.Wrapf(err, "Unmarshal %v", string(b2))
	}
	if errorCode.Code != 0 {
		return errors.Errorf("Server fail code=%v %v", errorCode.Code, string(b2))
	}

	if err := json.Unmarshal(b2, res); err != nil {
		return errors.Wrapf(err, "Unmarshal %v", string(b2))
	}
	logrus.Infof("Parse response to code=%v ok, %v", errorCode.Code, res)

	return nil
}