使用 api-table 连接 MySQL 和 TDengine 多数据源
本文介绍如何在 tio-boot 应用程序中配置多个数据源,这里以 MySQL 和 TDengine 为例。
ActiveRecordPluginConfiguration 类
import javax.sql.DataSource;
import com.jfinal.template.Engine;
import com.jfinal.template.source.ClassPathSourceFactory;
import com.litongjava.data.utils.TioRequestParamUtils;
import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.litongjava.jfinal.plugin.activerecord.OrderedFieldContainerFactory;
import com.litongjava.jfinal.plugin.activerecord.dialect.TdEngineDialect;
import com.litongjava.jfinal.plugin.hikaricp.DsContainer;
import com.litongjava.tio.boot.constatns.ConfigKeys;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
/**
 * Configures two ActiveRecord plugins: a default one backed by a MySQL
 * HikariCP pool and a second one, registered under the name
 * {@code "tdengine"}, backed by a TDengine HikariCP pool.
 *
 * <p>Every pool and plugin created here is registered with
 * {@link TioBootServer#addDestroyMethod} so that its close/stop method is
 * handed to the server for shutdown.
 */
@AConfiguration
public class ActiveRecordPluginConfiguration {

  /**
   * Initialization entry point: sets up the MySQL plugin first, then the
   * TDengine plugin.
   *
   * @throws Exception if either plugin fails to configure or start
   */
  @AInitialization
  public void activeRecordPlugin() throws Exception {
    configArpForMysql();
    configArpForTdEngine();
  }

  /**
   * Builds the HikariCP data source for MySQL.
   *
   * <p>Reads {@code jdbc.url}, {@code jdbc.user}, {@code jdbc.pswd} and
   * {@code jdbc.MaximumPoolSize} (default 2) from {@link EnvUtils}.
   *
   * @return a newly created pool whose {@code close} is registered as a destroy method
   */
  public DataSource getMysqlDataSource() {
    String jdbcUrl = EnvUtils.get("jdbc.url");
    String jdbcUser = EnvUtils.get("jdbc.user");
    String jdbcPswd = EnvUtils.get("jdbc.pswd");
    int maximumPoolSize = EnvUtils.getInt("jdbc.MaximumPoolSize", 2);

    HikariConfig config = new HikariConfig();
    // basic connection parameters
    config.setJdbcUrl(jdbcUrl);
    config.setUsername(jdbcUser);
    config.setPassword(jdbcPswd);
    config.setMaximumPoolSize(maximumPoolSize);

    HikariDataSource hikariDataSource = new HikariDataSource(config);
    TioBootServer.addDestroyMethod(hikariDataSource::close);
    return hikariDataSource;
  }

  /**
   * Creates, configures and starts the unnamed ActiveRecord plugin for MySQL.
   *
   * <p>When the {@code ConfigKeys.APP_ENV} setting equals {@code "dev"},
   * dev mode is enabled on both the plugin and its template engine and SQL
   * logging is switched on. The SQL template {@code /sql/all_sqls.sql} is
   * loaded through a {@link ClassPathSourceFactory}.
   *
   * @return the started plugin; its {@code stop} is registered as a destroy method
   */
  public ActiveRecordPlugin configArpForMysql() {
    String property = EnvUtils.get(ConfigKeys.APP_ENV);
    DataSource mysqlDataSource = getMysqlDataSource();

    ActiveRecordPlugin mysqlArp = new ActiveRecordPlugin(mysqlDataSource);
    mysqlArp.setContainerFactory(new OrderedFieldContainerFactory());

    Engine engine = mysqlArp.getEngine();
    engine.setSourceFactory(new ClassPathSourceFactory());
    // NOTE(review): two consecutive setCompressorOn calls; the second
    // presumably replaces the separator set by the first -- confirm.
    engine.setCompressorOn(' ');
    engine.setCompressorOn('\n');

    if ("dev".equals(property)) {
      mysqlArp.setDevMode(true);
      engine.setDevMode(true);
      mysqlArp.setShowSql(true);
    }

    mysqlArp.addSqlTemplate("/sql/all_sqls.sql");
    mysqlArp.start();
    TioBootServer.addDestroyMethod(mysqlArp::stop);

    // register "bigint" as an additional type name with TioRequestParamUtils
    TioRequestParamUtils.types.add("bigint");
    return mysqlArp;
  }

  /**
   * Builds the HikariCP data source for TDengine.
   *
   * <p>Reads {@code tdengine.host}, {@code tdengine.port},
   * {@code tdengine.username}, {@code tdengine.password},
   * {@code tdengine.database} and {@code tdengine.driverClassName} from
   * {@link EnvUtils}. The created pool is also stored via
   * {@code DsContainer.setDataSource}.
   *
   * @return a newly created pool whose {@code close} is registered as a destroy method
   */
  public DataSource getTdengineDataSource() {
    HikariConfig config = new HikariConfig();

    // jdbc properties
    String host = EnvUtils.get("tdengine.host");
    int port = EnvUtils.getInt("tdengine.port");
    String user = EnvUtils.get("tdengine.username");
    String pswd = EnvUtils.get("tdengine.password");
    String dbName = EnvUtils.get("tdengine.database");
    String driverClassName = EnvUtils.get("tdengine.driverClassName");
    // String driverClassName = "com.taosdata.jdbc.TSDBDriver";

    String jdbcUrl = getTdEngineJdbcUrl(host, port, user, pswd, dbName);
    config.setJdbcUrl(jdbcUrl);
    config.setDriverClassName(driverClassName);

    // connection pool configurations
    config.setMinimumIdle(10); // minimum number of idle connection
    config.setMaximumPoolSize(10); // maximum number of connection in the pool
    config.setConnectionTimeout(30000); // maximum wait milliseconds for get connection from pool
    config.setMaxLifetime(0); // maximum life time for each connection
    config.setIdleTimeout(0); // max idle time for recycle idle connection
    config.setConnectionTestQuery("select server_status()"); // validation query

    HikariDataSource ds = new HikariDataSource(config); // create datasource
    DsContainer.setDataSource(ds);
    TioBootServer.addDestroyMethod(ds::close);
    return ds;
  }

  /**
   * Assembles the {@code jdbc:TAOS-RS://} URL for TDengine.
   *
   * <p>User name and password are embedded as query parameters exactly as
   * given (no URL encoding is applied).
   *
   * @param host   TDengine host
   * @param port   TDengine port
   * @param user   user name placed in the {@code user} parameter
   * @param pswd   password placed in the {@code password} parameter
   * @param dbName database name used as the URL path
   * @return the JDBC URL with {@code batchfetch=true} appended
   */
  private String getTdEngineJdbcUrl(String host, int port, String user, String pswd, String dbName) {
    // per the original author's note: adding batchfetch=true yields a WebSocket connection
    return "jdbc:TAOS-RS://" + host + ":" + port + "/" + dbName + "?user=" + user + "&password=" + pswd
        + "&batchfetch=true";
  }

  /**
   * Creates, configures and starts the ActiveRecord plugin named
   * {@code "tdengine"}, using {@link TdEngineDialect}.
   *
   * <p>When the {@code ConfigKeys.APP_ENV} setting equals {@code "dev"},
   * dev mode and SQL logging are enabled. The plugin's {@code stop} is
   * registered as a destroy method.
   */
  public void configArpForTdEngine() {
    String property = EnvUtils.get(ConfigKeys.APP_ENV);
    DataSource tdengineDataSource = getTdengineDataSource();

    // register the plugin under the config name "tdengine"
    ActiveRecordPlugin tdengineArp = new ActiveRecordPlugin("tdengine", tdengineDataSource);
    tdengineArp.setDialect(new TdEngineDialect());
    tdengineArp.setContainerFactory(new OrderedFieldContainerFactory());

    if ("dev".equals(property)) {
      tdengineArp.setDevMode(true);
      tdengineArp.setShowSql(true);
    }

    tdengineArp.start();
    TioBootServer.addDestroyMethod(tdengineArp::stop);
  }
}
数据源配置:
- `getMysqlDataSource()`: 此方法创建并配置 MySQL 的数据源。使用 HikariCP 作为连接池管理器。
- `getTdengineDataSource()`: 此方法创建并配置 TDengine 的数据源。同样使用 HikariCP,并设置了针对 TDengine 特有的属性和连接测试查询。
ActiveRecordPlugin 配置:
- `configArpForMysql()`: 为 MySQL 配置 ActiveRecordPlugin。设置了 SQL 模板路径、开发模式、SQL 显示等。
- `configArpForTdEngine()`: 为 TDengine 配置 ActiveRecordPlugin。同样设置了开发模式和 SQL 显示,数据库标识名为 "tdengine"。
资源清理: 在 `getMysqlDataSource()` 和 `getTdengineDataSource()` 方法中,使用 `TioBootServer.addDestroyMethod` 来确保应用停止时数据源能够正确关闭。
测试类
- ActiveRecordPluginConfigurationTest:
  - 使用 `TioBootTest.before` 方法初始化测试环境。
  - `test()` 方法中演示了如何使用 `DbJsonService` 从 MySQL 查询数据,并使用 `Db.use("tdengine")` 从 TDengine 查询数据。
import org.junit.Before;
import org.junit.Test;
import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.jfinal.plugin.activerecord.Page;
import com.litongjava.jfinal.plugin.activerecord.Row;
import com.litongjava.jfinal.plugin.activerecord.SqlPara;
import com.litongjava.tio.boot.tesing.TioBootTest;
import lombok.extern.slf4j.Slf4j;
/**
 * Exercises both data sources configured by
 * {@code ActiveRecordPluginConfiguration}: the default (MySQL) one through
 * {@code DbJsonService} and the one named {@code "tdengine"} through
 * {@code Db.use}.
 */
@Slf4j
public class ActiveRecordPluginConfigurationTest {

  /** Runs {@code TioBootTest.before} with the configuration class before each test. */
  @Before
  public void before() {
    TioBootTest.before(ActiveRecordPluginConfiguration.class);
  }

  /** Pages through one table on each data source and logs the reported page size. */
  @Test
  public void test() {
    // MySQL: page the sys_user_info table with an empty parameter set
    DbJsonBean<Page<Row>> mysqlResult = DbJsonService.getInstance().page("sys_user_info", Kv.create());
    log.info("size:{}", mysqlResult.getData().getPageSize());

    // TDengine: first page (10 rows per page) of sensor_data via the named config
    DbPro tdengineDb = Db.use("tdengine");
    SqlPara sensorQuery = new SqlPara();
    sensorQuery.setSql("select `*` from sensor_data");
    Page<Row> tdenginePage = tdengineDb.paginate(1, 10, sensorQuery);
    log.info("page size:{}", tdenginePage.getPageSize());
  }
}
控制器测试类
- PenRawDataController:
  - 提供了一个 `page()` 方法,用于处理分页请求。
  - 通过 `DbJsonService` 和 `Db.use("tdengine")` 从特定的数据源(在这里是 TDengine)查询数据。
import java.util.Map;
import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.model.DbPage;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.data.utils.DbJsonBeanUtils;
import com.litongjava.data.utils.KvUtils;
import com.litongjava.data.utils.TioRequestParamUtils;
import com.litongjava.jfinal.aop.annotation.AAutowired;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.annotation.RequestPath;
import lombok.extern.slf4j.Slf4j;
/**
 * HTTP controller mapped to {@code /pen/raw/data} that pages the
 * {@code pen_raw_data_stable} table through the ActiveRecord config named
 * {@code "tdengine"}.
 */
@Slf4j
@RequestPath("/pen/raw/data")
public class PenRawDataController {

  @AAutowired
  private DbJsonService dbJsonService;

  /**
   * Handles {@code /pen/raw/data/page}.
   *
   * <p>The request parameters are read into a map, the {@code "f"} entry is
   * dropped (the table name is fixed here), and the remaining keys are
   * converted from camelCase to underscore form before being passed to
   * {@code DbJsonService.page}.
   *
   * @param request the incoming HTTP request
   * @return the paged result converted with {@code DbJsonBeanUtils.pageToDbPage}
   */
  @RequestPath("/page")
  public DbJsonBean<DbPage<Kv>> page(HttpRequest request) {
    final String tableName = "pen_raw_data_stable";

    Map<String, Object> requestParams = TioRequestParamUtils.getRequestMap(request);
    requestParams.remove("f");
    Kv conditions = KvUtils.camelToUnderscore(requestParams);

    log.info("tableName:{},kv:{}", tableName, conditions);

    return DbJsonBeanUtils.pageToDbPage(dbJsonService.page(Db.use("tdengine"), tableName, conditions));
  }
}
测试参数查询
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.jfinal.plugin.activerecord.Config;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.jfinal.plugin.activerecord.Row;
import com.litongjava.jfinal.plugin.activerecord.RecordBuilder;
import com.litongjava.jfinal.plugin.hikaricp.DsContainer;
import com.litongjava.tio.boot.tesing.TioBootTest;
import lombok.Cleanup;
/**
 * Demonstrates three ways of running the same parameterized query against
 * TDengine: raw JDBC, {@code Db.use("tdengine").find}, and
 * {@code DbJsonService.list}.
 */
public class PenRawDataControllerTest {

  /** Parameterized query shared by the JDBC and Db based tests. */
  private static final String QUERY_SQL =
      "select payload_ts,data_ts,client_id,user_id from manliang_pen.pen_page_stable where user_id=?";

  /** User id bound to the query; kept as a string, exactly as in the original tests. */
  private static final String USER_ID = "18374686479671623681";

  /** Runs {@code TioBootTest.before} with the configuration class before each test. */
  @Before
  public void before() {
    TioBootTest.before(ActiveRecordPluginConfiguration.class);
  }

  /**
   * Runs the query over a raw JDBC connection taken from {@code DsContainer.ds}
   * and converts the result set with {@code RecordBuilder}.
   *
   * <p>Connection, statement and result set are all closed by
   * try-with-resources, in reverse order of acquisition.
   *
   * @throws SQLException if obtaining the connection or running the query fails
   */
  @Test
  public void query() throws SQLException {
    List<Row> rows;
    // NOTE(review): DsContainer.ds is presumably the pool stored by
    // DsContainer.setDataSource during configuration -- confirm.
    try (Connection conn = DsContainer.ds.getConnection()) {
      Config config = Db.use("tdengine").getConfig();
      try (PreparedStatement pstmt = conn.prepareStatement(QUERY_SQL)) {
        pstmt.setString(1, USER_ID);
        try (ResultSet resultSet = pstmt.executeQuery()) {
          rows = RecordBuilder.me.build(config, resultSet);
        }
      }
    }
    System.out.println(rows);
  }

  /** Runs the same query through {@code Db.use("tdengine").find}. */
  @Test
  public void queryWithDb() {
    List<Row> rows = Db.use("tdengine").find(QUERY_SQL, USER_ID);
    System.out.println(rows);
  }

  /**
   * Runs an equivalent lookup through {@code DbJsonService.list}, passing the
   * table name, the column list and the {@code user_id} filter as entries of a {@code Kv}.
   */
  @Test
  public void queryWithDbJsonService() {
    DbJsonService dbJsonService = DbJsonService.getInstance();

    Kv kv = Kv.create();
    kv.set("table_name", "manliang_pen.pen_page_stable");
    kv.set("columns", "payload_ts,data_ts,client_id,user_id");
    kv.set("user_id", USER_ID);

    DbPro dbPro = Db.use("tdengine");
    DbJsonBean<List<Row>> dbJsonBean = dbJsonService.list(dbPro, kv);
    System.out.println(dbJsonBean);
  }
}
···