aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@woody.linux-foundation.org>2007-02-08 10:37:22 -0800
committerLinus Torvalds <torvalds@woody.linux-foundation.org>2007-02-08 10:37:22 -0800
commit5986a2ec35836a878350c54af4bd91b1de6abc59 (patch)
tree2efe068e124071ca30a5f1886402b890d7ba429e
parent43187902cbfafe73ede0144166b741fb0f7d04e1 (diff)
parentff05d1c4643dd4260eb699396043d7e8009c0de4 (diff)
Merge branch 'upstream-linus' of master.kernel.org:/pub/scm/linux/kernel/git/mfasheh/ocfs2
* 'upstream-linus' of master.kernel.org:/pub/scm/linux/kernel/git/mfasheh/ocfs2: (22 commits) configfs: Zero terminate data in configfs attribute writes. [PATCH] ocfs2 heartbeat: clean up bio submission code ocfs2: introduce sc->sc_send_lock to protect outbound outbound messages [PATCH] ocfs2: drop INET from Kconfig, not needed ocfs2_dlm: Add timeout to dlm join domain ocfs2_dlm: Silence some messages during join domain ocfs2_dlm: disallow a domain join if node maps mismatch ocfs2_dlm: Ensure correct ordering of set/clear refmap bit on lockres ocfs2: Binds listener to the configured ip address ocfs2_dlm: Calling post handler function in assert master handler ocfs2: Added post handler callable function in o2net message handler ocfs2_dlm: Cookies in locks not being printed correctly in error messages ocfs2_dlm: Silence a failed convert ocfs2_dlm: wake up sleepers on the lockres waitqueue ocfs2_dlm: Dlm dispatch was stopping too early ocfs2_dlm: Drop inflight refmap even if no locks found on the lockres ocfs2_dlm: Flush dlm workqueue before starting to migrate ocfs2_dlm: Fix migrate lockres handler queue scanning ocfs2_dlm: Make dlmunlock() wait for migration to complete ocfs2_dlm: Fixes race between migrate and dirty ...
-rw-r--r--fs/Kconfig1
-rw-r--r--fs/configfs/file.c9
-rw-r--r--fs/ocfs2/cluster/heartbeat.c158
-rw-r--r--fs/ocfs2/cluster/tcp.c35
-rw-r--r--fs/ocfs2/cluster/tcp.h6
-rw-r--r--fs/ocfs2/cluster/tcp_internal.h12
-rw-r--r--fs/ocfs2/dlm/dlmast.c14
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h130
-rw-r--r--fs/ocfs2/dlm/dlmconvert.c40
-rw-r--r--fs/ocfs2/dlm/dlmdebug.c30
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c253
-rw-r--r--fs/ocfs2/dlm/dlmlock.c7
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c579
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c182
-rw-r--r--fs/ocfs2/dlm/dlmthread.c200
-rw-r--r--fs/ocfs2/dlm/dlmunlock.c15
-rw-r--r--fs/ocfs2/vote.c8
17 files changed, 1211 insertions, 468 deletions
diff --git a/fs/Kconfig b/fs/Kconfig
index 8cd2417a14d..5e8e9d9ccb3 100644
--- a/fs/Kconfig
+++ b/fs/Kconfig
@@ -426,7 +426,6 @@ config OCFS2_FS
select CONFIGFS_FS
select JBD
select CRC32
- select INET
help
OCFS2 is a general purpose extent based shared disk cluster file
system with many similarities to ext3. It supports 64 bit inode
diff --git a/fs/configfs/file.c b/fs/configfs/file.c
index 2a7cb086e80..d98be5e0132 100644
--- a/fs/configfs/file.c
+++ b/fs/configfs/file.c
@@ -162,14 +162,17 @@ fill_write_buffer(struct configfs_buffer * buffer, const char __user * buf, size
int error;
if (!buffer->page)
- buffer->page = (char *)get_zeroed_page(GFP_KERNEL);
+ buffer->page = (char *)__get_free_pages(GFP_KERNEL, 0);
if (!buffer->page)
return -ENOMEM;
- if (count > PAGE_SIZE)
- count = PAGE_SIZE;
+ if (count >= PAGE_SIZE)
+ count = PAGE_SIZE - 1;
error = copy_from_user(buffer->page,buf,count);
buffer->needs_read_fill = 1;
+ /* if buf is assumed to contain a string, terminate it by \0,
+ * so e.g. sscanf() can scan the string easily */
+ buffer->page[count] = 0;
return error ? -EFAULT : count;
}
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index 277ca67a2ad..5a9779bb923 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -184,10 +184,9 @@ static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
flush_scheduled_work();
}
-static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
- unsigned int num_ios)
+static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
{
- atomic_set(&wc->wc_num_reqs, num_ios);
+ atomic_set(&wc->wc_num_reqs, 1);
init_completion(&wc->wc_io_complete);
wc->wc_error = 0;
}
@@ -212,6 +211,7 @@ static void o2hb_wait_on_io(struct o2hb_region *reg,
struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
blk_run_address_space(mapping);
+ o2hb_bio_wait_dec(wc, 1);
wait_for_completion(&wc->wc_io_complete);
}
@@ -231,6 +231,7 @@ static int o2hb_bio_end_io(struct bio *bio,
return 1;
o2hb_bio_wait_dec(wc, 1);
+ bio_put(bio);
return 0;
}
@@ -238,23 +239,22 @@ static int o2hb_bio_end_io(struct bio *bio,
* start_slot. */
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
struct o2hb_bio_wait_ctxt *wc,
- unsigned int start_slot,
- unsigned int num_slots)
+ unsigned int *current_slot,
+ unsigned int max_slots)
{
- int i, nr_vecs, len, first_page, last_page;
+ int len, current_page;
unsigned int vec_len, vec_start;
unsigned int bits = reg->hr_block_bits;
unsigned int spp = reg->hr_slots_per_page;
+ unsigned int cs = *current_slot;
struct bio *bio;
struct page *page;
- nr_vecs = (num_slots + spp - 1) / spp;
-
/* Testing has shown this allocation to take long enough under
* GFP_KERNEL that the local node can get fenced. It would be
* nicest if we could pre-allocate these bios and avoid this
* all together. */
- bio = bio_alloc(GFP_ATOMIC, nr_vecs);
+ bio = bio_alloc(GFP_ATOMIC, 16);
if (!bio) {
mlog(ML_ERROR, "Could not alloc slots BIO!\n");
bio = ERR_PTR(-ENOMEM);
@@ -262,137 +262,53 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
}
/* Must put everything in 512 byte sectors for the bio... */
- bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
+ bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
bio->bi_bdev = reg->hr_bdev;
bio->bi_private = wc;
bio->bi_end_io = o2hb_bio_end_io;
- first_page = start_slot / spp;
- last_page = first_page + nr_vecs;
- vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
- for(i = first_page; i < last_page; i++) {
- page = reg->hr_slot_data[i];
+ vec_start = (cs << bits) % PAGE_CACHE_SIZE;
+ while(cs < max_slots) {
+ current_page = cs / spp;
+ page = reg->hr_slot_data[current_page];
- vec_len = PAGE_CACHE_SIZE;
- /* last page might be short */
- if (((i + 1) * spp) > (start_slot + num_slots))
- vec_len = ((num_slots + start_slot) % spp) << bits;
- vec_len -= vec_start;
+ vec_len = min(PAGE_CACHE_SIZE,
+ (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
- i, vec_len, vec_start);
+ current_page, vec_len, vec_start);
len = bio_add_page(bio, page, vec_len, vec_start);
- if (len != vec_len) {
- bio_put(bio);
- bio = ERR_PTR(-EIO);
-
- mlog(ML_ERROR, "Error adding page to bio i = %d, "
- "vec_len = %u, len = %d\n, start = %u\n",
- i, vec_len, len, vec_start);
- goto bail;
- }
+ if (len != vec_len) break;
+ cs += vec_len / (PAGE_CACHE_SIZE/spp);
vec_start = 0;
}
bail:
+ *current_slot = cs;
return bio;
}
-/*
- * Compute the maximum number of sectors the bdev can handle in one bio,
- * as a power of two.
- *
- * Stolen from oracleasm, thanks Joel!
- */
-static int compute_max_sectors(struct block_device *bdev)
-{
- int max_pages, max_sectors, pow_two_sectors;
-
- struct request_queue *q;
-
- q = bdev_get_queue(bdev);
- max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
- if (max_pages > BIO_MAX_PAGES)
- max_pages = BIO_MAX_PAGES;
- if (max_pages > q->max_phys_segments)
- max_pages = q->max_phys_segments;
- if (max_pages > q->max_hw_segments)
- max_pages = q->max_hw_segments;
- max_pages--; /* Handle I/Os that straddle a page */
-
- if (max_pages) {
- max_sectors = max_pages << (PAGE_SHIFT - 9);
- } else {
- /* If BIO contains 1 or less than 1 page. */
- max_sectors = q->max_sectors;
- }
- /* Why is fls() 1-based???? */
- pow_two_sectors = 1 << (fls(max_sectors) - 1);
-
- return pow_two_sectors;
-}
-
-static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
- unsigned int num_slots,
- unsigned int *num_bios,
- unsigned int *slots_per_bio)
-{
- unsigned int max_sectors, io_sectors;
-
- max_sectors = compute_max_sectors(reg->hr_bdev);
-
- io_sectors = num_slots << (reg->hr_block_bits - 9);
-
- *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
- *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
-
- mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
- "device can handle %u sectors of I/O\n", io_sectors, num_slots,
- max_sectors);
- mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
- *num_bios, *slots_per_bio);
-}
-
static int o2hb_read_slots(struct o2hb_region *reg,
unsigned int max_slots)
{
- unsigned int num_bios, slots_per_bio, start_slot, num_slots;
- int i, status;
+ unsigned int current_slot=0;
+ int status;
struct o2hb_bio_wait_ctxt wc;
- struct bio **bios;
struct bio *bio;
- o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);
+ o2hb_bio_wait_init(&wc);
- bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
- if (!bios) {
- status = -ENOMEM;
- mlog_errno(status);
- return status;
- }
-
- o2hb_bio_wait_init(&wc, num_bios);
-
- num_slots = slots_per_bio;
- for(i = 0; i < num_bios; i++) {
- start_slot = i * slots_per_bio;
-
- /* adjust num_slots at last bio */
- if (max_slots < (start_slot + num_slots))
- num_slots = max_slots - start_slot;
-
- bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
+ while(current_slot < max_slots) {
+ bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
if (IS_ERR(bio)) {
- o2hb_bio_wait_dec(&wc, num_bios - i);
-
status = PTR_ERR(bio);
mlog_errno(status);
goto bail_and_wait;
}
- bios[i] = bio;
+ atomic_inc(&wc.wc_num_reqs);
submit_bio(READ, bio);
}
@@ -403,38 +319,30 @@ bail_and_wait:
if (wc.wc_error && !status)
status = wc.wc_error;
- if (bios) {
- for(i = 0; i < num_bios; i++)
- if (bios[i])
- bio_put(bios[i]);
- kfree(bios);
- }
-
return status;
}
static int o2hb_issue_node_write(struct o2hb_region *reg,
- struct bio **write_bio,
struct o2hb_bio_wait_ctxt *write_wc)
{
int status;
unsigned int slot;
struct bio *bio;
- o2hb_bio_wait_init(write_wc, 1);
+ o2hb_bio_wait_init(write_wc);
slot = o2nm_this_node();
- bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
+ bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
if (IS_ERR(bio)) {
status = PTR_ERR(bio);
mlog_errno(status);
goto bail;
}
+ atomic_inc(&write_wc->wc_num_reqs);
submit_bio(WRITE, bio);
- *write_bio = bio;
status = 0;
bail:
return status;
@@ -826,7 +734,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
int i, ret, highest_node, change = 0;
unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
- struct bio *write_bio;
struct o2hb_bio_wait_ctxt write_wc;
ret = o2nm_configured_node_map(configured_nodes,
@@ -864,7 +771,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
/* And fire off the write. Note that we don't wait on this I/O
* until later. */
- ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
+ ret = o2hb_issue_node_write(reg, &write_wc);
if (ret < 0) {
mlog_errno(ret);
return ret;
@@ -882,7 +789,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
* people we find in our steady state have seen us.
*/
o2hb_wait_on_io(reg, &write_wc);
- bio_put(write_bio);
if (write_wc.wc_error) {
/* Do not re-arm the write timeout on I/O error - we
* can't be sure that the new block ever made it to
@@ -943,7 +849,6 @@ static int o2hb_thread(void *data)
{
int i, ret;
struct o2hb_region *reg = data;
- struct bio *write_bio;
struct o2hb_bio_wait_ctxt write_wc;
struct timeval before_hb, after_hb;
unsigned int elapsed_msec;
@@ -993,10 +898,9 @@ static int o2hb_thread(void *data)
*
* XXX: Should we skip this on unclean_stop? */
o2hb_prepare_block(reg, 0);
- ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
+ ret = o2hb_issue_node_write(reg, &write_wc);
if (ret == 0) {
o2hb_wait_on_io(reg, &write_wc);
- bio_put(write_bio);
} else {
mlog_errno(ret);
}
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index ae4ff4a6636..1718215fc01 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -556,6 +556,8 @@ static void o2net_register_callbacks(struct sock *sk,
sk->sk_data_ready = o2net_data_ready;
sk->sk_state_change = o2net_state_change;
+ mutex_init(&sc->sc_send_lock);
+
write_unlock_bh(&sk->sk_callback_lock);
}
@@ -688,6 +690,7 @@ static void o2net_handler_put(struct o2net_msg_handler *nmh)
* be given to the handler if their payload is longer than the max. */
int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
o2net_msg_handler_func *func, void *data,
+ o2net_post_msg_handler_func *post_func,
struct list_head *unreg_list)
{
struct o2net_msg_handler *nmh = NULL;
@@ -722,6 +725,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
nmh->nh_func = func;
nmh->nh_func_data = data;
+ nmh->nh_post_func = post_func;
nmh->nh_msg_type = msg_type;
nmh->nh_max_len = max_len;
nmh->nh_key = key;
@@ -856,10 +860,12 @@ static void o2net_sendpage(struct o2net_sock_container *sc,
ssize_t ret;
+ mutex_lock(&sc->sc_send_lock);
ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
virt_to_page(kmalloced_virt),
(long)kmalloced_virt & ~PAGE_MASK,
size, MSG_DONTWAIT);
+ mutex_unlock(&sc->sc_send_lock);
if (ret != size) {
mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT
" failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
@@ -974,8 +980,10 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
/* finally, convert the message header to network byte-order
* and send */
+ mutex_lock(&sc->sc_send_lock);
ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
sizeof(struct o2net_msg) + caller_bytes);
+ mutex_unlock(&sc->sc_send_lock);
msglog(msg, "sending returned %d\n", ret);
if (ret < 0) {
mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
@@ -1049,6 +1057,7 @@ static int o2net_process_message(struct o2net_sock_container *sc,
int ret = 0, handler_status;
enum o2net_system_error syserr;
struct o2net_msg_handler *nmh = NULL;
+ void *ret_data = NULL;
msglog(hdr, "processing message\n");
@@ -1101,17 +1110,26 @@ static int o2net_process_message(struct o2net_sock_container *sc,
sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
be16_to_cpu(hdr->data_len),
- nmh->nh_func_data);
+ nmh->nh_func_data, &ret_data);
do_gettimeofday(&sc->sc_tv_func_stop);
out_respond:
/* this destroys the hdr, so don't use it after this */
+ mutex_lock(&sc->sc_send_lock);
ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
handler_status);
+ mutex_unlock(&sc->sc_send_lock);
hdr = NULL;
mlog(0, "sending handler status %d, syserr %d returned %d\n",
handler_status, syserr, ret);
+ if (nmh) {
+ BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL);
+ if (nmh->nh_post_func)
+ (nmh->nh_post_func)(handler_status, nmh->nh_func_data,
+ ret_data);
+ }
+
out:
if (nmh)
o2net_handler_put(nmh);
@@ -1795,13 +1813,13 @@ out:
ready(sk, bytes);
}
-static int o2net_open_listening_sock(__be16 port)
+static int o2net_open_listening_sock(__be32 addr, __be16 port)
{
struct socket *sock = NULL;
int ret;
struct sockaddr_in sin = {
.sin_family = PF_INET,
- .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) },
+ .sin_addr = { .s_addr = (__force u32)addr },
.sin_port = (__force u16)port,
};
@@ -1824,15 +1842,15 @@ static int o2net_open_listening_sock(__be16 port)
sock->sk->sk_reuse = 1;
ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
if (ret < 0) {
- mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n",
- ntohs(port), ret);
+ mlog(ML_ERROR, "unable to bind socket at %u.%u.%u.%u:%u, "
+ "ret=%d\n", NIPQUAD(addr), ntohs(port), ret);
goto out;
}
ret = sock->ops->listen(sock, 64);
if (ret < 0) {
- mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n",
- ntohs(port), ret);
+ mlog(ML_ERROR, "unable to listen on %u.%u.%u.%u:%u, ret=%d\n",
+ NIPQUAD(addr), ntohs(port), ret);
}
out:
@@ -1865,7 +1883,8 @@ int o2net_start_listening(struct o2nm_node *node)
return -ENOMEM; /* ? */
}
- ret = o2net_open_listening_sock(node->nd_ipv4_port);
+ ret = o2net_open_listening_sock(node->nd_ipv4_address,
+ node->nd_ipv4_port);
if (ret) {
destroy_workqueue(o2net_wq);
o2net_wq = NULL;
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index 21a4e43df83..da880fc215f 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -50,7 +50,10 @@ struct o2net_msg
__u8 buf[0];
};
-typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data);
+typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+typedef void (o2net_post_msg_handler_func)(int status, void *data,
+ void *ret_data);
#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg))
@@ -99,6 +102,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec,
int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
o2net_msg_handler_func *func, void *data,
+ o2net_post_msg_handler_func *post_func,
struct list_head *unreg_list);
void o2net_unregister_handler_list(struct list_head *list);
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index b700dc9624d..4dae5df5e46 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -38,6 +38,12 @@
* locking semantics of the file system using the protocol. It should
* be somewhere else, I'm sure, but right now it isn't.
*
+ * New in version 7:
+ * - DLM join domain includes the live nodemap
+ *
+ * New in version 6:
+ * - DLM lockres remote refcount fixes.
+ *
* New in version 5:
* - Network timeout checking protocol
*
@@ -51,7 +57,7 @@
* - full 64 bit i_size in the metadata lock lvbs
* - introduction of "rw" lock and pushing meta/data locking down
*/
-#define O2NET_PROTOCOL_VERSION 5ULL
+#define O2NET_PROTOCOL_VERSION 7ULL
struct o2net_handshake {
__be64 protocol_version;
__be64 connector_id;
@@ -149,6 +155,8 @@ struct o2net_sock_container {
struct timeval sc_tv_func_stop;
u32 sc_msg_key;
u16 sc_msg_type;
+
+ struct mutex sc_send_lock;
};
struct o2net_msg_handler {
@@ -158,6 +166,8 @@ struct o2net_msg_handler {
u32 nh_key;
o2net_msg_handler_func *nh_func;
o2net_msg_handler_func *nh_func_data;
+ o2net_post_msg_handler_func
+ *nh_post_func;
struct kref nh_kref;
struct list_head nh_unregister_item;
};
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index 681046d5139..241cad342a4 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -263,7 +263,8 @@ void dlm_do_local_bast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
-int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
int ret;
unsigned int locklen;
@@ -311,8 +312,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
past->type != DLM_BAST) {
mlog(ML_ERROR, "Unknown ast type! %d, cookie=%u:%llu"
"name=%.*s\n", past->type,
- dlm_get_lock_cookie_node(cookie),
- dlm_get_lock_cookie_seq(cookie),
+ dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
locklen, name);
ret = DLM_IVLOCKID;
goto leave;
@@ -323,8 +324,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "got %sast for unknown lockres! "
"cookie=%u:%llu, name=%.*s, namelen=%u\n",
past->type == DLM_AST ? "" : "b",
- dlm_get_lock_cookie_node(cookie),
- dlm_get_lock_cookie_seq(cookie),
+ dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
locklen, name, locklen);
ret = DLM_IVLOCKID;
goto leave;
@@ -369,7 +370,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "got %sast for unknown lock! cookie=%u:%llu, "
"name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b",
- dlm_get_lock_cookie_node(cookie), dlm_get_lock_cookie_seq(cookie),
+ dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
locklen, name, locklen);
ret = DLM_NORMAL;
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 6b6ff76538c..e90b92f9ece 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -180,6 +180,11 @@ struct dlm_assert_master_priv
unsigned ignore_higher:1;
};
+struct dlm_deref_lockres_priv
+{
+ struct dlm_lock_resource *deref_res;
+ u8 deref_node;
+};
struct dlm_work_item
{
@@ -191,6 +196,7 @@ struct dlm_work_item
struct dlm_request_all_locks_priv ral;
struct dlm_mig_lockres_priv ml;
struct dlm_assert_master_priv am;
+ struct dlm_deref_lockres_priv dl;
} u;
};
@@ -222,6 +228,9 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
#define DLM_LOCK_RES_DIRTY 0x00000008
#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
#define DLM_LOCK_RES_MIGRATING 0x00000020
+#define DLM_LOCK_RES_DROPPING_REF 0x00000040
+#define DLM_LOCK_RES_BLOCK_DIRTY 0x00001000
+#define DLM_LOCK_RES_SETREF_INPROG 0x00002000
/* max milliseconds to wait to sync up a network failure with a node death */
#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
@@ -265,6 +274,8 @@ struct dlm_lock_resource
u8 owner; //node which owns the lock resource, or unknown
u16 state;
char lvb[DLM_LVB_LEN];
+ unsigned int inflight_locks;
+ unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
};
struct dlm_migratable_lock
@@ -367,7 +378,7 @@ enum {
DLM_CONVERT_LOCK_MSG, /* 504 */
DLM_PROXY_AST_MSG, /* 505 */
DLM_UNLOCK_LOCK_MSG, /* 506 */
- DLM_UNUSED_MSG2, /* 507 */
+ DLM_DEREF_LOCKRES_MSG, /* 507 */
DLM_MIGRATE_REQUEST_MSG, /* 508 */
DLM_MIG_LOCKRES_MSG, /* 509 */
DLM_QUERY_JOIN_MSG, /* 510 */
@@ -417,6 +428,9 @@ struct dlm_master_request
u8 name[O2NM_MAX_NAME_LEN];
};
+#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001
+#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002
+
#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001
#define DLM_ASSERT_MASTER_REQUERY 0x00000002
#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
@@ -430,6 +444,8 @@ struct dlm_assert_master
u8 name[O2NM_MAX_NAME_LEN];
};
+#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001
+
struct dlm_migrate_request
{
u8 master;
@@ -609,12 +625,16 @@ struct dlm_begin_reco
};
+#define BITS_PER_BYTE 8
+#define BITS_TO_BYTES(bits) (((bits)+BITS_PER_BYTE-1)/BITS_PER_BYTE)
+
struct dlm_query_join_request
{
u8 node_idx;
u8 pad1[2];
u8 name_len;
u8 domain[O2NM_MAX_NAME_LEN];
+ u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)];
};
struct dlm_assert_joined
@@ -648,6 +668,16 @@ struct dlm_finalize_reco
__be32 pad2;
};
+struct dlm_deref_lockres
+{
+ u32 pad1;
+ u16 pad2;
+ u8 node_idx;
+ u8 namelen;
+
+ u8 name[O2NM_MAX_NAME_LEN];
+};
+
static inline enum dlm_status
__dlm_lockres_state_to_status(struct dlm_lock_resource *res)
{
@@ -688,16 +718,20 @@ void dlm_lock_put(struct dlm_lock *lock);
void dlm_lock_attach_lockres(struct dlm_lock *lock,
struct dlm_lock_resource *res);
-int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
void dlm_revert_pending_convert(struct dlm_lock_resource *res,
struct dlm_lock *lock);
void dlm_revert_pending_lock(struct dlm_lock_resource *res,
struct dlm_lock *lock);
-int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
struct dlm_lock *lock);
void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
@@ -721,8 +755,6 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
-void dlm_purge_lockres(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *lockres);
static inline void dlm_lockres_get(struct dlm_lock_resource *res)
{
/* This is called on every lookup, so it might be worth
@@ -733,6 +765,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res);
void __dlm_unhash_lockres(struct dlm_lock_resource *res);
void __dlm_insert_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
+struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
+ const char *name,
+ unsigned int len,
+ unsigned int hash);
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
@@ -753,6 +789,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int namelen);
+#define dlm_lockres_set_refmap_bit(bit,res) \
+ __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
+#define dlm_lockres_clear_refmap_bit(bit,res) \
+ __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
+
+static inline void __dlm_lockres_set_refmap_bit(int bit,
+ struct dlm_lock_resource *res,
+ const char *file,
+ int line)
+{
+ //printk("%s:%d:%.*s: setting bit %d\n", file, line,
+ // res->lockname.len, res->lockname.name, bit);
+ set_bit(bit, res->refmap);
+}
+
+static inline void __dlm_lockres_clear_refmap_bit(int bit,
+ struct dlm_lock_resource *res,
+ const char *file,
+ int line)
+{
+ //printk("%s:%d:%.*s: clearing bit %d\n", file, line,
+ // res->lockname.len, res->lockname.name, bit);
+ clear_bit(bit, res->refmap);
+}
+
+void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ const char *file,
+ int line);
+void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ int new_lockres,
+ const char *file,
+ int line);
+#define dlm_lockres_drop_inflight_ref(d,r) \
+ __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
+#define dlm_lockres_grab_inflight_ref(d,r) \
+ __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
+#define dlm_lockres_grab_inflight_ref_new(d,r) \
+ __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
+
void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_do_local_ast(struct dlm_ctxt *dlm,
@@ -801,10 +878,7 @@ int dlm_heartbeat_init(struct dlm_ctxt *dlm);
void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data);
void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data);
-int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
-int dlm_migrate_lockres(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 target);
+int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
int dlm_finish_migration(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 old_master);
@@ -812,15 +886,27 @@ void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);
-int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+void dlm_assert_master_post_handler(int status, void *data, void *ret_data);
+int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
+int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data);
int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master);
@@ -856,10 +942,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res)
int dlm_init_mle_cache(void);
void dlm_destroy_mle_cache(void);
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
+int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res);
void dlm_clean_master_list(struct dlm_ctxt *dlm,
u8 dead_node);
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
-
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
int __dlm_lockres_unused(struct dlm_lock_resource *res);
static inline const char * dlm_lock_mode_name(int mode)
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index c764dc8e40a..ecb4d997221 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -286,8 +286,8 @@ enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
__dlm_print_one_lock_resource(res);
mlog(ML_ERROR, "converting a remote lock that is already "
"converting! (cookie=%u:%llu, conv=%d)\n",
- dlm_get_lock_cookie_node(lock->ml.cookie),
- dlm_get_lock_cookie_seq(lock->ml.cookie),
+ dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
lock->ml.convert_type);
status = DLM_DENIED;
goto bail;
@@ -418,7 +418,8 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
* returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
* status from __dlmconvert_master
*/
-int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
@@ -428,7 +429,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
struct dlm_lockstatus *lksb;
enum dlm_status status = DLM_NORMAL;</