
A look at SkyWalking's jvm-receiver-plugin - code-craft - SegmentFault

 4 years ago
source link: https://segmentfault.com/a/1190000022097083?

This article takes a look at SkyWalking's jvm-receiver-plugin.

JVMModuleProvider

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/JVMModuleProvider.java

public class JVMModuleProvider extends ModuleProvider {

    @Override public String name() {
        return "default";
    }

    @Override public Class<? extends ModuleDefine> module() {
        return JVMModule.class;
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        return null;
    }

    @Override public void prepare() {
    }

    @Override public void start() {
        GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
        grpcHandlerRegister.addHandler(new JVMMetricsServiceHandler(getManager()));
        grpcHandlerRegister.addHandler(new JVMMetricReportServiceHandler(getManager()));
    }

    @Override public void notifyAfterCompleted() {

    }

    @Override public String[] requiredModules() {
        return new String[] {CoreModule.NAME, SharingServerModule.NAME};
    }
}
  • JVMModuleProvider extends ModuleProvider. Its start method looks up the GRPCHandlerRegister service from SharingServerModule and registers two handlers on it: JVMMetricsServiceHandler and JVMMetricReportServiceHandler.
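The registration step in start() can be pictured with a minimal, self-contained sketch. Every type below (Handler, HandlerRegistry, JvmReceiverSketch) is a hypothetical stand-in invented for illustration and is not part of the SkyWalking API; the real code obtains GRPCHandlerRegister from SharingServerModule, as shown in the listing above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a gRPC handler; not a SkyWalking type.
interface Handler {
    String serviceName();
}

// Hypothetical stand-in for GRPCHandlerRegister: collects handlers
// that a shared server would later expose.
class HandlerRegistry {
    private final List<Handler> handlers = new ArrayList<>();

    void addHandler(Handler handler) {
        handlers.add(handler);
    }

    List<String> registeredServices() {
        List<String> names = new ArrayList<>();
        for (Handler h : handlers) {
            names.add(h.serviceName());
        }
        return names;
    }
}

// Hypothetical stand-in for a module provider whose start() wires in its handlers.
class JvmReceiverSketch {
    static void start(HandlerRegistry registry) {
        // One handler per protocol version, mirroring the two addHandler calls.
        registry.addHandler(() -> "JVMMetricsService");      // older protocol
        registry.addHandler(() -> "JVMMetricReportService"); // agent v2 protocol
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        start(registry);
        System.out.println(registry.registeredServices());
    }
}
```

The point of the pattern is that the receiver plugin does not open its own port: it only contributes handlers to a server that another module owns, which is why SharingServerModule appears in requiredModules().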

JVMMetricsService.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent/JVMMetricsService.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "language-agent/Downstream.proto";
import "common/JVM.proto";

service JVMMetricsService {
    rpc collect (JVMMetrics) returns (Downstream) {
    }
}

message JVMMetrics {
    repeated JVMMetric metrics = 1;
    int32 applicationInstanceId = 2;
}
  • JVMMetricsService.proto defines the JVMMetricsService service. It has a single collect method that takes a JVMMetrics message and returns a Downstream message.

JVMMetricsServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricsServiceHandler.java

public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {

    private static final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);

    private final JVMSourceDispatcher jvmSourceDispatcher;

    public JVMMetricsServiceHandler(ModuleManager moduleManager) {
        this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
    }

    @Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
        int serviceInstanceId = request.getApplicationInstanceId();

        if (logger.isDebugEnabled()) {
            logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
        }

        request.getMetricsList().forEach(metrics -> {
            long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
            jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
        });

        responseObserver.onNext(Downstream.newBuilder().build());
        responseObserver.onCompleted();
    }

}
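  • JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase. Its constructor creates a JVMSourceDispatcher. Its collect method iterates over request.getMetricsList() and, for each entry, derives a minute-level time bucket from the metric's timestamp and calls jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics).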
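To make the time-bucket step concrete, here is a rough sketch of turning an epoch-millisecond timestamp into a minute bucket. It assumes the bucket is a number of the form yyyyMMddHHmm; the body of TimeBucket.getMinuteTimeBucket is not shown in this article, so MinuteBucketSketch, its method name, and the explicit time zone parameter are illustrative assumptions rather than the SkyWalking implementation.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical helper for illustration; not the SkyWalking TimeBucket class.
class MinuteBucketSketch {
    // Truncates an epoch-millisecond timestamp to the minute and encodes it
    // as a yyyyMMddHHmm number in the given time zone (assumed format).
    static long minuteTimeBucket(long epochMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone);
        return t.getYear() * 100_000_000L
            + t.getMonthValue() * 1_000_000L
            + t.getDayOfMonth() * 10_000L
            + t.getHour() * 100L
            + t.getMinute();
    }

    public static void main(String[] args) {
        // 1584000000000 ms is 2020-03-12T08:00:00Z.
        System.out.println(minuteTimeBucket(1_584_000_000_000L, ZoneOffset.UTC));
    }
}
```

Under this encoding, every metric reported within the same minute maps to the same bucket value, which is what lets downstream aggregation group them.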

JVMMetric.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent-v2/JVMMetric.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent.v2";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "common/common.proto";
import "common/JVM.proto";

service JVMMetricReportService {
    rpc collect (JVMMetricCollection) returns (Commands) {
    }
}

message JVMMetricCollection {
    repeated JVMMetric metrics = 1;
    int32 serviceInstanceId = 2;
}
  • JVMMetric.proto defines the JVMMetricReportService service. It has a single collect method that takes a JVMMetricCollection message and returns a Commands message.

JVMMetricReportServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricReportServiceHandler.java

public class JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase implements GRPCHandler {

    private static final Logger logger = LoggerFactory.getLogger(JVMMetricReportServiceHandler.class);

    private final JVMSourceDispatcher jvmSourceDispatcher;

    public JVMMetricReportServiceHandler(ModuleManager moduleManager) {
        this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
    }

    @Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) {
        int serviceInstanceId = request.getServiceInstanceId();

        if (logger.isDebugEnabled()) {
            logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
        }

        request.getMetricsList().forEach(metrics -> {
            long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
            jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
        });

        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
    }

}
  • JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase. Its constructor creates a JVMSourceDispatcher. Its collect method iterates over request.getMetricsList() and calls jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics) for each entry, the same logic as JVMMetricsServiceHandler.

JVMSourceDispatcher

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMSourceDispatcher.java

public class JVMSourceDispatcher {
    private static final Logger logger = LoggerFactory.getLogger(JVMSourceDispatcher.class);
    private final SourceReceiver sourceReceiver;
    private final ServiceInstanceInventoryCache instanceInventoryCache;

    public JVMSourceDispatcher(ModuleManager moduleManager) {
        this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
        this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
    }

    void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {
        ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId);
        int serviceId;
        if (Objects.nonNull(serviceInstanceInventory)) {
            serviceId = serviceInstanceInventory.getServiceId();
        } else {
            logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
            return;
        }

        this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
        this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
        this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
        this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
    }

    private void sendToCpuMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, CPU cpu) {
        ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU();
        serviceInstanceJVMCPU.setId(serviceInstanceId);
        serviceInstanceJVMCPU.setName(Const.EMPTY_STRING);
        serviceInstanceJVMCPU.setServiceId(serviceId);
        serviceInstanceJVMCPU.setServiceName(Const.EMPTY_STRING);
        serviceInstanceJVMCPU.setUsePercent(cpu.getUsagePercent());
        serviceInstanceJVMCPU.setTimeBucket(timeBucket);
        sourceReceiver.receive(serviceInstanceJVMCPU);
    }

    private void sendToGCMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, List<GC> gcs) {
        gcs.forEach(gc -> {
            ServiceInstanceJVMGC serviceInstanceJVMGC = new ServiceInstanceJVMGC();
            serviceInstanceJVMGC.setId(serviceInstanceId);
            serviceInstanceJVMGC.setName(Const.EMPTY_STRING);
            serviceInstanceJVMGC.setServiceId(serviceId);
            serviceInstanceJVMGC.setServiceName(Const.EMPTY_STRING);

            switch (gc.getPhrase()) {
                case NEW:
                    serviceInstanceJVMGC.setPhrase(GCPhrase.NEW);
                    break;
                case OLD:
                    serviceInstanceJVMGC.setPhrase(GCPhrase.OLD);
                    break;
            }

            serviceInstanceJVMGC.setTime(gc.getTime());
            serviceInstanceJVMGC.setCount(gc.getCount());
            serviceInstanceJVMGC.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMGC);
        });
    }

    private void sendToMemoryMetricProcess(int serviceId, int serviceInstanceId, long timeBucket,
        List<Memory> memories) {
        memories.forEach(memory -> {
            ServiceInstanceJVMMemory serviceInstanceJVMMemory = new ServiceInstanceJVMMemory();
            serviceInstanceJVMMemory.setId(serviceInstanceId);
            serviceInstanceJVMMemory.setName(Const.EMPTY_STRING);
            serviceInstanceJVMMemory.setServiceId(serviceId);
            serviceInstanceJVMMemory.setServiceName(Const.EMPTY_STRING);
            serviceInstanceJVMMemory.setHeapStatus(memory.getIsHeap());
            serviceInstanceJVMMemory.setInit(memory.getInit());
            serviceInstanceJVMMemory.setMax(memory.getMax());
            serviceInstanceJVMMemory.setUsed(memory.getUsed());
            serviceInstanceJVMMemory.setCommitted(memory.getCommitted());
            serviceInstanceJVMMemory.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMMemory);
        });
    }

    private void sendToMemoryPoolMetricProcess(int serviceId, int serviceInstanceId, long timeBucket,
        List<MemoryPool> memoryPools) {

        memoryPools.forEach(memoryPool -> {
            ServiceInstanceJVMMemoryPool serviceInstanceJVMMemoryPool = new ServiceInstanceJVMMemoryPool();
            serviceInstanceJVMMemoryPool.setId(serviceInstanceId);
            serviceInstanceJVMMemoryPool.setName(Const.EMPTY_STRING);
            serviceInstanceJVMMemoryPool.setServiceId(serviceId);
            serviceInstanceJVMMemoryPool.setServiceName(Const.EMPTY_STRING);

            switch (memoryPool.getType()) {
                case NEWGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.NEWGEN_USAGE);
                    break;
                case OLDGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.OLDGEN_USAGE);
                    break;
                case PERMGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.PERMGEN_USAGE);
                    break;
                case SURVIVOR_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.SURVIVOR_USAGE);
                    break;
                case METASPACE_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.METASPACE_USAGE);
                    break;
                case CODE_CACHE_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.CODE_CACHE_USAGE);
                    break;
            }

            serviceInstanceJVMMemoryPool.setInit(memoryPool.getInit());
            serviceInstanceJVMMemoryPool.setMax(memoryPool.getMax());
            serviceInstanceJVMMemoryPool.setUsed(memoryPool.getUsed());
            serviceInstanceJVMMemoryPool.setCommitted(memoryPool.getCommited());
            serviceInstanceJVMMemoryPool.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMMemoryPool);
        });
    }
}
  • JVMSourceDispatcher mainly provides the sendMetric method. It first looks up the service instance in ServiceInstanceInventoryCache; if the instance is not found, it logs a warning and returns without sending anything. Otherwise it calls sendToCpuMetricProcess, sendToMemoryMetricProcess, sendToMemoryPoolMetricProcess and sendToGCMetricProcess. Each of these builds the corresponding source object and passes it to sourceReceiver.receive: ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool and ServiceInstanceJVMGC respectively.
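The fan-out performed by sendMetric can be sketched in isolation as follows. DispatcherSketch and its members are hypothetical simplifications: a plain map stands in for ServiceInstanceInventoryCache, a list of strings stands in for SourceReceiver, and the metric is reduced to entry counts. It is meant only to show the lookup-then-fan-out shape, not the real classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the dispatcher; not SkyWalking code.
class DispatcherSketch {
    // Stand-in for the instance inventory cache: instanceId -> serviceId.
    private final Map<Integer, Integer> instanceToService = new HashMap<>();
    // Stand-in for SourceReceiver: records every source it is handed.
    final List<String> received = new ArrayList<>();

    void registerInstance(int instanceId, int serviceId) {
        instanceToService.put(instanceId, serviceId);
    }

    // Returns false and emits nothing when the instance is unknown,
    // mirroring the early return in sendMetric.
    boolean sendMetric(int instanceId, long timeBucket,
                       int memoryCount, int poolCount, int gcCount) {
        Integer serviceId = instanceToService.get(instanceId);
        if (serviceId == null) {
            return false;
        }
        String suffix = "@" + serviceId + "/" + instanceId + "/" + timeBucket;
        // One CPU source per metric, then one source per list entry.
        received.add("cpu" + suffix);
        for (int i = 0; i < memoryCount; i++) {
            received.add("memory" + suffix);
        }
        for (int i = 0; i < poolCount; i++) {
            received.add("memoryPool" + suffix);
        }
        for (int i = 0; i < gcCount; i++) {
            received.add("gc" + suffix);
        }
        return true;
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch();
        dispatcher.registerInstance(7, 3);
        dispatcher.sendMetric(7, 202003120800L, 2, 3, 2);
        dispatcher.received.forEach(System.out::println);
    }
}
```

A single incoming metric therefore produces one CPU source plus one source per memory, memory pool and GC entry, all tagged with the same service id, instance id and time bucket.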

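To summarize: JVMModuleProvider extends ModuleProvider, and its start method obtains the GRPCHandlerRegister and registers JVMMetricsServiceHandler and JVMMetricReportServiceHandler. The former serves the service defined in JVMMetricsService.proto; the latter serves the agent v2 service defined in JVMMetric.proto.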

