入门

NoSQL的四大分类

image-20210813114228841

入门概述

什么是Redis?

Redis (Remote Dictionary Server ),即远程字典服务,是一个开源的支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。 可以用作数据可、缓存、消息中间件

Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 
它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets),
 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),
 事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。

redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

redis中文官方网站 Redis推荐都是在Linux服务器上搭建的

Redis的作用

  1. 内存存储、持久化(rdb、aof)
  2. 效率高,用于高速缓存
  3. 发布订阅系统
  4. 地图信息分析
  5. 计数器

特性

  1. 多样的数据类型
  2. 持久化
  3. 支持集群、事务等

Windows安装

  1. 自行下载解压 不做详细说明

  2. 开启redis 双击运行服务 默认端口号为6379
    image-20210813122130103

  3. 使用redis客户端进行连接

    输入ping 命令返回pong 表示成功连接
    image-20210813122624181

windows下就介绍到此,毕竟企业级的开发都是在redis下开发的

Linux安装

  1. 宝塔轻易安装即可 进入www/server/redis可以看到redis的配置
    image-20210813132824676

  2. 因为redis的运行需要依赖gcc 进行编译,所以还要安装gcc环境

  yum install gcc-c++
  # 检查gcc是否成功安装
	gcc -v
  # 配置所需要的文件
  make 
  # 检查
  make install
image-20210813134948297
  1. redis的默认安装路径在==usr/local/bin==目录下
    image-20210813135541246

  2. 新建目录将redis的配置文件移动到当前

# 新建文件
mkdir qdconfig
# 复制配置文件
 cp /www/server/redis/redis.conf qdconfig

​ 我们之后用这个配置文件进行启动

  1. redis默认不是后台启动的,需要修改配置文件
vim redis.conf

image-20210813141252927
6. 以当前配置启动redis服务

 redis-server qdconfig/redis.conf 

image-20210813142826653
7. 测试set
image-20210813143111340
8. 查看服务
image-20210813144023219
9. 关闭服务

shutdown
exit

ok~~安装至此完成

性能测试

redis-benchmark 是一个压力测试工具,可以通过他测试性能问题
image-20210814083830888
举个栗子:测试10个并发,100个请求

redis-benchmark -h localhost -p 6379 -c 10 -n 100

image-20210814085729675

Reids基础

redis有16个数据库,默认使用第0个数据库,可以使用select 切换数据库 数据不同下标不同

127.0.0.1:6379> SELECT 3  # 切换数据库
OK
127.0.0.1:6379[3]> DBSIZE # 查看当前数据库的大小
(integer) 0

127.0.0.1:6379>  exists name # 判断key是否存在
(integer) 0
127.0.0.1:6379> exists name qiandu # 判断key为qiandu是否存在
(integer) 0   # 返回1为存在  返回0为不存在
127.0.0.1:6379> move name 1  # 移除key  1代表当前的数据库
(integer) 1
127.0.0.1:6379> TYPE name  # 查看key的具体类型
string

127.0.0.1:6379> expire name 10 # 设置过期时间
(integer) 1
127.0.0.1:6379> ttl name  # 查看剩余时间

127.0.0.1:6379[3]> keys *  # 查看所有的key
(empty array)
127.0.0.1:6379> FLUSHdb   # 清空当前库的key
OK
127.0.0.1:6379> FLUSHALL   # 清空全部数据库中的key
OK

更过命令:Redis命令中心

为什么Redis是单线程的呢?

Redis是单线程的! Redis是基于内存操作的,CPU并不是它的瓶颈,Reids的瓶颈是服务器的内存以及网络的带宽,这些可以通过单线程来实现

为什么Redis的单线程还那么快呢?

Redis是C语言写的,官方提供的数据为100000+的QPS(每秒查询率) 不比其他的慢。

另外多线程的效率也不一定比单线程的效率高,Reids将所有的数据都放入内存之中,多次读写都在一个cpu进行,多线程会使用CPU进行切换上下文的操作 这是一个耗时的操作,对于系统内存如果没有内存的切换效率就是最佳的

基本数据类型

Redis有5大常见数据类型 String List Set Hash Zset 3大特殊数据类型geospatial hyperloglog bitmaps

String

string是最常用的类型了

127.0.0.1:6379> set name qiandu  # 设置key与value
OK
127.0.0.1:6379> append name "hello"  # 追加字符串 如果key不存在 相当于新建
(integer) 11
127.0.0.1:6379> get name   # 获取值
"qianduhello"
127.0.0.1:6379> STRLEN name   # 获取字符串的长度
(integer) 11
#========================================================================
127.0.0.1:6379> incr i    # 自增 1
(integer) 1
127.0.0.1:6379> decr i  # 自减 1
(integer) 0
127.0.0.1:6379> incrby i 2  # 设置步长 指定增量
(integer) 2
127.0.0.1:6379> decrby i 3 # 设置步长 指定减量
(integer) -1
#========================================================================
127.0.0.1:6379> get name
"qianduhello"
127.0.0.1:6379> getrange name 2 4  # 取指定范围的字符串 [2,4]
"and"
127.0.0.1:6379> getrange name 0 -1  # 获取全部字符串 
"qianduhello"
127.0.0.1:6379> setrange name 0 w  #替换指定位置的字符串
(integer) 11
#========================================================================
# setex  # 设置过期时间
127.0.0.1:6379> setex key1 10 "qinadu" 
OK
# setnx # 不存在再设置 (在分布式锁中会使用)
127.0.0.1:6379> setnx key2 "study redis"
(integer) 1
#========================================================================
# mset    # 批量设置
127.0.0.1:6379> mset k1 v1 k2 v2  # 设置 k1=v1 k2=v2
OK
127.0.0.1:6379> keys *
1) "k1"
2) "k2"
# mget    # 批量获取
127.0.0.1:6379> mget k1 k2  # 获取key为k1、k2的值
1) "v1"
2) "v2"
127.0.0.1:6379> msetnx k1 v1 k4 v4  # msetnx 原子性操作 一起成功或者失败
(integer) 0
127.0.0.1:6379> keys *
1) "k1"
2) "k2"
#========================================================================
# 设置对象
127.0.0.1:6379> set user:1 {name:qiandu,age:12}
# 方法2
127.0.0.1:6379> mset user:2:name qiandu user:2:age 3
#========================================================================
# getset   #先获取值,在设置
127.0.0.1:6379> getset name "qiandu666" #如果不存在值 返回nil
(nil)
127.0.0.1:6379> get name
"qiandu666"
127.0.0.1:6379> getset name "qiandu"  # 如果存在 获取原来的值并且设置新的值
"qiandu666"
127.0.0.1:6379> get name
"qiandu"

String的使用场景:计数器(文章的统计数量等)、对象的缓存存储

List

List是一种的基本的数据类型

# lpush
127.0.0.1:6379> lpush list one  # 将值插入列表头部
(integer) 1
127.0.0.1:6379> lpush list wo
(integer) 2
127.0.0.1:6379> lpush list three
(integer) 3
127.0.0.1:6379> lrange list 0 -1  # 通过区间获取具体的值
1) "three"
2) "wo"
3) "one"
# rpush
127.0.0.1:6379> rpush list four  # 将值插入列表尾部
#========================================================================
127.0.0.1:6379> lpop list  # 移除头部
"three"
127.0.0.1:6379> rpop list  # 移除尾部
"four"
#========================================================================
# lindex
127.0.0.1:6379> lindex list 1   # 获取list的第一个值
"one"
# llen
127.0.0.1:6379> llen list  # 获取长度
(integer) 2
# lrem
127.0.0.1:6379> lrem list 1 two  # 从list中移除一个值为two的操作
#========================================================================
# rpoplpush  移除列表的最后一个元素并移动到新的列表之中
127.0.0.1:6379> rpoplpush list myList
"one"
127.0.0.1:6379> lrange list 0 -1
(empty array)
127.0.0.1:6379> lrange myList 0 -1
1) "one"
127.0.0.1:6379> 
#========================================================================
#lset  将列表中指定下标的值替换为另外一个  不存在列表会报错
127.0.0.1:6379> lset myList 0 qd
OK
127.0.0.1:6379> lrange myList 0 -1
1) "qd"
#========================================================================
# linsert  插入命令
127.0.0.1:6379> linsert  myList after qd qd666
(integer) 2
127.0.0.1:6379> lrange myList 0 -1
1) "qd"
2) "qd666"

小结

  • list实际上是一个链表
  • 如果key不存在,会创建新的链表
  • 如果key存在,新增内容
  • 如果移除了所有值,就是空链表,也代表不存在
  • 在两边插入操作效率更高
  • 可以用list实现消息队列的功能

Set

set中的值是不能重复的

127.0.0.1:6379> sadd myset qiandu qiandu666  # sadd 添加元素
(integer) 2
127.0.0.1:6379> smembers myset  # smembers 获取指定集合中的元素
1) "qiandu"
2) "qiandu666"
127.0.0.1:6379> sismember myset qiandu  # 判断集合中是否含有指定的值
(integer) 1    # 有返回1  无返回0
127.0.0.1:6379> sismember myset qiandua
(integer) 0
127.0.0.1:6379> scard myset  # 判断集合中元素的个数
(integer) 2
127.0.0.1:6379> srem myset qiandu666  # 移除集合中的指定元素
(integer) 1
#========================================================================
127.0.0.1:6379> SRANDMEMBER myset 1 # 随机获取元素
1) "qiandu"
127.0.0.1:6379> spop myset  # 随机删除元素
"qiandu"
#========================================================================
127.0.0.1:6379> smove myset2 myset qiandu123  # 将指定的元素移动到另一个集合之中
(integer) 1
127.0.0.1:6379> smembers myset
1) "qiandu123"
#========================================================================
127.0.0.1:6379> sdiff myset myset2  # 取myset中myset2没有的元素
(empty array)
127.0.0.1:6379> sinter myset myset2  # 取交集
1) "qiandu123"
127.0.0.1:6379> SUNION myset myset2  # 取并集
1) "qiandu123"
2) "qiandu888"

小结

set集合中存放的是无序的、不可重复的数据、实际应用场景:共同关注、好友推荐

Hash

hash,我们当作一个map集合对待就简单了 存放的是map集合 本质与String没有太大的区别

127.0.0.1:6379> hset myhash field qiandu
(integer) 1
127.0.0.1:6379> hget myhash field
"qiandu"


127.0.0.1:6379> hmset myhash field1 qiandu field2 hello  # 设置多个值
OK
127.0.0.1:6379> hmget myhash field1 field2   # 获取多个值
1) "qiandu"
2) "hello"

127.0.0.1:6379> hgetall myhash  # 获取所有的key与value
1) "field"
2) "qiandu"

127.0.0.1:6379> hdel myhash field   # 删除字段
(integer) 1

127.0.0.1:6379> hlen myhash    # 获取长度
(integer) 2

127.0.0.1:6379> hexists myhash field1  # 判断指定字段是否存在 
(integer) 1
#========================================================================
127.0.0.1:6379> hkeys myhash   # 只获取key
1) "field1"
2) "field2"
127.0.0.1:6379> hvals myhash  # 只获取value
1) "field2"
2) "hello"
#========================================================================
 hincrby myhash field3 5 # 指定增量

小结

可以用hash存放经常变动的数据、或者用户的信息

Zset

zset与set只差了一个z 但是zset可以排序

127.0.0.1:6379> zadd zset 1 one  # 添加一个值
(integer) 1
127.0.0.1:6379> zadd zset 2 two 3 three  # 添加多个值
(integer) 2
127.0.0.1:6379> zrange zset 0 -1  # 取得全部数据
1) "one"
2) "two"
3) "three"
#========================================================================
127.0.0.1:6379> zadd score 100 qiandu  200 xiaohong 300 xiaowang
(integer) 3
127.0.0.1:6379> ZRANGEBYSCORE score -inf +inf  # 排序 正(负)无穷
1) "qiandu"
2) "xiaohong"
3) "xiaowang"
127.0.0.1:6379> zrevrange score 0 -1  # 排序 从大到小
#========================================================================
127.0.0.1:6379> zrem score xiaohong   # 移除元素
(integer) 1

127.0.0.1:6379> zcard score   # 获取集合的大小
(integer) 2

127.0.0.1:6379> zcount score 1 3  # 获取指定区间的成员数量
(integer) 0

小结

set可以实现的zset都可以实现还要排序功能 使用场景:权重、排行榜

特殊数据类型

geospatial

这个是提供地理位置的api 比如打车、附近的人、两地距离都可以用它来实现

geoadd 添加地理位置

# getadd 添加地理位置
# 规则:两级无法直接添加,我们一般会下载城市数据,直接通过java程序一次性导入!
# 参数:经度、纬度、名称
127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing  
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1

geopos 获取城市的经度与纬度

127.0.0.1:6379> geopos china:city beijing
1) 1) "116.39999896287918091"
   2) "39.90000009167092543"

GEODIST 返回两个给定位置之间的距离。

如果两个位置之间的其中一个不存在, 那么命令返回空值。指定单位的参数 unit 必须是以下单位的其中一个:

  • m 表示单位为米。 默认
  • km 表示单位为千米。
  • mi 表示单位为英里。
  • ft 表示单位为英尺。
127.0.0.1:6379> geodist china:city beijing  shanghai
"1067378.7564"
# 改变单位
127.0.0.1:6379> geodist china:city beijing  shanghai km  
"1067.3788"

GEORADIUS :以给定的经纬度为中心,找出某一半径的所有位置元素。应用场景(附近的人) georadius 命令-----详情点我

127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km
# 110 30 模拟定位的经纬度  以当前位置查找半径在1000km以内的城市

GEORADIUSBYMEMBER 中心点是由给定的位置元素决定的

127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 2000 km
1) "shanghai"
2) "beijing"

Hyperloglog

基数:两个集合中不重复的元素

应用:可以使用它用作网站统计的访问量,传统方式是使用set来统计它的元素作为判断 ,但是仅仅只因为计数还要保存用户的信息、浪费内存;如果可以忽略容错,可以使用它完成

127.0.0.1:6379> pfadd set a s f e g y  # 添加元素
(integer) 1
127.0.0.1:6379> PFCOUNT set   # 获取元素个数
(integer) 6
127.0.0.1:6379> PFMERGE set1 set set2  # 合并集合
OK

Bitmap

bitmaps(位图)也是一种数据结构,操作二进制位来进行记录,只有0与1两种状态

使用bitmap的场景有很多:例如 统计(活跃与不活跃)用户的信息、是否登录、打卡…只要是两个状态的,都可以使用bitmap解决

# 假设打卡场景 0:星期一     1:星期二  以此类推
127.0.0.1:6379> setbit sign 0 1   # 星期一成功打卡
(integer) 0
127.0.0.1:6379> setbit sign 1 0   # 星期二未打卡
(integer) 0

127.0.0.1:6379> getbit sign 0   # 检查某天的打卡清空
(integer) 1

127.0.0.1:6379> BITCOUNT sign   # 统计打卡记录
(integer) 1

基本的事务操作

事务我们在MysQL这章节开始了解,事务有4个原则; 在Redis中事务中的所有命令都会被序列化 且会按照顺序执行

Redis的事务,不能保证原子性

Redis事务也没有隔离级别的概念 只有发起执行命令时才会执行

操作步骤:

  1. 开启事务(multi
  2. 命令入队
  3. 执行事务(exec

执行事务

# 开启事务
127.0.0.1:6379> multi
OK
# 命令入队
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
# 执行事务
127.0.0.1:6379(TX)> exec
1) OK
2) OK

取消事务

# 开启事务
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k2 v2
QUEUED
# 取消事务
127.0.0.1:6379(TX)> DISCARD
OK
# 事务中的命令没有被执行
127.0.0.1:6379> get k2  
(nil)

在事务中出现错误情况分为两种 类比java

  • 编译时异常(命令有误):事务中的命令是不会被执行的
  • 运行时异常:如果事务中出现语义性异常时,其他命令可以正常执行

乐观锁

Reids使用watch(监视)来实现乐观锁

127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money  # 监视 money
OK
127.0.0.1:6379> multi   #事务正常结束,期间未发生变动、正常结束
OK
127.0.0.1:6379(TX)> decrby money 10
QUEUED
127.0.0.1:6379(TX)> incrby out 10
QUEUED
127.0.0.1:6379(TX)> exec  
1) (integer) 90
2) (integer) 10

如果事务执行失败 先使用unwatch解锁 然后获取最新的值再次监视

Jedis

什么是Jedis ?

Redis官方推荐的java连接开发工具!使用lava操作Redis 中间件!如果你要使用java操作redis,那么一定要对Jedis十分的熟悉!

  1. 导入依赖
        <!-- jedis-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.6.1</version>
        </dependency>
        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>
  1. 编码测试
  • 连接数据库
  • 操作命令
  • 关闭连接

测试连接

    public static void main(String[] args) {
        //1. new jedis对象
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        //jedis 所有指令就是上述学的指令
        System.out.println(jedis.ping());
    }

常用API
image-20210815162034331
String
image-20210815162307217
image-20210815162332587
List

Set
图片无法显示

Hash
image-20210815164553046
事务
image-20210815165024228

整合SpringBoot

相关配置

  • 配置
    image-20210815172723410
  • 源码
    @Bean
    @ConditionalOnMissingBean( name = {"redisTemplate"})
    @ConditionalOnSingleCandidate(RedisConnectionFactory.class)
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
      //默认的 RedisTemplate 没有过多的设置,redis对象都是需要序列化!
      //两个泛型都是 object,object的类型,我们后使用需要强制转换<String,object>
        RedisTemplate<Object, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

//由于string 是redis中最常使用的类型,所以说单独提出来了一个bean !
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnSingleCandidate(RedisConnectionFactory.class)
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

测试

  1. 导入依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
  1. 配置
## redis配置
spring.redis.host=127.0.0.1
spring.redis.port=6379
  1. 简单测试
    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    public void test01() {
        //操作字符串 类似String
        redisTemplate.opsForValue().set("key", "qiandu");
        System.out.println(redisTemplate.opsForValue().get("key"););
        //获取redis的连接对象 ,用的少
//        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
//        connection.flushAll();
    }

序列化

image-20210815175852670
image-20210815175923405
它默认的序列化方式是jdk序列化,真实开发一般使用json传递数据、我们需要使用json方式序列化,我们可编写配置来尽可能的满足开发

        User user = new User(10, "qd666", "123456", 1);
        String jsonUser=new ObjectMapper.writeValueAsString(user);
        redisTemplate.opsForValue().set("user",jsonUser);
        redisTemplate.opsForValue().get("user");

在企业的开发中 pojo类一般都会实现序列化接口

自定义RedisTemplate

image-20210815181949243
自己编写一个模板,可以拿去直接使用

@Configuration
public class ReidsConfig {
    //编写配置类
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        //json序列化配置
        Jackson2JsonRedisSerializer Jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        Jackson2JsonRedisSerializer.setObjectMapper(om);
        //String 的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        //key使用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //hash使用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //value采用的是Jackson
        template.setValueSerializer(Jackson2JsonRedisSerializer);
        //hash的value采用的是Jackson
        template.setDefaultSerializer(Jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}

在开发中,直接使用原生的api太麻烦了,我们可以编写RedisUtil来操作


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================

    /**
     * 指定缓存失效时间
     *
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     *
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /**
     * 判断key是否存在
     *
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除缓存
     *
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }


    // ============================String=============================

    /**
     * 普通缓存获取
     *
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     *
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 普通缓存放入并设置时间
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 递增
     *
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /**
     * 递减
     *
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }


    // ================================Map=================================

    /**
     * HashGet
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     *
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     *
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * HashSet 并设置时间
     *
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }


    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /**
     * 根据key获取Set中的所有值
     *
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================

    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */

    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }

    }

}

Redis.conf详解

这个文件我们再熟悉不过了,我们启动reids就需要这个配置文件

单位

配置文件 units单位对大小写不敏感

包含

# include .\path\to\local.conf
# include c:\path\to\other.conf

可以把多个配置文件配置一起

网络 NETWORK

bind 127.0.0.1  # 绑定的IP
protected-mode yes  # 保证安全性
port 6379  #端口号

通用配置 GENERAL

daemonized  yes  #以守护进程的方式运行
pidfile /var/run/redis_6379.pid  # 如果以后台的方式运行,我们就需要指定一个 pid 文件!

# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel notice  # 日志级别

logfile ""   # 日志的文件位置名
databases 16 # 数据库数量

快照 SNAPSHOTTING

​ 持久化,在规定的时间内,执行了多少次操作,则会持久化到文件.rdb .aof

# 如果900s内,如果至少有一个1 key进行了修改,我们及进行持久化操作
save 900 1
# 如果300s内,如果至少有一个10 key进行了修改,我们及进行持久化操作
save 300 10
# 如果60s内,如果至少有一个10000 key进行了修改,我们及进行持久化操作
save 60 10000

stop-writes-on-bgsave-error yes  # 持久化错误是否继续工作
rdbcompression yes  # 是否压缩rdb文件、需要消耗cpu资源
rdbchecksum yes   # 是否校验rdb文件
dir ./  # rdb文件保存目录

复制 REPLICATION

# slaveof <masterip> <masterport>   # 配置主机信息
# masterauth <master-password>  # 主机密码

安全 SECURITY

# requirepass foobared  # 默认没有密码
## 设置密码
## 方式1
requirepass 123456  # 修改配置文件
## 方式2  命令行
config get  requirepass  # 获取密码
config set  requirepass 123456 # 设置密码
auth 123456 # 验证

限制 LIMITS

# maxclients 10000    # 最大客户端数
# maxmemory <bytes>  # 最大内存设置
# maxmemory-policy noeviction   # 内存上限处理策略  看下图


image-20210815201803603

APPEND ONLY MODE aof的配置

appendonly no  # 默认不开启
appendfilename "appendonly.aof"  # 文件名

# appendfsync always  # 每次修改都会 sync。消耗性能
appendfsync everysec  # 每秒执行一次 sync,可能会丢失这1s的数据!
# appendfsync no   #  不执行sync,这个时候操作系统自己同步数据,速度最快!

Redis持久化

面试和工作,持久化都是重点!

Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能!

RDB

image-20210816080139349

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。

Redis会单独创建 ( fork )一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何I0操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。

rdb保存的文件是dump.rdb 在生产环境我们会将这个文件进行备份

触发规则

  1. save的规则满足的情况下,会自动触发rdb规则
  2. 执行flushall命令,也会触发我们的rdb规则
  3. 退出redis,也会产生rdb文件!

恢复文件

  1. 只需要将rdb文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb恢复其中的数据
  2. 查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/www/server/redis"   # 如果在这个目录下存在 dump .rdb文件,启动就会自动恢复其中的数据

优点

  1. 适合大规模的数据恢复
  2. 对数据的完整性要不高

缺点

  1. 需要一定的时间间隔进程操作!如果redis意外宕机了,这个最后一次修改数据就没有的了!
  2. fork进程的时候,会占用一定的内容空间!!

AOF

意为追加文件,将所有的命令全部记录下来,恢复的时候将命令全部执行一遍

image-20210816082432980

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

Aof保存的是appendonly.aof文件

默认是不开启的,我们需要手动进行配置!我们只需要将appendorly改为yes就开启了aof ! 重启reids即可以生效

如果appendonly.aof 被修改,可以使用以下命令恢复

redis-check-aof - -fix appendonly.aof

优点

  1. 每次修改都同步,保证完整性

缺点

  1. 相对于数据文件来说,aof远远大于rdb,修复的速度也比 rdb慢!
  2. Aof运行效率也要比 rdb 慢,所以我们redis默认的配置就是rdb持久化

扩展

image-20210816084455057
image-20210816084522437

Redis主从复制

概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(masterleader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave以读为主。

默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

作用:

  1. 数据冗余︰主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
  2. 故障恢复∶当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
  3. 负载均衡∶在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载﹔尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
  4. 高可用基石∶除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。

一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的,原因如下︰

1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;

2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。

架构

image-20210816160756932

环境配置

查看配置信息

127.0.0.1:6379> info replication
# Replication
role:master  # 角色
connected_slaves:0  # 连接从机的个数
master_failover_state:no-failover
master_replid:3f55769ced26d9c31df8a1568e7653195933bf34
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

配置从机

127.0.0.1:6380> slaveof 127.0.0.1 6379   # 选择主机
ok  

真实的从主配置应该在配置文件中配置,这样的话是永久的 主机可以写,从机不能写只能读!主机中的所有信息和数据,都会自动被从机保存

复制原理

Slave启动成功连接到master后会发送一个sync同步命令

Master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制︰而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步

但是只要是重新连接master,一次完全同步(全量复制)将被自动执行

哨兵模式

概述

主从切换技术的方法是∶当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel (哨兵)架构来解决这个问题。

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

基本模型:

image-20210816172945862

这里的哨兵有两个作用:

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。

image-20210816173227206

假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

配置

  1. 配置哨兵配置文件 sentinel.conf
# # sentinel monitor  被监控的名称 host  port  1:票数最多的,就会成为主机
sentinel monitor myredis 127.0.0.1 6379 1
  1. 启动哨兵
redis-sentinel qdconfig/sentinel.conf

如果主机断掉了,哨兵会根据它的算法自动把一个从机变成主机;如果之前的主机恢复了做从机。

优点

  1. 哨兵集群,基于主从复制模式,所有的主从配置优点,它全有
  2. 主从可以切换,故障可以转移,系统的可用性就会更好
  3. 哨兵模式就是主从模式的升级,手动到自动,更加健壮!

缺点

  1. Redis不好啊在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦!
  2. 实现哨兵模式的配置其实是很麻烦的,里面有很多选择! 哨兵模式配置文件

Redis缓存穿透和雪崩

Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案

缓存穿透

概念

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

解决方案

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力﹔

image-20210816180316175

缓存空对象

当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;

image-20210816180433712

但是这种方法会存在两个问题:

1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;

2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。

缓存击穿

概述

这里需要注意和缓存击穿的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。

当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。

解决方案

设置热点数据永不过期

从缓存层面来看,没有设置过期时间,所以不会出现热点 key过期后产生的问题。

加互斥锁

分布式锁∶使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

缓存雪崩

概念

缓存雪崩,是指在某一个时间段,缓存集中过期失效。比如Redis宕机

产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

image-20210816181112363

其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。

解决方案

redis高可用

这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。

限流降级

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。

数据预热

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。

该笔记基于狂神的视频讲解所写 视频链接:【狂神说Java】Redis_哔哩哔哩


The End~~

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐