Print this page
XXX review feedback from keith

@@ -2,15 +2,18 @@
  * Copyright (c) 2012 Joyent, Inc.  All rights reserved.
  */
 
 #include <sys/ccompile.h>
 #include <sys/debug.h>
+#include <sys/queue.h>
+#include <sys/types.h>
 #include <stdarg.h>
 #include <string.h>
 #include <strings.h>
 #include <errno.h>
 #include <uv.h>
+#include <pthread.h>
 #include "v8plus_glue.h"
 
 __thread v8plus_errno_t _v8plus_errno;	/* one per thread; presumably the last error code -- TODO confirm */
 __thread char _v8plus_errmsg[V8PLUS_ERRMSG_LEN];	/* one per thread; presumably the matching message text -- TODO confirm */
 

@@ -20,10 +23,154 @@
         void *vuc_result;
         v8plus_worker_f vuc_worker;
         v8plus_completion_f vuc_completion;
 } v8plus_uv_ctx_t;
 
+static STAILQ_HEAD(v8plus_callq_head, v8plus_async_call) _v8plus_callq =
+    STAILQ_HEAD_INITIALIZER(_v8plus_callq);  /* calls awaiting service */
+static pthread_mutex_t _v8plus_callq_mtx;  /* held around _v8plus_callq use */
+static pthread_t _v8plus_uv_event_thread;  /* set in v8plus_crossthread_init */
+static uv_async_t _v8plus_uv_async;  /* bound to v8plus_async_callback */
+/*
+ * A single method call handed from a non-event-loop thread to the event loop
+ * thread.  v8plus_method_call() fills one in on its own stack, appends it to
+ * _v8plus_callq and sleeps on vac_cv; v8plus_async_callback() dequeues it,
+ * performs the call, records the result and sets vac_run before waking the
+ * sleeper.
+ */
+typedef struct v8plus_async_call {
+        void *vac_cop;                  /* 1st arg to the direct call */
+        const char *vac_name;           /* 2nd arg to the direct call */
+        const nvlist_t *vac_lp;         /* 3rd arg to the direct call */
+
+        pthread_cond_t vac_cv;          /* broadcast once the call has run */
+        pthread_mutex_t vac_mtx;        /* held around vac_run/vac_return */
+
+        boolean_t vac_run;              /* B_TRUE once the call has been made */
+        nvlist_t *vac_return;           /* value the direct call returned */
+
+        STAILQ_ENTRY(v8plus_async_call) vac_callq_entry; /* _v8plus_callq link */
+} v8plus_async_call_t;
+/*
+ * Performs the method call itself, with no cross-thread hand-off.  Defined
+ * outside this section; within this section it is only reached on paths that
+ * have first checked v8plus_in_event_thread().
+ */
+nvlist_t *v8plus_method_call_direct(void *, const char *, const nvlist_t *);
+
+/*
+ * Report whether the calling thread is the one recorded in
+ * _v8plus_uv_event_thread by v8plus_crossthread_init().
+ *
+ * pthread_t is an opaque type (it may be a structure on some platforms), so
+ * thread IDs must be compared with pthread_equal() rather than with ==.
+ *
+ * Returns B_TRUE when called on the recorded thread, B_FALSE otherwise.
+ */
+boolean_t
+v8plus_in_event_thread(void)
+{
+        if (pthread_equal(_v8plus_uv_event_thread, pthread_self()) != 0)
+                return (B_TRUE);
+
+        return (B_FALSE);
+}
+
+/*
+ * Detach and return the request at the head of _v8plus_callq, or NULL when
+ * the queue is empty.  The queue mutex is held only for the duration of the
+ * dequeue itself.
+ */
+static v8plus_async_call_t *
+v8plus_callq_pop(void)
+{
+        v8plus_async_call_t *head;
+
+        if (pthread_mutex_lock(&_v8plus_callq_mtx) != 0)
+                v8plus_panic("could not lock async queue mutex");
+
+        head = STAILQ_FIRST(&_v8plus_callq);
+        if (head != NULL) {
+                STAILQ_REMOVE_HEAD(&_v8plus_callq, vac_callq_entry);
+        }
+
+        if (pthread_mutex_unlock(&_v8plus_callq_mtx) != 0)
+                v8plus_panic("could not unlock async queue mutex");
+
+        return (head);
+}
+
+/*
+ * Handler registered for _v8plus_uv_async.  Drains _v8plus_callq: each
+ * queued request is executed via v8plus_method_call_direct(), its result is
+ * stored in the request, and the thread sleeping on the request's condition
+ * variable is woken.  Panics if invoked on any thread other than the event
+ * loop thread, or if a request is found to have been run already.
+ */
+static void
+v8plus_async_callback(uv_async_t *async, __attribute__((unused)) int status)
+{
+        v8plus_async_call_t *call;
+
+        if (v8plus_in_event_thread() != B_TRUE)
+                v8plus_panic("async callback called outside of event loop");
+
+        while ((call = v8plus_callq_pop()) != NULL) {
+                nvlist_t *result;
+
+                /*
+                 * The request's mutex stays held across the call so that
+                 * the waiter only ever observes vac_run and vac_return
+                 * together.
+                 */
+                if (pthread_mutex_lock(&call->vac_mtx) != 0)
+                        v8plus_panic("could not lock async call mutex");
+
+                if (call->vac_run == B_TRUE)
+                        v8plus_panic("async call already run");
+
+                result = v8plus_method_call_direct(call->vac_cop,
+                    call->vac_name, call->vac_lp);
+                call->vac_return = result;
+                call->vac_run = B_TRUE;
+
+                /*
+                 * Wake the requesting thread, then release its mutex so it
+                 * can proceed.
+                 */
+                if (pthread_cond_broadcast(&call->vac_cv) != 0)
+                        v8plus_panic("could not signal async call condvar");
+                if (pthread_mutex_unlock(&call->vac_mtx) != 0)
+                        v8plus_panic("could not unlock async call mutex");
+        }
+}
+
+/*
+ * Call the method `name' on `cop' with arguments `lp', from any thread.
+ *
+ * On the event loop thread the call is made directly.  On any other thread
+ * the request is queued for the event loop thread and the caller blocks
+ * until v8plus_async_callback() has made the call on its behalf.
+ *
+ * Returns whatever v8plus_method_call_direct() returned for this call.
+ * Any failure of the underlying pthread or libuv primitives is treated as
+ * fatal and reported through v8plus_panic().
+ */
+nvlist_t *
+v8plus_method_call(void *cop, const char *name, const nvlist_t *lp)
+{
+        v8plus_async_call_t vac;
+        nvlist_t *result;
+
+        if (v8plus_in_event_thread() == B_TRUE) {
+                /*
+                 * We're running in the event loop thread, so we can make the
+                 * call directly.
+                 */
+                return (v8plus_method_call_direct(cop, name, lp));
+        }
+
+        /*
+         * As we cannot manipulate v8plus/V8/Node structures directly from
+         * outside the event loop thread, we push the call arguments onto a
+         * queue and post to the event loop thread.  We then sleep on our
+         * condition variable until the event loop thread makes the call
+         * for us and wakes us up.
+         */
+        vac.vac_cop = cop;
+        vac.vac_name = name;
+        vac.vac_lp = lp;
+        if (pthread_mutex_init(&vac.vac_mtx, NULL) != 0)
+                v8plus_panic("could not init async call mutex");
+        if (pthread_cond_init(&vac.vac_cv, NULL) != 0)
+                v8plus_panic("could not init async call condvar");
+        vac.vac_run = B_FALSE;
+        vac.vac_return = NULL;
+
+        /*
+         * Post request to queue:
+         */
+        if (pthread_mutex_lock(&_v8plus_callq_mtx) != 0)
+                v8plus_panic("could not lock async queue mutex");
+        STAILQ_INSERT_TAIL(&_v8plus_callq, &vac, vac_callq_entry);
+        if (pthread_mutex_unlock(&_v8plus_callq_mtx) != 0)
+                v8plus_panic("could not unlock async queue mutex");
+
+        /*
+         * If the wakeup cannot be posted nobody will ever service the
+         * request and we would sleep forever below, so treat it as fatal.
+         */
+        if (uv_async_send(&_v8plus_uv_async) != 0)
+                v8plus_panic("could not post to event loop thread");
+
+        /*
+         * Wait for our request to be serviced on the Event Loop thread:
+         */
+        if (pthread_mutex_lock(&vac.vac_mtx) != 0)
+                v8plus_panic("could not lock async call mutex");
+        while (vac.vac_run == B_FALSE) {
+                if (pthread_cond_wait(&vac.vac_cv, &vac.vac_mtx) != 0)
+                        v8plus_panic("could not wait on async call condvar");
+        }
+        result = vac.vac_return;
+        if (pthread_mutex_unlock(&vac.vac_mtx) != 0)
+                v8plus_panic("could not unlock async call mutex");
+
+        /*
+         * The request lives on our stack, so its synchronisation objects
+         * must be torn down before we return.  The event loop thread has
+         * already dequeued the request and released vac_mtx by the time we
+         * were able to reacquire it above.
+         */
+        if (pthread_cond_destroy(&vac.vac_cv) != 0)
+                v8plus_panic("could not destroy async call condvar");
+        if (pthread_mutex_destroy(&vac.vac_mtx) != 0)
+                v8plus_panic("could not destroy async call mutex");
+
+        return (result);
+}
+
+
+/*
+ * Set up the state used for method calls made from outside the Event Loop.
+ *
+ * This must run on the libuv Event Loop thread: uv_async_init() has to be
+ * called there, and the calling thread's ID is recorded so that
+ * v8plus_in_event_thread() can later tell which kind of method call to make.
+ * The mutex protecting the call queue is initialised last.  Any failure is
+ * reported through v8plus_panic().
+ */
+void
+v8plus_crossthread_init(void)
+{
+        uv_loop_t *loop;
+        int err;
+
+        _v8plus_uv_event_thread = pthread_self();
+
+        loop = uv_default_loop();
+        err = uv_async_init(loop, &_v8plus_uv_async, v8plus_async_callback);
+        if (err != 0)
+                v8plus_panic("unable to initialise uv_async_t");
+
+        err = pthread_mutex_init(&_v8plus_callq_mtx, NULL);
+        if (err != 0)
+                v8plus_panic("unable to initialise mutex");
+}
+
 nvlist_t *
 v8plus_verror(v8plus_errno_t e, const char *fmt, va_list ap)
 {
         if (fmt == NULL) {
                 if (e == V8PLUSERR_NOERROR) {