diff options
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- | net/tipc/link.c | 673 |
1 files changed, 301 insertions, 372 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c index eaccf4552d15..55b675d20de8 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -77,6 +77,10 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = { }; /* + * Interval between NACKs when packets arrive out of order + */ +#define TIPC_NACK_INTV (TIPC_MIN_LINK_WIN * 2) +/* * Out-of-range value for link session numbers */ #define WILDCARD_SESSION 0x10000 @@ -123,22 +127,19 @@ static int link_establishing(struct tipc_link *l) return l->state == TIPC_LINK_ESTABLISHING; } -static void link_handle_out_of_seq_msg(struct tipc_link *link, - struct sk_buff *skb); -static void tipc_link_proto_rcv(struct tipc_link *link, - struct sk_buff *skb); -static void link_state_event(struct tipc_link *l_ptr, u32 event); +static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq); static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, u16 rcvgap, int tolerance, int priority, struct sk_buff_head *xmitq); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static void tipc_link_sync_xmit(struct tipc_link *l); +static void tipc_link_build_bcast_sync_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb); static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb); -static void link_activate(struct tipc_link *link); /* * Simple link routines @@ -283,6 +284,26 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id) rcu_read_unlock(); } +/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + */ +static void tipc_link_build_bcast_sync_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb; + struct sk_buff_head list; + + skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, + 0, l->addr, link_own_addr(l), 0, 0, 0); + if (!skb) + return; + __skb_queue_head_init(&list); + __skb_queue_tail(&list, skb); + tipc_link_xmit(l, &list, xmitq); +} + /** * tipc_link_fsm_evt - link finite state machine * @l: pointer to link @@ -295,12 +316,13 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt, int mtyp = 0, rc = 0; struct tipc_link *pl; enum { - LINK_RESET = 1, - LINK_ACTIVATE = (1 << 1), - SND_PROBE = (1 << 2), - SND_STATE = (1 << 3), - SND_RESET = (1 << 4), - SND_ACTIVATE = (1 << 5) + LINK_RESET = 1, + LINK_ACTIVATE = (1 << 1), + SND_PROBE = (1 << 2), + SND_STATE = (1 << 3), + SND_RESET = (1 << 4), + SND_ACTIVATE = (1 << 5), + SND_BCAST_SYNC = (1 << 6) } actions = 0; if (l->exec_mode == TIPC_LINK_BLOCKED) @@ -352,8 +374,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt, if (pl && link_probing(pl)) break; actions |= LINK_ACTIVATE; - if (l->owner->working_links == 1) - tipc_link_sync_xmit(l); + if (!l->owner->working_links) + actions |= SND_BCAST_SYNC; break; case PEER_RESET_EVT: l->state = TIPC_LINK_ESTABLISHING; @@ -374,8 +396,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt, if (pl && link_probing(pl)) break; actions |= LINK_ACTIVATE; - if (l->owner->working_links == 1) - tipc_link_sync_xmit(l); + if (!l->owner->working_links) + actions |= SND_BCAST_SYNC; break; case PEER_RESET_EVT: break; @@ -408,6 +430,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt, if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE)) tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE, 0, 0, 0, xmitq); + if (actions & SND_BCAST_SYNC) + tipc_link_build_bcast_sync_msg(l, xmitq); return rc; } @@ -605,12 +629,14 @@ void tipc_link_reset(struct tipc_link *l_ptr) l_ptr->reasm_buf = NULL; l_ptr->rcv_unacked = 0; l_ptr->snd_nxt = 1; + l_ptr->rcv_nxt = 1; l_ptr->silent_intv_cnt = 0; + l_ptr->stats.recv_info = 0; l_ptr->stale_count = 0; link_reset_statistics(l_ptr); } -static void link_activate(struct tipc_link *link) +void tipc_link_activate(struct tipc_link *link) { struct tipc_node *node = link->owner; @@ -624,36 +650,6 @@ static void link_activate(struct tipc_link *link) } /** - * link_state_event - link finite state machine - * @l_ptr: pointer to link - * @event: state machine event to process - */ -static void link_state_event(struct tipc_link *l, unsigned int evt) -{ - int rc; - struct sk_buff_head xmitq; - struct sk_buff *skb; - - if (l->exec_mode == TIPC_LINK_BLOCKED) - return; - - __skb_queue_head_init(&xmitq); - - rc = tipc_link_fsm_evt(l, evt, &xmitq); - - if (rc & TIPC_LINK_UP_EVT) - link_activate(l); - - if (rc & TIPC_LINK_DOWN_EVT) - tipc_link_reset(l); - - skb = __skb_dequeue(&xmitq); - if (!skb) - return; - tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr); -} - -/** * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked * @link: link to use * @list: chain of buffers containing message @@ -808,30 +804,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb) } /* - * tipc_link_sync_xmit - synchronize broadcast link endpoints. - * - * Give a newly added peer node the sequence number where it should - * start receiving and acking broadcast packets. - * - * Called with node locked - */ -static void tipc_link_sync_xmit(struct tipc_link *link) -{ - struct sk_buff *skb; - struct tipc_msg *msg; - - skb = tipc_buf_acquire(INT_H_SIZE); - if (!skb) - return; - - msg = buf_msg(skb); - tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG, - INT_H_SIZE, link->addr); - msg_set_last_bcast(msg, link->owner->bclink.acked); - __tipc_link_xmit_skb(link, skb); -} - -/* * tipc_link_sync_rcv - synchronize broadcast link endpoints. * Receive the sequence number where we should start receiving and * acking broadcast packets from a newly added peer node, and open @@ -881,6 +853,34 @@ void tipc_link_push_packets(struct tipc_link *link) link->snd_nxt = seqno; } +void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) +{ + struct sk_buff *skb, *_skb; + struct tipc_msg *hdr; + u16 seqno = l->snd_nxt; + u16 ack = l->rcv_nxt - 1; + + while (skb_queue_len(&l->transmq) < l->window) { + skb = skb_peek(&l->backlogq); + if (!skb) + break; + _skb = skb_clone(skb, GFP_ATOMIC); + if (!_skb) + break; + __skb_dequeue(&l->backlogq); + hdr = buf_msg(skb); + l->backlog[msg_importance(hdr)].len--; + __skb_queue_tail(&l->transmq, skb); + __skb_queue_tail(xmitq, _skb); + msg_set_ack(hdr, ack); + msg_set_seqno(hdr, seqno); + msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + l->rcv_unacked = 0; + seqno++; + } + l->snd_nxt = seqno; +} + void tipc_link_reset_all(struct tipc_node *node) { char addr_string[16]; @@ -978,6 +978,41 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, } } +static int tipc_link_retransm(struct tipc_link *l, int retransm, + struct sk_buff_head *xmitq) +{ + struct sk_buff *_skb, *skb = skb_peek(&l->transmq); + struct tipc_msg *hdr; + + if (!skb) + return 0; + + /* Detect repeated retransmit failures on same packet */ + if (likely(l->last_retransm != buf_seqno(skb))) { + l->last_retransm = buf_seqno(skb); + l->stale_count = 1; + } else if (++l->stale_count > 100) { + link_retransmit_failure(l, skb); + return TIPC_LINK_DOWN_EVT; + } + skb_queue_walk(&l->transmq, skb) { + if (!retransm) + return 0; + hdr = buf_msg(skb); + _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); + if (!_skb) + return 0; + hdr = buf_msg(_skb); + msg_set_ack(hdr, l->rcv_nxt - 1); + msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + _skb->priority = TC_PRIO_CONTROL; + __skb_queue_tail(xmitq, _skb); + retransm--; + l->stats.retransmitted++; + } + return 0; +} + /* link_synch(): check if all packets arrived before the synch * point have been consumed * Returns true if the parallel links are synched, otherwise false @@ -1004,155 +1039,6 @@ synched: return true; } -static void link_retrieve_defq(struct tipc_link *link, - struct sk_buff_head *list) -{ - u16 seq_no; - - if (skb_queue_empty(&link->deferdq)) - return; - - seq_no = buf_seqno(skb_peek(&link->deferdq)); - if (seq_no == link->rcv_nxt) - skb_queue_splice_tail_init(&link->deferdq, list); -} - -/** - * tipc_rcv - process TIPC packets/messages arriving from off-node - * @net: the applicable net namespace - * @skb: TIPC packet - * @b_ptr: pointer to bearer message arrived on - * - * Invoked with no locks held. Bearer pointer must point to a valid bearer - * structure (i.e. cannot be NULL), but bearer can be inactive. - */ -void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct sk_buff_head head; - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; - struct sk_buff *skb1, *tmp; - struct tipc_msg *msg; - u16 seq_no; - u16 ackd; - u32 released; - - skb2list(skb, &head); - - while ((skb = __skb_dequeue(&head))) { - /* Ensure message is well-formed */ - if (unlikely(!tipc_msg_validate(skb))) - goto discard; - - /* Handle arrival of a non-unicast link message */ - msg = buf_msg(skb); - if (unlikely(msg_non_seq(msg))) { - if (msg_user(msg) == LINK_CONFIG) - tipc_disc_rcv(net, skb, b_ptr); - else - tipc_bclink_rcv(net, skb); - continue; - } - - /* Discard unicast link messages destined for another node */ - if (unlikely(!msg_short(msg) && - (msg_destnode(msg) != tn->own_addr))) - goto discard; - - /* Locate neighboring node that sent message */ - n_ptr = tipc_node_find(net, msg_prevnode(msg)); - if (unlikely(!n_ptr)) - goto discard; - - tipc_node_lock(n_ptr); - /* Locate unicast link endpoint that should handle message */ - l_ptr = n_ptr->links[b_ptr->identity].link; - if (unlikely(!l_ptr)) - goto unlock; - - /* Is reception of this pkt permitted at the moment ? */ - if (!tipc_node_filter_skb(n_ptr, msg)) - goto unlock; - - /* Validate message sequence number info */ - seq_no = msg_seqno(msg); - ackd = msg_ack(msg); - - /* Release acked messages */ - if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg))) - tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); - - released = 0; - skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) { - if (more(buf_seqno(skb1), ackd)) - break; - __skb_unlink(skb1, &l_ptr->transmq); - kfree_skb(skb1); - released = 1; - } - - /* Try sending any messages link endpoint has pending */ - if (unlikely(skb_queue_len(&l_ptr->backlogq))) - tipc_link_push_packets(l_ptr); - - if (released && !skb_queue_empty(&l_ptr->wakeupq)) - link_prepare_wakeup(l_ptr); - - /* Process the incoming packet */ - if (unlikely(!link_working(l_ptr))) { - if (msg_user(msg) == LINK_PROTOCOL) { - tipc_link_proto_rcv(l_ptr, skb); - link_retrieve_defq(l_ptr, &head); - skb = NULL; - goto unlock; - } - - /* Traffic message. Conditionally activate link */ - link_state_event(l_ptr, TRAFFIC_EVT); - - if (link_working(l_ptr)) { - /* Re-insert buffer in front of queue */ - __skb_queue_head(&head, skb); - skb = NULL; - goto unlock; - } - goto unlock; - } - - /* Link is now in state TIPC_LINK_WORKING */ - if (unlikely(seq_no != l_ptr->rcv_nxt)) { - link_handle_out_of_seq_msg(l_ptr, skb); - link_retrieve_defq(l_ptr, &head); - skb = NULL; - goto unlock; - } - l_ptr->silent_intv_cnt = 0; - - /* Synchronize with parallel link if applicable */ - if (unlikely((l_ptr->exec_mode == TIPC_LINK_TUNNEL) && - !msg_dup(msg))) { - if (!link_synch(l_ptr)) - goto unlock; - } - l_ptr->rcv_nxt++; - if (unlikely(!skb_queue_empty(&l_ptr->deferdq))) - link_retrieve_defq(l_ptr, &head); - if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) { - l_ptr->stats.sent_acks++; - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0); - } - tipc_link_input(l_ptr, skb); - skb = NULL; -unlock: - tipc_node_unlock(n_ptr); - tipc_node_put(n_ptr); -discard: - if (unlikely(skb)) - kfree_skb(skb); - } -} - /* tipc_data_input - deliver data and name distr msgs to upper layer * * Consumes buffer if message is of right type @@ -1206,9 +1092,6 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb) struct sk_buff *iskb; int pos = 0; - if (likely(tipc_data_input(link, skb))) - return; - switch (msg_user(msg)) { case TUNNEL_PROTOCOL: if (msg_dup(msg)) { @@ -1247,6 +1130,110 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb) }; } +static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) +{ + bool released = false; + struct sk_buff *skb, *tmp; + + skb_queue_walk_safe(&l->transmq, skb, tmp) { + if (more(buf_seqno(skb), acked)) + break; + __skb_unlink(skb, &l->transmq); + kfree_skb(skb); + released = true; + } + return released; +} + +/* tipc_link_rcv - process TIPC packets/messages arriving from off-node + * @link: the link that should handle the message + * @skb: TIPC packet + * @xmitq: queue to place packets to be sent after this call + */ +int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct sk_buff_head *arrvq = &l->deferdq; + struct sk_buff *tmp; + struct tipc_msg *hdr; + u16 seqno, rcv_nxt; + int rc = 0; + + if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) { + if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV)) + tipc_link_build_proto_msg(l, STATE_MSG, 0, + 0, 0, 0, xmitq); + return rc; + } + + skb_queue_walk_safe(arrvq, skb, tmp) { + hdr = buf_msg(skb); + + /* Verify and update link state */ + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) { + __skb_dequeue(arrvq); + rc |= tipc_link_proto_rcv(l, skb, xmitq); + continue; + } + + if (unlikely(!link_working(l))) { + rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq); + if (!link_working(l)) { + kfree_skb(__skb_dequeue(arrvq)); + return rc; + } + } + + l->silent_intv_cnt = 0; + + /* Forward queues and wake up waiting users */ + if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) { + tipc_link_advance_backlog(l, xmitq); + if (unlikely(!skb_queue_empty(&l->wakeupq))) + link_prepare_wakeup(l); + } + + /* Defer reception if there is a gap in the sequence */ + seqno = msg_seqno(hdr); + rcv_nxt = l->rcv_nxt; + if (unlikely(less(rcv_nxt, seqno))) { + l->stats.deferred_recv++; + return rc; + } + + __skb_dequeue(arrvq); + + /* Drop if packet already received */ + if (unlikely(more(rcv_nxt, seqno))) { + l->stats.duplicates++; + kfree_skb(skb); + return rc; + } + + /* Synchronize with parallel link if applicable */ + if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL)) + if (!msg_dup(hdr) && !link_synch(l)) { + kfree_skb(skb); + return rc; + } + + /* Packet can be delivered */ + l->rcv_nxt++; + l->stats.recv_info++; + if (unlikely(!tipc_data_input(l, skb))) + tipc_link_input(l, skb); + + /* Ack at regular intervals */ + if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { + l->rcv_unacked = 0; + l->stats.sent_acks++; + tipc_link_build_proto_msg(l, STATE_MSG, + 0, 0, 0, 0, xmitq); + } + } + return rc; +} + /** * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue * @@ -1287,41 +1274,6 @@ u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb) } /* - * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet - */ -static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, - struct sk_buff *buf) -{ - u32 seq_no = buf_seqno(buf); - - if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { - tipc_link_proto_rcv(l_ptr, buf); - return; - } - - /* Record OOS packet arrival */ - l_ptr->silent_intv_cnt = 0; - - /* - * Discard packet if a duplicate; otherwise add it to deferred queue - * and notify peer of gap as per protocol specification - */ - if (less(seq_no, l_ptr->rcv_nxt)) { - l_ptr->stats.duplicates++; - kfree_skb(buf); - return; - } - - if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) { - l_ptr->stats.deferred_recv++; - if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1) - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0); - } else { - l_ptr->stats.duplicates++; - } -} - -/* * Send protocol message to the other endpoint. */ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, @@ -1341,119 +1293,6 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, kfree_skb(skb); } -/* - * Receive protocol message : - * Note that network plane id propagates through the network, and may - * change at any time. The node with lowest address rules - */ -static void tipc_link_proto_rcv(struct tipc_link *l_ptr, - struct sk_buff *buf) -{ - u32 rec_gap = 0; - u32 msg_tol; - struct tipc_msg *msg = buf_msg(buf); - - if (l_ptr->exec_mode == TIPC_LINK_BLOCKED) - goto exit; - - if (l_ptr->net_plane != msg_net_plane(msg)) - if (link_own_addr(l_ptr) > msg_prevnode(msg)) - l_ptr->net_plane = msg_net_plane(msg); - - switch (msg_type(msg)) { - - case RESET_MSG: - if (!link_probing(l_ptr) && - (l_ptr->peer_session != WILDCARD_SESSION)) { - if (less_eq(msg_session(msg), l_ptr->peer_session)) - break; /* duplicate or old reset: ignore */ - } - link_state_event(l_ptr, RESET_MSG); - - /* fall thru' */ - case ACTIVATE_MSG: - /* Update link settings according other endpoint's values */ - strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg)); - - msg_tol = msg_link_tolerance(msg); - if (msg_tol > l_ptr->tolerance) - l_ptr->tolerance = msg_tol; - - if (msg_linkprio(msg) > l_ptr->priority) - l_ptr->priority = msg_linkprio(msg); - - if (l_ptr->mtu > msg_max_pkt(msg)) - l_ptr->mtu = msg_max_pkt(msg); - - /* Synchronize broadcast link info, if not done previously */ - if (!tipc_node_is_up(l_ptr->owner)) { - l_ptr->owner->bclink.last_sent = - l_ptr->owner->bclink.last_in = - msg_last_bcast(msg); - l_ptr->owner->bclink.oos_state = 0; - } - - l_ptr->peer_session = msg_session(msg); - l_ptr->peer_bearer_id = msg_bearer_id(msg); - - if (!msg_peer_is_up(msg)) - tipc_node_fsm_evt(l_ptr->owner, PEER_LOST_CONTACT_EVT); - if (msg_type(msg) == ACTIVATE_MSG) - link_state_event(l_ptr, ACTIVATE_MSG); - break; - case STATE_MSG: - - msg_tol = msg_link_tolerance(msg); - if (msg_tol) - l_ptr->tolerance = msg_tol; - - if (msg_linkprio(msg) && - (msg_linkprio(msg) != l_ptr->priority)) { - pr_info("%s<%s>, priority change %u->%u\n", - link_rst_msg, l_ptr->name, - l_ptr->priority, msg_linkprio(msg)); - l_ptr->priority = msg_linkprio(msg); - tipc_link_reset(l_ptr); - break; - } - - /* Record reception; force mismatch at next timeout: */ - l_ptr->silent_intv_cnt = 0; - - link_state_event(l_ptr, TRAFFIC_EVT); - l_ptr->stats.recv_states++; - if (link_resetting(l_ptr)) - break; - - if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg))) - rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt); - - if (msg_probe(msg)) - l_ptr->stats.recv_probes++; - - /* Protocol message before retransmits, reduce loss risk */ - if (l_ptr->owner->bclink.recv_permitted) - tipc_bclink_update_link_state(l_ptr->owner, - msg_last_bcast(msg)); - - if (rec_gap || (msg_probe(msg))) - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, - rec_gap, 0, 0); - - if (msg_seq_gap(msg)) { - l_ptr->stats.recv_nacks++; - tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq), - msg_seq_gap(msg)); - } - if (tipc_link_is_up(l_ptr)) - tipc_node_fsm_evt(l_ptr->owner, - PEER_ESTABL_CONTACT_EVT); - break; - } -exit: - kfree_skb(buf); -} - /* tipc_link_build_proto_msg: prepare link protocol message for transmission */ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, @@ -1727,6 +1566,96 @@ exit: return *skb; } +/* tipc_link_proto_rcv(): receive link level protocol message : + * Note that network plane id propagates through the network, and may + * change at any time. The node with lowest numerical id determines + * network plane + */ +static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + u16 rcvgap = 0; + u16 nacked_gap = msg_seq_gap(hdr); + u16 peers_snd_nxt = msg_next_sent(hdr); + u16 peers_tol = msg_link_tolerance(hdr); + u16 peers_prio = msg_linkprio(hdr); + char *if_name; + int rc = 0; + + if (l->exec_mode == TIPC_LINK_BLOCKED) + goto exit; + + if (link_own_addr(l) > msg_prevnode(hdr)) + l->net_plane = msg_net_plane(hdr); + + switch (msg_type(hdr)) { + case RESET_MSG: + + /* Ignore duplicate RESET with old session number */ + if ((less_eq(msg_session(hdr), l->peer_session)) && + (l->peer_session != WILDCARD_SESSION)) + break; + /* fall thru' */ + case ACTIVATE_MSG: + + /* Complete own link name with peer's interface name */ + if_name = strrchr(l->name, ':') + 1; + if (sizeof(l->name) - (if_name - l->name) <= TIPC_MAX_IF_NAME) + break; + if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME) + break; + strncpy(if_name, msg_data(hdr), TIPC_MAX_IF_NAME); + + /* Update own tolerance if peer indicates a non-zero value */ + if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL)) + l->tolerance = peers_tol; + + /* Update own priority if peer's priority is higher */ + if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI)) + l->priority = peers_prio; + + l->peer_session = msg_session(hdr); + l->peer_bearer_id = msg_bearer_id(hdr); + rc = tipc_link_fsm_evt(l, msg_type(hdr), xmitq); + if (l->mtu > msg_max_pkt(hdr)) + l->mtu = msg_max_pkt(hdr); + break; + case STATE_MSG: + /* Update own tolerance if peer indicates a non-zero value */ + if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL)) + l->tolerance = peers_tol; + + l->silent_intv_cnt = 0; + l->stats.recv_states++; + if (msg_probe(hdr)) + l->stats.recv_probes++; + rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq); + if (!tipc_link_is_up(l)) + break; + + /* Has peer sent packets we haven't received yet ? */ + if (more(peers_snd_nxt, l->rcv_nxt)) + rcvgap = peers_snd_nxt - l->rcv_nxt; + if (rcvgap || (msg_probe(hdr))) + tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, + 0, l->mtu, xmitq); + tipc_link_release_pkts(l, msg_ack(hdr)); + + /* If NACK, retransmit will now start at right position */ + if (nacked_gap) { + rc |= tipc_link_retransm(l, nacked_gap, xmitq); + l->stats.recv_nacks++; + } + tipc_link_advance_backlog(l, xmitq); + if (unlikely(!skb_queue_empty(&l->wakeupq))) + link_prepare_wakeup(l); + } +exit: + kfree_skb(skb); + return rc; +} + void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) { int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); |