本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.
以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php
<?php
namespace AppTools;
use IlluminateConfigRepository;
use IlluminateSupportFacadesDB;
use MonologLogger;
use MonologHandlerStreamHandler;
use IlluminateHttpRequest;
class Kafka
{
public $broker_list = "127.0.0.1";//配置kafka,可以用逗号隔开多个kafka
public $topic = "test";//管道名称
public $partition = 0;
protected $producer = null;
protected $consumer = null;
public function __construct()
{
if (empty($this->broker_list)) {
throw new InvalidConfigException("broker not config");
}
$rk = new RdKafkaProducer();
if (empty($rk)) {
throw new InvalidConfigException("producer error");
}
$rk->setLogLevel(LOG_DEBUG);
if (!$rk->addBrokers($this->broker_list)) {
throw new InvalidConfigException("producer error");
}
$this->producer = $rk;
}
/**
* 生产者
* @param array $messages
* @return mixed
*/
public function send($messages = [],$topic)
{
$topic = $this->producer->newTopic($topic);
return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
}
/**
* 消费者
*/
public function consumer($object, $callback){
$conf = new RdKafkaConf();
$conf->set("group.id", 0);
$conf->set("metadata.broker.list", $this->broker_list);
$topicConf = new RdKafkaTopicConf();
$topicConf->set("auto.offset.reset", "smallest");
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafkaKafkaConsumer($conf);
$consumer->subscribe([$this->topic]);
echo "waiting for messages.....n";
while(true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo "message payload....";
$object->$callback($message->payload);
break;
}
sleep(1);
}
}
}
?>
在控制器中如何使用:
首先再头部导入这个类:use AppToolsKafka;
下面是使用生产者实例:
public function test(){
$topic = "tool";//输入使用管道名称
$data["shop_id"] = 58;
$data["bar_code"]=586;
$data["goods_num"] = 1;
$data["goods_unit"] = "个";
$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
var_dump($Error_Msg);
}
下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:
<?php
$conf = new RdKafkaConf();
$conf->set("group.id", "myConsumerGroup");
$rk = new RdKafkaConsumer($conf);
$rk->addBrokers("localhost:9092");
$topicConf = new RdKafkaTopicConf();
$topicConf->set("auto.commit.interval.ms", 100);
$topicConf->set("offset.store.method", "file");
$topicConf->set("offset.store.path", sys_get_temp_dir());
$topicConf->set("auto.offset.reset", "smallest");
$topic = $rk->newTopic("tool", $topicConf);//读取的管道
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
//没有错误打印信息
$message = json_decode(json_encode($message),true);
$data = json_decode($message["payload"],true);
var_dump($data);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "等待接收信息n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "超时n";
break;
default:
throw new Exception($message->errstr(), $message->err);
break;
}
sleep(1);
}
?>
到此这篇关于Laravel中Kafka的使用详解的文章就介绍到这了,更多相关Laravel中Kafka内容请搜索IT博客社区以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT博客社区!
没有更多内容。


711