With the rapid growth of social networking, e-commerce, finance, retail, IoT, and other industries, the real world has woven a vast and complex web of relationships, creating an urgent need for databases that can compute over massive, complex relational data, namely graph databases. This series of articles organizes and summarizes what I have learned about knowledge graphs and graph databases.

This article covers:

  • How to quickly import 1 billion+ records

Intended audience: architects, technical experts, and senior engineers interested in knowledge graphs and graph databases.

1. Nebula cluster environment

Nebula version 2.0.0, with RocksDB as the backend storage, on a 3-node cluster. Server specs: CPU: 2 * Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz, memory: 256 GB, disk: SAS.

The data load is executed on one of the servers.

For installing the Nebula cluster, see: https://blog.csdn.net/penriver/article/details/115486872

2. Data preparation

See my earlier blog post on quickly importing 1 billion+ records into the HugeGraph graph database.

Statistics for the friendster dataset: 65,608,366 vertices and 1,806,067,135 edges, roughly 1.8 billion+ records in total.

3. Importing the data

Because Nebula's importer cannot load data whose vid is of type int (the importer currently only supports vids of type fixed_string), a custom import program was written instead.

3.1 Creating the schema

create space friendster(partition_num=30,replica_factor=1,vid_type=int64);
create tag person();
create edge friend();

3.2 Writing the program

The full source code is in the appendix. The program currently supports:

  • Importing vertex and edge data

  • Configurable number of concurrent threads and records per batch

  • Periodically printing the number of imported records

  • Reporting the total import time
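The batch-building described above (each input line becomes one `vid:()` value, flushed as a single INSERT once `batchNum` rows accumulate) can be sketched in isolation. `buildVertexInsert` below is a hypothetical helper mirroring the `rowStrFormat` string-building in the appendix source:

```java
import java.util.Arrays;
import java.util.List;

public class BatchBuilder {
    // Each vid becomes ", <vid>:()"; the leading ", " is stripped before
    // prepending the INSERT prefix, as the appendix worker threads do.
    static String buildVertexInsert(List<String> vids) {
        StringBuilder sb = new StringBuilder();
        for (String vid : vids) {
            sb.append(String.format(", %s:()", vid));
        }
        return "INSERT VERTEX person() VALUES " + sb.substring(2) + ";";
    }

    public static void main(String[] args) {
        // Prints: INSERT VERTEX person() VALUES 101:(), 102:(), 103:();
        System.out.println(buildVertexInsert(Arrays.asList("101", "102", "103")));
    }
}
```

Sending one multi-value statement per batch, rather than one statement per row, is what keeps the round-trip count to the graphd service low.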

3.3 Running the import

Usage: ./bin/start.sh dataFile dataType[1 = vertex, 2 = edge] numThread batchNum

Import vertices:

    bin/start.sh path_to_data/com-friendster.vertex.txt 1 20 1000

Import edges:

    bin/start.sh path_to_data/com-friendster/com-friendster.ungraph.txt 2 20 1000

Vertices were imported at 614,000 rows/s, taking 106.8 s in total; edges were imported at 487,000 rows/s, taking 61.7 minutes. With a higher concurrency setting the import would likely be faster still.
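As a quick sanity check on the rates above (65,608,366 vertices in 106.8 s; 1,806,067,135 edges in 61.7 min):

```java
public class ThroughputCheck {
    public static void main(String[] args) {
        // 65,608,366 vertices in 106.8 seconds
        double vertexRate = 65_608_366 / 106.8;
        // 1,806,067,135 edges in 61.7 minutes
        double edgeRate = 1_806_067_135L / (61.7 * 60);
        // vertexRate is about 614,000 rows/s; edgeRate about 488,000 rows/s
        System.out.printf("vertex rate: %.0f rows/s, edge rate: %.0f rows/s%n",
                vertexRate, edgeRate);
    }
}
```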

During the import, the program prints real-time progress logs.

4. Appendix

4.1. Source code

package com.vesoft.nebula.examples;

import com.google.common.collect.Lists;
import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class GraphMultiThreadWriteExample {
    private static final Logger logger = LoggerFactory.getLogger(GraphMultiThreadWriteExample.class);
    private static final NebulaPool pool = new NebulaPool();
    private static volatile boolean isRunning = false;

    public static void main(String[] args) throws IOException, InterruptedException {

        String dataFile = "vertexs.txt";
        int dataType = 1;
        int numThread = 3;
        int batchNum = 100;
        if (args.length > 3) {
            dataFile = args[0].trim();
            dataType = NumberUtils.toInt(args[1]);
            numThread = NumberUtils.toInt(args[2]);
            batchNum = NumberUtils.toInt(args[3]);
        }
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10000);
        ExecutorService executor = Executors.newFixedThreadPool(numThread+1);
        Runtime.getRuntime().addShutdownHook(getExitHandler());
        initPool();

        CountDownLatch latch = new CountDownLatch(numThread);
        List<WorkThread> workThreads = Lists.newArrayList();
        try {
            for (int k = 0; k < numThread; k++) {
                WorkThread workThread = new WorkThread(queue, pool, batchNum, dataType, latch);
                workThreads.add(workThread);
                executor.execute(workThread);
            }
        } catch (Exception e) {
            logger.error("Failed to add worker thread", e);
        }
        isRunning = true;
        Runnable statThread = () -> {
            while (isRunning) {
                long total = workThreads.stream().map(x -> x.getProcessNum()).reduce(0L, (a, b) -> a + b);
                logger.info("Processed {} records so far", total);
                try {
                    TimeUnit.MILLISECONDS.sleep(2000);
                } catch (InterruptedException e) {
//                    logger.error(e.getMessage(), e);
                }
            }
        };
        executor.execute(statThread);

        executor.shutdown();

        long start = System.currentTimeMillis();
        // try-with-resources so the reader is closed even on error
        try (BufferedReader br = new BufferedReader(new FileReader(new File(dataFile)))) {
            String temp;
            while ((temp = br.readLine()) != null) {
                if (temp.startsWith("#")) {
                    continue;
                }
                queue.put(temp);
            }
        }


        // Signal each worker to finish, wait for them to drain the queue,
        // then stop the statistics thread.
        for (int i = 0; i < numThread; i++) {
            queue.put("QUIT");
        }
        latch.await();
        isRunning = false;
        for (WorkThread workThread : workThreads) {
            workThread.close();
        }
        pool.close();
        executor.shutdownNow();

        long totalNum = workThreads.stream().map(x -> x.getProcessNum()).reduce(0L, (a, b) -> a + b);
        logger.info("All {} threads finished; processed {} records in total, elapsed {} ms", numThread, totalNum, (System.currentTimeMillis() - start));
    }

    private static void initPool() {
        try {
            NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
            nebulaPoolConfig.setMaxConnSize(100);
            List<HostAddress> addresses = Lists.newArrayList();
            addresses.add(new HostAddress("172.25.21.17", 9669));
            addresses.add(new HostAddress("172.25.21.19", 9669));
            addresses.add(new HostAddress("172.25.21.22", 9669));
            pool.init(addresses, nebulaPoolConfig);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    private static Thread getExitHandler() {
        return new Thread() {
            @Override
            public void run() {
                System.out.println("Program exiting");
            }
        };
    }
}

class WorkThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(WorkThread.class);
    private static int counter = 0;
    private final int id = ++counter;

    private final BlockingQueue<String> queue;
    private final NebulaPool pool;
    private final int batchNum;
    private final int dataType;
    private final CountDownLatch latch;

    private final String QUIT_STR = "QUIT";
    private final AtomicLong sum = new AtomicLong(0);
    private Session session;

    public WorkThread(BlockingQueue<String> queue, NebulaPool pool, int batchNum, int dataType, CountDownLatch latch) throws NotValidConnectionException, IOErrorException, AuthFailedException, UnsupportedEncodingException {
        this.queue = queue;
        this.pool = pool;
        this.batchNum = batchNum;
        this.dataType = dataType;
        this.latch = latch;

        session = pool.getSession("admin", "admin", true);
        session.execute("use friendster;");
    }

    @Override
    public void run() {
        if (dataType == 1) {
            doVertexWork();
        } else {
            doEdgeWork();
        }
        latch.countDown();
    }

    private void doVertexWork() {
        try {
            String insertVertexes = "INSERT VERTEX person() VALUES ";
            String rowStrFormat = ", %s:()";
            StringBuilder sb = new StringBuilder();

            for (; ; ) {
                String line = queue.take();
                if (QUIT_STR.equalsIgnoreCase(line)) {
                    break;
                }
                sb.append(String.format(rowStrFormat, line));
                sum.addAndGet(1);
                if (sum.get() % batchNum == 0) {
                    String sql = insertVertexes + sb.substring(1) + ";";
                    sb.setLength(0);
                    insertData(sql);
                }
            }
            // Flush the last partial batch, if any.
            if (sum.get() % batchNum != 0) {
                String sql = insertVertexes + sb.substring(1) + ";";
                insertData(sql);
            }
            logger.info(String.format("Thread %s processed %s records in total", id, sum.get()));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    private void doEdgeWork() {
        try {
            String insertEdges = "INSERT EDGE friend() VALUES ";
            String rowStrFormat = ", %s->%s:()";
            StringBuilder sb = new StringBuilder();

            for (; ; ) {
                String line = queue.take();
                if (QUIT_STR.equalsIgnoreCase(line)) {
                    break;
                }
                String[] cols = line.split("\\s+");
                sb.append(String.format(rowStrFormat, cols[0], cols[1]));
                sum.addAndGet(1);
                if (sum.get() % batchNum == 0) {
                    String sql = insertEdges + sb.substring(1) + ";";
                    sb.setLength(0);
                    insertData(sql);
                }
            }
            // Flush the last partial batch, if any.
            if (sum.get() % batchNum != 0) {
                String sql = insertEdges + sb.substring(1) + ";";
                insertData(sql);
            }
            logger.info(String.format("Thread %s processed %s records in total", id, sum.get()));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void insertData(String sql) {
        try {
            ResultSet resp = session.execute(sql);
            if (!resp.isSucceeded()) {
                logger.error(String.format("Execute: `%s', failed: %s", sql, resp.getErrorMessage()));
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void close() {
        if (session != null) {
            session.release();
        }
    }

    public long getProcessNum() {
        return sum.get();
    }
}

4.2. Script

The start.sh script

#!/bin/sh
#-------------------------------------------------------------------------------------------------------------
#Usage: sh ./bin/start.sh dataFile dataType numThread batchNum
#The script can be run from any directory on the server without affecting log output locations
#-------------------------------------------------------------------------------------------------------------
SCRIPT="$0"
while [ -h "$SCRIPT" ] ; do
  ls=`ls -ld "$SCRIPT"`
  # Drop everything prior to ->
  link=`expr "$ls" : '.*-> \(.*\)$'`
  if expr "$link" : '/.*' > /dev/null; then
    SCRIPT="$link"
  else
    SCRIPT=`dirname "$SCRIPT"`/"$link"
  fi
done

APP_HOME=`dirname "$SCRIPT"`/..

JAVA_HOME="/usr/java/jdk1.8.0_60"
if [ $# -lt 4 ] ; then
  echo "Import data into nebula"
  echo "USAGE: ./bin/start.sh dataFile dataType[1 = vertex, 2 = edge] numThread batchNum"
  echo " e.g.: ./bin/start.sh vertexs.txt 1 10 100"
  exit 1;
fi

APP_HOME=`cd "$APP_HOME"; pwd`
APP_LOG=${APP_HOME}/logs
CLASSPATH=$APP_HOME/conf
for jarFile in ${APP_HOME}/lib/*.jar;
do
   CLASSPATH=$CLASSPATH:$jarFile
done

#Argument handling
APP_MAIN="com.vesoft.nebula.examples.GraphMultiThreadWriteExample"

params=$@
JAVA_OPTS="-Duser.timezone=GMT+8 -server -Xms2048m -Xmx2048m -Xloggc:${APP_LOG}/gc.log -DLOG_DIR=${APP_LOG}"


startup(){
  aparams=($params)
  #echo "params len "${#aparams[@]}
  len=${#aparams[@]}
  for ((i=0;i<$len;i++));do
    echo "arg ${i}: ${aparams[$i]}";
    str=$str" "${aparams[$i]};
  done
  echo "Starting $APP_MAIN"
  echo "$JAVA_HOME/bin/java $JAVA_OPTS -classpath $CLASSPATH $APP_MAIN $str"
  $JAVA_HOME/bin/java $JAVA_OPTS -classpath $CLASSPATH $APP_MAIN $str
}
startup

4.3. log4j configuration

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
        <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
                <param name="encoding" value="UTF-8" />
                <param name="target" value="System.out" />
                <layout class="org.apache.log4j.PatternLayout">
                        <param name="ConversionPattern" value="%d %p [%c:%L] - %m%n" />
                </layout>
        </appender>

        <appender name="FileAppender" class="org.apache.log4j.RollingFileAppender">
                <param name="file" value="${LOG_DIR}/run.log" />
                <!-- <param name="file" value="./logs/run.log" /> -->
                <param name="append" value="true" />
                <param name="encoding" value="UTF-8" />
                <param name="maxFileSize" value="1073741824" />
                <param name="maxBackupIndex" value="10" />
                <layout class="org.apache.log4j.PatternLayout">
                        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH\:mm\:ss,SSS} %p [%c:%L] - %m%n" />
                </layout>
        </appender>

        <appender name="ASYNCFileAppender" class="org.apache.log4j.AsyncAppender">
                <param name="BufferSize" value="16384" />
                <appender-ref ref="FileAppender" />
        </appender>

        <root>
                <level value="INFO" />
                <appender-ref ref="CONSOLE" />
                <appender-ref ref="ASYNCFileAppender" />
        </root>

</log4j:configuration>

4.4. Dependency JARs

  • client-2.0.0-SNAPSHOT.jar (the Nebula client JAR)
  • examples-2.0.0-SNAPSHOT.jar (the JAR built from the source code in the appendix)
  • commons-csv-1.7.jar
  • commons-lang3-3.8.jar 
  • log4j-1.2.17.jar      
  • slf4j-log4j12-1.7.25.jar
  • commons-codec-1.13.jar    
  • commons-lang-2.6.jar
  • commons-pool2-2.2.jar  
  • guava-14.0.jar              
  • slf4j-api-1.7.25.jar