aboutsummaryrefslogtreecommitdiff
path: root/fs/ocfs2/stack_user.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/stack_user.c')
-rw-r--r--fs/ocfs2/stack_user.c364
1 files changed, 290 insertions, 74 deletions
diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index da78a2a334f..13a8537d8e8 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -21,11 +21,11 @@
#include <linux/fs.h>
#include <linux/miscdevice.h>
#include <linux/mutex.h>
-#include <linux/smp_lock.h>
+#include <linux/slab.h>
#include <linux/reboot.h>
+#include <linux/sched.h>
#include <asm/uaccess.h>
-#include "ocfs2.h" /* For struct ocfs2_lock_res */
#include "stackglue.h"
#include <linux/dlm_plock.h>
@@ -63,8 +63,8 @@
* negotiated by the client. The client negotiates based on the maximum
* version advertised in /sys/fs/ocfs2/max_locking_protocol. The major
* number from the "SETV" message must match
- * ocfs2_user_plugin.sp_proto->lp_max_version.pv_major, and the minor number
- * must be less than or equal to ...->lp_max_version.pv_minor.
+ * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
+ * must be less than or equal to ...sp_max_version.pv_minor.
*
* Once this information has been set, mounts will be allowed. From this
* point on, the "DOWN" message can be sent for node down notification.
@@ -103,6 +103,12 @@
#define OCFS2_TEXT_UUID_LEN 32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8
+#define VERSION_LOCK "version_lock"
+
+enum ocfs2_connection_type {
+ WITH_CONTROLD,
+ NO_CONTROLD
+};
/*
* ocfs2_live_connection is refcounted because the filesystem and
@@ -111,6 +117,13 @@
struct ocfs2_live_connection {
struct list_head oc_list;
struct ocfs2_cluster_connection *oc_conn;
+ enum ocfs2_connection_type oc_type;
+ atomic_t oc_this_node;
+ int oc_our_slot;
+ struct dlm_lksb oc_version_lksb;
+ char oc_lvb[DLM_LVB_LEN];
+ struct completion oc_sync_wait;
+ wait_queue_head_t oc_wait;
};
struct ocfs2_control_private {
@@ -191,7 +204,7 @@ static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
return c;
}
- return c;
+ return NULL;
}
/*
@@ -199,20 +212,15 @@ static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
* mount path. Since the VFS prevents multiple calls to
* fill_super(), we can't get dupes here.
*/
-static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
- struct ocfs2_live_connection **c_ret)
+static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
+ struct ocfs2_live_connection *c)
{
int rc = 0;
- struct ocfs2_live_connection *c;
-
- c = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
- if (!c)
- return -ENOMEM;
mutex_lock(&ocfs2_control_lock);
c->oc_conn = conn;
- if (atomic_read(&ocfs2_control_opened))
+ if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
list_add(&c->oc_list, &ocfs2_live_connection_list);
else {
printk(KERN_ERR
@@ -221,12 +229,6 @@ static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
}
mutex_unlock(&ocfs2_control_lock);
-
- if (!rc)
- *c_ret = c;
- else
- kfree(c);
-
return rc;
}
@@ -401,7 +403,7 @@ static int ocfs2_control_do_setversion_msg(struct file *file,
char *ptr = NULL;
struct ocfs2_control_private *p = file->private_data;
struct ocfs2_protocol_version *max =
- &ocfs2_user_plugin.sp_proto->lp_max_version;
+ &ocfs2_user_plugin.sp_max_proto;
if (ocfs2_control_get_handshake_state(file) !=
OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
@@ -612,12 +614,10 @@ static int ocfs2_control_open(struct inode *inode, struct file *file)
return -ENOMEM;
p->op_this_node = -1;
- lock_kernel();
mutex_lock(&ocfs2_control_lock);
file->private_data = p;
list_add(&p->op_list, &ocfs2_control_private_list);
mutex_unlock(&ocfs2_control_lock);
- unlock_kernel();
return 0;
}
@@ -628,6 +628,7 @@ static const struct file_operations ocfs2_control_fops = {
.read = ocfs2_control_read,
.write = ocfs2_control_write,
.owner = THIS_MODULE,
+ .llseek = default_llseek,
};
static struct miscdevice ocfs2_control_device = {
@@ -664,18 +665,10 @@ static void ocfs2_control_exit(void)
-rc);
}
-static struct dlm_lksb *fsdlm_astarg_to_lksb(void *astarg)
-{
- struct ocfs2_lock_res *res = astarg;
- return &res->l_lksb.lksb_fsdlm;
-}
-
static void fsdlm_lock_ast_wrapper(void *astarg)
{
- struct dlm_lksb *lksb = fsdlm_astarg_to_lksb(astarg);
- int status = lksb->sb_status;
-
- BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
+ struct ocfs2_dlm_lksb *lksb = astarg;
+ int status = lksb->lksb_fsdlm.sb_status;
/*
* For now we're punting on the issue of other non-standard errors
@@ -688,25 +681,24 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
*/
if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
- ocfs2_user_plugin.sp_proto->lp_unlock_ast(astarg, 0);
+ lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
else
- ocfs2_user_plugin.sp_proto->lp_lock_ast(astarg);
+ lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
}
static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
{
- BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
+ struct ocfs2_dlm_lksb *lksb = astarg;
- ocfs2_user_plugin.sp_proto->lp_blocking_ast(astarg, level);
+ lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
}
static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
- union ocfs2_dlm_lksb *lksb,
+ struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
- unsigned int namelen,
- void *astarg)
+ unsigned int namelen)
{
int ret;
@@ -716,36 +708,35 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
flags|DLM_LKF_NODLCKWT, name, namelen, 0,
- fsdlm_lock_ast_wrapper, astarg,
+ fsdlm_lock_ast_wrapper, lksb,
fsdlm_blocking_ast_wrapper);
return ret;
}
static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
- union ocfs2_dlm_lksb *lksb,
- u32 flags,
- void *astarg)
+ struct ocfs2_dlm_lksb *lksb,
+ u32 flags)
{
int ret;
ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
- flags, &lksb->lksb_fsdlm, astarg);
+ flags, &lksb->lksb_fsdlm, lksb);
return ret;
}
-static int user_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
+static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
return lksb->lksb_fsdlm.sb_status;
}
-static int user_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
+static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
{
int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
return !invalid;
}
-static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
+static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
{
if (!lksb->lksb_fsdlm.sb_lvbptr)
lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
@@ -753,7 +744,7 @@ static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
}
-static void user_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
+static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
}
@@ -811,18 +802,251 @@ static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
return 0;
}
+static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
+{
+ struct ocfs2_protocol_version *pv =
+ (struct ocfs2_protocol_version *)lvb;
+ /*
+ * ocfs2_protocol_version has two u8 variables, so we don't
+ * need any endian conversion.
+ */
+ ver->pv_major = pv->pv_major;
+ ver->pv_minor = pv->pv_minor;
+}
+
+static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
+{
+ struct ocfs2_protocol_version *pv =
+ (struct ocfs2_protocol_version *)lvb;
+ /*
+ * ocfs2_protocol_version has two u8 variables, so we don't
+ * need any endian conversion.
+ */
+ pv->pv_major = ver->pv_major;
+ pv->pv_minor = ver->pv_minor;
+}
+
+static void sync_wait_cb(void *arg)
+{
+ struct ocfs2_cluster_connection *conn = arg;
+ struct ocfs2_live_connection *lc = conn->cc_private;
+ complete(&lc->oc_sync_wait);
+}
+
+static int sync_unlock(struct ocfs2_cluster_connection *conn,
+ struct dlm_lksb *lksb, char *name)
+{
+ int error;
+ struct ocfs2_live_connection *lc = conn->cc_private;
+
+ error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
+ if (error) {
+ printk(KERN_ERR "%s lkid %x error %d\n",
+ name, lksb->sb_lkid, error);
+ return error;
+ }
+
+ wait_for_completion(&lc->oc_sync_wait);
+
+ if (lksb->sb_status != -DLM_EUNLOCK) {
+ printk(KERN_ERR "%s lkid %x status %d\n",
+ name, lksb->sb_lkid, lksb->sb_status);
+ return -1;
+ }
+ return 0;
+}
+
+static int sync_lock(struct ocfs2_cluster_connection *conn,
+ int mode, uint32_t flags,
+ struct dlm_lksb *lksb, char *name)
+{
+ int error, status;
+ struct ocfs2_live_connection *lc = conn->cc_private;
+
+ error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
+ name, strlen(name),
+ 0, sync_wait_cb, conn, NULL);
+ if (error) {
+ printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
+ name, lksb->sb_lkid, flags, mode, error);
+ return error;
+ }
+
+ wait_for_completion(&lc->oc_sync_wait);
+
+ status = lksb->sb_status;
+
+ if (status && status != -EAGAIN) {
+ printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
+ name, lksb->sb_lkid, flags, mode, status);
+ }
+
+ return status;
+}
+
+
+static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
+ int flags)
+{
+ struct ocfs2_live_connection *lc = conn->cc_private;
+ return sync_lock(conn, mode, flags,
+ &lc->oc_version_lksb, VERSION_LOCK);
+}
+
+static int version_unlock(struct ocfs2_cluster_connection *conn)
+{
+ struct ocfs2_live_connection *lc = conn->cc_private;
+ return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
+}
+
+/* get_protocol_version()
+ *
+ * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
+ * The algorithm is:
+ * 1. Attempt to take the lock in EX mode (non-blocking).
+ * 2. If successful (which means it is the first mount), write the
+ * version number and downconvert to PR lock.
+ * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
+ * taking the PR lock.
+ */
+
+static int get_protocol_version(struct ocfs2_cluster_connection *conn)
+{
+ int ret;
+ struct ocfs2_live_connection *lc = conn->cc_private;
+ struct ocfs2_protocol_version pv;
+
+ running_proto.pv_major =
+ ocfs2_user_plugin.sp_max_proto.pv_major;
+ running_proto.pv_minor =
+ ocfs2_user_plugin.sp_max_proto.pv_minor;
+
+ lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
+ ret = version_lock(conn, DLM_LOCK_EX,
+ DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
+ if (!ret) {
+ conn->cc_version.pv_major = running_proto.pv_major;
+ conn->cc_version.pv_minor = running_proto.pv_minor;
+ version_to_lvb(&running_proto, lc->oc_lvb);
+ version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
+ } else if (ret == -EAGAIN) {
+ ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
+ if (ret)
+ goto out;
+ lvb_to_version(lc->oc_lvb, &pv);
+
+ if ((pv.pv_major != running_proto.pv_major) ||
+ (pv.pv_minor > running_proto.pv_minor)) {
+ ret = -EINVAL;
+ goto out;
+ }
+
+ conn->cc_version.pv_major = pv.pv_major;
+ conn->cc_version.pv_minor = pv.pv_minor;
+ }
+out:
+ return ret;
+}
+
+static void user_recover_prep(void *arg)
+{
+}
+
+static void user_recover_slot(void *arg, struct dlm_slot *slot)
+{
+ struct ocfs2_cluster_connection *conn = arg;
+ printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
+ slot->nodeid, slot->slot);
+ conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
+
+}
+
+static void user_recover_done(void *arg, struct dlm_slot *slots,
+ int num_slots, int our_slot,
+ uint32_t generation)
+{
+ struct ocfs2_cluster_connection *conn = arg;
+ struct ocfs2_live_connection *lc = conn->cc_private;
+ int i;
+
+ for (i = 0; i < num_slots; i++)
+ if (slots[i].slot == our_slot) {
+ atomic_set(&lc->oc_this_node, slots[i].nodeid);
+ break;
+ }
+
+ lc->oc_our_slot = our_slot;
+ wake_up(&lc->oc_wait);
+}
+
+static const struct dlm_lockspace_ops ocfs2_ls_ops = {
+ .recover_prep = user_recover_prep,
+ .recover_slot = user_recover_slot,
+ .recover_done = user_recover_done,
+};
+
+static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+{
+ version_unlock(conn);
+ dlm_release_lockspace(conn->cc_lockspace, 2);
+ conn->cc_lockspace = NULL;
+ ocfs2_live_connection_drop(conn->cc_private);
+ conn->cc_private = NULL;
+ return 0;
+}
+
static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
{
dlm_lockspace_t *fsdlm;
- struct ocfs2_live_connection *uninitialized_var(control);
- int rc = 0;
+ struct ocfs2_live_connection *lc;
+ int rc, ops_rv;
BUG_ON(conn == NULL);
- rc = ocfs2_live_connection_new(conn, &control);
+ lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
+ if (!lc) {
+ rc = -ENOMEM;
+ goto out;
+ }
+
+ init_waitqueue_head(&lc->oc_wait);
+ init_completion(&lc->oc_sync_wait);
+ atomic_set(&lc->oc_this_node, 0);
+ conn->cc_private = lc;
+ lc->oc_type = NO_CONTROLD;
+
+ rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
+ DLM_LSFL_FS, DLM_LVB_LEN,
+ &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
if (rc)
goto out;
+ if (ops_rv == -EOPNOTSUPP) {
+ lc->oc_type = WITH_CONTROLD;
+ printk(KERN_NOTICE "ocfs2: You seem to be using an older "
+ "version of dlm_controld and/or ocfs2-tools."
+ " Please consider upgrading.\n");
+ } else if (ops_rv) {
+ rc = ops_rv;
+ goto out;
+ }
+ conn->cc_lockspace = fsdlm;
+
+ rc = ocfs2_live_connection_attach(conn, lc);
+ if (rc)
+ goto out;
+
+ if (lc->oc_type == NO_CONTROLD) {
+ rc = get_protocol_version(conn);
+ if (rc) {
+ printk(KERN_ERR "ocfs2: Could not determine"
+ " locking version\n");
+ user_cluster_disconnect(conn);
+ goto out;
+ }
+ wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
+ }
+
/*
* running_proto must have been set before we allowed any mounts
* to proceed.
@@ -830,42 +1054,34 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
printk(KERN_ERR
"Unable to mount with fs locking protocol version "
- "%u.%u because the userspace control daemon has "
- "negotiated %u.%u\n",
+ "%u.%u because negotiated protocol is %u.%u\n",
conn->cc_version.pv_major, conn->cc_version.pv_minor,
running_proto.pv_major, running_proto.pv_minor);
rc = -EPROTO;
- ocfs2_live_connection_drop(control);
- goto out;
+ ocfs2_live_connection_drop(lc);
+ lc = NULL;
}
- rc = dlm_new_lockspace(conn->cc_name, strlen(conn->cc_name),
- &fsdlm, DLM_LSFL_FS, DLM_LVB_LEN);
- if (rc) {
- ocfs2_live_connection_drop(control);
- goto out;
- }
-
- conn->cc_private = control;
- conn->cc_lockspace = fsdlm;
out:
+ if (rc && lc)
+ kfree(lc);
return rc;
}
-static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
-{
- dlm_release_lockspace(conn->cc_lockspace, 2);
- conn->cc_lockspace = NULL;
- ocfs2_live_connection_drop(conn->cc_private);
- conn->cc_private = NULL;
- return 0;
-}
-static int user_cluster_this_node(unsigned int *this_node)
+static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
+ unsigned int *this_node)
{
int rc;
+ struct ocfs2_live_connection *lc = conn->cc_private;
+
+ if (lc->oc_type == WITH_CONTROLD)
+ rc = ocfs2_control_get_this_node();
+ else if (lc->oc_type == NO_CONTROLD)
+ rc = atomic_read(&lc->oc_this_node);
+ else
+ rc = -EINVAL;
- rc = ocfs2_control_get_this_node();
if (rc < 0)
return rc;