
A Look at Storm's LoggingMetricsConsumer

This article takes a close look at Storm's LoggingMetricsConsumer.

LoggingMetricsConsumer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/LoggingMetricsConsumer.java

public class LoggingMetricsConsumer implements IMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class);
    static private String padding = "                       ";

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                                      taskInfo.timestamp,
                                      taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                                      taskInfo.srcTaskId,
                                      taskInfo.srcComponentId);
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.name)
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.value);
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }
}
  • LoggingMetricsConsumer implements the IMetricsConsumer interface; its handleDataPoints method writes the taskInfo and dataPoints to a log. Exactly which log file they land in depends on Storm's log4j2 configuration, covered next; the line layout is sketched below
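Given the format string and padding logic above (the append/delete pair pins the metric name column to exactly 23 characters, truncating longer names), a metrics log line comes out roughly as below; the timestamp, host, task, metric name, and value are invented purely for illustration:

1541070680         10.0.0.1:6700      5:myBolt       __ack-count             {default=20}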

log4j2/worker.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<configuration monitorInterval="60" shutdownHook="disable">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternNoTime">%msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1"
		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="STDOUT"
		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="STDERR"
		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="METRICS"
		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics"
		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
        protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
        facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
        messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true" immediateFlush="true"/>
</appenders>
<loggers>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
    </root>
    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <Logger name="STDERR" level="INFO">
        <appender-ref ref="STDERR"/>
        <appender-ref ref="syslog"/>
    </Logger>
    <Logger name="STDOUT" level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="syslog"/>
    </Logger>
</loggers>
</configuration>
  • Taking worker.xml as an example: the logger named org.apache.storm.metric.LoggingMetricsConsumer is configured at the info level, with additivity set to false
  • The METRICS appender writes to ${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics, for example workers-artifacts/tickDemo-1-1541070680/6700/worker.log.metrics
  • METRICS is configured as a RollingFile appender, with a SizeBasedTriggeringPolicy of 2 MB

Configuration

Topology configuration

conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
  • You can register LoggingMetricsConsumer on the conf when submitting a topology. Configured this way, it only takes effect for that topology's workers: when there is metric data, it is written to the topology's worker.log.metrics file. A full submission sketch follows below
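A minimal submission sketch; MySpout and MyBolt are hypothetical placeholders (not Storm classes), and the topology name is made up:

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.topology.TopologyBuilder;

public class MetricsDemoTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MySpout());                        // hypothetical spout
        builder.setBolt("bolt", new MyBolt()).shuffleGrouping("spout");  // hypothetical bolt

        Config conf = new Config();
        // register LoggingMetricsConsumer with a parallelism hint of 1;
        // this topology's metric data will go to its worker.log.metrics
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

        StormSubmitter.submitTopology("metrics-demo", conf, builder.createTopology());
    }
}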

storm.yaml configuration

topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    max.retain.metric.tuples: 100
    parallelism.hint: 1
  - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer"
    parallelism.hint: 1
    argument: "http://example.com:8080/metrics/my-topology/"
  • Configuration in storm.yaml applies to all topologies. Note that the key here is topology.metrics.consumer.register, which is topology-level, and the data is written to the worker.log.metrics file
  • For cluster-level metrics the key is storm.cluster.metrics.consumer.register, which can only be configured via storm.yaml; when enabled, metric data is written to the nimbus.log.metrics and supervisor.log.metrics files (a sketch follows after this list)
  • Nimbus and the supervisors are started with the log4j parameter -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, while workers are started with -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml; the -Dlogfile.name parameter of each component is nimbus.log, supervisor.log, and worker.log respectively
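For comparison, a cluster-level registration in storm.yaml might look like the sketch below; LoggingClusterMetricsConsumer is the cluster-side logging consumer shipped with Storm (verify the class and the publish-interval key against your Storm version):

storm.cluster.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
# how often cluster metrics are published, in seconds (60 by default)
storm.cluster.metrics.consumer.publish.interval.secs: 60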

MetricsConsumerBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java

public class MetricsConsumerBolt implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
    private final int _maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
    private final DataPointExpander _expander;
    private final BlockingQueue<MetricsTask> _taskQueue;
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;
    private Thread _taskExecuteThread;
    private volatile boolean _running = true;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
                               Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {

        _consumerClassName = consumerClassName;
        _registrationArgument = registrationArgument;
        _maxRetainMetricTuples = maxRetainMetricTuples;
        _filterPredicate = filterPredicate;
        _expander = expander;

        if (_maxRetainMetricTuples > 0) {
            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
        } else {
            _taskQueue = new LinkedBlockingDeque<>();
        }
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        try {
            _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section " +
                                       Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
        }
        _metricsConsumer.prepare(topoConf, _registrationArgument, context, collector);
        _collector = collector;
        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        _taskExecuteThread.setDaemon(true);
        _taskExecuteThread.start();
    }

    @Override
    public void execute(Tuple input) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = _expander.expandDataPoints(dataPoints);
        List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(expandedDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);

        while (!_taskQueue.offer(metricsTask)) {
            _taskQueue.poll();
        }

        _collector.ack(input);
    }

    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
    }

    @Override
    public void cleanup() {
        _running = false;
        _metricsConsumer.cleanup();
        _taskExecuteThread.interrupt();
    }

    static class MetricsTask {
        private IMetricsConsumer.TaskInfo taskInfo;
        private Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return dataPoints;
        }
    }

    class MetricsHandlerRunnable implements Runnable {

        @Override
        public void run() {
            while (_running) {
                try {
                    MetricsTask task = _taskQueue.take();
                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                } catch (InterruptedException e) {
                    break;
                } catch (Throwable t) {
                    LOG.error("Exception occurred during handle metrics", t);
                }
            }
        }
    }

}
  • MetricsConsumerBolt creates _taskQueue in its constructor: a bounded queue if _maxRetainMetricTuples is greater than 0, an unbounded one otherwise. The value is read from max.retain.metric.tuples under topology.metrics.consumer.register and defaults to 100 when absent
  • In prepare, MetricsConsumerBolt starts the MetricsHandlerRunnable thread, which takes MetricsTask items off _taskQueue and calls _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints())
  • In execute, each incoming tuple is wrapped in a MetricsTask and offered to _taskQueue; if the offer fails because the queue is full, one element is polled off and the offer is retried. Plugging in your own consumer is sketched after this list
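Since MetricsConsumerBolt instantiates the registered class reflectively and only talks to it through the IMetricsConsumer callbacks, a custom consumer just implements that interface. A minimal sketch that prints to stdout (the class itself is illustrative, not part of Storm):

import java.util.Collection;
import java.util.Map;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;

public class StdoutMetricsConsumer implements IMetricsConsumer {
    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) {
        // registrationArgument carries the "argument" entry of the registration, if any
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        // invoked on the MetricsHandlerRunnable thread, one MetricsTask at a time
        for (DataPoint p : dataPoints) {
            System.out.printf("%s[%d] %s = %s%n",
                              taskInfo.srcComponentId, taskInfo.srcTaskId, p.name, p.value);
        }
    }

    @Override
    public void cleanup() {
    }
}

Registering it works exactly like LoggingMetricsConsumer, e.g. conf.registerMetricsConsumer(StdoutMetricsConsumer.class, 1).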

StormCommon.systemTopologyImpl

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        validateBasic(topology);

        StormTopology ret = topology.deepCopy();
        addAcker(topoConf, ret);
        if (hasEventLoggers(topoConf)) {
            addEventLogger(topoConf, ret);
        }
        addMetricComponents(topoConf, ret);
        addSystemComponents(topoConf, ret);
        addMetricStreams(ret);
        addSystemStreams(ret);

        validateStructure(ret);

        return ret;
    }

    public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
        for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
            topology.put_to_bolts(entry.getKey(), entry.getValue());
        }
    }

    public static void addMetricStreams(StormTopology topology) {
        for (Object component : allComponents(topology).values()) {
            ComponentCommon common = getComponentCommon(component);
            StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
            common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
        }
    }

    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = new HashMap<>();

        Set<String> componentIdsEmitMetrics = new HashSet<>();
        componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
        componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);

        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
        for (String componentId : componentIdsEmitMetrics) {
            inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
        }

        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
        if (registerInfo != null) {
            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
            for (Map<String, Object> info : registerInfo) {
                String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
                Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
                Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
                    TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
                Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
                metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
                List<String> whitelist = (List<String>) info.get(
                    TOPOLOGY_METRICS_CONSUMER_WHITELIST);
                List<String> blacklist = (List<String>) info.get(
                    TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
                FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
                Boolean expandMapType = ObjectReader.getBoolean(info.get(
                    TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
                String metricNameSeparator = ObjectReader.getString(info.get(
                    TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
                DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
                MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
                                                                           maxRetainMetricTuples, filterPredicate, expander);
                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
                                                                               boltInstance, null, phintNum, metricsConsumerConf);

                String id = className;
                if (classOccurrencesMap.containsKey(className)) {
                    // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
                    int occurrenceNum = classOccurrencesMap.get(className);
                    occurrenceNum++;
                    classOccurrencesMap.put(className, occurrenceNum);
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
                } else {
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className;
                    classOccurrencesMap.put(className, 1);
                }
                metricsConsumerBolts.put(id, metricsConsumerBolt);
            }
        }
        return metricsConsumerBolts;
    }
  • When StormCommon builds the system topology in systemTopologyImpl, it adds a number of system components; this is where addMetricComponents and addMetricStreams are called
  • addMetricComponents creates the MetricsConsumerBolts from the conf and, using shuffle grouping on Constants.METRICS_STREAM_ID, wires up every component as an input source
  • addMetricStreams gives every component an output stream to Constants.METRICS_STREAM_ID, with the output fields Arrays.asList("task-info", "data-points")

Executor.setupMetrics

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;

    protected void setupMetrics() {
        for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
            StormTimer timerTask = workerData.getUserTimer();
            timerTask.scheduleRecurring(interval, interval,
                                        () -> {
                                            TupleImpl tuple =
                                                new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
                                                              (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
                                            AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                                            try {
                                                receiveQueue.publish(metricsTickTuple);
                                                receiveQueue.flush();  // avoid buffering
                                            } catch (InterruptedException e) {
                                                LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
                                                Thread.currentThread().interrupt();
                                                return;
                                            }
                                        }
            );
        }
    }

    public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
        return intervalToTaskToMetricToRegistry;
    }
  • In setupMetrics, the Executor schedules recurring timer tasks that periodically emit a metricsTickTuple to METRICS_TICK_STREAM_ID, addressed with BROADCAST_DEST
  • The scheduling is driven by intervalToTaskToMetricToRegistry, whose keys are the intervals
  • intervalToTaskToMetricToRegistry is initialized in the Executor constructor: intervalToTaskToMetricToRegistry = new HashMap<>()

Task.mkTopologyContext

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

    private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
        Map<String, Object> conf = workerData.getConf();
        return new TopologyContext(
            topology,
            workerData.getTopologyConf(),
            workerData.getTaskToComponent(),
            workerData.getComponentToSortedTasks(),
            workerData.getComponentToStreamToFields(),
            // This is updated by the Worker and the topology has shared access to it
            workerData.getBlobToLastKnownVersion(),
            workerData.getTopologyId(),
            ConfigUtils.supervisorStormResourcesPath(
                ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
            ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
            taskId,
            workerData.getPort(), workerData.getLocalTaskIds(),
            workerData.getDefaultSharedResources(),
            workerData.getUserSharedResources(),
            executor.getSharedExecutorData(),
            executor.getIntervalToTaskToMetricToRegistry(),
            executor.getOpenOrPrepareWasCalled());
    }
  • When mkTopologyContext creates the TopologyContext, it passes in executor.getIntervalToTaskToMetricToRegistry()

TopologyContext

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java

public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<>();
    private List<ITaskHook> _hooks = new ArrayList<>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;

    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        if (_openOrPrepareWasCalled.get()) {
            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                       "IBolt::prepare() or ISpout::open() method.");
        }

        if (metric == null) {
            throw new IllegalArgumentException("Cannot register a null metric");
        }

        if (timeBucketSizeInSecs <= 0) {
            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
                                               "greater than or equal to 1 second.");
        }

        if (getRegisteredMetricByName(name) != null) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }

        Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics;
        if (!m1.containsKey(timeBucketSizeInSecs)) {
            m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>());
        }

        Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
        if (!m2.containsKey(_taskId)) {
            m2.put(_taskId, new HashMap<String, IMetric>());
        }

        Map<String, IMetric> m3 = m2.get(_taskId);
        if (m3.containsKey(name)) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        } else {
            m3.put(name, metric);
        }

        return metric;
    }

    //......
}
  • The Executor's intervalToTaskToMetricToRegistry ends up as the TopologyContext's _registeredMetrics
  • The registerMetric method adds entries to _registeredMetrics, keyed by timeBucketSizeInSecs
  • For built-in metrics, timeBucketSizeInSecs is read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60 in defaults.yaml; that is, the Executor emits a metricsTickTuple on stream Constants.METRICS_TICK_STREAM_ID every 60 seconds. A registration sketch follows below
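To make the registration path concrete, here is a sketch of a bolt feeding _registeredMetrics through registerMetric; the bolt is illustrative, while CountMetric is Storm's built-in counter from org.apache.storm.metric.api, and the 60-second bucket matches the default mentioned above:

import java.util.Map;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class CountingBolt extends BaseRichBolt {
    private transient CountMetric processedCount;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // timeBucketSizeInSecs = 60 becomes the key in _registeredMetrics,
        // so this metric is flushed by metricsTick every 60 seconds
        this.processedCount = context.registerMetric("processed-count", new CountMetric(), 60);
    }

    @Override
    public void execute(Tuple input) {
        processedCount.incr();   // counted, then reset once per time bucket
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}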

Executor.metricsTick

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    public void metricsTick(Task task, TupleImpl tuple) {
        try {
            Integer interval = tuple.getInteger(0);
            int taskId = task.getTaskId();
            Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
            Map<String, IMetric> nameToRegistry = null;
            if (taskToMetricToRegistry != null) {
                nameToRegistry = taskToMetricToRegistry.get(taskId);
            }
            if (nameToRegistry != null) {
                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
                    hostname, workerTopologyContext.getThisWorkerPort(),
                    componentId, taskId, Time.currentTimeSecs(), interval);
                List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
                for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                    IMetric metric = entry.getValue();
                    Object value = metric.getValueAndReset();
                    if (value != null) {
                        IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
                        dataPoints.add(dataPoint);
                    }
                }
                if (!dataPoints.isEmpty()) {
                    task.sendUnanchored(Constants.METRICS_STREAM_ID,
                                        new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
                    executorTransfer.flush();
                }
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
  • When SpoutExecutor or BoltExecutor receives a tuple whose streamId is Constants.METRICS_TICK_STREAM_ID in tupleActionFn, it calls the parent class's Executor.metricsTick method
  • metricsTick emits the data via task.sendUnanchored(Constants.METRICS_STREAM_ID, new Values(taskInfo, dataPoints), executorTransfer, pendingEmits), sending taskInfo and dataPoints on Constants.METRICS_STREAM_ID; the dataPoints are read from the TopologyContext's _registeredMetrics (this is the legacy metrics API, not the V2 one), each value being produced by IMetric.getValueAndReset()
  • When MetricsConsumerBolt receives the data it puts it on the _taskQueue; meanwhile the MetricsHandlerRunnable thread does a blocking take on _taskQueue and calls back into _metricsConsumer.handleDataPoints to consume the data. A custom IMetric sketch follows after this list
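Because metricsTick only ever calls getValueAndReset() and drops null values, a custom metric just implements the single-method IMetric interface. A small gauge-style sketch (the class is illustrative, not part of Storm):

import org.apache.storm.metric.api.IMetric;

// Reports the maximum value observed during each time bucket,
// then resets so the next bucket starts fresh.
public class MaxMetric implements IMetric {
    private long max = Long.MIN_VALUE;

    public void update(long value) {
        if (value > max) {
            max = value;
        }
    }

    @Override
    public Object getValueAndReset() {
        // returning null makes metricsTick skip the data point for this bucket
        Object ret = (max == Long.MIN_VALUE) ? null : max;
        max = Long.MIN_VALUE;
        return ret;
    }
}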

Summary

  • LoggingMetricsConsumer comes with Storm's (legacy) metrics API and has no counterpart in metrics2. Nimbus and the supervisors use -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, workers use -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml, and the -Dlogfile.name values of the components are nimbus.log, supervisor.log, and worker.log respectively
  • When Storm builds a topology it adds system components, among them the metricsConsumerBolt and the metric streams. The Executor calls setupMetrics from its init method, scheduling the periodic emission of metricsTickTuple; when SpoutExecutor or BoltExecutor receives a metricsTickTuple in tupleActionFn, it calls metricsTick to produce data and emit it to Constants.METRICS_STREAM_ID, from which MetricsConsumerBolt receives the data and calls back into _metricsConsumer.handleDataPoints to consume it
  • Two parameters deserve attention. One is max.retain.metric.tuples, used by MetricsConsumerBolt and configured under topology.metrics.consumer.register, defaulting to 100 when absent; it is the capacity of MetricsConsumerBolt's _taskQueue, and setting it to 0 makes the queue unbounded. The other is the interval of the built-in metrics, read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs) and defaulting to 60, i.e. a metricsTickTuple is emitted every 60 seconds

doc

  • Storm Metrics
  • New Metrics Reporting API
