Skip to content

Commit

Permalink
Merge pull request #51 from MinaFoundation/postgres
Browse files Browse the repository at this point in the history
PM-1546 - Add option to keep submissions in PostgreSQL
  • Loading branch information
piotr-iohk authored Apr 29, 2024
2 parents ea82752 + f879fbb commit 9a2f2fd
Show file tree
Hide file tree
Showing 12 changed files with 568 additions and 108 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ The program can be configured using either a JSON configuration file or environm
},
"filesystem": {
"path": "your_filesystem_path"
},
"postgresql": {
"user": "postgres",
"password": "postgres",
"host": "localhost",
"port": 5432,
"database": "delegation_program",
"sslmode": "require"
}
}
```
Expand Down Expand Up @@ -141,7 +149,18 @@ If the `CONFIG_FILE` environment variable is not set, the program will fall back
5. **Local File System Configuration**:
- `CONFIG_FILESYSTEM_PATH` - Set this to the path where you want the local file system to point.

6. **Test settings**
6. **PostgreSQL Configuration**

If this storage backend is configured it is assumed that submissions are written into `submissions` table in the uptime-service-validation (coordinator) component. In this mode we are not storing `raw_block` in the database.

- `POSTGRES_HOST` - Hostname or IP address where your PostgreSQL server is running.
- `POSTGRES_PORT` - Port number on which PostgreSQL is listening.
- `POSTGRES_DB` - The name of the database to connect to. This is the uptime-service-validation database.
- `POSTGRES_USER` - The username with which to connect to the database.
- `POSTGRES_PASSWORD` - The password for the database user.
- `POSTGRES_SSLMODE` - The mode for SSL connectivity (e.g., `disable`, `require`, `verify-ca`, `verify-full`). Default is `require` for secure setups.

7. **Test settings**

These settings are useful for debugging or testing under controlled conditions. Always revert to secure and sensible defaults before moving to a production environment to maintain the security and reliability of your system.

Expand Down
2 changes: 1 addition & 1 deletion dockerfiles/Dockerfile-delegation-backend
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.20
FROM golang:1.21

# Set the Current Working Directory inside the container
WORKDIR $GOPATH/src/delegation_backend
Expand Down
18 changes: 18 additions & 0 deletions src/cmd/delegation_backend/main_bpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func main() {
app.Log = log
awsctx := AwsContext{}
kc := KeyspaceContext{}
pctx := PostgreSQLContext{}
app.VerifySignatureDisabled = appCfg.VerifySignatureDisabled
if app.VerifySignatureDisabled {
log.Warnf("Signature verification is disabled, it is not recommended to run the delegation backend in this mode!")
Expand Down Expand Up @@ -72,13 +73,30 @@ func main() {
log.Infof("storage backend: Local File System")
}

if appCfg.PostgreSQL != nil {
log.Infof("storage backend: PostgreSQL")
db, err := NewPostgreSQL(appCfg.PostgreSQL)
if err != nil {
log.Fatalf("Error initializing PostgreSQL: %v", err)
}
defer db.Close()

pctx = PostgreSQLContext{
DB: db,
Log: log,
}
}

app.Save = func(objs ObjectsToSave) {
if appCfg.Aws != nil {
awsctx.S3Save(objs)
}
if appCfg.AwsKeyspaces != nil {
kc.KeyspaceSave(objs)
}
if appCfg.PostgreSQL != nil {
pctx.PostgreSQLSave(objs)
}
if appCfg.LocalFileSystem != nil {
LocalFileSystemSave(objs, appCfg.LocalFileSystem.Path, log)
}
Expand Down
34 changes: 34 additions & 0 deletions src/delegation_backend/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,30 @@ func LoadEnv(log logging.EventLogger) AppConfig {
}
}

// PostgreSQL configurations
if postgresHost := os.Getenv("POSTGRES_HOST"); postgresHost != "" {
postgresUser := getEnvChecked("POSTGRES_USER", log)
postgresPassword := getEnvChecked("POSTGRES_PASSWORD", log)
postgresDBName := getEnvChecked("POSTGRES_DB", log)
postgresPort, err := strconv.Atoi(getEnvChecked("POSTGRES_PORT", log))
if err != nil {
log.Fatalf("Error parsing POSTGRES_PORT: %v", err)
}
postgresSSLMode := os.Getenv("POSTGRES_SSLMODE")
if postgresSSLMode == "" {
postgresSSLMode = "require"
}

config.PostgreSQL = &PostgreSQLConfig{
Host: postgresHost,
Port: postgresPort,
User: postgresUser,
Password: postgresPassword,
DBName: postgresDBName,
SSLMode: postgresSSLMode,
}
}

config.NetworkName = networkName
config.GsheetId = gsheetId
config.DelegationWhitelistList = delegationWhitelistList
Expand Down Expand Up @@ -187,6 +211,15 @@ type LocalFileSystemConfig struct {
Path string `json:"path"`
}

type PostgreSQLConfig struct {
Host string `json:"host"`
Port int `json:"port"`
User string `json:"user"`
Password string `json:"password"`
DBName string `json:"database"`
SSLMode string `json:"sslmode"`
}

type AppConfig struct {
NetworkName string `json:"network_name"`
GsheetId string `json:"gsheet_id"`
Expand All @@ -197,4 +230,5 @@ type AppConfig struct {
Aws *AwsConfig `json:"aws,omitempty"`
AwsKeyspaces *AwsKeyspacesConfig `json:"aws_keyspaces,omitempty"`
LocalFileSystem *LocalFileSystemConfig `json:"filesystem,omitempty"`
PostgreSQL *PostgreSQLConfig `json:"postgresql,omitempty"`
}
104 changes: 4 additions & 100 deletions src/delegation_backend/aws_keyspaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package delegation_backend

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -114,72 +111,6 @@ func sigv4Authentication(config *AwsKeyspacesConfig) (sigv4.AwsAuthenticator, er
return auth, nil
}

type Submission struct {
BlockHash string `json:"block_hash"`
SubmittedAtDate string // Extracted from filePath
SubmittedAt time.Time // Extracted from filePath and parsed
CreatedAt time.Time `json:"created_at"`
RemoteAddr string `json:"remote_addr"`
PeerId string `json:"peer_id"`
Submitter string `json:"submitter"` // is base58check-encoded submitter's public key
RawBlock []byte `json:"raw_block,omitempty"`
SnarkWork []byte `json:"snark_work,omitempty"`
GraphqlControlPort int `json:"graphql_control_port,omitempty"`
BuiltWithCommitSha string `json:"built_with_commit_sha,omitempty"`
}

type Block struct {
BlockHash string
RawBlock []byte
}

func (kc *KeyspaceContext) parseSubmissionBytes(data []byte, filePath string) (*Submission, error) {
// Extract information from filePath
// kc.Log.Debugf("filePath: %s\n", filePath)
filePathParts := strings.Split(filePath, "/")
if len(filePathParts) < 3 {
return nil, fmt.Errorf("invalid file path: %s", filePath)
}
submittedAtDate := filePathParts[1]
submittedAtWithSubmitter := strings.TrimSuffix(filePathParts[2], ".json")
lastHyphenIndex := strings.LastIndex(submittedAtWithSubmitter, "-")
submittedAtStr := submittedAtWithSubmitter[:lastHyphenIndex]

// Parse submittedAtStr string into time.Time
submittedAt, err := time.Parse("2006-01-02T15:04:05Z", submittedAtStr)
if err != nil {
return nil, fmt.Errorf("error parsing submitted_at string: %w", err)
}

// Parse JSON contents
var submission Submission
err = json.Unmarshal(data, &submission)
if err != nil {
return nil, fmt.Errorf("error unmarshaling submission JSON: %w", err)
}

// Populate additional fields from filePath
submission.SubmittedAtDate = submittedAtDate
submission.SubmittedAt = submittedAt
// kc.Log.Debugf("submission: %v\n", submission)

return &submission, nil
}

func (kc *KeyspaceContext) parseBlockBytes(data []byte, filePath string) (*Block, error) {
// Extract the filename without the extension to use as the BlockHash
// kc.Log.Debugf("filePath: %s\n", filePath)
filename := filepath.Base(filePath)
blockHash := strings.TrimSuffix(filename, filepath.Ext(filename))

block := &Block{
BlockHash: blockHash,
RawBlock: data,
}
// kc.Log.Debugf("block: %v\n", block)
return block, nil
}

type KeyspaceContext struct {
Session *gocql.Session
Keyspace string
Expand Down Expand Up @@ -266,37 +197,10 @@ func (kc *KeyspaceContext) insertSubmissionWithRawBlock(submission *Submission)

// KeyspaceSave saves the provided objects into Amazon Keyspaces.
func (kc *KeyspaceContext) KeyspaceSave(objs ObjectsToSave) {
var submissionToSave *Submission = &Submission{}
for path, bs := range objs {
if strings.HasPrefix(path, "submissions/") {
submission, err := kc.parseSubmissionBytes(bs, path)
if err != nil {
kc.Log.Errorf("KeyspaceSave: Error parsing submission JSON: %v", err)
continue
}
submissionToSave.BlockHash = submission.BlockHash
submissionToSave.CreatedAt = submission.CreatedAt
submissionToSave.GraphqlControlPort = submission.GraphqlControlPort
submissionToSave.PeerId = submission.PeerId
submissionToSave.RemoteAddr = submission.RemoteAddr
submissionToSave.SnarkWork = submission.SnarkWork
submissionToSave.SubmittedAt = submission.SubmittedAt
submissionToSave.SubmittedAtDate = submission.SubmittedAtDate
submissionToSave.Submitter = submission.Submitter
submissionToSave.BuiltWithCommitSha = submission.BuiltWithCommitSha

} else if strings.HasPrefix(path, "blocks/") {
block, err := kc.parseBlockBytes(bs, path)
if err != nil {
kc.Log.Errorf("KeyspaceSave: Error parsing block file: %v", err)
continue
}
submissionToSave.RawBlock = block.RawBlock
submissionToSave.BlockHash = block.BlockHash
} else {
kc.Log.Errorf("KeyspaceSave: Unknown path format: %s", path)
}

submissionToSave, err := objectToSaveToSubmission(objs, kc.Log)
if err != nil {
kc.Log.Errorf("KeyspaceSave: Error preparing submission for saving: %v", err)
return
}
kc.Log.Infof("KeyspaceSave: Saving submission for block: %v, submitter: %v, submitted_at: %v", submissionToSave.BlockHash, submissionToSave.Submitter, submissionToSave.SubmittedAt)
if err := kc.insertSubmission(submissionToSave); err != nil {
Expand Down
94 changes: 94 additions & 0 deletions src/delegation_backend/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package delegation_backend

import (
"database/sql"
"fmt"

logging "github.com/ipfs/go-log/v2"
_ "github.com/lib/pq"
)

type PostgreSQLContext struct {
DB *sql.DB
Log *logging.ZapEventLogger
}

func NewPostgreSQL(cfg *PostgreSQLConfig) (*sql.DB, error) {
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.DBName, cfg.SSLMode)
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
return db, nil
}

func (ctx *PostgreSQLContext) insertSubmission(submission *Submission) error {
// if SnarkWork is empty, do not insert it into the database
if len(submission.SnarkWork) == 0 {
return ctx.insertSubmissionWithoutSnarkWork(submission)
}
return ctx.insertSubmissionWithSnarkWork(submission)
}

func (ctx *PostgreSQLContext) insertSubmissionWithoutSnarkWork(submission *Submission) error {
query := `INSERT INTO submissions
(submitted_at_date,
submitted_at,
submitter,
created_at,
block_hash,
remote_addr,
peer_id,
graphql_control_port,
built_with_commit_sha)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
_, err := ctx.DB.Exec(query, submission.SubmittedAtDate, submission.SubmittedAt,
submission.Submitter, submission.CreatedAt, submission.BlockHash,
submission.RemoteAddr, submission.PeerId, submission.GraphqlControlPort,
submission.BuiltWithCommitSha)
return err
}

func (ctx *PostgreSQLContext) insertSubmissionWithSnarkWork(submission *Submission) error {
query := `INSERT INTO submissions
(submitted_at_date,
submitted_at,
submitter,
created_at,
block_hash,
remote_addr,
peer_id,
graphql_control_port,
built_with_commit_sha,
snark_work)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`
_, err := ctx.DB.Exec(query, submission.SubmittedAtDate, submission.SubmittedAt,
submission.Submitter, submission.CreatedAt, submission.BlockHash,
submission.RemoteAddr, submission.PeerId, submission.GraphqlControlPort,
submission.BuiltWithCommitSha, submission.SnarkWork)
return err
}

func (ctx *PostgreSQLContext) PostgreSQLSave(objs ObjectsToSave) {
submissionToSave, err := objectToSaveToSubmission(objs, ctx.Log)
if err != nil {
ctx.Log.Errorf("PostgreSQLSave: Error preparing submission for saving: %v", err)
return
}

if err := ctx.insertSubmission(submissionToSave); err != nil {
// if err contains uq_submissions_submitter_date then we can ignore it
// because it means that the submission is already in the database
if err.Error() == "pq: duplicate key value violates unique constraint \"uq_submissions_submitter_date\"" {
ctx.Log.Infof("PostgreSQLSave: Submission for submitter: %v at %v already exists", submissionToSave.Submitter, submissionToSave.SubmittedAt)
return
}
ctx.Log.Errorf("PostgreSQLSave: Error saving submission to PostgreSQL: %v", err)
} else {
ctx.Log.Infof("PostgreSQLSave: Successfully saved submission for submitter: %v at %v", submissionToSave.Submitter, submissionToSave.SubmittedAt)
}
}
Loading

0 comments on commit 9a2f2fd

Please sign in to comment.