异步连接器StreamClient

当beyod server需要和其它服务器交互时, 这时就作为tcp/udp/unix socket客户端,StreamClient提供了此功能支持。

特性:

  1. 支持多种协议tcp/udp/unix socket
  2. 异步非阻塞、事件驱动
  3. 支持自定义协议解析器,理论上可以实现任何协议解析。
  4. 自定义事件回调
  5. 连接保持和断线自动重连机制。

 范例:一个异步redis客户端连接

一般来说,应该在Server::onWorkerStart或Handler::init方法中初始化StreamClient连接。以下是一个异步Redis连接过程。

appMyServer

<?php

namespace app;

use Yii;
use beyod\Server;
use beyod\dispatcher\Client;
use beyod\MessageEvent;
use beyod\IOEvent;
use beyod\protocol\redis\Request;
use beyod\protocol\dns\Message;

class MyServer extends \beyod\Server
{
    public $redis_server = 'tcp://192.168.1.19:6379';
    public $redisClient;
    
    public function onWorkerStart($workerId, $GPID)
    {
        parent::onWorkerStart($workerId, $GPID);
        $this->connectRedisServer(); //工作进程启动后,就开始连接到Redis服务器
    }
    
    protected function connectRedisServer()
    {
        //设置连接目标,重连间隔,连接超时时间
        $options = [
            'target' => $this->redis_server,
            'reconnect_interval' => 5, //连接异常断开, 5秒后重连
            'timeout' => 10, //连接超时10秒
        ];
        
        //初始化一个连接对象,此时还未开始连接
        $this->redisClient = new \beyod\protocol\redis\Client($options);
        
        //收到数据时的回调
        $this->redisClient->on(Server::ON_MESSAGE, function($event){
            /** 
             * @var MessageEvent $event
             * @var Request $event->message  
             * */
            print_r($event->message);
            
        });
        
        //连接成功之后就可以发送数据了
        $this->redisClient->on(Server::ON_CONNECT, function(IOEvent $event){
            Yii::debug($event->sender.' connect ok ');
            
            $this->redisClient->set('a',serialize($_SERVER));
            $this->redisClient->get('a');
        });
        
        //事件回调成功 开始连接。
        $this->redisClient->connect();
    }
}

beyodStreamClient实现了基础的客户端连接,自定义协议时,应该在此类上扩展实现。

StreamClient类参考

常量:
const STATUS_CONNECTING = 1; 正在建立状态
const STATUS_ESTABLISHED = 2; 连接已经建立
const STATUS_CLOSED = 0; 连接已经关闭 连接失败,被动关闭或主动关闭

属性
static $connections = []
只读属性,当前所有StreamClient对象,键名为id, 值为StreamClient对象

$parser string|array|Parser


当前连接的协议解析器配置。

$reconnect_interval = 0;
连接失败或连接被动关闭时,重新连接间隔秒数,0表示断开后不自动重新连接,此值可以使用小数。

$options=[];

socket创建时的配置选项,参阅:
http://php.net/manual/en/function.socket-set-option.php

http://php.net/manual/en/sockets.constants.php

键名为选项常量名字符串而不是常量值。

默认的选项参数:


'backlog' => 256,
'SO_REUSEADDR' => 1,//重用已经断开的连接5元组(源地址、端口、目标地址、端口、传输层协议),对需要建立大量短连接的客户端,必须设置此值
'SO_KEEPALIVE' => 1,  //启用系统内核层的tcp keepalive机制
'backlog' => 256, //tcp连接队列值
'SO_REUSEADDR' => 1, 'SO_KEEPALIVE' => 1, //启用系统内核层的tcp keepalive机制
'SO_LINGER' => null, 
'SO_SNDBUF' => null, 
'SO_RCVBUF' => null, 
'TCP_NODELAY' => 1 

$ssl=[]

使用ssl连接时的选项,

可使用的选项: http://php.net/manual/en/context.ssl.php

php ssl使用方法: https://www.devdungeon.com/content/how-use-ssl-sockets-php
默认选项:

'enabled' => false, //是否启用ssl
'verify_peer' => false, //是否验证服务器证书
'verify_perr_name' => false, //是否验证对方主机名
'allow_self_signaed' => true, //是否允许自签名的证书

$target
连接目标, 传输层协议/目标地址/端口三部分组成,如果使用ipv6, ip部分必须使用方括号括起来。
http://php.net/manual/en/transports.inet.php
http://php.net/manual/en/transports.unix.php

$timeout=30
连接超时秒数, 0表示使用系统默认超时值。

$max_send_buffer_size=10485760
单个连接的发送缓冲区字节数,如果缓冲区内容超出此值,Server::ON_BUFFER_FULL事件将被激活,并且开始丢弃最新的发送数据包。

$read_buffer_size = 65536
接收数据时,每次读取的字节数,一般无须修改此值

string|array|Parser $parser
协议解析器组件配置信息,可以是解析器类名、配置数组、Parser对象。

$peer
连接对端的地址和端口,分号连接

$local
本地地址和端口,只有在连接建立后才有意义。

方法

pipe(Connection $target)
为当前连接设置管道连接,常用来作代理、数据库连接池等。
StreamClient代表了beyod与后端服务器的连接,而Connection为客户端与beyod之间的连接,客户端的任何数据包,将原样通过StreamClient发送给后端服务器,而后端服务器返回的数据,将通过Connection发送给客户端,从而实现代理转发机制。

unpipe(void)
同上,取消管道连接

setAttribute(string $name, $value=null)
为当前连接设置一个属性,参数分别为属性名称,属性值,null表示删除指定的属性名。例如我们可以在连接建立之后,将客户端的验证状态,客户端参数保存在连接对象上,后续交互中就可以通过getAttribute($name)取得此属性值。

mixed function getAttribute($name, $default = null)
同上,读取指定的属性的值

bool function hasAttribute($name)
判断指定的属性值是否存在

bool function getIsClosed()
判断当前连接是否为关闭状态

bool function getIsEstablished()
判断当前连接是否为已经建立状态

function __toString(){
return "$this->id $this->peer $this->local";
}

__toString魔术方法

int getId()
当前连接的id, beyod使用id复用机制,不存在连接id持续增长耗尽的问题。

StreamClient connect($renew=false)
开始连接,$renew:是否强制重新连接,否则只有在连接未建立或关闭的情况下才重新连接

StreamClient StreamClient function prepare()
针对udp,使用prepare而不是connect, 为了保持字面上的意义(虽然本质上是调用了connect)。

string getScheme()
获取传输层协议

bool isSSL()
判断当前连接是否启用SSL

bool isTCP() 是否为tcp

bool isUDP() 是否为udp/udg

function isUnix() 是否为unix套接字

int send(mixed $message, $raw=false)
向服务器发送数据, $raw:是否只发送原始字节流而不使用parser封包。

void close()
主动关闭当前连接

float getResponseAt()
获取服务器最近响应数据到达时间戳

float getConnectAt()
最近一次连接建立时间戳

事件的绑定和处理


$options = [
  'target' => 'tcp://127.0.0.1:3306',
  'reconnect_interval' => 1,
  'timeout' => 10,
];
$client = new \beyod\StreamClient($options);

 //收到数据时的回调
$client->on(Server::ON_MESSAGE, function($event){
	/** 
    * @var MessageEvent $event
    * @var Request $event->message  
    * */
   print_r($event->message);
});

 //连接成功之后就可以发送数据了
$client->on(Server::ON_CONNECT, function(IOEvent $event) use($client){
  Yii::debug($event->sender.' connect ok ');
            
  $client->send('hello!');
});

//尝试连接
$client->connect();
        


事件列表

事件功能参数类型
Server::ON_CONNECT_FAILED连接失败 ErrorEvent
Server::ON_CLOSE 连接被动或主动关闭CloseEvent
Server::ON_BEFORE_CONNECT准备连接前IOEvent
Server::ON_CONNECT连接建立成功IOEvent
Server::ON_BAD_PACKET收到错误的数据包ErrorEvent
Server::ON_MESSAGE收到完整数据包MessageEvent
Server::ON_BUFFER_FULL缓冲区已满IOEvent
Server::ON_BUFFER_DRAIN缓冲区已空IOEvent