diff options
-rw-r--r-- | net/rxrpc/ar-internal.h | 25 | ||||
-rw-r--r-- | net/rxrpc/call_event.c | 18 | ||||
-rw-r--r-- | net/rxrpc/conn_event.c | 113 | ||||
-rw-r--r-- | net/rxrpc/conn_object.c | 52 | ||||
-rw-r--r-- | net/rxrpc/input.c | 68 | ||||
-rw-r--r-- | net/rxrpc/proc.c | 2 | ||||
-rw-r--r-- | net/rxrpc/skbuff.c | 10 |
7 files changed, 226 insertions, 62 deletions
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index 8cb517fbbd23..7296039c537a 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -295,7 +295,12 @@ struct rxrpc_connection { u32 call_id; /* ID of current call */ u32 call_counter; /* Call ID counter */ u32 last_call; /* ID of last call */ - u32 last_result; /* Result of last call (0/abort) */ + u8 last_type; /* Type of last packet */ + u16 last_service_id; + union { + u32 last_seq; + u32 last_abort; + }; } channels[RXRPC_MAXCALLS]; wait_queue_head_t channel_wq; /* queue to wait for channel to become available */ @@ -313,7 +318,7 @@ struct rxrpc_connection { struct rxrpc_crypt csum_iv; /* packet checksum base */ unsigned long flags; unsigned long events; - unsigned long put_time; /* Time at which last put */ + unsigned long idle_timestamp; /* Time at which last became idle */ spinlock_t state_lock; /* state-change lock */ atomic_t usage; enum rxrpc_conn_proto_state state : 8; /* current state of connection */ @@ -322,7 +327,7 @@ struct rxrpc_connection { int error; /* local error incurred */ int debug_id; /* debug ID for printks */ atomic_t serial; /* packet serial number counter */ - atomic_t hi_serial; /* highest serial number received */ + unsigned int hi_serial; /* highest serial number received */ atomic_t avail_chans; /* number of channels available */ u8 size_align; /* data size alignment (for security) */ u8 header_size; /* rxrpc + security header size */ @@ -457,6 +462,7 @@ struct rxrpc_call { rxrpc_seq_t ackr_win_top; /* top of ACK window (rx_data_eaten is bottom) */ rxrpc_seq_t ackr_prev_seq; /* previous sequence number received */ u8 ackr_reason; /* reason to ACK */ + u16 ackr_skew; /* skew on packet being ACK'd */ rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */ atomic_t ackr_not_idle; /* number of packets in Rx queue */ @@ -499,8 +505,8 @@ int rxrpc_reject_call(struct rxrpc_sock *); /* * call_event.c */ -void __rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool); -void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool); +void __rxrpc_propose_ACK(struct rxrpc_call *, u8, u16, u32, bool); +void rxrpc_propose_ACK(struct rxrpc_call *, u8, u16, u32, bool); void rxrpc_process_call(struct work_struct *); /* @@ -565,7 +571,7 @@ struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *, struct sk_buff *); void __rxrpc_disconnect_call(struct rxrpc_call *); void rxrpc_disconnect_call(struct rxrpc_call *); -void rxrpc_put_connection(struct rxrpc_connection *); +void __rxrpc_put_connection(struct rxrpc_connection *); void __exit rxrpc_destroy_all_connections(void); static inline bool rxrpc_conn_is_client(const struct rxrpc_connection *conn) @@ -589,6 +595,13 @@ struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *con return atomic_inc_not_zero(&conn->usage) ? conn : NULL; } +static inline void rxrpc_put_connection(struct rxrpc_connection *conn) +{ + if (conn && atomic_dec_return(&conn->usage) == 1) + __rxrpc_put_connection(conn); +} + + static inline bool rxrpc_queue_conn(struct rxrpc_connection *conn) { if (!rxrpc_get_connection_maybe(conn)) diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c index 3d1267cea9ea..3d1961d82325 100644 --- a/net/rxrpc/call_event.c +++ b/net/rxrpc/call_event.c @@ -25,7 +25,7 @@ * propose an ACK be sent */ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, - u32 serial, bool immediate) + u16 skew, u32 serial, bool immediate) { unsigned long expiry; s8 prior = rxrpc_ack_priority[ack_reason]; @@ -44,8 +44,10 @@ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial * numbers */ if (prior == rxrpc_ack_priority[call->ackr_reason]) { - if (prior <= 4) + if (prior <= 4) { + call->ackr_skew = skew; call->ackr_serial = serial; + } if (immediate) goto cancel_timer; return; @@ -103,13 +105,13 @@ cancel_timer: * propose an ACK be sent, locking the call structure */ void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, - u32 serial, bool immediate) + u16 skew, u32 serial, bool immediate) { s8 prior = rxrpc_ack_priority[ack_reason]; if (prior > rxrpc_ack_priority[call->ackr_reason]) { spin_lock_bh(&call->lock); - __rxrpc_propose_ACK(call, ack_reason, serial, immediate); + __rxrpc_propose_ACK(call, ack_reason, skew, serial, immediate); spin_unlock_bh(&call->lock); } } @@ -628,7 +630,7 @@ process_further: if (ack.reason == RXRPC_ACK_PING) { _proto("Rx ACK %%%u PING Request", latest); rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE, - sp->hdr.serial, true); + skb->priority, sp->hdr.serial, true); } /* discard any out-of-order or duplicate ACKs */ @@ -1153,8 +1155,7 @@ skip_msg_init: goto maybe_reschedule; send_ACK_with_skew: - ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) - - ntohl(ack.serial)); + ack.maxSkew = htons(call->ackr_skew); send_ACK: mtu = call->conn->params.peer->if_mtu; mtu -= call->conn->params.peer->hdrsize; @@ -1244,7 +1245,8 @@ send_message_2: case RXRPC_CALL_SERVER_ACK_REQUEST: _debug("start ACK timer"); rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, - call->ackr_serial, false); + call->ackr_skew, call->ackr_serial, + false); default: break; } diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c index c631d926f4db..c1c6b7f305d1 100644 --- a/net/rxrpc/conn_event.c +++ b/net/rxrpc/conn_event.c @@ -25,6 +25,113 @@ #include "ar-internal.h" /* + * Retransmit terminal ACK or ABORT of the previous call. + */ +static void rxrpc_conn_retransmit(struct rxrpc_connection *conn, + struct sk_buff *skb) +{ + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct rxrpc_channel *chan; + struct msghdr msg; + struct kvec iov; + struct { + struct rxrpc_wire_header whdr; + union { + struct { + __be32 code; + } abort; + struct { + struct rxrpc_ackpacket ack; + struct rxrpc_ackinfo info; + }; + }; + } __attribute__((packed)) pkt; + size_t len; + u32 serial, mtu, call_id; + + _enter("%d", conn->debug_id); + + chan = &conn->channels[sp->hdr.cid & RXRPC_CHANNELMASK]; + + /* If the last call got moved on whilst we were waiting to run, just + * ignore this packet. + */ + call_id = READ_ONCE(chan->last_call); + /* Sync with __rxrpc_disconnect_call() */ + smp_rmb(); + if (call_id != sp->hdr.callNumber) + return; + + msg.msg_name = &conn->params.peer->srx.transport; + msg.msg_namelen = conn->params.peer->srx.transport_len; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + pkt.whdr.epoch = htonl(sp->hdr.epoch); + pkt.whdr.cid = htonl(sp->hdr.cid); + pkt.whdr.callNumber = htonl(sp->hdr.callNumber); + pkt.whdr.seq = 0; + pkt.whdr.type = chan->last_type; + pkt.whdr.flags = conn->out_clientflag; + pkt.whdr.userStatus = 0; + pkt.whdr.securityIndex = conn->security_ix; + pkt.whdr._rsvd = 0; + pkt.whdr.serviceId = htons(chan->last_service_id); + + len = sizeof(pkt.whdr); + switch (chan->last_type) { + case RXRPC_PACKET_TYPE_ABORT: + pkt.abort.code = htonl(chan->last_abort); + len += sizeof(pkt.abort); + break; + + case RXRPC_PACKET_TYPE_ACK: + mtu = conn->params.peer->if_mtu; + mtu -= conn->params.peer->hdrsize; + pkt.ack.bufferSpace = 0; + pkt.ack.maxSkew = htons(skb->priority); + pkt.ack.firstPacket = htonl(chan->last_seq); + pkt.ack.previousPacket = htonl(chan->last_seq - 1); + pkt.ack.serial = htonl(sp->hdr.serial); + pkt.ack.reason = RXRPC_ACK_DUPLICATE; + pkt.ack.nAcks = 0; + pkt.info.rxMTU = htonl(rxrpc_rx_mtu); + pkt.info.maxMTU = htonl(mtu); + pkt.info.rwind = htonl(rxrpc_rx_window_size); + pkt.info.jumbo_max = htonl(rxrpc_rx_jumbo_max); + len += sizeof(pkt.ack) + sizeof(pkt.info); + break; + } + + /* Resync with __rxrpc_disconnect_call() and check that the last call + * didn't get advanced whilst we were filling out the packets. + */ + smp_rmb(); + if (READ_ONCE(chan->last_call) != call_id) + return; + + iov.iov_base = &pkt; + iov.iov_len = len; + + serial = atomic_inc_return(&conn->serial); + pkt.whdr.serial = htonl(serial); + + switch (chan->last_type) { + case RXRPC_PACKET_TYPE_ABORT: + _proto("Tx ABORT %%%u { %d } [re]", serial, conn->local_abort); + break; + case RXRPC_PACKET_TYPE_ACK: + _proto("Tx ACK %%%u [re]", serial); + break; + } + + kernel_sendmsg(conn->params.local->socket, &msg, &iov, 1, len); + _leave(""); + return; +} + +/* * pass a connection-level abort onto all calls on that connection */ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state, @@ -166,6 +273,12 @@ static int rxrpc_process_event(struct rxrpc_connection *conn, _enter("{%d},{%u,%%%u},", conn->debug_id, sp->hdr.type, sp->hdr.serial); switch (sp->hdr.type) { + case RXRPC_PACKET_TYPE_DATA: + case RXRPC_PACKET_TYPE_ACK: + rxrpc_conn_retransmit(conn, skb); + rxrpc_free_skb(skb); + return 0; + case RXRPC_PACKET_TYPE_ABORT: if (skb_copy_bits(skb, 0, &wtmp, sizeof(wtmp)) < 0) return -EPROTO; diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c index 6a5a17efc538..b4af37ebb112 100644 --- a/net/rxrpc/conn_object.c +++ b/net/rxrpc/conn_object.c @@ -56,6 +56,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) atomic_set(&conn->avail_chans, RXRPC_MAXCALLS); conn->size_align = 4; conn->header_size = sizeof(struct rxrpc_wire_header); + conn->idle_timestamp = jiffies; } _leave(" = %p{%d}", conn, conn ? conn->debug_id : 0); @@ -165,7 +166,15 @@ void __rxrpc_disconnect_call(struct rxrpc_call *call) /* Save the result of the call so that we can repeat it if necessary * through the channel, whilst disposing of the actual call record. */ - chan->last_result = call->local_abort; + chan->last_service_id = call->service_id; + if (call->local_abort) { + chan->last_abort = call->local_abort; + chan->last_type = RXRPC_PACKET_TYPE_ABORT; + } else { + chan->last_seq = call->rx_data_eaten; + chan->last_type = RXRPC_PACKET_TYPE_ACK; + } + /* Sync with rxrpc_conn_retransmit(). */ smp_wmb(); chan->last_call = chan->call_id; chan->call_id = chan->call_counter; @@ -191,29 +200,16 @@ void rxrpc_disconnect_call(struct rxrpc_call *call) spin_unlock(&conn->channel_lock); call->conn = NULL; + conn->idle_timestamp = jiffies; rxrpc_put_connection(conn); } /* * release a virtual connection */ -void rxrpc_put_connection(struct rxrpc_connection *conn) +void __rxrpc_put_connection(struct rxrpc_connection *conn) { - if (!conn) - return; - - _enter("%p{u=%d,d=%d}", - conn, atomic_read(&conn->usage), conn->debug_id); - - ASSERTCMP(atomic_read(&conn->usage), >, 1); - - conn->put_time = ktime_get_seconds(); - if (atomic_dec_return(&conn->usage) == 1) { - _debug("zombie"); - rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0); - } - - _leave(""); + rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0); } /* @@ -248,14 +244,14 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu) static void rxrpc_connection_reaper(struct work_struct *work) { struct rxrpc_connection *conn, *_p; - unsigned long reap_older_than, earliest, put_time, now; + unsigned long reap_older_than, earliest, idle_timestamp, now; LIST_HEAD(graveyard); _enter(""); - now = ktime_get_seconds(); - reap_older_than = now - rxrpc_connection_expiry; + now = jiffies; + reap_older_than = now - rxrpc_connection_expiry * HZ; earliest = ULONG_MAX; write_lock(&rxrpc_connection_lock); @@ -264,10 +260,14 @@ static void rxrpc_connection_reaper(struct work_struct *work) if (likely(atomic_read(&conn->usage) > 1)) continue; - put_time = READ_ONCE(conn->put_time); - if (time_after(put_time, reap_older_than)) { - if (time_before(put_time, earliest)) - earliest = put_time; + idle_timestamp = READ_ONCE(conn->idle_timestamp); + _debug("reap CONN %d { u=%d,t=%ld }", + conn->debug_id, atomic_read(&conn->usage), + (long)reap_older_than - (long)idle_timestamp); + + if (time_after(idle_timestamp, reap_older_than)) { + if (time_before(idle_timestamp, earliest)) + earliest = idle_timestamp; continue; } @@ -288,9 +288,9 @@ static void rxrpc_connection_reaper(struct work_struct *work) if (earliest != ULONG_MAX) { _debug("reschedule reaper %ld", (long) earliest - now); - ASSERTCMP(earliest, >, now); + ASSERT(time_after(earliest, now)); rxrpc_queue_delayed_work(&rxrpc_connection_reap, - (earliest - now) * HZ); + earliest - now); } while (!list_empty(&graveyard)) { diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index 70bb77818dea..66cdeb56f44f 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -125,6 +125,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call, bool terminal; int ret, ackbit, ack; u32 serial; + u16 skew; u8 flags; _enter("{%u,%u},,{%u}", call->rx_data_post, call->rx_first_oos, seq); @@ -133,6 +134,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call, ASSERTCMP(sp->call, ==, NULL); flags = sp->hdr.flags; serial = sp->hdr.serial; + skew = skb->priority; spin_lock(&call->lock); @@ -231,7 +233,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call, spin_unlock(&call->lock); atomic_inc(&call->ackr_not_idle); - rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false); + rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, skew, serial, false); _leave(" = 0 [posted]"); return 0; @@ -244,7 +246,7 @@ out: discard_and_ack: _debug("discard and ACK packet %p", skb); - __rxrpc_propose_ACK(call, ack, serial, true); + __rxrpc_propose_ACK(call, ack, skew, serial, true); discard: spin_unlock(&call->lock); rxrpc_free_skb(skb); @@ -252,7 +254,7 @@ discard: return 0; enqueue_and_ack: - __rxrpc_propose_ACK(call, ack, serial, true); + __rxrpc_propose_ACK(call, ack, skew, serial, true); enqueue_packet: _net("defer skb %p", skb); spin_unlock(&call->lock); @@ -304,7 +306,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); __be32 wtmp; - u32 hi_serial, abort_code; + u32 abort_code; _enter("%p,%p", call, skb); @@ -321,18 +323,12 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb) } #endif - /* track the latest serial number on this connection for ACK packet - * information */ - hi_serial = atomic_read(&call->conn->hi_serial); - while (sp->hdr.serial > hi_serial) - hi_serial = atomic_cmpxchg(&call->conn->hi_serial, hi_serial, - sp->hdr.serial); - /* request ACK generation for any ACK or DATA packet that requests * it */ if (sp->hdr.flags & RXRPC_REQUEST_ACK) { _proto("ACK Requested on %%%u", sp->hdr.serial); - rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED, sp->hdr.serial, false); + rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED, + skb->priority, sp->hdr.serial, false); } switch (sp->hdr.type) { @@ -570,7 +566,8 @@ done: /* * post connection-level events to the connection - * - this includes challenges, responses and some aborts + * - this includes challenges, responses, some aborts and call terminal packet + * retransmission. */ static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn, struct sk_buff *skb) @@ -637,7 +634,7 @@ void rxrpc_data_ready(struct sock *sk) struct rxrpc_skb_priv *sp; struct rxrpc_local *local = sk->sk_user_data; struct sk_buff *skb; - int ret; + int ret, skew; _enter("%p", sk); @@ -700,25 +697,64 @@ void rxrpc_data_ready(struct sock *sk) rcu_read_lock(); conn = rxrpc_find_connection_rcu(local, skb); - if (!conn) + if (!conn) { + skb->priority = 0; goto cant_route_call; + } + + /* Note the serial number skew here */ + skew = (int)sp->hdr.serial - (int)conn->hi_serial; + if (skew >= 0) { + if (skew > 0) + conn->hi_serial = sp->hdr.serial; + skb->priority = 0; + } else { + skew = -skew; + skb->priority = min(skew, 65535); + } if (sp->hdr.callNumber == 0) { /* Connection-level packet */ _debug("CONN %p {%d}", conn, conn->debug_id); rxrpc_post_packet_to_conn(conn, skb); + goto out_unlock; } else { /* Call-bound packets are routed by connection channel. */ unsigned int channel = sp->hdr.cid & RXRPC_CHANNELMASK; struct rxrpc_channel *chan = &conn->channels[channel]; - struct rxrpc_call *call = rcu_dereference(chan->call); + struct rxrpc_call *call; + + /* Ignore really old calls */ + if (sp->hdr.callNumber < chan->last_call) + goto discard_unlock; + + if (sp->hdr.callNumber == chan->last_call) { + /* For the previous service call, if completed + * successfully, we discard all further packets. + */ + if (rxrpc_conn_is_service(call->conn) && + (chan->last_type == RXRPC_PACKET_TYPE_ACK || + sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)) + goto discard_unlock; + + /* But otherwise we need to retransmit the final packet + * from data cached in the connection record. + */ + rxrpc_post_packet_to_conn(conn, skb); + goto out_unlock; + } + call = rcu_dereference(chan->call); if (!call || atomic_read(&call->usage) == 0) goto cant_route_call; rxrpc_post_packet_to_call(call, skb); + goto out_unlock; } +discard_unlock: + rxrpc_free_skb(skb); +out_unlock: rcu_read_unlock(); out: return; diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c index f92de18b5893..31b7f36a39cb 100644 --- a/net/rxrpc/proc.c +++ b/net/rxrpc/proc.c @@ -165,7 +165,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v) rxrpc_conn_states[conn->state], key_serial(conn->params.key), atomic_read(&conn->serial), - atomic_read(&conn->hi_serial)); + conn->hi_serial); return 0; } diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c index d28058a97bc1..fbd8c74d9505 100644 --- a/net/rxrpc/skbuff.c +++ b/net/rxrpc/skbuff.c @@ -53,9 +53,9 @@ static void rxrpc_request_final_ACK(struct rxrpc_call *call) /* * drop the bottom ACK off of the call ACK window and advance the window */ -static void rxrpc_hard_ACK_data(struct rxrpc_call *call, - struct rxrpc_skb_priv *sp) +static void rxrpc_hard_ACK_data(struct rxrpc_call *call, struct sk_buff *skb) { + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); int loop; u32 seq; @@ -91,8 +91,8 @@ static void rxrpc_hard_ACK_data(struct rxrpc_call *call, * its Tx bufferage. */ _debug("send Rx idle ACK"); - __rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, sp->hdr.serial, - false); + __rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, + skb->priority, sp->hdr.serial, false); } spin_unlock_bh(&call->lock); @@ -125,7 +125,7 @@ void rxrpc_kernel_data_consumed(struct rxrpc_call *call, struct sk_buff *skb) ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten); call->rx_data_recv = sp->hdr.seq; - rxrpc_hard_ACK_data(call, sp); + rxrpc_hard_ACK_data(call, skb); } EXPORT_SYMBOL(rxrpc_kernel_data_consumed); |