Nosql概述

什么是Nosql

Not Only Structured Query Language

关系型数据库:列+行,同一个表下数据的结构是一样的。

非关系型数据库:数据存储没有固定的格式,并且可以进行横向扩展。

NoSQL泛指非关系型数据库,随着web2.0互联网的诞生,传统的关系型数据库很难对付web2.0时代!尤其是超大规模的高并发的社区,暴露出来很多难以克服的问题,NoSQL在当今大数据环境下发展的十分迅速,Redis是发展最快的。

Nosql特点

  1. 方便扩展(数据之间没有关系,很好扩展!)

  2. 大数据量高性能(Redis一秒可以写8万次,读11万次,NoSQL的缓存记录级,是一种细粒度的缓存,性能会比较高!)

  3. 数据类型是多样型的!(不需要事先设计数据库,随取随用)

  4. 传统的 RDBMS 和 NoSQL

传统的 RDBMS(关系型数据库)
- 结构化组织
- SQL
- 数据和关系都存在单独的表中 row col
- 操作,数据定义语言
- 严格的一致性
- 基础的事务
- ...
Nosql
- 不仅仅是数据
- 没有固定的查询语言
- 键值对存储,列存储,文档存储,图形数据库(社交关系)
- 最终一致性
- CAP定理和BASE
- 高性能,高可用,高扩展
- ...

了解:3V + 3高

大数据时代的3V :主要是描述问题

海量Velume

多样Variety

实时Velocity

大数据时代的3高 : 主要是对程序的要求

高并发

高可扩

高性能

真正在公司中的实践:NoSQL + RDBMS 一起使用才是最强的。

阿里巴巴演进分析

推荐阅读:阿里云的这群疯子https://yq.aliyun.com/articles/653511

image

image

# 商品信息
- 一般存放在关系型数据库:Mysql,阿里巴巴使用的Mysql都是经过内部改动的。

# 商品描述、评论(文字居多)
- 文档型数据库:MongoDB

# 图片
- 分布式文件系统 FastDFS
- 淘宝:TFS
- Google: GFS
- Hadoop: HDFS
- 阿里云: oss

# 商品关键字 用于搜索
- 搜索引擎:solr,elasticsearch
- 阿里:Isearch 多隆

# 商品热门的波段信息
- 内存数据库:Redis,Memcache

# 商品交易,外部支付接口
- 第三方应用

Nosql的四大分类

KV键值对

  • 新浪:Redis
  • 美团:Redis + Tair
  • 阿里、百度:Redis + Memcache

文档型数据库(bson数据格式):

  • MongoDB(掌握)

基于分布式文件存储的数据库。C++编写,用于处理大量文档。 MongoDB是RDBMS和NoSQL的中间产品。MongoDB是非关系型数据库中功能最丰富的,NoSQL中最像关系型数据库的数据库。

  • ConthDB

列存储数据库

  • HBase(大数据必学)
  • 分布式文件系统

图关系数据库

用于广告推荐,社交网络

  • Neo4j、InfoGrid

    image.png

Redis 入门

redis中文官网

概述

Redis是什么?

Redis(Remote Dictionary Server ),即远程字典服务。

是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis能该干什么?

  1. 内存存储、持久化,内存是断电即失的,所以需要持久化(RDB、AOF)
  2. 高效率、用于高速缓冲
  3. 发布订阅系统
  4. 地图信息分析
  5. 计时器、计数器(eg:浏览量)
  6. 。。。

特性

  • 多样的数据类型

  • 持久化

  • 集群

  • 事务

  • ... …

环境搭建

官网:https://redis.io/

推荐使用Linux服务器学习。

windows版本的Redis已经停更很久了,最近有热心志愿者更新

Windows安装

https://github.com/dmajkic/redis

  1. 解压安装包 image

  2. 开启redis-server.exe

  3. 启动redis-cli.exe测试

image

Linux安装

  1. 下载安装包!redis-5.0.8.tar.gz

  2. 解压Redis的安装包!程序一般放在 /opt 目录下 我的在Document里

image

  1. 基本环境安装
yum install gcc-c++
# 然后进入redis目录下执行
# 如果安装redies6.X 版本 , 出现error错误 , 重新回退到5.X版本 ,
## 或者安装一个依赖  yum install tcl  出错是因为6.X的redis需要5.3以上的版本的gcc 
make
# 然后执行
make install

image

  1. redis默认安装路径 /usr/local/bin image

  2. 将redis的配置文件(redis里的redis.conf)复制到 程序安装目录 /usr/local/bin/kconfig下 

image

  1. redis默认不是后台启动的,需要修改配置文件!用vim redis.conf  打开配置文件

image

  1. 通过制定的配置文件启动redis服务   redis-server kconfig/redis.conf

image

  1. 使用redis-cli连接指定的端口号测试,Redis的默认端口6379

image

  1. 查看redis进程是否开启

image

  1. 关闭Redis服务 shutdown  断开连接 exit退出

image

redis-benchmark性能测试

redis-benchmark 是一个官方自带的压力测试工具,检测性能,速度。

image

 简单测试

在开启redis服务后 
测试100个并发连接 共100000请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000

image

基础知识

redis默认有16个数据库

image

默认使用的第0个;

16个数据库为:DB 0~DB 15 默认使用DB 0 ,可以使用select n切换到DB n,dbsize可以查看当前数据库的大小,与key数量相关。

127.0.0.1:6379> config get databases # 命令行查看数据库数量databases
1) "databases"
2) "16"

127.0.0.1:6379> select 8 # 切换数据库 DB 8
OK
127.0.0.1:6379[8]> dbsize # 查看数据库大小
(integer) 0

# 不同数据库之间 数据是不能互通的,并且dbsize 是根据库中key的个数。
127.0.0.1:6379> set name sakura 
OK
127.0.0.1:6379> SELECT 8
OK
127.0.0.1:6379[8]> get name # db8中并不能获取db0中的键值对。
(nil)
127.0.0.1:6379[8]> DBSIZE
(integer) 0
127.0.0.1:6379[8]> SELECT 0
OK
127.0.0.1:6379> keys *
1) "counter:__rand_int__"
2) "mylist"
3) "name"
4) "key:__rand_int__"
5) "myset:__rand_int__"
127.0.0.1:6379> DBSIZE # size和key个数相关
(integer) 5

keys * :查看当前数据库中所有的key。

flushdb:清空当前数据库中的键值对。

flushall:清空所有数据库(16)的键值对。

Redis是单线程的(6版本是多线程),Redis是基于内存操作的。

所以Redis的性能瓶颈不是CPU,而是机器内存和网络带宽。

那么为什么Redis的速度如此快呢,性能这么高呢?QPS达到10W+

Redis为什么单线程还这么快?

  • 误区1:高性能的服务器一定是多线程的?
  • 误区2:多线程(CPU上下文会切换!)一定比单线程效率高!

核心:Redis是将所有的数据放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!),对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都是在一个CPU上的,在内存存储数据情况下,单线程就是最佳的方案。

五大数据类型

Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。它支持字符串、哈希表、列表、集合、有序集合,位图,hyperloglogs等数据类型。内置复制、Lua脚本、LRU收回、事务以及不同级别磁盘持久化功能,同时通过Redis Sentinel提供高可用,通过Redis Cluster提供自动分区。

Redis-key

在redis中无论什么数据类型,在数据库中都是以key-value形式保存,通过进行对Redis-key的操作,来完成对数据库中数据的操作。

下面学习的命令:

exists key:判断键是否存在

del key:删除键值对

move key db:将键值对移动到指定数据库

expire key second:设置键值对的过期时间

type key:查看value的数据类型

 测试代码

127.0.0.1:6379> keys *    显示所有的key
1) "key:__rand_int__"
2) "myset:__rand_int__"
3) "mylist"
4) "counter:__rand_int__"
127.0.0.1:6379> set name smile
OK
127.0.0.1:6379> keys *
1) "key:__rand_int__"
2) "myset:__rand_int__"
3) "counter:__rand_int__"
4) "mylist"
5) "name"
127.0.0.1:6379> move name 1   将name移动到1号数据库,默认0号
(integer) 1
127.0.0.1:6379> SELECT 1
OK
127.0.0.1:6379[1]> keys *
1) "name"
127.0.0.1:6379[1]> flashdb   清空当前数据库
(error) ERR unknown command `flashdb`, with args beginning with: 
127.0.0.1:6379[1]> flushdb
OK
127.0.0.1:6379[1]> keys *
(empty list or set)
127.0.0.1:6379[1]> SELECT 0
OK
127.0.0.1:6379> set name smile
OK
127.0.0.1:6379> EXISTS name    判断key是否存在
(integer) 1
127.0.0.1:6379> EXPIRE name 10  设置name过期时间
(integer) 1
127.0.0.1:6379> ttl name    查看name过期时间
(integer) 5
127.0.0.1:6379> ttl name
(integer) 4
127.0.0.1:6379> ttl name
(integer) 3
127.0.0.1:6379> ttl name
(integer) 2
127.0.0.1:6379> ttl name
(integer) 2
127.0.0.1:6379> ttl name
(integer) 1
127.0.0.1:6379> ttl name
(integer) 0
127.0.0.1:6379> ttl name
(integer) -2
127.0.0.1:6379> EXISTS name 
(integer) 0
127.0.0.1:6379> keys *
1) "key:__rand_int__"
2) "myset:__rand_int__"
3) "counter:__rand_int__"
4) "mylist"
127.0.0.1:6379> TYPE name   判断name数据类型
none
127.0.0.1:6379> set name smile
OK
127.0.0.1:6379> TYPE name
string

关于TTL命令

Redis的key,通过TTL命令返回key的过期时间,一般来说有3种:

  1. 当前key没有设置过期时间,所以会返回-1.
  2. 当前key有设置过期时间,而且key已经过期,所以会返回-2.
  3. 当前key有设置过期时间,且key还没有过期,故会返回key的正常剩余时间.

关于重命名RENAME和RENAMENX

  • RENAME key newkey修改 key 的名称
  • RENAMENX key newkey仅当 newkey 不存在时,将 key 改名为 newkey 。

更多命令学习:https://www.redis.net.cn/order/

String

========================================================
127.0.0.1:6379> get name     
"smile"
127.0.0.1:6379> APPEND name "hello"    在name后追加字符串
(integer) 10
127.0.0.1:6379> STRLEN name            name的长度
(integer) 10
127.0.0.1:6379> get name
"smilehello"
127.0.0.1:6379> APPEND age 12          追加的字符串是原来没有的会自动创建
(integer) 2
127.0.0.1:6379> EXISTS age              判断age是否存在
(integer) 1
========================================================
类似java中的i++
设置步长            文章浏览量的原理
127.0.0.1:6379> set count 0            
OK
127.0.0.1:6379> get count
"0"
127.0.0.1:6379> INCR count        自增1
(integer) 1
127.0.0.1:6379> INCR count
(integer) 2
127.0.0.1:6379> get count
"2"
127.0.0.1:6379> decr count        自减1
(integer) 1
127.0.0.1:6379> decr count
(integer) 0
127.0.0.1:6379> decr count
(integer) -1
127.0.0.1:6379> get count
"-1"
127.0.0.1:6379> incrby count 5     自增加5
(integer) 4
127.0.0.1:6379> incrby count 5
(integer) 9
127.0.0.1:6379> decrby count 10    自减10
(integer) -1
========================================================
截取字符串 range
127.0.0.1:6379> GETRANGE name 0 4      获取指定位置字符串
"smile"
127.0.0.1:6379> set name smile,hello
OK
127.0.0.1:6379> get name
"smile,hello"
127.0.0.1:6379> setrange name 6 zhangsan   替换指定位置开始的字符串
(integer) 14
127.0.0.1:6379> get name
"smile,zhangsan"

========================================================
127.0.0.1:6379> setex key1 10 zhangsan     设置过期时间和expire类似
OK
127.0.0.1:6379> ttl key1
(integer) 7
127.0.0.1:6379> get key1
"zhangsan"
127.0.0.1:6379> ttl key1
(integer) -2
127.0.0.1:6379> setnx key1 zhangsan        设置值和set类似,不同的是不能覆盖
(integer) 1
127.0.0.1:6379> get key1
"zhangsan"
127.0.0.1:6379> setnx key1 wanwu
(integer) 0

setex和setnx会在分布式中使用,具有原子性  setnx作用应该是避免覆盖
========================================================
127.0.0.1:6379> mset k1 v1 k2 v2   同时设置多个值
OK
127.0.0.1:6379> mget k1 v1
1) "v1"
2) (nil)
127.0.0.1:6379> mget k1 k2         同时获取多个值
1) "v1"
2) "v2"
127.0.0.1:6379> msetnx k1 v1 k4 k4       k1已经存在k4不存在,因为msetnx具有原子性,即一个事                            
(integer) 0                              务,同时成功或者同时失败
127.0.0.1:6379> get k4
(nil)
========================================================
user:1 {"name":smile,"age":12}   设置一个user:1对象  值为json
redis中设计key   user:{id}:{filed}

127.0.0.1:6379> mset user:1:name smile user:1:age 12 user:2:name zhangsan user:2:age 18
OK
127.0.0.1:6379> mget user:1:name user:1:age user:2:name user:2:age
1) "smile"
2) "12"
3) "zhangsan"
4) "18"

========================================================
getset 先get后set,返回set的值
127.0.0.1:6379> get name
"smile"
127.0.0.1:6379> getset name smlie2
"smile"
127.0.0.1:6379> get name
"smlie2"
========================================================

String的操作场景

统计数,粉丝数,浏览量

List类型

##########################################################################
127.0.0.1:6379> LPUSH list one # 将一个值或者多个值,插入到列表头部 (左)
(integer) 1
127.0.0.1:6379> LPUSH list two
(integer) 2
127.0.0.1:6379> LPUSH list three
(integer) 3
127.0.0.1:6379> LRANGE list 0 -1 # 获取list中值!
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> LRANGE list 0 1 # 通过区间获取具体的值!
1) "three"
2) "two"
127.0.0.1:6379> Rpush list righr # 将一个值或者多个值,插入到列表尾部 (右)
(integer) 4
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
3) "one"
4) "righr"
##########################################################################
127.0.0.1:6379> lpush list 1 2 3     向list添加一个或者多个元素
(integer) 3
127.0.0.1:6379> lpop list            从list左取值
"3"
127.0.0.1:6379> rpop list            从list右取值
"1"
##########################################################################
lindex 获取指定下标的值

127.0.0.1:6379> lrange list 0 -1     遍历list
1) "2"
127.0.0.1:6379> lindex list 0
"2"
##########################################################################
127.0.0.1:6379> lrange list 0 -1     遍历list
1) "3"
2) "2"
3) "1"
4) "2"
127.0.0.1:6379> llen list            返回list集合长度
(integer) 4
##########################################################################
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "2"
3) "1"
4) "2"
127.0.0.1:6379> llen list
(integer) 4
127.0.0.1:6379> lrem list 2 2        移除指定值的特定个数
(integer) 2
127.0.0.1:6379> lrange list 0 -1      遍历
1) "3"
2) "1"
127.0.0.1:6379> lrem list 1 3         移除list中元素3共一个
(integer) 1
127.0.0.1:6379> 
##########################################################################
ltrim 截取
127.0.0.1:6379> LRANGE list 0 -1       遍历list
1) "3"
2) "2"
3) "2"
4) "1"
127.0.0.1:6379> LTRIM list 1 2         截取指定位置字符
OK
127.0.0.1:6379> LRANGE list 0 -1
1) "2"
2) "2"
##########################################################################
127.0.0.1:6379> lrange list 0 -1
1) "1"
2) "2"
3) "3"
127.0.0.1:6379> rpoplpush list list2   将最右边的元素取出添加到list2中
"3"
127.0.0.1:6379> keys *
1) "list"
2) "list2"
127.0.0.1:6379> lrange list2 0 -1       查看新列表中元素
1) "3"
##########################################################################
lset在已经存在的list中按下标设置值
127.0.0.1:6379> EXISTS list        判断list是否存在
(integer) 1
127.0.0.1:6379> lrange list 0 -1    遍历list
1) "1"
127.0.0.1:6379> lset list 0 val1    将第一个元素设置成val1
OK
127.0.0.1:6379> lrange list 0 -1
1) "val1"
127.0.0.1:6379> lset list 1 val2    由于没有第二个元素,不能通过lset设置值
(error) ERR index out of range
##########################################################################
linsert 将某个值插入到元素的前面或者后面
127.0.0.1:6379> lrange list 0 -1
1) "val2"
2) "val1"
127.0.0.1:6379> LINSERT list before val2 val3       val2左边添加val3
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "val3"
2) "val2"
3) "val1"
127.0.0.1:6379> LINSERT list after val2 val4
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "val3"
2) "val2"
3) "val4"
4) "val1"
127.0.0.1:6379> 
##########################################################################


小结

  • 他实际上是一个双向链表,before Node after , left,right 都可以插入值
  • 如果key 不存在,创建新的链表
  • 如果key存在,新增内容
  • 如果移除了所有值,空链表,也代表不存在!
  • 在两边插入或者改动值,效率最高! 中间元素,相对来说效率会低一点~

实现

消息排队!消息队列 (Lpush Rpop), 栈( Lpush Lpop)!

set

127.0.0.1:6379> sadd myset hello        向set中添加
(integer) 1
127.0.0.1:6379> sadd myset smile
(integer) 1
127.0.0.1:6379> SMEMBERS myset            遍历
1) "hello"
2) "smile"
127.0.0.1:6379> SISMEMBER myset hello        判断set中是否存在hello
(integer) 1
127.0.0.1:6379> SISMEMBER myset 2
(integer) 0
==============================================================
统计个数scard
127.0.0.1:6379> SCARD myset         统计元素个数
(integer) 2
==============================================================
127.0.0.1:6379> sadd myset hello    添加不可重复
(integer) 0
127.0.0.1:6379> srem myset hello    移除元素
(integer) 1
127.0.0.1:6379> SCARD myset         统计个数
(integer) 1
127.0.0.1:6379> SMEMBERS myset        遍历
1) "smile"
127.0.0.1:6379> sadd myset hello        
(integer) 1
127.0.0.1:6379> sadd myset water
(integer) 1
127.0.0.1:6379> SRANDMEMBER myset    随机取值
"smile"
127.0.0.1:6379> SRANDMEMBER myset
"smile"
127.0.0.1:6379> SRANDMEMBER myset
"smile"
127.0.0.1:6379> SRANDMEMBER myset
"water"
127.0.0.1:6379> SRANDMEMBER myset
"smile"
==============================================================
微博中的共同关注
127.0.0.1:6379> sadd myset2 she        
(integer) 1
127.0.0.1:6379> SMOVE myset myset2 smile    将myset中smile移动到myset2
(integer) 1
127.0.0.1:6379> SMEMBERS myset               遍历             
1) "water"
2) "hello"
127.0.0.1:6379> SMEMBERS myset2
1) "she"
2) "smile"
127.0.0.1:6379> sadd myset smile
(integer) 1
127.0.0.1:6379> SDIFF myset myset2        两者的差异,显示myset的
1) "hello"
2) "water"
127.0.0.1:6379> SDIFF myset2 myset        两者的差异,显示myset2的
1) "she"
127.0.0.1:6379> SINTER myset myset2        两者的交集
1) "smile"
127.0.0.1:6379> SUNION myset myset2        两者的并集
1) "she"
2) "hello"
3) "water"
4) "smile"
==============================================================
127.0.0.1:6379> spop myset        随机删除
"smile"
127.0.0.1:6379> spop myset        
"hello"    
127.0.0.1:6379> SMEMBERS myset
1) "water"

Hash

Map集合,key-map! 即key-<key,val>时候这个值是一个map集合! 本质和String类型没有太大区别,还是一个简单的 key-vlaue! 

127.0.0.1:6379> hset myhash name smile        key:myhash-name value:smile
(integer) 1
127.0.0.1:6379> hset myhash name zhangsan    在此设置值会覆盖
(integer) 0
127.0.0.1:6379> hset myhash age 12            
(integer) 1
127.0.0.1:6379> hget myhash name
"zhangsan"
127.0.0.1:6379> hget myhash age
"12"
127.0.0.1:6379> hmset key val key2 val2        设置多个值。只能在原有的hash上设置
(error) ERR wrong number of arguments for HMSET
127.0.0.1:6379> hmset myhash name smile age 1    
OK
127.0.0.1:6379> hmget myhash name age            一次取多个值
1) "smile"
2) "1"
=========================================================
127.0.0.1:6379> hgetall myhash            获取全部的k-v键值对
1) "name"
2) "smile"
3) "age"
4) "1"
127.0.0.1:6379> hlen myhash                获取长度
(integer) 2
127.0.0.1:6379> hdel myhash age            删除指定的
(integer) 1
127.0.0.1:6379> hlen myhash
(integer) 1
==============================================================
127.0.0.1:6379> HEXISTS myhash name         判断指定字段是否存在
(integer) 1
127.0.0.1:6379> HEXISTS myhash age
(integer) 0
==============================================================
127.0.0.1:6379> hset myhash age 12
(integer) 1
127.0.0.1:6379> hkeys myhash        获取全部的key
1) "name"
2) "age"
127.0.0.1:6379> hvals myhash        获取全部的val
1) "smile"
2) "12"
==============================================================
127.0.0.1:6379> HINCRBY myhash age 2        设置增量
(integer) 14
127.0.0.1:6379> HINCRBY myhash age 2
(integer) 16
127.0.0.1:6379> HINCRBY myhash age -2
(integer) 14

hash变更的数据 user name age,尤其是是用户信息之类的,经常变动的信息! hash 更适合于对象的 存储,String更加适合字符串存储!

Zset

在set集合的基础上添加序号,变成有序的集合

127.0.0.1:6379> zadd name 1 smile            添加
(integer) 1
127.0.0.1:6379> zadd name 3 zhangsan
(integer) 1
127.0.0.1:6379> zadd name 2 lisi
(integer) 1
127.0.0.1:6379> zrange name 0 -1            排序,默认从小到大
1) "smile"
2) "lisi"
3) "zhangsan"
127.0.0.1:6379> zrange name 0 -1 withscores    带上序号
1) "smile"
2) "1"
3) "lisi"
4) "2"
5) "zhangsan"
6) "3"
127.0.0.1:6379> zadd money 2000 zhangsan        
(integer) 1
127.0.0.1:6379> zadd money 200 lisi
(integer) 1
127.0.0.1:6379> zadd money 6000 smile
(integer) 1
127.0.0.1:6379> zrange money 0 -1            默认从小到大
1) "lisi"
2) "zhangsan"
3) "smile"
127.0.0.1:6379> zrangebyscore money -inf +inf withscores        可以设置左右边界
1) "lisi"
2) "200"
3) "zhangsan"
4) "2000"
5) "smile"
6) "6000"
127.0.0.1:6379> zrangebyscore money -inf 5000 withscores
1) "lisi"
2) "200"
3) "zhangsan"
4) "2000"
127.0.0.1:6379> ZREVRANGE name 0 -1            从大到小
1) "zhangsan"
2) "smile"
=============================================================================
删除元素
127.0.0.1:6379> zrange name 0 -1        遍历
1) "smile"
2) "lisi"
3) "zhangsan"
127.0.0.1:6379> zrem name lisi        移除lisi
(integer) 1
127.0.0.1:6379> zrange name 0 -1
1) "smile"
2) "zhangsan"
=============================================================================
集合大小
127.0.0.1:6379> zcard name
(integer) 2
127.0.0.1:6379> zcard money
(integer) 3

工资表,成绩表,排行榜的实现

三种特殊类型

Geospatial 地理位置

朋友的定位,附近的人,打车距离计算? Redis 的 Geo 在Redis3.2 版本就推出了! 这个功能可以推算地理位置的信息,两地之间的距离,方圆 几里的人! 可以查询一些测试数据:http://www.jsons.cn/lngcodeinfo/0706D99C19A781A3/ 只有 六个命令:

官方文档:https://www.redis.net.cn/order/3685.html

# geoadd 添加地理位置
# 规则:两级无法直接添加,我们一般会下载城市数据,直接通过java程序一次性导入!
# 有效的经度从-180度到180度。
# 有效的纬度从-85.05112878度到85.05112878度。
# 当坐标位置超出上述指定范围时,该命令将会返回一个错误。
127.0.0.1:6379> geoadd city 116.40 39.90 beijing        添加坐标
(integer) 1
127.0.0.1:6379> geoadd city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geopos city beijing                    获取具体坐标
1) 1) "116.39999896287918091"
   2) "39.90000009167092543"
127.0.0.1:6379> geopos city shanghai                    
1) 1) "121.47000163793563843"    
   2) "31.22999903975783553"
127.0.0.1:6379> geopos city shanghai beijing
1) 1) "121.47000163793563843"
   2) "31.22999903975783553"
2) 1) "116.39999896287918091"
   2) "39.90000009167092543"
=============================================================================
    m 表示单位为米。
    km 表示单位为千米。
    mi 表示单位为英里。
    ft 表示单位为英尺。
127.0.0.1:6379> geodist city shanghai beijing        获取两个城市的直线距离
"1067378.7564"
127.0.0.1:6379> geodist city shanghai beijing km      距离添加单位
"1067.3788"
===========================================================================
127.0.0.1:6379> georadius city 120 33 50 km            坐标半径内元素
(empty list or set)
127.0.0.1:6379> georadius city 120 33 5000 km
1) "shanghai"
2) "beijing"
127.0.0.1:6379> georadius city 120 33 5000 km withdist    附加半径内两者的距离
1) 1) "shanghai"
   2) "240.6912"
2) 1) "beijing"
   2) "832.1011"
127.0.0.1:6379> georadius city 120 33 5000 km withdist withcoord    附加元素经纬度
1) 1) "shanghai"
   2) "240.6912"
   3) 1) "121.47000163793563843"
      2) "31.22999903975783553"
2) 1) "beijing"
   2) "832.1011"
   3) 1) "116.39999896287918091"
      2) "39.90000009167092543"
127.0.0.1:6379> georadius city 120 33 5000 km withdist withcoord count 1    显示个数
1) 1) "shanghai"
   2) "240.6912"
   3) 1) "121.47000163793563843"
      2) "31.22999903975783553"
======================================================================================
127.0.0.1:6379> georadiusbymember city beijing 1000 km        通过元素为圆心
1) "beijing"
127.0.0.1:6379> georadiusbymember city beijing 5000 km
1) "shanghai"
2) "beijing"
127.0.0.1:6379> GEOHASH city beijing            获取城市hash值
1) "wx4fbxxfke0"
=====================================================================================
GEO 底层的实现原理其实就是 Zset!我们可以使用Zset命令来操作geo!
127.0.0.1:6379> zrange city 0 -1
1) "shanghai"
2) "beijing"
127.0.0.1:6379> zrem city beijing
(integer) 1
127.0.0.1:6379> zrange city 0 -1
1) "shanghai"

注意

GEO 底层的实现原理其实就是 Zset!我们可以使用Zset命令来操作geo!

Hyperloglog

什么是基数?

A {1,3,5,7,8,7} B{1,3,5,7,8} 基数(不重复的元素) = 5,可以接受误差!

简介

Redis 2.8.9 版本就更新了 Hyperloglog 数据结构! Redis Hyperloglog 基数统计的算法! 优点:占用的内存是固定,2^64 不同的元素的技术,只需要废 12KB内存!如果要从内存角度来比较的 话 Hyperloglog 首选!

网页的 UV (一个人访问一个网站多次,但是还是算作一个人!)

传统的方式, set 保存用户的id,然后就可以统计 set 中的元素数量作为标准判断 !

这个方式如果保存大量的用户id,就会比较麻烦!我们的目的是为了计数,而不是保存用户id; 0.81% 错误率! 统计UV任务,可以忽略不计的!

用途:计数

127.0.0.1:6379> pfadd name a b c d a e            创建一个集合
(integer) 1
127.0.0.1:6379> PFCOUNT name                    拥挤个数
(integer) 5
127.0.0.1:6379> pfadd name2 z x v n 
(integer) 1
127.0.0.1:6379> pfmerge name3 name name2        两个集合的并集
OK
127.0.0.1:6379> pfcount name3
(integer) 9

Bitmap

这些在生活中或者开发中,都有十分多的应用场景,学习了,就是就 是多一个思路!

位存储

统计用户信息,活跃,不活跃! 登录 、 未登录! 打卡,365打卡! 两个状态的,都可以使用 Bitmaps! Bitmap 位图,数据结构! 都是操作二进制位来进行记录,就只有0 和 1 两个状态!

365 天 = 365 bit

1字节 = 8bit

46 个字节左右!

127.0.0.1:6379> setbit sign 0 1        设置值
(integer) 0
127.0.0.1:6379> setbit sign 1 1
(integer) 0
127.0.0.1:6379> setbit sign 2 1
(integer) 0
127.0.0.1:6379> setbit sign 3 0
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> getbit sign 4        获取值
(integer) 0
127.0.0.1:6379> getbit sign 2
(integer) 1
127.0.0.1:6379> bitcount sign         统计值
(integer) 3

事务

Redis 事务本质:一组命令的集合! 一个事务中的所有命令都会被序列化,在事务执行过程的中,会按 照顺序执行!

一次性、顺序性、排他性!执行一些列的命令!

------ 队列 set set set 执行------

Redis事务没有没有隔离级别的概念! 所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行!Redis单条命令式保存原子性的,但是事务不保证原子性!即事务中一条语句发生错误不会影响其他语句的处理。

redis的事务:

  • 开启事务(multi)
  • 命令入队(......)
  • 执行事务(exec)

正常执行事务!

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) OK
127.0.0.1:6379> get k2
"v2"
127.0.0.1:6379> get k3
"v3"

放弃事务

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> get k4
(nil)

编译时异常 

所以的语句都不会执行 

127.0.0.1:6379> multi
OK
127.0.0.1:6379> setget k2
(error) ERR unknown command `setget`, with args beginning with: `k2`, 
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k4
(nil)

运行时异常

出错语句无法执行


127.0.0.1:6379> multi 
OK
127.0.0.1:6379> incr k1
QUEUED
127.0.0.1:6379> get k1
QUEUED
127.0.0.1:6379> exec
1) (error) ERR value is not an integer or out of range
2) "v1"

监控! Watch (面试常问!)

悲观锁:

  • 很悲观,认为什么时候都会出问题,无论做什么都会加锁!

乐观锁:

  • 很乐观,认为什么时候都不会出问题,所以不会上锁! - - 更新数据的时候去判断一下,在此期间是否 有人修改过这个数据,
  • 获取version
  • 更新的时候比较 version

Redis测监视测试

正常执行成功

127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money # 监视 money 对象
OK
127.0.0.1:6379> multi # 事务正常结束,数据期间没有发生变动,这个时候就正常执行成功!
OK
127.0.0.1:6379> DECRBY money 20
QUEUED
127.0.0.1:6379> INCRBY out 20
QUEUED
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20

测试多线程修改值 , 使用watch 可以当做redis的乐观锁操作!

127.0.0.1:6379> watch money # 监视 money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> DECRBY money 10
QUEUED
127.0.0.1:6379> INCRBY out 10
QUEUED
127.0.0.1:6379> exec # 执行之前,另外一个线程,修改了我们的值,这个时候,就会导致事务执行失
败!
(nil)

 如果修改失败 unwatch解锁,获取最新的值就好

注意:每次提交执行exec后都会自动释放锁,不管是否成功 

Jedis

通过java操作redis,是java整合redis的中间件

测试

  1. 导入对应的依赖
<!--导入jedis的包-->
<dependencies>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
         <version>3.2.0</version>
    </dependency>
    <!--fastjson-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>
  1. 、编码测试:
  • 连接数据库
  • 操作命令
  • 断开连接!
package com.kuang;
import redis.clients.jedis.Jedis;

public class TestPing {
    public static void main(String[] args) {
        // 1、 new Jedis 对象即可
        Jedis jedis = new Jedis("127.0.0.1",6379);
        // jedis 所有的命令就是我们之前学习的所有指令!所以之前的指令学习很重要!
        System.out.println(jedis.ping());
    }
}

 其中的api操作和原生的redis命令行操作一样

事务

public class TestTX {
    public static void main(String[] args) {
            Jedis jedis = new Jedis("127.0.0.1", 6379);
            jedis.flushDB();
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("hello","world");
            jsonObject.put("name","kuangshen");
            // 开启事务
            Transaction multi = jedis.multi();
            String result = jsonObject.toJSONString();
            // jedis.watch(result)
        try {
            multi.set("user1",result);
            multi.set("user2",result);
            int i = 1/0 ; // 代码抛出异常事务,执行失败!
            multi.exec(); // 执行事务!
        } catch (Exception e) {
             multi.discard(); // 放弃事务
               e.printStackTrace();
        } finally {
            System.out.println(jedis.get("user1"));
            System.out.println(jedis.get("user2"));
            jedis.close(); // 关闭连接
        }
    }
}

SpringBoot整合

SpringBoot 操作数据:spring-data jpa jdbc mongodb redis!

SpringData 也是和 SpringBoot 齐名的项目!

说明: 在 SpringBoot2.x 之后,原来使用的jedis 被替换为了 lettuce?

jedis : 采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用 jedis pool 连接 池! 更像 BIO 模式

lettuce : 采用netty,实例可以再多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据 了,更像 NIO 模式

源码分析:

@Bean
@ConditionalOnMissingBean(name = "redisTemplate") // 我们可以自己定义一个redisTemplate来替换这个默认的!
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
    throws UnknownHostException {
    // 默认的 RedisTemplate 没有过多的设置,redis 对象都是需要序列化!
    // 两个泛型都是 Object, Object 的类型,我们后使用需要强制转换 <String, Object>
    RedisTemplate<Object, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}
@Bean
@ConditionalOnMissingBean //由于String是redis中最常使用的类型,所以说单独提出来了一 个bean!
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory)
    throws UnknownHostException {
    StringRedisTemplate template = new StringRedisTemplate();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}

测试:

  1. 导入依赖
<!-- 操作redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

  1. 配置连接
# 配置redis
spring.redis.host=127.0.0.1
spring.redis.port=6379

  1. 测试!
@SpringBootTest
class Redis02SpringbootApplicationTests {
    @Autowired
    private RedisTemplate redisTemplate;
    @Test
    void contextLoads() {
        // redisTemplate 操作不同的数据类型,api和我们的指令是一样的
        // opsForValue 操作字符串 类似String
        // opsForList 操作List 类似List
        // opsForSet
        // opsForHash
        // opsForZSet
        // opsForGeo
        // opsForHyperLogLog
        // 除了进本的操作,我们常用的方法都可以直接通过redisTemplate操作,比如事务,和基本的 CRUD
        // 获取redis的连接对象
        // RedisConnection connection =
        redisTemplate.getConnectionFactory().getConnection();
        // connection.flushDb();
        // connection.flushAll();
		redisTemplate.opsForValue().set("name","smile");
		System.out.println(redisTemplate.opsForValue().get("name"));
    }
}

 关于对象的保存:
所有的对象都需要序列化

	@Test
	public void Test() throws JsonProcessingException {
		User user = new User("smile", 12);
		//将对象进行序列化操作
		String s = new ObjectMapper().writeValueAsString(user);
		redisTemplate.opsForValue().set("user",s);
		System.out.println(redisTemplate.opsForValue().get("user"));
	}

除了手动将对象序列化,我们还可以编写一个自己的 RedisTemplete,自动序列化

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    // 这是我给大家写好的一个固定模板,大家在企业中,拿去就可以直接使用!
    // 自己定义了一个 RedisTemplate
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> MyRedisTemplate(RedisConnectionFactory factory) {
        // 我们为了自己开发方便,一般直接使用 <String,Object>
        RedisTemplate<String, Object> template = new RedisTemplate<String,Object>();
        template.setConnectionFactory(factory);
        // Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
                Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // String 的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}

原生的redis操作较多,可以封装一个工具类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public class RedisUtil {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================
    /**
     * 指定缓存失效时间
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }


    // ============================String=============================

    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 递增
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /**
     * 递减
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }


    // ================================Map=================================

    /**
     * HashGet
     * @param key  键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * HashSet 并设置时间
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }


    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /**
     * 根据key获取Set中的所有值
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================

    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将list放入缓存
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */

    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }

    }

}

Redis.conf详解

启动的时候,就通过配置文件来启动! 

单位

image.png

  1. 配置文件 unit单位 对大小写不敏感!  

    image.png

就是好比我们学习Spring、Improt, include

网络

bind 127.0.0.1 # 绑定的ip
protected-mode yes # 保护模式
port 6379 # 端口设置

通用 GENERAL

daemonize yes # 以守护进程的方式运行,默认是 no,我们需要自己开启为yes!
pidfile /var/run/redis_6379.pid # 如果以后台的方式运行,我们就需要指定一个 pid 文件!
# 日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably) 生产环境
# warning (only very important / critical messages are logged)
loglevel notice
logfile "" # 日志的文件位置名
databases 16 # 数据库的数量,默认是 16 个数据库
always-show-logo yes # 是否总是显示LOGO

快照

持久化, 在规定的时间内,执行了多少次操作,则会持久化到文件 .rdb. aof redis 是内存数据库,如果没有持久化,那么数据断电及失!

# 如果900s内,如果至少有一个1 key进行了修改,我们及进行持久化操作
save 900 1
# 如果300s内,如果至少10 key进行了修改,我们及进行持久化操作
save 300 10
# 如果60s内,如果至少10000 key进行了修改,我们及进行持久化操作
save 60 10000
# 我们之后学习持久化,会自己定义这个测试!
stop-writes-on-bgsave-error yes # 持久化如果出错,是否还需要继续工作!
rdbcompression yes # 是否压缩 rdb 文件,需要消耗一些cpu资源!
rdbchecksum yes # 保存rdb文件的时候,进行错误的检查校验!
dir ./ # rdb 文件保存的目录!

REPLICATION 复制,我们后面讲解主从复制的,时候再进行讲解

SECURITY 安全

127.0.0.1:6379> ping
PONG
127.0.0.1:6379> config get requirepass # 获取redis的密码
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass "123456" # 设置redis的密码
OK
127.0.0.1:6379> config get requirepass # 发现所有的命令都没有权限了
(error) NOAUTH Authentication required.
127.0.0.1:6379> ping
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth 123456 # 使用密码进行登录!
OK
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "123456"

限制 CLIENTS

maxclients 10000 # 设置能连接上redis的最大客户端的数量
maxmemory <bytes> # redis 配置最大的内存容量
maxmemory-policy noeviction # 内存到达上限之后的处理策略
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
2、allkeys-lru : 删除lru算法的key
3、volatile-random:随机删除即将过期key
4、allkeys-random:随机删除
5、volatile-ttl : 删除即将过期的
6、noeviction : 永不过期,返回错误

APPEND ONLY 模式 aof配置

appendonly no # 默认是不开启aof模式的,默认是使用rdb方式持久化的,在大部分所有的情况下,
rdb完全够用!
appendfilename "appendonly.aof" # 持久化的文件的名字
# appendfsync always # 每次修改都会 sync。消耗性能
appendfsync everysec # 每秒执行一次 sync,可能会丢失这1s的数据!
# appendfsync no # 不执行 sync,这个时候操作系统自己同步数据,速度最快!

具体的配置,我们在 Redis持久化 中去给大家详细详解!

Redis持久化

面试和工作,持久化都是重点! Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中 的数据库状态也会消失。所以 Redis 提供了持久化功能!

RDB(Redis DataBase)

什么是RDB

在主从复制中,rdb就是备用了!从机上面!

image.png

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快 照文件直接读到内存里。

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程 都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。 这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那 RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。我们默认的就是 RDB,一般情况下不需要修改这个配置!

有时候在生产环境我们会将这个文件进行备份!

rdb保存的文件是dump.rdb都是在我们的配置文件中快照中进行配置的!

image.png

触发机制

  

  1. save的规则满足的情况下,会自动触发rdb规则
  2. 执行 flushall 命令,也会触发我们的rdb规则!
  3. 退出redis,也会产生 rdb 文件!

备份就自动生成一个 dump.rdb

image.png

如果恢复rdb文件!

  1. 只需要将rdb文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb 恢复其中 的数据!
  2. 查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin" # 如果在这个目录下存在 dump.rdb 文件,启动就会自动恢复其中的数据

几乎就他自己默认的配置就够用了,但是我们还是需要去学习!

优点

  1. 适合大规模的数据恢复!
  2. 对数据的完整性要不高!

缺点:

  1. 需要一定的时间间隔进程操作!如果redis意外宕机了,这个最后一次修改数据就没有的了!
  2. fork进程的时候,会占用一定的内容空间!!

AOF(Append Only File)

将我们的所有命令都记录下来,history,恢复的时候就把这个文件全部在执行一遍!

是什么

image.png

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件 但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件 的内容将写指令从前到后执行一次以完成数据的恢复工作

Aof保存的是 appendonly.aof 文件

append

image.png

默认是不开启的,我们需要手动进行配置!我们只需要将 appendonly 改为yes就开启了 aof! 重启,redis 就可以生效了!

如果这个 aof 文件有错误,这时候 redis 是启动不起来的吗,我们需要修复这个aof文件

redis 给我们提供了一个工具

 redis-check-aof --fix

image.png

如果文件正常,重启就可以直接恢复了!

image.png

重写规则说明

aof 默认就是文件的无限追加,文件会越来越大!

image.png

如果 aof 文件大于 64m,太大了! fork一个新的进程来将我们的文件进行重写!

优点和缺点!

appendonly no # 默认是不开启aof模式的,默认是使用rdb方式持久化的,在大部分所有的情况下,
rdb完全够用!
appendfilename "appendonly.aof" # 持久化的文件的名字
# appendfsync always # 每次修改都会 sync。消耗性能
appendfsync everysec # 每秒执行一次 sync,可能会丢失这1s的数据!
# appendfsync no # 不执行 sync,这个时候操作系统自己同步数据,速度最快!
# rewrite 重写,

优点:

  1. 每一次修改都同步,文件的完整会更加好!
  2. 每秒同步一次,可能会丢失一秒的数据
  3. 从不同步,效率最高的!

缺点:

  1. 相对于数据文件来说,aof远远大于 rdb,修复的速度也比 rdb慢!
  2. Aof 运行效率也要比rdb慢,所以我们redis默认的配置就是rdb持久化

扩展:

  1. RDB 持久化方式能够在指定的时间间隔内对你的数据进行快照存储
  2. AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始 的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重 写,使得AOF文件的体积不至于过大。
  3. 只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
  4. 同时开启两种持久化方式
  • 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF 文件保存的数据集要比RDB文件保存的数据集要完整。
  • RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者 建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有 AOF可能潜在的Bug,留着作为一个万一的手段。
  1. 性能建议
  • 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够 了,只保留 save 900 1 这条规则。
  • 如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自 己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产 生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite 的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重 写可以改到适当的数值。
  • 如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也 减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉,会丢失十几分钟的数据, 启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。

Redis发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。微信、 微博、关注系统!

Redis 客户端可以订阅任意数量的频道。

订阅/发布消息图:

第一个:消息发送者, 第二个:频道 第三个:消息订阅者! 

image.png

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的 关系: 

image.png

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户 端: 

image.png

命令

这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。

image.png

测试

订阅端:

127.0.0.1:6379> subscribe smile
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "smile"
3) (integer) 1
1) "message"
2) "smile"
3) "hello"
1) "message"
2) "smile"
3) "Miss the day of March 14\x16"

发送端:

127.0.0.1:6379> publish smile hello
(integer) 1
127.0.0.1:6379> publish smile Miss the day of March 14
(error) ERR wrong number of arguments for 'publish' command
127.0.0.1:6379> publish smile "Miss the day of March 14"
(integer) 1
127.0.0.1:6379> 

原理

Redis是使用C实现的,通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍 此加深对 Redis 的理解。

Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。

微信:

通过 SUBSCRIBE 命令订阅某频道后,redis-server里维护了一个字典,字典的键是一个个 频道!, 而字典的值则是一个链表,链表中保存了所有订阅这个channel客户端。SUBSCRIBE 命令的关键, 就是将客户端添加到给定 channel的订阅链表中。

image.png

通过 PUBLISH 命令向订阅者发送消息,redis-server会使用给定的频道作为键,它所维护的 channel 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这表,将消息发布给所有订阅者。

Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,可以设定对某一个 key值进行消息发布及消息订阅,当一个key值上进行了消息发后,所有订阅它的客户端都会收到相应的消息。这一功能明显的用法就是用作实时消系统,比如普通的即时聊天,群聊等功能。

使用场景:

  1. 实时消息系统!
  2. 事实聊天!(频道当做聊天室,将信息回显给所有人即可!)
  3. 订阅,关注系统都是可以的! 稍微复杂的场景我们就会使用 消息中间件MQ()

Redis主从复制

概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(Master/Leader),后者称为从节点(Slave/Follower), 数据的复制是单向的!只能由主节点复制到从节点(主节点以写为主、从节点以读为主)。

默认情况下,每台Redis服务器都是主节点,

一个主节点可以有0个或者多个从节点,但每个从节点只能由一个主节点。

主从复制的作用主要包括:

  1. 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余的方式。
  2. 故障恢复:当主节点故障时,从节点可以暂时替代主节点提供服务,是一种服务冗余的方式
  3. 负载均衡:在主从复制的基础上,配合读写分离,由主节点进行写操作,从节点进行读操作,分担服务器的负载;尤其是在多读少写的场景下,通过多个从节点分担负载,提高并发量。
  4. 高可用基石:主从复制还是哨兵和集群能够实施的基础。

一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的(宕机),原因如下:

  1. 从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较 大;

  2. 从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有 内存用作Redis存储内存,一般来说,单台Redis大使用内存不应该超过20G。

  3. 电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。

对于这种场景,我们可以使如下这种架构:

image.png

主从复制,读写分离!80%的情况下都是在进行读操作!减缓服务器的压力!架构中经常使用! 一主 二从!

只要在公司中,主从复制就是必须要使用的,因为在真实的项目中不可能单机使用Redis!

环境配置

只配置从库,不用配置主库!

127.0.0.1:6379> info replication   # 查看当前库的信息 # Replication 
role:master  # 角色  
master connected_slaves:0 #  没有从机
master_replid:b63c90e6c501143759cb0e7f450bd1eb0c70882a
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0 
second_repl_offset:-1
repl_backlog_active:0 
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0 
repl_backlog_histlen:0

复制3个配置文件,然后修改对应的信息

  1. 端口
  2. pid 名字
  3. log文件名字
  4. dump.rdb 名字

修改完毕之后,启动我们的3个redis服务器,可以通过进程信息查看!

image.png

一主二从 (至少)

默认情况下,每台Redis服务器都是主节点;我们一般情况下只用配置从就好了!

认老大! 一主 (79)二从(80,81)

认老大
slaveof 127.0.0.1 6379
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379   #  SLAVEOF host 6379  找谁当自己的老大!
OK
127.0.0.1:6380> info replication 
# Replication
role:slave  # 当前角色是从机
master_host:127.0.0.1   # 可以的看到主机的信息 
master_port:6379 
master_link_status:up 
master_last_io_seconds_ago:3 
master_sync_in_progress:0 
slave_repl_offset:14 
slave_priority:100 
slave_read_only:1 
connected_slaves:0 master_replid:a81be8dd257636b2d3e7a9f595e69d73ff03774e
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:14
second_repl_offset:-1 
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1 
repl_backlog_histlen:14

# 在主机中查看!
127.0.0.1:6379> info replication
# Replication 
role:master connected_slaves:1  # 多了从机的配置
slave0:ip=127.0.0.1,port=6380,state=online,offset=42,lag=1    # 多了从机的配置
master_replid:a81be8dd257636b2d3e7a9f595e69d73ff03774e
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:42
second_repl_offset:-1 
repl_backlog_active:1 
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1 
repl_backlog_histlen:42 

如果两个都配置完了,就是有两个从机

image.png

真实的从主配置应该在配置文件中配置,这样的话是永久的,我们这里用的是命令,暂时的!

细节

主机可以写,从机不能写只能读!主机中的所有信息和数据,都会自动从机保存! 主机写: 

image.png

从机只能读取内容!

image.png

测试:主机断开连接,从机依旧连接到主机的,但是没有写操作,这个候,主机如果回来了,从机依旧可以直接获取到主机写的信息!

如果是使用命令行,来配置的主从,这个时候如果重启了,就会变回主机!只要变为从机,立马就会从 主机中获取值!如果从机重启,会重新变成默认的主机

复制原理

Slave 启动成功连接到 master 后会发送一个sync同步命令

Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行 完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步

但是只要是重新连接master,一次完全同步(全量复制)将被自动执行! 我们的数据一定可以在从机中 看到!

层层链路

上一个M链接下一个 S!此时的80是81的主机,但因为79的存在依旧是从机,只能有读的操作

image.png

这时候也可以完成我们的主从复制!

如果没有老大了,这个时候能不能选择一个老大出来呢? 手动!

谋朝篡位
如果主机断开了连接,我们可以使用 

SLAVEOF no one

 让自己变成主机!其他的节点就可以手动连 接到最新的这个主节点(手动)!如果这个时候老大修复了,那就重新连接!

这两种模式在宕机时,需要手动产生主机,所以在实际开发,并不适合,而是使用哨兵模式。自主产生主机。

哨兵模式

(自动选举老大的模式)

概述

主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。

谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独 立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

image.png

这里的哨兵有两个作用

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服 务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。 各个哨兵之间还会进行监控,这样就形成了多哨兵模式。

image.png

假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认 为主服务器不可用,这个现象成为 主观下线 。当后面的哨兵也检测到主服务器不可用,并且数量达到一 定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。 切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为 客观下线

测试!

我们目前的状态是 一主二从!

  1. 配置哨兵配置文件 sentinel.conf
# sentinel monitor 被监控的名称  host  port  1

sentinel montitor myredis  127.0.0.1  6379 1 

后面的这个数字1,代表主机挂了,slave投票看让谁接替成为主机,票数最多的,就会成为主机!

  1. 启动哨兵!
[root@kuangshen bin]# redis-sentinel kconfig/sentinel.conf
26607:X 31 Mar 2020 21:13:10.027 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
26607:X 31 Mar 2020 21:13:10.027 # Redis version=5.0.8, bits=64,
commit=00000000, modified=0, pid=26607, just started
26607:X 31 Mar 2020 21:13:10.027 # Configuration loaded

                _._
            _.-``__ ''-._
        _.-`` `. `_. ''-._       Redis 5.0.8 (00000000/0) 64 bit
    .-`` .-```. ```\/ _.,_ ''-._
    ( ' , .-` | `, )` _.-'|         Running in sentinel mode
    |`-._`-...-` __...-.``-._|'        Port: 26379
    | `-._ `._ / _.-' |             PID: 26607
    `-._ `-._ `-./ _.-' _.-'
    |`-._`-._ `-.__.-' _.-'_.-'|
    | `-._`-._ _.-'_.-' |            http://redis.io
    `-._ `-._`-.__.-'_.-' _.-'
    |`-._`-._ `-.__.-' _.-'_.-'|
    | `-._`-._ _.-'_.-' |
    `-._ `-._`-.__.-'_.-' _.-'

26607:X 31 Mar 2020 21:13:10.029 # WARNING: The TCP backlog setting of 511
cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value
of 128.
26607:X 31 Mar 2020 21:13:10.031 # Sentinel ID is
4c780da7e22d2aebe3bc20c333746f202ce72996
26607:X 31 Mar 2020 21:13:10.031 # +monitor master myredis 127.0.0.1 6379 quorum
1
26607:X 31 Mar 2020 21:13:10.031 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @
myredis 127.0.0.1 6379
26607:X 31 Mar 2020 21:13:10.033 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @
myredis 127.0.0.1 6379

如果Master 节点断开了,这个时候就会从从机中随机选择一个服务器! (这里面有一个投票算法!)

image.png

哨兵日志!

image.png

如果主机此时回来了,只能归并到新的主机下,当做从机,这就是哨兵模式的规则!

哨兵模式

优点:

  1. 哨兵集群,基于主从复制模式,所有的主从配置优点,它全有
  2. 主从可以切换,故障可以转移,系统的可用性就会更好
  3. 哨兵模式就是主从模式的升级,手动到自动,更加健壮!

缺点:

  1. Redis 不好啊在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦!
  2. 实现哨兵模式的配置其实是很麻烦的,里面有很多选择!

哨兵模式的全部配置!

# Example sentinel.conf
# 哨兵sentinel实例运行的端口 默认26379
port 26379
# 哨兵sentinel的工作目录
dir /tmp
# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供
密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000
# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,
这个数字越小,完成failover所需的时间就越长,
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1
# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那
里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,
slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
# SCRIPTS EXECUTION
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知
相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),
将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信
息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配
置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无
法正常启动成功。
#通知脚本
# shell编程
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh
# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已
经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通
信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh # 一般都是由运维来配
置!

社会目前程序员饱和(初级和中级)、高级程序员重金难求!(提升自己!)

Redis缓存穿透和雪崩

服务的高可用问题!

在这里我们不会详细的区分析解决方案的底层!

Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一 些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据 的一致性要求很高,那么就不能使用缓存。

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。

image.png

缓存穿透(查不到)

概念

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于 是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中(秒 杀!),于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了 缓存穿透。

解决方案

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则 丢弃,从而避免了对底层存储系统的查询压力;

image.png

缓存空对象

当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数 据将会从缓存中获取,保护了后端数据源;

image.png

但是这种方法会存在两个问题:

  1. 如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多 的空值的键;
  2. 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于 需要保持一致性的业务会有影响。

缓存击穿(量太大,缓存过期!)

概述

这里需要注意和缓存穿透的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中 对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一 个屏障上凿开了一个洞。

当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访 问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。

解决方案

设置热点数据永不过期 从缓存层面来看,没有设置过期时间,所以不会出现热点 key 过期后产生的问题。

加互斥锁(setnx)

分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布 式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考 验很大。 

image.png

缓存雪崩

概念

缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis 宕机!

产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商 品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都 过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波 峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

image.png

其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然 形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就 是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知 的,很有可能瞬间就把数据库压垮。

解决方案

redis高可用 这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续 工作,其实就是搭建的集群。(异地多活!)

限流降级(在SpringCloud讲解过!)

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对 某个key只允许一个线程查询数据和写缓存,其他线程等待。

数据预热

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数 据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让 缓存失效的时间点尽量均匀。

分类文档:https://note.youdao.com/ynoteshare1/index.html?id=c357abb49568f83fcf2ce2e2972615bf&type=notebook#/WEBf43eb78890e04870b7d545f41641e988

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐