-
Notifications
You must be signed in to change notification settings - Fork 101
CMR-11070: Fix bug when finalizing rebalancing back to small collections, fix in-memory db to allow rebalancing #2384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
aa6f36b
d36dd87
2120228
fc001f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,6 @@ | |
| (:import | ||
| (clojure.lang ExceptionInfo))) | ||
|
|
||
|
|
||
| (defn add-searchable-generic-types | ||
| "Add the list of supported generic document types to a list of fixed searchable | ||
| concept types presumable from searchable-concept-types. | ||
|
|
@@ -32,13 +31,13 @@ | |
| filtered-pipeline-documents (cond | ||
| (= es-config/gran-elastic-name es-cluster-name) | ||
| (into {} (for [[k v] approved-pipeline-documents | ||
| :when (.startsWith (name k) "granule")] | ||
| [k v])) | ||
| :when (.startsWith (name k) "granule")] | ||
| [k v])) | ||
|
|
||
| (= es-config/elastic-name es-cluster-name) | ||
| (into {} (for [[k v] approved-pipeline-documents | ||
| :when (not (.startsWith (name k) "granule"))] | ||
| [k v])) | ||
| :when (not (.startsWith (name k) "granule"))] | ||
| [k v])) | ||
|
|
||
| :else (throw (Exception. (es-config/invalid-elastic-cluster-name-msg es-cluster-name))))] | ||
| (reduce (fn [data, item] (conj data (keyword (str "generic-" (name item))))) | ||
|
|
@@ -134,8 +133,8 @@ | |
| (not (nil? generated-updated-granule-concepts))) | ||
| (do | ||
| (info "Generated updated granule concepts = " generated-updated-granule-concepts | ||
| " is not equal to the requested updated granule concepts = " requested-updated-granule-concepts | ||
| " We are going to merge the two lists together and any duplicates will be overwritten to favor the requested granule concept.") | ||
| " is not equal to the requested updated granule concepts = " requested-updated-granule-concepts | ||
| " We are going to merge the two lists together and any duplicates will be overwritten to favor the requested granule concept.") | ||
| (assoc-in generated-concepts [:granule] (merge generated-updated-granule-concepts requested-updated-granule-concepts))) | ||
| generated-concepts)] | ||
| {:id (:id index-set) | ||
|
|
@@ -235,10 +234,10 @@ | |
| non-gran-index-keys-to-extract (set/difference (set (keys inner-combined-index-set)) (set gran-index-keys)) | ||
| non-gran-outer-map-index-set (select-keys inner-combined-index-set non-gran-index-keys-to-extract) | ||
| non-gran-index-set-without-concepts (-> non-gran-outer-map-index-set | ||
| (dissoc :concepts) | ||
| (assoc :name (:name inner-combined-index-set) | ||
| :id (:id inner-combined-index-set) | ||
| :create-reason (:create-reason inner-combined-index-set))) | ||
| (dissoc :concepts) | ||
| (assoc :name (:name inner-combined-index-set) | ||
| :id (:id inner-combined-index-set) | ||
| :create-reason (:create-reason inner-combined-index-set))) | ||
| non-gran-concepts-map-index-set (select-keys combined-concepts-map (set/difference (set (keys combined-concepts-map)) (set gran-index-keys))) | ||
| non-gran-index-set (assoc non-gran-index-set-without-concepts :concepts non-gran-concepts-map-index-set)] | ||
|
|
||
|
|
@@ -502,13 +501,19 @@ | |
|
|
||
| (defn- remove-granule-index-from-index-set | ||
| "Removes the separate granule index for the given collection from the index set. Validates the | ||
| collection index is listed in the index-set." | ||
| collection index is listed in the index-set. | ||
| Returns the updated given index-set." | ||
| [index-set collection-concept-id] | ||
| (validate-granule-index-exists index-set collection-concept-id) | ||
| (update-in index-set [:index-set :granule :indexes] | ||
| (fn [indexes] | ||
| (remove #(= collection-concept-id (index-name->concept-id (:name %))) | ||
| indexes)))) | ||
| (let [_ (validate-granule-index-exists index-set collection-concept-id) | ||
| ;; remove granule index from the :granule key indexes details | ||
| index-set (update-in index-set [:index-set :granule :indexes] | ||
| (fn [indexes] | ||
| (remove #(= collection-concept-id (index-name->concept-id (:name %))) | ||
| indexes))) | ||
| coll-base-name (keyword (index-name->concept-id collection-concept-id)) | ||
| ;; remove granule index from concepts list | ||
| index-set (update-in index-set [:index-set :concepts :granule] dissoc coll-base-name)] | ||
| index-set)) | ||
|
|
||
| (defn mark-collection-as-rebalancing | ||
| "Marks the given collection as rebalancing in the index set." | ||
|
|
@@ -524,52 +529,66 @@ | |
| (format "Cannot rebalance [%s] while its related indexes are being resharded." | ||
| concept-id))) | ||
| gran-index-set (-> gran-index-set | ||
| (update-in | ||
| [:index-set :granule :rebalancing-collections] | ||
| add-rebalancing-collection concept-id) | ||
| (update-in | ||
| [:index-set :granule :rebalancing-targets] | ||
| assoc concept-id target) | ||
| (update-in | ||
| [:index-set :granule :rebalancing-status] | ||
| assoc concept-id "IN_PROGRESS") | ||
| ((fn [gran-index-set] | ||
| (if (= "small-collections" target) | ||
| (do | ||
| (validate-granule-index-exists gran-index-set concept-id) | ||
| gran-index-set) | ||
| (add-new-granule-index-to-index-set gran-index-set concept-id)))))] | ||
| (update-in | ||
| [:index-set :granule :rebalancing-collections] | ||
| add-rebalancing-collection concept-id) | ||
| (update-in | ||
| [:index-set :granule :rebalancing-targets] | ||
| assoc concept-id target) | ||
| (update-in | ||
| [:index-set :granule :rebalancing-status] | ||
| assoc concept-id "IN_PROGRESS") | ||
| ((fn [gran-index-set] | ||
| (if (= "small-collections" target) | ||
| (do | ||
| (validate-granule-index-exists gran-index-set concept-id) | ||
| gran-index-set) | ||
| (add-new-granule-index-to-index-set gran-index-set concept-id)))))] | ||
| ;; Update the index set. This will create the new collection indexes as needed. | ||
| (validate-requested-index-set context es-config/gran-elastic-name gran-index-set true) | ||
| (update-index-set context es-config/gran-elastic-name gran-index-set))) | ||
|
|
||
| (defn finalize-collection-rebalancing | ||
| "Removes the collection from the list of rebalancing collections" | ||
| [context index-set-id concept-id] | ||
| (let [gran-index-set (index-set-util/get-index-set context es-config/gran-elastic-name index-set-id) | ||
| target (get-in gran-index-set [:index-set :granule :rebalancing-targets (keyword concept-id)]) | ||
| (let [old-gran-index-set (index-set-util/get-index-set context es-config/gran-elastic-name index-set-id) | ||
| target (get-in old-gran-index-set [:index-set :granule :rebalancing-targets (keyword concept-id)]) | ||
| _ (info (format "Finalizing rebalancing granules for collection [%s] to target [%s]." | ||
| concept-id target)) | ||
| _ (rebalancing-collections/validate-target target concept-id) | ||
| gran-index-set (as-> gran-index-set index-set | ||
| (update-in | ||
| index-set | ||
| [:index-set :granule :rebalancing-collections] | ||
| remove-rebalancing-collection concept-id) | ||
| (update-in | ||
| index-set | ||
| [:index-set :granule :rebalancing-targets] | ||
| dissoc (keyword concept-id)) | ||
| (update-in | ||
| index-set | ||
| [:index-set :granule :rebalancing-status] | ||
| dissoc (keyword concept-id)) | ||
| (if (= "small-collections" target) | ||
| (remove-granule-index-from-index-set index-set concept-id) | ||
| index-set))] | ||
| ;; Update the index set. This will create the new collection indexes as needed. | ||
| (validate-requested-index-set context es-config/gran-elastic-name gran-index-set true) | ||
| (update-index-set context es-config/gran-elastic-name (util/remove-nils-empty-maps-seqs gran-index-set)))) | ||
| new-gran-index-set (as-> old-gran-index-set index-set | ||
| (update-in | ||
| index-set | ||
| [:index-set :granule :rebalancing-collections] | ||
| remove-rebalancing-collection concept-id) | ||
| (update-in | ||
| index-set | ||
| [:index-set :granule :rebalancing-targets] | ||
| dissoc (keyword concept-id)) | ||
| (update-in | ||
| index-set | ||
| [:index-set :granule :rebalancing-status] | ||
| dissoc (keyword concept-id)) | ||
| (if (= "small-collections" target) | ||
| (remove-granule-index-from-index-set index-set concept-id) | ||
| index-set)) | ||
| es-store (indexer-util/context->es-store context es-config/gran-elastic-name)] | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
| (try | ||
| ;; Update the index set. This will create the new collection indexes as needed. | ||
| (validate-requested-index-set context es-config/gran-elastic-name new-gran-index-set true) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two forms |
||
| (update-index-set context es-config/gran-elastic-name (util/remove-nils-empty-maps-seqs new-gran-index-set)) | ||
|
|
||
| ;; Delete the separate index for this collection when moving back into small collections index | ||
| (when (= "small-collections" target) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
| (let [collection-key (keyword concept-id) | ||
| old-separate-index (get-in old-gran-index-set [:index-set :concepts :granule collection-key])] | ||
| (if old-separate-index | ||
| (es/delete-index es-store old-separate-index) | ||
| (warn (format "No separate index found for [%s] in old index-set; skipping deletion." concept-id))))) | ||
| (catch Exception e | ||
| (error e (format "Failed to finalize rebalancing for [%s] -> [%s]" concept-id target)) | ||
| (errors/throw-service-error :internal-error | ||
| (format "Failed to finalize rebalancing for [%s]; see server logs." concept-id)))))) | ||
|
|
||
| (defn update-collection-rebalancing-status | ||
| "Update the collection rebalancing status." | ||
|
|
@@ -705,14 +724,14 @@ | |
| current-target (get-in index-set [:index-set concept-type :resharding-targets (keyword index)]) | ||
| _ (when-not current-target | ||
| (errors/throw-service-error | ||
| :not-found | ||
| (format "The index [%s] is not being resharded." index))) | ||
| :not-found | ||
| (format "The index [%s] is not being resharded." index))) | ||
| current-status (get-in index-set [:index-set concept-type :resharding-status (keyword index)]) | ||
| _ (when-not current-status | ||
| (errors/throw-service-error | ||
| :internal-error | ||
| (format | ||
| "The status of resharding index [%s] is not found." index))) | ||
| :internal-error | ||
| (format | ||
| "The status of resharding index [%s] is not found." index))) | ||
| updated-index-set (if-not (= current-status "COMPLETE") | ||
| ;; check if es /_reindex is still happening when we started the reshard asynchronously in reshard/start | ||
| (let [reindexing-still-in-progress (es-helper/reindexing-still-in-progress? conn index)] | ||
|
|
@@ -732,13 +751,13 @@ | |
| :reshard-index updated-target | ||
| :reshard-status updated-status} | ||
| (errors/throw-service-error | ||
| :internal-error | ||
| (format | ||
| "The status of resharding index [%s] is not found." index))) | ||
| :internal-error | ||
| (format | ||
| "The status of resharding index [%s] is not found." index))) | ||
| (errors/throw-service-error | ||
| :not-found | ||
| (format | ||
| "The index [%s] is not being resharded." index))))) | ||
| :not-found | ||
| (format | ||
| "The index [%s] is not being resharded." index))))) | ||
|
|
||
| (defn- validate-resharding-complete | ||
| "Validate that resharding has completed successfully for the given index " | ||
|
|
@@ -808,8 +827,8 @@ | |
| concept-type (get-concept-type-for-index index-set index) | ||
| _ (when-not concept-type | ||
| (errors/throw-service-error | ||
| :not-found | ||
| (format "Index [%s] does not exist in elastic cluster [%s]." index elastic-name))) | ||
| :not-found | ||
| (format "Index [%s] does not exist in elastic cluster [%s]." index elastic-name))) | ||
| target-index-name (get-in index-set [:index-set concept-type :resharding-targets (keyword index)]) | ||
| es-store (indexer-util/context->es-store context elastic-name) | ||
| prefix-id (get-in index-set [:index-set :id]) | ||
|
|
@@ -837,20 +856,19 @@ | |
| (errors/throw-service-error :internal-error | ||
| (format "Failed to rollback resharding for [%s]; see server logs." index)))))) | ||
|
|
||
|
|
||
| (defn reset | ||
| "Put elastic in a clean state after deleting indices associated with index-sets and index-set docs." | ||
| [context] | ||
| (let [{:keys [index-name]} (config/idx-cfg-for-index-sets es-config/gran-elastic-name) | ||
| gran-index-set-ids (es/get-index-set-ids | ||
| (indexer-util/context->es-store context es-config/gran-elastic-name) | ||
| index-name | ||
| "_doc") | ||
| (indexer-util/context->es-store context es-config/gran-elastic-name) | ||
| index-name | ||
| "_doc") | ||
| {:keys [index-name]} (config/idx-cfg-for-index-sets es-config/elastic-name) | ||
| non-gran-index-set-ids (es/get-index-set-ids | ||
| (indexer-util/context->es-store context es-config/elastic-name) | ||
| index-name | ||
| "_doc")] | ||
| (indexer-util/context->es-store context es-config/elastic-name) | ||
| index-name | ||
| "_doc")] | ||
| ;; delete indices assoc with index-set | ||
| (doseq [id gran-index-set-ids] | ||
| (delete-index-set context (str id) es-config/gran-elastic-name)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,6 @@ | |
| "An in memory implementation of the metadata database." | ||
| (:require | ||
| [clj-time.core :as t] | ||
| [clojure.string :as string] | ||
| [cmr.common.concepts :as cc] | ||
| [cmr.common.date-time-parser :as p] | ||
| [cmr.common.generics :as common-generic] | ||
|
|
@@ -282,6 +281,27 @@ | |
| params)] | ||
| (concepts->find-result found-concepts params))) | ||
|
|
||
| (defn find-concepts-in-batches | ||
| ([db provider params batch-size] | ||
| (find-concepts-in-batches db provider params batch-size 0)) | ||
| ([db provider params batch-size requested-start-index] | ||
| (let [{:keys [concept-type]} params | ||
| provider-id (:provider-id provider) | ||
| concepts @(:concepts-atom db) | ||
| filtered-concepts (concepts/search-with-params concepts (assoc params :provider-id provider-id)) | ||
| sorted-concepts (vec (sort-by :concept-id filtered-concepts)) | ||
| start-index (max requested-start-index 0)] | ||
| (letfn [(find-batch [start-index] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. did AI suggest letfn, I seam to be seeing it more often these days.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. I think it's neat and useful |
||
| (let [end-index (min (+ start-index batch-size) (count sorted-concepts)) | ||
| batch (subvec sorted-concepts start-index end-index)] | ||
| (mapv #(assoc % :provider-id provider-id) batch))) | ||
| (lazy-find [start-index] | ||
| (when (< start-index (count sorted-concepts)) | ||
| (let [batch (find-batch start-index)] | ||
| (when (not (empty? batch)) | ||
| (cons batch (lazy-seq (lazy-find (+ start-index batch-size))))))))] | ||
| (lazy-find start-index))))) | ||
|
|
||
| (defn find-associations-by-concept-id | ||
| "Searches the passed in concepts (which is the in-memory DB) for all associations that | ||
| involve the passed in concept id." | ||
|
|
@@ -314,6 +334,7 @@ | |
| (def concept-search-behaviour | ||
| {:find-concepts find-concepts | ||
| :find-latest-concepts find-latest-concepts | ||
| :find-concepts-in-batches find-concepts-in-batches | ||
| :find-associations find-associations | ||
| :find-latest-associations find-latest-associations}) | ||
|
|
||
|
|
@@ -673,4 +694,25 @@ | |
| ;; to view most recently saved concept | ||
| (first @(:concepts-atom db)) | ||
| (get-concept-id db :order-option {:provider-id "PROV1"} "order-option-1") | ||
| (get-concept db :order-option {:provider-id "PROV1"} "OO1200000001-PROV1")) | ||
| (get-concept db :order-option {:provider-id "PROV1"} "OO1200000001-PROV1") | ||
|
|
||
| ;; Test for find-concepts-in-batches | ||
| (def test-db (create-db)) | ||
| (def test-provider {:provider-id "TEST-PROVIDER"}) | ||
| ;; Add some test concepts | ||
| (doseq [i (range 1 11)] | ||
| (save-concept test-db test-provider | ||
| {:concept-type :collection | ||
| :provider-id "TEST-PROVIDER" | ||
| :native-id (str "test-collection-" i) | ||
| :concept-id (str "C" i "-TEST-PROVIDER") | ||
| :revision-id 1})) | ||
| ;; Test find-concepts-in-batches | ||
| (def batches (find-concepts-in-batches test-db test-provider {:concept-type :collection} 3)) | ||
| ;; Print the results | ||
| (doseq [batch batches] | ||
| (println "Batch:") | ||
| (doseq [concept batch] | ||
| (println (select-keys concept [:concept-id :native-id]))) (println)) | ||
| ) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My LSP did not like this but I left it because I could have sworn that rich comments are the one time you are supposed to have the ending paren on its own line, for IDE line-wise selection.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as a lover of: what's up with the floating println? I understand your just writing a new line after a loop, but it is just hanging out there
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure if you are asking about what's up with its existence or up with its formatting, so --
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not bothered so much by putting ) with all the others because putting them on their own line would be way way way too cluttered for Lisps, and because it's expected that the IDE has form-wise selection. But in the case of rich comments I could have sworn it was supposed to be on its own line ... IDK, maybe they changed it but I have a feeling it might be a missing LSP feature. Conceptually it also makes it more readable because rich comments are the one time when you should not interpret the entire form as one thing, some pieces of it might execute, others might need something else to resolve, etc.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All these lines for defining this var
new-gran-index-setactually remain the same, except the names in this first line were updated to have thenew-andold-prefixes.