本文共 14554 字,大约阅读时间需要 48 分钟。
版本信息 | InfluxDB v1.7.2 (git: 1.7 76f907b0fada2f16931e37471da695349fcdf8c6) |
Git地址 | |
官方文档 | |
安装方式 | 使用docker安装本地使用:docker pull influxdb |
使用influxd来完成InfluxDB数据库的启动,启动命令如下:
influxd run -pidfile /var/log/influxdb/pidfile.log -cpuprofile /var/log/influxdb/cpuprofile.log -memprofile /var/log/influxdb/memprofile.log
Usage: influxd [[command] [arguments]]The commands are: backup downloads a snapshot of a data node and saves it to disk config display the default configuration help display this help message restore uses a snapshot of a data node to rebuild a cluster run run node with existing configuration version displays the InfluxDB version"run" is the default command.Use "influxd [command] -help" for more information about a command.root@c6239814501a:/var/log/influxdb# influxd run -helpRuns the InfluxDB server.Usage: influxd run [flags] -configSet the path to the configuration file. This defaults to the environment variable INFLUXDB_CONFIG_PATH, ~/.influxdb/influxdb.conf, or /etc/influxdb/influxdb.conf if a file is present at any of these locations. Disable the automatic loading of a configuration file using the null device (such as /dev/null). -pidfile Write process ID to a file. -cpuprofile Write CPU profiling information to a file. -memprofile Write memory usage information to a file.
Influxdb的启动代码实现在 cmd/influxd/main.go中
// influxdb数据库服务启动入口func main() { rand.Seed(time.Now().UnixNano()) m := NewMain() //定义输出方式(日志使用标准输出完成) if err := m.Run(os.Args[1:]...); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) }}// Main represents the program execution.type Main struct { Stdin io.Reader Stdout io.Writer Stderr io.Writer}// NewMain return a new instance of Main.func NewMain() *Main { return &Main{ Stdin: os.Stdin, Stdout: os.Stdout, Stderr: os.Stderr, }}
首先使用cmd := run.NewCommand()创建cmd,然后调用run方法开始服务的启动与初始化,具体代码如下:
// Run determines and runs the command specified by the CLI args.func (m *Main) Run(args ...string) error { name, args := cmd.ParseCommandName(args) // Extract name from args. switch name { case "", "run": //如果参数为空或者为run则调用run中的command完成启动 cmd := run.NewCommand() // Tell the server the build details. cmd.Version = version cmd.Commit = commit cmd.Branch = branch if err := cmd.Run(args...); err != nil { return fmt.Errorf("run: %s", err) } signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) cmd.Logger.Info("Listening for signals") // Block until one of the signals above is received <-signalCh cmd.Logger.Info("Signal received, initializing clean shutdown...") go cmd.Close() // Block again until another signal is received, a shutdown timeout elapses, // or the Command is gracefully closed cmd.Logger.Info("Waiting for clean shutdown...") select { case <-signalCh: cmd.Logger.Info("Second signal received, initializing hard shutdown") case <-time.After(time.Second * 30): cmd.Logger.Info("Time limit reached, initializing hard shutdown") case <-cmd.Closed: cmd.Logger.Info("Server shutdown completed") } // goodbye. case "backup": name := backup.NewCommand() if err := name.Run(args...); err != nil { return fmt.Errorf("backup: %s", err) } case "restore": name := restore.NewCommand() if err := name.Run(args...); err != nil { return fmt.Errorf("restore: %s", err) } case "config": if err := run.NewPrintConfigCommand().Run(args...); err != nil { return fmt.Errorf("config: %s", err) } case "version": if err := NewVersionCommand().Run(args...); err != nil { return fmt.Errorf("version: %s", err) } case "help": if err := help.NewCommand().Run(args...); err != nil { return fmt.Errorf("help: %s", err) } default: return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name) } return nil}
// NewCommand return a new instance of Command.func NewCommand() *Command { return &Command{ closing: make(chan struct{}), Closed: make(chan struct{}), Stdin: os.Stdin, Stdout: os.Stdout, Stderr: os.Stderr, Logger: zap.NewNop(), }}
// Run parses the config from args and runs the server.func (cmd *Command) Run(args ...string) error { // Parse the command line flags.--解析命令行参数并放入options变量中 options, err := cmd.ParseFlags(args...) if err != nil { return err } // 解析配置文件,初始化各组件的配置信息 config, err := cmd.ParseConfig(options.GetConfigPath()) if err != nil { return fmt.Errorf("parse config: %s", err) } // Apply any environment variables on top of the parsed config if err := config.ApplyEnvOverrides(cmd.Getenv); err != nil { return fmt.Errorf("apply env config: %v", err) } // Validate the configuration. if err := config.Validate(); err != nil { return fmt.Errorf("%s. To generate a valid configuration file run `influxd config > influxdb.generated.conf`", err) } // 初始化logger--使用操作系统标准输出 var logErr error if cmd.Logger, logErr = config.Logging.New(cmd.Stderr); logErr != nil { // assign the default logger cmd.Logger = logger.New(cmd.Stderr) } // Attempt to run pprof on :6060 before startup if debug pprof enabled. if config.HTTPD.DebugPprofEnabled { runtime.SetBlockProfileRate(int(1 * time.Second)) runtime.SetMutexProfileFraction(1) go func() { http.ListenAndServe("localhost:6060", nil) }() } // Print sweet InfluxDB logo. if !config.Logging.SuppressLogo && logger.IsTerminal(cmd.Stdout) { fmt.Fprint(cmd.Stdout, logo) } // Mark start-up in log. cmd.Logger.Info("InfluxDB starting", zap.String("version", cmd.Version), zap.String("branch", cmd.Branch), zap.String("commit", cmd.Commit)) cmd.Logger.Info("Go runtime", zap.String("version", runtime.Version()), zap.Int("maxprocs", runtime.GOMAXPROCS(0))) // If there was an error on startup when creating the logger, output it now. if logErr != nil { cmd.Logger.Error("Unable to configure logger", zap.Error(logErr)) } // 如果配置了pid file path, 就写pud // Write the PID file. if err := cmd.writePIDFile(options.PIDFile); err != nil { return fmt.Errorf("write pid file: %s", err) } cmd.pidfile = options.PIDFile if config.HTTPD.PprofEnabled { // Turn on block and mutex profiling. runtime.SetBlockProfileRate(int(1 * time.Second)) runtime.SetMutexProfileFraction(1) // Collect every sample } // Create server from config and start it. buildInfo := &BuildInfo{ Version: cmd.Version, Commit: cmd.Commit, Branch: cmd.Branch, Time: cmd.BuildTime, } // NewServer returns a new instance of Server built from a config. // 创建Server对象,并调用Open方法将 Server运行起来---核心入口 s, err := NewServer(config, buildInfo) if err != nil { return fmt.Errorf("create server: %s", err) } s.Logger = cmd.Logger s.CPUProfile = options.CPUProfile s.MemProfile = options.MemProfile if err := s.Open(); err != nil { //Open opens the meta and data store and all services. return fmt.Errorf("open server: %s", err) } cmd.Server = s // Begin monitoring the server's error channel. // 开如monitor server error信息 go cmd.monitorServerErrors() return nil}
// NewServer returns a new instance of Server built from a config.// 依据配置Server对象和它管理的各个组件func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { // First grab the base tls config we will use for all clients and servers tlsConfig, err := c.TLS.Parse() if err != nil { return nil, fmt.Errorf("tls configuration: %v", err) } // Update the TLS values on each of the configs to be the parsed one if // not already specified (set the default). updateTLSConfig(&c.HTTPD.TLS, tlsConfig) updateTLSConfig(&c.Subscriber.TLS, tlsConfig) for i := range c.OpenTSDBInputs { updateTLSConfig(&c.OpenTSDBInputs[i].TLS, tlsConfig) } // We need to ensure that a meta directory always exists even if // we don't start the meta store. node.json is always stored under // the meta directory. if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil { return nil, fmt.Errorf("mkdir all: %s", err) } // 0.10-rc1 and prior would sometimes put the node.json at the root // dir which breaks backup/restore and restarting nodes. This moves // the file from the root so it's always under the meta dir. oldPath := filepath.Join(filepath.Dir(c.Meta.Dir), "node.json") newPath := filepath.Join(c.Meta.Dir, "node.json") if _, err := os.Stat(oldPath); err == nil { if err := os.Rename(oldPath, newPath); err != nil { return nil, err } } _, err = influxdb.LoadNode(c.Meta.Dir) if err != nil { if !os.IsNotExist(err) { return nil, err } } if err := raftDBExists(c.Meta.Dir); err != nil { return nil, err } // In 0.10.0 bind-address got moved to the top level. Check // The old location to keep things backwards compatible bind := c.BindAddress s := &Server{ buildInfo: *buildInfo, err: make(chan error), closing: make(chan struct{}), BindAddress: bind, Logger: logger.New(os.Stderr), MetaClient: meta.NewClient(c.Meta), reportingDisabled: c.ReportingDisabled, httpAPIAddr: c.HTTPD.BindAddress, httpUseTLS: c.HTTPD.HTTPSEnabled, tcpAddr: bind, config: c, } s.Monitor = monitor.New(s, c.Monitor) s.config.registerDiagnostics(s.Monitor) if err := s.MetaClient.Open(); err != nil { return nil, err } // 初始化存储结构 s.TSDBStore = tsdb.NewStore(c.Data.Dir) s.TSDBStore.EngineOptions.Config = c.Data // Copy TSDB configuration. s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index // Create the Subscriber service s.Subscriber = subscriber.NewService(c.Subscriber) // Initialize points writer. s.PointsWriter = coordinator.NewPointsWriter() s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout) s.PointsWriter.TSDBStore = s.TSDBStore // Initialize query executor. s.QueryExecutor = query.NewExecutor() s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{ MetaClient: s.MetaClient, TaskManager: s.QueryExecutor.TaskManager, TSDBStore: s.TSDBStore, ShardMapper: &coordinator.LocalShardMapper{ MetaClient: s.MetaClient, TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore}, }, Monitor: s.Monitor, PointsWriter: s.PointsWriter, MaxSelectPointN: c.Coordinator.MaxSelectPointN, MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN, MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN, } s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout) s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter) s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries // Initialize the monitor s.Monitor.Version = s.buildInfo.Version s.Monitor.Commit = s.buildInfo.Commit s.Monitor.Branch = s.buildInfo.Branch s.Monitor.BuildTime = s.buildInfo.Time s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter) return s, nil}
调用Server.Open添加各种service,让各个组件运行起来,代码如下:
// Open opens the meta and data store and all services.func (s *Server) Open() error { // Start profiling, if set. // 开启性能分析,根据启动时参数 startProfile(s.CPUProfile, s.MemProfile) // Open shared TCP connection. // 创建并运行一个tcp的连接复用器 ln, err := net.Listen("tcp", s.BindAddress) if err != nil { return fmt.Errorf("listen: %s", err) } s.Listener = ln // Multiplex listener. mux := tcp.NewMux() go mux.Serve(ln) // Append services.--添加各种服务 s.appendMonitorService() s.appendPrecreatorService(s.config.Precreator) //预创建ShardGroup s.appendSnapshotterService() //使用上面的tcp连接复用器,处理snapshot相关的请求 s.appendContinuousQueryService(s.config.ContinuousQuery) // 连续query服务 s.appendHTTPDService(s.config.HTTPD) //http服务,接收并处理所有客户端的请求 s.appendRetentionPolicyService(s.config.Retention) //依据RetentionPolicy周期性的作清理 // Graphite, Collectd, OpenTSDB都会对其实TSDB数据格式的支持 for _, i := range s.config.GraphiteInputs { if err := s.appendGraphiteService(i); err != nil { return err } } for _, i := range s.config.CollectdInputs { s.appendCollectdService(i) } for _, i := range s.config.OpenTSDBInputs { if err := s.appendOpenTSDBService(i); err != nil { return err } } for _, i := range s.config.UDPInputs { s.appendUDPService(i) } s.Subscriber.MetaClient = s.MetaClient s.PointsWriter.MetaClient = s.MetaClient s.Monitor.MetaClient = s.MetaClient s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader) // Configure logging for all services and clients. if s.config.Meta.LoggingEnabled { s.MetaClient.WithLogger(s.Logger) } s.TSDBStore.WithLogger(s.Logger) if s.config.Data.QueryLogEnabled { s.QueryExecutor.WithLogger(s.Logger) } s.PointsWriter.WithLogger(s.Logger) s.Subscriber.WithLogger(s.Logger) for _, svc := range s.Services { svc.WithLogger(s.Logger) } s.SnapshotterService.WithLogger(s.Logger) s.Monitor.WithLogger(s.Logger) // Open TSDB store.--初始化tsdb数据文件 if err := s.TSDBStore.Open(); err != nil { return fmt.Errorf("open tsdb store: %s", err) } // Open the subscriber service if err := s.Subscriber.Open(); err != nil { return fmt.Errorf("open subscriber: %s", err) } // Open the points writer service if err := s.PointsWriter.Open(); err != nil { return fmt.Errorf("open points writer: %s", err) } s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points()) for _, service := range s.Services { if err := service.Open(); err != nil { return fmt.Errorf("open service: %s", err) } } // Start the reporting service, if not disabled. if !s.reportingDisabled { go s.startServerReporting() } return nil}
转载地址:http://qfasi.baihongyu.com/