前言

大学的学习时光临近尾声,感叹时光匆匆,三年一晃而过。同学们都忙着找工作,我也在这里抛一份简历吧,欢迎各位老板和猎手诚邀。我们进入正题。直播行业是当前火热的行业,谁都想从中分得一杯羹,直播养活了一大批人,一个平台主播粗略估计就有几千号人,但是实时在线观看量有的居然到了惊人的百万级别,特别是游戏主播,可想而知,直播间是一个磁铁式的广告传播媒介,也难怪这么多巨头公司都抢着做直播。我不太清楚直播行业技术有多深,毕竟自己没做过,但是咱们可以自己实现一个满足几百号人同时观看的直播间呀。


最终成果

手机端效果

动图

这个场景很熟悉吧~~ 通过obs推流软件来推流。

图片描述

户外直播,通过yasea手机端推流软件,使用手机摄像头推流。

图片描述

电脑端效果

播放香港卫视

图片描述

直播画面

图片描述

项目总览

项目分为三个部分:

  1. 客户端 
    直播间视频拉流、播放和聊天室,炫酷的弹幕以及直播间信息

  2. 服务端 
    处理直播间、用户的数据业务,聊天室消息的处理

  3. 服务器部署 
    视频服务器和web服务器

技术栈

移动客户端

  • VUE全家桶

  • UI层vonic

  • axios

  • 视频播放器: vue-video-player + videojs-contrib-hls

  • websocket客户端: vue-stomp

  • 弹幕插件: vue-barrage

  • 打包工具:webpack

电脑端客户端

  • 项目架构: Jquery + BootStrap

  • 视频播放器: video.js

  • websocket客户端: stomp.js + sockjs.js

  • 弹幕插件: Jquery.danmu.js

  • 模版引擎: thymeleaf

服务端

  • IDE: IntelliJ IDEA

  • 项目架构: SpringBoot1.5.4 +Maven3.0

  • 主数据库: Mysql5.7

  • 辅数据库: redis3.2

  • 数据库访问层: spring-boot-starter-data-jpa + spring-boot-starter-data-redis

  • websocket: spring-boot-starter-websocket

  • 消息中间件: RabbitMQ/3.6.10

服务器部署

  • 视频直播模块: nginx-rtmp-module

  • web应用服务器: tomcat8.0

  • 服务器: 腾讯云centos6.5

技术点讲解

直播间主要涉及到两个主要功能:第一是视频直播、第二是聊天室。这两个都是非常讲究实时性。

  • 视频直播

说到直播我们先了解下几个常用的直播流协议,看了挺多的流媒体协议文章博客,但都是非常粗略,这里有个比较详细的 流媒体协议介绍,如果想详细了解协议内容估计去要看看专业书籍了。这里我们用到的只是rtmp和hls,实践后发现:rtmp只能够在电脑端播放,hls只能够在手机端播放。而且rtmp是相当快的尽管没有rtsp那么快,延迟只有几秒,我测试的就差不多2-5秒,但是hls大概有10几秒。所以如果你体验过demo,就会发现手机延迟比较多。

直播的流程:
直播分为推流和拉流两个过程,那么流推向哪里,拉流又从哪里拉取呢?那当然需要视频服务器啦,千万不要以为视频直播服务器很复杂,其实在nginx服务器中一切都变得简单。后面我会讲解如何部署Nginx服务器并配置视频模块(nginx-rtmp-module).

首先主播通过推流软件,比如OBS Studio推流软件,这个是比较专业级别的,很多直播平台的推荐主播使用这个软件来推送视频流,这里我也推荐一个开源的安卓端推流工具Yasea,下载地址,文件很小,但是很强大。
直播内容推送到服务器后,就可以在服务器端使用视频编码工具进行转码了,可以转换成各种高清,标清,超清的分辨率视频,也就是为什么我们在各个视频网站都可以选择视频清晰度。这里我们没有转码,只是通过前端视频播放器(video.js)来拉取视频.这样整个视频推流拉流过程就完成了。

  • 聊天室

直播间里面的聊天室跟我们的群聊天差不多,只不过它变成了web端,web端的即时通信方案有很多,这里我们选择websocket协议来与服务端通信,websocket是基于http之上的传输协议,客户端向服务端发送http请求,并携带Upgrade:websocket升级头信息表示转换websocket协议,通过与服务端握手成功后就可以建立tcp通道,由此来传递消息,它与http最大的差别就是,服务端可以主动向客户端发送消息。

既然建立了消息通道,那我们就需要往通道里发消息,但是总得需要一个东西来管控消息该发给谁吧,要不然全乱套了,所以我们选择了消息中间件RabbitMQ.使用它来负责消息的路由去向。


理论知识都讲完啦,实操时间到!

移动客户端实操

源码地址

工程结构

|—— build                        构建服务和webpack配置        
|—— congfig                      项目不同环境的配置
|—— dist                         build生成生产目录
|—— static                       静态资源
|—— package.json                 项目配置文件
|—— src                          开发源代码目录
    |—— api                      通过axios导出的api目录
    |—— components               页面和组件
    |—— public                   公有组件
    |—— vuex                     全局状态
    |—— main.js                  应用启动配置点

功能模块

  • 拉取服务器的直播视频流(hls)并播放直播画面

  • 与服务端创建websocket连接,收发聊天室消息

  • 通过websocket获取消息并发送到弹幕

  • 通过websocket实时更新在线用户

  • 结合服务端获取访问历史记录

  • 问题反馈模块

效果图

全局功能

项目说明

请参考源码

服务端实操

源码地址

由于个人比较喜欢接触新的东西,所以后端选择了springboot,前端选择了Vue.js年轻人嘛总得跟上潮流。SpringBoot实践过后发现真的太省心了,不用再理会各种配置文件,全自动化装配。
这里贴一下pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hushangjie</groupId>
    <artifactId>rtmp-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rtmp-demo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator-docs</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <!--非严格模式解析HTML5-->
        <dependency>
            <groupId>net.sourceforge.nekohtml</groupId>
            <artifactId>nekohtml</artifactId>
            <version>1.9.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- 打包成war时可以移除嵌入式tomcat插件 -->
            <!--<exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>-->
        </dependency>
        <!--<dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>vue</artifactId>
            <version>2.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- RabbitMQ相关配置-->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>2.0.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-net</artifactId>
            <version>2.0.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.6.Final</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>

application.properties文件

spring.datasource.url=jdbc:mysql://host:3306/database?characterEncoding=utf8&amp;useSSL=false
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.thymeleaf.mode=LEGACYHTML5
server.port=8085
# REDIS (RedisProperties)
# Redis数据库索引(默认为0)
spring.redis.database=0  
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379  
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.pool.max-active=8  
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.pool.max-wait=-1  
# 连接池中的最大空闲连接
spring.redis.pool.max-idle=8  
# 连接池中的最小空闲连接
spring.redis.pool.min-idle=0  
# 连接超时时间(毫秒)
spring.redis.timeout=0 

websocket配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    //拦截器注入service失败解决办法
    @Bean
    public MyChannelInterceptor myChannelInterceptor(){
        return new MyChannelInterceptor();
    }
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //添加访问域名限制可以防止跨域socket连接
        //setAllowedOrigins("http://localhost:8085")
        registry.addEndpoint("/live").setAllowedOrigins("*").addInterceptors(new HandShkeInceptor()).withSockJS();

    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        /*.enableSimpleBroker("/topic","/queue");*/
        //假如需要第三方消息代理,比如rabitMQ,activeMq,在这里配置
        registry.setApplicationDestinationPrefixes("/demo")
                .enableStompBrokerRelay("/topic","/queue")
                .setRelayHost("127.0.0.1")
                .setRelayPort(61613)
                .setClientLogin("guest")
                .setClientPasscode("guest")
                .setSystemLogin("guest")
                .setSystemPasscode("guest")
                .setSystemHeartbeatSendInterval(5000)
                .setSystemHeartbeatReceiveInterval(4000);
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        ChannelRegistration channelRegistration = registration.setInterceptors(myChannelInterceptor());
        super.configureClientInboundChannel(registration);
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        super.configureClientOutboundChannel(registration);
    }

}

配置类继承了消息代理配置类,意味着我们将使用消息代理rabbitmq.使用registerStompEndpoints方法注册一个websocket终端连接。这里我们需要了解两个东西,第一个是stomp和sockjs,sockjs是啥呢,其实它是对于websocket的封装,因为如果单纯使用websocket的话效率会非常低,我们需要的编码量也会增多,而且如果浏览器不支持websocket,sockjs会自动降级为轮询策略,并模拟websocket,保证客户端和服务端可以通信。
stomp有是什么看这里

stomp是一种简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,也就是我们上面的RabbbitMQ,它就是一个消息代理。
我们可以通过configureMessageBroker来配置消息代理,需要注意的是我们将要部署的服务器也应该要有RabbitMQ,因为它是一个中间件,安装非常容易,这里就不说明了。这里我们配置了“/topic,/queue”两个代理转播策略,就是说客户端订阅了前缀为“/topic,/queue”频道都会通过消息代理(RabbitMQ)来转发。跟spring没啥关系啦,完全解耦。

websocke如何保证安全

一开始接触 stomp的时候一直有个问题困扰我,客户端只要与服务端通过websocket建立了连接,那么他就可以订阅任何内容,意味着可以接受任何消息,这样岂不是乱了套啦,于是我翻阅了大量博客文章,很多都是官方的例子并没有解决实际问题。经过琢磨,其实websocket是要考虑安全性的。具体在以下几个方面

  1. 跨域websocket连接

  2. 协议升级前握手拦截器

  3. 消息信道拦截器

对于跨域问题,我们可以通过setAllowedOrigins方法来设置可连接的域名,防止跨站连接。

对于站内用户是否允许连接我们可以如下配置

public class HandShkeInceptor extends HttpSessionHandshakeInterceptor {
    private static final Set<UserEntity> ONLINE_USERS = new HashSet<>();
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {

        System.out.println("握手前"+request.getURI());
        //http协议转换websoket协议进行前,通常这个拦截器可以用来判断用户合法性等
        //鉴别用户
       if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
           //这句话很重要如果getSession(true)会导致移动端无法握手成功
           //request.getSession(true):若存在会话则返回该会话,否则新建一个会话。
           //request.getSession(false):若存在会话则返回该会话,否则返回NULL
           //HttpSession session = servletRequest.getServletRequest().getSession(false);
            HttpSession session = servletRequest.getServletRequest().getSession();
            UserEntity user = (UserEntity) session.getAttribute("user");
            if (user != null) {
                //这里只使用简单的session来存储用户,如果使用了springsecurity可以直接使用principal
                return super.beforeHandshake(request, response, wsHandler, attributes);
            }else {
                System.out.println("用户未登录,握手失败!");
                return false;
            }
        }
        return false;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        //握手成功后,通常用来注册用户信息
        System.out.println("握手后");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}

HttpSessionHandshakeInterceptor 这个拦截器用来管理握手和握手后的事情,我们可以通过请求信息,比如token、或者session判用户是否可以连接,这样就能够防范非法用户。

那如何限制用户只能订阅指定内容呢?我们接着往下看

public class MyChannelInterceptor extends ChannelInterceptorAdapter {
    @Autowired
    private StatDao statDao;
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public boolean preReceive(MessageChannel channel) {
        System.out.println("preReceive");
        return super.preReceive(channel);
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        //检测用户订阅内容(防止用户订阅不合法频道)
        if (StompCommand.SUBSCRIBE.equals(command)) {
            //从数据库获取用户订阅频道进行对比(这里为了演示直接使用set集合代替)
            Set<String> subedChannelInDB = new HashSet<>();
            subedChannelInDB.add("/topic/group");
            subedChannelInDB.add("/topic/online_user");
            if (subedChannelInDB.contains(accessor.getDestination())) {
                //该用户订阅的频道合法
                return super.preSend(message, channel);
            } else {
                //该用户订阅的频道不合法直接返回null前端用户就接受不到该频道信息。
                return null;
            }
        } else {
            return super.preSend(message, channel);
        }

    }
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        //System.out.println("afterSendCompletion");
        //检测用户是否连接成功,搜集在线的用户信息如果数据量过大我们可以选择使用缓存数据库比如redis,
        //这里由于需要频繁的删除和增加集合内容,我们选择set集合来存储在线用户
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        if (StompCommand.SUBSCRIBE.equals(command)){
            Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes");
            //ONLINE_USERS.add(map.get("user"));
            UserEntity user = map.get("user");
            if(user != null){
                statDao.pushOnlineUser(user);
                Guest guest = new Guest();
                guest.setUserEntity(user);
                guest.setAccessTime(Calendar.getInstance().getTimeInMillis());
                statDao.pushGuestHistory(guest);
                //通过websocket实时返回在线人数
                this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline());
            }

        }
        //如果用户断开连接,删除用户信息
        if (StompCommand.DISCONNECT.equals(command)){
            Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes");
            //ONLINE_USERS.remove(map.get("user"));
            UserEntity user = map.get("user");
            if (user != null){
                statDao.popOnlineUser(user);
                simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline());
            }

        }
        super.afterSendCompletion(message, channel, sent, ex);
    }

}

在stomp里面,Channel信道就是消息传送的通道,客户端与服务端建立了连接就相当于建立了通道,以后的信息就是通过这个通道来传输。所有的消息都有消息头,被封装在了spring 的messag接口中,比如建立连接时候消息头就含有CONNECT,当然还有一些其他的信息。客户端订阅的时候也有订阅头信息SUBSCRIBE,那么我是不是可以在这个拦截器ChannelInterceptorAdapter 中拦截每个人的订阅信息,然后与数据库的信息作比对,最后决定这个用户是否可以订阅这个频道的信息呢,对的,这是我的想法,按照这样的思路,做单聊不是迎刃而解了吗。
那客户端通过websocket发送的消息如何到达订阅者手中呢,按照rabbitmq的规则,订阅者属于消费者,发送消息的一方属于生产者,生产者通过websocket把消息发送到服务端,服务端通过转发给消息代理(rabbitmq),消息代理负责存储消息,管理发送规则,推送消息给订阅者,看下面的代码

    @MessageMapping(value = "/chat")
    @SendTo("/topic/group")
    public MsgEntity testWst(String message , @Header(value = "simpSessionAttributes") Map<String,Object> session){
        UserEntity user = (UserEntity) session.get("user");
        String username = user.getRandomName();
        MsgEntity msg = new MsgEntity();
        msg.setCreator(username);
        msg.setsTime(Calendar.getInstance());
        msg.setMsgBody(message);
        return msg;
    }

@MessageMapping看起来跟springmvc方法特别像,它即可以用在类级别上也可以用在方法级别上
当发送者往‘/chat’发送消息后,服务端接受到消息,再发送给“/topic/group”的订阅者,@SendTo就是发送给谁,这里需要注意的有,如果我们没有配置消息代理,只使用了enableSimpleBroker("/topic","/queue")简单消息代理,那么就是直接发送到消息订阅者,如果配置了消息代理,那还要通过消息代理,由它来转发。

如果我们想在服务端随时发送消息,而不是在客户端发送(这样的场景很常见,比如发送全局通知),可以使用SimpMessagingTemplate类,通过注入该bean,在合适的业务场景中发送消息。

Redis统计数据

直播间经常需要统计数据,比如实时在线人数,访问量,贡献排行榜,订阅量。我选择的方案是使用redis来计数,尽管这个demo可能不会太多人访问,但是我的目的是学习如何使用redis
先看springboot中redis的配置

@Configuration
public class RedisConfig extends CachingConfigurerSupport{
    /**
     * 生成key的策略
     *
     * @return
     */
    @Bean
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object obj : params) {
                    sb.append(obj.toString());
                }
                return sb.toString();
            }
        };
    }

    /**
     * 管理缓存
     *
     * @param redisTemplate
     * @return
     */
    @SuppressWarnings("rawtypes")
    @Bean
    public CacheManager cacheManager(RedisTemplate redisTemplate) {
        RedisCacheManager rcm = new RedisCacheManager(redisTemplate);
        //设置缓存过期时间
        // rcm.setDefaultExpiration(60);//秒
        //设置value的过期时间
        Map<String,Long> map=new HashMap();
        map.put("test",60L);
        rcm.setExpires(map);
        return rcm;
    }

    /**
     * RedisTemplate配置
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);//如果key是String 需要配置一下StringSerializer,不然key会乱码 /XX/XX
        template.afterPropertiesSet();
        //template.setStringSerializer();
        return template;
    }
}

redis数据统计Dao的实现

@Repository
public class StatDao {
    @Autowired
    RedisTemplate redisTemplate;
    public void pushOnlineUser(UserEntity userEntity){
        redisTemplate.opsForSet().add("OnlineUser",userEntity);
    }
    public void popOnlineUser(UserEntity userEntity){
        redisTemplate.opsForSet().remove("OnlineUser" ,userEntity);
    }
    public Set getAllUserOnline(){
        return redisTemplate.opsForSet().members("OnlineUser");
    }
    public void pushGuestHistory(Guest guest){
        //最多存储指定个数的访客
        if (redisTemplate.opsForList().size("Guest") == 200l){
            redisTemplate.opsForList().rightPop("Guest");
        }
        redisTemplate.opsForList().leftPush("Guest",guest);
    }
    public List getGuestHistory(){
        return redisTemplate.opsForList().range("Guest",0,-1);
    }
}

Dao层非常简单,因为我们只需要统计在线人数和访客。但是在线人数是实时更新的,既然我们使用了websocket实时数据更新就非常容易了,前面我们讲过,通过信道拦截器可以拦截连接,订阅,断开连接等等事件信息,所以我们就可以当用户连接时存储在线用户,通过websocket返回在线用户信息。

public class MyChannelInterceptor extends ChannelInterceptorAdapter {
    @Autowired
    private StatDao statDao;
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public boolean preReceive(MessageChannel channel) {
        System.out.println("preReceive");
        return super.preReceive(channel);
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        //检测用户订阅内容(防止用户订阅不合法频道)
        if (StompCommand.SUBSCRIBE.equals(command)) {
            //从数据库获取用户订阅频道进行对比(这里为了演示直接使用set集合代替)
            Set<String> subedChannelInDB = new HashSet<>();
            subedChannelInDB.add("/topic/group");
            subedChannelInDB.add("/topic/online_user");
            if (subedChannelInDB.contains(accessor.getDestination())) {
                //该用户订阅的频道合法
                return super.preSend(message, channel);
            } else {
                //该用户订阅的频道不合法直接返回null前端用户就接受不到该频道信息。
                return null;
            }
        } else {
            return super.preSend(message, channel);
        }

    }
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        //System.out.println("afterSendCompletion");
        //检测用户是否连接成功,搜集在线的用户信息如果数据量过大我们可以选择使用缓存数据库比如redis,
        //这里由于需要频繁的删除和增加集合内容,我们选择set集合来存储在线用户
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        if (StompCommand.SUBSCRIBE.equals(command)){
            Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes");
            //ONLINE_USERS.add(map.get("user"));
            UserEntity user = map.get("user");
            if(user != null){
                statDao.pushOnlineUser(user);
                Guest guest = new Guest();
                guest.setUserEntity(user);
                guest.setAccessTime(Calendar.getInstance().getTimeInMillis());
                statDao.pushGuestHistory(guest);
                //通过websocket实时返回在线人数
                this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline());
            }

        }
        //如果用户断开连接,删除用户信息
        if (StompCommand.DISCONNECT.equals(command)){
            Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes");
            //ONLINE_USERS.remove(map.get("user"));
            UserEntity user = map.get("user");
            if (user != null){
                statDao.popOnlineUser(user);
                simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline());
            }

        }
        super.afterSendCompletion(message, channel, sent, ex);
    }

}

由于这个项目有移动端和电脑端,所以需要根据请求代理UserAgent来判断客户端属于哪一种类型。这个工具类在源码上有。我就不贴了。

服务器部署

说了这么多即时通信,却没发现视频直播。不要着急我们马上进入视频环节。文章开头就说明了几种媒体流协议,这里不讲解详细的协议流程,只需要知道,我们是通过推流软件采集视频信息,如何采集也不是我们关注的。采集到信息后通过软件来推送到指定的服务器,如下图

obs推流设置

电脑端

yasea手机端推流设置

电脑端

红色部分是服务器开放的获取流接口。

Nginx-rtmp-module配置

视频服务器有很多,也支持很多媒体流协议。这里我们选择nginx-rtmp-module来做视频服务,接下来我们需要在linux下安装nginx,并安装rtmp模块。本人也是linux初学者,一步步摸索着把服务器搭建好,听说tomcat和nginx很配哦,所以作为免费开源的当然首选这两个。
接下来需要在linux安装一下软件和服务。

  1. Nginx以及Nginx-rtmp-module

  2. Tomcat

  3. Mysql

  4. Redis

  5. RabbitMQ

安装步骤我就不说了,大家搜索一下啦,这里贴一下nginx.conf文件配置

rtmp {
    server {
        listen 1935;
        chunk_size 4096;

        application video {
                play /yjdata/www/www/video;
        }
        application live {
                live on;
                hls on;
                hls_path /yjdata/www/www/live/hls/;
                hls_fragment 5s;
        }
    }
}

上面代码是配置rtmp模块, play /yjdata/www/www/video 指的是配置点播模块,可以直接播放/yjdata/www/www/video路径下的视频。hls_path制定hls分块存放路径,因为hls是通过获取到推送的视频流信息,分块存储在服务器。所以它的延时比rtmp要更高。

 server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;
        index index.jsp index.html;
        root /yjdata/www/www;
        #access_log  logs/host.access.log  main;

        location / {
            proxy_pass  http://127.0.0.1:8080;
        }
        location ~ .*\.(gif|jpg|jpeg|png|bmp|swf|js|css|docx|pdf|doc|ppt|html|properties)$ {
                expires 30d;
                root /yjdata/www/www/static/;
        }
        location /hls {
            types {
                application/vnd.apple.mpegurl m3u8;
                #application/x-mpegURL;
                video/mp2t ts;
            }
            alias /yjdata/www/www/live/hls/;
            expires -1;
            add_header Cache-Control no-cache;
        }

        location /stat {
                 rtmp_stat all;
                 rtmp_stat_stylesheet stat.xsl;
        }

        location /stat.xsl {
                root /soft/nginx/nginx-rtmp-module/;
         }

上面配置了location 指向/hls,别名是/yjdata/www/www/live/hls/,所以可以在前端直接通过域名+/hls/+文件名.m3u8获取直播视频。
关于nginx的配置还有很多,我也在学习当中。总而言之nginx非常强大。

Logo

前往低代码交流专区

更多推荐