hotime/cache/cache_db.go
hoteas 3d83c41905 feat(cache): 增加批量操作支持以提升性能
- 在 HoTimeCache 中新增 SessionsGet、SessionsSet 和 SessionsDelete 方法,支持批量获取、设置和删除 Session 缓存
- 优化缓存逻辑,减少数据库写入次数,提升性能
- 更新文档,详细说明批量操作的使用方法和性能对比
- 添加调试日志记录,便于追踪批量操作的执行情况
2026-01-30 17:51:43 +08:00

694 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cache
import (
"database/sql"
"encoding/json"
"os"
"strings"
"time"
. "code.hoteas.com/golang/hotime/common"
)
// debugLogDb 写入调试日志
func debugLogDb(hypothesisId, location, message string, data map[string]interface{}) {
// #region agent log
logFile, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if logFile != nil {
logEntry, _ := json.Marshal(map[string]interface{}{
"sessionId": "cache-db-debug",
"runId": "test-run",
"hypothesisId": hypothesisId,
"location": location,
"message": message,
"data": data,
"timestamp": time.Now().UnixMilli(),
})
logFile.Write(append(logEntry, '\n'))
logFile.Close()
}
// #endregion
}
// 表名常量
const (
CacheTableName = "hotime_cache"
CacheHistoryTableName = "hotime_cache_history"
LegacyCacheTableName = "cached" // 老版本缓存表名
DefaultCacheTimeout = 24 * 60 * 60 // 默认过期时间 24 小时
CacheModeNew = "new" // 新模式:只使用新表
CacheModeCompatible = "compatible" // 兼容模式:写新读老
)
type HoTimeDBInterface interface {
GetPrefix() string
Query(query string, args ...interface{}) []Map
Exec(query string, args ...interface{}) (sql.Result, *Error)
Get(table string, qu ...interface{}) Map
Select(table string, qu ...interface{}) []Map
Delete(table string, data map[string]interface{}) int64
Update(table string, data Map, where Map) int64
Insert(table string, data map[string]interface{}) int64
GetType() string
}
type CacheDb struct {
TimeOut int64
DbSet bool
SessionSet bool
HistorySet bool // 是否开启历史记录
Mode string // 缓存模式:"new"(默认,只用新表) 或 "compatible"(写新读老)
Db HoTimeDBInterface
*Error
ContextBase
isInit bool
}
func (that *CacheDb) GetError() *Error {
return that.Error
}
func (that *CacheDb) SetError(err *Error) {
that.Error = err
}
// getTableName 获取带前缀的表名
func (that *CacheDb) getTableName() string {
return that.Db.GetPrefix() + CacheTableName
}
// getHistoryTableName 获取带前缀的历史表名
func (that *CacheDb) getHistoryTableName() string {
return that.Db.GetPrefix() + CacheHistoryTableName
}
// getLegacyTableName 获取带前缀的老版本缓存表名
func (that *CacheDb) getLegacyTableName() string {
return that.Db.GetPrefix() + LegacyCacheTableName
}
// isCompatibleMode 是否为兼容模式
func (that *CacheDb) isCompatibleMode() bool {
return that.Mode == CacheModeCompatible
}
// getEffectiveMode 获取有效模式(默认为 new
func (that *CacheDb) getEffectiveMode() string {
if that.Mode == CacheModeCompatible {
return CacheModeCompatible
}
return CacheModeNew
}
// initDbTable 初始化数据库表
func (that *CacheDb) initDbTable() {
if that.isInit {
return
}
dbType := that.Db.GetType()
tableName := that.getTableName()
historyTableName := that.getHistoryTableName()
legacyTableName := that.getLegacyTableName()
// 检查并创建主表
if !that.tableExists(tableName) {
that.createMainTable(dbType, tableName)
}
// 根据模式处理老表
// new 模式:迁移老表数据到新表(不删除老表,由人工删除)
// compatible 模式:不迁移,老表继续使用
if that.getEffectiveMode() == CacheModeNew {
if that.tableExists(legacyTableName) {
that.migrateFromCached(dbType, legacyTableName, tableName)
}
}
// compatible 模式不做任何处理,老表保留供读取
// 检查并创建历史表(开启历史记录时)
if that.HistorySet && !that.tableExists(historyTableName) {
that.createHistoryTable(dbType, historyTableName)
}
that.isInit = true
}
// tableExists 检查表是否存在
func (that *CacheDb) tableExists(tableName string) bool {
dbType := that.Db.GetType()
switch dbType {
case "mysql":
dbNames := that.Db.Query("SELECT DATABASE()")
if len(dbNames) == 0 {
return false
}
dbName := dbNames[0].GetString("DATABASE()")
res := that.Db.Query("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='" + dbName + "' AND TABLE_NAME='" + tableName + "'")
return len(res) != 0
case "sqlite":
res := that.Db.Query(`SELECT name FROM sqlite_master WHERE type='table' AND name='` + tableName + `'`)
return len(res) != 0
case "postgres":
res := that.Db.Query(`SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename='` + tableName + `'`)
return len(res) != 0
}
return false
}
// createMainTable 创建主表
func (that *CacheDb) createMainTable(dbType, tableName string) {
var createSQL string
switch dbType {
case "mysql":
createSQL = "CREATE TABLE `" + tableName + "` (" +
"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
"`key` varchar(64) NOT NULL COMMENT '缓存键'," +
"`value` text DEFAULT NULL COMMENT '缓存值'," +
"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
"PRIMARY KEY (`id`)," +
"UNIQUE KEY `uk_key` (`key`)," +
"KEY `idx_end_time` (`end_time`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存管理'"
case "sqlite":
createSQL = `CREATE TABLE "` + tableName + `" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"key" TEXT NOT NULL UNIQUE,
"value" TEXT,
"end_time" TEXT,
"state" INTEGER DEFAULT 0,
"create_time" TEXT,
"modify_time" TEXT
)`
case "postgres":
createSQL = `CREATE TABLE "` + tableName + `" (
"id" SERIAL PRIMARY KEY,
"key" VARCHAR(64) NOT NULL UNIQUE,
"value" TEXT,
"end_time" TIMESTAMP,
"state" INTEGER DEFAULT 0,
"create_time" TIMESTAMP,
"modify_time" TIMESTAMP
)`
that.Db.Exec(createSQL)
// 创建索引
that.Db.Exec(`CREATE INDEX "idx_` + tableName + `_end_time" ON "` + tableName + `" ("end_time")`)
return
}
that.Db.Exec(createSQL)
}
// createHistoryTable 创建历史表
func (that *CacheDb) createHistoryTable(dbType, tableName string) {
var createSQL string
switch dbType {
case "mysql":
createSQL = "CREATE TABLE `" + tableName + "` (" +
"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
"`hotime_cache_id` int(11) unsigned DEFAULT NULL COMMENT '缓存ID'," +
"`key` varchar(64) DEFAULT NULL COMMENT '缓存键'," +
"`value` text DEFAULT NULL COMMENT '缓存值'," +
"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
"PRIMARY KEY (`id`)," +
"KEY `idx_hotime_cache_id` (`hotime_cache_id`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存历史'"
case "sqlite":
createSQL = `CREATE TABLE "` + tableName + `" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"hotime_cache_id" INTEGER,
"key" TEXT,
"value" TEXT,
"end_time" TEXT,
"state" INTEGER DEFAULT 0,
"create_time" TEXT,
"modify_time" TEXT
)`
case "postgres":
createSQL = `CREATE TABLE "` + tableName + `" (
"id" SERIAL PRIMARY KEY,
"hotime_cache_id" INTEGER,
"key" VARCHAR(64),
"value" TEXT,
"end_time" TIMESTAMP,
"state" INTEGER DEFAULT 0,
"create_time" TIMESTAMP,
"modify_time" TIMESTAMP
)`
that.Db.Exec(createSQL)
// 创建索引
that.Db.Exec(`CREATE INDEX "idx_` + tableName + `_cache_id" ON "` + tableName + `" ("hotime_cache_id")`)
return
}
that.Db.Exec(createSQL)
}
// migrateFromCached 从旧 cached 表迁移数据(不删除老表,由人工删除)
func (that *CacheDb) migrateFromCached(dbType, oldTableName, newTableName string) {
var migrateSQL string
switch dbType {
case "mysql":
// 去重迁移:取每个 key 的最后一条记录id 最大)
// 使用 INSERT IGNORE 避免重复 key 冲突
migrateSQL = "INSERT IGNORE INTO `" + newTableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
"SELECT c.`key`, c.`value`, FROM_UNIXTIME(c.`endtime`), 0, " +
"FROM_UNIXTIME(c.`time` / 1000000000), FROM_UNIXTIME(c.`time` / 1000000000) " +
"FROM `" + oldTableName + "` c " +
"INNER JOIN (SELECT `key`, MAX(id) as max_id FROM `" + oldTableName + "` GROUP BY `key`) m " +
"ON c.id = m.max_id"
case "sqlite":
migrateSQL = `INSERT OR IGNORE INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`SELECT c."key", c."value", datetime(c."endtime", 'unixepoch'), 0, ` +
`datetime(c."time" / 1000000000, 'unixepoch'), datetime(c."time" / 1000000000, 'unixepoch') ` +
`FROM "` + oldTableName + `" c ` +
`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
`ON c.id = m.max_id`
case "postgres":
migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`SELECT c."key", c."value", to_timestamp(c."endtime"), 0, ` +
`to_timestamp(c."time" / 1000000000), to_timestamp(c."time" / 1000000000) ` +
`FROM "` + oldTableName + `" c ` +
`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
`ON c.id = m.max_id ` +
`ON CONFLICT ("key") DO NOTHING`
}
// 执行迁移,不删除老表(由人工确认后删除,更安全)
that.Db.Exec(migrateSQL)
}
// writeHistory 写入历史记录
func (that *CacheDb) writeHistory(key string) {
if !that.HistorySet {
return
}
tableName := that.getTableName()
historyTableName := that.getHistoryTableName()
// 查询当前数据
cached := that.Db.Get(tableName, "*", Map{"key": key})
if cached == nil {
return
}
// 构建历史记录数据
historyData := Map{
"hotime_cache_id": cached.GetInt64("id"),
"key": cached.GetString("key"),
"value": cached.GetString("value"),
"end_time": cached.GetString("end_time"),
"state": cached.GetInt("state"),
"create_time": cached.GetString("create_time"),
"modify_time": cached.GetString("modify_time"),
}
// 插入历史表
that.Db.Insert(historyTableName, historyData)
}
// getLegacy 从老表获取缓存(兼容模式使用)
// 老表结构key, value, endtime(unix秒), time(纳秒时间戳)
func (that *CacheDb) getLegacy(key string) interface{} {
legacyTableName := that.getLegacyTableName()
// 检查老表是否存在
if !that.tableExists(legacyTableName) {
return nil
}
cached := that.Db.Get(legacyTableName, "*", Map{"key": key})
if cached == nil {
return nil
}
// 检查过期时间(老表使用 unix 时间戳)
endTime := cached.GetInt64("endtime")
nowUnix := time.Now().Unix()
if endTime > 0 && endTime <= nowUnix {
// 已过期,删除该条记录
that.deleteLegacy(key)
return nil
}
// 解析 value老表 value 格式可能是 {"data": xxx} 或直接值)
valueStr := cached.GetString("value")
if valueStr == "" {
return nil
}
var data interface{}
err := json.Unmarshal([]byte(valueStr), &data)
if err != nil {
return nil
}
// 兼容老版本 {"data": xxx} 包装格式
if dataMap, ok := data.(map[string]interface{}); ok {
if innerData, exists := dataMap["data"]; exists {
return innerData
}
}
return data
}
// deleteLegacy 从老表删除缓存(兼容模式使用)
func (that *CacheDb) deleteLegacy(key string) {
legacyTableName := that.getLegacyTableName()
// 检查老表是否存在
if !that.tableExists(legacyTableName) {
return
}
del := strings.Index(key, "*")
// 如果通配删除
if del != -1 {
keyPrefix := Substr(key, 0, del)
that.Db.Delete(legacyTableName, Map{"key[~]": keyPrefix + "%"})
} else {
that.Db.Delete(legacyTableName, Map{"key": key})
}
}
// get 获取缓存
func (that *CacheDb) get(key string) interface{} {
tableName := that.getTableName()
cached := that.Db.Get(tableName, "*", Map{"key": key})
if cached != nil {
// 使用字符串比较判断过期ISO 格式天然支持)
endTime := cached.GetString("end_time")
nowTime := Time2Str(time.Now())
if endTime != "" && endTime <= nowTime {
// 惰性删除:过期只返回 nil不立即删除
// 依赖随机清理批量删除过期数据
// 继续检查老表(如果是兼容模式)
} else {
// 直接解析 value不再需要 {"data": value} 包装
valueStr := cached.GetString("value")
if valueStr != "" {
var data interface{}
err := json.Unmarshal([]byte(valueStr), &data)
if err == nil {
return data
}
}
}
}
// 兼容模式:新表没有数据时,回退读取老表
if that.isCompatibleMode() {
return that.getLegacy(key)
}
return nil
}
// set 设置缓存
func (that *CacheDb) set(key string, value interface{}, endTime time.Time) {
// 直接序列化 value不再包装
bte, _ := json.Marshal(value)
nowTime := Time2Str(time.Now())
endTimeStr := Time2Str(endTime)
dbType := that.Db.GetType()
tableName := that.getTableName()
// 使用 UPSERT 语法解决并发问题
switch dbType {
case "mysql":
upsertSQL := "INSERT INTO `" + tableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
"VALUES (?, ?, ?, 0, ?, ?) " +
"ON DUPLICATE KEY UPDATE `value`=VALUES(`value`), `end_time`=VALUES(`end_time`), `modify_time`=VALUES(`modify_time`)"
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
case "sqlite":
// SQLite: INSERT OR REPLACE 会删除后插入,所以用 UPSERT 语法
upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`VALUES (?, ?, ?, 0, ?, ?) ` +
`ON CONFLICT("key") DO UPDATE SET "value"=excluded."value", "end_time"=excluded."end_time", "modify_time"=excluded."modify_time"`
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
case "postgres":
upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`VALUES ($1, $2, $3, 0, $4, $5) ` +
`ON CONFLICT ("key") DO UPDATE SET "value"=EXCLUDED."value", "end_time"=EXCLUDED."end_time", "modify_time"=EXCLUDED."modify_time"`
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
default:
// 兼容其他数据库:使用 Update + Insert
num := that.Db.Update(tableName, Map{
"value": string(bte),
"end_time": endTimeStr,
"modify_time": nowTime,
}, Map{"key": key})
if num == 0 {
that.Db.Insert(tableName, Map{
"key": key,
"value": string(bte),
"end_time": endTimeStr,
"state": 0,
"create_time": nowTime,
"modify_time": nowTime,
})
}
}
// 写入历史记录
that.writeHistory(key)
// 兼容模式:写新表后删除老表同 key 记录(加速老数据消亡)
if that.isCompatibleMode() {
that.deleteLegacy(key)
}
// 随机执行删除过期数据命令5% 概率)
if Rand(1000) > 950 {
nowTimeStr := Time2Str(time.Now())
that.Db.Delete(tableName, Map{"end_time[<]": nowTimeStr})
}
}
// delete 删除缓存
func (that *CacheDb) delete(key string) {
tableName := that.getTableName()
del := strings.Index(key, "*")
// 如果通配删除
if del != -1 {
keyPrefix := Substr(key, 0, del)
that.Db.Delete(tableName, Map{"key[~]": keyPrefix + "%"})
} else {
that.Db.Delete(tableName, Map{"key": key})
}
// 兼容模式:同时删除老表中的 key避免回退读到老数据
if that.isCompatibleMode() {
that.deleteLegacy(key)
}
}
// Cache 缓存操作入口
// 用法:
// - Cache(key) - 获取缓存
// - Cache(key, value) - 设置缓存(使用默认过期时间)
// - Cache(key, value, timeout) - 设置缓存(指定过期时间,单位:秒)
// - Cache(key, nil) - 删除缓存
func (that *CacheDb) Cache(key string, data ...interface{}) *Obj {
that.initDbTable()
// 获取缓存
if len(data) == 0 {
return &Obj{Data: that.get(key)}
}
// 删除缓存
if len(data) == 1 && data[0] == nil {
that.delete(key)
return &Obj{Data: nil}
}
// 计算过期时间
var timeout int64
if len(data) == 1 {
// 使用配置的 TimeOut如果为 0 则使用默认值
timeout = that.TimeOut
if timeout == 0 {
timeout = DefaultCacheTimeout
}
} else if len(data) >= 2 {
// 使用指定的超时时间
var err Error
tempTimeout := ObjToInt64(data[1], &err)
if err.GetError() == nil && tempTimeout > 0 {
timeout = tempTimeout
} else {
timeout = that.TimeOut
if timeout == 0 {
timeout = DefaultCacheTimeout
}
}
}
endTime := time.Now().Add(time.Duration(timeout) * time.Second)
that.set(key, data[0], endTime)
return &Obj{Data: nil}
}
// CachesGet 批量获取缓存(使用 IN 查询优化)
// 返回 Mapkey 为缓存键value 为缓存值(不存在或过期的 key 不包含在结果中)
func (that *CacheDb) CachesGet(keys []string) Map {
that.initDbTable()
result := make(Map, len(keys))
if len(keys) == 0 {
return result
}
// #region agent log
debugLogDb("E", "cache_db.go:CachesGet:start", "CacheDb.CachesGet开始", map[string]interface{}{
"keys_count": len(keys),
"keys": keys,
})
// #endregion
tableName := that.getTableName()
nowTime := Time2Str(time.Now())
// 使用 IN 查询批量获取
cachedList := that.Db.Select(tableName, "*", Map{
"key": keys,
"end_time[>]": nowTime,
})
// #region agent log
debugLogDb("E", "cache_db.go:CachesGet:afterSelect", "DB Select完成", map[string]interface{}{
"table": tableName,
"now_time": nowTime,
"found_rows": len(cachedList),
})
// #endregion
for _, cached := range cachedList {
valueStr := cached.GetString("value")
if valueStr != "" {
var data interface{}
err := json.Unmarshal([]byte(valueStr), &data)
if err == nil {
result[cached.GetString("key")] = data
} else {
// #region agent log
debugLogDb("E", "cache_db.go:CachesGet:unmarshalError", "JSON解析失败", map[string]interface{}{
"key": cached.GetString("key"),
"error": err.Error(),
})
// #endregion
}
}
}
// 兼容模式:新表没有的 key回退读取老表
if that.isCompatibleMode() {
// #region agent log
debugLogDb("E", "cache_db.go:CachesGet:compatMode", "兼容模式检查老表", nil)
// #endregion
for _, key := range keys {
if _, exists := result[key]; !exists {
legacyData := that.getLegacy(key)
if legacyData != nil {
result[key] = legacyData
}
}
}
}
// #region agent log
debugLogDb("E", "cache_db.go:CachesGet:end", "CacheDb.CachesGet完成", map[string]interface{}{
"result_count": len(result),
})
// #endregion
return result
}
// CachesSet 批量设置缓存
// data: Mapkey 为缓存键value 为缓存值
// timeout: 可选过期时间(秒),不传则使用默认超时时间
func (that *CacheDb) CachesSet(data Map, timeout ...int64) {
that.initDbTable()
if len(data) == 0 {
return
}
// #region agent log
debugLogDb("A", "cache_db.go:CachesSet:start", "CacheDb.CachesSet开始", map[string]interface{}{
"data_count": len(data),
})
// #endregion
// 计算过期时间
var tim int64
if len(timeout) > 0 && timeout[0] > 0 {
tim = timeout[0]
} else {
tim = that.TimeOut
if tim == 0 {
tim = DefaultCacheTimeout
}
}
endTime := time.Now().Add(time.Duration(tim) * time.Second)
// 逐个设置(保持事务一致性和历史记录)
for key, value := range data {
that.set(key, value, endTime)
}
// #region agent log
debugLogDb("A", "cache_db.go:CachesSet:end", "CacheDb.CachesSet完成", map[string]interface{}{
"data_count": len(data),
"end_time": Time2Str(endTime),
})
// #endregion
}
// CachesDelete 批量删除缓存
func (that *CacheDb) CachesDelete(keys []string) {
that.initDbTable()
if len(keys) == 0 {
return
}
tableName := that.getTableName()
// 使用 IN 条件批量删除
that.Db.Delete(tableName, Map{"key": keys})
// 兼容模式:同时删除老表中的 keys
if that.isCompatibleMode() {
legacyTableName := that.getLegacyTableName()
if that.tableExists(legacyTableName) {
that.Db.Delete(legacyTableName, Map{"key": keys})
}
}
}