一、Redis安装

1.1 WIndows下安装

1.WIN下载地址:https://github.com/microsoftarchive/redis/releases/tag/win-3.2.100
2.下载完毕后,解压到工作目录。
3.双击运行服务 redis-server
4.双击客户端 redis-cli
在这里插入图片描述

1.2 Linux下安装

1.下载地址:https://redis.io/ ,注意版本号更高的版本需要更高的GCC版本
2.redis-5.0.8.tar.gz
3.上传服务器linux
4. tar -zxvf redis-5.0.8.tar.gz
5. yum install gcc-c++
6. cd redis-5.0.8 -> make && make install
7. 默认redis的安装路径为 : /usr/local/bin
8. 创建redis的配置目录: mkdir /usr/local/bin/redisconf
9. 复制配置文件: cp /home/redis-5.0.8/redis.conf /usr/local/bin/redisconf
10.可以修改为后台启动:vim redis.conf -->daemonize yes
10. 指定配置文件启动redis服务:redis-server /usr/local/bin/redisconf/redis.conf
11. 连接服务:redis-cli -p 6379
12. 关闭服务:shutdown --> exit

1.3 Redis性能测试工具

redis-benchmark
1.测试:100个并发连接,每个并发100000个请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000
在这里插入图片描述

1.4 基础知识

  1. redis默认有 16 个数据库,默认使用第 0 个数据库。
  2. 切换数据库:select 3
  3. 查看所有的数据K: keys *
  4. 清空当前数据库:flushdb,清除所有数据库:flushall
  5. redis为单线程的。基于内存操作。性能瓶颈为机器内存网络带宽
  6. 多线程CPU切换轮转【耗时间】,上下文切换,不一定比单线程快。
  7. redis所有的数据全部在内存中,单线程效率最高。

二、五大数据类型

2.1 String 类型

  1. 设置 set key1 1www
  2. 获取get key1
  3. 追加 append key “oooo”
  4. 获取长度 STRLEN key1
  5. 自增 浏览量
    set views 0
    incr views 自增1
    decr views 自减1
    incr views 2 设置自增2
  6. 对象 : set user:1 {name:wang,age:24}
    mset user:1:name wang user:1:age 24
    mget user:1name user:1:age

2.2 List类型【栈,队列,阻塞队列】

  1. LPUSH list one 多个值插入到列表的头部
  2. LRANGE list 0 -1 通过区间拿到所有的值
  3. LRANGE list 0 1 通过区间拿到所有的值
  4. Rpush list three 插入队列的尾部
  5. Lpop list 移除左侧第一个值
  6. Rpop list 移除右侧第一个值
  7. lindex list 1 通过瞎做获取值
  8. Llen list 得到列表的长度
  9. Lrem list 1 one 移除一个或者多个指定的值
  10. ltrim list 1 2 从1-2截断下来
  11. lset list 0 kkk ;列表指定下标的值更新为另外一个值

2.3 Set类型

  1. Set中的值是不可以重复的
  2. sadd myset “hello” 添加一个值
  3. SMEMBERS myset 查看某个集合的元素
  4. SISMEMBER myset hello 判断某一个元素是否在这个集合中
  5. scard myset 获取Set集合的个数
  6. srem myset hello 移除某一个值
  7. SRANDMEMBER myset 随机取出一个元素
  8. spop myset 随机移除一个元素
  9. SDIFF key1 key2 差集
  10. SINTER key1 key2 交集
  11. SUNION key1 key2 并集

2.4 Hash类型

Map集合,key-map,值是一个map集合,和String类型差不多
更适合对象的存储

  1. hset myhash field1 wangyin 存储一个值
    hget myhash field1 取出一个值
  2. hmset myhash field1 hello field2 word 存储多个k-v值
  3. hmget field1 field2 获得多个值
  4. hgetall myhash 取出所有字段值
  5. hdel myhash field1 删除指定的key字段,对应的value也被删除了
  6. hlen myhash 集合长度
  7. hkeys myhash 获取所有的key
  8. hvals myhash 获取所有的值
  9. hset user:1 name wangyi 存储对象属性值
  10. hget user:1 name 获取对象属性值

2.5 Zset类型

在set的基础之上,增加了一个值 ,set k1 v1 zset k1 score v1

  1. zadd myset 1 one 1 为排序
  2. zadd salary 1000 xiaoming
    zadd salary 2000 xiaohong
    zadd salary 3000 xiaowang
    ZRANGEBYSCORE salary -inf +inf 所有的字段升序排序
    ZRANGEBYSCORE salary 0 -1 所有的字段降序排序
    ZRANGESCORE salary -inf 2500 withscores 查出所有在2500以内带有分数和名字降序排列
  3. zrange salary 0 -1 查询所有
  4. zrem salary xiaohong 移除一个元素
  5. zscard salary 获取有序集合中的个数

三、特殊数据类型

3.1 Geospatia地理位置

朋友定位,附近的人,打车距离查询
规则:两级数据无法导入,一般通过java批量导入
参数 key 值(经度,维度,名称)
redis在redis3.2之后就推出了,计算地理位置信息,两地距离

  1. geoadd china:city 116.40 39.90 beijing 添加地理位置信息
    geoadd china:city 121.47 31.23 shanghai
    geoadd china:city 106.50 29.53 chongqing
    geoadd china:city 114.05 22.54 guangdong
    geoadd china:city 120.16 30.24 hangzhou
    geoadd china:city 108.96 34.26 xian
  2. geopos china:city beijing xian 获取指定城市的地理位置
  3. geodist china:city beijing shanghai 默认直线距离单位m
    geodist china:city beijing shanghai km 单位为km

附近的人?获取所有的地址 ,通过半径来查询定位
4. georadius china:city 110 30 1000 km 维度 查询附近1000km的城市
5. georadius china:city 110 30 1000 km withdist withcoord count 1 周围人定位信息限定数量1

3.2 Hyperloglog 基数统计

不重复的元素的个数,可以接受误差0.81%
优点:占用的内存是固定的,只需要12K的内存。
页面的访问量【UV】,一个人访问一个网站多次,算作一个人。
传统的方式:Set保存用户ID,如果大量的话,比较麻烦,目的为了计数,而不是保存用户ID。

  1. PFadd mykey a b c d e f g h i h 存储
  2. PFCOUNT mykey 计算基数数量
  3. PFMERGE mykey mykey2 合并

3.3 Bitmap 位图场景

只有0和1,两个状态,操作二进制位
统计疫情感染人数: 0 1 0 1 0 0 0 1
统计同户信息,活跃,不活跃,登录, 不登录, 打卡
356天 = 365 bit 46byte足够

测试只有1-周末的打卡记录:周一:1 周二:0

  1. setbit sign 0 1
    setbit sign 1 0
    setbit sign 2 1
    setbit sign 3 1
    setbit sign 4 0
  2. getbit sign 3 查看周三有没有打卡
  3. bitcount sign 打卡天数
  4. bitcount sign 0 3 统计周一到周四的打卡记录天数

四、redis事务操作

1.ACID:要么一起成功,要么一起失败。
2.redis单条命令保证原子性,但是事务不保证原子性 ,所有的命令就会被序列化,会按照顺序执行。
3.redis事务没有隔离级别的概念,所有的命令在事务中,没有直接被执行。
4.**redis的事务:**开启事务【multi】–>命令入队【…】–>执行事务【exec】
在这里插入图片描述
5.DISCARD:取消事务

五、redis乐观锁

5.1 悲观锁

很悲观,认为什么时候都会出现问题

5.2 乐观锁

1.很乐观,认为神魔时候都不会出现问题,不会上锁,更新数据的时候会比较一下这个期间是否有人修改过。
2.获取版本号
3.更新时候比较version
如:
set money 100
set out 0
watch money 监控监事测试
multi 开启事务
DECRBY money 20 减少20
INCRBY out 20 增加20
exec 执行事务

六、Jedis 客户端

6.1 maven项目添加pom

         <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
         </dependency>
         
		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>fastjson</artifactId>
		    <version>1.2.66</version>
		</dependency>

6.2 连接测试

public class TestJedis{
	public static void main(String[] args){
		Jedis jedis = new Jedis("127.0.0.1",6379);
		// 所有的方法全部在jedis中
		.........
		JSONObject jsonObject = new JSONObject();
		jsonObject.put("hello","world");
		jsonObject.put("name","reba");
		// 事务
		Transaction multi = jedis.multi();
		String result = jsonObject.toJSONString();
		
		try{
			multi.set("user1",result);
			multi.set("user2",result);
			int i = 1/0; // 模拟异常
			multi.exec(); // 执行事务
		}catch(...){
			multi.discard(); // 放弃事务
		}finally{
			jedis.close(); // 关闭连接
		}

		
	}
}

七、Springboot整合【spring-data】

7.1 添加依赖

在这里插入图片描述
说明:springboot2.x之后,jedis被替换成了lettuce
**jedis:**采用的是直连,多个线程操作是不安全的,如果避免的话,需要使用jedis pool连接池。
**lettuce:**采用netty,实例可以在多个线程中共享,不存在线程不安全的情况。

7.2 添加yaml

spring.redis.host=127.0.0.1
spring.redis.port=6379

7.3 测试

@Autowired
private RedisTemplate redisTemplate;

@Test
void testRedis(){
	// 获取连接对象[可不]
	// RedisConnection conn = redisTemplate.getConnectionFactory().getConnection();
	// conn.flushDb();
	
	redisTemplate.opsForValue();
	redisTemplate.opsForList();
	redisTemplate.opsForSet();
	redisTemplate.opsForHash();
	redisTemplate.opsForZSet();
	
	redisTemplate.opsForGeo();
	redisTemplate.opsForHyperLogLog();
	
} 

7.4 自定义RedisTemplate【开发】 乱码问题

默认的序列化方式为JDK的:
在这里插入图片描述
解决办法:

@Configuration
public class RedisConfig {
 
	@Bean
	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
		RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
		redisTemplate.setConnectionFactory(connectionFactory);
		// 解决乱码 实现序列化
		Jackson2JsonRedisSerializer j2r = new Jackson2JsonRedisSerializer<>(Object);
		jedisTemplate.setKeySerializer(j2r);
		return redisTemplate;
	}
	
	@Bean
	public RedisConnectionFactory connectionFactory(JedisPoolConfig poolConfig){
		JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(poolConfig);
		jedisConnectionFactory.setHostName("localhost");
		jedisConnectionFactory.setPort(6379);
		return jedisConnectionFactory;
	}
	
	@Bean
	public JedisPoolConfig poolConfig(){
		JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
		jedisPoolConfig.setMaxIdle(20);
		jedisPoolConfig.setMaxTotal(200);
		jedisPoolConfig.setMaxWaitMillis(2000);
		jedisPoolConfig.setTestOnBorrow(true);
		jedisPoolConfig.setTestOnCreate(true);
		
		return jedisPoolConfig;
	}
}

7.5 RedisUtil工具类

// 在我们真实的分发中,或者你们在公司,一般都可以看到一个公司自己封装RedisUtil

package com.kuang.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // =============================common============================
    /**
     * 指定缓存失效时间
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }


    // ============================String=============================

    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }
    
    /**
     * 普通缓存放入
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 递增
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /**
     * 递减
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }


    // ================================Map=================================

    /**
     * HashGet
     * @param key  键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }
    
    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }
    
    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * HashSet 并设置时间
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }


    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /**
     * 根据key获取Set中的所有值
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================
    
    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将list放入缓存
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */

    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

八、redis持久化.RDB/.AOF

8.1 RDB

redis 是内存数据库,断电失去数据,需要持久化。
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
Redis会单独创建( fork ) -个子进程来进行持久化,会先将数据写入到一一个临时文件中,待持久化过程都结束了, 再用这个临时文件替换.上次持久化好的文件。整个过程中,主进程是不进行任何I0操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。
默认的是RDB:dump.rdb
在这里插入图片描述
1.测试:修改配置文件
在这里插入图片描述
2.删掉dump.rdb rm -f dump.rdb
3.set k1 v1
set k2 v2
set k3 v3
set k4 v4
set k5 v5
4.可以看到有了一个 dump.rdb文件
5.redis 关机:shutdown
6.开机查看数据:get k1
7.数据恢复:将rdb文件放置redis目录下即可,启动会自动加载
在这里插入图片描述
优点:大规模的数据恢复,dump.rdb。对数据的完整性要求不高。
缺点:需要一定的时间来进行备份,如果脱机,最后一次修改的数据就没了。fork进行的时候,会占用内存空间。

8.2 AOF [Append Only File]

将所有的命令都记录下来,以日志的形式来记录每个写操作, 将Redis执行过的所有指令记录下来(读操作不记录) , 只许追加文件但不可以改写文件, redis启动之初会读取该文件重新构建数据,换言之, redis重启的话就根据日志文件的内容将写指令从前到后执行一-次以完成数据的恢复工作。
在这里插入图片描述
默认是不开启的,我们需要手动进行配置!我们只需要将appendonly改为yes就开启了aof !
重启, redis就可以生效了!
如果这个aof文件有错位,这时候redis 是启动不起来的吗,我们需要修复这个aof文件
redis给我们提供了一个工具[ redis-check-aof --fix
优点: 每一次修改都同步数据,文件完整性较好,每秒同步一次,可能会丢失1s一次
缺点: aof>rdb,修复速度较慢,运行效率也较慢

九、Redis发布-订阅

Redis发布订阅(pub/sub)是一-种消息通信模式:发送者(pub)发送消息。订阅者(sub)接收消息。
Redis客户端可以订阅任意数量的频道。
订阅/发布消息图:
在这里插入图片描述
1.Redis是使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,籍此加深对Redis的理解。
2.Redis通过PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能。
3.通过SUBSCRIBE命令订阅某频道后, redis-server里维护了- -个字典,字典的键就是一一个个channel , 而字典的值则是一个链表,链表中保存了所有订阅这个channel的客户端。SUBSCRIBE 命令的关键,就是将客户端添加到给定channel的订阅链表中。
4.通过PUBLISH命令向订阅者发送消息, redis-server会使用给定的频道作为键,在它所维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
5.Pub/Sub从字面上理解就是发布( Publish )与订阅( Subscribe) , 在Redis中,你可以设定对某- -个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。

十、Redis集群

10.1 主从复制

概念
主从复制,是指将一台Redis服务 器的数据,复制到其他的Redis服务器。前者称为主节(master/eader) ,后者称为从节点(slave/follower ;数据的复制是单向的,只能由主节点到从节点。Master以写为主 . Slave 以读为主。
默认情况下.每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点) ,但- -个从节点只能有一个主节点。
主从复制的作用主要包括:
1.数据冗余:主从复制实现了数据的热备份,是持久化Z外的一种数据冗余方式。
2.故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是-种服务的冗余。
3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务.由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点) . 分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
4、高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
一般来说 。要将Redis运用于工程项目中,只使用一台Redis是万万不能的,原因如下:
1.从结构上,单个Redis服务器会发生单点故障, 并且一台服务器需要处理所有的请求负载,压力较大;
2.从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G ,也不能将所有内存用作Redis存储内存,
一般来说 。单台Redis最大使用内存不应该超过20G。
在这里插入图片描述
低配:1主2从

10.2 单机伪分布式集群搭建

只配置从库,默认为主库。
info replication -->master
1. 将上面的redis.conf 复制多份
cp redis . conf redis79. conf
cp redis . conf redis80. conf
cp redis . conf redis81. conf
2. 将所有配置文件端口修改
port 6379
port 6380
port 6381
3. 修改所有配置文件的
dbfilename dump6379.rdb
dbfilename dump6380.rdb
dbfilename dump6381.rdb
4. 修改所有配置文件的后台运行pid
pidfile /var/run/redis_6379.pid
pidfile /var/run/redis_6380.pid
pidfile /var/run/redis_6381.pid
5. 修改所有的日子文件名称
logfile “6379.log”
logfile “6380.log”
logfile “6381.log”
6. 启动所有服务测试
redis-server /usr/local/bin/redisconf/redis79.conf
redis-server /usr/local/bin/redisconf/redis80.conf
redis-server /usr/local/bin/redisconf/redis81.conf

10.3 集群配置主从复制

  1. 默认三台都是主机master: 6379【主机】 6380【主机】 6381【主机】
  2. info replication 查看信息
  3. 配置从机6380:【从机】
    SLAVEOF 127.0.0.1 6379
  4. 配置从机6381:【从机】
    SLAVEOF 127.0.0.1 6379

以上1主2从 配置为暂时性的,需要永久配置区配置文件。
主机负责写,从机负责读
在这里插入图片描述
5. 以上没有哨兵,主机宕机后,从机没有上位
从机需要手动上位:
SLAVEOF on noe
Slave启动成功连接到master后会发送一个sync命令
Master接到命令,启动后台的存盘进程,同时收集所有接收至的用于修改数据集命令,在后台进程执行完 毕之后, master将传送整个数据文件到slave ,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制: Master继续将新的所有收集到的修改命令依次传给slave ,完成同步
但是只要是重新连接master , -次完全同步(全量复制)将被自动执行

10.4 哨兵模式【自动上位】

主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成-段时间内服务不可用。这不是一种推荐的方式 ,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供 了Sentinel (哨兵)架构来解决这个问题。
谋朝篡位的自动版,能够后台监控主机是否故障。如果故障了根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
这里的哨兵有两个作用
●通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
●当哨兵监测到master宕机,会自动将slavet切换成master ,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
在这里插入图片描述
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票 ,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

10.4.1 配置哨兵

1.vim sentinel.conf
sentinel monitor myredis 127.0.0.1 6379 1
2.启动哨兵
redis-sentinel /usr/local/bin/redisconf/sentinel.conf
主机宕机之后,会选举一台机器作为主机。

优点:
1、哨兵集群,基于主从复制模式,所有的主从配置优点,它全有
2、主从可以切换,故障可以转移,系统的可用性就会更好
3、哨兵模式就是主从模式的升级,手动到自动,更加健壮!
缺点:
1、 Redis不好啊在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦!
2、实现哨兵模式的配置其实是很麻烦的,里面有很多选择!

十一、Redis缓存穿透和雪崩

Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要 害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。

11.1 缓存穿透

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中.于是向持久层数据库查询。发现也没有于是本次查询失败。 当用户很多的时候,缓存都没有命中 (秒杀! ) ,于是都去请求 了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。
解决方案:布隆过滤器
布隆过滤器是一种数据结构 ,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;
在这里插入图片描述
但是这种方法会存在两个问题:
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一-段时间窗口的不一 致,这对于需要保持一致性的业务会 :有影响。

11.2 缓存击穿

这里需要注意和缓存击穿的区别, 缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一-个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一一个洞。
当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。
解决方案:

1.设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。
2.加互斥锁
分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务 ,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

11.3 缓存雪崩

缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis 宕机!
产生雪崩的原因之一, 比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一一个小时。那么到了凌晨一 -点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩, - 定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。
解决方案:

redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis ,这样一台挂掉之 后其他的还可以继续工作,其实就是搭建的集群。(异地多活!)
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key ,设置不同的过期时间,让缓存失效的时间点尽量均匀。

Logo

更多推荐