bokbok
Artifact [a9a68859a3]
Login

Artifact a9a68859a3c85883a05f9d27f991081e0ed35611:


(module
 bokbok
 (passphrase->key

  open-connection
  close-connection!
  request!

  remote-error?
  remote-error-message

  connection?
  connection-user
  connection-addr

  start-server
  stop-server!
  wait-until-server-stopped
  )

 (import chicken scheme)

 (use extras)
 (use data-structures)
 (use ports)
 (use srfi-1)
 (use srfi-4)
 (use srfi-13)
 (use srfi-18)
 (use srfi-69)

 (use matchable)
 (use tweetnacl)
 (use socket)
 (use srfi-27)
 (use moa) ;; from srfi-27
 (use entropy-unix) ;; from srfi-27

 ;; FIXME: Log handler exceptions - and call (get-call-chain) for details

 ;; FIXME: Try-again-later responses to requests, caused by a
 ;; configurable concurrent request limit or a special condition
 ;; type. Client sleeps and retries (sleep time is listed in the
 ;; try-again-later response).

 ;; FIXME: TCP connection handshake to include option to redirect to
 ;; another server IP:port, controlled by the open handlers

 ;; open-connection gets all A records and tries until it gets in

 ;; FIXME: Accept a hostname for open-connection, and look it up

 ;; FIXME: Support sending creds over a unix-domain socket instead of a username.

 ;; FIXME: User data field in a connection

 ;; FIXME: Reduce timeouts to detect gone servers quickly

 ;; FIXME: Support IPv6

 ;; FIXME: Is the 60s receive timeout OK? Connections can't linger. Is
 ;; that a feature?

 ;; FIXME: Auto reconnect on request!

 ;; FIXME: Configurable maximum message size: client and server both
 ;; declare their intent in the initial handshake, and minimum of both
 ;; is used as the connection message size limit.

;; Verbose debug logger, disabled via the `#;` datum comment. When enabled
;; it formats ARGS with sprintf and prints the result with a "DEBUG: " prefix.
#; (define (debug . args)
  (let ((str (apply sprintf args)))
    (printf "DEBUG: ~a\n" str)))

 ;; Active debug logger: accepts any arguments and does nothing, so the
 ;; (debug ...) calls throughout the module are no-ops.
 (define (debug . args)
   (void))

 ;; Hello string the server writes once a connection is accepted; the
 ;; client reads exactly this many bytes and compares them before
 ;; creating its connection object.
 (define *header* "BOKBOKBOK")

 ;; Read one length-prefixed packet from input port I.
 ;;
 ;; A packet is a 3-byte big-endian length followed by that many bytes
 ;; of payload.  Returns the payload as a string, or the EOF object if
 ;; the port ends inside the length prefix or before the full payload
 ;; has been read.
 (define (read-packet! i)
   (let read-length ((bytes-left 3)
                     (len 0))
     (if (positive? bytes-left)
         ;; Still accumulating the length prefix, most significant byte first
         (let ((b (read-byte i)))
           (if (eof-object? b)
               b
               (read-length (- bytes-left 1)
                            (+ (* 256 len) b))))
         ;; Prefix complete: fetch the payload
         (let ((payload (read-string len i)))
           (if (= (string-length payload) len)
               payload
               ;; Short read: report it as EOF
               #!eof)))))

 ;; Write string P to output port O as a length-prefixed packet, without
 ;; flushing the port.
 ;;
 ;; The length is encoded as 3 big-endian bytes, so the largest payload
 ;; that can be represented is 16777215 (2^24 - 1) bytes.  Anything
 ;; longer signals an error *before* any byte is written; a payload of
 ;; exactly 2^24 bytes would otherwise be sent with a length prefix of
 ;; zero and corrupt the stream.
 (define (write-packet-no-flush! o p)
   (let ((len (string-length p)))
     (when (> len 16777215)
           (error "Cannot send oversized packet, maximum is 16777215 bytes" len))

     ;; len < 2^24 here, so the top byte needs no further reduction
     (write-byte (quotient len 65536) o)
     (write-byte (remainder (quotient len 256) 256) o)
     (write-byte (remainder len 256) o)
     (write-string p #f o)))

 ;; Write string P to output port O as a length-prefixed packet (see
 ;; write-packet-no-flush!) and then flush O so the packet is pushed
 ;; out of the port's buffer.
 (define write-packet!
   (lambda (o p)
     (write-packet-no-flush! o p)
     (debug "Flushing packets")
     (flush-output o)))

 ;; Serialise PS, a list of strings, into a single packet string.
 ;;
 ;; Layout: one byte holding the number of fields, then each field
 ;; written as a length-prefixed sub-packet.  Signals an error when PS
 ;; has more than 255 elements, since the count must fit in one byte.
 (define (join-packet ps)
   (let ((field-count (length ps)))
     (when (> field-count 255)
           (error "Can't fit more than 255 fields into a packet"))
     (call-with-output-string
      (lambda (out)
        (write-byte field-count out)
        (for-each
         (lambda (field)
           (write-packet-no-flush! out field))
         ps)))))

 ;; Split packet string P (as produced by join-packet) back into its
 ;; list of field strings.
 ;;
 ;; Returns the EOF object when P is itself the EOF object, when P is
 ;; empty (no field-count byte), or when P ends before all announced
 ;; fields have been read.  Previously a truncated packet was not
 ;; reported: the EOF check on the field count had no effect and an EOF
 ;; field was fed back into the loop as the accumulator, which then
 ;; raised a type error instead of returning EOF.
 (define (split-packet p)
   (if (eof-object? p)
       p
       (with-input-from-string p
         (lambda ()
           (let ((plen (read-byte)))
             (if (eof-object? plen)
                 ;; No field count at all
                 plen
                 (let loop
                     ((todo plen)
                      (result '()))
                   (if (zero? todo)
                       (reverse! result)
                       (let ((field (read-packet! (current-input-port))))
                         (if (eof-object? field)
                             ;; Truncated field: give up immediately
                             field
                             (loop (- todo 1)
                                   (cons field result))))))))))))

 ;; Derive a key pair-of-procedures from the passphrase KEY-STRING.
 ;;
 ;; The passphrase is run through `hash` and the first
 ;; symmetric-box-keybytes characters of the result are used as the raw
 ;; key.  Returns a pair (box-proc . unbox-proc) in the form expected by
 ;; encrypt and decrypt.
 ;; NOTE(review): `hash` is presumably tweetnacl's string hash rather
 ;; than srfi-69's numeric one -- confirm against the import order.
 (define (passphrase->key key-string)
   (let ((key (string->blob
               (substring/shared (hash key-string)
                                 0
                                 symmetric-box-keybytes))))
     (cons (symmetric-box key)
           (symmetric-unbox key))))

 ;; Build a key pair-of-procedures (box-proc . unbox-proc) from the raw
 ;; key string KEY-BYTES.
 ;;
 ;; Returns #f when KEY-BYTES is not a string of exactly
 ;; symmetric-box-keybytes characters.  The server handshake tests the
 ;; result of this procedure to reject invalid session keys; without
 ;; the length check that test could never fail and a peer-supplied key
 ;; of the wrong size was handed straight to the cipher.
 (define (bytes->key key-bytes)
   (and (string? key-bytes)
        (= (string-length key-bytes) symmetric-box-keybytes)
        (let ((key (string->blob key-bytes)))
          (cons
           (symmetric-box key)
           (symmetric-unbox key)))))

 ;; (get-random-bytes LENGTH) => u8vector of LENGTH random bytes.
 ;;
 ;; The generator is created once, when this definition is evaluated: a
 ;; MOA random source whose entropy source is the urandom device, and
 ;; which is randomized from that device.  Calls are serialised by a
 ;; private mutex so that concurrent threads do not use the shared
 ;; source at the same time.
 (define get-random-bytes
   (let* ((urandom (make-entropy-source-urandom-device))
          (source (make-random-source-moa))
          (fresh-u8vector (random-source-make-u8vectors source))
          (lock (make-mutex 'rpc-random-source)))
     (random-source-entropy-source-set! source urandom)
     (random-source-randomize! source urandom)
     (lambda (length)
       (mutex-lock! lock)
       (let ((bytes (fresh-u8vector length)))
         (mutex-unlock! lock)
         bytes))))

 ;; Create a fresh random session key.
 ;;
 ;; Returns two values:
 ;;  1. the key as a pair (box-proc . unbox-proc), ready for
 ;;     encrypt/decrypt;
 ;;  2. the same key as a raw string of symmetric-box-keybytes bytes,
 ;;     which is what gets sent to the peer during the handshake.
 (define (generate-session-key)
   (let ((key (u8vector->blob/shared
               (get-random-bytes symmetric-box-keybytes))))
     (values
      (cons (symmetric-box key)
            (symmetric-unbox key))
      (blob->string key))))

 ;; Process-wide nonce state used by encrypt.
 ;; *nonce-counter* is incremented once per encrypt call.
 (define *nonce-counter* 0)
 ;; *nonce-seed* is symmetric-box-noncebytes random bytes, generated once
 ;; when the module is loaded and kept as a string; encrypt hashes it
 ;; together with the counter to derive each nonce.
 (define *nonce-seed* (blob->string
                       (u8vector->blob/shared
                        (get-random-bytes symmetric-box-noncebytes))))

 ;; Encrypt the string PLAINTEXT with KEY, a (box-proc . unbox-proc)
 ;; pair.
 ;;
 ;; A new nonce is derived for every call by bumping *nonce-counter*
 ;; and hashing its decimal representation concatenated with
 ;; *nonce-seed*; the first symmetric-box-noncebytes characters of the
 ;; hash are the nonce.  The result is the nonce followed by the boxed
 ;; data, which is the layout decrypt expects.
 (define (encrypt key plaintext)
   (set! *nonce-counter* (+ *nonce-counter* 1))
   (let* ((nonce-input (string-append (number->string *nonce-counter*)
                                      *nonce-seed*))
          (nonce (substring/shared (hash nonce-input)
                                   0
                                   symmetric-box-noncebytes))
          (seal (car key))
          (sealed (seal plaintext
                        (blob->u8vector/shared (string->blob nonce)))))
     (string-append nonce sealed)))

 ;; Decrypt C, a string produced by encrypt, with KEY, a
 ;; (box-proc . unbox-proc) pair.
 ;;
 ;; C must start with symmetric-box-noncebytes characters of nonce (an
 ;; assertion fails otherwise); the remainder is passed to the unbox
 ;; procedure.  Returns the plaintext, or signals "Invalid cyphertext"
 ;; when the unbox procedure returns #f.
 (define (decrypt key c)
   (assert (>= (string-length c) symmetric-box-noncebytes))
   (let* ((nonce-u8vector
           (blob->u8vector/shared
            (string->blob
             (substring/shared c 0 symmetric-box-noncebytes))))
          (open (cdr key))
          (opened (open (substring/shared c symmetric-box-noncebytes)
                        nonce-u8vector)))
     (or opened
         (error "Invalid cyphertext"))))

 ;; State of one bokbok connection (client or server side).
 ;; Instances are built by make-connection, which also starts the
 ;; reader thread; make-connection* is the raw record constructor.
 (define-record-type connection
   (make-connection* addr user
                     socket input output
                     key
                     mutex
                     thread
                     waiters
                     request-handler
                     close-handler
                     counter)
   connection?
   ;; Peer/target address, e.g. (unix PATH) or (tcp HOST PORT)
   (addr connection-addr)
   ;; User name used in the encrypted handshake, or #f for plaintext
   (user connection-user)
   ;; Underlying socket plus the port pair obtained from it
   (socket connection-socket)
   (input connection-input)
   (output connection-output)
   ;; Session key as a (box-proc . unbox-proc) pair, or #f for plaintext
   (key connection-key)
   ;; Guards writes to the output port and access to waiters/counter
   (mutex connection-mutex)
   ;; Thread running handle-connection-thread! for this connection
   (thread connection-thread)
   ;; Hash table: request id string -> (mutex . response) waiter pair
   (waiters connection-waiters)
   ;; Called as (request-handler con body) for each incoming request
   (request-handler connection-request-handler)
   ;; Called as (close-handler con) when the reader thread sees EOF
   (close-handler connection-close-handler)
   ;; Next request id to hand out; starts at 0 and is bumped by request!
   (counter connection-counter (setter connection-counter)))

 ;; Send PACKET-PARTS (a list of strings) over CON as a single packet.
 ;;
 ;; The parts are joined with join-packet and, when the connection has
 ;; a session key, encrypted.  The write and flush happen while holding
 ;; the connection mutex so that packets from different threads are not
 ;; interleaved on the wire.
 ;;
 ;; The mutex is released through dynamic-wind, so it is unlocked even
 ;; when the write raises (for example because the peer has gone away).
 ;; Before, such an error left the mutex locked and every later
 ;; request!, response or send on this connection blocked forever.
 (define (connection-send! con packet-parts)
   (let ((packet (if (connection-key con)
                     (encrypt (connection-key con) (join-packet packet-parts))
                     (join-packet packet-parts)))
         (mutex (connection-mutex con)))
     (dynamic-wind
      (lambda ()
        (debug "MUTEX: send")
        (mutex-lock! mutex))
      (lambda ()
        (write-packet! (connection-output con) packet))
      (lambda ()
        (debug "MUTEX: !send")
        (mutex-unlock! mutex)))))

 ;; Serve one incoming request on CON in a thread of its own.
 ;;
 ;; ID is the request id string and BODY the list of request fields.
 ;; The connection's request handler is called as (handler con body)
 ;; and its result is appended to ("ok" ID) to form the response.  If
 ;; the handler raises, the response is ("err" ID MESSAGE) instead,
 ;; where MESSAGE describes the condition.  Either way the response is
 ;; sent back with connection-send!.  Returns the result of
 ;; thread-start! on the worker thread.
 (define (handle-request! con id body)
   (debug "Handling request id:~a ~s" id body)
   (let* ((describe-failure
           ;; Turn whatever was raised into the error string of the reply
           (lambda (raised)
             (if (condition? raised)
                 (sprintf
                  "~s in ~s"
                  ((condition-property-accessor 'exn 'message "Unknown error") raised)
                  (cons ((condition-property-accessor 'exn 'location (void)) raised)
                        ((condition-property-accessor 'exn 'arguments '()) raised)))
                 (->string raised))))
          (serve
           ;; Thunk run by the worker thread
           (lambda ()
             (let ((response
                    (handle-exceptions
                     raised
                     (list "err" id (describe-failure raised))
                     (append
                      (list "ok" id)
                      ((connection-request-handler con) con body)))))
               (debug "Sending response id:~a ~s" id response)
               (connection-send! con response)))))
     (thread-start!
      (make-thread
       serve
       `(bokbok-request-thread ,(connection-addr con) ,id)))))

 ;; Deliver a response received on CON to the request! call waiting for
 ;; it.
 ;;
 ;; ID is the request id string taken from the packet; BODY is either
 ;; (ok . fields) or (error . message).  The waiter registered under ID
 ;; is looked up *and removed* from the waiters table while holding the
 ;; connection mutex; the response is then stored in the waiter's cdr
 ;; and its mutex unlocked, which wakes the thread blocked in request!.
 ;;
 ;; Removing the entry matters: it was previously left in the table, so
 ;; the table grew by one entry per request for the whole life of the
 ;; connection.  request! keeps its own reference to the waiter pair and
 ;; does not need the table entry after the wake-up.  Responses with an
 ;; unknown (or already answered) id are discarded.
 (define (handle-response! con id body)
   (debug "Handling response id:~a ~s" id body)
   (debug "MUTEX: handle-response")
   (mutex-lock! (connection-mutex con))
   (let* ((waiters (connection-waiters con))
          (waiter (hash-table-ref/default waiters id #f)))
     (when waiter
           (hash-table-delete! waiters id))
     (debug "MUTEX: !handle-response")
     (mutex-unlock! (connection-mutex con))
     (if waiter
         (begin
           (set-cdr! waiter body)
           (mutex-unlock! (car waiter)))
         (debug "Discarding response to unknown request id:~a" id))))

 ;; Body of a connection's reader thread.
 ;;
 ;; The connection is taken from the current thread's thread-specific
 ;; slot (set by make-connection).  The loop reads packets from the
 ;; connection's input port, decrypts them when a session key is set,
 ;; splits them into fields and dispatches on the first field:
 ;;   ("req" id . body)  -> handle-request!
 ;;   ("ok" id . body)   -> handle-response! with (ok . body)
 ;;   ("err" id message) -> handle-response! with (error . message)
 ;; On EOF the connection's close handler is called and the loop ends.
 ;;
 ;; Packets of any other shape (unknown type, wrong field count, or a
 ;; truncated packet that split-packet reports as EOF) are discarded.
 ;; Without the fallback clause such a packet raised a match error that
 ;; killed this thread, so the close handler never ran and pending
 ;; request! calls were never woken by later responses.
 (define (handle-connection-thread!)
   (let* ((con (thread-specific (current-thread)))
          (session-key (connection-key con)))
     (debug "Session thread starting")
     (let loop ()
       (debug "Session thread waiting for packet")
       ;; We are the only thing that reads from connection-input, so need no mutex!
       (let ((raw-request (read-packet! (connection-input con))))
         (debug "Session thread got ~s" raw-request)
         (if (eof-object? raw-request)
             ;; Terminate loop
             ((connection-close-handler con) con)
             ;; Handle request and loop
             (let* ((request-bytes (if session-key
                                       (decrypt session-key raw-request)
                                       raw-request))
                    (request (split-packet request-bytes)))
               (match
                request
                (("req" id . body)
                 (handle-request! con id body))
                (("ok" id . body)
                 (handle-response! con id (cons 'ok body)))
                (("err" id error-string)
                 (handle-response! con id (cons 'error error-string)))
                (else
                 (debug "Discarding malformed packet ~s" request)))
               ;; Loop for next request
               (loop)))))))

 ;; Create a connection record for an already-established socket and
 ;; start its reader thread.
 ;;
 ;; ADDR and USER describe the peer, SOCKET/INPUT/OUTPUT are the socket
 ;; and its port pair, KEY is the session key pair or #f, and the two
 ;; handlers are stored for use by the reader thread.  The record gets
 ;; a fresh mutex, an empty waiters table and a request counter of 0.
 ;; The reader thread finds the connection through its thread-specific
 ;; slot, which is set before the thread is started.  Returns the
 ;; connection.
 (define (make-connection addr user socket input output key request-handler close-handler)
   (let* ((lock (make-mutex `(bokbok-connection-mutex ,addr)))
          (reader (make-thread handle-connection-thread!
                               `(bokbok-connection-thread ,addr)))
          (con (make-connection*
                addr user
                socket input output
                key
                lock
                reader
                (make-hash-table)
                request-handler
                close-handler
                0)))
     (thread-specific-set! reader con)
     (thread-start! reader)
     con))

 ;; Connect to a bokbok server and return a connection.
 ;;
 ;; ADDR is (unix PATH) or (tcp HOST PORT); any other form signals an
 ;; error, as does a TCP host for which no IPv4 address is found.
 ;; When both USERNAME and KEY are given the connection is encrypted:
 ;; a fresh session key is generated and sent, encrypted under KEY,
 ;; after the user name.  Otherwise the connection is plaintext.  In
 ;; both cases the server must answer with *header*; if it does not,
 ;; the ports are closed (so the socket is not leaked) and an error is
 ;; signalled.
 ;;
 ;; REQUEST-HANDLER and CLOSE-HANDLER are stored in the connection and
 ;; used by its reader thread.
 (define (open-connection addr username key request-handler close-handler)
   (let ((s
          (match
           addr
           (('unix path)
            ;; UNIX domain
            (let ((s (socket af/unix sock/stream)))
              (socket-connect s (unix-address path))
              s))
           (('tcp host port)
            ;; TCP domain: resolve first so a failed lookup does not
            ;; leave an unused socket behind
            (let ((ai (filter
                       (lambda (ai)
                         (eq? (addrinfo-family ai) af/inet))
                       (address-information host port))))
              (when (null? ai)
                    (error "No IPv4 address found for host" host))
              (let ((s (socket af/inet sock/stream)))
                (socket-connect s (addrinfo-address (car ai)))
                (set! (tcp-no-delay? s) #t)
                s)))
           (else (error "Unknown address" addr)))))
     (receive
      (input output)
      (parameterize
       ((socket-send-buffer-size 4096)
        (socket-send-size 16384))
       (socket-i/o-ports s))
      (let ((await-header
             ;; Wait for the server hello, then build the connection
             (lambda (user session-key)
               (debug "Client waiting for header")
               (let ((header (read-string (string-length *header*) input)))
                 (debug "Got header bytes: ~s" header)
                 (if (and (string? header)
                          (string=? header *header*))
                     (make-connection addr user s input output session-key request-handler close-handler)
                     (begin
                       ;; Handshake failed: release the socket's ports
                       (close-output-port output)
                       (close-input-port input)
                       (error "Invalid hello from server" header)))))))
        (if (and username key)
            ;; Encrypted connection
            (receive
             (session-key session-key-bytes)
             (generate-session-key)
             (debug "Client writing username and session key")
             (write-packet-no-flush! output username)
             (write-packet! output (encrypt key session-key-bytes))
             (await-header username session-key))
            ;; Plaintext connection
            (await-header #f #f))))))

 ;; Shut down CON: terminate its reader thread, then close its output
 ;; port followed by its input port.
 (define (close-connection! con)
   ;; FIXME: Set a flag in the connection so we quietly discard any further responses from still-running handlers.
   ;; FIXME: Return an error to all pending waiters.
   (let ((reader (connection-thread con))
         (out (connection-output con))
         (in (connection-input con)))
     (thread-terminate! reader)
     (close-output-port out)
     (close-input-port in)))

 ;; Send a request over CON and block until its response arrives.
 ;;
 ;; PACKET-PARTS is the list of strings forming the request body.  A
 ;; request id is taken from the connection counter, a waiter pair
 ;; (mutex . response) is registered under that id, and ("req" ID .
 ;; PACKET-PARTS) is sent.  The calling thread then blocks on the
 ;; waiter mutex until the reader thread stores the response and
 ;; unlocks it.
 ;;
 ;; Returns the list of response fields for an "ok" response.  For an
 ;; "err" response a condition of kind bokbok-remote is signalled (see
 ;; remote-error? and remote-error-message).
 ;;
 ;; The waiter mutex name now carries the actual request id; the
 ;; unquote was missing, so every such mutex was named with the literal
 ;; symbol `id`.
 (define (request! con packet-parts)
   (debug "MUTEX: request!")
   (mutex-lock! (connection-mutex con))
   (let* ((id (number->string (connection-counter con)))
          (waiter (cons (make-mutex `(bokbok-request-mutex ,(connection-addr con) ,id))
                        #f)))

     ;; Mutex starts life locked by the connection thread
     (mutex-lock! (car waiter)
                  #f
                  (connection-thread con))

     (hash-table-set! (connection-waiters con) id waiter)
     (set! (connection-counter con)
           (+ (connection-counter con) 1))
     (debug "MUTEX: !request!")
     (mutex-unlock! (connection-mutex con))

     (connection-send! con (cons "req" (cons id packet-parts)))

     ;; Wait for response, when connection thread unlocks the mutex
     (mutex-lock! (car waiter))

     ;; Return response
     (match
      (cdr waiter)
      (('ok . body)
       body)
      (('error . error-string)
       (signal (make-property-condition 'bokbok-remote 'message error-string)))
      (else (error "Invalid response" (cdr waiter))))))

 ;; Predicate for conditions of kind bokbok-remote, which request!
 ;; signals when the peer answers a request with an "err" response.
 (define remote-error?
   (condition-predicate 'bokbok-remote))

 ;; Accessor for the 'message property of a bokbok-remote condition,
 ;; i.e. the error string sent back by the peer.
 (define remote-error-message
   (condition-property-accessor 'bokbok-remote 'message))
 
 ;; Start a bokbok server and return its listener thread.
 ;;
 ;; BIND-ADDR is (unix PATH) or (tcp ADDR PORT); BACKLOG is passed to
 ;; socket-listen.  USER->KEY is either #f (plaintext connections) or a
 ;; procedure mapping a user name to that user's key pair, or #f for an
 ;; unknown user.  OPEN-HANDLER is called with each new connection;
 ;; REQUEST-HANDLER and CLOSE-HANDLER are stored in the connection.
 ;;
 ;; The listener thread accepts clients forever and runs each handshake
 ;; in a thread of its own.  The listening socket is kept in the
 ;; listener thread's thread-specific slot so that
 ;; wait-until-server-stopped can close it.
 ;;
 ;; Fixes relative to the previous version:
 ;;  - SO_REUSEADDR is set before bind; set afterwards it had no effect
 ;;    on the bind that had already happened.
 ;;  - A session key that fails to decrypt (e.g. wrong passphrase) now
 ;;    takes the "invalid session key" rejection path and closes the
 ;;    ports, instead of raising in the handshake thread and leaving
 ;;    the client socket open.
 ;;  - The unknown-address error no longer contains a literal "~s".
 (define (start-server bind-addr backlog user->key open-handler request-handler close-handler)
   (let* ((s (match
              bind-addr
              (('unix path)
               ;; UNIX domain
               (let ((s (socket af/unix sock/stream)))
                 (socket-bind s (unix-address path))
                 (socket-listen s backlog)
                 s))
              (('tcp addr port)
               ;; TCP domain
               (let ((s (socket af/inet sock/stream)))
                 ;; Must precede socket-bind to have any effect
                 (set! (so-reuse-address? s) #t)
                 (socket-bind s (inet-address addr port))
                 (socket-listen s backlog)
                 s))
              (else (error "Unknown bind address" bind-addr))))
          (tcp? (match bind-addr (('tcp . any) #t) (else #f)))
          ;; Refuse a client: close both ports of its socket
          (reject!
           (lambda (reason input output)
             (debug "Rejecting connection due to ~a" reason)
             (close-output-port output)
             (close-input-port input)))
          ;; Accept a client: send the hello header, then hand over to
          ;; the connection thread created by make-connection
          (accept!
           (lambda (peer-addr user-name cs input output session-key)
             (debug "Writing header")
             (write-string *header* #f output)
             (flush-output output)
             (open-handler
              (make-connection
               peer-addr user-name
               cs input output
               session-key
               request-handler close-handler))))
          ;; Connection setup for one accepted client socket CS;
          ;; returns CS as the handshake thread's result
          (handshake!
           (lambda (cs)
             (when tcp?
                   (set! (tcp-no-delay? cs) #t))
             (receive
              (input output)
              (parameterize
               ((socket-send-buffer-size 4096)
                (socket-send-size 16384))
               (socket-i/o-ports cs))
              (let* ((peer (socket-peer-name cs))
                     (peer-addr (if tcp?
                                    (list 'tcp (sockaddr-address peer) (sockaddr-port peer))
                                    (list 'unix (sockaddr-path peer)))))
                (debug "Handshake thread started for ~s" peer-addr)
                (if (not user->key)
                    ;; Plaintext connection
                    (accept! peer-addr #f cs input output #f)
                    ;; Encrypted connection
                    (let* ((user-name (read-packet! input))
                           (encrypted-session-key (read-packet! input)))
                      (if (or (eof-object? user-name)
                              (eof-object? encrypted-session-key))
                          (reject! "EOF in session setup" input output)
                          (let ((user-key (user->key user-name)))
                            (if (not user-key)
                                (reject! "unknown user" input output)
                                (let ((session-key
                                       ;; Any failure to recover the key
                                       ;; counts as an invalid key
                                       (handle-exceptions
                                        exn
                                        #f
                                        (bytes->key
                                         (decrypt user-key encrypted-session-key)))))
                                  (if session-key
                                      (accept! peer-addr user-name cs input output session-key)
                                      (reject! "invalid session key" input output))))))))))
             cs))
          (thread
           (make-thread
            (lambda ()
              (let loop ()
                (debug "Listener thread calling accept")
                (let* ((cs (socket-accept s))
                       (handler-thread
                        (make-thread
                         (lambda () (handshake! cs))
                         `(bokbok-handshake-handler ,bind-addr))))
                  (debug "Listener thread starting handshake thread")
                  (thread-start! handler-thread)
                  (loop))))
            `(bokbok-listen-handler ,bind-addr))))
     (thread-specific-set! thread s)
     (thread-start! thread)
     thread))

 ;; Stop the server whose listener thread is SERVER (the value returned
 ;; by start-server) by terminating that thread.  The listening socket
 ;; itself is closed by wait-until-server-stopped.
 (define stop-server!
   (lambda (server)
     (thread-terminate! server)))

 ;; Block until the listener thread SERVER has ended, then close the
 ;; listening socket stored in its thread-specific slot.
 ;;
 ;; Joining a thread that was ended with thread-terminate! raises an
 ;; uncaught-exception condition whose reason is a
 ;; terminated-thread-exception; that case is the expected outcome of
 ;; stop-server! and is swallowed.  Any other exception from the join
 ;; is re-raised, in which case the socket is not closed here.
 (define (wait-until-server-stopped server)
   (let ((listen-socket (thread-specific server)))
     (handle-exceptions
      exc
      (unless (and (uncaught-exception? exc)
                   (terminated-thread-exception? (uncaught-exception-reason exc)))
        ;; unexpected exceptions are rethrown
        (abort exc))
      (debug "Waiting for listener thread death...")
      (thread-join! server)
      (void))
     ;; The final parenthesis below closes the enclosing (module ...) form
     (socket-close listen-socket))))