使用 ApiTable 连接 TDEngine
本节列出了整合 TDengine 所需的 Maven 依赖。除了用于构建基于事件驱动的应用程序的 tio-boot 之外,还需要添加 ApiTable、TDengine 的 JDBC 驱动(taos-jdbcdriver)以及用于数据库连接的 HikariCP 连接池。
连接 TDEngine
添加依赖
<ApiTable.version>1.3.0</ApiTable.version>
<dependency>
<groupId>com.litongjava</groupId>
<artifactId>ApiTable</artifactId>
<version>${ApiTable.version}</version>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.7</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>4.0.3</version>
</dependency>
添加配置信息
app.properties
tdengine.host=192.168.3.9
tdengine.port=6041
tdengine.username=root
tdengine.password=root123
tdengine.database=manliang_pen
tdengine.driverClassName=com.taosdata.jdbc.rs.RestfulDriver
启动类
package com.litongjava.tio.web.hello;
import com.litongjava.hotswap.wrapper.tio.boot.TioApplicationWrapper;
import com.litongjava.jfinal.aop.annotation.AComponentScan;
/**
 * Application entry point: launches the application through {@code TioApplicationWrapper}
 * and prints how long the launch call took.
 */
@AComponentScan
public class HelloApp {

  /**
   * Runs the application and writes the elapsed time of the run call to standard output.
   *
   * @param args command-line arguments, passed through unchanged
   */
  public static void main(String[] args) {
    final long startedAt = System.currentTimeMillis();
    TioApplicationWrapper.run(HelloApp.class, args);
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    System.out.println(elapsedMillis + "ms");
  }
}
ActiveRecordPluginConfiguration 类
import javax.sql.DataSource;
import com.jfinal.template.Engine;
import com.jfinal.template.source.ClassPathSourceFactory;
import com.litongjava.data.utils.TioRequestParamUtils;
import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.litongjava.jfinal.plugin.activerecord.OrderedFieldContainerFactory;
import com.litongjava.jfinal.plugin.activerecord.dialect.TdEngineDialect;
import com.litongjava.jfinal.plugin.hikaricp.DsContainer;
import com.litongjava.tio.boot.constatns.ConfigKeys;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
/**
 * Wires TDengine into the application: builds a HikariCP data source from the
 * {@code tdengine.*} settings and starts an ActiveRecordPlugin registered under the
 * name "tdengine".
 */
@AConfiguration
public class ActiveRecordPluginConfiguration {

  /**
   * Initialization entry point; delegates to {@link #configArpForTdEngine()}.
   *
   * <p>The annotation must be {@code @AInitialization}: that is the annotation this file
   * imports, and an un-imported {@code @Initialization} does not compile.
   *
   * @throws Exception if setting up the plugin fails
   */
  @AInitialization
  public void activeRecordPlugin() throws Exception {
    configArpForTdEngine();
  }

  /**
   * Creates the HikariCP data source for TDengine, stores it in {@code DsContainer} and
   * registers its {@code close} method as a server destroy method.
   *
   * @return the newly created data source
   */
  public DataSource getTdengineDataSource() {
    HikariConfig config = new HikariConfig();

    // jdbc properties
    String host = EnvUtils.get("tdengine.host");
    int port = EnvUtils.getInt("tdengine.port");
    String user = EnvUtils.get("tdengine.username");
    String pswd = EnvUtils.get("tdengine.password");
    String dbName = EnvUtils.get("tdengine.database");
    // Alternative noted by the original author: "com.taosdata.jdbc.TSDBDriver".
    String driverClassName = EnvUtils.get("tdengine.driverClassName");

    config.setJdbcUrl(getTdEngineJdbcUrl(host, port, user, pswd, dbName));
    config.setDriverClassName(driverClassName);

    // connection pool configurations
    config.setMinimumIdle(10); // minimum number of idle connection
    config.setMaximumPoolSize(10); // maximum number of connection in the pool
    config.setConnectionTimeout(30000); // maximum wait milliseconds for get connection from pool
    config.setMaxLifetime(0); // maximum life time for each connection
    config.setIdleTimeout(0); // max idle time for recycle idle connection
    config.setConnectionTestQuery("select server_status()"); // validation query

    HikariDataSource ds = new HikariDataSource(config); // create datasource
    DsContainer.setDataSource(ds);
    // Close the pool when the server is destroyed.
    TioBootServer.addDestroyMethod(ds::close);
    return ds;
  }

  /**
   * Builds the {@code jdbc:TAOS-RS} URL for the given connection settings.
   *
   * @param host server host
   * @param port server port
   * @param user user name placed in the {@code user} query parameter
   * @param pswd password placed in the {@code password} query parameter
   * @param dbName database name used as the URL path
   * @return the JDBC URL, with {@code batchfetch=true} appended
   */
  private String getTdEngineJdbcUrl(String host, int port, String user, String pswd, String dbName) {
    // Per the original author: with batchfetch=true the resulting connection is a WebSocket connection.
    // NOTE(review): user and password are concatenated without URL encoding — confirm they never
    // contain reserved characters such as '&' or '='.
    return "jdbc:TAOS-RS://" + host + ":" + port + "/" + dbName + "?user=" + user + "&password=" + pswd
        + "&batchfetch=true";
  }

  /**
   * Creates, configures and starts the ActiveRecordPlugin for TDengine, and registers its
   * {@code stop} method as a server destroy method.
   */
  public void configArpForTdEngine() {
    String env = EnvUtils.get(ConfigKeys.APP_ENV);
    DataSource tdengineDataSource = getTdengineDataSource();

    // Register the plugin under the name "tdengine".
    ActiveRecordPlugin tdengineArp = new ActiveRecordPlugin("tdengine", tdengineDataSource);
    tdengineArp.setDialect(new TdEngineDialect());
    tdengineArp.setContainerFactory(new OrderedFieldContainerFactory());

    // Dev mode and SQL output are switched on only in the "dev" environment.
    if ("dev".equals(env)) {
      tdengineArp.setDevMode(true);
      tdengineArp.setShowSql(true);
    }

    tdengineArp.start();
    TioBootServer.addDestroyMethod(tdengineArp::stop);
  }
}
数据源配置:`getTdengineDataSource()` 方法创建并配置 TDengine 的数据源。该方法使用 HikariCP 连接池,并设置了针对 TDengine 特有的属性和连接测试查询。
ActiveRecordPlugin 配置:`configArpForTdEngine()` 方法为 TDengine 配置 ActiveRecordPlugin。在 dev 环境下会开启开发模式和 SQL 显示,数据库标识名为 "tdengine"。
资源清理:在 `getTdengineDataSource()` 方法中,使用 `TioBootServer.addDestroyMethod` 来确保应用停止时数据源能够正确关闭。
测试类
- ActiveRecordPluginConfigurationTest:
  - 使用 `TioBootTest.before` 方法初始化测试环境。
  - `test()` 方法中演示了如何使用 `DbJsonService` 从 MySQL 查询数据,并使用 `Db.use("tdengine")` 从 TDengine 查询数据。
import org.junit.Before;
import org.junit.Test;
import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.jfinal.plugin.activerecord.Page;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.jfinal.plugin.activerecord.SqlPara;
import com.litongjava.tio.boot.tesing.TioBootTest;
import lombok.extern.slf4j.Slf4j;
/**
 * Runs paged queries against the data sources set up by
 * {@code ActiveRecordPluginConfiguration}.
 */
@Slf4j
public class ActiveRecordPluginConfigurationTest {

  /** Initializes the test environment with the ActiveRecord configuration class. */
  @Before
  public void before() {
    TioBootTest.before(ActiveRecordPluginConfiguration.class);
  }

  /** Issues one paged query through DbJsonService and one through the "tdengine" DbPro. */
  @Test
  public void test() {
    // Query MySQL (as labelled by the original author) through DbJsonService.
    DbJsonBean<Page<Record>> mysqlPage = DbJsonService.getInstance().page("sys_user_info", Kv.create());
    log.info("size:{}", mysqlPage.getData().getPageSize());

    // Query TDengine through the DbPro registered under the name "tdengine".
    DbPro tdengine = Db.use("tdengine");
    SqlPara query = new SqlPara();
    query.setSql("select `*` from sensor_data");
    Page<Record> tdenginePage = tdengine.paginate(1, 10, query);
    log.info("page size:{}", tdenginePage.getPageSize());
  }
}
控制器测试类
- PenRawDataController:
  - 提供了一个 `page()` 方法,用于处理分页请求。
  - 通过 `DbJsonService` 和 `Db.use("tdengine")` 从特定的数据源(在这里是 TDengine)查询数据。
import java.util.Map;
import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.model.DbPage;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.data.utils.DbJsonBeanUtils;
import com.litongjava.data.utils.KvUtils;
import com.litongjava.data.utils.TioRequestParamUtils;
import com.litongjava.jfinal.aop.annotation.AAutowired;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.annotation.RequestPath;
import lombok.extern.slf4j.Slf4j;
/**
 * Controller mapped to {@code /pen/raw/data}; serves paged rows of the
 * {@code pen_raw_data_stable} table from the "tdengine" data source.
 */
@Slf4j
@RequestPath("/pen/raw/data")
public class PenRawDataController {

  @AAutowired
  private DbJsonService dbJsonService;

  /**
   * Pages through {@code pen_raw_data_stable} using the request parameters as query conditions.
   *
   * @param request incoming HTTP request whose parameters drive the query
   * @return the page converted by {@code DbJsonBeanUtils.pageToDbPage}
   */
  @RequestPath("/page")
  public DbJsonBean<DbPage<Kv>> page(HttpRequest request) {
    final String tableName = "pen_raw_data_stable";

    Map<String, Object> params = TioRequestParamUtils.getRequestMap(request);
    // The table name is fixed above, so any "f" parameter sent by the client is dropped.
    params.remove("f");
    Kv conditions = KvUtils.camelToUnderscore(params);
    log.info("tableName:{},kv:{}", tableName, conditions);

    DbPro tdengine = Db.use("tdengine");
    return DbJsonBeanUtils.pageToDbPage(dbJsonService.page(tdengine, tableName, conditions));
  }
}
测试参数查询
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.jfinal.plugin.activerecord.Config;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.jfinal.plugin.activerecord.RecordBuilder;
import com.litongjava.jfinal.plugin.hikaricp.DsContainer;
import com.litongjava.tio.boot.tesing.TioBootTest;
import lombok.Cleanup;
/**
 * Shows three ways of running the same parameterized query against TDengine:
 * plain JDBC, {@code Db.use("tdengine")}, and {@code DbJsonService}.
 */
public class PenRawDataControllerTest {

  /** Parameterized query shared by the JDBC and Db based tests. */
  private static final String QUERY_BY_USER =
      "select payload_ts,data_ts,client_id,user_id from manliang_pen.pen_page_stable where user_id=?";

  /** User id bound to the single placeholder of {@link #QUERY_BY_USER}. */
  private static final String USER_ID = "18374686479671623681";

  /** Initializes the test environment with the ActiveRecord configuration class. */
  @Before
  public void before() {
    TioBootTest.before(ActiveRecordPluginConfiguration.class);
  }

  /**
   * Runs the query through a raw JDBC connection and converts the result set to records.
   *
   * @throws SQLException if obtaining the connection or running the query fails
   */
  @Test
  public void query() throws SQLException {
    @Cleanup
    Connection connection = DsContainer.ds.getConnection();
    Config tdengineConfig = Db.use("tdengine").getConfig();

    List<Record> rows;
    try (PreparedStatement statement = connection.prepareStatement(QUERY_BY_USER)) {
      statement.setString(1, USER_ID);
      ResultSet resultSet = statement.executeQuery();
      rows = RecordBuilder.me.build(tdengineConfig, resultSet);
    }
    System.out.println(rows);
  }

  /** Runs the same query through the DbPro registered under the name "tdengine". */
  @Test
  public void queryWithDb() {
    List<Record> rows = Db.use("tdengine").find(QUERY_BY_USER, USER_ID);
    System.out.println(rows);
  }

  /** Runs the equivalent query through DbJsonService, describing it with a Kv. */
  @Test
  public void queryWithDbJsonService() {
    Kv criteria = Kv.create();
    criteria.set("table_name", "manliang_pen.pen_page_stable");
    criteria.set("columns", "payload_ts,data_ts,client_id,user_id");
    criteria.set("user_id", "18374686479671623681");

    DbJsonBean<List<Record>> result = DbJsonService.getInstance().list(Db.use("tdengine"), criteria);
    System.out.println(result);
  }
}
基本操作示例
增删改查
package com.litongjava.tio.web.hello.controller;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.List;
import java.util.Random;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.jfinal.plugin.activerecord.RecordBuilder;
import com.litongjava.annotation.RequestPath;
import com.litongjava.tio.web.hello.config.utils.TDUtils;
import com.taosdata.jdbc.ws.TSWSPreparedStatement;
import lombok.Cleanup;
/**
 * Demonstrates basic TDengine operations: creating the demo database and super tables,
 * inserting rows through {@code TSWSPreparedStatement} with bound parameters of several
 * types, and reading rows back.
 */
@RequestPath("/tdeingien/test")
public class TbEngineTestController {

  /** Database that every example in this class creates and works against. */
  private static final String DATABASE = "test_ws_parabind";

  /** Statement that selects {@link #DATABASE} on the current connection. */
  private static final String USE_DATABASE = "use " + DATABASE;

  /** Width of the binary and nchar columns and tags. */
  private static final int BINARY_COLUMN_SIZE = 30;

  /** Number of sub tables each bind example writes to. */
  private static final int NUM_OF_SUB_TABLE = 10;

  /** Number of rows each bind example inserts per sub table. */
  private static final int NUM_OF_ROW = 10;

  /**
   * Borrows a connection from {@code TDUtils.ds} and returns its string form.
   *
   * @return {@code toString()} of the borrowed connection
   * @throws SQLException if the connection cannot be obtained or closed
   */
  public String connection() throws SQLException {
    try (Connection connection = TDUtils.ds.getConnection()) {
      return connection.toString();
    }
  }

  /**
   * Drops and recreates the demo database, then creates the five super tables used by
   * the bind examples.
   *
   * @return {@code "success"} when every statement has run
   * @throws SQLException if any statement fails
   */
  public String init() throws SQLException {
    String[] schemaList = {
        "create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
        "create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
        "create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
        "create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE
            + "))",
        "create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE
            + "))" };

    try (Connection conn = TDUtils.ds.getConnection(); Statement stmt = conn.createStatement()) {
      stmt.execute("drop database if exists " + DATABASE);
      stmt.execute("create database if not exists " + DATABASE);
      stmt.execute(USE_DATABASE);
      for (String ddl : schemaList) {
        stmt.execute(ddl);
      }
    }
    return "success";
  }

  /**
   * Inserts rows with integer-typed tags and columns (tinyint, smallint, int, bigint)
   * into sub tables of {@code stable1}.
   *
   * @return {@code "success"} when all batches have been executed
   * @throws SQLException if binding or execution fails
   */
  public String bindInteger() throws SQLException {
    String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
    Random random = new Random(System.currentTimeMillis());

    try (Connection conn = TDUtils.ds.getConnection();
        TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
      pstmt.execute(USE_DATABASE);
      for (int i = 1; i <= NUM_OF_SUB_TABLE; i++) {
        // set table name
        pstmt.setTableName("t1_" + i);
        // set tags; nextInt(bound) stays below the bound, so the narrowing casts are lossless
        pstmt.setTagByte(1, (byte) random.nextInt(Byte.MAX_VALUE));
        pstmt.setTagShort(2, (short) random.nextInt(Short.MAX_VALUE));
        pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE));
        pstmt.setTagLong(4, random.nextLong());
        // set columns; each row gets its own millisecond timestamp
        long current = System.currentTimeMillis();
        for (int j = 0; j < NUM_OF_ROW; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setByte(2, (byte) random.nextInt(Byte.MAX_VALUE));
          pstmt.setShort(3, (short) random.nextInt(Short.MAX_VALUE));
          pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE));
          pstmt.setLong(5, random.nextLong());
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  /**
   * Inserts rows with float and double tags and columns into sub tables of {@code stable2}.
   *
   * @return {@code "success"} when all batches have been executed
   * @throws SQLException if binding or execution fails
   */
  public String bindFloat() throws SQLException {
    String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
    Random random = new Random(System.currentTimeMillis());

    try (Connection conn = TDUtils.ds.getConnection();
        TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
      pstmt.execute(USE_DATABASE);
      for (int i = 1; i <= NUM_OF_SUB_TABLE; i++) {
        // set table name
        pstmt.setTableName("t2_" + i);
        // set tags
        pstmt.setTagFloat(1, random.nextFloat());
        pstmt.setTagDouble(2, random.nextDouble());
        // set columns
        long current = System.currentTimeMillis();
        for (int j = 0; j < NUM_OF_ROW; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setFloat(2, random.nextFloat());
          pstmt.setDouble(3, random.nextDouble());
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  /**
   * Inserts rows with a boolean tag and column into sub tables of {@code stable3}.
   *
   * @return {@code "success"} when all batches have been executed
   * @throws SQLException if binding or execution fails
   */
  public String bindBoolean() throws SQLException {
    String sql = "insert into ? using stable3 tags(?) values(?,?)";
    Random random = new Random(System.currentTimeMillis());

    try (Connection conn = TDUtils.ds.getConnection();
        TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
      // Select the database explicitly, as bindInteger and bindFloat do, so the unqualified
      // super table name does not depend on the borrowed connection's state.
      pstmt.execute(USE_DATABASE);
      for (int i = 1; i <= NUM_OF_SUB_TABLE; i++) {
        // set table name
        pstmt.setTableName("t3_" + i);
        // set tags
        pstmt.setTagBoolean(1, random.nextBoolean());
        // set columns
        long current = System.currentTimeMillis();
        for (int j = 0; j < NUM_OF_ROW; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setBoolean(2, random.nextBoolean());
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  /**
   * Inserts rows with a binary tag and column into sub tables of {@code stable4}.
   *
   * @return {@code "success"} when all batches have been executed
   * @throws SQLException if binding or execution fails
   */
  public String bindBytes() throws SQLException {
    String sql = "insert into ? using stable4 tags(?) values(?,?)";

    try (Connection conn = TDUtils.ds.getConnection();
        TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
      // Select the database explicitly, as bindInteger and bindFloat do.
      pstmt.execute(USE_DATABASE);
      for (int i = 1; i <= NUM_OF_SUB_TABLE; i++) {
        // set table name
        pstmt.setTableName("t4_" + i);
        // set tags
        pstmt.setTagString(1, "abc");
        // set columns
        long current = System.currentTimeMillis();
        for (int j = 0; j < NUM_OF_ROW; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setString(2, "abc");
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  /**
   * Inserts rows with an nchar tag and column into sub tables of {@code stable5}.
   *
   * @return {@code "success"} when all batches have been executed
   * @throws SQLException if binding or execution fails
   */
  public String bindString() throws SQLException {
    String sql = "insert into ? using stable5 tags(?) values(?,?)";

    try (Connection conn = TDUtils.ds.getConnection();
        TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
      // Select the database explicitly, as bindInteger and bindFloat do.
      pstmt.execute(USE_DATABASE);
      for (int i = 1; i <= NUM_OF_SUB_TABLE; i++) {
        // set table name
        pstmt.setTableName("t5_" + i);
        // set tags
        pstmt.setTagNString(1, "California.SanFrancisco");
        // set columns; parameter indices are 1-based, matching the other bind examples
        long current = System.currentTimeMillis();
        for (int j = 0; j < NUM_OF_ROW; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setNString(2, "California.SanFrancisco");
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  /**
   * Reads every row of {@code stable1} from the demo database and returns it as records.
   *
   * @return the records built from the query result
   * @throws SQLException if the query fails
   */
  public List<Record> selectStable1() throws SQLException {
    // The tables are created in the demo database by init(), so query that database.
    String sql = "select * from " + DATABASE + ".stable1";

    try (Connection conn = TDUtils.ds.getConnection(); Statement stmt = conn.createStatement()) {
      stmt.execute(USE_DATABASE);
      try (ResultSet resultSet = stmt.executeQuery(sql)) {
        // NOTE(review): a null Config is passed, as in the original; confirm RecordBuilder
        // accepts it, or pass the Config of a configured ActiveRecordPlugin instead.
        return RecordBuilder.me.build(null, resultSet);
      }
    }
  }
}
查询示例
package com.litongjava.tio.web.hello.controller;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.annotation.RequestPath;
@RequestPath("/stable1")
public class Stable1Controller {
public List<Map<String, Object>> list() {
List<Record> records = Db.findAll("stable1");
List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
return result;
}
}
package com.litongjava.tio.web.hello.controller;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.annotation.RequestPath;
@RequestPath("/stable2")
public class Stable2Controller {
public List<Map<String, Object>> list() {
List<Record> records = Db.findAll("stable2");
List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
return result;
}
}
package com.litongjava.tio.web.hello.controller;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.annotation.RequestPath;
@RequestPath("/stable3")
public class Stable3Controller {
public List<Map<String, Object>> list() {
List<Record> records = Db.findAll("stable3");
List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
return result;
}
}
package com.litongjava.tio.web.hello.controller;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.annotation.RequestPath;
@RequestPath("/stable4")
public class Stable4Controller {
public List<Map<String, Object>> list() {
List<Record> records = Db.findAll("stable4");
List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
return result;
}
}
package com.litongjava.tio.web.hello.controller;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.annotation.RequestPath;
@RequestPath("/stable5")
public class Stable5Controller {
public List<Map<String, Object>> list() {
List<Record> records = Db.findAll("stable5");
List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
return result;
}
}
测试
通过以下 URL 可以测试应用程序的不同功能,验证数据的创建、插入和查询是否正常工作。
- http://localhost/stable1/list
- http://localhost/stable2/list
- http://localhost/stable3/list
- http://localhost/stable4/list
- http://localhost/stable5/list