1.初始化
Windows上需要自己初始化系统网络库,即调用
WSADATA WSAData;
WSAStartup(0x201, &WSAData);
结束使用时再
WSACleanup();
另外还需要初始化线程设置
windows上 evthread_use_windows_threads
linux上 evthread_use_pthreads
综上,使用其它libevent的函数前需要这样:
// One-time setup that has to run before any other libevent function is used.
#if defined(WIN32)
// Windows: initialise the system network library first (pair it with
// WSACleanup() when networking is no longer needed), then enable libevent's
// Windows threading support.
WSADATA WSAData;
WSAStartup(0x201, &WSAData);
evthread_use_windows_threads();
#else
// Other platforms (Linux): enable libevent's pthreads threading support.
evthread_use_pthreads();
#endif
2.Loop
因为event_base_loop在没有event的时候就会结束,所以作为事件循环,需要加一个超时时间很长的timer event作为默认的负载,在真正结束事件循环的时候再把这个event delete掉。
3.Timer
event_new出来的对象是需要自己释放的,而且无法从event_base里再获取所有的还没执行的events,所以需要把event*保存起来,执行后再调用event_free释放(event_del只是把event从event_base中移除,并不会释放内存)。也就是说需要自己做一层封装:callback函数的arg不会直接是目标对象,而是保存了event*的封装对象。
4.多线程
libevent即使提供了多线程初始化函数,它也不能简单地跨线程发消息。一般来说会想当然地在当前线程中对另一个线程中的event_base加入一个超时为0的timer event,这样就会在另一个线程执行那个事件,达到发线程消息的效果。然而如果短时间内连续这样做,这些timer events并不能保证按序执行。
安全的做法是:
- 每个线程创建自己的消息队列,发线程消息就是往队列增加一个消息。当然,这个过程得加锁。
- 创建一对互相连接好的socket,线程的event_base加入event监听EV_READ事件,触发就按序执行消息队列。
- 每发一个线程消息就等于往队列增加一个消息,然后往socket写一个字节(任意内容)。即目标线程因为socket可读而结束等待继而执行消息。
综上,用libevent来作为多线程间的事件循环和互发消息是还有工作要做的。
5.HTTP client
evhttp有挺多的坑,如回调的时机不明确、回调的参数有可能为空,connection和request的生命周期奇异等。这些在文档中没有过多的说明,只能吃一堑长一智。用C语言来写的库虽然效率高,但确实不怎么好维护。
下面是自己的实验代码,不想过多解释,请自行参考。
// HttpTransaction.h
struct event_base;
struct evhttp_connection;
struct evhttp_request;
struct event;
// Runs one HTTP request (GET or POST) on a libevent event_base and reports
// the outcome to a Delegate.
//
// The HttpRequest and the Delegate are not owned: the class only stores the
// pointers passed to set_request() / set_delegate(). The libevent connection,
// request and timer handles are released by Cancel(), which the destructor
// also calls.
class HttpTransaction {
 public:
  explicit HttpTransaction(event_base* base);
  // Virtual because the class exposes virtual methods and can therefore be
  // deleted through a pointer to a base type.
  virtual ~HttpTransaction();

  // Receives the notifications of a transaction. For a successful request the
  // calls arrive in the order OnResponseReceived, OnDataReceived (zero or more
  // times), OnFinished; a failed request produces a single OnError.
  class Delegate {
   public:
    virtual ~Delegate() {}
    // The status code and the response headers are available in |response|.
    virtual void OnResponseReceived(HttpTransaction* transaction,
                                    const HttpResponse& response) = 0;
    // A piece of the response body: |length| bytes starting at |data|. The
    // memory is released right after the call returns, so copy what you need.
    virtual void OnDataReceived(HttpTransaction* transaction,
                                const char* data, size_t length) = 0;
    // The whole body has been delivered.
    virtual void OnFinished(HttpTransaction* transaction) = 0;
    // The request failed; |error_code| carries the failure code.
    virtual void OnError(HttpTransaction* transaction, int error_code) = 0;
  };

  // Validates the request and hands it to libevent. |*out_error_code| is set
  // to 0 on success and to a non-zero code when the request could not be
  // started. A delegate and a request must have been set beforehand.
  virtual void Start(int* out_error_code);
  // Drops any pending delegate notification and releases the libevent
  // handles held by this object.
  virtual void Cancel();

  virtual HttpRequest* request() const { return http_request_; }
  virtual void set_request(HttpRequest* request) {
    http_request_ = request;
  }
  virtual Delegate* delegate() const { return delegate_; }
  virtual void set_delegate(Delegate* delegate) { delegate_ = delegate; }

  // Entry points for the libevent C callbacks defined in the implementation
  // file; they are public only so that those free functions can reach them.
  void OnEvRequestCallback(evhttp_request* req);
  void OnTimerCallback();

 private:
  // Copying would leave two objects releasing the same libevent handles, so
  // it is disabled (declared here, intentionally never defined).
  HttpTransaction(const HttpTransaction&);
  HttpTransaction& operator=(const HttpTransaction&);

  // The step that the next timer callback has to perform.
  enum State {
    STATE_NONE,
    STATE_WAITING_REQUEST_CALLBACK,
    STATE_REPORT_RESPONSE,
    STATE_REPORT_DATA,
    STATE_REPORT_FINISH,
    STATE_REPORT_ERROR
  };

  // Arms the zero-delay timer that drives the State steps.
  void SetupTimer();
  void DoReportResponse();
  void DoReportData();
  void DoReportFinish();
  void DoReportError();

  event_base* base_;                  // Event loop; not owned.
  HttpRequest* http_request_;         // Request description; not owned.
  Delegate* delegate_;                // Notification target; not owned.
  evhttp_connection* ev_connection_;  // Released in Cancel().
  evhttp_request* ev_request_;        // Released in Cancel() when owned.
  event* call_delegate_timer_;        // Released in Cancel().
  State next_state_;
};
// HttpTransaction.cpp
#include <assert.h>
#include <string.h>
#include <string>
#include "event2/event.h"
#include "event2/http.h"
#include "event2/buffer.h"
#include "event2/keyvalq_struct.h"
#include "http_transaction_impl.h"
#include "http_request.h"
#include "mutable_http_response.h"
void RequestCallback(struct evhttp_request* req, void *arg) {
HttpTransaction* transaction = static_cast<HttpTransaction*>(arg);
transaction->OnEvRequestCallback(req);
}
// libevent timer callback: |arg| is the HttpTransaction that created the
// timer. The descriptor and the event flags are not used.
void TimerCallback(evutil_socket_t /*fd*/, short /*what*/, void* arg) {
  static_cast<HttpTransaction*>(arg)->OnTimerCallback();
}
// Creates an idle transaction bound to |base|. Every handle starts out as
// NULL and the state machine starts in STATE_NONE.
HttpTransaction::HttpTransaction(event_base* base)
    : base_(base)
    , http_request_(NULL)
    , delegate_(NULL)
    , ev_connection_(NULL)
    , ev_request_(NULL)
    , call_delegate_timer_(NULL)
    , next_state_(STATE_NONE) {}
// Releases whatever libevent handles are still held. Inside a destructor the
// call always resolves to this class's Cancel(), which the qualified name
// makes explicit.
HttpTransaction::~HttpTransaction() {
  HttpTransaction::Cancel();
}
void HttpTransaction::Start(int* out_error_code) {
if (!delegate_) {
*out_error_code = 1;
return;
}
if (!http_request_) {
*out_error_code = 4;
return;
}
const char* url = http_request_->url();
if (!url) {
*out_error_code = 11;
return;
}
struct evhttp_uri *uri = evhttp_uri_parse(url);
if (!uri) {
*out_error_code = 12;
return;
}
const char* host = evhttp_uri_get_host(uri);
if (!host) {
evhttp_uri_free(uri);
*out_error_code = 13;
return;
}
int port = evhttp_uri_get_port(uri);
if (port == -1)
port = 80;
const char* method = http_request_->method();
if (!method) {
evhttp_uri_free(uri);
*out_error_code = 14;
return;
}
evhttp_cmd_type cmd_type;
if (strcmp(method, "GET") == 0) {
cmd_type = EVHTTP_REQ_GET;
} else if (strcmp(method, "POST") == 0) {
cmd_type = EVHTTP_REQ_POST;
} else {
evhttp_uri_free(uri);
*out_error_code = 15;
return;
}
ev_connection_ = evhttp_connection_base_new(base_, NULL, host, port);
if (!ev_connection_) {
evhttp_uri_free(uri);
*out_error_code = 2;
return;
}
ev_request_ = evhttp_request_new(&RequestCallback, this);
void* iter = NULL;
const char* name = NULL;
const char* value = NULL;
struct evkeyvalq *headers = evhttp_request_get_output_headers(ev_request_);
while (http_request_->EnumerateHeaderLines(&iter, &name, &value)) {
evhttp_add_header(headers, name, value);
}
value = http_request_->GetHeader("host");
if (!value) {
evhttp_add_header(headers, "host", host);
}
const char* body = NULL;
size_t length = 0;
if (http_request_->body(&body, &length)) {
struct evbuffer *buffer = evhttp_request_get_output_buffer(ev_request_);
evbuffer_add(buffer, body, length);
}
// evhttp_make_request() may trigger RequestCallback() synchronously and the
// return code is 0. An example is DNS error.
int rv = evhttp_make_request(ev_connection_, ev_request_,
cmd_type, evhttp_uri_get_path(uri));
evhttp_uri_free(uri);
if (rv == -1) {
ev_request_ = NULL;
evhttp_connection_free(ev_connection_);
ev_connection_ = NULL;
*out_error_code = 3;
return;
}
if (next_state_ == STATE_NONE)
next_state_ = STATE_WAITING_REQUEST_CALLBACK;
*out_error_code = 0;
// evhttp_connection_set_timeout(m_conn, (int)request.timeoutInterval());
}
void HttpTransaction::Cancel() {
next_state_ = STATE_NONE;
if (call_delegate_timer_) {
event_del(call_delegate_timer_);
event_free(call_delegate_timer_);
call_delegate_timer_ = NULL;
}
if (ev_request_) {
if (evhttp_request_is_owned(ev_request_))
evhttp_request_free(ev_request_);
ev_request_ = NULL;
}
if (ev_connection_) {
evhttp_connection_free(ev_connection_);
ev_connection_ = NULL;
}
}
// Handles libevent's request callback. The delegate is never called from
// here; the timer is armed and the outcome is recorded in next_state_ so that
// OnTimerCallback() reports it.
void HttpTransaction::OnEvRequestCallback(evhttp_request* req) {
  SetupTimer();

  // A NULL request or a response code of 0 is treated as a failed request.
  const bool failed =
      !req || evhttp_request_get_response_code(ev_request_) == 0;
  if (failed) {
    ev_request_ = NULL;
    next_state_ = STATE_REPORT_ERROR;
    return;
  }

  assert(ev_request_ == req);
  // Take ownership so the request can still be read in the later report steps.
  evhttp_request_own(req);
  next_state_ = STATE_REPORT_RESPONSE;
}
void HttpTransaction::OnTimerCallback() {
State state = next_state_;
next_state_ = STATE_NONE;
switch (state) {
case STATE_REPORT_RESPONSE:
DoReportResponse();
break;
case STATE_REPORT_DATA:
DoReportData();
break;
case STATE_REPORT_FINISH:
DoReportFinish();
break;
case STATE_REPORT_ERROR:
DoReportError();
break;
default:
assert(0);
break;
}
}
void HttpTransaction::SetupTimer() {
if (!call_delegate_timer_) {
call_delegate_timer_ = evtimer_new(base_, &TimerCallback, this);
}
struct timeval tv = { 0, 0 };
event_add(call_delegate_timer_, &tv);
}
void HttpTransaction::DoReportResponse() {
SetupTimer();
next_state_ = STATE_REPORT_DATA;
MutableHttpResponse* response =
MutableHttpResponse::Create();
const char* url = http_request_->url();
response->set_url(url);
response->set_status_code(evhttp_request_get_response_code(ev_request_));
struct evkeyvalq* headers = evhttp_request_get_input_headers(ev_request_);
for (struct evkeyval* header = headers->tqh_first; header;
header = header->next.tqe_next) {
response->AddHeader(header->key, header->value);
}
delegate_->OnResponseReceived(this, *response);
delete response;
}
// Hands the next piece (at most 12 KiB) of the buffered response body to the
// delegate, or finishes the transaction when nothing is buffered.
//
// The chunk lives in a local array. The previous heap buffer was allocated
// with new[] but released with scalar delete, which is undefined behaviour.
void HttpTransaction::DoReportData() {
  struct evbuffer* input = evhttp_request_get_input_buffer(ev_request_);
  if (evbuffer_get_length(input) == 0) {
    DoReportFinish();
    return;
  }

  const int kBufferSize = 1024 * 12;
  char chunk[kBufferSize];
  const int n = evbuffer_remove(input, chunk, kBufferSize);

  // Schedule the follow-up step before the delegate runs: another data step
  // while bytes remain, otherwise the finish step. SetupTimer() already arms
  // the timer, so it is not added a second time.
  SetupTimer();
  next_state_ = evbuffer_get_length(input) ? STATE_REPORT_DATA
                                           : STATE_REPORT_FINISH;

  // evbuffer_remove() returns the number of bytes read, or -1 on failure.
  if (n > 0) {
    delegate_->OnDataReceived(this, chunk, n);
  }
}
void HttpTransaction::DoReportFinish() {
Cancel();
delegate_->OnFinished(this);
}
void HttpTransaction::DoReportError() {
Cancel();
delegate_->OnError(this, 0xdead);
}
转载请注明出处:http://blog.csdn.net/hursing