侧边栏壁纸
  • 累计撰写 11 篇文章
  • 累计创建 15 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

黑马点评详细总结-redis入门到实战

Pone
2022-11-13 / 0 评论 / 0 点赞 / 1,050 阅读 / 36,897 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-11-13,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

本文来源于网络
原文作者:iTsawaysu
原文链接:https://blog.csdn.net/weixin_45033015/article/details/127545710

项目源码地址:https://gitee.com/sjd75/comment
无限进步,加油~

黑马点评是黑马程序员出品的一套学习Redis的视频,公开在B站,本视频内含大量实战知识,帮助程序猿由入门到实战,学完本套课程一定会受益颇多。

1. Redis 入门

1.1 SQL 与 NoSQL

SQL 关系型数据库:采用关系模型组织数据的数据库,关系模型就是二维表格模型。(关系 — 二维表;记录 — 行;属性 — 字段)

NoSQL 非关系型数据库:非关系型的、分布式的,且一般不保证 ACID 的数据存储系统。

  1. 结构化 与 非结构化
    • 关系型数据库:结构化(Structured),每一张表都有严格的约束信息,字段名、字段数据类型、字段约束等等;约束定义好后,表的结构就固定了,向表中插入数据时都需要严格遵守这些约定,数据库也会对需要插入的数据进行校验。
    • NoSQL:非结构化,对数据库格式没有严格约束,可以是 键值型、文档型、列类型、Graphic类型。
  2. 关联 与 非关联
    • 关系型数据库:数据库的表与表之间往往存在关联,例如 外键
    • NoSLQ:非关系型数据库不存在关联关系,维护关系靠代码中的业务逻辑 或者 数据之间的耦合。
  3. 查询方式
    • 关系型数据库:基于 SQL 查询。
    • NoSQL:语法差异极大,五花八门。
  4. 事务
    • 关系型数据库:满足事务的 ACID 特性。
    • NoSQL:不支持事务 或 不能严格保证 ACID 特性。
SQL NoSQL
数据结构 结构化(Structured) 非结构化
数据关联 关联的(Relational) 无关联的
查询方式 SQL 查询 非 SQL
事务特性 ACID BASE
存储方式 磁盘 内存

1.2 Redis 简介

Redis Remote Dictionary Server 远程字典服务;Redis 是 速度极快的、基于内存的键值型 NoSQL 数据库

  • 键值型 Key-Value,Value 支持多种不同的数据结构;

  • 单线程;

  • 低延迟,速度快(基于内存IO 多路复用、良好的编码);

  • 支持数据的持久化;

  • 支持主从集群、分片集群。

  • Redis 的数据类型

    • 键(Key)的类型只能为 字符串
    • 值(Value)支持五种数据类型:字符串(stirng)、列表(list)、集合(set)、散列表(hash)、有序集合(zset)。
  • Redis 有16个数据库,默认使用的是第 0 个。

    # 切换数据库
    select [0-15]
    
    # 查看数据库大小
    dbsize
    
    # 清除当前数据库内容
    flushdb
    
    # 清除所有数据库内容
    flushall
    

1.3 Redis 相关配置

# 监听的地址,默认是 127.0.0.1 只能本地访问;修改为 0.0.0.0,可以在任意 IP 访问。
bind 0.0.0.0

# 守护进程
daemonize yes

# 密码
requirepass sjd75
# 启动 Redis 服务
redis-server devTools/redis-6.2.7/redis.conf

# 启动 Redis 客户端
redis-cli -p 6379

# 若有密码,启动 Redis 客户端后需要输入密码
auth 密码

# 关闭 Redis 服务
quit
redis-cli shutdown

# 有密码,直接在客户端内 shutdown
shutdown

1.4 Redis 通用命令

  • KEYS patthern :查看所有符合的 key;
  • DEL key [key ...] :删除一个 或 多个 指定的key;
  • EXISTS key :判断某个 key 是否存在;
  • EXPIRE key seconds :给一个 key 设置有效时间,超时后该 key 会被自动删除;
  • TTL key :查看一个 key 的剩余有效时间;
  • TYPE key :查看某个 key 所存储的 value 的类型;
  • RENAME key newkey :为某个 key 重命名;

1.5 Key 的结构

假设 Blog 项目中需要存储 用户、文章信息,且 用户ID 和 文章ID 都为 1。

让 Redis 的 key 形成层级结构,使用 : 隔开,格式如下:

# 该格式并非固定,根据需求进行修改
项目名:业务名:类型:id

若 value 是一个 Java 对象,可以将对象序列化为 JSON 字符串后存储(注意加单引号):

set blog:user:1 '{"id":1, "name":"张三", "age":22}'
set blog:user:2 '{"id":2, "name":"李四", "age":23}'
set blog:article:1 '{"id":1, "title":"SpringCloud", "likes": 751214}'

Key 的层级结构

2. 五大基本数据类型

2.1 String

String 的三种类型:字符串、int、float。

  • STRLEN key :获取某个 key 存储的长度;
  • SET key value :添加 或 修改一个键值对;
  • GET key :获取某个 key 的 value;
  • MSET key value [key value ...] :批量 SET;
  • MGET key [key ...] :批量 GET;
  • INCR key :一个整型的 key 自增 1;
  • INCRBY key increment :一个整型的 key 自增 increment;
  • INCRBYFLOAT key increment :让一个浮点型的 key 自增;
  • SETNX key value :添加一个 String 类型的键值对,前提是这个 key 不存在;
  • SETEX key seconds value :添加一个 String 类型的键值对,并且指定有效时间。

2.2 Hash

Hash 可以当作一个 Map 集合,Key-Value 形式,只不过 Value 是一个 Map,也就是 Key-Map(Key-Map<Key, Value>)

  • String 类型存储对象需要将其序列化为 JSON 字符串后存储,如果需要修改对象的某个字段,比较不方便,只能修改整个 JSON 字符串。

    KEYVALUE
    blog:user:1{"id": 1, "name":"Jack", "age": 22}
    blog:user:2{"id": 2, "name":"Mike", "age": 23}
  • Hash 类型可以将对象的每个字段独立存储,可以针对对象中的单个字段进行 CRUD。

    KEYVALUE
    fieldvalue
    blog:user:1nameJack
    age22
    blog:user:2nameMike
    age23
  • HSET key field value [field value ...] :添加 或 修改 HashKey 的 field 的值;

  • HMSET key field value [field value ...] :HSET 和 HMSET 都可以批量添加 或 修改 HashKey 的 field 的值;

  • HSETNX key field value :添加一个 HashKey 的 field 值,前提是这个 field 不存在。

HSET blog:user:1 id 1
HSET blog:user:1 name Jack
HSET blog:user:1 age 22

# 单独修改一个属性
HSET blog:user:1 age 33

# HSET 和 HMSET 都可以批量 添加 或 修改 HashKey 的 field 值。
HSET blog:user:2 id 2 name Mike age 44
  • HGET key field:获取 HashKey 的 field 的值;
  • HMGET key field [field ...] :批量获取 HashKey 的 field 的值。
HGET blog:user:1 id# 1
HGET blog:user:1 name# Jack
HGET blog:user:1 age# 33

# 批量获取 HashKey 的 field 的值(HGET 不行)
HMGET blog:user:1 id name age
  • HGETALL key :获取一个 HashKey 中的所有 field 和 value;
  • HKEYS key :获取指定 HashKey 中的所有 field;
  • HVALS key :获取指定 HashKey 中的所有 field 的 value。
HGETALL blog:user:1
1) "id"
2) "1"
3) "name"
4) "Jack"
5) "age"
6) "22"

HKEYS blog:user:1
1) "id"
2) "name"
3) "age"

HVALS blog:user:1
1) "1"
2) "Jack"
3) "22"
  • HINCRBY key field increment :一个整型的 HashKey 的 field 的值自增 increment。
HINCRBY blog:user:1 id 10
(integer) 11

HINCRBY blog:user:1 id 100
(integer) 111
  • HDEL key field [field ...] :批量删除 HashKey 的 field 和 field 对应的 value;
  • HLEN key :查看 HashKey 的字段数量;
  • HEXISTS key field :查看 HashKey 的指定字段 field 是否存在。

2.3 List

Redis中的List 类似 Java 中的 LinkedList,可以看作一个双向链表,支持正向检索和反向检索。其特征也与 LinkedList 类似:有序可重复。

  • LPUSH key element [element ...] :向列表左侧(头)插入一个或多个元素;
  • LPOP key [count] :移除并返回左侧(头)的第一个元素;
  • RPUSH key element [element ...] :向列表右侧(尾)插入一个或多个元素;
  • RPOP key :移除并返回右侧(尾)第一个元素。
LPUSH node 1 2 3 4 5
# 5 4 3 2 1

LPOP node
# 4 3 2 1

LPOP node 2
# 2 1

RPUSH 11 22 33
# 2 1 11 22 33

RPOP node
# 2 1 11 22

RPOP node 2
# 2 1
  • LRANGE key start end :返回指定下标范围内的所有元素。
# 返回所有元素
LRANGE node 0 -1

# 返回 第一个、第二个 元素
LRANGE node 0 1
  • LTRIM key start end :只保留指定范围内的元素,其他的删除。
# 保留 第二、三、四个元素
LTRIM node 2 4
  • BLPOP kye [key ...] timeoutBRPOP key [key ...] timeout:与 LPOP 和 RPOP 类似,但是在没有元素时等待指定时间,而不是直接返回 nil。
# 此时数据库中没有 key 为 test 的数据,10 秒内不会返回 nil
BLPOP test 10

# 在另外一个终端添加一个 test
LPUSH test 1

# 10 秒之内若数据库中新增了 test,会将其 POP,并返回 等待时间
127.0.0.1:6379> BLPOP test 10
1) "test"
2) "1"
(7.55s)
  • LINDEX key index :返回指定下标的值;
  • LLEN key :返回列表的元素个数;

:LPUSH + LPOP 或 RPUSH + RPOP。

队列:LPUSH + RPOP 或 RPUSH + LPOP。

2.4 Set

Redis 的 Set 类似 HashSet,可以看作一个 value 为 null 的 HashMap;其特征也与 HashSet 类似:无序不可重复,支持 交集、并集、差集等功能。

  • SADD key member [member ...] :向 Set 中添加一个或多个元素;
  • SMEMBERS key :获取指定 Set 中的所有元素;
  • SISMEMBER key member :判断 Set 中是否存在指定元素;
  • SCARD key :返回 Set 中的元素个数;
  • SREM key member [member ...] :移除 Set 中的指定元素;
SADD set hello
SADD set hi hey halo

SMEMBERS set
1) "halo"
2) "hey"
3) "hi"
4) "hello"

SISMEMBER set hello
SISMEMBER set hello1

SCARD set
(integer) 4

SREM set halo
SREM set hello hi hey
  • SINTER key [key ...] :求 n 个 key 间的交集;
  • SDIFF key [key ...] :求 n 个 key 间的差集;
  • SUNION key [key ...] :求 n 个 key 间的并集。
SADD set1 1 2 3 5 7 9
SADD set2 1 2 4 6 8 10

SINTER set1 set2
1) "1"
2) "2"

SDIFF set1 set2
1) "3"
2) "5"
3) "7"
4) "9"

SDIFF set2 set1
1) "4"
2) "6"
3) "8"
4) "10"

SUNION set1 set2
 1) "1"
 2) "2"
 3) "3"
 4) "4"
 5) "5"
 6) "6"
 7) "7"
 8) "8"
 9) "9"
10) "10"

2.5 Sorted Set

Redis 的 SortedSet 是一个可排序的 Set 集合,SortedSet 的每一个元素都带有一个 score 属性,可以基于 score 属性对元素排序。

**注意:**排名默认升序,降序需要在命令的 Z 后面添加 REV

  • ZADD key [score member ...] :以 score 为权重向 SortedSet 中添加一个或多个元素,如果存在则更新 score;
  • ZREM key member [member ...] :删除 SortedSet 中的指定元素;
  • ZCARD key :返回 SortedSet 中的元素个数;
  • ZSCORE key member :获取 SortedSet 中指定元素的 score 值。
ZADD students 85 Jack 89 Lucy 82 Rose 95 Tom 78 Jerry 92 Amy 76 Miles

ZREM students Tom

ZCARD students

ZSCORE students Lucy
"89"
  • ZRANK key member :获取 SortedSet 中指定元素的排名(按照 score 升序)。
# 升序
ZRANK students Miles
(integer) 0
ZRANK students Jerry
(integer) 1
ZRANK students Amy
(integer) 5

# 降序
ZREVRANK students Miles
(integer) 5
ZREVRANK students Jerry
(integer) 4
ZREVRANK students Amy
(integer) 0
  • ZCOUNT key min max :统计 score 的值在给定范围内的元素个数;
  • ZINCRBY key increment member :让 SortedSet 中的指定元素自增 increment。
ZCOUNT students 70 100
(integer) 6
ZCOUNT students 70 80
(integer) 2
ZCOUNT students 70 90
(integer) 5
ZCOUNT students 90 100
(integer) 1

ZINCRBY students 2 Amy
"94"
  • ZRANGE key min max :按照 score 排序后,获取 指定排名范围 内的元素。
# 升序
# 获取前三名(按 score 排序)
ZRANGE students 0 2
1) "Miles"
2) "Jerry"
3) "Rose"
# 获取所有元素
ZRANGE students 0 -1
1) "Miles"
2) "Jerry"
3) "Rose"
4) "Jack"
5) "Lucy"
6) "Amy"

# 降序
ZRANGE students 0 2
1) "Miles"
2) "Jerry"
3) "Rose"
ZREVRANGE students 0 -1
1) "Amy"
2) "Lucy"
3) "Jack"
4) "Rose"
5) "Jerry"
6) "Miles"
  • ZRANGEBYSCORE key min max :按照 score 排序后,获取 指定 score 范围 内的元素。
# 获取 score 在 0 -80 范围内的元素
ZRANGEBYSCORE students 0 80
1) "Miles"
2) "Jerry"
  • ZINTER numberKeys key [key ...] | ZDIFF numberKeys key [key ...] | ZUNION numberKeys key [key ...] :求交集、差集、并集。
ZINTER 2 sortedSet1 sortedSet2

ZDIFF 2 sortedSet1 sortedSet2
ZDIFF 2 sortedSet2 sortedSet1

ZUNION 2 sortedSet1 sortedSet2

3. Redis 的 Java 客户端

3.1 Jedis

引入 jedis 依赖即可测试。

@SpringBootTest
class JedisTest {

    private Jedis jedis;

    @BeforeEach
    public void beforeEach() {
        jedis = new Jedis("127.0.0.1", 6379);
        jedis.auth("root");
        jedis.select(0);
    }

    @Test
    public void testString() {
        jedis.msetnx("k1", "v1", "k2", "v2", "k3", "v3", "kkk", "1");
        System.out.println(jedis.strlen("k1"));     // 2

        jedis.incr("kkk");      // 2
        jedis.incrBy("kkk", 10);    // 12
        jedis.decrBy("kkk", 5);     // 7

        jedis.setex("expireInTenSeconds", 10, "Bye~");
        try {
            Thread.sleep(1000 * 5);
            System.out.println(jedis.ttl("expireInTenSeconds") + " seconds left!!!");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        List<String> stringList = jedis.mget("k1", "k2", "k3", "kkk");
        stringList.forEach(System.out::println);
    }


    @Test
    public void testList() {
        jedis.rpush("node", "1", "2", "3", "4", "5");       // 1 2 3 4 5
        jedis.lpush("node", "10", "9", "8", "7", "6");      // 6 7 8 9 10 1 2 3 4 5

        jedis.lpop("node", 3);      // 9 10 1 2 3 4 5
        jedis.rpop("node", 3);      // 9 10 1 2

        System.out.println(jedis.llen("node"));     // 4
        System.out.println(jedis.lindex("node", 0));    // 9
        System.out.println(jedis.ltrim("node", 0, 2));  // 9 10 1

        List<String> node = jedis.lrange("node", 0, -1);
        node.forEach(System.out::println);
    }

    @Test
    public void testSet() {
        jedis.sadd("set1", "1", "2", "3", "4", "5");
        jedis.sadd("set2", "4", "5", "6", "7", "8");

        System.out.println(jedis.scard("set1"));    // 5
        Set<String> set1 = jedis.smembers("set1");
        set1.forEach(System.out::println);      // 1 2 3 4 5

        jedis.sinter("set1", "set2").forEach(System.out::println);  // 4 5
        jedis.sdiff("set1", "set2").forEach(System.out::println);   // 1 2 3
        jedis.sdiff("set2", "set1").forEach(System.out::println);   // 6 7 8
        jedis.sunion("set1", "set2").forEach(System.out::println);  // 1 2 3 4 5 6 7 8

        jedis.srem("set1", "1");
        System.out.println(jedis.sismember("set1", "1"));   // false
    }

    @AfterEach
    public void afterEach() {
        if (jedis != null) {
            jedis.close();
        }
    }
}

Jedis 本身是线程不安全的,频繁的创建和销毁连接会有性能损耗,因此使用 Jedis连接池 代替 Jedis 直连

public class JedisConnectionFactory {
    private static final JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // 最大连接
        jedisPoolConfig.setMaxTotal(8);
        // 最大空闲连接
        jedisPoolConfig.setMaxIdle(8);
        // 最小空闲连接
        jedisPoolConfig.setMinIdle(0);
        jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379, 1000, "root");
    }

    public static Jedis getJedis() {
        return jedisPool.getResource();
    }
}
@SpringBootTest
class JedisTest {

    private Jedis jedis;

    @BeforeEach
    public void beforeEach() {
        jedis = JedisConnectionFactory.getJedis();
    }

    @Test
    public void testHash() {
        jedis.hset("blog:user:1", "id", "1");
        jedis.hsetnx("blog:user:1", "name", "张三");
        jedis.hset("blog:user:1", "age", "22");

        HashMap<String, String> map = new HashMap<>();
        map.put("id", "2");
        map.put("name", "李四");
        map.put("age", "20");
        jedis.hmset("blog:user:2", map);

        jedis.hincrBy("blog:user:1", "age", 1);
        jedis.hincrByFloat("blog:user:1", "age", 1.5);
        System.out.println(jedis.hmget("blog:user:1", "name", "age"));  // 张三 23.5

        System.out.println(jedis.hlen("blog:user:2"));      // 3
        System.out.println(jedis.hdel("blog:user:2", "age"));
        System.out.println(jedis.hexists("blog:user:2", "age"));    // false

        Map<String, String> hgetAll = jedis.hgetAll("blog:user:1");
        hgetAll.entrySet().forEach(System.out::println);            // id=1 name=张三 age=24.5
        jedis.hkeys("blog:user:1").forEach(System.out::println);    // id name age
        jedis.hvals("blog:user:1").forEach(System.out::println);   // 1 张三 24.5
    }

    @AfterEach
    public void afterEach() {
        if (jedis != null) {
            jedis.close();
        }
    }
}

3.2 RedisTemplate

Spring Data Redis 是 Spring 中数据操作的模块,Spring Data Redis 提供了:对不同 Redis 客户端的整合,Lettuce 和 Jedis;通过统一 API 操作 Redis。

API 返回值类型 说明
redisTemplate.opsForValue() ValueOperations 操作 String 类型数据
redisTemplate.opsForHash() HashOperations 操作 Hash 类型数据
redisTemplate.opsForList() ListOperations 操作 List 类型数据
redisTemplate.opsForSet() SetOperations 操作 Set 类型数据
redisTemplate.opsForZSet() ZSetOperations 操作 SortedSet 类型数据
  1. 引入 spring-boot-starter-data-rediscommons-pool2(连接池)依赖;

  2. 配置文件:

    spring:
      redis:
        host: 127.0.0.1
        password: root
        port: 6379
        lettuce:
          pool:
            max-active: 8   # 最大连接数
            max-idle: 8     # 最大空闲数
            min-idle: 0     # 最小空闲数
            max-wait: 100   # 连接等待时间
    
  3. 注入 RestTemplate,测试。

    @Autowired
    private RedisTemplate redisTemplate;
    
    @Test
    public void test() {
      redisTemplate.opsForValue().set("k1", "v1");
      Map<String, String> map = new HashMap<>();
      map.put("k2", "v2");
      map.put("k3", "v3");
    map.put("k4", "v4");
      redisTemplate.opsForValue().multiSet(map);
    redisTemplate.opsForValue().multiGet(Arrays.asList("k1", "k2", "k3", "k4"))
          .forEach(System.out::println);
    }
    

问题1:RedisTemplate 可以接受任意类型的数据写入到 Redis 中,写入前会将其序列化为字节形式存储,底层采用 ObjectOutputStream 序列化。

  • 可读性差;
  • 内存占用大。
KEYS *
    1) "\xac\xed\x00\x05t\x00\x02k1"
    2) "\xac\xed\x00\x05t\x00\x02k2"
    3) "\xac\xed\x00\x05t\x00\x02k3"
    4) "\xac\xed\x00\x05t\x00\x02k4"

GET "\xac\xed\x00\x05t\x00\x02k1"
    "\xac\xed\x00\x05t\x00\x02v1"

自定义 RedisTemplate 的 序列化方式

  1. 引入 jackson-databind 依赖;
  2. 编写配置类 RedisTemplateConfig
@Configuration
public class RedisTemplateConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // 创建 RedisTemplate 对象
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        // 设置连接工厂
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 设置序列化工具
        GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();

        // Key 和 HashKey 采用 String 序列化(StringRedisSerializer)
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
      
        // Value 和 HashValue 采用 JSON 序列化(GenericJackson2JsonRedisSerializer)
        redisTemplate.setValueSerializer(jsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jsonRedisSerializer);
        
        return redisTemplate;
    }
}
// 自动注入的 `RedisTemplate` 需要加上泛型
@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Test
public void test() {
    redisTemplate.opsForValue().set("k1", "v1");
  redisTemplate.opsForValue().set("user:1", new User("Jack", 21));
}

问题2:通过以上的方法能够解决数据序列化时 可读性差、内存占用大的问题。但是 JSON 的序列化方式仍然存在一些问题,为了反序列化时知道对象的类型,JSON 序列化器会将类的 class 类型写入 JSON 结果,存入 Redis 中,会带来额外的内存开销。

{
  "@class": "com.sun.entity.User",
  "username": "Jack",
  "age": 21
}

为了节省内存空间,不使用 JSON 序列化器来处理 value,统一使用 String 序列化器,要求只能存储 String 类型的 key 和 value;当需要存储 Java 对象时,手动完成对象的序列化和反序列化。

Spring 提供了一个 StringRedisTemplate,它的 key 和 value 的序列化方式默认就是 String,省去了自定义 RedisTemplate 的过程。

  1. 使用 StringRedisTemplate;
  2. 写入数据到 Redis 中,手动将对象序列化为 JSON;
  3. 从 Redis 中读取数据,手动将读取到的 JSON 反序列化为对象。
@Autowired
private StringRedisTemplate stringRedisTemplate;

private static final ObjectMapper objectMapper = new ObjectMapper();

@Test
public void ttt() throws JsonProcessingException {
    User user = new User("Michael", 27);
    // 手动序列化
    String json = objectMapper.writeValueAsString(user);
    // 写入数据
    stringRedisTemplate.opsForValue().set("user:1", json);
    // 读取数据
    String data = stringRedisTemplate.opsForValue().get("user:1");
    // 反序列化
    User deserializedUser = objectMapper.readValue(data, User.class);
    System.out.println(deserializedUser);
}
{
  "username": "Michael",
  "age": 27
}

4. 短信登录

git clone https://gitee.com/sjd75/comment.git

前端代码导入

  • Windows:在 nginx 目录下打开 CMD 窗口,输入 start nginx.exe

  • Mac OS:

    brew install nginx
    
    # 查看 Nginx 安装地址
    brew info nginx
    /opt/homebrew/var/www
    /opt/homebrew/etc/nginx/nginx.conf
    
    # 将前端项目中 html/hmdp 复制到 /opt/homebrew/var/www
    # 将前端项目中 conf/nginx.conf 复制到 /opt/homebrew/etc/nginx/nginx.conf 替换并修改
    

4.1 基于 Session 实现登录

基于 Session 实现短信登录的流程

基于 Session 实现短信登录的流程

发送短信验证码

请求方式 请求地址 请求参数 返回值
POST /user/code phone (@RequestParam)
@Override
public Result sendCode(String phone, HttpSession session) {
    // 1. 校验手机号
    if (RegexUtils.isPhoneInvalid(phone)) {
        return Result.fail("手机号格式错误");
    }
  session.setAttribute("phone", phone);
    
  // 2. 手机号格式正确,则生成验证码
    String captcha = RandomUtil.randomNumbers(6);
    
  // 3. 将验证码保存到 Session
    session.setAttribute("captcha", captcha);
    
  // 4. 发送验证码
    // TODO 暂时不接入第三方短信 API 接口
    log.debug("captcha: {}", captcha);
    return Result.ok();
}
@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
    // 发送短信验证码并保存验证码
    return userService.sendCode(phone, session);
}

短信验证码登录和注册

请求方式 请求路径 请求参数 返回值
POST /user/login phone, code (@RequestBody)
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
    // 1. 校验手机号
    String phone = (String) session.getAttribute("phone");
    String loginFormPhone = loginForm.getPhone();
    if (!StrUtil.equals(phone, loginFormPhone)) {
        return Result.fail("两次输入的手机号不同");
    } else {
        if (RegexUtils.isPhoneInvalid(phone)) {
            return Result.fail("手机号格式错误");
        }
    }
    
  // 2. 校验验证码
    String captcha = (String) session.getAttribute("captcha");
    String loginFormCode = loginForm.getCode();
    if (!StrUtil.equals(captcha, loginFormCode)) {
        return Result.fail("验证码错误");
    }
  
    // 3. 根据手机号查询用户
    User user = query().eq("phone", phone).one();
  
    // 4. 判断用户是否存在
    // 不存在:创建用户并且保存到 Session 中
    // 存在:保存用户到 Session 中
    if (user == null) {
        user = createNewUserWithPhone(phone);
    }
  
    UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
    session.setAttribute("user", userDTO);
    return Result.ok();
}
@PostMapping("/login")
public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
    // 实现登录功能
    return userService.login(loginForm, session);
}

登录验证流程

登录验证流程

@Component
public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 从 Session 中获取用户
        Object user = request.getSession().getAttribute("user");

        // 判断用户是否存在:若不存在,拦截;存在,则将其保存到 ThreadLocal 中
        if (user == null) {
            response.setStatus(401);
            return false;
        }
        UserHolder.saveUser((UserDTO) user);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 线程处理完之后移除用户,防止内存泄漏
        UserHolder.removeUser();
    }
}
@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {
  @Autowired
  private LoginInterceptor loginInterceptor;
  
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(
                        "/shop/**", "/shop-type/**", "/upload/**", 
                        "/voucher/**", "/user/login", "/user/code", "/blog/hot"
                );
    }
}
@GetMapping("/me")
public Result me(){
    UserDTO user = UserHolder.getUser();
    return Result.ok(user);
}

4.2 基于 Redis 实现短信登录

Session 共享问题

  1. 假设有三台服务器,第一次请求的 Session 保存在 服务器1,第二次请求被 服务器2 处理,尽管是同一个客户发送的请求,但是在 服务器2 中无法获取到 服务器1 中的 Session;
  2. Tomcat 支持 Session 共享,但是存在广播风暴,用户发送一次请求,服务器1 会将 Session 同步到 服务器2 和 服务器3;当请求数量很大时,占用资源严重。

基于 Redis 实现共享 Session 登录

基于 Redis 实现共享 Session 登录01

@Slf4j
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 发送验证码
     */
    @Override
    public Result sendCode(String phone) {
        // 1. 校验手机号
        if (RegexUtils.isPhoneInvalid(phone)) {
            return Result.fail("手机号格式错误");
        }

        // 2. 手机号格式正确,则生成验证码
        String captcha = RandomUtil.randomNumbers(6);

        // 3. 将验证码保存到 Redis (login:captcha:phone ---> expire 2 minutes)
        redisTemplate.opsForValue().set(LOGIN_CAPTCHA_KEY + phone, captcha, TTL_TWO, TimeUnit.MINUTES);

        // 4. 发送验证码
        // TODO 暂时不接入第三方短信 API 接口
        log.debug("captcha: {}", captcha);
        return Result.ok();
    }


    /**
     * 短信验证码注册和登录
     */
    @Override
    public Result login(LoginFormDTO loginForm) {
        // 1. 校验手机号
        String loginFormPhone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(loginFormPhone)) {
            return Result.fail("手机号格式错误");
        }

        // 2. 校验验证码
        String captcha = redisTemplate.opsForValue().get(LOGIN_CAPTCHA_KEY + loginFormPhone);
        String loginFormCode = loginForm.getCode();
        if (!StrUtil.equals(captcha, loginFormCode)) {
            return Result.fail("验证码错误");
        }

        // 3. 根据手机号查询用户
        User user = query().eq("phone", loginFormPhone).one();

        // 4. 判断用户是否存在。不存在:数据库中创建用户;存在,保存用户信息到 Redis 中。
        if (user == null) {
            user = createNewUserWithPhone(loginFormPhone);
        }

        // 5. 保存用户信息到 Redis 中(随机生成 Token,作为登录令牌;将 User 对象转为 Hash 存储)
        String token = UUID.randomUUID().toString(true);
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        Map<String, Object> map = BeanUtil.beanToMap(userDTO, new HashMap<>(),
                CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((filedName, fieldValue) -> fieldValue.toString())
        );
        redisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, map);
        redisTemplate.expire(LOGIN_CAPTCHA_KEY + token, TTL_THIRTY, TimeUnit.DAYS);
        return Result.ok(token);
    }

    private User createNewUserWithPhone(String phone) {
        User user = new User();
        user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomNumbers(10));
        user.setPhone(phone);
        save(user);
        return user;
    }
}

登录验证流程

基于 Redis 实现共享 Session 登录02

@Component
public class LoginInterceptor implements HandlerInterceptor {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 登录验证流程
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1. 获取 Token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)) {
            response.setStatus(401);
            return false;
        }

        // 2. 基于 Token 从 Redis 中获取用户
        String key = LOGIN_USER_KEY + token;
        Map<Object, Object> map = redisTemplate.opsForHash().entries(key);
        if (map.isEmpty()) {
            response.setStatus(401);
            return false;
        }

        // 3. 将查询到的 Hash 类型的数据,转换为 UserDTO 类型的数据;保存到 ThreadLocal 中。
        UserDTO userDTO = BeanUtil.fillBeanWithMap(map, new UserDTO(), false);
        UserHolder.saveUser(userDTO);

        // 4. 刷新 Token 有效期
        redisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.DAYS);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 线程处理完之后移除用户,防止内存泄漏
        UserHolder.removeUser();
    }
}

登录拦截器的优化

登录拦截器的优化

@Component
public class RefreshTokenInterceptor implements HandlerInterceptor {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 登录验证流程
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1. 获取 Token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)) {
            return true;
        }

        // 2. 基于 Token 从 Redis 中获取用户
        String key = LOGIN_USER_KEY + token;
        Map<Object, Object> map = redisTemplate.opsForHash().entries(key);
        if (map.isEmpty()) {
            return true;
        }

        // 3. 将查询到的 Hash 类型的数据,转换为 UserDTO 类型的数据;保存到 ThreadLocal 中。
        UserDTO userDTO = BeanUtil.fillBeanWithMap(map, new UserDTO(), false);
        UserHolder.saveUser(userDTO);

        // 4. 刷新 Token 有效期
        redisTemplate.expire(key, TTL_THIRTY, TimeUnit.MINUTES);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 线程处理完之后移除用户,防止内存泄漏
        UserHolder.removeUser();
    }
}
@Component
public class LoginInterceptor implements HandlerInterceptor {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 登录验证流程
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 判断是否需要拦截(ThreadLocal 中是否有用户)
        if (UserHolder.getUser() == null) {
            response.setStatus(401);
            return false;
        }
        return true;
    }
}
@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {
    @Resource
    private LoginInterceptor loginInterceptor;

    @Resource
    private RefreshTokenInterceptor refreshTokenInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(loginInterceptor)
                .excludePathPatterns(
                        "/shop/**", "/shop-type/**", "/upload/**",
                        "/voucher/**", "/user/login", "/user/code", "/blog/hot"
                ).order(1);
        
        registry.addInterceptor(refreshTokenInterceptor)
                .addPathPatterns("/**")
                .order(0);
    }
}

5. 缓存

缓存

数据交换的缓冲区(称作 Cache),存储数据的临时地方,读写性能较高

5.1 添加缓存

添加 Redis 缓存

/shop/{id} 接口添加缓存

@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
    return shopService.queryById(id);
}

@Override
public Result queryById(Long id) {
    String key = CACHE_SHOP_KEY + id;

    // 1. 从 Redis 中查询店铺缓存;
    String shopJson = redisTemplate.opsForValue().get(key);

    // 2. 若 Redis 中存在,则将其转换为 Java 对象后返回;
    if (StrUtil.isNotBlank(shopJson)) {
        Shop shop = JSONUtil.toBean(shopJson, Shop.class);
        return Result.ok(shop);
    }

    // 3. 若 Redis 中不存在,则根据 id 从数据库中查询;
    Shop shop = getById(id);

    // 4. 若 数据库 中不存在,则报错;
    if (shop == null) {
        return Result.fail("店铺不存在!");
    }

    // 5. 若 数据库 中存在,则将其返回并存入 Redis 缓存中。
    redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), TTL_THIRTY, TimeUnit.MINUTES);
    return Result.ok(shop);
}

/shop-type/list 接口添加缓存

@GetMapping("list")
public Result queryTypeList() {
    // return typeService.usingStringToQueryByCacheOrderByAscSort();
    return typeService.usingListToQueryByCacheOrderByAscSort();
}

// 使用 List
@Override
public Result usingListToQueryByCacheOrderByAscSort() {
    // 1. 从 Redis 中查询;
    List<String> shopTypeJsonList = redisTemplate.opsForList().range(CACHE_SHOP_TYPE_KEY, 0, -1);

  // 2. Redis 中存在则直接返回;
    if (!shopTypeJsonList.isEmpty()) {
        ArrayList<ShopType> shopTypeList = new ArrayList<>();
        for (String str : shopTypeJsonList) {
            ShopType shopType = JSONUtil.toBean(str, ShopType.class);
            shopTypeList.add(shopType);
        }
        return Result.ok(shopTypeList);
    }
    
  // 3.Redis 中不存在则从数据库中查询;数据库中不存在则报错。
    List<ShopType> shopTypeList = query().orderByAsc("sort").list();
    if (shopTypeList.isEmpty() && shopTypeList == null) {
        return Result.fail("该店铺类型不存在!") ;
    }
    for (ShopType shopType : shopTypeList) {
        String shopTypeJson = JSONUtil.toJsonStr(shopType);
        shopTypeJsonList.add(shopTypeJson);
    }
    
  // 4. 数据库中存在,将其保存到 Redis 中并返回。
    redisTemplate.opsForList().rightPushAll(CACHE_SHOP_TYPE_KEY, shopTypeJsonList);
    return Result.ok(shopTypeList);
}

// 使用 String
@Override
public Result usingStringToQueryByCacheOrderByAscSort() {
    // 1. 从 Redis 中查询
    String shopTypeJson = redisTemplate.opsForValue().get(CACHE_SHOP_TYPE_KEY);
    
  // 2. Redis 中存在则直接返回
    if (StrUtil.isNotBlank(shopTypeJson)) {
        List<ShopType> shopTypeList = JSONUtil.toList(JSONUtil.parseArray(shopTypeJson), ShopType.class);
        return Result.ok(shopTypeList);
    }
    
  // 3. Redis 中不存在则从数据库中查询;数据库中不存在则报错.
    List<ShopType> shopTypeList = query().orderByAsc("sort").list();
    if (shopTypeList.isEmpty() && shopTypeList == null) {
        return Result.fail("店铺类型不存在!") ;
    }
  
    // 4. 数据库中存在,将其保存到 Redis 中并返回
    redisTemplate.opsForValue().set(CACHE_SHOP_TYPE_KEY, JSONUtil.toJsonStr(shopTypeList), TTL_THIRTY, TimeUnit.MINUTES);
    return Result.ok(shopTypeList);
}

5.2 缓存更新策略

内存淘汰 超时剔除 主动更新
说明 利用 Redis 的内存淘汰机制,内存不足时自动淘汰部分数据,下次查询时更新缓存。 为缓存数据添加 TTL 时间,到期后自动删除缓存,下次查询时更新缓存。 编写业务逻辑,在修改数据库的同时更新缓存。
一致性 一般
维护成本
  • 低一致性需求:使用内存淘汰机制;
  • 高一致性需求:使用主动更新,并以超时剔除为兜底方案。

主动更新策略

Cache Aside Pattern Read/Write Through Pattern Write Behind Catching Pattern
由缓存的调用者,在更新数据库的同时更新缓存 缓存与数据库整合为一个服务,由服务来维护一致性。调用者使用该服务,无需关心缓存一致性问题。 调用者只凑走缓存,由其他线程异步的将缓存数据持久到数据库中,保证最终一致性。

操作缓存和数据库时需要考虑的三个问题

  1. 删除缓存还是更新缓存?(删除缓存!!!

    • 更新缓存:每次更新数据库时都更新缓存,无效写操作较多;
    • 删除缓存:每次更新数据库时都让缓存失效,查询时再更新缓存。
  2. 如何保证缓存和数据库的操作同时成功或失败?

    • 单体系统:将缓存与数据库操作放在一个事务;
    • 分布式系统:利用 TCC 等分布式事务方案。
  3. 先操作缓存还是先操作数据库?(假设缓存为 10,数据库为 10)

    • 先删除缓存,再操作数据库;

      • ✔️✔️✔️ 线程1 删除缓存并且数据库为 20;线程2 查询缓存未命中,缓存更新为 20。

        先删缓存,再操作数据库01

      • ❌❌❌ 线程1 删除缓存;线程2 并行执行,查询缓存未命中(此时数据库仍为 10,更新未完毕),缓存更新为 10;线程2 更新数据库为 20。

        先删缓存,再操作数据库02

    • 先操作数据库,再删除缓存。

      • ✔️✔️✔️ 线程2 更新数据库为 20,删除缓存;线程1 查询缓存未命中,写入缓存为 20。

        先操作数据库,再删缓存01

      • ❌❌❌ 线程1 查询数据库未命中;线程2 更新数据库为 20,删除缓存;线程1 写入缓存为10。

        But,缓存的速度比数据库快很多,比较可能的是线程1 查询未命中后直接写入缓存,后又被线程2 删除。

        先操作数据库,再删缓存02

缓存更新策略的最佳实践方案

  1. 低一致性需求:使用 Redis 自带的内存淘汰机制;
  2. 高一致性需求:使用主动更新策略,并以超时剔除作为兜底方案。
    • 读操作:
      • 缓存命中则直接返回;
      • 缓存未命中则查询数据库,并写入缓存,设定超时时间。
    • 写操作:
      • 先写数据库,然后再删缓存;
      • 确保数据库与缓存操作的原子性。

5.2.1 添加缓存更新策略

为修改店铺信息的 /shop 接口添加缓存更新策略。

  1. 访问店铺:http://localhost:8080/api/shop/1,该店铺信息将被存储到 Redis 缓存中;
  2. http://localhost:8081(PUT),修改该店铺信息(先操作数据库);
  3. Redis 缓存中所存储的店铺信息被删除(后删除缓存)。
@Transactional
@Override
public Result modify(Shop shop) {
    Long id = shop.getId();
    if (id == null) {
        return Result.fail("店铺 ID 不能为空!");
    }
  
    // 1. 修改数据库
    this.updateById(shop);
    
  // 2. 删除缓存
    redisTemplate.delete(CACHE_SHOP_KEY + id);
    return Result.ok();
}

/**
 * 更新店铺信息
 */
@PutMapping
public Result updateShop(@RequestBody Shop shop) {
    return shopService.modify(shop);
}

5.3 缓存穿透

缓存穿透:查询某个 Key 对应的数据,Redis 缓存中没有相应的数据,则直接到数据库中查询;数据库中也不存在要查询的数据,数据库会返回空,Redis 也不会缓存这个空结果;导致每次通过这个 Key 查询数据都会直接到数据库中查询。给数据库带来巨大的压力,可能最终会导致数据库崩溃。

Redis 缓存穿透的两种方法

  • 缓存空对象:发送请求,未命中缓存,未命中数据库,为了防止不断的请求,将 null 缓存到 Redis 中;之后的请求将会直接命中 Redis 缓存中的 null 值。
    • 实现简单,维护方便;
    • 缓存中包含过多的 null 值,会造成额外的内存消耗(可以设置 TTL 解决);
    • 可能造成短期的不一致(可以通过 先操作数据库,后删除缓存 解决)。
  • 布隆过滤
    • 内存占用较少,没有多余的 key;
    • 实现复杂;
    • 存在误判的可能。

缓存空对象 & 布隆过滤

5.3.1 基于缓存空对象解决缓存穿透问题

/shop/{id} 接口增加解决缓存穿透的方案

缓存穿透的实现

@Override
public Result queryById(Long id) {
    // 缓存穿透
    Shop shop = dealWithCachePenetrationByNullValue(id);
    return Result.ok(shop);
}

/**
 * 通过缓存空对象解决 Redis 的缓存穿透问题
 */
public Shop dealWithCachePenetrationByNullValue(Long id) {
    String key = CACHE_SHOP_KEY + id;
    
  // 1. 从 Redis 中查询店铺缓存;
    String shopJson = redisTemplate.opsForValue().get(key);
    
  // 2. 若 Redis 中存在(命中),则将其转换为 Java 对象后返回;
    if (StrUtil.isNotBlank(shopJson)) {
        Shop shop = JSONUtil.toBean(shopJson, Shop.class);
        return shop;
    }
    
  // 3. 命中缓存后判断是否为空值
    if (ObjectUtil.equals(shopJson, "")) {
        return null;
    }
    
  // 4. 若 Redis 中不存在(未命中),则根据 id 从数据库中查询;
    Shop shop = getById(id);
    
  // 5. 若 数据库 中不存在,将空值写入 Redis(缓存空对象)
    if (shop == null) {
        redisTemplate.opsForValue().set(key, "", TTL_TWO, TimeUnit.MINUTES);
        return null;
    }
  
    // 6. 若 数据库 中存在,则将其返回并存入 Redis 缓存中。
    redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), TTL_THIRTY, TimeUnit.MINUTES);
    return shop;
}
  • 访问 http://localhost:8080/api/shop/0,查询一次数据库,返回空值,通过缓存空对象解决 Redis 的缓存穿透问题,在 Redis 中缓存一个空值;
  • 再次访问,不会查询数据库,直接将 Redis 中的空值返回。

5.4 缓存雪崩

缓存雪崩:大量的 Key 在同一时间内大面积的失效 或者 Redis 服务宕机,导致后面的请求直接打到数据库,造成数据库短时间内承受大量的请求。

解决方案

  • 给不同的 Key 的 TTL 添加随机值,避免同时失效;
  • 利用 Redis 集群提高服务的可用性;
  • 给缓存业务添加降级限流策略;
  • 给业务添加多级缓存。

5.5 缓存击穿

缓存击穿问题,也叫 热点 Key 问题;就是一个被 高并发访问 并且 缓存中业务较复杂的 Key 突然失效,大量的请求在极短的时间内一起请求这个 Key 并且都未命中,无数的请求访问在瞬间打到数据库上,给数据库带来巨大的冲击。

缓存击穿

缓存击穿的解决方案

  • 互斥锁:查询缓存未命中,获取互斥锁,获取到互斥锁的才能查询数据库重建缓存,将数据写入缓存中后,释放锁。
  • 逻辑过期:查询缓存,发现逻辑时间已经过期,获取互斥锁,开启新线程;在新线程中查询数据库重建缓存,将数据写入缓存中后,释放锁;在释放锁之前,查询该数据时,都会将过期的数据返回。

解决缓存击穿 —— 互斥锁 & 逻辑过期

解决方案 优点 缺点
互斥锁 没有额外的内存消耗;保证一致性;实现简单 线程需要等待,性能受影响;可能有死锁风险
逻辑过期 线程无需等待,性能较好 有额外内存消耗;不保证一致性;实现复杂

5.5.1 基于互斥锁解决缓存击穿问题

基于互斥锁解决缓存击穿问题

核心:利用 Redis 的 setnx 方法来表示获取锁。该方法的含义是:如果 Redis 中没有这个 Key,则插入成功;如果有这个 Key,则插入失败。通过插入成功或失败来表示是否有线程插入 Key,插入成功的 Key 则认为是获取到锁的线程;释放锁就是将这个 Key 删除,因为删除 Key 以后其他线程才能再执行 setnx 方法。

/**
 * 获取互斥锁
 */
private boolean tryLock(String key) {
    Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", TTL_TEN, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(flag);
}

/**
 * 释放互斥锁
 */
private void unLock(String key) {
    redisTemplate.delete(key);
}

一次请求的过程

  1. 请求打进来,先去 Redis 中查,未命中;
  2. 获取互斥锁:将一个 Key 为 LOCK_SHOP_KEY + id 的数据写入 Redis 中,此时其他线程就无法拿到这个 Key,也就无法继续后续操作;
  3. 获取失败就进行休眠,休眠结束后通过递归再次请求;
  4. 获取成功,查询数据库、将需要查询的那个数据写入 Redis;
  5. 最后,删除通过 setnx 创建的那个 Key。
@Override
public Result queryById(Long id) {
    // 缓存击穿(Mutex)
    Shop shop = dealWithCacheHotspotInvalidByMutex(id);
    return Result.ok(shop);
}

/**
 * 通过互斥锁解决 Redis 的缓存击穿问题
 */
public Shop dealWithCacheHotspotInvalidByMutex(Long id) {
    String key = CACHE_SHOP_KEY + id;
    
  // 1. 从 Redis 中查询店铺缓存;
    String shopJson = redisTemplate.opsForValue().get(key);
    
  // 2. 若 Redis 中存在(命中),则将其转换为 Java 对象后返回;
    if (StrUtil.isNotBlank(shopJson)) {
        return JSONUtil.toBean(shopJson, Shop.class);
    }
    
  // 3. 命中缓存后判断是否为空值
    if (ObjectUtil.equals(shopJson, "")) {
        return null;
    }
  
    // 4. 若 Redis 中不存在(缓存未命中),实现缓存重建
    // 4.1 获取互斥锁
    String lockKey = LOCK_SHOP_KEY + id;
    Shop shop = null;
    try {
        boolean isLocked = tryLock(lockKey);
        // 4.2 获取失败,休眠重试
        if (!isLocked) {
            Thread.sleep(50);
            return dealWithCacheHotspotInvalidByMutex(id);
        }
        // 4.3 获取成功,从数据库中根据 id 查询数据
        shop = getById(id);
        // 4.4 若 数据库 中不存在,将空值写入 Redis(缓存空对象)
        if (shop == null) {
            redisTemplate.opsForValue().set(key, "", TTL_TWO, TimeUnit.MINUTES);
            return null;
        }
        // 4.5 若 数据库 中存在,则将其返回并存入 Redis 缓存中。
        redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), TTL_THIRTY, TimeUnit.MINUTES);
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
      // 5. 释放互斥锁
        unLock(lockKey);
    }
    return shop;
}

5.5.2 基于逻辑过期解决缓存击穿问题

基于逻辑过期解决缓存击穿问题

数据预热

  • 可以认为存储到 Redis 中的 Key 永久有效的,其过期时间是可以代码控制的,而非通过 TTL 控制。
  • 因此 Redis 存储的数据需要带上一个逻辑过期时间,即 Shop 实体类中需要一个逻辑过期时间属性。
  • 可以新建一个 RedisData,该类包含两个属性 —— expireTime 和 Data,对原来的代码没有入侵性。
@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}

ShopServiceImpl 中新增一个方法,利用单元测试对热点数据进行预热。

/**
 * 对热点数据进行预热(提前存储到 Redis 中)
 */
public void saveHotspot2Redis(Long id, Long expireSeconds) {
    // 1. 查询店铺信息
    Shop shop = getById(id);
    // 2. 封装逻辑过期时间
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    // 3. 写入 Redis
    redisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
@SpringBootTest
class CommentApplicationTests {
    @Autowired
    private ShopServiceImpl shopService;

    @Test
    public void test() {
        shopService.saveHotspot2Redis(1L, 10L);
    }
}
# Redis 中存储的数据会多一个 expireTime 的值
{
    "data": {
        "area": "大关",
        "openHours": "10:00-22:00",
        "sold": 4215,
        ...
    },
    "expireTime": 1665566674797
}

逻辑过期解决缓存击穿问题

// 重建缓存的方法 sleep 200ms,让一部分线程先于该方法完成查询。(实现 5.5 处,“缓存击穿的解决方案“图片的效果)
public void saveHotspot2Redis(Long id, Long expireSeconds) throws InterruptedException {
    // 1. 查询店铺信息
    Shop shop = getById(id);
    Thread.sleep(200);
    // 2. 封装逻辑过期时间
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    // 3. 写入 Redis
    redisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
@Override
public Result queryById(Long id) {
    // 缓存击穿(Logical Expiration)
    Shop shop = dealWithCacheHotspotInvalidByLogicalExpiration(id);
    return Result.ok(shop);
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

public Shop dealWithCacheHotspotInvalidByLogicalExpiration(Long id) {
    String key = CACHE_SHOP_KEY + id;
    
  // 1. 从 Redis 中查询店铺缓存;
    String shopJson = redisTemplate.opsForValue().get(key);
    
  // 2. 未命中
    if (StrUtil.isBlank(shopJson)) {
        return null;
    }
    
  // 3. 命中(先将 JSON 反序列化为 对象)
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
    LocalDateTime expireTime = redisData.getExpireTime();
    
  // 4. 判断是否过期:未过期,直接返回店铺信息;过期,需要缓存重建。
    if (expireTime.isAfter(LocalDateTime.now())) {
        return shop;
    }
    
  // 5. 缓存重建(未获取到互斥锁,直接返回店铺信息)
    String lockKey = LOCK_SHOP_KEY + id;
    boolean isLocked = tryLock(lockKey);
    
  // 5.1 获取到互斥锁
    // 开启独立线程:根据 id 查询数据库,将数据写入到 Redis,并且设置逻辑过期时间。
    // 此处必须进行 DoubleCheck:多线程并发下,若线程1 和 线程2都到达这一步,线程1 拿到锁,进行操作后释放锁;线程2 拿到锁后会再次进行查询数据库、写入到 Redis 中等操作。
    shopJson = redisTemplate.opsForValue().get(key);
    if (StrUtil.isBlank(shopJson)) {
        return null;
    }
    if (isLocked) {
        CACHE_REBUILD_EXECUTOR.submit(() -> {
            try {
                // 重建缓存
                this.saveHotShopInformation2Redis(id, 30L);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // 释放互斥锁
                unLock(lockKey);
            }
        });
    }
  
  // 5.2 返回店铺信息
    return shop;
}

5.6 封装 Redis 工具类

CacheClient

@Component
@Slf4j
public class CacheClient {
    private final StringRedisTemplate redisTemplate;

    public CacheClient(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 将任意 Java 对象序列化为 JSON 存储在 String 类型的 Key 中,并且可以设置 TTL 过期时间
     */
    public void set(String key, Object value, Long time, TimeUnit timeUnit) {
        redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, timeUnit);
    }

    /**
     * 将任意 Java 对象序列化为 JSON 存储在 String 类型的 Key 中,并且可以设置逻辑过期时间,用于处理缓存击穿
     */
    public void setWithLogicalExpiration(String key, Object value, Long time, TimeUnit timeUnit) {
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
        redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }

    /**
     * 根据指定的 Key 查询缓存,反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题。
     */
    public <R, ID> R dealWithCachePenetration(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit timeUnit) {
        String key = keyPrefix + id;
        // 1. 从 Redis 中查询店铺缓存;
        String json = redisTemplate.opsForValue().get(key);
        // 2. 若 Redis 中存在(命中),则将其转换为 Java 对象后返回;
        if (StrUtil.isNotBlank(json)) {
            return JSONUtil.toBean(json, type);
        }
        // 3. 命中缓存后判断是否为空值
        if (ObjectUtil.equal(json, "")) {
            return null;
        }
        // 4. 若 Redis 中不存在(未命中),则根据 id 从数据库中查询;
        R r = dbFallback.apply(id);

        // 5. 若 数据库 中不存在,将空值写入 Redis(缓存空对象)
        if (r == null) {
            redisTemplate.opsForValue().set(key, "", TTL_TWO, TimeUnit.MINUTES);
            return null;
        }

        // 6. 若 数据库 中存在,则将其返回并存入 Redis 缓存中。
        this.set(key, r, time, timeUnit);
        return r;
    }

    /**
     * 根据指定的 Key 查询缓存,反序列化为指定类型,利用逻辑过期的方式解决缓存击穿问题。
     */
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
    public <R, ID> R dealWithCacheHotspotInvalid(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit timeUnit) {
        String key = keyPrefix + id;
        // 1. 从 Redis 中查询店铺缓存;
        String json = redisTemplate.opsForValue().get(key);
        // 2. 未命中
        if (StrUtil.isBlank(json)) {
            return null;
        }
        // 3. 命中(先将 JSON 反序列化为 对象)
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        // 4. 判断是否过期:未过期,直接返回店铺信息;过期,需要缓存重建。
        if (expireTime.isAfter(LocalDateTime.now())) {
            return r;
        }
        // 5. 缓存重建(未获取到互斥锁,直接返回店铺信息)
        String lockKey = LOCK_SHOP_KEY + id;
        boolean isLocked = tryLock(lockKey);
        // 5.1 获取到互斥锁
        // 开启独立线程:根据 id 查询数据库,将数据写入到 Redis,并且设置逻辑过期时间。
        // 此处必须进行 DoubleCheck:多线程并发下,若线程1 和 线程2都到达这一步,线程1 拿到锁,进行操作后释放锁;线程2 拿到锁后会再次进行查询数据库、写入到 Redis 中等操作。
        json = redisTemplate.opsForValue().get(key);
        if (StrUtil.isBlank(json)) {
            return null;
        }
        if (isLocked) {
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    // 重建缓存
                    R apply = dbFallback.apply(id);
                    this.setWithLogicalExpiration(key, apply, time, timeUnit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    // 释放互斥锁
                    unLock(lockKey);
                }
            });
        }
        // 5.2 返回店铺信息
        return r;
    }

    /**
     * 获取互斥锁
     */
    private boolean tryLock(String key) {
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", TTL_TEN, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    /**
     * 释放互斥锁
     */
    private void unLock(String key) {
        redisTemplate.delete(key);
    }
}

测试

// 预热,为实现逻辑过期做准备
@Autowired
private CacheClient cacheClient;

@Test
public void advanceHotspot() {
Shop shop = shopService.getById(1L);
cacheClient.setWithLogicalExpiration(CACHE_SHOP_KEY + 1L, shop, TTL_TEN, TimeUnit.SECONDS);
}
@Resource
private CacheClient cacheClient;

@Override
public Result queryById(Long id) {
    // 缓存穿透
     Shop shop = cacheClient.dealWithCachePenetration(CACHE_SHOP_KEY, id, Shop.class, this::getById, TTL_THIRTY, TimeUnit.MINUTES);

    // 缓存击穿(Logical Expiration)
    Shop shop = cacheClient.dealWithCacheHotspotInvalid(CACHE_SHOP_KEY, id, Shop.class, this::getById, TTL_TEN, TimeUnit.SECONDS);
    return Result.ok(shop);
}

6. 秒杀相关

每个店铺都可以发布优惠券,保存到 tb_voucher 表中;当用户抢购时,生成订单并保存到 tb_voucher_order 表中。

  • 订单表如果使用数据库自增 ID,会存在以下问题:
    • ID 的规律太明显,容易暴露信息;
    • 单表数据量的限制,订单过多时单表很难存储得下。

6.1 全局唯一 ID

全局唯一 ID,一般需要满足以下特性:

  • 唯一性;
  • 高可用;
  • 高性能;
  • 递增性;
  • 安全性。

ID 的组成部分

  • 符号位:1 bit,永远为 0;
  • 时间戳:31 bit,以秒为单位,可以使用 69 年;
  • 序列号:32 bit,秒内的计数器,支持每秒产生 2^32 个不同的 ID。

全局唯一 ID

Redis ID 自增策略:通过设置每天存入一个 Key,方便统计订单数量;ID 构造为 时间戳 + 计数器。

@Component
public class RedisIdWorker {

    /**
     * LocalDateTime.of(2022, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC);
     * 2022年1月1日 0:0:00 的时间戳
     */
    private static final long BEGIN_TIMESTAMP_2022 = 1640995200L;
    /**
     * 序列号的位数
     */
    private static final int BITS_COUNT = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public long nextId(String keyPrefix) {
        // 1. 时间戳
        long currentTimestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
        long timestamp = currentTimestamp - BEGIN_TIMESTAMP_2022;

        // 2. 序列号
        String formatTime = DateTimeFormatter.ofPattern("yyyy:MM:dd").format(LocalDateTime.now());
        long serialNumber = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + formatTime);

        // 3. 拼接(时间戳向左移 32 位,通过或运算将其与序列号拼接)
        return timestamp << BITS_COUNT | serialNumber;
    }

    //public static void main(String[] args) {
    //    long timestamp = LocalDateTime.of(2022, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC);
    //    System.out.println(timestamp);
    //
    //    String formatTime = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDateTime.now());
    //    System.out.println(formatTime);
    //}
}

测试

@Resource
private RedisIdWorker redisIdWorker;

private ExecutorService es = Executors.newFixedThreadPool(500);

@Test
public void testRedisIdWorker() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(300);
    Runnable task = () -> {
        for (int i = 0; i < 100; i++) {
            long id = redisIdWorker.nextId("sun");
            System.out.println("id = " + id);
        }
        latch.countDown();
    };

    long begin = System.currentTimeMillis();
    for (int i = 0; i < 300; i++) {
        es.submit(task);
    }
    long end = System.currentTimeMillis();
    latch.await();
    System.out.println("Execution Time: " + (end - begin));
}

6.2 实现优惠券秒杀下单

6.2.1 增加优惠券

优惠券分为平价券和特价券;平价券可以任意购买,特价券需要秒杀抢购。表关系如下:

  • tb_voucher :优惠券的基本信息,优惠金额、使用规则等;

    id  shop_id  title  sub_title  rules  pay_value  actual_value  type  status  create_time  update_time
    
  • tb_seckill_voucher :秒杀优惠券的库存、开始抢购时间、结束抢购时间等。

    voucher_id  stock  begin_time  end_time  create_time  update_time
    

VoucherController 中提高了一个接口,可以添加优惠券和秒杀优惠券:

/**
 * 新增秒杀优惠券
 * @param voucher 优惠券信息,包含秒杀信息
 */
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
    voucherService.addSeckillVoucher(voucher);
    return Result.ok(voucher.getId());
}

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
}
{
    "shopId": 7,
    "title": "100元代金券",
    "subTitle": "周一到周五均可使用",
    "rules": "全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食",
    "payValue": 8000,
    "actualValue": 10000,
    "type": 1,
    "stock": 100,
    "beginTime":"2022-10-17T9:40:00",
    "endTime":"2022-10-17T23:40:00"
}

增加优惠券(秒杀券)

6.2.2 实现下单

请求方式 请求路径 请求参数 返回值
POST /voucher-order/seckill/{id} 优惠券 id (@PathVarible) 订单 id

下单前需要判断:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单;
  • 库存是否充足,不足则无法下单。

下单流程

秒杀下单功能

@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
    // 1. 根据 优惠券 id 查询数据库
    SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
  
    // 2. 判断秒杀是否开始或结束(未开始或已结束,返回异常结果)
    if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
        return Result.fail("秒杀尚未开始..");
    }
    if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
        return Result.fail("秒杀已经结束..");
    }
    
  // 3. 判断库存是否充足(不充足返回异常结果)
    if (seckillVoucher.getStock() < 1) {
        return Result.fail("库存不足..");
    }
    
  // 4. 减扣库存
    boolean isAccomplished = seckillVoucherService.update()
            .setSql("stock = stock -1")
            .eq("voucher_id", voucherId).update();
    if (!isAccomplished) {
        return Result.fail("库存不足..");
    }
    
  // 5. 创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    Long userId = UserHolder.getUser().getId();
    voucherOrder.setId(orderId);
    voucherOrder.setUserId(userId);
    voucherOrder.setVoucherId(voucherId);
    boolean isSaved = save(voucherOrder);
    if (!isSaved) {
        return Result.fail("下单失败..");
    }
    
  // 6. 返回 订单 id
    return Result.ok(orderId);
}

6.3 超卖问题

使用 Jmeter 进行测试能够发现:秒杀优惠券的库存为 负数,生成的订单数量超过 100 份。

测试优惠券秒杀下单

超卖问题的出现

假设库存为 1,有线程1、2、3,时刻 t1、t2、t3、t4。

  • 线程1 在 t1 查询库存,库存为 1;线程2 在 t2 查询库存,库存为 1;线程3 在 t2 查询库存,库存为 1。
  • 线程1 在 t3 下单,库存扣减为0;
  • 线程2 和 线程3 在 t4 下单,库存扣减为 -2。

6.3.1 乐观锁

  • 悲观锁:每次拿数据的时候都会上锁,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程
  • 乐观锁:每次拿数据的时候都不会上锁,但在更新时会判断在此期间有没有其他线程更新这个数据;如果存在冲突,则采取一个补偿措施(比如告知用户失败)。
    • 乐观锁的关键是判断之前查询得到的数据是否被修改过;
    • 一般有 2 种实现方式:版本号法CAS 法

通过版本号法实现乐观锁

  1. 查询数据,获取当前需要操作数据的 版本号;
  2. 更新数据时同时需要更新版本号;
  3. 若执行更新时的版本号 与 最初查询获取到的版本号不同,则更新失败。

假设库存为 1,有线程1、2、3,时刻 t1、t2、t3、t4。

  • 线程1 在 t1 查询库存,库存为 1,版本号为 1;线程2 在 t2 查询库存,库存为 1,版本号为 1;线程3 在 t2 查询库存,库存为 1,版本号为 1。
  • 线程1 在 t3 下单,库存扣减为0,版本号为 2;
  • 线程2 和 线程3 在 t4 下单,版本号为 2,与最初查询到的版本号不同,执行失败。
# id = 10, stock = 1, version = 1
SELECT id, stock, version FROM tb_scekill_voucher;

# id = 10, stock = 0, version = 2
UDATE SET tb_seckill_voucher stock = stock - 1, version = version + 1 WHERE id = 10 AND version = 1;

CAS Compare And Set

通过以上描述发现,stock 能够替代 version 字段 —— 查询、然后更新、更新时检查其是否与最初查询的值一致。

  1. 查询获取 stock
  2. 更新 stock
  3. 若执行更新时的 stock 与最初查询到的 stock 的值不同,则更新失败。

假设库存为 1,有线程1、2、3,时刻 t1、t2、t3、t4。

  • 线程1 在 t1 查询库存,库存为 1;线程2 在 t2 查询库存,库存为 1;线程3 在 t2 查询库存,库存为 1。
  • 线程1 在 t3 下单,库存扣减为0;
  • 线程2 和 线程3 在 t4 下单,库存为 0,与最初查询到的库存不同,执行失败。
# id = 10, stock = 1
SELECT id, stock FROM tb_seckill_voucher;

# id = 10, stock = 0
UPDATE tb_seckill_voucher SET stock = stock - 1 WHERE id = 10 AND stock = 1;

6.3.2 乐观锁解决超卖问题

使用乐观锁:进行测试会发现,库存尚未不足时就会导致很多线程更新失败 —— 若有十个线程查询到的 stock 为100,只要有一个更新成功,其他全部失败。

// 4. 减扣库存
boolean isAccomplished = seckillVoucherService.update()
        // SET stock= stock - 1
        .setSql("stock = stock - 1")
        // WHERE  voucher_id = ? AND stock = ?
        .eq("voucher_id", voucherId).eq("stock",seckillVoucher.getStock())
        .update();
if (!isAccomplished) {
    return Result.fail("库存不足..");
}

**只需要让 stock > 0 即可 **

// 4. 减扣库存
boolean isAccomplished = seckillVoucherService.update()
        // SET stock= stock - 1
        .setSql("stock = stock - 1")
        // WHERE  voucher_id = ? AND stock > 0
        .eq("voucher_id", voucherId).gt("stock", 0)
        .update();
if (!isAccomplished) {
    return Result.fail("库存不足..");
}

6.4 一人一单

6.4.1 增加一人一单逻辑

一人一单

@Override
public Result seckillVoucher(Long voucherId) {
    // 1. 根据 优惠券 id 查询数据库
    SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
    
  // 2. 判断秒杀是否开始或结束(未开始或已结束,返回异常结果)
    if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
        return Result.fail("秒杀尚未开始..");
    }
    if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
        return Result.fail("秒杀已经结束..");
    }
    
  // 3. 判断库存是否充足(不充足返回异常结果)
    if (seckillVoucher.getStock() < 1) {
        return Result.fail("库存不足..");
    }
 
    // 4. 一人一单(根据 优惠券id 和 用户id 查询订单;存在,则直接返回)
Long userId = UserHolder.getUser().getId();
    Integer count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
    if (count > 0) {
        return Result.fail("不可重复下单!");
    }
  
  // 5. 减扣库存
    boolean isAccomplished = seckillVoucherService.update()
            .setSql("stock = stock - 1")
            .eq("voucher_id", voucherId).gt("stock", 0)
            .update();
    if (!isAccomplished) {
        return Result.fail("库存不足..");
    }
    
  // 6. 创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    voucherOrder.setUserId(userId);
    voucherOrder.setVoucherId(voucherId);
    boolean isSaved = save(voucherOrder);
    if (!isSaved) {
        return Result.fail("下单失败..");
    }
    
  // 7. 返回 订单 id
    return Result.ok(orderId);
}

6.4.2 并发安全问题

存在问题:高并发的情况下,查询数据库时,都不存在订单,仍然会出现一人多单的情况,仍需加锁。乐观锁比较适合更新操作,此处的插入操作选择悲观锁。

  • 首先,初始方案是在 createVoucherOrder 方法上添加 synchronized,这样导致锁的粒度过大。

    public synchronized Result createVoucherOrder(Long voucherId) { ... }
    
  • 于是选择 “一个用户一把锁” 这样的方案。但是必须先保证 锁是同一把userId.toString() 方法锁获取到的字符串是不同的对象,底层是 new 出来的,intern() 方法是从常量池里获取数据,保证了同一个用户的 userId.toString() 值相同。

    @Transactional
    @Override
    public Result createVoucherOrder(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
      synchronized(userId.toString().intern()) {
          ...
        }
    }
    
  • 此外,还需要注意一个点,我们需要将 createVoucherOrder 方法整体包裹起来,确保事务不会出问题;否则会出现 “synchronized 包裹的代码片段执行完毕,事务还未提交,但是锁已经释放了” 的情况。

    synchronized (userId.toString().intern()) {
    return createVoucherOrder(voucherId);
    }
    
  • 最后,createVoucherOrder 方法实际上是通过 this.createVoucherOrder() 的方式调用的,this 拿到的是原始对象,没有经过动态代理,事务要生效,需要使用代理对象来执行。

    synchronized (userId.toString().intern()) {
        // 获取代理对象
        VoucherOrderService currentProxy = (VoucherOrderService) AopContext.currentProxy();
        return currentProxy.createVoucherOrder(voucherId);
    }
    

终极版

@Override
public Result seckillVoucher(Long voucherId) {
    // 1. 根据 优惠券 id 查询数据库
    SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
    
  // 2. 判断秒杀是否开始或结束(未开始或已结束,返回异常结果)
    if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
        return Result.fail("秒杀尚未开始..");
    }
    if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
        return Result.fail("秒杀已经结束..");
    }
  
    // 3. 判断库存是否充足(不充足返回异常结果)
    if (seckillVoucher.getStock() < 1) {
        return Result.fail("库存不足..");
    }
  
    Long userId = UserHolder.getUser().getId();
    synchronized (userId.toString().intern()) {
        // 获取代理对象
        VoucherOrderService currentProxy = (VoucherOrderService) AopContext.currentProxy();
        return currentProxy.createVoucherOrder(voucherId);
    }
}

@Transactional
@Override
public Result createVoucherOrder(Long voucherId) {
    Long userId = UserHolder.getUser().getId();
    // 4. 一人一单(根据 优惠券id 和 用户id 查询订单;存在,则直接返回)
    Integer count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
    if (count > 0) {
        return Result.fail("不可重复下单!");
    }
  
    // 5. 减扣库存
    boolean isAccomplished = seckillVoucherService.update()
            // SET stock= stock - 1
            .setSql("stock = stock - 1")
            // WHERE  voucher_id = ? AND stock > 0
            .eq("voucher_id", voucherId).gt("stock", 0)
            .update();
    if (!isAccomplished) {
        return Result.fail("库存不足..");
    }
  
    // 6. 创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    voucherOrder.setUserId(userId);
    voucherOrder.setVoucherId(voucherId);
    boolean isSaved = save(voucherOrder);
    if (!isSaved) {
        return Result.fail("下单失败..");
    }
  
    // 7. 返回 订单 id
    return Result.ok(orderId);
}

最终实现效果

一人一单的并发安全问题

6.4.3 集群环境下的并发问题

  1. 将服务启动两份,端口分别为 8081 和 8082:Service -> Spring Boot -> 点击 CommentApplication,按下 Command + D 或 Ctrl + D -> VM Options: -Dserver.port=8082

  2. 修改 Nginx 的 conf 目录下的 nignx.conf,配置反向代理和负载均衡,nginx -s reload Reload 一下。(访问 8080 就会进入到 Nginx,访问 /api 就会找到 http://backend,又会被转到 8081 和 8082)

    ...
    http {
        ...
    
        server {
            listen       8080;
            server_name  localhost;
            # 指定前端项目所在的位置
            location / {
                root   /opt/homebrew/var/www/hmdp;
                index  index.html index.htm;
            }
    
            error_page   500 502 503 504  /50x.html;
            location = /50x.html {
                root   html;
            }
    
            location /api {
                ...
                proxy_pass http://backend;
            }
        }
    
        upstream backend {
            server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
            server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
        }
    }
    

一人一单的集群环境下的并发安全问题

由于部署了多个 Tomcat,每个 Tomcat 中都有属于自己的 JVM。

  • 在 服务器A 的 Tomcat 内部,有两个线程,这两个线程使用的是同一份代码,他们的锁对象是同一个,可以实现互斥(线程1 和 线程2);
  • 在 服务器B 的 Tomcat 内部,有两个线程,这两个线程使用的是同一份代码,他们的锁对象是同一个,可以实现互斥(线程3 和 线程4);
  • 线程1/2 和 线程3/4 使用的不是同一份代码,锁对象不是同一个,于是线程1/2 与 线程3/4 之间无法实现互斥;导致 synchronized 锁失效,这种情况下就需要 分布式锁 来解决。

一人一单的集群环境下并发的安全问题

7. 分布式锁

7.1 基本原理和不同实现方式的对比

  • 分布式锁:满足分布式系统或集群模式下的多进程可见并互斥的锁。
  • 分布式锁的核心思想:所有线程都使用同一把锁,让程序串行执行。
  • 分布式锁需要满足的条件
    • 可见行:多个线程都能看到相同的结果,也就是感知到变化;
    • 互斥:分布式锁的最基本条件,为了让程序串行执行;
    • 高可用:保证程序不易崩溃;
    • 高性能:加锁本身会让性能降低,因此需要分布式锁具有较高的加锁性能和释放锁性能;
    • 安全性。

分布式锁

常见的分布式锁

  • MySQL:MySQL 本身带有锁机制,但是由于 MySQL 性能一般,所以采用分布式锁的情况下,使用 MySQL 作为分布式锁比较少见。
  • Redis:Redis 作为分布式锁比较常见,利用 setnx 方法,如果 Key 插入成功,则表示获取到锁,插入失败则表示无法获取到锁。
  • Zookeeper:Zookeeper 也是企业级开发中比较好的一个实现分布式锁的方案。
MySQL Redis Zookeeper
互斥 利用 MySQL 本身的互斥锁机制 利用 setnx 互斥命令 利用节点的唯一性和有序性
高可用
高性能 一般 一般
安全性 断开链接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开链接自动释放

7.2 Redis 分布式锁的实现思路

实现分布式锁时需要实现两个基本方法:

  • 获取锁
    • 互斥:确保只有一个线程获取到锁;
    • 非阻塞:尝试一次,成功返回 true,失败返回 false。
# 添加锁  NX 互斥 EX 设置超时时间
SET lock thread1 NX EX 10
  • 释放锁
    • 手动释放;
    • 超时释放:获取锁时添加一个超时时间。
# 释放锁,删除即可
del lock

基于 Redis 的分布式锁的实现思路

7.3 Redis 分布式锁的实现 初级版本

public interface DistributedLock {
    /**
     * 尝试获取锁
     * @param timeoutSeconds 锁的超时时间,过期后自动释放
     * @return true 代表获取锁成功;false 代表获取锁失败
     */
    boolean tryLock(long timeoutSeconds);

    /**
     * 释放锁
     */
    void unlock();
}
public class SimpleDistributedLockBasedOnRedis implements DistributedLock {
    private String name;
    private StringRedisTemplate stringRedisTemplate;

    public SimpleDistributedLockBasedOnRedis(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    private static final String KEY_PREFIX = "lock:";

    @Override
    public boolean tryLock(long timeoutSeconds) {
        String threadName = Thread.currentThread().getId();
        Boolean isSucceeded = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadName, timeoutSeconds, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(isSucceeded);
    }

    @Override
    public void unlock() {
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}
@Override
public Result seckillVoucher(Long voucherId) {
    // 1. 根据 优惠券 id 查询数据库
    SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
    // 2. 判断秒杀是否开始或结束(未开始或已结束,返回异常结果)
    if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
        return Result.fail("秒杀尚未开始..");
    }
    if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
        return Result.fail("秒杀已经结束..");
    }
    // 3. 判断库存是否充足(不充足返回异常结果)
    if (seckillVoucher.getStock() < 1) {
        return Result.fail("库存不足..");
    }
    Long userId = UserHolder.getUser().getId();
    
  // 创建锁对象并获取锁,判断是否获取锁成功
    SimpleDistributedLockBasedOnRedis lock = new SimpleDistributedLockBasedOnRedis("order:" + userId, stringRedisTemplate);
    boolean isLocked = lock.tryLock(1200);
    if (!isLocked) {
        return Result.fail("不可重复下单!");
    }
    try {
        // 获取代理对象
        VoucherOrderService currentProxy = (VoucherOrderService) AopContext.currentProxy();
        return currentProxy.createVoucherOrder(voucherId);
    } finally {
        lock.unlock();
    }
}

测试

  • 将断点打到 “判断是否获取到锁” 处,发送两次 http://localhost:8080/api/voucher-order/seckill/10 请求,第一次请求打到 8081,第二次请求打到 8082。
  • 8081 获取到的 isLocked 为 true,8082 获取到 isLocked 为 false;
  • Redis 中存储的 Key 为 lock:order:userId,Value 为 http-nio-8081-exec-1

7.4 Redis 分布式锁的误删问题

误删问题的逻辑说明

  • 线程1 获取到锁,持有锁的线程碰到了业务阻塞,业务阻塞的时间超过了该锁的超时时间,触发锁的超时释放。
  • 此时,线程2 获取到锁,执行业务;在线程2 执行业务的过程中,线程1 的业务执行完毕并且释放锁,但是释放的是线程2 的锁。
  • 之后,线程3 获取到锁,执行业务;导致此时有两个线程同时在并行执行业务。Redis 分布式锁的误删问题

解决方案:在每个线程释放锁的时候,需要判断一下当前这把锁是否属于自己,如果不属于自己,就不会进行锁的释放(删除)。

  • 线程1 获取到锁,持有锁的线程碰到了业务阻塞,业务阻塞的时间超过了该锁的超时时间,触发锁的超时释放。
  • 此时,线程2 获取到锁,执行业务;在线程2 执行业务的过程中,线程1 的业务执行完毕并且释放锁,但是此时线程1 需要判断当前这把锁是否属于自己,不属于则不会删除锁。于是线程2 一直持有这把锁直至其业务执行结束后才会释放,并且在释放的时候也需要判断当前要释放的锁是否属于自己。
  • 之后,线程3 获取到锁,执行业务。

解决Redis 分布式锁误删问题

基于 Redis 的分布式锁的实现(解决误删问题)

基于 Redis 的分布式锁的实现思路(解决误删问题)

改进 Redis 分布式锁:

  • 在获取锁的时候存入线程标识(用 UUID 表示);
  • 在释放锁时先获取锁中的线程标识,判断是否与当前的线程标识一致;
    • 一致则释放锁;
    • 不一致则不释放锁。
public class SimpleDistributedLockBasedOnRedis implements DistributedLock {
    private String name;
    private StringRedisTemplate stringRedisTemplate;

    public SimpleDistributedLockBasedOnRedis(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    private static final String KEY_PREFIX = "lock:";

    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    /**
     * 获取锁
     */
    @Override
    public boolean tryLock(long timeoutSeconds) {
        // 线程标识
        String threadIdentifier = ID_PREFIX + Thread.currentThread().getId();
        Boolean isSucceeded = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadIdentifier, timeoutSeconds, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(isSucceeded);
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        // 线程标识
        String threadIdentifier = ID_PREFIX + Thread.currentThread().getId();
        String threadIdentifierFromRedis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 比较 锁中的线程标识 与 当前的线程标识 是否一致
        if (StrUtil.equals(threadIdentifier, threadIdentifierFromRedis)) {
            // 释放锁标识
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

测试是否解决误删问题

启动 8081 和 8082,在获取锁(tryLock)和 释放锁(unlock)处打上断点,发送两次 http://localhost:8080/api/voucher-order/seckill/10 请求。

  1. 让 8081 先获取到锁,线程标识和锁被存入 Redis 中,然后在 Redis 中将锁删除(模拟超时);
  2. 让 8082 获取到锁,线程标识和锁被存入 Redis 中;
  3. 进入 8081 释放锁 处的断点,8081 最初存入 Redis 中的线程标识为 uuid...-http-nio-8081-exec-1,而当前 Redis 中存储的线程标识为 uuid...--http-nio-8082-exec-1;8081 无法释放锁。
  4. 8082 正常执行完毕后释放锁。

7.5 Lua 脚本解决原子性问题

分布式锁的原子性问题

  • 线程1 执行业务并且判断 “当前 Redis 中的线程标识 与 获取锁时存入 Redis 的线程标识” 一致后,执行 释放锁操作 时出现阻塞,导致锁并为释放。在阻塞的过程中,又因为超时原因导致锁的释放。
  • 此时 线程2 获取到锁,并且执行业务,执行业务的过程锁被中线程 1 释放。
  • 于是 线程3 也能够获取到锁,并且执行业务。最终,又一次导致此时有两个线程同时在并行执行业务。

因此,需要保证 “判断线程标识的一致性 与 释放锁” 操作的原子性。

分布式锁的原子性问题

Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保 Redis 多条命令执行时的原子性。

Redis 提供的调用函数语法格式redis.call('命令名称', 'key', '其他参数', ...)

# 执行 SET name Michael
redis.call('set', 'name', 'Michael')


# 先执行 SET name Annabelle,再执行 GET name
# 先执行 SET name Annabelle
redis.call('set', 'name', 'Annabelle')
# 再执行 GET name
local name = redis.call('get', 'name')
# 返回
return name

编写完脚本后,需要使用 Redis 命令来调用脚本:EVAL script numkeys key [key ...] arg [arg ...]

  • 执行 redis.call('set', 'name', 'Michael')

    # 双引号中间的值为 脚本;后面的 0 代表的是 脚本需要的 Key 类型的参数个数
    127.0.0.1:6379> EVAL "return redis.call('set', 'name', 'Michael')" 0
    OK
    127.0.0.1:6379> get name
    "Michael"
    
  • Key 和 Value 可以作为参数传递,Key 类型的参数会放入 KEYS 数组,其他参数会放入 ARGV 数组,可以通过 KEYS 和 ARGV 获取参数。

    # name ==> KEYS[1] 、Annabelle ==> ARGV[1]  (Lua 的数组下标从 1 开始)
    127.0.0.1:6379> EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Annabelle
    OK
    127.0.0.1:6379> get name
    "Annabelle"
    

Lua 脚本的编写

  1. 获取 Redis 中的线程标识;
  2. 判断是否与最初存入的线程标识一致;
  3. 如果一致则释放锁(删除);
  4. 如果不一致则什么都不做。
-- 锁的 Key
-- local key = "lock:order:10"
-- local key = KEYS[1]

-- 最初存入 Redis 中的线程标识
-- local threadIdentifier = "uuid-http-nio-8081-exec-1"
-- local threadIdentifier = ARGV[1]

-- 锁中的线程标识
local threadIdentifierFromRedis = redis.call('get', KEYS[1])

-- 比较 最初存入 Redis 中的线程标识 与 目前 Redis 中存储的线程标识 是否一致
if (threadIdentifierFromRedis == ARGV[1]) then
    -- 一致,则释放锁 del key
    return redis.call('del', KEYS[1])
end
-- 若不一致,则返回 0
return 0
-- 终极版
if (redis.call('get', KEYS[1]) == ARGV[1]) then
  return redis.call('del', KEYS[1])
end
return 0

通过 RedisTemplate 中的 execute() 方法调用 Lua 脚本

public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
return scriptExecutor.execute(script, keys, args);
}
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("Unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
  
/**
 * 释放锁
 */
@Override
public void unlock() {
    // 调用 Lua 脚本
    stringRedisTemplate.execute(
            UNLOCK_SCRIPT,  // SCRIPT
            Collections.singletonList(KEY_PREFIX + name),   // KEY[1]
            ID_PREFIX + Thread.currentThread().getId()    // ARGV[1]
    );
}

测试

  • 在 “获取锁处 ” 和 “释放锁处” 打上断点,分别发送两次 http://localhost:8080/api/voucher-order/seckill/10 请求,第一次请求打到 8081,第二次请求打到 8082。
  • 8081 获取到锁后,在 Redis-CLI 中手动删除该锁;
  • 8082 获取到锁,然后 8081 执行到 unlock(),利用 Lua 脚本删除锁,但是不会误删 8082 的锁;8082 执行到 unlock(),利用 Lua 脚本删除锁,只会删除自己的锁。

8. 分布式锁 Redisson

基于 setnx 实现的分布式锁存在以下问题

  • 重入问题:获得锁的线程可以再次进入到相同的锁的代码中,可重入锁的意义在于 防止死锁synchronizedLock 锁都是可重入的。
    • 例如:HashTable 中的代码都是使用 synchronized 修饰的,假设在 方法A 中调用 方法B。
    • 在方法A 中,需要先获取锁,执行业务、调用方法B;而方法B 中,又需要获取同一把锁。
    • 此时如果是不可重入锁,调用方法B 时无法获取锁,就会等待锁的释放,而锁无法释放,因为 方法A 还没有执行完毕,造成死锁。
  • 不可重试:目前的分布式只能尝试一次,合理的情况应该是:线程在获取锁失败后,应该能够再次尝试获取锁。
  • 超时释放:加锁时增加超时时间,并且采用 Lua 脚本防止误删;但是如果锁住的时间太长导致其他线程都在等待,或者锁住的时间太短导致业务未执行完毕锁就释放等隐患。
  • 主从一致性:若 Redis 提供了主从集群,向集群写数据时,主机需要异步的将数据同步给从机,若在同步之前主机宕机,就会出现死锁。

基于 setnx 实现的分布式锁存在的问题

8.1 Redisson 快速入门

Redisson 是一个在 Redis 基础上实现的分布式工具集合。

  1. 引入依赖;

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.16.8</version>
    </dependency>
    
  2. 配置 Redisson 客户端;

    @Configuration
    public class RedisConfiguration {
        @Bean
        public RedissonClient redissonClient() {
            // 配置类
            Config config = new Config();
            // 添加 Redis 地址:此处是单节点地址,也可以通过 config.useClusterServers() 添加集群地址
            config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("root");
            // 创建客户端
            return Redisson.create(config);
        }
    }
    
  3. 使用 Redisson 的分布式锁。

    • 按照名称返回 Lock 实例:RLock lock = redissonClient.getLock(name);
    • 尝试获取锁:boolean isLocked = lock.tryLock(1, 10, TimeUnit.SECONDS);
      • 获取锁失败,失败后的最大等待时间,期间会重试:默认为 -1,即不等待;
      • 锁的自动施放时间:30;
      • 时间单位:秒。
    @Resource
    private RedissonClient redissonClient;
    
    @Test
    void testRedisson() throws InterruptedException {
        RLock lock = redissonClient.getLock("anyLock");
        boolean isLocked = lock.tryLock(1,0 10, TimeUnit.SECONDS);
        // 判断是否获取成功
        if (isLocked) {
            try {
                log.debug("执行业务~");
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    }
    
    
    // 创建锁对象并获取锁,判断是否获取锁成功
    // SimpleDistributedLockBasedOnRedis lock = new SimpleDistributedLockBasedOnRedis("order:" + userId, stringRedisTemplate);
    RLock lock = redissonClient.getLock("lock:order" + userId);
    
    // 获取锁
    // boolean isLocked = lock.tryLock(1200);
    boolean isLocked = lock.tryLock();
    

8.2 可重入锁原理

@Slf4j
@SpringBootTest
public class RedissonTest {

    @Resource
    private RedissonClient redissonClient;

    private RLock lock;

    @BeforeEach     // 创建 Lock 实例(可重入)
    void setUp() {
        lock = redissonClient.getLock("anyLock");
    }

    @Test
    void methodOne() throws InterruptedException {
        boolean isLocked = lock.tryLock();
        if (!isLocked) {
            log.error("Fail To Get Lock~");
            return;
        }
        try {
            log.info("Get Lock Successfully~");
            methodTwo();
        } finally {
            log.info("Release Lock~");
            lock.unlock();
        }
    }

    @Test
    void methodTwo() throws InterruptedException {
        boolean isLocked = lock.tryLock();
        if (!isLocked) {
            log.error("Fail To Get Lock!");
            return;
        }
        try {
            log.info("Get Lock Successfully!");
        } finally {
            log.info("Release Lock!");
            lock.unlock();
        }
    }
}

使用之前的分布式锁实现思路执行上述代码

methodOne() 中获取到锁后调用 methodTwo(),在 methodTwo() 中无法获取到锁:因为在该图中的 尝试获取锁 采用的方法是 SET key value NX EX 10

基于 Redis 的分布式锁的实现思路(解决误删问题)

可重入锁的实现思路

  • 在 Lock 锁中,借助于一个 state 变量来记录重入的状态,如果当前没有人持有该把锁,state = 0;若有人持有该把锁,state = 1;如果持有该把锁的人再次持有这把锁,state + 1
  • 对于 synchronized 而言,底层 C 语言代码中有一个 count,与 state 原理类似,重入一次加一,释放一次减一,直至为 0,表示当前这把锁无人持有。
  • 释放锁(删除)的时机:state 为 0。
  • 采用 Hash 结构存储锁:Key 中存储锁名称、Field 中存储线程标识、Value 中存储重入数,即 state

使用 可重入锁 执行上述代码:

  1. methodOne() 中获取到锁后 state + 1 ==> state = 1;调用 methodTwo(),在 methodTwo() 中获取到锁后再次 state + 1 ===> state = 2
  2. methodTwo() 中执行业务后释放锁 state - 1 ===> state = 1methodOne() 中执行业务后 state - 1 ===> state = 0,此时 Redis 中的锁已经被删除。
    可重入锁的实现思路

Redisson 底层使用 Lua 脚本实现上述步骤

local key = KEYS[1];    -- 锁名称
local threadIdentifier = ARGV[1];   -- 线程唯一标识
local releaseTime = ARGV[2];    -- 锁的自动施放时间

-- 锁不存在:获取锁并且添加线程标识 + 设置有效期
if (redis.call('EXISTS', key) == 0) then
    redis.call('HSET', key, threadIdentifier, 1);
    redis.call('EXPIRE', key, releaseTime);
    return 1;
end;

-- 锁存在,线程标识是自己:重入次数加1 + 设置有效期
if (redis.call('HEXISTS', key, threadIdentifier) == 1) then
    redis.call('HINCRBY', key, threadIdentifier, 1);
    redis.call('EXPIRE', key, releaseTime);
    return 1;
end;

-- 锁存在,线程标识不是自己,获取锁失败
return 0;
local key = KEYS[1];    -- 锁名称
local threadIdentifier = ARGV[1];   -- 线程唯一标识
local releaseTime = ARGV[2];    -- 锁的自动施放时间

-- 当前锁不是自己持有:直接返回
if (redis.call('HEXISTS', key, threadIdentifier) == 0) then
    return nil;
end;

-- 当前锁是自己持有的,重入数减1
local count = redis.call('HINCRBY', key, threadIdentifier, -1);

-- 重入数大于0:不能释放锁,重置有效期后返回
if (count > 0) then
    redis.call('EXPIRE', key, releaseTime);
    return nil;
else
    -- 重入数等于0:可以释放锁,直接删除
    redis.call('DEL', key);
    return nil;
end;

8.3 可重试 和 超时释放(WatchDog)

锁重试

Redisson#tryLock 锁重试

WatchDog

对抢锁过程进行监听,抢锁完毕后,scheduleExpirationRenewal(threadId) 方法会被调用来对锁的过期时间进行续约,在后台开启一个线程,进行续约逻辑,也就是看门狗线程。

// 续约逻辑
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {... }, 锁失效时间 / 3, TimeUnit.MILLISECONDS);

Method(new TimerTask(){}, 参数2, 参数3)

通过参数2、参数3 去描述,什么时候做参数1 的事情。

  • 锁的失效时间为 30s,10s 后这个 TimerTask 就会被触发,于是进行续约,将其续约为 30s;
  • 若操作成功,则递归调用自己,重新设置一个 TimerTask 并且在 10s 后触发;循环往复,不停的续约。

8.4 Redisson 分布式锁原理

  • 可重入:利用 Hash 结构记录线程标识和重入次数。
  • 可重试:利用信号量 和 PubSub 功能实现等待、唤醒、获取锁失败的重试机制。
  • 超时续约:利用 WatchDog,每隔一段时间(releaseTime / 3),重置超时时间。
    Redisson 分布式锁原理

8.5 MultiLock 原理

  • 为了提高 Redis 的可用性,搭建一个主从集群,向集群写数据时,主机需要将数据同步给从机。
  • 假设在主机还没来得及将数据写入到从机时,此时主机宕机,哨兵就会发现主机宕机,并且选举一个 Slave 成为一个 Master,而此时新的 Master 中并没有刚刚写入的数据,导致数据丢失。

为了解决上述问题,Redisson 提出了 MutiLock 锁;即每个节点的地位都是一样的,写入数据时需要将其写入到每一个主从节点上,所有的服务器写入成功才算写入成功。

假设某个主节点挂了,并且未完成主从同步;从节点会变成新的主节点,此时若想要获取写入的数据时,只要有一个节点拿不到都不算写入数据成功,保证了数据写入的可靠性。

Redisson 分布式锁主从一致性问题

当设置了多个锁时,Redisson 会将多个锁添加到一个集合中,然后用 While 循环不停的尝试拿锁,会有一个总共的加锁时间(锁的个数 * 1500ms)。

假设有 3 个锁,时间就是 4500ms,在这 4500ms 内,所有的锁都加锁成功,才算是加锁成功,如果在 4500ms 有线程加锁失败,则会再次去进行重试。

MultiLock 的实现

  1. 搭建三个 Redis 节点;

  2. 配置 Redis 客户端;

    @Configuration
    public class RedisConfiguration {
        @Bean
        public RedissonClient redissonClient() {
            // 配置类
            Config config = new Config();
            // 添加 Redis 地址:此处是单节点地址,也可以通过 config.useClusterServers() 添加集群地址
            config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("root");
            // 创建客户端
            return Redisson.create(config);
        }
    
        @Bean
        public RedissonClient redissonClientTwo() {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6380").setPassword("root");
            return Redisson.create(config);
        }
    
        @Bean
        public RedissonClient redissonClientThree() {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6381").setPassword("root");
            return Redisson.create(config);
        }
    }
    
  3. 创建联锁并测试;

    @Slf4j
    @SpringBootTest
    public class RedissonTest {
    
        @Resource
        private RedissonClient redissonClient;
    
        @Resource
        private RedissonClient redissonClientTwo;
    
        @Resource
        private RedissonClient redissonClientThree;
    
        RLock multiLock;
    
        @BeforeEach
        void setUp() {
            RLock lock = redissonClient.getLock("anyLock");
            RLock lockTwo = redissonClientTwo.getLock("anyLock");
            RLock lockThree = redissonClientThree.getLock("anyLock");
            // 创建联锁 MultiLock
            RLock multiLock = redissonClient.getMultiLock(lock, lockTwo, lockThree);
        }
    
        @Test
        void methodOne() throws InterruptedException {
            boolean isLocked = multiLock.tryLock(1L, TimeUnit.SECONDS);
            if (!isLocked) {
                log.error("Fail To Get Lock~");
                return;
            }
            try {
                log.info("Get Lock Successfully~");
                methodTwo();
            } finally {
                log.info("Release Lock~");
                multiLock.unlock();
            }
        }
    
        @Test
        void methodTwo() throws InterruptedException {
            boolean isLocked = multiLock.tryLock(1L, TimeUnit.SECONDS);
            if (!isLocked) {
                log.error("Fail To Get Lock!");
                return;
            }
            try {
                log.info("Get Lock Successfully!");
            } finally {
                log.info("Release Lock!");
                multiLock.unlock();
            }
        }
    }
    
  4. 测试结果。

    1. 执行到 methodOne 中的 tryLock 处,Redis 集群中的三个节点中都存储了 Hash 结构的锁数据,并且 state = 1
    2. 执行到 methodTwo 中的 tryLock 处,Redis 集群中的三个节点中都存储的 Hash 结构的锁数据,并且 state = 2
    3. 执行到 methodTwo 中的 unlock 处,Redis 集群中的三个节点中都存储的 Hash 结构的锁数据,并且 state = 1
    4. 执行到 methodOne 中的 unlock 处,Redis 集群中的三个节点中的锁数据都被删除了。

总结

  1. 不可重入的 Redis 分布式锁
    • 原理:利用 setnx 的互斥性、ex 避免死锁、释放锁时判断线程标识避免误删。
    • 缺陷:不可重入、无法重试、锁超时有自动释放的风险。
  2. 可重入的 Redis 分布式锁
    • 原理:利用 Hash 结构,记录线程标识和重入次数、利用 WatchDog 延续锁的超时时间、利用信号量控制锁重试等待。
    • 缺陷:Redis 宕机引起锁失效问题。
  3. Redisson 的 MultiLock
    • 原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取所成功。
    • 缺陷:运维成本高、实现复杂。

9. 优化秒杀

用户发送请求,请求到 Nginx,Nginx 访问 Tomcat,Tomcat 中的程序串行执行:

  1. 查询优惠券;
  2. 判断库存是否充足;
  3. 查询订单;
  4. 校验一人一单;
  5. 减库存;
  6. 创建订单。

以上操作都是串行执行的,并且 1、3、5、6 的操作都需要与数据库进行交互,从而导致程序执行的很慢。

秒杀优化方案

将耗时较短的逻辑判断放到 Redis 中,比如 2、4 中的操作,只要这样的逻辑能够完成,意味着一定能够完成下单,只需要进行快速的逻辑判断,无需等待下单逻辑全部走完即可返回成功;再在后台开一个线程,后台线程负责慢慢的执行 Queue 中的消息。

秒杀优化方案

秒杀优化的实现思路

  1. 新增优惠券的同时,将优惠券信息保存到 Redis 中;
  2. 基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功,如果 Lua 执行返回 0,则有购买资格;
    • 用户下单后,判断库存是否充足,只需要在 Redis 中根据 Key 去找到对应的 Value 是否大于 0 即可。
    • 若不充足,直接结束;若充足,则继续在 Redis 中判断用户是否可以下单,如果 Set 集合中不存在这个 Value(用户 ID),说明该用户可以下单。
  3. 如果有购买资格,将订单信息存入阻塞队列,并且返回 订单 ID(此时已经秒杀业务已经结束,何时进行异步下单操作数据库不再重要);
  4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单。

秒杀优化方案的实现思路

秒杀优化的代码实现

添加秒杀券的同时将优惠券库存保存到 Redis 中。

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
  
    // 将秒杀优惠券的库存保存到 Redis 中
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功。

local voucherId = ARGV[1]
local userId = ARGV[2]

local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId

-- 判断库存是否充足(不足,返回 1)
if (tonumber(redis.call('GET', stockKey)) <= 0) then
    return 1;
end;

-- 判断用户是否下单(重复下单,返回 2)
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
    return 2;
end;

-- 下单成功:扣减库存、保存用户。
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
return 0;

判断是否有购买资格,如果有购买资格,将订单信息存入阻塞队列,并且返回 订单 ID。

开启线程任务,不断从阻塞队列中获取信息,实现异步下单。

public interface VoucherOrderService extends IService<VoucherOrder> {

  Result seckillVoucher(Long voucherId);

    void createVoucherOrder(VoucherOrder voucherOrder);
}
@Service
@SuppressWarnings("ALL")
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements VoucherOrderService {

    @Resource
    private SeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    // Lua 脚本
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("SeckillVoucher.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    // 异步处理线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    // 在当前类初始完毕后执行 VoucherOrderHandler 中的 run 方法
    @PostConstruct
    public void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    // 阻塞队列:当一个线程尝试从队列中获取元素时:若队列中没有元素线程就会被阻塞,直到队列中有元素时线程才会被唤醒并且去获取元素。
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    // 从队列中获取信息
    public class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    // 创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("订单处理异常", e);
                }
            }
        }

        private void handleVoucherOrder(VoucherOrder voucherOrder) {
            Long userId = voucherOrder.getUserId();
            RLock lock = redissonClient.getLock("lock:order:" + userId);
            boolean isLocked = lock.tryLock();
            if (!isLocked) {
                log.error("不允许重复下单!");
                return;
            }
            try {
                // 该方法非主线程调用,代理对象需要在主线程中获取。
                currentProxy.createVoucherOrder(voucherOrder);
            } finally {
                lock.unlock();
            }
        }
    }

    // 代理对象
    private VoucherOrderService currentProxy;

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1. 执行 Lua 脚本
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        Long executeResult = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString()
        );

        // 2. Lua 脚本的执行结果不为 0 则没有购买资格
        int result = executeResult.intValue();
        if (result != 0) {
            return Result.fail(result == 1 ? "库存不足!" : "请勿重复下单!");
        }

        // 3. 将下单信息保存到阻塞队列中
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        orderTasks.add(voucherOrder);

        // 4. 获取代理对象
        currentProxy = (VoucherOrderService) AopContext.currentProxy();

        // 5. 返回订单号(告诉用户下单成功,业务结束;执行异步下单操作数据库)
        return Result.ok(orderId);
    }

    @Transactional
    @Override
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        // 1. 一人一单
        Integer count = query()
                .eq("voucher_id", voucherOrder.getVoucherId())
                .eq("user_id", userId)
                .count();
        if (count > 0) {
            log.error("不可重复下单!");
            return;
        }

        // 2. 减扣库存
        boolean isAccomplished = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
                .update();
        if (!isAccomplished) {
            log.error("库存不足!");
            return;
        }

        // 3. 下单
        boolean isSaved = save(voucherOrder);
        if (!isSaved) {
            log.error("下单失败!");
            return;
        }
    }
}

10. Redis 消息队列

消息队列 Message Queue :存放消息的队列。最简单的消息队列模型包括 3 个角色:

  • 消息队列:存储和管理消息,也被称为 消息代理(Message Broker);
  • 生产者:发送消息到消息队列;
  • 消费者:从消息队列中获取消息并处理消息。

Redis 提供了三种不同的方式实现消息队列

  • List 结构:基于 List 结构模拟消息队列;
  • PubSub:基本的点对点消息模型;
  • Stream:比较完善的消息队列模型。

10.1 基于 List 结构的消息队列

Redis 的 List 数据结构是一个双向链表,很容易模拟出队列的效果。可以通过 LPUSH + RPOP 或者 RPUSH + LPOP 实现。

基于 List 结构的消息队列模型

# 生产者使用 LPUSH 发布消息
127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2


# 消费者使用 RPOP 拉取消息
127.0.0.1:6379> RPOP queue
"msg1"
127.0.0.1:6379> RPOP queue
"msg2"

在编写消费逻辑时,需要不断地从队列中拉取消息进行处理,因此一般是一个 死循环

当队列中没有消息的时候,消费者执行 RPOPLPOP 时会返回 NULL,并不像 JVM 的阻塞队列那样会阻塞并等待消息。此时消费者仍会不断的拉取消息,从而造成 CPU空转

127.0.0.1:6379> RPOP queue
(nil)
// 伪代码
while(true) {
  msg = redis.pop("queue");
  // 如果没有消息,继续循环
  if (msg = null) {
      countinue;
    }
  // 处理消息
 handleMsg(msg);
}

当队列为空时,休眠 一会再尝试拉取消息:该方式能够解决 CPU空转 问题,但是又面临新的问题;当消费者在休眠等待期间有新消息来了,消费者处理新消息就会存在 延迟。

while(true) {
  msg = redis.pop("queue");
  // 如果没有消息,休眠 2s
  if (msg = null) {
      Thread.sleep(2000);
      countine;
    }
  // 处理消息
 handleMsg(msg);
}

Redis 提供了 阻塞式拉取消息 的命令:BRPOP / BLPOP。(B means Block)

该方式还支持传入一个超时时间:0 表示不设置超时,直到有新消息才返回;否则会在指定的超时时间后返回 NULL

基于 List 结构的消息队列模型(阻塞等待)

// 伪代码
while(true) {
  msg = redis.bpop("queue");
  // 如果没有消息,继续循环
  if (msg = null) {
      countine;
    }
  // 处理消息
 handleMsg(msg);
}
  • 优点
    • 利用 Redis 存储,不受限于 JVM 的内存上限;
    • 基于 Redis 的持久化机制,数据安全性有保证;
    • 可以满足有序性(先进先出)。
  • 缺点
    • 不支持重复消费,只支持单消费者:消费者拉取消息后,这条消息就从 List 中删除了,无法被其他消费者再次消费;
    • 无法避免消息丢失:消费者拉取到消息后,如果发生异常宕机,该条消息就丢失了;

10.2 基于 PubSub 的消息队列

PubSub(发布订阅)是 Redis2.0 引入的消息传递模型,该方法解决了 重复消费 问题;消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。

Redis 提供了 PUBLISH / SUBSCRIBE 命令完成发布和订阅操作:

  • PUBLISH channel msg :向一个频道发送消息;
  • SUBSCRIBE channel [channel] :订阅一个或多个频道;
  • PSUBSCRIBE pattern [pattern] :订阅与 pattern 格式匹配的所有频道。
    • pattern
      • ? :匹配一个字符,h?llo -> hello or hxllo
      • * :匹配多个字符,h*llo -> hello or heeello
      • [ae] :匹配括号内存在的字符,h[ae]llo -> hallo and hello, not the hillo or others

开启两个消费者,同时消费同一批数据。

Pub:Sub01

  1. 使用 SUBSCRIBE 命令,启动两个消费者并且订阅同一个队列;此时两个消费者都会被堵塞住,等待新消息的到来。

    127.0.0.1:6379> SUBSCRIBE queue
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "queue"
    3) (integer) 1
    
  2. 使用 PUBLISH 命令启动一个生产者,发布一条消息。

    127.0.0.1:6379> PUBLISH queue msg1
    (integer) 1
    
  3. 两个消费者解除堵塞,收到生产者发送的新消息。

    127.0.0.1:6379> SUBSCRIBE queue
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "queue"
    3) (integer) 1
    1) "message"
    2) "queue"
    3) "msg1"
    

消费者使用 PSUBSCRIBE 命令 订阅 queue.* 相关的队列信息,之后生产者分别向 queue.p1queue.p2 发布消息。

Pub:Sub02

127.0.0.1:6379> PUBLISH queue.p1 msg1
(integer) 1
127.0.0.1:6379> PUBLISH queue.p2 msg2
(integer) 1
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1

# 来自 queue.p1 的消息
1) "pmessage"
2) "queue.*"
3) "queue.p1"
4) "msg1"

1) "pmessage"
2) "queue.*"
3) "queue.p2"
4) "msg2"
  • Pub/Sub 的最大优点 :支持 多组生产者、消费者处理消息
  • Pub/Sub 的最大缺点 :丢数据。
    • 消费者下线、Redis 宕机、消息堆积 都会导致数据丢失。

Pub/Sub 的实现十分简单,没有基于任何数据结构,也没有任何的数据存储,只是单纯的为生产者和消费者建立 数据转发通道,将符合规则的数据,从一端发到另一端。

一个完整的发布、订阅消息处理流程

  1. 消费者订阅指定队列,Redis 就会记录一个映射关系 —— 队列 — 消费者
  2. 生产者向这个队列发布消息,从 Redis 的映射关系中找出对应的消费者,将消息转发给消费者。

注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失

Pub:Sub03

10.3 基于 Stream 的消息队列

Stream 是 Redis5.0 引入的一种新的 数据类型,可以实现一个功能非常完善的消息队列。

Stream 通过 XADD(发布消息) 和 XREAD(读取消息) 完成最简单的生产、消费模型。

发送消息的命令: XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

  • key :队列名称;
  • [NOMKSTREAM] :若队列不存在,是否自动创建队列,默认自动创建(不用管);
  • [MAXLEN|MINID [=|~] threshold [LIMIT count]] :设置消息队列的最大消息数量(不用管);
  • *|ID :消息的唯一 ID,* 代表由 Redis 自动生成,格式是 时间戳-递增数字,例如:1666161469358-0
  • field value [field value ...] :发送到队列中的消息,称为 Entry。格式为多个 Key-Value 键值对。
# 创建名为 users 的队列并向该队列送一个消息,ID 由 Redis 自动生成;内容为: { name: Jack, age: 21}
XADD users * name Jack age 21

127.0.0.1:6379> XADD users * name Jack
"1666169070359-0"
127.0.0.1:6379> XADD users * name Rose
"1666169072899-0"

读取消息的方式之一:XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

  • [COUNT count] :每次读取消息的最大数量;
  • [BLOCK milliseconds] :当没有消息时,是否阻塞 和 阻塞时长;
  • STREAMS key [key ...] :从哪个队列读取消息,Key 就是队列名;
  • ID [ID ...] :起始ID,只返回大于该 ID 的消息;0 代表从第一个消息开始,$ 代表从最新的消息开始。
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "queue"
   2) 1) 1) "1666169070359-0"
         2) 1) "name"
            2) "Jack"
127.0.0.1:6379> XREAD COUNT 2 STREAMS users 0
1) 1) "queue"
   2) 1) 1) "1666169070359-0"
         2) 1) "name"
            2) "Jack"
      2) 1) "1666169072899-0"
         2) 1) "name"
            2) "Rose"

阻塞读取最新消息XREAD COUNT 1 BLOCK STREAMS queue $

// 业务开发中可以循环调用 XREAD 的阻塞读取方式查询最新消息,从而实现持续监听队列的效果(伪代码)
while(true) {
  // 尝试获取队列中的最新消息,最多阻塞 2s
  Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS queue $");
  // 2s 内未获取到消息,继续循环
  if(msg == null) {
      continue;
    }
  handleMessage(msg);
}

注意:指定起始 ID 为 $ 时,代表读取最新消息,如果处理一条消息的过程中,又有超过一条以上的消息到达队列,则下次获取时也只能获取到最新的一条,出现 漏读 问题。

STREAM 类型消息队列的 XREAD 命令的特点

  • 永久保存在队列中,消息可回溯;
  • 一个消息可以被多个消费者读取;
  • 可以阻塞读取;
  • 有消息漏读风险。

10.3.1 Stream 的 消费者组模式

消费者组(Consumer Group)将多个消费者划分到一个组中,监听同一个队列。具备以下特点:

  • 消息分流:队列中的 消息会分流给组内不同的消费者,而不是重复消费,从而加快消息处理的速度。

  • 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,即使消费者宕机重启,还会从标示之后读取消息,确保每一个消息都会被消费。(解决漏读问题)

  • 消息确认:消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list。当处理完成后需要通过 XACK 命令来确认消息,标记消息为已处理,才会从 pending-list 中移除。(解决消息丢失问题)

  • 创建消费组XGROUP CREATE key groupName ID [MKSTREAM]

    • key :队列名称;

    • groupName :消费组名称;

    • ID :起始 ID 标示,$ 代表队列中的最后一个消息,0 代表队列中的第一个消息;

    • [MKSTREAM] :队列中不存在时自动创建队列。

  • 其他命令

    • 删除指定的消费组XGROUP DESTROY key groupName
    • 给指定的消费组添加消费者XGROUP CREATECONSUMER key groupName consumerName
    • 删除消费组中的指定消费者XGROUP DELCONSUMER key groupName consumerName
  • 从消费者组中读取消息XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

    • group:消费组名称;
    • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者;
    • count:本次查询的最大数量;
    • BLOCK milliseconds:当没有消息时的最长等待时间;
    • NOACK:无需手动 ACK,获取到消息后自动确认;
    • STREAMS key:指定队列名称;
    • ID:获取消息的起始ID:
      • >:从下一个未消费的消息开始;
      • 其他:根据 ID 从 pending-list 中获取已消费但未确认的消息;例如0,从 pending-list 中的第一个消息开始。
    # 发送消息到队列
    127.0.0.1:6379> XADD queue * name Jack
    "1666172276809-0"
    127.0.0.1:6379> xadd queue * name Rose
    "1666172286673-0"
    
    # 读取队列中的消息
    127.0.0.1:6379> XREAD COUNT 2 STREAMS queue 0
    1) 1) "queue"
       2) 1) 1) "1666172276809-0"
             2) 1) "name"
                2) "Jack"
          2) 1) "1666172286673-0"
             2) 1) "name"
                2) "Rose"
    
    # 创建消费者组
    127.0.0.1:6379> XGROUP CREATE queue queueGroup 0
    OK
    
    # 从消费者组中读取消息
    # 监听 queue 队列:消费者组为 queueGroup、消费者为 consumerOne(若不存在则自动创建)、每次读取 1 条消息、阻塞时间为 2s、从下一个未消费消息开始。
    127.0.0.1:6379> XREADGROUP GROUP queueGroup consumerOne COUNT 1 BLOCK 2000 STREAMS queue >
    1) 1) "queue"
       2) 1) 1) "1666172276809-0"
             2) 1) "name"
                2) "Jack"
    
    # 消费者为 consumerTwo
    127.0.0.1:6379> XREADGROUP GROUP queueGroup consumerTwo COUNT 1 BLOCK 2000 STREAMS queue >
    1) 1) "queue"
       2) 1) 1) "1666172286673-0"
             2) 1) "name"
                2) "Rose"
    
    # 消费者为 consumerThree
    127.0.0.1:6379> XREADGROUP GROUP queueGroup consumerThree COUNT 1 BLOCK 2000 STREAMS queue >
    (nil)
    (2.04s)
    
  • 消费者获取到消息后,消息处于 pending 状态,将 pending 状态的消息标记为已处理并且从 pending-list 中删除(命令的返回值是成功确认的消息数):XACK key group ID [ID ...]

    127.0.0.1:6379> XACK queue queueGroup 1666172276809-0 1666172286673-0
    (integer) 2
    

STREAM 类型消息队列的 XREADGROUP 命令的特点

  • 永久保存在队列中,消息可回溯;
  • 多消费者争抢消息,加快读取速度;
  • 可以阻塞读取;
  • 没有消息漏读风险;
  • 有消息确认机制,能够保证消息至少被消费一次。
List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间,可以利用多消费者加快处理 受限于消费者缓冲区 受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持

消费者监听消息的基本思路(伪代码)

while (true) {
    // 监听 queue 队列:消费者组为 queueGroup、消费者为 consumerOne(若不存在则自动创建)、每次读取 1 条消息、阻塞时间为 2s、从下一个未消费消息开始。
    Object msg = redis.call("XREADGROUP GROUP queueGroup consumerOne COUNT 1 BLOCK 2000 STREAMS queue >");
    if (msg == null) {  // null 说明没有消息,继续下一次循环
        continue;
    }
    try {
        // 处理消息(处理完后必须 XACK)
        HandleMessage(msg);
    } catch (Exception e) {
        while (true) {
            // 监听 queue 队列:消费者组为 queueGroup、消费者为 consumerOne(不存在则自动创建)、每次读取 1 条消息、从 pending-list 中的第一个消息开始。
            Object msg = redis.call("XREADGROUP GROUP queueGroup consumerOne COUNT 1 STREAMS queue 0");
            if (msg == null) {  // null 说明没有异常,所有消息都已确认,结束循环
                break;
            }
            try {
                // 处理消息(处理完后必须 XACK)
                HandleMessage(msg);
            } catch (Exception e) {
                // 再次出现异常,继续循环
                continue;
            }
        }
    }
}

10.4基于 Stream 消息队列实现异步秒杀

创建一个 Stream 类型的消息队列,名为 stream.orders

127.0.0.1:6379> XGROUP CREATE stream.orders orderGroup 0 MKSTREAM
OK

修改秒杀下单的 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包括 voucherIduserIdorderId

local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]

local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId

-- 判断库存是否充足(不足,返回 1)
if (tonumber(redis.call('GET', stockKey)) <= 0) then
    return 1;
end;

-- 判断用户是否下单(重复下单,返回 2)
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
    return 2;
end;

-- 下单成功:扣减库存、保存用户。
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
-- 发送消息到 stream.orders 队列中(*:消息的唯一ID 由 Redis 自动生成):XADD stream.orders * key field ...
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);
return 0;
@Override
public Result seckillVoucher(Long voucherId) {
    // 1. 执行 Lua 脚本(有购买资格:向 stream.orders 中添加消息,内容包括 voucherId、userId、orderId)
    Long userId = UserHolder.getUser().getId();
    long orderId = redisIdWorker.nextId("order");
    Long executeResult = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(), userId.toString(), String.valueOf(orderId)
    );
  
    // 2. Lua 脚本的执行结果不为 0,则没有购买资格
    int result = executeResult.intValue();
    if (result != 0) {
        return Result.fail(result == 1 ? "库存不足!" : "请勿重复下单!");
    }
  
    // 3. 获取代理对象
    currentProxy = (VoucherOrderService) AopContext.currentProxy();
    
  // 4. 返回订单号(告诉用户下单成功,业务结束;执行异步下单操作数据库)
    return Result.ok(orderId);
}

项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单。

// 从队列中获取信息
public class VoucherOrderHandler implements Runnable {
    String queueName = "stream.orders";
    String groupName = "orderGroup";
    String consumerName = "consumerOne";
    @Override
    public void run() {
        while (true) {
            try {
                // 1. 获取消息队列中的订单信息
                // XREAD GROUP orderGroup consumerOne COUNT 1 BLOCK 2000 STREAMS stream.orders >
                // 队列 stream.orders、消费者组 orderGroup、消费者 consumerOne、每次读 1 条消息、阻塞时间 2s、从下一个未消费的消息开始。
                List<MapRecord<String, Object, Object>> readingList = stringRedisTemplate.opsForStream().read(
                        Consumer.from(groupName, consumerName),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );
                
              // 2. 判断消息是否获取成功
                if (readingList.isEmpty() || readingList == null) {
                    // 获取失败说明没有消息,则继续下一次循环
                    continue;
                }
                
              // 3. 解析消息中的订单信息
                // MapRecord:String 代表 消息ID;两个 Object 代表 消息队列中的 Key-Value
                MapRecord<String, Object, Object> record = readingList.get(0);
                Map<Object, Object> recordValue = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(recordValue, new VoucherOrder(), true);
                
              // 4. 获取成功则下单
                handleVoucherOrder(voucherOrder);
                
              // 5. 确认消息 XACK stream.orders orderGroup id
                stringRedisTemplate.opsForStream().acknowledge(groupName, consumerName, record.getId());
            } catch (Exception e) {
                log.error("订单处理异常", e);
                handlePendingMessages();
            }
        }
    }
    private void handlePendingMessages() {
        while (true) {
            try {
                // 1. 获取 pending-list 中的订单信息
                // XREAD GROUP orderGroup consumerOne COUNT 1 STREAM stream.orders 0
                List<MapRecord<String, Object, Object>> readingList = stringRedisTemplate.opsForStream().read(
                        Consumer.from(groupName, consumerName),
                        StreamReadOptions.empty().count(1),
                        StreamOffset.create(queueName, ReadOffset.from("0"))
                );
                
              // 2. 判断消息是否获取成功
                if (readingList.isEmpty() || readingList == null) {
                    // 获取失败 pending-list 中没有异常消息,结束循环
                    break;
                }
                
              // 3. 解析消息中的订单信息并下单
                MapRecord<String, Object, Object> record = readingList.get(0);
                Map<Object, Object> recordValue = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(recordValue, new VoucherOrder(), true);
                handleVoucherOrder(voucherOrder);
                
              // 4. XACK
                stringRedisTemplate.opsForStream().acknowledge(queueName, groupName, record.getId());
            } catch (Exception e) {
                log.error("订单处理异常(pending-list)", e);
                try {
                    // 稍微休眠一下再进行循环
                    Thread.sleep(20);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
  ...
}

秒杀下单 Ultimate VER

SeckillVoucher.lua

local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]

local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId

-- 判断库存是否充足(不足,返回 1)
if (tonumber(redis.call('GET', stockKey)) <= 0) then
    return 1;
end;

-- 判断用户是否下单(重复下单,返回 2)
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
    return 2;
end;

-- 下单成功:扣减库存、保存用户。
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
-- 发送消息到 stream.orders 队列中(*:消息的唯一ID 由 Redis 自动生成):XADD stream.orders * key field ...
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);
return 0;

VoucherOrderService

@Service
@SuppressWarnings("ALL")
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements VoucherOrderService {

    @Resource
    private SeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    // Lua 脚本
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("SeckillVoucher.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    // 代理对象
    private VoucherOrderService currentProxy;

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1. 执行 Lua 脚本(有购买资格:向 stream.orders 中添加消息,内容包括 voucherId、userId、orderId)
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        Long executeResult = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );

        // 2. Lua 脚本的执行结果不为 0,则没有购买资格
        int result = executeResult.intValue();
        if (result != 0) {
            return Result.fail(result == 1 ? "库存不足!" : "请勿重复下单!");
        }

        // 3. 获取代理对象
        currentProxy = (VoucherOrderService) AopContext.currentProxy();

        // 4. 返回订单号(告诉用户下单成功,业务结束;执行异步下单操作数据库)
        return Result.ok(orderId);
    }


    // 异步处理线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    // 在当前类初始完毕后执行 VoucherOrderHandler 中的 run 方法
    @PostConstruct
    public void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    
    // 从队列中获取信息
    public class VoucherOrderHandler implements Runnable {
        String queueName = "stream.orders";
        String groupName = "orderGroup";
        String consumerName = "consumerOne";

        @Override
        public void run() {
            while (true) {
                try {
                    // 1. 获取消息队列中的订单信息
                    // XREAD GROUP orderGroup consumerOne COUNT 1 BLOCK 2000 STREAMS stream.orders >
                    // 队列 stream.orders、消费者组 orderGroup、消费者 consumerOne、每次读 1 条消息、阻塞时间 2s、从下一个未消费的消息开始。
                    List<MapRecord<String, Object, Object>> readingList = stringRedisTemplate.opsForStream().read(
                            Consumer.from(groupName, consumerName),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );

                    // 2. 判断消息是否获取成功
                    if (readingList.isEmpty() || readingList == null) {
                        // 获取失败说明没有消息,则继续下一次循环
                        continue;
                    }

                    // 3. 解析消息中的订单信息
                    // MapRecord:String 代表 消息ID;两个 Object 代表 消息队列中的 Key-Value
                    MapRecord<String, Object, Object> record = readingList.get(0);
                    Map<Object, Object> recordValue = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(recordValue, new VoucherOrder(), true);

                    // 4. 获取成功则下单
                    handleVoucherOrder(voucherOrder);

                    // 5. 确认消息 XACK stream.orders orderGroup id
                    stringRedisTemplate.opsForStream().acknowledge(groupName, consumerName, record.getId());
                } catch (Exception e) {
                    log.error("订单处理异常", e);
                    handlePendingMessages();
                }
            }
        }

        private void handlePendingMessages() {
            while (true) {
                try {
                    // 1. 获取 pending-list 中的订单信息
                    // XREAD GROUP orderGroup consumerOne COUNT 1 STREAM stream.orders 0
                    List<MapRecord<String, Object, Object>> readingList = stringRedisTemplate.opsForStream().read(
                            Consumer.from(groupName, consumerName),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );

                    // 2. 判断消息是否获取成功
                    if (readingList.isEmpty() || readingList == null) {
                        // 获取失败 pending-list 中没有异常消息,结束循环
                        break;
                    }

                    // 3. 解析消息中的订单信息并下单
                    MapRecord<String, Object, Object> record = readingList.get(0);
                    Map<Object, Object> recordValue = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(recordValue, new VoucherOrder(), true);
                    handleVoucherOrder(voucherOrder);

                    // 4. XACK
                    stringRedisTemplate.opsForStream().acknowledge(queueName, groupName, record.getId());
                } catch (Exception e) {
                    log.error("订单处理异常(pending-list)", e);
                    try {
                        // 稍微休眠一下再进行循环
                        Thread.sleep(20);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }

        }

        private void handleVoucherOrder(VoucherOrder voucherOrder) {
            Long userId = voucherOrder.getUserId();
            RLock lock = redissonClient.getLock("lock:order:" + userId);
            boolean isLocked = lock.tryLock();
            if (!isLocked) {
                log.error("不允许重复下单!");
                return;
            }
            try {
                // 该方法非主线程调用,代理对象需要在主线程中获取。
                currentProxy.createVoucherOrder(voucherOrder);
            } finally {
                lock.unlock();
            }
        }
    }

    @Transactional
    @Override
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        // 1. 一人一单
        Integer count = query()
                .eq("voucher_id", voucherOrder.getVoucherId())
                .eq("user_id", userId)
                .count();
        if (count > 0) {
            log.error("不可重复下单!");
            return;
        }

        // 2. 减扣库存
        boolean isAccomplished = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
                .update();
        if (!isAccomplished) {
            log.error("库存不足!");
            return;
        }

        // 3. 下单
        boolean isSaved = save(voucherOrder);
        if (!isSaved) {
            log.error("下单失败!");
            return;
        }
    }
}

11. 点赞相关

11.1 发布、查看笔记

  • tb_blog:笔记表,包含比较重的标题、文字、图片等;
  • tb_blog_comments:其他用户对笔记的评论。
  • 上传图片接口地址:http://localhost:8080/api/upload/blog
  • 发布笔记接口地址:http://localhost:8080/api/blog

目前需求:点击笔记,进入详情页面,实现该页面的查询接口。

请求方式 请求路径 请求参数 返回值
GET /blog/{id} id(@PathVariable 笔记信息(包含用户信息)

Blog 实体类中添加两个属性,iconname,并且添加 @TableField(exist = false) 注解,表示该注解不属于 tb_blog 表中的字段。

/**
 * 用户图标
 */
@TableField(exist = false)
private String icon;

/**
 * 用户姓名
 */
@TableField(exist = false)
private String name;
@GetMapping("/{id}")
public Result queryById(@PathVariable("id") Long id) {
    return blogService.queryById(id);
}

// BlogService
@Override
public Result queryById(Long id) {
    Blog blog = getById(id);
    if (blog == null) {
        return Result.fail("笔记不存在!");
    }
    queryBlogWithUserInfo(blog);
    return Result.ok(blog);
}

private void queryBlogWithUserInfo(Blog blog) {
    Long userId = blog.getUserId();
    User user = userService.getById(userId);
    blog.setIcon(user.getIcon());
    blog.setName(user.getNickName());
}

11.2 点赞

初始代码:http://localhost:8080/blog/like/{id}

目前存在的问题:一个用户可以无限点赞,目前的逻辑发起的请求只是将 liked 字段的值 +1

@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
    return blogService.likeBlog(id);
}

@Override
public Result likeBlog(Long id) {
    // update set tb_blog liked = liked + 1 where id = ?
    update().setSql("liked = liked + 1").eq("id", id).update();
    return Result.ok();
}

需求

  1. 同一个用户只能点赞一次,再次点赞则取消点赞;
  2. 若当前用户已经点赞,则点赞按钮高亮显示(前段实现,判断 Blog 类中的 isLike 属性的值)。

实现步骤

  1. 为 Blog 类添加一个 isLike 属性,标识是否被当前用户点赞;
  2. 修改点赞功能,利用 Redis 的 Set 集合判断是否点过赞;用户点赞,未点过赞则点赞数 +1,已点过赞则点赞数 -1;
    • Set 集合:无序不可重复,支持 交集、并集、差集等功能。
  3. 修改根据 ID 查询 Blog 的业务,判断当前登录用户是否点过赞,赋值给 isLike 属性;
  4. 修改分页查询 Blog 业务,判断当前登录用户是否点过赞,赋值给 isLike 属性。

Blog 类中添加一个 isLike 属性

/**
 * 是否点赞
 */
@TableField(exist = false)
private Boolean isLike;

判断用户是否对该 Blog 点赞过

/**
 * 判断用户是否对该 Blog 点赞过
 */
private void isBlogLiked(Blog blog) {
    String key = BLOG_LIKED_KEY + blog.getId();
  UserDTO user = UserHolder.getUser();
    if (user == null) {
        // 用户未登录,无需查询是否点过赞
        return;
    }
    Long userId = user.getId();
    Boolean isLiked = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
    blog.setIsLike(BooleanUtil.isTrue(isLiked);
}


/**
 * 展示热门 Blog
 */
@Override
public Result queryHotBlog(Integer current) {
    Page<Blog> page = query()
            .orderByDesc("liked")
            .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
    List<Blog> records = page.getRecords();
    records.forEach(blog -> {
        this.queryBlogWithUserInfo(blog);
        this.isBlogLiked(blog);
    });
    return Result.ok(records);
}

/**
 * 展示 Blog 详情页(根据 ID)
 */
@Override
public Result queryById(Long id) {
    Blog blog = getById(id);
    if (blog == null) {
        return Result.fail("笔记不存在!");
    }
    queryBlogWithUserInfo(blog);
    isBlogLiked(blog);
    return Result.ok(blog);
}

实现点赞功能

@Override
public Result likeBlog(Long id) {
    // 1. 判断当前登录用户是否点过赞。
    Long userId = UserHolder.getUser().getId();
    String key = BLOG_LIKED_KEY + id;
    Boolean isLiked = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
  
    // 2. 未点过赞:点赞,数据库点赞数 +1,将用户保存到 Redis 的 Set 集合中。
    if (BooleanUtil.isFalse(isLiked)) {
        Boolean isSucceed = update().setSql("liked = liked + 1").eq("id", id).update();
        if (BooleanUtil.isTrue(isSucceed)) {
            stringRedisTemplate.opsForSet().add(key, userId.toString());
        }
    } else {
        // 3. 已点过赞:取消赞,数据库点赞数 -1,将用户从 Redis 的 Set 集合中移除。
        Boolean isSucceed = update().setSql("liked = liked - 1").eq("id", id).update();
        if (BooleanUtil.isTrue(isSucceed)) {
            stringRedisTemplate.opsForSet().remove(key, userId.toString());
        }
    }
    return Result.ok();
}

11.3 点赞排行榜

在笔记的详情页面,应该显示给该笔记点赞的人,比如:显示最早给该笔记点赞的用户的 TOP 5。

之前的点赞放在 Set 集合中,但是 Set 集合是无序不可重复的,此处需要使用可排序的 Set 集合,即 SortedSet。

List Set SortedSet
排序方式 按照顺序排序 无法排序 根据 score 值排序
唯一性 不唯一 唯一 唯一
查找方式 按照索引查找 或 首尾查找 根据元素查找 根据元素查找

修改点赞业务逻辑

private void isBlogLiked(Blog blog) {
    String key = BLOG_LIKED_KEY + blog.getId();
  UserDTO user = UserHolder.getUser();
    if (user == null) {
        // 用户未登录,无需查询是否点过赞
        return;
    }
    Long userId = user.getId();
    Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
    blog.setIsLike(score != null);
}
@Override
public Result likeBlog(Long id) {
    // 1. 判断当前登录用户是否点过赞。
    Long userId = UserHolder.getUser().getId();
    String key = BLOG_LIKED_KEY + id;
  
    // `ZSCORE key member` :获取 SortedSet 中指定元素的 score 值(如果不存在,则代表未点过赞)。
    Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
    
  // 2. 未点过赞:点赞,数据库点赞数 +1,将用户保存到 Redis 的 Set 集合中。
    if (score == null) {
        Boolean isSucceed = update().setSql("liked = liked + 1").eq("id", id).update();
        if (BooleanUtil.isTrue(isSucceed)) {
            stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
        }
    } else {
        // 3. 已点过赞:取消赞,数据库点赞数 -1,将用户从 Redis 的 Set 集合中移除。
        Boolean isSucceed = update().setSql("liked = liked - 1").eq("id", id).update();
        if (BooleanUtil.isTrue(isSucceed)) {
            stringRedisTemplate.opsForZSet().remove(key, userId.toString());
        }
    }
    return Result.ok();
}

接口详情

请求方式 请求路径 请求参数 返回值
GET /blog/likes/{id} id(@PathVariable List<UserDeto>(给该笔记点赞的 TopN 用户的集合)
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id) {
    return blogService.queryBlogLikes(id);
}

注意:

  • select id from tb_user where id in (5, 2, 1) 的查询结果顺序为:1、2、5;
  • select id from tb_user where id in (5, 2, 1) ORDER BY FIELD(id, 5, 2, 1); 的查询结果顺序为:5、2、1,指定根据何种字段排序以及字段值。
/**
 * Blog 详情页展示最早点赞的 5 个用户
 */
@Override
public Result queryBlogLikes(Long id) {
    String key = BLOG_LIKED_KEY + id;
    // 1. 查询最早五个点赞的用户
    Set<String> topFive = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    if (topFive == null || topFive.isEmpty()) {
        return Result.ok(Collections.emptyList());
    }
    
  // 2. 解析出其中的 用户ID
    List<Long> userIdList = topFive.stream()
            .map(Long::valueOf)
            .collect(Collectors.toList());
    String userIdStrWithComma = StrUtil.join(", ", userIdList);
    
  // 3. 根据 ID 批量查询
    List<UserDTO> userDTOList = userService.query()
            .in("id", userIdList)
            .last("ORDER BY FIELD(id, " + userIdStrWithComma + ")")
            .list()
            .stream()
            .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
            .collect(Collectors.toList());
    return Result.ok(userDTOList);
}

12. 关注相关

12.1 关注 和 取关

关注是 User 表之间的关系,通过 tb_follow 表进行标识;关注的实现需要通过两个接口实现:关注与取关、判断是否关注。

关注与取关:http://localhost:8080/api/follow/{id}/{boolean}

判断是否关注:http://localhost:8080/api/follow/or/not/{id}

tb_follow 表:

id user_id follow_user_id
主键ID 用户ID 关联的用户ID
@RestController
@RequestMapping("/follow")
public class FollowController {

    @Resource
    private FollowService followService;

    /**
     * 关注或取关
     * @param followUserId 需要关注 or 取关的 用户ID
     * @param isFollowed 是否关注
     */
    @PutMapping("/{id}/{isFollowed}")
    public Result followOrNot(@PathVariable("id") Long followUserId, @PathVariable("isFollowed") Boolean isFollowed) {
        return followService.followOrNot(followUserId, isFollowed);
    }

    /**
     * 判断是否关注该用户
     * @param followUserId 关注用户的ID
     */
    @GetMapping("/or/not/{id}")
    public Result isFollowed(@PathVariable("id") Long followUserId) {
        return followService.isFollowed(followUserId);
    }
}
@Override
public Result followOrNot(Long followUserId, Boolean isFollowed) {
    Long userId = UserHolder.getUser().getId();
    // 判断是关注还是取关
    if (BooleanUtil.isTrue(isFollowed)) {
        // 关注,增加
        Follow follow = new Follow();
        follow.setUserId(userId);
        follow.setFollowUserId(followUserId);
        save(follow);
    } else {
        // 取关,删除
        remove(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(
Follow::getFollowUserId, followUserId));
    }
    return Result.ok();
}

@Override
public Result isFollowed(Long followUserId) {
    Long userId = UserHolder.getUser().getId();
    Integer count = lambdaQuery().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId).count();
    return Result.ok(count > 0);
}

12.2 共同关注

关注时,将当前用户所关注的用户ID存入到 Redis 中:以当前用户的 ID 为 Key,关注用户的 ID 为 value。

取关时,将其从 Redis 中删除。

注意:为了实现共同关注功能,使用 Set,因为 Set 中有 SINTER - 交集SDIFFER - 差集SUNION - 并集 命令。

@Override
public Result followOrNot(Long followUserId, Boolean isFollowed) {
    Long userId = UserHolder.getUser().getId();
    String key = "follow:" + userId;
    // 判断是关注还是取关
    if (BooleanUtil.isTrue(isFollowed)) {
        // 关注,增加
        Follow follow = new Follow();
        follow.setUserId(userId);
        follow.setFollowUserId(followUserId);
        boolean isSucceed = save(follow);
      // 添加到 Redis 中(当前用户ID 为 key,关注用户ID 为 value)
        if (Boolean.TRUE.equals(isSucceed)) {
            stringRedisTemplate.opsForSet().add(key, followUserId.toString());
        }
    } else {
        // 取关,删除
        boolean isSucceed = remove(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, 
userId).eq(Follow::getFollowUserId, followUserId));
        if (BooleanUtil.isTrue(isSucceed)) {
          // 从 Redis 中删除
            stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
        }
    }
    return Result.ok();
}

@Override
public Result isFollowed(Long followUserId) {
    Long userId = UserHolder.getUser().getId();
    Integer count = lambdaQuery().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId).count();
    return Result.ok(count > 0);
}

使用 SINTER key [key ...] 求出两个用户间的共同关注。

请求方式 请求路径 请求参数 返回值
GET /follow/common/{id} id(目标用户ID,@PathVariable List<UserDTO> 两个用户的共同关注
/**
 * 获取两个用户之间的共同关注用户
 * @param followUserId 关注用户的ID
 */
@GetMapping("/common/{id}")
public Result commonFollow(@PathVariable("id") Long followUserId) {
    return followService.commonFollow(followUserId);
}

@Override
public Result commonFollow(Long followUserId) {
    Long userId = UserHolder.getUser().getId();
    String selfKey = "follow:" + userId;
    String aimKey = "follow:" + followUserId;
    Set<String> userIdSet = stringRedisTemplate.opsForSet().intersect(selfKey, aimKey);
    if (userIdSet.isEmpty() || userIdSet == null) {
        // 无交集
        return Result.ok(Collections.emptyList());
    }
    List<UserDTO> userDTOList = userService.listByIds(userIdSet)
            .stream()
            .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
            .collect(Collectors.toList());
    return Result.ok(userDTOList);
}

12.3 关注推送 - Feed 流

关注推送也叫做 Feed 流(投喂),通过无线下拉刷新获取新的信息。

  • 传统模式:需要用户通过搜索引擎或其他方式检索自己需要的内容;
  • Feed 模式:通过系统分析用户想要什么,直接将内容推送给用户,从而使用户能更加节约时间,不需要再主动寻找。

传统模式 VS Feed模式

12.3.1 Feed 流的实现方案

Feed 流的两种常见模式

  • Timeline:不做内容筛选,简单的按照内容发布时间顺序,常用于好友或关注。例如朋友圈。
    • 优点:信息全面,不会有所缺失,实现较为简单;
    • 缺点:信息噪音较多,用户不一定感兴趣,对感兴趣的内容获取效率低。
  • 智能排序:利用智能算法屏蔽违规、用户不感兴趣的内容了,推送用户感兴趣的信息来吸引用户。
    • 优点:投喂用户感兴趣的信息,用户粘度很高,容易沉迷;
    • 缺点:如果算法不精准,可能会起到反作用。

在本例中的个人主页,是基于关注的好友来做 Feed 流,因此采用 Timeline 模式。该模式的实现方案有三种:

  1. 拉模式;
  2. 推模式;
  3. 推拉结合。

拉模式,也叫读扩散。

当张三和李四发送消息后,都会保存在自己的发件箱中;若赵六要读取消息,需要读取自己的收件箱,此时系统会从赵六关注的人的发件箱中拉取全部消息到其收件箱中,然后按照时间戳进行排序。

  • 优点:节省空间,赵六读取信息时没有重复读取,而且读取完后可以将他的收件箱清除;
  • 缺点:有延迟,当用户读取数据时才去关注的人的发件箱中读取,若用户关注了大量用户,此时会拉取海量的内容,导致服务器压力巨大。

拉模式

推模式,也叫写扩散。

当张三发送一个消息时,会主动将张三的内容发送到其粉丝的收件箱中,此时粉丝读取则无需再去临时拉取。

  • 优点:时效快,无需临时拉取;
  • 缺点:内存压力大,假设 张三 的粉丝量巨大,则需要发送海量的数据到粉丝的收件箱。
    推模式

推拉结合,也叫读写混合,兼具推和拉两种模式的优点(推拉结合是一个折中的方案)。

  • 站在发件人方
    • 发件人粉丝量小,采用写扩散(推)的方式,直接将消息写入到粉丝的收件箱中。
    • 发件人粉丝量大,直接将数据写入到一个发件箱中,然后再直接写一份到活跃粉丝的收件箱中。
  • 站在收件人方
    • 收件人是活跃粉丝,不论发件人的粉丝量多或少,都直接写入到其收件箱中。
    • 收件人是普通粉丝,上线不是很频繁,等他们上线时再去发件箱中拉取消息。

推拉结合

推模式 & 拉模式 & 推拉结合

拉模式 推模式 推拉结合
写比例
读比例
用户读取延迟
实现难度 复杂 简单 很复杂
使用场景 很少使用 用户量少 用户量多

12.3.2 Feed 流的滚动分页

Feed 流中的数据会不断的更新,数据的下标也在变化,因此不能采用传统的分页模式。

传统分页

  • 假设在 t1 时刻,读取第一页消息,page = 1 & size = 5,读取到的就是 10 9 8 7 6 五条数据;
  • 在 t2 时刻又发布了一条消息,page = 2 & size = 5;t3 时刻所读取的为 6 5 4 3 2 五条数据(读取到了重复的数据)。

传统分页

滚动分页

记录每次操作的最后一条消息,从这个位置开始读取数据。

  • 假设在 t1 时刻,拿到了第一页的数据 —— 10 9 8 7 6,然后记录下当前最后一次拿到的数据,即 lastId = 6
  • 在 t2 时刻又发布了一条消息;t3 时刻所读取的为 5 4 3 2 1 五条数据。
  • 注意:刷新即可获取到最新发布的消息,即 11,因为 lastId = ♾️

因此,可以通过 SortedSet 实现,按照 score 值(时间戳)从大到小进行范围查询,每一次查询都记录最小的时间戳,下一次查询时再寻找更小的时间戳对应的数据,从而实现滚动分页。

滚动分页

12.3.3 代码实现

需求

  1. 修改新增笔记业务,在保存 Blog 到数据库的同时,推送消息到粉丝的收件箱;
  2. 收件箱可以根据时间戳排序,使用 Redis 的 SortedSet 数据结构实现;
  3. 实现分页查询查询收件箱数据。

推送消息到粉丝的收件箱

@Override
public Result saveBlog(Blog blog) {
    // 1. 获取登录用户
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId());
    // 2. 保存探店博文
    boolean isSucceed = save(blog);
    if (BooleanUtil.isFalse(isSucceed)) {
        return Result.fail("发布失败~");
    }
    // 3. 查询笔记作者的所有粉丝(select * from tb_follow where follow_user_id = ?)
    List<Follow> followUserList = followService.lambdaQuery().eq(Follow::getFollowUserId, user.getId()).list();
  if (followUserList.isEmpty() || followUserList == null) {
    return Result.ok(blog.getId());
 }
    // 4. 推送笔记给所有粉丝
    for (Follow follow : followUserList) {
        // 粉丝ID
        Long userId = follow.getUserId();
        // 推送
        String key = FEED_KEY + userId;
        stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
    }
    // 5. 返回id
    return Result.ok(blog.getId());
}

分页查询收件箱:在个人主页的 “关注” 中,查询并展示推送的 Blog。

  1. 第一次查询的 lastId 为当前时间戳,每次查询后,lastId 为上一次查询的最小时间戳;
  2. 偏移量 offset 为 上一次查询的最小值的元素个数,下一次查询时需要跳过这些已经查询过的数据。
127.0.0.1:6379> ZADD zset 1 m1 2 m2 3 m3 4 m4 5 m5 6 m6 6 m7 8 m8
(integer) 8

127.0.0.1:6379> ZREVRANGE zset 0 -1 WITHSCORES
 1) "m8"
 2) "8"
 3) "m7"
 4) "6"
 5) "m6"
 6) "6"
 7) "m5"
 8) "5"
 9) "m4"
10) "4"
11) "m3"
12) "3"
13) "m2"
14) "2"
15) "m1"
16) "1"

# ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
# max:当前时间戳 | 上一次查询的最小时间戳
# min: 0
# offset:0 | 上一次查询结果中,与最小值相同的元素个数
# count:固定值
127.0.0.1:6379> ZREVRANGEBYSCORE zset 999 0 WITHSCORES LIMIT 0 3
1) "m8"
2) "8"
3) "m7"
4) "6"
5) "m6"
6) "6"

127.0.0.1:6379> ZREVRANGEBYSCORE zset 6 0 WITHSCORES LIMIT 2 3
1) "m5"
2) "5"
3) "m4"
4) "4"
5) "m3"
6) "3"

127.0.0.1:6379> ZREVRANGEBYSCORE zset 3 0 WITHSCORES LIMIT 1 3
1) "m2"
2) "2"
3) "m1"
4) "1"
请求方式 请求路径 请求参数 返回值
GET /blog/of/follow lastId(上一次查询的最小时间戳);offset(偏移量) List<Blog>(小于指定时间戳的 Blog 集合);minTime(本次查询的最小时间戳);offset(偏移量)

返回值实体类

@Data
public class ScrollResult {
    private List<?> list;
    private Long minTime;
    private Integer offset;
}

BlogController & BlogService

@GetMapping("/of/follow")
public Result queryBlogOfFollow(
        @RequestParam("lastId") Long max,
        @RequestParam(value = "offset", defaultValue = "0") Integer offset) {
    return blogService.queryBlogOfFollow(max, offset);
}
public Result queryBlogOfFollow(Long max, Integer offset) {
    // 1. 获取当前用户
    Long userId = UserHolder.getUser().getId();

    // 2. 查询收件箱 ZREVRANGEBYSCORE key max min LIMIT offset count
    String key = FEED_KEY + userId;
    Set<ZSetOperations.TypedTuple<String>> tupleSet = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
    if (tupleSet.isEmpty() || tupleSet == null) {
        return Result.ok();
    }

    // 3. 解析数据:blogId、lastId、offset
    List<Long> blogIdList = new ArrayList<>(tupleSet.size());
    long minTime = 0;
    int nextOffset = 1;
    for (ZSetOperations.TypedTuple<String> tuple : tupleSet) {
        blogIdList.add(Long.valueOf(tuple.getValue()));
        // 时间戳(最后一个元素即为最小时间戳)
        long time = tuple.getScore().longValue();

        // 假设时间戳为:5 4 4 2 2
        // 5 != 0 --> minTime=5; nextOffset = 1;
// 4 != 5 --> minTime=4; nextOffset = 1;
// 4 == 4 --> minTime=4; nextOffset = 2;
// 2 != 4 --> minTime=2; nextOffset = 1;
// 2 == 2 --> minTime=2; nextOffset = 2;
        if (time == minTime) {
            nextOffset ++;
        } else {
            minTime = time;
            nextOffset = 1;
        }
    }

    // 4. 根据 ID 查询 Blog
    String blogIdStr = StrUtil.join(", ", blogIdList);
List<Blog> blogList = lambdaQuery().in(Blog::getId, blogIdList).last("ORDER BY FIELD(id, " + blogIdStr + ")").list();
    for (Blog blog : blogList) {
        // 完善 Blog 数据:查询并且设置与 Blog 有关的用户信息,以及 Blog 是否被该用户点赞
        queryBlogWithUserInfo(blog);
        isBlogLiked(blog);
    }

    // 5. 封装并返回
    ScrollResult scrollResult = new ScrollResult();
    scrollResult.setList(blogList);
    scrollResult.setMinTime(minTime);
    scrollResult.setOffset(nextOffset);
    return Result.ok(scrollResult);
}

13. GEO 附近搜索

13.1 GEO 数据结构

GEO Geolocation,代表地理位置,允许存储地理坐标。GEO 底层的实现原理是 ZSET,可以使用 ZSET 的命令操作 GEO。

  • GEOADD key longitude latitude member [longitude latitude member ...]:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member);

    127.0.0.1:6379> GEOADD China:City 116.40 39.90 Beijing
    (integer) 1
    127.0.0.1:6379> GEOADD China:City 121.47 31.23 Shanghai 106.50 29.53 Chongqing 114.08 22.547 Shenzhen 120.15 30.28 Hangzhou 125.15 42.93 Xian 102.71 25.04 Kunming
    (integer) 6
    
  • GEODIST key member1 member2 [unit]:计算指定的两个点之间的距离并返回;

    127.0.0.1:6379> GEODIST China:City Beijing Shanghai km
    "1067.3788"
    127.0.0.1:6379> GEODIST China:City Shanghai Kunming km
    "1961.3500"
    
  • GEOHASH key member [member ...]:将指定 member 的坐标转为 hash 字符串形式并返回;

    # 降低内存存储压力,会损失一些精度,但是仍然指向同一个地区。
    127.0.0.1:6379> GEOHASH China:City Beijing Shanghai Kunming
    1) "wx4fbxxfke0"
    2) "wtw3sj5zbj0"
    3) "wk3n3nrhs60"
    
  • GEOPOS key member [member ...]:返回指定 member 的坐标;

    127.0.0.1:6379> GEOPOS China:City Beijing
    1) 1) "116.39999896287918091"
       2) "39.90000009167092543"
    127.0.0.1:6379> GEOPOS China:City Shanghai Kunming Hangzhou
    1) 1) "121.47000163793563843"
       2) "31.22999903975783553"
    2) 1) "102.70999878644943237"
       2) "25.03999958679589355"
    3) 1) "120.15000075101852417"
       2) "30.2800007575645509"
    
  • GEORADIUS key longitude latitude radius [unit] [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC]:指定圆心、半径,找到该圆范围内包含的所有 member,并按照与圆心的距离排序后返回(6.2 后弃用);

    127.0.0.1:6379> GEORADIUS China:City 102 30 1000 km
    1) "Kunming"
    2) "Chongqing"
    127.0.0.1:6379> GEORADIUS China:City 102 30 500 km
    1) "Chongqing"
    
    127.0.0.1:6379> GEORADIUS China:City 102 30 1000 km WITHDIST WITHCOORD
    1) 1) "Kunming"
       2) "556.1051"
       3) 1) "102.70999878644943237"
          2) "25.03999958679589355"
    2) 1) "Chongqing"
       2) "437.5880"
       3) 1) "106.49999767541885376"
          2) "29.52999957900659211"
    
    127.0.0.1:6379> GEORADIUS China:City 102 30 1000 km WITHCOORD WITHDIST COUNT 1
    1) 1) "Chongqing"
       2) "437.5880"
       3) 1) "106.49999767541885376"
          2) "29.52999957900659211"
    
  • GEOSEARCH key [FROMMEMBER member] [FROMLONLAT longitude latitude] [BYRADIUS radius [unit]] [BYBOX width height [unit]] [ASC|DESC] [COUNT count [ANY]] [WITHCOORD] [WITHDIST] [WITHHASH]:在指定范围内搜索 member,并按照与指定之间的距离顺序后返回,范围内可以是圆形或矩形(6.2 新功能);

    127.0.0.1:6379> GEOSEARCH China:City FROMLONLAT 116.397904 39.909005 BYRADIUS 1000 km WITHDIST
    1) 1) "Beijing"
       2) "1.0174"
    2) 1) "Xian"
       2) "803.0689"
    
    127.0.0.1:6379> GEOSEARCH China:City FROMLONLAT 116.397904 39.909005 BYBOX 2000 2000 km WITHDIST
    1) 1) "Shanghai"
       2) "1068.3526"
    2) 1) "Beijing"
       2) "1.0174"
    3) 1) "Xian"
       2) "803.0689
    
    127.0.0.1:6379> GEOSEARCH China:City FROMMEMBER Beijing BYBOX 2000 2000 km WITHDIST
    1) 1) "Shanghai"
       2) "1067.3788"
    2) 1) "Beijing"
       2) "0.0000"
    3) 1) "Xian"
       2) "803.3746"
    
  • GEOSEARCHSTORE :与 GEOSEARCH 功能一致,不过可以把结果存储到一个指定的 Key(6.2 新功能)。

13.2 附近商家搜索

将数据库中的数据导入到 Redis 中:按照商家类型分组,类型相同的商家作为一组,以 typeId 为 Key,商家地址为 Value。

@Test
void loadShopData() {
    List<Shop> shopList = shopService.list();
    // 1. 店铺按照 TypeId 分组
    Map<Long, List<Shop>> map = shopList.stream().collect(Collectors.groupingBy(Shop::getTypeId));
    // 2. 分批写入 Redis
    for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
        Long typeId = entry.getKey();
        String key = SHOP_GEO_KEY + typeId;
        List<Shop> shops = entry.getValue();
        List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(shops.size());
        for (Shop shop : shops) {
            // stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
            locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(), new Point(shop.getX(), shop.getY())));
        }
        stringRedisTemplate.opsForGeo().add(key, locations);
    }
}

注意:spring-data-redis 2.3.9 版本不支持 Redis 6.2 提供的 GEOSEARCH 命令。

spring-data-redis版本

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
        </exclusion>
        <exclusion>
            <groupId>lettuce-core</groupId>
            <artifactId>io.lettuce</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-redis</artifactId>
    <version>2.6.2</version>
</dependency>

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>6.1.6.RELEASE</version>
</dependency>
请求方式 请求路径 请求参数 返回值
GET /shop/of/type typeId:商家类型;current:页码;x:经度;y:纬度 List<Shop>:符合要求的商家
/**
 * 根据店铺类型分页查询店铺信息(按照距离排序)
 * @param typeId 店铺类型
 * @param current 当前页码
 * @param x 经度
 * @param y 纬度
 * @return 店铺列表
 */
@GetMapping("/of/type")
public Result queryShopByType(
        @RequestParam("typeId") Integer typeId,
        @RequestParam(value = "current", defaultValue = "1") Integer current,
        @RequestParam(value = "x", required = false) Double x,
        @RequestParam(value = "y", required = false) Double y) {
    return shopService.queryShopByTypeId(typeId, current, x, y);
}
@Override
public Result queryShopByTypeId(Integer typeId, Integer current, Double x, Double y) {
    // 1. 判断是否需要根据坐标查询
    if (ObjectUtil.isNull(x) || ObjectUtil.isNull(y)) {
        return Result.ok(lambdaQuery().eq(Shop::getTypeId, typeId).page(new Page<>(current, DEFAULT_PAGE_SIZE)).getRecords());
    }

    // 2. 计算分页参数
    int start = (current - 1) * DEFAULT_PAGE_SIZE;
    int end = current * DEFAULT_PAGE_SIZE;

    // 3. 查询 Redis,按照距离排序 --> GEOSEARCH key BYLONLAT x y BYRADIUS 5000 mi WITHDISTANCE
    String key = SHOP_GEO_KEY + typeId;
    GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults = redisTemplate.opsForGeo().search(
                    key,
                    GeoReference.fromCoordinate(x, y),
                    new Distance(5000),
                    RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));
    if (ObjectUtil.isNull(geoResults)) {
        return Result.ok(Collections.emptyList());
    }

    // 4. 解析出 ID,根据 ID 查询商店
    List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = geoResults.getContent();
  if (content.size() <= start) {
    return Result.ok(Collections.emptyList());
}
    List<Long> shopIdList = new ArrayList<>(content.size());
    Map<String, Distance> distanceMap = new HashMap<>(content.size());
    content.stream().skip(start).forEach(result -> {
        String shopIdStr = result.getContent().getName();
        shopIdList.add(Long.valueOf(shopIdStr));
        Distance distance = result.getDistance();
        distanceMap.put(shopIdStr, distance);
    });

    // 5. 根据 shopId 查询 Shop
    String shopIdStrWithComma = StrUtil.join(", ", shopIdList);
    List<Shop> shopList = lambdaQuery().in(Shop::getId, shopIdList).last("ORDER BY FIELD(id, " + shopIdStrWithComma + ")").list();
    for (Shop shop : shopList) {
        shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
    }

    // 6. Return ShopList
    return Result.ok(shopList);
}

14. BitMap 签到

14.1 BitMap 数据结构

CREATE TABLE `tb_sign` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` bigint unsigned NOT NULL COMMENT '用户id',
  `year` year NOT NULL COMMENT '签到的年',
  `month` tinyint NOT NULL COMMENT '签到的月',
  `date` date NOT NULL COMMENT '签到的日期',
  `is_backup` tinyint unsigned DEFAULT NULL COMMENT '是否补签',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT;

用户签到一次就是一条记录,若有 1000 万用户,平均每人每年的签到次数为 10 次,这张表的数据量为 1 亿条;每次签到需要使用 (8 + 8 + 1 + 1 + 3 + 1)22 个字节的内存,则一个月需要 600 多字节,存储压力过大。

解决方案:用一张签到表,签到打个 ✔️ 即可,方便统计。

  • 按月统计用户签到信息,签到记为 1,未签到则记为 0。
  • 每一个 Bit 位对应当月的一天,形成映射关系;用 0 和 1 标识业务状态,这种思路被称为 位图(BitMap)。
  • 如此即可使用极小的空间实现大量数据的表示。
  • Redis 中使用 String 数据结构实现 BitMap,最大上限是 512 M,转换为 Bit 则是 2^32 个 Bit 位。

签到表 BitMap

BitMap 的操作命令

  • SETBIT key offset value :向指定位置 offset 存入一个 0 或 1;

  • GETBIT key offset :获取指定位置 offset 的 Bit 值;

    127.0.0.1:6379> SETBIT bm 0 1
    (integer) 0
    127.0.0.1:6379> SETBIT bm 1 1
    (integer) 0
    127.0.0.1:6379> SETBIT bm 2 1
    (integer) 0
    127.0.0.1:6379> SETBIT bm 5 1
    (integer) 0
    127.0.0.1:6379> SETBIT bm 6 1
    (integer) 0
    
    127.0.0.1:6379> GETBIT bm 1
    (integer) 1
    127.0.0.1:6379> GETBIT bm 3
    (integer) 0
    127.0.0.1:6379> GETBIT bm 5
    (integer) 1
    127.0.0.1:6379> GETBIT bm 7
    (integer) 0
    
    # Redis 中存储的 weekOne 的二进制:11100110
    
  • BITCOUNT key [start end] :统计 BitMap 中值为 1 的 Bit 位的数量;

    127.0.0.1:6379> BITCOUNT weekOne
    (integer) 5
    
  • BITFIELD key [GET type offset] :操作(查询、修改、自增) BitMap 中 Bit 数组中指定位置 offset 的值;

    • typeu 为无符号,i 为有符号;符号后的数字为
    # bm -> 11100110
    
    # 11
    127.0.0.1:6379> BITFIELD bm GET u2 0
    1) (integer) 3
    # 111
    127.0.0.1:6379> BITFIELD bm GET u3 0
    1) (integer) 7
    # 1110
    127.0.0.1:6379> BITFIELD bm GET u4 0
    1) (integer) 14
    
    # 110
    127.0.0.1:6379> BITFIELD bm GET u3 5
    1) (integer) 6
    # 10
    127.0.0.1:6379> BITFIELD bm GET u3 6
    1) (integer) 4
    
  • BITPOS key bit [start] [end] :查找 Bit 数组中指定范围内的第一个 0 或 1 出现的位置。

    127.0.0.1:6379> BITPOS weekOne 1
    (integer) 0
    
    127.0.0.1:6379> BITPOS weekOne 0
    (integer) 3
    

14.2 签到

请求方式 请求路径 请求参数 返回值
POST /user/sign

BitMap 底层基于 String 数据结构,因此其操作也都封装到在字符串的相关操作中。

@PostMapping("/sign")
public Result sign() {
    return userService.sign();
}
@Override
public Result sign() {
    // 1. 获取当前登录用户
    Long userId = UserHolder.getUser().getId();

    // 2. 获取日期
    LocalDateTime nowDateTime = LocalDateTime.now();
    String formatTime = nowDateTime.format(DateTimeFormatter.ofPattern(":yyyyMM"));

    // 3. 拼接 Key
    String key = USER_SIGN_KEY + userId + formatTime;

    // 4. 获取今天是本月的第几天(1~31,BitMap 则为 0~30)
    int dayOfMonth = nowDateTime.getDayOfMonth();

    // 5. 写入 Redis  SETBIT key offset 1
    stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
    return Result.ok();
}

14.3 签到统计

本月到今天为止的所有签到数据

BITFIELD key GET u[dayOfMonth] 0

连续签到天数

从最后一次签到向前统计,直到遇到第一次未签到为止;计算总的签到次数,就是连续签到天数。

Java 逻辑代码:获取到当前余月的最后一次签到数据,定义一个计数器;不断地向前统计,每次获得一个 非0 的数字计数器 + 1,直到遍历完所有的数据。

从后向前遍历每个 Bit 位

BitMap 返回的数据是 10 进制的,只需要让得到的 10 禁止数字 和 1 进行与运算,每与一次就将签到结果右移一位,实现遍历。

统计当前用户截止当前时间在本月的连续签到天数

请求方式 请求路径 请求参数 返回值
GET /user/sign/count 连续签到天数
@Override
public Result signCount() {
    // 1. 获取当前登录用户
    Long userId = UserHolder.getUser().getId();

    // 2. 获取日期
    LocalDateTime nowDateTime = LocalDateTime.now();
    String formatTime = nowDateTime.format(DateTimeFormatter.ofPattern(":yyyyMM"));

    // 3. 拼接 Key
    String key = USER_SIGN_KEY + userId + formatTime;

    // 4. 获取今天是本月的第几天(1~31,BitMap 则为 0~30)
    int dayOfMonth = nowDateTime.getDayOfMonth();

    // 5. 获取本月截止今天的所有签到记录,返回的是一个 十进制数字
    // BITFIELD sign:1010:202210 GET u26 0 (当前为 26 号)
    List<Long> result = stringRedisTemplate.opsForValue().bitField(
            key,
            BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
    );
    if (result.isEmpty() || result == null) {
        return Result.ok(0);
    }
    Long num = result.get(0);
    if (num == 0 || num == null) {
        return Result.ok(0);
    }

    // 6. 让这个数字与 1 做 与运算,得到数字的最后一个 Bit 位;判断这个 Bit 位是否为 0。
    int count = 0;
    while (true) {
        // 0:未签到,结束
        if ((num & 1) == 0) {
            break;
        } else {
            // 非0:签到,计数器 +1
            count ++;
        }
        // 右移一位,抛弃最后一个 Bit 位,继续下一个 Bit 位。
        // num = num >> 1;
        num >>>= 1;
    }

    return Result.ok(count);
}

15. UV 统计

15.1 HyperLogLog 数据结构

UV & PV

  • UV Unique Visitor :独立访客量,一天内同一个用户多次访问该网站,只记录一次;
  • PV Page View :页面访问量,用户每访问网站的一个页面,记录一个 PV。

UV 统计在服务器端比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存;但是如果所有访问过该网站的用户都保存到 Redis 中,数据量会十分大。

HyperLogLog(HLL) 用于确定非常大的集合的基数,而不需要存储其所有值。

  • 基数:假设数据集 {1,3,5,7,5,7,8},那么这个数据集的基数集为 {1,3,5,7,8},基数(不重复的元素)为 5。
  • Redis 中的 HyperLogLog 是基于 String 数据结构实现的,单个 HLL 的内存永远小于 16 KB,内存占用非常非常低。
  • 但是它的测量存在小于 0.81% 的误差,不过对于 UV 统计而言,几乎可以忽略。

PFADD & PFCOUNT & PFMERGE

127.0.0.1:6379> pfadd hll e1 e2 e3 e4 e5
(integer) 1
127.0.0.1:6379> PFCOUNT hll
(integer) 5
127.0.0.1:6379> pfadd hll e1 e2 e3 e4 e5
(integer) 0
127.0.0.1:6379> PFCOUNT hll
(integer) 5

127.0.0.1:6379> pfadd set1 e1 e2 e3 e4 e5
(integer) 1
127.0.0.1:6379> pfadd set2 e4 e5 e6 e7 e8
(integer) 1
# 合并 set1 set2 得到并集 set3
127.0.0.1:6379> pfmerge set3 set1 set2
OK
127.0.0.1:6379> pfcount set3
(integer) 8

15.2 测试百万级数据的统计

利用单元测试,向 HyperLogLog 中添加 100 万条数据,查看内存占用和统计效果:

@Test
void millionDataHyperLogLogTest() {
    String[] users = new String[1000];
    int j = 0;
    for (int i = 0; i < 1000000; i++) {
        j = i % 1000;
        users[j] = "user_" + i;
        // 分批导入,每 1000 条数据写入一次
        if (j == 999) {
            stringRedisTemplate.opsForHyperLogLog().add("hll", users);
        }
    }
    Long hllSize = stringRedisTemplate.opsForHyperLogLog().size("hll");
    System.out.println("size = " + hllSize);    // size = 997593
}
  • 测试之前 和 测试之后的内存占用:1106056 、1118960;
  • HyperLogLog 占用内存:(1118960 - 1106056) / 1024 = 12.6KB

问题汇总

JSONException: A JSONObject text must begin with '{'

List<ShopType> 转 JSON,JSON 的格式为 ["", "", ..],最外层为 [],导致报错。

redisTemplate.opsForValue().set(
  CACHE_SHOP_TYPE_KEY, JSONUtil.toJsonStr(shopTypeList), TTL_THIRTY, TimeUnit.MINUTES
);

解决方案:将 Redis 缓存的字符串转换为数组,再将数据转换为 ShopType 类型的 List。

List<ShopType> shopTypeList = JSONUtil.toList(JSONUtil.parseArray(shopTypeJson), ShopType.class);

Json timestamp 转 LocalDateTime 报错

JSON parse error: raw timestamp (1642066339000) not allowed for java.time.LocalDateTime: need additional information such as an offset or time-zone

数据库查询后,返回给调用方时序列化时进行了一次转换,将 LocalDateTime 转换为 时间戳;但是接受方需要 LocalDateTime。

需要加一个 LocalDateTime 序列化的配置类,在接收时再转换一次。

@Configuration
public class LocalDateTimeSerializerConfig {

    @Bean
    public Jackson2ObjectMapperBuilderCustomizer jackson2ObjectMapperBuilderCustomizer() {
        return builder -> {
            builder.serializerByType(LocalDateTime.class, new LocalDateTimeSerializer());
            builder.deserializerByType(LocalDateTime.class, new LocalDateTimeDeserializer());
        };
    }

    /**
     * 序列化
     */
    public static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {
        @Override
        public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers)
                throws IOException {
            if (value != null) {
                long timestamp = value.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                gen.writeNumber(timestamp);
            }
        }
    }

    /**
     * 反序列化
     */
    public static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
        @Override
        public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext)
                throws IOException {
            long timestamp = p.getValueAsLong();
            if (timestamp > 0) {
                return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
            } else {
                return null;
            }
        }
    }
}

项目介绍

后端

Spring 相关:

  • Spring Boot 2.x
  • Spring MVC

数据存储层:

  • MySQL:存储数据
  • MyBatis Plus:数据访问框架

Redis 相关:

  • spring-data-redis:操作 Redis
  • Lettuce:操作 Redis 的高级客户端
  • Apache Commons Pool:用于实现 Redis 连接池
  • Redisson:基于 Redis 的分布式数据网格

工具库:

  • HuTool:工具库合集
  • Lombok:注解式代码生成工具

前端

前端不是本项目的重点,了解即可。

  • 原生 HTML、CSS、JS 三件套
  • Vue 2(渐进式使用)
  • Element UI 组件库
  • Axios 请求库
src
├── main
│   ├── java
│   │   └── com
Comment
├── config :存放项目依赖相关配置;
│   ├── LocalDateTimeSerializerConfig.java :解决 Json timestamp 转 LocalDateTime 的报错问题;
│   ├── MybatisPlusConfiguration.java :配置 MyBatis Plus 分页插件;
│   ├── RedisConfiguration.java :创建单例 Redisson 客户端;
│   ├── WebExceptionAdvice.java :全局响应拦截器;
│   └── WebMvcConfiguration.java :配置了登录、自动刷新登录 Token 的拦截器。
│
├── controller :存放 Restful 风格的 API 接口;
│
├── dto :存放业务封装类,如 Result 通用响应封装(不推荐学习它的写法);
│
├── entity :存放和数据库对应的 Java POJO;
│
├── interceptor :登录拦截器 & 自动刷新 Redis 登录 Token 有效期;
│
├── mapper :存放操作数据库的代码;
│
├── service :存放业务逻辑处理代码;
│   ├── BlogCommentsService.java
│   ├── BlogService.java : 基于 Redis 实现点赞、按时间排序的点赞排行榜;基于 Redis 实现拉模式的 Feed 流;
│   ├── FollowService.java :基于 Redis 集合实现关注、共同关注;
│   ├── ShopService.java : 基于 Redis 缓存优化店铺查询性能;基于 Redis GEO 实现附近店铺按距离排序;
│   ├── UserService.java : 基于 Redis 实现短信登录(分布式 Session);
│   ├── VoucherOrderService.java :基于 Redis 分布式锁、Redis + Lua 两种方式,结合消息队列,共同实现了秒杀和一人一单功能;
│   ├── VoucherService.java :添加优惠券,并将库存保存在 Redis 中,为秒杀做准备。
│
└── utils :存放项目内通用的工具类;
    ├── CacheClient.java :封装了通用的缓存工具类,涉及泛型、函数式编程等知识点;
    ├── DistributedLock.java
    ├── RedisConstants.java :保存项目中用到的 Redis 键、过期时间等常量;
    ├── RedisData.java
    ├── RedisIdWorker.java :基于 Redis 的全局唯一自增 ID 生成器;
    ├── SimpleDistributedLockBasedOnRedis.java :简单的 Redis 锁实现,了解即可,一般用 Redisson;
    └── UserHolder.java :线程内缓存用户信息。
0

评论区