1、websocketproxy代理服务基于go语言实现

 

2、功能描述:Proxy of gateway. Websockt transfer TCP protocol.

(1) Websocket -> TCP
(2)TCP -> Websocket

即:实现将websocket通信代理到TCP通信,实现web客户端通过websocket通信与后台TCP服务器之间的数据交互。

 

3、使用方法:讲go源码编译出可执行程序:

 

4、查看websocketproxy代理服务进程是否启动 及 启动代理的方法:

(1)查看命令:ps -elf | grep websocketproxy

websocketproxy没有启动状态:

(2)启动websocketproxy进程:

./websocketproxy 按下enter会提示启动方法:

(3)启动websocketproxy进程命令:./websocketproxy -host=127.0.0.1:554 -port=8081

(4)另起一个终端,查看websocketproxy进程是否启动:ps -elf | grep websocketproxy

如下图:则说明websocketproxy进程已经启动:

或者使用命令: ./websocketproxy -host=127.0.0.1:554 -port=8081 &

将websocketproxy进程在后台运行:

(5)启动websocketproxy进程的命令行解释:./websocketproxy -host=127.0.0.1:554 -port=8081 &

1)host=127.0.0.1:台服后务器TCP通信的ip地址

2)554:  后台服务器TCP通信所监听的端口

3)port=8081:建立websocket所使用的端口号,如:ws://127.0.0.1:8081/tztek/test

4)&: 将该进程放在后台运行

5)其他说明:将websocket通信所使用的8081端口的消息代理到tcp通信的554端口,实现消息交互。

 

5、使用举例:

(1)功能:实现将websocket通信所使用的8081端口的消息代理到easydarwin流媒体服务器的tcp通信的554端口,实现web客户端和easydarwin流媒体服务器之间的消息交互。

(2)步骤:

1)下载并进入easydarwin流媒体服务应用程序所在文件夹(自备):

2) 查看easydarwin.ini配置文件信息,查询tcp通信端口号,vim easydarwin.ini:

查询tcp通信端口号为:554

  1 [http]
  2 port=10008
  3 default_username=admin
  4 default_password=admin
  5 
  6 [rtsp]
  7 port=554
  8 
  9 ; rtsp 超时时间,包括RTSP建立连接与数据收发。
 10 timeout=28800
 11 
 12 ; 是否使能gop cache。如果使能,服务器会缓存最后一个I帧以及其后的非I帧,以提高播放速度。但是可能在高并发的情况下带来内存压力。
 13 gop_cache_enable=1
 14 
 15 ; 是否使能推送的同事进行本地存储,使能后则可以进行录像查询与回放。
 16 save_stream_to_local=1
 17 
 18 ;easydarwin使用ffmpeg工具来进行存储。这里表示ffmpeg的可执行程序的路径
 19 ffmpeg_path=/usr/local/ffmpeg/bin/ffmpeg
 20 
 21 ;本地存储所将要保存的根目录。如果不存在,程序会尝试创建该目录。
 22 m3u8_dir_path=/home/xuwei/xuwei/record
 23 
 24 ;切片文件时长。本地存储时,将以该时间段为标准来生成ts文件(该时间+一个I帧间隔),单位秒
 25 ts_duration_second=10
 26 
 27 ;ffmpeg转码格式,比如可设置为-c:v copy -c:a copy,表示copy源格式;default表示使用ffmpeg内置的输出格式,可能需要转码
 28 /stream_265=default

3)启动easydarwin流媒体服务器:./easydarwin

如下:则启动成功

 

4)启动websocketproxy进程,将websocket通信所使用的8081端口的消息代理到easydarwin流媒体服务器的tcp通信的554端口,实现web客户端和easydarwin流媒体服务器之间的消息交互。

命令:./websocketproxy -host=127.0.0.1:554 -port=8081 &

5)编写websocket通信demo,测试代理是否成功:

websocket通信demo源码如下:

<!DOCTYPE HTML>
<html>

<head>
  <meta charset="utf-8">
  <title>websocket通信代理测试</title>

  <script type="text/javascript">
    function WebSocketTest() {
      //1.创建websocket客户端
      var wsServer = 'ws://127.0.0.1:8081/tztek/test';
      var limitConnect = 30; // 断线重连次数
      var timeConnect = 0;
      webSocketInit(wsServer);

      //socket初始化
      function webSocketInit(service) {
        var ws = new WebSocket(service);
        console.log("ws == ", ws);
        ws.onopen = function () {
          console.log("已连接TCP服务器");
        };
        ws.onmessage = function (msg) {
          console.log(msg.data  );
        };
        ws.onclose = function () {
          console.log('服务器已经断开');
          reconnect(service);
        };

        // 重连
        function reconnect(service) {
          if (limitConnect > 0) {
            limitConnect--;
            timeConnect++;
            console.log("第" + timeConnect + "次重连");
            // 进行重连
            setTimeout(function () {
              webSocketInit(service);
            }, 2000);

          } else {
            console.log("TCP连接已超时");
          }
        }

        // 心跳 * 回应
        // setInterval(function () {
        //   ws.send('xw_ping');
        // }, 1000 * 5);

      }

    }
  </script>

</head>

<body>

  <div id="sse">
    <a href="javascript:WebSocketTest()">运行 WebSocket</a>
  </div>

</body>

</html>

测试现象及结果:

结果表明:

a.建立websocket成功

b.代理服务端口转发成功(未放截图),现象描述:web客户端能通过8081端口和easydarwin流媒体服务器之间实现数据交互。

6)其他:本web客户端测试demo,还具备断线重连 + 心跳检测(保活)功能:

a.关闭服务进程:killall websocketproxy

b.重启websocketproxy服务进程:./websocketproxy -host=127.0.0.1:554 -port=8081 &

 

 

 

6、附WebsocketProxy.go代理服务源码

package main

import (
	"flag"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"runtime"

	"github.com/gorilla/websocket"
)

var g_tcpAddress string

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

func copyW2TWorker(dst net.Conn, src *websocket.Conn, doneCh chan<- bool) {
	defer func() {
		doneCh <- true
	}()

	for {
		_, message, err := src.ReadMessage()
		if err != nil {
			//log.Fatalln("[copyW2TWorker] websocket ReadMessage failed! err=%v", err)
			return
		}

		log.Printf("type=%d recv=%s", websocket.BinaryMessage, message)
		_, err = dst.Write([]byte(message))
		if err != nil {
			//log.Fatalln("[copyW2TWorker] tcp Write failed! err=%v", err)
			return
		}
	}
}

func copyT2WWorker(dst *websocket.Conn, src net.Conn, doneCh chan<- bool) {
	defer func() {
		doneCh <- true
	}()

	buff := make([]byte, 1024)
	for {
		n, err := src.Read(buff)
		if err != nil {
		//	log.Fatalln("[copyT2WWorker] tcp Read failed! err=%v", err)
			return
		}

		log.Printf("T2W type = %d read message = %s\n", websocket.BinaryMessage, string(buff[:n]))
		err = dst.WriteMessage(websocket.BinaryMessage, buff[:n])
		if err != nil {
			//log.Fatalln("[copyT2WWorker] websocket WriteMessage failed! err=%v", err)
			return
		}
	}
}

func relayHandler(w http.ResponseWriter, r *http.Request) {
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Print("upgrade:", err)
		return
	}

	conn, err := net.Dial("tcp", g_tcpAddress)
	if err != nil {
		log.Printf("[ERROR] %v host [%v]\n", err, g_tcpAddress)
		return
	}

	doneCh := make(chan bool)

	go copyW2TWorker(conn, ws, doneCh)
	go copyT2WWorker(ws, conn, doneCh)

	<-doneCh
	conn.Close()
	ws.Close()
	<-doneCh
}

func usage() {
	fmt.Fprintf(os.Stderr, "Usage: %s -host=ip:port -port=listen_port [option]\noption:\n", os.Args[0])
	flag.PrintDefaults()
}

func Init() (string, int, string, string, error) {
	var host string
	var port int
	var certFile string
	var keyFile string

	flag.StringVar(&host, "host", "", "Object server host(ip:port) for proxy")
	flag.IntVar(&port, "port", 0, "Port to listen on.")
	flag.StringVar(&certFile, "tlscert", "", "TLS cert file path")
	flag.StringVar(&keyFile, "tlskey", "", "TLS key file path")
	flag.Usage = usage
	flag.Parse()

	if host == "" || port == 0 {
		return host, port, certFile, keyFile, fmt.Errorf("Not enough args!")
	}

	return host, port, certFile, keyFile, nil
}

func main() {
	runtime.GOMAXPROCS(1) //设置cpu的核的数量,从而实现高并发

	tcpAddress, port, certFile, keyFile, err := Init()
	if err != nil {
		usage()
		os.Exit(1)
	}

	g_tcpAddress = tcpAddress

	portString := fmt.Sprintf(":%d", port)
	log.Printf("[INFO] starting server on port %d to proxy server [%s]\n", port, tcpAddress)

	if certFile != "" && keyFile != "" {
		err = http.ListenAndServeTLS(portString, certFile, keyFile, nil)
	} else {
		http.HandleFunc("/tztek/wstest", relayHandler)
		err = http.ListenAndServe(portString, nil)
	}
	if err != nil {
		log.Fatal(err)
	}
}

 

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐