From 1279a5e695880b2fec96275dd0496645649ab3e7 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Mon, 13 Sep 2021 16:58:33 -0700 Subject: [PATCH 1/5] Add option to skip duplicate upserts. --- lib/pgsync/client.rb | 1 + lib/pgsync/task.rb | 7 ++++++- test/sync_test.rb | 29 +++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/lib/pgsync/client.rb b/lib/pgsync/client.rb index 56997f9c..720b8be6 100644 --- a/lib/pgsync/client.rb +++ b/lib/pgsync/client.rb @@ -56,6 +56,7 @@ def slop_options o.boolean "--overwrite", "overwrite existing rows", default: false o.boolean "--preserve", "preserve existing rows", default: false o.boolean "--truncate", "truncate existing rows", default: false + o.boolean "--overwrite-only-changed", "Only overwrite rows with values that have changed", default: false o.separator "" o.separator "Foreign key options:" diff --git a/lib/pgsync/task.rb b/lib/pgsync/task.rb index 4776d48c..794139d4 100644 --- a/lib/pgsync/task.rb +++ b/lib/pgsync/task.rb @@ -140,8 +140,13 @@ def sync_data "NOTHING" else # overwrite or sql clause setter = shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" } + if opts[:overwrite_only_changed] + nooper = "WHERE NOT (#{shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quoted_table}.#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" }.join(" AND ")})" + else + nooper = "" + end if setter.any? 
- "UPDATE SET #{setter.join(", ")}" + "UPDATE SET #{setter.join(", ")} #{nooper}" else "NOTHING" end diff --git a/test/sync_test.rb b/test/sync_test.rb index 3498738b..0ef525ca 100644 --- a/test/sync_test.rb +++ b/test/sync_test.rb @@ -19,6 +19,35 @@ def test_overwrite assert_result("--overwrite", source, dest, expected) end + def test_overwrite_only_changed + source = 3.times.map { |i| {"id" => i + 1, "title" => "Post #{i + 1}"} } + dest = source + expected = source + assert_result("--overwrite --overwrite-only-changed", source, dest, expected) + # get ctids (~ table row versions) + ctids = conn2.exec("select ctid from posts").to_a + # rerun the overwrite + assert_works "posts --overwrite --overwrite-only-changed", config: true + ctids2 = conn2.exec("select ctid from posts").to_a + # verify ctids have not changed + assert_equal(ctids, ctids2) + end + + + def test_overwrite_without_only_changed_has_ctid_diffs + source = 3.times.map { |i| {"id" => i + 1, "title" => "Post #{i + 1}"} } + dest = source + expected = source + assert_result("--overwrite", source, dest, expected) + # get ctids (~ table row versions) + ctids = conn2.exec("select ctid from posts").to_a + # rerun the overwrite + assert_works "posts --overwrite", config: true + ctids2 = conn2.exec("select ctid from posts").to_a + # verify ctids have changed. + refute_equal(ctids, ctids2) + end + def test_preserve source = 3.times.map { |i| {"id" => i + 1, "title" => "Post #{i + 1}"} } dest = [{"id" => 1, "title" => "First Post"}, {"id" => 4, "title" => "Post 4"}] From 72e88eb367a451e672dc2b9293099ac085717597 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Thu, 16 Sep 2021 09:49:34 -0700 Subject: [PATCH 2/5] Remove option. 
--- lib/pgsync/client.rb | 1 - lib/pgsync/task.rb | 9 ++------- test/sync_test.rb | 21 +++------------------ 3 files changed, 5 insertions(+), 26 deletions(-) diff --git a/lib/pgsync/client.rb b/lib/pgsync/client.rb index 720b8be6..56997f9c 100644 --- a/lib/pgsync/client.rb +++ b/lib/pgsync/client.rb @@ -56,7 +56,6 @@ def slop_options o.boolean "--overwrite", "overwrite existing rows", default: false o.boolean "--preserve", "preserve existing rows", default: false o.boolean "--truncate", "truncate existing rows", default: false - o.boolean "--overwrite-only-changed", "Only overwrite rows with values that have changed", default: false o.separator "" o.separator "Foreign key options:" diff --git a/lib/pgsync/task.rb b/lib/pgsync/task.rb index 794139d4..2f8502c5 100644 --- a/lib/pgsync/task.rb +++ b/lib/pgsync/task.rb @@ -140,18 +140,13 @@ def sync_data "NOTHING" else # overwrite or sql clause setter = shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" } - if opts[:overwrite_only_changed] - nooper = "WHERE NOT (#{shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quoted_table}.#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" }.join(" AND ")})" - else - nooper = "" - end if setter.any? 
- "UPDATE SET #{setter.join(", ")} #{nooper}" + "UPDATE SET #{setter.join(", ")} WHERE NOT (#{shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quoted_table}.#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" }.join(" AND ")})" else "NOTHING" end end - destination.execute("INSERT INTO #{quoted_table} (#{fields}) (SELECT #{fields} FROM #{quote_ident_full(temp_table)}) ON CONFLICT (#{on_conflict}) DO #{action}") + destination.execute("INSERT INTO #{quoted_table} (#{fields}) (SELECT #{fields} FROM #{quote_ident_full(temp_table)}) ON CONFLICT (#{on_conflict}) DO #{action} returning *").length else # use delete instead of truncate for foreign keys if opts[:defer_constraints] || opts[:defer_constraints_v2] diff --git a/test/sync_test.rb b/test/sync_test.rb index 0ef525ca..157b514b 100644 --- a/test/sync_test.rb +++ b/test/sync_test.rb @@ -19,22 +19,7 @@ def test_overwrite assert_result("--overwrite", source, dest, expected) end - def test_overwrite_only_changed - source = 3.times.map { |i| {"id" => i + 1, "title" => "Post #{i + 1}"} } - dest = source - expected = source - assert_result("--overwrite --overwrite-only-changed", source, dest, expected) - # get ctids (~ table row versions) - ctids = conn2.exec("select ctid from posts").to_a - # rerun the overwrite - assert_works "posts --overwrite --overwrite-only-changed", config: true - ctids2 = conn2.exec("select ctid from posts").to_a - # verify ctids have not changed - assert_equal(ctids, ctids2) - end - - - def test_overwrite_without_only_changed_has_ctid_diffs + def test_overwrite_skips_identical_rows source = 3.times.map { |i| {"id" => i + 1, "title" => "Post #{i + 1}"} } dest = source expected = source @@ -44,8 +29,8 @@ def test_overwrite_without_only_changed_has_ctid_diffs # rerun the overwrite assert_works "posts --overwrite", config: true ctids2 = conn2.exec("select ctid from posts").to_a - # verify ctids have changed. 
- refute_equal(ctids, ctids2) + # verify ctids have not changed + assert_equal(ctids, ctids2) end def test_preserve From bf2e61306de4d22f0702d582f847e025fd65ff04 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Thu, 16 Sep 2021 09:50:25 -0700 Subject: [PATCH 3/5] remove debug stmt. --- lib/pgsync/task.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pgsync/task.rb b/lib/pgsync/task.rb index 2f8502c5..1ceda54d 100644 --- a/lib/pgsync/task.rb +++ b/lib/pgsync/task.rb @@ -146,7 +146,7 @@ def sync_data "NOTHING" end end - destination.execute("INSERT INTO #{quoted_table} (#{fields}) (SELECT #{fields} FROM #{quote_ident_full(temp_table)}) ON CONFLICT (#{on_conflict}) DO #{action} returning *").length + destination.execute("INSERT INTO #{quoted_table} (#{fields}) (SELECT #{fields} FROM #{quote_ident_full(temp_table)}) ON CONFLICT (#{on_conflict}) DO #{action}") else # use delete instead of truncate for foreign keys if opts[:defer_constraints] || opts[:defer_constraints_v2] From 918d8e581cb3f6f8ffa35661eb4327bd77902fc1 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Thu, 16 Sep 2021 14:18:47 -0700 Subject: [PATCH 4/5] Fix json column comparison. 
--- lib/pgsync/task.rb | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/lib/pgsync/task.rb b/lib/pgsync/task.rb index 1ceda54d..5cc8f929 100644 --- a/lib/pgsync/task.rb +++ b/lib/pgsync/task.rb @@ -19,6 +19,14 @@ def quoted_table quote_ident_full(table) end + def field_equality(column) + if to_types[column] == 'json' + "#{quoted_table}.#{quote_ident(column)}::jsonb = EXCLUDED.#{quote_ident(column)}::jsonb" + else + "#{quoted_table}.#{quote_ident(column)} = EXCLUDED.#{quote_ident(column)}" + end + end + def perform with_notices do handle_errors do @@ -37,6 +45,14 @@ def to_fields @to_fields ||= to_columns.map { |c| c[:name] } end + def from_types + @from_types ||= from_columns.map { |c| [c[:name], c[:type]] }.to_h + end + + def to_types + @to_types ||= to_columns.map { |c| [c[:name], c[:type]] }.to_h + end + def shared_fields @shared_fields ||= to_fields & from_fields end @@ -62,8 +78,6 @@ def notes missing_sequences = from_sequences - to_sequences notes << "Missing sequences: #{missing_sequences.join(", ")}" if missing_sequences.any? - from_types = from_columns.map { |c| [c[:name], c[:type]] }.to_h - to_types = to_columns.map { |c| [c[:name], c[:type]] }.to_h different_types = [] shared_fields.each do |field| if from_types[field] != to_types[field] @@ -135,13 +149,14 @@ def sync_data copy(copy_to_command, dest_table: temp_table, dest_fields: fields) on_conflict = primary_key.map { |pk| quote_ident(pk) }.join(", ") + action = if opts[:preserve] "NOTHING" else # overwrite or sql clause setter = shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" } if setter.any? 
- "UPDATE SET #{setter.join(", ")} WHERE NOT (#{shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quoted_table}.#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" }.join(" AND ")})" + "UPDATE SET #{setter.join(", ")} WHERE NOT (#{shared_fields.reject { |f| primary_key.include?(f) }.map { |f| field_equality(f) }.join(" AND ")})" else "NOTHING" end From 32b3773c90b77241d582bf2c5df681ba612fa590 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Mon, 20 Sep 2021 16:34:52 -0700 Subject: [PATCH 5/5] Fix null case --- lib/pgsync/task.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/pgsync/task.rb b/lib/pgsync/task.rb index 5cc8f929..c1520a67 100644 --- a/lib/pgsync/task.rb +++ b/lib/pgsync/task.rb @@ -20,10 +20,11 @@ def quoted_table end def field_equality(column) + # NULL-safe equality: IS NOT DISTINCT FROM never yields NULL (NULL vs NULL is true, NULL vs value is false) and contains no OR, so the terms can be joined with AND if to_types[column] == 'json' - "#{quoted_table}.#{quote_ident(column)}::jsonb = EXCLUDED.#{quote_ident(column)}::jsonb" + "#{quoted_table}.#{quote_ident(column)}::jsonb IS NOT DISTINCT FROM EXCLUDED.#{quote_ident(column)}::jsonb" else - "#{quoted_table}.#{quote_ident(column)} = EXCLUDED.#{quote_ident(column)}" + "#{quoted_table}.#{quote_ident(column)} IS NOT DISTINCT FROM EXCLUDED.#{quote_ident(column)}" end end