Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[go function] support localrun and cluster mode for go function (#4174)
Browse files Browse the repository at this point in the history
### Motivation

Master Issue: #3767 

support  local-run and cluster mode for go function.

in go function, we can use:

```
./bin/pulsar-admin functions localrun/create  
--go /Users/wolf4j/github.com/apache/pulsar/pulsar-function-go/examples/outputFunc.go 
--inputs persistent://public/default/my-topic 
--output persistent://public/default/test 
--tenant public 
--namespace default 
--name pulsarfunction 
--classname hellopulsar 
--log-topic logtopic
```

Different from `--jar` or `--py`, `--go` uploads a complete executable file(including: instance file + user code file)
  • Loading branch information
wolfstudy authored and sijie committed May 6, 2019
1 parent 93f4d41 commit 42c3bf9
Show file tree
Hide file tree
Showing 18 changed files with 394 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ abstract class FunctionDetailsCommand extends BaseCommand {
description = "Path to the main Python file/Python Wheel file for the function (if the function is written in Python)",
listConverter = StringConverter.class)
protected String pyFile;
@Parameter(
names = "--go",
description = "Path to the main Go executable binary for the function (if the function is written in Go)")
protected String goFile;
@Parameter(names = {"-i",
"--inputs"}, description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)")
protected String inputs;
Expand Down Expand Up @@ -477,19 +481,28 @@ void processArguments() throws Exception {
functionConfig.setPy(pyFile);
}

if (null != goFile) {
functionConfig.setGo(goFile);
}

if (functionConfig.getJar() != null) {
userCodeFile = functionConfig.getJar();
} else if (functionConfig.getPy() != null) {
userCodeFile = functionConfig.getPy();
} else if (functionConfig.getGo() != null) {
userCodeFile = functionConfig.getGo();
}

// check if configs are valid
validateFunctionConfigs(functionConfig);
}

protected void validateFunctionConfigs(FunctionConfig functionConfig) {
if (StringUtils.isEmpty(functionConfig.getClassName())) {
throw new IllegalArgumentException("No Function Classname specified");
// go doesn't need className
if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA){
if (StringUtils.isEmpty(functionConfig.getClassName())) {
throw new IllegalArgumentException("No Function Classname specified");
}
}
if (StringUtils.isEmpty(functionConfig.getName())) {
org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
Expand All @@ -501,13 +514,13 @@ protected void validateFunctionConfigs(FunctionConfig functionConfig) {
org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig);
}

if (isNotBlank(functionConfig.getJar()) && isNotBlank(functionConfig.getPy())) {
throw new ParameterException("Either a Java jar or a Python file needs to"
if (isNotBlank(functionConfig.getJar()) && isNotBlank(functionConfig.getPy()) && isNotBlank(functionConfig.getGo())) {
throw new ParameterException("Either a Java jar or a Python file or a Go executable binary needs to"
+ " be specified for the function. Cannot specify both.");
}

if (isBlank(functionConfig.getJar()) && isBlank(functionConfig.getPy())) {
throw new ParameterException("Either a Java jar or a Python file needs to"
if (isBlank(functionConfig.getJar()) && isBlank(functionConfig.getPy()) && isBlank(functionConfig.getGo())) {
throw new ParameterException("Either a Java jar or a Python file or a Go executable binary needs to"
+ " be specified for the function. Please specify one.");
}

Expand All @@ -519,6 +532,10 @@ protected void validateFunctionConfigs(FunctionConfig functionConfig) {
!new File(functionConfig.getPy()).exists()) {
throw new ParameterException("The specified python file does not exist");
}
if (!isBlank(functionConfig.getGo()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getGo()) &&
!new File(functionConfig.getGo()).exists()) {
throw new ParameterException("The specified go executable binary does not exist");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public enum ProcessingGuarantees {

public enum Runtime {
JAVA,
PYTHON
PYTHON,
GO
}

// Any flags that you want to pass to the runtime.
Expand Down Expand Up @@ -97,6 +98,7 @@ public enum Runtime {
private Long timeoutMs;
private String jar;
private String py;
private String go;
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
private Boolean cleanupSubscription;
}
105 changes: 63 additions & 42 deletions pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package conf

import (
"encoding/json"
"flag"
"io/ioutil"
"os"
Expand All @@ -33,60 +34,78 @@ import (
const ConfigPath = "github.com/apache/pulsar/pulsar-function-go/conf/conf.yaml"

type Conf struct {
PulsarServiceURL string `yaml:"pulsarServiceURL"`
InstanceID int `yaml:"instanceID"`
FuncID string `yaml:"funcID"`
FuncVersion string `yaml:"funcVersion"`
MaxBufTuples int `yaml:"maxBufTuples"`
Port int `yaml:"port"`
ClusterName string `yaml:"clusterName"`
KillAfterIdleMs time.Duration `yaml:"killAfterIdleMs"`
PulsarServiceURL string `json:"pulsarServiceURL" yaml:"pulsarServiceURL"`
InstanceID int `json:"instanceID" yaml:"instanceID"`
FuncID string `json:"funcID" yaml:"funcID"`
FuncVersion string `json:"funcVersion" yaml:"funcVersion"`
MaxBufTuples int `json:"maxBufTuples" yaml:"maxBufTuples"`
Port int `json:"port" yaml:"port"`
ClusterName string `json:"clusterName" yaml:"clusterName"`
KillAfterIdleMs time.Duration `json:"killAfterIdleMs" yaml:"killAfterIdleMs"`
// function details config
Tenant string `yaml:"tenant"`
NameSpace string `yaml:"nameSpace"`
Name string `yaml:"name"`
LogTopic string `yaml:"logTopic"`
ProcessingGuarantees int32 `yaml:"processingGuarantees"`
SecretsMap string `yaml:"secretsMap"`
Runtime int32 `yaml:"runtime"`
AutoACK bool `yaml:"autoAck"`
Parallelism int32 `yaml:"parallelism"`
Tenant string `json:"tenant" yaml:"tenant"`
NameSpace string `json:"nameSpace" yaml:"nameSpace"`
Name string `json:"name" yaml:"name"`
LogTopic string `json:"logTopic" yaml:"logTopic"`
ProcessingGuarantees int32 `json:"processingGuarantees" yaml:"processingGuarantees"`
SecretsMap string `json:"secretsMap" yaml:"secretsMap"`
Runtime int32 `json:"runtime" yaml:"runtime"`
AutoACK bool `json:"autoAck" yaml:"autoAck"`
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
//source config
SubscriptionType int32 `yaml:"subscriptionType"`
TimeoutMs uint64 `yaml:"timeoutMs"`
SubscriptionName string `yaml:"subscriptionName"`
CleanupSubscription bool `yaml:"cleanupSubscription"`
SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"`
CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
//source input specs
SourceSpecTopic string `yaml:"sourceSpecsTopic"`
SourceSchemaType string `yaml:"sourceSchemaType"`
IsRegexPatternSubscription bool `yaml:"isRegexPatternSubscription"`
ReceiverQueueSize int32 `yaml:"receiverQueueSize"`
SourceSpecTopic string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
SourceSchemaType string `json:"sourceSchemaType" yaml:"sourceSchemaType"`
IsRegexPatternSubscription bool `json:"isRegexPatternSubscription" yaml:"isRegexPatternSubscription"`
ReceiverQueueSize int32 `json:"receiverQueueSize" yaml:"receiverQueueSize"`
//sink spec config
SinkSpecTopic string `yaml:"sinkSpecsTopic"`
SinkSchemaType string `yaml:"sinkSchemaType"`
SinkSpecTopic string `json:"sinkSpecsTopic" yaml:"sinkSpecsTopic"`
SinkSchemaType string `json:"sinkSchemaType" yaml:"sinkSchemaType"`
//resources config
Cpu float64 `yaml:"cpu"`
Ram int64 `yaml:"ram"`
Disk int64 `yaml:"disk"`
Cpu float64 `json:"cpu" yaml:"cpu"`
Ram int64 `json:"ram" yaml:"ram"`
Disk int64 `json:"disk" yaml:"disk"`
//retryDetails config
MaxMessageRetries int32 `yaml:"maxMessageRetries"`
DeadLetterTopic string `yaml:"deadLetterTopic"`
MaxMessageRetries int32 `json:"maxMessageRetries" yaml:"maxMessageRetries"`
DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
}

var opts string
var (
help bool
confFilePath string
confContent string
)

func (c *Conf) GetConf() *Conf {
flag.Parse()

yamlFile, err := ioutil.ReadFile(opts)
if err != nil {
log.Errorf("not found conf file, err:%s", err.Error())
return nil
if help {
flag.Usage()
}

if confFilePath != "" {
yamlFile, err := ioutil.ReadFile(confFilePath)
if err != nil {
log.Errorf("not found conf file, err:%s", err.Error())
return nil
}
err = yaml.Unmarshal(yamlFile, c)
if err != nil {
log.Errorf("unmarshal yaml file error:%s", err.Error())
return nil
}
}
err = yaml.Unmarshal(yamlFile, c)
if err != nil {
log.Errorf("unmarshal yaml file error:%s", err.Error())
return nil

if confContent != "" {
err := json.Unmarshal([]byte(confContent), c)
if err != nil {
log.Errorf("unmarshal config content error:%s", err.Error())
return nil
}
}
return c
}
Expand All @@ -105,5 +124,7 @@ func init() {
homeDir = os.Getenv("HOME")
}
defaultPath := homeDir + "/" + ConfigPath
flag.StringVar(&opts, "instance-conf", defaultPath, "config conf.yml filepath")
flag.BoolVar(&help, "help", false, "print help cmd")
flag.StringVar(&confFilePath, "instance-conf-path", defaultPath, "config conf.yml filepath")
flag.StringVar(&confContent, "instance-conf", "", "the string content of Conf struct")
}
2 changes: 1 addition & 1 deletion pulsar-function-go/examples/test/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-02",
Topic: "test",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
})
Expand Down
2 changes: 1 addition & 1 deletion pulsar-function-go/examples/test/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func main() {
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "topic-01",
Topic: "my-topic",
})

defer producer.Close()
Expand Down
5 changes: 3 additions & 2 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
}

func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
if timeoutMilliSecond < 0 {
if timeoutMilliSecond <= 0 {
return time.Duration(math.MaxInt64)
}
return timeoutMilliSecond
Expand All @@ -284,7 +284,8 @@ func (gi *goInstance) setupLogHandler() error {

func (gi *goInstance) addLogTopicHandler() {
if gi.context.logAppender == nil {
panic("please init logAppender")
log.Error("the logAppender is nil, if you want to use it, please specify `--log-topic` at startup.")
return
}

for _, logByte := range log.StrEntry {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance.go;

import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
public class GoInstanceConfig {
private String pulsarServiceURL = "";
private int instanceID;
private String funcID = "";
private String funcVersion = "";
private int maxBufTuples;
private int port;
private String clusterName = "";
private int killAfterIdleMs;

private String tenant = "";
private String nameSpace = "";
private String name = "";
private String className = "";
private String logTopic = "";
private int processingGuarantees;
private String secretsMap = "";
private int runtime;
private boolean autoAck;
private int parallelism;

private int subscriptionType;
private long timeoutMs;
private String subscriptionName = "";
private boolean cleanupSubscription;

private String sourceSpecsTopic = "";
private String sourceSchemaType = "";
private boolean isRegexPatternSubscription;
private int receiverQueueSize;

private String sinkSpecsTopic = "";
private String sinkSchemaType = "";

private double cpu;
private long ram;
private long disk;

private int maxMessageRetries;
private String deadLetterTopic = "";
}
10 changes: 10 additions & 0 deletions pulsar-functions/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@
<artifactId>jcommander</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ public class KubernetesRuntime implements Runtime {
case PYTHON:
logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
break;
case GO:
throw new UnsupportedOperationException();
}

this.authConfig = authConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,16 @@ public boolean externallyManaged() {
public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl,
String originalCodeFileName,
Long expectedHealthCheckInterval) throws Exception {
String instanceFile;
String instanceFile = null;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
instanceFile = javaInstanceJarFile;
break;
case PYTHON:
instanceFile = pythonInstanceFile;
break;
case GO:
throw new UnsupportedOperationException();
default:
throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
}
Expand Down
Loading

0 comments on commit 42c3bf9

Please sign in to comment.