This repository is currently being migrated. It's locked while the migration is in progress.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
init.go
175 lines (139 loc) · 4.82 KB
/
init.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package main
import (
"flag"
"fmt"
"log"
"os"
"github.com/storageos/init/info"
"github.com/storageos/init/info/k8s"
"github.com/storageos/init/script"
"github.com/storageos/init/script/runner"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)
const (
daemonSetNameEnvVar = "DAEMONSET_NAME"
daemonSetNamespaceEnvVar = "DAEMONSET_NAMESPACE"
nodeImageEnvVar = "NODE_IMAGE"
)
func main() {
scriptsDir := flag.String("scripts", "", "absolute path of the scripts directory")
dsName := flag.String("dsName", "", "name of the StorageOS DaemonSet")
dsNamespace := flag.String("dsNamespace", "", "namespace of the StorageOS DaemonSet")
nodeImage := flag.String("nodeImage", "", "container image of StorageOS Node, use when running out of k8s")
flag.Parse()
// StorageOS node container image.
var storageosImage string
// Abort if no scripts directory is provided.
if *scriptsDir == "" {
log.Println("no scripts directory specified, pass scripts dir with -scripts flag.")
os.Exit(1)
}
// This is in k8s environment.
kubeclient, err := newK8SClient()
if err != nil {
log.Fatal(err)
}
// Attempt to get storageos node image.
if *nodeImage == "" {
var imageInfo info.ImageInfoer
// Create a k8s image info.
name, namespace := getParamsForK8SImageInfo(*dsName, *dsNamespace)
imageInfo = k8s.NewImageInfo(kubeclient).SetDaemonSet(name, namespace)
// Get image.
storageosImage, err = imageInfo.GetContainerImage(k8s.DefaultContainerName)
if err != nil {
log.Fatal(err)
}
} else {
storageosImage = *nodeImage
}
// Abort if storageos node image is still unknown.
if storageosImage == "" {
log.Println("unknown storageos node image, pass node image with -nodeImage flag.")
os.Exit(1)
}
// scriptEnvVar is the env vars passed to all the scripts.
scriptEnvVar := map[string]string{}
scriptEnvVar[nodeImageEnvVar] = storageosImage
// Get list of all the scripts.
allScripts, err := script.GetAllScripts(*scriptsDir)
if err != nil {
log.Fatalf("failed to get list of scripts: %v", err)
}
log.Println("scripts:", allScripts)
// Create a script runner.
run := runner.NewRun()
// Run all the scripts.
if err := runScripts(run, allScripts, scriptEnvVar); err != nil {
log.Fatalf("init failed: %v", err)
}
node, err := kubeclient.CoreV1().Nodes().Get(os.Getenv("NODE"), v1.GetOptions{})
if err != nil {
log.Fatalf("failed to get node from name \"%s\": %v", os.Getenv("NODE"), err)
}
file, err := os.Create("/var/lib/storageos/init_envs.sh")
if err != nil {
log.Fatalf("failed to create file init_envs.sh: %v", err)
}
defer file.Close()
// add env variables as key/value pairs
file.WriteString(fmt.Sprintf("NODE_TOPOLOGY_ZONE=%s\n", node.Labels["topology.kubernetes.io/zone"]))
log.Printf("stored in env file: NODE_TOPOLOGY_ZONE=%s", node.Labels["topology.kubernetes.io/zone"])
}
// NewK8SClient attempts to get k8s cluster configuration and return a new
// kubernetes client.
func newK8SClient() (kubernetes.Interface, error) {
cfg, err := restclient.InClusterConfig()
if err != nil {
return nil, err
}
return kubernetes.NewForConfig(cfg)
}
// getParamsForK8SImageInfo returns the name and namespace to be used in k8s
// ImageInfo.
func getParamsForK8SImageInfo(dsName, dsNamespace string) (name, namespace string) {
// If DaemonSet name is not provided, read from env var.
if dsName == "" {
dsName = os.Getenv(daemonSetNameEnvVar)
// If DaemonSet name is still empty, use the default DaemonSet name.
if dsName == "" {
dsName = k8s.DefaultDaemonSetName
}
}
// If DaemonSet namespace is not provided, read from env var.
if dsNamespace == "" {
dsNamespace = os.Getenv(daemonSetNamespaceEnvVar)
// If DaemonSet namespace still empty, use the default StorageOS
// deployment namespace.
if dsNamespace == "" {
dsNamespace = k8s.DefaultDaemonSetNamespace
}
}
return dsName, dsNamespace
}
// runScripts takes a list of scripts and env vars, and runs the scripts
// sequentially. The error returned by the script execution is logged as k8s pod
// event.
// Any preliminary checks that need to be performed before running a script can
// be performed here.
func runScripts(run script.Runner, scripts []string, envVars map[string]string) error {
for _, script := range scripts {
// TODO: Check if the script has any preliminary checks to be performed
// before execution.
log.Printf("exec: %s", script)
_, stderr, err := run.RunScript(script, envVars)
// If stderr contains message, log and issue warning event.
if len(stderr) > 0 {
// log.Printf("[STDERR] %s: \n%s\n", script, string(stderr))
// Create k8s warning event.
// Issue a warning event with the stderr log.
}
if err != nil {
// Create a k8s failure events.
return fmt.Errorf("script %q failed: %v", script, err)
}
}
return nil
}