1515 */
1616package io .serverlessworkflow .impl .executors .grpc ;
1717
18+ import com .fasterxml .jackson .core .JsonProcessingException ;
19+ import com .fasterxml .jackson .databind .JsonNode ;
20+ import com .fasterxml .jackson .databind .node .NullNode ;
21+ import com .google .protobuf .DescriptorProtos ;
22+ import com .google .protobuf .Descriptors ;
23+ import com .google .protobuf .DynamicMessage ;
24+ import com .google .protobuf .InvalidProtocolBufferException ;
25+ import com .google .protobuf .Message ;
26+ import io .grpc .CallOptions ;
27+ import io .grpc .Channel ;
28+ import io .grpc .ClientCall ;
29+ import io .grpc .MethodDescriptor ;
30+ import io .grpc .protobuf .ProtoUtils ;
31+ import io .grpc .stub .ClientCalls ;
32+ import io .grpc .stub .StreamObserver ;
33+ import io .serverlessworkflow .api .types .ExternalResource ;
1834import io .serverlessworkflow .impl .TaskContext ;
1935import io .serverlessworkflow .impl .WorkflowContext ;
36+ import io .serverlessworkflow .impl .WorkflowError ;
37+ import io .serverlessworkflow .impl .WorkflowException ;
2038import io .serverlessworkflow .impl .WorkflowModel ;
2139import io .serverlessworkflow .impl .WorkflowValueResolver ;
2240import io .serverlessworkflow .impl .executors .CallableTask ;
41+ import io .serverlessworkflow .impl .jackson .JsonUtils ;
42+ import java .util .ArrayList ;
43+ import java .util .Collection ;
44+ import java .util .List ;
2345import java .util .Map ;
46+ import java .util .Objects ;
2447import java .util .concurrent .CompletableFuture ;
2548
2649public class GrpcExecutor implements CallableTask {
2750
2851 private final GrpcRequestContext requestContext ;
29- private final GrpcCallExecutor grpcCallExecutor ;
3052 private final WorkflowValueResolver <Map <String , Object >> arguments ;
53+ private final FileDescriptorContext fileDescriptorContext ;
54+ private final ExternalResource proto ;
3155
3256 public GrpcExecutor (
3357 GrpcRequestContext builder ,
34- GrpcCallExecutor grpcCallExecutor ,
35- WorkflowValueResolver <Map <String , Object >> arguments ) {
58+ WorkflowValueResolver <Map <String , Object >> arguments ,
59+ FileDescriptorContext fileDescriptorContext ,
60+ ExternalResource proto ) {
3661 this .requestContext = builder ;
37- this .grpcCallExecutor = grpcCallExecutor ;
3862 this .arguments = arguments ;
63+ this .fileDescriptorContext = fileDescriptorContext ;
64+ this .proto = proto ;
3965 }
4066
4167 @ Override
@@ -44,9 +70,193 @@ public CompletableFuture<WorkflowModel> apply(
4470
4571 Map <String , Object > arguments = this .arguments .apply (workflowContext , taskContext , input );
4672
47- return CompletableFuture .supplyAsync (
48- () ->
49- this .grpcCallExecutor .apply (
50- requestContext , workflowContext , taskContext , input , arguments ));
73+ CompletableFuture <WorkflowModel > completableFuture = new CompletableFuture <>();
74+
75+ GrpcCallExecutor grpcExec =
76+ buildGrpcCallExecutor (workflowContext , taskContext , arguments , completableFuture );
77+
78+ return grpcExec .apply (
79+ this .requestContext ,
80+ workflowContext ,
81+ taskContext ,
82+ input ,
83+ arguments ,
84+ this .fileDescriptorContext );
85+ }
86+
87+ private GrpcCallExecutor buildGrpcCallExecutor (
88+ WorkflowContext workflowContext ,
89+ TaskContext taskContext ,
90+ Map <String , Object > arguments ,
91+ CompletableFuture <WorkflowModel > completableFuture ) {
92+ return (GrpcRequestContext requestContext ,
93+ WorkflowContext workflowCtx ,
94+ TaskContext taskCtx ,
95+ WorkflowModel i ,
96+ Map <String , Object > args ,
97+ FileDescriptorContext fileDescriptorContext ) -> {
98+ Channel channel = GrpcChannelResolver .channel (workflowCtx , taskCtx , requestContext );
99+
100+ String protoName = fileDescriptorContext .inputProto ();
101+
102+ DescriptorProtos .FileDescriptorProto fileDescriptorProto =
103+ fileDescriptorContext .fileDescriptorSet ().getFileList ().stream ()
104+ .filter (
105+ file ->
106+ file .getName ()
107+ .equals (this .proto .getName () != null ? this .proto .getName () : protoName ))
108+ .findFirst ()
109+ .orElseThrow (
110+ () -> new IllegalStateException ("Proto file not found in descriptor set" ));
111+
112+ try {
113+ Descriptors .FileDescriptor fileDescriptor =
114+ Descriptors .FileDescriptor .buildFrom (
115+ fileDescriptorProto , new Descriptors .FileDescriptor [] {});
116+
117+ Descriptors .ServiceDescriptor serviceDescriptor =
118+ fileDescriptor .findServiceByName (requestContext .service ());
119+
120+ Objects .requireNonNull (serviceDescriptor , "Service not found: " + requestContext .service ());
121+
122+ Descriptors .MethodDescriptor methodDescriptor =
123+ serviceDescriptor .findMethodByName (requestContext .method ());
124+
125+ Objects .requireNonNull (methodDescriptor , "Method not found: " + requestContext .method ());
126+
127+ MethodDescriptor .MethodType methodType =
128+ ProtobufMessageUtils .getMethodType (methodDescriptor );
129+
130+ ClientCall <Message , Message > call =
131+ buildClientCall (channel , methodType , serviceDescriptor , methodDescriptor );
132+
133+ return switch (methodType ) {
134+ case CLIENT_STREAMING ->
135+ CompletableFuture .completedFuture (
136+ handleClientStreaming (workflowContext , arguments , methodDescriptor , call ));
137+ case BIDI_STREAMING ->
138+ CompletableFuture .completedFuture (
139+ handleBidiStreaming (workflowContext , arguments , methodDescriptor , call ));
140+ case SERVER_STREAMING ->
141+ CompletableFuture .completedFuture (
142+ handleServerStreaming (workflowContext , methodDescriptor , arguments , call ));
143+ case UNARY , UNKNOWN ->
144+ handleAsyncUnary (workflowContext , methodDescriptor , arguments , call );
145+ };
146+
147+ } catch (Descriptors .DescriptorValidationException
148+ | InvalidProtocolBufferException
149+ | JsonProcessingException e ) {
150+ throw new WorkflowException (WorkflowError .runtime (taskContext , e ).build ());
151+ }
152+ };
153+ }
154+
155+ private static ClientCall <Message , Message > buildClientCall (
156+ Channel channel ,
157+ MethodDescriptor .MethodType methodType ,
158+ Descriptors .ServiceDescriptor serviceDescriptor ,
159+ Descriptors .MethodDescriptor methodDescriptor ) {
160+ return channel .newCall (
161+ MethodDescriptor .<Message , Message >newBuilder ()
162+ .setType (methodType )
163+ .setFullMethodName (
164+ MethodDescriptor .generateFullMethodName (
165+ serviceDescriptor .getFullName (), methodDescriptor .getName ()))
166+ .setRequestMarshaller (
167+ ProtoUtils .marshaller (
168+ DynamicMessage .newBuilder (methodDescriptor .getInputType ()).buildPartial ()))
169+ .setResponseMarshaller (
170+ ProtoUtils .marshaller (
171+ DynamicMessage .newBuilder (methodDescriptor .getOutputType ()).buildPartial ()))
172+ .build (),
173+ CallOptions .DEFAULT .withWaitForReady ());
174+ }
175+
176+ private static WorkflowModel handleClientStreaming (
177+ WorkflowContext workflowContext ,
178+ Map <String , Object > parameters ,
179+ Descriptors .MethodDescriptor methodDescriptor ,
180+ ClientCall <Message , Message > call ) {
181+ JsonNode jsonNode =
182+ ProtobufMessageUtils .asyncStreamingCall (
183+ parameters ,
184+ methodDescriptor ,
185+ responseObserver -> ClientCalls .asyncClientStreamingCall (call , responseObserver ),
186+ nodes -> nodes .isEmpty () ? NullNode .instance : nodes .get (0 ));
187+ return workflowContext .definition ().application ().modelFactory ().fromAny (jsonNode );
188+ }
189+
190+ private static WorkflowModel handleBidiStreaming (
191+ WorkflowContext workflowContext ,
192+ Map <String , Object > parameters ,
193+ Descriptors .MethodDescriptor methodDescriptor ,
194+ ClientCall <Message , Message > call ) {
195+ return workflowContext
196+ .definition ()
197+ .application ()
198+ .modelFactory ()
199+ .fromAny (
200+ ProtobufMessageUtils .asyncStreamingCall (
201+ parameters ,
202+ methodDescriptor ,
203+ responseObserver -> ClientCalls .asyncBidiStreamingCall (call , responseObserver ),
204+ v -> {
205+ Collection <JsonNode > nodes = v ;
206+ List <JsonNode > list = new ArrayList <>(nodes );
207+ return JsonUtils .fromValue (list );
208+ }));
209+ }
210+
211+ private static WorkflowModel handleServerStreaming (
212+ WorkflowContext workflowContext ,
213+ Descriptors .MethodDescriptor methodDescriptor ,
214+ Map <String , Object > parameters ,
215+ ClientCall <Message , Message > call )
216+ throws InvalidProtocolBufferException , JsonProcessingException {
217+ Message .Builder builder = ProtobufMessageUtils .buildMessage (methodDescriptor , parameters );
218+ List <JsonNode > nodes = new ArrayList <>();
219+ ClientCalls .blockingServerStreamingCall (call , builder .build ())
220+ .forEachRemaining (message -> nodes .add (ProtobufMessageUtils .convert (message )));
221+ return workflowContext .definition ().application ().modelFactory ().fromAny (nodes );
222+ }
223+
224+ private static CompletableFuture <WorkflowModel > handleAsyncUnary (
225+ WorkflowContext workflowContext ,
226+ Descriptors .MethodDescriptor methodDescriptor ,
227+ Map <String , Object > parameters ,
228+ ClientCall <Message , Message > call )
229+ throws InvalidProtocolBufferException , JsonProcessingException {
230+
231+ CompletableFuture <WorkflowModel > future = new CompletableFuture <>();
232+
233+ Message .Builder builder = ProtobufMessageUtils .buildMessage (methodDescriptor , parameters );
234+
235+ ClientCalls .asyncUnaryCall (
236+ call ,
237+ builder .build (),
238+ new StreamObserver <Message >() {
239+ @ Override
240+ public void onNext (Message value ) {
241+ WorkflowModel model =
242+ workflowContext
243+ .definition ()
244+ .application ()
245+ .modelFactory ()
246+ .fromAny (ProtobufMessageUtils .convert (value ));
247+ future .complete (model );
248+ }
249+
250+ @ Override
251+ public void onError (Throwable t ) {
252+ future .completeExceptionally (t );
253+ }
254+
255+ @ Override
256+ public void onCompleted () {
257+ // no-op
258+ }
259+ });
260+ return future ;
51261 }
52262}
0 commit comments