From bf25d2150d0f8a34d40ed24db4a5362252e28ecd Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Sat, 18 Nov 2023 16:49:22 -0800 Subject: [PATCH 1/7] Add `mcap du` command --- go/cli/mcap/cmd/du.go | 282 ++++++++++++++++++++++++++++++++++++++++++ mcap.code-workspace | 19 +++ 2 files changed, 301 insertions(+) create mode 100644 go/cli/mcap/cmd/du.go create mode 100644 mcap.code-workspace diff --git a/go/cli/mcap/cmd/du.go b/go/cli/mcap/cmd/du.go new file mode 100644 index 0000000000..9955f65334 --- /dev/null +++ b/go/cli/mcap/cmd/du.go @@ -0,0 +1,282 @@ +package cmd + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "hash/crc32" + "io" + "os" + "strings" + + "github.com/foxglove/mcap/go/cli/mcap/utils" + "github.com/foxglove/mcap/go/mcap" + "github.com/klauspost/compress/zstd" + "github.com/olekukonko/tablewriter" + "github.com/pierrec/lz4/v4" + "github.com/spf13/cobra" +) + +type usage struct { + reader io.ReadSeeker + + channels map[uint16]*mcap.Channel + + // total message size of all messages + totalMessageSize uint64 + + // total message size by topic name + topicMessageSize map[string]uint64 + + totalSize uint64 + + // record kind to size + recordKindSize map[string]uint64 +} + +func newUsage(reader io.ReadSeeker) *usage { + return &usage{ + reader: reader, + channels: make(map[uint16]*mcap.Channel), + topicMessageSize: make(map[string]uint64), + recordKindSize: make(map[string]uint64), + } +} + +func (instance *usage) processChunk(chunk *mcap.Chunk) error { + compressionFormat := mcap.CompressionFormat(chunk.Compression) + var uncompressedBytes []byte + + switch compressionFormat { + case mcap.CompressionNone: + uncompressedBytes = chunk.Records + case mcap.CompressionZSTD: + compressedDataReader := bytes.NewReader(chunk.Records) + chunkDataReader, err := zstd.NewReader(compressedDataReader) + if err != nil { + return fmt.Errorf("could not make zstd decoder: %w", err) + } + uncompressedBytes, err = io.ReadAll(chunkDataReader) + if err != nil { + return fmt.Errorf("could not decompress: %w", err) + } + case mcap.CompressionLZ4: + var err error + compressedDataReader := bytes.NewReader(chunk.Records) + chunkDataReader := lz4.NewReader(compressedDataReader) + uncompressedBytes, err = io.ReadAll(chunkDataReader) + if err != nil { + return fmt.Errorf("could not decompress: %w", err) + } + default: + return fmt.Errorf("unsupported compression format: %s", chunk.Compression) + } + + if uint64(len(uncompressedBytes)) != chunk.UncompressedSize { + return fmt.Errorf("uncompressed chunk data size != Chunk.uncompressed_size") + } + + if chunk.UncompressedCRC != 0 { + crc := crc32.ChecksumIEEE(uncompressedBytes) + if crc != chunk.UncompressedCRC { + return fmt.Errorf("invalid CRC: %x != %x", crc, chunk.UncompressedCRC) + } + } + + uncompressedBytesReader := bytes.NewReader(uncompressedBytes) + + lexer, err := mcap.NewLexer(uncompressedBytesReader, &mcap.LexerOptions{ + SkipMagic: true, + ValidateChunkCRCs: true, + EmitChunks: true, + }) + if err != nil { + return fmt.Errorf("failed to make lexer for chunk bytes: %s", err) + } + defer lexer.Close() + + msg := make([]byte, 1024) + for { + tokenType, data, err := lexer.Next(msg) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return fmt.Errorf("failed to read next token: %s", err) + } + if len(data) > len(msg) { + msg = data + } + + switch tokenType { + case mcap.TokenChannel: + channel, err := mcap.ParseChannel(data) + if err != nil { + return fmt.Errorf("Error parsing Channel: %s", err) + } + + instance.channels[channel.ID] = channel + case mcap.TokenMessage: + message, err := mcap.ParseMessage(data) + if err != nil { + return fmt.Errorf("Error parsing Message: %s", err) + } + + channel := instance.channels[message.ChannelID] + if channel == nil { + return fmt.Errorf("got a Message record for unknown channel: %d", message.ChannelID) + } + + messageSize := uint64(len(message.Data)) + + instance.totalMessageSize += messageSize + instance.topicMessageSize[channel.Topic] += messageSize + } + } + + return nil +} + +func (instance *usage) RunDu() error { + lexer, err := mcap.NewLexer(instance.reader, &mcap.LexerOptions{ + SkipMagic: false, + ValidateChunkCRCs: true, + EmitChunks: true, + }) + if err != nil { + return err + } + defer lexer.Close() + + msg := make([]byte, 1024) + for { + tokenType, data, err := lexer.Next(msg) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return fmt.Errorf("failed to read next token: %s", err) + } + if len(data) > len(msg) { + msg = data + } + + instance.totalSize += uint64(len(data)) + instance.recordKindSize[tokenType.String()] += uint64(len(data)) + + switch tokenType { + case mcap.TokenChannel: + channel, err := mcap.ParseChannel(data) + if err != nil { + return fmt.Errorf("error parsing Channel: %s", err) + } + + instance.channels[channel.ID] = channel + case mcap.TokenMessage: + message, err := mcap.ParseMessage(data) + if err != nil { + return fmt.Errorf("error parsing Message: %s", err) + } + channel := instance.channels[message.ChannelID] + if channel == nil { + return fmt.Errorf("got a Message record for unknown channel: %d", message.ChannelID) + } + + messageSize := uint64(len(message.Data)) + + instance.totalMessageSize += messageSize + instance.topicMessageSize[channel.Topic] += messageSize + case mcap.TokenChunk: + chunk, err := mcap.ParseChunk(data) + if err != nil { + return fmt.Errorf("error parsing Message: %s", err) + } + err = instance.processChunk(chunk) + if err != nil { + return err + } + } + } + + { + rows := [][]string{} + + for recordKind, size := range instance.recordKindSize { + row := []string{ + recordKind, fmt.Sprintf("%d", size), + fmt.Sprintf("%f", float32(size)/float32(instance.totalSize)*100.0), + } + + rows = append(rows, row) + } + + printTable(os.Stdout, rows, []string{ + "record kind", "sum bytes", "% of total file bytes", + }) + } + + fmt.Println() + + { + rows := [][]string{} + + for topic, topicSize := range instance.topicMessageSize { + row := []string{ + topic, fmt.Sprintf("%d", topicSize), + fmt.Sprintf("%f", float32(topicSize)/float32(instance.totalMessageSize)*100.0), + } + + rows = append(rows, row) + } + + printTable(os.Stdout, rows, []string{ + "topic", "sum bytes", "% of total message bytes", + }) + } + + return nil +} + +func printTable(w io.Writer, rows [][]string, header []string) { + buf := &bytes.Buffer{} + tw := tablewriter.NewWriter(buf) + tw.SetBorder(false) + tw.SetAutoWrapText(false) + tw.SetAlignment(tablewriter.ALIGN_DEFAULT) + tw.SetHeaderAlignment(tablewriter.ALIGN_LEFT) + tw.SetHeader(header) + tw.AppendBulk(rows) + tw.Render() + // This tablewriter puts a leading space on the lines for some reason, so + // remove it. + scanner := bufio.NewScanner(buf) + for scanner.Scan() { + fmt.Fprintln(w, strings.TrimLeft(scanner.Text(), " ")) + } +} + +var duCmd = &cobra.Command{ + Use: "du ", + Short: "Report space usage within an MCAP file", + Run: func(cmd *cobra.Command, args []string) { + ctx := context.Background() + if len(args) != 1 { + die("An MCAP file argument is required.") + } + filename := args[0] + err := utils.WithReader(ctx, filename, func(remote bool, rs io.ReadSeeker) error { + usage := newUsage(rs) + return usage.RunDu() + }) + if err != nil { + die("du command failed: %s", err) + } + }, +} + +func init() { + rootCmd.AddCommand(duCmd) +} diff --git a/mcap.code-workspace b/mcap.code-workspace new file mode 100644 index 0000000000..ddf9f01742 --- /dev/null +++ b/mcap.code-workspace @@ -0,0 +1,19 @@ +{ + "folders": [ + { + "path": "go" + }, + { + "path": "rust" + }, + { + "path": "typescript" + }, + { + "path": "swift" + }, + { + "path": "cpp" + } +] +} From 15d848bb28bafa588ce51cc33468b2bc4546c5a0 Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Wed, 30 Oct 2024 04:05:36 -0700 Subject: [PATCH 2/7] update --- go/cli/mcap/cmd/du.go | 1 + mcap.code-workspace | 19 ------------------- 2 files changed, 1 insertion(+), 19 deletions(-) delete mode 100644 mcap.code-workspace diff --git a/go/cli/mcap/cmd/du.go b/go/cli/mcap/cmd/du.go index 9955f65334..67de5dd35e 100644 --- a/go/cli/mcap/cmd/du.go +++ b/go/cli/mcap/cmd/du.go @@ -42,6 +42,7 @@ func newUsage(reader io.ReadSeeker) *usage { channels: make(map[uint16]*mcap.Channel), topicMessageSize: make(map[string]uint64), recordKindSize: make(map[string]uint64), + totalSize: 16, /* 8 bytes for leading magic and 8 bytes for trailing magic */ } } diff --git a/mcap.code-workspace b/mcap.code-workspace deleted file mode 100644 index ddf9f01742..0000000000 --- a/mcap.code-workspace +++ /dev/null @@ -1,19 +0,0 @@ -{ - "folders": [ - { - "path": "go" - }, - { - "path": "rust" - }, - { - "path": "typescript" - }, - { - "path": "swift" - }, - { - "path": "cpp" - } -] -} From 1e4d266c048c79d73766252676c0ac9182ec1e71 Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Wed, 30 Oct 2024 04:46:22 -0700 Subject: [PATCH 3/7] update --- go/cli/mcap/cmd/du.go | 76 ++++++++++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/go/cli/mcap/cmd/du.go b/go/cli/mcap/cmd/du.go index 67de5dd35e..67757dea64 100644 --- a/go/cli/mcap/cmd/du.go +++ b/go/cli/mcap/cmd/du.go @@ -1,7 +1,6 @@ package cmd import ( - "bufio" "bytes" "context" "errors" @@ -9,12 +8,11 @@ import ( "hash/crc32" "io" "os" - "strings" + "sort" "github.com/foxglove/mcap/go/cli/mcap/utils" "github.com/foxglove/mcap/go/mcap" "github.com/klauspost/compress/zstd" - "github.com/olekukonko/tablewriter" "github.com/pierrec/lz4/v4" "github.com/spf13/cobra" ) @@ -202,8 +200,21 @@ func (instance *usage) RunDu() error { } } + fmt.Println("Top level record stats:") + fmt.Println() + { rows := [][]string{} + rows = append(rows, []string{ + "record", + "sum bytes", + "% of total file bytes", + }) + rows = append(rows, []string{ + "------", + "---------", + "---------------------", + }) for recordKind, size := range instance.recordKindSize { row := []string{ @@ -214,51 +225,56 @@ func (instance *usage) RunDu() error { rows = append(rows, row) } - printTable(os.Stdout, rows, []string{ - "record kind", "sum bytes", "% of total file bytes", - }) + utils.FormatTable(os.Stdout, rows) } + fmt.Println() + fmt.Println("Message size stats:") fmt.Println() { rows := [][]string{} + rows = append(rows, []string{ + "topic", + "sum bytes (uncompressed)", + "% of total message bytes (uncompressed)", + }) + rows = append(rows, []string{ + "-----", + "------------------------", + "---------------------------------------", + }) - for topic, topicSize := range instance.topicMessageSize { + type topicInfo struct { + name string + size uint64 + } + topicInfos := make([]topicInfo, 0, len(instance.topicMessageSize)) + for topic, size := range instance.topicMessageSize { + topicInfos = append(topicInfos, topicInfo{topic, size}) + } + + // Sort for largest topics first + sort.Slice(topicInfos, func(i, j int) bool { + return topicInfos[i].size > topicInfos[j].size + }) + + for _, info := range topicInfos { row := []string{ - topic, fmt.Sprintf("%d", topicSize), - fmt.Sprintf("%f", float32(topicSize)/float32(instance.totalMessageSize)*100.0), + info.name, + humanBytes(info.size), + fmt.Sprintf("%f", float32(info.size)/float32(instance.totalMessageSize)*100.0), } rows = append(rows, row) } - printTable(os.Stdout, rows, []string{ - "topic", "sum bytes", "% of total message bytes", - }) + utils.FormatTable(os.Stdout, rows) } return nil } -func printTable(w io.Writer, rows [][]string, header []string) { - buf := &bytes.Buffer{} - tw := tablewriter.NewWriter(buf) - tw.SetBorder(false) - tw.SetAutoWrapText(false) - tw.SetAlignment(tablewriter.ALIGN_DEFAULT) - tw.SetHeaderAlignment(tablewriter.ALIGN_LEFT) - tw.SetHeader(header) - tw.AppendBulk(rows) - tw.Render() - // This tablewriter puts a leading space on the lines for some reason, so - // remove it. - scanner := bufio.NewScanner(buf) - for scanner.Scan() { - fmt.Fprintln(w, strings.TrimLeft(scanner.Text(), " ")) - } -} - var duCmd = &cobra.Command{ Use: "du ", Short: "Report space usage within an MCAP file", From 3c1cc244ef95353d53f0fad0bf8845981b389596 Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Wed, 30 Oct 2024 04:53:02 -0700 Subject: [PATCH 4/7] linter --- go/cli/mcap/cmd/du.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/go/cli/mcap/cmd/du.go b/go/cli/mcap/cmd/du.go index 67757dea64..cb6b397228 100644 --- a/go/cli/mcap/cmd/du.go +++ b/go/cli/mcap/cmd/du.go @@ -92,7 +92,7 @@ func (instance *usage) processChunk(chunk *mcap.Chunk) error { EmitChunks: true, }) if err != nil { - return fmt.Errorf("failed to make lexer for chunk bytes: %s", err) + return fmt.Errorf("failed to make lexer for chunk bytes: %w", err) } defer lexer.Close() @@ -103,7 +103,7 @@ func (instance *usage) processChunk(chunk *mcap.Chunk) error { if errors.Is(err, io.EOF) { break } - return fmt.Errorf("failed to read next token: %s", err) + return fmt.Errorf("failed to read next token: %w", err) } if len(data) > len(msg) { msg = data @@ -113,7 +113,7 @@ func (instance *usage) processChunk(chunk *mcap.Chunk) error { case mcap.TokenChannel: channel, err := mcap.ParseChannel(data) if err != nil { - return fmt.Errorf("Error parsing Channel: %s", err) + return fmt.Errorf("Error parsing Channel: %w", err) } instance.channels[channel.ID] = channel @@ -209,8 +209,7 @@ func (instance *usage) RunDu() error { "record", "sum bytes", "% of total file bytes", - }) - rows = append(rows, []string{ + }, []string{ "------", "---------", "---------------------", @@ -238,8 +237,7 @@ func (instance *usage) RunDu() error { "topic", "sum bytes (uncompressed)", "% of total message bytes (uncompressed)", - }) - rows = append(rows, []string{ + }, []string{ "-----", "------------------------", "---------------------------------------", @@ -278,13 +276,13 @@ func (instance *usage) RunDu() error { var duCmd = &cobra.Command{ Use: "du ", Short: "Report space usage within an MCAP file", - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, args []string) { ctx := context.Background() if len(args) != 1 { die("An MCAP file argument is required.") } filename := args[0] - err := utils.WithReader(ctx, filename, func(remote bool, rs io.ReadSeeker) error { + err := utils.WithReader(ctx, filename, func(_ bool, rs io.ReadSeeker) error { usage := newUsage(rs) return usage.RunDu() }) From af81a34daf6e99230c68bcce09ecba4641063435 Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Wed, 30 Oct 2024 04:59:26 -0700 Subject: [PATCH 5/7] more lint --- go/cli/mcap/cmd/du.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/go/cli/mcap/cmd/du.go b/go/cli/mcap/cmd/du.go index cb6b397228..c5fe1c623d 100644 --- a/go/cli/mcap/cmd/du.go +++ b/go/cli/mcap/cmd/du.go @@ -120,7 +120,7 @@ func (instance *usage) processChunk(chunk *mcap.Chunk) error { case mcap.TokenMessage: message, err := mcap.ParseMessage(data) if err != nil { - return fmt.Errorf("Error parsing Message: %s", err) + return fmt.Errorf("Error parsing Message: %w", err) } channel := instance.channels[message.ChannelID] @@ -157,7 +157,7 @@ func (instance *usage) RunDu() error { break } - return fmt.Errorf("failed to read next token: %s", err) + return fmt.Errorf("failed to read next token: %w", err) } if len(data) > len(msg) { msg = data @@ -170,14 +170,14 @@ func (instance *usage) RunDu() error { case mcap.TokenChannel: channel, err := mcap.ParseChannel(data) if err != nil { - return fmt.Errorf("error parsing Channel: %s", err) + return fmt.Errorf("error parsing Channel: %w", err) } instance.channels[channel.ID] = channel case mcap.TokenMessage: message, err := mcap.ParseMessage(data) if err != nil { - return fmt.Errorf("error parsing Message: %s", err) + return fmt.Errorf("error parsing Message: %w", err) } channel := instance.channels[message.ChannelID] if channel == nil { @@ -191,7 +191,7 @@ func (instance *usage) RunDu() error { case mcap.TokenChunk: chunk, err := mcap.ParseChunk(data) if err != nil { - return fmt.Errorf("error parsing Message: %s", err) + return fmt.Errorf("error parsing Message: %w", err) } err = instance.processChunk(chunk) if err != nil { @@ -279,7 +279,7 @@ var duCmd = &cobra.Command{ Run: func(_ *cobra.Command, args []string) { ctx := context.Background() if len(args) != 1 { - die("An MCAP file argument is required.") + die("Unexpected number of args") } filename := args[0] err := utils.WithReader(ctx, filename, func(_ bool, rs io.ReadSeeker) error { @@ -287,7 +287,7 @@ var duCmd = &cobra.Command{ return usage.RunDu() }) if err != nil { - die("du command failed: %s", err) + die("Failed to read file %s: %v", filename, err) } }, } From 30ac0e9d7e7fe9a92bd738141ac9f65da6c39e9d Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Fri, 1 Nov 2024 12:49:00 -0700 Subject: [PATCH 6/7] add long description --- go/cli/mcap/cmd/du.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/cli/mcap/cmd/du.go b/go/cli/mcap/cmd/du.go index c5fe1c623d..1ed6ba0600 100644 --- a/go/cli/mcap/cmd/du.go +++ b/go/cli/mcap/cmd/du.go @@ -276,6 +276,7 @@ func (instance *usage) RunDu() error { var duCmd = &cobra.Command{ Use: "du ", Short: "Report space usage within an MCAP file", + Long: "This command reports space usage within an mcap file. Space usage for messages is calculated using the uncompressed size.\n\nNote: This command will scan and uncompress the entire file.", Run: func(_ *cobra.Command, args []string) { ctx := context.Background() if len(args) != 1 { From bac6c7e1545a6e9cb420213323ce5932638d6eaa Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Fri, 1 Nov 2024 12:59:36 -0700 Subject: [PATCH 7/7] lint --- go/cli/mcap/cmd/du.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/go/cli/mcap/cmd/du.go b/go/cli/mcap/cmd/du.go index 1ed6ba0600..a637602d79 100644 --- a/go/cli/mcap/cmd/du.go +++ b/go/cli/mcap/cmd/du.go @@ -276,7 +276,10 @@ func (instance *usage) RunDu() error { var duCmd = &cobra.Command{ Use: "du ", Short: "Report space usage within an MCAP file", - Long: "This command reports space usage within an mcap file. Space usage for messages is calculated using the uncompressed size.\n\nNote: This command will scan and uncompress the entire file.", + Long: `This command reports space usage within an mcap file. Space usage for messages is +calculated using the uncompressed size. + +Note: This command will scan and uncompress the entire file.`, Run: func(_ *cobra.Command, args []string) { ctx := context.Background() if len(args) != 1 {