2

RACSignal常用方法深入分析(3)

 2 years ago
source link: https://chipengliu.github.io/2019/02/23/RACSignal-Operations-3/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本篇文章接着上篇,继续分析常用 RACSignal 方法的第三部分。

ignoreValues

-ignoreValues 底层比较简单,直接通过封装 filter 方法,将原信号发送的所有 next 值都忽略掉,所以最终的订阅者收不到任何 next 值(completed 和 error 事件仍然会被传递)。

// Returns a signal built by applying -filter: to the receiver with a predicate
// that rejects every value, so no value passes the filter.
// The resulting signal is named "[<receiver name>] -ignoreValues".
- (RACSignal *)ignoreValues {
return [[self filter:^(id _) {
// Always NO: every incoming value is filtered out.
return NO;
}] setNameWithFormat:@"[%@] -ignoreValues", self.name];
}

ignore:

测试代码:

// Demonstrates -ignore: — the source emits 0, 1, 2 and completes; the value
// equal to @1 is dropped, so only 0 and 2 are logged.
- (void)testIgnore {
    RACSignal *numbers = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Emit the fixture values in order, then finish the stream.
        for (NSNumber *number in @[@0, @1, @2]) {
            [subscriber sendNext:number];
        }
        [subscriber sendCompleted];
        return nil;
    }];

    // Subscribe directly to the filtered signal and log whatever gets through.
    [[numbers ignore:@1] subscribeNext:^(id _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}
value = 0
value = 2

底层实现:

// Returns a stream, built with -filter:, that drops every value matching `value`.
// An incoming value is kept only when it is neither the same pointer as `value`
// nor equal to it according to -isEqual:.
// The result is named "[<receiver name>] -ignore: <description of value>".
- (__kindof RACStream *)ignore:(id)value {
return [[self filter:^ BOOL (id innerValue) {
// Pointer comparison first, then -isEqual:; both must report "different" to keep the value.
return innerValue != value && ![innerValue isEqual:value];
}] setNameWithFormat:@"[%@] -ignore: %@", self.name, RACDescription(value)];
}

-ignore: 内部是通过封装 filter 方法实现,主要通过 filter 筛选闭包判断原信号发出的信号值和参数 value 是否相同,相同则被过滤掉。

take:

-take: 实现思想和 -distinctUntilChanged 相似,同样的是通过封装 -bind: 实现,在 -bind: 闭包中,用变量 taken 来记录原信号发送信号的次数,当 taken 取到 count 个数的时候,就停止给订阅者发送信号。

// Returns a stream that forwards at most `count` values from the receiver.
// When `count` is 0 the class's empty stream is returned immediately.
// Otherwise the method is built on -bind:, counting forwarded values in `taken`.
// The result is named "[<receiver name>] -take: <count>".
- (instancetype)take:(NSUInteger)count {
Class class = self.class;

// Nothing to take: short-circuit with an empty stream.
if (count == 0) return class.empty;

return [[self bind:^{
// Number of values forwarded so far for this binding.
__block NSUInteger taken = 0;

return ^ id (id value, BOOL *stop) {
if (taken < count) {
++taken;
// On the count-th value, set the stop flag while still returning this value.
// NOTE(review): -bind: presumably terminates after this value; its contract is not shown here.
if (taken == count) *stop = YES;
return [class return:value];
} else {
// Already took `count` values: return nil instead of a stream.
return nil;
}
};
}] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}

takeLast:

测试代码:

// Demonstrates -takeLast: — the source emits 0 through 3 and completes; only
// the final two values (2 and 3) are logged.
- (void)testTakeLast {
    RACSignal *numbers = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Emit the fixture values in order, then finish the stream.
        for (NSNumber *number in @[@0, @1, @2, @3]) {
            [subscriber sendNext:number];
        }
        [subscriber sendCompleted];
        return nil;
    }];

    // Keep only the last two values and log them.
    [[numbers takeLast:2] subscribeNext:^(id _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}
value = 2
value = 3

底层实现:

// Returns a signal that, once the receiver completes, sends the last `count`
// values the receiver emitted and then completes.
// Errors from the receiver are forwarded directly; buffered values are not sent in that case.
// The result is named "[<receiver name>] -takeLast: <count>".
- (RACSignal *)takeLast:(NSUInteger)count {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
// Buffer of the most recent values; never grows beyond `count` after each next event.
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
return [self subscribeNext:^(id x) {
// A nil value is stored as the RACTupleNil placeholder, since the array cannot hold nil.
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];

// Drop the oldest entries until only the newest `count` remain.
while (valuesTaken.count > count) {
[valuesTaken removeObjectAtIndex:0];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// Replay the buffered values in order, turning the placeholder back into nil.
for (id value in valuesTaken) {
[subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
}

[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
}

takeLast: 先订阅原信号,根据参数 count, 保存最后 count 个信号值到数组 valuesTaken。当原信号发送 sendCompleted的时候遍历 valuesTaken 将其元素逐个发给订阅者。

takeUntilBlock:

// Returns a stream, built on -bind:, that forwards the receiver's values until
// `predicate` returns YES for one of them.
// `predicate` must not be nil (asserted).
// The result is named "[<receiver name>] -takeUntilBlock:".
- (__kindof RACStream *)takeUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);

Class class = self.class;

return [[self bind:^{
return ^ id (id value, BOOL *stop) {
// Predicate matched: return nil instead of a stream; the matching value is not forwarded.
// NOTE(review): -bind: presumably treats a nil return as termination; confirm against RACStream.
if (predicate(value)) return nil;

return [class return:value];
};
}] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
}

-takeUntilBlock: 和 -take: 实现思路基本一致,后者是通过信号值个数和参数 count 比较判断是否继续给订阅者发送信号,而 -takeUntilBlock: 是根据参数 predicate 闭包执行结果来判断是否给订阅者发送信号

takeUntil:

测试代码:

// Demonstrates -takeUntil: — the source emits 0, 1, 2 on subscription and 3 two
// seconds later; the trigger fires after one second, so only 0, 1, 2 are logged.
- (void)testUntil {
    RACSignal *numbers = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Immediate values.
        for (NSNumber *number in @[@0, @1, @2]) {
            [subscriber sendNext:number];
        }

        // Late value and completion, scheduled on the main queue two seconds out.
        dispatch_time_t lateDeadline = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC));
        dispatch_after(lateDeadline, dispatch_get_main_queue(), ^{
            [subscriber sendNext:@3];
            [subscriber sendCompleted];
        });
        return nil;
    }];

    RACSignal *trigger = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
        // Fires once, one second after subscription, then completes.
        dispatch_time_t triggerDeadline = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC));
        dispatch_after(triggerDeadline, dispatch_get_main_queue(), ^{
            [subscriber sendNext:@10];
            [subscriber sendCompleted];
        });
        return nil;
    }];

    // Forward source values only until the trigger sends something.
    [[numbers takeUntil:trigger] subscribeNext:^(id _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}
value = 0
value = 1
value = 2

底层实现:

// Returns a signal that forwards the receiver's events until `signalTrigger`
// sends a next or completed event, at which point the returned signal completes.
// The result is named "[<receiver name>] -takeUntil: <signalTrigger>".
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
// Holds both the trigger subscription and the receiver subscription.
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// Shared teardown: dispose everything, then complete the subscriber.
void (^triggerCompletion)(void) = ^{
[disposable dispose];
[subscriber sendCompleted];
};

// The trigger is subscribed to first; either a value or its completion ends the take.
// Note that no error handler is attached to the trigger subscription.
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
triggerCompletion();
} completed:^{
triggerCompletion();
}];

[disposable addDisposable:triggerDisposable];

// If the trigger already fired while being subscribed to, `disposable` is disposed
// and the receiver is never subscribed to.
if (!disposable.disposed) {
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// Receiver finished on its own: release the trigger subscription as well.
[disposable dispose];
[subscriber sendCompleted];
}];

[disposable addDisposable:selfDisposable];
}

return disposable;
}] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
}

-takeUntil: 会返回新的信号,新信号被订阅的时候触发入参闭包:

  1. 订阅 signalTrigger 信号,如果发送 sendNext/sendCompleted 则结束整个新信号的订阅
  2. 如果 signalTrigger 尚未发送过 sendNext/sendCompleted ,则将原信号的信号事件透传给订阅者

image-takeuntil

takeUntilReplacement:

测试代码:

// Demonstrates -takeUntilReplacement: — the first signal emits 0, 1, 2 on
// subscription and 3, 4 two seconds later; the replacement emits 5, 6 after one
// second, so the log shows 0, 1, 2, 5, 6.
- (void)testTakeUntilReplacement {
    RACSignal *original = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Immediate values.
        for (NSNumber *number in @[@0, @1, @2]) {
            [subscriber sendNext:number];
        }

        // Late values and completion, scheduled on the main queue two seconds out.
        dispatch_time_t lateDeadline = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC));
        dispatch_after(lateDeadline, dispatch_get_main_queue(), ^{
            [subscriber sendNext:@3];
            [subscriber sendNext:@4];
            [subscriber sendCompleted];
        });
        return nil;
    }];

    RACSignal *replacement = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Replacement values arrive one second after subscription.
        dispatch_time_t replacementDeadline = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC));
        dispatch_after(replacementDeadline, dispatch_get_main_queue(), ^{
            [subscriber sendNext:@5];
            [subscriber sendNext:@6];
            [subscriber sendCompleted];
        });
        return nil;
    }];

    // Switch from the original to the replacement as soon as the latter sends an event.
    [[original takeUntilReplacement:replacement] subscribeNext:^(id _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}
value = 0
value = 1
value = 2
value = 5
value = 6

底层实现:

// Returns a signal that forwards the receiver's events until `replacement`
// sends any event (next, error or completed); from then on the receiver's
// subscription is disposed and only `replacement`'s events are forwarded.
- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
// Holds the subscription to the receiver so the replacement callbacks can cancel it.
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];

// Every replacement event first disposes the receiver's subscription, then is forwarded.
RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
[selfDisposable dispose];
[subscriber sendNext:x];
} error:^(NSError *error) {
[selfDisposable dispose];
[subscriber sendError:error];
} completed:^{
[selfDisposable dispose];
[subscriber sendCompleted];
}];

// Skip the receiver entirely if the replacement already sent an event while being subscribed to.
if (!selfDisposable.disposed) {
// The receiver is concatenated with +never before subscribing.
// NOTE(review): presumably so the receiver's own completion is not forwarded
// to the subscriber while the replacement is still pending — confirm.
selfDisposable.disposable = [[self
concat:[RACSignal never]]
subscribe:subscriber];
}

// Disposing the returned disposable tears down both subscriptions.
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[replacementDisposable dispose];
}];
}];
}

takeUntilReplacement: 内部主要经历以下的步骤:

  1. 首先原信号执行 -concat: 方法并传入 [RACSignal never],[RACSignal never] 返回的信号不会给订阅者发送任何事件,这样即使原信号发送了 sendCompleted,completed 事件也不会传给订阅者,订阅会一直保持,直到收到 replacement 信号的事件

    // Returns a signal named "+never" whose subscription block does nothing:
    // it sends no events to the subscriber and returns no disposable.
    + (RACSignal *)never {
    return [[self createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
    // Intentionally empty: no next, error or completed is ever sent from here.
    return nil;
    }] setNameWithFormat:@"+never"];
    }
  2. 订阅 replacement 信号,当收到其发送的 sendNext/sendError/sendCompleted 时,先将原信号 dispose,然后将对应的信号发给最终的订阅者

主要流程如图所示:

image-20190223154342910

skip:

测试代码:

// Demonstrates -skip: — the source emits 0 through 4 and completes; the first
// value is skipped, so 1, 2, 3, 4 are logged.
- (void)testSkip {
    RACSignal *numbers = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Emit the fixture values in order, then finish the stream.
        for (NSNumber *number in @[@0, @1, @2, @3, @4]) {
            [subscriber sendNext:number];
        }
        [subscriber sendCompleted];
        return nil;
    }];

    RACSignal *afterFirst = [numbers skip:1];
    [afterFirst subscribeNext:^(id _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}
value = 1
value = 2
value = 3
value = 4

底层实现:

// Returns a stream, built on -bind:, that drops the first `skipCount` values
// of the receiver and forwards every value after that.
// The result is named "[<receiver name>] -skip: <skipCount>".
- (__kindof RACStream *)skip:(NSUInteger)skipCount {
Class class = self.class;

return [[self bind:^{
// Number of values dropped so far for this binding.
__block NSUInteger skipped = 0;

return ^(id value, BOOL *stop) {
// Enough values were dropped already: wrap and forward this one.
if (skipped >= skipCount) return [class return:value];

// Still skipping: count this value and map it to an empty stream.
skipped++;
return class.empty;
};
}] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}

-skip: 方法内部也是通过 -bind: 方法进行封装,通过变量 skipped 来记录原始信号已经发过几个信号,当 skipped >= skipCount 的时候,会将信号事件直接发给最终订阅者。

image-20190223161533779

skipUntilBlock:

测试代码:

// Demonstrates -skipUntilBlock: — the source emits 0, 1, 2, 3, 4, 0 and
// completes; values are skipped until one is greater than 3, so 4 and the
// trailing 0 are logged.
- (void)testSkipUntilBlock {
    RACSignal *numbers = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Emit the fixture values in order, then finish the stream.
        for (NSNumber *number in @[@0, @1, @2, @3, @4, @0]) {
            [subscriber sendNext:number];
        }
        [subscriber sendCompleted];
        return nil;
    }];

    // Stop skipping at the first value greater than 3.
    RACSignal *fromFirstLargeValue = [numbers skipUntilBlock:^BOOL(id _Nullable x) {
        return [x integerValue] > 3;
    }];

    [fromFirstLargeValue subscribeNext:^(id _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}
value = 4
value = 0

底层实现:

// Returns a stream, built on -bind:, that drops the receiver's values until
// `predicate` returns YES for one of them; that value and all later values are forwarded.
// `predicate` must not be nil (asserted).
// The result is named "[<receiver name>] -skipUntilBlock:".
- (__kindof RACStream *)skipUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);

Class class = self.class;

return [[self bind:^{
// YES while values are still being dropped; flips to NO once and stays there.
__block BOOL skipping = YES;

return ^ id (id value, BOOL *stop) {
if (skipping) {
if (predicate(value)) {
// First match: stop skipping and fall through to forward this value.
skipping = NO;
} else {
// No match yet: map this value to an empty stream.
return class.empty;
}
}

// The predicate is no longer consulted once skipping is NO.
return [class return:value];
};
}] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
}

-skipUntilBlock: 和 -skip: 原理类似,忽略信号的条件从信号个数变成由 predicate 闭包来判断,值得注意的是,当 predicate 闭包返回 YES 之后的所有信号就不再会被 skip。

skipWhileBlock:

// Returns a stream that drops the receiver's values while `predicate` returns YES.
// Implemented by calling -skipUntilBlock: with the negated predicate.
// `predicate` must not be nil (asserted).
// The result is named "[<receiver name>] -skipWhileBlock:".
- (__kindof RACStream *)skipWhileBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);

return [[self skipUntilBlock:^ BOOL (id x) {
// "Skip while P" is expressed as "skip until not P".
return !predicate(x);
}] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
}

和 -skipUntilBlock: 类似,也是通过 predicate 闭包的返回结果来判断是否 skip 信号,区别是:

-skipWhileBlock: 只有 predicate 返回 YES 才skip 信号,一旦返回 NO,以后的信号就不再会被 skip

distinctUntilChanged

测试代码:

// Demonstrates -distinctUntilChanged — the source emits 0, 1, 2, 2, 3, 3 and
// completes; consecutive duplicates are dropped, so 0, 1, 2, 3 are logged.
- (void)testDistinctUntilChanged {
    RACSignal *numbers = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Emit the fixture values (with adjacent repeats) in order, then finish.
        for (NSNumber *number in @[@0, @1, @2, @2, @3, @3]) {
            [subscriber sendNext:number];
        }
        [subscriber sendCompleted];
        return nil;
    }];

    // Log only values that differ from the one immediately before them.
    [[numbers distinctUntilChanged] subscribeNext:^(id _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}
value = 0
value = 1
value = 2
value = 3

底层实现:

// Returns a stream, built on -bind:, that drops a value when it equals the
// value that immediately preceded it (same pointer, or equal per -isEqual:).
// The result is named "[<receiver name>] -distinctUntilChanged".
- (__kindof RACStream *)distinctUntilChanged {
Class class = self.class;

return [[self bind:^{
// The value seen on the previous invocation.
__block id lastValue = nil;
// YES only until the first value has been seen, so the first value is always forwarded.
__block BOOL initial = YES;

return ^(id x, BOOL *stop) {
// Same as the previous value: map it to an empty stream.
if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];

initial = NO;
lastValue = x;
return [class return:x];
};
}] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
}

-distinctUntilChanged 底层是通过 -bind: 方法来实现

  1. 定义 lastValue 记录原信号上一次发送的信号值,initial 标记当前是否为第一个信号值(第一个信号值不做比较,总是会发给订阅者)
  2. 如果当前发送的信号值和上一次的相同,直接返回 empty 信号,订阅者不会收到这个值
  3. 如果当前原信号发送的信号值和上一次不相同,调用 return 方法将信号值包装成新的信号发送给订阅者

groupBy:transform:

测试代码:

// Demonstrates -groupBy:transform: — the source emits 0 through 4 and completes;
// values greater than 2 go to the "send" group, each value is multiplied by 10,
// and only the "send" group is flattened, so 30 and 40 are logged.
- (void)testGroupByTransform {
    RACSignal *numbers = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // Emit the fixture values in order, then finish the stream.
        for (NSNumber *number in @[@0, @1, @2, @3, @4]) {
            [subscriber sendNext:number];
        }
        [subscriber sendCompleted];
        return nil;
    }];

    // Partition by magnitude and scale every value by ten.
    RACSignal *grouped = [numbers groupBy:^id<NSCopying> _Nullable(id _Nullable object) {
        if ([object integerValue] > 2) {
            return @"send";
        }
        return @"skip";
    } transform:^id _Nullable(id _Nullable object) {
        NSInteger scaled = [object integerValue] * 10;
        return @(scaled);
    }];

    // Keep only the "send" group, then flatten it back into a plain signal of values.
    RACSignal *sendGroupOnly = [grouped filter:^BOOL(RACGroupedSignal *value) {
        return [(NSString *)value.key isEqualToString:@"send"];
    }];

    [[sendGroupOnly flatten] subscribeNext:^(id _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}
value = 30
value = 40

底层实现:

// Returns a signal of RACGroupedSignal objects, one per distinct key produced
// by `keyBlock`. Each value of the receiver is sent on the group matching its
// key, after passing through `transformBlock` when one is provided.
// `keyBlock` must not be NULL (asserted); `transformBlock` may be NULL.
// The result is named "[<receiver name>] -groupBy:transform:".
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
NSCParameterAssert(keyBlock != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
// key -> group lookup.
NSMutableDictionary *groups = [NSMutableDictionary dictionary];
// Groups in creation order, used to broadcast error/completed.
NSMutableArray *orderedGroups = [NSMutableArray array];

return [self subscribeNext:^(id x) {
id<NSCopying> key = keyBlock(x);
RACGroupedSignal *groupSubject = nil;
// Lookup and creation of a group happen under a lock on `groups`.
@synchronized(groups) {
groupSubject = groups[key];
if (groupSubject == nil) {
// First value for this key: create the group, record it, and send the group
// itself to the subscriber before any value is sent on it.
groupSubject = [RACGroupedSignal signalWithKey:key];
groups[key] = groupSubject;
[orderedGroups addObject:groupSubject];
[subscriber sendNext:groupSubject];
}
}

// Deliver the (optionally transformed) value on its group.
[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
} error:^(NSError *error) {
[subscriber sendError:error];

// Propagate the error to every group created so far.
[orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
} completed:^{
[subscriber sendCompleted];

// Complete every group created so far.
[orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
}];
}] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
}

-groupBy:transform: 主要是信号事件转化成类似 map 结构数据发给订阅者,主要逻辑可以概括为:

  1. 创建可变字典 groups 和 可变数组 orderedGroups

  2. 订阅原信号,当原信号发送 sendNext 的时候,通过闭包 keyBlock 取到对应的 key 值,然后根据 key 从 groups 中取出对应的 groupSubject (RACGroupedSignal 类型),如果为空则新建一个,并且加入到 orderedGroups 数组中,再把 groupSubject 发给最终订阅者

  3. groupSubject 给订阅者发送 transformBlock 闭包的返回结果,如果 transformBlock 是空,则直接把信号发给订阅者

  4. 原信号发送 sendCompleted / sendError ,遍历数组 orderedGroups 并对其中的 RACGroupedSignal 元素执行对应的 sendCompleted / sendError

    经过上面的分析看得出 -groupBy:transform: 方法返回的信号也是一个高阶信号,我们可以结合之前的 -flatten、-flattenMap: 等方法使用,对其进行降阶

groupBy:

-groupBy: 是通过封装 -groupBy:transform: 来实现的,把后者入参 transformBlock 赋值为 nil,也就是 -groupBy:transform: 主要逻辑步骤3中,groupSubject 会给其订阅者返回原信号发送的信号值

// Convenience wrapper around -groupBy:transform: that passes nil as the
// transform block, then renames the result "[<receiver name>] -groupBy:".
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK