diff --git a/.github/scripts/admin-api/check-column-family-estimate-num-keys.sh b/.github/scripts/admin-api/check-column-family-estimate-num-keys.sh new file mode 100755 index 000000000..5202080ec --- /dev/null +++ b/.github/scripts/admin-api/check-column-family-estimate-num-keys.sh @@ -0,0 +1,11 @@ +#!/bin/bash -e + +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +. "$SCRIPT_DIR/../util.sh" + +BASE="http://localhost:8080/fhir" +COLUMN_FAMILY="$1" +EXPECTED_ESTIMATE_NUM_KEYS="$2" +ACTUAL_ESTIMATE_NUM_KEYS=$(curl -s "$BASE/__admin/dbs/index/column-families" | jq ".[] | select(.name == \"$COLUMN_FAMILY\").estimateNumKeys") + +test "estimate number of keys" "$ACTUAL_ESTIMATE_NUM_KEYS" "$EXPECTED_ESTIMATE_NUM_KEYS" diff --git a/.github/scripts/admin-api/run-prune-job.sh b/.github/scripts/admin-api/run-prune-job.sh new file mode 100755 index 000000000..b041414eb --- /dev/null +++ b/.github/scripts/admin-api/run-prune-job.sh @@ -0,0 +1,57 @@ +#!/bin/bash -e + +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +. "$SCRIPT_DIR/../util.sh" + +BASE="http://localhost:8080/fhir" + +prune-job() { +cat < [!NOTE] +> In contrast, relational databases, which where designed in the 80's, use an update in-place model, because storage was expensive then. A similar technic is [copy-on-write][4], used in many areas of computing like modern filesystems. Having a database architecture which consists of immutable database values evolving over time, were past values are kept either forever or at least sufficiently long, has the property that reads don't need coordination. Because database values can be referenced by `t`, an arbitrarily number queries, spread over arbitrarily periods of time, can access the same immutable snapshot of the database. In case data is replicated over multiple nodes, queries can even run in parallel, all accessing the same coherent database value. -As one example, in paging of FHIR searchset or history bundles, Blaze simply refers to the database value's `t` of the first page, in order to calculate every remaining page based on the same stable database value. +As one example, in paging of FHIR searchset or history bundles, Blaze simply refers to the database value's `t` of the first page, in order to calculate every remaining page based on the same immutable database value. In practise, each FHIR RESTful API read request will obtain the newest known database value and use that for all queries necessary to answer the request. +Database values will be no longer accessible after [pruning](#pruning) because pruning actually removes purged data for GDPR regularity or simply space saving reasons. + ## Logical Data Model Datomic uses a fact based data model, were each fact is the triple `(entity, attribute, value)` for example `(, birthDate, 2020)`. This model has one big advantage, the minimum change between two database values will be a fact, which is quite small. The disadvantage is, that bigger structures like resources have to be reconstructed from individual facts. In addition to that, because in FHIR, updates are always whole resources, the actual changed facts have to be determined by diffing the old and new resource. @@ -50,6 +53,8 @@ There are two different sets of indices, ones which depend on the database value | TypeStats | type t | total, num-changes | | SystemStats | t | total, num-changes | +The first three indices `ResourceAsOf`, `TypeAsOf` and `SystemAsOf` put all versions of resources on the timeline. The only difference between that indices is the order of the parts of the key. + #### ResourceAsOf The `ResourceAsOf` index is the primary index which maps the resource identifier `(type, id)` together with the `t` to the `content-hash` of the resource version. In addition to that, the index contains the number of changes `num-changes` to the resource, the operator `op` of the change leading to the index entry and an optional `purged-at` point in time were the version was purged. @@ -86,7 +91,7 @@ In addition to direct resource lookup, the `ResourceAsOf` index is used for list #### TypeAsOf -The `TypeAsOf` index contains the same information as the `ResourceAsOf` index with the difference that the components of the key are ordered `type`, `t` and `id` instead of `type`, `id` and `t`. The index is used for listing all versions of all resources of a particular type. Such history listings start with the `t` of the database value going into the past. This is done by not only choosing the resource version with the latest `t` less or equal the database values `t` but instead using all older versions. Such versions even include deleted versions because in FHIR it is allowed to bring back a resource to a new life after it was already deleted. The listing is done by simply scanning through the index in reverse. Because the key is ordered by `type`, `t` and `id`, the entries will be first ordered by time, newest first, and second by resource identifier. +The `TypeAsOf` index contains the same information as the `ResourceAsOf` index with the difference that the parts of the key are ordered `type`, `t` and `id` instead of `type`, `id` and `t`. The index is used for listing all versions of all resources of a particular type. Such history listings start with the `t` of the database value going into the past. This is done by not only choosing the resource version with the latest `t` less or equal the database values `t` but instead using all older versions. Such versions even include deleted versions because in FHIR it is allowed to bring back a resource to a new life after it was already deleted. The listing is done by simply scanning through the index in reverse. Because the key is ordered by `type`, `t` and `id`, the entries will be first ordered by time, newest first, and second by resource identifier. #### SystemAsOf @@ -100,7 +105,8 @@ The `PatientLastChange` index contains all changes to resources in the compartme The `TxSuccess` index contains the real point in time, as `java.time.Instant`, successful transactions happened. In other words, this index maps each `t` which is just a monotonically increasing number to a real point in time. -**Note:** Other than XTDB, Blaze is not a bitemporal. That means the time recorded in the history of resources is the transaction time, not a business time. That also means that one can't fix the history, because the history only reflects the transactions happened. +> [!NOTE] +> Other than XTDB, Blaze is not a bitemporal. That means the time recorded in the history of resources is the transaction time, not a business time. That also means that one can't fix the history, because the history only reflects the transactions happened. #### TxError @@ -332,7 +338,7 @@ The `delete-history` command is used to delete the history of a resource. * get all instance history entries * add the `t` of the transaction as `purged-at?` to the value of each of the history entries not only in the ResourceAsOf index, but also in the TypeAsOf and SystemAsOf index -* remove the number of history entries purged from the number of changes of `type` and thw whole system +* remove the number of history entries purged from the number of changes of `type` and the whole system ### Patient Purge @@ -345,6 +351,14 @@ The `patient-purge` command is used to remove all current and historical version | id | yes | string | patient id | | check-refs | no | boolean | use referential integrity checks | +## History Deletion / Purging + +Blaze keeps always a history of all resources. So updating or deleting a resource creates a new version and keeps all previous versions. However the history can be deleted using the [delete history](#delete-history) command. Also, in case all data of a particular patient including history should be deleted, the [patient purge](#patient-purge) command can be used. + +History deletion works in two stages, first the resource versions are marked are purged and second the marked versions are removed during [pruning](#pruning). The two sage process is necessary, because database values have to be still immutable and actually removing data would violate the immutability. The marks itself will contain the `t` of the transaction that purged the entry. + +## Pruning + [1]: [2]: [3]: diff --git a/docs/implementation/fhir-data-model.md b/docs/implementation/fhir-data-model.md index 5e35f7d4c..fb97dcaf6 100644 --- a/docs/implementation/fhir-data-model.md +++ b/docs/implementation/fhir-data-model.md @@ -34,7 +34,8 @@ will produce the following Clojure data structure: :deceasedBoolean false} ``` -**Note:** Clojure data structures are explained [here][3]. Clojure uses generic data structures like maps and lists instead of domain specific classes like Java. +> [!NOTE] +> Clojure data structures are explained [here][3]. Clojure uses generic data structures like maps and lists instead of domain specific classes like Java. The Clojure map looks exactly the same as the JSON document. The main difference is, that all keys are converted to Clojure keywords which can be written without quotes and always start with a colon. The parsing process is fully generic like in JavaScript. There is no need to define any domain specific classes like in Java. diff --git a/job-ig/input/fsh/job.fsh b/job-ig/input/fsh/job.fsh index 9b22dbed7..c3258678c 100644 --- a/job-ig/input/fsh/job.fsh +++ b/job-ig/input/fsh/job.fsh @@ -10,6 +10,7 @@ Title: "Job Type" * #async-interaction "Asynchronous Interaction Request" * #async-bulk-data "Asynchronous Bulk Data Request" * #compact "Compact a Database Column Family" +* #prune "Prune the Database" * #re-index "(Re)Index a Search Parameter" CodeSystem: JobStatusReason diff --git a/job-ig/input/fsh/prune-job.fsh b/job-ig/input/fsh/prune-job.fsh new file mode 100644 index 000000000..d0b47c03e --- /dev/null +++ b/job-ig/input/fsh/prune-job.fsh @@ -0,0 +1,149 @@ +Alias: UCUM = http://unitsofmeasure.org +Alias: $FT = http://hl7.org/fhir/fhir-types +Alias: $JT = https://samply.github.io/blaze/fhir/CodeSystem/JobType +Alias: $JSR = https://samply.github.io/blaze/fhir/CodeSystem/JobStatusReason +Alias: $JO = https://samply.github.io/blaze/fhir/CodeSystem/JobOutput +Alias: $PJP = https://samply.github.io/blaze/fhir/CodeSystem/PruneJobParameter +Alias: $PJO = https://samply.github.io/blaze/fhir/CodeSystem/PruneJobOutput +Alias: $PI = https://samply.github.io/blaze/fhir/CodeSystem/PruneIndices + +CodeSystem: PruneJobParameter +Id: PruneJobParameter +Title: "Prune Job Parameter" +* ^status = #active +* #t "T" + +CodeSystem: PruneJobOutput +Id: PruneJobOutput +Title: "Prune Job Output" +* ^status = #active +* #total-index-entries "Total Index Entries" +* #index-entries-processed "Index Entries Processed" +* #index-entries-deleted "Index Entries Deleted" +* #processing-duration "Processing Duration" +* #next-index "Next Index" +* #next-type "Next Type" +* #next-id "Next Id" +* #next-t "Next T" + +CodeSystem: PruneIndices +Id: PruneIndices +Title: "Prune Indices" +* ^status = #active +* #resource-as-of "ResourceAsOf" +* #type-as-of "TypeAsOf" +* #system-as-of "SystemAsOf" + +ValueSet: PruneIndices +Id: PruneIndices +Title: "Prune Indices Value Set" +* ^status = #active +* include codes from system PruneIndices + +Profile: PruneJob +Parent: Job +* code = $JT#prune "Prune the Database" +* input ^slicing.discriminator.type = #pattern +* input ^slicing.discriminator.path = "type" +* input ^slicing.rules = #open +* input contains t 1..1 +* input[t] ^short = "T" +* input[t] ^definition = "The database point in time to use as start of pruning." +* input[t].type = $PJP#t +* input[t].value[x] only positiveInt +* output ^slicing.discriminator.type = #pattern +* output ^slicing.discriminator.path = "type" +* output ^slicing.rules = #open +* output contains totalIndexEntries 0..1 +* output[totalIndexEntries] ^short = "Total Index Entries" +* output[totalIndexEntries] ^definition = "Estimated total number of index entries to prune." +* output[totalIndexEntries].type = $PJO#total-index-entries +* output[totalIndexEntries].value[x] only unsignedInt +* output contains indexEntriesProcessed 0..1 +* output[indexEntriesProcessed] ^short = "Index Entries Processed" +* output[indexEntriesProcessed] ^definition = "Number of index entries processed." +* output[indexEntriesProcessed].type = $PJO#index-entries-processed +* output[indexEntriesProcessed].value[x] only unsignedInt +* output contains indexEntriesDeleted 0..1 +* output[indexEntriesDeleted] ^short = "Index Entries Deleted" +* output[indexEntriesDeleted] ^definition = "Number of index entries deleted." +* output[indexEntriesDeleted].type = $PJO#index-entries-deleted +* output[indexEntriesDeleted].value[x] only unsignedInt +* output contains processingDuration 0..1 +* output[processingDuration] ^short = "Processing Duration" +* output[processingDuration] ^definition = "Duration the pruning processing took. Durations while the job was paused don't count." +* output[processingDuration].type = $PJO#processing-duration +* output[processingDuration].value[x] only Quantity +* output[processingDuration].valueQuantity + * system 1..1 + * system = UCUM + * code 1..1 + * code = #s "seconds" +* output contains nextIndex 0..1 +* output[nextIndex] ^short = "Next Index" +* output[nextIndex] ^definition = "The name of the index to continue with. Used in case the job is resumed after manual pausing or shutdown of Blaze." +* output[nextIndex].type = $PJO#next-index +* output[nextIndex].value[x] only code +* output[nextIndex].valueCode from PruneIndices +* output contains nextType 0..1 +* output[nextType] ^short = "Next Type" +* output[nextType] ^definition = "The FHIR resource type to continue with. Used in case the job is resumed after manual pausing or shutdown of Blaze." +* output[nextType].type = $PJO#next-type +* output[nextType].value[x] only code +* output[nextType].valueCode from http://hl7.org/fhir/ValueSet/resource-types +* output contains nextId 0..1 +* output[nextId] ^short = "Next Id" +* output[nextId] ^definition = "The FHIR resource id to continue with. Used in case the job is resumed after manual pausing or shutdown of Blaze." +* output[nextId].type = $PJO#next-id +* output[nextId].value[x] only id +* output contains nextT 0..1 +* output[nextT] ^short = "Next T" +* output[nextT] ^definition = "The database point in time to continue with. Used in case the job is resumed after manual pausing or shutdown of Blaze." +* output[nextT].type = $PJO#next-t +* output[nextT].value[x] only positiveInt + +Instance: PruneJobReadyExample +InstanceOf: PruneJob +* status = #ready +* intent = #order +* code = $JT#prune "Prune the Database" +* authoredOn = "2024-10-15T15:01:00.000Z" +* input[t].type = $PJP#t "T" +* input[t].valuePositiveInt = 42 + +Instance: PruneJobInProgressExample +InstanceOf: PruneJob +* status = #in-progress +* statusReason = $JSR#started "Started" +* intent = #order +* code = $JT#prune "Prune the Database" +* authoredOn = "2024-10-15T15:01:00.000Z" +* input[t].type = $PJP#t "T" +* input[t].valuePositiveInt = 42 +* output[totalIndexEntries].type = $PJO#total-index-entries "Total Index Entries" +* output[totalIndexEntries].valueUnsignedInt = 1000 +* output[indexEntriesProcessed].type = $PJO#index-entries-processed "Index Entries Processed" +* output[indexEntriesProcessed].valueUnsignedInt = 100 +* output[indexEntriesDeleted].type = $PJO#index-entries-deleted "Index Entries Deleted" +* output[indexEntriesDeleted].valueUnsignedInt = 10 +* output[processingDuration].type = $PJO#processing-duration "Processing Duration" +* output[processingDuration].valueQuantity.value = 10 +* output[nextIndex].type = $PJO#next-index "Next Index" +* output[nextIndex].valueCode = $PI#resource-as-of +* output[nextType].type = $PJO#next-type "Next Type" +* output[nextType].valueCode = $FT#Patient +* output[nextId].type = $PJO#next-id "Next Id" +* output[nextId].valueId = "0" +* output[nextT].type = $PJO#next-t "Next T" +* output[nextT].valuePositiveInt = 23 + +Instance: PruneJobFailedExample +InstanceOf: PruneJob +* status = #failed +* intent = #order +* code = $JT#prune "Prune the Database" +* authoredOn = "2024-10-15T15:01:00.000Z" +* input[t].type = $PJP#t "T" +* input[t].valuePositiveInt = 42 +* output[error].type = $JO#error "Error" +* output[error].valueString = "error message" diff --git a/modules/admin-api/deps.edn b/modules/admin-api/deps.edn index f75e293b9..74c09630b 100644 --- a/modules/admin-api/deps.edn +++ b/modules/admin-api/deps.edn @@ -19,6 +19,9 @@ blaze/job-compact {:local/root "../job-compact"} + blaze/job-prune + {:local/root "../job-prune"} + blaze/job-re-index {:local/root "../job-re-index"} diff --git a/modules/admin-api/src/blaze/admin_api.clj b/modules/admin-api/src/blaze/admin_api.clj index 50d7cae82..5a8b03d0a 100644 --- a/modules/admin-api/src/blaze/admin_api.clj +++ b/modules/admin-api/src/blaze/admin_api.clj @@ -18,6 +18,7 @@ [blaze.interaction.util :as iu] [blaze.job-scheduler :as js] [blaze.job-scheduler.spec] + [blaze.job.prune] [blaze.job.re-index] [blaze.middleware.fhir.db :as db] [blaze.middleware.fhir.output :as fhir-output] @@ -186,6 +187,7 @@ (def ^:private allowed-profiles #{#fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/AsyncInteractionJob" #fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/CompactJob" + #fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/PruneJob" #fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/ReIndexJob"}) (defn- check-profile [resource] @@ -477,6 +479,11 @@ "blaze/job/compact/CodeSystem-CompactJobOutput.json" "blaze/job/compact/CodeSystem-CompactJobParameter.json" "blaze/job/compact/StructureDefinition-CompactJob.json" + "blaze/job/prune/StructureDefinition-PruneJob.json" + "blaze/job/prune/CodeSystem-PruneIndices.json" + "blaze/job/prune/CodeSystem-PruneJobOutput.json" + "blaze/job/prune/CodeSystem-PruneJobParameter.json" + "blaze/job/prune/ValueSet-PruneIndices.json" "blaze/job/re_index/StructureDefinition-ReIndexJob.json" "blaze/job/re_index/CodeSystem-ReIndexJobOutput.json" "blaze/job/re_index/CodeSystem-ReIndexJobParameter.json"]) diff --git a/modules/db/src/blaze/db/api.clj b/modules/db/src/blaze/db/api.clj index b4440a57c..4e77d2847 100644 --- a/modules/db/src/blaze/db/api.clj +++ b/modules/db/src/blaze/db/api.clj @@ -526,6 +526,11 @@ ;; ---- (Re) Index ------------------------------------------------------------ (defn re-index-total + "Returns the total number of resources that have to be processed when + (re)indexing the search parameter with `search-param-url`. + + Returns an anomaly if the search parameter with `search-param-url` was not + found." [db search-param-url] (p/-re-index-total db search-param-url)) @@ -541,3 +546,25 @@ (p/-re-index db search-param-url)) ([db search-param-url start-type start-id] (p/-re-index db search-param-url start-type start-id))) + +;; ---- Prune ----------------------------------------------------------------- + +(defn prune-total + "Returns the estimated total number of index entries that have to be processed + during pruning." + [node] + (np/-prune-total node)) + +(defn prune + "Removes purged and outdated index entries from `node` which were purged at or + before `t`. + + Processes at most `n` index entries. The function `prune-total` can be used to + determine to total number of index entries needed to process. + + Returns a map of :index, :type, :id and :t which can supplied to the optional + `start` argument to continue the pruning." + ([node n t] + (np/-prune node n t nil)) + ([node n t start] + (np/-prune node n t start))) diff --git a/modules/db/src/blaze/db/api_spec.clj b/modules/db/src/blaze/db/api_spec.clj index 304f704c9..dea7d1947 100644 --- a/modules/db/src/blaze/db/api_spec.clj +++ b/modules/db/src/blaze/db/api_spec.clj @@ -263,3 +263,12 @@ :start (s/? (s/cat :start-type :fhir.resource/type :start-id :blaze.resource/id))) :ret ac/completable-future?) + +(s/fdef d/prune-total + :args (s/cat :node :blaze.db/node) + :ret nat-int?) + +(s/fdef d/prune + :args (s/cat :node :blaze.db/node :n pos-int? :t :blaze.db/t + :start (s/? :blaze.db.prune/start)) + :ret ac/completable-future?) diff --git a/modules/db/src/blaze/db/impl/index/resource_as_of.clj b/modules/db/src/blaze/db/impl/index/resource_as_of.clj index 6eeadee16..7df93ecf5 100644 --- a/modules/db/src/blaze/db/impl/index/resource_as_of.clj +++ b/modules/db/src/blaze/db/impl/index/resource_as_of.clj @@ -151,11 +151,16 @@ (search-entry! kb vb) result)))))))) -(defn- encode-key-buf [tid id t] - (-> (bb/allocate (unchecked-add-int except-id-key-size (bs/size id))) - (bb/put-int! tid) - (bb/put-byte-string! id) - (bb/put-long! (codec/descending-long t)))) +(defn- encode-key-buf + ([tid id] + (-> (bb/allocate (unchecked-add-int codec/tid-size (bs/size id))) + (bb/put-int! tid) + (bb/put-byte-string! id))) + ([tid id t] + (-> (bb/allocate (unchecked-add-int except-id-key-size (bs/size id))) + (bb/put-int! tid) + (bb/put-byte-string! id) + (bb/put-long! (codec/descending-long t))))) (defn encode-key "Encodes the key of the ResourceAsOf index from `tid`, `id` and `t`." @@ -179,6 +184,10 @@ (defn- start-key ([tid] (-> (Ints/toByteArray tid) bs/from-byte-array)) + ([tid start-id] + (-> (encode-key-buf tid start-id) + bb/flip! + bs/from-byte-buffer!)) ([tid start-id t] (-> (encode-key-buf tid start-id t) bb/flip! @@ -262,6 +271,7 @@ iteration. The state and t which are both longs are read from the off-heap key and value buffer. The hash and state which are read from the value buffer are only read once for each resource handle." + {:arglists '([batch-db tid] [batch-db tid start-id])} ([{:keys [snapshot t]} tid] (i/entries snapshot :resource-as-of-index (type-list-xf t tid) (start-key tid))) @@ -282,6 +292,7 @@ tid and resource id. The list starts at the optional `start-tid` and `start-id`." + {:arglists '([batch-db] [batch-db start-tid start-id])} ([{:keys [snapshot t]}] (i/entries snapshot :resource-as-of-index (system-list-xf t nil))) ([{:keys [snapshot t]} start-tid start-id] @@ -405,3 +416,52 @@ (when (matcher input handle) handle)))) (closer iter))))) + +(defn- delete-entry! [kb] + (bb/set-position! kb 0) + (let [key (byte-array (bb/limit kb))] + (bb/copy-into-byte-array! kb key) + [:resource-as-of-index key])) + +(defn- prune-rf [n] + (fn [ret {:keys [idx delete-entry] [tid id t] :key}] + (if (= idx n) + (reduced (assoc ret :next {:tid tid :id id :t t})) + (cond-> (update ret :num-entries-processed inc) + delete-entry + (update :delete-entries conj delete-entry))))) + +(defn- prune-key! [kb] + [(bb/get-int! kb) + (bs/from-byte-buffer! kb (- (bb/remaining kb) codec/t-size)) + (codec/descending-long (bb/get-long! kb))]) + +(defn- prune-xf [t] + (map-indexed + (fn [idx [kb vb]] + (bb/set-position! vb (+ hash/size codec/state-size)) + (cond-> + {:idx idx :key (prune-key! kb)} + (rh/purged!? vb t) + (assoc :delete-entry (delete-entry! kb)))))) + +(defn prune + "Scans the ResourceAsOf index for entries which were purged at or before `t`. + + Processes at most `n` entries and optionally starts at the entry with + `start-tid` and `start-id`. + + Returns a map with :delete-entries and :next where :delete-entries is a + vector of all index entries to delete and :next is a map of :tid and :id of + the index entry to start with in the next iteration if necessary." + ([snapshot n t] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :resource-as-of-index (prune-xf t)))) + ([snapshot n t start-tid start-id start-t] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :resource-as-of-index (prune-xf t) + (start-key start-tid start-id start-t))))) diff --git a/modules/db/src/blaze/db/impl/index/resource_handle.clj b/modules/db/src/blaze/db/impl/index/resource_handle.clj index b025b860a..5819f2715 100644 --- a/modules/db/src/blaze/db/impl/index/resource_handle.clj +++ b/modules/db/src/blaze/db/impl/index/resource_handle.clj @@ -61,6 +61,16 @@ :delete :put))) +(defn purged!? + "Returns true if the index entry with `vb` has an encoded purged-at t at or + before `t`. + + The position of `vb` has to be after reading the state and will be incremented + by 8 byte in case the end isn't reached." + [vb t] + (and (= (bb/remaining vb) codec/t-size) + (<= (bb/get-long! vb) (long t)))) + (defn resource-handle! "Creates a new resource handle when not purged at `base-t`. @@ -68,7 +78,7 @@ [tid id t base-t vb] (let [hash (hash/from-byte-buffer! vb) state (bb/get-long! vb)] - (when (or (< (bb/remaining vb) 8) (< (long base-t) (bb/get-long! vb))) + (when-not (purged!? vb base-t) (ResourceHandle. tid id diff --git a/modules/db/src/blaze/db/impl/index/system_as_of.clj b/modules/db/src/blaze/db/impl/index/system_as_of.clj index 05f5809b9..cbc293152 100644 --- a/modules/db/src/blaze/db/impl/index/system_as_of.clj +++ b/modules/db/src/blaze/db/impl/index/system_as_of.clj @@ -5,7 +5,8 @@ [blaze.byte-string :as bs] [blaze.db.impl.codec :as codec] [blaze.db.impl.index.resource-handle :as rh] - [blaze.db.impl.iterators :as i]) + [blaze.db.impl.iterators :as i] + [blaze.fhir.hash :as hash]) (:import [com.google.common.primitives Longs])) @@ -45,22 +46,21 @@ (bb/put-byte-string! id) bb/array)) -(defn- encode-t-tid [start-t start-tid] - (-> (bb/allocate t-tid-size) - (bb/put-long! (codec/descending-long ^long start-t)) - (bb/put-int! start-tid) - bb/array)) - (defn- start-key [start-t start-tid start-id] (cond start-id - (encode-key start-t start-tid start-id) + (bs/from-byte-array (encode-key start-t start-tid start-id)) start-tid - (encode-t-tid start-t start-tid) + (-> (bb/allocate t-tid-size) + (bb/put-long! (codec/descending-long ^long start-t)) + (bb/put-int! start-tid) + bb/flip! + bs/from-byte-buffer!) :else - (Longs/toByteArray (codec/descending-long ^long start-t)))) + (-> (Longs/toByteArray (codec/descending-long ^long start-t)) + bs/from-byte-array))) (defn system-history "Returns a reducible collection of all historic resource handles of the @@ -70,10 +70,59 @@ (i/entries snapshot :system-as-of-index (keep (decoder t)) - (bs/from-byte-array (start-key start-t start-tid start-id)))) + (start-key start-t start-tid start-id))) (defn changes "Returns a reducible collection of all resource handles changed at `t`." [snapshot t] (i/prefix-entries snapshot :system-as-of-index (keep (decoder t)) codec/t-size - (bs/from-byte-array (start-key t nil nil)))) + (start-key t nil nil))) + +(defn- delete-entry! [kb] + (bb/set-position! kb 0) + (let [key (byte-array (bb/limit kb))] + (bb/copy-into-byte-array! kb key) + [:system-as-of-index key])) + +(defn- prune-rf [n] + (fn [ret {:keys [idx delete-entry] [t tid id] :key}] + (if (= idx n) + (reduced (assoc ret :next {:t t :tid tid :id id})) + (cond-> (update ret :num-entries-processed inc) + delete-entry + (update :delete-entries conj delete-entry))))) + +(defn- prune-key! [kb] + [(codec/descending-long (bb/get-long! kb)) + (bb/get-int! kb) + (bs/from-byte-buffer! kb (bb/remaining kb))]) + +(defn- prune-xf [t] + (map-indexed + (fn [idx [kb vb]] + (bb/set-position! vb (+ hash/size codec/state-size)) + (cond-> + {:idx idx :key (prune-key! kb)} + (rh/purged!? vb t) + (assoc :delete-entry (delete-entry! kb)))))) + +(defn prune + "Scans the SystemAsOf index for entries which were purged at or before `t`. + + Processes at most `n` entries and optionally starts at the entry with + `start-t`, `start-tid` and `start-id`. + + Returns a map with :delete-entries and :next where :delete-entries is a + vector of all index entries to delete and :next is a map of :tid, :t and :id + of the index entry to start with in the next iteration if necessary." + ([snapshot n t] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :system-as-of-index (prune-xf t)))) + ([snapshot n t start-t start-tid start-id] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :system-as-of-index (prune-xf t) + (start-key start-t start-tid start-id))))) diff --git a/modules/db/src/blaze/db/impl/index/type_as_of.clj b/modules/db/src/blaze/db/impl/index/type_as_of.clj index 59ff36d41..d18ef3db1 100644 --- a/modules/db/src/blaze/db/impl/index/type_as_of.clj +++ b/modules/db/src/blaze/db/impl/index/type_as_of.clj @@ -5,7 +5,8 @@ [blaze.byte-string :as bs] [blaze.db.impl.codec :as codec] [blaze.db.impl.index.resource-handle :as rh] - [blaze.db.impl.iterators :as i])) + [blaze.db.impl.iterators :as i] + [blaze.fhir.hash :as hash])) (set! *warn-on-reflection* true) (set! *unchecked-math* :warn-on-boxed) @@ -62,3 +63,52 @@ snapshot :type-as-of-index (keep (decoder tid t)) codec/tid-size (start-key tid start-t start-id))) + +(defn- delete-entry! [kb] + (bb/set-position! kb 0) + (let [key (byte-array (bb/limit kb))] + (bb/copy-into-byte-array! kb key) + [:type-as-of-index key])) + +(defn- prune-rf [n] + (fn [ret {:keys [idx delete-entry] [tid t id] :key}] + (if (= idx n) + (reduced (assoc ret :next {:tid tid :t t :id id})) + (cond-> (update ret :num-entries-processed inc) + delete-entry + (update :delete-entries conj delete-entry))))) + +(defn- prune-key! [kb] + [(bb/get-int! kb) + (codec/descending-long (bb/get-long! kb)) + (bs/from-byte-buffer! kb (bb/remaining kb))]) + +(defn- prune-xf [t] + (map-indexed + (fn [idx [kb vb]] + (bb/set-position! vb (+ hash/size codec/state-size)) + (cond-> + {:idx idx :key (prune-key! kb)} + (rh/purged!? vb t) + (assoc :delete-entry (delete-entry! kb)))))) + +(defn prune + "Scans the TypeAsOf index for entries which were purged at or before `t`. + + Processes at most `n` entries and optionally starts at the entry with + `start-tid`, `start-t` and `start-id`. + + Returns a map with :delete-entries and :next where :delete-entries is a + vector of all index entries to delete and :next is a map of :tid, :t and :id + of the index entry to start with in the next iteration if necessary." + ([snapshot n t] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :type-as-of-index (prune-xf t)))) + ([snapshot n t start-tid start-t start-id] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :type-as-of-index (prune-xf t) + (start-key start-tid start-t start-id))))) diff --git a/modules/db/src/blaze/db/node.clj b/modules/db/src/blaze/db/node.clj index 73e62c674..fc679ce96 100644 --- a/modules/db/src/blaze/db/node.clj +++ b/modules/db/src/blaze/db/node.clj @@ -10,10 +10,13 @@ [blaze.db.impl.db :as db] [blaze.db.impl.index :as index] [blaze.db.impl.index.patient-last-change :as plc] + [blaze.db.impl.index.resource-as-of :as rao] [blaze.db.impl.index.resource-handle :as rh] + [blaze.db.impl.index.system-as-of :as sao] [blaze.db.impl.index.t-by-instant :as t-by-instant] [blaze.db.impl.index.tx-error :as tx-error] [blaze.db.impl.index.tx-success :as tx-success] + [blaze.db.impl.index.type-as-of :as tao] [blaze.db.impl.protocols :as p] [blaze.db.kv :as kv] [blaze.db.node.patient-last-change-index :as node-plc] @@ -282,6 +285,38 @@ (with-open [db (batch-db/new-batch-db node t t)] (into [] (take-while #(= t (rh/t %))) (d/type-history db type)))) +(defn- prune-next [{:keys [tid id t] :as next} index next-index] + (cond + next + {:index index + :type (codec/tid->type tid) + :id (codec/id-string id) + :t t} + next-index + {:index next-index})) + +(defmulti prune (fn [_ _ _ {:keys [index]}] index)) + +(defmethod prune nil [snapshot n t _] + (-> (rao/prune snapshot n t) + (update :next prune-next :resource-as-of-index :type-as-of-index))) + +(defmethod prune :resource-as-of-index [snapshot n t {:keys [type id] :as start}] + (-> (rao/prune snapshot n t (codec/tid type) (codec/id-byte-string id) (:t start)) + (update :next prune-next :resource-as-of-index :type-as-of-index))) + +(defmethod prune :type-as-of-index [snapshot n t {:keys [type id] :as start}] + (-> (if type + (tao/prune snapshot n t (codec/tid type) (:t start) (codec/id-byte-string id)) + (tao/prune snapshot n t)) + (update :next prune-next :type-as-of-index :system-as-of-index))) + +(defmethod prune :system-as-of-index [snapshot n t {:keys [type id] :as start}] + (-> (if type + (sao/prune snapshot n t (:t start) (codec/tid type) (codec/id-byte-string id)) + (sao/prune snapshot n t)) + (update :next prune-next :system-as-of-index nil))) + (defrecord Node [context tx-log tx-cache kv-store resource-store sync-fn search-param-registry resource-indexer state run? poll-timeout finished] @@ -337,6 +372,21 @@ (flow/submit! publisher changed-handles))))) publisher)) + (-prune-total [_] + (+ (kv/estimate-num-keys kv-store :resource-as-of-index) + (kv/estimate-num-keys kv-store :type-as-of-index) + (kv/estimate-num-keys kv-store :system-as-of-index))) + + (-prune [_ n t start] + (log/trace "prune at most" n "index entries starting at t =" t) + (let [{:keys [delete-entries num-entries-processed next]} + (with-open [snapshot (kv/new-snapshot kv-store)] + (prune snapshot n t start))] + (kv/delete! kv-store delete-entries) + {:num-index-entries-processed num-entries-processed + :num-index-entries-deleted (count delete-entries) + :next next})) + p/Tx (-tx [_ t] (tx-success/tx tx-cache t)) diff --git a/modules/db/src/blaze/db/node/protocols.clj b/modules/db/src/blaze/db/node/protocols.clj index ef3ce0dca..a78136c99 100644 --- a/modules/db/src/blaze/db/node/protocols.clj +++ b/modules/db/src/blaze/db/node/protocols.clj @@ -5,4 +5,6 @@ (-sync [node] [node t]) (-submit-tx [node tx-ops]) (-tx-result [node t]) - (-changed-resources-publisher [node type])) + (-changed-resources-publisher [node type]) + (-prune-total [node]) + (-prune [node n t start])) diff --git a/modules/db/src/blaze/db/spec.clj b/modules/db/src/blaze/db/spec.clj index 1f76f64cd..d9b290ec4 100644 --- a/modules/db/src/blaze/db/spec.clj +++ b/modules/db/src/blaze/db/spec.clj @@ -116,3 +116,13 @@ (s/def :blaze.db/allow-multiple-delete boolean?) + +(s/def :blaze.db.prune/index + #{:resource-as-of-index :type-as-of-index :system-as-of-index}) + +(s/def :blaze.db.prune/start + (s/keys :req-un [(or :blaze.db.prune/index + (and :blaze.db.prune/index + :fhir.resource/type + :blaze.resource/id + :blaze.db/t))])) diff --git a/modules/db/test/blaze/db/api_test.clj b/modules/db/test/blaze/db/api_test.clj index 6a1d09343..3fb959f90 100644 --- a/modules/db/test/blaze/db/api_test.clj +++ b/modules/db/test/blaze/db/api_test.clj @@ -11,6 +11,8 @@ [blaze.db.api-spec] [blaze.db.impl.db-spec] [blaze.db.impl.index.resource-search-param-value-test-util :as r-sp-v-tu] + [blaze.db.impl.iterators :as i] + [blaze.db.kv :as kv] [blaze.db.kv.mem-spec] [blaze.db.node :as node] [blaze.db.node-spec] @@ -7753,3 +7755,98 @@ "Observation" "09999") :num-resources := 1 :next := nil)))))))) + +(defn- prune-steps [node n t num-steps] + (take num-steps (iterate (fn [{start :next}] (d/prune node n t start)) (d/prune node n t)))) + +(deftest prune-total-test + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]]] + + (is (= 3 (d/prune-total node)))) + + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]]] + + (is (= 6 (d/prune-total node))))) + +(deftest prune-test + (testing "one purged patient in three steps" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]] + [[:patient-purge "0"]]] + + (testing "the next handles go from one index to the next" + (given (prune-steps node 1 2 3) + count := 3 + [0 :num-index-entries-processed] := 1 + [0 :num-index-entries-deleted] := 1 + [0 :next] := {:index :type-as-of-index} + [1 :num-index-entries-processed] := 1 + [1 :num-index-entries-deleted] := 1 + [1 :next] := {:index :system-as-of-index} + [2 :num-index-entries-processed] := 1 + [2 :num-index-entries-deleted] := 1 + [2 :next] := nil)) + + (testing "all index entries are gone" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (is (empty? (vec (i/entries snapshot :resource-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :type-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :system-as-of-index identity)))))))) + + (testing "two purged patients in three steps" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "the next handles go from one index to the next" + (given (prune-steps node 2 3 3) + count := 3 + [0 :num-index-entries-processed] := 2 + [0 :num-index-entries-deleted] := 2 + [0 :next] := {:index :type-as-of-index} + [1 :num-index-entries-processed] := 2 + [1 :num-index-entries-deleted] := 2 + [1 :next] := {:index :system-as-of-index} + [2 :num-index-entries-processed] := 2 + [2 :num-index-entries-deleted] := 2 + [2 :next] := nil)) + + (testing "all index entries are gone" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (is (empty? (vec (i/entries snapshot :resource-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :type-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :system-as-of-index identity)))))))) + + (testing "two purged patients in six steps" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "the next handles go from one index to the next with sub steps" + (given (prune-steps node 1 3 6) + count := 6 + [0 :num-index-entries-processed] := 1 + [0 :next] := {:index :resource-as-of-index :type "Patient" :id "1" :t 1} + [1 :num-index-entries-processed] := 1 + [1 :next] := {:index :type-as-of-index} + [2 :num-index-entries-processed] := 1 + [2 :next] := {:index :type-as-of-index :type "Patient" :id "1" :t 1} + [3 :num-index-entries-processed] := 1 + [3 :next] := {:index :system-as-of-index} + [4 :num-index-entries-processed] := 1 + [4 :next] := {:index :system-as-of-index :type "Patient" :id "1" :t 1} + [5 :num-index-entries-processed] := 1 + [5 :next] := nil)) + + (testing "all index entries are gone" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (is (empty? (vec (i/entries snapshot :resource-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :type-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :system-as-of-index identity))))))))) diff --git a/modules/db/test/blaze/db/impl/index/resource_as_of_spec.clj b/modules/db/test/blaze/db/impl/index/resource_as_of_spec.clj index 82089ad85..1c91ed68f 100644 --- a/modules/db/test/blaze/db/impl/index/resource_as_of_spec.clj +++ b/modules/db/test/blaze/db/impl/index/resource_as_of_spec.clj @@ -19,13 +19,13 @@ :ret bytes?) (s/fdef rao/type-list - :args (s/cat :context (s/keys :req-un [::kv/snapshot :blaze.db/t]) + :args (s/cat :batch-db :blaze.db.impl/batch-db :tid :blaze.db/tid :start-id (s/? :blaze.db/id-byte-string)) :ret (cs/coll-of :blaze.db/resource-handle)) (s/fdef rao/system-list - :args (s/cat :context (s/keys :req-un [::kv/snapshot :blaze.db/t]) + :args (s/cat :batch-db :blaze.db.impl/batch-db :start (s/? (s/cat :start-tid :blaze.db/tid :start-id :blaze.db/id-byte-string))) :ret (cs/coll-of :blaze.db/resource-handle)) @@ -57,3 +57,12 @@ :id-extractor (s/? fn?) :matcher (s/? fn?)) :ret fn?) + +(s/fdef rao/prune + :args (s/cat :snapshot ::kv/snapshot + :n pos-int? + :t :blaze.db/t + :start (s/? (s/cat :start-tid :blaze.db/tid + :start-id :blaze.db/id-byte-string + :start-t :blaze.db/t))) + :ret (s/coll-of ::kv/delete-entry :kind vector?)) diff --git a/modules/db/test/blaze/db/impl/index/resource_as_of_test.clj b/modules/db/test/blaze/db/impl/index/resource_as_of_test.clj new file mode 100644 index 000000000..950aa6563 --- /dev/null +++ b/modules/db/test/blaze/db/impl/index/resource_as_of_test.clj @@ -0,0 +1,104 @@ +(ns blaze.db.impl.index.resource-as-of-test + (:require + [blaze.db.impl.codec :as codec] + [blaze.db.impl.index.resource-as-of :as rao] + [blaze.db.impl.index.resource-as-of-spec] + [blaze.db.impl.index.resource-as-of-test-util :as rao-tu] + [blaze.db.kv :as kv] + [blaze.db.test-util :refer [config with-system-data]] + [blaze.module.test-util :refer [with-system]] + [blaze.test-util :as tu] + [clojure.spec.test.alpha :as st] + [clojure.test :as test :refer [deftest testing]] + [juxt.iota :refer [given]])) + +(st/instrument) + +(test/use-fixtures :each tu/fixture) + +(deftest prune-test + (testing "empty database" + (with-system [{:blaze.db/keys [node]} config] + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 0) + :delete-entries := [] + :num-entries-processed := 0 + :next := nil)))) + + (testing "one non-purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]]] + + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 1) + :delete-entries := [] + :num-entries-processed := 1 + :next := nil)))) + + (testing "one purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]] + [[:patient-purge "0"]]] + + (testing "returns no delete entry at t=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 1) + [:delete-entries count] := 0 + :num-entries-processed := 1 + :next := nil))) + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "0" :t 1} + :num-entries-processed := 1 + :next := nil))))) + + (testing "two purged patients" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "0" :t 1} + :num-entries-processed := 2 + :next := nil))) + + (testing "returns one delete entry at t=3 and n=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 1 3) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "0" :t 1} + :num-entries-processed := 1 + [:next :tid] := (codec/tid "Patient") + [:next :id] := (codec/id-byte-string "1") + [:next :t] := 1)) + + (testing "it's possible to continue with the next entry" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 2 3 (codec/tid "Patient") (codec/id-byte-string "1") 1) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "1" :t 1} + :num-entries-processed := 1 + :next := nil)))) + + (testing "returns two delete entries at t=3" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 3) + [:delete-entries count] := 2 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "0" :t 1} + [:delete-entries 1 0] := :resource-as-of-index + [:delete-entries 1 1 rao-tu/decode-key] := {:type "Patient" :id "1" :t 1} + :num-entries-processed := 2 + :next := nil)))))) diff --git a/modules/db/test/blaze/db/impl/index/system_as_of_spec.clj b/modules/db/test/blaze/db/impl/index/system_as_of_spec.clj index 7e63b795c..08778fc0c 100644 --- a/modules/db/test/blaze/db/impl/index/system_as_of_spec.clj +++ b/modules/db/test/blaze/db/impl/index/system_as_of_spec.clj @@ -7,6 +7,7 @@ [blaze.db.impl.index.resource-handle-spec] [blaze.db.impl.index.system-as-of :as sao] [blaze.db.impl.iterators-spec] + [blaze.db.kv :as-alias kv] [blaze.db.kv-spec] [blaze.db.kv.spec] [blaze.db.spec] @@ -20,3 +21,12 @@ :start-tid (s/nilable :blaze.db/tid) :start-id (s/nilable :blaze.db/id-byte-string)) :ret (cs/coll-of :blaze.db/resource-handle)) + +(s/fdef sao/prune + :args (s/cat :snapshot ::kv/snapshot + :n pos-int? + :t :blaze.db/t + :start (s/? (s/cat :start-t :blaze.db/t + :start-tid :blaze.db/tid + :start-id :blaze.db/id-byte-string))) + :ret (s/coll-of ::kv/delete-entry :kind vector?)) diff --git a/modules/db/test/blaze/db/impl/index/system_as_of_test.clj b/modules/db/test/blaze/db/impl/index/system_as_of_test.clj new file mode 100644 index 000000000..3bfc5f970 --- /dev/null +++ b/modules/db/test/blaze/db/impl/index/system_as_of_test.clj @@ -0,0 +1,104 @@ +(ns blaze.db.impl.index.system-as-of-test + (:require + [blaze.db.impl.codec :as codec] + [blaze.db.impl.index.system-as-of :as sao] + [blaze.db.impl.index.system-as-of-spec] + [blaze.db.impl.index.system-as-of-test-util :as sao-tu] + [blaze.db.kv :as kv] + [blaze.db.test-util :refer [config with-system-data]] + [blaze.module.test-util :refer [with-system]] + [blaze.test-util :as tu] + [clojure.spec.test.alpha :as st] + [clojure.test :as test :refer [deftest testing]] + [juxt.iota :refer [given]])) + +(st/instrument) + +(test/use-fixtures :each tu/fixture) + +(deftest prune-test + (testing "empty database" + (with-system [{:blaze.db/keys [node]} config] + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 0) + :delete-entries := [] + :num-entries-processed := 0 + :next := nil)))) + + (testing "one non-purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]]] + + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 1) + :delete-entries := [] + :num-entries-processed := 1 + :next := nil)))) + + (testing "one purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]] + [[:patient-purge "0"]]] + + (testing "returns no delete entry at t=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 1) + [:delete-entries count] := 0 + :num-entries-processed := 1 + :next := nil))) + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "0"} + :num-entries-processed := 1 + :next := nil))))) + + (testing "two purged patients" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "0"} + :num-entries-processed := 2 + :next := nil))) + + (testing "returns one delete entry at t=3 and n=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 1 3) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "0"} + :num-entries-processed := 1 + [:next :tid] := (codec/tid "Patient") + [:next :t] := 1 + [:next :id] := (codec/id-byte-string "1"))) + + (testing "it's possible to continue with the next entry" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 1 3 1 (codec/tid "Patient") (codec/id-byte-string "1")) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "1"} + :num-entries-processed := 1 + :next := nil)))) + + (testing "returns two delete entries at t=3" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 3) + [:delete-entries count] := 2 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "0"} + [:delete-entries 1 0] := :system-as-of-index + [:delete-entries 1 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "1"} + :num-entries-processed := 2 + :next := nil)))))) diff --git a/modules/db/test/blaze/db/impl/index/type_as_of_spec.clj b/modules/db/test/blaze/db/impl/index/type_as_of_spec.clj index 6ac4dc358..ec0c877bd 100644 --- a/modules/db/test/blaze/db/impl/index/type_as_of_spec.clj +++ b/modules/db/test/blaze/db/impl/index/type_as_of_spec.clj @@ -7,6 +7,7 @@ [blaze.db.impl.index.resource-handle-spec] [blaze.db.impl.index.type-as-of :as tao] [blaze.db.impl.iterators-spec] + [blaze.db.kv :as-alias kv] [blaze.db.kv-spec] [blaze.db.kv.spec] [blaze.fhir.spec] @@ -19,3 +20,12 @@ :start-t :blaze.db/t :start-id (s/nilable :blaze.db/id-byte-string)) :ret (cs/coll-of :blaze.db/resource-handle)) + +(s/fdef tao/prune + :args (s/cat :snapshot ::kv/snapshot + :n pos-int? + :t :blaze.db/t + :start (s/? (s/cat :start-tid :blaze.db/tid + :start-t :blaze.db/t + :start-id :blaze.db/id-byte-string))) + :ret (s/coll-of ::kv/delete-entry :kind vector?)) diff --git a/modules/db/test/blaze/db/impl/index/type_as_of_test.clj b/modules/db/test/blaze/db/impl/index/type_as_of_test.clj new file mode 100644 index 000000000..fa8ac5635 --- /dev/null +++ b/modules/db/test/blaze/db/impl/index/type_as_of_test.clj @@ -0,0 +1,104 @@ +(ns blaze.db.impl.index.type-as-of-test + (:require + [blaze.db.impl.codec :as codec] + [blaze.db.impl.index.type-as-of :as tao] + [blaze.db.impl.index.type-as-of-spec] + [blaze.db.impl.index.type-as-of-test-util :as tao-tu] + [blaze.db.kv :as kv] + [blaze.db.test-util :refer [config with-system-data]] + [blaze.module.test-util :refer [with-system]] + [blaze.test-util :as tu] + [clojure.spec.test.alpha :as st] + [clojure.test :as test :refer [deftest testing]] + [juxt.iota :refer [given]])) + +(st/instrument) + +(test/use-fixtures :each tu/fixture) + +(deftest prune-test + (testing "empty database" + (with-system [{:blaze.db/keys [node]} config] + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 0) + :delete-entries := [] + :num-entries-processed := 0 + :next := nil)))) + + (testing "one non-purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]]] + + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 1) + :delete-entries := [] + :num-entries-processed := 1 + :next := nil)))) + + (testing "one purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]] + [[:patient-purge "0"]]] + + (testing "returns no delete entry at t=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 1) + [:delete-entries count] := 0 + :num-entries-processed := 1 + :next := nil))) + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "0"} + :num-entries-processed := 1 + :next := nil))))) + + (testing "two purged patients" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "0"} + :num-entries-processed := 2 + :next := nil))) + + (testing "returns one delete entry at t=3 and n=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 1 3) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "0"} + :num-entries-processed := 1 + [:next :tid] := (codec/tid "Patient") + [:next :t] := 1 + [:next :id] := (codec/id-byte-string "1"))) + + (testing "it's possible to continue with the next entry" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 1 3 (codec/tid "Patient") 1 (codec/id-byte-string "1")) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "1"} + :num-entries-processed := 1 + :next := nil)))) + + (testing "returns two delete entries at t=3" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 3) + [:delete-entries count] := 2 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "0"} + [:delete-entries 1 0] := :type-as-of-index + [:delete-entries 1 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "1"} + :num-entries-processed := 2 + :next := nil)))))) diff --git a/modules/job-prune/.clj-kondo/config.edn b/modules/job-prune/.clj-kondo/config.edn new file mode 100644 index 000000000..10c705f73 --- /dev/null +++ b/modules/job-prune/.clj-kondo/config.edn @@ -0,0 +1,7 @@ +{:config-paths + ["../../../.clj-kondo/root" + "../../anomaly/resources/clj-kondo.exports/blaze/anomaly" + "../../module-test-util/resources/clj-kondo.exports/blaze/module-test-util"] + + :lint-as + {blaze.job.prune-test/with-system-data clojure.core/with-open}} diff --git a/modules/job-prune/.gitignore b/modules/job-prune/.gitignore new file mode 100644 index 000000000..8b4bda0a4 --- /dev/null +++ b/modules/job-prune/.gitignore @@ -0,0 +1 @@ +resources diff --git a/modules/job-prune/Makefile b/modules/job-prune/Makefile index 56c2f4554..2f5a9d1e2 100644 --- a/modules/job-prune/Makefile +++ b/modules/job-prune/Makefile @@ -1 +1,28 @@ +fmt: + cljfmt check src test build.clj deps.edn tests.edn + +lint: + clj-kondo --lint src test build.clj deps.edn + +build: + clojure -T:build copy-profiles + +prep: build + clojure -X:deps prep + +test: prep + clojure -M:test:kaocha --profile :ci + +test-coverage: prep + clojure -M:test:coverage + +cloc-prod: + cloc src + +cloc-test: + cloc test + +clean: + rm -rf .clj-kondo/.cache .cpcache target resources + .PHONY: fmt lint build prep test test-coverage cloc-prod cloc-test clean diff --git a/modules/job-prune/build.clj b/modules/job-prune/build.clj new file mode 100644 index 000000000..beeaa0ebb --- /dev/null +++ b/modules/job-prune/build.clj @@ -0,0 +1,12 @@ +(ns build + (:require [clojure.tools.build.api :as b])) + +(defn copy-profiles [_] + (doseq [file ["StructureDefinition-PruneJob" + "CodeSystem-PruneJobParameter" + "CodeSystem-PruneJobOutput" + "CodeSystem-PruneIndices" + "ValueSet-PruneIndices"]] + (b/copy-file + {:src (str "../../job-ig/fsh-generated/resources/" file ".json") + :target (str "resources/blaze/job/prune/" file ".json")}))) diff --git a/modules/job-prune/deps.edn b/modules/job-prune/deps.edn new file mode 100644 index 000000000..54e1b5093 --- /dev/null +++ b/modules/job-prune/deps.edn @@ -0,0 +1,44 @@ +{:paths ["src" "resources"] + + :deps + {blaze/job-scheduler + {:local/root "../job-scheduler"} + + blaze/module-base + {:local/root "../module-base"}} + + :deps/prep-lib + {:alias :build + :fn copy-profiles + :ensure "resources/blaze/job/prune"} + + :aliases + {:build + {:deps + {io.github.clojure/tools.build + {:git/tag "v0.10.5" :git/sha "2a21b7a"}} + :ns-default build} + + :test + {:extra-paths ["test"] + + :extra-deps + {blaze/db-stub + {:local/root "../db-stub"} + + blaze/job-test-util + {:local/root "../job-test-util"}}} + + :kaocha + {:extra-deps + {lambdaisland/kaocha + {:mvn/version "1.91.1392"}} + + :main-opts ["-m" "kaocha.runner"]} + + :coverage + {:extra-deps + {lambdaisland/kaocha-cloverage + {:mvn/version "1.1.89"}} + + :main-opts ["-m" "kaocha.runner" "--profile" "coverage"]}}} diff --git a/modules/job-prune/src/blaze/job/prune.clj b/modules/job-prune/src/blaze/job/prune.clj new file mode 100644 index 000000000..03d508599 --- /dev/null +++ b/modules/job-prune/src/blaze/job/prune.clj @@ -0,0 +1,212 @@ +(ns blaze.job.prune + "The prune job calls d/prune in several steps ensuring that the progress can + be tracked accordingly. Prune jobs can be paused and resumed." + (:require + [blaze.anomaly :as ba] + [blaze.async.comp :as ac] + [blaze.db.api :as d] + [blaze.fhir.spec.type :as type] + [blaze.job-scheduler.protocols :as p] + [blaze.job.prune.spec] + [blaze.job.util :as job-util] + [blaze.module :as m] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(set! *warn-on-reflection* true) + +(def ^:private ^:const ^long default-index-entries-per-step 100000) + +(def ^:private parameter-system + "https://samply.github.io/blaze/fhir/CodeSystem/PruneJobParameter") + +(def ^:private output-system + "https://samply.github.io/blaze/fhir/CodeSystem/PruneJobOutput") + +(def ^:private task-output + (partial job-util/task-output output-system)) + +(def ^:private initial-duration + #fhir/Quantity + {:value #fhir/decimal 0 + :unit #fhir/string"s" + :system #fhir/uri"http://unitsofmeasure.org" + :code #fhir/code"s"}) + +(defn- start-job [job total-index-entries] + (assoc + job + :status #fhir/code"in-progress" + :statusReason job-util/started-status-reason + :output + [(task-output "total-index-entries" (type/unsignedInt total-index-entries)) + (task-output "index-entries-processed" #fhir/unsignedInt 0) + (task-output "index-entries-deleted" #fhir/unsignedInt 0) + (task-output "processing-duration" initial-duration)])) + +(defn- add-output [job code value] + (job-util/add-output job output-system code value)) + +(defn- remove-output [job code] + (job-util/remove-output job output-system code)) + +(defn- set-output [job code value] + (if value + (add-output job code value) + (remove-output job code))) + +(defn- increment-unsigned-int [value x] + (type/unsignedInt (+ (type/value value) x))) + +(defn- increment-index-entries-processed [job num-index-entries] + (job-util/update-output-value job output-system "index-entries-processed" + increment-unsigned-int num-index-entries)) + +(defn- increment-index-entries-deleted [job num-index-entries] + (job-util/update-output-value job output-system "index-entries-deleted" + increment-unsigned-int num-index-entries)) + +(defn- increment-quantity-value [quantity x] + (update quantity :value #(type/decimal (+ (type/value %) x)))) + +(defn- increment-duration [job duration] + (job-util/update-output-value job output-system "processing-duration" + increment-quantity-value duration)) + +(defn- set-next [job {:keys [index type id t]}] + (-> (set-output job "next-index" (some-> index name type/code)) + (set-output "next-type" (some-> type type/code)) + (set-output "next-id" (some-> id type/id)) + (set-output "next-t" (some-> t type/positiveInt)))) + +(defn- increment-on-hold-job + [job + {:keys [num-index-entries-processed num-index-entries-deleted duration next]}] + (-> (increment-index-entries-processed job num-index-entries-processed) + (increment-index-entries-deleted num-index-entries-deleted) + (increment-duration duration) + (set-next next))) + +(defn- increment-job [job result] + (-> (assoc job :statusReason job-util/incremented-status-reason) + (increment-on-hold-job result))) + +(defn- complete-job [job result] + (-> (increment-job job result) + (assoc :status #fhir/code"completed") + (dissoc :statusReason))) + +(defn- t [job] + (some-> (job-util/input-value job parameter-system "t") type/value)) + +(defn- next-index [job] + (some-> (job-util/output-value job output-system "next-index") type/value keyword)) + +(defn- next-type [job] + (some-> (job-util/output-value job output-system "next-type") type/value)) + +(defn- next-id [job] + (some-> (job-util/output-value job output-system "next-id") type/value)) + +(defn- next-t [job] + (some-> (job-util/output-value job output-system "next-t") type/value)) + +(defn- next-handle [job] + (when-let [index (next-index job)] + (let [type (next-type job)] + (cond-> {:index index} + type + (assoc :type type :id (next-id job) :t (next-t job)))))) + +(defn- on-hold? [job] + (= #fhir/code"on-hold" (:status job))) + +(defn- update-job [{:keys [admin-node]} job {:keys [next] :as result}] + (if next + (-> (job-util/update-job admin-node job increment-job result) + (ac/exceptionally-compose + (fn [e] + (if (job-util/job-update-failed? e) + (-> (job-util/pull-job admin-node (:id job)) + (ac/then-compose + (fn [job] + (if (on-hold? job) + (job-util/update-job admin-node job increment-on-hold-job result) + (ac/completed-future job))))) + (ac/completed-future e))))) + (job-util/update-job admin-node job complete-job result))) + +(defn- assoc-duration [start result] + (assoc result :duration (BigDecimal/valueOf (- (System/nanoTime) start) 9))) + +(defn- time-future [future] + (ac/then-apply future (partial assoc-duration (System/nanoTime)))) + +(defn- prune-fn [main-node n t] + (fn + ([] + (-> (ac/supply-async #(d/prune main-node n t)) + (time-future))) + ([next] + (-> (ac/supply-async #(d/prune main-node n t next)) + (time-future))))) + +(defn- continuation + "Returns a function that takes a prune result (or nil) and an anomaly + (or nil), updates the job and continues processing if there is more work to + do." + [{:keys [admin-node] :as context} prune job] + (fn [{:keys [next] :as result} anomaly] + (if anomaly + (job-util/update-job admin-node job job-util/fail-job anomaly) + (cond-> (update-job context job result) + next + (ac/then-compose + (fn [job] + (if (on-hold? job) + (ac/completed-future job) + (-> (prune next) + (ac/handle (continuation context prune job)) + (ac/then-compose identity))))))))) + +(def ^:private missing-t-anom + (ba/incorrect "Missing T.")) + +(defn- on-start + [{:keys [main-node admin-node index-entries-per-step] + :or {index-entries-per-step default-index-entries-per-step} + :as context} job] + (if-let [t (t job)] + (let [total (d/prune-total main-node) + prune (prune-fn main-node index-entries-per-step t)] + (-> (job-util/update-job admin-node job start-job total) + (ac/then-compose + (fn [job] + (-> (prune) + (ac/handle (continuation context prune job)) + (ac/then-compose identity)))))) + (job-util/update-job admin-node job job-util/fail-job missing-t-anom))) + +(defn- on-resume + [{:keys [main-node index-entries-per-step] + :or {index-entries-per-step default-index-entries-per-step} + :as context} job] + (let [prune (prune-fn main-node index-entries-per-step (t job)) + next (next-handle job)] + (-> (if next (prune next) (prune)) + (ac/handle (continuation context prune job)) + (ac/then-compose identity)))) + +(defmethod m/pre-init-spec :blaze.job/prune [_] + (s/keys :req-un [::main-node ::admin-node :blaze/clock] + :opt-un [::index-entries-per-step])) + +(defmethod ig/init-key :blaze.job/prune + [_ config] + (reify p/JobHandler + (-on-start [_ job] + (on-start config job)) + (-on-resume [_ job] + (on-resume config job)))) + +(derive :blaze.job/prune :blaze.job/handler) diff --git a/modules/job-prune/src/blaze/job/prune/spec.clj b/modules/job-prune/src/blaze/job/prune/spec.clj new file mode 100644 index 000000000..8831171cf --- /dev/null +++ b/modules/job-prune/src/blaze/job/prune/spec.clj @@ -0,0 +1,14 @@ +(ns blaze.job.prune.spec + (:require + [blaze.db.spec] + [blaze.job.prune :as-alias prune] + [clojure.spec.alpha :as s])) + +(s/def ::prune/main-node + :blaze.db/node) + +(s/def ::prune/admin-node + :blaze.db/node) + +(s/def ::prune/index-entries-per-step + pos-int?) diff --git a/modules/job-prune/test/blaze/job/prune_test.clj b/modules/job-prune/test/blaze/job/prune_test.clj new file mode 100644 index 000000000..83c2e4920 --- /dev/null +++ b/modules/job-prune/test/blaze/job/prune_test.clj @@ -0,0 +1,755 @@ +(ns blaze.job.prune-test + (:require + [blaze.db.api :as d] + [blaze.db.api-spec] + [blaze.db.kv :as kv] + [blaze.db.kv.mem] + [blaze.db.node :as node :refer [node?]] + [blaze.db.node.protocols :as np] + [blaze.db.resource-store :as rs] + [blaze.db.resource-store.kv :as rs-kv] + [blaze.db.search-param-registry] + [blaze.db.tx-cache] + [blaze.db.tx-log :as tx-log] + [blaze.db.tx-log.local] + [blaze.fhir.test-util :refer [structure-definition-repo]] + [blaze.job-scheduler :as js] + [blaze.job.prune] + [blaze.job.test-util :as jtu] + [blaze.job.util :as job-util] + [blaze.log] + [blaze.luid :as luid] + [blaze.module.test-util :refer [with-system]] + [blaze.test-util :as tu :refer [given-thrown]] + [clojure.spec.alpha :as s] + [clojure.spec.test.alpha :as st] + [clojure.test :as test :refer [deftest testing]] + [integrant.core :as ig] + [java-time.api :as time] + [juxt.iota :refer [given]])) + +(set! *warn-on-reflection* true) +(st/instrument) + +(test/use-fixtures :each tu/fixture) + +(deftest init-test + (testing "nil config" + (given-thrown (ig/init {:blaze.job/prune nil}) + :key := :blaze.job/prune + :reason := ::ig/build-failed-spec + [:cause-data ::s/problems 0 :pred] := `map?)) + + (testing "missing config" + (given-thrown (ig/init {:blaze.job/prune {}}) + :key := :blaze.job/prune + :reason := ::ig/build-failed-spec + [:cause-data ::s/problems 0 :pred] := `(fn ~'[%] (contains? ~'% :main-node)) + [:cause-data ::s/problems 1 :pred] := `(fn ~'[%] (contains? ~'% :admin-node)) + [:cause-data ::s/problems 2 :pred] := `(fn ~'[%] (contains? ~'% :clock)))) + + (testing "invalid main-node" + (given-thrown (ig/init {:blaze.job/prune {:main-node ::invalid}}) + :key := :blaze.job/prune + :reason := ::ig/build-failed-spec + [:cause-data ::s/problems 0 :pred] := `(fn ~'[%] (contains? ~'% :admin-node)) + [:cause-data ::s/problems 1 :pred] := `(fn ~'[%] (contains? ~'% :clock)) + [:cause-data ::s/problems 2 :pred] := `node? + [:cause-data ::s/problems 2 :val] := ::invalid))) + +(derive :blaze.db.main/node :blaze.db/node) +(derive :blaze.db.admin/node :blaze.db/node) + +(def config + {:blaze/job-scheduler + {:node (ig/ref :blaze.db.admin/node) + :handlers {:blaze.job/prune (ig/ref :blaze.job/prune)} + :clock (ig/ref :blaze.test/offset-clock) + :rng-fn (ig/ref :blaze.test/fixed-rng-fn)} + + :blaze.job/prune + {:main-node (ig/ref :blaze.db.main/node) + :admin-node (ig/ref :blaze.db.admin/node) + :clock (ig/ref :blaze.test/offset-clock) + :index-entries-per-step 10} + + :blaze.db.main/node + {:tx-log (ig/ref :blaze.db.main/tx-log) + :tx-cache (ig/ref :blaze.db.main/tx-cache) + :indexer-executor (ig/ref :blaze.db.node.main/indexer-executor) + :resource-store (ig/ref :blaze.db/resource-store) + :kv-store (ig/ref :blaze.db.main/index-kv-store) + :resource-indexer (ig/ref :blaze.db.node.main/resource-indexer) + :search-param-registry (ig/ref :blaze.db/search-param-registry) + :scheduler (ig/ref :blaze/scheduler) + :poll-timeout (time/millis 10)} + + :blaze.db.admin/node + {:tx-log (ig/ref :blaze.db.admin/tx-log) + :tx-cache (ig/ref :blaze.db.admin/tx-cache) + :indexer-executor (ig/ref :blaze.db.node.admin/indexer-executor) + :resource-store (ig/ref :blaze.db/resource-store) + :kv-store (ig/ref :blaze.db.admin/index-kv-store) + :resource-indexer (ig/ref :blaze.db.node.admin/resource-indexer) + :search-param-registry (ig/ref :blaze.db/search-param-registry) + :scheduler (ig/ref :blaze/scheduler) + :poll-timeout (time/millis 10)} + + [::tx-log/local :blaze.db.main/tx-log] + {:kv-store (ig/ref :blaze.db.main/transaction-kv-store) + :clock (ig/ref :blaze.test/fixed-clock)} + + [::tx-log/local :blaze.db.admin/tx-log] + {:kv-store (ig/ref :blaze.db.admin/transaction-kv-store) + :clock (ig/ref :blaze.test/fixed-clock)} + + [::kv/mem :blaze.db.main/transaction-kv-store] + {:column-families {}} + + [::kv/mem :blaze.db.admin/transaction-kv-store] + {:column-families {}} + + [:blaze.db/tx-cache :blaze.db.main/tx-cache] + {:kv-store (ig/ref :blaze.db.main/index-kv-store)} + + [:blaze.db/tx-cache :blaze.db.admin/tx-cache] + {:kv-store (ig/ref :blaze.db.admin/index-kv-store)} + + [::node/indexer-executor :blaze.db.node.main/indexer-executor] + {} + + [::node/indexer-executor :blaze.db.node.admin/indexer-executor] + {} + + [::kv/mem :blaze.db.main/index-kv-store] + {:column-families + {:search-param-value-index nil + :resource-value-index nil + :compartment-search-param-value-index nil + :compartment-resource-type-index nil + :active-search-params nil + :tx-success-index {:reverse-comparator? true} + :tx-error-index nil + :t-by-instant-index {:reverse-comparator? true} + :resource-as-of-index nil + :type-as-of-index nil + :system-as-of-index nil + :type-stats-index nil + :system-stats-index nil}} + + [::kv/mem :blaze.db.admin/index-kv-store] + {:column-families + {:search-param-value-index nil + :resource-value-index nil + :compartment-search-param-value-index nil + :compartment-resource-type-index nil + :active-search-params nil + :tx-success-index {:reverse-comparator? true} + :tx-error-index nil + :t-by-instant-index {:reverse-comparator? true} + :resource-as-of-index nil + :type-as-of-index nil + :system-as-of-index nil + :type-stats-index nil + :system-stats-index nil}} + + [::node/resource-indexer :blaze.db.node.main/resource-indexer] + {:kv-store (ig/ref :blaze.db.main/index-kv-store) + :resource-store (ig/ref :blaze.db/resource-store) + :search-param-registry (ig/ref :blaze.db/search-param-registry) + :executor (ig/ref :blaze.db.node.resource-indexer.main/executor)} + + [::node/resource-indexer :blaze.db.node.admin/resource-indexer] + {:kv-store (ig/ref :blaze.db.admin/index-kv-store) + :resource-store (ig/ref :blaze.db/resource-store) + :search-param-registry (ig/ref :blaze.db/search-param-registry) + :executor (ig/ref :blaze.db.node.resource-indexer.admin/executor)} + + [:blaze.db.node.resource-indexer/executor :blaze.db.node.resource-indexer.main/executor] + {} + + [:blaze.db.node.resource-indexer/executor :blaze.db.node.resource-indexer.admin/executor] + {} + + ::rs/kv + {:kv-store (ig/ref :blaze.db/resource-kv-store) + :executor (ig/ref ::rs-kv/executor)} + + [::kv/mem :blaze.db/resource-kv-store] + {:column-families {}} + + ::rs-kv/executor {} + + :blaze.db/search-param-registry + {:structure-definition-repo structure-definition-repo} + + :blaze/scheduler {} + + :blaze.test/fixed-clock {} + + :blaze.test/offset-clock + {:clock (ig/ref :blaze.test/fixed-clock) + :offset-seconds 11} + + :blaze.test/fixed-rng-fn {}}) + +(defmacro with-system-data + [[binding-form config] txs & body] + `(with-system [system# ~config] + (run! #(deref (d/transact (:blaze.db.main/node system#) %)) ~txs) + (let [~binding-form system#] ~@body))) + +(def job + {:fhir/type :fhir/Task + :meta #fhir/Meta{:profile [#fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/PruneJob"]} + :status #fhir/code"ready" + :intent #fhir/code"order" + :code #fhir/CodeableConcept + {:coding + [#fhir/Coding + {:system #fhir/uri"https://samply.github.io/blaze/fhir/CodeSystem/JobType" + :code #fhir/code"prune" + :display "Prune the Database"}]}}) + +(def job-missing-t + job) + +(def job-42 + (assoc + job + :input + [{:fhir/type :fhir.Task/input + :type #fhir/CodeableConcept + {:coding + [#fhir/Coding + {:system #fhir/uri"https://samply.github.io/blaze/fhir/CodeSystem/PruneJobParameter" + :code #fhir/code"t"}]} + :value #fhir/positiveInt 42}])) + +(defn- output-value [job code] + (job-util/output-value job "https://samply.github.io/blaze/fhir/CodeSystem/PruneJobOutput" code)) + +(defn- total-index-entries [job] + (output-value job "total-index-entries")) + +(defn- index-entries-processed [job] + (output-value job "index-entries-processed")) + +(defn- index-entries-deleted [job] + (output-value job "index-entries-deleted")) + +(defn- processing-duration [job] + (output-value job "processing-duration")) + +(defn- next-index [job] + (output-value job "next-index")) + +(defn- next-type [job] + (output-value job "next-type")) + +(defn- next-id [job] + (output-value job "next-id")) + +(defn- next-t [job] + (output-value job "next-t")) + +(defn- job-id [{{:keys [clock rng-fn]} :context}] + (luid/luid clock (rng-fn))) + +(defn gen-create-patient-tx-data [n] + (mapv + (fn [id] + [:create {:fhir/type :fhir/Patient :id (format "%05d" id)}]) + (range n))) + +(defn gen-patient-purge-tx-data [n] + (mapv + (fn [id] + [:patient-purge (format "%05d" id)]) + (range n))) + +(deftest simple-job-execution-test + (testing "success" + (testing "increment three times, once for each index" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 10) + (gen-patient-purge-tx-data 5)] + + @(js/create-job job-scheduler job-42) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 30 + index-entries-deleted := #fhir/unsignedInt 15 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s" + next-index := nil)) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 5 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :in-progress/incremented + [3 jtu/combined-status] := :in-progress/incremented + [4 jtu/combined-status] := :completed + + [0 total-index-entries] := nil + [1 total-index-entries] := #fhir/unsignedInt 30 + [2 total-index-entries] := #fhir/unsignedInt 30 + [3 total-index-entries] := #fhir/unsignedInt 30 + [4 total-index-entries] := #fhir/unsignedInt 30 + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 10 + [3 index-entries-processed] := #fhir/unsignedInt 20 + [4 index-entries-processed] := #fhir/unsignedInt 30 + + [0 index-entries-deleted] := nil + [1 index-entries-deleted] := #fhir/unsignedInt 0 + [2 index-entries-deleted] := #fhir/unsignedInt 5 + [3 index-entries-deleted] := #fhir/unsignedInt 10 + [4 index-entries-deleted] := #fhir/unsignedInt 15 + + [0 next-index] := nil + [1 next-index] := nil + [2 next-index] := #fhir/code"type-as-of-index" + [3 next-index] := #fhir/code"system-as-of-index" + [4 next-index] := nil + + [0 next-type] := nil + [1 next-type] := nil + [2 next-type] := nil + [3 next-type] := nil + [4 next-type] := nil)))) + + (testing "increment six times, twice for each index" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 20) + (gen-patient-purge-tx-data 10)] + + @(js/create-job job-scheduler job-42) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 60 + index-entries-deleted := #fhir/unsignedInt 30 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s" + next-index := nil)) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 8 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :in-progress/incremented + [3 jtu/combined-status] := :in-progress/incremented + [4 jtu/combined-status] := :in-progress/incremented + [5 jtu/combined-status] := :in-progress/incremented + [6 jtu/combined-status] := :in-progress/incremented + [7 jtu/combined-status] := :completed + + [0 total-index-entries] := nil + [1 total-index-entries] := #fhir/unsignedInt 60 + [2 total-index-entries] := #fhir/unsignedInt 60 + [3 total-index-entries] := #fhir/unsignedInt 60 + [4 total-index-entries] := #fhir/unsignedInt 60 + [5 total-index-entries] := #fhir/unsignedInt 60 + [6 total-index-entries] := #fhir/unsignedInt 60 + [7 total-index-entries] := #fhir/unsignedInt 60 + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 10 + [3 index-entries-processed] := #fhir/unsignedInt 20 + [4 index-entries-processed] := #fhir/unsignedInt 30 + [5 index-entries-processed] := #fhir/unsignedInt 40 + [6 index-entries-processed] := #fhir/unsignedInt 50 + [7 index-entries-processed] := #fhir/unsignedInt 60 + + [0 index-entries-deleted] := nil + [1 index-entries-deleted] := #fhir/unsignedInt 0 + [2 index-entries-deleted] := #fhir/unsignedInt 10 + [3 index-entries-deleted] := #fhir/unsignedInt 10 + [4 index-entries-deleted] := #fhir/unsignedInt 20 + [5 index-entries-deleted] := #fhir/unsignedInt 20 + [6 index-entries-deleted] := #fhir/unsignedInt 30 + [7 index-entries-deleted] := #fhir/unsignedInt 30 + + [0 next-index] := nil + [1 next-index] := nil + [2 next-index] := #fhir/code"resource-as-of-index" + [3 next-index] := #fhir/code"type-as-of-index" + [4 next-index] := #fhir/code"type-as-of-index" + [5 next-index] := #fhir/code"system-as-of-index" + [6 next-index] := #fhir/code"system-as-of-index" + [7 next-index] := nil + + [0 next-type] := nil + [1 next-type] := nil + [2 next-type] := #fhir/code"Patient" + [3 next-type] := nil + [4 next-type] := #fhir/code"Patient" + [5 next-type] := nil + [6 next-type] := #fhir/code"Patient" + [7 next-type] := nil + + [0 next-id] := nil + [1 next-id] := nil + [2 next-id] := #fhir/id"00010" + [3 next-id] := nil + [4 next-id] := #fhir/id"00010" + [5 next-id] := nil + [6 next-id] := #fhir/id"00010" + [7 next-id] := nil + + [0 next-t] := nil + [1 next-t] := nil + [2 next-t] := #fhir/positiveInt 1 + [3 next-t] := nil + [4 next-t] := #fhir/positiveInt 1 + [5 next-t] := nil + [6 next-t] := #fhir/positiveInt 1 + [7 next-t] := nil))))) + + (testing "missing t" + (with-system [{:blaze/keys [job-scheduler] :as system} config] + + @(js/create-job job-scheduler job-missing-t) + + (testing "the job has failed" + (given @(jtu/pull-job system :failed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :failed + job-util/error-msg := "Missing T."))))) + +(defn- delayed-prune + ([node n t] + (Thread/sleep 100) + (np/-prune node n t nil)) + ([node n t start] + (Thread/sleep 100) + (np/-prune node n t start))) + +(deftest job-execution-with-pause-test + (with-redefs [d/prune delayed-prune] + (testing "resume from started state" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 10) + (gen-patient-purge-tx-data 5)] + + @(js/create-job job-scheduler job-42) + + (given @(jtu/pull-job system :in-progress/started) + :fhir/type := :fhir/Task + job-util/job-number := "1" + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 0 + index-entries-deleted := #fhir/unsignedInt 0) + + (given @(js/pause-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :on-hold/paused) + + (Thread/sleep 200) + + (given @(js/resume-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :in-progress/resumed) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 30 + index-entries-deleted := #fhir/unsignedInt 15 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s")) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 7 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :on-hold/paused + [3 jtu/combined-status] := :on-hold/paused + [4 jtu/combined-status] := :in-progress/resumed + [5 jtu/combined-status] := :in-progress/incremented + [6 jtu/combined-status] := :completed + + [0 total-index-entries] := nil + [1 total-index-entries] := #fhir/unsignedInt 30 + [2 total-index-entries] := #fhir/unsignedInt 30 + [3 total-index-entries] := #fhir/unsignedInt 30 + [4 total-index-entries] := #fhir/unsignedInt 30 + [5 total-index-entries] := #fhir/unsignedInt 30 + [6 total-index-entries] := #fhir/unsignedInt 30 + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 0 + [3 index-entries-processed] := #fhir/unsignedInt 10 + [4 index-entries-processed] := #fhir/unsignedInt 10 + [5 index-entries-processed] := #fhir/unsignedInt 20 + [6 index-entries-processed] := #fhir/unsignedInt 30 + + [0 index-entries-deleted] := nil + [1 index-entries-deleted] := #fhir/unsignedInt 0 + [2 index-entries-deleted] := #fhir/unsignedInt 0 + [3 index-entries-deleted] := #fhir/unsignedInt 5 + [4 index-entries-deleted] := #fhir/unsignedInt 5 + [5 index-entries-deleted] := #fhir/unsignedInt 10 + [6 index-entries-deleted] := #fhir/unsignedInt 15))) + + (testing "increment six times, twice for each index" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 20) + (gen-patient-purge-tx-data 10)] + + @(js/create-job job-scheduler job-42) + + (given @(jtu/pull-job system :in-progress/started) + :fhir/type := :fhir/Task + job-util/job-number := "1" + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 0 + index-entries-deleted := #fhir/unsignedInt 0) + + (given @(js/pause-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :on-hold/paused + index-entries-processed := #fhir/unsignedInt 0 + index-entries-deleted := #fhir/unsignedInt 0) + + (Thread/sleep 200) + + (given @(js/resume-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :in-progress/resumed + index-entries-processed := #fhir/unsignedInt 10 + index-entries-deleted := #fhir/unsignedInt 10) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 60 + index-entries-deleted := #fhir/unsignedInt 30 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s")) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 10 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :on-hold/paused + [3 jtu/combined-status] := :on-hold/paused + [4 jtu/combined-status] := :in-progress/resumed + [5 jtu/combined-status] := :in-progress/incremented + [6 jtu/combined-status] := :in-progress/incremented + [7 jtu/combined-status] := :in-progress/incremented + [8 jtu/combined-status] := :in-progress/incremented + [9 jtu/combined-status] := :completed + + [0 total-index-entries] := nil + [1 total-index-entries] := #fhir/unsignedInt 60 + [2 total-index-entries] := #fhir/unsignedInt 60 + [3 total-index-entries] := #fhir/unsignedInt 60 + [4 total-index-entries] := #fhir/unsignedInt 60 + [5 total-index-entries] := #fhir/unsignedInt 60 + [6 total-index-entries] := #fhir/unsignedInt 60 + [7 total-index-entries] := #fhir/unsignedInt 60 + [8 total-index-entries] := #fhir/unsignedInt 60 + [9 total-index-entries] := #fhir/unsignedInt 60 + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 0 + [3 index-entries-processed] := #fhir/unsignedInt 10 + [4 index-entries-processed] := #fhir/unsignedInt 10 + [5 index-entries-processed] := #fhir/unsignedInt 20 + [6 index-entries-processed] := #fhir/unsignedInt 30 + [7 index-entries-processed] := #fhir/unsignedInt 40 + [8 index-entries-processed] := #fhir/unsignedInt 50 + [9 index-entries-processed] := #fhir/unsignedInt 60 + + [0 index-entries-deleted] := nil + [1 index-entries-deleted] := #fhir/unsignedInt 0 + [2 index-entries-deleted] := #fhir/unsignedInt 0 + [3 index-entries-deleted] := #fhir/unsignedInt 10 + [4 index-entries-deleted] := #fhir/unsignedInt 10 + [5 index-entries-deleted] := #fhir/unsignedInt 10 + [6 index-entries-deleted] := #fhir/unsignedInt 20 + [7 index-entries-deleted] := #fhir/unsignedInt 20 + [8 index-entries-deleted] := #fhir/unsignedInt 30 + [9 index-entries-deleted] := #fhir/unsignedInt 30))))) + + (testing "resume from incremented state" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 10) + (gen-patient-purge-tx-data 5)] + + @(js/create-job job-scheduler job-42) + + (given @(jtu/pull-job system :in-progress/incremented) + :fhir/type := :fhir/Task + job-util/job-number := "1" + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 10 + index-entries-deleted := #fhir/unsignedInt 5) + + (given @(js/pause-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :on-hold/paused) + + (Thread/sleep 200) + + (given @(js/resume-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :in-progress/resumed) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 30 + index-entries-deleted := #fhir/unsignedInt 15 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s")) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 7 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :in-progress/incremented + [3 jtu/combined-status] := :on-hold/paused + [4 jtu/combined-status] := :on-hold/paused + [5 jtu/combined-status] := :in-progress/resumed + [6 jtu/combined-status] := :completed))) + + (testing "increment six times, twice for each index" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 20) + (gen-patient-purge-tx-data 10)] + + @(js/create-job job-scheduler job-42) + + (given @(jtu/pull-job system :in-progress/incremented) + :fhir/type := :fhir/Task + job-util/job-number := "1" + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 10 + index-entries-deleted := #fhir/unsignedInt 10) + + (given @(js/pause-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :on-hold/paused) + + (Thread/sleep 200) + + (given @(js/resume-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :in-progress/resumed) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 60 + index-entries-deleted := #fhir/unsignedInt 30 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s")) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 10 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :in-progress/incremented + [3 jtu/combined-status] := :on-hold/paused + [4 jtu/combined-status] := :on-hold/paused + [5 jtu/combined-status] := :in-progress/resumed + [6 jtu/combined-status] := :in-progress/incremented + [7 jtu/combined-status] := :in-progress/incremented + [8 jtu/combined-status] := :in-progress/incremented + [9 jtu/combined-status] := :completed + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 10 + [3 index-entries-processed] := #fhir/unsignedInt 10 + [4 index-entries-processed] := #fhir/unsignedInt 20 + [5 index-entries-processed] := #fhir/unsignedInt 20 + [6 index-entries-processed] := #fhir/unsignedInt 30 + [7 index-entries-processed] := #fhir/unsignedInt 40 + [8 index-entries-processed] := #fhir/unsignedInt 50 + [9 index-entries-processed] := #fhir/unsignedInt 60 + + [0 next-index] := nil + [1 next-index] := nil + [2 next-index] := #fhir/code"resource-as-of-index" + [3 next-index] := #fhir/code"resource-as-of-index" + [4 next-index] := #fhir/code"type-as-of-index" + [5 next-index] := #fhir/code"type-as-of-index" + [6 next-index] := #fhir/code"type-as-of-index" + [7 next-index] := #fhir/code"system-as-of-index" + [8 next-index] := #fhir/code"system-as-of-index" + [9 next-index] := nil + + [0 next-type] := nil + [1 next-type] := nil + [2 next-type] := #fhir/code"Patient" + [3 next-type] := #fhir/code"Patient" + [4 next-type] := nil + [5 next-type] := nil + [6 next-type] := #fhir/code"Patient" + [7 next-type] := nil + [8 next-type] := #fhir/code"Patient" + [9 next-type] := nil))))))) diff --git a/modules/job-prune/tests.edn b/modules/job-prune/tests.edn new file mode 100644 index 000000000..12727a10d --- /dev/null +++ b/modules/job-prune/tests.edn @@ -0,0 +1,10 @@ +#kaocha/v1 + #merge + [{} + #profile {:ci {:reporter kaocha.report/documentation + :color? false} + :coverage {:plugins [:kaocha.plugin/cloverage] + :cloverage/opts + {:codecov? true} + :reporter kaocha.report/documentation + :color? false}}] diff --git a/resources/blaze.edn b/resources/blaze.edn index 0cc5b8e76..0212d7217 100644 --- a/resources/blaze.edn +++ b/resources/blaze.edn @@ -436,6 +436,11 @@ :admin-node #blaze/ref :blaze.db.admin/node :clock #blaze/ref :blaze/clock} + :blaze.job/prune + {:main-node #blaze/ref :blaze.db.main/node + :admin-node #blaze/ref :blaze.db.admin/node + :clock #blaze/ref :blaze/clock} + :blaze.job/re-index {:main-node #blaze/ref :blaze.db.main/node :admin-node #blaze/ref :blaze.db.admin/node