Skip to content

Commit

Permalink
Merge pull request #1 from Reasno/0.1.1
Browse files Browse the repository at this point in the history
0.1.1
  • Loading branch information
Reasno authored Mar 27, 2020
2 parents 371ed8d + 30ac19c commit 53c32c0
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 50 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"hyperf/config": "^1.1",
"hyperf/di": "^1.1",
"hyperf/testing": "1.1.*",
"phpstan/phpstan": "^0.10.5",
"phpstan/phpstan": "^0.12",
"swoft/swoole-ide-helper": "dev-master"
},
"config": {
Expand Down
5 changes: 4 additions & 1 deletion example/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/reasno/gotask/pkg/gotask"
"io/ioutil"
"log"
)

// App sample
Expand Down Expand Up @@ -49,5 +50,7 @@ func (a *App) HelloError(name interface{}, r *interface{}) error {

func main() {
gotask.Register(new(App))
gotask.Run()
if err := gotask.Run(); err != nil {
log.Fatal(err)
}
}
4 changes: 2 additions & 2 deletions pkg/gotask/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func GetAddress() string {

func Run() error {
var g run.Group
{
if *address == "" {
relay := goridge.NewPipeRelay(os.Stdin, os.Stdout)
codec := goridge.NewCodecWithRelay(relay)
g.Add(func() error {
rpc.ServeCodec(codec)
return nil
return fmt.Errorf("pipe is closed")
}, func(err error) {
_ = os.Stdin.Close()
_ = os.Stdout.Close()
Expand Down
4 changes: 2 additions & 2 deletions publish/gotask.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
}),
'pool' => [
'min_connections' => 1,
'max_connections' => 100,
'max_connections' => 30,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'wait_timeout' => 30.0,
'heartbeat' => -1,
'max_idle_time' => (float) env('GOTASK_MAX_IDLE_TIME', 60),
],
Expand Down
14 changes: 11 additions & 3 deletions src/LocalGoTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ public function call(string $method, $payload, int $flags = 0){
}
$returnChannel = new Channel(1);
$this->taskChannel->push([$method, $payload, $flags, $returnChannel]);
return $returnChannel->pop();
$result = $returnChannel->pop();
if ($result instanceof \Throwable){
throw $result;
}
return $result;
}

private function start()
Expand All @@ -47,8 +51,12 @@ private function start()
);
while (true) {
[$method, $payload, $flag, $returnChannel] = $this->taskChannel->pop();
$result = $task->call($method, $payload, $flag);
$returnChannel->push($result);
try{
$result = $task->call($method, $payload, $flag);
$returnChannel->push($result);
} catch (\Throwable $e){
$returnChannel->push($e);
}
}
}
}
37 changes: 37 additions & 0 deletions src/Relay/CoroutineSocketRelay.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use Spiral\Goridge\Exceptions\GoridgeException;
use Spiral\Goridge\Exceptions\InvalidArgumentException;
use Spiral\Goridge\Exceptions\RelayException;
use Swoole\Coroutine\Socket;

/**
Expand Down Expand Up @@ -87,6 +88,15 @@ public function __construct(string $address, int $port = null, int $type = self:
$this->type = $type;
}

public function __toString(): string
{
if ($this->type == self::SOCK_TCP) {
return "tcp://{$this->address}:{$this->port}";
}

return "unix://{$this->address}";
}

/**
* @return string
*/
Expand All @@ -111,6 +121,33 @@ public function getType(): int
return $this->type;
}


/**
* Ensure socket connection. Returns true if socket successfully connected
* or have already been connected.
*
* @throws RelayException
* @throws \Error when sockets are used in unsupported environment
*/
public function connect(): bool
{
if ($this->isConnected()) {
return true;
}

$this->socket = $this->createSocket();
try {
// Port type needs to be int, so we convert null to 0
if ($this->socket->connect($this->address, $this->port ?? 0) === false) {
throw new RelayException(sprintf('%s (%s)', $this->socket->errMsg, $this->socket->errCode));
}
} catch (\Exception $e) {
throw new RelayException("unable to establish connection {$this}", 0, $e);
}

return true;
}

/**
* @throws GoridgeException
* @return Socket
Expand Down
35 changes: 0 additions & 35 deletions src/Relay/SocketTransporter.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ public function __destruct()
}
}

public function __toString(): string
{
if ($this->type == self::SOCK_TCP) {
return "tcp://{$this->address}:{$this->port}";
}

return "unix://{$this->address}";
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -82,32 +73,6 @@ public function isConnected(): bool
return $this->socket != null;
}

/**
* Ensure socket connection. Returns true if socket successfully connected
* or have already been connected.
*
* @throws RelayException
* @throws \Error when sockets are used in unsupported environment
*/
public function connect(): bool
{
if ($this->isConnected()) {
return true;
}

$this->socket = $this->createSocket();
try {
// Port type needs to be int, so we convert null to 0
if ($this->socket->connect($this->address, $this->port ?? 0) === false) {
throw new RelayException(sprintf('%s (%s)', $this->socket->errMsg, $this->socket->errCode));
}
} catch (\Exception $e) {
throw new RelayException("unable to establish connection {$this}", 0, $e);
}

return true;
}

/**
* Close connection.
*
Expand Down
16 changes: 10 additions & 6 deletions tests/Cases/CoroutineSocketTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,28 @@
class CoroutineSocketTest extends AbstractTestCase
{
const UNIX_SOCKET = '/tmp/test.sock';

/**
* @var RPC
* @var Process
*/
private $task;

private $p;

public function setUp()
{
! defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1));
@unlink(self::UNIX_SOCKET);
$p = new Process(function (Process $process) {
$this->p = new Process(function (Process $process) {
$process->exec(__DIR__ . '/../../app', ['-address', self::UNIX_SOCKET]);
});
$p->start();
$this->p->start();
sleep(1);
}

public function tearDown()
{
Process::kill($this->p->pid);
}


public function testOnCoroutine()
{
\Swoole\Coroutine\run(function () {
Expand Down
5 changes: 5 additions & 0 deletions tests/Cases/IPCRelayTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public function setUp()
sleep(1);
}

public function tearDown()
{
Process::kill($this->p->pid);
}

public function testOnCoroutine()
{
\Swoole\Coroutine\run(function () {
Expand Down

0 comments on commit 53c32c0

Please sign in to comment.