package cache import ( "database/sql" "encoding/json" "fmt" "os" "strings" "time" . "code.hoteas.com/golang/hotime/common" ) // #region agent log func debugLog(hypothesisId, location, message string, data map[string]interface{}) { logEntry := fmt.Sprintf(`{"hypothesisId":"%s","location":"%s","message":"%s","data":%s,"timestamp":%d,"sessionId":"debug-session"}`, hypothesisId, location, message, toJSON(data), time.Now().UnixMilli()) f, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if f != nil { f.WriteString(logEntry + "\n") f.Close() } } func toJSON(data map[string]interface{}) string { b, _ := json.Marshal(data) return string(b) } // #endregion // 表名常量 const ( CacheTableName = "hotime_cache" CacheHistoryTableName = "hotime_cache_history" DefaultCacheTimeout = 24 * 60 * 60 // 默认过期时间 24 小时 ) type HoTimeDBInterface interface { GetPrefix() string Query(query string, args ...interface{}) []Map Exec(query string, args ...interface{}) (sql.Result, *Error) Get(table string, qu ...interface{}) Map Select(table string, qu ...interface{}) []Map Delete(table string, data map[string]interface{}) int64 Update(table string, data Map, where Map) int64 Insert(table string, data map[string]interface{}) int64 GetType() string } type CacheDb struct { TimeOut int64 DbSet bool SessionSet bool HistorySet bool // 是否开启历史记录 Db HoTimeDBInterface *Error ContextBase isInit bool } func (that *CacheDb) GetError() *Error { return that.Error } func (that *CacheDb) SetError(err *Error) { that.Error = err } // getTableName 获取带前缀的表名 func (that *CacheDb) getTableName() string { return that.Db.GetPrefix() + CacheTableName } // getHistoryTableName 获取带前缀的历史表名 func (that *CacheDb) getHistoryTableName() string { return that.Db.GetPrefix() + CacheHistoryTableName } // initDbTable 初始化数据库表 func (that *CacheDb) initDbTable() { if that.isInit { return } dbType := that.Db.GetType() tableName := that.getTableName() historyTableName := that.getHistoryTableName() // #region agent log debugLog("F", "cache_db.go:initDbTable", "initDbTable started", map[string]interface{}{ "dbType": dbType, "tableName": tableName, "historyTableName": historyTableName, "HistorySet": that.HistorySet, }) // #endregion // 检查并创建主表 if !that.tableExists(tableName) { that.createMainTable(dbType, tableName) } // 检查并迁移旧 cached 表 oldTableName := that.Db.GetPrefix() + "cached" if that.tableExists(oldTableName) { that.migrateFromCached(dbType, oldTableName, tableName) } // 检查并创建历史表(开启历史记录时) historyTableExists := that.tableExists(historyTableName) // #region agent log debugLog("F", "cache_db.go:initDbTable", "history table check", map[string]interface{}{ "HistorySet": that.HistorySet, "historyTableName": historyTableName, "exists": historyTableExists, "shouldCreate": that.HistorySet && !historyTableExists, }) // #endregion if that.HistorySet && !historyTableExists { that.createHistoryTable(dbType, historyTableName) // #region agent log debugLog("F", "cache_db.go:initDbTable", "createHistoryTable called", map[string]interface{}{ "dbType": dbType, "historyTableName": historyTableName, }) // #endregion } that.isInit = true } // tableExists 检查表是否存在 func (that *CacheDb) tableExists(tableName string) bool { dbType := that.Db.GetType() switch dbType { case "mysql": dbNames := that.Db.Query("SELECT DATABASE()") if len(dbNames) == 0 { return false } dbName := dbNames[0].GetString("DATABASE()") res := that.Db.Query("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='" + dbName + "' AND TABLE_NAME='" + tableName + "'") return len(res) != 0 case "sqlite": res := that.Db.Query(`SELECT name FROM sqlite_master WHERE type='table' AND name='` + tableName + `'`) return len(res) != 0 case "postgres": res := that.Db.Query(`SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename='` + tableName + `'`) return len(res) != 0 } return false } // createMainTable 创建主表 func (that *CacheDb) createMainTable(dbType, tableName string) { var createSQL string switch dbType { case "mysql": createSQL = "CREATE TABLE `" + tableName + "` (" + "`id` int(11) unsigned NOT NULL AUTO_INCREMENT," + "`key` varchar(64) NOT NULL COMMENT '缓存键'," + "`value` text DEFAULT NULL COMMENT '缓存值'," + "`end_time` datetime DEFAULT NULL COMMENT '过期时间'," + "`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," + "`create_time` datetime DEFAULT NULL COMMENT '创建日期'," + "`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," + "PRIMARY KEY (`id`)," + "UNIQUE KEY `uk_key` (`key`)," + "KEY `idx_end_time` (`end_time`)" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存管理'" case "sqlite": createSQL = `CREATE TABLE "` + tableName + `" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "key" TEXT NOT NULL UNIQUE, "value" TEXT, "end_time" TEXT, "state" INTEGER DEFAULT 0, "create_time" TEXT, "modify_time" TEXT )` case "postgres": createSQL = `CREATE TABLE "` + tableName + `" ( "id" SERIAL PRIMARY KEY, "key" VARCHAR(64) NOT NULL UNIQUE, "value" TEXT, "end_time" TIMESTAMP, "state" INTEGER DEFAULT 0, "create_time" TIMESTAMP, "modify_time" TIMESTAMP )` that.Db.Exec(createSQL) // 创建索引 that.Db.Exec(`CREATE INDEX "idx_` + tableName + `_end_time" ON "` + tableName + `" ("end_time")`) return } that.Db.Exec(createSQL) } // createHistoryTable 创建历史表 func (that *CacheDb) createHistoryTable(dbType, tableName string) { var createSQL string switch dbType { case "mysql": createSQL = "CREATE TABLE `" + tableName + "` (" + "`id` int(11) unsigned NOT NULL AUTO_INCREMENT," + "`hotime_cache_id` int(11) unsigned DEFAULT NULL COMMENT '缓存ID'," + "`key` varchar(64) DEFAULT NULL COMMENT '缓存键'," + "`value` text DEFAULT NULL COMMENT '缓存值'," + "`end_time` datetime DEFAULT NULL COMMENT '过期时间'," + "`state` int(2) DEFAULT '0' COMMENT '状态:0-正常,1-异常,2-隐藏'," + "`create_time` datetime DEFAULT NULL COMMENT '创建日期'," + "`modify_time` datetime DEFAULT NULL COMMENT '变更时间'," + "PRIMARY KEY (`id`)," + "KEY `idx_hotime_cache_id` (`hotime_cache_id`)" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='缓存历史'" case "sqlite": createSQL = `CREATE TABLE "` + tableName + `" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "hotime_cache_id" INTEGER, "key" TEXT, "value" TEXT, "end_time" TEXT, "state" INTEGER DEFAULT 0, "create_time" TEXT, "modify_time" TEXT )` case "postgres": createSQL = `CREATE TABLE "` + tableName + `" ( "id" SERIAL PRIMARY KEY, "hotime_cache_id" INTEGER, "key" VARCHAR(64), "value" TEXT, "end_time" TIMESTAMP, "state" INTEGER DEFAULT 0, "create_time" TIMESTAMP, "modify_time" TIMESTAMP )` that.Db.Exec(createSQL) // 创建索引 that.Db.Exec(`CREATE INDEX "idx_` + tableName + `_cache_id" ON "` + tableName + `" ("hotime_cache_id")`) return } that.Db.Exec(createSQL) } // migrateFromCached 从旧 cached 表迁移数据 func (that *CacheDb) migrateFromCached(dbType, oldTableName, newTableName string) { var migrateSQL string switch dbType { case "mysql": // 去重迁移:取每个 key 的最后一条记录(id 最大) migrateSQL = "INSERT INTO `" + newTableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " + "SELECT c.`key`, c.`value`, FROM_UNIXTIME(c.`endtime`), 0, " + "FROM_UNIXTIME(c.`time` / 1000000000), FROM_UNIXTIME(c.`time` / 1000000000) " + "FROM `" + oldTableName + "` c " + "INNER JOIN (SELECT `key`, MAX(id) as max_id FROM `" + oldTableName + "` GROUP BY `key`) m " + "ON c.id = m.max_id" case "sqlite": migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` + `SELECT c."key", c."value", datetime(c."endtime", 'unixepoch'), 0, ` + `datetime(c."time" / 1000000000, 'unixepoch'), datetime(c."time" / 1000000000, 'unixepoch') ` + `FROM "` + oldTableName + `" c ` + `INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` + `ON c.id = m.max_id` case "postgres": migrateSQL = `INSERT INTO "` + newTableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` + `SELECT c."key", c."value", to_timestamp(c."endtime"), 0, ` + `to_timestamp(c."time" / 1000000000), to_timestamp(c."time" / 1000000000) ` + `FROM "` + oldTableName + `" c ` + `INNER JOIN (SELECT "key", MAX(id) as max_id FROM "` + oldTableName + `" GROUP BY "key") m ` + `ON c.id = m.max_id` } // 执行迁移 _, err := that.Db.Exec(migrateSQL) if err.GetError() == nil { // 迁移成功,删除旧表 var dropSQL string switch dbType { case "mysql": dropSQL = "DROP TABLE `" + oldTableName + "`" case "sqlite", "postgres": dropSQL = `DROP TABLE "` + oldTableName + `"` } that.Db.Exec(dropSQL) } } // writeHistory 写入历史记录 func (that *CacheDb) writeHistory(key string) { if !that.HistorySet { return } tableName := that.getTableName() historyTableName := that.getHistoryTableName() // 查询当前数据 cached := that.Db.Get(tableName, "*", Map{"key": key}) if cached == nil { return } // 构建历史记录数据 historyData := Map{ "hotime_cache_id": cached.GetInt64("id"), "key": cached.GetString("key"), "value": cached.GetString("value"), "end_time": cached.GetString("end_time"), "state": cached.GetInt("state"), "create_time": cached.GetString("create_time"), "modify_time": cached.GetString("modify_time"), } // 插入历史表 that.Db.Insert(historyTableName, historyData) // #region agent log logFile3, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if logFile3 != nil { fmt.Fprintf(logFile3, `{"hypothesisId":"C","location":"cache_db.go:writeHistory","message":"history written","data":{"key":"%s","cacheId":%d},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, cached.GetInt64("id"), time.Now().UnixMilli()) logFile3.Close() } // #endregion } // get 获取缓存 func (that *CacheDb) get(key string) interface{} { tableName := that.getTableName() cached := that.Db.Get(tableName, "*", Map{"key": key}) // #region agent log logFile4, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if logFile4 != nil { found := cached != nil fmt.Fprintf(logFile4, `{"hypothesisId":"D","location":"cache_db.go:get","message":"get query","data":{"key":"%s","found":%t},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, found, time.Now().UnixMilli()) logFile4.Close() } // #endregion if cached == nil { return nil } // 使用字符串比较判断过期(ISO 格式天然支持) endTime := cached.GetString("end_time") nowTime := Time2Str(time.Now()) if endTime != "" && endTime <= nowTime { // 惰性删除:过期只返回 nil,不立即删除 // 依赖随机清理批量删除过期数据 return nil } // 直接解析 value,不再需要 {"data": value} 包装 valueStr := cached.GetString("value") if valueStr == "" { return nil } var data interface{} err := json.Unmarshal([]byte(valueStr), &data) if err != nil { return nil } // #region agent log logFile5, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if logFile5 != nil { fmt.Fprintf(logFile5, `{"hypothesisId":"D","location":"cache_db.go:get","message":"get success","data":{"key":"%s","hasData":true},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, time.Now().UnixMilli()) logFile5.Close() } // #endregion return data } // set 设置缓存 func (that *CacheDb) set(key string, value interface{}, endTime time.Time) { // 直接序列化 value,不再包装 bte, _ := json.Marshal(value) nowTime := Time2Str(time.Now()) endTimeStr := Time2Str(endTime) dbType := that.Db.GetType() tableName := that.getTableName() // 使用 UPSERT 语法解决并发问题 switch dbType { case "mysql": upsertSQL := "INSERT INTO `" + tableName + "` (`key`, `value`, `end_time`, `state`, `create_time`, `modify_time`) " + "VALUES (?, ?, ?, 0, ?, ?) " + "ON DUPLICATE KEY UPDATE `value`=VALUES(`value`), `end_time`=VALUES(`end_time`), `modify_time`=VALUES(`modify_time`)" that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime) case "sqlite": // SQLite: INSERT OR REPLACE 会删除后插入,所以用 UPSERT 语法 upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` + `VALUES (?, ?, ?, 0, ?, ?) ` + `ON CONFLICT("key") DO UPDATE SET "value"=excluded."value", "end_time"=excluded."end_time", "modify_time"=excluded."modify_time"` that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime) case "postgres": upsertSQL := `INSERT INTO "` + tableName + `" ("key", "value", "end_time", "state", "create_time", "modify_time") ` + `VALUES ($1, $2, $3, 0, $4, $5) ` + `ON CONFLICT ("key") DO UPDATE SET "value"=EXCLUDED."value", "end_time"=EXCLUDED."end_time", "modify_time"=EXCLUDED."modify_time"` that.Db.Exec(upsertSQL, key, string(bte), endTimeStr, nowTime, nowTime) default: // 兼容其他数据库:使用 Update + Insert num := that.Db.Update(tableName, Map{ "value": string(bte), "end_time": endTimeStr, "modify_time": nowTime, }, Map{"key": key}) if num == 0 { that.Db.Insert(tableName, Map{ "key": key, "value": string(bte), "end_time": endTimeStr, "state": 0, "create_time": nowTime, "modify_time": nowTime, }) } } // #region agent log logFile2, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if logFile2 != nil { fmt.Fprintf(logFile2, `{"hypothesisId":"B","location":"cache_db.go:set","message":"set completed","data":{"key":"%s","endTime":"%s","dbType":"%s"},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, endTimeStr, dbType, time.Now().UnixMilli()) logFile2.Close() } // #endregion // 写入历史记录 that.writeHistory(key) // 随机执行删除过期数据命令(5% 概率) if Rand(1000) > 950 { nowTimeStr := Time2Str(time.Now()) that.Db.Delete(tableName, Map{"end_time[<]": nowTimeStr}) } } // delete 删除缓存 func (that *CacheDb) delete(key string) { tableName := that.getTableName() del := strings.Index(key, "*") // 如果通配删除 if del != -1 { key = Substr(key, 0, del) that.Db.Delete(tableName, Map{"key[~]": key + "%"}) } else { that.Db.Delete(tableName, Map{"key": key}) } } // Cache 缓存操作入口 // 用法: // - Cache(key) - 获取缓存 // - Cache(key, value) - 设置缓存(使用默认过期时间) // - Cache(key, value, timeout) - 设置缓存(指定过期时间,单位:秒) // - Cache(key, nil) - 删除缓存 func (that *CacheDb) Cache(key string, data ...interface{}) *Obj { that.initDbTable() // #region agent log logFile, _ := os.OpenFile(`d:\work\hotimev1.5\.cursor\debug.log`, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if logFile != nil { op := "get" if len(data) == 1 && data[0] == nil { op = "delete" } else if len(data) >= 1 { op = "set" } fmt.Fprintf(logFile, `{"hypothesisId":"A","location":"cache_db.go:Cache","message":"Cache called","data":{"key":"%s","operation":"%s","dataLen":%d},"timestamp":%d,"sessionId":"debug-session"}`+"\n", key, op, len(data), time.Now().UnixMilli()) logFile.Close() } // #endregion // 获取缓存 if len(data) == 0 { return &Obj{Data: that.get(key)} } // 删除缓存 if len(data) == 1 && data[0] == nil { that.delete(key) return &Obj{Data: nil} } // 计算过期时间 var timeout int64 if len(data) == 1 { // 使用配置的 TimeOut,如果为 0 则使用默认值 timeout = that.TimeOut if timeout == 0 { timeout = DefaultCacheTimeout } } else if len(data) >= 2 { // 使用指定的超时时间 that.SetError(nil) tempTimeout := ObjToInt64(data[1], that.Error) if that.GetError() == nil && tempTimeout > 0 { timeout = tempTimeout } else { timeout = that.TimeOut if timeout == 0 { timeout = DefaultCacheTimeout } } } endTime := time.Now().Add(time.Duration(timeout) * time.Second) that.set(key, data[0], endTime) return &Obj{Data: nil} }