RACStream

在ReactiveCocoa世界里面,万物皆为Stream,RACSignal和RACSequence都继承于RACStream,声明了一些抽象方法由子类去实现。

@implementation RACStream
...
#pragma mark Abstract methods

// The methods in this section are abstract: each base implementation below
// does nothing except return nil, and concrete subclasses are expected to
// override them with real behavior.

// Abstract stub; always returns nil in this base class.
+ (instancetype)empty {
	return nil;
}

// Abstract stub; `block` is ignored here and nil is returned.
- (instancetype)bind:(RACStreamBindBlock (^)(void))block {
	return nil;
}

// Abstract stub; `value` is ignored here and nil is returned.
+ (instancetype)return:(id)value {
	return nil;
}

// Abstract stub; `stream` is ignored here and nil is returned.
- (instancetype)concat:(RACStream *)stream {
	return nil;
}

// Abstract stub; `stream` is ignored here and nil is returned.
- (instancetype)zipWith:(RACStream *)stream {
	return nil;
}
...
@end

也提供很多底层的操作方法。下面列举了一些常用的方法,更多方法请见RACStream.m,后续会有专题讲解部分的方法,别看这么多方法,其实只要掌握里面的原理,会发现有很多相似的地方。所谓万变不离其宗,就是这个理。

@implementation RACStream (Operations)
...
/// Transforms every value of the receiver into a stream via `block` and feeds
/// those streams through -bind:.
///
/// @param block Maps a value to a stream. Must not be nil. If it returns nil,
///              `[class empty]` of the receiver's class is used instead.
/// @return The bound stream, named "[<receiver name>] -flattenMap:".
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
	// Fail early with a readable message instead of crashing later when the
	// nil block is invoked; -map:, -filter: and -reduceEach: do the same.
	NSCParameterAssert(block != nil);

	Class class = self.class;

	return [[self bind:^{
		return ^(id value, BOOL *stop) {
			// A nil result is treated as "no values" rather than as an error.
			id stream = block(value) ?: [class empty];
			NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

			return stream;
		};
	}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

/// Flattens a stream of streams by passing each value straight through
/// -flattenMap: unchanged (so each value is expected to be a stream itself).
///
/// @return The flattened stream, named "[<receiver name>] -flatten".
- (instancetype)flatten {
	// The former `__weak RACStream *stream` local was never read, so it has
	// been dropped.
	return [[self flattenMap:^(id value) {
		return value;
	}] setNameWithFormat:@"[%@] -flatten", self.name];
}

/// Transforms each value with `block` and wraps the result in a single-value
/// stream (`+return:`) of the receiver's class, via -flattenMap:.
///
/// @param block The transformation to apply. Must not be nil.
/// @return The mapped stream, named "[<receiver name>] -map:".
- (instancetype)map:(id (^)(id value))block {
	NSCParameterAssert(block != nil);

	Class streamClass = self.class;

	RACStream *mapped = [self flattenMap:^(id value) {
		id transformed = block(value);
		return [streamClass return:transformed];
	}];

	return [mapped setNameWithFormat:@"[%@] -map:", self.name];
}

/// Replaces every value of the receiver with `object`, using -map:.
///
/// @param object The constant value to emit in place of each original value.
/// @return The mapped stream, named "[<receiver name>] -mapReplace: <object>".
- (instancetype)mapReplace:(id)object {
	RACStream *replaced = [self map:^(id ignored) {
		return object;
	}];

	return [replaced setNameWithFormat:@"[%@] -mapReplace: %@", self.name, [object rac_description]];
}

/// Keeps only the values for which `block` returns YES.
///
/// Each accepted value is re-wrapped with `+return:`; each rejected value is
/// replaced by the empty stream of the receiver's class.
///
/// @param block The predicate. Must not be nil.
/// @return The filtered stream, named "[<receiver name>] -filter:".
- (instancetype)filter:(BOOL (^)(id value))block {
	NSCParameterAssert(block != nil);

	Class streamClass = self.class;

	RACStream *filtered = [self flattenMap:^ id (id value) {
		return block(value) ? [streamClass return:value] : [streamClass empty];
	}];

	return [filtered setNameWithFormat:@"[%@] -filter:", self.name];
}

/// Maps each RACTuple value by invoking `reduceBlock` with the tuple's
/// contents as its arguments (through RACBlockTrampoline).
///
/// @param reduceBlock The block to invoke for every tuple. Must not be nil.
/// @return The mapped stream, named "[<receiver name>] -reduceEach:".
- (instancetype)reduceEach:(id (^)())reduceBlock {
	NSCParameterAssert(reduceBlock != nil);

	// Only referenced by the assertion message below, hence weak and marked
	// unused for builds where assertions are compiled out.
	__weak RACStream *weakReceiver __attribute__((unused)) = self;

	RACStream *reduced = [self map:^(RACTuple *tuple) {
		NSCAssert([tuple isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", weakReceiver, tuple);
		return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:tuple];
	}];

	return [reduced setNameWithFormat:@"[%@] -reduceEach:", self.name];
}

/// Drops the first `skipCount` values and forwards everything after them.
///
/// @param skipCount Number of leading values to replace with the empty stream.
/// @return The resulting stream, named "[<receiver name>] -skip: <skipCount>".
- (instancetype)skip:(NSUInteger)skipCount {
	Class streamClass = self.class;

	RACStream *remainder = [self bind:^{
		// Counter lives inside the bind block, so it is created each time the
		// bind block is invoked.
		__block NSUInteger skipped = 0;

		return ^(id value, BOOL *stop) {
			if (skipped < skipCount) {
				skipped++;
				return [streamClass empty];
			}

			return [streamClass return:value];
		};
	}];

	return [remainder setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}

/// Forwards at most the first `count` values of the receiver.
///
/// @param count Maximum number of values to take. Zero yields the empty
///              stream of the receiver's class immediately.
/// @return The resulting stream, named "[<receiver name>] -take: <count>".
- (instancetype)take:(NSUInteger)count {
	Class streamClass = self.class;

	if (count == 0) return [streamClass empty];

	RACStream *prefix = [self bind:^{
		__block NSUInteger taken = 0;

		return ^ id (id value, BOOL *stop) {
			// Already delivered `count` values: hand back nil.
			if (taken >= count) return nil;

			taken++;
			// Flag the binding as finished on the value that reaches the limit.
			if (taken == count) *stop = YES;
			return [streamClass return:value];
		};
	}];

	return [prefix setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}

/// Combines `streams` pairwise with `block` and flattens the nested tuples
/// that result into one flat RACTuple per value.
///
/// @param streams The streams to combine, in order.
/// @param block   Combines the accumulated stream (first argument) with the
///                next input stream (second argument).
/// @return A stream of flat tuples, or `[self empty]` when nothing was
///         accumulated.
+ (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
	RACStream *combined = nil;

	// Build streams of progressively deeper tuples, one input at a time.
	for (RACStream *stream in streams) {
		if (combined != nil) {
			combined = block(combined, stream);
			continue;
		}

		// Nothing accumulated yet: wrap this stream's values in one-element
		// tuples, so that a single input still produces a stream of tuples.
		combined = [stream map:^(id x) {
			return RACTuplePack(x);
		}];
	}

	if (combined == nil) return [self empty];

	return [combined map:^(RACTuple *nested) {
		// At this point every value is nested in its own tuple, e.g.
		//
		// (((1), 2), 3)
		//
		// Peel the layers from the outside in, prepending each layer's last
		// element so that the final order is 1, 2, 3.
		NSMutableArray *values = [[NSMutableArray alloc] init];

		while (nested != nil) {
			[values insertObject:nested.last ?: RACTupleNil.tupleNil atIndex:0];

			if (nested.count > 1) {
				nested = nested.first;
			} else {
				nested = nil;
			}
		}

		return [RACTuple tupleWithObjectsFromArray:values];
	}];
}

/// Zips all of `streams` together by joining them pairwise with -zipWith:.
///
/// @param streams The streams to zip.
/// @return The joined stream, named "+zip: <streams>".
+ (instancetype)zip:(id<NSFastEnumeration>)streams {
	RACStream *zipped = [self join:streams block:^(RACStream *left, RACStream *right) {
		return [left zipWith:right];
	}];

	return [zipped setNameWithFormat:@"+zip: %@", streams];
}

/// Zips `streams` with +zip: and then reduces each resulting tuple with
/// `reduceBlock` via -reduceEach:.
///
/// @param streams     The streams to zip.
/// @param reduceBlock Invoked for each zipped tuple. Asserted to be non-nil.
/// @return The resulting stream, named "+zip: <streams> reduce:".
+ (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock {
	NSCParameterAssert(reduceBlock != nil);

	RACStream *zipped = [self zip:streams];

	// The assertion above is compiled out of Release builds. Earlier versions
	// of this method tolerated a nil block, so keep skipping the reduction in
	// that case rather than crashing apps that relied on it.
	if (reduceBlock != nil) {
		zipped = [zipped reduceEach:reduceBlock];
	}

	return [zipped setNameWithFormat:@"+zip: %@ reduce:", streams];
}

/// Concatenates `streams` in order, starting from the empty stream and
/// appending each one with -concat:.
///
/// @param streams The streams to concatenate.
/// @return The concatenated stream, named "+concat: <streams>".
+ (instancetype)concat:(id<NSFastEnumeration>)streams {
	RACStream *accumulated = [self empty];

	for (RACStream *next in streams) {
		accumulated = [accumulated concat:next];
	}

	return [accumulated setNameWithFormat:@"+concat: %@", streams];
}
...
@end

RACSignal

RACSignal继承于RACStream,是一个类簇,包含有很多子类,具体如下:

  • RACSubject
  • RACDynamicSignal
  • RACEmptySignal
  • RACErrorSignal
  • RACReturnSignal
  • RACChannelTerminal

其中RACSubject,RACDynamicSignal用得比较多,后续会详细讲解

Demo

下面会用一个场景来描述RACSignal是怎么运作的。

/// Demo: creates a signal, subscribes to it, and logs each event, told as a
/// story about Xiao Ming buying fruit.
- (void)createSignalDemo {
    // Xiao Ming (the signal) has a wish: he wants to eat fruit (the work done
    // inside the block). There is no fruit at home, so he has to go to the
    // fruit shop (i.e. RAC is a good fit for asynchronous events such as
    // network requests).
    RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Pick the fruit.
        NSLog(@"小明挑选水果中...");
        NSString *chosenFruit = [self selectFruit];

        // Fruit chosen: the shop owner (the subscriber) weighs it.
        [subscriber sendNext:chosenFruit];
        // The shop owner settles the bill.
        [subscriber sendCompleted];

        // The shop owner notices Xiao Ming brought no money and is about to
        // run off with the fruit. Completed and error are mutually exclusive:
        // only one of them is ever delivered.
        [subscriber sendError:nil];

        return nil;
    }];

    // The shop owner's behaviors, one per situation (the subscriber holds on
    // to these blocks).
    void (^weighFruit)(id) = ^(id x) {
        NSLog(@"老板称重%@", x);
    };
    void (^callPolice)(NSError *) = ^(NSError *error) {
        NSLog(@"报警");
    };
    void (^settleBill)(void) = ^{
        NSLog(@"结算");
    };

    // Going to the shop takes time (the signal can be subscribed to at any
    // later point). The shop has an owner (subscribing creates a subscriber).
    // Once at the shop, Xiao Ming starts picking fruit (subscribing triggers
    // the block passed to +createSignal: above).
    [signal subscribeNext:weighFruit error:callPolice completed:settleBill];
}

/// Simulates a slow choice: blocks the calling thread for five seconds, logs
/// the pick, and returns it.
///
/// @return Always @"apple".
- (NSString *)selectFruit {
    // Pretend Xiao Ming spends 5 seconds browsing before settling on an apple.
    sleep(5);

    NSString *chosenFruit = @"apple";
    NSLog(@"小明挑选了apple...");
    return chosenFruit;
}

2019-05-21 19:33:07.985581+0800 ReactiveCocoaDemo[52190:8753946] 小明挑选水果中...

2019-05-21 19:33:12.986973+0800 ReactiveCocoaDemo[52190:8753946] 小明挑选了apple...

2019-05-21 19:33:12.987353+0800 ReactiveCocoaDemo[52190:8753946] 老板称重apple

2019-05-21 19:33:12.987594+0800 ReactiveCocoaDemo[52190:8753946] 结算

底层代码梳理

  1. 在调用构造方法createSignal的时候,其实是创建了RACDynamicSignal对象,并且持有didSubscribe这个block对象
@implementation RACSignal
...
/// Creates a signal by delegating to RACDynamicSignal.
///
/// @param didSubscribe The block handed on to +[RACDynamicSignal createSignal:].
/// @return The signal created by RACDynamicSignal.
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
	RACSignal *dynamicSignal = [RACDynamicSignal createSignal:didSubscribe];
	return dynamicSignal;
}
...
@end
  
@implementation RACDynamicSignal
...
#pragma mark Lifecycle
/// Creates a dynamic signal that stores a copy of `didSubscribe` for later
/// use by -subscribe:.
///
/// @param didSubscribe The block to store; it is copied.
/// @return The new signal, named "+createSignal:".
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
	RACDynamicSignal *dynamicSignal = [[self alloc] init];

	// Written straight to the ivar rather than through an accessor.
	dynamicSignal->_didSubscribe = [didSubscribe copy];

	return [dynamicSignal setNameWithFormat:@"+createSignal:"];
}
...
@end
  1. 调用subscribeNext方法的时候,先创建了一个RACSubscriber对象,并持有nextBlock,errorBlock,completedBlock
@implementation RACSignal
...
/// Wraps the three handler blocks in a RACSubscriber and subscribes it to the
/// receiver.
///
/// @param nextBlock      Handler for values. Asserted to be non-NULL.
/// @param errorBlock     Handler for an error. Asserted to be non-NULL.
/// @param completedBlock Handler for completion. Asserted to be non-NULL.
/// @return The disposable returned by -subscribe:.
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
	NSCParameterAssert(nextBlock != NULL);
	NSCParameterAssert(errorBlock != NULL);
	NSCParameterAssert(completedBlock != NULL);

	RACSubscriber *subscriber = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
	RACDisposable *subscription = [self subscribe:subscriber];
	return subscription;
}
...
@end
  1. 最终调用subscribe,每个子类都会重写这个方法。差异性就体现在这个方法。以下是RACDynamicSignal,基本上我们讨论RACSignal的都是这个类,在subscribe的时候,就会调用之前持有的didSubscribe,并把刚创建的RACSubscriber对象当作输入参数传入。
@implementation RACDynamicSignal
...
/// Subscribes `subscriber` by scheduling the stored `didSubscribe` block on
/// RACScheduler.subscriptionScheduler.
///
/// @param subscriber The subscriber to attach. Must not be nil.
/// @return A compound disposable that collects the scheduling disposable and
///         whatever disposable `didSubscribe` returns.
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
	NSCParameterAssert(subscriber != nil);

	RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

	// didSubscribe receives this wrapper rather than the caller's subscriber;
	// the wrapper is tied to the signal and to the compound disposable.
	id<RACSubscriber> passthrough = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

	// Without a stored block there is nothing to run.
	if (self.didSubscribe == NULL) return disposable;

	RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
		[disposable addDisposable:self.didSubscribe(passthrough)];
	}];
	[disposable addDisposable:schedulingDisposable];

	return disposable;
}
...
@end
  1. 最后RACSubscriber对象会调用sendNext,sendCompleted或者sendError。因为执行了sendError或者sendCompleted会调用[self.disposable dispose],所以这两个方法是互斥的。
@implementation RACSubscriber
...
/// Delivers `value` to the stored `next` block, if there is one.
///
/// The block is copied and invoked while holding @synchronized(self).
- (void)sendNext:(id)value {
	@synchronized (self) {
		void (^handler)(id) = [self.next copy];
		if (handler != nil) handler(value);
	}
}

/// Disposes the subscriber's disposable, then delivers `e` to the stored
/// `error` block, if there is one.
///
/// The handler is copied before the disposal, all under @synchronized(self).
- (void)sendError:(NSError *)e {
	@synchronized (self) {
		void (^handler)(NSError *) = [self.error copy];
		[self.disposable dispose];

		if (handler != nil) handler(e);
	}
}

/// Disposes the subscriber's disposable, then invokes the stored `completed`
/// block, if there is one.
///
/// The handler is copied before the disposal, all under @synchronized(self).
- (void)sendCompleted {
	@synchronized (self) {
		void (^handler)(void) = [self.completed copy];
		[self.disposable dispose];

		if (handler != nil) handler();
	}
}
...
@end

RACSubject

RACSubject继承于RACSignal,既是信号监听者,也是信号触发器,也就是自导自演。

Demo

/// Demo: attaches two sets of handlers to one RACSubject, then sends two
/// values followed by an error.
- (void)subjectDemo {
    // A subject is a simple signal.
    RACSubject *subject = [RACSubject subject];

    // Every subscription creates a subscriber that holds the handler blocks
    // and is added to the subject's subscribers array.
    void (^firstNext)(id) = ^(id x) {
        NSLog(@"1--%@",x);
    };
    void (^firstError)(NSError *) = ^(NSError *error) {
        NSLog(@"1--%@", error.localizedDescription);
    };
    void (^firstCompleted)(void) = ^{
        NSLog(@"1--completed");
    };
    [subject subscribeNext:firstNext error:firstError completed:firstCompleted];

    void (^secondNext)(id) = ^(id x) {
        NSLog(@"2--%@",x);
    };
    void (^secondError)(NSError *) = ^(NSError *error) {
        NSLog(@"2--%@", error.localizedDescription);
    };
    void (^secondCompleted)(void) = ^{
        NSLog(@"2--completed");
    };
    [subject subscribeNext:secondNext error:secondError completed:secondCompleted];

    // Sending walks the subscribers array and runs each subscriber's block.
    [subject sendNext:@"x"];
    [subject sendNext:@"y"];

    NSError *failure = [NSError errorWithDomain:@"1" code:17 userInfo:nil];
    [subject sendError:failure];
}

2019-05-21 19:27:47.613451+0800 ReactiveCocoaDemo[52109:8748972] 1--x

2019-05-21 19:27:47.613560+0800 ReactiveCocoaDemo[52109:8748972] 2--x

2019-05-21 19:27:47.613631+0800 ReactiveCocoaDemo[52109:8748972] 1--y

2019-05-21 19:27:47.613707+0800 ReactiveCocoaDemo[52109:8748972] 2--y

2019-05-21 19:27:47.615153+0800 ReactiveCocoaDemo[52109:8748972] 1--The operation couldn’t be completed. (1 error 17.)

2019-05-21 19:27:47.615283+0800 ReactiveCocoaDemo[52109:8748972] 2--The operation couldn’t be completed. (1 error 17.)

底层代码梳理

  1. RACSubject继承于RACSignal,并实现RACSubscriber协议
/// A RACSignal subclass that also adopts the RACSubscriber protocol, so one
/// object is declared both as a signal and as a subscriber.
@interface RACSubject : RACSignal <RACSubscriber>
/// Returns a new subject.
+ (instancetype)subject;
@end
  1. RACSubject是一个Signal,维护了一个数组subscribers,并且重写了subscribe方法,每次subscribeNext的时候都会往这个数组添加新的subscriber。
@implementation RACSubject
/// Overrides -subscribe:; called for every subscription. Wraps `subscriber`
/// in a RACPassthroughSubscriber and adds it to the subject's `subscribers`
/// array.
///
/// @param subscriber The subscriber to register. Must not be nil.
/// @return The compound disposable shared with the passthrough subscriber.
///         Disposing it removes the subscriber from `subscribers`.
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
	NSCParameterAssert(subscriber != nil);

	RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
	subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

	NSMutableArray *subscribers = self.subscribers;
	// Guard the shared array against concurrent mutation.
	@synchronized (subscribers) {
		[subscribers addObject:subscriber];
	}

	// Attach the removal to the SAME disposable the passthrough subscriber was
	// given, and return that disposable (as RACDynamicSignal -subscribe: does).
	// Previously a separate, unrelated disposable was returned, so disposing
	// either one did not affect the other.
	[disposable addDisposable:[RACDisposable disposableWithBlock:^{
		@synchronized (subscribers) {
			// Since newer subscribers are generally shorter-lived, search
			// starting from the end of the list.
			NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger idx, BOOL *stop) {
				return obj == subscriber;
			}];

			if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
		}
	}]];

	return disposable;
}
@end
  1. RACSubject实现了RACSubscriber协议,具备sendNext,sendError,sendCompleted等方法。在调用这几个方法的时候会遍历self.subscribers,然后逐一调用每个subscriber对应的sendNext,sendError或sendCompleted方法
@implementation RACSubject
...
/// Invokes `block` once for every current subscriber.
///
/// The subscribers array is copied under @synchronized, and the iteration
/// runs over that copy after the lock has been released.
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
	NSArray *snapshot;
	@synchronized (self.subscribers) {
		snapshot = [self.subscribers copy];
	}

	for (id<RACSubscriber> current in snapshot) {
		block(current);
	}
}

/// Forwards `value` to every current subscriber.
- (void)sendNext:(id)value {
	void (^forwardValue)(id<RACSubscriber>) = ^(id<RACSubscriber> subscriber) {
		[subscriber sendNext:value];
	};

	[self enumerateSubscribersUsingBlock:forwardValue];
}

/// Disposes the subject's own disposable, then forwards `error` to every
/// current subscriber.
- (void)sendError:(NSError *)error {
	[self.disposable dispose];

	void (^forwardError)(id<RACSubscriber>) = ^(id<RACSubscriber> subscriber) {
		[subscriber sendError:error];
	};

	[self enumerateSubscribersUsingBlock:forwardError];
}

/// Disposes the subject's own disposable, then forwards completion to every
/// current subscriber.
- (void)sendCompleted {
	[self.disposable dispose];

	void (^forwardCompletion)(id<RACSubscriber>) = ^(id<RACSubscriber> subscriber) {
		[subscriber sendCompleted];
	};

	[self enumerateSubscribersUsingBlock:forwardCompletion];
}
...
@end