aboutsummaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/auth_gss/auth_gss.c94
-rw-r--r--net/sunrpc/clnt.c8
-rw-r--r--net/sunrpc/sched.c22
-rw-r--r--net/sunrpc/xdr.c55
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c8
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c154
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c32
-rw-r--r--net/sunrpc/xprtrdma/transport.c72
-rw-r--r--net/sunrpc/xprtrdma/verbs.c683
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h89
-rw-r--r--net/sunrpc/xprtsock.c2
11 files changed, 503 insertions, 716 deletions
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index ee060d57d216..25fbd8d9de74 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -20,6 +20,7 @@
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/auth.h>
#include <linux/sunrpc/auth_gss.h>
+#include <linux/sunrpc/gss_krb5.h>
#include <linux/sunrpc/svcauth_gss.h>
#include <linux/sunrpc/gss_err.h>
#include <linux/workqueue.h>
@@ -1050,7 +1051,7 @@ gss_create_new(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
goto err_put_mech;
auth = &gss_auth->rpc_auth;
auth->au_cslack = GSS_CRED_SLACK >> 2;
- auth->au_rslack = GSS_VERF_SLACK >> 2;
+ auth->au_rslack = GSS_KRB5_MAX_SLACK_NEEDED >> 2;
auth->au_verfsize = GSS_VERF_SLACK >> 2;
auth->au_ralign = GSS_VERF_SLACK >> 2;
auth->au_flags = 0;
@@ -1724,8 +1725,9 @@ bad_mic:
goto out;
}
-static int gss_wrap_req_integ(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
- struct rpc_task *task, struct xdr_stream *xdr)
+static noinline_for_stack int
+gss_wrap_req_integ(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
+ struct rpc_task *task, struct xdr_stream *xdr)
{
struct rpc_rqst *rqstp = task->tk_rqstp;
struct xdr_buf integ_buf, *snd_buf = &rqstp->rq_snd_buf;
@@ -1816,8 +1818,9 @@ out:
return -EAGAIN;
}
-static int gss_wrap_req_priv(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
- struct rpc_task *task, struct xdr_stream *xdr)
+static noinline_for_stack int
+gss_wrap_req_priv(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
+ struct rpc_task *task, struct xdr_stream *xdr)
{
struct rpc_rqst *rqstp = task->tk_rqstp;
struct xdr_buf *snd_buf = &rqstp->rq_snd_buf;
@@ -1934,35 +1937,69 @@ gss_unwrap_resp_auth(struct rpc_cred *cred)
return 0;
}
-static int
+/*
+ * RFC 2203, Section 5.3.2.2
+ *
+ * struct rpc_gss_integ_data {
+ * opaque databody_integ<>;
+ * opaque checksum<>;
+ * };
+ *
+ * struct rpc_gss_data_t {
+ * unsigned int seq_num;
+ * proc_req_arg_t arg;
+ * };
+ */
+static noinline_for_stack int
gss_unwrap_resp_integ(struct rpc_task *task, struct rpc_cred *cred,
struct gss_cl_ctx *ctx, struct rpc_rqst *rqstp,
struct xdr_stream *xdr)
{
- struct xdr_buf integ_buf, *rcv_buf = &rqstp->rq_rcv_buf;
- u32 data_offset, mic_offset, integ_len, maj_stat;
+ struct xdr_buf gss_data, *rcv_buf = &rqstp->rq_rcv_buf;
struct rpc_auth *auth = cred->cr_auth;
+ u32 len, offset, seqno, maj_stat;
struct xdr_netobj mic;
- __be32 *p;
+ int ret;
- p = xdr_inline_decode(xdr, 2 * sizeof(*p));
- if (unlikely(!p))
+ ret = -EIO;
+ mic.data = NULL;
+
+ /* opaque databody_integ<>; */
+ if (xdr_stream_decode_u32(xdr, &len))
goto unwrap_failed;
- integ_len = be32_to_cpup(p++);
- if (integ_len & 3)
+ if (len & 3)
goto unwrap_failed;
- data_offset = (u8 *)(p) - (u8 *)rcv_buf->head[0].iov_base;
- mic_offset = integ_len + data_offset;
- if (mic_offset > rcv_buf->len)
+ offset = rcv_buf->len - xdr_stream_remaining(xdr);
+ if (xdr_stream_decode_u32(xdr, &seqno))
goto unwrap_failed;
- if (be32_to_cpup(p) != rqstp->rq_seqno)
+ if (seqno != rqstp->rq_seqno)
goto bad_seqno;
+ if (xdr_buf_subsegment(rcv_buf, &gss_data, offset, len))
+ goto unwrap_failed;
- if (xdr_buf_subsegment(rcv_buf, &integ_buf, data_offset, integ_len))
+ /*
+ * The xdr_stream now points to the beginning of the
+ * upper layer payload, to be passed below to
+ * rpcauth_unwrap_resp_decode(). The checksum, which
+ * follows the upper layer payload in @rcv_buf, is
+ * located and parsed without updating the xdr_stream.
+ */
+
+ /* opaque checksum<>; */
+ offset += len;
+ if (xdr_decode_word(rcv_buf, offset, &len))
+ goto unwrap_failed;
+ offset += sizeof(__be32);
+ if (offset + len > rcv_buf->len)
goto unwrap_failed;
- if (xdr_buf_read_mic(rcv_buf, &mic, mic_offset))
+ mic.len = len;
+ mic.data = kmalloc(len, GFP_NOFS);
+ if (!mic.data)
+ goto unwrap_failed;
+ if (read_bytes_from_xdr_buf(rcv_buf, offset, mic.data, mic.len))
goto unwrap_failed;
- maj_stat = gss_verify_mic(ctx->gc_gss_ctx, &integ_buf, &mic);
+
+ maj_stat = gss_verify_mic(ctx->gc_gss_ctx, &gss_data, &mic);
if (maj_stat == GSS_S_CONTEXT_EXPIRED)
clear_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags);
if (maj_stat != GSS_S_COMPLETE)
@@ -1970,19 +2007,24 @@ gss_unwrap_resp_integ(struct rpc_task *task, struct rpc_cred *cred,
auth->au_rslack = auth->au_verfsize + 2 + 1 + XDR_QUADLEN(mic.len);
auth->au_ralign = auth->au_verfsize + 2;
- return 0;
+ ret = 0;
+
+out:
+ kfree(mic.data);
+ return ret;
+
unwrap_failed:
trace_rpcgss_unwrap_failed(task);
- return -EIO;
+ goto out;
bad_seqno:
- trace_rpcgss_bad_seqno(task, rqstp->rq_seqno, be32_to_cpup(p));
- return -EIO;
+ trace_rpcgss_bad_seqno(task, rqstp->rq_seqno, seqno);
+ goto out;
bad_mic:
trace_rpcgss_verify_mic(task, maj_stat);
- return -EIO;
+ goto out;
}
-static int
+static noinline_for_stack int
gss_unwrap_resp_priv(struct rpc_task *task, struct rpc_cred *cred,
struct gss_cl_ctx *ctx, struct rpc_rqst *rqstp,
struct xdr_stream *xdr)
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 07992d3e8703..325a0858700f 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1099,8 +1099,9 @@ rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
task->tk_msg.rpc_proc = msg->rpc_proc;
task->tk_msg.rpc_argp = msg->rpc_argp;
task->tk_msg.rpc_resp = msg->rpc_resp;
- if (msg->rpc_cred != NULL)
- task->tk_msg.rpc_cred = get_cred(msg->rpc_cred);
+ task->tk_msg.rpc_cred = msg->rpc_cred;
+ if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
+ get_cred(task->tk_msg.rpc_cred);
}
}
@@ -1126,6 +1127,9 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
task = rpc_new_task(task_setup_data);
+ if (!RPC_IS_ASYNC(task))
+ task->tk_flags |= RPC_TASK_CRED_NOREF;
+
rpc_task_set_client(task, task_setup_data->rpc_client);
rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 55e900255b0c..7eba20a88438 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -204,10 +204,6 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
struct rpc_task *task,
unsigned char queue_priority)
{
- WARN_ON_ONCE(RPC_IS_QUEUED(task));
- if (RPC_IS_QUEUED(task))
- return;
-
INIT_LIST_HEAD(&task->u.tk_wait.timer_list);
if (RPC_IS_PRIORITY(queue))
__rpc_add_wait_queue_priority(queue, task, queue_priority);
@@ -382,7 +378,7 @@ static void rpc_make_runnable(struct workqueue_struct *wq,
* NB: An RPC task will only receive interrupt-driven events as long
* as it's on a wait queue.
*/
-static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
+static void __rpc_do_sleep_on_priority(struct rpc_wait_queue *q,
struct rpc_task *task,
unsigned char queue_priority)
{
@@ -395,12 +391,23 @@ static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
}
+static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
+ struct rpc_task *task,
+ unsigned char queue_priority)
+{
+ if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
+ return;
+ __rpc_do_sleep_on_priority(q, task, queue_priority);
+}
+
static void __rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
struct rpc_task *task, unsigned long timeout,
unsigned char queue_priority)
{
+ if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
+ return;
if (time_is_after_jiffies(timeout)) {
- __rpc_sleep_on_priority(q, task, queue_priority);
+ __rpc_do_sleep_on_priority(q, task, queue_priority);
__rpc_add_timer(q, task, timeout);
} else
task->tk_status = -ETIMEDOUT;
@@ -1162,7 +1169,8 @@ static void rpc_release_resources_task(struct rpc_task *task)
{
xprt_release(task);
if (task->tk_msg.rpc_cred) {
- put_cred(task->tk_msg.rpc_cred);
+ if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
+ put_cred(task->tk_msg.rpc_cred);
task->tk_msg.rpc_cred = NULL;
}
rpc_task_release_client(task);
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index e5497dc2475b..15b58c5144f9 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -1235,61 +1235,6 @@ xdr_encode_word(struct xdr_buf *buf, unsigned int base, u32 obj)
}
EXPORT_SYMBOL_GPL(xdr_encode_word);
-/**
- * xdr_buf_read_mic() - obtain the address of the GSS mic from xdr buf
- * @buf: pointer to buffer containing a mic
- * @mic: on success, returns the address of the mic
- * @offset: the offset in buf where mic may be found
- *
- * This function may modify the xdr buf if the mic is found to be straddling
- * a boundary between head, pages, and tail. On success the mic can be read
- * from the address returned. There is no need to free the mic.
- *
- * Return: Success returns 0, otherwise an integer error.
- */
-int xdr_buf_read_mic(struct xdr_buf *buf, struct xdr_netobj *mic, unsigned int offset)
-{
- struct xdr_buf subbuf;
- unsigned int boundary;
-
- if (xdr_decode_word(buf, offset, &mic->len))
- return -EFAULT;
- offset += 4;
-
- /* Is the mic partially in the head? */
- boundary = buf->head[0].iov_len;
- if (offset < boundary && (offset + mic->len) > boundary)
- xdr_shift_buf(buf, boundary - offset);
-
- /* Is the mic partially in the pages? */
- boundary += buf->page_len;
- if (offset < boundary && (offset + mic->len) > boundary)
- xdr_shrink_pagelen(buf, boundary - offset);
-
- if (xdr_buf_subsegment(buf, &subbuf, offset, mic->len))
- return -EFAULT;
-
- /* Is the mic contained entirely in the head? */
- mic->data = subbuf.head[0].iov_base;
- if (subbuf.head[0].iov_len == mic->len)
- return 0;
- /* ..or is the mic contained entirely in the tail? */
- mic->data = subbuf.tail[0].iov_base;
- if (subbuf.tail[0].iov_len == mic->len)
- return 0;
-
- /* Find a contiguous area in @buf to hold all of @mic */
- if (mic->len > buf->buflen - buf->len)
- return -ENOMEM;
- if (buf->tail[0].iov_len != 0)
- mic->data = buf->tail[0].iov_base + buf->tail[0].iov_len;
- else
- mic->data = buf->head[0].iov_base + buf->head[0].iov_len;
- __read_bytes_from_xdr_buf(&subbuf, mic->data, mic->len);
- return 0;
-}
-EXPORT_SYMBOL_GPL(xdr_buf_read_mic);
-
/* Returns 0 on success, or else a negative error code. */
static int
xdr_xcode_array2(struct xdr_buf *buf, unsigned int base,
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 1a0ae0c61353..c92c1aac270a 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -44,10 +44,10 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
size_t maxmsg;
- maxmsg = min_t(unsigned int, ep->rep_inline_send, ep->rep_inline_recv);
+ maxmsg = min_t(unsigned int, ep->re_inline_send, ep->re_inline_recv);
maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
return maxmsg - RPCRDMA_HDRLEN_MIN;
}
@@ -115,7 +115,7 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
if (rc < 0)
goto failed_marshal;
- if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
+ if (rpcrdma_post_sends(r_xprt, req))
goto drop_connection;
return 0;
@@ -190,7 +190,7 @@ create_req:
if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
return NULL;
- size = min_t(size_t, r_xprt->rx_ep.rep_inline_recv, PAGE_SIZE);
+ size = min_t(size_t, r_xprt->rx_ep->re_inline_recv, PAGE_SIZE);
req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
if (!req)
return NULL;
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 125297c9aa3e..ef997880e17a 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -52,7 +52,7 @@
/**
* frwr_release_mr - Destroy one MR
- * @mr: MR allocated by frwr_init_mr
+ * @mr: MR allocated by frwr_mr_init
*
*/
void frwr_release_mr(struct rpcrdma_mr *mr)
@@ -74,7 +74,7 @@ static void frwr_mr_recycle(struct rpcrdma_mr *mr)
if (mr->mr_dir != DMA_NONE) {
trace_xprtrdma_mr_unmap(mr);
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
+ ib_dma_unmap_sg(r_xprt->rx_ep->re_id->device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
@@ -106,21 +106,22 @@ void frwr_reset(struct rpcrdma_req *req)
}
/**
- * frwr_init_mr - Initialize one MR
- * @ia: interface adapter
+ * frwr_mr_init - Initialize one MR
+ * @r_xprt: controlling transport instance
* @mr: generic MR to prepare for FRWR
*
* Returns zero if successful. Otherwise a negative errno
* is returned.
*/
-int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
{
- unsigned int depth = ia->ri_max_frwr_depth;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
+ unsigned int depth = ep->re_max_fr_depth;
struct scatterlist *sg;
struct ib_mr *frmr;
int rc;
- frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
+ frmr = ib_alloc_mr(ep->re_pd, ep->re_mrtype, depth);
if (IS_ERR(frmr))
goto out_mr_err;
@@ -128,6 +129,7 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
if (!sg)
goto out_list_err;
+ mr->mr_xprt = r_xprt;
mr->frwr.fr_mr = frmr;
mr->mr_dir = DMA_NONE;
INIT_LIST_HEAD(&mr->mr_list);
@@ -149,29 +151,24 @@ out_list_err:
/**
* frwr_query_device - Prepare a transport for use with FRWR
- * @r_xprt: controlling transport instance
+ * @ep: endpoint to fill in
* @device: RDMA device to query
*
* On success, sets:
- * ep->rep_attr
- * ep->rep_max_requests
- * ia->ri_max_rdma_segs
- *
- * And these FRWR-related fields:
- * ia->ri_max_frwr_depth
- * ia->ri_mrtype
+ * ep->re_attr
+ * ep->re_max_requests
+ * ep->re_max_rdma_segs
+ * ep->re_max_fr_depth
+ * ep->re_mrtype
*
* Return values:
* On success, returns zero.
* %-EINVAL - the device does not support FRWR memory registration
* %-ENOMEM - the device is not sufficiently capable for NFS/RDMA
*/
-int frwr_query_device(struct rpcrdma_xprt *r_xprt,
- const struct ib_device *device)
+int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device)
{
const struct ib_device_attr *attrs = &device->attrs;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
int max_qp_wr, depth, delta;
unsigned int max_sge;
@@ -188,23 +185,23 @@ int frwr_query_device(struct rpcrdma_xprt *r_xprt,
pr_err("rpcrdma: HCA provides only %u send SGEs\n", max_sge);
return -ENOMEM;
}
- ep->rep_attr.cap.max_send_sge = max_sge;
- ep->rep_attr.cap.max_recv_sge = 1;
+ ep->re_attr.cap.max_send_sge = max_sge;
+ ep->re_attr.cap.max_recv_sge = 1;
- ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
+ ep->re_mrtype = IB_MR_TYPE_MEM_REG;
if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
- ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
+ ep->re_mrtype = IB_MR_TYPE_SG_GAPS;
/* Quirk: Some devices advertise a large max_fast_reg_page_list_len
* capability, but perform optimally when the MRs are not larger
* than a page.
*/
if (attrs->max_sge_rd > RPCRDMA_MAX_HDR_SEGS)
- ia->ri_max_frwr_depth = attrs->max_sge_rd;
+ ep->re_max_fr_depth = attrs->max_sge_rd;
else
- ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
- if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
- ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
+ ep->re_max_fr_depth = attrs->max_fast_reg_page_list_len;
+ if (ep->re_max_fr_depth > RPCRDMA_MAX_DATA_SEGS)
+ ep->re_max_fr_depth = RPCRDMA_MAX_DATA_SEGS;
/* Add room for frwr register and invalidate WRs.
* 1. FRWR reg WR for head
@@ -220,11 +217,11 @@ int frwr_query_device(struct rpcrdma_xprt *r_xprt,
/* Calculate N if the device max FRWR depth is smaller than
* RPCRDMA_MAX_DATA_SEGS.
*/
- if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
- delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
+ if (ep->re_max_fr_depth < RPCRDMA_MAX_DATA_SEGS) {
+ delta = RPCRDMA_MAX_DATA_SEGS - ep->re_max_fr_depth;
do {
depth += 2; /* FRWR reg + invalidate */
- delta -= ia->ri_max_frwr_depth;
+ delta -= ep->re_max_fr_depth;
} while (delta > 0);
}
@@ -233,34 +230,34 @@ int frwr_query_device(struct rpcrdma_xprt *r_xprt,
max_qp_wr -= 1;
if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
return -ENOMEM;
- if (ep->rep_max_requests > max_qp_wr)
- ep->rep_max_requests = max_qp_wr;
- ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
- if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
- ep->rep_max_requests = max_qp_wr / depth;
- if (!ep->rep_max_requests)
+ if (ep->re_max_requests > max_qp_wr)
+ ep->re_max_requests = max_qp_wr;
+ ep->re_attr.cap.max_send_wr = ep->re_max_requests * depth;
+ if (ep->re_attr.cap.max_send_wr > max_qp_wr) {
+ ep->re_max_requests = max_qp_wr / depth;
+ if (!ep->re_max_requests)
return -ENOMEM;
- ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
+ ep->re_attr.cap.max_send_wr = ep->re_max_requests * depth;
}
- ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
- ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
- ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
- ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
- ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
-
- ia->ri_max_rdma_segs =
- DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
+ ep->re_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
+ ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
+ ep->re_attr.cap.max_recv_wr = ep->re_max_requests;
+ ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
+ ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
+
+ ep->re_max_rdma_segs =
+ DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ep->re_max_fr_depth);
/* Reply chunks require segments for head and tail buffers */
- ia->ri_max_rdma_segs += 2;
- if (ia->ri_max_rdma_segs > RPCRDMA_MAX_HDR_SEGS)
- ia->ri_max_rdma_segs = RPCRDMA_MAX_HDR_SEGS;
+ ep->re_max_rdma_segs += 2;
+ if (ep->re_max_rdma_segs > RPCRDMA_MAX_HDR_SEGS)
+ ep->re_max_rdma_segs = RPCRDMA_MAX_HDR_SEGS;
/* Ensure the underlying device is capable of conveying the
* largest r/wsize NFS will ask for. This guarantees that
* failing over from one RDMA device to another will not
* break NFS I/O.
*/
- if ((ia->ri_max_rdma_segs * ia->ri_max_frwr_depth) < RPCRDMA_MAX_SEGS)
+ if ((ep->re_max_rdma_segs * ep->re_max_fr_depth) < RPCRDMA_MAX_SEGS)
return -ENOMEM;
return 0;
@@ -286,14 +283,14 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
int nsegs, bool writing, __be32 xid,
struct rpcrdma_mr *mr)
{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
struct ib_reg_wr *reg_wr;
int i, n, dma_nents;
struct ib_mr *ibmr;
u8 key;
- if (nsegs > ia->ri_max_frwr_depth)
- nsegs = ia->ri_max_frwr_depth;
+ if (nsegs > ep->re_max_fr_depth)
+ nsegs = ep->re_max_fr_depth;
for (i = 0; i < nsegs;) {
if (seg->mr_page)
sg_set_page(&mr->mr_sg[i],
@@ -306,7 +303,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
++seg;
++i;
- if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
+ if (ep->re_mrtype == IB_MR_TYPE_SG_GAPS)
continue;
if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
@@ -315,7 +312,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
mr->mr_dir = rpcrdma_data_dir(writing);
mr->mr_nents = i;
- dma_nents = ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, mr->mr_nents,
+ dma_nents = ib_dma_map_sg(ep->re_id->device, mr->mr_sg, mr->mr_nents,
mr->mr_dir);
if (!dma_nents)
goto out_dmamap_err;
@@ -356,8 +353,8 @@ out_mapmr_err:
/**
* frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * @cq: completion queue
+ * @wc: WCE for a completed FastReg WR
*
*/
static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
@@ -369,20 +366,25 @@ static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_fastreg(wc, frwr);
/* The MR will get recycled when the associated req is retransmitted */
+
+ rpcrdma_flush_disconnect(cq, wc);
}
/**
- * frwr_send - post Send WR containing the RPC Call message
- * @ia: interface adapter
- * @req: Prepared RPC Call
+ * frwr_send - post Send WRs containing the RPC Call message
+ * @r_xprt: controlling transport instance
+ * @req: prepared RPC Call
*
* For FRWR, chain any FastReg WRs to the Send WR. Only a
* single ib_post_send call is needed to register memory
* and then post the Send WR.
*
- * Returns the result of ib_post_send.
+ * Returns the return code from ib_post_send.
+ *
+ * Caller must hold the transport send lock to ensure that the
+ * pointers to the transport's rdma_cm_id and QP are stable.
*/
-int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct ib_send_wr *post_wr;
struct rpcrdma_mr *mr;
@@ -403,7 +405,7 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
post_wr = &frwr->fr_regwr.wr;
}
- return ib_post_send(ia->ri_id->qp, post_wr, NULL);
+ return ib_post_send(r_xprt->rx_ep->re_id->qp, post_wr, NULL);
}
/**
@@ -419,7 +421,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
list_for_each_entry(mr, mrs, mr_list)
if (mr->mr_handle == rep->rr_inv_rkey) {
list_del_init(&mr->mr_list);
- trace_xprtrdma_mr_remoteinv(mr);
+ trace_xprtrdma_mr_reminv(mr);
rpcrdma_mr_put(mr);
break; /* only one invalidated MR per RPC */
}
@@ -435,8 +437,8 @@ static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
/**
* frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * @cq: completion queue
+ * @wc: WCE for a completed LocalInv WR
*
*/
static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
@@ -449,12 +451,14 @@ static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_li(wc, frwr);
__frwr_release_mr(wc, mr);
+
+ rpcrdma_flush_disconnect(cq, wc);
}
/**
* frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * @cq: completion queue
+ * @wc: WCE for a completed LocalInv WR
*
* Awaken anyone waiting for an MR to finish being fenced.
*/
@@ -469,6 +473,8 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
trace_xprtrdma_wc_li_wake(wc, frwr);
__frwr_release_mr(wc, mr);
complete(&frwr->fr_linv_done);
+
+ rpcrdma_flush_disconnect(cq, wc);
}
/**
@@ -526,10 +532,10 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* Transport disconnect drains the receive CQ before it
* replaces the QP. The RPC reply handler won't call us
- * unless ri_id->qp is a valid pointer.
+ * unless re_id->qp is a valid pointer.
*/
bad_wr = NULL;
- rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+ rc = ib_post_send(r_xprt->rx_ep->re_id->qp, first, &bad_wr);
/* The final LOCAL_INV WR in the chain is supposed to
* do the wake. If it was never posted, the wake will
@@ -556,8 +562,8 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/**
* frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * @cq: completion queue
+ * @wc: WCE for a completed LocalInv WR
*
*/
static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
@@ -575,6 +581,8 @@ static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
/* Ensure @rep is generated before __frwr_release_mr */
smp_rmb();
rpcrdma_complete_rqst(rep);
+
+ rpcrdma_flush_disconnect(cq, wc);
}
/**
@@ -629,10 +637,10 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* Transport disconnect drains the receive CQ before it
* replaces the QP. The RPC reply handler won't call us
- * unless ri_id->qp is a valid pointer.
+ * unless re_id->qp is a valid pointer.
*/
bad_wr = NULL;
- rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+ rc = ib_post_send(r_xprt->rx_ep->re_id->qp, first, &bad_wr);
if (!rc)
return;
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 577513b7642e..4a81e6995d3e 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -103,21 +103,20 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
/**
* rpcrdma_set_max_header_sizes - Initialize inline payload sizes
- * @r_xprt: transport instance to initialize
+ * @ep: endpoint to initialize
*
* The max_inline fields contain the maximum size of an RPC message
* so the marshaling code doesn't have to repeat this calculation
* for every RPC.
*/
-void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
{
- unsigned int maxsegs = r_xprt->rx_ia.ri_max_rdma_segs;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ unsigned int maxsegs = ep->re_max_rdma_segs;
- ep->rep_max_inline_send =
- ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs);
- ep->rep_max_inline_recv =
- ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
+ ep->re_max_inline_send =
+ ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
+ ep->re_max_inline_recv =
+ ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}
/* The client can send a request inline as long as the RPCRDMA header
@@ -132,9 +131,10 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
struct xdr_buf *xdr = &rqst->rq_snd_buf;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
unsigned int count, remaining, offset;
- if (xdr->len > r_xprt->rx_ep.rep_max_inline_send)
+ if (xdr->len > ep->re_max_inline_send)
return false;
if (xdr->page_len) {
@@ -145,7 +145,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
remaining -= min_t(unsigned int,
PAGE_SIZE - offset, remaining);
offset = 0;
- if (++count > r_xprt->rx_ep.rep_attr.cap.max_send_sge)
+ if (++count > ep->re_attr.cap.max_send_sge)
return false;
}
}
@@ -162,7 +162,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
- return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv;
+ return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
}
/* The client is required to provide a Reply chunk if the maximum
@@ -176,7 +176,7 @@ rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
const struct xdr_buf *buf = &rqst->rq_rcv_buf;
return (buf->head[0].iov_len + buf->tail[0].iov_len) <
- r_xprt->rx_ep.rep_max_inline_recv;
+ r_xprt->rx_ep->re_max_inline_recv;
}
/* Split @vec on page boundaries into SGEs. FMR registers pages, not
@@ -255,7 +255,7 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
/* When encoding a Read chunk, the tail iovec contains an
* XDR pad and may be omitted.
*/
- if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
+ if (type == rpcrdma_readch && r_xprt->rx_ep->re_implicit_roundup)
goto out;
/* When encoding a Write chunk, some servers need to see an
@@ -263,7 +263,7 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
* layer provides space in the tail iovec that may be used
* for this purpose.
*/
- if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
+ if (type == rpcrdma_writech && r_xprt->rx_ep->re_implicit_roundup)
goto out;
if (xdrbuf->tail[0].iov_len)
@@ -1450,8 +1450,8 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
if (credits == 0)
credits = 1; /* don't deadlock */
- else if (credits > r_xprt->rx_ep.rep_max_requests)
- credits = r_xprt->rx_ep.rep_max_requests;
+ else if (credits > r_xprt->rx_ep->re_max_requests)
+ credits = r_xprt->rx_ep->re_max_requests;
if (buf->rb_credits != credits)
rpcrdma_update_cwnd(r_xprt, credits);
rpcrdma_post_recvs(r_xprt, false);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 3cfeba68ee9a..659da37020a4 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -240,9 +240,10 @@ xprt_rdma_connect_worker(struct work_struct *work)
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
int rc;
- rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ rc = rpcrdma_xprt_connect(r_xprt);
xprt_clear_connecting(xprt);
- if (r_xprt->rx_ep.rep_connected > 0) {
+ if (r_xprt->rx_ep && r_xprt->rx_ep->re_connect_status > 0) {
+ xprt->connect_cookie++;
xprt->stat.connect_count++;
xprt->stat.connect_time += (long)jiffies -
xprt->stat.connect_start;
@@ -265,7 +266,7 @@ xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
trace_xprtrdma_op_inject_dsc(r_xprt);
- rdma_disconnect(r_xprt->rx_ia.ri_id);
+ rdma_disconnect(r_xprt->rx_ep->re_id);
}
/**
@@ -284,9 +285,8 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
- rpcrdma_ep_destroy(r_xprt);
+ rpcrdma_xprt_disconnect(r_xprt);
rpcrdma_buffer_destroy(&r_xprt->rx_buf);
- rpcrdma_ia_close(&r_xprt->rx_ia);
xprt_rdma_free_addresses(xprt);
xprt_free(xprt);
@@ -316,10 +316,15 @@ xprt_setup_rdma(struct xprt_create *args)
if (args->addrlen > sizeof(xprt->addr))
return ERR_PTR(-EBADF);
+ if (!try_module_get(THIS_MODULE))
+ return ERR_PTR(-EIO);
+
xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0,
xprt_rdma_slot_table_entries);
- if (!xprt)
+ if (!xprt) {
+ module_put(THIS_MODULE);
return ERR_PTR(-ENOMEM);
+ }
xprt->timeout = &xprt_rdma_default_timeout;
xprt->connect_timeout = xprt->timeout->to_initval;
@@ -347,23 +352,17 @@ xprt_setup_rdma(struct xprt_create *args)
xprt_rdma_format_addresses(xprt, sap);
new_xprt = rpcx_to_rdmax(xprt);
- rc = rpcrdma_ia_open(new_xprt);
- if (rc)
- goto out1;
-
- rc = rpcrdma_ep_create(new_xprt);
- if (rc)
- goto out2;
-
rc = rpcrdma_buffer_create(new_xprt);
- if (rc)
- goto out3;
-
- if (!try_module_get(THIS_MODULE))
- goto out4;
+ if (rc) {
+ xprt_rdma_free_addresses(xprt);
+ xprt_free(xprt);
+ module_put(THIS_MODULE);
+ return ERR_PTR(rc);
+ }
INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
xprt_rdma_connect_worker);
+
xprt->max_payload = RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
dprintk("RPC: %s: %s:%s\n", __func__,
@@ -371,19 +370,6 @@ xprt_setup_rdma(struct xprt_create *args)
xprt->address_strings[RPC_DISPLAY_PORT]);
trace_xprtrdma_create(new_xprt);
return xprt;
-
-out4:
- rpcrdma_buffer_destroy(&new_xprt->rx_buf);
- rc = -ENODEV;
-out3:
- rpcrdma_ep_destroy(new_xprt);
-out2:
- rpcrdma_ia_close(&new_xprt->rx_ia);
-out1:
- trace_xprtrdma_op_destroy(new_xprt);
- xprt_rdma_free_addresses(xprt);
- xprt_free(xprt);
- return ERR_PTR(rc);
}
/**
@@ -398,26 +384,11 @@ out1:
void xprt_rdma_close(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
- might_sleep();
trace_xprtrdma_op_close(r_xprt);
- /* Prevent marshaling and sending of new requests */
- xprt_clear_connected(xprt);
-
- if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
- rpcrdma_ia_remove(ia);
- goto out;
- }
-
- if (ep->rep_connected == -ENODEV)
- return;
- rpcrdma_ep_disconnect(ep, ia);
+ rpcrdma_xprt_disconnect(r_xprt);
-out:
xprt->reestablish_timeout = 0;
++xprt->connect_cookie;
xprt_disconnect_done(xprt);
@@ -517,10 +488,11 @@ static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
unsigned long delay;
delay = 0;
- if (r_xprt->rx_ep.rep_connected != 0) {
+ if (ep && ep->re_connect_status != 0) {
delay = xprt_reconnect_delay(xprt);
xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
}
@@ -694,7 +666,7 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
goto drop_connection;
rqst->rq_xtime = ktime_get();
- if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
+ if (rpcrdma_post_sends(r_xprt, req))
goto drop_connection;
rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 353f61ac8d51..cdd84c09df10 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -84,6 +84,7 @@ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
+static int rpcrdma_ep_destroy(struct rpcrdma_ep *ep);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags);
@@ -96,17 +97,17 @@ static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
*/
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rdma_cm_id *id = r_xprt->rx_ep->re_id;
/* Flush Receives, then wait for deferred Reply work
* to complete.
*/
- ib_drain_rq(ia->ri_id->qp);
+ ib_drain_rq(id->qp);
/* Deferred Reply processing might have scheduled
* local invalidations.
*/
- ib_drain_sq(ia->ri_id->qp);
+ ib_drain_sq(id->qp);
}
/**
@@ -115,26 +116,43 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
* @context: ep that owns QP where event occurred
*
* Called from the RDMA provider (device driver) possibly in an interrupt
- * context.
+ * context. The QP is always destroyed before the ID, so the ID will be
+ * reliably available when this handler is invoked.
*/
-static void
-rpcrdma_qp_event_handler(struct ib_event *event, void *context)
+static void rpcrdma_qp_event_handler(struct ib_event *event, void *context)
{
struct rpcrdma_ep *ep = context;
- struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
- rx_ep);
- trace_xprtrdma_qp_event(r_xprt, event);
+ trace_xprtrdma_qp_event(ep, event);
+}
+
+/**
+ * rpcrdma_flush_disconnect - Disconnect on flushed completion
+ * @cq: completion queue
+ * @wc: work completion entry
+ *
+ * Must be called in process context.
+ */
+void rpcrdma_flush_disconnect(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct rpcrdma_xprt *r_xprt = cq->cq_context;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+ if (wc->status != IB_WC_SUCCESS &&
+ r_xprt->rx_ep->re_connect_status == 1) {
+ r_xprt->rx_ep->re_connect_status = -ECONNABORTED;
+ trace_xprtrdma_flush_dct(r_xprt, wc->status);
+ xprt_force_disconnect(xprt);
+ }
}
/**
* rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: completion queue
- * @wc: completed WR
+ * @wc: WCE for a completed Send WR
*
*/
-static void
-rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
struct ib_cqe *cqe = wc->wr_cqe;
struct rpcrdma_sendctx *sc =
@@ -143,25 +161,25 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_send(sc, wc);
rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
+ rpcrdma_flush_disconnect(cq, wc);
}
/**
* rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * @cq: completion queue
+ * @wc: WCE for a completed Receive WR
*
*/
-static void
-rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
+static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
struct ib_cqe *cqe = wc->wr_cqe;
struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
rr_cqe);
- struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpcrdma_xprt *r_xprt = cq->cq_context;
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_receive(wc);
- --r_xprt->rx_ep.rep_receive_count;
+ --r_xprt->rx_ep->re_receive_count;
if (wc->status != IB_WC_SUCCESS)
goto out_flushed;
@@ -178,35 +196,35 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
return;
out_flushed:
+ rpcrdma_flush_disconnect(cq, wc);
rpcrdma_rep_destroy(rep);
}
-static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
+static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
struct rdma_conn_param *param)
{
const struct rpcrdma_connect_private *pmsg = param->private_data;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
unsigned int rsize, wsize;
/* Default settings for RPC-over-RDMA Version One */
- r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
+ ep->re_implicit_roundup = xprt_rdma_pad_optimize;
rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
if (pmsg &&
pmsg->cp_magic == rpcrdma_cmp_magic &&
pmsg->cp_version == RPCRDMA_CMP_VERSION) {
- r_xprt->rx_ia.ri_implicit_roundup = true;
+ ep->re_implicit_roundup = true;
rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
}
- if (rsize < ep->rep_inline_recv)
- ep->rep_inline_recv = rsize;
- if (wsize < ep->rep_inline_send)
- ep->rep_inline_send = wsize;
+ if (rsize < ep->re_inline_recv)
+ ep->re_inline_recv = rsize;
+ if (wsize < ep->re_inline_send)
+ ep->re_inline_send = wsize;
- rpcrdma_set_max_header_sizes(r_xprt);
+ rpcrdma_set_max_header_sizes(ep);
}
/**
@@ -220,116 +238,103 @@ static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
- struct rpcrdma_xprt *r_xprt = id->context;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct sockaddr *sap = (struct sockaddr *)&id->route.addr.dst_addr;
+ struct rpcrdma_ep *ep = id->context;
+ struct rpc_xprt *xprt = ep->re_xprt;
might_sleep();
- trace_xprtrdma_cm_event(r_xprt, event);
switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
case RDMA_CM_EVENT_ROUTE_RESOLVED:
- ia->ri_async_rc = 0;
- complete(&ia->ri_done);
+ ep->re_async_rc = 0;
+ complete(&ep->re_done);
return 0;
case RDMA_CM_EVENT_ADDR_ERROR:
- ia->ri_async_rc = -EPROTO;
- complete(&ia->ri_done);
+ ep->re_async_rc = -EPROTO;
+ complete(&ep->re_done);
return 0;
case RDMA_CM_EVENT_ROUTE_ERROR:
- ia->ri_async_rc = -ENETUNREACH;
- complete(&ia->ri_done);
+ ep->re_async_rc = -ENETUNREACH;
+ complete(&ep->re_done);
return 0;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
- pr_info("rpcrdma: removing device %s for %s:%s\n",
- ia->ri_id->device->name,
- rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
-#endif
- init_completion(&ia->ri_remove_done);
- set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
- ep->rep_connected = -ENODEV;
+ pr_info("rpcrdma: removing device %s for %pISpc\n",
+ ep->re_id->device->name, sap);
+ /* fall through */
+ case RDMA_CM_EVENT_ADDR_CHANGE:
+ ep->re_connect_status = -ENODEV;
xprt_force_disconnect(xprt);
- wait_for_completion(&ia->ri_remove_done);
-
- ia->ri_id = NULL;
- /* Return 1 to ensure the core destroys the id. */
- return 1;
+ goto disconnected;
case RDMA_CM_EVENT_ESTABLISHED:
- ++xprt->connect_cookie;
- ep->rep_connected = 1;
- rpcrdma_update_cm_private(r_xprt, &event->param.conn);
- trace_xprtrdma_inline_thresh(r_xprt);
- wake_up_all(&ep->rep_connect_wait);
+ kref_get(&ep->re_kref);
+ ep->re_connect_status = 1;
+ rpcrdma_update_cm_private(ep, &event->param.conn);
+ trace_xprtrdma_inline_thresh(ep);
+ wake_up_all(&ep->re_connect_wait);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
- ep->rep_connected = -ENOTCONN;
+ ep->re_connect_status = -ENOTCONN;
goto disconnected;
case RDMA_CM_EVENT_UNREACHABLE:
- ep->rep_connected = -ENETUNREACH;
+ ep->re_connect_status = -ENETUNREACH;
goto disconnected;
case RDMA_CM_EVENT_REJECTED:
- dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
- rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
- rdma_reject_msg(id, event->status));
- ep->rep_connected = -ECONNREFUSED;
+ dprintk("rpcrdma: connection to %pISpc rejected: %s\n",
+ sap, rdma_reject_msg(id, event->status));
+ ep->re_connect_status = -ECONNREFUSED;
if (event->status == IB_CM_REJ_STALE_CONN)
- ep->rep_connected = -EAGAIN;
+ ep->re_connect_status = -EAGAIN;
goto disconnected;
case RDMA_CM_EVENT_DISCONNECTED:
- ep->rep_connected = -ECONNABORTED;
+ ep->re_connect_status = -ECONNABORTED;
disconnected:
- xprt_force_disconnect(xprt);
- wake_up_all(&ep->rep_connect_wait);
- break;
+ return rpcrdma_ep_destroy(ep);
default:
break;
}
- dprintk("RPC: %s: %s:%s on %s/frwr: %s\n", __func__,
- rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
- ia->ri_id->device->name, rdma_event_msg(event->event));
+ dprintk("RPC: %s: %pISpc on %s/frwr: %s\n", __func__, sap,
+ ep->re_id->device->name, rdma_event_msg(event->event));
return 0;
}
-static struct rdma_cm_id *
-rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
+static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_ep *ep)
{
unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct rdma_cm_id *id;
int rc;
- init_completion(&ia->ri_done);
+ init_completion(&ep->re_done);
- id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
- xprt, RDMA_PS_TCP, IB_QPT_RC);
+ id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
+ RDMA_PS_TCP, IB_QPT_RC);
if (IS_ERR(id))
return id;
- ia->ri_async_rc = -ETIMEDOUT;
- rc = rdma_resolve_addr(id, NULL,
- (struct sockaddr *)&xprt->rx_xprt.addr,
+ ep->re_async_rc = -ETIMEDOUT;
+ rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
RDMA_RESOLVE_TIMEOUT);
if (rc)
goto out;
- rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
+ rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
if (rc < 0)
goto out;
- rc = ia->ri_async_rc;
+ rc = ep->re_async_rc;
if (rc)
goto out;
- ia->ri_async_rc = -ETIMEDOUT;
+ ep->re_async_rc = -ETIMEDOUT;
rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
if (rc)
goto out;
- rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
+ rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
if (rc < 0)
goto out;
- rc = ia->ri_async_rc;
+ rc = ep->re_async_rc;
if (rc)
goto out;
@@ -340,356 +345,181 @@ out:
return ERR_PTR(rc);
}
-/*
- * Exported functions.
- */
-
-/**
- * rpcrdma_ia_open - Open and initialize an Interface Adapter.
- * @xprt: transport with IA to (re)initialize
- *
- * Returns 0 on success, negative errno if an appropriate
- * Interface Adapter could not be found and opened.
- */
-int
-rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
+static void rpcrdma_ep_put(struct kref *kref)
{
- struct rpcrdma_ia *ia = &xprt->rx_ia;
- int rc;
+ struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);
- ia->ri_id = rpcrdma_create_id(xprt, ia);
- if (IS_ERR(ia->ri_id)) {
- rc = PTR_ERR(ia->ri_id);
- goto out_err;
+ if (ep->re_id->qp) {
+ rdma_destroy_qp(ep->re_id);
+ ep->re_id->qp = NULL;
}
- ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
- if (IS_ERR(ia->ri_pd)) {
- rc = PTR_ERR(ia->ri_pd);
- pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
- goto out_err;
- }
+ if (ep->re_attr.recv_cq)
+ ib_free_cq(ep->re_attr.recv_cq);
+ ep->re_attr.recv_cq = NULL;
+ if (ep->re_attr.send_cq)
+ ib_free_cq(ep->re_attr.send_cq);
+ ep->re_attr.send_cq = NULL;
- return 0;
+ if (ep->re_pd)
+ ib_dealloc_pd(ep->re_pd);
+ ep->re_pd = NULL;
-out_err:
- rpcrdma_ia_close(ia);
- return rc;
+ kfree(ep);
+ module_put(THIS_MODULE);
}
-/**
- * rpcrdma_ia_remove - Handle device driver unload
- * @ia: interface adapter being removed
- *
- * Divest transport H/W resources associated with this adapter,
- * but allow it to be restored later.
- *
- * Caller must hold the transport send lock.
+/* Returns:
+ * %0 if @ep still has a positive kref count, or
+ * %1 if @ep was destroyed successfully.
*/
-void
-rpcrdma_ia_remove(struct rpcrdma_ia *ia)
+static int rpcrdma_ep_destroy(struct rpcrdma_ep *ep)
{
- struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
- rx_ia);
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
-
- /* This is similar to rpcrdma_ep_destroy, but:
- * - Don't cancel the connect worker.
- * - Don't call rpcrdma_ep_disconnect, which waits
- * for another conn upcall, which will deadlock.
- * - rdma_disconnect is unneeded, the underlying
- * connection is already gone.
- */
- if (ia->ri_id->qp) {
- rpcrdma_xprt_drain(r_xprt);
- rdma_destroy_qp(ia->ri_id);
- ia->ri_id->qp = NULL;
- }
- ib_free_cq(ep->rep_attr.recv_cq);
- ep->rep_attr.recv_cq = NULL;
- ib_free_cq(ep->rep_attr.send_cq);
- ep->rep_attr.send_cq = NULL;
-
- /* The ULP is responsible for ensuring all DMA
- * mappings and MRs are gone.
- */
- rpcrdma_reps_unmap(r_xprt);
- rpcrdma_reqs_reset(r_xprt);
- rpcrdma_mrs_destroy(r_xprt);
- rpcrdma_sendctxs_destroy(r_xprt);
- ib_dealloc_pd(ia->ri_pd);
- ia->ri_pd = NULL;
-
- /* Allow waiters to continue */
- complete(&ia->ri_remove_done);
-
- trace_xprtrdma_remove(r_xprt);
-}
-
-/**
- * rpcrdma_ia_close - Clean up/close an IA.
- * @ia: interface adapter to close
- *
- */
-void
-rpcrdma_ia_close(struct rpcrdma_ia *ia)
-{
- if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
- if (ia->ri_id->qp)
- rdma_destroy_qp(ia->ri_id);
- rdma_destroy_id(ia->ri_id);
- }
- ia->ri_id = NULL;
-
- /* If the pd is still busy, xprtrdma missed freeing a resource */
- if (ia->ri_pd && !IS_ERR(ia->ri_pd))
- ib_dealloc_pd(ia->ri_pd);
- ia->ri_pd = NULL;
+ return kref_put(&ep->re_kref, rpcrdma_ep_put);
}
-/**
- * rpcrdma_ep_create - Create unconnected endpoint
- * @r_xprt: transport to instantiate
- *
- * Returns zero on success, or a negative errno.
- */
-int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
+static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
- struct ib_cq *sendcq, *recvcq;
+ struct rpcrdma_connect_private *pmsg;
+ struct ib_device *device;
+ struct rdma_cm_id *id;
+ struct rpcrdma_ep *ep;
int rc;
- ep->rep_max_requests = r_xprt->rx_xprt.max_reqs;
- ep->rep_inline_send = xprt_rdma_max_inline_write;
- ep->rep_inline_recv = xprt_rdma_max_inline_read;
+ ep = kzalloc(sizeof(*ep), GFP_NOFS);
+ if (!ep)
+ return -EAGAIN;
+ ep->re_xprt = &r_xprt->rx_xprt;
+ kref_init(&ep->re_kref);
- rc = frwr_query_device(r_xprt, ia->ri_id->device);
+ id = rpcrdma_create_id(r_xprt, ep);
+ if (IS_ERR(id)) {
+ rc = PTR_ERR(id);
+ goto out_free;
+ }
+ __module_get(THIS_MODULE);
+ device = id->device;
+ ep->re_id = id;
+
+ ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
+ ep->re_inline_send = xprt_rdma_max_inline_write;
+ ep->re_inline_recv = xprt_rdma_max_inline_read;
+ rc = frwr_query_device(ep, device);
if (rc)
- return rc;
- r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->rep_max_requests);
+ goto out_destroy;
+
+ r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
- ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
- ep->rep_attr.qp_context = ep;
- ep->rep_attr.srq = NULL;
- ep->rep_attr.cap.max_inline_data = 0;
- ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
- ep->rep_attr.qp_type = IB_QPT_RC;
- ep->rep_attr.port_num = ~0;
+ ep->re_attr.event_handler = rpcrdma_qp_event_handler;
+ ep->re_attr.qp_context = ep;
+ ep->re_attr.srq = NULL;
+ ep->re_attr.cap.max_inline_data = 0;
+ ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ ep->re_attr.qp_type = IB_QPT_RC;
+ ep->re_attr.port_num = ~0;
dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
"iovs: send %d recv %d\n",
__func__,
- ep->rep_attr.cap.max_send_wr,
- ep->rep_attr.cap.max_recv_wr,
- ep->rep_attr.cap.max_send_sge,
- ep->rep_attr.cap.max_recv_sge);
-
- ep->rep_send_batch = ep->rep_max_requests >> 3;
- ep->rep_send_count = ep->rep_send_batch;
- init_waitqueue_head(&ep->rep_connect_wait);
- ep->rep_receive_count = 0;
-
- sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
- ep->rep_attr.cap.max_send_wr + 1,
- IB_POLL_WORKQUEUE);
- if (IS_ERR(sendcq)) {
- rc = PTR_ERR(sendcq);
- goto out1;
+ ep->re_attr.cap.max_send_wr,
+ ep->re_attr.cap.max_recv_wr,
+ ep->re_attr.cap.max_send_sge,
+ ep->re_attr.cap.max_recv_sge);
+
+ ep->re_send_batch = ep->re_max_requests >> 3;
+ ep->re_send_count = ep->re_send_batch;
+ init_waitqueue_head(&ep->re_connect_wait);
+
+ ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
+ ep->re_attr.cap.max_send_wr,
+ IB_POLL_WORKQUEUE);
+ if (IS_ERR(ep->re_attr.send_cq)) {
+ rc = PTR_ERR(ep->re_attr.send_cq);
+ goto out_destroy;
}
- recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
- ep->rep_attr.cap.max_recv_wr + 1,
- IB_POLL_WORKQUEUE);
- if (IS_ERR(recvcq)) {
- rc = PTR_ERR(recvcq);
- goto out2;
+ ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
+ ep->re_attr.cap.max_recv_wr,
+ IB_POLL_WORKQUEUE);
+ if (IS_ERR(ep->re_attr.recv_cq)) {
+ rc = PTR_ERR(ep->re_attr.recv_cq);
+ goto out_destroy;
}
-
- ep->rep_attr.send_cq = sendcq;
- ep->rep_attr.recv_cq = recvcq;
+ ep->re_receive_count = 0;
/* Initialize cma parameters */
- memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
+ memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));
/* Prepare RDMA-CM private message */
+ pmsg = &ep->re_cm_private;
pmsg->cp_magic = rpcrdma_cmp_magic;
pmsg->cp_version = RPCRDMA_CMP_VERSION;
pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
- pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
- pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
- ep->rep_remote_cma.private_data = pmsg;
- ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
+ pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
+ pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
+ ep->re_remote_cma.private_data = pmsg;
+ ep->re_remote_cma.private_data_len = sizeof(*pmsg);
/* Client offers RDMA Read but does not initiate */
- ep->rep_remote_cma.initiator_depth = 0;
- ep->rep_remote_cma.responder_resources =
- min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);
+ ep->re_remote_cma.initiator_depth = 0;
+ ep->re_remote_cma.responder_resources =
+ min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
/* Limit transport retries so client can detect server
* GID changes quickly. RPC layer handles re-establishing
* transport connection and retransmission.
*/
- ep->rep_remote_cma.retry_count = 6;
+ ep->re_remote_cma.retry_count = 6;
/* RPC-over-RDMA handles its own flow control. In addition,
* make all RNR NAKs visible so we know that RPC-over-RDMA
* flow control is working correctly (no NAKs should be seen).
*/
- ep->rep_remote_cma.flow_control = 0;
- ep->rep_remote_cma.rnr_retry_count = 0;
+ ep->re_remote_cma.flow_control = 0;
+ ep->re_remote_cma.rnr_retry_count = 0;
- return 0;
-
-out2:
- ib_free_cq(sendcq);
-out1:
- return rc;
-}
-
-/**
- * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
- * @r_xprt: transport instance to shut down
- *
- */
-void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
-{
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
- if (ia->ri_id && ia->ri_id->qp) {
- rpcrdma_ep_disconnect(ep, ia);
- rdma_destroy_qp(ia->ri_id);
- ia->ri_id->qp = NULL;
- }
-
- if (ep->rep_attr.recv_cq)
- ib_free_cq(ep->rep_attr.recv_cq);
- if (ep->rep_attr.send_cq)
- ib_free_cq(ep->rep_attr.send_cq);
-}
-
-/* Re-establish a connection after a device removal event.
- * Unlike a normal reconnection, a fresh PD and a new set
- * of MRs and buffers is needed.
- */
-static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
- struct ib_qp_init_attr *qp_init_attr)
-{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- int rc, err;
-
- trace_xprtrdma_reinsert(r_xprt);
-
- rc = -EHOSTUNREACH;
- if (rpcrdma_ia_open(r_xprt))
- goto out1;
-
- rc = -ENOMEM;
- err = rpcrdma_ep_create(r_xprt);
- if (err) {
- pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
- goto out2;
- }
- memcpy(qp_init_attr, &ep->rep_attr, sizeof(*qp_init_attr));
-
- rc = -ENETUNREACH;
- err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
- if (err) {
- pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
- goto out3;
- }
- return 0;
-
-out3:
- rpcrdma_ep_destroy(r_xprt);
-out2:
- rpcrdma_ia_close(ia);
-out1:
- return rc;
-}
-
-static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
- struct ib_qp_init_attr *qp_init_attr)
-{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rdma_cm_id *id, *old;
- int err, rc;
-
- rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
-
- rc = -EHOSTUNREACH;
- id = rpcrdma_create_id(r_xprt, ia);
- if (IS_ERR(id))
- goto out;
-
- /* As long as the new ID points to the same device as the
- * old ID, we can reuse the transport's existing PD and all
- * previously allocated MRs. Also, the same device means
- * the transport's previous DMA mappings are still valid.
- *
- * This is a sanity check only. There should be no way these
- * point to two different devices here.
- */
- old = id;
- rc = -ENETUNREACH;
- if (ia->ri_id->device != id->device) {
- pr_err("rpcrdma: can't reconnect on different device!\n");
+ ep->re_pd = ib_alloc_pd(device, 0);
+ if (IS_ERR(ep->re_pd)) {
+ rc = PTR_ERR(ep->re_pd);
goto out_destroy;
}
- err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
- if (err)
+ rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
+ if (rc)
goto out_destroy;
- /* Atomically replace the transport's ID and QP. */
- rc = 0;
- old = ia->ri_id;
- ia->ri_id = id;
- rdma_destroy_qp(old);
+ r_xprt->rx_ep = ep;
+ return 0;
out_destroy:
- rdma_destroy_id(old);
-out:
+ rpcrdma_ep_destroy(ep);
+ rdma_destroy_id(id);
+out_free:
+ kfree(ep);
+ r_xprt->rx_ep = NULL;
return rc;
}
-/*
- * Connect unconnected endpoint.
+/**
+ * rpcrdma_xprt_connect - Connect an unconnected transport
+ * @r_xprt: controlling transport instance
+ *
+ * Returns 0 on success or a negative errno.
*/
-int
-rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
- rx_ia);
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
- struct ib_qp_init_attr qp_init_attr;
+ struct rpcrdma_ep *ep;
int rc;
retry:
- memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
- switch (ep->rep_connected) {
- case 0:
- rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
- if (rc) {
- rc = -ENETUNREACH;
- goto out_noupdate;
- }
- break;
- case -ENODEV:
- rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
- if (rc)
- goto out_noupdate;
- break;
- default:
- rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
- if (rc)
- goto out;
- }
+ rpcrdma_xprt_disconnect(r_xprt);
+ rc = rpcrdma_ep_create(r_xprt);
+ if (rc)
+ return rc;
+ ep = r_xprt->rx_ep;
- ep->rep_connected = 0;
+ ep->re_connect_status = 0;
xprt_clear_connected(xprt);
rpcrdma_reset_cwnd(r_xprt);
@@ -699,64 +529,68 @@ retry:
if (rc)
goto out;
- rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
+ rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
if (rc)
goto out;
if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
- wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
- if (ep->rep_connected <= 0) {
- if (ep->rep_connected == -EAGAIN)
+ wait_event_interruptible(ep->re_connect_wait,
+ ep->re_connect_status != 0);
+ if (ep->re_connect_status <= 0) {
+ if (ep->re_connect_status == -EAGAIN)
goto retry;
- rc = ep->rep_connected;
+ rc = ep->re_connect_status;
goto out;
}
rc = rpcrdma_reqs_setup(r_xprt);
if (rc) {
- rpcrdma_ep_disconnect(ep, ia);
+ rpcrdma_xprt_disconnect(r_xprt);
goto out;
}
rpcrdma_mrs_create(r_xprt);
out:
if (rc)
- ep->rep_connected = rc;
-
-out_noupdate:
+ ep->re_connect_status = rc;
trace_xprtrdma_connect(r_xprt, rc);
return rc;
}
/**
- * rpcrdma_ep_disconnect - Disconnect underlying transport
- * @ep: endpoint to disconnect
- * @ia: associated interface adapter
+ * rpcrdma_xprt_disconnect - Disconnect underlying transport
+ * @r_xprt: controlling transport instance
*
* Caller serializes. Either the transport send lock is held,
* or we're being called to destroy the transport.
+ *
+ * On return, @r_xprt is completely divested of all hardware
+ * resources and prepared for the next ->connect operation.
*/
-void
-rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
- rx_ep);
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
+ struct rdma_cm_id *id;
int rc;
- /* returns without wait if ID is not connected */
- rc = rdma_disconnect(ia->ri_id);
- if (!rc)
- wait_event_interruptible(ep->rep_connect_wait,
- ep->rep_connected != 1);
- else
- ep->rep_connected = rc;
+ if (!ep)
+ return;
+
+ id = ep->re_id;
+ rc = rdma_disconnect(id);
trace_xprtrdma_disconnect(r_xprt, rc);
rpcrdma_xprt_drain(r_xprt);
+ rpcrdma_reps_unmap(r_xprt);
rpcrdma_reqs_reset(r_xprt);
rpcrdma_mrs_destroy(r_xprt);
rpcrdma_sendctxs_destroy(r_xprt);
+
+ if (rpcrdma_ep_destroy(ep))
+ rdma_destroy_id(id);
+
+ r_xprt->rx_ep = NULL;
}
/* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -793,7 +627,7 @@ static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
{
struct rpcrdma_sendctx *sc;
- sc = kzalloc(struct_size(sc, sc_sges, ep->rep_attr.cap.max_send_sge),
+ sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
GFP_KERNEL);
if (!sc)
return NULL;
@@ -813,14 +647,14 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
* the ->send_request call to fail temporarily before too many
* Sends are posted.
*/
- i = r_xprt->rx_ep.rep_max_requests + RPCRDMA_MAX_BC_REQUESTS;
+ i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
if (!buf->rb_sc_ctxs)
return -ENOMEM;
buf->rb_sc_last = i - 1;
for (i = 0; i <= buf->rb_sc_last; i++) {
- sc = rpcrdma_sendctx_create(&r_xprt->rx_ep);
+ sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
if (!sc)
return -ENOMEM;
@@ -924,10 +758,10 @@ static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
unsigned int count;
- for (count = 0; count < ia->ri_max_rdma_segs; count++) {
+ for (count = 0; count < ep->re_max_rdma_segs; count++) {
struct rpcrdma_mr *mr;
int rc;
@@ -935,14 +769,12 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
if (!mr)
break;
- rc = frwr_init_mr(ia, mr);
+ rc = frwr_mr_init(r_xprt, mr);
if (rc) {
kfree(mr);
break;
}
- mr->mr_xprt = r_xprt;
-
spin_lock(&buf->rb_lock);
rpcrdma_mr_push(mr, &buf->rb_mrs);
list_add(&mr->mr_all, &buf->rb_all_mrs);
@@ -973,12 +805,12 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
- /* If there is no underlying device, it's no use to
- * wake the refresh worker.
+ /* If there is no underlying connection, it's no use
+ * to wake the refresh worker.
*/
- if (ep->rep_connected != -ENODEV) {
+ if (ep->re_connect_status == 1) {
/* The work is scheduled on a WQ_MEM_RECLAIM
* workqueue in order to prevent MR allocation
* from recursing into NFS during direct reclaim.
@@ -1042,7 +874,7 @@ int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* Compute maximum header buffer size in bytes */
maxhdrsize = rpcrdma_fixed_maxsz + 3 +
- r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz;
+ r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
maxhdrsize *= sizeof(__be32);
rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
DMA_TO_DEVICE, GFP_KERNEL);
@@ -1120,7 +952,7 @@ struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
if (rep == NULL)
goto out;
- rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
+ rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep->re_inline_recv,
DMA_FROM_DEVICE, GFP_KERNEL);
if (!rep->rr_rdmabuf)
goto out_free;
@@ -1345,7 +1177,7 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
if (mr->mr_dir != DMA_NONE) {
trace_xprtrdma_mr_unmap(mr);
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
+ ib_dma_unmap_sg(r_xprt->rx_ep->re_id->device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
@@ -1463,7 +1295,7 @@ bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_regbuf *rb)
{
- struct ib_device *device = r_xprt->rx_ia.ri_id->device;
+ struct ib_device *device = r_xprt->rx_ep->re_id->device;
if (rb->rg_direction == DMA_NONE)
return false;
@@ -1476,7 +1308,7 @@ bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
}
rb->rg_device = device;
- rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
+ rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
return true;
}
@@ -1502,31 +1334,28 @@ static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
}
/**
- * rpcrdma_ep_post - Post WRs to a transport's Send Queue
- * @ia: transport's device information
- * @ep: transport's RDMA endpoint information
+ * rpcrdma_post_sends - Post WRs to a transport's Send Queue
+ * @r_xprt: controlling transport instance
* @req: rpcrdma_req containing the Send WR to post
*
* Returns 0 if the post was successful, otherwise -ENOTCONN
* is returned.
*/
-int
-rpcrdma_ep_post(struct rpcrdma_ia *ia,
- struct rpcrdma_ep *ep,
- struct rpcrdma_req *req)
+int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct ib_send_wr *send_wr = &req->rl_wr;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
int rc;
- if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
+ if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) {
send_wr->send_flags |= IB_SEND_SIGNALED;
- ep->rep_send_count = ep->rep_send_batch;
+ ep->re_send_count = ep->re_send_batch;
} else {
send_wr->send_flags &= ~IB_SEND_SIGNALED;
- --ep->rep_send_count;
+ --ep->re_send_count;
}
- rc = frwr_send(ia, req);
+ rc = frwr_send(r_xprt, req);
trace_xprtrdma_post_send(req, rc);
if (rc)
return -ENOTCONN;
@@ -1542,7 +1371,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
struct ib_recv_wr *wr, *bad_wr;
struct rpcrdma_rep *rep;
int needed, count, rc;
@@ -1551,9 +1380,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
count = 0;
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
- if (likely(ep->rep_receive_count > needed))
+ if (likely(ep->re_receive_count > needed))
goto out;
- needed -= ep->rep_receive_count;
+ needed -= ep->re_receive_count;
if (!temp)
needed += RPCRDMA_MAX_RECV_BATCH;
@@ -1579,7 +1408,7 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
if (!wr)
goto out;
- rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
+ rc = ib_post_recv(ep->re_id->qp, wr,
(const struct ib_recv_wr **)&bad_wr);
out:
trace_xprtrdma_post_recvs(r_xprt, count, rc);
@@ -1593,6 +1422,6 @@ out:
--count;
}
}
- ep->rep_receive_count += count;
+ ep->re_receive_count += count;
return;
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 37d5080c250b..0a16fdb09b2c 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -65,43 +65,33 @@
#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ)
/*
- * Interface Adapter -- one per transport instance
+ * RDMA Endpoint -- connection endpoint details
*/
-struct rpcrdma_ia {
- struct rdma_cm_id *ri_id;
- struct ib_pd *ri_pd;
- int ri_async_rc;
- unsigned int ri_max_rdma_segs;
- unsigned int ri_max_frwr_depth;
- bool ri_implicit_roundup;
- enum ib_mr_type ri_mrtype;
- unsigned long ri_flags;
- struct completion ri_done;
- struct completion ri_remove_done;
-};
-
-enum {
- RPCRDMA_IAF_REMOVING = 0,
-};
-
-/*
- * RDMA Endpoint -- one per transport instance
- */
-
struct rpcrdma_ep {
- unsigned int rep_send_count;
- unsigned int rep_send_batch;
- unsigned int rep_max_inline_send;
- unsigned int rep_max_inline_recv;
- int rep_connected;
- struct ib_qp_init_attr rep_attr;
- wait_queue_head_t rep_connect_wait;
- struct rpcrdma_connect_private rep_cm_private;
- struct rdma_conn_param rep_remote_cma;
- unsigned int rep_max_requests; /* depends on device */
- unsigned int rep_inline_send; /* negotiated */
- unsigned int rep_inline_recv; /* negotiated */
- int rep_receive_count;
+ struct kref re_kref;
+ struct rdma_cm_id *re_id;
+ struct ib_pd *re_pd;
+ unsigned int re_max_rdma_segs;
+ unsigned int re_max_fr_depth;
+ bool re_implicit_roundup;
+ enum ib_mr_type re_mrtype;
+ struct completion re_done;
+ unsigned int re_send_count;
+ unsigned int re_send_batch;
+ unsigned int re_max_inline_send;
+ unsigned int re_max_inline_recv;
+ int re_async_rc;
+ int re_connect_status;
+ struct ib_qp_init_attr re_attr;
+ wait_queue_head_t re_connect_wait;
+ struct rpc_xprt *re_xprt;
+ struct rpcrdma_connect_private
+ re_cm_private;
+ struct rdma_conn_param re_remote_cma;
+ int re_receive_count;
+ unsigned int re_max_requests; /* depends on device */
+ unsigned int re_inline_send; /* negotiated */
+ unsigned int re_inline_recv; /* negotiated */
};
/* Pre-allocate extra Work Requests for handling backward receives
@@ -422,8 +412,7 @@ struct rpcrdma_stats {
*/
struct rpcrdma_xprt {
struct rpc_xprt rx_xprt;
- struct rpcrdma_ia rx_ia;
- struct rpcrdma_ep rx_ep;
+ struct rpcrdma_ep *rx_ep;
struct rpcrdma_buffer rx_buf;
struct delayed_work rx_connect_worker;
struct rpc_timeout rx_timeout;
@@ -455,22 +444,13 @@ extern int xprt_rdma_pad_optimize;
extern unsigned int xprt_rdma_memreg_strategy;
/*
- * Interface Adapter calls - xprtrdma/verbs.c
- */
-int rpcrdma_ia_open(struct rpcrdma_xprt *xprt);
-void rpcrdma_ia_remove(struct rpcrdma_ia *ia);
-void rpcrdma_ia_close(struct rpcrdma_ia *);
-
-/*
* Endpoint calls - xprtrdma/verbs.c
*/
-int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt);
-void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt);
-int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
-void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+void rpcrdma_flush_disconnect(struct ib_cq *cq, struct ib_wc *wc);
+int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt);
+void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt);
-int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
- struct rpcrdma_req *);
+int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
/*
@@ -536,15 +516,14 @@ rpcrdma_data_dir(bool writing)
/* Memory registration calls xprtrdma/frwr_ops.c
*/
void frwr_reset(struct rpcrdma_req *req);
-int frwr_query_device(struct rpcrdma_xprt *r_xprt,
- const struct ib_device *device);
-int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
+int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device);
+int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr);
void frwr_release_mr(struct rpcrdma_mr *mr);
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr_seg *seg,
int nsegs, bool writing, __be32 xid,
struct rpcrdma_mr *mr);
-int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
+int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
@@ -569,7 +548,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
enum rpcrdma_chunktype rtype);
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc);
int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
-void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep);
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 17cb902e5153..0bda8a73e8a8 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1861,7 +1861,7 @@ static int xs_local_setup_socket(struct sock_xprt *transport)
struct rpc_xprt *xprt = &transport->xprt;
struct file *filp;
struct socket *sock;
- int status = -EIO;
+ int status;
status = __sock_create(xprt->xprt_net, AF_LOCAL,
SOCK_STREAM, 0, &sock, 1);