Jetbrains——Ktor 初体验!

一、什么是Ktor

​ Ktor 是一个使用强大的 Kotlin 语言在互联系统中构建异步服务器与客户端的框架。利用Ktor可以实现web服务器以及Socket周边的通信实现。

​ 参考官网:https://ktor.kotlincn.net/

​ 前置知识:IDEA Kotlin的函数式 Koltin协程 Socket通信

二、如何使用

​ 你可以使用gradle或者maven构建一个ktor应用,当然你可以在dockerhub上去找关于ktor的镜像,关于初学者建议使用前两个构建工具来搭建一个简单的ktor项目入门。

  • 构建ktor应用方式一:idea插件 https://plugins.jetbrains.com/plugin/10823-ktor-obsolete-
  • 构建ktor应用方式二:官网项目初始化器 https://start.ktor.io/#

官网的web服务有例子,这便着重测试Socket编程(官网socket文档很垃圾

三、基于maven的ktot使用netty实现的socket程序

maven依赖

	    <dependency>
            <groupId>io.ktor</groupId>
            <artifactId>ktor-server-netty</artifactId>
            <version>${ktor_version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback_version}</version>
        </dependency>
        <dependency>
            <groupId>io.ktor</groupId>
            <artifactId>ktor-server-core</artifactId>
            <version>${ktor_version}</version>
        </dependency>
        <dependency>
            <groupId>io.ktor</groupId>
            <artifactId>ktor-server-sessions</artifactId>
            <version>${ktor_version}</version>
        </dependency>
        <dependency>
            <groupId>io.ktor</groupId>
            <artifactId>ktor-websockets</artifactId>
            <version>${ktor_version}</version>
        </dependency>

四、编程思路

​ 这里我实现了一个基于netty的网络服务器,用来转发客户端的请求来实现网络通信,本来打算使用websocket实现(其实过),但由于考虑到websocket是socket的封装,性能毫无疑问的比socket低,对于游戏,fps,及时类的应用程序socket编程才是最佳之选。

​ [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v1npQQKx-1618722963322)(http://www.jiayou.art/ktor/ktor.jpg)]

​ 通过Socket Server来维护各个Socket Client的连接对象,对数据交互IO进行处理。

五、代码实现

对象序列化器

​ 传递对象避免出现粘包

class SerializableTool {
    companion object {
    	//将对象转为ByteArray(字节)数组
        fun ObjectToByteArray(obj: Any?): ByteArray {
            val byteArrayOutputStream = ByteArrayOutputStream()
            val objectOutputStream = ObjectOutputStream(byteArrayOutputStream)
            objectOutputStream.writeObject(obj)
            objectOutputStream.flush()
            return byteArrayOutputStream.toByteArray()
        }
		//将ByteArray(字节)数组转为对象(Object,在Kotlin对应的是Any)
        fun ByteArrayToObject(byteArray: ByteArray?): Any {
            val `in` = ByteArrayInputStream(byteArray)
            val sIn = ObjectInputStream(`in`)
            return sIn.readObject()
        }
    }
}
数据协议

各个socket通信我们使用自定义协议来实现,避免粘包

data class Translation(var data: ByteArray? = null, var messageCurrentTime: Long? = null) : Serializable
服务端
fun main(args: Array<String>) {
    val socketSet = Collections.synchronizedSet(LinkedHashSet<Socket>())
    val writeMap = Collections.synchronizedMap(LinkedHashMap<Socket, ByteWriteChannel>())
    runBlocking {
        val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("127.0.0.1", 2323))
        println("Started echo telnet server at ${server.localAddress}")

        while (true) {
            val socket = server.accept()
            socketSet.add(socket)
            println("有远程链连接!当前维护的连接对象数为:${socketSet.size}")
            val input = socket.openReadChannel()
            val output = socket.openWriteChannel(autoFlush = true)
            writeMap[socket] = output
            launch {
                println("Socket accepted: ${socket.remoteAddress}")
                try {
                    while (true) {
                        input.awaitContent()
                        if (input.availableForRead == 0) break
                        val availableLength = input.readInt()
                        val byteArray = ByteArray(availableLength)    //是的  ktor
                        input.readFully(byteArray)
                        socketSet.forEach {
                            if (it != socket) {
                                writeMap[it]!!.writeInt(availableLength)
                                writeMap[it]!!.writeFully(byteArray)
                            }
                        }
                        println((SerializableTool.ByteArrayToObject(byteArray) as Translation).data?.let { String(it) })
                    }

                } catch (e: Throwable) {
                    e.printStackTrace()
                    socketSet.remove(socket)
                    println("有远程链连接异常关闭!当前维护的连接对象数为:${socketSet.size}")
                } finally {
                     socket.close()
                }
            }


            launch {
                socket.awaitClosed()
                socketSet.forEach {
                    if (it==socket)
                        writeMap.remove(it)
                }
                socketSet.remove(socket)
                println("连接正常关闭。当前维护的连接对象数为:${socketSet.size}")
            }
        }
    }

客户端
fun main(args: Array<String>) {

    //我们使用的Scanner在hasNext方法等待输入的时候是阻塞式的
    //且输出操作的协程和读取操作的协程是在同一个协程上下文的,这就导致了该线程一直被Scanner的next方法阻塞
    //协程调度器无法从阻塞的线程中再调度,也就是说输出操作的协程一直会阻塞读取操作的协程
    //协程默认的上下文为当前线程(在这里是main线程),所以launch协程的默认调度上下文不符合我们的期望。

    // launch是CoroutineScope的拓展函数,第一个参数指定是一个携程的上下文,不传默认就是当前线程
    // public fun CoroutineScope.launch(
    //     context: CoroutineContext = EmptyCoroutineContext,
    //     start: CoroutineStart = CoroutineStart.DEFAULT,
    //     block: suspend CoroutineScope.() -> Unit
    // )
    // 这里我们new了两个单线程的协程上下文
    // threadPoolOfInput为读取操作的协程的上下文
    // threadPoolOfOutPut为输出操作的协程的上下文
    // 他们其实是两个线程,也就是读取操作和写出操作是运行在两个独立的线程
    // 所以他们不会互相阻塞对方

    val threadPoolOfInput = newSingleThreadContext("input")
    val threadPoolOfOutPut = newSingleThreadContext("output")

    //启动一个阻塞式协程上下文构建器
    runBlocking {
        //在指定主机和端口来连接一个ServerSockert服务,等待服务端响应连接(异步非阻塞式)
        val socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress("127.0.0.1", 2323))
        //打开socker套接字的输入流(后面的autoFlush参数为刷新缓冲区,不用再手动write完数据后手动调flush)
        val output = socket.openWriteChannel(autoFlush = true)
        //打开socker套接字的输入流
        val input = socket.openReadChannel()


        //如果是launch {
        // ...
        // }
        //读取操作协程会被输出操作的协程中scanner的hasNext()一直阻塞,收不到服务端发来的消息
        //读取操作的协程
        launch(threadPoolOfInput) {
            while (true) {
                input.awaitContent()
                if (input.availableForRead == 0) break
                val availableLength = input.readInt()
                val byteArray = ByteArray(availableLength)
                input.readFully(byteArray)
                val data = SerializableTool.ByteArrayToObject(byteArray) as Translation
                println(data.data?.let { kotlin.text.String(it) } + ",当前消息延迟为:" +
                        "${System.currentTimeMillis() - (if (data.messageCurrentTime == null) 0 else data.messageCurrentTime)!!}")

            }
        }

        //如果是launch {
        // ...
        // }
        //输出操作的协程中scanner的hasNext()会一直阻塞读取操作协程,收不到服务端发来的消息
        //输出操作的协程
        launch(threadPoolOfOutPut) {
            val scanner = Scanner(System.`in`)
            while (scanner.hasNext()) {
                val objectToByteArray = SerializableTool.ObjectToByteArray(
                    Translation(
                        ("来自客户端${socket.remoteAddress}的消息:" + scanner.nextLine()).toByteArray(Charsets.UTF_8),
                        System.currentTimeMillis()
                    )
                )
                output.writeInt(objectToByteArray.size)
                output.writeFully(objectToByteArray)
            }

        }

    }

}

六、测试

启动一个服务端,和三个客户端

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tZ1dJEex-1618722963326)(https://www.jiayou.art/ktor/1.png)]

客户端一发送"hello ktor!"

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fmYJ6Bht-1618722963328)(https://www.jiayou.art/ktor/2.png)]

服务端接收到消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Sf4wPc5J-1618722963330)(https://www.jiayou.art/ktor/3.png)]

客户端二接收到消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7nrv6XTW-1618722963330)(https://www.jiayou.art/ktor/4.png)]

客户端三接收到消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5vwmG0qN-1618722963331)(https://www.jiayou.art/ktor/5.png)]

大家忽略上面的延迟字样,第一次发消息延迟会比较高,因为会涉及第一次初始化,缓冲,建立流传输通道等操作,会后IO操作都是5ms左右。

七、总结

​ ktor非常实用,小巧,对netty和jetty都进行了封装,在web方面也提供了websocket、Auth、JWT、Jackson、SSL实现,包括web周边的路由,CORS,模板渲染引擎(Freemarker,Themyleaf,Velocity)实现,很适合作为项目的脚手架。
推荐链接:

​ 中国唯一 Google 官方认证 Android 和 Kotlin 双领域开发专家(GDE): https://space.bilibili.com/27559447?from=search&seid=18022887471961950104

​ ktor官网:https://ktor.kotlincn.net/

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐