写在前面

项目基于spring-boot 2.3.12.RELEASE和spring-cloud Hoxton.SR12开发

开发spring-cloud-stream-redis模块

项目结构如下

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor of the spring-cloud-stream-redis binder module. -->
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Coordinates under which the binder is installed; the test project depends on exactly these. -->
    <groupId>com.lhstack.stream</groupId>
    <artifactId>spring-cloud-stream-redis</artifactId>
    <version>0.0.1</version>
    <name>spring-cloud-stream-redis</name>
    <description>spring-cloud-stream-redis project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <!-- Release train imported through the spring-cloud-dependencies BOM below. -->
        <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
        <!-- Shared version of both protostuff artifacts. -->
        <protostuff.version>1.7.4</protostuff.version>
    </properties>
    <dependencies>

        <!-- Protostuff: used by ProtostuffUtils to serialize message headers. -->
        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>${protostuff.version}</version>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>${protostuff.version}</version>
        </dependency>

        <!-- Lettuce Redis client; no version is declared here, so it comes from inherited dependency management. -->
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </dependency>

        <!-- Annotation processor for @ConfigurationProperties classes; only needed at build time. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- Spring Cloud Stream core: provides the binder SPI this module implements. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <!-- Test-only dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <!-- Imports the Spring Cloud BOM so Spring Cloud artifacts need no explicit version. -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <!-- Extra Spring repositories (milestones: releases only, snapshots: snapshots only). -->
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
    </repositories>
    <!-- Same repositories, made available for plugin resolution. -->
    <pluginRepositories>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <releases>
                <enabled>false</enabled>
            </releases>
        </pluginRepository>
    </pluginRepositories>

</project>

项目目录结构
在这里插入图片描述

自定义Consumer和Producer配置类

这里自定义配置类,用于后面的配置扩展
RedisStreamProducerProperties

package com.lhstack.stream.redis.properties;

/**
 * Binder-specific producer properties of the Redis binder.
 *
 * <p>Currently an empty placeholder: it declares no settings yet and exists so
 * that producer-side options can be added later.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class RedisStreamProducerProperties
 * @since 1.8
 */
public class RedisStreamProducerProperties {


}

RedisStreamConsumerProperties

package com.lhstack.stream.redis.properties;

/**
 * Binder-specific consumer properties of the Redis binder.
 *
 * <p>Currently an empty placeholder: it declares no settings yet and exists so
 * that consumer-side options can be added later.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class RedisStreamConsumerProperties
 * @since 1.8
 */
public class RedisStreamConsumerProperties {


}

定义redis客户端配置类

这里单独定义,是为了和业务解耦

package com.lhstack.stream.redis.properties;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Connection settings of the Redis client used by the binder, bound from
 * {@code spring.cloud.stream.redis.binder.*}.
 *
 * <p>Host, port and database default to {@code localhost}, {@code 6379} and
 * {@code 0}, so a minimal configuration no longer hands {@code null} values to
 * the code that builds the Redis connection. The password stays {@code null}
 * when the server needs no authentication.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class RedisStreamClientProperties
 * @since 1.8
 */
@ConfigurationProperties("spring.cloud.stream.redis.binder")
public class RedisStreamClientProperties {

    /** Redis server host name or IP address. */
    private String host = "localhost";

    /** Redis server TCP port. */
    private Integer port = 6379;

    /** Index of the Redis database to select. */
    private Integer database = 0;

    /** Redis password; {@code null} when authentication is not used. */
    private String password;

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public Integer getDatabase() {
        return database;
    }

    public void setDatabase(Integer database) {
        this.database = database;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Returns a diagnostic description of these settings.
     *
     * <p>The password is masked so that logging this object never leaks the
     * credential; only whether a password is set remains visible.
     */
    @Override
    public String toString() {
        return "RedisStreamClientProperties{" +
                "host='" + host + '\'' +
                ", port=" + port +
                ", database=" + database +
                ", password=" + (password == null ? "null" : "'******'") +
                '}';
    }
}

定义StreamBindingProperties配置类

用于在stream中返回自定义的Consumer和Producer配置类

package com.lhstack.stream.redis.properties;

import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;

/**
 * Per-binding holder that pairs the Redis consumer properties with the Redis
 * producer properties and exposes both through
 * {@link BinderSpecificPropertiesProvider}.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class RedisStreamBindingProperties
 * @since 1.8
 */
public class RedisStreamBindingProperties implements BinderSpecificPropertiesProvider {

    /** Consumer-side extended properties; starts as an empty default instance. */
    public RedisStreamConsumerProperties redisStreamConsumerProperties = new RedisStreamConsumerProperties();

    /** Producer-side extended properties; starts as an empty default instance. */
    public RedisStreamProducerProperties redisStreamProducerProperties = new RedisStreamProducerProperties();

    // ---- consumer side ----

    /** Returns the consumer properties as required by the provider contract. */
    @Override
    public Object getConsumer() {
        return this.redisStreamConsumerProperties;
    }

    public RedisStreamConsumerProperties getRedisStreamConsumerProperties() {
        return this.redisStreamConsumerProperties;
    }

    public void setRedisStreamConsumerProperties(RedisStreamConsumerProperties redisStreamConsumerProperties) {
        this.redisStreamConsumerProperties = redisStreamConsumerProperties;
    }

    // ---- producer side ----

    /** Returns the producer properties as required by the provider contract. */
    @Override
    public Object getProducer() {
        return this.redisStreamProducerProperties;
    }

    public RedisStreamProducerProperties getRedisStreamProducerProperties() {
        return this.redisStreamProducerProperties;
    }

    public void setRedisStreamProducerProperties(RedisStreamProducerProperties redisStreamProducerProperties) {
        this.redisStreamProducerProperties = redisStreamProducerProperties;
    }
}

定义ExtendBindingProperties

该类继承AbstractExtendedBindingProperties,在框架原有的ConsumerProperties和ProducerProperties之上挂接自定义的Consumer和Producer配置类,从而可以在input和output原有配置的基础上,增加binder专属的自定义配置

package com.lhstack.stream.redis.properties;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;

/**
 * Extended binding properties of the Redis binder, bound under the
 * {@code spring.cloud.stream.redis} prefix.
 *
 * <p>Binder-wide defaults are read from {@code spring.cloud.stream.redis.default};
 * each entry is a {@link RedisStreamBindingProperties}.
 * NOTE(review): the exact per-binding key layout is defined by
 * {@link AbstractExtendedBindingProperties} - confirm against that class.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class RedisExtendBindingProperties
 * @since 1.8
 */
@ConfigurationProperties(value = "spring.cloud.stream.redis")
public class RedisExtendBindingProperties extends
        AbstractExtendedBindingProperties<RedisStreamConsumerProperties, RedisStreamProducerProperties, RedisStreamBindingProperties> {

    /** Type of a single binding entry handled by this properties class. */
    @Override
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return RedisStreamBindingProperties.class;
    }

    /** Property prefix under which defaults for all Redis bindings are declared. */
    @Override
    public String getDefaultsPrefix() {
        return "spring.cloud.stream.redis.default";
    }
}

自定义消息发送器

这里的消息发送器负责向应用内的SubscribableChannel发送消息。本文的应用场景是:从redis的通道里订阅消息,转发到应用的SubscribableChannel,再由StreamListener订阅并消费

package com.lhstack.stream.redis.producer;

import com.lhstack.stream.redis.properties.RedisStreamConsumerProperties;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;

import java.util.function.BiConsumer;

/**
 * 自定义消息发送器,监听redis 通道里面的消息,发送到应用的StreamListener上面
 * @author lhstack
 * @date 2021/9/7
 * @class RedisMessageProducer
 * @since 1.8
 */
public class RedisStreamMessageProducer extends MessageProducerSupport {

    private final StatefulRedisPubSubConnection<String, Message<byte[]>> connection;
    private final ConsumerDestination destination;
    private final ExtendedConsumerProperties<RedisStreamConsumerProperties> properties;
    private final String group;

    public RedisStreamMessageProducer(StatefulRedisPubSubConnection<String, Message<byte[]>> connection, ConsumerDestination destination, String group, ExtendedConsumerProperties<RedisStreamConsumerProperties> properties) {
        this.connection = connection;
        this.destination = destination;
        this.group = group;
        this.properties = properties;
    }

    @Override
    protected void doStart() {
        //订阅通道
        connection.async().subscribe(destination.getName()).whenComplete(new BiConsumer<Void, Throwable>() {
            @Override
            public void accept(Void unused, Throwable throwable) {
                //监听消息变化,然后发送到input,这里订阅订阅的是所有通道的消息,需要判断一下
                connection.addListener(new RedisPubSubAdapter<String, Message<byte[]>>() {
                    @Override
                    public void message(String channel, Message<byte[]> message) {
                        if (channel.equals(destination.getName())) {
                            sendMessage(message);
                        }
                    }
                });
            }
        });
    }
}

自定义消息处理器

这里的消息处理器负责处理应用通过MessageChannel发出的消息。本文中的处理方式是:把接收到的消息发布到redis的通道里

package com.lhstack.stream.redis;

import com.lhstack.stream.redis.properties.RedisStreamProducerProperties;
import io.lettuce.core.api.StatefulRedisConnection;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

/**
 * @author lhstack
 * @date 2021/9/7
 * @class RedisStreamMessageHandler
 * @since 1.8
 */
public class RedisStreamMessageHandler implements MessageHandler {

    private final ProducerDestination destination;
    private final ExtendedProducerProperties<RedisStreamProducerProperties> producerProperties;
    private final MessageChannel errorChannel;
    private final StatefulRedisConnection<String, Message<byte[]>> statefulConnection;

    public RedisStreamMessageHandler(StatefulRedisConnection<String, Message<byte[]>> statefulConnection, ProducerDestination destination, ExtendedProducerProperties<RedisStreamProducerProperties> producerProperties, MessageChannel errorChannel) {
        this.destination = destination;
        this.producerProperties = producerProperties;
        this.errorChannel = errorChannel;
        this.statefulConnection = statefulConnection;
    }


    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        try {
            //这里对应output,所以这里向redis发送消息即可
            this.statefulConnection.async().publish(destination.getName(), MessageBuilder.createMessage((byte[]) message.getPayload(), message.getHeaders()));
        } catch (Exception e) {
            errorChannel.send(MessageBuilder.withPayload(e.getMessage()).build());
        }
    }
}

创建StreamBinder

用于将InPut,OutPut通道和自定义的消息处理器和发送器进行绑定

package com.lhstack.stream.redis;

import com.lhstack.stream.redis.producer.RedisStreamMessageProducer;
import com.lhstack.stream.redis.properties.RedisExtendBindingProperties;
import com.lhstack.stream.redis.properties.RedisStreamClientProperties;
import com.lhstack.stream.redis.properties.RedisStreamConsumerProperties;
import com.lhstack.stream.redis.properties.RedisStreamProducerProperties;
import com.lhstack.stream.redis.utils.ProtostuffUtils;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Redis binder: connects output bindings to a publishing handler and input
 * bindings to a pub/sub listening endpoint.
 *
 * <p>The binder owns one {@link RedisClient} with a pub/sub connection (inbound)
 * and a regular connection (outbound). Both are released in {@link #close()}.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class RedisStreamBinder
 * @since 1.8
 */
public class RedisStreamBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<RedisStreamConsumerProperties>, ExtendedProducerProperties<RedisStreamProducerProperties>, RedisProvisioningProvider> implements ExtendedPropertiesBinder<MessageChannel, RedisStreamConsumerProperties, RedisStreamProducerProperties>, AutoCloseable {

    private static final String DEFAULT_HOST = "localhost";
    private static final int DEFAULT_PORT = 6379;
    private static final int DEFAULT_DATABASE = 0;

    private final RedisClient redisClient;
    private final StatefulRedisPubSubConnection<String, Message<byte[]>> connection;
    private final StatefulRedisConnection<String, Message<byte[]>> statefulConnection;
    private final RedisExtendBindingProperties redisExtendBindingProperties;

    /**
     * Creates the binder and opens both Redis connections.
     *
     * @param provisioningProvider         destination provisioner
     * @param redisStreamClientProperties  Redis connection settings; unset host,
     *                                     port or database fall back to
     *                                     localhost / 6379 / 0, and a missing
     *                                     password means "no authentication"
     * @param redisExtendBindingProperties extended per-binding properties
     */
    public RedisStreamBinder(RedisProvisioningProvider provisioningProvider, RedisStreamClientProperties redisStreamClientProperties, RedisExtendBindingProperties redisExtendBindingProperties) {
        super(null, provisioningProvider);
        RedisClient client = RedisClient.create(buildRedisUri(redisStreamClientProperties));
        RedisCodec<String, Message<byte[]>> redisCodec = new MessageCodec();
        StatefulRedisPubSubConnection<String, Message<byte[]>> pubSub;
        StatefulRedisConnection<String, Message<byte[]>> publisher;
        try {
            pubSub = client.connectPubSub(redisCodec);
            publisher = client.connect(redisCodec);
        } catch (RuntimeException e) {
            // Do not leak client resources when a connection cannot be opened.
            client.shutdown();
            throw e;
        }
        this.redisClient = client;
        this.connection = pubSub;
        this.statefulConnection = publisher;
        this.redisExtendBindingProperties = redisExtendBindingProperties;
    }

    /** Builds the Redis URI, tolerating unset (null) optional settings. */
    private static RedisURI buildRedisUri(RedisStreamClientProperties properties) {
        String host = properties.getHost();
        Integer port = properties.getPort();
        Integer database = properties.getDatabase();
        String password = properties.getPassword();
        RedisURI.Builder builder = RedisURI.builder()
                .withHost(host == null || host.isEmpty() ? DEFAULT_HOST : host)
                .withPort(port == null ? DEFAULT_PORT : port)
                .withDatabase(database == null ? DEFAULT_DATABASE : database);
        if (password != null && !password.isEmpty()) {
            builder.withPassword(password.toCharArray());
        }
        return builder.build();
    }

    /**
     * Codec for keys (UTF-8 strings) and values ({@code Message<byte[]>}).
     *
     * <p>Value layout: 2-byte unsigned header length, serialized headers,
     * 4-byte payload length, payload bytes. Only the headers are serialized
     * with Protostuff; the payload is written as-is.
     */
    private static final class MessageCodec implements RedisCodec<String, Message<byte[]>> {

        /** Largest header block that fits the 2-byte unsigned length prefix. */
        private static final int MAX_HEADER_LENGTH = 0xFFFF;

        @Override
        public String decodeKey(ByteBuffer buf) {
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }

        @Override
        public Message<byte[]> decodeValue(ByteBuffer buf) {
            if (!buf.hasRemaining()) {
                return MessageBuilder.withPayload(new byte[0]).build();
            }
            // Read the length as unsigned: a signed short would turn header
            // blocks larger than 32767 bytes into a negative array size.
            int headerLength = buf.getShort() & 0xFFFF;
            byte[] headerBody = new byte[headerLength];
            buf.get(headerBody);
            int bodyLength = buf.getInt();
            byte[] bytes = new byte[bodyLength];
            buf.get(bytes);
            Map<String, Object> header = ProtostuffUtils.deserialize(headerBody);
            return MessageBuilder.createMessage(bytes, new MessageHeaders(header));
        }

        @Override
        public ByteBuffer encodeKey(String s) {
            return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public ByteBuffer encodeValue(Message<byte[]> s) {
            byte[] headerBytes = ProtostuffUtils.serialize(new HashMap<>(s.getHeaders()));
            if (headerBytes.length > MAX_HEADER_LENGTH) {
                // Fail loudly instead of silently truncating the length prefix.
                throw new IllegalArgumentException("Serialized message headers are too large: "
                        + headerBytes.length + " bytes (max " + MAX_HEADER_LENGTH + ")");
            }
            byte[] payload = s.getPayload();
            ByteBuffer buf = ByteBuffer.allocate(2 + headerBytes.length + 4 + payload.length);
            buf.putShort((short) headerBytes.length).put(headerBytes).putInt(payload.length).put(payload);
            // Switch the buffer from writing to reading.
            buf.flip();
            return buf;
        }
    }

    /** Creates the handler that publishes output-channel messages to Redis. */
    @Override
    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<RedisStreamProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
        return new RedisStreamMessageHandler(this.statefulConnection,destination, producerProperties, errorChannel);
    }

    /** Creates the endpoint that forwards Redis pub/sub messages to the input channel. */
    @Override
    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<RedisStreamConsumerProperties> properties) throws Exception {
        return new RedisStreamMessageProducer(this.connection, destination, group, properties);
    }


    @Override
    public RedisStreamConsumerProperties getExtendedConsumerProperties(String channelName) {
        return redisExtendBindingProperties.getExtendedConsumerProperties(channelName);
    }

    @Override
    public RedisStreamProducerProperties getExtendedProducerProperties(String channelName) {
        return redisExtendBindingProperties.getExtendedProducerProperties(channelName);
    }

    @Override
    public String getDefaultsPrefix() {
        return redisExtendBindingProperties.getDefaultsPrefix();
    }

    @Override
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return redisExtendBindingProperties.getExtendedPropertiesEntryClass();
    }

    /** Shuts the Redis client down, which also closes both connections opened by this binder. */
    @Override
    public void close() {
        this.redisClient.shutdown();
    }
}

ProtostuffUtils

package com.lhstack.stream.redis.utils;


import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.runtime.RuntimeSchema;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Protostuff-based (de)serialization helpers.
 *
 * <p>Arbitrary objects are wrapped in a {@link ProtostuffData} holder under the
 * key {@code "data"} so that one fixed schema can be used for every value.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class ProtostuffUtils
 * @since 1.8
 */
public class ProtostuffUtils {

    /** Map-backed wrapper that carries the value being (de)serialized. */
    public static class ProtostuffData {
        private final Map<String, Object> data = new HashMap<>();

        public void put(String key, Object value) {
            this.data.put(key, value);
        }

        /**
         * Returns the value stored under {@code key}, cast to the type the
         * caller expects; a wrong expectation fails at the call site.
         */
        @SuppressWarnings("unchecked")
        public <T> T get(String key) {
            return (T) this.data.get(key);
        }

        public Set<String> keys() {
            return this.data.keySet();
        }

        public Collection<Object> values() {
            return this.data.values();
        }

    }

    /**
     * Reusable write buffer, one per thread: a single shared buffer would be
     * written by concurrent {@link #serialize(Object)} calls at the same time.
     */
    private static final ThreadLocal<LinkedBuffer> BUFFER =
            ThreadLocal.withInitial(() -> LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE * 2 * 4));
    private static final RuntimeSchema<ProtostuffData> SCHEMA = (RuntimeSchema<ProtostuffData>) RuntimeSchema.getSchema(ProtostuffData.class);


    /**
     * Serializes an object.
     *
     * @param obj object to serialize
     * @return the serialized bytes
     */
    public static <T> byte[] serialize(T obj) {
        ProtostuffData wrapper = new ProtostuffData();
        wrapper.put("data", obj);
        LinkedBuffer buffer = BUFFER.get();
        try {
            return ProtostuffIOUtil.toByteArray(wrapper, SCHEMA, buffer);
        } finally {
            // Always reset the buffer, also on failure, so the next call on
            // this thread starts with a clean buffer.
            buffer.clear();
        }
    }

    /**
     * Deserializes bytes produced by {@link #serialize(Object)}.
     *
     * @param bytes serialized form
     * @return the value that was stored under {@code "data"}, cast to the type the caller expects
     */
    public static <T> T deserialize(byte[] bytes) {
        ProtostuffData data = SCHEMA.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, data, SCHEMA);
        return data.get("data");
    }

}

创建配置类,将相关bean注册到spring容器

这里使用spring.binders实现配置注入

package com.lhstack.stream.redis;

import com.lhstack.stream.redis.properties.RedisExtendBindingProperties;
import com.lhstack.stream.redis.properties.RedisStreamClientProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;

/**
 * Configuration referenced from {@code META-INF/spring.binders}: enables the
 * Redis property classes and registers the binder together with its
 * provisioning provider.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class RedisStreamBinderAutoConfiguration
 * @since 1.8
 */
@EnableConfigurationProperties({RedisStreamClientProperties.class, RedisExtendBindingProperties.class})
public class RedisStreamBinderAutoConfiguration {

    /** Extended per-binding properties handed to the binder. */
    @Autowired
    private RedisExtendBindingProperties redisExtendBindingProperties;

    /** Redis connection settings handed to the binder. */
    @Autowired
    private RedisStreamClientProperties redisStreamClientProperties;

    /** Provisioning provider required by the binder. */
    @Bean
    public RedisProvisioningProvider redisProvisioningProvider() {
        return new RedisProvisioningProvider();
    }

    /** The Redis binder, assembled from the provisioner and both property objects. */
    @Bean
    public RedisStreamBinder redisStreamBinder(RedisProvisioningProvider redisProvisioningProvider) {
        RedisStreamBinder binder = new RedisStreamBinder(
                redisProvisioningProvider,
                this.redisStreamClientProperties,
                this.redisExtendBindingProperties);
        return binder;
    }
}

META-INF/spring.binders

# Maps the binder name "redis" to the configuration class that registers the binder beans.
redis:com.lhstack.stream.redis.RedisStreamBinderAutoConfiguration

创建测试项目,使用自定义的spring-cloud-stream-redis模块

这里先执行install将模块安装到maven本地仓库
在这里插入图片描述
在这里插入图片描述

项目结构如下

在这里插入图片描述

pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor of the demo application that exercises the Redis binder. -->
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lhstack.stream.redis.test</groupId>
    <artifactId>spring-cloud-stream-redis-tests</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-redis-tests</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
        <!-- Must match the version of the locally installed binder module. -->
        <spring-cloud-stream-redis.version>0.0.1</spring-cloud-stream-redis.version>
    </properties>
    <dependencies>
        <!-- Web starter: the demo triggers message sending through REST endpoints. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- The custom Redis binder built and installed in the first part of the article. -->
        <dependency>
            <groupId>com.lhstack.stream</groupId>
            <artifactId>spring-cloud-stream-redis</artifactId>
            <version>${spring-cloud-stream-redis.version}</version>
        </dependency>

        <!-- Test-only dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <!-- Imports the Spring Cloud BOM so Spring Cloud artifacts need no explicit version. -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <!-- Spring Boot build plugin for the demo application. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

创建自定义Input和Output

用于创建发布和订阅的通道
ByteChannelIo.java

package com.lhstack.stream.redis.test.io;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Binding interface for the channel pair that carries raw byte payloads.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class ByteChannelIo
 * @since 1.8
 */
public interface ByteChannelIo {

    /** Binding name of the inbound (receiving) channel. */
    String BYTE_STREAM_IN_PUT = "BYTE_STREAM_IN_PUT";

    /** Binding name of the outbound (sending) channel. */
    String BYTE_STREAM_OUT_PUT = "BYTE_STREAM_OUT_PUT";

    /**
     * Channel on which byte messages are received.
     *
     * @return the inbound channel
     */
    @Input(BYTE_STREAM_IN_PUT)
    SubscribableChannel subscribableChannel();

    /**
     * Channel used to send byte messages.
     *
     * @return the outbound channel
     */
    @Output(BYTE_STREAM_OUT_PUT)
    MessageChannel messageChannel();
}

JsonChannelIo.java

package com.lhstack.stream.redis.test.io;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Binding interface for the channel pair that carries JSON payloads.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class JsonChannelIo
 * @since 1.8
 */
public interface JsonChannelIo {

    /** Binding name of the inbound (receiving) channel. */
    String JSON_STREAM_IN_PUT = "JSON_STREAM_IN_PUT";

    /** Binding name of the outbound (sending) channel. */
    String JSON_STREAM_OUT_PUT = "JSON_STREAM_OUT_PUT";

    /**
     * Channel on which JSON messages are received.
     *
     * @return the inbound channel
     */
    @Input(JSON_STREAM_IN_PUT)
    SubscribableChannel subscribableChannel();

    /**
     * Channel used to send JSON messages.
     *
     * @return the outbound channel
     */
    @Output(JSON_STREAM_OUT_PUT)
    MessageChannel messageChannel();
}

TextChannelIo.java

package com.lhstack.stream.redis.test.io;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Binding interface for the channel pair that carries plain-text payloads.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class TextChannelIo
 * @since 1.8
 */
public interface TextChannelIo {

    /** Binding name of the inbound (receiving) channel. */
    String TEXT_STREAM_IN_PUT = "TEXT_STREAM_IN_PUT";

    /** Binding name of the outbound (sending) channel. */
    String TEXT_STREAM_OUT_PUT = "TEXT_STREAM_OUT_PUT";

    /**
     * Channel on which text messages are received.
     *
     * @return the inbound channel
     */
    @Input(TEXT_STREAM_IN_PUT)
    SubscribableChannel subscribableChannel();

    /**
     * Channel used to send text messages.
     *
     * @return the outbound channel
     */
    @Output(TEXT_STREAM_OUT_PUT)
    MessageChannel messageChannel();
}

自定义streamListener

用于创建订阅消息的handler
StreamListenerHandler.java

package com.lhstack.stream.redis.test.listener;

import com.lhstack.stream.redis.test.io.ByteChannelIo;
import com.lhstack.stream.redis.test.io.JsonChannelIo;
import com.lhstack.stream.redis.test.io.TextChannelIo;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Demo listeners: one handler per input binding, each printing the received
 * payload followed by the complete message to standard output.
 *
 * @author lhstack
 * @date 2021/9/7
 * @class StreamListenerHandler
 * @since 1.8
 */
@Component
public class StreamListenerHandler {

    /** Prints messages arriving on the byte input; the payload is decoded as UTF-8 for display. */
    @StreamListener(ByteChannelIo.BYTE_STREAM_IN_PUT)
    public void byteListener(byte[] message, Message<byte[]> msg) {
        String decoded = new String(message, StandardCharsets.UTF_8);
        String line = "收到byte消息: " + decoded + "," + msg;
        System.out.println(line);
    }

    /** Prints messages arriving on the JSON input. */
    @StreamListener(JsonChannelIo.JSON_STREAM_IN_PUT)
    public void jsonListener(Map<String, Object> message, Message<Map<String, Object>> msg) {
        String line = "收到json消息: " + message + "," + msg;
        System.out.println(line);
    }

    /** Prints messages arriving on the text input. */
    @StreamListener(TextChannelIo.TEXT_STREAM_IN_PUT)
    public void textListener(String message, Message<String> msg) {
        String line = "收到text消息: " + message + "," + msg;
        System.out.println(line);
    }
}

创建controller,用于测试消息发送

package com.lhstack.stream.redis.test.controller;

import com.lhstack.stream.redis.test.io.ByteChannelIo;
import com.lhstack.stream.redis.test.io.JsonChannelIo;
import com.lhstack.stream.redis.test.io.TextChannelIo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/**
 * REST endpoints used to try out the binder: each endpoint sends a message to
 * one of the three output channels (text, JSON, raw bytes).
 *
 * @author lhstack
 * @date 2021/9/7
 * @class MessageController
 * @since 1.8
 */
@RestController
@RequestMapping
public class MessageController {

    /** Thread-safe formatter shared by all requests instead of being rebuilt per call. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** File streamed by the {@code stream} endpoint; a fixed demo path, never taken from the request. */
    private static final String STREAM_SOURCE_FILE = "C:\\Users\\lhstack\\Desktop\\spring-cloud-stream-redis-tests\\src\\main\\resources\\application.yml";

    /** Number of roughly equal chunks the streamed file is split into. */
    private static final int STREAM_CHUNK_COUNT = 10;

    @Autowired
    @Output(TextChannelIo.TEXT_STREAM_OUT_PUT)
    private MessageChannel textMessageChannel;

    @Autowired
    @Output(JsonChannelIo.JSON_STREAM_OUT_PUT)
    private MessageChannel jsonMessageChannel;

    @Autowired
    @Output(ByteChannelIo.BYTE_STREAM_OUT_PUT)
    private MessageChannel streamMessageChannel;

    /**
     * Sends the given text to the text output channel.
     *
     * @param msg text to send
     * @return always {@code true}
     */
    @GetMapping("text")
    public ResponseEntity<Boolean> sendTextMessage(@RequestParam(name = "msg") String msg) {
        textMessageChannel.send(MessageBuilder.withPayload(msg).build());
        return ResponseEntity.ok(true);
    }

    /**
     * Sends a map holding the current time and the caller's address to the
     * JSON output channel.
     *
     * @param request current request, used for the remote address
     * @return always {@code true}
     */
    @GetMapping("json")
    public ResponseEntity<Boolean> sendJsonMessage(HttpServletRequest request) {
        Map<String, Object> map = new HashMap<>();
        map.put("date", LocalDateTime.now().format(TIMESTAMP_FORMAT));
        map.put("ip", request.getRemoteAddr());
        jsonMessageChannel.send(MessageBuilder.withPayload(map).build());
        return ResponseEntity.ok(true);
    }

    /**
     * Sends the content of the demo file to the byte output channel in chunks.
     *
     * <p>The file is split into about {@value #STREAM_CHUNK_COUNT} chunks and
     * every byte is sent, including the shorter trailing chunk and files
     * smaller than the chunk count. The file channel is closed when done.
     *
     * @return always {@code true}
     * @throws Exception if the file cannot be opened or mapped
     */
    @GetMapping("stream")
    public ResponseEntity<Boolean> sendStreamMessage() throws Exception {
        try (FileChannel fileChannel = FileChannel.open(Paths.get(STREAM_SOURCE_FILE), StandardOpenOption.READ)) {
            MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
            // At least one byte per chunk so that tiny files are still sent.
            int chunkSize = Math.max(1, byteBuffer.remaining() / STREAM_CHUNK_COUNT);
            while (byteBuffer.hasRemaining()) {
                byte[] bytes = new byte[Math.min(chunkSize, byteBuffer.remaining())];
                byteBuffer.get(bytes);
                streamMessageChannel.send(MessageBuilder.withPayload(bytes).build());
            }
        }
        return ResponseEntity.ok(true);
    }
}

配置stream Binding

SpringCloudStreamRedisTestsApplication.java

package com.lhstack.stream.redis.test;

import com.lhstack.stream.redis.test.io.ByteChannelIo;
import com.lhstack.stream.redis.test.io.JsonChannelIo;
import com.lhstack.stream.redis.test.io.TextChannelIo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

/**
 * Entry point of the demo application; enables the byte, JSON and text
 * channel bindings.
 *
 * @author lhstack
 */
@SpringBootApplication
@EnableBinding({ByteChannelIo.class, JsonChannelIo.class, TextChannelIo.class})
public class SpringCloudStreamRedisTestsApplication {


    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringCloudStreamRedisTestsApplication.class);
        application.run(args);
    }

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        BYTE_STREAM_OUT_PUT: # binding name; matches the @Input/@Output channel name
          destination: BYTE-STREAM # Redis channel this binding maps to
          contentType: application/octet-stream
        BYTE_STREAM_IN_PUT:
          destination: BYTE-STREAM
          contentType: application/octet-stream
        JSON_STREAM_OUT_PUT:
          destination: JSON-STREAM # channel name; no contentType is set, so the default (JSON, per the author's note) applies
        JSON_STREAM_IN_PUT:
          destination: JSON-STREAM
        TEXT_STREAM_OUT_PUT:
          destination: TEXT-STREAM
          contentType: text/plain
        TEXT_STREAM_IN_PUT:
          destination: TEXT-STREAM
          contentType: text/plain
      redis:
        binder:
          host: 192.168.101.5
          password: 123456
          database: 1
          port: 6379

启动项目,测试消息发送

在这里插入图片描述

发送text消息

在这里插入图片描述
在这里插入图片描述

发送json消息

在这里插入图片描述
在这里插入图片描述

发送stream消息

在这里插入图片描述
在这里插入图片描述

测试集群接收消息

修改为允许并行运行
在这里插入图片描述
修改端口,防止端口冲突,导致启动失败
在这里插入图片描述
启动成功,两个应用
在这里插入图片描述

任意一台机器,发送三种消息

text消息
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
json消息
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
stream消息
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试携带header的消息

修改controller,添加header信息
    /**
     * Sends a JSON message that also carries custom headers.
     *
     * <p>The timestamp and remote address are computed once and reused, so the
     * payload and the headers always carry identical values.
     *
     * @param request current request, used for the remote address
     * @return always {@code true}
     */
    @GetMapping("header")
    public ResponseEntity<Boolean> sendHeaderJsonMessage(HttpServletRequest request) {
        String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        String remoteAddress = request.getRemoteAddr();

        Map<String, Object> map = new HashMap<>();
        map.put("date", timestamp);
        map.put("ip", remoteAddress);
        map.put("msg", "携带了header信息");

        Map<String, Object> headers = new HashMap<>();
        headers.put("date", timestamp);
        headers.put("ip", remoteAddress);

        jsonMessageChannel.send(MessageBuilder.createMessage(map, new MessageHeaders(headers)));
        return ResponseEntity.ok(true);
    }

启动项目,并访问测试

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

写在后面

以上就是基于redis使用spring-cloud-stream的功能。这里因为使用的是redis的发布订阅模型,无法手动去拉取消息,所以没有实现Poll的功能;反过来,如果使用队列去实现poll的功能,就没法直接实现发布订阅的功能,所以两者是相互冲突的。虽然也可以用队列实现发布订阅的功能,但是过程相当复杂,这里就不再说明了
代码已经上传到gitee,地址为: https://gitee.com/myprofile/spring-cloud-stream-redis.git

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐