diff options
-rw-r--r-- | src/include/gnunet_protocols.h | 5 | ||||
-rw-r--r-- | src/include/gnunet_sensor_util_lib.h | 38 | ||||
-rw-r--r-- | src/sensor/gnunet-service-sensor_analysis.c | 12 | ||||
-rw-r--r-- | src/sensor/gnunet-service-sensor_reporting_anomaly.c | 426 | ||||
-rw-r--r-- | src/sensor/sensor.h | 15 |
5 files changed, 383 insertions, 113 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index ea295197c9..246f7f836a 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2447,6 +2447,11 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_SENSOR_FULL 808 +/** + * Sensor anomaly report + */ +#define GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT 809 + /******************************************************************************* * PEERSTORE message types diff --git a/src/include/gnunet_sensor_util_lib.h b/src/include/gnunet_sensor_util_lib.h index b497d05d61..3ff6097c70 100644 --- a/src/include/gnunet_sensor_util_lib.h +++ b/src/include/gnunet_sensor_util_lib.h @@ -297,6 +297,44 @@ struct GNUNET_SENSOR_SensorFullMessage }; +/** + * Message carrying an anomaly status change report + */ +struct AnomalyReportMessage +{ + + /** + * Message header + */ + struct GNUNET_MessageHeader header; + + /** + * Hash of sensor name + */ + struct GNUNET_HashCode sensorname_hash; + + /** + * First part of sensor version number + */ + uint16_t sensorversion_major; + + /** + * Second part of sensor version name + */ + uint16_t sensorversion_minor; + + /** + * New anomaly status + */ + uint8_t anomalous; + + /** + * Percentage of neighbors reported the same anomaly + */ + float anomalous_neighbors; + +}; + GNUNET_NETWORK_STRUCT_END /** diff --git a/src/sensor/gnunet-service-sensor_analysis.c b/src/sensor/gnunet-service-sensor_analysis.c index 46422d0396..63249a4e2a 100644 --- a/src/sensor/gnunet-service-sensor_analysis.c +++ b/src/sensor/gnunet-service-sensor_analysis.c @@ -209,11 +209,7 @@ sensor_watcher (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg) model->anomalous = GNUNET_YES; LOG (GNUNET_ERROR_TYPE_WARNING, "Anomaly state started for sensor `%s'.\n", model->sensor->name); - GNUNET_PEERSTORE_store (peerstore, "senosr-analysis", &peerid, - model->sensor->name, &model->anomalous, - sizeof (model->anomalous), - GNUNET_TIME_absolute_get (), - GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL); + SENSOR_reporting_anomaly_update (model->sensor, model->anomalous); } } else @@ -226,11 +222,7 @@ sensor_watcher (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg) model->anomalous = GNUNET_NO; LOG (GNUNET_ERROR_TYPE_INFO, "Anomaly state stopped for sensor `%s'.\n", model->sensor->name); - GNUNET_PEERSTORE_store (peerstore, "senosr-analysis", &peerid, - model->sensor->name, &model->anomalous, - sizeof (model->anomalous), - GNUNET_TIME_absolute_get (), - GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL); + SENSOR_reporting_anomaly_update (model->sensor, model->anomalous); } } return GNUNET_YES; diff --git a/src/sensor/gnunet-service-sensor_reporting_anomaly.c b/src/sensor/gnunet-service-sensor_reporting_anomaly.c index 9a7a4eeb05..ff07c2e523 100644 --- a/src/sensor/gnunet-service-sensor_reporting_anomaly.c +++ b/src/sensor/gnunet-service-sensor_reporting_anomaly.c @@ -31,47 +31,64 @@ #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting-anomaly",__VA_ARGS__) -struct AnomalyReportingContext +struct AnomalyInfo { /** * DLL */ - struct AnomalyReportingContext *prev; + struct AnomalyInfo *prev; /** * DLL */ - struct AnomalyReportingContext *next; + struct AnomalyInfo *next; /** * Sensor information */ struct GNUNET_SENSOR_SensorInfo *sensor; + /** + * Current anomalous status of sensor + */ + int anomalous; + + /** + * List of peers that reported an anomaly for this sensor + */ + struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors; + }; /** - * Context of a connection to a peer through CORE + * Information about a connected CORE peer. + * Note that we only know about a connected peer if it is running the same + * application (sensor anomaly reporting) as us. */ -struct CorePeerContext +struct CorePeer { /** * DLL */ - struct CorePeerContext *prev; + struct CorePeer *prev; /** * DLL */ - struct CorePeerContext *next; + struct CorePeer *next; /** * Peer identity of connected peer */ struct GNUNET_PeerIdentity *peerid; + /** + * Message queue for messages to be sent to this peer + */ + struct GNUNET_MQ_Handle *mq; + }; @@ -81,6 +98,11 @@ struct CorePeerContext static const struct GNUNET_CONFIGURATION_Handle *cfg; /** + * Multihashmap of loaded sensors + */ +static struct GNUNET_CONTAINER_MultiHashMap *sensors; + +/** * Handle to core service */ static struct GNUNET_CORE_Handle *core; @@ -91,35 +113,74 @@ static struct GNUNET_CORE_Handle *core; static struct GNUNET_PeerIdentity mypeerid; /** - * Head of DLL of anomaly reporting contexts + * Head of DLL of anomaly info structs */ -static struct AnomalyReportingContext *arc_head; +static struct AnomalyInfo *ai_head; /** - * Tail of DLL of anomaly reporting contexts + * Tail of DLL of anomaly info structs */ -static struct AnomalyReportingContext *arc_tail; +static struct AnomalyInfo *ai_tail; /** - * Head of DLL of CORE peer contexts + * Head of DLL of CORE peers */ -static struct CorePeerContext *cp_head; +static struct CorePeer *cp_head; /** - * Tail of DLL of CORE peer contexts + * Tail of DLL of CORE peers + */ +static struct CorePeer *cp_tail; + +/** + * Is the module started? + */ +static int module_running = GNUNET_NO; + +/** + * Number of known neighborhood peers + */ +static int neighborhood; + + +/** + * Destroy anomaly info struct + * + * @param ai struct to destroy */ -static struct CorePeerContext *cp_tail; +static void +destroy_anomaly_info (struct AnomalyInfo *ai) +{ + if (NULL != ai->anomalous_neighbors) + GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors); + GNUNET_free (ai); +} /** - * Destroy anomaly reporting context struct + * Destroy core peer struct * - * @param arc struct to destroy + * @param cp struct to destroy */ static void -destroy_anomaly_reporting_context (struct AnomalyReportingContext *arc) +destroy_core_peer (struct CorePeer *cp) { - GNUNET_free (arc); + struct AnomalyInfo *ai; + + if (NULL != cp->mq) + { + GNUNET_MQ_destroy (cp->mq); + cp->mq = NULL; + } + ai = ai_head; + while (NULL != ai) + { + GNUNET_assert (NULL != ai->anomalous_neighbors); + GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, + cp->peerid); + ai = ai->next; + } + GNUNET_free (cp); } @@ -129,98 +190,150 @@ destroy_anomaly_reporting_context (struct AnomalyReportingContext *arc) void SENSOR_reporting_anomaly_stop () { - struct AnomalyReportingContext *arc; + struct AnomalyInfo *ai; + struct CorePeer *cp; LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n"); - //TODO: destroy core peer contexts - //TODO: destroy core connection - arc = arc_head; - while (NULL != arc) + module_running = GNUNET_NO; + ai = ai_head; + while (NULL != ai) + { + GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai); + destroy_anomaly_info (ai); + ai = ai_head; + } + cp = cp_head; + while (NULL != cp) { - GNUNET_CONTAINER_DLL_remove (arc_head, arc_tail, arc); - destroy_anomaly_reporting_context (arc); - arc = arc_head; + GNUNET_CONTAINER_DLL_remove (cp_head, cp_tail, cp); + destroy_core_peer (cp); + cp = cp_head; + } + neighborhood = 0; + if (NULL != core) + { + GNUNET_CORE_disconnect (core); + core = NULL; } } /** - * Iterator for defined sensors - * Watches sensors for anomaly status change to report + * Gets the anomaly info struct related to the given sensor * - * @param cls unused - * @param key unused - * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information - * @return #GNUNET_YES to continue iterations + * @param sensor Sensor to search by */ -static int -init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key, - void *value) +static struct AnomalyInfo * +get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor) { - struct GNUNET_SENSOR_SensorInfo *sensor = value; - struct AnomalyReportingContext *arc; - - if (NULL == sensor->collection_point) - return GNUNET_YES; - LOG (GNUNET_ERROR_TYPE_INFO, - "Reporting sensor `%s' anomalies to collection point `%s'.\n", - sensor->name, GNUNET_i2s_full (sensor->collection_point)); - arc = GNUNET_new (struct AnomalyReportingContext); - arc->sensor = sensor; - GNUNET_CONTAINER_DLL_insert (arc_head, arc_tail, arc); - //TODO - return GNUNET_YES; + struct AnomalyInfo *ai; + + ai = ai_head; + while (NULL != ai) + { + if (ai->sensor == sensor) + { + return ai; + } + ai = ai->next; + } + return NULL; } /** - * Function called after #GNUNET_CORE_connect has succeeded (or failed - * for good). Note that the private key of the peer is intentionally - * not exposed here; if you need it, your process should try to read - * the private key file directly (which should work if you are - * authorized...). Implementations of this function must not call - * #GNUNET_CORE_disconnect (other than by scheduling a new task to - * do this later). + * Create an anomaly report message from a given anomaly info structb inside an + * MQ envelope. * - * @param cls closure (unused) - * @param my_identity ID of this peer, NULL if we failed + * @param ai Anomaly info struct to use + * @return + */ +static struct GNUNET_MQ_Envelope * +create_anomaly_report_message (struct AnomalyInfo *ai) +{ + struct AnomalyReportMessage *arm; + struct GNUNET_MQ_Envelope *ev; + + ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT); + GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1, + &arm->sensorname_hash); + arm->sensorversion_major = ai->sensor->version_major; + arm->sensorversion_minor = ai->sensor->version_minor; + arm->anomalous = ai->anomalous; + arm->anomalous_neighbors = + ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) / + neighborhood; + return ev; +} + + +/** + * Send given anomaly info report to given core peer. + * + * @param cp Core peer to send the report to + * @param ai Anomaly info to report */ static void -core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) +send_anomaly_report (struct CorePeer *cp, struct AnomalyInfo *ai) { - if (NULL == my_identity) - { - LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n")); - SENSOR_reporting_anomaly_stop (); - return; - } - if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _("Peer identity received from CORE doesn't match ours.\n")); - SENSOR_reporting_anomaly_stop (); - return; - } + struct GNUNET_MQ_Envelope *ev; + + GNUNET_assert (NULL != cp->mq); + ev = create_anomaly_report_message (ai); + GNUNET_MQ_send (cp->mq, ev); } /** - * Method called whenever a given peer connects through CORE. + * An inbound anomaly report is received from a peer through CORE. * * @param cls closure (unused) - * @param peer peer identity this notification is about + * @param peer the other peer involved + * @param message the actual message + * @return #GNUNET_OK to keep the connection open, + * #GNUNET_SYSERR to close connection to the peer (signal serious error) */ -static void -core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) +static int +handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message) { - struct CorePeerContext *cp; + struct AnomalyReportMessage *arm; + struct GNUNET_SENSOR_SensorInfo *sensor; + struct AnomalyInfo *ai; + int peer_in_list; - if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer)) - return; - cp = GNUNET_new (struct CorePeerContext); - cp->peerid = (struct GNUNET_PeerIdentity *)peer; - GNUNET_CONTAINER_DLL_insert (cp_head, cp_tail, cp); - //TODO: report to peer your anomaly status + arm = (struct AnomalyReportMessage *) message; + sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash); + if (NULL == sensor || sensor->version_major != arm->sensorversion_major || + sensor->version_minor != arm->sensorversion_minor) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "I don't have the sensor reported by the peer `%s'.\n", + GNUNET_i2s (other)); + return GNUNET_OK; + } + ai = get_anomaly_info_by_sensor (sensor); + GNUNET_assert (NULL != ai); + peer_in_list = + GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other); + if (GNUNET_YES == ai->anomalous) + { + if (GNUNET_YES == peer_in_list) + GNUNET_break_op (0); + else + GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + else + { + if (GNUNET_NO == peer_in_list) + GNUNET_break_op (0); + else + GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other); + } + //TODO: report to collection point if anomalous neigbors jump up or down + // by a configurable percentage or is now 0% or 100% + return GNUNET_OK; } @@ -233,19 +346,21 @@ core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) static void core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) { - struct CorePeerContext *cp; + struct CorePeer *cp; if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer)) return; + neighborhood--; cp = cp_head; while (NULL != cp) { if (peer == cp->peerid) { GNUNET_CONTAINER_DLL_remove (cp_head, cp_tail, cp); - //TODO: call peer context destroy function + destroy_core_peer (cp); return; } + cp = cp->next; } LOG (GNUNET_ERROR_TYPE_ERROR, _("Received disconnect notification from CORE" @@ -254,21 +369,119 @@ core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) /** - * An inbound message is received from a peer through CORE. + * Method called whenever a given peer connects through CORE. * * @param cls closure (unused) - * @param peer the other peer involved - * @param message the actual message - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close connection to the peer (signal serious error) + * @param peer peer identity this notification is about + */ +static void +core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) +{ + struct CorePeer *cp; + struct AnomalyInfo *ai; + + if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer)) + return; + neighborhood++; + cp = GNUNET_new (struct CorePeer); + cp->peerid = (struct GNUNET_PeerIdentity *) peer; + cp->mq = GNUNET_CORE_mq_create (core, peer); + GNUNET_CONTAINER_DLL_insert (cp_head, cp_tail, cp); + /* Send any locally anomalous sensors to the new peer */ + ai = ai_head; + while (NULL != ai) + { + if (GNUNET_YES == ai->anomalous) + send_anomaly_report (cp, ai); + ai = ai->next; + } +} + + +/** + * Function called after #GNUNET_CORE_connect has succeeded (or failed + * for good). Note that the private key of the peer is intentionally + * not exposed here; if you need it, your process should try to read + * the private key file directly (which should work if you are + * authorized...). Implementations of this function must not call + * #GNUNET_CORE_disconnect (other than by scheduling a new task to + * do this later). + * + * @param cls closure (unused) + * @param my_identity ID of this peer, NULL if we failed + */ +static void +core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) +{ + if (NULL == my_identity) + { + LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n")); + SENSOR_reporting_anomaly_stop (); + return; + } + if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Peer identity received from CORE init doesn't match ours.\n")); + SENSOR_reporting_anomaly_stop (); + return; + } +} + + +/** + * Used by the analysis module to tell the reporting module about a change in + * the anomaly status of a sensor. + * + * @param sensor Related sensor + * @param anomalous The new sensor anomalous status + */ +void +SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor, + int anomalous) +{ + struct AnomalyInfo *ai; + struct CorePeer *cp; + + if (GNUNET_NO == module_running) + return; + ai = get_anomaly_info_by_sensor (sensor); + GNUNET_assert (NULL != ai); + ai->anomalous = anomalous; + /* Report change to all neighbors */ + cp = cp_head; + while (NULL != cp) + { + send_anomaly_report (cp, ai); + cp = cp->next; + } + //TODO: report change to collection point if report_anomalies +} + + +/** + * Iterator for defined sensors and creates anomaly info context + * + * @param cls unused + * @param key unused + * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information + * @return #GNUNET_YES to continue iterations */ static int -core_inbound_cb (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message) +init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key, + void *value) { - //TODO - return GNUNET_OK; + struct GNUNET_SENSOR_SensorInfo *sensor = value; + struct AnomalyInfo *ai; + + ai = GNUNET_new (struct AnomalyInfo); + + ai->sensor = sensor; + ai->anomalous = GNUNET_NO; + ai->anomalous_neighbors = + GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); + GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai); + return GNUNET_YES; } @@ -276,26 +489,37 @@ core_inbound_cb (void *cls, * Start the sensor anomaly reporting module * * @param c our service configuration - * @param sensors multihashmap of loaded sensors + * @param s multihashmap of loaded sensors * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise */ int SENSOR_reporting_anomaly_start (const struct GNUNET_CONFIGURATION_Handle *c, - struct GNUNET_CONTAINER_MultiHashMap *sensors) + struct GNUNET_CONTAINER_MultiHashMap *s) { static struct GNUNET_CORE_MessageHandler core_handlers[] = { - {NULL, 0, 0} //TODO + {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT, + sizeof (struct AnomalyReportMessage)}, + {NULL, 0, 0} }; LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor anomaly reporting module.\n"); - GNUNET_assert (NULL != sensors); + GNUNET_assert (NULL != s); + sensors = s; cfg = c; core = GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb, - &core_disconnect_cb, core_inbound_cb, GNUNET_NO, - NULL, GNUNET_YES, core_handlers); + &core_disconnect_cb, NULL, GNUNET_YES, NULL, + GNUNET_YES, core_handlers); + if (NULL == core) + { + LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n")); + SENSOR_reporting_anomaly_stop (); + return GNUNET_SYSERR; + } GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid); GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL); + neighborhood = 0; + module_running = GNUNET_YES; return GNUNET_OK; } diff --git a/src/sensor/sensor.h b/src/sensor/sensor.h index 45fe2edda9..3cca3b0c0a 100644 --- a/src/sensor/sensor.h +++ b/src/sensor/sensor.h @@ -105,17 +105,28 @@ SENSOR_reporting_value_start (const struct GNUNET_CONFIGURATION_Handle *c, void SENSOR_reporting_anomaly_stop (); +/** + * Used by the analysis module to tell the reporting module about a change in + * the anomaly status of a sensor. + * + * @param sensor Related sensor + * @param anomalous The new sensor anomalous status + */ +void +SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor, + int anomalous); + /** * Start the sensor anomaly reporting module * * @param c our service configuration - * @param sensors multihashmap of loaded sensors + * @param s multihashmap of loaded sensors * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise */ int SENSOR_reporting_anomaly_start (const struct GNUNET_CONFIGURATION_Handle *c, - struct GNUNET_CONTAINER_MultiHashMap *sensors); + struct GNUNET_CONTAINER_MultiHashMap *s); /** |