Brad Nelson
August 25, 2012
In your browser, press ← and → to move through the slides; ↑ and ↓ jump to the beginning and end.
(On eBook readers, browse normally.)
/**
 * Resolve a request through getThing and pass the final value to `done`.
 *
 * When request.style is 1, two lookups are chained: request.name is looked
 * up first and its result is looked up again. For any other style a single
 * lookup of 'default' is performed. The err argument handed to each callback
 * is not inspected.
 */
function handleRequest(request, done) {
  if (request.style != 1) {
    getThing('default', function(value, err) {
      done(value);
    });
    return;
  }
  getThing(request.name, function(first, err) {
    getThing(first, function(second, err) {
      done(second);
    });
  });
}
from twisted.internet import reactor, defer


def getSlowSquare(x):
    """Return a Deferred that fires with x * x after a 100-second delay."""
    d = defer.Deferred()
    reactor.callLater(100, d.callback, x * x)
    # The Deferred must be handed back so the caller can attach callbacks.
    return d


def printValue(x):
    """Callback: print the value the Deferred fired with."""
    print(x)


d = getSlowSquare(4)
d.addCallback(printValue)
# Stop the reactor after the square has been delivered.
reactor.callLater(200, reactor.stop)
reactor.run()
PROs:
CONs:
: func1 value A [: A ( Reference A above ) 123 to A ;] ;
: func1 >s ( add to scope stack ) [: s> ( pull out of scope ) ;] sdrop ( drop in the parent scope ) ;
carnal knowledge scope stack scope flow control bind and invoke start and end scope
For gforth we know that colon-sys lives on the data stack and occupies 4 cells:
carnal knowledge +≡4 constant colon-sys-size
Add a word to drop a colon-sys.
carnal knowledge +≡: colon-sys-drop ( colon-sys -- ) colon-sys-size 0 do drop loop ;
1000 cells constant scope-cells : scope-alloc ( -- s) scope-cells allocate 0= assert 1 cells over ! ; variable myscope scope-alloc myscope !
Add some push / pop operations.
scope stack +≡: scope+! ( n -- ) cells myscope @ +! ; : scope-ptr ( -- n ) myscope @ @ myscope @ + ; : >s ( n -- ) scope-ptr ! 1 scope+! ; : s> ( -- n ) -1 scope+! scope-ptr @ ; : sdrop ( -- ) -1 scope+! ;
: scope. ( s -- ) ." scope(" dup @ cell / 1- . ." ) " dup @ cell ?do dup i + @ . cell +loop drop cr ;
: scope-clone ( s -- s' ) scope-alloc dup >r scope-cells cmove r> ; : scope-free ( s -- ) free 0= assert ;
Alternate version of :noname.
scope flow control +≡: :noname2 ( -- xt ) :noname colon-sys-drop ;
: bind ( xt -- closure ) >s myscope @ scope-clone s> drop ; : invoke ( closure -- ) myscope @ >r ( leak ) scope-clone myscope ! s> execute myscope @ scope-free r> myscope ! ;
We want:
: foo a b c [: x y z ;] d e f ;
: [: postpone ahead postpone exit postpone [ :noname2 >s ; immediate : ;] postpone exit postpone then s> postpone literal postpone bind ; immediate
: scope-test 1 [: 2 [: 3 ;] 4 ;] 5 ; scope-test 5 assert= invoke 4 assert= invoke 3 assert= 2 assert= 1 assert=
: test-adder >s [: s> + ;] ; 5 4 test-adder invoke 9 assert=
\c #include <assert.h> \c #include <fcntl.h> \c #include <pthread.h> \c #include <stdio.h> \c #include <stdlib.h> \c #include <string.h> \c #include <unistd.h>
Some standard constants will be brought over from C.
relevant constants +≡\c #define DEFINT(name) int name##_int(void) { return name; } \c DEFINT(O_CREAT) \c DEFINT(O_TRUNC) \c DEFINT(O_WRONLY) \c DEFINT(O_RDONLY) c-function O_CREAT O_CREAT_int -- n c-function O_TRUNC O_TRUNC_int -- n c-function O_WRONLY O_WRONLY_int -- n c-function O_RDONLY O_RDONLY_int -- n
Declare full octal permissions:
relevant constants +≡: octal 8 base ! ; octal 777 constant rwx decimal
We will for now assume a fixed number of workers.
worker count +≡\c #define WORKERS 10 \c static pthread_t g_worker_pool[WORKERS];
The workers are started when the system is initialized.
start all workers +≡\c for (i = 0; i < WORKERS; ++i) { \c if (pthread_create(&g_worker_pool[i], NULL, Worker, NULL)) { \c assert(0); \c } \c }
\c typedef union { \c int number; \c void *pointer; \c } VARIANT; \c \c typedef struct _REQUEST { \c struct _REQUEST *next; \c enum { event types \c } operation; \c VARIANT args[4]; \c void *callback; \c int result; \c } REQUEST;
Two queues are involved in the system. Both are guarded by a single lock so that a single pending count for the complete pipeline can be kept.
lock and count +≡\c static pthread_mutex_t g_lock; \c static int g_pending_count;
One to receive pending requests.
requests queue +≡\c static pthread_cond_t g_requests_ready; \c static REQUEST *g_requests_head; \c static REQUEST *g_requests_tail;
Another to gather processed requests for processing in the main event loop.
results queue +≡\c static pthread_cond_t g_results_ready; \c static REQUEST *g_results_head; \c static REQUEST *g_results_tail;
These will be initialized on startup.
startup routine +≡
\c /* Initialize the shared lock, both condition variables, both queues and
\c  * the pending count, then start the worker threads. */
\c void async_startup(void) {
\c   int i;  /* loop index used by the worker start-up chunk below */
\c   /* g_lock is initialized exactly once: initializing an already
\c    * initialized mutex is undefined behavior. */
\c   pthread_mutex_init(&g_lock, NULL);
\c   pthread_cond_init(&g_requests_ready, NULL);
\c   pthread_cond_init(&g_results_ready, NULL);
\c   g_requests_head = 0;
\c   g_requests_tail = 0;
\c   g_results_head = 0;
\c   g_results_tail = 0;
\c   g_pending_count = 0;
start all workers
\c }
Requests will then be enqueued on demand.
enqueue a request +≡
\c /* Append req to the tail of the pending-request queue, count it in
\c  * g_pending_count and signal g_requests_ready.  All of this happens
\c  * while holding g_lock. */
\c void async_request_enqueue(REQUEST *req) {
\c   pthread_mutex_lock(&g_lock);
\c   g_pending_count += 1;
\c   if (!g_requests_tail) {
\c     /* empty queue: req becomes the head as well */
\c     g_requests_head = req;
\c   } else {
\c     g_requests_tail->next = req;
\c   }
\c   g_requests_tail = req;
\c   req->next = 0;
\c   pthread_cond_signal(&g_requests_ready);
\c   pthread_mutex_unlock(&g_lock);
\c }
c-function async-startup async_startup -- void
\c SHUTDOWN,
c-function async-shutdown async_shutdown -- void
\c case SHUTDOWN: \c free(req); \c return;
\c /* Stop the worker pool and release the synchronization objects.
\c  * One SHUTDOWN request is queued per worker, every worker thread is
\c  * joined, and only then are the condition variables and the lock
\c  * destroyed (once each). */
\c void async_shutdown(void) {
\c   REQUEST *req;
\c   int i;
\c   for (i = 0; i < WORKERS; ++i) {
\c     req = (REQUEST*) calloc(1, sizeof(REQUEST));
\c     assert(req);
\c     req->operation = SHUTDOWN;
\c     async_request_enqueue(req);
\c   }
\c   for (i = 0; i < WORKERS; ++i) {
\c     pthread_join(g_worker_pool[i], NULL);
\c   }
\c   /* Every worker has been joined, so nothing can still be waiting on
\c    * or holding these objects; destroying them earlier (or more than
\c    * once) would be undefined behavior. */
\c   pthread_cond_destroy(&g_requests_ready);
\c   pthread_cond_destroy(&g_results_ready);
\c   pthread_mutex_destroy(&g_lock);
\c }
\c OPEN,
c-function async-open async_open a n n n a -- void
\c case OPEN: \c tmp = malloc(req->args[1].number + 1); \c assert(tmp); \c memcpy(tmp, req->args[0].pointer, req->args[1].number); \c tmp[req->args[1].number] = 0; \c req->result = open(tmp, req->args[2].number, req->args[3].number); \c free(tmp); \c break;
\c /* Queue an OPEN request.  The path pointer and its length go into
\c  * args[0] and args[1], oflag into args[2] and mode into args[3].  Only
\c  * the pointer is stored here, not a copy of the path bytes.  callback
\c  * travels with the request. */
\c void async_open(char *path, int path_len,
\c                 int oflag, int mode, void *callback) {
\c   REQUEST *open_req = calloc(1, sizeof *open_req);
\c   assert(open_req);
\c   open_req->callback = callback;
\c   open_req->operation = OPEN;
\c   open_req->args[0].pointer = path;
\c   open_req->args[1].number = path_len;
\c   open_req->args[2].number = oflag;
\c   open_req->args[3].number = mode;
\c   async_request_enqueue(open_req);
\c }
\c CLOSE,
c-function async-close async_close n a -- void
\c case CLOSE: \c req->result = close(req->args[0].number); \c break;
\c /* Queue a CLOSE request for file descriptor fd (stored in args[0]).
\c  * callback travels with the request. */
\c void async_close(int fd, void *callback) {
\c   REQUEST *close_req = calloc(1, sizeof *close_req);
\c   assert(close_req);
\c   close_req->callback = callback;
\c   close_req->operation = CLOSE;
\c   close_req->args[0].number = fd;
\c   async_request_enqueue(close_req);
\c }
\c READ,
c-function async-read async_read n a n a -- void
\c case READ: \c req->result = read(req->args[0].number, req->args[1].pointer, \c req->args[2].number); \c break;
\c /* Queue a READ request: fd in args[0], destination buffer pointer in
\c  * args[1], byte count in args[2].  Only the buffer pointer is stored.
\c  * callback travels with the request. */
\c void async_read(int fd, void *buf, int len, void *callback) {
\c   REQUEST *read_req = calloc(1, sizeof *read_req);
\c   assert(read_req);
\c   read_req->callback = callback;
\c   read_req->operation = READ;
\c   read_req->args[0].number = fd;
\c   read_req->args[1].pointer = buf;
\c   read_req->args[2].number = len;
\c   async_request_enqueue(read_req);
\c }
\c WRITE,
c-function async-write async_write n a n a -- void
\c case WRITE: \c req->result = write(req->args[0].number, req->args[1].pointer, \c req->args[2].number); \c break;
\c /* Queue a WRITE request: fd in args[0], source buffer pointer in
\c  * args[1], byte count in args[2].  Only the buffer pointer is stored.
\c  * callback travels with the request. */
\c void async_write(int fd, void *buf, int len, void *callback) {
\c   REQUEST *write_req = calloc(1, sizeof *write_req);
\c   assert(write_req);
\c   write_req->callback = callback;
\c   write_req->operation = WRITE;
\c   write_req->args[0].number = fd;
\c   write_req->args[1].pointer = buf;
\c   write_req->args[2].number = len;
\c   async_request_enqueue(write_req);
\c }
\c SYSTEM,
c-function async-system async_system a n a -- void
\c case SYSTEM: \c tmp = malloc(req->args[1].number + 1); \c assert(tmp); \c memcpy(tmp, req->args[0].pointer, req->args[1].number); \c tmp[req->args[1].number] = 0; \c req->result = system(tmp); \c free(tmp); \c break;
\c /* Queue a SYSTEM request: command pointer in args[0], command length
\c  * in args[1].  Only the pointer is stored here, not a copy of the
\c  * command text.  callback travels with the request. */
\c void async_system(char *cmd, int cmd_len, void *callback) {
\c   REQUEST *sys_req = calloc(1, sizeof *sys_req);
\c   assert(sys_req);
\c   sys_req->callback = callback;
\c   sys_req->operation = SYSTEM;
\c   sys_req->args[0].pointer = cmd;
\c   sys_req->args[1].number = cmd_len;
\c   async_request_enqueue(sys_req);
\c }
\c void *Worker(void *arg) { \c REQUEST *req; \c char *tmp; \c \c for (;;) { get a pending request handle a request pass on result \c } \c }
\c pthread_mutex_lock(&g_lock); \c while (!g_requests_head) { \c pthread_cond_wait(&g_requests_ready, &g_lock); \c } \c req = g_requests_head; \c g_requests_head = req->next; \c if (!g_requests_head) { g_requests_tail = 0; } \c pthread_mutex_unlock(&g_lock);
\c switch (req->operation) { handle request types \c default: \c assert(0); \c break; \c }
\c pthread_mutex_lock(&g_lock); \c if (g_results_tail) { \c g_results_tail->next = req; \c } else { \c g_results_head = req; \c } \c g_results_tail = req; \c req->next = 0; \c pthread_cond_signal(&g_results_ready); \c pthread_mutex_unlock(&g_lock);
Waiting then occurs on the main thread.
implement waiting +≡
\c /* Hand the next finished request back to the caller.
\c  * When g_pending_count is not positive, 0 is stored in both *result and
\c  * *callback and the call returns at once.  Otherwise the call waits on
\c  * g_results_ready until the results queue is non-empty, removes its
\c  * head, copies out the result and callback, frees the request and
\c  * decrements g_pending_count.  g_lock is held throughout, except while
\c  * blocked in pthread_cond_wait.
\c  * NOTE(review): only sizeof(int) bytes of *result are written; a caller
\c  * reading a wider cell should confirm how the remaining bytes are set. */
\c void async_wait(int *result, void **callback) {
\c   REQUEST *finished;
\c   pthread_mutex_lock(&g_lock);
\c   if (g_pending_count > 0) {
\c     while (!g_results_head) {
\c       pthread_cond_wait(&g_results_ready, &g_lock);
\c     }
\c     finished = g_results_head;
\c     g_results_head = finished->next;
\c     if (!g_results_head) {
\c       g_results_tail = 0;
\c     }
\c     *result = finished->result;
\c     *callback = finished->callback;
\c     free(finished);
\c     g_pending_count -= 1;
\c   } else {
\c     *result = 0;
\c     *callback = 0;
\c   }
\c   pthread_mutex_unlock(&g_lock);
\c }
c-function async-wait async_wait a a -- void
variable result variable callback : async-run begin result callback async-wait callback @ 0= if exit then result @ callback @ invoke again ;
Some tests are in order.
general tests +≡: test1 s" ls -l out" [: 0= assert 1 s" Hello world!" [: drop cr ." And Done!" cr ;] async-write ;] async-system async-run ; test1
: write-whole-file ( data filename next -- ) >s >r >r >s >s r> r> O_CREAT O_TRUNC or O_WRONLY or rwx [: dup 0>= assert dup >r s> s> r> >s [: drop s> [: s> invoke ;] async-close ;] async-write ;] async-open sdrop sdrop sdrop ; : test2 s" Hello there!" s" out/test1.txt" [: ." Written file." cr ;] write-whole-file async-run ; test2
: special-adder dup 8 = if drop [: 256 ;] else >s [: s> + ;] sdrop then ; 5 4 special-adder invoke 9 assert= 5 8 special-adder invoke 256 assert=
: '[: postpone ['] postpone [: ; immediate : ;]' postpone ;] postpone swap postpone execute ; immediate
: test3 s" ls -l out" '[: async-system 0= assert 1 s" Sugar Test!" '[: async-write drop cr ." And Done with Sugar!" cr ;]' ;]' async-run ; test3
begin real procedure A(k, x1, x2, x3, x4, x5); value k; integer k; begin real procedure B; begin k := k - 1; B := A := A(k, B, x1, x2, x3, x4); end; if k <= 0 then A := x4 + x5 else B; end; outreal(A(10, 1, -1, -1, 1, 0)); end;
// Knuth's "man or boy" test: A recurses through the nested closure B,
// which decrements the k captured by the enclosing call of A.
function A(k, x1, x2, x3, x4, x5) {
  function B() {
    return A(--k, B, x1, x2, x3, x4);
  }
  return k <= 0 ? x4() + x5() : B();
}

// Wrap a constant in a zero-argument function so it can be passed
// wherever A expects a callable argument.
function K(n) {
  return function() {
    return n;
  };
}

alert(A(10, K(1), K(-1), K(-1), K(1), K(0)));
: A >s >s >s >s >s >s [: [ s> dup >s bind ] s> 1- swap s> s> s> s> recurse ;] s> s> >r >r sdrop sdrop sdrop s> 0<= if drop r> invoke r> invoke + else rdrop rdrop invoke then ; : K >s [: s> ;] sdrop ; 10 1 K -1 K 1 K 0 K A -67 assert=
Questions?