从零搭建天气数据分析平台:Python + PostgreSQL + Pandas + Matplotlib 完整实战
·
从零搭建天气数据分析平台:Python + PostgreSQL + Pandas + Matplotlib 完整实战
一篇文章带你完成:数据采集 → 数据入库 → 数据分析 → 可视化展示的全流程
先看问题
假设老板给你一个任务:
"帮我做一个天气数据分析系统,要能:
- 采集多个城市的天气数据
- 把数据存到数据库
- 分析各城市的温度、降水情况
- 生成可视化图表"
你会怎么做?
本文你将学到
| 技能 | 说明 |
|---|---|
| SQLAlchemy ORM | 用 Python 类操作数据库,告别手写 SQL |
| Session 会话 | 数据库增删改查的核心对象 |
| Pandas 分析 | 从数据库读取数据,进行统计分析 |
| Matplotlib 可视化 | 生成 6 种常用图表 |
| 项目架构 | 数据采集 → 入库 → 分析 → 可视化 |
一、项目架构
天气数据 → Python采集 → PostgreSQL存储 → Pandas分析 → Matplotlib可视化
技术栈
| 技术 | 作用 |
|---|---|
| Python | 数据采集、处理 |
| SQLAlchemy | ORM 操作数据库 |
| PostgreSQL | 数据存储 |
| Pandas | 数据分析 |
| Matplotlib | 数据可视化 |
二、数据库设计
2.1 两张表
-- 城市信息表
CREATE TABLE city_info (
id SERIAL PRIMARY KEY,
city_name VARCHAR(50) NOT NULL UNIQUE,
province VARCHAR(50),
longitude FLOAT,
latitude FLOAT,
created_at TIMESTAMP DEFAULT NOW()
);
-- 天气数据表
CREATE TABLE weather_data (
id SERIAL PRIMARY KEY,
city_name VARCHAR(50) NOT NULL,
weather_date DATE NOT NULL,
temp_max FLOAT,
temp_min FLOAT,
temp_avg FLOAT,
humidity FLOAT,
wind_speed FLOAT,
weather VARCHAR(100),
rainfall FLOAT,
created_at TIMESTAMP DEFAULT NOW()
);
2.2 用 ORM 定义表结构
from sqlalchemy import create_engine, Column, Integer, String, Float, Date, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
class CityInfo(Base):
__tablename__ = 'city_info'
id = Column(Integer, primary_key=True, autoincrement=True)
city_name = Column(String(50), nullable=False, unique=True)
province = Column(String(50))
longitude = Column(Float)
latitude = Column(Float)
created_at = Column(DateTime, default=datetime.now)
def __repr__(self):
return f"<CityInfo(city_name='{self.city_name}', province='{self.province}')>"
class WeatherData(Base):
__tablename__ = 'weather_data'
id = Column(Integer, primary_key=True, autoincrement=True)
city_name = Column(String(50), nullable=False)
weather_date = Column(Date, nullable=False)
temp_max = Column(Float)
temp_min = Column(Float)
temp_avg = Column(Float)
humidity = Column(Float)
wind_speed = Column(Float)
weather = Column(String(100))
rainfall = Column(Float)
created_at = Column(DateTime, default=datetime.now)
关键点:
__tablename__:表名(建议小写,避免 PostgreSQL 问题)Column():定义字段类型__repr__():打印对象时显示的信息
三、Session:数据库操作的核心
3.1 创建 Session
# 1. 创建数据库连接
engine = create_engine('postgresql://postgres:123456@localhost:5432/weather')
# 2. 创建表(如果不存在)
Base.metadata.create_all(engine)
# 3. 创建 Session
Session = sessionmaker(bind=engine)
session = Session()
3.2 CRUD 操作
插入数据
# 单条插入
city = CityInfo(city_name='北京', province='北京', longitude=116.4, latitude=39.9)
session.add(city)
session.commit()
# 批量插入
cities = [
CityInfo(city_name='上海', province='上海'),
CityInfo(city_name='广州', province='广东'),
]
session.add_all(cities)
session.commit()
查询数据
# 查询所有
cities = session.query(CityInfo).all()
# 条件查询
beijing = session.query(CityInfo).filter_by(city_name='北京').first()
# 多条件查询
result = session.query(WeatherData).filter(
WeatherData.city_name == '北京',
WeatherData.temp_max > 30
).all()
修改数据
# 修改单个对象
city = session.query(CityInfo).filter_by(city_name='北京').first()
city.longitude = 116.5
session.commit()
# 批量修改
session.query(WeatherData).filter(
WeatherData.city_name == '北京'
).update({WeatherData.temp_avg: 25.0})
session.commit()
删除数据
# 删除单个对象
city = session.query(CityInfo).filter_by(city_name='北京').first()
session.delete(city)
session.commit()
# 批量删除
session.query(WeatherData).filter(
WeatherData.city_name == '北京'
).delete()
session.commit()
3.3 Session 速查表
| 操作 | 代码 |
|---|---|
| 插入 | session.add(obj) → session.commit() |
| 查询所有 | session.query(Model).all() |
| 条件查询 | session.query(Model).filter_by(col=val).first() |
| 修改 | obj.attr = val → session.commit() |
| 删除 | session.delete(obj) → session.commit() |
| 关闭 | session.close() |
四、数据采集与入库
4.1 模拟天气数据
import random
from datetime import datetime, timedelta
def get_weather(city_name, date):
# 设置随机种子,确保同一城市同一天数据一致
random.seed(hash(city_name + date.strftime('%Y-%m-%d')) % 10000)
# 根据月份设置基础温度
month = date.month
if month in [12, 1, 2]: # 冬
base_temp = 0 + random.gauss(0, 5)
elif month in [3, 4, 5]: # 春
base_temp = 15 + random.gauss(0, 5)
elif month in [6, 7, 8]: # 夏
base_temp = 28 + random.gauss(0, 5)
else: # 秋
base_temp = 18 + random.gauss(0, 5)
# 城市气候调整
if city_name in ['广州', '深圳']:
base_temp += 5
elif city_name in ['北京', '西安']:
base_temp -= 2
# 生成天气数据
return {
'city_name': city_name,
'weather_date': date,
'temp_max': round(base_temp + random.uniform(2, 8), 1),
'temp_min': round(base_temp - random.uniform(2, 8), 1),
'humidity': round(random.uniform(40, 90), 1),
'weather': random.choice(['晴', '多云', '阴', '小雨', '中雨', '雷阵雨']),
'created_at': datetime.now()
}
4.2 入库流程
# 城市列表
cities = [
{'city_name': '北京', 'province': '北京', 'longitude': 116.4, 'latitude': 39.9},
{'city_name': '上海', 'province': '上海', 'longitude': 121.4, 'latitude': 31.2},
{'city_name': '广州', 'province': '广东', 'longitude': 113.3, 'latitude': 23.1},
# ...
]
# 插入城市数据
for city in cities:
existing = session.query(CityInfo).filter_by(city_name=city['city_name']).first()
if not existing:
session.add(CityInfo(**city))
session.commit()
# 插入天气数据(过去30天)
today = datetime.now().date()
for city in cities:
for i in range(30):
date = today - timedelta(days=i)
existing = session.query(WeatherData).filter(
WeatherData.city_name == city['city_name'],
WeatherData.weather_date == date
).first()
if not existing:
weather = get_weather(city['city_name'], date)
session.add(WeatherData(**weather))
session.commit()
session.close()
五、Pandas 数据分析
5.1 从数据库读取数据
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('postgresql://postgres:123456@localhost:5432/weather')
# 读取整张表
df = pd.read_sql_table('weather_data', engine)
print(f"数据量: {len(df)} 条")
print(f"时间范围: {df['weather_date'].min()} - {df['weather_date'].max()}")
print(f"城市数量: {df['city_name'].nunique()}")
5.2 分析示例
分析1:各城市平均温度排名
city_temps = df.groupby('city_name')['temp_avg'].mean().sort_values(ascending=False)
print(city_temps)
分析2:天气类型分布
weather_dist = df['weather'].value_counts()
print(weather_dist)
分析3:温度最高/最低的日期
print(f"最高温度: {df.loc[df['temp_max'].idxmax()]}")
print(f"最低温度: {df.loc[df['temp_min'].idxmin()]}")
分析4:降水量统计
rainfall_stats = df[df['rainfall'] > 0].groupby('city_name')['rainfall'].agg(
降水天数='count',
总降水量='sum',
平均降水量='mean'
).round(1)
print(rainfall_stats)
六、Matplotlib 可视化
6.1 设置中文字体
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei']
matplotlib.rcParams['axes.unicode_minus'] = False
6.2 图1:各城市平均温度对比(横向柱状图)
fig, ax = plt.subplots(figsize=(12, 6))
city_temps = df.groupby('city_name')['temp_avg'].mean().sort_values(ascending=True)
city_temps.plot(kind='barh', ax=ax, color='skyblue', edgecolor='black')
ax.set_title('各城市平均温度对比')
ax.set_xlabel('温度 (°C)')
for i, v in enumerate(city_temps.values):
ax.text(v + 0.5, i, f'{v:.1f}°C', va='center')
plt.tight_layout()
plt.savefig('city_temp.png', dpi=150)
plt.close()
6.3 图2:北京温度趋势(折线图)
fig, ax = plt.subplots(figsize=(14, 6))
beijing = df[df['city_name'] == '北京'].sort_values('weather_date')
ax.plot(beijing['weather_date'], beijing['temp_max'], label='最高温度', marker='o', color='red')
ax.plot(beijing['weather_date'], beijing['temp_avg'], label='平均温度', marker='s', color='orange')
ax.plot(beijing['weather_date'], beijing['temp_min'], label='最低温度', marker='^', color='blue')
ax.fill_between(beijing['weather_date'], beijing['temp_min'], beijing['temp_max'], alpha=0.2, color='orange')
ax.set_title('北京30天温度变化趋势')
ax.set_xlabel('日期')
ax.set_ylabel('温度 (°C)')
ax.legend()
ax.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('beijing_trend.png', dpi=150)
plt.close()
6.4 图3:天气类型分布(饼图)
fig, ax = plt.subplots(figsize=(10, 8))
weather_counts = df['weather'].value_counts()
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7', '#884EA0']
ax.pie(weather_counts.values, labels=weather_counts.index, autopct='%1.0f%%', colors=colors)
ax.set_title('天气类型分布')
plt.tight_layout()
plt.savefig('weather_dist.png', dpi=150)
plt.close()
6.5 图4:多城市温度对比
fig, ax = plt.subplots(figsize=(14, 6))
for city in ['北京', '上海', '广州', '成都']:
city_data = df[df['city_name'] == city].sort_values('weather_date')
ax.plot(city_data['weather_date'], city_data['temp_avg'], marker='o', label=city)
ax.set_title('多城市温度对比')
ax.legend()
ax.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('multi_city.png', dpi=150)
plt.close()
6.6 图5:温度-湿度散点图
fig, ax = plt.subplots(figsize=(10, 8))
ax.scatter(df['temp_avg'], df['humidity'], alpha=0.5, c=df['temp_avg'], cmap='RdYlBu_r')
ax.set_title('温度与湿度关系')
ax.set_xlabel('平均温度 (°C)')
ax.set_ylabel('湿度 (%)')
plt.colorbar(ax.collections[0], label='温度')
plt.tight_layout()
plt.savefig('temp_humidity.png', dpi=150)
plt.close()
6.7 图6:各城市降水量对比
fig, ax = plt.subplots(figsize=(12, 6))
rain_by_city = df.groupby('city_name')['rainfall'].sum().sort_values(ascending=False)
rain_by_city.plot(kind='bar', ax=ax, color='lightblue', edgecolor='darkblue')
ax.set_title('各城市累计降水量')
ax.set_ylabel('降水量 (mm)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('rainfall.png', dpi=150)
plt.close()
七、踩坑记录
坑1:表名大写导致找不到表
# ❌ 错误写法
__tablename__ = 'WeatherData' # PostgreSQL 会转成小写 weatherdata
# ✅ 正确写法
__tablename__ = 'weather_data' # 全小写,用下划线分隔
原因:PostgreSQL 默认会把未加引号的标识符转成小写。
坑2:filter_by 不支持表达式
# ❌ 错误写法
session.query(Student).filter_by(Student.age > 10)
# ✅ 正确写法
session.query(Student).filter(Student.age > 10)
# 或
session.query(Student).filter_by(age=18) # 只支持等值比较
坑3:忘记 commit
# ❌ 忘记 commit,数据不会写入数据库
session.add(city)
# ✅ 正确写法
session.add(city)
session.commit()
坑4:忘记关闭 Session
# ❌ 不关闭会占用数据库连接
session = Session()
session.add(city)
session.commit()
# ✅ 正确写法
session = Session()
try:
session.add(city)
session.commit()
finally:
session.close()
# 或使用上下文管理器
with Session() as session:
session.add(city)
session.commit()
八、项目总结
8.1 技术要点
| 技术点 | 说明 |
|---|---|
| SQLAlchemy ORM | 用 Python 类映射数据库表,告别手写 SQL |
| Session | 数据库会话,所有 CRUD 操作的核心 |
| Pandas | 从数据库读取数据,进行统计分析 |
| Matplotlib | 生成可视化图表 |
8.2 项目流程
1. 设计表结构(ORM 类定义)
2. 创建数据库连接和 Session
3. 采集数据并入库
4. Pandas 读取数据进行分析
5. Matplotlib 生成可视化图表
8.3 后续优化方向
| 方向 | 说明 |
|---|---|
| 真实 API | 接入心知天气、和风天气等真实 API |
| 定时任务 | 使用 Airflow 定时采集数据 |
| Web 界面 | 使用 Flask/FastAPI 提供 Web 界面 |
| 数据建模 | 设计星型模型,支持更多分析维度 |
参考资料
更多推荐

所有评论(0)