Diffstat (limited to 'net/ceph')
-rw-r--r--  net/ceph/auth_none.h    |    2
-rw-r--r--  net/ceph/auth_x.h       |    2
-rw-r--r--  net/ceph/buffer.c       |   22
-rw-r--r--  net/ceph/ceph_common.c  |   26
-rw-r--r--  net/ceph/crush/crush.c  |    7
-rw-r--r--  net/ceph/crush/mapper.c |  375
-rw-r--r--  net/ceph/crypto.h       |   48
-rw-r--r--  net/ceph/debugfs.c      |   66
-rw-r--r--  net/ceph/messenger.c    |  109
-rw-r--r--  net/ceph/mon_client.c   |  158
-rw-r--r--  net/ceph/osd_client.c   |  415
-rw-r--r--  net/ceph/osdmap.c       | 1059
-rw-r--r--  net/ceph/pagevec.c      |   35
13 files changed, 1760 insertions, 564 deletions
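
Several of the hunks below converge on one cleanup: net/ceph/buffer.c and the ceph_msg front buffer in net/ceph/messenger.c drop their open-coded "try kmalloc, fall back to vmalloc, remember which one worked" logic in favor of a shared helper pair added to net/ceph/ceph_common.c. A minimal sketch of that pattern as it appears in the diff; the demo_alloc() caller at the end is illustrative only, not part of the patch:

#include <linux/mm.h>      /* PAGE_ALLOC_COSTLY_ORDER, is_vmalloc_addr() */
#include <linux/slab.h>    /* kmalloc(), kfree() */
#include <linux/vmalloc.h> /* __vmalloc(), vfree() */

/* Try the slab allocator first for requests the page allocator does
 * not consider "costly" (order <= PAGE_ALLOC_COSTLY_ORDER, i.e. up to
 * 32K with 4K pages); fall back to vmalloc for anything larger, or
 * when physically contiguous memory is not available. */
void *ceph_kvmalloc(size_t size, gfp_t flags)
{
	if (size <= (PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER)) {
		void *ptr = kmalloc(size, flags | __GFP_NOWARN);

		if (ptr)
			return ptr;
	}

	return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
}

/* Free either kind of allocation.  is_vmalloc_addr() recovers what the
 * old code tracked in per-object flags such as ceph_buffer->is_vmalloc
 * and ceph_msg->front_is_vmalloc, which is why those fields can go
 * away in the hunks below. */
void ceph_kvfree(const void *ptr)
{
	if (is_vmalloc_addr(ptr))
		vfree(ptr);
	else
		kfree(ptr);
}

/* Illustrative caller (not from the patch). */
static int demo_alloc(size_t len)
{
	void *buf = ceph_kvmalloc(len, GFP_NOFS);

	if (!buf)
		return -ENOMEM;
	/* ... use buf ... */
	ceph_kvfree(buf);
	return 0;
}
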
diff --git a/net/ceph/auth_none.h b/net/ceph/auth_none.h index ed7d088b1bc..059a3ce4b53 100644 --- a/net/ceph/auth_none.h +++ b/net/ceph/auth_none.h @@ -23,7 +23,7 @@ struct ceph_auth_none_info {  	struct ceph_none_authorizer au;   /* we only need one; it's static */  }; -extern int ceph_auth_none_init(struct ceph_auth_client *ac); +int ceph_auth_none_init(struct ceph_auth_client *ac);  #endif diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h index c5a058da7ac..65ee72082d9 100644 --- a/net/ceph/auth_x.h +++ b/net/ceph/auth_x.h @@ -45,7 +45,7 @@ struct ceph_x_info {  	struct ceph_x_authorizer auth_authorizer;  }; -extern int ceph_x_init(struct ceph_auth_client *ac); +int ceph_x_init(struct ceph_auth_client *ac);  #endif diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c index bf3e6a13c21..621b5f65407 100644 --- a/net/ceph/buffer.c +++ b/net/ceph/buffer.c @@ -6,6 +6,7 @@  #include <linux/ceph/buffer.h>  #include <linux/ceph/decode.h> +#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */  struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)  { @@ -15,16 +16,10 @@ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)  	if (!b)  		return NULL; -	b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN); -	if (b->vec.iov_base) { -		b->is_vmalloc = false; -	} else { -		b->vec.iov_base = __vmalloc(len, gfp | __GFP_HIGHMEM, PAGE_KERNEL); -		if (!b->vec.iov_base) { -			kfree(b); -			return NULL; -		} -		b->is_vmalloc = true; +	b->vec.iov_base = ceph_kvmalloc(len, gfp); +	if (!b->vec.iov_base) { +		kfree(b); +		return NULL;  	}  	kref_init(&b->kref); @@ -40,12 +35,7 @@ void ceph_buffer_release(struct kref *kref)  	struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref);  	dout("buffer_release %p\n", b); -	if (b->vec.iov_base) { -		if (b->is_vmalloc) -			vfree(b->vec.iov_base); -		else -			kfree(b->vec.iov_base); -	} +	ceph_kvfree(b->vec.iov_base);  	kfree(b);  }  EXPORT_SYMBOL(ceph_buffer_release); diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 34b11ee8124..1675021d8c1 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -15,6 +15,7 @@  #include <linux/slab.h>  #include <linux/statfs.h>  #include <linux/string.h> +#include <linux/vmalloc.h>  #include <linux/nsproxy.h>  #include <net/net_namespace.h> @@ -71,6 +72,8 @@ const char *ceph_msg_type_name(int type)  	case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";  	case CEPH_MSG_STATFS: return "statfs";  	case CEPH_MSG_STATFS_REPLY: return "statfs_reply"; +	case CEPH_MSG_MON_GET_VERSION: return "mon_get_version"; +	case CEPH_MSG_MON_GET_VERSION_REPLY: return "mon_get_version_reply";  	case CEPH_MSG_MDS_MAP: return "mds_map";  	case CEPH_MSG_CLIENT_SESSION: return "client_session";  	case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect"; @@ -170,6 +173,25 @@ int ceph_compare_options(struct ceph_options *new_opt,  }  EXPORT_SYMBOL(ceph_compare_options); +void *ceph_kvmalloc(size_t size, gfp_t flags) +{ +	if (size <= (PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER)) { +		void *ptr = kmalloc(size, flags | __GFP_NOWARN); +		if (ptr) +			return ptr; +	} + +	return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL); +} + +void ceph_kvfree(const void *ptr) +{ +	if (is_vmalloc_addr(ptr)) +		vfree(ptr); +	else +		kfree(ptr); +} +  static int parse_fsid(const char *str, struct ceph_fsid *fsid)  { @@ -461,8 +483,8 @@ EXPORT_SYMBOL(ceph_client_id);   * create a fresh client instance   */  struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, -				       unsigned int 
supported_features, -				       unsigned int required_features) +				       u64 supported_features, +				       u64 required_features)  {  	struct ceph_client *client;  	struct ceph_entity_addr *myaddr = NULL; diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index 089613234f0..16bc199d9a6 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -116,11 +116,14 @@ void crush_destroy(struct crush_map *map)  	if (map->rules) {  		__u32 b;  		for (b = 0; b < map->max_rules; b++) -			kfree(map->rules[b]); +			crush_destroy_rule(map->rules[b]);  		kfree(map->rules);  	}  	kfree(map);  } - +void crush_destroy_rule(struct crush_rule *rule) +{ +	kfree(rule); +} diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index cbd06a91941..a1ef53c0441 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -189,7 +189,7 @@ static int terminal(int x)  static int bucket_tree_choose(struct crush_bucket_tree *bucket,  			      int x, int r)  { -	int n, l; +	int n;  	__u32 w;  	__u64 t; @@ -197,6 +197,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,  	n = bucket->num_nodes >> 1;  	while (!terminal(n)) { +		int l;  		/* pick point in [0, w) */  		w = bucket->node_weights[n];  		t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r, @@ -264,8 +265,12 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)   * true if device is marked "out" (failed, fully offloaded)   * of the cluster   */ -static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) +static int is_out(const struct crush_map *map, +		  const __u32 *weight, int weight_max, +		  int item, int x)  { +	if (item >= weight_max) +		return 1;  	if (weight[item] >= 0x10000)  		return 0;  	if (weight[item] == 0) @@ -277,7 +282,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in  }  /** - * crush_choose - choose numrep distinct items of given type + * crush_choose_firstn - choose numrep distinct items of given type   * @map: the crush_map   * @bucket: the bucket we are choose an item from   * @x: crush input value @@ -285,18 +290,28 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in   * @type: the type of item to choose   * @out: pointer to output vector   * @outpos: our position in that vector - * @firstn: true if choosing "first n" items, false if choosing "indep" - * @recurse_to_leaf: true if we want one device under each item of given type - * @descend_once: true if we should only try one descent before giving up + * @tries: number of attempts to make + * @recurse_tries: number of attempts to have recursive chooseleaf make + * @local_retries: localized retries + * @local_fallback_retries: localized fallback retries + * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) + * @vary_r: pass r to recursive calls   * @out2: second output vector for leaf items (if @recurse_to_leaf) + * @parent_r: r value passed from the parent   */ -static int crush_choose(const struct crush_map *map, -			struct crush_bucket *bucket, -			const __u32 *weight, -			int x, int numrep, int type, -			int *out, int outpos, -			int firstn, int recurse_to_leaf, -			int descend_once, int *out2) +static int crush_choose_firstn(const struct crush_map *map, +			       struct crush_bucket *bucket, +			       const __u32 *weight, int weight_max, +			       int x, int numrep, int type, +			       int *out, int outpos, +			       unsigned int tries, +			       unsigned int 
recurse_tries, +			       unsigned int local_retries, +			       unsigned int local_fallback_retries, +			       int recurse_to_leaf, +			       unsigned int vary_r, +			       int *out2, +			       int parent_r)  {  	int rep;  	unsigned int ftotal, flocal; @@ -308,8 +323,11 @@ static int crush_choose(const struct crush_map *map,  	int itemtype;  	int collide, reject; -	dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", -		bucket->id, x, outpos, numrep); +	dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d\n", +		recurse_to_leaf ? "_LEAF" : "", +		bucket->id, x, outpos, numrep, +		tries, recurse_tries, local_retries, local_fallback_retries, +		parent_r);  	for (rep = outpos; rep < numrep; rep++) {  		/* keep trying until we get a non-out, non-colliding item */ @@ -324,36 +342,18 @@ static int crush_choose(const struct crush_map *map,  			do {  				collide = 0;  				retry_bucket = 0; -				r = rep; -				if (in->alg == CRUSH_BUCKET_UNIFORM) { -					/* be careful */ -					if (firstn || (__u32)numrep >= in->size) -						/* r' = r + f_total */ -						r += ftotal; -					else if (in->size % numrep == 0) -						/* r'=r+(n+1)*f_local */ -						r += (numrep+1) * -							(flocal+ftotal); -					else -						/* r' = r + n*f_local */ -						r += numrep * (flocal+ftotal); -				} else { -					if (firstn) -						/* r' = r + f_total */ -						r += ftotal; -					else -						/* r' = r + n*f_local */ -						r += numrep * (flocal+ftotal); -				} +				r = rep + parent_r; +				/* r' = r + f_total */ +				r += ftotal;  				/* bucket choose */  				if (in->size == 0) {  					reject = 1;  					goto reject;  				} -				if (map->choose_local_fallback_tries > 0 && +				if (local_fallback_retries > 0 &&  				    flocal >= (in->size>>1) && -				    flocal > map->choose_local_fallback_tries) +				    flocal > local_fallback_retries)  					item = bucket_perm_choose(in, x, r);  				else  					item = crush_bucket_choose(in, x, r); @@ -394,14 +394,23 @@ static int crush_choose(const struct crush_map *map,  				reject = 0;  				if (!collide && recurse_to_leaf) {  					if (item < 0) { -						if (crush_choose(map, +						int sub_r; +						if (vary_r) +							sub_r = r >> (vary_r-1); +						else +							sub_r = 0; +						if (crush_choose_firstn(map,  							 map->buckets[-1-item], -							 weight, +							 weight, weight_max,  							 x, outpos+1, 0,  							 out2, outpos, -							 firstn, 0, -							 map->chooseleaf_descend_once, -							 NULL) <= outpos) +							 recurse_tries, 0, +							 local_retries, +							 local_fallback_retries, +							 0, +							 vary_r, +							 NULL, +							 sub_r) <= outpos)  							/* didn't get leaf */  							reject = 1;  					} else { @@ -414,6 +423,7 @@ static int crush_choose(const struct crush_map *map,  					/* out? 
*/  					if (itemtype == 0)  						reject = is_out(map, weight, +								weight_max,  								item, x);  					else  						reject = 0; @@ -424,17 +434,14 @@ reject:  					ftotal++;  					flocal++; -					if (reject && descend_once) -						/* let outer call try again */ -						skip_rep = 1; -					else if (collide && flocal <= map->choose_local_tries) +					if (collide && flocal <= local_retries)  						/* retry locally a few times */  						retry_bucket = 1; -					else if (map->choose_local_fallback_tries > 0 && -						 flocal <= in->size + map->choose_local_fallback_tries) +					else if (local_fallback_retries > 0 && +						 flocal <= in->size + local_fallback_retries)  						/* exhaustive bucket search */  						retry_bucket = 1; -					else if (ftotal <= map->choose_total_tries) +					else if (ftotal < tries)  						/* then retry descent */  						retry_descent = 1;  					else @@ -464,21 +471,179 @@ reject:  /** + * crush_choose_indep: alternative breadth-first positionally stable mapping + * + */ +static void crush_choose_indep(const struct crush_map *map, +			       struct crush_bucket *bucket, +			       const __u32 *weight, int weight_max, +			       int x, int left, int numrep, int type, +			       int *out, int outpos, +			       unsigned int tries, +			       unsigned int recurse_tries, +			       int recurse_to_leaf, +			       int *out2, +			       int parent_r) +{ +	struct crush_bucket *in = bucket; +	int endpos = outpos + left; +	int rep; +	unsigned int ftotal; +	int r; +	int i; +	int item = 0; +	int itemtype; +	int collide; + +	dprintk("CHOOSE%s INDEP bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", +		bucket->id, x, outpos, numrep); + +	/* initially my result is undefined */ +	for (rep = outpos; rep < endpos; rep++) { +		out[rep] = CRUSH_ITEM_UNDEF; +		if (out2) +			out2[rep] = CRUSH_ITEM_UNDEF; +	} + +	for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) { +		for (rep = outpos; rep < endpos; rep++) { +			if (out[rep] != CRUSH_ITEM_UNDEF) +				continue; + +			in = bucket;  /* initial bucket */ + +			/* choose through intervening buckets */ +			for (;;) { +				/* note: we base the choice on the position +				 * even in the nested call.  that means that +				 * if the first layer chooses the same bucket +				 * in a different position, we will tend to +				 * choose a different item in that bucket. +				 * this will involve more devices in data +				 * movement and tend to distribute the load. +				 */ +				r = rep + parent_r; + +				/* be careful */ +				if (in->alg == CRUSH_BUCKET_UNIFORM && +				    in->size % numrep == 0) +					/* r'=r+(n+1)*f_total */ +					r += (numrep+1) * ftotal; +				else +					/* r' = r + n*f_total */ +					r += numrep * ftotal; + +				/* bucket choose */ +				if (in->size == 0) { +					dprintk("   empty bucket\n"); +					break; +				} + +				item = crush_bucket_choose(in, x, r); +				if (item >= map->max_devices) { +					dprintk("   bad item %d\n", item); +					out[rep] = CRUSH_ITEM_NONE; +					if (out2) +						out2[rep] = CRUSH_ITEM_NONE; +					left--; +					break; +				} + +				/* desired type? */ +				if (item < 0) +					itemtype = map->buckets[-1-item]->type; +				else +					itemtype = 0; +				dprintk("  item %d type %d\n", item, itemtype); + +				/* keep going? 
*/ +				if (itemtype != type) { +					if (item >= 0 || +					    (-1-item) >= map->max_buckets) { +						dprintk("   bad item type %d\n", type); +						out[rep] = CRUSH_ITEM_NONE; +						if (out2) +							out2[rep] = +								CRUSH_ITEM_NONE; +						left--; +						break; +					} +					in = map->buckets[-1-item]; +					continue; +				} + +				/* collision? */ +				collide = 0; +				for (i = outpos; i < endpos; i++) { +					if (out[i] == item) { +						collide = 1; +						break; +					} +				} +				if (collide) +					break; + +				if (recurse_to_leaf) { +					if (item < 0) { +						crush_choose_indep(map, +						   map->buckets[-1-item], +						   weight, weight_max, +						   x, 1, numrep, 0, +						   out2, rep, +						   recurse_tries, 0, +						   0, NULL, r); +						if (out2[rep] == CRUSH_ITEM_NONE) { +							/* placed nothing; no leaf */ +							break; +						} +					} else { +						/* we already have a leaf! */ +						out2[rep] = item; +					} +				} + +				/* out? */ +				if (itemtype == 0 && +				    is_out(map, weight, weight_max, item, x)) +					break; + +				/* yay! */ +				out[rep] = item; +				left--; +				break; +			} +		} +	} +	for (rep = outpos; rep < endpos; rep++) { +		if (out[rep] == CRUSH_ITEM_UNDEF) { +			out[rep] = CRUSH_ITEM_NONE; +		} +		if (out2 && out2[rep] == CRUSH_ITEM_UNDEF) { +			out2[rep] = CRUSH_ITEM_NONE; +		} +	} +} + +/**   * crush_do_rule - calculate a mapping with the given input and rule   * @map: the crush_map   * @ruleno: the rule id   * @x: hash input   * @result: pointer to result vector   * @result_max: maximum result size + * @weight: weight vector (for map leaves) + * @weight_max: size of weight vector + * @scratch: scratch vector for private use; must be >= 3 * result_max   */  int crush_do_rule(const struct crush_map *map,  		  int ruleno, int x, int *result, int result_max, -		  const __u32 *weight) +		  const __u32 *weight, int weight_max, +		  int *scratch)  {  	int result_len; -	int a[CRUSH_MAX_SET]; -	int b[CRUSH_MAX_SET]; -	int c[CRUSH_MAX_SET]; +	int *a = scratch; +	int *b = scratch + result_max; +	int *c = scratch + result_max*2;  	int recurse_to_leaf;  	int *w;  	int wsize = 0; @@ -489,8 +654,20 @@ int crush_do_rule(const struct crush_map *map,  	__u32 step;  	int i, j;  	int numrep; -	int firstn; -	const int descend_once = 0; +	/* +	 * the original choose_total_tries value was off by one (it +	 * counted "retries" and not "tries").  add one. 
+	 */ +	int choose_tries = map->choose_total_tries + 1; +	int choose_leaf_tries = 0; +	/* +	 * the local tries values were counted as "retries", though, +	 * and need no adjustment +	 */ +	int choose_local_retries = map->choose_local_tries; +	int choose_local_fallback_retries = map->choose_local_fallback_tries; + +	int vary_r = map->chooseleaf_vary_r;  	if ((__u32)ruleno >= map->max_rules) {  		dprintk(" bad ruleno %d\n", ruleno); @@ -503,29 +680,54 @@ int crush_do_rule(const struct crush_map *map,  	o = b;  	for (step = 0; step < rule->len; step++) { +		int firstn = 0;  		struct crush_rule_step *curstep = &rule->steps[step]; -		firstn = 0;  		switch (curstep->op) {  		case CRUSH_RULE_TAKE:  			w[0] = curstep->arg1;  			wsize = 1;  			break; -		case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: +		case CRUSH_RULE_SET_CHOOSE_TRIES: +			if (curstep->arg1 > 0) +				choose_tries = curstep->arg1; +			break; + +		case CRUSH_RULE_SET_CHOOSELEAF_TRIES: +			if (curstep->arg1 > 0) +				choose_leaf_tries = curstep->arg1; +			break; + +		case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES: +			if (curstep->arg1 >= 0) +				choose_local_retries = curstep->arg1; +			break; + +		case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES: +			if (curstep->arg1 >= 0) +				choose_local_fallback_retries = curstep->arg1; +			break; + +		case CRUSH_RULE_SET_CHOOSELEAF_VARY_R: +			if (curstep->arg1 >= 0) +				vary_r = curstep->arg1; +			break; + +		case CRUSH_RULE_CHOOSELEAF_FIRSTN:  		case CRUSH_RULE_CHOOSE_FIRSTN:  			firstn = 1;  			/* fall through */ -		case CRUSH_RULE_CHOOSE_LEAF_INDEP: +		case CRUSH_RULE_CHOOSELEAF_INDEP:  		case CRUSH_RULE_CHOOSE_INDEP:  			if (wsize == 0)  				break;  			recurse_to_leaf =  				curstep->op == -				 CRUSH_RULE_CHOOSE_LEAF_FIRSTN || +				 CRUSH_RULE_CHOOSELEAF_FIRSTN ||  				curstep->op == -				CRUSH_RULE_CHOOSE_LEAF_INDEP; +				CRUSH_RULE_CHOOSELEAF_INDEP;  			/* reset output */  			osize = 0; @@ -543,22 +745,53 @@ int crush_do_rule(const struct crush_map *map,  						continue;  				}  				j = 0; -				osize += crush_choose(map, -						      map->buckets[-1-w[i]], -						      weight, -						      x, numrep, -						      curstep->arg2, -						      o+osize, j, -						      firstn, -						      recurse_to_leaf, -						      descend_once, c+osize); +				if (firstn) { +					int recurse_tries; +					if (choose_leaf_tries) +						recurse_tries = +							choose_leaf_tries; +					else if (map->chooseleaf_descend_once) +						recurse_tries = 1; +					else +						recurse_tries = choose_tries; +					osize += crush_choose_firstn( +						map, +						map->buckets[-1-w[i]], +						weight, weight_max, +						x, numrep, +						curstep->arg2, +						o+osize, j, +						choose_tries, +						recurse_tries, +						choose_local_retries, +						choose_local_fallback_retries, +						recurse_to_leaf, +						vary_r, +						c+osize, +						0); +				} else { +					crush_choose_indep( +						map, +						map->buckets[-1-w[i]], +						weight, weight_max, +						x, numrep, numrep, +						curstep->arg2, +						o+osize, j, +						choose_tries, +						choose_leaf_tries ? 
+						   choose_leaf_tries : 1, +						recurse_to_leaf, +						c+osize, +						0); +					osize += numrep; +				}  			}  			if (recurse_to_leaf)  				/* copy final _leaf_ values to output set */  				memcpy(o, c, osize*sizeof(*o)); -			/* swap t and w arrays */ +			/* swap o and w arrays */  			tmp = o;  			o = w;  			w = tmp; diff --git a/net/ceph/crypto.h b/net/ceph/crypto.h index 3572dc518bc..d1498224c49 100644 --- a/net/ceph/crypto.h +++ b/net/ceph/crypto.h @@ -20,34 +20,32 @@ static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key)  		kfree(key->key);  } -extern int ceph_crypto_key_clone(struct ceph_crypto_key *dst, -				 const struct ceph_crypto_key *src); -extern int ceph_crypto_key_encode(struct ceph_crypto_key *key, -				  void **p, void *end); -extern int ceph_crypto_key_decode(struct ceph_crypto_key *key, -				  void **p, void *end); -extern int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *in); +int ceph_crypto_key_clone(struct ceph_crypto_key *dst, +			  const struct ceph_crypto_key *src); +int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end); +int ceph_crypto_key_decode(struct ceph_crypto_key *key, void **p, void *end); +int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *in);  /* crypto.c */ -extern int ceph_decrypt(struct ceph_crypto_key *secret, -			void *dst, size_t *dst_len, -			const void *src, size_t src_len); -extern int ceph_encrypt(struct ceph_crypto_key *secret, -			void *dst, size_t *dst_len, -			const void *src, size_t src_len); -extern int ceph_decrypt2(struct ceph_crypto_key *secret, -			void *dst1, size_t *dst1_len, -			void *dst2, size_t *dst2_len, -			const void *src, size_t src_len); -extern int ceph_encrypt2(struct ceph_crypto_key *secret, -			 void *dst, size_t *dst_len, -			 const void *src1, size_t src1_len, -			 const void *src2, size_t src2_len); -extern int ceph_crypto_init(void); -extern void ceph_crypto_shutdown(void); +int ceph_decrypt(struct ceph_crypto_key *secret, +		 void *dst, size_t *dst_len, +		 const void *src, size_t src_len); +int ceph_encrypt(struct ceph_crypto_key *secret, +		 void *dst, size_t *dst_len, +		 const void *src, size_t src_len); +int ceph_decrypt2(struct ceph_crypto_key *secret, +		  void *dst1, size_t *dst1_len, +		  void *dst2, size_t *dst2_len, +		  const void *src, size_t src_len); +int ceph_encrypt2(struct ceph_crypto_key *secret, +		  void *dst, size_t *dst_len, +		  const void *src1, size_t src1_len, +		  const void *src2, size_t src2_len); +int ceph_crypto_init(void); +void ceph_crypto_shutdown(void);  /* armor.c */ -extern int ceph_armor(char *dst, const char *src, const char *end); -extern int ceph_unarmor(char *dst, const char *src, const char *end); +int ceph_armor(char *dst, const char *src, const char *end); +int ceph_unarmor(char *dst, const char *src, const char *end);  #endif diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 83661cdc076..d1a62c69a9f 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -53,34 +53,55 @@ static int osdmap_show(struct seq_file *s, void *p)  {  	int i;  	struct ceph_client *client = s->private; +	struct ceph_osdmap *map = client->osdc.osdmap;  	struct rb_node *n; -	if (client->osdc.osdmap == NULL) +	if (map == NULL)  		return 0; -	seq_printf(s, "epoch %d\n", client->osdc.osdmap->epoch); + +	seq_printf(s, "epoch %d\n", map->epoch);  	seq_printf(s, "flags%s%s\n", -		   (client->osdc.osdmap->flags & CEPH_OSDMAP_NEARFULL) ? 
-		   " NEARFULL" : "", -		   (client->osdc.osdmap->flags & CEPH_OSDMAP_FULL) ? -		   " FULL" : ""); -	for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) { +		   (map->flags & CEPH_OSDMAP_NEARFULL) ?  " NEARFULL" : "", +		   (map->flags & CEPH_OSDMAP_FULL) ?  " FULL" : ""); + +	for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) {  		struct ceph_pg_pool_info *pool =  			rb_entry(n, struct ceph_pg_pool_info, node); -		seq_printf(s, "pg_pool %llu pg_num %d / %d\n", -			   (unsigned long long)pool->id, pool->pg_num, -			   pool->pg_num_mask); + +		seq_printf(s, "pool %lld pg_num %u (%d) read_tier %lld write_tier %lld\n", +			   pool->id, pool->pg_num, pool->pg_num_mask, +			   pool->read_tier, pool->write_tier);  	} -	for (i = 0; i < client->osdc.osdmap->max_osd; i++) { -		struct ceph_entity_addr *addr = -			&client->osdc.osdmap->osd_addr[i]; -		int state = client->osdc.osdmap->osd_state[i]; +	for (i = 0; i < map->max_osd; i++) { +		struct ceph_entity_addr *addr = &map->osd_addr[i]; +		int state = map->osd_state[i];  		char sb[64]; -		seq_printf(s, "\tosd%d\t%s\t%3d%%\t(%s)\n", +		seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",  			   i, ceph_pr_addr(&addr->in_addr), -			   ((client->osdc.osdmap->osd_weight[i]*100) >> 16), -			   ceph_osdmap_state_str(sb, sizeof(sb), state)); +			   ((map->osd_weight[i]*100) >> 16), +			   ceph_osdmap_state_str(sb, sizeof(sb), state), +			   ((ceph_get_primary_affinity(map, i)*100) >> 16)); +	} +	for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) { +		struct ceph_pg_mapping *pg = +			rb_entry(n, struct ceph_pg_mapping, node); + +		seq_printf(s, "pg_temp %llu.%x [", pg->pgid.pool, +			   pg->pgid.seed); +		for (i = 0; i < pg->pg_temp.len; i++) +			seq_printf(s, "%s%d", (i == 0 ? "" : ","), +				   pg->pg_temp.osds[i]); +		seq_printf(s, "]\n");  	} +	for (n = rb_first(&map->primary_temp); n; n = rb_next(n)) { +		struct ceph_pg_mapping *pg = +			rb_entry(n, struct ceph_pg_mapping, node); + +		seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool, +			   pg->pgid.seed, pg->primary_temp.osd); +	} +  	return 0;  } @@ -105,9 +126,13 @@ static int monc_show(struct seq_file *s, void *p)  		req = rb_entry(rp, struct ceph_mon_generic_request, node);  		op = le16_to_cpu(req->request->hdr.type);  		if (op == CEPH_MSG_STATFS) -			seq_printf(s, "%lld statfs\n", req->tid); +			seq_printf(s, "%llu statfs\n", req->tid); +		else if (op == CEPH_MSG_POOLOP) +			seq_printf(s, "%llu poolop\n", req->tid); +		else if (op == CEPH_MSG_MON_GET_VERSION) +			seq_printf(s, "%llu mon_get_version", req->tid);  		else -			seq_printf(s, "%lld unknown\n", req->tid); +			seq_printf(s, "%llu unknown\n", req->tid);  	}  	mutex_unlock(&monc->mutex); @@ -132,7 +157,8 @@ static int osdc_show(struct seq_file *s, void *pp)  			   req->r_osd ? 
req->r_osd->o_osd : -1,  			   req->r_pgid.pool, req->r_pgid.seed); -		seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); +		seq_printf(s, "%.*s", req->r_base_oid.name_len, +			   req->r_base_oid.name);  		if (req->r_reassert_version.epoch)  			seq_printf(s, "\t%u'%llu", diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4a5df7b1cc9..1948d592aa5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -15,6 +15,7 @@  #include <linux/dns_resolver.h>  #include <net/tcp.h> +#include <linux/ceph/ceph_features.h>  #include <linux/ceph/libceph.h>  #include <linux/ceph/messenger.h>  #include <linux/ceph/decode.h> @@ -382,7 +383,7 @@ static void con_sock_state_closed(struct ceph_connection *con)   */  /* data available on socket, or listen socket received a connect */ -static void ceph_sock_data_ready(struct sock *sk, int count_unused) +static void ceph_sock_data_ready(struct sock *sk)  {  	struct ceph_connection *con = sk->sk_user_data;  	if (atomic_read(&con->msgr->stopping)) { @@ -556,7 +557,7 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,  	return r;  } -static int ceph_tcp_sendpage(struct socket *sock, struct page *page, +static int __ceph_tcp_sendpage(struct socket *sock, struct page *page,  		     int offset, size_t size, bool more)  {  	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR); @@ -569,6 +570,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,  	return ret;  } +static int ceph_tcp_sendpage(struct socket *sock, struct page *page, +		     int offset, size_t size, bool more) +{ +	int ret; +	struct kvec iov; + +	/* sendpage cannot properly handle pages with page_count == 0, +	 * we need to fallback to sendmsg if that's the case */ +	if (page_count(page) >= 1) +		return __ceph_tcp_sendpage(sock, page, offset, size, more); + +	iov.iov_base = kmap(page) + offset; +	iov.iov_len = size; +	ret = ceph_tcp_sendmsg(sock, &iov, 1, size, more); +	kunmap(page); + +	return ret; +}  /*   * Shutdown/close the socket for the given connection. 
@@ -777,13 +796,12 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,  	bio = data->bio;  	BUG_ON(!bio); -	BUG_ON(!bio->bi_vcnt);  	cursor->resid = min(length, data->bio_length);  	cursor->bio = bio; -	cursor->vector_index = 0; -	cursor->vector_offset = 0; -	cursor->last_piece = length <= bio->bi_io_vec[0].bv_len; +	cursor->bvec_iter = bio->bi_iter; +	cursor->last_piece = +		cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);  }  static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor, @@ -792,71 +810,67 @@ static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,  {  	struct ceph_msg_data *data = cursor->data;  	struct bio *bio; -	struct bio_vec *bio_vec; -	unsigned int index; +	struct bio_vec bio_vec;  	BUG_ON(data->type != CEPH_MSG_DATA_BIO);  	bio = cursor->bio;  	BUG_ON(!bio); -	index = cursor->vector_index; -	BUG_ON(index >= (unsigned int) bio->bi_vcnt); +	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter); -	bio_vec = &bio->bi_io_vec[index]; -	BUG_ON(cursor->vector_offset >= bio_vec->bv_len); -	*page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset); +	*page_offset = (size_t) bio_vec.bv_offset;  	BUG_ON(*page_offset >= PAGE_SIZE);  	if (cursor->last_piece) /* pagelist offset is always 0 */  		*length = cursor->resid;  	else -		*length = (size_t) (bio_vec->bv_len - cursor->vector_offset); +		*length = (size_t) bio_vec.bv_len;  	BUG_ON(*length > cursor->resid);  	BUG_ON(*page_offset + *length > PAGE_SIZE); -	return bio_vec->bv_page; +	return bio_vec.bv_page;  }  static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,  					size_t bytes)  {  	struct bio *bio; -	struct bio_vec *bio_vec; -	unsigned int index; +	struct bio_vec bio_vec;  	BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);  	bio = cursor->bio;  	BUG_ON(!bio); -	index = cursor->vector_index; -	BUG_ON(index >= (unsigned int) bio->bi_vcnt); -	bio_vec = &bio->bi_io_vec[index]; +	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);  	/* Advance the cursor offset */  	BUG_ON(cursor->resid < bytes);  	cursor->resid -= bytes; -	cursor->vector_offset += bytes; -	if (cursor->vector_offset < bio_vec->bv_len) + +	bio_advance_iter(bio, &cursor->bvec_iter, bytes); + +	if (bytes < bio_vec.bv_len)  		return false;	/* more bytes to process in this segment */ -	BUG_ON(cursor->vector_offset != bio_vec->bv_len);  	/* Move on to the next segment, and possibly the next bio */ -	if (++index == (unsigned int) bio->bi_vcnt) { +	if (!cursor->bvec_iter.bi_size) {  		bio = bio->bi_next; -		index = 0; +		cursor->bio = bio; +		if (bio) +			cursor->bvec_iter = bio->bi_iter; +		else +			memset(&cursor->bvec_iter, 0, +			       sizeof(cursor->bvec_iter));  	} -	cursor->bio = bio; -	cursor->vector_index = index; -	cursor->vector_offset = 0;  	if (!cursor->last_piece) {  		BUG_ON(!cursor->resid);  		BUG_ON(!bio);  		/* A short read is OK, so use <= rather than == */ -		if (cursor->resid <= bio->bi_io_vec[index].bv_len) +		if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))  			cursor->last_piece = true;  	} @@ -923,6 +937,9 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,  	if (!bytes || cursor->page_offset)  		return false;	/* more bytes to process in the current page */ +	if (!cursor->resid) +		return false;   /* no more data */ +  	/* Move on to the next page; offset is already at 0 */  	BUG_ON(cursor->page_index >= cursor->page_count); @@ -1008,6 +1025,9 @@ static bool ceph_msg_data_pagelist_advance(struct 
ceph_msg_data_cursor *cursor,  	if (!bytes || cursor->offset & ~PAGE_MASK)  		return false;	/* more bytes to process in the current page */ +	if (!cursor->resid) +		return false;   /* no more data */ +  	/* Move on to the next page */  	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); @@ -1865,7 +1885,9 @@ int ceph_parse_ips(const char *c, const char *end,  				port = (port * 10) + (*p - '0');  				p++;  			} -			if (port > 65535 || port == 0) +			if (port == 0) +				port = CEPH_MON_PORT; +			else if (port > 65535)  				goto bad;  		} else {  			port = CEPH_MON_PORT; @@ -1945,7 +1967,8 @@ static int process_connect(struct ceph_connection *con)  {  	u64 sup_feat = con->msgr->supported_features;  	u64 req_feat = con->msgr->required_features; -	u64 server_feat = le64_to_cpu(con->in_reply.features); +	u64 server_feat = ceph_sanitize_features( +				le64_to_cpu(con->in_reply.features));  	int ret;  	dout("process_connect on %p tag %d\n", con, (int)con->in_tag); @@ -2853,8 +2876,8 @@ static void con_fault(struct ceph_connection *con)   */  void ceph_messenger_init(struct ceph_messenger *msgr,  			struct ceph_entity_addr *myaddr, -			u32 supported_features, -			u32 required_features, +			u64 supported_features, +			u64 required_features,  			bool nocrc)  {  	msgr->supported_features = supported_features; @@ -3126,15 +3149,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,  	INIT_LIST_HEAD(&m->data);  	/* front */ -	m->front_max = front_len;  	if (front_len) { -		if (front_len > PAGE_CACHE_SIZE) { -			m->front.iov_base = __vmalloc(front_len, flags, -						      PAGE_KERNEL); -			m->front_is_vmalloc = true; -		} else { -			m->front.iov_base = kmalloc(front_len, flags); -		} +		m->front.iov_base = ceph_kvmalloc(front_len, flags);  		if (m->front.iov_base == NULL) {  			dout("ceph_msg_new can't allocate %d bytes\n",  			     front_len); @@ -3143,7 +3159,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,  	} else {  		m->front.iov_base = NULL;  	} -	m->front.iov_len = front_len; +	m->front_alloc_len = m->front.iov_len = front_len;  	dout("ceph_msg_new %p front %d\n", m, front_len);  	return m; @@ -3256,10 +3272,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)  void ceph_msg_kfree(struct ceph_msg *m)  {  	dout("msg_kfree %p\n", m); -	if (m->front_is_vmalloc) -		vfree(m->front.iov_base); -	else -		kfree(m->front.iov_base); +	ceph_kvfree(m->front.iov_base);  	kmem_cache_free(ceph_msg_cache, m);  } @@ -3301,8 +3314,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);  void ceph_msg_dump(struct ceph_msg *msg)  { -	pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, -		 msg->front_max, msg->data_length); +	pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg, +		 msg->front_alloc_len, msg->data_length);  	print_hex_dump(KERN_DEBUG, "header: ",  		       DUMP_PREFIX_OFFSET, 16, 1,  		       &msg->hdr, sizeof(msg->hdr), true); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 1fe25cd29d0..067d3af2eaf 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -152,7 +152,7 @@ static int __open_session(struct ceph_mon_client *monc)  		/* initiatiate authentication handshake */  		ret = ceph_auth_build_hello(monc->auth,  					    monc->m_auth->front.iov_base, -					    monc->m_auth->front_max); +					    monc->m_auth->front_alloc_len);  		__send_prepared_auth_request(monc, ret);  	} else {  		dout("open_session mon%d already open\n", monc->cur_mon); @@ -196,7 +196,7 @@ static void __send_subscribe(struct 
ceph_mon_client *monc)  		int num;  		p = msg->front.iov_base; -		end = p + msg->front_max; +		end = p + msg->front_alloc_len;  		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;  		ceph_encode_32(&p, num); @@ -296,6 +296,33 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)  		__send_subscribe(monc);  	mutex_unlock(&monc->mutex);  } +EXPORT_SYMBOL(ceph_monc_request_next_osdmap); + +int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, +			  unsigned long timeout) +{ +	unsigned long started = jiffies; +	int ret; + +	mutex_lock(&monc->mutex); +	while (monc->have_osdmap < epoch) { +		mutex_unlock(&monc->mutex); + +		if (timeout != 0 && time_after_eq(jiffies, started + timeout)) +			return -ETIMEDOUT; + +		ret = wait_event_interruptible_timeout(monc->client->auth_wq, +					 monc->have_osdmap >= epoch, timeout); +		if (ret < 0) +			return ret; + +		mutex_lock(&monc->mutex); +	} + +	mutex_unlock(&monc->mutex); +	return 0; +} +EXPORT_SYMBOL(ceph_monc_wait_osdmap);  /*   * @@ -477,14 +504,13 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,  	return m;  } -static int do_generic_request(struct ceph_mon_client *monc, -			      struct ceph_mon_generic_request *req) +static int __do_generic_request(struct ceph_mon_client *monc, u64 tid, +				struct ceph_mon_generic_request *req)  {  	int err;  	/* register request */ -	mutex_lock(&monc->mutex); -	req->tid = ++monc->last_tid; +	req->tid = tid != 0 ? tid : ++monc->last_tid;  	req->request->hdr.tid = cpu_to_le64(req->tid);  	__insert_generic_request(monc, req);  	monc->num_generic_requests++; @@ -496,13 +522,24 @@ static int do_generic_request(struct ceph_mon_client *monc,  	mutex_lock(&monc->mutex);  	rb_erase(&req->node, &monc->generic_request_tree);  	monc->num_generic_requests--; -	mutex_unlock(&monc->mutex);  	if (!err)  		err = req->result;  	return err;  } +static int do_generic_request(struct ceph_mon_client *monc, +			      struct ceph_mon_generic_request *req) +{ +	int err; + +	mutex_lock(&monc->mutex); +	err = __do_generic_request(monc, 0, req); +	mutex_unlock(&monc->mutex); + +	return err; +} +  /*   * statfs   */ @@ -579,6 +616,96 @@ out:  }  EXPORT_SYMBOL(ceph_monc_do_statfs); +static void handle_get_version_reply(struct ceph_mon_client *monc, +				     struct ceph_msg *msg) +{ +	struct ceph_mon_generic_request *req; +	u64 tid = le64_to_cpu(msg->hdr.tid); +	void *p = msg->front.iov_base; +	void *end = p + msg->front_alloc_len; +	u64 handle; + +	dout("%s %p tid %llu\n", __func__, msg, tid); + +	ceph_decode_need(&p, end, 2*sizeof(u64), bad); +	handle = ceph_decode_64(&p); +	if (tid != 0 && tid != handle) +		goto bad; + +	mutex_lock(&monc->mutex); +	req = __lookup_generic_req(monc, handle); +	if (req) { +		*(u64 *)req->buf = ceph_decode_64(&p); +		req->result = 0; +		get_generic_request(req); +	} +	mutex_unlock(&monc->mutex); +	if (req) { +		complete_all(&req->completion); +		put_generic_request(req); +	} + +	return; +bad: +	pr_err("corrupt mon_get_version reply\n"); +	ceph_msg_dump(msg); +} + +/* + * Send MMonGetVersion and wait for the reply. 
+ * + * @what: one of "mdsmap", "osdmap" or "monmap" + */ +int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, +			     u64 *newest) +{ +	struct ceph_mon_generic_request *req; +	void *p, *end; +	u64 tid; +	int err; + +	req = kzalloc(sizeof(*req), GFP_NOFS); +	if (!req) +		return -ENOMEM; + +	kref_init(&req->kref); +	req->buf = newest; +	req->buf_len = sizeof(*newest); +	init_completion(&req->completion); + +	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, +				    sizeof(u64) + sizeof(u32) + strlen(what), +				    GFP_NOFS, true); +	if (!req->request) { +		err = -ENOMEM; +		goto out; +	} + +	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024, +				  GFP_NOFS, true); +	if (!req->reply) { +		err = -ENOMEM; +		goto out; +	} + +	p = req->request->front.iov_base; +	end = p + req->request->front_alloc_len; + +	/* fill out request */ +	mutex_lock(&monc->mutex); +	tid = ++monc->last_tid; +	ceph_encode_64(&p, tid); /* handle */ +	ceph_encode_string(&p, end, what, strlen(what)); + +	err = __do_generic_request(monc, tid, req); + +	mutex_unlock(&monc->mutex); +out: +	kref_put(&req->kref, release_generic_request); +	return err; +} +EXPORT_SYMBOL(ceph_monc_do_get_version); +  /*   * pool ops   */ @@ -897,7 +1024,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,  	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,  				     msg->front.iov_len,  				     monc->m_auth->front.iov_base, -				     monc->m_auth->front_max); +				     monc->m_auth->front_alloc_len);  	if (ret < 0) {  		monc->client->auth_err = ret;  		wake_up_all(&monc->client->auth_wq); @@ -939,7 +1066,7 @@ static int __validate_auth(struct ceph_mon_client *monc)  		return 0;  	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, -			      monc->m_auth->front_max); +			      monc->m_auth->front_alloc_len);  	if (ret <= 0)  		return ret; /* either an error, or no need to authenticate */  	__send_prepared_auth_request(monc, ret); @@ -981,6 +1108,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)  		handle_statfs_reply(monc, msg);  		break; +	case CEPH_MSG_MON_GET_VERSION_REPLY: +		handle_get_version_reply(monc, msg); +		break; +  	case CEPH_MSG_POOLOP_REPLY:  		handle_poolop_reply(monc, msg);  		break; @@ -1029,6 +1160,15 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,  	case CEPH_MSG_AUTH_REPLY:  		m = ceph_msg_get(monc->m_auth_reply);  		break; +	case CEPH_MSG_MON_GET_VERSION_REPLY: +		if (le64_to_cpu(hdr->tid) != 0) +			return get_generic_reply(con, hdr, skip); + +		/* +		 * Older OSDs don't set reply tid even if the orignal +		 * request had a non-zero tid.  Workaround this weirdness +		 * by falling through to the allocate case. 
+		 */  	case CEPH_MSG_MON_MAP:  	case CEPH_MSG_MDS_MAP:  	case CEPH_MSG_OSD_MAP: diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 1606f740d6a..05be0c18169 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,  	msg_size = 4 + 4 + 8 + 8 + 4+8;  	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */  	msg_size += 1 + 8 + 4 + 4;     /* pg_t */ -	msg_size += 4 + MAX_OBJ_NAME_SIZE; +	msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */  	msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);  	msg_size += 8;  /* snapid */  	msg_size += 8;  /* snap_seq */ @@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,  	INIT_LIST_HEAD(&req->r_req_lru_item);  	INIT_LIST_HEAD(&req->r_osd_item); +	req->r_base_oloc.pool = -1; +	req->r_target_oloc.pool = -1; +  	/* create reply message */  	if (use_mempool)  		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -433,6 +436,7 @@ static bool osd_req_opcode_valid(u16 opcode)  	case CEPH_OSD_OP_OMAPCLEAR:  	case CEPH_OSD_OP_OMAPRMKEYS:  	case CEPH_OSD_OP_OMAP_CMP: +	case CEPH_OSD_OP_SETALLOCHINT:  	case CEPH_OSD_OP_CLONERANGE:  	case CEPH_OSD_OP_ASSERT_SRC_VERSION:  	case CEPH_OSD_OP_SRC_CMPXATTR: @@ -588,6 +592,26 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,  }  EXPORT_SYMBOL(osd_req_op_watch_init); +void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, +				unsigned int which, +				u64 expected_object_size, +				u64 expected_write_size) +{ +	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, +						      CEPH_OSD_OP_SETALLOCHINT); + +	op->alloc_hint.expected_object_size = expected_object_size; +	op->alloc_hint.expected_write_size = expected_write_size; + +	/* +	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed +	 * not worth a feature bit.  Set FAILOK per-op flag to make +	 * sure older osds don't trip over an unsupported opcode. 
+	 */ +	op->flags |= CEPH_OSD_OP_FLAG_FAILOK; +} +EXPORT_SYMBOL(osd_req_op_alloc_hint_init); +  static void ceph_osdc_msg_data_add(struct ceph_msg *msg,  				struct ceph_osd_data *osd_data)  { @@ -678,6 +702,12 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,  		dst->watch.ver = cpu_to_le64(src->watch.ver);  		dst->watch.flag = src->watch.flag;  		break; +	case CEPH_OSD_OP_SETALLOCHINT: +		dst->alloc_hint.expected_object_size = +		    cpu_to_le64(src->alloc_hint.expected_object_size); +		dst->alloc_hint.expected_write_size = +		    cpu_to_le64(src->alloc_hint.expected_write_size); +		break;  	default:  		pr_err("unsupported osd opcode %s\n",  			ceph_osd_op_name(src->op)); @@ -685,7 +715,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,  		return 0;  	} +  	dst->op = cpu_to_le16(src->op); +	dst->flags = cpu_to_le32(src->flags);  	dst->payload_len = cpu_to_le32(src->payload_len);  	return request_data_len; @@ -761,11 +793,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,  	if (num_ops > 1)  		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); -	req->r_file_layout = *layout;  /* keep a copy */ +	req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); -	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", -		vino.ino, objnum); -	req->r_oid_len = strlen(req->r_oid); +	snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), +		 "%llx.%08llx", vino.ino, objnum); +	req->r_base_oid.name_len = strlen(req->r_base_oid.name);  	return req;  } @@ -1044,8 +1076,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)  			!ceph_con_opened(&osd->o_con)) {  		struct ceph_osd_request *req; -		dout(" osd addr hasn't changed and connection never opened," -		     " letting msgr retry"); +		dout("osd addr hasn't changed and connection never opened, " +		     "letting msgr retry\n");  		/* touch each r_stamp for handle_timeout()'s benfit */  		list_for_each_entry(req, &osd->o_requests, r_osd_item)  			req->r_stamp = jiffies; @@ -1232,6 +1264,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,  EXPORT_SYMBOL(ceph_osdc_set_request_linger);  /* + * Returns whether a request should be blocked from being sent + * based on the current osdmap and osd_client settings. + * + * Caller should hold map_sem for read. + */ +static bool __req_should_be_paused(struct ceph_osd_client *osdc, +				   struct ceph_osd_request *req) +{ +	bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD); +	bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) || +		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); +	return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) || +		(req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr); +} + +/* + * Calculate mapping of a request to a PG.  Takes tiering into account. 
+ */ +static int __calc_request_pg(struct ceph_osdmap *osdmap, +			     struct ceph_osd_request *req, +			     struct ceph_pg *pg_out) +{ +	bool need_check_tiering; + +	need_check_tiering = false; +	if (req->r_target_oloc.pool == -1) { +		req->r_target_oloc = req->r_base_oloc; /* struct */ +		need_check_tiering = true; +	} +	if (req->r_target_oid.name_len == 0) { +		ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); +		need_check_tiering = true; +	} + +	if (need_check_tiering && +	    (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { +		struct ceph_pg_pool_info *pi; + +		pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool); +		if (pi) { +			if ((req->r_flags & CEPH_OSD_FLAG_READ) && +			    pi->read_tier >= 0) +				req->r_target_oloc.pool = pi->read_tier; +			if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && +			    pi->write_tier >= 0) +				req->r_target_oloc.pool = pi->write_tier; +		} +		/* !pi is caught in ceph_oloc_oid_to_pg() */ +	} + +	return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc, +				   &req->r_target_oid, pg_out); +} + +/*   * Pick an osd (the first 'up' osd in the pg), allocate the osd struct   * (as needed), and set the request r_osd appropriately.  If there is   * no up osd, set r_osd to NULL.  Move the request to the appropriate list @@ -1246,30 +1333,35 @@ static int __map_request(struct ceph_osd_client *osdc,  {  	struct ceph_pg pgid;  	int acting[CEPH_PG_MAX_SIZE]; -	int o = -1, num = 0; +	int num, o;  	int err; +	bool was_paused;  	dout("map_request %p tid %lld\n", req, req->r_tid); -	err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, -				ceph_file_layout_pg_pool(req->r_file_layout)); + +	err = __calc_request_pg(osdc->osdmap, req, &pgid);  	if (err) {  		list_move(&req->r_req_lru_item, &osdc->req_notarget);  		return err;  	}  	req->r_pgid = pgid; -	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); -	if (err > 0) { -		o = acting[0]; -		num = err; -	} +	num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o); +	if (num < 0) +		num = 0; + +	was_paused = req->r_paused; +	req->r_paused = __req_should_be_paused(osdc, req); +	if (was_paused && !req->r_paused) +		force_resend = 1;  	if ((!force_resend &&  	     req->r_osd && req->r_osd->o_osd == o &&  	     req->r_sent >= req->r_osd->o_incarnation &&  	     req->r_num_pg_osds == num &&  	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || -	    (req->r_osd == NULL && o == -1)) +	    (req->r_osd == NULL && o == -1) || +	    req->r_paused)  		return 0;  /* no change */  	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", @@ -1331,7 +1423,7 @@ static void __send_request(struct ceph_osd_client *osdc,  	/* fill in message content that changes each time we send it */  	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);  	put_unaligned_le32(req->r_flags, req->r_request_flags); -	put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); +	put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);  	p = req->r_request_pgid;  	ceph_encode_64(&p, req->r_pgid.pool);  	ceph_encode_32(&p, req->r_pgid.seed); @@ -1362,6 +1454,40 @@ static void __send_queued(struct ceph_osd_client *osdc)  }  /* + * Caller should hold map_sem for read and request_mutex. 
+ */ +static int __ceph_osdc_start_request(struct ceph_osd_client *osdc, +				     struct ceph_osd_request *req, +				     bool nofail) +{ +	int rc; + +	__register_request(osdc, req); +	req->r_sent = 0; +	req->r_got_reply = 0; +	rc = __map_request(osdc, req, 0); +	if (rc < 0) { +		if (nofail) { +			dout("osdc_start_request failed map, " +				" will retry %lld\n", req->r_tid); +			rc = 0; +		} else { +			__unregister_request(osdc, req); +		} +		return rc; +	} + +	if (req->r_osd == NULL) { +		dout("send_request %p no up osds in pg\n", req); +		ceph_monc_request_next_osdmap(&osdc->client->monc); +	} else { +		__send_queued(osdc); +	} + +	return 0; +} + +/*   * Timeout callback, called every N seconds when 1 or more osd   * requests has been active for more than N seconds.  When this   * happens, we ping all OSDs with requests who have timed out to @@ -1432,6 +1558,109 @@ static void handle_osds_timeout(struct work_struct *work)  			      round_jiffies_relative(delay));  } +static int ceph_oloc_decode(void **p, void *end, +			    struct ceph_object_locator *oloc) +{ +	u8 struct_v, struct_cv; +	u32 len; +	void *struct_end; +	int ret = 0; + +	ceph_decode_need(p, end, 1 + 1 + 4, e_inval); +	struct_v = ceph_decode_8(p); +	struct_cv = ceph_decode_8(p); +	if (struct_v < 3) { +		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", +			struct_v, struct_cv); +		goto e_inval; +	} +	if (struct_cv > 6) { +		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", +			struct_v, struct_cv); +		goto e_inval; +	} +	len = ceph_decode_32(p); +	ceph_decode_need(p, end, len, e_inval); +	struct_end = *p + len; + +	oloc->pool = ceph_decode_64(p); +	*p += 4; /* skip preferred */ + +	len = ceph_decode_32(p); +	if (len > 0) { +		pr_warn("ceph_object_locator::key is set\n"); +		goto e_inval; +	} + +	if (struct_v >= 5) { +		len = ceph_decode_32(p); +		if (len > 0) { +			pr_warn("ceph_object_locator::nspace is set\n"); +			goto e_inval; +		} +	} + +	if (struct_v >= 6) { +		s64 hash = ceph_decode_64(p); +		if (hash != -1) { +			pr_warn("ceph_object_locator::hash is set\n"); +			goto e_inval; +		} +	} + +	/* skip the rest */ +	*p = struct_end; +out: +	return ret; + +e_inval: +	ret = -EINVAL; +	goto out; +} + +static int ceph_redirect_decode(void **p, void *end, +				struct ceph_request_redirect *redir) +{ +	u8 struct_v, struct_cv; +	u32 len; +	void *struct_end; +	int ret; + +	ceph_decode_need(p, end, 1 + 1 + 4, e_inval); +	struct_v = ceph_decode_8(p); +	struct_cv = ceph_decode_8(p); +	if (struct_cv > 1) { +		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", +			struct_v, struct_cv); +		goto e_inval; +	} +	len = ceph_decode_32(p); +	ceph_decode_need(p, end, len, e_inval); +	struct_end = *p + len; + +	ret = ceph_oloc_decode(p, end, &redir->oloc); +	if (ret) +		goto out; + +	len = ceph_decode_32(p); +	if (len > 0) { +		pr_warn("ceph_request_redirect::object_name is set\n"); +		goto e_inval; +	} + +	len = ceph_decode_32(p); +	*p += len; /* skip osd_instructions */ + +	/* skip the rest */ +	*p = struct_end; +out: +	return ret; + +e_inval: +	ret = -EINVAL; +	goto out; +} +  static void complete_request(struct ceph_osd_request *req)  {  	complete_all(&req->r_safe_completion);  /* fsync waiter */ @@ -1446,6 +1675,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,  {  	void *p, *end;  	struct ceph_osd_request *req; +	struct ceph_request_redirect redir;  	u64 tid;  	int object_len;  	unsigned int numops; @@ -1484,6 +1714,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg 
*msg,
 	osdmap_epoch = ceph_decode_32(&p);
 
 	/* lookup */
+	down_read(&osdc->map_sem);
 	mutex_lock(&osdc->request_mutex);
 	req = __lookup_request(osdc, tid);
 	if (req == NULL) {
@@ -1525,10 +1756,40 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 	for (i = 0; i < numops; i++)
 		req->r_reply_op_result[i] = ceph_decode_32(&p);
 
-	already_completed = req->r_got_reply;
+	if (le16_to_cpu(msg->hdr.version) >= 6) {
+		p += 8 + 4; /* skip replay_version */
+		p += 8; /* skip user_version */
 
-	if (!req->r_got_reply) {
+		err = ceph_redirect_decode(&p, end, &redir);
+		if (err)
+			goto bad_put;
+	} else {
+		redir.oloc.pool = -1;
+	}
+
+	if (redir.oloc.pool != -1) {
+		dout("redirect pool %lld\n", redir.oloc.pool);
+
+		__unregister_request(osdc, req);
+
+		req->r_target_oloc = redir.oloc; /* struct */
+
+		/*
+		 * Start redirect requests with nofail=true.  If
+		 * mapping fails, request will end up on the notarget
+		 * list, waiting for the new osdmap (which can take
+		 * a while), even though the original request mapped
+		 * successfully.  In the future we might want to follow
+		 * original request's nofail setting here.
+		 */
+		err = __ceph_osdc_start_request(osdc, req, true);
+		BUG_ON(err);
+
+		goto out_unlock;
+	}
+
+	already_completed = req->r_got_reply;
+	if (!req->r_got_reply) {
 		req->r_result = result;
 		dout("handle_reply result %d bytes %d\n", req->r_result,
 		     bytes);
@@ -1542,8 +1803,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 		req->r_got_reply = 1;
 	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
 		dout("handle_reply tid %llu dup ack\n", tid);
-		mutex_unlock(&osdc->request_mutex);
-		goto done;
+		goto out_unlock;
 	}
 
 	dout("handle_reply tid %llu flags %d\n", tid, flags);
@@ -1558,6 +1818,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 		__unregister_request(osdc, req);
 
 	mutex_unlock(&osdc->request_mutex);
+	up_read(&osdc->map_sem);
 
 	if (!already_completed) {
 		if (req->r_unsafe_callback &&
@@ -1575,15 +1836,27 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 		complete_request(req);
 	}
 
-done:
+out:
 	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
 	ceph_osdc_put_request(req);
 	return;
+out_unlock:
+	mutex_unlock(&osdc->request_mutex);
+	up_read(&osdc->map_sem);
+	goto out;
 
 bad_put:
+	req->r_result = -EIO;
+	__unregister_request(osdc, req);
+	if (req->r_callback)
+		req->r_callback(req, msg);
+	else
+		complete_all(&req->r_completion);
+	complete_request(req);
 	ceph_osdc_put_request(req);
 bad_mutex:
 	mutex_unlock(&osdc->request_mutex);
+	up_read(&osdc->map_sem);
 bad:
 	pr_err("corrupt osd_op_reply got %d %d\n",
 	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
@@ -1613,14 +1886,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  *
  * Caller should hold map_sem for read.
  */
-static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
+static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
+			  bool force_resend_writes)
 {
 	struct ceph_osd_request *req, *nreq;
 	struct rb_node *p;
 	int needmap = 0;
 	int err;
+	bool force_resend_req;
 
-	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
+	dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
+		force_resend_writes ? " (force resend writes)" : "");
 	mutex_lock(&osdc->request_mutex);
 	for (p = rb_first(&osdc->requests); p; ) {
 		req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -1645,7 +1921,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 			continue;
 		}
 
-		err = __map_request(osdc, req, force_resend);
+		force_resend_req = force_resend ||
+			(force_resend_writes &&
+				req->r_flags & CEPH_OSD_FLAG_WRITE);
+		err = __map_request(osdc, req, force_resend_req);
 		if (err < 0)
 			continue;  /* error */
 		if (req->r_osd == NULL) {
@@ -1665,7 +1944,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 				 r_linger_item) {
 		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
 
-		err = __map_request(osdc, req, force_resend);
+		err = __map_request(osdc, req,
+				    force_resend || force_resend_writes);
 		dout("__map_request returned %d\n", err);
 		if (err == 0)
 			continue;  /* no change and no osd was specified */
@@ -1707,6 +1987,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 	struct ceph_osdmap *newmap = NULL, *oldmap;
 	int err;
 	struct ceph_fsid fsid;
+	bool was_full;
 
 	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
 	p = msg->front.iov_base;
@@ -1720,6 +2001,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
 	down_write(&osdc->map_sem);
 
+	was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+
 	/* incremental maps */
 	ceph_decode_32_safe(&p, end, nr_maps, bad);
 	dout(" %d inc maps\n", nr_maps);
@@ -1744,7 +2027,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 				ceph_osdmap_destroy(osdc->osdmap);
 				osdc->osdmap = newmap;
 			}
-			kick_requests(osdc, 0);
+			was_full = was_full ||
+				ceph_osdmap_flag(osdc->osdmap,
						 CEPH_OSDMAP_FULL);
+			kick_requests(osdc, 0, was_full);
 		} else {
 			dout("ignoring incremental map %u len %d\n",
 			     epoch, maplen);
@@ -1774,7 +2060,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 			int skipped_map = 0;
 
 			dout("taking full map %u len %d\n", epoch, maplen);
-			newmap = osdmap_decode(&p, p+maplen);
+			newmap = ceph_osdmap_decode(&p, p+maplen);
 			if (IS_ERR(newmap)) {
 				err = PTR_ERR(newmap);
 				goto bad;
@@ -1787,7 +2073,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 					skipped_map = 1;
 				ceph_osdmap_destroy(oldmap);
 			}
-			kick_requests(osdc, skipped_map);
+			was_full = was_full ||
+				ceph_osdmap_flag(osdc->osdmap,
						 CEPH_OSDMAP_FULL);
+			kick_requests(osdc, skipped_map, was_full);
 		}
 		p += maplen;
 		nr_maps--;
@@ -1804,7 +2093,9 @@ done:
 	 * we find out when we are no longer full and stop returning
 	 * ENOSPC.
 	 */
-	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
+	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
+		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
 		ceph_monc_request_next_osdmap(&osdc->client->monc);
 
 	mutex_lock(&osdc->request_mutex);
@@ -1818,7 +2109,6 @@ bad:
 	pr_err("osdc handle_map corrupt msg\n");
 	ceph_msg_dump(msg);
 	up_write(&osdc->map_sem);
-	return;
 }
 
 /*
@@ -2017,7 +2307,6 @@ done_err:
 
 bad:
 	pr_err("osdc handle_watch_notify corrupt msg\n");
-	return;
 }
 
 /*
@@ -2068,10 +2357,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
 	ceph_encode_32(&p, -1);  /* preferred */
 
 	/* oid */
-	ceph_encode_32(&p, req->r_oid_len);
-	memcpy(p, req->r_oid, req->r_oid_len);
-	dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
-	p += req->r_oid_len;
+	ceph_encode_32(&p, req->r_base_oid.name_len);
+	memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
+	dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
+	     req->r_base_oid.name, req->r_base_oid.name_len);
+	p += req->r_base_oid.name_len;
 
 	/* ops--can imply data */
 	ceph_encode_16(&p, (u16)req->r_num_ops);
@@ -2125,34 +2415,16 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 			    struct ceph_osd_request *req,
 			    bool nofail)
 {
-	int rc = 0;
+	int rc;
 
 	down_read(&osdc->map_sem);
 	mutex_lock(&osdc->request_mutex);
-	__register_request(osdc, req);
-	req->r_sent = 0;
-	req->r_got_reply = 0;
-	rc = __map_request(osdc, req, 0);
-	if (rc < 0) {
-		if (nofail) {
-			dout("osdc_start_request failed map, "
-				" will retry %lld\n", req->r_tid);
-			rc = 0;
-		} else {
-			__unregister_request(osdc, req);
-		}
-		goto out_unlock;
-	}
-	if (req->r_osd == NULL) {
-		dout("send_request %p no up osds in pg\n", req);
-		ceph_monc_request_next_osdmap(&osdc->client->monc);
-	} else {
-		__send_queued(osdc);
-	}
-	rc = 0;
-out_unlock:
+
+	rc = __ceph_osdc_start_request(osdc, req, nofail);
+
 	mutex_unlock(&osdc->request_mutex);
 	up_read(&osdc->map_sem);
+
 	return rc;
 }
 EXPORT_SYMBOL(ceph_osdc_start_request);
@@ -2216,6 +2488,17 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc)
 EXPORT_SYMBOL(ceph_osdc_sync);
 
 /*
+ * Call all pending notify callbacks - for use after a watch is
+ * unregistered, to make sure no more callbacks for it will be invoked
+ */
+void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
+{
+	flush_workqueue(osdc->notify_wq);
+}
+EXPORT_SYMBOL(ceph_osdc_flush_notifies);
+
+
+/*
  * init, shutdown
  */
 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
@@ -2267,9 +2550,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	err = -ENOMEM;
 	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
 	if (!osdc->notify_wq)
-		goto out_msgpool;
+		goto out_msgpool_reply;
+
 	return 0;
 
+out_msgpool_reply:
+	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 out_msgpool:
 	ceph_msgpool_destroy(&osdc->msgpool_op);
 out_mempool:
@@ -2443,7 +2729,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	struct ceph_osd_client *osdc = osd->o_osdc;
 	struct ceph_msg *m;
 	struct ceph_osd_request *req;
-	int front = le32_to_cpu(hdr->front_len);
+	int front_len = le32_to_cpu(hdr->front_len);
 	int data_len = le32_to_cpu(hdr->data_len);
 	u64 tid;
 
@@ -2463,12 +2749,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		     req->r_reply, req->r_reply->con);
 	ceph_msg_revoke_incoming(req->r_reply);
 
-	if (front > req->r_reply->front.iov_len) {
+	if (front_len > req->r_reply->front_alloc_len) {
 		pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
-			   front, (int)req->r_reply->front.iov_len,
+			   front_len, req->r_reply->front_alloc_len,
 			   (unsigned int)con->peer_name.type,
 			   le64_to_cpu(con->peer_name.num));
-		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
+		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
+				 false);
 		if (!m)
 			goto out;
 		ceph_msg_put(req->r_reply);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index dbd9a479242..c547e46084d 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -329,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 	dout("crush decode tunable chooseleaf_descend_once = %d",
 	     c->chooseleaf_descend_once);
 
+	ceph_decode_need(p, end, sizeof(u8), done);
+	c->chooseleaf_vary_r = ceph_decode_8(p);
+	dout("crush decode tunable chooseleaf_vary_r = %d",
+	     c->chooseleaf_vary_r);
+
 done:
 	dout("crush_decode success\n");
 	return c;
@@ -343,7 +348,7 @@ bad:
 
 /*
  * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
- * to a set of osds)
+ * to a set of osds) and primary_temp (explicit primary setting)
  */
 static int pgid_cmp(struct ceph_pg l, struct ceph_pg r)
 {
@@ -464,6 +469,11 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
 	return NULL;
 }
 
+struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
+{
+	return __lookup_pg_pool(&map->pg_pools, id);
+}
+
 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
 {
 	struct ceph_pg_pool_info *pi;
@@ -501,7 +511,7 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
 	kfree(pi);
 }
 
-static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
+static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 {
 	u8 ev, cv;
 	unsigned len, num;
@@ -514,8 +524,8 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 		pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
 		return -EINVAL;
 	}
-	if (cv > 7) {
-		pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv);
+	if (cv > 9) {
+		pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
 		return -EINVAL;
 	}
 	len = ceph_decode_32(p);
@@ -543,12 +553,34 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 		*p += len;
 	}
 
-	/* skip removed snaps */
+	/* skip removed_snaps */
 	num = ceph_decode_32(p);
 	*p += num * (8 + 8);
 
 	*p += 8;  /* skip auid */
 	pi->flags = ceph_decode_64(p);
+	*p += 4;  /* skip crash_replay_interval */
+
+	if (ev >= 7)
+		*p += 1;  /* skip min_size */
+
+	if (ev >= 8)
+		*p += 8 + 8;  /* skip quota_max_* */
+
+	if (ev >= 9) {
+		/* skip tiers */
+		num = ceph_decode_32(p);
+		*p += num * 8;
+
+		*p += 8;  /* skip tier_of */
+		*p += 1;  /* skip cache_mode */
+
+		pi->read_tier = ceph_decode_64(p);
+		pi->write_tier = ceph_decode_64(p);
+	} else {
+		pi->read_tier = -1;
+		pi->write_tier = -1;
+	}
 
 	/* ignore the rest */
 
@@ -560,7 +592,7 @@ bad:
 	return -EINVAL;
 }
 
-static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
+static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 {
 	struct ceph_pg_pool_info *pi;
 	u32 num, len;
@@ -606,6 +638,13 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
 		rb_erase(&pg->node, &map->pg_temp);
 		kfree(pg);
 	}
+	while (!RB_EMPTY_ROOT(&map->primary_temp)) {
+		struct ceph_pg_mapping *pg =
+			rb_entry(rb_first(&map->primary_temp),
+				 struct ceph_pg_mapping, node);
+		rb_erase(&pg->node, &map->primary_temp);
+		kfree(pg);
+	}
 	while (!RB_EMPTY_ROOT(&map->pg_pools)) {
 		struct ceph_pg_pool_info *pi =
 			rb_entry(rb_first(&map->pg_pools),
@@ -615,186 +654,516 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
 	kfree(map->osd_state);
 	kfree(map->osd_weight);
 	kfree(map->osd_addr);
+	kfree(map->osd_primary_affinity);
 	kfree(map);
 }
 
 /*
- * adjust max osd value.  reallocate arrays.
+ * Adjust max_osd value, (re)allocate arrays.
+ *
+ * The new elements are properly initialized.
  */
 static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
 {
 	u8 *state;
-	struct ceph_entity_addr *addr;
 	u32 *weight;
+	struct ceph_entity_addr *addr;
+	int i;
 
-	state = kcalloc(max, sizeof(*state), GFP_NOFS);
-	addr = kcalloc(max, sizeof(*addr), GFP_NOFS);
-	weight = kcalloc(max, sizeof(*weight), GFP_NOFS);
-	if (state == NULL || addr == NULL || weight == NULL) {
+	state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
+	weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
+	addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
+	if (!state || !weight || !addr) {
 		kfree(state);
-		kfree(addr);
 		kfree(weight);
+		kfree(addr);
+
 		return -ENOMEM;
 	}
 
-	/* copy old? */
-	if (map->osd_state) {
-		memcpy(state, map->osd_state, map->max_osd*sizeof(*state));
-		memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr));
-		memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight));
-		kfree(map->osd_state);
-		kfree(map->osd_addr);
-		kfree(map->osd_weight);
+	for (i = map->max_osd; i < max; i++) {
+		state[i] = 0;
+		weight[i] = CEPH_OSD_OUT;
+		memset(addr + i, 0, sizeof(*addr));
 	}
 
 	map->osd_state = state;
 	map->osd_weight = weight;
 	map->osd_addr = addr;
+
+	if (map->osd_primary_affinity) {
+		u32 *affinity;
+
+		affinity = krealloc(map->osd_primary_affinity,
+				    max*sizeof(*affinity), GFP_NOFS);
+		if (!affinity)
+			return -ENOMEM;
+
+		for (i = map->max_osd; i < max; i++)
+			affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
+
+		map->osd_primary_affinity = affinity;
+	}
+
 	map->max_osd = max;
+
 	return 0;
 }
 
+#define OSDMAP_WRAPPER_COMPAT_VER	7
+#define OSDMAP_CLIENT_DATA_COMPAT_VER	1
+
 /*
- * decode a full map.
+ * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
+ * to struct_v of the client_data section for new (v7 and above)
+ * osdmaps.
 */
-struct ceph_osdmap *osdmap_decode(void **p, void *end)
+static int get_osdmap_client_data_v(void **p, void *end,
+				    const char *prefix, u8 *v)
 {
-	struct ceph_osdmap *map;
-	u16 version;
-	u32 len, max, i;
-	int err = -EINVAL;
-	void *start = *p;
-	struct ceph_pg_pool_info *pi;
+	u8 struct_v;
+
+	ceph_decode_8_safe(p, end, struct_v, e_inval);
+	if (struct_v >= 7) {
+		u8 struct_compat;
+
+		ceph_decode_8_safe(p, end, struct_compat, e_inval);
+		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
+			pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n",
+				   struct_v, struct_compat,
+				   OSDMAP_WRAPPER_COMPAT_VER, prefix);
+			return -EINVAL;
+		}
+		*p += 4; /* ignore wrapper struct_len */
+
+		ceph_decode_8_safe(p, end, struct_v, e_inval);
+		ceph_decode_8_safe(p, end, struct_compat, e_inval);
+		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
+			pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n",
+				   struct_v, struct_compat,
+				   OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
+			return -EINVAL;
+		}
+		*p += 4; /* ignore client data struct_len */
+	} else {
+		u16 version;
+
+		*p -= 1;
+		ceph_decode_16_safe(p, end, version, e_inval);
+		if (version < 6) {
+			pr_warning("got v %d < 6 of %s ceph_osdmap\n", version,
+				   prefix);
+			return -EINVAL;
+		}
 
-	dout("osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p));
+		/* old osdmap encoding */
+		struct_v = 0;
+	}
 
-	map = kzalloc(sizeof(*map), GFP_NOFS);
-	if (map == NULL)
-		return ERR_PTR(-ENOMEM);
-	map->pg_temp = RB_ROOT;
+	*v = struct_v;
+	return 0;
 
-	ceph_decode_16_safe(p, end, version, bad);
-	if (version > 6) {
-		pr_warning("got unknown v %d > 6 of osdmap\n", version);
-		goto bad;
+e_inval:
+	return -EINVAL;
+}
+
+static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
+			  bool incremental)
+{
+	u32 n;
+
+	ceph_decode_32_safe(p, end, n, e_inval);
+	while (n--) {
+		struct ceph_pg_pool_info *pi;
+		u64 pool;
+		int ret;
+
+		ceph_decode_64_safe(p, end, pool, e_inval);
+
+		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		if (!incremental || !pi) {
+			pi = kzalloc(sizeof(*pi), GFP_NOFS);
+			if (!pi)
+				return -ENOMEM;
+
+			pi->id = pool;
+
+			ret = __insert_pg_pool(&map->pg_pools, pi);
+			if (ret) {
+				kfree(pi);
+				return ret;
+			}
+		}
+
+		ret = decode_pool(p, end, pi);
+		if (ret)
+			return ret;
 	}
-	if (version < 6) {
-		pr_warning("got old v %d < 6 of osdmap\n", version);
-		goto bad;
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
+{
+	return __decode_pools(p, end, map, false);
+}
+
+static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
+{
+	return __decode_pools(p, end, map, true);
+}
+
+static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map,
+			    bool incremental)
+{
+	u32 n;
+
+	ceph_decode_32_safe(p, end, n, e_inval);
+	while (n--) {
+		struct ceph_pg pgid;
+		u32 len, i;
+		int ret;
+
+		ret = ceph_decode_pgid(p, end, &pgid);
+		if (ret)
+			return ret;
+
+		ceph_decode_32_safe(p, end, len, e_inval);
+
+		ret = __remove_pg_mapping(&map->pg_temp, pgid);
+		BUG_ON(!incremental && ret != -ENOENT);
+
+		if (!incremental || len > 0) {
+			struct ceph_pg_mapping *pg;
+
+			ceph_decode_need(p, end, len*sizeof(u32), e_inval);
+
+			if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
+				return -EINVAL;
+
+			pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
+			if (!pg)
+				return -ENOMEM;
+
+			pg->pgid = pgid;
+			pg->pg_temp.len = len;
+			for (i = 0; i < len; i++)
+				pg->pg_temp.osds[i] = ceph_decode_32(p);
+
+			ret = __insert_pg_mapping(pg, &map->pg_temp);
+			if (ret) {
+				kfree(pg);
+				return ret;
+			}
+		}
+	}
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
+{
+	return __decode_pg_temp(p, end, map, false);
+}
+
+static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
+{
+	return __decode_pg_temp(p, end, map, true);
+}
+
+static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map,
+				 bool incremental)
+{
+	u32 n;
+
+	ceph_decode_32_safe(p, end, n, e_inval);
+	while (n--) {
+		struct ceph_pg pgid;
+		u32 osd;
+		int ret;
+
+		ret = ceph_decode_pgid(p, end, &pgid);
+		if (ret)
+			return ret;
+
+		ceph_decode_32_safe(p, end, osd, e_inval);
+
+		ret = __remove_pg_mapping(&map->primary_temp, pgid);
+		BUG_ON(!incremental && ret != -ENOENT);
+
+		if (!incremental || osd != (u32)-1) {
+			struct ceph_pg_mapping *pg;
+
+			pg = kzalloc(sizeof(*pg), GFP_NOFS);
+			if (!pg)
+				return -ENOMEM;
+
+			pg->pgid = pgid;
+			pg->primary_temp.osd = osd;
+
+			ret = __insert_pg_mapping(pg, &map->primary_temp);
+			if (ret) {
+				kfree(pg);
+				return ret;
+			}
+		}
 	}
 
-	ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), bad);
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
+{
+	return __decode_primary_temp(p, end, map, false);
+}
+
+static int decode_new_primary_temp(void **p, void *end,
+				   struct ceph_osdmap *map)
+{
+	return __decode_primary_temp(p, end, map, true);
+}
+
+u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
+{
+	BUG_ON(osd >= map->max_osd);
+
+	if (!map->osd_primary_affinity)
+		return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
+
+	return map->osd_primary_affinity[osd];
+}
+
+static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
+{
+	BUG_ON(osd >= map->max_osd);
+
+	if (!map->osd_primary_affinity) {
+		int i;
+
+		map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32),
+						    GFP_NOFS);
+		if (!map->osd_primary_affinity)
+			return -ENOMEM;
+
+		for (i = 0; i < map->max_osd; i++)
+			map->osd_primary_affinity[i] =
+			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
+	}
+
+	map->osd_primary_affinity[osd] = aff;
+
+	return 0;
+}
+
+static int decode_primary_affinity(void **p, void *end,
+				   struct ceph_osdmap *map)
+{
+	u32 len, i;
+
+	ceph_decode_32_safe(p, end, len, e_inval);
+	if (len == 0) {
+		kfree(map->osd_primary_affinity);
+		map->osd_primary_affinity = NULL;
+		return 0;
+	}
+	if (len != map->max_osd)
+		goto e_inval;
+
+	ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);
+
+	for (i = 0; i < map->max_osd; i++) {
+		int ret;
+
+		ret = set_primary_affinity(map, i, ceph_decode_32(p));
+		if (ret)
+			return ret;
+	}
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+static int decode_new_primary_affinity(void **p, void *end,
+				       struct ceph_osdmap *map)
+{
+	u32 n;
+
+	ceph_decode_32_safe(p, end, n, e_inval);
+	while (n--) {
+		u32 osd, aff;
+		int ret;
+
+		ceph_decode_32_safe(p, end, osd, e_inval);
+		ceph_decode_32_safe(p, end, aff, e_inval);
+
+		ret = set_primary_affinity(map, osd, aff);
+		if (ret)
+			return ret;
+
+		pr_info("osd%d primary-affinity 0x%x\n", osd, aff);
+	}
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+/*
+ * decode a full map.
+ */
+static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
+{
+	u8 struct_v;
+	u32 epoch = 0;
+	void *start = *p;
+	u32 max;
+	u32 len, i;
+	int err;
+
+	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
+
+	err = get_osdmap_client_data_v(p, end, "full", &struct_v);
+	if (err)
+		goto bad;
+
+	/* fsid, epoch, created, modified */
+	ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
+			 sizeof(map->created) + sizeof(map->modified), e_inval);
 	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
-	map->epoch = ceph_decode_32(p);
+	epoch = map->epoch = ceph_decode_32(p);
 	ceph_decode_copy(p, &map->created, sizeof(map->created));
 	ceph_decode_copy(p, &map->modified, sizeof(map->modified));
 
-	ceph_decode_32_safe(p, end, max, bad);
-	while (max--) {
-		ceph_decode_need(p, end, 8 + 2, bad);
-		err = -ENOMEM;
-		pi = kzalloc(sizeof(*pi), GFP_NOFS);
-		if (!pi)
-			goto bad;
-		pi->id = ceph_decode_64(p);
-		err = __decode_pool(p, end, pi);
-		if (err < 0) {
-			kfree(pi);
-			goto bad;
-		}
-		__insert_pg_pool(&map->pg_pools, pi);
-	}
+	/* pools */
+	err = decode_pools(p, end, map);
+	if (err)
+		goto bad;
 
-	err = __decode_pool_names(p, end, map);
-	if (err < 0) {
-		dout("fail to decode pool names");
+	/* pool_name */
+	err = decode_pool_names(p, end, map);
+	if (err)
 		goto bad;
-	}
 
-	ceph_decode_32_safe(p, end, map->pool_max, bad);
+	ceph_decode_32_safe(p, end, map->pool_max, e_inval);
 
-	ceph_decode_32_safe(p, end, map->flags, bad);
+	ceph_decode_32_safe(p, end, map->flags, e_inval);
 
-	max = ceph_decode_32(p);
+	/* max_osd */
+	ceph_decode_32_safe(p, end, max, e_inval);
 
 	/* (re)alloc osd arrays */
 	err = osdmap_set_max_osd(map, max);
-	if (err < 0)
+	if (err)
 		goto bad;
-	dout("osdmap_decode max_osd = %d\n", map->max_osd);
 
-	/* osds */
-	err = -EINVAL;
+	/* osd_state, osd_weight, osd_addrs->client_addr */
 	ceph_decode_need(p, end, 3*sizeof(u32) +
 			 map->max_osd*(1 + sizeof(*map->osd_weight) +
-				       sizeof(*map->osd_addr)), bad);
-	*p += 4; /* skip length field (should match max) */
+				       sizeof(*map->osd_addr)), e_inval);
+
+	if (ceph_decode_32(p) != map->max_osd)
+		goto e_inval;
+
 	ceph_decode_copy(p, map->osd_state, map->max_osd);
 
-	*p += 4; /* skip length field (should match max) */
+	if (ceph_decode_32(p) != map->max_osd)
+		goto e_inval;
+
 	for (i = 0; i < map->max_osd; i++)
 		map->osd_weight[i] = ceph_decode_32(p);
 
-	*p += 4; /* skip length field (should match max) */
+	if (ceph_decode_32(p) != map->max_osd)
+		goto e_inval;
+
 	ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
 	for (i = 0; i < map->max_osd; i++)
 		ceph_decode_addr(&map->osd_addr[i]);
 
 	/* pg_temp */
-	ceph_decode_32_safe(p, end, len, bad);
-	for (i = 0; i < len; i++) {
-		int n, j;
-		struct ceph_pg pgid;
-		struct ceph_pg_mapping *pg;
+	err = decode_pg_temp(p, end, map);
+	if (err)
+		goto bad;
 
-		err = ceph_decode_pgid(p, end, &pgid);
+	/* primary_temp */
+	if (struct_v >= 1) {
+		err = decode_primary_temp(p, end, map);
 		if (err)
 			goto bad;
-		ceph_decode_need(p, end, sizeof(u32), bad);
-		n = ceph_decode_32(p);
-		err = -EINVAL;
-		if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
-			goto bad;
-		ceph_decode_need(p, end, n * sizeof(u32), bad);
-		err = -ENOMEM;
-		pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
-		if (!pg)
-			goto bad;
-		pg->pgid = pgid;
-		pg->len = n;
-		for (j = 0; j < n; j++)
-			pg->osds[j] = ceph_decode_32(p);
+	}
 
-		err = __insert_pg_mapping(pg, &map->pg_temp);
+	/* primary_affinity */
+	if (struct_v >= 2) {
+		err = decode_primary_affinity(p, end, map);
 		if (err)
 			goto bad;
-		dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed,
-		     len);
+	} else {
+		/* XXX can this happen? */
+		kfree(map->osd_primary_affinity);
+		map->osd_primary_affinity = NULL;
 	}
 
 	/* crush */
-	ceph_decode_32_safe(p, end, len, bad);
-	dout("osdmap_decode crush len %d from off 0x%x\n", len,
-	     (int)(*p - start));
-	ceph_decode_need(p, end, len, bad);
-	map->crush = crush_decode(*p, end);
-	*p += len;
+	ceph_decode_32_safe(p, end, len, e_inval);
+	map->crush = crush_decode(*p, min(*p + len, end));
 	if (IS_ERR(map->crush)) {
 		err = PTR_ERR(map->crush);
 		map->crush = NULL;
 		goto bad;
 	}
+	*p += len;
 
-	/* ignore the rest of the map */
+	/* ignore the rest */
 	*p = end;
 
-	dout("osdmap_decode done %p %p\n", *p, end);
-	return map;
+	dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
+	return 0;
 
+e_inval:
+	err = -EINVAL;
 bad:
-	dout("osdmap_decode fail err %d\n", err);
-	ceph_osdmap_destroy(map);
-	return ERR_PTR(err);
+	pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
+	       err, epoch, (int)(*p - start), *p, start, end);
+	print_hex_dump(KERN_DEBUG, "osdmap: ",
+		       DUMP_PREFIX_OFFSET, 16, 1,
+		       start, end - start, true);
+	return err;
+}
+
+/*
+ * Allocate and decode a full map.
+ */
+struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
+{
+	struct ceph_osdmap *map;
+	int ret;
+
+	map = kzalloc(sizeof(*map), GFP_NOFS);
+	if (!map)
+		return ERR_PTR(-ENOMEM);
+
+	map->pg_temp = RB_ROOT;
+	map->primary_temp = RB_ROOT;
+	mutex_init(&map->crush_scratch_mutex);
+
+	ret = osdmap_decode(p, end, map);
+	if (ret) {
+		ceph_osdmap_destroy(map);
+		return ERR_PTR(ret);
+	}
+
+	return map;
 }
 
 /*
@@ -813,17 +1182,18 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 	__s64 new_pool_max;
 	__s32 new_flags, max;
 	void *start = *p;
-	int err = -EINVAL;
-	u16 version;
+	int err;
+	u8 struct_v;
+
+	dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
 
-	ceph_decode_16_safe(p, end, version, bad);
-	if (version != 6) {
-		pr_warning("got unknown v %d != 6 of inc osdmap\n", version);
+	err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
+	if (err)
 		goto bad;
-	}
 
-	ceph_decode_need(p, end, sizeof(fsid)+sizeof(modified)+2*sizeof(u32),
-			 bad);
+	/* fsid, epoch, modified, new_pool_max, new_flags */
+	ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
+			 sizeof(u64) + sizeof(u32), e_inval);
 	ceph_decode_copy(p, &fsid, sizeof(fsid));
 	epoch = ceph_decode_32(p);
 	BUG_ON(epoch != map->epoch+1);
@@ -832,21 +1202,22 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 	new_flags = ceph_decode_32(p);
 
 	/* full map? */
-	ceph_decode_32_safe(p, end, len, bad);
+	ceph_decode_32_safe(p, end, len, e_inval);
 	if (len > 0) {
 		dout("apply_incremental full map len %d, %p to %p\n",
 		     len, *p, end);
-		return osdmap_decode(p, min(*p+len, end));
+		return ceph_osdmap_decode(p, min(*p+len, end));
 	}
 
 	/* new crush? */
-	ceph_decode_32_safe(p, end, len, bad);
+	ceph_decode_32_safe(p, end, len, e_inval);
 	if (len > 0) {
-		dout("apply_incremental new crush map len %d, %p to %p\n",
-		     len, *p, end);
 		newcrush = crush_decode(*p, min(*p+len, end));
-		if (IS_ERR(newcrush))
-			return ERR_CAST(newcrush);
+		if (IS_ERR(newcrush)) {
+			err = PTR_ERR(newcrush);
+			newcrush = NULL;
+			goto bad;
+		}
 		*p += len;
 	}
 
@@ -856,13 +1227,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 	if (new_pool_max >= 0)
 		map->pool_max = new_pool_max;
 
-	ceph_decode_need(p, end, 5*sizeof(u32), bad);
-
 	/* new max? */
-	max = ceph_decode_32(p);
+	ceph_decode_32_safe(p, end, max, e_inval);
 	if (max >= 0) {
 		err = osdmap_set_max_osd(map, max);
-		if (err < 0)
+		if (err)
 			goto bad;
 	}
 
@@ -875,51 +1244,34 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 		newcrush = NULL;
 	}
 
-	/* new_pool */
-	ceph_decode_32_safe(p, end, len, bad);
-	while (len--) {
-		struct ceph_pg_pool_info *pi;
+	/* new_pools */
+	err = decode_new_pools(p, end, map);
+	if (err)
+		goto bad;
 
-		ceph_decode_64_safe(p, end, pool, bad);
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
-		if (!pi) {
-			pi = kzalloc(sizeof(*pi), GFP_NOFS);
-			if (!pi) {
-				err = -ENOMEM;
-				goto bad;
-			}
-			pi->id = pool;
-			__insert_pg_pool(&map->pg_pools, pi);
-		}
-		err = __decode_pool(p, end, pi);
-		if (err < 0)
-			goto bad;
-	}
-	if (version >= 5) {
-		err = __decode_pool_names(p, end, map);
-		if (err < 0)
-			goto bad;
-	}
+	/* new_pool_names */
+	err = decode_pool_names(p, end, map);
+	if (err)
+		goto bad;
 
 	/* old_pool */
-	ceph_decode_32_safe(p, end, len, bad);
+	ceph_decode_32_safe(p, end, len, e_inval);
 	while (len--) {
 		struct ceph_pg_pool_info *pi;
 
-		ceph_decode_64_safe(p, end, pool, bad);
+		ceph_decode_64_safe(p, end, pool, e_inval);
 		pi = __lookup_pg_pool(&map->pg_pools, pool);
 		if (pi)
 			__remove_pg_pool(&map->pg_pools, pi);
 	}
 
 	/* new_up */
-	err = -EINVAL;
-	ceph_decode_32_safe(p, end, len, bad);
+	ceph_decode_32_safe(p, end, len, e_inval);
 	while (len--) {
 		u32 osd;
 		struct ceph_entity_addr addr;
-		ceph_decode_32_safe(p, end, osd, bad);
-		ceph_decode_copy_safe(p, end, &addr, sizeof(addr), bad);
+		ceph_decode_32_safe(p, end, osd, e_inval);
+		ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval);
 		ceph_decode_addr(&addr);
 		pr_info("osd%d up\n", osd);
 		BUG_ON(osd >= map->max_osd);
@@ -928,11 +1280,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 	}
 
 	/* new_state */
-	ceph_decode_32_safe(p, end, len, bad);
+	ceph_decode_32_safe(p, end, len, e_inval);
 	while (len--) {
 		u32 osd;
 		u8 xorstate;
-		ceph_decode_32_safe(p, end, osd, bad);
+		ceph_decode_32_safe(p, end, osd, e_inval);
 		xorstate = **(u8 **)p;
 		(*p)++;  /* clean flag */
 		if (xorstate == 0)
@@ -944,10 +1296,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 	}
 
 	/* new_weight */
-	ceph_decode_32_safe(p, end, len, bad);
+	ceph_decode_32_safe(p, end, len, e_inval);
 	while (len--) {
 		u32 osd, off;
-		ceph_decode_need(p, end, sizeof(u32)*2, bad);
+		ceph_decode_need(p, end, sizeof(u32)*2, e_inval);
 		osd = ceph_decode_32(p);
 		off = ceph_decode_32(p);
 		pr_info("osd%d weight 0x%x %s\n", osd, off,
@@ -958,56 +1310,35 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 	}
 
 	/* new_pg_temp */
-	ceph_decode_32_safe(p, end, len, bad);
-	while (len--) {
-		struct ceph_pg_mapping *pg;
-		int j;
-		struct ceph_pg pgid;
-		u32 pglen;
+	err = decode_new_pg_temp(p, end, map);
+	if (err)
+		goto bad;
 
-		err = ceph_decode_pgid(p, end, &pgid);
+	/* new_primary_temp */
+	if (struct_v >= 1) {
+		err = decode_new_primary_temp(p, end, map);
 		if (err)
 			goto bad;
-		ceph_decode_need(p, end, sizeof(u32), bad);
-		pglen = ceph_decode_32(p);
-		if (pglen) {
-			ceph_decode_need(p, end, pglen*sizeof(u32), bad);
-
-			/* removing existing (if any) */
-			(void) __remove_pg_mapping(&map->pg_temp, pgid);
+	}
 
-			/* insert */
-			err = -EINVAL;
-			if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
-				goto bad;
-			err = -ENOMEM;
-			pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
-			if (!pg)
-				goto bad;
-			pg->pgid = pgid;
-			pg->len = pglen;
-			for (j = 0; j < pglen; j++)
-				pg->osds[j] = ceph_decode_32(p);
-			err = __insert_pg_mapping(pg, &map->pg_temp);
-			if (err) {
-				kfree(pg);
-				goto bad;
-			}
-			dout(" added pg_temp %lld.%x len %d\n", pgid.pool,
-			     pgid.seed, pglen);
-		} else {
-			/* remove */
-			__remove_pg_mapping(&map->pg_temp, pgid);
-		}
+	/* new_primary_affinity */
+	if (struct_v >= 2) {
+		err = decode_new_primary_affinity(p, end, map);
+		if (err)
+			goto bad;
 	}
 
 	/* ignore the rest */
 	*p = end;
+
+	dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
 	return map;
 
+e_inval:
+	err = -EINVAL;
bad:
-	pr_err("corrupt inc osdmap epoch %d off %d (%p of %p-%p)\n",
-	       epoch, (int)(*p - start), *p, start, end);
+	pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
+	       err, epoch, (int)(*p - start), *p, start, end);
 	print_hex_dump(KERN_DEBUG, "osdmap: ",
 		       DUMP_PREFIX_OFFSET, 16, 1,
 		       start, end - start, true);
@@ -1090,71 +1421,275 @@ invalid:
 EXPORT_SYMBOL(ceph_calc_file_object_mapping);
 
 /*
- * calculate an object layout (i.e. pgid) from an oid,
- * file_layout, and osdmap
+ * Calculate mapping of a (oloc, oid) pair to a PG.  Should only be
+ * called with target's (oloc, oid), since tiering isn't taken into
+ * account.
  */
-int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid,
-			struct ceph_osdmap *osdmap, uint64_t pool)
+int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
+			struct ceph_object_locator *oloc,
+			struct ceph_object_id *oid,
+			struct ceph_pg *pg_out)
 {
-	struct ceph_pg_pool_info *pool_info;
+	struct ceph_pg_pool_info *pi;
 
-	BUG_ON(!osdmap);
-	pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool);
-	if (!pool_info)
+	pi = __lookup_pg_pool(&osdmap->pg_pools, oloc->pool);
+	if (!pi)
 		return -EIO;
 
-	pg->pool = pool;
-	pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid));
-	dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed);
+	pg_out->pool = oloc->pool;
+	pg_out->seed = ceph_str_hash(pi->object_hash, oid->name,
+				     oid->name_len);
+
+	dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name,
+	     pg_out->pool, pg_out->seed);
 	return 0;
 }
-EXPORT_SYMBOL(ceph_calc_ceph_pg);
+EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
+
+static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
+		    int *result, int result_max,
+		    const __u32 *weight, int weight_max)
+{
+	int r;
+
+	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
+
+	mutex_lock(&map->crush_scratch_mutex);
+	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
+			  weight, weight_max, map->crush_scratch_ary);
+	mutex_unlock(&map->crush_scratch_mutex);
+
+	return r;
+}
 
 /*
- * Calculate raw osd vector for the given pgid.  Return pointer to osd
- * array, or NULL on failure.
+ * Calculate raw (crush) set for given pgid.
+ *
+ * Return raw set length, or error.
  */
-static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
-			int *osds, int *num)
+static int pg_to_raw_osds(struct ceph_osdmap *osdmap,
+			  struct ceph_pg_pool_info *pool,
+			  struct ceph_pg pgid, u32 pps, int *osds)
 {
-	struct ceph_pg_mapping *pg;
-	struct ceph_pg_pool_info *pool;
 	int ruleno;
-	int r;
-	u32 pps;
+	int len;
 
-	pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
-	if (!pool)
-		return NULL;
+	/* crush */
+	ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
+				 pool->type, pool->size);
+	if (ruleno < 0) {
+		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
+		       pgid.pool, pool->crush_ruleset, pool->type,
+		       pool->size);
+		return -ENOENT;
+	}
 
-	/* pg_temp? */
+	len = do_crush(osdmap, ruleno, pps, osds,
+		       min_t(int, pool->size, CEPH_PG_MAX_SIZE),
+		       osdmap->osd_weight, osdmap->max_osd);
+	if (len < 0) {
+		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
+		       len, ruleno, pgid.pool, pool->crush_ruleset,
+		       pool->type, pool->size);
+		return len;
+	}
+
+	return len;
+}
+
+/*
+ * Given raw set, calculate up set and up primary.
+ *
+ * Return up set length.  *primary is set to up primary osd id, or -1
+ * if up set is empty.
+ */
+static int raw_to_up_osds(struct ceph_osdmap *osdmap,
+			  struct ceph_pg_pool_info *pool,
+			  int *osds, int len, int *primary)
+{
+	int up_primary = -1;
+	int i;
+
+	if (ceph_can_shift_osds(pool)) {
+		int removed = 0;
+
+		for (i = 0; i < len; i++) {
+			if (ceph_osd_is_down(osdmap, osds[i])) {
+				removed++;
+				continue;
+			}
+			if (removed)
+				osds[i - removed] = osds[i];
+		}
+
+		len -= removed;
+		if (len > 0)
+			up_primary = osds[0];
+	} else {
+		for (i = len - 1; i >= 0; i--) {
+			if (ceph_osd_is_down(osdmap, osds[i]))
+				osds[i] = CRUSH_ITEM_NONE;
+			else
+				up_primary = osds[i];
+		}
+	}
+
+	*primary = up_primary;
+	return len;
+}
+
+static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
+				   struct ceph_pg_pool_info *pool,
+				   int *osds, int len, int *primary)
+{
+	int i;
+	int pos = -1;
+
+	/*
+	 * Do we have any non-default primary_affinity values for these
+	 * osds?
+	 */
+	if (!osdmap->osd_primary_affinity)
+		return;
+
+	for (i = 0; i < len; i++) {
+		int osd = osds[i];
+
+		if (osd != CRUSH_ITEM_NONE &&
+		    osdmap->osd_primary_affinity[osd] !=
					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
+			break;
+		}
+	}
+	if (i == len)
+		return;
+
+	/*
+	 * Pick the primary.  Feed both the seed (for the pg) and the
+	 * osd into the hash/rng so that a proportional fraction of an
+	 * osd's pgs get rejected as primary.
+	 */
+	for (i = 0; i < len; i++) {
+		int osd = osds[i];
+		u32 aff;
+
+		if (osd == CRUSH_ITEM_NONE)
+			continue;
+
+		aff = osdmap->osd_primary_affinity[osd];
+		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
+		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
+				    pps, osd) >> 16) >= aff) {
+			/*
+			 * We chose not to use this primary.  Note it
+			 * anyway as a fallback in case we don't pick
+			 * anyone else, but keep looking.
+			 */
+			if (pos < 0)
+				pos = i;
+		} else {
+			pos = i;
+			break;
+		}
+	}
+	if (pos < 0)
+		return;
+
+	*primary = osds[pos];
+
+	if (ceph_can_shift_osds(pool) && pos > 0) {
+		/* move the new primary to the front */
+		for (i = pos; i > 0; i--)
+			osds[i] = osds[i - 1];
+		osds[0] = *primary;
+	}
+}
+
+/*
+ * Given up set, apply pg_temp and primary_temp mappings.
+ *
+ * Return acting set length.  *primary is set to acting primary osd id,
+ * or -1 if acting set is empty.
+ */
+static int apply_temps(struct ceph_osdmap *osdmap,
+		       struct ceph_pg_pool_info *pool, struct ceph_pg pgid,
+		       int *osds, int len, int *primary)
+{
+	struct ceph_pg_mapping *pg;
+	int temp_len;
+	int temp_primary;
+	int i;
+
+	/* raw_pg -> pg */
 	pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
 				    pool->pg_num_mask);
+
+	/* pg_temp? */
 	pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
 	if (pg) {
-		*num = pg->len;
-		return pg->osds;
+		temp_len = 0;
+		temp_primary = -1;
+
+		for (i = 0; i < pg->pg_temp.len; i++) {
+			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
+				if (ceph_can_shift_osds(pool))
+					continue;
+				else
+					osds[temp_len++] = CRUSH_ITEM_NONE;
+			} else {
+				osds[temp_len++] = pg->pg_temp.osds[i];
+			}
+		}
+
+		/* apply pg_temp's primary */
+		for (i = 0; i < temp_len; i++) {
+			if (osds[i] != CRUSH_ITEM_NONE) {
+				temp_primary = osds[i];
+				break;
+			}
+		}
+	} else {
+		temp_len = len;
+		temp_primary = *primary;
 	}
 
-	/* crush */
-	ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
-				 pool->type, pool->size);
-	if (ruleno < 0) {
-		pr_err("no crush rule pool %lld ruleset %d type %d size %d\n",
-		       pgid.pool, pool->crush_ruleset, pool->type,
-		       pool->size);
-		return NULL;
+	/* primary_temp? */
+	pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
+	if (pg)
+		temp_primary = pg->primary_temp.osd;
+
+	*primary = temp_primary;
+	return temp_len;
+}
+
+/*
+ * Calculate acting set for given pgid.
+ *
+ * Return acting set length, or error.  *primary is set to acting
+ * primary osd id, or -1 if acting set is empty or on error.
+ */
+int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
+			int *osds, int *primary)
+{
+	struct ceph_pg_pool_info *pool;
+	u32 pps;
+	int len;
+
+	pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
+	if (!pool) {
+		*primary = -1;
+		return -ENOENT;
 	}
 
 	if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
-		/* hash pool id and seed sothat pool PGs do not overlap */
+		/* hash pool id and seed so that pool PGs do not overlap */
 		pps = crush_hash32_2(CRUSH_HASH_RJENKINS1,
 				     ceph_stable_mod(pgid.seed, pool->pgp_num,
						     pool->pgp_num_mask),
 				     pgid.pool);
 	} else {
 		/*
-		 * legacy ehavior: add ps and pool together.  this is
+		 * legacy behavior: add ps and pool together.  this is
 		 * not a great approach because the PGs from each pool
 		 * will overlap on top of each other: 0.5 == 1.4 ==
 		 * 2.3 == ...
@@ -1163,38 +1698,20 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 				      pool->pgp_num_mask) +
 			(unsigned)pgid.pool;
 	}
-	r = crush_do_rule(osdmap->crush, ruleno, pps, osds,
-			  min_t(int, pool->size, *num),
-			  osdmap->osd_weight);
-	if (r < 0) {
-		pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
-		       " size %d\n", r, pgid.pool, pool->crush_ruleset,
-		       pool->type, pool->size);
-		return NULL;
+
+	len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds);
+	if (len < 0) {
+		*primary = -1;
+		return len;
 	}
-	*num = r;
-	return osds;
-}
 
-/*
- * Return acting set for given pgid.
- */
-int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
-			int *acting)
-{
-	int rawosds[CEPH_PG_MAX_SIZE], *osds;
-	int i, o, num = CEPH_PG_MAX_SIZE;
+	len = raw_to_up_osds(osdmap, pool, osds, len, primary);
 
-	osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
-	if (!osds)
-		return -1;
+	apply_primary_affinity(osdmap, pps, pool, osds, len, primary);
+
+	len = apply_temps(osdmap, pool, pgid, osds, len, primary);
 
-	/* primary is first up osd */
-	o = 0;
-	for (i = 0; i < num; i++)
-		if (ceph_osd_is_up(osdmap, osds[i]))
-			acting[o++] = osds[i];
-	return o;
+	return len;
 }
 
 /*
@@ -1202,17 +1719,11 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 */
 int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
 {
-	int rawosds[CEPH_PG_MAX_SIZE], *osds;
-	int i, num = CEPH_PG_MAX_SIZE;
+	int osds[CEPH_PG_MAX_SIZE];
+	int primary;
 
-	osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
-	if (!osds)
-		return -1;
+	ceph_calc_pg_acting(osdmap, pgid, osds, &primary);
 
-	/* primary is first up osd */
-	for (i = 0; i < num; i++)
-		if (ceph_osd_is_up(osdmap, osds[i]))
-			return osds[i];
-	return -1;
+	return primary;
 }
 EXPORT_SYMBOL(ceph_calc_pg_primary);
diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c
index 815a2249cfa..555013034f7 100644
--- a/net/ceph/pagevec.c
+++ b/net/ceph/pagevec.c
@@ -53,7 +53,10 @@ void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty)
 			set_page_dirty_lock(pages[i]);
 		put_page(pages[i]);
 	}
-	kfree(pages);
+	if (is_vmalloc_addr(pages))
+		vfree(pages);
+	else
+		kfree(pages);
 }
 EXPORT_SYMBOL(ceph_put_page_vector);
 
@@ -165,36 +168,6 @@ void ceph_copy_from_page_vector(struct page **pages,
 EXPORT_SYMBOL(ceph_copy_from_page_vector);
 
 /*
- * copy user data from a page vector into a user pointer
- */
-int ceph_copy_page_vector_to_user(struct page **pages,
-					 void __user *data,
-					 loff_t off, size_t len)
-{
-	int i = 0;
-	int po = off & ~PAGE_CACHE_MASK;
-	int left = len;
-	int l, bad;
-
-	while (left > 0) {
-		l = min_t(int, left, PAGE_CACHE_SIZE-po);
-		bad = copy_to_user(data, page_address(pages[i]) + po, l);
-		if (bad == l)
-			return -EFAULT;
-		data += l - bad;
-		left -= l - bad;
-		if (po) {
-			po += l - bad;
-			if (po == PAGE_CACHE_SIZE)
-				po = 0;
-		}
-		i++;
-	}
-	return len;
-}
-EXPORT_SYMBOL(ceph_copy_page_vector_to_user);
-
-/*
  * Zero an extent within a page vector.  Offset is relative to the
  * start of the first page.
 */
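
The reworked osdmap_set_max_osd() above replaces the old alloc-copy-free dance with krealloc() followed by initialization of only the new tail. Below is a userspace sketch of that pattern, with realloc() standing in for krealloc() and OUT_WEIGHT as an assumed stand-in for CEPH_OSD_OUT; the struct and helper names are hypothetical, not kernel APIs.

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

#define OUT_WEIGHT 0	/* assumed stand-in for CEPH_OSD_OUT */

struct map {
	int max_osd;
	uint8_t *osd_state;
	uint32_t *osd_weight;
};

static int set_max_osd(struct map *map, int max)
{
	uint8_t *state;
	uint32_t *weight;
	int i;

	state = realloc(map->osd_state, max * sizeof(*state));
	weight = realloc(map->osd_weight, max * sizeof(*weight));
	if (!state || !weight) {
		/*
		 * realloc frees the old block on success, so on a
		 * partial failure the old pointers must not be reused;
		 * free whichever block is live and null everything.
		 */
		free(state ? state : map->osd_state);
		free(weight ? weight : map->osd_weight);
		map->osd_state = NULL;
		map->osd_weight = NULL;
		return -1;
	}

	/* old entries were moved by realloc; init only the new tail */
	for (i = map->max_osd; i < max; i++) {
		state[i] = 0;
		weight[i] = OUT_WEIGHT;
	}

	map->osd_state = state;
	map->osd_weight = weight;
	map->max_osd = max;
	return 0;
}

int main(void)
{
	struct map m = { 0, NULL, NULL };

	if (set_max_osd(&m, 4) == 0 && set_max_osd(&m, 8) == 0)
		printf("max_osd=%d weight[7]=%u\n", m.max_osd,
		       (unsigned)m.osd_weight[7]);
	free(m.osd_state);
	free(m.osd_weight);
	return 0;
}

The win over alloc-copy-free is that a grow can often be done in place, and the new-tail loop makes the "new elements are properly initialized" promise explicit.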
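
get_osdmap_client_data_v() tells the two wire formats apart by peeking at a single byte: new-style (v7+) osdmaps lead with a u8 struct_v of at least 7, while old-style ones lead with a little-endian u16 version of at most 6, so the peeked byte can be rewound and re-read as part of a u16. A standalone sketch of the idea follows; classify() is a hypothetical helper, the two-byte buffers are illustrative, and a little-endian host is assumed (the kernel's *_safe decode macros also bounds-check, which this sketch omits).

#include <stdio.h>
#include <stdint.h>
#include <string.h>

/* Return wrapper struct_v for new encodings, 0 for old, -1 on error. */
static int classify(const uint8_t **p)
{
	uint8_t struct_v = *(*p)++;

	if (struct_v >= 7)
		return struct_v;	/* new: u8 version chain follows */

	(*p)--;				/* rewind the peeked byte */
	uint16_t version;
	memcpy(&version, *p, 2);	/* assume little-endian host */
	*p += 2;
	if (version < 6)
		return -1;		/* too old to understand */
	return 0;			/* old encoding */
}

int main(void)
{
	const uint8_t new_enc[] = { 7, 1 };	/* struct_v 7, compat 1 */
	const uint8_t old_enc[] = { 6, 0 };	/* u16 version == 6 */
	const uint8_t *p;

	p = new_enc;
	printf("new_enc -> %d\n", classify(&p));
	p = old_enc;
	printf("old_enc -> %d\n", classify(&p));
	return 0;
}

The trick works because any acceptable old version is small, so its low-order first byte can never reach 7, and the rewind makes the probe non-destructive.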
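
The raw-to-up step treats pool types differently: replicated pools may compact the osd list when an entry is down, but erasure-coded placement is positional, so a down osd must leave a CRUSH_ITEM_NONE hole instead. A minimal runnable sketch of that filtering, with is_down() as a toy stand-in for ceph_osd_is_down() and ITEM_NONE for CRUSH_ITEM_NONE:

#include <stdio.h>

#define ITEM_NONE (-1)

static int is_down(int osd)
{
	return osd == 7;	/* pretend osd.7 is down */
}

static int raw_to_up(int *osds, int len, int can_shift, int *primary)
{
	int i, removed = 0;

	*primary = -1;
	if (can_shift) {			/* replicated pool */
		for (i = 0; i < len; i++) {
			if (is_down(osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				osds[i - removed] = osds[i];
		}
		len -= removed;
		if (len > 0)
			*primary = osds[0];
	} else {				/* erasure-coded pool */
		for (i = len - 1; i >= 0; i--) {
			if (is_down(osds[i]))
				osds[i] = ITEM_NONE;
			else
				*primary = osds[i];	/* ends on first up osd */
		}
	}
	return len;
}

int main(void)
{
	int rep[3] = { 7, 3, 1 }, ec[3] = { 7, 3, 1 };
	int primary, len, i;

	len = raw_to_up(rep, 3, 1, &primary);
	printf("replicated: len=%d primary=%d\n", len, primary);

	len = raw_to_up(ec, 3, 0, &primary);
	for (i = 0; i < len; i++)
		printf("ec[%d]=%d\n", i, ec[i]);
	printf("ec: primary=%d\n", primary);
	return 0;
}

Running it shows the replicated list shrinking to {3, 1} while the erasure-coded list keeps its shape as {NONE, 3, 1}; in both cases the primary is the first surviving osd.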
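
The primary selection in apply_primary_affinity() is rejection sampling: an osd with affinity aff is accepted as primary with probability aff out of the maximum, and hashing (pps, osd) makes the decision deterministic per pg. A userspace sketch follows; mix32() is only a stand-in mixer, not the kernel's crush_hash32_2(), and the 0x10000 ceiling mirrors what CEPH_OSD_MAX_PRIMARY_AFFINITY is assumed to be.

#include <stdio.h>
#include <stdint.h>

#define MAX_AFF   0x10000
#define ITEM_NONE (-1)

static uint32_t mix32(uint32_t a, uint32_t b)
{
	uint32_t h = a * 0x9e3779b1u ^ b * 0x85ebca77u;

	h ^= h >> 15;
	h *= 0xc2b2ae35u;
	h ^= h >> 13;
	return h;
}

/* Return index of the chosen primary in osds[], or -1. */
static int choose_primary(uint32_t pps, const int *osds, int len,
			  const uint32_t *aff)
{
	int fallback = -1;
	int i;

	for (i = 0; i < len; i++) {
		if (osds[i] == ITEM_NONE)
			continue;
		/*
		 * An osd with affinity a is accepted as primary with
		 * probability a/MAX_AFF; the 16-bit hash slice makes
		 * the rejection deterministic per (pg, osd) pair.
		 */
		if (aff[osds[i]] < MAX_AFF &&
		    (mix32(pps, osds[i]) >> 16) >= aff[osds[i]]) {
			if (fallback < 0)
				fallback = i;	/* keep as last resort */
		} else {
			return i;		/* accepted */
		}
	}
	return fallback;
}

int main(void)
{
	int osds[3] = { 3, 7, 1 };
	uint32_t aff[8];
	int i;

	for (i = 0; i < 8; i++)
		aff[i] = MAX_AFF;
	aff[3] = MAX_AFF / 4;	/* osd.3 should be primary ~4x less often */

	for (i = 0; i < 4; i++)
		printf("pg %d: primary osd.%d\n", i,
		       osds[choose_primary(i, osds, 3, aff)]);
	return 0;
}

Feeding the osd id into the hash alongside the pg seed is what makes the rejected fraction proportional per osd rather than all-or-nothing per pg.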
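
Finally, ceph_calc_pg_acting() keeps both placement-seed (pps) flavors: the HASHPSPOOL variant hashes the pool id together with the bucketed seed, while the legacy variant simply adds them, which is why pgs like 0.5 and 1.4 collide. A rough userspace sketch, where stable_mod() follows the ceph_stable_mod() bucketing helper from the ceph headers and hash2() is a stand-in mixer rather than the real crush_hash32_2():

#include <stdio.h>
#include <stdint.h>

static uint32_t hash2(uint32_t a, uint32_t b)
{
	uint32_t h = a * 0x9e3779b1u + b * 0x85ebca77u;

	h ^= h >> 16;
	return h;
}

/* bucket x into b buckets, given bmask = next_pow2(b) - 1 */
static uint32_t stable_mod(uint32_t x, uint32_t b, uint32_t bmask)
{
	if ((x & bmask) < b)
		return x & bmask;
	return x & (bmask >> 1);
}

static uint32_t pps(uint32_t seed, uint32_t pgp_num, uint32_t mask,
		    uint32_t pool, int hashpspool)
{
	if (hashpspool)		/* pools cannot collide */
		return hash2(stable_mod(seed, pgp_num, mask), pool);
	/* legacy: pg 0.5 and pg 1.4 share a seed, since 5 == 4 + 1 */
	return stable_mod(seed, pgp_num, mask) + pool;
}

int main(void)
{
	/* pg 0.5 vs pg 1.4, both pools with pgp_num 8 (mask 7) */
	printf("legacy: %u == %u\n",
	       (unsigned)pps(5, 8, 7, 0, 0), (unsigned)pps(4, 8, 7, 1, 0));
	printf("hashps: %u vs %u\n",
	       (unsigned)pps(5, 8, 7, 0, 1), (unsigned)pps(4, 8, 7, 1, 1));
	return 0;
}

The stable_mod() bucketing is also what lets pg counts grow without reshuffling every object: values that already fall below b keep their bucket, and only the top half of the old range splits.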
