RACStream
在ReactiveCocoa世界里面,万物皆为Stream,RACSignal和RACSequence都继承于RACStream,声明了一些抽象方法由子类去实现。
@implementation RACStream
...
#pragma mark Abstract methods
+ (instancetype)empty {
return nil;
}
- (instancetype)bind:(RACStreamBindBlock (^)(void))block {
return nil;
}
+ (instancetype)return:(id)value {
return nil;
}
- (instancetype)concat:(RACStream *)stream {
return nil;
}
- (instancetype)zipWith:(RACStream *)stream {
return nil;
}
...
@end
也提供很多底层的操作方法。下面列举了一些常用的方法,更多方法请见RACStream.m,后续会有专题讲解部分的方法,别看这么多方法,其实只要掌握里面的原理,会发现有很多相似的地方。所谓万变不离其宗,就是这个理。
@implementation RACStream (Operations)
...
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
Class class = self.class;
return [[self bind:^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
- (instancetype)flatten {
__weak RACStream *stream __attribute__((unused)) = self;
return [[self flattenMap:^(id value) {
return value;
}] setNameWithFormat:@"[%@] -flatten", self.name];
}
- (instancetype)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^(id value) {
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}
- (instancetype)mapReplace:(id)object {
return [[self map:^(id _) {
return object;
}] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, [object rac_description]];
}
- (instancetype)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value) {
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}
- (instancetype)reduceEach:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t) {
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}
- (instancetype)skip:(NSUInteger)skipCount {
Class class = self.class;
return [[self bind:^{
__block NSUInteger skipped = 0;
return ^(id value, BOOL *stop) {
if (skipped >= skipCount) return [class return:value];
skipped++;
return class.empty;
};
}] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}
- (instancetype)take:(NSUInteger)count {
Class class = self.class;
if (count == 0) return class.empty;
return [[self bind:^{
__block NSUInteger taken = 0;
return ^ id (id value, BOOL *stop) {
if (taken < count) {
++taken;
if (taken == count) *stop = YES;
return [class return:value];
} else {
return nil;
}
};
}] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}
+ (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
RACStream *current = nil;
// Creates streams of successively larger tuples by combining the input
// streams one-by-one.
for (RACStream *stream in streams) {
// For the first stream, just wrap its values in a RACTuple. That way,
// if only one stream is given, the result is still a stream of tuples.
if (current == nil) {
current = [stream map:^(id x) {
return RACTuplePack(x);
}];
continue;
}
current = block(current, stream);
}
if (current == nil) return [self empty];
return [current map:^(RACTuple *xs) {
// Right now, each value is contained in its own tuple, sorta like:
//
// (((1), 2), 3)
//
// We need to unwrap all the layers and create a tuple out of the result.
NSMutableArray *values = [[NSMutableArray alloc] init];
while (xs != nil) {
[values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
xs = (xs.count > 1 ? xs.first : nil);
}
return [RACTuple tupleWithObjectsFromArray:values];
}];
}
+ (instancetype)zip:(id<NSFastEnumeration>)streams {
return [[self join:streams block:^(RACStream *left, RACStream *right) {
return [left zipWith:right];
}] setNameWithFormat:@"+zip: %@", streams];
}
+ (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
RACStream *result = [self zip:streams];
// Although we assert this condition above, older versions of this method
// supported this argument being nil. Avoid crashing Release builds of
// apps that depended on that.
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
return [result setNameWithFormat:@"+zip: %@ reduce:", streams];
}
+ (instancetype)concat:(id<NSFastEnumeration>)streams {
RACStream *result = self.empty;
for (RACStream *stream in streams) {
result = [result concat:stream];
}
return [result setNameWithFormat:@"+concat: %@", streams];
}
...
@end
RACSignal
RACSignal继承于RACStream,是一个类簇,包含有很多子类,具体如下:
- RACSubject
- RACDynamicSignal
- RACEmptySignal
- RACErrorSignal
- RACReturnSignal
- RACChannelTerminal
其中RACSubject,RACDynamicSignal用得比较多,后续会详细讲解
Demo
下面会用一个场景来描述RACSignal是怎么运作的。
- (void)createSignalDemo {
//小明(signal),小明有个愿望,想吃水果(即block里面要做的事情)。
//家里没有水果,所以只能去水果店。(即RAC适合处理异步的时间,例如网络请求等)。
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
//挑水果
NSLog(@"小明挑选水果中...");
NSString *fruit = [self selectFruit];
//挑好水果,水果店老板(subscriber) 称重
[subscriber sendNext:fruit];
//水果店老板结算
[subscriber sendCompleted];
//水果店老板发现小明没带钱,想偷了就跑
//complete和error是排斥的存在,只会触发一种。
[subscriber sendError:nil];
return nil;
}];
//去水果店是一个耗时的操作,(即signal可以在后续任意时间段subscribeNext)。
//水果店里面有个老板,(即subscribeNext的时候就会创建一个subscriber)。
//老板有些行为,针对不同情况,做不同的事(即subscriber持有下面这些block)。
//到了水果店小明就挑水果(即subscribeNext后,会马上触发上边的block)。
[signal subscribeNext:^(id x) {
NSLog(@"老板称重%@", x);
} error:^(NSError *error) {
NSLog(@"报警");
} completed:^{
NSLog(@"结算");
}];
}
- (NSString *)selectFruit {
sleep(5);
//假设小明挑了5秒钟,在众多水果中挑了苹果
NSLog(@"小明挑选了apple...");
return @"apple";
}
2019-05-21 19:33:07.985581+0800 ReactiveCocoaDemo[52190:8753946] 小明挑选水果中…
2019-05-21 19:33:12.986973+0800 ReactiveCocoaDemo[52190:8753946] 小明挑选了apple…
2019-05-21 19:33:12.987353+0800 ReactiveCocoaDemo[52190:8753946] 老板称重apple
2019-05-21 19:33:12.987594+0800 ReactiveCocoaDemo[52190:8753946] 结算
底层代码梳理
- 再调用构造方法createSignal的时候,其实是创建了RACDynamicSignal对象,并且持有didSubscribe这个block对象
@implementation RACSignal
...
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}
...
@end
@implementation RACDynamicSignal
...
#pragma mark Lifecycle
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
...
@end
- 调用subscribeNext的方法的时候,先创建了一个RACSubscriber对象,并持有nextBlock,errorBlock,completedBlock
@implementation RACSignal
...
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NSCParameterAssert(nextBlock != NULL);
NSCParameterAssert(errorBlock != NULL);
NSCParameterAssert(completedBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:o];
}
...
@end
- 最终调用subscribe,每个子类都会重写这个方法。差异性就体现在这个方法。以下是RACDynamicSignal,基本上我们讨论RACSignal的都是这个类,在subscribe的时候,就会调用之前持有的didSubscribe,并把刚创建的RACSubscriber对象当作输入参数传入。
@implementation RACDynamicSignal
...
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
...
@end
- 最后RACSubscriber对象会调用sendNext,sendCompleted或者sendError。因为执行了sendError或者sendCompleted会嗲用[self.disposable dispose],所以这两个方法是互斥的。
@implementation RACSubscriber
...
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];
if (completedBlock == nil) return;
completedBlock();
}
}
...
@end
RACSubject
RACSubject继承于RACSignal,既是信号监听者,也是信号触发器,也就是自导自演。
Demo
- (void)subjectDemo {
RACSubject *subject = [RACSubject subject];
//subject是一个简单的signal
//每次subscribeNext的时候都会创建一个subscriber,subscriber会持有block,然后加入subscribers数组中
[subject subscribeNext:^(id x) {
NSLog(@"1--%@",x);
} error:^(NSError *error) {
NSLog(@"1--%@", error.localizedDescription);
} completed:^{
NSLog(@"1--completed");
}];
[subject subscribeNext:^(id x) {
NSLog(@"2--%@",x);
} error:^(NSError *error) {
NSLog(@"2--%@", error.localizedDescription);
} completed:^{
NSLog(@"2--completed");
}];
//sendNext的时候,就会遍历subscribers数组,执行subscriber的block
[subject sendNext:@"x"];
[subject sendNext:@"y"];
[subject sendError:[NSError errorWithDomain:@"1" code:0x11 userInfo:nil]];
}
2019-05-21 19:27:47.613451+0800 ReactiveCocoaDemo[52109:8748972] 1–x
2019-05-21 19:27:47.613560+0800 ReactiveCocoaDemo[52109:8748972] 2–x
2019-05-21 19:27:47.613631+0800 ReactiveCocoaDemo[52109:8748972] 1–y
2019-05-21 19:27:47.613707+0800 ReactiveCocoaDemo[52109:8748972] 2–y
2019-05-21 19:27:47.615153+0800 ReactiveCocoaDemo[52109:8748972] 1–The operation couldn’t be completed. (1 error 17.)
2019-05-21 19:27:47.615283+0800 ReactiveCocoaDemo[52109:8748972] 2–The operation couldn’t be completed. (1 error 17.)
底层代码梳理
- RACSubject继承于RACSignal,并实现RACSubscriber协议
@interface RACSubject : RACSignal <RACSubscriber>
/// Returns a new subject.
+ (instancetype)subject;
@end
- RACSubject是一个Signal,维护了一个数组subscribers,并且重写了subscribe方法,每次subscribeNext的时候都会往这个数组添加新的subscriber。
@implementation RACSubject
//重写subscribe方法,每次subscribeNext就会触发这个
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
//把subscriber添加到数组中,并加锁防止多线程冲突
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
return [RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}];
}
@end
- RACSubject实现了RACSubscriber协议,具备sendNext,sendError,sendCompletion等方法。在调用这几个方法的时候会遍历self.subscribers,然后会一一执行subscriber持有的sendNext
@implementation RACSubject
...
//遍历subscribers数组,执行响应的block
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}
- (void)sendError:(NSError *)error {
[self.disposable dispose];
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendError:error];
}];
}
- (void)sendCompleted {
[self.disposable dispose];
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendCompleted];
}];
}
...
@end