(module
bokbok
(passphrase->key
open-connection
close-connection!
request!
remote-error?
remote-error-message
connection?
connection-user
connection-addr
start-server
stop-server!
wait-until-server-stopped
)
(import chicken scheme)
(use extras)
(use data-structures)
(use ports)
(use srfi-1)
(use srfi-4)
(use srfi-13)
(use srfi-18)
(use srfi-69)
(use matchable)
(use tweetnacl)
(use socket)
(use srfi-27)
(use moa) ;; from srfi-27
(use entropy-unix) ;; from srfi-27
;; FIXME: Log handler exceptions - and call (get-call-chain) for details
;; FIXME: Try-again-later responses to requests, caused by a
;; configurable concurrent request limit or a special condition
;; type. CLient sleeps and retries (sleep time is listed in the
;; try-again-later response).
;; FIXME: TCP connection handshake to include option to redirect to
;; another server IP:port, controlled by the open handlers
;; open-connection gets all A records and tries until it gets in
;; FIXME: Accept a hostname for open-connection, and look it up
;; FIXME: Support sending creds over a unix-domain socket instead of a username.
;; FIXME: User data field in a connection
;; FIXME: Reduce timeouts to detect gone servers quickly
;; FIXME: Support IPv6
;; FIXME: Is the 60s receive timeout OK? Connections can't linger. Is
;; that a feature?
;; FIXME: Auto reconnect on request!
;; FIXME: Configurable maximum message size: client and server both
;; declare their intent in the initial handshake, and minimum of both
;; is used as the connection message size limit.
#; (define (debug . args)
(let ((str (apply sprintf args)))
(printf "DEBUG: ~a\n" str)))
(define (debug . args)
(void))
(define *header* "BOKBOKBOK")
(define (read-packet! i)
(let ((l1 (read-byte i)))
(if (eof-object? l1)
l1
(let ((l2 (read-byte i)))
(if (eof-object? l2)
l2
(let ((l3 (read-byte i)))
(if (eof-object? l3)
l3
(let ((len (+ (* 65536 l1) (* 256 l2) l3)))
(let ((p (read-string len i)))
(if (= (string-length p) len)
p
(with-input-from-string "" read)))))))))))
(define (write-packet-no-flush! o p)
(let* ((len (string-length p))
(l1 (remainder (quotient len 65536) 256))
(l2 (remainder (quotient len 256) 256))
(l3 (remainder len 256)))
(when (> len 16777216)
(error "Cannot send oversized packet, maximum is 16777216 bytes" len))
(write-byte l1 o)
(write-byte l2 o)
(write-byte l3 o)
(write-string p #f o)))
(define (write-packet! o p)
(write-packet-no-flush! o p)
(debug "Flushing packets")
(flush-output o))
;; Join a list of strings into a packet
(define (join-packet ps)
(let ((plen (length ps)))
(when (> plen 255)
(error "Can't fit more than 255 fields into a packet"))
(with-output-to-string
(lambda ()
(write-byte plen)
(for-each
(lambda (p)
(write-packet-no-flush! (current-output-port) p))
ps)))))
;; Split a packet into a list of strings
(define (split-packet p)
(if (eof-object? p)
p
(with-input-from-string p
(lambda ()
(let ((plen (read-byte)))
(if (eof-object? plen)
plen)
(let loop
((todo plen)
(result '()))
(if (zero? todo)
(reverse! result)
(loop
(- todo 1)
(let ((p (read-packet! (current-input-port))))
(if (eof-object? p)
p
(cons p
result)))))))))))
(define (passphrase->key key-string)
(let* ((key-hash (hash key-string))
(key (string->blob (substring/shared key-hash 0 symmetric-box-keybytes))))
(cons
(symmetric-box key)
(symmetric-unbox key))))
(define (bytes->key key-bytes)
(let ((key (string->blob key-bytes)))
(cons
(symmetric-box key)
(symmetric-unbox key))))
(define get-random-bytes
(let* ((entropy (make-entropy-source-urandom-device))
(random (make-random-source-moa))
(make-random-u8vector (random-source-make-u8vectors random))
(mutex (make-mutex 'rpc-random-source)))
(random-source-entropy-source-set! random entropy)
(random-source-randomize! random entropy)
(lambda (length)
(mutex-lock! mutex)
(let ((rb (make-random-u8vector length)))
(mutex-unlock! mutex)
rb))))
(define (generate-session-key)
(let* ((key-u8vector (get-random-bytes symmetric-box-keybytes))
(key (u8vector->blob/shared key-u8vector)))
(values
(cons
(symmetric-box key)
(symmetric-unbox key))
(blob->string key))))
(define *nonce-counter* 0)
(define *nonce-seed* (blob->string
(u8vector->blob/shared
(get-random-bytes symmetric-box-noncebytes))))
(define (encrypt key plaintext)
(set! *nonce-counter* (+ *nonce-counter* 1))
(let* ((nonce-hash (hash (string-append (number->string *nonce-counter*)
*nonce-seed*)))
(nonce (substring/shared nonce-hash 0 symmetric-box-noncebytes))
(nonce-u8vector (blob->u8vector/shared (string->blob nonce))))
(string-append
nonce
((car key) plaintext nonce-u8vector))))
(define (decrypt key c)
(assert (>= (string-length c) symmetric-box-noncebytes))
(let* ((nonce (substring/shared c 0 symmetric-box-noncebytes))
(c-wo-n (substring/shared c symmetric-box-noncebytes))
(nonce-u8vector (blob->u8vector/shared (string->blob nonce)))
(plaintext ((cdr key) c-wo-n nonce-u8vector)))
(if plaintext
plaintext
(error "Invalid cyphertext"))))
(define-record-type connection
(make-connection* addr user
socket input output
key
mutex
thread
waiters
request-handler
close-handler
counter)
connection?
(addr connection-addr)
(user connection-user)
(socket connection-socket)
(input connection-input)
(output connection-output)
(key connection-key)
(mutex connection-mutex)
(thread connection-thread)
(waiters connection-waiters)
(request-handler connection-request-handler)
(close-handler connection-close-handler)
(counter connection-counter (setter connection-counter)))
(define (connection-send! con packet-parts)
(let ((packet (if (connection-key con)
(encrypt (connection-key con) (join-packet packet-parts))
(join-packet packet-parts))))
(debug "MUTEX: send")
(mutex-lock! (connection-mutex con))
(write-packet! (connection-output con) packet)
(debug "MUTEX: !send")
(mutex-unlock! (connection-mutex con))))
(define (handle-request! con id body)
(debug "Handling request id:~a ~s" id body)
(let ((thread
(make-thread
(lambda ()
(let ((response
(handle-exceptions
error
(list "err"
id
(cond
((condition? error)
(sprintf
"~s in ~s"
((condition-property-accessor 'exn 'message "Unknown error") error)
(cons ((condition-property-accessor 'exn 'location (void)) error)
((condition-property-accessor 'exn 'arguments '()) error))))
(else (->string error))))
(append
(list "ok" id)
((connection-request-handler con)
con
body)))))
(debug "Sending response id:~a ~s" id response)
(connection-send! con response)))
`(bokbok-request-thread ,(connection-addr con) ,id))))
(thread-start! thread)))
(define (handle-response! con id body)
(debug "Handling response id:~a ~s" id body)
(debug "MUTEX: handle-response")
(mutex-lock! (connection-mutex con))
(let ((waiter (hash-table-ref/default (connection-waiters con)
id #f)))
(debug "MUTEX: !handle-response")
(mutex-unlock! (connection-mutex con))
(if waiter
(begin
(set-cdr! waiter body)
(mutex-unlock! (car waiter)))
(debug "Discarding response to unknown request id:~a" id))))
(define (handle-connection-thread!)
(let* ((con (thread-specific (current-thread)))
(session-key (connection-key con)))
(debug "Session thread starting")
(let loop ()
(debug "Session thread waiting for packet")
;; We are the only thing that reads from connection-input, so need no mutex!
(let ((raw-request (read-packet! (connection-input con))))
(debug "Session thread got ~s" raw-request)
(if (eof-object? raw-request)
;; Terminate loop
((connection-close-handler con) con)
;; Handle request and loop
(let* ((request-bytes (if session-key
(decrypt session-key raw-request)
raw-request))
(request (split-packet request-bytes)))
(match
request
(("req" id . body)
(handle-request! con id body))
(("ok" id . body)
(handle-response! con id (cons 'ok body)))
(("err" id error-string)
(handle-response! con id (cons 'error error-string))))
;; Loop for next request
(loop)))))))
(define (make-connection addr user socket input output key request-handler close-handler)
(let ((con
(make-connection*
addr user
socket input output
key
(make-mutex `(bokbok-connection-mutex ,addr))
(make-thread handle-connection-thread!
`(bokbok-connection-thread ,addr))
(make-hash-table)
request-handler
close-handler
0)))
(thread-specific-set!
(connection-thread con)
con)
(thread-start!
(connection-thread con))
con))
(define (open-connection addr username key request-handler close-handler)
(let ((s
(match
addr
(('unix path)
;; UNIX domain
(let ((s (socket af/unix sock/stream)))
(socket-connect s (unix-address path))
s))
(('tcp host port)
;; TCP domain
(let ((s (socket af/inet sock/stream))
(ai (filter
(lambda (ai)
(eq? (addrinfo-family ai) af/inet))
(address-information host port))))
(socket-connect s (addrinfo-address (car ai)))
(set! (tcp-no-delay? s) #t)
s)))))
(receive
(input output)
(parameterize
((socket-send-buffer-size 4096)
(socket-send-size 16384))
(socket-i/o-ports s))
(if (and username key)
(begin ;; Encrypted connection
(receive
(session-key session-key-bytes)
(generate-session-key)
(debug "Client writing username and session key")
(write-packet-no-flush! output username)
(write-packet! output (encrypt key session-key-bytes))
(debug "Client waiting for header")
(let ((header (read-string (string-length *header*) input)))
(debug "Got header bytes: ~s" header)
(if (string=? header *header*)
(make-connection addr username s input output session-key request-handler close-handler)
(error "Invalid hello from server" header)))))
(begin ;; Plaintext connection
(debug "Client waiting for header")
(let ((header (read-string (string-length *header*) input)))
(debug "Got header bytes: ~s" header)
(if (string=? header *header*)
(make-connection addr #f s input output #f request-handler close-handler)
(error "Invalid hello from server" header))))))))
(define (close-connection! con)
;; FIXME: Set a flag in the connection so we quietly discard any further responses from still-running handlers.
;; FIXME: Return an error to all pending waiters.
(thread-terminate! (connection-thread con))
(close-output-port (connection-output con))
(close-input-port (connection-input con)))
(define (request! con packet-parts)
(debug "MUTEX: request!")
(mutex-lock! (connection-mutex con))
(let* ((id (number->string (connection-counter con)))
(waiter (cons (make-mutex `(bokbok-request-mutex ,(connection-addr con) id))
#f)))
;; Mutex starts life locked by the connection thread
(mutex-lock! (car waiter)
#f
(connection-thread con))
(hash-table-set! (connection-waiters con) id waiter)
(set! (connection-counter con)
(+ (connection-counter con) 1))
(debug "MUTEX: !request!")
(mutex-unlock! (connection-mutex con))
(connection-send! con (cons "req" (cons id packet-parts)))
;; Wait for response, when connection thread unlocks the mutex
(mutex-lock! (car waiter))
;; Return response
(match
(cdr waiter)
(('ok . body)
body)
(('error . error-string)
(signal (make-property-condition 'bokbok-remote 'message error-string)))
(else (error "Invalid response" (cdr waiter))))))
(define remote-error?
(condition-predicate 'bokbok-remote))
(define remote-error-message
(condition-property-accessor 'bokbok-remote 'message))
(define (start-server bind-addr backlog user->key open-handler request-handler close-handler)
(let* ((s (match
bind-addr
(('unix path)
;; UNIX domain
(let ((s (socket af/unix sock/stream)))
(socket-bind s (unix-address path))
(socket-listen s backlog)
s))
(('tcp addr port)
;; TCP domain
(let ((s (socket af/inet sock/stream)))
(socket-bind s (inet-address addr port))
(set! (so-reuse-address? s) #t)
(socket-listen s backlog)
s))
(else (error "Unknown bind address ~s" bind-addr))))
(tcp? (match bind-addr (('tcp . any) #t) (else #f)))
(thread
;; FIXME: Refactor this monstrosity
(make-thread
(lambda ()
(let loop ()
(debug "Listener thread calling accept")
(let* ((cs
(socket-accept s))
(handler-thread
(make-thread
(lambda ()
;; Connection setup handler thread
;; Hands over to connection thread when make-connection is called
(when tcp?
(set! (tcp-no-delay? cs) #t))
(receive
(input output)
(parameterize
((socket-send-buffer-size 4096)
(socket-send-size 16384))
(socket-i/o-ports cs))
(let* ((peer (socket-peer-name cs))
(peer-addr (if tcp?
(list 'tcp (sockaddr-address peer) (sockaddr-port peer))
(list 'unix (sockaddr-path peer)))))
(debug "Handshake thread started for ~s" peer-addr)
(if user->key
;; Encrypted connection
(let* ((user-name (read-packet! input))
(encrypted-session-key (read-packet! input)))
(if (and (not (eof-object? user-name))
(not (eof-object? encrypted-session-key)))
(let ((user-key (user->key user-name)))
(if user-key
(let* ((session-key-bytes (decrypt user-key encrypted-session-key))
(session-key (bytes->key session-key-bytes)))
(if session-key
(begin
(debug "Writing header")
(write-string *header* #f output)
(flush-output output)
(open-handler
(make-connection
peer-addr user-name
cs input output
session-key
request-handler close-handler)))
(begin
(debug "Rejecting connection due to invalid session key")
(close-output-port output)
(close-input-port input))))
(begin
(debug "Rejecting connection due to unknown user")
(close-output-port output)
(close-input-port input))))
(begin
(debug "Rejecting connection due to EOF in session setup")
(close-output-port output)
(close-input-port input))))
;; Plaintext connection
(begin
(debug "Writing header")
(write-string *header* #f output)
(flush-output output)
(open-handler
(make-connection
peer-addr #f
cs input output
#f
request-handler close-handler))))))
cs)
`(bokbok-handshake-handler ,bind-addr))))
(debug "Listener thread starting handshake thread")
(thread-start! handler-thread)
(loop))))
`(bokbok-listen-handler ,bind-addr))))
(thread-specific-set! thread s)
(thread-start! thread)
thread))
(define (stop-server! server)
(thread-terminate! server))
(define (wait-until-server-stopped server)
(let ((s (thread-specific server)))
(handle-exceptions
exc
(if (and
(uncaught-exception? exc)
(terminated-thread-exception? (uncaught-exception-reason exc)))
(void)
(abort exc)) ; unexpected exceptions are rethrown
(debug "Waiting for listener thread death...")
(thread-join! server)
(void))
(socket-close s))))