14 #include <pthread.h>
15 #include "v8plus_glue.h"
16
17 __thread v8plus_errno_t _v8plus_errno;
18 __thread char _v8plus_errmsg[V8PLUS_ERRMSG_LEN];
19
/*
 * Bundle of three opaque pointers plus a worker and a completion callback.
 *
 * NOTE(review): nothing in the visible part of this file reads or writes
 * this structure; the field roles suggested by their names (object, caller
 * context, worker result) should be confirmed against the code that uses it.
 */
typedef struct v8plus_uv_ctx {
	void *vuc_obj;
	void *vuc_ctx;
	void *vuc_result;
	v8plus_worker_f vuc_worker;
	v8plus_completion_f vuc_completion;
} v8plus_uv_ctx_t;
27
/*
 * Queue of calls posted by other threads and drained by
 * v8plus_async_callback().  Every access to the queue in this file is made
 * with _v8plus_callq_mtx held.
 */
static STAILQ_HEAD(v8plus_callq_head, v8plus_async_call) _v8plus_callq =
    STAILQ_HEAD_INITIALIZER(_v8plus_callq);
static pthread_mutex_t _v8plus_callq_mtx;
/* Thread recorded by v8plus_crossthread_init(); see v8plus_in_event_thread(). */
static pthread_t _v8plus_uv_event_thread;
/* Async handle poked with uv_async_send() after a call has been queued. */
static uv_async_t _v8plus_uv_async;
33
/*
 * One method call posted from a foreign thread to the event loop thread.
 * v8plus_method_call() builds it on its own stack, queues it, and sleeps
 * until v8plus_async_callback() has run the call.
 */
typedef struct v8plus_async_call {
	/* Arguments handed to v8plus_method_call_direct(): */
	void *vac_cop;
	const char *vac_name;
	const nvlist_t *vac_lp;

	/* The poster waits on vac_cv, under vac_mtx, for vac_run to be set: */
	pthread_cond_t vac_cv;
	pthread_mutex_t vac_mtx;

	/* Set to B_TRUE by the callback once the call has been made: */
	boolean_t vac_run;
	/* Value returned by v8plus_method_call_direct(): */
	nvlist_t *vac_return;

	/* Linkage on _v8plus_callq: */
	STAILQ_ENTRY(v8plus_async_call) vac_callq_entry;
} v8plus_async_call_t;
47
48 boolean_t
49 v8plus_in_event_thread(void)
50 {
51 return (_v8plus_uv_event_thread == pthread_self() ? B_TRUE : B_FALSE);
52 }
53
54 static void
55 v8plus_async_callback(uv_async_t *async, int status __UNUSED)
56 {
57 if (v8plus_in_event_thread() != B_TRUE)
58 v8plus_panic("async callback called outside of event loop");
59
60 for (;;) {
61 v8plus_async_call_t *vac = NULL;
62
63 /*
64 * Fetch the next queued method:
65 */
66 if (pthread_mutex_lock(&_v8plus_callq_mtx) != 0)
67 v8plus_panic("could not lock async queue mutex");
68 if (!STAILQ_EMPTY(&_v8plus_callq)) {
69 vac = STAILQ_FIRST(&_v8plus_callq);
70 STAILQ_REMOVE_HEAD(&_v8plus_callq, vac_callq_entry);
71 }
72 if (pthread_mutex_unlock(&_v8plus_callq_mtx) != 0)
73 v8plus_panic("could not unlock async queue mutex");
74
75 if (vac == NULL)
76 break;
77
78 /*
79 * Run the queued method:
80 */
81 if (vac->vac_run == B_TRUE)
82 v8plus_panic("async call already run");
83 vac->vac_return = v8plus_method_call_direct(vac->vac_cop,
84 vac->vac_name, vac->vac_lp);
85
86 if (pthread_mutex_lock(&vac->vac_mtx) != 0)
87 v8plus_panic("could not lock async call mutex");
88 vac->vac_run = B_TRUE;
89 if (pthread_cond_broadcast(&vac->vac_cv) != 0)
90 v8plus_panic("could not signal async call condvar");
91 if (pthread_mutex_unlock(&vac->vac_mtx) != 0)
92 v8plus_panic("could not unlock async call mutex");
93 }
94 }
95
96 nvlist_t *
97 v8plus_method_call(void *cop, const char *name, const nvlist_t *lp)
98 {
99 v8plus_async_call_t vac;
100
101 if (v8plus_in_event_thread() == B_TRUE) {
102 /*
103 * We're running in the event loop thread, so we can make the
104 * call directly.
105 */
106 return (v8plus_method_call_direct(cop, name, lp));
107 }
108
109 /*
110 * As we cannot manipulate v8plus/V8/Node structures directly from
111 * outside the event loop thread, we push the call arguments onto a
112 * queue and post to the event loop thread. We then sleep on our
113 * condition variable until the event loop thread makes the call
114 * for us and wakes us up.
115 */
116 bzero(&vac, sizeof (vac));
117 vac.vac_cop = cop;
118 vac.vac_name = name;
119 vac.vac_lp = lp;
120 if (pthread_mutex_init(&vac.vac_mtx, NULL) != 0)
121 v8plus_panic("could not init async call mutex");
122 if (pthread_cond_init(&vac.vac_cv, NULL) != 0)
123 v8plus_panic("could not init async call condvar");
124 vac.vac_run = B_FALSE;
125
126 /*
127 * Post request to queue:
128 */
129 if (pthread_mutex_lock(&_v8plus_callq_mtx) != 0)
130 v8plus_panic("could not lock async queue mutex");
131 STAILQ_INSERT_TAIL(&_v8plus_callq, &vac, vac_callq_entry);
132 if (pthread_mutex_unlock(&_v8plus_callq_mtx) != 0)
133 v8plus_panic("could not unlock async queue mutex");
134 uv_async_send(&_v8plus_uv_async);
135
136 /*
137 * Wait for our request to be serviced on the event loop thread:
138 */
139 if (pthread_mutex_lock(&vac.vac_mtx) != 0)
140 v8plus_panic("could not lock async call mutex");
141 while (vac.vac_run == B_FALSE) {
142 if (pthread_cond_wait(&vac.vac_cv, &vac.vac_mtx) != 0)
143 v8plus_panic("could not wait on async call condvar");
144 }
145 if (pthread_mutex_unlock(&vac.vac_mtx) != 0)
146 v8plus_panic("could not unlock async call mutex");
147
148 if (pthread_cond_destroy(&vac.vac_cv) != 0)
149 v8plus_panic("could not destroy async call condvar");
150 if (pthread_mutex_destroy(&vac.vac_mtx) != 0)
151 v8plus_panic("could not destroy async call mutex");
152
153 return (vac.vac_return);
154 }
155
156
157 /*
158 * Initialise structures for off-event-loop method calls.
159 *
160 * Note that uv_async_init() must be called inside the libuv event loop, so we
161 * do it here. We also want to record the thread ID of the Event Loop thread
162 * so as to determine what kind of method calls to make later.
163 */
164 void
165 v8plus_crossthread_init(void)
166 {
167 _v8plus_uv_event_thread = pthread_self();
168 if (uv_async_init(uv_default_loop(), &_v8plus_uv_async,
169 v8plus_async_callback) != 0)
170 v8plus_panic("unable to initialise uv_async_t");
171 if (pthread_mutex_init(&_v8plus_callq_mtx, NULL) != 0)
172 v8plus_panic("unable to initialise mutex");
173 }
174
175 nvlist_t *
|
14 #include <pthread.h>
15 #include "v8plus_glue.h"
16
17 __thread v8plus_errno_t _v8plus_errno;
18 __thread char _v8plus_errmsg[V8PLUS_ERRMSG_LEN];
19
/*
 * Bundle of three opaque pointers plus a worker and a completion callback.
 *
 * NOTE(review): nothing in the visible part of this file reads or writes
 * this structure; the field roles suggested by their names (object, caller
 * context, worker result) should be confirmed against the code that uses it.
 */
typedef struct v8plus_uv_ctx {
	void *vuc_obj;
	void *vuc_ctx;
	void *vuc_result;
	v8plus_worker_f vuc_worker;
	v8plus_completion_f vuc_completion;
} v8plus_uv_ctx_t;
27
/*
 * Queue of calls posted by other threads and drained by
 * v8plus_async_callback().  Every access to the queue in this file is made
 * with _v8plus_callq_mtx held.
 */
static STAILQ_HEAD(v8plus_callq_head, v8plus_async_call) _v8plus_callq =
    STAILQ_HEAD_INITIALIZER(_v8plus_callq);
static pthread_mutex_t _v8plus_callq_mtx;
/* Thread recorded by v8plus_crossthread_init(); see v8plus_in_event_thread(). */
static pthread_t _v8plus_uv_event_thread;
/* Async handle poked with uv_async_send() after a call has been queued. */
static uv_async_t _v8plus_uv_async;
33
/*
 * Operation that v8plus_async_callback() performs for a queued call.
 * Numbering starts at 1, so a zero-filled structure carries none of these
 * values.
 */
typedef enum v8plus_async_call_type {
	ACT_OBJECT_CALL = 1,	/* v8plus_method_call_direct() */
	ACT_OBJECT_RELEASE,	/* v8plus_obj_rele_direct() */
	ACT_JSFUNC_CALL,	/* v8plus_call_direct() */
	ACT_JSFUNC_RELEASE,	/* v8plus_jsfunc_rele_direct() */
} v8plus_async_call_type_t;
40
/*
 * Bit flags kept in vac_flags.
 */
typedef enum v8plus_async_call_flags {
	/* Set by the async callback once the call has been run. */
	ACF_COMPLETED = 0x01,
	/*
	 * The poster does not wait for a result; the async callback frees
	 * the call structure itself.
	 */
	ACF_NOREPLY = 0x02
} v8plus_async_call_flags_t;
45
46
/*
 * One request posted from a foreign thread to the event loop thread.
 *
 * Calls that return a value are built on the poster's stack, and the poster
 * sleeps until ACF_COMPLETED is set.  ACF_NOREPLY requests are allocated
 * from the heap by the poster and freed by v8plus_async_callback().
 */
typedef struct v8plus_async_call {
	/* Which operation to perform, and the ACF_* state bits: */
	v8plus_async_call_type_t vac_type;
	v8plus_async_call_flags_t vac_flags;

	/*
	 * For ACT_OBJECT_{CALL,RELEASE}: the target object and, for calls,
	 * the name of the method to invoke.
	 */
	void *vac_cop;
	const char *vac_name;
	/*
	 * For ACT_JSFUNC_{CALL,RELEASE}: the JavaScript function handle.
	 */
	v8plus_jsfunc_t vac_func;

	/*
	 * Common call arguments: the argument list passed to the call and
	 * the value it returned.
	 */
	const nvlist_t *vac_lp;
	nvlist_t *vac_return;

	/* The poster waits on vac_cv, under vac_mtx, for ACF_COMPLETED: */
	pthread_cond_t vac_cv;
	pthread_mutex_t vac_mtx;

	/* Linkage on _v8plus_callq: */
	STAILQ_ENTRY(v8plus_async_call) vac_callq_entry;
} v8plus_async_call_t;
72
73 boolean_t
74 v8plus_in_event_thread(void)
75 {
76 return (_v8plus_uv_event_thread == pthread_self() ? B_TRUE : B_FALSE);
77 }
78
79 static void
80 v8plus_async_callback(uv_async_t *async __UNUSED, int status __UNUSED)
81 {
82 if (v8plus_in_event_thread() != B_TRUE)
83 v8plus_panic("async callback called outside of event loop");
84
85 for (;;) {
86 v8plus_async_call_t *vac = NULL;
87
88 /*
89 * Fetch the next queued method:
90 */
91 if (pthread_mutex_lock(&_v8plus_callq_mtx) != 0)
92 v8plus_panic("could not lock async queue mutex");
93 if (!STAILQ_EMPTY(&_v8plus_callq)) {
94 vac = STAILQ_FIRST(&_v8plus_callq);
95 STAILQ_REMOVE_HEAD(&_v8plus_callq, vac_callq_entry);
96 }
97 if (pthread_mutex_unlock(&_v8plus_callq_mtx) != 0)
98 v8plus_panic("could not unlock async queue mutex");
99
100 if (vac == NULL)
101 break;
102
103 /*
104 * Run the queued method:
105 */
106 if (vac->vac_flags & ACF_COMPLETED)
107 v8plus_panic("async call already run");
108
109 switch (vac->vac_type) {
110 case ACT_OBJECT_CALL:
111 vac->vac_return = v8plus_method_call_direct(
112 vac->vac_cop, vac->vac_name, vac->vac_lp);
113 break;
114 case ACT_OBJECT_RELEASE:
115 v8plus_obj_rele_direct(vac->vac_cop);
116 break;
117 case ACT_JSFUNC_CALL:
118 vac->vac_return = v8plus_call_direct(
119 vac->vac_func, vac->vac_lp);
120 break;
121 case ACT_JSFUNC_RELEASE:
122 v8plus_jsfunc_rele_direct(vac->vac_func);
123 break;
124 }
125
126 if (vac->vac_flags & ACF_NOREPLY) {
127 /*
128 * The caller posted this event and is not sleeping
129 * on a reply. Just free the call structure and move
130 * on.
131 */
132 free(vac);
133 if (vac->vac_lp != NULL)
134 nvlist_free((nvlist_t *)vac->vac_lp);
135 continue;
136 }
137
138 if (pthread_mutex_lock(&vac->vac_mtx) != 0)
139 v8plus_panic("could not lock async call mutex");
140 vac->vac_flags |= ACF_COMPLETED;
141 if (pthread_cond_broadcast(&vac->vac_cv) != 0)
142 v8plus_panic("could not signal async call condvar");
143 if (pthread_mutex_unlock(&vac->vac_mtx) != 0)
144 v8plus_panic("could not unlock async call mutex");
145 }
146 }
147
148 /*
149 * As we cannot manipulate v8plus/V8/Node structures directly from outside the
150 * event loop thread, we push the call arguments onto a queue and post to the
151 * event loop thread. We then sleep on our condition variable until the event
152 * loop thread makes the call for us and wakes us up.
153 *
154 * This routine implements the parts of this interaction common to all
155 * variants.
156 */
157 static nvlist_t *
158 v8plus_cross_thread_call(v8plus_async_call_t *vac)
159 {
160 /*
161 * Common call structure initialisation:
162 */
163 if (pthread_mutex_init(&vac->vac_mtx, NULL) != 0)
164 v8plus_panic("could not init async call mutex");
165 if (pthread_cond_init(&vac->vac_cv, NULL) != 0)
166 v8plus_panic("could not init async call condvar");
167 vac->vac_flags &= ~(ACF_COMPLETED);
168
169 /*
170 * Post request to queue:
171 */
172 if (pthread_mutex_lock(&_v8plus_callq_mtx) != 0)
173 v8plus_panic("could not lock async queue mutex");
174 STAILQ_INSERT_TAIL(&_v8plus_callq, vac, vac_callq_entry);
175 if (pthread_mutex_unlock(&_v8plus_callq_mtx) != 0)
176 v8plus_panic("could not unlock async queue mutex");
177 uv_async_send(&_v8plus_uv_async);
178
179 if (vac->vac_flags & ACF_NOREPLY) {
180 /*
181 * The caller does not care about the reply, and has allocated
182 * the v8plus_async_call_t structure from the heap. The
183 * async callback will free the storage when it completes.
184 */
185 return (NULL);
186 }
187
188 /*
189 * Wait for our request to be serviced on the event loop thread:
190 */
191 if (pthread_mutex_lock(&vac->vac_mtx) != 0)
192 v8plus_panic("could not lock async call mutex");
193 while (!(vac->vac_flags & ACF_COMPLETED)) {
194 if (pthread_cond_wait(&vac->vac_cv, &vac->vac_mtx) != 0)
195 v8plus_panic("could not wait on async call condvar");
196 }
197 if (pthread_mutex_unlock(&vac->vac_mtx) != 0)
198 v8plus_panic("could not unlock async call mutex");
199
200 if (pthread_cond_destroy(&vac->vac_cv) != 0)
201 v8plus_panic("could not destroy async call condvar");
202 if (pthread_mutex_destroy(&vac->vac_mtx) != 0)
203 v8plus_panic("could not destroy async call mutex");
204
205 return (vac->vac_return);
206 }
207
208 nvlist_t *
209 v8plus_method_call(void *cop, const char *name, const nvlist_t *lp)
210 {
211 v8plus_async_call_t vac;
212
213 if (v8plus_in_event_thread() == B_TRUE) {
214 /*
215 * We're running in the event loop thread, so we can make the
216 * call directly.
217 */
218 return (v8plus_method_call_direct(cop, name, lp));
219 }
220
221 bzero(&vac, sizeof (vac));
222 vac.vac_type = ACT_OBJECT_CALL;
223 vac.vac_cop = cop;
224 vac.vac_name = name;
225 vac.vac_lp = lp;
226
227 return (v8plus_cross_thread_call(&vac));
228 }
229
230 nvlist_t *
231 v8plus_call(v8plus_jsfunc_t func, const nvlist_t *lp)
232 {
233 v8plus_async_call_t vac;
234
235 if (v8plus_in_event_thread() == B_TRUE) {
236 /*
237 * We're running in the event loop thread, so we can make the
238 * call directly.
239 */
240 return (v8plus_call_direct(func, lp));
241 }
242
243 bzero(&vac, sizeof (vac));
244 vac.vac_type = ACT_JSFUNC_CALL;
245 vac.vac_func = func;
246 vac.vac_lp = lp;
247
248 return (v8plus_cross_thread_call(&vac));
249 }
250
251 void
252 v8plus_obj_rele(const void *cop)
253 {
254 v8plus_async_call_t *vac;
255
256 if (v8plus_in_event_thread() == B_TRUE) {
257 return (v8plus_obj_rele_direct(cop));
258 }
259
260 vac = calloc(1, sizeof (*vac));
261 if (vac == NULL)
262 v8plus_panic("could not allocate async call structure");
263
264 vac->vac_type = ACT_OBJECT_RELEASE;
265 vac->vac_flags = ACF_NOREPLY;
266 vac->vac_cop = (void *)cop;
267
268 (void) v8plus_cross_thread_call(vac);
269 }
270
271 void
272 v8plus_jsfunc_rele(v8plus_jsfunc_t f)
273 {
274 v8plus_async_call_t *vac;
275
276 if (v8plus_in_event_thread() == B_TRUE) {
277 return (v8plus_jsfunc_rele_direct(f));
278 }
279
280 vac = calloc(1, sizeof (*vac));
281 if (vac == NULL)
282 v8plus_panic("could not allocate async call structure");
283
284 vac->vac_type = ACT_JSFUNC_RELEASE;
285 vac->vac_flags = ACF_NOREPLY;
286 vac->vac_func = f;
287
288 (void) v8plus_cross_thread_call(vac);
289 }
290
291 /*
292 * Initialise structures for off-event-loop method calls.
293 *
294 * Note that uv_async_init() must be called inside the libuv event loop, so we
295 * do it here. We also want to record the thread ID of the Event Loop thread
296 * so as to determine what kind of method calls to make later.
297 */
298 void
299 v8plus_crossthread_init(void)
300 {
301 _v8plus_uv_event_thread = pthread_self();
302 if (uv_async_init(uv_default_loop(), &_v8plus_uv_async,
303 v8plus_async_callback) != 0)
304 v8plus_panic("unable to initialise uv_async_t");
305 if (pthread_mutex_init(&_v8plus_callq_mtx, NULL) != 0)
306 v8plus_panic("unable to initialise mutex");
307 }
308
309 nvlist_t *
|