hotime/cache/cache_db.go
hoteas 27300025f3 refactor(cache): 移除调试日志并优化缓存逻辑
- 删除 CacheDb 结构体中的 debugLog 调用,清理冗余的调试日志代码
- 优化历史表的创建逻辑,简化条件检查
- 更新缓存的获取和设置方法,确保逻辑清晰且高效
- 维护代码整洁性,提升可读性和可维护性
2026-01-25 23:15:32 +08:00

443 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cache
import (
"database/sql"
"encoding/json"
"strings"
"time"
. "code.hoteas.com/golang/hotime/common"
)
// 表名常量
const (
CacheTableName = "hotime_cache"
CacheHistoryTableName = "hotime_cache_history"
DefaultCacheTimeout = 24 * 60 * 60 // 默认过期时间 24 小时
)
type HoTimeDBInterface interface {
GetPrefix() string
Query(query string, args ...interface{}) []Map
Exec(query string, args ...interface{}) (sql.Result, *Error)
Get(table string, qu ...interface{}) Map
Select(table string, qu ...interface{}) []Map
Delete(table string, data map[string]interface{}) int64
Update(table string, data Map, where Map) int64
Insert(table string, data map[string]interface{}) int64
GetType() string
}
type CacheDb struct {
TimeOut int64
DbSet bool
SessionSet bool
HistorySet bool // 是否开启历史记录
Db HoTimeDBInterface
*Error
ContextBase
isInit bool
}
func (that *CacheDb) GetError() *Error {
return that.Error
}
func (that *CacheDb) SetError(err *Error) {
that.Error = err
}
// getTableName 获取带前缀的表名
func (that *CacheDb) getTableName() string {
return that.Db.GetPrefix() + CacheTableName
}
// getHistoryTableName 获取带前缀的历史表名
func (that *CacheDb) getHistoryTableName() string {
return that.Db.GetPrefix() + CacheHistoryTableName
}
// initDbTable 初始化数据库表
func (that *CacheDb) initDbTable() {
if that.isInit {
return
}
dbType := that.Db.GetType()
tableName := that.getTableName()
historyTableName := that.getHistoryTableName()
// 检查并创建主表
if !that.tableExists(tableName) {
that.createMainTable(dbType, tableName)
}
// 检查并迁移旧 cached 表
oldTableName := that.Db.GetPrefix() + "cached"
if that.tableExists(oldTableName) {
that.migrateFromCached(dbType, oldTableName, tableName)
}
// 检查并创建历史表(开启历史记录时)
if that.HistorySet && !that.tableExists(historyTableName) {
that.createHistoryTable(dbType, historyTableName)
}
that.isInit = true
}
// tableExists 检查表是否存在
func (that *CacheDb) tableExists(tableName string) bool {
dbType := that.Db.GetType()
switch dbType {
case "mysql":
dbNames := that.Db.Query("SELECT DATABASE()")
if len(dbNames) == 0 {
return false
}
dbName := dbNames[0].GetString("DATABASE()")
res := that.Db.Query("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='" + dbName + "' AND TABLE_NAME='" + tableName + "'")
return len(res) != 0
case "sqlite":
res := that.Db.Query(`SELECT name FROM sqlite_master WHERE type='table' AND name='` + tableName + `'`)
return len(res) != 0
case "postgres":
res := that.Db.Query(`SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename='` + tableName + `'`)
return len(res) != 0
}
return false
}
// createMainTable 创建主表
func (that *CacheDb) createMainTable(dbType, tableName string) {
var createSQL string
switch dbType {
case "mysql":
createSQL = "CREATE TABLE `" + tableName + "` (" +
"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
"`key` varchar(64) NOT NULL COMMENT '缓存键'," +
"`value` text DEFAULT NULL COMMENT '缓存值'," +
"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
"PRIMARY KEY (`id`)," +
"UNIQUE KEY `uk_key` (`key`)," +
"KEY `idx_end_time` (`end_time`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存管理'"
case "sqlite":
createSQL = `CREATE TABLE "` + tableName + `" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"key" TEXT NOT NULL UNIQUE,
"value" TEXT,
"end_time" TEXT,
"state" INTEGER DEFAULT 0,
"create_time" TEXT,
"modify_time" TEXT
)`
case "postgres":
createSQL = `CREATE TABLE "` + tableName + `" (
"id" SERIAL PRIMARY KEY,
"key" VARCHAR(64) NOT NULL UNIQUE,
"value" TEXT,
"end_time" TIMESTAMP,
"state" INTEGER DEFAULT 0,
"create_time" TIMESTAMP,
"modify_time" TIMESTAMP
)`
that.Db.Exec(createSQL)
// 创建索引
that.Db.Exec(`CREATE INDEX "idx_` + tableName + `_end_time" ON "` + tableName + `" ("end_time")`)
return
}
that.Db.Exec(createSQL)
}
// createHistoryTable 创建历史表
func (that *CacheDb) createHistoryTable(dbType, tableName string) {
var createSQL string
switch dbType {
case "mysql":
createSQL = "CREATE TABLE `" + tableName + "` (" +
"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
"`hotime_cache_id` int(11) unsigned DEFAULT NULL COMMENT '缓存ID'," +
"`key` varchar(64) DEFAULT NULL COMMENT '缓存键'," +
"`value` text DEFAULT NULL COMMENT '缓存值'," +
"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
"PRIMARY KEY (`id`)," +
"KEY `idx_hotime_cache_id` (`hotime_cache_id`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存历史'"
case "sqlite":
createSQL = `CREATE TABLE "` + tableName + `" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"hotime_cache_id" INTEGER,
"key" TEXT,
"value" TEXT,
"end_time" TEXT,
"state" INTEGER DEFAULT 0,
"create_time" TEXT,
"modify_time" TEXT
)`
case "postgres":
createSQL = `CREATE TABLE "` + tableName + `" (
"id" SERIAL PRIMARY KEY,
"hotime_cache_id" INTEGER,
"key" VARCHAR(64),
"value" TEXT,
"end_time" TIMESTAMP,
"state" INTEGER DEFAULT 0,
"create_time" TIMESTAMP,
"modify_time" TIMESTAMP
)`
that.Db.Exec(createSQL)
// 创建索引
that.Db.Exec(`CREATE INDEX "idx_` + tableName + `_cache_id" ON "` + tableName + `" ("hotime_cache_id")`)
return
}
that.Db.Exec(createSQL)
}
// migrateFromCached 从旧 cached 表迁移数据
func (that *CacheDb) migrateFromCached(dbType, oldTableName, newTableName string) {
var migrateSQL string
switch dbType {
case "mysql":
// 去重迁移:取每个 key 的最后一条记录id 最大)
migrateSQL = "INSERT INTO `" + newTableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
"SELECT c.`key`, c.`value`, FROM_UNIXTIME(c.`endtime`), 0, " +
"FROM_UNIXTIME(c.`time` / 1000000000), FROM_UNIXTIME(c.`time` / 1000000000) " +
"FROM `" + oldTableName + "` c " +
"INNER JOIN (SELECT `key`, MAX(id) as max_id FROM `" + oldTableName + "` GROUP BY `key`) m " +
"ON c.id = m.max_id"
case "sqlite":
migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`SELECT c."key", c."value", datetime(c."endtime", 'unixepoch'), 0, ` +
`datetime(c."time" / 1000000000, 'unixepoch'), datetime(c."time" / 1000000000, 'unixepoch') ` +
`FROM "` + oldTableName + `" c ` +
`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
`ON c.id = m.max_id`
case "postgres":
migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`SELECT c."key", c."value", to_timestamp(c."endtime"), 0, ` +
`to_timestamp(c."time" / 1000000000), to_timestamp(c."time" / 1000000000) ` +
`FROM "` + oldTableName + `" c ` +
`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
`ON c.id = m.max_id`
}
// 执行迁移
_, err := that.Db.Exec(migrateSQL)
if err.GetError() == nil {
// 迁移成功,删除旧表
var dropSQL string
switch dbType {
case "mysql":
dropSQL = "DROP TABLE `" + oldTableName + "`"
case "sqlite", "postgres":
dropSQL = `DROP TABLE "` + oldTableName + `"`
}
that.Db.Exec(dropSQL)
}
}
// writeHistory 写入历史记录
func (that *CacheDb) writeHistory(key string) {
if !that.HistorySet {
return
}
tableName := that.getTableName()
historyTableName := that.getHistoryTableName()
// 查询当前数据
cached := that.Db.Get(tableName, "*", Map{"key": key})
if cached == nil {
return
}
// 构建历史记录数据
historyData := Map{
"hotime_cache_id": cached.GetInt64("id"),
"key": cached.GetString("key"),
"value": cached.GetString("value"),
"end_time": cached.GetString("end_time"),
"state": cached.GetInt("state"),
"create_time": cached.GetString("create_time"),
"modify_time": cached.GetString("modify_time"),
}
// 插入历史表
that.Db.Insert(historyTableName, historyData)
}
// get 获取缓存
func (that *CacheDb) get(key string) interface{} {
tableName := that.getTableName()
cached := that.Db.Get(tableName, "*", Map{"key": key})
if cached == nil {
return nil
}
// 使用字符串比较判断过期ISO 格式天然支持)
endTime := cached.GetString("end_time")
nowTime := Time2Str(time.Now())
if endTime != "" && endTime <= nowTime {
// 惰性删除:过期只返回 nil不立即删除
// 依赖随机清理批量删除过期数据
return nil
}
// 直接解析 value不再需要 {"data": value} 包装
valueStr := cached.GetString("value")
if valueStr == "" {
return nil
}
var data interface{}
err := json.Unmarshal([]byte(valueStr), &data)
if err != nil {
return nil
}
return data
}
// set 设置缓存
func (that *CacheDb) set(key string, value interface{}, endTime time.Time) {
// 直接序列化 value不再包装
bte, _ := json.Marshal(value)
nowTime := Time2Str(time.Now())
endTimeStr := Time2Str(endTime)
dbType := that.Db.GetType()
tableName := that.getTableName()
// 使用 UPSERT 语法解决并发问题
switch dbType {
case "mysql":
upsertSQL := "INSERT INTO `" + tableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
"VALUES (?, ?, ?, 0, ?, ?) " +
"ON DUPLICATE KEY UPDATE `value`=VALUES(`value`), `end_time`=VALUES(`end_time`), `modify_time`=VALUES(`modify_time`)"
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
case "sqlite":
// SQLite: INSERT OR REPLACE 会删除后插入,所以用 UPSERT 语法
upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`VALUES (?, ?, ?, 0, ?, ?) ` +
`ON CONFLICT("key") DO UPDATE SET "value"=excluded."value", "end_time"=excluded."end_time", "modify_time"=excluded."modify_time"`
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
case "postgres":
upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
`VALUES ($1, $2, $3, 0, $4, $5) ` +
`ON CONFLICT ("key") DO UPDATE SET "value"=EXCLUDED."value", "end_time"=EXCLUDED."end_time", "modify_time"=EXCLUDED."modify_time"`
that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)
default:
// 兼容其他数据库:使用 Update + Insert
num := that.Db.Update(tableName, Map{
"value": string(bte),
"end_time": endTimeStr,
"modify_time": nowTime,
}, Map{"key": key})
if num == 0 {
that.Db.Insert(tableName, Map{
"key": key,
"value": string(bte),
"end_time": endTimeStr,
"state": 0,
"create_time": nowTime,
"modify_time": nowTime,
})
}
}
// 写入历史记录
that.writeHistory(key)
// 随机执行删除过期数据命令5% 概率)
if Rand(1000) > 950 {
nowTimeStr := Time2Str(time.Now())
that.Db.Delete(tableName, Map{"end_time[<]": nowTimeStr})
}
}
// delete 删除缓存
func (that *CacheDb) delete(key string) {
tableName := that.getTableName()
del := strings.Index(key, "*")
// 如果通配删除
if del != -1 {
key = Substr(key, 0, del)
that.Db.Delete(tableName, Map{"key[~]": key + "%"})
} else {
that.Db.Delete(tableName, Map{"key": key})
}
}
// Cache 缓存操作入口
// 用法:
// - Cache(key) - 获取缓存
// - Cache(key, value) - 设置缓存(使用默认过期时间)
// - Cache(key, value, timeout) - 设置缓存(指定过期时间,单位:秒)
// - Cache(key, nil) - 删除缓存
func (that *CacheDb) Cache(key string, data ...interface{}) *Obj {
that.initDbTable()
// 获取缓存
if len(data) == 0 {
return &Obj{Data: that.get(key)}
}
// 删除缓存
if len(data) == 1 && data[0] == nil {
that.delete(key)
return &Obj{Data: nil}
}
// 计算过期时间
var timeout int64
if len(data) == 1 {
// 使用配置的 TimeOut如果为 0 则使用默认值
timeout = that.TimeOut
if timeout == 0 {
timeout = DefaultCacheTimeout
}
} else if len(data) >= 2 {
// 使用指定的超时时间
that.SetError(nil)
tempTimeout := ObjToInt64(data[1], that.Error)
if that.GetError() == nil && tempTimeout > 0 {
timeout = tempTimeout
} else {
timeout = that.TimeOut
if timeout == 0 {
timeout = DefaultCacheTimeout
}
}
}
endTime := time.Now().Add(time.Duration(timeout) * time.Second)
that.set(key, data[0], endTime)
return &Obj{Data: nil}
}