集群架构:分布式进程通讯

如果beyod是以多进程方式运行,不同的客户端会连接到不同的工作进程上,因为这些工作属于不同的地址空间,它们之间无法直接通讯,必须借助一些共享通道(如信号、unix domain socket、消息队列、socket等)才能通讯。

beyod开发之初,就充分考虑到分布式应用环境,为此内置了订阅发布模型的客户端和服务器端组件, 可以很容易构建跨服务器的分布式应用架构。

为了支持分布式消息通讯,同时引入了一些概念,需要预先深入了解:

server_id

每个beyod server实例的唯一id, 默认为1, 不同的beyod server实例,必须不能相同。取值范围1~65535。应该为server组件配置其server_id:
config/main.php

'components'=>[
    'server' => [
        'class' => 'beyoio\Server',
        'server_id' => 174,
        //...
    ]
]

worker_id

工作进程的编号,这是一个只读属性,表示工作进程的顺序号,从1开始递增。
==如果一个进程崩溃后被主进程重新生成后,worker_id是不变的==。我们可以根据worker_id实现进程区别和标识。

运行时可以通过以下方式取得当前工作进程的workder_id:

$worker_id = \Yii::$app->server->getWorkerId();

GPID

全局进程唯一标识(Global process unique identification)

即工作进程在整个集群中的全局唯一标识,算法为:

$GPID = ($server_id << 16) + $worker_id;

可以使用以下方式取得当前进程在集群中的唯一标识:

$gpid = \Yii::$app->server->getGPID();

典型的,在beyod的分布式消息推送架构中,GPID作为频道名称使用。

dispatcher

分发器,beyod内建了分布式消息推送服务器组件,它的作用:

  1. 接受客户端的订阅请求
  2. 接收客户端的发布请求,根据频道号,将消息推送到相关订阅者。
  3. 支持多种数据结构:

发布订阅模型
hash表共享存储
基于SplMaxHeap的优先队列
消息广播

dispatcher使用内存存储hash/队列数据,规划前要考虑到内存容量限制的问题。 生产环境建议使用数据库/缓存系统/队列系统来存储数据。dispatcher最适合的就场景就是分发消息而不是存储数据。

考虑到dipatch server仅仅只是转发消息,并不参与复杂的业务逻辑处理,它本身即便以单进程方式运行,也很难形成瓶颈。如果业务量较大,单个dispatcher server无法支持,那么完全可以部署多个dispatcher,实现负载均衡架构。

dispatcher server的配置
config/main.php

'components'=>[
    'server'=>[
        'class' => 'beyoio\Server',
        'worker_num'=>1, //以单进程方式运行,保证数据是共享唯一的
        'listeners' => [
            'class' => 'beyoio\Listener',
            'listen'=>'tcp://0.0.0.0:2491'
            'handler'=>'beyoio\dispatcher\Handler',
            'parser' =>'beyoio\dispatcher\Parser'
        ],
    ]
]

客户端
例如:我们可以在worker启动之后,就订阅频道并准消息的接收处理(假设dispatcher的ip为192.168.0.101):
appMyServer.php中的内容:

namespace app;

use beyoio\Server;
use beyoio\dispatcher\Client;

class MyServer extends Server
{
    public $client;
    
    //此回调在工作进程启动后执行
    public function onWorkerStart($workerId, $GPID)
    {
        parent::onWorkerStart($workerId, $GPID);
        $this->connectDispatcher($GPID);
    }
    
    
    /**
     * 连接订阅服务器
     */
    protected function connectDispatcher()
    {
        $options = [
            'target' => 'tcp://192.168.0.101:2491',
            'reconnect_interval' => 6, //连接被对方断开后,6秒后重连。
        ];
        
        $this->client = new Client($options);
        $this->client->default_channel=$this->getGpid();//连接成功后,即订阅频道,频道号即为GPID
        
        
        $this->client->on(Server::ON_MESSAGE, function(MessageEvent $event){
            print_r($event->message); //收到消息时的回调
        });
        
        //连接到分发服务器。
        $this->client->connect();
    }
}

beyoiodispatcherClient

beyoiodispatcherClient分发器客户端连接组件:

<?php

namespace beyoio\dispatcher;

use beyoio\StreamClient;
use beyoio\Server;
use beyoio\IOEvent;

class Client extends StreamClient
{

    public $reconnect_interval = 3; //连接异常中断后的重连间隔3秒
    
    //连接后自动订阅的频道
    public $default_channel = [];
    
    /**
     * 订阅一个或多个频道
     * @param array|string $keys The channel names to subscribe to
     */
    public function subscribe($keys)
    {
        return $this->send(['command'=>'subscribe', 'key'=>$keys, 'seq'=>$this->getSequence()]);
    }
    
    /**
     * 取消订阅一个或多个频道
     * @param array|string $keys The channel names to cancel
     */
    public function unsubscribe($keys)
    {
        return $this->send(['command'=>'unsubscribe', 'key'=>$keys, 'seq'=>$this->getSequence()]);
    }
    
    /**
     * 向一个或多个频道上推送消息
     * @param string|array $key
     * @param mixed $value
     */
    public function publish($key, $value)
    {
        return $this->send(['command'=>'publish', 'key'=>$keys, 'value'=>$value, 'seq'=>$this->getSequence()]);
    }
    
    /**
     * 向所有频道广播消息
     * @param mixed $value
     */
    public function broadcat($value)
    {
        return $this->send(['command'=>'broadcat','value'=>$value, 'seq'=>$this->getSequence()]);
    }
    
    
    /**
     * 获取已经订阅的频道集合
     */
    public function channels()
    {
        $this->send(['command' => 'channels', 'seq'=>$this->getSequence()]);
    }
    
    
    /**
     * 设置一个hash表条目
     * @param string $key
     * @param mixed $value
     * @param int $expire 缓存过期绝对时间戳 0表示永不过期
     */
    public function set($key, $value, $expire=0)
    {
        return $this->send(['command'=>'set', 'key'=>$key, 'value'=>$value, 'seq'=>$this->getSequence()]);
    }
    
    /**
     * 读取一个hash表条目
     * @param string $key
     */
    public function get($key)
    {
        return $this->send(['command'=>'get', 'key'=>$key, 'seq'=>$this->getSequence()]);
    }
    
    /**
     * 删除一个hash表条目
     * @param string $key
     */    
    public function delete($key)
    {
        return $this->send(['command'=>'delete', 'key'=>$key, 'seq'=>$this->getSequence()]);
    }
    
    //删除所有hash条目
    public function deleteAll()
    {
        return $this->send(['command'=>'deleteall','seq'=>$this->getSequence()]);
    }
    
    
    /**
     * 向队列中追加记录
     * @param string $key queue name
     * @param mixed $value data
     * @param int $priority 
     * @see http://php.net/manual/en/splpriorityqueue.insert.php
     */
    public function qpush($key, $value)
    {
        return $this->send(['command'=>'qpush', 'key'=>$key,'value'=>$value, 'seq'=>$this->getSequence()]);
    }
    
    /**
     * 从队列中取出记录
     * @param string $key queue name
     * @param string $block whether enable blocking mode. In blocking mode, the server returns only if there is data in the queue
     * @return void|boolean|number
     */
    public function qpop($key, $block=false){
        return $this->send(['command'=>'qpop', 'key'=>$key, 'block'=>$block, 'seq'=>$this->getSequence()]);
    }
    
    /**
     * 清空指定队列
     * @param string $key queue name
     */
    public function qdelete($key)
    {
        return $this->send(['command'=>'qdelete', 'key'=>$key, 'seq'=>$this->getSequence()]);
    }
    
    /**
     * 清空所有队列
     * @param string $key queue name
     */
    public function qdeleteAll($key)
    {
        return $this->send(['command'=>'qdeleteall', 'key'=>$key, 'seq'=>$this->getSequence()]);
    }
}

协议格式
dispatcher消息使用二进制json格式,由包头和数据部分组成。使用json格式而不是PHP序列化,就很容易兼容多种编程语言。
包头是一个4字节无符号的整数,指明了整个数据包的长度,数据部分即json字符串。以下描述中,省去包头部分,着重关注数据部分:

  1. 订阅请求

{"command":"subscribe", "key": [], "seq":""}

key:是一个字符串数组,指定了要订阅的频道    
seq:当前消息的唯一标识,由client自动生成。   

响应结果:
{"command":"subscribe","key":[], "code":0,"message":"ok", "seq":""}

  1. 取消订阅:

{"command":"unsubscribe", "key": [], "seq":""}
响应结果:
{"command":"unsubscribe","key":[], "code":0,"message":"ok", "seq":""}

  1. 发布消息

{"command":"publish", "key": [],"value": "", "seq":""}
value即为消息内容
响应结果:
{"command":"publish", "key": [],"value": "", "seq":""}

  1. 频道消息结构:

{"command":"message", "value":"", "source":"消息发送者的ip和端口"}