当前位置: 首页> 科技> 名企 > 村建站属于哪个部门_服务器租赁合同范本_免费网络空间搜索引擎_百度搜一下

村建站属于哪个部门_服务器租赁合同范本_免费网络空间搜索引擎_百度搜一下

时间:2025/7/12 10:24:52来源:https://blog.csdn.net/weixin_48502062/article/details/144441773 浏览次数:0次
村建站属于哪个部门_服务器租赁合同范本_免费网络空间搜索引擎_百度搜一下

本节重点介绍 :

  • 启动时同步对象存储的各个block元信息到本地,并且将索引数据也同步过来
  • 启动定时同步的任务
  • 封装prometheus查询的grpc服务,对外提供服务,底层调用配置的对象存储查询

store

  • 文档地址

store 的作用

  • 提供grpc查询 对象存储的接口
  • 充当查询网关

store代码做了什么

  • 启动时同步对象存储的各个block元信息到本地,并且将索引数据也同步过来
  • 启动定时同步的任务
  • 封装prometheus查询的grpc服务,对外提供服务,底层调用配置的对象存储查询

执行入口runStore

  • 代码位置 D:\go_path\src\github.com\thanos-io\thanos\cmd\thanos\store.go

初始化prober 和http server

  • http-server中有三部分内容
    • 探活相关的 /-/ready /-/healthy
    • pprof 相关
    • metrics相关
  • 代码如下
	grpcProbe := prober.NewGRPC()httpProbe := prober.NewHTTP()statusProber := prober.Combine(httpProbe,grpcProbe,prober.NewInstrumentation(conf.component, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),)srv := httpserver.New(logger, reg, conf.component, httpProbe,httpserver.WithListen(conf.httpConfig.bindAddress),httpserver.WithGracePeriod(time.Duration(conf.httpConfig.gracePeriod)),httpserver.WithTLSConfig(conf.httpConfig.tlsConfig),)g.Add(func() error {statusProber.Healthy()return srv.ListenAndServe()}, func(err error) {statusProber.NotReady(err)defer statusProber.NotHealthy(err)srv.Shutdown(err)})

根据配置的对象存储创建bkt

confContentYaml, err := conf.objStoreConfig.Content()if err != nil {return err}bkt, err := client.NewBucket(logger, confContentYaml, reg, conf.component.String())if err != nil {return errors.Wrap(err, "create bucket client")}

根据配置 创建缓存桶

  • 文档地址 https://thanos.io/tip/components/store.md/#caching-bucket
	cachingBucketConfigYaml, err := conf.cachingBucketConfig.Content()if err != nil {return errors.Wrap(err, "get caching bucket configuration")}if len(cachingBucketConfigYaml) > 0 {bkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg)if err != nil {return errors.Wrap(err, "create caching bucket")}}
  • 位置 D:\go_path\src\github.com\thanos-io\thanos\pkg\store\cache\caching_bucket_factory.go
  • 类型为memcached 和 本地内存的
func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {level.Info(logger).Log("msg", "loading caching bucket configuration")config := &CachingWithBackendConfig{}config.Defaults()if err := yaml.UnmarshalStrict(yamlContent, config); err != nil {return nil, errors.Wrap(err, "parsing config YAML file")}backendConfig, err := yaml.Marshal(config.BackendConfig)if err != nil {return nil, errors.Wrap(err, "marshal content of cache backend configuration")}var c cache.Cacheswitch strings.ToUpper(string(config.Type)) {case string(MemcachedBucketCacheProvider):var memcached cacheutil.MemcachedClientmemcached, err := cacheutil.NewMemcachedClient(logger, "caching-bucket", backendConfig, reg)if err != nil {return nil, errors.Wrapf(err, "failed to create memcached client")}c = cache.NewMemcachedCache("caching-bucket", logger, memcached, reg)case string(InMemoryBucketCacheProvider):c, err = cache.NewInMemoryCache("caching-bucket", logger, reg, backendConfig)if err != nil {return nil, errors.Wrapf(err, "failed to create inmemory cache")}default:return nil, errors.Errorf("unsupported cache type: %s", config.Type)}// Include interactions with cache in the traces.c = cache.NewTracingCache(c)cfg := NewCachingBucketConfig()// Configure cache.cfg.CacheGetRange("chunks", c, isTSDBChunkFile, config.ChunkSubrangeSize, config.ChunkObjectAttrsTTL, config.ChunkSubrangeTTL, config.MaxChunksGetRangeRequests)cfg.CacheExists("meta.jsons", c, isMetaFile, config.MetafileExistsTTL, config.MetafileDoesntExistTTL)cfg.CacheGet("meta.jsons", c, isMetaFile, int(config.MetafileMaxSize), config.MetafileContentTTL, config.MetafileExistsTTL, config.MetafileDoesntExistTTL)// Cache Iter requests for root.cfg.CacheIter("blocks-iter", c, isBlocksRootDir, config.BlocksIterTTL, JSONIterCodec{})cb, err := NewCachingBucket(bucket, cfg, logger, reg)if err != nil {return nil, err}return cb, nil
}

创建查询rebel配置,作用于查询 block中

  • 根据参数 selector.relabel-config
	relabelContentYaml, err := conf.selectorRelabelConf.Content()if err != nil {return errors.Wrap(err, "get content of relabel configuration")}relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)if err != nil {return err}

根据配置创建 索引缓存

  • 文档地址 https://thanos.io/tip/components/store.md/#in-memory-index-cache
  • 如果 --index-cache.config-file 没有指定就是用本地内存创建缓存
  • 是用 lru创建缓存
	indexCacheContentYaml, err := conf.indexCacheConfigs.Content()if err != nil {return errors.Wrap(err, "get content of index cache configuration")}// Ensure we close up everything properly.defer func() {if err != nil {runutil.CloseWithLogOnErr(logger, bkt, "bucket client")}}()// Create the index cache loading its config from config file, while keeping// backward compatibility with the pre-config file era.var indexCache storecache.IndexCacheif len(indexCacheContentYaml) > 0 {indexCache, err = storecache.NewIndexCache(logger, indexCacheContentYaml, reg)} else {indexCache, err = storecache.NewInMemoryIndexCacheWithConfig(logger, reg, storecache.InMemoryIndexCacheConfig{MaxSize:     model.Bytes(conf.indexCacheSizeBytes),MaxItemSize: storecache.DefaultInMemoryIndexCacheConfig.MaxItemSize,})}

创建元数据 获取 的fetcher

	ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, bkt, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),[]block.MetadataFilter{block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),block.NewLabelShardedMetaFilter(relabelConfig),block.NewConsistencyDelayMetaFilter(logger, time.Duration(conf.consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)),ignoreDeletionMarkFilter,block.NewDeduplicateFilter(),}, nil)if err != nil {return errors.Wrap(err, "meta fetcher")}

创建prometheus查询的 并发限制器 gate

  • 底层调用 https://github.com/prometheus/prometheus/blob/main/pkg/gate/gate.go
	queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), int(conf.maxConcurrency))

是用上的参数创建 操作对象存储的对象bs

	options := []store.BucketStoreOption{store.WithLogger(logger),store.WithRegistry(reg),store.WithIndexCache(indexCache),store.WithQueryGate(queriesGate),store.WithChunkPool(chunkPool),store.WithFilterConfig(conf.filterConf),}if conf.debugLogging {options = append(options, store.WithDebugLogging())}bs, err := store.NewBucketStore(bkt,metaFetcher,conf.dataDir,store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount),store.NewGapBasedPartitioner(store.PartitionerMaxGapSize),conf.blockSyncConcurrency,conf.advertiseCompatibilityLabel,conf.postingOffsetsInMemSampling,false,conf.lazyIndexReaderEnabled,conf.lazyIndexReaderIdleTimeout,options...,)if err != nil {return errors.Wrap(err, "create object storage store")}

启动时同步对象存储的 block元信息

  • 如果bs.InitialSync正常同步就继续,否则报错退出
	bucketStoreReady := make(chan struct{}){ctx, cancel := context.WithCancel(context.Background())g.Add(func() error {defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")level.Info(logger).Log("msg", "initializing bucket store")begin := time.Now()if err := bs.InitialSync(ctx); err != nil {close(bucketStoreReady)return errors.Wrap(err, "bucket store initial sync")}level.Info(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())close(bucketStoreReady)err := runutil.Repeat(conf.syncInterval, ctx.Done(), func() error {if err := bs.SyncBlocks(ctx); err != nil {level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)}return nil})runutil.CloseWithLogOnErr(logger, bs, "bucket store")return err}, func(error) {cancel()})}

然后开启每3分钟同步的任务

			err := runutil.Repeat(conf.syncInterval, ctx.Done(), func() error {if err := bs.SyncBlocks(ctx); err != nil {level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)}return nil})

InitialSync解析

  • 代码位置 D:\go_path\src\github.com\thanos-io\thanos\pkg\store\bucket.go

先调用对象存储的接口获取 block的 metas信息

	metas, _, metaFetchErr := s.fetcher.Fetch(ctx)// For partial view allow adding new blocks at least.if metaFetchErr != nil && metas == nil {return metaFetchErr}var wg sync.WaitGroupblockc := make(chan *metadata.Meta)

并发的加载 block

	var wg sync.WaitGroupblockc := make(chan *metadata.Meta)for i := 0; i < s.blockSyncConcurrency; i++ {wg.Add(1)go func() {for meta := range blockc {if err := s.addBlock(ctx, meta); err != nil {continue}}wg.Done()}()}for id, meta := range metas {if b := s.getBlock(id); b != nil {continue}select {case <-ctx.Done():case blockc <- meta:}}close(blockc)wg.Wait()if metaFetchErr != nil {return metaFetchErr}

加载block addBlock函数

准备工作

  • 拼接dir
  • 读取标签,算哈希
	dir := filepath.Join(s.dir, meta.ULID.String())start := time.Now()level.Debug(s.logger).Log("msg", "loading new block", "id", meta.ULID)defer func() {if err != nil {s.metrics.blockLoadFailures.Inc()if err2 := os.RemoveAll(dir); err2 != nil {level.Warn(s.logger).Log("msg", "failed to remove block we cannot load", "err", err2)}level.Warn(s.logger).Log("msg", "loading block failed", "elapsed", time.Since(start), "id", meta.ULID, "err", err)} else {level.Info(s.logger).Log("msg", "loaded new block", "elapsed", time.Since(start), "id", meta.ULID)}}()s.metrics.blockLoads.Inc()lset := labels.FromMap(meta.Thanos.Labels)h := lset.Hash()

创建索引reader对象

  • 根据data目录下的 index-header 文件准备 这个block的索引数据文件
  • 如 /var/thanos/store/01FEHMEXVPZYD128FHFJSWEZGV/index-header
	indexHeaderReader, err := s.indexReaderPool.NewBinaryReader(ctx,s.logger,s.bkt,s.dir,meta.ULID,s.postingOffsetsInMemSampling,)
查询对象存储索引数据后 写入索引文件
  • D:\go_path\src\github.com\thanos-io\thanos\pkg\block\indexheader\binary_reader.go
  • 先准备 br对象,再调用WriteBinary查询对象存储写入索引
// NewBinaryReader loads or builds new index-header if not present on disk.
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*BinaryReader, error) {binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename)br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling)if err == nil {return br, nil}level.Debug(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err)start := time.Now()if err := WriteBinary(ctx, bkt, id, binfn); err != nil {return nil, errors.Wrap(err, "write index header")}level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start))return newFileBinaryReader(binfn, postingOffsetsInMemSampling)
}
WriteBinary函数
func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, filename string) (err error) {ir, indexVersion, err := newChunkedIndexReader(ctx, bkt, id)if err != nil {return errors.Wrap(err, "new index reader")}tmpFilename := filename + ".tmp"// Buffer for copying and encbuffers.// This also will control the size of file writer buffer.buf := make([]byte, 32*1024)bw, err := newBinaryWriter(tmpFilename, buf)if err != nil {return errors.Wrap(err, "new binary index header writer")}defer runutil.CloseWithErrCapture(&err, bw, "close binary writer for %s", tmpFilename)if err := bw.AddIndexMeta(indexVersion, ir.toc.PostingsTable); err != nil {return errors.Wrap(err, "add index meta")}if err := ir.CopySymbols(bw.SymbolsWriter(), buf); err != nil {return err}if err := bw.f.Flush(); err != nil {return errors.Wrap(err, "flush")}if err := ir.CopyPostingsOffsets(bw.PostingOffsetsWriter(), buf); err != nil {return err}if err := bw.f.Flush(); err != nil {return errors.Wrap(err, "flush")}if err := bw.WriteTOC(); err != nil {return errors.Wrap(err, "write index header TOC")}if err := bw.f.Flush(); err != nil {return errors.Wrap(err, "flush")}if err := bw.f.f.Sync(); err != nil {return errors.Wrap(err, "sync")}// Create index-header in atomic way, to avoid partial writes (e.g during restart or crash of store GW).return os.Rename(tmpFilename, filename)
}

开启grpc 查询数据的服务

  • D:\go_path\src\github.com\thanos-io\thanos\cmd\thanos\store.go
	{tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA)if err != nil {return errors.Wrap(err, "setup gRPC server")}s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,grpcserver.WithServer(store.RegisterStoreServer(bs)),grpcserver.WithListen(conf.grpcConfig.bindAddress),grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),grpcserver.WithTLSConfig(tlsCfg),)g.Add(func() error {<-bucketStoreReadystatusProber.Ready()return s.ListenAndServe()}, func(err error) {statusProber.NotReady(err)s.Shutdown(err)})}// 

注册prometheus查询的grpc 服务

grpcserver.WithServer(store.RegisterStoreServer(bs)),
  • 位置 D:\go_path\src\github.com\thanos-io\thanos\pkg\store\storepb\rpc.pb.go
var _Store_serviceDesc = grpc.ServiceDesc{ServiceName: "thanos.Store",HandlerType: (*StoreServer)(nil),Methods: []grpc.MethodDesc{{MethodName: "Info",Handler:    _Store_Info_Handler,},{MethodName: "LabelNames",Handler:    _Store_LabelNames_Handler,},{MethodName: "LabelValues",Handler:    _Store_LabelValues_Handler,},},Streams: []grpc.StreamDesc{{StreamName:    "Series",Handler:       _Store_Series_Handler,ServerStreams: true,},},Metadata: "store/storepb/rpc.proto",
}

开启查看ui相关的web

	// Add bucket UI for loaded blocks.{r := route.New()ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)compactorView := ui.NewBucketUI(logger, "", conf.webConfig.externalPrefix, conf.webConfig.prefixHeaderName, "/loaded", conf.component)compactorView.Register(r, true, ins)// Configure Request Logging for HTTP calls.logMiddleware := logging.NewHTTPServerMiddleware(logger, httpLogOpts...)api := blocksAPI.NewBlocksAPI(logger, conf.webConfig.disableCORS, "", flagsMap)api.Register(r.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)metaFetcher.UpdateOnChange(func(blocks []metadata.Meta, err error) {compactorView.Set(blocks, err)api.SetLoaded(blocks, err)})srv.Handle("/", r)}

本节重点总结 :

  • 启动时同步对象存储的各个block元信息到本地,并且将索引数据也同步过来
  • 启动定时同步的任务
  • 封装prometheus查询的grpc服务,对外提供服务,底层调用配置的对象存储查询
关键字:村建站属于哪个部门_服务器租赁合同范本_免费网络空间搜索引擎_百度搜一下

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: