|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
| 18 | +#include <chrono> |
18 | 19 | #include <cstdint> |
19 | 20 | #include <string> |
| 21 | +#include <thread> |
20 | 22 |
|
21 | 23 | #include <gtest/gtest.h> |
22 | 24 |
|
@@ -906,3 +908,177 @@ TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeleted) { |
906 | 908 |
|
907 | 909 | ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); |
908 | 910 | } |
| 911 | + |
| 912 | +TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionId) { |
| 913 | + RecordProperty("description", "Throws when polling with a non-existent partition ID."); |
| 914 | + const std::string stream_name = "cpp-msg-invalid-partition"; |
| 915 | + iggy::ffi::Client *client = login_to_server(); |
| 916 | + ASSERT_NE(client, nullptr); |
| 917 | + |
| 918 | + client->create_stream(stream_name); |
| 919 | + auto stream = client->get_stream(make_string_identifier(stream_name)); |
| 920 | + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, |
| 921 | + "server_default"); |
| 922 | + |
| 923 | + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 9999, "consumer", |
| 924 | + make_numeric_identifier(1), "offset", 0, 10, false), |
| 925 | + std::exception); |
| 926 | + |
| 927 | + client->delete_stream(make_numeric_identifier(stream.id)); |
| 928 | + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); |
| 929 | +} |
| 930 | + |
| 931 | +TEST(LowLevelE2E_Message, PollMessagesWithCountZero) { |
| 932 | + RecordProperty("description", "Verifies polling with count=0 returns zero messages successfully."); |
| 933 | + const std::string stream_name = "cpp-msg-count-zero"; |
| 934 | + iggy::ffi::Client *client = login_to_server(); |
| 935 | + ASSERT_NE(client, nullptr); |
| 936 | + |
| 937 | + client->create_stream(stream_name); |
| 938 | + auto stream = client->get_stream(make_string_identifier(stream_name)); |
| 939 | + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, |
| 940 | + "server_default"); |
| 941 | + |
| 942 | + rust::Vec<iggy::ffi::Message> messages; |
| 943 | + for (std::uint32_t i = 0; i < 5; i++) { |
| 944 | + iggy::ffi::Message msg; |
| 945 | + msg.new_message(to_payload("msg-" + std::to_string(i))); |
| 946 | + messages.push_back(std::move(msg)); |
| 947 | + } |
| 948 | + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", |
| 949 | + partition_id_bytes(0), std::move(messages)); |
| 950 | + |
| 951 | + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", |
| 952 | + make_numeric_identifier(1), "offset", 0, 0, false); |
| 953 | + |
| 954 | + ASSERT_EQ(polled.count, 0u); |
| 955 | + ASSERT_EQ(polled.messages.size(), 0u); |
| 956 | + |
| 957 | + client->delete_stream(make_numeric_identifier(stream.id)); |
| 958 | + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); |
| 959 | +} |
| 960 | + |
| 961 | +TEST(LowLevelE2E_Message, PollMessagesWithoutSpecifyingPartition) { |
| 962 | + RecordProperty("description", |
| 963 | + "Verifies polling with partition_id=u32::MAX defaults to partition 0 and returns messages."); |
| 964 | + const std::string stream_name = "cpp-msg-no-partition"; |
| 965 | + iggy::ffi::Client *client = login_to_server(); |
| 966 | + ASSERT_NE(client, nullptr); |
| 967 | + |
| 968 | + client->create_stream(stream_name); |
| 969 | + auto stream = client->get_stream(make_string_identifier(stream_name)); |
| 970 | + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, |
| 971 | + "server_default"); |
| 972 | + |
| 973 | + rust::Vec<iggy::ffi::Message> messages; |
| 974 | + for (std::uint32_t i = 0; i < 5; i++) { |
| 975 | + iggy::ffi::Message msg; |
| 976 | + msg.new_message(to_payload("msg-" + std::to_string(i))); |
| 977 | + messages.push_back(std::move(msg)); |
| 978 | + } |
| 979 | + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", |
| 980 | + partition_id_bytes(0), std::move(messages)); |
| 981 | + |
| 982 | + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), UINT32_MAX, |
| 983 | + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); |
| 984 | + |
| 985 | + ASSERT_EQ(polled.count, 5u); |
| 986 | + ASSERT_EQ(polled.messages.size(), 5u); |
| 987 | + |
| 988 | + client->delete_stream(make_numeric_identifier(stream.id)); |
| 989 | + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); |
| 990 | +} |
| 991 | + |
| 992 | +TEST(LowLevelE2E_Message, PollMessagesTimestampStrategy) { |
| 993 | + RecordProperty("description", |
| 994 | + "Verifies timestamp polling strategy returns messages with timestamp >= the specified value."); |
| 995 | + const std::string stream_name = "cpp-msg-timestamp-strategy"; |
| 996 | + iggy::ffi::Client *client = login_to_server(); |
| 997 | + ASSERT_NE(client, nullptr); |
| 998 | + |
| 999 | + client->create_stream(stream_name); |
| 1000 | + auto stream = client->get_stream(make_string_identifier(stream_name)); |
| 1001 | + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, |
| 1002 | + "server_default"); |
| 1003 | + |
| 1004 | + rust::Vec<iggy::ffi::Message> batch1; |
| 1005 | + for (std::uint32_t i = 0; i < 5; i++) { |
| 1006 | + iggy::ffi::Message msg; |
| 1007 | + msg.new_message(to_payload("batch1-" + std::to_string(i))); |
| 1008 | + batch1.push_back(std::move(msg)); |
| 1009 | + } |
| 1010 | + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", |
| 1011 | + partition_id_bytes(0), std::move(batch1)); |
| 1012 | + |
| 1013 | + std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| 1014 | + |
| 1015 | + rust::Vec<iggy::ffi::Message> batch2; |
| 1016 | + for (std::uint32_t i = 0; i < 5; i++) { |
| 1017 | + iggy::ffi::Message msg; |
| 1018 | + msg.new_message(to_payload("batch2-" + std::to_string(i))); |
| 1019 | + batch2.push_back(std::move(msg)); |
| 1020 | + } |
| 1021 | + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", |
| 1022 | + partition_id_bytes(0), std::move(batch2)); |
| 1023 | + |
| 1024 | + auto all = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", |
| 1025 | + make_numeric_identifier(1), "offset", 0, 100, false); |
| 1026 | + ASSERT_EQ(all.count, 10u); |
| 1027 | + |
| 1028 | + std::uint64_t batch2_timestamp = all.messages[5].timestamp; |
| 1029 | + ASSERT_GT(batch2_timestamp, all.messages[0].timestamp); |
| 1030 | + |
| 1031 | + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", |
| 1032 | + make_numeric_identifier(2), "timestamp", batch2_timestamp, 100, false); |
| 1033 | + |
| 1034 | + ASSERT_GE(polled.count, 5u); |
| 1035 | + for (std::size_t i = 0; i < polled.messages.size(); i++) { |
| 1036 | + EXPECT_GE(polled.messages[i].timestamp, batch2_timestamp) |
| 1037 | + << "Message at index " << i << " has earlier timestamp"; |
| 1038 | + } |
| 1039 | + |
| 1040 | + client->delete_stream(make_numeric_identifier(stream.id)); |
| 1041 | + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); |
| 1042 | +} |
| 1043 | + |
| 1044 | +TEST(LowLevelE2E_Message, PollMessagesMonotonicOffsets) { |
| 1045 | + RecordProperty("description", |
| 1046 | + "Verifies offsets are monotonically increasing and continuous across multiple polls."); |
| 1047 | + const std::string stream_name = "cpp-msg-monotonic-offsets"; |
| 1048 | + iggy::ffi::Client *client = login_to_server(); |
| 1049 | + ASSERT_NE(client, nullptr); |
| 1050 | + |
| 1051 | + client->create_stream(stream_name); |
| 1052 | + auto stream = client->get_stream(make_string_identifier(stream_name)); |
| 1053 | + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, |
| 1054 | + "server_default"); |
| 1055 | + |
| 1056 | + rust::Vec<iggy::ffi::Message> messages; |
| 1057 | + for (std::uint32_t i = 0; i < 20; i++) { |
| 1058 | + iggy::ffi::Message msg; |
| 1059 | + msg.new_message(to_payload("mono-" + std::to_string(i))); |
| 1060 | + messages.push_back(std::move(msg)); |
| 1061 | + } |
| 1062 | + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", |
| 1063 | + partition_id_bytes(0), std::move(messages)); |
| 1064 | + |
| 1065 | + std::uint64_t expected_offset = 0; |
| 1066 | + for (int chunk = 0; chunk < 4; chunk++) { |
| 1067 | + auto polled = |
| 1068 | + client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", |
| 1069 | + make_numeric_identifier(1), "offset", expected_offset, 5, false); |
| 1070 | + |
| 1071 | + ASSERT_EQ(polled.count, 5u) << "Chunk " << chunk; |
| 1072 | + ASSERT_EQ(polled.messages.size(), 5u) << "Chunk " << chunk; |
| 1073 | + |
| 1074 | + for (std::size_t i = 0; i < polled.messages.size(); i++) { |
| 1075 | + EXPECT_EQ(polled.messages[i].offset, expected_offset) << "Chunk " << chunk << " index " << i; |
| 1076 | + expected_offset++; |
| 1077 | + } |
| 1078 | + } |
| 1079 | + |
| 1080 | + ASSERT_EQ(expected_offset, 20u); |
| 1081 | + |
| 1082 | + client->delete_stream(make_numeric_identifier(stream.id)); |
| 1083 | + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); |
| 1084 | +} |
0 commit comments