1 /*
   2  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
   3  */
   4 
   5 /*
   6  * This file contains code imported from the OFED rds source file connection.c
   7  * Oracle elects to have and use the contents of connection.c under and governed
   8  * by the OpenIB.org BSD license (see below for full license text). However,
   9  * the following notice accompanied the original version of this file:
  10  */
  11 
  12 /*
  13  * Copyright (c) 2006 Oracle.  All rights reserved.
  14  *
  15  * This software is available to you under a choice of one of two
  16  * licenses.  You may choose to be licensed under the terms of the GNU
  17  * General Public License (GPL) Version 2, available from the file
  18  * COPYING in the main directory of this source tree, or the
  19  * OpenIB.org BSD license below:
  20  *
  21  *     Redistribution and use in source and binary forms, with or
  22  *     without modification, are permitted provided that the following
  23  *     conditions are met:
  24  *
  25  *      - Redistributions of source code must retain the above
  26  *        copyright notice, this list of conditions and the following
  27  *        disclaimer.
  28  *
  29  *      - Redistributions in binary form must reproduce the above
  30  *        copyright notice, this list of conditions and the following
  31  *        disclaimer in the documentation and/or other materials
  32  *        provided with the distribution.
  33  *
  34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  35  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  36  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  37  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  38  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  39  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  40  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  41  * SOFTWARE.
  42  *
  43  */
  44 #include <sys/types.h>
  45 #include <sys/kmem.h>
  46 #include <sys/rds.h>
  47 
  48 #include <sys/ib/clients/rdsv3/rdsv3.h>
  49 #include <sys/ib/clients/rdsv3/loop.h>
  50 #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
  51 
  52 /* converting this to RCU is a chore for another day.. */
  53 static krwlock_t rdsv3_conn_lock;
  54 struct avl_tree rdsv3_conn_hash;
  55 static struct kmem_cache *rdsv3_conn_slab = NULL;
  56 
  57 #define rdsv3_conn_info_set(var, test, suffix) do {               \
  58         if (test)                                               \
  59                 var |= RDS_INFO_CONNECTION_FLAG_##suffix;     \
  60 } while (0)
  61 
  62 
  63 static struct rdsv3_connection *
  64 rdsv3_conn_lookup(uint32_be_t laddr, uint32_be_t faddr, avl_index_t *pos)
  65 {
  66         struct rdsv3_connection *conn;
  67         struct rdsv3_conn_info_s conn_info;
  68         avl_index_t place = 0;
  69 
  70         conn_info.c_laddr = laddr;
  71         conn_info.c_faddr = faddr;
  72 
  73         conn = avl_find(&rdsv3_conn_hash, &conn_info, &place);
  74 
  75         RDSV3_DPRINTF5("rdsv3_conn_lookup",
  76             "returning conn %p for %u.%u.%u.%u -> %u.%u.%u.%u",
  77             conn, NIPQUAD(laddr), NIPQUAD(faddr));
  78 
  79         if (pos != NULL)
  80                 *pos = place;
  81 
  82         return (conn);
  83 }
  84 
  85 /*
  86  * This is called by transports as they're bringing down a connection.
  87  * It clears partial message state so that the transport can start sending
  88  * and receiving over this connection again in the future.  It is up to
  89  * the transport to have serialized this call with its send and recv.
  90  */
  91 void
  92 rdsv3_conn_reset(struct rdsv3_connection *conn)
  93 {
  94         RDSV3_DPRINTF2("rdsv3_conn_reset",
  95             "connection %u.%u.%u.%u to %u.%u.%u.%u reset",
  96             NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr));
  97 
  98         rdsv3_stats_inc(s_conn_reset);
  99         rdsv3_send_reset(conn);
 100         conn->c_flags = 0;
 101 
 102         /*
 103          * Do not clear next_rx_seq here, else we cannot distinguish
 104          * retransmitted packets from new packets, and will hand all
 105          * of them to the application. That is not consistent with the
 106          * reliability guarantees of RDS.
 107          */
 108 }
 109 
 110 /*
 111  * There is only every one 'conn' for a given pair of addresses in the
 112  * system at a time.  They contain messages to be retransmitted and so
 113  * span the lifetime of the actual underlying transport connections.
 114  *
 115  * For now they are not garbage collected once they're created.  They
 116  * are torn down as the module is removed, if ever.
 117  */
 118 static struct rdsv3_connection *
 119 __rdsv3_conn_create(uint32_be_t laddr, uint32_be_t faddr,
 120     struct rdsv3_transport *trans, int gfp, int is_outgoing)
 121 {
 122         struct rdsv3_connection *conn, *parent = NULL;
 123         avl_index_t pos;
 124         int ret;
 125 
 126         rw_enter(&rdsv3_conn_lock, RW_READER);
 127         conn = rdsv3_conn_lookup(laddr, faddr, &pos);
 128         if (conn &&
 129             conn->c_loopback &&
 130             conn->c_trans != &rdsv3_loop_transport &&
 131             !is_outgoing) {
 132                 /*
 133                  * This is a looped back IB connection, and we're
 134                  * called by the code handling the incoming connect.
 135                  * We need a second connection object into which we
 136                  * can stick the other QP.
 137                  */
 138                 parent = conn;
 139                 conn = parent->c_passive;
 140         }
 141         rw_exit(&rdsv3_conn_lock);
 142         if (conn)
 143                 goto out;
 144 
 145         RDSV3_DPRINTF2("__rdsv3_conn_create", "Enter(%x -> %x)",
 146             ntohl(laddr), ntohl(faddr));
 147 
 148         conn = kmem_cache_alloc(rdsv3_conn_slab, gfp);
 149         if (!conn) {
 150                 conn = ERR_PTR(-ENOMEM);
 151                 goto out;
 152         }
 153 
 154         /* see rdsv3_conn_constructor */
 155         conn->c_laddr = laddr;
 156         conn->c_faddr = faddr;
 157 
 158         /*
 159          * We don't allow sockets to send messages without binding.
 160          * So, the IP address will already be there in the bind array.
 161          * Mostly, this is a readonly operation.
 162          * For now, passing GLOBAL_ZONEID.
 163          */
 164         conn->c_bucketp = rdsv3_find_ip_bucket(ntohl(laddr), GLOBAL_ZONEID);
 165 
 166         ret = rdsv3_cong_get_maps(conn);
 167         if (ret) {
 168                 kmem_cache_free(rdsv3_conn_slab, conn);
 169                 conn = ERR_PTR(ret);
 170                 goto out;
 171         }
 172 
 173         /*
 174          * This is where a connection becomes loopback.  If *any* RDS sockets
 175          * can bind to the destination address then we'd rather the messages
 176          * flow through loopback rather than either transport.
 177          */
 178         if (rdsv3_trans_get_preferred(faddr)) {
 179                 conn->c_loopback = 1;
 180                 if (is_outgoing && trans->t_prefer_loopback) {
 181                         /*
 182                          * "outgoing" connection - and the transport
 183                          * says it wants the connection handled by the
 184                          * loopback transport. This is what TCP does.
 185                          */
 186                         trans = &rdsv3_loop_transport;
 187                 }
 188         }
 189 
 190         conn->c_trans = trans;
 191 
 192         ret = trans->conn_alloc(conn, gfp);
 193         if (ret) {
 194                 kmem_cache_free(rdsv3_conn_slab, conn);
 195                 conn = ERR_PTR(ret);
 196                 goto out;
 197         }
 198 
 199         conn->c_state = RDSV3_CONN_DOWN;
 200         conn->c_reconnect_jiffies = 0;
 201         RDSV3_INIT_DELAYED_WORK(&conn->c_send_w, rdsv3_send_worker);
 202         RDSV3_INIT_DELAYED_WORK(&conn->c_recv_w, rdsv3_recv_worker);
 203         RDSV3_INIT_DELAYED_WORK(&conn->c_conn_w, rdsv3_connect_worker);
 204         RDSV3_INIT_DELAYED_WORK(&conn->c_reap_w, rdsv3_reaper_worker);
 205         RDSV3_INIT_WORK(&conn->c_down_w, rdsv3_shutdown_worker);
 206         mutex_init(&conn->c_cm_lock, NULL, MUTEX_DRIVER, NULL);
 207         conn->c_flags = 0;
 208 
 209         RDSV3_DPRINTF2("__rdsv3_conn_create",
 210             "allocated conn %p for %u.%u.%u.%u -> %u.%u.%u.%u over %s %s",
 211             conn, NIPQUAD(laddr), NIPQUAD(faddr),
 212             trans->t_name ? trans->t_name : "[unknown]",
 213             is_outgoing ? "(outgoing)" : "");
 214 
 215         /*
 216          * Since we ran without holding the conn lock, someone could
 217          * have created the same conn (either normal or passive) in the
 218          * interim. We check while holding the lock. If we won, we complete
 219          * init and return our conn. If we lost, we rollback and return the
 220          * other one.
 221          */
 222         rw_enter(&rdsv3_conn_lock, RW_WRITER);
 223         if (parent) {
 224                 /* Creating passive conn */
 225                 if (parent->c_passive) {
 226                         trans->conn_free(conn->c_transport_data);
 227                         kmem_cache_free(rdsv3_conn_slab, conn);
 228                         conn = parent->c_passive;
 229                 } else {
 230                         parent->c_passive = conn;
 231                         rdsv3_cong_add_conn(conn);
 232                 }
 233         } else {
 234                 /* Creating normal conn */
 235                 struct rdsv3_connection *found;
 236 
 237                 found = rdsv3_conn_lookup(laddr, faddr, &pos);
 238                 if (found) {
 239                         trans->conn_free(conn->c_transport_data);
 240                         kmem_cache_free(rdsv3_conn_slab, conn);
 241                         conn = found;
 242                 } else {
 243                         avl_insert(&rdsv3_conn_hash, conn, pos);
 244                         rdsv3_cong_add_conn(conn);
 245                         rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_reap_w,
 246                             RDSV3_REAPER_WAIT_JIFFIES);
 247                 }
 248         }
 249 
 250         rw_exit(&rdsv3_conn_lock);
 251 
 252         RDSV3_DPRINTF2("__rdsv3_conn_create", "Return(conn: %p)", conn);
 253 
 254 out:
 255         return (conn);
 256 }
 257 
 258 struct rdsv3_connection *
 259 rdsv3_conn_create(uint32_be_t laddr, uint32_be_t faddr,
 260     struct rdsv3_transport *trans, int gfp)
 261 {
 262         return (__rdsv3_conn_create(laddr, faddr, trans, gfp, 0));
 263 }
 264 
 265 struct rdsv3_connection *
 266 rdsv3_conn_create_outgoing(uint32_be_t laddr, uint32_be_t faddr,
 267     struct rdsv3_transport *trans, int gfp)
 268 {
 269         return (__rdsv3_conn_create(laddr, faddr, trans, gfp, 1));
 270 }
 271 
 272 extern struct avl_tree  rdsv3_conn_hash;
 273 
 274 void
 275 rdsv3_conn_shutdown(struct rdsv3_connection *conn)
 276 {
 277         RDSV3_DPRINTF2("rdsv3_conn_shutdown", "Enter(conn: %p)", conn);
 278 
 279         /* shut it down unless it's down already */
 280         if (!rdsv3_conn_transition(conn, RDSV3_CONN_DOWN, RDSV3_CONN_DOWN)) {
 281                 /*
 282                  * Quiesce the connection mgmt handlers before we start tearing
 283                  * things down. We don't hold the mutex for the entire
 284                  * duration of the shutdown operation, else we may be
 285                  * deadlocking with the CM handler. Instead, the CM event
 286                  * handler is supposed to check for state DISCONNECTING
 287                  */
 288                 mutex_enter(&conn->c_cm_lock);
 289                 if (!rdsv3_conn_transition(conn, RDSV3_CONN_UP,
 290                     RDSV3_CONN_DISCONNECTING) &&
 291                     !rdsv3_conn_transition(conn, RDSV3_CONN_ERROR,
 292                     RDSV3_CONN_DISCONNECTING)) {
 293                         RDSV3_DPRINTF2("rdsv3_conn_shutdown",
 294                             "shutdown called in state %d",
 295                             atomic_get(&conn->c_state));
 296                         rdsv3_conn_drop(conn);
 297                         mutex_exit(&conn->c_cm_lock);
 298                         return;
 299                 }
 300                 mutex_exit(&conn->c_cm_lock);
 301 
 302                 /* verify everybody's out of rds_send_xmit() */
 303                 mutex_enter(&conn->c_send_lock);
 304                 while (atomic_get(&conn->c_senders)) {
 305                         mutex_exit(&conn->c_send_lock);
 306                         delay(1);
 307                         mutex_enter(&conn->c_send_lock);
 308                 }
 309 
 310                 conn->c_trans->conn_shutdown(conn);
 311                 rdsv3_conn_reset(conn);
 312                 mutex_exit(&conn->c_send_lock);
 313 
 314                 if (!rdsv3_conn_transition(conn, RDSV3_CONN_DISCONNECTING,
 315                     RDSV3_CONN_DOWN)) {
 316                         /*
 317                          * This can happen - eg when we're in the middle of
 318                          * tearing down the connection, and someone unloads
 319                          * the rds module.
 320                          * Quite reproduceable with loopback connections.
 321                          * Mostly harmless.
 322                          */
 323 #ifndef __lock_lint
 324                         RDSV3_DPRINTF2("rdsv3_conn_shutdown",
 325                             "failed to transition to state DOWN, "
 326                             "current statis is: %d",
 327                             atomic_get(&conn->c_state));
 328                         rdsv3_conn_drop(conn);
 329 #endif
 330                         return;
 331                 }
 332         }
 333 
 334         /*
 335          * Then reconnect if it's still live.
 336          * The passive side of an IB loopback connection is never added
 337          * to the conn hash, so we never trigger a reconnect on this
 338          * conn - the reconnect is always triggered by the active peer.
 339          */
 340         rdsv3_cancel_delayed_work(&conn->c_conn_w);
 341 
 342         {
 343                 struct rdsv3_conn_info_s conn_info;
 344 
 345                 conn_info.c_laddr = conn->c_laddr;
 346                 conn_info.c_faddr = conn->c_faddr;
 347                 if (avl_find(&rdsv3_conn_hash, &conn_info, NULL) == conn)
 348                         rdsv3_queue_reconnect(conn);
 349         }
 350         RDSV3_DPRINTF2("rdsv3_conn_shutdown", "Exit");
 351 }
 352 
 353 /*
 354  * Stop and free a connection.
 355  */
 356 void
 357 rdsv3_conn_destroy(struct rdsv3_connection *conn)
 358 {
 359         struct rdsv3_message *rm, *rtmp;
 360         list_t to_be_dropped;
 361 
 362         RDSV3_DPRINTF4("rdsv3_conn_destroy",
 363             "freeing conn %p for %u.%u.%u.%u -> %u.%u.%u.%u",
 364             conn, NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr));
 365 
 366         avl_remove(&rdsv3_conn_hash, conn);
 367 
 368         rdsv3_cancel_delayed_work(&conn->c_reap_w);
 369         rdsv3_cancel_delayed_work(&conn->c_send_w);
 370         rdsv3_cancel_delayed_work(&conn->c_recv_w);
 371 
 372         rdsv3_conn_shutdown(conn);
 373 
 374         /* tear down queued messages */
 375 
 376         list_create(&to_be_dropped, sizeof (struct rdsv3_message),
 377             offsetof(struct rdsv3_message, m_conn_item));
 378 
 379         RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &conn->c_retrans, m_conn_item) {
 380                 list_remove_node(&rm->m_conn_item);
 381                 list_insert_tail(&to_be_dropped, rm);
 382         }
 383 
 384         RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &conn->c_send_queue,
 385             m_conn_item) {
 386                 list_remove_node(&rm->m_conn_item);
 387                 list_insert_tail(&to_be_dropped, rm);
 388         }
 389 
 390         RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, rtmp, &to_be_dropped, m_conn_item) {
 391                 clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
 392                 list_remove_node(&rm->m_conn_item);
 393                 rdsv3_message_put(rm);
 394         }
 395 
 396         if (conn->c_xmit_rm)
 397                 rdsv3_message_put(conn->c_xmit_rm);
 398 
 399         conn->c_trans->conn_free(conn->c_transport_data);
 400 
 401         /*
 402          * The congestion maps aren't freed up here.  They're
 403          * freed by rdsv3_cong_exit() after all the connections
 404          * have been freed.
 405          */
 406         rdsv3_cong_remove_conn(conn);
 407 
 408         ASSERT(list_is_empty(&conn->c_retrans));
 409         kmem_cache_free(rdsv3_conn_slab, conn);
 410 
 411 }
 412 
 413 /* ARGSUSED */
 414 static void
 415 rdsv3_conn_message_info(struct rsock *sock, unsigned int len,
 416     struct rdsv3_info_iterator *iter,
 417     struct rdsv3_info_lengths *lens,
 418     int want_send)
 419 {
 420         struct list *list;
 421         struct rdsv3_connection *conn;
 422         struct rdsv3_message *rm;
 423         unsigned int total = 0;
 424 
 425         RDSV3_DPRINTF4("rdsv3_conn_message_info", "Enter");
 426 
 427         len /= sizeof (struct rds_info_message);
 428 
 429         rw_enter(&rdsv3_conn_lock, RW_READER);
 430 
 431         if (avl_is_empty(&rdsv3_conn_hash)) {
 432                 /* no connections */
 433                 rw_exit(&rdsv3_conn_lock);
 434                 return;
 435         }
 436 
 437         conn = (struct rdsv3_connection *)avl_first(&rdsv3_conn_hash);
 438 
 439         do {
 440                 if (want_send)
 441                         list = &conn->c_send_queue;
 442                 else
 443                         list = &conn->c_retrans;
 444 
 445                 mutex_enter(&conn->c_lock);
 446 
 447                 /* XXX too lazy to maintain counts.. */
 448                 RDSV3_FOR_EACH_LIST_NODE(rm, list, m_conn_item) {
 449                         total++;
 450                         if (total <= len)
 451                                 rdsv3_inc_info_copy(&rm->m_inc, iter,
 452                                     conn->c_laddr, conn->c_faddr, 0);
 453                 }
 454 
 455                 mutex_exit(&conn->c_lock);
 456 
 457                 conn = AVL_NEXT(&rdsv3_conn_hash, conn);
 458         } while (conn != NULL);
 459         rw_exit(&rdsv3_conn_lock);
 460 
 461         lens->nr = total;
 462         lens->each = sizeof (struct rds_info_message);
 463 
 464         RDSV3_DPRINTF4("rdsv3_conn_message_info", "Return");
 465 }
 466 
 467 static void
 468 rdsv3_conn_message_info_send(struct rsock *sock, unsigned int len,
 469     struct rdsv3_info_iterator *iter,
 470     struct rdsv3_info_lengths *lens)
 471 {
 472         rdsv3_conn_message_info(sock, len, iter, lens, 1);
 473 }
 474 
 475 static void
 476 rdsv3_conn_message_info_retrans(struct rsock *sock,
 477     unsigned int len,
 478     struct rdsv3_info_iterator *iter,
 479     struct rdsv3_info_lengths *lens)
 480 {
 481         rdsv3_conn_message_info(sock, len, iter, lens, 0);
 482 }
 483 
 484 /* ARGSUSED */
 485 void
 486 rdsv3_for_each_conn_info(struct rsock *sock, unsigned int len,
 487     struct rdsv3_info_iterator *iter,
 488     struct rdsv3_info_lengths *lens,
 489     int (*visitor)(struct rdsv3_connection *, void *),
 490     size_t item_len)
 491 {
 492         uint8_t *buffer;
 493         struct rdsv3_connection *conn;
 494 
 495         rw_enter(&rdsv3_conn_lock, RW_READER);
 496 
 497         lens->nr = 0;
 498         lens->each = item_len;
 499 
 500         if (avl_is_empty(&rdsv3_conn_hash)) {
 501                 /* no connections */
 502                 rw_exit(&rdsv3_conn_lock);
 503                 return;
 504         }
 505 
 506         /* allocate a little extra as this can get cast to a uint64_t */
 507         buffer = kmem_zalloc(item_len + 8, KM_SLEEP);
 508 
 509         conn = (struct rdsv3_connection *)avl_first(&rdsv3_conn_hash);
 510 
 511         do {
 512                 /* XXX no c_lock usage.. */
 513                 if (visitor(conn, buffer)) {
 514                         /*
 515                          * We copy as much as we can fit in the buffer,
 516                          * but we count all items so that the caller
 517                          * can resize the buffer.
 518                          */
 519                         if (len >= item_len) {
 520                                 RDSV3_DPRINTF4("rdsv3_for_each_conn_info",
 521                                     "buffer: %p iter: %p bytes: %d", buffer,
 522                                     iter->addr + iter->offset, item_len);
 523                                 rdsv3_info_copy(iter, buffer, item_len);
 524                                 len -= item_len;
 525                         }
 526                         lens->nr++;
 527                 }
 528                 conn = AVL_NEXT(&rdsv3_conn_hash, conn);
 529         } while (conn != NULL);
 530         rw_exit(&rdsv3_conn_lock);
 531 
 532         kmem_free(buffer, item_len + 8);
 533 }
 534 
 535 static int
 536 rdsv3_conn_info_visitor(struct rdsv3_connection *conn, void *buffer)
 537 {
 538         struct rds_info_connection *cinfo = buffer;
 539 
 540         cinfo->next_tx_seq = conn->c_next_tx_seq;
 541         cinfo->next_rx_seq = conn->c_next_rx_seq;
 542         cinfo->laddr = conn->c_laddr;
 543         cinfo->faddr = conn->c_faddr;
 544         (void) strncpy((char *)cinfo->transport, conn->c_trans->t_name,
 545             sizeof (cinfo->transport));
 546         cinfo->flags = 0;
 547 
 548         rdsv3_conn_info_set(cinfo->flags,
 549             MUTEX_HELD(&conn->c_send_lock), SENDING);
 550 
 551         /* XXX Future: return the state rather than these funky bits */
 552         rdsv3_conn_info_set(cinfo->flags,
 553             atomic_get(&conn->c_state) == RDSV3_CONN_CONNECTING,
 554             CONNECTING);
 555         rdsv3_conn_info_set(cinfo->flags,
 556             atomic_get(&conn->c_state) == RDSV3_CONN_UP,
 557             CONNECTED);
 558         return (1);
 559 }
 560 
 561 static void
 562 rdsv3_conn_info(struct rsock *sock, unsigned int len,
 563     struct rdsv3_info_iterator *iter, struct rdsv3_info_lengths *lens)
 564 {
 565         rdsv3_for_each_conn_info(sock, len, iter, lens,
 566             rdsv3_conn_info_visitor, sizeof (struct rds_info_connection));
 567 }
 568 
 569 int
 570 rdsv3_conn_init()
 571 {
 572         RDSV3_DPRINTF4("rdsv3_conn_init", "Enter");
 573 
 574         rdsv3_conn_slab = kmem_cache_create("rdsv3_connection",
 575             sizeof (struct rdsv3_connection), 0, rdsv3_conn_constructor,
 576             rdsv3_conn_destructor, NULL, NULL, NULL, 0);
 577         if (!rdsv3_conn_slab) {
 578                 RDSV3_DPRINTF2("rdsv3_conn_init",
 579                     "kmem_cache_create(rdsv3_conn_slab) failed");
 580                 return (-ENOMEM);
 581         }
 582 
 583         avl_create(&rdsv3_conn_hash, rdsv3_conn_compare,
 584             sizeof (struct rdsv3_connection), offsetof(struct rdsv3_connection,
 585             c_hash_node));
 586 
 587         rw_init(&rdsv3_conn_lock, NULL, RW_DRIVER, NULL);
 588 
 589         rdsv3_loop_init();
 590 
 591         rdsv3_info_register_func(RDS_INFO_CONNECTIONS, rdsv3_conn_info);
 592         rdsv3_info_register_func(RDS_INFO_SEND_MESSAGES,
 593             rdsv3_conn_message_info_send);
 594         rdsv3_info_register_func(RDS_INFO_RETRANS_MESSAGES,
 595             rdsv3_conn_message_info_retrans);
 596 
 597         RDSV3_DPRINTF4("rdsv3_conn_init", "Return");
 598 
 599         return (0);
 600 }
 601 
 602 void
 603 rdsv3_conn_exit()
 604 {
 605         RDSV3_DPRINTF4("rdsv3_conn_exit", "Enter");
 606 
 607         rdsv3_loop_exit();
 608 
 609         rw_destroy(&rdsv3_conn_lock);
 610         avl_destroy(&rdsv3_conn_hash);
 611 
 612         ASSERT(rdsv3_conn_slab);
 613         kmem_cache_destroy(rdsv3_conn_slab);
 614 
 615         RDSV3_DPRINTF4("rdsv3_conn_exit", "Return");
 616 }
 617 
 618 /*
 619  * Force a disconnect
 620  */
 621 void
 622 rdsv3_conn_drop(struct rdsv3_connection *conn)
 623 {
 624         conn->c_state = RDSV3_CONN_ERROR;
 625         rdsv3_queue_work(rdsv3_wq, &conn->c_down_w);
 626 }