Skip to content

test_hiredisvip_async_ae.c

deep edited this page Dec 6, 2016 · 1 revision
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<time.h>
#include<signal.h>
#include<pthread.h>
#include<sys/socket.h>

#include<hircluster.h>
#include<adlist.h>
#include<async.h>
#include<adapters/ae.h>

static aeEventLoop *loop;
static redisClusterAsyncContext *acc;

static int null_count=0,ok_count=0,string_count=0,error_count=0,reply_null_count=0,array_count=0;
static long long total_cmd_count, callback_count;

static long long counter = 0;

struct data
{
    long long key_num;
    long long value_num;
};

void getCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
    redisReply *reply = r;
    struct data *dt = privdata;

    callback_count ++;

    if (reply == NULL) 
	{
		reply_null_count ++;
		goto done;
	}

	switch(reply->type)
    {
    case REDIS_REPLY_STRING:
        string_count ++;
        break;
    case REDIS_REPLY_ARRAY:
        array_count ++;
        break;
    case REDIS_REPLY_INTEGER:

        break;
    case REDIS_REPLY_NIL:
        null_count ++;
        break;
    case REDIS_REPLY_STATUS:
        if (strcmp(reply->str, "OK") == 0) {
            ok_count ++;
        }
        break;
    case REDIS_REPLY_ERROR:
        error_count ++;
        break;
    default:
        break;
    }

done:
    if(callback_count%50000 == 0)
	{
        printf("\n\n******%lld*******\nroute version: %d\n", counter, acc->cc->route_version);

        printf("null_count: %d\n", null_count);
        printf("ok_count: %d\n", ok_count);
        printf("array_count: %d\n", array_count);
        printf("string_count: %d\n", string_count);
        printf("error_count: %d\n", error_count);
        printf("reply_null_count: %d\n", reply_null_count);

        null_count = ok_count = array_count = string_count = error_count = reply_null_count = 0;
        counter ++;
	}
   
    if (dt != NULL) {
        free(dt);
    }
}

void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        return;
    }

    printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("disconnect error: %s\n", c->errstr);
        return;
    }
    
	printf("\nDisconnected...\n");
}

static int sd_notice_recieve;
static int sd_notice_send;

static hilist *l;
static pthread_mutex_t lmutex;

#define handle_one_time_count 1000

void handle_event(aeEventLoop *el, int fd, void *privdata, int mask)
{
    listNode *node;
    struct data *dt;
    char buf[10];
    
    int count = 0;
    
    if(acc == NULL)
    {
        printf("error: privdata is null\n");
        return;
    }
       
    pthread_mutex_lock(&lmutex);
    while(listLength(l) > 0) {
        if ((total_cmd_count-callback_count) > 10000) {
            break;
        }
        
        read(sd_notice_recieve, buf, 1);
        node = listFirst(l);
        dt = (struct data *)(listNodeValue(node));
        listDelNode(l, node);
        redisClusterAsyncCommand(acc, getCallback, dt, "set %lld %lld", dt->key_num, dt->value_num);
        total_cmd_count ++;
        if (++count > handle_one_time_count)
            break;
    }
    pthread_mutex_unlock(&lmutex);
}

void *event_run(void *args)
{
    aeCreateFileEvent(loop, sd_notice_recieve, AE_READABLE, handle_event, acc);
    aeMain(loop);
    return 0;
}

//gcc -o test_hiredisvip_async_ae_new -lhiredis_vip -lpthread test_hiredisvip_async_ae_new.c ../redis-2.8.13/src/ae.o ../redis-2.8.13/src/zmalloc.o -I/usr/local/include -I/usr/local/include/hiredis-vip -L/usr/local/lib -I../redis-2.8.13/src/
int main(int argc, char **argv)
{
    char *addr;
	long long count;
    pthread_t thread_id;
    int fds_pipe[2];

    signal(SIGPIPE, SIG_IGN);

    callback_count = 0;
	total_cmd_count = 0;
    
    pthread_mutex_init(&lmutex,NULL);

    if(argc < 2){    
        printf("Error: must have the first argument address\n");
        return;
    }
    addr = argv[1];
    
    int flags = HIRCLUSTER_FLAG_NULL;

    acc = redisClusterAsyncConnect(addr, flags);
    if (acc->err) {
        /* Let *c leak for now... */
        printf("connect error: %s\n", acc->errstr);
        return -1;
    }

    loop = aeCreateEventLoop(6400000);
    redisClusterAeAttach(loop, acc);
    redisClusterAsyncSetConnectCallback(acc,connectCallback);
    redisClusterAsyncSetDisconnectCallback(acc,disconnectCallback);

    pipe(fds_pipe);
    sd_notice_recieve = fds_pipe[0];
    sd_notice_send = fds_pipe[1];

    l = listCreate();
    
    callback_count = 0;

    pthread_create(&thread_id, NULL, event_run, loop);
    count = 0;
	for(;;)
	{
        struct data *dt = malloc(sizeof(struct data));
        dt->key_num = count;
        dt->value_num = count;
        pthread_mutex_lock(&lmutex);    
        if (listLength(l) > 10000) {
            pthread_mutex_unlock(&lmutex);
            free(dt);
            continue;
        }
        listAddNodeTail(l, dt);
        pthread_mutex_unlock(&lmutex);
        count ++;
        write(sd_notice_send, "c", 1);
	}

    pthread_join(thread_id,NULL);

	return 0;
}
Clone this wiki locally