Brad Nelson
August 25, 2012
In your browser, press ← and → to move through the slides, ↑ and ↓ jump to the beginning and end.
(On eBook readers, browse normally.)
/**
 * Resolve a request through getThing and hand the final result to done.
 *
 * When request.style is 1 the lookup is chained: request.name is looked up,
 * and the value it yields is looked up again. For any other style a single
 * lookup of 'default' is made. The err argument supplied by getThing is
 * ignored; done always receives exactly one argument, the result.
 */
function handleRequest(request, done) {
  // Shared last step: forward only the result to the caller.
  var finish = function(finalResult, err) {
    done(finalResult);
  };

  if (request.style != 1) {
    getThing('default', finish);
    return;
  }

  getThing(request.name, function(firstResult, err) {
    getThing(firstResult, finish);
  });
}
from twisted.internet import reactor, defer


def getSlowSquare(x):
    """Return a Deferred that fires with x * x once the delay has elapsed."""
    d = defer.Deferred()
    # callLater takes its delay in seconds.
    reactor.callLater(100, d.callback, x * x)
    # The caller needs the Deferred back in order to attach callbacks to it.
    return d


def printValue(x):
    """Callback: print the value the Deferred fired with."""
    print(x)


d = getSlowSquare(4)
d.addCallback(printValue)
# Stop the reactor after the square has had time to arrive.
reactor.callLater(200, reactor.stop)
reactor.run()
PROs:
CONs:
: func1
value A
[:
A ( Reference A above )
123 to A
;]
;
: func1
>s ( add to scope stack )
[:
s> ( pull out of scope )
;]
sdrop ( drop in the parent scope )
;
carnal knowledge scope stack scope flow control bind and invoke start and end scope
For gforth we know that colon-sys lives on the data stack and occupies 4 cells:
carnal knowledge +≡4 constant colon-sys-size
Add a word to drop a colon-sys.
carnal knowledge +≡: colon-sys-drop ( colon-sys -- ) colon-sys-size 0 do drop loop ;
\ Size in bytes of one scope stack (room for 1000 cells).
1000 cells constant scope-cells
\ Allocate a fresh, empty scope stack and return its address.
\ The first cell holds the byte offset of the next free slot; it starts
\ at one cell, i.e. just past this header cell.
: scope-alloc ( -- s) scope-cells allocate 0= assert
1 cells over ! ;
variable myscope
scope-alloc myscope !
Add some push / pop operations.
scope stack +≡: scope+! ( n -- ) cells myscope @ +! ; : scope-ptr ( -- n ) myscope @ @ myscope @ + ; : >s ( n -- ) scope-ptr ! 1 scope+! ; : s> ( -- n ) -1 scope+! scope-ptr @ ; : sdrop ( -- ) -1 scope+! ;
: scope. ( s -- ) ." scope(" dup @ cell / 1- . ." ) "
dup @ cell ?do dup i + @ . cell +loop drop cr ;
\ Duplicate a scope stack: allocate a new one, copy all scope-cells bytes
\ of s into it (header cell included) and return the copy.
: scope-clone ( s -- s' )
scope-alloc dup >r scope-cells cmove r>
;
: scope-free ( s -- ) free 0= assert ;
Alternate version of :noname.
scope flow control +≡: :noname2 ( -- xt )
:noname colon-sys-drop ;
\ Build a closure from xt and the current scope.
\ xt is pushed onto the current scope stack, the whole scope is cloned
\ (so the copy carries xt as its top item), and xt is then popped from the
\ current scope again and discarded.  The clone is the closure.
: bind ( xt -- closure )
>s myscope @ scope-clone s> drop
;
\ Run a closure.  The current scope pointer is saved on the return stack,
\ a private copy of the closure's scope becomes the current scope, and the
\ xt stored on top of it is popped and executed.  Afterwards the copy is
\ freed and the caller's scope is restored.  The closure passed in is not
\ freed here (hence the "leak" note below).
: invoke ( closure -- )
myscope @ >r ( leak ) scope-clone myscope !
s> execute
myscope @ scope-free r> myscope !
;
We want:
: foo a b c [: x y z ;] d e f ;
\ [: opens an inline anonymous definition inside the word being compiled.
\ It compiles a forward branch (ahead) followed by exit, leaves compile
\ state, starts a new definition with :noname2 and parks that definition's
\ xt on the scope stack until the matching ;] is reached.
: [: postpone ahead postpone exit
postpone [ :noname2 >s ; immediate
\ ;] closes the anonymous definition.  It compiles exit to end the body,
\ resolves the forward branch (then), takes the parked xt back off the
\ scope stack and compiles it as a literal followed by bind, so the
\ enclosing word produces a closure at run time.
: ;] postpone exit postpone then
s> postpone literal postpone bind ; immediate
: scope-test 1 [: 2 [: 3 ;] 4 ;] 5 ; scope-test 5 assert= invoke 4 assert= invoke 3 assert= 2 assert= 1 assert=
\ test-adder ( n -- closure )  Build a closure that adds n to its argument.
\ n is parked on the scope stack so that bind captures it; sdrop then
\ removes it from the parent's scope so that nothing is left behind there.
: test-adder >s [: s> + ;] sdrop ;
5 4 test-adder invoke 9 assert=
\c #include <assert.h> \c #include <fcntl.h> \c #include <pthread.h> \c #include <stdio.h> \c #include <stdlib.h> \c #include <string.h> \c #include <unistd.h>
Some standard constants will be brought over from C.
relevant constants +≡\c #define DEFINT(name) int name##_int(void) { return name; }
\c DEFINT(O_CREAT)
\c DEFINT(O_TRUNC)
\c DEFINT(O_WRONLY)
\c DEFINT(O_RDONLY)
c-function O_CREAT O_CREAT_int -- n
c-function O_TRUNC O_TRUNC_int -- n
c-function O_WRONLY O_WRONLY_int -- n
c-function O_RDONLY O_RDONLY_int -- n
Declare full octal permissions:
relevant constants +≡: octal 8 base ! ; octal 777 constant rwx decimal
We will for now assume a fixed number of workers.
worker count +≡\c #define WORKERS 10 \c static pthread_t g_worker_pool[WORKERS];
The workers are started when the system is initialized.
start all workers +≡\c for (i = 0; i < WORKERS; ++i) {
\c if (pthread_create(&g_worker_pool[i], NULL, Worker, NULL)) {
\c assert(0);
\c }
\c }
\c typedef union {
\c int number;
\c void *pointer;
\c } VARIANT;
\c
\c typedef struct _REQUEST {
\c struct _REQUEST *next;
\c enum {
event types
\c } operation;
\c VARIANT args[4];
\c void *callback;
\c int result;
\c } REQUEST;
Two queues are involved in the system. Both are guarded by a single lock, so that a single pending count for the complete pipeline can be kept.
lock and count +≡\c static pthread_mutex_t g_lock; \c static int g_pending_count;
One to receive pending requests.
requests queue +≡\c static pthread_cond_t g_requests_ready; \c static REQUEST *g_requests_head; \c static REQUEST *g_requests_tail;
Another to gather processed requests for processing in the main event loop.
results queue +≡\c static pthread_cond_t g_results_ready; \c static REQUEST *g_results_head; \c static REQUEST *g_results_tail;
These will be initialized on startup.
startup routine +≡\c /* Prepare the async machinery: initialise the shared lock and both
\c  * condition variables, empty both queues, zero the pending count and
\c  * then start the worker pool. */
\c void async_startup(void) {
\c int i;
\c /* A single mutex guards both queues; it must be initialised only once,
\c  * since re-initialising an already initialised mutex is undefined. */
\c pthread_mutex_init(&g_lock, NULL);
\c pthread_cond_init(&g_requests_ready, NULL);
\c pthread_cond_init(&g_results_ready, NULL);
\c g_requests_head = 0;
\c g_requests_tail = 0;
\c g_results_head = 0;
\c g_results_tail = 0;
\c g_pending_count = 0;
start all workers
\c }
Requests will then be enqueued on demand.
enqueue a request +≡\c /* Append req to the tail of the request queue, count it as pending and
\c  * wake one waiter on g_requests_ready.  All of this happens under g_lock. */
\c void async_request_enqueue(REQUEST *req) {
\c pthread_mutex_lock(&g_lock);
\c g_pending_count += 1;
\c if (g_requests_tail == 0) {
\c /* Empty queue: req becomes the head as well. */
\c g_requests_head = req;
\c } else {
\c g_requests_tail->next = req;
\c }
\c g_requests_tail = req;
\c req->next = 0;
\c pthread_cond_signal(&g_requests_ready);
\c pthread_mutex_unlock(&g_lock);
\c }
c-function async-startup async_startup -- void
\c SHUTDOWN,
c-function async-shutdown async_shutdown -- void
\c case SHUTDOWN:
\c /* Shutdown sentinel: release it and end this worker thread.  Worker is
\c  * declared to return void *, so the return needs an explicit value. */
\c free(req);
\c return NULL;
\c /* Stop the worker pool and release the synchronisation objects.
\c  * One SHUTDOWN request is queued per worker (each worker exits when it
\c  * receives one), every worker thread is joined, and only then are the
\c  * mutex and the condition variables destroyed. */
\c void async_shutdown(void) {
\c REQUEST *req;
\c int i;
\c for (i = 0; i < WORKERS; ++i) {
\c req = (REQUEST*) calloc(1, sizeof(REQUEST));
\c assert(req);
\c req->operation = SHUTDOWN;
\c async_request_enqueue(req);
\c }
\c for (i = 0; i < WORKERS; ++i) {
\c pthread_join(g_worker_pool[i], NULL);
\c }
\c /* Every worker has exited, so nothing can still hold or wait on these.
\c  * Each object is destroyed exactly once. */
\c pthread_mutex_destroy(&g_lock);
\c pthread_cond_destroy(&g_requests_ready);
\c pthread_cond_destroy(&g_results_ready);
\c }
\c OPEN,
c-function async-open async_open a n n n a -- void
\c case OPEN: \c tmp = malloc(req->args[1].number + 1); \c assert(tmp); \c memcpy(tmp, req->args[0].pointer, req->args[1].number); \c tmp[req->args[1].number] = 0; \c req->result = open(tmp, req->args[2].number, req->args[3].number); \c free(tmp); \c break;
\c /* Queue an OPEN request.  path need not be NUL-terminated: the worker
\c  * copies path_len bytes and terminates the copy itself, so the bytes at
\c  * path must stay valid until the worker has handled the request.
\c  * callback is stored in the request and handed back by async_wait
\c  * together with the result of open(). */
\c void async_open(char *path, int path_len,
\c int oflag, int mode, void *callback) {
\c REQUEST *request = calloc(1, sizeof *request);
\c assert(request);
\c request->callback = callback;
\c request->operation = OPEN;
\c request->args[0].pointer = path;
\c request->args[1].number = path_len;
\c request->args[2].number = oflag;
\c request->args[3].number = mode;
\c async_request_enqueue(request);
\c }
\c CLOSE,
c-function async-close async_close n a -- void
\c case CLOSE: \c req->result = close(req->args[0].number); \c break;
\c /* Queue a CLOSE request for fd.  callback is stored in the request and
\c  * handed back by async_wait together with the result of close(). */
\c void async_close(int fd, void *callback) {
\c REQUEST *request = calloc(1, sizeof *request);
\c assert(request);
\c request->callback = callback;
\c request->operation = CLOSE;
\c request->args[0].number = fd;
\c async_request_enqueue(request);
\c }
\c READ,
c-function async-read async_read n a n a -- void
\c case READ: \c req->result = read(req->args[0].number, req->args[1].pointer, \c req->args[2].number); \c break;
\c /* Queue a READ request: the worker reads up to len bytes from fd into
\c  * buf.  callback is stored in the request and handed back by async_wait
\c  * together with the result of read(). */
\c void async_read(int fd, void *buf, int len, void *callback) {
\c REQUEST *request = calloc(1, sizeof *request);
\c assert(request);
\c request->callback = callback;
\c request->operation = READ;
\c request->args[0].number = fd;
\c request->args[1].pointer = buf;
\c request->args[2].number = len;
\c async_request_enqueue(request);
\c }
\c WRITE,
c-function async-write async_write n a n a -- void
\c case WRITE: \c req->result = write(req->args[0].number, req->args[1].pointer, \c req->args[2].number); \c break;
\c /* Queue a WRITE request: the worker writes len bytes from buf to fd.
\c  * callback is stored in the request and handed back by async_wait
\c  * together with the result of write(). */
\c void async_write(int fd, void *buf, int len, void *callback) {
\c REQUEST *request = calloc(1, sizeof *request);
\c assert(request);
\c request->callback = callback;
\c request->operation = WRITE;
\c request->args[0].number = fd;
\c request->args[1].pointer = buf;
\c request->args[2].number = len;
\c async_request_enqueue(request);
\c }
\c SYSTEM,
c-function async-system async_system a n a -- void
\c case SYSTEM: \c tmp = malloc(req->args[1].number + 1); \c assert(tmp); \c memcpy(tmp, req->args[0].pointer, req->args[1].number); \c tmp[req->args[1].number] = 0; \c req->result = system(tmp); \c free(tmp); \c break;
\c /* Queue a SYSTEM request.  cmd need not be NUL-terminated: the worker
\c  * copies cmd_len bytes, terminates the copy and passes it to system().
\c  * callback is stored in the request and handed back by async_wait
\c  * together with the result of system(). */
\c void async_system(char *cmd, int cmd_len, void *callback) {
\c REQUEST *request = calloc(1, sizeof *request);
\c assert(request);
\c request->callback = callback;
\c request->operation = SYSTEM;
\c request->args[0].pointer = cmd;
\c request->args[1].number = cmd_len;
\c async_request_enqueue(request);
\c }
\c /* Thread entry point for every pool worker.  It loops forever; each pass
\c  * runs the three chunks referenced below, in order.  arg is unused.
\c  * tmp is scratch storage for the request handlers. */
\c void *Worker(void *arg) {
\c REQUEST *req;
\c char *tmp;
\c
\c for (;;) {
get a pending request
handle a request
pass on result
\c }
\c }
\c pthread_mutex_lock(&g_lock);
\c while (!g_requests_head) {
\c pthread_cond_wait(&g_requests_ready, &g_lock);
\c }
\c req = g_requests_head;
\c g_requests_head = req->next;
\c if (!g_requests_head) { g_requests_tail = 0; }
\c pthread_mutex_unlock(&g_lock);
\c switch (req->operation) {
handle request types
\c default:
\c assert(0);
\c break;
\c }
\c pthread_mutex_lock(&g_lock);
\c if (g_results_tail) {
\c g_results_tail->next = req;
\c } else {
\c g_results_head = req;
\c }
\c g_results_tail = req;
\c req->next = 0;
\c pthread_cond_signal(&g_results_ready);
\c pthread_mutex_unlock(&g_lock);
Waiting then occurs on the main thread.
implement waiting +≡\c /* Hand the next completed request to the main thread.
\c  * If nothing is pending, *result and *callback are both set to 0 and the
\c  * call returns at once.  Otherwise it blocks until a result is queued,
\c  * stores that request's result and callback, frees the request and
\c  * decrements the pending count.
\c  *
\c  * result is written as a long, not an int: the Forth caller passes the
\c  * address of a cell-sized VARIABLE and reads it back with @.  Storing
\c  * only sizeof(int) bytes would leave the rest of the cell untouched, so
\c  * a negative result (e.g. -1 from a failed open) would not read back as
\c  * negative where cells are wider than int. */
\c void async_wait(long *result, void **callback) {
\c REQUEST *req;
\c pthread_mutex_lock(&g_lock);
\c if (g_pending_count <= 0) {
\c *result = 0;
\c *callback = 0;
\c pthread_mutex_unlock(&g_lock);
\c return;
\c }
\c while (!g_results_head) {
\c pthread_cond_wait(&g_results_ready, &g_lock);
\c }
\c req = g_results_head;
\c g_results_head = req->next;
\c if (!g_results_head) { g_results_tail = 0; }
\c /* int to long: the sign is extended across the whole stored value. */
\c *result = req->result;
\c *callback = req->callback;
\c free(req);
\c --g_pending_count;
\c pthread_mutex_unlock(&g_lock);
\c }
c-function async-wait async_wait a a -- void
variable result
variable callback
\ Main event loop.  Each pass asks async-wait for the next completed
\ request, passing the addresses of the result and callback variables.
\ A callback of 0 (what async_wait stores when nothing is pending) ends
\ the loop; otherwise the callback closure is invoked with the request's
\ result on the data stack.
: async-run
begin
result callback async-wait
callback @ 0= if exit then
result @ callback @ invoke
again
;
Some tests are in order.
general tests +≡: test1
s" ls -l out" [:
0= assert
1 s" Hello world!" [:
drop cr ." And Done!" cr
;] async-write
;] async-system
async-run
;
test1
\ Write a buffer to a file asynchronously: open, write, close, then invoke
\ next.  data and filename are each an addr/len pair, so the full stack
\ effect is ( data-addr data-len name-addr name-len next -- ).
: write-whole-file ( data filename next -- )
\ Park next, data-len and data-addr on the scope stack (data-addr on top),
\ leaving name-addr name-len on the data stack for async-open.
>s >r >r >s >s r> r>
O_CREAT O_TRUNC or O_WRONLY or rwx [:
\ Open callback: the result is the file descriptor.
dup 0>= assert
\ Pop data-addr and data-len for async-write; park fd on the scope stack.
dup >r s> s> r> >s [:
\ Write callback: discard its result, pop fd and close it.  The close
\ callback pops next and invokes it.
drop s> [: s> invoke ;] async-close
;] async-write
;] async-open
\ Remove the three parked items from this word's own scope.
sdrop sdrop sdrop
;
: test2
s" Hello there!" s" out/test1.txt" [:
." Written file." cr
;] write-whole-file
async-run
;
test2
\ special-adder ( n -- closure )
\ For n = 8 the closure ignores its argument and pushes 256.  For any other
\ n the closure adds n to its argument; n is captured via the scope stack
\ and then dropped from the parent's scope.
: special-adder dup 8 = if
drop [: 256 ;]
else
>s [: s> + ;] sdrop
then
;
5 4 special-adder invoke 9 assert=
\ The 8-closure leaves its argument untouched beneath the 256, so check it
\ too instead of leaving a stray 5 on the data stack.
5 8 special-adder invoke 256 assert= 5 assert=
: '[: postpone ['] postpone [: ; immediate : ;]' postpone ;] postpone swap postpone execute ; immediate
: test3
s" ls -l out" '[: async-system
0= assert
1 s" Sugar Test!" '[: async-write
drop cr ." And Done with Sugar!" cr
;]'
;]'
async-run
;
test3
begin
real procedure A(k, x1, x2, x3, x4, x5);
value k; integer k;
begin
real procedure B;
begin k := k - 1;
B := A := A(k, B, x1, x2, x3, x4);
end;
if k <= 0 then A := x4 + x5 else B;
end;
outreal(A(10, 1, -1, -1, 1, 0));
end;
/**
 * Knuth's "man or boy" function.
 *
 * k is a number; x1..x5 are zero-argument functions. When k <= 0 the result
 * is x4() + x5(). Otherwise the local function B is called: it decrements
 * this activation's k and recurses with itself as the new x1, shifting the
 * other arguments along by one.
 */
function A(k, x1, x2, x3, x4, x5) {
  var B = function() {
    k -= 1;
    return A(k, B, x1, x2, x3, x4);
  };
  if (k <= 0) {
    return x4() + x5();
  }
  return B();
}
// Wrap a constant as a zero-argument function, the form A expects for x1..x5.
// The inner function must be returned; without the return, K yields undefined.
function K(n) { return function() { return n; }; }
alert(A(10, K(1), K(-1), K(-1), K(1), K(0)));
: A
>s >s >s >s >s >s
[: [ s> dup >s bind ] s> 1- swap s> s> s> s> recurse ;]
s> s> >r >r sdrop sdrop sdrop
s> 0<= if
drop
r> invoke r> invoke +
else
rdrop rdrop
invoke
then
;
\ K ( n -- closure )  Build a closure that pushes n when invoked.
\ n is parked on the scope stack so that bind captures it, then dropped
\ from the parent's scope.
: K >s [: s> ;] sdrop ;
\ Man-or-boy test: A(10, 1, -1, -1, 1, 0) must yield -67.  A consumes six
\ items (k and five closures), matching the ALGOL and JavaScript calls.
10 1 K -1 K -1 K 1 K 0 K A
-67 assert=
Questions?