1 /*
   2  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
   3  */
   4 
   5 /*
   6  * This file contains code imported from the OFED rds source file ib_send.c
   7  * Oracle elects to have and use the contents of ib_send.c under and governed
   8  * by the OpenIB.org BSD license (see below for full license text). However,
   9  * the following notice accompanied the original version of this file:
  10  */
  11 
  12 /*
  13  * Copyright (c) 2006 Oracle.  All rights reserved.
  14  *
  15  * This software is available to you under a choice of one of two
  16  * licenses.  You may choose to be licensed under the terms of the GNU
  17  * General Public License (GPL) Version 2, available from the file
  18  * COPYING in the main directory of this source tree, or the
  19  * OpenIB.org BSD license below:
  20  *
  21  *     Redistribution and use in source and binary forms, with or
  22  *     without modification, are permitted provided that the following
  23  *     conditions are met:
  24  *
  25  *      - Redistributions of source code must retain the above
  26  *        copyright notice, this list of conditions and the following
  27  *        disclaimer.
  28  *
  29  *      - Redistributions in binary form must reproduce the above
  30  *        copyright notice, this list of conditions and the following
  31  *        disclaimer in the documentation and/or other materials
  32  *        provided with the distribution.
  33  *
  34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  35  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  36  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  37  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  38  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  39  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  40  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  41  * SOFTWARE.
  42  *
  43  */
  44 #include <sys/rds.h>
  45 
  46 #include <sys/ib/clients/rdsv3/rdsv3.h>
  47 #include <sys/ib/clients/rdsv3/rdma.h>
  48 #include <sys/ib/clients/rdsv3/ib.h>
  49 #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
  50 
  51 static void
  52 rdsv3_ib_send_rdma_complete(struct rdsv3_message *rm,
  53     int wc_status)
  54 {
  55         int notify_status;
  56 
  57         RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
  58             rm, wc_status);
  59 
  60         switch (wc_status) {
  61         case IBT_WC_WR_FLUSHED_ERR:
  62                 return;
  63 
  64         case IBT_WC_SUCCESS:
  65                 notify_status = RDS_RDMA_SUCCESS;
  66                 break;
  67 
  68         case IBT_WC_REMOTE_ACCESS_ERR:
  69                 notify_status = RDS_RDMA_REMOTE_ERROR;
  70                 break;
  71 
  72         default:
  73                 notify_status = RDS_RDMA_OTHER_ERROR;
  74                 break;
  75         }
  76         rdsv3_rdma_send_complete(rm, notify_status);
  77 
  78         RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
  79             rm, wc_status);
  80 }
  81 
  82 static void rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev,
  83     uint_t num, struct rdsv3_rdma_sg scat[]);
  84 
  85 void
  86 rdsv3_ib_send_unmap_rdma(struct rdsv3_ib_connection *ic,
  87     struct rdsv3_rdma_op *op)
  88 {
  89         RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rdma", "ic: %p, op: %p", ic, op);
  90         if (op->r_mapped) {
  91                 op->r_mapped = 0;
  92                 if (ic->i_cm_id) {
  93                         rdsv3_ib_dma_unmap_sg_rdma(ic->i_cm_id->device,
  94                             op->r_nents, op->r_rdma_sg);
  95                 } else {
  96                         rdsv3_ib_dma_unmap_sg_rdma((struct ib_device *)NULL,
  97                             op->r_nents, op->r_rdma_sg);
  98                 }
  99         }
 100 }
 101 
 102 static void
 103 rdsv3_ib_send_unmap_rm(struct rdsv3_ib_connection *ic,
 104     struct rdsv3_ib_send_work *send,
 105     int wc_status)
 106 {
 107         struct rdsv3_message *rm = send->s_rm;
 108 
 109         RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rm", "ic %p send %p rm %p\n",
 110             ic, send, rm);
 111 
 112         mutex_enter(&rm->m_rs_lock);
 113         if (rm->m_count) {
 114                 rdsv3_ib_dma_unmap_sg(ic->i_cm_id->device,
 115                     rm->m_sg, rm->m_count);
 116                 rm->m_count = 0;
 117         }
 118         mutex_exit(&rm->m_rs_lock);
 119 
 120         if (rm->m_rdma_op != NULL) {
 121                 rdsv3_ib_send_unmap_rdma(ic, rm->m_rdma_op);
 122 
 123                 /*
 124                  * If the user asked for a completion notification on this
 125                  * message, we can implement three different semantics:
 126                  *  1.  Notify when we received the ACK on the RDS message
 127                  *      that was queued with the RDMA. This provides reliable
 128                  *      notification of RDMA status at the expense of a one-way
 129                  *      packet delay.
 130                  *  2.  Notify when the IB stack gives us the completion
 131                  *      event for the RDMA operation.
 132                  *  3.  Notify when the IB stack gives us the completion
 133                  *      event for the accompanying RDS messages.
 134                  * Here, we implement approach #3. To implement approach #2,
 135                  * call rdsv3_rdma_send_complete from the cq_handler.
 136                  * To implement #1,
 137                  * don't call rdsv3_rdma_send_complete at all, and fall back to
 138                  * the notify
 139                  * handling in the ACK processing code.
 140                  *
 141                  * Note: There's no need to explicitly sync any RDMA buffers
 142                  * using
 143                  * ib_dma_sync_sg_for_cpu - the completion for the RDMA
 144                  * operation itself unmapped the RDMA buffers, which takes care
 145                  * of synching.
 146                  */
 147                 rdsv3_ib_send_rdma_complete(rm, wc_status);
 148 
 149                 if (rm->m_rdma_op->r_write)
 150                         rdsv3_stats_add(s_send_rdma_bytes,
 151                             rm->m_rdma_op->r_bytes);
 152                 else
 153                         rdsv3_stats_add(s_recv_rdma_bytes,
 154                             rm->m_rdma_op->r_bytes);
 155         }
 156 
 157         /*
 158          * If anyone waited for this message to get flushed out, wake
 159          * them up now
 160          */
 161         rdsv3_message_unmapped(rm);
 162 
 163         rdsv3_message_put(rm);
 164         send->s_rm = NULL;
 165 }
 166 
 167 void
 168 rdsv3_ib_send_init_ring(struct rdsv3_ib_connection *ic)
 169 {
 170         struct rdsv3_ib_send_work *send;
 171         uint32_t i;
 172 
 173         RDSV3_DPRINTF4("rdsv3_ib_send_init_ring", "ic: %p", ic);
 174 
 175         for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 176                 send->s_rm = NULL;
 177                 send->s_op = NULL;
 178         }
 179 }
 180 
 181 void
 182 rdsv3_ib_send_clear_ring(struct rdsv3_ib_connection *ic)
 183 {
 184         struct rdsv3_ib_send_work *send;
 185         uint32_t i;
 186 
 187         RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "ic: %p", ic);
 188 
 189         for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 190                 if (send->s_opcode == 0xdd)
 191                         continue;
 192                 if (send->s_rm)
 193                         rdsv3_ib_send_unmap_rm(ic, send, IBT_WC_WR_FLUSHED_ERR);
 194                 if (send->s_op)
 195                         rdsv3_ib_send_unmap_rdma(ic, send->s_op);
 196         }
 197 
 198         RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "Return: ic: %p", ic);
 199 }
 200 
 201 /*
 202  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 203  * operations performed in the send path.  As the sender allocs and potentially
 204  * unallocs the next free entry in the ring it doesn't alter which is
 205  * the next to be freed, which is what this is concerned with.
 206  */
 207 void
 208 rdsv3_ib_send_cqe_handler(struct rdsv3_ib_connection *ic, ibt_wc_t *wc)
 209 {
 210         struct rdsv3_connection *conn = ic->conn;
 211         struct rdsv3_ib_send_work *send;
 212         uint32_t completed, polled;
 213         uint32_t oldest;
 214         uint32_t i = 0;
 215         int ret;
 216 
 217         RDSV3_DPRINTF4("rdsv3_ib_send_cqe_handler",
 218             "wc wc_id 0x%llx status %u byte_len %u imm_data %u\n",
 219             (unsigned long long)wc->wc_id, wc->wc_status,
 220             wc->wc_bytes_xfer, ntohl(wc->wc_immed_data));
 221 
 222         rdsv3_ib_stats_inc(s_ib_tx_cq_event);
 223 
 224         if (wc->wc_id == RDSV3_IB_ACK_WR_ID) {
 225                 if (ic->i_ack_queued + HZ/2 < jiffies)
 226                         rdsv3_ib_stats_inc(s_ib_tx_stalled);
 227                 rdsv3_ib_ack_send_complete(ic);
 228                 return;
 229         }
 230 
 231         oldest = rdsv3_ib_ring_oldest(&ic->i_send_ring);
 232 
 233         completed = rdsv3_ib_ring_completed(&ic->i_send_ring,
 234             (wc->wc_id & ~RDSV3_IB_SEND_OP), oldest);
 235 
 236         for (i = 0; i < completed; i++) {
 237                 send = &ic->i_sends[oldest];
 238 
 239                 /*
 240                  * In the error case, wc->opcode sometimes contains
 241                  * garbage
 242                  */
 243                 switch (send->s_opcode) {
 244                 case IBT_WRC_SEND:
 245                         if (send->s_rm)
 246                                 rdsv3_ib_send_unmap_rm(ic, send,
 247                                     wc->wc_status);
 248                         break;
 249                 case IBT_WRC_RDMAW:
 250                 case IBT_WRC_RDMAR:
 251                         /*
 252                          * Nothing to be done - the SG list will
 253                          * be unmapped
 254                          * when the SEND completes.
 255                          */
 256                         break;
 257                 default:
 258 #ifndef __lock_lint
 259                         RDSV3_DPRINTF2("rdsv3_ib_send_cq_comp_handler",
 260                             "RDS/IB: %s: unexpected opcode "
 261                             "0x%x in WR!",
 262                             __func__, send->s_opcode);
 263 #endif
 264                         break;
 265                 }
 266 
 267                 send->s_opcode = 0xdd;
 268                 if (send->s_queued + HZ/2 < jiffies)
 269                         rdsv3_ib_stats_inc(s_ib_tx_stalled);
 270 
 271                 /*
 272                  * If a RDMA operation produced an error, signal
 273                  * this right
 274                  * away. If we don't, the subsequent SEND that goes
 275                  * with this
 276                  * RDMA will be canceled with ERR_WFLUSH, and the
 277                  * application
 278                  * never learn that the RDMA failed.
 279                  */
 280                 if (wc->wc_status ==
 281                     IBT_WC_REMOTE_ACCESS_ERR && send->s_op) {
 282                         struct rdsv3_message *rm;
 283 
 284                         rm = rdsv3_send_get_message(conn, send->s_op);
 285                         if (rm) {
 286                                 if (rm->m_rdma_op != NULL)
 287                                         rdsv3_ib_send_unmap_rdma(ic,
 288                                             rm->m_rdma_op);
 289                                 rdsv3_ib_send_rdma_complete(rm,
 290                                     wc->wc_status);
 291                                 rdsv3_message_put(rm);
 292                         }
 293                 }
 294 
 295                 oldest = (oldest + 1) % ic->i_send_ring.w_nr;
 296         }
 297 
 298         rdsv3_ib_ring_free(&ic->i_send_ring, completed);
 299 
 300         clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
 301 
 302         /* We expect errors as the qp is drained during shutdown */
 303         if (wc->wc_status != IBT_WC_SUCCESS && rdsv3_conn_up(conn)) {
 304                 RDSV3_DPRINTF2("rdsv3_ib_send_cqe_handler",
 305                     "send completion on %u.%u.%u.%u "
 306                     "had status %u, disconnecting and reconnecting\n",
 307                     NIPQUAD(conn->c_faddr), wc->wc_status);
 308                 rdsv3_conn_drop(conn);
 309         }
 310 
 311         RDSV3_DPRINTF4("rdsv3_ib_send_cqe_handler", "Return: conn: %p", ic);
 312 }
 313 
 314 /*
 315  * This is the main function for allocating credits when sending
 316  * messages.
 317  *
 318  * Conceptually, we have two counters:
 319  *  -   send credits: this tells us how many WRs we're allowed
 320  *      to submit without overruning the reciever's queue. For
 321  *      each SEND WR we post, we decrement this by one.
 322  *
 323  *  -   posted credits: this tells us how many WRs we recently
 324  *      posted to the receive queue. This value is transferred
 325  *      to the peer as a "credit update" in a RDS header field.
 326  *      Every time we transmit credits to the peer, we subtract
 327  *      the amount of transferred credits from this counter.
 328  *
 329  * It is essential that we avoid situations where both sides have
 330  * exhausted their send credits, and are unable to send new credits
 331  * to the peer. We achieve this by requiring that we send at least
 332  * one credit update to the peer before exhausting our credits.
 333  * When new credits arrive, we subtract one credit that is withheld
 334  * until we've posted new buffers and are ready to transmit these
 335  * credits (see rdsv3_ib_send_add_credits below).
 336  *
 337  * The RDS send code is essentially single-threaded; rdsv3_send_xmit
 338  * grabs c_send_lock to ensure exclusive access to the send ring.
 339  * However, the ACK sending code is independent and can race with
 340  * message SENDs.
 341  *
 342  * In the send path, we need to update the counters for send credits
 343  * and the counter of posted buffers atomically - when we use the
 344  * last available credit, we cannot allow another thread to race us
 345  * and grab the posted credits counter.  Hence, we have to use a
 346  * spinlock to protect the credit counter, or use atomics.
 347  *
 348  * Spinlocks shared between the send and the receive path are bad,
 349  * because they create unnecessary delays. An early implementation
 350  * using a spinlock showed a 5% degradation in throughput at some
 351  * loads.
 352  *
 353  * This implementation avoids spinlocks completely, putting both
 354  * counters into a single atomic, and updating that atomic using
 355  * atomic_add (in the receive path, when receiving fresh credits),
 356  * and using atomic_cmpxchg when updating the two counters.
 357  */
 358 int
 359 rdsv3_ib_send_grab_credits(struct rdsv3_ib_connection *ic,
 360     uint32_t wanted, uint32_t *adv_credits, int need_posted)
 361 {
 362         unsigned int avail, posted, got = 0, advertise;
 363         long oldval, newval;
 364 
 365         RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d",
 366             ic, wanted, *adv_credits, need_posted);
 367 
 368         *adv_credits = 0;
 369         if (!ic->i_flowctl)
 370                 return (wanted);
 371 
 372 try_again:
 373         advertise = 0;
 374         oldval = newval = atomic_get(&ic->i_credits);
 375         posted = IB_GET_POST_CREDITS(oldval);
 376         avail = IB_GET_SEND_CREDITS(oldval);
 377 
 378         RDSV3_DPRINTF5("rdsv3_ib_send_grab_credits",
 379             "wanted (%u): credits=%u posted=%u\n", wanted, avail, posted);
 380 
 381         /* The last credit must be used to send a credit update. */
 382         if (avail && !posted)
 383                 avail--;
 384 
 385         if (avail < wanted) {
 386                 struct rdsv3_connection *conn = ic->i_cm_id->context;
 387 
 388                 /* Oops, there aren't that many credits left! */
 389                 set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
 390                 got = avail;
 391         } else {
 392                 /* Sometimes you get what you want, lalala. */
 393                 got = wanted;
 394         }
 395         newval -= IB_SET_SEND_CREDITS(got);
 396 
 397         /*
 398          * If need_posted is non-zero, then the caller wants
 399          * the posted regardless of whether any send credits are
 400          * available.
 401          */
 402         if (posted && (got || need_posted)) {
 403                 advertise = min(posted, RDSV3_MAX_ADV_CREDIT);
 404                 newval -= IB_SET_POST_CREDITS(advertise);
 405         }
 406 
 407         /* Finally bill everything */
 408         if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
 409                 goto try_again;
 410 
 411         *adv_credits = advertise;
 412 
 413         RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d",
 414             ic, got, *adv_credits, need_posted);
 415 
 416         return (got);
 417 }
 418 
 419 void
 420 rdsv3_ib_send_add_credits(struct rdsv3_connection *conn, unsigned int credits)
 421 {
 422         struct rdsv3_ib_connection *ic = conn->c_transport_data;
 423 
 424         if (credits == 0)
 425                 return;
 426 
 427         RDSV3_DPRINTF5("rdsv3_ib_send_add_credits",
 428             "credits (%u): current=%u%s\n",
 429             credits,
 430             IB_GET_SEND_CREDITS(atomic_get(&ic->i_credits)),
 431             test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags) ?
 432             ", ll_send_full" : "");
 433 
 434         atomic_add_32(&ic->i_credits, IB_SET_SEND_CREDITS(credits));
 435         if (test_and_clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
 436                 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);
 437 
 438         ASSERT(!(IB_GET_SEND_CREDITS(credits) >= 16384));
 439 
 440         rdsv3_ib_stats_inc(s_ib_rx_credit_updates);
 441 
 442         RDSV3_DPRINTF4("rdsv3_ib_send_add_credits",
 443             "Return: conn: %p, credits: %d",
 444             conn, credits);
 445 }
 446 
 447 void
 448 rdsv3_ib_advertise_credits(struct rdsv3_connection *conn, unsigned int posted)
 449 {
 450         struct rdsv3_ib_connection *ic = conn->c_transport_data;
 451 
 452         RDSV3_DPRINTF4("rdsv3_ib_advertise_credits", "conn: %p, posted: %d",
 453             conn, posted);
 454 
 455         if (posted == 0)
 456                 return;
 457 
 458         atomic_add_32(&ic->i_credits, IB_SET_POST_CREDITS(posted));
 459 
 460         /*
 461          * Decide whether to send an update to the peer now.
 462          * If we would send a credit update for every single buffer we
 463          * post, we would end up with an ACK storm (ACK arrives,
 464          * consumes buffer, we refill the ring, send ACK to remote
 465          * advertising the newly posted buffer... ad inf)
 466          *
 467          * Performance pretty much depends on how often we send
 468          * credit updates - too frequent updates mean lots of ACKs.
 469          * Too infrequent updates, and the peer will run out of
 470          * credits and has to throttle.
 471          * For the time being, 16 seems to be a good compromise.
 472          */
 473         if (IB_GET_POST_CREDITS(atomic_get(&ic->i_credits)) >= 16)
 474                 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 475 }
 476 
 477 static inline void
 478 rdsv3_ib_xmit_populate_wr(struct rdsv3_ib_connection *ic,
 479     ibt_send_wr_t *wr, unsigned int pos,
 480     struct rdsv3_scatterlist *scat, unsigned int off, unsigned int length,
 481     int send_flags)
 482 {
 483         ibt_wr_ds_t *sge;
 484 
 485         RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
 486             "ic: %p, wr: %p scat: %p %d %d %d %d",
 487             ic, wr, scat, pos, off, length, send_flags);
 488 
 489         wr->wr_id = pos | RDSV3_IB_SEND_OP;
 490         wr->wr_trans = IBT_RC_SRV;
 491         wr->wr_flags = send_flags;
 492         wr->wr_opcode = IBT_WRC_SEND;
 493 
 494         if (length != 0) {
 495                 int     ix, len, assigned;
 496                 ibt_wr_ds_t *sgl;
 497 
 498                 ASSERT(length <= scat->length - off);
 499 
 500                 sgl = scat->sgl;
 501                 if (off != 0) {
 502                         /* find the right sgl to begin with */
 503                         while (sgl->ds_len <= off) {
 504                                 off -= sgl->ds_len;
 505                                 sgl++;
 506                         }
 507                 }
 508 
 509                 ix = 1; /* first data sgl is at 1 */
 510                 assigned = 0;
 511                 len = length;
 512                 do {
 513                         sge = &wr->wr_sgl[ix++];
 514                         sge->ds_va = sgl->ds_va + off;
 515                         assigned = min(len, sgl->ds_len - off);
 516                         sge->ds_len = assigned;
 517                         sge->ds_key = sgl->ds_key;
 518                         len -= assigned;
 519                         if (len != 0) {
 520                                 sgl++;
 521                                 off = 0;
 522                         }
 523                 } while (len > 0);
 524 
 525                 wr->wr_nds = ix;
 526         } else {
 527                 /*
 528                  * We're sending a packet with no payload. There is only
 529                  * one SGE
 530                  */
 531                 wr->wr_nds = 1;
 532         }
 533 
 534         sge = &wr->wr_sgl[0];
 535         sge->ds_va = ic->i_send_hdrs_dma + (pos * sizeof (struct rdsv3_header));
 536         sge->ds_len = sizeof (struct rdsv3_header);
 537         sge->ds_key = ic->i_mr->lkey;
 538 
 539         RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
 540             "Return: ic: %p, wr: %p scat: %p", ic, wr, scat);
 541 }
 542 
 543 /*
 544  * This can be called multiple times for a given message.  The first time
 545  * we see a message we map its scatterlist into the IB device so that
 546  * we can provide that mapped address to the IB scatter gather entries
 547  * in the IB work requests.  We translate the scatterlist into a series
 548  * of work requests that fragment the message.  These work requests complete
 549  * in order so we pass ownership of the message to the completion handler
 550  * once we send the final fragment.
 551  *
 552  * The RDS core uses the c_send_lock to only enter this function once
 553  * per connection.  This makes sure that the tx ring alloc/unalloc pairs
 554  * don't get out of sync and confuse the ring.
 555  */
 556 int
 557 rdsv3_ib_xmit(struct rdsv3_connection *conn, struct rdsv3_message *rm,
 558     unsigned int hdr_off, unsigned int sg, unsigned int off)
 559 {
 560         struct rdsv3_ib_connection *ic = conn->c_transport_data;
 561         struct ib_device *dev = ic->i_cm_id->device;
 562         struct rdsv3_ib_send_work *send = NULL;
 563         struct rdsv3_ib_send_work *first;
 564         struct rdsv3_ib_send_work *prev;
 565         ibt_send_wr_t *wr;
 566         struct rdsv3_scatterlist *scat;
 567         uint32_t pos;
 568         uint32_t i;
 569         uint32_t work_alloc;
 570         uint32_t credit_alloc;
 571         uint32_t posted;
 572         uint32_t adv_credits = 0;
 573         int send_flags = 0;
 574         int sent;
 575         int ret;
 576         int flow_controlled = 0;
 577 
 578         RDSV3_DPRINTF4("rdsv3_ib_xmit", "conn: %p, rm: %p", conn, rm);
 579 
 580         ASSERT(!(off % RDSV3_FRAG_SIZE));
 581         ASSERT(!(hdr_off != 0 && hdr_off != sizeof (struct rdsv3_header)));
 582 
 583         /* Do not send cong updates to IB loopback */
 584         if (conn->c_loopback &&
 585             rm->m_inc.i_hdr.h_flags & RDSV3_FLAG_CONG_BITMAP) {
 586                 rdsv3_cong_map_updated(conn->c_fcong, ~(uint64_t)0);
 587                 return (sizeof (struct rdsv3_header) + RDSV3_CONG_MAP_BYTES);
 588         }
 589 
 590 #ifndef __lock_lint
 591         /* FIXME we may overallocate here */
 592         if (ntohl(rm->m_inc.i_hdr.h_len) == 0)
 593                 i = 1;
 594         else
 595                 i = ceil(ntohl(rm->m_inc.i_hdr.h_len), RDSV3_FRAG_SIZE);
 596 #endif
 597 
 598         work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 599         if (work_alloc != i) {
 600                 rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 601                 set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
 602                 rdsv3_ib_stats_inc(s_ib_tx_ring_full);
 603                 ret = -ENOMEM;
 604                 goto out;
 605         }
 606 
 607         credit_alloc = work_alloc;
 608         if (ic->i_flowctl) {
 609                 credit_alloc = rdsv3_ib_send_grab_credits(ic, work_alloc,
 610                     &posted, 0);
 611                 adv_credits += posted;
 612                 if (credit_alloc < work_alloc) {
 613                         rdsv3_ib_ring_unalloc(&ic->i_send_ring,
 614                             work_alloc - credit_alloc);
 615                         work_alloc = credit_alloc;
 616                         flow_controlled++;
 617                 }
 618                 if (work_alloc == 0) {
 619                         rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 620                         rdsv3_ib_stats_inc(s_ib_tx_throttle);
 621                         ret = -ENOMEM;
 622                         goto out;
 623                 }
 624         }
 625 
 626         /* map the message the first time we see it */
 627         if (ic->i_rm == NULL) {
 628                 /*
 629                  * printk(KERN_NOTICE
 630                  * "rdsv3_ib_xmit prep msg dport=%u flags=0x%x len=%d\n",
 631                  * be16_to_cpu(rm->m_inc.i_hdr.h_dport),
 632                  * rm->m_inc.i_hdr.h_flags,
 633                  * be32_to_cpu(rm->m_inc.i_hdr.h_len));
 634                  */
 635                 if (rm->m_nents) {
 636                         rm->m_count = rdsv3_ib_dma_map_sg(dev,
 637                             rm->m_sg, rm->m_nents);
 638                         RDSV3_DPRINTF5("rdsv3_ib_xmit",
 639                             "ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
 640                         if (rm->m_count == 0) {
 641                                 rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
 642                                 rdsv3_ib_ring_unalloc(&ic->i_send_ring,
 643                                     work_alloc);
 644                                 ret = -ENOMEM; /* XXX ? */
 645                                 RDSV3_DPRINTF2("rdsv3_ib_xmit",
 646                                     "fail: ic %p mapping rm %p: %d\n",
 647                                     ic, rm, rm->m_count);
 648                                 goto out;
 649                         }
 650                 } else {
 651                         rm->m_count = 0;
 652                 }
 653 
 654                 ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
 655                 ic->i_unsignaled_bytes = rdsv3_ib_sysctl_max_unsig_bytes;
 656                 rdsv3_message_addref(rm);
 657                 ic->i_rm = rm;
 658 
 659                 /* Finalize the header */
 660                 if (test_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags))
 661                         rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_ACK_REQUIRED;
 662                 if (test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags))
 663                         rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_RETRANSMITTED;
 664 
 665                 /*
 666                  * If it has a RDMA op, tell the peer we did it. This is
 667                  * used by the peer to release use-once RDMA MRs.
 668                  */
 669                 if (rm->m_rdma_op) {
 670                         struct rdsv3_ext_header_rdma ext_hdr;
 671 
 672                         ext_hdr.h_rdma_rkey = htonl(rm->m_rdma_op->r_key);
 673                         (void) rdsv3_message_add_extension(&rm->m_inc.i_hdr,
 674                             RDSV3_EXTHDR_RDMA, &ext_hdr,
 675                             sizeof (ext_hdr));
 676                 }
 677                 if (rm->m_rdma_cookie) {
 678                         (void) rdsv3_message_add_rdma_dest_extension(
 679                             &rm->m_inc.i_hdr,
 680                             rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
 681                             rdsv3_rdma_cookie_offset(rm->m_rdma_cookie));
 682                 }
 683 
 684                 /*
 685                  * Note - rdsv3_ib_piggyb_ack clears the ACK_REQUIRED bit, so
 686                  * we should not do this unless we have a chance of at least
 687                  * sticking the header into the send ring. Which is why we
 688                  * should call rdsv3_ib_ring_alloc first.
 689                  */
 690                 rm->m_inc.i_hdr.h_ack = htonll(rdsv3_ib_piggyb_ack(ic));
 691                 rdsv3_message_make_checksum(&rm->m_inc.i_hdr);
 692 
 693                 /*
 694                  * Update adv_credits since we reset the ACK_REQUIRED bit.
 695                  */
 696                 (void) rdsv3_ib_send_grab_credits(ic, 0, &posted, 1);
 697                 adv_credits += posted;
 698                 ASSERT(adv_credits <= 255);
 699         }
 700 
 701         send = &ic->i_sends[pos];
 702         first = send;
 703         prev = NULL;
 704         scat = &rm->m_sg[sg];
 705         sent = 0;
 706         i = 0;
 707 
 708         /*
 709          * Sometimes you want to put a fence between an RDMA
 710          * READ and the following SEND.
 711          * We could either do this all the time
 712          * or when requested by the user. Right now, we let
 713          * the application choose.
 714          */
 715         if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
 716                 send_flags = IBT_WR_SEND_FENCE;
 717 
 718         /*
 719          * We could be copying the header into the unused tail of the page.
 720          * That would need to be changed in the future when those pages might
 721          * be mapped userspace pages or page cache pages.  So instead we always
 722          * use a second sge and our long-lived ring of mapped headers.  We send
 723          * the header after the data so that the data payload can be aligned on
 724          * the receiver.
 725          */
 726 
 727         /* handle a 0-len message */
 728         if (ntohl(rm->m_inc.i_hdr.h_len) == 0) {
 729                 wr = &ic->i_send_wrs[0];
 730                 rdsv3_ib_xmit_populate_wr(ic, wr, pos, NULL, 0, 0, send_flags);
 731                 send->s_queued = jiffies;
 732                 send->s_op = NULL;
 733                 send->s_opcode = wr->wr_opcode;
 734                 goto add_header;
 735         }
 736 
 737         /* if there's data reference it with a chain of work reqs */
 738         for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
 739                 unsigned int len;
 740 
 741                 send = &ic->i_sends[pos];
 742 
 743                 wr = &ic->i_send_wrs[i];
 744                 len = min(RDSV3_FRAG_SIZE,
 745                     rdsv3_ib_sg_dma_len(dev, scat) - off);
 746                 rdsv3_ib_xmit_populate_wr(ic, wr, pos, scat, off, len,
 747                     send_flags);
 748                 send->s_queued = jiffies;
 749                 send->s_op = NULL;
 750                 send->s_opcode = wr->wr_opcode;
 751 
 752                 /*
 753                  * We want to delay signaling completions just enough to get
 754                  * the batching benefits but not so much that we create dead
 755                  * time
 756                  * on the wire.
 757                  */
 758                 if (ic->i_unsignaled_wrs-- == 0) {
 759                         ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
 760                         wr->wr_flags |=
 761                             IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
 762                 }
 763 
 764                 ic->i_unsignaled_bytes -= len;
 765                 if (ic->i_unsignaled_bytes <= 0) {
 766                         ic->i_unsignaled_bytes =
 767                             rdsv3_ib_sysctl_max_unsig_bytes;
 768                         wr->wr_flags |=
 769                             IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
 770                 }
 771 
 772                 /*
 773                  * Always signal the last one if we're stopping due to flow
 774                  * control.
 775                  */
 776                 if (flow_controlled && i == (work_alloc-1)) {
 777                         wr->wr_flags |=
 778                             IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
 779                 }
 780 
 781                 RDSV3_DPRINTF5("rdsv3_ib_xmit", "send %p wr %p num_sge %u \n",
 782                     send, wr, wr->wr_nds);
 783 
 784                 sent += len;
 785                 off += len;
 786                 if (off == rdsv3_ib_sg_dma_len(dev, scat)) {
 787                         scat++;
 788                         off = 0;
 789                 }
 790 
 791 add_header:
 792                 /*
 793                  * Tack on the header after the data. The header SGE
 794                  * should already
 795                  * have been set up to point to the right header buffer.
 796                  */
 797                 (void) memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
 798                     sizeof (struct rdsv3_header));
 799 
 800                 if (0) {
 801                         struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];
 802 
 803                         RDSV3_DPRINTF2("rdsv3_ib_xmit",
 804                             "send WR dport=%u flags=0x%x len=%d",
 805                             ntohs(hdr->h_dport),
 806                             hdr->h_flags,
 807                             ntohl(hdr->h_len));
 808                 }
 809                 if (adv_credits) {
 810                         struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];
 811 
 812                         /* add credit and redo the header checksum */
 813                         hdr->h_credit = adv_credits;
 814                         rdsv3_message_make_checksum(hdr);
 815                         adv_credits = 0;
 816                         rdsv3_ib_stats_inc(s_ib_tx_credit_updates);
 817                 }
 818 
 819                 prev = send;
 820 
 821                 pos = (pos + 1) % ic->i_send_ring.w_nr;
 822         }
 823 
 824         /*
 825          * Account the RDS header in the number of bytes we sent, but just once.
 826          * The caller has no concept of fragmentation.
 827          */
 828         if (hdr_off == 0)
 829                 sent += sizeof (struct rdsv3_header);
 830 
 831         /* if we finished the message then send completion owns it */
 832         if (scat == &rm->m_sg[rm->m_count]) {
 833                 prev->s_rm = ic->i_rm;
 834                 wr->wr_flags |= IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
 835                 ic->i_rm = NULL;
 836         }
 837 
 838         if (i < work_alloc) {
 839                 rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
 840                 work_alloc = i;
 841         }
 842         if (ic->i_flowctl && i < credit_alloc)
 843                 rdsv3_ib_send_add_credits(conn, credit_alloc - i);
 844 
 845         /* XXX need to worry about failed_wr and partial sends. */
 846         ret = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
 847             ic->i_send_wrs, i, &posted);
 848         if (posted != i) {
 849                 RDSV3_DPRINTF2("rdsv3_ib_xmit",
 850                     "ic %p first %p nwr: %d ret %d:%d",
 851                     ic, first, i, ret, posted);
 852         }
 853         if (ret) {
 854                 RDSV3_DPRINTF2("rdsv3_ib_xmit",
 855                     "RDS/IB: ib_post_send to %u.%u.%u.%u "
 856                     "returned %d\n", NIPQUAD(conn->c_faddr), ret);
 857                 rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 858                 if (prev->s_rm) {
 859                         ic->i_rm = prev->s_rm;
 860                         prev->s_rm = NULL;
 861                 }
 862                 RDSV3_DPRINTF2("rdsv3_ib_xmit", "ibt_post_send failed\n");
 863                 rdsv3_conn_drop(ic->conn);
 864                 ret = -EAGAIN;
 865                 goto out;
 866         }
 867 
 868         ret = sent;
 869 
 870         RDSV3_DPRINTF4("rdsv3_ib_xmit", "Return: conn: %p, rm: %p", conn, rm);
 871 out:
 872         ASSERT(!adv_credits);
 873         return (ret);
 874 }
 875 
 876 static void
 877 rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev, uint_t num,
 878         struct rdsv3_rdma_sg scat[])
 879 {
 880         ibt_hca_hdl_t hca_hdl;
 881         int i;
 882         int num_sgl;
 883 
 884         RDSV3_DPRINTF4("rdsv3_ib_dma_unmap_sg", "rdma_sg: %p", scat);
 885 
 886         if (dev) {
 887                 hca_hdl = ib_get_ibt_hca_hdl(dev);
 888         } else {
 889                 hca_hdl = scat[0].hca_hdl;
 890                 RDSV3_DPRINTF2("rdsv3_ib_dma_unmap_sg_rdma",
 891                     "NULL dev use cached hca_hdl %p", hca_hdl);
 892         }
 893 
 894         if (hca_hdl == NULL)
 895                 return;
 896         scat[0].hca_hdl = NULL;
 897 
 898         for (i = 0; i < num; i++) {
 899                 if (scat[i].mihdl != NULL) {
 900                         num_sgl = (scat[i].iovec.bytes / PAGESIZE) + 2;
 901                         kmem_free(scat[i].swr.wr_sgl,
 902                             (num_sgl * sizeof (ibt_wr_ds_t)));
 903                         scat[i].swr.wr_sgl = NULL;
 904                         (void) ibt_unmap_mem_iov(hca_hdl, scat[i].mihdl);
 905                         scat[i].mihdl = NULL;
 906                 } else
 907                         break;
 908         }
 909 }
 910 
 911 /* ARGSUSED */
 912 uint_t
 913 rdsv3_ib_dma_map_sg_rdma(struct ib_device *dev, struct rdsv3_rdma_sg scat[],
 914     uint_t num, struct rdsv3_scatterlist **scatl)
 915 {
 916         ibt_hca_hdl_t hca_hdl;
 917         ibt_iov_attr_t iov_attr;
 918         struct buf *bp;
 919         uint_t i, j, k;
 920         uint_t count;
 921         struct rdsv3_scatterlist *sg;
 922         int ret;
 923 
 924         RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "scat: %p, num: %d",
 925             scat, num);
 926 
 927         hca_hdl = ib_get_ibt_hca_hdl(dev);
 928         scat[0].hca_hdl = hca_hdl;
 929         bzero(&iov_attr, sizeof (ibt_iov_attr_t));
 930         iov_attr.iov_flags = IBT_IOV_BUF;
 931         iov_attr.iov_lso_hdr_sz = 0;
 932 
 933         for (i = 0, count = 0; i < num; i++) {
 934                 /* transpose umem_cookie  to buf structure */
 935                 bp = ddi_umem_iosetup(scat[i].umem_cookie,
 936                     scat[i].iovec.addr & PAGEOFFSET, scat[i].iovec.bytes,
 937                     B_WRITE, 0, 0, NULL, DDI_UMEM_SLEEP);
 938                 if (bp == NULL) {
 939                         /* free resources  and return error */
 940                         goto out;
 941                 }
 942                 /* setup ibt_map_mem_iov() attributes */
 943                 iov_attr.iov_buf = bp;
 944                 iov_attr.iov_wr_nds = (scat[i].iovec.bytes / PAGESIZE) + 2;
 945                 scat[i].swr.wr_sgl =
 946                     kmem_zalloc(iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t),
 947                     KM_SLEEP);
 948 
 949                 ret = ibt_map_mem_iov(hca_hdl, &iov_attr,
 950                     (ibt_all_wr_t *)&scat[i].swr, &scat[i].mihdl);
 951                 freerbuf(bp);
 952                 if (ret != IBT_SUCCESS) {
 953                         RDSV3_DPRINTF2("rdsv3_ib_dma_map_sg_rdma",
 954                             "ibt_map_mem_iov returned: %d", ret);
 955                         /* free resources and return error */
 956                         kmem_free(scat[i].swr.wr_sgl,
 957                             iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t));
 958                         goto out;
 959                 }
 960                 count += scat[i].swr.wr_nds;
 961 
 962 #ifdef  DEBUG
 963                 for (j = 0; j < scat[i].swr.wr_nds; j++) {
 964                         RDSV3_DPRINTF5("rdsv3_ib_dma_map_sg_rdma",
 965                             "sgl[%d] va %llx len %x", j,
 966                             scat[i].swr.wr_sgl[j].ds_va,
 967                             scat[i].swr.wr_sgl[j].ds_len);
 968                 }
 969 #endif
 970                 RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma",
 971                     "iovec.bytes: 0x%x scat[%d]swr.wr_nds: %d",
 972                     scat[i].iovec.bytes, i, scat[i].swr.wr_nds);
 973         }
 974 
 975         count = ((count - 1) / RDSV3_IB_MAX_SGE) + 1;
 976         RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "Ret: num: %d", count);
 977         return (count);
 978 
 979 out:
 980         rdsv3_ib_dma_unmap_sg_rdma(dev, num, scat);
 981         return (0);
 982 }
 983 
 984 int
 985 rdsv3_ib_xmit_rdma(struct rdsv3_connection *conn, struct rdsv3_rdma_op *op)
 986 {
 987         struct rdsv3_ib_connection *ic = conn->c_transport_data;
 988         struct rdsv3_ib_send_work *send = NULL;
 989         struct rdsv3_rdma_sg *scat;
 990         uint64_t remote_addr;
 991         uint32_t pos;
 992         uint32_t work_alloc;
 993         uint32_t i, j, k, idx;
 994         uint32_t left, count;
 995         uint32_t posted;
 996         int sent;
 997         ibt_status_t status;
 998         ibt_send_wr_t *wr;
 999         ibt_wr_ds_t *sge;
1000 
1001         RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "rdsv3_ib_conn: %p", ic);
1002 
1003         /* map the message the first time we see it */
1004         if (!op->r_mapped) {
1005                 op->r_count = rdsv3_ib_dma_map_sg_rdma(ic->i_cm_id->device,
1006                     op->r_rdma_sg, op->r_nents, &op->r_sg);
1007                 RDSV3_DPRINTF5("rdsv3_ib_xmit_rdma", "ic %p mapping op %p: %d",
1008                     ic, op, op->r_count);
1009                 if (op->r_count == 0) {
1010                         rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
1011                         RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
1012                             "fail: ic %p mapping op %p: %d",
1013                             ic, op, op->r_count);
1014                         return (-ENOMEM); /* XXX ? */
1015                 }
1016                 op->r_mapped = 1;
1017         }
1018 
1019         /*
1020          * Instead of knowing how to return a partial rdma read/write
1021          * we insist that there
1022          * be enough work requests to send the entire message.
1023          */
1024         work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, op->r_count, &pos);
1025         if (work_alloc != op->r_count) {
1026                 rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
1027                 rdsv3_ib_stats_inc(s_ib_tx_ring_full);
1028                 return (-ENOMEM);
1029         }
1030 
1031         RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "pos %u cnt %u", pos, op->r_count);
1032         /*
1033          * take the scatter list and transpose into a list of
1034          * send wr's each with a scatter list of RDSV3_IB_MAX_SGE
1035          */
1036         scat = &op->r_rdma_sg[0];
1037         sent = 0;
1038         remote_addr = op->r_remote_addr;
1039 
1040         for (i = 0, k = 0; i < op->r_nents; i++) {
1041                 left = scat[i].swr.wr_nds;
1042                 for (idx = 0; left > 0; k++) {
1043                         send = &ic->i_sends[pos];
1044                         send->s_queued = jiffies;
1045                         send->s_opcode = op->r_write ? IBT_WRC_RDMAW :
1046                             IBT_WRC_RDMAR;
1047                         send->s_op = op;
1048 
1049                         wr = &ic->i_send_wrs[k];
1050                         wr->wr_flags = 0;
1051                         wr->wr_id = pos | RDSV3_IB_SEND_OP;
1052                         wr->wr_trans = IBT_RC_SRV;
1053                         wr->wr_opcode = op->r_write ? IBT_WRC_RDMAW :
1054                             IBT_WRC_RDMAR;
1055                         wr->wr.rc.rcwr.rdma.rdma_raddr = remote_addr;
1056                         wr->wr.rc.rcwr.rdma.rdma_rkey = op->r_key;
1057 
1058                         if (left > RDSV3_IB_MAX_SGE) {
1059                                 count = RDSV3_IB_MAX_SGE;
1060                                 left -= RDSV3_IB_MAX_SGE;
1061                         } else {
1062                                 count = left;
1063                                 left = 0;
1064                         }
1065                         wr->wr_nds = count;
1066 
1067                         for (j = 0; j < count; j++) {
1068                                 sge = &wr->wr_sgl[j];
1069                                 *sge = scat[i].swr.wr_sgl[idx];
1070                                 remote_addr += scat[i].swr.wr_sgl[idx].ds_len;
1071                                 sent += scat[i].swr.wr_sgl[idx].ds_len;
1072                                 idx++;
1073                                 RDSV3_DPRINTF5("xmit_rdma",
1074                                     "send_wrs[%d]sgl[%d] va %llx len %x",
1075                                     k, j, sge->ds_va, sge->ds_len);
1076                         }
1077                         RDSV3_DPRINTF5("rdsv3_ib_xmit_rdma",
1078                             "wr[%d] %p key: %x code: %d tlen: %d",
1079                             k, wr, wr->wr.rc.rcwr.rdma.rdma_rkey,
1080                             wr->wr_opcode, sent);
1081 
1082                         /*
1083                          * We want to delay signaling completions just enough
1084                          * to get the batching benefits but not so much that
1085                          * we create dead time on the wire.
1086                          */
1087                         if (ic->i_unsignaled_wrs-- == 0) {
1088                                 ic->i_unsignaled_wrs =
1089                                     rdsv3_ib_sysctl_max_unsig_wrs;
1090                                 wr->wr_flags = IBT_WR_SEND_SIGNAL;
1091                         }
1092 
1093                         pos = (pos + 1) % ic->i_send_ring.w_nr;
1094                 }
1095         }
1096 
1097         status = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
1098             ic->i_send_wrs, k, &posted);
1099         if (status != IBT_SUCCESS) {
1100                 RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
1101                     "RDS/IB: rdma ib_post_send to %u.%u.%u.%u "
1102                     "returned %d", NIPQUAD(conn->c_faddr), status);
1103                 rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
1104         }
1105         RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "Ret: %p", ic);
1106         return (status);
1107 }
1108 
1109 void
1110 rdsv3_ib_xmit_complete(struct rdsv3_connection *conn)
1111 {
1112         struct rdsv3_ib_connection *ic = conn->c_transport_data;
1113 
1114         RDSV3_DPRINTF4("rdsv3_ib_xmit_complete", "conn: %p", conn);
1115 
1116         /*
1117          * We may have a pending ACK or window update we were unable
1118          * to send previously (due to flow control). Try again.
1119          */
1120         rdsv3_ib_attempt_ack(ic);
1121 }