4833810a8c91a254299d9a8e04a4a4ebb7105cb5
[~shefty/librdmacm.git] / src / rsocket.c
1 /*
2  * Copyright (c) 2008-2013 Intel Corporation.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33
34 #if HAVE_CONFIG_H
35 #  include <config.h>
36 #endif /* HAVE_CONFIG_H */
37
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/time.h>
41 #include <stdarg.h>
42 #include <netdb.h>
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <stdio.h>
46 #include <stddef.h>
47 #include <string.h>
48 #include <netinet/in.h>
49 #include <netinet/tcp.h>
50 #include <sys/epoll.h>
51 #include <search.h>
52
53 #include <rdma/rdma_cma.h>
54 #include <rdma/rdma_verbs.h>
55 #include <rdma/rsocket.h>
56 #include "cma.h"
57 #include "indexer.h"
58
/* Compile-time limits and sizes used by the rsocket implementation. */
#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
#define RS_SNDLOWAT 2048        /* also the per-receive size of datagram QPs */
#define RS_QP_MIN_SIZE 16       /* lower clamp applied by rs_set_qp_size() */
#define RS_QP_MAX_SIZE 0xFFFE   /* upper clamp on send/receive queue sizes */
#define RS_QP_CTRL_SIZE 4       /* must be power of 2 */
#define RS_CONN_RETRIES 6
#define RS_SGL_SIZE 2           /* target SGL entries allocated by rs_init_bufs() */
/* Maps an rsocket index to its struct rsocket; updated while holding mut. */
static struct index_map idm;
/* Guards idm, the one-time configuration and service thread start/stop. */
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
69
struct rsocket;

/*
 * Command codes for the background service threads.
 * NOTE(review): presumably these are the values passed as cmd to
 * rs_notify_svc() -- confirm against the callers.
 */
enum {
        RS_SVC_NOOP          = 0,
        RS_SVC_ADD_DGRAM     = 1,
        RS_SVC_REM_DGRAM     = 2,
        RS_SVC_ADD_KEEPALIVE = 3,
        RS_SVC_REM_KEEPALIVE = 4,
        RS_SVC_MOD_KEEPALIVE = 5
};
80
/* Message exchanged with a service thread over its socketpair. */
struct rs_svc_msg {
        uint32_t cmd;           /* request code supplied to rs_notify_svc() */
        uint32_t status;        /* result; the requester feeds it to rdma_seterrno() */
        struct rsocket *rs;     /* rsocket the request refers to */
};

/* Bookkeeping for one background service thread. */
struct rs_svc {
        pthread_t id;           /* thread created by rs_notify_svc() */
        int sock[2];            /* socketpair; requests are written to sock[0] */
        int cnt;                /* rs_notify_svc() starts the thread while this is 0
                                 * and joins it when it is 0 after a request;
                                 * presumably the number of serviced rsockets */
        int size;
        int context_size;       /* presumably the size of one entry in contexts */
        void *(*run)(void *svc);        /* thread entry point, passed the rs_svc */
        struct rsocket **rss;
        void *contexts;
};
97
/*
 * UDP service instance.  Its context_size is the size of one struct pollfd
 * (the element type of udp_svc_fds) and its thread runs udp_svc_run().
 */
static struct pollfd *udp_svc_fds;
static void *udp_svc_run(void *arg);
static struct rs_svc udp_svc = {
        .context_size = sizeof(*udp_svc_fds),
        .run = udp_svc_run
};
/*
 * TCP service instance.  Its context_size is the size of one uint32_t
 * (the element type of tcp_svc_timeouts) and its thread runs tcp_svc_run().
 */
static uint32_t *tcp_svc_timeouts;
static void *tcp_svc_run(void *arg);
static struct rs_svc tcp_svc = {
        .context_size = sizeof(*tcp_svc_timeouts),
        .run = tcp_svc_run
};
110
/*
 * Built-in defaults.  rs_configure() overwrites each of them with the value
 * read from the matching file under RS_CONF_DIR when that file can be opened,
 * and rs_alloc() copies them into rsockets that do not inherit settings.
 */
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
static uint16_t def_rqsize = 384;
static uint32_t def_mem = (1 << 17);
static uint32_t def_wmem = (1 << 17);
static uint32_t polling_time = 10;
118
119 /*
120  * Immediate data format is determined by the upper bits
121  * bit 31: message type, 0 - data, 1 - control
122  * bit 30: buffers updated, 0 - target, 1 - direct-receive
123  * bit 29: more data, 0 - end of transfer, 1 - more data available
124  *
125  * for data transfers:
126  * bits [28:0]: bytes transferred
 * for control messages:
 * SGL, CTRL
 * bits [28:0]: receive credits granted
 * IOMAP_SGL
 * bits [28:16]: reserved, bits [15:0]: index
132  */
133
/*
 * Message opcodes.  rs_msg_set() stores the opcode in bits [31:29] of the
 * 32-bit immediate data; the low 29 bits carry the opcode-specific data.
 */
enum {
        RS_OP_DATA,
        RS_OP_RSVD_DATA_MORE,
        RS_OP_WRITE, /* opcode is not transmitted over the network */
        RS_OP_RSVD_DRA_MORE,
        RS_OP_SGL,
        RS_OP_RSVD,
        RS_OP_IOMAP_SGL,
        RS_OP_CTRL
};
/*
 * Pack and unpack immediate data.
 *
 * Every macro argument is parenthesized so that compound expressions
 * (e.g. "a | b") expand as a unit.  rs_msg_set() widens the opcode to
 * uint32_t before shifting: shifting an int opcode >= 4 left by 29 would
 * move a 1 into the sign bit, which is undefined behaviour.
 */
#define rs_msg_set(op, data)  ((((uint32_t) (op)) << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data)   ((imm_data) >> 29)
#define rs_msg_data(imm_data) ((imm_data) & 0x1FFFFFFF)
#define RS_MSG_SIZE           sizeof(uint32_t)
148
/*
 * Work request id layout: bit 63 marks a receive, bit 62 marks a message
 * send, and the low 32 bits carry caller data (see rs_wr_data()).
 * Macro arguments are parenthesized so that compound expressions such as
 * "id >> 32" or "a | b" are evaluated before the cast or mask is applied.
 */
#define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
#define RS_WR_ID_FLAG_MSG_SEND (((uint64_t) 1) << 62) /* See RS_OPT_MSG_SEND */
#define rs_send_wr_id(data) ((uint64_t) (data))
#define rs_recv_wr_id(data) (RS_WR_ID_FLAG_RECV | (uint64_t) (data))
#define rs_wr_is_recv(wr_id) ((wr_id) & RS_WR_ID_FLAG_RECV)
#define rs_wr_is_msg_send(wr_id) ((wr_id) & RS_WR_ID_FLAG_MSG_SEND)
#define rs_wr_data(wr_id) ((uint32_t) (wr_id))
156
/*
 * Control codes.
 * NOTE(review): presumably sent as the data of an RS_OP_CTRL message --
 * confirm against the send/receive paths.
 */
enum {
        RS_CTRL_DISCONNECT,
        RS_CTRL_KEEPALIVE,
        RS_CTRL_SHUTDOWN
};

/*
 * One entry of the rsocket's rmsg array (allocated in rs_init_bufs()).
 * NOTE(review): op/data presumably hold the rs_msg_op()/rs_msg_data()
 * halves of a received immediate value -- confirm.
 */
struct rs_msg {
        uint32_t op;
        uint32_t data;
};
167
struct ds_qp;

/*
 * Datagram receive message descriptor (element type of rsocket.dmsg).
 * NOTE(review): offset/length presumably locate the payload inside the
 * receive buffer of qp -- confirm against the receive path.
 */
struct ds_rmsg {
        struct ds_qp    *qp;
        uint32_t        offset;
        uint32_t        length;
};

/* Link for a singly linked list of send messages (see rsocket.smsg_free). */
struct ds_smsg {
        struct ds_smsg  *next;
};
179
/* Address, key and length describing one buffer. */
struct rs_sge {
        uint64_t addr;
        uint32_t key;
        uint32_t length;
};

/* An offset paired with the buffer description it refers to. */
struct rs_iomap {
        uint64_t offset;
        struct rs_sge sge;
};

/*
 * Reference-counted I/O mapping backed by a registered memory region.
 * rs_release_iomap_mr() drops a reference and tears the mapping down when
 * the count reaches zero.
 */
struct rs_iomap_mr {
        uint64_t offset;
        struct ibv_mr *mr;      /* deregistered on last release */
        dlist_entry entry;      /* link on an rsocket's iomap_list or iomap_queue */
        atomic_t refcnt;
        int index;      /* -1 if mapping is local and not in iomap_list */
};
198
/* Largest control message: one struct rs_sge. */
#define RS_MAX_CTRL_MSG    (sizeof(struct rs_sge))
/* True when host byte order is the same as network byte order. */
#define rs_host_is_net()   (1 == htonl(1))
/* Bits for rs_conn_data.flags. */
#define RS_CONN_FLAG_NET   (1 << 0)
#define RS_CONN_FLAG_IOMAP (1 << 1)

/*
 * Connection parameters.
 * NOTE(review): presumably exchanged as CM private data during connection
 * establishment (see rs_conn_private_data) -- confirm.
 */
struct rs_conn_data {
        uint8_t           version;
        uint8_t           flags;        /* RS_CONN_FLAG_* */
        uint16_t          credits;
        uint8_t           reserved[3];
        uint8_t           target_iomap_size;
        struct rs_sge     target_sgl;
        struct rs_sge     data_buf;
};

/*
 * Private data layout: either the connection data alone, or the connection
 * data preceded by an ib_connect_hdr (the af_ib variant).
 */
struct rs_conn_private_data {
        union {
                struct rs_conn_data             conn_data;
                struct {
                        struct ib_connect_hdr   ib_hdr;
                        struct rs_conn_data     conn_data;
                } af_ib;
        };
};
223
/*
 * rsocket states are ordered as passive, connecting, connected, disconnected.
 *
 * The values are bit flags: every resolving/connecting/accepting state
 * includes the rs_opening bit, and rs_connect_rdwr is the combination of
 * rs_connected, rs_readable and rs_writable.  Code such as
 * rs_set_nonblocking() also compares states numerically
 * (e.g. state < rs_connected).
 */
enum rs_state {
        rs_init,
        rs_bound           =                0x0001,
        rs_listening       =                0x0002,
        rs_opening         =                0x0004,
        rs_resolving_addr  = rs_opening |   0x0010,
        rs_resolving_route = rs_opening |   0x0020,
        rs_connecting      = rs_opening |   0x0040,
        rs_accepting       = rs_opening |   0x0080,
        rs_connected       =                0x0100,
        rs_writable        =                0x0200,
        rs_readable        =                0x0400,
        rs_connect_rdwr    = rs_connected | rs_readable | rs_writable,
        rs_connect_error   =                0x0800,
        rs_disconnected    =                0x1000,
        rs_error           =                0x2000,
};
244
/* Bits stored in rsocket.opts. */
#define RS_OPT_SWAP_SGL   (1 << 0)
/*
 * iWarp does not support RDMA write with immediate data.  For iWarp, we
 * transfer rsocket messages as inline sends.
 * rs_create_ep() sets this bit when the device transport is iWarp.
 */
#define RS_OPT_MSG_SEND   (1 << 1)
#define RS_OPT_SVC_ACTIVE (1 << 2)
252
/* A socket address viewed generically, as IPv4, or as IPv6. */
union socket_addr {
        struct sockaddr         sa;
        struct sockaddr_in      sin;
        struct sockaddr_in6     sin6;
};
258
/*
 * Datagram header holding a port and either an IPv4 address or an IPv6
 * flowinfo plus address.
 */
struct ds_header {
        uint8_t           version;
        uint8_t           length;
        uint16_t          port;
        union {
                uint32_t  ipv4;
                struct {
                        uint32_t flowinfo;
                        uint8_t  addr[16];
                } ipv6;
        } addr;
};

/* Header bytes for each family: 4 fixed bytes + 4 (IPv4) or + 20 (IPv6). */
#define DS_IPV4_HDR_LEN  8
#define DS_IPV6_HDR_LEN 24
274
/*
 * Datagram destination.  The address is the first member so that a pointer
 * to a ds_dest can be compared as a socket address by ds_compare_addr().
 */
struct ds_dest {
        union socket_addr addr; /* must be first */
        struct ds_qp      *qp;
        struct ibv_ah     *ah;
        uint32_t           qpn;
};

/* One queue pair of a datagram rsocket, linked on the rsocket's qp_list. */
struct ds_qp {
        dlist_entry       list;         /* link used by ds_insert_qp()/ds_remove_qp() */
        struct rsocket    *rs;          /* owning rsocket */
        struct rdma_cm_id *cm_id;
        struct ds_header  hdr;
        struct ds_dest    dest;         /* removed from rs->dest_map in ds_free_qp() */

        struct ibv_mr     *smr;         /* registration of rs->sbuf (ds_init_bufs) */
        struct ibv_mr     *rmr;         /* registration of rbuf (ds_init_bufs) */
        uint8_t           *rbuf;        /* rs->rbuf_size bytes followed by room for
                                         * one struct ibv_grh */

        int               cq_armed;
};
295
/*
 * Per-rsocket state.  The first anonymous union holds the members that are
 * specific to a stream rsocket or to a datagram rsocket; the members after
 * it are shared by both kinds.
 */
struct rsocket {
        int               type;         /* SOCK_STREAM or SOCK_DGRAM */
        int               index;        /* slot in idm; -1 until rs_insert() */
        fastlock_t        slock;
        fastlock_t        rlock;
        fastlock_t        cq_lock;
        fastlock_t        cq_wait_lock;
        fastlock_t        map_lock; /* acquire slock first if needed */

        union {
                /* data stream */
                struct {
                        struct rdma_cm_id *cm_id;
                        uint64_t          tcp_opts;
                        unsigned int      keepalive_time;

                        unsigned int      ctrl_seqno;
                        unsigned int      ctrl_max_seqno;
                        uint16_t          sseq_no;
                        uint16_t          sseq_comp;
                        uint16_t          rseq_no;
                        uint16_t          rseq_comp;

                        int               remote_sge;
                        struct rs_sge     remote_sgl;
                        struct rs_sge     remote_iomap;

                        /* target_buffer_list holds RS_SGL_SIZE rs_sge entries
                         * followed by target_iomap_size rs_iomap entries; it
                         * is registered as target_mr (see rs_init_bufs()) */
                        struct ibv_mr     *target_mr;
                        int               target_sge;
                        int               target_iomap_size;
                        void              *target_buffer_list;
                        volatile struct rs_sge    *target_sgl;
                        struct rs_iomap   *target_iomap;

                        int               rbuf_msg_index;
                        int               rbuf_bytes_avail;
                        int               rbuf_free_offset;
                        int               rbuf_offset;
                        struct ibv_mr     *rmr;
                        uint8_t           *rbuf;

                        int               sbuf_bytes_avail;
                        struct ibv_mr     *smr;
                        struct ibv_sge    ssgl[2];
                };
                /* datagram */
                struct {
                        struct ds_qp      *qp_list;     /* head of the QP list, or NULL */
                        void              *dest_map;    /* tree root used with tdelete()/tdestroy() */
                        struct ds_dest    *conn_dest;

                        int               udp_sock;     /* -1 when not open */
                        int               epfd;         /* -1 when not open */
                        int               rqe_avail;
                        struct ds_smsg    *smsg_free;
                };
        };

        int               opts;         /* RS_OPT_* bits */
        long              fd_flags;     /* checked for O_NONBLOCK in rs_create_cq() */
        uint64_t          so_opts;
        uint64_t          ipv6_opts;
        void              *optval;
        size_t            optlen;
        int               state;        /* enum rs_state value */
        int               cq_armed;
        int               retries;
        int               err;

        int               sqe_avail;
        uint32_t          sbuf_size;
        uint16_t          sq_size;
        uint16_t          sq_inline;

        uint32_t          rbuf_size;
        uint16_t          rq_size;
        int               rmsg_head;
        int               rmsg_tail;
        union {
                struct rs_msg     *rmsg;        /* stream: rq_size + 1 entries */
                struct ds_rmsg    *dmsg;        /* datagram */
        };

        uint8_t           *sbuf;
        struct rs_iomap_mr *remote_iomappings;
        dlist_entry       iomap_list;
        dlist_entry       iomap_queue;
        int               iomap_pending;
};
385
/* Value for ds_udp_header.tag. */
#define DS_UDP_TAG 0x55555555

/* Header carrying a QP number and an IPv4 or IPv6 address. */
struct ds_udp_header {
        uint32_t          tag;
        uint8_t           version;
        uint8_t           op;
        uint8_t           length;
        uint8_t           reserved;
        uint32_t          qpn;  /* lower 8-bits reserved */
        union {
                uint32_t ipv4;
                uint8_t  ipv6[16];
        } addr;
};

/* Header bytes for each family: 12 fixed bytes + 4 (IPv4) or + 16 (IPv6). */
#define DS_UDP_IPV4_HDR_LEN 16
#define DS_UDP_IPV6_HDR_LEN 28

/* The ds_qp that follows qp on its list. */
#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
405
406 static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
407 {
408         if (!rs->qp_list)
409                 dlist_init(&qp->list);
410         else
411                 dlist_insert_head(&qp->list, &rs->qp_list->list);
412         rs->qp_list = qp;
413 }
414
415 static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
416 {
417         if (qp->list.next != &qp->list) {
418                 rs->qp_list = ds_next_qp(qp);
419                 dlist_remove(&qp->list);
420         } else {
421                 rs->qp_list = NULL;
422         }
423 }
424
425 static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
426 {
427         struct rs_svc_msg msg;
428         int ret;
429
430         pthread_mutex_lock(&mut);
431         if (!svc->cnt) {
432                 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc->sock);
433                 if (ret)
434                         goto unlock;
435
436                 ret = pthread_create(&svc->id, NULL, svc->run, svc);
437                 if (ret) {
438                         ret = ERR(ret);
439                         goto closepair;
440                 }
441         }
442
443         msg.cmd = cmd;
444         msg.status = EINVAL;
445         msg.rs = rs;
446         write(svc->sock[0], &msg, sizeof msg);
447         read(svc->sock[0], &msg, sizeof msg);
448         ret = rdma_seterrno(msg.status);
449         if (svc->cnt)
450                 goto unlock;
451
452         pthread_join(svc->id, NULL);
453 closepair:
454         close(svc->sock[0]);
455         close(svc->sock[1]);
456 unlock:
457         pthread_mutex_unlock(&mut);
458         return ret;
459 }
460
/*
 * Comparison callback over two socket addresses.
 *
 * The full sockaddr_in6 is compared only when both addresses are AF_INET6;
 * in every other case the first sizeof(struct sockaddr_in) bytes are
 * compared.  Returns the memcmp() result (0 when equal).
 */
static int ds_compare_addr(const void *dst1, const void *dst2)
{
        const struct sockaddr *a = dst1;
        const struct sockaddr *b = dst2;
        size_t cmp_len = sizeof(struct sockaddr_in);

        if (a->sa_family == AF_INET6 && b->sa_family == AF_INET6)
                cmp_len = sizeof(struct sockaddr_in6);

        return memcmp(dst1, dst2, cmp_len);
}
473
/*
 * Encode value into a scaled form.  Values up to 2^(bits - 1) are returned
 * unchanged; larger values are returned as that top bit OR'ed with
 * (value >> bits).
 */
static int rs_value_to_scale(int value, int bits)
{
        const int top = 1 << (bits - 1);

        if (value <= top)
                return value;

        return top | (value >> bits);
}
479
/*
 * Decode a scaled value.  Inputs up to 2^(bits - 1) are returned unchanged;
 * for larger inputs the top bit is cleared and the remainder is shifted
 * left by bits.
 */
static int rs_scale_to_value(int value, int bits)
{
        const int top = 1 << (bits - 1);

        if (value <= top)
                return value;

        return (value & ~top) << bits;
}
485
486 void rs_configure(void)
487 {
488         FILE *f;
489         static int init;
490
491         if (init)
492                 return;
493
494         pthread_mutex_lock(&mut);
495         if (init)
496                 goto out;
497
498         if (ucma_init())
499                 goto out;
500         ucma_ib_init();
501
502         if ((f = fopen(RS_CONF_DIR "/polling_time", "r"))) {
503                 (void) fscanf(f, "%u", &polling_time);
504                 fclose(f);
505         }
506
507         if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) {
508                 (void) fscanf(f, "%hu", &def_inline);
509                 fclose(f);
510         }
511
512         if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) {
513                 (void) fscanf(f, "%hu", &def_sqsize);
514                 fclose(f);
515         }
516
517         if ((f = fopen(RS_CONF_DIR "/rqsize_default", "r"))) {
518                 (void) fscanf(f, "%hu", &def_rqsize);
519                 fclose(f);
520         }
521
522         if ((f = fopen(RS_CONF_DIR "/mem_default", "r"))) {
523                 (void) fscanf(f, "%u", &def_mem);
524                 fclose(f);
525
526                 if (def_mem < 1)
527                         def_mem = 1;
528         }
529
530         if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
531                 (void) fscanf(f, "%u", &def_wmem);
532                 fclose(f);
533                 if (def_wmem < RS_SNDLOWAT)
534                         def_wmem = RS_SNDLOWAT << 1;
535         }
536
537         if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
538                 (void) fscanf(f, "%hu", &def_iomap_size);
539                 fclose(f);
540
541                 /* round to supported values */
542                 def_iomap_size = (uint8_t) rs_value_to_scale(
543                         (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
544         }
545         init = 1;
546 out:
547         pthread_mutex_unlock(&mut);
548 }
549
550 static int rs_insert(struct rsocket *rs, int index)
551 {
552         pthread_mutex_lock(&mut);
553         rs->index = idm_set(&idm, index, rs);
554         pthread_mutex_unlock(&mut);
555         return rs->index;
556 }
557
/* Clear rs's slot (rs->index) in the index map while holding mut. */
static void rs_remove(struct rsocket *rs)
{
        pthread_mutex_lock(&mut);
        idm_clear(&idm, rs->index);
        pthread_mutex_unlock(&mut);
}
564
565 /* We only inherit from listening sockets */
566 static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
567 {
568         struct rsocket *rs;
569
570         rs = calloc(1, sizeof *rs);
571         if (!rs)
572                 return NULL;
573
574         rs->type = type;
575         rs->index = -1;
576         if (type == SOCK_DGRAM) {
577                 rs->udp_sock = -1;
578                 rs->epfd = -1;
579         }
580
581         if (inherited_rs) {
582                 rs->sbuf_size = inherited_rs->sbuf_size;
583                 rs->rbuf_size = inherited_rs->rbuf_size;
584                 rs->sq_inline = inherited_rs->sq_inline;
585                 rs->sq_size = inherited_rs->sq_size;
586                 rs->rq_size = inherited_rs->rq_size;
587                 if (type == SOCK_STREAM) {
588                         rs->ctrl_max_seqno = inherited_rs->ctrl_max_seqno;
589                         rs->target_iomap_size = inherited_rs->target_iomap_size;
590                 }
591         } else {
592                 rs->sbuf_size = def_wmem;
593                 rs->rbuf_size = def_mem;
594                 rs->sq_inline = def_inline;
595                 rs->sq_size = def_sqsize;
596                 rs->rq_size = def_rqsize;
597                 if (type == SOCK_STREAM) {
598                         rs->ctrl_max_seqno = RS_QP_CTRL_SIZE;
599                         rs->target_iomap_size = def_iomap_size;
600                 }
601         }
602         fastlock_init(&rs->slock);
603         fastlock_init(&rs->rlock);
604         fastlock_init(&rs->cq_lock);
605         fastlock_init(&rs->cq_wait_lock);
606         fastlock_init(&rs->map_lock);
607         dlist_init(&rs->iomap_list);
608         dlist_init(&rs->iomap_queue);
609         return rs;
610 }
611
612 static int rs_set_nonblocking(struct rsocket *rs, long arg)
613 {
614         struct ds_qp *qp;
615         int ret = 0;
616
617         if (rs->type == SOCK_STREAM) {
618                 if (rs->cm_id->recv_cq_channel)
619                         ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
620
621                 if (!ret && rs->state < rs_connected)
622                         ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
623         } else {
624                 ret = fcntl(rs->epfd, F_SETFL, arg);
625                 if (!ret && rs->qp_list) {
626                         qp = rs->qp_list;
627                         do {
628                                 ret = fcntl(qp->cm_id->recv_cq_channel->fd,
629                                             F_SETFL, arg);
630                                 qp = ds_next_qp(qp);
631                         } while (qp != rs->qp_list && !ret);
632                 }
633         }
634
635         return ret;
636 }
637
638 static void rs_set_qp_size(struct rsocket *rs)
639 {
640         uint16_t max_size;
641
642         max_size = min(ucma_max_qpsize(rs->cm_id), RS_QP_MAX_SIZE);
643
644         if (rs->sq_size > max_size)
645                 rs->sq_size = max_size;
646         else if (rs->sq_size < RS_QP_MIN_SIZE)
647                 rs->sq_size = RS_QP_MIN_SIZE;
648
649         if (rs->rq_size > max_size)
650                 rs->rq_size = max_size;
651         else if (rs->rq_size < RS_QP_MIN_SIZE)
652                 rs->rq_size = RS_QP_MIN_SIZE;
653 }
654
655 static void ds_set_qp_size(struct rsocket *rs)
656 {
657         uint16_t max_size;
658
659         max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
660
661         if (rs->sq_size > max_size)
662                 rs->sq_size = max_size;
663         if (rs->rq_size > max_size)
664                 rs->rq_size = max_size;
665
666         if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
667                 rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
668         else
669                 rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
670
671         if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT))
672                 rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
673         else
674                 rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
675 }
676
677 static int rs_init_bufs(struct rsocket *rs)
678 {
679         uint32_t total_rbuf_size, total_sbuf_size;
680         size_t len;
681
682         rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
683         if (!rs->rmsg)
684                 return ERR(ENOMEM);
685
686         total_sbuf_size = rs->sbuf_size;
687         if (rs->sq_inline < RS_MAX_CTRL_MSG)
688                 total_sbuf_size += RS_MAX_CTRL_MSG * RS_QP_CTRL_SIZE;
689         rs->sbuf = calloc(total_sbuf_size, 1);
690         if (!rs->sbuf)
691                 return ERR(ENOMEM);
692
693         rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, total_sbuf_size);
694         if (!rs->smr)
695                 return -1;
696
697         len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
698               sizeof(*rs->target_iomap) * rs->target_iomap_size;
699         rs->target_buffer_list = malloc(len);
700         if (!rs->target_buffer_list)
701                 return ERR(ENOMEM);
702
703         rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
704         if (!rs->target_mr)
705                 return -1;
706
707         memset(rs->target_buffer_list, 0, len);
708         rs->target_sgl = rs->target_buffer_list;
709         if (rs->target_iomap_size)
710                 rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
711
712         total_rbuf_size = rs->rbuf_size;
713         if (rs->opts & RS_OPT_MSG_SEND)
714                 total_rbuf_size += rs->rq_size * RS_MSG_SIZE;
715         rs->rbuf = calloc(total_rbuf_size, 1);
716         if (!rs->rbuf)
717                 return ERR(ENOMEM);
718
719         rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, total_rbuf_size);
720         if (!rs->rmr)
721                 return -1;
722
723         rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
724         rs->sbuf_bytes_avail = rs->sbuf_size;
725         rs->ssgl[0].lkey = rs->ssgl[1].lkey = rs->smr->lkey;
726
727         rs->rbuf_free_offset = rs->rbuf_size >> 1;
728         rs->rbuf_bytes_avail = rs->rbuf_size >> 1;
729         rs->sqe_avail = rs->sq_size - rs->ctrl_max_seqno;
730         rs->rseq_comp = rs->rq_size >> 1;
731         return 0;
732 }
733
734 static int ds_init_bufs(struct ds_qp *qp)
735 {
736         qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
737         if (!qp->rbuf)
738                 return ERR(ENOMEM);
739
740         qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
741         if (!qp->smr)
742                 return -1;
743
744         qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size +
745                                                      sizeof(struct ibv_grh));
746         if (!qp->rmr)
747                 return -1;
748
749         return 0;
750 }
751
752 /*
753  * If a user is waiting on a datagram rsocket through poll or select, then
754  * we need the first completion to generate an event on the related epoll fd
755  * in order to signal the user.  We arm the CQ on creation for this purpose
756  */
757 static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
758 {
759         cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
760         if (!cm_id->recv_cq_channel)
761                 return -1;
762
763         cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
764                                        cm_id, cm_id->recv_cq_channel, 0);
765         if (!cm_id->recv_cq)
766                 goto err1;
767
768         if (rs->fd_flags & O_NONBLOCK) {
769                 if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
770                         goto err2;
771         }
772
773         ibv_req_notify_cq(cm_id->recv_cq, 0);
774         cm_id->send_cq_channel = cm_id->recv_cq_channel;
775         cm_id->send_cq = cm_id->recv_cq;
776         return 0;
777
778 err2:
779         ibv_destroy_cq(cm_id->recv_cq);
780         cm_id->recv_cq = NULL;
781 err1:
782         ibv_destroy_comp_channel(cm_id->recv_cq_channel);
783         cm_id->recv_cq_channel = NULL;
784         return -1;
785 }
786
787 static inline int rs_post_recv(struct rsocket *rs)
788 {
789         struct ibv_recv_wr wr, *bad;
790         struct ibv_sge sge;
791
792         wr.next = NULL;
793         if (!(rs->opts & RS_OPT_MSG_SEND)) {
794                 wr.wr_id = rs_recv_wr_id(0);
795                 wr.sg_list = NULL;
796                 wr.num_sge = 0;
797         } else {
798                 wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
799                 sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
800                            (rs->rbuf_msg_index * RS_MSG_SIZE);
801                 sge.length = RS_MSG_SIZE;
802                 sge.lkey = rs->rmr->lkey;
803
804                 wr.sg_list = &sge;
805                 wr.num_sge = 1;
806                 if(++rs->rbuf_msg_index == rs->rq_size)
807                         rs->rbuf_msg_index = 0;
808         }
809
810         return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
811 }
812
813 static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset)
814 {
815         struct ibv_recv_wr wr, *bad;
816         struct ibv_sge sge[2];
817
818         sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size;
819         sge[0].length = sizeof(struct ibv_grh);
820         sge[0].lkey = qp->rmr->lkey;
821         sge[1].addr = (uintptr_t) qp->rbuf + offset;
822         sge[1].length = RS_SNDLOWAT;
823         sge[1].lkey = qp->rmr->lkey;
824
825         wr.wr_id = rs_recv_wr_id(offset);
826         wr.next = NULL;
827         wr.sg_list = sge;
828         wr.num_sge = 2;
829
830         return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
831 }
832
833 static int rs_create_ep(struct rsocket *rs)
834 {
835         struct ibv_qp_init_attr qp_attr;
836         int i, ret;
837
838         rs_set_qp_size(rs);
839         if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
840                 rs->opts |= RS_OPT_MSG_SEND;
841         ret = rs_init_bufs(rs);
842         if (ret)
843                 return ret;
844
845         ret = rs_create_cq(rs, rs->cm_id);
846         if (ret)
847                 return ret;
848
849         memset(&qp_attr, 0, sizeof qp_attr);
850         qp_attr.qp_context = rs;
851         qp_attr.send_cq = rs->cm_id->send_cq;
852         qp_attr.recv_cq = rs->cm_id->recv_cq;
853         qp_attr.qp_type = IBV_QPT_RC;
854         qp_attr.sq_sig_all = 1;
855         qp_attr.cap.max_send_wr = rs->sq_size;
856         qp_attr.cap.max_recv_wr = rs->rq_size;
857         qp_attr.cap.max_send_sge = 2;
858         qp_attr.cap.max_recv_sge = 1;
859         qp_attr.cap.max_inline_data = rs->sq_inline;
860
861         ret = rdma_create_qp(rs->cm_id, NULL, &qp_attr);
862         if (ret)
863                 return ret;
864
865         rs->sq_inline = qp_attr.cap.max_inline_data;
866         if ((rs->opts & RS_OPT_MSG_SEND) && (rs->sq_inline < RS_MSG_SIZE))
867                 return ERR(ENOTSUP);
868
869         for (i = 0; i < rs->rq_size; i++) {
870                 ret = rs_post_recv(rs);
871                 if (ret)
872                         return ret;
873         }
874         return 0;
875 }
876
877 static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
878 {
879         if (atomic_dec(&iomr->refcnt))
880                 return;
881
882         dlist_remove(&iomr->entry);
883         ibv_dereg_mr(iomr->mr);
884         if (iomr->index >= 0)
885                 iomr->mr = NULL;
886         else
887                 free(iomr);
888 }
889
890 static void rs_free_iomappings(struct rsocket *rs)
891 {
892         struct rs_iomap_mr *iomr;
893
894         while (!dlist_empty(&rs->iomap_list)) {
895                 iomr = container_of(rs->iomap_list.next,
896                                     struct rs_iomap_mr, entry);
897                 riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
898         }
899         while (!dlist_empty(&rs->iomap_queue)) {
900                 iomr = container_of(rs->iomap_queue.next,
901                                     struct rs_iomap_mr, entry);
902                 riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
903         }
904 }
905
906 static void ds_free_qp(struct ds_qp *qp)
907 {
908         if (qp->smr)
909                 rdma_dereg_mr(qp->smr);
910
911         if (qp->rbuf) {
912                 if (qp->rmr)
913                         rdma_dereg_mr(qp->rmr);
914                 free(qp->rbuf);
915         }
916
917         if (qp->cm_id) {
918                 if (qp->cm_id->qp) {
919                         tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
920                         epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
921                                   qp->cm_id->recv_cq_channel->fd, NULL);
922                         rdma_destroy_qp(qp->cm_id);
923                 }
924                 rdma_destroy_id(qp->cm_id);
925         }
926
927         free(qp);
928 }
929
930 static void ds_free(struct rsocket *rs)
931 {
932         struct ds_qp *qp;
933
934         if (rs->udp_sock >= 0)
935                 close(rs->udp_sock);
936
937         if (rs->index >= 0)
938                 rs_remove(rs);
939
940         if (rs->dmsg)
941                 free(rs->dmsg);
942
943         while ((qp = rs->qp_list)) {
944                 ds_remove_qp(rs, qp);
945                 ds_free_qp(qp);
946         }
947
948         if (rs->epfd >= 0)
949                 close(rs->epfd);
950
951         if (rs->sbuf)
952                 free(rs->sbuf);
953
954         tdestroy(rs->dest_map, free);
955         fastlock_destroy(&rs->map_lock);
956         fastlock_destroy(&rs->cq_wait_lock);
957         fastlock_destroy(&rs->cq_lock);
958         fastlock_destroy(&rs->rlock);
959         fastlock_destroy(&rs->slock);
960         free(rs);
961 }
962
963 static void rs_free(struct rsocket *rs)
964 {
965         if (rs->type == SOCK_DGRAM) {
966                 ds_free(rs);
967                 return;
968         }
969
970         if (rs->rmsg)
971                 free(rs->rmsg);
972
973         if (rs->sbuf) {
974                 if (rs->smr)
975                         rdma_dereg_mr(rs->smr);
976                 free(rs->sbuf);
977         }
978
979         if (rs->rbuf) {
980                 if (rs->rmr)
981                         rdma_dereg_mr(rs->rmr);
982                 free(rs->rbuf);
983         }
984
985         if (rs->target_buffer_list) {
986                 if (rs->target_mr)
987                         rdma_dereg_mr(rs->target_mr);
988                 free(rs->target_buffer_list);
989         }
990
991         if (rs->cm_id) {
992                 rs_free_iomappings(rs);
993                 if (rs->cm_id->qp)
994                         rdma_destroy_qp(rs->cm_id);
995                 rdma_destroy_id(rs->cm_id);
996         }
997
998         if (rs->index >= 0)
999                 rs_remove(rs);
1000
1001         fastlock_destroy(&rs->map_lock);
1002         fastlock_destroy(&rs->cq_wait_lock);
1003         fastlock_destroy(&rs->cq_lock);
1004         fastlock_destroy(&rs->rlock);
1005         fastlock_destroy(&rs->slock);
1006         free(rs);
1007 }
1008
1009 static size_t rs_conn_data_offset(struct rsocket *rs)
1010 {
1011         return (rs->cm_id->route.addr.src_addr.sa_family == AF_IB) ?
1012                 sizeof(struct ib_connect_hdr) : 0;
1013 }
1014
1015 static void rs_format_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
1016 {
1017         conn->version = 1;
1018         conn->flags = RS_CONN_FLAG_IOMAP |
1019                       (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
1020         conn->credits = htons(rs->rq_size);
1021         memset(conn->reserved, 0, sizeof conn->reserved);
1022         conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
1023
1024         conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
1025         conn->target_sgl.length = htonl(RS_SGL_SIZE);
1026         conn->target_sgl.key = htonl(rs->target_mr->rkey);
1027
1028         conn->data_buf.addr = htonll((uintptr_t) rs->rbuf);
1029         conn->data_buf.length = htonl(rs->rbuf_size >> 1);
1030         conn->data_buf.key = htonl(rs->rmr->rkey);
1031 }
1032
1033 static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
1034 {
1035         rs->remote_sgl.addr = ntohll(conn->target_sgl.addr);
1036         rs->remote_sgl.length = ntohl(conn->target_sgl.length);
1037         rs->remote_sgl.key = ntohl(conn->target_sgl.key);
1038         rs->remote_sge = 1;
1039         if ((rs_host_is_net() && !(conn->flags & RS_CONN_FLAG_NET)) ||
1040             (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
1041                 rs->opts = RS_OPT_SWAP_SGL;
1042
1043         if (conn->flags & RS_CONN_FLAG_IOMAP) {
1044                 rs->remote_iomap.addr = rs->remote_sgl.addr +
1045                                         sizeof(rs->remote_sgl) * rs->remote_sgl.length;
1046                 rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
1047                 rs->remote_iomap.key = rs->remote_sgl.key;
1048         }
1049
1050         rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
1051         rs->target_sgl[0].length = ntohl(conn->data_buf.length);
1052         rs->target_sgl[0].key = ntohl(conn->data_buf.key);
1053
1054         rs->sseq_comp = ntohs(conn->credits);
1055 }
1056
1057 static int ds_init(struct rsocket *rs, int domain)
1058 {
1059         rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
1060         if (rs->udp_sock < 0)
1061                 return rs->udp_sock;
1062
1063         rs->epfd = epoll_create(2);
1064         if (rs->epfd < 0)
1065                 return rs->epfd;
1066
1067         return 0;
1068 }
1069
1070 static int ds_init_ep(struct rsocket *rs)
1071 {
1072         struct ds_smsg *msg;
1073         int i, ret;
1074
1075         ds_set_qp_size(rs);
1076
1077         rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
1078         if (!rs->sbuf)
1079                 return ERR(ENOMEM);
1080
1081         rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
1082         if (!rs->dmsg)
1083                 return ERR(ENOMEM);
1084
1085         rs->sqe_avail = rs->sq_size;
1086         rs->rqe_avail = rs->rq_size;
1087
1088         rs->smsg_free = (struct ds_smsg *) rs->sbuf;
1089         msg = rs->smsg_free;
1090         for (i = 0; i < rs->sq_size - 1; i++) {
1091                 msg->next = (void *) msg + RS_SNDLOWAT;
1092                 msg = msg->next;
1093         }
1094         msg->next = NULL;
1095
1096         ret = rs_notify_svc(&udp_svc, rs, RS_SVC_ADD_DGRAM);
1097         if (ret)
1098                 return ret;
1099
1100         rs->state = rs_readable | rs_writable;
1101         return 0;
1102 }
1103
1104 int rsocket(int domain, int type, int protocol)
1105 {
1106         struct rsocket *rs;
1107         int index, ret;
1108
1109         if ((domain != AF_INET && domain != AF_INET6 && domain != AF_IB) ||
1110             ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
1111             (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
1112             (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
1113                 return ERR(ENOTSUP);
1114
1115         rs_configure();
1116         rs = rs_alloc(NULL, type);
1117         if (!rs)
1118                 return ERR(ENOMEM);
1119
1120         if (type == SOCK_STREAM) {
1121                 ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
1122                 if (ret)
1123                         goto err;
1124
1125                 rs->cm_id->route.addr.src_addr.sa_family = domain;
1126                 index = rs->cm_id->channel->fd;
1127         } else {
1128                 ret = ds_init(rs, domain);
1129                 if (ret)
1130                         goto err;
1131
1132                 index = rs->udp_sock;
1133         }
1134
1135         ret = rs_insert(rs, index);
1136         if (ret < 0)
1137                 goto err;
1138
1139         return rs->index;
1140
1141 err:
1142         rs_free(rs);
1143         return ret;
1144 }
1145
1146 int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
1147 {
1148         struct rsocket *rs;
1149         int ret;
1150
1151         rs = idm_lookup(&idm, socket);
1152         if (!rs)
1153                 return ERR(EBADF);
1154         if (rs->type == SOCK_STREAM) {
1155                 ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
1156                 if (!ret)
1157                         rs->state = rs_bound;
1158         } else {
1159                 if (rs->state == rs_init) {
1160                         ret = ds_init_ep(rs);
1161                         if (ret)
1162                                 return ret;
1163                 }
1164                 ret = bind(rs->udp_sock, addr, addrlen);
1165         }
1166         return ret;
1167 }
1168
1169 int rlisten(int socket, int backlog)
1170 {
1171         struct rsocket *rs;
1172         int ret;
1173
1174         rs = idm_lookup(&idm, socket);
1175         if (!rs)
1176                 return ERR(EBADF);
1177
1178         if (rs->state != rs_listening) {
1179                 ret = rdma_listen(rs->cm_id, backlog);
1180                 if (!ret)
1181                         rs->state = rs_listening;
1182         } else {
1183                 ret = 0;
1184         }
1185         return ret;
1186 }
1187
1188 /*
1189  * Nonblocking is usually not inherited between sockets, but we need to
1190  * inherit it here to establish the connection only.  This is needed to
1191  * prevent rdma_accept from blocking until the remote side finishes
1192  * establishing the connection.  If we were to allow rdma_accept to block,
1193  * then a single thread cannot establish a connection with itself, or
1194  * two threads which try to connect to each other can deadlock trying to
1195  * form a connection.
1196  *
1197  * Data transfers on the new socket remain blocking unless the user
1198  * specifies otherwise through rfcntl.
1199  */
1200 int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
1201 {
1202         struct rsocket *rs, *new_rs;
1203         struct rdma_conn_param param;
1204         struct rs_conn_data *creq, cresp;
1205         int ret;
1206
1207         rs = idm_lookup(&idm, socket);
1208         if (!rs)
1209                 return ERR(EBADF);
1210         new_rs = rs_alloc(rs, rs->type);
1211         if (!new_rs)
1212                 return ERR(ENOMEM);
1213
1214         ret = rdma_get_request(rs->cm_id, &new_rs->cm_id);
1215         if (ret)
1216                 goto err;
1217
1218         ret = rs_insert(new_rs, new_rs->cm_id->channel->fd);
1219         if (ret < 0)
1220                 goto err;
1221
1222         creq = (struct rs_conn_data *)
1223                (new_rs->cm_id->event->param.conn.private_data + rs_conn_data_offset(rs));
1224         if (creq->version != 1) {
1225                 ret = ERR(ENOTSUP);
1226                 goto err;
1227         }
1228
1229         if (rs->fd_flags & O_NONBLOCK)
1230                 fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
1231
1232         ret = rs_create_ep(new_rs);
1233         if (ret)
1234                 goto err;
1235
1236         rs_save_conn_data(new_rs, creq);
1237         param = new_rs->cm_id->event->param.conn;
1238         rs_format_conn_data(new_rs, &cresp);
1239         param.private_data = &cresp;
1240         param.private_data_len = sizeof cresp;
1241         ret = rdma_accept(new_rs->cm_id, &param);
1242         if (!ret)
1243                 new_rs->state = rs_connect_rdwr;
1244         else if (errno == EAGAIN || errno == EWOULDBLOCK)
1245                 new_rs->state = rs_accepting;
1246         else
1247                 goto err;
1248
1249         if (addr && addrlen)
1250                 rgetpeername(new_rs->index, addr, addrlen);
1251         return new_rs->index;
1252
1253 err:
1254         rs_free(new_rs);
1255         return ret;
1256 }
1257
/*
 * Advance the connection state machine of a stream rsocket, starting from
 * rs->state.  The active side runs address resolution, route resolution,
 * endpoint creation and rdma_connect in sequence; each step jumps straight
 * to the next one (via goto) when its call returns 0.  The rs_accepting
 * case finishes a connection started by raccept().
 *
 * When a call fails with EAGAIN/EWOULDBLOCK the matching "in progress" state
 * is stored so a later call resumes with ucma_complete(), and errno is
 * rewritten to EINPROGRESS before returning.  Any other failure moves the
 * socket to rs_connect_error and saves errno in rs->err.
 *
 * Returns 0 once the socket has reached rs_connect_rdwr, otherwise the
 * non-zero result of the failing step.
 */
static int rs_do_connect(struct rsocket *rs)
{
        struct rdma_conn_param param;
        struct rs_conn_private_data cdata;
        struct rs_conn_data *creq, *cresp;
        int to, ret;

        switch (rs->state) {
        case rs_init:
        case rs_bound:
resolve_addr:
                /* Timeout doubles with every retry. */
                to = 1000 << rs->retries++;
                ret = rdma_resolve_addr(rs->cm_id, NULL,
                                        &rs->cm_id->route.addr.dst_addr, to);
                if (!ret)
                        goto resolve_route;
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                        rs->state = rs_resolving_addr;
                break;
        case rs_resolving_addr:
                ret = ucma_complete(rs->cm_id);
                if (ret) {
                        /* Retry a timed-out resolution with a longer timeout. */
                        if (errno == ETIMEDOUT && rs->retries <= RS_CONN_RETRIES)
                                goto resolve_addr;
                        break;
                }

                rs->retries = 0;
resolve_route:
                to = 1000 << rs->retries++;
                if (rs->optval) {
                        /*
                         * Path data was stored in rs->optval: hand it to the
                         * CM instead of resolving a route.  The buffer is
                         * freed whether or not the call succeeds.
                         */
                        ret = rdma_set_option(rs->cm_id,  RDMA_OPTION_IB,
                                              RDMA_OPTION_IB_PATH, rs->optval,
                                              rs->optlen);
                        free(rs->optval);
                        rs->optval = NULL;
                        if (!ret) {
                                rs->state = rs_resolving_route;
                                goto resolving_route;
                        }
                } else {
                        ret = rdma_resolve_route(rs->cm_id, to);
                        if (!ret)
                                goto do_connect;
                }
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                        rs->state = rs_resolving_route;
                break;
        case rs_resolving_route:
resolving_route:
                ret = ucma_complete(rs->cm_id);
                if (ret) {
                        if (errno == ETIMEDOUT && rs->retries <= RS_CONN_RETRIES)
                                goto resolve_route;
                        break;
                }
do_connect:
                ret = rs_create_ep(rs);
                if (ret)
                        break;

                memset(&param, 0, sizeof param);
                /*
                 * The rs_conn_data is placed rs_conn_data_offset() bytes into
                 * cdata; private_data points back at the start of cdata and
                 * its length covers that leading offset as well.
                 */
                creq = (void *) &cdata + rs_conn_data_offset(rs);
                rs_format_conn_data(rs, creq);
                param.private_data = (void *) creq - rs_conn_data_offset(rs);
                param.private_data_len = sizeof(*creq) + rs_conn_data_offset(rs);
                param.flow_control = 1;
                param.retry_count = 7;
                param.rnr_retry_count = 7;
                /* work-around: iWarp issues RDMA read during connection */
                if (rs->opts & RS_OPT_MSG_SEND)
                        param.initiator_depth = 1;
                rs->retries = 0;

                ret = rdma_connect(rs->cm_id, &param);
                if (!ret)
                        goto connected;
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                        rs->state = rs_connecting;
                break;
        case rs_connecting:
                ret = ucma_complete(rs->cm_id);
                if (ret)
                        break;
connected:
                /* Only protocol version 1 responses are accepted. */
                cresp = (struct rs_conn_data *) rs->cm_id->event->param.conn.private_data;
                if (cresp->version != 1) {
                        ret = ERR(ENOTSUP);
                        break;
                }

                rs_save_conn_data(rs, cresp);
                rs->state = rs_connect_rdwr;
                break;
        case rs_accepting:
                /*
                 * raccept() may have set the channel fd non-blocking just for
                 * connection establishment; clear the fd flags again when the
                 * socket itself is not O_NONBLOCK.
                 */
                if (!(rs->fd_flags & O_NONBLOCK))
                        fcntl(rs->cm_id->channel->fd, F_SETFL, 0);

                ret = ucma_complete(rs->cm_id);
                if (ret)
                        break;

                rs->state = rs_connect_rdwr;
                break;
        default:
                /* Any state not handled above is rejected (and, through the
                 * error handling below, leaves the socket in
                 * rs_connect_error). */
                ret = ERR(EINVAL);
                break;
        }

        if (ret) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        /* Report "would block" the way connect(2) does. */
                        errno = EINPROGRESS;
                } else {
                        rs->state = rs_connect_error;
                        rs->err = errno;
                }
        }
        return ret;
}
1377
1378 static int rs_any_addr(const union socket_addr *addr)
1379 {
1380         if (addr->sa.sa_family == AF_INET) {
1381                 return (addr->sin.sin_addr.s_addr == INADDR_ANY ||
1382                         addr->sin.sin_addr.s_addr == INADDR_LOOPBACK);
1383         } else {
1384                 return (!memcmp(&addr->sin6.sin6_addr, &in6addr_any, 16) ||
1385                         !memcmp(&addr->sin6.sin6_addr, &in6addr_loopback, 16));
1386         }
1387 }
1388
1389 static int ds_get_src_addr(struct rsocket *rs,
1390                            const struct sockaddr *dest_addr, socklen_t dest_len,
1391                            union socket_addr *src_addr, socklen_t *src_len)
1392 {
1393         int sock, ret;
1394         uint16_t port;
1395
1396         *src_len = sizeof *src_addr;
1397         ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
1398         if (ret || !rs_any_addr(src_addr))
1399                 return ret;
1400
1401         port = src_addr->sin.sin_port;
1402         sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
1403         if (sock < 0)
1404                 return sock;
1405
1406         ret = connect(sock, dest_addr, dest_len);
1407         if (ret)
1408                 goto out;
1409
1410         *src_len = sizeof *src_addr;
1411         ret = getsockname(sock, &src_addr->sa, src_len);
1412         src_addr->sin.sin_port = port;
1413 out:
1414         close(sock);
1415         return ret;
1416 }
1417
1418 static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
1419 {
1420         if (addr->sa.sa_family == AF_INET) {
1421                 hdr->version = 4;
1422                 hdr->length = DS_IPV4_HDR_LEN;
1423                 hdr->port = addr->sin.sin_port;
1424                 hdr->addr.ipv4 = addr->sin.sin_addr.s_addr;
1425         } else {
1426                 hdr->version = 6;
1427                 hdr->length = DS_IPV6_HDR_LEN;
1428                 hdr->port = addr->sin6.sin6_port;
1429                 hdr->addr.ipv6.flowinfo= addr->sin6.sin6_flowinfo;
1430                 memcpy(&hdr->addr.ipv6.addr, &addr->sin6.sin6_addr, 16);
1431         }
1432 }
1433
1434 static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
1435                           socklen_t addrlen)
1436 {
1437         struct ibv_port_attr port_attr;
1438         struct ibv_ah_attr attr;
1439         int ret;
1440
1441         memcpy(&qp->dest.addr, addr, addrlen);
1442         qp->dest.qp = qp;
1443         qp->dest.qpn = qp->cm_id->qp->qp_num;
1444
1445         ret = ibv_query_port(qp->cm_id->verbs, qp->cm_id->port_num, &port_attr);
1446         if (ret)
1447                 return ret;
1448
1449         memset(&attr, 0, sizeof attr);
1450         attr.dlid = port_attr.lid;
1451         attr.port_num = qp->cm_id->port_num;
1452         qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
1453         if (!qp->dest.ah)
1454                 return ERR(ENOMEM);
1455
1456         tsearch(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
1457         return 0;
1458 }
1459
1460 static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
1461                         socklen_t addrlen, struct ds_qp **new_qp)
1462 {
1463         struct ds_qp *qp;
1464         struct ibv_qp_init_attr qp_attr;
1465         struct epoll_event event;
1466         int i, ret;
1467
1468         qp = calloc(1, sizeof(*qp));
1469         if (!qp)
1470                 return ERR(ENOMEM);
1471
1472         qp->rs = rs;
1473         ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
1474         if (ret)
1475                 goto err;
1476
1477         ds_format_hdr(&qp->hdr, src_addr);
1478         ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
1479         if (ret)
1480                 goto err;
1481
1482         ret = ds_init_bufs(qp);
1483         if (ret)
1484                 goto err;
1485
1486         ret = rs_create_cq(rs, qp->cm_id);
1487         if (ret)
1488                 goto err;
1489
1490         memset(&qp_attr, 0, sizeof qp_attr);
1491         qp_attr.qp_context = qp;
1492         qp_attr.send_cq = qp->cm_id->send_cq;
1493         qp_attr.recv_cq = qp->cm_id->recv_cq;
1494         qp_attr.qp_type = IBV_QPT_UD;
1495         qp_attr.sq_sig_all = 1;
1496         qp_attr.cap.max_send_wr = rs->sq_size;
1497         qp_attr.cap.max_recv_wr = rs->rq_size;
1498         qp_attr.cap.max_send_sge = 1;
1499         qp_attr.cap.max_recv_sge = 2;
1500         qp_attr.cap.max_inline_data = rs->sq_inline;
1501         ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
1502         if (ret)
1503                 goto err;
1504
1505         rs->sq_inline = qp_attr.cap.max_inline_data;
1506         ret = ds_add_qp_dest(qp, src_addr, addrlen);
1507         if (ret)
1508                 goto err;
1509
1510         event.events = EPOLLIN;
1511         event.data.ptr = qp;
1512         ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
1513                         qp->cm_id->recv_cq_channel->fd, &event);
1514         if (ret)
1515                 goto err;
1516
1517         for (i = 0; i < rs->rq_size; i++) {
1518                 ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT);
1519                 if (ret)
1520                         goto err;
1521         }
1522
1523         ds_insert_qp(rs, qp);
1524         *new_qp = qp;
1525         return 0;
1526 err:
1527         ds_free_qp(qp);
1528         return ret;
1529 }
1530
1531 static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
1532                      socklen_t addrlen, struct ds_qp **qp)
1533 {
1534         if (rs->qp_list) {
1535                 *qp = rs->qp_list;
1536                 do {
1537                         if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id),
1538                                              src_addr))
1539                                 return 0;
1540
1541                         *qp = ds_next_qp(*qp);
1542                 } while (*qp != rs->qp_list);
1543         }
1544
1545         return ds_create_qp(rs, src_addr, addrlen, qp);
1546 }
1547
1548 static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
1549                        socklen_t addrlen, struct ds_dest **dest)
1550 {
1551         union socket_addr src_addr;
1552         socklen_t src_len;
1553         struct ds_qp *qp;
1554         struct ds_dest **tdest, *new_dest;
1555         int ret = 0;
1556
1557         fastlock_acquire(&rs->map_lock);
1558         tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
1559         if (tdest)
1560                 goto found;
1561
1562         ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
1563         if (ret)
1564                 goto out;
1565
1566         ret = ds_get_qp(rs, &src_addr, src_len, &qp);
1567         if (ret)
1568                 goto out;
1569
1570         tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
1571         if (!tdest) {
1572                 new_dest = calloc(1, sizeof(*new_dest));
1573                 if (!new_dest) {
1574                         ret = ERR(ENOMEM);
1575                         goto out;
1576                 }
1577
1578                 memcpy(&new_dest->addr, addr, addrlen);
1579                 new_dest->qp = qp;
1580                 tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
1581         }
1582
1583 found:
1584         *dest = *tdest;
1585 out:
1586         fastlock_release(&rs->map_lock);
1587         return ret;
1588 }
1589
1590 int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
1591 {
1592         struct rsocket *rs;
1593         int ret;
1594
1595         rs = idm_lookup(&idm, socket);
1596         if (!rs)
1597                 return ERR(EBADF);
1598         if (rs->type == SOCK_STREAM) {
1599                 memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
1600                 ret = rs_do_connect(rs);
1601         } else {
1602                 if (rs->state == rs_init) {
1603                         ret = ds_init_ep(rs);
1604                         if (ret)
1605                                 return ret;
1606                 }
1607
1608                 fastlock_acquire(&rs->slock);
1609                 ret = connect(rs->udp_sock, addr, addrlen);
1610                 if (!ret)
1611                         ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
1612                 fastlock_release(&rs->slock);
1613         }
1614         return ret;
1615 }
1616
1617 static void *rs_get_ctrl_buf(struct rsocket *rs)
1618 {
1619         return rs->sbuf + rs->sbuf_size +
1620                 RS_MAX_CTRL_MSG * (rs->ctrl_seqno & (RS_QP_CTRL_SIZE - 1));
1621 }
1622
1623 static int rs_post_msg(struct rsocket *rs, uint32_t msg)
1624 {
1625         struct ibv_send_wr wr, *bad;
1626         struct ibv_sge sge;
1627
1628         wr.wr_id = rs_send_wr_id(msg);
1629         wr.next = NULL;
1630         if (!(rs->opts & RS_OPT_MSG_SEND)) {
1631                 wr.sg_list = NULL;
1632                 wr.num_sge = 0;
1633                 wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1634                 wr.send_flags = 0;
1635                 wr.imm_data = htonl(msg);
1636         } else {
1637                 sge.addr = (uintptr_t) &msg;
1638                 sge.lkey = 0;
1639                 sge.length = sizeof msg;
1640                 wr.sg_list = &sge;
1641                 wr.num_sge = 1;
1642                 wr.opcode = IBV_WR_SEND;
1643                 wr.send_flags = IBV_SEND_INLINE;
1644         }
1645
1646         return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1647 }
1648
1649 static int rs_post_write(struct rsocket *rs,
1650                          struct ibv_sge *sgl, int nsge,
1651                          uint32_t wr_data, int flags,
1652                          uint64_t addr, uint32_t rkey)
1653 {
1654         struct ibv_send_wr wr, *bad;
1655
1656         wr.wr_id = rs_send_wr_id(wr_data);
1657         wr.next = NULL;
1658         wr.sg_list = sgl;
1659         wr.num_sge = nsge;
1660         wr.opcode = IBV_WR_RDMA_WRITE;
1661         wr.send_flags = flags;
1662         wr.wr.rdma.remote_addr = addr;
1663         wr.wr.rdma.rkey = rkey;
1664
1665         return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1666 }
1667
1668 static int rs_post_write_msg(struct rsocket *rs,
1669                          struct ibv_sge *sgl, int nsge,
1670                          uint32_t msg, int flags,
1671                          uint64_t addr, uint32_t rkey)
1672 {
1673         struct ibv_send_wr wr, *bad;
1674         struct ibv_sge sge;
1675         int ret;
1676
1677         wr.next = NULL;
1678         if (!(rs->opts & RS_OPT_MSG_SEND)) {
1679                 wr.wr_id = rs_send_wr_id(msg);
1680                 wr.sg_list = sgl;
1681                 wr.num_sge = nsge;
1682                 wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1683                 wr.send_flags = flags;
1684                 wr.imm_data = htonl(msg);
1685                 wr.wr.rdma.remote_addr = addr;
1686                 wr.wr.rdma.rkey = rkey;
1687
1688                 return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1689         } else {
1690                 ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey);
1691                 if (!ret) {
1692                         wr.wr_id = rs_send_wr_id(rs_msg_set(rs_msg_op(msg), 0)) |
1693                                    RS_WR_ID_FLAG_MSG_SEND;
1694                         sge.addr = (uintptr_t) &msg;
1695                         sge.lkey = 0;
1696                         sge.length = sizeof msg;
1697                         wr.sg_list = &sge;
1698                         wr.num_sge = 1;
1699                         wr.opcode = IBV_WR_SEND;
1700                         wr.send_flags = IBV_SEND_INLINE;
1701
1702                         ret = rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
1703                 }
1704                 return ret;
1705         }
1706 }
1707
1708 static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
1709                         uint32_t wr_data)
1710 {
1711         struct ibv_send_wr wr, *bad;
1712
1713         wr.wr_id = rs_send_wr_id(wr_data);
1714         wr.next = NULL;
1715         wr.sg_list = sge;
1716         wr.num_sge = 1;
1717         wr.opcode = IBV_WR_SEND;
1718         wr.send_flags = (sge->length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
1719         wr.wr.ud.ah = rs->conn_dest->ah;
1720         wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
1721         wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
1722
1723         return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
1724 }
1725
1726 /*
1727  * Update target SGE before sending data.  Otherwise the remote side may
1728  * update the entry before we do.
1729  */
1730 static int rs_write_data(struct rsocket *rs,
1731                          struct ibv_sge *sgl, int nsge,
1732                          uint32_t length, int flags)
1733 {
1734         uint64_t addr;
1735         uint32_t rkey;
1736
1737         rs->sseq_no++;
1738         rs->sqe_avail--;
1739         if (rs->opts & RS_OPT_MSG_SEND)
1740                 rs->sqe_avail--;
1741         rs->sbuf_bytes_avail -= length;
1742
1743         addr = rs->target_sgl[rs->target_sge].addr;
1744         rkey = rs->target_sgl[rs->target_sge].key;
1745
1746         rs->target_sgl[rs->target_sge].addr += length;
1747         rs->target_sgl[rs->target_sge].length -= length;
1748
1749         if (!rs->target_sgl[rs->target_sge].length) {
1750                 if (++rs->target_sge == RS_SGL_SIZE)
1751                         rs->target_sge = 0;
1752         }
1753
1754         return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
1755                                  flags, addr, rkey);
1756 }
1757
1758 static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
1759                            struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
1760 {
1761         uint64_t addr;
1762
1763         rs->sqe_avail--;
1764         rs->sbuf_bytes_avail -= length;
1765
1766         addr = iom->sge.addr + offset - iom->offset;
1767         return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
1768                              flags, addr, iom->sge.key);
1769 }
1770
1771 static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
1772                           struct ibv_sge *sgl, int nsge, int flags)
1773 {
1774         uint64_t addr;
1775
1776         rs->sseq_no++;
1777         rs->sqe_avail--;
1778         if (rs->opts & RS_OPT_MSG_SEND)
1779                 rs->sqe_avail--;
1780         rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
1781
1782         addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
1783         return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
1784                                  flags, addr, rs->remote_iomap.key);
1785 }
1786
1787 static uint32_t rs_sbuf_left(struct rsocket *rs)
1788 {
1789         return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
1790                            rs->ssgl[0].addr);
1791 }
1792
1793 static void rs_send_credits(struct rsocket *rs)
1794 {
1795         struct ibv_sge ibsge;
1796         struct rs_sge sge, *sge_buf;
1797         int flags;
1798
1799         rs->ctrl_seqno++;
1800         rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
1801         if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
1802                 if (rs->opts & RS_OPT_MSG_SEND)
1803                         rs->ctrl_seqno++;
1804
1805                 if (!(rs->opts & RS_OPT_SWAP_SGL)) {
1806                         sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
1807                         sge.key = rs->rmr->rkey;
1808                         sge.length = rs->rbuf_size >> 1;
1809                 } else {
1810                         sge.addr = bswap_64((uintptr_t) &rs->rbuf[rs->rbuf_free_offset]);
1811                         sge.key = bswap_32(rs->rmr->rkey);
1812                         sge.length = bswap_32(rs->rbuf_size >> 1);
1813                 }
1814
1815                 if (rs->sq_inline < sizeof sge) {
1816                         sge_buf = rs_get_ctrl_buf(rs);
1817                         memcpy(sge_buf, &sge, sizeof sge);
1818                         ibsge.addr = (uintptr_t) sge_buf;
1819                         ibsge.lkey = rs->smr->lkey;
1820                         flags = 0;
1821                 } else {
1822                         ibsge.addr = (uintptr_t) &sge;
1823                         ibsge.lkey = 0;
1824                         flags = IBV_SEND_INLINE;
1825                 }
1826                 ibsge.length = sizeof(sge);
1827
1828                 rs_post_write_msg(rs, &ibsge, 1,
1829                         rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), flags,
1830                         rs->remote_sgl.addr + rs->remote_sge * sizeof(struct rs_sge),
1831                         rs->remote_sgl.key);
1832
1833                 rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
1834                 rs->rbuf_free_offset += rs->rbuf_size >> 1;
1835                 if (rs->rbuf_free_offset >= rs->rbuf_size)
1836                         rs->rbuf_free_offset = 0;
1837                 if (++rs->remote_sge == rs->remote_sgl.length)
1838                         rs->remote_sge = 0;
1839         } else {
1840                 rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size));
1841         }
1842 }
1843
1844 static inline int rs_ctrl_avail(struct rsocket *rs)
1845 {
1846         return rs->ctrl_seqno != rs->ctrl_max_seqno;
1847 }
1848
1849 /* Protocols that do not support RDMA write with immediate may require 2 msgs */
1850 static inline int rs_2ctrl_avail(struct rsocket *rs)
1851 {
1852         return (int)((rs->ctrl_seqno + 1) - rs->ctrl_max_seqno) < 0;
1853 }
1854
1855 static int rs_give_credits(struct rsocket *rs)
1856 {
1857         if (!(rs->opts & RS_OPT_MSG_SEND)) {
1858                 return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
1859                         ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
1860                        rs_ctrl_avail(rs) && (rs->state & rs_connected);
1861         } else {
1862                 return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
1863                         ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
1864                        rs_2ctrl_avail(rs) && (rs->state & rs_connected);
1865         }
1866 }
1867
/* Send a credit update if rs_give_credits() says one is due. */
static void rs_update_credits(struct rsocket *rs)
{
	if (!rs_give_credits(rs))
		return;

	rs_send_credits(rs);
}
1873
1874 static int rs_poll_cq(struct rsocket *rs)
1875 {
1876         struct ibv_wc wc;
1877         uint32_t msg;
1878         int ret, rcnt = 0;
1879
1880         while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
1881                 if (rs_wr_is_recv(wc.wr_id)) {
1882                         if (wc.status != IBV_WC_SUCCESS)
1883                                 continue;
1884                         rcnt++;
1885
1886                         if (wc.wc_flags & IBV_WC_WITH_IMM) {
1887                                 msg = ntohl(wc.imm_data);
1888                         } else {
1889                                 msg = ((uint32_t *) (rs->rbuf + rs->rbuf_size))
1890                                         [rs_wr_data(wc.wr_id)];
1891
1892                         }
1893                         switch (rs_msg_op(msg)) {
1894                         case RS_OP_SGL:
1895                                 rs->sseq_comp = (uint16_t) rs_msg_data(msg);
1896                                 break;
1897                         case RS_OP_IOMAP_SGL:
1898                                 /* The iomap was updated, that's nice to know. */
1899                                 break;
1900                         case RS_OP_CTRL:
1901                                 if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
1902                                         rs->state = rs_disconnected;
1903                                         return 0;
1904                                 } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
1905                                         if (rs->state & rs_writable) {
1906                                                 rs->state &= ~rs_readable;
1907                                         } else {
1908                                                 rs->state = rs_disconnected;
1909                                                 return 0;
1910                                         }
1911                                 }
1912                                 break;
1913                         case RS_OP_WRITE:
1914                                 /* We really shouldn't be here. */
1915                                 break;
1916                         default:
1917                                 rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
1918                                 rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
1919                                 if (++rs->rmsg_tail == rs->rq_size + 1)
1920                                         rs->rmsg_tail = 0;
1921                                 break;
1922                         }
1923                 } else {
1924                         switch  (rs_msg_op(rs_wr_data(wc.wr_id))) {
1925                         case RS_OP_SGL:
1926                                 rs->ctrl_max_seqno++;
1927                                 break;
1928                         case RS_OP_CTRL:
1929                                 rs->ctrl_max_seqno++;
1930                                 if (rs_msg_data(rs_wr_data(wc.wr_id)) == RS_CTRL_DISCONNECT)
1931                                         rs->state = rs_disconnected;
1932                                 break;
1933                         case RS_OP_IOMAP_SGL:
1934                                 rs->sqe_avail++;
1935                                 if (!rs_wr_is_msg_send(wc.wr_id))
1936                                         rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
1937                                 break;
1938                         default:
1939                                 rs->sqe_avail++;
1940                                 rs->sbuf_bytes_avail += rs_msg_data(rs_wr_data(wc.wr_id));
1941                                 break;
1942                         }
1943                         if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
1944                                 rs->state = rs_error;
1945                                 rs->err = EIO;
1946                         }
1947                 }
1948         }
1949
1950         if (rs->state & rs_connected) {
1951                 while (!ret && rcnt--)
1952                         ret = rs_post_recv(rs);
1953
1954                 if (ret) {
1955                         rs->state = rs_error;
1956                         rs->err = errno;
1957                 }
1958         }
1959         return ret;
1960 }
1961
1962 static int rs_get_cq_event(struct rsocket *rs)
1963 {
1964         struct ibv_cq *cq;
1965         void *context;
1966         int ret;
1967
1968         if (!rs->cq_armed)
1969                 return 0;
1970
1971         ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
1972         if (!ret) {
1973                 ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
1974                 rs->cq_armed = 0;
1975         } else if (errno != EAGAIN) {
1976                 rs->state = rs_error;
1977         }
1978
1979         return ret;
1980 }
1981
1982 /*
1983  * Although we serialize rsend and rrecv calls with respect to themselves,
1984  * both calls may run simultaneously and need to poll the CQ for completions.
1985  * We need to serialize access to the CQ, but rsend and rrecv need to
1986  * allow each other to make forward progress.
1987  *
1988  * For example, rsend may need to wait for credits from the remote side,
1989  * which could be stalled until the remote process calls rrecv.  This should
1990  * not block rrecv from receiving data from the remote side however.
1991  *
1992  * We handle this by using two locks.  The cq_lock protects against polling
1993  * the CQ and processing completions.  The cq_wait_lock serializes access to
1994  * waiting on the CQ.
1995  */
1996 static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
1997 {
1998         int ret;
1999
2000         fastlock_acquire(&rs->cq_lock);
2001         do {
2002                 rs_update_credits(rs);
2003                 ret = rs_poll_cq(rs);
2004                 if (test(rs)) {
2005                         ret = 0;
2006                         break;
2007                 } else if (ret) {
2008                         break;
2009                 } else if (nonblock) {
2010                         ret = ERR(EWOULDBLOCK);
2011                 } else if (!rs->cq_armed) {
2012                         ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
2013                         rs->cq_armed = 1;
2014                 } else {
2015                         rs_update_credits(rs);
2016                         fastlock_acquire(&rs->cq_wait_lock);
2017                         fastlock_release(&rs->cq_lock);
2018
2019                         ret = rs_get_cq_event(rs);
2020                         fastlock_release(&rs->cq_wait_lock);
2021                         fastlock_acquire(&rs->cq_lock);
2022                 }
2023         } while (!ret);
2024
2025         rs_update_credits(rs);
2026         fastlock_release(&rs->cq_lock);
2027         return ret;
2028 }
2029
2030 static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2031 {
2032         struct timeval s, e;
2033         uint32_t poll_time = 0;
2034         int ret;
2035
2036         do {
2037                 ret = rs_process_cq(rs, 1, test);
2038                 if (!ret || nonblock || errno != EWOULDBLOCK)
2039                         return ret;
2040
2041                 if (!poll_time)
2042                         gettimeofday(&s, NULL);
2043
2044                 gettimeofday(&e, NULL);
2045                 poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
2046                             (e.tv_usec - s.tv_usec) + 1;
2047         } while (poll_time <= polling_time);
2048
2049         ret = rs_process_cq(rs, 0, test);
2050         return ret;
2051 }
2052
2053 static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc)
2054 {
2055         struct ds_header *hdr;
2056
2057         hdr = (struct ds_header *) (qp->rbuf + rs_wr_data(wc->wr_id));
2058         return ((wc->byte_len >= sizeof(struct ibv_grh) + DS_IPV4_HDR_LEN) &&
2059                 ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
2060                  (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
2061 }
2062
2063 /*
2064  * Poll all CQs associated with a datagram rsocket.  We need to drop any
2065  * received messages that we do not have room to store.  To limit drops,
2066  * we only poll if we have room to store the receive or we need a send
2067  * buffer.  To ensure fairness, we poll the CQs round robin, remembering
2068  * where we left off.
2069  */
2070 static void ds_poll_cqs(struct rsocket *rs)
2071 {
2072         struct ds_qp *qp;
2073         struct ds_smsg *smsg;
2074         struct ds_rmsg *rmsg;
2075         struct ibv_wc wc;
2076         int ret, cnt;
2077
2078         if (!(qp = rs->qp_list))
2079                 return;
2080
2081         do {
2082                 cnt = 0;
2083                 do {
2084                         ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
2085                         if (ret <= 0) {
2086                                 qp = ds_next_qp(qp);
2087                                 continue;
2088                         }
2089
2090                         if (rs_wr_is_recv(wc.wr_id)) {
2091                                 if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
2092                                     ds_valid_recv(qp, &wc)) {
2093                                         rs->rqe_avail--;
2094                                         rmsg = &rs->dmsg[rs->rmsg_tail];
2095                                         rmsg->qp = qp;
2096                                         rmsg->offset = rs_wr_data(wc.wr_id);
2097                                         rmsg->length = wc.byte_len - sizeof(struct ibv_grh);
2098                                         if (++rs->rmsg_tail == rs->rq_size + 1)
2099                                                 rs->rmsg_tail = 0;
2100                                 } else {
2101                                         ds_post_recv(rs, qp, rs_wr_data(wc.wr_id));
2102                                 }
2103                         } else {
2104                                 smsg = (struct ds_smsg *) (rs->sbuf + rs_wr_data(wc.wr_id));
2105                                 smsg->next = rs->smsg_free;
2106                                 rs->smsg_free = smsg;
2107                                 rs->sqe_avail++;
2108                         }
2109
2110                         qp = ds_next_qp(qp);
2111                         if (!rs->rqe_avail && rs->sqe_avail) {
2112                                 rs->qp_list = qp;
2113                                 return;
2114                         }
2115                         cnt++;
2116                 } while (qp != rs->qp_list);
2117         } while (cnt);
2118 }
2119
2120 static void ds_req_notify_cqs(struct rsocket *rs)
2121 {
2122         struct ds_qp *qp;
2123
2124         if (!(qp = rs->qp_list))
2125                 return;
2126
2127         do {
2128                 if (!qp->cq_armed) {
2129                         ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
2130                         qp->cq_armed = 1;
2131                 }
2132                 qp = ds_next_qp(qp);
2133         } while (qp != rs->qp_list);
2134 }
2135
2136 static int ds_get_cq_event(struct rsocket *rs)
2137 {
2138         struct epoll_event event;
2139         struct ds_qp *qp;
2140         struct ibv_cq *cq;
2141         void *context;
2142         int ret;
2143
2144         if (!rs->cq_armed)
2145                 return 0;
2146
2147         ret = epoll_wait(rs->epfd, &event, 1, -1);
2148         if (ret <= 0)
2149                 return ret;
2150
2151         qp = event.data.ptr;
2152         ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
2153         if (!ret) {
2154                 ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
2155                 qp->cq_armed = 0;
2156                 rs->cq_armed = 0;
2157         }
2158
2159         return ret;
2160 }
2161
2162 static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2163 {
2164         int ret = 0;
2165
2166         fastlock_acquire(&rs->cq_lock);
2167         do {
2168                 ds_poll_cqs(rs);
2169                 if (test(rs)) {
2170                         ret = 0;
2171                         break;
2172                 } else if (nonblock) {
2173                         ret = ERR(EWOULDBLOCK);
2174                 } else if (!rs->cq_armed) {
2175                         ds_req_notify_cqs(rs);
2176                         rs->cq_armed = 1;
2177                 } else {
2178                         fastlock_acquire(&rs->cq_wait_lock);
2179                         fastlock_release(&rs->cq_lock);
2180
2181                         ret = ds_get_cq_event(rs);
2182                         fastlock_release(&rs->cq_wait_lock);
2183                         fastlock_acquire(&rs->cq_lock);
2184                 }
2185         } while (!ret);
2186
2187         fastlock_release(&rs->cq_lock);
2188         return ret;
2189 }
2190
2191 static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
2192 {
2193         struct timeval s, e;
2194         uint32_t poll_time = 0;
2195         int ret;
2196
2197         do {
2198                 ret = ds_process_cqs(rs, 1, test);
2199                 if (!ret || nonblock || errno != EWOULDBLOCK)
2200                         return ret;
2201
2202                 if (!poll_time)
2203                         gettimeofday(&s, NULL);
2204
2205                 gettimeofday(&e, NULL);
2206                 poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
2207                             (e.tv_usec - s.tv_usec) + 1;
2208         } while (poll_time <= polling_time);
2209
2210         ret = ds_process_cqs(rs, 0, test);
2211         return ret;
2212 }
2213
2214 static int rs_nonblocking(struct rsocket *rs, int flags)
2215 {
2216         return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
2217 }
2218
2219 static int rs_is_cq_armed(struct rsocket *rs)
2220 {
2221         return rs->cq_armed;
2222 }
2223
/* Test callback that is always satisfied; the rsocket is not examined. */
static int rs_poll_all(struct rsocket *rs)
{
	(void) rs;
	return 1;
}
2228
2229 /*
2230  * We use hardware flow control to prevent over running the remote
2231  * receive queue.  However, data transfers still require space in
2232  * the remote rmsg queue, or we risk losing notification that data
2233  * has been transfered.
2234  *
2235  * Be careful with race conditions in the check below.  The target SGL
2236  * may be updated by a remote RDMA write.
2237  */
2238 static int rs_can_send(struct rsocket *rs)
2239 {
2240         if (!(rs->opts & RS_OPT_MSG_SEND)) {
2241                 return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
2242                        (rs->sseq_no != rs->sseq_comp) &&
2243                        (rs->target_sgl[rs->target_sge].length != 0);
2244         } else {
2245                 return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
2246                        (rs->sseq_no != rs->sseq_comp) &&
2247                        (rs->target_sgl[rs->target_sge].length != 0);
2248         }
2249 }
2250
2251 static int ds_can_send(struct rsocket *rs)
2252 {
2253         return rs->sqe_avail;
2254 }
2255
2256 static int ds_all_sends_done(struct rsocket *rs)
2257 {
2258         return rs->sqe_avail == rs->sq_size;
2259 }
2260
2261 static int rs_conn_can_send(struct rsocket *rs)
2262 {
2263         return rs_can_send(rs) || !(rs->state & rs_writable);
2264 }
2265
2266 static int rs_conn_can_send_ctrl(struct rsocket *rs)
2267 {
2268         return rs_ctrl_avail(rs) || !(rs->state & rs_connected);
2269 }
2270
2271 static int rs_have_rdata(struct rsocket *rs)
2272 {
2273         return (rs->rmsg_head != rs->rmsg_tail);
2274 }
2275
2276 static int rs_conn_have_rdata(struct rsocket *rs)
2277 {
2278         return rs_have_rdata(rs) || !(rs->state & rs_readable);
2279 }
2280
2281 static int rs_conn_all_sends_done(struct rsocket *rs)
2282 {
2283         return ((((int) rs->ctrl_max_seqno) - ((int) rs->ctrl_seqno)) +
2284                 rs->sqe_avail == rs->sq_size) ||
2285                !(rs->state & rs_connected);
2286 }
2287
2288 static void ds_set_src(struct sockaddr *addr, socklen_t *addrlen,
2289                        struct ds_header *hdr)
2290 {
2291         union socket_addr sa;
2292
2293         memset(&sa, 0, sizeof sa);
2294         if (hdr->version == 4) {
2295                 if (*addrlen > sizeof(sa.sin))
2296                         *addrlen = sizeof(sa.sin);
2297
2298                 sa.sin.sin_family = AF_INET;
2299                 sa.sin.sin_port = hdr->port;
2300                 sa.sin.sin_addr.s_addr =  hdr->addr.ipv4;
2301         } else {
2302                 if (*addrlen > sizeof(sa.sin6))
2303                         *addrlen = sizeof(sa.sin6);
2304
2305                 sa.sin6.sin6_family = AF_INET6;
2306                 sa.sin6.sin6_port = hdr->port;
2307                 sa.sin6.sin6_flowinfo = hdr->addr.ipv6.flowinfo;
2308                 memcpy(&sa.sin6.sin6_addr, &hdr->addr.ipv6.addr, 16);
2309         }
2310         memcpy(addr, &sa, *addrlen);
2311 }
2312
2313 static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
2314                            struct sockaddr *src_addr, socklen_t *addrlen)
2315 {
2316         struct ds_rmsg *rmsg;
2317         struct ds_header *hdr;
2318         int ret;
2319
2320         if (!(rs->state & rs_readable))
2321                 return ERR(EINVAL);
2322
2323         if (!rs_have_rdata(rs)) {
2324                 ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
2325                                   rs_have_rdata);
2326                 if (ret)
2327                         return ret;
2328         }
2329
2330         rmsg = &rs->dmsg[rs->rmsg_head];
2331         hdr = (struct ds_header *) (rmsg->qp->rbuf + rmsg->offset);
2332         if (len > rmsg->length - hdr->length)
2333                 len = rmsg->length - hdr->length;
2334
2335         memcpy(buf, (void *) hdr + hdr->length, len);
2336         if (addrlen)
2337                 ds_set_src(src_addr, addrlen, hdr);
2338
2339         if (!(flags & MSG_PEEK)) {
2340                 ds_post_recv(rs, rmsg->qp, rmsg->offset);
2341                 if (++rs->rmsg_head == rs->rq_size + 1)
2342                         rs->rmsg_head = 0;
2343                 rs->rqe_avail++;
2344         }
2345
2346         return len;
2347 }
2348
2349 static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
2350 {
2351         size_t left = len;
2352         uint32_t end_size, rsize;
2353         int rmsg_head, rbuf_offset;
2354
2355         rmsg_head = rs->rmsg_head;
2356         rbuf_offset = rs->rbuf_offset;
2357
2358         for (; left && (rmsg_head != rs->rmsg_tail); left -= rsize) {
2359                 if (left < rs->rmsg[rmsg_head].data) {
2360                         rsize = left;
2361                 } else {
2362                         rsize = rs->rmsg[rmsg_head].data;
2363                         if (++rmsg_head == rs->rq_size + 1)
2364                                 rmsg_head = 0;
2365                 }
2366
2367                 end_size = rs->rbuf_size - rbuf_offset;
2368                 if (rsize > end_size) {
2369                         memcpy(buf, &rs->rbuf[rbuf_offset], end_size);
2370                         rbuf_offset = 0;
2371                         buf += end_size;
2372                         rsize -= end_size;
2373                         left -= end_size;
2374                 }
2375                 memcpy(buf, &rs->rbuf[rbuf_offset], rsize);
2376                 rbuf_offset += rsize;
2377                 buf += rsize;
2378         }
2379
2380         return len - left;
2381 }
2382
2383 /*
2384  * Continue to receive any queued data even if the remote side has disconnected.
2385  */
2386 ssize_t rrecv(int socket, void *buf, size_t len, int flags)
2387 {
2388         struct rsocket *rs;
2389         size_t left = len;
2390         uint32_t end_size, rsize;
2391         int ret;
2392
2393         rs = idm_at(&idm, socket);
2394         if (rs->type == SOCK_DGRAM) {
2395                 fastlock_acquire(&rs->rlock);
2396                 ret = ds_recvfrom(rs, buf, len, flags, NULL, 0);
2397                 fastlock_release(&rs->rlock);
2398                 return ret;
2399         }
2400
2401         if (rs->state & rs_opening) {
2402                 ret = rs_do_connect(rs);
2403                 if (ret) {
2404                         if (errno == EINPROGRESS)
2405                                 errno = EAGAIN;
2406                         return ret;
2407                 }
2408         }
2409         fastlock_acquire(&rs->rlock);
2410         do {
2411                 if (!rs_have_rdata(rs)) {
2412                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2413                                           rs_conn_have_rdata);
2414                         if (ret)
2415                                 break;
2416                 }
2417
2418                 ret = 0;
2419                 if (flags & MSG_PEEK) {
2420                         left = len - rs_peek(rs, buf, left);
2421                         break;
2422                 }
2423
2424                 for (; left && rs_have_rdata(rs); left -= rsize) {
2425                         if (left < rs->rmsg[rs->rmsg_head].data) {
2426                                 rsize = left;
2427                                 rs->rmsg[rs->rmsg_head].data -= left;
2428                         } else {
2429                                 rs->rseq_no++;
2430                                 rsize = rs->rmsg[rs->rmsg_head].data;
2431                                 if (++rs->rmsg_head == rs->rq_size + 1)
2432                                         rs->rmsg_head = 0;
2433                         }
2434
2435                         end_size = rs->rbuf_size - rs->rbuf_offset;
2436                         if (rsize > end_size) {
2437                                 memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size);
2438                                 rs->rbuf_offset = 0;
2439                                 buf += end_size;
2440                                 rsize -= end_size;
2441                                 left -= end_size;
2442                                 rs->rbuf_bytes_avail += end_size;
2443                         }
2444                         memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize);
2445                         rs->rbuf_offset += rsize;
2446                         buf += rsize;
2447                         rs->rbuf_bytes_avail += rsize;
2448                 }
2449
2450         } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable));
2451
2452         fastlock_release(&rs->rlock);
2453         return ret ? ret : len - left;
2454 }
2455
2456 ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
2457                   struct sockaddr *src_addr, socklen_t *addrlen)
2458 {
2459         struct rsocket *rs;
2460         int ret;
2461
2462         rs = idm_at(&idm, socket);
2463         if (rs->type == SOCK_DGRAM) {
2464                 fastlock_acquire(&rs->rlock);
2465                 ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
2466                 fastlock_release(&rs->rlock);
2467                 return ret;
2468         }
2469
2470         ret = rrecv(socket, buf, len, flags);
2471         if (ret > 0 && src_addr)
2472                 rgetpeername(socket, src_addr, addrlen);
2473
2474         return ret;
2475 }
2476
/*
 * Deliberately simple for now: only the first vector is filled; the
 * remaining iovcnt - 1 entries are not used.
 */
static ssize_t rrecvv(int socket, const struct iovec *iov, int iovcnt, int flags)
{
	const struct iovec *first = &iov[0];

	return rrecv(socket, first->iov_base, first->iov_len, flags);
}
2485
/*
 * recvmsg()-style receive.
 *
 * Ancillary data is not supported: a message with a control buffer fails
 * with ERR(ENOTSUP).  Otherwise the message's iovec array is passed to
 * rrecvv() together with the caller's flags.
 *
 * The receive flags come from the flags argument.  msg->msg_flags is an
 * output field of struct msghdr and is not used as input.
 */
ssize_t rrecvmsg(int socket, struct msghdr *msg, int flags)
{
	if (msg->msg_control && msg->msg_controllen)
		return ERR(ENOTSUP);

	return rrecvv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
}
2493
/* read()-style receive: rrecv() with no flags. */
ssize_t rread(int socket, void *buf, size_t count)
{
	const int no_flags = 0;

	return rrecv(socket, buf, count, no_flags);
}
2498
/* readv()-style receive: rrecvv() with no flags. */
ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
{
	const int no_flags = 0;

	return rrecvv(socket, iov, iovcnt, no_flags);
}
2503
2504 static int rs_send_iomaps(struct rsocket *rs, int flags)
2505 {
2506         struct rs_iomap_mr *iomr;
2507         struct ibv_sge sge;
2508         struct rs_iomap iom;
2509         int ret;
2510
2511         fastlock_acquire(&rs->map_lock);
2512         while (!dlist_empty(&rs->iomap_queue)) {
2513                 if (!rs_can_send(rs)) {
2514                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2515                                           rs_conn_can_send);
2516                         if (ret)
2517                                 break;
2518                         if (!(rs->state & rs_writable)) {
2519                                 ret = ERR(ECONNRESET);
2520                                 break;
2521                         }
2522                 }
2523
2524                 iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
2525                 if (!(rs->opts & RS_OPT_SWAP_SGL)) {
2526                         iom.offset = iomr->offset;
2527                         iom.sge.addr = (uintptr_t) iomr->mr->addr;
2528                         iom.sge.length = iomr->mr->length;
2529                         iom.sge.key = iomr->mr->rkey;
2530                 } else {
2531                         iom.offset = bswap_64(iomr->offset);
2532                         iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
2533                         iom.sge.length = bswap_32(iomr->mr->length);
2534                         iom.sge.key = bswap_32(iomr->mr->rkey);
2535                 }
2536
2537                 if (rs->sq_inline >= sizeof iom) {
2538                         sge.addr = (uintptr_t) &iom;
2539                         sge.length = sizeof iom;
2540                         sge.lkey = 0;
2541                         ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
2542                 } else if (rs_sbuf_left(rs) >= sizeof iom) {
2543                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
2544                         rs->ssgl[0].length = sizeof iom;
2545                         ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
2546                         if (rs_sbuf_left(rs) > sizeof iom)
2547                                 rs->ssgl[0].addr += sizeof iom;
2548                         else
2549                                 rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2550                 } else {
2551                         rs->ssgl[0].length = rs_sbuf_left(rs);
2552                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
2553                                 rs->ssgl[0].length);
2554                         rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
2555                         memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
2556                                rs->ssgl[1].length);
2557                         ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
2558                         rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2559                 }
2560                 dlist_remove(&iomr->entry);
2561                 dlist_insert_tail(&iomr->entry, &rs->iomap_list);
2562                 if (ret)
2563                         break;
2564         }
2565
2566         rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
2567         fastlock_release(&rs->map_lock);
2568         return ret;
2569 }
2570
2571 static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
2572                             int iovcnt, int flags, uint8_t op)
2573 {
2574         struct ds_udp_header hdr;
2575         struct msghdr msg;
2576         struct iovec miov[8];
2577         ssize_t ret;
2578
2579         if (iovcnt > 8)
2580                 return ERR(ENOTSUP);
2581
2582         hdr.tag = htonl(DS_UDP_TAG);
2583         hdr.version = rs->conn_dest->qp->hdr.version;
2584         hdr.op = op;
2585         hdr.reserved = 0;
2586         hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
2587         if (rs->conn_dest->qp->hdr.version == 4) {
2588                 hdr.length = DS_UDP_IPV4_HDR_LEN;
2589                 hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4;
2590         } else {
2591                 hdr.length = DS_UDP_IPV6_HDR_LEN;
2592                 memcpy(hdr.addr.ipv6, &rs->conn_dest->qp->hdr.addr.ipv6, 16);
2593         }
2594
2595         miov[0].iov_base = &hdr;
2596         miov[0].iov_len = hdr.length;
2597         if (iov && iovcnt)
2598                 memcpy(&miov[1], iov, sizeof *iov * iovcnt);
2599
2600         memset(&msg, 0, sizeof msg);
2601         msg.msg_name = &rs->conn_dest->addr;
2602         msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
2603         msg.msg_iov = miov;
2604         msg.msg_iovlen = iovcnt + 1;
2605         ret = sendmsg(rs->udp_sock, &msg, flags);
2606         return ret > 0 ? ret - hdr.length : ret;
2607 }
2608
/*
 * Single-buffer wrapper around ds_sendv_udp().  A NULL buffer or a zero
 * length sends no caller vectors.
 */
static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
			   int flags, uint8_t op)
{
	struct iovec iov;

	if (!buf || !len)
		return ds_sendv_udp(rs, NULL, 0, flags, op);

	iov.iov_base = (void *) buf;
	iov.iov_len = len;
	return ds_sendv_udp(rs, &iov, 1, flags, op);
}
2621
2622 static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
2623 {
2624         struct ds_smsg *msg;
2625         struct ibv_sge sge;
2626         uint64_t offset;
2627         int ret = 0;
2628
2629         if (!rs->conn_dest->ah)
2630                 return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
2631
2632         if (!ds_can_send(rs)) {
2633                 ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
2634                 if (ret)
2635                         return ret;
2636         }
2637
2638         msg = rs->smsg_free;
2639         rs->smsg_free = msg->next;
2640         rs->sqe_avail--;
2641
2642         memcpy((void *) msg, &rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
2643         memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len);
2644         sge.addr = (uintptr_t) msg;
2645         sge.length = rs->conn_dest->qp->hdr.length + len;
2646         sge.lkey = rs->conn_dest->qp->smr->lkey;
2647         offset = (uint8_t *) msg - rs->sbuf;
2648
2649         ret = ds_post_send(rs, &sge, offset);
2650         return ret ? ret : len;
2651 }
2652
2653 /*
2654  * We overlap sending the data, by posting a small work request immediately,
2655  * then increasing the size of the send on each iteration.
2656  */
2657 ssize_t rsend(int socket, const void *buf, size_t len, int flags)
2658 {
2659         struct rsocket *rs;
2660         struct ibv_sge sge;
2661         size_t left = len;
2662         uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
2663         int ret = 0;
2664
2665         rs = idm_at(&idm, socket);
2666         if (rs->type == SOCK_DGRAM) {
2667                 fastlock_acquire(&rs->slock);
2668                 ret = dsend(rs, buf, len, flags);
2669                 fastlock_release(&rs->slock);
2670                 return ret;
2671         }
2672
2673         if (rs->state & rs_opening) {
2674                 ret = rs_do_connect(rs);
2675                 if (ret) {
2676                         if (errno == EINPROGRESS)
2677                                 errno = EAGAIN;
2678                         return ret;
2679                 }
2680         }
2681
2682         fastlock_acquire(&rs->slock);
2683         if (rs->iomap_pending) {
2684                 ret = rs_send_iomaps(rs, flags);
2685                 if (ret)
2686                         goto out;
2687         }
2688         for (; left; left -= xfer_size, buf += xfer_size) {
2689                 if (!rs_can_send(rs)) {
2690                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2691                                           rs_conn_can_send);
2692                         if (ret)
2693                                 break;
2694                         if (!(rs->state & rs_writable)) {
2695                                 ret = ERR(ECONNRESET);
2696                                 break;
2697                         }
2698                 }
2699
2700                 if (olen < left) {
2701                         xfer_size = olen;
2702                         if (olen < RS_MAX_TRANSFER)
2703                                 olen <<= 1;
2704                 } else {
2705                         xfer_size = left;
2706                 }
2707
2708                 if (xfer_size > rs->sbuf_bytes_avail)
2709                         xfer_size = rs->sbuf_bytes_avail;
2710                 if (xfer_size > rs->target_sgl[rs->target_sge].length)
2711                         xfer_size = rs->target_sgl[rs->target_sge].length;
2712
2713                 if (xfer_size <= rs->sq_inline) {
2714                         sge.addr = (uintptr_t) buf;
2715                         sge.length = xfer_size;
2716                         sge.lkey = 0;
2717                         ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
2718                 } else if (xfer_size <= rs_sbuf_left(rs)) {
2719                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
2720                         rs->ssgl[0].length = xfer_size;
2721                         ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
2722                         if (xfer_size < rs_sbuf_left(rs))
2723                                 rs->ssgl[0].addr += xfer_size;
2724                         else
2725                                 rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2726                 } else {
2727                         rs->ssgl[0].length = rs_sbuf_left(rs);
2728                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
2729                                 rs->ssgl[0].length);
2730                         rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
2731                         memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
2732                         ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
2733                         rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2734                 }
2735                 if (ret)
2736                         break;
2737         }
2738 out:
2739         fastlock_release(&rs->slock);
2740
2741         return (ret && left == len) ? ret : len - left;
2742 }
2743
2744 ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
2745                 const struct sockaddr *dest_addr, socklen_t addrlen)
2746 {
2747         struct rsocket *rs;
2748         int ret;
2749
2750         rs = idm_at(&idm, socket);
2751         if (rs->type == SOCK_STREAM) {
2752                 if (dest_addr || addrlen)
2753                         return ERR(EISCONN);
2754
2755                 return rsend(socket, buf, len, flags);
2756         }
2757
2758         if (rs->state == rs_init) {
2759                 ret = ds_init_ep(rs);
2760                 if (ret)
2761                         return ret;
2762         }
2763
2764         fastlock_acquire(&rs->slock);
2765         if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
2766                 ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
2767                 if (ret)
2768                         goto out;
2769         }
2770
2771         ret = dsend(rs, buf, len, flags);
2772 out:
2773         fastlock_release(&rs->slock);
2774         return ret;
2775 }
2776
/*
 * Copy len bytes out of an iovec array into dst.
 *
 * *iov is the entry to start reading from and *offset the number of bytes
 * of that entry already consumed.  Both are advanced as data is copied, so
 * a later call continues where this one stopped.  An entry that is used up
 * exactly is stepped over and *offset is reset to 0.
 */
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
{
	char *out = dst;
	size_t avail, chunk;

	for (; len; len -= chunk, out += chunk) {
		avail = (*iov)->iov_len - *offset;
		chunk = (avail > len) ? len : avail;

		memcpy(out, (const char *) (*iov)->iov_base + *offset, chunk);

		if (avail > len) {
			/* Request satisfied inside the current entry. */
			*offset += len;
			break;
		}

		/* Current entry exhausted; continue with the next one. */
		(*iov)++;
		*offset = 0;
	}
}
2796
2797 static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags)
2798 {
2799         struct rsocket *rs;
2800         const struct iovec *cur_iov;
2801         size_t left, len, offset = 0;
2802         uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
2803         int i, ret = 0;
2804
2805         rs = idm_at(&idm, socket);
2806         if (rs->state & rs_opening) {
2807                 ret = rs_do_connect(rs);
2808                 if (ret) {
2809                         if (errno == EINPROGRESS)
2810                                 errno = EAGAIN;
2811                         return ret;
2812                 }
2813         }
2814
2815         cur_iov = iov;
2816         len = iov[0].iov_len;
2817         for (i = 1; i < iovcnt; i++)
2818                 len += iov[i].iov_len;
2819         left = len;
2820
2821         fastlock_acquire(&rs->slock);
2822         if (rs->iomap_pending) {
2823                 ret = rs_send_iomaps(rs, flags);
2824                 if (ret)
2825                         goto out;
2826         }
2827         for (; left; left -= xfer_size) {
2828                 if (!rs_can_send(rs)) {
2829                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
2830                                           rs_conn_can_send);
2831                         if (ret)
2832                                 break;
2833                         if (!(rs->state & rs_writable)) {
2834                                 ret = ERR(ECONNRESET);
2835                                 break;
2836                         }
2837                 }
2838
2839                 if (olen < left) {
2840                         xfer_size = olen;
2841                         if (olen < RS_MAX_TRANSFER)
2842                                 olen <<= 1;
2843                 } else {
2844                         xfer_size = left;
2845                 }
2846
2847                 if (xfer_size > rs->sbuf_bytes_avail)
2848                         xfer_size = rs->sbuf_bytes_avail;
2849                 if (xfer_size > rs->target_sgl[rs->target_sge].length)
2850                         xfer_size = rs->target_sgl[rs->target_sge].length;
2851
2852                 if (xfer_size <= rs_sbuf_left(rs)) {
2853                         rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
2854                                     &cur_iov, &offset, xfer_size);
2855                         rs->ssgl[0].length = xfer_size;
2856                         ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
2857                                             xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
2858                         if (xfer_size < rs_sbuf_left(rs))
2859                                 rs->ssgl[0].addr += xfer_size;
2860                         else
2861                                 rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
2862                 } else {
2863                         rs->ssgl[0].length = rs_sbuf_left(rs);
2864                         rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr, &cur_iov,
2865                                     &offset, rs->ssgl[0].length);
2866                         rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
2867                         rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
2868                         ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
2869                                             xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
2870                         rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
2871                 }
2872                 if (ret)
2873                         break;
2874         }
2875 out:
2876         fastlock_release(&rs->slock);
2877
2878         return (ret && left == len) ? ret : len - left;
2879 }
2880
/*
 * sendmsg()-style entry point.  A message that carries control data
 * (non-NULL msg_control with a non-zero msg_controllen) is refused with
 * ERR(ENOTSUP); otherwise the message's iovec array is passed to rsendv().
 */
ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
{
	int has_control = msg->msg_control && msg->msg_controllen;

	if (has_control)
		return ERR(ENOTSUP);

	return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
}
2888
/* write()-style entry point: rsend() with no flags set. */
ssize_t rwrite(int socket, const void *buf, size_t count)
{
	const int no_flags = 0;

	return rsend(socket, buf, count, no_flags);
}
2893
/* writev()-style entry point: rsendv() with no flags set. */
ssize_t rwritev(int socket, const struct iovec *iov, int iovcnt)
{
	const int no_flags = 0;

	return rsendv(socket, iov, iovcnt, no_flags);
}
2898
/*
 * Return a per-thread scratch pollfd array with room for at least nfds
 * entries.
 *
 * The array is cached in thread-local storage and only reallocated when a
 * larger one is requested; its previous contents are not preserved.  A
 * request for zero entries is rounded up to one so that an empty poll set
 * still yields a valid pointer instead of looking like an allocation
 * failure to the caller.
 *
 * Returns NULL only when malloc() fails; the cached size is then reset so
 * the next call retries the allocation.
 */
static struct pollfd *rs_fds_alloc(nfds_t nfds)
{
	static __thread struct pollfd *rfds;
	static __thread nfds_t rnfds;

	if (!nfds)
		nfds = 1;

	if (nfds > rnfds) {
		free(rfds);	/* free(NULL) is a no-op */

		rfds = malloc(sizeof *rfds * nfds);
		rnfds = rfds ? nfds : 0;
	}

	return rfds;
}
2914
2915 static int rs_poll_rs(struct rsocket *rs, int events,
2916                       int nonblock, int (*test)(struct rsocket *rs))
2917 {
2918         struct pollfd fds;
2919         short revents;
2920         int ret;
2921
2922 check_cq:
2923         if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) ||
2924              (rs->state == rs_disconnected) || (rs->state & rs_error))) {
2925                 rs_process_cq(rs, nonblock, test);
2926
2927                 revents = 0;
2928                 if ((events & POLLIN) && rs_conn_have_rdata(rs))
2929                         revents |= POLLIN;
2930                 if ((events & POLLOUT) && rs_can_send(rs))
2931                         revents |= POLLOUT;
2932                 if (!(rs->state & rs_connected)) {
2933                         if (rs->state == rs_disconnected)
2934                                 revents |= POLLHUP;
2935                         else
2936                                 revents |= POLLERR;
2937                 }
2938
2939                 return revents;
2940         } else if (rs->type == SOCK_DGRAM) {
2941                 ds_process_cqs(rs, nonblock, test);
2942
2943                 revents = 0;
2944                 if ((events & POLLIN) && rs_have_rdata(rs))
2945                         revents |= POLLIN;
2946                 if ((events & POLLOUT) && ds_can_send(rs))
2947                         revents |= POLLOUT;
2948
2949                 return revents;
2950         }
2951
2952         if (rs->state == rs_listening) {
2953                 fds.fd = rs->cm_id->channel->fd;
2954                 fds.events = events;
2955                 fds.revents = 0;
2956                 poll(&fds, 1, 0);
2957                 return fds.revents;
2958         }
2959
2960         if (rs->state & rs_opening) {
2961                 ret = rs_do_connect(rs);
2962                 if (ret) {
2963                         if (errno == EINPROGRESS) {
2964                                 errno = 0;
2965                                 return 0;
2966                         } else {
2967                                 return POLLOUT;
2968                         }
2969                 }
2970                 goto check_cq;
2971         }
2972
2973         if (rs->state == rs_connect_error)
2974                 return (rs->err && events & POLLOUT) ? POLLOUT : 0;
2975
2976         return 0;
2977 }
2978
2979 static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
2980 {
2981         struct rsocket *rs;
2982         int i, cnt = 0;
2983
2984         for (i = 0; i < nfds; i++) {
2985                 rs = idm_lookup(&idm, fds[i].fd);
2986                 if (rs)
2987                         fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
2988                 else
2989                         poll(&fds[i], 1, 0);
2990
2991                 if (fds[i].revents)
2992                         cnt++;
2993         }
2994         return cnt;
2995 }
2996
2997 static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
2998 {
2999         struct rsocket *rs;
3000         int i;
3001
3002         for (i = 0; i < nfds; i++) {
3003                 rs = idm_lookup(&idm, fds[i].fd);
3004                 if (rs) {
3005                         fds[i].revents = rs_poll_rs(rs, fds[i].events, 0, rs_is_cq_armed);
3006                         if (fds[i].revents)
3007                                 return 1;
3008
3009                         if (rs->type == SOCK_STREAM) {
3010                                 if (rs->state >= rs_connected)
3011                                         rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
3012                                 else
3013                                         rfds[i].fd = rs->cm_id->channel->fd;
3014                         } else {
3015                                 rfds[i].fd = rs->epfd;
3016                         }
3017                         rfds[i].events = POLLIN;
3018                 } else {
3019                         rfds[i].fd = fds[i].fd;
3020                         rfds[i].events = fds[i].events;
3021                 }
3022                 rfds[i].revents = 0;
3023         }
3024         return 0;
3025 }
3026
3027 static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
3028 {
3029         struct rsocket *rs;
3030         int i, cnt = 0;
3031
3032         for (i = 0; i < nfds; i++) {
3033                 if (!rfds[i].revents)
3034                         continue;
3035
3036                 rs = idm_lookup(&idm, fds[i].fd);
3037                 if (rs) {
3038                         fastlock_acquire(&rs->cq_wait_lock);
3039                         if (rs->type == SOCK_STREAM)
3040                                 rs_get_cq_event(rs);
3041                         else
3042                                 ds_get_cq_event(rs);
3043                         fastlock_release(&rs->cq_wait_lock);
3044                         fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
3045                 } else {
3046                         fds[i].revents = rfds[i].revents;
3047                 }
3048                 if (fds[i].revents)
3049                         cnt++;
3050         }
3051         return cnt;
3052 }
3053
3054 /*
3055  * We need to poll *all* fd's that the user specifies at least once.
3056  * Note that we may receive events on an rsocket that may not be reported
3057  * to the user (e.g. connection events or credit updates).  Process those
3058  * events, then return to polling until we find ones of interest.
3059  */
3060 int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
3061 {
3062         struct timeval s, e;
3063         struct pollfd *rfds;
3064         uint32_t poll_time = 0;
3065         int ret;
3066
3067         do {
3068                 ret = rs_poll_check(fds, nfds);
3069                 if (ret || !timeout)
3070                         return ret;
3071
3072                 if (!poll_time)
3073                         gettimeofday(&s, NULL);
3074
3075                 gettimeofday(&e, NULL);
3076                 poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
3077                             (e.tv_usec - s.tv_usec) + 1;
3078         } while (poll_time <= polling_time);
3079
3080         rfds = rs_fds_alloc(nfds);
3081         if (!rfds)
3082                 return ERR(ENOMEM);
3083
3084         do {
3085                 ret = rs_poll_arm(rfds, fds, nfds);
3086                 if (ret)
3087                         break;
3088
3089                 ret = poll(rfds, nfds, timeout);
3090                 if (ret <= 0)
3091                         break;
3092
3093                 ret = rs_poll_events(rfds, fds, nfds);
3094         } while (!ret);
3095
3096         return ret;
3097 }
3098
/*
 * Convert select()-style descriptor sets into a pollfd array.
 *
 * @nfds: in: highest descriptor to examine plus one;
 *        out: number of entries stored in the returned array.
 * @readfds/@writefds/@exceptfds: optional sets; NULL means "not given".
 *
 * One entry is produced for every descriptor below *nfds that is a member
 * of at least one set, with POLLIN for readfds and POLLOUT for writefds.
 * A descriptor found only in exceptfds gets an entry with no requested
 * events.  Membership, not the descriptor value, decides whether an entry
 * is kept, so descriptor 0 is handled like any other.
 *
 * Returns a calloc()ed array the caller must free(), or NULL if the
 * allocation fails.
 */
static struct pollfd *
rs_select_to_poll(int *nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds)
{
	struct pollfd *fds;
	short events;
	int fd, i = 0;

	/* Always allocate at least one entry so an empty set is not NULL. */
	fds = calloc(*nfds > 0 ? *nfds : 1, sizeof *fds);
	if (!fds)
		return NULL;

	for (fd = 0; fd < *nfds; fd++) {
		events = 0;
		if (readfds && FD_ISSET(fd, readfds))
			events |= POLLIN;
		if (writefds && FD_ISSET(fd, writefds))
			events |= POLLOUT;

		if (events || (exceptfds && FD_ISSET(fd, exceptfds))) {
			fds[i].fd = fd;
			fds[i].events = events;
			i++;
		}
	}

	*nfds = i;
	return fds;
}
3130
/*
 * Convert poll() results back into select()-style descriptor sets.
 *
 * For each of the first nfds entries, the descriptor is added to readfds
 * on POLLIN or POLLHUP, to writefds on POLLOUT, and to exceptfds when any
 * bit other than POLLIN/POLLOUT is set.  Sets passed as NULL are skipped;
 * the sets are not cleared here.
 *
 * Returns the total number of bits set across all three sets.
 */
static int
rs_poll_to_select(int nfds, struct pollfd *fds, fd_set *readfds,
		  fd_set *writefds, fd_set *exceptfds)
{
	int idx = 0, hits = 0;
	short got;
	int fd;

	while (idx < nfds) {
		fd = fds[idx].fd;
		got = fds[idx].revents;
		idx++;

		if (readfds && (got & (POLLIN | POLLHUP))) {
			FD_SET(fd, readfds);
			hits++;
		}

		if (writefds && (got & POLLOUT)) {
			FD_SET(fd, writefds);
			hits++;
		}

		if (exceptfds && (got & ~(POLLIN | POLLOUT))) {
			FD_SET(fd, exceptfds);
			hits++;
		}
	}
	return hits;
}
3155
/*
 * Convert a select() timeout into a poll() timeout in milliseconds.
 * A NULL timeout becomes -1; sub-millisecond remainders are truncated.
 */
static int rs_convert_timeout(struct timeval *timeout)
{
	if (!timeout)
		return -1;

	return timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
}
3161
/*
 * select() replacement built on rpoll().
 *
 * The descriptor sets are converted to a pollfd array, polled with the
 * timeout converted to milliseconds, then cleared and refilled from the
 * poll results.
 *
 * Returns the count produced by rs_poll_to_select() when rpoll() reported
 * ready descriptors, the rpoll() result otherwise, or ERR(ENOMEM) if the
 * pollfd array cannot be allocated.
 */
int rselect(int nfds, fd_set *readfds, fd_set *writefds,
	    fd_set *exceptfds, struct timeval *timeout)
{
	struct pollfd *pfds;
	int timeout_ms, ready;

	pfds = rs_select_to_poll(&nfds, readfds, writefds, exceptfds);
	if (!pfds)
		return ERR(ENOMEM);

	timeout_ms = rs_convert_timeout(timeout);
	ready = rpoll(pfds, nfds, timeout_ms);

	/* The sets are output parameters from here on: start from empty. */
	if (readfds)
		FD_ZERO(readfds);
	if (writefds)
		FD_ZERO(writefds);
	if (exceptfds)
		FD_ZERO(exceptfds);

	if (ready > 0)
		ready = rs_poll_to_select(nfds, pfds, readfds, writefds, exceptfds);

	free(pfds);
	return ready;
}
3187
3188 /*
3189  * For graceful disconnect, notify the remote side that we're
3190  * disconnecting and wait until all outstanding sends complete, provided
3191  * that the remote side has not sent a disconnect message.
3192  */
3193 int rshutdown(int socket, int how)
3194 {
3195         struct rsocket *rs;
3196         int ctrl, ret = 0;
3197
3198         rs = idm_lookup(&idm, socket);
3199         if (!rs)
3200                 return ERR(EBADF);
3201         if (rs->opts & RS_OPT_SVC_ACTIVE)
3202                 rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
3203
3204         if (rs->fd_flags & O_NONBLOCK)
3205                 rs_set_nonblocking(rs, 0);
3206
3207         if (rs->state & rs_connected) {
3208                 if (how == SHUT_RDWR) {
3209                         ctrl = RS_CTRL_DISCONNECT;
3210                         rs->state &= ~(rs_readable | rs_writable);
3211                 } else if (how == SHUT_WR) {
3212                         rs->state &= ~rs_writable;
3213                         ctrl = (rs->state & rs_readable) ?
3214                                 RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
3215                 } else {
3216                         rs->state &= ~rs_readable;
3217                         if (rs->state & rs_writable)
3218                                 goto out;
3219                         ctrl = RS_CTRL_DISCONNECT;
3220                 }
3221                 if (!rs_ctrl_avail(rs)) {
3222                         ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
3223                         if (ret)
3224                                 goto out;
3225                 }
3226
3227                 if ((rs->state & rs_connected) && rs_ctrl_avail(rs)) {
3228                         rs->ctrl_seqno++;
3229                         ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl));
3230                 }
3231         }
3232
3233         if (rs->state & rs_connected)
3234                 rs_process_cq(rs, 0, rs_conn_all_sends_done);
3235
3236 out:
3237         if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
3238                 rs_set_nonblocking(rs, rs->fd_flags);
3239
3240         if (rs->state & rs_disconnected) {
3241                 /* Generate event by flushing receives to unblock rpoll */
3242                 ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
3243                 ucma_shutdown(rs->cm_id);
3244         }
3245
3246         return ret;
3247 }
3248
3249 static void ds_shutdown(struct rsocket *rs)
3250 {
3251         if (rs->opts & RS_OPT_SVC_ACTIVE)
3252                 rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM);
3253
3254         if (rs->fd_flags & O_NONBLOCK)
3255                 rs_set_nonblocking(rs, 0);
3256
3257         rs->state &= ~(rs_readable | rs_writable);
3258         ds_process_cqs(rs, 0, ds_all_sends_done);
3259
3260         if (rs->fd_flags & O_NONBLOCK)
3261                 rs_set_nonblocking(rs, rs->fd_flags);
3262 }
3263
3264 int rclose(int socket)
3265 {
3266         struct rsocket *rs;
3267
3268         rs = idm_lookup(&idm, socket);
3269         if (!rs)
3270                 return EBADF;
3271         if (rs->type == SOCK_STREAM) {
3272                 if (rs->state & rs_connected)
3273                         rshutdown(socket, SHUT_RDWR);
3274                 else if (rs->opts & RS_OPT_SVC_ACTIVE)
3275                         rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
3276         } else {
3277                 ds_shutdown(rs);
3278         }
3279
3280         rs_free(rs);
3281         return 0;
3282 }
3283
/*
 * Copy a socket address to a caller-supplied buffer, getsockname()-style.
 *
 * On entry *len is the size of dst; at most that many bytes are copied.
 * On return *len holds the full size of the address: sizeof(struct
 * sockaddr_in) for AF_INET, sizeof(struct sockaddr_in6) for anything else.
 */
static void rs_copy_addr(struct sockaddr *dst, struct sockaddr *src, socklen_t *len)
{
	socklen_t copy_len;

	if (src->sa_family != AF_INET) {
		copy_len = min(*len, sizeof(struct sockaddr_in6));
		*len = sizeof(struct sockaddr_in6);
	} else {
		copy_len = min(*len, sizeof(struct sockaddr_in));
		*len = sizeof(struct sockaddr_in);
	}

	memcpy(dst, src, copy_len);
}
3297
3298 int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
3299 {
3300         struct rsocket *rs;
3301
3302         rs = idm_lookup(&idm, socket);
3303         if (!rs)
3304                 return ERR(EBADF);
3305         if (rs->type == SOCK_STREAM) {
3306                 rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
3307                 return 0;
3308         } else {
3309                 return getpeername(rs->udp_sock, addr, addrlen);
3310         }
3311 }
3312
3313 int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
3314 {
3315         struct rsocket *rs;
3316
3317         rs = idm_lookup(&idm, socket);
3318         if (!rs)
3319                 return ERR(EBADF);
3320         if (rs->type == SOCK_STREAM) {
3321                 rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
3322                 return 0;
3323         } else {
3324                 return getsockname(rs->udp_sock, addr, addrlen);
3325         }
3326 }
3327
3328 static int rs_set_keepalive(struct rsocket *rs, int on)
3329 {
3330         FILE *f;
3331         int ret;
3332
3333         if ((on && (rs->opts & RS_OPT_SVC_ACTIVE)) ||
3334             (!on && !(rs->opts & RS_OPT_SVC_ACTIVE)))
3335                 return 0;
3336
3337         if (on) {
3338                 if (!rs->keepalive_time) {
3339                         if ((f = fopen("/proc/sys/net/ipv4/tcp_keepalive_time", "r"))) {
3340                                 (void) fscanf(f, "%u", &rs->keepalive_time);
3341                                 fclose(f);
3342                         } else {
3343                                 rs->keepalive_time = 7200;
3344                         }
3345                 }
3346                 ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_ADD_KEEPALIVE);
3347         } else {
3348                 ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
3349         }
3350
3351         return ret;
3352 }
3353
3354 int rsetsockopt(int socket, int level, int optname,
3355                 const void *optval, socklen_t optlen)
3356 {
3357         struct rsocket *rs;
3358         int ret, opt_on = 0;
3359         uint64_t *opts = NULL;
3360
3361         ret = ERR(ENOTSUP);
3362         rs = idm_lookup(&idm, socket);
3363         if (!rs)
3364                 return ERR(EBADF);
3365         if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
3366                 ret = setsockopt(rs->udp_sock, level, optname, optval, optlen);
3367                 if (ret)
3368                         return ret;
3369         }
3370
3371         switch (level) {
3372         case SOL_SOCKET:
3373                 opts = &rs->so_opts;
3374                 switch (optname) {
3375                 case SO_REUSEADDR:
3376                         if (rs->type == SOCK_STREAM) {
3377                                 ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
3378                                                       RDMA_OPTION_ID_REUSEADDR,
3379                                                       (void *) optval, optlen);
3380                                 if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
3381                                     rs->cm_id->context &&
3382                                     (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
3383                                         ret = 0;
3384                         }
3385                         opt_on = *(int *) optval;
3386                         break;
3387                 case SO_RCVBUF:
3388                         if ((rs->type == SOCK_STREAM && !rs->rbuf) ||
3389                             (rs->type == SOCK_DGRAM && !rs->qp_list))
3390                                 rs->rbuf_size = (*(uint32_t *) optval) << 1;
3391                         ret = 0;
3392                         break;
3393                 case SO_SNDBUF:
3394                         if (!rs->sbuf)
3395                                 rs->sbuf_size = (*(uint32_t *) optval) << 1;
3396                         if (rs->sbuf_size < RS_SNDLOWAT)
3397                                 rs->sbuf_size = RS_SNDLOWAT << 1;
3398                         ret = 0;
3399                         break;
3400                 case SO_LINGER:
3401                         /* Invert value so default so_opt = 0 is on */
3402                         opt_on =  !((struct linger *) optval)->l_onoff;
3403                         ret = 0;
3404                         break;
3405                 case SO_KEEPALIVE:
3406                         ret = rs_set_keepalive(rs, *(int *) optval);
3407                         opt_on = rs->opts & RS_OPT_SVC_ACTIVE;
3408                         break;
3409                 case SO_OOBINLINE:
3410                         opt_on = *(int *) optval;
3411                         ret = 0;
3412                         break;
3413                 default:
3414                         break;
3415                 }
3416                 break;
3417         case IPPROTO_TCP:
3418                 opts = &rs->tcp_opts;
3419                 switch (optname) {
3420                 case TCP_KEEPCNT:
3421                 case TCP_KEEPINTVL:
3422                         ret = 0;   /* N/A - we're using a reliable connection */
3423                         break;
3424                 case TCP_KEEPIDLE:
3425                         if (*(int *) optval <= 0) {
3426                                 ret = ERR(EINVAL);
3427                                 break;
3428                         }
3429                         rs->keepalive_time = *(int *) optval;
3430                         ret = (rs->opts & RS_OPT_SVC_ACTIVE) ?
3431                               rs_notify_svc(&tcp_svc, rs, RS_SVC_MOD_KEEPALIVE) : 0;
3432                         break;
3433                 case TCP_NODELAY:
3434                         opt_on = *(int *) optval;
3435                         ret = 0;
3436                         break;
3437                 case TCP_MAXSEG:
3438                         ret = 0;
3439                         break;
3440                 default:
3441                         break;
3442                 }
3443                 break;
3444         case IPPROTO_IPV6:
3445                 opts = &rs->ipv6_opts;
3446                 switch (optname) {
3447                 case IPV6_V6ONLY:
3448                         if (rs->type == SOCK_STREAM) {
3449                                 ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
3450                                                       RDMA_OPTION_ID_AFONLY,
3451                                                       (void *) optval, optlen);
3452                         }
3453                         opt_on = *(int *) optval;
3454                         break;
3455                 default:
3456                         break;
3457                 }
3458                 break;
3459         case SOL_RDMA:
3460                 if (rs->state >= rs_opening) {
3461                         ret = ERR(EINVAL);
3462                         break;
3463                 }
3464
3465                 switch (optname) {
3466                 case RDMA_SQSIZE:
3467                         rs->sq_size = min((*(uint32_t *) optval), RS_QP_MAX_SIZE);
3468                         ret = 0;
3469                         break;
3470                 case RDMA_RQSIZE:
3471                         rs->rq_size = min((*(uint32_t *) optval), RS_QP_MAX_SIZE);
3472                         ret = 0;
3473                         break;
3474                 case RDMA_INLINE:
3475                         rs->sq_inline = min(*(uint32_t *) optval, RS_QP_MAX_SIZE);
3476                         ret = 0;
3477                         break;
3478                 case RDMA_IOMAPSIZE:
3479                         rs->target_iomap_size = (uint16_t) rs_scale_to_value(
3480                                 (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
3481                         ret = 0;
3482                         break;
3483                 case RDMA_ROUTE:
3484                         if ((rs->optval = malloc(optlen))) {
3485                                 memcpy(rs->optval, optval, optlen);
3486                                 rs->optlen = optlen;
3487                                 ret = 0;
3488                         } else {
3489                                 ret = ERR(ENOMEM);
3490                         }
3491                         break;
3492                 default:
3493                         break;
3494                 }
3495                 break;
3496         default:
3497                 break;
3498         }
3499
3500         if (!ret && opts) {
3501                 if (opt_on)
3502                         *opts |= (1 << optname);
3503                 else
3504                         *opts &= ~(1 << optname);
3505         }
3506
3507         return ret;
3508 }
3509
3510 static void rs_convert_sa_path(struct ibv_sa_path_rec *sa_path,
3511                                struct ibv_path_data *path_data)
3512 {
3513         uint32_t fl_hop;
3514
3515         memset(path_data, 0, sizeof(*path_data));
3516         path_data->path.dgid = sa_path->dgid;
3517         path_data->path.sgid = sa_path->sgid;
3518         path_data->path.dlid = sa_path->dlid;
3519         path_data->path.slid = sa_path->slid;
3520         fl_hop = ntohl(sa_path->flow_label) << 8;
3521         path_data->path.flowlabel_hoplimit = htonl(fl_hop) | sa_path->hop_limit;
3522         path_data->path.tclass = sa_path->traffic_class;
3523         path_data->path.reversible_numpath = sa_path->reversible << 7 | 1;
3524         path_data->path.pkey = sa_path->pkey;
3525         path_data->path.qosclass_sl = sa_path->sl;
3526         path_data->path.mtu = sa_path->mtu | 2 << 6;    /* exactly */
3527         path_data->path.rate = sa_path->rate | 2 << 6;
3528         path_data->path.packetlifetime = sa_path->packet_life_time | 2 << 6;
3529         path_data->flags= sa_path->preference;
3530 }
3531
3532 int rgetsockopt(int socket, int level, int optname,
3533                 void *optval, socklen_t *optlen)
3534 {
3535         struct rsocket *rs;
3536         void *opt;
3537         struct ibv_sa_path_rec *path_rec;
3538         struct ibv_path_data path_data;
3539         socklen_t len;
3540         int ret = 0;
3541         int num_paths;
3542
3543         rs = idm_lookup(&idm, socket);
3544         if (!rs)
3545                 return ERR(EBADF);
3546         switch (level) {
3547         case SOL_SOCKET:
3548                 switch (optname) {
3549                 case SO_REUSEADDR:
3550                 case SO_KEEPALIVE:
3551                 case SO_OOBINLINE:
3552                         *((int *) optval) = !!(rs->so_opts & (1 << optname));
3553                         *optlen = sizeof(int);
3554                         break;
3555                 case SO_RCVBUF:
3556                         *((int *) optval) = rs->rbuf_size;
3557                         *optlen = sizeof(int);
3558                         break;
3559                 case SO_SNDBUF:
3560                         *((int *) optval) = rs->sbuf_size;
3561                         *optlen = sizeof(int);
3562                         break;
3563                 case SO_LINGER:
3564                         /* Value is inverted so default so_opt = 0 is on */
3565                         ((struct linger *) optval)->l_onoff =
3566                                         !(rs->so_opts & (1 << optname));
3567                         ((struct linger *) optval)->l_linger = 0;
3568                         *optlen = sizeof(struct linger);
3569                         break;
3570                 case SO_ERROR:
3571                         *((int *) optval) = rs->err;
3572                         *optlen = sizeof(int);
3573                         rs->err = 0;
3574                         break;
3575                 default:
3576                         ret = ENOTSUP;
3577                         break;
3578                 }
3579                 break;
3580         case IPPROTO_TCP:
3581                 switch (optname) {
3582                 case TCP_KEEPCNT:
3583                 case TCP_KEEPINTVL:
3584                         *((int *) optval) = 1;   /* N/A */
3585                         break;
3586                 case TCP_KEEPIDLE:
3587                         *((int *) optval) = (int) rs->keepalive_time;
3588                         *optlen = sizeof(int);
3589                         break;
3590                 case TCP_NODELAY:
3591                         *((int *) optval) = !!(rs->tcp_opts & (1 << optname));
3592                         *optlen = sizeof(int);
3593                         break;
3594                 case TCP_MAXSEG:
3595                         *((int *) optval) = (rs->cm_id && rs->cm_id->route.num_paths) ?
3596                                             1 << (7 + rs->cm_id->route.path_rec->mtu) :
3597                                             2048;
3598                         *optlen = sizeof(int);
3599                         break;
3600                 default:
3601                         ret = ENOTSUP;
3602                         break;
3603                 }
3604                 break;
3605         case IPPROTO_IPV6:
3606                 switch (optname) {
3607                 case IPV6_V6ONLY:
3608                         *((int *) optval) = !!(rs->ipv6_opts & (1 << optname));
3609                         *optlen = sizeof(int);
3610                         break;
3611                 default:
3612                         ret = ENOTSUP;
3613                         break;
3614                 }
3615                 break;
3616         case SOL_RDMA:
3617                 switch (optname) {
3618                 case RDMA_SQSIZE:
3619                         *((int *) optval) = rs->sq_size;
3620                         *optlen = sizeof(int);
3621                         break;
3622                 case RDMA_RQSIZE:
3623                         *((int *) optval) = rs->rq_size;
3624                         *optlen = sizeof(int);
3625                         break;
3626                 case RDMA_INLINE:
3627                         *((int *) optval) = rs->sq_inline;
3628                         *optlen = sizeof(int);
3629                         break;
3630                 case RDMA_IOMAPSIZE:
3631                         *((int *) optval) = rs->target_iomap_size;
3632                         *optlen = sizeof(int);
3633                         break;
3634                 case RDMA_ROUTE:
3635                         if (rs->optval) {
3636                                 if (*optlen < rs->optlen) {
3637                                         ret = EINVAL;
3638                                 } else {
3639                                         memcpy(rs->optval, optval, rs->optlen);
3640                                         *optlen = rs->optlen;
3641                                 }
3642                         } else {
3643                                 if (*optlen < sizeof(path_data)) {
3644                                         ret = EINVAL;
3645                                 } else {
3646                                         len = 0;
3647                                         opt = optval;
3648                                         path_rec = rs->cm_id->route.path_rec;
3649                                         num_paths = 0;
3650                                         while (len + sizeof(path_data) <= *optlen &&
3651                                                num_paths < rs->cm_id->route.num_paths) {
3652                                                 rs_convert_sa_path(path_rec, &path_data);
3653                                                 memcpy(opt, &path_data, sizeof(path_data));
3654                                                 len += sizeof(path_data);
3655                                                 opt += sizeof(path_data);
3656                                                 path_rec++;
3657                                                 num_paths++;
3658                                         }
3659                                         *optlen = len;
3660                                         ret = 0;
3661                                 }
3662                         }
3663                         break;
3664                 default:
3665                         ret = ENOTSUP;
3666                         break;
3667                 }
3668                 break;
3669         default:
3670                 ret = ENOTSUP;
3671                 break;
3672         }
3673
3674         return rdma_seterrno(ret);
3675 }
3676
3677 int rfcntl(int socket, int cmd, ... /* arg */ )
3678 {
3679         struct rsocket *rs;
3680         va_list args;
3681         long param;
3682         int ret = 0;
3683
3684         rs = idm_lookup(&idm, socket);
3685         if (!rs)
3686                 return ERR(EBADF);
3687         va_start(args, cmd);
3688         switch (cmd) {
3689         case F_GETFL:
3690                 ret = (int) rs->fd_flags;
3691                 break;
3692         case F_SETFL:
3693                 param = va_arg(args, long);
3694                 if (param & O_NONBLOCK)
3695                         ret = rs_set_nonblocking(rs, O_NONBLOCK);
3696
3697                 if (!ret)
3698                         rs->fd_flags |= param;
3699                 break;
3700         default:
3701                 ret = ERR(ENOTSUP);
3702                 break;
3703         }
3704         va_end(args);
3705         return ret;
3706 }
3707
3708 static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
3709 {
3710         int i;
3711
3712         if (!rs->remote_iomappings) {
3713                 rs->remote_iomappings = calloc(rs->remote_iomap.length,
3714                                                sizeof(*rs->remote_iomappings));
3715                 if (!rs->remote_iomappings)
3716                         return NULL;
3717
3718                 for (i = 0; i < rs->remote_iomap.length; i++)
3719                         rs->remote_iomappings[i].index = i;
3720         }
3721
3722         for (i = 0; i < rs->remote_iomap.length; i++) {
3723                 if (!rs->remote_iomappings[i].mr)
3724                         return &rs->remote_iomappings[i];
3725         }
3726         return NULL;
3727 }
3728
3729 /*
3730  * If an offset is given, we map to it.  If offset is -1, then we map the
3731  * offset to the address of buf.  We do not check for conflicts, which must
3732  * be fixed at some point.
3733  */
3734 off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
3735 {
3736         struct rsocket *rs;
3737         struct rs_iomap_mr *iomr;
3738         int access = IBV_ACCESS_LOCAL_WRITE;
3739
3740         rs = idm_at(&idm, socket);
3741         if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
3742                 return ERR(EINVAL);
3743
3744         fastlock_acquire(&rs->map_lock);
3745         if (prot & PROT_WRITE) {
3746                 iomr = rs_get_iomap_mr(rs);
3747                 access |= IBV_ACCESS_REMOTE_WRITE;
3748         } else {
3749                 iomr = calloc(1, sizeof *iomr);
3750                 iomr->index = -1;
3751         }
3752         if (!iomr) {
3753                 offset = ERR(ENOMEM);
3754                 goto out;
3755         }
3756
3757         iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
3758         if (!iomr->mr) {
3759                 if (iomr->index < 0)
3760                         free(iomr);
3761                 offset = -1;
3762                 goto out;
3763         }
3764
3765         if (offset == -1)
3766                 offset = (uintptr_t) buf;
3767         iomr->offset = offset;
3768         atomic_init(&iomr->refcnt);
3769         atomic_set(&iomr->refcnt, 1);
3770
3771         if (iomr->index >= 0) {
3772                 dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
3773                 rs->iomap_pending = 1;
3774         } else {
3775                 dlist_insert_tail(&iomr->entry, &rs->iomap_list);
3776         }
3777 out:
3778         fastlock_release(&rs->map_lock);
3779         return offset;
3780 }
3781
3782 int riounmap(int socket, void *buf, size_t len)
3783 {
3784         struct rsocket *rs;
3785         struct rs_iomap_mr *iomr;
3786         dlist_entry *entry;
3787         int ret = 0;
3788
3789         rs = idm_at(&idm, socket);
3790         fastlock_acquire(&rs->map_lock);
3791
3792         for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
3793              entry = entry->next) {
3794                 iomr = container_of(entry, struct rs_iomap_mr, entry);
3795                 if (iomr->mr->addr == buf && iomr->mr->length == len) {
3796                         rs_release_iomap_mr(iomr);
3797                         goto out;
3798                 }
3799         }
3800
3801         for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
3802              entry = entry->next) {
3803                 iomr = container_of(entry, struct rs_iomap_mr, entry);
3804                 if (iomr->mr->addr == buf && iomr->mr->length == len) {
3805                         rs_release_iomap_mr(iomr);
3806                         goto out;
3807                 }
3808         }
3809         ret = ERR(EINVAL);
3810 out:
3811         fastlock_release(&rs->map_lock);
3812         return ret;
3813 }
3814
3815 static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
3816 {
3817         int i;
3818
3819         for (i = 0; i < rs->target_iomap_size; i++) {
3820                 if (offset >= rs->target_iomap[i].offset &&
3821                     offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
3822                         return &rs->target_iomap[i];
3823         }
3824         return NULL;
3825 }
3826
3827 size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
3828 {
3829         struct rsocket *rs;
3830         struct rs_iomap *iom = NULL;
3831         struct ibv_sge sge;
3832         size_t left = count;
3833         uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
3834         int ret = 0;
3835
3836         rs = idm_at(&idm, socket);
3837         fastlock_acquire(&rs->slock);
3838         if (rs->iomap_pending) {
3839                 ret = rs_send_iomaps(rs, flags);
3840                 if (ret)
3841                         goto out;
3842         }
3843         for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) {
3844                 if (!iom || offset > iom->offset + iom->sge.length) {
3845                         iom = rs_find_iomap(rs, offset);
3846                         if (!iom)
3847                                 break;
3848                 }
3849
3850                 if (!rs_can_send(rs)) {
3851                         ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
3852                                           rs_conn_can_send);
3853                         if (ret)
3854                                 break;
3855                         if (!(rs->state & rs_writable)) {
3856                                 ret = ERR(ECONNRESET);
3857                                 break;
3858                         }
3859                 }
3860
3861                 if (olen < left) {
3862                         xfer_size = olen;
3863                         if (olen < RS_MAX_TRANSFER)
3864                                 olen <<= 1;
3865                 } else {
3866                         xfer_size = left;
3867                 }
3868
3869                 if (xfer_size > rs->sbuf_bytes_avail)
3870                         xfer_size = rs->sbuf_bytes_avail;
3871                 if (xfer_size > iom->offset + iom->sge.length - offset)
3872                         xfer_size = iom->offset + iom->sge.length - offset;
3873
3874                 if (xfer_size <= rs->sq_inline) {
3875                         sge.addr = (uintptr_t) buf;
3876                         sge.length = xfer_size;
3877                         sge.lkey = 0;
3878                         ret = rs_write_direct(rs, iom, offset, &sge, 1,
3879                                               xfer_size, IBV_SEND_INLINE);
3880                 } else if (xfer_size <= rs_sbuf_left(rs)) {
3881                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
3882                         rs->ssgl[0].length = xfer_size;
3883                         ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
3884                         if (xfer_size < rs_sbuf_left(rs))
3885                                 rs->ssgl[0].addr += xfer_size;
3886                         else
3887                                 rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
3888                 } else {
3889                         rs->ssgl[0].length = rs_sbuf_left(rs);
3890                         memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
3891                                 rs->ssgl[0].length);
3892                         rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
3893                         memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
3894                         ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
3895                         rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
3896                 }
3897                 if (ret)
3898                         break;
3899         }
3900 out:
3901         fastlock_release(&rs->slock);
3902
3903         return (ret && left == count) ? ret : count - left;
3904 }
3905
3906 /****************************************************************************
3907  * Service Processing Threads
3908  ****************************************************************************/
3909
3910 static int rs_svc_grow_sets(struct rs_svc *svc, int grow_size)
3911 {
3912         struct rsocket **rss;
3913         void *set, *contexts;
3914
3915         set = calloc(svc->size + grow_size, sizeof(*rss) + svc->context_size);
3916         if (!set)
3917                &nbs