From a6ca109443842e7251c68451f8137ae68ae6d8a6 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Wed, 26 Nov 2014 11:41:55 +0800 Subject: tipc: use generic SKB list APIs to manage TIPC outgoing packet chains Use standard SKB list APIs associated with struct sk_buff_head to manage socket outgoing packet chain and name table outgoing packet chain, having relevant code simpler and more readable. Signed-off-by: Ying Xue Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 98 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 62 insertions(+), 36 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 0e04508cdba4..34bf15c90c78 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -664,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) * - For all other messages we discard the buffer and return -EHOSTUNREACH * - For TIPC internal messages we also reset the link */ -static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) +static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list) { - struct tipc_msg *msg = buf_msg(buf); + struct sk_buff *skb = skb_peek(list); + struct tipc_msg *msg = buf_msg(skb); uint imp = tipc_msg_tot_importance(msg); u32 oport = msg_tot_origport(msg); @@ -679,28 +680,29 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) goto drop; if (unlikely(msg_reroute_cnt(msg))) goto drop; - if (TIPC_SKB_CB(buf)->wakeup_pending) + if (TIPC_SKB_CB(skb)->wakeup_pending) return -ELINKCONG; - if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp)) + if (link_schedule_user(link, oport, skb_queue_len(list), imp)) return -ELINKCONG; drop: - kfree_skb_list(buf); + __skb_queue_purge(list); return -EHOSTUNREACH; } /** * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked * @link: link to use - * @skb: chain of buffers containing message + * @list: chain of buffers containing message + * * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket * user data messages) or -EHOSTUNREACH (all other messages/senders) * Only the socket functions tipc_send_stream() and tipc_send_packet() need * to act on the return value, since they may need to do more send attempts. */ -int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb) +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list) { - struct tipc_msg *msg = buf_msg(skb); + struct tipc_msg *msg = buf_msg(skb_peek(list)); uint psz = msg_size(msg); uint sndlim = link->queue_limit[0]; uint imp = tipc_msg_tot_importance(msg); @@ -710,21 +712,21 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb) uint bc_last_in = link->owner->bclink.last_in; struct tipc_media_addr *addr = &link->media_addr; struct sk_buff_head *outqueue = &link->outqueue; - struct sk_buff *next; + struct sk_buff *skb, *tmp; /* Match queue limits against msg importance: */ if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp])) - return tipc_link_cong(link, skb); + return tipc_link_cong(link, list); /* Has valid packet limit been used ? */ if (unlikely(psz > mtu)) { - kfree_skb_list(skb); + __skb_queue_purge(list); return -EMSGSIZE; } /* Prepare each packet for sending, and add to outqueue: */ - while (skb) { - next = skb->next; + skb_queue_walk_safe(list, skb, tmp) { + __skb_unlink(skb, list); msg = buf_msg(skb); msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); msg_set_bcast_ack(msg, bc_last_in); @@ -736,7 +738,6 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb) link->unacked_window = 0; } else if (tipc_msg_bundle(outqueue, skb, mtu)) { link->stats.sent_bundled++; - skb = next; continue; } else if (tipc_msg_make_bundle(outqueue, skb, mtu, link->addr)) { @@ -750,22 +751,43 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb) link->next_out = skb; } seqno++; - skb = next; } link->next_out_no = seqno; return 0; } +static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) +{ + __skb_queue_head_init(list); + __skb_queue_tail(list, skb); +} + +static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb) +{ + struct sk_buff_head head; + + skb2list(skb, &head); + return __tipc_link_xmit(link, &head); +} + +int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector) +{ + struct sk_buff_head head; + + skb2list(skb, &head); + return tipc_link_xmit(&head, dnode, selector); +} + /** * tipc_link_xmit() is the general link level function for message sending - * @buf: chain of buffers containing message + * @list: chain of buffers containing message * @dsz: amount of user data to be sent * @dnode: address of destination node * @selector: a number used for deterministic link selection * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) +int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector) { struct tipc_link *link = NULL; struct tipc_node *node; @@ -776,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) tipc_node_lock(node); link = node->active_links[selector & 1]; if (link) - rc = __tipc_link_xmit(link, buf); + rc = __tipc_link_xmit(link, list); tipc_node_unlock(node); } if (link) return rc; - if (likely(in_own_node(dnode))) - return tipc_sk_rcv(buf); + if (likely(in_own_node(dnode))) { + /* As a node local message chain never contains more than one + * buffer, we just need to dequeue one SKB buffer from the + * head list. + */ + return tipc_sk_rcv(__skb_dequeue(list)); + } + __skb_queue_purge(list); - kfree_skb_list(buf); return rc; } @@ -800,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) */ static void tipc_link_sync_xmit(struct tipc_link *link) { - struct sk_buff *buf; + struct sk_buff *skb; struct tipc_msg *msg; - buf = tipc_buf_acquire(INT_H_SIZE); - if (!buf) + skb = tipc_buf_acquire(INT_H_SIZE); + if (!skb) return; - msg = buf_msg(buf); + msg = buf_msg(skb); tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr); msg_set_last_bcast(msg, link->owner->bclink.acked); - __tipc_link_xmit(link, buf); + __tipc_link_xmit_skb(link, skb); } /* @@ -1053,8 +1080,7 @@ void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr) u32 ackd; u32 released; - __skb_queue_head_init(&head); - __skb_queue_tail(&head, skb); + skb2list(skb, &head); while ((skb = __skb_dequeue(&head))) { /* Ensure message is well-formed */ @@ -1573,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, u32 selector) { struct tipc_link *tunnel; - struct sk_buff *buf; + struct sk_buff *skb; u32 length = msg_size(msg); tunnel = l_ptr->owner->active_links[selector & 1]; @@ -1582,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, return; } msg_set_size(tunnel_hdr, length + INT_H_SIZE); - buf = tipc_buf_acquire(length + INT_H_SIZE); - if (!buf) { + skb = tipc_buf_acquire(length + INT_H_SIZE); + if (!skb) { pr_warn("%sunable to send tunnel msg\n", link_co_err); return; } - skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); - __tipc_link_xmit(tunnel, buf); + skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length); + __tipc_link_xmit_skb(tunnel, skb); } @@ -1620,7 +1646,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) if (skb) { skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); msg_set_size(&tunnel_hdr, INT_H_SIZE); - __tipc_link_xmit(tunnel, skb); + __tipc_link_xmit_skb(tunnel, skb); } else { pr_warn("%sunable to send changeover msg\n", link_co_err); @@ -1691,7 +1717,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE); skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data, length); - __tipc_link_xmit(tunnel, outskb); + __tipc_link_xmit_skb(tunnel, outskb); if (!tipc_link_is_up(l_ptr)) return; } -- cgit v1.2.3