forked from glycerine/goq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgoq.go
1921 lines (1647 loc) · 56.7 KB
/
goq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package main
// copyright(c) 2014, Jason E. Aten
//
// goq : a simple queueing system in go; qsub replacement.
//
import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
"net"
"os"
"os/exec"
"strings"
"sync"
"time"
schema "github.com/glycerine/goq/schema"
//nn "github.com/glycerine/go-nanomsg"
nn "github.com/go-mangos/mangos/compat"
)
// In this model of work dispatch, there are three roles: submitter(s), a server, and worker(s).
//
// The JobServer handles 4 essential types of job messages (marked with ***),
// and many other acks/side info requests. But these four are the
// most important/fundamental.
//
/* JOBMSG_INITIALSUBMIT JobMsg = 0 // *** submitter requests job be queued/started
JOBMSG_REQUESTFORWORK = 2 // *** worker requests a new job (msg and workeraddr only)
JOBMSG_DELEGATETOWORKER = 3 // *** worker is sent job with Cmd and Dir filled in.
JOBMSG_FINISHEDWORK = 6 // *** worker replies with finished job.
*/
// GoqExeName is the name of the goq executable. It is passed to exec.Command
// when launching an external server ("serve") and when checking the installed
// version ("version").
const GoqExeName = "goq"
// Verbose turns on tons of debug output: VPrintf only prints when it is true
// (see also WorkerVerbose).
var Verbose bool
// WebDebug, when true, makes NewJobServ create a WebServer (a debug/heap/profile
// webserver on a port) and makes Shutdown stop it. Note that it defaults to true.
var WebDebug bool = true
// ShowSig is for debugging signature issues.
var ShowSig bool
// AesOff is not referenced in this part of the file.
// NOTE(review): presumably disables AES encryption of messages -- confirm against its users.
var AesOff bool
// DefaultFinishedRingMaxLen is the number of finished job records to retain in a
// ring buffer. Oldest are discarded when full.
var DefaultFinishedRingMaxLen = 1000
// init seeds the global math/rand source. The seed is the sum of the current
// time in nanoseconds, this host's external IP (as an integer), and a value
// from CryptoRandInt64, so that processes started at the same instant on
// different hosts still get different sequences.
func init() {
	seed := time.Now().UnixNano()
	seed += int64(GetExternalIPAsInt())
	seed += CryptoRandInt64()
	rand.Seed(seed)
}
// VPrintf is the verbose-only logger: it forwards its arguments to TSPrintf
// when the package-level Verbose flag is set, and is a no-op otherwise.
func VPrintf(format string, a ...interface{}) {
	if !Verbose {
		return
	}
	TSPrintf(format, a...)
}
// control is a command delivered to the JobServ event loop on its Ctrl channel.
type control int

const (
	// nothing is the zero value; it requests no action.
	nothing control = iota
	// die asks the event loop to call Shutdown and exit.
	die
	// stateToDisk asks the event loop to persist the server state.
	stateToDisk
)

// String returns the symbolic name of cmd for log output. Values that have no
// name are rendered as their decimal number.
func (cmd control) String() string {
	switch cmd {
	case nothing:
		return "nothing"
	case die:
		return "die"
	case stateToDisk:
		return "stateToDisk"
	}
	// %d does not invoke String(), so this cannot recurse.
	return fmt.Sprintf("%d", cmd)
}
// SocketCountPushCache counts push sockets held by PushCache values: it is
// incremented by NewPushCache and decremented by (*PushCache).Close. It is
// printed for diagnosis when socket creation fails.
var SocketCountPushCache int
// PushCache caches a push socket to one peer address so it can be reused.
// The socket may be closed (pushSock == nil) to limit the number of open
// sockets; DemandPushSock re-creates it from Addr and cfg when needed.
type PushCache struct {
Name string
Addr string // even port number (mnemonic: stdout is 0/even)
pushSock *nn.Socket // from => pull; nil after Close until DemandPushSock is called
cfg *Config // configuration handed to MkPushNN when (re-)creating the socket
}
// NewPushCache opens a push socket to addr (via MkPushNN) and returns a
// PushCache wrapping it, bumping SocketCountPushCache on success.
//
// If the socket cannot be created, this function does NOT return: it prints
// the socket count, the error, the pid and errno, dumps the output of
// `lsof -p <pid>` and `netstat -an`, and then blocks forever so the process
// can be inspected with a debugger.
func NewPushCache(name, addr string, cfg *Config) *PushCache {
	//var count int = SocketCountPushCache
	//fmt.Printf("\n SocketCountPushCache = %d\n", count)
	sock, err := MkPushNN(addr, cfg, false)
	if err != nil {
		pid := os.Getpid()
		fmt.Printf("\n SocketCountPushCache = %d, err = '%s'. Freezing here for debug inspection. pid = %d. errno = %d\n", SocketCountPushCache, err, pid, GetErrno())

		// Best-effort diagnostics; failures of the helper commands are ignored.
		lsofOut, _ := exec.Command("lsof", "-p", fmt.Sprintf("%d", pid)).Output()
		fmt.Printf("lsof: '%s'\n", string(lsofOut))
		netstatOut, _ := exec.Command("netstat", "-an").Output()
		fmt.Printf("netstat: '%s'\n", string(netstatOut))

		// Deliberate freeze for debug inspection.
		select {}
		// Unreachable while the freeze above is in place; kept so that removing
		// the select{} turns the failure back into a panic.
		panic(err) // panic: too many open files here.

		// researching the too many open files upon restoring from state file:
		//
		// key advice:
		/* as root:
		echo "\n# increase system IP port limits" >> /etc/sysctl.conf
		echo "net.ipv4.ip_local_port_range = 10000 65535" >> /etc/sysctl.conf
		echo "net.ipv4.tcp_fin_timeout = 10" >> /etc/sysctl.conf
		echo "net.core.somaxconn = 1024" >> /etc/sysctl.conf
		*/
		/* or setting them before reboot:
		sudo sysctl -w net.ipv4.ip_local_port_range="1024 65535"
		sudo sysctl -w net.ipv4.tcp_fin_timeout=10
		sudo sysctl -w net.core.somaxconn=1024
		*/
		// # should yield 5100 sockets/sec okay. But we still can't start that quickly.
		//
		// from:
		// http://stackoverflow.com/questions/410616/increasing-the-maximum-number-of-tcp-ip-connections-in-linux
		//
		// Maximum number of connections are impacted by certain limits on both
		// client & server sides, albeit a little differently.
		//
		// On the client side: Increase the ephemeral port range, and decrease the tcp_fin_timeout
		//
		// To find out the default values:
		//
		// sysctl net.ipv4.ip_local_port_range
		// sysctl net.ipv4.tcp_fin_timeout
		// The ephemeral port range defines the maximum number of outbound sockets a
		// host can create from a particular I.P. address. The fin_timeout defines
		// the minimum time these sockets will stay in TIME_WAIT state (unusable
		// after being used once). Usual system defaults are:
		//
		// net.ipv4.ip_local_port_range = 32768 61000
		// net.ipv4.tcp_fin_timeout = 60
		//
		// This basically means your system cannot guarantee more than (61000 - 32768) / 60 =
		// 470 sockets [per second (or minute?)]. If you are not happy with that, you could
		// begin with increasing the port_range. Setting the range to 15000 61000 is pretty
		// common these days. You could further increase the availability by decreasing the
		// fin_timeout. Suppose you do both, you should see over 1500 outbound connections,
		// more readily.
		//
		// Added this in my edit:
		// *The above should not be interpreted as the factors impacting system capability
		// for making outbound connections / second. But rather these factors affect system's
		// ability to handle concurrent connections in a sustainable manner for large periods
		// of activity.*
		//
		// Default Sysctl values on a typical linux box for tcp_tw_recycle & tcp_tw_reuse would be
		//
		// net.ipv4.tcp_tw_recycle = 0
		// net.ipv4.tcp_tw_reuse = 0
		// These do not allow a connection in wait state after use, and force them to last the
		// complete time_wait cycle. I recommend setting them to:
		//
		// net.ipv4.tcp_tw_recycle = 1
		// net.ipv4.tcp_tw_reuse = 1
		// This allows fast cycling of sockets in time_wait state and re-using them. But before
		// you do this change make sure that this does not conflict with the protocols that you
		// would use for the application that needs these sockets.
		//
		// On the Server Side: The net.core.somaxconn value has an important role. It limits
		// the maximum number of requests queued to a listen socket. If you are sure of your
		// server application's capability, bump it up from default 128 to something like
		// 128 to 1024. Now you can take advantage of this increase by modifying the listen
		// backlog variable in your application's listen call, to an equal or higher integer.
		//
		// txqueuelen parameter of your ethernet cards also have a role to play. Default values
		// are 1000, so bump them up to 5000 or even more if your system can handle it.
		//
		// Similarly bump up the values for net.core.netdev_max_backlog and net.ipv4.tcp_max_syn_backlog.
		// Their default values are 1000 and 1024 respectively.
		//
		// Now remember to start both your client and server side applications by increasing the
		// FD ulimits, in the shell.
		//
		// Besides the above one more popular technique used by programmers is to reduce the
		// number of tcp write calls. My own preference is to use a buffer wherein I push the
		// data I wish to send to the client, and then at appropriate points I write out the
		// buffered data into the actual socket. This technique allows me to use large data
		// packets, reduce fragmentation, reduces my CPU utilization both in the userland at
		// kernel-level.

		// still running out of resources, 'too many open files' when trying to make a new socket.
		/*
			try raising max_map_count:
			https://my.vertica.com/docs/CE/5.1.1/HTML/index.htm#12962.htm
			http://stackoverflow.com/questions/11683850/how-much-memory-could-vm-use-in-linux
			sysctl vm.max_map_count
			vm.max_map_count = 65530
			#may be too low
			echo 65535 > /proc/sys/vm/max_map_count
			echo "vm.max_map_count = 16777216" | tee -a /etc/sysctl.conf
			sudo sysctl -p
			#logout and back in
		*/
	}

	SocketCountPushCache++
	return &PushCache{
		Name:     name,
		Addr:     addr,
		cfg:      cfg,
		pushSock: sock,
	}
}
// DemandPushSock returns the cached push socket, re-creating it on demand.
// Re-creation is needed because sockets may be closed (see Close) to keep
// from using too many at once.
//
// It panics if the socket cannot be created (typically "too many open files").
func (p *PushCache) DemandPushSock() *nn.Socket {
	if p.pushSock != nil {
		return p.pushSock
	}
	sock, err := MkPushNN(p.Addr, p.cfg, false)
	if err != nil {
		panic(err) // panic: too many open files here.
	}
	// Close decrements SocketCountPushCache for every socket it shuts down,
	// so a re-created socket must be counted too or the counter drifts
	// negative across close/re-open cycles.
	SocketCountPushCache++
	p.pushSock = sock
	return p.pushSock
}
// Close shuts down the cached push socket, if any, and marks the cache as
// empty so that DemandPushSock can re-create the socket later. Calling Close
// on a cache whose socket is already closed is a no-op.
func (p *PushCache) Close() {
	if p.pushSock == nil {
		// Nothing is open: avoid a nil dereference and do not decrement the
		// socket counter a second time.
		return
	}
	// SetLinger is essential or else cancel and immo tests which need submit-replies will fail.
	p.pushSock.SetLinger(2 * time.Second)
	p.pushSock.Close()
	p.pushSock = nil
	SocketCountPushCache--
}
// Job represents a job to perform, and is our universal message type:
// submissions, requests for work, acks, pings and cancellations are all
// carried as a Job whose Msg field says which kind of message it is.
type Job struct {
Id int64 // assigned only by the server (JobServ.NewJobId); 0 until then.
Msg schema.JobMsg // the kind of message this Job carries.
Aboutjid int64 // in acksubmit, this holds the jobid of the job on the runq, so that Id can be unique and monotonic.
Cmd string
Args []string
Out []string // output lines; the server writes one per line to the job's out.NNNNN file.
Env []string
Err string
HadError bool
Host string
Stm int64 // copied from the worker's ping ack, along with Pid, as info about the running process.
Etm int64
Elapsec int64
Status string
Subtime int64
Pid int64 // see Stm.
Dir string // directory on the submit host; the server tries to write output beneath it.
Submitaddr string // submitter's address; the server caches a push socket for it.
Serveraddr string
Workeraddr string // worker's address; the server caches a push socket for it.
Finishaddr []string // who, if anyone, you want notified upon job completion. JOBMSG_JOBFINISHEDNOTICE will be sent.
Signature string
IsLocal bool
Cancelled bool // set on a finished job that was cancelled.
ArrayId int64
GroupId int64
Delegatetm int64
Lastpingtm int64 // UnixNano time of the last ping ack received for this running job.
Unansweredping int64 // reset to 0 whenever a ping ack arrives.
Sendtime int64 // nanoseconds since the Unix epoch (String() decodes it that way).
Sendernonce int64
// not serialized, just used
// for routing
destinationSock *nn.Socket
Runinshell bool
MaxShow int64 // for snapshot requests: passed to AssembleSnapShot.
CmdOpts uint64
}
// String renders the most useful routing fields of a Job for log output.
// It is safe to call on a nil *Job. Sendtime is shown as a time.Time built
// from its nanosecond value.
func (j *Job) String() string {
	if j == nil {
		return "&Job{nil}"
	}
	sent := time.Unix(j.Sendtime/1e9, j.Sendtime%1e9)
	return fmt.Sprintf(
		"&Job{Id:%d Msg:%s Aboutjid:%d Cmd:%s Args:%#v Out:%#v Submitaddr:%s Serveraddr: %s Workeraddr: %s Sendtime: %s Sendernonce: %x}",
		j.Id, j.Msg, j.Aboutjid, j.Cmd, j.Args, j.Out,
		j.Submitaddr, j.Serveraddr, j.Workeraddr,
		sent, j.Sendernonce,
	)
}
// NewJob returns a Job with Id 0 and empty (non-nil) Args, Out, Env and
// Finishaddr slices, already passed through StampJob.
func NewJob() *Job {
	j := new(Job)
	j.Id = 0 // only server should assign job.Id, until then, should be 0.
	j.Args = []string{}
	j.Out = []string{}
	j.Env = []string{}
	j.Finishaddr = []string{}

	// also in sendZjob, but here to support local job sends.
	StampJob(j)
	return j
}
// NewJobId hands out the next job id and advances the counter.
// Only JobServ assigns Ids; submitters and workers just leave Id == 0.
func (js *JobServ) NewJobId() int64 {
	js.NextJobId++
	return js.NextJobId - 1
}
// RegisterWho makes sure js.Who holds a PushCache (and thus a push socket)
// for the job's worker address and submitter address. Empty addresses and
// addresses that are already registered are left alone.
func (js *JobServ) RegisterWho(j *Job) {
	for _, addr := range [...]string{j.Workeraddr, j.Submitaddr} {
		if addr == "" {
			continue
		}
		if _, known := js.Who[addr]; known {
			continue
		}
		js.Who[addr] = NewPushCache(addr, addr, &js.Cfg)
	}
}
// UnRegisterWho is the inverse of RegisterWho: for the job's worker address
// and submitter address, it closes the cached PushCache (if one is
// registered) and removes it from js.Who.
func (js *JobServ) UnRegisterWho(j *Job) {
	for _, addr := range [...]string{j.Workeraddr, j.Submitaddr} {
		if addr == "" {
			continue
		}
		cache, registered := js.Who[addr]
		if !registered {
			continue
		}
		cache.Close()
		delete(js.Who, addr)
	}
}
// UnRegisterSubmitter closes and forgets the PushCache registered for the
// job's submitter address, if there is one. The worker address is untouched.
func (js *JobServ) UnRegisterSubmitter(j *Job) {
	addr := j.Submitaddr
	if addr == "" {
		return
	}
	cache, registered := js.Who[addr]
	if !registered {
		return
	}
	cache.Close()
	delete(js.Who, addr)
}
// FinishersToNewSocket opens a fresh push socket to every address in
// j.Finishaddr and returns them in the same order. We assume finishers are
// not long running, so these sockets are not cached in Who.
//
// It panics on an empty address or if a socket cannot be created.
func (js *JobServ) FinishersToNewSocket(j *Job) []*nn.Socket {
	socks := []*nn.Socket{}
	for _, addr := range j.Finishaddr {
		if addr == "" {
			panic("addr in Finishers should never be empty")
		}
		s, err := MkPushNN(addr, &js.Cfg, false)
		if err != nil {
			panic(err)
		}
		socks = append(socks, s)
	}
	return socks
}
// CloseRegistry closes every PushCache in js.Who that currently has an open
// socket. Entries are not removed from the map.
func (js *JobServ) CloseRegistry() {
	for _, cache := range js.Who {
		if cache.pushSock == nil {
			continue
		}
		cache.Close()
	}
}
// Shutdown tears the server down in order: stop the listener, close all
// cached push sockets, close the receive socket (if any), save state to
// disk, and finally stop the debug web server (if one was started).
func (js *JobServ) Shutdown() {
	VPrintf("at top of JobServ::Shutdown()\n")
	js.ShutdownListener()
	VPrintf("in JobServ::Shutdown(): after ShutdownListener()\n")
	js.CloseRegistry()
	VPrintf("in JobServ::Shutdown(): after CloseRegistry()\n")
	if js.Nnsock != nil {
		js.Nnsock.Close()
	}
	js.stateToDisk()
	VPrintf("in JobServ::Shutdown(): after stateToDisk()\n")
	// WebDebug is a mutable package-level flag: js.Web only exists if the flag
	// was true when the server was constructed, so check the pointer as well.
	if WebDebug && js.Web != nil {
		VPrintf("calling js.Web.Stop()\n")
		js.Web.Stop()
		VPrintf("returned from js.Web.Stop()\n")
	}
}
// ShutdownListener stops the listener of a non-local server and waits for it
// to finish. For a local server (js.IsLocal) it does nothing.
func (js *JobServ) ShutdownListener() {
	if js.IsLocal {
		return
	}
	// Closing the js.ListenerShutdown channel lets us broadcast to all the
	// places where the listener might be trying to send to JobServ.
	VPrintf("in ShutdownListener, about to call CloseChannelIfOpen()\n")
	CloseChannelIfOpen(js.ListenerShutdown)
	VPrintf("in ShutdownListener, after CloseChannelIfOpen()\n")

	// Block until the listener signals that it is done.
	<-js.ListenerDone
	VPrintf("in ShutdownListener, after <-js.ListenerDone\n")
}
// stateFilename returns the path of the persisted server state file,
// "serverstate" inside the directory given by dotGoqPath.
func (js *JobServ) stateFilename() string {
	dir := js.dotGoqPath()
	return fmt.Sprintf("%s/serverstate", dir)
}
// dotGoqPath returns the ".goq" directory beneath the configured home
// directory (js.Cfg.Home).
func (js *JobServ) dotGoqPath() string {
	home := js.Cfg.Home
	return fmt.Sprintf("%s/.goq", home)
}
// stateToDisk persists the server state (as produced by ServerToCapnp) to
// stateFilename(). The state is first written to a temporary file in the
// same directory and then renamed into place, so a reader never sees a
// partially written state file.
//
// If the .goq directory does not exist, or the temporary file cannot be
// written completely, the problem is logged and the previous state file is
// left untouched. It panics if the temporary file cannot be created for any
// other reason, or if the final rename fails.
func (js *JobServ) stateToDisk() {
	fn := js.stateFilename()
	dir := js.dotGoqPath()

	file, err := ioutil.TempFile(dir, "new.serverstate")
	if err != nil {
		// os.IsNotExist is the portable check; the message comparison is kept
		// as a fallback for errors that do not carry the underlying errno.
		if os.IsNotExist(err) || strings.HasSuffix(err.Error(), "no such file or directory") {
			TSPrintf("[pid %d] job server error: stateToDisk() could not find file '%s': %s\n", os.Getpid(), fn, err)
			return
		}
		panic(err)
	}
	tmp := file.Name()

	// NOTE(review): the second return value is discarded, as before; confirm
	// whether it can carry an error that should be handled here.
	buf, _ := js.ServerToCapnp()

	// Never install a short or unflushed file over good state.
	_, werr := file.Write(buf.Bytes())
	if cerr := file.Close(); werr == nil {
		werr = cerr
	}
	if werr != nil {
		os.Remove(tmp) // best effort cleanup of the incomplete temp file
		TSPrintf("[pid %d] job server error: stateToDisk() could not write '%s', leaving '%s' unchanged: %s\n", os.Getpid(), tmp, fn, werr)
		return
	}

	// Rename into place. Renaming over the existing file replaces it in one
	// step; only if that fails do we fall back to delete-then-rename, which
	// briefly leaves no state file on disk.
	if err = os.Rename(tmp, fn); err != nil {
		os.Remove(fn) // it might not exist. that's okay, don't panic.
		err = os.Rename(tmp, fn)
	}
	if err != nil {
		panic(err)
	}
	VPrintf("[pid %d] stateToDisk() done: wrote state (js.NextJobId=%d) to '%s'\n", os.Getpid(), js.NextJobId, fn)
}
// diskToState restores server state from stateFilename(), after giving
// checkForOldStateFile a chance to look at that path. A missing state file
// is not an error: the server simply starts with its defaults. Any other
// failure to open the file panics.
func (js *JobServ) diskToState() {
	fn := js.stateFilename()
	js.checkForOldStateFile(fn)

	file, err := os.Open(fn)
	if err != nil {
		// os.IsNotExist is the portable check; the message comparison is kept
		// as a fallback for errors that do not carry the underlying errno.
		if os.IsNotExist(err) || strings.HasSuffix(err.Error(), "no such file or directory") {
			VPrintf("[pid %d] diskToState() done: no state file found in '%s'\n", os.Getpid(), fn)
			return
		}
		panic(err)
	}
	defer file.Close()

	js.SetStateFromCapnp(file, fn)
	VPrintf("[pid %d] diskToState() done: read state (js.NextJobId=%d) from '%s'\n", os.Getpid(), js.NextJobId, fn)
}
// Address is a string type for peer addresses; JobServ.Finishers maps a job id
// to a list of Address values.
type Address string
// JobServ represents the single central job server. Its event loop (Start)
// receives Jobs on the channels below and keeps the queue state: WaitingJobs
// are not yet dispatched, RunQ holds jobs being run by a worker, and
// KnownJobHash indexes both by job id.
type JobServ struct {
Name string
Nnsock *nn.Socket // receive on; nil when no JservIP is configured (local mode).
Addr string
Submit chan *Job // submitter sends on, JobServ receives on.
ReSubmit chan int64 // dispatch go-routine sends on when worker is unreachable, JobServ receives on.
WorkerReady chan *Job // worker sends on, JobServ receives on.
ToWorker chan *Job // worker receives on, JobServ sends on.
RunDone chan *Job // worker sends on, JobServ receives on.
SigMismatch chan *Job // Listener tells Start about bad signatures.
BadNonce chan *Job // Listener tells Start about bad nonce (duplicate nonce or stale timestamp)
SnapRequest chan *Job // worker requests state snapshot from JobServ.
ObserveFinish chan *Job // submitter sends on, JobServ receives on; when a submitter wants to wait for another job to be done.
NotifyFinishers chan *Job // submitter receives on, JobServ dispatches a notification message for each finish observer
Cancel chan *Job // submitter sends on, to request job cancellation.
ImmoReq chan *Job // submitter sends on, to request all workers die.
WorkerDead chan *Job // worker tells server just before terminating self.
WorkerAckPing chan *Job // worker replies to server that it is still alive. If working on job then Aboutjid is set.
UnregSubmitWho chan *Job // JobServ internal use: unregister submitter only.
DeafChan chan int // supply CountDeaf, when asked.
WaitingJobs []*Job // submitted jobs not yet handed to a worker, in arrival order.
RunQ map[int64]*Job // jobs currently delegated to a worker, by job id.
KnownJobHash map[int64]*Job // every job on WaitingJobs or in RunQ, by job id, for fast lookup.
DedupWorkerHash map[string]bool // worker addresses already on WaitingWorkers, to ignore duplicate ready messages.
Ctrl chan control // control commands (die, stateToDisk) for the event loop.
Done chan bool // closed by the event loop after Shutdown completes.
WaitingWorkers []*Job // worker-ready requests awaiting a job.
Pid int
Odir string // name of the output directory, relative to the job's Dir or to Cfg.Home.
NextJobId int64 // next id NewJobId will hand out; starts at 1.
// listener shutdown
ListenerShutdown chan bool // tell listener to stop by closing this channel.
ListenerDone chan bool // listener closes this channel when finished.
// allow cancel test to not race
FirstCancelDone chan bool // server closes this after hearing on RunDone a job with .Cancelled set.
// directory of submitters and workers
Who map[string]*PushCache
// NOTE(review): WhoLock is not acquired anywhere in the visible part of this
// file; confirm where (or whether) it guards Who.
WhoLock sync.RWMutex
// Finishers : who wants to be notified when a job is done.
Finishers map[int64][]Address
CountDeaf int // incremented on every resubmit (worker unreachable).
PrevDeaf int // value of CountDeaf last sent on DeafChan.
BadSgtCount int64
FinishedJobsCount int64
CancelledJobCount int64
BadNonceCount int64
// set Cfg *once*, before any goroutines start, then
// treat it as immutable and never changing.
Cfg Config
DebugMode bool // show badsig messages if true
IsLocal bool // true when no JservIP is configured: no pull socket and no listener.
NoReplay *NonceRegistry // only the ListenForJobs() goroutine should query/update this; never Start().
FinishedRing []*Job
FinishedRingMaxLen int
Web *WebServer // debug web server; only created when WebDebug is true.
}
// DeafChanIfUpdate returns js.DeafChan when CountDeaf has changed since it
// was last delivered (PrevDeaf), and nil otherwise. A nil channel never
// becomes ready in a select, so consumers of DeafChan are only served upon
// an update and do not busy wait.
func (js *JobServ) DeafChanIfUpdate() chan int {
	if js.CountDeaf == js.PrevDeaf {
		return nil
	}
	return js.DeafChan
}
// SubmitJob marks j as an initial submission and hands it to the server's
// event loop over the Submit channel. The send blocks until the event loop
// receives the job. The returned error is always nil.
func (js *JobServ) SubmitJob(j *Job) error {
	fmt.Print("SubmitJob called.\n")

	j.Msg = schema.JOBMSG_INITIALSUBMIT
	js.Submit <- j

	return nil
}
// NewExternalJobServ starts a job server as a separate process by running
// `goq serve` with the environment produced by cfg.Setenv, sharing this
// process's stdout and stderr. It first verifies (detectVersionSkew) that
// the external 'goq' binary matches our own version.
//
// It returns the child's pid, or -1 and the error if the process could not
// be started.
func NewExternalJobServ(cfg *Config) (pid int, err error) {
	// make sure that this external 'goq' version matches our own.
	detectVersionSkew()

	cmd := exec.Command(GoqExeName, "serve")
	cmd.Env = cfg.Setenv(os.Environ())
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	if err = cmd.Start(); err != nil {
		// if file not found, there will be no child pid;
		// cmd.Process will be nil, and trying to fetch cmd.Process.Pid
		// will crash. There is also nothing to reap.
		return -1, err
	}
	pid = cmd.Process.Pid

	// reap so we don't zombie-fy, which makes
	// it difficult for the test in fetch_test.go to detect that
	// the process is indeed gone. Only the reaping matters here, so the exit
	// status is deliberately ignored.
	go func() { _ = cmd.Wait() }()

	return pid, nil
}
// detectVersionSkew guards against version skew in tests that use an
// external 'goq' binary: it runs `goq version` and panics unless the output
// (minus trailing newlines) equals goq_version() of this build. `go install`
// or make avoids the issue, but sometimes that is forgotten and we need a
// reminder to run make. It also panics if the command itself fails.
// Called by NewExternalJobServ(), perhaps others.
func detectVersionSkew() {
	raw, err := exec.Command(GoqExeName, "version").Output()
	if err != nil {
		panic(err)
	}
	installed := string(bytes.TrimRight(raw, "\n"))
	built := goq_version()
	if installed == built {
		return
	}
	panic(fmt.Sprintf("version skew detected, please run 'make' in the goq/ source directory to install the most recent 'goq' into $GOPATH/bin, and be sure that $GOPATH/bin is at the front of your $PATH. Version of 'goq' installed in path: '%s'. Version of this build: '%s'\n", installed, built))
}
// NewJobServ constructs the job server, restores any saved state from disk,
// and starts its event loop.
//
// A nil cfg is replaced by DefaultCfg(). If cfg.Cypher is nil, a key is
// opened or created and stored into cfg. When cfg.JservIP is non-empty the
// server is "remote": a pull socket is bound on cfg.JservAddr() and
// ListenForJobs is called after the event loop has started. Otherwise the
// server is local (IsLocal) with no socket and no listener.
//
// The returned error is currently always nil: key, directory and socket
// failures panic instead of being returned.
func NewJobServ(cfg *Config) (*JobServ, error) {
var err error
if cfg == nil {
cfg = DefaultCfg()
}
addr := cfg.JservAddr()
if cfg.Cypher == nil {
var key *CypherKey
key, err = OpenExistingOrCreateNewKey(cfg)
if err != nil || key == nil {
panic(fmt.Sprintf("could not open or create encryption key: %s", err))
}
// note: this writes through the caller's *Config.
cfg.Cypher = key
}
// NOTE(review): presumably changes the working directory to cfg.Home and
// panics on failure -- confirm against MoveToDirOrPanic.
MoveToDirOrPanic(cfg.Home)
var pullsock *nn.Socket
var remote bool
if cfg.JservIP != "" {
remote = true
pullsock, err = MkPullNN(addr, cfg, false)
if err != nil {
panic(err)
}
VPrintf("[pid %d] JobServer bound endpoints addr: '%s'\n", os.Getpid(), addr)
}
js := &JobServ{
Name: fmt.Sprintf("jobserver.pid.%d", os.Getpid()),
Addr: addr,
Nnsock: pullsock,
RunQ: make(map[int64]*Job),
// KnownJobHash tracks actual (numbered) jobs, not worker-ready/request-to-object jake-job requests.
KnownJobHash: make(map[int64]*Job), // for fast lookup, jobs are either on WaitingJobs slice, or in RunQ table.
// avoid the same worker doubling up and filling the worker queue
// thus we avoid lots of spurious dispatch attempts.
DedupWorkerHash: make(map[string]bool),
WaitingJobs: make([]*Job, 0),
Submit: make(chan *Job),
ReSubmit: make(chan int64),
WorkerReady: make(chan *Job),
ToWorker: make(chan *Job),
RunDone: make(chan *Job),
SigMismatch: make(chan *Job),
BadNonce: make(chan *Job),
SnapRequest: make(chan *Job),
Cancel: make(chan *Job),
ImmoReq: make(chan *Job),
WorkerDead: make(chan *Job),
WorkerAckPing: make(chan *Job),
ObserveFinish: make(chan *Job), // when a submitter wants to wait for another job to be done.
NotifyFinishers: make(chan *Job),
DeafChan: make(chan int),
Ctrl: make(chan control),
Done: make(chan bool),
WaitingWorkers: make([]*Job, 0),
Who: make(map[string]*PushCache),
Finishers: make(map[int64][]Address),
ListenerShutdown: make(chan bool),
ListenerDone: make(chan bool),
//ListenerAckShutdown: make(chan bool),
FirstCancelDone: make(chan bool),
Pid: os.Getpid(),
// the server keeps its own copy of the configuration.
Cfg: *cfg,
DebugMode: cfg.DebugMode,
Odir: cfg.Odir,
IsLocal: !remote,
NextJobId: 1,
NoReplay: NewNonceRegistry(NewRealTimeSource()),
FinishedRingMaxLen: DefaultFinishedRingMaxLen,
FinishedRing: make([]*Job, 0, DefaultFinishedRingMaxLen),
UnregSubmitWho: make(chan *Job),
}
VPrintf("ListenerShutdown channel created in ctor.\n")
// restore saved state (e.g. NextJobId) before the event loop starts.
js.diskToState()
if WebDebug {
js.Web = NewWebServer()
}
// Start launches the event loop in its own goroutine and returns.
js.Start()
if remote {
//VPrintf("remote, server starting ListenForJobs() goroutine.\n")
TSPrintf("**** [jobserver pid %d] listening for jobs on '%s', output to '%s'. GOQ_HOME is '%s'.\n", js.Pid, js.Addr, js.Odir, js.Cfg.Home)
// NOTE(review): presumably starts the listener goroutine and returns -- confirm
// that ListenForJobs does not block here.
js.ListenForJobs(cfg)
}
return js, nil
}
// toWorkerChannelIfJobAvail returns js.ToWorker when at least one job is
// waiting, and nil when the waiting queue is empty.
func (js *JobServ) toWorkerChannelIfJobAvail() chan *Job {
	if len(js.WaitingJobs) > 0 {
		return js.ToWorker
	}
	return nil
}
// nextJob returns the job at the head of WaitingJobs with its Msg set to
// JOBMSG_DELEGATETOWORKER, or nil when no job is waiting. The job is not
// removed from the queue.
func (js *JobServ) nextJob() *Job {
	if len(js.WaitingJobs) == 0 {
		return nil
	}
	head := js.WaitingJobs[0]
	head.Msg = schema.JOBMSG_DELEGATETOWORKER
	return head
}
// ConfirmOrMakeOutputDir makes sure dirname exists: if DirExists reports it
// is already there nothing happens, otherwise it is created with mode 0775
// (a single level; parents are not created). It returns the Mkdir error,
// if any.
func (js *JobServ) ConfirmOrMakeOutputDir(dirname string) error {
	if DirExists(dirname) {
		return nil
	}
	return os.Mkdir(dirname, 0775)
}
// WriteJobOutputToDisk appends the lines of donejob.Out to the job's output
// file, out.NNNNN (job id, zero-padded to 5 digits).
//
// The file goes into <donejob.Dir>/<js.Odir> when that directory exists or
// can be created on this host; otherwise it falls back to
// <js.Cfg.Home>/<js.Odir>. If the fallback directory cannot be created
// either, the failure is logged and nothing is written. It panics if the
// output file itself cannot be opened or created.
func (js *JobServ) WriteJobOutputToDisk(donejob *Job) {
	VPrintf("WriteJobOutputToDisk() called for Job: %s\n", donejob)

	var outPath string
	inSubmitDir := false

	// the directories on the submit host (where we start) may not match those on
	// the server host (where we finish), but it would be a common situation
	// to have them be on the same host, hence we try to write back to donejob.Dir
	// if at all possible.
	if DirExists(donejob.Dir) {
		submitOdir := fmt.Sprintf("%s/%s", donejob.Dir, js.Odir)
		if js.ConfirmOrMakeOutputDir(submitOdir) == nil {
			inSubmitDir = true
		}
		outPath = fmt.Sprintf("%s/%s/out.%05d", donejob.Dir, js.Odir, donejob.Id)
	}

	// Drat, couldn't write to Dir on the server-host.
	// Instead write to $GOQ_HOME/$GOQ_ODIR
	if !inSubmitDir {
		homeOdir := fmt.Sprintf("%s/%s", js.Cfg.Home, js.Odir)
		if mkErr := js.ConfirmOrMakeOutputDir(homeOdir); mkErr != nil {
			TSPrintf("[pid %d] server job-done badness: could not make output directory '%s' for job %d output.\n", js.Pid, homeOdir, donejob.Id)
			return
		}
		outPath = fmt.Sprintf("%s/%s/out.%05d", js.Cfg.Home, js.Odir, donejob.Id)
		TSPrintf("[pid %d] drat, could not get to the submit-directory for job %d. Output to '%s' instead.\n", js.Pid, donejob.Id, outPath)
	}

	// invar: outPath is set.
	// append if already existing file: so we can have incremental updates.
	var (
		file *os.File
		err  error
	)
	if FileExists(outPath) {
		file, err = os.OpenFile(outPath, os.O_RDWR|os.O_APPEND, 0666)
	} else {
		file, err = os.Create(outPath)
	}
	if err != nil {
		panic(err)
	}
	defer file.Close()

	for _, line := range donejob.Out {
		fmt.Fprintf(file, "%s\n", line)
	}
	TSPrintf("[pid %d] jobserver wrote output for job %d to file '%s'\n", js.Pid, donejob.Id, outPath)
}
func (js *JobServ) Start() {
go func() {
// Save state to disk on each heartbeat.
// Currently state is just NextJobId. See stateToDisk()
heartbeat := time.Tick(time.Duration(js.Cfg.Heartbeat) * time.Second)
var loopcount int64 = 0
for {
loopcount++
VPrintf(" - - - JobServ at top for Start() event loop, loopcount: (%d).\n", loopcount)
select {
case newjob := <-js.Submit:
VPrintf(" === event loop case === (%d) JobServ got from Submit channel a newjob, msg: %s, job: %s\n", loopcount, newjob.Msg, newjob)
if newjob.Id != 0 {
panic(fmt.Sprintf("new jobs should have zero (unassigned) Id!!! But, this one did not: %s", newjob))
}
curId := js.NewJobId()
newjob.Id = curId
js.KnownJobHash[curId] = newjob
// open and cache any sockets we will need.
js.RegisterWho(newjob)
if newjob.Msg == schema.JOBMSG_SHUTDOWNSERV {
VPrintf("JobServ got JOBMSG_SHUTDOWNSERV from Submit channel.\n")
go func() { js.Ctrl <- die }()
continue
}
TSPrintf("**** [jobserver pid %d] got job %d submission. Will run '%s'.\n", js.Pid, newjob.Id, newjob.Cmd)
js.WaitingJobs = append(js.WaitingJobs, newjob)
js.Dispatch()
// we just dispatched, now reply to submitter with ack (in an async goroutine); they don't need to
// wait for it, but often they will want confirmation/the jobid.
js.AckBack(newjob, newjob.Submitaddr, schema.JOBMSG_ACKSUBMIT, []string{})
case resubId := <-js.ReSubmit:
VPrintf(" === event loop case === (%d) JobServ got resub for jobid %d\n", loopcount, resubId)
js.CountDeaf++
resubJob, ok := js.RunQ[resubId]
if !ok {
// maybe it was cancelled in the meantime. don't panic.
TSPrintf("**** [jobserver pid %d] got re-submit of job %d that is now not on our RunQ, so dropping it without re-queuing.\n", js.Pid, resubId)
continue
}
js.Resub(resubJob)
case ackping := <-js.WorkerAckPing:
j, ok := js.RunQ[ackping.Aboutjid]
if ok {
j.Unansweredping = 0
now := time.Now()
j.Lastpingtm = now.UnixNano()
if ackping.Workeraddr != j.Workeraddr {
panic(fmt.Sprintf("ackping.Workeraddr(%s) must match j.Workeraddr(%s)", ackping.Workeraddr, j.Workeraddr))
}
if j.Id != ackping.Aboutjid {
panic(fmt.Sprintf("messed up RunQ?? j.Id(%d) must match ackping.Aboutjid(%d). RunQ: %#v", j.Id, ackping.Aboutjid, js.RunQ))
}
VPrintf("**** [jobserver pid %d] got ackping worker at '%s' running job %d. Lastpingtm now: %s\n", js.Pid, j.Workeraddr, j.Id, now)
// record info about running process:
j.Pid = ackping.Pid
j.Stm = ackping.Stm
} else {
TSPrintf("**** [jobserver pid %d] Problem? got ping back from worker at '%s' running job %d that was not in our RunQ???\n", js.Pid, ackping.Workeraddr, ackping.Aboutjid)
}
case reqjob := <-js.WorkerReady:
VPrintf(" === event loop case === (%d) JobServ got request for work from WorkerReady channel: %s\n", loopcount, reqjob)
if !js.IsLocal && reqjob.Workeraddr == "" {
// ignore bad packets
}
js.RegisterWho(reqjob)
if _, dup := js.DedupWorkerHash[reqjob.Workeraddr]; !dup {
js.WaitingWorkers = append(js.WaitingWorkers, reqjob)
js.DedupWorkerHash[reqjob.Workeraddr] = true
} else {
VPrintf("**** [jobserver pid %d] ignored duplicate worker-ready message from '%s'\n", js.Pid, reqjob.Workeraddr)
}
// TODO: if this worker had a job on the RunQ, take it off. Assume that the worker died while running it.
// It looks wierd to have a worker show up on both WaitingWorkers and the RunQ.
js.Dispatch()
case donejob := <-js.RunDone:
VPrintf(" === event loop case === (%d) JobServ got donejob from RunDone channel: %s\n", loopcount, donejob)
// we've got a new copy, with Out on it, but the old copy may have added listeners, so
// we'll need to merge in those Finishaddr too.
if donejob.Cancelled {
VPrintf("jserv: got donejob on js.RunDone that has .Cancelled set. donejob: %s\n", donejob)
js.CancelledJobCount++
if js.CancelledJobCount == 1 {
// allow cancel_test.go to not race
close(js.FirstCancelDone)
}
} else {
VPrintf("jserv: got donejob on js.RunDone without .Cancelled set. donejob: %s\n", donejob)
}
withFinishers, ok := js.RunQ[donejob.Id]
if !ok {
// just ignore, probably a re-issued job that finally woke up and came back.
// panic(fmt.Sprintf("got donejob %d for job(%s) from js.RunDone channel, but it was not in our js.RunQ: %#v", donejob.Id, donejob, js.RunQ))
fmt.Sprintf("ignoring donejob %d for job(%s) since js.RunQ does not show it active.\n", donejob.Id, donejob)
continue
}
kjh, ok := js.KnownJobHash[donejob.Id]
if !ok {
// just ignore, probably a re-issued job that finally woke up and came back.
if js.DebugMode {
TSPrintf("\n jobserv debugmode: got donejob %d for job(%s) from js.RunDone channel, but it was not in our js.KnownJobHash: %#v\n", donejob.Id, donejob, js.KnownJobHash)
}
continue
}
if withFinishers != kjh {
panic(fmt.Sprintf("withFinishers(%v) from RunQ did not agree with kjh(%v) from KnownJobHash", withFinishers, kjh))
}
donejob.Finishaddr = js.MergeAndDedupFinishers(donejob, withFinishers)
delete(js.RunQ, donejob.Id)
delete(js.KnownJobHash, donejob.Id)
js.FinishedJobsCount++
TSPrintf("**** [jobserver pid %d] worker finished job %d, removing from the RunQ\n", js.Pid, donejob.Id)
js.WriteJobOutputToDisk(donejob)
js.TellFinishers(donejob, schema.JOBMSG_JOBFINISHEDNOTICE)
js.AddToFinishedRingbuffer(donejob)
case cmd := <-js.Ctrl:
VPrintf(" === event loop case zowza === (%d) JobServ got control cmd: %v\n", loopcount, cmd)
switch cmd {
case die:
TSPrintf("**** [jobserver pid %d] jobserver got 'die' cmd on js.Ctrl. about to call js.Shutdown().\n", js.Pid)
js.Shutdown()
TSPrintf("**** [jobserver pid %d] jobserver got 'die' cmd on js.Ctrl. js.Shutdown() done. Exiting.\n", js.Pid)
close(js.Done)
return
case stateToDisk:
js.stateToDisk()
}
case js.DeafChanIfUpdate() <- js.CountDeaf:
VPrintf(" === event loop case === (%d) JobServ supplied js.CountDeaf on channel js.DeafChan.\n", loopcount)
// only one consumer gets each change; we only send on js.DeafChan once
// when CountDeaf changes; this prevents our (only) client from busy waiting.
js.PrevDeaf = js.CountDeaf
case badsigjob := <-js.SigMismatch:
//nothing doing with this job, it had a bad signature
js.BadSgtCount++
// ignore badsig packets; to prevent a bad worker from infinite looping/DOS-ing us.
if js.DebugMode {
addr := badsigjob.Submitaddr
if addr == "" {
addr = badsigjob.Workeraddr
}
TSPrintf("**** [jobserver pid %d] DebugMode: actively rejecting badsig message from '%s'.\n", js.Pid, addr)
if addr != "" {
js.RegisterWho(badsigjob)
js.AckBack(badsigjob, addr, schema.JOBMSG_REJECTBADSIG, []string{})
}
}
case badnoncejob := <-js.BadNonce:
// job was too old or duplicate (replay attack) nonce detected.
js.BadNonceCount++
if js.DebugMode {
addr := badnoncejob.Submitaddr
if addr == "" {
addr = badnoncejob.Workeraddr
}
TSPrintf("**** [jobserver pid %d] DebugMode: badnonce/too old message from '%s' (js.BadNonceCount now: %d): '%s'.\n", js.Pid, addr, js.BadNonceCount, badnoncejob)
}
case snapreq := <-js.SnapRequest:
VPrintf("\nStart: got snapreq: '%#v'\n", snapreq)
js.RegisterWho(snapreq)
shot := js.AssembleSnapShot(int(snapreq.MaxShow))
js.AckBack(snapreq, snapreq.Submitaddr, schema.JOBMSG_ACKTAKESNAPSHOT, shot)
//VPrintf("\nHandling snapreq: done with AckBack; shot was: '%#v'\n", shot)
case canreq := <-js.Cancel:
var j *Job
var ok bool
js.RegisterWho(canreq)
canid := canreq.Aboutjid
if j, ok = js.KnownJobHash[canid]; !ok {
js.AckBack(canreq, canreq.Submitaddr, schema.JOBMSG_JOBNOTKNOWN, []string{})
goto unreg
}
if _, running := js.RunQ[canid]; running {
// tell worker to stop
js.AckBack(canreq, j.Workeraddr, schema.JOBMSG_CANCELWIP, []string{})
VPrintf("**** [jobserver pid %d] server sent 'cancelwip' for job %d to '%s'.\n", js.Pid, canid, j.Workeraddr)
}
// if we don't remove from RunQ and KJH immediately, it looks wierd to the user.
delete(js.RunQ, canid)
delete(js.KnownJobHash, canid)