// Source: hotime/cache/cache_db.go

package cache
import (
	"database/sql"
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"time"

	. "code.hoteas.com/golang/hotime/common"
)
// #region agent log

// debugLog appends one JSON-encoded diagnostic record to the agent debug log
// file. The record carries hypothesisId, location, message, data, the current
// Unix time in milliseconds and the fixed sessionId "debug-session".
//
// The whole record is encoded with encoding/json so that quotes, backslashes
// or control characters in any argument cannot produce a malformed line.
// Logging is best-effort: when the record cannot be encoded or the file cannot
// be opened, the call is a silent no-op.
//
// NOTE(review): the destination is a hard-coded workstation path and this
// looks like temporary instrumentation — confirm whether it should ship.
func debugLog(hypothesisId, location, message string, data map[string]interface{}) {
	record := struct {
		HypothesisID string                 `json:"hypothesisId"`
		Location     string                 `json:"location"`
		Message      string                 `json:"message"`
		Data         map[string]interface{} `json:"data"`
		Timestamp    int64                  `json:"timestamp"`
		SessionID    string                 `json:"sessionId"`
	}{
		HypothesisID: hypothesisId,
		Location:     location,
		Message:      message,
		Data:         data,
		Timestamp:    time.Now().UnixMilli(),
		SessionID:    "debug-session",
	}

	line, err := json.Marshal(record)
	if err != nil {
		return
	}

	f, err := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return
	}
	defer f.Close()

	fmt.Fprintf(f, "%s\n", line)
}
// toJSON renders data as a JSON string, intended for debug log records.
// A nil map encodes as "null". When data holds a value encoding/json cannot
// represent (for example a channel or a function), the literal "null" is
// returned instead of an empty string so the result is always valid JSON.
func toJSON(data map[string]interface{}) string {
	b, err := json.Marshal(data)
	if err != nil {
		return "null"
	}
	return string(b)
}
// #endregion
// Table names (without the database prefix) and the default expiry used by
// the database-backed cache.
const (
	// CacheTableName is the unprefixed name of the main cache table.
	CacheTableName = "hotime_cache"
	// CacheHistoryTableName is the unprefixed name of the cache history table.
	CacheHistoryTableName = "hotime_cache_history"
	// DefaultCacheTimeout is the default entry lifetime in seconds (24 hours).
	DefaultCacheTimeout = 24 * 60 * 60
)
type HoTimeDBInterface interface {
2021-05-29 00:37:20 +08:00
GetPrefix() string
2021-05-24 07:27:41 +08:00
Query(query string, args ...interface{}) []Map
2021-05-25 19:53:34 +08:00
Exec(query string, args ...interface{}) (sql.Result, *Error)
2021-05-24 07:27:41 +08:00
Get(table string, qu ...interface{}) Map
Select(table string, qu ...interface{}) []Map
Delete(table string, data map[string]interface{}) int64
Update(table string, data Map, where Map) int64
Insert(table string, data map[string]interface{}) int64
GetType() string
}
2017-08-04 08:20:59 +00:00
type CacheDb struct {
TimeOut int64
DbSet bool
SessionSet bool
HistorySet bool // 是否开启历史记录
Db HoTimeDBInterface
*Error
2021-05-24 07:27:41 +08:00
ContextBase
2017-08-04 08:20:59 +00:00
isInit bool
}
2022-03-13 01:48:54 +08:00
// GetError returns the error holder currently attached to this cache.
func (that *CacheDb) GetError() *Error {
	holder := that.Error
	return holder
}
// SetError replaces the embedded error holder of this cache with err.
// Passing nil clears the holder.
func (that *CacheDb) SetError(err *Error) {
that.Error = err
}
// getTableName returns the main cache table name with the database prefix
// applied.
func (that *CacheDb) getTableName() string {
	prefix := that.Db.GetPrefix()
	return prefix + CacheTableName
}
// getHistoryTableName returns the cache history table name with the database
// prefix applied.
func (that *CacheDb) getHistoryTableName() string {
	prefix := that.Db.GetPrefix()
	return prefix + CacheHistoryTableName
}
// initDbTable 初始化数据库表
2021-05-25 20:27:24 +08:00
func (that *CacheDb) initDbTable() {
if that.isInit {
2017-08-04 08:20:59 +00:00
return
}
dbType := that.Db.GetType()
tableName := that.getTableName()
historyTableName := that.getHistoryTableName()
// #region agent log
debugLog("F", "cache_db.go:initDbTable", "initDbTable started", map[string]interface{}{
"dbType": dbType, "tableName": tableName, "historyTableName": historyTableName, "HistorySet": that.HistorySet,
})
// #endregion
// 检查并创建主表
if !that.tableExists(tableName) {
that.createMainTable(dbType, tableName)
}
2019-11-10 18:00:45 +08:00
// 检查并迁移旧 cached 表
oldTableName := that.Db.GetPrefix() + "cached"
if that.tableExists(oldTableName) {
that.migrateFromCached(dbType, oldTableName, tableName)
}
// 检查并创建历史表(开启历史记录时)
historyTableExists := that.tableExists(historyTableName)
// #region agent log
debugLog("F", "cache_db.go:initDbTable", "history table check", map[string]interface{}{
"HistorySet": that.HistorySet, "historyTableName": historyTableName, "exists": historyTableExists,
"shouldCreate": that.HistorySet && !historyTableExists,
})
// #endregion
if that.HistorySet && !historyTableExists {
that.createHistoryTable(dbType, historyTableName)
// #region agent log
debugLog("F", "cache_db.go:initDbTable", "createHistoryTable called", map[string]interface{}{
"dbType": dbType, "historyTableName": historyTableName,
})
// #endregion
}
that.isInit = true
}
// tableExists 检查表是否存在
func (that *CacheDb) tableExists(tableName string) bool {
dbType := that.Db.GetType()
switch dbType {
case "mysql":
dbNames := that.Db.Query("SELECT DATABASE()")
2019-11-10 18:00:45 +08:00
if len(dbNames) == 0 {
return false
2019-11-10 18:00:45 +08:00
}
dbName := dbNames[0].GetString("DATABASE()")
res := that.Db.Query("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='" + dbName + "' AND TABLE_NAME='" + tableName + "'")
return len(res) != 0
2019-11-10 18:00:45 +08:00
case "sqlite":
res := that.Db.Query(`SELECT name FROM sqlite_master WHERE type='table' AND name='` + tableName + `'`)
return len(res) != 0
2017-08-04 08:20:59 +00:00
case "postgres":
res := that.Db.Query(`SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename='` + tableName + `'`)
return len(res) != 0
2017-08-04 08:20:59 +00:00
}
2019-11-10 18:00:45 +08:00
return false
}
2019-11-10 18:00:45 +08:00
// createMainTable creates the main cache table named tableName using the DDL
// dialect selected by dbType ("mysql", "sqlite" or "postgres").
//
// The table has a unique "key" column. The MySQL and PostgreSQL variants also
// index "end_time"; for PostgreSQL that index is a separate statement executed
// after the CREATE TABLE. An unsupported dbType is ignored instead of sending
// an empty statement to the database. Results of the statements are not
// inspected.
func (that *CacheDb) createMainTable(dbType, tableName string) {
	var statements []string

	switch dbType {
	case "mysql":
		statements = []string{
			"CREATE TABLE `" + tableName + "` (" +
				"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
				"`key` varchar(64) NOT NULL COMMENT '缓存键'," +
				"`value` text DEFAULT NULL COMMENT '缓存值'," +
				"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
				"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
				"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
				"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
				"PRIMARY KEY (`id`)," +
				"UNIQUE KEY `uk_key` (`key`)," +
				"KEY `idx_end_time` (`end_time`)" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存管理'",
		}
	case "sqlite":
		statements = []string{
			`CREATE TABLE "` + tableName + `" (
				"id" INTEGER PRIMARY KEY AUTOINCREMENT,
				"key" TEXT NOT NULL UNIQUE,
				"value" TEXT,
				"end_time" TEXT,
				"state" INTEGER DEFAULT 0,
				"create_time" TEXT,
				"modify_time" TEXT
			)`,
		}
	case "postgres":
		statements = []string{
			`CREATE TABLE "` + tableName + `" (
				"id" SERIAL PRIMARY KEY,
				"key" VARCHAR(64) NOT NULL UNIQUE,
				"value" TEXT,
				"end_time" TIMESTAMP,
				"state" INTEGER DEFAULT 0,
				"create_time" TIMESTAMP,
				"modify_time" TIMESTAMP
			)`,
			// Secondary index used when looking rows up by expiry time.
			`CREATE INDEX "idx_` + tableName + `_end_time" ON "` + tableName + `" ("end_time")`,
		}
	default:
		// Unknown database type: there is no DDL to run.
		return
	}

	for _, stmt := range statements {
		that.Db.Exec(stmt)
	}
}
// createHistoryTable creates the cache history table named tableName using
// the DDL dialect selected by dbType ("mysql", "sqlite" or "postgres").
//
// The history table mirrors the main table's columns and adds
// "hotime_cache_id", the id of the cache row a history entry was copied from.
// The MySQL and PostgreSQL variants index that column; for PostgreSQL the
// index is a separate statement executed after the CREATE TABLE. An
// unsupported dbType is ignored instead of sending an empty statement to the
// database. Results of the statements are not inspected.
func (that *CacheDb) createHistoryTable(dbType, tableName string) {
	var statements []string

	switch dbType {
	case "mysql":
		statements = []string{
			"CREATE TABLE `" + tableName + "` (" +
				"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
				"`hotime_cache_id` int(11) unsigned DEFAULT NULL COMMENT '缓存ID'," +
				"`key` varchar(64) DEFAULT NULL COMMENT '缓存键'," +
				"`value` text DEFAULT NULL COMMENT '缓存值'," +
				"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
				"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
				"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
				"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
				"PRIMARY KEY (`id`)," +
				"KEY `idx_hotime_cache_id` (`hotime_cache_id`)" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存历史'",
		}
	case "sqlite":
		statements = []string{
			`CREATE TABLE "` + tableName + `" (
				"id" INTEGER PRIMARY KEY AUTOINCREMENT,
				"hotime_cache_id" INTEGER,
				"key" TEXT,
				"value" TEXT,
				"end_time" TEXT,
				"state" INTEGER DEFAULT 0,
				"create_time" TEXT,
				"modify_time" TEXT
			)`,
		}
	case "postgres":
		statements = []string{
			`CREATE TABLE "` + tableName + `" (
				"id" SERIAL PRIMARY KEY,
				"hotime_cache_id" INTEGER,
				"key" VARCHAR(64),
				"value" TEXT,
				"end_time" TIMESTAMP,
				"state" INTEGER DEFAULT 0,
				"create_time" TIMESTAMP,
				"modify_time" TIMESTAMP
			)`,
			// Secondary index used when looking history up by cache row id.
			`CREATE INDEX "idx_` + tableName + `_cache_id" ON "` + tableName + `" ("hotime_cache_id")`,
		}
	default:
		// Unknown database type: there is no DDL to run.
		return
	}

	for _, stmt := range statements {
		that.Db.Exec(stmt)
	}
}
// migrateFromCached 从旧 cached 表迁移数据
func (that *CacheDb) migrateFromCached(dbType, oldTableName, newTableName string) {
var migrateSQL string
switch dbType {
case "mysql":
// 去重迁移:取每个 key 的最后一条记录id 最大)
migrateSQL = "INSERT INTO `" + newTableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
"SELECT c.`key`, c.`value`, FROM_UNIXTIME(c.`endtime`), 0, " +
"FROM_UNIXTIME(c.`time` / 1000000000), FROM_UNIXTIME(c.`time` / 1000000000) " +
"FROM `" + oldTableName + "` c " +
"INNER JOIN (SELECT `key`, MAX(id) as max_id FROM `" + oldTableName + "` GROUP BY `key`) m " +
"ON c.id = m.max_id"
case "sqlite":
migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`SELECT c."key", c."value", datetime(c."endtime", 'unixepoch'), 0, ` +
`datetime(c."time" / 1000000000, 'unixepoch'), datetime(c."time" / 1000000000, 'unixepoch') ` +
`FROM "` + oldTableName + `" c ` +
`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
`ON c.id = m.max_id`
case "postgres":
migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`SELECT c."key", c."value", to_timestamp(c."endtime"), 0, ` +
`to_timestamp(c."time" / 1000000000), to_timestamp(c."time" / 1000000000) ` +
`FROM "` + oldTableName + `" c ` +
`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
`ON c.id = m.max_id`
}
// 执行迁移
_, err := that.Db.Exec(migrateSQL)
if err.GetError() == nil {
// 迁移成功,删除旧表
var dropSQL string
switch dbType {
case "mysql":
dropSQL = "DROP TABLE `" + oldTableName + "`"
case "sqlite", "postgres":
dropSQL = `DROP TABLE "` + oldTableName + `"`
2019-11-10 18:00:45 +08:00
}
that.Db.Exec(dropSQL)
}
}
// writeHistory 写入历史记录
func (that *CacheDb) writeHistory(key string) {
if !that.HistorySet {
return
}
tableName := that.getTableName()
historyTableName := that.getHistoryTableName()
2019-11-10 18:00:45 +08:00
// 查询当前数据
cached := that.Db.Get(tableName, "*", Map{"key": key})
if cached == nil {
return
2017-08-04 08:20:59 +00:00
}
// 构建历史记录数据
historyData := Map{
"hotime_cache_id": cached.GetInt64("id"),
"key": cached.GetString("key"),
"value": cached.GetString("value"),
"end_time": cached.GetString("end_time"),
"state": cached.GetInt("state"),
"create_time": cached.GetString("create_time"),
"modify_time": cached.GetString("modify_time"),
}
// 插入历史表
that.Db.Insert(historyTableName, historyData)
// #region agent log
logFile3, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if logFile3 != nil {
fmt.Fprintf(logFile3, `{"hypothesisId":"C","location":"cache_db.go:writeHistory","message":"history written","data":{"key":"%s","cacheId":%d},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, cached.GetInt64("id"), time.Now().UnixMilli())
logFile3.Close()
}
// #endregion
2017-08-04 08:20:59 +00:00
}
// get 获取缓存
2021-05-25 20:27:24 +08:00
func (that *CacheDb) get(key string) interface{} {
tableName := that.getTableName()
cached := that.Db.Get(tableName, "*", Map{"key": key})
// #region agent log
logFile4, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if logFile4 != nil {
found := cached != nil
fmt.Fprintf(logFile4, `{"hypothesisId":"D","location":"cache_db.go:get","message":"get query","data":{"key":"%s","found":%t},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, found, time.Now().UnixMilli())
logFile4.Close()
}
// #endregion
2017-08-04 08:20:59 +00:00
if cached == nil {
return nil
}
// 使用字符串比较判断过期ISO 格式天然支持)
endTime := cached.GetString("end_time")
nowTime := Time2Str(time.Now())
if endTime != "" && endTime <= nowTime {
// 惰性删除:过期只返回 nil不立即删除
// 依赖随机清理批量删除过期数据
2017-08-04 08:20:59 +00:00
return nil
}
// 直接解析 value不再需要 {"data": value} 包装
valueStr := cached.GetString("value")
if valueStr == "" {
return nil
}
2017-08-04 08:20:59 +00:00
var data interface{}
err := json.Unmarshal([]byte(valueStr), &data)
if err != nil {
return nil
}
// #region agent log
logFile5, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if logFile5 != nil {
fmt.Fprintf(logFile5, `{"hypothesisId":"D","location":"cache_db.go:get","message":"get success","data":{"key":"%s","hasData":true},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, time.Now().UnixMilli())
logFile5.Close()
}
// #endregion
2017-08-04 08:20:59 +00:00
return data
}
2017-08-04 08:20:59 +00:00
// set 设置缓存
func (that *CacheDb) set(key string, value interface{}, endTime time.Time) {
// 直接序列化 value不再包装
bte, _ := json.Marshal(value)
nowTime := Time2Str(time.Now())
endTimeStr := Time2Str(endTime)
dbType := that.Db.GetType()
tableName := that.getTableName()
// 使用 UPSERT 语法解决并发问题
switch dbType {
case "mysql":
upsertSQL := "INSERT INTO `" + tableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
"VALUES (?, ?, ?, 0, ?, ?) " +
"ON DUPLICATE KEY UPDATE `value`=VALUES(`value`), `end_time`=VALUES(`end_time`), `modify_time`=VALUES(`modify_time`)"
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
case "sqlite":
// SQLite: INSERT OR REPLACE 会删除后插入,所以用 UPSERT 语法
upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`VALUES (?, ?, ?, 0, ?, ?) ` +
`ON CONFLICT("key") DO UPDATE SET "value"=excluded."value", "end_time"=excluded."end_time", "modify_time"=excluded."modify_time"`
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
case "postgres":
upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`VALUES ($1, $2, $3, 0, $4, $5) ` +
`ON CONFLICT ("key") DO UPDATE SET "value"=EXCLUDED."value", "end_time"=EXCLUDED."end_time", "modify_time"=EXCLUDED."modify_time"`
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
default:
// 兼容其他数据库:使用 Update + Insert
num := that.Db.Update(tableName, Map{
"value": string(bte),
"end_time": endTimeStr,
"modify_time": nowTime,
}, Map{"key": key})
if num == 0 {
that.Db.Insert(tableName, Map{
"key": key,
"value": string(bte),
"end_time": endTimeStr,
"state": 0,
"create_time": nowTime,
"modify_time": nowTime,
})
}
}
2017-08-04 08:20:59 +00:00
// #region agent log
logFile2, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if logFile2 != nil {
fmt.Fprintf(logFile2, `{"hypothesisId":"B","location":"cache_db.go:set","message":"set completed","data":{"key":"%s","endTime":"%s","dbType":"%s"},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, endTimeStr, dbType, time.Now().UnixMilli())
logFile2.Close()
2017-08-04 08:20:59 +00:00
}
// #endregion
2017-08-04 08:20:59 +00:00
// 写入历史记录
that.writeHistory(key)
// 随机执行删除过期数据命令5% 概率)
2017-08-04 08:20:59 +00:00
if Rand(1000) > 950 {
nowTimeStr := Time2Str(time.Now())
that.Db.Delete(tableName, Map{"end_time[<]": nowTimeStr})
2017-08-04 08:20:59 +00:00
}
}
// delete 删除缓存
2021-05-25 20:27:24 +08:00
func (that *CacheDb) delete(key string) {
tableName := that.getTableName()
2019-11-10 18:00:45 +08:00
del := strings.Index(key, "*")
// 如果通配删除
2019-11-10 18:00:45 +08:00
if del != -1 {
key = Substr(key, 0, del)
that.Db.Delete(tableName, Map{"key[~]": key + "%"})
2019-11-10 18:00:45 +08:00
} else {
that.Db.Delete(tableName, Map{"key": key})
2018-04-09 17:16:24 +00:00
}
2017-08-04 08:20:59 +00:00
}
// Cache 缓存操作入口
// 用法:
// - Cache(key) - 获取缓存
// - Cache(key, value) - 设置缓存(使用默认过期时间)
// - Cache(key, value, timeout) - 设置缓存(指定过期时间,单位:秒)
// - Cache(key, nil) - 删除缓存
2021-05-25 20:27:24 +08:00
func (that *CacheDb) Cache(key string, data ...interface{}) *Obj {
that.initDbTable()
2017-08-04 08:20:59 +00:00
// #region agent log
logFile, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if logFile != nil {
op := "get"
if len(data) == 1 && data[0] == nil {
op = "delete"
} else if len(data) >= 1 {
op = "set"
}
fmt.Fprintf(logFile, `{"hypothesisId":"A","location":"cache_db.go:Cache","message":"Cache called","data":{"key":"%s","operation":"%s","dataLen":%d},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, op, len(data), time.Now().UnixMilli())
logFile.Close()
}
// #endregion
// 获取缓存
2017-08-04 08:20:59 +00:00
if len(data) == 0 {
2021-05-25 20:27:24 +08:00
return &Obj{Data: that.get(key)}
2017-08-04 08:20:59 +00:00
}
// 删除缓存
2017-08-04 08:20:59 +00:00
if len(data) == 1 && data[0] == nil {
2021-05-25 20:27:24 +08:00
that.delete(key)
2017-08-04 08:20:59 +00:00
return &Obj{Data: nil}
}
// 计算过期时间
var timeout int64
2017-08-04 08:20:59 +00:00
if len(data) == 1 {
// 使用配置的 TimeOut如果为 0 则使用默认值
timeout = that.TimeOut
if timeout == 0 {
timeout = DefaultCacheTimeout
2017-08-04 08:20:59 +00:00
}
} else if len(data) >= 2 {
// 使用指定的超时时间
2021-05-25 20:27:24 +08:00
that.SetError(nil)
tempTimeout := ObjToInt64(data[1], that.Error)
if that.GetError() == nil && tempTimeout > 0 {
timeout = tempTimeout
} else {
timeout = that.TimeOut
if timeout == 0 {
timeout = DefaultCacheTimeout
}
2017-08-04 08:20:59 +00:00
}
}
endTime := time.Now().Add(time.Duration(timeout) * time.Second)
that.set(key, data[0], endTime)
2017-08-04 08:20:59 +00:00
return &Obj{Data: nil}
}