/*
* Copyright (c) 2006 Oracle. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* OpenIB.org BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*/
#include <linux/kernel.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>
#include "rds.h"
/* When transmitting messages in rds_send_xmit, we need to emerge from
* time to time and briefly release the CPU. Otherwise the softlock watchdog
* will kick our shin.
* Also, it seems fairer to not let one busy connection stall all the
* others.
*
* send_batch_count is the number of times we'll loop in send_xmit. Setting
* it to 0 will restore the old behavior (where we looped until we had
* drained the queue).
*/
static int send_batch_count = 64;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
/*
* Reset the send state. Caller must hold c_send_lock when calling here.
*/
void rds_send_reset(struct rds_connection *conn)
{
struct rds_message *rm, *tmp;
unsigned long flags;
if (conn->c_xmit_rm) {
/* Tell the user the RDMA op is no longer mapped by the
* transport. This isn't entirely true (it's flushed out
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
rds_message_unmapped(conn->c_xmit_rm);
rds_message_put(conn->c_xmit_rm);
conn->c_xmit_rm = NULL;
}
conn->c_xmit_sg = 0;
conn->c_xmit_hdr_off = 0;
conn->c_xmit_data_off = 0;
conn->c_xmit_atomic_sent = 0;
conn->c_xmit_rdma_sent = 0;
conn->c_xmit_data_sent = 0;
conn->c_map_queued = 0;
conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
/* Mark messages as retransmissions, and move them to the send q */
spin_lock_irqsave(&conn->c_lock, flags);
list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
}
list_splice_init(&conn->c_retrans, &conn->c_send_queue);
spin_unlock_irqrestore(&conn->c_lock, flags);
}
/*
* We're making the concious trade-off here to only send one message
* down the connection at a time.
* Pro:
* - tx queueing is a simple fifo list
* - reassembly is optional and easily done by transports per conn
* - no per flow rx lookup at all, straight to the socket
* - less per-frag memory and wire overhead
* Con:
* - queued acks can be delayed behind large messages
* Depends:
* - small message latency is higher behind queued large messages
* - large message latency isn't starved by intervening small sends
*/
int rds_send_xmit(struct rds_connection *conn)
{
struct rds_message *rm;
unsigned long flags;
unsigned int tmp;
unsigned int send_quota = send_batch_count;
struct scatterlist *sg;
int ret = 0;
int was_empty = 0;
LIST_HEAD(to_be_dropped);
/*
* sendmsg calls here after having queued its message on the send
* queue. We only have one task feeding the connection at a time. If
* another thread is already feeding the queue then we back off. This
* avoids blocking the caller and trading per-connection data between
* caches per message.
*
* The sem holder will issue a retry if they notice that someone queued
* a message after they stopped walking the send queue but before they
* dropped the sem.
*/
if (!mutex_trylock(&conn->c_send_lock)) {
rds_stats_inc(s_send_sem_contention);
ret = -ENOMEM;
goto out;
}
if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn);
/*
* spin trying to push headers and data down the connection until
* the connection doesn't make forward progress.
*/
while (--send_quota) {
rm = conn->c_xmit_rm;
/*
* If between sending messages, we can send a pending congestion
* map update.
*/
if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
rm = rds_cong_update_alloc(conn);
if (IS_ERR