Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | [f1f2ce8cdc] Started work on cluster backend (because I need it...) |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | alaricsp |
Files: | files | file ages | folders |
SHA1: |
74f9480c6ab197cdc8de46378806f5f4 |
User & Date: | alaric 2015-07-31 21:37:27 |
Context
2015-09-26
| ||
10:30 | fprintf had absorbed flush-output! Oops! check-in: 485a7f1e1a user: alaric tags: alaricsp | |
2015-07-31
| ||
21:37 | [f1f2ce8cdc] Started work on cluster backend (because I need it...) check-in: 74f9480c6a user: alaric tags: alaricsp | |
21:33 | Fixed "ugarit cat", and improved resilience of splitlog's reindex! admin command. check-in: eef24a12c0 user: alaric tags: alaricsp | |
Changes
Added backend-cluster.scm.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | (use ugarit-backend) (use sql-de-lite) (use matchable) (use miscmacros) (use srfi-9) ;; Clustered storage backend, as per https://www.kitten-technologies.co.uk/project/ugarit/tktview?name=f1f2ce8cdc (define cache-sql-schema (list "CREATE TABLE shards (shard_id INTEGER PRIMARY KEY NOT NULL, storage TEXT NOT NULL, trust INTEGER NOT NULL, write_weight INTEGER NOT NULL, read_priority INTEGER NOT NULL, read_weight INTEGER NOT NULL);" "CREATE TABLE blocks (key_id INTEGER NOT NULL PRIMARY KEY, \"key\" TEXT NOT NULL, UNIQUE INDEX (\"key\");" "CREATE TABLE replicas (key_id INTEGER NOT NULL, shard_id INTEGER NOT NULL, PRIMARY KEY (key_id, shard_id)) WITHOUT ROWID;")) (define-record-type shard (make-shard id storage-desc storage trust write-weight read-priority read-weight) shard? (id shard-id) (storage-desc shard-storage-desc (setter shard-storage-desc)) (storage shard-storage (setter shard-storage)) (trust shard-trust (setter shard-trust)) (write-weight shard-write-weight (setter shard-write-weight)) (read-priority shard-read-priority (setter shard-read-priority)) (read-weight shard-read-weight (setter shard-read-weight))) (define (backend-cache cachepath be) (define *db* (open-database cachepath)) (change-file-mode cachepath (bitwise-ior perm/irusr perm/iwusr)) (set-busy-handler! *db* (busy-timeout 100000)) (when (null? (schema *db*)) (for-each (lambda (statement) (exec (sql *db* statement))) cache-sql-schema)) (exec (sql *db* "BEGIN;")) (define add-block-query (sql *db* "INSERT OR REPLACE INTO blocks (\"key\") VALUES (?);")) (define add-replica-query (sql *db* "INSERT OR REPLACE INTO replicas (key_id,shard_id) VALUES (?,?);")) (define replicas-get-query (sql *db* "SELECT shard_id FROM (SELECT * FROM blocks where \"key\" = ?) INNER JOIN replicas ON blocks.key_id = replicas.key_id;")) (define delete-block-query (sql *db* "DELETE FROM blocks WHERE \"key\" = ?;")) (define delete-block-replicas-query (sql *db* "DELETE FROM replicas WHERE key_id IN (SELECT key_id FROM blocks WHERE \"key\" = ?);")) ;; Load list of shards (define *shards* (alist->hash-table (map (lambda (r) (cons (first r) (make-shard (first r) (second r) #f (third r) (fourth r) (fifth r) (sixth r)))) (query fetch-all (sql *db* "SELECT shard_id, storage, trust, write_weight, read_priority, read_weight FROM shards;"))))) (define (connect! shard) ;; FIXME: Maybe retry once or twice? (set! (shard-storage shard) (import-storage (shard-storage-desc shard))) (when ((storage-writable? (shard-storage shard))) ((storage-set-tag! (shard-storage shard)) "#ugarit-non-vault-storage" "(cluster-shard \"This is a cluster shard, and will only contain a fraction of the contents of the actual vault.\")"))) (define (disconnect! shard) (when (shard-storage shard) ((storage-close! (shard-storage shard))) (set! (shard-storage shard) #f))) ;; Apply thunk to a storage pointing to that shard. ;; Reconnects a few times if anything fails, before giving up. (define (call-with-shard shard thunk) (let loop ((attempt 1)) (unless (shard-storage shard) (connect! shard)) (handle-exceptions exn (begin (disconnect! shard) (if (< attempt *attempts*) (loop (+ attempt 1)) (raise exn))) (thunk shard)))) (define commit-interval 1000) (define *updates-since-last-commit* 0) (define (flush!) (when (> *updates-since-last-commit* 0) (inc! *flushes*) (exec (sql *db* "COMMIT;")) (exec (sql *db* "BEGIN;")) (set! *updates-since-last-commit* 0))) (define (maybe-flush!) (inc! *updates-since-last-commit*) (when (> *updates-since-last-commit* commit-interval) ((storage-flush! be)) (flush!))) (define (cache-set! key type) (when type (begin (exec cache-set-query key (symbol->string type)) (maybe-flush!))) type) (define (cache-get key) (let ((result (query fetch cache-get-query key))) (if (pair? result) (string->symbol (car result)) #f))) (define (cache-delete! key) (exec cache-delete-query key) (maybe-flush!)) (define (cache-clear!) (exec (sql *db* "DELETE FROM cache"))) (make-storage (storage-max-block-size be) (storage-writable? be) (storage-unlinkable? be) (lambda (key data type) ; put! (begin ((storage-put! be) key data type) (cache-set! key type) (void))) (lambda () ; flush! (begin ((storage-flush! be)) (flush!) (void))) (lambda (key) ; exists? (let ((cached-result (cache-get key))) (if cached-result (begin (inc! *hits*) cached-result) (begin (inc! *misses*) (cache-set! key ((storage-exists? be) key)))))) (lambda (key) ; get ((storage-get be) key)) (lambda (key) ; link! ((storage-link! be) key)) (lambda (key) ; unlink! (let ((result ((storage-unlink! be) key))) (if result (begin (cache-delete! key) result) result))) (lambda (tag key) ; set-tag! ((storage-set-tag! be) tag key) ((storage-flush! be)) (flush!)) (lambda (tag) ; tag ((storage-tag be) tag)) (lambda () ; all-tags ((storage-all-tags be))) (lambda (tag) ; remove-tag! ((storage-remove-tag! be) tag) ((storage-flush! be)) (flush!)) (lambda (tag) ; lock-tag! (let ((result ((storage-lock-tag! be) tag))) ((storage-flush! be)) (flush!) result)) (lambda (tag) ; tag-locked? ((storage-tag-locked? be) tag)) (lambda (tag) ; unlock-tag! ((storage-unlock-tag! be) tag) ((storage-flush! be)) (flush!)) (lambda (command) ; admin! (match command (('info) (list (cons 'backend "cache") (cons 'block-size (storage-max-block-size be)) (cons 'writable? (storage-writable? be)) (cons 'unlinkable? (storage-unlinkable? be)) (cons 'cache-file cachepath) (cons 'commit-interval commit-interval))) (('help) (list (cons 'info "Return information about the archive") (cons 'help "List available admin commands") (cons 'stats "Examine the cache and report back statistics") (cons 'clear! "Clear the cache"))) (('stats) (list (cons 'entries (car (query fetch (sql *db* "SELECT COUNT(*) FROM cache")))))) (('clear!) (cache-clear!) (flush!) (list (cons 'result "Done"))) (else (error "Unknown admin command")))) (lambda () ; close! (begin ((backend-log!) 'info (sprintf "Cache hits: ~A misses: ~A flushes: ~A" *hits* *misses* *flushes*)) ((storage-close! be)) (exec (sql *db* "COMMIT;")) (close-database *db*))))) (define backend (match (command-line-arguments) ((cachepath backend) (lambda () (backend-cache cachepath (import-storage backend)))) (else (export-storage-error! "Invalid arguments to backend-cache") (printf "USAGE:\nbackend-cache <path-to-cache-file> \"<backend command line>\"\n") #f))) (if backend (export-storage! backend)) |
Changes to ugarit-api.scm.
︙ | ︙ | |||
465 466 467 468 469 470 471 472 473 474 475 476 477 478 | *global-rules* #f ; Unset version #f ; Unset configuration alist #f)) ; Not changed yet (conf-tag (vault-tag vault "#ugarit-vault-configuration"))) (unless conf-tag ;; Create default v1 tag (receive (conf-key conf-reused?) (store-sexpr! vault '(1) 'ugarit-vault-configuration '()) (vault-flush! vault) (set! conf-tag (make-tag "#ugarit-vault-configuration" 'ugarit-vault-configuration conf-key)) | > > > > > > > > > | 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 | *global-rules* #f ; Unset version #f ; Unset configuration alist #f)) ; Not changed yet (conf-tag (vault-tag vault "#ugarit-vault-configuration"))) (let ((nvs-tag (vault-tag vault "#ugarit-non-vault-storage"))) (when nvs-tag (signal (make-property-condition 'exn 'location 'open-vault 'message (sprintf "The storage is not a vault: ~S" (tag-key nvs-tag)) 'arguments (list (tag-type nvs-tag) (tag-key nvs-tag)))))) (unless conf-tag ;; Create default v1 tag (receive (conf-key conf-reused?) (store-sexpr! vault '(1) 'ugarit-vault-configuration '()) (vault-flush! vault) (set! conf-tag (make-tag "#ugarit-vault-configuration" 'ugarit-vault-configuration conf-key)) |
︙ | ︙ |