Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions lib/pgsync/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ def quoted_table
quote_ident_full(table)
end

def field_equality(column)
null_cond = "(#{quoted_table}.#{quote_ident(column)} IS NULL AND EXCLUDED.#{quote_ident(column)} IS NULL)"
if to_types[column] == 'json'
"#{quoted_table}.#{quote_ident(column)}::jsonb = EXCLUDED.#{quote_ident(column)}::jsonb OR #{null_cond}"
else
"#{quoted_table}.#{quote_ident(column)} = EXCLUDED.#{quote_ident(column)} OR #{null_cond}"
end
end

def perform
with_notices do
handle_errors do
Expand All @@ -37,6 +46,14 @@ def to_fields
@to_fields ||= to_columns.map { |c| c[:name] }
end

def from_types
@from_types ||= from_columns.map { |c| [c[:name], c[:type]] }.to_h
end

def to_types
@to_types ||= to_columns.map { |c| [c[:name], c[:type]] }.to_h
end

def shared_fields
@shared_fields ||= to_fields & from_fields
end
Expand All @@ -62,8 +79,6 @@ def notes
missing_sequences = from_sequences - to_sequences
notes << "Missing sequences: #{missing_sequences.join(", ")}" if missing_sequences.any?

from_types = from_columns.map { |c| [c[:name], c[:type]] }.to_h
to_types = to_columns.map { |c| [c[:name], c[:type]] }.to_h
different_types = []
shared_fields.each do |field|
if from_types[field] != to_types[field]
Expand Down Expand Up @@ -135,13 +150,14 @@ def sync_data
copy(copy_to_command, dest_table: temp_table, dest_fields: fields)

on_conflict = primary_key.map { |pk| quote_ident(pk) }.join(", ")

action =
if opts[:preserve]
"NOTHING"
else # overwrite or sql clause
setter = shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" }
if setter.any?
"UPDATE SET #{setter.join(", ")}"
"UPDATE SET #{setter.join(", ")} WHERE NOT (#{shared_fields.reject { |f| primary_key.include?(f) }.map { |f| field_equality(f) }.join(" AND ")})"
else
"NOTHING"
end
Expand Down
14 changes: 14 additions & 0 deletions test/sync_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ def test_overwrite
assert_result("--overwrite", source, dest, expected)
end

def test_overwrite_skips_identical_rows
source = 3.times.map { |i| {"id" => i + 1, "title" => "Post #{i + 1}"} }
dest = source
expected = source
assert_result("--overwrite", source, dest, expected)
# get ctids (~ table row versions)
ctids = conn2.exec("select ctid from posts").to_a
# rerun the overwrite
assert_works "posts --overwrite", config: true
ctids2 = conn2.exec("select ctid from posts").to_a
# verify ctids have not changed
assert_equal(ctids, ctids2)
end

def test_preserve
source = 3.times.map { |i| {"id" => i + 1, "title" => "Post #{i + 1}"} }
dest = [{"id" => 1, "title" => "First Post"}, {"id" => 4, "title" => "Post 4"}]
Expand Down