一、核心功能模块
核心功能模块基于微服务架构拆分,各模块独立部署、通过接口通信。以下为关键模块的技术实现细节及代码示例,聚焦底层逻辑。
1.1 商品管理模块
负责商品 CRUD、多语言适配、库存同步、合规校验,基于 Spring Cloud Alibaba 实现,对接速卖通开放平台 API,支持多币种、多语言商品数据渲染。下面的示例代码为便于说明,改用 Python(Flask + Redis)演示商品详情缓存与库存加锁更新的核心逻辑:
from flask import Flask, request, jsonify
from redis import Redis
import time
import threading
from functools import wraps
# Flask application object that the route decorators below register against.
app = Flask(__name__)
# Shared Redis connection (localhost:6379, database 0); used both for the
# product-detail cache and as the backing store of the distributed lock.
redis_client = Redis(host='localhost', port=6379, db=0)
# Simple distributed lock implementation
class SimpleDistributedLock:
    """Minimal Redis-backed mutual-exclusion lock.

    A lock is a Redis key created with ``SET key token NX EX timeout``. Each
    successful ``acquire`` stores a fresh random token as the value, and
    ``release`` removes the key only while it still carries that token, so a
    caller whose lock already expired cannot delete a lock that has since
    been taken by somebody else.

    The lock is not reentrant, and it must be released by the same thread
    that acquired it (tokens are remembered per thread).
    """

    # Compare-and-delete executed atomically on the Redis server.
    _RELEASE_SCRIPT = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "else return 0 end"
    )

    def __init__(self, redis_client):
        """Bind the lock helper to *redis_client* (a redis-py style client)."""
        self.redis = redis_client
        # Per-thread map of key -> token, so concurrent request threads that
        # share this instance never overwrite each other's tokens.
        self._held = threading.local()

    def _tokens(self):
        """Return the calling thread's key -> token map, creating it lazily."""
        tokens = getattr(self._held, 'tokens', None)
        if tokens is None:
            tokens = self._held.tokens = {}
        return tokens

    def acquire(self, key, timeout=30):
        """Try to take the lock named *key*.

        Polls every 10 ms for at most *timeout* seconds. *timeout* is also
        used as the expiry of the Redis key, so a crashed holder cannot block
        others forever.

        Returns:
            True if the lock was obtained, False if the wait timed out.
        """
        import uuid  # function-scope import: keeps the file's import block untouched

        token = uuid.uuid4().hex
        end_time = time.time() + timeout
        while time.time() < end_time:
            if self.redis.set(key, token, nx=True, ex=timeout):
                self._tokens()[key] = token
                return True
            time.sleep(0.01)
        return False

    def release(self, key):
        """Release *key* if, and only if, the calling thread still owns it.

        Does nothing when this thread holds no token for *key* or when the
        key in Redis now belongs to another holder.
        """
        token = self._tokens().pop(key, None)
        if token is None:
            return
        self.redis.eval(self._RELEASE_SCRIPT, 1, key, token)
# Module-wide lock helper bound to the shared Redis connection; the stock
# update endpoint uses it to take a per-product lock.
lock_manager = SimpleDistributedLock(redis_client)
def get_product_detail(product_id, language, currency):
    """Return a stubbed product record in place of a real database query.

    The record echoes *product_id*, *language* and *currency*, derives the
    name from the id, and always carries a fixed price of 100.0.
    """
    # Simulated database lookup.
    record = dict(
        id=product_id,
        name=f'Product {product_id}',
        language=language,
        currency=currency,
        price=100.0,
    )
    return record
def update_stock(stock_update_data):
    """Stub for the stock update; reads the request fields and reports success.

    *stock_update_data* must offer ``.get``; ``productId`` and ``quantity``
    are looked up but not used yet. Always returns True.
    """
    # Simulated stock update: pull out the fields real logic would need.
    target_product, change = (
        stock_update_data.get('productId'),
        stock_update_data.get('quantity'),
    )
    # The real business logic would go here.
    return True
@app.route('/api/v1/product/detail/<product_id>', methods=['GET'])
def product_detail(product_id):
    """Serve product details, cached in Redis for one hour.

    Query parameters ``language`` (default ``zh-CN``) and ``currency``
    (default ``CNY``) are part of the cache key, so each combination is
    cached separately. The response is a JSON envelope with ``code``,
    ``data`` and ``message``.
    """
    import json  # function-scope import, as the original cache-hit branch did

    language = request.args.get('language', 'zh-CN')
    currency = request.args.get('currency', 'CNY')
    cache_key = f"product:detail:{product_id}:{language}:{currency}"

    cached_data = redis_client.get(cache_key)
    if cached_data:
        try:
            cached_product = json.loads(cached_data)
        except ValueError:
            # Unreadable cache entry: ignore it and rebuild below instead of
            # failing every request until the key expires.
            cached_product = None
        if cached_product is not None:
            return jsonify({
                'code': 200, 'data': cached_product, 'message': 'success'})

    product_data = get_product_detail(product_id, language, currency)
    # Serialize with json.dumps: rewriting quotes in str(dict) is not JSON
    # (it breaks on apostrophes inside values and on None/True/False).
    redis_client.setex(cache_key, 3600, json.dumps(product_data))
    return jsonify({
        'code': 200, 'data': product_data, 'message': 'success'})
@app.route('/api/v1/product/stock/update', methods=['POST'])
def stock_update():
    """Apply a stock update for one product while holding a per-product lock.

    Expects a JSON object body containing ``productId``; the whole body is
    handed to ``update_stock``. Every response is a JSON envelope whose
    ``code`` field is:

    * 400 - the body is not a JSON object or ``productId`` is missing
    * 500 - the lock was not obtained within 30 seconds, or the update raised
    * 200 - ``data`` carries the return value of ``update_stock``
    """
    # silent=True yields None instead of raising on a missing/invalid body.
    stock_data = request.get_json(silent=True)
    if not isinstance(stock_data, dict) or stock_data.get('productId') is None:
        return jsonify({
            'code': 400, 'data': False, 'message': '请求参数无效:缺少productId'})

    product_id = stock_data['productId']
    lock_key = f"product:stock:lock:{product_id}"
    # NOTE: the 30 s value is both the maximum wait and the lock's expiry, so
    # an update running longer than that loses its exclusivity.
    if not lock_manager.acquire(lock_key, timeout=30):
        return jsonify({
            'code': 500, 'data': False, 'message': '库存更新繁忙,请稍后重试'})
    try:
        result = update_stock(stock_data)
        return jsonify({
            'code': 200, 'data': result, 'message': 'success'})
    except Exception:
        # Request boundary: keep the generic reply for the client but record
        # the traceback so the failure is diagnosable.
        app.logger.exception('stock update failed for product %s', product_id)
        return jsonify({
            'code': 500, 'data': False, 'message': '库存更新失败'})
    finally:
        lock_manager.release(lock_key)
if __name__ == '__main__':
    import os

    # The interactive debugger enabled by debug=True lets anyone who can reach
    # the server execute code, so it is opt-in through FLASK_DEBUG rather than
    # hard-coded on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_enabled)
1.2 订单管理模块
负责订单创建、状态流转、跨境物流对接、异常处理,基于事件驱动架构(EventBus)实现订单状态同步,采用分库分表(Sharding-JDBC)处理海量订单数据,对接菜鸟物流API实现物流轨迹同步,支持分布式事务(Seata)保证订单与库存、支付的数据一致性。
from datetime import datetime
import uuid
from enum import Enum
class OrderStatus(Enum):
    """Order lifecycle states; the integer values are what gets persisted."""

    PENDING_PAYMENT = 1   # assigned when an order record is created
    PENDING_SHIPMENT = 2  # assigned once payment success is handled
    SHIPPED = 3
    COMPLETED = 4


class OrderService:
    """Coordinates order creation and the payment-success follow-up.

    All collaborators are injected:

    * ``order_mapper`` - ``insert(order)`` and ``update_order_status(id, status)``
    * ``product_client`` - ``update_stock(dto)`` returning a dict with
      ``success`` and ``data``
    * ``payment_client`` - ``generate_payment_url(order_id, amount)`` returning
      a dict with ``success`` and ``data``
    * ``event_publisher`` - ``publish_event(name, payload)``
    * ``logistics_client`` - ``create_logistics_order(dto)``; optional, only
      needed by ``handle_order_payment_success``
    """

    def __init__(self, order_mapper, product_client, payment_client,
                 event_publisher, logistics_client=None):
        self.order_mapper = order_mapper
        self.product_client = product_client
        self.payment_client = payment_client
        self.event_publisher = event_publisher
        # Previously never assigned, which made the payment-success handler
        # fail with AttributeError; optional so existing 4-argument callers work.
        self.logistics_client = logistics_client

    def create_order(self, order_create_dto):
        """Reserve stock, persist the order, announce it and return payment data.

        Args:
            order_create_dto: mapping with ``productId``, ``quantity``,
                ``userId`` and ``amount``.

        Returns:
            The ``data`` field of the payment client's reply.

        Raises:
            KeyError: a required field is missing (raised before any side effect).
            RuntimeError: stock could not be deducted, or no payment URL
                could be generated.
        """
        # Read every required field first so a malformed request fails before
        # stock has been touched.
        product_id = order_create_dto['productId']
        quantity = order_create_dto['quantity']
        user_id = order_create_dto['userId']
        amount = order_create_dto['amount']

        # 1. Generate order ID
        order_id = str(uuid.uuid4())

        # 2. Deduct inventory (negative delta).
        # NOTE(review): the Flask stub `update_stock` elsewhere in this document
        # reads 'quantity' and replies with 'code'/'data' rather than
        # 'success'/'data' - confirm product_client adapts both.
        stock_result = self.product_client.update_stock(
            {'productId': product_id, 'stockNum': -quantity}
        )
        if not stock_result.get('success') or not stock_result.get('data'):
            raise RuntimeError("库存不足,无法创建订单")

        # 3. Create order record
        order_do = {
            'orderId': order_id,
            'userId': user_id,
            'productId': product_id,
            'quantity': quantity,
            'orderStatus': OrderStatus.PENDING_PAYMENT.value,
            'createTime': datetime.now(),
        }
        try:
            self.order_mapper.insert(order_do)
        except Exception:
            # Without an order record the deducted stock would be lost, so
            # hand it back (assumes a positive stockNum adds stock - the
            # mirror of the deduction above) and let the error propagate.
            self.product_client.update_stock(
                {'productId': product_id, 'stockNum': quantity}
            )
            raise

        # 4. Publish order creation event
        self.event_publisher.publish_event(
            'order_created', {'orderId': order_id, 'userId': user_id}
        )

        # 5. Generate payment URL. On failure the order stays persisted in
        # PENDING_PAYMENT; only the URL generation is reported as failed.
        payment_result = self.payment_client.generate_payment_url(order_id, amount)
        if not payment_result.get('success'):
            raise RuntimeError("支付链接生成失败")
        return payment_result.get('data')

    def handle_order_payment_success(self, event):
        """Move a paid order to pending shipment and create its logistics order.

        Args:
            event: mapping with ``orderId`` and ``receiverInfo``.

        Raises:
            RuntimeError: no ``logistics_client`` was supplied to the service.
            KeyError: the event lacks a required field.
        """
        # Check preconditions before changing the order, so a failure here
        # cannot leave the status updated without a logistics order.
        if self.logistics_client is None:
            raise RuntimeError("logistics_client is not configured")
        order_id = event['orderId']
        receiver_info = event['receiverInfo']

        # Update order status to pending shipment
        self.order_mapper.update_order_status(
            order_id, OrderStatus.PENDING_SHIPMENT.value
        )
        # Create logistics order
        self.logistics_client.create_logistics_order(
            {'orderId': order_id, 'receiverInfo': receiver_info}
        )
1.3 开放平台API模块
负责第三方开发者接入、接口鉴权、限流、签名验证,采用 AppKey + AppSecret + AccessToken 三层认证体系,基于 OAuth 2.0 实现授权,接口请求采用 HTTPS 协议,签名机制基于 MD5 摘要(哈希算法,并非加密)防篡改。
API 网关:o0b.cn/alan
以下为 Python 实现的 API 调用核心代码:
import requests
import json
import time
import hashlib
import logging
# Configure the root logger for this example: INFO level, with timestamp and
# level name in front of every message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
class AliExpressAPI:
    """Small client that signs requests with MD5 and POSTs them as JSON."""

    def __init__(self, app_key, app_secret, access_token):
        """Store the credentials and the fixed request settings."""
        self.app_key = app_key
        self.app_secret = app_secret
        self.access_token = access_token
        # NOTE(review): this is a plain-http URL although the accompanying text
        # states that requests use HTTPS; the access token travels in the
        # request body - confirm whether the endpoint can be reached over https.
        self.base_url = "http://o0b.cn/alan"
        self.format = "json"
        self.api_version = "2.0"

    def _generate_sign(self, params):
        """Return the request signature for *params*.

        The parameters are sorted by name, joined as ``k=v`` pairs with ``&``,
        the app secret is appended, and the MD5 hex digest (lowercase) of that
        string is returned. An existing ``sign`` entry is left out so a stale
        signature can never become part of the signed text.
        """
        signable = sorted(
            ((name, value) for name, value in params.items() if name != "sign"),
            key=lambda item: item[0],
        )
        sign_str = "&".join(f"{name}={value}" for name, value in signable)
        sign_str += self.app_secret
        # hexdigest() is already lowercase.
        return hashlib.md5(sign_str.encode()).hexdigest()

    def _get_common_params(self, method):
        """Return the parameters sent with every call to *method*.

        ``timestamp`` is the current time in milliseconds, as a string.
        """
        return {
            "app_key": self.app_key,
            "access_token": self.access_token,
            "timestamp": str(int(time.time() * 1000)),
            "format": self.format,
            "method": method,
            "v": self.api_version,
        }

    def call_api(self, method, biz_params=None):
        """Call *method* and return the decoded JSON response.

        Args:
            method: API method name.
            biz_params: optional mapping of business parameters; on a name
                clash these override the common parameters.

        Returns:
            The parsed response, or ``{"code": 500, "message": ...}`` when the
            request fails, the server answers with an HTTP error status, or
            the body is not valid JSON.
        """
        all_params = {**self._get_common_params(method), **(biz_params or {})}
        all_params["sign"] = self._generate_sign(all_params)
        try:
            response = requests.post(
                url=self.base_url,
                data=json.dumps(all_params),
                headers={"Content-Type": "application/json;charset=utf-8"},
                timeout=10,
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # json() error is not a RequestException.
            logging.error("API调用失败: %s", e)
            return {"code": 500, "message": f"API调用异常: {str(e)}"}
if __name__ == "__main__":
    # Placeholder credentials; substitute real values before running.
    APP_KEY, APP_SECRET, ACCESS_TOKEN = (
        "your_app_key",
        "your_app_secret",
        "your_access_token",
    )
    api = AliExpressAPI(APP_KEY, APP_SECRET, ACCESS_TOKEN)

    # Look up one product by id, asking for English text and USD prices.
    method = "aliexpress.product.redefining.findproductbyid"
    biz_params = dict(
        product_id="1005005808863025",
        language="en",
        currency="USD",
    )
    result = api.call_api(method, biz_params)

    # Pretty-print the response, keeping non-ASCII characters readable.
    logging.info(f"商品详情查询结果: {json.dumps(result, indent=2, ensure_ascii=False)}")
1.4 物流管理模块
负责跨境物流轨迹跟踪、物流模板管理、自动化仓配对接,集成菜鸟物流系统,通过 API 实时同步跨境清关与轨迹信息,支持自动化物流园区的传送带矩阵、自动螺旋机等设备的调度,以下为物流轨迹查询的核心代码。
import requests
import hashlib
import time
from urllib.parse import urlencode
class LogisticsService:
    """Client for querying shipment tracking events from the logistics API."""

    def __init__(self, cainiao_api_url, cainiao_app_key, cainiao_app_secret):
        """Store the API base URL and the credentials used for signing."""
        self.cainiao_api_url = cainiao_api_url
        self.cainiao_app_key = cainiao_app_key
        self.cainiao_app_secret = cainiao_app_secret

    def get_logistics_track(self, logistics_no, carrier_code):
        """Fetch the tracking events of one shipment.

        Sends a signed, form-encoded POST to ``<api_url>/logistics/track/get``.

        Returns:
            ``{"success": True, "data": [...]}`` with one dict per event
            (``action``, ``occur_time``, ``location``) on success, otherwise
            ``{"success": False, "message": ...}``. This method does not raise
            for request or parsing errors.
        """
        import logging  # function-scope import; this section has no logging import

        params = {
            "logistics_no": logistics_no,
            "carrier_code": carrier_code,
            "app_key": self.cainiao_app_key,
            "timestamp": str(int(time.time() * 1000)),
        }
        params["sign"] = self._generate_cainiao_sign(params)
        try:
            response = requests.post(
                f"{self.cainiao_api_url}/logistics/track/get",
                data=params,
                timeout=10,
            )
            if response.status_code != 200:
                return {"success": False, "message": "物流轨迹查询失败"}
            return self._parse_track_body(response.json())
        except Exception as e:
            # Deliberately broad: callers rely on getting a result dict back.
            # The traceback is logged instead of being printed to stdout.
            logging.getLogger(__name__).exception("物流轨迹查询异常: %s", e)
            return {"success": False, "message": "物流轨迹查询异常"}

    @staticmethod
    def _parse_track_body(body):
        """Convert a decoded API response into this service's result dict.

        A ``code`` of ``"0"`` means success; a missing or null ``tracks`` field
        is treated as "no events" rather than as an error.
        """
        # NOTE(review): only the string "0" counts as success - confirm the API
        # never returns the code as an integer.
        if body.get("code") != "0":
            return {"success": False, "message": body.get("message")}
        events = body.get("tracks") or []
        return {
            "success": True,
            "data": [
                {
                    "action": event.get("action"),
                    "occur_time": event.get("occurTime"),
                    "location": event.get("location"),
                }
                for event in events
            ],
        }

    def _generate_cainiao_sign(self, params):
        """Return the uppercase MD5 signature of *params*.

        Keys are sorted, each key is concatenated directly with its value, the
        app secret is appended, and the result is hashed.
        """
        sign_str = "".join(f"{key}{params[key]}" for key in sorted(params))
        sign_str += self.cainiao_app_secret
        return hashlib.md5(sign_str.encode()).hexdigest().upper()
# Example usage
# NOTE(review): these statements are not under an `if __name__ == "__main__":`
# guard, so the HTTP request below is issued whenever this code is executed,
# including on import.
service = LogisticsService(
cainiao_api_url="http://o0b.cn/Wquop1",
cainiao_app_key="your_app_key",
cainiao_app_secret="your_app_secret"
)
# Query the tracking events for one waybill number and carrier code.
result = service.get_logistics_track("SF1234567890", "SF")
print(result)