1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 /*
  26  * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
  27  * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  28  * Copyright 2017 RackTop Systems.
  29  */
  30 
  31 #include <sys/taskq_impl.h>
  32 
  33 #include <sys/class.h>
  34 #include <sys/debug.h>
  35 #include <sys/ksynch.h>
  36 #include <sys/kmem.h>
  37 #include <sys/time.h>
  38 #include <sys/systm.h>
  39 #include <sys/sysmacros.h>
  40 #include <sys/unistd.h>
  41 
  42 /* avoid <sys/disp.h> */
  43 #define   maxclsyspri     99
  44 
  45 /* avoid <unistd.h> */
  46 extern long sysconf(int);
  47 
  48 /* avoiding <thread.h> */
  49 typedef unsigned int thread_t;
  50 typedef unsigned int thread_key_t;
  51 
  52 extern int thr_create(void *, size_t, void *(*)(void *), void *, long,
  53                         thread_t *);
  54 extern int thr_join(thread_t, thread_t *, void **);
  55 
  56 /*
  57  * POSIX.1c Note:
  58  * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h>
  59  * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h>
  60  * Any changes in these definitions should be reflected in <pthread.h>
  61  */
  62 #define THR_BOUND               0x00000001      /* = PTHREAD_SCOPE_SYSTEM */
  63 #define THR_NEW_LWP             0x00000002
  64 #define THR_DETACHED            0x00000040      /* = PTHREAD_CREATE_DETACHED */
  65 #define THR_SUSPENDED           0x00000080
  66 #define THR_DAEMON              0x00000100
  67 
  68 
/* When nonzero, taskq_dispatch() runs tasks synchronously in the caller. */
int taskq_now;
/* Shared general-purpose queue; created/torn down by system_taskq_init/fini. */
taskq_t *system_taskq;

/* Set in tq_flags while the queue accepts dispatches; cleared on destroy. */
#define TASKQ_ACTIVE    0x00010000
  73 
/*
 * Userland task queue.  Pending work lives on the circular doubly-linked
 * list anchored at tq_task; tq_freelist caches recycled entries.
 */
struct taskq {
        kmutex_t        tq_lock;        /* protects all fields below */
        krwlock_t       tq_threadlock;  /* held as READER by workers while */
                                        /* running a task (see taskq_thread) */
        kcondvar_t      tq_dispatch_cv; /* workers sleep here awaiting work */
        kcondvar_t      tq_wait_cv;     /* taskq_wait/destroy sleep here */
        thread_t        *tq_threadlist; /* array of tq_nthreads worker ids */
        int             tq_flags;       /* TASKQ_* flags | TASKQ_ACTIVE */
        int             tq_active;      /* workers not blocked in cv_wait */
        int             tq_nthreads;    /* live worker threads */
        int             tq_nalloc;      /* entries allocated (incl. freelist) */
        int             tq_minalloc;    /* freelist cache floor */
        int             tq_maxalloc;    /* allocation throttle ceiling */
        kcondvar_t      tq_maxalloc_cv; /* throttled task_alloc() waits here */
        int             tq_maxalloc_wait; /* count of throttled allocators */
        taskq_ent_t     *tq_freelist;   /* cached free entries */
        taskq_ent_t     tq_task;        /* list head of pending tasks */
};
  91 
/*
 * Allocate a task entry.  Called, and returns, with tq->tq_lock held,
 * but drops and reacquires the lock around kmem_alloc().
 *
 * Reuses an entry from tq_freelist when one exists and at least
 * tq_minalloc entries have been allocated; otherwise a fresh entry is
 * allocated.  When tq_nalloc has reached tq_maxalloc, non-sleeping
 * callers get NULL and KM_SLEEP callers are throttled (see the block
 * comment below).  Returns NULL only when kmem_alloc() fails, which
 * for KM_SLEEP callers is presumably never — confirm against the
 * userland kmem emulation.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
        taskq_ent_t *t;
        int rv;

again:  if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
                /* Pop a cached entry off the freelist. */
                tq->tq_freelist = t->tqent_next;
        } else {
                if (tq->tq_nalloc >= tq->tq_maxalloc) {
                        if (!(tqflags & KM_SLEEP))
                                return (NULL);

                        /*
                         * We don't want to exceed tq_maxalloc, but we can't
                         * wait for other tasks to complete (and thus free up
                         * task structures) without risking deadlock with
                         * the caller.  So, we just delay for one second
                         * to throttle the allocation rate. If we have tasks
                         * complete before one second timeout expires then
                         * taskq_ent_free will signal us and we will
                         * immediately retry the allocation.
                         */
                        tq->tq_maxalloc_wait++;
                        rv = cv_timedwait(&tq->tq_maxalloc_cv,
                            &tq->tq_lock, ddi_get_lbolt() + hz);
                        tq->tq_maxalloc_wait--;
                        if (rv > 0)
                                goto again;             /* signaled */
                        /* Timed out: fall through and allocate anyway. */
                }
                mutex_exit(&tq->tq_lock);

                t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

                mutex_enter(&tq->tq_lock);
                if (t != NULL)
                        tq->tq_nalloc++;
        }
        return (t);
}
 132 
/*
 * Return a task entry: cache it on tq_freelist while the allocated count
 * is at or below tq_minalloc, otherwise give it back to kmem (dropping
 * tq_lock around kmem_free()).  Called, and returns, with tq->tq_lock
 * held.  Wakes one task_alloc() caller throttled at the tq_maxalloc
 * ceiling, if any.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
        if (tq->tq_nalloc <= tq->tq_minalloc) {
                /* Keep it cached; note tq_nalloc still counts this entry. */
                t->tqent_next = tq->tq_freelist;
                tq->tq_freelist = t;
        } else {
                tq->tq_nalloc--;
                mutex_exit(&tq->tq_lock);
                kmem_free(t, sizeof (taskq_ent_t));
                mutex_enter(&tq->tq_lock);
        }

        if (tq->tq_maxalloc_wait)
                cv_signal(&tq->tq_maxalloc_cv);
}
 149 
 150 taskqid_t
 151 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
 152 {
 153         taskq_ent_t *t;
 154 
 155         if (taskq_now) {
 156                 func(arg);
 157                 return (1);
 158         }
 159 
 160         mutex_enter(&tq->tq_lock);
 161         ASSERT(tq->tq_flags & TASKQ_ACTIVE);
 162         if ((t = task_alloc(tq, tqflags)) == NULL) {
 163                 mutex_exit(&tq->tq_lock);
 164                 return (0);
 165         }
 166         if (tqflags & TQ_FRONT) {
 167                 t->tqent_next = tq->tq_task.tqent_next;
 168                 t->tqent_prev = &tq->tq_task;
 169         } else {
 170                 t->tqent_next = &tq->tq_task;
 171                 t->tqent_prev = tq->tq_task.tqent_prev;
 172         }
 173         t->tqent_next->tqent_prev = t;
 174         t->tqent_prev->tqent_next = t;
 175         t->tqent_func = func;
 176         t->tqent_arg = arg;
 177         t->tqent_flags = 0;
 178         cv_signal(&tq->tq_dispatch_cv);
 179         mutex_exit(&tq->tq_lock);
 180         return (1);
 181 }
 182 
 183 void
 184 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
 185     taskq_ent_t *t)
 186 {
 187         ASSERT(func != NULL);
 188         ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
 189 
 190         /*
 191          * Mark it as a prealloc'd task.  This is important
 192          * to ensure that we don't free it later.
 193          */
 194         t->tqent_flags |= TQENT_FLAG_PREALLOC;
 195         /*
 196          * Enqueue the task to the underlying queue.
 197          */
 198         mutex_enter(&tq->tq_lock);
 199 
 200         if (flags & TQ_FRONT) {
 201                 t->tqent_next = tq->tq_task.tqent_next;
 202                 t->tqent_prev = &tq->tq_task;
 203         } else {
 204                 t->tqent_next = &tq->tq_task;
 205                 t->tqent_prev = tq->tq_task.tqent_prev;
 206         }
 207         t->tqent_next->tqent_prev = t;
 208         t->tqent_prev->tqent_next = t;
 209         t->tqent_func = func;
 210         t->tqent_arg = arg;
 211         cv_signal(&tq->tq_dispatch_cv);
 212         mutex_exit(&tq->tq_lock);
 213 }
 214 
 215 void
 216 taskq_wait(taskq_t *tq)
 217 {
 218         mutex_enter(&tq->tq_lock);
 219         while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
 220                 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
 221         mutex_exit(&tq->tq_lock);
 222 }
 223 
/*
 * Worker thread body.  Loops pulling tasks off tq_task and running them
 * with tq_lock dropped, until taskq_destroy() clears TASKQ_ACTIVE.
 *
 * tq_active accounting: a worker counts as active except while blocked
 * in cv_wait on tq_dispatch_cv; the last worker to go idle broadcasts
 * tq_wait_cv so taskq_wait() can observe a drained queue.
 */
static void *
taskq_thread(void *arg)
{
        taskq_t *tq = arg;
        taskq_ent_t *t;
        boolean_t prealloc;

        mutex_enter(&tq->tq_lock);
        while (tq->tq_flags & TASKQ_ACTIVE) {
                if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
                        /* Queue empty: go idle; wake waiters if we're last. */
                        if (--tq->tq_active == 0)
                                cv_broadcast(&tq->tq_wait_cv);
                        cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
                        tq->tq_active++;
                        continue;
                }
                /* Unlink the task while still holding tq_lock. */
                t->tqent_prev->tqent_next = t->tqent_next;
                t->tqent_next->tqent_prev = t->tqent_prev;
                t->tqent_next = NULL;
                t->tqent_prev = NULL;
                /* Snapshot the flag before the callback may recycle t. */
                prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
                mutex_exit(&tq->tq_lock);

                /*
                 * Run the task without tq_lock.  The read lock is taken
                 * around execution; no writer appears in this file —
                 * presumably external callers serialize against running
                 * tasks through it (verify against consumers).
                 */
                rw_enter(&tq->tq_threadlock, RW_READER);
                t->tqent_func(t->tqent_arg);
                rw_exit(&tq->tq_threadlock);

                mutex_enter(&tq->tq_lock);
                /* Caller-owned (prealloc'd) entries are never freed here. */
                if (!prealloc)
                        task_free(tq, t);
        }
        /* Shutting down: report our exit to taskq_destroy(). */
        tq->tq_nthreads--;
        cv_broadcast(&tq->tq_wait_cv);
        mutex_exit(&tq->tq_lock);
        return (NULL);
}
 260 
 261 /*ARGSUSED*/
 262 taskq_t *
 263 taskq_create(const char *name, int nthr, pri_t pri, int minalloc,
 264     int maxalloc, uint_t flags)
 265 {
 266         return (taskq_create_proc(name, nthr, pri,
 267             minalloc, maxalloc, NULL, flags));
 268 }
 269 
 270 /*ARGSUSED*/
 271 taskq_t *
 272 taskq_create_sysdc(const char *name, int nthr, int minalloc,
 273     int maxalloc, proc_t *proc, uint_t dc, uint_t flags)
 274 {
 275         return (taskq_create_proc(name, nthr, maxclsyspri,
 276             minalloc, maxalloc, proc, flags));
 277 }
 278 
 279 /*ARGSUSED*/
 280 taskq_t *
 281 taskq_create_proc(const char *name, int nthreads, pri_t pri,
 282         int minalloc, int maxalloc, proc_t *proc, uint_t flags)
 283 {
 284         taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
 285         int t;
 286 
 287         if (flags & TASKQ_THREADS_CPU_PCT) {
 288                 int pct;
 289                 ASSERT3S(nthreads, >=, 0);
 290                 ASSERT3S(nthreads, <=, 100);
 291                 pct = MIN(nthreads, 100);
 292                 pct = MAX(pct, 0);
 293 
 294                 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
 295                 nthreads = MAX(nthreads, 1);    /* need at least 1 thread */
 296         } else {
 297                 ASSERT3S(nthreads, >=, 1);
 298         }
 299 
 300         rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
 301         mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
 302         cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
 303         cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
 304         cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
 305         tq->tq_flags = flags | TASKQ_ACTIVE;
 306         tq->tq_active = nthreads;
 307         tq->tq_nthreads = nthreads;
 308         tq->tq_minalloc = minalloc;
 309         tq->tq_maxalloc = maxalloc;
 310         tq->tq_task.tqent_next = &tq->tq_task;
 311         tq->tq_task.tqent_prev = &tq->tq_task;
 312         tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
 313 
 314         if (flags & TASKQ_PREPOPULATE) {
 315                 mutex_enter(&tq->tq_lock);
 316                 while (minalloc-- > 0)
 317                         task_free(tq, task_alloc(tq, KM_SLEEP));
 318                 mutex_exit(&tq->tq_lock);
 319         }
 320 
 321         for (t = 0; t < nthreads; t++)
 322                 (void) thr_create(0, 0, taskq_thread,
 323                     tq, THR_BOUND, &tq->tq_threadlist[t]);
 324 
 325         return (tq);
 326 }
 327 
/*
 * Tear down a task queue: drain all pending work, stop and join every
 * worker thread, release cached task entries, and free the queue.
 * The shutdown order matters — drain first, then clear TASKQ_ACTIVE and
 * wake the workers so their main loops exit, then wait for each to
 * decrement tq_nthreads before freeing any shared state.
 */
void
taskq_destroy(taskq_t *tq)
{
        int t;
        /* Snapshot before workers start decrementing tq_nthreads below. */
        int nthreads = tq->tq_nthreads;

        taskq_wait(tq);

        mutex_enter(&tq->tq_lock);

        /* Stop accepting work and kick every idle worker awake. */
        tq->tq_flags &= ~TASKQ_ACTIVE;
        cv_broadcast(&tq->tq_dispatch_cv);

        while (tq->tq_nthreads != 0)
                cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

        /*
         * Drop the cache floor to zero so task_free() releases entries
         * to kmem instead of re-caching them, then drain the freelist.
         */
        tq->tq_minalloc = 0;
        while (tq->tq_nalloc != 0) {
                ASSERT(tq->tq_freelist != NULL);
                task_free(tq, task_alloc(tq, KM_SLEEP));
        }

        mutex_exit(&tq->tq_lock);

        /* Reap the (now exited) worker threads. */
        for (t = 0; t < nthreads; t++)
                (void) thr_join(tq->tq_threadlist[t], NULL, NULL);

        kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));

        rw_destroy(&tq->tq_threadlock);
        mutex_destroy(&tq->tq_lock);
        cv_destroy(&tq->tq_dispatch_cv);
        cv_destroy(&tq->tq_wait_cv);
        cv_destroy(&tq->tq_maxalloc_cv);

        kmem_free(tq, sizeof (taskq_t));
}
 365 
 366 int
 367 taskq_member(taskq_t *tq, struct _kthread *t)
 368 {
 369         int i;
 370 
 371         if (taskq_now)
 372                 return (1);
 373 
 374         for (i = 0; i < tq->tq_nthreads; i++)
 375                 if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
 376                         return (1);
 377 
 378         return (0);
 379 }
 380 
 381 void
 382 system_taskq_init(void)
 383 {
 384         system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
 385             TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
 386 }
 387 
 388 void
 389 system_taskq_fini(void)
 390 {
 391         taskq_destroy(system_taskq);
 392         system_taskq = NULL; /* defensive */
 393 }