go-mysql笔记

一、安装配置 mysql 驱动

1.安装驱动

1
go get -u github.com/go-sql-driver/mysql

2.初始化模块

1
go mod init name

3.执行 go mod tidy

1
go mod tidy

4.导入驱动

1
2
3
4
import(
"database/sql"
"github.com/go-sql-driver/mysql"
)

二、连接数据库

1.获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
"time"
)

func main() {
db, err := sql.Open("mysql", "root:123456@/go_db")
if err != nil {
panic(err)
}
print(db)
// 最大连接时长
db.SetConnMaxLifetime(time.Minute * 3)
// 最大连接数
db.SetMaxOpenConns(10)
// 空闲连接数
db.SetMaxIdleConns(10)
}

2.连接完整步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
)

// 定义一个全局对象db
var db *sql.DB

// 定义一个初始化数据库的函数
func initDB() (err error) {
dsn := "root:508065@tcp(127.0.0.1:3306)/go_db?charset=utf8mb4&parseTime=True"
// 不会校验账号密码是否正确
// 注意!!!这里不要使用:=,我们是给全局变量赋值,然后在main函数中使用全局变量db
db, err = sql.Open("mysql", dsn)
if err != nil {
return err
}
// 尝试与数据库建立连接(校验dsn是否正确)
err = db.Ping()
if err != nil {
return err
}
return nil
}

func main() {
err := initDB() // 调用输出化数据库的函数
if err != nil {
fmt.Printf("初始化失败!,err:%v\\n", err)
return
}else{
fmt.Printf("初始化成功")
}
}

3.深入理解 sql.Open

1 基本介绍

1
func Open(driverName, dataSourceName string) (*sql.DB, error)

参数说明:

  1. driverName:数据库驱动名称(如 “mysql“、”postgres“、”sqlite3“)
  2. dataSourceName:驱动特定的连接字符串(DSN)

基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)

func main() {
// MySQL 连接示例
db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/dbname?parseTime=true")

// PostgreSQL 连接示例
// db, err := sql.Open("postgres", "host=localhost port=5432 user=user password=password dbname=dbname sslmode=disable")

// SQLite 连接示例
// db, err := sql.Open("sqlite3", "./mydatabase.db")

if err != nil {
// 处理错误
}
defer db.Close()

// 使用db进行操作...
}

2.核心特性

2.1 惰性连接机制

错误认知:
sql.Open() 立即建立数据库连接 ❌

实际行为:
sql.Open() 仅初始化连接池,不建立实际连接 ✅

2.2 真正的连接时机

连接实际建立于:

  • 首次执行查询或事务时(Query(), Exec(), Begin()
  • 显式调用 Ping() 方法时
2.3 连接池管理

sql.Open() 返回的 *sql.DB 对象是 连接池管理器

1
2
3
4
// 连接池配置示例:
db.SetMaxOpenConns(25) // 最大活跃连接数
db.SetMaxIdleConns(10) // 最大空闲连接数
db.SetConnMaxLifetime(5 * time.Minute) // 连接最长生命周期

3.连接验证

最佳实践:始终在 sql.Open() 后调用 Ping() 来验证连接

1
2
3
4
5
6
7
8
9
10
11
12
// 正确使用示例
db, err := sql.Open("mysql", "user:pass@/dbname")
if err != nil {
// 处理错误:驱动未注册/DSN格式错误
log.Fatal(err)
}

// 实际验证连接
if err := db.Ping(); err != nil {
// 处理错误:认证失败/网络问题/数据库不可达
log.Fatal("数据库连接失败:", err)
}

4.不同数据库的 DSN 格式

数据库 DSN 格式示例
MySQL "user:password@tcp(localhost:3306)/dbname?params"
PostgreSQL "postgres://user:password@localhost:5432/dbname?sslmode=disable"
SQLite "file:/path/to/database.db?_fk=true"
SQL Server "sqlserver://user:password@localhost:1433?database=dbname"

5.生命周期管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func main() {
// 1. 初始化连接池
db, err := sql.Open("mysql", DSN_STRING)
if err != nil {
log.Fatal(err)
}

// 2. 配置连接池
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(10)

// 3. 在整个应用生命周期中复用 db 对象
http.HandleFunc("/", handler(db))

// 4. 退出时关闭连接池
defer func() {
if err := db.Close(); err != nil {
log.Printf("关闭数据库连接失败: %v", err)
}
}()

// ...
}

// 请求处理函数
func handler(db *sql.DB) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 安全地从连接池获取连接
row := db.QueryRow("SELECT ...")
// ...
}
}

6.注意事项

  1. 延迟连接sql.Open() 不实际建立连接
  2. 必要验证:总是使用 Ping() 验证连接
  3. 连接池管理:配置合理的连接池参数
  4. 生命周期:
    • 应用启动时创建 *sql.DB
    • 应用退出时调用 Close()
    • 整个生命周期内复用
  5. 安全使用:
    • 使用参数化查询防止注入
    • 在事务中处理关键操作
    • 及时释放 RowsTx 资源

黄金法则:将 *sql.DB 视为长期存在的全局对象,而不是每次需要时创建和销毁的临时资源。

三、go-mysql查询操作

在 Go 语言中操作 MySQL 数据库进行查询,需要结合 database/sql 包和 MySQL 驱动(如 github.com/go-sql-driver/mysql)。下面是详细指南和最佳实践:

1. 基础查询操作

单行查询 (QueryRow)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//待查询的表的结构
type User struct{
id int
name string
email string
}
//通过ID查询
func getUserByID(db *sql.DB, userID int) (User, error) {
var user User
//SQL语句字符串
sqlStr:=`SELECT id, name, email FROM users WHERE id = ?`
err := db.QueryRow(sqlStr, userID).Scan(
&user.ID,
&user.Name,
&user.Email,
)
//错误处理
switch {
case err == sql.ErrNoRows:
return User{}, fmt.Errorf("用户不存在 (ID: %d)", userID)
case err != nil:
return User{}, fmt.Errorf("查询失败: %w", err)
default:
return user, nil
}
}

多行查询 (Query)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//待查询的表的结构
type User struct{
id int
name string
email string
created_at string
}


func getActiveUsers(db *sql.DB) ([]User, error) {
//SQL语句
sqlStr:=`
SELECT id, name, email, created_at
FROM users
WHERE last_login > NOW() - INTERVAL 30 DAY
ORDER BY created_at DESC
`
// 执行查询
rows, err := db.Query(sqlStr)
if err != nil {
return nil, fmt.Errorf("查询失败: %w", err)
}
defer rows.Close() // 必须关闭!

// 处理结果集
var users []User
for rows.Next() {
var user User
if err := rows.Scan(
&user.ID,
&user.Name,
&user.Email,
&user.CreatedAt,
); err != nil {
return nil, fmt.Errorf("扫描行失败: %w", err)
}
users = append(users, user)
}

// 检查遍历过程中是否出错
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("结果集错误: %w", err)
}

return users, nil
}

2. 高级查询技巧

处理 NULL 值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//表的结构
type UserProfile struct {
ID int
Name string
Bio sql.NullString // 可能为 NULL 的字段
AvatarURL sql.NullString
LastLogin sql.NullTime
}

func getProfile(userID int) (UserProfile, error) {
var p UserProfile
err := db.QueryRow(`
SELECT name, bio, avatar_url, last_login
FROM user_profiles
WHERE id = ?
`, userID).Scan(
&p.Name,
&p.Bio,
&p.AvatarURL,
&p.LastLogin,
)

// 使用示例:
if p.Bio.Valid {
fmt.Println("个人简介:", p.Bio.String)
} else {
fmt.Println("未设置个人简介")
}
}

分页查询

1
2
3
4
5
6
7
8
9
10
11
12
func getUsersPaginated(db *sql.DB, page, perPage int) ([]User, error) {
offset := (page - 1) * perPage

rows, err := db.Query(`
SELECT id, name, email
FROM users
ORDER BY id
LIMIT ? OFFSET ?`,
perPage, offset,
)
// ... (处理同上)
}

关联查询(JOIN)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//表的结构
type OrderWithUser struct {
OrderID int
OrderDate time.Time
Amount float64
UserID int
UserName string
UserEmail string
}

func getOrdersWithUsers(db *sql.DB) ([]OrderWithUser, error) {
rows, err := db.Query(`
SELECT
o.id AS order_id,
o.order_date,
o.amount,
u.id AS user_id,
u.name AS user_name,
u.email AS user_email
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.status = 'completed'
`)
// ... (处理同上)
}

3. 使用预编译语句 (Prepared Statements)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func searchUsersByName(db *sql.DB, name string) ([]User, error) {
// 准备语句
stmt, err := db.Prepare(`
SELECT id, name, email
FROM users
WHERE name LIKE ?
ORDER BY name
`)
if err != nil {
return nil, fmt.Errorf("准备语句失败: %w", err)
}
defer stmt.Close()

// 执行查询
rows, err := stmt.Query("%" + name + "%")
if err != nil {
return nil, fmt.Errorf("查询失败: %w", err)
}
defer rows.Close()

// ... (处理结果集)
}

4. 使用上下文 (Context)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func getProductsWithTimeout(ctx context.Context, db *sql.DB) ([]Product, error) {
// 创建子上下文,设置超时
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

rows, err := db.QueryContext(ctx, `
SELECT id, name, price
FROM products
WHERE stock > 0
`)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return nil, fmt.Errorf("查询超时")
}
return nil, fmt.Errorf("查询失败: %w", err)
}
defer rows.Close()

// ... (处理结果集)
}

5. 查询优化技巧

避免 SELECT *

1
2
3
4
5
// 反例 ❌
db.Query("SELECT * FROM users")

// 正例 ✅
db.Query("SELECT id, name, email FROM users")

使用缓冲提升性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func getLargeDataSet(db *sql.DB) ([]LargeItem, error) {
rows, err := db.Query(`SELECT ...`)
if err != nil {
return nil, err
}
defer rows.Close()

// 预先缓冲
items := make([]LargeItem, 0, 1000)

for rows.Next() {
var item LargeItem
if err := rows.Scan(...); err != nil {
return nil, err
}
items = append(items, item)
}

return items, nil
}

使用 EXPLAIN 分析查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func explainQuery(db *sql.DB, query string, args ...interface{}) {
row := db.QueryRow("EXPLAIN "+query, args...)

var (
id, selectType, table string
partitions, typeInfo, key sql.NullString
keyLen, ref, rows, filtered sql.NullInt64
extra sql.NullString
)

if err := row.Scan(
&id, &selectType, &table, &partitions, &typeInfo,
&key, &keyLen, &ref, &rows, &filtered, &extra,
); err != nil {
log.Printf("EXPLAIN 失败: %v", err)
return
}

log.Printf("查询计划: 表=%s 类型=%s 预计行数=%d 索引=%s",
table, typeInfo.String, rows.Int64, key.String)
}

6. 完整示例:带错误处理的查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func GetUserOrders(db *sql.DB, userID int) ([]Order, error) {
// 1. 准备带参数的查询
const query = `
SELECT id, order_date, total, status
FROM orders
WHERE user_id = ?
AND status NOT IN ('cancelled')
ORDER BY order_date DESC
`

// 2. 执行查询
rows, err := db.Query(query, userID)
if err != nil {
return nil, fmt.Errorf("查询用户订单失败: %w", err)
}
defer func() {
if closeErr := rows.Close(); closeErr != nil {
log.Printf("警告: 关闭行失败: %v", closeErr)
}
}()

// 3. 处理结果集
var orders []Order
for rows.Next() {
var order Order
if err := rows.Scan(
&order.ID,
&order.Date,
&order.Total,
&order.Status,
); err != nil {
// 部分数据处理失败,记录但继续处理
log.Printf("扫描订单行失败: %v", err)
continue
}
orders = append(orders, order)
}

// 4. 检查结果集错误
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("处理结果集时出错: %w", err)
}

// 5. 处理空结果
if len(orders) == 0 {
return nil, fmt.Errorf("用户 %d 没有有效订单", userID)
}

return orders, nil
}

关键注意事项

  1. 错误处理
    • 总是检查 db.Query/db.QueryRow 的错误
    • 使用 sql.ErrNoRows 特殊处理空结果
    • 检查 rows.Err() 捕获行遍历中的错误
  2. 资源管理
    • 使用 defer rows.Close() 确保释放资源
    • 在长期运行的函数中,提前关闭而不是依赖 defer
  3. 参数化查询
    • 总是使用 ? 占位符防止 SQL 注入
    • 不要拼接 SQL 字符串
  4. 性能优化
    • 复用预编译语句
    • 使用正确的数据类型减少转换开销
    • 避免 SELECT * 选择不必要的列
  5. 连接管理
    • 使用 db.SetConnMaxIdleTime() 防止数据库关闭空闲连接
    • 在高并发应用中调整 SetMaxOpenConnsSetMaxIdleConns

常见错误及解决

  1. 忘记关闭 rows

    1
    2
    3
    4
    5
    6
    7
    8
    // 错误: 导致连接泄漏
    rows, _ := db.Query(...)
    // 忘记 rows.Close()

    // 正确:
    rows, err := db.Query(...)
    if err != nil { ... }
    defer rows.Close()
  2. 扫描顺序不匹配

    1
    2
    3
    4
    5
    // 错误: 列顺序不匹配
    rows.Scan(&user.Name, &user.ID)
    // 但查询是 SELECT id, name ...

    // 正确: 保持查询列顺序和扫描顺序一致
  3. 空结果处理不当

    1
    2
    3
    4
    5
    6
    7
    // 错误: 没有处理 sql.ErrNoRows
    err := db.QueryRow(...).Scan(...)

    // 正确:
    if errors.Is(err, sql.ErrNoRows) {
    // 特殊处理空结果
    }
  4. 忽略上下文超时

    1
    2
    3
    4
    5
    6
    7
    // 错误: 没有超时控制的慢查询
    db.Query(...)

    // 正确: 使用带有超时的Context
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    db.QueryContext(ctx, ...)

重:查询中用到的方法:

1.db.Query()

1.基本语法与功能
1
rows, err := db.Query("SELECT ...", args...)
  1. 返回结果集:返回一个 *sql.Rows 对象,代表多行结果的集合
  2. 参数化查询:支持使用 ? 作为参数的占位符
  3. 高效资源管理:需要显式关闭 rows.Close() 来释放数据库连接
  4. 查询非修改:仅用于查询操作(SELECT),不用于数据修改语句
2.基本用法案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rows, err := db.Query(`
SELECT id, name, email, created_at
FROM users
WHERE active = ?
AND created_at > ?
`, true, time.Now().AddDate(0, -1, 0))

if err != nil {
if errors.Is(err, sql.ErrNoRows) {
// 特殊处理无结果情况
log.Println("未找到活跃用户")
return nil
}
return fmt.Errorf("用户查询失败: %w", err)
}

2.db.QueryRow()

db.QueryRow() 是 Go 的 database/sql 包中用于执行预期返回单行结果的 SQL 查询的关键方法。它简化了单行查询处理流程,并提供了比完整结果集更高效的资源利用方式。

1.核心特性与方法签名
1
func (db *DB) QueryRow(query string, args ...interface{}) *Row
  1. 专为单行结果设计:预期只返回一行数据
  2. 高效资源管理:自动关闭底层连接
  3. 简化错误处理:错误延后到 Scan() 时处理
  4. 内置防错机制:若返回多行也只扫描第一行
2.基本用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func getUserByID(db *sql.DB, id int) (User, error) {
var user User
err := db.QueryRow(`
SELECT id, name, email, created_at
FROM users
WHERE id = ?
`, id).Scan(
&user.ID,
&user.Name,
&user.Email,
&user.CreatedAt,
)

switch {
case err == sql.ErrNoRows:
return User{}, fmt.Errorf("用户不存在 (ID: %d)", id)
case err != nil:
return User{}, fmt.Errorf("查询失败: %w", err)
default:
return user, nil
}
}

替代方法与选择指南

方法 适用场景 特点
db.Query() 多行查询 支持全功能结果集遍历
db.QueryRow() 单行查询 简化代码,自动关闭资源
db.Exec() 数据修改 (INSERT, UPDATE, DELETE) 返回受影响行数,无结果集
db.QueryContext() 需要上下文控制 支持超时和取消
tx.Query() 事务中的查询 在事务上下文中运行

四、go-mysql插入数据

基础插入方法

1. 简单插入(Exec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func insertUser(db *sql.DB, name, email string) (int64, error) {
result, err := db.Exec(
"INSERT INTO users (name, email) VALUES (?, ?)",
name, email,
)
if err != nil {
return 0, fmt.Errorf("插入失败: %w", err)
}

// 获取自增ID
id, err := result.LastInsertId()
if err != nil {
return 0, fmt.Errorf("获取ID失败: %w", err)
}

// 验证影响行数
affected, err := result.RowsAffected()
if err != nil {
log.Printf("警告: 无法获取影响行数: %v", err)
} else if affected != 1 {
log.Printf("警告: 非预期影响行数: %d", affected)
}

return id, nil
}

2. 使用预处理语句(Prepared Statement)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func prepareInsertProduct(db *sql.DB) func(Product) (int64, error) {
stmt, err := db.Prepare(`
INSERT INTO products
(name, price, category, stock)
VALUES (?, ?, ?, ?)
`)
if err != nil {
log.Fatal("准备语句失败:", err)
}

return func(p Product) (int64, error) {
result, err := stmt.Exec(p.Name, p.Price, p.Category, p.Stock)
if err != nil {
return 0, fmt.Errorf("插入产品失败: %w", err)
}
return result.LastInsertId()
}
}

// 使用示例
insertProduct := prepareInsertProduct(db)
id, err := insertProduct(Product{
Name: "Go语言编程",
Price: 89.90,
Category: "图书",
Stock: 100,
})

高级插入技巧

1. 批量插入优化

方法1:拼接SQL语句(小批量)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func batchInsertUsers(db *sql.DB, users []User) error {
if len(users) == 0 {
return nil
}

// 构建批量插入SQL
query := "INSERT INTO users (name, email) VALUES "
var args []interface{}
var placeholders []string

for _, u := range users {
placeholders = append(placeholders, "(?, ?)")
args = append(args, u.Name, u.Email)
}

query += strings.Join(placeholders, ",")

_, err := db.Exec(query, args...)
return err
}
方法2:事务+预处理(大批量)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func bulkInsertLogs(db *sql.DB, logs []Log) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}

stmt, err := tx.Prepare(`
INSERT INTO logs
(timestamp, action, user_id, details)
VALUES (?, ?, ?, ?)
`)
if err != nil {
tx.Rollback()
return fmt.Errorf("准备语句失败: %w", err)
}
defer stmt.Close()

for _, log := range logs {
if _, err := stmt.Exec(
log.Timestamp,
log.Action,
log.UserID,
log.Details,
); err != nil {
tx.Rollback()
return fmt.Errorf("插入日志失败: %w", err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}

return nil
}

2. 插入特殊数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Product struct {
ID int
Name string
Price float64
Available sql.NullBool
Tags sql.NullString
CreatedAt time.Time
}

func insertComplexProduct(db *sql.DB, p Product) error {
_, err := db.Exec(`
INSERT INTO products
(name, price, available, tags, created_at)
VALUES (?, ?, ?, ?, ?)`,
p.Name,
p.Price,
p.Available, // 处理NULL bool
p.Tags, // 处理NULL string
p.CreatedAt,
)
return err
}

3. 使用 UPSERT 或 REPLACE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ON DUPLICATE KEY UPDATE
func upsertUser(db *sql.DB, u User) error {
_, err := db.Exec(`
INSERT INTO users (id, name, email, last_login)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
email = VALUES(email),
last_login = VALUES(last_login)
`, u.ID, u.Name, u.Email, time.Now())
return err
}

// REPLACE INTO (注意: 会先删除原有行)
func replaceConfig(db *sql.DB, key, value string) error {
_, err := db.Exec(`
REPLACE INTO config (config_key, config_value)
VALUES (?, ?)
`, key, value)
return err
}

使用上下文(Context)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func insertWithTimeout(ctx context.Context, db *sql.DB, data Data) error {
// 创建带超时的上下文
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

result, err := db.ExecContext(ctx,
"INSERT INTO data_table (value) VALUES (?)",
data.Value,
)

if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("插入操作超时")
}
return fmt.Errorf("插入失败: %w", err)
}

// 确保至少1行受到影响
affected, _ := result.RowsAffected()
if affected < 1 {
return fmt.Errorf("无行受到影响")
}

return nil
}

插入错误处理

1. 处理重复键错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func safeInsertUser(db *sql.DB, u User) error {
_, err := db.Exec(
"INSERT INTO users (email, name) VALUES (?, ?)",
u.Email, u.Name,
)

if err != nil {
// MySQL 特定错误处理
if mysqlErr, ok := err.(*mysql.MySQLError); ok {
if mysqlErr.Number == 1062 { // ER_DUP_ENTRY
return fmt.Errorf("邮箱已被注册: %s", u.Email)
}
}
return fmt.Errorf("插入失败: %w", err)
}

return nil
}

2. 插入重试机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func insertWithRetry(db *sql.DB, query string, args ...interface{}) (int64, error) {
const maxAttempts = 3
var lastErr error

for attempt := 1; attempt <= maxAttempts; attempt++ {
result, err := db.Exec(query, args...)
if err == nil {
return result.LastInsertId()
}

// MySQL 死锁错误代码
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1213 {
lastErr = err
// 指数退避
time.Sleep(time.Duration(attempt*attempt) * 100 * time.Millisecond)
continue
}

return 0, err
}

return 0, fmt.Errorf("插入失败(重试%d次): %w", maxAttempts, lastErr)
}

插入性能优化

1. 批量插入参数优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func optimizedBulkInsert(db *sql.DB, items []Item) error {
const batchSize = 1000
tx, err := db.Begin()
if err != nil {
return err
}

stmt, err := tx.Prepare(`
INSERT INTO items (data1, data2, data3)
VALUES (?, ?, ?)
`)
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()

for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}

// 执行当前批次
for j := i; j < end; j++ {
if _, err := stmt.Exec(
items[j].Data1,
items[j].Data2,
items[j].Data3,
); err != nil {
tx.Rollback()
return err
}
}

// 分批提交
if err := tx.Commit(); err != nil {
return err
}

// 开始新的事务
tx, err = db.Begin()
if err != nil {
return err
}

// 重新准备语句
stmt, err = tx.Prepare(...)
if err != nil {
tx.Rollback()
return err
}
}

return nil
}

2. 使用 LOAD DATA INFILE(超大数据量)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func bulkLoadCSV(db *sql.DB, csvPath string) error {
// 临时禁用外键检查
if _, err := db.Exec("SET foreign_key_checks = 0"); err != nil {
return err
}

// 执行数据加载
_, err := db.Exec(fmt.Sprintf(`
LOAD DATA LOCAL INFILE '%s'
INTO TABLE data_table
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(field1, field2, field3)
`, csvPath))

// 恢复外键检查
if _, reErr := db.Exec("SET foreign_key_checks = 1"); reErr != nil {
log.Printf("警告: 恢复外键检查失败: %v", reErr)
}

return err
}

完整示例:用户注册流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func registerUser(ctx context.Context, db *sql.DB, newUser User) (int64, error) {
// 验证邮箱是否可用
var exists bool
err := db.QueryRowContext(ctx,
"SELECT 1 FROM users WHERE email = ?",
newUser.Email).Scan(&exists)

if err != nil && err != sql.ErrNoRows {
return 0, fmt.Errorf("检查邮箱失败: %w", err)
}
if exists {
return 0, fmt.Errorf("邮箱已被注册")
}

// 开始事务
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return 0, fmt.Errorf("开启事务失败: %w", err)
}

// 插入用户
result, err := tx.ExecContext(ctx, `
INSERT INTO users (email, password, name, created_at)
VALUES (?, ?, ?, ?)`,
newUser.Email,
hashPassword(newUser.Password),
newUser.Name,
time.Now().UTC(),
)
if err != nil {
tx.Rollback()
return 0, fmt.Errorf("创建用户失败: %w", err)
}

// 获取用户ID
userID, err := result.LastInsertId()
if err != nil {
tx.Rollback()
return 0, fmt.Errorf("获取用户ID失败: %w", err)
}

// 插入用户配置文件
_, err = tx.ExecContext(ctx, `
INSERT INTO user_profiles (user_id)
VALUES (?)`,
userID,
)
if err != nil {
tx.Rollback()
return 0, fmt.Errorf("创建用户配置失败: %w", err)
}

// 提交事务
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("提交事务失败: %w", err)
}

return userID, nil
}

最佳实践总结

  1. 使用参数化查询

    1
    2
    3
    4
    5
    // 错误 ❌ (SQL注入风险)
    db.Exec(fmt.Sprintf("INSERT INTO users (name) VALUES ('%s')", userName))

    // 正确 ✅
    db.Exec("INSERT INTO users (name) VALUES (?)", userName)
  2. 始终检查错误

    1
    2
    3
    if _, err := db.Exec(...); err != nil {
    // 处理错误
    }
  3. 复用预处理语句

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 应用启动时
    var insertUserStmt *sql.Stmt

    func initDB() {
    insertUserStmt = db.Prepare("INSERT ...")
    }

    // 应用中复用
    insertUserStmt.Exec(...)
  4. 重要操作使用事务

    1
    2
    3
    tx.Begin()
    // 多个操作
    tx.Commit()
  5. 处理NULL值

    1
    2
    3
    4
    type User struct {
    Biography sql.NullString
    Birthdate sql.NullTime
    }
  6. 批量插入优化

    • 小批量:拼接SQL语句
    • 大批量:事务+预处理
    • 超大:LOAD DATA INFILE
  7. 超时控制

    1
    2
    3
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    db.ExecContext(ctx, ...)
  8. 连接池配置

    1
    2
    3
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(10)
    db.SetConnMaxLifetime(5*time.Minute)
  9. 性能监控

    1
    2
    3
    4
    5
    6
    7
    8
    // 监控连接池状态
    go func() {
    for range time.Tick(1 * time.Minute) {
    stats := db.Stats()
    log.Printf("连接池: 打开=%d 使用中=%d 空闲=%d 等待=%d",
    stats.OpenConnections, stats.InUse, stats.Idle, stats.WaitCount)
    }
    }()
  10. 关闭资源

    1
    2
    3
    4
    5
    // 确保关闭连接
    defer db.Close()
    // 及时关闭Rows和Statements
    defer rows.Close()
    defer stmt.Close()

五、go-mysql删除数据

基础删除操作

1. 简单删除(使用 Exec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func deleteUser(db *sql.DB, userID int64) error {
// 执行DELETE语句
result, err := db.Exec("DELETE FROM users WHERE id = ?", userID)
if err != nil {
return fmt.Errorf("删除失败: %w", err)
}

// 检查受影响的行数
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("获取影响行数失败: %w", err)
}

if rowsAffected == 0 {
return fmt.Errorf("未找到用户 (ID: %d)", userID)
}

fmt.Printf("成功删除用户,影响行数: %d\n", rowsAffected)
return nil
}

2. 使用上下文(Context)控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func deleteWithTimeout(ctx context.Context, db *sql.DB, userID int64) error {
// 创建超时上下文(3秒)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

result, err := db.ExecContext(ctx, "DELETE FROM users WHERE id = ?", userID)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("删除操作超时")
}
return fmt.Errorf("删除失败: %w", err)
}

// 验证受影响行数
rowsAffected, _ := result.RowsAffected()
if rowsAffected == 0 {
return fmt.Errorf("用户不存在 (ID: %d)", userID)
}

return nil
}

高级删除场景

1. 事务中的删除(确保数据一致性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func deleteUserAndRelatedData(db *sql.DB, userID int64) error {
// 开始事务
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}

// 删除相关配置
if _, err := tx.Exec("DELETE FROM user_settings WHERE user_id = ?", userID); err != nil {
tx.Rollback()
return fmt.Errorf("删除用户设置失败: %w", err)
}

// 删除相关订单
if _, err := tx.Exec("DELETE FROM orders WHERE user_id = ?", userID); err != nil {
tx.Rollback()
return fmt.Errorf("删除用户订单失败: %w", err)
}

// 删除用户主记录
result, err := tx.Exec("DELETE FROM users WHERE id = ?", userID)
if err != nil {
tx.Rollback()
return fmt.Errorf("删除用户失败: %w", err)
}

// 验证用户删除
if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
tx.Rollback()
return fmt.Errorf("用户不存在 (ID: %d)", userID)
}

// 提交事务
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}

return nil
}

2. 软删除(逻辑删除,使用标志位)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func softDeleteProduct(db *sql.DB, productID int64) error {
// 更新 deleted_at 字段而不是真正删除
result, err := db.Exec(`
UPDATE products
SET deleted_at = NOW()
WHERE id = ?
`, productID)

if err != nil {
return fmt.Errorf("软删除失败: %w", err)
}

if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
return fmt.Errorf("产品不存在 (ID: %d)", productID)
}

return nil
}

// 查询时排除已软删除的记录
func getActiveProducts(db *sql.DB) ([]Product, error) {
rows, err := db.Query(`
SELECT id, name, price
FROM products
WHERE deleted_at IS NULL
`)
// ...
}

3. 批量删除

1
2
3
4
5
6
7
8
9
10
11
12
13
func deleteExpiredSessions(db *sql.DB, maxAge time.Duration) (int64, error) {
// 执行批量删除
result, err := db.Exec(`
DELETE FROM user_sessions
WHERE last_activity < ?
`, time.Now().Add(-maxAge))

if err != nil {
return 0, fmt.Errorf("批量删除失败: %w", err)
}

return result.RowsAffected()
}

4. 联表删除(DELETE with JOIN)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func deleteInactiveUsersWithNoOrders(db *sql.DB) (int64, error) {
// 删除30天未登录且没有订单的用户
result, err := db.Exec(`
DELETE users
FROM users
LEFT JOIN orders ON orders.user_id = users.id
WHERE users.last_login < DATE_SUB(NOW(), INTERVAL 30 DAY)
AND orders.id IS NULL
`)

if err != nil {
return 0, fmt.Errorf("删除用户失败: %w", err)
}

return result.RowsAffected()
}

错误处理与安全措施

1. 处理外键约束错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func safeDeleteCategory(db *sql.DB, categoryID int64) error {
_, err := db.Exec("DELETE FROM categories WHERE id = ?", categoryID)

if err != nil {
// MySQL 特定错误处理
if mysqlErr, ok := err.(*mysql.MySQLError); ok {
if mysqlErr.Number == 1451 { // ER_ROW_IS_REFERENCED
return fmt.Errorf("分类下有产品存在,无法删除")
}
}
return fmt.Errorf("删除失败: %w", err)
}

return nil
}

2. 删除前验证条件(防止意外删除)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func deleteUserWithVerification(db *sql.DB, userID int64, username string) error {
// 验证用户名匹配
var count int
err := db.QueryRow(`
SELECT COUNT(*)
FROM users
WHERE id = ? AND username = ?
`, userID, username).Scan(&count)

if err != nil {
return fmt.Errorf("验证失败: %w", err)
}

if count == 0 {
return fmt.Errorf("ID与用户名不匹配")
}

// 执行删除
return deleteUser(db, userID)
}

3. 限流与重试机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func deleteWithRetry(db *sql.DB, query string, args ...interface{}) error {
const maxAttempts = 3
var lastErr error

for attempt := 1; attempt <= maxAttempts; attempt++ {
_, err := db.Exec(query, args...)
if err == nil {
return nil
}

// MySQL 死锁错误代码 (ER_LOCK_DEADLOCK)
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1213 {
lastErr = err
// 指数退避
time.Sleep(time.Duration(attempt*attempt) * 100 * time.Millisecond)
continue
}

return err
}

return fmt.Errorf("删除失败(重试%d次): %w", maxAttempts, lastErr)
}

性能优化策略

1. 分批删除(避免锁表)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func deleteLargeDataSet(db *sql.DB) error {
const batchSize = 1000
totalDeleted := int64(0)

for {
result, err := db.Exec(`
DELETE FROM large_table
WHERE processed = 0
LIMIT ?
`, batchSize)

if err != nil {
return fmt.Errorf("批量删除失败: %w", err)
}

rowsAffected, _ := result.RowsAffected()
totalDeleted += rowsAffected

if rowsAffected == 0 {
break
}

// 给数据库喘息的机会
time.Sleep(500 * time.Millisecond)
}

fmt.Printf("共删除 %d 行记录\n", totalDeleted)
return nil
}

2. 优化删除条件(使用索引)

1
2
3
4
5
// 差 ❌ - 全表扫描
db.Exec("DELETE FROM users WHERE DATE(last_login) < DATE_SUB(NOW(), INTERVAL 90 DAY)")

// 优 ✅ - 使用索引列
db.Exec("DELETE FROM users WHERE last_login < DATE_SUB(NOW(), INTERVAL 90 DAY)")

完整示例:安全删除服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main

import (
"database/sql"
"fmt"
"log"
"time"

_ "github.com/go-sql-driver/mysql"
)

type UserDeleter struct {
db *sql.DB
}

func NewUserDeleter(dsn string) (*UserDeleter, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}

// 配置连接池
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(5 * time.Minute)

if err := db.Ping(); err != nil {
return nil, fmt.Errorf("数据库连接失败: %w", err)
}

return &UserDeleter{db: db}, nil
}

func (ud *UserDeleter) DeleteUser(userID int64) error {
// 开始事务
tx, err := ud.db.Begin()
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}

// 备份用户数据到历史表
if _, err := tx.Exec(`
INSERT INTO deleted_users_history
(user_id, username, email, deleted_at)
SELECT id, username, email, NOW()
FROM users
WHERE id = ?
`, userID); err != nil {
tx.Rollback()
return fmt.Errorf("创建历史记录失败: %w", err)
}

// 执行删除
result, err := tx.Exec("DELETE FROM users WHERE id = ?", userID)
if err != nil {
tx.Rollback()
return fmt.Errorf("删除用户失败: %w", err)
}

// 验证删除
if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
tx.Rollback()
return fmt.Errorf("用户不存在 (ID: %d)", userID)
}

// 提交事务
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}

return nil
}

func (ud *UserDeleter) Close() error {
return ud.db.Close()
}

func main() {
deleter, err := NewUserDeleter("user:password@tcp(localhost:3306)/mydb")
if err != nil {
log.Fatal(err)
}
defer deleter.Close()

if err := deleter.DeleteUser(123); err != nil {
log.Printf("删除用户失败: %v", err)
} else {
log.Println("用户删除成功")
}
}

最佳实践总结

  1. 始终使用参数化查询

    1
    2
    3
    4
    5
    // 错误 ❌ (SQL注入风险)
    db.Exec(fmt.Sprintf("DELETE FROM users WHERE id = %d", userID))

    // 正确 ✅
    db.Exec("DELETE FROM users WHERE id = ?", userID)
  2. 使用事务保障数据完整性

    1
    2
    3
    tx.Begin()
    // 多个关联操作
    tx.Commit()
  3. 验证影响行数

    1
    2
    3
    if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
    return fmt.Errorf("目标记录不存在")
    }
  4. 实现软删除(逻辑删除)

    1
    ALTER TABLE users ADD COLUMN deleted_at TIMESTAMP NULL;
  5. 批量删除优化

    1
    2
    // 使用LIMIT分批删除
    db.Exec("DELETE FROM large_table LIMIT 1000")
  6. 上下文超时控制

    1
    2
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    db.ExecContext(ctx, ...)
  7. 连接池配置

    1
    2
    3
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(10)
    db.SetConnMaxLifetime(5*time.Minute)
  8. 错误处理与重试

    1
    2
    3
    4
    // 处理死锁等可重试错误
    if isRetryableError(err) {
    // 执行重试
    }
  9. 记录删除日志

    1
    2
    // 在事务中创建删除日志记录
    tx.Exec("INSERT INTO deletion_logs ...")
  10. 权限最小化

    1
    2
    3
    -- 创建专用删除用户
    CREATE USER 'deleter'@'localhost' IDENTIFIED BY 'strongpassword';
    GRANT DELETE ON mydb.users TO 'deleter'@'localhost';

六、go-mysql更新数据

在 Go 语言中更新 MySQL 数据是常见的数据库操作。以下是从基础到高级的全面指南,涵盖各种更新场景和最佳实践。

基础更新操作

1. 简单更新(使用 Exec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func updateUserName(db *sql.DB, id int, newName string) error {
// 执行UPDATE语句
result, err := db.Exec(
"UPDATE users SET name = ? WHERE id = ?",
newName, id,
)
if err != nil {
return fmt.Errorf("更新失败: %w", err)
}

// 获取受影响行数
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("获取影响行数失败: %w", err)
}

if rowsAffected == 0 {
return fmt.Errorf("未找到用户 (ID: %d)", id)
}

fmt.Printf("成功更新 %d 条记录\n", rowsAffected)
return nil
}

2. 使用上下文(Context)控制超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func updateWithTimeout(ctx context.Context, db *sql.DB, id int, email string) error {
// 创建超时上下文
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

result, err := db.ExecContext(ctx,
"UPDATE users SET email = ? WHERE id = ?",
email, id,
)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("更新超时")
}
return fmt.Errorf("更新失败: %w", err)
}

// 检查影响行数
if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
return fmt.Errorf("未找到用户 (ID: %d)", id)
}

return nil
}

高级更新技巧

1. 事务中的更新(确保数据一致性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func transferBalance(db *sql.DB, fromID, toID int, amount float64) error {
// 开始事务
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}

// 转出
result, err := tx.Exec(`
UPDATE accounts SET balance = balance - ?
WHERE id = ? AND balance >= ?`,
amount, fromID, amount,
)
if err != nil {
tx.Rollback()
return fmt.Errorf("转出操作失败: %w", err)
}

// 验证转出操作影响行数
rowsAffected, err := result.RowsAffected()
if err != nil || rowsAffected != 1 {
tx.Rollback()
return fmt.Errorf("转出操作无效或余额不足")
}

// 转入
_, err = tx.Exec(`
UPDATE accounts SET balance = balance + ?
WHERE id = ?`,
amount, toID,
)
if err != nil {
tx.Rollback()
return fmt.Errorf("转入操作失败: %w", err)
}

// 提交事务
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}

return nil
}

2. 处理 NULL 值更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func updateUserProfile(db *sql.DB, id int, bio *string) error {
var bioValue interface{}
if bio != nil {
bioValue = *bio
} else {
bioValue = nil // 这将设置为NULL
}

_, err := db.Exec(`
UPDATE user_profiles
SET bio = ?
WHERE user_id = ?`,
bioValue, id,
)
return err
}

// 或者使用sql.NullString
func updateUserProfileWithNull(db *sql.DB, id int, bio sql.NullString) error {
_, err := db.Exec(`
UPDATE user_profiles
SET bio = ?
WHERE user_id = ?`,
bio, id,
)
return err
}

3. 基于查询结果的更新(嵌套查询)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func updateCustomerStatus(db *sql.DB) error {
// 将大客户的等级提升
result, err := db.Exec(`
UPDATE customers
SET account_level = 'VIP'
WHERE id IN (
SELECT customer_id
FROM orders
WHERE order_date > NOW() - INTERVAL 6 MONTH
GROUP BY customer_id
HAVING SUM(total_amount) > 10000
)
`)

if err != nil {
return fmt.Errorf("更新失败: %w", err)
}

affected, _ := result.RowsAffected()
fmt.Printf("成功升级 %d 名客户为VIP\n", affected)
return nil
}

4. 批量更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func bulkUpdatePrices(db *sql.DB, productUpdates map[int]float64) error {
tx, err := db.Begin()
if err != nil {
return err
}

stmt, err := tx.Prepare(`
UPDATE products
SET price = ?
WHERE id = ?
`)
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()

for id, newPrice := range productUpdates {
result, err := stmt.Exec(newPrice, id)
if err != nil {
tx.Rollback()
return fmt.Errorf("产品 %d 更新失败: %w", id, err)
}

// 验证更新是否成功
if affected, _ := result.RowsAffected(); affected != 1 {
tx.Rollback()
return fmt.Errorf("产品 %d 未更新", id)
}
}

return tx.Commit()
}

5. 条件更新(乐观锁实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func updateWithVersion(db *sql.DB, id int, newValue string) error {
// 先查询当前版本
var currentValue string
var version int
err := db.QueryRow(`
SELECT value, version
FROM config
WHERE id = ?
`, id).Scan(&currentValue, &version)

if err != nil {
return fmt.Errorf("查询失败: %w", err)
}

// 使用版本号防止并发修改冲突
result, err := db.Exec(`
UPDATE config
SET value = ?, version = version + 1
WHERE id = ? AND version = ?`,
newValue, id, version,
)

if err != nil {
return fmt.Errorf("更新失败: %w", err)
}

// 检查更新是否成功
if affected, _ := result.RowsAffected(); affected != 1 {
return fmt.Errorf("更新冲突,数据已被修改")
}

return nil
}

错误处理与重试

1. 处理死锁错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func updateUserStatsWithRetry(db *sql.DB, userID int) error {
const maxRetries = 3
var lastErr error

for i := 0; i < maxRetries; i++ {
_, err := db.Exec(`
UPDATE user_stats
SET login_count = login_count + 1,
last_login = NOW()
WHERE user_id = ?
`, userID)

if err == nil {
return nil
}

// 检查是否为死锁错误
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1213 {
lastErr = err
// 指数退避
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Millisecond * 100)
continue
}

return err
}

return fmt.Errorf("更新失败(重试%d次): %w", maxRetries, lastErr)
}

2. 处理唯一约束冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func updateEmail(db *sql.DB, userID int, newEmail string) error {
result, err := db.Exec(`
UPDATE users
SET email = ?
WHERE id = ?`,
newEmail, userID,
)

if err != nil {
if mysqlErr, ok := err.(*mysql.MySQLError); ok {
if mysqlErr.Number == 1062 { // ER_DUP_ENTRY
return fmt.Errorf("邮箱已被使用: %s", newEmail)
}
}
return fmt.Errorf("更新失败: %w", err)
}

// 检查更新结果
if affected, _ := result.RowsAffected(); affected != 1 {
return fmt.Errorf("用户不存在")
}

return nil
}

性能优化技巧

1. 高效批量更新(使用 CASE WHEN)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func updateMultiProducts(db *sql.DB, updates map[int]float64) error {
// 构建CASE语句
var caseStmt strings.Builder
args := []interface{}{}

caseStmt.WriteString("UPDATE products SET price = CASE id ")

for id, price := range updates {
caseStmt.WriteString("WHEN ? THEN ? ")
args = append(args, id, price)
}

caseStmt.WriteString("END WHERE id IN (")

// 添加所有ID到WHERE条件
ids := []interface{}{}
for id := range updates {
ids = append(ids, id)
caseStmt.WriteString("?,")
}

// 移除最后的逗号
query := caseStmt.String()
if len(ids) > 0 {
query = strings.TrimSuffix(query, ",")
}
query += ")"

args = append(args, ids...)

// 执行更新
_, err := db.Exec(query, args...)
return err
}

2. 更新后读取最新值(使用 OUTPUT 或 RETURNING)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func updateAndReturn(db *sql.DB, id int, newName string) (User, error) {
var user User

// MySQL不支持RETURNING,但可以通过查询实现类似效果
tx, err := db.Begin()
if err != nil {
return User{}, err
}

// 先执行更新
result, err := tx.Exec(`
UPDATE users
SET name = ?
WHERE id = ?`,
newName, id,
)

if err != nil {
tx.Rollback()
return User{}, fmt.Errorf("更新失败: %w", err)
}

// 验证更新成功
if affected, _ := result.RowsAffected(); affected != 1 {
tx.Rollback()
return User{}, fmt.Errorf("用户不存在")
}

// 查询更新后的记录
err = tx.QueryRow(`
SELECT id, name, email
FROM users
WHERE id = ?`,
id,
).Scan(&user.ID, &user.Name, &user.Email)

if err != nil {
tx.Rollback()
return User{}, fmt.Errorf("查询失败: %w", err)
}

if err = tx.Commit(); err != nil {
return User{}, fmt.Errorf("提交失败: %w", err)
}

return user, nil
}

3. 避免全表扫描

1
2
3
4
5
// 差 ❌ - 全表扫描
db.Exec("UPDATE orders SET status = 'shipped' WHERE DATE(order_date) = '2023-10-01'")

// 优 ✅ - 使用索引列
db.Exec("UPDATE orders SET status = 'shipped' WHERE order_date BETWEEN '2023-10-01 00:00:00' AND '2023-10-01 23:59:59'")

完整示例:库存管理系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main

import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"time"

_ "github.com/go-sql-driver/mysql"
)

type InventoryService struct {
db *sql.DB
}

func NewInventoryService(dsn string) (*InventoryService, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}

// 配置连接池
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(5 * time.Minute)

if err := db.Ping(); err != nil {
return nil, fmt.Errorf("数据库连接失败: %w", err)
}

return &InventoryService{db: db}, nil
}

func (s *InventoryService) AdjustStock(productID int, quantityChange int) error {
// 开始事务
tx, err := s.db.BeginTx(context.Background(), nil)
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}

// 使用行级锁确保更新时的数据一致性
var currentStock int
err = tx.QueryRow(`
SELECT stock FROM inventory
WHERE product_id = ? FOR UPDATE`,
productID,
).Scan(&currentStock)

if errors.Is(err, sql.ErrNoRows) {
tx.Rollback()
return fmt.Errorf("产品不存在 (ID: %d)", productID)
} else if err != nil {
tx.Rollback()
return fmt.Errorf("库存查询失败: %w", err)
}

// 检查库存是否足够
if currentStock+quantityChange < 0 {
tx.Rollback()
return fmt.Errorf("库存不足")
}

// 更新库存
result, err := tx.Exec(`
UPDATE inventory
SET stock = stock + ?
WHERE product_id = ?`,
quantityChange, productID,
)

if err != nil {
tx.Rollback()
return fmt.Errorf("库存更新失败: %w", err)
}

// 验证更新
if affected, _ := result.RowsAffected(); affected != 1 {
tx.Rollback()
return fmt.Errorf("库存更新失败")
}

// 记录库存变更
_, err = tx.Exec(`
INSERT INTO inventory_history
(product_id, quantity_change, updated_at)
VALUES (?, ?, ?)`,
productID, quantityChange, time.Now().UTC(),
)

if err != nil {
tx.Rollback()
return fmt.Errorf("历史记录失败: %w", err)
}

// 提交事务
if err := tx.Commit(); err != nil {
return fmt.Errorf("事务提交失败: %w", err)
}

return nil
}

func (s *InventoryService) Close() {
s.db.Close()
}

func main() {
service, err := NewInventoryService("user:password@tcp(localhost:3306)/inventory_db")
if err != nil {
log.Fatal(err)
}
defer service.Close()

// 减少库存(出货)
if err := service.AdjustStock(101, -5); err != nil {
log.Printf("库存更新失败: %v", err)
} else {
log.Println("库存更新成功")
}
}

最佳实践总结

  1. 始终使用参数化查询

    1
    2
    3
    4
    5
    // 错误 ❌ (SQL注入风险)
    db.Exec(fmt.Sprintf("UPDATE users SET name = '%s' WHERE id = %d", name, id))

    // 正确 ✅
    db.Exec("UPDATE users SET name = ? WHERE id = ?", name, id)
  2. 验证影响行数

    1
    2
    3
    if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
    return fmt.Errorf("目标记录不存在")
    }
  3. 关键操作使用事务

    1
    2
    3
    tx.Begin()
    // 多个相关操作
    tx.Commit()
  4. 处理并发冲突

    1
    2
    3
    4
    5
    -- 使用行级锁
    SELECT ... FOR UPDATE

    -- 使用版本号
    UPDATE ... WHERE version = ?
  5. 优化批量更新

    • 小批量:使用 IN 子句
    • 大批量:使用 CASE WHEN 或分批处理
  6. 上下文超时控制

    1
    2
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    db.ExecContext(ctx, ...)
  7. 连接池配置

    1
    2
    3
    db.SetMaxOpenConns(20)
    db.SetMaxIdleConns(10)
    db.SetConnMaxLifetime(5*time.Minute)
  8. 错误处理与重试

    1
    2
    3
    4
    // 处理死锁等可重试错误
    if isRetryableError(err) {
    // 执行重试
    }
  9. 避免全表更新

    1
    2
    -- 必须有WHERE条件
    UPDATE products SET ... WHERE id = 123
  10. 性能监控

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 监控连接池状态
    go func() {
    ticker := time.NewTicker(1 * time.Minute)
    for range ticker.C {
    stats := service.db.Stats()
    log.Printf("连接池: 打开=%d 使用中=%d 空闲=%d 等待=%d",
    stats.OpenConnections, stats.InUse, stats.Idle, stats.WaitCount)
    }
    }()

七、Go 中 db.Exec() 方法

db.Exec() 是 Go 中 database/sql 包的核心方法,用于执行不返回行结果集的 SQL 命令。它对于执行插入(INSERT)、更新(UPDATE)、删除(DELETE)、DDL 语句等操作至关重要。

基本用法和语法

1
result, err := db.Exec(query string, args ...interface{})

参数说明:

  • query:要执行的 SQL 语句
  • args:SQL 语句中的参数(用于参数化查询)

返回值:

  • resultsql.Result 接口对象
  • err:执行过程中遇到的错误

核心功能和方法

sql.Result 接口提供的方法:

1
2
3
4
5
6
7
type Result interface {
// 返回最后插入行的自增 ID(对于 INSERT)
LastInsertId() (int64, error)

// 返回受影响的行数(对于 INSERT、UPDATE、DELETE)
RowsAffected() (int64, error)
}

实际应用示例

1. 创建表(DDL 操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建用户表
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`)
if err != nil {
log.Fatal("创建表失败:", err)
}
fmt.Println("用户表创建成功")

2. 插入数据(INSERT)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 插入用户数据
result, err := db.Exec(
"INSERT INTO users (username, email) VALUES (?, ?)",
"john_doe", "john@example.com",
)
if err != nil {
log.Fatal("插入失败:", err)
}

// 获取自增 ID
id, err := result.LastInsertId()
if err != nil {
log.Println("警告: 无法获取自增ID")
} else {
fmt.Printf("用户创建成功,ID: %d\n", id)
}

// 验证影响行数
rowsAffected, err := result.RowsAffected()
if err != nil {
log.Println("警告: 无法获取影响行数")
} else if rowsAffected != 1 {
log.Println("警告: 预期影响1行,实际:", rowsAffected)
}

3. 更新数据(UPDATE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 更新用户邮箱
result, err := db.Exec(
"UPDATE users SET email = ? WHERE username = ?",
"john.new@example.com", "john_doe",
)
if err != nil {
log.Fatal("更新失败:", err)
}

// 验证更新是否成功
if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
log.Println("警告: 未找到匹配用户")
} else {
fmt.Println("用户信息更新成功")
}

4. 删除数据(DELETE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 删除非活动用户
result, err := db.Exec(`
DELETE FROM users
WHERE last_login < NOW() - INTERVAL 1 YEAR
`)
if err != nil {
log.Fatal("删除失败:", err)
}

if rowsAffected, _ := result.RowsAffected(); rowsAffected > 0 {
fmt.Printf("删除 %d 个非活跃用户\n", rowsAffected)
} else {
fmt.Println("没有符合删除条件的用户")
}

5. 带参数的 DELETE 操作

1
2
3
4
5
6
7
8
9
10
11
12
// 删除指定用户
result, err := db.Exec(
"DELETE FROM users WHERE username = ?",
"inactive_user",
)
if err != nil {
log.Fatal("删除失败:", err)
}

if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
log.Println("未找到用户 'inactive_user'")
}

高级用法与技巧

1. 使用事务处理多个操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 转账操作事务示例
tx, err := db.Begin()
if err != nil {
log.Fatal("开启事务失败:", err)
}

// 从源账户扣款
_, err = tx.Exec(`
UPDATE accounts
SET balance = balance - ?
WHERE id = ? AND balance >= ?`,
100, 1, 100,
)
if err != nil {
tx.Rollback()
log.Fatal("扣款失败:", err)
}

// 向目标账户加款
_, err = tx.Exec(`
UPDATE accounts
SET balance = balance + ?
WHERE id = ?`,
100, 2,
)
if err != nil {
tx.Rollback()
log.Fatal("加款失败:", err)
}

// 提交事务
if err := tx.Commit(); err != nil {
log.Fatal("提交事务失败:", err)
}
fmt.Println("转账操作成功")

2. 使用预处理语句提高性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 批量插入产品数据
func bulkInsertProducts(db *sql.DB, products []Product) error {
tx, err := db.Begin()
if err != nil {
return err
}

// 准备预处理语句
stmt, err := tx.Prepare(`
INSERT INTO products
(name, price, category)
VALUES (?, ?, ?)
`)
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()

// 批量执行
for _, p := range products {
if _, err := stmt.Exec(p.Name, p.Price, p.Category); err != nil {
tx.Rollback()
return fmt.Errorf("插入产品 '%s' 失败: %w", p.Name, err)
}
}

return tx.Commit()
}

3. 使用上下文控制超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func updateProfileWithTimeout(ctx context.Context, db *sql.DB, userID int, bio string) error {
// 设置超时上下文(3秒)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

// 执行带上下文的操作
_, err := db.ExecContext(ctx,
"UPDATE user_profiles SET bio = ? WHERE user_id = ?",
bio, userID,
)

if err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return fmt.Errorf("更新操作超时")
}
return fmt.Errorf("更新失败: %w", err)
}

return nil
}

错误处理和常见问题

1. 处理唯一约束冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
_, err := db.Exec(`
INSERT INTO users (username, email)
VALUES (?, ?)`,
"john_doe", "john@example.com",
)

if err != nil {
// 检查是否唯一约束冲突
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1062 {
fmt.Println("用户名或邮箱已被使用")
} else {
log.Fatal("其他错误:", err)
}
}

2. 处理外键约束错误

1
2
3
4
5
6
7
8
9
10
11
12
13
_, err := db.Exec(`
DELETE FROM categories
WHERE id = ?`,
5,
)

if err != nil {
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1451 {
fmt.Println("该分类下存在产品,无法删除")
} else {
log.Fatal("其他错误:", err)
}
}

3. 处理结果未更新情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
result, err := db.Exec(`
UPDATE products
SET price = price * 0.9
WHERE category = ?`,
"electronics",
)

if err != nil {
log.Fatal("更新失败:", err)
}

if affected, _ := result.RowsAffected(); affected == 0 {
fmt.Println("该分类下无产品可更新")
}

性能优化建议

1. 批量操作优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 高效批量更新方法
func batchUpdatePrices(db *sql.DB, updates map[int]float64) error {
var b strings.Builder
b.WriteString("UPDATE products SET price = CASE id ")

ids := make([]interface{}, 0, len(updates)*2+len(updates))
for id, price := range updates {
b.WriteString("WHEN ? THEN ? ")
ids = append(ids, id, price)
}

b.WriteString("END WHERE id IN (")
for id := range updates {
b.WriteString("?,")
ids = append(ids, id)
}

// 移除末尾逗号
query := b.String()
if len(updates) > 0 {
query = query[:len(query)-1] + ")"
}

_, err := db.Exec(query, ids...)
return err
}

2. 正确配置连接池

1
2
3
4
5
6
7
db, _ := sql.Open("mysql", dsn)

// 优化连接池设置
db.SetMaxOpenConns(25) // 最大打开连接数
db.SetMaxIdleConns(10) // 最大空闲连接数
db.SetConnMaxLifetime(5 * time.Minute) // 连接最大生命周期
db.SetConnMaxIdleTime(2 * time.Minute) // 空闲连接最大保留时间

3. 避免 N+1 更新问题

1
2
3
4
5
6
7
// 差 ❌ - N+1 更新
for _, user := range users {
db.Exec("UPDATE users SET last_login = ? WHERE id = ?", now, user.ID)
}

// 优 ✅ - 批量更新
db.Exec("UPDATE users SET last_login = ? WHERE id IN (?)", now, ids)

安全最佳实践go

1. 参数化查询(防 SQL 注入)

1
2
3
4
5
6
// 危险:SQL 注入风险 ❌
id := "1; DROP TABLE users;--"
db.Exec("DELETE FROM products WHERE id = " + id)

// 安全:参数化查询 ✅
db.Exec("DELETE FROM products WHERE id = ?", id)

2. 最小权限原则

1
2
3
-- 创建专用操作账号
CREATE USER 'app_update'@'%' IDENTIFIED BY 'StrongPassword!123';
GRANT INSERT, UPDATE, DELETE ON mydb.* TO 'app_update'@'%';

3. 敏感操作审计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 在高危操作前记录日志
func deleteUserAccount(db *sql.DB, userID int, requester string) error {
log.Printf("%s 请求删除用户 %d", requester, userID)

// 执行删除
_, err := db.Exec("DELETE FROM users WHERE id = ?", userID)

if err != nil {
log.Printf("删除失败: %v", err)
return err
}

log.Printf("用户 %d 已被 %s 删除", userID, requester)
return nil
}

特殊场景处理

1. 更新 JSON 字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type ProductMetadata struct {
Color string `json:"color"`
Size string `json:"size"`
WeightKG int `json:"weight_kg"`
}

metadata := ProductMetadata{Color: "red", Size: "L", WeightKG: 1}
jsonData, _ := json.Marshal(metadata)

_, err := db.Exec(`
UPDATE products
SET metadata = ?
WHERE id = ?`,
jsonData,
productID,
)

2. 处理空值更新

1
2
3
4
5
6
7
8
9
10
11
12
var email *string  // 可能为 nil
if updateEmail {
email = &newEmail
}

_, err := db.Exec(`
UPDATE users
SET email = ?
WHERE id = ?`,
email, // 自动处理 NULL
userID,
)

3. UPSERT 操作

1
2
3
4
5
6
7
8
9
// MySQL 的 ON DUPLICATE KEY UPDATE
_, err := db.Exec(`
INSERT INTO user_settings (user_id, notifications_enabled)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE
notifications_enabled = VALUES(notifications_enabled)
`,
userID, enableNotifications,
)

完整示例:电商库存扣减服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main

import (
"context"
"database/sql"
"fmt"
"log"
"time"

_ "github.com/go-sql-driver/mysql"
)

type InventoryService struct {
db *sql.DB
}

func NewInventoryService(dsn string) (*InventoryService, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}

// 配置连接池
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(5 * time.Minute)
db.SetConnMaxIdleTime(2 * time.Minute)

// 测试连接
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("数据库连接失败: %w", err)
}

return &InventoryService{db: db}, nil
}

func (s *InventoryService) ReduceStock(ctx context.Context, productID, quantity int) error {
// 设置超时
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

// 开始事务
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}

// 获取并锁定库存行
var currentStock int
err = tx.QueryRowContext(ctx, `
SELECT stock FROM inventory
WHERE product_id = ? FOR UPDATE`,
productID,
).Scan(&currentStock)

if err == sql.ErrNoRows {
tx.Rollback()
return fmt.Errorf("产品不存在 (ID: %d)", productID)
} else if err != nil {
tx.Rollback()
return fmt.Errorf("查询库存失败: %w", err)
}

// 检查库存是否足够
if currentStock < quantity {
tx.Rollback()
return fmt.Errorf("库存不足,当前库存: %d,需求数量: %d",
currentStock, quantity)
}

// 更新库存
result, err := tx.ExecContext(ctx, `
UPDATE inventory
SET stock = stock - ?
WHERE product_id = ?`,
quantity,
productID,
)
if err != nil {
tx.Rollback()
return fmt.Errorf("更新库存失败: %w", err)
}

// 验证更新结果
if rowsAffected, _ := result.RowsAffected(); rowsAffected != 1 {
tx.Rollback()
return fmt.Errorf("库存更新异常")
}

// 记录库存变动
_, err = tx.ExecContext(ctx, `
INSERT INTO inventory_history
(product_id, quantity_change, type, created_at)
VALUES (?, ?, ?, ?)`,
productID, -quantity, "order_fulfillment", time.Now().UTC(),
)
if err != nil {
tx.Rollback()
return fmt.Errorf("记录历史失败: %w", err)
}

// 提交事务
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}

return nil
}

func (s *InventoryService) Close() {
s.db.Close()
}

func main() {
service, err := NewInventoryService("user:password@tcp(dbserver:3306)/inventory_db")
if err != nil {
log.Fatal(err)
}
defer service.Close()

ctx := context.Background()

// 扣减库存
if err := service.ReduceStock(ctx, 101, 3); err != nil {
log.Printf("库存扣减失败: %v", err)
} else {
log.Println("库存扣减成功")
}
}

总结:db.Exec() 最佳实践

  1. 始终使用参数化查询

    1
    2
    3
    4
    5
    // 错误 ❌ (SQL注入风险)
    db.Exec(fmt.Sprintf("DELETE FROM users WHERE id = %d", userID))

    // 正确 ✅
    db.Exec("DELETE FROM users WHERE id = ?", userID)
  2. 关键操作使用事务

    1
    2
    3
    4
    tx, _ := db.Begin()
    tx.Exec(...)
    tx.Exec(...)
    tx.Commit()
  3. 检查操作结果

    1
    2
    3
    if rowsAffected, _ := result.RowsAffected(); rowsAffected == 0 {
    // 处理未更新情况
    }
  4. 使用上下文控制

    1
    2
    3
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    db.ExecContext(ctx, ...)
  5. 连接池优化

    1
    2
    3
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(10)
    db.SetConnMaxLifetime(5*time.Minute)
  6. 批量操作处理

    • 小批量:参数化语句
    • 大批量:事务 + 预处理
  7. 错误分类处理

    1
    2
    3
    4
    5
    6
    if mysqlErr, ok := err.(*mysql.MySQLError); ok {
    switch mysqlErr.Number {
    case 1062: // 唯一约束
    case 1451: // 外键约束
    }
    }
  8. 资源及时关闭

    1
    2
    defer db.Close()
    defer stmt.Close()
  9. 使用预处理重用

    1
    2
    3
    stmt, _ := db.Prepare("UPDATE ...")
    defer stmt.Close()
    stmt.Exec(...)
  10. 日志审计

    1
    log.Printf("执行更新: %s", query)

八、go sql包中常用方法

database/sql 是 Go 标准库中用于数据库操作的核心包,提供了一套统一接口操作各种数据库。以下是该包中常用方法的全面总结和最佳实践指南。

核心接口与方法概览

1. 数据库连接管理

方法/函数 用途 示例
sql.Open 初始化数据库连接 db, err := sql.Open("mysql", dsn)
db.Ping() 验证连接是否有效 err := db.Ping()
db.Close() 关闭连接并释放资源 defer db.Close()
sql.DB 数据库连接池对象 db.SetMaxOpenConns(25)

2. 查询操作

方法 用途 返回类型 适用场景
db.Query() 执行返回多行的查询 *sql.Rows SELECT 返回多行结果
db.QueryRow() 执行返回单行的查询 *sql.Row SELECT 返回单行结果
db.QueryContext() 带上下文的查询 *sql.Rows 需要超时或取消的查询
db.QueryRowContext() 带上下文的单行查询 *sql.Row 需要超时或取消的单行查询

3. 非查询操作

方法 用途 返回类型 适用场景
db.Exec() 执行不返回行的操作 sql.Result INSERT, UPDATE, DELETE, DDL
db.ExecContext() 带上下文的非查询操作 sql.Result 需要超时或取消的写操作
sql.Result 操作结果接口 接口 获取 LastInsertId, RowsAffected

4. 预处理语句

方法 用途 返回类型
db.Prepare() 创建预处理语句 *sql.Stmt
db.PrepareContext() 带上下文的预处理 *sql.Stmt
stmt.Exec() 执行预编译语句 sql.Result
stmt.Query() 执行预编译查询 *sql.Rows
stmt.Close() 关闭预处理语句 -

5. 事务处理

方法 用途 返回类型
db.Begin() 开始事务 *sql.Tx
db.BeginTx() 带选项开始事务 *sql.Tx
tx.Exec() 在事务中执行操作 sql.Result
tx.Query() 在事务中执行查询 *sql.Rows
tx.Commit() 提交事务 -
tx.Rollback() 回滚事务 -

6. 连接池管理

方法 用途
db.SetMaxOpenConns() 设置最大打开连接数
db.SetMaxIdleConns() 设置最大空闲连接数
db.SetConnMaxLifetime() 设置连接最大生存时间
db.SetConnMaxIdleTime() 设置连接最大空闲时间
db.Stats() 获取连接池统计信息

Go database/sql 包中的常见类型详解

Go 的 database/sql 包提供了丰富的类型来处理数据库操作,理解这些核心类型对于编写高效、安全的数据库代码至关重要。以下是主要类型的详细解析:

核心结构类型

1. sql.DB - 数据库连接池

作用

  • 数据库操作的入口点
  • 管理连接池(包括连接创建、重用和关闭)

主要方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 执行不返回行的操作(INSERT, UPDATE, DELETE)
func (db *DB) Exec(query string, args ...interface{}) (Result, error)
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)

// 执行返回多行的查询
func (db *DB) Query(query string, args ...interface{}) (*Rows, error)
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)

// 执行返回单行的查询
func (db *DB) QueryRow(query string, args ...interface{}) *Row
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row

// 预处理语句
func (db *DB) Prepare(query string) (*Stmt, error)
func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error)

// 事务管理
func (db *DB) Begin() (*Tx, error)
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error)

连接池控制

1
2
3
4
5
6
7
8
9
10
// 连接池配置
db.SetMaxOpenConns(25) // 最大打开连接数
db.SetMaxIdleConns(10) // 最大空闲连接数
db.SetConnMaxLifetime(5 * time.Minute) // 连接最大生存时间
db.SetConnMaxIdleTime(2 * time.Minute) // 连接最大空闲时间

// 监控连接池状态
stats := db.Stats()
fmt.Printf("打开连接数: %d\n", stats.OpenConnections)
fmt.Printf("使用中连接数: %d\n", stats.InUse)

2. sql.Tx - 事务处理器

作用

  • 表示数据库事务
  • 提供在事务中执行操作的方法

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
func (tx *Tx) Exec(query string, args ...interface{}) (Result, error)
func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)

func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error)
func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)

func (tx *Tx) QueryRow(query string, args ...interface{}) *Row
func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row

func (tx *Tx) Prepare(query string) (*Stmt, error)
func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error)

func (tx *Tx) Stmt(stmt *Stmt) *Stmt // 转换预编译语句为事务语句

示例用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: false,
})
if err != nil {
return err
}

defer func() {
if p := recover(); p != nil {
tx.Rollback()
panic(p)
}
}()

// 执行事务操作
if _, err := tx.Exec("INSERT INTO ...", args...); err != nil {
tx.Rollback()
return err
}

if _, err := tx.Exec("UPDATE ...", args...); err != nil {
tx.Rollback()
return err
}

return tx.Commit()

3. sql.Stmt - 预编译语句

作用

  • 预编译的 SQL 语句,可重复执行
  • 提高执行效率和安全性

核心方法

1
2
3
4
5
6
7
8
9
10
func (s *Stmt) Exec(args ...interface{}) (Result, error)
func (s *Stmt) ExecContext(ctx context.Context, args ...interface{}) (Result, error)

func (s *Stmt) Query(args ...interface{}) (*Rows, error)
func (s *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*Rows, error)

func (s *Stmt) QueryRow(args ...interface{}) *Row
func (s *Stmt) QueryRowContext(ctx context.Context, args ...interface{}) *Row

func (s *Stmt) Close() error // 关闭预编译语句

使用模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 全局准备预编译语句
var createUserStmt *sql.Stmt

func init() {
var err error
createUserStmt, err = db.Prepare(`
INSERT INTO users (name, email)
VALUES (?, ?)`)
if err != nil {
panic("准备语句失败: " + err.Error())
}
}

// 使用预编译语句
func createUser(name, email string) error {
_, err := createUserStmt.Exec(name, email)
return err
}

// 应用退出时关闭
defer createUserStmt.Close()

4. sql.Rows - 多行结果集

作用

  • 表示 SELECT 查询返回的多行结果
  • 支持迭代处理每一行数据

核心方法

1
2
3
4
5
6
7
func (rs *Rows) Close() error           // 关闭结果集(必须调用)
func (rs *Rows) ColumnTypes() ([]*ColumnType, error) // 获取列类型信息
func (rs *Rows) Columns() ([]string, error) // 获取列名
func (rs *Rows) Err() error // 获取迭代过程中的错误
func (rs *Rows) Next() bool // 移动到下一行
func (rs *Rows) NextResultSet() bool // 移动到下一个结果集
func (rs *Rows) Scan(dest ...interface{}) error // 扫描当前行到变量

完整使用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
rows, err := db.Query("SELECT id, name FROM products")
if err != nil {
return err
}
defer rows.Close() // 确保关闭

var products []Product
for rows.Next() {
var p Product
if err := rows.Scan(&p.ID, &p.Name); err != nil {
log.Printf("扫描错误: %v", err)
continue
}
products = append(products, p)
}

if err := rows.Err(); err != nil {
return fmt.Errorf("遍历出错: %w", err)
}

5. sql.Row - 单行结果处理器

作用

  • 表示 SELECT 查询返回的单行结果
  • 自动处理资源释放(无需 Close)

核心方法

1
func (r *Row) Scan(dest ...interface{}) error

使用模式

1
2
3
4
5
6
7
8
9
10
11
12
13
func getUserName(db *sql.DB, id int) (string, error) {
var name string
err := db.QueryRow("SELECT name FROM users WHERE id = ?", id).Scan(&name)

switch {
case err == sql.ErrNoRows:
return "", fmt.Errorf("用户不存在 (ID: %d)", id)
case err != nil:
return "", fmt.Errorf("查询失败: %w", err)
default:
return name, nil
}
}

6. sql.Result - 操作结果

作用

  • 表示 Exec 操作的结果(INSERT, UPDATE, DELETE)
  • 提供受影响行数和最后插入 ID 信息

核心方法

1
2
3
4
type Result interface {
LastInsertId() (int64, error) // 获取最后插入行的自动生成 ID
RowsAffected() (int64, error) // 获取受影响的行数
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
result, err := db.Exec(`
UPDATE users
SET login_count = login_count + 1
WHERE id = ?`,
userID,
)
if err != nil {
return err
}

// 获取影响行数验证操作是否成功
if affected, _ := result.RowsAffected(); affected == 0 {
return fmt.Errorf("用户不存在 (ID: %d)", userID)
}

// 插入操作获取自增ID
if id, err := result.LastInsertId(); err == nil {
fmt.Println("新建ID:", id)
}

数据处理类型

1. 可空类型(处理 NULL 值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 允许为空的字符串
type NullString struct {
String string
Valid bool // 是否非空
}

// 允许为空的时间
type NullTime struct {
Time time.Time
Valid bool
}

// 允许为空的整数
type NullInt64 struct {
Int64 int64
Valid bool
}

// 允许为空的浮点数
type NullFloat64 struct {
Float64 float64
Valid bool
}

// 允许为空的布尔值
type NullBool struct {
Bool bool
Valid bool
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type UserProfile struct {
Bio sql.NullString
BirthDate sql.NullTime
RepScore sql.NullFloat64
}

func getProfile(userID int) (UserProfile, error) {
var profile UserProfile

err := db.QueryRow(`
SELECT bio, birth_date, reputation
FROM profiles
WHERE user_id = ?`,
userID,
).Scan(
&profile.Bio,
&profile.BirthDate,
&profile.RepScore,
)

if err != nil {
return UserProfile{}, err
}

// 使用可空值
if profile.Bio.Valid {
fmt.Println("个人简介:", profile.Bio.String)
} else {
fmt.Println("未设置简介")
}

return profile, nil
}

2. sql.ColumnType - 列元数据类型

作用

  • 提供查询结果的列元数据信息

主要方法

1
2
3
4
5
6
func (ci *ColumnType) DatabaseTypeName() string      // 数据库类型名
func (ci *ColumnType) DecimalSize() (int64, int64, bool) // 小数精度
func (ci *ColumnType) Length() (int64, bool) // 最大长度
func (ci *ColumnType) Name() string // 列名
func (ci *ColumnType) Nullable() (bool, bool) // 是否允许 NULL
func (ci *ColumnType) ScanType() reflect.Type // Go 扫描类型

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
rows, err := db.Query("SELECT * FROM products")
if err != nil {
return err
}
defer rows.Close()

colTypes, err := rows.ColumnTypes()
if err != nil {
return err
}

for _, colType := range colTypes {
name := colType.Name()
dbType := colType.DatabaseTypeName()
nullable, _ := colType.Nullable()

length, hasLength := colType.Length()
if hasLength {
log.Printf("列 %s: 类型 %s, 长度 %d, 可空 %v",
name, dbType, length, nullable)
} else {
log.Printf("列 %s: 类型 %s, 可空 %v",
name, dbType, nullable)
}
}

错误类型与常量

1. 特殊错误

1
2
3
4
5
6
7
var (
// 没有查询到行
ErrNoRows = errors.New("sql: no rows in result set")

// 事务已经完成(已提交或回滚)
ErrTxDone = errors.New("sql: Transaction has already been committed or rolled back")
)

错误处理模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
err := db.QueryRow("SELECT ...").Scan(...)
if errors.Is(err, sql.ErrNoRows) {
// 特殊处理无结果情况
} else if err != nil {
// 其他错误
}

// MySQL 特定错误处理
if mysqlErr, ok := err.(*mysql.MySQLError); ok {
switch mysqlErr.Number {
case 1062: // 唯一约束冲突
case 1213: // 死锁
case 1451: // 外键约束违反
// ...
}
}

2. 事务隔离级别常量

1
2
3
4
5
6
7
8
9
10
11
12
type IsolationLevel int

const (
LevelDefault IsolationLevel = iota
LevelReadUncommitted
LevelReadCommitted
LevelWriteCommitted
LevelRepeatableRead
LevelSnapshot
LevelSerializable
LevelLinearizable
)

使用方式

1
2
3
4
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: false,
})

接口类型

1. sql.Scanner - 自定义扫描接口

1
2
3
4
type Scanner interface {
// 扫描数据库值到实现类型
Scan(src interface{}) error
}

实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type JSONData struct {
Field1 string `json:"field1"`
Field2 int `json:"field2"`
}

func (j *JSONData) Scan(src interface{}) error {
var data []byte
switch v := src.(type) {
case string:
data = []byte(v)
case []byte:
data = v
default:
return fmt.Errorf("不支持的JSON类型: %T", src)
}
return json.Unmarshal(data, j)
}

// 使用
var config JSONData
err := db.QueryRow("SELECT config FROM settings").Scan(&config)

2. sql.Valuer - 自定义值转换接口

1
2
3
4
type Valuer interface {
// 将Go值转换为数据库支持的值
Value() (Value, error)
}

实现示例

1
2
3
4
5
6
7
8
9
10
11
12
type BoolAsInt bool

func (b BoolAsInt) Value() (driver.Value, error) {
if b {
return int64(1), nil
}
return int64(0), nil
}

// 使用
isActive := BoolAsInt(true)
_, err := db.Exec("INSERT INTO settings (is_active) VALUES (?)", isActive)

连接池统计结构

sql.DBStats - 连接池统计信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type DBStats struct {
MaxOpenConnections int // 最大打开连接数

// 连接池状态
OpenConnections int // 当前打开的连接数
InUse int // 正在使用的连接数
Idle int // 空闲连接数

// 连接等待状态
WaitCount int64 // 等待获取连接的总次数
WaitDuration time.Duration // 等待连接的总时间
MaxIdleClosed int64 // 因为超过MaxIdleConns而关闭的连接总数
MaxIdleTimeClosed int64 // 因为超过ConnMaxIdleTime而关闭的连接总数
MaxLifetimeClosed int64 // 因为超过ConnMaxLifetime而关闭的连接总数
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 定时获取连接池状态
go func() {
ticker := time.NewTicker(1 * time.Minute)
for range ticker.C {
stats := db.Stats()
log.Printf(`
数据库连接池状态:
最大打开: %d
当前打开: %d
使用中: %d
空闲: %d
等待次数: %d
等待时间: %v`,
stats.MaxOpenConnections,
stats.OpenConnections,
stats.InUse,
stats.Idle,
stats.WaitCount,
stats.WaitDuration,
)
}
}()

实战类型使用模式

1. 安全查询和扫描模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func queryProducts(db *sql.DB, category string) ([]Product, error) {
const query = `
SELECT id, name, price, quantity
FROM products
WHERE category = ? AND available = 1`

rows, err := db.Query(query, category)
if err != nil {
return nil, fmt.Errorf("查询失败: %w", err)
}
defer rows.Close()

var products []Product
for rows.Next() {
var p Product
if err := rows.Scan(
&p.ID,
&p.Name,
&p.Price,
&p.Quantity,
); err != nil {
log.Printf("扫描错误: %v", err)
continue
}
products = append(products, p)
}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("结果集错误: %w", err)
}

return products, nil
}

2. 高级事务处理模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func completeOrder(db *sql.DB, orderID int) error {
// 开启事务
tx, err := db.BeginTx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}

// 确保事务结束
var succeeded bool
defer func() {
if !succeeded {
if rErr := tx.Rollback(); rErr != nil {
log.Printf("回滚失败: %v", rErr)
}
}
}()

// 标记订单为完成
if _, err := tx.Exec(`
UPDATE orders
SET status = 'completed'
WHERE id = ? AND status = 'pending'`,
orderID,
); err != nil {
return fmt.Errorf("更新订单状态失败: %w", err)
}

// 减少库存
rows, err := tx.Query(`
SELECT product_id, quantity
FROM order_items
WHERE order_id = ?`,
orderID,
)
if err != nil {
return fmt.Errorf("查询订单项失败: %w", err)
}
defer rows.Close()

for rows.Next() {
var productID, quantity int
if err := rows.Scan(&productID, &quantity); err != nil {
return fmt.Errorf("扫描订单项失败: %w", err)
}

if _, err := tx.Exec(`
UPDATE inventory
SET quantity = quantity - ?
WHERE product_id = ? AND quantity >= ?`,
quantity, productID, quantity,
); err != nil {
return fmt.Errorf("更新库存失败: %w", err)
}
}

if err := rows.Err(); err != nil {
return fmt.Errorf("处理订单项错误: %w", err)
}

// 提交事务
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}

succeeded = true
return nil