Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ An example systemd service to run fmsgd as a service on startup

ASSUMES:
* Directory `/opt/fmsgd` has been created and contains built executable: `fmsgd`
* Text file `/opt/fmsgd/env` exists containing environment variables (example below)
* Text file `/etc/fmsgd/env` exists containing environment variables (example below)
* User `fmsg` has been created and has
- read and execute permissions to `/opt/fmsgd/`, e.g. with `chown -R fmsg:fmsg /opt/fmsgd` after `mkdir /opt/fmsgd`
- write permissions to FMSG_DATA_DIR
Expand All @@ -85,7 +85,7 @@ Type=simple
User=fmsg
Group=fmsg

EnvironmentFile=/opt/fmsgd/env
EnvironmentFile=/etc/fmsgd/env

ExecStart=/opt/fmsgd/fmsgd 0.0.0.0
WorkingDirectory=/opt/fmsgd
Expand Down
69 changes: 69 additions & 0 deletions dd.sql
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,75 @@ create table if not exists msg_attachment (
primary key (msg_id, filename)
);

-- keep protocol parent hash populated for locally-created replies that set
-- the relational parent id. A reply cannot reference a draft parent, and any
-- explicit psha256 must match the referenced parent's sha256.
create or replace function populate_msg_psha256_from_pid() returns trigger as $$
declare
parent_time_sent double precision;
parent_sha256 bytea;
begin
if NEW.pid is null then
return NEW;
end if;

select parent.time_sent, parent.sha256
into parent_time_sent, parent_sha256
from msg parent
where parent.id = NEW.pid;

if not found then
raise exception 'parent message % does not exist', NEW.pid;
end if;

if parent_time_sent is null then
raise exception 'cannot set pid %: parent message is a draft', NEW.pid;
end if;

if parent_sha256 is null or octet_length(parent_sha256) = 0 then
raise exception 'cannot set pid %: parent message has no sha256', NEW.pid;
end if;

if NEW.psha256 is null or octet_length(NEW.psha256) = 0 then
NEW.psha256 = parent_sha256;
elsif NEW.psha256 <> parent_sha256 then
raise exception 'psha256 does not match parent message % sha256', NEW.pid;
end if;

return NEW;
end;
$$ language plpgsql;

drop trigger if exists trg_msg_populate_psha256 on msg;
create trigger trg_msg_populate_psha256
before insert or update of pid, psha256 on msg
for each row execute function populate_msg_psha256_from_pid();

-- once a message has replies, it must remain referenceable by protocol hash.
create or replace function prevent_referenced_msg_from_becoming_unreferenceable() returns trigger as $$
begin
if exists (select 1 from msg child where child.pid = NEW.id) then
if NEW.time_sent is null then
raise exception 'cannot make message % a draft: it has replies', NEW.id;
end if;

if NEW.sha256 is null or octet_length(NEW.sha256) = 0 then
raise exception 'cannot clear sha256 for message %: it has replies', NEW.id;
end if;

if OLD.sha256 is distinct from NEW.sha256 then
raise exception 'cannot change sha256 for message %: it has replies', NEW.id;
end if;
end if;
return NEW;
end;
$$ language plpgsql;

drop trigger if exists trg_msg_prevent_unreferenceable_parent on msg;
create trigger trg_msg_prevent_unreferenceable_parent
before update of time_sent, sha256 on msg
for each row execute function prevent_referenced_msg_from_becoming_unreferenceable();

-- notify when a new msg_to row is inserted with null time_delivered so the
-- sender can pick it up immediately instead of waiting for the next poll.
create or replace function notify_msg_to_insert() returns trigger as $$
Expand Down
37 changes: 31 additions & 6 deletions src/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,19 @@ func abortConn(c net.Conn) {
_ = c.Close()
}

type responseTrackingConn struct {
net.Conn
wroteResponse bool
}

func (c *responseTrackingConn) Write(b []byte) (int, error) {
n, err := c.Conn.Write(b)
if n > 0 {
c.wroteResponse = true
}
return n, err
}

func handleConn(c net.Conn) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -1553,11 +1566,16 @@ func handleConn(c net.Conn) {
}()

log.Printf("INFO: Connection from: %s\n", c.RemoteAddr().String())
tc := &responseTrackingConn{Conn: c}

// read header
header, r, err := readHeader(c)
header, r, err := readHeader(tc)
if err != nil {
log.Printf("WARN: reading header from, %s: %s", c.RemoteAddr().String(), err)
if tc.wroteResponse {
_ = c.Close()
return
}
abortConn(c)
return
}
Expand All @@ -1568,7 +1586,7 @@ func handleConn(c net.Conn) {
return
}

if err := challenge(c, header, determineSenderDomain(header)); err != nil {
if err := challenge(tc, header, determineSenderDomain(header)); err != nil {
log.Printf("ERROR: Challenge failed to, %s: %s", c.RemoteAddr().String(), err)
abortConn(c)
return
Expand All @@ -1584,8 +1602,11 @@ func handleConn(c net.Conn) {
allLocalDup, err = allLocalRecipientsHaveMessageHash(header.ChallengeHash[:], addrs)
if err != nil {
log.Printf("ERROR: duplicate check failed for %s: %s", c.RemoteAddr().String(), err)
_ = sendCode(c, RejectCodeUndisclosed)
abortConn(c)
if err := sendCode(c, RejectCodeUndisclosed); err != nil {
abortConn(c)
return
}
_ = c.Close()
return
}
}
Expand All @@ -1598,8 +1619,11 @@ func handleConn(c net.Conn) {
// No local add-to recipients; store header and respond code 11, close.
if err := storeMsgHeaderOnly(header); err != nil {
log.Printf("ERROR: storing add-to header: %s", err)
_ = sendCode(c, RejectCodeUndisclosed)
abortConn(c)
if err := sendCode(c, RejectCodeUndisclosed); err != nil {
abortConn(c)
return
}
_ = c.Close()
return
}
if err := sendCode(c, AcceptCodeAddTo); err != nil {
Expand Down Expand Up @@ -1643,6 +1667,7 @@ func handleConn(c net.Conn) {
// if error was a protocal violation, abort; otherise let sender know there was an internal error
log.Printf("ERROR: Download failed from, %s: %s", c.RemoteAddr().String(), err)
if errors.Is(err, ErrProtocolViolation) {
abortConn(c)
return
} else {
_ = sendCode(c, RejectCodeUndisclosed)
Expand Down
46 changes: 38 additions & 8 deletions src/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ var RetryMaxAge float64 = 86400
var PollInterval = 10
var MaxConcurrentSend = 1024

// localResponseCodeNoResponse is stored only in the database; it is not an fmsg protocol response code.
const localResponseCodeNoResponse = -1

var retryableResponseCodes = []int16{
int16(localResponseCodeNoResponse),
int16(RejectCodeUndisclosed),
int16(RejectCodeInsufficentResources),
int16(RejectCodeUserFull),
}

func loadSenderEnvConfig() {
RetryInterval = env.GetFloatDefault("FMSG_RETRY_INTERVAL", 20)
RetryMaxAge = env.GetFloatDefault("FMSG_RETRY_MAX_AGE", 86400)
Expand Down Expand Up @@ -54,7 +64,7 @@ func findPendingTargets() ([]pendingTarget, error) {
INNER JOIN msg m ON m.id = mt.msg_id
WHERE mt.time_delivered IS NULL
AND m.time_sent IS NOT NULL
AND (mt.response_code IS NULL OR mt.response_code IN (3, 5, 101))
AND (mt.response_code IS NULL OR mt.response_code = ANY($4))
AND (mt.time_last_attempt IS NULL OR ($1 - mt.time_last_attempt) > LEAST($2 * POWER(2.0, GREATEST(mt.attempt_count - 1, 0)::float), $3))
AND ($1 - m.time_sent) < $3
UNION ALL
Expand All @@ -63,10 +73,10 @@ func findPendingTargets() ([]pendingTarget, error) {
INNER JOIN msg m ON m.id = mat.msg_id
WHERE mat.time_delivered IS NULL
AND m.time_sent IS NOT NULL
AND (mat.response_code IS NULL OR mat.response_code IN (3, 5, 101))
AND (mat.response_code IS NULL OR mat.response_code = ANY($4))
AND (mat.time_last_attempt IS NULL OR ($1 - mat.time_last_attempt) > LEAST($2 * POWER(2.0, GREATEST(mat.attempt_count - 1, 0)::float), $3))
AND ($1 - m.time_sent) < $3
`, now, RetryInterval, RetryMaxAge)
`, now, RetryInterval, RetryMaxAge, pq.Array(retryableResponseCodes))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,6 +179,12 @@ func commitOrLog(tx *sql.Tx, committed *bool, msgID int64) {
}
}

func recordRetryableFailure(tx *sql.Tx, committed *bool, lockedAddrs, lockedAddToAddrs []string, msgID int64) {
now := timeutil.TimestampNow().Float64()
updateAllLocked(tx, lockedAddrs, lockedAddToAddrs, msgID, now, localResponseCodeNoResponse, false)
commitOrLog(tx, committed, msgID)
}

// deliverMessage handles delivery of a single message to a single remote domain.
//
// It manages its own database transaction with the following lifecycle:
Expand All @@ -177,7 +193,7 @@ func commitOrLog(tx *sql.Tx, committed *bool, msgID int64) {
// - Sends the complete original message to the remote host.
// - On success: updates time_delivered + response_code, commits.
// - On rejection (got response code): updates response_code + time_last_attempt, commits.
// - On error (no response): logs error, rolls back — delivery retried in future.
// - On early delivery error: records a retryable failure, commits, and backs off.
func deliverMessage(target pendingTarget) {
if strings.EqualFold(target.Domain, Domain) {
// local domain — mark as delivered rather than sending remotely
Expand Down Expand Up @@ -228,11 +244,11 @@ func deliverMessage(target pendingTarget) {
WHERE mt.msg_id = $1
AND mt.time_delivered IS NULL
AND m.time_sent IS NOT NULL
AND (mt.response_code IS NULL OR mt.response_code IN (3, 5, 101))
AND (mt.response_code IS NULL OR mt.response_code = ANY($5))
AND (mt.time_last_attempt IS NULL OR ($2 - mt.time_last_attempt) > LEAST($3 * POWER(2.0, GREATEST(mt.attempt_count - 1, 0)::float), $4))
AND ($2 - m.time_sent) < $4
FOR UPDATE OF mt SKIP LOCKED
`, target.MsgID, now, RetryInterval, RetryMaxAge)
`, target.MsgID, now, RetryInterval, RetryMaxAge, pq.Array(retryableResponseCodes))
if err != nil {
log.Printf("ERROR: sender: lock rows for msg %d: %s", target.MsgID, err)
return
Expand Down Expand Up @@ -265,11 +281,11 @@ func deliverMessage(target pendingTarget) {
WHERE mat.msg_id = $1
AND mat.time_delivered IS NULL
AND m.time_sent IS NOT NULL
AND (mat.response_code IS NULL OR mat.response_code IN (3, 5, 101))
AND (mat.response_code IS NULL OR mat.response_code = ANY($5))
AND (mat.time_last_attempt IS NULL OR ($2 - mat.time_last_attempt) > LEAST($3 * POWER(2.0, GREATEST(mat.attempt_count - 1, 0)::float), $4))
AND ($2 - m.time_sent) < $4
FOR UPDATE OF mat SKIP LOCKED
`, target.MsgID, now, RetryInterval, RetryMaxAge)
`, target.MsgID, now, RetryInterval, RetryMaxAge, pq.Array(retryableResponseCodes))
if err != nil {
log.Printf("ERROR: sender: lock add-to rows for msg %d: %s", target.MsgID, err)
return
Expand Down Expand Up @@ -298,6 +314,13 @@ func deliverMessage(target pendingTarget) {
return // already locked by another sender or no longer eligible
}

deferRetry := true
defer func() {
if deferRetry && !committed {
recordRetryableFailure(tx, &committed, lockedAddrs, lockedAddToAddrs, target.MsgID)
}
}()

// Load the full message from msg table
h, err := loadMsg(tx, target.MsgID)
if err != nil {
Expand Down Expand Up @@ -355,6 +378,10 @@ func deliverMessage(target pendingTarget) {
log.Printf("ERROR: sender: storing sha256 for msg %d: %s", target.MsgID, err)
return
}
if err := resolvePendingChildLinks(txParentLinkStore{tx: tx}, target.MsgID, msgHash); err != nil {
log.Printf("ERROR: sender: resolving child pids for msg %d: %s", target.MsgID, err)
return
}

// Compute header hash now; registerOutgoing with Host B's IP happens after
// the connection is established (IP needed for challenge validation §10.5).
Expand Down Expand Up @@ -467,6 +494,7 @@ func deliverMessage(target pendingTarget) {
}
log.Printf("INFO: sender: msg %d add-to accepted by %s (code 11)", target.MsgID, target.Domain)
updateAllLocked(tx, lockedAddrs, lockedAddToAddrs, target.MsgID, now, int(initCode[0]), true)
deferRetry = false
commitOrLog(tx, &committed, target.MsgID)
return
default:
Expand All @@ -475,6 +503,7 @@ func deliverMessage(target pendingTarget) {
log.Printf("WARN: sender: msg %d rejected by %s: %s (%d)",
target.MsgID, target.Domain, responseCodeName(initCode[0]), initCode[0])
updateAllLocked(tx, lockedAddrs, lockedAddToAddrs, target.MsgID, now, int(initCode[0]), false)
deferRetry = false
commitOrLog(tx, &committed, target.MsgID)
} else {
// unexpected code — TERMINATE
Expand Down Expand Up @@ -512,6 +541,7 @@ func deliverMessage(target pendingTarget) {
updateRecipient(tx, table, dr.addr, target.MsgID, now, int(c), delivered)
}

deferRetry = false
commitOrLog(tx, &committed, target.MsgID)
}

Expand Down
Loading
Loading