用Python+Supermind打造智能盯盘系统:告别低效手动监控

盯着电脑屏幕反复刷新行情数据,生怕错过任何一个买卖信号——这是大多数散户投资者的日常。但在这个算法交易主导的时代,手动盯盘不仅效率低下,还容易因情绪波动导致决策失误。本文将带你用Python和Supermind构建一个全天候自动运行的智能监控系统,当股价突破关键点位或成交量异常波动时,系统会立即通过微信通知你,让你在咖啡厅也能掌握市场脉搏。

1. 系统架构设计与环境准备

一个完整的自动盯盘系统需要三大核心组件: 实时数据获取层 策略判断层 消息通知层 。Supermind提供的Python SDK将成为我们的数据引擎,而轻量级的schedule库则负责定时任务调度。整个系统可以部署在树莓派或云服务器上持续运行。

1.1 开发环境配置

首先确保已安装Python 3.7+环境,推荐使用Miniconda创建独立环境:

conda create -n stock_monitor python=3.8
conda activate stock_monitor

安装必要的依赖库:

pip install supermind-sdk schedule requests pandas

提示:Supermind SDK需要API密钥,前往官网注册后可在个人中心获取。免费版提供基础行情数据,对于个人监控系统完全够用。

1.2 初始化Supermind客户端

创建 sm_client.py 文件配置基础连接:

from supermind import Supermind

sm = Supermind(
    api_key="your_api_key",
    timeout=10,
    retry_count=3
)

测试数据接口是否通畅:

# 获取贵州茅台最新行情
data = sm.stock_quote(symbol="600519")
print(data['latest_price'])

2. 核心监控策略实现

监控系统的价值在于其策略逻辑。我们首先实现最常用的突破型策略,当股价突破布林带上轨或下轨时触发警报。

2.1 布林带突破策略

strategies.py 中定义策略类:

import numpy as np

class BollingerStrategy:
    def __init__(self, window=20, num_std=2):
        self.window = window
        self.num_std = num_std
        
    def check_signal(self, history_data):
        closes = [d['close'] for d in history_data]
        if len(closes) < self.window:
            return None
            
        rolling_mean = np.mean(closes[-self.window:])
        rolling_std = np.std(closes[-self.window:])
        
        upper_band = rolling_mean + (rolling_std * self.num_std)
        lower_band = rolling_mean - (rolling_std * self.num_std)
        
        current_price = closes[-1]
        
        if current_price > upper_band:
            return "UPPER_BREAK"
        elif current_price < lower_band:
            return "LOWER_BREAK"
        return None

2.2 成交量异动监测

成交量突然放大往往是变盘信号。添加成交量监测逻辑:

class VolumeAlertStrategy:
    def __init__(self, multiplier=2.5, lookback_days=5):
        self.multiplier = multiplier
        self.lookback = lookback_days
        
    def check_signal(self, history_data):
        volumes = [d['volume'] for d in history_data]
        if len(volumes) < self.lookback:
            return None
            
        avg_volume = np.mean(volumes[-self.lookback:])
        current_volume = volumes[-1]
        
        if current_volume > avg_volume * self.multiplier:
            return "VOLUME_SPIKE"
        return None

3. 实时数据获取与策略调度

3.1 构建数据获取管道

创建 data_fetcher.py 处理实时数据:

from datetime import datetime
import time

class DataFetcher:
    def __init__(self, sm_client, symbols):
        self.client = sm_client
        self.symbols = symbols
        
    def fetch_realtime_data(self):
        results = {}
        for symbol in self.symbols:
            try:
                data = self.client.stock_quote(symbol)
                results[symbol] = {
                    'price': data['latest_price'],
                    'volume': data['volume'],
                    'time': datetime.now()
                }
            except Exception as e:
                print(f"Error fetching {symbol}: {str(e)}")
        return results

3.2 定时任务调度器

scheduler.py 中实现主循环:

import schedule
from time import sleep

class MonitorScheduler:
    def __init__(self, fetcher, strategies, notifier):
        self.fetcher = fetcher
        self.strategies = strategies
        self.notifier = notifier
        self.history = {symbol: [] for symbol in fetcher.symbols}
        
    def run_pipeline(self):
        print("Running monitoring cycle...")
        new_data = self.fetcher.fetch_realtime_data()
        
        for symbol, data in new_data.items():
            self.history[symbol].append(data)
            # 保留最近100条数据
            if len(self.history[symbol]) > 100:
                self.history[symbol] = self.history[symbol][-100:]
                
            for strategy in self.strategies:
                signal = strategy.check_signal(self.history[symbol])
                if signal:
                    msg = f"{symbol} 触发信号: {signal} | 价格: {data['price']}"
                    self.notifier.send(msg)
                    
    def start(self, interval_minutes=5):
        schedule.every(interval_minutes).minutes.do(self.run_pipeline)
        while True:
            schedule.run_pending()
            sleep(1)

4. 消息通知系统集成

4.1 微信通知实现

通过Server酱实现微信推送(需扫码关注公众号获取SCKEY):

import requests

class WechatNotifier:
    def __init__(self, sckey):
        self.base_url = f"https://sc.ftqq.com/{sckey}.send"
        
    def send(self, message):
        params = {
            "text": "股票监控提醒",
            "desp": message
        }
        try:
            requests.get(self.base_url, params=params)
        except Exception as e:
            print(f"微信通知发送失败: {str(e)}")

4.2 邮件通知备选方案

对于重要信号,可以添加邮件通知作为双重保障:

import smtplib
from email.mime.text import MIMEText

class EmailNotifier:
    def __init__(self, sender, password, receivers):
        self.sender = sender
        self.password = password
        self.receivers = receivers
        
    def send(self, message):
        msg = MIMEText(message, 'plain', 'utf-8')
        msg['Subject'] = '股票监控紧急通知'
        msg['From'] = self.sender
        msg['To'] = ', '.join(self.receivers)
        
        try:
            server = smtplib.SMTP_SSL('smtp.qq.com', 465)
            server.login(self.sender, self.password)
            server.sendmail(self.sender, self.receivers, msg.as_string())
            server.quit()
        except Exception as e:
            print(f"邮件发送失败: {str(e)}")

5. 系统部署与优化技巧

5.1 使用PM2持久化运行

在Linux服务器上,用PM2管理Python进程:

pm2 start monitor.py --name "stock_monitor" --interpreter python3
pm2 save
pm2 startup

5.2 策略参数优化建议

不同股票适用的参数可能不同:

股票类型 布林带窗口 标准差倍数 成交量回溯天数
大盘蓝筹股 20 2 5
中小创股票 15 1.8 3
科创板股票 10 1.5 2

5.3 异常处理机制增强

MonitorScheduler 中添加错误恢复逻辑:

def run_pipeline(self):
    try:
        # 原有逻辑...
    except Exception as e:
        error_msg = f"监控系统异常: {str(e)}"
        self.notifier.send(error_msg)
        # 等待5分钟后重试
        sleep(300)

6. 扩展更多实用策略

6.1 MACD金叉死叉策略

class MACDStrategy:
    def __init__(self, fast=12, slow=26, signal=9):
        self.fast = fast
        self.slow = slow
        self.signal = signal
        
    def check_signal(self, history_data):
        closes = [d['close'] for d in history_data]
        if len(closes) < self.slow + self.signal:
            return None
            
        # 计算EMA
        ema_fast = self._calc_ema(closes, self.fast)
        ema_slow = self._calc_ema(closes, self.slow)
        dif = ema_fast - ema_slow
        dea = self._calc_ema(dif, self.signal)
        macd = dif - dea
        
        # 金叉死叉判断
        if dif[-1] > dea[-1] and dif[-2] <= dea[-2]:
            return "MACD_GOLDEN_CROSS"
        elif dif[-1] < dea[-1] and dif[-2] >= dea[-2]:
            return "MACD_DEAD_CROSS"
        return None

6.2 多策略组合监控

创建策略组合器提高信号质量:

class StrategyCombo:
    def __init__(self, strategies, required_votes=2):
        self.strategies = strategies
        self.required = required_votes
        
    def check_signal(self, history_data):
        signals = []
        for strategy in self.strategies:
            signal = strategy.check_signal(history_data)
            if signal:
                signals.append(signal)
                
        if len(signals) >= self.required:
            return f"COMBO_TRIGGER({'+'.join(signals)})"
        return None

7. 实战案例:监控宁德时代关键点位

假设我们要监控宁德时代(300750)的突破情况:

if __name__ == "__main__":
    symbols = ["300750"]
    
    # 初始化各组件
    sm = Supermind(api_key="your_key")
    fetcher = DataFetcher(sm, symbols)
    
    # 配置策略
    strategies = [
        BollingerStrategy(window=20, num_std=2),
        VolumeAlertStrategy(multiplier=3)
    ]
    
    # 配置通知
    notifier = WechatNotifier(sckey="your_sckey")
    
    # 启动监控
    scheduler = MonitorScheduler(fetcher, strategies, notifier)
    scheduler.start(interval_minutes=3)

运行后当出现以下情况时会收到微信提醒:

  • 股价突破布林带上轨或下轨
  • 成交量达到过去5日均值的3倍以上

更多推荐