/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * This file contains code imported from the OFED rds source file ib_send.c
 * Oracle elects to have and use the contents of ib_send.c under and governed
 * by the OpenIB.org BSD license (see below for full license text). However,
 * the following notice accompanied the original version of this file:
 */

/*
 * Copyright (c) 2006 Oracle. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *	Redistribution and use in source and binary forms, with or
 *	without modification, are permitted provided that the following
 *	conditions are met:
 *
 *	- Redistributions of source code must retain the above
 *	  copyright notice, this list of conditions and the following
 *	  disclaimer.
 *
 *	- Redistributions in binary form must reproduce the above
 *	  copyright notice, this list of conditions and the following
 *	  disclaimer in the documentation and/or other materials
 *	  provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/rds.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/ib.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

static void
rdsv3_ib_send_rdma_complete(struct rdsv3_message *rm,
    int wc_status)
{
	int notify_status;

	RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
	    rm, wc_status);

	switch (wc_status) {
	case IBT_WC_WR_FLUSHED_ERR:
		return;

	case IBT_WC_SUCCESS:
		notify_status = RDS_RDMA_SUCCESS;
		break;

	case IBT_WC_REMOTE_ACCESS_ERR:
		notify_status = RDS_RDMA_REMOTE_ERROR;
		break;

	default:
		notify_status = RDS_RDMA_OTHER_ERROR;
		break;
	}
	rdsv3_rdma_send_complete(rm, notify_status);

	RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
	    rm, wc_status);
}

static void rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev,
    uint_t num, struct rdsv3_rdma_sg scat[]);

void
rdsv3_ib_send_unmap_rdma(struct rdsv3_ib_connection *ic,
    struct rdsv3_rdma_op *op)
{
	RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rdma", "ic: %p, op: %p", ic, op);
	if (op->r_mapped) {
		op->r_mapped = 0;
		if (ic->i_cm_id) {
			rdsv3_ib_dma_unmap_sg_rdma(ic->i_cm_id->device,
			    op->r_nents, op->r_rdma_sg);
		} else {
			rdsv3_ib_dma_unmap_sg_rdma((struct ib_device *)NULL,
			    op->r_nents, op->r_rdma_sg);
		}
	}
}

static void
rdsv3_ib_send_unmap_rm(struct rdsv3_ib_connection *ic,
    struct rdsv3_ib_send_work *send,
    int wc_status)
{
	struct rdsv3_message *rm = send->s_rm;

	RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rm", "ic %p send %p rm %p\n",
	    ic, send, rm);

	mutex_enter(&rm->m_rs_lock);
	if (rm->m_count) {
		rdsv3_ib_dma_unmap_sg(ic->i_cm_id->device,
		    rm->m_sg, rm->m_count);
		rm->m_count = 0;
	}
	mutex_exit(&rm->m_rs_lock);

	if (rm->m_rdma_op != NULL) {
		rdsv3_ib_send_unmap_rdma(ic, rm->m_rdma_op);

		/*
		 * If the user asked for a completion notification on this
		 * message, we can implement three different semantics:
		 * 1. Notify when we received the ACK on the RDS message
		 *    that was queued with the RDMA. This provides reliable
		 *    notification of RDMA status at the expense of a one-way
		 *    packet delay.
		 * 2. Notify when the IB stack gives us the completion
		 *    event for the RDMA operation.
		 * 3. Notify when the IB stack gives us the completion
		 *    event for the accompanying RDS messages.
		 * Here, we implement approach #3. To implement approach #2,
		 * call rdsv3_rdma_send_complete from the cq_handler. To
		 * implement #1, don't call rdsv3_rdma_send_complete at all,
		 * and fall back to the notify handling in the ACK processing
		 * code.
		 *
		 * Note: There's no need to explicitly sync any RDMA buffers
		 * using ib_dma_sync_sg_for_cpu - the completion for the RDMA
		 * operation itself unmapped the RDMA buffers, which takes
		 * care of syncing.
		 */
		rdsv3_ib_send_rdma_complete(rm, wc_status);

		if (rm->m_rdma_op->r_write)
			rdsv3_stats_add(s_send_rdma_bytes,
			    rm->m_rdma_op->r_bytes);
		else
			rdsv3_stats_add(s_recv_rdma_bytes,
			    rm->m_rdma_op->r_bytes);
	}

	/*
	 * If anyone waited for this message to get flushed out, wake
	 * them up now.
	 */
	rdsv3_message_unmapped(rm);

	rdsv3_message_put(rm);
	send->s_rm = NULL;
}

void
rdsv3_ib_send_init_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_send_work *send;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_send_init_ring", "ic: %p", ic);

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		send->s_rm = NULL;
		send->s_op = NULL;
	}
}

void
rdsv3_ib_send_clear_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_send_work *send;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "ic: %p", ic);

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		if (send->s_opcode == 0xdd)
			continue;
		if (send->s_rm)
			rdsv3_ib_send_unmap_rm(ic, send, IBT_WC_WR_FLUSHED_ERR);
		if (send->s_op)
			rdsv3_ib_send_unmap_rdma(ic, send->s_op);
	}

	RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "Return: ic: %p", ic);
}

/*
 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 * operations performed in the send path. As the sender allocs and potentially
 * unallocs the next free entry in the ring it doesn't alter which is
 * the next to be freed, which is what this is concerned with.
 */
void
rdsv3_ib_send_cqe_handler(struct rdsv3_ib_connection *ic, ibt_wc_t *wc)
{
	struct rdsv3_connection *conn = ic->conn;
	struct rdsv3_ib_send_work *send;
	uint32_t completed, polled;
	uint32_t oldest;
	uint32_t i = 0;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_send_cqe_handler",
	    "wc wc_id 0x%llx status %u byte_len %u imm_data %u\n",
	    (unsigned long long)wc->wc_id, wc->wc_status,
	    wc->wc_bytes_xfer, ntohl(wc->wc_immed_data));

	rdsv3_ib_stats_inc(s_ib_tx_cq_event);

	if (wc->wc_id == RDSV3_IB_ACK_WR_ID) {
		if (ic->i_ack_queued + HZ/2 < jiffies)
			rdsv3_ib_stats_inc(s_ib_tx_stalled);
		rdsv3_ib_ack_send_complete(ic);
		return;
	}

	oldest = rdsv3_ib_ring_oldest(&ic->i_send_ring);

	completed = rdsv3_ib_ring_completed(&ic->i_send_ring,
	    (wc->wc_id & ~RDSV3_IB_SEND_OP), oldest);

	for (i = 0; i < completed; i++) {
		send = &ic->i_sends[oldest];

		/*
		 * In the error case, wc->opcode sometimes contains garbage.
		 */
		switch (send->s_opcode) {
		case IBT_WRC_SEND:
			if (send->s_rm)
				rdsv3_ib_send_unmap_rm(ic, send,
				    wc->wc_status);
			break;
		case IBT_WRC_RDMAW:
		case IBT_WRC_RDMAR:
			/*
			 * Nothing to be done - the SG list will be unmapped
			 * when the SEND completes.
			 */
			break;
		default:
			RDSV3_DPRINTF2("rdsv3_ib_send_cq_comp_handler",
			    "RDS/IB: %s: unexpected opcode "
			    "0x%x in WR!",
			    __func__, send->s_opcode);
			break;
		}

		send->s_opcode = 0xdd;
		if (send->s_queued + HZ/2 < jiffies)
			rdsv3_ib_stats_inc(s_ib_tx_stalled);

		/*
		 * If an RDMA operation produced an error, signal this right
		 * away. If we don't, the subsequent SEND that goes with this
		 * RDMA will be canceled with ERR_WFLUSH, and the application
		 * will never learn that the RDMA failed.
		 */
		if (wc->wc_status ==
		    IBT_WC_REMOTE_ACCESS_ERR && send->s_op) {
			struct rdsv3_message *rm;

			rm = rdsv3_send_get_message(conn, send->s_op);
			if (rm) {
				if (rm->m_rdma_op != NULL)
					rdsv3_ib_send_unmap_rdma(ic,
					    rm->m_rdma_op);
				rdsv3_ib_send_rdma_complete(rm,
				    wc->wc_status);
				rdsv3_message_put(rm);
			}
		}

		oldest = (oldest + 1) % ic->i_send_ring.w_nr;
	}

	rdsv3_ib_ring_free(&ic->i_send_ring, completed);

	clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);

	/* We expect errors as the qp is drained during shutdown */
	if (wc->wc_status != IBT_WC_SUCCESS && rdsv3_conn_up(conn)) {
		RDSV3_DPRINTF2("rdsv3_ib_send_cqe_handler",
		    "send completion on %u.%u.%u.%u "
		    "had status %u, disconnecting and reconnecting\n",
		    NIPQUAD(conn->c_faddr), wc->wc_status);
		rdsv3_conn_drop(conn);
	}

	RDSV3_DPRINTF4("rdsv3_ib_send_cqe_handler", "Return: conn: %p", ic);
}

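/*
 * Illustrative sketch (not part of the imported code): the "completed"
 * count consumed by rdsv3_ib_send_cqe_handler() above is conceptually the
 * wrap-around distance, inclusive, from the oldest outstanding ring index
 * to the index recovered from the work completion.  The hypothetical
 * helper below only restates that arithmetic; the authoritative version
 * is rdsv3_ib_ring_completed(), defined elsewhere in the RDS/IB code.
 */
static inline uint32_t
example_ring_completed(uint32_t ring_nr, uint32_t oldest, uint32_t wr_id)
{
	if (oldest <= wr_id)
		return (wr_id - oldest + 1);

	/* the completion index wrapped past the end of the ring */
	return (ring_nr - oldest + wr_id + 1);
}
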
/*
 * This is the main function for allocating credits when sending
 * messages.
 *
 * Conceptually, we have two counters:
 *  - send credits: this tells us how many WRs we're allowed
 *    to submit without overrunning the receiver's queue. For
 *    each SEND WR we post, we decrement this by one.
 *
 *  - posted credits: this tells us how many WRs we recently
 *    posted to the receive queue. This value is transferred
 *    to the peer as a "credit update" in an RDS header field.
 *    Every time we transmit credits to the peer, we subtract
 *    the amount of transferred credits from this counter.
 *
 * It is essential that we avoid situations where both sides have
 * exhausted their send credits, and are unable to send new credits
 * to the peer. We achieve this by requiring that we send at least
 * one credit update to the peer before exhausting our credits.
 * When new credits arrive, we subtract one credit that is withheld
 * until we've posted new buffers and are ready to transmit these
 * credits (see rdsv3_ib_send_add_credits below).
 *
 * The RDS send code is essentially single-threaded; rdsv3_send_xmit
 * grabs c_send_lock to ensure exclusive access to the send ring.
 * However, the ACK sending code is independent and can race with
 * message SENDs.
 *
 * In the send path, we need to update the counters for send credits
 * and the counter of posted buffers atomically - when we use the
 * last available credit, we cannot allow another thread to race us
 * and grab the posted credits counter. Hence, we have to use a
 * spinlock to protect the credit counter, or use atomics.
 *
 * Spinlocks shared between the send and the receive path are bad,
 * because they create unnecessary delays. An early implementation
 * using a spinlock showed a 5% degradation in throughput at some
 * loads.
 *
 * This implementation avoids spinlocks completely, putting both
 * counters into a single atomic, and updating that atomic using
 * atomic_add (in the receive path, when receiving fresh credits),
 * and using atomic_cmpxchg when updating the two counters.
 */
int
rdsv3_ib_send_grab_credits(struct rdsv3_ib_connection *ic,
    uint32_t wanted, uint32_t *adv_credits, int need_posted)
{
	unsigned int avail, posted, got = 0, advertise;
	long oldval, newval;

	RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d",
	    ic, wanted, *adv_credits, need_posted);

	*adv_credits = 0;
	if (!ic->i_flowctl)
		return (wanted);

try_again:
	advertise = 0;
	oldval = newval = atomic_get(&ic->i_credits);
	posted = IB_GET_POST_CREDITS(oldval);
	avail = IB_GET_SEND_CREDITS(oldval);

	RDSV3_DPRINTF5("rdsv3_ib_send_grab_credits",
	    "wanted (%u): credits=%u posted=%u\n", wanted, avail, posted);

	/* The last credit must be used to send a credit update. */
	if (avail && !posted)
		avail--;

	if (avail < wanted) {
		struct rdsv3_connection *conn = ic->i_cm_id->context;

		/* Oops, there aren't that many credits left! */
		set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
		got = avail;
	} else {
		/* Sometimes you get what you want, lalala. */
		got = wanted;
	}
	newval -= IB_SET_SEND_CREDITS(got);

	/*
	 * If need_posted is non-zero, then the caller wants the posted
	 * credits advertised regardless of whether any send credits are
	 * available.
	 */
	if (posted && (got || need_posted)) {
		advertise = min(posted, RDSV3_MAX_ADV_CREDIT);
		newval -= IB_SET_POST_CREDITS(advertise);
	}

	/* Finally bill everything */
	if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
		goto try_again;

	*adv_credits = advertise;

	RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d",
	    ic, got, *adv_credits, need_posted);

	return (got);
}

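/*
 * Illustrative sketch (not part of the imported code): the block comment
 * above rdsv3_ib_send_grab_credits() describes packing the send-credit
 * and posted-credit counters into one atomic word.  Assuming the same
 * layout as the upstream OFED rds code (send credits in the low 16 bits,
 * posted credits in the high 16 bits), accessor macros along these lines
 * recover and encode the two halves.  The EXAMPLE_ names are hypothetical;
 * the real IB_GET_ and IB_SET_ macros come from the ib.h header.
 */
#define	EXAMPLE_GET_SEND_CREDITS(v)	((v) & 0xffff)
#define	EXAMPLE_GET_POST_CREDITS(v)	((v) >> 16)
#define	EXAMPLE_SET_SEND_CREDITS(v)	((v) & 0xffff)
#define	EXAMPLE_SET_POST_CREDITS(v)	((v) << 16)
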
void
rdsv3_ib_send_add_credits(struct rdsv3_connection *conn, unsigned int credits)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	if (credits == 0)
		return;

	RDSV3_DPRINTF5("rdsv3_ib_send_add_credits",
	    "credits (%u): current=%u%s\n",
	    credits,
	    IB_GET_SEND_CREDITS(atomic_get(&ic->i_credits)),
	    test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags) ?
	    ", ll_send_full" : "");

	atomic_add_32(&ic->i_credits, IB_SET_SEND_CREDITS(credits));
	if (test_and_clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);

	ASSERT(!(IB_GET_SEND_CREDITS(credits) >= 16384));

	rdsv3_ib_stats_inc(s_ib_rx_credit_updates);

	RDSV3_DPRINTF4("rdsv3_ib_send_add_credits",
	    "Return: conn: %p, credits: %d",
	    conn, credits);
}

void
rdsv3_ib_advertise_credits(struct rdsv3_connection *conn, unsigned int posted)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_advertise_credits", "conn: %p, posted: %d",
	    conn, posted);

	if (posted == 0)
		return;

	atomic_add_32(&ic->i_credits, IB_SET_POST_CREDITS(posted));

	/*
	 * Decide whether to send an update to the peer now.
	 * If we would send a credit update for every single buffer we
	 * post, we would end up with an ACK storm (ACK arrives,
	 * consumes buffer, we refill the ring, send ACK to remote
	 * advertising the newly posted buffer... ad infinitum).
	 *
	 * Performance pretty much depends on how often we send
	 * credit updates - too frequent updates mean lots of ACKs.
	 * Too infrequent updates, and the peer will run out of
	 * credits and will have to throttle.
	 * For the time being, 16 seems to be a good compromise.
	 */
	if (IB_GET_POST_CREDITS(atomic_get(&ic->i_credits)) >= 16)
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}

static inline void
rdsv3_ib_xmit_populate_wr(struct rdsv3_ib_connection *ic,
    ibt_send_wr_t *wr, unsigned int pos,
    struct rdsv3_scatterlist *scat, unsigned int off, unsigned int length,
    int send_flags)
{
	ibt_wr_ds_t *sge;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
	    "ic: %p, wr: %p scat: %p %d %d %d %d",
	    ic, wr, scat, pos, off, length, send_flags);

	wr->wr_id = pos | RDSV3_IB_SEND_OP;
	wr->wr_trans = IBT_RC_SRV;
	wr->wr_flags = send_flags;
	wr->wr_opcode = IBT_WRC_SEND;

	if (length != 0) {
		int ix, len, assigned;
		ibt_wr_ds_t *sgl;

		ASSERT(length <= scat->length - off);

		sgl = scat->sgl;
		if (off != 0) {
			/* find the right sgl to begin with */
			while (sgl->ds_len <= off) {
				off -= sgl->ds_len;
				sgl++;
			}
		}

		ix = 1; /* first data sgl is at 1 */
		assigned = 0;
		len = length;
		do {
			sge = &wr->wr_sgl[ix++];
			sge->ds_va = sgl->ds_va + off;
			assigned = min(len, sgl->ds_len - off);
			sge->ds_len = assigned;
			sge->ds_key = sgl->ds_key;
			len -= assigned;
			if (len != 0) {
				sgl++;
				off = 0;
			}
		} while (len > 0);

		wr->wr_nds = ix;
	} else {
		/*
		 * We're sending a packet with no payload. There is only
		 * one SGE.
		 */
		wr->wr_nds = 1;
	}

	sge = &wr->wr_sgl[0];
	sge->ds_va = ic->i_send_hdrs_dma + (pos * sizeof (struct rdsv3_header));
	sge->ds_len = sizeof (struct rdsv3_header);
	sge->ds_key = ic->i_mr->lkey;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
	    "Return: ic: %p, wr: %p scat: %p", ic, wr, scat);
}

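/*
 * Illustrative sketch (not part of the imported code): data sends encode
 * the ring position into wr_id together with the RDSV3_IB_SEND_OP flag
 * (see rdsv3_ib_xmit_populate_wr() above), and the completion handler
 * recovers the position by masking the flag back off again
 * (rdsv3_ib_send_cqe_handler()).  The hypothetical macros below merely
 * restate that round trip.
 */
#define	EXAMPLE_ENCODE_SEND_WR_ID(pos)		((pos) | RDSV3_IB_SEND_OP)
#define	EXAMPLE_DECODE_SEND_WR_ID(wr_id)	((wr_id) & ~RDSV3_IB_SEND_OP)
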
/*
 * This can be called multiple times for a given message. The first time
 * we see a message we map its scatterlist into the IB device so that
 * we can provide that mapped address to the IB scatter gather entries
 * in the IB work requests. We translate the scatterlist into a series
 * of work requests that fragment the message. These work requests complete
 * in order so we pass ownership of the message to the completion handler
 * once we send the final fragment.
 *
 * The RDS core uses the c_send_lock to only enter this function once
 * per connection. This makes sure that the tx ring alloc/unalloc pairs
 * don't get out of sync and confuse the ring.
 */
int
rdsv3_ib_xmit(struct rdsv3_connection *conn, struct rdsv3_message *rm,
    unsigned int hdr_off, unsigned int sg, unsigned int off)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct ib_device *dev = ic->i_cm_id->device;
	struct rdsv3_ib_send_work *send = NULL;
	struct rdsv3_ib_send_work *first;
	struct rdsv3_ib_send_work *prev;
	ibt_send_wr_t *wr;
	struct rdsv3_scatterlist *scat;
	uint32_t pos;
	uint32_t i;
	uint32_t work_alloc;
	uint32_t credit_alloc;
	uint32_t posted;
	uint32_t adv_credits = 0;
	int send_flags = 0;
	int sent;
	int ret;
	int flow_controlled = 0;

	RDSV3_DPRINTF4("rdsv3_ib_xmit", "conn: %p, rm: %p", conn, rm);

	ASSERT(!(off % RDSV3_FRAG_SIZE));
	ASSERT(!(hdr_off != 0 && hdr_off != sizeof (struct rdsv3_header)));

	/* Do not send cong updates to IB loopback */
	if (conn->c_loopback &&
	    rm->m_inc.i_hdr.h_flags & RDSV3_FLAG_CONG_BITMAP) {
		rdsv3_cong_map_updated(conn->c_fcong, ~(uint64_t)0);
		return (sizeof (struct rdsv3_header) + RDSV3_CONG_MAP_BYTES);
	}

	/* FIXME we may overallocate here */
	if (ntohl(rm->m_inc.i_hdr.h_len) == 0)
		i = 1;
	else
		i = ceil(ntohl(rm->m_inc.i_hdr.h_len), RDSV3_FRAG_SIZE);

	work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, i, &pos);
	if (work_alloc != i) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
		rdsv3_ib_stats_inc(s_ib_tx_ring_full);
		ret = -ENOMEM;
		goto out;
	}

	credit_alloc = work_alloc;
	if (ic->i_flowctl) {
		credit_alloc = rdsv3_ib_send_grab_credits(ic, work_alloc,
		    &posted, 0);
		adv_credits += posted;
		if (credit_alloc < work_alloc) {
			rdsv3_ib_ring_unalloc(&ic->i_send_ring,
			    work_alloc - credit_alloc);
			work_alloc = credit_alloc;
			flow_controlled++;
		}
		if (work_alloc == 0) {
			rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
			rdsv3_ib_stats_inc(s_ib_tx_throttle);
			ret = -ENOMEM;
			goto out;
		}
	}

	/* map the message the first time we see it */
	if (ic->i_rm == NULL) {
		/*
		 * printk(KERN_NOTICE
		 * "rdsv3_ib_xmit prep msg dport=%u flags=0x%x len=%d\n",
		 * be16_to_cpu(rm->m_inc.i_hdr.h_dport),
		 * rm->m_inc.i_hdr.h_flags,
		 * be32_to_cpu(rm->m_inc.i_hdr.h_len));
		 */
		if (rm->m_nents) {
			rm->m_count = rdsv3_ib_dma_map_sg(dev,
			    rm->m_sg, rm->m_nents);
			RDSV3_DPRINTF5("rdsv3_ib_xmit",
			    "ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
			if (rm->m_count == 0) {
				rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
				rdsv3_ib_ring_unalloc(&ic->i_send_ring,
				    work_alloc);
				ret = -ENOMEM; /* XXX ? */
				RDSV3_DPRINTF2("rdsv3_ib_xmit",
				    "fail: ic %p mapping rm %p: %d\n",
				    ic, rm, rm->m_count);
				goto out;
			}
		} else {
			rm->m_count = 0;
		}

		ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
		ic->i_unsignaled_bytes = rdsv3_ib_sysctl_max_unsig_bytes;
		rdsv3_message_addref(rm);
		ic->i_rm = rm;

		/* Finalize the header */
		if (test_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_ACK_REQUIRED;
		if (test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_RETRANSMITTED;

		/*
		 * If it has an RDMA op, tell the peer we did it. This is
		 * used by the peer to release use-once RDMA MRs.
		 */
		if (rm->m_rdma_op) {
			struct rdsv3_ext_header_rdma ext_hdr;

			ext_hdr.h_rdma_rkey = htonl(rm->m_rdma_op->r_key);
			(void) rdsv3_message_add_extension(&rm->m_inc.i_hdr,
			    RDSV3_EXTHDR_RDMA, &ext_hdr,
			    sizeof (ext_hdr));
		}
		if (rm->m_rdma_cookie) {
			(void) rdsv3_message_add_rdma_dest_extension(
			    &rm->m_inc.i_hdr,
			    rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
			    rdsv3_rdma_cookie_offset(rm->m_rdma_cookie));
		}

		/*
		 * Note - rdsv3_ib_piggyb_ack clears the ACK_REQUIRED bit, so
		 * we should not do this unless we have a chance of at least
		 * sticking the header into the send ring, which is why we
		 * should call rdsv3_ib_ring_alloc first.
		 */
		rm->m_inc.i_hdr.h_ack = htonll(rdsv3_ib_piggyb_ack(ic));
		rdsv3_message_make_checksum(&rm->m_inc.i_hdr);

		/*
		 * Update adv_credits since we reset the ACK_REQUIRED bit.
		 */
		(void) rdsv3_ib_send_grab_credits(ic, 0, &posted, 1);
		adv_credits += posted;
		ASSERT(adv_credits <= 255);
	}

	send = &ic->i_sends[pos];
	first = send;
	prev = NULL;
	scat = &rm->m_sg[sg];
	sent = 0;
	i = 0;

	/*
	 * Sometimes you want to put a fence between an RDMA READ and the
	 * following SEND. We could either do this all the time or when
	 * requested by the user. Right now, we let the application choose.
	 */
	if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
		send_flags = IBT_WR_SEND_FENCE;

	/*
	 * We could be copying the header into the unused tail of the page.
	 * That would need to be changed in the future when those pages might
	 * be mapped userspace pages or page cache pages. So instead we always
	 * use a second sge and our long-lived ring of mapped headers. We send
	 * the header after the data so that the data payload can be aligned on
	 * the receiver.
	 */

	/* handle a 0-len message */
	if (ntohl(rm->m_inc.i_hdr.h_len) == 0) {
		wr = &ic->i_send_wrs[0];
		rdsv3_ib_xmit_populate_wr(ic, wr, pos, NULL, 0, 0, send_flags);
		send->s_queued = jiffies;
		send->s_op = NULL;
		send->s_opcode = wr->wr_opcode;
		goto add_header;
	}

	/* if there's data reference it with a chain of work reqs */
	for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
		unsigned int len;

		send = &ic->i_sends[pos];

		wr = &ic->i_send_wrs[i];
		len = min(RDSV3_FRAG_SIZE,
		    rdsv3_ib_sg_dma_len(dev, scat) - off);
		rdsv3_ib_xmit_populate_wr(ic, wr, pos, scat, off, len,
		    send_flags);
		send->s_queued = jiffies;
		send->s_op = NULL;
		send->s_opcode = wr->wr_opcode;

		/*
		 * We want to delay signaling completions just enough to get
		 * the batching benefits but not so much that we create dead
		 * time on the wire.
		 */
		if (ic->i_unsignaled_wrs-- == 0) {
			ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		ic->i_unsignaled_bytes -= len;
		if (ic->i_unsignaled_bytes <= 0) {
			ic->i_unsignaled_bytes =
			    rdsv3_ib_sysctl_max_unsig_bytes;
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		/*
		 * Always signal the last one if we're stopping due to flow
		 * control.
		 */
		if (flow_controlled && i == (work_alloc-1)) {
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		RDSV3_DPRINTF5("rdsv3_ib_xmit", "send %p wr %p num_sge %u \n",
		    send, wr, wr->wr_nds);

		sent += len;
		off += len;
		if (off == rdsv3_ib_sg_dma_len(dev, scat)) {
			scat++;
			off = 0;
		}

add_header:
		/*
		 * Tack on the header after the data. The header SGE should
		 * already have been set up to point to the right header
		 * buffer.
		 */
		(void) memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
		    sizeof (struct rdsv3_header));

		if (0) {
			struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];

			RDSV3_DPRINTF2("rdsv3_ib_xmit",
			    "send WR dport=%u flags=0x%x len=%d",
			    ntohs(hdr->h_dport),
			    hdr->h_flags,
			    ntohl(hdr->h_len));
		}
		if (adv_credits) {
			struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];

			/* add credit and redo the header checksum */
			hdr->h_credit = adv_credits;
			rdsv3_message_make_checksum(hdr);
			adv_credits = 0;
			rdsv3_ib_stats_inc(s_ib_tx_credit_updates);
		}

		prev = send;

		pos = (pos + 1) % ic->i_send_ring.w_nr;
	}

	/*
	 * Account the RDS header in the number of bytes we sent, but just
	 * once. The caller has no concept of fragmentation.
	 */
	if (hdr_off == 0)
		sent += sizeof (struct rdsv3_header);

	/* if we finished the message then send completion owns it */
	if (scat == &rm->m_sg[rm->m_count]) {
		prev->s_rm = ic->i_rm;
		wr->wr_flags |= IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		ic->i_rm = NULL;
	}

	if (i < work_alloc) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
		work_alloc = i;
	}
	if (ic->i_flowctl && i < credit_alloc)
		rdsv3_ib_send_add_credits(conn, credit_alloc - i);

	/* XXX need to worry about failed_wr and partial sends. */
	ret = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
	    ic->i_send_wrs, i, &posted);
	if (posted != i) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit",
		    "ic %p first %p nwr: %d ret %d:%d",
		    ic, first, i, ret, posted);
	}
	if (ret) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit",
		    "RDS/IB: ib_post_send to %u.%u.%u.%u "
		    "returned %d\n", NIPQUAD(conn->c_faddr), ret);
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		if (prev->s_rm) {
			ic->i_rm = prev->s_rm;
			prev->s_rm = NULL;
		}
		RDSV3_DPRINTF2("rdsv3_ib_xmit", "ibt_post_send failed\n");
		rdsv3_conn_drop(ic->conn);
		ret = -EAGAIN;
		goto out;
	}

	ret = sent;

	RDSV3_DPRINTF4("rdsv3_ib_xmit", "Return: conn: %p, rm: %p", conn, rm);
out:
	ASSERT(!adv_credits);
	return (ret);
}

static void
rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev, uint_t num,
    struct rdsv3_rdma_sg scat[])
{
	ibt_hca_hdl_t hca_hdl;
	int i;
	int num_sgl;

	RDSV3_DPRINTF4("rdsv3_ib_dma_unmap_sg", "rdma_sg: %p", scat);

	if (dev) {
		hca_hdl = ib_get_ibt_hca_hdl(dev);
	} else {
		hca_hdl = scat[0].hca_hdl;
		RDSV3_DPRINTF2("rdsv3_ib_dma_unmap_sg_rdma",
		    "NULL dev use cached hca_hdl %p", hca_hdl);
	}

	if (hca_hdl == NULL)
		return;
	scat[0].hca_hdl = NULL;

	for (i = 0; i < num; i++) {
		if (scat[i].mihdl != NULL) {
			num_sgl = (scat[i].iovec.bytes / PAGESIZE) + 2;
			kmem_free(scat[i].swr.wr_sgl,
			    (num_sgl * sizeof (ibt_wr_ds_t)));
			scat[i].swr.wr_sgl = NULL;
			(void) ibt_unmap_mem_iov(hca_hdl, scat[i].mihdl);
			scat[i].mihdl = NULL;
		} else
			break;
	}
}

/* ARGSUSED */
uint_t
rdsv3_ib_dma_map_sg_rdma(struct ib_device *dev, struct rdsv3_rdma_sg scat[],
    uint_t num, struct rdsv3_scatterlist **scatl)
{
	ibt_hca_hdl_t hca_hdl;
	ibt_iov_attr_t iov_attr;
	struct buf *bp;
	uint_t i, j, k;
	uint_t count;
	struct rdsv3_scatterlist *sg;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "scat: %p, num: %d",
	    scat, num);

	hca_hdl = ib_get_ibt_hca_hdl(dev);
	scat[0].hca_hdl = hca_hdl;
	bzero(&iov_attr, sizeof (ibt_iov_attr_t));
	iov_attr.iov_flags = IBT_IOV_BUF;
	iov_attr.iov_lso_hdr_sz = 0;

	for (i = 0, count = 0; i < num; i++) {
		/* transpose umem_cookie to buf structure */
		bp = ddi_umem_iosetup(scat[i].umem_cookie,
		    scat[i].iovec.addr & PAGEOFFSET, scat[i].iovec.bytes,
		    B_WRITE, 0, 0, NULL, DDI_UMEM_SLEEP);
		if (bp == NULL) {
			/* free resources and return error */
			goto out;
		}
		/* setup ibt_map_mem_iov() attributes */
		iov_attr.iov_buf = bp;
		iov_attr.iov_wr_nds = (scat[i].iovec.bytes / PAGESIZE) + 2;
		scat[i].swr.wr_sgl =
		    kmem_zalloc(iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t),
		    KM_SLEEP);

		ret = ibt_map_mem_iov(hca_hdl, &iov_attr,
		    (ibt_all_wr_t *)&scat[i].swr, &scat[i].mihdl);
		freerbuf(bp);
		if (ret != IBT_SUCCESS) {
			RDSV3_DPRINTF2("rdsv3_ib_dma_map_sg_rdma",
			    "ibt_map_mem_iov returned: %d", ret);
			/* free resources and return error */
			kmem_free(scat[i].swr.wr_sgl,
			    iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t));
			goto out;
		}
		count += scat[i].swr.wr_nds;

#ifdef DEBUG
		for (j = 0; j < scat[i].swr.wr_nds; j++) {
			RDSV3_DPRINTF5("rdsv3_ib_dma_map_sg_rdma",
			    "sgl[%d] va %llx len %x", j,
			    scat[i].swr.wr_sgl[j].ds_va,
			    scat[i].swr.wr_sgl[j].ds_len);
		}
#endif
		RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma",
		    "iovec.bytes: 0x%x scat[%d]swr.wr_nds: %d",
		    scat[i].iovec.bytes, i, scat[i].swr.wr_nds);
	}

	count = ((count - 1) / RDSV3_IB_MAX_SGE) + 1;
	RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "Ret: num: %d", count);
	return (count);

out:
	rdsv3_ib_dma_unmap_sg_rdma(dev, num, scat);
	return (0);
}

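/*
 * Illustrative sketch (not part of the imported code): the final count
 * computed by rdsv3_ib_dma_map_sg_rdma() above is a round-up division -
 * the number of RDMA work requests needed when each WR carries at most
 * RDSV3_IB_MAX_SGE scatter/gather entries.  The same round-up pattern
 * sizes the fragment count in rdsv3_ib_xmit().  The EXAMPLE_ name below
 * is hypothetical.
 */
#define	EXAMPLE_HOWMANY(total, per_wr)	((((total) - 1) / (per_wr)) + 1)
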
int
rdsv3_ib_xmit_rdma(struct rdsv3_connection *conn, struct rdsv3_rdma_op *op)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct rdsv3_ib_send_work *send = NULL;
	struct rdsv3_rdma_sg *scat;
	uint64_t remote_addr;
	uint32_t pos;
	uint32_t work_alloc;
	uint32_t i, j, k, idx;
	uint32_t left, count;
	uint32_t posted;
	int sent;
	ibt_status_t status;
	ibt_send_wr_t *wr;
	ibt_wr_ds_t *sge;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "rdsv3_ib_conn: %p", ic);

	/* map the message the first time we see it */
	if (!op->r_mapped) {
		op->r_count = rdsv3_ib_dma_map_sg_rdma(ic->i_cm_id->device,
		    op->r_rdma_sg, op->r_nents, &op->r_sg);
		RDSV3_DPRINTF5("rdsv3_ib_xmit_rdma", "ic %p mapping op %p: %d",
		    ic, op, op->r_count);
		if (op->r_count == 0) {
			rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
			RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
			    "fail: ic %p mapping op %p: %d",
			    ic, op, op->r_count);
			return (-ENOMEM); /* XXX ? */
		}
		op->r_mapped = 1;
	}

	/*
	 * Instead of knowing how to return a partial rdma read/write
	 * we insist that there be enough work requests to send the
	 * entire message.
	 */
	work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, op->r_count, &pos);
	if (work_alloc != op->r_count) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		rdsv3_ib_stats_inc(s_ib_tx_ring_full);
		return (-ENOMEM);
	}

	RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "pos %u cnt %u", pos, op->r_count);
	/*
	 * take the scatter list and transpose into a list of
	 * send wr's each with a scatter list of RDSV3_IB_MAX_SGE
	 */
	scat = &op->r_rdma_sg[0];
	sent = 0;
	remote_addr = op->r_remote_addr;

	for (i = 0, k = 0; i < op->r_nents; i++) {
		left = scat[i].swr.wr_nds;
		for (idx = 0; left > 0; k++) {
			send = &ic->i_sends[pos];
			send->s_queued = jiffies;
			send->s_opcode = op->r_write ? IBT_WRC_RDMAW :
			    IBT_WRC_RDMAR;
			send->s_op = op;

			wr = &ic->i_send_wrs[k];
			wr->wr_flags = 0;
			wr->wr_id = pos | RDSV3_IB_SEND_OP;
			wr->wr_trans = IBT_RC_SRV;
			wr->wr_opcode = op->r_write ? IBT_WRC_RDMAW :
			    IBT_WRC_RDMAR;
			wr->wr.rc.rcwr.rdma.rdma_raddr = remote_addr;
			wr->wr.rc.rcwr.rdma.rdma_rkey = op->r_key;

			if (left > RDSV3_IB_MAX_SGE) {
				count = RDSV3_IB_MAX_SGE;
				left -= RDSV3_IB_MAX_SGE;
			} else {
				count = left;
				left = 0;
			}
			wr->wr_nds = count;

			for (j = 0; j < count; j++) {
				sge = &wr->wr_sgl[j];
				*sge = scat[i].swr.wr_sgl[idx];
				remote_addr += scat[i].swr.wr_sgl[idx].ds_len;
				sent += scat[i].swr.wr_sgl[idx].ds_len;
				idx++;
				RDSV3_DPRINTF5("xmit_rdma",
				    "send_wrs[%d]sgl[%d] va %llx len %x",
				    k, j, sge->ds_va, sge->ds_len);
			}
			RDSV3_DPRINTF5("rdsv3_ib_xmit_rdma",
			    "wr[%d] %p key: %x code: %d tlen: %d",
			    k, wr, wr->wr.rc.rcwr.rdma.rdma_rkey,
			    wr->wr_opcode, sent);

			/*
			 * We want to delay signaling completions just enough
			 * to get the batching benefits but not so much that
			 * we create dead time on the wire.
			 */
			if (ic->i_unsignaled_wrs-- == 0) {
				ic->i_unsignaled_wrs =
				    rdsv3_ib_sysctl_max_unsig_wrs;
				wr->wr_flags = IBT_WR_SEND_SIGNAL;
			}

			pos = (pos + 1) % ic->i_send_ring.w_nr;
		}
	}

	status = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
	    ic->i_send_wrs, k, &posted);
	if (status != IBT_SUCCESS) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
		    "RDS/IB: rdma ib_post_send to %u.%u.%u.%u "
		    "returned %d", NIPQUAD(conn->c_faddr), status);
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
	}
	RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "Ret: %p", ic);
	return (status);
}

void
rdsv3_ib_xmit_complete(struct rdsv3_connection *conn)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_complete", "conn: %p", conn);

	/*
	 * We may have a pending ACK or window update we were unable
	 * to send previously (due to flow control). Try again.
	 */
	rdsv3_ib_attempt_ack(ic);
}