系列文章索引

OpenResty使用Lua大全(一)Lua语法入门实战
OpenResty使用Lua大全(二)在OpenResty中使用Lua
OpenResty使用Lua大全(三)OpenResty使用Json模块解析json
OpenResty使用Lua大全(四)OpenResty中使用Redis
OpenResty使用Lua大全(五)OpenResty中使用MySQL
OpenResty使用Lua大全(六)OpenResty发送http请求
OpenResty使用Lua大全(七)OpenResty使用全局缓存
OpenResty使用Lua大全(八)OpenResty执行流程与阶段详解
OpenResty使用Lua大全(九)实战:nginx-lua-redis实现访问频率控制
OpenResty使用Lua大全(十)实战: Lua + Redis 实现动态封禁 IP
OpenResty使用Lua大全(十一)实战: nginx实现接口签名安全认证
OpenResty使用Lua大全(十二)实战: 动手实现一个网关框架

一、连接redis服务器

1、基本使用

---定义 redis关闭连接的方法
local function close_redis(red)  
    if not red then  
        return  
    end  
    local ok, err = red:close()  
    if not ok then  
        ngx.say("close redis error : ", err)  
    end  
end  

local redis = require "resty.redis"  --引入redis模块
local red = redis:new()  --创建一个对象,注意是用冒号调用的
--设置超时(毫秒)  
red:set_timeout(1000) 
--建立连接  
local ip = "127.0.0.1"  
local port = 6379
local ok, err = red:connect(ip, port)
if not ok then  
    ngx.say("connect to redis error : ", err)  
    return close_redis(red)  
end  
--调用API设置key  
ok, err = red:set("msg", "hello world")  
if not ok then  
    ngx.say("set msg error : ", err)  
    return close_redis(red)  
end  
--调用API获取key值  
local resp, err = red:get("msg")  
if not resp then  
    ngx.say("get msg error : ", err)  
    return close_redis(red)  
end 

ngx.say("msg : ", resp) 
close_redis(red)  

请求结果 msg : hello world

2、null处理

注意:得到的数据为空处理 ,redis返回的空 为null,所以不能用nil判断,而要用ngx.null判断

if resp == ngx.null then  
    resp = ''  --比如默认值  
end  

3、连接需要授权的redis

-- 在red:connect成功后,调用red:auth认证密码
ok, err = red:auth("123456")
if not ok then
	ngx.say("failed to auth: ", err)
	return close_redis(red)
end

4、使用redis连接池

redis的连接是tcp连接,建立TCP连接需要三次握手,而释放TCP连接需要四次握手,而这些往返时延仅需要一次,
以后应该复用TCP连接,此时就可以考虑使用连接池,即连接池可以复用连接。
我们需要把close_redis函数改造一下

local function close_redis(red)  
    if not red then  
        return  
    end  
    --释放连接(连接池实现)  
    local pool_max_idle_time = 10000 --毫秒  
    local pool_size = 100 --连接池大小  
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)  
    if not ok then  
        ngx.say("set keepalive error : ", err)  
    end  
end  

即设置空闲连接超时时间防止连接一直占用不释放;设置连接池大小来复用连接。
注意:
1、连接池是每Worker进程的,而不是每Server的;
2、当连接超过最大连接池大小时,会按照LRU算法回收空闲连接为新连接使用;
3、连接池中的空闲连接出现异常时会自动被移除;
4、连接池是通过ip和port标识的,即相同的ip和port会使用同一个连接池(即使是不同类型的客户端);
5、连接池第一次set_keepalive时连接池大小就确定下了,不会再变更;

注意:我们如何知道,redis连接对象是从连接池中获取的,还是新创建的连接呢??
使用 red:get_reused_times —>得到此连接被使用的次数
如果当前连接不是从内建连接池中获取的,该方法总是返回 0 ,也就是说,该连接还没有被使用过。
如果连接来自连接池,那么返回值永远都是非零。所以这个方法可以用来确认当前连接是否来自池子。

5、连接优化

采用连接池,连接带认证的redis

---定义 redis关闭连接的方法
local function close_redis(red)  
    if not red then  
        return  
    end  
    --释放连接(连接池实现)  
    local pool_max_idle_time = 10000 --毫秒  
    local pool_size = 100 --连接池大小  
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)  
    if not ok then  
        ngx.say("set keepalive error : ", err)  
    end  
end   

local redis = require "resty.redis"  --引入redis模块
local red = redis:new()  --创建一个对象,注意是用冒号调用的
--设置超时(毫秒)  
red:set_timeout(1000) 
--建立连接  
local ip = "127.0.0.1"  
local port = 6379
local ok, err = red:connect(ip, port)
if not ok then  
    ngx.say("connect to redis error : ", err)  
    return close_redis(red)  
end  

local count, err = red:get_reused_times()
if 0 == count then ----新建连接,需要认证密码
	ok, err = red:auth("123456")
	if not ok then
		ngx.say("failed to auth: ", err)
		return
	end
elseif err then  ----从连接池中获取连接,无需再次认证密码
	ngx.say("failed to get reused times: ", err)
	return
end

--调用API设置key  
ok, err = red:set("msg", "hello world333333333")  
if not ok then  
    ngx.say("set msg error : ", err)  
    return close_redis(red)  
end  
--调用API获取key值  
local resp, err = red:get("msg")  
if not resp then  
    ngx.say("get msg error : ", err)  
    return close_redis(red)  
end 

ngx.say("msg : ", resp) 
close_redis(red) 

非常注意:连接池使用过程中,业务代码有select方法,会导致数据错乱

ok, err = red:select(1)  --->选择db
if not ok then
    ngx.say("failed to select db: ", err)
    return
end

如:
A业务使用了db1,所以使用了 select(1);

B业务使用默认的db0,select(0)遗漏

但A,B业务共用了连接池,很有可能 B业务拿到的 A业务使用的连接,而此连接操作的数据库db1;
而B业务中代码没有指定select数据库,所以B业务操作数据到了db1中;导致数据错乱

二、openresty中封装的redis

1、封装

在关于web+lua+openresty开发中,项目中会大量操作redis,

重复创建连接–>数据操作–>关闭连接(或放到连接池)这个完整的链路调用完毕,
甚至还要考虑不同的 return 情况做不同处理,就很快发现代码中有大量的重复

推荐一个二次封装的类库
/usr/local/openresty/lualib/resty目录创建redis_iresty.lua文件:

local redis_c = require "resty.redis"

local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
    new_tab = function (narr, nrec) return {} end
end

local _M = new_tab(0, 155)
_M._VERSION = '0.01'

local commands = {
    "append",            "auth",              "bgrewriteaof",
    "bgsave",            "bitcount",          "bitop",
    "blpop",             "brpop",
    "brpoplpush",        "client",            "config",
    "dbsize",
    "debug",             "decr",              "decrby",
    "del",               "discard",           "dump",
    "echo",
    "eval",              "exec",              "exists",
    "expire",            "expireat",          "flushall",
    "flushdb",           "get",               "getbit",
    "getrange",          "getset",            "hdel",
    "hexists",           "hget",              "hgetall",
    "hincrby",           "hincrbyfloat",      "hkeys",
    "hlen",
    "hmget",              "hmset",      "hscan",
    "hset",
    "hsetnx",            "hvals",             "incr",
    "incrby",            "incrbyfloat",       "info",
    "keys",
    "lastsave",          "lindex",            "linsert",
    "llen",              "lpop",              "lpush",
    "lpushx",            "lrange",            "lrem",
    "lset",              "ltrim",             "mget",
    "migrate",
    "monitor",           "move",              "mset",
    "msetnx",            "multi",             "object",
    "persist",           "pexpire",           "pexpireat",
    "ping",              "psetex",            "psubscribe",
    "pttl",
    "publish",      --[[ "punsubscribe", ]]   "pubsub",
    "quit",
    "randomkey",         "rename",            "renamenx",
    "restore",
    "rpop",              "rpoplpush",         "rpush",
    "rpushx",            "sadd",              "save",
    "scan",              "scard",             "script",
    "sdiff",             "sdiffstore",
    "select",            "set",               "setbit",
    "setex",             "setnx",             "setrange",
    "shutdown",          "sinter",            "sinterstore",
    "sismember",         "slaveof",           "slowlog",
    "smembers",          "smove",             "sort",
    "spop",              "srandmember",       "srem",
    "sscan",
    "strlen",       --[[ "subscribe",  ]]     "sunion",
    "sunionstore",       "sync",              "time",
    "ttl",
    "type",         --[[ "unsubscribe", ]]    "unwatch",
    "watch",             "zadd",              "zcard",
    "zcount",            "zincrby",           "zinterstore",
    "zrange",            "zrangebyscore",     "zrank",
    "zrem",              "zremrangebyrank",   "zremrangebyscore",
    "zrevrange",         "zrevrangebyscore",  "zrevrank",
    "zscan",
    "zscore",            "zunionstore",       "evalsha"
}

local mt = { __index = _M }

local function is_redis_null( res )
    if type(res) == "table" then
        for k,v in pairs(res) do
            if v ~= ngx.null then
                return false
            end
        end
        return true
    elseif res == ngx.null then
        return true
    elseif res == nil then
        return true
    end

    return false
end

function _M.close_redis(self, redis)  
    if not redis then  
        return  
    end  
    --释放连接(连接池实现)
    local pool_max_idle_time = self.pool_max_idle_time --最大空闲时间 毫秒  
    local pool_size = self.pool_size --连接池大小  
	
    local ok, err = redis:set_keepalive(pool_max_idle_time, pool_size)  
    if not ok then  
        ngx.say("set keepalive error : ", err)  
    end  
end  

-- change connect address as you need
function _M.connect_mod( self, redis )
    redis:set_timeout(self.timeout)
		
	local ok, err = redis:connect(self.ip, self.port)
	if not ok then  
		ngx.say("connect to redis error : ", err)  
		return self:close_redis(redis)  
	end
	
	if self.password then ----密码认证
		local count, err = redis:get_reused_times()
		if 0 == count then ----新建连接,需要认证密码
			ok, err = redis:auth(self.password)
			if not ok then
				ngx.say("failed to auth: ", err)
				return
			end
		elseif err then  ----从连接池中获取连接,无需再次认证密码
			ngx.say("failed to get reused times: ", err)
			return
		end
	end

    return ok,err;
end

function _M.init_pipeline( self )
    self._reqs = {}
end

function _M.commit_pipeline( self )
    local reqs = self._reqs

    if nil == reqs or 0 == #reqs then
        return {}, "no pipeline"
    else
        self._reqs = nil
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok then
        return {}, err
    end

    redis:init_pipeline()
    for _, vals in ipairs(reqs) do
        local fun = redis[vals[1]]
        table.remove(vals , 1)

        fun(redis, unpack(vals))
    end

    local results, err = redis:commit_pipeline()
    if not results or err then
        return {}, err
    end

    if is_redis_null(results) then
        results = {}
        ngx.log(ngx.WARN, "is null")
    end
    -- table.remove (results , 1)

    --self.set_keepalive_mod(redis)
	self:close_redis(redis)  

    for i,value in ipairs(results) do
        if is_redis_null(value) then
            results[i] = nil
        end
    end

    return results, err
end


local function do_command(self, cmd, ... )
    if self._reqs then
        table.insert(self._reqs, {cmd, ...})
        return
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

	redis:select(self.db_index)
	
    local fun = redis[cmd]
    local result, err = fun(redis, ...)
    if not result or err then
        -- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
        return nil, err
    end

    if is_redis_null(result) then
        result = nil
    end

    --self.set_keepalive_mod(redis)
	self:close_redis(redis)  

    return result, err
end

for i = 1, #commands do
    local cmd = commands[i]
    _M[cmd] =
            function (self, ...)
                return do_command(self, cmd, ...)
            end
end

function _M.new(self, opts)
    opts = opts or {}
    local timeout = (opts.timeout and opts.timeout * 1000) or 1000
    local db_index= opts.db_index or 0
	local ip = opts.ip or '127.0.0.1'
	local port = opts.port or 6379
	local password = opts.password
	local pool_max_idle_time = opts.pool_max_idle_time or 60000
	local pool_size = opts.pool_size or 100

    return setmetatable({
            timeout = timeout,
            db_index = db_index,
			ip = ip,
			port = port,
			password = password,
			pool_max_idle_time = pool_max_idle_time,
			pool_size = pool_size,
            _reqs = nil }, mt)
end

return _M

2、使用

local redis = require "resty.redis_iresty"

local opts = {
	ip = "127.0.0.1",
	port = "6379",
	password = "123456",
	db_index = 1
}

local red = redis:new(opts)

local ok, err = red:set("dog", "an animal11111")
if not ok then
    ngx.say("failed to set dog: ", err)
    return
end

ngx.say("set result: ", ok)

3、管道

每次set、get都会获取一个连接,可以使用管道的方式,将一组命令使用同一个连接进行处理。

red:init_pipeline()
red:set("cat", "Marry")
red:set("horse", "Bob")
red:get("cat")
red:get("horse")
local results, err = red:commit_pipeline()
if not results then
    ngx.say("failed to commit the pipelined requests: ", err)
    return
end

for i, res in ipairs(results) do
    ngx.say(res,"<br/>");
end

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐