V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
onanying
V2EX  ›  PHP

Mix PHP V2 实例: AliCloud 短信协程池异步发送守护程序

  •  
  •   onanying · 2019-05-24 13:12:28 +08:00 · 2977 次点击
    这是一个创建于 2059 天前的主题,其中的信息可能已经有所发展或是发生改变。

    前些时间我们发布了 Mix PHP V2 实例:协程池异步邮件发送守护程序 范例,这一次我们提供一个使用大厂 SDK 通过 Swoole Hook 协程化来并行执行短信发送任务,本文是一个代码简单、IO 性能极强的范例。

    请先升级到 mix-framework >= v2.0.5

    本范例依然使用消息队列的方式接收短信发送任务,消息中间件使用:

    • redis

    生产者

    通常框架中使用 Redis 会安装一个类库来使用,本例使用原生代码,便于理解。

    // 连接
    $redis = new \Redis();
    if (!$redis->connect('127.0.0.1', 6379)) {
        throw new \Exception('Redis connect failed.');
    }
    $redis->auth('');
    $redis->select(0);
    // 投递任务
    for($i = 0; $i < 3; $i++){
    	$data = [
    	    'phone'         => '***',
    	    'templateCode'  => 'SMS_***',
    	    'templateParam' => ['code' => 123456],
    	];
    	$redis->lpush('queue:sms', serialize($data));
    }
    

    消费者

    使用的是 ali 云的短信服务,查看官方 PHP SDK 文档 ,使用的库为:

    composer require alibabacloud/client
    

    通过查看该库的 composer 依赖文件,我们得知该库基于 guzzlehttp 开发,因为 Mix PHP 提供了无需修改代码就可 Hook Guzzle 库可在协程中使用的工具 Mix PHP V2 生态:让 Guzzle 支持 Swoole 的 Hook 协程,所以能基本确定该库可在 Swoole 协程中使用。

    首先我们安装 https://github.com/mix-php/guzzle-hookalibabacloud/client 可在协程中使用:

    composer require mix/guzzle-hook
    

    然后在项目的 composer.json 文件中增加 extra 配置项,如下:

    "extra": {
        "include_files": [
          "vendor/mix/guzzle-hook/src/functions_include.php"
        ]
    }
    

    更新自动加载:

    composer dump-autoload
    

    下面我们采用 Mix PHP V2 的守护程序、协程池来完成一个超高性能的短信发送程序。

    首先我们在配置 applications/console/config/main.php 中注册一个命令:

    // 命令
    'commands'         => [
    
            'smser' => [
                'Smser',
                'description' => "SMS send daemon demo.",
                'options'     => [
                    [['d', 'daemon'], 'description' => 'Run in the background'],
                ],
            ],
    
    ],
    

    注册的命令中指定的 Smser 命令类,接下来我们编写一个 SmserCommand 类:

    applications/console/src/Commands/SmserCommand.php
    
    <?php
    
    namespace Console\Commands;
    
    use Console\Libraries\SmserWorker;
    use Mix\Concurrent\CoroutinePool\Dispatcher;
    use Mix\Console\CommandLine\Flag;
    use Mix\Core\Coroutine;
    use Mix\Core\Coroutine\Channel;
    use Mix\Core\Event;
    use Mix\Helper\ProcessHelper;
    use AlibabaCloud\Client\AlibabaCloud;
    
    /**
     * Class SmserCommand
     * @package Daemon\Commands
     * @author liu,jian <[email protected]>
     */
    class SmserCommand
    {
    
        const ACCESS_KEY = '***';
        const ACCESS_SECRET = '***';
    
        /**
         * 退出
         * @var bool
         */
        public $quit = false;
    
        /**
         * 主函数
         */
        public function main()
        {
            // 守护处理
            $daemon = Flag::bool(['d', 'daemon'], false);
            if ($daemon) {
                ProcessHelper::daemon();
            }
            // 捕获信号
            ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) {
                $this->quit = true;
                ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null);
            });
            // 设置 ali 云全局参数
            AlibabaCloud::accessKeyClient(static::ACCESS_KEY, static::ACCESS_SECRET)->regionId('cn-hangzhou')->asDefaultClient();
            // 手动关闭 Swoole 文件 Hook,因为 ali 云依赖的 uuid 库有文件 hook 协程兼容问题,Swoole 4.4 已经适配该问题
            Coroutine::enableHook(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_FILE);
            // 协程池执行任务
            xgo(function () {
                $maxWorkers = 20;
                $maxQueue   = 20;
                $jobQueue   = new Channel($maxQueue);
                $dispatch   = new Dispatcher([
                    'jobQueue'   => $jobQueue,
                    'maxWorkers' => $maxWorkers,
                ]);
                $dispatch->start(SmserWorker::class);
                // 投放任务
                $redis = app()->redisPool->getConnection();
                while (true) {
                    if ($this->quit) {
                        $dispatch->stop();
                        return;
                    }
                    try {
                        $data = $redis->brPop(['queue:sms'], 3);
                    } catch (\Throwable $e) {
                        $dispatch->stop();
                        return;
                    }
                    if (!$data) {
                        continue;
                    }
                    $data = array_pop($data); // brPop 命令最后一个键才是值
                    $jobQueue->push($data);
                }
            });
            // 等待事件
            Event::wait();
        }
    
    }
    

    $data = $redis->brPop(['queue:sms'], 3); 外部的异常捕获可得知,当 Redis 连接出错时,比如 Redis 重启、连接异常时协程池会安全退出,也就是说当进程异常退出后用户需使用 supervisorpm2 等工具重启守护进程。

    上面是一个 Mix PHP 协程池的使用代码,基本可以直接复制使用,框架默认包含了协程池的 Demo,本次实例只是修改了协程池的 Worker,本命令主要是完成从 Redis 队列中获取消息然后 push 到 jobQueue 中,jobQueue 中的数据会被 20 个 Worker 实例中某一个抢占后并行执行,本例的发送代码逻辑就在 SmserWorker 类中:

    applications/console/src/Libraries/SmserWorker.php
    
    <?php
    
    namespace Console\Libraries;
    
    use Mix\Concurrent\CoroutinePool\AbstractWorker;
    use Mix\Concurrent\CoroutinePool\WorkerInterface;
    
    /**
     * Class SmserWorker
     * @package Daemon\Libraries
     * @author liu,jian <[email protected]>
     */
    class SmserWorker extends AbstractWorker implements WorkerInterface
    {
    
        /**
         * 邮件发送器
         * @var Smser
         */
        public $smser;
    
        /**
         * 初始化事件
         */
        public function onInitialize()
        {
            parent::onInitialize(); // TODO: Change the autogenerated stub
            // 实例化一些需重用的对象
            $this->smser = new Smser();
        }
    
        /**
         * 处理
         * @param $data
         */
        public function handle($data)
        {
            // TODO: Implement handle() method.
            $data = unserialize($data);
            if (empty($data)) {
                return;
            }
            try {
                $result = $this->smser->send($data['phone'], $data['templateCode'], $data['templateParam']);
                app()->log->info("SMS sent successfully:phone {phone} templateCode {templateCode} result {result}", array_merge($data, ['result' => json_encode($result, JSON_UNESCAPED_UNICODE)]));
            } catch (\Throwable $e) {
                app()->log->error("SMS failed to send:phone {phone} templateCode {templateCode} error {error}", array_merge($data, ['error' => $e->getMessage()]));
            }
        }
    
    }
    

    由以上代码可见,Worker 在初始化时,新增了一个 Smser 类的属性,当 jobQueue 消息投递过来时消息会传递到 handle 方法,在该方法中使用 Mailer 类的实例完成邮件发送任务,所以我们要编写了一个 Smser 发送程序:

    applications/console/src/Libraries/Smser.php
    
    <?php
    
    namespace Console\Libraries;
    
    use AlibabaCloud\Client\AlibabaCloud;
    use AlibabaCloud\Client\Exception\ClientException;
    use AlibabaCloud\Client\Exception\ServerException;
    use Mix\Core\Coroutine;
    
    /**
     * Class Smser
     * @package Console\Libraries
     * @author liu,jian <[email protected]>
     */
    class Smser
    {
    
        /**
         * 配置信息
         */
        const SIGN_NAME = '***';
    
        /**
         * Smser constructor.
         */
        public function __construct()
        {
            // 开启协程钩子
            Coroutine::enableHook();
        }
    
        /**
         * 发送
         * @param $phone
         * @param $templateCode
         * @param $templateParam
         * @return array
         * @throws ClientException
         * @throws ServerException
         */
        public function send($phone, $templateCode, $templateParam)
        {
            $result = AlibabaCloud::rpc()
                ->product('Dysmsapi')
                // ->scheme('https') // https | http
                ->version('2017-05-25')
                ->action('SendSms')
                ->method('POST')
                ->options([
                    'query' => [
                        'PhoneNumbers'  => $phone,
                        'SignName'      => static::SIGN_NAME,
                        'TemplateCode'  => $templateCode,
                        'TemplateParam' => json_encode($templateParam),
                    ],
                ])
                ->request();
            return $result->toArray();
        }
    
    }
    

    以上就完成了全部的代码逻辑,现在我们开始测试,先启动消费者守护程序:

    [root@localhost bin]# ./mix-console smser
    

    将上文的生产者脚本命名为 push.php 然后在 CLI 中执行 (开一个新终端):

    [root@localhost bin]# php /tmp/push.php
    

    消费者守护程序结果:

    [root@localhost bin]# ./mix-console smser
    [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"OK","RequestId":"4071D031-6D9E-4F70-9269-6C1979080858","BizId":"939807358670612546^0","Code":"OK"}
    [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"触发分钟级流控 Permits:1","RequestId":"490B73D7-317E-4362-B2DD-5E2153A7B891","Code":"isv.BUSINESS_LIMIT_CONTROL"}
    [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"触发分钟级流控 Permits:1","RequestId":"1FD22EDB-BAA4-4416-8FF9-242EDCF34359","Code":"isv.BUSINESS_LIMIT_CONTROL"}
    

    命令行终端打印了发送成功的日志,发送完成。

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2688 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 15:04 · PVG 23:04 · LAX 07:04 · JFK 10:04
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.