Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected final Optional<KeySchema> indexKeySchema;
private final long retentionPeriod;

protected StateStoreContext stateStoreContext;
protected InternalProcessorContext<?, ?> internalProcessorContext;
private Sensor expiredRecordSensor;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
Expand Down Expand Up @@ -145,7 +146,7 @@ void putIndex(final Bytes indexKey, final byte[] value) {

final long timestamp = indexKeySchema.get().segmentTimestamp(indexKey);
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);

if (segment != null) {
segment.put(indexKey, value);
Expand All @@ -159,7 +160,7 @@ byte[] getIndex(final Bytes indexKey) {

final long timestamp = indexKeySchema.get().segmentTimestamp(indexKey);
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);

if (segment != null) {
return segment.get(indexKey);
Expand All @@ -174,7 +175,7 @@ void removeIndex(final Bytes indexKey) {

final long timestamp = indexKeySchema.get().segmentTimestamp(indexKey);
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);

if (segment != null) {
segment.delete(indexKey);
Expand All @@ -187,14 +188,16 @@ public void put(final Bytes rawBaseKey,
final long timestamp = baseKeySchema.segmentTimestamp(rawBaseKey);
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);

if (segment == null) {
expiredRecordSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
if (internalProcessorContext != null) {
expiredRecordSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
}
LOG.warn("Skipping record for expired segment.");
} else {
synchronized (position) {
StoreQueryUtils.updatePosition(position, internalProcessorContext);
StoreQueryUtils.updatePosition(position, stateStoreContext);

// Put to index first so that if put to base failed, when we iterate index, we will
// find no base value. If put to base first but putting to index fails, when we iterate
Expand Down Expand Up @@ -242,7 +245,13 @@ public String name() {

@Override
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
if (stateStoreContext instanceof InternalProcessorContext) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
this.stateStoreContext = stateStoreContext;
} else {
this.internalProcessorContext = null;
this.stateStoreContext = stateStoreContext;
}

final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
Expand All @@ -259,7 +268,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
segments.setPosition(this.position);

segments.openExisting(internalProcessorContext, observedStreamTime);
segments.openExisting(stateStoreContext, observedStreamTime);

// register and possibly restore the state from the logs
stateStoreContext.register(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private final long retentionPeriod;
private final KeySchema keySchema;

private StateStoreContext stateStoreContext;
private InternalProcessorContext<?, ?> internalProcessorContext;
private Sensor expiredRecordSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
Expand Down Expand Up @@ -250,12 +251,14 @@ public void put(final Bytes key,
final long timestamp = keySchema.segmentTimestamp(key);
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);
if (segment == null) {
expiredRecordSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
if (internalProcessorContext != null) {
expiredRecordSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
}
} else {
synchronized (position) {
StoreQueryUtils.updatePosition(position, internalProcessorContext);
StoreQueryUtils.updatePosition(position, stateStoreContext);
segment.put(key, value);
}
}
Expand Down Expand Up @@ -284,7 +287,13 @@ public String name() {

@Override
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
if (stateStoreContext instanceof InternalProcessorContext) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
this.stateStoreContext = stateStoreContext;
} else {
this.internalProcessorContext = null;
this.stateStoreContext = stateStoreContext;
}

final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
Expand All @@ -300,7 +309,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
segments.setPosition(position);
segments.openExisting(internalProcessorContext, observedStreamTime);
segments.openExisting(stateStoreContext, observedStreamTime);

// register and possibly restore the state from the logs
stateStoreContext.register(
Expand Down Expand Up @@ -372,7 +381,7 @@ Map<S, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], byte[
for (final ConsumerRecord<byte[], byte[]> record : records) {
final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key()));
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);
if (segment != null) {
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
record,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segmentMap = new ConcurrentSkipListMap<>();
private final Set<InMemoryWindowStoreIteratorWrapper> openIterators = ConcurrentHashMap.newKeySet();

private StateStoreContext stateStoreContext;
private InternalProcessorContext<?, ?> internalProcessorContext;
private Sensor expiredRecordSensor;
private int seqnum = 0;
Expand Down Expand Up @@ -103,8 +104,14 @@ public String name() {
@Override
public void init(final StateStoreContext stateStoreContext,
final StateStore root) {
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);

if (stateStoreContext instanceof InternalProcessorContext) {
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
this.stateStoreContext = stateStoreContext;
} else {
this.internalProcessorContext = null;
this.stateStoreContext = stateStoreContext;
}

final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
Expand Down Expand Up @@ -155,7 +162,9 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes

synchronized (position) {
if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
expiredRecordSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
if (internalProcessorContext != null) {
expiredRecordSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
}
LOG.warn("Skipping record for expired segment.");
} else {
if (value != null) {
Expand All @@ -175,7 +184,7 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes
}
}

StoreQueryUtils.updatePosition(position, internalProcessorContext);
StoreQueryUtils.updatePosition(position, stateStoreContext);
}
}

Expand Down Expand Up @@ -381,7 +390,7 @@ public <R> QueryResult<R> query(final Query<R> query,
config,
this,
position,
internalProcessorContext
stateStoreContext
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<ConsumerRecord
observedStreamTime = Math.max(observedStreamTime, timestamp);
minTimestamp = Math.min(minTimestamp, timestamp);
final long segmentId = segments.segmentId(timestamp);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);
if (segment != null) {
//null segment is if it has expired, so we don't want those records
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ Map<KeyValueSegment, WriteBatch> getWriteBatches(
for (final ConsumerRecord<byte[], byte[]> record : records) {
final long timestamp = SessionKeySchema.extractEndTimestamp(record.key());
final long segmentId = segments.segmentId(timestamp);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);
if (segment != null) {
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
record,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Map<KeyValueSegment, WriteBatch> getWriteBatches(
for (final ConsumerRecord<byte[], byte[]> record : records) {
final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key());
final long segmentId = segments.segmentId(timestamp);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, stateStoreContext, observedStreamTime);
if (segment != null) {
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
record,
Expand Down
Loading