1 /*
   2  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
   3  */
   4 
   5 /*
   6  * This file contains code imported from the OFED rds source file connection.c
   7  * Oracle elects to have and use the contents of connection.c under and governed
   8  * by the OpenIB.org BSD license (see below for full license text). However,
   9  * the following notice accompanied the original version of this file:
  10  */
  11 
  12 /*
  13  * Copyright (c) 2006 Oracle.  All rights reserved.
  14  *
  15  * This software is available to you under a choice of one of two
  16  * licenses.  You may choose to be licensed under the terms of the GNU
  17  * General Public License (GPL) Version 2, available from the file
  18  * COPYING in the main directory of this source tree, or the
  19  * OpenIB.org BSD license below:
  20  *
  21  *     Redistribution and use in source and binary forms, with or
  22  *     without modification, are permitted provided that the following
  23  *     conditions are met:
  24  *
  25  *      - Redistributions of source code must retain the above
  26  *        copyright notice, this list of conditions and the following
  27  *        disclaimer.
  28  *
  29  *      - Redistributions in binary form must reproduce the above
  30  *        copyright notice, this list of conditions and the following
  31  *        disclaimer in the documentation and/or other materials
  32  *        provided with the distribution.
  33  *
  34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  35  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  36  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  37  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  38  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  39  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  40  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  41  * SOFTWARE.
  42  *
  43  */
  44 #include <sys/types.h>
  45 #include <sys/kmem.h>
  46 #include <sys/rds.h>
  47 
  48 #include <sys/ib/clients/rdsv3/rdsv3.h>
  49 #include <sys/ib/clients/rdsv3/loop.h>
  50 #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
  51 
  52 /* converting this to RCU is a chore for another day.. */
  53 static krwlock_t rdsv3_conn_lock;
  54 struct avl_tree rdsv3_conn_hash;
  55 static struct kmem_cache *rdsv3_conn_slab = NULL;
  56 
  57 #define rdsv3_conn_info_set(var, test, suffix) do {               \
  58         if (test)                                               \
  59                 var |= RDS_INFO_CONNECTION_FLAG_##suffix;     \
  60 } while (0)
  61 
  62 
  63 static struct rdsv3_connection *
  64 rdsv3_conn_lookup(uint32_be_t laddr, uint32_be_t faddr, avl_index_t *pos)
  65 {
  66         struct rdsv3_connection *conn;
  67         struct rdsv3_conn_info_s conn_info;
  68         avl_index_t place = 0;
  69 
  70         conn_info.c_laddr = laddr;
  71         conn_info.c_faddr = faddr;
  72 
  73         conn = avl_find(&rdsv3_conn_hash, &conn_info, &place);
  74 
  75         RDSV3_DPRINTF5("rdsv3_conn_lookup",
  76             "returning conn %p for %u.%u.%u.%u -> %u.%u.%u.%u",
  77             conn, NIPQUAD(laddr), NIPQUAD(faddr));
  78 
  79         if (pos != NULL)
  80                 *pos = place;
  81 
  82         return (conn);
  83 }
  84 
  85 /*
  86  * This is called by transports as they're bringing down a connection.
  87  * It clears partial message state so that the transport can start sending
  88  * and receiving over this connection again in the future.  It is up to
  89  * the transport to have serialized this call with its send and recv.
  90  */
  91 void
  92 rdsv3_conn_reset(struct rdsv3_connection *conn)
  93 {
  94         RDSV3_DPRINTF2("rdsv3_conn_reset",
  95             "connection %u.%u.%u.%u to %u.%u.%u.%u reset",
  96             NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr));
  97 
  98         rdsv3_stats_inc(s_conn_reset);
  99         rdsv3_send_reset(conn);
 100         conn->c_flags = 0;
 101 
 102         /*
 103          * Do not clear next_rx_seq here, else we cannot distinguish
 104          * retransmitted packets from new packets, and will hand all
 105          * of them to the application. That is not consistent with the
 106          * reliability guarantees of RDS.
 107          */
 108 }
 109 
 110 /*
 111  * There is only every one 'conn' for a given pair of addresses in the
 112  * system at a time.  They contain messages to be retransmitted and so
 113  * span the lifetime of the actual underlying transport connections.
 114  *
 115  * For now they are not garbage collected once they're created.  They
 116  * are torn down as the module is removed, if ever.
 117  */
 118 static struct rdsv3_connection *
 119 __rdsv3_conn_create(uint32_be_t laddr, uint32_be_t faddr,
 120     struct rdsv3_transport *trans, int gfp, int is_outgoing)
 121 {
 122         struct rdsv3_connection *conn, *parent = NULL;
 123         avl_index_t pos;
 124         int ret;
 125 
 126         rw_enter(&rdsv3_conn_lock, RW_READER);
 127         conn = rdsv3_conn_lookup(laddr, faddr, &pos);
 128         if (conn &&
 129             conn->c_loopback &&
 130             conn->c_trans != &rdsv3_loop_transport &&
 131             !is_outgoing) {
 132                 /*
 133                  * This is a looped back IB connection, and we're
 134                  * called by the code handling the incoming connect.
 135                  * We need a second connection object into which we
 136                  * can stick the other QP.
 137                  */
 138                 parent = conn;
 139                 conn = parent->c_passive;
 140         }
 141         rw_exit(&rdsv3_conn_lock);
 142         if (conn)
 143                 goto out;
 144 
 145         RDSV3_DPRINTF2("__rdsv3_conn_create", "Enter(%x -> %x)",
 146             ntohl(laddr), ntohl(faddr));
 147 
 148         conn = kmem_cache_alloc(rdsv3_conn_slab, gfp);
 149         if (!conn) {
 150                 conn = ERR_PTR(-ENOMEM);
 151                 goto out;
 152         }
 153 
 154         /* see rdsv3_conn_constructor */
 155         conn->c_laddr = laddr;
 156         conn->c_faddr = faddr;
 157 
 158         /*
 159          * We don't allow sockets to send messages without binding.
 160          * So, the IP address will already be there in the bind array.
 161          * Mostly, this is a readonly operation.
 162          * For now, passing GLOBAL_ZONEID.
 163          */
 164         conn->c_bucketp = rdsv3_find_ip_bucket(ntohl(laddr), GLOBAL_ZONEID);
 165 
 166         ret = rdsv3_cong_get_maps(conn);
 167         if (ret) {
 168                 kmem_cache_free(rdsv3_conn_slab, conn);
 169                 conn = ERR_PTR(ret);
 170                 goto out;
 171         }
 172 
 173         /*
 174          * This is where a connection becomes loopback.  If *any* RDS sockets
 175          * can bind to the destination address then we'd rather the messages
 176          * flow through loopback rather than either transport.
 177          */
 178         if (rdsv3_trans_get_preferred(faddr)) {
 179                 conn->c_loopback = 1;
 180                 if (is_outgoing && trans->t_prefer_loopback) {
 181                         /*
 182                          * "outgoing" connection - and the transport
 183                          * says it wants the connection handled by the
 184                          * loopback transport. This is what TCP does.
 185                          */
 186                         trans = &rdsv3_loop_transport;
 187                 }
 188         }
 189 
 190         conn->c_trans = trans;
 191 
 192         ret = trans->conn_alloc(conn, gfp);
 193         if (ret) {
 194                 kmem_cache_free(rdsv3_conn_slab, conn);
 195                 conn = ERR_PTR(ret);
 196                 goto out;
 197         }
 198 
 199         conn->c_state = RDSV3_CONN_DOWN;
 200         conn->c_reconnect_jiffies = 0;
 201         RDSV3_INIT_DELAYED_WORK(&conn->c_send_w, rdsv3_send_worker);
 202         RDSV3_INIT_DELAYED_WORK(&conn->c_recv_w, rdsv3_recv_worker);
 203         RDSV3_INIT_DELAYED_WORK(&conn->c_conn_w, rdsv3_connect_worker);
 204         RDSV3_INIT_DELAYED_WORK(&conn->c_reap_w, rdsv3_reaper_worker);
 205         RDSV3_INIT_WORK(&conn->c_down_w, rdsv3_shutdown_worker);
 206         mutex_init(&conn->c_cm_lock, NULL, MUTEX_DRIVER, NULL);
 207         conn->c_flags = 0;
 208 
 209         RDSV3_DPRINTF2("__rdsv3_conn_create",
 210             "allocated conn %p for %u.%u.%u.%u -> %u.%u.%u.%u over %s %s",
 211             conn, NIPQUAD(laddr), NIPQUAD(faddr),
 212             trans->t_name ? trans->t_name : "[unknown]",
 213             is_outgoing ? "(outgoing)" : "");
 214 
 215         /*
 216          * Since we ran without holding the conn lock, someone could
 217          * have created the same conn (either normal or passive) in the
 218          * interim. We check while holding the lock. If we won, we complete
 219          * init and return our conn. If we lost, we rollback and return the
 220          * other one.
 221          */
 222         rw_enter(&rdsv3_conn_lock, RW_WRITER);
 223         if (parent) {
 224                 /* Creating passive conn */
 225                 if (parent->c_passive) {
 226                         trans->conn_free(conn->c_transport_data);
 227                         kmem_cache_free(rdsv3_conn_slab, conn);
 228                         conn = parent->c_passive;
 229                 } else {
 230                         parent->c_passive = conn;
 231                         rdsv3_cong_add_conn(conn);
 232                 }
 233         } else {
 234                 /* Creating normal conn */
 235                 struct rdsv3_connection *found;
 236 
 237                 found = rdsv3_conn_lookup(laddr, faddr, &pos);
 238                 if (found) {
 239                         trans->conn_free(conn->c_transport_data);
 240                         kmem_cache_free(rdsv3_conn_slab, conn);
 241                         conn = found;
 242                 } else {
 243                         avl_insert(&rdsv3_conn_hash, conn, pos);
 244                         rdsv3_cong_add_conn(conn);
 245                         rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_reap_w,
 246                             RDSV3_REAPER_WAIT_JIFFIES);
 247                 }
 248         }
 249 
 250         rw_exit(&rdsv3_conn_lock);
 251 
 252         RDSV3_DPRINTF2("__rdsv3_conn_create", "Return(conn: %p)", conn);
 253 
 254 out:
 255         return (conn);
 256 }
 257 
 258 struct rdsv3_connection *
 259 rdsv3_conn_create(uint32_be_t laddr, uint32_be_t faddr,
 260     struct rdsv3_transport *trans, int gfp)
 261 {
 262         return (__rdsv3_conn_create(laddr, faddr, trans, gfp, 0));
 263 }
 264 
 265 struct rdsv3_connection *
 266 rdsv3_conn_create_outgoing(uint32_be_t laddr, uint32_be_t faddr,
 267     struct rdsv3_transport *trans, int gfp)
 268 {
 269         return (__rdsv3_conn_create(laddr, faddr, trans, gfp, 1));
 270 }
 271 
 272 extern struct avl_tree  rdsv3_conn_hash;
 273 
 274 void
 275 rdsv3_conn_shutdown(struct rdsv3_connection *conn)
 276 {
 277         RDSV3_DPRINTF2("rdsv3_conn_shutdown", "Enter(conn: %p)", conn);
 278 
 279         /* shut it down unless it's down already */
 280         if (!rdsv3_conn_transition(conn, RDSV3_CONN_DOWN, RDSV3_CONN_DOWN)) {
 281                 /*
 282                  * Quiesce the connection mgmt handlers before we start tearing
 283                  * things down. We don't hold the mutex for the entire
 284                  * duration of the shutdown operation, else we may be
 285                  * deadlocking with the CM handler. Instead, the CM event
 286                  * handler is supposed to check for state DISCONNECTING
 287                  */
 288                 mutex_enter(&conn->c_cm_lock);
 289                 if (!rdsv3_conn_transition(conn, RDSV3_CONN_UP,
 290                     RDSV3_CONN_DISCONNECTING) &&
 291                     !rdsv3_conn_transition(conn, RDSV3_CONN_ERROR,
 292                     RDSV3_CONN_DISCONNECTING)) {
 293                         RDSV3_DPRINTF2("rdsv3_conn_shutdown",
 294                             "shutdown called in state %d",
 295                             atomic_get(&conn->c_state));
 296                         rdsv3_conn_drop(conn);
 297                         mutex_exit(&conn->c_cm_lock);
 298                         return;
 299                 }
 300                 mutex_exit(&conn->c_cm_lock);
 301 
 302                 /* verify everybody's out of rds_send_xmit() */
 303                 mutex_enter(&conn->c_send_lock);
 304                 while (atomic_get(&conn->c_senders)) {
 305                         mutex_exit(&conn->c_send_lock);
 306                         delay(1);
 307                         mutex_enter(&conn->c_send_lock);
 308                 }
 309 
 310                 conn->c_trans->conn_shutdown(conn);
 311                 rdsv3_conn_reset(conn);
 312                 mutex_exit(&conn->c_send_lock);
 313 
 314                 if (!rdsv3_conn_transition(conn, RDSV3_CONN_DISCONNECTING,
 315                     RDSV3_CONN_DOWN)) {
 316                         /*
 317                          * This can happen - eg when we're in the middle of
 318                          * tearing down the connection, and someone unloads
 319                          * the rds module.
 320                          * Quite reproduceable with loopback connections.
 321                          * Mostly harmless.
 322                          */
 323                         RDSV3_DPRINTF2("rdsv3_conn_shutdown",
 324                             "failed to transition to state DOWN, "
 325                             "current statis is: %d",
 326                             atomic_get(&conn->c_state));
 327                         rdsv3_conn_drop(conn);
 328                         return;
 329                 }
 330         }
 331 
 332         /*
 333          * Then reconnect if it's still live.
 334          * The passive side of an IB loopback connection is never added
 335          * to the conn hash, so we never trigger a reconnect on this
 336          * conn - the reconnect is always triggered by the active peer.
 337          */
 338         rdsv3_cancel_delayed_work(&conn->c_conn_w);
 339 
 340         {
 341                 struct rdsv3_conn_info_s conn_info;
 342 
 343                 conn_info.c_laddr = conn->c_laddr;
 344                 conn_info.c_faddr = conn->c_faddr;
 345                 if (avl_find(&rdsv3_conn_hash, &conn_info, NULL) == conn)
 346                         rdsv3_queue_reconnect(conn);
 347         }
 348         RDSV3_DPRINTF2("rdsv3_conn_shutdown", "Exit");
 349 }
 350 
 351 /*
 352  * Stop and free a connection.
 353  */
 354 void
 355 rdsv3_conn_destroy(struct rdsv3_connection *conn)
 356 {
 357         struct rdsv3_message *rm, *rtmp;
 358         list_t to_be_dropped;
 359 
 360         RDSV3_DPRINTF4("rdsv3_conn_destroy",
 361             "freeing conn %p for %u.%u.%u.%u -> %u.%u.%u.%u",
 362             conn, NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr));
 363 
 364         avl_remove(&rdsv3_conn_hash, conn);
 365 
 366         rdsv3_cancel_delayed_work(&conn->c_reap_w);
 367         rdsv3_cancel_delayed_work(&conn->c_send_w);
 368         rdsv3_cancel_delayed_work(&conn->c_recv_w);
 369 
 370         rdsv3_conn_shutdown(conn);
 371 
 372         /* tear down queued messages */
 373 
 374         list_create(&to_be_dropped, sizeof (struct rdsv3_message),
 375             offsetof(struct rdsv3_message, m_conn_item));
 376 
 377         RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &conn->c_retrans, m_conn_item) {
 378                 list_remove_node(&rm->m_conn_item);
 379                 list_insert_tail(&to_be_dropped, rm);
 380         }
 381 
 382         RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &conn->c_send_queue,
 383             m_conn_item) {
 384                 list_remove_node(&rm->m_conn_item);
 385                 list_insert_tail(&to_be_dropped, rm);
 386         }
 387 
 388         RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &to_be_dropped, m_conn_item) {
 389                 clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
 390                 list_remove_node(&rm->m_conn_item);
 391                 rdsv3_message_put(rm);
 392         }
 393 
 394         if (conn->c_xmit_rm)
 395                 rdsv3_message_put(conn->c_xmit_rm);
 396 
 397         conn->c_trans->conn_free(conn->c_transport_data);
 398 
 399         /*
 400          * The congestion maps aren't freed up here.  They're
 401          * freed by rdsv3_cong_exit() after all the connections
 402          * have been freed.
 403          */
 404         rdsv3_cong_remove_conn(conn);
 405 
 406         ASSERT(list_is_empty(&conn->c_retrans));
 407         kmem_cache_free(rdsv3_conn_slab, conn);
 408 
 409 }
 410 
 411 /* ARGSUSED */
 412 static void
 413 rdsv3_conn_message_info(struct rsock *sock, unsigned int len,
 414     struct rdsv3_info_iterator *iter,
 415     struct rdsv3_info_lengths *lens,
 416     int want_send)
 417 {
 418         struct list *list;
 419         struct rdsv3_connection *conn;
 420         struct rdsv3_message *rm;
 421         unsigned int total = 0;
 422 
 423         RDSV3_DPRINTF4("rdsv3_conn_message_info", "Enter");
 424 
 425         len /= sizeof (struct rds_info_message);
 426 
 427         rw_enter(&rdsv3_conn_lock, RW_READER);
 428 
 429         if (avl_is_empty(&rdsv3_conn_hash)) {
 430                 /* no connections */
 431                 rw_exit(&rdsv3_conn_lock);
 432                 return;
 433         }
 434 
 435         conn = (struct rdsv3_connection *)avl_first(&rdsv3_conn_hash);
 436 
 437         do {
 438                 if (want_send)
 439                         list = &conn->c_send_queue;
 440                 else
 441                         list = &conn->c_retrans;
 442 
 443                 mutex_enter(&conn->c_lock);
 444 
 445                 /* XXX too lazy to maintain counts.. */
 446                 RDSV3_FOR_EACH_LIST_NODE(rm, list, m_conn_item) {
 447                         total++;
 448                         if (total <= len)
 449                                 rdsv3_inc_info_copy(&rm->m_inc, iter,
 450                                     conn->c_laddr, conn->c_faddr, 0);
 451                 }
 452 
 453                 mutex_exit(&conn->c_lock);
 454 
 455                 conn = AVL_NEXT(&rdsv3_conn_hash, conn);
 456         } while (conn != NULL);
 457         rw_exit(&rdsv3_conn_lock);
 458 
 459         lens->nr = total;
 460         lens->each = sizeof (struct rds_info_message);
 461 
 462         RDSV3_DPRINTF4("rdsv3_conn_message_info", "Return");
 463 }
 464 
 465 static void
 466 rdsv3_conn_message_info_send(struct rsock *sock, unsigned int len,
 467     struct rdsv3_info_iterator *iter,
 468     struct rdsv3_info_lengths *lens)
 469 {
 470         rdsv3_conn_message_info(sock, len, iter, lens, 1);
 471 }
 472 
 473 static void
 474 rdsv3_conn_message_info_retrans(struct rsock *sock,
 475     unsigned int len,
 476     struct rdsv3_info_iterator *iter,
 477     struct rdsv3_info_lengths *lens)
 478 {
 479         rdsv3_conn_message_info(sock, len, iter, lens, 0);
 480 }
 481 
 482 /* ARGSUSED */
 483 void
 484 rdsv3_for_each_conn_info(struct rsock *sock, unsigned int len,
 485     struct rdsv3_info_iterator *iter,
 486     struct rdsv3_info_lengths *lens,
 487     int (*visitor)(struct rdsv3_connection *, void *),
 488     size_t item_len)
 489 {
 490         uint8_t *buffer;
 491         struct rdsv3_connection *conn;
 492 
 493         rw_enter(&rdsv3_conn_lock, RW_READER);
 494 
 495         lens->nr = 0;
 496         lens->each = item_len;
 497 
 498         if (avl_is_empty(&rdsv3_conn_hash)) {
 499                 /* no connections */
 500                 rw_exit(&rdsv3_conn_lock);
 501                 return;
 502         }
 503 
 504         /* allocate a little extra as this can get cast to a uint64_t */
 505         buffer = kmem_zalloc(item_len + 8, KM_SLEEP);
 506 
 507         conn = (struct rdsv3_connection *)avl_first(&rdsv3_conn_hash);
 508 
 509         do {
 510                 /* XXX no c_lock usage.. */
 511                 if (visitor(conn, buffer)) {
 512                         /*
 513                          * We copy as much as we can fit in the buffer,
 514                          * but we count all items so that the caller
 515                          * can resize the buffer.
 516                          */
 517                         if (len >= item_len) {
 518                                 RDSV3_DPRINTF4("rdsv3_for_each_conn_info",
 519                                     "buffer: %p iter: %p bytes: %d", buffer,
 520                                     iter->addr + iter->offset, item_len);
 521                                 rdsv3_info_copy(iter, buffer, item_len);
 522                                 len -= item_len;
 523                         }
 524                         lens->nr++;
 525                 }
 526                 conn = AVL_NEXT(&rdsv3_conn_hash, conn);
 527         } while (conn != NULL);
 528         rw_exit(&rdsv3_conn_lock);
 529 
 530         kmem_free(buffer, item_len + 8);
 531 }
 532 
 533 static int
 534 rdsv3_conn_info_visitor(struct rdsv3_connection *conn, void *buffer)
 535 {
 536         struct rds_info_connection *cinfo = buffer;
 537 
 538         cinfo->next_tx_seq = conn->c_next_tx_seq;
 539         cinfo->next_rx_seq = conn->c_next_rx_seq;
 540         cinfo->laddr = conn->c_laddr;
 541         cinfo->faddr = conn->c_faddr;
 542         (void) strncpy((char *)cinfo->transport, conn->c_trans->t_name,
 543             sizeof (cinfo->transport));
 544         cinfo->flags = 0;
 545 
 546         rdsv3_conn_info_set(cinfo->flags,
 547             MUTEX_HELD(&conn->c_send_lock), SENDING);
 548 
 549         /* XXX Future: return the state rather than these funky bits */
 550         rdsv3_conn_info_set(cinfo->flags,
 551             atomic_get(&conn->c_state) == RDSV3_CONN_CONNECTING,
 552             CONNECTING);
 553         rdsv3_conn_info_set(cinfo->flags,
 554             atomic_get(&conn->c_state) == RDSV3_CONN_UP,
 555             CONNECTED);
 556         return (1);
 557 }
 558 
 559 static void
 560 rdsv3_conn_info(struct rsock *sock, unsigned int len,
 561     struct rdsv3_info_iterator *iter, struct rdsv3_info_lengths *lens)
 562 {
 563         rdsv3_for_each_conn_info(sock, len, iter, lens,
 564             rdsv3_conn_info_visitor, sizeof (struct rds_info_connection));
 565 }
 566 
 567 int
 568 rdsv3_conn_init()
 569 {
 570         RDSV3_DPRINTF4("rdsv3_conn_init", "Enter");
 571 
 572         rdsv3_conn_slab = kmem_cache_create("rdsv3_connection",
 573             sizeof (struct rdsv3_connection), 0, rdsv3_conn_constructor,
 574             rdsv3_conn_destructor, NULL, NULL, NULL, 0);
 575         if (!rdsv3_conn_slab) {
 576                 RDSV3_DPRINTF2("rdsv3_conn_init",
 577                     "kmem_cache_create(rdsv3_conn_slab) failed");
 578                 return (-ENOMEM);
 579         }
 580 
 581         avl_create(&rdsv3_conn_hash, rdsv3_conn_compare,
 582             sizeof (struct rdsv3_connection), offsetof(struct rdsv3_connection,
 583             c_hash_node));
 584 
 585         rw_init(&rdsv3_conn_lock, NULL, RW_DRIVER, NULL);
 586 
 587         rdsv3_loop_init();
 588 
 589         rdsv3_info_register_func(RDS_INFO_CONNECTIONS, rdsv3_conn_info);
 590         rdsv3_info_register_func(RDS_INFO_SEND_MESSAGES,
 591             rdsv3_conn_message_info_send);
 592         rdsv3_info_register_func(RDS_INFO_RETRANS_MESSAGES,
 593             rdsv3_conn_message_info_retrans);
 594 
 595         RDSV3_DPRINTF4("rdsv3_conn_init", "Return");
 596 
 597         return (0);
 598 }
 599 
 600 void
 601 rdsv3_conn_exit()
 602 {
 603         RDSV3_DPRINTF4("rdsv3_conn_exit", "Enter");
 604 
 605         rdsv3_loop_exit();
 606 
 607         rw_destroy(&rdsv3_conn_lock);
 608         avl_destroy(&rdsv3_conn_hash);
 609 
 610         ASSERT(rdsv3_conn_slab);
 611         kmem_cache_destroy(rdsv3_conn_slab);
 612 
 613         RDSV3_DPRINTF4("rdsv3_conn_exit", "Return");
 614 }
 615 
 616 /*
 617  * Force a disconnect
 618  */
 619 void
 620 rdsv3_conn_drop(struct rdsv3_connection *conn)
 621 {
 622         conn->c_state = RDSV3_CONN_ERROR;
 623         rdsv3_queue_work(rdsv3_wq, &conn->c_down_w);
 624 }