libeasy原理,架构和使用方法

libeasy 原理、架构和使用方法

简介

libeasy提供一个处理tcp连接的事件驱动的网络框架。框架本身封装好了底层的网络操作,只需要开发者处理其中的各种事件。本文介绍libeasy的一些实现原理,整体框架,以及使用的样例。本文是经过一系列摸索,以及wireshark抓包,再结合一些互联网上一些仅有的资料整理完成,如有理解不当的地方,烦请指出。

基本概念

libeasy 的基本概念有:easy_connection_t(连接), easy_message_t(消息), easy_request_t(请求)。每个连接上有可以有多个消息,通过链表连起来,每个消息可以由多个请求组成,也通过链表连起来。

easy_request_t就相当于应用层的一个具体的包, 多个请求组合起来形成一个完整的消息。在一次长连接中,用户可以接受多次消息。每个request 只属于一个connection。

处理模型

libeasy是基于epoll的事件模型,程序收到事件后,回调注册的事件的函数。调用回调函数的线程池称为IO Thread , 线程的个数在创建eaay事件时指定。

extern easy_io_t *easy_eio_create(easy_io_t *eio, int io_thread_count);

一些简单的请求,可以直接在io thread中处理完成,并且直接返回,这种处理模型称为同步模型。

在一些情况下,处理逻辑比较复杂,比如需要读取磁盘数据,这种情况下,IO Thread会封装成一个请求,放入后端队列,经由另外一个线程池进行处理,同时IO Thread会把当前的事件挂起,等待后端线程唤醒后继续处理当前事件。这种模型称为异步模型。

这里写图片描述

这里写图片描述

注册事件的回调

开发者注册一系列回调函数,供libeasy在接受请求时回调。按照回调的顺序,回调函数包括:

  1. on_connect
    接受tcp连接时,回调该函数,可以在该事件中做密码验证等事情。
  2. decode
    从网络上读取一段文本,并且按照定义的协议,解析成数据结构,供之后处理。
  3. process
    处理从decode中解析出的结构,可以是同步处理,也可以是异步处理。
  4. encode
    把process的结果,转化成字符串(如果process的结果是要输出的结果,则不需要转化)。然后把结果挂载到request的输出上。r -> opacket = buf;
  5. clean_up
    在连接断开前执行的操作,如果在之前的操作中分配了一些内存,需要在这里释放。
  6. on_disconnect
    连接断开时的操作。

使用方法

libeasy有一些基本的数据结构

  1. easy_pool_t:共享内存池。在一次请求中,一次性分配大块内存,当用户使用小内存的时候,从pool中分配,从而达到避免分配大量小块内存的情况。因为大量小块内存会非常浪费CPU。

    void * ptr = easy_pool_alloc(req->ms->pool,1024);

    分配好的内存可以用于初始化任何对象。例如:以下例子中的new不会分配新内存,而是使用ptr指向的内存,并且传入1作为构造函数的参数。

    UserInfo * infoPtr = new (ptr)UserInfo(1);
  2. easy_buf_t

    管理输入输出缓冲区,用于逐段消费字符串,或分批把结果放入buf进行输出。其中pos指向当前位置,last指向已使用的内存的结束位置,end指向全部内存的结束位置。

    easy_buf_t* buf = reinterpret_cast<easy_buf_t*>(easy_pool_alloc(req->ms->pool, 2*1024*1024));
    char *data_buffer = reinterpret_cast<char *>(buf + sizeof(easy_buf_t));
    buffer_length = 2*1024*1024 - sizeof(easy_buf_t);  
    buf -> pos = buf -> last =  data_buffer;
    buf->end = buf->last + buffer_length; 
    buf->cleanup = NULL;
    easy_list_init(&buf->node); 

一个同步处理的样例

#include <iostream>
#include "easy/easy_io_struct.h"
#include "easy/easy_io.h"
using namespace std;
struct MyMessage
{
    int mLen;
    int mSeq;
    string mMessage;
public :
    MyMessage(int len,int seq,const string & msg):
        mLen(len),mSeq(seq),mMessage(msg)
    {}

};
int  my_on_connect (easy_connection_t *c)
{
    return 0;
}

int my_easy_io_process_pt(easy_request_t *r)
{
    MyMessage * msg = (MyMessage*)r -> ipacket;
    char buffer[1024];
    sprintf(buffer,"i got you message,len:%d,seq:%d,message:%s",msg ->mLen,msg ->mSeq,msg ->mMessage.c_str());
    string ret(buffer);
    MyMessage * outMsg = new MyMessage(ret.size(),msg ->mSeq+1,ret);
    r -> opacket = outMsg;
    delete msg;
    return 0;
}
int my_on_disconnect (easy_connection_t * c)
{
    return 0;
}
void * my_easy_decode_pt(easy_message_t *m)
{
    int len = (*((uint32_t*)m -> input->pos)) >> 8;
    int seq = (*((uint32_t*)m -> input->pos)) && 0xff;
    MyMessage * msg = new MyMessage(len,seq,string(m ->input->pos+4,len));
    m ->input->pos= m ->input -> last;
    return msg;
}
int my_easy_encode_pt(easy_request_t *r, void *packet)
{
    MyMessage * msg = (MyMessage*) packet;
    easy_buf_t * buf =easy_buf_create(r ->  ms -> pool, msg ->mMessage.size());
    easy_buf_set_data(r ->ms ->pool, buf, msg ->mMessage.c_str(),msg ->mMessage.size());
    easy_request_addbuf(r, buf); //加入输出队列
    delete msg;
    return 0;
}
int main(int argc,char ** argv)
{
    easy_io_handler_pt handler;
    memset(&handler, 0, sizeof(easy_io_handler_pt));
    handler.on_connect = my_on_connect;
    handler.decode = my_easy_decode_pt;
    handler.encode= my_easy_encode_pt;
    handler.on_disconnect = my_on_disconnect;
    handler.process = my_easy_io_process_pt;
    easy_io_t *eio = new easy_io_t();
    memset(eio,0,sizeof(easy_io_t));
    eio = easy_eio_create(eio,10);//创建10个线程
    eio->tcp_defer_accept = 0;
    easy_listen_t* listen = easy_connection_add_listen(eio, NULL, 3308, &handler);//侦听3308端口
    int rc = easy_eio_start(eio);
    easy_eio_wait(eio);
}

异步使用方法

异步模型,最主要的区别是在process 函数中。

这里写图片描述

在process 中,把请求放入后端队列,并且return EASY_AGAIN 表示将请求挂起,不会继续调用接下来的 encode和clean_up ,直到被后端线程唤醒。encode函数和同步模型一致,把req -> opacket放入输出缓存中。
process再次回调的的函数实现如下:

if (EASY_AGAIN == r->retcode)  //wakeup request thread called when send result set sync
    {   
        //EASY_AGAIN说明后续服务器端还有包需要发给客户端
        if (NULL != r->client_wait)
        {
            if (r->ms->c->conn_has_error == 1)
            {   
                r->client_wait->status = EASY_CONN_CLOSE;
            }
            easy_client_wait_wakeup_request(r);
            ret = EASY_AGAIN;
        }
        //else no more data send
        ret = EASY_OK;
}

后端线程的实现:

  req->opacket=buf;//把结果挂载到opacket上
  easy_client_wait_t wait_obj;
  if(shouldWait)
  {
      wait_obj.done_count = 0;
      //用于IO线程唤醒工作线程
      easy_client_wait_init(&wait_obj);
      req->client_wait = &wait_obj;
      req->retcode = -11;
      req->waiting = 1;
  }
  //io线程被唤醒,r->opacket被挂过去,send_response->easy_connection_request_done
  easy_request_wakeup(req);
  // IO线程回调 process(easy_request_t* r)的时候唤醒工作线程
    if(shouldWait)
    {   
        wait_client_obj(wait_obj);//工作线程在此处阻塞,等待唤醒
        if(wait_obj.status==3){
            ret=-124;
        }
        easy_client_wait_cleanup(&wait_obj);
        req->client_wait = NULL;
    }