Commit 4c21dbde authored by 's avatar
Browse files

Added flow direction recognition.

Note: EXPIRED_FLOWS has changed in previous revision (rev. 1349), packetloss client and mapi server have to share the same version of expiredflowshash.h


git-svn-id: file:///home/svn/mapi/trunk@1350 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 11df0119
......@@ -147,17 +147,21 @@ struct thost {
int fd; // descriptor of a flow opened to this host
int fid_loss; // id of a function (EXPIREDR_FLOWS, BPF_FILTER)
int fid_load; // id of a function (BYTE_COUNTER)
struct tstats stats; // stats
struct tstats *statsrelto; // stats (in relation to other hosts)
struct tstats *statsdirto; // stats (in direction to other hosts)
struct tstats *oldstatsrelto; // old "
struct tstats *oldstatsdirto; // old "
struct tstats *stats_dev; // stats (for interfaces/devices)
struct tstats stats; // stats
struct tstats *statsrelwith; // stats (in relation to other hosts)
struct tstats *statsdirto; // stats (in direction to other hosts)
struct tstats *statsdirfrom; // stats (in direction from other hosts)
struct tstats *oldstatsrelwith; // old "
struct tstats *oldstatsdirto; // old "
struct tstats *oldstatsdirfrom; // old "
struct tstats *stats_dev; // stats (for interfaces/devices)
};
struct thost *host; // hosts
int hosts = 0; // number of hosts (total)
struct tstats stats;
// Functions declaration
static void terminate();
void print_protocols(FILE * fp, char *protocol, unsigned long long total_pkts, unsigned long long total_lost);
......@@ -354,23 +358,41 @@ int main(int argc, char *argv[])
// Initialize host arrays {{{
for(h = 0; h < hosts; h++) {
if((host[h].statsrelto = (struct tstats *) malloc(malloc_size = (hosts * sizeof(struct tstats)))) == NULL) {
if((host[h].statsrelwith = (struct tstats *) malloc(malloc_size = (hosts * sizeof(struct tstats)))) == NULL) {
fprintf(stderr, "malloc(): could not allocate memory");
return -1;
}
memset(host[h].statsrelto, 0, malloc_size);
memset(host[h].statsrelwith, 0, malloc_size);
// we do not recognize flow direction yet :(
host[h].statsdirto = host[h].statsrelto;
if((host[h].statsdirto = (struct tstats *) malloc(malloc_size = (hosts * sizeof(struct tstats)))) == NULL) {
fprintf(stderr, "malloc(): could not allocate memory");
return -1;
}
memset(host[h].statsdirto, 0, malloc_size);
if((host[h].oldstatsrelto = (struct tstats *) malloc(malloc_size = (hosts * sizeof(struct tstats)))) == NULL) {
if((host[h].statsdirfrom = (struct tstats *) malloc(malloc_size = (hosts * sizeof(struct tstats)))) == NULL) {
fprintf(stderr, "malloc(): could not allocate memory");
return -1;
}
memset(host[h].oldstatsrelto, 0, malloc_size);
memset(host[h].statsdirfrom, 0, malloc_size);
// we do not recognize flow direction yet :(
host[h].oldstatsdirto = host[h].oldstatsrelto;
if((host[h].oldstatsrelwith = (struct tstats *) malloc(malloc_size = (hosts * sizeof(struct tstats)))) == NULL) {
fprintf(stderr, "malloc(): could not allocate memory");
return -1;
}
memset(host[h].oldstatsrelwith, 0, malloc_size);
if((host[h].oldstatsdirto = (struct tstats *) malloc(malloc_size = (hosts * sizeof(struct tstats)))) == NULL) {
fprintf(stderr, "malloc(): could not allocate memory");
return -1;
}
memset(host[h].oldstatsdirto, 0, malloc_size);
if((host[h].oldstatsdirfrom = (struct tstats *) malloc(malloc_size = (hosts * sizeof(struct tstats)))) == NULL) {
fprintf(stderr, "malloc(): could not allocate memory");
return -1;
}
memset(host[h].oldstatsdirfrom, 0, malloc_size);
if((host[h].mapi_stats_dev = (struct mapi_stat *) malloc(malloc_size = (host[h].devices * sizeof(struct mapi_stat)))) == NULL) {
fprintf(stderr, "malloc(): could not allocate memory");
......@@ -403,8 +425,8 @@ int main(int argc, char *argv[])
// Create flows at hosts, build and apply filters, connect {{{
for(h = 0; h < hosts; h++) {
fprintf(stdout, "mapi_create_flow(%s)\n", argv[h + 1]); fflush(stdout);
fprintf(stdout, "packetloss: mapi_create_flow(%s)\n", argv[h + 1]); fflush(stdout);
if ((host[h].fd = mapi_create_flow(argv[h + 1])) < 0) {
fprintf(stderr, "Could not create flow using '%s'\n", argv[h + 1]);
mapi_read_error(&err_no, error);
......@@ -426,7 +448,7 @@ int main(int argc, char *argv[])
// }
sprintf(filter, argv[argc - 1]);
fprintf(stdout, "packetloss: apply BPF_FILTER: \"%s\"\n", filter); fflush(stdout);
fprintf(stdout, "packetloss: apply BPF_FILTER: \"%s\" at %s\n", filter, host[h].name); fflush(stdout);
if ((host[h].fid_loss = mapi_apply_function(host[h].fd, "BPF_FILTER", filter)) < 0) {
fprintf(stderr, "Count not apply function BPF_FILTER to flow %d\n", host[h].fd);
......@@ -435,6 +457,8 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
fprintf(stdout, "packetloss: apply BYTE_COUNTER at %s\n", host[h].name); fflush(stdout);
if ((host[h].fid_load = mapi_apply_function(host[h].fd, "BYTE_COUNTER")) < 0) { // need for netload
fprintf(stderr, "Count not apply function BYTE_COUNTER to flow %d\n", host[h].fd);
mapi_read_error(&err_no, error);
......@@ -442,6 +466,8 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
fprintf(stdout, "packetloss: apply EXPIRED_FLOWS at %s\n", host[h].name); fflush(stdout);
if ((host[h].fid_loss = mapi_apply_function(host[h].fd, "EXPIRED_FLOWS")) < 0) {
fprintf(stderr, "Count not apply function EXPIRED_FLOWS to flow %d\n", host[h].fd);
mapi_read_error(&err_no, error);
......@@ -537,6 +563,7 @@ int main(int argc, char *argv[])
// Read host results from hosts {{{
for (h = 0; h < hosts; h++) {
fprintf(stdout, "packetloss: reading results from %s...\n", host[h].name); fflush(stdout);
host[h].results_dev = (mapi_results_t *) mapi_read_results(host[h].fd, host[h].fid_loss);
host[h].results_netload_dev = (mapi_results_t *) mapi_read_results(host[h].fd, host[h].fid_load);
}
......@@ -544,6 +571,7 @@ int main(int argc, char *argv[])
// Read host stats from hosts {{{
for(h = 0; h < hosts; h++) {
fprintf(stdout, "packetloss: reading mapi_stats from %s...\n", host[h].name); fflush(stdout);
if (mapi_stats(argv[h + 1], host[h].mapi_stats_dev) < 0) {
fprintf(stderr, "Error in mapi_stats using '%s'\n", argv[h + 1]);
mapi_read_error(&err_no, error);
......@@ -606,6 +634,7 @@ int main(int argc, char *argv[])
fflush(fp_log_lost);
}
stats.totalpk.loss += pkloss;
host[h].stats.totalpk.loss += pkloss;
// Loss by protocols {{{
......@@ -624,6 +653,7 @@ int main(int argc, char *argv[])
}
stats.totalpk.count += flowdata->packets_count;
host[h].stats.totalpk.count += flowdata->packets_count;
host[h].stats_dev[d].totalpk.count += flowdata->packets_count;
flowdata = flowdata + 1;
......@@ -652,7 +682,7 @@ int main(int argc, char *argv[])
else { // FIXME
for(h1 = 0; h1 < hosts; h1++) {
for(h2 = 0; h2 < hosts; h2++) {
if(h2 != h1) p += snprintf(p, 24, ":%f", (float) (h2 > h1 ? -1 : 1) * ((host[h1].statsrelto[h2].totalpk.loss - host[h1].oldstatsdirto[h2].totalpk.loss) * 100) / (float) (totalpk.count - old_totalpk.count));
if(h2 != h1) p += snprintf(p, 24, ":%f", (float) (h2 > h1 ? -1 : 1) * ((host[h1].statsdirto[h2].totalpk.loss - host[h1].oldstatsdirto[h2].totalpk.loss) * 100) / (float) (totalpk.count - old_totalpk.count));
}
}
}
......@@ -670,7 +700,7 @@ int main(int argc, char *argv[])
else { // FIXME
for(h1 = 0; h1 < hosts; h1++) {
for(h2 = 0; h2 < hosts; h2++) {
if(h2 != h1) p2 += snprintf(p2, 24, ":%f", (float) (h2 > h1 ? -1 : 1) * (host[h1].statsrelto[h2].totalpk.loss - host[h1].oldstatsdirto[h2].totalpk.loss));
if(h2 != h1) p2 += snprintf(p2, 24, ":%f", (float) (h2 > h1 ? -1 : 1) * (host[h1].statsdirto[h2].totalpk.loss - host[h1].oldstatsdirto[h2].totalpk.loss));
}
}
}
......@@ -783,7 +813,9 @@ int main(int argc, char *argv[])
old_totalpk.loss = totalpk.loss;
for(h1 = 0; h1 < hosts; h1++) {
for(h2 = 0; h2 < hosts; h2++) {
if(h2 != h1) host[h1].oldstatsdirto[h2].totalpk.loss = host[h1].statsrelto[h2].totalpk.loss;
if(h2 != h1) host[h1].oldstatsrelwith[h2].totalpk.loss = host[h1].statsrelwith[h2].totalpk.loss;
if(h2 != h1) host[h1].oldstatsdirto[h2].totalpk.loss = host[h1].statsdirto[h2].totalpk.loss;
if(h2 != h1) host[h1].oldstatsdirfrom[h2].totalpk.loss = host[h1].statsdirfrom[h2].totalpk.loss;
}
}
// }}}
......@@ -849,22 +881,38 @@ int main(int argc, char *argv[])
fprintf(fp, "top;plr1m;Last 1 Month Packet Loss Ratio;%.2f %% (%llu pkts)\n", (float) (onemonth.pkcount ? (onemonth.pkloss * 100) / (float) onemonth.pkcount : onemonth.pkcount), onemonth.pkloss);
fprintf(fp, "top;plr1y;Last 1 Year Packet Loss Ratio;%.2f %% (%llu pkts)\n", (float) (oneyear.pkcount ? (oneyear.pkloss * 100) / (float) oneyear.pkcount : oneyear.pkcount), oneyear.pkloss);
//TODO fprintf(fp, "top;blr1h;Last Hour Byte Loss Ratio;%.2f %% (%.2f Kbytes);\n", (float) (totalbytes.lost * 100) / (float) totalbytes.count, (float) (totalbytes.lost / (float) 1024));
fprintf(fp, "\n");
fprintf(fp, "top;tfc;Total flows captured;%u\n", stats.totalflows.count);
fprintf(fp, "top;tfm;Total flows matched;%u (%.2f %%)\n", stats.totalflows.matched, (float) (stats.totalflows.count ? (float) stats.totalflows.matched * 100 / (float) stats.totalflows.count : 0));
fprintf(fp, "top;tfu;Total flows unmatched;%u\n", stats.totalflows.unmatched);
fprintf(fp, "top;tft;Total flows timeouted;%u\n", stats.totalflows.timeouted);
fprintf(fp, "top;tpc;Total packets captured;%llu\n", stats.totalpk.count);
fprintf(fp, "top;tpl;Total packets lost;%llu (%.2f %%)\n", stats.totalpk.loss, (float) (stats.totalpk.count ? (float) stats.totalpk.loss * 100 / (float) stats.totalpk.count : 0));
fprintf(fp, "\n");
for(h1 = 0; h1 < hosts; h1++) {
fprintf(fp, "top;tfch%d;Total flows captured at %s;%u\n", h1, host[h1].name, host[h1].stats.totalflows.count);
fprintf(fp, "top;tfmh%d;Total flows matched at %s;%u\n", h1, host[h1].name, host[h1].stats.totalflows.matched);
fprintf(fp, "top;tfch%d;Total flows captured at %s;%u (%.2f %%)\n", h1, host[h1].name, host[h1].stats.totalflows.count, (float) (stats.totalflows.count ? (float) host[h1].stats.totalflows.count * 100 / (float) stats.totalflows.count : 0));
fprintf(fp, "top;tfmh%d;Total flows matched at %s;%u (%.2f %%)\n", h1, host[h1].name, host[h1].stats.totalflows.matched, (float) (host[h1].stats.totalflows.count ? (float) host[h1].stats.totalflows.matched * 100 / (float) host[h1].stats.totalflows.count : 0));
fprintf(fp, "top;tfuh%d;Total flows unmatched at %s;%u\n", h1, host[h1].name, host[h1].stats.totalflows.unmatched);
fprintf(fp, "top;tfth%d;Total flows timeouted at %s;%u\n", h1, host[h1].name, host[h1].stats.totalflows.timeouted);
fprintf(fp, "top;tpch%d;Total packets captured at %s;%llu\n", h1, host[h1].name, host[h1].stats.totalpk.count);
for(d = 0; d < host[h1].devices; d++) {
fprintf(fp, "top;tpch%dd%d;Total packets captured at %s:%s;%llu\n", h1, d, host[h1].name, host[h1].mapi_stats_dev[d].dev, host[h1].stats_dev[d].totalpk.count);
}
fprintf(fp, "\n");
for(h2 = 0; h2 < hosts; h2++) {
if(h2 != h1) {
fprintf(fp, "top;tfmrh%dh%d;Total flows matched at %s in relation to %s;%u\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsrelto[h2].totalflows.matched);
fprintf(fp, "top;tftrh%dh%d;Total flows timeouted at %s in relation to %s;%u\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsrelto[h2].totalflows.timeouted);
fprintf(fp, "top;tpmrh%dh%d;Total packets matched at %s in relation to %s;%llu\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsrelto[h2].totalpk.matched);
fprintf(fp, "top;tplrh%dh%d;Total packets lost at %s in relation to %s;%llu (%.2f %%)\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsrelto[h2].totalpk.loss, (float)(totalpk.count ? (host[h1].statsrelto[h2].totalpk.loss * 100) / (float) totalpk.count: totalpk.count));
fprintf(fp, "top;tfmrwh%dh%d;Total flows matched at %s in relation with %s;%u (%.2f %%)\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsrelwith[h2].totalflows.matched, (float) (host[h1].stats.totalflows.matched ? (float) host[h1].statsrelwith[h2].totalflows.matched * 100 / (float) host[h1].stats.totalflows.matched : 0));
fprintf(fp, "top;tfmdth%dh%d;Total flows matched at %s in direction to %s;%u (%.2f %%)\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsdirto[h2].totalflows.matched, (float) (host[h1].stats.totalflows.matched ? (float) host[h1].statsdirto[h2].totalflows.matched * 100 / (float) host[h1].stats.totalflows.matched : 0));
fprintf(fp, "top;tfmdfh%dh%d;Total flows matched at %s in direction from %s;%u (%.2f %%)\n", h2, h1, host[h2].name, host[h1].name, host[h2].statsdirfrom[h1].totalflows.matched, (float) (host[h2].stats.totalflows.matched ? (float) host[h2].statsdirfrom[h1].totalflows.matched * 100 / (float) host[h2].stats.totalflows.matched : 0));
fprintf(fp, "top;tftrwh%dh%d;Total flows timeouted at %s in relation with %s;%u\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsrelwith[h2].totalflows.timeouted);
fprintf(fp, "top;tftdth%dh%d;Total flows timeouted at %s in direction to %s;%u\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsdirto[h2].totalflows.timeouted);
fprintf(fp, "top;tftdfh%dh%d;Total flows timeouted at %s in direction from %s;%u\n", h2, h1, host[h2].name, host[h1].name, host[h2].statsdirfrom[h1].totalflows.timeouted);
fprintf(fp, "top;tpmrwh%dh%d;Total packets matched at %s in relation with %s;%llu\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsrelwith[h2].totalpk.matched);
fprintf(fp, "top;tpmdth%dh%d;Total packets matched at %s in direction to %s;%llu\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsdirto[h2].totalpk.matched);
fprintf(fp, "top;tpmdfh%dh%d;Total packets matched at %s in direction from %s;%llu\n", h2, h1, host[h2].name, host[h1].name, host[h2].statsdirfrom[h1].totalpk.matched);
fprintf(fp, "top;tplrwh%dh%d;Total packets lost at %s in relation with %s;%llu (%.2f %%)\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsrelwith[h2].totalpk.loss, (float) (totalpk.count ? (float) host[h1].statsrelwith[h2].totalpk.loss * 100 / (float) totalpk.count : 0));
fprintf(fp, "top;tpldth%dh%d;Total packets lost at %s in direction to %s;%llu (%.2f %%)\n", h1, h2, host[h1].name, host[h2].name, host[h1].statsdirto[h2].totalpk.loss, (float) (totalpk.count ? (float) host[h1].statsdirto[h2].totalpk.loss * 100 / (float) totalpk.count : 0));
fprintf(fp, "top;tpldfh%dh%d;Total packets lost at %s in direction from %s;%llu (%.2f %%)\n", h2, h1, host[h2].name, host[h1].name, host[h2].statsdirfrom[h1].totalpk.loss, (float) (totalpk.count ? (float) host[h2].statsdirfrom[h1].totalpk.loss * 100 / (float) totalpk.count : 0));
fprintf(fp, "\n");
}
}
......@@ -1079,9 +1127,10 @@ void drop_old_and_unassigned_flows(void)
next = tmp->next;
if ((current_ts - tmp->data.timestamp > ((unsigned long long) (TIMEOUT)) << 32)) {
// if ((current_ts - tmp->data.timestamp > ((unsigned long long) (TIMEOUT)) << 32) || !tmp->assigned) {
stats.totalflows.timeouted++;
host[tmp->sensor].stats.totalflows.timeouted++;
if(tmp->assigned) {
host[tmp->sensor].stats.totalflows.timeouted++;
host[tmp->sensor].statsrelto[tmp->sensor2].totalflows.timeouted++;
host[tmp->sensor].statsrelwith[tmp->sensor2].totalflows.timeouted++;
}
if (LOGFILE) {
......@@ -1142,14 +1191,18 @@ struct hash_bucket *compare_bucket(struct hash_bucket *bucket, struct flow_data
unsigned long long cmp_results(struct flow_data *data, unsigned int sensor)
{
struct hash_bucket *prev, *flowathost[3];
int currenthost = 0;
int otherhost = 1;
int looser = 0;
int winner = 1;
int loosersensor = 0;
int winnersensor = 1;
int tmphost = 3;
struct hash_bucket *prev, *flow[3];
unsigned int currenthostflow = 0;
unsigned int otherhostflow = 1;
unsigned int tmpflow = 2;
unsigned int looserflow = 0;
unsigned int winnerflow = 1;
unsigned int senderflow = 0;
unsigned int receiverflow = 0;
unsigned int loosersensor = 0;
unsigned int winnersensor = 1;
unsigned int sendersensor = 0;
unsigned int receiversensor = 1;
unsigned int hsvalue = 0, pos, pkloss = 0;
ip_addr saddr = data->saddr, daddr = data->daddr;
u_short sport = data->sport, dport = data->dport;
......@@ -1162,81 +1215,102 @@ unsigned long long cmp_results(struct flow_data *data, unsigned int sensor)
pos = hsvalue % HASHTABLE_SIZE;
// Flow doesnt exist? (it could be already removed after processing it at the other host)
if ((flowathost[otherhost] = compare_bucket(hashtable[pos], data, sensor, 1)) == NULL ||
(flowathost[currenthost] = compare_bucket(hashtable[pos], data, sensor, 0)) == NULL) {
if ((flow[otherhostflow] = compare_bucket(hashtable[pos], data, sensor, 1)) == NULL ||
(flow[currenthostflow] = compare_bucket(hashtable[pos], data, sensor, 0)) == NULL) {
return (0);
}
else {
if(!flowathost[currenthost]->assigned || !flowathost[otherhost]->assigned) {
fprintf(stdout, "packetloss: WARNING flow NOT ASSIGNED (all values ignored)! (currenthost/otherhost: %p/%p, assign: %d/%d)\n", flowathost[currenthost], flowathost[otherhost], flowathost[currenthost]->assigned, flowathost[otherhost]->assigned); fflush(stdout);
if(!flow[currenthostflow]->assigned || !flow[otherhostflow]->assigned) {
fprintf(stdout, "packetloss: WARNING flow NOT ASSIGNED (all values ignored)! (currenthostflow/otherhostflow: %p/%p, assign: %d/%d)\n", flow[currenthostflow], flow[otherhostflow], flow[currenthostflow]->assigned, flow[otherhostflow]->assigned); fflush(stdout);
}
else {
if(flowathost[currenthost]->sensor != sensor) {fprintf(stderr, "packetloss: ERROR sensor!=sensor\n"); exit(EXIT_FAILURE); }
if(flowathost[currenthost]->sensor2 == sensor) {fprintf(stderr, "packetloss: ERROR sensor2==sensor\n"); exit(EXIT_FAILURE); }
matchedsensor = flowathost[currenthost]->sensor2;
host[sensor].statsrelto[matchedsensor].totalpk.matched += flowathost[currenthost]->data.packets_count;
host[matchedsensor].statsrelto[sensor].totalpk.matched += flowathost[otherhost]->data.packets_count;
matchedsensor = flow[currenthostflow]->sensor2;
// set the-host-which-looses as "looser"
looser = (flowathost[0]->data.packets_count > flowathost[1]->data.packets_count);
winner = !looser;
if(flow[currenthostflow]->data.ttl_pkt1 <= flow[otherhostflow]->data.ttl_pkt1) {
sendersensor = sensor;
receiversensor = matchedsensor;
senderflow = currenthostflow;
receiverflow = otherhostflow;
}
else {
sendersensor = matchedsensor;
receiversensor = sensor;
senderflow = otherhostflow;
receiverflow = currenthostflow;
}
host[sensor].statsrelwith[matchedsensor].totalpk.matched += flow[currenthostflow]->data.packets_count;
host[matchedsensor].statsrelwith[sensor].totalpk.matched += flow[otherhostflow]->data.packets_count;
host[sendersensor].statsdirto[receiversensor].totalpk.matched += flow[senderflow]->data.packets_count;
host[receiversensor].statsdirfrom[sendersensor].totalpk.matched += flow[receiverflow]->data.packets_count;
loosersensor = flowathost[looser]->sensor;
winnersensor = flowathost[winner]->sensor;
// set the index of the flow at host which looses as "looserflow"
looserflow = (flow[0]->data.packets_count > flow[1]->data.packets_count);
winnerflow = !looserflow;
totalpk.count += flowathost[winner]->data.packets_count; // total number of packets
totalbytes.count += flowathost[winner]->data.bytes_count; // total number of bytes
if(flowathost[winner]->data.packets_count != flowathost[looser]->data.packets_count) { // dont be pedantic -- different adapters, different bytes_counts...
loosersensor = flow[looserflow]->sensor;
winnersensor = flow[winnerflow]->sensor;
totalpk.count += flow[winnerflow]->data.packets_count; // total number of packets
totalbytes.count += flow[winnerflow]->data.bytes_count; // total number of bytes
if(flow[winnerflow]->data.packets_count != flow[looserflow]->data.packets_count) { // dont be pedantic -- different adapters, different bytes_counts...
totalbytes.lost += flowathost[winner]->data.bytes_count - flowathost[looser]->data.bytes_count; // total byte loss
totalbytes.lost += flow[winnerflow]->data.bytes_count - flow[looserflow]->data.bytes_count; // total byte loss
}
pkloss = flowathost[winner]->data.packets_count - flowathost[looser]->data.packets_count; // packet loss
pkloss = flow[winnerflow]->data.packets_count - flow[looserflow]->data.packets_count; // packet loss
// store stats -- now or never
host[loosersensor].statsrelto[winnersensor].totalpk.loss += pkloss;
if (sport == 2233 || dport == 2233) mapi.pkts += flowathost[winner]->data.packets_count;
else if (sport == 1194 || dport == 1194) openvpn.pkts += flowathost[winner]->data.packets_count;
else if (sport == 554 || dport == 554) rtsp.pkts += flowathost[winner]->data.packets_count;
else if (sport == 443 || dport == 443) https.pkts += flowathost[winner]->data.packets_count;
else if (sport == 115 || dport == 115) sftp.pkts += flowathost[winner]->data.packets_count;
else if (sport == 80 || dport == 80) http.pkts += flowathost[winner]->data.packets_count;
else if (sport == 53 || dport == 53) dns.pkts += flowathost[winner]->data.packets_count;
else if (sport == 25 || dport == 25) smtp.pkts += flowathost[winner]->data.packets_count;
else if (sport == 23 || dport == 23) telnet.pkts += flowathost[winner]->data.packets_count;
else if (sport == 22 || dport == 22) ssh.pkts += flowathost[winner]->data.packets_count;
else if (sport == 21 || dport == 21) ftp.pkts += flowathost[winner]->data.packets_count;
host[loosersensor].statsrelwith[winnersensor].totalpk.loss += pkloss;
if(loosersensor == sendersensor) {
host[loosersensor].statsdirto[winnersensor].totalpk.loss += pkloss;
}
else {
host[loosersensor].statsdirfrom[winnersensor].totalpk.loss += pkloss;
//host[winnersensor].statsdirfrom[loosersensor].totalpk.loss += pkloss;
}
if (sport == 2233 || dport == 2233) mapi.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 1194 || dport == 1194) openvpn.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 554 || dport == 554) rtsp.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 443 || dport == 443) https.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 115 || dport == 115) sftp.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 80 || dport == 80) http.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 53 || dport == 53) dns.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 25 || dport == 25) smtp.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 23 || dport == 23) telnet.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 22 || dport == 22) ssh.pkts += flow[winnerflow]->data.packets_count;
else if (sport == 21 || dport == 21) ftp.pkts += flow[winnerflow]->data.packets_count;
}
}
// Remove expired flows from hash table that were matched
prev = hashtable[pos]; // remove node first
if (prev == flowathost[otherhost]) { // if flowathost[otherhost] == head
if (prev == flow[otherhostflow]) { // if flow[otherhostflow] == head
hashtable[pos] = hashtable[pos]->next;
free(flowathost[otherhost]);
free(flow[otherhostflow]);
} else {
while (prev->next != flowathost[otherhost]) // find flowathost[otherhost]
while (prev->next != flow[otherhostflow]) // find flow[otherhostflow]
prev = prev->next;
prev->next = flowathost[otherhost]->next;
free(flowathost[otherhost]);
prev->next = flow[otherhostflow]->next;
free(flow[otherhostflow]);
}
if ((flowathost[tmphost] = compare_bucket(hashtable[pos], data, sensor, 0)) == NULL) {
if ((flow[tmpflow] = compare_bucket(hashtable[pos], data, sensor, 0)) == NULL) {
fprintf(stderr, "Error, expired flow not found in hashtable[%d]!\n", pos);
fprintf(stderr, "flow[%d] : saddr = %d.%d.%d.%d, daddr = %d.%d.%d.%d, sport = %d, dport = %d, packets = %lld\n", sensor, flowathost[currenthost]->data.saddr.byte1, flowathost[currenthost]->data.saddr.byte2, flowathost[currenthost]->data.saddr.byte3, flowathost[currenthost]->data.saddr.byte4, flowathost[currenthost]->data.daddr.byte1, flowathost[currenthost]->data.daddr.byte2, flowathost[currenthost]->data.daddr.byte3, flowathost[currenthost]->data.daddr.byte4, flowathost[currenthost]->data.sport, flowathost[currenthost]->data.dport, flowathost[currenthost]->data.packets_count);
fprintf(stderr, "flow[%d] : saddr = %d.%d.%d.%d, daddr = %d.%d.%d.%d, sport = %d, dport = %d, packets = %lld\n", sensor, flow[currenthostflow]->data.saddr.byte1, flow[currenthostflow]->data.saddr.byte2, flow[currenthostflow]->data.saddr.byte3, flow[currenthostflow]->data.saddr.byte4, flow[currenthostflow]->data.daddr.byte1, flow[currenthostflow]->data.daddr.byte2, flow[currenthostflow]->data.daddr.byte3, flow[currenthostflow]->data.daddr.byte4, flow[currenthostflow]->data.sport, flow[currenthostflow]->data.dport, flow[currenthostflow]->data.packets_count);
}
else {
prev = hashtable[pos]; // remove node first
if (prev == flowathost[currenthost]) { // if flowathost[currenthost] == head
if (prev == flow[currenthostflow]) { // if flow[currenthostflow] == head
hashtable[pos] = hashtable[pos]->next;
free(flowathost[currenthost]);
free(flow[currenthostflow]);
} else {
while (prev->next != flowathost[currenthost]) // find flowathost[currenthost]
while (prev->next != flow[currenthostflow]) // find flow[currenthostflow]
prev = prev->next;
prev->next = flowathost[currenthost]->next;
free(flowathost[currenthost]);
prev->next = flow[currenthostflow]->next;
free(flow[currenthostflow]);
}
}
......@@ -1286,26 +1360,6 @@ void store_results(unsigned int sensor, unsigned int count, struct flow_data *fl
tmp->data.timestamp = current_ts;
}
// if(!tmp->assigned) {
// // Flow exist at the other host?
// if ((tmp2 = compare_bucket(hashtable[pos], flowdata, sensor, 1)) != NULL) {
// if(tmp2->assigned) { // the other flow already assigned
// if(tmp2->sensor2 != sensor) {
// fprintf(stderr, "packetloss: ERROR flow assigned to ANOTHER host!!! (%d->%d, not %d)\n", tmp2->sensor, tmp2->sensor2, sensor);
// }
// }
// else { // assign the other flow
// tmp2->sensor2 = tmp->sensor; // with tmp->sensor
// tmp2->assigned = 1;
// host[tmp2->sensor].stats.totalflows.matched++;
// host[tmp2->sensor].statsrelto[tmp->sensor].totalflows.matched++;
// }
// tmp->assigned = 1; // assign the existing flow
// tmp->sensor2 = tmp2->sensor; // with sensor2
// host[tmp->sensor].stats.totalflows.matched++;
// host[tmp->sensor].statsrelto[tmp2->sensor].totalflows.matched++;
// }
// }
}
else {
assigned = 0;
......@@ -1319,9 +1373,19 @@ void store_results(unsigned int sensor, unsigned int count, struct flow_data *fl
else { // assign the other flow
tmp->sensor2 = sensor; // with sensor
tmp->assigned = 1;
stats.totalflows.unmatched--;
stats.totalflows.matched++;
host[tmp->sensor].stats.totalflows.unmatched--;
host[tmp->sensor].stats.totalflows.matched++;
host[tmp->sensor].statsrelto[sensor].totalflows.matched++;
host[tmp->sensor].statsrelwith[sensor].totalflows.matched++;
if(flowdata->ttl_pkt1 < tmp->data.ttl_pkt1) {
host[tmp->sensor].statsdirfrom[sensor].totalflows.matched++;
//host[sensor].statsdirto[tmp->sensor].totalflows.matched++;
}
else {
host[tmp->sensor].statsdirto[sensor].totalflows.matched++;
//host[sensor].statsdirfrom[tmp->sensor].totalflows.matched++;
}
}
assigned = 1; // shall assign the new flow soon
sensor2 = tmp->sensor; // with sensor2
......@@ -1346,12 +1410,23 @@ void store_results(unsigned int sensor, unsigned int count, struct flow_data *fl
hashtable[pos] = tmp;
if(assigned) {
stats.totalflows.matched++;
host[sensor].stats.totalflows.matched++;
host[sensor].statsrelto[sensor2].totalflows.matched++;
host[sensor].statsrelwith[sensor2].totalflows.matched++;
if(flowdata->ttl_pkt1 < tmp->data.ttl_pkt1) {
host[sensor].statsdirto[sensor2].totalflows.matched++;
//host[sensor2].statsdirfrom[sensor].totalflows.matched++;
}
else {
host[sensor].statsdirfrom[sensor2].totalflows.matched++;
//host[sensor2].statsdirto[sensor].totalflows.matched++;
}
}
else {
stats.totalflows.unmatched++;
host[sensor].stats.totalflows.unmatched++;
}
stats.totalflows.count++;
host[sensor].stats.totalflows.count++;
}
flowdata++;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment