Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions src/pubsub/src/publisher/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub(crate) enum ToDispatcher {
}

/// A command sent from the Dispatcher to a batch actor.
pub(crate) enum ToBatchActor {
enum ToBatchActor {
/// A request to publish a single message.
Publish(BundledMessage),
/// A request to flush all outstanding messages.
Expand Down Expand Up @@ -74,11 +74,7 @@ impl Dispatcher {
}
}

pub(crate) fn spawn_actor(
&mut self,
key: String,
tasks: &mut JoinMap<String, ()>,
) -> BatchActorHandle {
fn spawn_actor(&mut self, key: String, tasks: &mut JoinMap<String, ()>) -> BatchActorHandle {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
if key.is_empty() {
tasks.spawn(
Expand Down Expand Up @@ -206,20 +202,20 @@ impl Dispatcher {
}

#[derive(Debug)]
pub(crate) struct BatchActorHandle {
struct BatchActorHandle {
sender: mpsc::UnboundedSender<ToBatchActor>,
}

#[derive(Debug)]
pub(crate) struct BatchActorContext {
struct BatchActorContext {
topic: String,
client: GapicPublisher,
rx: mpsc::UnboundedReceiver<ToBatchActor>,
batching_options: BatchingOptions,
}

impl BatchActorContext {
pub(crate) fn new(
fn new(
topic: String,
client: GapicPublisher,
batching_options: BatchingOptions,
Expand All @@ -236,12 +232,12 @@ impl BatchActorContext {

/// A batch actor that sends batches concurrently.
#[derive(Debug)]
pub(crate) struct ConcurrentBatchActor {
struct ConcurrentBatchActor {
context: BatchActorContext,
}

impl ConcurrentBatchActor {
pub(crate) fn new(
fn new(
topic: String,
client: GapicPublisher,
batching_options: BatchingOptions,
Expand All @@ -267,7 +263,7 @@ impl ConcurrentBatchActor {
///
/// The loop terminates when the `rx` channel is closed, which happens when the
/// Dispatcher drops the Sender.
pub(crate) async fn run(mut self) {
async fn run(mut self) {
// We have multiple inflight batches concurrently.
let mut inflight = JoinSet::new();
let mut batch = Batch::new(
Expand Down Expand Up @@ -308,7 +304,7 @@ impl ConcurrentBatchActor {
}

// Flush the pending batch if it's not empty.
pub(crate) fn flush(&mut self, inflight: &mut JoinSet<crate::Result<()>>, batch: &mut Batch) {
fn flush(&mut self, inflight: &mut JoinSet<crate::Result<()>>, batch: &mut Batch) {
if !batch.is_empty() {
batch.flush(
self.context.client.clone(),
Expand All @@ -320,7 +316,7 @@ impl ConcurrentBatchActor {

// Move message to the pending batch respecting batch thresholds
// and flush the batch if it is full.
pub(crate) fn add_msg_and_flush(
fn add_msg_and_flush(
&mut self,
inflight: &mut JoinSet<crate::Result<()>>,
batch: &mut Batch,
Expand All @@ -338,14 +334,14 @@ impl ConcurrentBatchActor {

/// A batch actor that sends batches sequentially by awaiting on the previous batch.
#[derive(Debug)]
pub(crate) struct SequentialBatchActor {
struct SequentialBatchActor {
context: BatchActorContext,
pending_msgs: VecDeque<BundledMessage>,
paused: bool,
}

impl SequentialBatchActor {
pub(crate) fn new(
fn new(
topic: String,
client: GapicPublisher,
batching_options: BatchingOptions,
Expand Down Expand Up @@ -376,7 +372,7 @@ impl SequentialBatchActor {
///
/// The loop terminates when the `rx` channel is closed, which happens when the
/// Dispatcher drops the Sender.
pub(crate) async fn run(mut self) {
async fn run(mut self) {
// While it is possible to use Some(JoinHandle) here as there is at max
// a single inflight task at any given time, the use of JoinSet
// simplify the managing the inflight JoinHandle.
Expand Down Expand Up @@ -465,7 +461,7 @@ impl SequentialBatchActor {

// Move message to the pending batch respecting batch thresholds
// and flush the batch if it is full.
pub(crate) fn move_to_batch_and_flush(
fn move_to_batch_and_flush(
&mut self,
inflight: &mut JoinSet<crate::Result<()>>,
batch: &mut Batch,
Expand Down Expand Up @@ -497,7 +493,7 @@ impl SequentialBatchActor {
}

// Pause publish operations.
pub(crate) fn pause(&mut self) {
fn pause(&mut self) {
self.paused = true;
while let Some(publish) = self.pending_msgs.pop_front() {
// The user may have dropped the handle, so it is ok if this fails.
Expand All @@ -507,7 +503,7 @@ impl SequentialBatchActor {
}
}

pub(crate) fn handle_inflight_join(
fn handle_inflight_join(
&mut self,
join_next_option: Option<Result<crate::Result<()>, tokio::task::JoinError>>,
) {
Expand Down
4 changes: 2 additions & 2 deletions src/pubsub/src/subscriber/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,13 @@ impl ExactlyOnce {
///
/// Note that the acknowledgement is best effort. The message may still be
/// redelivered to this client, or another client.
pub(crate) fn ack(mut self) {
fn ack(mut self) {
if let Some(inner) = self.inner.take() {
inner.ack();
}
}

pub(crate) fn nack(mut self) {
fn nack(mut self) {
if let Some(inner) = self.inner.take() {
inner.nack();
}
Expand Down
Loading