Python+MySQL制作一个简易的问卷调查结果收集与统计工具
·

我来教你用Python+MySQL制作一个完整的问卷调查结果收集与统计工具。
第一步:创建数据库和表
CREATE DATABASE IF NOT EXISTS survey_db;
USE survey_db;
-- 问卷表
CREATE TABLE IF NOT EXISTS surveys (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(200) NOT NULL COMMENT '问卷标题',
description TEXT COMMENT '问卷说明',
creator VARCHAR(100) COMMENT '创建者',
status ENUM('draft', 'active', 'closed') DEFAULT 'draft' COMMENT '状态',
start_time DATETIME COMMENT '开始时间',
end_time DATETIME COMMENT '结束时间',
response_limit INT DEFAULT 0 COMMENT '答卷数量限制(0不限)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 题目表
CREATE TABLE IF NOT EXISTS questions (
id INT AUTO_INCREMENT PRIMARY KEY,
survey_id INT NOT NULL COMMENT '所属问卷ID',
question_text TEXT NOT NULL COMMENT '题目内容',
question_type ENUM('single', 'multiple', 'text', 'rating', 'scale') NOT NULL COMMENT '题型',
options JSON COMMENT '选项(JSON数组)',
required BOOLEAN DEFAULT TRUE COMMENT '是否必答',
sort_order INT DEFAULT 0 COMMENT '排序',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (survey_id) REFERENCES surveys(id) ON DELETE CASCADE
);
-- 答卷表
CREATE TABLE IF NOT EXISTS responses (
id INT AUTO_INCREMENT PRIMARY KEY,
survey_id INT NOT NULL COMMENT '问卷ID',
respondent_name VARCHAR(100) COMMENT '受访者姓名',
respondent_email VARCHAR(100) COMMENT '受访者邮箱',
submit_time DATETIME NOT NULL COMMENT '提交时间',
ip_address VARCHAR(45) COMMENT 'IP地址',
duration INT COMMENT '填写时长(秒)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (survey_id) REFERENCES surveys(id) ON DELETE CASCADE
);
-- 答案表
CREATE TABLE IF NOT EXISTS answers (
id INT AUTO_INCREMENT PRIMARY KEY,
response_id INT NOT NULL COMMENT '答卷ID',
question_id INT NOT NULL COMMENT '题目ID',
answer_value TEXT COMMENT '答案内容',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (response_id) REFERENCES responses(id) ON DELETE CASCADE,
FOREIGN KEY (question_id) REFERENCES questions(id) ON DELETE CASCADE
);
完整代码实现
import pymysql
import json
from datetime import datetime, date
from prettytable import PrettyTable
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import uuid
import csv
matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
matplotlib.rcParams['axes.unicode_minus'] = False
class SurveyTool:
def __init__(self):
"""初始化问卷工具"""
self.current_survey = None
self.db_config = {
'host': 'localhost',
'user': 'root',
'password': 'your_password', # 修改为你的密码
'database': 'survey_db',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
def connect_db(self):
"""连接数据库"""
self.connection = pymysql.connect(**self.db_config)
self.cursor = self.connection.cursor()
def close_db(self):
"""关闭数据库连接"""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
# ========== 问卷管理 ==========
def create_survey(self):
"""创建问卷"""
print("\n--- 📝 创建新问卷 ---")
title = input("问卷标题: ").strip()
if not title:
print("❌ 标题不能为空")
return
description = input("问卷说明: ").strip()
creator = input("创建者: ").strip()
# 添加题目
questions = []
print("\n📋 开始添加题目 (输入空题号结束)")
q_num = 1
while True:
print(f"\n--- 第{q_num}题 ---")
q_text = input("题目内容 (直接回车结束): ").strip()
if not q_text:
break
print("题型选择:")
print("1. 单选题 (single)")
print("2. 多选题 (multiple)")
print("3. 文本题 (text)")
print("4. 评分题 (rating)")
print("5. 量表题 (scale)")
type_map = {'1': 'single', '2': 'multiple', '3': 'text', '4': 'rating', '5': 'scale'}
q_type = type_map.get(input("请选择 (1-5): ").strip(), 'text')
options = []
if q_type in ['single', 'multiple']:
print("请输入选项 (每行一个,输入空行结束):")
while True:
option = input(f" 选项{len(options)+1}: ").strip()
if not option:
break
options.append(option)
required = input("是否必答? (y/n, 默认y): ").strip().lower() != 'n'
questions.append({
'text': q_text,
'type': q_type,
'options': options,
'required': required,
'sort_order': q_num
})
q_num += 1
if not questions:
print("❌ 至少需要一个题目")
return
# 保存到数据库
self.connect_db()
try:
sql = """INSERT INTO surveys (title, description, creator, status)
VALUES (%s, %s, %s, 'draft')"""
self.cursor.execute(sql, (title, description, creator))
survey_id = self.cursor.lastrowid
for q in questions:
sql = """INSERT INTO questions (survey_id, question_text, question_type,
options, required, sort_order)
VALUES (%s, %s, %s, %s, %s, %s)"""
self.cursor.execute(sql, (
survey_id, q['text'], q['type'],
json.dumps(q['options'], ensure_ascii=False),
q['required'], q['sort_order']
))
self.connection.commit()
print(f"\n✅ 问卷创建成功! ID: {survey_id}")
print(f"📊 共 {len(questions)} 道题目")
except Exception as e:
print(f"❌ 创建失败: {e}")
self.connection.rollback()
finally:
self.close_db()
def list_surveys(self):
"""列出所有问卷"""
self.connect_db()
sql = """SELECT s.*,
(SELECT COUNT(*) FROM questions WHERE survey_id = s.id) as question_count,
(SELECT COUNT(*) FROM responses WHERE survey_id = s.id) as response_count
FROM surveys s ORDER BY s.created_at DESC"""
self.cursor.execute(sql)
surveys = self.cursor.fetchall()
self.close_db()
if not surveys:
print("📭 没有问卷")
return
table = PrettyTable()
table.field_names = ["ID", "标题", "题目数", "答卷数", "状态", "创建时间"]
status_icons = {'draft': '📝草稿', 'active': '🟢进行中', 'closed': '🔴已结束'}
for s in surveys:
table.add_row([
s['id'],
s['title'][:20] + '..' if len(s['title']) > 22 else s['title'],
s['question_count'],
s['response_count'],
status_icons.get(s['status'], s['status']),
s['created_at'].strftime('%Y-%m-%d')
])
print(f"\n📋 共 {len(surveys)} 份问卷:")
print(table)
def edit_survey(self):
"""编辑问卷"""
print("\n--- ✏️ 编辑问卷 ---")
survey_id = input("请输入问卷ID: ").strip()
if not survey_id.isdigit():
print("❌ 无效的ID")
return
self.connect_db()
sql = "SELECT * FROM surveys WHERE id = %s"
self.cursor.execute(sql, (int(survey_id),))
survey = self.cursor.fetchone()
if not survey:
print("❌ 未找到该问卷")
self.close_db()
return
if survey['status'] == 'active':
print("⚠️ 问卷已发布,只能修改标题和说明")
print(f"\n当前标题: {survey['title']}")
title = input(f"新标题 (回车保持不变): ").strip() or survey['title']
print(f"当前说明: {survey['description']}")
description = input(f"新说明 (回车保持不变): ").strip() or survey['description']
sql = "UPDATE surveys SET title=%s, description=%s WHERE id=%s"
self.cursor.execute(sql, (title, description, int(survey_id)))
self.connection.commit()
print("✅ 问卷信息已更新")
# 如果是草稿状态,可以编辑题目
if survey['status'] == 'draft':
edit_q = input("\n是否编辑题目? (y/n): ").strip().lower()
if edit_q == 'y':
self._edit_questions(int(survey_id))
self.close_db()
def _edit_questions(self, survey_id):
"""编辑题目"""
sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
self.cursor.execute(sql, (survey_id,))
questions = self.cursor.fetchall()
print(f"\n当前共 {len(questions)} 道题目:")
for q in questions:
print(f" {q['sort_order']}. [{q['question_type']}] {q['question_text'][:30]}")
print("\n操作选项:")
print("1. 添加题目")
print("2. 删除题目")
print("3. 修改题目")
choice = input("请选择 (1-3): ").strip()
if choice == '1':
self._add_question(survey_id, len(questions) + 1)
elif choice == '2':
q_id = input("请输入要删除的题目ID: ").strip()
if q_id.isdigit():
self.cursor.execute("DELETE FROM questions WHERE id = %s AND survey_id = %s",
(int(q_id), survey_id))
self.connection.commit()
print("✅ 题目已删除")
elif choice == '3':
# 简化版:重新创建题目
print("功能开发中...")
def _add_question(self, survey_id, sort_order):
"""添加题目"""
q_text = input("题目内容: ").strip()
if not q_text:
return
print("题型: 1.单选 2.多选 3.文本 4.评分 5.量表")
type_map = {'1': 'single', '2': 'multiple', '3': 'text', '4': 'rating', '5': 'scale'}
q_type = type_map.get(input("请选择: ").strip(), 'text')
options = []
if q_type in ['single', 'multiple']:
print("输入选项 (每行一个,空行结束):")
while True:
opt = input(f" 选项{len(options)+1}: ").strip()
if not opt:
break
options.append(opt)
required = input("是否必答? (y/n): ").strip().lower() != 'n'
sql = """INSERT INTO questions (survey_id, question_text, question_type,
options, required, sort_order) VALUES (%s, %s, %s, %s, %s, %s)"""
self.cursor.execute(sql, (survey_id, q_text, q_type,
json.dumps(options, ensure_ascii=False), required, sort_order))
self.connection.commit()
print("✅ 题目已添加")
def publish_survey(self):
"""发布/关闭问卷"""
print("\n--- 🚀 发布/管理问卷 ---")
survey_id = input("请输入问卷ID: ").strip()
if not survey_id.isdigit():
print("❌ 无效的ID")
return
self.connect_db()
sql = "SELECT * FROM surveys WHERE id = %s"
self.cursor.execute(sql, (int(survey_id),))
survey = self.cursor.fetchone()
if not survey:
print("❌ 未找到该问卷")
self.close_db()
return
print(f"\n当前状态: {survey['status']}")
print("1. 发布问卷 (active)")
print("2. 关闭问卷 (closed)")
print("3. 设为草稿 (draft)")
choice = input("请选择 (1-3): ").strip()
status_map = {'1': 'active', '2': 'closed', '3': 'draft'}
new_status = status_map.get(choice)
if new_status:
sql = "UPDATE surveys SET status = %s WHERE id = %s"
self.cursor.execute(sql, (new_status, int(survey_id)))
self.connection.commit()
print(f"✅ 问卷状态已更新为: {new_status}")
else:
print("❌ 无效选项")
self.close_db()
def delete_survey(self):
"""删除问卷"""
print("\n--- 🗑️ 删除问卷 ---")
survey_id = input("请输入问卷ID: ").strip()
if not survey_id.isdigit():
print("❌ 无效的ID")
return
confirm = input(f"确认删除问卷 ID={survey_id} 及其所有数据? (y/n): ").strip().lower()
if confirm != 'y':
return
self.connect_db()
try:
# 级联删除会处理关联数据
self.cursor.execute("DELETE FROM surveys WHERE id = %s", (int(survey_id),))
self.connection.commit()
print("✅ 问卷已删除")
except Exception as e:
print(f"❌ 删除失败: {e}")
self.connection.rollback()
finally:
self.close_db()
# ========== 问卷填写 ==========
def take_survey(self):
"""填写问卷"""
print("\n--- 📝 填写问卷 ---")
survey_id = input("请输入问卷ID: ").strip()
if not survey_id.isdigit():
print("❌ 无效的ID")
return
self.connect_db()
sql = "SELECT * FROM surveys WHERE id = %s AND status = 'active'"
self.cursor.execute(sql, (int(survey_id),))
survey = self.cursor.fetchone()
if not survey:
print("❌ 问卷不存在或未开放")
self.close_db()
return
# 获取题目
sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
self.cursor.execute(sql, (int(survey_id),))
questions = self.cursor.fetchall()
if not questions:
print("❌ 问卷没有题目")
self.close_db()
return
print(f"\n📋 {survey['title']}")
if survey['description']:
print(f"📄 {survey['description']}")
print(f"📊 共 {len(questions)} 道题")
print("=" * 40)
# 受访者信息
respondent_name = input("\n您的姓名 (可选): ").strip()
respondent_email = input("您的邮箱 (可选): ").strip()
start_time = datetime.now()
answers = []
# 逐题作答
for q in questions:
print(f"\n第{q['sort_order']}题: {q['question_text']}")
if q['required']:
print(" (必答)")
options = json.loads(q['options']) if q['options'] else []
if q['question_type'] == 'single':
for i, opt in enumerate(options, 1):
print(f" {i}. {opt}")
while True:
choice = input("请选择 (输入编号): ").strip()
if choice.isdigit() and 1 <= int(choice) <= len(options):
answers.append({'question_id': q['id'], 'answer': options[int(choice)-1]})
break
elif not q['required'] and not choice:
answers.append({'question_id': q['id'], 'answer': ''})
break
print("❌ 无效选择")
elif q['question_type'] == 'multiple':
for i, opt in enumerate(options, 1):
print(f" {i}. {opt}")
choices = input("请选择 (多个用逗号分隔,如1,3): ").strip()
selected = []
if choices:
for c in choices.split(','):
c = c.strip()
if c.isdigit() and 1 <= int(c) <= len(options):
selected.append(options[int(c)-1])
answers.append({'question_id': q['id'], 'answer': ','.join(selected)})
elif q['question_type'] == 'rating':
print(" 评分: 1-5星")
while True:
rating = input("请评分 (1-5): ").strip()
if rating.isdigit() and 1 <= int(rating) <= 5:
answers.append({'question_id': q['id'], 'answer': rating})
break
elif not q['required'] and not rating:
answers.append({'question_id': q['id'], 'answer': ''})
break
print("❌ 请输入1-5的数字")
elif q['question_type'] == 'scale':
print(" 量表: 1(非常不满意) - 10(非常满意)")
while True:
scale = input("请打分 (1-10): ").strip()
if scale.isdigit() and 1 <= int(scale) <= 10:
answers.append({'question_id': q['id'], 'answer': scale})
break
elif not q['required'] and not scale:
answers.append({'question_id': q['id'], 'answer': ''})
break
print("❌ 请输入1-10的数字")
else: # text
text_answer = input("请输入您的回答: ").strip()
if q['required'] and not text_answer:
print("❌ 此题必答")
text_answer = input("请输入您的回答: ").strip()
answers.append({'question_id': q['id'], 'answer': text_answer})
# 计算填写时长
end_time = datetime.now()
duration = int((end_time - start_time).total_seconds())
# 保存答卷
try:
sql = """INSERT INTO responses (survey_id, respondent_name, respondent_email,
submit_time, duration) VALUES (%s, %s, %s, %s, %s)"""
self.cursor.execute(sql, (int(survey_id), respondent_name, respondent_email,
end_time, duration))
response_id = self.cursor.lastrowid
for ans in answers:
sql = "INSERT INTO answers (response_id, question_id, answer_value) VALUES (%s, %s, %s)"
self.cursor.execute(sql, (response_id, ans['question_id'], ans['answer']))
self.connection.commit()
print(f"\n✅ 问卷提交成功! 感谢您的参与!")
print(f"⏱️ 填写用时: {duration}秒")
except Exception as e:
print(f"❌ 提交失败: {e}")
self.connection.rollback()
finally:
self.close_db()
# ========== 数据统计 ==========
def view_statistics(self):
"""查看统计数据"""
print("\n--- 📊 问卷统计 ---")
survey_id = input("请输入问卷ID: ").strip()
if not survey_id.isdigit():
print("❌ 无效的ID")
return
self.connect_db()
# 获取问卷信息
sql = "SELECT * FROM surveys WHERE id = %s"
self.cursor.execute(sql, (int(survey_id),))
survey = self.cursor.fetchone()
if not survey:
print("❌ 未找到该问卷")
self.close_db()
return
# 获取题目
sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
self.cursor.execute(sql, (int(survey_id),))
questions = self.cursor.fetchall()
# 获取答卷数量
sql = "SELECT COUNT(*) as count FROM responses WHERE survey_id = %s"
self.cursor.execute(sql, (int(survey_id),))
response_count = self.cursor.fetchone()['count']
print(f"\n📊 {survey['title']} 统计报告")
print(f"📝 总答卷数: {response_count}")
print(f"📅 创建时间: {survey['created_at'].strftime('%Y-%m-%d %H:%M')}")
print("=" * 50)
if response_count == 0:
print("📭 暂无答卷数据")
self.close_db()
return
# 逐题统计
for q in questions:
print(f"\n📌 第{q['sort_order']}题: {q['question_text']}")
options = json.loads(q['options']) if q['options'] else []
if q['question_type'] in ['single', 'multiple']:
# 选择题统计
sql = """SELECT answer_value, COUNT(*) as count
FROM answers WHERE question_id = %s
GROUP BY answer_value ORDER BY count DESC"""
self.cursor.execute(sql, (q['id'],))
stats = self.cursor.fetchall()
if stats:
table = PrettyTable()
table.field_names = ["选项", "票数", "比例", "条形图"]
total = sum(s['count'] for s in stats)
for s in stats:
pct = (s['count'] / total) * 100
bar = '█' * int(pct / 5)
table.add_row([s['answer_value'], s['count'], f"{pct:.1f}%", bar])
print(table)
elif q['question_type'] == 'rating':
# 评分统计
sql = """SELECT answer_value, COUNT(*) as count
FROM answers WHERE question_id = %s
GROUP BY answer_value ORDER BY answer_value"""
self.cursor.execute(sql, (q['id'],))
ratings = self.cursor.fetchall()
if ratings:
avg_rating = sum(int(r['answer_value']) * r['count'] for r in ratings) / \
sum(r['count'] for r in ratings)
stars = '⭐' * round(avg_rating)
print(f"平均评分: {avg_rating:.1f} {stars}")
for r in ratings:
bar = '█' * (int(r['count']) * 2)
print(f" {r['answer_value']}星: {bar} ({r['count']}人)")
elif q['question_type'] == 'scale':
# 量表统计
sql = """SELECT answer_value, COUNT(*) as count
FROM answers WHERE question_id = %s
GROUP BY answer_value ORDER BY answer_value"""
self.cursor.execute(sql, (q['id'],))
scales = self.cursor.fetchall()
if scales:
avg_scale = sum(int(s['answer_value']) * s['count'] for s in scales) / \
sum(s['count'] for s in scales)
print(f"平均分: {avg_scale:.1f}/10")
for s in scales:
bar = '█' * (int(s['count']) * 2)
print(f" {s['answer_value']}分: {bar} ({s['count']}人)")
else: # text
# 文本题显示所有回答
sql = "SELECT answer_value FROM answers WHERE question_id = %s AND answer_value != ''"
self.cursor.execute(sql, (q['id'],))
texts = self.cursor.fetchall()
if texts:
print(f"共 {len(texts)} 条文本回答:")
for i, t in enumerate(texts[:10], 1):
print(f" {i}. {t['answer_value'][:50]}")
if len(texts) > 10:
print(f" ... 还有 {len(texts)-10} 条回答")
self.close_db()
def generate_charts(self):
"""生成统计图表"""
print("\n--- 📈 生成统计图表 ---")
survey_id = input("请输入问卷ID: ").strip()
if not survey_id.isdigit():
print("❌ 无效的ID")
return
self.connect_db()
# 获取问卷信息
sql = "SELECT * FROM surveys WHERE id = %s"
self.cursor.execute(sql, (int(survey_id),))
survey = self.cursor.fetchone()
if not survey:
print("❌ 未找到该问卷")
self.close_db()
return
# 获取题目
sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
self.cursor.execute(sql, (int(survey_id),))
questions = self.cursor.fetchall()
# 获取答卷数量
sql = "SELECT COUNT(*) as count FROM responses WHERE survey_id = %s"
self.cursor.execute(sql, (int(survey_id),))
response_count = self.cursor.fetchone()['count']
if response_count == 0:
print("📭 暂无数据,无法生成图表")
self.close_db()
return
# 创建图表
chart_count = sum(1 for q in questions if q['question_type'] in ['single', 'multiple', 'rating', 'scale'])
if chart_count == 0:
print("📭 没有适合生成图表的题目类型")
self.close_db()
return
fig, axes = plt.subplots((chart_count + 1) // 2, 2, figsize=(14, 5 * ((chart_count + 1) // 2)))
fig.suptitle(f'{survey["title"]} - 调查结果统计', fontsize=16, fontweight='bold')
plot_idx = 0
for q in questions:
if q['question_type'] not in ['single', 'multiple', 'rating', 'scale']:
continue
ax = axes[plot_idx // 2, plot_idx % 2] if chart_count > 1 else axes
plot_idx += 1
sql = """SELECT answer_value, COUNT(*) as count
FROM answers WHERE question_id = %s
GROUP BY answer_value ORDER BY count DESC"""
self.cursor.execute(sql, (q['id'],))
data = self.cursor.fetchall()
if not data:
continue
labels = [d['answer_value'] for d in data]
values = [d['count'] for d in data]
colors = plt.cm.Set3(np.linspace(0, 1, len(data)))
if q['question_type'] in ['single', 'multiple']:
# 饼图
wedges, texts, autotexts = ax.pie(values, labels=None, autopct='%1.1f%%',
colors=colors, startangle=90)
ax.set_title(f'Q{q["sort_order"]}: {q["question_text"][:20]}', fontsize=10)
ax.legend(wedges, labels, title="选项", loc="center left",
bbox_to_anchor=(1, 0, 0.5, 1), fontsize=8)
else:
# 柱状图
bars = ax.bar(labels, values, color=colors)
ax.set_title(f'Q{q["sort_order"]}: {q["question_text"][:20]}', fontsize=10)
ax.set_ylabel('人数')
ax.tick_params(axis='x', rotation=45, labelsize=8)
for bar, val in zip(bars, values):
ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(),
str(val), ha='center', va='bottom', fontsize=9)
# 隐藏多余的子图
for i in range(plot_idx, (chart_count + 1) // 2 * 2):
if chart_count > 1:
axes[i // 2, i % 2].axis('off')
plt.tight_layout()
# 保存图片
filename = f"survey_{survey_id}_results.png"
plt.savefig(filename, dpi=150, bbox_inches='tight')
print(f"✅ 图表已保存: {filename}")
plt.show()
self.close_db()
def export_results(self):
"""导出调查结果"""
print("\n--- 💾 导出调查结果 ---")
survey_id = input("请输入问卷ID: ").strip()
if not survey_id.isdigit():
print("❌ 无效的ID")
return
self.connect_db()
# 获取问卷信息
sql = "SELECT * FROM surveys WHERE id = %s"
self.cursor.execute(sql, (int(survey_id),))
survey = self.cursor.fetchone()
if not survey:
print("❌ 未找到该问卷")
self.close_db()
return
# 获取题目
sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
self.cursor.execute(sql, (int(survey_id),))
questions = self.cursor.fetchall()
# 获取所有答卷
sql = """SELECT * FROM responses WHERE survey_id = %s ORDER BY submit_time"""
self.cursor.execute(sql, (int(survey_id),))
responses = self.cursor.fetchall()
if not responses:
print("📭 没有答卷数据可导出")
self.close_db()
return
# 导出CSV
filename = f"survey_{survey_id}_results_{date.today()}.csv"
with open(filename, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.writer(f)
# 写入表头
headers = ['答卷ID', '受访者', '邮箱', '提交时间', '用时(秒)']
for q in questions:
headers.append(f'Q{q["sort_order"]}: {q["question_text"][:20]}')
writer.writerow(headers)
# 写入数据
for resp in responses:
row = [
resp['id'],
resp['respondent_name'] or '',
resp['respondent_email'] or '',
resp['submit_time'].strftime('%Y-%m-%d %H:%M:%S'),
resp['duration']
]
for q in questions:
sql = "SELECT answer_value FROM answers WHERE response_id = %s AND question_id = %s"
self.cursor.execute(sql, (resp['id'], q['id']))
ans = self.cursor.fetchone()
row.append(ans['answer_value'] if ans else '')
writer.writerow(row)
print(f"✅ 已导出 {len(responses)} 条答卷到: {filename}")
# 也导出统计摘要
summary_file = f"survey_{survey_id}_summary_{date.today()}.txt"
with open(summary_file, 'w', encoding='utf-8') as f:
f.write(f"问卷: {survey['title']}\n")
f.write(f"总答卷数: {len(responses)}\n")
f.write(f"导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 50 + "\n\n")
for q in questions:
f.write(f"Q{q['sort_order']}: {q['question_text']}\n")
if q['question_type'] in ['single', 'multiple']:
sql = """SELECT answer_value, COUNT(*) as count
FROM answers WHERE question_id = %s
GROUP BY answer_value ORDER BY count DESC"""
self.cursor.execute(sql, (q['id'],))
stats = self.cursor.fetchall()
total = sum(s['count'] for s in stats)
for s in stats:
pct = (s['count'] / total) * 100
f.write(f" {s['answer_value']}: {s['count']}票 ({pct:.1f}%)\n")
elif q['question_type'] in ['rating', 'scale']:
sql = """SELECT AVG(CAST(answer_value AS DECIMAL)) as avg_val
FROM answers WHERE question_id = %s AND answer_value != ''"""
self.cursor.execute(sql, (q['id'],))
avg = self.cursor.fetchone()['avg_val']
if avg:
f.write(f" 平均分: {avg:.2f}\n")
f.write("\n")
print(f"✅ 统计摘要已保存到: {summary_file}")
self.close_db()
def view_response_details(self):
"""查看答卷详情"""
print("\n--- 📋 查看答卷详情 ---")
survey_id = input("请输入问卷ID: ").strip()
if not survey_id.isdigit():
print("❌ 无效的ID")
return
self.connect_db()
# 获取问卷信息
sql = "SELECT * FROM surveys WHERE id = %s"
self.cursor.execute(sql, (int(survey_id),))
survey = self.cursor.fetchone()
if not survey:
print("❌ 未找到该问卷")
self.close_db()
return
# 获取所有答卷
sql = """SELECT * FROM responses WHERE survey_id = %s ORDER BY submit_time DESC"""
self.cursor.execute(sql, (int(survey_id),))
responses = self.cursor.fetchall()
if not responses:
print("📭 没有答卷")
self.close_db()
return
# 显示答卷列表
table = PrettyTable()
table.field_names = ["答卷ID", "受访者", "提交时间", "用时"]
for r in responses:
table.add_row([
r['id'],
r['respondent_name'] or '匿名',
r['submit_time'].strftime('%m-%d %H:%M'),
f"{r['duration']}秒" if r['duration'] else '-'
])
print(f"\n📋 共 {len(responses)} 份答卷:")
print(table)
# 查看详情
resp_id = input("\n输入答卷ID查看详情 (回车返回): ").strip()
if resp_id and resp_id.isdigit():
self._show_response_detail(int(resp_id), int(survey_id))
self.close_db()
def _show_response_detail(self, response_id, survey_id):
"""显示单份答卷详情"""
sql = "SELECT * FROM responses WHERE id = %s AND survey_id = %s"
self.cursor.execute(sql, (response_id, survey_id))
response = self.cursor.fetchone()
if not response:
print("❌ 未找到该答卷")
return
# 获取题目
sql = "SELECT * FROM questions WHERE survey_id = %s ORDER BY sort_order"
self.cursor.execute(sql, (survey_id,))
questions = self.cursor.fetchall()
print(f"\n📄 答卷 #{response_id}")
print(f"👤 受访者: {response['respondent_name'] or '匿名'}")
print(f"📅 提交时间: {response['submit_time']}")
print("-" * 40)
for q in questions:
sql = "SELECT answer_value FROM answers WHERE response_id = %s AND question_id = %s"
self.cursor.execute(sql, (response_id, q['id']))
ans = self.cursor.fetchone()
answer = ans['answer_value'] if ans else '(未回答)'
print(f"\nQ{q['sort_order']}: {q['question_text']}")
print(f" 答: {answer}")
def run(self):
"""主循环"""
while True:
print("\n" + "=" * 46)
print("📋 问卷调查系统")
print("=" * 46)
print("📝 问卷管理")
print(" 1. ➕ 创建问卷")
print(" 2. 📋 问卷列表")
print(" 3. ✏️ 编辑问卷")
print(" 4. 🚀 发布/管理")
print(" 5. 🗑️ 删除问卷")
print("📋 问卷填写")
print(" 6. 📝 填写问卷")
print("📊 数据统计")
print(" 7. 📊 查看统计")
print(" 8. 📈 生成图表")
print(" 9. 📋 答卷详情")
print(" 10. 💾 导出数据")
print(" 11. 🚪 退出")
print("=" * 46)
choice = input("请选择 (1-11): ").strip()
actions = {
'1': self.create_survey,
'2': self.list_surveys,
'3': self.edit_survey,
'4': self.publish_survey,
'5': self.delete_survey,
'6': self.take_survey,
'7': self.view_statistics,
'8': self.generate_charts,
'9': self.view_response_details,
'10': self.export_results,
'11': lambda: (print("👋 再见!"), exit())
}
action = actions.get(choice)
if action:
action()
else:
print("❌ 无效选项")
if __name__ == "__main__":
# 安装依赖: pip install pymysql prettytable matplotlib numpy
tool = SurveyTool()
try:
tool.run()
except KeyboardInterrupt:
print("\n\n程序被中断")
except Exception as e:
print(f"\n❌ 发生错误: {e}")
安装依赖
pip install pymysql prettytable matplotlib numpy
功能特点
📝 问卷管理
-
创建问卷:支持5种题型(单选、多选、文本、评分、量表)
-
编辑修改:发布前可调整题目
-
状态管理:草稿→发布→关闭
📋 问卷填写
-
友好的填写界面:逐题作答
-
必答验证:确保关键问题有回答
-
自动计时:记录填写时长
📊 数据统计
-
实时统计:每题自动生成统计报告
-
图表可视化:饼图、柱状图自动生成
-
详细查看:逐份查看答卷详情
💾 数据导出
-
CSV导出:原始数据导出
-
统计摘要:文本格式统计报告
-
图表保存:PNG格式保存
使用流程
-
创建问卷 → 添加题目 → 保存草稿
-
发布问卷 → 获取问卷ID
-
受访者填写 → 使用问卷ID参与
-
查看结果 → 统计分析 → 导出数据
示例场景
📋 创建满意度调查问卷
├── Q1: 您对我们的服务满意吗? (单选)
│ ├── 非常满意
│ ├── 满意
│ └── 不满意
├── Q2: 请选择您喜欢的服务 (多选)
│ ├── 售前咨询
│ ├── 售后服务
│ └── 技术支持
└── Q3: 请给我们提建议 (文本)更多推荐
所有评论(0)