forked from eclipse-uprotocol/up-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRpcServer.cpp
More file actions
151 lines (131 loc) · 5.19 KB
/
RpcServer.cpp
File metadata and controls
151 lines (131 loc) · 5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache License Version 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: Apache-2.0
#include "up-cpp/communication/RpcServer.h"
namespace uprotocol::communication {
// Short alias for the datamodel validators used in this file
// (Validator::uri::isValidRpcMethod and Validator::message::isValidRpcRequest).
namespace Validator = datamodel::validator;
/// @brief Constructs an RpcServer around the given transport.
///
/// The constructor only stores its arguments; registering the listener is
/// done separately by connect().
///
/// @param transport Transport used by this server. Must not be null.
/// @param format Optional payload format stored as the expected format that
///               is applied to responses.
/// @param ttl Optional time-to-live stored and applied to responses.
///
/// @throws transport::NullTransport if @p transport is null.
RpcServer::RpcServer(std::shared_ptr<transport::UTransport> transport,
                     std::optional<v1::UPayloadFormat> format,
                     std::optional<std::chrono::milliseconds> ttl)
    : transport_(std::move(transport)),
      ttl_(ttl),
      expected_payload_format_(format) {
	// The check is made on the member because the parameter has already
	// been moved from at this point.
	if (transport_ == nullptr) {
		throw transport::NullTransport("transport cannot be null");
	}
}
/// @brief Factory that validates its arguments, builds an RpcServer and
///        connects it to the given method.
///
/// @param transport Transport the new server will use. Must not be null.
/// @param method_name URI of the RPC method to serve; checked with
///                    Validator::uri::isValidRpcMethod.
/// @param callback Callback invoked for each valid request; ownership is
///                 handed to the created server.
/// @param payload_format Optional payload format for responses; when present
///                       it must satisfy UPayloadFormat_IsValid.
/// @param ttl Optional time-to-live applied to responses.
///
/// @return The connected server on success. Otherwise a UStatus:
///         - INVALID_ARGUMENT if @p method_name is not a valid RPC method URI
///         - OUT_OF_RANGE if @p payload_format is present but invalid
///         - the status returned by connect() if registration fails
///
/// @throws transport::NullTransport if @p transport is null.
RpcServer::ServerOrStatus RpcServer::create(
    std::shared_ptr<transport::UTransport> transport,
    const v1::UUri& method_name, RpcCallback&& callback,
    std::optional<v1::UPayloadFormat> payload_format,
    std::optional<std::chrono::milliseconds> ttl) {
	// Fail fast on a null transport before doing any other validation work.
	if (!transport) {
		throw transport::NullTransport("transport cannot be null");
	}

	// Builds the error alternative of ServerOrStatus from a code and message.
	const auto failure = [](v1::UCode code, const char* message) {
		v1::UStatus status;
		status.set_code(code);
		status.set_message(message);
		return ServerOrStatus(utils::Unexpected<v1::UStatus>(status));
	};

	// Validate the method name using the URI validator. Only the verdict is
	// used; the reason is not reported to the caller.
	auto [valid, reason] = Validator::uri::isValidRpcMethod(method_name);
	if (!valid) {
		return failure(v1::UCode::INVALID_ARGUMENT, "Invalid rpc URI");
	}

	// Validate the payload format, if one was provided.
	if (payload_format.has_value() &&
	    !UPayloadFormat_IsValid(payload_format.value())) {
		return failure(v1::UCode::OUT_OF_RANGE, "Invalid payload format");
	}

	// The parameters are taken by value, not as forwarding references, so
	// std::forward does not apply: move the shared_ptr to avoid a refcount
	// bump, and simply copy the small optionals.
	auto server =
	    std::make_unique<RpcServer>(std::move(transport), payload_format, ttl);

	// Attempt to connect the server with the method name and callback.
	auto status = server->connect(method_name, std::move(callback));
	if (status.code() == v1::UCode::OK) {
		return ServerOrStatus(std::move(server));
	}

	// Connection failed: hand the transport's status back to the caller.
	return ServerOrStatus(utils::Unexpected<v1::UStatus>(status));
}
/// @brief Stores the callback and registers a request listener for the given
///        method on the transport.
///
/// For every message that passes Validator::message::isValidRpcRequest, the
/// listener invokes the stored callback, builds a response (applying the
/// stored TTL and payload format when set) and sends it on the transport.
/// Messages that fail validation are dropped without a response. The status
/// returned by send() is deliberately ignored.
///
/// @param method URI used as the sink filter of the listener.
/// @param callback Callback producing the optional response payload; it is
///                 moved into this object.
///
/// @return A status with code OK when the listener was registered, otherwise
///         the error returned by registerListener.
///
/// @throws transport::NullTransport if the stored transport is null.
v1::UStatus RpcServer::connect(const v1::UUri& method, RpcCallback&& callback) {
	callback_ = std::move(callback);

	if (transport_ == nullptr) {
		throw transport::NullTransport("transport cannot be null");
	}

	// Handles one incoming message: validate, run the callback, respond.
	auto on_request = [this](const v1::UMessage& request) {
		auto [request_ok, rejection] =
		    Validator::message::isValidRpcRequest(request);
		if (!request_ok) {
			// Not a valid RPC request: nothing is sent back.
			return;
		}

		// The response builder is seeded from the request message.
		auto response_builder =
		    datamodel::builder::UMessageBuilder::response(request);

		// Run the user callback; it may or may not produce a payload.
		auto maybe_payload = callback_(request);

		if (ttl_.has_value()) {
			response_builder.withTtl(ttl_.value());
		}
		if (expected_payload_format_.has_value()) {
			response_builder.withPayloadFormat(
			    expected_payload_format_.value());
		}

		// Sends a built response; the transport's status is ignored.
		const auto dispatch = [this](auto&& response) {
			std::ignore = transport_->send(response);
		};

		if (maybe_payload.has_value()) {
			// build(payload) verifies that the payload format matches the
			// expected one.
			dispatch(response_builder.build(std::move(maybe_payload).value()));
		} else {
			// build() verifies whether a payload format is required.
			dispatch(response_builder.build());
		}
	};

	// Builds the source filter: a URI whose every field is set to its
	// wildcard value.
	const auto make_wildcard_source = []() {
		// All bits set in the 32-bit UE ID; per the constant's name this
		// covers both the instance ID and the service ID wildcards.
		constexpr auto ANY_INSTANCE_AND_SERVICE_ID = 0xFFFFFFFF;
		constexpr auto ANY_VERSION_MAJOR = 0xFF;
		constexpr auto ANY_RESOURCE_ID = 0xFFFF;

		v1::UUri wildcard;
		wildcard.set_authority_name("*");
		wildcard.set_ue_id(ANY_INSTANCE_AND_SERVICE_ID);
		wildcard.set_ue_version_major(ANY_VERSION_MAJOR);
		wildcard.set_resource_id(ANY_RESOURCE_ID);
		return wildcard;
	};

	auto registration = transport_->registerListener(
	    // listener=
	    std::move(on_request),
	    // source_filter=
	    make_wildcard_source(),
	    // sink_filter=
	    method);

	if (!registration.has_value()) {
		return registration.error();
	}

	// Registration succeeded: keep the returned handle in callback_handle_
	// and report OK.
	callback_handle_ = std::move(registration).value();
	v1::UStatus ok_status;
	ok_status.set_code(v1::UCode::OK);
	return ok_status;
}
} // namespace uprotocol::communication