首先需要安装 Python 操作 MySQL 的库,最常用的是 mysql-connector-python(官方库)或 pymysql,这里以官方库为例:

1

pip install mysql-connector-python

二、完整实现步骤

1. 先在 MySQL 中创建测试存储过程

首先我们需要一个可测试的存储过程,比如创建一个根据用户 ID 查询用户信息的存储过程:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

-- 先创建测试表(可选)

CREATE TABLE IF NOT EXISTS users (

    id INT PRIMARY KEY AUTO_INCREMENT,

    name VARCHAR(50) NOT NULL,

    age INT,

    email VARCHAR(100)

);

-- 插入测试数据

INSERT INTO users (name, age, email) VALUES

('张三', 25, 'zhangsan@test.com'),

('李四', 30, 'lisi@test.com');

-- 创建存储过程:根据ID查询用户信息

DELIMITER //  -- 临时修改语句结束符为//,避免与存储过程内的;冲突

CREATE PROCEDURE get_user_by_id(IN user_id INT, OUT user_name VARCHAR(50), OUT user_age INT)

BEGIN

    SELECT name, age INTO user_name, user_age FROM users WHERE id = user_id;

END //

DELIMITER ;  -- 恢复语句结束符为;

-- 另一个测试存储过程:无参数,查询所有用户

CREATE PROCEDURE get_all_users()

BEGIN

    SELECT * FROM users;

END;

2. Python 调用存储过程的代码实现

下面是 Python 调用存储过程的完整代码,包含两种常见场景:带输入/输出参数的存储过程无参数的存储过程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

import mysql.connector

from mysql.connector import Error

def call_mysql_procedure():

    # 数据库连接配置

    config = {

        'host': 'localhost',       # 数据库地址

        'user': 'root',            # 用户名

        'password': '你的数据库密码', # 密码

        'database': 'test_db'      # 数据库名(替换为你的库名)

    }

    connection = None

    try:

        # 1. 建立数据库连接

        connection = mysql.connector.connect(**config)

        if connection.is_connected():

            cursor = connection.cursor()

            # ========== 场景1:调用带输入/输出参数的存储过程 ==========

            # 定义输入参数和输出参数

            user_id = 1  # 输入参数

            user_name = None  # 输出参数(初始化为None)

            user_age = None   # 输出参数(初始化为None)

            # 调用存储过程:callproc(存储过程名, (参数列表))

            cursor.callproc('get_user_by_id', (user_id, user_name, user_age))

            # 获取输出参数(存储过程执行后,参数会被更新,存在cursor.stored_results()中)

            for result in cursor.stored_results():

                # 或者直接通过 cursor.outputs 获取(不同版本可能略有差异)

                output_params = result.fetchone()

                if output_params:

                    user_name, user_age = output_params

            print(f"场景1 - 查询ID为{user_id}的用户:")

            print(f"姓名:{user_name},年龄:{user_age}\n")

            # ========== 场景2:调用无参数的存储过程(返回结果集) ==========

            print("场景2 - 查询所有用户:")

            cursor.callproc('get_all_users'# 无参数,传入空元组或省略

            # 遍历结果集

            for result in cursor.stored_results():

                users = result.fetchall()  # 获取所有结果

                # 打印表头

                columns = [desc[0] for desc in result.description]

                print("\t".join(columns))

                # 打印数据

                for user in users:

                    print("\t".join(str(col) for col in user))

            # 提交事务(如果存储过程有修改操作,必须提交)

            connection.commit()

    except Error as e:

        print(f"数据库操作出错:{e}")

        # 出错时回滚事务

        if connection:

            connection.rollback()

    finally:

        # 关闭游标和连接

        if connection and connection.is_connected():

            if cursor:

                cursor.close()

            connection.close()

            print("\n数据库连接已关闭")

if __name__ == "__main__":

    call_mysql_procedure()

复制讲解

三、关键代码解释

1.连接数据库:通过 mysql.connector.connect() 传入配置参数建立连接,注意替换为你的数据库地址、用户名、密码和库名。

2.调用存储过程核心方法cursor.callproc(proc_name, params)

  • proc_name:存储过程名称(字符串)。
  • params:参数列表(元组),顺序需与存储过程的参数(IN/OUT/INOUT)一一对应。

3.获取输出参数/结果集

  • 带输出参数的存储过程:执行 callproc 后,通过 cursor.stored_results() 遍历结果,获取输出参数的值。

更多推荐