4

苹果APNs推送框架pushy

 2 years ago
source link: https://qiankunli.github.io/2017/06/06/pushy.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

苹果APNs推送框架pushy | 李乾坤的博客

relayrides/pushy是一个 Java library for sending APNs (iOS, OS X, and Safari) push notifications. 揉和了netty和http2协议。因此,除了它本身的功能外,从中也可以学到许多对netty框架以及http2协议的使用技巧。

一次消息的发送实例

// 构建payload
ApnsPayloadBuilder payloadBuilder = new ApnsPayloadBuilder();
payloadBuilder.setAlertBody("Example!");
String payload = payloadBuilder.buildWithDefaultMaximumLength();
String token = TokenUtil.sanitizeTokenString("<efc7492 bdbd8209>");
// 构建推送model
SimpleApnsPushNotification  pushNotification = new SimpleApnsPushNotification(token, "com.example.myApp", payload);
// 发送
Future<PushNotificationResponse<SimpleApnsPushNotification>> sendNotificationFuture = apnsClient.sendNotification(pushNotification);

几乎看不到netty 的痕迹,而传统的netty 客户端启动代码

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>(){
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new EchoClientHandler());
            }
        });
    ChannelFuture f = b.connect(host, port).sync();
    f.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully();
}

可以看到,两者差异还是蛮大的。

send notification 流程

netty对外提供的操作接口有以下几个问题,当然,这也不怨netty

  1. 操作对象有多个,比如bootstrap、channel。并且,操作对象间存在依赖关系,channel不是直接初始化,而是通过bootstrap获得。
  2. netty启动逻辑复杂,必须由用户显式编写代码。
  3. 操作接口不简洁,对于io操作,简介的接口应该是

    1. 同步,response send(request)
    2. 异步,Future<response> send(request)

    而netty则无明确封装

通过pushy,我们可以学习到如何封装netty,同时学习如何以纯异步的方式新增自定义逻辑,比如线程池等。

使用ApnsChannelFactory封装Bootstrap

A Bootstrap that makes it easy to bootstrap a Channel to use for clients. 可见,Bootstrap 就是为了创建Channel,我们可以将创建channel的部分分两步

  1. 创建并配置Bootstrap
  2. 使用Bootstrap 创建channel

ApnsChannelFactory构造方法 构建Bootstrap

ApnsChannelFactory(final SslContext sslContext, ...
                    final EventLoopGroup eventLoopGroup) {
    this.sslContext = sslContext;
    this.addressResolverGroup = ...
    this.bootstrapTemplate = new Bootstrap();
    this.bootstrapTemplate.group(eventLoopGroup);
    this.bootstrapTemplate.option(ChannelOption.TCP_NODELAY, true);
    this.bootstrapTemplate.remoteAddress(apnsServerAddress);
    this.bootstrapTemplate.resolver(this.addressResolverGroup);
    this.bootstrapTemplate.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
    this.bootstrapTemplate.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(final SocketChannel channel) {
            final ChannelPipeline pipeline = channel.pipeline();
            if (proxyHandlerFactory != null) {
                pipeline.addFirst(proxyHandlerFactory.createProxyHandler());
            }
            final SslHandler sslHandler = sslContext.newHandler(channel.alloc());
            sslHandler.handshakeFuture().addListener(...);
            pipeline.addLast(sslHandler);
            pipeline.addLast(ConnectionNegotiationErrorHandler.INSTANCE);
        }
    });
}

ApnsChannelFactory.create 调用Bootstrap 创建channel

public Future<Channel> create(final Promise<Channel> channelReadyPromise) {
    final long delay = this.currentDelaySeconds.get();
    this.bootstrapTemplate.config().group().schedule(new Runnable() {
        public void run() {
            final Bootstrap bootstrap = ApnsChannelFactory.this.bootstrapTemplate.clone()
                    .channelFactory(...);
            final ChannelFuture connectFuture = bootstrap.connect();
            connectFuture.addListener(...);
            connectFuture.channel().closeFuture().addListener(...);
        }
    }, delay, TimeUnit.SECONDS);

    return channelReadyPromise;
}

ApnsChannelPool.acquire

ApnsChannelPool 比较关键的几个字段,决定了复用现有的channel 还是执行channelFactory.create

  1. capacity,ApnsChannelPool 的最大连接数
  2. allChannels,ChannelGroup 真正持有 channel的容器
  3. pendingCreateChannelFutures,持有channelFactory.create返回的Future<Channel> 即当前正在创建的 channel

channel容器——ChannelGroup

A thread-safe Set that contains open Channels and provides various bulk operations on them. 管理所有channel,提供批量操作,比如ChannelGroupFuture write(Object message) 的实现便是 遍历所有channel并执行channel.write

A closed Channel is automatically removed from the collection, so that you don’t need to worry about the life cycle of the added Channel.

private final ChannelFutureListener remover = new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        remove(future.channel());
    }
};

ChannelGroup 监听了所管理的每一个channel的close 事件

public boolean add(Channel channel) {
    ConcurrentMap<ChannelId, Channel> map = xx;
    boolean added = map.putIfAbsent(channel.id(), channel) == null;
    if (added) {
        channel.closeFuture().addListener(remover);
    }
    ...
    return added;
}

2018.6.18 补充 relayrides/pushy

  1. Flow control。pushy 是纯异步接口,pushy 有个控制,实现原理估计是1500个future 未返回时,则将后续的请求 缓冲起来,等等这个1500个inflight 的reqeust。

    当数据量很大时,必然触发1500 的上线,进而大量请求 缓存在netty buffer 中,然后耗尽内存。此时,需要一个 flow control layer ,可以使用CountDown 或 Semaphore

从pushy 中我们可以学习到,如何去写一个纯异步框架


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK