cs-lstm Mac M5 复现(Python 3.10,与下文第 3 步创建的环境版本一致)
1. 安装官方 Miniconda(替代 Anaconda)
curl -O https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-arm64.sh
bash Miniconda3-latest-MacOSX-arm64.sh -b -p $HOME/miniconda3
source $HOME/miniconda3/bin/activate
conda init zsh
2.接受服务条款
接受 main 渠道的服务条款
conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main
接受 r 渠道的服务条款
conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r
执行后应该显示类似 Terms of Service accepted 的提示。
注:常用 conda 操作参考
| 操作 | 命令 |
| --- | --- |
| 列出所有环境 | `conda env list` 或 `conda info --envs` |
| 创建新环境 | `conda create -n 环境名 python=3.x -y` |
| 激活环境 | `conda activate 环境名` |
| 退出环境 | `conda deactivate` |
| 删除环境 | `conda remove -n 环境名 --all` |
| 安装包 | `conda install 包名` |
| 升级 conda | `conda update conda` |
| 查看已安装包 | `conda list` |
3.建立虚拟环境
conda create -n cslstm python=3.10 -y
验证环境列表
conda env list
激活环境
conda activate cslstm
4.安装 PyTorch(pip 默认会从 PyPI 拉取 ARM64 版本)
pip install torch torchvision torchaudio
5.安装深度学习常用包
# 数据加载和科学计算
pip install numpy pandas scikit-learn matplotlib
# 实验跟踪和可视化
pip install wandb tensorboard
# 数据处理
pip install opencv-python pillow scikit-image
# 深度学习工具
pip install timm datasets
# 科学计算 + 小波 + 数据工具
pip install tqdm
pip install PyWavelets # 小波变换(模块①)
pip install scipy # FFT
# 安装 pywt(注意:PyPI 上的包名是 PyWavelets,pywt 只是导入时使用的模块名)
pip install PyWavelets
# 安装 TensorBoard
pip install tensorboard
注:numpy要在2以下
a.先查看当前 NumPy 版本
python -c "import numpy; print(numpy.__version__)"
b.降级到 NumPy 1.x
pip install "numpy<2"
c.验证降级成功
python -c "import numpy; print(numpy.__version__)"
# 预期输出:1.26.x 或 1.24.x
6.验证 MPS 可用:
python -c 'import torch;print(torch.backends.mps.is_available())'
7.官方代码仓库:
可以手动下载
git clone https://github.com/NESA-Lab/Contextual-and-Seasonal-LSTMs-for-TSAD.git
cd Contextual-and-Seasonal-LSTMs-for-TSAD
8.修改代码:把 MPS 不支持的复数 / FFT 运算指定到 CPU 上执行
cs_lstms.py
import torch
import numpy as np
from torch import nn
from time_series_decomp import Series_decop
class CSLSTMs(nn.Module):
    """Two-branch LSTM model producing a mean and log-variance per branch.

    The contextual ("hour") branch looks at short overlapping windows cut from
    the tail of the input; the seasonal ("day") branch looks at windows cut
    from the whole input with stride ``hp.cycle``. Every window is augmented
    with the real and imaginary parts of its rFFT before entering the LSTM.
    Each branch's head emits a packed spectrum that is turned back into a
    time-domain signal with an inverse rFFT and then mapped to ``mu`` and
    ``log_var``.

    FFT / complex operations are always executed on the CPU and the results
    moved back to the caller's device, because the MPS backend did not support
    the complex operations used here.

    ``hp`` must expose: ``contextual_window``, ``seasonal_window``, ``window``,
    ``step``, ``cycle``, ``d_model`` and ``dropout_rate``.
    """

    def __init__(
        self,
        hp,
    ):
        super(CSLSTMs, self).__init__()
        self.hp = hp
        # LSTM input width = raw window + its rFFT real/imag parts
        # (window + 2 extra values when the window length is even).
        self.lstm = nn.LSTM(2 * self.hp.contextual_window + 2, self.hp.d_model // 4,
                            batch_first=True, dropout=self.hp.dropout_rate)
        self.cycle_lstm = nn.LSTM(2 * self.hp.seasonal_window + 2, self.hp.d_model,
                                  batch_first=True, dropout=self.hp.dropout_rate)
        self.series_decop = Series_decop(level=int(np.round(np.log2(self.hp.window)) - 4), hp=self.hp)
        # Heads emit a packed spectrum: first half real parts, second half imaginary parts.
        self.hour_linear = nn.Sequential(
            nn.Linear(self.hp.d_model // 4, self.hp.d_model // 4),
            nn.Tanh(),
            nn.Linear(self.hp.d_model // 4, self.hp.d_model // 4),
            nn.Tanh(),
            nn.Linear(self.hp.d_model // 4, self.hp.d_model // 4),
            nn.Tanh(),
            nn.Linear(self.hp.d_model // 4, self.hp.contextual_window + 2),
            nn.Tanh()
        )
        self.day_linear = nn.Sequential(
            nn.Linear(self.hp.d_model, self.hp.d_model),
            nn.Tanh(),
            nn.Linear(self.hp.d_model, self.hp.d_model),
            nn.Tanh(),
            nn.Linear(self.hp.d_model, self.hp.d_model),
            nn.Tanh(),
            nn.Linear(self.hp.d_model, self.hp.seasonal_window + 2),
            nn.Tanh()
        )
        self.hour_mu = nn.Linear(self.hp.contextual_window, self.hp.contextual_window)
        self.hour_log_var = nn.Linear(self.hp.contextual_window, self.hp.contextual_window)
        self.day_mu = nn.Linear(self.hp.seasonal_window, self.hp.seasonal_window)
        self.day_log_var = nn.Linear(self.hp.seasonal_window, self.hp.seasonal_window)

    def get_input(self, trend_signal):
        """Build the LSTM inputs for both branches.

        Args:
            trend_signal: tensor indexed as ``[batch, 1, time]`` (dimension 1
                is squeezed away after windowing).

        Returns:
            ``(day_input, hour_input)``; each is the stack of windows
            concatenated with its frequency features along the last dimension.
        """
        # Contextual branch: only the trailing 4 * contextual_window samples,
        # cut into overlapping windows of length contextual_window.
        hour_input = trend_signal.clone()
        hour_input = hour_input[:, :, -self.hp.contextual_window * 4:]
        hour_input = hour_input.unfold(dimension=2, size=self.hp.contextual_window, step=self.hp.step)
        hour_input = hour_input.squeeze(1)
        f_hour_input = hour_input.clone()
        # The newest sample of the last window is zeroed before the FFT only;
        # the raw window keeps its value.
        f_hour_input[:, -1, -1] = 0
        f_hour_input = self.get_freq(f_hour_input)
        hour_input = torch.cat((hour_input, f_hour_input), dim=-1)

        # Seasonal branch: the whole input, cut into windows of length
        # seasonal_window taken every hp.cycle samples.
        day_input = trend_signal.clone()
        day_input = day_input.unfold(dimension=2, size=self.hp.seasonal_window, step=self.hp.cycle)
        day_input = day_input.squeeze(1)
        f_day_input = day_input.clone()
        f_day_input[:, -1, -1] = 0
        f_day_input = self.get_freq(f_day_input)
        day_input = torch.cat((day_input, f_day_input), dim=-1)
        return day_input, hour_input

    def get_freq(self, input):
        """Return the rFFT of ``input`` along the last dim as ``[real | imag]``.

        The transform runs on the CPU (MPS lacked complex support) and the
        result is moved back to the device ``input`` came from.
        """
        original_device = input.device
        input_cpu = input.to("cpu")
        f_global = torch.fft.rfft(input_cpu, dim=-1)
        f_global = torch.cat((f_global.real, f_global.imag), dim=-1)
        return f_global.to(original_device)

    def _irfft_on_cpu(self, packed, window):
        """Invert a packed ``[real | imag]`` spectrum on the CPU.

        ``packed`` holds ``window // 2 + 1`` real parts followed by the
        imaginary parts along its last dimension. The complex view and the
        inverse rFFT are done on the CPU; the time-domain result is returned
        on ``packed``'s device.
        """
        n_bins = window // 2 + 1
        spectrum = torch.stack((packed[:, :, :n_bins], packed[:, :, n_bins:]), dim=-1)
        spectrum_cpu = torch.view_as_complex(spectrum.to("cpu"))
        return torch.fft.irfft(spectrum_cpu).to(packed.device)

    def encode(self, trend_signal):
        """Run both branches and return ``(hour_mu, hour_log_var, day_mu, day_log_var)``."""
        day_input, hour_input = self.get_input(trend_signal)

        # Contextual branch: last LSTM step -> packed spectrum -> time domain.
        result, h = self.lstm(hour_input)
        result_hour = self.hour_linear(result[:, -1, :]).unsqueeze(1)
        f_hour = self._irfft_on_cpu(result_hour, self.hp.contextual_window)
        hour_mu = self.hour_mu(f_hour)
        hour_log_var = self.hour_log_var(f_hour)

        # Seasonal branch: same pipeline with the seasonal window size.
        result, h = self.cycle_lstm(day_input)
        result_day = self.day_linear(result[:, -1, :]).unsqueeze(1)
        f_day = self._irfft_on_cpu(result_day, self.hp.seasonal_window)
        day_mu = self.day_mu(f_day)
        day_log_var = self.day_log_var(f_day)
        return hour_mu, hour_log_var, day_mu, day_log_var

    def forward(self, input, input_normal, mode, mask):
        """Dispatch on ``mode``.

        ``"train"`` and ``"valid"`` return the scalar loss from
        :meth:`loss_func`; any other mode returns :meth:`reference`'s
        ``(loss, recon_x)`` pair (``mask`` is not used in that case).
        """
        if mode in ("train", "valid"):
            return self.loss_func(input, input_normal, mask)
        return self.reference(input, input_normal)

    def loss_func(self, input, input_normal, mask):
        """Compute the training/validation loss.

        The target is ``input_normal`` where ``mask`` is true and the
        wavelet-denoised ``input`` elsewhere. Each branch contributes
        ``0.5 * mean(log_var + (target - mu)^2 / exp(log_var))`` (a Gaussian
        negative log-likelihood up to an additive constant), averaged over the
        batch; the two branch losses are summed.

        Raises:
            RuntimeError: if the resulting loss is infinite.
        """
        trend_signal = self.series_decop(input)
        hour_mu, hour_log_var, day_mu, day_log_var = self.encode(input)

        trend_day = trend_signal[:, :, -self.hp.seasonal_window:].squeeze(1)
        trend_hour = trend_signal[:, :, -self.hp.contextual_window:].squeeze(1)
        input_day = input_normal[:, :, -self.hp.seasonal_window:].squeeze(1)
        input_hour = input_normal[:, :, -self.hp.contextual_window:].squeeze(1)
        mask_day = mask[:, -self.hp.seasonal_window:]
        mask_hour = mask[:, -self.hp.contextual_window:]

        hour_mu = hour_mu.squeeze(1)
        hour_log_var = hour_log_var.squeeze(1)
        day_mu = day_mu.squeeze(1)
        day_log_var = day_log_var.squeeze(1)

        # Keep masked-in samples, substitute the denoised trend for the rest.
        input_day = input_day * mask_day + trend_day * torch.logical_not(mask_day)
        input_hour = input_hour * mask_hour + trend_hour * torch.logical_not(mask_hour)

        hour_recon_loss = torch.mean(
            0.5 * torch.mean((hour_log_var + (input_hour - hour_mu) ** 2 / torch.exp(hour_log_var)), dim=-1),
            dim=0
        )
        day_recon_loss = torch.mean(
            0.5 * torch.mean((day_log_var + (input_day - day_mu) ** 2 / torch.exp(day_log_var)), dim=-1),
            dim=0
        )
        recon_loss = day_recon_loss + hour_recon_loss
        loss = recon_loss
        if torch.isinf(loss):
            # A bare `raise` here had no active exception to re-raise and only
            # produced an uninformative RuntimeError; raise the same type with
            # a message that says what went wrong.
            raise RuntimeError(
                "CSLSTMs.loss_func produced an infinite loss "
                "(hour_recon_loss={}, day_recon_loss={})".format(
                    hour_recon_loss.item(), day_recon_loss.item()
                )
            )
        return loss

    def reference(self, input, input_normal):
        """Score the newest time step (inference path).

        Returns:
            ``(loss, recon_x)`` where ``loss`` is the sum of both branches'
            per-sample terms at the last position and ``recon_x`` is the
            average of the two predicted means at that position.
        """
        input_hour = input_normal[:, :, -self.hp.contextual_window:]
        input_day = input_normal[:, :, -self.hp.seasonal_window:]
        hour_mu, hour_log_var, day_mu, day_log_var = self.encode(input)

        hour_loss = (hour_log_var + (input_hour - hour_mu) ** 2 / torch.exp(hour_log_var))
        hour_loss = hour_loss[:, :, -1].unsqueeze(2)
        cycle_loss = (day_log_var + (input_day - day_mu) ** 2 / torch.exp(day_log_var))
        cycle_loss = cycle_loss[:, :, -1].unsqueeze(2)
        loss = hour_loss + cycle_loss

        recon_x = (hour_mu[:, :, -1].unsqueeze(2) + day_mu[:, :, -1].unsqueeze(2)) / 2
        return loss, recon_x
train.py
import os
# [Added] Environment tweaks for Apple-Silicon runs; they are set here, ahead
# of the `import torch` further down.
# NOTE(review): despite the original "force CPU" wording, per the PyTorch docs
# PYTORCH_ENABLE_MPS_FALLBACK=1 only lets operators that MPS does not implement
# fall back to the CPU; it does not disable MPS.
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
# NOTE(review): "TORCH.backends.mps.enabled" does not look like an environment
# variable PyTorch reads (it resembles a Python attribute path) - presumably
# this line has no effect; confirm before relying on it.
os.environ["TORCH.backends.mps.enabled"] = "0"
import logging
import numpy as np
import torch
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from model import MyCSLSTMs
from pytorch_lightning.loggers import TensorBoardLogger
import argparse
import time
import json
from collections import defaultdict
# Fixed seed applied to the torch RNG, the torch CUDA RNGs and NumPy's global RNG.
SEED = 8
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
np.random.seed(SEED)
# Module-level TensorBoard logger (save_dir="./", name="logs"); main() passes it to the Trainer.
logger = TensorBoardLogger(name="logs", save_dir="./")
def main(hparams):
    """Train ``MyCSLSTMs`` with early stopping, then run and time the test pass.

    The accelerator is picked in this order: MPS if available, else CUDA
    (using device index ``hparams.gpu``), else CPU. The best checkpoint by
    ``val_loss_valid_epoch`` is written to ``./ckpt/<data_name>``.

    Args:
        hparams: parsed arguments; this function reads ``data_name``,
            ``max_epoch`` and (on CUDA only) ``gpu``, and hands the whole
            object to ``MyCSLSTMs``.
    """
    print("loading model...")
    model = MyCSLSTMs(hparams)
    print("model built")

    monitored = "val_loss_valid_epoch"
    callbacks = [
        EarlyStopping(monitor=monitored, patience=3, verbose=True, mode="min"),
        ModelCheckpoint(
            dirpath="./ckpt/",
            filename="{}".format(hparams.data_name),
            monitor=monitored,
            mode="min",
        ),
    ]

    # Device auto-detection: MPS first, then CUDA, then plain CPU.
    if torch.backends.mps.is_available():
        accelerator, devices, banner = "mps", 1, "Using MPS (Apple Silicon GPU)"
    elif torch.cuda.is_available():
        accelerator, devices, banner = "gpu", [hparams.gpu], "Using CUDA GPU"
    else:
        accelerator, devices, banner = "cpu", 1, "Using CPU only"
    print(banner)

    trainer = Trainer(
        max_epochs=hparams.max_epoch,
        callbacks=callbacks,
        logger=logger,
        accelerator=accelerator,
        devices=devices,
        check_val_every_n_epoch=1,
        gradient_clip_algorithm="value",
        gradient_clip_val=2,
    )

    print("fit start")
    train_loader = model.mydataloader("train")
    val_loader = model.mydataloader("valid")
    trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)

    # Only the test pass is timed.
    test_started = time.time()
    trainer.test(model, dataloaders=model.mydataloader("test"))
    test_seconds = time.time() - test_started

    print("View tensorboard logs by running\ntensorboard --logdir %s" % os.getcwd())
    print("and going to http://localhost:6006 on your browser")
    print(test_seconds)
def compute_json_values_mean(json_list):
    """Average every numeric field across a list of JSON-like dicts.

    Args:
        json_list: iterable of already-parsed JSON objects. Items that are
            not dicts are skipped with a printed warning; values that are not
            ``int``/``float`` are ignored.

    Returns:
        dict mapping each field name that had at least one numeric value to
        the mean of those values. An empty or missing ``json_list`` yields
        ``{}``.
    """
    if not json_list:
        return {}

    totals = defaultdict(float)
    occurrences = defaultdict(int)

    for entry in json_list:
        if not isinstance(entry, dict):
            print(f"Warning: Skipping non-dictionary item in list: {entry}")
            continue
        for name, number in entry.items():
            if not isinstance(number, (int, float)):
                continue
            totals[name] += number
            occurrences[name] += 1

    return {
        name: total / occurrences[name]
        for name, total in totals.items()
        if occurrences[name] > 0
    }
if __name__ == "__main__":
    # Entry point: parse the model's own argument set, then either evaluate an
    # existing checkpoint (--only_test 1) or run the full training in main().
    parser = MyCSLSTMs.add_model_specific_args()
    hyperparams = parser.parse_args()
    print("RUNNING")
    if hyperparams.only_test == 1:
        model = MyCSLSTMs.load_from_checkpoint(checkpoint_path=hyperparams.ckpt_path)
        # Replace the hyper-parameters stored with the checkpoint by the ones
        # given on the command line.
        model.hp = hyperparams
        print(model.hp)
        # Device choice for test-only runs: CUDA if present; otherwise CPU,
        # even when MPS is available, to stay compatible with complex tensors.
        if torch.cuda.is_available():
            accelerator = "gpu"
            # Was `hparams.gpu`: that name only exists inside main(), so this
            # branch raised NameError on CUDA machines.
            devices = [hyperparams.gpu]
            print("Using CUDA GPU")
        elif torch.backends.mps.is_available():
            accelerator = "cpu"
            devices = 1
            print("MPS available but using CPU for ComplexTensor compatibility")
        else:
            accelerator = "cpu"
            devices = 1
            print("Using CPU only")
        trainer = Trainer(accelerator=accelerator, devices=devices)
        trainer.test(model, dataloaders=model.mydataloader("test"))
    else:
        main(hyperparams)
time_series_decomp.py
import pywt
import torch.nn as nn
import torch
import numpy as np
class Series_decop(nn.Module):
    """Wavelet soft-threshold denoiser used to extract a smoothed trend.

    Args:
        wavlet: wavelet name passed to PyWavelets (default ``'db4'``).
        level: decomposition level; detail coefficients 1..level are thresholded.
        hp: hyper-parameter object, stored but not read by this class.
    """

    def __init__(self, wavlet='db4', level=5, hp=None):
        super().__init__()
        self.wavlet = wavlet
        self.level = level
        self.hp = hp

    def forward(self, x):
        """Return the denoised version of ``x`` on the same device as ``x``.

        The computation happens in NumPy on a detached CPU copy, so no
        gradient flows through this module. The decomposition runs along the
        last axis (PyWavelets' default).
        """
        signal = x.clone().cpu().detach().numpy()
        coeffs = pywt.wavedec(signal[:, :], self.wavlet, level=self.level)

        # Threshold scale d = sqrt(2 * ln(N)), N = length of the last axis.
        eps = 1e-10
        d = np.sqrt(2 * np.log(signal.shape[-1]))
        for i in range(1, self.level + 1):
            # Noise estimate from the median absolute detail coefficient;
            # eps keeps the threshold strictly positive.
            sigma = np.median(np.abs(coeffs[i])) / 0.6745
            sigma = max(sigma, eps)
            uthresh = sigma * d
            # Soft-threshold the detail coefficients of this level.
            coeffs[i] = pywt.threshold(coeffs[i], uthresh, mode='soft')

        # Put the result where the input lives. The previous version always
        # chose MPS when it was available, which produced a device mismatch
        # whenever the model itself was run on the CPU of an Apple-Silicon Mac.
        trend_signal = torch.tensor(pywt.waverec(coeffs, self.wavlet)).to(x.device)
        return trend_signal
训练数据(更换数据集时,修改 --data_name 和 --data_dir;注意行尾的反斜杠后面不能再跟空格或注释,否则续行失效)
python train.py \
--data_name AIOPS \
--data_dir ./data/AIOPS \
--window 240 \
--seasonal_window 48 \
--cycle 48 \
--contextual_window 4 \
--step 2 \
--save_file ./result.txt \
--learning_rate 0.0005 \
--max_epoch 100 \
--batch_size 64 \
--gpu 0 \
--num_workers 2
更多推荐

所有评论(0)