看merge之前,我们先看一下flatten
flatten
//以下是merge另外的一种实现
- (void)flattenDemo {
RACSubject *letters = [RACSubject subject];
RACSubject *numbers = [RACSubject subject];
//创建信号数组,当flattened信号subscribeNext的时候,会在其内部触发signalOfSignals的subscribeNext
//当[subscriber sendNext:letters];时,会把letters这个signal传递到里面去,会添加到flattened的一个数组中,并建立letters的subscribeNext
RACSignal *signalOfSignals = [RACSignal createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
[subscriber sendNext:letters];
[subscriber sendNext:numbers];
[subscriber sendCompleted];
return nil;
}];
//flatten主要是打平,降维
//flatten的时候,创建了一个新的signal,在新的signal里面监听了信号数组,即
/*[RACSignal createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
[signalOfSignals subscribeNext:^(id x) {
addSignal(signal);
}];
}];
addSignal() {
[letters subscribeNext:^(NSString *x) {
[subscriber sendNext:x];
}];
}
*/
RACSignal *flattened = [signalOfSignals flatten];
// Outputs: A 1 B C 2
[flattened subscribeNext:^(NSString *x) {
NSLog(@"%@", x);
}];
//letters信号sendNext就会触发底层的subscribeNext
//而底层中subscribeNext就会调用flattened的subscriber
//最终就会触发[flattened subscribeNext:^(id x) {}];
[letters sendNext:@"A"];
[numbers sendNext:@"1"];
[letters sendNext:@"B"];
[letters sendNext:@"C"];
[numbers sendNext:@"2"];
}
merge
- (void)mergeDemo {
RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"signalA"];
return nil;
}];
RACSignal *signalB = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"signalB"];
return nil;
}];
//signalA merge signalB 产生一个新的信号mergeSignal
RACSignal *mergeSignal = [signalA merge:signalB];
//新的信号mergeSignal通过subscribeNext创建监听者。
[mergeSignal subscribeNext:^(id x) {
NSLog(@"%@", x);
} error:^(NSError *error) {
NSLog(@"%@", error.localizedDescription);
}];
}
RACSignal *mergeSignal = [signalA merge:signalB];
这里的关键是RACSignal *mergeSignal = [signalA merge:signalB]; signalA merge了signalB产生了一个新的信号mergeSignal,下面我们就看看merge里面做了什么。
+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
//将两个signal添加一个数组中
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
for (RACSignal *signal in signals) {
[copiedSignals addObject:signal];
}
return [[[RACSignal
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
//遍历数组,触发sendNext,注意这里sendNext的是signal
for (RACSignal *signal in copiedSignals) {
[subscriber sendNext:signal];
}
[subscriber sendCompleted];
return nil;
}]
//倘若没有flatten,外面新的signal就是这个signal,外面subscribeNext后,就会触发上面的block
flatten]
setNameWithFormat:@"+merge: %@", copiedSignals];
}
merge里面创建了一个信号,然后调用了flatten。下面我们再来看看flatten里面做了什么
- (instancetype)flatten {
__weak RACStream *stream __attribute__((unused)) = self;
return [[self flattenMap:^(id value) {
return value;
}] setNameWithFormat:@"[%@] -flatten", self.name];
}
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
Class class = self.class;
//最主要的是这个bind,几乎所有的operation底层都调用这个bind,通过bind创建一个新的signal
return [[self bind:^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
一直到最底层,调用了bind, bind几乎是所有的operation的根基。我们再看看bind里面有做了什么
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
NSCParameterAssert(block != NULL);
/*
* -bind: should:
*
* 1. Subscribe to the original signal of values.
* 2. Any time the original signal sends a value, transform it using the binding block.
* 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
* 4. If the binding block asks the bind to terminate, complete the _original_ signal.
* 5. When _all_ signals complete, send completed to the subscriber.
*
* If any signal sends an error at any point, send that to the subscriber.
*/
//创建一个新的siganl,也就是最外面的mergeSignal就是这个siganl,mergeSignal调用了subscribeNext后,就会创建一个subscriber,并当作参数触发以下这个block
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACStreamBindBlock bindingBlock = block();
NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
//创建completionSignal,处理complete以后的操作
void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
BOOL removeDisposable = NO;
@synchronized (signals) {
[signals removeObject:signal];
if (signals.count == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
removeDisposable = YES;
}
}
if (removeDisposable) [compoundDisposable removeDisposable:finishedDisposable];
};
//创建addSignal,
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
@synchronized (signals) {
[signals addObject:signal];
}
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
//在添加的过程中,会调用subscribeNext,这时候就会触发signalA的block。
RACDisposable *disposable = [signal subscribeNext:^(id x) {
//以下这个subscriber就是mergeSignal的,通过sendNext后,最外面的subscribeNext就会被调用。
[subscriber sendNext:x];
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(signal, selfDisposable);
}
}];
selfDisposable.disposable = disposable;
};
@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
//这里的self是merge的时候创建的signal,调用flatten时,进入到bind,调用subscribeNext把最外面的signalA和signalB取到。这里的x就是signal
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// Manually check disposal to handle synchronous errors.
if (compoundDisposable.disposed) return;
//x也就是signal,会通过block抛到外面,由外面再进行二次处理,但是flatten不做任何做处理,会原封不动返回signal
BOOL stop = NO;
id signal = bindingBlock(x, &stop);
@autoreleasepool {
//然后会把这个signal添加到数组signals中。
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self, selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self, selfDisposable);
}
}];
selfDisposable.disposable = bindingDisposable;
}
return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}