From bd381ef9e44f48c3d2b03d2d7afca18187d62568 Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sat, 9 May 2026 21:03:38 +0800 Subject: [PATCH 1/9] pid not set fix --- dd.sql | 37 ++++++++++++++ src/sender.go | 4 ++ src/store.go | 95 ++++++++++++++++++++++++++---------- src/store_test.go | 119 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 231 insertions(+), 24 deletions(-) create mode 100644 src/store_test.go diff --git a/dd.sql b/dd.sql index e5ff65c..2b8a450 100644 --- a/dd.sql +++ b/dd.sql @@ -62,6 +62,43 @@ create table if not exists msg_attachment ( primary key (msg_id, filename) ); +-- resolve the local numeric parent id whenever the wire parent hash is known +-- and the parent row already exists. +create or replace function resolve_msg_pid_before_write() returns trigger as $$ +begin + if NEW.psha256 is not null and NEW.pid is null then + select id into NEW.pid from msg where sha256 = NEW.psha256; + end if; + return NEW; +end; +$$ language plpgsql; + +drop trigger if exists trg_msg_resolve_pid_before_write on msg; +create trigger trg_msg_resolve_pid_before_write + before insert or update of psha256 on msg + for each row execute function resolve_msg_pid_before_write(); + +-- if a parent row's sha256 becomes available after child rows were stored, +-- backfill those child rows' numeric parent id. +create or replace function resolve_msg_pid_after_hash() returns trigger as $$ +begin + if NEW.sha256 is not null then + update msg set pid = NEW.id + where psha256 = NEW.sha256 and pid is null; + end if; + return NEW; +end; +$$ language plpgsql; + +drop trigger if exists trg_msg_resolve_pid_after_hash on msg; +create trigger trg_msg_resolve_pid_after_hash + after insert or update of sha256 on msg + for each row execute function resolve_msg_pid_after_hash(); + +update msg child set pid = parent.id +from msg parent +where child.psha256 = parent.sha256 and child.pid is null; + -- notify when a new msg_to row is inserted with null time_delivered so the -- sender can pick it up immediately instead of waiting for the next poll. create or replace function notify_msg_to_insert() returns trigger as $$ diff --git a/src/sender.go b/src/sender.go index 3d50a98..043ac1d 100644 --- a/src/sender.go +++ b/src/sender.go @@ -355,6 +355,10 @@ func deliverMessage(target pendingTarget) { log.Printf("ERROR: sender: storing sha256 for msg %d: %s", target.MsgID, err) return } + if err := resolvePendingChildLinks(txParentLinkStore{tx: tx}, target.MsgID, msgHash); err != nil { + log.Printf("ERROR: sender: resolving child pids for msg %d: %s", target.MsgID, err) + return + } // Compute header hash now; registerOutgoing with Host B's IP happens after // the connection is established (IP needed for challenge validation ยง10.5). diff --git a/src/store.go b/src/store.go index 19775d3..f2380f7 100644 --- a/src/store.go +++ b/src/store.go @@ -102,6 +102,73 @@ func hasAddrReceivedMsgHash(hash []byte, addr *FMsgAddress) (bool, error) { return exists, nil } +type parentLinkStore interface { + lookupParentID(parentHash []byte) (int64, error) + setParentID(msgID int64, parentID int64) error + setPendingChildrenParentID(parentID int64, parentHash []byte) error +} + +type txParentLinkStore struct { + tx *sql.Tx +} + +func (s txParentLinkStore) lookupParentID(parentHash []byte) (int64, error) { + var id int64 + err := s.tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", parentHash).Scan(&id) + if err == sql.ErrNoRows { + return 0, nil + } + return id, err +} + +func (s txParentLinkStore) setParentID(msgID int64, parentID int64) error { + _, err := s.tx.Exec("UPDATE msg SET pid = $1 WHERE id = $2", parentID, msgID) + return err +} + +func (s txParentLinkStore) setPendingChildrenParentID(parentID int64, parentHash []byte) error { + _, err := s.tx.Exec("UPDATE msg SET pid = $1 WHERE psha256 = $2 AND pid IS NULL", parentID, parentHash) + return err +} + +func resolveStoredParent(store parentLinkStore, msgID int64, parentHash []byte, requireParent bool) error { + if len(parentHash) == 0 { + return nil + } + + parentID, err := store.lookupParentID(parentHash) + if err != nil { + return err + } + if parentID == 0 { + if requireParent { + return fmt.Errorf("parent message not found for psha256 %x", parentHash) + } + return nil + } + + return store.setParentID(msgID, parentID) +} + +func resolvePendingChildLinks(store parentLinkStore, parentID int64, parentHash []byte) error { + if len(parentHash) == 0 { + return nil + } + return store.setPendingChildrenParentID(parentID, parentHash) +} + +func resolveMsgParentLinks(tx *sql.Tx, msgID int64, msgHash []byte, parentHash []byte, requireParent bool) error { + store := txParentLinkStore{tx: tx} + if err := resolveStoredParent(store, msgID, parentHash, requireParent); err != nil { + return err + } + return resolvePendingChildLinks(store, msgID, msgHash) +} + +func requiresStoredParent(msg *FMsgHeader) bool { + return len(msg.Pid) > 0 && msg.Flags&FlagHasAddTo == 0 +} + // getMsgByID loads a message and all its recipients from the database by msg ID. // Returns the full FMsgHeader or nil if the message doesn't exist. func getMsgByID(msgID int64) (*FMsgHeader, error) { @@ -242,18 +309,8 @@ values ($1, $2, $3, $4, $5, $6, $7)`) } } - // resolve pid from psha256 (parent message hash) - if len(msg.Pid) > 0 { - var parentID sql.NullInt64 - err = tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", msg.Pid).Scan(&parentID) - if err != nil && err != sql.ErrNoRows { - return err - } - if parentID.Valid { - if _, err = tx.Exec("UPDATE msg SET pid = $1 WHERE id = $2", parentID.Int64, msgID); err != nil { - return err - } - } + if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil { + return err } return tx.Commit() @@ -361,18 +418,8 @@ values ($1, $2, $3, $4, $5, $6, $7)`) } } - // resolve pid from psha256 - if len(msg.Pid) > 0 { - var parentID sql.NullInt64 - err = tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", msg.Pid).Scan(&parentID) - if err != nil && err != sql.ErrNoRows { - return err - } - if parentID.Valid { - if _, err = tx.Exec("UPDATE msg SET pid = $1 WHERE id = $2", parentID.Int64, msgID); err != nil { - return err - } - } + if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil { + return err } return tx.Commit() diff --git a/src/store_test.go b/src/store_test.go new file mode 100644 index 0000000..d37abfc --- /dev/null +++ b/src/store_test.go @@ -0,0 +1,119 @@ +package main + +import ( + "bytes" + "errors" + "testing" +) + +type fakeParentLinkStore struct { + parentID int64 + lookupErr error + + lookupHash []byte + setMsgID int64 + setParentIDValue int64 + setCalled bool + pendingParentID int64 + pendingParentHash []byte + pendingCalled bool +} + +func (s *fakeParentLinkStore) lookupParentID(parentHash []byte) (int64, error) { + s.lookupHash = append([]byte(nil), parentHash...) + return s.parentID, s.lookupErr +} + +func (s *fakeParentLinkStore) setParentID(msgID int64, parentID int64) error { + s.setCalled = true + s.setMsgID = msgID + s.setParentIDValue = parentID + return nil +} + +func (s *fakeParentLinkStore) setPendingChildrenParentID(parentID int64, parentHash []byte) error { + s.pendingCalled = true + s.pendingParentID = parentID + s.pendingParentHash = append([]byte(nil), parentHash...) + return nil +} + +func TestResolveStoredParentRequiresExistingParent(t *testing.T) { + store := &fakeParentLinkStore{} + parentHash := []byte{1, 2, 3} + + err := resolveStoredParent(store, 10, parentHash, true) + if err == nil { + t.Fatal("resolveStoredParent returned nil error for required missing parent") + } + if !bytes.Equal(store.lookupHash, parentHash) { + t.Fatalf("lookup hash = %v, want %v", store.lookupHash, parentHash) + } + if store.setCalled { + t.Fatal("setParentID was called for missing parent") + } +} + +func TestResolveStoredParentAllowsOptionalMissingParent(t *testing.T) { + store := &fakeParentLinkStore{} + + if err := resolveStoredParent(store, 10, []byte{1, 2, 3}, false); err != nil { + t.Fatalf("resolveStoredParent returned error for optional missing parent: %v", err) + } + if store.setCalled { + t.Fatal("setParentID was called for optional missing parent") + } +} + +func TestResolveStoredParentSetsPidWhenParentExists(t *testing.T) { + store := &fakeParentLinkStore{parentID: 42} + + if err := resolveStoredParent(store, 10, []byte{1, 2, 3}, true); err != nil { + t.Fatalf("resolveStoredParent returned error: %v", err) + } + if !store.setCalled { + t.Fatal("setParentID was not called") + } + if store.setMsgID != 10 || store.setParentIDValue != 42 { + t.Fatalf("setParentID called with msgID=%d parentID=%d, want msgID=10 parentID=42", store.setMsgID, store.setParentIDValue) + } +} + +func TestResolveStoredParentPropagatesLookupError(t *testing.T) { + lookupErr := errors.New("lookup failed") + store := &fakeParentLinkStore{lookupErr: lookupErr} + + err := resolveStoredParent(store, 10, []byte{1, 2, 3}, true) + if !errors.Is(err, lookupErr) { + t.Fatalf("resolveStoredParent error = %v, want %v", err, lookupErr) + } + if store.setCalled { + t.Fatal("setParentID was called after lookup error") + } +} + +func TestResolvePendingChildLinksBackfillsByParentHash(t *testing.T) { + store := &fakeParentLinkStore{} + parentHash := []byte{4, 5, 6} + + if err := resolvePendingChildLinks(store, 42, parentHash); err != nil { + t.Fatalf("resolvePendingChildLinks returned error: %v", err) + } + if !store.pendingCalled { + t.Fatal("setPendingChildrenParentID was not called") + } + if store.pendingParentID != 42 || !bytes.Equal(store.pendingParentHash, parentHash) { + t.Fatalf("pending update got parentID=%d hash=%v, want parentID=42 hash=%v", store.pendingParentID, store.pendingParentHash, parentHash) + } +} + +func TestRequiresStoredParentUsesAddToFlag(t *testing.T) { + parentHash := []byte{1, 2, 3} + + if !requiresStoredParent(&FMsgHeader{Flags: FlagHasPid, Pid: parentHash}) { + t.Fatal("normal reply did not require stored parent") + } + if requiresStoredParent(&FMsgHeader{Flags: FlagHasPid | FlagHasAddTo, Pid: parentHash}) { + t.Fatal("add-to message required stored parent") + } +} From cd5695a714f7982b79d37bf8a20a7195f5dd292b Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sat, 9 May 2026 21:12:01 +0800 Subject: [PATCH 2/9] graceful closure when not terminate --- src/host.go | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/host.go b/src/host.go index 62815fc..5bc6e6e 100644 --- a/src/host.go +++ b/src/host.go @@ -1545,6 +1545,19 @@ func abortConn(c net.Conn) { _ = c.Close() } +type responseTrackingConn struct { + net.Conn + wroteResponse bool +} + +func (c *responseTrackingConn) Write(b []byte) (int, error) { + n, err := c.Conn.Write(b) + if n > 0 { + c.wroteResponse = true + } + return n, err +} + func handleConn(c net.Conn) { defer func() { if r := recover(); r != nil { @@ -1553,11 +1566,16 @@ func handleConn(c net.Conn) { }() log.Printf("INFO: Connection from: %s\n", c.RemoteAddr().String()) + tc := &responseTrackingConn{Conn: c} // read header - header, r, err := readHeader(c) + header, r, err := readHeader(tc) if err != nil { log.Printf("WARN: reading header from, %s: %s", c.RemoteAddr().String(), err) + if tc.wroteResponse { + _ = c.Close() + return + } abortConn(c) return } @@ -1568,7 +1586,7 @@ func handleConn(c net.Conn) { return } - if err := challenge(c, header, determineSenderDomain(header)); err != nil { + if err := challenge(tc, header, determineSenderDomain(header)); err != nil { log.Printf("ERROR: Challenge failed to, %s: %s", c.RemoteAddr().String(), err) abortConn(c) return @@ -1584,8 +1602,11 @@ func handleConn(c net.Conn) { allLocalDup, err = allLocalRecipientsHaveMessageHash(header.ChallengeHash[:], addrs) if err != nil { log.Printf("ERROR: duplicate check failed for %s: %s", c.RemoteAddr().String(), err) - _ = sendCode(c, RejectCodeUndisclosed) - abortConn(c) + if err := sendCode(c, RejectCodeUndisclosed); err != nil { + abortConn(c) + return + } + _ = c.Close() return } } @@ -1598,8 +1619,11 @@ func handleConn(c net.Conn) { // No local add-to recipients; store header and respond code 11, close. if err := storeMsgHeaderOnly(header); err != nil { log.Printf("ERROR: storing add-to header: %s", err) - _ = sendCode(c, RejectCodeUndisclosed) - abortConn(c) + if err := sendCode(c, RejectCodeUndisclosed); err != nil { + abortConn(c) + return + } + _ = c.Close() return } if err := sendCode(c, AcceptCodeAddTo); err != nil { @@ -1643,6 +1667,7 @@ func handleConn(c net.Conn) { // if error was a protocal violation, abort; otherise let sender know there was an internal error log.Printf("ERROR: Download failed from, %s: %s", c.RemoteAddr().String(), err) if errors.Is(err, ErrProtocolViolation) { + abortConn(c) return } else { _ = sendCode(c, RejectCodeUndisclosed) From fb0a5e0e4ab0b1159e734fac87eb9fa1d53245dc Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sat, 9 May 2026 21:59:21 +0800 Subject: [PATCH 3/9] rm backfill --- dd.sql | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dd.sql b/dd.sql index 2b8a450..a472821 100644 --- a/dd.sql +++ b/dd.sql @@ -95,10 +95,6 @@ create trigger trg_msg_resolve_pid_after_hash after insert or update of sha256 on msg for each row execute function resolve_msg_pid_after_hash(); -update msg child set pid = parent.id -from msg parent -where child.psha256 = parent.sha256 and child.pid is null; - -- notify when a new msg_to row is inserted with null time_delivered so the -- sender can pick it up immediately instead of waiting for the next poll. create or replace function notify_msg_to_insert() returns trigger as $$ From 0d5641d694abbe569be62f0537d282fa7b1168d9 Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sun, 10 May 2026 14:35:04 +0800 Subject: [PATCH 4/9] rm trigger, app must follow the protocol! --- dd.sql | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/dd.sql b/dd.sql index a472821..e5ff65c 100644 --- a/dd.sql +++ b/dd.sql @@ -62,39 +62,6 @@ create table if not exists msg_attachment ( primary key (msg_id, filename) ); --- resolve the local numeric parent id whenever the wire parent hash is known --- and the parent row already exists. -create or replace function resolve_msg_pid_before_write() returns trigger as $$ -begin - if NEW.psha256 is not null and NEW.pid is null then - select id into NEW.pid from msg where sha256 = NEW.psha256; - end if; - return NEW; -end; -$$ language plpgsql; - -drop trigger if exists trg_msg_resolve_pid_before_write on msg; -create trigger trg_msg_resolve_pid_before_write - before insert or update of psha256 on msg - for each row execute function resolve_msg_pid_before_write(); - --- if a parent row's sha256 becomes available after child rows were stored, --- backfill those child rows' numeric parent id. -create or replace function resolve_msg_pid_after_hash() returns trigger as $$ -begin - if NEW.sha256 is not null then - update msg set pid = NEW.id - where psha256 = NEW.sha256 and pid is null; - end if; - return NEW; -end; -$$ language plpgsql; - -drop trigger if exists trg_msg_resolve_pid_after_hash on msg; -create trigger trg_msg_resolve_pid_after_hash - after insert or update of sha256 on msg - for each row execute function resolve_msg_pid_after_hash(); - -- notify when a new msg_to row is inserted with null time_delivered so the -- sender can pick it up immediately instead of waiting for the next poll. create or replace function notify_msg_to_insert() returns trigger as $$ From e7769a6f815ebe72d0198ce8b9f34ff0b057cc2a Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sun, 10 May 2026 16:27:56 +0800 Subject: [PATCH 5/9] triggers to set psha256 when sending which was never being set --- dd.sql | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/dd.sql b/dd.sql index e5ff65c..aecf515 100644 --- a/dd.sql +++ b/dd.sql @@ -62,6 +62,45 @@ create table if not exists msg_attachment ( primary key (msg_id, filename) ); +-- keep protocol parent hash populated for locally-created replies that set +-- the relational parent id. Do not overwrite an existing psha256 because +-- incoming protocol messages already carry the parent hash explicitly. +create or replace function populate_msg_psha256_from_pid() returns trigger as $$ +begin + if NEW.pid is not null and (NEW.psha256 is null or octet_length(NEW.psha256) = 0) then + select parent.sha256 + into NEW.psha256 + from msg parent + where parent.id = NEW.pid; + end if; + return NEW; +end; +$$ language plpgsql; + +drop trigger if exists trg_msg_populate_psha256 on msg; +create trigger trg_msg_populate_psha256 + before insert or update of pid on msg + for each row execute function populate_msg_psha256_from_pid(); + +-- if a parent message hash is populated after child rows already exist, +-- fill any missing child protocol parent hashes. +create or replace function backfill_child_psha256_from_parent_sha256() returns trigger as $$ +begin + if NEW.sha256 is not null then + update msg + set psha256 = NEW.sha256 + where pid = NEW.id + and (psha256 is null or octet_length(psha256) = 0); + end if; + return NEW; +end; +$$ language plpgsql; + +drop trigger if exists trg_msg_backfill_child_psha256 on msg; +create trigger trg_msg_backfill_child_psha256 + after insert or update of sha256 on msg + for each row execute function backfill_child_psha256_from_parent_sha256(); + -- notify when a new msg_to row is inserted with null time_delivered so the -- sender can pick it up immediately instead of waiting for the next poll. create or replace function notify_msg_to_insert() returns trigger as $$ From 42b81a69ae0a535bc1ac34b8787792e1345f9938 Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sun, 10 May 2026 17:11:25 +0800 Subject: [PATCH 6/9] rm backfill, contrain updating pid and sha256 --- dd.sql | 70 +++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/dd.sql b/dd.sql index aecf515..ef559f0 100644 --- a/dd.sql +++ b/dd.sql @@ -63,43 +63,73 @@ create table if not exists msg_attachment ( ); -- keep protocol parent hash populated for locally-created replies that set --- the relational parent id. Do not overwrite an existing psha256 because --- incoming protocol messages already carry the parent hash explicitly. +-- the relational parent id. A reply cannot reference a draft parent, and any +-- explicit psha256 must match the referenced parent's sha256. create or replace function populate_msg_psha256_from_pid() returns trigger as $$ +declare + parent_time_sent double precision; + parent_sha256 bytea; begin - if NEW.pid is not null and (NEW.psha256 is null or octet_length(NEW.psha256) = 0) then - select parent.sha256 - into NEW.psha256 - from msg parent - where parent.id = NEW.pid; + if NEW.pid is null then + return NEW; end if; + + select parent.time_sent, parent.sha256 + into parent_time_sent, parent_sha256 + from msg parent + where parent.id = NEW.pid; + + if not found then + raise exception 'parent message % does not exist', NEW.pid; + end if; + + if parent_time_sent is null then + raise exception 'cannot set pid %: parent message is a draft', NEW.pid; + end if; + + if parent_sha256 is null or octet_length(parent_sha256) = 0 then + raise exception 'cannot set pid %: parent message has no sha256', NEW.pid; + end if; + + if NEW.psha256 is null or octet_length(NEW.psha256) = 0 then + NEW.psha256 = parent_sha256; + elsif NEW.psha256 <> parent_sha256 then + raise exception 'psha256 does not match parent message % sha256', NEW.pid; + end if; + return NEW; end; $$ language plpgsql; drop trigger if exists trg_msg_populate_psha256 on msg; create trigger trg_msg_populate_psha256 - before insert or update of pid on msg + before insert or update of pid, psha256 on msg for each row execute function populate_msg_psha256_from_pid(); --- if a parent message hash is populated after child rows already exist, --- fill any missing child protocol parent hashes. -create or replace function backfill_child_psha256_from_parent_sha256() returns trigger as $$ +-- once a message has replies, it must remain referenceable by protocol hash. +create or replace function prevent_referenced_msg_from_becoming_unreferenceable() returns trigger as $$ begin - if NEW.sha256 is not null then - update msg - set psha256 = NEW.sha256 - where pid = NEW.id - and (psha256 is null or octet_length(psha256) = 0); + if exists (select 1 from msg child where child.pid = NEW.id) then + if NEW.time_sent is null then + raise exception 'cannot make message % a draft: it has replies', NEW.id; + end if; + + if NEW.sha256 is null or octet_length(NEW.sha256) = 0 then + raise exception 'cannot clear sha256 for message %: it has replies', NEW.id; + end if; + + if OLD.sha256 is distinct from NEW.sha256 then + raise exception 'cannot change sha256 for message %: it has replies', NEW.id; + end if; end if; return NEW; end; $$ language plpgsql; -drop trigger if exists trg_msg_backfill_child_psha256 on msg; -create trigger trg_msg_backfill_child_psha256 - after insert or update of sha256 on msg - for each row execute function backfill_child_psha256_from_parent_sha256(); +drop trigger if exists trg_msg_prevent_unreferenceable_parent on msg; +create trigger trg_msg_prevent_unreferenceable_parent + before update of time_sent, sha256 on msg + for each row execute function prevent_referenced_msg_from_becoming_unreferenceable(); -- notify when a new msg_to row is inserted with null time_delivered so the -- sender can pick it up immediately instead of waiting for the next poll. From e5923a7d9434eea241037df5cb6ce21c72e44050 Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sun, 10 May 2026 17:34:59 +0800 Subject: [PATCH 7/9] fix add-to pid should always reference message recipients being added to - not its parent --- src/store.go | 1180 +++++++++++++++++++++++---------------------- src/store_test.go | 20 + 2 files changed, 612 insertions(+), 588 deletions(-) diff --git a/src/store.go b/src/store.go index f2380f7..b2689f2 100644 --- a/src/store.go +++ b/src/store.go @@ -1,588 +1,592 @@ -package main - -import ( - "database/sql" - "fmt" - "log" - "strings" - - "github.com/levenlabs/golib/timeutil" - _ "github.com/lib/pq" -) - -func testDb() error { - db, err := sql.Open("postgres", "") - if err != nil { - return err - } - defer db.Close() - err = db.Ping() - if err != nil { - return err - } - - var dbName, user, host, port string - _ = db.QueryRow("SELECT current_database()").Scan(&dbName) - _ = db.QueryRow("SELECT current_user").Scan(&user) - _ = db.QueryRow("SELECT inet_server_addr()::text").Scan(&host) - _ = db.QueryRow("SELECT inet_server_port()::text").Scan(&port) - log.Printf("INFO: Database connected: %s@%s:%s/%s", user, host, port, dbName) - - // verify required tables exist - for _, table := range []string{"msg", "msg_to", "msg_add_to", "msg_attachment"} { - var exists bool - err = db.QueryRow(`SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = $1 - )`, table).Scan(&exists) - if err != nil { - return fmt.Errorf("checking table %s: %w", table, err) - } - if !exists { - return fmt.Errorf("required table %s does not exist", table) - } - } - return nil -} - -// lookupMsgIdByHash returns the msg id for a message with the given SHA256 hash, -// or 0 if no such message exists. -func lookupMsgIdByHash(hash []byte) (int64, error) { - db, err := sql.Open("postgres", "") - if err != nil { - return 0, err - } - defer db.Close() - - var id int64 - err = db.QueryRow("SELECT id FROM msg WHERE sha256 = $1", hash).Scan(&id) - if err == sql.ErrNoRows { - return 0, nil - } - return id, err -} - -// hasAddrReceivedMsgHash reports whether addr has already received a stored -// message identified by hash. -func hasAddrReceivedMsgHash(hash []byte, addr *FMsgAddress) (bool, error) { - if addr == nil || len(hash) == 0 { - return false, nil - } - - db, err := sql.Open("postgres", "") - if err != nil { - return false, err - } - defer db.Close() - - addrStr := strings.ToLower(addr.ToString()) - - var exists bool - err = db.QueryRow(` - SELECT EXISTS ( - SELECT 1 - FROM msg m - JOIN msg_to mt ON mt.msg_id = m.id - WHERE m.sha256 = $1 - AND lower(mt.addr) = $2 - AND mt.time_delivered IS NOT NULL - UNION ALL - SELECT 1 - FROM msg m - JOIN msg_add_to mat ON mat.msg_id = m.id - WHERE m.sha256 = $1 - AND lower(mat.addr) = $2 - AND mat.time_delivered IS NOT NULL - ) - `, hash, addrStr).Scan(&exists) - if err != nil { - return false, err - } - - return exists, nil -} - -type parentLinkStore interface { - lookupParentID(parentHash []byte) (int64, error) - setParentID(msgID int64, parentID int64) error - setPendingChildrenParentID(parentID int64, parentHash []byte) error -} - -type txParentLinkStore struct { - tx *sql.Tx -} - -func (s txParentLinkStore) lookupParentID(parentHash []byte) (int64, error) { - var id int64 - err := s.tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", parentHash).Scan(&id) - if err == sql.ErrNoRows { - return 0, nil - } - return id, err -} - -func (s txParentLinkStore) setParentID(msgID int64, parentID int64) error { - _, err := s.tx.Exec("UPDATE msg SET pid = $1 WHERE id = $2", parentID, msgID) - return err -} - -func (s txParentLinkStore) setPendingChildrenParentID(parentID int64, parentHash []byte) error { - _, err := s.tx.Exec("UPDATE msg SET pid = $1 WHERE psha256 = $2 AND pid IS NULL", parentID, parentHash) - return err -} - -func resolveStoredParent(store parentLinkStore, msgID int64, parentHash []byte, requireParent bool) error { - if len(parentHash) == 0 { - return nil - } - - parentID, err := store.lookupParentID(parentHash) - if err != nil { - return err - } - if parentID == 0 { - if requireParent { - return fmt.Errorf("parent message not found for psha256 %x", parentHash) - } - return nil - } - - return store.setParentID(msgID, parentID) -} - -func resolvePendingChildLinks(store parentLinkStore, parentID int64, parentHash []byte) error { - if len(parentHash) == 0 { - return nil - } - return store.setPendingChildrenParentID(parentID, parentHash) -} - -func resolveMsgParentLinks(tx *sql.Tx, msgID int64, msgHash []byte, parentHash []byte, requireParent bool) error { - store := txParentLinkStore{tx: tx} - if err := resolveStoredParent(store, msgID, parentHash, requireParent); err != nil { - return err - } - return resolvePendingChildLinks(store, msgID, msgHash) -} - -func requiresStoredParent(msg *FMsgHeader) bool { - return len(msg.Pid) > 0 && msg.Flags&FlagHasAddTo == 0 -} - -// getMsgByID loads a message and all its recipients from the database by msg ID. -// Returns the full FMsgHeader or nil if the message doesn't exist. -func getMsgByID(msgID int64) (*FMsgHeader, error) { - db, err := sql.Open("postgres", "") - if err != nil { - return nil, err - } - defer db.Close() - - tx, err := db.Begin() - if err != nil { - return nil, err - } - defer tx.Rollback() - - h, err := loadMsg(tx, msgID) - if err != nil { - // If the message doesn't exist, loadMsg will return an error, - // but we want to distinguish "not found" from other errors - if err.Error() == "no rows in result set" || err == sql.ErrNoRows { - return nil, nil - } - return nil, err - } - - return h, nil -} - -func storeMsgDetail(msg *FMsgHeader) error { - - db, err := sql.Open("postgres", "") - if err != nil { - return err - } - defer db.Close() - - tx, err := db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - msgHash, err := msg.GetMessageHash() - if err != nil { - return err - } - - var addToFrom interface{} - if msg.AddToFrom != nil { - addToFrom = msg.AddToFrom.ToString() - } - - var msgID int64 - err = tx.QueryRow(`insert into msg (version - , no_reply - , is_important - , is_deflate - , time_sent - , from_addr - , add_to_from - , topic - , type - , sha256 - , psha256 - , size - , filepath) -values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) -returning id`, - msg.Version, - msg.Flags&FlagNoReply != 0, - msg.Flags&FlagImportant != 0, - msg.Flags&FlagDeflate != 0, - msg.Timestamp, - msg.From.ToString(), - addToFrom, - msg.Topic, - msg.Type, - msgHash, - msg.Pid, - int(msg.Size), - msg.Filepath).Scan(&msgID) - if err != nil { - return err - } - - stmt, err := tx.Prepare(`insert into msg_to (msg_id, addr, time_delivered) -values ($1, $2, $3)`) - if err != nil { - return err - } - defer stmt.Close() - - now := timeutil.TimestampNow().Float64() - for _, addr := range msg.To { - // recipients on our domain are already delivered; others are pending - var delivered interface{} - if addr.Domain == Domain { - delivered = now - } - if _, err := stmt.Exec(msgID, addr.ToString(), delivered); err != nil { - return err - } - } - - // insert add-to recipients into msg_add_to - if len(msg.AddTo) > 0 { - addToStmt, err := tx.Prepare(`insert into msg_add_to (msg_id, addr, time_delivered) -values ($1, $2, $3)`) - if err != nil { - return err - } - defer addToStmt.Close() - - for _, addr := range msg.AddTo { - var delivered interface{} - if addr.Domain == Domain { - delivered = now - } - if _, err := addToStmt.Exec(msgID, addr.ToString(), delivered); err != nil { - return err - } - } - } - - if len(msg.Attachments) > 0 { - attStmt, err := tx.Prepare(`insert into msg_attachment (msg_id, position, flags, type, filename, filesize, filepath) -values ($1, $2, $3, $4, $5, $6, $7)`) - if err != nil { - return err - } - defer attStmt.Close() - - for i := range msg.Attachments { - att := msg.Attachments[i] - if _, err := attStmt.Exec(msgID, i, int(att.Flags), att.Type, att.Filename, int(att.Size), att.Filepath); err != nil { - return err - } - } - } - - if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil { - return err - } - - return tx.Commit() - -} - -// storeMsgHeaderOnly stores just the message header for add-to notifications -// (spec code 11). Only the header is recorded so the header hash can be -// faithfully computed for subsequent messages referencing this one via pid. -func storeMsgHeaderOnly(msg *FMsgHeader) error { - db, err := sql.Open("postgres", "") - if err != nil { - return err - } - defer db.Close() - - tx, err := db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - msgHash, err := msg.GetMessageHash() - if err != nil { - return err - } - - var addToFrom interface{} - if msg.AddToFrom != nil { - addToFrom = msg.AddToFrom.ToString() - } - - var msgID int64 - err = tx.QueryRow(`insert into msg (version - , no_reply - , is_important - , is_deflate - , time_sent - , from_addr - , add_to_from - , topic - , type - , sha256 - , psha256 - , size - , filepath) -values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) -returning id`, - msg.Version, - msg.Flags&FlagNoReply != 0, - msg.Flags&FlagImportant != 0, - msg.Flags&FlagDeflate != 0, - msg.Timestamp, - msg.From.ToString(), - addToFrom, - msg.Topic, - msg.Type, - msgHash, - msg.Pid, - int(msg.Size), - "").Scan(&msgID) - if err != nil { - return err - } - - // insert to recipients (for record keeping) - toStmt, err := tx.Prepare(`insert into msg_to (msg_id, addr) values ($1, $2)`) - if err != nil { - return err - } - defer toStmt.Close() - for _, addr := range msg.To { - if _, err := toStmt.Exec(msgID, addr.ToString()); err != nil { - return err - } - } - - // insert add-to recipients - if len(msg.AddTo) > 0 { - addToStmt, err := tx.Prepare(`insert into msg_add_to (msg_id, addr) values ($1, $2)`) - if err != nil { - return err - } - defer addToStmt.Close() - for _, addr := range msg.AddTo { - if _, err := addToStmt.Exec(msgID, addr.ToString()); err != nil { - return err - } - } - } - - if len(msg.Attachments) > 0 { - attStmt, err := tx.Prepare(`insert into msg_attachment (msg_id, position, flags, type, filename, filesize, filepath) -values ($1, $2, $3, $4, $5, $6, $7)`) - if err != nil { - return err - } - defer attStmt.Close() - - for i := range msg.Attachments { - att := msg.Attachments[i] - if _, err := attStmt.Exec(msgID, i, int(att.Flags), att.Type, att.Filename, int(att.Size), att.Filepath); err != nil { - return err - } - } - } - - if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil { - return err - } - - return tx.Commit() -} - -// loadMsg loads a message and all its recipients from the database within the -// given transaction and returns a fully populated FMsgHeader. -func loadMsg(tx *sql.Tx, msgID int64) (*FMsgHeader, error) { - var version, size int - var noReply, isImportant, isDeflate bool - var pid, msgHash []byte - var fromAddr, topic, typ, filepath string - var addToFromAddr sql.NullString - var timeSent float64 - err := tx.QueryRow(` - SELECT version, no_reply, is_important, is_deflate, psha256, sha256, from_addr, add_to_from, topic, type, time_sent, size, filepath - FROM msg WHERE id = $1 - `, msgID).Scan(&version, &noReply, &isImportant, &isDeflate, &pid, &msgHash, &fromAddr, &addToFromAddr, &topic, &typ, &timeSent, &size, &filepath) - if err != nil { - return nil, fmt.Errorf("load msg %d: %w", msgID, err) - } - - recipRows, err := tx.Query(`SELECT addr FROM msg_to WHERE msg_id = $1 ORDER BY id`, msgID) - if err != nil { - return nil, fmt.Errorf("load recipients for msg %d: %w", msgID, err) - } - var allRecipientAddrs []string - for recipRows.Next() { - var a string - if err := recipRows.Scan(&a); err != nil { - recipRows.Close() - return nil, fmt.Errorf("scan recipient addr: %w", err) - } - allRecipientAddrs = append(allRecipientAddrs, a) - } - recipRows.Close() - if err := recipRows.Err(); err != nil { - return nil, fmt.Errorf("recipients query err for msg %d: %w", msgID, err) - } - - from, err := parseAddress([]byte(fromAddr)) - if err != nil { - return nil, fmt.Errorf("invalid from address %s: %w", fromAddr, err) - } - allTo := make([]FMsgAddress, 0, len(allRecipientAddrs)) - for _, a := range allRecipientAddrs { - addr, err := parseAddress([]byte(a)) - if err != nil { - return nil, fmt.Errorf("invalid to address %s: %w", a, err) - } - allTo = append(allTo, *addr) - } - - // load add-to recipients from msg_add_to - addToRows, err := tx.Query(`SELECT addr FROM msg_add_to WHERE msg_id = $1 ORDER BY id`, msgID) - if err != nil { - return nil, fmt.Errorf("load add-to recipients for msg %d: %w", msgID, err) - } - var allAddTo []FMsgAddress - for addToRows.Next() { - var a string - if err := addToRows.Scan(&a); err != nil { - addToRows.Close() - return nil, fmt.Errorf("scan add-to addr: %w", err) - } - addr, err := parseAddress([]byte(a)) - if err != nil { - addToRows.Close() - return nil, fmt.Errorf("invalid add-to address %s: %w", a, err) - } - allAddTo = append(allAddTo, *addr) - } - addToRows.Close() - if err := addToRows.Err(); err != nil { - return nil, fmt.Errorf("add-to recipients query err for msg %d: %w", msgID, err) - } - - attRows, err := tx.Query(` - SELECT flags, type, filename, filesize, filepath - FROM msg_attachment - WHERE msg_id = $1 - ORDER BY position, filename - `, msgID) - if err != nil { - return nil, fmt.Errorf("load attachments for msg %d: %w", msgID, err) - } - attachments := []FMsgAttachmentHeader{} - for attRows.Next() { - var flags, filesize int - var typ, filename, filepath string - if err := attRows.Scan(&flags, &typ, &filename, &filesize, &filepath); err != nil { - attRows.Close() - return nil, fmt.Errorf("scan attachment row: %w", err) - } - attachments = append(attachments, FMsgAttachmentHeader{ - Flags: uint8(flags), - Type: typ, - Filename: filename, - Size: uint32(filesize), - Filepath: filepath, - }) - } - attRows.Close() - if err := attRows.Err(); err != nil { - return nil, fmt.Errorf("attachments query err for msg %d: %w", msgID, err) - } - - // Compute flags bitfield from stored booleans and loaded data. - // has_pid and has_add_to are derived from actual data rather than stored, - // so add-to recipients added after the original message are included. - // - // When add-to recipients exist on a root message (no pid), set pid to the - // message's own hash so the wire format is valid: spec requires pid when - // add-to is present. This turns the outgoing message into an add-to - // notification referencing the original message. - if len(allAddTo) > 0 && len(pid) == 0 { - pid = msgHash - } - - var addToFrom *FMsgAddress - if addToFromAddr.Valid && addToFromAddr.String != "" { - addr, err := parseAddress([]byte(addToFromAddr.String)) - if err != nil { - return nil, fmt.Errorf("invalid add_to_from address %s: %w", addToFromAddr.String, err) - } - addToFrom = addr - } - if len(allAddTo) > 0 && addToFrom == nil { - // Backward-compatibility for older rows before add_to_from existed. - fallback := *from - addToFrom = &fallback - } - - var flags uint8 - if len(pid) > 0 { - flags |= FlagHasPid - } - if len(allAddTo) > 0 { - flags |= FlagHasAddTo - } - if noReply { - flags |= FlagNoReply - } - if isImportant { - flags |= FlagImportant - } - if isDeflate { - flags |= FlagDeflate - } - - return &FMsgHeader{ - Version: uint8(version), - Flags: flags, - Pid: pid, - From: *from, - To: allTo, - AddToFrom: addToFrom, - AddTo: allAddTo, - Timestamp: timeSent, - Topic: topic, - Type: typ, - Size: uint32(size), - Attachments: attachments, - Filepath: filepath, - }, nil -} +package main + +import ( + "database/sql" + "fmt" + "log" + "strings" + + "github.com/levenlabs/golib/timeutil" + _ "github.com/lib/pq" +) + +func testDb() error { + db, err := sql.Open("postgres", "") + if err != nil { + return err + } + defer db.Close() + err = db.Ping() + if err != nil { + return err + } + + var dbName, user, host, port string + _ = db.QueryRow("SELECT current_database()").Scan(&dbName) + _ = db.QueryRow("SELECT current_user").Scan(&user) + _ = db.QueryRow("SELECT inet_server_addr()::text").Scan(&host) + _ = db.QueryRow("SELECT inet_server_port()::text").Scan(&port) + log.Printf("INFO: Database connected: %s@%s:%s/%s", user, host, port, dbName) + + // verify required tables exist + for _, table := range []string{"msg", "msg_to", "msg_add_to", "msg_attachment"} { + var exists bool + err = db.QueryRow(`SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = $1 + )`, table).Scan(&exists) + if err != nil { + return fmt.Errorf("checking table %s: %w", table, err) + } + if !exists { + return fmt.Errorf("required table %s does not exist", table) + } + } + return nil +} + +// lookupMsgIdByHash returns the msg id for a message with the given SHA256 hash, +// or 0 if no such message exists. +func lookupMsgIdByHash(hash []byte) (int64, error) { + db, err := sql.Open("postgres", "") + if err != nil { + return 0, err + } + defer db.Close() + + var id int64 + err = db.QueryRow("SELECT id FROM msg WHERE sha256 = $1", hash).Scan(&id) + if err == sql.ErrNoRows { + return 0, nil + } + return id, err +} + +// hasAddrReceivedMsgHash reports whether addr has already received a stored +// message identified by hash. +func hasAddrReceivedMsgHash(hash []byte, addr *FMsgAddress) (bool, error) { + if addr == nil || len(hash) == 0 { + return false, nil + } + + db, err := sql.Open("postgres", "") + if err != nil { + return false, err + } + defer db.Close() + + addrStr := strings.ToLower(addr.ToString()) + + var exists bool + err = db.QueryRow(` + SELECT EXISTS ( + SELECT 1 + FROM msg m + JOIN msg_to mt ON mt.msg_id = m.id + WHERE m.sha256 = $1 + AND lower(mt.addr) = $2 + AND mt.time_delivered IS NOT NULL + UNION ALL + SELECT 1 + FROM msg m + JOIN msg_add_to mat ON mat.msg_id = m.id + WHERE m.sha256 = $1 + AND lower(mat.addr) = $2 + AND mat.time_delivered IS NOT NULL + ) + `, hash, addrStr).Scan(&exists) + if err != nil { + return false, err + } + + return exists, nil +} + +type parentLinkStore interface { + lookupParentID(parentHash []byte) (int64, error) + setParentID(msgID int64, parentID int64) error + setPendingChildrenParentID(parentID int64, parentHash []byte) error +} + +type txParentLinkStore struct { + tx *sql.Tx +} + +func (s txParentLinkStore) lookupParentID(parentHash []byte) (int64, error) { + var id int64 + err := s.tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", parentHash).Scan(&id) + if err == sql.ErrNoRows { + return 0, nil + } + return id, err +} + +func (s txParentLinkStore) setParentID(msgID int64, parentID int64) error { + _, err := s.tx.Exec("UPDATE msg SET pid = $1 WHERE id = $2", parentID, msgID) + return err +} + +func (s txParentLinkStore) setPendingChildrenParentID(parentID int64, parentHash []byte) error { + _, err := s.tx.Exec("UPDATE msg SET pid = $1 WHERE psha256 = $2 AND pid IS NULL", parentID, parentHash) + return err +} + +func resolveStoredParent(store parentLinkStore, msgID int64, parentHash []byte, requireParent bool) error { + if len(parentHash) == 0 { + return nil + } + + parentID, err := store.lookupParentID(parentHash) + if err != nil { + return err + } + if parentID == 0 { + if requireParent { + return fmt.Errorf("parent message not found for psha256 %x", parentHash) + } + return nil + } + + return store.setParentID(msgID, parentID) +} + +func resolvePendingChildLinks(store parentLinkStore, parentID int64, parentHash []byte) error { + if len(parentHash) == 0 { + return nil + } + return store.setPendingChildrenParentID(parentID, parentHash) +} + +func resolveMsgParentLinks(tx *sql.Tx, msgID int64, msgHash []byte, parentHash []byte, requireParent bool) error { + store := txParentLinkStore{tx: tx} + if err := resolveStoredParent(store, msgID, parentHash, requireParent); err != nil { + return err + } + return resolvePendingChildLinks(store, msgID, msgHash) +} + +func requiresStoredParent(msg *FMsgHeader) bool { + return len(msg.Pid) > 0 && msg.Flags&FlagHasAddTo == 0 +} + +func wirePidForLoadedMessage(storedParentHash []byte, msgHash []byte, hasAddTo bool) []byte { + if hasAddTo { + return msgHash + } + return storedParentHash +} + +// getMsgByID loads a message and all its recipients from the database by msg ID. +// Returns the full FMsgHeader or nil if the message doesn't exist. +func getMsgByID(msgID int64) (*FMsgHeader, error) { + db, err := sql.Open("postgres", "") + if err != nil { + return nil, err + } + defer db.Close() + + tx, err := db.Begin() + if err != nil { + return nil, err + } + defer tx.Rollback() + + h, err := loadMsg(tx, msgID) + if err != nil { + // If the message doesn't exist, loadMsg will return an error, + // but we want to distinguish "not found" from other errors + if err.Error() == "no rows in result set" || err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + + return h, nil +} + +func storeMsgDetail(msg *FMsgHeader) error { + + db, err := sql.Open("postgres", "") + if err != nil { + return err + } + defer db.Close() + + tx, err := db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + msgHash, err := msg.GetMessageHash() + if err != nil { + return err + } + + var addToFrom interface{} + if msg.AddToFrom != nil { + addToFrom = msg.AddToFrom.ToString() + } + + var msgID int64 + err = tx.QueryRow(`insert into msg (version + , no_reply + , is_important + , is_deflate + , time_sent + , from_addr + , add_to_from + , topic + , type + , sha256 + , psha256 + , size + , filepath) +values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) +returning id`, + msg.Version, + msg.Flags&FlagNoReply != 0, + msg.Flags&FlagImportant != 0, + msg.Flags&FlagDeflate != 0, + msg.Timestamp, + msg.From.ToString(), + addToFrom, + msg.Topic, + msg.Type, + msgHash, + msg.Pid, + int(msg.Size), + msg.Filepath).Scan(&msgID) + if err != nil { + return err + } + + stmt, err := tx.Prepare(`insert into msg_to (msg_id, addr, time_delivered) +values ($1, $2, $3)`) + if err != nil { + return err + } + defer stmt.Close() + + now := timeutil.TimestampNow().Float64() + for _, addr := range msg.To { + // recipients on our domain are already delivered; others are pending + var delivered interface{} + if addr.Domain == Domain { + delivered = now + } + if _, err := stmt.Exec(msgID, addr.ToString(), delivered); err != nil { + return err + } + } + + // insert add-to recipients into msg_add_to + if len(msg.AddTo) > 0 { + addToStmt, err := tx.Prepare(`insert into msg_add_to (msg_id, addr, time_delivered) +values ($1, $2, $3)`) + if err != nil { + return err + } + defer addToStmt.Close() + + for _, addr := range msg.AddTo { + var delivered interface{} + if addr.Domain == Domain { + delivered = now + } + if _, err := addToStmt.Exec(msgID, addr.ToString(), delivered); err != nil { + return err + } + } + } + + if len(msg.Attachments) > 0 { + attStmt, err := tx.Prepare(`insert into msg_attachment (msg_id, position, flags, type, filename, filesize, filepath) +values ($1, $2, $3, $4, $5, $6, $7)`) + if err != nil { + return err + } + defer attStmt.Close() + + for i := range msg.Attachments { + att := msg.Attachments[i] + if _, err := attStmt.Exec(msgID, i, int(att.Flags), att.Type, att.Filename, int(att.Size), att.Filepath); err != nil { + return err + } + } + } + + if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil { + return err + } + + return tx.Commit() + +} + +// storeMsgHeaderOnly stores just the message header for add-to notifications +// (spec code 11). Only the header is recorded so the header hash can be +// faithfully computed for subsequent messages referencing this one via pid. +func storeMsgHeaderOnly(msg *FMsgHeader) error { + db, err := sql.Open("postgres", "") + if err != nil { + return err + } + defer db.Close() + + tx, err := db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + msgHash, err := msg.GetMessageHash() + if err != nil { + return err + } + + var addToFrom interface{} + if msg.AddToFrom != nil { + addToFrom = msg.AddToFrom.ToString() + } + + var msgID int64 + err = tx.QueryRow(`insert into msg (version + , no_reply + , is_important + , is_deflate + , time_sent + , from_addr + , add_to_from + , topic + , type + , sha256 + , psha256 + , size + , filepath) +values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) +returning id`, + msg.Version, + msg.Flags&FlagNoReply != 0, + msg.Flags&FlagImportant != 0, + msg.Flags&FlagDeflate != 0, + msg.Timestamp, + msg.From.ToString(), + addToFrom, + msg.Topic, + msg.Type, + msgHash, + msg.Pid, + int(msg.Size), + "").Scan(&msgID) + if err != nil { + return err + } + + // insert to recipients (for record keeping) + toStmt, err := tx.Prepare(`insert into msg_to (msg_id, addr) values ($1, $2)`) + if err != nil { + return err + } + defer toStmt.Close() + for _, addr := range msg.To { + if _, err := toStmt.Exec(msgID, addr.ToString()); err != nil { + return err + } + } + + // insert add-to recipients + if len(msg.AddTo) > 0 { + addToStmt, err := tx.Prepare(`insert into msg_add_to (msg_id, addr) values ($1, $2)`) + if err != nil { + return err + } + defer addToStmt.Close() + for _, addr := range msg.AddTo { + if _, err := addToStmt.Exec(msgID, addr.ToString()); err != nil { + return err + } + } + } + + if len(msg.Attachments) > 0 { + attStmt, err := tx.Prepare(`insert into msg_attachment (msg_id, position, flags, type, filename, filesize, filepath) +values ($1, $2, $3, $4, $5, $6, $7)`) + if err != nil { + return err + } + defer attStmt.Close() + + for i := range msg.Attachments { + att := msg.Attachments[i] + if _, err := attStmt.Exec(msgID, i, int(att.Flags), att.Type, att.Filename, int(att.Size), att.Filepath); err != nil { + return err + } + } + } + + if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil { + return err + } + + return tx.Commit() +} + +// loadMsg loads a message and all its recipients from the database within the +// given transaction and returns a fully populated FMsgHeader. +func loadMsg(tx *sql.Tx, msgID int64) (*FMsgHeader, error) { + var version, size int + var noReply, isImportant, isDeflate bool + var pid, msgHash []byte + var fromAddr, topic, typ, filepath string + var addToFromAddr sql.NullString + var timeSent float64 + err := tx.QueryRow(` + SELECT version, no_reply, is_important, is_deflate, psha256, sha256, from_addr, add_to_from, topic, type, time_sent, size, filepath + FROM msg WHERE id = $1 + `, msgID).Scan(&version, &noReply, &isImportant, &isDeflate, &pid, &msgHash, &fromAddr, &addToFromAddr, &topic, &typ, &timeSent, &size, &filepath) + if err != nil { + return nil, fmt.Errorf("load msg %d: %w", msgID, err) + } + + recipRows, err := tx.Query(`SELECT addr FROM msg_to WHERE msg_id = $1 ORDER BY id`, msgID) + if err != nil { + return nil, fmt.Errorf("load recipients for msg %d: %w", msgID, err) + } + var allRecipientAddrs []string + for recipRows.Next() { + var a string + if err := recipRows.Scan(&a); err != nil { + recipRows.Close() + return nil, fmt.Errorf("scan recipient addr: %w", err) + } + allRecipientAddrs = append(allRecipientAddrs, a) + } + recipRows.Close() + if err := recipRows.Err(); err != nil { + return nil, fmt.Errorf("recipients query err for msg %d: %w", msgID, err) + } + + from, err := parseAddress([]byte(fromAddr)) + if err != nil { + return nil, fmt.Errorf("invalid from address %s: %w", fromAddr, err) + } + allTo := make([]FMsgAddress, 0, len(allRecipientAddrs)) + for _, a := range allRecipientAddrs { + addr, err := parseAddress([]byte(a)) + if err != nil { + return nil, fmt.Errorf("invalid to address %s: %w", a, err) + } + allTo = append(allTo, *addr) + } + + // load add-to recipients from msg_add_to + addToRows, err := tx.Query(`SELECT addr FROM msg_add_to WHERE msg_id = $1 ORDER BY id`, msgID) + if err != nil { + return nil, fmt.Errorf("load add-to recipients for msg %d: %w", msgID, err) + } + var allAddTo []FMsgAddress + for addToRows.Next() { + var a string + if err := addToRows.Scan(&a); err != nil { + addToRows.Close() + return nil, fmt.Errorf("scan add-to addr: %w", err) + } + addr, err := parseAddress([]byte(a)) + if err != nil { + addToRows.Close() + return nil, fmt.Errorf("invalid add-to address %s: %w", a, err) + } + allAddTo = append(allAddTo, *addr) + } + addToRows.Close() + if err := addToRows.Err(); err != nil { + return nil, fmt.Errorf("add-to recipients query err for msg %d: %w", msgID, err) + } + + attRows, err := tx.Query(` + SELECT flags, type, filename, filesize, filepath + FROM msg_attachment + WHERE msg_id = $1 + ORDER BY position, filename + `, msgID) + if err != nil { + return nil, fmt.Errorf("load attachments for msg %d: %w", msgID, err) + } + attachments := []FMsgAttachmentHeader{} + for attRows.Next() { + var flags, filesize int + var typ, filename, filepath string + if err := attRows.Scan(&flags, &typ, &filename, &filesize, &filepath); err != nil { + attRows.Close() + return nil, fmt.Errorf("scan attachment row: %w", err) + } + attachments = append(attachments, FMsgAttachmentHeader{ + Flags: uint8(flags), + Type: typ, + Filename: filename, + Size: uint32(filesize), + Filepath: filepath, + }) + } + attRows.Close() + if err := attRows.Err(); err != nil { + return nil, fmt.Errorf("attachments query err for msg %d: %w", msgID, err) + } + + // Compute flags bitfield from stored booleans and loaded data. + // has_pid and has_add_to are derived from actual data rather than stored, + // so add-to recipients added after the original message are included. + // + // When add-to recipients exist, the wire pid references the message being + // shared, not that message's parent. This keeps add-to on replies pointing + // at the reply payload rather than the root message. + pid = wirePidForLoadedMessage(pid, msgHash, len(allAddTo) > 0) + + var addToFrom *FMsgAddress + if addToFromAddr.Valid && addToFromAddr.String != "" { + addr, err := parseAddress([]byte(addToFromAddr.String)) + if err != nil { + return nil, fmt.Errorf("invalid add_to_from address %s: %w", addToFromAddr.String, err) + } + addToFrom = addr + } + if len(allAddTo) > 0 && addToFrom == nil { + // Backward-compatibility for older rows before add_to_from existed. + fallback := *from + addToFrom = &fallback + } + + var flags uint8 + if len(pid) > 0 { + flags |= FlagHasPid + } + if len(allAddTo) > 0 { + flags |= FlagHasAddTo + } + if noReply { + flags |= FlagNoReply + } + if isImportant { + flags |= FlagImportant + } + if isDeflate { + flags |= FlagDeflate + } + + return &FMsgHeader{ + Version: uint8(version), + Flags: flags, + Pid: pid, + From: *from, + To: allTo, + AddToFrom: addToFrom, + AddTo: allAddTo, + Timestamp: timeSent, + Topic: topic, + Type: typ, + Size: uint32(size), + Attachments: attachments, + Filepath: filepath, + }, nil +} diff --git a/src/store_test.go b/src/store_test.go index d37abfc..6bfdf98 100644 --- a/src/store_test.go +++ b/src/store_test.go @@ -117,3 +117,23 @@ func TestRequiresStoredParentUsesAddToFlag(t *testing.T) { t.Fatal("add-to message required stored parent") } } + +func TestWirePidForLoadedMessageAddToReferencesSharedMessage(t *testing.T) { + parentHash := []byte{1, 2, 3} + msgHash := []byte{4, 5, 6} + + got := wirePidForLoadedMessage(parentHash, msgHash, true) + if !bytes.Equal(got, msgHash) { + t.Fatalf("add-to wire pid = %v, want message hash %v", got, msgHash) + } +} + +func TestWirePidForLoadedMessageReplyKeepsParentHash(t *testing.T) { + parentHash := []byte{1, 2, 3} + msgHash := []byte{4, 5, 6} + + got := wirePidForLoadedMessage(parentHash, msgHash, false) + if !bytes.Equal(got, parentHash) { + t.Fatalf("reply wire pid = %v, want parent hash %v", got, parentHash) + } +} From 167c768a506f12bd64f8dcc0b2f8ad44b93bb6f3 Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sun, 10 May 2026 17:45:10 +0800 Subject: [PATCH 8/9] backoff even wheh early terminate, record response_code as -1 --- src/sender.go | 42 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/src/sender.go b/src/sender.go index 043ac1d..a33a1e8 100644 --- a/src/sender.go +++ b/src/sender.go @@ -22,6 +22,16 @@ var RetryMaxAge float64 = 86400 var PollInterval = 10 var MaxConcurrentSend = 1024 +// localResponseCodeNoResponse is stored only in the database; it is not an fmsg protocol response code. +const localResponseCodeNoResponse = -1 + +var retryableResponseCodes = []int16{ + int16(localResponseCodeNoResponse), + int16(RejectCodeUndisclosed), + int16(RejectCodeInsufficentResources), + int16(RejectCodeUserFull), +} + func loadSenderEnvConfig() { RetryInterval = env.GetFloatDefault("FMSG_RETRY_INTERVAL", 20) RetryMaxAge = env.GetFloatDefault("FMSG_RETRY_MAX_AGE", 86400) @@ -54,7 +64,7 @@ func findPendingTargets() ([]pendingTarget, error) { INNER JOIN msg m ON m.id = mt.msg_id WHERE mt.time_delivered IS NULL AND m.time_sent IS NOT NULL - AND (mt.response_code IS NULL OR mt.response_code IN (3, 5, 101)) + AND (mt.response_code IS NULL OR mt.response_code = ANY($4)) AND (mt.time_last_attempt IS NULL OR ($1 - mt.time_last_attempt) > LEAST($2 * POWER(2.0, GREATEST(mt.attempt_count - 1, 0)::float), $3)) AND ($1 - m.time_sent) < $3 UNION ALL @@ -63,10 +73,10 @@ func findPendingTargets() ([]pendingTarget, error) { INNER JOIN msg m ON m.id = mat.msg_id WHERE mat.time_delivered IS NULL AND m.time_sent IS NOT NULL - AND (mat.response_code IS NULL OR mat.response_code IN (3, 5, 101)) + AND (mat.response_code IS NULL OR mat.response_code = ANY($4)) AND (mat.time_last_attempt IS NULL OR ($1 - mat.time_last_attempt) > LEAST($2 * POWER(2.0, GREATEST(mat.attempt_count - 1, 0)::float), $3)) AND ($1 - m.time_sent) < $3 - `, now, RetryInterval, RetryMaxAge) + `, now, RetryInterval, RetryMaxAge, pq.Array(retryableResponseCodes)) if err != nil { return nil, err } @@ -169,6 +179,12 @@ func commitOrLog(tx *sql.Tx, committed *bool, msgID int64) { } } +func recordRetryableFailure(tx *sql.Tx, committed *bool, lockedAddrs, lockedAddToAddrs []string, msgID int64) { + now := timeutil.TimestampNow().Float64() + updateAllLocked(tx, lockedAddrs, lockedAddToAddrs, msgID, now, localResponseCodeNoResponse, false) + commitOrLog(tx, committed, msgID) +} + // deliverMessage handles delivery of a single message to a single remote domain. // // It manages its own database transaction with the following lifecycle: @@ -177,7 +193,7 @@ func commitOrLog(tx *sql.Tx, committed *bool, msgID int64) { // - Sends the complete original message to the remote host. // - On success: updates time_delivered + response_code, commits. // - On rejection (got response code): updates response_code + time_last_attempt, commits. -// - On error (no response): logs error, rolls back โ€” delivery retried in future. +// - On early delivery error: records a retryable failure, commits, and backs off. func deliverMessage(target pendingTarget) { if strings.EqualFold(target.Domain, Domain) { // local domain โ€” mark as delivered rather than sending remotely @@ -228,11 +244,11 @@ func deliverMessage(target pendingTarget) { WHERE mt.msg_id = $1 AND mt.time_delivered IS NULL AND m.time_sent IS NOT NULL - AND (mt.response_code IS NULL OR mt.response_code IN (3, 5, 101)) + AND (mt.response_code IS NULL OR mt.response_code = ANY($5)) AND (mt.time_last_attempt IS NULL OR ($2 - mt.time_last_attempt) > LEAST($3 * POWER(2.0, GREATEST(mt.attempt_count - 1, 0)::float), $4)) AND ($2 - m.time_sent) < $4 FOR UPDATE OF mt SKIP LOCKED - `, target.MsgID, now, RetryInterval, RetryMaxAge) + `, target.MsgID, now, RetryInterval, RetryMaxAge, pq.Array(retryableResponseCodes)) if err != nil { log.Printf("ERROR: sender: lock rows for msg %d: %s", target.MsgID, err) return @@ -265,11 +281,11 @@ func deliverMessage(target pendingTarget) { WHERE mat.msg_id = $1 AND mat.time_delivered IS NULL AND m.time_sent IS NOT NULL - AND (mat.response_code IS NULL OR mat.response_code IN (3, 5, 101)) + AND (mat.response_code IS NULL OR mat.response_code = ANY($5)) AND (mat.time_last_attempt IS NULL OR ($2 - mat.time_last_attempt) > LEAST($3 * POWER(2.0, GREATEST(mat.attempt_count - 1, 0)::float), $4)) AND ($2 - m.time_sent) < $4 FOR UPDATE OF mat SKIP LOCKED - `, target.MsgID, now, RetryInterval, RetryMaxAge) + `, target.MsgID, now, RetryInterval, RetryMaxAge, pq.Array(retryableResponseCodes)) if err != nil { log.Printf("ERROR: sender: lock add-to rows for msg %d: %s", target.MsgID, err) return @@ -298,6 +314,13 @@ func deliverMessage(target pendingTarget) { return // already locked by another sender or no longer eligible } + deferRetry := true + defer func() { + if deferRetry && !committed { + recordRetryableFailure(tx, &committed, lockedAddrs, lockedAddToAddrs, target.MsgID) + } + }() + // Load the full message from msg table h, err := loadMsg(tx, target.MsgID) if err != nil { @@ -471,6 +494,7 @@ func deliverMessage(target pendingTarget) { } log.Printf("INFO: sender: msg %d add-to accepted by %s (code 11)", target.MsgID, target.Domain) updateAllLocked(tx, lockedAddrs, lockedAddToAddrs, target.MsgID, now, int(initCode[0]), true) + deferRetry = false commitOrLog(tx, &committed, target.MsgID) return default: @@ -479,6 +503,7 @@ func deliverMessage(target pendingTarget) { log.Printf("WARN: sender: msg %d rejected by %s: %s (%d)", target.MsgID, target.Domain, responseCodeName(initCode[0]), initCode[0]) updateAllLocked(tx, lockedAddrs, lockedAddToAddrs, target.MsgID, now, int(initCode[0]), false) + deferRetry = false commitOrLog(tx, &committed, target.MsgID) } else { // unexpected code โ€” TERMINATE @@ -516,6 +541,7 @@ func deliverMessage(target pendingTarget) { updateRecipient(tx, table, dr.addr, target.MsgID, now, int(c), delivered) } + deferRetry = false commitOrLog(tx, &committed, target.MsgID) } From dc8eca4e4bd3f72c0b97fc8318b5b7149f90b586 Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sun, 10 May 2026 17:51:37 +0800 Subject: [PATCH 9/9] env file location in systemd e.g. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8b1f677..5c9019c 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ An example systemd service to run fmsgd as a service on startup ASSUMES: * Directory `/opt/fmsgd` has been created and contains built executable: `fmsgd` -* Text file `/opt/fmsgd/env` exists containing environment variables (example below) +* Text file `/etc/fmsgd/env` exists containing environment variables (example below) * User `fmsg` has been created and has - read and execute permissions to `/opt/fmsgd/`, e.g. with `chown -R fmsg:fmsg /opt/fmsgd` after `mkdir /opt/fmsgd` - write permissions to FMSG_DATA_DIR @@ -85,7 +85,7 @@ Type=simple User=fmsg Group=fmsg -EnvironmentFile=/opt/fmsgd/env +EnvironmentFile=/etc/fmsgd/env ExecStart=/opt/fmsgd/fmsgd 0.0.0.0 WorkingDirectory=/opt/fmsgd