diff options
Diffstat (limited to 'net/smc')
-rw-r--r-- | net/smc/smc.h | 2 | ||||
-rw-r--r-- | net/smc/smc_cdc.c | 11 | ||||
-rw-r--r-- | net/smc/smc_tx.c | 107 |
3 files changed, 105 insertions, 15 deletions
diff --git a/net/smc/smc.h b/net/smc/smc.h index a096d8af21a0..e266b04b7585 100644 --- a/net/smc/smc.h +++ b/net/smc/smc.h @@ -29,6 +29,7 @@ #define SMC_MAX_ISM_DEVS 8 /* max # of proposed non-native ISM * devices */ +#define SMC_AUTOCORKING_DEFAULT_SIZE 0x10000 /* 64K by default */ extern struct proto smc_proto; extern struct proto smc_proto6; @@ -192,6 +193,7 @@ struct smc_connection { * - dec on polled tx cqe */ wait_queue_head_t cdc_pend_tx_wq; /* wakeup on no cdc_pend_tx_wr*/ + atomic_t tx_pushing; /* nr_threads trying tx push */ struct delayed_work tx_work; /* retry of smc_cdc_msg_send */ u32 tx_off; /* base offset in peer rmb */ diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c index 9d5a97168969..2b37bec90824 100644 --- a/net/smc/smc_cdc.c +++ b/net/smc/smc_cdc.c @@ -48,9 +48,14 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd, conn->tx_cdc_seq_fin = cdcpend->ctrl_seq; } - if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) && - unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq))) - wake_up(&conn->cdc_pend_tx_wq); + if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) { + /* If this is the last pending WR complete, we must push to + * prevent hang when autocork enabled. + */ + smc_tx_sndbuf_nonempty(conn); + if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq))) + wake_up(&conn->cdc_pend_tx_wq); + } WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0); smc_tx_sndbuf_nonfull(smc); diff --git a/net/smc/smc_tx.c b/net/smc/smc_tx.c index 436ac836f363..062c6b1535e3 100644 --- a/net/smc/smc_tx.c +++ b/net/smc/smc_tx.c @@ -131,6 +131,51 @@ static bool smc_tx_is_corked(struct smc_sock *smc) return (tp->nonagle & TCP_NAGLE_CORK) ? true : false; } +/* If we have pending CDC messages, do not send: + * Because CQE of this CDC message will happen shortly, it gives + * a chance to coalesce future sendmsg() payload in to one RDMA Write, + * without need for a timer, and with no latency trade off. + * Algorithm here: + * 1. First message should never cork + * 2. If we have pending Tx CDC messages, wait for the first CDC + * message's completion + * 3. Don't cork to much data in a single RDMA Write to prevent burst + * traffic, total corked message should not exceed sendbuf/2 + */ +static bool smc_should_autocork(struct smc_sock *smc) +{ + struct smc_connection *conn = &smc->conn; + int corking_size; + + corking_size = min(SMC_AUTOCORKING_DEFAULT_SIZE, + conn->sndbuf_desc->len >> 1); + + if (atomic_read(&conn->cdc_pend_tx_wr) == 0 || + smc_tx_prepared_sends(conn) > corking_size) + return false; + return true; +} + +static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg) +{ + struct smc_connection *conn = &smc->conn; + + if (smc_should_autocork(smc)) + return true; + + /* for a corked socket defer the RDMA writes if + * sndbuf_space is still available. The applications + * should known how/when to uncork it. + */ + if ((msg->msg_flags & MSG_MORE || + smc_tx_is_corked(smc) || + msg->msg_flags & MSG_SENDPAGE_NOTLAST) && + atomic_read(&conn->sndbuf_space)) + return true; + + return false; +} + /* sndbuf producer: main API called by socket layer. * called under sock lock. */ @@ -235,13 +280,10 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) */ if ((msg->msg_flags & MSG_OOB) && !send_remaining) conn->urg_tx_pend = true; - /* for a corked socket defer the RDMA writes if - * sndbuf_space is still available. The applications - * should known how/when to uncork it. + /* If we need to cork, do nothing and wait for the next + * sendmsg() call or push on tx completion */ - if (!((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc) || - msg->msg_flags & MSG_SENDPAGE_NOTLAST) && - atomic_read(&conn->sndbuf_space))) + if (!smc_tx_should_cork(smc, msg)) smc_tx_sndbuf_nonempty(conn); trace_smc_tx_sendmsg(smc, copylen); @@ -589,13 +631,26 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn) return rc; } -int smc_tx_sndbuf_nonempty(struct smc_connection *conn) +static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn) { - int rc; + struct smc_sock *smc = container_of(conn, struct smc_sock, conn); + int rc = 0; + + /* No data in the send queue */ + if (unlikely(smc_tx_prepared_sends(conn) <= 0)) + goto out; + + /* Peer don't have RMBE space */ + if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) { + SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk); + goto out; + } if (conn->killed || - conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) - return -EPIPE; /* connection being aborted */ + conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) { + rc = -EPIPE; /* connection being aborted */ + goto out; + } if (conn->lgr->is_smcd) rc = smcd_tx_sndbuf_nonempty(conn); else @@ -603,10 +658,38 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn) if (!rc) { /* trigger socket release if connection is closing */ - struct smc_sock *smc = container_of(conn, struct smc_sock, - conn); smc_close_wake_tx_prepared(smc); } + +out: + return rc; +} + +int smc_tx_sndbuf_nonempty(struct smc_connection *conn) +{ + int rc; + + /* This make sure only one can send simultaneously to prevent wasting + * of CPU and CDC slot. + * Record whether someone has tried to push while we are pushing. + */ + if (atomic_inc_return(&conn->tx_pushing) > 1) + return 0; + +again: + atomic_set(&conn->tx_pushing, 1); + smp_wmb(); /* Make sure tx_pushing is 1 before real send */ + rc = __smc_tx_sndbuf_nonempty(conn); + + /* We need to check whether someone else have added some data into + * the send queue and tried to push but failed after the atomic_set() + * when we are pushing. + * If so, we need to push again to prevent those data hang in the send + * queue. + */ + if (unlikely(!atomic_dec_and_test(&conn->tx_pushing))) + goto again; + return rc; } |