Event Driven Programming in Forth

Brad Nelson

August 25, 2012


Overview

Event Driven Programming

Node.js

Node.js (example)

function handleRequest(request, done) {
  if (request.style == 1) {
    getThing(request.name, function(result, err) {
      getThing(result, function(result, err) {
        done(result);
      });
    });
  } else {
    getThing('default', function(result, err) {
      done(result);
    });
  }
}

Twisted

Twisted (example)

from twisted.internet import reactor, defer
def getSlowSquare(x):
  d = defer.Deferred()
  # callLater takes its delay in seconds
  reactor.callLater(0.1, d.callback, x * x)
  return d
def printValue(x):
  print(x)
d = getSlowSquare(4)
d.addCallback(printValue)
reactor.callLater(0.2, reactor.stop)
reactor.run()

Traditional Forth Approach

PROs:

CONs:

Closures (Plan A)

: func1
   value A
   [:
     A ( Reference A above )
     123 to A
   ;]
;

Closures (Plan B)

: func1
   >s ( add to scope stack )
   [:
     s> ( pull out of scope )
   ;]
   sdrop ( drop in the parent scope )
;

Closures Implementation

closures +≡
carnal knowledge
scope stack
scope flow control
bind and invoke
start and end scope

Carnal Knowledge

For gforth we know that colon-sys lives on the data stack and occupies 4 cells:

carnal knowledge +≡
4 constant colon-sys-size

Add a word to drop a colon-sys.

carnal knowledge +≡
: colon-sys-drop ( colon-sys -- ) colon-sys-size 0 do drop loop ;

Scope Stack

Scope Stack (implementation)

scope stack +≡
1000 cells constant scope-cells
: scope-alloc ( -- s) scope-cells allocate 0= assert
              1 cells over ! ;
variable myscope
scope-alloc myscope !

Add some push / pop operations.

scope stack +≡
: scope+! ( n -- ) cells myscope @ +! ;
: scope-ptr ( -- n ) myscope @ @ myscope @ + ;
: >s ( n -- ) scope-ptr !  1 scope+! ;
: s> ( -- n ) -1 scope+!  scope-ptr @ ;
: sdrop ( -- ) -1 scope+! ;

Printing a Scope

scope stack +≡
: scope. ( s -- ) ." scope(" dup @ cell / 1- . ." ) "
    dup @ cell ?do dup i + @ . cell +loop drop cr ;

Cloning Scopes and Freeing

scope stack +≡
: scope-clone ( s -- s' )
    scope-alloc dup >r scope-cells cmove r>
;
: scope-free ( s -- ) free 0= assert ;
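
The layout above can be modeled in C to make the cloning semantics easier to see. This is only a sketch under assumptions: `cell` is a stand-in for a Forth cell, the function names are hypothetical, and there is no overflow or underflow checking (the Forth version has none either).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define SCOPE_CELLS 1000              /* mirrors the 1000 cells in scope-cells */

typedef long cell;                    /* assumption: stand-in for a Forth cell */

/* Cell 0 holds the byte offset of the next free slot, as in scope-alloc. */
cell *scope_alloc(void) {
  cell *s = calloc(SCOPE_CELLS, sizeof(cell));
  assert(s);
  s[0] = (cell) sizeof(cell);
  return s;
}

/* >s : store at the free slot, then advance the offset. */
void scope_push(cell *s, cell v) {
  *(cell *)((char *)s + s[0]) = v;
  s[0] += (cell) sizeof(cell);
}

/* s> : step the offset back, then fetch. */
cell scope_pop(cell *s) {
  s[0] -= (cell) sizeof(cell);
  return *(cell *)((char *)s + s[0]);
}

/* scope-clone : copy the whole block, offset included. */
cell *scope_clone(const cell *s) {
  cell *copy = scope_alloc();
  memcpy(copy, s, SCOPE_CELLS * sizeof(cell));
  return copy;
}
```

Because the offset travels with the data, a clone is an independent snapshot: later pushes on the original do not show up in the copy.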

:noname

An alternate version of :noname that discards the colon-sys and leaves only the xt.

scope flow control +≡
: :noname2 ( -- xt )
    :noname colon-sys-drop ;

Bind and Invoke

bind and invoke +≡
: bind ( xt -- closure )
    >s myscope @ scope-clone s> drop
;
: invoke ( closure -- )
    myscope @ >r ( leak ) scope-clone myscope !
    s> execute
    myscope @ scope-free r> myscope !
;
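
As a rough C analogy for bind and invoke: a closure is a code pointer plus a snapshot of the scope taken at bind time, and each invocation runs against a fresh copy of that snapshot. All names here are hypothetical and the fixed capacity is an assumption; the Forth version keeps the xt on top of the cloned scope rather than in a separate field.

```c
#include <assert.h>
#include <stdlib.h>

#define MAX_CAPTURED 8                 /* assumption: small fixed capacity */

typedef struct Scope {
  int depth;                           /* number of captured cells in use */
  long cells[MAX_CAPTURED];
} Scope;

typedef long (*Body)(Scope *scope, long arg);

typedef struct Closure {
  Body body;                           /* plays the role of the xt */
  Scope captured;                      /* snapshot taken at bind time */
} Closure;

/* bind: remember the body and copy the current scope. */
Closure *bind_closure(Body body, const Scope *current) {
  Closure *c = malloc(sizeof *c);
  assert(c);
  c->body = body;
  c->captured = *current;              /* struct copy stands in for scope-clone */
  return c;
}

/* invoke: run the body on a fresh copy so the closure can be invoked again. */
long invoke_closure(const Closure *c, long arg) {
  Scope working = c->captured;
  return c->body(&working, arg);
}

/* Body corresponding to [: s> + ;] */
long add_captured(Scope *scope, long arg) {
  long captured = scope->cells[--scope->depth];
  return captured + arg;
}
```

Copying on every invoke is what lets the body consume its captured values with s> and still be callable a second time.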

Start and End Scope

We want:

: foo a b c [: x y z ;] d e f ;

start and end scope +≡
: [:   postpone ahead postpone exit
       postpone [ :noname2 >s ; immediate
: ;]   postpone exit postpone then
       s> postpone literal postpone bind ; immediate

Try Out Closures

general tests +≡
: scope-test 1 [: 2 [: 3 ;] 4 ;] 5 ;
scope-test 5 assert=
invoke 4 assert=
invoke 3 assert=
2 assert=
1 assert=

Closures with Scope

general tests +≡
: test-adder >s [: s> + ;] sdrop ;
5 4 test-adder invoke 9 assert=

Asynchronous I/O

Required Headers

required headers +≡
\c #include <assert.h>
\c #include <fcntl.h>
\c #include <pthread.h>
\c #include <stdio.h>
\c #include <stdlib.h>
\c #include <string.h>
\c #include <unistd.h>

Standard Constants

Some standard constants will be brought over from C.

relevant constants +≡
\c #define DEFINT(name) int name##_int(void) { return name; }
\c DEFINT(O_CREAT)
\c DEFINT(O_TRUNC)
\c DEFINT(O_WRONLY)
\c DEFINT(O_RDONLY)
c-function O_CREAT O_CREAT_int -- n
c-function O_TRUNC O_TRUNC_int -- n
c-function O_WRONLY O_WRONLY_int -- n
c-function O_RDONLY O_RDONLY_int -- n
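
To see what the macro produces, here is the same DEFINT in plain C. The wrapper functions exist because the constants are preprocessor macros, while c-function binds to functions. The helper `create_write_flags` is hypothetical and only illustrates combining the values the way the Forth side does with `or`.

```c
#include <assert.h>
#include <fcntl.h>

/* Same macro as above: wrap a preprocessor constant in a callable function. */
#define DEFINT(name) int name##_int(void) { return name; }

DEFINT(O_CREAT)    /* expands to: int O_CREAT_int(void) { return O_CREAT; } */
DEFINT(O_WRONLY)

/* Hypothetical helper, not part of the original program. */
int create_write_flags(void) {
  return O_CREAT_int() | O_WRONLY_int();
}
```

The `##` operator pastes the unexpanded name, so the function is called `O_CREAT_int`, while the `return name;` part expands to the numeric value.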

File Permissions

Declare full octal permissions:

relevant constants +≡
: octal 8 base ! ;
octal
777 constant rwx
decimal
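
For readers more used to the C side, octal 777 is read, write and execute for user, group and others. A small sketch using the POSIX mode macros (the function name is hypothetical):

```c
#include <assert.h>
#include <sys/stat.h>

/* Hypothetical helper: the mode that the Forth constant rwx stands for. */
int full_permissions(void) {
  return S_IRWXU | S_IRWXG | S_IRWXO;   /* 0700 | 0070 | 0007 */
}
```

The mode actually applied by open is still filtered by the process umask.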

Workers

We will for now assume a fixed number of workers.

worker count +≡
\c #define WORKERS 10
\c static pthread_t g_worker_pool[WORKERS];

Worker Startup

The workers are started when the system is initialized.

start all workers +≡
\c   for (i = 0; i < WORKERS; ++i) {
\c     if (pthread_create(&g_worker_pool[i], NULL, Worker, NULL)) {
\c       assert(0);
\c     }
\c   }

Requests

request structure +≡
\c typedef union {
\c   int number;
\c   void *pointer;
\c } VARIANT;
\c
\c typedef struct _REQUEST {
\c   struct _REQUEST *next;
\c   enum {
       event types
\c   } operation;
\c   VARIANT args[4];
\c   void *callback;
\c   int result;
\c } REQUEST;

Queues

Two queues are involved in the system. Both are guarded by a single lock, so that one pending count can be kept for the complete pipeline.

lock and count +≡
\c static pthread_mutex_t g_lock;
\c static int g_pending_count;

Request Queue

The first queue receives pending requests.

requests queue +≡
\c static pthread_cond_t g_requests_ready;
\c static REQUEST *g_requests_head;
\c static REQUEST *g_requests_tail;

Result Queue

The second queue gathers processed requests for handling in the main event loop.

results queue +≡
\c static pthread_cond_t g_results_ready;
\c static REQUEST *g_results_head;
\c static REQUEST *g_results_tail;

Queue Setup

These will be initialized on startup.

startup routine +≡
\c void async_startup(void) {
\c   int i;
\c   pthread_mutex_init(&g_lock, NULL);
\c   pthread_cond_init(&g_requests_ready, NULL);
\c   pthread_cond_init(&g_results_ready, NULL);
\c   g_requests_head = 0;
\c   g_requests_tail = 0;
\c   g_results_head = 0;
\c   g_results_tail = 0;
\c   g_pending_count = 0;
start all workers
\c }

Enqueue Requests

Requests will then be enqueued on demand.

enqueue a request +≡
\c void async_request_enqueue(REQUEST *req) {
\c   pthread_mutex_lock(&g_lock);
\c   ++g_pending_count;
\c   if (g_requests_tail) {
\c     g_requests_tail->next = req;
\c   } else {
\c     g_requests_head = req;
\c   }
\c   g_requests_tail = req;
\c   req->next = 0;
\c   pthread_cond_signal(&g_requests_ready);
\c   pthread_mutex_unlock(&g_lock);
\c }
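
Both queues use the same discipline: append at the tail, remove from the head, and clear the tail when the queue empties. A sketch of that discipline in isolation, with locking and signalling left out; `Node`, `Queue`, `queue_push` and `queue_pop` are hypothetical names, and `Node` stands in for REQUEST.

```c
#include <assert.h>
#include <stddef.h>

typedef struct Node {            /* stand-in for REQUEST; only the link matters */
  struct Node *next;
  int id;
} Node;

typedef struct Queue {
  Node *head;
  Node *tail;
} Queue;

/* Append at the tail; an empty queue gets the node as its head too. */
void queue_push(Queue *q, Node *n) {
  n->next = NULL;
  if (q->tail) {
    q->tail->next = n;
  } else {
    q->head = n;
  }
  q->tail = n;
}

/* Remove from the head; returns NULL when the queue is empty. */
Node *queue_pop(Queue *q) {
  Node *n = q->head;
  if (!n) return NULL;
  q->head = n->next;
  if (!q->head) q->tail = NULL;  /* last node removed: reset the tail */
  return n;
}
```

In the real program every push and pop happens with g_lock held, and a push is followed by a signal on the matching condition variable.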

forth to c declarations +≡
c-function async-startup async_startup -- void

Shutdown

event types +≡
\c     SHUTDOWN,

forth to c declarations +≡
c-function async-shutdown async_shutdown -- void

handle request types +≡
\c     case SHUTDOWN:
\c       free(req);
\c       return NULL;

issue requests +≡
\c void async_shutdown(void) {
\c   REQUEST *req;
\c   int i;
\c   for (i = 0; i < WORKERS; ++i) {
\c     req = (REQUEST*) calloc(1, sizeof(REQUEST));
\c     assert(req);
\c     req->operation = SHUTDOWN;
\c     async_request_enqueue(req);
\c   }
\c   for (i = 0; i < WORKERS; ++i) {
\c     pthread_join(g_worker_pool[i], NULL);
\c   }
\c   pthread_mutex_destroy(&g_lock);
\c   pthread_cond_destroy(&g_requests_ready);
\c   pthread_cond_destroy(&g_results_ready);
\c }

Open

event types +≡
\c     OPEN,

forth to c declarations +≡
c-function async-open async_open a n n n a -- void

handle request types +≡
\c     case OPEN:
\c       tmp = malloc(req->args[1].number + 1);
\c       assert(tmp);
\c       memcpy(tmp, req->args[0].pointer, req->args[1].number);
\c       tmp[req->args[1].number] = 0;
\c       req->result = open(tmp, req->args[2].number, req->args[3].number);
\c       free(tmp);
\c       break;
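
Forth strings arrive as an address and a length with no terminator, while open expects a NUL-terminated path, hence the copy. The same steps could be pulled into a helper; this is a sketch, and `counted_to_cstring` is a hypothetical name that does not appear in the original.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: returns a malloc'd NUL-terminated copy of the first
 * len bytes at addr. The caller is responsible for calling free(). */
char *counted_to_cstring(const char *addr, int len) {
  char *tmp = malloc((size_t) len + 1);
  assert(tmp);
  memcpy(tmp, addr, (size_t) len);
  tmp[len] = 0;
  return tmp;
}
```

The copy is made on the worker thread, so the Forth-side buffer has to stay valid until a worker has picked up the request.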

issue requests +≡
\c void async_open(char *path, int path_len,
\c                 int oflag, int mode, void *callback) {
\c   REQUEST *req;
\c   req = (REQUEST*) calloc(1, sizeof(REQUEST));
\c   assert(req);
\c   req->operation = OPEN;
\c   req->args[0].pointer = path;
\c   req->args[1].number = path_len;
\c   req->args[2].number = oflag;
\c   req->args[3].number = mode;
\c   req->callback = callback;
\c   async_request_enqueue(req);
\c }

Close

event types +≡
\c     CLOSE,

forth to c declarations +≡
c-function async-close async_close n a -- void

handle request types +≡
\c     case CLOSE:
\c       req->result = close(req->args[0].number);
\c       break;

issue requests +≡
\c void async_close(int fd, void *callback) {
\c   REQUEST *req;
\c   req = (REQUEST*) calloc(1, sizeof(REQUEST));
\c   assert(req);
\c   req->operation = CLOSE;
\c   req->args[0].number = fd;
\c   req->callback = callback;
\c   async_request_enqueue(req);
\c }

Read

event types +≡
\c     READ,

forth to c declarations +≡
c-function async-read async_read n a n a -- void

handle request types +≡
\c     case READ:
\c       req->result = read(req->args[0].number, req->args[1].pointer,
\c                          req->args[2].number);
\c       break;

issue requests +≡
\c void async_read(int fd, void *buf, int len, void *callback) {
\c   REQUEST *req;
\c   req = (REQUEST*) calloc(1, sizeof(REQUEST));
\c   assert(req);
\c   req->operation = READ;
\c   req->args[0].number = fd;
\c   req->args[1].pointer = buf;
\c   req->args[2].number = len;
\c   req->callback = callback;
\c   async_request_enqueue(req);
\c }

Write

event types +≡
\c     WRITE,

forth to c declarations +≡
c-function async-write async_write n a n a -- void

handle request types +≡
\c     case WRITE:
\c       req->result = write(req->args[0].number, req->args[1].pointer,
\c                           req->args[2].number);
\c       break;

issue requests +≡
\c void async_write(int fd, void *buf, int len, void *callback) {
\c   REQUEST *req;
\c   req = (REQUEST*) calloc(1, sizeof(REQUEST));
\c   assert(req);
\c   req->operation = WRITE;
\c   req->args[0].number = fd;
\c   req->args[1].pointer = buf;
\c   req->args[2].number = len;
\c   req->callback = callback;
\c   async_request_enqueue(req);
\c }

System

event types +≡
\c     SYSTEM,

forth to c declarations +≡
c-function async-system async_system a n a -- void

handle request types +≡
\c     case SYSTEM:
\c       tmp = malloc(req->args[1].number + 1);
\c       assert(tmp);
\c       memcpy(tmp, req->args[0].pointer, req->args[1].number);
\c       tmp[req->args[1].number] = 0;
\c       req->result = system(tmp);
\c       free(tmp);
\c       break;

issue requests +≡
\c void async_system(char *cmd, int cmd_len, void *callback) {
\c   REQUEST *req;
\c   req = (REQUEST*) calloc(1, sizeof(REQUEST));
\c   assert(req);
\c   req->operation = SYSTEM;
\c   req->args[0].pointer = cmd;
\c   req->args[1].number = cmd_len;
\c   req->callback = callback;
\c   async_request_enqueue(req);
\c }

Worker Implementation

worker implementation +≡
\c void *Worker(void *arg) {
\c   REQUEST *req;
\c   char *tmp;
\c
\c   for (;;) {
       get a pending request
       handle a request
       pass on result
\c   }
\c }

Get a Pending Request

get a pending request +≡
\c     pthread_mutex_lock(&g_lock);
\c     while (!g_requests_head) {
\c       pthread_cond_wait(&g_requests_ready, &g_lock);
\c     }
\c     req = g_requests_head;
\c     g_requests_head = req->next;
\c     if (!g_requests_head) { g_requests_tail = 0; }
\c     pthread_mutex_unlock(&g_lock);

Handle a Request

handle a request +≡
\c     switch (req->operation) {
       handle request types
\c     default:
\c       assert(0);
\c       break;
\c     }

Pass on Result

pass on result +≡
\c     pthread_mutex_lock(&g_lock);
\c     if (g_results_tail) {
\c       g_results_tail->next = req;
\c     } else {
\c       g_results_head = req;
\c     }
\c     g_results_tail = req;
\c     req->next = 0;
\c     pthread_cond_signal(&g_results_ready);
\c     pthread_mutex_unlock(&g_lock);

Waiting for Results

Waiting then occurs on the main thread.

implement waiting +≡
\c void async_wait(int *result, void **callback) {
\c   REQUEST *req;
\c   pthread_mutex_lock(&g_lock);
\c   if (g_pending_count <= 0) {
\c     *result = 0;
\c     *callback = 0;
\c     pthread_mutex_unlock(&g_lock);
\c     return;
\c   }
\c   while (!g_results_head) {
\c     pthread_cond_wait(&g_results_ready, &g_lock);
\c   }
\c   req = g_results_head;
\c   g_results_head = req->next;
\c   if (!g_results_head) { g_results_tail = 0; }
\c   *result = req->result;
\c   *callback = req->callback;
\c   free(req);
\c   --g_pending_count;
\c   pthread_mutex_unlock(&g_lock);
\c }

forth to c declarations +≡
c-function async-wait async_wait a a -- void

Run Loop

dispatch events +≡
variable result
variable callback
: async-run
  begin
    result callback async-wait
    callback @ 0= if exit then
    result @ callback @ invoke
  again
;

Putting it together

Some tests are in order.

general tests +≡
: test1
    s" ls -l out" [:
      0= assert
      1 s" Hello world!" [:
        drop cr ." And Done!" cr
      ;] async-write
    ;] async-system
    async-run
;
test1

Using Scope

general tests +≡
: write-whole-file ( data-addr data-len name-addr name-len next -- )
    >s >r >r >s >s r> r>
    O_CREAT O_TRUNC or O_WRONLY or rwx [:
      dup 0>= assert
      dup >r s> s> r> >s [:
        drop s> [: s> invoke ;] async-close
      ;] async-write
    ;] async-open
    sdrop sdrop sdrop
;
: test2
    s" Hello there!" s" out/test1.txt" [:
      ." Written file." cr
    ;] write-whole-file 
    async-run
;
test2

Closures with Conditions

general tests +≡
: special-adder dup 8 = if
     drop [: 256 ;]
   else
     >s [: s> + ;] sdrop
   then
;
5 4 special-adder invoke 9 assert=
5 8 special-adder invoke 256 assert=

Sugar

add sugar +≡
: '[:   postpone ['] postpone [: ; immediate
: ;]'   postpone ;] postpone swap postpone execute ; immediate

Test Sugar

general tests +≡
: test3
    s" ls -l out" '[: async-system
      0= assert
      1 s" Sugar Test!" '[: async-write
        drop cr ." And Done with Sugar!" cr
      ;]'
    ;]'
    async-run
;
test3

Man or Boy Test (ALGOL)

begin
  real procedure A(k, x1, x2, x3, x4, x5);
  value k; integer k;
  begin
    real procedure B;
    begin k := k - 1;
          B := A := A(k, B, x1, x2, x3, x4);
    end;
    if k <= 0 then A := x4 + x5 else B;
  end;
  outreal(A(10, 1, -1, -1, 1, 0));
end;

Man or Boy Test (Javascript)

function A(k, x1, x2, x3, x4, x5) { 
    function B() { return A(--k, B, x1, x2, x3, x4); }
    return k <= 0 ? x4() + x5() : B();
}
function K(n) { return function() { return n; }; }
alert(A(10, K(1), K(-1), K(-1), K(1), K(0)));

Man or Boy Test (Forth)

: A
    >s >s >s >s >s >s
    [: [ s> dup >s bind ] s> 1- swap s> s> s> s> recurse ;]
    s> s> >r >r sdrop sdrop sdrop
    s> 0<= if
      drop
      r> invoke r> invoke +
    else
      rdrop rdrop
      invoke
    then
;
: K >s [: s> ;] sdrop ;

10 1 K -1 K -1 K 1 K 0 K A
-67 assert=

Future Directions

Questions?