5

RACSignal源码解析

 2 years ago
source link: https://chipengliu.github.io/2018/12/28/RACSignal%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-RACSignal%E5%8F%91%E9%80%81%E4%BF%A1%E5%8F%B7%E8%BF%87%E7%A8%8B%E8%A7%A3%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RACStream 是 ReactiveCocoa 中的核心概念:信号;RACStream 中有2个子类:

  1. RACSignal
  2. RACSequence

RACSignal

实际项目中,对 RACSignal 的使用中,经常会看到这样的代码:

/* 代码 1 */

RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber){
/// STEP 1
[subscriber sendNext:@(1)];
[subscriber sendNext:@(2)];
[subscriber sendNext:@(3)];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
/// STEP 5
NSLog(@"signal dispose");
}];
}];

RACDisposable *disposable = [signal subscribeNext:^(id x) {
/// STEP 2
NSLog(@"received value = %@", x);
} error:^(NSError *error) {
/// STEP 3
NSLog(@"received error: %@", error);
} completed:^{
/// STEP 4
NSLog(@"received completed");
}];

[disposable dispose];

基于以上的代码,我们可以看看 RACSigal从 被订阅到订阅者受到数据的整个过程,具体经历过那些流程

/* 代码 2 */

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe]; // 返回 RACDynamicSignal 类型的对象
}

上面初始化返回 RACDynamicSignal 对象,这是一个私有类,继承RACSignal,实现 RACSignal 的订阅行为

/* 代码 3 */

// A private `RACSignal` subclasses that implements its subscription behavior
// using a block.
@interface RACDynamicSignal : RACSignal

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe;

@end

在调用 RACDynamicSignal 的 createSignal 方法中,会传入一个名为 didSubscribe ,返回类型为 RACDisposable 的block,在内部会将 didSubscribe 进行一次copy,然后保存在 RACDynamicSignal 对象中。

/* 代码 4 */

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy]; // 这里先进行一次copy
return [signal setNameWithFormat:@"+createSignal:"];
}

上面代码初始化返回 signal 调用方法 setNameWithFormat,会执行基类 RACStream 的方法,给 name 属性赋值

/* 代码 5 */

- (instancetype)setNameWithFormat:(NSString *)format, ... {
if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;

NSCParameterAssert(format != nil);

va_list args;
va_start(args, format);

NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
va_end(args);

self.name = str;
return self;
}

从 didSubscribe 中还可以看到 RACSubscriber 协议:

/* 代码 6 */

@protocol RACSubscriber <NSObject>
@required

/// 给所有订阅者发送 next value
- (void)sendNext:(nullable id)value;

/// 给所有订阅者发送 error
/// 这将终止订阅,并使所有订阅者无法收到后续数据
- (void)sendError:(nullable NSError *)error;

/// 给所有订阅者发送 complete
/// 这将终止订阅,并使所有订阅者无法收到后续数据
- (void)sendCompleted;

/// 用来接收代表某次订阅的 disposable 对象, 用来处理是否释放取消订阅的事件
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

代码4中可以看到,创建 RACSignal 的时候,signal 会持有保存传入的 didSubscribe 闭包。这个闭包会在 signal 被订阅的时候触发

/* 代码 7 */

- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NSCParameterAssert(nextBlock != NULL);
NSCParameterAssert(errorBlock != NULL);
NSCParameterAssert(completedBlock != NULL);

RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:o];
}

我们平时对 RACSignal 实例对象调用 - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock 或者 - (RACDisposable *)subscribeError:(void (^)(NSError *error))errorBlock 方法会返回 RACDisposable 对象,内部会创建一个实现了 RACSubscriber 协议的对象 o, 把 next、error、completed 三种block进行 copy 保存

/* 代码 8 */

+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];

subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];

return subscriber;
}

然后调用 - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber 方法并把刚刚 RACSubscriber 对象传入,如代码2所示,这里实际上是调用了 RACDynamicSignal 的 subscriber 方法

/* 代码 9 */

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];

[disposable addDisposable:schedulingDisposable];
}

return disposable;
}

在代码9中我们可以看到 2 个 类:RACCompoundDisposable、 RACPassthroughSubscriber

RACCompoundDisposable 继承 RACDisposable,可以理解为是一个保存 RACDisposable 类型的容器,当 RACCompoundDisposable 执行 - (void)dispose 方法会将 disposables 数组中的元素一一进行 dispose

/* 代码 10 */

static void disposeEach(const void *value, void *context) {
RACDisposable *disposable = (__bridge id)value;
[disposable dispose];
}

- (void)dispose {
#if RACCompoundDisposableInlineCount
RACDisposable *inlineCopy[RACCompoundDisposableInlineCount];
#endif

CFArrayRef remainingDisposables = NULL;

pthread_mutex_lock(&_mutex);
{
_disposed = YES;

#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
inlineCopy[i] = _inlineDisposables[i];
_inlineDisposables[i] = nil;
}
#endif

remainingDisposables = _disposables;
_disposables = NULL;
}
pthread_mutex_unlock(&_mutex);

#if RACCompoundDisposableInlineCount
// Dispose outside of the lock in case the compound disposable is used
// recursively.
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
[inlineCopy[i] dispose];
}
#endif

if (remainingDisposables == NULL) return;

/// 遍历 RACDisposable 数组
/// 通过函数 disposeEach 来 dispose 每一个数组元素
CFIndex count = CFArrayGetCount(remainingDisposables);
CFArrayApplyFunction(remainingDisposables, CFRangeMake(0, count), &disposeEach, NULL);
CFRelease(remainingDisposables);
}

RACPassthroughSubscriber 是一个私有类,主要的作用是把一个订阅者 subscriber A 的信号事件传递给另一个没有被 dispose 的订阅者 subscriber B。由以下几个步骤实现:

  1. 包装真正的订阅者,使自己成为订阅者的替代者
  2. 将真正的订阅者与一个订阅时产生的 Disposable 关联起来
/* 代码 11 */

@interface RACPassthroughSubscriber ()

// 接受转发信号事件的订阅者,也就是上面提到的 subscriber B
@property (nonatomic, strong, readonly) id<RACSubscriber> innerSubscriber;

// 给 RACPassthroughSubscriber 发送事件的 RACSignal 信号
//
// 该属性使用 unsafe_unretained 修饰主要是因为 RACSignal 仅是一个 DTrace probes
// 动态跟踪技术的探针, 如果改用 weak 会造成不必要的性能损耗
@property (nonatomic, unsafe_unretained, readonly) RACSignal *signal;

// disposable 若disposed,信号事件则不再转发给 innerSubscriber
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

回到代码9中,之前已经创建刚刚提到的 RACCompoundDisposable 和 RACPassthroughSubscriber,

/* 代码 12 */

if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];

[disposable addDisposable:schedulingDisposable];
}

subscriptionScheduler 是一个单例方法,返回 RACSubscriptionScheduler 对象

/* 代码 13 */

+ (instancetype)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});

return subscriptionScheduler;
}

继而调用 RACSubscriptionScheduler 的 schedule 方法

/* 代码 14 */

- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);

if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];

block();
return nil;
}

+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}

+ (RACScheduler *)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

return nil;
}

schedule 方法中主要先去找当前线程对应的 RACScheduler 对象,如果找不到,则去找主线程对应的 RACScheduler 对象,如果还是找不到,则返回 backgroundScheduler 对象。

schedule 方法目的是主要释放参数 block,也就是执行代码12中

/* 代码 15 */

RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];

代码 4 中保存的闭包 didSubscribe 会在这里执行,进而可能调用闭包入参 subscriber(也就是之前创建的RACPassthroughSubscriber对象) 的 sendNext,sendError,sendCompleted 方法

/* 代码 16 */

- (void)sendNext:(id)value {
if (self.disposable.disposed) return;

if (RACSIGNAL_NEXT_ENABLED()) {
RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
}

[self.innerSubscriber sendNext:value];
}

- (void)sendError:(NSError *)error {
if (self.disposable.disposed) return;

if (RACSIGNAL_ERROR_ENABLED()) {
RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
}

[self.innerSubscriber sendError:error];
}

- (void)sendCompleted {
if (self.disposable.disposed) return;

if (RACSIGNAL_COMPLETED_ENABLED()) {
RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
}

[self.innerSubscriber sendCompleted];
}

RACPassthroughSubscriber 在以上的方法会将信号事件转发给 innerSubscriber,因为 innerSubscriber 是 RACSubscriber 对象,进而会执行私有类 RACSubscriber 中的方法 sendNext/sendError/sendCompleted

/* 代码 17 */

- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;

nextBlock(value);
}
}

- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];

if (errorBlock == nil) return;
errorBlock(e);
}
}

- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];

if (completedBlock == nil) return;
completedBlock();
}
}

执行上面方法前,都会先加锁保证线程安全,然后将对应的block(代码 8 创建 RACSubscriber 保存的 _next/_error/_completed)进行一次 copy,最后执行 block,最终的结果就是执行 代码 1 中的 STEP 2STEP 3STEP 4

基于以上的分析,RACSignal 从创建到给订阅发送事件可以归纳为以下几个步骤:

  1. 调用RACSignal createSignal 的方法,返回子类 RACDynamicSignal 对象并保存闭包 didSubscribe – 代码 1
  2. 订阅信号,调用RACSignal subscribeNext 方法,在该方法中会创建 RACSubscriber 订阅者对象
  3. 创建的 RACSubscriber 订阅者对象会copy nextBlock,errorBlock,completedBlock,保存到相关属性中 – 代码 7
  4. 调用步骤1创建的 RACDynamicSignal subscribe 方法
  5. 创建 RACCompoundDisposable 和 RACPassthroughSubscriber 对象,RACPassthroughSubscriber 会保存 步骤1返回的 RACDynamicSignal(signal属性),步骤2中的 RACSubscriber(innerSubscriber属性)– 代码 11
  6. 执行 RACDynamicSignal 保存的 didSubscribe 闭包,闭包内调用 RACPassthroughSubscriber sendNextsendErrorsendCompleted 方法
  7. RACPassthroughSubscriber 执行保存的 innerSubscriber 对应的 sendNextsendErrorsendCompleted 方法 — 代码 17

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK