aboutsummaryrefslogtreecommitdiff
path: root/fs/ocfs2/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/cluster')
-rw-r--r--fs/ocfs2/cluster/Makefile2
-rw-r--r--fs/ocfs2/cluster/heartbeat.c101
-rw-r--r--fs/ocfs2/cluster/masklog.h3
-rw-r--r--fs/ocfs2/cluster/nodemanager.c4
-rw-r--r--fs/ocfs2/cluster/quorum.c2
-rw-r--r--fs/ocfs2/cluster/sys.c2
-rw-r--r--fs/ocfs2/cluster/tcp.c252
-rw-r--r--fs/ocfs2/cluster/tcp_internal.h2
-rw-r--r--fs/ocfs2/cluster/ver.c42
-rw-r--r--fs/ocfs2/cluster/ver.h31
10 files changed, 179 insertions, 262 deletions
diff --git a/fs/ocfs2/cluster/Makefile b/fs/ocfs2/cluster/Makefile
index bc8c5e7d860..1aefc0350ec 100644
--- a/fs/ocfs2/cluster/Makefile
+++ b/fs/ocfs2/cluster/Makefile
@@ -1,4 +1,4 @@
obj-$(CONFIG_OCFS2_FS) += ocfs2_nodemanager.o
ocfs2_nodemanager-objs := heartbeat.o masklog.o sys.o nodemanager.o \
- quorum.o tcp.o netdebug.o ver.o
+ quorum.o tcp.o netdebug.o
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index f7c648d7d6b..73039295d0d 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -35,6 +35,7 @@
#include <linux/time.h>
#include <linux/debugfs.h>
#include <linux/slab.h>
+#include <linux/bitmap.h>
#include "heartbeat.h"
#include "tcp.h"
@@ -176,7 +177,7 @@ static void o2hb_dead_threshold_set(unsigned int threshold)
}
}
-static int o2hb_global_hearbeat_mode_set(unsigned int hb_mode)
+static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
{
int ret = -1;
@@ -282,15 +283,6 @@ struct o2hb_bio_wait_ctxt {
int wc_error;
};
-static int o2hb_pop_count(void *map, int count)
-{
- int i = -1, pop = 0;
-
- while ((i = find_next_bit(map, count, i + 1)) < count)
- pop++;
- return pop;
-}
-
static void o2hb_write_timeout(struct work_struct *work)
{
int failed, quorum;
@@ -307,9 +299,9 @@ static void o2hb_write_timeout(struct work_struct *work)
spin_lock_irqsave(&o2hb_live_lock, flags);
if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
- failed = o2hb_pop_count(&o2hb_failed_region_bitmap,
+ failed = bitmap_weight(o2hb_failed_region_bitmap,
O2NM_MAX_REGIONS);
- quorum = o2hb_pop_count(&o2hb_quorum_region_bitmap,
+ quorum = bitmap_weight(o2hb_quorum_region_bitmap,
O2NM_MAX_REGIONS);
spin_unlock_irqrestore(&o2hb_live_lock, flags);
@@ -421,7 +413,7 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
}
/* Must put everything in 512 byte sectors for the bio... */
- bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
+ bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
bio->bi_bdev = reg->hr_bdev;
bio->bi_private = wc;
bio->bi_end_io = o2hb_bio_end_io;
@@ -500,7 +492,7 @@ static int o2hb_issue_node_write(struct o2hb_region *reg,
}
atomic_inc(&write_wc->wc_num_reqs);
- submit_bio(WRITE, bio);
+ submit_bio(WRITE_SYNC, bio);
status = 0;
bail:
@@ -628,11 +620,9 @@ static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
struct o2nm_node *node,
int idx)
{
- struct list_head *iter;
struct o2hb_callback_func *f;
- list_for_each(iter, &hbcall->list) {
- f = list_entry(iter, struct o2hb_callback_func, hc_item);
+ list_for_each_entry(f, &hbcall->list, hc_item) {
mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
(f->hc_func)(node, idx, f->hc_data);
}
@@ -641,16 +631,9 @@ static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
/* Will run the list in order until we process the passed event */
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
{
- int empty;
struct o2hb_callback *hbcall;
struct o2hb_node_event *event;
- spin_lock(&o2hb_live_lock);
- empty = list_empty(&queued_event->hn_item);
- spin_unlock(&o2hb_live_lock);
- if (empty)
- return;
-
/* Holding callback sem assures we don't alter the callback
* lists when doing this, and serializes ourselves with other
* processes wanting callbacks. */
@@ -709,6 +692,7 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
struct o2hb_node_event event =
{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
struct o2nm_node *node;
+ int queued = 0;
node = o2nm_get_node_by_num(slot->ds_node_num);
if (!node)
@@ -726,11 +710,13 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
slot->ds_node_num);
+ queued = 1;
}
}
spin_unlock(&o2hb_live_lock);
- o2hb_run_event_list(&event);
+ if (queued)
+ o2hb_run_event_list(&event);
o2nm_node_put(node);
}
@@ -771,7 +757,7 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg)
* If global heartbeat active, unpin all regions if the
* region count > CUT_OFF
*/
- if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
+ if (bitmap_weight(o2hb_quorum_region_bitmap,
O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
o2hb_region_unpin(NULL);
unlock:
@@ -790,6 +776,7 @@ static int o2hb_check_slot(struct o2hb_region *reg,
unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
unsigned int slot_dead_ms;
int tmp;
+ int queued = 0;
memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
@@ -883,6 +870,7 @@ fire_callbacks:
slot->ds_node_num);
changed = 1;
+ queued = 1;
}
list_add_tail(&slot->ds_live_item,
@@ -934,6 +922,7 @@ fire_callbacks:
node, slot->ds_node_num);
changed = 1;
+ queued = 1;
}
/* We don't clear this because the node is still
@@ -949,30 +938,17 @@ fire_callbacks:
out:
spin_unlock(&o2hb_live_lock);
- o2hb_run_event_list(&event);
+ if (queued)
+ o2hb_run_event_list(&event);
if (node)
o2nm_node_put(node);
return changed;
}
-/* This could be faster if we just implmented a find_last_bit, but I
- * don't think the circumstances warrant it. */
-static int o2hb_highest_node(unsigned long *nodes,
- int numbits)
+static int o2hb_highest_node(unsigned long *nodes, int numbits)
{
- int highest, node;
-
- highest = numbits;
- node = -1;
- while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
- if (node >= numbits)
- break;
-
- highest = node;
- }
-
- return highest;
+ return find_last_bit(nodes, numbits);
}
static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
@@ -1131,7 +1107,7 @@ static int o2hb_thread(void *data)
mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
- set_user_nice(current, -20);
+ set_user_nice(current, MIN_NICE);
/* Pin node */
o2nm_depend_this_node();
@@ -1471,8 +1447,7 @@ static void o2hb_region_release(struct config_item *item)
mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
- if (reg->hr_tmp_block)
- kfree(reg->hr_tmp_block);
+ kfree(reg->hr_tmp_block);
if (reg->hr_slot_data) {
for (i = 0; i < reg->hr_num_pages; i++) {
@@ -1486,8 +1461,7 @@ static void o2hb_region_release(struct config_item *item)
if (reg->hr_bdev)
blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
- if (reg->hr_slots)
- kfree(reg->hr_slots);
+ kfree(reg->hr_slots);
kfree(reg->hr_db_regnum);
kfree(reg->hr_db_livenodes);
@@ -1833,7 +1807,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
live_threshold = O2HB_LIVE_THRESHOLD;
if (o2hb_global_heartbeat_active()) {
spin_lock(&o2hb_live_lock);
- if (o2hb_pop_count(&o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
+ if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
live_threshold <<= 1;
spin_unlock(&o2hb_live_lock);
}
@@ -2184,7 +2158,7 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group,
if (!o2hb_dependent_users)
goto unlock;
- if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
+ if (bitmap_weight(o2hb_quorum_region_bitmap,
O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
o2hb_region_pin(NULL);
@@ -2273,7 +2247,7 @@ ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group,
if (strnicmp(page, o2hb_heartbeat_mode_desc[i], len))
continue;
- ret = o2hb_global_hearbeat_mode_set(i);
+ ret = o2hb_global_heartbeat_mode_set(i);
if (!ret)
printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
o2hb_heartbeat_mode_desc[i]);
@@ -2306,7 +2280,7 @@ static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
NULL,
};
-static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
+static struct configfs_item_operations o2hb_heartbeat_group_item_ops = {
.show_attribute = o2hb_heartbeat_group_show,
.store_attribute = o2hb_heartbeat_group_store,
};
@@ -2318,7 +2292,7 @@ static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
static struct config_item_type o2hb_heartbeat_group_type = {
.ct_group_ops = &o2hb_heartbeat_group_group_ops,
- .ct_item_ops = &o2hb_hearbeat_group_item_ops,
+ .ct_item_ops = &o2hb_heartbeat_group_item_ops,
.ct_attrs = o2hb_heartbeat_group_attrs,
.ct_owner = THIS_MODULE,
};
@@ -2391,6 +2365,9 @@ static int o2hb_region_pin(const char *region_uuid)
assert_spin_locked(&o2hb_live_lock);
list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
+ if (reg->hr_item_dropped)
+ continue;
+
uuid = config_item_name(&reg->hr_item);
/* local heartbeat */
@@ -2441,6 +2418,9 @@ static void o2hb_region_unpin(const char *region_uuid)
assert_spin_locked(&o2hb_live_lock);
list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
+ if (reg->hr_item_dropped)
+ continue;
+
uuid = config_item_name(&reg->hr_item);
if (region_uuid) {
if (strcmp(region_uuid, uuid))
@@ -2478,7 +2458,7 @@ static int o2hb_region_inc_user(const char *region_uuid)
if (o2hb_dependent_users > 1)
goto unlock;
- if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
+ if (bitmap_weight(o2hb_quorum_region_bitmap,
O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
ret = o2hb_region_pin(NULL);
@@ -2512,8 +2492,7 @@ unlock:
int o2hb_register_callback(const char *region_uuid,
struct o2hb_callback_func *hc)
{
- struct o2hb_callback_func *tmp;
- struct list_head *iter;
+ struct o2hb_callback_func *f;
struct o2hb_callback *hbcall;
int ret;
@@ -2536,10 +2515,9 @@ int o2hb_register_callback(const char *region_uuid,
down_write(&o2hb_callback_sem);
- list_for_each(iter, &hbcall->list) {
- tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
- if (hc->hc_priority < tmp->hc_priority) {
- list_add_tail(&hc->hc_item, iter);
+ list_for_each_entry(f, &hbcall->list, hc_item) {
+ if (hc->hc_priority < f->hc_priority) {
+ list_add_tail(&hc->hc_item, &f->hc_item);
break;
}
}
@@ -2656,6 +2634,9 @@ int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
p = region_uuids;
list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
+ if (reg->hr_item_dropped)
+ continue;
+
mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
if (numregs < max_regions) {
memcpy(p, config_item_name(&reg->hr_item),
diff --git a/fs/ocfs2/cluster/masklog.h b/fs/ocfs2/cluster/masklog.h
index baa2b9ef7ee..2260fb9e650 100644
--- a/fs/ocfs2/cluster/masklog.h
+++ b/fs/ocfs2/cluster/masklog.h
@@ -199,7 +199,8 @@ extern struct mlog_bits mlog_and_bits, mlog_not_bits;
#define mlog_errno(st) do { \
int _st = (st); \
if (_st != -ERESTARTSYS && _st != -EINTR && \
- _st != AOP_TRUNCATED_PAGE && _st != -ENOSPC) \
+ _st != AOP_TRUNCATED_PAGE && _st != -ENOSPC && \
+ _st != -EDQUOT) \
mlog(ML_ERROR, "status = %lld\n", (long long)_st); \
} while (0)
diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c
index bb240647ca5..441c84e169e 100644
--- a/fs/ocfs2/cluster/nodemanager.c
+++ b/fs/ocfs2/cluster/nodemanager.c
@@ -29,7 +29,6 @@
#include "heartbeat.h"
#include "masklog.h"
#include "sys.h"
-#include "ver.h"
/* for now we operate under the assertion that there can be only one
* cluster active at a time. Changing this will require trickling
@@ -945,8 +944,6 @@ static int __init init_o2nm(void)
{
int ret = -1;
- cluster_print_version();
-
ret = o2hb_init();
if (ret)
goto out;
@@ -984,6 +981,7 @@ out:
MODULE_AUTHOR("Oracle");
MODULE_LICENSE("GPL");
+MODULE_DESCRIPTION("OCFS2 cluster management");
module_init(init_o2nm)
module_exit(exit_o2nm)
diff --git a/fs/ocfs2/cluster/quorum.c b/fs/ocfs2/cluster/quorum.c
index c19897d0fe1..1ec141e758d 100644
--- a/fs/ocfs2/cluster/quorum.c
+++ b/fs/ocfs2/cluster/quorum.c
@@ -264,7 +264,7 @@ void o2quo_hb_still_up(u8 node)
/* This is analogous to hb_up. as a node's connection comes up we delay the
* quorum decision until we see it heartbeating. the hold will be droped in
* hb_up or hb_down. it might be perpetuated by con_err until hb_down. if
- * it's already heartbeating we we might be dropping a hold that conn_up got.
+ * it's already heartbeating we might be dropping a hold that conn_up got.
* */
void o2quo_conn_up(u8 node)
{
diff --git a/fs/ocfs2/cluster/sys.c b/fs/ocfs2/cluster/sys.c
index a4b07730b2e..b7f57271d49 100644
--- a/fs/ocfs2/cluster/sys.c
+++ b/fs/ocfs2/cluster/sys.c
@@ -41,7 +41,7 @@ static ssize_t version_show(struct kobject *kobj, struct kobj_attribute *attr,
return snprintf(buf, PAGE_SIZE, "%u\n", O2NM_API_VERSION);
}
static struct kobj_attribute attr_version =
- __ATTR(interface_revision, S_IFREG | S_IRUGO, version_show, NULL);
+ __ATTR(interface_revision, S_IRUGO, version_show, NULL);
static struct attribute *o2cb_attrs[] = {
&attr_version.attr,
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index 1bfe8802cc1..681691bc233 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -108,7 +108,7 @@ static struct rb_root o2net_handler_tree = RB_ROOT;
static struct o2net_node o2net_nodes[O2NM_MAX_NODES];
/* XXX someday we'll need better accounting */
-static struct socket *o2net_listen_sock = NULL;
+static struct socket *o2net_listen_sock;
/*
* listen work is only queued by the listening socket callbacks on the
@@ -137,7 +137,7 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] =
static void o2net_sc_connect_completed(struct work_struct *work);
static void o2net_rx_until_empty(struct work_struct *work);
static void o2net_shutdown_sc(struct work_struct *work);
-static void o2net_listen_data_ready(struct sock *sk, int bytes);
+static void o2net_listen_data_ready(struct sock *sk);
static void o2net_sc_send_keep_req(struct work_struct *work);
static void o2net_idle_timer(unsigned long data);
static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
@@ -262,17 +262,17 @@ static void o2net_update_recv_stats(struct o2net_sock_container *sc)
#endif /* CONFIG_OCFS2_FS_STATS */
-static inline int o2net_reconnect_delay(void)
+static inline unsigned int o2net_reconnect_delay(void)
{
return o2nm_single_cluster->cl_reconnect_delay_ms;
}
-static inline int o2net_keepalive_delay(void)
+static inline unsigned int o2net_keepalive_delay(void)
{
return o2nm_single_cluster->cl_keepalive_delay_ms;
}
-static inline int o2net_idle_timeout(void)
+static inline unsigned int o2net_idle_timeout(void)
{
return o2nm_single_cluster->cl_idle_timeout_ms;
}
@@ -304,28 +304,22 @@ static u8 o2net_num_from_nn(struct o2net_node *nn)
static int o2net_prep_nsw(struct o2net_node *nn, struct o2net_status_wait *nsw)
{
- int ret = 0;
-
- do {
- if (!idr_pre_get(&nn->nn_status_idr, GFP_ATOMIC)) {
- ret = -EAGAIN;
- break;
- }
- spin_lock(&nn->nn_lock);
- ret = idr_get_new(&nn->nn_status_idr, nsw, &nsw->ns_id);
- if (ret == 0)
- list_add_tail(&nsw->ns_node_item,
- &nn->nn_status_list);
- spin_unlock(&nn->nn_lock);
- } while (ret == -EAGAIN);
+ int ret;
- if (ret == 0) {
- init_waitqueue_head(&nsw->ns_wq);
- nsw->ns_sys_status = O2NET_ERR_NONE;
- nsw->ns_status = 0;
+ spin_lock(&nn->nn_lock);
+ ret = idr_alloc(&nn->nn_status_idr, nsw, 0, 0, GFP_ATOMIC);
+ if (ret >= 0) {
+ nsw->ns_id = ret;
+ list_add_tail(&nsw->ns_node_item, &nn->nn_status_list);
}
+ spin_unlock(&nn->nn_lock);
+ if (ret < 0)
+ return ret;
- return ret;
+ init_waitqueue_head(&nsw->ns_wq);
+ nsw->ns_sys_status = O2NET_ERR_NONE;
+ nsw->ns_status = 0;
+ return 0;
}
static void o2net_complete_nsw_locked(struct o2net_node *nn,
@@ -412,6 +406,9 @@ static void sc_kref_release(struct kref *kref)
sc->sc_node = NULL;
o2net_debug_del_sc(sc);
+
+ if (sc->sc_page)
+ __free_page(sc->sc_page);
kfree(sc);
}
@@ -546,8 +543,9 @@ static void o2net_set_nn_state(struct o2net_node *nn,
}
if (was_valid && !valid) {
- printk(KERN_NOTICE "o2net: No longer connected to "
- SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc));
+ if (old_sc)
+ printk(KERN_NOTICE "o2net: No longer connected to "
+ SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc));
o2net_complete_nodes_nsw(nn);
}
@@ -599,9 +597,9 @@ static void o2net_set_nn_state(struct o2net_node *nn,
}
/* see o2net_register_callbacks() */
-static void o2net_data_ready(struct sock *sk, int bytes)
+static void o2net_data_ready(struct sock *sk)
{
- void (*ready)(struct sock *sk, int bytes);
+ void (*ready)(struct sock *sk);
read_lock(&sk->sk_callback_lock);
if (sk->sk_user_data) {
@@ -615,7 +613,7 @@ static void o2net_data_ready(struct sock *sk, int bytes)
}
read_unlock(&sk->sk_callback_lock);
- ready(sk, bytes);
+ ready(sk);
}
/* see o2net_register_callbacks() */
@@ -636,19 +634,19 @@ static void o2net_state_change(struct sock *sk)
state_change = sc->sc_state_change;
switch(sk->sk_state) {
- /* ignore connecting sockets as they make progress */
- case TCP_SYN_SENT:
- case TCP_SYN_RECV:
- break;
- case TCP_ESTABLISHED:
- o2net_sc_queue_work(sc, &sc->sc_connect_work);
- break;
- default:
- printk(KERN_INFO "o2net: Connection to " SC_NODEF_FMT
- " shutdown, state %d\n",
- SC_NODEF_ARGS(sc), sk->sk_state);
- o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
- break;
+ /* ignore connecting sockets as they make progress */
+ case TCP_SYN_SENT:
+ case TCP_SYN_RECV:
+ break;
+ case TCP_ESTABLISHED:
+ o2net_sc_queue_work(sc, &sc->sc_connect_work);
+ break;
+ default:
+ printk(KERN_INFO "o2net: Connection to " SC_NODEF_FMT
+ " shutdown, state %d\n",
+ SC_NODEF_ARGS(sc), sk->sk_state);
+ o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
+ break;
}
out:
read_unlock(&sk->sk_callback_lock);
@@ -768,32 +766,32 @@ static struct o2net_msg_handler *
o2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p,
struct rb_node **ret_parent)
{
- struct rb_node **p = &o2net_handler_tree.rb_node;
- struct rb_node *parent = NULL;
+ struct rb_node **p = &o2net_handler_tree.rb_node;
+ struct rb_node *parent = NULL;
struct o2net_msg_handler *nmh, *ret = NULL;
int cmp;
- while (*p) {
- parent = *p;
- nmh = rb_entry(parent, struct o2net_msg_handler, nh_node);
+ while (*p) {
+ parent = *p;
+ nmh = rb_entry(parent, struct o2net_msg_handler, nh_node);
cmp = o2net_handler_cmp(nmh, msg_type, key);
- if (cmp < 0)
- p = &(*p)->rb_left;
- else if (cmp > 0)
- p = &(*p)->rb_right;
- else {
+ if (cmp < 0)
+ p = &(*p)->rb_left;
+ else if (cmp > 0)
+ p = &(*p)->rb_right;
+ else {
ret = nmh;
- break;
+ break;
}
- }
+ }
- if (ret_p != NULL)
- *ret_p = p;
- if (ret_parent != NULL)
- *ret_parent = parent;
+ if (ret_p != NULL)
+ *ret_p = p;
+ if (ret_parent != NULL)
+ *ret_parent = parent;
- return ret;
+ return ret;
}
static void o2net_handler_kref_release(struct kref *kref)
@@ -870,7 +868,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
/* we've had some trouble with handlers seemingly vanishing. */
mlog_bug_on_msg(o2net_handler_tree_lookup(msg_type, key, &p,
&parent) == NULL,
- "couldn't find handler we *just* registerd "
+ "couldn't find handler we *just* registered "
"for type %u key %08x\n", msg_type, key);
}
write_unlock(&o2net_handler_lock);
@@ -918,57 +916,30 @@ static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key)
static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len)
{
- int ret;
- mm_segment_t oldfs;
- struct kvec vec = {
- .iov_len = len,
- .iov_base = data,
- };
- struct msghdr msg = {
- .msg_iovlen = 1,
- .msg_iov = (struct iovec *)&vec,
- .msg_flags = MSG_DONTWAIT,
- };
-
- oldfs = get_fs();
- set_fs(get_ds());
- ret = sock_recvmsg(sock, &msg, len, msg.msg_flags);
- set_fs(oldfs);
-
- return ret;
+ struct kvec vec = { .iov_len = len, .iov_base = data, };
+ struct msghdr msg = { .msg_flags = MSG_DONTWAIT, };
+ return kernel_recvmsg(sock, &msg, &vec, 1, len, msg.msg_flags);
}
static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec,
size_t veclen, size_t total)
{
int ret;
- mm_segment_t oldfs;
- struct msghdr msg = {
- .msg_iov = (struct iovec *)vec,
- .msg_iovlen = veclen,
- };
+ struct msghdr msg;
if (sock == NULL) {
ret = -EINVAL;
goto out;
}
- oldfs = get_fs();
- set_fs(get_ds());
- ret = sock_sendmsg(sock, &msg, total);
- set_fs(oldfs);
- if (ret != total) {
- mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret,
- total);
- if (ret >= 0)
- ret = -EPIPE; /* should be smarter, I bet */
- goto out;
- }
-
- ret = 0;
+ ret = kernel_sendmsg(sock, &msg, vec, veclen, total);
+ if (likely(ret == total))
+ return 0;
+ mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, total);
+ if (ret >= 0)
+ ret = -EPIPE; /* should be smarter, I bet */
out:
- if (ret < 0)
- mlog(0, "returning error: %d\n", ret);
+ mlog(0, "returning error: %d\n", ret);
return ret;
}
@@ -1165,10 +1136,8 @@ out:
o2net_debug_del_nst(&nst); /* must be before dropping sc and node */
if (sc)
sc_put(sc);
- if (vec)
- kfree(vec);
- if (msg)
- kfree(msg);
+ kfree(vec);
+ kfree(msg);
o2net_complete_nsw(nn, &nsw, 0, 0, 0);
return ret;
}
@@ -1700,13 +1669,12 @@ static void o2net_start_connect(struct work_struct *work)
ret = 0;
out:
- if (ret) {
+ if (ret && sc) {
printk(KERN_NOTICE "o2net: Connect attempt to " SC_NODEF_FMT
" failed with errno %d\n", SC_NODEF_ARGS(sc), ret);
/* 0 err so that another will be queued and attempted
* from set_nn_state */
- if (sc)
- o2net_ensure_shutdown(nn, sc, 0);
+ o2net_ensure_shutdown(nn, sc, 0);
}
if (sc)
sc_put(sc);
@@ -1831,7 +1799,7 @@ int o2net_register_hb_callbacks(void)
/* ------------------------------------------------------------ */
-static int o2net_accept_one(struct socket *sock)
+static int o2net_accept_one(struct socket *sock, int *more)
{
int ret, slen;
struct sockaddr_in sin;
@@ -1842,6 +1810,7 @@ static int o2net_accept_one(struct socket *sock)
struct o2net_node *nn;
BUG_ON(sock == NULL);
+ *more = 0;
ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
sock->sk->sk_protocol, &new_sock);
if (ret)
@@ -1853,6 +1822,7 @@ static int o2net_accept_one(struct socket *sock)
if (ret < 0)
goto out;
+ *more = 1;
new_sock->sk->sk_allocation = GFP_ATOMIC;
ret = o2net_set_nodelay(new_sock);
@@ -1878,12 +1848,16 @@ static int o2net_accept_one(struct socket *sock)
if (o2nm_this_node() >= node->nd_num) {
local_node = o2nm_get_node_by_num(o2nm_this_node());
- printk(KERN_NOTICE "o2net: Unexpected connect attempt seen "
- "at node '%s' (%u, %pI4:%d) from node '%s' (%u, "
- "%pI4:%d)\n", local_node->nd_name, local_node->nd_num,
- &(local_node->nd_ipv4_address),
- ntohs(local_node->nd_ipv4_port), node->nd_name,
- node->nd_num, &sin.sin_addr.s_addr, ntohs(sin.sin_port));
+ if (local_node)
+ printk(KERN_NOTICE "o2net: Unexpected connect attempt "
+ "seen at node '%s' (%u, %pI4:%d) from "
+ "node '%s' (%u, %pI4:%d)\n",
+ local_node->nd_name, local_node->nd_num,
+ &(local_node->nd_ipv4_address),
+ ntohs(local_node->nd_ipv4_port),
+ node->nd_name,
+ node->nd_num, &sin.sin_addr.s_addr,
+ ntohs(sin.sin_port));
ret = -EINVAL;
goto out;
}
@@ -1947,16 +1921,41 @@ out:
return ret;
}
+/*
+ * This function is invoked in response to one or more
+ * pending accepts at softIRQ level. We must drain the
+ * entire que before returning.
+ */
+
static void o2net_accept_many(struct work_struct *work)
{
struct socket *sock = o2net_listen_sock;
- while (o2net_accept_one(sock) == 0)
+ int more;
+ int err;
+
+ /*
+ * It is critical to note that due to interrupt moderation
+ * at the network driver level, we can't assume to get a
+ * softIRQ for every single conn since tcp SYN packets
+ * can arrive back-to-back, and therefore many pending
+ * accepts may result in just 1 softIRQ. If we terminate
+ * the o2net_accept_one() loop upon seeing an err, what happens
+ * to the rest of the conns in the queue? If no new SYN
+ * arrives for hours, no softIRQ will be delivered,
+ * and the connections will just sit in the queue.
+ */
+
+ for (;;) {
+ err = o2net_accept_one(sock, &more);
+ if (!more)
+ break;
cond_resched();
+ }
}
-static void o2net_listen_data_ready(struct sock *sk, int bytes)
+static void o2net_listen_data_ready(struct sock *sk)
{
- void (*ready)(struct sock *sk, int bytes);
+ void (*ready)(struct sock *sk);
read_lock(&sk->sk_callback_lock);
ready = sk->sk_user_data;
@@ -1965,18 +1964,29 @@ static void o2net_listen_data_ready(struct sock *sk, int bytes)
goto out;
}
- /* ->sk_data_ready is also called for a newly established child socket
- * before it has been accepted and the acceptor has set up their
- * data_ready.. we only want to queue listen work for our listening
- * socket */
+ /* This callback may called twice when a new connection
+ * is being established as a child socket inherits everything
+ * from a parent LISTEN socket, including the data_ready cb of
+ * the parent. This leads to a hazard. In o2net_accept_one()
+ * we are still initializing the child socket but have not
+ * changed the inherited data_ready callback yet when
+ * data starts arriving.
+ * We avoid this hazard by checking the state.
+ * For the listening socket, the state will be TCP_LISTEN; for the new
+ * socket, will be TCP_ESTABLISHED. Also, in this case,
+ * sk->sk_user_data is not a valid function pointer.
+ */
+
if (sk->sk_state == TCP_LISTEN) {
- mlog(ML_TCP, "bytes: %d\n", bytes);
queue_work(o2net_wq, &o2net_listen_work);
+ } else {
+ ready = NULL;
}
out:
read_unlock(&sk->sk_callback_lock);
- ready(sk, bytes);
+ if (ready != NULL)
+ ready(sk);
}
static int o2net_open_listening_sock(__be32 addr, __be16 port)
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index 4cbcb65784a..dc024367110 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -165,7 +165,7 @@ struct o2net_sock_container {
/* original handlers for the sockets */
void (*sc_state_change)(struct sock *sk);
- void (*sc_data_ready)(struct sock *sk, int bytes);
+ void (*sc_data_ready)(struct sock *sk);
u32 sc_msg_key;
u16 sc_msg_type;
diff --git a/fs/ocfs2/cluster/ver.c b/fs/ocfs2/cluster/ver.c
deleted file mode 100644
index a56eee6abad..00000000000
--- a/fs/ocfs2/cluster/ver.c
+++ /dev/null
@@ -1,42 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; -*-
- * vim: noexpandtab sw=8 ts=8 sts=0:
- *
- * ver.c
- *
- * version string
- *
- * Copyright (C) 2002, 2005 Oracle. All rights reserved.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public
- * License along with this program; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 021110-1307, USA.
- */
-
-#include <linux/module.h>
-#include <linux/kernel.h>
-
-#include "ver.h"
-
-#define CLUSTER_BUILD_VERSION "1.5.0"
-
-#define VERSION_STR "OCFS2 Node Manager " CLUSTER_BUILD_VERSION
-
-void cluster_print_version(void)
-{
- printk(KERN_INFO "%s\n", VERSION_STR);
-}
-
-MODULE_DESCRIPTION(VERSION_STR);
-
-MODULE_VERSION(CLUSTER_BUILD_VERSION);
diff --git a/fs/ocfs2/cluster/ver.h b/fs/ocfs2/cluster/ver.h
deleted file mode 100644
index 32554c3382c..00000000000
--- a/fs/ocfs2/cluster/ver.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; -*-
- * vim: noexpandtab sw=8 ts=8 sts=0:
- *
- * ver.h
- *
- * Function prototypes
- *
- * Copyright (C) 2005 Oracle. All rights reserved.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public
- * License along with this program; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 021110-1307, USA.
- */
-
-#ifndef O2CLUSTER_VER_H
-#define O2CLUSTER_VER_H
-
-void cluster_print_version(void);
-
-#endif /* O2CLUSTER_VER_H */