从零搭建天气数据分析平台:Python + PostgreSQL + Pandas + Matplotlib 完整实战

一篇文章带你完成:数据采集 → 数据入库 → 数据分析 → 可视化展示的全流程


先看问题

假设老板给你一个任务:

"帮我做一个天气数据分析系统,要能:

  1. 采集多个城市的天气数据
  2. 把数据存到数据库
  3. 分析各城市的温度、降水情况
  4. 生成可视化图表"

你会怎么做?


本文你将学到

技能 说明
SQLAlchemy ORM 用 Python 类操作数据库,告别手写 SQL
Session 会话 数据库增删改查的核心对象
Pandas 分析 从数据库读取数据,进行统计分析
Matplotlib 可视化 生成 6 种常用图表
项目架构 数据采集 → 入库 → 分析 → 可视化

一、项目架构

天气数据 → Python采集 → PostgreSQL存储 → Pandas分析 → Matplotlib可视化

技术栈

技术 作用
Python 数据采集、处理
SQLAlchemy ORM 操作数据库
PostgreSQL 数据存储
Pandas 数据分析
Matplotlib 数据可视化

二、数据库设计

2.1 两张表

-- 城市信息表
CREATE TABLE city_info (
    id SERIAL PRIMARY KEY,
    city_name VARCHAR(50) NOT NULL UNIQUE,
    province VARCHAR(50),
    longitude FLOAT,
    latitude FLOAT,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 天气数据表
CREATE TABLE weather_data (
    id SERIAL PRIMARY KEY,
    city_name VARCHAR(50) NOT NULL,
    weather_date DATE NOT NULL,
    temp_max FLOAT,
    temp_min FLOAT,
    temp_avg FLOAT,
    humidity FLOAT,
    wind_speed FLOAT,
    weather VARCHAR(100),
    rainfall FLOAT,
    created_at TIMESTAMP DEFAULT NOW()
);

2.2 用 ORM 定义表结构

from sqlalchemy import create_engine, Column, Integer, String, Float, Date, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class CityInfo(Base):
    __tablename__ = 'city_info'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    city_name = Column(String(50), nullable=False, unique=True)
    province = Column(String(50))
    longitude = Column(Float)
    latitude = Column(Float)
    created_at = Column(DateTime, default=datetime.now)
    
    def __repr__(self):
        return f"<CityInfo(city_name='{self.city_name}', province='{self.province}')>"

class WeatherData(Base):
    __tablename__ = 'weather_data'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    city_name = Column(String(50), nullable=False)
    weather_date = Column(Date, nullable=False)
    temp_max = Column(Float)
    temp_min = Column(Float)
    temp_avg = Column(Float)
    humidity = Column(Float)
    wind_speed = Column(Float)
    weather = Column(String(100))
    rainfall = Column(Float)
    created_at = Column(DateTime, default=datetime.now)

关键点

  • __tablename__:表名(建议小写,避免 PostgreSQL 问题)
  • Column():定义字段类型
  • __repr__():打印对象时显示的信息

三、Session:数据库操作的核心

3.1 创建 Session

# 1. 创建数据库连接
engine = create_engine('postgresql://postgres:123456@localhost:5432/weather')

# 2. 创建表(如果不存在)
Base.metadata.create_all(engine)

# 3. 创建 Session
Session = sessionmaker(bind=engine)
session = Session()

3.2 CRUD 操作

插入数据
# 单条插入
city = CityInfo(city_name='北京', province='北京', longitude=116.4, latitude=39.9)
session.add(city)
session.commit()

# 批量插入
cities = [
    CityInfo(city_name='上海', province='上海'),
    CityInfo(city_name='广州', province='广东'),
]
session.add_all(cities)
session.commit()
查询数据
# 查询所有
cities = session.query(CityInfo).all()

# 条件查询
beijing = session.query(CityInfo).filter_by(city_name='北京').first()

# 多条件查询
result = session.query(WeatherData).filter(
    WeatherData.city_name == '北京',
    WeatherData.temp_max > 30
).all()
修改数据
# 修改单个对象
city = session.query(CityInfo).filter_by(city_name='北京').first()
city.longitude = 116.5
session.commit()

# 批量修改
session.query(WeatherData).filter(
    WeatherData.city_name == '北京'
).update({WeatherData.temp_avg: 25.0})
session.commit()
删除数据
# 删除单个对象
city = session.query(CityInfo).filter_by(city_name='北京').first()
session.delete(city)
session.commit()

# 批量删除
session.query(WeatherData).filter(
    WeatherData.city_name == '北京'
).delete()
session.commit()

3.3 Session 速查表

操作 代码
插入 session.add(obj)session.commit()
查询所有 session.query(Model).all()
条件查询 session.query(Model).filter_by(col=val).first()
修改 obj.attr = valsession.commit()
删除 session.delete(obj)session.commit()
关闭 session.close()

四、数据采集与入库

4.1 模拟天气数据

import random
from datetime import datetime, timedelta

def get_weather(city_name, date):
    # 设置随机种子,确保同一城市同一天数据一致
    random.seed(hash(city_name + date.strftime('%Y-%m-%d')) % 10000)
    
    # 根据月份设置基础温度
    month = date.month
    if month in [12, 1, 2]:  # 冬
        base_temp = 0 + random.gauss(0, 5)
    elif month in [3, 4, 5]:  # 春
        base_temp = 15 + random.gauss(0, 5)
    elif month in [6, 7, 8]:  # 夏
        base_temp = 28 + random.gauss(0, 5)
    else:  # 秋
        base_temp = 18 + random.gauss(0, 5)
    
    # 城市气候调整
    if city_name in ['广州', '深圳']:
        base_temp += 5
    elif city_name in ['北京', '西安']:
        base_temp -= 2
    
    # 生成天气数据
    return {
        'city_name': city_name,
        'weather_date': date,
        'temp_max': round(base_temp + random.uniform(2, 8), 1),
        'temp_min': round(base_temp - random.uniform(2, 8), 1),
        'humidity': round(random.uniform(40, 90), 1),
        'weather': random.choice(['晴', '多云', '阴', '小雨', '中雨', '雷阵雨']),
        'created_at': datetime.now()
    }

4.2 入库流程

# 城市列表
cities = [
    {'city_name': '北京', 'province': '北京', 'longitude': 116.4, 'latitude': 39.9},
    {'city_name': '上海', 'province': '上海', 'longitude': 121.4, 'latitude': 31.2},
    {'city_name': '广州', 'province': '广东', 'longitude': 113.3, 'latitude': 23.1},
    # ...
]

# 插入城市数据
for city in cities:
    existing = session.query(CityInfo).filter_by(city_name=city['city_name']).first()
    if not existing:
        session.add(CityInfo(**city))
session.commit()

# 插入天气数据(过去30天)
today = datetime.now().date()
for city in cities:
    for i in range(30):
        date = today - timedelta(days=i)
        existing = session.query(WeatherData).filter(
            WeatherData.city_name == city['city_name'],
            WeatherData.weather_date == date
        ).first()
        if not existing:
            weather = get_weather(city['city_name'], date)
            session.add(WeatherData(**weather))
    session.commit()

session.close()

五、Pandas 数据分析

5.1 从数据库读取数据

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://postgres:123456@localhost:5432/weather')

# 读取整张表
df = pd.read_sql_table('weather_data', engine)

print(f"数据量: {len(df)} 条")
print(f"时间范围: {df['weather_date'].min()} - {df['weather_date'].max()}")
print(f"城市数量: {df['city_name'].nunique()}")

5.2 分析示例

分析1:各城市平均温度排名
city_temps = df.groupby('city_name')['temp_avg'].mean().sort_values(ascending=False)
print(city_temps)
分析2:天气类型分布
weather_dist = df['weather'].value_counts()
print(weather_dist)
分析3:温度最高/最低的日期
print(f"最高温度: {df.loc[df['temp_max'].idxmax()]}")
print(f"最低温度: {df.loc[df['temp_min'].idxmin()]}")
分析4:降水量统计
rainfall_stats = df[df['rainfall'] > 0].groupby('city_name')['rainfall'].agg(
    降水天数='count',
    总降水量='sum',
    平均降水量='mean'
).round(1)
print(rainfall_stats)

六、Matplotlib 可视化

6.1 设置中文字体

import matplotlib.pyplot as plt
import matplotlib

matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei']
matplotlib.rcParams['axes.unicode_minus'] = False

6.2 图1:各城市平均温度对比(横向柱状图)

fig, ax = plt.subplots(figsize=(12, 6))
city_temps = df.groupby('city_name')['temp_avg'].mean().sort_values(ascending=True)
city_temps.plot(kind='barh', ax=ax, color='skyblue', edgecolor='black')
ax.set_title('各城市平均温度对比')
ax.set_xlabel('温度 (°C)')
for i, v in enumerate(city_temps.values):
    ax.text(v + 0.5, i, f'{v:.1f}°C', va='center')
plt.tight_layout()
plt.savefig('city_temp.png', dpi=150)
plt.close()

6.3 图2:北京温度趋势(折线图)

fig, ax = plt.subplots(figsize=(14, 6))
beijing = df[df['city_name'] == '北京'].sort_values('weather_date')

ax.plot(beijing['weather_date'], beijing['temp_max'], label='最高温度', marker='o', color='red')
ax.plot(beijing['weather_date'], beijing['temp_avg'], label='平均温度', marker='s', color='orange')
ax.plot(beijing['weather_date'], beijing['temp_min'], label='最低温度', marker='^', color='blue')
ax.fill_between(beijing['weather_date'], beijing['temp_min'], beijing['temp_max'], alpha=0.2, color='orange')

ax.set_title('北京30天温度变化趋势')
ax.set_xlabel('日期')
ax.set_ylabel('温度 (°C)')
ax.legend()
ax.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('beijing_trend.png', dpi=150)
plt.close()

6.4 图3:天气类型分布(饼图)

fig, ax = plt.subplots(figsize=(10, 8))
weather_counts = df['weather'].value_counts()
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7', '#884EA0']
ax.pie(weather_counts.values, labels=weather_counts.index, autopct='%1.0f%%', colors=colors)
ax.set_title('天气类型分布')
plt.tight_layout()
plt.savefig('weather_dist.png', dpi=150)
plt.close()

6.5 图4:多城市温度对比

fig, ax = plt.subplots(figsize=(14, 6))
for city in ['北京', '上海', '广州', '成都']:
    city_data = df[df['city_name'] == city].sort_values('weather_date')
    ax.plot(city_data['weather_date'], city_data['temp_avg'], marker='o', label=city)
ax.set_title('多城市温度对比')
ax.legend()
ax.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('multi_city.png', dpi=150)
plt.close()

6.6 图5:温度-湿度散点图

fig, ax = plt.subplots(figsize=(10, 8))
ax.scatter(df['temp_avg'], df['humidity'], alpha=0.5, c=df['temp_avg'], cmap='RdYlBu_r')
ax.set_title('温度与湿度关系')
ax.set_xlabel('平均温度 (°C)')
ax.set_ylabel('湿度 (%)')
plt.colorbar(ax.collections[0], label='温度')
plt.tight_layout()
plt.savefig('temp_humidity.png', dpi=150)
plt.close()

6.7 图6:各城市降水量对比

fig, ax = plt.subplots(figsize=(12, 6))
rain_by_city = df.groupby('city_name')['rainfall'].sum().sort_values(ascending=False)
rain_by_city.plot(kind='bar', ax=ax, color='lightblue', edgecolor='darkblue')
ax.set_title('各城市累计降水量')
ax.set_ylabel('降水量 (mm)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('rainfall.png', dpi=150)
plt.close()

七、踩坑记录

坑1:表名大写导致找不到表

# ❌ 错误写法
__tablename__ = 'WeatherData'  # PostgreSQL 会转成小写 weatherdata

# ✅ 正确写法
__tablename__ = 'weather_data'  # 全小写,用下划线分隔

原因:PostgreSQL 默认会把未加引号的标识符转成小写。

坑2:filter_by 不支持表达式

# ❌ 错误写法
session.query(Student).filter_by(Student.age > 10)

# ✅ 正确写法
session.query(Student).filter(Student.age > 10)
# 或
session.query(Student).filter_by(age=18)  # 只支持等值比较

坑3:忘记 commit

# ❌ 忘记 commit,数据不会写入数据库
session.add(city)

# ✅ 正确写法
session.add(city)
session.commit()

坑4:忘记关闭 Session

# ❌ 不关闭会占用数据库连接
session = Session()
session.add(city)
session.commit()

# ✅ 正确写法
session = Session()
try:
    session.add(city)
    session.commit()
finally:
    session.close()

# 或使用上下文管理器
with Session() as session:
    session.add(city)
    session.commit()

八、项目总结

8.1 技术要点

技术点 说明
SQLAlchemy ORM 用 Python 类映射数据库表,告别手写 SQL
Session 数据库会话,所有 CRUD 操作的核心
Pandas 从数据库读取数据,进行统计分析
Matplotlib 生成可视化图表

8.2 项目流程

1. 设计表结构(ORM 类定义)
2. 创建数据库连接和 Session
3. 采集数据并入库
4. Pandas 读取数据进行分析
5. Matplotlib 生成可视化图表

8.3 后续优化方向

方向 说明
真实 API 接入心知天气、和风天气等真实 API
定时任务 使用 Airflow 定时采集数据
Web 界面 使用 Flask/FastAPI 提供 Web 界面
数据建模 设计星型模型,支持更多分析维度

参考资料

更多推荐