diff options
Diffstat (limited to 'fs/ocfs2/stack_user.c')
| -rw-r--r-- | fs/ocfs2/stack_user.c | 310 | 
1 files changed, 269 insertions, 41 deletions
diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c index 252e7c82f92..13a8537d8e8 100644 --- a/fs/ocfs2/stack_user.c +++ b/fs/ocfs2/stack_user.c @@ -23,6 +23,7 @@  #include <linux/mutex.h>  #include <linux/slab.h>  #include <linux/reboot.h> +#include <linux/sched.h>  #include <asm/uaccess.h>  #include "stackglue.h" @@ -102,6 +103,12 @@  #define OCFS2_TEXT_UUID_LEN			32  #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2  #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8 +#define VERSION_LOCK				"version_lock" + +enum ocfs2_connection_type { +	WITH_CONTROLD, +	NO_CONTROLD +};  /*   * ocfs2_live_connection is refcounted because the filesystem and @@ -110,6 +117,13 @@  struct ocfs2_live_connection {  	struct list_head		oc_list;  	struct ocfs2_cluster_connection	*oc_conn; +	enum ocfs2_connection_type	oc_type; +	atomic_t                        oc_this_node; +	int                             oc_our_slot; +	struct dlm_lksb                 oc_version_lksb; +	char                            oc_lvb[DLM_LVB_LEN]; +	struct completion               oc_sync_wait; +	wait_queue_head_t		oc_wait;  };  struct ocfs2_control_private { @@ -190,7 +204,7 @@ static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)  			return c;  	} -	return c; +	return NULL;  }  /* @@ -198,20 +212,15 @@ static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)   * mount path.  Since the VFS prevents multiple calls to   * fill_super(), we can't get dupes here.   */ -static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn, -				     struct ocfs2_live_connection **c_ret) +static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn, +				     struct ocfs2_live_connection *c)  {  	int rc = 0; -	struct ocfs2_live_connection *c; - -	c = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL); -	if (!c) -		return -ENOMEM;  	mutex_lock(&ocfs2_control_lock);  	c->oc_conn = conn; -	if (atomic_read(&ocfs2_control_opened)) +	if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))  		list_add(&c->oc_list, &ocfs2_live_connection_list);  	else {  		printk(KERN_ERR @@ -220,12 +229,6 @@ static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,  	}  	mutex_unlock(&ocfs2_control_lock); - -	if (!rc) -		*c_ret = c; -	else -		kfree(c); -  	return rc;  } @@ -799,18 +802,251 @@ static int fs_protocol_compare(struct ocfs2_protocol_version *existing,  	return 0;  } +static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver) +{ +	struct ocfs2_protocol_version *pv = +		(struct ocfs2_protocol_version *)lvb; +	/* +	 * ocfs2_protocol_version has two u8 variables, so we don't +	 * need any endian conversion. +	 */ +	ver->pv_major = pv->pv_major; +	ver->pv_minor = pv->pv_minor; +} + +static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb) +{ +	struct ocfs2_protocol_version *pv = +		(struct ocfs2_protocol_version *)lvb; +	/* +	 * ocfs2_protocol_version has two u8 variables, so we don't +	 * need any endian conversion. +	 */ +	pv->pv_major = ver->pv_major; +	pv->pv_minor = ver->pv_minor; +} + +static void sync_wait_cb(void *arg) +{ +	struct ocfs2_cluster_connection *conn = arg; +	struct ocfs2_live_connection *lc = conn->cc_private; +	complete(&lc->oc_sync_wait); +} + +static int sync_unlock(struct ocfs2_cluster_connection *conn, +		struct dlm_lksb *lksb, char *name) +{ +	int error; +	struct ocfs2_live_connection *lc = conn->cc_private; + +	error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn); +	if (error) { +		printk(KERN_ERR "%s lkid %x error %d\n", +				name, lksb->sb_lkid, error); +		return error; +	} + +	wait_for_completion(&lc->oc_sync_wait); + +	if (lksb->sb_status != -DLM_EUNLOCK) { +		printk(KERN_ERR "%s lkid %x status %d\n", +				name, lksb->sb_lkid, lksb->sb_status); +		return -1; +	} +	return 0; +} + +static int sync_lock(struct ocfs2_cluster_connection *conn, +		int mode, uint32_t flags, +		struct dlm_lksb *lksb, char *name) +{ +	int error, status; +	struct ocfs2_live_connection *lc = conn->cc_private; + +	error = dlm_lock(conn->cc_lockspace, mode, lksb, flags, +			name, strlen(name), +			0, sync_wait_cb, conn, NULL); +	if (error) { +		printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n", +				name, lksb->sb_lkid, flags, mode, error); +		return error; +	} + +	wait_for_completion(&lc->oc_sync_wait); + +	status = lksb->sb_status; + +	if (status && status != -EAGAIN) { +		printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n", +				name, lksb->sb_lkid, flags, mode, status); +	} + +	return status; +} + + +static int version_lock(struct ocfs2_cluster_connection *conn, int mode, +		int flags) +{ +	struct ocfs2_live_connection *lc = conn->cc_private; +	return sync_lock(conn, mode, flags, +			&lc->oc_version_lksb, VERSION_LOCK); +} + +static int version_unlock(struct ocfs2_cluster_connection *conn) +{ +	struct ocfs2_live_connection *lc = conn->cc_private; +	return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); +} + +/* get_protocol_version() + * + * To exchange ocfs2 versioning, we use the LVB of the version dlm lock. + * The algorithm is: + * 1. Attempt to take the lock in EX mode (non-blocking). + * 2. If successful (which means it is the first mount), write the + *    version number and downconvert to PR lock. + * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after + *    taking the PR lock. + */ + +static int get_protocol_version(struct ocfs2_cluster_connection *conn) +{ +	int ret; +	struct ocfs2_live_connection *lc = conn->cc_private; +	struct ocfs2_protocol_version pv; + +	running_proto.pv_major = +		ocfs2_user_plugin.sp_max_proto.pv_major; +	running_proto.pv_minor = +		ocfs2_user_plugin.sp_max_proto.pv_minor; + +	lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb; +	ret = version_lock(conn, DLM_LOCK_EX, +			DLM_LKF_VALBLK|DLM_LKF_NOQUEUE); +	if (!ret) { +		conn->cc_version.pv_major = running_proto.pv_major; +		conn->cc_version.pv_minor = running_proto.pv_minor; +		version_to_lvb(&running_proto, lc->oc_lvb); +		version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK); +	} else if (ret == -EAGAIN) { +		ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK); +		if (ret) +			goto out; +		lvb_to_version(lc->oc_lvb, &pv); + +		if ((pv.pv_major != running_proto.pv_major) || +				(pv.pv_minor > running_proto.pv_minor)) { +			ret = -EINVAL; +			goto out; +		} + +		conn->cc_version.pv_major = pv.pv_major; +		conn->cc_version.pv_minor = pv.pv_minor; +	} +out: +	return ret; +} + +static void user_recover_prep(void *arg) +{ +} + +static void user_recover_slot(void *arg, struct dlm_slot *slot) +{ +	struct ocfs2_cluster_connection *conn = arg; +	printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n", +			slot->nodeid, slot->slot); +	conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data); + +} + +static void user_recover_done(void *arg, struct dlm_slot *slots, +		int num_slots, int our_slot, +		uint32_t generation) +{ +	struct ocfs2_cluster_connection *conn = arg; +	struct ocfs2_live_connection *lc = conn->cc_private; +	int i; + +	for (i = 0; i < num_slots; i++) +		if (slots[i].slot == our_slot) { +			atomic_set(&lc->oc_this_node, slots[i].nodeid); +			break; +		} + +	lc->oc_our_slot = our_slot; +	wake_up(&lc->oc_wait); +} + +static const struct dlm_lockspace_ops ocfs2_ls_ops = { +	.recover_prep = user_recover_prep, +	.recover_slot = user_recover_slot, +	.recover_done = user_recover_done, +}; + +static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) +{ +	version_unlock(conn); +	dlm_release_lockspace(conn->cc_lockspace, 2); +	conn->cc_lockspace = NULL; +	ocfs2_live_connection_drop(conn->cc_private); +	conn->cc_private = NULL; +	return 0; +} +  static int user_cluster_connect(struct ocfs2_cluster_connection *conn)  {  	dlm_lockspace_t *fsdlm; -	struct ocfs2_live_connection *uninitialized_var(control); -	int rc = 0; +	struct ocfs2_live_connection *lc; +	int rc, ops_rv;  	BUG_ON(conn == NULL); -	rc = ocfs2_live_connection_new(conn, &control); +	lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL); +	if (!lc) { +		rc = -ENOMEM; +		goto out; +	} + +	init_waitqueue_head(&lc->oc_wait); +	init_completion(&lc->oc_sync_wait); +	atomic_set(&lc->oc_this_node, 0); +	conn->cc_private = lc; +	lc->oc_type = NO_CONTROLD; + +	rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name, +			       DLM_LSFL_FS, DLM_LVB_LEN, +			       &ocfs2_ls_ops, conn, &ops_rv, &fsdlm); +	if (rc) +		goto out; + +	if (ops_rv == -EOPNOTSUPP) { +		lc->oc_type = WITH_CONTROLD; +		printk(KERN_NOTICE "ocfs2: You seem to be using an older " +				"version of dlm_controld and/or ocfs2-tools." +				" Please consider upgrading.\n"); +	} else if (ops_rv) { +		rc = ops_rv; +		goto out; +	} +	conn->cc_lockspace = fsdlm; + +	rc = ocfs2_live_connection_attach(conn, lc);  	if (rc)  		goto out; +	if (lc->oc_type == NO_CONTROLD) { +		rc = get_protocol_version(conn); +		if (rc) { +			printk(KERN_ERR "ocfs2: Could not determine" +					" locking version\n"); +			user_cluster_disconnect(conn); +			goto out; +		} +		wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0)); +	} +  	/*  	 * running_proto must have been set before we allowed any mounts  	 * to proceed. @@ -818,42 +1054,34 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)  	if (fs_protocol_compare(&running_proto, &conn->cc_version)) {  		printk(KERN_ERR  		       "Unable to mount with fs locking protocol version " -		       "%u.%u because the userspace control daemon has " -		       "negotiated %u.%u\n", +		       "%u.%u because negotiated protocol is %u.%u\n",  		       conn->cc_version.pv_major, conn->cc_version.pv_minor,  		       running_proto.pv_major, running_proto.pv_minor);  		rc = -EPROTO; -		ocfs2_live_connection_drop(control); -		goto out; -	} - -	rc = dlm_new_lockspace(conn->cc_name, strlen(conn->cc_name), -			       &fsdlm, DLM_LSFL_FS, DLM_LVB_LEN); -	if (rc) { -		ocfs2_live_connection_drop(control); -		goto out; +		ocfs2_live_connection_drop(lc); +		lc = NULL;  	} -	conn->cc_private = control; -	conn->cc_lockspace = fsdlm;  out: +	if (rc && lc) +		kfree(lc);  	return rc;  } -static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) -{ -	dlm_release_lockspace(conn->cc_lockspace, 2); -	conn->cc_lockspace = NULL; -	ocfs2_live_connection_drop(conn->cc_private); -	conn->cc_private = NULL; -	return 0; -} -static int user_cluster_this_node(unsigned int *this_node) +static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, +				  unsigned int *this_node)  {  	int rc; +	struct ocfs2_live_connection *lc = conn->cc_private; + +	if (lc->oc_type == WITH_CONTROLD) +		rc = ocfs2_control_get_this_node(); +	else if (lc->oc_type == NO_CONTROLD) +		rc = atomic_read(&lc->oc_this_node); +	else +		rc = -EINVAL; -	rc = ocfs2_control_get_this_node();  	if (rc < 0)  		return rc;  | 
