02. Learning InfluxDB Systematically: InfluxDB Initialization
Published: 2019-05-25


Notes

Version information

InfluxDB v1.7.2 (git: 1.7 76f907b0fada2f16931e37471da695349fcdf8c6)

Git repository

Official documentation

Installation

Installed with Docker for local use: docker pull influxdb

Start command

InfluxDB is started with influxd. The start command is:

influxd run -pidfile /var/log/influxdb/pidfile.log -cpuprofile /var/log/influxdb/cpuprofile.log -memprofile /var/log/influxdb/memprofile.log

influxd command reference

    Usage: influxd [[command] [arguments]]

    The commands are:

        backup               downloads a snapshot of a data node and saves it to disk
        config               display the default configuration
        help                 display this help message
        restore              uses a snapshot of a data node to rebuild a cluster
        run                  run node with existing configuration
        version              displays the InfluxDB version

    "run" is the default command.

    Use "influxd [command] -help" for more information about a command.

    root@c6239814501a:/var/log/influxdb# influxd run -help
    Runs the InfluxDB server.

    Usage: influxd run [flags]

        -config
                Set the path to the configuration file.
                This defaults to the environment variable INFLUXDB_CONFIG_PATH,
                ~/.influxdb/influxdb.conf, or /etc/influxdb/influxdb.conf if a file
                is present at any of these locations.
                Disable the automatic loading of a configuration file using
                the null device (such as /dev/null).
        -pidfile
                Write process ID to a file.
        -cpuprofile
                Write CPU profiling information to a file.
        -memprofile
                Write memory usage information to a file.

Code analysis

Entry point

The InfluxDB startup code lives in cmd/influxd/main.go

    // Entry point for starting the InfluxDB database service.
    func main() {
        rand.Seed(time.Now().UnixNano())

        m := NewMain() // set up the I/O streams (output goes to the standard streams)
        if err := m.Run(os.Args[1:]...); err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }
    }

    // Main represents the program execution.
    type Main struct {
        Stdin  io.Reader
        Stdout io.Writer
        Stderr io.Writer
    }

    // NewMain return a new instance of Main.
    func NewMain() *Main {
        return &Main{
            Stdin:  os.Stdin,
            Stdout: os.Stdout,
            Stderr: os.Stderr,
        }
    }

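The Run method in main

Run first creates the command with cmd := run.NewCommand(), then calls its Run method to start and initialize the service. Once startup succeeds, it blocks until a shutdown signal arrives. The code is as follows: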

    // Run determines and runs the command specified by the CLI args.
    func (m *Main) Run(args ...string) error {
        name, args := cmd.ParseCommandName(args)

        // Extract name from args.
        switch name {
        case "", "run": // with no command, or with "run", start the server via the run command
            cmd := run.NewCommand()

            // Tell the server the build details.
            cmd.Version = version
            cmd.Commit = commit
            cmd.Branch = branch

            if err := cmd.Run(args...); err != nil {
                return fmt.Errorf("run: %s", err)
            }

            signalCh := make(chan os.Signal, 1)
            signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
            cmd.Logger.Info("Listening for signals")

            // Block until one of the signals above is received
            <-signalCh
            cmd.Logger.Info("Signal received, initializing clean shutdown...")
            go cmd.Close()

            // Block again until another signal is received, a shutdown timeout elapses,
            // or the Command is gracefully closed
            cmd.Logger.Info("Waiting for clean shutdown...")
            select {
            case <-signalCh:
                cmd.Logger.Info("Second signal received, initializing hard shutdown")
            case <-time.After(time.Second * 30):
                cmd.Logger.Info("Time limit reached, initializing hard shutdown")
            case <-cmd.Closed:
                cmd.Logger.Info("Server shutdown completed")
            }

            // goodbye.

        case "backup":
            name := backup.NewCommand()
            if err := name.Run(args...); err != nil {
                return fmt.Errorf("backup: %s", err)
            }
        case "restore":
            name := restore.NewCommand()
            if err := name.Run(args...); err != nil {
                return fmt.Errorf("restore: %s", err)
            }
        case "config":
            if err := run.NewPrintConfigCommand().Run(args...); err != nil {
                return fmt.Errorf("config: %s", err)
            }
        case "version":
            if err := NewVersionCommand().Run(args...); err != nil {
                return fmt.Errorf("version: %s", err)
            }
        case "help":
            if err := help.NewCommand().Run(args...); err != nil {
                return fmt.Errorf("help: %s", err)
            }
        default:
            return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)
        }

        return nil
    }

cmd/influxd/run/command

Creating the cmd object

    // NewCommand return a new instance of Command.
    func NewCommand() *Command {
        return &Command{
            closing: make(chan struct{}),
            Closed:  make(chan struct{}),
            Stdin:   os.Stdin,
            Stdout:  os.Stdout,
            Stderr:  os.Stderr,
            Logger:  zap.NewNop(),
        }
    }

The Run method parses the flags and configuration, sets up logging, and then creates and opens the Server

    // Run parses the config from args and runs the server.
    func (cmd *Command) Run(args ...string) error {
        // Parse the command line flags and store them in options.
        options, err := cmd.ParseFlags(args...)
        if err != nil {
            return err
        }

        // Parse the config file and initialize each component's configuration.
        config, err := cmd.ParseConfig(options.GetConfigPath())
        if err != nil {
            return fmt.Errorf("parse config: %s", err)
        }

        // Apply any environment variables on top of the parsed config
        if err := config.ApplyEnvOverrides(cmd.Getenv); err != nil {
            return fmt.Errorf("apply env config: %v", err)
        }

        // Validate the configuration.
        if err := config.Validate(); err != nil {
            return fmt.Errorf("%s. To generate a valid configuration file run `influxd config > influxdb.generated.conf`", err)
        }

        // Initialize the logger; it writes to the command's standard error stream.
        var logErr error
        if cmd.Logger, logErr = config.Logging.New(cmd.Stderr); logErr != nil {
            // assign the default logger
            cmd.Logger = logger.New(cmd.Stderr)
        }

        // Attempt to run pprof on :6060 before startup if debug pprof enabled.
        if config.HTTPD.DebugPprofEnabled {
            runtime.SetBlockProfileRate(int(1 * time.Second))
            runtime.SetMutexProfileFraction(1)
            go func() { http.ListenAndServe("localhost:6060", nil) }()
        }

        // Print sweet InfluxDB logo.
        if !config.Logging.SuppressLogo && logger.IsTerminal(cmd.Stdout) {
            fmt.Fprint(cmd.Stdout, logo)
        }

        // Mark start-up in log.
        cmd.Logger.Info("InfluxDB starting",
            zap.String("version", cmd.Version),
            zap.String("branch", cmd.Branch),
            zap.String("commit", cmd.Commit))
        cmd.Logger.Info("Go runtime",
            zap.String("version", runtime.Version()),
            zap.Int("maxprocs", runtime.GOMAXPROCS(0)))

        // If there was an error on startup when creating the logger, output it now.
        if logErr != nil {
            cmd.Logger.Error("Unable to configure logger", zap.Error(logErr))
        }

        // Write the PID file (only if a PID file path was configured).
        if err := cmd.writePIDFile(options.PIDFile); err != nil {
            return fmt.Errorf("write pid file: %s", err)
        }
        cmd.pidfile = options.PIDFile

        if config.HTTPD.PprofEnabled {
            // Turn on block and mutex profiling.
            runtime.SetBlockProfileRate(int(1 * time.Second))
            runtime.SetMutexProfileFraction(1) // Collect every sample
        }

        // Create server from config and start it.
        buildInfo := &BuildInfo{
            Version: cmd.Version,
            Commit:  cmd.Commit,
            Branch:  cmd.Branch,
            Time:    cmd.BuildTime,
        }

        // NewServer returns a new instance of Server built from a config.
        // Create the Server and call Open to start it; this is the core entry point.
        s, err := NewServer(config, buildInfo)
        if err != nil {
            return fmt.Errorf("create server: %s", err)
        }
        s.Logger = cmd.Logger
        s.CPUProfile = options.CPUProfile
        s.MemProfile = options.MemProfile
        if err := s.Open(); err != nil { // Open opens the meta and data store and all services.
            return fmt.Errorf("open server: %s", err)
        }
        cmd.Server = s

        // Begin monitoring the server's error channel.
        go cmd.monitorServerErrors()

        return nil
    }
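
The configuration steps at the top of Command.Run form a layered pipeline: parsed file values, then environment overrides, then validation. The sketch below shows that layering in miniature. The two-field Config type, the hand-written variable mapping, and the validation message are assumptions for illustration; the real run.Config covers many more fields and is not reproduced here. As in the original, getenv is injected rather than read directly from the process environment.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// Config is a hypothetical two-field stand-in for the real configuration.
type Config struct {
	BindAddress string
	DataDir     string
}

// defaultConfig plays the role of the values parsed from the config file.
func defaultConfig() Config {
	return Config{BindAddress: "127.0.0.1:8088"}
}

// applyEnvOverrides lays environment values on top of the parsed config.
// The variable names are illustrative and mapped by hand.
func applyEnvOverrides(c *Config, getenv func(string) string) {
	if v := getenv("INFLUXDB_BIND_ADDRESS"); v != "" {
		c.BindAddress = v
	}
	if v := getenv("INFLUXDB_DATA_DIR"); v != "" {
		c.DataDir = v
	}
}

// Validate rejects a config that cannot be used to start a server.
func (c Config) Validate() error {
	if c.DataDir == "" {
		return errors.New("data dir must be specified")
	}
	return nil
}

// loadConfig runs the three steps in order: file values, env overrides, validation.
func loadConfig(fromFile Config, getenv func(string) string) (Config, error) {
	c := fromFile
	applyEnvOverrides(&c, getenv)
	if err := c.Validate(); err != nil {
		return Config{}, fmt.Errorf("validate config: %w", err)
	}
	return c, nil
}

func main() {
	c, err := loadConfig(defaultConfig(), os.Getenv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("bind=%s data=%s\n", c.BindAddress, c.DataDir)
}
```

Validating only after the overrides are applied means a value missing from the file can still be supplied through the environment.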

cmd/influxd/run/Server

Creating the server object

    // NewServer returns a new instance of Server built from a config.
    // It builds the Server and the components it manages from the config.
    func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
        // First grab the base tls config we will use for all clients and servers
        tlsConfig, err := c.TLS.Parse()
        if err != nil {
            return nil, fmt.Errorf("tls configuration: %v", err)
        }

        // Update the TLS values on each of the configs to be the parsed one if
        // not already specified (set the default).
        updateTLSConfig(&c.HTTPD.TLS, tlsConfig)
        updateTLSConfig(&c.Subscriber.TLS, tlsConfig)
        for i := range c.OpenTSDBInputs {
            updateTLSConfig(&c.OpenTSDBInputs[i].TLS, tlsConfig)
        }

        // We need to ensure that a meta directory always exists even if
        // we don't start the meta store. node.json is always stored under
        // the meta directory.
        if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
            return nil, fmt.Errorf("mkdir all: %s", err)
        }

        // 0.10-rc1 and prior would sometimes put the node.json at the root
        // dir which breaks backup/restore and restarting nodes. This moves
        // the file from the root so it's always under the meta dir.
        oldPath := filepath.Join(filepath.Dir(c.Meta.Dir), "node.json")
        newPath := filepath.Join(c.Meta.Dir, "node.json")

        if _, err := os.Stat(oldPath); err == nil {
            if err := os.Rename(oldPath, newPath); err != nil {
                return nil, err
            }
        }

        _, err = influxdb.LoadNode(c.Meta.Dir)
        if err != nil {
            if !os.IsNotExist(err) {
                return nil, err
            }
        }

        if err := raftDBExists(c.Meta.Dir); err != nil {
            return nil, err
        }

        // In 0.10.0 bind-address got moved to the top level. Check
        // The old location to keep things backwards compatible
        bind := c.BindAddress

        s := &Server{
            buildInfo: *buildInfo,
            err:       make(chan error),
            closing:   make(chan struct{}),

            BindAddress: bind,

            Logger: logger.New(os.Stderr),

            MetaClient: meta.NewClient(c.Meta),

            reportingDisabled: c.ReportingDisabled,

            httpAPIAddr: c.HTTPD.BindAddress,
            httpUseTLS:  c.HTTPD.HTTPSEnabled,
            tcpAddr:     bind,

            config: c,
        }
        s.Monitor = monitor.New(s, c.Monitor)
        s.config.registerDiagnostics(s.Monitor)

        if err := s.MetaClient.Open(); err != nil {
            return nil, err
        }

        // Initialize the storage layer.
        s.TSDBStore = tsdb.NewStore(c.Data.Dir)
        s.TSDBStore.EngineOptions.Config = c.Data

        // Copy TSDB configuration.
        s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
        s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index

        // Create the Subscriber service
        s.Subscriber = subscriber.NewService(c.Subscriber)

        // Initialize points writer.
        s.PointsWriter = coordinator.NewPointsWriter()
        s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
        s.PointsWriter.TSDBStore = s.TSDBStore

        // Initialize query executor.
        s.QueryExecutor = query.NewExecutor()
        s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
            MetaClient:  s.MetaClient,
            TaskManager: s.QueryExecutor.TaskManager,
            TSDBStore:   s.TSDBStore,
            ShardMapper: &coordinator.LocalShardMapper{
                MetaClient: s.MetaClient,
                TSDBStore:  coordinator.LocalTSDBStore{Store: s.TSDBStore},
            },
            Monitor:           s.Monitor,
            PointsWriter:      s.PointsWriter,
            MaxSelectPointN:   c.Coordinator.MaxSelectPointN,
            MaxSelectSeriesN:  c.Coordinator.MaxSelectSeriesN,
            MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
        }
        s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
        s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
        s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

        // Initialize the monitor
        s.Monitor.Version = s.buildInfo.Version
        s.Monitor.Commit = s.buildInfo.Commit
        s.Monitor.Branch = s.buildInfo.Branch
        s.Monitor.BuildTime = s.buildInfo.Time
        s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
        return s, nil
    }

Starting the Server

Server.Open registers the various services and then starts each component. The code is as follows:

    // Open opens the meta and data store and all services.
    func (s *Server) Open() error {
        // Start profiling, if set (controlled by the startup flags).
        startProfile(s.CPUProfile, s.MemProfile)

        // Open shared TCP connection.
        // Create and run a TCP connection multiplexer.
        ln, err := net.Listen("tcp", s.BindAddress)
        if err != nil {
            return fmt.Errorf("listen: %s", err)
        }
        s.Listener = ln

        // Multiplex listener.
        mux := tcp.NewMux()
        go mux.Serve(ln)

        // Append services.
        s.appendMonitorService()
        s.appendPrecreatorService(s.config.Precreator)           // pre-creates shard groups
        s.appendSnapshotterService()                             // handles snapshot requests through the TCP mux above
        s.appendContinuousQueryService(s.config.ContinuousQuery) // continuous query service
        s.appendHTTPDService(s.config.HTTPD)                     // HTTP service; receives and handles all client requests
        s.appendRetentionPolicyService(s.config.Retention)       // periodically cleans up according to retention policies

        // Graphite, collectd, and OpenTSDB inputs add support for other TSDB data formats.
        for _, i := range s.config.GraphiteInputs {
            if err := s.appendGraphiteService(i); err != nil {
                return err
            }
        }
        for _, i := range s.config.CollectdInputs {
            s.appendCollectdService(i)
        }
        for _, i := range s.config.OpenTSDBInputs {
            if err := s.appendOpenTSDBService(i); err != nil {
                return err
            }
        }
        for _, i := range s.config.UDPInputs {
            s.appendUDPService(i)
        }

        s.Subscriber.MetaClient = s.MetaClient
        s.PointsWriter.MetaClient = s.MetaClient
        s.Monitor.MetaClient = s.MetaClient

        s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)

        // Configure logging for all services and clients.
        if s.config.Meta.LoggingEnabled {
            s.MetaClient.WithLogger(s.Logger)
        }
        s.TSDBStore.WithLogger(s.Logger)
        if s.config.Data.QueryLogEnabled {
            s.QueryExecutor.WithLogger(s.Logger)
        }
        s.PointsWriter.WithLogger(s.Logger)
        s.Subscriber.WithLogger(s.Logger)
        for _, svc := range s.Services {
            svc.WithLogger(s.Logger)
        }
        s.SnapshotterService.WithLogger(s.Logger)
        s.Monitor.WithLogger(s.Logger)

        // Open TSDB store; this initializes the TSDB data files.
        if err := s.TSDBStore.Open(); err != nil {
            return fmt.Errorf("open tsdb store: %s", err)
        }

        // Open the subscriber service
        if err := s.Subscriber.Open(); err != nil {
            return fmt.Errorf("open subscriber: %s", err)
        }

        // Open the points writer service
        if err := s.PointsWriter.Open(); err != nil {
            return fmt.Errorf("open points writer: %s", err)
        }

        s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())

        for _, service := range s.Services {
            if err := service.Open(); err != nil {
                return fmt.Errorf("open service: %s", err)
            }
        }

        // Start the reporting service, if not disabled.
        if !s.reportingDisabled {
            go s.startServerReporting()
        }

        return nil
    }
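
Server.Open follows a register-then-open pattern: each append call adds a service to s.Services, and a single loop later opens them in registration order, stopping at the first failure. The sketch below shows that pattern on its own. The trimmed Service interface, the namedService type, and the appendService helper are assumptions for illustration and are not the InfluxDB types.

```go
package main

import (
	"errors"
	"fmt"
)

// Service is a trimmed-down stand-in for what the server loops over.
type Service interface {
	Open() error
	Close() error
}

// namedService records its name in a shared log when opened, or fails on demand.
type namedService struct {
	name   string
	fail   bool
	opened *[]string
}

func (s namedService) Open() error {
	if s.fail {
		return errors.New(s.name + " failed to open")
	}
	*s.opened = append(*s.opened, s.name)
	return nil
}

func (s namedService) Close() error { return nil }

// Server holds the registered services in registration order.
type Server struct {
	Services []Service
}

func (s *Server) appendService(svc Service) {
	s.Services = append(s.Services, svc)
}

// Open starts every registered service and stops at the first error.
func (s *Server) Open() error {
	for _, svc := range s.Services {
		if err := svc.Open(); err != nil {
			return fmt.Errorf("open service: %w", err)
		}
	}
	return nil
}

func main() {
	var opened []string
	srv := &Server{}
	for _, name := range []string{"monitor", "precreator", "httpd"} {
		srv.appendService(namedService{name: name, opened: &opened})
	}
	if err := srv.Open(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("opened in order:", opened)
}
```

Separating registration from opening lets the server finish wiring shared dependencies, such as loggers and the meta client, before any service starts doing work.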

Overall initialization flow


Reposted from: http://qfasi.baihongyu.com/
