diff options
Diffstat (limited to 'net/rxrpc/call.c')
-rw-r--r-- | net/rxrpc/call.c | 2278 |
1 files changed, 2278 insertions, 0 deletions
diff --git a/net/rxrpc/call.c b/net/rxrpc/call.c new file mode 100644 index 00000000000..5cfd4cadee4 --- /dev/null +++ b/net/rxrpc/call.c @@ -0,0 +1,2278 @@ +/* call.c: Rx call routines + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/module.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/transport.h> +#include <rxrpc/peer.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/message.h> +#include "internal.h" + +__RXACCT_DECL(atomic_t rxrpc_call_count); +__RXACCT_DECL(atomic_t rxrpc_message_count); + +LIST_HEAD(rxrpc_calls); +DECLARE_RWSEM(rxrpc_calls_sem); + +unsigned rxrpc_call_rcv_timeout = HZ/3; +static unsigned rxrpc_call_acks_timeout = HZ/3; +static unsigned rxrpc_call_dfr_ack_timeout = HZ/20; +static unsigned short rxrpc_call_max_resend = HZ/10; + +const char *rxrpc_call_states[] = { + "COMPLETE", + "ERROR", + "SRVR_RCV_OPID", + "SRVR_RCV_ARGS", + "SRVR_GOT_ARGS", + "SRVR_SND_REPLY", + "SRVR_RCV_FINAL_ACK", + "CLNT_SND_ARGS", + "CLNT_RCV_REPLY", + "CLNT_GOT_REPLY" +}; + +const char *rxrpc_call_error_states[] = { + "NO_ERROR", + "LOCAL_ABORT", + "PEER_ABORT", + "LOCAL_ERROR", + "REMOTE_ERROR" +}; + +const char *rxrpc_pkts[] = { + "?00", + "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug", + "?09", "?10", "?11", "?12", "?13", "?14", "?15" +}; + +static const char *rxrpc_acks[] = { + "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL", + "-?-" +}; + +static const char _acktype[] = "NA-"; + +static void rxrpc_call_receive_packet(struct rxrpc_call *call); +static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, + struct rxrpc_message *msg); +static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, + struct rxrpc_message *msg); +static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, + rxrpc_seq_t higest); +static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest); +static int __rxrpc_call_read_data(struct rxrpc_call *call); + +static int rxrpc_call_record_ACK(struct rxrpc_call *call, + struct rxrpc_message *msg, + rxrpc_seq_t seq, + size_t count); + +static int rxrpc_call_flush(struct rxrpc_call *call); + +#define _state(call) \ + _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]); + +static void rxrpc_call_default_attn_func(struct rxrpc_call *call) +{ + wake_up(&call->waitq); +} + +static void rxrpc_call_default_error_func(struct rxrpc_call *call) +{ + wake_up(&call->waitq); +} + +static void rxrpc_call_default_aemap_func(struct rxrpc_call *call) +{ + switch (call->app_err_state) { + case RXRPC_ESTATE_LOCAL_ABORT: + call->app_abort_code = -call->app_errno; + case RXRPC_ESTATE_PEER_ABORT: + call->app_errno = -ECONNABORTED; + default: + break; + } +} + +static void __rxrpc_call_acks_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif); + + call->flags |= RXRPC_CALL_ACKS_TIMO; + rxrpc_krxiod_queue_call(call); +} + +static void __rxrpc_call_rcv_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("RCV TIMEOUT %05lu", jiffies - call->cjif); + + call->flags |= RXRPC_CALL_RCV_TIMO; + rxrpc_krxiod_queue_call(call); +} + +static void __rxrpc_call_ackr_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif); + + call->flags |= RXRPC_CALL_ACKR_TIMO; + rxrpc_krxiod_queue_call(call); +} + +/*****************************************************************************/ +/* + * calculate a timeout based on an RTT value + */ +static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call, + unsigned long val) +{ + unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ); + + expiry += 10; + if (expiry < HZ / 25) + expiry = HZ / 25; + if (expiry > HZ) + expiry = HZ; + + _leave(" = %lu jiffies", expiry); + return jiffies + expiry; +} /* end __rxrpc_rtt_based_timeout() */ + +/*****************************************************************************/ +/* + * create a new call record + */ +static inline int __rxrpc_create_call(struct rxrpc_connection *conn, + struct rxrpc_call **_call) +{ + struct rxrpc_call *call; + + _enter("%p", conn); + + /* allocate and initialise a call record */ + call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL); + if (!call) { + _leave(" ENOMEM"); + return -ENOMEM; + } + + atomic_set(&call->usage, 1); + + init_waitqueue_head(&call->waitq); + spin_lock_init(&call->lock); + INIT_LIST_HEAD(&call->link); + INIT_LIST_HEAD(&call->acks_pendq); + INIT_LIST_HEAD(&call->rcv_receiveq); + INIT_LIST_HEAD(&call->rcv_krxiodq_lk); + INIT_LIST_HEAD(&call->app_readyq); + INIT_LIST_HEAD(&call->app_unreadyq); + INIT_LIST_HEAD(&call->app_link); + INIT_LIST_HEAD(&call->app_attn_link); + + init_timer(&call->acks_timeout); + call->acks_timeout.data = (unsigned long) call; + call->acks_timeout.function = __rxrpc_call_acks_timeout; + + init_timer(&call->rcv_timeout); + call->rcv_timeout.data = (unsigned long) call; + call->rcv_timeout.function = __rxrpc_call_rcv_timeout; + + init_timer(&call->ackr_dfr_timo); + call->ackr_dfr_timo.data = (unsigned long) call; + call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout; + + call->conn = conn; + call->ackr_win_bot = 1; + call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1; + call->ackr_prev_seq = 0; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_attn_func = rxrpc_call_default_attn_func; + call->app_error_func = rxrpc_call_default_error_func; + call->app_aemap_func = rxrpc_call_default_aemap_func; + call->app_scr_alloc = call->app_scratch; + + call->cjif = jiffies; + + _leave(" = 0 (%p)", call); + + *_call = call; + + return 0; +} /* end __rxrpc_create_call() */ + +/*****************************************************************************/ +/* + * create a new call record for outgoing calls + */ +int rxrpc_create_call(struct rxrpc_connection *conn, + rxrpc_call_attn_func_t attn, + rxrpc_call_error_func_t error, + rxrpc_call_aemap_func_t aemap, + struct rxrpc_call **_call) +{ + DECLARE_WAITQUEUE(myself, current); + + struct rxrpc_call *call; + int ret, cix, loop; + + _enter("%p", conn); + + /* allocate and initialise a call record */ + ret = __rxrpc_create_call(conn, &call); + if (ret < 0) { + _leave(" = %d", ret); + return ret; + } + + call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS; + if (attn) + call->app_attn_func = attn; + if (error) + call->app_error_func = error; + if (aemap) + call->app_aemap_func = aemap; + + _state(call); + + spin_lock(&conn->lock); + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&conn->chanwait, &myself); + + try_again: + /* try to find an unused channel */ + for (cix = 0; cix < 4; cix++) + if (!conn->channels[cix]) + goto obtained_chan; + + /* no free channels - wait for one to become available */ + ret = -EINTR; + if (signal_pending(current)) + goto error_unwait; + + spin_unlock(&conn->lock); + + schedule(); + set_current_state(TASK_INTERRUPTIBLE); + + spin_lock(&conn->lock); + goto try_again; + + /* got a channel - now attach to the connection */ + obtained_chan: + remove_wait_queue(&conn->chanwait, &myself); + set_current_state(TASK_RUNNING); + + /* concoct a unique call number */ + next_callid: + call->call_id = htonl(++conn->call_counter); + for (loop = 0; loop < 4; loop++) + if (conn->channels[loop] && + conn->channels[loop]->call_id == call->call_id) + goto next_callid; + + rxrpc_get_connection(conn); + conn->channels[cix] = call; /* assign _after_ done callid check loop */ + do_gettimeofday(&conn->atime); + call->chan_ix = htonl(cix); + + spin_unlock(&conn->lock); + + down_write(&rxrpc_calls_sem); + list_add_tail(&call->call_link, &rxrpc_calls); + up_write(&rxrpc_calls_sem); + + __RXACCT(atomic_inc(&rxrpc_call_count)); + *_call = call; + + _leave(" = 0 (call=%p cix=%u)", call, cix); + return 0; + + error_unwait: + remove_wait_queue(&conn->chanwait, &myself); + set_current_state(TASK_RUNNING); + spin_unlock(&conn->lock); + + free_page((unsigned long) call); + _leave(" = %d", ret); + return ret; +} /* end rxrpc_create_call() */ + +/*****************************************************************************/ +/* + * create a new call record for incoming calls + */ +int rxrpc_incoming_call(struct rxrpc_connection *conn, + struct rxrpc_message *msg, + struct rxrpc_call **_call) +{ + struct rxrpc_call *call; + unsigned cix; + int ret; + + cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK; + + _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix); + + /* allocate and initialise a call record */ + ret = __rxrpc_create_call(conn, &call); + if (ret < 0) { + _leave(" = %d", ret); + return ret; + } + + call->pkt_rcv_count = 1; + call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID; + call->app_mark = sizeof(uint32_t); + + _state(call); + + /* attach to the connection */ + ret = -EBUSY; + call->chan_ix = htonl(cix); + call->call_id = msg->hdr.callNumber; + + spin_lock(&conn->lock); + + if (!conn->channels[cix] || + conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE || + conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR + ) { + conn->channels[cix] = call; + rxrpc_get_connection(conn); + ret = 0; + } + + spin_unlock(&conn->lock); + + if (ret < 0) { + free_page((unsigned long) call); + call = NULL; + } + + if (ret == 0) { + down_write(&rxrpc_calls_sem); + list_add_tail(&call->call_link, &rxrpc_calls); + up_write(&rxrpc_calls_sem); + __RXACCT(atomic_inc(&rxrpc_call_count)); + *_call = call; + } + + _leave(" = %d [%p]", ret, call); + return ret; +} /* end rxrpc_incoming_call() */ + +/*****************************************************************************/ +/* + * free a call record + */ +void rxrpc_put_call(struct rxrpc_call *call) +{ + struct rxrpc_connection *conn = call->conn; + struct rxrpc_message *msg; + + _enter("%p{u=%d}",call,atomic_read(&call->usage)); + + /* sanity check */ + if (atomic_read(&call->usage) <= 0) + BUG(); + + /* to prevent a race, the decrement and the de-list must be effectively + * atomic */ + spin_lock(&conn->lock); + if (likely(!atomic_dec_and_test(&call->usage))) { + spin_unlock(&conn->lock); + _leave(""); + return; + } + + if (conn->channels[ntohl(call->chan_ix)] == call) + conn->channels[ntohl(call->chan_ix)] = NULL; + + spin_unlock(&conn->lock); + + wake_up(&conn->chanwait); + + rxrpc_put_connection(conn); + + /* clear the timers and dequeue from krxiod */ + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + + rxrpc_krxiod_dequeue_call(call); + + /* clean up the contents of the struct */ + if (call->snd_nextmsg) + rxrpc_put_message(call->snd_nextmsg); + + if (call->snd_ping) + rxrpc_put_message(call->snd_ping); + + while (!list_empty(&call->acks_pendq)) { + msg = list_entry(call->acks_pendq.next, + struct rxrpc_message, link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->rcv_receiveq)) { + msg = list_entry(call->rcv_receiveq.next, + struct rxrpc_message, link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->app_readyq)) { + msg = list_entry(call->app_readyq.next, + struct rxrpc_message, link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->app_unreadyq)) { + msg = list_entry(call->app_unreadyq.next, + struct rxrpc_message, link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + module_put(call->owner); + + down_write(&rxrpc_calls_sem); + list_del(&call->call_link); + up_write(&rxrpc_calls_sem); + + __RXACCT(atomic_dec(&rxrpc_call_count)); + free_page((unsigned long) call); + + _leave(" [destroyed]"); +} /* end rxrpc_put_call() */ + +/*****************************************************************************/ +/* + * actually generate a normal ACK + */ +static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call, + rxrpc_seq_t seq) +{ + struct rxrpc_message *msg; + struct kvec diov[3]; + __be32 aux[4]; + int delta, ret; + + /* ACKs default to DELAY */ + if (!call->ackr.reason) + call->ackr.reason = RXRPC_ACK_DELAY; + + _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", + jiffies - call->cjif, + ntohs(call->ackr.maxSkew), + ntohl(call->ackr.firstPacket), + ntohl(call->ackr.previousPacket), + ntohl(call->ackr.serial), + rxrpc_acks[call->ackr.reason], + call->ackr.nAcks); + + aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */ + aux[1] = htonl(1444); /* max MTU */ + aux[2] = htonl(16); /* rwind */ + aux[3] = htonl(4); /* max packets */ + + diov[0].iov_len = sizeof(struct rxrpc_ackpacket); + diov[0].iov_base = &call->ackr; + diov[1].iov_len = call->ackr_pend_cnt + 3; + diov[1].iov_base = call->ackr_array; + diov[2].iov_len = sizeof(aux); + diov[2].iov_base = &aux; + + /* build and send the message */ + ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, + 3, diov, GFP_KERNEL, &msg); + if (ret < 0) + goto out; + + msg->seq = seq; + msg->hdr.seq = htonl(seq); + msg->hdr.flags |= RXRPC_SLOW_START_OK; + + ret = rxrpc_conn_sendmsg(call->conn, msg); + rxrpc_put_message(msg); + if (ret < 0) + goto out; + call->pkt_snd_count++; + + /* count how many actual ACKs there were at the front */ + for (delta = 0; delta < call->ackr_pend_cnt; delta++) + if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK) + break; + + call->ackr_pend_cnt -= delta; /* all ACK'd to this point */ + + /* crank the ACK window around */ + if (delta == 0) { + /* un-ACK'd window */ + } + else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) { + /* partially ACK'd window + * - shuffle down to avoid losing out-of-sequence packets + */ + call->ackr_win_bot += delta; + call->ackr_win_top += delta; + + memmove(&call->ackr_array[0], + &call->ackr_array[delta], + call->ackr_pend_cnt); + + memset(&call->ackr_array[call->ackr_pend_cnt], + RXRPC_ACK_TYPE_NACK, + sizeof(call->ackr_array) - call->ackr_pend_cnt); + } + else { + /* fully ACK'd window + * - just clear the whole thing + */ + memset(&call->ackr_array, + RXRPC_ACK_TYPE_NACK, + sizeof(call->ackr_array)); + } + + /* clear this ACK */ + memset(&call->ackr, 0, sizeof(call->ackr)); + + out: + if (!call->app_call_state) + printk("___ STATE 0 ___\n"); + return ret; +} /* end __rxrpc_call_gen_normal_ACK() */ + +/*****************************************************************************/ +/* + * note the reception of a packet in the call's ACK records and generate an + * appropriate ACK packet if necessary + * - returns 0 if packet should be processed, 1 if packet should be ignored + * and -ve on an error + */ +static int rxrpc_call_generate_ACK(struct rxrpc_call *call, + struct rxrpc_header *hdr, + struct rxrpc_ackpacket *ack) +{ + struct rxrpc_message *msg; + rxrpc_seq_t seq; + unsigned offset; + int ret = 0, err; + u8 special_ACK, do_ACK, force; + + _enter("%p,%p { seq=%d tp=%d fl=%02x }", + call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags); + + seq = ntohl(hdr->seq); + offset = seq - call->ackr_win_bot; + do_ACK = RXRPC_ACK_DELAY; + special_ACK = 0; + force = (seq == 1); + + if (call->ackr_high_seq < seq) + call->ackr_high_seq = seq; + + /* deal with generation of obvious special ACKs first */ + if (ack && ack->reason == RXRPC_ACK_PING) { + special_ACK = RXRPC_ACK_PING_RESPONSE; + ret = 1; + goto gen_ACK; + } + + if (seq < call->ackr_win_bot) { + special_ACK = RXRPC_ACK_DUPLICATE; + ret = 1; + goto gen_ACK; + } + + if (seq >= call->ackr_win_top) { + special_ACK = RXRPC_ACK_EXCEEDS_WINDOW; + ret = 1; + goto gen_ACK; + } + + if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) { + special_ACK = RXRPC_ACK_DUPLICATE; + ret = 1; + goto gen_ACK; + } + + /* okay... it's a normal data packet inside the ACK window */ + call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK; + + if (offset < call->ackr_pend_cnt) { + } + else if (offset > call->ackr_pend_cnt) { + do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE; + call->ackr_pend_cnt = offset; + goto gen_ACK; + } + + if (hdr->flags & RXRPC_REQUEST_ACK) { + do_ACK = RXRPC_ACK_REQUESTED; + } + + /* generate an ACK on the final packet of a reply just received */ + if (hdr->flags & RXRPC_LAST_PACKET) { + if (call->conn->out_clientflag) + force = 1; + } + else if (!(hdr->flags & RXRPC_MORE_PACKETS)) { + do_ACK = RXRPC_ACK_REQUESTED; + } + + /* re-ACK packets previously received out-of-order */ + for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++) + if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK) + break; + + call->ackr_pend_cnt = offset; + + /* generate an ACK if we fill up the window */ + if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE) + force = 1; + + gen_ACK: + _debug("%05lu ACKs pend=%u norm=%s special=%s%s", + jiffies - call->cjif, + call->ackr_pend_cnt, + rxrpc_acks[do_ACK], + rxrpc_acks[special_ACK], + force ? " immediate" : + do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" : + hdr->flags & RXRPC_LAST_PACKET ? " finalise" : + " defer" + ); + + /* send any pending normal ACKs if need be */ + if (call->ackr_pend_cnt > 0) { + /* fill out the appropriate form */ + call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); + call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq, + 65535U)); + call->ackr.firstPacket = htonl(call->ackr_win_bot); + call->ackr.previousPacket = call->ackr_prev_seq; + call->ackr.serial = hdr->serial; + call->ackr.nAcks = call->ackr_pend_cnt; + + if (do_ACK == RXRPC_ACK_REQUESTED) + call->ackr.reason = do_ACK; + + /* generate the ACK immediately if necessary */ + if (special_ACK || force) { + err = __rxrpc_call_gen_normal_ACK( + call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq); + if (err < 0) { + ret = err; + goto out; + } + } + } + + if (call->ackr.reason == RXRPC_ACK_REQUESTED) + call->ackr_dfr_seq = seq; + + /* start the ACK timer if not running if there are any pending deferred + * ACKs */ + if (call->ackr_pend_cnt > 0 && + call->ackr.reason != RXRPC_ACK_REQUESTED && + !timer_pending(&call->ackr_dfr_timo) + ) { + unsigned long timo; + + timo = rxrpc_call_dfr_ack_timeout + jiffies; + + _debug("START ACKR TIMER for cj=%lu", timo - call->cjif); + + spin_lock(&call->lock); + mod_timer(&call->ackr_dfr_timo, timo); + spin_unlock(&call->lock); + } + else if ((call->ackr_pend_cnt == 0 || + call->ackr.reason == RXRPC_ACK_REQUESTED) && + timer_pending(&call->ackr_dfr_timo) + ) { + /* stop timer if no pending ACKs */ + _debug("CLEAR ACKR TIMER"); + del_timer_sync(&call->ackr_dfr_timo); + } + + /* send a special ACK if one is required */ + if (special_ACK) { + struct rxrpc_ackpacket ack; + struct kvec diov[2]; + uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK }; + + /* fill out the appropriate form */ + ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); + ack.maxSkew = htons(min(call->ackr_high_seq - seq, + 65535U)); + ack.firstPacket = htonl(call->ackr_win_bot); + ack.previousPacket = call->ackr_prev_seq; + ack.serial = hdr->serial; + ack.reason = special_ACK; + ack.nAcks = 0; + + _proto("Rx Sending s-ACK" + " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", + ntohs(ack.maxSkew), + ntohl(ack.firstPacket), + ntohl(ack.previousPacket), + ntohl(ack.serial), + rxrpc_acks[ack.reason], + ack.nAcks); + + diov[0].iov_len = sizeof(struct rxrpc_ackpacket); + diov[0].iov_base = &ack; + diov[1].iov_len = sizeof(acks); + diov[1].iov_base = acks; + + /* build and send the message */ + err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, + hdr->seq ? 2 : 1, diov, + GFP_KERNEL, + &msg); + if (err < 0) { + ret = err; + goto out; + } + + msg->seq = seq; + msg->hdr.seq = htonl(seq); + msg->hdr.flags |= RXRPC_SLOW_START_OK; + + err = rxrpc_conn_sendmsg(call->conn, msg); + rxrpc_put_message(msg); + if (err < 0) { + ret = err; + goto out; + } + call->pkt_snd_count++; + } + + out: + if (hdr->seq) + call->ackr_prev_seq = hdr->seq; + + _leave(" = %d", ret); + return ret; +} /* end rxrpc_call_generate_ACK() */ + +/*****************************************************************************/ +/* + * handle work to be done on a call + * - includes packet reception and timeout processing + */ +void rxrpc_call_do_stuff(struct rxrpc_call *call) +{ + _enter("%p{flags=%lx}", call, call->flags); + + /* handle packet reception */ + if (call->flags & RXRPC_CALL_RCV_PKT) { + _debug("- receive packet"); + call->flags &= ~RXRPC_CALL_RCV_PKT; + rxrpc_call_receive_packet(call); + } + + /* handle overdue ACKs */ + if (call->flags & RXRPC_CALL_ACKS_TIMO) { + _debug("- overdue ACK timeout"); + call->flags &= ~RXRPC_CALL_ACKS_TIMO; + rxrpc_call_resend(call, call->snd_seq_count); + } + + /* handle lack of reception */ + if (call->flags & RXRPC_CALL_RCV_TIMO) { + _debug("- reception timeout"); + call->flags &= ~RXRPC_CALL_RCV_TIMO; + rxrpc_call_abort(call, -EIO); + } + + /* handle deferred ACKs */ + if (call->flags & RXRPC_CALL_ACKR_TIMO || + (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED) + ) { + _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u", + jiffies - call->cjif, + rxrpc_acks[call->ackr.reason], + call->ackr.nAcks); + + call->flags &= ~RXRPC_CALL_ACKR_TIMO; + + if (call->ackr.nAcks > 0 && + call->app_call_state != RXRPC_CSTATE_ERROR) { + /* generate ACK */ + __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq); + call->ackr_dfr_seq = 0; + } + } + + _leave(""); + +} /* end rxrpc_call_do_stuff() */ + +/*****************************************************************************/ +/* + * send an abort message at call or connection level + * - must be called with call->lock held + * - the supplied error code is sent as the packet data + */ +static int __rxrpc_call_abort(struct rxrpc_call *call, int errno) +{ + struct rxrpc_connection *conn = call->conn; + struct rxrpc_message *msg; + struct kvec diov[1]; + int ret; + __be32 _error; + + _enter("%p{%08x},%p{%d},%d", + conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno); + + /* if this call is already aborted, then just wake up any waiters */ + if (call->app_call_state == RXRPC_CSTATE_ERROR) { + spin_unlock(&call->lock); + call->app_error_func(call); + _leave(" = 0"); + return 0; + } + + rxrpc_get_call(call); + + /* change the state _with_ the lock still held */ + call->app_call_state = RXRPC_CSTATE_ERROR; + call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT; + call->app_errno = errno; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + call->app_async_read = 0; + + _state(call); + + /* ask the app to translate the error code */ + call->app_aemap_func(call); + + spin_unlock(&call->lock); + + /* flush any outstanding ACKs */ + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + + if (rxrpc_call_is_ack_pending(call)) + __rxrpc_call_gen_normal_ACK(call, 0); + + /* send the abort packet only if we actually traded some other + * packets */ + ret = 0; + if (call->pkt_snd_count || call->pkt_rcv_count) { + /* actually send the abort */ + _proto("Rx Sending Call ABORT { data=%d }", + call->app_abort_code); + + _error = htonl(call->app_abort_code); + + diov[0].iov_len = sizeof(_error); + diov[0].iov_base = &_error; + + ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT, + 1, diov, GFP_KERNEL, &msg); + if (ret == 0) { + ret = rxrpc_conn_sendmsg(conn, msg); + rxrpc_put_message(msg); + } + } + + /* tell the app layer to let go */ + call->app_error_func(call); + + rxrpc_put_call(call); + + _leave(" = %d", ret); + return ret; +} /* end __rxrpc_call_abort() */ + +/*****************************************************************************/ +/* + * send an abort message at call or connection level + * - the supplied error code is sent as the packet data + */ +int rxrpc_call_abort(struct rxrpc_call *call, int error) +{ + spin_lock(&call->lock); + + return __rxrpc_call_abort(call, error); + +} /* end rxrpc_call_abort() */ + +/*****************************************************************************/ +/* + * process packets waiting for this call + */ +static void rxrpc_call_receive_packet(struct rxrpc_call *call) +{ + struct rxrpc_message *msg; + struct list_head *_p; + + _enter("%p", call); + + rxrpc_get_call(call); /* must not go away too soon if aborted by + * app-layer */ + + while (!list_empty(&call->rcv_receiveq)) { + /* try to get next packet */ + _p = NULL; + spin_lock(&call->lock); + if (!list_empty(&call->rcv_receiveq)) { + _p = call->rcv_receiveq.next; + list_del_init(_p); + } + spin_unlock(&call->lock); + + if (!_p) + break; + + msg = list_entry(_p, struct rxrpc_message, link); + + _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)", + jiffies - call->cjif, + rxrpc_pkts[msg->hdr.type], + ntohl(msg->hdr.serial), + msg->seq, + msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-', + msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-', + msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-', + msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-', + msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S' + ); + + switch (msg->hdr.type) { + /* deal with data packets */ + case RXRPC_PACKET_TYPE_DATA: + /* ACK the packet if necessary */ + switch (rxrpc_call_generate_ACK(call, &msg->hdr, + NULL)) { + case 0: /* useful packet */ + rxrpc_call_receive_data_packet(call, msg); + break; + case 1: /* duplicate or out-of-window packet */ + break; + default: + rxrpc_put_message(msg); + goto out; + } + break; + + /* deal with ACK packets */ + case RXRPC_PACKET_TYPE_ACK: + rxrpc_call_receive_ack_packet(call, msg); + break; + + /* deal with abort packets */ + case RXRPC_PACKET_TYPE_ABORT: { + __be32 _dbuf, *dp; + + dp = skb_header_pointer(msg->pkt, msg->offset, + sizeof(_dbuf), &_dbuf); + if (dp == NULL) + printk("Rx Received short ABORT packet\n"); + + _proto("Rx Received Call ABORT { data=%d }", + (dp ? ntohl(*dp) : 0)); + + spin_lock(&call->lock); + call->app_call_state = RXRPC_CSTATE_ERROR; + call->app_err_state = RXRPC_ESTATE_PEER_ABORT; + call->app_abort_code = (dp ? ntohl(*dp) : 0); + call->app_errno = -ECONNABORTED; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + call->app_async_read = 0; + + /* ask the app to translate the error code */ + call->app_aemap_func(call); + _state(call); + spin_unlock(&call->lock); + call->app_error_func(call); + break; + } + default: + /* deal with other packet types */ + _proto("Rx Unsupported packet type %u (#%u)", + msg->hdr.type, msg->seq); + break; + } + + rxrpc_put_message(msg); + } + + out: + rxrpc_put_call(call); + _leave(""); +} /* end rxrpc_call_receive_packet() */ + +/*****************************************************************************/ +/* + * process next data packet + * - as the next data packet arrives: + * - it is queued on app_readyq _if_ it is the next one expected + * (app_ready_seq+1) + * - it is queued on app_unreadyq _if_ it is not the next one expected + * - if a packet placed on app_readyq completely fills a hole leading up to + * the first packet on app_unreadyq, then packets now in sequence are + * tranferred to app_readyq + * - the application layer can only see packets on app_readyq + * (app_ready_qty bytes) + * - the application layer is prodded every time a new packet arrives + */ +static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, + struct rxrpc_message *msg) +{ + const struct rxrpc_operation *optbl, *op; + struct rxrpc_message *pmsg; + struct list_head *_p; + int ret, lo, hi, rmtimo; + __be32 opid; + + _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); + + rxrpc_get_message(msg); + + /* add to the unready queue if we'd have to create a hole in the ready + * queue otherwise */ + if (msg->seq != call->app_ready_seq + 1) { + _debug("Call add packet %d to unreadyq", msg->seq); + + /* insert in seq order */ + list_for_each(_p, &call->app_unreadyq) { + pmsg = list_entry(_p, struct rxrpc_message, link); + if (pmsg->seq > msg->seq) + break; + } + + list_add_tail(&msg->link, _p); + + _leave(" [unreadyq]"); + return; + } + + /* next in sequence - simply append into the call's ready queue */ + _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)", + msg->seq, msg->dsize, call->app_ready_qty); + + spin_lock(&call->lock); + call->app_ready_seq = msg->seq; + call->app_ready_qty += msg->dsize; + list_add_tail(&msg->link, &call->app_readyq); + + /* move unready packets to the readyq if we got rid of a hole */ + while (!list_empty(&call->app_unreadyq)) { + pmsg = list_entry(call->app_unreadyq.next, + struct rxrpc_message, link); + + if (pmsg->seq != call->app_ready_seq + 1) + break; + + /* next in sequence - just move list-to-list */ + _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)", + pmsg->seq, pmsg->dsize, call->app_ready_qty); + + call->app_ready_seq = pmsg->seq; + call->app_ready_qty += pmsg->dsize; + list_del_init(&pmsg->link); + list_add_tail(&pmsg->link, &call->app_readyq); + } + + /* see if we've got the last packet yet */ + if (!list_empty(&call->app_readyq)) { + pmsg = list_entry(call->app_readyq.prev, + struct rxrpc_message, link); + if (pmsg->hdr.flags & RXRPC_LAST_PACKET) { + call->app_last_rcv = 1; + _debug("Last packet on readyq"); + } + } + + switch (call->app_call_state) { + /* do nothing if call already aborted */ + case RXRPC_CSTATE_ERROR: + spin_unlock(&call->lock); + _leave(" [error]"); + return; + + /* extract the operation ID from an incoming call if that's not + * yet been done */ + case RXRPC_CSTATE_SRVR_RCV_OPID: + spin_unlock(&call->lock); + + /* handle as yet insufficient data for the operation ID */ + if (call->app_ready_qty < 4) { + if (call->app_last_rcv) + /* trouble - last packet seen */ + rxrpc_call_abort(call, -EINVAL); + + _leave(""); + return; + } + + /* pull the operation ID out of the buffer */ + ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0); + if (ret < 0) { + printk("Unexpected error from read-data: %d\n", ret); + if (call->app_call_state != RXRPC_CSTATE_ERROR) + rxrpc_call_abort(call, ret); + _leave(""); + return; + } + call->app_opcode = ntohl(opid); + + /* locate the operation in the available ops table */ + optbl = call->conn->service->ops_begin; + lo = 0; + hi = call->conn->service->ops_end - optbl; + + while (lo < hi) { + int mid = (hi + lo) / 2; + op = &optbl[mid]; + if (call->app_opcode == op->id) + goto found_op; + if (call->app_opcode > op->id) + lo = mid + 1; + else + hi = mid; + } + + /* search failed */ + kproto("Rx Client requested operation %d from %s service", + call->app_opcode, call->conn->service->name); + rxrpc_call_abort(call, -EINVAL); + _leave(" [inval]"); + return; + + found_op: + _proto("Rx Client requested operation %s from %s service", + op->name, call->conn->service->name); + + /* we're now waiting for the argument block (unless the call + * was aborted) */ + spin_lock(&call->lock); + if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID || + call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) { + if (!call->app_last_rcv) + call->app_call_state = + RXRPC_CSTATE_SRVR_RCV_ARGS; + else if (call->app_ready_qty > 0) |