diff options
Diffstat (limited to 'fs/ocfs2/cluster')
| -rw-r--r-- | fs/ocfs2/cluster/Makefile | 2 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/heartbeat.c | 139 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/masklog.h | 3 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/nodemanager.c | 4 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/quorum.c | 4 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/sys.c | 2 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/tcp.c | 254 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/tcp_internal.h | 2 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/ver.c | 42 | ||||
| -rw-r--r-- | fs/ocfs2/cluster/ver.h | 31 |
10 files changed, 200 insertions, 283 deletions
diff --git a/fs/ocfs2/cluster/Makefile b/fs/ocfs2/cluster/Makefile index bc8c5e7d860..1aefc0350ec 100644 --- a/fs/ocfs2/cluster/Makefile +++ b/fs/ocfs2/cluster/Makefile @@ -1,4 +1,4 @@ obj-$(CONFIG_OCFS2_FS) += ocfs2_nodemanager.o ocfs2_nodemanager-objs := heartbeat.o masklog.o sys.o nodemanager.o \ - quorum.o tcp.o netdebug.o ver.o + quorum.o tcp.o netdebug.o diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index a4e855e3690..73039295d0d 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -35,6 +35,7 @@ #include <linux/time.h> #include <linux/debugfs.h> #include <linux/slab.h> +#include <linux/bitmap.h> #include "heartbeat.h" #include "tcp.h" @@ -176,7 +177,7 @@ static void o2hb_dead_threshold_set(unsigned int threshold) } } -static int o2hb_global_hearbeat_mode_set(unsigned int hb_mode) +static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode) { int ret = -1; @@ -282,15 +283,6 @@ struct o2hb_bio_wait_ctxt { int wc_error; }; -static int o2hb_pop_count(void *map, int count) -{ - int i = -1, pop = 0; - - while ((i = find_next_bit(map, count, i + 1)) < count) - pop++; - return pop; -} - static void o2hb_write_timeout(struct work_struct *work) { int failed, quorum; @@ -307,9 +299,9 @@ static void o2hb_write_timeout(struct work_struct *work) spin_lock_irqsave(&o2hb_live_lock, flags); if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) set_bit(reg->hr_region_num, o2hb_failed_region_bitmap); - failed = o2hb_pop_count(&o2hb_failed_region_bitmap, + failed = bitmap_weight(o2hb_failed_region_bitmap, O2NM_MAX_REGIONS); - quorum = o2hb_pop_count(&o2hb_quorum_region_bitmap, + quorum = bitmap_weight(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS); spin_unlock_irqrestore(&o2hb_live_lock, flags); @@ -421,7 +413,7 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, } /* Must put everything in 512 byte sectors for the bio... */ - bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9); + bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9); bio->bi_bdev = reg->hr_bdev; bio->bi_private = wc; bio->bi_end_io = o2hb_bio_end_io; @@ -500,7 +492,7 @@ static int o2hb_issue_node_write(struct o2hb_region *reg, } atomic_inc(&write_wc->wc_num_reqs); - submit_bio(WRITE, bio); + submit_bio(WRITE_SYNC, bio); status = 0; bail: @@ -628,11 +620,9 @@ static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, struct o2nm_node *node, int idx) { - struct list_head *iter; struct o2hb_callback_func *f; - list_for_each(iter, &hbcall->list) { - f = list_entry(iter, struct o2hb_callback_func, hc_item); + list_for_each_entry(f, &hbcall->list, hc_item) { mlog(ML_HEARTBEAT, "calling funcs %p\n", f); (f->hc_func)(node, idx, f->hc_data); } @@ -641,16 +631,9 @@ static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, /* Will run the list in order until we process the passed event */ static void o2hb_run_event_list(struct o2hb_node_event *queued_event) { - int empty; struct o2hb_callback *hbcall; struct o2hb_node_event *event; - spin_lock(&o2hb_live_lock); - empty = list_empty(&queued_event->hn_item); - spin_unlock(&o2hb_live_lock); - if (empty) - return; - /* Holding callback sem assures we don't alter the callback * lists when doing this, and serializes ourselves with other * processes wanting callbacks. */ @@ -709,6 +692,7 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) struct o2hb_node_event event = { .hn_item = LIST_HEAD_INIT(event.hn_item), }; struct o2nm_node *node; + int queued = 0; node = o2nm_get_node_by_num(slot->ds_node_num); if (!node) @@ -726,11 +710,13 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, slot->ds_node_num); + queued = 1; } } spin_unlock(&o2hb_live_lock); - o2hb_run_event_list(&event); + if (queued) + o2hb_run_event_list(&event); o2nm_node_put(node); } @@ -771,7 +757,7 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg) * If global heartbeat active, unpin all regions if the * region count > CUT_OFF */ - if (o2hb_pop_count(&o2hb_quorum_region_bitmap, + if (bitmap_weight(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF) o2hb_region_unpin(NULL); unlock: @@ -790,6 +776,7 @@ static int o2hb_check_slot(struct o2hb_region *reg, unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS; unsigned int slot_dead_ms; int tmp; + int queued = 0; memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes); @@ -883,6 +870,7 @@ fire_callbacks: slot->ds_node_num); changed = 1; + queued = 1; } list_add_tail(&slot->ds_live_item, @@ -934,6 +922,7 @@ fire_callbacks: node, slot->ds_node_num); changed = 1; + queued = 1; } /* We don't clear this because the node is still @@ -949,30 +938,17 @@ fire_callbacks: out: spin_unlock(&o2hb_live_lock); - o2hb_run_event_list(&event); + if (queued) + o2hb_run_event_list(&event); if (node) o2nm_node_put(node); return changed; } -/* This could be faster if we just implmented a find_last_bit, but I - * don't think the circumstances warrant it. */ -static int o2hb_highest_node(unsigned long *nodes, - int numbits) +static int o2hb_highest_node(unsigned long *nodes, int numbits) { - int highest, node; - - highest = numbits; - node = -1; - while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) { - if (node >= numbits) - break; - - highest = node; - } - - return highest; + return find_last_bit(nodes, numbits); } static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) @@ -1131,7 +1107,7 @@ static int o2hb_thread(void *data) mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n"); - set_user_nice(current, -20); + set_user_nice(current, MIN_NICE); /* Pin node */ o2nm_depend_this_node(); @@ -1471,8 +1447,7 @@ static void o2hb_region_release(struct config_item *item) mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name); - if (reg->hr_tmp_block) - kfree(reg->hr_tmp_block); + kfree(reg->hr_tmp_block); if (reg->hr_slot_data) { for (i = 0; i < reg->hr_num_pages; i++) { @@ -1486,8 +1461,7 @@ static void o2hb_region_release(struct config_item *item) if (reg->hr_bdev) blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE); - if (reg->hr_slots) - kfree(reg->hr_slots); + kfree(reg->hr_slots); kfree(reg->hr_db_regnum); kfree(reg->hr_db_livenodes); @@ -1746,8 +1720,8 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, long fd; int sectsize; char *p = (char *)page; - struct file *filp = NULL; - struct inode *inode = NULL; + struct fd f; + struct inode *inode; ssize_t ret = -EINVAL; int live_threshold; @@ -1766,26 +1740,26 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, if (fd < 0 || fd >= INT_MAX) goto out; - filp = fget(fd); - if (filp == NULL) + f = fdget(fd); + if (f.file == NULL) goto out; if (reg->hr_blocks == 0 || reg->hr_start_block == 0 || reg->hr_block_bytes == 0) - goto out; + goto out2; - inode = igrab(filp->f_mapping->host); + inode = igrab(f.file->f_mapping->host); if (inode == NULL) - goto out; + goto out2; if (!S_ISBLK(inode->i_mode)) - goto out; + goto out3; - reg->hr_bdev = I_BDEV(filp->f_mapping->host); + reg->hr_bdev = I_BDEV(f.file->f_mapping->host); ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL); if (ret) { reg->hr_bdev = NULL; - goto out; + goto out3; } inode = NULL; @@ -1797,7 +1771,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, "blocksize %u incorrect for device, expected %d", reg->hr_block_bytes, sectsize); ret = -EINVAL; - goto out; + goto out3; } o2hb_init_region_params(reg); @@ -1811,13 +1785,13 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, ret = o2hb_map_slot_data(reg); if (ret) { mlog_errno(ret); - goto out; + goto out3; } ret = o2hb_populate_slot_data(reg); if (ret) { mlog_errno(ret); - goto out; + goto out3; } INIT_DELAYED_WORK(®->hr_write_timeout_work, o2hb_write_timeout); @@ -1833,7 +1807,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, live_threshold = O2HB_LIVE_THRESHOLD; if (o2hb_global_heartbeat_active()) { spin_lock(&o2hb_live_lock); - if (o2hb_pop_count(&o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1) + if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1) live_threshold <<= 1; spin_unlock(&o2hb_live_lock); } @@ -1847,7 +1821,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, if (IS_ERR(hb_task)) { ret = PTR_ERR(hb_task); mlog_errno(ret); - goto out; + goto out3; } spin_lock(&o2hb_live_lock); @@ -1863,7 +1837,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, if (reg->hr_aborted_start) { ret = -EIO; - goto out; + goto out3; } /* Ok, we were woken. Make sure it wasn't by drop_item() */ @@ -1882,11 +1856,11 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n", config_item_name(®->hr_item), reg->hr_dev_name); +out3: + iput(inode); +out2: + fdput(f); out: - if (filp) - fput(filp); - if (inode) - iput(inode); if (ret < 0) { if (reg->hr_bdev) { blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE); @@ -2184,7 +2158,7 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group, if (!o2hb_dependent_users) goto unlock; - if (o2hb_pop_count(&o2hb_quorum_region_bitmap, + if (bitmap_weight(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF) o2hb_region_pin(NULL); @@ -2273,7 +2247,7 @@ ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group, if (strnicmp(page, o2hb_heartbeat_mode_desc[i], len)) continue; - ret = o2hb_global_hearbeat_mode_set(i); + ret = o2hb_global_heartbeat_mode_set(i); if (!ret) printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n", o2hb_heartbeat_mode_desc[i]); @@ -2306,7 +2280,7 @@ static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = { NULL, }; -static struct configfs_item_operations o2hb_hearbeat_group_item_ops = { +static struct configfs_item_operations o2hb_heartbeat_group_item_ops = { .show_attribute = o2hb_heartbeat_group_show, .store_attribute = o2hb_heartbeat_group_store, }; @@ -2318,7 +2292,7 @@ static struct configfs_group_operations o2hb_heartbeat_group_group_ops = { static struct config_item_type o2hb_heartbeat_group_type = { .ct_group_ops = &o2hb_heartbeat_group_group_ops, - .ct_item_ops = &o2hb_hearbeat_group_item_ops, + .ct_item_ops = &o2hb_heartbeat_group_item_ops, .ct_attrs = o2hb_heartbeat_group_attrs, .ct_owner = THIS_MODULE, }; @@ -2391,6 +2365,9 @@ static int o2hb_region_pin(const char *region_uuid) assert_spin_locked(&o2hb_live_lock); list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { + if (reg->hr_item_dropped) + continue; + uuid = config_item_name(®->hr_item); /* local heartbeat */ @@ -2441,6 +2418,9 @@ static void o2hb_region_unpin(const char *region_uuid) assert_spin_locked(&o2hb_live_lock); list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { + if (reg->hr_item_dropped) + continue; + uuid = config_item_name(®->hr_item); if (region_uuid) { if (strcmp(region_uuid, uuid)) @@ -2478,7 +2458,7 @@ static int o2hb_region_inc_user(const char *region_uuid) if (o2hb_dependent_users > 1) goto unlock; - if (o2hb_pop_count(&o2hb_quorum_region_bitmap, + if (bitmap_weight(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF) ret = o2hb_region_pin(NULL); @@ -2512,8 +2492,7 @@ unlock: int o2hb_register_callback(const char *region_uuid, struct o2hb_callback_func *hc) { - struct o2hb_callback_func *tmp; - struct list_head *iter; + struct o2hb_callback_func *f; struct o2hb_callback *hbcall; int ret; @@ -2536,10 +2515,9 @@ int o2hb_register_callback(const char *region_uuid, down_write(&o2hb_callback_sem); - list_for_each(iter, &hbcall->list) { - tmp = list_entry(iter, struct o2hb_callback_func, hc_item); - if (hc->hc_priority < tmp->hc_priority) { - list_add_tail(&hc->hc_item, iter); + list_for_each_entry(f, &hbcall->list, hc_item) { + if (hc->hc_priority < f->hc_priority) { + list_add_tail(&hc->hc_item, &f->hc_item); break; } } @@ -2656,6 +2634,9 @@ int o2hb_get_all_regions(char *region_uuids, u8 max_regions) p = region_uuids; list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { + if (reg->hr_item_dropped) + continue; + mlog(0, "Region: %s\n", config_item_name(®->hr_item)); if (numregs < max_regions) { memcpy(p, config_item_name(®->hr_item), diff --git a/fs/ocfs2/cluster/masklog.h b/fs/ocfs2/cluster/masklog.h index baa2b9ef7ee..2260fb9e650 100644 --- a/fs/ocfs2/cluster/masklog.h +++ b/fs/ocfs2/cluster/masklog.h @@ -199,7 +199,8 @@ extern struct mlog_bits mlog_and_bits, mlog_not_bits; #define mlog_errno(st) do { \ int _st = (st); \ if (_st != -ERESTARTSYS && _st != -EINTR && \ - _st != AOP_TRUNCATED_PAGE && _st != -ENOSPC) \ + _st != AOP_TRUNCATED_PAGE && _st != -ENOSPC && \ + _st != -EDQUOT) \ mlog(ML_ERROR, "status = %lld\n", (long long)_st); \ } while (0) diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c index bb240647ca5..441c84e169e 100644 --- a/fs/ocfs2/cluster/nodemanager.c +++ b/fs/ocfs2/cluster/nodemanager.c @@ -29,7 +29,6 @@ #include "heartbeat.h" #include "masklog.h" #include "sys.h" -#include "ver.h" /* for now we operate under the assertion that there can be only one * cluster active at a time. Changing this will require trickling @@ -945,8 +944,6 @@ static int __init init_o2nm(void) { int ret = -1; - cluster_print_version(); - ret = o2hb_init(); if (ret) goto out; @@ -984,6 +981,7 @@ out: MODULE_AUTHOR("Oracle"); MODULE_LICENSE("GPL"); +MODULE_DESCRIPTION("OCFS2 cluster management"); module_init(init_o2nm) module_exit(exit_o2nm) diff --git a/fs/ocfs2/cluster/quorum.c b/fs/ocfs2/cluster/quorum.c index 8f9cea1597a..1ec141e758d 100644 --- a/fs/ocfs2/cluster/quorum.c +++ b/fs/ocfs2/cluster/quorum.c @@ -264,7 +264,7 @@ void o2quo_hb_still_up(u8 node) /* This is analogous to hb_up. as a node's connection comes up we delay the * quorum decision until we see it heartbeating. the hold will be droped in * hb_up or hb_down. it might be perpetuated by con_err until hb_down. if - * it's already heartbeating we we might be dropping a hold that conn_up got. + * it's already heartbeating we might be dropping a hold that conn_up got. * */ void o2quo_conn_up(u8 node) { @@ -327,5 +327,5 @@ void o2quo_exit(void) { struct o2quo_state *qs = &o2quo_state; - flush_work_sync(&qs->qs_work); + flush_work(&qs->qs_work); } diff --git a/fs/ocfs2/cluster/sys.c b/fs/ocfs2/cluster/sys.c index a4b07730b2e..b7f57271d49 100644 --- a/fs/ocfs2/cluster/sys.c +++ b/fs/ocfs2/cluster/sys.c @@ -41,7 +41,7 @@ static ssize_t version_show(struct kobject *kobj, struct kobj_attribute *attr, return snprintf(buf, PAGE_SIZE, "%u\n", O2NM_API_VERSION); } static struct kobj_attribute attr_version = - __ATTR(interface_revision, S_IFREG | S_IRUGO, version_show, NULL); + __ATTR(interface_revision, S_IRUGO, version_show, NULL); static struct attribute *o2cb_attrs[] = { &attr_version.attr, diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index 044e7b58d31..681691bc233 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -108,7 +108,7 @@ static struct rb_root o2net_handler_tree = RB_ROOT; static struct o2net_node o2net_nodes[O2NM_MAX_NODES]; /* XXX someday we'll need better accounting */ -static struct socket *o2net_listen_sock = NULL; +static struct socket *o2net_listen_sock; /* * listen work is only queued by the listening socket callbacks on the @@ -137,7 +137,7 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] = static void o2net_sc_connect_completed(struct work_struct *work); static void o2net_rx_until_empty(struct work_struct *work); static void o2net_shutdown_sc(struct work_struct *work); -static void o2net_listen_data_ready(struct sock *sk, int bytes); +static void o2net_listen_data_ready(struct sock *sk); static void o2net_sc_send_keep_req(struct work_struct *work); static void o2net_idle_timer(unsigned long data); static void o2net_sc_postpone_idle(struct o2net_sock_container *sc); @@ -262,17 +262,17 @@ static void o2net_update_recv_stats(struct o2net_sock_container *sc) #endif /* CONFIG_OCFS2_FS_STATS */ -static inline int o2net_reconnect_delay(void) +static inline unsigned int o2net_reconnect_delay(void) { return o2nm_single_cluster->cl_reconnect_delay_ms; } -static inline int o2net_keepalive_delay(void) +static inline unsigned int o2net_keepalive_delay(void) { return o2nm_single_cluster->cl_keepalive_delay_ms; } -static inline int o2net_idle_timeout(void) +static inline unsigned int o2net_idle_timeout(void) { return o2nm_single_cluster->cl_idle_timeout_ms; } @@ -304,28 +304,22 @@ static u8 o2net_num_from_nn(struct o2net_node *nn) static int o2net_prep_nsw(struct o2net_node *nn, struct o2net_status_wait *nsw) { - int ret = 0; - - do { - if (!idr_pre_get(&nn->nn_status_idr, GFP_ATOMIC)) { - ret = -EAGAIN; - break; - } - spin_lock(&nn->nn_lock); - ret = idr_get_new(&nn->nn_status_idr, nsw, &nsw->ns_id); - if (ret == 0) - list_add_tail(&nsw->ns_node_item, - &nn->nn_status_list); - spin_unlock(&nn->nn_lock); - } while (ret == -EAGAIN); + int ret; - if (ret == 0) { - init_waitqueue_head(&nsw->ns_wq); - nsw->ns_sys_status = O2NET_ERR_NONE; - nsw->ns_status = 0; + spin_lock(&nn->nn_lock); + ret = idr_alloc(&nn->nn_status_idr, nsw, 0, 0, GFP_ATOMIC); + if (ret >= 0) { + nsw->ns_id = ret; + list_add_tail(&nsw->ns_node_item, &nn->nn_status_list); } + spin_unlock(&nn->nn_lock); + if (ret < 0) + return ret; - return ret; + init_waitqueue_head(&nsw->ns_wq); + nsw->ns_sys_status = O2NET_ERR_NONE; + nsw->ns_status = 0; + return 0; } static void o2net_complete_nsw_locked(struct o2net_node *nn, @@ -412,6 +406,9 @@ static void sc_kref_release(struct kref *kref) sc->sc_node = NULL; o2net_debug_del_sc(sc); + + if (sc->sc_page) + __free_page(sc->sc_page); kfree(sc); } @@ -546,8 +543,9 @@ static void o2net_set_nn_state(struct o2net_node *nn, } if (was_valid && !valid) { - printk(KERN_NOTICE "o2net: No longer connected to " - SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc)); + if (old_sc) + printk(KERN_NOTICE "o2net: No longer connected to " + SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc)); o2net_complete_nodes_nsw(nn); } @@ -599,9 +597,9 @@ static void o2net_set_nn_state(struct o2net_node *nn, } /* see o2net_register_callbacks() */ -static void o2net_data_ready(struct sock *sk, int bytes) +static void o2net_data_ready(struct sock *sk) { - void (*ready)(struct sock *sk, int bytes); + void (*ready)(struct sock *sk); read_lock(&sk->sk_callback_lock); if (sk->sk_user_data) { @@ -615,7 +613,7 @@ static void o2net_data_ready(struct sock *sk, int bytes) } read_unlock(&sk->sk_callback_lock); - ready(sk, bytes); + ready(sk); } /* see o2net_register_callbacks() */ @@ -636,19 +634,19 @@ static void o2net_state_change(struct sock *sk) state_change = sc->sc_state_change; switch(sk->sk_state) { - /* ignore connecting sockets as they make progress */ - case TCP_SYN_SENT: - case TCP_SYN_RECV: - break; - case TCP_ESTABLISHED: - o2net_sc_queue_work(sc, &sc->sc_connect_work); - break; - default: - printk(KERN_INFO "o2net: Connection to " SC_NODEF_FMT - " shutdown, state %d\n", - SC_NODEF_ARGS(sc), sk->sk_state); - o2net_sc_queue_work(sc, &sc->sc_shutdown_work); - break; + /* ignore connecting sockets as they make progress */ + case TCP_SYN_SENT: + case TCP_SYN_RECV: + break; + case TCP_ESTABLISHED: + o2net_sc_queue_work(sc, &sc->sc_connect_work); + break; + default: + printk(KERN_INFO "o2net: Connection to " SC_NODEF_FMT + " shutdown, state %d\n", + SC_NODEF_ARGS(sc), sk->sk_state); + o2net_sc_queue_work(sc, &sc->sc_shutdown_work); + break; } out: read_unlock(&sk->sk_callback_lock); @@ -768,32 +766,32 @@ static struct o2net_msg_handler * o2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p, struct rb_node **ret_parent) { - struct rb_node **p = &o2net_handler_tree.rb_node; - struct rb_node *parent = NULL; + struct rb_node **p = &o2net_handler_tree.rb_node; + struct rb_node *parent = NULL; struct o2net_msg_handler *nmh, *ret = NULL; int cmp; - while (*p) { - parent = *p; - nmh = rb_entry(parent, struct o2net_msg_handler, nh_node); + while (*p) { + parent = *p; + nmh = rb_entry(parent, struct o2net_msg_handler, nh_node); cmp = o2net_handler_cmp(nmh, msg_type, key); - if (cmp < 0) - p = &(*p)->rb_left; - else if (cmp > 0) - p = &(*p)->rb_right; - else { + if (cmp < 0) + p = &(*p)->rb_left; + else if (cmp > 0) + p = &(*p)->rb_right; + else { ret = nmh; - break; + break; } - } + } - if (ret_p != NULL) - *ret_p = p; - if (ret_parent != NULL) - *ret_parent = parent; + if (ret_p != NULL) + *ret_p = p; + if (ret_parent != NULL) + *ret_parent = parent; - return ret; + return ret; } static void o2net_handler_kref_release(struct kref *kref) @@ -870,7 +868,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, /* we've had some trouble with handlers seemingly vanishing. */ mlog_bug_on_msg(o2net_handler_tree_lookup(msg_type, key, &p, &parent) == NULL, - "couldn't find handler we *just* registerd " + "couldn't find handler we *just* registered " "for type %u key %08x\n", msg_type, key); } write_unlock(&o2net_handler_lock); @@ -918,57 +916,30 @@ static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key) static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len) { - int ret; - mm_segment_t oldfs; - struct kvec vec = { - .iov_len = len, - .iov_base = data, - }; - struct msghdr msg = { - .msg_iovlen = 1, - .msg_iov = (struct iovec *)&vec, - .msg_flags = MSG_DONTWAIT, - }; - - oldfs = get_fs(); - set_fs(get_ds()); - ret = sock_recvmsg(sock, &msg, len, msg.msg_flags); - set_fs(oldfs); - - return ret; + struct kvec vec = { .iov_len = len, .iov_base = data, }; + struct msghdr msg = { .msg_flags = MSG_DONTWAIT, }; + return kernel_recvmsg(sock, &msg, &vec, 1, len, msg.msg_flags); } static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec, size_t veclen, size_t total) { int ret; - mm_segment_t oldfs; - struct msghdr msg = { - .msg_iov = (struct iovec *)vec, - .msg_iovlen = veclen, - }; + struct msghdr msg; if (sock == NULL) { ret = -EINVAL; goto out; } - oldfs = get_fs(); - set_fs(get_ds()); - ret = sock_sendmsg(sock, &msg, total); - set_fs(oldfs); - if (ret != total) { - mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, - total); - if (ret >= 0) - ret = -EPIPE; /* should be smarter, I bet */ - goto out; - } - - ret = 0; + ret = kernel_sendmsg(sock, &msg, vec, veclen, total); + if (likely(ret == total)) + return 0; + mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, total); + if (ret >= 0) + ret = -EPIPE; /* should be smarter, I bet */ out: - if (ret < 0) - mlog(0, "returning error: %d\n", ret); + mlog(0, "returning error: %d\n", ret); return ret; } @@ -1165,10 +1136,8 @@ out: o2net_debug_del_nst(&nst); /* must be before dropping sc and node */ if (sc) sc_put(sc); - if (vec) - kfree(vec); - if (msg) - kfree(msg); + kfree(vec); + kfree(msg); o2net_complete_nsw(nn, &nsw, 0, 0, 0); return ret; } @@ -1700,13 +1669,12 @@ static void o2net_start_connect(struct work_struct *work) ret = 0; out: - if (ret) { + if (ret && sc) { printk(KERN_NOTICE "o2net: Connect attempt to " SC_NODEF_FMT " failed with errno %d\n", SC_NODEF_ARGS(sc), ret); /* 0 err so that another will be queued and attempted * from set_nn_state */ - if (sc) - o2net_ensure_shutdown(nn, sc, 0); + o2net_ensure_shutdown(nn, sc, 0); } if (sc) sc_put(sc); @@ -1831,7 +1799,7 @@ int o2net_register_hb_callbacks(void) /* ------------------------------------------------------------ */ -static int o2net_accept_one(struct socket *sock) +static int o2net_accept_one(struct socket *sock, int *more) { int ret, slen; struct sockaddr_in sin; @@ -1842,6 +1810,7 @@ static int o2net_accept_one(struct socket *sock) struct o2net_node *nn; BUG_ON(sock == NULL); + *more = 0; ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type, sock->sk->sk_protocol, &new_sock); if (ret) @@ -1853,6 +1822,7 @@ static int o2net_accept_one(struct socket *sock) if (ret < 0) goto out; + *more = 1; new_sock->sk->sk_allocation = GFP_ATOMIC; ret = o2net_set_nodelay(new_sock); @@ -1878,12 +1848,16 @@ static int o2net_accept_one(struct socket *sock) if (o2nm_this_node() >= node->nd_num) { local_node = o2nm_get_node_by_num(o2nm_this_node()); - printk(KERN_NOTICE "o2net: Unexpected connect attempt seen " - "at node '%s' (%u, %pI4:%d) from node '%s' (%u, " - "%pI4:%d)\n", local_node->nd_name, local_node->nd_num, - &(local_node->nd_ipv4_address), - ntohs(local_node->nd_ipv4_port), node->nd_name, - node->nd_num, &sin.sin_addr.s_addr, ntohs(sin.sin_port)); + if (local_node) + printk(KERN_NOTICE "o2net: Unexpected connect attempt " + "seen at node '%s' (%u, %pI4:%d) from " + "node '%s' (%u, %pI4:%d)\n", + local_node->nd_name, local_node->nd_num, + &(local_node->nd_ipv4_address), + ntohs(local_node->nd_ipv4_port), + node->nd_name, + node->nd_num, &sin.sin_addr.s_addr, + ntohs(sin.sin_port)); ret = -EINVAL; goto out; } @@ -1947,16 +1921,41 @@ out: return ret; } +/* + * This function is invoked in response to one or more + * pending accepts at softIRQ level. We must drain the + * entire que before returning. + */ + static void o2net_accept_many(struct work_struct *work) { struct socket *sock = o2net_listen_sock; - while (o2net_accept_one(sock) == 0) + int more; + int err; + + /* + * It is critical to note that due to interrupt moderation + * at the network driver level, we can't assume to get a + * softIRQ for every single conn since tcp SYN packets + * can arrive back-to-back, and therefore many pending + * accepts may result in just 1 softIRQ. If we terminate + * the o2net_accept_one() loop upon seeing an err, what happens + * to the rest of the conns in the queue? If no new SYN + * arrives for hours, no softIRQ will be delivered, + * and the connections will just sit in the queue. + */ + + for (;;) { + err = o2net_accept_one(sock, &more); + if (!more) + break; cond_resched(); + } } -static void o2net_listen_data_ready(struct sock *sk, int bytes) +static void o2net_listen_data_ready(struct sock *sk) { - void (*ready)(struct sock *sk, int bytes); + void (*ready)(struct sock *sk); read_lock(&sk->sk_callback_lock); ready = sk->sk_user_data; @@ -1965,18 +1964,29 @@ static void o2net_listen_data_ready(struct sock *sk, int bytes) goto out; } - /* ->sk_data_ready is also called for a newly established child socket - * before it has been accepted and the acceptor has set up their - * data_ready.. we only want to queue listen work for our listening - * socket */ + /* This callback may called twice when a new connection + * is being established as a child socket inherits everything + * from a parent LISTEN socket, including the data_ready cb of + * the parent. This leads to a hazard. In o2net_accept_one() + * we are still initializing the child socket but have not + * changed the inherited data_ready callback yet when + * data starts arriving. + * We avoid this hazard by checking the state. + * For the listening socket, the state will be TCP_LISTEN; for the new + * socket, will be TCP_ESTABLISHED. Also, in this case, + * sk->sk_user_data is not a valid function pointer. + */ + if (sk->sk_state == TCP_LISTEN) { - mlog(ML_TCP, "bytes: %d\n", bytes); queue_work(o2net_wq, &o2net_listen_work); + } else { + ready = NULL; } out: read_unlock(&sk->sk_callback_lock); - ready(sk, bytes); + if (ready != NULL) + ready(sk); } static int o2net_open_listening_sock(__be32 addr, __be16 port) @@ -2005,7 +2015,7 @@ static int o2net_open_listening_sock(__be32 addr, __be16 port) o2net_listen_sock = sock; INIT_WORK(&o2net_listen_work, o2net_accept_many); - sock->sk->sk_reuse = 1; + sock->sk->sk_reuse = SK_CAN_REUSE; ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); if (ret < 0) { printk(KERN_ERR "o2net: Error %d while binding socket at " diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h index 4cbcb65784a..dc024367110 100644 --- a/fs/ocfs2/cluster/tcp_internal.h +++ b/fs/ocfs2/cluster/tcp_internal.h @@ -165,7 +165,7 @@ struct o2net_sock_container { /* original handlers for the sockets */ void (*sc_state_change)(struct sock *sk); - void (*sc_data_ready)(struct sock *sk, int bytes); + void (*sc_data_ready)(struct sock *sk); u32 sc_msg_key; u16 sc_msg_type; diff --git a/fs/ocfs2/cluster/ver.c b/fs/ocfs2/cluster/ver.c deleted file mode 100644 index a56eee6abad..00000000000 --- a/fs/ocfs2/cluster/ver.c +++ /dev/null @@ -1,42 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; -*- - * vim: noexpandtab sw=8 ts=8 sts=0: - * - * ver.c - * - * version string - * - * Copyright (C) 2002, 2005 Oracle. All rights reserved. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 021110-1307, USA. - */ - -#include <linux/module.h> -#include <linux/kernel.h> - -#include "ver.h" - -#define CLUSTER_BUILD_VERSION "1.5.0" - -#define VERSION_STR "OCFS2 Node Manager " CLUSTER_BUILD_VERSION - -void cluster_print_version(void) -{ - printk(KERN_INFO "%s\n", VERSION_STR); -} - -MODULE_DESCRIPTION(VERSION_STR); - -MODULE_VERSION(CLUSTER_BUILD_VERSION); diff --git a/fs/ocfs2/cluster/ver.h b/fs/ocfs2/cluster/ver.h deleted file mode 100644 index 32554c3382c..00000000000 --- a/fs/ocfs2/cluster/ver.h +++ /dev/null @@ -1,31 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; -*- - * vim: noexpandtab sw=8 ts=8 sts=0: - * - * ver.h - * - * Function prototypes - * - * Copyright (C) 2005 Oracle. All rights reserved. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 021110-1307, USA. - */ - -#ifndef O2CLUSTER_VER_H -#define O2CLUSTER_VER_H - -void cluster_print_version(void); - -#endif /* O2CLUSTER_VER_H */ |
