我来教你用Python+MySQL制作一个完整的问卷调查结果收集与统计工具。

第一步:创建数据库和表

CREATE DATABASE IF NOT EXISTS survey_db;
USE survey_db;

-- 问卷表
CREATE TABLE IF NOT EXISTS surveys (
    id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(200) NOT NULL COMMENT '问卷标题',
    description TEXT COMMENT '问卷说明',
    creator VARCHAR(100) COMMENT '创建者',
    status ENUM('draft', 'active', 'closed') DEFAULT 'draft' COMMENT '状态',
    start_time DATETIME COMMENT '开始时间',
    end_time DATETIME COMMENT '结束时间',
    response_limit INT DEFAULT 0 COMMENT '答卷数量限制(0不限)',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- 题目表
CREATE TABLE IF NOT EXISTS questions (
    id INT AUTO_INCREMENT PRIMARY KEY,
    survey_id INT NOT NULL COMMENT '所属问卷ID',
    question_text TEXT NOT NULL COMMENT '题目内容',
    question_type ENUM('single', 'multiple', 'text', 'rating', 'scale') NOT NULL COMMENT '题型',
    options JSON COMMENT '选项(JSON数组)',
    required BOOLEAN DEFAULT TRUE COMMENT '是否必答',
    sort_order INT DEFAULT 0 COMMENT '排序',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (survey_id) REFERENCES surveys(id) ON DELETE CASCADE
);

-- 答卷表
CREATE TABLE IF NOT EXISTS responses (
    id INT AUTO_INCREMENT PRIMARY KEY,
    survey_id INT NOT NULL COMMENT '问卷ID',
    respondent_name VARCHAR(100) COMMENT '受访者姓名',
    respondent_email VARCHAR(100) COMMENT '受访者邮箱',
    submit_time DATETIME NOT NULL COMMENT '提交时间',
    ip_address VARCHAR(45) COMMENT 'IP地址',
    duration INT COMMENT '填写时长(秒)',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (survey_id) REFERENCES surveys(id) ON DELETE CASCADE
);

-- 答案表
CREATE TABLE IF NOT EXISTS answers (
    id INT AUTO_INCREMENT PRIMARY KEY,
    response_id INT NOT NULL COMMENT '答卷ID',
    question_id INT NOT NULL COMMENT '题目ID',
    answer_value TEXT COMMENT '答案内容',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (response_id) REFERENCES responses(id) ON DELETE CASCADE,
    FOREIGN KEY (question_id) REFERENCES questions(id) ON DELETE CASCADE
);

完整代码实现

import pymysql
import json
from datetime import datetime, date
from prettytable import PrettyTable
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import uuid
import csv
matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
matplotlib.rcParams['axes.unicode_minus'] = False

class SurveyTool:
    def __init__(self):
        """初始化问卷工具"""
        self.current_survey = None
        
        self.db_config = {
            'host': 'localhost',
            'user': 'root',
            'password': 'your_password',  # 修改为你的密码
            'database': 'survey_db',
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor
        }

    def connect_db(self):
        """连接数据库"""
        self.connection = pymysql.connect(**self.db_config)
        self.cursor = self.connection.cursor()

    def close_db(self):
        """关闭数据库连接"""
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()

    # ========== 问卷管理 ==========

    def create_survey(self):
        """创建问卷"""
        print("\n--- 📝 创建新问卷 ---")
        
        title = input("问卷标题: ").strip()
        if not title:
            print("❌ 标题不能为空")
            return
        
        description = input("问卷说明: ").strip()
        creator = input("创建者: ").strip()
        
        # 添加题目
        questions = []
        print("\n📋 开始添加题目 (输入空题号结束)")
        
        q_num = 1
        while True:
            print(f"\n--- 第{q_num}题 ---")
            q_text = input("题目内容 (直接回车结束): ").strip()
            if not q_text:
                break
            
            print("题型选择:")
            print("1. 单选题 (single)")
            print("2. 多选题 (multiple)")
            print("3. 文本题 (text)")
            print("4. 评分题 (rating)")
            print("5. 量表题 (scale)")
            
            type_map = {'1': 'single', '2': 'multiple', '3': 'text', '4': 'rating', '5': 'scale'}
            q_type = type_map.get(input("请选择 (1-5): ").strip(), 'text')
            
            options = []
            if q_type in ['single', 'multiple']:
                print("请输入选项 (每行一个,输入空行结束):")
                while True:
                    option = input(f"  选项{len(options)+1}: ").strip()
                    if not option:
                        break
                    options.append(option)
            
            required = input("是否必答? (y/n, 默认y): ").strip().lower() != 'n'
            
            questions.append({
                'text': q_text,
                'type': q_type,
                'options': options,
                'required': required,
                'sort_order': q_num
            })
            q_num += 1
        
        if not questions:
            print("❌ 至少需要一个题目")
            return
        
        # 保存到数据库
        self.connect_db()
        try:
            sql = """INSERT INTO surveys (title, description, creator, status) 
                     VALUES (%s, %s, %s, 'draft')"""
            self.cursor.execute(sql, (title, description, creator))
            survey_id = self.cursor.lastrowid
            
            for q in questions:
                sql = """INSERT INTO questions (survey_id, question_text, question_type, 
                         options, required, sort_order) 
                         VALUES (%s, %s, %s, %s, %s, %s)"""
                self.cursor.execute(sql, (
                    survey_id, q['text'], q['type'],
                    json.dumps(q['options'], ensure_ascii=False),
                    q['required'], q['sort_order']
                ))
            
            self.connection.commit()
            print(f"\n✅ 问卷创建成功! ID: {survey_id}")
            print(f"📊 共 {len(questions)} 道题目")
            
        except Exception as e:
            print(f"❌ 创建失败: {e}")
            self.connection.rollback()
        finally:
            self.close_db()

    def list_surveys(self):
        """列出所有问卷"""
        self.connect_db()
        sql = """SELECT s.*, 
                 (SELECT COUNT(*) FROM questions WHERE survey_id = s.id) as question_count,
                 (SELECT COUNT(*) FROM responses WHERE survey_id = s.id) as response_count
                 FROM surveys s ORDER BY s.created_at DESC"""
        self.cursor.execute(sql)
        surveys = self.cursor.fetchall()
        self.close_db()
        
        if not surveys:
            print("📭 没有问卷")
            return
        
        table = PrettyTable()
        table.field_names = ["ID", "标题", "题目数", "答卷数", "状态", "创建时间"]
        
        status_icons = {'draft': '📝草稿', 'active': '🟢进行中', 'closed': '🔴已结束'}
        
        for s in surveys:
            table.add_row([
                s['id'],
                s['title'][:20] + '..' if len(s['title']) > 22 else s['title'],
                s['question_count'],
                s['response_count'],
                status_icons.get(s['status'], s['status']),
                s['created_at'].strftime('%Y-%m-%d')
            ])
        
        print(f"\n📋 共 {len(surveys)} 份问卷:")
        print(table)

    def edit_survey(self):
        """编辑问卷"""
        print("\n--- ✏️ 编辑问卷 ---")
        
        survey_id = input("请输入问卷ID: ").strip()
        if not survey_id.isdigit():
            print("❌ 无效的ID")
            return
        
        self.connect_db()
        sql = "SELECT * FROM surveys WHERE id = %s"
        self.cursor.execute(sql, (int(survey_id),))
        survey = self.cursor.fetchone()
        
        if not survey:
            print("❌ 未找到该问卷")
            self.close_db()
            return
        
        if survey['status'] == 'active':
            print("⚠️ 问卷已发布,只能修改标题和说明")
        
        print(f"\n当前标题: {survey['title']}")
        title = input(f"新标题 (回车保持不变): ").strip() or survey['title']
        
        print(f"当前说明: {survey['description']}")
        description = input(f"新说明 (回车保持不变): ").strip() or survey['description']
        
        sql = "UPDATE surveys SET title=%s, description=%s WHERE id=%s"
        self.cursor.execute(sql, (title, description, int(survey_id)))
        self.connection.commit()
        print("✅ 问卷信息已更新")
        
        # 如果是草稿状态,可以编辑题目
        if survey['status'] == 'draft':
            edit_q = input("\n是否编辑题目? (y/n): ").strip().lower()
            if edit_q == 'y':
                self._edit_questions(int(survey_id))
        
        self.close_db()

    def _edit_questions(self, survey_id):
        """编辑题目"""
        sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
        self.cursor.execute(sql, (survey_id,))
        questions = self.cursor.fetchall()
        
        print(f"\n当前共 {len(questions)} 道题目:")
        for q in questions:
            print(f"  {q['sort_order']}. [{q['question_type']}] {q['question_text'][:30]}")
        
        print("\n操作选项:")
        print("1. 添加题目")
        print("2. 删除题目")
        print("3. 修改题目")
        
        choice = input("请选择 (1-3): ").strip()
        
        if choice == '1':
            self._add_question(survey_id, len(questions) + 1)
        elif choice == '2':
            q_id = input("请输入要删除的题目ID: ").strip()
            if q_id.isdigit():
                self.cursor.execute("DELETE FROM questions WHERE id = %s AND survey_id = %s", 
                                   (int(q_id), survey_id))
                self.connection.commit()
                print("✅ 题目已删除")
        elif choice == '3':
            # 简化版:重新创建题目
            print("功能开发中...")

    def _add_question(self, survey_id, sort_order):
        """添加题目"""
        q_text = input("题目内容: ").strip()
        if not q_text:
            return
        
        print("题型: 1.单选 2.多选 3.文本 4.评分 5.量表")
        type_map = {'1': 'single', '2': 'multiple', '3': 'text', '4': 'rating', '5': 'scale'}
        q_type = type_map.get(input("请选择: ").strip(), 'text')
        
        options = []
        if q_type in ['single', 'multiple']:
            print("输入选项 (每行一个,空行结束):")
            while True:
                opt = input(f"  选项{len(options)+1}: ").strip()
                if not opt:
                    break
                options.append(opt)
        
        required = input("是否必答? (y/n): ").strip().lower() != 'n'
        
        sql = """INSERT INTO questions (survey_id, question_text, question_type, 
                 options, required, sort_order) VALUES (%s, %s, %s, %s, %s, %s)"""
        self.cursor.execute(sql, (survey_id, q_text, q_type, 
                                 json.dumps(options, ensure_ascii=False), required, sort_order))
        self.connection.commit()
        print("✅ 题目已添加")

    def publish_survey(self):
        """发布/关闭问卷"""
        print("\n--- 🚀 发布/管理问卷 ---")
        
        survey_id = input("请输入问卷ID: ").strip()
        if not survey_id.isdigit():
            print("❌ 无效的ID")
            return
        
        self.connect_db()
        sql = "SELECT * FROM surveys WHERE id = %s"
        self.cursor.execute(sql, (int(survey_id),))
        survey = self.cursor.fetchone()
        
        if not survey:
            print("❌ 未找到该问卷")
            self.close_db()
            return
        
        print(f"\n当前状态: {survey['status']}")
        print("1. 发布问卷 (active)")
        print("2. 关闭问卷 (closed)")
        print("3. 设为草稿 (draft)")
        
        choice = input("请选择 (1-3): ").strip()
        status_map = {'1': 'active', '2': 'closed', '3': 'draft'}
        new_status = status_map.get(choice)
        
        if new_status:
            sql = "UPDATE surveys SET status = %s WHERE id = %s"
            self.cursor.execute(sql, (new_status, int(survey_id)))
            self.connection.commit()
            print(f"✅ 问卷状态已更新为: {new_status}")
        else:
            print("❌ 无效选项")
        
        self.close_db()

    def delete_survey(self):
        """删除问卷"""
        print("\n--- 🗑️ 删除问卷 ---")
        
        survey_id = input("请输入问卷ID: ").strip()
        if not survey_id.isdigit():
            print("❌ 无效的ID")
            return
        
        confirm = input(f"确认删除问卷 ID={survey_id} 及其所有数据? (y/n): ").strip().lower()
        if confirm != 'y':
            return
        
        self.connect_db()
        try:
            # 级联删除会处理关联数据
            self.cursor.execute("DELETE FROM surveys WHERE id = %s", (int(survey_id),))
            self.connection.commit()
            print("✅ 问卷已删除")
        except Exception as e:
            print(f"❌ 删除失败: {e}")
            self.connection.rollback()
        finally:
            self.close_db()

    # ========== 问卷填写 ==========

    def take_survey(self):
        """填写问卷"""
        print("\n--- 📝 填写问卷 ---")
        
        survey_id = input("请输入问卷ID: ").strip()
        if not survey_id.isdigit():
            print("❌ 无效的ID")
            return
        
        self.connect_db()
        sql = "SELECT * FROM surveys WHERE id = %s AND status = 'active'"
        self.cursor.execute(sql, (int(survey_id),))
        survey = self.cursor.fetchone()
        
        if not survey:
            print("❌ 问卷不存在或未开放")
            self.close_db()
            return
        
        # 获取题目
        sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
        self.cursor.execute(sql, (int(survey_id),))
        questions = self.cursor.fetchall()
        
        if not questions:
            print("❌ 问卷没有题目")
            self.close_db()
            return
        
        print(f"\n📋 {survey['title']}")
        if survey['description']:
            print(f"📄 {survey['description']}")
        print(f"📊 共 {len(questions)} 道题")
        print("=" * 40)
        
        # 受访者信息
        respondent_name = input("\n您的姓名 (可选): ").strip()
        respondent_email = input("您的邮箱 (可选): ").strip()
        
        start_time = datetime.now()
        answers = []
        
        # 逐题作答
        for q in questions:
            print(f"\n第{q['sort_order']}题: {q['question_text']}")
            if q['required']:
                print("  (必答)")
            
            options = json.loads(q['options']) if q['options'] else []
            
            if q['question_type'] == 'single':
                for i, opt in enumerate(options, 1):
                    print(f"  {i}. {opt}")
                
                while True:
                    choice = input("请选择 (输入编号): ").strip()
                    if choice.isdigit() and 1 <= int(choice) <= len(options):
                        answers.append({'question_id': q['id'], 'answer': options[int(choice)-1]})
                        break
                    elif not q['required'] and not choice:
                        answers.append({'question_id': q['id'], 'answer': ''})
                        break
                    print("❌ 无效选择")
            
            elif q['question_type'] == 'multiple':
                for i, opt in enumerate(options, 1):
                    print(f"  {i}. {opt}")
                
                choices = input("请选择 (多个用逗号分隔,如1,3): ").strip()
                selected = []
                if choices:
                    for c in choices.split(','):
                        c = c.strip()
                        if c.isdigit() and 1 <= int(c) <= len(options):
                            selected.append(options[int(c)-1])
                answers.append({'question_id': q['id'], 'answer': ','.join(selected)})
            
            elif q['question_type'] == 'rating':
                print("  评分: 1-5星")
                while True:
                    rating = input("请评分 (1-5): ").strip()
                    if rating.isdigit() and 1 <= int(rating) <= 5:
                        answers.append({'question_id': q['id'], 'answer': rating})
                        break
                    elif not q['required'] and not rating:
                        answers.append({'question_id': q['id'], 'answer': ''})
                        break
                    print("❌ 请输入1-5的数字")
            
            elif q['question_type'] == 'scale':
                print("  量表: 1(非常不满意) - 10(非常满意)")
                while True:
                    scale = input("请打分 (1-10): ").strip()
                    if scale.isdigit() and 1 <= int(scale) <= 10:
                        answers.append({'question_id': q['id'], 'answer': scale})
                        break
                    elif not q['required'] and not scale:
                        answers.append({'question_id': q['id'], 'answer': ''})
                        break
                    print("❌ 请输入1-10的数字")
            
            else:  # text
                text_answer = input("请输入您的回答: ").strip()
                if q['required'] and not text_answer:
                    print("❌ 此题必答")
                    text_answer = input("请输入您的回答: ").strip()
                answers.append({'question_id': q['id'], 'answer': text_answer})
        
        # 计算填写时长
        end_time = datetime.now()
        duration = int((end_time - start_time).total_seconds())
        
        # 保存答卷
        try:
            sql = """INSERT INTO responses (survey_id, respondent_name, respondent_email, 
                     submit_time, duration) VALUES (%s, %s, %s, %s, %s)"""
            self.cursor.execute(sql, (int(survey_id), respondent_name, respondent_email, 
                                     end_time, duration))
            response_id = self.cursor.lastrowid
            
            for ans in answers:
                sql = "INSERT INTO answers (response_id, question_id, answer_value) VALUES (%s, %s, %s)"
                self.cursor.execute(sql, (response_id, ans['question_id'], ans['answer']))
            
            self.connection.commit()
            print(f"\n✅ 问卷提交成功! 感谢您的参与!")
            print(f"⏱️  填写用时: {duration}秒")
            
        except Exception as e:
            print(f"❌ 提交失败: {e}")
            self.connection.rollback()
        finally:
            self.close_db()

    # ========== 数据统计 ==========

    def view_statistics(self):
        """查看统计数据"""
        print("\n--- 📊 问卷统计 ---")
        
        survey_id = input("请输入问卷ID: ").strip()
        if not survey_id.isdigit():
            print("❌ 无效的ID")
            return
        
        self.connect_db()
        
        # 获取问卷信息
        sql = "SELECT * FROM surveys WHERE id = %s"
        self.cursor.execute(sql, (int(survey_id),))
        survey = self.cursor.fetchone()
        
        if not survey:
            print("❌ 未找到该问卷")
            self.close_db()
            return
        
        # 获取题目
        sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
        self.cursor.execute(sql, (int(survey_id),))
        questions = self.cursor.fetchall()
        
        # 获取答卷数量
        sql = "SELECT COUNT(*) as count FROM responses WHERE survey_id = %s"
        self.cursor.execute(sql, (int(survey_id),))
        response_count = self.cursor.fetchone()['count']
        
        print(f"\n📊 {survey['title']} 统计报告")
        print(f"📝 总答卷数: {response_count}")
        print(f"📅 创建时间: {survey['created_at'].strftime('%Y-%m-%d %H:%M')}")
        print("=" * 50)
        
        if response_count == 0:
            print("📭 暂无答卷数据")
            self.close_db()
            return
        
        # 逐题统计
        for q in questions:
            print(f"\n📌 第{q['sort_order']}题: {q['question_text']}")
            
            options = json.loads(q['options']) if q['options'] else []
            
            if q['question_type'] in ['single', 'multiple']:
                # 选择题统计
                sql = """SELECT answer_value, COUNT(*) as count 
                         FROM answers WHERE question_id = %s 
                         GROUP BY answer_value ORDER BY count DESC"""
                self.cursor.execute(sql, (q['id'],))
                stats = self.cursor.fetchall()
                
                if stats:
                    table = PrettyTable()
                    table.field_names = ["选项", "票数", "比例", "条形图"]
                    
                    total = sum(s['count'] for s in stats)
                    for s in stats:
                        pct = (s['count'] / total) * 100
                        bar = '█' * int(pct / 5)
                        table.add_row([s['answer_value'], s['count'], f"{pct:.1f}%", bar])
                    
                    print(table)
            
            elif q['question_type'] == 'rating':
                # 评分统计
                sql = """SELECT answer_value, COUNT(*) as count 
                         FROM answers WHERE question_id = %s 
                         GROUP BY answer_value ORDER BY answer_value"""
                self.cursor.execute(sql, (q['id'],))
                ratings = self.cursor.fetchall()
                
                if ratings:
                    avg_rating = sum(int(r['answer_value']) * r['count'] for r in ratings) / \
                                sum(r['count'] for r in ratings)
                    stars = '⭐' * round(avg_rating)
                    print(f"平均评分: {avg_rating:.1f} {stars}")
                    
                    for r in ratings:
                        bar = '█' * (int(r['count']) * 2)
                        print(f"  {r['answer_value']}星: {bar} ({r['count']}人)")
            
            elif q['question_type'] == 'scale':
                # 量表统计
                sql = """SELECT answer_value, COUNT(*) as count 
                         FROM answers WHERE question_id = %s 
                         GROUP BY answer_value ORDER BY answer_value"""
                self.cursor.execute(sql, (q['id'],))
                scales = self.cursor.fetchall()
                
                if scales:
                    avg_scale = sum(int(s['answer_value']) * s['count'] for s in scales) / \
                               sum(s['count'] for s in scales)
                    print(f"平均分: {avg_scale:.1f}/10")
                    
                    for s in scales:
                        bar = '█' * (int(s['count']) * 2)
                        print(f"  {s['answer_value']}分: {bar} ({s['count']}人)")
            
            else:  # text
                # 文本题显示所有回答
                sql = "SELECT answer_value FROM answers WHERE question_id = %s AND answer_value != ''"
                self.cursor.execute(sql, (q['id'],))
                texts = self.cursor.fetchall()
                
                if texts:
                    print(f"共 {len(texts)} 条文本回答:")
                    for i, t in enumerate(texts[:10], 1):
                        print(f"  {i}. {t['answer_value'][:50]}")
                    if len(texts) > 10:
                        print(f"  ... 还有 {len(texts)-10} 条回答")
        
        self.close_db()

    def generate_charts(self):
        """生成统计图表"""
        print("\n--- 📈 生成统计图表 ---")
        
        survey_id = input("请输入问卷ID: ").strip()
        if not survey_id.isdigit():
            print("❌ 无效的ID")
            return
        
        self.connect_db()
        
        # 获取问卷信息
        sql = "SELECT * FROM surveys WHERE id = %s"
        self.cursor.execute(sql, (int(survey_id),))
        survey = self.cursor.fetchone()
        
        if not survey:
            print("❌ 未找到该问卷")
            self.close_db()
            return
        
        # 获取题目
        sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
        self.cursor.execute(sql, (int(survey_id),))
        questions = self.cursor.fetchall()
        
        # 获取答卷数量
        sql = "SELECT COUNT(*) as count FROM responses WHERE survey_id = %s"
        self.cursor.execute(sql, (int(survey_id),))
        response_count = self.cursor.fetchone()['count']
        
        if response_count == 0:
            print("📭 暂无数据,无法生成图表")
            self.close_db()
            return
        
        # 创建图表
        chart_count = sum(1 for q in questions if q['question_type'] in ['single', 'multiple', 'rating', 'scale'])
        if chart_count == 0:
            print("📭 没有适合生成图表的题目类型")
            self.close_db()
            return
        
        fig, axes = plt.subplots((chart_count + 1) // 2, 2, figsize=(14, 5 * ((chart_count + 1) // 2)))
        fig.suptitle(f'{survey["title"]} - 调查结果统计', fontsize=16, fontweight='bold')
        
        plot_idx = 0
        for q in questions:
            if q['question_type'] not in ['single', 'multiple', 'rating', 'scale']:
                continue
            
            ax = axes[plot_idx // 2, plot_idx % 2] if chart_count > 1 else axes
            plot_idx += 1
            
            sql = """SELECT answer_value, COUNT(*) as count 
                     FROM answers WHERE question_id = %s 
                     GROUP BY answer_value ORDER BY count DESC"""
            self.cursor.execute(sql, (q['id'],))
            data = self.cursor.fetchall()
            
            if not data:
                continue
            
            labels = [d['answer_value'] for d in data]
            values = [d['count'] for d in data]
            colors = plt.cm.Set3(np.linspace(0, 1, len(data)))
            
            if q['question_type'] in ['single', 'multiple']:
                # 饼图
                wedges, texts, autotexts = ax.pie(values, labels=None, autopct='%1.1f%%',
                                                  colors=colors, startangle=90)
                ax.set_title(f'Q{q["sort_order"]}: {q["question_text"][:20]}', fontsize=10)
                ax.legend(wedges, labels, title="选项", loc="center left", 
                         bbox_to_anchor=(1, 0, 0.5, 1), fontsize=8)
            else:
                # 柱状图
                bars = ax.bar(labels, values, color=colors)
                ax.set_title(f'Q{q["sort_order"]}: {q["question_text"][:20]}', fontsize=10)
                ax.set_ylabel('人数')
                ax.tick_params(axis='x', rotation=45, labelsize=8)
                
                for bar, val in zip(bars, values):
                    ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(),
                           str(val), ha='center', va='bottom', fontsize=9)
        
        # 隐藏多余的子图
        for i in range(plot_idx, (chart_count + 1) // 2 * 2):
            if chart_count > 1:
                axes[i // 2, i % 2].axis('off')
        
        plt.tight_layout()
        
        # 保存图片
        filename = f"survey_{survey_id}_results.png"
        plt.savefig(filename, dpi=150, bbox_inches='tight')
        print(f"✅ 图表已保存: {filename}")
        plt.show()
        
        self.close_db()

    def export_results(self):
        """导出调查结果"""
        print("\n--- 💾 导出调查结果 ---")
        
        survey_id = input("请输入问卷ID: ").strip()
        if not survey_id.isdigit():
            print("❌ 无效的ID")
            return
        
        self.connect_db()
        
        # 获取问卷信息
        sql = "SELECT * FROM surveys WHERE id = %s"
        self.cursor.execute(sql, (int(survey_id),))
        survey = self.cursor.fetchone()
        
        if not survey:
            print("❌ 未找到该问卷")
            self.close_db()
            return
        
        # 获取题目
        sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
        self.cursor.execute(sql, (int(survey_id),))
        questions = self.cursor.fetchall()
        
        # 获取所有答卷
        sql = """SELECT * FROM responses WHERE survey_id = %s ORDER BY submit_time"""
        self.cursor.execute(sql, (int(survey_id),))
        responses = self.cursor.fetchall()
        
        if not responses:
            print("📭 没有答卷数据可导出")
            self.close_db()
            return
        
        # 导出CSV
        filename = f"survey_{survey_id}_results_{date.today()}.csv"
        
        with open(filename, 'w', encoding='utf-8-sig', newline='') as f:
            writer = csv.writer(f)
            
            # 写入表头
            headers = ['答卷ID', '受访者', '邮箱', '提交时间', '用时(秒)']
            for q in questions:
                headers.append(f'Q{q["sort_order"]}: {q["question_text"][:20]}')
            writer.writerow(headers)
            
            # 写入数据
            for resp in responses:
                row = [
                    resp['id'],
                    resp['respondent_name'] or '',
                    resp['respondent_email'] or '',
                    resp['submit_time'].strftime('%Y-%m-%d %H:%M:%S'),
                    resp['duration']
                ]
                
                for q in questions:
                    sql = "SELECT answer_value FROM answers WHERE response_id = %s AND question_id = %s"
                    self.cursor.execute(sql, (resp['id'], q['id']))
                    ans = self.cursor.fetchone()
                    row.append(ans['answer_value'] if ans else '')
                
                writer.writerow(row)
        
        print(f"✅ 已导出 {len(responses)} 条答卷到: {filename}")
        
        # 也导出统计摘要
        summary_file = f"survey_{survey_id}_summary_{date.today()}.txt"
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write(f"问卷: {survey['title']}\n")
            f.write(f"总答卷数: {len(responses)}\n")
            f.write(f"导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 50 + "\n\n")
            
            for q in questions:
                f.write(f"Q{q['sort_order']}: {q['question_text']}\n")
                
                if q['question_type'] in ['single', 'multiple']:
                    sql = """SELECT answer_value, COUNT(*) as count 
                             FROM answers WHERE question_id = %s 
                             GROUP BY answer_value ORDER BY count DESC"""
                    self.cursor.execute(sql, (q['id'],))
                    stats = self.cursor.fetchall()
                    
                    total = sum(s['count'] for s in stats)
                    for s in stats:
                        pct = (s['count'] / total) * 100
                        f.write(f"  {s['answer_value']}: {s['count']}票 ({pct:.1f}%)\n")
                
                elif q['question_type'] in ['rating', 'scale']:
                    sql = """SELECT AVG(CAST(answer_value AS DECIMAL)) as avg_val 
                             FROM answers WHERE question_id = %s AND answer_value != ''"""
                    self.cursor.execute(sql, (q['id'],))
                    avg = self.cursor.fetchone()['avg_val']
                    if avg:
                        f.write(f"  平均分: {avg:.2f}\n")
                
                f.write("\n")
        
        print(f"✅ 统计摘要已保存到: {summary_file}")
        
        self.close_db()

    def view_response_details(self):
        """查看答卷详情"""
        print("\n--- 📋 查看答卷详情 ---")
        
        survey_id = input("请输入问卷ID: ").strip()
        if not survey_id.isdigit():
            print("❌ 无效的ID")
            return
        
        self.connect_db()
        
        # 获取问卷信息
        sql = "SELECT * FROM surveys WHERE id = %s"
        self.cursor.execute(sql, (int(survey_id),))
        survey = self.cursor.fetchone()
        
        if not survey:
            print("❌ 未找到该问卷")
            self.close_db()
            return
        
        # 获取所有答卷
        sql = """SELECT * FROM responses WHERE survey_id = %s ORDER BY submit_time DESC"""
        self.cursor.execute(sql, (int(survey_id),))
        responses = self.cursor.fetchall()
        
        if not responses:
            print("📭 没有答卷")
            self.close_db()
            return
        
        # 显示答卷列表
        table = PrettyTable()
        table.field_names = ["答卷ID", "受访者", "提交时间", "用时"]
        
        for r in responses:
            table.add_row([
                r['id'],
                r['respondent_name'] or '匿名',
                r['submit_time'].strftime('%m-%d %H:%M'),
                f"{r['duration']}秒" if r['duration'] else '-'
            ])
        
        print(f"\n📋 共 {len(responses)} 份答卷:")
        print(table)
        
        # 查看详情
        resp_id = input("\n输入答卷ID查看详情 (回车返回): ").strip()
        if resp_id and resp_id.isdigit():
            self._show_response_detail(int(resp_id), int(survey_id))
        
        self.close_db()

    def _show_response_detail(self, response_id, survey_id):
        """显示单份答卷详情"""
        sql = "SELECT * FROM responses WHERE id = %s AND survey_id = %s"
        self.cursor.execute(sql, (response_id, survey_id))
        response = self.cursor.fetchone()
        
        if not response:
            print("❌ 未找到该答卷")
            return
        
        # 获取题目
        sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
        self.cursor.execute(sql, (survey_id,))
        questions = self.cursor.fetchall()
        
        print(f"\n📄 答卷 #{response_id}")
        print(f"👤 受访者: {response['respondent_name'] or '匿名'}")
        print(f"📅 提交时间: {response['submit_time']}")
        print("-" * 40)
        
        for q in questions:
            sql = "SELECT answer_value FROM answers WHERE response_id = %s AND question_id = %s"
            self.cursor.execute(sql, (response_id, q['id']))
            ans = self.cursor.fetchone()
            
            answer = ans['answer_value'] if ans else '(未回答)'
            print(f"\nQ{q['sort_order']}: {q['question_text']}")
            print(f"  答: {answer}")

    def run(self):
        """主循环"""
        while True:
            print("\n" + "=" * 46)
            print("📋 问卷调查系统")
            print("=" * 46)
            print("📝 问卷管理")
            print("  1. ➕ 创建问卷")
            print("  2. 📋 问卷列表")
            print("  3. ✏️  编辑问卷")
            print("  4. 🚀 发布/管理")
            print("  5. 🗑️  删除问卷")
            print("📋 问卷填写")
            print("  6. 📝 填写问卷")
            print("📊 数据统计")
            print("  7. 📊 查看统计")
            print("  8. 📈 生成图表")
            print("  9. 📋 答卷详情")
            print("  10. 💾 导出数据")
            print("  11. 🚪 退出")
            print("=" * 46)
            
            choice = input("请选择 (1-11): ").strip()
            
            actions = {
                '1': self.create_survey,
                '2': self.list_surveys,
                '3': self.edit_survey,
                '4': self.publish_survey,
                '5': self.delete_survey,
                '6': self.take_survey,
                '7': self.view_statistics,
                '8': self.generate_charts,
                '9': self.view_response_details,
                '10': self.export_results,
                '11': lambda: (print("👋 再见!"), exit())
            }
            
            action = actions.get(choice)
            if action:
                action()
            else:
                print("❌ 无效选项")

if __name__ == "__main__":
    # 安装依赖: pip install pymysql prettytable matplotlib numpy
    tool = SurveyTool()
    try:
        tool.run()
    except KeyboardInterrupt:
        print("\n\n程序被中断")
    except Exception as e:
        print(f"\n❌ 发生错误: {e}")

安装依赖

pip install pymysql prettytable matplotlib numpy

功能特点

📝 问卷管理

  • 创建问卷:支持5种题型(单选、多选、文本、评分、量表)

  • 编辑修改:发布前可调整题目

  • 状态管理:草稿→发布→关闭

📋 问卷填写

  • 友好的填写界面:逐题作答

  • 必答验证:确保关键问题有回答

  • 自动计时:记录填写时长

📊 数据统计

  • 实时统计:每题自动生成统计报告

  • 图表可视化:饼图、柱状图自动生成

  • 详细查看:逐份查看答卷详情

💾 数据导出

  • CSV导出:原始数据导出

  • 统计摘要:文本格式统计报告

  • 图表保存:PNG格式保存

使用流程

  1. 创建问卷​ → 添加题目 → 保存草稿

  2. 发布问卷​ → 获取问卷ID

  3. 受访者填写​ → 使用问卷ID参与

  4. 查看结果​ → 统计分析 → 导出数据

示例场景

📋 创建满意度调查问卷
├── Q1: 您对我们的服务满意吗? (单选)
│   ├── 非常满意
│   ├── 满意
│   └── 不满意
├── Q2: 请选择您喜欢的服务 (多选)
│   ├── 售前咨询
│   ├── 售后服务
│   └── 技术支持
└── Q3: 请给我们提建议 (文本)

更多推荐