1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 /* 26 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. 27 * Copyright 2013 Nexenta Systems, Inc. All rights reserved. 28 * Copyright 2017 RackTop Systems. 29 */ 30 31 #include <sys/taskq_impl.h> 32 33 #include <sys/class.h> 34 #include <sys/debug.h> 35 #include <sys/ksynch.h> 36 #include <sys/kmem.h> 37 #include <sys/time.h> 38 #include <sys/systm.h> 39 #include <sys/sysmacros.h> 40 #include <sys/unistd.h> 41 42 /* avoid <sys/disp.h> */ 43 #define maxclsyspri 99 44 45 /* avoid <unistd.h> */ 46 extern long sysconf(int); 47 48 /* avoiding <thread.h> */ 49 typedef unsigned int thread_t; 50 typedef unsigned int thread_key_t; 51 52 extern int thr_create(void *, size_t, void *(*)(void *), void *, long, 53 thread_t *); 54 extern int thr_join(thread_t, thread_t *, void **); 55 56 /* 57 * POSIX.1c Note: 58 * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h> 59 * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h> 60 * Any changes in these definitions should be reflected in <pthread.h> 61 */ 62 #define THR_BOUND 0x00000001 /* = PTHREAD_SCOPE_SYSTEM */ 63 #define THR_NEW_LWP 0x00000002 64 #define THR_DETACHED 0x00000040 /* = PTHREAD_CREATE_DETACHED */ 65 #define THR_SUSPENDED 0x00000080 66 #define THR_DAEMON 0x00000100 67 68 69 int taskq_now; 70 taskq_t *system_taskq; 71 72 #define TASKQ_ACTIVE 0x00010000 73 74 struct taskq { 75 kmutex_t tq_lock; 76 krwlock_t tq_threadlock; 77 kcondvar_t tq_dispatch_cv; 78 kcondvar_t tq_wait_cv; 79 thread_t *tq_threadlist; 80 int tq_flags; 81 int tq_active; 82 int tq_nthreads; 83 int tq_nalloc; 84 int tq_minalloc; 85 int tq_maxalloc; 86 kcondvar_t tq_maxalloc_cv; 87 int tq_maxalloc_wait; 88 taskq_ent_t *tq_freelist; 89 taskq_ent_t tq_task; 90 }; 91 92 static taskq_ent_t * 93 task_alloc(taskq_t *tq, int tqflags) 94 { 95 taskq_ent_t *t; 96 int rv; 97 98 again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 99 tq->tq_freelist = t->tqent_next; 100 } else { 101 if (tq->tq_nalloc >= tq->tq_maxalloc) { 102 if (!(tqflags & KM_SLEEP)) 103 return (NULL); 104 105 /* 106 * We don't want to exceed tq_maxalloc, but we can't 107 * wait for other tasks to complete (and thus free up 108 * task structures) without risking deadlock with 109 * the caller. So, we just delay for one second 110 * to throttle the allocation rate. If we have tasks 111 * complete before one second timeout expires then 112 * taskq_ent_free will signal us and we will 113 * immediately retry the allocation. 114 */ 115 tq->tq_maxalloc_wait++; 116 rv = cv_timedwait(&tq->tq_maxalloc_cv, 117 &tq->tq_lock, ddi_get_lbolt() + hz); 118 tq->tq_maxalloc_wait--; 119 if (rv > 0) 120 goto again; /* signaled */ 121 } 122 mutex_exit(&tq->tq_lock); 123 124 t = kmem_alloc(sizeof (taskq_ent_t), tqflags); 125 126 mutex_enter(&tq->tq_lock); 127 if (t != NULL) 128 tq->tq_nalloc++; 129 } 130 return (t); 131 } 132 133 static void 134 task_free(taskq_t *tq, taskq_ent_t *t) 135 { 136 if (tq->tq_nalloc <= tq->tq_minalloc) { 137 t->tqent_next = tq->tq_freelist; 138 tq->tq_freelist = t; 139 } else { 140 tq->tq_nalloc--; 141 mutex_exit(&tq->tq_lock); 142 kmem_free(t, sizeof (taskq_ent_t)); 143 mutex_enter(&tq->tq_lock); 144 } 145 146 if (tq->tq_maxalloc_wait) 147 cv_signal(&tq->tq_maxalloc_cv); 148 } 149 150 taskqid_t 151 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 152 { 153 taskq_ent_t *t; 154 155 if (taskq_now) { 156 func(arg); 157 return (1); 158 } 159 160 mutex_enter(&tq->tq_lock); 161 ASSERT(tq->tq_flags & TASKQ_ACTIVE); 162 if ((t = task_alloc(tq, tqflags)) == NULL) { 163 mutex_exit(&tq->tq_lock); 164 return (0); 165 } 166 if (tqflags & TQ_FRONT) { 167 t->tqent_next = tq->tq_task.tqent_next; 168 t->tqent_prev = &tq->tq_task; 169 } else { 170 t->tqent_next = &tq->tq_task; 171 t->tqent_prev = tq->tq_task.tqent_prev; 172 } 173 t->tqent_next->tqent_prev = t; 174 t->tqent_prev->tqent_next = t; 175 t->tqent_func = func; 176 t->tqent_arg = arg; 177 t->tqent_flags = 0; 178 cv_signal(&tq->tq_dispatch_cv); 179 mutex_exit(&tq->tq_lock); 180 return (1); 181 } 182 183 void 184 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 185 taskq_ent_t *t) 186 { 187 ASSERT(func != NULL); 188 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 189 190 /* 191 * Mark it as a prealloc'd task. This is important 192 * to ensure that we don't free it later. 193 */ 194 t->tqent_flags |= TQENT_FLAG_PREALLOC; 195 /* 196 * Enqueue the task to the underlying queue. 197 */ 198 mutex_enter(&tq->tq_lock); 199 200 if (flags & TQ_FRONT) { 201 t->tqent_next = tq->tq_task.tqent_next; 202 t->tqent_prev = &tq->tq_task; 203 } else { 204 t->tqent_next = &tq->tq_task; 205 t->tqent_prev = tq->tq_task.tqent_prev; 206 } 207 t->tqent_next->tqent_prev = t; 208 t->tqent_prev->tqent_next = t; 209 t->tqent_func = func; 210 t->tqent_arg = arg; 211 cv_signal(&tq->tq_dispatch_cv); 212 mutex_exit(&tq->tq_lock); 213 } 214 215 void 216 taskq_wait(taskq_t *tq) 217 { 218 mutex_enter(&tq->tq_lock); 219 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 220 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 221 mutex_exit(&tq->tq_lock); 222 } 223 224 static void * 225 taskq_thread(void *arg) 226 { 227 taskq_t *tq = arg; 228 taskq_ent_t *t; 229 boolean_t prealloc; 230 231 mutex_enter(&tq->tq_lock); 232 while (tq->tq_flags & TASKQ_ACTIVE) { 233 if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { 234 if (--tq->tq_active == 0) 235 cv_broadcast(&tq->tq_wait_cv); 236 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 237 tq->tq_active++; 238 continue; 239 } 240 t->tqent_prev->tqent_next = t->tqent_next; 241 t->tqent_next->tqent_prev = t->tqent_prev; 242 t->tqent_next = NULL; 243 t->tqent_prev = NULL; 244 prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; 245 mutex_exit(&tq->tq_lock); 246 247 rw_enter(&tq->tq_threadlock, RW_READER); 248 t->tqent_func(t->tqent_arg); 249 rw_exit(&tq->tq_threadlock); 250 251 mutex_enter(&tq->tq_lock); 252 if (!prealloc) 253 task_free(tq, t); 254 } 255 tq->tq_nthreads--; 256 cv_broadcast(&tq->tq_wait_cv); 257 mutex_exit(&tq->tq_lock); 258 return (NULL); 259 } 260 261 /*ARGSUSED*/ 262 taskq_t * 263 taskq_create(const char *name, int nthr, pri_t pri, int minalloc, 264 int maxalloc, uint_t flags) 265 { 266 return (taskq_create_proc(name, nthr, pri, 267 minalloc, maxalloc, NULL, flags)); 268 } 269 270 /*ARGSUSED*/ 271 taskq_t * 272 taskq_create_sysdc(const char *name, int nthr, int minalloc, 273 int maxalloc, proc_t *proc, uint_t dc, uint_t flags) 274 { 275 return (taskq_create_proc(name, nthr, maxclsyspri, 276 minalloc, maxalloc, proc, flags)); 277 } 278 279 /*ARGSUSED*/ 280 taskq_t * 281 taskq_create_proc(const char *name, int nthreads, pri_t pri, 282 int minalloc, int maxalloc, proc_t *proc, uint_t flags) 283 { 284 taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 285 int t; 286 287 if (flags & TASKQ_THREADS_CPU_PCT) { 288 int pct; 289 ASSERT3S(nthreads, >=, 0); 290 ASSERT3S(nthreads, <=, 100); 291 pct = MIN(nthreads, 100); 292 pct = MAX(pct, 0); 293 294 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 295 nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 296 } else { 297 ASSERT3S(nthreads, >=, 1); 298 } 299 300 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 301 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 302 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 303 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 304 cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 305 tq->tq_flags = flags | TASKQ_ACTIVE; 306 tq->tq_active = nthreads; 307 tq->tq_nthreads = nthreads; 308 tq->tq_minalloc = minalloc; 309 tq->tq_maxalloc = maxalloc; 310 tq->tq_task.tqent_next = &tq->tq_task; 311 tq->tq_task.tqent_prev = &tq->tq_task; 312 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 313 314 if (flags & TASKQ_PREPOPULATE) { 315 mutex_enter(&tq->tq_lock); 316 while (minalloc-- > 0) 317 task_free(tq, task_alloc(tq, KM_SLEEP)); 318 mutex_exit(&tq->tq_lock); 319 } 320 321 for (t = 0; t < nthreads; t++) 322 (void) thr_create(0, 0, taskq_thread, 323 tq, THR_BOUND, &tq->tq_threadlist[t]); 324 325 return (tq); 326 } 327 328 void 329 taskq_destroy(taskq_t *tq) 330 { 331 int t; 332 int nthreads = tq->tq_nthreads; 333 334 taskq_wait(tq); 335 336 mutex_enter(&tq->tq_lock); 337 338 tq->tq_flags &= ~TASKQ_ACTIVE; 339 cv_broadcast(&tq->tq_dispatch_cv); 340 341 while (tq->tq_nthreads != 0) 342 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 343 344 tq->tq_minalloc = 0; 345 while (tq->tq_nalloc != 0) { 346 ASSERT(tq->tq_freelist != NULL); 347 task_free(tq, task_alloc(tq, KM_SLEEP)); 348 } 349 350 mutex_exit(&tq->tq_lock); 351 352 for (t = 0; t < nthreads; t++) 353 (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 354 355 kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 356 357 rw_destroy(&tq->tq_threadlock); 358 mutex_destroy(&tq->tq_lock); 359 cv_destroy(&tq->tq_dispatch_cv); 360 cv_destroy(&tq->tq_wait_cv); 361 cv_destroy(&tq->tq_maxalloc_cv); 362 363 kmem_free(tq, sizeof (taskq_t)); 364 } 365 366 int 367 taskq_member(taskq_t *tq, struct _kthread *t) 368 { 369 int i; 370 371 if (taskq_now) 372 return (1); 373 374 for (i = 0; i < tq->tq_nthreads; i++) 375 if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 376 return (1); 377 378 return (0); 379 } 380 381 void 382 system_taskq_init(void) 383 { 384 system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 385 TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 386 } 387 388 void 389 system_taskq_fini(void) 390 { 391 taskq_destroy(system_taskq); 392 system_taskq = NULL; /* defensive */ 393 }