Welcome, RxDart experts! You've conquered the basics, mastered the intermediate concepts, and now you're ready to push the boundaries of what's possible with reactive programming in Flutter. This advanced guide will explore cutting-edge techniques, performance optimizations, and best practices that will elevate your RxDart skills to the next level. We'll delve into complex stream manipulations, advanced error handling, custom operators, and architectural patterns that leverage the full power of reactive programming.
1. Advanced Stream Manipulation
1.1 Backpressure Handling: Backpressure occurs when a Stream produces items faster than its listeners can consume them. Let's explore strategies to handle this:
a) Buffer:
// Source: one integer every 10ms, 100 values in total.
const emitInterval = Duration(milliseconds: 10);
// Boundary: every tick flushes whatever the source produced since the last one.
const flushInterval = Duration(milliseconds: 50);

final fastStream = Stream.periodic(emitInterval, (i) => i).take(100);
final batches = fastStream.buffer(Stream.periodic(flushInterval));
batches.listen(print);
// Output will be lists of integers, buffered every 50ms
b) Window:
// Ten integers, one every 100ms.
final source =
    Stream.periodic(const Duration(milliseconds: 100), (i) => i).take(10);
// Each tick of this stream closes the current window and opens a new one.
final boundaries = Stream.periodic(const Duration(milliseconds: 250));

final windows = source.window(boundaries);
// A window is itself a stream; collect each one into a list before printing.
windows.asyncMap((window) => window.toList()).listen(print);
// Output will be lists of integers, windowed every 250ms
c) Sample:
// Unbounded source: one integer every 10ms.
final source = Stream.periodic(const Duration(milliseconds: 10), (i) => i);
// Sampler: every tick lets the most recent source value through.
final sampler = Stream.periodic(const Duration(milliseconds: 100));

final sampled = source.sample(sampler);
sampled.listen(print);
// Output will be the latest value every 100ms
1.2 Advanced Combining: Let's explore more complex ways to combine streams:
a) WithLatestFrom:
// Joins a source value with the latest value seen on the other stream.
String describe(String s, String o) => '$s - $o';

final source = Stream.periodic(
  const Duration(milliseconds: 100),
  (i) => 'Source $i',
).take(5);
final other = Stream.periodic(
  const Duration(milliseconds: 150),
  (i) => 'Other $i',
);

final combined = source.withLatestFrom(other, describe);
combined.listen(print);
// Output combines latest values from both streams
b) ExhaustMap:
final clicks = PublishSubject<void>();
// While an inner stream is still running, exhaustMap drops further clicks.
final exhausted = clicks.exhaustMap(
  (_) => Stream.periodic(const Duration(seconds: 1), (i) => i).take(3),
);
// Subscribe BEFORE the first click: a PublishSubject does not replay events,
// so a click added while nobody is listening would simply be lost.
exhausted.listen(print);
clicks.add(null); // Start the first stream (emits at 1s, 2s and 3s)
Timer(
  const Duration(milliseconds: 500),
  () => clicks.add(null),
); // This click is ignored: the first stream is still running
Timer(
  const Duration(seconds: 4),
  () => clicks.add(null),
); // This starts a new stream: the first one has completed
// Output: 0, 1, 2, (pause), 0, 1, 2
2. Custom Operators and Extensions
Creating custom operators allows you to encapsulate complex logic and extend RxDart's functionality:
/// Convenience operators built on top of RxDart's stream extensions.
extension AdvancedOperators<T> on Stream<T> {
  /// Emits `mapper(previous, current)` for every event of this stream.
  ///
  /// [initial] is prepended to the stream, so the first source event is
  /// paired with [initial] and one result is produced per source event.
  Stream<R> pairwiseStartWith<R>(
    T initial,
    R Function(T previous, T current) mapper,
  ) {
    return startWith(initial)
        .pairwise()
        .map((pair) => mapper(pair.first, pair.last));
  }

  /// Keeps only the events whose runtime type is [R].
  ///
  /// The static element type stays [T]; no cast is needed because `where`
  /// already yields a `Stream<T>`.
  // NOTE(review): RxDart ships its own `whereType` extension on Stream;
  // confirm that this same-named member does not make call sites ambiguous.
  Stream<T> whereType<R>() => where((event) => event is R);

  /// Prints every data event, error and completion, prefixed with [prefix].
  ///
  /// Events pass through untouched. RxDart has no `tap` operator, so the
  /// logging is attached with the `doOn*` side-effect operators.
  Stream<T> tapLog([String prefix = '']) =>
      doOnData((data) => print('$prefix Data: $data'))
          .doOnError((e, s) => print('$prefix Error: $e'))
          .doOnDone(() => print('$prefix Done'));
}
// Usage
final numbers = PublishSubject<num>();

// Difference between each value and the one before it (starting from 0),
// restricted to integer results and logged with a "Diff:" prefix.
final diffs = numbers.pairwiseStartWith(0, (prev, curr) => curr - prev);
diffs.whereType<int>().tapLog('Diff:').listen(null);

for (final value in <num>[5, 10, 7, 12.5]) {
  numbers.add(value);
}
// Output:
// Diff: Data: 5
// Diff: Data: 5
// Diff: Data: -3
// (12.5 - 7 = 5.5 is a double, so whereType<int>() filters it out and nothing is logged for it)
3. Advanced Error Handling and Recovery
3.1 Retrying with Exponential Backoff:
/// Re-subscribes to the stream produced by [streamFactory] whenever it
/// fails, waiting twice as long before each successive retry.
///
/// The first retry waits [initialDelay], the second `2 * initialDelay`, and
/// so on. Once the stream has failed [maxAttempts] times, no further retry is
/// scheduled and the failure is forwarded to the listener.
///
/// [streamFactory] must return a fresh stream on every call, because each
/// retry listens to its result again.
Stream<T> retryWithExponentialBackoff<T>(
  Stream<T> Function() streamFactory, {
  int maxAttempts = 3,
  Duration initialDelay = const Duration(seconds: 1),
}) {
  // `Rx.defer` gives every listener of the returned stream its own counter.
  return Rx.defer(() {
    var attempts = 0;
    return Rx.retryWhen<T>(streamFactory, (error, stackTrace) {
      attempts++;
      if (attempts >= maxAttempts) {
        // An erroring notifier tells retryWhen to stop retrying and fail.
        return Stream<void>.error(error, stackTrace);
      }
      // 1 << (attempts - 1) == 2^(attempts - 1): 1x, 2x, 4x, ...
      final delay = initialDelay * (1 << (attempts - 1));
      print('Retrying after $delay');
      return Rx.timer<void>(null, delay);
    });
  });
}
// Usage
// A periodic stream is single-subscription, so handing the same instance to
// every retry would fail with "Stream has already been listened to".
// A reusable deferred stream builds a brand-new periodic stream per listen.
final unreliableStream = Rx.defer(
  () => Stream<int>.periodic(const Duration(seconds: 1), (i) => i)
      .map((i) => i == 2 ? throw Exception('Oops!') : i)
      .take(5),
  reusable: true,
);
retryWithExponentialBackoff(() => unreliableStream)
    .listen(print, onError: (e) => print('Error: $e'));
// Every retry restarts the sequence from 0, and the third failure is final:
// Output: 0, 1, (retry after 1s), 0, 1, (retry after 2s), 0, 1, Error: Exception: Oops!
3.2 Circuit Breaker Pattern:
/// Error emitted by [CircuitBreaker.stream] while the circuit is open.
class CircuitBreakerOpenException implements Exception {
  const CircuitBreakerOpenException();

  @override
  String toString() => 'CircuitBreakerOpenException: circuit is open';
}

/// Guards a stream-producing operation with a simple circuit breaker.
///
/// Every error emitted by a guarded stream increments a failure counter.
/// When the counter reaches the failure threshold the circuit opens, and
/// [stream] fails immediately with a [CircuitBreakerOpenException] instead of
/// calling the factory. Once more than the reset timeout has elapsed since
/// the circuit opened, the next [stream] call closes it again and resets the
/// counter.
///
/// Successful events do not reset the counter; only the timeout does.
class CircuitBreaker<T> {
  final Stream<T> Function() _streamFactory;
  final Duration _resetTimeout;
  final int _failureThreshold;
  int _failureCount = 0;
  bool _isOpen = false;

  /// The moment the circuit was last opened; null until that first happens.
  DateTime? _lastFailure;

  /// Creates a breaker around [_streamFactory].
  ///
  /// [failureThreshold] errors open the circuit; it stays open for
  /// [resetTimeout].
  CircuitBreaker(
    this._streamFactory, {
    Duration resetTimeout = const Duration(seconds: 60),
    int failureThreshold = 3,
  })  : _resetTimeout = resetTimeout,
        _failureThreshold = failureThreshold;

  /// Returns a stream that evaluates the breaker state when listened to.
  ///
  /// The open-circuit failure is delivered as a stream error so that it
  /// reaches the listener's `onError` (or a `Future` derived from the
  /// stream) rather than being thrown synchronously out of `listen`.
  Stream<T> stream() {
    return Rx.defer(() {
      final lastFailure = _lastFailure;
      if (_isOpen && lastFailure != null) {
        final stillCoolingDown =
            DateTime.now().difference(lastFailure) <= _resetTimeout;
        if (stillCoolingDown) {
          return Stream<T>.error(const CircuitBreakerOpenException());
        }
        // Timeout elapsed: close the circuit and start counting afresh.
        _isOpen = false;
        _failureCount = 0;
      }
      return _streamFactory().doOnError((e, s) {
        _failureCount++;
        if (_failureCount >= _failureThreshold) {
          _isOpen = true;
          _lastFailure = DateTime.now();
        }
      });
    });
  }
}
// Usage
// A fake service call that takes one second and fails about half the time.
// `Rx.timer` replaces `Observable.timer`, which no longer exists in RxDart.
final unreliableService = CircuitBreaker<String>(
  () => Rx.timer(null, const Duration(seconds: 1)).flatMap(
    (_) => Random().nextBool()
        ? Stream.value('Success')
        : Stream<String>.error(Exception('Service failed')),
  ),
);
// Call the service every two seconds; failures are reported through onError.
for (var i = 0; i < 10; i++) {
  await Future<void>.delayed(const Duration(seconds: 2));
  unreliableService.stream().listen(
        print,
        onError: (e) => print('Error: $e'),
      );
}
4. Advanced Architectural Patterns
4.1 Redux-style State Management with RxDart:
// State
/// Immutable application state: a single integer counter.
class AppState {
  /// Current counter value.
  final int counter;

  /// Creates a state holding [counter].
  ///
  /// The constructor is `const` so that fixed states (for example the
  /// initial one) can be compile-time constants.
  const AppState(this.counter);
}
// Actions
/// Base type for everything that can be dispatched to the store.
abstract class Action {}
/// Handled by `reducer` by adding one to the counter.
class IncrementAction extends Action {}
/// Handled by `reducer` by subtracting one from the counter.
class DecrementAction extends Action {}
// Reducer
/// Computes the state that follows [state] once [action] has been applied.
///
/// An [IncrementAction] raises the counter by one and a [DecrementAction]
/// lowers it by one. Any other action returns [state] itself, unchanged.
AppState reducer(AppState state, Action action) {
  final int delta;
  if (action is IncrementAction) {
    delta = 1;
  } else if (action is DecrementAction) {
    delta = -1;
  } else {
    // Unknown action: hand back the very same instance.
    return state;
  }
  return AppState(state.counter + delta);
}
// Store
/// Redux-style store: dispatched actions are folded through `reducer` and
/// every resulting [AppState] is published on [state].
class Store {
  final BehaviorSubject<AppState> _state;
  final PublishSubject<Action> _actions;

  /// Subscription feeding reduced states into [_state]; cancelled in
  /// [dispose] so nothing is added to [_state] after it has been closed.
  late final StreamSubscription<AppState> _reduction;

  /// Creates a store whose first published state is [initialState].
  Store(AppState initialState)
      : _state = BehaviorSubject.seeded(initialState),
        _actions = PublishSubject<Action>() {
    _reduction = _actions
        // RxDart's scan accumulator also receives the event index, so the
        // two-argument reducer has to be adapted rather than torn off.
        .scan<AppState>(
          (state, action, _) => reducer(state, action),
          initialState,
        )
        .listen(_state.add);
  }

  /// The current state followed by every later state.
  Stream<AppState> get state => _state.stream;

  /// Function that queues an [Action] for reduction.
  void Function(Action) get dispatch => _actions.add;

  /// Stops reducing and closes both subjects.
  ///
  /// The subscription is cancelled first so that an action still in flight
  /// cannot be added to the already-closed state subject.
  void dispose() {
    _reduction.cancel();
    _actions.close();
    _state.close();
  }
}
// Usage
final store = Store(AppState(0));
store.state.listen((state) => print('Counter: ${state.counter}'));

// Two increments followed by one decrement.
final actions = <Action>[
  IncrementAction(),
  IncrementAction(),
  DecrementAction(),
];
for (final action in actions) {
  store.dispatch(action);
}
4.2 Reactive Clean Architecture:
// Domain Layer
/// Domain-level contract for reading and adding [Item]s.
abstract class Repository {
/// Returns a stream of item lists.
Stream<List<Item>> getItems();
/// Adds [item]; the returned future completes when the operation is done.
Future<void> addItem(Item item);
}
/// A named entry identified by [id].
class Item {
  /// Identifier of the item.
  final String id;

  /// Display name of the item.
  final String name;

  /// Creates an item with the given [id] and [name].
  const Item(this.id, this.name);

  /// Readable form, so that printing a list of items shows their contents
  /// instead of `Instance of 'Item'`.
  @override
  String toString() => 'Item(id: $id, name: $name)';
}
// Data Layer
/// [Repository] that keeps its item list in a [BehaviorSubject], starting
/// with an empty list.
class ApiRepository implements Repository {
  final _items = BehaviorSubject<List<Item>>.seeded(<Item>[]);

  /// Stream view of the subject holding the item list.
  @override
  Stream<List<Item>> getItems() {
    return _items.stream;
  }

  /// Publishes a new list made of the current items followed by [item].
  @override
  Future<void> addItem(Item item) async {
    final updated = <Item>[..._items.value, item];
    _items.add(updated);
  }
}
// Presentation Layer
/// Presentation-layer component exposing the repository's items as a stream.
class ItemsBloc {
  final Repository _repository;
  final PublishSubject<void> _refresh = PublishSubject<void>();
  final BehaviorSubject<List<Item>> _itemsSubject =
      BehaviorSubject<List<Item>>();

  /// Subscription that forwards repository emissions into [_itemsSubject].
  ///
  /// It is kept so that [dispose] can cancel it: closing [_refresh] alone
  /// does not stop the inner repository stream, which would otherwise try to
  /// add to [_itemsSubject] after it has been closed.
  late final StreamSubscription<List<Item>> _itemsSubscription;

  /// Starts listening to [_repository] immediately and again on [refresh].
  ItemsBloc(this._repository) {
    _itemsSubscription = _refresh
        // Trigger an initial load without waiting for the first refresh.
        .startWith(null)
        // Each refresh replaces the previous repository subscription.
        .switchMap((_) => _repository.getItems())
        .listen(_itemsSubject.add);
  }

  /// Latest list of items, followed by every subsequent update.
  Stream<List<Item>> get items => _itemsSubject.stream;

  /// Re-subscribes to the repository's item stream.
  void refresh() => _refresh.add(null);

  /// Adds an item called [name], using the current timestamp as its id.
  Future<void> addItem(String name) =>
      _repository.addItem(Item(DateTime.now().toString(), name));

  /// Cancels the repository subscription and closes both subjects.
  void dispose() {
    _itemsSubscription.cancel();
    _refresh.close();
    _itemsSubject.close();
  }
}
// Usage
final repository = ApiRepository();
final bloc = ItemsBloc(repository);

// Print every list the bloc publishes.
bloc.items.listen((items) => print('Items: $items'));

// Add two items, then ask the bloc to re-subscribe to the repository.
bloc
  ..addItem('First Item')
  ..addItem('Second Item')
  ..refresh();
5. Testing Advanced RxDart Code
Testing complex reactive code requires special techniques:
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';
/// Tests for the custom `pairwiseStartWith` operator and `CircuitBreaker`.
void main() {
  group('Advanced RxDart Testing', () {
    test('Test custom operator', () {
      final source = PublishSubject<int>();
      final result = source.pairwiseStartWith(0, (prev, curr) => curr - prev);

      // Not awaited on purpose: the events are only added below, so waiting
      // here would never finish. The pending matcher keeps the test alive.
      expectLater(result, emitsInOrder([5, 3, -1, 4]));

      source
        ..add(5)
        ..add(8)
        ..add(7)
        ..add(11)
        ..close();
    });

    test('Test CircuitBreaker', () async {
      var callCount = 0;
      // The first three calls fail; every later call succeeds.
      final breaker = CircuitBreaker<String>(
        () {
          callCount++;
          return callCount <= 3
              ? Stream<String>.error(Exception('Error'))
              : Stream.value('Success');
        },
        // Probes run every 50ms. 125ms sits between two probes (100ms and
        // 150ms after the circuit opens), so timer jitter cannot decide
        // whether a probe sees the circuit as open or closed.
        resetTimeout: const Duration(milliseconds: 125),
        failureThreshold: 3,
      );

      // Turns each probe into a string: the value on success, or the error's
      // text on failure, whether it is thrown synchronously or asynchronously.
      Future<String> probe(Object? _) async {
        try {
          return await breaker.stream().first;
        } catch (e) {
          return e.toString();
        }
      }

      await expectLater(
        Stream<void>.periodic(const Duration(milliseconds: 50))
            .take(10)
            .asyncMap(probe),
        emitsInOrder([
          // Three real failures open the circuit.
          contains('Error'),
          contains('Error'),
          contains('Error'),
          // The next two probes are rejected while the circuit is open.
          contains('CircuitBreakerOpenException'),
          contains('CircuitBreakerOpenException'),
          // After the reset timeout the service is called again.
          'Success',
          'Success',
          'Success',
          'Success',
          'Success',
        ]),
      );
    });
  });
}
Congratulations on reaching this advanced level of RxDart mastery! We've explored complex stream manipulations, created custom operators, implemented advanced error handling strategies, and designed scalable reactive architectures. These techniques represent the cutting edge of reactive programming in Flutter.
Remember, with great power comes great responsibility. As you implement these advanced patterns, always consider the trade-offs in terms of complexity, performance, and maintainability. Strive for clean, readable code even when working with complex reactive systems.
Happy coding, and may your streams flow ever smoothly!
@gideonsalamii - X (formerly Twitter)