夜莺初探四·mtail插件采集日志指标前言
上一篇介绍了Categraf的配置,这篇我们尝试通过使用google开源的mtail工具来作为Categraf的插件,从应用日志中提取指标数据。
mtail项目介绍和配置文件说明
通过 mtail -h 可以很方便地看到各参数的详情,也推荐阅读乔克的《从日志中提取指标的瑞士军刀》或者Dream运维梦工厂的《categraf-mtail日志收集插件详解》来了解更多,我就不再班门弄斧了。
当然也可以通过官方文档来了解详情:新手村介绍和高手入门。
Categraf采集插件
categraf-mtail插件地址
https://github.com/flashcatcloud/categraf/tree/main/inputs/mtail
源码解读
package mtail

// ... (the import block is elided in this listing)

// Plugin identity constants.
const (
	inputName   = `mtail`
	description = ` extract internal monitoring data from application logs`
)

// MTail holds the configuration for the plugin.
type MTail struct {
	config.PluginConfig
	Instances []*Instance `toml:"instances"`
}

// Instance is the parameter structure for one entry of the `instances`
// array in the configuration file.
type Instance struct {
	// For reference, the article quotes the embedded type as:
	//
	//	type InternalConfig struct {
	//		// append labels
	//		Labels map[string]string `toml:"labels"`
	//		// metrics drop and pass filter
	//		MetricsDrop       []string `toml:"metrics_drop"`
	//		MetricsPass       []string `toml:"metrics_pass"`
	//		MetricsDropFilter filter.Filter
	//		MetricsPassFilter filter.Filter
	//		// metric name prefix
	//		MetricsNamePrefix string `toml:"metrics_name_prefix"`
	//		// mapping value
	//		ProcessorEnum []*ProcessorEnum `toml:"processor_enum"`
	//		// whether instance initial success
	//		inited bool `toml:"-"`
	//	}
	//
	//	type InstanceConfig struct {
	//		InternalConfig
	//		IntervalTimes int64 `toml:"interval_times"`
	//	}
	config.InstanceConfig

	NamePrefix           string   `toml:"name_prefix"`
	Progs                string   `toml:"progs"` // directory holding the rule files (xxx.mtail)
	Logs                 []string `toml:"logs"`  // log files to watch
	IgnoreFileRegPattern string   `toml:"ignore_filename_regex_pattern"`
	OverrideTimeZone     string   `toml:"override_timezone"` // time zone name handed to time.LoadLocation

	// The toggles below arrive as strings; Init turns each into the
	// lower-case bool next to it by comparing against "true".
	EmitProgLabel       string `toml:"emit_prog_label"` // whether to export the prog label
	emitProgLabel       bool   `toml:"-"`
	EmitMetricTimestamp string `toml:"emit_metric_timestamp"` // whether metrics carry a timestamp
	emitMetricTimestamp bool   `toml:"-"`

	// Zero values are replaced with defaults in Init.
	PollInterval       time.Duration `toml:"poll_interval"`
	PollLogInterval    time.Duration `toml:"poll_log_interval"`
	MetricPushInterval time.Duration `toml:"metric_push_interval"`
	MaxRegexpLen       int           `toml:"max_regexp_length"`
	MaxRecursionDepth  int           `toml:"max_recursion_depth"`

	SyslogUseCurrentYear string `toml:"syslog_use_current_year"` // true
	sysLogUseCurrentYear bool   `toml:"-"`
	LogRuntimeErrors     string `toml:"vm_logs_runtime_errors"` // true
	logRuntimeErrors     bool   `toml:"-"`

	// ctx/cancel bound the lifetime of everything Init starts;
	// Drop (or a repeated Init) cancels them.
	ctx    context.Context    `toml:"-"`
	cancel context.CancelFunc `toml:"-"`
	m      *mtail.Server
}

// Init validates the instance configuration, fills in defaults and
// creates the mtail server for this instance.
//
// It returns types.ErrInstancesEmpty when either progs or logs is
// unset, the time.LoadLocation error when override_timezone cannot be
// parsed, or the error from mtail.New.
func (ins *Instance) Init() error {
	// Both a rule directory and at least one log pattern are required.
	if len(ins.Progs) == 0 || len(ins.Logs) == 0 {
		return types.ErrInstancesEmpty
	}

	// Decode the string-typed switches.
	ins.sysLogUseCurrentYear = ins.SyslogUseCurrentYear == "true"
	ins.logRuntimeErrors = ins.LogRuntimeErrors == "true"
	ins.emitProgLabel = ins.EmitProgLabel == "true"
	ins.emitMetricTimestamp = ins.EmitMetricTimestamp == "true"

	// Set default values.
	if ins.PollLogInterval == 0 {
		ins.PollLogInterval = 250 * time.Millisecond
	}
	if ins.PollInterval == 0 {
		ins.PollInterval = 250 * time.Millisecond
	}
	if ins.MetricPushInterval == 0 {
		ins.MetricPushInterval = 1 * time.Minute
	}
	if ins.MaxRegexpLen == 0 {
		ins.MaxRegexpLen = 1024
	}
	if ins.MaxRecursionDepth == 0 {
		ins.MaxRecursionDepth = 100
	}

	buildInfo := mtail.BuildInfo{
		Version: config.Version,
	}

	// Time zone: time.LoadLocation maps an empty name to UTC.
	loc, err := time.LoadLocation(ins.OverrideTimeZone)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Couldn't parse timezone %q: %s", ins.OverrideTimeZone, err)
		return err
	}

	// Base mtail options.
	// NOTE(review): mtail.LogRuntimeErrors is passed unconditionally and
	// ins.logRuntimeErrors is never read, so vm_logs_runtime_errors has no
	// effect here. Left as-is to avoid changing the default for existing
	// configurations; confirm the intended behaviour.
	opts := []mtail.Option{
		mtail.ProgramPath(ins.Progs),
		mtail.LogPathPatterns(ins.Logs...),
		mtail.IgnoreRegexPattern(ins.IgnoreFileRegPattern),
		mtail.SetBuildInfo(buildInfo),
		mtail.OverrideLocation(loc),
		mtail.MetricPushInterval(ins.MetricPushInterval), // keep it here ?
		mtail.MaxRegexpLength(ins.MaxRegexpLen),
		mtail.MaxRecursionDepth(ins.MaxRecursionDepth),
		mtail.LogRuntimeErrors,
	}

	// Stop whatever a previous Init started, then always create a fresh
	// context. The earlier code kept using the context it had just
	// cancelled, so a re-initialised instance started on a dead context.
	if ins.cancel != nil {
		ins.cancel()
	}
	ins.ctx, ins.cancel = context.WithCancel(context.Background())

	// Hourly waker handed to mtail for garbage-collecting stale logs.
	staleLogGcWaker := waker.NewTimed(ins.ctx, time.Hour)
	opts = append(opts, mtail.StaleLogGcWaker(staleLogGcWaker))

	if ins.PollInterval > 0 {
		logStreamPollWaker := waker.NewTimed(ins.ctx, ins.PollInterval)
		logPatternPollWaker := waker.NewTimed(ins.ctx, ins.PollLogInterval)
		opts = append(opts,
			mtail.LogPatternPollWaker(logPatternPollWaker),
			mtail.LogstreamPollWaker(logStreamPollWaker),
		)
	}
	if ins.sysLogUseCurrentYear {
		opts = append(opts, mtail.SyslogUseCurrentYear)
	}
	if !ins.emitProgLabel {
		opts = append(opts, mtail.OmitProgLabel)
	}
	if ins.emitMetricTimestamp {
		opts = append(opts, mtail.EmitMetricTimestamp)
	}

	// Store that holds the extracted metrics; its GC loop is started
	// with a one-hour interval.
	store := metrics.NewStore()
	store.StartGcLoop(ins.ctx, time.Hour)

	m, err := mtail.New(ins.ctx, store, opts...)
	if err != nil {
		log.Println(err)
		ins.cancel()
		return err
	}
	ins.m = m
	return nil
}

// Drop cancels the instance context, stopping everything started by
// Init. It is a no-op when Init never got far enough to create one.
func (ins *Instance) Drop() {
	if ins.cancel != nil {
		ins.cancel()
	}
}

// init registers the plugin factory under inputName.
func init() {
	inputs.Add(inputName, func() inputs.Input {
		return &MTail{}
	})
}

// Clone returns a new, empty MTail.
func (s *MTail) Clone() inputs.Input {
	return &MTail{}
}

// Name returns the plugin name.
func (s *MTail) Name() string {
	return inputName
}

// GetInstances returns all configured instances as inputs.Instance values.
func (s *MTail) GetInstances() []inputs.Instance {
	ret := make([]inputs.Instance, len(s.Instances))
	for i, instance := range s.Instances {
		ret[i] = instance
	}
	return ret
}

// Description returns a one-sentence description on the input.
func (s *MTail) Description() string {
	return description
}

// Gather reads every metric family from the mtail server's registry and
// appends the resulting samples to slist. A gather error is logged and
// nothing is appended.
func (ins *Instance) Gather(slist *types.SampleList) {
	reg := ins.m.GetRegistry()
	mfs, done, err := prometheus.ToTransactionalGatherer(reg).Gather()
	// The TransactionalGatherer contract asks for done to be called even
	// when Gather reports an error, so defer it before the error check.
	defer done()
	if err != nil {
		log.Println(err)
		return
	}

	for _, mf := range mfs {
		metricName := mf.GetName()
		for _, m := range mf.Metric {
			// Merge the metric's own labels with the configured ones.
			tags := util.MakeLabels(m, ins.GetLabels())

			// Dispatch on the metric type.
			switch mf.GetType() {
			case dto.MetricType_SUMMARY:
				util.HandleSummary(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
			case dto.MetricType_HISTOGRAM:
				util.HandleHistogram(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
			default:
				util.HandleGaugeCounter(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
			}
		}
	}
}

// GetLogMetricTime converts ts, a Unix timestamp in milliseconds, to a
// time.Time. It returns the zero time when ts is not positive or when
// emit_metric_timestamp is not enabled.
func (ins *Instance) GetLogMetricTime(ts int64) time.Time {
	var tm time.Time
	if ts <= 0 || !ins.emitMetricTimestamp {
		return tm
	}
	sec := ts / 1000
	nsec := ts % 1000 * 1e6 // sub-second remainder, milliseconds -> nanoseconds
	return time.Unix(sec, nsec)
}
整体理解下来,Categraf有效地通过统一的配置文件完成了多个目录、多个规则的日志采集,简化了许多操作。
最后感谢看完。由于作者水平有限,对很多工具的使用并不熟悉,如有错误和遗漏欢迎指出,感谢谅解。
以上内容来源于官方推出的夜莺黄埔营的免费培训活动,加入 QQ 群查看直播视频,还可以在官方答疑站点获得更多支持 https://answer.flashcat.cloud/