Skip to content

Conversation

@minal-kyada
Copy link

Summary

Implements replica-side write threshold warnings to detect and warn about writes to large partitions or partitions with many tombstones. This follows the same architecture as the existing read threshold warnings but is warning-only (no blocking behavior).

Related Work

This implementation follows the pattern established by read threshold warnings but adapts it for write operations with a warning-only approach.

patch by Minal Kyada reviewed by TBD for CASSANDRA-17258


public volatile boolean write_thresholds_enabled = false;
public volatile DataStorageSpec.LongBytesBound write_size_warn_threshold = null;
public volatile int write_tombstone_warn_threshold = 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use -1 for disabled. The reason that

    public volatile int tombstone_warn_threshold = 1000;
    public volatile int tombstone_failure_threshold = 100000;

have values is that we retro fit those old behaviors into read_thresholds, which fixed their behavior to work better. They are the only configs enabled by default and have been enabled for years.

read thresholds has been around for awhile, so I might send a patch to enable things by default, but that still won't change this patch as this patch adds a new feature; so should be off by default.

{
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
totalTombstones += update.affectedRowCount();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method doesn't have anything to do with tombstones

{
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
totalSize += update.dataSize();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the metric we actually want? this is how large a single mutation is, but we won't warn that its writing to a partition thats "too large"?

Comment on lines +661 to +662
maybeWarnWriteSize(mutations, options);
maybeWarnWriteTombstones(mutations, options);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this logic here rather than in the write coordination like read is? Wasn't the assumption that we check replica state which wouldn't be possible here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pre-flight check at the co-ordinator side, it's tracking large write operations, I added this because today we do have a pre-check on large read operations, and we take actions against those calls, should we have a similar implementation for write too?

Additionally, I have implemented replica side write threshold check too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do have a pre-check on large read operations

are those not different checks? There is read_size which is replica, and coordinator_read_size which is how much the coordinator used; this check has 2 very different definitions and won't be clear from client point of view what happens if these checks actually return a warn

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the co-ordinator side checks can be guardrailed for read and write both. Removing these changes from this PR,a s it needs another one.

Comment on lines +40 to +42
Message<NoPayload> reply = respondTo.emptyResponse();
reply = MessageParams.addToMessage(reply);
MessagingService.instance().send(reply, respondToAddress);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can make this a single line

MessagingService.instance().send(MessageParams.addToMessage(respondTo.emptyResponse()), respondToAddress);

DataStorageSpec.LongBytesBound sizeWarnThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
int tombstoneWarnThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();

if (sizeWarnThreshold == null && tombstoneWarnThreshold == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tombstoneWarnThreshold == -1

Long currentSize = org.apache.cassandra.db.MessageParams.get(ParamType.WRITE_SIZE_WARN);
if (currentSize == null || currentSize < estimatedSize)
{
org.apache.cassandra.db.MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we import this class to make this easier to read?

long estimatedSize = cfs.topPartitions.topSizes().getEstimate(key);
long estimatedTombstones = cfs.topPartitions.topTombstones().getEstimate(key);

if (sizeWarnBytes > 0 && estimatedSize > sizeWarnBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (sizeWarnBytes > 0 && estimatedSize > sizeWarnBytes)
if (sizeWarnBytes != -1 && estimatedSize > sizeWarnBytes)

Should be consistent. -1 is the "not enabled" case, so should check for that

org.apache.cassandra.db.MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);

TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.getString(key.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think toCQLString is better than getString.


TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.getString(key.getKey());
logger.warn("Write to {}.{} partition {} triggered size warning; " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could spam, can we use a NoSpamLogger and only log once a minute?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thanks for highlighting this!


TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.getString(key.getKey());
logger.warn("Write to {}.{} partition {} triggered tombstone warning; " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could spam, can we use a NoSpamLogger and log once a minute?

org.apache.cassandra.db.MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN, (int) estimatedTombstones);

TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.getString(key.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, toCQLString is prob best here

String pk = meta.partitionKeyType.getString(key.getKey());
logger.warn("Write to {}.{} partition {} triggered size warning; " +
"estimated size is {} bytes, threshold is {} bytes (see write_size_warn_threshold)",
meta.keyspace, meta.name, pk, estimatedSize, sizeWarnBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI the toString for TableMetadata is keyspace.name so you could simplify this and remove {}.{} and just do {} and pass in meta. It will also do the correct quoting if needed

String pk = meta.partitionKeyType.getString(key.getKey());
logger.warn("Write to {}.{} partition {} triggered tombstone warning; " +
"estimated tombstone count is {}, threshold is {} (see write_tombstone_warn_threshold)",
meta.keyspace, meta.name, pk, estimatedTombstones, tombstoneWarnThreshold);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI the toString for TableMetadata is keyspace.name so you could simplify this and remove {}.{} and just do {} and pass in meta. It will also do the correct quoting if needed

DataStorageSpec.LongBytesBound sizeWarnThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
int tombstoneWarnThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();

if (sizeWarnThreshold == null && tombstoneWarnThreshold == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (sizeWarnThreshold == null && tombstoneWarnThreshold == 0)
if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1)

{
top.pollLast();
TopPartition p = top.pollLast();
lookup.remove(p.key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you remove from the lookup but you don't add?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, missed the lookup.put() call. Thanks!

if (!Paxos.isInRangeAndShouldProcess(from, agreed.update.partitionKey(), agreed.update.metadata(), false))
return null;

WriteThresholds.checkWriteThresholds(agreed.update, agreed.update.partitionKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im not sure if this would actually work in practice

if (executeOnSelf)
        {
            ExecutorPlus executor = PAXOS_COMMIT_REQ.stage.executor();
            if (async) executor.execute(this::executeOnSelf);
            else executor.maybeExecuteImmediately(this::executeOnSelf);
        }

in some code paths we do a blocking execute, in others we do it async and ignore when it completes

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, we hold onto making these changes in this patch.

* Accumulates warnings from multiple write operations in a single client request,
* then sends them to the client and updates metrics.
*/
public class CoordinatorWriteWarnings
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a lot of overlap with CoordinatorWarnings, can we refactor so read/write can use 80% the same logic and only the read/write related changes need to be custom?

Comment on lines +34 to +35
maxWarningValue.accumulateAndGet(value, Math::max);
warnings.add(from);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't thread safe... what is the concurrency expecations for this method?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants