关于Redis持久化与主从与哨兵架构

  • 一、Redis持久化
    • 1、RDB快照(snapshot)
    • 2、AOF(append-only file)
      • AOF重写
      • RDB和AOF的选用:
    • 3、Redis 4.0 混合持久化
      • Redis数据备份策略:
  • 二、Redis主从架构
    • Redis 主从架构搭建,配置从节点步骤:
    • Redis主从工作原理
    • 主从复制(全量复制):
    • 数据部分复制
    • 主从复制(部分复制,断点续传):
    • Jedis连接代码示例:
      • 1、引入相关依赖:
      • 2、访问代码:
    • Redis 管道(Pipeline)
    • Redis Lua脚本
        • 脚本好处:
  • 三、Redis哨兵高可用架构
    • redis哨兵架构搭建步骤
    • sentinel集群重启之后的配置文件
    • StringRedisTemplate与RedisTemplate
      • Redis客户端命令与RedisTemplate方法的对照表

一、Redis持久化

1、RDB快照(snapshot)

Redis 默认将内存数据库快照以二进制的形式写入磁盘,文件名为 dump.rdb。

保存频率可进行设置:N 秒内数据集至少有 M 个改动便进行保存。

  1. 配置文件设置

例如设置:60 秒内有至少有 1000 个键被改动则进行保存

save 60 1000

关闭 RDB 只需要将所有的 save 保存策略注释掉即可

  1. 执行命令设置

进入 Redis 客户端,执行命令 savebgsave,即可生成 dump.rdb 文件,每次命令执行都会将所有 Redis 内存快照写到一个新的 rdb 文件里,并覆盖原有 rdb 快照文件。

bgsave 的写时复制(COW)机制

Redis 使用写时复制技术(Copy-On-Write)来执行 bgsave,即在生成快照的同时,仍然可以正常处理写命令。

即 bgsave 由 Redis 主进程 fork 出一个子进程执行。该子进程是主进程的副本,负责读取主线程的内存数据并将其写入 RDB 文件。当发生修改操作(增删改)时,修改是在子进程的副本上进行的,并将修改后的数据写入一个临时的 RDB 文件。写完后,通过原子替换操作将临时的 RDB 文件替换掉旧的 RDB 文件。这确保了在生成快照的同时,对数据进行了保护性拷贝,使得主线程和子进程相互独立,互不干扰。

save 与 bgsave 对比:

命令savebgsave
IO类型同步异步
是否阻塞 redis 其它命令否(在生成子进程执行调用 fork 函数时会有短暂阻塞)
复杂度O(n)O(n)
优点不会消耗额外内存不阻塞客户端命令
缺点阻塞客户端命令需要 fork 子进程,消耗内存

配置设置的自动生成 RDB 文件时,Redis 后台使用了 bgsave 方式

2、AOF(append-only file)

当 Redis 因某些原因宕机时,会导致最近的写入数据丢失,尚未保存到快照中。为了解决这个问题,Redis 从1.1 版本开始引入 AOF(Append-Only File)持久化机制。

AOF 持久化的工作原理是将每一条修改数据的指令都记录在一个文件(通常是 appendonly.aof)中。这个文件的写入操作首先发生在操作系统的缓存中(os cache),然后每隔一段时间或者在执行特定操作后,Redis 会调用 fsync 将这些操作同步到磁盘上,确保数据持久保存。

此时 Redis 因为故障停机,便可通过 AOF 文件中的记录,将丢失的数据重新加载到 Redis 中,从而减小数据丢失的风险。这种方式相比于传统的 RDB 快照方式更加可靠,但牺牲了一定的性能。

例如执行命令“set zhangsan 666”,AOF 文件记录:

*3$3set$5zhangsan$3666

此为一种 resp 协议格式数据,星号后面的数字代表命令有多少个参数,$ 号后面的数字代表此参数有几个字符

若执行带过期时间的 set 命令,AOF 文件里记录的并不是执行的原始命令,而是记录 key 过期的时间戳

例如执行**“set autumn 888 ex 1000”**,对应 aof 文件里记录如下

*3$3set$6autumn$3888*3$9PEXPIREAT$6autumn$131604249786301

可通过修改配置文件来打开 AOF 功能(默认被注释):

appendonly yes 

重启 Redis 服务之后生效,即可每执行一个修改(增删改)命令便追加到 AOF 文件的末尾。
之后当 Redis 重启时, 程序就可以通过重新执行 AOF 文件中的命令来达到重建数据集的目的。
配置 Redis 多久才将数据 fsync 到磁盘一次的选项有三种:

  1. appendfsync always:每次有新命令追加到 AOF 文件时就执行一次 fsync ,非常慢但非常安全。
  2. appendfsync everysec:每秒 fsync 一次,速度快,且故障时只丢失 1 秒数据。
  3. appendfsync no:从不 fsync ,将数据交给操作系统来处理,最快也最不安全。
appendfsync everysec

推荐默认设置:每秒 fsync 一次, 这种 fsync 策略可以兼顾速度和安全性。

AOF重写

AOF 文件中可能存在大量的冗余指令,这些指令记录了Redis服务器接收到的每个写入操作。为了优化AOF 文件,Redis 引入了 AOF Rewrite 机制。该机制根据内存中的最新数据,以更为紧凑和高效的方式重新生成 AOF 文件。这个过程并非按照固定时间表执行,而是由 Redis 在后台根据一定条件启动。AOF Rewrite 通过消除冗余指令、整理写入操作,从而提高了 AOF 文件的性能和可读性,同时减小了文件的体积。

例如,执行如下命令:

127.0.0.1:6379> incr readcount(integer) 1127.0.0.1:6379> incr readcount(integer) 2127.0.0.1:6379> incr readcount(integer) 3127.0.0.1:6379> incr readcount(integer) 4127.0.0.1:6379> incr readcount(integer) 5

重写后 AOF 文件:

*3$3SET$2readcount$15

配置 AOF 自动重写的频率:

# AOF 文件至少达 64M 才自动重写,因为文件若太小,恢复速度本来就很快,重写的意义就不大# auto-aof-rewrite-min-size 64mb# AOF 文件自上一次重写后文件大小增长了 100% 则再次触发重写# auto-aof-rewrite-percentage 100

AOF 也可以手动重写,进入 Redis 客户端执行命令 bgrewriteaof 便能重写 AOF

RDB和AOF的选用:

命令RDBAOF
启动优先级
体积
恢复速度
数据安全性容易丢数据根据策略决定

生产环境可以都启用,Redis 启动时若既有 RDB 文件又有 AOF 文件,Redis 则优先选择 AOF 文件恢复数据,通常 AOF 记录了更详细的操作历史。

但 AOF 文件也可能会比较大,且还原速度相对较慢,因此在某些情况下,也以根据需求考虑使用 RDB 文件来加速还原过程。

3、Redis 4.0 混合持久化

重启 Redis 时,很少使用 RDB 来恢复内存状态,因为会丢失大量数据。通常使用 AOF 日志重放,但是重放 AOF 日志性能相对 RDB 慢很多,文件越大,恢复花费的时间越长。 Redis 4.0 为了解决这个问题,引入了混合持久化。

配置文件开启混合持久化:(必须先开启 AOF

# 必须先开启 AOFappendonly yes aof-use-rdb-preamble yes 

如果开启了混合持久化,AOF 在重写时不再简单地将内存数据转换为 RESP 命令写入 AOF 文件。相反,它会在重写开始时对当前内存做一个 RDB 快照,将这个快照和之后产生的 AOF 文件增量命令一同写入新的 AOF 文件。在重写过程中,这个新文件的名称不是 appendonly.aof,而是一个临时文件。只有在整个重写过程完成后,才会将新 AOF 文件改名为 appendonly.aof,从而覆盖原有的 AOF 文件。

这种机制的好处在于,Redis 在重启时可以加载 RDB 文件的内容,然后重放增量 AOF 日志。这样就无需完全依赖原始的 AOF 全量文件进行重放,从而大幅提升了重启效率。这也是混合持久化机制下 AOF 重写的优势之一。

混合持久化 AOF 文件结构:

Redis数据备份策略:

  1. 定时全量备份: 使用 crontab 定时调度脚本,每小时将 rdb 或 aof 的备份复制到指定目录,仅保留最近 48 小时的备份。
  2. 增加增量备份: 除了全量备份,考虑增加每小时的增量备份,以减小备份文件的大小。
  3. 定期测试备份可用性: 定期测试备份的还原过程,确保在需要时可以顺利还原数据,防范备份文件损坏的可能。
  4. 使用分层次备份: 采用周全量备份和每日增量备份的分层次备份,平衡备份完整性和存储空间的利用率。
  5. 考虑异地备份: 将备份复制到其他机器,甚至考虑异地或云存储服务,提高数据的安全性。
  6. 监控备份过程: 设置监控,确保备份任务正常运行,包括备份文件大小、备份所花费时间和备份的可还原性。
  7. 文档化备份策略: 编写详细文档记录备份策略和还原步骤,以便团队协作和未来的维护。
  8. 保护备份访问权限: 确保备份文件的读写权限受到保护,只有授权人员可以访问和修改。

以上策略的制定旨在确保备份的完整性、及时性和可还原性。不过具体的备份策略需要根据业务需求和系统特点进行调整。

二、Redis主从架构

Redis 主从架构搭建,配置从节点步骤:

  1. 复制配置文件: 复制一份原始 redis.conf 文件。
  2. 修改配置: 修改复制的配置文件,调整端口、进程号、日志文件路径、数据存放目录等配置,同时注释掉 bind 以允许从节点可以通过任何网络接口访问。
port 6380# 把pid进程号写入 pidfile 配置的文件pidfile /var/run/redis_6380.pidlogfile "6380.log"# 指定数据存放目录dir /usr/local/redis-5.0.3/data/6380# bind 127.0.0.1
  1. 配置主从复制: 在配置文件中加入主从复制的相关配置,包括主节点的 IP 和端口,以及设置从节点为只读模式。
# 从本机 6379 的 redis 实例复制数据,Redis 5.0 之前使用 slaveofreplicaof 192.168.0.60 6379replica-read-only yes
  1. 启动从节点: 使用修改后的配置文件启动 Redis 从节点。
redis-server redis.conf
  1. 连接从节点: 使用 redis-cli 连接到从节点的指定端口。
redis-cli -p 6380
  1. 测试同步: 在主节点写入数据,确保从节点能够及时同步新的修改数据。
  2. (可选)配置额外从节点: 重复以上步骤,配置额外的从节点,如配置一个 6381 的从节点。

Redis主从工作原理

  1. PSYNC 命令触发复制: 当 Redis 主节点(Master)配置了从节点(Slave)时,不论是首次连接还是重连,Slave 会发送 PSYNC 命令给 Master 请求数据复制。
  2. Master 数据持久化: Master 在收到 PSYNC 命令后,会在后台进行数据持久化,通过 bgsave 生成最新的 RDB 快照文件。在持久化期间,Master 继续接收客户端请求,将这些可能修改数据集的请求缓存在内存中。
  3. RDB文件传输: 持久化完成后,Master 将生成的 RDB 文件数据集发送给 Slave。Slave 接收到数据后,进行持久化,生成 RDB 文件,并加载到内存中。
  4. 命令传输: Master 将之前缓存在内存中的命令发送给 Slave,确保在持久化期间客户端的修改得以同步。
  5. 自动重连: 如果 Master 与 Slave 之间的连接由于某些原因断开,Slave 能够自动重新连接 Master。
  6. 多个 Slave 连接: 如果 Master 同时收到多个 Slave 并发连接请求,Master 只会进行一次持久化,而不是为每个连接都进行一次。Master 将这一份持久化的数据发送给多个并发连接的 Slave。

这种机制确保了数据的可靠传输和同步,同时最小化了对 Master 的持久化操作的重复执行。

主从复制(全量复制):

数据部分复制

当master和slave断开重连后,一般都会对整份数据进行复制。但从redis2.8版本开始,redis改用可以支持部分数据复制的命令PSYNC去master同步数据,slave与master能够在网络连接断开重连后只进行部分数据复制(断点续传)。

master会在其内存中创建一个复制数据用的缓存队列,缓存最近一段时间的数据,master和它所有的slave都维护了复制的数据下标offset和master的进程id,因此,当网络连接断开后,slave会请求master继续进行未完成的复制,从所记录的数据下标开始。如果master进程id变化了,或者从节点数据下标offset太旧,已经不在master的缓存队列里了,那么将会进行一次全量数据的复制。

主从复制(部分复制,断点续传):


若有很多从节点,会存在主从复制风暴(即多个从节点同时复制主节点导致主节点压力过大)。

解决方案:可将从节点分为两个级别,一级从节点与主节点直接同步,而二级从节点与一级从节点同步。这样,主节点只需负责与一级从节点同步,而一级从节点负责与二级从节点同步,有助于减轻主节点的负担。

Jedis连接代码示例:

1、引入相关依赖:

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>

2、访问代码:

public class JedisSingleTest {public static void main(String[] args) throws IOException {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPoolConfig.setMinIdle(5);// timeout,这里既是连接超时又是读写超时,从 Jedis 2.8 开始有区分 connectionTimeout 和 soTimeout 的构造函数JedisPool jedisPool = new JedisPool(jedisPoolConfig, "192.168.0.60", 6379, 3000, null);Jedis jedis = null;try {//从redis连接池里拿出一个连接执行命令jedis = jedisPool.getResource();System.out.println(jedis.set("single", "zhuge"));System.out.println(jedis.get("single"));//管道示例//管道的命令执行方式:cat redis.txt | redis-cli -h 127.0.0.1 -a password - p 6379 --pipe/*Pipeline pl = jedis.pipelined();for (int i = 0; i < 10; i++) {pl.incr("pipelineKey");pl.set("zhuge" + i, "zhuge");}List results = pl.syncAndReturnAll();System.out.println(results);*///lua 脚本模拟一个商品减库存的原子操作//lua 脚本命令执行方式:redis-cli --eval /tmp/test.lua , 10/*jedis.set("product_count_10016", "15");//初始化商品10016的库存String script = " local count = redis.call('get', KEYS[1]) " +" local a = tonumber(count) " +" local b = tonumber(ARGV[1]) " +" if a >= b then " +" redis.call('set', KEYS[1], a-b) " +" return 1 " +" end " +" return 0 ";Object obj = jedis.eval(script, Arrays.asList("product_count_10016"), Arrays.asList("10"));System.out.println(obj);*/} catch (Exception e) {e.printStackTrace();} finally {//注意这里不是关闭连接,在 JedisPool 模式下,Jedis 会被归还给资源池。if (jedis != null)jedis.close();}}}

代码中包含 lua 与 管道使用

Redis 管道(Pipeline)

客户端可以一次性发送多个请求而不用等待服务器的响应,待所有命令都发送完后再一次性读取服务的响应,这样可以极大降低多条命令执行的网络传输开销,管道执行多条命令的网络开销实际上只相当于一次命令执行的网络开销。

用 pipeline 方式打包命令发送,Redis 必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。

pipeline 中发送的每个 command 都会被 server 立即执行,若执行失败,则会在此后的响应中得到信息,但是管道中前面命令失败,后面命令不会有影响,继续执行

详细代码示例见上方 jedis 连接示例:

Pipeline pl = jedis.pipelined();for (int i = 0; i < 10; i++) {pl.incr("pipelineKey");pl.set("zhuge" + i, "zhuge");//模拟管道报错// pl.setbit("zhuge", -1, true);}List<Object> results = pl.syncAndReturnAll();System.out.println(results);

Redis Lua脚本

Redis 在 2.6 引入脚本支持,允许开发者使用 Lua 语言编写脚本传到 Redis 中执行。

脚本好处:
  1. 减少网络开销: 通过将多个命令放在一个脚本中执行,减少了网络往返时延,类似于使用管道(pipeline)的效果,提高了性能。
  2. 原子操作: Redis 会将整个 Lua 脚本作为一个整体执行,确保了原子性。这在某些需要原子性操作的场景中非常有用,尤其是相较于管道而言。
  3. 替代事务功能: Redis 的事务功能有一些限制,而 Lua 脚本可以几乎实现通常的事务功能。官方也建议使用 Redis Lua 脚本替代事务功能,因为它更为灵活且性能更好。

官网文档的一段话:

A Redis script is transactional by definition, so everything you can do with a Redis transaction, you can also do with a script, and usually the script will be both simpler and faster.
根据定义,Redis 脚本是事务性的,因此您可以使用 Redis 事务执行任何操作,也可以使用脚本执行任何操作,而且通常脚本会更简单、更快。

从 Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值。

EVAL 命令的格式:

EVAL script numkeys key [key ...] arg [arg ...] 
  1. script 命令的 参数 是一段 Lua 脚本程序,它在 Redis 服务器上下文中运行。这段脚本不需要也不应该定义为一个 Lua 函数。
  2. numkeys 参数用于指定键名参数的个数。键名参数 key [key …]EVAL 命令的第三个参数开始计数,表示在脚本中所用到的 Redis 键(key)。这些键名参数可以在 Lua 中通过全局变量 KEYS 数组访问,以 1 为基址的形式访问(KEYS[1]KEYS[2],以此类推)。
  3. 在命令的最后,那些不是键名参数的附加参数 arg [arg …] 可以在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似(ARGV[1]ARGV[2],诸如此类)。

例如:

127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second1) "key1"2) "key2"3) "first"4) "second"
  1. “return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}” 是待求值的 Lua 脚本,其中 return 语句用于返回结果。这个脚本通过 KEYSARGV 全局变量获取键名参数和附加参数。
  2. 数字 2 指定了键名参数的数量,即 key1key2
  3. key1key2 是键名参数,可以通过 KEYS[1]KEYS[2] 在 Lua 脚本中访问。
  4. firstsecond 是附加参数,可以通过 ARGV[1]ARGV[2] 在 Lua 脚本中访问。
  5. 结果返回了一个数组 {“key1”, “key2”, “first”, “second”}

在 Lua 脚本中,可以使用 **redis.call() **函数来执行 Redis 命令。

Jedis 调用示例详见上方 Jedis 连接示例:

jedis.set("product_stock_10016", "15");//初始化商品10016的库存String script = " local count = redis.call('get', KEYS[1]) " +" local a = tonumber(count) " +" local b = tonumber(ARGV[1]) " +" if a >= b then " +" redis.call('set', KEYS[1], a-b) " +" return 1 " +" end " +" return 0 ";Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));System.out.println(obj);

注意点:

1. 避免死循环: 如果在 Lua 脚本中编写了死循环,它会导致 Redis 无法执行其他命令,因为 Lua 脚本的执行是阻塞的。
2. 避免耗时运算: 耗时的运算同样会导致 Redis 在执行脚本期间无法响应其他命令。Lua 脚本的执行应该是相对较快的,不应该包含长时间运行的计算。
3. 管道执行: 如果有大量的 Redis 命令需要执行,可以考虑使用管道(Pipeline)。使用管道可以将多个命令批量发送到 Redis 服务器,减少通信开销,但要确保这些命令是相对简单和快速的,不会阻塞 Redis。

三、Redis哨兵高可用架构

sentinel哨兵是特殊的redis服务,不提供读写服务,主要用来监控redis实例节点。

哨兵架构下client端第一次从哨兵找出redis的主节点,后续就直接访问redis的主节点,不会每次都通过sentinel代理访问redis的主节点,当redis的主节点发生变化,哨兵会第一时间感知到,并且将新的redis主节点通知给client端(这里面redis的client端一般都实现了订阅功能,订阅sentinel发布的节点变动消息)

  1. Sentinel不提供读写服务: Sentinel 本身并不提供对外的读写服务,它的职责主要是监控和管理 Redis 实例。这确保了 Sentinel 的轻量级和高效性。
  2. 发现和通知主节点变更: Sentinel 的一个关键功能是发现 Redis 主节点的变更。当 Redis 主节点发生故障或变更时,Sentinel 会感知到这一变化,并及时通知相关的客户端。这确保了系统在主节点变更时可以无缝切换。
  3. 客户端第一次从哨兵获取主节点信息: 当客户端首次连接到 Sentinel 时,Sentinel 会告知客户端当前的 Redis 主节点是哪一个。之后,客户端会直接连接到 Redis 主节点,不再通过 Sentinel 代理。这可以减少对 Sentinel 的依赖,提高连接的效率。
  4. 实现订阅功能: 客户端一般实现了订阅功能,订阅 Sentinel 发布的节点变动消息。这样,当 Redis 主节点发生变化时,Sentinel 会发布消息通知所有订阅者,客户端就能及时获知主节点的变更信息。

总体来说,Sentinel 架构通过监控 Redis 主节点的健康状态,实现了高可用性和自动故障转移。客户端通过与 Sentinel 交互,能够获得当前主节点的信息,保证系统在主节点故障时能够迅速切换到备用节点,确保服务的可靠性。

redis哨兵架构搭建步骤

  1. 复制 Sentinel 配置文件:
cp sentinel.conf sentinel-26379.conf
  1. 修改配置文件 sentinel-26379.conf:
port 26379daemonize yespidfile "/var/run/redis-sentinel-26379.pid"logfile "26379.log"dir "/usr/local/redis-5.0.3/data"sentinel monitor mymaster 192.168.0.60 6379 2
  • port: Sentinel 的监听端口。
  • daemonize: 是否以守护进程方式运行。
  • pidfile: PID 文件路径。
  • logfile: 日志文件路径。
  • dir: 工作目录。
  • sentinel monitor mymaster 192.168.0.60 6379 2: 监控名为 mymaster 的 Redis 主节点,主节点的 IP 是 192.168.0.60,端口是 6379quorum 设置为 2
  1. 启动 Sentinel 哨兵实例:
src/redis-sentinel sentinel-26379.conf
  1. 查看 Sentinel 的 info 信息:
src/redis-cli -p 26379127.0.0.1:26379> info

能看到 Sentinel 的 info 信息中已经识别出了 Redis 的主从节点

  1. 配置更多 Sentinel 实例:

如果需要配置更多的 Sentinel 实例,可以按照相同的步骤,修改配置文件和端口号,例如,配置两个额外的 Sentinel,分别使用端口 2638026381

sentinel集群重启之后的配置文件

哨兵集群的元数据信息会被写入所有 sentinel 的配置文件中,追加在文件的最下方。

例如 sentinel-26379.conf:

sentinel known-replica mymaster 192.168.0.60 6380 #代表redis主节点的从节点信息sentinel known-replica mymaster 192.168.0.60 6381 #代表redis主节点的从节点信息sentinel known-sentinel mymaster 192.168.0.60 26380 52d0a5d70c1f90475b4fc03b6ce7c3c56935760f#代表感知到的其它哨兵节点sentinel known-sentinel mymaster 192.168.0.60 26381 e9f530d3882f8043f76ebb8e1686438ba8bd5ca6#代表感知到的其它哨兵节点

当 Redis 主节点挂了,哨兵集群会重新选举出新的 Redis 主节点,同时会修改所有 sentinel 节点配置文件的集群元数据信息。

例如 6379 的 Redis 挂了,假设选举出的新主节点是 6380 ,则 sentinel 文件里的集群元数据信息会变成:

sentinel known-replica mymaster 192.168.0.60 6379 #代表主节点的从节点信息sentinel known-replica mymaster 192.168.0.60 6381 #代表主节点的从节点信息sentinel known-sentinel mymaster 192.168.0.60 26380 52d0a5d70c1f90475b4fc03b6ce7c3c56935760f#代表感知到的其它哨兵节点sentinel known-sentinel mymaster 192.168.0.60 26381 e9f530d3882f8043f76ebb8e1686438ba8bd5ca6#代表感知到的其它哨兵节点

有可能会保留已失效的元数据信息

同时还会修改 sentinel 文件里之前配置的 mymaster 对应的 6379 端口,修改为6380:

sentinel monitor mymaster 192.168.0.60 6380 2

当 6379 的 redis 实例再次启动时,哨兵集群根据集群元数据信息就可以将 6379 端口的 Redis 节点作为从节点加入集群。

哨兵的 Jedis 连接代码:

public class JedisSentinelTest {public static void main(String[] args) throws IOException {JedisPoolConfig config = new JedisPoolConfig();config.setMaxTotal(20);config.setMaxIdle(10);config.setMinIdle(5);String masterName = "mymaster";Set<String> sentinels = new HashSet<String>();sentinels.add(new HostAndPort("192.168.0.60",26379).toString());sentinels.add(new HostAndPort("192.168.0.60",26380).toString());sentinels.add(new HostAndPort("192.168.0.60",26381).toString());//JedisSentinelPool其实本质跟JedisPool类似,都是与redis主节点建立的连接池//JedisSentinelPool并不是说与sentinel建立的连接池,而是通过sentinel发现redis主节点并与其建立连接JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(masterName, sentinels, config, 3000, null);Jedis jedis = null;try {jedis = jedisSentinelPool.getResource();System.out.println(jedis.set("sentinel", "zhuge"));System.out.println(jedis.get("sentinel"));} catch (Exception e) {e.printStackTrace();} finally {//注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。if (jedis != null)jedis.close();}}}

哨兵的 SpringBoot 整合 Redis 连接代码:
1、引入相关依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency>

SpringBoot 项目核心配置:

server:port: 8080spring:redis:database: 0timeout: 3000sentinel:#哨兵模式master: mymaster #主服务器所在集群名称 nodes: 192.168.0.60:26379,192.168.0.60:26380,192.168.0.60:26381 lettuce:pool:max-idle: 50min-idle: 10max-active: 100max-wait: 1000

访问代码:

@RestControllerpublic class IndexController {private static final Logger logger = LoggerFactory.getLogger(IndexController.class);@Autowiredprivate StringRedisTemplate stringRedisTemplate;/** * 测试节点挂了哨兵重新选举新的master节点,客户端是否能动态感知到 * 新的master选举出来后,哨兵会把消息发布出去,客户端实际上是实现了一个消息监听机制, * 当哨兵把新master的消息发布出去,客户端会立马感知到新master的信息,从而动态切换访问的masterip * * @throws InterruptedException */@RequestMapping("/test_sentinel")public void testSentinel() throws InterruptedException {int i = 1;while (true){try {stringRedisTemplate.opsForValue().set("zhuge"+i, i+"");System.out.println("设置key:"+ "zhuge" + i);i++;Thread.sleep(1000);}catch (Exception e){logger.error("错误:", e);}}}}

StringRedisTemplate与RedisTemplate

Spring 封装了 RedisTemplate 对象来进行对 Redis 的各种操作,支持所有的 Redis 原生的 API。

RedisTemplate 提供了几个常用的接口方法的使用:

private ValueOperations<K, V> valueOps;private HashOperations<K, V> hashOps;private ListOperations<K, V> listOps;private SetOperations<K, V> setOps;private ZSetOperations<K, V> zSetOps;

RedisTemplate 中定义了对 5 种数据结构操作

redisTemplate.opsForValue();//操作字符串redisTemplate.opsForHash();//操作 hashredisTemplate.opsForList();//操作 listredisTemplate.opsForSet();//操作 setredisTemplate.opsForZSet();//操作有序 set
  • StringRedisTemplate 继承自 RedisTemplate,同样拥有上方操作。
  • StringRedisTemplate 默认采用 String 的序列化策略,保存的 key 和 value 都是采用此策略序列化保存。
  • RedisTemplate 默认采用的是 JDK 的序列化策略,保存的 key 和 value 都是采用此策略序列化保存。

Redis客户端命令与RedisTemplate方法的对照表

String类型结构
RedisRedisTemplate rt
set key valuert.opsForValue().set(“key”,“value”)
get keyrt.opsForValue().get(“key”)
del keyrt.delete(“key”)
strlen keyrt.opsForValue().size(“key”)
getset key valuert.opsForValue().getAndSet(“key”,“value”)
getrange key start endrt.opsForValue().get(“key”,start,end)
append key valuert.opsForValue().append(“key”,“value”)
Hash结构
hmset key field1 value1 field2 value2…rt.opsForHash().putAll(“key”,map) //map是一个集合对象
hset key field valuert.opsForHash().put(“key”,“field”,“value”)
hexists key fieldrt.opsForHash().hasKey(“key”,“field”)
hgetall keyrt.opsForHash().entries(“key”) //返回Map对象
hvals keyrt.opsForHash().values(“key”) //返回List对象
hkeys keyrt.opsForHash().keys(“key”) //返回List对象
hmget key field1 field2…rt.opsForHash().multiGet(“key”,keyList)
hsetnx key field valuert.opsForHash().putIfAbsent(“key”,“field”,“value”
hdel key field1 field2rt.opsForHash().delete(“key”,“field1”,“field2”)
hget key fieldrt.opsForHash().get(“key”,“field”)
List结构
lpush list node1 node2 node3…rt.opsForList().leftPush(“list”,“node”)
rt.opsForList().leftPushAll(“list”,list) //list是集合对象
rpush list node1 node2 node3…rt.opsForList().rightPush(“list”,“node”)
rt.opsForList().rightPushAll(“list”,list) //list是集合对象
lindex key indexrt.opsForList().index(“list”, index)
llen keyrt.opsForList().size(“key”)
lpop keyrt.opsForList().leftPop(“key”)
rpop keyrt.opsForList().rightPop(“key”)
lpushx list nodert.opsForList().leftPushIfPresent(“list”,“node”)
rpushx list nodert.opsForList().rightPushIfPresent(“list”,“node”)
lrange list start endrt.opsForList().range(“list”,start,end)
lrem list count valuert.opsForList().remove(“list”,count,“value”)
lset key index valuert.opsForList().set(“list”,index,“value”)
Set结构
sadd key member1 member2…rt.boundSetOps(“key”).add(“member1”,“member2”,…)
rt.opsForSet().add(“key”, set) //set是一个集合对象
scard keyrt.opsForSet().size(“key”)
sidff key1 key2rt.opsForSet().difference(“key1”,“key2”) //返回一个集合对象
sinter key1 key2rt.opsForSet().intersect(“key1”,“key2”)//同上
sunion key1 key2rt.opsForSet().union(“key1”,“key2”)//同上
sdiffstore des key1 key2rt.opsForSet().differenceAndStore(“key1”,“key2”,“des”)
sinter des key1 key2rt.opsForSet().intersectAndStore(“key1”,“key2”,“des”)
sunionstore des key1 key2rt.opsForSet().unionAndStore(“key1”,“key2”,“des”)
sismember key memberrt.opsForSet().isMember(“key”,“member”)
smembers keyrt.opsForSet().members(“key”)
spop keyrt.opsForSet().pop(“key”)
srandmember key countrt.opsForSet().randomMember(“key”,count)
srem key member1 member2…rt.opsForSet().remove(“key”,“member1”,“member2”,…)
Copyright © maxssl.com 版权所有 浙ICP备2022011180号