Skip to content

Commit a779693

Browse files
authored
Merge pull request #58 from unixliang/master
fix consumer retry
2 parents 0032bde + 97fab5d commit a779693

9 files changed

Lines changed: 26 additions & 25 deletions

File tree

phxqueue/consumer/consumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class Consumer : public comm::MultiProc {
7676
// Add items to Store.
7777
// Handling failed items requires retry.
7878
// Need to implement an RPC that corresponds to Store::Add().
79-
virtual comm::RetCode Add(const comm::proto::AddRequest &req, comm::proto::AddResponse &resp) = 0;
79+
virtual comm::RetCode Add(comm::proto::AddRequest &req, comm::proto::AddResponse &resp) = 0;
8080

8181
// Consumer reports its own machine load to the Scheduler, which, after statistics, returns the dynamic weight to the Consumer.
8282
// Depending on the dynamic weight, each Consumer uses a same algorithm to calculate which queues should be handled by themselves.

phxqueue/producer/batchhelper.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ comm::RetCode BatchTask::Process(bool is_timeout) {
151151
uint64_t time_wait_ms = now_timestamp_ms - start_timestamp_ms_;
152152

153153
comm::proto::AddRequest batch_req;
154+
comm::proto::AddResponse batch_resp;
154155

155156
for (auto &task : tasks_) {
156157
auto req = task->GetReq();
@@ -164,7 +165,7 @@ comm::RetCode BatchTask::Process(bool is_timeout) {
164165
}
165166
}
166167

167-
auto retcode = producer_->RawAdd(batch_req);
168+
auto retcode = producer_->RawAdd(batch_req, batch_resp);
168169

169170
comm::ProducerBP::GetThreadInstance()->OnBatchStat(batch_req, retcode, time_wait_ms, is_timeout);
170171
//printf("batch %d time_wait_ms %" PRIu64 " is_timeout %d\n", batch_req.items_size(), time_wait_ms, is_timeout);

phxqueue/producer/producer.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ comm::RetCode Producer::Enqueue(const int topic_id, const uint64_t uin, const in
163163
}
164164

165165
for (auto &&req : reqs) {
166-
if (comm::RetCode::RET_OK != (ret = SelectAndAdd(*req, nullptr, nullptr))) {
166+
comm::proto::AddResponse resp;
167+
if (comm::RetCode::RET_OK != (ret = SelectAndAdd(*req, resp, nullptr, nullptr))) {
167168
comm::ProducerBP::GetThreadInstance()->OnSelectAndAddFail(topic_id, pub_id, handle_id, uin);
168169
QLErr("SelectAndAdd client_id %s ret %d", client_id.c_str(), as_integer(ret));
169170
return ret;
@@ -297,7 +298,7 @@ comm::RetCode Producer::MakeAddRequests(const int topic_id,
297298
}
298299

299300

300-
comm::RetCode Producer::SelectAndAdd(comm::proto::AddRequest &req, StoreSelector *ss, QueueSelector *qs) {
301+
comm::RetCode Producer::SelectAndAdd(comm::proto::AddRequest &req, comm::proto::AddResponse &resp, StoreSelector *ss, QueueSelector *qs) {
301302
QLVerb("SelectAndAdd");
302303

303304
comm::ProducerBP::GetThreadInstance()->OnSelectAndAdd(req);
@@ -359,7 +360,7 @@ comm::RetCode Producer::SelectAndAdd(comm::proto::AddRequest &req, StoreSelector
359360
QLErr("BatchRawEnqueue ret %d store %d queue %d uin %" PRIu64, as_integer(ret), store_id, queue_id, uin);
360361
}
361362
} else {
362-
if (comm::RetCode::RET_OK != (ret = RawAdd(req))) {
363+
if (comm::RetCode::RET_OK != (ret = RawAdd(req, resp))) {
363364
comm::ProducerBP::GetThreadInstance()->OnRawAddFail(req);
364365
QLErr("RawEnqueue ret %d store %d queue %d uin %" PRIu64, as_integer(ret), store_id, queue_id, uin);
365366
}
@@ -374,7 +375,7 @@ comm::RetCode Producer::SelectAndAdd(comm::proto::AddRequest &req, StoreSelector
374375
return ret;
375376
}
376377

377-
comm::RetCode Producer::RawAdd(comm::proto::AddRequest &req) {
378+
comm::RetCode Producer::RawAdd(comm::proto::AddRequest &req, comm::proto::AddResponse &resp) {
378379
QLVerb("RawEnqueue");
379380

380381
comm::ProducerBP::GetThreadInstance()->OnRawAdd(req);
@@ -396,8 +397,6 @@ comm::RetCode Producer::RawAdd(comm::proto::AddRequest &req) {
396397
}
397398
if (queue_info->drop_all()) return comm::RetCode::RET_OK;
398399

399-
comm::proto::AddResponse resp;
400-
401400
BeforeAdd(req);
402401

403402
store::StoreMasterClient<comm::proto::AddRequest, comm::proto::AddResponse> store_master_client;

phxqueue/producer/producer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ class Producer {
5959

6060
// Process a batch add to Store.
6161
// Customize StoreSelector/QueueSelector can be specified to determine which store/queue to add.
62-
comm::RetCode SelectAndAdd(comm::proto::AddRequest &req,
62+
comm::RetCode SelectAndAdd(comm::proto::AddRequest &req, comm::proto::AddResponse &resp,
6363
StoreSelector *ss, QueueSelector *qs);
6464

6565
// Process a batch add to Store.
66-
comm::RetCode RawAdd(comm::proto::AddRequest &req);
66+
comm::RetCode RawAdd(comm::proto::AddRequest &req, comm::proto::AddResponse &resp);
6767

6868

6969
// ------------------------ Interfaces MUST be overrided ------------------------

phxqueue/test/simpleconsumer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ comm::RetCode SimpleConsumer::Get(const comm::proto::GetRequest &req,
4949
return comm::RetCode::RET_OK;
5050
}
5151

52-
comm::RetCode SimpleConsumer::Add(const comm::proto::AddRequest &req,
52+
comm::RetCode SimpleConsumer::Add(comm::proto::AddRequest &req,
5353
comm::proto::AddResponse &resp) {
5454
return comm::RetCode::RET_OK;
5555
}

phxqueue/test/simpleconsumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class SimpleConsumer : public consumer::Consumer {
2929

3030
virtual comm::RetCode Get(const comm::proto::GetRequest &req,
3131
comm::proto::GetResponse &resp) override;
32-
virtual comm::RetCode Add(const comm::proto::AddRequest &req,
32+
virtual comm::RetCode Add(comm::proto::AddRequest &req,
3333
comm::proto::AddResponse &resp) override;
3434

3535
virtual comm::RetCode UncompressBuffer(const std::string &buffer,

phxqueue_phxrpc/consumer/consumer.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Unless required by applicable law or agreed to in writing, software distributed
2323
#include "phxqueue_phxrpc/app/lock/lock_client.h"
2424
#include "phxqueue_phxrpc/app/scheduler/scheduler_client.h"
2525
#include "phxqueue_phxrpc/app/store/store_client.h"
26+
#include "phxqueue_phxrpc/producer.h"
2627

2728

2829
namespace phxqueue_phxrpc {
@@ -61,13 +62,13 @@ Consumer::Get(const phxqueue::comm::proto::GetRequest &req,
6162
}
6263

6364
phxqueue::comm::RetCode
64-
Consumer::Add(const phxqueue::comm::proto::AddRequest &req,
65+
Consumer::Add(phxqueue::comm::proto::AddRequest &req,
6566
phxqueue::comm::proto::AddResponse &resp) {
66-
67-
static __thread StoreClient store_client;
68-
auto ret = store_client.ProtoAdd(req, resp);
67+
phxqueue::producer::ProducerOption opt;
68+
phxqueue_phxrpc::producer::Producer producer(opt);
69+
auto ret = producer.SelectAndAdd(req, resp, nullptr, nullptr);
6970
if (phxqueue::comm::RetCode::RET_OK != ret) {
70-
QLErr("ProtoAdd ret %d", phxqueue::comm::as_integer(ret));
71+
QLErr("Producer::SelectAndAdd ret %d", phxqueue::comm::as_integer(ret));
7172
}
7273
return ret;
7374
}

phxqueue_phxrpc/consumer/consumer.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,22 @@ class Consumer : public phxqueue::consumer::Consumer {
2929

3030
virtual phxqueue::comm::RetCode
3131
UncompressBuffer(const std::string &buffer, const int buffer_type,
32-
std::string &uncompressed_buffer);
32+
std::string &uncompressed_buffer) override;
3333
virtual void CompressBuffer(const std::string &buffer,
34-
std::string &compress_buffer, const int buffer_type);
34+
std::string &compress_buffer, const int buffer_type) override;
3535
virtual phxqueue::comm::RetCode
36-
Get(const phxqueue::comm::proto::GetRequest &req, phxqueue::comm::proto::GetResponse &resp);
36+
Get(const phxqueue::comm::proto::GetRequest &req, phxqueue::comm::proto::GetResponse &resp) override;
3737
virtual phxqueue::comm::RetCode
38-
Add(const phxqueue::comm::proto::AddRequest &req, phxqueue::comm::proto::AddResponse &resp);
38+
Add(phxqueue::comm::proto::AddRequest &req, phxqueue::comm::proto::AddResponse &resp) override;
3939
virtual phxqueue::comm::RetCode
4040
GetAddrScale(const phxqueue::comm::proto::GetAddrScaleRequest &req,
41-
phxqueue::comm::proto::GetAddrScaleResponse &resp);
41+
phxqueue::comm::proto::GetAddrScaleResponse &resp) override;
4242
virtual phxqueue::comm::RetCode
4343
GetLockInfo(const phxqueue::comm::proto::GetLockInfoRequest &req,
44-
phxqueue::comm::proto::GetLockInfoResponse &resp);
44+
phxqueue::comm::proto::GetLockInfoResponse &resp) override;
4545
virtual phxqueue::comm::RetCode
4646
AcquireLock(const phxqueue::comm::proto::AcquireLockRequest &req,
47-
phxqueue::comm::proto::AcquireLockResponse &resp);
47+
phxqueue::comm::proto::AcquireLockResponse &resp) override;
4848

4949
private:
5050
virtual void RestoreUserCookies(const phxqueue::comm::proto::Cookies &user_cookies) {}

phxqueue_phxrpc/test/consumer_main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ int main(int argc, char ** argv) {
8686
opt.proc_pid_path = config.GetProto().consumer().proc_pid_path();
8787
opt.lock_path_base = config.GetProto().consumer().lock_path_base();
8888
opt.use_store_master_client_on_get = 1;
89-
opt.use_store_master_client_on_add = 1;
89+
opt.use_store_master_client_on_add = 0;
9090
opt.shm_key_base = config.GetProto().consumer().shm_key_base();
9191

9292

0 commit comments

Comments
 (0)