实战教程:使用API获取日本股市前100支股票数据
引言
日本作为全球第三大股票市场,拥有东京证券交易所、大阪证券交易所等重要平台,吸引了众多全球投资者的目光。对于开发者而言,如何快速获取日本股票数据并进行有效分析是一个重要的技术挑战。本文将基于StockTV API,手把手教你如何获取日本股市前100支股票数据,并完成一个完整的实战训练项目。
日本股市以其成熟的投资者结构、高外资参与度和独特的涨跌停制度而著称,了解如何通过API接口获取这些数据,对于构建投资分析系统、量化交易策略或市场监控工具都至关重要。
一、环境准备与API配置
1.1 获取API访问密钥
要使用StockTV API,首先需要获取API密钥。所有请求都需要在URL参数中包含有效的API Key进行身份认证。
import requests
import pandas as pd
# 基础配置
API_KEY = "您的API密钥" # 请替换为实际密钥
BASE_URL = "https://api.stocktv.top"
JAPAN_COUNTRY_ID = 35 # 日本市场国家ID
# 设置请求头
headers = {
"Content-Type": "application/json"
}
1.2 日本市场特色参数
日本股市有一些特定参数需要特别注意:
- 交易时间:东京时间上午9:00-11:30,下午12:30-15:00
- 主要指数:日经225指数(Nikkei 225)、东证股价指数(TOPIX)
- 货币单位:日元(JPY)
- 涨跌幅限制:根据股价不同分为多个档次
二、核心API接口详解
2.1 获取日本股票列表接口
StockTV提供了/stock/stocks接口用于获取日本股票列表,支持分页查询。
def get_japan_stocks(page=1, page_size=100):
"""获取日本股票列表
Args:
page: 页码
page_size: 每页数量
Returns:
股票列表数据
"""
url = f"{BASE_URL}/stock/stocks"
params = {
"countryId": JAPAN_COUNTRY_ID,
"page": page,
"pageSize": page_size,
"key": API_KEY
}
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"API请求失败: {e}")
return None
# 获取第一页的100支股票
stocks_data = get_japan_stocks(1, 100)
接口返回的核心字段说明:
symbol: 股票代码(通常为4位数字)name: 公司名称last: 最新价chg: 涨跌额chgPct: 涨跌幅volume: 成交量high: 当日最高价low: 当日最低价
2.2 响应数据处理
完整的响应处理代码:
def process_stocks_data(stocks_data):
"""处理股票数据"""
if stocks_data and stocks_data["code"] == 200:
records = stocks_data["data"]["records"]
print(f"成功获取到{len(records)}支日本股票数据")
# 创建DataFrame便于分析
df = pd.DataFrame(records)
# 基本数据统计
print("\n=== 日本前100支股票概览 ===")
print(f"平均价格: {df['last'].mean():.2f} JPY")
print(f"最高价格: {df['last'].max():.2f} JPY")
print(f"最低价格: {df['last'].min():.2f} JPY")
print(f"平均涨跌幅: {df['chgPct'].mean():.2f}%")
# 显示前10支股票
print("\n=== 前10支股票详情 ===")
for i, stock in enumerate(records[:10]):
print(f"{i+1}. {stock['symbol']} - {stock['name']}: {stock['last']} JPY ({stock['chgPct']}%)")
return df
else:
print("获取股票数据失败")
return None
# 执行数据获取和处理
df = process_stocks_data(stocks_data)
三、完整实战项目:日本股票监控系统
下面我们构建一个完整的日本股票监控系统,可以实时获取前100支股票数据并进行基本分析。
import requests
import pandas as pd
import time
from datetime import datetime
import matplotlib.pyplot as plt
class JapanStockMonitor:
"""日本股票监控系统"""
def __init__(self, api_key):
self.api_key = api_key
self.base_url = "https://api.stocktv.top"
self.japan_country_id = 35
def get_stock_list(self, page_size=100):
"""获取日本股票列表"""
url = f"{self.base_url}/stock/stocks"
params = {
"countryId": self.japan_country_id,
"pageSize": page_size,
"page": 1,
"key": self.api_key
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
if data["code"] == 200:
return data["data"]["records"]
return []
def get_stock_detail(self, symbol):
"""获取单个股票详细信息"""
url = f"{self.base_url}/stock/queryStocks"
params = {
"symbol": symbol,
"countryId": self.japan_country_id,
"key": self.api_key
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
if data["code"] == 200 and len(data["data"]) > 0:
return data["data"][0]
return None
def analyze_top_stocks(self, top_n=100):
"""分析前N支股票"""
print("开始获取日本股票数据...")
stocks = self.get_stock_list(top_n)
if not stocks:
print("未能获取到股票数据")
return None
print(f"成功获取到{len(stocks)}支日本股票")
# 转换为DataFrame
df = pd.DataFrame(stocks)
# 数据分析
self.perform_analysis(df)
return df
def perform_analysis(self, df):
"""执行数据分析"""
# 1. 基本统计
print("\n" + "="*50)
print("日本前100支股票数据分析报告")
print("="*50)
print(f"1. 价格统计:")
print(f" - 平均价格: {df['last'].mean():.2f} JPY")
print(f" - 价格中位数: {df['last'].median():.2f} JPY")
print(f" - 价格标准差: {df['last'].std():.2f} JPY")
print(f" - 最高价股票: {df.loc[df['last'].idxmax()]['name']} ({df['last'].max():.2f} JPY)")
print(f" - 最低价股票: {df.loc[df['last'].idxmin()]['name']} ({df['last'].min():.2f} JPY)")
print(f"\n2. 涨跌幅统计:")
print(f" - 平均涨跌幅: {df['chgPct'].mean():.2f}%")
print(f" - 上涨股票数量: {len(df[df['chgPct'] > 0])}支")
print(f" - 下跌股票数量: {len(df[df['chgPct'] < 0])}支")
print(f" - 涨幅最大: {df.loc[df['chgPct'].idxmax()]['name']} ({df['chgPct'].max():.2f}%)")
print(f" - 跌幅最大: {df.loc[df['chgPct'].idxmin()]['name']} ({df['chgPct'].min():.2f}%)")
print(f"\n3. 成交量分析:")
print(f" - 总成交量: {df['volume'].sum():,}股")
print(f" - 平均成交量: {df['volume'].mean():,.0f}股")
print(f" - 最活跃股票: {df.loc[df['volume'].idxmax()]['name']} ({df['volume'].max():,}股)")
# 4. 行业分布(如果有行业信息)
if 'industry' in df.columns:
industry_dist = df['industry'].value_counts()
print(f"\n4. 行业分布:")
for industry, count in industry_dist.head().items():
print(f" - {industry}: {count}支股票")
def export_to_csv(self, df, filename=None):
"""导出数据到CSV文件"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"japan_top100_stocks_{timestamp}.csv"
df.to_csv(filename, index=False, encoding='utf-8-sig')
print(f"\n数据已导出到: {filename}")
def create_visualization(self, df):
"""创建基本可视化"""
plt.figure(figsize=(15, 10))
# 1. 价格分布直方图
plt.subplot(2, 2, 1)
plt.hist(df['last'], bins=20, alpha=0.7, color='skyblue')
plt.title('日本前100支股票价格分布')
plt.xlabel('价格(JPY)')
plt.ylabel('数量')
# 2. 涨跌幅分布
plt.subplot(2, 2, 2)
plt.hist(df['chgPct'], bins=20, alpha=0.7, color='lightcoral')
plt.title('涨跌幅分布')
plt.xlabel('涨跌幅(%)')
plt.ylabel('数量')
# 3. 成交量TOP10
plt.subplot(2, 2, 3)
top_volume = df.nlargest(10, 'volume')
plt.bar(range(len(top_volume)), top_volume['volume'], color='lightgreen')
plt.title('成交量TOP10')
plt.xlabel('股票')
plt.ylabel('成交量')
plt.xticks(range(len(top_volume)), top_volume['symbol'], rotation=45)
# 4. 涨跌幅TOP10
plt.subplot(2, 2, 4)
top_gainer = df.nlargest(10, 'chgPct')
plt.bar(range(len(top_gainer)), top_gainer['chgPct'], color='gold')
plt.title('涨幅TOP10')
plt.xlabel('股票')
plt.ylabel('涨跌幅(%)')
plt.xticks(range(len(top_gainer)), top_gainer['symbol'], rotation=45)
plt.tight_layout()
plt.savefig('japan_stocks_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
# 使用示例
if __name__ == "__main__":
# 初始化监控系统
monitor = JapanStockMonitor(API_KEY)
# 获取并分析前100支股票
df = monitor.analyze_top_stocks(100)
if df is not None:
# 导出数据
monitor.export_to_csv(df)
# 创建可视化图表
monitor.create_visualization(df)
# 显示详细信息
print("\n" + "="*50)
print("详细信息预览:")
print("="*50)
print(df[['symbol', 'name', 'last', 'chgPct', 'volume']].head(10))
四、错误处理与最佳实践
在实际使用API时,完善的错误处理机制至关重要。
import requests
from requests.exceptions import RequestException
import time
def safe_api_call(url, params, max_retries=3):
"""带重试机制的API调用"""
for attempt in range(max_retries):
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as err:
print(f"HTTP错误: {err.response.status_code}")
if attempt == max_retries - 1:
return {
"code": err.response.status_code, "message": "HTTP错误"}
except requests.exceptions.ConnectionError:
print("网络连接失败")
if attempt == max_retries - 1:
return {
"code": -1, "message": "网络连接失败"}
except requests.exceptions.Timeout:
print("请求超时")
if attempt == max_retries - 1:
return {
"code": -2, "message": "请求超时"}
# 重试前等待
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 指数退避
print(f"{wait_time}秒后重试...")
time.sleep(wait_time)
return {
"code": -3, "message": "最大重试次数已用完"}
# 改进版的股票获取函数
def robust_get_stocks(api_key, max_retries=3):
"""健壮的股票获取函数"""
url = "https://api.stocktv.top/stock/stocks"
params = {
"countryId": 35,
"pageSize": 100,
"key": api_key
}
return safe_api_call(url, params, max_retries)
五、扩展功能:实时数据监控
除了获取静态数据,StockTV API还支持WebSocket实时数据订阅,可以构建实时监控系统。
import websocket
import json
import threading
class JapanStockRealtimeMonitor:
"""日本股票实时监控"""
def __init__(self, api_key):
self.api_key = api_key
self.ws_url = "wss://ws-api.stocktv.top/connect"
self.ws = None
def on_message(self, ws, message):
"""处理实时消息"""
data = json.loads(message)
print(f"实时行情: {data}")
# 这里可以添加实时数据处理逻辑
# 如价格预警、交易信号检测等
def on_error(self, ws, error):
print(f"WebSocket错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("WebSocket连接关闭")
def on_open(self, ws):
print("WebSocket连接已建立")
# 订阅日本股票
subscribe_msg = {
"action": "subscribe",
"countryId": 35,
"symbols": ["7203", "9984", "9430"] # 丰田、软银、日经指数
}
ws.send(json.dumps(subscribe_msg))
def start_monitor(self):
"""启动实时监控"""
websocket.enableTrace(True)
self.ws = websocket.WebSocketApp(
f"{self.ws_url}?key={self.api_key}",
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# 在后台线程中运行
wst = threading.Thread(target=self.ws.run_forever)
wst.daemon = True
wst.start()
# 使用示例
# monitor = JapanStockRealtimeMonitor(API_KEY)
# monitor.start_monitor()
六、总结与进一步学习建议
通过本文的实战教程,我们学习了如何使用StockTV API获取日本前100支股票数据,并构建了一个完整的股票监控分析系统。关键要点总结:
- API基础:StockTV提供了完善的RESTful接口,支持日本股票数据获取
- 数据完整性:接口返回的数据包含价格、涨跌幅、成交量等关键信息
- 扩展性:基于获取的数据可以进行进一步的分析、可视化和监控
进一步学习建议:
- 探索更多API功能,如K线数据、历史数据查询
- 结合技术指标进行量化分析
- 构建自动化交易策略回测系统
- 集成其他市场数据进行比较分析
日本股市作为全球重要市场,通过API接口获取和分析数据是金融科技开发的重要技能。希望本教程能够帮助你快速上手日本股票数据获取,为更复杂的金融应用开发奠定基础。
提示:本文示例代码仅供参考,实际使用时请确保遵守API的使用条款和频率限制。市场有风险,投资需谨慎。