kafka实现消费者
摘要:本文介绍如何在 Spring Boot 中实现 Kafka 消费者。首先在 yml 中配置消费者参数(bootstrap-servers、group-id、auto-offset-reset、enable-auto-commit、auto-commit-interval 等),然后依次实现监听配置、监听任务和业务处理。
·
kafka实现消费者
- Kafka 在 Spring Boot 中的 yml 配置
# The `spring:` root is required so these keys resolve as spring.kafka.consumer.*,
# which is the path the @Value placeholders in KafkaConfig read.
# If the target application.yml already has a `spring:` key, merge `kafka:` under it.
spring:
  kafka:
    # Consumer settings
    consumer:
      bootstrap-servers: xxx.xx.xx.xx:9092
      group-id: kafka-group # default consumer group ID
      auto-offset-reset: earliest
      enable-auto-commit: true # whether offsets are committed automatically
      auto-commit-interval: 1000 # offset commit delay (how long after receiving a message the offset is committed)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      topic-name: yinhuan
- 实现监听配置
package org.jeecg.modules.InspectionApplet.server;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Collections;
import java.util.Properties;
/**
 * Builds the application's {@link KafkaConsumer} from the {@code spring.kafka.consumer.*}
 * properties, subscribes it to the configured topic and starts the background thread that
 * polls it.
 */
@Configuration
@Slf4j
public class KafkaConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    public String kafka_server;
    @Value("${spring.kafka.consumer.key-deserializer}")
    public String kafka_consumer_key;
    @Value("${spring.kafka.consumer.value-deserializer}")
    public String kafka_consumer_value;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    public String kafka_auto_config;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    public String kafka_commmit_interval;
    @Value("${spring.kafka.consumer.group-id}")
    public String kafka_group_id;
    @Value("${spring.kafka.consumer.topic-name}")
    public String kafka_topic_name;
    /**
     * Offset reset policy. Optional: when the property is absent the placeholder default
     * {@code latest} is used, which is also Kafka's own default, so existing deployments
     * that never set it behave as before.
     */
    @Value("${spring.kafka.consumer.auto-offset-reset:latest}")
    public String kafka_auto_offset_reset;

    @Autowired
    private KafkaConsumerListener kafkaConsumerListener;

    /** Shared consumer instance; read by {@link KafkaListenerJob} on its polling thread. */
    public static KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Creates the consumer, subscribes it to {@link #kafka_topic_name} and starts the
     * listener thread.
     *
     * <p>The method returns the consumer instead of {@code void}: newer Spring versions
     * reject {@code void} {@code @Bean} methods. Destroy-method inference is disabled
     * because the consumer is used by the polling thread, and {@link KafkaConsumer} is not
     * safe for access from multiple threads, so Spring must not call {@code close()} on it
     * from the shutdown thread.
     *
     * @return the subscribed consumer (the same instance stored in {@link #kafkaConsumer})
     */
    @Bean(destroyMethod = "")
    public KafkaConsumer<String, String> loadKafkaConfig() {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafka_consumer_key);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafka_consumer_value);
        p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafka_auto_config);
        p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafka_commmit_interval);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, kafka_group_id);
        // Previously the configured auto-offset-reset value was never handed to the consumer.
        p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafka_auto_offset_reset);

        kafkaConsumer = new KafkaConsumer<String, String>(p);
        // Subscribe to the configured topic.
        kafkaConsumer.subscribe(Collections.singletonList(kafka_topic_name));
        log.info("消息订阅成功!kafka配置:{}", p);

        // Start the message listener thread; the name makes it identifiable in thread dumps.
        KafkaListenerJob kafkaListenerJob = new KafkaListenerJob(kafkaConsumerListener);
        Thread t = new Thread(kafkaListenerJob, "kafka-listener-job");
        t.start();
        return kafkaConsumer;
    }
}
- 实现监听任务
package org.jeecg.modules.InspectionApplet.server;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
@Slf4j
public class KafkaListenerJob implements Runnable{
@Autowired
private KafkaConsumerListener kafkaConsumerListener;
//注入消息监听处理类
public KafkaListenerJob(KafkaConsumerListener kafkaConsumerListener) {
this.kafkaConsumerListener = kafkaConsumerListener;
}
@Override
public void run() {
log.info("kafka消息监听任务已启动!");
//进行消息监听
while (true) {
ConsumerRecords<String, String> records = KafkaConfig.kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
try {
kafkaConsumerListener.listen(record);
} catch (Exception e) {
log.error("消息消费异常!", e);
}
}
}
}
}
- 处理业务
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.flowable.engine.impl.el.FixedValue;
import org.jeecg.modules.InspectionApplet.entity.AppletEntity;
import org.jeecg.modules.InspectionApplet.entity.TaskItem;
import org.jeecg.modules.desform.entity.SysDesignData;
import org.jeecg.modules.desform.service.ISysDesignDataService;
import org.jeecg.modules.process.entity.FlowTaskData;
import org.jeecg.modules.process.mapper.FlowCategoryMapper;
import org.jeecg.modules.process.mapper.SystemMapper;
import org.jeecg.modules.process.po.ProcessInfo;
import org.jeecg.modules.process.po.StartProcessInstanceReq;
import org.jeecg.modules.process.po.UserInfo;
import org.jeecg.modules.process.service.IFlowCategoryService;
import org.jeecg.modules.process.service.IFlowTaskDataService;
import org.jeecg.modules.process.service.ProcessEngineService;
import org.jeecg.modules.process.vo.FlowCategroyVo;
import org.jeecg.modules.system.entity.SysUser;
import org.jeecg.modules.system.service.ISysUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
@Slf4j
@Service
public class KafkaConsumerListener {
private FixedValue isEdit;
private FixedValue editField;
private FixedValue noBack;
@Autowired
private ISysDesignDataService sysDesignDataService;
@Autowired
private IFlowCategoryService flowCategoryService;
@Autowired
private ISysUserService sysUserService;
@Autowired
private IFlowTaskDataService flowTaskDataService;
@Autowired
private SystemMapper systemMapper;
@Autowired
private FlowCategoryMapper flowCategoryMapper;
@Autowired
private ProcessEngineService processEngineService;
private final SimpleDateFormat date = new SimpleDateFormat("yyyy年MM月dd日");
private final SimpleDateFormat time = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒");
/**
 * Handles one Kafka record: parses its JSON value into an {@code AppletEntity}, looks up the
 * user referenced by the entity, and passes the task items, entity, user and the user's post
 * to {@code startQualityFlow}.
 *
 * @param consumerRecord the record whose value holds the JSON payload
 * @throws Exception if any step fails; the original exception is attached as the cause
 */
public void listen(ConsumerRecord<String, String> consumerRecord) throws Exception {
    try {
        String value = consumerRecord.value();
        log.info("监听Kafka消息:{}", value);
        // NOTE(review): the parse / toString / parseObject round trip is kept as-is; presumably it
        // tolerates payloads delivered as a JSON string wrapping the object — confirm before collapsing.
        Object parse = JSON.parse(value);
        AppletEntity appletEntity = JSON.parseObject(parse.toString(), AppletEntity.class);
        List<TaskItem> taskItem = appletEntity.getTask().getTaskItem();
        SysUser byId = sysUserService.getById(appletEntity.getUserId());
        if (byId == null) {
            // Fail with an explicit message instead of a bare NullPointerException on getPost().
            throw new IllegalStateException("未找到用户:" + appletEntity.getUserId());
        }
        String post = byId.getPost();
        startQualityFlow(taskItem, appletEntity, byId, post);
        log.warn(appletEntity.getProjectName());
    } catch (Exception e) {
        // Pass e as the cause so the original stack trace is not lost.
        throw new Exception(this.getClass().getName() + "Kafka消息处理异常:" + e, e);
    }
}
点击阅读全文
更多推荐
目录
所有评论(0)