package cache

import (
	"database/sql"
	"encoding/json"
	"os"
	"strings"
	"time"

	. "code.hoteas.com/golang/hotime/common"
)

// cacheDbDebugLogEnv names the environment variable that holds the path of the
// debug log file. Debug logging is disabled while it is unset or empty.
const cacheDbDebugLogEnv = "HOTIME_CACHE_DEBUG_LOG"

// debugLogDb appends one JSON-encoded debug record, followed by a newline, to
// the file named by the HOTIME_CACHE_DEBUG_LOG environment variable.
//
// It does nothing when the variable is empty, so no log file is created (and
// no cache keys are written to disk) unless debugging is explicitly enabled.
// Logging is best-effort: marshal, open and write failures are ignored so that
// diagnostics can never disturb a cache operation.
func debugLogDb(hypothesisId, location, message string, data map[string]interface{}) {
	// #region agent log
	logPath := os.Getenv(cacheDbDebugLogEnv)
	if logPath == "" {
		return
	}
	logEntry, err := json.Marshal(map[string]interface{}{
		"sessionId":    "cache-db-debug",
		"runId":        "test-run",
		"hypothesisId": hypothesisId,
		"location":     location,
		"message":      message,
		"data":         data,
		"timestamp":    time.Now().UnixMilli(),
	})
	if err != nil {
		return
	}
	logFile, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return
	}
	defer logFile.Close()
	// Best-effort: a failed write is deliberately ignored.
	_, _ = logFile.Write(append(logEntry, '\n'))
	// #endregion
}

// Table names, default timeout and cache modes.
const (
	CacheTableName        = "hotime_cache"
	CacheHistoryTableName = "hotime_cache_history"
	LegacyCacheTableName  = "cached"     // table name used by the legacy cache version
	DefaultCacheTimeout   = 24 * 60 * 60 // default lifetime: 24 hours, in seconds
	CacheModeNew          = "new"        // new mode: only the new table is used
	CacheModeCompatible   = "compatible" // compatible mode: write the new table, fall back to reading the legacy one
)

// HoTimeDBInterface lists the database operations that CacheDb calls.
type HoTimeDBInterface interface {
	GetPrefix() string
	Query(query string, args ...interface{}) []Map
	Exec(query string, args ...interface{}) (sql.Result, *Error)
	Get(table string, qu ...interface{}) Map
	Select(table string, qu ...interface{}) []Map
	Delete(table string, data map[string]interface{}) int64
	Update(table string, data Map, where Map) int64
	Insert(table string, data map[string]interface{}) int64
	GetType() string
}

// CacheDb is a cache backed by database tables reached through Db.
type CacheDb struct {
	TimeOut    int64  // default lifetime in seconds; 0 selects DefaultCacheTimeout
	DbSet      bool   // not read anywhere in this file
	SessionSet bool   // not read anywhere in this file
	HistorySet bool   // whether every write is also recorded in the history table
	Mode       string // cache mode: "new" (default, new table only) or "compatible" (write new, read legacy as fallback)
	Db         HoTimeDBInterface
	*Error
	ContextBase
	isInit bool
}

// GetError returns the embedded error holder.
func (that *CacheDb) GetError() *Error {
	return that.Error
}

// SetError replaces the embedded error holder.
func (that *CacheDb) SetError(err *Error) {
	that.Error = err
}

// getTableName returns the main cache table name including the database prefix.
func (that *CacheDb) getTableName() string {
	return that.Db.GetPrefix() + CacheTableName
}

// getHistoryTableName returns the history table name including the database prefix.
func (that *CacheDb) getHistoryTableName() string {
	return that.Db.GetPrefix() + CacheHistoryTableName
}

// getLegacyTableName returns the legacy cache table name including the database prefix.
func (that *CacheDb) getLegacyTableName() string {
	return that.Db.GetPrefix() + LegacyCacheTableName
}

// isCompatibleMode reports whether the cache runs in compatible mode.
func (that *CacheDb) isCompatibleMode() bool {
	return that.Mode == CacheModeCompatible
}

// getEffectiveMode returns the mode actually in effect: CacheModeCompatible
// when configured, CacheModeNew for every other value (including empty).
func (that *CacheDb) getEffectiveMode() string {
	if that.isCompatibleMode() {
		return CacheModeCompatible
	}
	return CacheModeNew
}

// defaultTimeout returns the configured TimeOut in seconds, falling back to
// DefaultCacheTimeout when TimeOut is 0.
func (that *CacheDb) defaultTimeout() int64 {
	if that.TimeOut == 0 {
		return DefaultCacheTimeout
	}
	return that.TimeOut
}

// initDbTable prepares the cache tables once per CacheDb instance.
//
// It creates the main table when missing, migrates legacy rows in new mode,
// and creates the history table when history recording is enabled.
func (that *CacheDb) initDbTable() {
	if that.isInit {
		return
	}

	dbType := that.Db.GetType()
	tableName := that.getTableName()
	historyTableName := that.getHistoryTableName()
	legacyTableName := that.getLegacyTableName()

	// Create the main table when it does not exist yet.
	if !that.tableExists(tableName) {
		that.createMainTable(dbType, tableName)
	}

	// Legacy table handling depends on the mode:
	//   new mode:        copy legacy rows into the new table (the legacy table
	//                    is kept and must be dropped manually);
	//   compatible mode: nothing is migrated, the legacy table stays readable.
	if that.getEffectiveMode() == CacheModeNew && that.tableExists(legacyTableName) {
		that.migrateFromCached(dbType, legacyTableName, tableName)
	}

	// Create the history table only when history recording is enabled.
	if that.HistorySet && !that.tableExists(historyTableName) {
		that.createHistoryTable(dbType, historyTableName)
	}

	that.isInit = true
}

// tableExists reports whether tableName exists, using the catalog query that
// matches the database type. Unsupported database types always yield false.
//
// tableName is concatenated into the SQL text, so it must come from trusted
// configuration (in this file it is always prefix + a constant).
func (that *CacheDb) tableExists(tableName string) bool {
	dbType := that.Db.GetType()

	switch dbType {
	case "mysql":
		dbNames := that.Db.Query("SELECT DATABASE()")
		if len(dbNames) == 0 {
			return false
		}
		dbName := dbNames[0].GetString("DATABASE()")
		res := that.Db.Query("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='" + dbName + "' AND TABLE_NAME='" + tableName + "'")
		return len(res) != 0

	case "sqlite":
		res := that.Db.Query(`SELECT name FROM sqlite_master WHERE type='table' AND name='` + tableName + `'`)
		return len(res) != 0

	case "postgres":
		res := that.Db.Query(`SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename='` + tableName + `'`)
		return len(res) != 0
	}

	return false
}

// createMainTable creates the main cache table for the given database type.
// For postgres the end_time index is created with a separate statement.
// Unsupported database types are skipped instead of executing an empty statement.
func (that *CacheDb) createMainTable(dbType, tableName string) {
	var createSQL, indexSQL string

	switch dbType {
	case "mysql":
		createSQL = "CREATE TABLE `" + tableName + "` (" +
			"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
			"`key` varchar(64) NOT NULL COMMENT '缓存键'," +
			"`value` text DEFAULT NULL COMMENT '缓存值'," +
			"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
			"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
			"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
			"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
			"PRIMARY KEY (`id`)," +
			"UNIQUE KEY `uk_key` (`key`)," +
			"KEY `idx_end_time` (`end_time`)" +
			") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存管理'"

	case "sqlite":
		createSQL = `CREATE TABLE "` + tableName + `" ( ` +
			`"id" INTEGER PRIMARY KEY AUTOINCREMENT, ` +
			`"key" TEXT NOT NULL UNIQUE, ` +
			`"value" TEXT, ` +
			`"end_time" TEXT, ` +
			`"state" INTEGER DEFAULT 0, ` +
			`"create_time" TEXT, ` +
			`"modify_time" TEXT )`

	case "postgres":
		createSQL = `CREATE TABLE "` + tableName + `" ( ` +
			`"id" SERIAL PRIMARY KEY, ` +
			`"key" VARCHAR(64) NOT NULL UNIQUE, ` +
			`"value" TEXT, ` +
			`"end_time" TIMESTAMP, ` +
			`"state" INTEGER DEFAULT 0, ` +
			`"create_time" TIMESTAMP, ` +
			`"modify_time" TIMESTAMP )`
		indexSQL = `CREATE INDEX "idx_` + tableName + `_end_time" ON "` + tableName + `" ("end_time")`

	default:
		// No DDL is defined for this database type.
		return
	}

	that.Db.Exec(createSQL)
	if indexSQL != "" {
		that.Db.Exec(indexSQL)
	}
}

// createHistoryTable creates the cache history table for the given database type.
// For postgres the hotime_cache_id index is created with a separate statement.
// Unsupported database types are skipped instead of executing an empty statement.
func (that *CacheDb) createHistoryTable(dbType, tableName string) {
	var createSQL, indexSQL string

	switch dbType {
	case "mysql":
		createSQL = "CREATE TABLE `" + tableName + "` (" +
			"`id` int(11) unsigned NOT NULL AUTO_INCREMENT," +
			"`hotime_cache_id` int(11) unsigned DEFAULT NULL COMMENT '缓存ID'," +
			"`key` varchar(64) DEFAULT NULL COMMENT '缓存键'," +
			"`value` text DEFAULT NULL COMMENT '缓存值'," +
			"`end_time` datetime DEFAULT NULL COMMENT '过期时间'," +
			"`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," +
			"`create_time` datetime DEFAULT NULL COMMENT '创建日期'," +
			"`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," +
			"PRIMARY KEY (`id`)," +
			"KEY `idx_hotime_cache_id` (`hotime_cache_id`)" +
			") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存历史'"

	case "sqlite":
		createSQL = `CREATE TABLE "` + tableName + `" ( ` +
			`"id" INTEGER PRIMARY KEY AUTOINCREMENT, ` +
			`"hotime_cache_id" INTEGER, ` +
			`"key" TEXT, ` +
			`"value" TEXT, ` +
			`"end_time" TEXT, ` +
			`"state" INTEGER DEFAULT 0, ` +
			`"create_time" TEXT, ` +
			`"modify_time" TEXT )`

	case "postgres":
		createSQL = `CREATE TABLE "` + tableName + `" ( ` +
			`"id" SERIAL PRIMARY KEY, ` +
			`"hotime_cache_id" INTEGER, ` +
			`"key" VARCHAR(64), ` +
			`"value" TEXT, ` +
			`"end_time" TIMESTAMP, ` +
			`"state" INTEGER DEFAULT 0, ` +
			`"create_time" TIMESTAMP, ` +
			`"modify_time" TIMESTAMP )`
		indexSQL = `CREATE INDEX "idx_` + tableName + `_cache_id" ON "` + tableName + `" ("hotime_cache_id")`

	default:
		// No DDL is defined for this database type.
		return
	}

	that.Db.Exec(createSQL)
	if indexSQL != "" {
		that.Db.Exec(indexSQL)
	}
}

// migrateFromCached copies rows from the legacy table into the new table.
//
// For each key only the row with the largest id is copied, and keys already
// present in the new table are left untouched. The legacy columns endtime and
// time are converted as unix seconds and nanoseconds respectively. The legacy
// table is never dropped here; removing it is left to an operator.
// Unsupported database types are skipped instead of executing an empty statement.
func (that *CacheDb) migrateFromCached(dbType, oldTableName, newTableName string) {
	var migrateSQL string

	switch dbType {
	case "mysql":
		// INSERT IGNORE skips keys that already exist in the new table.
		migrateSQL = "INSERT IGNORE INTO `" + newTableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
			"SELECT c.`key`, c.`value`, FROM_UNIXTIME(c.`endtime`), 0, " +
			"FROM_UNIXTIME(c.`time` / 1000000000), FROM_UNIXTIME(c.`time` / 1000000000) " +
			"FROM `" + oldTableName + "` c " +
			"INNER JOIN (SELECT `key`, MAX(id) as max_id FROM `" + oldTableName + "` GROUP BY `key`) m " +
			"ON c.id = m.max_id"

	case "sqlite":
		migrateSQL = `INSERT OR IGNORE INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
			`SELECT c."key", c."value", datetime(c."endtime", 'unixepoch'), 0, ` +
			`datetime(c."time" / 1000000000, 'unixepoch'), datetime(c."time" / 1000000000, 'unixepoch') ` +
			`FROM "` + oldTableName + `" c ` +
			`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
			`ON c.id = m.max_id`

	case "postgres":
		migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
			`SELECT c."key", c."value", to_timestamp(c."endtime"), 0, ` +
			`to_timestamp(c."time" / 1000000000), to_timestamp(c."time" / 1000000000) ` +
			`FROM "` + oldTableName + `" c ` +
			`INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` +
			`ON c.id = m.max_id ` +
			`ON CONFLICT ("key") DO NOTHING`

	default:
		// No migration statement is defined for this database type.
		return
	}

	that.Db.Exec(migrateSQL)
}

// writeHistory copies the current main-table row for key into the history
// table. It does nothing when history recording is disabled or the key has no row.
func (that *CacheDb) writeHistory(key string) {
	if !that.HistorySet {
		return
	}

	tableName := that.getTableName()
	historyTableName := that.getHistoryTableName()

	// Read back the row that was just written.
	cached := that.Db.Get(tableName, "*", Map{"key": key})
	if cached == nil {
		return
	}

	historyData := Map{
		"hotime_cache_id": cached.GetInt64("id"),
		"key":             cached.GetString("key"),
		"value":           cached.GetString("value"),
		"end_time":        cached.GetString("end_time"),
		"state":           cached.GetInt("state"),
		"create_time":     cached.GetString("create_time"),
		"modify_time":     cached.GetString("modify_time"),
	}

	that.Db.Insert(historyTableName, historyData)
}

// getLegacy reads key from the legacy table (used in compatible mode).
//
// Legacy columns: key, value, endtime (unix seconds), time (nanosecond
// timestamp). It returns nil when the legacy table or the row is missing, the
// row has expired (the expired row is deleted), or the value is empty or not
// valid JSON. A legacy value wrapped as {"data": x} is unwrapped to x.
func (that *CacheDb) getLegacy(key string) interface{} {
	legacyTableName := that.getLegacyTableName()

	if !that.tableExists(legacyTableName) {
		return nil
	}

	cached := that.Db.Get(legacyTableName, "*", Map{"key": key})
	if cached == nil {
		return nil
	}

	// The legacy table stores expiry as a unix timestamp; 0 or less means "never".
	endTime := cached.GetInt64("endtime")
	if endTime > 0 && endTime <= time.Now().Unix() {
		that.deleteLegacy(key)
		return nil
	}

	valueStr := cached.GetString("value")
	if valueStr == "" {
		return nil
	}

	var data interface{}
	if err := json.Unmarshal([]byte(valueStr), &data); err != nil {
		return nil
	}

	// Unwrap the legacy {"data": x} envelope when present.
	if dataMap, ok := data.(map[string]interface{}); ok {
		if innerData, exists := dataMap["data"]; exists {
			return innerData
		}
	}

	return data
}

// deleteByKey deletes key from tableName. When key contains '*', everything
// from the first '*' onward is dropped and all keys matching the remaining
// prefix (via the "[~]" condition with a trailing "%") are deleted.
func (that *CacheDb) deleteByKey(tableName, key string) {
	idx := strings.Index(key, "*")
	if idx == -1 {
		that.Db.Delete(tableName, Map{"key": key})
		return
	}
	// strings.Index reports a byte offset, so the prefix is taken by slicing
	// bytes; this stays correct for keys containing multi-byte characters.
	that.Db.Delete(tableName, Map{"key[~]": key[:idx] + "%"})
}

// deleteLegacy removes key (or a wildcard prefix) from the legacy table when
// that table exists (used in compatible mode).
func (that *CacheDb) deleteLegacy(key string) {
	legacyTableName := that.getLegacyTableName()

	if !that.tableExists(legacyTableName) {
		return
	}

	that.deleteByKey(legacyTableName, key)
}

// get returns the cached value for key, or nil when there is none.
//
// Expired rows are not deleted here (lazy deletion); they are removed by the
// random cleanup in set. In compatible mode a miss, an expired row, or an
// empty or undecodable value falls back to the legacy table.
func (that *CacheDb) get(key string) interface{} {
	cached := that.Db.Get(that.getTableName(), "*", Map{"key": key})

	if cached != nil {
		// Expiry is decided by string comparison; this assumes the stored
		// end_time and Time2Str share one lexicographically sortable format.
		endTime := cached.GetString("end_time")
		expired := endTime != "" && endTime <= Time2Str(time.Now())

		if !expired {
			// The value is stored as plain JSON, without a {"data": ...} envelope.
			if valueStr := cached.GetString("value"); valueStr != "" {
				var data interface{}
				if err := json.Unmarshal([]byte(valueStr), &data); err == nil {
					return data
				}
			}
		}
	}

	if that.isCompatibleMode() {
		return that.getLegacy(key)
	}
	return nil
}

// set stores value under key with the given expiry time.
//
// The row is written with the database's native upsert where one is known
// (mysql, sqlite, postgres) and with update-then-insert otherwise. Afterwards
// it records history (when enabled), removes the same key from the legacy
// table in compatible mode, and occasionally purges expired rows.
func (that *CacheDb) set(key string, value interface{}, endTime time.Time) {
	// The value is serialized directly, without an envelope. A marshal failure
	// is deliberately not treated as fatal: an empty value is stored instead,
	// which readers treat as a miss.
	bte, _ := json.Marshal(value)
	nowTime := Time2Str(time.Now())
	endTimeStr := Time2Str(endTime)

	dbType := that.Db.GetType()
	tableName := that.getTableName()

	// Upsert in a single statement so concurrent writers cannot collide.
	switch dbType {
	case "mysql":
		upsertSQL := "INSERT INTO `" + tableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " +
			"VALUES (?, ?, ?, 0, ?, ?) " +
			"ON DUPLICATE KEY UPDATE `value`=VALUES(`value`), `end_time`=VALUES(`end_time`), `modify_time`=VALUES(`modify_time`)"
		that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)

	case "sqlite":
		// INSERT OR REPLACE would delete and re-insert the row, so the
		// ON CONFLICT ... DO UPDATE form is used instead.
		upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
			`VALUES (?, ?, ?, 0, ?, ?) ` +
			`ON CONFLICT("key") DO UPDATE SET "value"=excluded."value", "end_time"=excluded."end_time", "modify_time"=excluded."modify_time"`
		that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)

	case "postgres":
		upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` +
			`VALUES ($1, $2, $3, 0, $4, $5) ` +
			`ON CONFLICT ("key") DO UPDATE SET "value"=EXCLUDED."value", "end_time"=EXCLUDED."end_time", "modify_time"=EXCLUDED."modify_time"`
		that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime)

	default:
		// Other databases: update first, insert when no row was updated.
		num := that.Db.Update(tableName, Map{
			"value":       string(bte),
			"end_time":    endTimeStr,
			"modify_time": nowTime,
		}, Map{"key": key})
		if num == 0 {
			that.Db.Insert(tableName, Map{
				"key":         key,
				"value":       string(bte),
				"end_time":    endTimeStr,
				"state":       0,
				"create_time": nowTime,
				"modify_time": nowTime,
			})
		}
	}

	that.writeHistory(key)

	// Compatible mode: drop the same key from the legacy table so stale legacy
	// data dies out faster.
	if that.isCompatibleMode() {
		that.deleteLegacy(key)
	}

	// Occasionally purge expired rows. The original author's note says 5%;
	// the exact probability depends on Rand, which is defined outside this file.
	if Rand(1000) > 950 {
		nowTimeStr := Time2Str(time.Now())
		that.Db.Delete(tableName, Map{"end_time[<]": nowTimeStr})
	}
}

// delete removes key (or, when key contains '*', every key sharing the prefix
// before the first '*') from the main table, and in compatible mode from the
// legacy table as well so a later read cannot fall back to stale legacy data.
func (that *CacheDb) delete(key string) {
	that.deleteByKey(that.getTableName(), key)

	if that.isCompatibleMode() {
		that.deleteLegacy(key)
	}
}

// Cache is the single entry point for reading, writing and deleting a key.
//
// Usage:
//   - Cache(key)                 - read; the value (or nil) is in the returned Obj's Data
//   - Cache(key, value)          - write with the default timeout
//   - Cache(key, value, timeout) - write with timeout in seconds; a timeout that
//     is not a positive integer falls back to the default
//   - Cache(key, nil)            - delete
//
// The default timeout is TimeOut, or DefaultCacheTimeout when TimeOut is 0.
// Writes and deletes return an Obj whose Data is nil.
func (that *CacheDb) Cache(key string, data ...interface{}) *Obj {
	that.initDbTable()

	// Read.
	if len(data) == 0 {
		return &Obj{Data: that.get(key)}
	}

	// Delete.
	if len(data) == 1 && data[0] == nil {
		that.delete(key)
		return &Obj{Data: nil}
	}

	// Write: an explicit positive timeout overrides the default.
	timeout := that.defaultTimeout()
	if len(data) >= 2 {
		var err Error
		if custom := ObjToInt64(data[1], &err); err.GetError() == nil && custom > 0 {
			timeout = custom
		}
	}

	endTime := time.Now().Add(time.Duration(timeout) * time.Second)
	that.set(key, data[0], endTime)
	return &Obj{Data: nil}
}

// CachesGet reads several keys with one IN query.
//
// The returned Map is keyed by cache key; keys that are missing, expired, or
// hold an empty or undecodable value are left out. In compatible mode every
// key still absent from the result is looked up in the legacy table.
func (that *CacheDb) CachesGet(keys []string) Map {
	that.initDbTable()
	result := make(Map, len(keys))
	if len(keys) == 0 {
		return result
	}

	// #region agent log
	debugLogDb("E", "cache_db.go:CachesGet:start", "CacheDb.CachesGet开始", map[string]interface{}{
		"keys_count": len(keys),
		"keys":       keys,
	})
	// #endregion

	tableName := that.getTableName()
	nowTime := Time2Str(time.Now())

	// Fetch all unexpired rows for the requested keys in one query.
	cachedList := that.Db.Select(tableName, "*", Map{
		"key":         keys,
		"end_time[>]": nowTime,
	})

	// #region agent log
	debugLogDb("E", "cache_db.go:CachesGet:afterSelect", "DB Select完成", map[string]interface{}{
		"table":      tableName,
		"now_time":   nowTime,
		"found_rows": len(cachedList),
	})
	// #endregion

	for _, cached := range cachedList {
		valueStr := cached.GetString("value")
		if valueStr == "" {
			continue
		}
		var data interface{}
		if err := json.Unmarshal([]byte(valueStr), &data); err != nil {
			// #region agent log
			debugLogDb("E", "cache_db.go:CachesGet:unmarshalError", "JSON解析失败", map[string]interface{}{
				"key":   cached.GetString("key"),
				"error": err.Error(),
			})
			// #endregion
			continue
		}
		result[cached.GetString("key")] = data
	}

	// Compatible mode: fall back to the legacy table for keys not found above.
	if that.isCompatibleMode() {
		// #region agent log
		debugLogDb("E", "cache_db.go:CachesGet:compatMode", "兼容模式检查老表", nil)
		// #endregion
		for _, key := range keys {
			if _, exists := result[key]; exists {
				continue
			}
			if legacyData := that.getLegacy(key); legacyData != nil {
				result[key] = legacyData
			}
		}
	}

	// #region agent log
	debugLogDb("E", "cache_db.go:CachesGet:end", "CacheDb.CachesGet完成", map[string]interface{}{
		"result_count": len(result),
	})
	// #endregion

	return result
}

// CachesSet writes several keys that all share one expiry time.
//
// data maps cache key to value. timeout is an optional lifetime in seconds;
// when omitted or not positive, TimeOut is used (DefaultCacheTimeout when
// TimeOut is 0). Keys are written one at a time through set, so each write
// also gets its history record and legacy cleanup.
func (that *CacheDb) CachesSet(data Map, timeout ...int64) {
	that.initDbTable()
	if len(data) == 0 {
		return
	}

	// #region agent log
	debugLogDb("A", "cache_db.go:CachesSet:start", "CacheDb.CachesSet开始", map[string]interface{}{
		"data_count": len(data),
	})
	// #endregion

	tim := that.defaultTimeout()
	if len(timeout) > 0 && timeout[0] > 0 {
		tim = timeout[0]
	}
	endTime := time.Now().Add(time.Duration(tim) * time.Second)

	for key, value := range data {
		that.set(key, value, endTime)
	}

	// #region agent log
	debugLogDb("A", "cache_db.go:CachesSet:end", "CacheDb.CachesSet完成", map[string]interface{}{
		"data_count": len(data),
		"end_time":   Time2Str(endTime),
	})
	// #endregion
}

// CachesDelete deletes several keys with one IN condition. In compatible mode
// the same keys are also deleted from the legacy table when that table exists.
func (that *CacheDb) CachesDelete(keys []string) {
	that.initDbTable()
	if len(keys) == 0 {
		return
	}

	that.Db.Delete(that.getTableName(), Map{"key": keys})

	if that.isCompatibleMode() {
		legacyTableName := that.getLegacyTableName()
		if that.tableExists(legacyTableName) {
			that.Db.Delete(legacyTableName, Map{"key": keys})
		}
	}
}