Skip to content

Commit

Permalink
Merge pull request #8 from D4-project/master
Browse files Browse the repository at this point in the history
Redis_queue + multiprocess
  • Loading branch information
haegardev committed Jul 23, 2020
2 parents e2a5d0f + 559b516 commit b9ef5bc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 32 deletions.
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ files.sort()
for rf in files:
f = root + "/"+rf
if f.endswith('pcap') == True:
# Compressed files are supported too
# if f.endswith('pcap.gz') == True:
red.rpush("PCAPDJ_IN_QUEUE",f)
```

Expand All @@ -91,11 +93,11 @@ suricata -r /tmp/pcapbuffer
```

Until now no packets are put in the buffer because pcapdj needs an
authorization. PCAPDJ says that it is ready to process the pcapfile 1.pcap
and that it waits for this authorization. For doing so, pcapdj puts the
next file it wants to process in a queue called PCAPDJ_NEXT and it polls the
key PCAPDJ_AUTH. The value of PCAPDJ_AUTH must correspond to the file pcapdj
put previously in the queue PCAPDJ_NEXT.
authorization. PCAPDJ says that it is ready to process the pcapfile 1.pcap and
that it waits for this authorization. For doing so, pcapdj puts the next file
it wants to process in a queue called PCAPDJ_NEXT and it searches for the given
filename in the PCAPDJ_AUTH set. This way several pcadj processes can be managed
by the same authorization script.

```
[INFO] Next file to process /tmp/testpcaps/1.pcap
Expand All @@ -111,7 +113,7 @@ while True:
pcapname = red.lpop("PCAPDJ_NEXT")
if pcapname != None:
print "Authorized file ",pcapname
red.set("PCAPDJ_AUTH", pcapname)
red.sadd("PCAPDJ_AUTH", pcapname)
```

Wait until pcapdj and suricata are done
Expand Down
54 changes: 28 additions & 26 deletions pcapdj.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ statistics_t stats;
void usage(void)
{

printf("pcapdj [-h] -b namedpipe [-s redis_server] -p [redis_srv_port]\n\n");
printf("pcapdj [-h] -b namedpipe [-s redis_server] -p [redis_srv_port] [-q redis_queue]\n\n");
printf("Connects to the redis instance specified with by the redis_server\n");
printf("and redis_srv_port.\n\n");

printf("Read a list of pcap-ng files from the queue PCAPDJ_IN_QUEUE.\n");
printf("Read a list of pcap-ng files from the queue PCAPDJ_IN_QUEUE by default\n");
printf("or the queue specified with the -q flag is set.\n");
printf("Open the pcap-ng file and feed each packet to the fifo buffer\n");
printf("specified by with the -b option. When a pcap file from the list\n");
printf("has been transferred to the buffer update the queue PCAPDJ_PROCESSED\n");
Expand Down Expand Up @@ -176,11 +177,11 @@ void delete_next_file_queue(redisContext* ctx)
}


void delete_auth_file(redisContext* ctx)
void delete_auth_file(redisContext* ctx, char* filename)
{
/* FIXME errors are ignored */
redisReply * reply;
reply = redisCommand(ctx, "DEL %s", AKEY);
reply = redisCommand(ctx, "SREM %s %s", AKEY, filename);
if (reply)
freeReplyObject(reply);
}
Expand All @@ -192,20 +193,15 @@ void wait_auth_to_proceed(redisContext* ctx, char* filename)
/* If there is an error the program waits forever */

do {
reply = redisCommand(ctx,"GET %s",AKEY);
reply = redisCommand(ctx,"SISMEMBER %s %s",AKEY, filename);
if (reply){
if (reply->type == REDIS_REPLY_STRING) {
/* Delete the authorized key. So in the next
* iteration the AUTH_KEY is not there anymore and
* the error message is not reated all the times
*/
delete_auth_file(ctx);
if (!strncmp(reply->str, filename, strlen(filename))) {
if (reply->type == REDIS_REPLY_INTEGER) {
/* Delete the filename from the set if found */
if (reply->integer == 1){
delete_auth_file(ctx, filename);
fprintf(stderr, "[INFO] Got authorization to process %s\n",filename);
freeReplyObject(reply);
return;
}else{
fprintf(stderr,"[ERROR] Got the wrong authorization. Waited for (%s). Got %s.\n", filename, reply->str);
}
}
freeReplyObject(reply);
Expand Down Expand Up @@ -257,7 +253,7 @@ void process_file(redisContext* ctx, wtap_dumper* dumper, char* filename)
}
}

int process_input_queue(wtap_dumper *dumper, char* redis_server, int redis_srv_port)
int process_input_queue(wtap_dumper *dumper, char* redis_server, int redis_srv_port, char* redis_queue)
{
redisContext* ctx;
redisReply* reply;
Expand All @@ -268,10 +264,9 @@ int process_input_queue(wtap_dumper *dumper, char* redis_server, int redis_srv_p
fprintf(stderr,"[ERROR] Could not connect to redis. %s.\n", ctx->errstr);
return EXIT_FAILURE;
}


do {
reply = redisCommand(ctx,"LPOP %s", PQUEUE);
reply = redisCommand(ctx,"LPOP %s", redis_queue);
if (!reply){
fprintf(stderr,"[ERROR] Redis error %s\n",ctx->errstr);
return EXIT_FAILURE;
Expand Down Expand Up @@ -320,36 +315,40 @@ void init(void)
int main(int argc, char* argv[])
{

int opt;
int r;
int opt, r, redis_srv_port, write_err;
char* redis_server;
int redis_srv_port;
char *namedpipe;
char* namedpipe;
char* redis_queue;
FILE *fifo;
wtap_dumper *pdh = NULL;
wtap_dump_params params = WTAP_DUMP_PARAMS_INIT;
int write_err;

init();

namedpipe = calloc(128,1);
assert(namedpipe);


redis_queue = calloc(128,1);
assert(redis_queue);

redis_server = calloc(64,1);
assert(redis_server);

redis_srv_port = 6379;
while ((opt = getopt(argc, argv, "b:hs:p:")) != -1) {
while ((opt = getopt(argc, argv, "b:hs:p:q:")) != -1) {
switch (opt) {
case 's':
strncpy(redis_server,optarg,64);
strncpy(redis_server, optarg, 64);
break;
case 'p':
redis_srv_port = atoi(optarg);
break;
case 'b':
strncpy(namedpipe , optarg, 128);
break;
case 'q':
strncpy(redis_queue , optarg, 128);
break;
case 'h':
usage();
return EXIT_SUCCESS;
Expand All @@ -365,6 +364,8 @@ int main(int argc, char* argv[])
fprintf(stderr,"[ERROR] A named pipe must be specified\n");
return EXIT_FAILURE;
}
if (!redis_queue[0])
strncpy(redis_queue,PQUEUE,128);

fifo = fopen(namedpipe, "wb");
if (fifo == NULL) {
Expand All @@ -373,13 +374,14 @@ int main(int argc, char* argv[])

fprintf(stderr, "[INFO] redis_server = %s\n",redis_server);
fprintf(stderr, "[INFO] redis_port = %d\n",redis_srv_port);
fprintf(stderr, "[INFO] redis_queue = %s\n", redis_queue);
fprintf(stderr, "[INFO] named pipe = %s\n", namedpipe);
fprintf(stderr, "[INFO] pid = %d\n",(int)getpid());

params.encap = WTAP_ENCAP_ETHERNET;
pdh = wtap_dump_fdopen(fileno(fifo), WTAP_FILE_TYPE_SUBTYPE_PCAPNG, WTAP_UNCOMPRESSED, &params, &write_err);
if (pdh != NULL){
r = process_input_queue(pdh, redis_server, redis_srv_port);
r = process_input_queue(pdh, redis_server, redis_srv_port, redis_queue);
if (r == EXIT_FAILURE) {
fprintf(stderr,"[ERROR] Something went wrong in during processing");
}else{
Expand Down

0 comments on commit b9ef5bc

Please sign in to comment.