【Skynet】Skynet入门实例
一、下载和编辑CentOS7.7安装相应软件:yum install git#git,用于下载源码yum install gcc#用于编译源码yun install autoconf#用于编译源码yum install readline-devel#编译Lua会用到下载skynet源码:git clone https://github.com/cloudwu/skynet.git编译:cd sky
Skynet入门实例
一、下载和编辑
CentOS7.7安装相应软件:
yum install git #git,用于下载源码
yum install gcc #用于编译源码
yun install autoconf #用于编译源码
yum install readline-devel #编译Lua会用到
下载skynet源码:
git clone https://github.com/cloudwu/skynet.git
编译:
cd skynet #进入skynet目录
make linux #编译
执行指令“make linux”会自动下载第三方库“jemalloc"到/skynet/3rd/
目录中,如果下载过程太慢或者失败,可以自行到https://github.com/jemalloc/jemalloc/
下载,然后解压到/skynet/3rd/
目录中,然后重新执行指令“make linux”进行编译。
注意:gcc版本需要大于4.9(因为用到c++11新特性),所以类似ubuntu14等低版本需要下载gcc-4.9以上的版本,然后替换gcc、g++软连接到gcc-4.9、g++-4.9。
二、运行解析
启动skynet需要指定一份配置文件,例如:
./skynet examples/config
运行结果如下:
[:01000002] LAUNCH snlua bootstrap
[:01000003] LAUNCH snlua launcher
[:01000004] LAUNCH snlua cmaster
[:01000004] master listen socket 0.0.0.0:2013
[:01000005] LAUNCH snlua cslave
[:01000005] slave connect to master 127.0.0.1:2013
[:01000004] connect from 127.0.0.1:40324 4
[:01000006] LAUNCH harbor 1 16777221
[:01000004] Harbor 1 (fd=4) report 127.0.0.1:2526
[:01000005] Waiting for 0 harbors
[:01000005] Shakehand ready
[:01000007] LAUNCH snlua datacenterd
[:01000008] LAUNCH snlua service_mgr
[:01000009] LAUNCH snlua main
[:01000009] Server start
[:0100000a] LAUNCH snlua protoloader
[:0100000b] LAUNCH snlua console
[:0100000c] LAUNCH snlua debug_console 8000
[:0100000c] Start debug console at 127.0.0.1:8000
[:0100000d] LAUNCH snlua simpledb
[:0100000e] LAUNCH snlua watchdog
[:0100000f] LAUNCH snlua gate
[:0100000f] Listen on 0.0.0.0:8888
[:01000009] Watchdog listen on 8888
[:01000009] KILL self
[:01000002] KILL self
输出内容显示skynet依次启动了bootstrap、launcher、cmaster、cslave、harbor、datacenterd、service_mgr、main、protoloader、debug_console、simpledb、watchdog、gate等服务。左侧[:0100000x]代表该条消息由哪个服务产生。可以看到,gate服务监控8888端口。debug_console 服务监控8000端口。
skynet也包含配套的客户端范例,位于“examples/client.lua”中。通过如下语句启动:
lua examples/client.lua
skynet编译后,会包含Lua程序,此程序位于“3rd/lua/lua”中。如果服务器没有安装Lua,或者Lua版本小于5.3,可以用如下命令启动客户端:
./3rd/lua/lua examples/client.lua
三、理解skynet
每个skynet进程(操作系统进程)称为一个节点,每个节点可以开启数千个服务。
不同节点可以部署在不同的物理机上,提供分布式集群能力。
3.1 配置文件说明:
config.path:
root = "./"
luaservice = root.."service/?.lua;"..root.."test/?.lua;"..root.."examples/?.lua;"..root.."test/?/init.lua"
lualoader = root .. "lualib/loader.lua"
lua_path = root.."lualib/?.lua;"..root.."lualib/?/init.lua"
lua_cpath = root .. "luaclib/?.so"
snax = root.."examples/?.lua;"..root.."test/?.lua"
config:
include "config.path"
-- preload = "./examples/preload.lua" -- run preload.lua before every lua service run
thread = 8 -- 启动多少个线程
logpath = "."
harbor = 0 -- skynet初期版本提供了“master/slave”集群模式,后来又提供了更适用的“cluster”集群模式。由于“master/slave”并不完备,因此不推荐使用,将它设置为0即可。
address = "127.0.0.1:2526"
master = "127.0.0.1:2013"
start = "main" -- main script,主服务入口,路径为config文件同目录下
bootstrap = "snlua bootstrap" -- The service for bootstrap,(固定)启动的第一个服务
standalone = "0.0.0.0:2013"
-- snax_interface_g = "snax_g"
cpath = root.."cservice/?.so" -- 用c编写的服务模块的位置
-- daemon = "./skynet.pid" -- 守护进程
logger = nil --输出日志保存到logger项指定的文件中
3.2 目录结构:
- 3rd:存放第三方的代码,如Lua、jemalloc、lpeg等。
- cservice:存放内置的用c语言编写的服务,如gate、harbor、snlua等。
- examples:范例。
- luaclib:用c语言编写的程序库,如bson解析、md5解析等
- lualib:用Lua编写的程序库。
- lualib-src:luaclib目录下,库文件的源码。
- service:包含skynet内置的一些服务。用Lua编写的服务。
- service-src:cservice目录下,程序代的源码。
- skynet-src:用c写的skynet核心代码。
- test:测试代码。
四、skynetAPI
skynet中8个最重要的API:
LuaAPI | 说明 |
---|---|
skynet.newservice(name, …) | 启动一个名(类型)为name的新服务,并返回新服务的地址。同节点内的服务会有唯一地址。例如 local ping1 = newservice(“ping”)表示开启一个ping类型的服务,把地址存放到ping1中。 |
skynet.start(func) | 用func函数初始化服务。编写服务时,都会写一句skynet.start,并在func写一些初始化代码。 |
skynet.dispatch(type, func) | 为type类型的消息设定处理函数func。skynet支持多种消息类型,由于Lua服务间的消息类型是”lua“,因此这里暂时将它固定为”lua“。func是指收到消息后的处理函数,当一个服务收到新消息时,skynet就会开启新协程,并调用它。 func的形式为 function (session, source, cmd, ...) ... end 。 参数session代表消息的唯一id。source代表消息来源,指发送消息的服务地址。cmd代表消息名。”…“是一个可变参数,内容由发送方的skynet.send或skynet.call指定。编写服务,一般会用如下的固定形式。表示以匿名函数的方式编写skynet.start的参数func,并在func中调用dispatch。 skynet.start(function() skynet.dispatch(“lua”, function(参数略)) … end end) |
skynet.send(addr, type, cmd, …) | 向地址为addr的服务发送一条type类型的消息,消息名为cmd。发送方用skynet.send发送消息,接收方用skynet.dispatch接收消息,它们的参数相互对应。若用于服务间通信,类型一般固定为”lua“。 例如,使用如下语句向服务ping1发送消息 skynet.send(ping1, “lua”, “ping”, 1, 2) 在ping1的dispatch回调中,参数的值如下 function(session, source, cmd, p1, p2, p3) –cmd = “ping” –p1=1 –p2=2 –p3=nil end |
skynet.call(addr, type, cmd, …) | 向地址为addr的服务发送一条type类型的消息,消息名为cmd,并等待对方的回应。skynet.call是一个阻塞方法。 |
skynet.exit() | 结束当前服务 |
skynet.self() | 返回当前服务的地址 |
skynet.error(msg) | 向log服务发送一条消息,即打印日志 |
skynet其他API:
SkynetAPI | 说明 |
---|---|
skynet.dispose() | 延迟执行 |
skynet.timeout(time, func) | 定时器 |
处理网络消息的API:
LuaAPI | 说明 |
---|---|
socket.listen(host, port) | 监听客户端连接,其中host代表IP地址,port代表端口,它将返回监听socket的标识 例如 local listenfd = socket.listen(“0.0.0.0”, 8888) 代表监听8888端口,”0.0.0.0“代表不限制客户端的IP,listenfd保存着监听socket的标识。 |
socket.start(fd, connect) | 新客户端连接时,回调方法connect会被调用。参数fd是socket.listen返回的标识;回调方法connect带有两个参数,第一参数代表新连接的标志,第二个参数代表新连接的地址 另外,connect获得一个新连接后,并不会立即接收它的数据,需再次调用socket.start(fd)才会开始接收 一般开启监听的完整写法为 function connect(fd, addr) socket.start(fd) print(fd…" connected addr:" …addr) end |
socket.read(fd) | 从指定的socket上读数据,它是个阻塞方法 |
socket.write(fd, data) | 把数据data置入写队列,skynet框架会在socket可写时发送它 |
socket.close(fd) | 关闭连接,它是个阻塞方法 |
连接MySQL数据库的API:
LuaAPI | 说明 |
---|---|
mysql.connect(args) | 连接数据库。参数args是一个Lua表,包含数据库地址、用户名、密码等信息,API会返回数据库对象,用于后续操作 例如 local db = mysql.connect({ host = “127.0.0.1”, port = 3306, database = “message_board”, user = “root”, password = “123456”, max_packet_size = 1024*1024, on_connect = nil }) 代表连接地址为127.0.0.1、端口为3306、数据库名为message_board、用户名为root、密码为123456的MySQL数据库 |
db:query(sql) | 执行SQL语句。db代表mysql.connect返回的对象,参数sql代表SQL语句 例如 local res = db:query(“select * from msgs”) 代表查询数据库表msgs,返回值res代表查询的结果 db:query(“insert into msgs(text) values(‘hello’)”) 代表把字符串”hello“插入msgs表的text栏位 |
cluster集群的API:
LuaAPI | 说明 |
---|---|
cluster.reload(cfg) | 让本节点(重新)加载节点配置,参数cfg是个Lua表,指示集群中各节点的地址 例如 cluster.reload({ node1 = “127.0.0.1:7001” node2 = “127.0.0.1:7002” }) 指明集群中有名为”node1“和”node2“的两个节点,node1监听本地7001端口,node2监听本地7002端口。 |
cluster.open(node) | 启动节点。节点1需要调用cluster.open(“node1”)、节点2需要调用cluster.open(“node2”),这样它们才能知道自己是cluster.reload中的哪一项,并开启对应的端口监听。 |
cluster.send(node, address, cmd, …) | 向名为node节点、地址为address的服务推送一条消息,这里参数cmd代表消息名 |
cluster.call(node, address, cmd, …) | 它与cluster.send的功能相似,都是向另一个服务推送消息。不同的是,它是个阻塞方法,会等待对方的回应。通过cluster发送的消息均为”lua“类型,无需指定 |
cluster.proxy(node, address) | 为远程节点上的服务创建一个本地代理服务,它会返回代理对象,之后可以用skynet.send、skynet.call操作该代理 |
五、skynet实例程序
4.1 PingPong
examples/Pmain.lua:
local skynet = require "skynet"
skynet.start(function() --skynet.start以function初始化服务
skynet.error("[Pmain] start")
local ping1 = skynet.newservice("ping")
local ping2 = skynet.newservice("ping")
skynet.send(ping1, "lua", "start", ping2)
skynet.exit()
end)
examples/ping.lua:
local skynet = require "skynet"
local CMD = {}
function CMD.start(source, target)
skynet.send(target, "lua", "ping", 1)
end
function CMD.ping(source, count)
local id = skynet.self()
skynet.error("["..id.."] recv ping count="..count)
skynet.sleep(100)
skynet.send(source, "lua", "ping", count+1)
end
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd , ...) --skynet.dispatch指定参数一类型消息的处理方式(这里是“lua”类型,Lua服务间的消息类型是“lua”),即处理lua服务之间的消息
local f = assert(CMD[cmd])
f(source, ...)
end)
end)
examples/Pconfig:
include "config.path"
-- preload = "./examples/preload.lua" -- run preload.lua before every lua service run
thread = 8
logger = nil
logpath = "."
harbor = 1
address = "127.0.0.1:2526"
master = "127.0.0.1:2013"
start = "Pmain" -- main script
bootstrap = "snlua bootstrap" -- The service for bootstrap
standalone = "0.0.0.0:2013"
-- snax_interface_g = "snax_g"
cpath = root.."cservice/?.so"
-- daemon = "./skynet.pid"
运行:
./skynet examples/Pconfig
4.2 聊天室(Echo的升级,收到的信息广播给所有在线玩家)
examples/Pmain.lua:
local skynet = require "skynet"
local socket = require "skynet.socket"
local clients = {}
function connect(fd, addr)
--启用连接,开始等待接收客户端消息
print(fd .. " connected addr:" .. addr)
socket.start(fd)
clients[fd] = {}
--消息处理
while true do
local readdata = socket.read(fd) --利用协程实现阻塞模式
--正常接收
if readdata ~= nil then
print(fd .. " recv " .. readdata)
for k,v in pairs(clients) do --广播
socket.write(k, readdata)
end
--断开连接
else
print(fd .. " close ")
socket.close(fd)
clients[fd] = nil
end
end
end
skynet.start(function()
local listenfd = socket.listen("0.0.0.0", 8888) --监听所有ip,端口8888
socket.start(listenfd, connect) --新客户端发起连接时,conncet方法将被调用。
end)
examples/Pconfig:
include "config.path"
-- preload = "./examples/preload.lua" -- run preload.lua before every lua service run
thread = 8
logger = nil
logpath = "."
harbor = 1
address = "127.0.0.1:2526"
master = "127.0.0.1:2013"
start = "Pmain" -- main script
bootstrap = "snlua bootstrap" -- The service for bootstrap
standalone = "0.0.0.0:2013"
-- snax_interface_g = "snax_g"
cpath = root.."cservice/?.so"
-- daemon = "./skynet.pid"
运行:
./skynet examples/Pconfig
4.3 做留言板,使用数据库
ubuntu linux安装 mysql:
sudo apt-get install mysql-server-5.6
ubuntu linux以root登录mysql服务, -p是密码登录:
mysql -u root -p
显示mysql数据库中的库:
show databases;
退出mysql:
exit
examples/Pmain.lua:
local skynet = require "skynet"
local mysql = require "skynet.db.mysql"
local socket = require "skynet.socket"
local clients = {}
local db = nil
function connect(fd, addr)
--启用连接,开始等待接收客户端消息
print(fd .. " connected addr:" .. addr)
socket.start(fd)
clients[fd] = {}
--消息处理
while true do
local readdata = socket.read(fd) --利用协程实现阻塞模式
--正常接收
if readdata ~= nil then
if readdata == "get\r\n" then
local res = db:query("select * from msgs") --执行SQL语句。
for k,v in pairs(res) do
socket.write(fd, v.id .. " " .. v.text .. "\r\n")
end
--留言
else
local data = string.match(readdata, "set (.-)\r\n")
db:query("insert into msgs(text) values(\'"..data.."\')") --执行SQL语句。
end
--断开连接
else
print(fd .. " close ")
socket.close(fd)
clients[fd] = nil
end
end
end
skynet.start(function()
--连接数据库
db = mysql.connect({
host="192.168.184.130", --ip
port=3306, --port
database="message_board", --使用的数据库
user="root", --用户名
password="123456", --密码
max_packet_size=1024*1024, --最大包大小
on_connect=nil
})
--网络监听
local listenfd = socket.listen("0.0.0.0", 8888)
socket.start(listenfd, connect)
end)
examples/Pconfig:
include "config.path"
-- preload = "./examples/preload.lua" -- run preload.lua before every lua service run
thread = 8
logger = nil
logpath = "."
harbor = 1
address = "127.0.0.1:2526"
master = "127.0.0.1:2013"
start = "Pmain" -- main script
bootstrap = "snlua bootstrap" -- The service for bootstrap
standalone = "0.0.0.0:2013"
-- snax_interface_g = "snax_g"
cpath = root.."cservice/?.so"
-- daemon = "./skynet.pid"
运行:
./skynet examples/Pconfig
运行客户端:
telnet 127.0.0.1 8888 //连接本地ip端口8888,即连接上了skynet进程开启的端口服务
get
set lrh
4.4 监控服务状态
skynet自带了一个调试控制台服务debug_console,启动它之后,可以查看节点的内部状态。
examples/Pmain.lua:
local skynet = require "skynet"
skynet.start(function() --skynet.start以function初始化服务
skynet.error("[Pmain] start")
skynet.newservice("debug_console", 8000)
local ping1 = skynet.newservice("ping")
local ping2 = skynet.newservice("ping")
local ping3 = skynet.newservice("ping")
skynet.send(ping1, "lua", "start", ping2)
skynet.send(ping2, "lua", "start", ping3)
skynet.exit()
end)
examples/ping.lua:
local skynet = require "skynet"
local CMD = {}
function CMD.start(source, target)
skynet.send(target, "lua", "ping", 1)
end
function CMD.ping(source, count)
local id = skynet.self()
skynet.error("["..id.."] recv ping count="..count)
skynet.sleep(100)
skynet.send(source, "lua", "ping", count+1)
end
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd , ...) --skynet.dispatch指定参数一类型消息的处理方式(这里是“lua”类型,Lua服务间的消息类型是“lua”),即处理lua服务之间的消息
local f = assert(CMD[cmd])
f(source, ...)
end)
end)
运行:
./skynet examples/Pconfig
运行客户端:
telnet 127.0.0.1 8000
- list:列出skynet启动的所有服务,以及启动服务的参数。在编写程序的过程中,如果怀疑某些服务没成功启动,可用list命令检查。
- mem:用于显示所有Lua服务占用的内存。如果某个服务占用内存很高,可以针对性优化。
- stat:用于列出所有Lua服务的CPU时间、处理的消息总数(message)、消息队列长度(mqlen)、被挂起的请求数量(task)等。
- netstat:用于列出网络连接的概括。
4.5 使用节点集群建立分布式系统
下图展示了skynet的cluster集群模式。在该模式中,用户需为每个节点配置cluster监听端口(即途中的7001和7002),skynet会自动开启gate、cluster等多个服务,用于处理节点间通信 功能。
假如图2-23的ping1要发送消息给另一个节点ping3,流程是:节点1先和节点2建立TCP连接,消息经由skynet传送至节点2的cluster服务,再由cluster转发给节点内的ping3。
节点配置:
examples/.Pconfig.c1中新增的内容如下:
node = "node1"
examples/Pconfig.c2中新增的内容如下:
node = "node2"
代码实现:
examples/Pmain.lua:
local skynet = require "skynet"
local cluster = require "skynet.cluster"
require "skynet.manager"
skynet.start(function ()
cluster.reload({
node1 = "127.0.0.1:7001",
node2 = "127.0.0.1:7002"
})
local mynode = skynet.getenv("node") --获取./skynet启动配置的node变量
if mynode == "node1" then
cluster.open("node1")
local ping1 = skynet.newservice("ping")
local ping2 = skynet.newservice("ping")
skynet.send(ping1, "lua", "start", "node2", "pong")
skynet.send(ping2, "lua", "start", "node2", "pong")
--使用代理,之后便可以将它视为本地服务
local pong = cluster.proxy("node2", "pong")
skynet.send(pong, "lua", "ping", "node1", "ping1", 1)
skynet.send(pong, "lua", "ping", "node1", "ping2", 1)
elseif mynode == "node2" then
cluster.open("node2")
local ping3 = skynet.newservice("ping")
skynet.name("pong", ping3) --修改服务名字
end
end)
examples/ping.lua:
local skynet = require "skynet"
local cluster = require "skynet.cluster"
local mynode = skynet.getenv("node") --获取./skynet启动配置的node变量
local CMD = {}
function CMD.ping(source, source_node, source_srv, count)
local id = skynet.self()
skynet.error("["..id.."] recv ping count=" .. count)
skynet.sleep(100)
cluster.send(source_node, source_srv, "ping", mynode, skynet.self(), count+1)
end
function CMD.start(source, target_node, target)
cluster.send(target_node, target, "ping", mynode, skynet.self(), 1)
end
skynet.start(function ()
skynet.dispatch("lua", function(session, source, cmd , ...) --skynet.dispatch指定参数一类型消息的处理方式(这里是“lua”类型,Lua服务间的消息类型是“lua”),即处理lua服务之间的消息
local f = assert(CMD[cmd])
f(source, ...)
end)
end)
运行:
物理机1:
./skynet examples/Pconfig.c1
物理机2:
./skynet examples/Pconfig.c2
六、使用skynet注意事项
skynet最大的特性是”提供同一机器上充分利用多核CPU的处理能力“,但是由此带来的时序问题值得特别注意。
5.1 协程的作用
skynet服务在收到消息时,会创建一个协程,在协程中会运行消息处理方法(即用skynet.dispatch设置的回调方法)。这意味着,如果在消息处理方法中调用阻塞API(如skynet.call、skynet.sleep、skynet.read),服务不会被卡住(仅仅是处理消息的协程被卡住),执行效率得以提高,但程序的执行顺序将得不到保证。
如图2-36所示,某个服务的消息队列存在多条消息,第一条消息的处理函数是OnMsg1,第二条是OnMsg2。OnMsg1调用了阻塞方法skynet,sleep。尽管程序会依次调用OnMsg1、OnMsg2……。但当执行到阻塞函数时,协程会挂起。实际执行顺序可能是图2-36中右边展示的”语句1、skynet.sleep、语句3、语句4、语句2“。
5.2 扣除金币的Bug
假设游戏有”存款“功能,玩家可以把一定数量的金币存入银行,获得利息。相关服务如图2-37所示,agent服务代表玩家控制的角色,bank代表银行。
存款过程如下:
- 客户发起存款请求(阶段1)
- agent向bank转达请求(阶段2)
- bank返回操作的结果(阶段3)
写法1(有Bug):
local coin = 20 --角色身上的金币数
function CMD.deposit(source)
if coin < 20 then --假设每次存20金币
return
end
local isok = skynet.call(bank, "lua", "deposit", 20)
if isok then
coin = coin -20
end
end
存在这么一种可能,玩家快速地两次点击存款按钮,消息时序按图2-37中①①②③的顺序执行。如果角色身上仅剩20金币,第一次操作时,尚剩余20金币,第二次操作时,依然剩余20金币,两次都操作成功,玩家总共存入40金币,剩余”-20“金币,显然不合理。
写法2(修复bug):
function CMD.deposit(source)
if coin < 20 then --假设每次存20金币
return
end
coin = coin - 20
local isok = skynet.call(bank, "lua", "deposit", 20)
if not isok then
coin = coin + 20
end
end
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)