Ugarit
Check-in [74f9480c6a]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:[f1f2ce8cdc] Started work on cluster backend (because I need it...)
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | alaricsp
Files: files | file ages | folders
SHA1: 74f9480c6ab197cdc8de46378806f5f42a1cdc61
User & Date: alaric 2015-07-31 21:37:27
Context
2015-09-26
10:30
fprintf had absorbed flush-output! Oops! check-in: 485a7f1e1a user: alaric tags: alaricsp
2015-07-31
21:37
[f1f2ce8cdc] Started work on cluster backend (because I need it...) check-in: 74f9480c6a user: alaric tags: alaricsp
21:33
Fixed "ugarit cat", and improved resilience of splitlog's reindex! admin command. check-in: eef24a12c0 user: alaric tags: alaricsp
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Added backend-cluster.scm.























































































































































































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
(use ugarit-backend)
(use sql-de-lite)
(use matchable)
(use miscmacros)
(use srfi-9)

;; Clustered storage backend, as per https://www.kitten-technologies.co.uk/project/ugarit/tktview?name=f1f2ce8cdc

;; SQL statements that create the on-disk schema. backend-cache executes
;; them in order when the database it opens has no schema yet.
;;
;;   shards   - one row per shard, holding its storage description and the
;;              trust / weight / priority settings loaded into shard records
;;   blocks   - assigns an integer key_id to each block key; keys are unique
;;   replicas - (key_id, shard_id) pairs
(define cache-sql-schema
  (list
   "CREATE TABLE shards (shard_id INTEGER PRIMARY KEY NOT NULL, storage TEXT NOT NULL, trust INTEGER NOT NULL, write_weight INTEGER NOT NULL, read_priority INTEGER NOT NULL, read_weight INTEGER NOT NULL);"
   ;; The uniqueness constraint is written as a table constraint,
   ;; UNIQUE ("key"); SQLite has no "UNIQUE INDEX (...)" clause inside
   ;; CREATE TABLE, and the column list must be closed before the ';'.
   "CREATE TABLE blocks (key_id INTEGER NOT NULL PRIMARY KEY, \"key\" TEXT NOT NULL, UNIQUE (\"key\"));"
   "CREATE TABLE replicas (key_id INTEGER NOT NULL, shard_id INTEGER NOT NULL, PRIMARY KEY (key_id, shard_id)) WITHOUT ROWID;"))

;; In-memory description of one cluster shard. backend-cache builds one
;; record per row of the `shards` table and stores them in a hash table
;; keyed by shard id.
;;
;; Fields declared with (setter ACCESSOR) are modified with
;; (set! (ACCESSOR shard) value), as connect! and disconnect! do for
;; shard-storage.
(define-record-type shard
  (make-shard id storage-desc storage trust write-weight read-priority read-weight)
  shard?
  ;; shard_id column; no setter is declared for this field
  (id shard-id)
  ;; `storage` column; connect! passes it to import-storage
  (storage-desc shard-storage-desc (setter shard-storage-desc))
  ;; Connected storage, or #f: created as #f, set by connect!, reset to #f by disconnect!
  (storage shard-storage (setter shard-storage))
  ;; The remaining fields are loaded from the `trust`, `write_weight`,
  ;; `read_priority` and `read_weight` columns respectively.
  ;; NOTE(review): nothing in this file reads them yet, so their exact
  ;; semantics cannot be confirmed from here.
  (trust shard-trust (setter shard-trust))
  (write-weight shard-write-weight (setter shard-write-weight))
  (read-priority shard-read-priority (setter shard-read-priority))
  (read-weight shard-read-weight (setter shard-read-weight)))

;; Build a storage record that wraps the storage `be` and keeps its own
;; bookkeeping in the SQLite database at `cachepath`.
;;
;;   cachepath - path of the SQLite file. Its mode is set to owner
;;               read/write, and cache-sql-schema is executed if the
;;               database has no schema yet.
;;   be        - storage record that every storage operation is delegated to.
;;
;; Returns the record produced by make-storage.
;;
;; A SQLite transaction is kept open for the lifetime of the storage:
;; flush! commits it and immediately begins a new one, and close! commits
;; it for the last time.
;;
;; FIXME: this is a half-converted copy of the cache backend. The helpers
;; cache-set!, cache-get, cache-delete! and cache-clear! (and the 'stats
;; admin command) still refer to cache-set-query, cache-get-query,
;; cache-delete-query and a `cache` table, none of which are defined in
;; this file, so put!, exists?, unlink!, 'stats and 'clear! will fail when
;; called. The shard machinery (*shards*, call-with-shard and the
;; block/replica queries) is not wired into the storage operations yet.
(define (backend-cache cachepath be)
   (define *db* (open-database cachepath))
   (change-file-mode cachepath (bitwise-ior perm/irusr perm/iwusr))
   (set-busy-handler! *db* (busy-timeout 100000))
   (when (null? (schema *db*))
         (for-each (lambda (statement)
                     (exec (sql *db* statement)))
                   cache-sql-schema))
   (exec (sql *db* "BEGIN;"))

   (define add-block-query (sql *db* "INSERT OR REPLACE INTO blocks (\"key\") VALUES (?);"))
   (define add-replica-query (sql *db* "INSERT OR REPLACE INTO replicas (key_id,shard_id) VALUES (?,?);"))
   ;; Shard ids holding a given key. Written as a plain join: the previous
   ;; form selected from an unaliased subquery and then referred to
   ;; blocks.key_id, a name that is not visible outside that subquery.
   (define replicas-get-query (sql *db* "SELECT replicas.shard_id FROM blocks INNER JOIN replicas ON blocks.key_id = replicas.key_id WHERE blocks.\"key\" = ?;"))
   (define delete-block-query (sql *db* "DELETE FROM blocks WHERE \"key\" = ?;"))
   (define delete-block-replicas-query (sql *db* "DELETE FROM replicas WHERE key_id IN (SELECT key_id FROM blocks WHERE \"key\" = ?);"))

   ;; Load list of shards: hash table from shard id to shard record.
   ;; The storage field starts as #f; connect! fills it in on demand.
   (define *shards*
     (alist->hash-table
      (map (lambda (r)
             (cons (first r)
                   (make-shard (first r)
                               (second r)
                               #f
                               (third r)
                               (fourth r)
                               (fifth r)
                               (sixth r))))
           (query fetch-all (sql *db* "SELECT shard_id, storage, trust, write_weight, read_priority, read_weight FROM shards;")))))

   ;; Import the shard's storage from its description and, when that
   ;; storage is writable, tag it as not being a complete vault.
   (define (connect! shard)
     ;; FIXME: Maybe retry once or twice?
     (set! (shard-storage shard)
           (import-storage (shard-storage-desc shard)))

     ;; storage-writable? is read as a plain value here, exactly as it is
     ;; when passed to make-storage and reported by the 'info command
     ;; below; it is not a procedure to be called.
     (when (storage-writable? (shard-storage shard))
           ((storage-set-tag! (shard-storage shard))
            "#ugarit-non-vault-storage"
            "(cluster-shard \"This is a cluster shard, and will only contain a fraction of the contents of the actual vault.\")")))

   ;; Close the shard's storage, if connected, and mark it disconnected.
   (define (disconnect! shard)
     (when (shard-storage shard)
           ((storage-close! (shard-storage shard)))
           (set! (shard-storage shard) #f)))

   ;; Maximum number of times call-with-shard runs its thunk before
   ;; re-raising the failure.
   ;; NOTE(review): 3 is a placeholder for "a few times" - confirm the
   ;; intended value.
   (define *attempts* 3)

   ;; Call (thunk shard) with the shard connected.
   ;; If the thunk raises, the shard is disconnected and the call is
   ;; retried with a fresh connection, up to *attempts* times in total;
   ;; the last exception is then re-raised. A failure inside connect!
   ;; itself is not retried, because it happens outside the handler.
   ;; NOTE(review): `raise` is provided by srfi-18 in CHICKEN 4 - confirm
   ;; it is in scope here, or use `signal` instead.
   (define (call-with-shard shard thunk)
     (let loop ((attempt 1))
       (unless (shard-storage shard)
               (connect! shard))
       (handle-exceptions
        exn
        (begin
          (disconnect! shard)
          (if (< attempt *attempts*)
              (loop (+ attempt 1))
              (raise exn)))
        (thunk shard))))

   ;; Statistics reported by close!
   (define *hits* 0)
   (define *misses* 0)
   (define *flushes* 0)

   (define commit-interval 1000)
   (define *updates-since-last-commit* 0)

   ;; Commit the open transaction and start a new one, but only when
   ;; something has been written since the last commit.
   (define (flush!)
     (when (> *updates-since-last-commit* 0)
      (inc! *flushes*)
      (exec (sql *db* "COMMIT;"))
      (exec (sql *db* "BEGIN;"))
      (set! *updates-since-last-commit* 0)))

   ;; Count one update; once more than commit-interval updates are
   ;; pending, flush the wrapped storage first and then the database.
   (define (maybe-flush!)
     (inc! *updates-since-last-commit*)
     (when (> *updates-since-last-commit* commit-interval)
           ((storage-flush! be))
           (flush!)))

   ;; FIXME: cache-set-query is not defined in this file.
   ;; Records `type` for `key` when type is not #f; always returns type.
   (define (cache-set! key type)
      (when type
            (begin
              (exec cache-set-query key (symbol->string type))
              (maybe-flush!)))
      type)

   ;; FIXME: cache-get-query is not defined in this file.
   ;; Returns the recorded type of `key` as a symbol, or #f.
   (define (cache-get key)
      (let ((result
             (query fetch cache-get-query key)))
        (if (pair? result)
            (string->symbol (car result))
            #f)))

   ;; FIXME: cache-delete-query is not defined in this file.
   (define (cache-delete! key)
     (exec cache-delete-query key)
     (maybe-flush!))

   ;; FIXME: cache-sql-schema creates no `cache` table.
   (define (cache-clear!)
     (exec (sql *db* "DELETE FROM cache")))

   (make-storage
      (storage-max-block-size be)
      (storage-writable? be)
      (storage-unlinkable? be)
      (lambda (key data type) ; put!
         (begin
            ((storage-put! be) key data type)
            (cache-set! key type)
            (void)))
      (lambda ()                        ; flush!
        (begin
          ((storage-flush! be))
          (flush!)
          (void)))
      (lambda (key) ; exists?
        (let ((cached-result (cache-get key)))
          (if cached-result
              (begin
                (inc! *hits*)
                cached-result)
              (begin
                (inc! *misses*)
                ;; cache-set! returns the type it was given, so this is
                ;; also the result of exists?
                (cache-set! key ((storage-exists? be) key))))))
      (lambda (key) ; get
         ((storage-get be) key))
      (lambda (key) ; link!
         ((storage-link! be) key))
      (lambda (key) ; unlink!
         (let ((result ((storage-unlink! be) key)))
            (if result
               (begin
                  (cache-delete! key)
                  result)
               result)))
      ;; Tag operations that modify state flush the wrapped storage and
      ;; then the local database straight away.
      (lambda (tag key) ; set-tag!
        ((storage-set-tag! be) tag key)
        ((storage-flush! be))
        (flush!))
      (lambda (tag) ; tag
         ((storage-tag be) tag))
      (lambda () ; all-tags
         ((storage-all-tags be)))
      (lambda (tag) ; remove-tag!
         ((storage-remove-tag! be) tag)
         ((storage-flush! be))
         (flush!))
      (lambda (tag) ; lock-tag!
         (let ((result ((storage-lock-tag! be) tag)))
           ((storage-flush! be))
           (flush!)
           result))
      (lambda (tag) ; tag-locked?
         ((storage-tag-locked? be) tag))
      (lambda (tag) ; unlock-tag!
         ((storage-unlock-tag! be) tag)
          ((storage-flush! be))
          (flush!))
      (lambda (command) ; admin!
        (match command
               (('info)
                (list (cons 'backend "cache")
                      (cons 'block-size (storage-max-block-size be))
                      (cons 'writable? (storage-writable? be))
                      (cons 'unlinkable? (storage-unlinkable? be))
                      (cons 'cache-file cachepath)
                      (cons 'commit-interval commit-interval)))
               (('help)
                (list (cons 'info "Return information about the archive")
                      (cons 'help "List available admin commands")
                      (cons 'stats "Examine the cache and report back statistics")
                      (cons 'clear! "Clear the cache")))
               (('stats)
                ;; FIXME: cache-sql-schema creates no `cache` table.
                (list (cons 'entries (car (query fetch (sql *db* "SELECT COUNT(*) FROM cache"))))))
               (('clear!)
                (cache-clear!)
                (flush!)
                (list (cons 'result "Done")))
               (else (error "Unknown admin command"))))
      (lambda () ; close!
        (begin
          ((backend-log!) 'info (sprintf "Cache hits: ~A misses: ~A flushes: ~A" *hits* *misses* *flushes*))
          ((storage-close! be))
          (exec (sql *db* "COMMIT;"))
          (close-database *db*)))))


;; Either a thunk that builds the storage, or #f when the command line is
;; not usable. Exactly two arguments are accepted: the path of the SQLite
;; file, and the command line of the backend to wrap. The wrapped backend
;; is only imported when the thunk is called.
(define backend
  (let ((args (command-line-arguments)))
    (if (and (pair? args)
             (pair? (cdr args))
             (null? (cddr args)))
        (let ((db-path (car args))
              (inner-spec (cadr args)))
          (lambda ()
            (backend-cache db-path (import-storage inner-spec))))
        (begin
          (export-storage-error! "Invalid arguments to backend-cache")
          (printf "USAGE:\nbackend-cache <path-to-cache-file> \"<backend command line>\"\n")
          #f))))

;; Export the storage only when argument parsing produced a thunk.
(when backend
  (export-storage! backend))

Changes to ugarit-api.scm.

465
466
467
468
469
470
471









472
473
474
475
476
477
478
                      *global-rules*
                      #f ; Unset version
                      #f ; Unset configuration alist
                      #f)) ; Not changed yet
              (conf-tag
               (vault-tag vault "#ugarit-vault-configuration")))










         (unless conf-tag
                 ;; Create default v1 tag
                 (receive (conf-key conf-reused?)
                          (store-sexpr! vault '(1) 'ugarit-vault-configuration '())

                          (vault-flush! vault)
                          (set! conf-tag (make-tag "#ugarit-vault-configuration" 'ugarit-vault-configuration conf-key))







>
>
>
>
>
>
>
>
>







465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
                      *global-rules*
                      #f ; Unset version
                      #f ; Unset configuration alist
                      #f)) ; Not changed yet
              (conf-tag
               (vault-tag vault "#ugarit-vault-configuration")))

         (let ((nvs-tag (vault-tag vault "#ugarit-non-vault-storage")))
          (when nvs-tag
                (signal (make-property-condition
                         'exn
                         'location 'open-vault
                         'message (sprintf 
                                   "The storage is not a vault: ~S" (tag-key nvs-tag))
                         'arguments (list (tag-type nvs-tag) (tag-key nvs-tag))))))
         
         (unless conf-tag
                 ;; Create default v1 tag
                 (receive (conf-key conf-reused?)
                          (store-sexpr! vault '(1) 'ugarit-vault-configuration '())

                          (vault-flush! vault)
                          (set! conf-tag (make-tag "#ugarit-vault-configuration" 'ugarit-vault-configuration conf-key))