diff options
author | Linus Torvalds | 2021-09-02 10:19:45 -0700 |
---|---|---|
committer | Linus Torvalds | 2021-09-02 10:19:45 -0700 |
commit | 265113f70f3d63ae8b6eb1ce4303d14dbbd71b2d (patch) | |
tree | 354bd2e4d642fee66d3fe2351607e4f081bd48f1 /fs | |
parent | b0cfcdd9b9672ea90642f33d6c0dd8516553adf2 (diff) | |
parent | ecd95673142ef80169a6c003b569b8a86d1e6329 (diff) |
Merge tag 'dlm-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
Pull dlm updates from David Teigland:
"This set includes a number of minor fixes and cleanups related to the
networking changes in the last release.
A patch to delay ack messages reduces network traffic significantly"
* tag 'dlm-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
fs: dlm: avoid comms shutdown delay in release_lockspace
fs: dlm: fix return -EINTR on recovery stopped
fs: dlm: implement delayed ack handling
fs: dlm: move receive loop into receive handler
fs: dlm: fix multiple empty writequeue alloc
fs: dlm: generic connect func
fs: dlm: auto load sctp module
fs: dlm: introduce generic listen
fs: dlm: move to static proto ops
fs: dlm: introduce con_next_wq helper
fs: dlm: cleanup and remove _send_rcom
fs: dlm: clear CF_APP_LIMITED on close
fs: dlm: fix typo in tlv prefix
fs: dlm: use READ_ONCE for config var
fs: dlm: use sk->sk_socket instead of con->sock
Diffstat (limited to 'fs')
-rw-r--r-- | fs/dlm/dir.c | 4 | ||||
-rw-r--r-- | fs/dlm/dlm_internal.h | 2 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 3 | ||||
-rw-r--r-- | fs/dlm/lowcomms.c | 770 | ||||
-rw-r--r-- | fs/dlm/lowcomms.h | 1 | ||||
-rw-r--r-- | fs/dlm/member.c | 4 | ||||
-rw-r--r-- | fs/dlm/midcomms.c | 56 | ||||
-rw-r--r-- | fs/dlm/rcom.c | 29 | ||||
-rw-r--r-- | fs/dlm/recoverd.c | 4 |
9 files changed, 458 insertions, 415 deletions
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index 10c36ae1a8f9..45ebbe602bbf 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -85,8 +85,10 @@ int dlm_recover_directory(struct dlm_ls *ls) for (;;) { int left; error = dlm_recovery_stopped(ls); - if (error) + if (error) { + error = -EINTR; goto out_free; + } error = dlm_rcom_names(ls, memb->nodeid, last_name, last_len); diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 91d1ca3a121a..5f57538b5d45 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -468,7 +468,7 @@ struct dlm_rcom { struct dlm_opt_header { uint16_t t_type; uint16_t t_length; - uint32_t o_pad; + uint32_t t_pad; /* need to be 8 byte aligned */ char t_value[]; }; diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index d71aba8c3e64..10eddfa6c3d7 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -498,7 +498,7 @@ static int new_lockspace(const char *name, const char *cluster, ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); - size = dlm_config.ci_rsbtbl_size; + size = READ_ONCE(dlm_config.ci_rsbtbl_size); ls->ls_rsbtbl_size = size; ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable))); @@ -793,6 +793,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) if (ls_count == 1) { dlm_scand_stop(); + dlm_clear_members(ls); dlm_midcomms_shutdown(); } diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 0ea9ae35da0b..8f715c620e1f 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -84,9 +84,7 @@ struct connection { struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; atomic_t writequeue_cnt; - void (*connect_action) (struct connection *); /* What to do to connect */ - void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ - bool (*eof_condition)(struct connection *con); /* What to do to eof check */ + struct mutex wq_alloc; int retries; #define MAX_CONNECT_RETRIES 3 struct hlist_node list; @@ -145,6 +143,24 @@ struct dlm_node_addr { struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; }; +struct dlm_proto_ops { + bool try_new_addr; + const char *name; + int proto; + + int (*connect)(struct connection *con, struct socket *sock, + struct sockaddr *addr, int addr_len); + void (*sockopts)(struct socket *sock); + int (*bind)(struct socket *sock); + int (*listen_validate)(void); + void (*listen_sockopts)(struct socket *sock); + int (*listen_bind)(struct socket *sock); + /* What to do to shutdown */ + void (*shutdown_action)(struct connection *con); + /* What to do to eof check */ + bool (*eof_condition)(struct connection *con); +}; + static struct listen_sock_callbacks { void (*sk_error_report)(struct sock *); void (*sk_data_ready)(struct sock *); @@ -168,12 +184,26 @@ static struct hlist_head connection_hash[CONN_HASH_SIZE]; static DEFINE_SPINLOCK(connections_lock); DEFINE_STATIC_SRCU(connections_srcu); +static const struct dlm_proto_ops *dlm_proto_ops; + static void process_recv_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work); -static void sctp_connect_to_sock(struct connection *con); -static void tcp_connect_to_sock(struct connection *con); -static void dlm_tcp_shutdown(struct connection *con); +/* need to held writequeue_lock */ +static struct writequeue_entry *con_next_wq(struct connection *con) +{ + struct writequeue_entry *e; + + if (list_empty(&con->writequeue)) + return NULL; + + e = list_first_entry(&con->writequeue, struct writequeue_entry, + list); + if (e->len == 0) + return NULL; + + return e; +} static struct connection *__find_con(int nodeid, int r) { @@ -208,20 +238,6 @@ static int dlm_con_init(struct connection *con, int nodeid) INIT_WORK(&con->rwork, process_recv_sockets); init_waitqueue_head(&con->shutdown_wait); - switch (dlm_config.ci_protocol) { - case DLM_PROTO_TCP: - con->connect_action = tcp_connect_to_sock; - con->shutdown_action = dlm_tcp_shutdown; - con->eof_condition = tcp_eof_condition; - break; - case DLM_PROTO_SCTP: - con->connect_action = sctp_connect_to_sock; - break; - default: - kfree(con->rx_buf); - return -EINVAL; - } - return 0; } @@ -249,6 +265,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) return NULL; } + mutex_init(&con->wq_alloc); + spin_lock(&connections_lock); /* Because multiple workqueues/threads calls this function it can * race on multiple cpu's. Instead of locking hot path __find_con() @@ -583,8 +601,7 @@ static void lowcomms_error_report(struct sock *sk) goto out; orig_report = listen_sock.sk_error_report; - if (con->sock == NULL || - kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) { + if (kernel_getpeername(sk->sk_socket, (struct sockaddr *)&saddr) < 0) { printk_ratelimited(KERN_ERR "dlm: node %d: socket error " "sending to node %d, port %d, " "sk_err=%d/%d\n", dlm_our_nodeid(), @@ -801,6 +818,7 @@ static void close_connection(struct connection *con, bool and_other, con->rx_leftover = 0; con->retries = 0; + clear_bit(CF_APP_LIMITED, &con->flags); clear_bit(CF_CONNECTED, &con->flags); clear_bit(CF_DELAY_CONNECT, &con->flags); clear_bit(CF_RECONNECT, &con->flags); @@ -877,7 +895,6 @@ static int con_realloc_receive_buf(struct connection *con, int newlen) /* Data received from remote end */ static int receive_from_sock(struct connection *con) { - int call_again_soon = 0; struct msghdr msg; struct kvec iov; int ret, buflen; @@ -897,41 +914,40 @@ static int receive_from_sock(struct connection *con) goto out_resched; } - /* calculate new buffer parameter regarding last receive and - * possible leftover bytes - */ - iov.iov_base = con->rx_buf + con->rx_leftover; - iov.iov_len = con->rx_buflen - con->rx_leftover; - - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; - ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, - msg.msg_flags); - if (ret <= 0) - goto out_close; - else if (ret == iov.iov_len) - call_again_soon = 1; - - /* new buflen according readed bytes and leftover from last receive */ - buflen = ret + con->rx_leftover; - ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); - if (ret < 0) - goto out_close; + for (;;) { + /* calculate new buffer parameter regarding last receive and + * possible leftover bytes + */ + iov.iov_base = con->rx_buf + con->rx_leftover; + iov.iov_len = con->rx_buflen - con->rx_leftover; + + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + msg.msg_flags); + if (ret == -EAGAIN) + break; + else if (ret <= 0) + goto out_close; - /* calculate leftover bytes from process and put it into begin of - * the receive buffer, so next receive we have the full message - * at the start address of the receive buffer. - */ - con->rx_leftover = buflen - ret; - if (con->rx_leftover) { - memmove(con->rx_buf, con->rx_buf + ret, - con->rx_leftover); - call_again_soon = true; + /* new buflen according readed bytes and leftover from last receive */ + buflen = ret + con->rx_leftover; + ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); + if (ret < 0) + goto out_close; + + /* calculate leftover bytes from process and put it into begin of + * the receive buffer, so next receive we have the full message + * at the start address of the receive buffer. + */ + con->rx_leftover = buflen - ret; + if (con->rx_leftover) { + memmove(con->rx_buf, con->rx_buf + ret, + con->rx_leftover); + } } - if (call_again_soon) - goto out_resched; - + dlm_midcomms_receive_done(con->nodeid); mutex_unlock(&con->sock_mutex); return 0; @@ -946,7 +962,8 @@ out_close: log_print("connection %p got EOF from %d", con, con->nodeid); - if (con->eof_condition && con->eof_condition(con)) { + if (dlm_proto_ops->eof_condition && + dlm_proto_ops->eof_condition(con)) { set_bit(CF_EOF, &con->flags); mutex_unlock(&con->sock_mutex); } else { @@ -1134,242 +1151,6 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port) return result; } -/* Initiate an SCTP association. - This is a special case of send_to_sock() in that we don't yet have a - peeled-off socket for this association, so we use the listening socket - and add the primary IP address of the remote node. - */ -static void sctp_connect_to_sock(struct connection *con) -{ - struct sockaddr_storage daddr; - int result; - int addr_len; - struct socket *sock; - unsigned int mark; - - mutex_lock(&con->sock_mutex); - - /* Some odd races can cause double-connects, ignore them */ - if (con->retries++ > MAX_CONNECT_RETRIES) - goto out; - - if (con->sock) { - log_print("node %d already connected.", con->nodeid); - goto out; - } - - memset(&daddr, 0, sizeof(daddr)); - result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark); - if (result < 0) { - log_print("no address for nodeid %d", con->nodeid); - goto out; - } - - /* Create a socket to communicate with */ - result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, - SOCK_STREAM, IPPROTO_SCTP, &sock); - if (result < 0) - goto socket_err; - - sock_set_mark(sock->sk, mark); - - add_sock(sock, con); - - /* Bind to all addresses. */ - if (sctp_bind_addrs(con->sock, 0)) - goto bind_err; - - make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len); - - log_print_ratelimited("connecting to %d", con->nodeid); - - /* Turn off Nagle's algorithm */ - sctp_sock_set_nodelay(sock->sk); - - /* - * Make sock->ops->connect() function return in specified time, - * since O_NONBLOCK argument in connect() function does not work here, - * then, we should restore the default value of this attribute. - */ - sock_set_sndtimeo(sock->sk, 5); - result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len, - 0); - sock_set_sndtimeo(sock->sk, 0); - - if (result == -EINPROGRESS) - result = 0; - if (result == 0) { - if (!test_and_set_bit(CF_CONNECTED, &con->flags)) - log_print("successful connected to node %d", con->nodeid); - goto out; - } - -bind_err: - con->sock = NULL; - sock_release(sock); - -socket_err: - /* - * Some errors are fatal and this list might need adjusting. For other - * errors we try again until the max number of retries is reached. - */ - if (result != -EHOSTUNREACH && - result != -ENETUNREACH && - result != -ENETDOWN && - result != -EINVAL && - result != -EPROTONOSUPPORT) { - log_print("connect %d try %d error %d", con->nodeid, - con->retries, result); - mutex_unlock(&con->sock_mutex); - msleep(1000); - lowcomms_connect_sock(con); - return; - } - -out: - mutex_unlock(&con->sock_mutex); -} - -/* Connect a new socket to its peer */ -static void tcp_connect_to_sock(struct connection *con) -{ - struct sockaddr_storage saddr, src_addr; - unsigned int mark; - int addr_len; - struct socket *sock = NULL; - int result; - - mutex_lock(&con->sock_mutex); - if (con->retries++ > MAX_CONNECT_RETRIES) - goto out; - - /* Some odd races can cause double-connects, ignore them */ - if (con->sock) - goto out; - - /* Create a socket to communicate with */ - result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, - SOCK_STREAM, IPPROTO_TCP, &sock); - if (result < 0) - goto out_err; - - memset(&saddr, 0, sizeof(saddr)); - result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark); - if (result < 0) { - log_print("no address for nodeid %d", con->nodeid); - goto out_err; - } - - sock_set_mark(sock->sk, mark); - - add_sock(sock, con); - - /* Bind to our cluster-known address connecting to avoid - routing problems */ - memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); - make_sockaddr(&src_addr, 0, &addr_len); - result = sock->ops->bind(sock, (struct sockaddr *) &src_addr, - addr_len); - if (result < 0) { - log_print("could not bind for connect: %d", result); - /* This *may* not indicate a critical error */ - } - - make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); - - log_print_ratelimited("connecting to %d", con->nodeid); - - /* Turn off Nagle's algorithm */ - tcp_sock_set_nodelay(sock->sk); - - result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, - O_NONBLOCK); - if (result == -EINPROGRESS) - result = 0; - if (result == 0) - goto out; - -out_err: - if (con->sock) { - sock_release(con->sock); - con->sock = NULL; - } else if (sock) { - sock_release(sock); - } - /* - * Some errors are fatal and this list might need adjusting. For other - * errors we try again until the max number of retries is reached. - */ - if (result != -EHOSTUNREACH && - result != -ENETUNREACH && - result != -ENETDOWN && - result != -EINVAL && - result != -EPROTONOSUPPORT) { - log_print("connect %d try %d error %d", con->nodeid, - con->retries, result); - mutex_unlock(&con->sock_mutex); - msleep(1000); - lowcomms_connect_sock(con); - return; - } -out: - mutex_unlock(&con->sock_mutex); - return; -} - -/* On error caller must run dlm_close_sock() for the - * listen connection socket. - */ -static int tcp_create_listen_sock(struct listen_connection *con, - struct sockaddr_storage *saddr) -{ - struct socket *sock = NULL; - int result = 0; - int addr_len; - - if (dlm_local_addr[0]->ss_family == AF_INET) - addr_len = sizeof(struct sockaddr_in); - else - addr_len = sizeof(struct sockaddr_in6); - - /* Create a socket to communicate with */ - result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, - SOCK_STREAM, IPPROTO_TCP, &sock); - if (result < 0) { - log_print("Can't create listening comms socket"); - goto create_out; - } - - sock_set_mark(sock->sk, dlm_config.ci_mark); - - /* Turn off Nagle's algorithm */ - tcp_sock_set_nodelay(sock->sk); - - sock_set_reuseaddr(sock->sk); - - add_listen_sock(sock, con); - - /* Bind to our port */ - make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); - result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); - if (result < 0) { - log_print("Can't bind to port %d", dlm_config.ci_tcp_port); - goto create_out; - } - sock_set_keepalive(sock->sk); - - result = sock->ops->listen(sock, 5); - if (result < 0) { - log_print("Can't listen on port %d", dlm_config.ci_tcp_port); - goto create_out; - } - - return 0; - -create_out: - return result; -} - /* Get local addresses */ static void init_local(void) { @@ -1396,63 +1177,6 @@ static void deinit_local(void) kfree(dlm_local_addr[i]); } -/* Initialise SCTP socket and bind to all interfaces - * On error caller must run dlm_close_sock() for the - * listen connection socket. - */ -static int sctp_listen_for_all(struct listen_connection *con) -{ - struct socket *sock = NULL; - int result = -EINVAL; - - log_print("Using SCTP for communications"); - - result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, - SOCK_STREAM, IPPROTO_SCTP, &sock); - if (result < 0) { - log_print("Can't create comms socket, check SCTP is loaded"); - goto out; - } - - sock_set_rcvbuf(sock->sk, NEEDED_RMEM); - sock_set_mark(sock->sk, dlm_config.ci_mark); - sctp_sock_set_nodelay(sock->sk); - - add_listen_sock(sock, con); - - /* Bind to all addresses. */ - result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port); - if (result < 0) - goto out; - - result = sock->ops->listen(sock, 5); - if (result < 0) { - log_print("Can't set socket listening"); - goto out; - } - - return 0; - -out: - return result; -} - -static int tcp_listen_for_all(void) -{ - /* We don't support multi-homed hosts */ - if (dlm_local_count > 1) { - log_print("TCP protocol can't handle multi-homed hosts, " - "try SCTP"); - return -EINVAL; - } - - log_print("Using TCP for communications"); - - return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]); -} - - - static struct writequeue_entry *new_writequeue_entry(struct connection *con, gfp_t allocation) { @@ -1528,19 +1252,37 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, { struct writequeue_entry *e; struct dlm_msg *msg; + bool sleepable; msg = kzalloc(sizeof(*msg), allocation); if (!msg) return NULL; + /* this mutex is being used as a wait to avoid multiple "fast" + * new writequeue page list entry allocs in new_wq_entry in + * normal operation which is sleepable context. Without it + * we could end in multiple writequeue entries with one + * dlm message because multiple callers were waiting at + * the writequeue_lock in new_wq_entry(). + */ + sleepable = gfpflags_normal_context(allocation); + if (sleepable) + mutex_lock(&con->wq_alloc); + kref_init(&msg->ref); e = new_wq_entry(con, len, allocation, ppc, cb, mh); if (!e) { + if (sleepable) + mutex_unlock(&con->wq_alloc); + kfree(msg); return NULL; } + if (sleepable) + mutex_unlock(&con->wq_alloc); + msg->ppc = *ppc; msg->len = len; msg->entry = e; @@ -1646,10 +1388,9 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg) /* Send a message */ static void send_to_sock(struct connection *con) { - int ret = 0; const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; struct writequeue_entry *e; - int len, offset; + int len, offset, ret; int count = 0; mutex_lock(&con->sock_mutex); @@ -1658,7 +1399,8 @@ static void send_to_sock(struct connection *con) spin_lock(&con->writequeue_lock); for (;;) { - if (list_empty(&con->writequeue)) + e = con_next_wq(con); + if (!e) break; e = list_first_entry(&con->writequeue, struct writequeue_entry, list); @@ -1667,25 +1409,22 @@ static void send_to_sock(struct connection *con) BUG_ON(len == 0 && e->users == 0); spin_unlock(&con->writequeue_lock); - ret = 0; - if (len) { - ret = kernel_sendpage(con->sock, e->page, offset, len, - msg_flags); - if (ret == -EAGAIN || ret == 0) { - if (ret == -EAGAIN && - test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && - !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { - /* Notify TCP that we're limited by the - * application window size. - */ - set_bit(SOCK_NOSPACE, &con->sock->flags); - con->sock->sk->sk_write_pending++; - } - cond_resched(); - goto out; - } else if (ret < 0) - goto out; - } + ret = kernel_sendpage(con->sock, e->page, offset, len, + msg_flags); + if (ret == -EAGAIN || ret == 0) { + if (ret == -EAGAIN && + test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && + !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { + /* Notify TCP that we're limited by the + * application window size. + */ + set_bit(SOCK_NOSPACE, &con->sock->flags); + con->sock->sk->sk_write_pending++; + } + cond_resched(); + goto out; + } else if (ret < 0) + goto out; /* Don't starve people filling buffers */ if (++count >= MAX_SEND_MSG_COUNT) { @@ -1770,12 +1509,9 @@ int dlm_lowcomms_close(int nodeid) static void process_recv_sockets(struct work_struct *work) { struct connection *con = container_of(work, struct connection, rwork); - int err; clear_bit(CF_READ_PENDING, &con->flags); - do { - err = receive_from_sock(con); - } while (!err); + receive_from_sock(con); } static void process_listen_recv_socket(struct work_struct *work) @@ -1783,6 +1519,74 @@ static void process_listen_recv_socket(struct work_struct *work) accept_from_sock(&listen_con); } +static void dlm_connect(struct connection *con) +{ + struct sockaddr_storage addr; + int result, addr_len; + struct socket *sock; + unsigned int mark; + + /* Some odd races can cause double-connects, ignore them */ + if (con->retries++ > MAX_CONNECT_RETRIES) + return; + + if (con->sock) { + log_print("node %d already connected.", con->nodeid); + return; + } + + memset(&addr, 0, sizeof(addr)); + result = nodeid_to_addr(con->nodeid, &addr, NULL, + dlm_proto_ops->try_new_addr, &mark); + if (result < 0) { + log_print("no address for nodeid %d", con->nodeid); + return; + } + + /* Create a socket to communicate with */ + result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, + SOCK_STREAM, dlm_proto_ops->proto, &sock); + if (result < 0) + goto socket_err; + + sock_set_mark(sock->sk, mark); + dlm_proto_ops->sockopts(sock); + + add_sock(sock, con); + + result = dlm_proto_ops->bind(sock); + if (result < 0) + goto add_sock_err; + + log_print_ratelimited("connecting to %d", con->nodeid); + make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); + result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, + addr_len); + if (result < 0) + goto add_sock_err; + + return; + +add_sock_err: + dlm_close_sock(&con->sock); + +socket_err: + /* + * Some errors are fatal and this list might need adjusting. For other + * errors we try again until the max number of retries is reached. + */ + if (result != -EHOSTUNREACH && + result != -ENETUNREACH && + result != -ENETDOWN && + result != -EINVAL && + result != -EPROTONOSUPPORT) { + log_print("connect %d try %d error %d", con->nodeid, + con->retries, result); + msleep(1000); + lowcomms_connect_sock(con); + } +} + /* Send workqueue function */ static void process_send_sockets(struct work_struct *work) { @@ -1797,11 +1601,15 @@ static void process_send_sockets(struct work_struct *work) dlm_midcomms_unack_msg_resend(con->nodeid); } - if (con->sock == NULL) { /* not mutex protected so check it inside too */ + if (con->sock == NULL) { if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags)) msleep(1000); - con->connect_action(con); + + mutex_lock(&con->sock_mutex); + dlm_connect(con); + mutex_unlock(&con->sock_mutex); } + if (!list_empty(&con->writequeue)) send_to_sock(con); } @@ -1840,8 +1648,8 @@ static int work_start(void) static void shutdown_conn(struct connection *con) { - if (con->shutdown_action) - con->shutdown_action(con); + if (dlm_proto_ops->shutdown_action) + dlm_proto_ops->shutdown_action(con); } void dlm_lowcomms_shutdown(void) @@ -1948,8 +1756,198 @@ void dlm_lowcomms_stop(void) srcu_read_unlock(&connections_srcu, idx); work_stop(); deinit_local(); + + dlm_proto_ops = NULL; } +static int dlm_listen_for_all(void) +{ + struct socket *sock; + int result; + + log_print("Using %s for communications", + dlm_proto_ops->name); + + result = dlm_proto_ops->listen_validate(); + if (result < 0) + return result; + + result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, + SOCK_STREAM, dlm_proto_ops->proto, &sock); + if (result < 0) { + log_print("Can't create comms socket, check SCTP is loaded"); + goto out; + } + + sock_set_mark(sock->sk, dlm_config.ci_mark); + dlm_proto_ops->listen_sockopts(sock); + + result = dlm_proto_ops->listen_bind(sock); + if (result < 0) + goto out; + + save_listen_callbacks(sock); + add_listen_sock(sock, &listen_con); + + INIT_WORK(&listen_con.rwork, process_listen_recv_socket); + result = sock->ops->listen(sock, 5); + if (result < 0) { + dlm_close_sock(&listen_con.sock); + goto out; + } + + return 0; + +out: + sock_release(sock); + return result; +} + +static int dlm_tcp_bind(struct socket *sock) +{ + struct sockaddr_storage src_addr; + int result, addr_len; + + /* Bind to our cluster-known address connecting to avoid + * routing problems. + */ + memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); + make_sockaddr(&src_addr, 0, &addr_len); + + result = sock->ops->bind(sock, (struct sockaddr *)&src_addr, + addr_len); + if (result < 0) { + /* This *may* not indicate a critical error */ + log_print("could not bind for connect: %d", result); + } + + return 0; +} + +static int dlm_tcp_connect(struct connection *con, struct socket *sock, + struct sockaddr *addr, int addr_len) +{ + int ret; + + ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK); + switch (ret) { + case -EINPROGRESS: + fallthrough; + case 0: + return 0; + } + + return ret; +} + +static int dlm_tcp_listen_validate(void) +{ + /* We don't support multi-homed hosts */ + if (dlm_local_count > 1) { + log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); + return -EINVAL; + } + + return 0; +} + +static void dlm_tcp_sockopts(struct socket *sock) +{ + /* Turn off Nagle's algorithm */ + tcp_sock_set_nodelay(sock->sk); +} + +static void dlm_tcp_listen_sockopts(struct socket *sock) +{ + dlm_tcp_sockopts(sock); + sock_set_reuseaddr(sock->sk); +} + +static int dlm_tcp_listen_bind(struct socket *sock) +{ + int addr_len; + + /* Bind to our port */ + make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len); + return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0], + addr_len); +} + +static const struct dlm_proto_ops dlm_tcp_ops = { + .name = "TCP", + .proto = IPPROTO_TCP, + .connect = dlm_tcp_connect, + .sockopts = dlm_tcp_sockopts, + .bind = dlm_tcp_bind, + .listen_validate = dlm_tcp_listen_validate, + .listen_sockopts = dlm_tcp_listen_sockopts, + .listen_bind = dlm_tcp_listen_bind, + .shutdown_action = dlm_tcp_shutdown, + .eof_condition = tcp_eof_condition, +}; + +static int dlm_sctp_bind(struct socket *sock) +{ + return sctp_bind_addrs(sock, 0); +} + +static int dlm_sctp_connect(struct connection *con, struct socket *sock, + struct sockaddr *addr, int addr_len) +{ + int ret; + + /* + * Make sock->ops->connect() function return in specified time, + * since O_NONBLOCK argument in connect() function does not work here, + * then, we should restore the default value of this attribute. + */ + sock_set_sndtimeo(sock->sk, 5); + ret = sock->ops->connect(sock, addr, addr_len, 0); + sock_set_sndtimeo(sock->sk, 0); + if (ret < 0) + return ret; + + if (!test_and_set_bit(CF_CONNECTED, &con->flags)) + log_print("successful connected to node %d", con->nodeid); + + return 0; +} + +static int dlm_sctp_listen_validate(void) +{ + if (!IS_ENABLED(CONFIG_IP_SCTP)) { + log_print("SCTP is not enabled by this kernel"); + return -EOPNOTSUPP; + } + + request_module("sctp"); + return 0; +} + +static int dlm_sctp_bind_listen(struct socket *sock) +{ + return sctp_bind_addrs(sock, dlm_config.ci_tcp_port); +} + +static void dlm_sctp_sockopts(struct socket *sock) +{ + /* Turn off Nagle's algorithm */ + sctp_sock_set_nodelay(sock->sk); + sock_set_rcvbuf(sock->sk, NEEDED_RMEM); +} + +static const struct dlm_proto_ops dlm_sctp_ops = { + .name = "SCTP", + .proto = IPPROTO_SCTP, + .try_new_addr = true, + .connect = dlm_sctp_connect, + .sockopts = dlm_sctp_sockopts, + .bind = dlm_sctp_bind, + .listen_validate = dlm_sctp_listen_validate, + .listen_sockopts = dlm_sctp_sockopts, + .listen_bind = dlm_sctp_bind_listen, +}; + int dlm_lowcomms_start(void) { int error = -EINVAL; @@ -1976,23 +1974,27 @@ int dlm_lowcomms_start(void) /* Start listening */ switch (dlm_config.ci_protocol) { case DLM_PROTO_TCP: - error = tcp_listen_for_all(); + dlm_proto_ops = &dlm_tcp_ops; break; case DLM_PROTO_SCTP: - error = sctp_listen_for_all(&listen_con); + dlm_proto_ops = &dlm_sctp_ops; break; default: log_print("Invalid protocol identifier %d set", dlm_config.ci_protocol); error = -EINVAL; - break; + goto fail_proto_ops; } + + error = dlm_listen_for_all(); if (error) - goto fail_unlisten; + goto fail_listen; return 0; -fail_unlisten: +fail_listen: + dlm_proto_ops = NULL; +fail_proto_ops: dlm_allow_conn = 0; dlm_close_sock(&listen_con.sock); work_stop(); diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index aaae7115c00d..4ccae07cf005 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -46,6 +46,7 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg); int dlm_lowcomms_connect_node(int nodeid); int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); +void dlm_midcomms_receive_done(int nodeid); #endif /* __LOWCOMMS_DOT_H__ */ diff --git a/fs/dlm/member.c b/fs/dlm/member.c index d9e1e4170eb1..731d489aa323 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -443,8 +443,10 @@ static int ping_members(struct dlm_ls *ls) list_for_each_entry(memb, &ls->ls_nodes, list) { error = dlm_recovery_stopped(ls); - if (error) + if (error) { + error = -EINTR; break; + } error = dlm_rcom_status(ls, memb->nodeid, 0); if (error) break; diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index e3de268898ed..7ae39ec8d9b0 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -109,12 +109,6 @@ * compatibility. There exists better ways to make a better handling. * However this should be changed in the next major version bump of dlm. * - * Ack handling: - * - * Currently we send an ack message for every dlm message. However we - * can ack multiple dlm messages with one ack by just delaying the ack - * message. Will reduce some traffic but makes the drop detection slower. - * * Tail Size checking: * * There exists a message tail payload in e.g. DLM_MSG however we don't @@ -169,6 +163,7 @@ struct midcomms_node { #define DLM_NODE_FLAG_CLOSE 1 #define DLM_NODE_FLAG_STOP_TX 2 #define DLM_NODE_FLAG_STOP_RX 3 +#define DLM_NODE_ULP_DELIVERED 4 unsigned long flags; wait_queue_head_t shutdown_wait; @@ -480,11 +475,12 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, { if (seq == node->seq_next) { node->seq_next++; - /* send ack before fin */ - dlm_send_ack(node->nodeid, node->seq_next); switch (p->header.h_cmd) { case DLM_FIN: + /* send ack before fin */ + dlm_send_ack(node->nodeid, node->seq_next); + spin_lock(&node->state_lock); pr_debug("receive fin msg from node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -534,6 +530,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, default: WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); dlm_receive_buffer(p, node->nodeid); + set_bit(DLM_NODE_ULP_DELIVERED, &node->flags); break; } } else { @@ -933,6 +930,49 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) return ret; } +void dlm_midcomms_receive_done(int nodeid) +{ + struct midcomms_node *node; + int idx; + + idx = srcu_read_lock(&nodes_srcu); + node = nodeid2node(nodeid, 0); + if (!node) { + srcu_read_unlock(&nodes_srcu, idx); + return; + } + + /* old protocol, we do nothing */ + switch (node->version) { + case DLM_VERSION_3_2: + break; + default: + srcu_read_unlock(&nodes_srcu, idx); + return; + } + + /* do nothing if we didn't delivered stateful to ulp */ + if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED, + &node->flags)) { + srcu_read_unlock(&nodes_srcu, idx); + return; + } + + spin_lock(&node->state_lock); + /* we only ack if state is ESTABLISHED */ + switch (node->state) { + case DLM_ESTABLISHED: + spin_unlock(&node->state_lock); + dlm_send_ack(node->nodeid, node->seq_next); + break; + default: + spin_unlock(&node->state_lock); + /* do nothing FIN has it's own ack send */ + break; + }; + srcu_read_unlock(&nodes_srcu, idx); +} + void dlm_midcomms_unack_msg_resend(int nodeid) { struct midcomms_node *node; diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 5651933f54a4..6cba86470278 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -89,22 +89,15 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, return 0; } -static void _send_rcom(struct dlm_ls *ls, struct dlm_rcom *rc) +static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc) { dlm_rcom_out(rc); -} - -static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh, - struct dlm_rcom *rc) -{ - _send_rcom(ls, rc); dlm_midcomms_commit_mhandle(mh); } -static void send_rcom_stateless(struct dlm_ls *ls, struct dlm_msg *msg, - struct dlm_rcom *rc) +static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc) { - _send_rcom(ls, rc); + dlm_rcom_out(rc); dlm_lowcomms_commit_msg(msg); dlm_lowcomms_put_msg(msg); } @@ -204,7 +197,7 @@ retry: allow_sync_reply(ls, &rc->rc_id); memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); - send_rcom_stateless(ls, msg, rc); + send_rcom_stateless(msg, rc); error = dlm_wait_function(ls, &rcom_response); disallow_sync_reply(ls); @@ -287,7 +280,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) spin_unlock(&ls->ls_recover_lock); do_send: - send_rcom_stateless(ls, msg, rc); + send_rcom_stateless(msg, rc); } static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) @@ -327,7 +320,7 @@ retry: allow_sync_reply(ls, &rc->rc_id); memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); - send_rcom_stateless(ls, msg, rc); + send_rcom_stateless(msg, rc); error = dlm_wait_function(ls, &rcom_response); disallow_sync_reply(ls); @@ -356,7 +349,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, nodeid); - send_rcom_stateless(ls, msg, rc); + send_rcom_stateless(msg, rc); } int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) @@ -373,7 +366,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) memcpy(rc->rc_buf, r->res_name, r->res_length); rc->rc_id = (unsigned long) r->res_id; - send_rcom(ls, mh, rc); + send_rcom(mh, rc); out: return error; } @@ -404,7 +397,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) rc->rc_id = rc_in->rc_id; rc->rc_seq_reply = rc_in->rc_seq; - send_rcom(ls, mh, rc); + send_rcom(mh, rc); } static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) @@ -461,7 +454,7 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) pack_rcom_lock(r, lkb, rl); rc->rc_id = (unsigned long) r; - send_rcom(ls, mh, rc); + send_rcom(mh, rc); out: return error; } @@ -487,7 +480,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) rc->rc_id = rc_in->rc_id; rc->rc_seq_reply = rc_in->rc_seq; - send_rcom(ls, mh, rc); + send_rcom(mh, rc); } /* If the lockspace doesn't exist then still send a status message diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 85e245392715..97d052cea5a9 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -125,8 +125,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_recover_waiters_pre(ls); error = dlm_recovery_stopped(ls); - if (error) + if (error) { + error = -EINTR; goto fail; + } if (neg || dlm_no_directory(ls)) { /* |