Web应用防火墙的接口调用是向WAF API的服务端地址发送HTTP GET请求,并按照接口说明在请求中加入相应请求参数,调用后系统会返回处理结果。 以下Python示例代码演示了如何添加公共参数和接口请求参数、如何用请求参数构造规范化请求字符串、如何构造StringToSign字符串,以及如何获得OpenAPI服务端地址,最终以Get方式发送HTTP请求获取相应的响应结果。 下载Python示例代码
说明 如果您需要使用以下示例,请替换示例中的公共参数及接口请求参数信息。 定义公共参数
#! /usr/bin/env python
import hashlib import urllib import requests import hmac import random import datetime import sys
class OpenAPI(object): def init(self, signature_version='1.0', api_url=None, ak=None, sk=None, api_version=None): assert api_url is not None assert ak is not None assert sk is not None assert api_version is not None
self.signature_once = 0
self.signature_method = 'HMAC-SHA1'
self.signature_version = signature_version
self.api_version = api_version
self.format = 'json'
self.signature_method = 'HMAC-SHA1'
self.api_url = api_url
self.access_key = ak
self.access_secret = sk
def __gen_common_params(self, req_type, api_version, access_key, access_secret, http_params):
while 1:
rand_int = random.randint(10, 999999999)
if rand_int != self.signature_once:
self.signature_once = rand_int
break
# 当前步骤中是否含有AccessKey参数
if access_key == None:
return None
http_params.append(('AccessKeyId', access_key))
http_params.append(('Format', self.format))
http_params.append(('Version', api_version))
timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
http_params.append(('Timestamp', timestamp))
http_params.append(('SignatureMethod', self.signature_method))
http_params.append(('SignatureVersion', self.signature_version))
http_params.append(('SignatureNonce', str(self.signature_once)))
# 签名
http_params = self.sign(req_type, http_params, access_secret)
return urllib.urlencode(http_params)
def get(self, http_params=[], host=None, execute=True):
data = self.__gen_common_params('GET', self.api_version, self.access_key, self.access_secret, http_params)
api_url = self.api_url
if data == None:
url = "%s" % (api_url)
else:
url = "%s/?" % api_url + data
print ("URL: %s"%url)
if execute is False:
return url
ret = {}
try:
if host is not None:
response = requests.get(url,headers={'Host':host}, verify=False)
else:
response = requests.get(url, verify=False)
ret['code'] = response.status_code
ret['data'] = response.text
except Exception as e:
ret['data'] = str(e)
return ret
def __get_data(self, http_params):
params = self.__gen_common_params('POST', self.api_version, self.access_key, self.access_secret, http_params)
if params == []:
data = None
else:
data = params.replace("+", "%20")
data = data.replace("*", "%2A")
data = data.replace("%7E", "~")
return data
def post(self, http_params=[], out_fd=sys.stdout):
data = self.__get_data(self.api_version, self.access_key, self.access_secret, http_params)
api_url = self.api_url
out_fd.write(u"[%s] --> (POST):%s\n%s\n" % (datetime.datetime.now(), api_url, data))
ret = requests.post(api_url, data, verify=False)
print (ret.text)
return ret
def sign(self, http_method, http_params, secret):
list_params = sorted(http_params, key=lambda d: d[0])
#print list_params
url_encode_str = urllib.urlencode(list_params)
#print url_encode_str
url_encode_str = url_encode_str.replace("+", "%20")
url_encode_str = url_encode_str.replace("*", "%2A")
url_encode_str = url_encode_str.replace("%7E", "~")
string_to_sign = http_method + "&%2F&" + urllib.quote(url_encode_str)
#print string_to_sign
hmac_key = str(secret + "&")
sign_value = str(hmac.new(hmac_key, string_to_sign, hashlib.sha1).digest().encode('base64').rstrip())
http_params.append(('Signature', sign_value))
return http_params
生成接口调用请求
说明 以下代码示例以调用ModifyWafSwitch接口开启Web应用攻击防护功能为例。
from open_api import OpenAPI
class Waf(OpenAPI): def init(self, api_url, ak, sk, api_version, instance_id, region):
super(Waf, self).__init__(api_url=api_url, ak=ak, sk=sk, api_version=api_version)
self.instance_id = instance_id
self.region = region
def ModifyWafSwitch(self,domain, instance_id=None, region='cn', service_on=1, execute=True):
if instance_id is None:
instance_id = self.instance_id
if region is None:
region = self.region
params = [
('Action', 'ModifyWafSwitch'),
('InstanceId', instance_id),
('Domain', domain),
('Region',region),
('ServiceOn', service_on)
]
print (params)
return self.get(http_params=params,execute=execute)
if name == "main": api_url = "https://wafopenapi.cn-hangzhou.aliyuncs.com" # 填写您账号的AccessKeyId信息 ak = "" # 填写您账号的AccessKeyScecret信息 sk = "" # 填写您WAF的实例ID,您可以通过调用GetPayInfo接口获取InstanceID信息 instance_id = "" # 填写您WAF实例所在地域信息 region = "" api_version = "2018-01-17"
t = Waf(api_url=api_url, ak=ak, sk=sk, api_version=api_version, instance_id=instance_id, region=region)
print (t.ModifyWafSwitch(domain="", service_on=1))
发送HTTP GET请求 通过上述代码得到HTTP请求,向WAF API的服务端地址发送该HTTP GET请求。 请求示例
https://wafopenapi.cn-hangzhou.aliyuncs.com/?Action=ModifyWafSwitch&Domain=www.aliyun.com&ServiceOn=1&Region=cn&InstanceId=waf_elasticity-cn-0xldbqtm005&TimeStamp=2018-08-23T12:46:24Z&Format=JSON&AccessKeyId=testid&SignatureMethod=HMAC-SHA1&SignatureNonce=3ee8c1b8-83d3-44af-a94f-4e0ad82fd6cf&Version=2018-01-17&SignatureVersion=1.0&Signature=CT9X0VtwR86fNWSnsc6v8YGOjuE%3D 获得响应结果 最终收到WAF API服务端返回的响应结果。 返回示例
{ "RequestId":"D7861F61-5B61-46CE-A47C-6B19160D5EB0", "Result":{ "Status":2, "WafTaskId":"aliyun.waf.20180712214032277.qmxI9a" } }
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。