图数据库Nebula Graph如何快速导入10亿+数据
随着社交、电商、金融、零售、物联网等行业的快速发展,现实社会织起了了一张庞大而复杂的关系网,亟需一种支持海量复杂数据关系运算的数据库即图数据库。本系列文章是学习知识图谱以及图数据库相关的知识梳理与总结本文会包含如下内容:如何快速导入10亿+数据本篇文章适合人群:架构师、技术专家、对知识图谱与图数据库感兴趣的高级工程师1. nebula cluster环境nubula版本2.0.0,后端存储使用的是
随着社交、电商、金融、零售、物联网等行业的快速发展,现实社会织起了了一张庞大而复杂的关系网,亟需一种支持海量复杂数据关系运算的数据库即图数据库。本系列文章是学习知识图谱以及图数据库相关的知识梳理与总结
本文会包含如下内容:
- 如何快速导入10亿+数据
本篇文章适合人群:架构师、技术专家、对知识图谱与图数据库感兴趣的高级工程师
1. nebula cluster环境
nubula版本2.0.0,后端存储使用的是RocksDB,3个节点。 服务器版本是:CPU: 2 * Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz,内存:256GB, 硬盘:SAS盘
在其中一台服务器上执行数据加载
nebula集群的安装,请参考:https://blog.csdn.net/penriver/article/details/115486872
2. 数据准备
请参考我之前写的blog 图数据库hugegraph如何快速导入10亿+数据
friendster数据集的统计信息如下:共有65608366个顶点,1806067135条边,约18亿+的数据量。
3. 导入数据
因为nebula的importer不能导入vid为int类型的数据,【importer目前只支持导入vid为fixed_string类型的数据】
3.1 创建schema
create space friendster(partition_num=30,replica_factor=1,vid_type=int64);
create tag person();
create edge friend();
3.2 编写程序
源代码详见附录,目前程序特性如下:
-
支持导入顶点、边的数据
-
支持指定并发线程数、每批次提交的记录条数
-
按秒打印导入的记录数
-
统计导入耗时
3.2 执行导入
导入命令用法: USAGE: ./bin/start.sh dataFile 数据类型【1 点, 2 边】 numThread batchNum
导入顶点:
bin/start.sh path_to_data/com-friendster.vertex.txt 1 20 1000
导入边:
bin/start.sh path_to_data/com-friendster/com-friendster.ungraph.txt 2 20 1000
顶点的导入速率是61.4万/秒,共耗时106.8秒 边的导入速率是:48.7万/秒,共耗时61.7分钟,如果指定更大的并发数,估计导入速度更好。
导入时,实时日志如下:
4. 附录
4.1. 源代码
package com.vesoft.nebula.examples;
import com.google.common.collect.Lists;
import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class GraphMultiThreadWriteExample {
private static final Logger logger = LoggerFactory.getLogger(GraphMultiThreadWriteExample.class);
private static final NebulaPool pool = new NebulaPool();
private static volatile boolean isRunning = false;
public static void main(String[] args) throws IOException, InterruptedException {
String dataFile = "vertexs.txt";
int dataType = 1;
int numThread = 3;
int batchNum = 100;
if (args.length > 3) {
dataFile = args[0].trim();
dataType = NumberUtils.toInt(args[1]);
numThread = NumberUtils.toInt(args[2]);
batchNum = NumberUtils.toInt(args[3]);
}
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10000);
ExecutorService executor = Executors.newFixedThreadPool(numThread+1);
Runtime.getRuntime().addShutdownHook(getExitHandler());
initPool();
CountDownLatch latch = new CountDownLatch(numThread);
List<WorkThread> workThreads = Lists.newArrayList();
try {
for (int k = 0; k < numThread; k++) {
WorkThread workThread = new WorkThread(queue, pool, batchNum, dataType, latch);
workThreads.add(workThread);
executor.execute(workThread);
}
} catch (Exception e) {
logger.error("添加工作线程失败", e);
}
isRunning = true;
Runnable statThread = () -> {
while (isRunning) {
long total = workThreads.stream().map(x -> x.getProcessNum()).reduce(0L, (a, b) -> a + b);
logger.info("已经处理了{}条", total);
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
// logger.error(e.getMessage(), e);
}
}
};
executor.execute(statThread);
executor.shutdown();
long start = System.currentTimeMillis();
BufferedReader br = new BufferedReader(new FileReader(new File(dataFile)));
String temp = null;
while ((temp = br.readLine()) != null) {
if (temp.startsWith("#")) {
continue;
}
queue.put(temp);
}
for (int i = 0; i < numThread; i++) {
queue.add("QUIT");
}
isRunning = false;
latch.await();
for (WorkThread workThread : workThreads) {
workThread.close();
}
pool.close();
executor.shutdownNow();
long totalNum = workThreads.stream().map(x -> x.getProcessNum()).reduce(0L, (a, b) -> a + b);
logger.info("全部{}线程执行完毕,总共处理{}条, 总耗时{}ms", numThread, totalNum, (System.currentTimeMillis() - start));
}
private static void initPool() {
try {
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig.setMaxConnSize(100);
List<HostAddress> addresses = Lists.newArrayList();
addresses.add(new HostAddress("172.25.21.17", 9669));
addresses.add(new HostAddress("172.25.21.19", 9669));
addresses.add(new HostAddress("172.25.21.22", 9669));
pool.init(addresses, nebulaPoolConfig);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private static Thread getExitHandler() {
return new Thread() {
@Override
public void run() {
System.out.println("程序退出");
}
};
}
}
class WorkThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(WorkThread.class);
private static int counter = 0;
private final int id = ++counter;
private final BlockingQueue<String> queue;
private final NebulaPool pool;
private final int batchNum;
private final int dataType;
private final CountDownLatch latch;
private final String QUIT_STR = "QUIT";
private final AtomicLong sum = new AtomicLong(0);
;
private Session session;
public WorkThread(BlockingQueue<String> queue, NebulaPool pool, int batchNum, int dataType, CountDownLatch latch) throws NotValidConnectionException, IOErrorException, AuthFailedException, UnsupportedEncodingException {
this.queue = queue;
this.pool = pool;
this.batchNum = batchNum;
this.dataType = dataType;
this.latch = latch;
session = pool.getSession("admin", "admin", true);
session.execute("use friendster;");
}
@Override
public void run() {
if (dataType == 1) {
doVertexWork();
} else {
doEdgeWork();
}
latch.countDown();
}
private void doVertexWork() {
try {
String insertVertexes = "INSERT VERTEX person() VALUES ";
String rowStrFormat = ", %s:()";
StringBuffer sb = new StringBuffer();
for (; ; ) {
String line = queue.take();
if (QUIT_STR.equalsIgnoreCase(line)) {
break;
}
sb.append(String.format(rowStrFormat, line));
sum.addAndGet(1);
if (sum.get() % batchNum == 0) {
String sql = insertVertexes + sb.substring(1) + ";";
sb.setLength(0);
insertData(sql);
}
}
if (sum.get() % batchNum != 0) {
String sql = insertVertexes + sb.substring(1) + ";";
insertData(sql);
logger.info(String.format("线程%s共处理了%s条", id, sum.get()));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private void doEdgeWork() {
try {
String insertVertexes = "INSERT EDGE friend() VALUES ";
String rowStrFormat = ", %s->%s:()";
StringBuffer sb = new StringBuffer();
for (; ; ) {
String line = queue.take();
if (QUIT_STR.equalsIgnoreCase(line)) {
break;
}
String[] cols = line.split("\\s+");
sb.append(String.format(rowStrFormat, cols[0], cols[1]));
sum.addAndGet(1);
if (sum.get() % batchNum == 0) {
String sql = insertVertexes + sb.substring(1) + ";";
sb.setLength(0);
insertData(sql);
}
}
if (sum.get() % batchNum != 0) {
String sql = insertVertexes + sb.substring(1) + ";";
insertData(sql);
logger.info(String.format("线程%s共处理了%s条", id, sum.get()));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public void insertData(String sql) {
try {
ResultSet resp = session.execute(sql);
if (!resp.isSucceeded()) {
logger.error(String.format("Execute: `%s', failed: %s", sql, resp.getErrorMessage()));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public void close() {
if (session != null) {
session.release();
}
}
public long getProcessNum() {
return sum.get();
}
}
4.2. 脚本
start.sh脚本
#!/bin/sh
#-------------------------------------------------------------------------------------------------------------
#该脚本的使用方式为-->[sh run.sh]
#该脚本可在服务器上的任意目录下执行,不会影响到日志的输出位置等
#-------------------------------------------------------------------------------------------------------------
SCRIPT="$0"
while [ -h "$SCRIPT" ] ; do
ls=`ls -ld "$SCRIPT"`
# Drop everything prior to ->
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
SCRIPT="$link"
else
SCRIPT=`dirname "$SCRIPT"`/"$link"
fi
done
APP_HOME=`dirname "$SCRIPT"`/..
JAVA_HOME="/usr/java/jdk1.8.0_60"
if [ $# -lt 4 ] ; then
echo "导入数据到nebula"
echo "USAGE: ./bin/start.sh dataFile 数据类型【1 点, 2 边】 numThread batchNum"
echo " e.g.: ./bin/start.sh vertexs.txt 1 10 100"
exit 1;
fi
APP_LOG=${APP_HOME}/logs
APP_HOME=`cd "$SCRIPT"; pwd`
CLASSPATH=$APP_HOME/conf
for jarFile in ${APP_HOME}/lib/*.jar;
do
CLASSPATH=$CLASSPATH:$jarFile
done
#参数处理
APP_MAIN="com.vesoft.nebula.examples.GraphMultiThreadWriteExample"
params=$@
JAVA_OPTS="-Duser.timezone=GMT+8 -server -Xms2048m -Xmx2048m -Xloggc:${APP_LOG}/gc.log -DLOG_DIR=${APP_LOG}"
startup(){
aparams=($params)
#echo "params len "${#aparams[@]}
len=${#aparams[@]}
for ((i=0;i<$len;i++));do
echo "第${i}参数:${aparams[$i]}";
str=$str" "${aparams[$i]};
done
echo "Starting $APP_MAIN"
echo "$JAVA_HOME/bin/java $JAVA_OPTS -classpath $CLASSPATH $APP_MAIN $str"
$JAVA_HOME/bin/java $JAVA_OPTS -classpath $CLASSPATH $APP_MAIN $str
}
startup
4.3. log4j配置
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<param name="encoding" value="UTF-8" />
<param name="target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %p [%c:%L] - %m%n" />
</layout>
</appender>
<appender name="FileAppender" class="org.apache.log4j.RollingFileAppender">
<param name="file" value="${LOG_DIR}/run.log" />
<!-- <param name="file" value="./logs/run.log" /> -->
<param name="append" value="true" />
<param name="encoding" value="UTF-8" />
<param name="maxFileSize" value="1073741824" />
<param name="maxBackupIndex" value="10" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyy-MM-dd HH\:mm\:ss,SSS} %p [%c:%L] - %m%n" />
</layout>
</appender>
<appender name="ASYNCFileAppender" class="org.apache.log4j.AsyncAppender">
<param name="BufferSize" value="16384" />
<appender-ref ref="FileAppender" />
</appender>
<root>
<level value="INFO" />
<appender-ref ref="CONSOLE" />
<appender-ref ref="ASYNCFileAppender" />
</root>
</log4j:configuration>
4.4. 依赖的jar包
- client-2.0.0-SNAPSHOT.jar 是nebula客户端jar包
- examples-2.0.0-SNAPSHOT.jar就是源代码打成的jar包
- commons-csv-1.7.jar
- commons-lang3-3.8.jar
- log4j-1.2.17.jar
- slf4j-log4j12-1.7.25.jar
- commons-codec-1.13.jar
- commons-lang-2.6.jar
- commons-pool2-2.2.jar
- guava-14.0.jar
- slf4j-api-1.7.25.jar
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)