diff options
Diffstat (limited to 'net/ceph')
| -rw-r--r-- | net/ceph/auth_none.h | 2 | ||||
| -rw-r--r-- | net/ceph/auth_x.h | 2 | ||||
| -rw-r--r-- | net/ceph/buffer.c | 22 | ||||
| -rw-r--r-- | net/ceph/ceph_common.c | 26 | ||||
| -rw-r--r-- | net/ceph/crush/crush.c | 7 | ||||
| -rw-r--r-- | net/ceph/crush/mapper.c | 375 | ||||
| -rw-r--r-- | net/ceph/crypto.h | 48 | ||||
| -rw-r--r-- | net/ceph/debugfs.c | 66 | ||||
| -rw-r--r-- | net/ceph/messenger.c | 109 | ||||
| -rw-r--r-- | net/ceph/mon_client.c | 158 | ||||
| -rw-r--r-- | net/ceph/osd_client.c | 406 | ||||
| -rw-r--r-- | net/ceph/osdmap.c | 1059 | ||||
| -rw-r--r-- | net/ceph/pagevec.c | 35 |
13 files changed, 1750 insertions, 565 deletions
diff --git a/net/ceph/auth_none.h b/net/ceph/auth_none.h index ed7d088b1bc..059a3ce4b53 100644 --- a/net/ceph/auth_none.h +++ b/net/ceph/auth_none.h @@ -23,7 +23,7 @@ struct ceph_auth_none_info { struct ceph_none_authorizer au; /* we only need one; it's static */ }; -extern int ceph_auth_none_init(struct ceph_auth_client *ac); +int ceph_auth_none_init(struct ceph_auth_client *ac); #endif diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h index c5a058da7ac..65ee72082d9 100644 --- a/net/ceph/auth_x.h +++ b/net/ceph/auth_x.h @@ -45,7 +45,7 @@ struct ceph_x_info { struct ceph_x_authorizer auth_authorizer; }; -extern int ceph_x_init(struct ceph_auth_client *ac); +int ceph_x_init(struct ceph_auth_client *ac); #endif diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c index bf3e6a13c21..621b5f65407 100644 --- a/net/ceph/buffer.c +++ b/net/ceph/buffer.c @@ -6,6 +6,7 @@ #include <linux/ceph/buffer.h> #include <linux/ceph/decode.h> +#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) { @@ -15,16 +16,10 @@ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) if (!b) return NULL; - b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN); - if (b->vec.iov_base) { - b->is_vmalloc = false; - } else { - b->vec.iov_base = __vmalloc(len, gfp | __GFP_HIGHMEM, PAGE_KERNEL); - if (!b->vec.iov_base) { - kfree(b); - return NULL; - } - b->is_vmalloc = true; + b->vec.iov_base = ceph_kvmalloc(len, gfp); + if (!b->vec.iov_base) { + kfree(b); + return NULL; } kref_init(&b->kref); @@ -40,12 +35,7 @@ void ceph_buffer_release(struct kref *kref) struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref); dout("buffer_release %p\n", b); - if (b->vec.iov_base) { - if (b->is_vmalloc) - vfree(b->vec.iov_base); - else - kfree(b->vec.iov_base); - } + ceph_kvfree(b->vec.iov_base); kfree(b); } EXPORT_SYMBOL(ceph_buffer_release); diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 34b11ee8124..1675021d8c1 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -15,6 +15,7 @@ #include <linux/slab.h> #include <linux/statfs.h> #include <linux/string.h> +#include <linux/vmalloc.h> #include <linux/nsproxy.h> #include <net/net_namespace.h> @@ -71,6 +72,8 @@ const char *ceph_msg_type_name(int type) case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack"; case CEPH_MSG_STATFS: return "statfs"; case CEPH_MSG_STATFS_REPLY: return "statfs_reply"; + case CEPH_MSG_MON_GET_VERSION: return "mon_get_version"; + case CEPH_MSG_MON_GET_VERSION_REPLY: return "mon_get_version_reply"; case CEPH_MSG_MDS_MAP: return "mds_map"; case CEPH_MSG_CLIENT_SESSION: return "client_session"; case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect"; @@ -170,6 +173,25 @@ int ceph_compare_options(struct ceph_options *new_opt, } EXPORT_SYMBOL(ceph_compare_options); +void *ceph_kvmalloc(size_t size, gfp_t flags) +{ + if (size <= (PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER)) { + void *ptr = kmalloc(size, flags | __GFP_NOWARN); + if (ptr) + return ptr; + } + + return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL); +} + +void ceph_kvfree(const void *ptr) +{ + if (is_vmalloc_addr(ptr)) + vfree(ptr); + else + kfree(ptr); +} + static int parse_fsid(const char *str, struct ceph_fsid *fsid) { @@ -461,8 +483,8 @@ EXPORT_SYMBOL(ceph_client_id); * create a fresh client instance */ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, - unsigned int supported_features, - unsigned int required_features) + u64 supported_features, + u64 required_features) { struct ceph_client *client; struct ceph_entity_addr *myaddr = NULL; diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index 089613234f0..16bc199d9a6 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -116,11 +116,14 @@ void crush_destroy(struct crush_map *map) if (map->rules) { __u32 b; for (b = 0; b < map->max_rules; b++) - kfree(map->rules[b]); + crush_destroy_rule(map->rules[b]); kfree(map->rules); } kfree(map); } - +void crush_destroy_rule(struct crush_rule *rule) +{ + kfree(rule); +} diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index cbd06a91941..a1ef53c0441 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -189,7 +189,7 @@ static int terminal(int x) static int bucket_tree_choose(struct crush_bucket_tree *bucket, int x, int r) { - int n, l; + int n; __u32 w; __u64 t; @@ -197,6 +197,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket, n = bucket->num_nodes >> 1; while (!terminal(n)) { + int l; /* pick point in [0, w) */ w = bucket->node_weights[n]; t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r, @@ -264,8 +265,12 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r) * true if device is marked "out" (failed, fully offloaded) * of the cluster */ -static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) +static int is_out(const struct crush_map *map, + const __u32 *weight, int weight_max, + int item, int x) { + if (item >= weight_max) + return 1; if (weight[item] >= 0x10000) return 0; if (weight[item] == 0) @@ -277,7 +282,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in } /** - * crush_choose - choose numrep distinct items of given type + * crush_choose_firstn - choose numrep distinct items of given type * @map: the crush_map * @bucket: the bucket we are choose an item from * @x: crush input value @@ -285,18 +290,28 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in * @type: the type of item to choose * @out: pointer to output vector * @outpos: our position in that vector - * @firstn: true if choosing "first n" items, false if choosing "indep" - * @recurse_to_leaf: true if we want one device under each item of given type - * @descend_once: true if we should only try one descent before giving up + * @tries: number of attempts to make + * @recurse_tries: number of attempts to have recursive chooseleaf make + * @local_retries: localized retries + * @local_fallback_retries: localized fallback retries + * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) + * @vary_r: pass r to recursive calls * @out2: second output vector for leaf items (if @recurse_to_leaf) + * @parent_r: r value passed from the parent */ -static int crush_choose(const struct crush_map *map, - struct crush_bucket *bucket, - const __u32 *weight, - int x, int numrep, int type, - int *out, int outpos, - int firstn, int recurse_to_leaf, - int descend_once, int *out2) +static int crush_choose_firstn(const struct crush_map *map, + struct crush_bucket *bucket, + const __u32 *weight, int weight_max, + int x, int numrep, int type, + int *out, int outpos, + unsigned int tries, + unsigned int recurse_tries, + unsigned int local_retries, + unsigned int local_fallback_retries, + int recurse_to_leaf, + unsigned int vary_r, + int *out2, + int parent_r) { int rep; unsigned int ftotal, flocal; @@ -308,8 +323,11 @@ static int crush_choose(const struct crush_map *map, int itemtype; int collide, reject; - dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", - bucket->id, x, outpos, numrep); + dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d\n", + recurse_to_leaf ? "_LEAF" : "", + bucket->id, x, outpos, numrep, + tries, recurse_tries, local_retries, local_fallback_retries, + parent_r); for (rep = outpos; rep < numrep; rep++) { /* keep trying until we get a non-out, non-colliding item */ @@ -324,36 +342,18 @@ static int crush_choose(const struct crush_map *map, do { collide = 0; retry_bucket = 0; - r = rep; - if (in->alg == CRUSH_BUCKET_UNIFORM) { - /* be careful */ - if (firstn || (__u32)numrep >= in->size) - /* r' = r + f_total */ - r += ftotal; - else if (in->size % numrep == 0) - /* r'=r+(n+1)*f_local */ - r += (numrep+1) * - (flocal+ftotal); - else - /* r' = r + n*f_local */ - r += numrep * (flocal+ftotal); - } else { - if (firstn) - /* r' = r + f_total */ - r += ftotal; - else - /* r' = r + n*f_local */ - r += numrep * (flocal+ftotal); - } + r = rep + parent_r; + /* r' = r + f_total */ + r += ftotal; /* bucket choose */ if (in->size == 0) { reject = 1; goto reject; } - if (map->choose_local_fallback_tries > 0 && + if (local_fallback_retries > 0 && flocal >= (in->size>>1) && - flocal > map->choose_local_fallback_tries) + flocal > local_fallback_retries) item = bucket_perm_choose(in, x, r); else item = crush_bucket_choose(in, x, r); @@ -394,14 +394,23 @@ static int crush_choose(const struct crush_map *map, reject = 0; if (!collide && recurse_to_leaf) { if (item < 0) { - if (crush_choose(map, + int sub_r; + if (vary_r) + sub_r = r >> (vary_r-1); + else + sub_r = 0; + if (crush_choose_firstn(map, map->buckets[-1-item], - weight, + weight, weight_max, x, outpos+1, 0, out2, outpos, - firstn, 0, - map->chooseleaf_descend_once, - NULL) <= outpos) + recurse_tries, 0, + local_retries, + local_fallback_retries, + 0, + vary_r, + NULL, + sub_r) <= outpos) /* didn't get leaf */ reject = 1; } else { @@ -414,6 +423,7 @@ static int crush_choose(const struct crush_map *map, /* out? */ if (itemtype == 0) reject = is_out(map, weight, + weight_max, item, x); else reject = 0; @@ -424,17 +434,14 @@ reject: ftotal++; flocal++; - if (reject && descend_once) - /* let outer call try again */ - skip_rep = 1; - else if (collide && flocal <= map->choose_local_tries) + if (collide && flocal <= local_retries) /* retry locally a few times */ retry_bucket = 1; - else if (map->choose_local_fallback_tries > 0 && - flocal <= in->size + map->choose_local_fallback_tries) + else if (local_fallback_retries > 0 && + flocal <= in->size + local_fallback_retries) /* exhaustive bucket search */ retry_bucket = 1; - else if (ftotal <= map->choose_total_tries) + else if (ftotal < tries) /* then retry descent */ retry_descent = 1; else @@ -464,21 +471,179 @@ reject: /** + * crush_choose_indep: alternative breadth-first positionally stable mapping + * + */ +static void crush_choose_indep(const struct crush_map *map, + struct crush_bucket *bucket, + const __u32 *weight, int weight_max, + int x, int left, int numrep, int type, + int *out, int outpos, + unsigned int tries, + unsigned int recurse_tries, + int recurse_to_leaf, + int *out2, + int parent_r) +{ + struct crush_bucket *in = bucket; + int endpos = outpos + left; + int rep; + unsigned int ftotal; + int r; + int i; + int item = 0; + int itemtype; + int collide; + + dprintk("CHOOSE%s INDEP bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", + bucket->id, x, outpos, numrep); + + /* initially my result is undefined */ + for (rep = outpos; rep < endpos; rep++) { + out[rep] = CRUSH_ITEM_UNDEF; + if (out2) + out2[rep] = CRUSH_ITEM_UNDEF; + } + + for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) { + for (rep = outpos; rep < endpos; rep++) { + if (out[rep] != CRUSH_ITEM_UNDEF) + continue; + + in = bucket; /* initial bucket */ + + /* choose through intervening buckets */ + for (;;) { + /* note: we base the choice on the position + * even in the nested call. that means that + * if the first layer chooses the same bucket + * in a different position, we will tend to + * choose a different item in that bucket. + * this will involve more devices in data + * movement and tend to distribute the load. + */ + r = rep + parent_r; + + /* be careful */ + if (in->alg == CRUSH_BUCKET_UNIFORM && + in->size % numrep == 0) + /* r'=r+(n+1)*f_total */ + r += (numrep+1) * ftotal; + else + /* r' = r + n*f_total */ + r += numrep * ftotal; + + /* bucket choose */ + if (in->size == 0) { + dprintk(" empty bucket\n"); + break; + } + + item = crush_bucket_choose(in, x, r); + if (item >= map->max_devices) { + dprintk(" bad item %d\n", item); + out[rep] = CRUSH_ITEM_NONE; + if (out2) + out2[rep] = CRUSH_ITEM_NONE; + left--; + break; + } + + /* desired type? */ + if (item < 0) + itemtype = map->buckets[-1-item]->type; + else + itemtype = 0; + dprintk(" item %d type %d\n", item, itemtype); + + /* keep going? */ + if (itemtype != type) { + if (item >= 0 || + (-1-item) >= map->max_buckets) { + dprintk(" bad item type %d\n", type); + out[rep] = CRUSH_ITEM_NONE; + if (out2) + out2[rep] = + CRUSH_ITEM_NONE; + left--; + break; + } + in = map->buckets[-1-item]; + continue; + } + + /* collision? */ + collide = 0; + for (i = outpos; i < endpos; i++) { + if (out[i] == item) { + collide = 1; + break; + } + } + if (collide) + break; + + if (recurse_to_leaf) { + if (item < 0) { + crush_choose_indep(map, + map->buckets[-1-item], + weight, weight_max, + x, 1, numrep, 0, + out2, rep, + recurse_tries, 0, + 0, NULL, r); + if (out2[rep] == CRUSH_ITEM_NONE) { + /* placed nothing; no leaf */ + break; + } + } else { + /* we already have a leaf! */ + out2[rep] = item; + } + } + + /* out? */ + if (itemtype == 0 && + is_out(map, weight, weight_max, item, x)) + break; + + /* yay! */ + out[rep] = item; + left--; + break; + } + } + } + for (rep = outpos; rep < endpos; rep++) { + if (out[rep] == CRUSH_ITEM_UNDEF) { + out[rep] = CRUSH_ITEM_NONE; + } + if (out2 && out2[rep] == CRUSH_ITEM_UNDEF) { + out2[rep] = CRUSH_ITEM_NONE; + } + } +} + +/** * crush_do_rule - calculate a mapping with the given input and rule * @map: the crush_map * @ruleno: the rule id * @x: hash input * @result: pointer to result vector * @result_max: maximum result size + * @weight: weight vector (for map leaves) + * @weight_max: size of weight vector + * @scratch: scratch vector for private use; must be >= 3 * result_max */ int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - const __u32 *weight) + const __u32 *weight, int weight_max, + int *scratch) { int result_len; - int a[CRUSH_MAX_SET]; - int b[CRUSH_MAX_SET]; - int c[CRUSH_MAX_SET]; + int *a = scratch; + int *b = scratch + result_max; + int *c = scratch + result_max*2; int recurse_to_leaf; int *w; int wsize = 0; @@ -489,8 +654,20 @@ int crush_do_rule(const struct crush_map *map, __u32 step; int i, j; int numrep; - int firstn; - const int descend_once = 0; + /* + * the original choose_total_tries value was off by one (it + * counted "retries" and not "tries"). add one. + */ + int choose_tries = map->choose_total_tries + 1; + int choose_leaf_tries = 0; + /* + * the local tries values were counted as "retries", though, + * and need no adjustment + */ + int choose_local_retries = map->choose_local_tries; + int choose_local_fallback_retries = map->choose_local_fallback_tries; + + int vary_r = map->chooseleaf_vary_r; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -503,29 +680,54 @@ int crush_do_rule(const struct crush_map *map, o = b; for (step = 0; step < rule->len; step++) { + int firstn = 0; struct crush_rule_step *curstep = &rule->steps[step]; - firstn = 0; switch (curstep->op) { case CRUSH_RULE_TAKE: w[0] = curstep->arg1; wsize = 1; break; - case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: + case CRUSH_RULE_SET_CHOOSE_TRIES: + if (curstep->arg1 > 0) + choose_tries = curstep->arg1; + break; + + case CRUSH_RULE_SET_CHOOSELEAF_TRIES: + if (curstep->arg1 > 0) + choose_leaf_tries = curstep->arg1; + break; + + case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES: + if (curstep->arg1 >= 0) + choose_local_retries = curstep->arg1; + break; + + case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES: + if (curstep->arg1 >= 0) + choose_local_fallback_retries = curstep->arg1; + break; + + case CRUSH_RULE_SET_CHOOSELEAF_VARY_R: + if (curstep->arg1 >= 0) + vary_r = curstep->arg1; + break; + + case CRUSH_RULE_CHOOSELEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; /* fall through */ - case CRUSH_RULE_CHOOSE_LEAF_INDEP: + case CRUSH_RULE_CHOOSELEAF_INDEP: case CRUSH_RULE_CHOOSE_INDEP: if (wsize == 0) break; recurse_to_leaf = curstep->op == - CRUSH_RULE_CHOOSE_LEAF_FIRSTN || + CRUSH_RULE_CHOOSELEAF_FIRSTN || curstep->op == - CRUSH_RULE_CHOOSE_LEAF_INDEP; + CRUSH_RULE_CHOOSELEAF_INDEP; /* reset output */ osize = 0; @@ -543,22 +745,53 @@ int crush_do_rule(const struct crush_map *map, continue; } j = 0; - osize += crush_choose(map, - map->buckets[-1-w[i]], - weight, - x, numrep, - curstep->arg2, - o+osize, j, - firstn, - recurse_to_leaf, - descend_once, c+osize); + if (firstn) { + int recurse_tries; + if (choose_leaf_tries) + recurse_tries = + choose_leaf_tries; + else if (map->chooseleaf_descend_once) + recurse_tries = 1; + else + recurse_tries = choose_tries; + osize += crush_choose_firstn( + map, + map->buckets[-1-w[i]], + weight, weight_max, + x, numrep, + curstep->arg2, + o+osize, j, + choose_tries, + recurse_tries, + choose_local_retries, + choose_local_fallback_retries, + recurse_to_leaf, + vary_r, + c+osize, + 0); + } else { + crush_choose_indep( + map, + map->buckets[-1-w[i]], + weight, weight_max, + x, numrep, numrep, + curstep->arg2, + o+osize, j, + choose_tries, + choose_leaf_tries ? + choose_leaf_tries : 1, + recurse_to_leaf, + c+osize, + 0); + osize += numrep; + } } if (recurse_to_leaf) /* copy final _leaf_ values to output set */ memcpy(o, c, osize*sizeof(*o)); - /* swap t and w arrays */ + /* swap o and w arrays */ tmp = o; o = w; w = tmp; diff --git a/net/ceph/crypto.h b/net/ceph/crypto.h index 3572dc518bc..d1498224c49 100644 --- a/net/ceph/crypto.h +++ b/net/ceph/crypto.h @@ -20,34 +20,32 @@ static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key) kfree(key->key); } -extern int ceph_crypto_key_clone(struct ceph_crypto_key *dst, - const struct ceph_crypto_key *src); -extern int ceph_crypto_key_encode(struct ceph_crypto_key *key, - void **p, void *end); -extern int ceph_crypto_key_decode(struct ceph_crypto_key *key, - void **p, void *end); -extern int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *in); +int ceph_crypto_key_clone(struct ceph_crypto_key *dst, + const struct ceph_crypto_key *src); +int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end); +int ceph_crypto_key_decode(struct ceph_crypto_key *key, void **p, void *end); +int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *in); /* crypto.c */ -extern int ceph_decrypt(struct ceph_crypto_key *secret, - void *dst, size_t *dst_len, - const void *src, size_t src_len); -extern int ceph_encrypt(struct ceph_crypto_key *secret, - void *dst, size_t *dst_len, - const void *src, size_t src_len); -extern int ceph_decrypt2(struct ceph_crypto_key *secret, - void *dst1, size_t *dst1_len, - void *dst2, size_t *dst2_len, - const void *src, size_t src_len); -extern int ceph_encrypt2(struct ceph_crypto_key *secret, - void *dst, size_t *dst_len, - const void *src1, size_t src1_len, - const void *src2, size_t src2_len); -extern int ceph_crypto_init(void); -extern void ceph_crypto_shutdown(void); +int ceph_decrypt(struct ceph_crypto_key *secret, + void *dst, size_t *dst_len, + const void *src, size_t src_len); +int ceph_encrypt(struct ceph_crypto_key *secret, + void *dst, size_t *dst_len, + const void *src, size_t src_len); +int ceph_decrypt2(struct ceph_crypto_key *secret, + void *dst1, size_t *dst1_len, + void *dst2, size_t *dst2_len, + const void *src, size_t src_len); +int ceph_encrypt2(struct ceph_crypto_key *secret, + void *dst, size_t *dst_len, + const void *src1, size_t src1_len, + const void *src2, size_t src2_len); +int ceph_crypto_init(void); +void ceph_crypto_shutdown(void); /* armor.c */ -extern int ceph_armor(char *dst, const char *src, const char *end); -extern int ceph_unarmor(char *dst, const char *src, const char *end); +int ceph_armor(char *dst, const char *src, const char *end); +int ceph_unarmor(char *dst, const char *src, const char *end); #endif diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 83661cdc076..d1a62c69a9f 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -53,34 +53,55 @@ static int osdmap_show(struct seq_file *s, void *p) { int i; struct ceph_client *client = s->private; + struct ceph_osdmap *map = client->osdc.osdmap; struct rb_node *n; - if (client->osdc.osdmap == NULL) + if (map == NULL) return 0; - seq_printf(s, "epoch %d\n", client->osdc.osdmap->epoch); + + seq_printf(s, "epoch %d\n", map->epoch); seq_printf(s, "flags%s%s\n", - (client->osdc.osdmap->flags & CEPH_OSDMAP_NEARFULL) ? - " NEARFULL" : "", - (client->osdc.osdmap->flags & CEPH_OSDMAP_FULL) ? - " FULL" : ""); - for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) { + (map->flags & CEPH_OSDMAP_NEARFULL) ? " NEARFULL" : "", + (map->flags & CEPH_OSDMAP_FULL) ? " FULL" : ""); + + for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) { struct ceph_pg_pool_info *pool = rb_entry(n, struct ceph_pg_pool_info, node); - seq_printf(s, "pg_pool %llu pg_num %d / %d\n", - (unsigned long long)pool->id, pool->pg_num, - pool->pg_num_mask); + + seq_printf(s, "pool %lld pg_num %u (%d) read_tier %lld write_tier %lld\n", + pool->id, pool->pg_num, pool->pg_num_mask, + pool->read_tier, pool->write_tier); } - for (i = 0; i < client->osdc.osdmap->max_osd; i++) { - struct ceph_entity_addr *addr = - &client->osdc.osdmap->osd_addr[i]; - int state = client->osdc.osdmap->osd_state[i]; + for (i = 0; i < map->max_osd; i++) { + struct ceph_entity_addr *addr = &map->osd_addr[i]; + int state = map->osd_state[i]; char sb[64]; - seq_printf(s, "\tosd%d\t%s\t%3d%%\t(%s)\n", + seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n", i, ceph_pr_addr(&addr->in_addr), - ((client->osdc.osdmap->osd_weight[i]*100) >> 16), - ceph_osdmap_state_str(sb, sizeof(sb), state)); + ((map->osd_weight[i]*100) >> 16), + ceph_osdmap_state_str(sb, sizeof(sb), state), + ((ceph_get_primary_affinity(map, i)*100) >> 16)); + } + for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) { + struct ceph_pg_mapping *pg = + rb_entry(n, struct ceph_pg_mapping, node); + + seq_printf(s, "pg_temp %llu.%x [", pg->pgid.pool, + pg->pgid.seed); + for (i = 0; i < pg->pg_temp.len; i++) + seq_printf(s, "%s%d", (i == 0 ? "" : ","), + pg->pg_temp.osds[i]); + seq_printf(s, "]\n"); } + for (n = rb_first(&map->primary_temp); n; n = rb_next(n)) { + struct ceph_pg_mapping *pg = + rb_entry(n, struct ceph_pg_mapping, node); + + seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool, + pg->pgid.seed, pg->primary_temp.osd); + } + return 0; } @@ -105,9 +126,13 @@ static int monc_show(struct seq_file *s, void *p) req = rb_entry(rp, struct ceph_mon_generic_request, node); op = le16_to_cpu(req->request->hdr.type); if (op == CEPH_MSG_STATFS) - seq_printf(s, "%lld statfs\n", req->tid); + seq_printf(s, "%llu statfs\n", req->tid); + else if (op == CEPH_MSG_POOLOP) + seq_printf(s, "%llu poolop\n", req->tid); + else if (op == CEPH_MSG_MON_GET_VERSION) + seq_printf(s, "%llu mon_get_version", req->tid); else - seq_printf(s, "%lld unknown\n", req->tid); + seq_printf(s, "%llu unknown\n", req->tid); } mutex_unlock(&monc->mutex); @@ -132,7 +157,8 @@ static int osdc_show(struct seq_file *s, void *pp) req->r_osd ? req->r_osd->o_osd : -1, req->r_pgid.pool, req->r_pgid.seed); - seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); + seq_printf(s, "%.*s", req->r_base_oid.name_len, + req->r_base_oid.name); if (req->r_reassert_version.epoch) seq_printf(s, "\t%u'%llu", diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4a5df7b1cc9..1948d592aa5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -15,6 +15,7 @@ #include <linux/dns_resolver.h> #include <net/tcp.h> +#include <linux/ceph/ceph_features.h> #include <linux/ceph/libceph.h> #include <linux/ceph/messenger.h> #include <linux/ceph/decode.h> @@ -382,7 +383,7 @@ static void con_sock_state_closed(struct ceph_connection *con) */ /* data available on socket, or listen socket received a connect */ -static void ceph_sock_data_ready(struct sock *sk, int count_unused) +static void ceph_sock_data_ready(struct sock *sk) { struct ceph_connection *con = sk->sk_user_data; if (atomic_read(&con->msgr->stopping)) { @@ -556,7 +557,7 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, return r; } -static int ceph_tcp_sendpage(struct socket *sock, struct page *page, +static int __ceph_tcp_sendpage(struct socket *sock, struct page *page, int offset, size_t size, bool more) { int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR); @@ -569,6 +570,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, return ret; } +static int ceph_tcp_sendpage(struct socket *sock, struct page *page, + int offset, size_t size, bool more) +{ + int ret; + struct kvec iov; + + /* sendpage cannot properly handle pages with page_count == 0, + * we need to fallback to sendmsg if that's the case */ + if (page_count(page) >= 1) + return __ceph_tcp_sendpage(sock, page, offset, size, more); + + iov.iov_base = kmap(page) + offset; + iov.iov_len = size; + ret = ceph_tcp_sendmsg(sock, &iov, 1, size, more); + kunmap(page); + + return ret; +} /* * Shutdown/close the socket for the given connection. @@ -777,13 +796,12 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor, bio = data->bio; BUG_ON(!bio); - BUG_ON(!bio->bi_vcnt); cursor->resid = min(length, data->bio_length); cursor->bio = bio; - cursor->vector_index = 0; - cursor->vector_offset = 0; - cursor->last_piece = length <= bio->bi_io_vec[0].bv_len; + cursor->bvec_iter = bio->bi_iter; + cursor->last_piece = + cursor->resid <= bio_iter_len(bio, cursor->bvec_iter); } static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor, @@ -792,71 +810,67 @@ static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor, { struct ceph_msg_data *data = cursor->data; struct bio *bio; - struct bio_vec *bio_vec; - unsigned int index; + struct bio_vec bio_vec; BUG_ON(data->type != CEPH_MSG_DATA_BIO); bio = cursor->bio; BUG_ON(!bio); - index = cursor->vector_index; - BUG_ON(index >= (unsigned int) bio->bi_vcnt); + bio_vec = bio_iter_iovec(bio, cursor->bvec_iter); - bio_vec = &bio->bi_io_vec[index]; - BUG_ON(cursor->vector_offset >= bio_vec->bv_len); - *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset); + *page_offset = (size_t) bio_vec.bv_offset; BUG_ON(*page_offset >= PAGE_SIZE); if (cursor->last_piece) /* pagelist offset is always 0 */ *length = cursor->resid; else - *length = (size_t) (bio_vec->bv_len - cursor->vector_offset); + *length = (size_t) bio_vec.bv_len; BUG_ON(*length > cursor->resid); BUG_ON(*page_offset + *length > PAGE_SIZE); - return bio_vec->bv_page; + return bio_vec.bv_page; } static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) { struct bio *bio; - struct bio_vec *bio_vec; - unsigned int index; + struct bio_vec bio_vec; BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO); bio = cursor->bio; BUG_ON(!bio); - index = cursor->vector_index; - BUG_ON(index >= (unsigned int) bio->bi_vcnt); - bio_vec = &bio->bi_io_vec[index]; + bio_vec = bio_iter_iovec(bio, cursor->bvec_iter); /* Advance the cursor offset */ BUG_ON(cursor->resid < bytes); cursor->resid -= bytes; - cursor->vector_offset += bytes; - if (cursor->vector_offset < bio_vec->bv_len) + + bio_advance_iter(bio, &cursor->bvec_iter, bytes); + + if (bytes < bio_vec.bv_len) return false; /* more bytes to process in this segment */ - BUG_ON(cursor->vector_offset != bio_vec->bv_len); /* Move on to the next segment, and possibly the next bio */ - if (++index == (unsigned int) bio->bi_vcnt) { + if (!cursor->bvec_iter.bi_size) { bio = bio->bi_next; - index = 0; + cursor->bio = bio; + if (bio) + cursor->bvec_iter = bio->bi_iter; + else + memset(&cursor->bvec_iter, 0, + sizeof(cursor->bvec_iter)); } - cursor->bio = bio; - cursor->vector_index = index; - cursor->vector_offset = 0; if (!cursor->last_piece) { BUG_ON(!cursor->resid); BUG_ON(!bio); /* A short read is OK, so use <= rather than == */ - if (cursor->resid <= bio->bi_io_vec[index].bv_len) + if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter)) cursor->last_piece = true; } @@ -923,6 +937,9 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor, if (!bytes || cursor->page_offset) return false; /* more bytes to process in the current page */ + if (!cursor->resid) + return false; /* no more data */ + /* Move on to the next page; offset is already at 0 */ BUG_ON(cursor->page_index >= cursor->page_count); @@ -1008,6 +1025,9 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor, if (!bytes || cursor->offset & ~PAGE_MASK) return false; /* more bytes to process in the current page */ + if (!cursor->resid) + return false; /* no more data */ + /* Move on to the next page */ BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); @@ -1865,7 +1885,9 @@ int ceph_parse_ips(const char *c, const char *end, port = (port * 10) + (*p - '0'); p++; } - if (port > 65535 || port == 0) + if (port == 0) + port = CEPH_MON_PORT; + else if (port > 65535) goto bad; } else { port = CEPH_MON_PORT; @@ -1945,7 +1967,8 @@ static int process_connect(struct ceph_connection *con) { u64 sup_feat = con->msgr->supported_features; u64 req_feat = con->msgr->required_features; - u64 server_feat = le64_to_cpu(con->in_reply.features); + u64 server_feat = ceph_sanitize_features( + le64_to_cpu(con->in_reply.features)); int ret; dout("process_connect on %p tag %d\n", con, (int)con->in_tag); @@ -2853,8 +2876,8 @@ static void con_fault(struct ceph_connection *con) */ void ceph_messenger_init(struct ceph_messenger *msgr, struct ceph_entity_addr *myaddr, - u32 supported_features, - u32 required_features, + u64 supported_features, + u64 required_features, bool nocrc) { msgr->supported_features = supported_features; @@ -3126,15 +3149,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, INIT_LIST_HEAD(&m->data); /* front */ - m->front_max = front_len; if (front_len) { - if (front_len > PAGE_CACHE_SIZE) { - m->front.iov_base = __vmalloc(front_len, flags, - PAGE_KERNEL); - m->front_is_vmalloc = true; - } else { - m->front.iov_base = kmalloc(front_len, flags); - } + m->front.iov_base = ceph_kvmalloc(front_len, flags); if (m->front.iov_base == NULL) { dout("ceph_msg_new can't allocate %d bytes\n", front_len); @@ -3143,7 +3159,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, } else { m->front.iov_base = NULL; } - m->front.iov_len = front_len; + m->front_alloc_len = m->front.iov_len = front_len; dout("ceph_msg_new %p front %d\n", m, front_len); return m; @@ -3256,10 +3272,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) void ceph_msg_kfree(struct ceph_msg *m) { dout("msg_kfree %p\n", m); - if (m->front_is_vmalloc) - vfree(m->front.iov_base); - else - kfree(m->front.iov_base); + ceph_kvfree(m->front.iov_base); kmem_cache_free(ceph_msg_cache, m); } @@ -3301,8 +3314,8 @@ EXPORT_SYMBOL(ceph_msg_last_put); void ceph_msg_dump(struct ceph_msg *msg) { - pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, - msg->front_max, msg->data_length); + pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg, + msg->front_alloc_len, msg->data_length); print_hex_dump(KERN_DEBUG, "header: ", DUMP_PREFIX_OFFSET, 16, 1, &msg->hdr, sizeof(msg->hdr), true); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 1fe25cd29d0..067d3af2eaf 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -152,7 +152,7 @@ static int __open_session(struct ceph_mon_client *monc) /* initiatiate authentication handshake */ ret = ceph_auth_build_hello(monc->auth, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); __send_prepared_auth_request(monc, ret); } else { dout("open_session mon%d already open\n", monc->cur_mon); @@ -196,7 +196,7 @@ static void __send_subscribe(struct ceph_mon_client *monc) int num; p = msg->front.iov_base; - end = p + msg->front_max; + end = p + msg->front_alloc_len; num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; ceph_encode_32(&p, num); @@ -296,6 +296,33 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) __send_subscribe(monc); mutex_unlock(&monc->mutex); } +EXPORT_SYMBOL(ceph_monc_request_next_osdmap); + +int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, + unsigned long timeout) +{ + unsigned long started = jiffies; + int ret; + + mutex_lock(&monc->mutex); + while (monc->have_osdmap < epoch) { + mutex_unlock(&monc->mutex); + + if (timeout != 0 && time_after_eq(jiffies, started + timeout)) + return -ETIMEDOUT; + + ret = wait_event_interruptible_timeout(monc->client->auth_wq, + monc->have_osdmap >= epoch, timeout); + if (ret < 0) + return ret; + + mutex_lock(&monc->mutex); + } + + mutex_unlock(&monc->mutex); + return 0; +} +EXPORT_SYMBOL(ceph_monc_wait_osdmap); /* * @@ -477,14 +504,13 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, return m; } -static int do_generic_request(struct ceph_mon_client *monc, - struct ceph_mon_generic_request *req) +static int __do_generic_request(struct ceph_mon_client *monc, u64 tid, + struct ceph_mon_generic_request *req) { int err; /* register request */ - mutex_lock(&monc->mutex); - req->tid = ++monc->last_tid; + req->tid = tid != 0 ? tid : ++monc->last_tid; req->request->hdr.tid = cpu_to_le64(req->tid); __insert_generic_request(monc, req); monc->num_generic_requests++; @@ -496,13 +522,24 @@ static int do_generic_request(struct ceph_mon_client *monc, mutex_lock(&monc->mutex); rb_erase(&req->node, &monc->generic_request_tree); monc->num_generic_requests--; - mutex_unlock(&monc->mutex); if (!err) err = req->result; return err; } +static int do_generic_request(struct ceph_mon_client *monc, + struct ceph_mon_generic_request *req) +{ + int err; + + mutex_lock(&monc->mutex); + err = __do_generic_request(monc, 0, req); + mutex_unlock(&monc->mutex); + + return err; +} + /* * statfs */ @@ -579,6 +616,96 @@ out: } EXPORT_SYMBOL(ceph_monc_do_statfs); +static void handle_get_version_reply(struct ceph_mon_client *monc, + struct ceph_msg *msg) +{ + struct ceph_mon_generic_request *req; + u64 tid = le64_to_cpu(msg->hdr.tid); + void *p = msg->front.iov_base; + void *end = p + msg->front_alloc_len; + u64 handle; + + dout("%s %p tid %llu\n", __func__, msg, tid); + + ceph_decode_need(&p, end, 2*sizeof(u64), bad); + handle = ceph_decode_64(&p); + if (tid != 0 && tid != handle) + goto bad; + + mutex_lock(&monc->mutex); + req = __lookup_generic_req(monc, handle); + if (req) { + *(u64 *)req->buf = ceph_decode_64(&p); + req->result = 0; + get_generic_request(req); + } + mutex_unlock(&monc->mutex); + if (req) { + complete_all(&req->completion); + put_generic_request(req); + } + + return; +bad: + pr_err("corrupt mon_get_version reply\n"); + ceph_msg_dump(msg); +} + +/* + * Send MMonGetVersion and wait for the reply. + * + * @what: one of "mdsmap", "osdmap" or "monmap" + */ +int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, + u64 *newest) +{ + struct ceph_mon_generic_request *req; + void *p, *end; + u64 tid; + int err; + + req = kzalloc(sizeof(*req), GFP_NOFS); + if (!req) + return -ENOMEM; + + kref_init(&req->kref); + req->buf = newest; + req->buf_len = sizeof(*newest); + init_completion(&req->completion); + + req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, + sizeof(u64) + sizeof(u32) + strlen(what), + GFP_NOFS, true); + if (!req->request) { + err = -ENOMEM; + goto out; + } + + req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024, + GFP_NOFS, true); + if (!req->reply) { + err = -ENOMEM; + goto out; + } + + p = req->request->front.iov_base; + end = p + req->request->front_alloc_len; + + /* fill out request */ + mutex_lock(&monc->mutex); + tid = ++monc->last_tid; + ceph_encode_64(&p, tid); /* handle */ + ceph_encode_string(&p, end, what, strlen(what)); + + err = __do_generic_request(monc, tid, req); + + mutex_unlock(&monc->mutex); +out: + kref_put(&req->kref, release_generic_request); + return err; +} +EXPORT_SYMBOL(ceph_monc_do_get_version); + /* * pool ops */ @@ -897,7 +1024,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc, ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, msg->front.iov_len, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); if (ret < 0) { monc->client->auth_err = ret; wake_up_all(&monc->client->auth_wq); @@ -939,7 +1066,7 @@ static int __validate_auth(struct ceph_mon_client *monc) return 0; ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); if (ret <= 0) return ret; /* either an error, or no need to authenticate */ __send_prepared_auth_request(monc, ret); @@ -981,6 +1108,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) handle_statfs_reply(monc, msg); break; + case CEPH_MSG_MON_GET_VERSION_REPLY: + handle_get_version_reply(monc, msg); + break; + case CEPH_MSG_POOLOP_REPLY: handle_poolop_reply(monc, msg); break; @@ -1029,6 +1160,15 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, case CEPH_MSG_AUTH_REPLY: m = ceph_msg_get(monc->m_auth_reply); break; + case CEPH_MSG_MON_GET_VERSION_REPLY: + if (le64_to_cpu(hdr->tid) != 0) + return get_generic_reply(con, hdr, skip); + + /* + * Older OSDs don't set reply tid even if the orignal + * request had a non-zero tid. Workaround this weirdness + * by falling through to the allocate case. + */ case CEPH_MSG_MON_MAP: case CEPH_MSG_MDS_MAP: case CEPH_MSG_OSD_MAP: diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2b4b32aaa89..05be0c18169 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, msg_size = 4 + 4 + 8 + 8 + 4+8; msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pg_t */ - msg_size += 4 + MAX_OBJ_NAME_SIZE; + msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); msg_size += 8; /* snapid */ msg_size += 8; /* snap_seq */ @@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); + req->r_base_oloc.pool = -1; + req->r_target_oloc.pool = -1; + /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -433,6 +436,7 @@ static bool osd_req_opcode_valid(u16 opcode) case CEPH_OSD_OP_OMAPCLEAR: case CEPH_OSD_OP_OMAPRMKEYS: case CEPH_OSD_OP_OMAP_CMP: + case CEPH_OSD_OP_SETALLOCHINT: case CEPH_OSD_OP_CLONERANGE: case CEPH_OSD_OP_ASSERT_SRC_VERSION: case CEPH_OSD_OP_SRC_CMPXATTR: @@ -588,6 +592,26 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_watch_init); +void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, + unsigned int which, + u64 expected_object_size, + u64 expected_write_size) +{ + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + CEPH_OSD_OP_SETALLOCHINT); + + op->alloc_hint.expected_object_size = expected_object_size; + op->alloc_hint.expected_write_size = expected_write_size; + + /* + * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed + * not worth a feature bit. Set FAILOK per-op flag to make + * sure older osds don't trip over an unsupported opcode. + */ + op->flags |= CEPH_OSD_OP_FLAG_FAILOK; +} +EXPORT_SYMBOL(osd_req_op_alloc_hint_init); + static void ceph_osdc_msg_data_add(struct ceph_msg *msg, struct ceph_osd_data *osd_data) { @@ -678,6 +702,12 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->watch.ver = cpu_to_le64(src->watch.ver); dst->watch.flag = src->watch.flag; break; + case CEPH_OSD_OP_SETALLOCHINT: + dst->alloc_hint.expected_object_size = + cpu_to_le64(src->alloc_hint.expected_object_size); + dst->alloc_hint.expected_write_size = + cpu_to_le64(src->alloc_hint.expected_write_size); + break; default: pr_err("unsupported osd opcode %s\n", ceph_osd_op_name(src->op)); @@ -685,7 +715,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, return 0; } + dst->op = cpu_to_le16(src->op); + dst->flags = cpu_to_le32(src->flags); dst->payload_len = cpu_to_le32(src->payload_len); return request_data_len; @@ -761,11 +793,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (num_ops > 1) osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); - req->r_file_layout = *layout; /* keep a copy */ + req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); - snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", - vino.ino, objnum); - req->r_oid_len = strlen(req->r_oid); + snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), + "%llx.%08llx", vino.ino, objnum); + req->r_base_oid.name_len = strlen(req->r_base_oid.name); return req; } @@ -1044,8 +1076,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) !ceph_con_opened(&osd->o_con)) { struct ceph_osd_request *req; - dout(" osd addr hasn't changed and connection never opened," - " letting msgr retry"); + dout("osd addr hasn't changed and connection never opened, " + "letting msgr retry\n"); /* touch each r_stamp for handle_timeout()'s benfit */ list_for_each_entry(req, &osd->o_requests, r_osd_item) req->r_stamp = jiffies; @@ -1232,6 +1264,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, EXPORT_SYMBOL(ceph_osdc_set_request_linger); /* + * Returns whether a request should be blocked from being sent + * based on the current osdmap and osd_client settings. + * + * Caller should hold map_sem for read. + */ +static bool __req_should_be_paused(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD); + bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); + return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) || + (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr); +} + +/* + * Calculate mapping of a request to a PG. Takes tiering into account. + */ +static int __calc_request_pg(struct ceph_osdmap *osdmap, + struct ceph_osd_request *req, + struct ceph_pg *pg_out) +{ + bool need_check_tiering; + + need_check_tiering = false; + if (req->r_target_oloc.pool == -1) { + req->r_target_oloc = req->r_base_oloc; /* struct */ + need_check_tiering = true; + } + if (req->r_target_oid.name_len == 0) { + ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); + need_check_tiering = true; + } + + if (need_check_tiering && + (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + struct ceph_pg_pool_info *pi; + + pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool); + if (pi) { + if ((req->r_flags & CEPH_OSD_FLAG_READ) && + pi->read_tier >= 0) + req->r_target_oloc.pool = pi->read_tier; + if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + pi->write_tier >= 0) + req->r_target_oloc.pool = pi->write_tier; + } + /* !pi is caught in ceph_oloc_oid_to_pg() */ + } + + return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc, + &req->r_target_oid, pg_out); +} + +/* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct * (as needed), and set the request r_osd appropriately. If there is * no up osd, set r_osd to NULL. Move the request to the appropriate list @@ -1246,30 +1333,35 @@ static int __map_request(struct ceph_osd_client *osdc, { struct ceph_pg pgid; int acting[CEPH_PG_MAX_SIZE]; - int o = -1, num = 0; + int num, o; int err; + bool was_paused; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, - ceph_file_layout_pg_pool(req->r_file_layout)); + + err = __calc_request_pg(osdc->osdmap, req, &pgid); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; } req->r_pgid = pgid; - err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); - if (err > 0) { - o = acting[0]; - num = err; - } + num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o); + if (num < 0) + num = 0; + + was_paused = req->r_paused; + req->r_paused = __req_should_be_paused(osdc, req); + if (was_paused && !req->r_paused) + force_resend = 1; if ((!force_resend && req->r_osd && req->r_osd->o_osd == o && req->r_sent >= req->r_osd->o_incarnation && req->r_num_pg_osds == num && memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || - (req->r_osd == NULL && o == -1)) + (req->r_osd == NULL && o == -1) || + req->r_paused) return 0; /* no change */ dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", @@ -1331,7 +1423,7 @@ static void __send_request(struct ceph_osd_client *osdc, /* fill in message content that changes each time we send it */ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); + put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool); p = req->r_request_pgid; ceph_encode_64(&p, req->r_pgid.pool); ceph_encode_32(&p, req->r_pgid.seed); @@ -1362,6 +1454,40 @@ static void __send_queued(struct ceph_osd_client *osdc) } /* + * Caller should hold map_sem for read and request_mutex. + */ +static int __ceph_osdc_start_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req, + bool nofail) +{ + int rc; + + __register_request(osdc, req); + req->r_sent = 0; + req->r_got_reply = 0; + rc = __map_request(osdc, req, 0); + if (rc < 0) { + if (nofail) { + dout("osdc_start_request failed map, " + " will retry %lld\n", req->r_tid); + rc = 0; + } else { + __unregister_request(osdc, req); + } + return rc; + } + + if (req->r_osd == NULL) { + dout("send_request %p no up osds in pg\n", req); + ceph_monc_request_next_osdmap(&osdc->client->monc); + } else { + __send_queued(osdc); + } + + return 0; +} + +/* * Timeout callback, called every N seconds when 1 or more osd * requests has been active for more than N seconds. When this * happens, we ping all OSDs with requests who have timed out to @@ -1432,6 +1558,109 @@ static void handle_osds_timeout(struct work_struct *work) round_jiffies_relative(delay)); } +static int ceph_oloc_decode(void **p, void *end, + struct ceph_object_locator *oloc) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret = 0; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_v < 3) { + pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + if (struct_cv > 6) { + pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + oloc->pool = ceph_decode_64(p); + *p += 4; /* skip preferred */ + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::key is set\n"); + goto e_inval; + } + + if (struct_v >= 5) { + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::nspace is set\n"); + goto e_inval; + } + } + + if (struct_v >= 6) { + s64 hash = ceph_decode_64(p); + if (hash != -1) { + pr_warn("ceph_object_locator::hash is set\n"); + goto e_inval; + } + } + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static int ceph_redirect_decode(void **p, void *end, + struct ceph_request_redirect *redir) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_cv > 1) { + pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + ret = ceph_oloc_decode(p, end, &redir->oloc); + if (ret) + goto out; + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_request_redirect::object_name is set\n"); + goto e_inval; + } + + len = ceph_decode_32(p); + *p += len; /* skip osd_instructions */ + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + static void complete_request(struct ceph_osd_request *req) { complete_all(&req->r_safe_completion); /* fsync waiter */ @@ -1446,6 +1675,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, { void *p, *end; struct ceph_osd_request *req; + struct ceph_request_redirect redir; u64 tid; int object_len; unsigned int numops; @@ -1484,6 +1714,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, osdmap_epoch = ceph_decode_32(&p); /* lookup */ + down_read(&osdc->map_sem); mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); if (req == NULL) { @@ -1525,10 +1756,40 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, for (i = 0; i < numops; i++) req->r_reply_op_result[i] = ceph_decode_32(&p); - already_completed = req->r_got_reply; + if (le16_to_cpu(msg->hdr.version) >= 6) { + p += 8 + 4; /* skip replay_version */ + p += 8; /* skip user_version */ - if (!req->r_got_reply) { + err = ceph_redirect_decode(&p, end, &redir); + if (err) + goto bad_put; + } else { + redir.oloc.pool = -1; + } + if (redir.oloc.pool != -1) { + dout("redirect pool %lld\n", redir.oloc.pool); + + __unregister_request(osdc, req); + + req->r_target_oloc = redir.oloc; /* struct */ + + /* + * Start redirect requests with nofail=true. If + * mapping fails, request will end up on the notarget + * list, waiting for the new osdmap (which can take + * a while), even though the original request mapped + * successfully. In the future we might want to follow + * original request's nofail setting here. + */ + err = __ceph_osdc_start_request(osdc, req, true); + BUG_ON(err); + + goto out_unlock; + } + + already_completed = req->r_got_reply; + if (!req->r_got_reply) { req->r_result = result; dout("handle_reply result %d bytes %d\n", req->r_result, bytes); @@ -1542,8 +1803,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, req->r_got_reply = 1; } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { dout("handle_reply tid %llu dup ack\n", tid); - mutex_unlock(&osdc->request_mutex); - goto done; + goto out_unlock; } dout("handle_reply tid %llu flags %d\n", tid, flags); @@ -1558,6 +1818,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, __unregister_request(osdc, req); mutex_unlock(&osdc->request_mutex); + up_read(&osdc->map_sem); if (!already_completed) { if (req->r_unsafe_callback && @@ -1575,15 +1836,27 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, complete_request(req); } -done: +out: dout("req=%p req->r_linger=%d\n", req, req->r_linger); ceph_osdc_put_request(req); return; +out_unlock: + mutex_unlock(&osdc->request_mutex); + up_read(&osdc->map_sem); + goto out; bad_put: + req->r_result = -EIO; + __unregister_request(osdc, req); + if (req->r_callback) + req->r_callback(req, msg); + else + complete_all(&req->r_completion); + complete_request(req); ceph_osdc_put_request(req); bad_mutex: mutex_unlock(&osdc->request_mutex); + up_read(&osdc->map_sem); bad: pr_err("corrupt osd_op_reply got %d %d\n", (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); @@ -1613,14 +1886,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) * * Caller should hold map_sem for read. */ -static void kick_requests(struct ceph_osd_client *osdc, int force_resend) +static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, + bool force_resend_writes) { struct ceph_osd_request *req, *nreq; struct rb_node *p; int needmap = 0; int err; + bool force_resend_req; - dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); + dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", + force_resend_writes ? " (force resend writes)" : ""); mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; ) { req = rb_entry(p, struct ceph_osd_request, r_node); @@ -1645,7 +1921,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) continue; } - err = __map_request(osdc, req, force_resend); + force_resend_req = force_resend || + (force_resend_writes && + req->r_flags & CEPH_OSD_FLAG_WRITE); + err = __map_request(osdc, req, force_resend_req); if (err < 0) continue; /* error */ if (req->r_osd == NULL) { @@ -1665,7 +1944,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) r_linger_item) { dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); - err = __map_request(osdc, req, force_resend); + err = __map_request(osdc, req, + force_resend || force_resend_writes); dout("__map_request returned %d\n", err); if (err == 0) continue; /* no change and no osd was specified */ @@ -1707,6 +1987,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) struct ceph_osdmap *newmap = NULL, *oldmap; int err; struct ceph_fsid fsid; + bool was_full; dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); p = msg->front.iov_base; @@ -1720,6 +2001,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) down_write(&osdc->map_sem); + was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); + /* incremental maps */ ceph_decode_32_safe(&p, end, nr_maps, bad); dout(" %d inc maps\n", nr_maps); @@ -1744,7 +2027,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_osdmap_destroy(osdc->osdmap); osdc->osdmap = newmap; } - kick_requests(osdc, 0); + was_full = was_full || + ceph_osdmap_flag(osdc->osdmap, + CEPH_OSDMAP_FULL); + kick_requests(osdc, 0, was_full); } else { dout("ignoring incremental map %u len %d\n", epoch, maplen); @@ -1774,7 +2060,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) int skipped_map = 0; dout("taking full map %u len %d\n", epoch, maplen); - newmap = osdmap_decode(&p, p+maplen); + newmap = ceph_osdmap_decode(&p, p+maplen); if (IS_ERR(newmap)) { err = PTR_ERR(newmap); goto bad; @@ -1787,7 +2073,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) skipped_map = 1; ceph_osdmap_destroy(oldmap); } - kick_requests(osdc, skipped_map); + was_full = was_full || + ceph_osdmap_flag(osdc->osdmap, + CEPH_OSDMAP_FULL); + kick_requests(osdc, skipped_map, was_full); } p += maplen; nr_maps--; @@ -1804,7 +2093,9 @@ done: * we find out when we are no longer full and stop returning * ENOSPC. */ - if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) ceph_monc_request_next_osdmap(&osdc->client->monc); mutex_lock(&osdc->request_mutex); @@ -1818,7 +2109,6 @@ bad: pr_err("osdc handle_map corrupt msg\n"); ceph_msg_dump(msg); up_write(&osdc->map_sem); - return; } /* @@ -2017,7 +2307,6 @@ done_err: bad: pr_err("osdc handle_watch_notify corrupt msg\n"); - return; } /* @@ -2068,10 +2357,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, ceph_encode_32(&p, -1); /* preferred */ /* oid */ - ceph_encode_32(&p, req->r_oid_len); - memcpy(p, req->r_oid, req->r_oid_len); - dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); - p += req->r_oid_len; + ceph_encode_32(&p, req->r_base_oid.name_len); + memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); + dout("oid '%.*s' len %d\n", req->r_base_oid.name_len, + req->r_base_oid.name, req->r_base_oid.name_len); + p += req->r_base_oid.name_len; /* ops--can imply data */ ceph_encode_16(&p, (u16)req->r_num_ops); @@ -2125,34 +2415,16 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, bool nofail) { - int rc = 0; + int rc; down_read(&osdc->map_sem); mutex_lock(&osdc->request_mutex); - __register_request(osdc, req); - req->r_sent = 0; - req->r_got_reply = 0; - rc = __map_request(osdc, req, 0); - if (rc < 0) { - if (nofail) { - dout("osdc_start_request failed map, " - " will retry %lld\n", req->r_tid); - rc = 0; - } else { - __unregister_request(osdc, req); - } - goto out_unlock; - } - if (req->r_osd == NULL) { - dout("send_request %p no up osds in pg\n", req); - ceph_monc_request_next_osdmap(&osdc->client->monc); - } else { - __send_queued(osdc); - } - rc = 0; -out_unlock: + + rc = __ceph_osdc_start_request(osdc, req, nofail); + mutex_unlock(&osdc->request_mutex); up_read(&osdc->map_sem); + return rc; } EXPORT_SYMBOL(ceph_osdc_start_request); @@ -2219,7 +2491,7 @@ EXPORT_SYMBOL(ceph_osdc_sync); * Call all pending notify callbacks - for use after a watch is * unregistered, to make sure no more callbacks for it will be invoked */ -extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) +void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) { flush_workqueue(osdc->notify_wq); } @@ -2278,9 +2550,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) err = -ENOMEM; osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); if (!osdc->notify_wq) - goto out_msgpool; + goto out_msgpool_reply; + return 0; +out_msgpool_reply: + ceph_msgpool_destroy(&osdc->msgpool_op_reply); out_msgpool: ceph_msgpool_destroy(&osdc->msgpool_op); out_mempool: @@ -2454,7 +2729,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_osd_client *osdc = osd->o_osdc; struct ceph_msg *m; struct ceph_osd_request *req; - int front = le32_to_cpu(hdr->front_len); + int front_len = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid; @@ -2474,12 +2749,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); - if (front > req->r_reply->front.iov_len) { + if (front_len > req->r_reply->front_alloc_len) { pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", - front, (int)req->r_reply->front.iov_len, + front_len, req->r_reply->front_alloc_len, (unsigned int)con->peer_name.type, le64_to_cpu(con->peer_name.num)); - m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); + m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, + false); if (!m) goto out; ceph_msg_put(req->r_reply); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index dbd9a479242..c547e46084d 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -329,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end) dout("crush decode tunable chooseleaf_descend_once = %d", c->chooseleaf_descend_once); + ceph_decode_need(p, end, sizeof(u8), done); + c->chooseleaf_vary_r = ceph_decode_8(p); + dout("crush decode tunable chooseleaf_vary_r = %d", + c->chooseleaf_vary_r); + done: dout("crush_decode success\n"); return c; @@ -343,7 +348,7 @@ bad: /* * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid - * to a set of osds) + * to a set of osds) and primary_temp (explicit primary setting) */ static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) { @@ -464,6 +469,11 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id) return NULL; } +struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id) +{ + return __lookup_pg_pool(&map->pg_pools, id); +} + const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) { struct ceph_pg_pool_info *pi; @@ -501,7 +511,7 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) kfree(pi); } -static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) +static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) { u8 ev, cv; unsigned len, num; @@ -514,8 +524,8 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); return -EINVAL; } - if (cv > 7) { - pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv); + if (cv > 9) { + pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv); return -EINVAL; } len = ceph_decode_32(p); @@ -543,12 +553,34 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) *p += len; } - /* skip removed snaps */ + /* skip removed_snaps */ num = ceph_decode_32(p); *p += num * (8 + 8); *p += 8; /* skip auid */ pi->flags = ceph_decode_64(p); + *p += 4; /* skip crash_replay_interval */ + + if (ev >= 7) + *p += 1; /* skip min_size */ + + if (ev >= 8) + *p += 8 + 8; /* skip quota_max_* */ + + if (ev >= 9) { + /* skip tiers */ + num = ceph_decode_32(p); + *p += num * 8; + + *p += 8; /* skip tier_of */ + *p += 1; /* skip cache_mode */ + + pi->read_tier = ceph_decode_64(p); + pi->write_tier = ceph_decode_64(p); + } else { + pi->read_tier = -1; + pi->write_tier = -1; + } /* ignore the rest */ @@ -560,7 +592,7 @@ bad: return -EINVAL; } -static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) +static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map) { struct ceph_pg_pool_info *pi; u32 num, len; @@ -606,6 +638,13 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map) rb_erase(&pg->node, &map->pg_temp); kfree(pg); } + while (!RB_EMPTY_ROOT(&map->primary_temp)) { + struct ceph_pg_mapping *pg = + rb_entry(rb_first(&map->primary_temp), + struct ceph_pg_mapping, node); + rb_erase(&pg->node, &map->primary_temp); + kfree(pg); + } while (!RB_EMPTY_ROOT(&map->pg_pools)) { struct ceph_pg_pool_info *pi = rb_entry(rb_first(&map->pg_pools), @@ -615,186 +654,516 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map) kfree(map->osd_state); kfree(map->osd_weight); kfree(map->osd_addr); + kfree(map->osd_primary_affinity); kfree(map); } /* - * adjust max osd value. reallocate arrays. + * Adjust max_osd value, (re)allocate arrays. + * + * The new elements are properly initialized. */ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) { u8 *state; - struct ceph_entity_addr *addr; u32 *weight; + struct ceph_entity_addr *addr; + int i; - state = kcalloc(max, sizeof(*state), GFP_NOFS); - addr = kcalloc(max, sizeof(*addr), GFP_NOFS); - weight = kcalloc(max, sizeof(*weight), GFP_NOFS); - if (state == NULL || addr == NULL || weight == NULL) { + state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS); + weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS); + addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS); + if (!state || !weight || !addr) { kfree(state); - kfree(addr); kfree(weight); + kfree(addr); + return -ENOMEM; } - /* copy old? */ - if (map->osd_state) { - memcpy(state, map->osd_state, map->max_osd*sizeof(*state)); - memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr)); - memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight)); - kfree(map->osd_state); - kfree(map->osd_addr); - kfree(map->osd_weight); + for (i = map->max_osd; i < max; i++) { + state[i] = 0; + weight[i] = CEPH_OSD_OUT; + memset(addr + i, 0, sizeof(*addr)); } map->osd_state = state; map->osd_weight = weight; map->osd_addr = addr; + + if (map->osd_primary_affinity) { + u32 *affinity; + + affinity = krealloc(map->osd_primary_affinity, + max*sizeof(*affinity), GFP_NOFS); + if (!affinity) + return -ENOMEM; + + for (i = map->max_osd; i < max; i++) + affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; + + map->osd_primary_affinity = affinity; + } + map->max_osd = max; + return 0; } +#define OSDMAP_WRAPPER_COMPAT_VER 7 +#define OSDMAP_CLIENT_DATA_COMPAT_VER 1 + /* - * decode a full map. + * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps, + * to struct_v of the client_data section for new (v7 and above) + * osdmaps. */ -struct ceph_osdmap *osdmap_decode(void **p, void *end) +static int get_osdmap_client_data_v(void **p, void *end, + const char *prefix, u8 *v) { - struct ceph_osdmap *map; - u16 version; - u32 len, max, i; - int err = -EINVAL; - void *start = *p; - struct ceph_pg_pool_info *pi; + u8 struct_v; + + ceph_decode_8_safe(p, end, struct_v, e_inval); + if (struct_v >= 7) { + u8 struct_compat; + + ceph_decode_8_safe(p, end, struct_compat, e_inval); + if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) { + pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n", + struct_v, struct_compat, + OSDMAP_WRAPPER_COMPAT_VER, prefix); + return -EINVAL; + } + *p += 4; /* ignore wrapper struct_len */ + + ceph_decode_8_safe(p, end, struct_v, e_inval); + ceph_decode_8_safe(p, end, struct_compat, e_inval); + if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) { + pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n", + struct_v, struct_compat, + OSDMAP_CLIENT_DATA_COMPAT_VER, prefix); + return -EINVAL; + } + *p += 4; /* ignore client data struct_len */ + } else { + u16 version; + + *p -= 1; + ceph_decode_16_safe(p, end, version, e_inval); + if (version < 6) { + pr_warning("got v %d < 6 of %s ceph_osdmap\n", version, + prefix); + return -EINVAL; + } - dout("osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p)); + /* old osdmap enconding */ + struct_v = 0; + } - map = kzalloc(sizeof(*map), GFP_NOFS); - if (map == NULL) - return ERR_PTR(-ENOMEM); - map->pg_temp = RB_ROOT; + *v = struct_v; + return 0; - ceph_decode_16_safe(p, end, version, bad); - if (version > 6) { - pr_warning("got unknown v %d > 6 of osdmap\n", version); - goto bad; +e_inval: + return -EINVAL; +} + +static int __decode_pools(void **p, void *end, struct ceph_osdmap *map, + bool incremental) +{ + u32 n; + + ceph_decode_32_safe(p, end, n, e_inval); + while (n--) { + struct ceph_pg_pool_info *pi; + u64 pool; + int ret; + + ceph_decode_64_safe(p, end, pool, e_inval); + + pi = __lookup_pg_pool(&map->pg_pools, pool); + if (!incremental || !pi) { + pi = kzalloc(sizeof(*pi), GFP_NOFS); + if (!pi) + return -ENOMEM; + + pi->id = pool; + + ret = __insert_pg_pool(&map->pg_pools, pi); + if (ret) { + kfree(pi); + return ret; + } + } + + ret = decode_pool(p, end, pi); + if (ret) + return ret; } - if (version < 6) { - pr_warning("got old v %d < 6 of osdmap\n", version); - goto bad; + + return 0; + +e_inval: + return -EINVAL; +} + +static int decode_pools(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_pools(p, end, map, false); +} + +static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_pools(p, end, map, true); +} + +static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map, + bool incremental) +{ + u32 n; + + ceph_decode_32_safe(p, end, n, e_inval); + while (n--) { + struct ceph_pg pgid; + u32 len, i; + int ret; + + ret = ceph_decode_pgid(p, end, &pgid); + if (ret) + return ret; + + ceph_decode_32_safe(p, end, len, e_inval); + + ret = __remove_pg_mapping(&map->pg_temp, pgid); + BUG_ON(!incremental && ret != -ENOENT); + + if (!incremental || len > 0) { + struct ceph_pg_mapping *pg; + + ceph_decode_need(p, end, len*sizeof(u32), e_inval); + + if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) + return -EINVAL; + + pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS); + if (!pg) + return -ENOMEM; + + pg->pgid = pgid; + pg->pg_temp.len = len; + for (i = 0; i < len; i++) + pg->pg_temp.osds[i] = ceph_decode_32(p); + + ret = __insert_pg_mapping(pg, &map->pg_temp); + if (ret) { + kfree(pg); + return ret; + } + } + } + + return 0; + +e_inval: + return -EINVAL; +} + +static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_pg_temp(p, end, map, false); +} + +static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_pg_temp(p, end, map, true); +} + +static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map, + bool incremental) +{ + u32 n; + + ceph_decode_32_safe(p, end, n, e_inval); + while (n--) { + struct ceph_pg pgid; + u32 osd; + int ret; + + ret = ceph_decode_pgid(p, end, &pgid); + if (ret) + return ret; + + ceph_decode_32_safe(p, end, osd, e_inval); + + ret = __remove_pg_mapping(&map->primary_temp, pgid); + BUG_ON(!incremental && ret != -ENOENT); + + if (!incremental || osd != (u32)-1) { + struct ceph_pg_mapping *pg; + + pg = kzalloc(sizeof(*pg), GFP_NOFS); + if (!pg) + return -ENOMEM; + + pg->pgid = pgid; + pg->primary_temp.osd = osd; + + ret = __insert_pg_mapping(pg, &map->primary_temp); + if (ret) { + kfree(pg); + return ret; + } + } } - ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), bad); + return 0; + +e_inval: + return -EINVAL; +} + +static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map) +{ + return __decode_primary_temp(p, end, map, false); +} + +static int decode_new_primary_temp(void **p, void *end, + struct ceph_osdmap *map) +{ + return __decode_primary_temp(p, end, map, true); +} + +u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd) +{ + BUG_ON(osd >= map->max_osd); + + if (!map->osd_primary_affinity) + return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; + + return map->osd_primary_affinity[osd]; +} + +static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff) +{ + BUG_ON(osd >= map->max_osd); + + if (!map->osd_primary_affinity) { + int i; + + map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32), + GFP_NOFS); + if (!map->osd_primary_affinity) + return -ENOMEM; + + for (i = 0; i < map->max_osd; i++) + map->osd_primary_affinity[i] = + CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; + } + + map->osd_primary_affinity[osd] = aff; + + return 0; +} + +static int decode_primary_affinity(void **p, void *end, + struct ceph_osdmap *map) +{ + u32 len, i; + + ceph_decode_32_safe(p, end, len, e_inval); + if (len == 0) { + kfree(map->osd_primary_affinity); + map->osd_primary_affinity = NULL; + return 0; + } + if (len != map->max_osd) + goto e_inval; + + ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval); + + for (i = 0; i < map->max_osd; i++) { + int ret; + + ret = set_primary_affinity(map, i, ceph_decode_32(p)); + if (ret) + return ret; + } + + return 0; + +e_inval: + return -EINVAL; +} + +static int decode_new_primary_affinity(void **p, void *end, + struct ceph_osdmap *map) +{ + u32 n; + + ceph_decode_32_safe(p, end, n, e_inval); + while (n--) { + u32 osd, aff; + int ret; + + ceph_decode_32_safe(p, end, osd, e_inval); + ceph_decode_32_safe(p, end, aff, e_inval); + + ret = set_primary_affinity(map, osd, aff); + if (ret) + return ret; + + pr_info("osd%d primary-affinity 0x%x\n", osd, aff); + } + + return 0; + +e_inval: + return -EINVAL; +} + +/* + * decode a full map. + */ +static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) +{ + u8 struct_v; + u32 epoch = 0; + void *start = *p; + u32 max; + u32 len, i; + int err; + + dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); + + err = get_osdmap_client_data_v(p, end, "full", &struct_v); + if (err) + goto bad; + + /* fsid, epoch, created, modified */ + ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) + + sizeof(map->created) + sizeof(map->modified), e_inval); ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); - map->epoch = ceph_decode_32(p); + epoch = map->epoch = ceph_decode_32(p); ceph_decode_copy(p, &map->created, sizeof(map->created)); ceph_decode_copy(p, &map->modified, sizeof(map->modified)); - ceph_decode_32_safe(p, end, max, bad); - while (max--) { - ceph_decode_need(p, end, 8 + 2, bad); - err = -ENOMEM; - pi = kzalloc(sizeof(*pi), GFP_NOFS); - if (!pi) - goto bad; - pi->id = ceph_decode_64(p); - err = __decode_pool(p, end, pi); - if (err < 0) { - kfree(pi); - goto bad; - } - __insert_pg_pool(&map->pg_pools, pi); - } + /* pools */ + err = decode_pools(p, end, map); + if (err) + goto bad; - err = __decode_pool_names(p, end, map); - if (err < 0) { - dout("fail to decode pool names"); + /* pool_name */ + err = decode_pool_names(p, end, map); + if (err) goto bad; - } - ceph_decode_32_safe(p, end, map->pool_max, bad); + ceph_decode_32_safe(p, end, map->pool_max, e_inval); - ceph_decode_32_safe(p, end, map->flags, bad); + ceph_decode_32_safe(p, end, map->flags, e_inval); - max = ceph_decode_32(p); + /* max_osd */ + ceph_decode_32_safe(p, end, max, e_inval); /* (re)alloc osd arrays */ err = osdmap_set_max_osd(map, max); - if (err < 0) + if (err) goto bad; - dout("osdmap_decode max_osd = %d\n", map->max_osd); - /* osds */ - err = -EINVAL; + /* osd_state, osd_weight, osd_addrs->client_addr */ ceph_decode_need(p, end, 3*sizeof(u32) + map->max_osd*(1 + sizeof(*map->osd_weight) + - sizeof(*map->osd_addr)), bad); - *p += 4; /* skip length field (should match max) */ + sizeof(*map->osd_addr)), e_inval); + + if (ceph_decode_32(p) != map->max_osd) + goto e_inval; + ceph_decode_copy(p, map->osd_state, map->max_osd); - *p += 4; /* skip length field (should match max) */ + if (ceph_decode_32(p) != map->max_osd) + goto e_inval; + for (i = 0; i < map->max_osd; i++) map->osd_weight[i] = ceph_decode_32(p); - *p += 4; /* skip length field (should match max) */ + if (ceph_decode_32(p) != map->max_osd) + goto e_inval; + ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr)); for (i = 0; i < map->max_osd; i++) ceph_decode_addr(&map->osd_addr[i]); /* pg_temp */ - ceph_decode_32_safe(p, end, len, bad); - for (i = 0; i < len; i++) { - int n, j; - struct ceph_pg pgid; - struct ceph_pg_mapping *pg; + err = decode_pg_temp(p, end, map); + if (err) + goto bad; - err = ceph_decode_pgid(p, end, &pgid); + /* primary_temp */ + if (struct_v >= 1) { + err = decode_primary_temp(p, end, map); if (err) goto bad; - ceph_decode_need(p, end, sizeof(u32), bad); - n = ceph_decode_32(p); - err = -EINVAL; - if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) - goto bad; - ceph_decode_need(p, end, n * sizeof(u32), bad); - err = -ENOMEM; - pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS); - if (!pg) - goto bad; - pg->pgid = pgid; - pg->len = n; - for (j = 0; j < n; j++) - pg->osds[j] = ceph_decode_32(p); + } - err = __insert_pg_mapping(pg, &map->pg_temp); + /* primary_affinity */ + if (struct_v >= 2) { + err = decode_primary_affinity(p, end, map); if (err) goto bad; - dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed, - len); + } else { + /* XXX can this happen? */ + kfree(map->osd_primary_affinity); + map->osd_primary_affinity = NULL; } /* crush */ - ceph_decode_32_safe(p, end, len, bad); - dout("osdmap_decode crush len %d from off 0x%x\n", len, - (int)(*p - start)); - ceph_decode_need(p, end, len, bad); - map->crush = crush_decode(*p, end); - *p += len; + ceph_decode_32_safe(p, end, len, e_inval); + map->crush = crush_decode(*p, min(*p + len, end)); if (IS_ERR(map->crush)) { err = PTR_ERR(map->crush); map->crush = NULL; goto bad; } + *p += len; - /* ignore the rest of the map */ + /* ignore the rest */ *p = end; - dout("osdmap_decode done %p %p\n", *p, end); - return map; + dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); + return 0; +e_inval: + err = -EINVAL; bad: - dout("osdmap_decode fail err %d\n", err); - ceph_osdmap_destroy(map); - return ERR_PTR(err); + pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n", + err, epoch, (int)(*p - start), *p, start, end); + print_hex_dump(KERN_DEBUG, "osdmap: ", + DUMP_PREFIX_OFFSET, 16, 1, + start, end - start, true); + return err; +} + +/* + * Allocate and decode a full map. + */ +struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end) +{ + struct ceph_osdmap *map; + int ret; + + map = kzalloc(sizeof(*map), GFP_NOFS); + if (!map) + return ERR_PTR(-ENOMEM); + + map->pg_temp = RB_ROOT; + map->primary_temp = RB_ROOT; + mutex_init(&map->crush_scratch_mutex); + + ret = osdmap_decode(p, end, map); + if (ret) { + ceph_osdmap_destroy(map); + return ERR_PTR(ret); + } + + return map; } /* @@ -813,17 +1182,18 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, __s64 new_pool_max; __s32 new_flags, max; void *start = *p; - int err = -EINVAL; - u16 version; + int err; + u8 struct_v; + + dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); - ceph_decode_16_safe(p, end, version, bad); - if (version != 6) { - pr_warning("got unknown v %d != 6 of inc osdmap\n", version); + err = get_osdmap_client_data_v(p, end, "inc", &struct_v); + if (err) goto bad; - } - ceph_decode_need(p, end, sizeof(fsid)+sizeof(modified)+2*sizeof(u32), - bad); + /* fsid, epoch, modified, new_pool_max, new_flags */ + ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) + + sizeof(u64) + sizeof(u32), e_inval); ceph_decode_copy(p, &fsid, sizeof(fsid)); epoch = ceph_decode_32(p); BUG_ON(epoch != map->epoch+1); @@ -832,21 +1202,22 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, new_flags = ceph_decode_32(p); /* full map? */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); if (len > 0) { dout("apply_incremental full map len %d, %p to %p\n", len, *p, end); - return osdmap_decode(p, min(*p+len, end)); + return ceph_osdmap_decode(p, min(*p+len, end)); } /* new crush? */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); if (len > 0) { - dout("apply_incremental new crush map len %d, %p to %p\n", - len, *p, end); newcrush = crush_decode(*p, min(*p+len, end)); - if (IS_ERR(newcrush)) - return ERR_CAST(newcrush); + if (IS_ERR(newcrush)) { + err = PTR_ERR(newcrush); + newcrush = NULL; + goto bad; + } *p += len; } @@ -856,13 +1227,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, if (new_pool_max >= 0) map->pool_max = new_pool_max; - ceph_decode_need(p, end, 5*sizeof(u32), bad); - /* new max? */ - max = ceph_decode_32(p); + ceph_decode_32_safe(p, end, max, e_inval); if (max >= 0) { err = osdmap_set_max_osd(map, max); - if (err < 0) + if (err) goto bad; } @@ -875,51 +1244,34 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, newcrush = NULL; } - /* new_pool */ - ceph_decode_32_safe(p, end, len, bad); - while (len--) { - struct ceph_pg_pool_info *pi; + /* new_pools */ + err = decode_new_pools(p, end, map); + if (err) + goto bad; - ceph_decode_64_safe(p, end, pool, bad); - pi = __lookup_pg_pool(&map->pg_pools, pool); - if (!pi) { - pi = kzalloc(sizeof(*pi), GFP_NOFS); - if (!pi) { - err = -ENOMEM; - goto bad; - } - pi->id = pool; - __insert_pg_pool(&map->pg_pools, pi); - } - err = __decode_pool(p, end, pi); - if (err < 0) - goto bad; - } - if (version >= 5) { - err = __decode_pool_names(p, end, map); - if (err < 0) - goto bad; - } + /* new_pool_names */ + err = decode_pool_names(p, end, map); + if (err) + goto bad; /* old_pool */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { struct ceph_pg_pool_info *pi; - ceph_decode_64_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, e_inval); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) __remove_pg_pool(&map->pg_pools, pi); } /* new_up */ - err = -EINVAL; - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { u32 osd; struct ceph_entity_addr addr; - ceph_decode_32_safe(p, end, osd, bad); - ceph_decode_copy_safe(p, end, &addr, sizeof(addr), bad); + ceph_decode_32_safe(p, end, osd, e_inval); + ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval); ceph_decode_addr(&addr); pr_info("osd%d up\n", osd); BUG_ON(osd >= map->max_osd); @@ -928,11 +1280,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_state */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { u32 osd; u8 xorstate; - ceph_decode_32_safe(p, end, osd, bad); + ceph_decode_32_safe(p, end, osd, e_inval); xorstate = **(u8 **)p; (*p)++; /* clean flag */ if (xorstate == 0) @@ -944,10 +1296,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_weight */ - ceph_decode_32_safe(p, end, len, bad); + ceph_decode_32_safe(p, end, len, e_inval); while (len--) { u32 osd, off; - ceph_decode_need(p, end, sizeof(u32)*2, bad); + ceph_decode_need(p, end, sizeof(u32)*2, e_inval); osd = ceph_decode_32(p); off = ceph_decode_32(p); pr_info("osd%d weight 0x%x %s\n", osd, off, @@ -958,56 +1310,35 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } /* new_pg_temp */ - ceph_decode_32_safe(p, end, len, bad); - while (len--) { - struct ceph_pg_mapping *pg; - int j; - struct ceph_pg pgid; - u32 pglen; + err = decode_new_pg_temp(p, end, map); + if (err) + goto bad; - err = ceph_decode_pgid(p, end, &pgid); + /* new_primary_temp */ + if (struct_v >= 1) { + err = decode_new_primary_temp(p, end, map); if (err) goto bad; - ceph_decode_need(p, end, sizeof(u32), bad); - pglen = ceph_decode_32(p); - if (pglen) { - ceph_decode_need(p, end, pglen*sizeof(u32), bad); - - /* removing existing (if any) */ - (void) __remove_pg_mapping(&map->pg_temp, pgid); + } - /* insert */ - err = -EINVAL; - if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) - goto bad; - err = -ENOMEM; - pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); - if (!pg) - goto bad; - pg->pgid = pgid; - pg->len = pglen; - for (j = 0; j < pglen; j++) - pg->osds[j] = ceph_decode_32(p); - err = __insert_pg_mapping(pg, &map->pg_temp); - if (err) { - kfree(pg); - goto bad; - } - dout(" added pg_temp %lld.%x len %d\n", pgid.pool, - pgid.seed, pglen); - } else { - /* remove */ - __remove_pg_mapping(&map->pg_temp, pgid); - } + /* new_primary_affinity */ + if (struct_v >= 2) { + err = decode_new_primary_affinity(p, end, map); + if (err) + goto bad; } /* ignore the rest */ *p = end; + + dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); return map; +e_inval: + err = -EINVAL; bad: - pr_err("corrupt inc osdmap epoch %d off %d (%p of %p-%p)\n", - epoch, (int)(*p - start), *p, start, end); + pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n", + err, epoch, (int)(*p - start), *p, start, end); print_hex_dump(KERN_DEBUG, "osdmap: ", DUMP_PREFIX_OFFSET, 16, 1, start, end - start, true); @@ -1090,71 +1421,275 @@ invalid: EXPORT_SYMBOL(ceph_calc_file_object_mapping); /* - * calculate an object layout (i.e. pgid) from an oid, - * file_layout, and osdmap + * Calculate mapping of a (oloc, oid) pair to a PG. Should only be + * called with target's (oloc, oid), since tiering isn't taken into + * account. */ -int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, - struct ceph_osdmap *osdmap, uint64_t pool) +int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, + struct ceph_object_locator *oloc, + struct ceph_object_id *oid, + struct ceph_pg *pg_out) { - struct ceph_pg_pool_info *pool_info; + struct ceph_pg_pool_info *pi; - BUG_ON(!osdmap); - pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool); - if (!pool_info) + pi = __lookup_pg_pool(&osdmap->pg_pools, oloc->pool); + if (!pi) return -EIO; - pg->pool = pool; - pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid)); - dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed); + pg_out->pool = oloc->pool; + pg_out->seed = ceph_str_hash(pi->object_hash, oid->name, + oid->name_len); + + dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name, + pg_out->pool, pg_out->seed); return 0; } -EXPORT_SYMBOL(ceph_calc_ceph_pg); +EXPORT_SYMBOL(ceph_oloc_oid_to_pg); + +static int do_crush(struct ceph_osdmap *map, int ruleno, int x, + int *result, int result_max, + const __u32 *weight, int weight_max) +{ + int r; + + BUG_ON(result_max > CEPH_PG_MAX_SIZE); + + mutex_lock(&map->crush_scratch_mutex); + r = crush_do_rule(map->crush, ruleno, x, result, result_max, + weight, weight_max, map->crush_scratch_ary); + mutex_unlock(&map->crush_scratch_mutex); + + return r; +} /* - * Calculate raw osd vector for the given pgid. Return pointer to osd - * array, or NULL on failure. + * Calculate raw (crush) set for given pgid. + * + * Return raw set length, or error. */ -static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, - int *osds, int *num) +static int pg_to_raw_osds(struct ceph_osdmap *osdmap, + struct ceph_pg_pool_info *pool, + struct ceph_pg pgid, u32 pps, int *osds) { - struct ceph_pg_mapping *pg; - struct ceph_pg_pool_info *pool; int ruleno; - int r; - u32 pps; + int len; - pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); - if (!pool) - return NULL; + /* crush */ + ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, + pool->type, pool->size); + if (ruleno < 0) { + pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n", + pgid.pool, pool->crush_ruleset, pool->type, + pool->size); + return -ENOENT; + } - /* pg_temp? */ + len = do_crush(osdmap, ruleno, pps, osds, + min_t(int, pool->size, CEPH_PG_MAX_SIZE), + osdmap->osd_weight, osdmap->max_osd); + if (len < 0) { + pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n", + len, ruleno, pgid.pool, pool->crush_ruleset, + pool->type, pool->size); + return len; + } + + return len; +} + +/* + * Given raw set, calculate up set and up primary. + * + * Return up set length. *primary is set to up primary osd id, or -1 + * if up set is empty. + */ +static int raw_to_up_osds(struct ceph_osdmap *osdmap, + struct ceph_pg_pool_info *pool, + int *osds, int len, int *primary) +{ + int up_primary = -1; + int i; + + if (ceph_can_shift_osds(pool)) { + int removed = 0; + + for (i = 0; i < len; i++) { + if (ceph_osd_is_down(osdmap, osds[i])) { + removed++; + continue; + } + if (removed) + osds[i - removed] = osds[i]; + } + + len -= removed; + if (len > 0) + up_primary = osds[0]; + } else { + for (i = len - 1; i >= 0; i--) { + if (ceph_osd_is_down(osdmap, osds[i])) + osds[i] = CRUSH_ITEM_NONE; + else + up_primary = osds[i]; + } + } + + *primary = up_primary; + return len; +} + +static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps, + struct ceph_pg_pool_info *pool, + int *osds, int len, int *primary) +{ + int i; + int pos = -1; + + /* + * Do we have any non-default primary_affinity values for these + * osds? + */ + if (!osdmap->osd_primary_affinity) + return; + + for (i = 0; i < len; i++) { + int osd = osds[i]; + + if (osd != CRUSH_ITEM_NONE && + osdmap->osd_primary_affinity[osd] != + CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) { + break; + } + } + if (i == len) + return; + + /* + * Pick the primary. Feed both the seed (for the pg) and the + * osd into the hash/rng so that a proportional fraction of an + * osd's pgs get rejected as primary. + */ + for (i = 0; i < len; i++) { + int osd = osds[i]; + u32 aff; + + if (osd == CRUSH_ITEM_NONE) + continue; + + aff = osdmap->osd_primary_affinity[osd]; + if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY && + (crush_hash32_2(CRUSH_HASH_RJENKINS1, + pps, osd) >> 16) >= aff) { + /* + * We chose not to use this primary. Note it + * anyway as a fallback in case we don't pick + * anyone else, but keep looking. + */ + if (pos < 0) + pos = i; + } else { + pos = i; + break; + } + } + if (pos < 0) + return; + + *primary = osds[pos]; + + if (ceph_can_shift_osds(pool) && pos > 0) { + /* move the new primary to the front */ + for (i = pos; i > 0; i--) + osds[i] = osds[i - 1]; + osds[0] = *primary; + } +} + +/* + * Given up set, apply pg_temp and primary_temp mappings. + * + * Return acting set length. *primary is set to acting primary osd id, + * or -1 if acting set is empty. + */ +static int apply_temps(struct ceph_osdmap *osdmap, + struct ceph_pg_pool_info *pool, struct ceph_pg pgid, + int *osds, int len, int *primary) +{ + struct ceph_pg_mapping *pg; + int temp_len; + int temp_primary; + int i; + + /* raw_pg -> pg */ pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, pool->pg_num_mask); + + /* pg_temp? */ pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { - *num = pg->len; - return pg->osds; + temp_len = 0; + temp_primary = -1; + + for (i = 0; i < pg->pg_temp.len; i++) { + if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) { + if (ceph_can_shift_osds(pool)) + continue; + else + osds[temp_len++] = CRUSH_ITEM_NONE; + } else { + osds[temp_len++] = pg->pg_temp.osds[i]; + } + } + + /* apply pg_temp's primary */ + for (i = 0; i < temp_len; i++) { + if (osds[i] != CRUSH_ITEM_NONE) { + temp_primary = osds[i]; + break; + } + } + } else { + temp_len = len; + temp_primary = *primary; } - /* crush */ - ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, - pool->type, pool->size); - if (ruleno < 0) { - pr_err("no crush rule pool %lld ruleset %d type %d size %d\n", - pgid.pool, pool->crush_ruleset, pool->type, - pool->size); - return NULL; + /* primary_temp? */ + pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid); + if (pg) + temp_primary = pg->primary_temp.osd; + + *primary = temp_primary; + return temp_len; +} + +/* + * Calculate acting set for given pgid. + * + * Return acting set length, or error. *primary is set to acting + * primary osd id, or -1 if acting set is empty or on error. + */ +int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, + int *osds, int *primary) +{ + struct ceph_pg_pool_info *pool; + u32 pps; + int len; + + pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); + if (!pool) { + *primary = -1; + return -ENOENT; } if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { - /* hash pool id and seed sothat pool PGs do not overlap */ + /* hash pool id and seed so that pool PGs do not overlap */ pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, ceph_stable_mod(pgid.seed, pool->pgp_num, pool->pgp_num_mask), pgid.pool); } else { /* - * legacy ehavior: add ps and pool together. this is + * legacy behavior: add ps and pool together. this is * not a great approach because the PGs from each pool * will overlap on top of each other: 0.5 == 1.4 == * 2.3 == ... @@ -1163,38 +1698,20 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, pool->pgp_num_mask) + (unsigned)pgid.pool; } - r = crush_do_rule(osdmap->crush, ruleno, pps, osds, - min_t(int, pool->size, *num), - osdmap->osd_weight); - if (r < 0) { - pr_err("error %d from crush rule: pool %lld ruleset %d type %d" - " size %d\n", r, pgid.pool, pool->crush_ruleset, - pool->type, pool->size); - return NULL; + + len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds); + if (len < 0) { + *primary = -1; + return len; } - *num = r; - return osds; -} -/* - * Return acting set for given pgid. - */ -int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, - int *acting) -{ - int rawosds[CEPH_PG_MAX_SIZE], *osds; - int i, o, num = CEPH_PG_MAX_SIZE; + len = raw_to_up_osds(osdmap, pool, osds, len, primary); - osds = calc_pg_raw(osdmap, pgid, rawosds, &num); - if (!osds) - return -1; + apply_primary_affinity(osdmap, pps, pool, osds, len, primary); + + len = apply_temps(osdmap, pool, pgid, osds, len, primary); - /* primary is first up osd */ - o = 0; - for (i = 0; i < num; i++) - if (ceph_osd_is_up(osdmap, osds[i])) - acting[o++] = osds[i]; - return o; + return len; } /* @@ -1202,17 +1719,11 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, */ int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) { - int rawosds[CEPH_PG_MAX_SIZE], *osds; - int i, num = CEPH_PG_MAX_SIZE; + int osds[CEPH_PG_MAX_SIZE]; + int primary; - osds = calc_pg_raw(osdmap, pgid, rawosds, &num); - if (!osds) - return -1; + ceph_calc_pg_acting(osdmap, pgid, osds, &primary); - /* primary is first up osd */ - for (i = 0; i < num; i++) - if (ceph_osd_is_up(osdmap, osds[i])) - return osds[i]; - return -1; + return primary; } EXPORT_SYMBOL(ceph_calc_pg_primary); diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index 815a2249cfa..555013034f7 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -53,7 +53,10 @@ void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty) set_page_dirty_lock(pages[i]); put_page(pages[i]); } - kfree(pages); + if (is_vmalloc_addr(pages)) + vfree(pages); + else + kfree(pages); } EXPORT_SYMBOL(ceph_put_page_vector); @@ -165,36 +168,6 @@ void ceph_copy_from_page_vector(struct page **pages, EXPORT_SYMBOL(ceph_copy_from_page_vector); /* - * copy user data from a page vector into a user pointer - */ -int ceph_copy_page_vector_to_user(struct page **pages, - void __user *data, - loff_t off, size_t len) -{ - int i = 0; - int po = off & ~PAGE_CACHE_MASK; - int left = len; - int l, bad; - - while (left > 0) { - l = min_t(int, left, PAGE_CACHE_SIZE-po); - bad = copy_to_user(data, page_address(pages[i]) + po, l); - if (bad == l) - return -EFAULT; - data += l - bad; - left -= l - bad; - if (po) { - po += l - bad; - if (po == PAGE_CACHE_SIZE) - po = 0; - } - i++; - } - return len; -} -EXPORT_SYMBOL(ceph_copy_page_vector_to_user); - -/* * Zero an extent within a page vector. Offset is relative to the * start of the first page. */ |
