本文改造的版本为 goInception V1.3.0 版本。
改造核心逻辑可概括为 “扩展字段 + 多源解析 + 兼容原有”:
扩展执行计划存储结构体 ,承接 OB 专属的 Query Plan 字段;
优化解析流程 ,从文本格式的 Query Plan 中提取预估行数;
保持原有逻辑不变 ,通过条件判断实现多数据库兼容。
goInception 的 sql 执行计划 explain 处理在 session/session_inception.go 文件中。
1、扩展 ExplainInfo 结构体:存储 OB 专属执行计划
首先在 session/common.go 中扩展执行计划存储结构体 ,新增 ObPlan 字段专门承接 OB 的 Query Plan 文本:
type ExplainInfo struct { SelectType string `gorm:"Column:select_type"` Table string `gorm:"Column:table"` Partitions string `gorm:"Column:partitions"` Type string `gorm:"Column:type"` PossibleKeys string `gorm:"Column:possible_keys"` Key string `gorm:"Column:key"` KeyLen string `gorm:"Column:key_len"` Ref string `gorm:"Column:ref"` Rows int64 `gorm:"Column:rows"` Filtered float32 `gorm:"Column:filtered"` Extra string `gorm:"Column:Extra"`
Count string `gorm:"Column:count"` EstRows string `gorm:"Column:estRows"` ObPlan sql.NullString `gorm:"Column:Query Plan"`}
采用 sql.NullString 类型可兼容 Query Plan 为 NULL 的场景 ,避免空指针异常 ,避免影响原有mysql ,tidb审核逻辑。
2、优化 getExplainInfo 函数:调度多源解析逻辑
在 session/session_inception.go 中优化核心解析函数 ,实现 “MySQL/TiDB 原有逻辑 + OB专属逻辑” 的分支处理:
func (s *session) getExplainInfo(sql string, sqlId string) { if s.hasError() { return }
var newRecord *Record if s.inc.EnableFingerprint && sqlId != "" { newRecord = &Record{ Buf: new(bytes.Buffer), } } r := s.myRecord
var rows []ExplainInfo if err := s.rawScan(sql, &rows); err != nil { if myErr, ok := err.(*mysqlDriver.MySQLError); ok { s.appendErrorMessage(myErr.Message) if newRecord != nil { newRecord.appendErrorMessage(myErr.Message) } } else { s.appendErrorMessage(err.Error()) if newRecord != nil { newRecord.appendErrorMessage(err.Error()) } } }
if len(rows) > 0 { if s.inc.ExplainRule == "max" { r.AffectedRows = 0 for _, row := range rows { if row.Rows == 0 { if row.Count != "" { if f, err := strconv.ParseFloat(row.Count, 64); err == nil { row.Rows = int64(f) } } else if row.EstRows != "" { if v, err := strconv.ParseFloat(row.EstRows, 64); err == nil { row.Rows = int64(v) } } else if row.ObPlan.Valid { row.Rows = ObRowAffect(row.ObPlan) } } r.AffectedRows = Max64(r.AffectedRows, row.Rows) } } else { row := rows[0] if row.Rows == 0 { if row.Count != "" { if f, err := strconv.ParseFloat(row.Count, 64); err == nil { row.Rows = int64(f) } } else if row.EstRows != "" { if v, err := strconv.ParseFloat(row.EstRows, 64); err == nil { row.Rows = int64(v) } } else if row.ObPlan.Valid { row.Rows = ObRowAffect(row.ObPlan) } } r.AffectedRows = row.Rows }
if newRecord != nil { newRecord.AffectedRows = r.AffectedRows } }
if s.inc.MaxUpdateRows > 0 && r.AffectedRows > int64(s.inc.MaxUpdateRows) { switch r.Type.(type) { case *ast.DeleteStmt, *ast.UpdateStmt: s.appendErrorNo(ER_UDPATE_TOO_MUCH_ROWS, r.AffectedRows, s.inc.MaxUpdateRows)
if newRecord != nil { newRecord.appendErrorNo(s.inc.Lang, ER_UDPATE_TOO_MUCH_ROWS, r.AffectedRows, s.inc.MaxUpdateRows) } } }
if newRecord != nil { s.sqlFingerprint[sqlId] = newRecord }}
改造点在增加两次if row.ObPlan.Valid { row.Rows = ObRowAffect(row.ObPlan)}。
3、核心解析:ObRowAffect 函数提取 OB 预估行数
这也是魔改的关键 ,也在 session/session_inception.go 添加如下函数 ,专门解析 OB 文本表格格式的 Query Plan ,提取最大预估行数:
func ObRowAffect(plan sql.NullString) int64 { if !plan.Valid { return 0 } r := strings.NewReader(plan.String) br := bufio.NewReader(r) estrows := make([]string, 0) for { l, e := br.ReadString('\n') if e != nil && len(l) == 0 { break } if strings.HasPrefix(l, "|") { r := strings.Split(l, "|") estrows = append(estrows, strings.TrimSpace(r[4])) } }
var estrowMax int for i := 1; i len(estrows); i++ { estrow, err := strconv.Atoi(estrows[i]) if err != nil { continue } estrowMax = max(estrow, estrowMax) } return int64(estrowMax)}
func max(a, b int) int { if a > b { return a } return b}
魔改这两个文件后后 ,编译后就可以直接使用了 ,然后数据库工单系统(python开发)中需通 Python封装统—访问GoInception类。
from app.common.utils.db_conn.mysql_conn import OpenMysqlDb
class GoInception: def __init__(self) -> None: self.go_inception_host = "localhost" self.go_inception_user = "root" self.go_inception_password = "" self.go_inception_port = 4000 self.go_inception_db_name = "" self.commit = False
def check_sql(self, host: str, user: str, password: str, port: int, database: str, sqls: str): sql = f"""/*--host='{host}';--port={port};--user={user};--password='{password}';--check=1;max_insert_rows=10;*/ inception_magic_start; use `{database}`; {sqls}; inception_magic_commit; """ with OpenMysqlDb( host=self.go_inception_host, user=self.go_inception_user, port=self.go_inception_port, password=self.go_inception_password, db_name=self.go_inception_db_name, commit=self.commit, ) as conn: conn.ping() return conn.db_query(sql=sql)
def execute_sql(self, host: str, user: str, password: str, port: int, database: str, sqls: str, backup=0, ignore_warnings=0, fingerprint=0): sql = f"""/*--host='{host}';--port={port};--user='{user}';--password='{password}';--execute=1;backup={backup};ignore_warnings={ignore_warnings};fingerprint={fingerprint};*/ inception_magic_start; use `{database}`; {sqls}; inception_magic_commit; """ with OpenMysqlDb( host=self.go_inception_host, user=self.go_inception_user, port=self.go_inception_port, password=self.go_inception_password, db_name=self.go_inception_db_name, commit=self.commit, ) as conn: conn.ping() r = conn.db_query(sql=sql) return r