From e5ae5948abb1980991a46908a3c806fd4f208108 Mon Sep 17 00:00:00 2001 From: Olav Kvittem Date: Mon, 6 Jul 2015 09:00:46 +0200 Subject: [PATCH] -k, -f -C -A --- crude/main.c | 343 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 310 insertions(+), 33 deletions(-) diff --git a/crude/main.c b/crude/main.c index 4191a4d..857a510 100644 --- a/crude/main.c +++ b/crude/main.c @@ -19,6 +19,10 @@ * * Authors: Juha Laine * Sampo Saaristo + * 2015-07-02 Olav Kvittem + * added reading pcap to disk, using kernel timestamp, forking write process to disk + * bug: reading AF_PACKET does not receive any packets + * caveats: only implemented for rec_to_file() * *****************************************************************************/ #include @@ -28,7 +32,7 @@ #include #include #include -#include +#include #include #include #include @@ -40,9 +44,17 @@ #include #include #include +#include +#include #include - - +//#include +#include +#include +#include /* the L2 protocols */ +//#include // provided by netpacket/packet.h + +#define OBUFFERS 10000 // number of records in ouput buffer +#define OCHUNKS 100 // number of records for each write chunk /* * Private struct for each flow of runtime statistics */ @@ -82,16 +94,35 @@ static int rec_to_file(unsigned short,unsigned long); static int rec_n_print(unsigned short,unsigned long); void crude_handler(int); static void get_str_addr(struct sockaddr_storage,char[]); +static int output_data(int, int); +static int fork_write(); /* * Global variables */ int main_socket = 0; /* The socket to listen to */ int main_file = 0; /* File to read from/to write to */ +int main_output = 0; /* file to write output (is main_file if not fork) */ unsigned long pkt_count = 0; /* Counter for received/processed packets */ struct flow_stat *flows = NULL; /* List of flows for runtime statistics */ int nflows = 0; char preffix[] = "::ffff:"; +short opt_af_packet=0, opt_pcap=0; +short opt_ktime=0; // kernel packet time by ioctl +short opt_fork=0; // fork for write in separate process with noblock for improving packet loss +char pcap_dev[100]=""; +pcap_t *main_pcap; + +// data structure for ouput ring buffer +int osize; +int obufsize; +int ochunksize; +int ofree = 0; // next free pos +int odata = 0; // next data pos +char *obuffer=NULL; +int pipefd[2]; // a pipe for writing to file in a subprocess + + int main(int argc, char **argv) { @@ -120,13 +151,16 @@ int main(int argc, char **argv) "under GNU GENERAL PUBLIC LICENSE Version 2.\n",VERSION); struct sockaddr_storage test; - printf("Size of sockaddr_storage: %d ss_len:%d \n",sizeof(test),sizeof(test.ss_family)); + printf("Size of sockaddr_storage: %lu ss_len:%lu \n",sizeof(test),sizeof(test.ss_family)); while((retval >= 0) && - ((cmd_char = getopt(argc,argv,"hvd:p:i:l:P:n:s:6D:")) != EOF)) + ((cmd_char = getopt(argc,argv,"Ahvd:p:C:fi:kl:P:n:s:6D:")) != EOF)) { switch(cmd_char) { + case 'A': + opt_af_packet=1; + break; case 'v': if((optind == 2) && (argc == 2)) { @@ -173,6 +207,9 @@ int main(int argc, char **argv) } break; + case 'f': // fork output write + opt_fork=1; + break; case 'p': if(optarg != NULL) { @@ -201,6 +238,25 @@ int main(int argc, char **argv) } break; + case 'k': // use kernel time + opt_ktime=1; + // retval=1; + break; + + case 'C': // pcap capture + if(optarg != NULL) + { + errno=0; + opt_pcap=1; + strncpy(pcap_dev, optarg, 10); + } + else + { + RUDEBUG1("crude: invalid commandline arguments!\n"); + retval = -2; + } + break; + case 'i': if(optarg != NULL) { @@ -469,6 +525,10 @@ static void usage(char *name) "\t-P priority = process realtime priority {1-90}\n\n" "\t-s #[,# ...] = don't record: get runtime stats for specified flows\n" "\t-n # = exit automatically after # packets has been logged.\n" + "\t-k = use kernel time stamps(ioctl).\n" + "\t-A = use Linux AF_PACKET capture\n" + "\t-f = fork a write process to avoid write blocks\n" + "\t-C interface = use pcap packet capture\n" "\t use CTRL+C to exit the program otherwise\n\n", name); } @@ -496,7 +556,12 @@ void crude_handler(int value) } if(nflows > 0){ print_stats(); } - if(main_file > 0){ close(main_file); } + if(main_file > 0){ + output_data(main_output, 1); + close(main_file); + if ( main_file != main_output) // forking + close(main_output); // will hopefully close the pipe so that the child dies + } if(main_socket > 0){ close(main_socket); } RUDEBUG1("\ncrude: captured/processed %lu packets\n", pkt_count); exit(0); @@ -549,22 +614,62 @@ static int make_conn(unsigned short port, char *ifaddr,int use_ip6,unsigned int return -1; } - if((our_sock=socket(our_addr.ss_family ,SOCK_DGRAM,0)) < 0) - { + + if ( opt_af_packet) { // ip packet interface + if ( (our_sock=socket( AF_PACKET ,SOCK_DGRAM, htons(ETH_P_IP|ETH_P_IPV6) ) ) < 0 ) { + RUDEBUG1("crude: socket(AF_PACKET,) failed : %s\n", strerror(errno)); + } + struct sockaddr_ll link; + memset(&link, 0, sizeof(struct sockaddr_ll)); + link.sll_family = AF_PACKET; + link.sll_ifindex = if_nametoindex("eth0"); + // link.sll_protocol = ETH_P_IP; + + if(bind(our_sock, (struct sockaddr *)&link, sizeof(struct sockaddr_ll)) < 0) + { + RUDEBUG1("crude: bind(AF_packet) failed: %s\n", strerror(errno)); + close(our_sock); + return -3; + } + } else if (opt_pcap) { + struct bpf_program fp; /* The compiled filter */ + char filter[100]; + char errbuf[PCAP_ERRBUF_SIZE]; + + sprintf( filter, "udp and port %d", port); + + if ( ( main_pcap=pcap_open_live("eth0", BUFSIZ, 1, 1000, errbuf) ) == NULL){ + RUDEBUG1("crude: pcap open_live failed : %s\n", strerror(errno)); + return(-1); + } + if ( pcap_compile(main_pcap, &fp, filter, 0, PCAP_NETMASK_UNKNOWN) == -1) { + RUDEBUG1("Couldn't parse filter %s: %s\n", filter, pcap_geterr(main_pcap)); + return(-1); + } + if ( pcap_setfilter(main_pcap, &fp) == -1) { + RUDEBUG1("Couldn't install filter %s: %s\n", filter, pcap_geterr(main_pcap)); + return(-1); + } + + } else { + + if( (our_sock=socket(our_addr.ss_family ,SOCK_DGRAM,0) ) < 0) + { RUDEBUG1("crude: socket() failed: %s\n", strerror(errno)); return -2; - } - - if(bind(our_sock, (struct sockaddr *)&our_addr, sizeof(struct sockaddr_storage)) < 0) - { - RUDEBUG1("crude: bind() failed: %s\n", strerror(errno)); - close(our_sock); - return -3; + } + + if(bind(our_sock, (struct sockaddr *)&our_addr, sizeof(struct sockaddr_storage)) < 0) + { + RUDEBUG1("crude: bind() failed: %s\n", strerror(errno)); + close(our_sock); + return -3; + } } if(isMulticastAddr(&our_addr)>0) { - RUDEBUG7("crude: make_conn(): address is a multicast one\n",port,ifaddr); + RUDEBUG7("crude: make_conn(): address is a multicast one : port %d if %s\n",port,ifaddr); if(joinGroup(our_sock, 0, 200, &our_addr,ifindex) < 0){ RUDEBUG1("crude: make_conn() failed to join group\n"); close(our_sock); @@ -856,6 +961,13 @@ void print_stats(void) printf("\n"); } +static int check_response( struct timeval tw1, char *msg){ + struct timeval tw2; + gettimeofday(&tw2, NULL); + int tdiff_us=(tw2.tv_sec-tw1.tv_sec)*10^6 + (tw2.tv_usec-tw1.tv_usec); + if (tdiff_us > 500) + fprintf( stderr, "used time us: %d - task %s\n", tdiff_us, msg ); +} /* * rec_to_file() - record packets to file in binary format... @@ -866,47 +978,211 @@ static int rec_to_file(unsigned short port, unsigned long limit) int wri_bytes = 0; /* Bytes written */ int src_len = sizeof(struct sockaddr_storage); struct sockaddr_storage src_addr; - struct timeval time1; + struct timeval time1, tw1; struct crude_struct other_info; char buffer[PMAXSIZE]; + struct udp_data *rude_data= (struct udp_data *)&buffer; + int rude_seq=0; /* Initialize some variables */ memset(buffer,0,PMAXSIZE); other_info.dest_port = htons(port); - while(1) - { + // establish output file ring buffer + osize = sizeof(struct udp_data) + sizeof(struct crude_struct); + obufsize=OBUFFERS * osize; + ochunksize = osize * OCHUNKS; + obuffer=malloc(obufsize); + + if ( opt_fork) + main_output=fork_write(); + else + main_output=main_file; + + + struct pcap_pkthdr *header; + const u_char *packet; + int result; + + while(1) + { + if ( opt_pcap){ + if ( ( result= pcap_next_ex(main_pcap, &header, &packet)) < 1 ){ + if ( result == 0 ) continue; // just timeout waiting for data + fprintf( stderr, "crude: error when receiving pcap packet: result %d , error %s \n", + result, pcap_geterr(main_pcap)); + rec_bytes=0; + }; + rec_bytes = header->caplen; + } else { rec_bytes = recvfrom(main_socket, buffer, PMAXSIZE, 0, (struct sockaddr *)&src_addr, (socklen_t *)&src_len); - if(rec_bytes <= 0) + } + if(rec_bytes <= 0) { - RUDEBUG1("crude: error when receiving packet: %s\n",strerror(errno)); + RUDEBUG1("crude: error when receiving packet: %s\n",strerror(errno)); } - else + else { - gettimeofday(&time1, NULL); + if ( opt_pcap || opt_af_packet){ + if ( opt_pcap) { // ether header + rude_data= (struct udp_data*) (packet + 14) ; // feil : sizeof(struct ether_header); + memcpy(&time1, &header->ts, sizeof(struct timeval)); + } + + unsigned short iphdrlen; + struct iphdr *iph = (struct iphdr*)rude_data; + + if ( iph->protocol == 17){ // udp + iphdrlen = iph->ihl*4; + struct udphdr *udph = (struct udphdr*)((char*)rude_data + iphdrlen); + if ( ntohs(udph->dest) != port) + continue; + + struct sockaddr_in* v4=(struct sockaddr_in *)&(src_addr); + v4->sin_family=AF_INET; + v4->sin_addr=*(struct in_addr*)&iph->saddr; + /* turn around elsewhere + uint32_t v4adr=ntohl(iph->saddr); + memcpy( &(v4->sin_addr), &v4adr, sizeof(uint32_t)); */ + v4->sin_port = port; + } else { + continue; + } + rude_data=(struct udp_data*)((char*)rude_data + iphdrlen + sizeof(struct udphdr)); + } + + if ( opt_ktime) + ioctl(main_socket, SIOCGSTAMP, &time1); + else if ( time1.tv_sec == 0) + gettimeofday(&time1, NULL); + + int newseq=ntohl(rude_data->sequence_number); + if ( rude_seq > 0 && newseq != (rude_seq+1) ){ + fprintf( stderr, "Lost seq : %d - %d\n", rude_seq, newseq-rude_seq-1); + } + rude_seq=newseq; + + pkt_count++; other_info.rx_time_seconds = htonl(time1.tv_sec); other_info.rx_time_useconds = htonl(time1.tv_usec); other_info.pkt_size = htonl(rec_bytes); - //other_info.src_port = ((struct sockaddr_in6 *)&src_addr)->sin6_port; - //other_info.src_addr = ((struct sockaddr_in6 *)&src_addr)->sin6_addr; - other_info.src =src_addr; - - wri_bytes = write(main_file,buffer,(sizeof(struct udp_data))); - wri_bytes += write(main_file,&other_info,sizeof(struct crude_struct)); - RUDEBUG7("crude: pkt %lu (%ldbytes) status: %s\n", - pkt_count, rec_bytes, strerror(errno)); + other_info.src = src_addr; /* don't use network order here + struct sockaddr_in *sin_ref=(struct sockaddr_in*)&other_info.src; + struct sockaddr_in *src_ref=(struct sockaddr_in*)&src_addr; + sin_ref->sin_family=htons(AF_INET); + sin_ref->sin_port = htons(((struct sockaddr_in*)&src_addr)->sin_port); + uint32_t* h_a_r = (uint32_t*)&(src_ref->sin_addr); + uint32_t n_a = htonl(*h_a_r); + memcpy(&(sin_ref->sin_addr), &n_a, sizeof(struct in_addr));*/ + + // fill up buffer + int newfree=ofree + osize; + if ( ( newfree > odata && newfree <= obufsize ) || + ( newfree < odata ) ){ + memcpy( obuffer + ofree, rude_data, sizeof(struct udp_data)); + memcpy( obuffer + ofree + sizeof(struct udp_data), &other_info, + sizeof(struct crude_struct)); + + ofree = newfree % obufsize; + } else { + perror ("Internal output buffer overflow\n"); + } + // output data until empty or bad return + output_data(main_output, 0); + // RUDEBUG7("crude: pkt %lu (%lubytes) status: %s\n", + // pkt_count, rec_bytes, strerror(errno)); } if((limit != 0) && (pkt_count >= limit)){ break; } - } /* end of while */ + } /* end of while */ + RUDEBUG1("\ncrude: captured/processed %lu packets \n", pkt_count); return 0; } +static int fork_write( ){ + // fork a process to write to disk to avoid hangups + pid_t child = 0; + int recsz= sizeof(struct udp_data) + sizeof(struct crude_struct); + char *filebuffer=malloc(recsz); + + if (pipe(pipefd) == -1 ) // Create new pipe + { + fprintf(stderr, "Failed to create pipe\n"); + exit(1); + } + fcntl(pipefd[1], F_SETPIPE_SZ, 1001001); + fprintf( stderr, "pipe size is : %d\n", fcntl(pipefd[1], F_GETPIPE_SZ) ); + child = fork(); + + if ( child){ // mother process + // set noblocking write to avoid buffer emptying wait. + if ( fcntl(pipefd[1], F_SETFL, fcntl(pipefd[1], F_GETFL, 0) | O_NONBLOCK) != 0 ) + fprintf( stderr, "fcntl nonblock failed : %d\n", errno ); + else RUDEBUG1( "fcntl nonblock OK flag : %d\n", fcntl(pipefd[1], F_GETFL, 0) & O_NONBLOCK ); + return(pipefd[1]); + + } else { // in child - read and write to disk + close(pipefd[1]); // Child is read only + signal(SIGINT, SIG_IGN); // ignore signal - just wait for eof + int bytes; + + while( (bytes=read(pipefd[0], filebuffer, ochunksize)) > 0){ + write( main_file, filebuffer, bytes); + RUDEBUG7("crude: writing to disk %u \n", bytes) ; + } + exit(0); + } +} + +// output data until empty or bad return +static int output_data(int fd, int flush){ + ssize_t result; + int written=0; + struct timeval tw1, tw2; + + while (ofree != odata){ + int ochunk=ofree-odata; + if ( ochunk < 0 ) ochunk = obufsize-odata; + if (flush || ochunk >= ochunksize ){ // delay writes until chunksize + // return(0);// just to see that write is not the problem ####################### + if ( ochunk > ochunksize ) ochunk=ochunksize ; + gettimeofday(&tw1, NULL); + if ( (result=write(fd, obuffer+odata, ochunk )) > 0 ){ // IO buffer overflow + check_response(tw1, "write"); + odata= (odata + result) % obufsize; + written += result; + RUDEBUG1("odata %lu ofree %d chunk %d result %d\n", odata, ofree, ochunk, (int)result); + } else { // I'll be back + check_response(tw1, "error"); + if ( result == EAGAIN || result == EWOULDBLOCK){ + RUDEBUG1( "Write failed wouldblock: reason %d\n", (int)result); + break; // probably hit full ouput buffer + } else { + fprintf( stderr, "Write failed : return %d : cause %s\n", (int)result, strerror(errno)); + } + } + } else { + break; + } + } + return(written); +} + + +/* not used */ + +static void handle_interrupt(int signum){ + output_data(pipefd[1], 1); + close(main_file); + fprintf(stderr, "Cancelled by signal : %d", signum); + fflush(stderr); + exit(1); +} /* * get_str_addr - prints address in readable format into buffer @@ -978,7 +1254,8 @@ static int rec_n_print(unsigned short port, unsigned long limit) } for( ; ; ) { - rec_bytes = recvmsg(main_socket, &msgh, MSG_WAITALL); + rec_bytes = recvmsg(main_socket, &msgh, MSG_WAITALL); + if(rec_bytes <= 0) { RUDEBUG1("crude: error when receiving packet: %s\n",strerror(errno)); -- GitLab