Flink简介与集群的简单部署
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算Flink框架处理流程Flink的应用场景。
Flink的特点:高吞吐 低延迟 高容错 结果正确 语义化窗口 易用的API
第一章:初识Flink
1.1 Flink流处理简介
1.1.1 Flink是什么
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算
Flink框架处理流程
Flink的应用场景
1.1.2 为什么要用Flink
- 批处理和流处理是两种截然不同的数据处理方式,Spark更适合批处理
- 流数据更真实地反映了我们的生活方式,但很多时候可以将流数据积攒在一起进行批处理
- 我们的目标
- 低延迟
- 高吞吐
- 结果的准确性和良好的容错性
传统数据处理架构
- 事务处理
耗时较快,但是数据量不会很大
- 分析处理
数据量可以很大,但是耗时比较慢
有状态的流处理
- lambda架构
用两套系统,同时保证低延迟和结果准确
新一代流处理器——Flink - 核心特点
- 高吞吐、低延迟
- 结果的准确性
- 精确一次(exactly-once)的状态一致性保证
- 可以与众多常用存储系统连接
- 高可用,支持动态扩展
流处理在不同架构下的具体应用
- 事件驱动型应用
- 数据分析型应用
- 数据管道型应用
分层API
- 越顶层越抽象,表达含义越简明,使用越方便
- 越底层越具体,表达能力越丰富,使用越灵活
1.2 Flink vs Spark
-
数据处理架构
Spark Streaming本质上还是一个批处理 -
数据模型
- Spark采用RDD模型,Spark Streaming的DStream实际上也就是一组组小批数据RDD的集合
- Flink基本数据模型是数据流,以及事件(Event)序列
-
运行时架构
- Spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
- Flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
第二章:Flink快速上手
Flink底层是以Java编写的,并为开发人员同时提供了完整的Java和Scala API,在这里我们使用IntelliJ IDEA作为开发工具,用实际项目中最常见的Maven作为包管理工具,在开发环境中编写一个简单的Flink项目
2.1 环境准备
IntelliJ IDEA、Java 8、Maven、Git
2.2 创建项目
在InteliJ中创建一个名为FlinkTutor的Maven项目,在pom文件中加入我们需要的Flink依赖和与日志相关的依赖
<properties>
<flink.version>1.16.1</flink.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.5</slf4j.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!--引入Flink相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.1</version>
</dependency>
<!--引入日志管理相关依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.19.0</version>
</dependency>
在属性中,我们定义了scala.binary.version,这指代的是所依赖的Scala版本,我们需要Scala依赖,是因为Flink架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的
配置日志管理
在目录src/main/resources下添加文件:log4j.properties,内容配置如下:
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
2.3 编写代码
2.3.1 批处理
对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入到一个文本文档,然后读取这个文件处理数据就可以了。
- 在项目根目录下新建一个input文件夹,并在下面创建文本文件words.txt
- 在words.txt中输入一些文字,例如
hello world
hello flink
hello java
- 在com.org.example包下新建Java类BatchWordCount,在静态main方法中编写测试代码。
我们进行单词频次统计到基本思路是:先逐行读入文件数据,然后将每一行文字拆分成单词:接着按照单词分组,统计每组数据的个数,就是对应单词的频次
package org.example;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 从文件读取数据
DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
// 3. 将每行数据进行分词,转换成二元组类型
FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
// 将一行文本进行分词
String[] words = line.split(" ");
// 将每个单词转换成二元组输出
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
} )
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 按照word进行分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
// 5. 分组内进行聚合统计
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
// 6. 打印结果
sum.print();
}
}
代码说明和注意事项:
- Flink在执行应用程序前应该获取执行环境对象,也就是运行时上下文环境。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Flink同时提供了Java和Scala两种语言的API,有些类在两套API中名称是一样的。所以在引入包时,如果有Java和Scala两种选择,要注意选用Java的包
- 这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
这样,DataSet API就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只要维护一套DataStream API就可以了。这是只是为了方便大家理解,我们依然使用DataSet API做了批处理的实现。
2.3.2 流处理
我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用DataStream API来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更为强大,可以直接处理批处理和流处理的所有场景。
DataStream API作为“数据流”的处理接口,又怎样处理批数据呢?
回忆一下上一章中我们讲到的Flink世界观。在Flink的视角里,一切数据都可以认为是流,流数据是无界流,而批数据是有界流。所以批处理,其实就可以看作有界流的处理。
对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我们输入的数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。
下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。
- 读取文件
a. 我们同样尝试读取文档words.txt中的数据
BoundedStreamWordCount
package org.example;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建流式的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");
// 3. 转换计算
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector< Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
// 6. 打印
sum.print();
// 7. 启动执行
env.execute();
}
}
真正以流的形式输入数据:StreamWordCount
// 从参数中提取主机名和端口号
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String hostname = parameterTool.get("host");
Integer port = parameterTool.getInt("port");
// 2. 读取文本流
DataStreamSource<String> lineDataStream = env.socketTextStream(hostname, port);
第三章 Flink部署
Flink提交作业和执行任务,需要几个关键组件:客户端(client)、作业管理器(JobManager)和任务管理器(TaskManager)。我们的代码由客户端获取并做转换,之后提交给JobManager,所以JobManager就是Flink集群里的“管事人”。对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。这里的TaskManager,就是真正“干活的人”。数据的处理操作都是它们来做的。
3.1 快速启动一个Flink集群
3.1.1 环境配置
Flink是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行Flink安装部署的学习时,需要准备3台Linux机器,具体要求如下:
- 系统环境为CentOS7
- 安装Java8
- 安装Hadoop集群
- 配置集群节点服务器时间同步以及免密登录,关闭防火墙
我们三台服务器的具体设置如下: - 节点服务器1 ,IP地址为192.168.21.102,主机名为hadoop102
- 节点服务器2 ,IP地址为192.168.21.103,主机名为hadoop103
- 节点服务器3 ,IP地址为192.168.21.104,主机名为hadoop104
3.1.2 本地启动
最简单的启动方式,其实不是搭建集群,直接本地启动。本地部署非常简单,直接解压安装包就可以使用,不用进行任何配置,一般用来做一些简单的测试。
具体安装步骤如下:
进入官网下载你需要版本的Flink,放到你虚拟机的/opt/software/
目录下(这里我们下载的是flink-1.16.1-bin-scala_2.12.tgz,注意此处选用对于Scala2.12的安装包),解压到/opt/module/
目录下
tar -zxvf flink-1.16.1-bin-scala_2.12.tgz -C /opt/module/
进入解压后的目录,执行启动命令,并查看进程
cd /opt/module/flink-1.16.1/
bin/start-cluster.sh
如果想让Flink集群停止运行,可以执行以下命令:
bin/stop-cluster.sh
3.1.3 集群启动
可以看到,Flink本地启动非常简单,直接执行start-cluster.sh就可以了。如果我们想要扩展成集群,其实启动命令是不变的,主要是需要指定节点之间的主从关系。
Flink是典型的Master-Slave架构发分布式数据处理框架,其中Master角色对应着JobManager,Workers角色则对应
TaskManager。我们对三台节点服务器的角色分配如下表所示
具体安装部署步骤如下:
- 将Flink分发到其它机器中
xsync flink
- 修改集群配置
- 进入conf目录下,修改flink-conf.yaml文件,修改jobmanager.rpc.address参数为hadoop102,即,这样就指定了hadoop02节点服务器为JobManager节点
# JobManager节点地址
jobmanager.rpc.address: hadoop102
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 4096m # 每个JobManager的可用内存大小
taskmanager.bind-host: 0.0.0.0
taskmanager.memory.process.size: 8192m # 每个TaskManager的可用内存大小
taskmanager.numberOfTaskSlots: 8 # 每台计算机可用的CPU数
parallelism.default: 8 # 集群中默认的并行度
rest.address: hadoop102
rest.bind-address: 0.0.0.0
- 修改workers文件,将另外两台节点服务器添加为本Flink集群的TaskManager节点
hadoop103
hadoop104
- 修改masters文件
hadoop102:8081
- 将配置分发到其它机器
xsync conf/
- 启动
# 在Flink目录下运行
bin/start-cluster.sh
浏览器中访问hadoop102:8081
3.1.4 向集群提交作业
现在,我们有了真正的集群环境,接下来我们就可以把作业提交上去执行了。
本节我们以流处理的程序为例,演示如何将任务提交到集群中进行执行,具体步骤如下
可以选择在Web UI上提交,也可以选择在hadoop上使用命令行的方式提交
在Web UI上提交
- 首先,打包好我们在2.3.2中编写好的代码为jar包
(1)先使用Maven中Lifecycle中的clean,清除target包
(2)再点击Build中的Rebuild Project
(3)使用Maven中Lifecycle中的package,进行打包
(4)Rebuild Project之后,Maven会自动编译你的整个项目,当然也包括了单独使用Maven中compile没有编译到的sr类,并输出到target当中,如(4),生成的FlinkTutor-1.0-SNAPSHOT.jar即我们需要的jar包 - 然后,将jar包上传到Flink的Web UI界面中,
需要填的四个参数:Entry Class
:我们需要运行的类的全类名。在IDE中右键类所在的位置,点击Copy Path/Reference...
——>Copy Reference
Parallelism
:并行度Program Arguments
:运行程序时传入的参数,结合2.3.2中所写的流处理代码,我们需要传入host和port两个参数Savedpoint Path
:程序保存点。这里我们可以先不填
- 设置好之后,可以点击Show Plan查看运行计划
- 在hadoop102中开放7777端口:nc -lk 7777,点击Submit提交任务
- 在hadoop102中以流的形式输入数据,可以在Flink Web UI中的Task Manager里的输出看到运行结果
- 想要停掉作业,点击
Cancel Job
即可
用命令行进行作业提交
可以传jar包到任一主机上,在flink目录下指定各种参数提交任务
./bin/flink run -m hadoop102:8081 -c org.example.StreamWordCount -p 2 ./FlinkTutor-1.0-SNAPSHOT.jar --host hadoop102 --port 7777
Job ID相同,命令行上传任务成功
3.2 部署模式
在一些应用场景,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:
- 会话模式(Session Mode)
- 单作业模式(Pre-Job Mode)
- 应用模式(Application Mode)
它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在客户端还是JobManager里执行。
3.2.1 会话模式(Session Mode)
会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业,如图所示,集群启动时所有的资源就都已经确定,所以所有提交的作业会竞争集群中的资源。
会话模式比较适合于单个规模小、执行时间短的大量作业
3.2.2 单作业模式(Pre-Job Mode)
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Pre-Job)模式
单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。同样由客户端运行启动程序,然后启动集群,作业被提交给JobMan ager,进而分发给TaskManager执行。作业完成之后,集群就会关闭,所有资源也会释放。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。
需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如YARN、Kubernetes
3.2.3 应用模式(Application Mode)
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
所以解决办法就是,我们不要客户端了,直接把应用提交到JobManager上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager也就关闭了,这就是所谓的应用模式
应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的,并且即使包含了多个作业,也只创建一个集群。
这里我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。接下来,我们就针对不同的资源提供者(Resource Provider)的场景,具体介绍Flink的部署方式。
3.3 独立模式(Standalone)
独立模式(Standalone)是部署Flink最基本也是最简单的方式:所需要的所有Flink组件,都只是操作系统上运行的一个JVM进程。
独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下
3.3.1 会话部署模式
我们在3.1节中使用的就是独立(standalone)集群的会话部署模式
3.3.2 单作业部署模式
在3.2.2节中我们提到,Flink本身无法直接以单作业方式启动集群,一般需要借助一些资源管理平台。所以Flink的独立(standalone)集群并不支持单作业模式部署。
3.3.3 应用部署模式
应用部署模式不会提前创建集群,所以不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager
具体步骤如下
- 进入到Flink的安装路径下,将应用程序的jar包放到lib/目录下
cp ./
- 执行以下命令,启动JobManager
bin/standalone-job.sh --job-classname com.
这里我们直接指定作业入口类,脚本会到lib目录扫描所有的jar包
3. 同样是使用bin目录下的脚本,启动TaskManager
bin/taskmanager.sh start
- 如果希望停掉集群,同样可以使用脚本,命令如下
bin/standalone-job.sh stop
bin/taskmanager.sh stop
3.4 YARN模式
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManger的实例,从而启动集群。Flink会根据运行在JobManager上的作业所需要的Slot数量动态分配TaskManager资源。
3.4.1 相关准备和配置
在将Flink任务部署到YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。
详情可见博主的上一篇文章
配置环境变量,增加环境变量配置如下
sudo vim /etc/profile.d/my_env.sh
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
source一下
source /etc/profile
3.4.2 会话模式部署
YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN session)来启动Flink集群。具体步骤如下:
- 启动集群
(1)启动Hadoop集群(HDFS,YARN)
(2)执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群
bin/yarn-session.sh -nm test
可用参数解读:
-d
:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。-jm(--jobManagerMemory)
:配置JobManager所需内存,默认单位MB-nm(--name)
:配置在YARN UI界面上显示的任务名-qu(--queue)
:指定YARN队列名-tm(--taskManager)
:配置每个TaskManager所使用的内存
注意:Flink版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的。
YARN session启动之后会给出一个web UI地址以及一个YARN application ID,如下所示
用户可以通过web UI或者命令行两种方式提交作业
得到启动的JobManager Interface:http:hadoop104:35055
打开
2. 在hadoop103上通过命令行的方式提交作业
注意:当前的JobManager是直接通过yarn进行管理的,所以当前我们没有必要再去指定-m(即谁是JobManager)了。客户端可以自行确定JobManager的地址,也可以通过-m或者–jobManager参数指定JobManager的地址,JobManager的地址在YARN session的启动页面中可以找到
./bin/flink run -c org.example.StreamWordCount -p 2 ./FlinkTutor-1.0-SNAPSHOT.jar --host hadoop102 --port 7777
任务提交成功后,可以在YARN的Web UI界面查看运行情况
第四章 Flink运行时架构
4.1 Flink系统架构
作业管理器(JobManager)
控制一个应用程序执行的主进程,是Flink集群中任务管理和调度的核心。
Jobmaster
- Jobmaster是JobManager中最核心的组件,负责处理单独的作业(Job)
- 在作业提交时,JobMaster会先接收到要执行的应用。一般是由客户端提交来的,包括:Jar包,数据流图(dataflow graph)和作业图(JobGraph)
- JobMaster会把JobGraph转换成一个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),它包含了所有可以并发执行的任务。JobMaster会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
- 在运行过程中,JobMaster会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调
资源管理器
- ResourceManager主要负责资源的分配和管理,在Flink集群中只有一个。所谓“资源”,主要是指TaskManager的任务槽(task slots)。任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。
分发器(Dispatcher) - Dispatcher主要负责提供一个REST接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的JobManager组件。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中并不是必需的,在不同的部署模式下可能会被忽略掉。
任务管理器(TaskManager)
- Flink中的工作进程,通常在Flink中会有多个TaskManager运行,每个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够并行处理的任务数量。
- 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或多个插槽提供给JobMaster调用。JobMaster就可以向插槽分配任务(tasks)来执行了
- 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据
更多推荐
所有评论(0)