Jetbrain——Ktor 初体验!
Jetbrain——Ktor 初体验!一、什么是KtorKtor 是一个使用强大的 Kotlin 语言在互联系统中构建异步服务器与客户端的框架。利用Ktor可以实现web服务器以及Socket周边的通信实现。参考官网:https://ktor.kotlincn.net/前置知识:IDEA Kotlin的函数式Koltin协程Socket通信二、如何使用你可以使用gradle或者maven构
Jetbrains——Ktor 初体验!
一、什么是Ktor
Ktor 是一个使用强大的 Kotlin 语言在互联系统中构建异步服务器与客户端的框架。利用Ktor可以实现web服务器以及Socket周边的通信实现。
参考官网:https://ktor.kotlincn.net/
前置知识:IDEA Kotlin的函数式 Koltin协程 Socket通信
二、如何使用
你可以使用gradle或者maven构建一个ktor应用,当然你可以在dockerhub上去找关于ktor的镜像,关于初学者建议使用前两个构建工具来搭建一个简单的ktor项目入门。
- 构建ktor应用方式一:idea插件 https://plugins.jetbrains.com/plugin/10823-ktor-obsolete-
- 构建ktor应用方式二:官网项目初始化器 https://start.ktor.io/#
官网的web服务有例子,这便着重测试Socket编程(官网socket文档很垃圾)
三、基于maven的ktot使用netty实现的socket程序
maven依赖
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-server-netty</artifactId>
<version>${ktor_version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback_version}</version>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-server-core</artifactId>
<version>${ktor_version}</version>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-server-sessions</artifactId>
<version>${ktor_version}</version>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-websockets</artifactId>
<version>${ktor_version}</version>
</dependency>
四、编程思路
这里我实现了一个基于netty的网络服务器,用来转发客户端的请求来实现网络通信,本来打算使用websocket实现(其实过),但由于考虑到websocket是socket的封装,性能毫无疑问的比socket低,对于游戏,fps,及时类的应用程序socket编程才是最佳之选。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v1npQQKx-1618722963322)(http://www.jiayou.art/ktor/ktor.jpg)]
通过Socket Server来维护各个Socket Client的连接对象,对数据交互IO进行处理。
五、代码实现
对象序列化器
传递对象避免出现粘包
class SerializableTool {
companion object {
//将对象转为ByteArray(字节)数组
fun ObjectToByteArray(obj: Any?): ByteArray {
val byteArrayOutputStream = ByteArrayOutputStream()
val objectOutputStream = ObjectOutputStream(byteArrayOutputStream)
objectOutputStream.writeObject(obj)
objectOutputStream.flush()
return byteArrayOutputStream.toByteArray()
}
//将ByteArray(字节)数组转为对象(Object,在Kotlin对应的是Any)
fun ByteArrayToObject(byteArray: ByteArray?): Any {
val `in` = ByteArrayInputStream(byteArray)
val sIn = ObjectInputStream(`in`)
return sIn.readObject()
}
}
}
数据协议
各个socket通信我们使用自定义协议来实现,避免粘包
data class Translation(var data: ByteArray? = null, var messageCurrentTime: Long? = null) : Serializable
服务端
fun main(args: Array<String>) {
val socketSet = Collections.synchronizedSet(LinkedHashSet<Socket>())
val writeMap = Collections.synchronizedMap(LinkedHashMap<Socket, ByteWriteChannel>())
runBlocking {
val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("127.0.0.1", 2323))
println("Started echo telnet server at ${server.localAddress}")
while (true) {
val socket = server.accept()
socketSet.add(socket)
println("有远程链连接!当前维护的连接对象数为:${socketSet.size}")
val input = socket.openReadChannel()
val output = socket.openWriteChannel(autoFlush = true)
writeMap[socket] = output
launch {
println("Socket accepted: ${socket.remoteAddress}")
try {
while (true) {
input.awaitContent()
if (input.availableForRead == 0) break
val availableLength = input.readInt()
val byteArray = ByteArray(availableLength) //是的 ktor
input.readFully(byteArray)
socketSet.forEach {
if (it != socket) {
writeMap[it]!!.writeInt(availableLength)
writeMap[it]!!.writeFully(byteArray)
}
}
println((SerializableTool.ByteArrayToObject(byteArray) as Translation).data?.let { String(it) })
}
} catch (e: Throwable) {
e.printStackTrace()
socketSet.remove(socket)
println("有远程链连接异常关闭!当前维护的连接对象数为:${socketSet.size}")
} finally {
socket.close()
}
}
launch {
socket.awaitClosed()
socketSet.forEach {
if (it==socket)
writeMap.remove(it)
}
socketSet.remove(socket)
println("连接正常关闭。当前维护的连接对象数为:${socketSet.size}")
}
}
}
客户端
fun main(args: Array<String>) {
//我们使用的Scanner在hasNext方法等待输入的时候是阻塞式的
//且输出操作的协程和读取操作的协程是在同一个协程上下文的,这就导致了该线程一直被Scanner的next方法阻塞
//协程调度器无法从阻塞的线程中再调度,也就是说输出操作的协程一直会阻塞读取操作的协程
//协程默认的上下文为当前线程(在这里是main线程),所以launch协程的默认调度上下文不符合我们的期望。
// launch是CoroutineScope的拓展函数,第一个参数指定是一个携程的上下文,不传默认就是当前线程
// public fun CoroutineScope.launch(
// context: CoroutineContext = EmptyCoroutineContext,
// start: CoroutineStart = CoroutineStart.DEFAULT,
// block: suspend CoroutineScope.() -> Unit
// )
// 这里我们new了两个单线程的协程上下文
// threadPoolOfInput为读取操作的协程的上下文
// threadPoolOfOutPut为输出操作的协程的上下文
// 他们其实是两个线程,也就是读取操作和写出操作是运行在两个独立的线程
// 所以他们不会互相阻塞对方
val threadPoolOfInput = newSingleThreadContext("input")
val threadPoolOfOutPut = newSingleThreadContext("output")
//启动一个阻塞式协程上下文构建器
runBlocking {
//在指定主机和端口来连接一个ServerSockert服务,等待服务端响应连接(异步非阻塞式)
val socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress("127.0.0.1", 2323))
//打开socker套接字的输入流(后面的autoFlush参数为刷新缓冲区,不用再手动write完数据后手动调flush)
val output = socket.openWriteChannel(autoFlush = true)
//打开socker套接字的输入流
val input = socket.openReadChannel()
//如果是launch {
// ...
// }
//读取操作协程会被输出操作的协程中scanner的hasNext()一直阻塞,收不到服务端发来的消息
//读取操作的协程
launch(threadPoolOfInput) {
while (true) {
input.awaitContent()
if (input.availableForRead == 0) break
val availableLength = input.readInt()
val byteArray = ByteArray(availableLength)
input.readFully(byteArray)
val data = SerializableTool.ByteArrayToObject(byteArray) as Translation
println(data.data?.let { kotlin.text.String(it) } + ",当前消息延迟为:" +
"${System.currentTimeMillis() - (if (data.messageCurrentTime == null) 0 else data.messageCurrentTime)!!}")
}
}
//如果是launch {
// ...
// }
//输出操作的协程中scanner的hasNext()会一直阻塞读取操作协程,收不到服务端发来的消息
//输出操作的协程
launch(threadPoolOfOutPut) {
val scanner = Scanner(System.`in`)
while (scanner.hasNext()) {
val objectToByteArray = SerializableTool.ObjectToByteArray(
Translation(
("来自客户端${socket.remoteAddress}的消息:" + scanner.nextLine()).toByteArray(Charsets.UTF_8),
System.currentTimeMillis()
)
)
output.writeInt(objectToByteArray.size)
output.writeFully(objectToByteArray)
}
}
}
}
六、测试
启动一个服务端,和三个客户端
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tZ1dJEex-1618722963326)(https://www.jiayou.art/ktor/1.png)]
客户端一发送"hello ktor!"
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fmYJ6Bht-1618722963328)(https://www.jiayou.art/ktor/2.png)]
服务端接收到消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Sf4wPc5J-1618722963330)(https://www.jiayou.art/ktor/3.png)]
客户端二接收到消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7nrv6XTW-1618722963330)(https://www.jiayou.art/ktor/4.png)]
客户端三接收到消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5vwmG0qN-1618722963331)(https://www.jiayou.art/ktor/5.png)]
大家忽略上面的延迟字样,第一次发消息延迟会比较高,因为会涉及第一次初始化,缓冲,建立流传输通道等操作,会后IO操作都是5ms左右。
七、总结
ktor非常实用,小巧,对netty和jetty都进行了封装,在web方面也提供了websocket、Auth、JWT、Jackson、SSL实现,包括web周边的路由,CORS,模板渲染引擎(Freemarker,Themyleaf,Velocity)实现,很适合作为项目的脚手架。
推荐链接:
中国唯一 Google 官方认证 Android 和 Kotlin 双领域开发专家(GDE): https://space.bilibili.com/27559447?from=search&seid=18022887471961950104
ktor官网:https://ktor.kotlincn.net/
更多推荐
所有评论(0)