Skip to content

Commit

Permalink
Add anonymous error reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunlol committed Feb 24, 2025
1 parent ee55cff commit bac6b91
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 76 deletions.
3 changes: 0 additions & 3 deletions devbox.lock
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
{
"lockfile_version": "1",
"packages": {
"github:NixOS/nixpkgs/nixpkgs-unstable": {
"resolved": "github:NixOS/nixpkgs/b1b43d32be000928cc71250ed77f4a0a5f2bc23a?lastModified=1739698114&narHash=sha256-8S9n69Dnpg8DhfFlP0YvMGmSOY2X4kImGSPWXYNpaHM%3D"
},
"go@latest": {
"last_modified": "2024-09-10T15:01:03Z",
"resolved": "github:NixOS/nixpkgs/5ed627539ac84809c78b2dd6d26a5cebeb5ae269#go_1_23",
Expand Down
14 changes: 7 additions & 7 deletions src/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Duckdb struct {
func NewDuckdb(config *Config) *Duckdb {
ctx := context.Background()
db, err := sql.Open("duckdb", "")
PanicIfError(err)
PanicIfError(err, config)

duckdb := &Duckdb{
db: db,
Expand Down Expand Up @@ -64,7 +64,7 @@ func NewDuckdb(config *Config) *Duckdb {

for _, query := range bootQueries {
_, err := duckdb.ExecContext(ctx, query, nil)
PanicIfError(err)
PanicIfError(err, config)
}

switch config.StorageType {
Expand All @@ -77,11 +77,11 @@ func NewDuckdb(config *Config) *Duckdb {
"endpoint": config.Aws.S3Endpoint,
"s3Bucket": "s3://" + config.Aws.S3Bucket,
})
PanicIfError(err)
PanicIfError(err, config)

if config.LogLevel == LOG_LEVEL_TRACE {
_, err = duckdb.ExecContext(ctx, "SET enable_http_logging=true", nil)
PanicIfError(err)
PanicIfError(err, config)
}
}

Expand Down Expand Up @@ -143,19 +143,19 @@ func readDuckdbInitFile(config *Config) []string {
LogDebug(config, "DuckDB: No init file found at", config.InitSqlFilepath)
return nil
}
PanicIfError(err)
PanicIfError(err, config)
}

LogInfo(config, "DuckDB: Reading init file", config.InitSqlFilepath)
file, err := os.Open(config.InitSqlFilepath)
PanicIfError(err)
PanicIfError(err, config)
defer file.Close()

lines := []string{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
PanicIfError(scanner.Err())
PanicIfError(scanner.Err(), config)
return lines
}
14 changes: 7 additions & 7 deletions src/iceberg_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,29 +333,29 @@ func (icebergWriter *IcebergWriter) Write(schemaTable IcebergSchemaTable, pgSche
dataDirPath := icebergWriter.storage.CreateDataDir(schemaTable)

parquetFile, err := icebergWriter.storage.CreateParquet(dataDirPath, pgSchemaColumns, loadRows)
PanicIfError(err)
PanicIfError(err, icebergWriter.config)

metadataDirPath := icebergWriter.storage.CreateMetadataDir(schemaTable)

manifestFile, err := icebergWriter.storage.CreateManifest(metadataDirPath, parquetFile)
PanicIfError(err)
PanicIfError(err, icebergWriter.config)

manifestListFile, err := icebergWriter.storage.CreateManifestList(metadataDirPath, parquetFile, manifestFile)
PanicIfError(err)
PanicIfError(err, icebergWriter.config)

metadataFile, err := icebergWriter.storage.CreateMetadata(metadataDirPath, pgSchemaColumns, parquetFile, manifestFile, manifestListFile)
PanicIfError(err)
PanicIfError(err, icebergWriter.config)

err = icebergWriter.storage.CreateVersionHint(metadataDirPath, metadataFile)
PanicIfError(err)
PanicIfError(err, icebergWriter.config)
}

func (icebergWriter *IcebergWriter) DeleteSchemaTable(schemaTable IcebergSchemaTable) {
err := icebergWriter.storage.DeleteSchemaTable(schemaTable)
PanicIfError(err)
PanicIfError(err, icebergWriter.config)
}

func (icebergWriter *IcebergWriter) DeleteSchema(schema string) {
err := icebergWriter.storage.DeleteSchema(schema)
PanicIfError(err)
PanicIfError(err, icebergWriter.config)
}
2 changes: 1 addition & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func start(config *Config) {
queryHandler := NewQueryHandler(config, duckdb, icebergReader)

for {
conn := AcceptConnection(tcpListener)
conn := AcceptConnection(config, tcpListener)
LogInfo(config, "BemiDB: Accepted connection from", conn.RemoteAddr())
postgres := NewPostgres(config, &conn)

Expand Down
47 changes: 27 additions & 20 deletions src/pg_schema_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ type PgSchemaColumn struct {
NumericScale string
DatetimePrecision string
Namespace string
config *Config
}

func NewPgSchemaColumn(config *Config) *PgSchemaColumn {
return &PgSchemaColumn{
config: config,
}
}

type ParquetSchemaField struct {
Expand Down Expand Up @@ -151,7 +158,7 @@ func (pgSchemaColumn *PgSchemaColumn) FormatParquetValue(value string) interface

csvReader := csv.NewReader(strings.NewReader(csvString))
stringValues, err := csvReader.Read()
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)

for _, stringValue := range stringValues {
values = append(values, pgSchemaColumn.parquetPrimitiveValue(stringValue))
Expand Down Expand Up @@ -184,9 +191,9 @@ func (pgSchemaColumn *PgSchemaColumn) toParquetSchemaField() ParquetSchemaField
switch pgSchemaColumn.UdtName {
case "numeric":
scale, err := StringToInt(pgSchemaColumn.NumericScale)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
precision, err := StringToInt(pgSchemaColumn.NumericPrecision)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
if precision > PARQUET_MAX_DECIMAL_PRECISION {
precision = PARQUET_MAX_DECIMAL_PRECISION
} else if precision == 0 {
Expand Down Expand Up @@ -223,87 +230,87 @@ func (pgSchemaColumn *PgSchemaColumn) parquetPrimitiveValue(value string) interf
return trimmedValue
case "int2", "int4":
intValue, err := StringToInt(value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return int32(intValue)
case "int8":
intValue, err := strconv.ParseInt(value, 10, 64)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return intValue
case "xid":
intValue, err := strconv.ParseUint(value, 10, 32)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return intValue
case "xid8":
intValue, err := strconv.ParseUint(value, 10, 64)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return intValue
case "float4":
floatValue, err := strconv.ParseFloat(value, 32)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
if math.IsNaN(floatValue) {
return PARQUET_NAN
}
return float32(floatValue)
case "float8":
floatValue, err := strconv.ParseFloat(value, 64)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
if math.IsNaN(floatValue) {
return PARQUET_NAN
}
return floatValue
case "bool":
boolValue, err := strconv.ParseBool(value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return boolValue
case "timestamp":
if pgSchemaColumn.DatetimePrecision == "6" {
parsedTime, err := time.Parse("2006-01-02 15:04:05.999999", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return parsedTime.UnixMicro()
} else {
parsedTime, err := time.Parse("2006-01-02 15:04:05.999", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return parsedTime.UnixMilli()
}
case "timestamptz":
if pgSchemaColumn.DatetimePrecision == "6" {
parsedTime, err := time.Parse("2006-01-02 15:04:05.999999-07:00", value)
if err != nil {
parsedTime, err = time.Parse("2006-01-02 15:04:05.999999-07", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
}
return parsedTime.UnixMicro()
} else {
parsedTime, err := time.Parse("2006-01-02 15:04:05.999-07:00", value)
if err != nil {
parsedTime, err = time.Parse("2006-01-02 15:04:05.999-07", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
}
return parsedTime.UnixMilli()
}
case "time":
if pgSchemaColumn.DatetimePrecision == "6" {
parsedTime, err := time.Parse("15:04:05.999999", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return int64(-EPOCH_TIME_MS*1000 + parsedTime.UnixMicro())
} else {
parsedTime, err := time.Parse("15:04:05.999", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return -EPOCH_TIME_MS + parsedTime.UnixMilli()
}
case "timetz":
if pgSchemaColumn.DatetimePrecision == "6" {
parsedTime, err := time.Parse("15:04:05.999999-07", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return int64(-EPOCH_TIME_MS*1000 + parsedTime.UnixMicro())
} else {
parsedTime, err := time.Parse("15:04:05.999-07", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return -EPOCH_TIME_MS + parsedTime.UnixMilli()
}
case "date":
parsedTime, err := time.Parse("2006-01-02", value)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
return parsedTime.Unix() / 86400
default:
// User-defined types
Expand Down Expand Up @@ -385,7 +392,7 @@ func (pgSchemaColumn *PgSchemaColumn) icebergPrimitiveType() string {
}

precision, err := StringToInt(pgSchemaColumn.NumericPrecision)
PanicIfError(err)
PanicIfError(err, pgSchemaColumn.config)
if precision > PARQUET_MAX_DECIMAL_PRECISION {
precision = PARQUET_MAX_DECIMAL_PRECISION
}
Expand Down
10 changes: 5 additions & 5 deletions src/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ func NewTcpListener(config *Config) net.Listener {
}

tcpListener, err := net.Listen(network, host+":"+config.Port)
PanicIfError(err)
PanicIfError(err, config)
return tcpListener
}

func AcceptConnection(listener net.Listener) net.Conn {
func AcceptConnection(config *Config, listener net.Listener) net.Conn {
conn, err := listener.Accept()
PanicIfError(err)
PanicIfError(err, config)
return conn
}

Expand Down Expand Up @@ -177,10 +177,10 @@ func (postgres *Postgres) writeMessages(messages ...pgproto3.Message) {
var err error
for _, message := range messages {
buf, err = message.Encode(buf)
PanicIfError(err, "Error encoding messages")
PanicIfError(err, nil, "Error encoding messages")
}
_, err = (*postgres.conn).Write(buf)
PanicIfError(err, "Error writing messages")
PanicIfError(err, nil, "Error writing messages")
}

func (postgres *Postgres) writeError(err error) {
Expand Down
4 changes: 2 additions & 2 deletions src/query_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,15 +385,15 @@ func (queryHandler *QueryHandler) HandleExecuteQuery(message *pgproto3.Execute,
func (queryHandler *QueryHandler) createSchemas() {
ctx := context.Background()
schemas, err := queryHandler.icebergReader.Schemas()
PanicIfError(err)
PanicIfError(err, queryHandler.config)

for _, schema := range schemas {
_, err := queryHandler.duckdb.ExecContext(
ctx,
"CREATE SCHEMA IF NOT EXISTS \"$schema\"",
map[string]string{"schema": schema},
)
PanicIfError(err)
PanicIfError(err, queryHandler.config)
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/query_remapper_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,29 +185,29 @@ func (remapper *QueryRemapperTable) RemapTableFunctionCall(rangeFunction *pgQuer

func (remapper *QueryRemapperTable) reloadIceberSchemaTables() {
newIcebergSchemaTables, err := remapper.icebergReader.SchemaTables()
PanicIfError(err)
PanicIfError(err, remapper.config)

ctx := context.Background()
for _, icebergSchemaTable := range newIcebergSchemaTables.Values() {
if !remapper.icebergSchemaTables.Contains(icebergSchemaTable) {
icebergTableFields, err := remapper.icebergReader.TableFields(icebergSchemaTable)
PanicIfError(err)
PanicIfError(err, remapper.config)

var sqlColumns []string
for _, icebergTableField := range icebergTableFields {
sqlColumns = append(sqlColumns, icebergTableField.ToSql())
}

_, err = remapper.duckdb.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+icebergSchemaTable.Schema, nil)
PanicIfError(err)
PanicIfError(err, remapper.config)
_, err = remapper.duckdb.ExecContext(ctx, "CREATE TABLE IF NOT EXISTS "+icebergSchemaTable.String()+" ("+strings.Join(sqlColumns, ", ")+")", nil)
PanicIfError(err)
PanicIfError(err, remapper.config)
}
}
for _, icebergSchemaTable := range remapper.icebergSchemaTables.Values() {
if !newIcebergSchemaTables.Contains(icebergSchemaTable) {
_, err = remapper.duckdb.ExecContext(ctx, "DROP TABLE IF EXISTS "+icebergSchemaTable.String(), nil)
PanicIfError(err)
PanicIfError(err, remapper.config)
}
}

Expand All @@ -224,7 +224,7 @@ func (remapper *QueryRemapperTable) upsertPgStatUserTables(icebergSchemaTables S
"DELETE FROM pg_stat_user_tables",
"INSERT INTO pg_stat_user_tables VALUES " + strings.Join(values, ", "),
})
PanicIfError(err)
PanicIfError(err, remapper.config)
}

// System pg_* tables
Expand Down
4 changes: 2 additions & 2 deletions src/storage_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (storage *StorageBase) WriteParquetFile(fileWriter source.ParquetFile, pgSc
schemaMap["Fields"] = append(schemaMap["Fields"].([]map[string]interface{}), fieldMap)
}
schemaJson, err := json.Marshal(schemaMap)
PanicIfError(err)
PanicIfError(err, storage.config)

LogDebug(storage.config, "Parquet schema:", string(schemaJson))
parquetWriter, err := writer.NewJSONWriter(string(schemaJson), fileWriter, PARQUET_PARALLEL_NUMBER)
Expand All @@ -106,7 +106,7 @@ func (storage *StorageBase) WriteParquetFile(fileWriter source.ParquetFile, pgSc
rowMap[pgSchemaColumns[i].ColumnName] = pgSchemaColumns[i].FormatParquetValue(rowValue)
}
rowJson, err := json.Marshal(rowMap)
PanicIfError(err)
PanicIfError(err, storage.config)

if err = parquetWriter.Write(string(rowJson)); err != nil {
return 0, fmt.Errorf("Write error: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions src/storage_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (storage *StorageLocal) IcebergTableFields(icebergSchemaTable IcebergSchema

func (storage *StorageLocal) absoluteIcebergPath(relativePaths ...string) string {
execPath, err := os.Getwd()
PanicIfError(err)
PanicIfError(err, storage.config)

return filepath.Join(execPath, storage.config.StoragePath, filepath.Join(relativePaths...))
}
Expand Down Expand Up @@ -111,15 +111,15 @@ func (storage *StorageLocal) CreateDataDir(schemaTable IcebergSchemaTable) strin
tablePath := storage.tablePath(schemaTable)
dataPath := filepath.Join(tablePath, "data")
err := os.MkdirAll(dataPath, os.ModePerm)
PanicIfError(err)
PanicIfError(err, storage.config)
return dataPath
}

func (storage *StorageLocal) CreateMetadataDir(schemaTable IcebergSchemaTable) string {
tablePath := storage.tablePath(schemaTable)
metadataPath := filepath.Join(tablePath, "metadata")
err := os.MkdirAll(metadataPath, os.ModePerm)
PanicIfError(err)
PanicIfError(err, storage.config)
return metadataPath
}

Expand Down
Loading

0 comments on commit bac6b91

Please sign in to comment.