Understanding the fundamental differences between push and pull models in RxGo and how backpressure works.
The Observable API in pkg/observable implements a push-based model:
import "github.com/droxer/RxGo/pkg/observable"
// Producer pushes data as fast as it can generate it
obs := observable.Just(1, 2, 3, 4, 5)
if err := obs.Subscribe(context.Background(), observable.NewSubscriber(
func(v int) { fmt.Printf("Received: %d\n", v) },
func() { fmt.Println("Completed") },
func(err error) { fmt.Printf("Error: %v\n", err) },
)); err != nil {
fmt.Printf("Subscription failed: %v\n", err)
}- Producer Controlled: The producer determines when data is emitted
- Immediate Delivery: Data is pushed to subscribers immediately
- No Backpressure: No mechanism for subscribers to control data flow
- Simple API: Easy to understand and use for basic scenarios
- Potential Issues: Can overwhelm slow consumers, leading to memory issues
The Reactive Streams API in pkg/streams implements a pull-based model with backpressure:
import "github.com/droxer/RxGo/pkg/streams"
// Subscriber requests data in controlled amounts
publisher := streams.NewCompliantRangePublisher(1, 1000)
publisher.Subscribe(context.Background(), &MyReactiveSubscriber{})With a custom subscriber that implements proper backpressure:
type MyReactiveSubscriber struct {
subscription streams.Subscription
}
func (s *MyReactiveSubscriber) OnSubscribe(sub streams.Subscription) {
s.subscription = sub
// Request only 10 items initially
sub.Request(10)
}
func (s *MyReactiveSubscriber) OnNext(value int) {
fmt.Printf("Received: %d
", value)
// Request one more item after processing
s.subscription.Request(1)
}
func (s *MyReactiveSubscriber) OnError(err error) {
fmt.Printf("Error: %v
", err)
}
func (s *MyReactiveSubscriber) OnComplete() {
fmt.Println("Completed")
}- Subscriber Controlled: Subscribers request data using
Request(n) - Backpressure Support: Publishers must respect subscriber demand
- Reactive Streams Compliant: Full specification compliance
- Production Ready: Suitable for systems with unbounded data streams
- Resource Control: Prevents memory exhaustion and thread starvation
The pull model supports multiple backpressure strategies:
Keeps all items in a bounded buffer:
config := streams.BackpressureConfig{
Strategy: streams.Buffer,
BufferSize: 100,
}
publisher := streams.NewBufferedPublisher[int](config, func(ctx context.Context, sub streams.Subscriber[int]) {
// Implementation here
})Discards new items when buffer is full:
config := streams.BackpressureConfig{
Strategy: streams.Drop,
BufferSize: 50,
}
publisher := streams.NewBufferedPublisher[int](config, func(ctx context.Context, sub streams.Subscriber[int]) {
// Implementation here
})Keeps only the latest item:
config := streams.BackpressureConfig{
Strategy: streams.Latest,
BufferSize: 1,
}
publisher := streams.NewBufferedPublisher[int](config, func(ctx context.Context, sub streams.Subscriber[int]) {
// Implementation here
})Signals an error when buffer overflows:
config := streams.BackpressureConfig{
Strategy: streams.Error,
BufferSize: 10,
}
publisher := streams.NewBufferedPublisher[int](config, func(ctx context.Context, sub streams.Subscriber[int]) {
// Implementation here
})- Building simple applications with predictable data rates
- Prototyping or learning reactive programming concepts
- Working with data sources that naturally emit at controlled rates
- Consumer can handle data at the producer's rate
- Building production systems with potentially unbounded data streams
- Need to handle producer/consumer speed mismatches
- Working with external data sources (network, file I/O, database)
- Requiring Reactive Streams 1.0.4 compliance
- Need for resource control to prevent memory exhaustion
| Aspect | Push Model (Observable) | Pull Model (Reactive Streams) |
|---|---|---|
| Control Flow | Producer controlled | Subscriber controlled |
| Backpressure | Not supported | Fully supported |
| Complexity | Simple API | More complex but powerful |
| Use Cases | Simple scenarios | Production systems |
| Compliance | None | Reactive Streams 1.0.4 |
| Resource Safety | Potential issues | Built-in protection |
- Start Simple: Use the Observable API for learning and simple use cases
- Move to Streams: Switch to Reactive Streams API for production systems
- Implement Proper Backpressure: Always request data in controlled amounts
- Handle Errors: Properly handle backpressure-related errors
- Monitor Performance: Watch for memory and CPU usage in high-volume scenarios