hotime/cache/cache_db.go
hoteas 230dfc5a2b feat(cache): 增加历史记录功能和数据库设计规范
- 在 README.md 中新增数据库设计规范、代码生成配置规范和改进规划的文档链接
- 在 CacheDb 结构体中添加历史记录开关 HistorySet
- 实现历史记录的写入逻辑,记录每次缓存的新增和修改
- 更新缓存的获取和设置方法,支持历史记录的管理
- 优化数据库表的初始化和迁移逻辑,确保历史表的创建与管理
2026-01-25 05:14:18 +08:00

527 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cache
import (
"database/sql"
"encoding/json"
"fmt"
"os"
"strings"
"time"
. "code.hoteas.com/golang/hotime/common"
)
// #region agent log
// debugLog appends one JSON-encoded diagnostic record, followed by a newline,
// to the agent debug log file.
//
// hypothesisId, location and message are free-form labels copied into the
// record; data is an arbitrary payload stored under the "data" key. Every
// record also carries the current Unix time in milliseconds and the fixed
// session id "debug-session".
//
// Logging is best-effort: if the record cannot be encoded or the file cannot
// be opened, the call does nothing.
//
// NOTE(review): the log path is a hard-coded absolute path from a developer
// machine, so this looks like temporary instrumentation — confirm whether it
// is meant to ship.
func debugLog(hypothesisId, location, message string, data map[string]interface{}) {
	// Encode the payload separately so that an unencodable value degrades to
	// null instead of discarding the whole record.
	payload, err := json.Marshal(data)
	if err != nil {
		payload = []byte("null")
	}

	// Marshal the envelope instead of formatting it by hand: the line stays
	// valid JSON even when a label contains quotes or backslashes.
	entry, err := json.Marshal(struct {
		HypothesisID string          `json:"hypothesisId"`
		Location     string          `json:"location"`
		Message      string          `json:"message"`
		Data         json.RawMessage `json:"data"`
		Timestamp    int64           `json:"timestamp"`
		SessionID    string          `json:"sessionId"`
	}{
		HypothesisID: hypothesisId,
		Location:     location,
		Message:      message,
		Data:         payload,
		Timestamp:    time.Now().UnixMilli(),
		SessionID:    "debug-session",
	})
	if err != nil {
		return
	}

	f, err := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return
	}
	defer f.Close()

	// A failed write is ignored on purpose: diagnostics must not affect callers.
	_, _ = fmt.Fprintf(f, "%s\n", entry)
}
// toJSON renders data as JSON text suitable for embedding in a log record.
//
// A nil map encodes as "null". If data holds a value that encoding/json
// cannot encode, "null" is returned as well, so the caller always receives
// syntactically valid JSON rather than an empty string.
func toJSON(data map[string]interface{}) string {
	b, err := json.Marshal(data)
	if err != nil {
		return "null"
	}
	return string(b)
}
// #endregion
// Table name constants and the default cache lifetime.
// The table names are unprefixed; getTableName and getHistoryTableName
// prepend the database prefix.
const (
// CacheTableName is the unprefixed name of the main cache table.
CacheTableName = "hotime_cache"
// CacheHistoryTableName is the unprefixed name of the cache history table.
CacheHistoryTableName = "hotime_cache_history"
// DefaultCacheTimeout is the fallback lifetime; Cache interprets it as seconds.
DefaultCacheTimeout = 24 * 60 * 60 // default expiry: 24 hours
)
// HoTimeDBInterface is the set of database operations CacheDb relies on.
// The concrete implementation lives outside this file; the notes below only
// describe how this file uses each method.
type HoTimeDBInterface interface {
// GetPrefix returns the string this file prepends to every table name.
GetPrefix() string
// Query runs a raw SQL query and returns the result rows.
Query(query string, args ...interface{}) []Map
// Exec runs a raw SQL statement; set passes driver placeholders ("?" or "$n") with args.
Exec(query string, args ...interface{}) (sql.Result, *Error)
// Get fetches a single row; callers in this file treat a nil Map as "not found".
Get(table string, qu ...interface{}) Map
// Select is not called in the visible part of this file.
Select(table string, qu ...interface{}) []Map
// Delete removes rows matching the condition map.
// NOTE(review): the int64 result is presumably the affected-row count — confirm.
Delete(table string, data map[string]interface{}) int64
// Update changes rows matching where; set treats a 0 result as "no row updated".
Update(table string, data Map, where Map) int64
// Insert adds one row.
// NOTE(review): the meaning of the int64 result is not visible here — confirm.
Insert(table string, data map[string]interface{}) int64
// GetType names the engine; this file recognizes "mysql", "sqlite" and "postgres".
GetType() string
}
// CacheDb is a cache backed by a database table, with an optional history
// table that receives a copy of the row after every write.
type CacheDb struct {
// TimeOut is the default entry lifetime in seconds; 0 means DefaultCacheTimeout.
TimeOut int64
// DbSet is not read in the visible part of this file.
DbSet bool
// SessionSet is not read in the visible part of this file.
SessionSet bool
HistorySet bool // whether history recording is enabled
// Db is the database handle used for every cache operation.
Db HoTimeDBInterface
// Embedded error holder, exposed through GetError and SetError.
*Error
ContextBase
// isInit is set once initDbTable has run, so table setup happens only once.
isInit bool
}
// GetError returns the *Error currently embedded in this CacheDb.
// The result is nil after SetError(nil).
func (that *CacheDb) GetError() *Error {
return that.Error
}
// SetError replaces the embedded *Error pointer with err.
// Passing nil is allowed; Cache does so before parsing a timeout argument.
func (that *CacheDb) SetError(err *Error) {
that.Error = err
}
// getTableName returns the main cache table name with the database prefix
// prepended.
func (that *CacheDb) getTableName() string {
	prefix := that.Db.GetPrefix()
	return prefix + CacheTableName
}
// getHistoryTableName returns the history table name with the database
// prefix prepended.
func (that *CacheDb) getHistoryTableName() string {
	prefix := that.Db.GetPrefix()
	return prefix + CacheHistoryTableName
}
// initDbTable prepares the cache tables the first time it is called on a
// CacheDb; later calls return immediately.
//
// In order it: creates the main table when it is missing, hands a legacy
// "<prefix>cached" table to migrateFromCached when one exists, and creates
// the history table when HistorySet is on and the table is missing. isInit is
// set at the end regardless of whether those steps succeeded, because their
// results are not inspected.
func (that *CacheDb) initDbTable() {
	if that.isInit {
		return
	}

	engine := that.Db.GetType()
	mainTable := that.getTableName()
	historyTable := that.getHistoryTableName()

	// #region agent log
	debugLog("F", "cache_db.go:initDbTable", "initDbTable started", map[string]interface{}{
		"dbType":           engine,
		"tableName":        mainTable,
		"historyTableName": historyTable,
		"HistorySet":       that.HistorySet,
	})
	// #endregion

	// Main table: create it when absent.
	if mainPresent := that.tableExists(mainTable); !mainPresent {
		that.createMainTable(engine, mainTable)
	}

	// Legacy table: migrate its rows when it is still around.
	legacyTable := that.Db.GetPrefix() + "cached"
	if legacyPresent := that.tableExists(legacyTable); legacyPresent {
		that.migrateFromCached(engine, legacyTable, mainTable)
	}

	// History table: only needed while history recording is enabled.
	historyPresent := that.tableExists(historyTable)
	needHistory := that.HistorySet && !historyPresent

	// #region agent log
	debugLog("F", "cache_db.go:initDbTable", "history table check", map[string]interface{}{
		"HistorySet":       that.HistorySet,
		"historyTableName": historyTable,
		"exists":           historyPresent,
		"shouldCreate":     needHistory,
	})
	// #endregion

	if needHistory {
		that.createHistoryTable(engine, historyTable)
		// #region agent log
		debugLog("F", "cache_db.go:initDbTable", "createHistoryTable called", map[string]interface{}{
			"dbType":           engine,
			"historyTableName": historyTable,
		})
		// #endregion
	}

	that.isInit = true
}
// tableExists reports whether a table called tableName is visible in the
// current database.
//
// The lookup goes through the engine's catalogue: INFORMATION_SCHEMA.TABLES
// restricted to the current schema on MySQL, sqlite_master on SQLite, and
// pg_tables restricted to the "public" schema on PostgreSQL. For any other
// engine type the answer is always false. A query that yields no rows is
// reported as false.
//
// The table name is passed as a bound parameter rather than spliced into the
// SQL text, so a prefix containing quote characters cannot alter the query.
func (that *CacheDb) tableExists(tableName string) bool {
	var rows []Map
	switch that.Db.GetType() {
	case "mysql":
		// DATABASE() is evaluated server-side, which avoids a separate
		// round trip to fetch the schema name first.
		rows = that.Db.Query("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=?", tableName)
	case "sqlite":
		rows = that.Db.Query(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`, tableName)
	case "postgres":
		rows = that.Db.Query(`SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename=$1`, tableName)
	default:
		return false
	}
	return len(rows) != 0
}
// createMainTable creates the cache table named tableName for the given
// engine.
//
// dbType selects the DDL dialect: "mysql", "sqlite" or "postgres". Any other
// value is ignored and nothing is sent to the database. Every dialect gets a
// unique constraint on "key" and an index on "end_time"; MySQL declares the
// index inside CREATE TABLE, while SQLite and PostgreSQL need a separate
// CREATE INDEX statement.
//
// The results of Exec are not inspected: this method has no channel to
// report them.
func (that *CacheDb) createMainTable(dbType, tableName string) {
	var createSQL string
	// separateIndex marks dialects whose end_time index cannot be declared inline.
	separateIndex := false

	switch dbType {
	case "mysql":
		createSQL = "CREATE TABLE `" + tableName + "` (" +
			"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
			"`key` varchar(64) NOT NULL COMMENT '缓存键'," +
			"`value` text DEFAULT NULL COMMENT '缓存值'," +
			"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
			"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
			"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
			"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
			"PRIMARY KEY (`id`)," +
			"UNIQUE KEY `uk_key` (`key`)," +
			"KEY `idx_end_time` (`end_time`)" +
			") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存管理'"
	case "sqlite":
		createSQL = `CREATE TABLE "` + tableName + `" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"key" TEXT NOT NULL UNIQUE,
"value" TEXT,
"end_time" TEXT,
"state" INTEGER DEFAULT 0,
"create_time" TEXT,
"modify_time" TEXT
)`
		separateIndex = true
	case "postgres":
		createSQL = `CREATE TABLE "` + tableName + `" (
"id" SERIAL PRIMARY KEY,
"key" VARCHAR(64) NOT NULL UNIQUE,
"value" TEXT,
"end_time" TIMESTAMP,
"state" INTEGER DEFAULT 0,
"create_time" TIMESTAMP,
"modify_time" TIMESTAMP
)`
		separateIndex = true
	default:
		// Unsupported engine: there is no DDL to run.
		return
	}

	that.Db.Exec(createSQL)
	if separateIndex {
		// Index used by the expired-row cleanup, which filters on end_time.
		that.Db.Exec(`CREATE INDEX "idx_` + tableName + `_end_time" ON "` + tableName + `" ("end_time")`)
	}
}
// createHistoryTable creates the cache history table named tableName for the
// given engine.
//
// dbType selects the DDL dialect: "mysql", "sqlite" or "postgres". Any other
// value is ignored and nothing is sent to the database. The table mirrors the
// main cache table's columns plus "hotime_cache_id", which holds the id of
// the cache row a history entry was copied from. Every dialect gets an index
// on "hotime_cache_id"; MySQL declares it inside CREATE TABLE, while SQLite
// and PostgreSQL need a separate CREATE INDEX statement.
//
// The results of Exec are not inspected: this method has no channel to
// report them.
func (that *CacheDb) createHistoryTable(dbType, tableName string) {
	var createSQL string
	// separateIndex marks dialects whose cache-id index cannot be declared inline.
	separateIndex := false

	switch dbType {
	case "mysql":
		createSQL = "CREATE TABLE `" + tableName + "` (" +
			"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
			"`hotime_cache_id` int(11) unsigned DEFAULT NULL COMMENT '缓存ID'," +
			"`key` varchar(64) DEFAULT NULL COMMENT '缓存键'," +
			"`value` text DEFAULT NULL COMMENT '缓存值'," +
			"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
			"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
			"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
			"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
			"PRIMARY KEY (`id`)," +
			"KEY `idx_hotime_cache_id` (`hotime_cache_id`)" +
			") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存历史'"
	case "sqlite":
		createSQL = `CREATE TABLE "` + tableName + `" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"hotime_cache_id" INTEGER,
"key" TEXT,
"value" TEXT,
"end_time" TEXT,
"state" INTEGER DEFAULT 0,
"create_time" TEXT,
"modify_time" TEXT
)`
		separateIndex = true
	case "postgres":
		createSQL = `CREATE TABLE "` + tableName + `" (
"id" SERIAL PRIMARY KEY,
"hotime_cache_id" INTEGER,
"key" VARCHAR(64),
"value" TEXT,
"end_time" TIMESTAMP,
"state" INTEGER DEFAULT 0,
"create_time" TIMESTAMP,
"modify_time" TIMESTAMP
)`
		separateIndex = true
	default:
		// Unsupported engine: there is no DDL to run.
		return
	}

	that.Db.Exec(createSQL)
	if separateIndex {
		that.Db.Exec(`CREATE INDEX "idx_` + tableName + `_cache_id" ON "` + tableName + `" ("hotime_cache_id")`)
	}
}
// migrateFromCached copies the rows of the legacy table oldTableName into
// newTableName and drops the legacy table once the copy has succeeded.
//
// dbType selects the SQL dialect: "mysql", "sqlite" or "postgres". Any other
// value is ignored and nothing is sent to the database.
//
// Only one row per key is copied — the one with the largest id. The legacy
// "endtime" column is converted from Unix seconds, and the legacy "time"
// column is divided by 1e9 before the same conversion (i.e. it is treated as
// nanoseconds); the result fills both create_time and modify_time. state is
// set to 0.
//
// If the copy reports an error the legacy table is left in place, so the
// migration is attempted again the next time table initialization runs.
func (that *CacheDb) migrateFromCached(dbType, oldTableName, newTableName string) {
	var migrateSQL, dropSQL string

	switch dbType {
	case "mysql":
		// De-duplicating copy: keep the last record (largest id) of each key.
		migrateSQL = "INSERT INTO `" + newTableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
			"SELECT c.`key`, c.`value`, FROM_UNIXTIME(c.`endtime`), 0, " +
			"FROM_UNIXTIME(c.`time` / 1000000000), FROM_UNIXTIME(c.`time` / 1000000000) " +
			"FROM `" + oldTableName + "` c " +
			"INNER JOIN (SELECT `key`, MAX(id) as max_id FROM `" + oldTableName + "` GROUP BY `key`) m " +
			"ON c.id = m.max_id"
		dropSQL = "DROP TABLE `" + oldTableName + "`"
	case "sqlite":
		migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
			`SELECT c."key", c."value", datetime(c."endtime", 'unixepoch'), 0, ` +
			`datetime(c."time" / 1000000000, 'unixepoch'), datetime(c."time" / 1000000000, 'unixepoch') ` +
			`FROM "` + oldTableName + `" c ` +
			`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
			`ON c.id = m.max_id`
		dropSQL = `DROP TABLE "` + oldTableName + `"`
	case "postgres":
		migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
			`SELECT c."key", c."value", to_timestamp(c."endtime"), 0, ` +
			`to_timestamp(c."time" / 1000000000), to_timestamp(c."time" / 1000000000) ` +
			`FROM "` + oldTableName + `" c ` +
			`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
			`ON c.id = m.max_id`
		dropSQL = `DROP TABLE "` + oldTableName + `"`
	default:
		// Unsupported engine: never run an empty INSERT or DROP.
		return
	}

	// Run the copy. A nil *Error is treated as success so that a DB layer
	// returning nil on success cannot cause a nil dereference here.
	if _, err := that.Db.Exec(migrateSQL); err != nil && err.GetError() != nil {
		return
	}

	// The copy succeeded: the legacy table is no longer needed.
	that.Db.Exec(dropSQL)
}
// writeHistory copies the current cache row for key into the history table.
//
// It does nothing when HistorySet is off or when no row for key exists. The
// history row carries the cache row's id in "hotime_cache_id" together with
// its key, value, end_time, state, create_time and modify_time as they are at
// the moment of the call. The result of the insert is not inspected.
func (that *CacheDb) writeHistory(key string) {
	if !that.HistorySet {
		return
	}

	// Read the row back so the snapshot reflects what is actually stored.
	cached := that.Db.Get(that.getTableName(), "*", Map{"key": key})
	if cached == nil {
		return
	}
	cacheID := cached.GetInt64("id")

	that.Db.Insert(that.getHistoryTableName(), Map{
		"hotime_cache_id": cacheID,
		"key":             cached.GetString("key"),
		"value":           cached.GetString("value"),
		"end_time":        cached.GetString("end_time"),
		"state":           cached.GetInt("state"),
		"create_time":     cached.GetString("create_time"),
		"modify_time":     cached.GetString("modify_time"),
	})

	// #region agent log
	// Routed through debugLog so the key is JSON-escaped like every other field.
	debugLog("C", "cache_db.go:writeHistory", "history written", map[string]interface{}{
		"key":     key,
		"cacheId": cacheID,
	})
	// #endregion
}
// get returns the decoded value cached under key, or nil on a miss.
//
// A miss is reported when no row exists, when the row's end_time is non-empty
// and not later than the current time, when the stored value is empty, or
// when the stored value is not valid JSON. Expired rows are not deleted here;
// removal is left to the random cleanup performed by set.
func (that *CacheDb) get(key string) interface{} {
	cached := that.Db.Get(that.getTableName(), "*", Map{"key": key})

	// #region agent log
	// Routed through debugLog so the key is JSON-escaped.
	debugLog("D", "cache_db.go:get", "get query", map[string]interface{}{
		"key":   key,
		"found": cached != nil,
	})
	// #endregion

	if cached == nil {
		return nil
	}

	// Expiry is decided by comparing the two timestamps as strings.
	// NOTE(review): this assumes end_time is returned in exactly the layout
	// Time2Str produces, and that the layout sorts chronologically — confirm
	// for every supported engine.
	endTime := cached.GetString("end_time")
	nowTime := Time2Str(time.Now())
	if endTime != "" && endTime <= nowTime {
		// Lazy expiry: report a miss and leave the row for batch cleanup.
		return nil
	}

	// The value is stored as plain JSON, with no wrapper object around it.
	valueStr := cached.GetString("value")
	if valueStr == "" {
		return nil
	}
	var data interface{}
	if err := json.Unmarshal([]byte(valueStr), &data); err != nil {
		return nil
	}

	// #region agent log
	debugLog("D", "cache_db.go:get", "get success", map[string]interface{}{
		"key":     key,
		"hasData": true,
	})
	// #endregion

	return data
}
// set stores value under key with the given expiry time.
//
// value is JSON-encoded and written with a single upsert statement on MySQL,
// SQLite and PostgreSQL, so an existing row keeps its id and create_time and
// only value, end_time and modify_time change. Other engines fall back to an
// update followed by an insert when no row was updated. After the write the
// row is copied to the history table (when enabled), and on a small random
// fraction of calls all rows whose end_time is in the past are deleted.
//
// Results of the database calls are not inspected.
func (that *CacheDb) set(key string, value interface{}, endTime time.Time) {
	// The value is stored as plain JSON, with no wrapper object. The Marshal
	// error is deliberately not acted on: an unencodable value yields no
	// bytes, an empty string is stored, and get reports that as a miss.
	bte, _ := json.Marshal(value)
	payload := string(bte)

	nowTime := Time2Str(time.Now())
	endTimeStr := Time2Str(endTime)
	dbType := that.Db.GetType()
	tableName := that.getTableName()

	// A native upsert avoids the race between "does it exist" and "write".
	var upsertSQL string
	switch dbType {
	case "mysql":
		upsertSQL = "INSERT INTO `" + tableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
			"VALUES (?, ?, ?, 0, ?, ?) " +
			"ON DUPLICATE KEY UPDATE `value`=VALUES(`value`), `end_time`=VALUES(`end_time`), `modify_time`=VALUES(`modify_time`)"
	case "sqlite":
		// INSERT OR REPLACE would delete and re-insert the row, so the
		// ON CONFLICT form is used instead.
		upsertSQL = `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
			`VALUES (?, ?, ?, 0, ?, ?) ` +
			`ON CONFLICT("key") DO UPDATE SET "value"=excluded."value", "end_time"=excluded."end_time", "modify_time"=excluded."modify_time"`
	case "postgres":
		upsertSQL = `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
			`VALUES ($1, $2, $3, 0, $4, $5) ` +
			`ON CONFLICT ("key") DO UPDATE SET "value"=EXCLUDED."value", "end_time"=EXCLUDED."end_time", "modify_time"=EXCLUDED."modify_time"`
	}

	if upsertSQL != "" {
		that.Db.Exec(upsertSQL, key, payload, endTimeStr, nowTime, nowTime)
	} else {
		// Other engines: update first, insert when nothing was updated.
		updated := that.Db.Update(tableName, Map{
			"value":       payload,
			"end_time":    endTimeStr,
			"modify_time": nowTime,
		}, Map{"key": key})
		if updated == 0 {
			that.Db.Insert(tableName, Map{
				"key":         key,
				"value":       payload,
				"end_time":    endTimeStr,
				"state":       0,
				"create_time": nowTime,
				"modify_time": nowTime,
			})
		}
	}

	// #region agent log
	// Routed through debugLog so the key is JSON-escaped.
	debugLog("B", "cache_db.go:set", "set completed", map[string]interface{}{
		"key":     key,
		"endTime": endTimeStr,
		"dbType":  dbType,
	})
	// #endregion

	// Record the freshly written row in the history table.
	that.writeHistory(key)

	// Occasionally purge expired rows (intended to fire on roughly 5% of calls).
	if Rand(1000) > 950 {
		nowTimeStr := Time2Str(time.Now())
		that.Db.Delete(tableName, Map{"end_time[<]": nowTimeStr})
	}
}
// delete removes cache rows for key.
//
// Without a "*" in key, the row whose key equals it is deleted. With a "*",
// everything from the first "*" onward is cut off and rows are deleted using
// a "key[~]" condition on the remaining prefix followed by "%".
//
// NOTE(review): the exact matching semantics of the "[~]" operator are
// defined by the DB layer and are not visible here — confirm it performs the
// intended prefix match.
func (that *CacheDb) delete(key string) {
	table := that.getTableName()
	star := strings.Index(key, "*")

	// Plain key: exact-match delete.
	if star == -1 {
		that.Db.Delete(table, Map{"key": key})
		return
	}

	// Wildcard key: delete by the part before the first "*".
	prefix := Substr(key, 0, star)
	that.Db.Delete(table, Map{"key[~]": prefix + "%"})
}
// Cache is the single entry point for cache operations.
//
// Usage:
//   - Cache(key)                 - read; the result's Data is the cached value or nil on a miss
//   - Cache(key, value)          - write with the default lifetime
//   - Cache(key, value, timeout) - write with an explicit lifetime in seconds
//   - Cache(key, nil)            - delete
//
// The default lifetime is TimeOut, or DefaultCacheTimeout when TimeOut is 0.
// An explicit timeout that cannot be converted or is not positive falls back
// to the default lifetime. Write and delete calls return an Obj whose Data is
// nil. The cache tables are initialized on first use.
//
// NOTE(review): the explicit-timeout path calls SetError(nil), which replaces
// the embedded *Error with nil before that pointer is handed to ObjToInt64 —
// confirm this is intended.
func (that *CacheDb) Cache(key string, data ...interface{}) *Obj {
	that.initDbTable()

	// #region agent log
	op := "set"
	switch {
	case len(data) == 0:
		op = "get"
	case len(data) == 1 && data[0] == nil:
		op = "delete"
	}
	// Routed through debugLog so the key is JSON-escaped.
	debugLog("A", "cache_db.go:Cache", "Cache called", map[string]interface{}{
		"key":       key,
		"operation": op,
		"dataLen":   len(data),
	})
	// #endregion

	// Read.
	if len(data) == 0 {
		return &Obj{Data: that.get(key)}
	}

	// Delete.
	if len(data) == 1 && data[0] == nil {
		that.delete(key)
		return &Obj{Data: nil}
	}

	// Write: start from the configured lifetime, or the package default when unset.
	timeout := that.TimeOut
	if timeout == 0 {
		timeout = DefaultCacheTimeout
	}
	if len(data) >= 2 {
		// An explicit, valid and positive timeout overrides the default.
		that.SetError(nil)
		requested := ObjToInt64(data[1], that.Error)
		if that.GetError() == nil && requested > 0 {
			timeout = requested
		}
	}

	endTime := time.Now().Add(time.Duration(timeout) * time.Second)
	that.set(key, data[0], endTime)
	return &Obj{Data: nil}
}