Print this page
8115 parallel zfs mount
Split |
Close |
Expand all |
Collapse all |
--- old/usr/src/lib/libfakekernel/common/taskq.c
+++ new/usr/src/lib/libfakekernel/common/taskq.c
1 1 /*
2 2 * CDDL HEADER START
3 3 *
4 4 * The contents of this file are subject to the terms of the
5 5 * Common Development and Distribution License (the "License").
6 6 * You may not use this file except in compliance with the License.
7 7 *
8 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 9 * or http://www.opensolaris.org/os/licensing.
10 10 * See the License for the specific language governing permissions
11 11 * and limitations under the License.
12 12 *
13 13 * When distributing Covered Code, include this CDDL HEADER in each
14 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 15 * If applicable, add the following below this CDDL HEADER, with the
16 16 * fields enclosed by brackets "[]" replaced with your own identifying
17 17 * information: Portions Copyright [yyyy] [name of copyright owner]
↓ open down ↓ |
17 lines elided |
↑ open up ↑ |
18 18 *
19 19 * CDDL HEADER END
20 20 */
21 21 /*
22 22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
23 23 * Use is subject to license terms.
24 24 */
25 25 /*
26 26 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved.
27 27 * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
28 + * Copyright 2017 RackTop Systems.
28 29 */
29 30
30 31 #include <sys/taskq_impl.h>
31 32
32 33 #include <sys/class.h>
33 34 #include <sys/debug.h>
34 35 #include <sys/ksynch.h>
35 36 #include <sys/kmem.h>
36 37 #include <sys/time.h>
37 38 #include <sys/systm.h>
38 39 #include <sys/sysmacros.h>
39 40 #include <sys/unistd.h>
40 41
42 +/* avoid <sys/disp.h> */
43 +#define maxclsyspri 99
44 +
41 45 /* avoid <unistd.h> */
42 46 extern long sysconf(int);
43 47
44 48 /* avoiding <thread.h> */
45 49 typedef unsigned int thread_t;
46 50 typedef unsigned int thread_key_t;
47 51
48 52 extern int thr_create(void *, size_t, void *(*)(void *), void *, long,
49 53 thread_t *);
50 54 extern int thr_join(thread_t, thread_t *, void **);
51 55
52 56 /*
53 57 * POSIX.1c Note:
54 58 * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h>
55 59 * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h>
56 60 * Any changes in these definitions should be reflected in <pthread.h>
57 61 */
58 62 #define THR_BOUND 0x00000001 /* = PTHREAD_SCOPE_SYSTEM */
59 63 #define THR_NEW_LWP 0x00000002
60 64 #define THR_DETACHED 0x00000040 /* = PTHREAD_CREATE_DETACHED */
61 65 #define THR_SUSPENDED 0x00000080
62 66 #define THR_DAEMON 0x00000100
63 67
64 68
65 69 int taskq_now;
66 70 taskq_t *system_taskq;
67 71
68 72 #define TASKQ_ACTIVE 0x00010000
69 73
/*
 * Userland emulation of a kernel taskq: a fixed pool of bound threads
 * services a circular doubly-linked list of pending entries headed by
 * tq_task.  All mutable state is protected by tq_lock; tq_threadlock is
 * taken as READER around each task function (see taskq_thread).
 */
70 74 struct taskq {
71 75 kmutex_t tq_lock;
72 76 krwlock_t tq_threadlock;
73 77 kcondvar_t tq_dispatch_cv;
74 78 kcondvar_t tq_wait_cv;
75 79 thread_t *tq_threadlist;
76 80 int tq_flags;
77 81 int tq_active;
78 82 int tq_nthreads;
79 83 int tq_nalloc;
80 84 int tq_minalloc;
81 85 int tq_maxalloc;
82 86 kcondvar_t tq_maxalloc_cv;
83 87 int tq_maxalloc_wait;
84 88 taskq_ent_t *tq_freelist;
85 89 taskq_ent_t tq_task;
86 90 };
87 91
/*
 * Allocate a task entry, preferring the per-queue freelist.  Called (and
 * returns) with tq_lock held; the lock is dropped around kmem_alloc().
 * With KM_SLEEP, allocation at the tq_maxalloc ceiling is throttled by a
 * one-second cv_timedwait rather than blocking indefinitely (blocking on
 * task completion could deadlock with the caller).  Returns NULL only for
 * non-KM_SLEEP callers or kmem_alloc failure.
 */
88 92 static taskq_ent_t *
89 93 task_alloc(taskq_t *tq, int tqflags)
90 94 {
91 95 taskq_ent_t *t;
92 96 int rv;
93 97
94 98 again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
95 99 tq->tq_freelist = t->tqent_next;
96 100 } else {
97 101 if (tq->tq_nalloc >= tq->tq_maxalloc) {
98 102 if (!(tqflags & KM_SLEEP))
99 103 return (NULL);
100 104
101 105 /*
102 106 * We don't want to exceed tq_maxalloc, but we can't
103 107 * wait for other tasks to complete (and thus free up
104 108 * task structures) without risking deadlock with
105 109 * the caller. So, we just delay for one second
106 110 * to throttle the allocation rate. If we have tasks
107 111 * complete before one second timeout expires then
108 112 * taskq_ent_free will signal us and we will
109 113 * immediately retry the allocation.
110 114 */
111 115 tq->tq_maxalloc_wait++;
112 116 rv = cv_timedwait(&tq->tq_maxalloc_cv,
113 117 &tq->tq_lock, ddi_get_lbolt() + hz);
114 118 tq->tq_maxalloc_wait--;
115 119 if (rv > 0)
116 120 goto again; /* signaled */
117 121 }
/* Drop tq_lock across kmem_alloc(); it may sleep. */
118 122 mutex_exit(&tq->tq_lock);
119 123
120 124 t = kmem_alloc(sizeof (taskq_ent_t), tqflags);
121 125
122 126 mutex_enter(&tq->tq_lock);
123 127 if (t != NULL)
124 128 tq->tq_nalloc++;
125 129 }
126 130 return (t);
127 131 }
128 132
/*
 * Release a task entry: cache it on the freelist while tq_nalloc is at or
 * below tq_minalloc, otherwise kmem_free it (dropping tq_lock around the
 * free).  Called and returns with tq_lock held.  Wakes one task_alloc()
 * throttled on tq_maxalloc_cv, if any.
 */
129 133 static void
130 134 task_free(taskq_t *tq, taskq_ent_t *t)
131 135 {
132 136 if (tq->tq_nalloc <= tq->tq_minalloc) {
133 137 t->tqent_next = tq->tq_freelist;
134 138 tq->tq_freelist = t;
135 139 } else {
136 140 tq->tq_nalloc--;
137 141 mutex_exit(&tq->tq_lock);
138 142 kmem_free(t, sizeof (taskq_ent_t));
139 143 mutex_enter(&tq->tq_lock);
140 144 }
141 145
142 146 if (tq->tq_maxalloc_wait)
143 147 cv_signal(&tq->tq_maxalloc_cv);
145 149
/*
 * Queue func(arg) for asynchronous execution.  Returns 1 on success, 0 if
 * a task entry could not be allocated (possible for non-KM_SLEEP tqflags).
 * If the global taskq_now is set, the function runs synchronously in the
 * caller instead.  TQ_FRONT inserts at the head of the queue; otherwise
 * the entry is appended at the tail.
 */
146 150 taskqid_t
147 151 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
148 152 {
149 153 taskq_ent_t *t;
150 154
151 155 if (taskq_now) {
152 156 func(arg);
153 157 return (1);
154 158 }
155 159
156 160 mutex_enter(&tq->tq_lock);
157 161 ASSERT(tq->tq_flags & TASKQ_ACTIVE);
158 162 if ((t = task_alloc(tq, tqflags)) == NULL) {
159 163 mutex_exit(&tq->tq_lock);
160 164 return (0);
161 165 }
162 166 if (tqflags & TQ_FRONT) {
163 167 t->tqent_next = tq->tq_task.tqent_next;
164 168 t->tqent_prev = &tq->tq_task;
165 169 } else {
166 170 t->tqent_next = &tq->tq_task;
167 171 t->tqent_prev = tq->tq_task.tqent_prev;
168 172 }
/* Link the entry into the circular list and wake one worker. */
169 173 t->tqent_next->tqent_prev = t;
170 174 t->tqent_prev->tqent_next = t;
171 175 t->tqent_func = func;
172 176 t->tqent_arg = arg;
173 177 t->tqent_flags = 0;
174 178 cv_signal(&tq->tq_dispatch_cv);
175 179 mutex_exit(&tq->tq_lock);
176 180 return (1);
177 181 }
178 182
/*
 * Dispatch using a caller-supplied (preallocated) task entry; cannot fail
 * since no allocation is needed.  The entry is tagged TQENT_FLAG_PREALLOC
 * so taskq_thread() will not task_free() it after execution.
 */
179 183 void
180 184 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
181 185 taskq_ent_t *t)
182 186 {
183 187 ASSERT(func != NULL);
184 188 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
185 189
186 190 /*
187 191 * Mark it as a prealloc'd task. This is important
188 192 * to ensure that we don't free it later.
189 193 */
190 194 t->tqent_flags |= TQENT_FLAG_PREALLOC;
191 195 /*
192 196 * Enqueue the task to the underlying queue.
193 197 */
194 198 mutex_enter(&tq->tq_lock);
195 199
196 200 if (flags & TQ_FRONT) {
197 201 t->tqent_next = tq->tq_task.tqent_next;
198 202 t->tqent_prev = &tq->tq_task;
199 203 } else {
200 204 t->tqent_next = &tq->tq_task;
201 205 t->tqent_prev = tq->tq_task.tqent_prev;
202 206 }
203 207 t->tqent_next->tqent_prev = t;
204 208 t->tqent_prev->tqent_next = t;
205 209 t->tqent_func = func;
206 210 t->tqent_arg = arg;
207 211 cv_signal(&tq->tq_dispatch_cv);
208 212 mutex_exit(&tq->tq_lock);
209 213 }
210 214
/*
 * Block until the queue is empty and no worker is executing a task
 * (tq_active drops to 0).  Workers broadcast tq_wait_cv on both events.
 */
211 215 void
212 216 taskq_wait(taskq_t *tq)
213 217 {
214 218 mutex_enter(&tq->tq_lock);
215 219 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
216 220 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
217 221 mutex_exit(&tq->tq_lock);
218 222 }
219 223
/*
 * Worker thread body (one per tq_threadlist slot).  Loops while the queue
 * is TASKQ_ACTIVE: unlinks the next entry, runs it with tq_threadlock held
 * as READER and tq_lock dropped, then frees the entry unless it was a
 * prealloc'd dispatch.  Maintains tq_active so taskq_wait() can detect
 * quiescence, and decrements tq_nthreads on exit for taskq_destroy().
 */
220 224 static void *
221 225 taskq_thread(void *arg)
222 226 {
223 227 taskq_t *tq = arg;
224 228 taskq_ent_t *t;
225 229 boolean_t prealloc;
226 230
227 231 mutex_enter(&tq->tq_lock);
228 232 while (tq->tq_flags & TASKQ_ACTIVE) {
/* Queue empty: go idle; last idle worker wakes taskq_wait(). */
229 233 if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
230 234 if (--tq->tq_active == 0)
231 235 cv_broadcast(&tq->tq_wait_cv);
232 236 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
233 237 tq->tq_active++;
234 238 continue;
235 239 }
236 240 t->tqent_prev->tqent_next = t->tqent_next;
237 241 t->tqent_next->tqent_prev = t->tqent_prev;
238 242 t->tqent_next = NULL;
239 243 t->tqent_prev = NULL;
/* Snapshot the flag before dropping tq_lock; the entry may be reused. */
240 244 prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
241 245 mutex_exit(&tq->tq_lock);
242 246
243 247 rw_enter(&tq->tq_threadlock, RW_READER);
244 248 t->tqent_func(t->tqent_arg);
245 249 rw_exit(&tq->tq_threadlock);
246 250
247 251 mutex_enter(&tq->tq_lock);
248 252 if (!prealloc)
249 253 task_free(tq, t);
250 254 }
251 255 tq->tq_nthreads--;
252 256 cv_broadcast(&tq->tq_wait_cv);
253 257 mutex_exit(&tq->tq_lock);
254 258 return (NULL);
255 259 }
↓ open down ↓ |
205 lines elided |
↑ open up ↑ |
256 260
/*
 * Convenience wrapper: create a taskq with no associated process
 * (proc is NULL; see taskq_create_proc()).
 */
257 261 /*ARGSUSED*/
258 262 taskq_t *
259 263 taskq_create(const char *name, int nthr, pri_t pri, int minalloc,
260 264 int maxalloc, uint_t flags)
261 265 {
262 266 return (taskq_create_proc(name, nthr, pri,
263 267 minalloc, maxalloc, NULL, flags));
264 268 }
265 269
/*
 * Added by this change (8115): userland stand-in for the kernel's SDC
 * (sysdc) taskq creation.  The duty-cycle argument "dc" has no meaning in
 * this emulation and is ignored; the queue is simply created at
 * maxclsyspri (locally defined above to avoid pulling in <sys/disp.h>).
 */
270 +/*ARGSUSED*/
271 +taskq_t *
272 +taskq_create_sysdc(const char *name, int nthr, int minalloc,
273 + int maxalloc, proc_t *proc, uint_t dc, uint_t flags)
274 +{
275 + return (taskq_create_proc(name, nthr, maxclsyspri,
276 + minalloc, maxalloc, proc, flags));
277 +}
278 +
/*
 * Create a taskq and start its worker pool.  With TASKQ_THREADS_CPU_PCT,
 * "nthreads" is interpreted as a percentage of online CPUs (clamped to
 * 0..100, minimum one thread).  TASKQ_PREPOPULATE pre-loads the entry
 * freelist with minalloc entries.  Workers are created as bound
 * (THR_BOUND) threads running taskq_thread().  The pri and proc
 * arguments are accepted for kernel-API compatibility (/*ARGSUSED*/).
 */
266 279 /*ARGSUSED*/
267 280 taskq_t *
268 281 taskq_create_proc(const char *name, int nthreads, pri_t pri,
269 282 int minalloc, int maxalloc, proc_t *proc, uint_t flags)
270 283 {
271 284 taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
272 285 int t;
273 286
274 287 if (flags & TASKQ_THREADS_CPU_PCT) {
275 288 int pct;
276 289 ASSERT3S(nthreads, >=, 0);
277 290 ASSERT3S(nthreads, <=, 100);
278 291 pct = MIN(nthreads, 100);
279 292 pct = MAX(pct, 0);
280 293
281 294 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
282 295 nthreads = MAX(nthreads, 1); /* need at least 1 thread */
283 296 } else {
284 297 ASSERT3S(nthreads, >=, 1);
285 298 }
286 299
287 300 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
288 301 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
289 302 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
290 303 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
291 304 cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
292 305 tq->tq_flags = flags | TASKQ_ACTIVE;
293 306 tq->tq_active = nthreads;
294 307 tq->tq_nthreads = nthreads;
295 308 tq->tq_minalloc = minalloc;
296 309 tq->tq_maxalloc = maxalloc;
297 310 tq->tq_task.tqent_next = &tq->tq_task;
298 311 tq->tq_task.tqent_prev = &tq->tq_task;
299 312 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
300 313
301 314 if (flags & TASKQ_PREPOPULATE) {
302 315 mutex_enter(&tq->tq_lock);
303 316 while (minalloc-- > 0)
304 317 task_free(tq, task_alloc(tq, KM_SLEEP));
305 318 mutex_exit(&tq->tq_lock);
306 319 }
307 320
308 321 for (t = 0; t < nthreads; t++)
309 322 (void) thr_create(0, 0, taskq_thread,
310 323 tq, THR_BOUND, &tq->tq_threadlist[t]);
311 324
312 325 return (tq);
313 326 }
314 327
/*
 * Tear down a taskq: drain outstanding work (taskq_wait), clear
 * TASKQ_ACTIVE and wake all workers so their loops exit, wait for
 * tq_nthreads to reach zero, drain the entry freelist, then join the
 * worker threads and free all resources.
 */
315 328 void
316 329 taskq_destroy(taskq_t *tq)
317 330 {
318 331 int t;
319 332 int nthreads = tq->tq_nthreads;
320 333
321 334 taskq_wait(tq);
322 335
323 336 mutex_enter(&tq->tq_lock);
324 337
325 338 tq->tq_flags &= ~TASKQ_ACTIVE;
326 339 cv_broadcast(&tq->tq_dispatch_cv);
327 340
328 341 while (tq->tq_nthreads != 0)
329 342 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
330 343
/* Setting tq_minalloc to 0 makes task_free() kmem_free every entry. */
331 344 tq->tq_minalloc = 0;
332 345 while (tq->tq_nalloc != 0) {
333 346 ASSERT(tq->tq_freelist != NULL);
334 347 task_free(tq, task_alloc(tq, KM_SLEEP));
335 348 }
336 349
337 350 mutex_exit(&tq->tq_lock);
338 351
339 352 for (t = 0; t < nthreads; t++)
340 353 (void) thr_join(tq->tq_threadlist[t], NULL, NULL);
341 354
342 355 kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));
343 356
344 357 rw_destroy(&tq->tq_threadlock);
345 358 mutex_destroy(&tq->tq_lock);
346 359 cv_destroy(&tq->tq_dispatch_cv);
347 360 cv_destroy(&tq->tq_wait_cv);
348 361 cv_destroy(&tq->tq_maxalloc_cv);
349 362
350 363 kmem_free(tq, sizeof (taskq_t));
351 364 }
352 365
/*
 * Return nonzero if "t" is one of this queue's worker threads (always
 * true under taskq_now, where dispatch runs in the caller).  In this
 * emulation the kthread pointer is compared against stored thread_t ids
 * via a cast — presumably callers pass curthread-style handles that
 * round-trip through this conversion; NOTE(review): confirm against
 * libfakekernel's thread emulation.
 */
353 366 int
354 367 taskq_member(taskq_t *tq, struct _kthread *t)
355 368 {
356 369 int i;
357 370
358 371 if (taskq_now)
359 372 return (1);
360 373
361 374 for (i = 0; i < tq->tq_nthreads; i++)
362 375 if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
363 376 return (1);
364 377
365 378 return (0);
366 379 }
367 380
/*
 * Create the global system_taskq: 64 threads, prepopulated freelist
 * (min 4, max 512 entries).  minclsyspri comes from the included
 * kernel-emulation headers, not this file.
 */
368 381 void
369 382 system_taskq_init(void)
370 383 {
371 384 system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
372 385 TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
373 386 }
374 387
/*
 * Destroy the global system_taskq and NULL the pointer so a stale
 * reference fails fast rather than touching freed memory.
 */
375 388 void
376 389 system_taskq_fini(void)
377 390 {
378 391 taskq_destroy(system_taskq);
379 392 system_taskq = NULL; /* defensive */
380 393 }
↓ open down ↓ |
105 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX