一、场景描述:

微服务搭建一个平台版本控制平台:实时检查每个应用及应用下各个平台端的在线用户,定点指定各平台下的软件更新。市面上的长连接工具很多,我这里选择socke.io,因为并发这些都是有专业的人测试过了重点是跨平台ios、Android、web等以及各种语言的支持都比较全面。

二、模型简介

三、基本实现

1、springboot集成socket.io长连接

1.1、pom引入依赖

        <!-- 长连接  -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.19</version>
        </dependency>

 1.2、配置参数初始化

socketio:
  # 在系统中发布用系统的ip就好如果是docker等容器就配置为 0.0.0.0
  host: 172.16.0.57
  port: 10088
  # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
  maxFramePayloadLength: 1048576
  # 设置http交互最大内容长度
  maxHttpContentLength: 1048576
  # socket连接数大小(如只监听一个端口boss线程组为1即可)
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  upgradeTimeout: 1000000
  # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  pingTimeout: 6000000
  # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  pingInterval: 25000

1.3、长连接启动类配置

import cn.bdk.vcl.domin.GaodeProvinceInfoEnum;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * 长连接启动类
 */
@Configuration
public class SocketIOConfig implements InitializingBean {
    @Value("${socketio.host}")
    private String host;
    @Value("${socketio.port}")
    private Integer port;
    @Value("${socketio.bossCount}")
    private int bossCount;
    @Value("${socketio.workCount}")
    private int workCount;
    @Value("${socketio.allowCustomRequests}")
    private boolean allowCustomRequests;
    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;
    @Value("${socketio.pingTimeout}")
    private int pingTimeout;
    @Value("${socketio.pingInterval}")
    private int pingInterval;
    public SocketIOServer socketIOServer;
    @Autowired
    DefaultSocketIOHandler defaultSocketIOHandler;
    @Autowired
    ProvinceSpassNameSocketIOHandler provinceSpassNameSocketIOHandler;

    @Override
    public void afterPropertiesSet() {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setReuseAddress(true);
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
        configuration.setSocketConfig(socketConfig);
        // host在本地测试可以设置为localhost或者本机IP,在Linux服务器跑可换成服务器IP
        configuration.setHostname(host);
        configuration.setPort(port);
        // socket连接数大小(如只监听一个端口boss线程组为1即可)
        configuration.setBossThreads(bossCount);
        configuration.setWorkerThreads(workCount);
        configuration.setAllowCustomRequests(allowCustomRequests);
        // 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
        configuration.setUpgradeTimeout(upgradeTimeout);
        // Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
        configuration.setPingTimeout(pingTimeout);
        // Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
        configuration.setPingInterval(pingInterval);
        socketIOServer = new SocketIOServer(configuration);
        Arrays.stream(GaodeProvinceInfoEnum.values()).peek(item -> {
            socketIOServer.addNamespace("/" + item.getValue()).addConnectListener(provinceSpassNameSocketIOHandler);
        }).collect(Collectors.toList());
        //添加事件监听器
        socketIOServer.addListeners(defaultSocketIOHandler);
        //启动SocketIOServer
        socketIOServer.start();
    }
}

1.4、注册事件

import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.listener.ConnectListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;

/**
 * 命名空间链接事件监听
 */
@Component
//@Slf4j
public class ProvinceSpassNameSocketIOHandler implements ConnectListener {
    @Resource
    RedisTemplate redisTemplate;

    @Override
    public void onConnect(SocketIOClient client) {
        if (!ObjectUtils.isEmpty(client)) {
            HandshakeData handshakeData = client.getHandshakeData();
            String platformId = handshakeData.getSingleUrlParam("platformId");
            String appId = handshakeData.getSingleUrlParam("appId");
            String phoneNo = handshakeData.getSingleUrlParam("phoneNo");
            if (!StringUtils.isEmpty(platformId)) {
                if (!StringUtils.isEmpty(phoneNo) && !phoneNo.equals("0")) {
                    String key = appId + ":" + platformId;
                    // 先加平台空间不做推送,统计的时候才会用
                    client.joinRoom(appId);
                    client.joinRoom(key);
                    if (redisTemplate.opsForHash().hasKey("allow", key + ":" + phoneNo)) {
                        // 版本管理进入白名单
                        client.joinRoom(key + ":1:allow");
                    } else {
                        // 版本管理进入正常名单
                        client.joinRoom(key + ":1:rurrency");
                    }
                }
            }
        }
    }
}
import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.listener.ConnectListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;

/**
 * 默认主空间连接事件监听
 */
@Component
public class DefaultSocketIOHandler implements ConnectListener {
    @Resource
    RedisTemplate redisTemplate;

    @Override
    public void onConnect(SocketIOClient client) {
        if (!ObjectUtils.isEmpty(client)) {
            HandshakeData handshakeData = client.getHandshakeData();
            String platformId = handshakeData.getSingleUrlParam("platformId");
            String appId = handshakeData.getSingleUrlParam("appId");
            String phoneNo = handshakeData.getSingleUrlParam("phoneNo");
            // 先加平台空间不做推送,统计的时候才会用
            client.joinRoom(appId);
            client.joinRoom(appId + ":" + platformId);
            if (!StringUtils.isEmpty(platformId)) {
                if (!StringUtils.isEmpty(phoneNo) && !phoneNo.equals("0")) {
                    //版本更新白名单
                    String key = appId + ":" + platformId + ":1";
                    if (redisTemplate.opsForHash().hasKey("allow", key + ":" + phoneNo)) {
                        client.joinRoom(key + ":allow");
                    } else {
                        client.joinRoom(key + ":rurrency");
                    }
                } else {
                    //适配老版本 更新一版后删除
                    client.joinRoom(platformId);
                }
            }
        }
    }
}

1.5、向外提供用户统计服务

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.corundumstudio.socketio.BroadcastOperations;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import com.google.gson.Gson;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * <p>
 * 用户实时在线人数统计
 * </p>
 *
 * @author morik
 * @since 2022-05-27
 */
@Service
public class PlatformInfoServiceImpl extends ServiceImpl<PlatformInfoMapper, PlatformInfo> implements IPlatformInfoService {

    @Resource
    private PlatformInfoMapper platformInfoMapper;
    @Resource
    private PlatformVersionMapper platformVersionMapper;
    @Resource
    private SocketIOConfig socketIOConfig;

   
    /**
     * 根据appid查询所有平台的实施在线人数
     */
    public CommonResult<PlatformUseNoListVO> getPlatformUseNo(GetUsageDO getUsageDO) {
        LambdaQueryWrapper<PlatformInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.eq(PlatformInfo::getAppId, getUsageDO.getAppId());
        List<PlatformInfo> platformInfos = platformInfoMapper.selectList(lambdaQueryWrapper);
        if (ObjectUtils.isEmpty(platformInfos) || platformInfos.size() < 1) {
            return CommonResult.failed("没有平台被找到");
        }

        SocketIOServer socketIOServer = socketIOConfig.socketIOServer;
        Collection<SocketIONamespace> allNamespaces = socketIOServer.getAllNamespaces();
        AtomicInteger appUserNo = new AtomicInteger(0);

        //统计应用各个平台使用  21 Android
        platformInfos.stream().peek(it -> {
            AtomicInteger platUserNo = new AtomicInteger(0);
            //统计各行政区划用户使用  /cn50 重庆
            allNamespaces.stream().peek(item -> {
                getUsageDO.setPlatformId(it.getPlatformId());
                getUseNo(getUsageDO, platUserNo, socketIOServer, item, it.getAppId() + ":" + it.getPlatformId());
            }).collect(Collectors.toList());
            it.setUserNo(platUserNo.get());
            appUserNo.addAndGet(platUserNo.get());
        }).collect(Collectors.toList());
        PlatformUseNoListVO platformUseNoListVO = new PlatformUseNoListVO();
        platformUseNoListVO.setPlatformInfos(platformInfos);
        platformUseNoListVO.setAppUseNo(appUserNo.get());
        return CommonResult.success(platformUseNoListVO);
    }

    /**
     * 查询指定应用使用总数或者查询指定应用的指定平台使用总是
     *
     * @param getUsageDO
     * @re
     */
    public CommonResult<String> getAppUseNo(GetUsageDO getUsageDO) {
        AtomicInteger appUserNo = new AtomicInteger(0);
        AtomicInteger platUserNo = new AtomicInteger(0);
        SocketIOServer socketIOServer = socketIOConfig.socketIOServer;
        Collection<SocketIONamespace> allNamespaces = socketIOServer.getAllNamespaces();
        if (!ObjectUtils.isEmpty(socketIOServer)) {
            if (ObjectUtils.isEmpty(getUsageDO.getPlatformId())) {
                allNamespaces.stream().peek(item -> {
                    getUseNo(getUsageDO, appUserNo, socketIOServer, item, getUsageDO.getAppId() + "");
                }).collect(Collectors.toList());
                return CommonResult.success(appUserNo.get() + "");
            } else {
                allNamespaces.stream().peek(item -> {
                    getUseNo(getUsageDO, platUserNo, socketIOServer, item, getUsageDO.getAppId() + ":" + getUsageDO.getPlatformId());
                }).collect(Collectors.toList());
                return CommonResult.success(platUserNo.get() + "");
            }
        }
        return CommonResult.success("0");
    }

    private void getUseNo(GetUsageDO getUsageDO, AtomicInteger appUserNo, SocketIOServer socketIOServer, SocketIONamespace item, String key) {
        if (!ObjectUtils.isEmpty(item)) {
            Collection<SocketIOClient> allClients = item.getAllClients();
            if (!ObjectUtils.isEmpty(getUsageDO.getPlatformId())) {
                BroadcastOperations roomOperations = item.getRoomOperations(key);
                Collection<SocketIOClient> clients = roomOperations.getClients();
                appUserNo.addAndGet(clients.size());
            }
        }
    }
}

2、我们这里直接用java客户端来链接测试

pom文件依赖参考
<!-- 长连接  客户端所需包-->
<dependency>
    <groupId>io.socket</groupId>
    <artifactId>socket.io-client</artifactId>
    <version>2.0.1</version>
</dependency>

import cn.hutool.core.date.DateUtil;
import io.socket.client.IO;
import io.socket.client.Socket;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SocketIOClientLaunch {

    public static void main(String[] args) {
        // 服务端socket.io连接通信地址
        String url = "http://172.16.0.57:10088/cn50";
        try {
            IO.Options options = new IO.Options();
            options.transports = new String[]{"websocket","xhr-polling","jsonp-polling"};
            options.reconnectionAttempts = 2;
            // 失败重连的时间间隔
            options.reconnectionDelay = 1000;
            // 连接超时时间(ms)
            options.timeout = 500;
            // userId: 唯一标识 传给服务端存储
            final Socket socket = IO.socket(url + "?platformId=21&appId=1234585&phoneNo=20", options);

            socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));

            // 自定义事件`connected` -> 接收服务端成功连接消息
            socket.on("connected", objects -> log.debug("服务端:" + objects[0].toString()));

            // 自定义事件`push_data_event` -> 接收服务端消息
            socket.on("push_data_event", objects -> log.debug("服务端:" + objects[0].toString()));

            // 自定义事件`myBroadcast` -> 接收服务端广播消息
            socket.on("myBroadcast", objects -> log.debug("服务端:" + objects[0].toString()));

            socket.emit("chatevent","1230");

            socket.connect();

            int i =1;

            while (true) {
                Thread.sleep(3000);
                i++;
                // 自定义事件`push_data_event` -> 向服务端发送消息
                socket.emit("push_data_event", "发送数据 " + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 3、也可以用JS客户端建立简单的长连接测试(ios、Android等每个平台都有相关sdk几行代码都能集成),官网参考地址:Client API | Socket.IO,js下载地址:Files within /

<html>
<head>
    <meta charset="utf-8"/>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.0/socket.io.js"></script>
</head>
<body>
<script type="text/javascript">
    const socket = io("http://172.16.0.57:10088/cn51", {
        reconnectionDelayMax: 10000,
        transports: ['websocket'],
        query: {"platformId": "26","appId":"1234585","phoneNo":"21"}
    });
    console.log(socket);
    socket.on('connect', function () {
        console.log("connect")
    });
</script>
</body>
</html>
1、当然也可以用react一键集成ios、Android移动端
1.1、package.json引包
"socket.io-client": "^2.4.0"
2.1、代码参考
import React, {Component} from 'react';
const io = require('socket.io-client');
class App extends Component {
    componentDidMount(): void {
        const socket  = io("http://172.16.0.57:10088/chat", {
            reconnectionDelayMax: 10000,
            transports:['websocket'],
            query: {"platformId": 66666}
        });
        console.log(socket);
        socket.on('connect', function() {
            console.log("connect")
        });
        socket.on('connect_error', (error : any) => {
            console.log(error)
        });
        socket.on('message_event', function(data : any) {
            console.log("message_event")
            console.log(data);
        });
        socket.emit("push_data_event","react发送的数据");
        socket.on("myBroadcast",function (message :any) {
            console.log(message);
        })
    }
    render() {
        return (
            <div>
              hett
            </div>
        );
    }
}
2、flutter一键集成参考
2.1、引包
dependencies:
  # socket.io包
  socket_io_client: ^0.9.9
2.2、代码参考
IO.Socket channel;

@override
void initState() {
  super.initState();

  _listenWebSocket();
}

void _listenWebSocket() async {
  // 构建请求头,可以放一些cookie等信息,
  //Map<String, dynamic> headers = new Map();
  //headers['origin'] = 'http://127.0.0.1:7001';
  // 建立websocket链接
  // 链接的书写规范,schame://host:port/namespace, 这里socket_io_client在处理链接时候会把path和后面的query参数都作为namespace来处理,所以如果我们的namespace是/的话,就直接使用http://host/
  channel = IO.io('http://172.16.0.57:10088/', <String, dynamic>{
    // 请求的path
    'path': '/chat',
    // 构造的header放这里
    'extraHeaders': headers,
    // 查询参数,扔这里
    'query': {
      'EIO': 3,
      'transport': 'websocket',
      'platformId': 66666,
    },
    // 说明需要升级成websocket链接
    'transports': ['websocket'],
  });

  // 链接建立成功之后,可以发送数据到socket.io的后端了
  channel.on('connect', (_) {
    print('connect');
    // 发送消息和回调函数给socket.io服务端,在服务端可以直接获取到该方法,然后调用
    channel.emitWithAck('exchange', '11111', ack: (data) {
      print('ack $data') ;
      if (data != null) {
        print('from server $data');
      } else {
        print("Null") ;
      }
    });
  });
  // 链接建立失败时调用
  channel.on('error', (data){
    print('error');
    print(data);
  });
  // 链接出错时调用
  channel.on("connect_error", (data) => print('connect_error: '));
  // 链接断开时调用
  channel.on('disconnect', (_) => print('disconnect======'));
  // 链接关闭时调用
  channel.on('close', (_) => print('close===='));
  // 服务端emit一个message的事件时,可以直接监听到
  channel.on('message', (data) {
    print('onmessage');
    print(data);
  });
}
// 关闭websocket链接,避免内存占用
@override
void dispose() {
  super.dispose();
  print('close');
  channel.close();
}

3、Android集成
3.1、引包
implementation 'io.socket:socket.io-client:1.0.0'
3.2、给网络权限
<uses-permission android:name="android.permission.INTERNET"/>
3.3、代码参考
private Socket mSocket;

try {
    IO.Options opts = new IO.Options();
    opts.query = "platformId=666";
    mSocket = IO.socket("http://172.16.0.57:10088/chat", opts);
} catch (URISyntaxException e) {
    e.printStackTrace();
}

mSocket.on("connect", new Emitter.Listener() {
    @Override
    public void call(Object... args) {
        runOnUiThread(new Runnable() {
            @Override
            public void run() {
                tv_lj.setText("链接成功");
            }
        });
    }
});
4、ios集成详细配置参考:https://github.com/socketio/socket.io-client-swift
4.1、代码参考
import SocketIO
let manager = SocketManager(socketURL: URL(string: "http://172.16.0.57:10088/chat?platformId=666")!, config: [.log(true), .compress])
let socket = manager.defaultSocket
socket.on(clientEvent: .connect) {data, ack in
    print("socket connected")
}
socket.on("currentAmount") {data, ack in
    guard let cur = data[0] as? Double else { return }
    socket.emitWithAck("canUpdate", cur).timingOut(after: 0) {data in
        if data.first as? String ?? "passed" == SocketAckValue.noAck {
            // Handle ack timeout 
        }
        socket.emit("update", ["amount": cur + 2.50])
    }
    ack.with("Got your currentAmount", "dude")
}
socket.connect()

四、启动服务端及客户端查看实时在线人数

 

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐