diff --git a/src/pubsub/src/publisher/actor.rs b/src/pubsub/src/publisher/actor.rs index ecdfc8c5d6..fb6c42c412 100644 --- a/src/pubsub/src/publisher/actor.rs +++ b/src/pubsub/src/publisher/actor.rs @@ -31,7 +31,7 @@ pub(crate) enum ToDispatcher { } /// A command sent from the Dispatcher to a batch actor. -pub(crate) enum ToBatchActor { +enum ToBatchActor { /// A request to publish a single message. Publish(BundledMessage), /// A request to flush all outstanding messages. @@ -74,11 +74,7 @@ impl Dispatcher { } } - pub(crate) fn spawn_actor( - &mut self, - key: String, - tasks: &mut JoinMap, - ) -> BatchActorHandle { + fn spawn_actor(&mut self, key: String, tasks: &mut JoinMap) -> BatchActorHandle { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); if key.is_empty() { tasks.spawn( @@ -206,12 +202,12 @@ impl Dispatcher { } #[derive(Debug)] -pub(crate) struct BatchActorHandle { +struct BatchActorHandle { sender: mpsc::UnboundedSender, } #[derive(Debug)] -pub(crate) struct BatchActorContext { +struct BatchActorContext { topic: String, client: GapicPublisher, rx: mpsc::UnboundedReceiver, @@ -219,7 +215,7 @@ pub(crate) struct BatchActorContext { } impl BatchActorContext { - pub(crate) fn new( + fn new( topic: String, client: GapicPublisher, batching_options: BatchingOptions, @@ -236,12 +232,12 @@ impl BatchActorContext { /// A batch actor that sends batches concurrently. #[derive(Debug)] -pub(crate) struct ConcurrentBatchActor { +struct ConcurrentBatchActor { context: BatchActorContext, } impl ConcurrentBatchActor { - pub(crate) fn new( + fn new( topic: String, client: GapicPublisher, batching_options: BatchingOptions, @@ -267,7 +263,7 @@ impl ConcurrentBatchActor { /// /// The loop terminates when the `rx` channel is closed, which happens when the /// Dispatcher drops the Sender. - pub(crate) async fn run(mut self) { + async fn run(mut self) { // We have multiple inflight batches concurrently. let mut inflight = JoinSet::new(); let mut batch = Batch::new( @@ -308,7 +304,7 @@ impl ConcurrentBatchActor { } // Flush the pending batch if it's not empty. - pub(crate) fn flush(&mut self, inflight: &mut JoinSet>, batch: &mut Batch) { + fn flush(&mut self, inflight: &mut JoinSet>, batch: &mut Batch) { if !batch.is_empty() { batch.flush( self.context.client.clone(), @@ -320,7 +316,7 @@ impl ConcurrentBatchActor { // Move message to the pending batch respecting batch thresholds // and flush the batch if it is full. - pub(crate) fn add_msg_and_flush( + fn add_msg_and_flush( &mut self, inflight: &mut JoinSet>, batch: &mut Batch, @@ -338,14 +334,14 @@ impl ConcurrentBatchActor { /// A batch actor that sends batches sequentially by awaiting on the previous batch. #[derive(Debug)] -pub(crate) struct SequentialBatchActor { +struct SequentialBatchActor { context: BatchActorContext, pending_msgs: VecDeque, paused: bool, } impl SequentialBatchActor { - pub(crate) fn new( + fn new( topic: String, client: GapicPublisher, batching_options: BatchingOptions, @@ -376,7 +372,7 @@ impl SequentialBatchActor { /// /// The loop terminates when the `rx` channel is closed, which happens when the /// Dispatcher drops the Sender. - pub(crate) async fn run(mut self) { + async fn run(mut self) { // While it is possible to use Some(JoinHandle) here as there is at max // a single inflight task at any given time, the use of JoinSet // simplify the managing the inflight JoinHandle. @@ -465,7 +461,7 @@ impl SequentialBatchActor { // Move message to the pending batch respecting batch thresholds // and flush the batch if it is full. - pub(crate) fn move_to_batch_and_flush( + fn move_to_batch_and_flush( &mut self, inflight: &mut JoinSet>, batch: &mut Batch, @@ -497,7 +493,7 @@ impl SequentialBatchActor { } // Pause publish operations. - pub(crate) fn pause(&mut self) { + fn pause(&mut self) { self.paused = true; while let Some(publish) = self.pending_msgs.pop_front() { // The user may have dropped the handle, so it is ok if this fails. @@ -507,7 +503,7 @@ impl SequentialBatchActor { } } - pub(crate) fn handle_inflight_join( + fn handle_inflight_join( &mut self, join_next_option: Option, tokio::task::JoinError>>, ) { diff --git a/src/pubsub/src/subscriber/handler.rs b/src/pubsub/src/subscriber/handler.rs index b498acfea9..850dc32adf 100644 --- a/src/pubsub/src/subscriber/handler.rs +++ b/src/pubsub/src/subscriber/handler.rs @@ -352,13 +352,13 @@ impl ExactlyOnce { /// /// Note that the acknowledgement is best effort. The message may still be /// redelivered to this client, or another client. - pub(crate) fn ack(mut self) { + fn ack(mut self) { if let Some(inner) = self.inner.take() { inner.ack(); } } - pub(crate) fn nack(mut self) { + fn nack(mut self) { if let Some(inner) = self.inner.take() { inner.nack(); }