decode.go
  1  // Copyright 2015 The Prometheus Authors
  2  // Licensed under the Apache License, Version 2.0 (the "License");
  3  // you may not use this file except in compliance with the License.
  4  // You may obtain a copy of the License at
  5  //
  6  // http://www.apache.org/licenses/LICENSE-2.0
  7  //
  8  // Unless required by applicable law or agreed to in writing, software
  9  // distributed under the License is distributed on an "AS IS" BASIS,
 10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 11  // See the License for the specific language governing permissions and
 12  // limitations under the License.
 13  
 14  package expfmt
 15  
 16  import (
 17  	"fmt"
 18  	"io"
 19  	"math"
 20  	"mime"
 21  	"net/http"
 22  
 23  	dto "github.com/prometheus/client_model/go"
 24  
 25  	"github.com/matttproud/golang_protobuf_extensions/pbutil"
 26  	"github.com/prometheus/common/model"
 27  )
 28  
 29  // Decoder types decode an input stream into metric families.
 30  type Decoder interface {
 31  	Decode(*dto.MetricFamily) error
 32  }
 33  
 34  // DecodeOptions contains options used by the Decoder and in sample extraction.
 35  type DecodeOptions struct {
 36  	// Timestamp is added to each value from the stream that has no explicit timestamp set.
 37  	Timestamp model.Time
 38  }
 39  
 40  // ResponseFormat extracts the correct format from a HTTP response header.
 41  // If no matching format can be found FormatUnknown is returned.
 42  func ResponseFormat(h http.Header) Format {
 43  	ct := h.Get(hdrContentType)
 44  
 45  	mediatype, params, err := mime.ParseMediaType(ct)
 46  	if err != nil {
 47  		return FmtUnknown
 48  	}
 49  
 50  	const textType = "text/plain"
 51  
 52  	switch mediatype {
 53  	case ProtoType:
 54  		if p, ok := params["proto"]; ok && p != ProtoProtocol {
 55  			return FmtUnknown
 56  		}
 57  		if e, ok := params["encoding"]; ok && e != "delimited" {
 58  			return FmtUnknown
 59  		}
 60  		return FmtProtoDelim
 61  
 62  	case textType:
 63  		if v, ok := params["version"]; ok && v != TextVersion {
 64  			return FmtUnknown
 65  		}
 66  		return FmtText
 67  	}
 68  
 69  	return FmtUnknown
 70  }
 71  
 72  // NewDecoder returns a new decoder based on the given input format.
 73  // If the input format does not imply otherwise, a text format decoder is returned.
 74  func NewDecoder(r io.Reader, format Format) Decoder {
 75  	switch format {
 76  	case FmtProtoDelim:
 77  		return &protoDecoder{r: r}
 78  	}
 79  	return &textDecoder{r: r}
 80  }
 81  
 82  // protoDecoder implements the Decoder interface for protocol buffers.
 83  type protoDecoder struct {
 84  	r io.Reader
 85  }
 86  
 87  // Decode implements the Decoder interface.
 88  func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
 89  	_, err := pbutil.ReadDelimited(d.r, v)
 90  	if err != nil {
 91  		return err
 92  	}
 93  	if !model.IsValidMetricName(model.LabelValue(v.GetName())) {
 94  		return fmt.Errorf("invalid metric name %q", v.GetName())
 95  	}
 96  	for _, m := range v.GetMetric() {
 97  		if m == nil {
 98  			continue
 99  		}
100  		for _, l := range m.GetLabel() {
101  			if l == nil {
102  				continue
103  			}
104  			if !model.LabelValue(l.GetValue()).IsValid() {
105  				return fmt.Errorf("invalid label value %q", l.GetValue())
106  			}
107  			if !model.LabelName(l.GetName()).IsValid() {
108  				return fmt.Errorf("invalid label name %q", l.GetName())
109  			}
110  		}
111  	}
112  	return nil
113  }
114  
115  // textDecoder implements the Decoder interface for the text protocol.
116  type textDecoder struct {
117  	r    io.Reader
118  	p    TextParser
119  	fams []*dto.MetricFamily
120  }
121  
122  // Decode implements the Decoder interface.
123  func (d *textDecoder) Decode(v *dto.MetricFamily) error {
124  	// TODO(fabxc): Wrap this as a line reader to make streaming safer.
125  	if len(d.fams) == 0 {
126  		// No cached metric families, read everything and parse metrics.
127  		fams, err := d.p.TextToMetricFamilies(d.r)
128  		if err != nil {
129  			return err
130  		}
131  		if len(fams) == 0 {
132  			return io.EOF
133  		}
134  		d.fams = make([]*dto.MetricFamily, 0, len(fams))
135  		for _, f := range fams {
136  			d.fams = append(d.fams, f)
137  		}
138  	}
139  
140  	*v = *d.fams[0]
141  	d.fams = d.fams[1:]
142  
143  	return nil
144  }
145  
146  // SampleDecoder wraps a Decoder to extract samples from the metric families
147  // decoded by the wrapped Decoder.
148  type SampleDecoder struct {
149  	Dec  Decoder
150  	Opts *DecodeOptions
151  
152  	f dto.MetricFamily
153  }
154  
155  // Decode calls the Decode method of the wrapped Decoder and then extracts the
156  // samples from the decoded MetricFamily into the provided model.Vector.
157  func (sd *SampleDecoder) Decode(s *model.Vector) error {
158  	err := sd.Dec.Decode(&sd.f)
159  	if err != nil {
160  		return err
161  	}
162  	*s, err = extractSamples(&sd.f, sd.Opts)
163  	return err
164  }
165  
166  // ExtractSamples builds a slice of samples from the provided metric
167  // families. If an error occurrs during sample extraction, it continues to
168  // extract from the remaining metric families. The returned error is the last
169  // error that has occurred.
170  func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) {
171  	var (
172  		all     model.Vector
173  		lastErr error
174  	)
175  	for _, f := range fams {
176  		some, err := extractSamples(f, o)
177  		if err != nil {
178  			lastErr = err
179  			continue
180  		}
181  		all = append(all, some...)
182  	}
183  	return all, lastErr
184  }
185  
186  func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) {
187  	switch f.GetType() {
188  	case dto.MetricType_COUNTER:
189  		return extractCounter(o, f), nil
190  	case dto.MetricType_GAUGE:
191  		return extractGauge(o, f), nil
192  	case dto.MetricType_SUMMARY:
193  		return extractSummary(o, f), nil
194  	case dto.MetricType_UNTYPED:
195  		return extractUntyped(o, f), nil
196  	case dto.MetricType_HISTOGRAM:
197  		return extractHistogram(o, f), nil
198  	}
199  	return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType())
200  }
201  
202  func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
203  	samples := make(model.Vector, 0, len(f.Metric))
204  
205  	for _, m := range f.Metric {
206  		if m.Counter == nil {
207  			continue
208  		}
209  
210  		lset := make(model.LabelSet, len(m.Label)+1)
211  		for _, p := range m.Label {
212  			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
213  		}
214  		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
215  
216  		smpl := &model.Sample{
217  			Metric: model.Metric(lset),
218  			Value:  model.SampleValue(m.Counter.GetValue()),
219  		}
220  
221  		if m.TimestampMs != nil {
222  			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
223  		} else {
224  			smpl.Timestamp = o.Timestamp
225  		}
226  
227  		samples = append(samples, smpl)
228  	}
229  
230  	return samples
231  }
232  
233  func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
234  	samples := make(model.Vector, 0, len(f.Metric))
235  
236  	for _, m := range f.Metric {
237  		if m.Gauge == nil {
238  			continue
239  		}
240  
241  		lset := make(model.LabelSet, len(m.Label)+1)
242  		for _, p := range m.Label {
243  			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
244  		}
245  		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
246  
247  		smpl := &model.Sample{
248  			Metric: model.Metric(lset),
249  			Value:  model.SampleValue(m.Gauge.GetValue()),
250  		}
251  
252  		if m.TimestampMs != nil {
253  			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
254  		} else {
255  			smpl.Timestamp = o.Timestamp
256  		}
257  
258  		samples = append(samples, smpl)
259  	}
260  
261  	return samples
262  }
263  
264  func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
265  	samples := make(model.Vector, 0, len(f.Metric))
266  
267  	for _, m := range f.Metric {
268  		if m.Untyped == nil {
269  			continue
270  		}
271  
272  		lset := make(model.LabelSet, len(m.Label)+1)
273  		for _, p := range m.Label {
274  			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
275  		}
276  		lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
277  
278  		smpl := &model.Sample{
279  			Metric: model.Metric(lset),
280  			Value:  model.SampleValue(m.Untyped.GetValue()),
281  		}
282  
283  		if m.TimestampMs != nil {
284  			smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
285  		} else {
286  			smpl.Timestamp = o.Timestamp
287  		}
288  
289  		samples = append(samples, smpl)
290  	}
291  
292  	return samples
293  }
294  
295  func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
296  	samples := make(model.Vector, 0, len(f.Metric))
297  
298  	for _, m := range f.Metric {
299  		if m.Summary == nil {
300  			continue
301  		}
302  
303  		timestamp := o.Timestamp
304  		if m.TimestampMs != nil {
305  			timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
306  		}
307  
308  		for _, q := range m.Summary.Quantile {
309  			lset := make(model.LabelSet, len(m.Label)+2)
310  			for _, p := range m.Label {
311  				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
312  			}
313  			// BUG(matt): Update other names to "quantile".
314  			lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
315  			lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
316  
317  			samples = append(samples, &model.Sample{
318  				Metric:    model.Metric(lset),
319  				Value:     model.SampleValue(q.GetValue()),
320  				Timestamp: timestamp,
321  			})
322  		}
323  
324  		lset := make(model.LabelSet, len(m.Label)+1)
325  		for _, p := range m.Label {
326  			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
327  		}
328  		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
329  
330  		samples = append(samples, &model.Sample{
331  			Metric:    model.Metric(lset),
332  			Value:     model.SampleValue(m.Summary.GetSampleSum()),
333  			Timestamp: timestamp,
334  		})
335  
336  		lset = make(model.LabelSet, len(m.Label)+1)
337  		for _, p := range m.Label {
338  			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
339  		}
340  		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
341  
342  		samples = append(samples, &model.Sample{
343  			Metric:    model.Metric(lset),
344  			Value:     model.SampleValue(m.Summary.GetSampleCount()),
345  			Timestamp: timestamp,
346  		})
347  	}
348  
349  	return samples
350  }
351  
352  func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
353  	samples := make(model.Vector, 0, len(f.Metric))
354  
355  	for _, m := range f.Metric {
356  		if m.Histogram == nil {
357  			continue
358  		}
359  
360  		timestamp := o.Timestamp
361  		if m.TimestampMs != nil {
362  			timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
363  		}
364  
365  		infSeen := false
366  
367  		for _, q := range m.Histogram.Bucket {
368  			lset := make(model.LabelSet, len(m.Label)+2)
369  			for _, p := range m.Label {
370  				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
371  			}
372  			lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
373  			lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
374  
375  			if math.IsInf(q.GetUpperBound(), +1) {
376  				infSeen = true
377  			}
378  
379  			samples = append(samples, &model.Sample{
380  				Metric:    model.Metric(lset),
381  				Value:     model.SampleValue(q.GetCumulativeCount()),
382  				Timestamp: timestamp,
383  			})
384  		}
385  
386  		lset := make(model.LabelSet, len(m.Label)+1)
387  		for _, p := range m.Label {
388  			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
389  		}
390  		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
391  
392  		samples = append(samples, &model.Sample{
393  			Metric:    model.Metric(lset),
394  			Value:     model.SampleValue(m.Histogram.GetSampleSum()),
395  			Timestamp: timestamp,
396  		})
397  
398  		lset = make(model.LabelSet, len(m.Label)+1)
399  		for _, p := range m.Label {
400  			lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
401  		}
402  		lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
403  
404  		count := &model.Sample{
405  			Metric:    model.Metric(lset),
406  			Value:     model.SampleValue(m.Histogram.GetSampleCount()),
407  			Timestamp: timestamp,
408  		}
409  		samples = append(samples, count)
410  
411  		if !infSeen {
412  			// Append an infinity bucket sample.
413  			lset := make(model.LabelSet, len(m.Label)+2)
414  			for _, p := range m.Label {
415  				lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
416  			}
417  			lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
418  			lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
419  
420  			samples = append(samples, &model.Sample{
421  				Metric:    model.Metric(lset),
422  				Value:     count.Value,
423  				Timestamp: timestamp,
424  			})
425  		}
426  	}
427  
428  	return samples
429  }