1

Jetty线程“互锁”导致数据传输性能降低问题分析

 1 year ago
source link: https://blogread.cn/it/article/6708?f=hot1
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Jetty线程“互锁”导致数据传输性能降低问题分析

浏览:2191次  出处信息

   以下分析针对jetty的特定版本:http://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/tags/jetty-7.2.1.v20101111

   首先介绍一下Jetty的反映器模型,Jetty用的经典的NIO异步模型(Scalable IO in Java http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)。连接管理的示意图如下:

   

pool

   Jetty在使用这个模型的时候,做了一些改动,acceptor是独立出来的一个阻塞线程,用于阻塞地接受新的连接请求,而所有的连接建立之后,都会想selector线程注册网络事件和内部的事件(changes),selector需要同时处理网络事件和内部的changes。同时还要定期检查超时的链接。

   当一个连接建立之后,除了分发网络事件之外,主线程还会与子线程有一些交互。当子线程发现网络拥塞,缓冲区的数据无法及时刷走时,会注册一个表明自己处于阻塞状态的内部事件,并且期望主线程在发现拥塞解除的时候能通知到自己。

   具体的代码如下:

/* ------------------------------------------------------------ */

    * Allows thread to block waiting for further events.

   @Override

   public boolean blockWritable(long timeoutMs) throws IOException

       synchronized (this)

           long start=_selectSet.getNow();

               _writeBlocked=true;

               while (isOpen() && _writeBlocked)

                       updateKey();

                       this.wait(timeoutMs);

                       timeoutMs -= _selectSet.getNow()-start;

                       if (_writeBlocked && timeoutMs<=0)

                           return false;

                   catch (InterruptedException e)

                       Log.warn(e);

           finally

               _writeBlocked=false;

               if (_idleTimestamp!=-1)

                   scheduleIdle();

       return true;

   而主线程的select主循环的代码如下:

/* ------------------------------------------------------------ */

    * Select and dispatch tasks found from changes and the selector.

    * @throws IOException

   public void doSelect() throws IOException

           _selecting=Thread.currentThread();

           final Selector selector=_selector;

           // Make any key changes required

           Object change;

           int changes=_changes.size();

           while (changes-->0 && (change=_changes.poll())!=null)

                   if (change instanceof EndPoint)

                       // Update the operations for a key.

                       SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;

                       endpoint.doUpdateKey();

                   else if (change instanceof ChannelAndAttachment)

                       // finish accepting/connecting this connection

                       final ChannelAndAttachment asc = (ChannelAndAttachment)change;

                       final SelectableChannel channel=asc._channel;

                       final Object att = asc._attachment;

                       SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);

                       SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);

                       key.attach(endpoint);

                       endpoint.schedule();

                   else if (change instanceof SocketChannel)

                       // Newly registered channel

                       final SocketChannel channel=(SocketChannel)change;

                       SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);

                       SelectChannelEndPoint endpoint = createEndPoint(channel,key);

                       key.attach(endpoint);

                       endpoint.schedule();

                   else if (change instanceof Runnable)

                       dispatch((Runnable)change);

                       throw new IllegalArgumentException(change.toString());

               catch (Exception e)

                   if (isRunning())

                       Log.warn(e);

                       Log.debug(e);

               catch (Error e)

                   if (isRunning())

                       Log.warn(e);

                       Log.debug(e);

           // Do and instant select to see if any connections can be handled.

           int selected=selector.selectNow();

           _selects++;

           long now=System.currentTimeMillis();

           // if no immediate things to do

           if (selected==0)

               // If we are in pausing mode

               if (_pausing)

                       Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop

                   catch(InterruptedException e)

                       Log.ignore(e);

                   now=System.currentTimeMillis();

               // workout how long to wait in select

               _timeout.setNow(now);

               long to_next_timeout=_timeout.getTimeToNext();

               long wait = _changes.size()==0?__IDLE_TICK:0L;  

               if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)

                   wait = to_next_timeout;

               // If we should wait with a select

               if (wait>0)

                   long before=now;

                   selected=selector.select(wait);

                   _selects++;

                   now = System.currentTimeMillis();

                   _timeout.setNow(now);

                   checkJvmBugs(before, now, wait, selected);

           // have we been destroyed while sleeping

           if (_selector==null || !selector.isOpen())

               return;

           // Look for things to do

           for (SelectionKey key: selector.selectedKeys())

                   if (!key.isValid())

                       key.cancel();

                       SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();

                       if (endpoint != null)

                           endpoint.doUpdateKey();

                       continue;

                   Object att = key.attachment();

                   if (att instanceof SelectChannelEndPoint)

                       ((SelectChannelEndPoint)att).schedule();

                       // Wrap readable registered channel in an endpoint

                       SocketChannel channel = (SocketChannel)key.channel();

                       SelectChannelEndPoint endpoint = createEndPoint(channel,key);

                       key.attach(endpoint);

                       if (key.isReadable())

                           endpoint.schedule();                          

                   key = null;

               catch (CancelledKeyException e)

                   Log.ignore(e);

               catch (Exception e)

                   if (isRunning())

                       Log.warn(e);

                       Log.ignore(e);

                   if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())

                       key.cancel();

           // Everything always handled

           selector.selectedKeys().clear();

           now=System.currentTimeMillis();

           _timeout.setNow(now);

           Task task = _timeout.expired();

           while (task!=null)

               if (task instanceof Runnable)

                   dispatch((Runnable)task);

               task = _timeout.expired();

           // Idle tick

           if (now-_idleTick>__IDLE_TICK)

               _idleTick=now;

               final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))

                   ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)

                   :now;

               dispatch(new Runnable()

                   public void run()

                       for (SelectChannelEndPoint endp:_endPoints.keySet())

                           endp.checkIdleTimestamp(idle_now);

       catch (CancelledKeyException e)

           Log.ignore(e);

       finally

           _selecting=null;

   /* ------------------------------------------------------------ */

   private void checkJvmBugs(long before, long now, long wait, int selected)

       throws IOException

       Selector selector = _selector;

       if (selector==null)

           return;

       // Look for JVM bugs over a monitor period.

       // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933

       // http://bugs.sun.com/view_bug.do?bug_id=6693490

       if (now>_monitorNext)

           _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));

           _pausing=_selects>__MAX_SELECTS;

           if (_pausing)

               _paused++;

           _selects=0;

           _jvmBug=0;

           _monitorStart=now;

           _monitorNext=now+__MONITOR_PERIOD;

       if (now>_log)

           if (_paused>0)  

               Log.debug(this+" Busy selector - injecting delay "+_paused+" times");

           if (_jvmFix2>0)

               Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");

           if (_jvmFix1>0)

               Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");

           else if(Log.isDebugEnabled() && _jvmFix0>0)

               Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");

           _paused=0;

           _jvmFix2=0;

           _jvmFix1=0;

           _jvmFix0=0;

           _log=now+60000;

       // If we see signature of possible JVM bug, increment count.

       if (selected==0 && wait>10 && (now-before)<(wait/2))

           // Increment bug count and try a work around

           _jvmBug++;

           if (_jvmBug>(__JVMBUG_THRESHHOLD))

                   if (_jvmBug==__JVMBUG_THRESHHOLD+1)

                       _jvmFix2++;

                   Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop

               catch(InterruptedException e)

                   Log.ignore(e);

           else if (_jvmBug==__JVMBUG_THRESHHOLD)

               synchronized (this)

                   // BLOODY SUN BUG !!!  Try refreshing the entire selector.

                   final Selector new_selector = Selector.open();

                   for (SelectionKey k: selector.keys())

                       if (!k.isValid() || k.interestOps()==0)

                           continue;

                       final SelectableChannel channel = k.channel();

                       final Object attachment = k.attachment();

                       if (attachment==null)

                           addChange(channel);

                           addChange(channel,attachment);

                   _selector.close();

                   _selector=new_selector;

                   return;

           else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops

               // Cancel keys with 0 interested ops

               int cancelled=0;

               for (SelectionKey k: selector.keys())

                   if (k.isValid()&&k.interestOps()==0)

                       k.cancel();

                       cancelled++;

               if (cancelled>0)

                   _jvmFix0++;

               return;

       else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)

           // Look for busy key

           SelectionKey busy = selector.selectedKeys().iterator().next();

           if (busy==_busyKey)

               if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))

                   final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();

                   Log.warn("Busy Key "+busy.channel()+" "+endpoint);

                   busy.cancel();

                   if (endpoint!=null)

                       dispatch(new Runnable()

                           public void run()

                                   endpoint.close();

                               catch (IOException e)

                                   Log.ignore(e);

               _busyKeyCount=0;

           _busyKey=busy;

   在checkjvmbugs的方法中,当主线程正在阻塞selector.select(wait)时,子线程出现blockwrite的情况,会导致主线程被惊醒,使得主线程select到的事件数为0,当selector管理大量的连接时,会出现1秒内上百次主线程被惊醒,最终在checkjvmbugs方法内被当作是jvmbug来处理了,强制让主线程休眠50毫秒,而此时还有大量的线程注册阻塞事件,并等待主线程的唤醒。最终一个12K的数据花费了50ms的时间来写往客户端,最终导致性能下降。

   问题已经很清晰,jetty自身的阻塞-唤醒机制被当作jvmbugs来处理,导致数据传输性能受影响,这样的情况也只有在有大量连接和数据传输的时候才会体现出来。

   修改方案,可以将checkjvmbugs调用直接注释掉,保留每秒select 25000次后休眠一段时间的实现即可。也可以将50毫秒的休眠时间调短来避免阻塞时间过长的问题。

   分析最新的jetty代码,发现这个checkjvmbugs的调用已经去掉了,正是按照方案1来实现的。

   问题明确了之后,看似比较简单,但实际上发现问题的过程是很漫长和曲折的,在问题已经锁定在上面之后,有幸参与最终的定位并且分析出了具体的问题点,问题找到的那一刻,有一种兴奋感与成就感。

建议继续学习:

QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK