Skip to content

Commit

Permalink
Rename storage module
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Apr 22, 2019
1 parent 76b1372 commit 727e8c4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
4 changes: 2 additions & 2 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
// https://godoc.org/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics
type Collector struct {
opts *options.Options
storage *storage.OffsetStorage
storage *storage.MemoryStorage
logger *log.Entry
}

Expand All @@ -45,7 +45,7 @@ type versionedConsumerGroup struct {

// NewCollector returns a new prometheus collector, preinitialized with all the to be exposed metrics under respect
// of the metrics prefix which can be passed via environment variables
func NewCollector(opts *options.Options, storage *storage.OffsetStorage) *Collector {
func NewCollector(opts *options.Options, storage *storage.MemoryStorage) *Collector {
logger := log.WithFields(log.Fields{
"module": "collector",
})
Expand Down
46 changes: 23 additions & 23 deletions storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
// PartitionWaterMarks represents a map of PartitionWaterMarks grouped by PartitionID
type PartitionWaterMarks = map[int32]kafka.PartitionWaterMark

// OffsetStorage stores the latest committed offsets for each group, topic, partition combination and offers an interface
// MemoryStorage stores the latest committed offsets for each group, topic, partition combination and offers an interface
// to access these information
type OffsetStorage struct {
type MemoryStorage struct {
logger *log.Entry

// Channels for receiving storage requests
Expand Down Expand Up @@ -66,8 +66,8 @@ type ConsumerPartitionOffsetMetric struct {
TotalCommitCount float64
}

// NewOffsetStorage creates a new storage and preinitializes the required maps which store the PartitionOffset information
func NewOffsetStorage(consumerOffsetCh <-chan *kafka.StorageRequest, clusterCh <-chan *kafka.StorageRequest) *OffsetStorage {
// NewMemoryStorage creates a new storage and preinitializes the required maps which store the PartitionOffset information
func NewMemoryStorage(consumerOffsetCh <-chan *kafka.StorageRequest, clusterCh <-chan *kafka.StorageRequest) *MemoryStorage {
groups := &consumerGroup{
Offsets: make(map[string]ConsumerPartitionOffsetMetric),
Metadata: make(map[string]kafka.ConsumerGroupMetadata),
Expand All @@ -87,7 +87,7 @@ func NewOffsetStorage(consumerOffsetCh <-chan *kafka.StorageRequest, clusterCh <
Configs: make(map[string]kafka.TopicConfiguration),
}

return &OffsetStorage{
return &MemoryStorage{
logger: log.WithFields(log.Fields{
"module": "storage",
}),
Expand All @@ -103,12 +103,12 @@ func NewOffsetStorage(consumerOffsetCh <-chan *kafka.StorageRequest, clusterCh <
}

// Start starts listening for incoming offset entries on the input channel so that they can be stored
func (module *OffsetStorage) Start() {
func (module *MemoryStorage) Start() {
go module.consumerOffsetWorker()
go module.clusterWorker()
}

func (module *OffsetStorage) consumerOffsetWorker() {
func (module *MemoryStorage) consumerOffsetWorker() {
for request := range module.consumerOffsetCh {
switch request.RequestType {
case kafka.StorageAddConsumerOffset:
Expand All @@ -132,7 +132,7 @@ func (module *OffsetStorage) consumerOffsetWorker() {
log.Panic("Group offset storage channel closed")
}

func (module *OffsetStorage) clusterWorker() {
func (module *MemoryStorage) clusterWorker() {
for request := range module.clusterCh {
switch request.RequestType {
case kafka.StorageAddPartitionLowWaterMark:
Expand All @@ -154,7 +154,7 @@ func (module *OffsetStorage) clusterWorker() {
log.Panic("Partition Offset storage channel closed")
}

func (module *OffsetStorage) deleteTopic(topicName string) {
func (module *MemoryStorage) deleteTopic(topicName string) {
module.topics.ConfigsLock.Lock()
module.partitions.LowWaterMarksLock.Lock()
module.partitions.HighWaterMarksLock.Lock()
Expand All @@ -167,7 +167,7 @@ func (module *OffsetStorage) deleteTopic(topicName string) {
delete(module.topics.Configs, topicName)
}

func (module *OffsetStorage) storePartitionHighWaterMark(offset *kafka.PartitionWaterMark) {
func (module *MemoryStorage) storePartitionHighWaterMark(offset *kafka.PartitionWaterMark) {
module.partitions.HighWaterMarksLock.Lock()
defer module.partitions.HighWaterMarksLock.Unlock()

Expand All @@ -179,7 +179,7 @@ func (module *OffsetStorage) storePartitionHighWaterMark(offset *kafka.Partition
module.partitions.HighWaterMarks[offset.TopicName][offset.PartitionID] = *offset
}

func (module *OffsetStorage) storePartitionLowWaterMark(offset *kafka.PartitionWaterMark) {
func (module *MemoryStorage) storePartitionLowWaterMark(offset *kafka.PartitionWaterMark) {
module.partitions.LowWaterMarksLock.Lock()
defer module.partitions.LowWaterMarksLock.Unlock()

Expand All @@ -191,29 +191,29 @@ func (module *OffsetStorage) storePartitionLowWaterMark(offset *kafka.PartitionW
module.partitions.LowWaterMarks[offset.TopicName][offset.PartitionID] = *offset
}

func (module *OffsetStorage) storeGroupMetadata(metadata *kafka.ConsumerGroupMetadata) {
func (module *MemoryStorage) storeGroupMetadata(metadata *kafka.ConsumerGroupMetadata) {
module.groups.MetadataLock.Lock()
defer module.groups.MetadataLock.Unlock()

module.groups.Metadata[metadata.Group] = *metadata
}

func (module *OffsetStorage) storeTopicConfig(config *kafka.TopicConfiguration) {
func (module *MemoryStorage) storeTopicConfig(config *kafka.TopicConfiguration) {
module.topics.ConfigsLock.Lock()
defer module.topics.ConfigsLock.Unlock()

module.topics.Configs[config.TopicName] = *config
}

func (module *OffsetStorage) registerOffsetPartitions(partitionCount int) {
func (module *MemoryStorage) registerOffsetPartitions(partitionCount int) {
module.status.Lock.Lock()
defer module.status.Lock.Unlock()

module.logger.Infof("Registered %v __consumer_offsets partitions which have to be consumed before metrics can be exposed", partitionCount)
module.status.NotReadyPartitionConsumers = partitionCount
}

func (module *OffsetStorage) markOffsetPartitionReady(partitionID int32) {
func (module *MemoryStorage) markOffsetPartitionReady(partitionID int32) {
module.status.Lock.Lock()
defer module.status.Lock.Unlock()

Expand All @@ -224,7 +224,7 @@ func (module *OffsetStorage) markOffsetPartitionReady(partitionID int32) {
}
}

func (module *OffsetStorage) storeOffsetEntry(offset *kafka.ConsumerPartitionOffset) {
func (module *MemoryStorage) storeOffsetEntry(offset *kafka.ConsumerPartitionOffset) {
module.groups.OffsetsLock.Lock()
defer module.groups.OffsetsLock.Unlock()

Expand All @@ -244,7 +244,7 @@ func (module *OffsetStorage) storeOffsetEntry(offset *kafka.ConsumerPartitionOff
}
}

func (module *OffsetStorage) deleteOffsetEntry(consumerGroupName string, topicName string, partitionID int32) {
func (module *MemoryStorage) deleteOffsetEntry(consumerGroupName string, topicName string, partitionID int32) {
key := fmt.Sprintf("%v:%v:%v", consumerGroupName, topicName, partitionID)
module.groups.OffsetsLock.Lock()
defer module.groups.OffsetsLock.Unlock()
Expand All @@ -254,7 +254,7 @@ func (module *OffsetStorage) deleteOffsetEntry(consumerGroupName string, topicNa

// ConsumerOffsets returns a copy of the currently known consumer group offsets, so that they can safely be processed
// in another go routine
func (module *OffsetStorage) ConsumerOffsets() map[string]ConsumerPartitionOffsetMetric {
func (module *MemoryStorage) ConsumerOffsets() map[string]ConsumerPartitionOffsetMetric {
module.groups.OffsetsLock.RLock()
defer module.groups.OffsetsLock.RUnlock()

Expand All @@ -267,7 +267,7 @@ func (module *OffsetStorage) ConsumerOffsets() map[string]ConsumerPartitionOffse
}

// GroupMetadata returns a copy of the currently known group metadata
func (module *OffsetStorage) GroupMetadata() map[string]kafka.ConsumerGroupMetadata {
func (module *MemoryStorage) GroupMetadata() map[string]kafka.ConsumerGroupMetadata {
module.groups.MetadataLock.RLock()
defer module.groups.MetadataLock.RUnlock()

Expand All @@ -281,7 +281,7 @@ func (module *OffsetStorage) GroupMetadata() map[string]kafka.ConsumerGroupMetad

// TopicConfigs returns all topic configurations in a copied map, so that it
// is safe to process in another go routine
func (module *OffsetStorage) TopicConfigs() map[string]kafka.TopicConfiguration {
func (module *MemoryStorage) TopicConfigs() map[string]kafka.TopicConfiguration {
module.topics.ConfigsLock.RLock()
defer module.topics.ConfigsLock.RUnlock()

Expand All @@ -295,7 +295,7 @@ func (module *OffsetStorage) TopicConfigs() map[string]kafka.TopicConfiguration

// PartitionHighWaterMarks returns all partition high water marks in a copied map, so that it
// is safe to process in another go routine
func (module *OffsetStorage) PartitionHighWaterMarks() map[string]PartitionWaterMarks {
func (module *MemoryStorage) PartitionHighWaterMarks() map[string]PartitionWaterMarks {
module.partitions.HighWaterMarksLock.RLock()
defer module.partitions.HighWaterMarksLock.RUnlock()

Expand All @@ -309,7 +309,7 @@ func (module *OffsetStorage) PartitionHighWaterMarks() map[string]PartitionWater

// PartitionLowWaterMarks returns all partition low water marks in a copied map, so that it
// is safe to process in another go routine
func (module *OffsetStorage) PartitionLowWaterMarks() map[string]PartitionWaterMarks {
func (module *MemoryStorage) PartitionLowWaterMarks() map[string]PartitionWaterMarks {
module.partitions.LowWaterMarksLock.RLock()
defer module.partitions.LowWaterMarksLock.RUnlock()

Expand All @@ -323,7 +323,7 @@ func (module *OffsetStorage) PartitionLowWaterMarks() map[string]PartitionWaterM

// IsConsumed indicates whether the consumer offsets topic lag has been caught up and therefore
// the metrics reported by this module are accurate or not
func (module *OffsetStorage) IsConsumed() bool {
func (module *MemoryStorage) IsConsumed() bool {
module.status.Lock.RLock()
defer module.status.Lock.RUnlock()

Expand Down

0 comments on commit 727e8c4

Please sign in to comment.