diff options
Diffstat (limited to 'fs/ocfs2/stack_o2cb.c')
| -rw-r--r-- | fs/ocfs2/stack_o2cb.c | 124 |
1 files changed, 90 insertions, 34 deletions
diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c index e49c4105026..1724d43d3da 100644 --- a/fs/ocfs2/stack_o2cb.c +++ b/fs/ocfs2/stack_o2cb.c @@ -19,6 +19,7 @@ #include <linux/kernel.h> #include <linux/crc32.h> +#include <linux/slab.h> #include <linux/module.h> /* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */ @@ -27,6 +28,7 @@ #include "cluster/masklog.h" #include "cluster/nodemanager.h" #include "cluster/heartbeat.h" +#include "cluster/tcp.h" #include "stackglue.h" @@ -161,24 +163,23 @@ static int dlm_status_to_errno(enum dlm_status status) static void o2dlm_lock_ast_wrapper(void *astarg) { - BUG_ON(o2cb_stack.sp_proto == NULL); + struct ocfs2_dlm_lksb *lksb = astarg; - o2cb_stack.sp_proto->lp_lock_ast(astarg); + lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); } static void o2dlm_blocking_ast_wrapper(void *astarg, int level) { - BUG_ON(o2cb_stack.sp_proto == NULL); + struct ocfs2_dlm_lksb *lksb = astarg; - o2cb_stack.sp_proto->lp_blocking_ast(astarg, level); + lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); } static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status) { + struct ocfs2_dlm_lksb *lksb = astarg; int error = dlm_status_to_errno(status); - BUG_ON(o2cb_stack.sp_proto == NULL); - /* * In o2dlm, you can get both the lock_ast() for the lock being * granted and the unlock_ast() for the CANCEL failing. A @@ -193,16 +194,15 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status) if (status == DLM_CANCELGRANT) return; - o2cb_stack.sp_proto->lp_unlock_ast(astarg, error); + lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, error); } static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn, int mode, - union ocfs2_dlm_lksb *lksb, + struct ocfs2_dlm_lksb *lksb, u32 flags, void *name, - unsigned int namelen, - void *astarg) + unsigned int namelen) { enum dlm_status status; int o2dlm_mode = mode_to_o2dlm(mode); @@ -211,28 +211,27 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn, status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm, o2dlm_flags, name, namelen, - o2dlm_lock_ast_wrapper, astarg, + o2dlm_lock_ast_wrapper, lksb, o2dlm_blocking_ast_wrapper); ret = dlm_status_to_errno(status); return ret; } static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn, - union ocfs2_dlm_lksb *lksb, - u32 flags, - void *astarg) + struct ocfs2_dlm_lksb *lksb, + u32 flags) { enum dlm_status status; int o2dlm_flags = flags_to_o2dlm(flags); int ret; status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm, - o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg); + o2dlm_flags, o2dlm_unlock_ast_wrapper, lksb); ret = dlm_status_to_errno(status); return ret; } -static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb) +static int o2cb_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) { return dlm_status_to_errno(lksb->lksb_o2dlm.status); } @@ -242,22 +241,77 @@ static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb) * contents, it will zero out the LVB. Thus the caller can always trust * the contents. */ -static int o2cb_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb) +static int o2cb_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) { return 1; } -static void *o2cb_dlm_lvb(union ocfs2_dlm_lksb *lksb) +static void *o2cb_dlm_lvb(struct ocfs2_dlm_lksb *lksb) { return (void *)(lksb->lksb_o2dlm.lvb); } -static void o2cb_dump_lksb(union ocfs2_dlm_lksb *lksb) +static void o2cb_dump_lksb(struct ocfs2_dlm_lksb *lksb) { dlm_print_one_lock(lksb->lksb_o2dlm.lockid); } /* + * Check if this node is heartbeating and is connected to all other + * heartbeating nodes. + */ +static int o2cb_cluster_check(void) +{ + u8 node_num; + int i; + unsigned long hbmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; + unsigned long netmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; + + node_num = o2nm_this_node(); + if (node_num == O2NM_MAX_NODES) { + printk(KERN_ERR "o2cb: This node has not been configured.\n"); + return -EINVAL; + } + + /* + * o2dlm expects o2net sockets to be created. If not, then + * dlm_join_domain() fails with a stack of errors which are both cryptic + * and incomplete. The idea here is to detect upfront whether we have + * managed to connect to all nodes or not. If not, then list the nodes + * to allow the user to check the configuration (incorrect IP, firewall, + * etc.) Yes, this is racy. But its not the end of the world. + */ +#define O2CB_MAP_STABILIZE_COUNT 60 + for (i = 0; i < O2CB_MAP_STABILIZE_COUNT; ++i) { + o2hb_fill_node_map(hbmap, sizeof(hbmap)); + if (!test_bit(node_num, hbmap)) { + printk(KERN_ERR "o2cb: %s heartbeat has not been " + "started.\n", (o2hb_global_heartbeat_active() ? + "Global" : "Local")); + return -EINVAL; + } + o2net_fill_node_map(netmap, sizeof(netmap)); + /* Force set the current node to allow easy compare */ + set_bit(node_num, netmap); + if (!memcmp(hbmap, netmap, sizeof(hbmap))) + return 0; + if (i < O2CB_MAP_STABILIZE_COUNT) + msleep(1000); + } + + printk(KERN_ERR "o2cb: This node could not connect to nodes:"); + i = -1; + while ((i = find_next_bit(hbmap, O2NM_MAX_NODES, + i + 1)) < O2NM_MAX_NODES) { + if (!test_bit(i, netmap)) + printk(" %u", i); + } + printk(".\n"); + + return -ENOTCONN; +} + +/* * Called from the dlm when it's about to evict a node. This is how the * classic stack signals node death. */ @@ -265,8 +319,8 @@ static void o2dlm_eviction_cb(int node_num, void *data) { struct ocfs2_cluster_connection *conn = data; - mlog(ML_NOTICE, "o2dlm has evicted node %d from group %.*s\n", - node_num, conn->cc_namelen, conn->cc_name); + printk(KERN_NOTICE "o2cb: o2dlm has evicted node %d from domain %.*s\n", + node_num, conn->cc_namelen, conn->cc_name); conn->cc_recovery_handler(node_num, conn->cc_recovery_data); } @@ -277,15 +331,16 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn) u32 dlm_key; struct dlm_ctxt *dlm; struct o2dlm_private *priv; - struct dlm_protocol_version dlm_version; + struct dlm_protocol_version fs_version; BUG_ON(conn == NULL); - BUG_ON(o2cb_stack.sp_proto == NULL); + BUG_ON(conn->cc_proto == NULL); - /* for now we only have one cluster/node, make sure we see it - * in the heartbeat universe */ - if (!o2hb_check_local_node_heartbeating()) { - rc = -EINVAL; + /* Ensure cluster stack is up and all nodes are connected */ + rc = o2cb_cluster_check(); + if (rc) { + printk(KERN_ERR "o2cb: Cluster check failed. Fix errors " + "before retrying.\n"); goto out; } @@ -304,24 +359,24 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn) /* used by the dlm code to make message headers unique, each * node in this domain must agree on this. */ dlm_key = crc32_le(0, conn->cc_name, conn->cc_namelen); - dlm_version.pv_major = conn->cc_version.pv_major; - dlm_version.pv_minor = conn->cc_version.pv_minor; + fs_version.pv_major = conn->cc_version.pv_major; + fs_version.pv_minor = conn->cc_version.pv_minor; - dlm = dlm_register_domain(conn->cc_name, dlm_key, &dlm_version); + dlm = dlm_register_domain(conn->cc_name, dlm_key, &fs_version); if (IS_ERR(dlm)) { rc = PTR_ERR(dlm); mlog_errno(rc); goto out_free; } - conn->cc_version.pv_major = dlm_version.pv_major; - conn->cc_version.pv_minor = dlm_version.pv_minor; + conn->cc_version.pv_major = fs_version.pv_major; + conn->cc_version.pv_minor = fs_version.pv_minor; conn->cc_lockspace = dlm; dlm_register_eviction_cb(dlm, &priv->op_eviction_cb); out_free: - if (rc && conn->cc_private) + if (rc) kfree(conn->cc_private); out: @@ -343,7 +398,8 @@ static int o2cb_cluster_disconnect(struct ocfs2_cluster_connection *conn) return 0; } -static int o2cb_cluster_this_node(unsigned int *node) +static int o2cb_cluster_this_node(struct ocfs2_cluster_connection *conn, + unsigned int *node) { int node_num; |
