JSONB、Upsert、窗口函数实战
在前面几篇中我们已经完成:
- tio-boot + jOOQ 整合(DSLContext 注入)
- 事务管理
- 批量与性能优化
- jOOQ 代码生成与类型安全升级(可选)
本文聚焦 PostgreSQL 的三类“生产级常用高级特性”:
- JSONB:半结构化数据的存储与查询
- Upsert(ON CONFLICT):避免“先查再写”的并发问题
- 窗口函数:排行榜、分页统计、分组 TopN 的最佳实践
示例代码以 jOOQ 为主,尽量使用 生成代码(强类型);若你当前未启用 codegen,也会给出可替代写法思路。
一、JSONB 实战:存储 + 查询 + 索引
1.1 表设计:业务字段 + JSONB 扩展字段
典型场景:
- 用户属性可能随时扩展(标签、偏好、AB 实验参数)
- 不想频繁改表结构
建表示例:
CREATE TABLE user_profile (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL UNIQUE,
profile JSONB NOT NULL DEFAULT '{}'::jsonb,
updated_at TIMESTAMP NOT NULL DEFAULT now()
);
-- GIN 索引:加速 JSONB 查询
CREATE INDEX idx_user_profile_profile_gin
ON user_profile USING GIN (profile);
profile 存放:
{
"tags": ["vip", "sports"],
"age": 18,
"device": {"os": "ios", "ver": "17.2"},
"ab": {"expA": "B"}
}
1.2 写入 JSONB:用 Upsert 一起做更常见
如果你启用了 jOOQ codegen,字段通常会有 USER_PROFILE.PROFILE(类型可能是 JSONB)。
写入 JSONB
import static demo.jooq.gen.Tables.USER_PROFILE;
import org.jooq.DSLContext;
import org.jooq.JSONB;
public int saveProfile(DSLContext dsl, long userId, String json) {
return dsl.insertInto(USER_PROFILE)
.set(USER_PROFILE.USER_ID, userId)
.set(USER_PROFILE.PROFILE, JSONB.valueOf(json))
.onConflict(USER_PROFILE.USER_ID)
.doUpdate()
.set(USER_PROFILE.PROFILE, JSONB.valueOf(json))
.set(USER_PROFILE.UPDATED_AT, dsl.currentTimestamp())
.execute();
}
说明:
JSONB.valueOf(json)直接把字符串包装为 JSONB- 生产中 json 建议由 fastjson2/jackson 生成,避免手写格式错
1.3 JSONB 查询:按字段过滤
查询 profile.age >= 18
PostgreSQL 常见写法:
(profile->>'age')::int >= 18
jOOQ(生成字段 + 自定义表达式):
import static demo.jooq.gen.Tables.USER_PROFILE;
import org.jooq.Field;
import org.jooq.impl.DSL;
Field<Integer> age =
DSL.field("({0} ->> 'age')::int", Integer.class, USER_PROFILE.PROFILE);
var list = dsl.select(USER_PROFILE.USER_ID, USER_PROFILE.PROFILE)
.from(USER_PROFILE)
.where(age.ge(18))
.fetch();
说明:
- jOOQ 对 PG JSON 操作有不少支持,但在“复杂 JSON path”上,用
DSL.field(sql, type, args...)最灵活 {0}是占位符,把USER_PROFILE.PROFILE安全拼进去(不是字符串拼接)
1.4 JSONB 查询:包含关系(@>)
查包含标签 vip:
profile @> '{"tags":["vip"]}'::jsonb
jOOQ:
import org.jooq.JSONB;
import org.jooq.impl.DSL;
var vipUsers = dsl.select(USER_PROFILE.USER_ID)
.from(USER_PROFILE)
.where(DSL.condition("{0} @> {1}::jsonb",
USER_PROFILE.PROFILE,
JSONB.valueOf("{\"tags\":[\"vip\"]}")))
.fetchInto(Long.class);
建议:
@>是最常见 JSONB 检索方式,配合 GIN 索引性能很好- “包含关系”比“逐层取字段比较”更容易走索引
1.5 JSONB 索引建议(很关键)
GIN(profile):泛用,但体积更大- 若检索集中在某个 key,可考虑表达式索引,例如:
CREATE INDEX idx_user_profile_age
ON user_profile (((profile->>'age')::int));
当你大量做 age 查询时,性能会明显提升。
二、Upsert 实战:ON CONFLICT 的正确打开方式
Upsert 用来解决两类痛点:
- 避免“先 select 再 insert/update”的双 SQL
- 解决并发下的竞争条件(race condition)
2.1 基础 Upsert:按唯一键更新
import static demo.jooq.gen.Tables.SYSTEM_ADMIN;
public int upsertAdmin(DSLContext dsl, String loginName, String password) {
return dsl.insertInto(SYSTEM_ADMIN)
.set(SYSTEM_ADMIN.LOGIN_NAME, loginName)
.set(SYSTEM_ADMIN.PASSWORD, password)
.onConflict(SYSTEM_ADMIN.LOGIN_NAME)
.doUpdate()
.set(SYSTEM_ADMIN.PASSWORD, password)
.execute();
}
说明:
onConflict后面接冲突目标(唯一键/唯一索引字段)doUpdate指定更新行为
2.2 Upsert:只在某些条件下更新(where)
常见需求:版本号、时间戳控制,避免旧数据覆盖新数据。
例如:只允许更“新”的更新时间覆盖
import static demo.jooq.gen.Tables.USER_PROFILE;
public int upsertProfileWithTs(DSLContext dsl, long userId, String json) {
return dsl.insertInto(USER_PROFILE)
.set(USER_PROFILE.USER_ID, userId)
.set(USER_PROFILE.PROFILE, org.jooq.JSONB.valueOf(json))
.set(USER_PROFILE.UPDATED_AT, dsl.currentTimestamp())
.onConflict(USER_PROFILE.USER_ID)
.doUpdate()
.set(USER_PROFILE.PROFILE, org.jooq.JSONB.valueOf(json))
.set(USER_PROFILE.UPDATED_AT, dsl.currentTimestamp())
// 只在目标行的 updated_at 更旧时才更新
.where(USER_PROFILE.UPDATED_AT.lt(dsl.currentTimestamp().minus(DSL.inline(0))))
.execute();
}
要点:
doUpdate().where(...)是 PG 的高级用法- 真实业务里通常会用请求传入的版本号/时间戳做对比
2.3 Upsert + RETURNING:直接拿回插入/更新结果
PG 支持 RETURNING,非常适合“写后读”。
import static demo.jooq.gen.Tables.SYSTEM_ADMIN;
public Integer upsertAndReturnId(DSLContext dsl, String loginName, String password) {
return dsl.insertInto(SYSTEM_ADMIN)
.set(SYSTEM_ADMIN.LOGIN_NAME, loginName)
.set(SYSTEM_ADMIN.PASSWORD, password)
.onConflict(SYSTEM_ADMIN.LOGIN_NAME)
.doUpdate()
.set(SYSTEM_ADMIN.PASSWORD, password)
.returning(SYSTEM_ADMIN.ID)
.fetchOne()
.getId();
}
优势:
- 少一次 select
- 并发下结果更可靠
三、窗口函数实战:TopN、排名、分页统计
窗口函数非常适合做:
- 排行榜(rank/row_number)
- 分组 TopN(每个分类取前 3)
- 分页时同时返回 total
- 近 N 条记录的滚动统计
下面用一个通用表来演示。
3.1 示例表:用户积分流水
CREATE TABLE user_score_log (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
score INT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now()
);
CREATE INDEX idx_user_score_log_user_time
ON user_score_log(user_id, created_at DESC);
3.2 排名:row_number / rank / dense_rank
需求:按 score 降序给用户排序,取前 20。
import static demo.jooq.gen.Tables.USER_SCORE_LOG;
import org.jooq.Field;
import org.jooq.impl.DSL;
Field<Integer> rn = DSL.rowNumber()
.over(DSL.orderBy(USER_SCORE_LOG.SCORE.desc()))
.as("rn");
var top20 = dsl.select(USER_SCORE_LOG.USER_ID, USER_SCORE_LOG.SCORE, rn)
.from(USER_SCORE_LOG)
.orderBy(USER_SCORE_LOG.SCORE.desc())
.limit(20)
.fetch();
说明:
row_number:严格序号,不会并列rank:并列会跳号dense_rank:并列不跳号
替换很简单:
Field<Integer> r = DSL.rank().over(DSL.orderBy(USER_SCORE_LOG.SCORE.desc())).as("r");
Field<Integer> dr = DSL.denseRank().over(DSL.orderBy(USER_SCORE_LOG.SCORE.desc())).as("dr");
3.3 分组 TopN:每个用户取最近 3 条记录
这是窗口函数最经典的用途之一。
import static demo.jooq.gen.Tables.USER_SCORE_LOG;
import org.jooq.Table;
import org.jooq.impl.DSL;
var rn = DSL.rowNumber()
.over(DSL.partitionBy(USER_SCORE_LOG.USER_ID)
.orderBy(USER_SCORE_LOG.CREATED_AT.desc()))
.as("rn");
Table<?> t = dsl
.select(USER_SCORE_LOG.ID, USER_SCORE_LOG.USER_ID, USER_SCORE_LOG.SCORE, USER_SCORE_LOG.CREATED_AT, rn)
.from(USER_SCORE_LOG)
.asTable("t");
var result = dsl.selectFrom(t)
.where(DSL.field("rn", Integer.class).le(3))
.orderBy(DSL.field("user_id"), DSL.field("created_at").desc())
.fetch();
逻辑解释:
- 先把每个 user_id 内的记录按时间排序并编号
- 再过滤 rn <= 3,就得到“每组 Top3”
3.4 分页 + 总数(同一条 SQL 返回 total)
很多系统分页会写两条 SQL:
- select data limit/offset
- select count(*)
窗口函数可以一次搞定:
import static demo.jooq.gen.Tables.SYSTEM_ADMIN;
import org.jooq.Field;
import org.jooq.impl.DSL;
int pageNo = 1;
int pageSize = 20;
Field<Integer> total = DSL.count().over().as("total");
var rows = dsl.select(
SYSTEM_ADMIN.ID,
SYSTEM_ADMIN.LOGIN_NAME,
total
)
.from(SYSTEM_ADMIN)
.orderBy(SYSTEM_ADMIN.ID.desc())
.limit(pageSize)
.offset((pageNo - 1) * pageSize)
.fetch();
读取 total:
- 每行都有 total(相同值)
- 取第一行的 total 即可
优势:
- 少一次 count SQL
- 数据一致性更好(同一快照)
注意:
- 超大表 count 仍然贵,必要时要做缓存/近似统计/按条件索引优化
四、把这些能力放进 tio-boot + jOOQ 的项目结构里
推荐结构:
dao/:只写 SQL(jOOQ)service/:事务边界、组合多个 DAOcontroller/:参数解析、返回 Kv/DTO
JSONB/Upsert/窗口函数最适合:
- DAO 中封装成方法
- Service 中组合与事务控制
五、总结
PostgreSQL 的强大之处在于:
- JSONB 让你同时拥有“关系型 + 文档型”的能力
- Upsert 让并发写入变得可靠且高性能
- 窗口函数让复杂统计/TopN/分页天然高效
而 jOOQ 的价值是:
- 把这些高级 SQL 能力用 类型安全 + 可组合 的方式写出来
- 在 Java 中保持 SQL 的可读性与可维护性
