如果beyod是以多进程方式运行,不同的客户端会连接到不同的工作进程上,因为这些工作属于不同的地址空间,它们之间无法直接通讯,必须借助一些共享通道(如信号、unix domain socket、消息队列、socket等)才能通讯。
beyod开发之初,就充分考虑到分布式应用环境,为此内置了订阅发布模型的客户端和服务器端组件, 可以很容易构建跨服务器的分布式应用架构。
为了支持分布式消息通讯,同时引入了一些概念,需要预先深入了解:
每个beyod server实例的唯一id, 默认为1, 不同的beyod server实例,必须不能相同。取值范围1~65535。应该为server组件配置其server_id:
config/main.php
'components'=>[
'server' => [
'class' => 'beyoio\Server',
'server_id' => 174,
//...
]
]
工作进程的编号,这是一个只读属性,表示工作进程的顺序号,从1开始递增。
==如果一个进程崩溃后被主进程重新生成后,worker_id是不变的==。我们可以根据worker_id实现进程区别和标识。
运行时可以通过以下方式取得当前工作进程的workder_id:
$worker_id = \Yii::$app->server->getWorkerId();
全局进程唯一标识(Global process unique identification)
即工作进程在整个集群中的全局唯一标识,算法为:
$GPID = ($server_id << 16) + $worker_id;
可以使用以下方式取得当前进程在集群中的唯一标识:
$gpid = \Yii::$app->server->getGPID();
典型的,在beyod的分布式消息推送架构中,GPID作为频道名称使用。
分发器,beyod内建了分布式消息推送服务器组件,它的作用:
发布订阅模型
hash表共享存储
基于SplMaxHeap的优先队列
消息广播
dispatcher使用内存存储hash/队列数据,规划前要考虑到内存容量限制的问题。 生产环境建议使用数据库/缓存系统/队列系统来存储数据。dispatcher最适合的就场景就是分发消息而不是存储数据。
考虑到dipatch server仅仅只是转发消息,并不参与复杂的业务逻辑处理,它本身即便以单进程方式运行,也很难形成瓶颈。如果业务量较大,单个dispatcher server无法支持,那么完全可以部署多个dispatcher,实现负载均衡架构。
dispatcher server的配置
config/main.php
'components'=>[
'server'=>[
'class' => 'beyoio\Server',
'worker_num'=>1, //以单进程方式运行,保证数据是共享唯一的
'listeners' => [
'class' => 'beyoio\Listener',
'listen'=>'tcp://0.0.0.0:2491'
'handler'=>'beyoio\dispatcher\Handler',
'parser' =>'beyoio\dispatcher\Parser'
],
]
]
客户端
例如:我们可以在worker启动之后,就订阅频道并准消息的接收处理(假设dispatcher的ip为192.168.0.101):
appMyServer.php中的内容:
namespace app;
use beyoio\Server;
use beyoio\dispatcher\Client;
class MyServer extends Server
{
public $client;
//此回调在工作进程启动后执行
public function onWorkerStart($workerId, $GPID)
{
parent::onWorkerStart($workerId, $GPID);
$this->connectDispatcher($GPID);
}
/**
* 连接订阅服务器
*/
protected function connectDispatcher()
{
$options = [
'target' => 'tcp://192.168.0.101:2491',
'reconnect_interval' => 6, //连接被对方断开后,6秒后重连。
];
$this->client = new Client($options);
$this->client->default_channel=$this->getGpid();//连接成功后,即订阅频道,频道号即为GPID
$this->client->on(Server::ON_MESSAGE, function(MessageEvent $event){
print_r($event->message); //收到消息时的回调
});
//连接到分发服务器。
$this->client->connect();
}
}
beyoiodispatcherClient
beyoiodispatcherClient分发器客户端连接组件:
<?php
namespace beyoio\dispatcher;
use beyoio\StreamClient;
use beyoio\Server;
use beyoio\IOEvent;
class Client extends StreamClient
{
public $reconnect_interval = 3; //连接异常中断后的重连间隔3秒
//连接后自动订阅的频道
public $default_channel = [];
/**
* 订阅一个或多个频道
* @param array|string $keys The channel names to subscribe to
*/
public function subscribe($keys)
{
return $this->send(['command'=>'subscribe', 'key'=>$keys, 'seq'=>$this->getSequence()]);
}
/**
* 取消订阅一个或多个频道
* @param array|string $keys The channel names to cancel
*/
public function unsubscribe($keys)
{
return $this->send(['command'=>'unsubscribe', 'key'=>$keys, 'seq'=>$this->getSequence()]);
}
/**
* 向一个或多个频道上推送消息
* @param string|array $key
* @param mixed $value
*/
public function publish($key, $value)
{
return $this->send(['command'=>'publish', 'key'=>$keys, 'value'=>$value, 'seq'=>$this->getSequence()]);
}
/**
* 向所有频道广播消息
* @param mixed $value
*/
public function broadcat($value)
{
return $this->send(['command'=>'broadcat','value'=>$value, 'seq'=>$this->getSequence()]);
}
/**
* 获取已经订阅的频道集合
*/
public function channels()
{
$this->send(['command' => 'channels', 'seq'=>$this->getSequence()]);
}
/**
* 设置一个hash表条目
* @param string $key
* @param mixed $value
* @param int $expire 缓存过期绝对时间戳 0表示永不过期
*/
public function set($key, $value, $expire=0)
{
return $this->send(['command'=>'set', 'key'=>$key, 'value'=>$value, 'seq'=>$this->getSequence()]);
}
/**
* 读取一个hash表条目
* @param string $key
*/
public function get($key)
{
return $this->send(['command'=>'get', 'key'=>$key, 'seq'=>$this->getSequence()]);
}
/**
* 删除一个hash表条目
* @param string $key
*/
public function delete($key)
{
return $this->send(['command'=>'delete', 'key'=>$key, 'seq'=>$this->getSequence()]);
}
//删除所有hash条目
public function deleteAll()
{
return $this->send(['command'=>'deleteall','seq'=>$this->getSequence()]);
}
/**
* 向队列中追加记录
* @param string $key queue name
* @param mixed $value data
* @param int $priority
* @see http://php.net/manual/en/splpriorityqueue.insert.php
*/
public function qpush($key, $value)
{
return $this->send(['command'=>'qpush', 'key'=>$key,'value'=>$value, 'seq'=>$this->getSequence()]);
}
/**
* 从队列中取出记录
* @param string $key queue name
* @param string $block whether enable blocking mode. In blocking mode, the server returns only if there is data in the queue
* @return void|boolean|number
*/
public function qpop($key, $block=false){
return $this->send(['command'=>'qpop', 'key'=>$key, 'block'=>$block, 'seq'=>$this->getSequence()]);
}
/**
* 清空指定队列
* @param string $key queue name
*/
public function qdelete($key)
{
return $this->send(['command'=>'qdelete', 'key'=>$key, 'seq'=>$this->getSequence()]);
}
/**
* 清空所有队列
* @param string $key queue name
*/
public function qdeleteAll($key)
{
return $this->send(['command'=>'qdeleteall', 'key'=>$key, 'seq'=>$this->getSequence()]);
}
}
协议格式
dispatcher消息使用二进制json格式,由包头和数据部分组成。使用json格式而不是PHP序列化,就很容易兼容多种编程语言。
包头是一个4字节无符号的整数,指明了整个数据包的长度,数据部分即json字符串。以下描述中,省去包头部分,着重关注数据部分:
{"command":"subscribe", "key": [], "seq":""}
key:是一个字符串数组,指定了要订阅的频道
seq:当前消息的唯一标识,由client自动生成。
响应结果:
{"command":"subscribe","key":[], "code":0,"message":"ok", "seq":""}
{"command":"unsubscribe", "key": [], "seq":""}
响应结果:
{"command":"unsubscribe","key":[], "code":0,"message":"ok", "seq":""}
{"command":"publish", "key": [],"value": "", "seq":""}
value即为消息内容
响应结果:
{"command":"publish", "key": [],"value": "", "seq":""}
{"command":"message", "value":"", "source":"消息发送者的ip和端口"}