TDengine 如何批量插入?
【本文正在参与“拥抱开源 | 涛思数据 TDengine有奖征稿】拥抱开源 涛思数据 TDengine有奖征稿活动-CSDN链接:https://marketing.csdn.net/p/0ada836ca30caa924b9baae0fd33857c文章目录正常插入批量插入解决错误查询官方文档常见问题官方文档连接器官方文档高效写入数据官方文档最终结论修改后的代码  
【本文正在参与 “拥抱开源 | 涛思数据 TDengine有奖征稿】拥抱开源 涛思数据 TDengine有奖征稿活动-CSDN 链接:https://marketing.csdn.net/p/0ada836ca30caa924b9baae0fd33857c
最近在使用涛思旗下tdengine数据库的时候,因为需求,需要做一个批量插入,遇到了些问题,记录一下。
正常插入
在测试时,数据都是单条插入的,代码如下:
package test;
import java.sql.*;
public class jdbcTest {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
// Class.forName("com.taosdata.jdbc.TSDBDriver");
// j?charset=UTF-8&locale=en_US.UTF-8
String jdbcUrl = "jdbc:TAOS://:6030/restful_test?user=zhangsan&password=123456";
Connection conn = DriverManager.getConnection(jdbcUrl);
Statement stmt = conn.createStatement();
stmt.executeUpdate("use restful_test");
System.out.println( 111 );
// int affectedRows = stmt.executeUpdate("insert into restful_test.l_test values(now,1) ");
int affectedRows = stmt.executeUpdate("insert into student values(1622685792584, \"zhangsan\", \"lisi\", 90) ");
System.out.println("insert " + affectedRows + " rows.");
stmt.close();
conn.close();
}
}
该代码运行正常,可以插入。但是生产环境下这样去插入数据肯定是不妥的,于是尝试批量插入。
批量插入
根据以前的开发经验,批量插入,使用PrepareStatement
即可。代码百度即可,这里不再演示如下:
PreparedStatement ps = conn.prepareStatement(
"INSERT into student values (?, ?, ?)");
long l = new Date().getTime();
for (n = 0; n < 100; n++) {
ps.setLong(l);
ps.setString("zhangsan");
ps.setString("lisi");
ps.setInt(90);
l ++;
ps.addBatch();
}
ps.executeBatch();
但实际运行时,会报错。
解决错误
查询官方文档
这个我实在想不通,这么简单的代码,我应该没写错吧。再次检查代码,仍然报错;再次百度,写法仍然是这样的。
那就查查官方文档吧。
搜索批量插入
后,只搜索到三篇文章。
文档 | 涛思数据 | 连接器 链接:https://www.taosdata.com/cn/documentation/connector
文档 | 涛思数据 | 高效写入数据 链接:https://www.taosdata.com/cn/documentation/insert
文档 | 涛思数据 | 常见问题 链接:https://www.taosdata.com/cn/documentation/faq
根据标体来看,肯定是高效写入数据比较靠谱。但既然都碰到问题了,就都看一遍吧。
常见问题官方文档
该篇官方文档比较简短,只找到如下相关描述:
11. 最有效的写入数据的方法是什么?
批量插入。每条写入语句可以一张表同时插入多条记录,也可以同时插入多张表的多条记录。
虽然简短,但根据这条信息可以确定,tdengine是支持一次插入多条记录的。
连接器官方文档
顾名思义,该篇官方文章主要是讲各种语言的连接器的创建和使用的。找到java的连接器,查看相关官方文档。在Python客户端安装的windows代码示例下,找到如下描述:
import datetime
# 创建数据库
c1.execute('create database db')
c1.execute('use db')
# 建表
c1.execute('create table tb (ts timestamp, temperature int, humidity float)')
# 插入数据
start_time = datetime.datetime(2019, 11, 1)
affected_rows = c1.execute('insert into tb values (\'%s\', 0, 0.0)' %start_time)
# 批量插入数据
time_interval = datetime.timedelta(seconds=60)
sqlcmd = ['insert into tb values']
for irow in range(1,11):
start_time += time_interval
sqlcmd.append('(\'%s\', %d, %f)' %(start_time, irow, irow*1.2))
affected_rows = c1.execute(' '.join(sqlcmd))
所幸之前接触过python,能看懂啥意思。这段代码中的批量插入,并没有使用prepareStatement的方式,而是将sql拼接为insert into tb values(‘xxx’,xxx)(‘xxx’,xxx)的形式,再去执行sql。
从该篇官方文章中,再没有得到更具体的关于批量插入的信息。
高效写入数据官方文档
这篇官方文章,看名字就像要找的答案。很容易就找到了如下描述:
高效写入数据
TDengine支持多种接口写入数据,包括SQL, Prometheus, Telegraf, EMQ MQTT Broker, HiveMQ Broker, CSV文件等,后续还将提供Kafka, OPC等接口。数据可以单条插入,也可以批量插入,可以插入一个数据采集点的数据,也可以同时插入多个数据采集点的数据。支持多线程插入,支持时间乱序数据插入,也支持历史数据插入。
再次确定了连接器官方文章
小节中的结论: 以及如下官方描述:tdengine是支持一次插入多条记录的。
SQL写入
应用通过C/C++, JDBC, GO, 或Python Connector 执行SQL insert语句来插入数据,用户还可以通过TAOS Shell,手动输入SQL insert语句插入数据。比如下面这条insert 就将一条记录写入到表d1001中:
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31);
TDengine支持一次写入多条记录,比如下面这条命令就将两条记录写入到表d1001中:
INSERT INTO d1001 VALUES (1538548684000, 10.2, 220, 0.23) (1538548696650, 10.3, 218, 0.25);
TDengine也支持一次向多个表写入数据,比如下面这条命令就向d1001写入两条记录,向d1002写入一条记录:
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, 218, 0.33) d1002 VALUES (1538548696800, 12.3, 221, 0.31);
详细的SQL INSERT语法规则请见 TAOS SQL 的数据写入 章节。
在这里,可以查到一次插入多条数据的sql写法。需要注意的是:多条数据之间并没有用逗号分隔。
继续向下查看还有如下官方描述:
Tips:
要提高写入效率,需要批量写入。一批写入的记录条数越多,插入效率就越高。但一条记录不能超过16K,一条SQL语句总长度不能超过64K(可通过参数maxSQLLength配置,最大可配置为1M)。
TDengine支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开20个以上的线程同时写。但线程数达到一定数量后,无法再提高,甚至还会下降,因为线程切频繁切换,带来额外开销。
对同一张表,如果新插入记录的时间戳已经存在,默认(没有使用 UPDATE 1 创建数据库)新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。如果应用自动生成记录,很有可能生成的时间戳是一样的,这样,成功插入的记录条数会小于应用插入的记录条数。如果在创建数据库时使用 UPDATE 1 选项,插入相同时间戳的新记录将覆盖原有记录。
写入的数据的时间戳必须大于当前时间减去配置参数keep的时间。如果keep配置为3650天,那么无法写入比3650天还老的数据。写入数据的时间戳也不能大于当前时间加配置参数days。如果days配置为2,那么无法写入比当前时间还晚2天的数据。
这里包含很重要的信息:要提高写入效率,需要批量写入。一批写入的记录条数越多,插入效率就越高。但一条记录不能超过16K,一条SQL语句总长度不能超过64K(可通过参数maxSQLLength配置,最大可配置为1M)。
以及: TDengine支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开20个以上的线程同时写。
虽然与批量插入无关,但这条信息也很重要:对同一张表,如果新插入记录的时间戳已经存在,默认(没有使用 UPDATE 1 创建数据库)新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。
最终结论
tdengine是支持批量插入的,但只支持sql形式的批量插入:insert into tb values(xxx,xxx)(xxx,xxx)
。最重要的是:一条数据不能超过16K,一条sql不能超过64K(可配置)。如果需要更快的写入速度,可以考虑多线程同时写入;官方文档对线程数量的描述为:20个以上。
同时,需要注意:没有使用 UPDATE 1 创建数据库,同一张表中,若需插入的数据的时间戳已存在,则该条需插入的数据被废弃。
修改后的代码
经过如上文章查询后,最终代码如下:
(用scala写的,语言不是问题,理解思路即可,注释也还是挺够用的)
def insert(size: Int, dataArray: Array[Row]): Unit ={
// j?charset=UTF-8&locale=en_US.UTF-8
val jdbcUrl = "jdbc:TAOS://:6030/restful_test?user=zhangsan&password=123456"
val conn = DriverManager.getConnection(jdbcUrl)
var ts = System.currentTimeMillis();
val sql = "insert into restful_test.student values"
var jdbcSql = new StringBuffer (sql )
val stmt = conn.createStatement();
// 当前一条sql插入的数据量
var i = 0;
var start = 0L
var end = 0L
// 共插入数据量
var allCount = 0
// 插入数据总消耗时间
var allTime = 0L
for ( row <- dataArray ) {
i = i + 1
// 同一张表中,若需插入的数据的时间戳已存在,则该条需插入的数据被废弃。所以需要手动该表ts。
ts = ts + 1l;
jdbcSql.append(
"(%d, \"%s\", \"%s\", %f)".format(ts, row.getString(0), row.getString(1), row.getDouble(2))
)
// 保证一条sql不超过16K,大于15K时就执行
if( jdbcSql.length > 15 * 1024 ){
start = System.currentTimeMillis();
stmt.executeUpdate(jdbcSql.toString)
end = System.currentTimeMillis();
println( i + ", " + ( end - start) )
allCount += i
allTime += ( end - start)
i = 0
jdbcSql = new StringBuffer(sql )
}
}
// 并不能保证数据中最后一条sql也大于15K,如果不添加下面的执行逻辑,则最后一条sql不会执行,会导致数据没有完全插入。
start = System.currentTimeMillis();
stmt.executeUpdate(jdbcSql.toString)
end = System.currentTimeMillis();
allCount += i
allTime += ( end - start)
println( i + ", " + ( end - start) )
println( allCount + ", " + allTime )
println( " " + size + ", -----------------" )
println( "-----------------------, ------------------------------------")
stmt.close()
conn.close()
}
【本文正在参与 “拥抱开源 | 涛思数据 TDengine有奖征稿】拥抱开源 涛思数据 TDengine有奖征稿活动-CSDN 链接:https://marketing.csdn.net/p/0ada836ca30caa924b9baae0fd33857c
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)