Perform completion event acknowledgments in batches instead
[~shefty/librdmacm.git] / src / rsocket.c
1 /*
2  * Copyright (c) 2008-2013 Intel Corporation.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33
34 #if HAVE_CONFIG_H
35 #  include <config.h>
36 #endif /* HAVE_CONFIG_H */
37
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/time.h>
41 #include <stdarg.h>
42 #include <netdb.h>
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <stdio.h>
46 #include <stddef.h>
47 #include <string.h>
48 #include <netinet/in.h>
49 #include <netinet/tcp.h>
50 #include <sys/epoll.h>
51 #include <search.h>
52
53 #include <rdma/rdma_cma.h>
54 #include <rdma/rdma_verbs.h>
55 #include <rdma/rsocket.h>
56 #include "cma.h"
57 #include "indexer.h"
58
59 #define RS_OLAP_START_SIZE 2048
60 #define RS_MAX_TRANSFER 65536
61 #define RS_SNDLOWAT 2048
62 #define RS_QP_MIN_SIZE 16
63 #define RS_QP_MAX_SIZE 0xFFFE
64 #define RS_QP_CTRL_SIZE 4       /* must be a power of 2 */
65 #define RS_CONN_RETRIES 6
66 #define RS_SGL_SIZE 2
67 static struct index_map idm;
68 static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
69
70 struct rsocket;
71
72 enum {
73         RS_SVC_NOOP,
74         RS_SVC_ADD_DGRAM,
75         RS_SVC_REM_DGRAM,
76         RS_SVC_ADD_KEEPALIVE,
77         RS_SVC_REM_KEEPALIVE,
78         RS_SVC_MOD_KEEPALIVE
79 };
80
81 struct rs_svc_msg {
82         uint32_t cmd;
83         uint32_t status;
84         struct rsocket *rs;
85 };
86
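/*
 * A service thread handles asynchronous work (datagram traffic for udp_svc,
 * keepalive timeouts for tcp_svc) for the set of rsockets registered with
 * it.  Requests are delivered to the thread as rs_svc_msg over a socketpair.
 */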
87 struct rs_svc {
88         pthread_t id;
89         int sock[2];
90         int cnt;
91         int size;
92         int context_size;
93         void *(*run)(void *svc);
94         struct rsocket **rss;
95         void *contexts;
96 };
97
98 static struct pollfd *udp_svc_fds;
99 static void *udp_svc_run(void *arg);
100 static struct rs_svc udp_svc = {
101         .context_size = sizeof(*udp_svc_fds),
102         .run = udp_svc_run
103 };
104 static uint32_t *tcp_svc_timeouts;
105 static void *tcp_svc_run(void *arg);
106 static struct rs_svc tcp_svc = {
107         .context_size = sizeof(*tcp_svc_timeouts),
108         .run = tcp_svc_run
109 };
110
111 static uint16_t def_iomap_size = 0;
112 static uint16_t def_inline = 64;
113 static uint16_t def_sqsize = 384;
114 static uint16_t def_rqsize = 384;
115 static uint32_t def_mem = (1 << 17);
116 static uint32_t def_wmem = (1 << 17);
117 static uint32_t polling_time = 10;
118
119 /*
120  * Immediate data format is determined by the upper bits
121  * bit 31: message type, 0 - data, 1 - control
122  * bit 30: buffers updated, 0 - target, 1 - direct-receive
123  * bit 29: more data, 0 - end of transfer, 1 - more data available
124  *
125  * for data transfers:
126  * bits [28:0]: bytes transferred
127  * for control messages:
128  * SGL, CTRL
129  * bits [28:0]: receive credits granted
130  * IOMAP_SGL
131  * bits [28:16]: reserved, bits [15:0]: index
132  */
133
134 enum {
135         RS_OP_DATA,
136         RS_OP_RSVD_DATA_MORE,
137         RS_OP_WRITE, /* opcode is not transmitted over the network */
138         RS_OP_RSVD_DRA_MORE,
139         RS_OP_SGL,
140         RS_OP_RSVD,
141         RS_OP_IOMAP_SGL,
142         RS_OP_CTRL
143 };
144 #define rs_msg_set(op, data)  (((uint32_t) (op) << 29) | (uint32_t) (data))
145 #define rs_msg_op(imm_data)   ((imm_data) >> 29)
146 #define rs_msg_data(imm_data) ((imm_data) & 0x1FFFFFFF)
147 #define RS_MSG_SIZE           sizeof(uint32_t)
148
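/*
 * Work request IDs encode the completion type in the upper bits: bit 63
 * marks receive completions and bit 62 marks the extra sends used in
 * RS_OPT_MSG_SEND mode.  The low 32 bits carry per-request data (message,
 * buffer index, or offset).
 */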
149 #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
150 #define RS_WR_ID_FLAG_MSG_SEND (((uint64_t) 1) << 62) /* See RS_OPT_MSG_SEND */
151 #define rs_send_wr_id(data) ((uint64_t) data)
152 #define rs_recv_wr_id(data) (RS_WR_ID_FLAG_RECV | (uint64_t) data)
153 #define rs_wr_is_recv(wr_id) (wr_id & RS_WR_ID_FLAG_RECV)
154 #define rs_wr_is_msg_send(wr_id) (wr_id & RS_WR_ID_FLAG_MSG_SEND)
155 #define rs_wr_data(wr_id) ((uint32_t) wr_id)
156
157 enum {
158         RS_CTRL_DISCONNECT,
159         RS_CTRL_KEEPALIVE,
160         RS_CTRL_SHUTDOWN
161 };
162
163 struct rs_msg {
164         uint32_t op;
165         uint32_t data;
166 };
167
168 struct ds_qp;
169
170 struct ds_rmsg {
171         struct ds_qp    *qp;
172         uint32_t        offset;
173         uint32_t        length;
174 };
175
176 struct ds_smsg {
177         struct ds_smsg  *next;
178 };
179
180 struct rs_sge {
181         uint64_t addr;
182         uint32_t key;
183         uint32_t length;
184 };
185
186 struct rs_iomap {
187         uint64_t offset;
188         struct rs_sge sge;
189 };
190
191 struct rs_iomap_mr {
192         uint64_t offset;
193         struct ibv_mr *mr;
194         dlist_entry entry;
195         atomic_t refcnt;
196         int index;      /* -1 if mapping is local and not in iomap_list */
197 };
198
199 #define RS_MAX_CTRL_MSG    (sizeof(struct rs_sge))
200 #define rs_host_is_net()   (1 == htonl(1))
201 #define RS_CONN_FLAG_NET   (1 << 0)
202 #define RS_CONN_FLAG_IOMAP (1 << 1)
203
204 struct rs_conn_data {
205         uint8_t           version;
206         uint8_t           flags;
207         uint16_t          credits;
208         uint8_t           reserved[3];
209         uint8_t           target_iomap_size;
210         struct rs_sge     target_sgl;
211         struct rs_sge     data_buf;
212 };
213
214 struct rs_conn_private_data {
215         union {
216                 struct rs_conn_data             conn_data;
217                 struct {
218                         struct ib_connect_hdr   ib_hdr;
219                         struct rs_conn_data     conn_data;
220                 } af_ib;
221         };
222 };
223
224 /*
225  * rsocket states are ordered as passive, connecting, connected, disconnected.
226  */
227 enum rs_state {
228         rs_init,
229         rs_bound           =                0x0001,
230         rs_listening       =                0x0002,
231         rs_opening         =                0x0004,
232         rs_resolving_addr  = rs_opening |   0x0010,
233         rs_resolving_route = rs_opening |   0x0020,
234         rs_connecting      = rs_opening |   0x0040,
235         rs_accepting       = rs_opening |   0x0080,
236         rs_connected       =                0x0100,
237         rs_writable        =                0x0200,
238         rs_readable        =                0x0400,
239         rs_connect_rdwr    = rs_connected | rs_readable | rs_writable,
240         rs_connect_error   =                0x0800,
241         rs_disconnected    =                0x1000,
242         rs_error           =                0x2000,
243 };
244
245 #define RS_OPT_SWAP_SGL   (1 << 0)
246 /*
247  * iWarp does not support RDMA write with immediate data.  For iWarp, we
248  * transfer rsocket messages as inline sends.
249  */
250 #define RS_OPT_MSG_SEND   (1 << 1)
251 #define RS_OPT_SVC_ACTIVE (1 << 2)
252
253 union socket_addr {
254         struct sockaddr         sa;
255         struct sockaddr_in      sin;
256         struct sockaddr_in6     sin6;
257 };
258
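/*
 * Per-datagram header identifying the sender's address and port.  Datagram
 * payloads travel over UD QPs rather than the UDP socket, so this header
 * lets the receiver recover the logical UDP source address.
 */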
259 struct ds_header {
260         uint8_t           version;
261         uint8_t           length;
262         uint16_t          port;
263         union {
264                 uint32_t  ipv4;
265                 struct {
266                         uint32_t flowinfo;
267                         uint8_t  addr[16];
268                 } ipv6;
269         } addr;
270 };
271
272 #define DS_IPV4_HDR_LEN  8
273 #define DS_IPV6_HDR_LEN 24
274
275 struct ds_dest {
276         union socket_addr addr; /* must be first */
277         struct ds_qp      *qp;
278         struct ibv_ah     *ah;
279         uint32_t           qpn;
280 };
281
282 struct ds_qp {
283         dlist_entry       list;
284         struct rsocket    *rs;
285         struct rdma_cm_id *cm_id;
286         struct ds_header  hdr;
287         struct ds_dest    dest;
288
289         struct ibv_mr     *smr;
290         struct ibv_mr     *rmr;
291         uint8_t           *rbuf;
292
293         int               cq_armed;
294 };
295
296 struct rsocket {
297         int               type;
298         int               index;
299         fastlock_t        slock;
300         fastlock_t        rlock;
301         fastlock_t        cq_lock;
302         fastlock_t        cq_wait_lock;
303         fastlock_t        map_lock; /* acquire slock first if needed */
304
305         union {
306                 /* data stream */
307                 struct {
308                         struct rdma_cm_id *cm_id;
309                         uint64_t          tcp_opts;
310                         unsigned int      keepalive_time;
311
312                         unsigned int      ctrl_seqno;
313                         unsigned int      ctrl_max_seqno;
314                         uint16_t          sseq_no;
315                         uint16_t          sseq_comp;
316                         uint16_t          rseq_no;
317                         uint16_t          rseq_comp;
318
319                         int               remote_sge;
320                         struct rs_sge     remote_sgl;
321                         struct rs_sge     remote_iomap;
322
323                         struct ibv_mr     *target_mr;
324                         int               target_sge;
325                         int               target_iomap_size;
326                         void              *target_buffer_list;
327                         volatile struct rs_sge    *target_sgl;
328                         struct rs_iomap   *target_iomap;
329
330                         int               rbuf_msg_index;
331                         int               rbuf_bytes_avail;
332                         int               rbuf_free_offset;
333                         int               rbuf_offset;
334                         struct ibv_mr     *rmr;
335                         uint8_t           *rbuf;
336
337                         int               sbuf_bytes_avail;
338                         struct ibv_mr     *smr;
339                         struct ibv_sge    ssgl[2];
340                 };
341                 /* datagram */
342                 struct {
343                         struct ds_qp      *qp_list;
344                         void              *dest_map;
345                         struct ds_dest    *conn_dest;
346
347                         int               udp_sock;
348                         int               epfd;
349                         int               rqe_avail;
350                         struct ds_smsg    *smsg_free;
351                 };
352         };
353
354         int               opts;
355         long              fd_flags;
356         uint64_t          so_opts;
357         uint64_t          ipv6_opts;
358         void              *optval;
359         size_t            optlen;
360         int               state;
361         int               cq_armed;
362         int               retries;
363         int               err;
364
365         int               sqe_avail;
366         uint32_t          sbuf_size;
367         uint16_t          sq_size;
368         uint16_t          sq_inline;
369
370         uint32_t          rbuf_size;
371         uint16_t          rq_size;
372         int               rmsg_head;
373         int               rmsg_tail;
374         union {
375                 struct rs_msg     *rmsg;
376                 struct ds_rmsg    *dmsg;
377         };
378
379         uint8_t           *sbuf;
380         struct rs_iomap_mr *remote_iomappings;
381         dlist_entry       iomap_list;
382         dlist_entry       iomap_queue;
383         int               iomap_pending;
384         int               unack_cqe;
385 };
386
387 #define DS_UDP_TAG 0x55555555
388
389 struct ds_udp_header {
390         uint32_t          tag;
391         uint8_t           version;
392         uint8_t           op;
393         uint8_t           length;
394         uint8_t           reserved;
395         uint32_t          qpn;  /* lower 8 bits reserved */
396         union {
397                 uint32_t ipv4;
398                 uint8_t  ipv6[16];
399         } addr;
400 };
401
402 #define DS_UDP_IPV4_HDR_LEN 16
403 #define DS_UDP_IPV6_HDR_LEN 28
404
405 #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
406
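/*
 * Datagram rsockets keep one UD QP per local source address on a circular
 * list; rs->qp_list points at the most recently inserted entry.
 */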
407 static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
408 {
409         if (!rs->qp_list)
410                 dlist_init(&qp->list);
411         else
412                 dlist_insert_head(&qp->list, &rs->qp_list->list);
413         rs->qp_list = qp;
414 }
415
416 static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
417 {
418         if (qp->list.next != &qp->list) {
419                 rs->qp_list = ds_next_qp(qp);
420                 dlist_remove(&qp->list);
421         } else {
422                 rs->qp_list = NULL;
423         }
424 }
425
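/*
 * Pass a request to a service thread over a socketpair and wait for its
 * status.  The first request starts the thread; once the service's rsocket
 * count drops back to zero, the thread is joined and the socketpair closed.
 */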
426 static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
427 {
428         struct rs_svc_msg msg;
429         int ret;
430
431         pthread_mutex_lock(&mut);
432         if (!svc->cnt) {
433                 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc->sock);
434                 if (ret)
435                         goto unlock;
436
437                 ret = pthread_create(&svc->id, NULL, svc->run, svc);
438                 if (ret) {
439                         ret = ERR(ret);
440                         goto closepair;
441                 }
442         }
443
444         msg.cmd = cmd;
445         msg.status = EINVAL;
446         msg.rs = rs;
447         write(svc->sock[0], &msg, sizeof msg);
448         read(svc->sock[0], &msg, sizeof msg);
449         ret = rdma_seterrno(msg.status);
450         if (svc->cnt)
451                 goto unlock;
452
453         pthread_join(svc->id, NULL);
454 closepair:
455         close(svc->sock[0]);
456         close(svc->sock[1]);
457 unlock:
458         pthread_mutex_unlock(&mut);
459         return ret;
460 }
461
462 static int ds_compare_addr(const void *dst1, const void *dst2)
463 {
464         const struct sockaddr *sa1, *sa2;
465         size_t len;
466
467         sa1 = (const struct sockaddr *) dst1;
468         sa2 = (const struct sockaddr *) dst2;
469
470         len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
471               sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
472         return memcmp(dst1, dst2, len);
473 }
474
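/*
 * Sizes exchanged in the connection data are packed into a small field:
 * values up to 2^(bits - 1) are stored as-is, larger values are stored as
 * (value >> bits) with the top bit set.  For example, with bits = 8 a value
 * of 1024 scales to 0x84 and converts back to 1024; decoding rounds down
 * to a multiple of 2^bits.
 */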
475 static int rs_value_to_scale(int value, int bits)
476 {
477         return value <= (1 << (bits - 1)) ?
478                value : (1 << (bits - 1)) | (value >> bits);
479 }
480
481 static int rs_scale_to_value(int value, int bits)
482 {
483         return value <= (1 << (bits - 1)) ?
484                value : (value & ~(1 << (bits - 1))) << bits;
485 }
486
487 void rs_configure(void)
488 {
489         FILE *f;
490         static int init;
491
492         if (init)
493                 return;
494
495         pthread_mutex_lock(&mut);
496         if (init)
497                 goto out;
498
499         if (ucma_init())
500                 goto out;
501         ucma_ib_init();
502
503         if ((f = fopen(RS_CONF_DIR "/polling_time", "r"))) {
504                 (void) fscanf(f, "%u", &polling_time);
505                 fclose(f);
506         }
507
508         if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) {
509                 (void) fscanf(f, "%hu", &def_inline);
510                 fclose(f);
511         }
512
513         if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) {
514                 (void) fscanf(f, "%hu", &def_sqsize);
515                 fclose(f);
516         }
517
518         if ((f = fopen(RS_CONF_DIR "/rqsize_default", "r"))) {
519                 (void) fscanf(f, "%hu", &def_rqsize);
520                 fclose(f);
521         }
522
523         if ((f = fopen(RS_CONF_DIR "/mem_default", "r"))) {
524                 (void) fscanf(f, "%u", &def_mem);
525                 fclose(f);
526
527                 if (def_mem < 1)
528                         def_mem = 1;
529         }
530
531         if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
532                 (void) fscanf(f, "%u", &def_wmem);
533                 fclose(f);
534                 if (def_wmem < RS_SNDLOWAT)
535                         def_wmem = RS_SNDLOWAT << 1;
536         }
537
538         if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
539                 (void) fscanf(f, "%hu", &def_iomap_size);
540                 fclose(f);
541
542                 /* round to supported values */
543                 def_iomap_size = (uint8_t) rs_value_to_scale(
544                         (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
545         }
546         init = 1;
547 out:
548         pthread_mutex_unlock(&mut);
549 }
550
551 static int rs_insert(struct rsocket *rs, int index)
552 {
553         pthread_mutex_lock(&mut);
554         rs->index = idm_set(&idm, index, rs);
555         pthread_mutex_unlock(&mut);
556         return rs->index;
557 }
558
559 static void rs_remove(struct rsocket *rs)
560 {
561         pthread_mutex_lock(&mut);
562         idm_clear(&idm, rs->index);
563         pthread_mutex_unlock(&mut);
564 }
565
566 /* We only inherit from listening sockets */
567 static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
568 {
569         struct rsocket *rs;
570
571         rs = calloc(1, sizeof *rs);
572         if (!rs)
573                 return NULL;
574
575         rs->type = type;
576         rs->index = -1;
577         if (type == SOCK_DGRAM) {
578                 rs->udp_sock = -1;
579                 rs->epfd = -1;
580         }
581
582         if (inherited_rs) {
583                 rs->sbuf_size = inherited_rs->sbuf_size;
584                 rs->rbuf_size = inherited_rs->rbuf_size;
585                 rs->sq_inline = inherited_rs->sq_inline;
586                 rs->sq_size = inherited_rs->sq_size;
587                 rs->rq_size = inherited_rs->rq_size;
588                 if (type == SOCK_STREAM) {
589                         rs->ctrl_max_seqno = inherited_rs->ctrl_max_seqno;
590                         rs->target_iomap_size = inherited_rs->target_iomap_size;
591                 }
592         } else {
593                 rs->sbuf_size = def_wmem;
594                 rs->rbuf_size = def_mem;
595                 rs->sq_inline = def_inline;
596                 rs->sq_size = def_sqsize;
597                 rs->rq_size = def_rqsize;
598                 if (type == SOCK_STREAM) {
599                         rs->ctrl_max_seqno = RS_QP_CTRL_SIZE;
600                         rs->target_iomap_size = def_iomap_size;
601                 }
602         }
603         fastlock_init(&rs->slock);
604         fastlock_init(&rs->rlock);
605         fastlock_init(&rs->cq_lock);
606         fastlock_init(&rs->cq_wait_lock);
607         fastlock_init(&rs->map_lock);
608         dlist_init(&rs->iomap_list);
609         dlist_init(&rs->iomap_queue);
610         return rs;
611 }
612
613 static int rs_set_nonblocking(struct rsocket *rs, long arg)
614 {
615         struct ds_qp *qp;
616         int ret = 0;
617
618         if (rs->type == SOCK_STREAM) {
619                 if (rs->cm_id->recv_cq_channel)
620                         ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
621
622                 if (!ret && rs->state < rs_connected)
623                         ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
624         } else {
625                 ret = fcntl(rs->epfd, F_SETFL, arg);
626                 if (!ret && rs->qp_list) {
627                         qp = rs->qp_list;
628                         do {
629                                 ret = fcntl(qp->cm_id->recv_cq_channel->fd,
630                                             F_SETFL, arg);
631                                 qp = ds_next_qp(qp);
632                         } while (qp != rs->qp_list && !ret);
633                 }
634         }
635
636         return ret;
637 }
638
639 static void rs_set_qp_size(struct rsocket *rs)
640 {
641         uint16_t max_size;
642
643         max_size = min(ucma_max_qpsize(rs->cm_id), RS_QP_MAX_SIZE);
644
645         if (rs->sq_size > max_size)
646                 rs->sq_size = max_size;
647         else if (rs->sq_size < RS_QP_MIN_SIZE)
648                 rs->sq_size = RS_QP_MIN_SIZE;
649
650         if (rs->rq_size > max_size)
651                 rs->rq_size = max_size;
652         else if (rs->rq_size < RS_QP_MIN_SIZE)
653                 rs->rq_size = RS_QP_MIN_SIZE;
654 }
655
656 static void ds_set_qp_size(struct rsocket *rs)
657 {
658         uint16_t max_size;
659
660         max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
661
662         if (rs->sq_size > max_size)
663                 rs->sq_size = max_size;
664         if (rs->rq_size > max_size)
665                 rs->rq_size = max_size;
666
667         if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
668                 rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
669         else
670                 rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
671
672         if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT))
673                 rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
674         else
675                 rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
676 }
677
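/*
 * Allocate and register the send buffer (plus reserved space for control
 * messages that cannot be sent inline), the target SGL/iomap array that is
 * advertised to the peer, and the receive buffer (plus per-message slots
 * when RS_OPT_MSG_SEND is in use).  Only half of the receive buffer is
 * credited to the peer at a time.
 */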
678 static int rs_init_bufs(struct rsocket *rs)
679 {
680         uint32_t total_rbuf_size, total_sbuf_size;
681         size_t len;
682
683         rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
684         if (!rs->rmsg)
685                 return ERR(ENOMEM);
686
687         total_sbuf_size = rs->sbuf_size;
688         if (rs->sq_inline < RS_MAX_CTRL_MSG)
689                 total_sbuf_size += RS_MAX_CTRL_MSG * RS_QP_CTRL_SIZE;
690         rs->sbuf = calloc(total_sbuf_size, 1);
691         if (!rs->sbuf)
692                 return ERR(ENOMEM);
693
694         rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, total_sbuf_size);
695         if (!rs->smr)
696                 return -1;
697
698         len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
699               sizeof(*rs->target_iomap) * rs->target_iomap_size;
700         rs->target_buffer_list = malloc(len);
701         if (!rs->target_buffer_list)
702                 return ERR(ENOMEM);
703
704         rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
705         if (!rs->target_mr)
706                 return -1;
707
708         memset(rs->target_buffer_list, 0, len);
709         rs->target_sgl = rs->target_buffer_list;
710         if (rs->target_iomap_size)
711                 rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
712
713         total_rbuf_size = rs->rbuf_size;
714         if (rs->opts & RS_OPT_MSG_SEND)
715                 total_rbuf_size += rs->rq_size * RS_MSG_SIZE;
716         rs->rbuf = calloc(total_rbuf_size, 1);
717         if (!rs->rbuf)
718                 return ERR(ENOMEM);
719
720         rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, total_rbuf_size);
721         if (!rs->rmr)
722                 return -1;
723
724         rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
725         rs->sbuf_bytes_avail = rs->sbuf_size;
726         rs->ssgl[0].lkey = rs->ssgl[1].lkey = rs->smr->lkey;
727
728         rs->rbuf_free_offset = rs->rbuf_size >> 1;
729         rs->rbuf_bytes_avail = rs->rbuf_size >> 1;
730         rs->sqe_avail = rs->sq_size - rs->ctrl_max_seqno;
731         rs->rseq_comp = rs->rq_size >> 1;
732         return 0;
733 }
734
735 static int ds_init_bufs(struct ds_qp *qp)
736 {
737         qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
738         if (!qp->rbuf)
739                 return ERR(ENOMEM);
740
741         qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
742         if (!qp->smr)
743                 return -1;
744
745         qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size +
746                                                      sizeof(struct ibv_grh));
747         if (!qp->rmr)
748                 return -1;
749
750         return 0;
751 }
752
753 /*
754  * If a user is waiting on a datagram rsocket through poll or select, then
755  * we need the first completion to generate an event on the related epoll fd
756  * in order to signal the user.  We arm the CQ on creation for this purpose.
757  */
758 static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
759 {
760         cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
761         if (!cm_id->recv_cq_channel)
762                 return -1;
763
764         cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
765                                        cm_id, cm_id->recv_cq_channel, 0);
766         if (!cm_id->recv_cq)
767                 goto err1;
768
769         if (rs->fd_flags & O_NONBLOCK) {
770                 if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
771                         goto err2;
772         }
773
774         ibv_req_notify_cq(cm_id->recv_cq, 0);
775         cm_id->send_cq_channel = cm_id->recv_cq_channel;
776         cm_id->send_cq = cm_id->recv_cq;
777         return 0;
778
779 err2:
780         ibv_destroy_cq(cm_id->recv_cq);
781         cm_id->recv_cq = NULL;
782 err1:
783         ibv_destroy_comp_channel(cm_id->recv_cq_channel);
784         cm_id->recv_cq_channel = NULL;
785         return -1;
786 }
787
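/*
 * Post a receive for the next incoming message.  With RDMA writes carrying
 * immediate data, the sender places the payload directly into rbuf, so a
 * zero-length receive is enough to consume the immediate data.  In
 * RS_OPT_MSG_SEND mode each receive instead targets a small slot past the
 * end of rbuf that holds the 4-byte message.
 */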
788 static inline int rs_post_recv(struct rsocket *rs)
789 {
790         struct ibv_recv_wr wr, *bad;
791         struct ibv_sge sge;
792
793         wr.next = NULL;
794         if (!(rs->opts & RS_OPT_MSG_SEND)) {
795                 wr.wr_id = rs_recv_wr_id(0);
796                 wr.sg_list = NULL;
797                 wr.num_sge = 0;
798         } else {
799                 wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
800                 sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
801                            (rs->rbuf_msg_index * RS_MSG_SIZE);
802                 sge.length = RS_MSG_SIZE;
803                 sge.lkey = rs->rmr->lkey;
804
805                 wr.sg_list = &sge;
806                 wr.num_sge = 1;
807                 if (++rs->rbuf_msg_index == rs->rq_size)
808                         rs->rbuf_msg_index = 0;
809         }
810
811         return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
812 }
813
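/*
 * Datagram receives use two SGEs: the first absorbs the GRH that precedes
 * UD messages, the second lands the payload in an RS_SNDLOWAT-sized slot
 * at the given offset within the receive buffer.
 */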
814 static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset)
815 {
816         struct ibv_recv_wr wr, *bad;
817         struct ibv_sge sge[2];
818
819         sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size;
820         sge[0].length = sizeof(struct ibv_grh);
821         sge[0].lkey = qp->rmr->lkey;
822         sge[1].addr = (uintptr_t) qp->rbuf + offset;
823         sge[1].length = RS_SNDLOWAT;
824         sge[1].lkey = qp->rmr->lkey;
825
826         wr.wr_id = rs_recv_wr_id(offset);
827         wr.next = NULL;
828         wr.sg_list = sge;
829         wr.num_sge = 2;
830
831         return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
832 }
833
834 static int rs_create_ep(struct rsocket *rs)
835 {
836         struct ibv_qp_init_attr qp_attr;
837         int i, ret;
838
839         rs_set_qp_size(rs);
840         if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
841                 rs->opts |= RS_OPT_MSG_SEND;
842         ret = rs_init_bufs(rs);
843         if (ret)
844                 return ret;
845
846         ret = rs_create_cq(rs, rs->cm_id);
847         if (ret)
848                 return ret;
849
850         memset(&qp_attr, 0, sizeof qp_attr);
851         qp_attr.qp_context = rs;
852         qp_attr.send_cq = rs->cm_id->send_cq;
853         qp_attr.recv_cq = rs->cm_id->recv_cq;
854         qp_attr.qp_type = IBV_QPT_RC;
855         qp_attr.sq_sig_all = 1;
856         qp_attr.cap.max_send_wr = rs->sq_size;
857         qp_attr.cap.max_recv_wr = rs->rq_size;
858         qp_attr.cap.max_send_sge = 2;
859         qp_attr.cap.max_recv_sge = 1;
860         qp_attr.cap.max_inline_data = rs->sq_inline;
861
862         ret = rdma_create_qp(rs->cm_id, NULL, &qp_attr);
863         if (ret)
864                 return ret;
865
866         rs->sq_inline = qp_attr.cap.max_inline_data;
867         if ((rs->opts & RS_OPT_MSG_SEND) && (rs->sq_inline < RS_MSG_SIZE))
868                 return ERR(ENOTSUP);
869
870         for (i = 0; i < rs->rq_size; i++) {
871                 ret = rs_post_recv(rs);
872                 if (ret)
873                         return ret;
874         }
875         return 0;
876 }
877
878 static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
879 {
880         if (atomic_dec(&iomr->refcnt))
881                 return;
882
883         dlist_remove(&iomr->entry);
884         ibv_dereg_mr(iomr->mr);
885         if (iomr->index >= 0)
886                 iomr->mr = NULL;
887         else
888                 free(iomr);
889 }
890
891 static void rs_free_iomappings(struct rsocket *rs)
892 {
893         struct rs_iomap_mr *iomr;
894
895         while (!dlist_empty(&rs->iomap_list)) {
896                 iomr = container_of(rs->iomap_list.next,
897                                     struct rs_iomap_mr, entry);
898                 riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
899         }
900         while (!dlist_empty(&rs->iomap_queue)) {
901                 iomr = container_of(rs->iomap_queue.next,
902                                     struct rs_iomap_mr, entry);
903                 riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
904         }
905 }
906
907 static void ds_free_qp(struct ds_qp *qp)
908 {
909         if (qp->smr)
910                 rdma_dereg_mr(qp->smr);
911
912         if (qp->rbuf) {
913                 if (qp->rmr)
914                         rdma_dereg_mr(qp->rmr);
915                 free(qp->rbuf);
916         }
917
918         if (qp->cm_id) {
919                 if (qp->cm_id->qp) {
920                         tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
921                         epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
922                                   qp->cm_id->recv_cq_channel->fd, NULL);
923                         rdma_destroy_qp(qp->cm_id);
924                 }
925                 rdma_destroy_id(qp->cm_id);
926         }
927
928         free(qp);
929 }
930
931 static void ds_free(struct rsocket *rs)
932 {
933         struct ds_qp *qp;
934
935         if (rs->udp_sock >= 0)
936                 close(rs->udp_sock);
937
938         if (rs->index >= 0)
939                 rs_remove(rs);
940
941         if (rs->dmsg)
942                 free(rs->dmsg);
943
944         while ((qp = rs->qp_list)) {
945                 ds_remove_qp(rs, qp);
946                 ds_free_qp(qp);
947         }
948
949         if (rs->epfd >= 0)
950                 close(rs->epfd);
951
952         if (rs->sbuf)
953                 free(rs->sbuf);
954
955         tdestroy(rs->dest_map, free);
956         fastlock_destroy(&rs->map_lock);
957         fastlock_destroy(&rs->cq_wait_lock);
958         fastlock_destroy(&rs->cq_lock);
959         fastlock_destroy(&rs->rlock);
960         fastlock_destroy(&rs->slock);
961         free(rs);
962 }
963
964 static void rs_free(struct rsocket *rs)
965 {
966         if (rs->type == SOCK_DGRAM) {
967                 ds_free(rs);
968                 return;
969         }
970
971         if (rs->rmsg)
972                 free(rs->rmsg);
973
974         if (rs->sbuf) {
975                 if (rs->smr)
976                         rdma_dereg_mr(rs->smr);
977                 free(rs->sbuf);
978         }
979
980         if (rs->rbuf) {
981                 if (rs->rmr)
982                         rdma_dereg_mr(rs->rmr);
983                 free(rs->rbuf);
984         }
985
986         if (rs->target_buffer_list) {
987                 if (rs->target_mr)
988                         rdma_dereg_mr(rs->target_mr);
989                 free(rs->target_buffer_list);
990         }
991
992         if (rs->cm_id) {
993                 rs_free_iomappings(rs);
994                 if (rs->cm_id->qp) {
995                         ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe);
996                         rdma_destroy_qp(rs->cm_id);
997                 }
998                 rdma_destroy_id(rs->cm_id);
999         }
1000
1001         if (rs->index >= 0)
1002                 rs_remove(rs);
1003
1004         fastlock_destroy(&rs->map_lock);
1005         fastlock_destroy(&rs->cq_wait_lock);
1006         fastlock_destroy(&rs->cq_lock);
1007         fastlock_destroy(&rs->rlock);
1008         fastlock_destroy(&rs->slock);
1009         free(rs);
1010 }
1011
1012 static size_t rs_conn_data_offset(struct rsocket *rs)
1013 {
1014         return (rs->cm_id->route.addr.src_addr.sa_family == AF_IB) ?
1015                 sizeof(struct ib_connect_hdr) : 0;
1016 }
1017
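/*
 * Connection setup exchanges rs_conn_data in the CM private data: each side
 * advertises its target SGL (which the peer updates via RDMA writes), its
 * initial receive buffer, and its initial credits.  RS_CONN_FLAG_NET records
 * the sender's byte order; a peer with the opposite order sets
 * RS_OPT_SWAP_SGL when saving the data.
 */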
1018 static void rs_format_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
1019 {
1020         conn->version = 1;
1021         conn->flags = RS_CONN_FLAG_IOMAP |
1022                       (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
1023         conn->credits = htons(rs->rq_size);
1024         memset(conn->reserved, 0, sizeof conn->reserved);
1025         conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
1026
1027         conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
1028         conn->target_sgl.length = htonl(RS_SGL_SIZE);
1029         conn->target_sgl.key = htonl(rs->target_mr->rkey);
1030
1031         conn->data_buf.addr = htonll((uintptr_t) rs->rbuf);
1032         conn->data_buf.length = htonl(rs->rbuf_size >> 1);
1033         conn->data_buf.key = htonl(rs->rmr->rkey);
1034 }
1035
1036 static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
1037 {
1038         rs->remote_sgl.addr = ntohll(conn->target_sgl.addr);
1039         rs->remote_sgl.length = ntohl(conn->target_sgl.length);
1040         rs->remote_sgl.key = ntohl(conn->target_sgl.key);
1041         rs->remote_sge = 1;
1042         if ((rs_host_is_net() && !(conn->flags & RS_CONN_FLAG_NET)) ||
1043             (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
1044                 rs->opts = RS_OPT_SWAP_SGL;
1045
1046         if (conn->flags & RS_CONN_FLAG_IOMAP) {
1047                 rs->remote_iomap.addr = rs->remote_sgl.addr +
1048                                         sizeof(rs->remote_sgl) * rs->remote_sgl.length;
1049                 rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
1050                 rs->remote_iomap.key = rs->remote_sgl.key;
1051         }
1052
1053         rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
1054         rs->target_sgl[0].length = ntohl(conn->data_buf.length);
1055         rs->target_sgl[0].key = ntohl(conn->data_buf.key);
1056
1057         rs->sseq_comp = ntohs(conn->credits);
1058 }
1059
1060 static int ds_init(struct rsocket *rs, int domain)
1061 {
1062         rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
1063         if (rs->udp_sock < 0)
1064                 return rs->udp_sock;
1065
1066         rs->epfd = epoll_create(2);
1067         if (rs->epfd < 0)
1068                 return rs->epfd;
1069
1070         return 0;
1071 }
1072
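/*
 * Set up the datagram endpoint: carve the send buffer into RS_SNDLOWAT-sized
 * messages kept on a free list, allocate the receive message ring, and
 * register the rsocket with the UDP service thread before marking it
 * readable and writable.
 */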
1073 static int ds_init_ep(struct rsocket *rs)
1074 {
1075         struct ds_smsg *msg;
1076         int i, ret;
1077
1078         ds_set_qp_size(rs);
1079
1080         rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
1081         if (!rs->sbuf)
1082                 return ERR(ENOMEM);
1083
1084         rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
1085         if (!rs->dmsg)
1086                 return ERR(ENOMEM);
1087
1088         rs->sqe_avail = rs->sq_size;
1089         rs->rqe_avail = rs->rq_size;
1090
1091         rs->smsg_free = (struct ds_smsg *) rs->sbuf;
1092         msg = rs->smsg_free;
1093         for (i = 0; i < rs->sq_size - 1; i++) {
1094                 msg->next = (void *) msg + RS_SNDLOWAT;
1095                 msg = msg->next;
1096         }
1097         msg->next = NULL;
1098
1099         ret = rs_notify_svc(&udp_svc, rs, RS_SVC_ADD_DGRAM);
1100         if (ret)
1101                 return ret;
1102
1103         rs->state = rs_readable | rs_writable;
1104         return 0;
1105 }
1106
1107 int rsocket(int domain, int type, int protocol)
1108 {
1109         struct rsocket *rs;
1110         int index, ret;
1111
1112         if ((domain != AF_INET && domain != AF_INET6 && domain != AF_IB) ||
1113             ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
1114             (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
1115             (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
1116                 return ERR(ENOTSUP);
1117
1118         rs_configure();
1119         rs = rs_alloc(NULL, type);
1120         if (!rs)
1121                 return ERR(ENOMEM);
1122
1123         if (type == SOCK_STREAM) {
1124                 ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
1125                 if (ret)
1126                         goto err;
1127
1128                 rs->cm_id->route.addr.src_addr.sa_family = domain;
1129                 index = rs->cm_id->channel->fd;
1130         } else {
1131                 ret = ds_init(rs, domain);
1132                 if (ret)
1133                         goto err;
1134
1135                 index = rs->udp_sock;
1136         }
1137
1138         ret = rs_insert(rs, index);
1139         if (ret < 0)
1140                 goto err;
1141
1142         return rs->index;
1143
1144 err:
1145         rs_free(rs);
1146         return ret;
1147 }
1148
1149 int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
1150 {
1151         struct rsocket *rs;
1152         int ret;
1153
1154         rs = idm_lookup(&idm, socket);
1155         if (!rs)
1156                 return ERR(EBADF);
1157         if (rs->type == SOCK_STREAM) {
1158                 ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
1159                 if (!ret)
1160                         rs->state = rs_bound;
1161         } else {
1162                 if (rs->state == rs_init) {
1163                         ret = ds_init_ep(rs);
1164                         if (ret)
1165                                 return ret;
1166                 }
1167                 ret = bind(rs->udp_sock, addr, addrlen);
1168         }
1169         return ret;
1170 }
1171
1172 int rlisten(int socket, int backlog)
1173 {
1174         struct rsocket *rs;
1175         int ret;
1176
1177         rs = idm_lookup(&idm, socket);
1178         if (!rs)
1179                 return ERR(EBADF);
1180
1181         if (rs->state != rs_listening) {
1182                 ret = rdma_listen(rs->cm_id, backlog);
1183                 if (!ret)
1184                         rs->state = rs_listening;
1185         } else {
1186                 ret = 0;
1187         }
1188         return ret;
1189 }
1190
1191 /*
1192  * Nonblocking is usually not inherited between sockets, but we inherit it
1193  * here only while establishing the connection.  This is needed to
1194  * prevent rdma_accept from blocking until the remote side finishes
1195  * establishing the connection.  If we were to allow rdma_accept to block,
1196  * then a single thread cannot establish a connection with itself, or
1197  * two threads which try to connect to each other can deadlock trying to
1198  * form a connection.
1199  *
1200  * Data transfers on the new socket remain blocking unless the user
1201  * specifies otherwise through rfcntl.
1202  */
1203 int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
1204 {
1205         struct rsocket *rs, *new_rs;
1206         struct rdma_conn_param param;
1207         struct rs_conn_data *creq, cresp;
1208         int ret;
1209
1210         rs = idm_lookup(&idm, socket);
1211         if (!rs)
1212                 return ERR(EBADF);
1213         new_rs = rs_alloc(rs, rs->type);
1214         if (!new_rs)
1215                 return ERR(ENOMEM);
1216
1217         ret = rdma_get_request(rs->cm_id, &new_rs->cm_id);
1218         if (ret)
1219                 goto err;
1220
1221         ret = rs_insert(new_rs, new_rs->cm_id->channel->fd);
1222         if (ret < 0)
1223                 goto err;
1224
1225         creq = (struct rs_conn_data *)
1226                (new_rs->cm_id->event->param.conn.private_data + rs_conn_data_offset(rs));
1227         if (creq->version != 1) {
1228                 ret = ERR(ENOTSUP);
1229                 goto err;
1230         }
1231
1232         if (rs->fd_flags & O_NONBLOCK)
1233                 fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
1234
1235         ret = rs_create_ep(new_rs);
1236         if (ret)
1237                 goto err;
1238
1239         rs_save_conn_data(new_rs, creq);
1240         param = new_rs->cm_id->event->param.conn;
1241         rs_format_conn_data(new_rs, &cresp);
1242         param.private_data = &cresp;
1243         param.private_data_len = sizeof cresp;
1244         ret = rdma_accept(new_rs->cm_id, &param);
1245         if (!ret)
1246                 new_rs->state = rs_connect_rdwr;
1247         else if (errno == EAGAIN || errno == EWOULDBLOCK)
1248                 new_rs->state = rs_accepting;
1249         else
1250                 goto err;
1251
1252         if (addr && addrlen)
1253                 rgetpeername(new_rs->index, addr, addrlen);
1254         return new_rs->index;
1255
1256 err:
1257         rs_free(new_rs);
1258         return ret;
1259 }
1260
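/*
 * Drive the connection state machine: resolve address, resolve route,
 * create the endpoint, then connect.  For nonblocking sockets each step may
 * return with errno set to EINPROGRESS; calling rs_do_connect() again
 * resumes from the state recorded in rs->state.
 */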
1261 static int rs_do_connect(struct rsocket *rs)
1262 {
1263         struct rdma_conn_param param;
1264         struct rs_conn_private_data cdata;
1265         struct rs_conn_data *creq, *cresp;
1266         int to, ret;
1267
1268         switch (rs->state) {
1269         case rs_init:
1270         case rs_bound:
1271 resolve_addr:
1272                 to = 1000 << rs->retries++;
1273                 ret = rdma_resolve_addr(rs->cm_id, NULL,
1274                                         &rs->cm_id->route.addr.dst_addr, to);
1275                 if (!ret)
1276                         goto resolve_route;
1277                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1278                         rs->state = rs_resolving_addr;
1279                 break;
1280         case rs_resolving_addr:
1281                 ret = ucma_complete(rs->cm_id);
1282                 if (ret) {
1283                         if (errno == ETIMEDOUT && rs->retries <= RS_CONN_RETRIES)
1284                                 goto resolve_addr;
1285                         break;
1286                 }
1287
1288                 rs->retries = 0;
1289 resolve_route:
1290                 to = 1000 << rs->retries++;
1291                 if (rs->optval) {
1292                         ret = rdma_set_option(rs->cm_id,  RDMA_OPTION_IB,
1293                                               RDMA_OPTION_IB_PATH, rs->optval,
1294                                               rs->optlen);
1295                         free(rs->optval);
1296                         rs->optval = NULL;
1297                         if (!ret) {
1298                                 rs->state = rs_resolving_route;
1299                                 goto resolving_route;
1300                         }
1301                 } else {
1302                         ret = rdma_resolve_route(rs->cm_id, to);
1303                         if (!ret)
1304                                 goto do_connect;
1305                 }
1306                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1307                         rs->state = rs_resolving_route;
1308                 break;
1309         case rs_resolving_route:
1310 resolving_route:
1311                 ret = ucma_complete(rs->cm_id);
1312                 if (ret) {
1313                         if (errno == ETIMEDOUT && rs->retries <= RS_CONN_RETRIES)
1314                                 goto resolve_route;
1315                         break;
1316                 }
1317 do_connect:
1318                 ret = rs_create_ep(rs);
1319                 if (ret)
1320                         break;
1321
1322                 memset(&param, 0, sizeof param);
1323                 creq = (void *) &cdata + rs_conn_data_offset(rs);
1324                 rs_format_conn_data(rs, creq);
1325                 param.private_data = (void *) creq - rs_conn_data_offset(rs);
1326                 param.private_data_len = sizeof(*creq) + rs_conn_data_offset(rs);
1327                 param.flow_control = 1;
1328                 param.retry_count = 7;
1329                 param.rnr_retry_count = 7;
1330                 /* work-around: iWarp issues an RDMA read during connection setup */
1331                 if (rs->opts & RS_OPT_MSG_SEND)
1332                         param.initiator_depth = 1;
1333                 rs->retries = 0;
1334
1335                 ret = rdma_connect(rs->cm_id, &param);
1336                 if (!ret)
1337                         goto connected;
1338                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1339                         rs->state = rs_connecting;
1340                 break;
1341         case rs_connecting:
1342                 ret = ucma_complete(rs->cm_id);
1343                 if (ret)
1344                         break;
1345 connected:
1346                 cresp = (struct rs_conn_data *) rs->cm_id->event->param.conn.private_data;
1347                 if (cresp->version != 1) {
1348                         ret = ERR(ENOTSUP);
1349                         break;
1350                 }
1351
1352                 rs_save_conn_data(rs, cresp);
1353                 rs->state = rs_connect_rdwr;
1354                 break;
1355         case rs_accepting:
1356                 if (!(rs->fd_flags & O_NONBLOCK))
1357                         fcntl(rs->cm_id->channel->fd, F_SETFL, 0);
1358
1359                 ret = ucma_complete(rs->cm_id);
1360                 if (ret)
1361                         break;
1362
1363                 rs->state = rs_connect_rdwr;
1364                 break;
1365         default:
1366                 ret = ERR(EINVAL);
1367                 break;
1368         }
1369
1370         if (ret) {
1371                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1372                         errno = EINPROGRESS;
1373                 } else {
1374                         rs->state = rs_connect_error;
1375                         rs->err = errno;
1376                 }
1377         }
1378         return ret;
1379 }
1380
1381 static int rs_any_addr(const union socket_addr *addr)
1382 {
1383         if (addr->sa.sa_family == AF_INET) {
1384                 return (addr->sin.sin_addr.s_addr == INADDR_ANY ||
1385                         addr->sin.sin_addr.s_addr == htonl(INADDR_LOOPBACK));
1386         } else {
1387                 return (!memcmp(&addr->sin6.sin6_addr, &in6addr_any, 16) ||
1388                         !memcmp(&addr->sin6.sin6_addr, &in6addr_loopback, 16));
1389         }
1390 }
1391
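/*
 * Determine the local address that datagrams to dest_addr will use.  If the
 * socket is bound to a wildcard or loopback address, connect a temporary UDP
 * socket to the destination and read back the kernel's source address
 * choice, preserving the originally bound port.
 */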
1392 static int ds_get_src_addr(struct rsocket *rs,
1393                            const struct sockaddr *dest_addr, socklen_t dest_len,
1394                            union socket_addr *src_addr, socklen_t *src_len)
1395 {
1396         int sock, ret;
1397         uint16_t port;
1398
1399         *src_len = sizeof *src_addr;
1400         ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
1401         if (ret || !rs_any_addr(src_addr))
1402                 return ret;
1403
1404         port = src_addr->sin.sin_port;
1405         sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
1406         if (sock < 0)
1407                 return sock;
1408
1409         ret = connect(sock, dest_addr, dest_len);
1410         if (ret)
1411                 goto out;
1412
1413         *src_len = sizeof *src_addr;
1414         ret = getsockname(sock, &src_addr->sa, src_len);
1415         src_addr->sin.sin_port = port;
1416 out:
1417         close(sock);
1418         return ret;
1419 }
1420
1421 static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
1422 {
1423         if (addr->sa.sa_family == AF_INET) {
1424                 hdr->version = 4;
1425                 hdr->length = DS_IPV4_HDR_LEN;
1426                 hdr->port = addr->sin.sin_port;
1427                 hdr->addr.ipv4 = addr->sin.sin_addr.s_addr;
1428         } else {
1429                 hdr->version = 6;
1430                 hdr->length = DS_IPV6_HDR_LEN;
1431                 hdr->port = addr->sin6.sin6_port;
1432                 hdr->addr.ipv6.flowinfo = addr->sin6.sin6_flowinfo;
1433                 memcpy(&hdr->addr.ipv6.addr, &addr->sin6.sin6_addr, 16);
1434         }
1435 }
1436
1437 static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
1438                           socklen_t addrlen)
1439 {
1440         struct ibv_port_attr port_attr;
1441         struct ibv_ah_attr attr;
1442         int ret;
1443
1444         memcpy(&qp->dest.addr, addr, addrlen);
1445         qp->dest.qp = qp;
1446         qp->dest.qpn = qp->cm_id->qp->qp_num;
1447
1448         ret = ibv_query_port(qp->cm_id->verbs, qp->cm_id->port_num, &port_attr);
1449         if (ret)
1450                 return ret;
1451
1452         memset(&attr, 0, sizeof attr);
1453         attr.dlid = port_attr.lid;
1454         attr.port_num = qp->cm_id->port_num;
1455         qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
1456         if (!qp->dest.ah)
1457                 return ERR(ENOMEM);
1458
1459         tsearch(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
1460         return 0;
1461 }
1462
1463 static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
1464                         socklen_t addrlen, struct ds_qp **new_qp)
1465 {
1466         struct ds_qp *qp;
1467         struct ibv_qp_init_attr qp_attr;
1468         struct epoll_event event;
1469         int i, ret;
1470
1471         qp = calloc(1, sizeof(*qp));
1472         if (!qp)
1473                 return ERR(ENOMEM);
1474
1475         qp->rs = rs;
1476         ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
1477         if (ret)
1478                 goto err;
1479
1480         ds_format_hdr(&qp->hdr, src_addr);
1481         ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
1482         if (ret)
1483                 goto err;
1484
1485         ret = ds_init_bufs(qp);
1486         if (ret)
1487                 goto err;
1488
1489         ret = rs_create_cq(rs, qp->cm_id);
1490         if (ret)
1491                 goto err;
1492
1493         memset(&qp_attr, 0, sizeof qp_attr);
1494         qp_attr.qp_context = qp;
1495         qp_attr.send_cq = qp->cm_id->send_cq;
1496         qp_attr.recv_cq = qp->cm_id->recv_cq;
1497         qp_attr.qp_type = IBV_QPT_UD;
1498         qp_attr.sq_sig_all = 1;
1499         qp_attr.cap.max_send_wr = rs->sq_size;
1500         qp_attr.cap.max_recv_wr = rs->rq_size;
1501         qp_attr.cap.max_send_sge = 1;
1502         qp_attr.cap.max_recv_sge = 2;
1503         qp_attr.cap.max_inline_data = rs->sq_inline;
1504         ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
1505         if (ret)
1506                 goto err;
1507
1508         rs->sq_inline = qp_attr.cap.max_inline_data;
1509         ret = ds_add_qp_dest(qp, src_addr, addrlen);
1510         if (ret)
1511                 goto err;
1512
1513         event.events = EPOLLIN;
1514         event.data.ptr = qp;
1515         ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
1516                         qp->cm_id->recv_cq_channel->fd, &event);
1517         if (ret)
1518                 goto err;
1519
1520         for (i = 0; i < rs->rq_size; i++) {
1521                 ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT);
1522                 if (ret)
1523                         goto err;
1524         }
1525
1526         ds_insert_qp(rs, qp);
1527         *new_qp = qp;
1528         return 0;
1529 err:
1530         ds_free_qp(qp);
1531         return ret;
1532 }
1533
1534 static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
1535                      socklen_t addrlen, struct ds_qp **qp)
1536 {
1537         if (rs->qp_list) {
1538                 *qp = rs->qp_list;
1539                 do {
1540                         if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id),
1541                                              src_addr))
1542                                 return 0;
1543
1544                         *qp = ds_next_qp(*qp);
1545                 } while (*qp != rs->qp_list);
1546         }
1547
1548         return ds_create_qp(rs, src_addr, addrlen, qp);
1549 }
1550
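/*
 * Look up the cached destination for a remote address, creating it (and,
 * if necessary, the UD QP bound to the matching local source address) on
 * first use.  Destinations are kept in a tsearch() tree keyed by address.
 */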
1551 static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
1552                        socklen_t addrlen, struct ds_dest **dest)
1553 {
1554         union socket_addr src_addr;
1555         socklen_t src_len;
1556         struct ds_qp *qp;
1557         struct ds_dest **tdest, *new_dest;
1558         int ret = 0;
1559
1560         fastlock_acquire(&rs->map_lock);
1561         tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
1562         if (tdest)
1563                 goto found;
1564
1565         ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
1566         if (ret)
1567                 goto out;
1568
1569         ret = ds_get_qp(rs, &src_addr, src_len, &qp);
1570         if (ret)
1571                 goto out;
1572
1573         tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
1574         if (!tdest) {
1575                 new_dest = calloc(1, sizeof(*new_dest));
1576                 if (!new_dest) {
1577                         ret = ERR(ENOMEM);
1578                         goto out;
1579                 }
1580
1581                 memcpy(&new_dest->addr, addr, addrlen);
1582                 new_dest->qp = qp;
1583                 tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
1584         }
1585
1586 found:
1587         *dest = *tdest;
1588 out:
1589         fastlock_release(&rs->map_lock);
1590         return ret;
1591 }
1592
1593 int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
1594 {
1595         struct rsocket *rs;
1596         int ret;
1597
1598         rs = idm_lookup(&idm, socket);
1599         if (!rs)
1600                 return ERR(EBADF);
1601         if (rs->type == SOCK_STREAM) {
1602                 memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
1603                 ret = rs_do_connect(rs);
1604         } else {
1605                 if (rs->state == rs_init) {
1606                         ret = ds_init_ep(rs);
1607                         if (ret)
1608                                 return ret;
1609                 }
1610
1611                 fastlock_acquire(&rs->slock);
1612                 ret = connect(rs->udp_sock, addr, addrlen);
1613                 if (!ret)
1614                         ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
1615                 fastlock_release(&rs->slock);
1616         }
1617         return ret;
1618 }
1619
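/*
 * Return the staging slot for the next control message: space reserved past
 * the end of the regular send buffer (see rs_init_bufs), indexed by
 * ctrl_seqno modulo RS_QP_CTRL_SIZE.
 */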
1620 static void *rs_get_ctrl_buf(struct rsocket *rs)
1621 {
1622         return rs->sbuf + rs->sbuf_size +
1623                 RS_MAX_CTRL_MSG * (rs->ctrl_seqno & (RS_QP_CTRL_SIZE - 1));
1624 }
1625
1626 static int rs_post_msg(struct rsocket *rs, uint32_t msg)
1627 {
1628         struct ibv_send_wr wr, *bad;
1629         struct ibv_sge sge;
1630
1631         wr.wr_id = rs_send_wr_id(msg);
1632         wr.next = NULL;
1633         if (!(rs->opts & RS_OPT_MSG_SEND)) {
1634                 wr.sg_list = NULL;
1635                 wr.num_sge = 0;
1636                 wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1637                 wr.send_flags = 0;
1638                 wr.imm_data = htonl(msg);
1639         } else {
1640                 sge.addr = (uintptr_t) &msg;
1641                 sge.lkey = 0;
1642                 sge.length = sizeof msg;
1643                 wr.sg_list = &sge;
1644                 wr.num_sge = 1;
1645                 wr.opcode = IBV_WR_SEND;
1646                 wr.send_flags = IBV_SEND_INLINE;
1647         }
1648
1649         return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1650 }
1651
1652 static int rs_post_write(struct rsocket *rs,
1653                          struct ibv_sge *sgl, int nsge,
1654                          uint32_t wr_data, int flags,
1655                          uint64_t addr, uint32_t rkey)
1656 {
1657         struct ibv_send_wr wr, *bad;
1658
1659         wr.wr_id = rs_send_wr_id(wr_data);
1660         wr.next = NULL;
1661         wr.sg_list = sgl;
1662         wr.num_sge = nsge;
1663         wr.opcode = IBV_WR_RDMA_WRITE;
1664         wr.send_flags = flags;
1665         wr.wr.rdma.remote_addr = addr;
1666         wr.wr.rdma.rkey = rkey;
1667
1668         return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1669 }
1670
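/*
 * Post a message to the peer.  When RDMA write with immediate is not
 * available (RS_OPT_MSG_SEND), the data is written first and the
 * notification is delivered as a separate inline send.
 */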
1671 static int rs_post_write_msg(struct rsocket *rs,
1672                          struct ibv_sge *sgl, int nsge,
1673                          uint32_t msg, int flags,
1674                          uint64_t addr, uint32_t rkey)
1675 {
1676         struct ibv_send_wr wr, *bad;
1677         struct ibv_sge sge;
1678         int ret;
1679
1680         wr.next = NULL;
1681         if (!(rs->opts & RS_OPT_MSG_SEND)) {
1682                 wr.wr_id = rs_send_wr_id(msg);
1683                 wr.sg_list = sgl;
1684                 wr.num_sge = nsge;
1685                 wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1686                 wr.send_flags = flags;
1687                 wr.imm_data = htonl(msg);
1688                 wr.wr.rdma.remote_addr = addr;
1689                 wr.wr.rdma.rkey = rkey;
1690
1691                 return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1692         } else {
1693                 ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey);
1694                 if (!ret) {
1695                         wr.wr_id = rs_send_wr_id(rs_msg_set(rs_msg_op(msg), 0)) |
1696                                    RS_WR_ID_FLAG_MSG_SEND;
1697                         sge.addr = (uintptr_t) &msg;
1698                         sge.lkey = 0;
1699                         sge.length = sizeof msg;
1700                         wr.sg_list = &sge;
1701                         wr.num_sge = 1;
1702                         wr.opcode = IBV_WR_SEND;
1703                         wr.send_flags = IBV_SEND_INLINE;
1704
1705                         ret = rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1706                 }
1707                 return ret;
1708         }
1709 }
1710
1711 static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
1712                         uint32_t wr_data)
1713 {
1714         struct ibv_send_wr wr, *bad;
1715
1716         wr.wr_id = rs_send_wr_id(wr_data);
1717         wr.next = NULL;
1718         wr.sg_list = sge;
1719         wr.num_sge = 1;
1720         wr.opcode = IBV_WR_SEND;
1721         wr.send_flags = (sge->length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
1722         wr.wr.ud.ah = rs->conn_dest->ah;
1723         wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
1724         wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
1725
1726         return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
1727 }
1728
1729 /*
1730  * Consume and advance the target SGL entry before posting the data
1731  * transfer.  Otherwise the remote side may update the entry before we do.
1732  */
1733 static int rs_write_data(struct rsocket *rs,
1734                          struct ibv_sge *sgl, int nsge,
1735                          uint32_t length, int flags)
1736 {
1737         uint64_t addr;
1738         uint32_t rkey;
1739
1740         rs->sseq_no++;
1741         rs->sqe_avail--;
1742         if (rs->opts & RS_OPT_MSG_SEND)
1743                 rs->sqe_avail--;
1744         rs->sbuf_bytes_avail -= length;
1745
1746         addr = rs->target_sgl[rs->target_sge].addr;
1747         rkey = rs->target_sgl[rs->target_sge].key;
1748
1749         rs->target_sgl[rs->target_sge].addr += length;
1750         rs->target_sgl[rs->target_sge].length -= length;
1751
1752         if (!rs->target_sgl[rs->target_sge].length) {
1753                 if (++rs->target_sge == RS_SGL_SIZE)
1754                         rs->target_sge = 0;
1755         }
1756
1757         return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
1758                                  flags, addr, rkey);
1759 }
1760
1761 static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
1762                            struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
1763 {
1764         uint64_t addr;
1765
1766         rs->sqe_avail--;
1767         rs->sbuf_bytes_avail -= length;
1768
1769         addr = iom->sge.addr + offset - iom->offset;
1770         return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
1771                              flags, addr, iom->sge.key);
1772 }
1773
1774 static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
1775                           struct ibv_sge *sgl, int nsge, int flags)
1776 {
1777         uint64_t addr;
1778
1779         rs->sseq_no++;
1780         rs->sqe_avail--;
1781         if (rs->opts & RS_OPT_MSG_SEND)
1782                 rs->sqe_avail--;
1783         rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
1784
1785         addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
1786         return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
1787                                  flags, addr, rs->remote_iomap.key);
1788 }
1789
1790 static uint32_t rs_sbuf_left(struct rsocket *rs)
1791 {
1792         return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
1793                            rs->ssgl[0].addr);
1794 }
1795
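/*
 * Advertise receive buffer credits to the peer.  When at least half of
 * the receive buffer is free, an rs_sge describing that space is written
 * into the peer's target SGL along with the updated sequence number;
 * otherwise only the sequence number update is sent.
 */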
1796 static void rs_send_credits(struct rsocket *rs)
1797 {
1798         struct ibv_sge ibsge;
1799         struct rs_sge sge, *sge_buf;
1800         int flags;
1801
1802         rs->ctrl_seqno++;
1803         rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
1804         if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
1805                 if (rs->opts & RS_OPT_MSG_SEND)
1806                         rs->ctrl_seqno++;
1807
1808                 if (!(rs->opts & RS_OPT_SWAP_SGL)) {
1809                         sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
1810                         sge.key = rs->rmr->rkey;
1811                         sge.length = rs->rbuf_size >> 1;
1812                 } else {
1813                         sge.addr = bswap_64((uintptr_t) &rs->rbuf[rs->rbuf_free_offset]);
1814                         sge.key = bswap_32(rs->rmr->rkey);
1815                         sge.length = bswap_32(rs->rbuf_size >> 1);
1816                 }
1817
1818                 if (rs->sq_inline < sizeof sge) {
1819                         sge_buf = rs_get_ctrl_buf(rs);
1820                         memcpy(sge_buf, &sge, sizeof sge);
1821                         ibsge.addr = (uintptr_t) sge_buf;
1822                         ibsge.lkey = rs->smr->lkey;
1823                         flags = 0;
1824                 } else {
1825                         ibsge.addr = (uintptr_t) &sge;
1826                         ibsge.lkey = 0;
1827                         flags = IBV_SEND_INLINE;
1828                 }
1829                 ibsge.length = sizeof(sge);
1830
1831                 rs_post_write_msg(rs, &ibsge, 1,
1832                         rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), flags,
1833                         rs->remote_sgl.addr + rs->remote_sge * sizeof(struct rs_sge),
1834                         rs->remote_sgl.key);
1835
1836                 rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
1837                 rs->rbuf_free_offset += rs->rbuf_size >> 1;
1838                 if (rs->rbuf_free_offset >= rs->rbuf_size)
1839                         rs->rbuf_free_offset = 0;
1840                 if (++rs->remote_sge == rs->remote_sgl.length)
1841                         rs->remote_sge = 0;
1842         } else {
1843                 rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size));
1844         }
1845 }
1846
1847 static inline int rs_ctrl_avail(struct rsocket *rs)
1848 {
1849         return rs->ctrl_seqno != rs->ctrl_max_seqno;
1850 }
1851
1852 /* Protocols that do not support RDMA write with immediate may require 2 msgs */
1853 static inline int rs_2ctrl_avail(struct rsocket *rs)
1854 {
1855         return (int)((rs->ctrl_seqno + 1) - rs->ctrl_max_seqno) < 0;
1856 }
1857
1858 static int rs_give_credits(struct rsocket *rs)
1859 {
1860         if (!(rs->opts & RS_OPT_MSG_SEND)) {
1861                 return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
1862                         ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
1863                        rs_ctrl_avail(rs) && (rs->state & rs_connected);
1864         } else {
1865                 return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
1866                         ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
1867                        rs_2ctrl_avail(rs) && (rs->state & rs_connected);
1868         }
1869 }
1870
1871 static void rs_update_credits(struct rsocket *rs)
1872 {
1873         if (rs_give_credits(rs))
1874                 rs_send_credits(rs);
1875 }
1876
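/*
 * Drain the completion queue.  Receive completions carry credit updates
 * (RS_OP_SGL), control messages, or data arrival notices that are queued
 * to the rmsg ring; send completions return send queue entries and send
 * buffer space.  Consumed receives are reposted while connected.
 */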
1877 static int rs_poll_cq(struct rsocket *rs)
1878 {
1879         struct ibv_wc wc;
1880         uint32_t msg;
1881         int ret, rcnt = 0;
1882
1883         while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
1884                 if (rs_wr_is_recv(wc.wr_id)) {
1885                         if (wc.status != IBV_WC_SUCCESS)
1886                                 continue;
1887                         rcnt++;
1888
1889                         if (wc.wc_flags & IBV_WC_WITH_IMM) {
1890                                 msg = ntohl(wc.imm_data);
1891                         } else {
1892                                 msg = ((uint32_t *) (rs->rbuf + rs->rbuf_size))
1893                                         [rs_wr_data(wc.wr_id)];
1894
1895                         }
1896                         switch (rs_msg_op(msg)) {
1897                         case RS_OP_SGL:
1898                                 rs->sseq_comp = (uint16_t) rs_msg_data(msg);
1899                                 break;
1900                         case RS_OP_IOMAP_SGL:
1901                                 /* The iomap was updated, that's nice to know. */
1902                                 break;
1903                         case RS_OP_CTRL:
1904                                 if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
1905                                         rs->state = rs_disconnected;
1906                                         return 0;
1907                                 } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
1908                                         if (rs->state & rs_writable) {
1909                                                 rs->state &= ~rs_readable;
1910                                         } else {
1911                                                 rs->state = rs_disconnected;
1912                                                 return 0;
1913                                         }
1914                                 }
1915                                 break;
1916                         case RS_OP_WRITE:
1917                                 /* We really shouldn't be here. */
1918                                 break;
1919                         default:
1920                                 rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
1921                                 rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
1922                                 if (++rs->rmsg_tail == rs->rq_size + 1)
1923                                         rs->rmsg_tail = 0;
1924                                 break;
1925                         }
1926                 } else {
1927                         switch  (rs_msg_op(rs_wr_data(wc.wr_id))) {
1928                         case RS_OP_SGL:
1929                                 rs->ctrl_max_seqno++;
1930                                 break;
1931                         case RS_OP_CTRL:
1932                                 rs->ctrl_max_seqno++;
1933                                 if (rs_msg_data(rs_wr_data(wc.wr_id)) == RS_CTRL_DISCONNECT)
1934                                         rs->state = rs_disconnected;
1935                                 break;
1936                         case RS_OP_IOMAP_SGL:
1937                                 rs->sqe_avail++;
1938                                 if (!rs_wr_is_msg_send(wc.wr_id))
1939                                         rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
1940                                 break;
1941                         default:
1942                                 rs->sqe_avail++;
1943                                 rs->sbuf_bytes_avail += rs_msg_data(rs_wr_data(wc.wr_id));
1944                                 break;
1945                         }
1946                         if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
1947                                 rs->state = rs_error;
1948                                 rs->err = EIO;
1949                         }
1950                 }
1951         }
1952
1953         if (rs->state & rs_connected) {
1954                 while (!ret && rcnt--)
1955                         ret = rs_post_recv(rs);
1956
1957                 if (ret) {
1958                         rs->state = rs_error;
1959                         rs->err = errno;
1960                 }
1961         }
1962         return ret;
1963 }
1964
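/*
 * Retrieve a completion channel event.  Acknowledgments are batched:
 * events are only acked once enough have accumulated to cover the send
 * and receive queues, avoiding the cost of acknowledging each event
 * individually.
 */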
1965 static int rs_get_cq_event(struct rsocket *rs)
1966 {
1967         struct ibv_cq *cq;
1968         void *context;
1969         int ret;
1970
1971         if (!rs->cq_armed)
1972                 return 0;
1973
1974         ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
1975         if (!ret) {
1976                 if (++rs->unack_cqe >= rs->sq_size + rs->rq_size) {
1977                         ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe);
1978                         rs->unack_cqe = 0;
1979                 }
1980                 rs->cq_armed = 0;
1981         } else if (errno != EAGAIN) {
1982                 rs->state = rs_error;
1983         }
1984
1985         return ret;
1986 }
1987
1988 /*
1989  * Although we serialize rsend and rrecv calls with respect to themselves,
1990  * both calls may run simultaneously and need to poll the CQ for completions.
1991  * We need to serialize access to the CQ, but rsend and rrecv need to
1992  * allow each other to make forward progress.
1993  *
1994  * For example, rsend may need to wait for credits from the remote side,
1995  * which could be stalled until the remote process calls rrecv.  This should
1996  * not block rrecv from receiving data from the remote side, however.
1997  *
1998  * We handle this by using two locks.  The cq_lock serializes polling
1999  * the CQ and processing completions.  The cq_wait_lock serializes
2000  * waiting on the CQ for new events.
2001  */
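/*
 * The blocking path in rs_process_cq() therefore hands off as follows:
 *
 *	acquire cq_lock; poll and process completions
 *	if we must block:
 *		acquire cq_wait_lock, release cq_lock
 *		wait for a completion event
 *		release cq_wait_lock, re-acquire cq_lock
 */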
2002 static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2003 {
2004         int ret;
2005
2006         fastlock_acquire(&rs->cq_lock);
2007         do {
2008                 rs_update_credits(rs);
2009                 ret = rs_poll_cq(rs);
2010                 if (test(rs)) {
2011                         ret = 0;
2012                         break;
2013                 } else if (ret) {
2014                         break;
2015                 } else if (nonblock) {
2016                         ret = ERR(EWOULDBLOCK);
2017                 } else if (!rs->cq_armed) {
2018                         ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
2019                         rs->cq_armed = 1;
2020                 } else {
2021                         rs_update_credits(rs);
2022                         fastlock_acquire(&rs->cq_wait_lock);
2023                         fastlock_release(&rs->cq_lock);
2024
2025                         ret = rs_get_cq_event(rs);
2026                         fastlock_release(&rs->cq_wait_lock);
2027                         fastlock_acquire(&rs->cq_lock);
2028                 }
2029         } while (!ret);
2030
2031         rs_update_credits(rs);
2032         fastlock_release(&rs->cq_lock);
2033         return ret;
2034 }
2035
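/*
 * Busy-poll the CQ for up to polling_time microseconds before falling
 * back to a blocking wait, trading some CPU time for lower latency.
 */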
2036 static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2037 {
2038         struct timeval s, e;
2039         uint32_t poll_time = 0;
2040         int ret;
2041
2042         do {
2043                 ret = rs_process_cq(rs, 1, test);
2044                 if (!ret || nonblock || errno != EWOULDBLOCK)
2045                         return ret;
2046
2047                 if (!poll_time)
2048                         gettimeofday(&s, NULL);
2049
2050                 gettimeofday(&e, NULL);
2051                 poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
2052                             (e.tv_usec - s.tv_usec) + 1;
2053         } while (poll_time <= polling_time);
2054
2055         ret = rs_process_cq(rs, 0, test);
2056         return ret;
2057 }
2058
2059 static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc)
2060 {
2061         struct ds_header *hdr;
2062
2063         hdr = (struct ds_header *) (qp->rbuf + rs_wr_data(wc->wr_id));
2064         return ((wc->byte_len >= sizeof(struct ibv_grh) + DS_IPV4_HDR_LEN) &&
2065                 ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
2066                  (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
2067 }
2068
2069 /*
2070  * Poll all CQs associated with a datagram rsocket.  We need to drop any
2071  * received messages that we do not have room to store.  To limit drops,
2072  * we only poll if we have room to store the receive or we need a send
2073  * buffer.  To ensure fairness, we poll the CQs in round-robin order, remembering
2074  * where we left off.
2075  */
2076 static void ds_poll_cqs(struct rsocket *rs)
2077 {
2078         struct ds_qp *qp;
2079         struct ds_smsg *smsg;
2080         struct ds_rmsg *rmsg;
2081         struct ibv_wc wc;
2082         int ret, cnt;
2083
2084         if (!(qp = rs->qp_list))
2085                 return;
2086
2087         do {
2088                 cnt = 0;
2089                 do {
2090                         ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
2091                         if (ret <= 0) {
2092                                 qp = ds_next_qp(qp);
2093                                 continue;
2094                         }
2095
2096                         if (rs_wr_is_recv(wc.wr_id)) {
2097                                 if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
2098                                     ds_valid_recv(qp, &wc)) {
2099                                         rs->rqe_avail--;
2100                                         rmsg = &rs->dmsg[rs->rmsg_tail];
2101                                         rmsg->qp = qp;
2102                                         rmsg->offset = rs_wr_data(wc.wr_id);
2103                                         rmsg->length = wc.byte_len - sizeof(struct ibv_grh);
2104                                         if (++rs->rmsg_tail == rs->rq_size + 1)
2105                                                 rs->rmsg_tail = 0;
2106                                 } else {
2107                                         ds_post_recv(rs, qp, rs_wr_data(wc.wr_id));
2108                                 }
2109                         } else {
2110                                 smsg = (struct ds_smsg *) (rs->sbuf + rs_wr_data(wc.wr_id));
2111                                 smsg->next = rs->smsg_free;
2112                                 rs->smsg_free = smsg;
2113                                 rs->sqe_avail++;
2114                         }
2115
2116                         qp = ds_next_qp(qp);
2117                         if (!rs->rqe_avail && rs->sqe_avail) {
2118                                 rs->qp_list = qp;
2119                                 return;
2120                         }
2121                         cnt++;
2122                 } while (qp != rs->qp_list);
2123         } while (cnt);
2124 }
2125
2126 static void ds_req_notify_cqs(struct rsocket *rs)
2127 {
2128         struct ds_qp *qp;
2129
2130         if (!(qp = rs->qp_list))
2131                 return;
2132
2133         do {
2134                 if (!qp->cq_armed) {
2135                         ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
2136                         qp->cq_armed = 1;
2137                 }
2138                 qp = ds_next_qp(qp);
2139         } while (qp != rs->qp_list);
2140 }
2141
2142 static int ds_get_cq_event(struct rsocket *rs)
2143 {
2144         struct epoll_event event;
2145         struct ds_qp *qp;
2146         struct ibv_cq *cq;
2147         void *context;
2148         int ret;
2149
2150         if (!rs->cq_armed)
2151                 return 0;
2152
2153         ret = epoll_wait(rs->epfd, &event, 1, -1);
2154         if (ret <= 0)
2155                 return ret;
2156
2157         qp = event.data.ptr;
2158         ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
2159         if (!ret) {
2160                 ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
2161                 qp->cq_armed = 0;
2162                 rs->cq_armed = 0;
2163         }
2164
2165         return ret;
2166 }
2167
2168 static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2169 {
2170         int ret = 0;
2171
2172         fastlock_acquire(&rs->cq_lock);
2173         do {
2174                 ds_poll_cqs(rs);
2175                 if (test(rs)) {
2176                         ret = 0;
2177                         break;
2178                 } else if (nonblock) {
2179                         ret = ERR(EWOULDBLOCK);
2180                 } else if (!rs->cq_armed) {
2181                         ds_req_notify_cqs(rs);
2182                         rs->cq_armed = 1;
2183                 } else {
2184                         fastlock_acquire(&rs->cq_wait_lock);
2185                         fastlock_release(&rs->cq_lock);
2186
2187                         ret = ds_get_cq_event(rs);
2188                         fastlock_release(&rs->cq_wait_lock);
2189                         fastlock_acquire(&rs->cq_lock);
2190                 }
2191         } while (!ret);
2192
2193         fastlock_release(&rs->cq_lock);
2194         return ret;
2195 }
2196
2197 static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2198 {
2199         struct timeval s, e;
2200         uint32_t poll_time = 0;
2201         int ret;
2202
2203         do {
2204                 ret = ds_process_cqs(rs, 1, test);
2205                 if (!ret || nonblock || errno != EWOULDBLOCK)
2206                         return ret;
2207
2208                 if (!poll_time)
2209                         gettimeofday(&s, NULL);
2210
2211                 gettimeofday(&e, NULL);
2212                 poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
2213                             (e.tv_usec - s.tv_usec) + 1;
2214         } while (poll_time <= polling_time);
2215
2216         ret = ds_process_cqs(rs, 0, test);
2217         return ret;
2218 }
2219
2220 static int rs_nonblocking(struct rsocket *rs, int flags)
2221 {
2222         return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
2223 }
2224
2225 static int rs_is_cq_armed(struct rsocket *rs)
2226 {
2227         return rs->cq_armed;
2228 }
2229
2230 static int rs_poll_all(struct rsocket *rs)
2231 {
2232         return 1;
2233 }
2234
2235 /*
2236  * We use hardware flow control to prevent overrunning the remote
2237  * receive queue.  However, data transfers still require space in
2238  * the remote rmsg queue, or we risk losing notification that data
2239  * has been transferred.
2240  *
2241  * Be careful with race conditions in the check below.  The target SGL
2242  * may be updated by a remote RDMA write.
2243  */
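/*
 * Concretely: sending requires a free send queue entry (two with
 * RS_OPT_MSG_SEND), at least RS_SNDLOWAT bytes of local send buffer,
 * room in the peer's rmsg queue (sseq_no has not reached sseq_comp),
 * and a target SGL entry with space remaining.
 */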
2244 static int rs_can_send(struct rsocket *rs)
2245 {
2246         if (!(rs->opts & RS_OPT_MSG_SEND)) {
2247                 return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
2248                        (rs->sseq_no != rs->sseq_comp) &&
2249                        (rs->target_sgl[rs->target_sge].length != 0);
2250         } else {
2251                 return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
2252                        (rs->sseq_no != rs->sseq_comp) &&
2253                        (rs->target_sgl[rs->target_sge].length != 0);
2254         }
2255 }
2256
2257 static int ds_can_send(struct rsocket *rs)
2258 {
2259         return rs->sqe_avail;
2260 }
2261
2262 static int ds_all_sends_done(struct rsocket *rs)
2263 {
2264         return rs->sqe_avail == rs->sq_size;
2265 }
2266
2267 static int rs_conn_can_send(struct rsocket *rs)
2268 {
2269         return rs_can_send(rs) || !(rs->state & rs_writable);
2270 }
2271
2272 static int rs_conn_can_send_ctrl(struct rsocket *rs)
2273 {
2274         return rs_ctrl_avail(rs) || !(rs->state & rs_connected);
2275 }
2276
2277 static int rs_have_rdata(struct rsocket *rs)
2278 {
2279         return (rs->rmsg_head != rs->rmsg_tail);
2280 }
2281
2282 static int rs_conn_have_rdata(struct rsocket *rs)
2283 {
2284         return rs_have_rdata(rs) || !(rs->state & rs_readable);
2285 }
2286
2287 static int rs_conn_all_sends_done(struct rsocket *rs)
2288 {
2289         return ((((int) rs->ctrl_max_seqno) - ((int) rs->ctrl_seqno)) +
2290                 rs->sqe_avail == rs->sq_size) ||
2291                !(rs->state & rs_connected);
2292 }
2293
2294 static void ds_set_src(struct sockaddr *addr, socklen_t *addrlen,
2295                        struct ds_header *hdr)
2296 {
2297         union socket_addr sa;
2298
2299         memset(&sa, 0, sizeof sa);
2300         if (hdr->version == 4) {
2301                 if (*addrlen > sizeof(sa.sin))
2302                         *addrlen = sizeof(sa.sin);
2303
2304                 sa.sin.sin_family = AF_INET;
2305                 sa.sin.sin_port = hdr->port;
2306                 sa.sin.sin_addr.s_addr =  hdr->addr.ipv4;
2307         } else {
2308                 if (*addrlen > sizeof(sa.sin6))
2309                         *addrlen = sizeof(sa.sin6);
2310
2311                 sa.sin6.sin6_family = AF_INET6;
2312                 sa.sin6.sin6_port = hdr->port;
2313                 sa.sin6.sin6_flowinfo = hdr->addr.ipv6.flowinfo;
2314                 memcpy(&sa.sin6.sin6_addr, &hdr->addr.ipv6.addr, 16);
2315         }
2316         memcpy(addr, &sa, *addrlen);
2317 }
2318
2319 static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
2320                            struct sockaddr *src_addr, socklen_t *addrlen)
2321 {
2322         struct ds_rmsg *rmsg;
2323         struct ds_header *hdr;
2324         int ret;
2325
2326         if (!(rs->state & rs_readable))
2327                 return ERR(EINVAL);
2328
2329         if (!rs_have_rdata(rs)) {
2330                 ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
2331                                   rs_have_rdata);
2332                 if (ret)
2333                         return ret;
2334         }
2335
2336         rmsg = &rs->dmsg[rs->rmsg_head];
2337         hdr = (struct ds_header *) (rmsg->qp->rbuf + rmsg->offset);
2338         if (len > rmsg->length - hdr->length)
2339                 len = rmsg->length - hdr->length;
2340
2341         memcpy(buf, (void *) hdr + hdr->length, len);
2342         if (addrlen)
2343                 ds_set_src(src_addr, addrlen, hdr);
2344
2345         if (!(flags & MSG_PEEK)) {
2346                 ds_post_recv(rs, rmsg->qp, rmsg->offset);
2347                 if (++rs->rmsg_head == rs->rq_size + 1)
2348                         rs->rmsg_head = 0;
2349                 rs->rqe_avail++;
2350         }
2351
2352         return len;
2353 }
2354
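/*
 * Copy queued receive data without consuming it.  Local copies of the
 * rmsg head and receive buffer offset are used so that MSG_PEEK does
 * not disturb the socket state.
 */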
2355 static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
2356 {
2357         size_t left = len;
2358         uint32_t end_size, rsize;
2359         int rmsg_head, rbuf_offset;
2360
2361         rmsg_head = rs->rmsg_head;
2362         rbuf_offset = rs->rbuf_offset;
2363
2364         for (; left && (rmsg_head != rs->rmsg_tail); left -= rsize) {
2365                 if (left < rs->rmsg[rmsg_head].data) {
2366                         rsize = left;
2367                 } else {
2368                         rsize = rs->rmsg[rmsg_head].data;
2369                         if (++rmsg_head == rs->rq_size + 1)
2370                                 rmsg_head = 0;
2371                 }
2372
2373                 end_size = rs->rbuf_size - rbuf_offset;
2374                 if (rsize > end_size) {
2375                         memcpy(buf, &rs->rbuf[rbuf_offset], end_size);
2376                         rbuf_offset = 0;
2377                         buf += end_size;
2378                         rsize -= end_size;
2379                         left -= end_size;
2380                 }
2381                 memcpy(buf, &rs->rbuf[rbuf_offset], rsize);
2382                 rbuf_offset += rsize;
2383                 buf += rsize;
2384         }
2385
2386         return len - left;
2387 }
2388
2389 /*
2390  * Continue to receive any queued data even if the remote side has disconnected.
2391  */
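/*
 * MSG_PEEK requests are satisfied through rs_peek() above without
 * consuming data; MSG_WAITALL keeps waiting for additional completions
 * until the request is filled or the socket is no longer readable.
 */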
2392 ssize_t rrecv(int socket, void *buf, size_t len, int flags)
2393 {
2394         struct rsocket *rs;
2395         size_t left = len;
2396         uint32_t end_size, rsize;
2397         int ret;
2398
2399         rs = idm_at(&idm, socket);
2400         if (rs->type == SOCK_DGRAM) {
2401                 fastlock_acquire(&rs->rlock);
2402                 ret = ds_recvfrom(rs, buf, len, flags, NULL, 0);
2403                 fastlock_release(&rs->rlock);
2404                 return ret;
2405         }
2406
2407         if (rs->state & rs_opening) {
2408                 ret = rs_do_connect(rs);
2409                 if (ret) {
2410                         if (errno == EINPROGRESS)
2411                                 errno = EAGAIN;
2412                         return ret;
2413                 }
2414         }
2415         fastlock_acquire(&rs->rlock);
2416         do {
2417                 if (!rs_have_rdata(rs)) {
2418                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2419                                           rs_conn_have_rdata);
2420                         if (ret)
2421                                 break;
2422                 }
2423
2424                 ret = 0;
2425                 if (flags & MSG_PEEK) {
2426                         left = len - rs_peek(rs, buf, left);
2427                         break;
2428                 }
2429
2430                 for (; left && rs_have_rdata(rs); left -= rsize) {
2431                         if (left < rs->rmsg[rs->rmsg_head].data) {
2432                                 rsize = left;
2433                                 rs->rmsg[rs->rmsg_head].data -= left;
2434                         } else {
2435                                 rs->rseq_no++;
2436                                 rsize = rs->rmsg[rs->rmsg_head].data;
2437                                 if (++rs->rmsg_head == rs->rq_size + 1)
2438                                         rs->rmsg_head = 0;
2439                         }
2440
2441                         end_size = rs->rbuf_size - rs->rbuf_offset;
2442                         if (rsize > end_size) {
2443                                 memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size);
2444                                 rs->rbuf_offset = 0;
2445                                 buf += end_size;
2446                                 rsize -= end_size;
2447                                 left -= end_size;
2448                                 rs->rbuf_bytes_avail += end_size;
2449                         }
2450                         memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize);
2451                         rs->rbuf_offset += rsize;
2452                         buf += rsize;
2453                         rs->rbuf_bytes_avail += rsize;
2454                 }
2455
2456         } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable));
2457
2458         fastlock_release(&rs->rlock);
2459         return ret ? ret : len - left;
2460 }
2461
2462 ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
2463                   struct sockaddr *src_addr, socklen_t *addrlen)
2464 {
2465         struct rsocket *rs;
2466         int ret;
2467
2468         rs = idm_at(&idm, socket);
2469         if (rs->type == SOCK_DGRAM) {
2470                 fastlock_acquire(&rs->rlock);
2471                 ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
2472                 fastlock_release(&rs->rlock);
2473                 return ret;
2474         }
2475
2476         ret = rrecv(socket, buf, len, flags);
2477         if (ret > 0 && src_addr)
2478                 rgetpeername(socket, src_addr, addrlen);
2479
2480         return ret;
2481 }
2482
2483 /*
2484  * Simple, straightforward implementation for now that only tries to fill
2485  * in the first vector.
2486  */
2487 static ssize_t rrecvv(int socket, const struct iovec *iov, int iovcnt, int flags)
2488 {
2489         return rrecv(socket, iov[0].iov_base, iov[0].iov_len, flags);
2490 }
2491
2492 ssize_t rrecvmsg(int socket, struct msghdr *msg, int flags)
2493 {
2494         if (msg->msg_control && msg->msg_controllen)
2495                 return ERR(ENOTSUP);
2496
2497         return rrecvv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
2498 }
2499
2500 ssize_t rread(int socket, void *buf, size_t count)
2501 {
2502         return rrecv(socket, buf, count, 0);
2503 }
2504
2505 ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
2506 {
2507         return rrecvv(socket, iov, iovcnt, 0);
2508 }
2509
2510 static int rs_send_iomaps(struct rsocket *rs, int flags)
2511 {
2512         struct rs_iomap_mr *iomr;
2513         struct ibv_sge sge;
2514         struct rs_iomap iom;
2515         int ret;
2516
2517         fastlock_acquire(&rs->map_lock);
2518         while (!dlist_empty(&rs->iomap_queue)) {
2519                 if (!rs_can_send(rs)) {
2520                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2521                                           rs_conn_can_send);
2522                         if (ret)
2523                                 break;
2524                         if (!(rs->state & rs_writable)) {
2525                                 ret = ERR(ECONNRESET);
2526                                 break;
2527                         }
2528                 }
2529
2530                 iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
2531                 if (!(rs->opts & RS_OPT_SWAP_SGL)) {
2532                         iom.offset = iomr->offset;
2533                         iom.sge.addr = (uintptr_t) iomr->mr->addr;
2534                         iom.sge.length = iomr->mr->length;
2535                         iom.sge.key = iomr->mr->rkey;
2536                 } else {
2537                         iom.offset = bswap_64(iomr->offset);
2538                         iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
2539                         iom.sge.length = bswap_32(iomr->mr->length);
2540                         iom.sge.key = bswap_32(iomr->mr->rkey);
2541                 }
2542
2543                 if (rs->sq_inline >= sizeof iom) {
2544                         sge.addr = (uintptr_t) &iom;
2545                         sge.length = sizeof iom;
2546                         sge.lkey = 0;
2547                         ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
2548                 } else if (rs_sbuf_left(rs) >= sizeof iom) {
2549                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
2550                         rs->ssgl[0].length = sizeof iom;
2551                         ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
2552                         if (rs_sbuf_left(rs) > sizeof iom)
2553                                 rs->ssgl[0].addr += sizeof iom;
2554                         else
2555                                 rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2556                 } else {
2557                         rs->ssgl[0].length = rs_sbuf_left(rs);
2558                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
2559                                 rs->ssgl[0].length);
2560                         rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
2561                         memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
2562                                rs->ssgl[1].length);
2563                         ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
2564                         rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2565                 }
2566                 dlist_remove(&iomr->entry);
2567                 dlist_insert_tail(&iomr->entry, &rs->iomap_list);
2568                 if (ret)
2569                         break;
2570         }
2571
2572         rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
2573         fastlock_release(&rs->map_lock);
2574         return ret;
2575 }
2576
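/*
 * Send a datagram over the normal UDP socket with a ds_udp_header
 * prepended.  This path is used until an address handle has been
 * resolved for the destination (see dsend below).
 */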
2577 static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
2578                             int iovcnt, int flags, uint8_t op)
2579 {
2580         struct ds_udp_header hdr;
2581         struct msghdr msg;
2582         struct iovec miov[8];
2583         ssize_t ret;
2584
2585         if (iovcnt > 8)
2586                 return ERR(ENOTSUP);
2587
2588         hdr.tag = htonl(DS_UDP_TAG);
2589         hdr.version = rs->conn_dest->qp->hdr.version;
2590         hdr.op = op;
2591         hdr.reserved = 0;
2592         hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
2593         if (rs->conn_dest->qp->hdr.version == 4) {
2594                 hdr.length = DS_UDP_IPV4_HDR_LEN;
2595                 hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4;
2596         } else {
2597                 hdr.length = DS_UDP_IPV6_HDR_LEN;
2598                 memcpy(hdr.addr.ipv6, &rs->conn_dest->qp->hdr.addr.ipv6, 16);
2599         }
2600
2601         miov[0].iov_base = &hdr;
2602         miov[0].iov_len = hdr.length;
2603         if (iov && iovcnt)
2604                 memcpy(&miov[1], iov, sizeof *iov * iovcnt);
2605
2606         memset(&msg, 0, sizeof msg);
2607         msg.msg_name = &rs->conn_dest->addr;
2608         msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
2609         msg.msg_iov = miov;
2610         msg.msg_iovlen = iovcnt + 1;
2611         ret = sendmsg(rs->udp_sock, &msg, flags);
2612         return ret > 0 ? ret - hdr.length : ret;
2613 }
2614
2615 static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
2616                            int flags, uint8_t op)
2617 {
2618         struct iovec iov;
2619         if (buf && len) {
2620                 iov.iov_base = (void *) buf;
2621                 iov.iov_len = len;
2622                 return ds_sendv_udp(rs, &iov, 1, flags, op);
2623         } else {
2624                 return ds_sendv_udp(rs, NULL, 0, flags, op);
2625         }
2626 }
2627
2628 static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
2629 {
2630         struct ds_smsg *msg;
2631         struct ibv_sge sge;
2632         uint64_t offset;
2633         int ret = 0;
2634
2635         if (!rs->conn_dest->ah)
2636                 return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
2637
2638         if (!ds_can_send(rs)) {
2639                 ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
2640                 if (ret)
2641                         return ret;
2642         }
2643
2644         msg = rs->smsg_free;
2645         rs->smsg_free = msg->next;
2646         rs->sqe_avail--;
2647
2648         memcpy((void *) msg, &rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
2649         memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len);
2650         sge.addr = (uintptr_t) msg;
2651         sge.length = rs->conn_dest->qp->hdr.length + len;
2652         sge.lkey = rs->conn_dest->qp->smr->lkey;
2653         offset = (uint8_t *) msg - rs->sbuf;
2654
2655         ret = ds_post_send(rs, &sge, offset);
2656         return ret ? ret : len;
2657 }
2658
2659 /*
2660  * We overlap sending the data by posting a small work request immediately,
2661  * then increasing the size of the send on each iteration.
2662  */
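/*
 * Transfer sizes start at RS_OLAP_START_SIZE and double on each
 * iteration until capped at RS_MAX_TRANSFER, and are further limited by
 * the available send buffer space and the current target SGL entry.
 */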
2663 ssize_t rsend(int socket, const void *buf, size_t len, int flags)
2664 {
2665         struct rsocket *rs;
2666         struct ibv_sge sge;
2667         size_t left = len;
2668         uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
2669         int ret = 0;
2670
2671         rs = idm_at(&idm, socket);
2672         if (rs->type == SOCK_DGRAM) {
2673                 fastlock_acquire(&rs->slock);
2674                 ret = dsend(rs, buf, len, flags);
2675                 fastlock_release(&rs->slock);
2676                 return ret;
2677         }
2678
2679         if (rs->state & rs_opening) {
2680                 ret = rs_do_connect(rs);
2681                 if (ret) {
2682                         if (errno == EINPROGRESS)
2683                                 errno = EAGAIN;
2684                         return ret;
2685                 }
2686         }
2687
2688         fastlock_acquire(&rs->slock);
2689         if (rs->iomap_pending) {
2690                 ret = rs_send_iomaps(rs, flags);
2691                 if (ret)
2692                         goto out;
2693         }
2694         for (; left; left -= xfer_size, buf += xfer_size) {
2695                 if (!rs_can_send(rs)) {
2696                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2697                                           rs_conn_can_send);
2698                         if (ret)
2699                                 break;
2700                         if (!(rs->state & rs_writable)) {
2701                                 ret = ERR(ECONNRESET);
2702                                 break;
2703                         }
2704                 }
2705
2706                 if (olen < left) {
2707                         xfer_size = olen;
2708                         if (olen < RS_MAX_TRANSFER)
2709                                 olen <<= 1;
2710                 } else {
2711                         xfer_size = left;
2712                 }
2713
2714                 if (xfer_size > rs->sbuf_bytes_avail)
2715                         xfer_size = rs->sbuf_bytes_avail;
2716                 if (xfer_size > rs->target_sgl[rs->target_sge].length)
2717                         xfer_size = rs->target_sgl[rs->target_sge].length;
2718
2719                 if (xfer_size <= rs->sq_inline) {
2720                         sge.addr = (uintptr_t) buf;
2721                         sge.length = xfer_size;
2722                         sge.lkey = 0;
2723                         ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
2724                 } else if (xfer_size <= rs_sbuf_left(rs)) {
2725                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
2726                         rs->ssgl[0].length = xfer_size;
2727                         ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
2728                         if (xfer_size < rs_sbuf_left(rs))
2729                                 rs->ssgl[0].addr += xfer_size;
2730                         else
2731                                 rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2732                 } else {
2733                         rs->ssgl[0].length = rs_sbuf_left(rs);
2734                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
2735                                 rs->ssgl[0].length);
2736                         rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
2737                         memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
2738                         ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
2739                         rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2740                 }
2741                 if (ret)
2742                         break;
2743         }
2744 out:
2745         fastlock_release(&rs->slock);
2746
2747         return (ret && left == len) ? ret : len - left;
2748 }
2749
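/*
 * Illustrative datagram sketch (not a normative example), assuming the
 * rsocket() and rbind() entry points from <rdma/rsocket.h> and
 * caller-initialized sockaddr_in 'local' and 'peer' plus a data buffer:
 *
 *	int fd = rsocket(AF_INET, SOCK_DGRAM, 0);
 *
 *	rbind(fd, (struct sockaddr *) &local, sizeof local);
 *	rsendto(fd, buf, len, 0, (struct sockaddr *) &peer, sizeof peer);
 */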
2750 ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
2751                 const struct sockaddr *dest_addr, socklen_t addrlen)
2752 {
2753         struct rsocket *rs;
2754         int ret;
2755
2756         rs = idm_at(&idm, socket);
2757         if (rs->type == SOCK_STREAM) {
2758                 if (dest_addr || addrlen)
2759                         return ERR(EISCONN);
2760
2761                 return rsend(socket, buf, len, flags);
2762         }
2763
2764         if (rs->state == rs_init) {
2765                 ret = ds_init_ep(rs);
2766                 if (ret)
2767                         return ret;
2768         }
2769
2770         fastlock_acquire(&rs->slock);
2771         if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
2772                 ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
2773                 if (ret)
2774                         goto out;
2775         }
2776
2777         ret = dsend(rs, buf, len, flags);
2778 out:
2779         fastlock_release(&rs->slock);
2780         return ret;
2781 }
2782
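/*
 * Copy len bytes from an iovec array into dst, advancing the caller's
 * iovec pointer and intra-vector offset across element boundaries.
 */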
2783 static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
2784 {
2785         size_t size;
2786
2787         while (len) {
2788                 size = (*iov)->iov_len - *offset;
2789                 if (size > len) {
2790                         memcpy (dst, (*iov)->iov_base + *offset, len);
2791                         *offset += len;
2792                         break;
2793                 }
2794
2795                 memcpy(dst, (*iov)->iov_base + *offset, size);
2796                 len -= size;
2797                 dst += size;
2798                 (*iov)++;
2799                 *offset = 0;
2800         }
2801 }
2802
2803 static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags)
2804 {
2805         struct rsocket *rs;
2806         const struct iovec *cur_iov;
2807         size_t left, len, offset = 0;
2808         uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
2809         int i, ret = 0;
2810
2811         rs = idm_at(&idm, socket);
2812         if (rs->state & rs_opening) {
2813                 ret = rs_do_connect(rs);
2814                 if (ret) {
2815                         if (errno == EINPROGRESS)
2816                                 errno = EAGAIN;
2817                         return ret;
2818                 }
2819         }
2820
2821         cur_iov = iov;
2822         len = iov[0].iov_len;
2823         for (i = 1; i < iovcnt; i++)
2824                 len += iov[i].iov_len;
2825         left = len;
2826
2827         fastlock_acquire(&rs->slock);
2828         if (rs->iomap_pending) {
2829                 ret = rs_send_iomaps(rs, flags);
2830                 if (ret)
2831                         goto out;
2832         }
2833         for (; left; left -= xfer_size) {
2834                 if (!rs_can_send(rs)) {
2835                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2836                                           rs_conn_can_send);
2837                         if (ret)
2838                                 break;
2839                         if (!(rs->state & rs_writable)) {
2840                                 ret = ERR(ECONNRESET);
2841                                 break;
2842                         }
2843                 }
2844
2845                 if (olen < left) {
2846                         xfer_size = olen;
2847                         if (olen < RS_MAX_TRANSFER)
2848                                 olen <<= 1;
2849                 } else {
2850                         xfer_size = left;
2851                 }
2852
2853                 if (xfer_size > rs->sbuf_bytes_avail)
2854                         xfer_size = rs->sbuf_bytes_avail;
2855                 if (xfer_size > rs->target_sgl[rs->target_sge].length)
2856                         xfer_size = rs->target_sgl[rs->target_sge].length;
2857
2858                 if (xfer_size <= rs_sbuf_left(rs)) {
2859                         rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
2860                                     &cur_iov, &offset, xfer_size);
2861                         rs->ssgl[0].length = xfer_size;
2862                         ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
2863                                             xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
2864                         if (xfer_size < rs_sbuf_left(rs))
2865                                 rs->ssgl[0].addr += xfer_size;
2866                         else
2867                                 rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2868                 } else {
2869                         rs->ssgl[0].length = rs_sbuf_left(rs);
2870                         rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr, &cur_iov,
2871                                     &offset, rs->ssgl[0].length);
2872                         rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
2873                         rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
2874                         ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
2875                                             xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
2876                         rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2877                 }
2878                 if (ret)
2879                         break;
2880         }
2881 out:
2882         fastlock_release(&rs->slock);
2883
2884         return (ret && left == len) ? ret : len - left;
2885 }
2886
2887 ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
2888 {
2889         if (msg->msg_control && msg->msg_controllen)
2890                 return ERR(ENOTSUP);
2891
2892         return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
2893 }
2894
2895 ssize_t rwrite(int socket, const void *buf, size_t count)
2896 {
2897         return rsend(socket, buf, count, 0);
2898 }
2899
2900 ssize_t rwritev(int socket, const struct iovec *iov, int iovcnt)
2901 {
2902         return rsendv(socket, iov, iovcnt, 0);
2903 }
2904
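/*
 * Per-thread scratch pollfd array used by rpoll(), grown on demand and
 * reused across calls.
 */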
2905 static struct pollfd *rs_fds_alloc(nfds_t nfds)
2906 {
2907         static __thread struct pollfd *rfds;
2908         static __thread nfds_t rnfds;
2909
2910         if (nfds > rnfds) {
2911                 if (rfds)
2912                         free(rfds);
2913
2914                 rfds = malloc(sizeof *rfds * nfds);
2915                 rnfds = rfds ? nfds : 0;
2916         }
2917
2918         return rfds;
2919 }
2920
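/*
 * Report the poll events currently satisfied by a single rsocket,
 * processing its completion queue(s) as a side effect.  Listening
 * sockets are checked by polling the CM event channel, and connecting
 * sockets are advanced through rs_do_connect() before rechecking.
 */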
2921 static int rs_poll_rs(struct rsocket *rs, int events,
2922                       int nonblock, int (*test)(struct rsocket *rs))
2923 {
2924         struct pollfd fds;
2925         short revents;
2926         int ret;
2927
2928 check_cq:
2929         if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) ||
2930              (rs->state == rs_disconnected) || (rs->state & rs_error))) {
2931                 rs_process_cq(rs, nonblock, test);
2932
2933                 revents = 0;
2934                 if ((events & POLLIN) && rs_conn_have_rdata(rs))
2935                         revents |= POLLIN;
2936                 if ((events & POLLOUT) && rs_can_send(rs))
2937                         revents |= POLLOUT;
2938                 if (!(rs->state & rs_connected)) {
2939                         if (rs->state == rs_disconnected)
2940                                 revents |= POLLHUP;
2941                         else
2942                                 revents |= POLLERR;
2943                 }
2944
2945                 return revents;
2946         } else if (rs->type == SOCK_DGRAM) {
2947                 ds_process_cqs(rs, nonblock, test);
2948
2949                 revents = 0;
2950                 if ((events & POLLIN) && rs_have_rdata(rs))
2951                         revents |= POLLIN;
2952                 if ((events & POLLOUT) && ds_can_send(rs))
2953                         revents |= POLLOUT;
2954
2955                 return revents;
2956         }
2957
2958         if (rs->state == rs_listening) {
2959                 fds.fd = rs->cm_id->channel->fd;
2960                 fds.events = events;
2961                 fds.revents = 0;
2962                 poll(&fds, 1, 0);
2963                 return fds.revents;
2964         }
2965
2966         if (rs->state & rs_opening) {
2967                 ret = rs_do_connect(rs);
2968                 if (ret) {
2969                         if (errno == EINPROGRESS) {
2970                                 errno = 0;
2971                                 return 0;
2972                         } else {
2973                                 return POLLOUT;
2974                         }
2975                 }
2976                 goto check_cq;
2977         }
2978
2979         if (rs->state == rs_connect_error)
2980                 return (rs->err && events & POLLOUT) ? POLLOUT : 0;
2981
2982         return 0;
2983 }
2984
2985 static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
2986 {
2987         struct rsocket *rs;
2988         int i, cnt = 0;
2989
2990         for (i = 0; i < nfds; i++) {
2991                 rs = idm_lookup(&idm, fds[i].fd);
2992                 if (rs)
2993                         fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
2994                 else
2995                         poll(&fds[i], 1, 0);
2996
2997                 if (fds[i].revents)
2998                         cnt++;
2999         }
3000         return cnt;
3001 }
3002
3003 static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
3004 {
3005         struct rsocket *rs;
3006         int i;
3007
3008         for (i = 0; i < nfds; i++) {
3009                 rs = idm_lookup(&idm, fds[i].fd);
3010                 if (rs) {
3011                         fds[i].revents = rs_poll_rs(rs, fds[i].events, 0, rs_is_cq_armed);
3012                         if (fds[i].revents)
3013                                 return 1;
3014
3015                         if (rs->type == SOCK_STREAM) {
3016                                 if (rs->state >= rs_connected)
3017                                         rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
3018                                 else
3019                                         rfds[i].fd = rs->cm_id->channel->fd;
3020                         } else {
3021                                 rfds[i].fd = rs->epfd;
3022                         }
3023                         rfds[i].events = POLLIN;
3024                 } else {
3025                         rfds[i].fd = fds[i].fd;
3026                         rfds[i].events = fds[i].events;
3027                 }
3028                 rfds[i].revents = 0;
3029         }
3030         return 0;
3031 }
3032
3033 static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
3034 {
3035         struct rsocket *rs;
3036         int i, cnt = 0;
3037
3038         for (i = 0; i < nfds; i++) {
3039                 if (!rfds[i].revents)
3040                         continue;
3041
3042                 rs = idm_lookup(&idm, fds[i].fd);
3043                 if (rs) {
3044                         fastlock_acquire(&rs->cq_wait_lock);
3045                         if (rs->type == SOCK_STREAM)
3046                                 rs_get_cq_event(rs);
3047                         else
3048                                 ds_get_cq_event(rs);
3049                         fastlock_release(&rs->cq_wait_lock);
3050                         fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
3051                 } else {
3052                         fds[i].revents = rfds[i].revents;
3053                 }
3054                 if (fds[i].revents)
3055                         cnt++;
3056         }
3057         return cnt;
3058 }
3059
/*
 * We need to poll *all* fd's that the user specifies at least once.
 * Note that we may receive events on an rsocket that may not be reported
 * to the user (e.g. connection events or credit updates).  Process those
 * events, then return to polling until we find ones of interest.
 */
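/*
 * Hypothetical application sketch (not part of this file): rpoll() is a
 * drop-in replacement for poll() and may mix rsockets with regular fds,
 * e.g.
 *
 *	struct pollfd fds[2];
 *	fds[0].fd = rsock;		/* fd returned by rsocket() *​/
 *	fds[0].events = POLLIN;
 *	fds[1].fd = pipefd;		/* ordinary file descriptor *​/
 *	fds[1].events = POLLIN;
 *	if (rpoll(fds, 2, 1000) > 0 && (fds[0].revents & POLLIN))
 *		len = rrecv(rsock, buf, sizeof(buf), 0);
 */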
int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
{
	struct timeval s, e;
	struct pollfd *rfds;
	uint32_t poll_time = 0;
	int ret;

	do {
		ret = rs_poll_check(fds, nfds);
		if (ret || !timeout)
			return ret;

		if (!poll_time)
			gettimeofday(&s, NULL);

		gettimeofday(&e, NULL);
		poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
			    (e.tv_usec - s.tv_usec) + 1;
	} while (poll_time <= polling_time);

	rfds = rs_fds_alloc(nfds);
	if (!rfds)
		return ERR(ENOMEM);

	do {
		ret = rs_poll_arm(rfds, fds, nfds);
		if (ret)
			break;

		ret = poll(rfds, nfds, timeout);
		if (ret <= 0)
			break;

		ret = rs_poll_events(rfds, fds, nfds);
	} while (!ret);

	return ret;
}

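/*
 * Convert the fd_set arguments of rselect() into a pollfd array so the
 * request can be serviced by rpoll().  Entries are only kept when the fd
 * value is nonzero, so fd 0 is effectively ignored here.
 */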
static struct pollfd *
rs_select_to_poll(int *nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds)
{
	struct pollfd *fds;
	int fd, i = 0;

	fds = calloc(*nfds, sizeof *fds);
	if (!fds)
		return NULL;

	for (fd = 0; fd < *nfds; fd++) {
		if (readfds && FD_ISSET(fd, readfds)) {
			fds[i].fd = fd;
			fds[i].events = POLLIN;
		}

		if (writefds && FD_ISSET(fd, writefds)) {
			fds[i].fd = fd;
			fds[i].events |= POLLOUT;
		}

		if (exceptfds && FD_ISSET(fd, exceptfds))
			fds[i].fd = fd;

		if (fds[i].fd)
			i++;
	}

	*nfds = i;
	return fds;
}

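/*
 * Translate poll() results back into the caller's fd_sets.  POLLHUP is
 * reported as readable so a closed connection wakes up readers; any other
 * unexpected revents bits are mapped to the exception set.
 */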
static int
rs_poll_to_select(int nfds, struct pollfd *fds, fd_set *readfds,
		  fd_set *writefds, fd_set *exceptfds)
{
	int i, cnt = 0;

	for (i = 0; i < nfds; i++) {
		if (readfds && (fds[i].revents & (POLLIN | POLLHUP))) {
			FD_SET(fds[i].fd, readfds);
			cnt++;
		}

		if (writefds && (fds[i].revents & POLLOUT)) {
			FD_SET(fds[i].fd, writefds);
			cnt++;
		}

		if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) {
			FD_SET(fds[i].fd, exceptfds);
			cnt++;
		}
	}
	return cnt;
}

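/* Convert a select() timeval into the millisecond timeout used by poll(). */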
static int rs_convert_timeout(struct timeval *timeout)
{
	return !timeout ? -1 :
		timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
}

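/*
 * select() emulation: convert the fd_sets into a pollfd array, run
 * rpoll(), then rebuild the fd_sets from the poll results.
 */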
int rselect(int nfds, fd_set *readfds, fd_set *writefds,
	    fd_set *exceptfds, struct timeval *timeout)
{
	struct pollfd *fds;
	int ret;

	fds = rs_select_to_poll(&nfds, readfds, writefds, exceptfds);
	if (!fds)
		return ERR(ENOMEM);

	ret = rpoll(fds, nfds, rs_convert_timeout(timeout));

	if (readfds)
		FD_ZERO(readfds);
	if (writefds)
		FD_ZERO(writefds);
	if (exceptfds)
		FD_ZERO(exceptfds);

	if (ret > 0)
		ret = rs_poll_to_select(nfds, fds, readfds, writefds, exceptfds);

	free(fds);
	return ret;
}

/*
 * For graceful disconnect, notify the remote side that we're
 * disconnecting and wait until all outstanding sends complete, provided
 * that the remote side has not sent a disconnect message.
 */
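/*
 * Hypothetical application sketch (not part of this file): half-close the
 * send side and drain remaining data, mirroring shutdown() usage on a TCP
 * socket, e.g.
 *
 *	rshutdown(rsock, SHUT_WR);
 *	while ((len = rrecv(rsock, buf, sizeof(buf), 0)) > 0)
 *		consume(buf, len);	/* consume() is a placeholder *​/
 *	rclose(rsock);
 */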
int rshutdown(int socket, int how)
{
	struct rsocket *rs;
	int ctrl, ret = 0;

	rs = idm_lookup(&idm, socket);
	if (!rs)
		return ERR(EBADF);
	if (rs->opts & RS_OPT_SVC_ACTIVE)
		rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);

	if (rs->fd_flags & O_NONBLOCK)
		rs_set_nonblocking(rs, 0);

	if (rs->state & rs_connected) {
		if (how == SHUT_RDWR) {
			ctrl = RS_CTRL_DISCONNECT;
			rs->state &= ~(rs_readable | rs_writable);
		} else if (how == SHUT_WR) {
			rs->state &= ~rs_writable;
			ctrl = (rs->state & rs_readable) ?
				RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
		} else {
			rs->state &= ~rs_readable;
			if (rs->state & rs_writable)
				goto out;
			ctrl = RS_CTRL_DISCONNECT;
		}
		if (!rs_ctrl_avail(rs)) {
			ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
			if (ret)
				goto out;
		}

		if ((rs->state & rs_connected) && rs_ctrl_avail(rs)) {
			rs->ctrl_seqno++;
			ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl));
		}
	}

	if (rs->state & rs_connected)
		rs_process_cq(rs, 0, rs_conn_all_sends_done);

out:
	if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
		rs_set_nonblocking(rs, rs->fd_flags);

	if (rs->state & rs_disconnected) {
		/* Generate event by flushing receives to unblock rpoll */
		ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
		ucma_shutdown(rs->cm_id);
	}

	return ret;
}

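/*
 * Datagram teardown: remove the rsocket from the UDP service thread,
 * temporarily switch to blocking mode, and wait for all outstanding sends
 * to complete before the caller frees the resources.
 */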
static void ds_shutdown(struct rsocket *rs)
{
	if (rs->opts & RS_OPT_SVC_ACTIVE)
		rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM);

	if (rs->fd_flags & O_NONBLOCK)
		rs_set_nonblocking(rs, 0);

	rs->state &= ~(rs_readable | rs_writable);
	ds_process_cqs(rs, 0, ds_all_sends_done);

	if (rs->fd_flags & O_NONBLOCK)
		rs_set_nonblocking(rs, rs->fd_flags);
}

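/*
 * Close an rsocket: gracefully shut down connected stream sockets (or
 * detach them from the keepalive service), shut down datagram sockets,
 * then release all associated resources.
 */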
int rclose(int socket)
{
	struct rsocket *rs;

	rs = idm_lookup(&idm, socket);
	if (!rs)
		return EBADF;
	if (rs->type == SOCK_STREAM) {
		if (rs->state & rs_connected)
			rshutdown(socket, SHUT_RDWR);
		else if (rs->opts & RS_OPT_SVC_ACTIVE)
			rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
	} else {
		ds_shutdown(rs);
	}

	rs_free(rs);
	return 0;
}

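/*
 * Copy a sockaddr into the user's buffer, truncating to the supplied
 * length while reporting the full address size back through *len,
 * matching getsockname()/getpeername() semantics.
 */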
static void rs_copy_addr(struct sockaddr *dst, struct sockaddr *src, socklen_t *len)
{
	socklen_t size;

	if (src->sa_family == AF_INET) {
		size = min(*len, sizeof(struct sockaddr_in));
		*len = sizeof(struct sockaddr_in);
	} else {
		size = min(*len, sizeof(struct sockaddr_in6));
		*len = sizeof(struct sockaddr_in6);
	}
	memcpy(dst, src, size);
}

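/* Report the peer address: from the CM ID for streams, from the UDP socket otherwise. */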
int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
{
	struct rsocket *rs;

	rs = idm_lookup(&idm, socket);
	if (!rs)
		return ERR(EBADF);
	if (rs->type == SOCK_STREAM) {
		rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
		return 0;
	} else {
		return getpeername(rs->udp_sock, addr, addrlen);
	}
}

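/* Report the local address: from the CM ID for streams, from the UDP socket otherwise. */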
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
{
	struct rsocket *rs;

	rs = idm_lookup(&idm, socket);
	if (!rs)
		return ERR(EBADF);
	if (rs->type == SOCK_STREAM) {
		rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
		return 0;
	} else {
		return getsockname(rs->udp_sock, addr, addrlen);
	}
}

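/*
 * Enable or disable keepalive monitoring by registering the rsocket with
 * the TCP service thread.  If no keepalive time has been set, seed it
 * from the system default in /proc, falling back to 7200 seconds.
 */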
static int rs_set_keepalive(struct rsocket *rs, int on)
{
	FILE *f;
	int ret;

	if ((on && (rs->opts & RS_OPT_SVC_ACTIVE)) ||
	    (!on && !(rs->opts & RS_OPT_SVC_ACTIVE)))
		return 0;

	if (on) {
		if (!rs->keepalive_time) {
			if ((f = fopen("/proc/sys/net/ipv4/tcp_keepalive_time", "r"))) {
				(void) fscanf(f, "%u", &rs->keepalive_time);
				fclose(f);
			} else {
				rs->keepalive_time = 7200;
			}
		}
		ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_ADD_KEEPALIVE);
	} else {
		ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
	}

	return ret;
}

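/*
 * setsockopt() emulation.  Datagram sockets forward non-RDMA options to
 * the underlying UDP socket first; the remaining handling records options
 * locally so they can be applied when buffers and QPs are created and,
 * where possible, maps them onto RDMA CM settings.
 */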
int rsetsockopt(int socket, int level, int optname,
		const void *optval, socklen_t optlen)
{
	struct rsocket *rs;
	int ret, opt_on = 0;
	uint64_t *opts = NULL;

	ret = ERR(ENOTSUP);
	rs = idm_lookup(&idm, socket);
	if (!rs)
		return ERR(EBADF);
	if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
		ret = setsockopt(rs->udp_sock, level, optname, optval, optlen);
		if (ret)
			return ret;
	}

	switch (level) {
	case SOL_SOCKET:
		opts = &rs->so_opts;
		switch (optname) {
		case SO_REUSEADDR:
			if (rs->type == SOCK_STREAM) {
				ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
						      RDMA_OPTION_ID_REUSEADDR,
						      (void *) optval, optlen);
				if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
				    rs->cm_id->context &&
				    (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
					ret = 0;
			}
			opt_on = *(int *) optval;
			break;
		case SO_RCVBUF:
			if ((rs->type == SOCK_STREAM && !rs->rbuf) ||
			    (rs->type == SOCK_DGRAM && !rs->qp_list))
				rs->rbuf_size = (*(uint32_t *) optval) << 1;
			ret = 0;
			break;
		case SO_SNDBUF:
			if (!rs->sbuf)
				rs->sbuf_size = (*(uint32_t *) optval) << 1;
			if (rs->sbuf_size < RS_SNDLOWAT)
				rs->sbuf_size = RS_SNDLOWAT << 1;
			ret = 0;
			break;
		case SO_LINGER: