Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ public class MeteredKeyValueStore<K, V>
protected NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));


@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
private final Map<Class<?>, QueryHandler<?>> queryHandlers =
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we cannot have safe types, make it explicit using <?> (similar elsewhere)

mkMap(
mkEntry(
RangeQuery.class,
Expand Down Expand Up @@ -230,31 +229,29 @@ record -> {

@SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {

public <R> QueryResult<R> query(
final Query<R> query,
final PositionBound positionBound,
final QueryConfig config
) {
final long start = time.nanoseconds();
final QueryResult<R> result;

final QueryHandler handler = queryHandlers.get(query.getClass());
final QueryHandler<?> handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
result.addExecutionInfo("Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
}
} else {
result = (QueryResult<R>) handler.apply(
result = ((QueryHandler<R>) handler).apply(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That we have improved type check on the the QueryHandler, we need to cast the handler, and get the correct result type automatically.

This is still cleaner. The issue is with the queryHandlers Map that introduces the missing types, so the "problem" is not at the right place in the code.

(similar elsewhere in this PR)

query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " with serdes "
+ serdes + " in " + (time.nanoseconds() - start) + "ns");
result.addExecutionInfo("Handled in " + getClass() + " with serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,13 @@ public class MeteredSessionStore<K, V>
protected final LongAdder numOpenIterators = new LongAdder();
protected final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));

@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
mkMap(
mkEntry(
WindowRangeQuery.class,
(query, positionBound, config, store) -> runRangeQuery(query, positionBound, config)
)
);
private final Map<Class<?>, QueryHandler<?>> queryHandlers =
mkMap(
mkEntry(
WindowRangeQuery.class,
(query, positionBound, config, store) -> runRangeQuery(query, positionBound, config)
)
);


MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
Expand Down Expand Up @@ -445,39 +444,40 @@ public void close() {

@SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
public <R> QueryResult<R> query(
final Query<R> query,
final PositionBound positionBound,
final QueryConfig config
) {
final long start = time.nanoseconds();
final QueryResult<R> result;

final QueryHandler handler = queryHandlers.get(query.getClass());
final QueryHandler<?> handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
result.addExecutionInfo("Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
}
} else {
result = (QueryResult<R>) handler.apply(
result = ((QueryHandler<R>) handler).apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " with serdes "
+ serdes + " in " + (time.nanoseconds() - start) + "ns");
result.addExecutionInfo("Handled in " + getClass() + " with serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}

@SuppressWarnings("unchecked")
private <R> QueryResult<R> runRangeQuery(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
private <R> QueryResult<R> runRangeQuery(
final Query<R> query,
final PositionBound positionBound,
final QueryConfig config
) {
final QueryResult<R> result;
final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
if (typedQuery.getKey().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;

import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -67,26 +68,25 @@ public class MeteredTimestampedKeyValueStore<K, V>
super(inner, metricScope, time, keySerde, valueSerde);
}

@SuppressWarnings("rawtypes")
private final Map<Class, StoreQueryUtils.QueryHandler> queryHandlers =
mkMap(
mkEntry(
RangeQuery.class,
(query, positionBound, config, store) -> runRangeQuery(query, positionBound, config)
),
mkEntry(
TimestampedRangeQuery.class,
(query, positionBound, config, store) -> runTimestampedRangeQuery(query, positionBound, config)
),
mkEntry(
KeyQuery.class,
(query, positionBound, config, store) -> runKeyQuery(query, positionBound, config)
),
mkEntry(
TimestampedKeyQuery.class,
(query, positionBound, config, store) -> runTimestampedKeyQuery(query, positionBound, config)
)
);
private final Map<Class<?>, QueryHandler<?>> queryHandlers =
mkMap(
mkEntry(
RangeQuery.class,
(query, positionBound, config, store) -> runRangeQuery(query, positionBound, config)
),
mkEntry(
TimestampedRangeQuery.class,
(query, positionBound, config, store) -> runTimestampedRangeQuery(query, positionBound, config)
),
mkEntry(
KeyQuery.class,
(query, positionBound, config, store) -> runKeyQuery(query, positionBound, config)
),
mkEntry(
TimestampedKeyQuery.class,
(query, positionBound, config, store) -> runTimestampedKeyQuery(query, positionBound, config)
)
);

@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -134,40 +134,40 @@ static class RawAndDeserializedValue<ValueType> {

@SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {

public <R> QueryResult<R> query(
final Query<R> query,
final PositionBound positionBound,
final QueryConfig config
) {
final long start = time.nanoseconds();
final QueryResult<R> result;

final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass());
final QueryHandler<?> handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
result.addExecutionInfo("Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
}
} else {
result = (QueryResult<R>) handler.apply(
query,
positionBound,
config,
this
result = ((QueryHandler<R>) handler).apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " with serdes "
+ serdes + " in " + (time.nanoseconds() - start) + "ns");
result.addExecutionInfo("Handled in " + getClass() + " with serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}

@SuppressWarnings("unchecked")
private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
private <R> QueryResult<R> runTimestampedKeyQuery(
final Query<R> query,
final PositionBound positionBound,
final QueryConfig config
) {
final QueryResult<R> result;
final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key()));
Expand All @@ -176,7 +176,7 @@ private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
final Function<byte[], ValueAndTimestamp<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult());
final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp);
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
Expand All @@ -186,10 +186,11 @@ private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
}

@SuppressWarnings("unchecked")
private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {

private <R> QueryResult<R> runTimestampedRangeQuery(
final Query<R> query,
final PositionBound positionBound,
final QueryConfig config
) {
final QueryResult<R> result;
final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query;
RangeQuery<Bytes, byte[]> rawRangeQuery;
Expand All @@ -205,20 +206,21 @@ private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator(
final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
(KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator(
iterator,
getSensor,
StoreQueryUtils.deserializeValue(serdes, wrapped()),
false
);
);
final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
rawResult,
resultIterator
);
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
rawResult,
resultIterator
);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
Expand All @@ -228,9 +230,11 @@ private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
}

@SuppressWarnings("unchecked")
private <R> QueryResult<R> runKeyQuery(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
private <R> QueryResult<R> runKeyQuery(
final Query<R> query,
final PositionBound positionBound,
final QueryConfig config
) {
final QueryResult<R> result;
final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey()));
Expand All @@ -241,7 +245,7 @@ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult());
final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value();
final QueryResult<V> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, plainValue);
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, plainValue);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
Expand All @@ -251,10 +255,11 @@ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
}

@SuppressWarnings("unchecked")
private <R> QueryResult<R> runRangeQuery(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {

private <R> QueryResult<R> runRangeQuery(
final Query<R> query,
final PositionBound positionBound,
final QueryConfig config
) {
final QueryResult<R> result;
final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
RangeQuery<Bytes, byte[]> rawRangeQuery;
Expand All @@ -270,7 +275,7 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreIterator(
Expand All @@ -280,10 +285,10 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
true
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
rawResult,
resultIterator
);
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
rawResult,
resultIterator
);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
Expand Down
Loading
Loading