From 35fcde7f8deb51b707b161bf19cbd22363aef2df Mon Sep 17 00:00:00 2001 From: Magnus Karlsson Date: Wed, 2 May 2018 13:01:34 +0200 Subject: xsk: support for Tx Here, Tx support is added. The user fills the Tx queue with frames to be sent by the kernel, and let's the kernel know using the sendmsg syscall. Signed-off-by: Magnus Karlsson Signed-off-by: Alexei Starovoitov --- net/xdp/xsk.c | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++-- net/xdp/xsk_queue.h | 93 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 200 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 2d7b0c90d996..b33c535c7996 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -36,6 +36,8 @@ #include "xsk_queue.h" #include "xdp_umem.h" +#define TX_BATCH_SIZE 16 + static struct xdp_sock *xdp_sk(struct sock *sk) { return (struct xdp_sock *)sk; @@ -101,6 +103,108 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) return err; } +static void xsk_destruct_skb(struct sk_buff *skb) +{ + u32 id = (u32)(long)skb_shinfo(skb)->destructor_arg; + struct xdp_sock *xs = xdp_sk(skb->sk); + + WARN_ON_ONCE(xskq_produce_id(xs->umem->cq, id)); + + sock_wfree(skb); +} + +static int xsk_generic_xmit(struct sock *sk, struct msghdr *m, + size_t total_len) +{ + bool need_wait = !(m->msg_flags & MSG_DONTWAIT); + u32 max_batch = TX_BATCH_SIZE; + struct xdp_sock *xs = xdp_sk(sk); + bool sent_frame = false; + struct xdp_desc desc; + struct sk_buff *skb; + int err = 0; + + if (unlikely(!xs->tx)) + return -ENOBUFS; + if (need_wait) + return -EOPNOTSUPP; + + mutex_lock(&xs->mutex); + + while (xskq_peek_desc(xs->tx, &desc)) { + char *buffer; + u32 id, len; + + if (max_batch-- == 0) { + err = -EAGAIN; + goto out; + } + + if (xskq_reserve_id(xs->umem->cq)) { + err = -EAGAIN; + goto out; + } + + len = desc.len; + if (unlikely(len > xs->dev->mtu)) { + err = -EMSGSIZE; + goto out; + } + + skb = sock_alloc_send_skb(sk, len, !need_wait, &err); + if (unlikely(!skb)) { + err = -EAGAIN; + goto out; + } + + skb_put(skb, len); + id = desc.idx; + buffer = xdp_umem_get_data(xs->umem, id) + desc.offset; + err = skb_store_bits(skb, 0, buffer, len); + if (unlikely(err)) { + kfree_skb(skb); + goto out; + } + + skb->dev = xs->dev; + skb->priority = sk->sk_priority; + skb->mark = sk->sk_mark; + skb_shinfo(skb)->destructor_arg = (void *)(long)id; + skb->destructor = xsk_destruct_skb; + + err = dev_direct_xmit(skb, xs->queue_id); + /* Ignore NET_XMIT_CN as packet might have been sent */ + if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) { + err = -EAGAIN; + /* SKB consumed by dev_direct_xmit() */ + goto out; + } + + sent_frame = true; + xskq_discard_desc(xs->tx); + } + +out: + if (sent_frame) + sk->sk_write_space(sk); + + mutex_unlock(&xs->mutex); + return err; +} + +static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len) +{ + struct sock *sk = sock->sk; + struct xdp_sock *xs = xdp_sk(sk); + + if (unlikely(!xs->dev)) + return -ENXIO; + if (unlikely(!(xs->dev->flags & IFF_UP))) + return -ENETDOWN; + + return xsk_generic_xmit(sk, m, total_len); +} + static unsigned int xsk_poll(struct file *file, struct socket *sock, struct poll_table_struct *wait) { @@ -110,6 +214,8 @@ static unsigned int xsk_poll(struct file *file, struct socket *sock, if (xs->rx && !xskq_empty_desc(xs->rx)) mask |= POLLIN | POLLRDNORM; + if (xs->tx && !xskq_full_desc(xs->tx)) + mask |= POLLOUT | POLLWRNORM; return mask; } @@ -270,6 +376,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) xs->queue_id = sxdp->sxdp_queue_id; xskq_set_umem(xs->rx, &xs->umem->props); + xskq_set_umem(xs->tx, &xs->umem->props); out_unlock: if (err) @@ -383,8 +490,6 @@ static int xsk_mmap(struct file *file, struct socket *sock, q = xs->umem->fq; else if (offset == XDP_UMEM_PGOFF_COMPLETION_RING) q = xs->umem->cq; - else - return -EINVAL; } if (!q) @@ -420,7 +525,7 @@ static const struct proto_ops xsk_proto_ops = { .shutdown = sock_no_shutdown, .setsockopt = xsk_setsockopt, .getsockopt = sock_no_getsockopt, - .sendmsg = sock_no_sendmsg, + .sendmsg = xsk_sendmsg, .recvmsg = sock_no_recvmsg, .mmap = xsk_mmap, .sendpage = sock_no_sendpage, diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 0a9b92b4f93a..3497e8808608 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -111,7 +111,93 @@ static inline void xskq_discard_id(struct xsk_queue *q) (void)xskq_validate_id(q); } -/* Rx queue */ +static inline int xskq_produce_id(struct xsk_queue *q, u32 id) +{ + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; + + ring->desc[q->prod_tail++ & q->ring_mask] = id; + + /* Order producer and data */ + smp_wmb(); + + WRITE_ONCE(q->ring->producer, q->prod_tail); + return 0; +} + +static inline int xskq_reserve_id(struct xsk_queue *q) +{ + if (xskq_nb_free(q, q->prod_head, 1) == 0) + return -ENOSPC; + + q->prod_head++; + return 0; +} + +/* Rx/Tx queue */ + +static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d) +{ + u32 buff_len; + + if (unlikely(d->idx >= q->umem_props.nframes)) { + q->invalid_descs++; + return false; + } + + buff_len = q->umem_props.frame_size; + if (unlikely(d->len > buff_len || d->len == 0 || + d->offset > buff_len || d->offset + d->len > buff_len)) { + q->invalid_descs++; + return false; + } + + return true; +} + +static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q, + struct xdp_desc *desc) +{ + while (q->cons_tail != q->cons_head) { + struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; + unsigned int idx = q->cons_tail & q->ring_mask; + + if (xskq_is_valid_desc(q, &ring->desc[idx])) { + if (desc) + *desc = ring->desc[idx]; + return desc; + } + + q->cons_tail++; + } + + return NULL; +} + +static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q, + struct xdp_desc *desc) +{ + struct xdp_rxtx_ring *ring; + + if (q->cons_tail == q->cons_head) { + WRITE_ONCE(q->ring->consumer, q->cons_tail); + q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); + + /* Order consumer and data */ + smp_rmb(); + + return xskq_validate_desc(q, desc); + } + + ring = (struct xdp_rxtx_ring *)q->ring; + *desc = ring->desc[q->cons_tail & q->ring_mask]; + return desc; +} + +static inline void xskq_discard_desc(struct xsk_queue *q) +{ + q->cons_tail++; + (void)xskq_validate_desc(q, NULL); +} static inline int xskq_produce_batch_desc(struct xsk_queue *q, u32 id, u32 len, u16 offset) @@ -139,6 +225,11 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q) WRITE_ONCE(q->ring->producer, q->prod_tail); } +static inline bool xskq_full_desc(struct xsk_queue *q) +{ + return (xskq_nb_avail(q, q->nentries) == q->nentries); +} + static inline bool xskq_empty_desc(struct xsk_queue *q) { return (xskq_nb_free(q, q->prod_tail, 1) == q->nentries); -- cgit v1.2.3