Kurento in Practice, Part 5: Media Playback

Published: 2023-04-18 12:54:55

Overview

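  • This is the fifth article in the "Kurento in Practice" series. Here we use the capabilities that KMS (Kurento Media Server) already provides to build a simple media player. The overall architecture is shown in the figure below:
  • As the figure shows, the main task is to develop the player-with-record application. The whole process is:
  1. Deploy KMS
  2. Develop a Spring Boot application named player-with-record, which includes a simple web page
  3. Open the page in a browser; it establishes a WebSocket connection with player-with-record and sends it the address of the media stream
  4. player-with-record sends commands to KMS through the Kurento SDK to create the media player and WebRTC component instances
  5. player-with-record also relays the WebRTC signaling between the browser and KMS
  6. Once the media connection between the browser and KMS is established, the browser receives the stream data and plays it
  • Now on to the hands-on part, starting with deploying KMS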

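Source code download

  • The complete source code for this article can be downloaded from GitHub. The addresses are listed in the table below (https://github.com/zq2599/blog_demos):

| Name | Link | Remarks |
|------|------|---------|
| Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub |
| Git repository (HTTPS) | https://github.com/zq2599/blog_demos.git | Repository address of the project source, HTTPS protocol |
| Git repository (SSH) | git@github.com:zq2599/blog_demos.git | Repository address of the project source, SSH protocol |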

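  • This git project contains several folders. The source code for this article is in the kurentodemo folder, highlighted by the red box in the figure below:
  • kurentodemo is the parent project of the whole "Kurento in Practice" series and contains several submodules. The source code for this article is the submodule player-with-record, highlighted by the red box below: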

Deploying KMS

  • To keep things simple, KMS is again deployed with Docker. Run the following command:
docker run -d \
--restart always \
--name kms \
--network host \
kurento/kurento-media-server:6.15
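  • Unlike the previous articles, KMS and the player-with-record application are deployed on different machines, so remember to turn off the firewall on the KMS machine or open port 8888.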
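
Because KMS now runs on a separate machine, it can save time to confirm that port 8888 is reachable before starting the application. The following is a minimal sketch using only the JDK; the class KmsPortCheck and its method are hypothetical helpers and are not part of the original project. It only checks that a TCP connection can be opened and does not speak the Kurento protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper for illustration; not part of the original project. */
final class KmsPortCheck {

    private KmsPortCheck() {
    }

    /**
     * Returns true if a TCP connection to host:port can be opened within timeoutMillis.
     * A successful connection only shows that the port is open and not blocked by a firewall.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolvable host: all count as "not reachable"
            return false;
        }
    }
}
```

For example, KmsPortCheck.isReachable("192.168.91.128", 8888, 2000) would test the KMS address used later in this article; a false result suggests checking the firewall or the container first.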

Developing the PlayerWithRecorder application

  • Under the kurentodemo project, add a submodule named player-with-record with the following pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>kurentodemo</artifactId>
        <groupId>com.bolingcavalry</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <groupId>com.bolingcavalry</groupId>
    <artifactId>player-with-record</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>player-with-record</name>
    <packaging>jar</packaging>
    <description>show how to play and record the file</description>

    <!-- Configuration used when spring-boot-starter-parent is not the parent -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.3.3.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>webjars-locator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.webjars.bower</groupId>
            <artifactId>jquery</artifactId>
        </dependency>

        <dependency>
            <groupId>org.webjars.bower</groupId>
            <artifactId>bootstrap</artifactId>
        </dependency>

        <dependency>
            <groupId>org.webjars.bower</groupId>
            <artifactId>demo-console</artifactId>
        </dependency>

        <dependency>
            <groupId>org.webjars.bower</groupId>
            <artifactId>ekko-lightbox</artifactId>
        </dependency>

        <dependency>
            <groupId>org.webjars.bower</groupId>
            <artifactId>webrtc-adapter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.kurento</groupId>
            <artifactId>kurento-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.kurento</groupId>
            <artifactId>kurento-utils-js</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.3.RELEASE</version>
                <configuration>
                    <mainClass>com.bolingcavalry.playerwithrecord.PlayerWithRecorder</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>
  • The configuration file application.properties is very simple:
# Port
server.port=8080
# Application name
spring.application.name=PlayerWithRecorder
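  • Add a data structure, UserSession.java. Each web page corresponds to one UserSession instance. The method to focus on is release, which is called when playback stops in order to free the player and the WebRTC connection resources: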
package com.bolingcavalry.playerwithrecord;

import org.kurento.client.IceCandidate;
import org.kurento.client.MediaPipeline;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.WebRtcEndpoint;

public class UserSession {

  private WebRtcEndpoint webRtcEndpoint;
  private MediaPipeline mediaPipeline;
  private PlayerEndpoint playerEndpoint;

  public UserSession() {
  }

  public WebRtcEndpoint getWebRtcEndpoint() {
    return webRtcEndpoint;
  }

  public void setWebRtcEndpoint(WebRtcEndpoint webRtcEndpoint) {
    this.webRtcEndpoint = webRtcEndpoint;
  }

  public MediaPipeline getMediaPipeline() {
    return mediaPipeline;
  }

  public void setMediaPipeline(MediaPipeline mediaPipeline) {
    this.mediaPipeline = mediaPipeline;
  }

  public void addCandidate(IceCandidate candidate) {
    webRtcEndpoint.addIceCandidate(candidate);
  }

  public PlayerEndpoint getPlayerEndpoint() {
    return playerEndpoint;
  }

  public void setPlayerEndpoint(PlayerEndpoint playerEndpoint) {
    this.playerEndpoint = playerEndpoint;
  }

  public void release() {
    this.playerEndpoint.stop();
    this.mediaPipeline.release();
  }
}
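  • The startup class PlayerWithRecorder.java has two points worth noting: the registerWebSocketHandlers method binds the WebSocket handler class, and the kurentoClient bean is built with KurentoClient.create, whose argument is the address of the KMS service: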
package com.bolingcavalry.playerwithrecord;

import org.kurento.client.KurentoClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;

@EnableWebSocket
@SpringBootApplication
public class PlayerWithRecorder implements WebSocketConfigurer {

  @Bean
  public PlayerHandler handler() {
    return new PlayerHandler();
  }

  /**
   * Creates the KurentoClient; the argument is the KMS address
   * @return the KurentoClient instance
   */
  @Bean
  public KurentoClient kurentoClient() {
    return KurentoClient.create("ws://192.168.91.128:8888/kurento");
  }

  @Bean
  public ServletServerContainerFactoryBean createServletServerContainerFactoryBean() {
    ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
    container.setMaxTextMessageBufferSize(32768);
    return container;
  }

  /**
   * Standard WebSocket handler registration
   * @param registry the handler registry
   */
  @Override
  public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    registry.addHandler(handler(), "/player");
  }

  public static void main(String[] args) throws Exception {
    SpringApplication.run(PlayerWithRecorder.class, args);
  }
}
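
The KMS address above is hard-coded, which is inconvenient when KMS lives on another machine. One possible refinement, shown here only as a sketch, is to resolve the address from a configured value and fall back to a default. The class KmsUriResolver, the property name kms.uri, and the localhost default are all assumptions made for this example; only the port 8888 and the /kurento path come from the code above.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not part of the original project. */
final class KmsUriResolver {

    /** Assumed default: KMS on the same machine, with the port and path used in this article. */
    static final String DEFAULT_KMS_URI = "ws://localhost:8888/kurento";

    private KmsUriResolver() {
    }

    /**
     * Returns the configured KMS address, or the default when nothing is configured.
     * Rejects values that are not ws:// or wss:// URIs.
     */
    static String resolve(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT_KMS_URI;
        }
        String value = configured.trim();
        // URI.create throws IllegalArgumentException on malformed input
        String scheme = URI.create(value).getScheme();
        if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
            throw new IllegalArgumentException("KMS address must start with ws:// or wss://: " + value);
        }
        return value;
    }
}
```

The bean could then call KurentoClient.create(KmsUriResolver.resolve(System.getProperty("kms.uri"))), where kms.uri is the hypothetical property name chosen for this sketch.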
  • Next comes the WebSocket handler class PlayerHandler.java, which is the core of this article. A few key points are discussed afterwards:
package com.bolingcavalry.playerwithrecord;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ConcurrentHashMap;
import org.kurento.client.EndOfStreamEvent;
import org.kurento.client.ErrorEvent;
import org.kurento.client.EventListener;
import org.kurento.client.IceCandidate;
import org.kurento.client.IceCandidateFoundEvent;
import org.kurento.client.KurentoClient;
import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaState;
import org.kurento.client.MediaStateChangedEvent;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.ServerManager;
import org.kurento.client.VideoInfo;
import org.kurento.client.WebRtcEndpoint;
import org.kurento.commons.exception.KurentoException;
import org.kurento.jsonrpc.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;

public class PlayerHandler extends TextWebSocketHandler {

  @Autowired
  private KurentoClient kurento;

  private final Logger log = LoggerFactory.getLogger(PlayerHandler.class);
  private final Gson gson = new GsonBuilder().create();
  private final ConcurrentHashMap<String, UserSession> users = new ConcurrentHashMap<>();

  @Override
  public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    JsonObject jsonMessage = gson.fromJson(message.getPayload(), JsonObject.class);
    String sessionId = session.getId();
    log.debug("Session [{}] received websocket command: {}", sessionId, jsonMessage);

    try {
      switch (jsonMessage.get("id").getAsString()) {
        // Start playback
        case "start":
          start(session, jsonMessage);
          break;
        // Stop playback
        case "stop":
          stop(sessionId);
          break;
        // Pause
        case "pause":
          pause(sessionId);
          break;
        // Resume
        case "resume":
          resume(session);
          break;
        // Dump debugging and monitoring information
        case "debugDot":
          debugDot(session);
          break;
        // Seek forward or backward
        case "doSeek":
          doSeek(session, jsonMessage);
          break;
        // Get the current position
        case "getPosition":
          getPosition(session);
          break;
        // Add a WebRTC ICE candidate sent by the browser
        case "onIceCandidate":
          onIceCandidate(sessionId, jsonMessage);
          break;
        default:
          sendError(session, "Invalid message with id " + jsonMessage.get("id").getAsString());
          break;
      }
    } catch (Throwable t) {
      log.error("Exception handling message {} in sessionId {}", jsonMessage, sessionId, t);
      sendError(session, t.getMessage());
    }
  }

  private void start(final WebSocketSession session, JsonObject jsonMessage) {
    // 1. Create the MediaPipeline
    MediaPipeline pipeline = kurento.createMediaPipeline();

    // 2. Create the WebRtcEndpoint that connects to the browser
    WebRtcEndpoint webRtcEndpoint = new WebRtcEndpoint.Builder(pipeline).build();

    // 3.1 Read the address of the media to play
    String videourl = jsonMessage.get("videourl").getAsString();

    // 3.2 Create the PlayerEndpoint that does the playback
    final PlayerEndpoint playerEndpoint = new PlayerEndpoint.Builder(pipeline, videourl).build();

    // 4. Connect playerEndpoint to webRtcEndpoint, so the media decoded by playerEndpoint reaches the browser through webRtcEndpoint
    playerEndpoint.connect(webRtcEndpoint);

    // 5. WebRTC-related operations
    // 5.1 Forward each candidate to the front end as soon as KMS reports it
    webRtcEndpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {

      @Override
      public void onEvent(IceCandidateFoundEvent event) {
        JsonObject response = new JsonObject();
        response.addProperty("id", "iceCandidate");
        response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
        try {
          synchronized (session) {
            session.sendMessage(new TextMessage(response.toString()));
          }
        } catch (IOException e) {
          log.debug(e.getMessage());
        }
      }
    });

    // The SDP offer comes from the front end
    String sdpOffer = jsonMessage.get("sdpOffer").getAsString();
    // Prepare the SDP answer for the front end
    String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer);

    log.info("[Handler::start] SDP Offer from browser to KMS:\n{}", sdpOffer);
    log.info("[Handler::start] SDP Answer from KMS to browser:\n{}", sdpAnswer);

    JsonObject response = new JsonObject();
    response.addProperty("id", "startResponse");
    response.addProperty("sdpAnswer", sdpAnswer);
    sendMessage(session, response.toString());

    // 6. Operations related to media playback
    // 6.1 KMS sends media state events; once the media is connected, pass the media information on to the front end
    webRtcEndpoint.addMediaStateChangedListener(new EventListener<MediaStateChangedEvent>() {
      @Override
      public void onEvent(MediaStateChangedEvent event) {

        if (event.getNewState() == MediaState.CONNECTED) {
          // Media information can be obtained with getVideoInfo
          VideoInfo videoInfo = playerEndpoint.getVideoInfo();

          JsonObject response = new JsonObject();
          response.addProperty("id", "videoInfo");
          response.addProperty("isSeekable", videoInfo.getIsSeekable());
          response.addProperty("initSeekable", videoInfo.getSeekableInit());
          response.addProperty("endSeekable", videoInfo.getSeekableEnd());
          response.addProperty("videoDuration", videoInfo.getDuration());

          // Send this media information to the front end
          sendMessage(session, response.toString());
        }
      }
    });

    // Ask KMS to gather its ICE candidates (the listener registered above receives them)
    webRtcEndpoint.gatherCandidates();

    // 7.1 Playback listener: errors
    playerEndpoint.addErrorListener(new EventListener<ErrorEvent>() {
      @Override
      public void onEvent(ErrorEvent event) {
        log.info("ErrorEvent: {}", event.getDescription());
        // Tell the front end to stop playback
        sendPlayEnd(session);
      }
    });

    // 7.2 Playback listener: end of stream
    playerEndpoint.addEndOfStreamListener(new EventListener<EndOfStreamEvent>() {
      @Override
      public void onEvent(EndOfStreamEvent event) {
        log.info("EndOfStreamEvent: {}", event.getTimestamp());
        // Tell the front end to stop playback
        sendPlayEnd(session);
      }
    });

    // Have KMS start connecting to the remote media
    playerEndpoint.play();

    // Store pipeline, webRtcEndpoint and playerEndpoint in a UserSession object,
    // which makes it easy to handle later commands from the front end
    final UserSession user = new UserSession();
    user.setMediaPipeline(pipeline);
    user.setWebRtcEndpoint(webRtcEndpoint);
    user.setPlayerEndpoint(playerEndpoint);
    users.put(session.getId(), user);
  }

  /**
   * Pauses playback
   * @param sessionId the WebSocket session id
   */
  private void pause(String sessionId) {
    UserSession user = users.get(sessionId);

    if (user != null) {
      user.getPlayerEndpoint().pause();
    }
  }

  /**
   * Resumes playback after a pause
   * @param session the WebSocket session
   */
  private void resume(final WebSocketSession session) {
    UserSession user = users.get(session.getId());

    if (user != null) {
      user.getPlayerEndpoint().play();
      VideoInfo videoInfo = user.getPlayerEndpoint().getVideoInfo();

      JsonObject response = new JsonObject();
      response.addProperty("id", "videoInfo");
      response.addProperty("isSeekable", videoInfo.getIsSeekable());
      response.addProperty("initSeekable", videoInfo.getSeekableInit());
      response.addProperty("endSeekable", videoInfo.getSeekableEnd());
      response.addProperty("videoDuration", videoInfo.getDuration());
      sendMessage(session, response.toString());
    }
  }

  /**
   * Stops playback
   * @param sessionId the WebSocket session id
   */
  private void stop(String sessionId) {
    UserSession user = users.remove(sessionId);

    if (user != null) {
      user.release();
    }
  }

  /**
   * Fetches the GStreamer dot content, which the graphviz tool can render as a topology graph
   * @param session the WebSocket session
   */
  private void debugDot(final WebSocketSession session) {
    UserSession user = users.get(session.getId());

    if (user != null) {
      final String pipelineDot = user.getMediaPipeline().getGstreamerDot();
      try (PrintWriter out = new PrintWriter("player.dot")) {
        out.println(pipelineDot);
      } catch (IOException ex) {
        log.error("[Handler::debugDot] Exception: {}", ex.getMessage());
      }
      final String playerDot = user.getPlayerEndpoint().getElementGstreamerDot();
      try (PrintWriter out = new PrintWriter("player-decoder.dot")) {
        out.println(playerDot);
      } catch (IOException ex) {
        log.error("[Handler::debugDot] Exception: {}", ex.getMessage());
      }
    }

    ServerManager sm = kurento.getServerManager();
    log.warn("[Handler::debugDot] CPU COUNT: {}", sm.getCpuCount());
    log.warn("[Handler::debugDot] CPU USAGE: {}", sm.getUsedCpu(1000));
    log.warn("[Handler::debugDot] RAM USAGE: {}", sm.getUsedMemory());
  }

  /**
   * Seeks to the given position
   * @param session the WebSocket session
   * @param jsonMessage the command, carrying the target position
   */
  private void doSeek(final WebSocketSession session, JsonObject jsonMessage) {
    UserSession user = users.get(session.getId());

    if (user != null) {
      try {
        user.getPlayerEndpoint().setPosition(jsonMessage.get("position").getAsLong());
      } catch (KurentoException e) {
        log.debug("The seek cannot be performed");
        JsonObject response = new JsonObject();
        response.addProperty("id", "seek");
        response.addProperty("message", "Seek failed");
        sendMessage(session, response.toString());
      }
    }
  }

  /**
   * Gets the current playback position
   * @param session the WebSocket session
   */
  private void getPosition(final WebSocketSession session) {
    UserSession user = users.get(session.getId());

    if (user != null) {
      long position = user.getPlayerEndpoint().getPosition();

      JsonObject response = new JsonObject();
      response.addProperty("id", "position");
      response.addProperty("position", position);
      sendMessage(session, response.toString());
    }
  }

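  /**
   * Passes an ICE candidate received from the front end straight on to KMS
   * @param sessionId the WebSocket session id
   * @param jsonMessage the command, carrying the candidate
   */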
  private void onIceCandidate(String sessionId, JsonObject jsonMessage) {
    UserSession user = users.get(sessionId);

    if (user != null) {
      JsonObject jsonCandidate = jsonMessage.get("candidate").getAsJsonObject();
      IceCandidate candidate =
          new IceCandidate(jsonCandidate.get("candidate").getAsString(), jsonCandidate
              .get("sdpMid").getAsString(), jsonCandidate.get("sdpMLineIndex").getAsInt());
      user.getWebRtcEndpoint().addIceCandidate(candidate);
    }
  }

  /**
   * Tells the front end to stop playback
   * @param session the WebSocket session
   */
  public void sendPlayEnd(WebSocketSession session) {
    if (users.containsKey(session.getId())) {
      JsonObject response = new JsonObject();
      response.addProperty("id", "playEnd");
      sendMessage(session, response.toString());
    }
  }

  /**
   * Sends an error message to the front end
   * @param session the WebSocket session
   * @param message the error text
   */
  private void sendError(WebSocketSession session, String message) {
    if (users.containsKey(session.getId())) {
      JsonObject response = new JsonObject();
      response.addProperty("id", "error");
      response.addProperty("message", message);
      sendMessage(session, response.toString());
    }
  }

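  /**
   * The method that sends messages to the front end is synchronized:
   * KMS notifications can arrive at any time, possibly while another message is being sent,
   * so sendMessage may be called concurrently
   * @param session the WebSocket session
   * @param message the message text
   */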
  private synchronized void sendMessage(WebSocketSession session, String message) {
    try {
      session.sendMessage(new TextMessage(message));
    } catch (IOException e) {
      log.error("Exception sending message", e);
    }
  }

  /**
   * Called after the WebSocket connection with the front end is closed
   * @param session the WebSocket session
   * @param status the close status
   * @throws Exception if cleanup fails
   */
  @Override
  public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    stop(session.getId());
  }
}
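
The synchronized sendMessage above serializes sends across every session handled by the same PlayerHandler. If that ever becomes a bottleneck, one alternative is a lock per session, so that only writes to the same browser wait for each other. The sketch below illustrates the idea with plain JDK types; PerSessionSender and its transport callback are hypothetical stand-ins for the handler and for WebSocketSession.sendMessage, and this is not how the original code is written.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper for illustration; not part of the original project. */
final class PerSessionSender {

    // One lock object per session id, created on first use
    private final ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();

    /**
     * Delivers the message through the given transport while holding the lock of that session,
     * so two threads never write to the same session at the same time.
     */
    void send(String sessionId, String message, Consumer<String> transport) {
        Object lock = locks.computeIfAbsent(sessionId, id -> new Object());
        synchronized (lock) {
            transport.accept(message);
        }
    }

    /** Drops the lock of a closed session so the map does not grow without bound. */
    void forget(String sessionId) {
        locks.remove(sessionId);
    }
}
```

In a handler like the one above, forget would be called from afterConnectionClosed. Spring also provides ConcurrentWebSocketSessionDecorator for thread-safe sending, which may be a better fit than a hand-written lock.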
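  • PlayerHandler.java is fairly long, but its logic is clear. To summarize:
  1. handleTextMessage receives the WebSocket commands and calls the matching method for each one (play, pause, and so on)
  2. The most important method is start: it tells KMS to create the player (PlayerEndpoint) and the WebRTC component (WebRtcEndpoint), and handles the SDP-related work such as the offer, answer, and candidates
  3. The remaining methods, such as pause and doSeek, simply call the corresponding PlayerEndpoint API; they are not complicated and a quick read is enough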
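
The command routing in handleTextMessage can also be pictured as a lookup table from command id to handler. The sketch below shows that idea in isolation with JDK types only; CommandDispatcher is a hypothetical class, handlers here take and return plain strings instead of JSON objects and WebSocket sessions, and the original code uses a switch statement rather than a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper for illustration; not part of the original project. */
final class CommandDispatcher {

    // Command id (for example "start" or "pause") mapped to the code that handles it
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    /** Registers a handler for one command id and returns this dispatcher for chaining. */
    CommandDispatcher register(String id, Function<String, String> handler) {
        handlers.put(id, handler);
        return this;
    }

    /**
     * Runs the handler registered for the id and returns its reply.
     * Unknown ids produce the same error text as the default branch of the switch above.
     */
    String dispatch(String id, String payload) {
        Function<String, String> handler = handlers.get(id);
        if (handler == null) {
            return "Invalid message with id " + id;
        }
        return handler.apply(payload);
    }
}
```

A table like this makes it easy to add a command without touching the dispatch code, at the cost of a less obvious control flow than the switch.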
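  • Next is the front end. The author (Xinchen) is not strong at front-end development, so the front-end code of the official Kurento demo is used directly; download it from my GitHub as described earlier. The one thing to watch: the official Kurento demo is served over HTTPS, while this article uses plain HTTP for simplicity, so the WebSocket connection in index.js must use the matching non-TLS scheme (ws:// instead of wss://):
  • As the red box in the figure below shows, all front-end resources and code are under the static directory; for brevity they are not covered here:
  • That completes the coding; the application can now be started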
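
The rule behind the index.js change is that the WebSocket scheme should follow the page scheme: http pages use ws, https pages use wss. The front end itself is JavaScript, but the mapping can be sketched in Java as follows. WebSocketUrls is a hypothetical helper written for this illustration; only the /player path comes from the handler registration shown earlier.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not part of the original project. */
final class WebSocketUrls {

    private WebSocketUrls() {
    }

    /**
     * Builds the WebSocket URL for a page: http maps to ws, https maps to wss,
     * and the host and port are taken from the page URL.
     */
    static String forPage(String pageUrl, String path) {
        URI page = URI.create(pageUrl);
        String scheme = page.getScheme();
        String wsScheme;
        if ("https".equalsIgnoreCase(scheme)) {
            wsScheme = "wss";
        } else if ("http".equalsIgnoreCase(scheme)) {
            wsScheme = "ws";
        } else {
            throw new IllegalArgumentException("Unsupported page scheme: " + scheme);
        }
        return wsScheme + "://" + page.getRawAuthority() + path;
    }
}
```

With the page address used in the next section, WebSocketUrls.forPage("http://localhost:8080/", "/player") yields ws://localhost:8080/player.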

Verification

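  • Start the player-with-record application and open http://localhost:8080/ in a browser. The page looks like this:
  • Find an online media stream to test playback. I used the Guangdong TV address rtmp://58.200.131.2:1935/livetv/gdtv: enter it in the box highlighted in red in the figure above, then click the green Start button. The result is shown below, and the audio plays correctly as well:
  • Guangdong TV is a live stream, so pausing, fast-forwarding and similar operations are not possible. Let's try an on-demand stream instead; I used http://clips.vorwaerts-gmbh.de/big_buck_bunny.mp4. As shown below, all the operations work:
  • That completes a simple media player. In the next article we will add one more feature to it: recording in the cloud.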
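
The difference between the live and on-demand streams corresponds to the isSeekable, initSeekable and endSeekable values that the handler sends to the front end. A player could use them to decide whether a seek request makes sense and to keep the target inside the allowed range. The sketch below is one possible way to do that; SeekPlanner is a hypothetical helper, and it assumes the requested position uses the same unit as the values reported by VideoInfo.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of the original project. */
final class SeekPlanner {

    private SeekPlanner() {
    }

    /**
     * Returns the position to seek to, limited to [seekableInit, seekableEnd],
     * or an empty value when the media cannot be seeked (for example a live stream).
     */
    static OptionalLong target(boolean seekable, long seekableInit, long seekableEnd, long requested) {
        if (!seekable || seekableEnd < seekableInit) {
            return OptionalLong.empty();
        }
        long clamped = Math.max(seekableInit, Math.min(seekableEnd, requested));
        return OptionalLong.of(clamped);
    }
}
```

A caller would skip the setPosition call when the result is empty, which avoids sending a seek that the live stream would reject anyway.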