看merge之前,我们先看一下flatten

flatten

//以下是merge另外的一种实现
- (void)flattenDemo {
    RACSubject *letters = [RACSubject subject];
    RACSubject *numbers = [RACSubject subject];
    
    //创建信号数组,当flattened信号subscribeNext的时候,会在其内部触发signalOfSignals的subscribeNext
    //当[subscriber sendNext:letters];时,会把letters这个signal传递到里面去,会添加到flattened的一个数组中,并建立letters的subscribeNext
    RACSignal *signalOfSignals = [RACSignal createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
        [subscriber sendNext:letters];
        [subscriber sendNext:numbers];
        [subscriber sendCompleted];
        return nil;
    }];
    
    //flatten主要是打平,降维
    //flatten的时候,创建了一个新的signal,在新的signal里面监听了信号数组,即
    /*[RACSignal createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
     [signalOfSignals subscribeNext:^(id x) {
        addSignal(signal);
     }];
     }];
     
     addSignal() {
     [letters subscribeNext:^(NSString *x) {
	     [subscriber sendNext:x];
     }];
     }
     */
    RACSignal *flattened = [signalOfSignals flatten];
    
    // Outputs: A 1 B C 2
    [flattened subscribeNext:^(NSString *x) {
        NSLog(@"%@", x);
    }];
    
    //letters信号sendNext就会触发底层的subscribeNext
    //而底层中subscribeNext就会调用flattened的subscriber
    //最终就会触发[flattened subscribeNext:^(id x) {}];
    [letters sendNext:@"A"];
    [numbers sendNext:@"1"];
    [letters sendNext:@"B"];
    [letters sendNext:@"C"];
    [numbers sendNext:@"2"];
}

merge

- (void)mergeDemo {
    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalA"];
        return nil;
    }];
    
    RACSignal *signalB = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalB"];
        return nil;
    }];
    
  //signalA merge signalB 产生一个新的信号mergeSignal
    RACSignal *mergeSignal = [signalA merge:signalB];
  
  //新的信号mergeSignal通过subscribeNext创建监听者。
    [mergeSignal subscribeNext:^(id x) {
        NSLog(@"%@", x);
    } error:^(NSError *error) {
        NSLog(@"%@", error.localizedDescription);
    }];
}

RACSignal *mergeSignal = [signalA merge:signalB];

这里的关键是RACSignal *mergeSignal = [signalA merge:signalB]; signalA merge了signalB产生了一个新的信号mergeSignal,下面我们就看看merge里面做了什么。

+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
  //将两个signal添加一个数组中
	NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
	for (RACSignal *signal in signals) {
		[copiedSignals addObject:signal];
	}

	return [[[RACSignal
		createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
      //遍历数组,触发sendNext,注意这里sendNext的是signal
			for (RACSignal *signal in copiedSignals) {
				[subscriber sendNext:signal];
			}

			[subscriber sendCompleted];
			return nil;
		}]
    //倘若没有flatten,外面新的signal就是这个signal,外面subscribeNext后,就会触发上面的block
		flatten]
		setNameWithFormat:@"+merge: %@", copiedSignals];
}

merge里面创建了一个信号,然后调用了flatten。下面我们再来看看flatten里面做了什么

- (instancetype)flatten {
	__weak RACStream *stream __attribute__((unused)) = self;
	return [[self flattenMap:^(id value) {
		return value;
	}] setNameWithFormat:@"[%@] -flatten", self.name];
}
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
	Class class = self.class;

  //最主要的是这个bind,几乎所有的operation底层都调用这个bind,通过bind创建一个新的signal
	return [[self bind:^{
		return ^(id value, BOOL *stop) {
			id stream = block(value) ?: [class empty];
			NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

			return stream;
		};
	}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

一直到最底层,调用了bind, bind几乎是所有的operation的根基。我们再看看bind里面有做了什么

- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
	NSCParameterAssert(block != NULL);

	/*
	 * -bind: should:
	 * 
	 * 1. Subscribe to the original signal of values.
	 * 2. Any time the original signal sends a value, transform it using the binding block.
	 * 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
	 * 4. If the binding block asks the bind to terminate, complete the _original_ signal.
	 * 5. When _all_ signals complete, send completed to the subscriber.
	 * 
	 * If any signal sends an error at any point, send that to the subscriber.
	 */
	//创建一个新的siganl,也就是最外面的mergeSignal就是这个siganl,mergeSignal调用了subscribeNext后,就会创建一个subscriber,并当作参数触发以下这个block
	return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
		RACStreamBindBlock bindingBlock = block();

		NSMutableArray *signals = [NSMutableArray arrayWithObject:self];

		RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

    //创建completionSignal,处理complete以后的操作
		void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
			BOOL removeDisposable = NO;

			@synchronized (signals) {
				[signals removeObject:signal];

				if (signals.count == 0) {
					[subscriber sendCompleted];
					[compoundDisposable dispose];
				} else {
					removeDisposable = YES;
				}
			}

			if (removeDisposable) [compoundDisposable removeDisposable:finishedDisposable];
		};
    
    //创建addSignal,
		void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
			@synchronized (signals) {
				[signals addObject:signal];
			}

			RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
			[compoundDisposable addDisposable:selfDisposable];
			//在添加的过程中,会调用subscribeNext,这时候就会触发signalA的block。
			RACDisposable *disposable = [signal subscribeNext:^(id x) {
        //以下这个subscriber就是mergeSignal的,通过sendNext后,最外面的subscribeNext就会被调用。
				[subscriber sendNext:x];
			} error:^(NSError *error) {
				[compoundDisposable dispose];
				[subscriber sendError:error];
			} completed:^{
				@autoreleasepool {
					completeSignal(signal, selfDisposable);
				}
			}];

			selfDisposable.disposable = disposable;
		};

		@autoreleasepool {
			RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
			[compoundDisposable addDisposable:selfDisposable];

      //这里的self是merge的时候创建的signal,调用flatten时,进入到bind,调用subscribeNext把最外面的signalA和signalB取到。这里的x就是signal
			RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
				// Manually check disposal to handle synchronous errors.
				if (compoundDisposable.disposed) return;

        //x也就是signal,会通过block抛到外面,由外面再进行二次处理,但是flatten不做任何做处理,会原封不动返回signal
				BOOL stop = NO;
				id signal = bindingBlock(x, &stop);

				@autoreleasepool {
          //然后会把这个signal添加到数组signals中。
					if (signal != nil) addSignal(signal);
					if (signal == nil || stop) {
						[selfDisposable dispose];
						completeSignal(self, selfDisposable);
					}
				}
			} error:^(NSError *error) {
				[compoundDisposable dispose];
				[subscriber sendError:error];
			} completed:^{
				@autoreleasepool {
					completeSignal(self, selfDisposable);
				}
			}];

			selfDisposable.disposable = bindingDisposable;
		}

		return compoundDisposable;
	}] setNameWithFormat:@"[%@] -bind:", self.name];
}