11'use strict' ;
22
33var dgram = require ( 'dgram' ) ;
4- var blocked = require ( 'blocked' ) ;
4+
55var merge = require ( 'merge' ) ;
66var configHelper = require ( './config-helper' ) ;
77var transport = require ( './transport' ) ;
@@ -54,60 +54,6 @@ function putMetric (self, metricTypeConf, name, value, aggregation, aggregationF
5454 }
5555}
5656
57- /**
58- * Puts a system stats metric into the system stats buffer ready to be sent.
59- *
60- * @param self A self statful client.
61- * @param metricTypeConf A configuration for each metric type (counter, gauge, timer). Can be null if it a custom metric.
62- * @param name A metric name.
63- * @param value A metric value.
64- * @param aggregation The aggregation with which metric was aggregated.
65- * @param aggregationFreq The aggregation frequency with which metric was aggregated.
66- * @param parameters An object with metric para meters: tags, agg, aggFreq, namespace and timestamp.
67- */
68- function putSystemStatsMetrics ( self , name , value , parameters ) {
69- var metricParams = parameters || { } ;
70- var tags = metricParams . tags ,
71- agg = metricParams . agg ,
72- aggFreq = metricParams . aggFreq ,
73- namespace = metricParams . namespace ,
74- timestamp = metricParams . timestamp ,
75- sampleRate = metricParams . sampleRate ;
76-
77- putSystemStats ( self , name , value , {
78- tags : tags ,
79- agg : agg ,
80- aggFreq : aggFreq ,
81- namespace : namespace ,
82- timestamp : timestamp ,
83- sampleRate : sampleRate
84- } ) ;
85- }
86-
87- function sendFlushStats ( self ) {
88- if ( self . systemStats ) {
89- var aggregations = [ 'avg' , 'sum' ] ;
90- if ( self . aggregatedBuffer . bufferSize > 0 && self . transport === 'api' ) {
91- putSystemStatsMetrics ( self , 'buffer.flush_length' , self . aggregatedBuffer . bufferSize , {
92- agg : aggregations ,
93- tags : { buffer_type : 'aggregated' }
94- } ) ;
95- }
96- if ( self . nonAggregatedBuffer . bufferSize > 0 ) {
97- putSystemStatsMetrics ( self , 'buffer.flush_length' , self . nonAggregatedBuffer . bufferSize , {
98- agg : aggregations ,
99- tags : { buffer_type : 'non-aggregated' }
100- } ) ;
101- }
102- if ( self . systemStatsBuffer . bufferSize > 0 ) {
103- putSystemStatsMetrics ( self , 'buffer.flush_length' , self . systemStatsBuffer . bufferSize , {
104- agg : aggregations ,
105- tags : { buffer_type : 'system-stats' }
106- } ) ;
107- }
108- }
109- }
110-
11157/**
11258 * Logs all the metrics to the logger
11359 *
@@ -138,11 +84,21 @@ function logMetrics (self) {
13884 }
13985 self . logger . debug ( stringToLogHeader + stringToLog ) ;
14086 }
141- if ( self . systemStatsBuffer . bufferSize > 0 ) {
142- self . logger . debug ( 'Flushing metrics (system stats): ' + self . systemStatsBuffer . buffer ) ;
87+ if ( pluginsBuffersSize ( self . pluginBuffers ) > 0 ) {
88+ self . logger . debug ( 'Flushing plugins metrics' ) ;
14389 }
14490}
14591
92+ function pluginsBuffersSize ( buffers ) {
93+ var size = 0 ;
94+
95+ for ( var i in buffers ) {
96+ size += buffers [ i ] . bufferSize ;
97+ }
98+
99+ return size ;
100+ }
101+
146102/**
147103 * Sends the non aggregated and system stats metrics using UDP transport
148104 *
@@ -164,8 +120,8 @@ function sendMetricsByUdpTransport (self) {
164120 self . socket . send ( buffer , 0 , buffer . length , self . port , self . host ) ;
165121 }
166122
167- if ( self . systemStatsBuffer . bufferSize > 0 ) {
168- buffer = new Buffer ( self . systemStatsBuffer . buffer ) ;
123+ for ( var i in self . pluginBuffers ) {
124+ buffer = new Buffer ( self . pluginBuffers [ i ] . buffer ) ;
169125 self . socket . send ( buffer , 0 , buffer . length , self . port , self . host ) ;
170126 }
171127}
@@ -233,22 +189,25 @@ function sendMetricsByApiTransport (self) {
233189 }
234190 }
235191
236- if ( self . systemStatsBuffer . bufferSize > 0 ) {
237- var nonAggregatedStatsOptions = transport . buildRequestOptions (
238- self . protocol ,
239- self . host ,
240- self . port ,
241- self . basePath ,
242- self . token ,
243- self . timeout
244- ) ;
245-
246- self . logger . debug ( 'Flushing to ' + nonAggregatedStatsOptions . url + ' system stats metrics' ) ;
247-
248- if ( self . compression ) {
249- transport . sendCompressedMessage ( nonAggregatedStatsOptions , self . systemStatsBuffer . buffer , self . logger ) ;
250- } else {
251- transport . sendUncompressedMessage ( nonAggregatedStatsOptions , self . systemStatsBuffer . buffer , self . logger ) ;
192+ for ( var i in self . pluginBuffers ) {
193+ var element = self . pluginBuffers [ i ] ;
194+ if ( element . bufferSize > 0 ) {
195+ var nonAggregatedStatsOptions = transport . buildRequestOptions (
196+ self . protocol ,
197+ self . host ,
198+ self . port ,
199+ self . basePath ,
200+ self . token ,
201+ self . timeout
202+ ) ;
203+
204+ self . logger . debug ( 'Flushing to ' + nonAggregatedStatsOptions . url + ' system stats metrics' ) ;
205+
206+ if ( self . compression ) {
207+ transport . sendCompressedMessage ( nonAggregatedStatsOptions , element . buffer , self . logger ) ;
208+ } else {
209+ transport . sendUncompressedMessage ( nonAggregatedStatsOptions , element . buffer , self . logger ) ;
210+ }
252211 }
253212 }
254213}
@@ -259,9 +218,12 @@ function sendMetricsByApiTransport (self) {
259218 * @param self A self client instance.
260219 */
261220function flush ( self ) {
262- sendFlushStats ( self ) ;
221+ for ( var index = 0 ; index < self . plugins . length ; index ++ ) {
222+ self . plugins [ index ] . onFlush ( self ) ;
223+ }
224+
263225 var metricsCounter =
264- self . aggregatedBuffer . bufferSize + self . nonAggregatedBuffer . bufferSize + self . systemStatsBuffer . bufferSize ;
226+ self . aggregatedBuffer . bufferSize + self . nonAggregatedBuffer . bufferSize + pluginsBuffersSize ( self . pluginBuffers ) ;
265227
266228 if ( metricsCounter > 0 ) {
267229 if ( self . dryRun ) {
@@ -280,8 +242,11 @@ function flush (self) {
280242 self . aggregatedBuffer . bufferSize = 0 ;
281243 self . nonAggregatedBuffer . buffer = '' ;
282244 self . nonAggregatedBuffer . bufferSize = 0 ;
283- self . systemStatsBuffer . buffer = '' ;
284- self . systemStatsBuffer . bufferSize = 0 ;
245+
246+ for ( var i in self . pluginBuffers ) {
247+ self . pluginBuffers [ i ] . buffer = '' ;
248+ self . pluginBuffers [ i ] . bufferSize = 0 ;
249+ }
285250 }
286251}
287252
@@ -317,7 +282,7 @@ function addToBuffer (self, metricLines, isMetricAggregated, agg, aggFreq) {
317282 if (
318283 self . aggregatedBuffer . bufferSize +
319284 self . nonAggregatedBuffer . bufferSize +
320- self . systemStatsBuffer . bufferSize >=
285+ pluginsBuffersSize ( self . pluginBuffers ) >=
321286 self . flushSize
322287 ) {
323288 setTimeout ( function ( ) {
@@ -329,27 +294,6 @@ function addToBuffer (self, metricLines, isMetricAggregated, agg, aggFreq) {
329294 }
330295}
331296
332- /**
333- * Adds raw metrics directly into the flush buffer. Use this method with caution.
334- *
335- * @param self A self client instance.
336- * @param metricLines The metrics, in valid line protocol, to push to the buffer.
337- */
338- function addToStatsBuffer ( self , metricLines ) {
339- if ( typeof metricLines !== 'undefined' ) {
340- var targetBuffer = self . systemStatsBuffer ;
341-
342- if ( targetBuffer . bufferSize > 0 ) {
343- targetBuffer . buffer += '\n' ;
344- }
345-
346- targetBuffer . buffer += metricLines ;
347- targetBuffer . bufferSize ++ ;
348- } else {
349- self . logger . error ( 'addToStatsBuffer: Invalid metric lines: ' + metricLines ) ;
350- }
351- }
352-
353297/**
354298 * Adds a new metric to the in-memory buffer.
355299 *
@@ -409,60 +353,6 @@ function putRaw (self, metric, value, parameters, isMetricAggregated) {
409353 }
410354}
411355
412- /**
413- * Adds a new system stats metric to the in-memory system stats buffer.
414- *
415- * @param self A self client instance.
416- * @param metric Name metric such as 'response_time'.
417- * @param value.
418- * @param parameters An object with metric para meters: tags, agg, aggFreq, namespace and timestamp.
419- * - tags: Tags to associate this value with, for example {from: 'serviceA', to: 'serviceB', method: 'login'}.
420- * - agg: List of aggregations to be applied by Statful. Ex: ['avg', 'p90', 'min'].
421- * - aggFreq: Aggregation frequency in seconds. One of: 10, 30, 60 ,120, 180, 300. Default: 10.
422- * - namespace: Define the metric namespace. Default: application.
423- * - timestamp: Defines the metrics timestamp. Default: current timestamp.
424- */
425- function putSystemStats ( self , metric , value , parameters ) {
426- var metricParams = parameters || { } ;
427-
428- var tags = metricParams . tags ,
429- agg = metricParams . agg ,
430- aggFreq = metricParams . aggFreq ,
431- namespace = metricParams . namespace ,
432- timestamp = metricParams . timestamp ,
433- sampleRate = parameters . sampleRate || self . sampleRate ;
434-
435- // Vars to Put
436- var putNamespace = namespace || self . namespace ;
437- var putAggFreq = aggFreq || 10 ;
438- var putTags = merge ( self . app ? merge ( { app : self . app } , tags ) : tags , self . tags ) ;
439-
440- var metricName = putNamespace + '.' + metric ,
441- flushLine = metricName ,
442- sampleRateNormalized = ( sampleRate || 100 ) / 100 ;
443-
444- if ( Math . random ( ) <= sampleRateNormalized ) {
445- flushLine = Object . keys ( putTags ) . reduce ( function ( previousValue , tag ) {
446- return previousValue + ',' + tag + '=' + putTags [ tag ] ;
447- } , flushLine ) ;
448-
449- flushLine += ' ' + value + ' ' + ( timestamp || Math . round ( new Date ( ) . getTime ( ) / 1000 ) ) ;
450-
451- if ( agg ) {
452- agg . push ( putAggFreq ) ;
453- flushLine += ' ' + agg . join ( ',' ) ;
454-
455- if ( sampleRate && sampleRate < 100 ) {
456- flushLine += ' ' + sampleRate ;
457- }
458- }
459-
460- addToStatsBuffer ( self , [ flushLine ] ) ;
461- } else {
462- self . logger . debug ( 'Metric was discarded due to sample rate.' ) ;
463- }
464- }
465-
466356/**
467357 * Calls put metric with an aggregated metric.
468358 *
@@ -532,7 +422,6 @@ var Client = function (config, logger) {
532422 this . namespace = config . namespace || 'application' ;
533423 this . dryRun = config . dryRun ;
534424 this . tags = config . tags || { } ;
535- this . systemStats = config . systemStats !== undefined ? config . systemStats : true ;
536425 this . sampleRate = config . sampleRate || 100 ;
537426 this . flushInterval = config . flushInterval || 3000 ;
538427 this . flushSize = config . flushSize || 1000 ;
@@ -559,19 +448,8 @@ var Client = function (config, logger) {
559448 bufferSize : 0
560449 } ;
561450
562- this . systemStatsBuffer = {
563- buffer : '' ,
564- bufferSize : 0
565- } ;
566-
567- if ( this . systemStats ) {
568- blocked ( function ( ms ) {
569- putSystemStatsMetrics ( self , 'timer.event_loop' , ms , {
570- agg : [ 'avg' , 'p90' , 'count' ] ,
571- tags : { unit : 'ms' }
572- } ) ;
573- } ) ;
574- }
451+ this . plugins = [ ] ;
452+ this . pluginBuffers = { } ;
575453
576454 setInterval (
577455 function ( obj ) {
@@ -739,4 +617,9 @@ Client.prototype.timer = function (name, value, parameters) {
739617 putNonAggregatedMetric ( this , this . default . timer , 'timer.' + name , value , parameters ) ;
740618} ;
741619
620+ Client . prototype . use = function ( plugin ) {
621+ plugin . onInit ( this ) ;
622+ this . plugins . push ( plugin ) ;
623+ } ;
624+
742625module . exports = Client ;
0 commit comments