Skip to content

Commit d22e51a

Browse files
authored
Added support for Lettuce reactive Redis commands (#788)
1 parent 6594727 commit d22e51a

File tree

43 files changed

+1928
-15
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1928
-15
lines changed

.github/workflows/plugins-jdk17-test.1.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ jobs:
8080
- c3p0-0.9.2.x-0.10.x-scenario
8181
- spring-scheduled-6.x-scenario
8282
- caffeine-3.x-scenario
83+
- lettuce-webflux-6x-scenario
8384
steps:
8485
- uses: actions/checkout@v2
8586
with:

.github/workflows/plugins-test.1.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ jobs:
8888
- kotlin-coroutine-scenario
8989
- lettuce-scenario
9090
- lettuce-6.5.x-scenario
91+
- lettuce-webflux-5x-scenario
9192
- mongodb-3.x-scenario
9293
- mongodb-4.x-scenario
9394
- netty-socketio-scenario

CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Release Notes.
55
9.7.0
66
------------------
77

8-
8+
* Added support for Lettuce reactive Redis commands.
99

1010
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1)
1111

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.v5;
20+
21+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
22+
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
23+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
24+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
27+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
28+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
29+
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.Mono;
31+
import reactor.util.context.Context;
32+
33+
import java.lang.reflect.Method;
34+
import java.util.function.Function;
35+
36+
/**
37+
* Intercepts reactive publisher factory methods (createMono/createFlux)
38+
* to ensure the SkyWalking context snapshot is propagated via Reactor Context.
39+
*
40+
* <p>If the Reactor Context does not already contain a snapshot, this interceptor
41+
* captures the current active context and writes it into the subscriber context
42+
* as a fallback propagation mechanism.</p>
43+
*/
44+
public class RedisReactiveCreatePublisherMethodInterceptorV5 implements InstanceMethodsAroundInterceptorV2 {
45+
46+
private static final String SNAPSHOT_KEY = "SKYWALKING_CONTEXT_SNAPSHOT";
47+
48+
@Override
49+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
50+
MethodInvocationContext context) {
51+
}
52+
53+
@Override
54+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) {
55+
56+
if (!(ret instanceof Mono) && !(ret instanceof Flux)) {
57+
return ret;
58+
}
59+
final AbstractSpan localSpan = ContextManager.createLocalSpan("Lettuce/Reactive/" + method.getName());
60+
localSpan.setComponent(ComponentsDefine.LETTUCE);
61+
SpanLayer.asCache(localSpan);
62+
63+
try {
64+
final ContextSnapshot snapshot = ContextManager.capture();
65+
66+
Function<Context, Context> contextFunction = ctx -> {
67+
if (ctx.hasKey(SNAPSHOT_KEY)) {
68+
return ctx;
69+
}
70+
return ctx.put(SNAPSHOT_KEY, snapshot);
71+
};
72+
73+
if (ret instanceof Mono) {
74+
Mono<?> original = (Mono<?>) ret;
75+
return original.subscriberContext(contextFunction);
76+
} else {
77+
Flux<?> original = (Flux<?>) ret;
78+
return original.subscriberContext(contextFunction);
79+
}
80+
} finally {
81+
ContextManager.stopSpan();
82+
}
83+
}
84+
85+
@Override
86+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[]
87+
argumentsTypes, Throwable t, MethodInvocationContext context) {
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.v5.define;
20+
21+
import net.bytebuddy.description.method.MethodDescription;
22+
import net.bytebuddy.matcher.ElementMatcher;
23+
import org.apache.skywalking.apm.agent.core.plugin.WitnessMethod;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
27+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
28+
29+
import java.util.Collections;
30+
import java.util.List;
31+
32+
import static net.bytebuddy.matcher.ElementMatchers.named;
33+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
34+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
35+
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
36+
37+
/**
38+
*
39+
*/
40+
public class RedisReactiveCommandsInstrumentationV5 extends ClassInstanceMethodsEnhancePluginDefineV2 {
41+
42+
private static final String ENHANCE_CLASS = "io.lettuce.core.AbstractRedisReactiveCommands";
43+
44+
private static final String REDIS_REACTIVE_CREATEMONO_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v5.RedisReactiveCreatePublisherMethodInterceptorV5";
45+
46+
@Override
47+
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
48+
return new InstanceMethodsInterceptV2Point[]{
49+
new InstanceMethodsInterceptV2Point() {
50+
@Override
51+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
52+
return namedOneOf(
53+
"createMono",
54+
"createFlux",
55+
"createDissolvingFlux"
56+
).and(takesArguments(1));
57+
}
58+
59+
@Override
60+
public String getMethodsInterceptorV2() {
61+
return REDIS_REACTIVE_CREATEMONO_METHOD_INTERCEPTOR;
62+
}
63+
64+
@Override
65+
public boolean isOverrideArgs() {
66+
return false;
67+
}
68+
}
69+
};
70+
}
71+
72+
@Override
73+
public ClassMatch enhanceClass() {
74+
return byHierarchyMatch(ENHANCE_CLASS);
75+
}
76+
77+
@Override
78+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
79+
return new ConstructorInterceptPoint[0];
80+
}
81+
82+
@Override
83+
protected List<WitnessMethod> witnessMethods() {
84+
return Collections.singletonList(new WitnessMethod(
85+
"reactor.core.publisher.Mono",
86+
named("subscriberContext")
87+
));
88+
}
89+
}

apm-sniffer/apm-sdk-plugin/lettuce-plugins/lettuce-5.x-6.4.x-plugin/src/main/resources/skywalking-plugin.def

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
lettuce-5.x-6.4.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisChannelWriterInstrumentationV5
17+
lettuce-5.x-6.4.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisChannelWriterInstrumentationV5
18+
lettuce-5.x-6.4.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisReactiveCommandsInstrumentationV5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.v65;
20+
21+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
22+
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
23+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
24+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
27+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
28+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
29+
import org.reactivestreams.Publisher;
30+
import reactor.core.publisher.Flux;
31+
import reactor.core.publisher.Mono;
32+
33+
import java.lang.reflect.Method;
34+
35+
/**
36+
* Intercepts reactive publisher factory methods (createMono/createFlux)
37+
* to ensure the SkyWalking context snapshot is propagated via Reactor Context.
38+
*
39+
* <p>If the Reactor Context does not already contain a snapshot, this interceptor
40+
* captures the current active context and writes it into the subscriber context
41+
* as a fallback propagation mechanism.</p>
42+
*/
43+
public class RedisReactiveCreatePublisherMethodInterceptorV65 implements InstanceMethodsAroundInterceptorV2 {
44+
45+
private static final String SNAPSHOT_KEY = "SKYWALKING_CONTEXT_SNAPSHOT";
46+
47+
@Override
48+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
49+
MethodInvocationContext context) {
50+
}
51+
52+
@Override
53+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) {
54+
55+
if (!(ret instanceof Mono) && !(ret instanceof Flux)) {
56+
return ret;
57+
}
58+
59+
final AbstractSpan localSpan = ContextManager.createLocalSpan("Lettuce/Reactive/" + method.getName());
60+
localSpan.setComponent(ComponentsDefine.LETTUCE);
61+
SpanLayer.asCache(localSpan);
62+
63+
try {
64+
final ContextSnapshot snapshot = ContextManager.capture();
65+
66+
return wrapPublisher((Publisher<?>) ret, snapshot);
67+
} finally {
68+
ContextManager.stopSpan();
69+
}
70+
}
71+
72+
private <T> Publisher<T> wrapPublisher(Publisher<T> original, ContextSnapshot snapshot) {
73+
if (original instanceof Mono) {
74+
return Mono.deferContextual(ctxView -> {
75+
if (ctxView.hasKey(SNAPSHOT_KEY)) {
76+
return (Mono<T>) original;
77+
}
78+
return ((Mono<T>) original).contextWrite(c -> c.put(SNAPSHOT_KEY, snapshot));
79+
});
80+
} else {
81+
return Flux.deferContextual(ctxView -> {
82+
if (ctxView.hasKey(SNAPSHOT_KEY)) {
83+
return original;
84+
}
85+
return ((Flux<T>) original).contextWrite(c -> c.put(SNAPSHOT_KEY, snapshot));
86+
});
87+
}
88+
}
89+
90+
@Override
91+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
92+
}
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.v65.define;
20+
21+
import net.bytebuddy.description.method.MethodDescription;
22+
import net.bytebuddy.matcher.ElementMatcher;
23+
import net.bytebuddy.matcher.ElementMatchers;
24+
import org.apache.skywalking.apm.agent.core.plugin.WitnessMethod;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
27+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
28+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
29+
30+
import java.util.Collections;
31+
import java.util.List;
32+
33+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
34+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
35+
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
36+
37+
/**
38+
*
39+
*/
40+
public class RedisReactiveCommandsInstrumentationV65 extends ClassInstanceMethodsEnhancePluginDefineV2 {
41+
42+
private static final String ENHANCE_CLASS = "io.lettuce.core.AbstractRedisReactiveCommands";
43+
44+
private static final String REDIS_REACTIVE_CREATEMONO_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v65.RedisReactiveCreatePublisherMethodInterceptorV65";
45+
46+
@Override
47+
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
48+
return new InstanceMethodsInterceptV2Point[]{
49+
new InstanceMethodsInterceptV2Point() {
50+
@Override
51+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
52+
return namedOneOf(
53+
"createMono",
54+
"createFlux",
55+
"createDissolvingFlux"
56+
).and(takesArguments(1));
57+
}
58+
59+
@Override
60+
public String getMethodsInterceptorV2() {
61+
return REDIS_REACTIVE_CREATEMONO_METHOD_INTERCEPTOR;
62+
}
63+
64+
@Override
65+
public boolean isOverrideArgs() {
66+
return false;
67+
}
68+
}
69+
};
70+
}
71+
72+
@Override
73+
public ClassMatch enhanceClass() {
74+
return byHierarchyMatch(ENHANCE_CLASS);
75+
}
76+
77+
@Override
78+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
79+
return new ConstructorInterceptPoint[0];
80+
}
81+
82+
@Override
83+
protected List<WitnessMethod> witnessMethods() {
84+
return Collections.singletonList(new WitnessMethod(
85+
"reactor.core.publisher.Mono",
86+
ElementMatchers.named("deferContextual")
87+
));
88+
}
89+
}

0 commit comments

Comments
 (0)