
在當今數字化金融時代,個人投資者通過 API 接口實現量化交易已成為可能。本文將詳細介紹如何利用 iTick 提供的港美股即時行情 API,結合富途牛牛(Futu OpenAPI)的交易接口,構建一個完整的個人量化交易系統。我們將從系統架構設計開始,逐步深入到具體實現,最後提供完整的 Python 代碼和回測策略。
一、系統架構設計
本系統採用模組化設計,各組件之間鬆耦合,便於單獨擴展和維護。以下是完整的系統架構:
二、技術選型與準備
1. 行情數據源 - iTick API
iTick 提供穩定可靠的港美股即時行情數據,具有以下特點:
- 低延遲:毫秒級行情推送
- 全面覆蓋:港股、美股、A 股(通過港股通)等全球市場
- 豐富數據類型:tick 數據、分鐘線、日線等
- 穩定可靠:99.9%的可用性保證
2. 交易接口 - 富途牛牛 Futu OpenAPI
富途牛牛開放平台提供完善的交易 API:
- 支援港股、美股、A 股(港股通)交易
- 提供模擬交易環境
- 完善的文檔和社區支持
- 相對較高的交易執行速度
3. 開發環境準備
# 所需Python庫
import requests # 用於API調用
import pandas as pd # 數據處理
import numpy as np # 數值計算
from futu import * # 富途OpenAPI
import matplotlib.pyplot as plt # 可視化
import sqlite3 # 本地數據存儲
import time # 時間控制
import threading # 多線程處理
三、系統核心模組實現
1. 行情數據獲取模組
class ITickDataFetcher:
def __init__(self):
self.headers = {
"accept": "application/json",
"token": ITICK_API_KEY
}
def get_realtime_data(self,symbol,region):
"""獲取即時行情數據"""
url = f"https://api.itick.org/stock/quote?symbol={symbol}®ion={region}"
try:
response = requests.get(url, headers=self.headers)
data = response.json()
return self._parse_data(data)
except Exception as e:
print(f"獲取即時數據失敗: {str(e)}")
return None
def get_historical_data(self,symbol, region, freq='1'):
"""獲取歷史數據"""
url = f"https://api.itick.org/stock/kline?symbol={symbol}®ion={region}&kType={freq}"
try:
response = requests.get(url, headers=self.headers)
data = response.json()
df = self._parse_historical_data(data)
except Exception as e:
print(f"獲取{symbol}歷史數據失敗: {str(e)}")
return None
def _parse_data(self, raw_data):
"""解析即時數據"""
# 實現數據解析邏輯
pass
def _parse_historical_data(self, raw_data):
"""解析歷史數據"""
# 實現歷史數據解析邏輯
pass
2. 數據預處理模組
class DataProcessor:
def __init__(self):
self.conn = sqlite3.connect('quant_trading.db')
def clean_data(self, df):
"""數據清洗"""
# 處理缺失值
df = df.dropna()
# 去除異常值
df = df[(df['v'] > 0) & (df['c'] > 0)]
return df
def calculate_technical_indicators(self, df):
"""計算技術指標"""
# 移動平均線
df['ma5'] = df['c'].rolling(5).mean()
df['ma20'] = df['c'].rolling(20).mean()
# RSI
delta = df['c'].diff()
gain = (delta.where(delta > 0, 0)).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
# MACD
exp12 = df['c'].ewm(span=12, adjust=False).mean()
exp26 = df['c'].ewm(span=26, adjust=False).mean()
df['macd'] = exp12 - exp26
df['signal'] = df['macd'].ewm(span=9, adjust=False).mean()
return df
def save_to_db(self, df, table_name):
"""保存數據到數據庫"""
df.to_sql(table_name, self.conn, if_exists='append', index=False)
def load_from_db(self, table_name):
"""從數據庫加載數據"""
return pd.read_sql(f"SELECT * FROM {table_name}", self.conn)
3. 策略引擎
class DualMovingAverageStrategy:
def __init__(self, fast_window=5, slow_window=20):
self.fast_window = fast_window
self.slow_window = slow_window
self.position = 0 # 0:無持倉, 1:多頭, -1:空頭
self.signals = []
def generate_signals(self, df):
"""生成交易信號"""
signals = pd.DataFrame(index=df.index)
signals['price'] = df['c']
signals['fast_ma'] = df['c'].rolling(self.fast_window).mean()
signals['slow_ma'] = df['c'].rolling(self.slow_window).mean()
# 生成交易信號
signals['signal'] = 0
signals['signal'][self.fast_window:] = np.where(
signals['fast_ma'][self.fast_window:] > signals['slow_ma'][self.fast_window:], 1, 0)
# 計算實際的買賣信號
signals['positions'] = signals['signal'].diff()
return signals
def on_tick(self, tick_data):
"""即時行情處理"""
# 實現即時行情處理邏輯
pass
4. 交易執行模組
class FutuTrader:
def __init__(self, host='127.0.0.1', port=11111):
self.quote_ctx = OpenQuoteContext(host=host, port=port)
self.trade_ctx = OpenTradeContext(host=host, port=port)
def get_account_info(self):
"""獲取賬戶信息"""
ret, data = self.trade_ctx.accinfo_query()
if ret == RET_OK:
return data
else:
print(f"獲取賬戶信息失敗: {data}")
return None
def place_order(self, symbol, price, quantity, trd_side, order_type=OrderType.NORMAL):
"""下單"""
ret, data = self.trade_ctx.place_order(
price=price,
qty=quantity,
code=symbol,
trd_side=trd_side,
order_type=order_type,
trd_env=TrdEnv.SIMULATE # 模擬交易,實盤改為TrdEnv.REAL
)
if ret == RET_OK:
print(f"下單成功: {data}")
return data
else:
print(f"下單失敗: {data}")
return None
def close_all_positions(self):
"""平掉所有倉位"""
ret, data = self.trade_ctx.position_list_query()
if ret == RET_OK:
for _, row in data.iterrows():
if row['qty'] > 0:
self.place_order(
row['code'],
row['cost_price'],
row['qty'],
TrdSide.SELL
)
else:
print(f"獲取持倉失敗: {data}")
def subscribe(self, symbols, subtype_list=[SubType.QUOTE]):
"""訂閱行情"""
ret, data = self.quote_ctx.subscribe(symbols, subtype_list)
if ret != RET_OK:
print(f"訂閱行情失敗: {data}")
def unsubscribe(self, symbols, subtype_list=[SubType.QUOTE]):
"""取消訂閱"""
ret, data = self.quote_ctx.unsubscribe(symbols, subtype_list)
if ret != RET_OK:
print(f"取消訂閱失敗: {data}")
def __del__(self):
self.quote_ctx.close()
self.trade_ctx.close()
四、回測系統實現
class BacktestEngine:
def __init__(self, initial_capital=100000):
self.initial_capital = initial_capital
self.results = None
def run_backtest(self, strategy, data):
"""運行回測"""
signals = strategy.generate_signals(data)
# 初始化投資組合
portfolio = pd.DataFrame(index=signals.index)
portfolio['signal'] = signals['positions']
portfolio['price'] = signals['price']
portfolio['cash'] = self.initial_capital
portfolio['shares'] = 0
portfolio['total'] = self.initial_capital
# 執行交易
for i in range(1, len(portfolio)):
# 買入信號
if portfolio['signal'][i] == 1:
shares_to_buy = portfolio['cash'][i-1] // portfolio['price'][i]
portfolio['shares'][i] = portfolio['shares'][i-1] + shares_to_buy
portfolio['cash'][i] = portfolio['cash'][i-1] - shares_to_buy * portfolio['price'][i]
# 賣出信號
elif portfolio['signal'][i] == -1:
portfolio['cash'][i] = portfolio['cash'][i-1] + portfolio['shares'][i-1] * portfolio['price'][i]
portfolio['shares'][i] = 0
# 無信號
else:
portfolio['shares'][i] = portfolio['shares'][i-1]
portfolio['cash'][i] = portfolio['cash'][i-1]
# 更新總資產
portfolio['total'][i] = portfolio['cash'][i] + portfolio['shares'][i] * portfolio['price'][i]
self.results = portfolio
return portfolio
def analyze_results(self):
"""分析回測結果"""
if self.results is None:
print("請先運行回測")
return None
returns = self.results['total'].pct_change()
cumulative_returns = (returns + 1).cumprod()
# 計算年化收益率
total_days = len(self.results)
annualized_return = (cumulative_returns.iloc[-1] ** (252/total_days)) - 1
# 計算最大回撤
max_drawdown = (self.results['total'].cummax() - self.results['total']).max()
# 計算夏普比率
sharpe_ratio = returns.mean() / returns.std() * np.sqrt(252)
# 可視化
plt.figure(figsize=(12, 6))
plt.plot(cumulative_returns)
plt.title("Cumulative Returns")
plt.xlabel("Date")
plt.ylabel("Returns")
plt.grid(True)
plt.show()
return {
'annualized_return': annualized_return,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe_ratio,
'final_value': self.results['total'].iloc[-1]
}
五、總結
本文詳細介紹了如何利用iTick港美股即時行情API和富途牛牛交易接口構建個人量化交易系統。系統包含以下核心組件:
- 行情數據獲取模組:從iTick API獲取即時和歷史數據
- 數據預處理模組:清洗數據並計算技術指標
- 策略引擎:實現雙均線交易策略
- 交易執行模組:通過富途OpenAPI執行交易
- 回測系統:評估策略歷史表現


