iTick 港美股即時行情API結合富途牛牛交易接口實現個人量化交易——高頻股票API - iTick

在當今數字化金融時代,個人投資者通過 API 接口實現量化交易已成為可能。本文將詳細介紹如何利用 iTick 提供的港美股即時行情 API,結合富途牛牛(Futu OpenAPI)的交易接口,構建一個完整的個人量化交易系統。我們將從系統架構設計開始,逐步深入到具體實現,最後提供完整的 Python 代碼和回測策略。

一、系統架構設計

本系統採用模組化設計,各組件之間鬆耦合,便於單獨擴展和維護。以下是完整的系統架構:

二、技術選型與準備

1. 行情數據源 - iTick API

iTick 提供穩定可靠的港美股即時行情數據,具有以下特點:

  • 低延遲:毫秒級行情推送
  • 全面覆蓋:港股、美股、A 股(通過港股通)等全球市場
  • 豐富數據類型:tick 數據、分鐘線、日線等
  • 穩定可靠:99.9%的可用性保證

2. 交易接口 - 富途牛牛 Futu OpenAPI

富途牛牛開放平台提供完善的交易 API:

  • 支援港股、美股、A 股(港股通)交易
  • 提供模擬交易環境
  • 完善的文檔和社區支持
  • 相對較高的交易執行速度

3. 開發環境準備

      # 所需Python庫
import requests  # 用於API調用
import pandas as pd  # 數據處理
import numpy as np  # 數值計算
from futu import *  # 富途OpenAPI
import matplotlib.pyplot as plt  # 可視化
import sqlite3  # 本地數據存儲
import time  # 時間控制
import threading  # 多線程處理

    

三、系統核心模組實現

1. 行情數據獲取模組

      class ITickDataFetcher:

     def __init__(self):
            self.headers = {
             "accept": "application/json",
             "token": ITICK_API_KEY
            }

    def get_realtime_data(self,symbol,region):
        """獲取即時行情數據"""
       url = f"https://api.itick.org/stock/quote?symbol={symbol}&region={region}"
        try:

            response = requests.get(url, headers=self.headers)
            data = response.json()
            return self._parse_data(data)
        except Exception as e:
            print(f"獲取即時數據失敗: {str(e)}")
            return None

    def get_historical_data(self,symbol, region, freq='1'):
        """獲取歷史數據"""
            url = f"https://api.itick.org/stock/kline?symbol={symbol}&region={region}&kType={freq}"
            try:
                response = requests.get(url, headers=self.headers)
                data = response.json()
                df = self._parse_historical_data(data)
            except Exception as e:
                print(f"獲取{symbol}歷史數據失敗: {str(e)}")
        return None

    def _parse_data(self, raw_data):
        """解析即時數據"""
        # 實現數據解析邏輯
        pass

    def _parse_historical_data(self, raw_data):
        """解析歷史數據"""
        # 實現歷史數據解析邏輯
        pass

    

2. 數據預處理模組

      class DataProcessor:
    def __init__(self):
        self.conn = sqlite3.connect('quant_trading.db')

    def clean_data(self, df):
        """數據清洗"""
        # 處理缺失值
        df = df.dropna()
        # 去除異常值
        df = df[(df['v'] > 0) & (df['c'] > 0)]
        return df

    def calculate_technical_indicators(self, df):
        """計算技術指標"""
        # 移動平均線
        df['ma5'] = df['c'].rolling(5).mean()
        df['ma20'] = df['c'].rolling(20).mean()

        # RSI
        delta = df['c'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        # MACD
        exp12 = df['c'].ewm(span=12, adjust=False).mean()
        exp26 = df['c'].ewm(span=26, adjust=False).mean()
        df['macd'] = exp12 - exp26
        df['signal'] = df['macd'].ewm(span=9, adjust=False).mean()

        return df

    def save_to_db(self, df, table_name):
        """保存數據到數據庫"""
        df.to_sql(table_name, self.conn, if_exists='append', index=False)

    def load_from_db(self, table_name):
        """從數據庫加載數據"""
        return pd.read_sql(f"SELECT * FROM {table_name}", self.conn)

    

3. 策略引擎

      class DualMovingAverageStrategy:
    def __init__(self, fast_window=5, slow_window=20):
        self.fast_window = fast_window
        self.slow_window = slow_window
        self.position = 0  # 0:無持倉, 1:多頭, -1:空頭
        self.signals = []

    def generate_signals(self, df):
        """生成交易信號"""
        signals = pd.DataFrame(index=df.index)
        signals['price'] = df['c']
        signals['fast_ma'] = df['c'].rolling(self.fast_window).mean()
        signals['slow_ma'] = df['c'].rolling(self.slow_window).mean()

        # 生成交易信號
        signals['signal'] = 0
        signals['signal'][self.fast_window:] = np.where(
            signals['fast_ma'][self.fast_window:] > signals['slow_ma'][self.fast_window:], 1, 0)

        # 計算實際的買賣信號
        signals['positions'] = signals['signal'].diff()

        return signals

    def on_tick(self, tick_data):
        """即時行情處理"""
        # 實現即時行情處理邏輯
        pass

    

4. 交易執行模組

      class FutuTrader:
    def __init__(self, host='127.0.0.1', port=11111):
        self.quote_ctx = OpenQuoteContext(host=host, port=port)
        self.trade_ctx = OpenTradeContext(host=host, port=port)

    def get_account_info(self):
        """獲取賬戶信息"""
        ret, data = self.trade_ctx.accinfo_query()
        if ret == RET_OK:
            return data
        else:
            print(f"獲取賬戶信息失敗: {data}")
            return None

    def place_order(self, symbol, price, quantity, trd_side, order_type=OrderType.NORMAL):
        """下單"""
        ret, data = self.trade_ctx.place_order(
            price=price,
            qty=quantity,
            code=symbol,
            trd_side=trd_side,
            order_type=order_type,
            trd_env=TrdEnv.SIMULATE  # 模擬交易,實盤改為TrdEnv.REAL
        )
        if ret == RET_OK:
            print(f"下單成功: {data}")
            return data
        else:
            print(f"下單失敗: {data}")
            return None

    def close_all_positions(self):
        """平掉所有倉位"""
        ret, data = self.trade_ctx.position_list_query()
        if ret == RET_OK:
            for _, row in data.iterrows():
                if row['qty'] > 0:
                    self.place_order(
                        row['code'],
                        row['cost_price'],
                        row['qty'],
                        TrdSide.SELL
                    )
        else:
            print(f"獲取持倉失敗: {data}")

    def subscribe(self, symbols, subtype_list=[SubType.QUOTE]):
        """訂閱行情"""
        ret, data = self.quote_ctx.subscribe(symbols, subtype_list)
        if ret != RET_OK:
            print(f"訂閱行情失敗: {data}")

    def unsubscribe(self, symbols, subtype_list=[SubType.QUOTE]):
        """取消訂閱"""
        ret, data = self.quote_ctx.unsubscribe(symbols, subtype_list)
        if ret != RET_OK:
            print(f"取消訂閱失敗: {data}")

    def __del__(self):
        self.quote_ctx.close()
        self.trade_ctx.close()

    

四、回測系統實現

      class BacktestEngine:
    def __init__(self, initial_capital=100000):
        self.initial_capital = initial_capital
        self.results = None

    def run_backtest(self, strategy, data):
        """運行回測"""
        signals = strategy.generate_signals(data)

        # 初始化投資組合
        portfolio = pd.DataFrame(index=signals.index)
        portfolio['signal'] = signals['positions']
        portfolio['price'] = signals['price']
        portfolio['cash'] = self.initial_capital
        portfolio['shares'] = 0
        portfolio['total'] = self.initial_capital

        # 執行交易
        for i in range(1, len(portfolio)):
            # 買入信號
            if portfolio['signal'][i] == 1:
                shares_to_buy = portfolio['cash'][i-1] // portfolio['price'][i]
                portfolio['shares'][i] = portfolio['shares'][i-1] + shares_to_buy
                portfolio['cash'][i] = portfolio['cash'][i-1] - shares_to_buy * portfolio['price'][i]

            # 賣出信號
            elif portfolio['signal'][i] == -1:
                portfolio['cash'][i] = portfolio['cash'][i-1] + portfolio['shares'][i-1] * portfolio['price'][i]
                portfolio['shares'][i] = 0

            # 無信號
            else:
                portfolio['shares'][i] = portfolio['shares'][i-1]
                portfolio['cash'][i] = portfolio['cash'][i-1]

            # 更新總資產
            portfolio['total'][i] = portfolio['cash'][i] + portfolio['shares'][i] * portfolio['price'][i]

        self.results = portfolio
        return portfolio

    def analyze_results(self):
        """分析回測結果"""
        if self.results is None:
            print("請先運行回測")
            return None

        returns = self.results['total'].pct_change()
        cumulative_returns = (returns + 1).cumprod()

        # 計算年化收益率
        total_days = len(self.results)
        annualized_return = (cumulative_returns.iloc[-1] ** (252/total_days)) - 1

        # 計算最大回撤
        max_drawdown = (self.results['total'].cummax() - self.results['total']).max()

        # 計算夏普比率
        sharpe_ratio = returns.mean() / returns.std() * np.sqrt(252)

        # 可視化
        plt.figure(figsize=(12, 6))
        plt.plot(cumulative_returns)
        plt.title("Cumulative Returns")
        plt.xlabel("Date")
        plt.ylabel("Returns")
        plt.grid(True)
        plt.show()

        return {
            'annualized_return': annualized_return,
            'max_drawdown': max_drawdown,
            'sharpe_ratio': sharpe_ratio,
            'final_value': self.results['total'].iloc[-1]
        }

    

五、總結

本文詳細介紹了如何利用iTick港美股即時行情API和富途牛牛交易接口構建個人量化交易系統。系統包含以下核心組件:

  • 行情數據獲取模組:從iTick API獲取即時和歷史數據
  • 數據預處理模組:清洗數據並計算技術指標
  • 策略引擎:實現雙均線交易策略
  • 交易執行模組:通過富途OpenAPI執行交易
  • 回測系統:評估策略歷史表現