<>redis相关学习
<>redis事务
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
Redis事务的主要作用就是串联多个命令防止别的命令插队。
<>Multi、Exec、discard
Multi:开启事务相当于创建一个队列,将需要执行的redis操作放入队列中 Exec:执行队列中的redis操作
discard:直接关闭创建的队列不需要执行
<>redis事务对异常的处理
multi:开启组队,向队列中放redis指令时,
1.如果依次放入的指令没有语法错误,在exec执行时会正常根据存入顺序依次执行,即使有些指令有错误(不是语法错误)也不会回滚,执行失败的指令会跳过继续执行下一条
2.如果依次存入的指令有语法错误,在exec是会进行语法检查,直接报错队列中所有指令都不会执行
<>1.组队有语法错误执行情况
<>2.组队无语法错误执行情况
<>事务能应用的场景
由于事务执行是连续串行的不会受到其他命令插队,开启事务我们可以执行一整个流程,例如商品抢购,完成库存扣除,购买成功用户添加的过程
<>redis中的自带锁机制
<>乐观锁
乐观锁,顾名思义乐观锁就是对共享资源默认不加锁,认为没有被其他线程占用,只有当资源被修改时才会通过cas进行通知其他获取该资源的线程,共享资源被修改你们需要获取新的数据(一般通过添加版本号的形式来解决)
<>悲观锁
顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
<>redis中的监听机制watch(乐观锁)
我们可以通过watch监听我们的key,然后开启事务,如果在事务开启组队过程中,我们监听的key被人修改了,那么我们组队完成执行事务时,将会不生效
<>案例展示
//秒杀过程 public static boolean doSecKill(String uid,String prodid) throws
IOException { //1 uid和prodid非空判断 if(uid == null || prodid == null) { return
false; } //2 连接redis //Jedis jedis = new Jedis("192.168.44.168",6379);
//通过连接池得到jedis对象 JedisPool jedisPoolInstance = JedisPoolUtil.
getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource(); //3 拼接key
// 3.1 库存key String kcKey = "sk:"+prodid+":qt"; // 3.2 秒杀成功用户key String userKey
= "sk:"+prodid+":user"; //监视库存 jedis.watch(kcKey); //4 获取库存,如果库存null,秒杀还没有开始
String kc = jedis.get(kcKey); if(kc == null) { System.out.println("秒杀还没有开始,请等待")
; jedis.close(); return false; } // 5 判断用户是否重复秒杀操作 if(jedis.sismember(userKey,
uid)) { System.out.println("已经秒杀成功了,不能重复秒杀"); jedis.close(); return false; }
//6 判断如果商品数量,库存数量小于1,秒杀结束 if(Integer.parseInt(kc)<=0) { System.out.println(
"秒杀已经结束了"); jedis.close(); return false; } //7 秒杀过程 //使用事务 Transaction multi =
jedis.multi(); //组队操作 multi.decr(kcKey); multi.sadd(userKey,uid); //执行 List<
Object> results = multi.exec(); if(results == null || results.size()==0) {
System.out.println("秒杀失败了...."); jedis.close(); return false; } //7.1 库存-1
//jedis.decr(kcKey); //7.2 把秒杀成功用户添加清单里面 //jedis.sadd(userKey,uid); System.out.
println("秒杀成功了.."); System.out.println("Thread.currentThread().getName() = " +
Thread.currentThread().getName()); jedis.close(); return true; }
<>使用redis实现分布式锁
在分布式系统中我们可以通过redis实现分布式锁解决一些线程并发对资源的占用情况
<>分布式锁
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java
API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题! package com.
example.demo; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.
springframework.data.redis.core.script.DefaultRedisScript; import org.
springframework.stereotype.Component; import java.util.Arrays; import java.util.
concurrent.TimeUnit; /** * Redis的分布式锁工具类 * 利用 setnx 实现乐观锁 * * @author luqianqi
* @Date 2021-08-08 21:50:16 */ @Component public class RedisTemplateUtil { //
锁名称 prefix expire 锁对象 // public static final String LOCK_PREFIX = "RedisLock";
// 加锁失效时间,毫秒,通常是业务执行时间的3-5倍 public static final int LOCK_EXPIRE = 200; // 轮询次数
根据项目而定 public static final int COUNT = 20; // 轮询间隔时间 一般在50-100ms,根据项目而定 public
static final int INTERVAL = 50; @Autowired private RedisTemplate redisTemplate;
/** * 加锁操作 * * @author luqianqi * @param key 锁的名称 * @param uuid 防止误删 * @return
*/ public Boolean setLock(String key,String uuid) { return redisTemplate.
opsForValue().setIfAbsent(key, uuid, LOCK_EXPIRE, TimeUnit.MILLISECONDS); } /**
* 释放锁 * @author luqianqi * @return true or false */ public Boolean deleteLock(
String key,String uuid) { //防止a由于系统卡顿锁自动释放,但是后续又执行了释放锁操作,释放了b的锁 final String s =
redisTemplate.opsForValue().get(key).toString(); if (s.equals(uuid)){ return
redisTemplate.delete(key); } return true; } /** * 释放锁 * @author luqianqi *
@return true or false */ public Boolean deleteLockLua(String key,String uuid) {
// 释放锁 del 使用lua脚本实现原子性 String script = "if redis.call('get', KEYS[1]) ==
ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //
设置lua脚本返回的数据类型 DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>()
; // 设置lua脚本返回类型为Long redisScript.setResultType(Long.class); redisScript.
setScriptText(script); redisTemplate.execute(redisScript, Arrays.asList(key),
uuid); return true; } /** * 采用轮询的方式去加锁,轮询次数和轮询间隔时间写死 * 尝试去获得锁 * * @param key *
@param uuid 防止误删 * @return */ public Boolean tryCasLock(String key,String uuid)
{ int count = COUNT;//获取总轮询次数 while (count > 0) { count--; if (setLock(key,uuid)
) { return true; } else { try { Thread.sleep(INTERVAL); } catch (
InterruptedException e) { e.printStackTrace(); throw new RuntimeException(
"服务器繁忙,请稍后再试。"); } } } //如果一直没有取得锁,就返回false return false; } }
<>redis持久化
<>RDB
在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。
整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能
如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。
<>相关配置
#当Redis无法写入磁盘的话,直接关掉Redis的写操作 stop-writes-on-bgsave-error yes
#对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。 rdbcompression yes
#在存储快照后,还可以让redis使用CRC64算法来进行数据校验 rdbchecksum yes #快照文件名 dbfilename dump.rdb
#快照文件生成路径 dir ./ #一小时一个key修改持久化 save 3600 1
<>优势
适合大规模的数据恢复 对数据完整性和一致性要求不高更适合使用 节省磁盘空间 恢复速度快
<>劣势
Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。
<>AOF
以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录),
只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis
重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作 AOF默认不开启 可以在redis.conf中配置文件名称,默认为
appendonly.aof
<>AOF流程
(1)客户端的请求写命令会被append追加到AOF缓冲区内;
(2)AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;
(3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
(4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;
<>相关配置
#开启aof appendonly yes #aof文件名称 appendfilename "appendonly.aof" #同步频率
appendfsync always 始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好 appendfsync everysec
每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。 appendfsync no redis不主动进行同步,把同步时机交给操作系统。
<>同时开启
RDB和AOF同时开启优先使用AOF
<>优势
备份机制更稳健,丢失数据概率更低。 可读的日志文本,通过操作AOF稳健,可以处理误操作
<>劣势
比起RDB占用更多的磁盘空间。 恢复备份速度要慢。 每次读写都同步的话,有一定的性能压力。 存在个别Bug,造成恢复不能。
<>redis主从复制
<>一主多从,读写分离
<>启动三个redis服务相关配置
include /myredis/redis.conf pidfile /var/run/redis_6379.pid port 6379
dbfilename dump6379.rdb
<>启动三台全为master
<>配从(库)不配主(库)
slaveof <ip><port> 成为某个实例的从服务器
<>主写从读(从写会报错)
<>一主多从总结
主挂掉,从不会成为主 从挂掉,需重新声明主
<>主从复制原理
Slave启动成功连接到master后会发送一个sync命令 Master接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,
在后台进程执行完毕之后,master将传送整个数据文件到slave,以完成一次完全同步
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。 增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行
<>redis哨兵(反客为主)
<>redis集群
<>redis常见的问题
<>缓存穿透
key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。
<>解决方案
(1)对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
(2)设置可访问的名单(白名单):
使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
(3)采用布隆过滤器:(布隆过滤器(Bloom
Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)
将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
<>缓存击穿
key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮
<>解决方案
(1)预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
(2)实时调整:现场监控哪些数据热门,实时调整key的过期时长
<>缓存雪崩
key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key
<>解决方案
使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。