$qpsChannel = new Channel();
go(function () use ($qpsChannel, $configs) {
$waitGroup = new WaitGroup();
foreach ($configs as $key => $config) {
$waitGroup->add();
go(function () use ($qpsChannel, $waitGroup, $key, $config) {
if ($this->skipConfigByStatus($config) === false
&& $this->skipConfigByQps($config) === false) {
$qpsChannel->push('111111');
} else {
$qpsChannel->push('222222');
}
$waitGroup->done();
});
}
$waitGroup->wait();
$qpsChannel->close();
});
while (true) {
$qpsStatusArr = $qpsChannel->pop();
var_dump($qpsStatusArr);
if ($qpsStatusArr === false) break;
}
上面代码逻辑是打开一个子协程去循环判断一些逻辑,并把结果写入到 channel 中,处理完之后,从子协程把 channle 关闭(关闭之后再操作 channel 就会返回 false)
正常的逻辑应该是打开子协程去处理逻辑,然后进入到 while 去读取 channle,当子协程有 channel push 时,while 中的逻辑会把结果打印出来
现在遇到的问题是只打印出一个 false,就结束了。难道是子协程处理时间太快了,还没走到 while,channle 就被 close 了?
问题 1:求大神解答上面代码问题。
问题 2:大家都是怎么在代码中用子协程去节省时间呢?写法是什么样的呢?
可能区别在于golang中的chan和swoole中的channel
在golang中chan要在生产者方关闭chan,当chan被close之后,for range还是可以读取到chan中未消费的数据。
在swoole中,当channel被close之后,读取不到未消费的数据。
swoole文档中介绍:
Close()方法:唤醒所有生产者协程,push 方法返回 false;唤醒所有消费者协程,pop 方法返回 false
下面为修改过后的版本:
$qpsChannel = new Channel();
go(function () use ($qpsChannel, $configs) {
$waitGroup = new WaitGroup();
foreach ($configs as $key => $config) {
$waitGroup->add();
go(function () use ($qpsChannel, $waitGroup, $key, $config) {
if ($this->skipConfigByStatus($config) === false
&& $this->skipConfigByQps($config) === false) {
$qpsChannel->push([$key => false]);
} else {
$qpsChannel->push([$key => true]);
}
$waitGroup->done();
});
}
$waitGroup->wait();
$qpsChannel->push(self::CHANNEL_CLOSE);
});
while (true) {
$qpsStatus = $qpsChannel->pop();
if ($qpsStatus === self::CHANNEL_CLOSE) {
$qpsChannel->close();
break;
}
if (is_array($qpsStatus)) {
$configKey = array_keys($qpsStatus)[0];
$configSkip = array_values($qpsStatus)[0];
if ($configSkip === true) unset($configs[$configKey]);
}
}
在生产者方push一个结束的标识,从而让消费者方退出阻塞和关闭channel