小艾的自留地

Stay foolish, Stay hungry

最近接手一个对接短信的需求,这个需求本身并没有什么难度,直接按照服务商的要求请求具体的接口就好了。

最开始是使用传统的同步阻塞方式实现了一遍,用户体验并不好,发送短信需要等待,等待服务商的接口返回内容,才继续向下执行。

因为最近在学习Swoole,Swoole 中有一个“异步任务”,就特别适合以下应用场景:

  1. 需要执行耗时操作,会阻塞主进程
  2. 用户不需要等待返回结果

结合官网手册和Latent基于 swoole 下 异步消息队列 API,最终简单封装了一个处理API 的类,实现如下:

服务端

服务端是基于本地Tcp,监听9501端口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
<?php
class taskServer{
const HOST = "127.0.0.1";
const PORT = 9501;
public $server = null;

public function __construct()
{
$this->server = new SWoole\Server(self::HOST, self::PORT);
$this->server->set(array(
"enable_coroutine" => false, // 关闭协程
"worker_num" => 2, // 开启的进程数 一般为cup核数 1-4 倍
"task_worker_num" => 2, // task进程的数量
'daemonize' => true, // 以守护进程的方式启动
));

// 注册事件
$this->server->on("connect", [$this, "onConnect"]);
$this->server->on("receive", [$this, "onReceive"]);
$this->server->on("close", [$this, "onClose"]);
$this->server->on("task", [$this, "onTask"]);
$this->server->on("finish", [$this, "onFinish"]);

// 启用服务
$this->server->start();
}

/**
* 监听连接事件
* @param $server
* @param $fd
*/
public function onConnect($server, $fd){
echo "连接成功".PHP_EOL;
}

/**
* 监听客户端发送的消息
* @param $server "Server 对象"
* @param $fd "唯一标示"
* @param $form_id
* @param $data "客户端发送的数据"
*/
public function onReceive($server, $fd, $form_id, $data){
// 投递任务
$server->task($data);
$server->send($fd, "这是客户端向服务端发送的信息:{$data}");
}

/**
* 监听异步任务task事件
* @param $server
* @param $task_id
* @param $worker_id
* @param $data
* @return string
*/
public function onTask($server, $task_id, $worker_id, $data){
$data = json_decode($data, true);
echo "开始执行异步任务".PHP_EOL;
try {
// 开始执行任务
$this->addLog(date('Y-m-d H:i:s')."开始执行任务".PHP_EOL );
// 通知worker(必须 return,否则不会调用 onFinish)
return $this->curl($data['url'], $data['data'], $data['type']);
} catch (Exception $exception) {
// 执行任务失败
$this->addLog(date('Y-m-d H:i:s')."执行任务失败".PHP_EOL);
}
}

/**
* 监听finish 事件
* @param $server
* @param $task_id
* @param $data
*/
public function onFinish($server, $task_id, $data){
$this->addLog(date("Y-m-d H:i:s")."异步任务执行完成".PHP_EOL);
print_r( "来自服务端的消息:{$data}");
}

/**
* 监听关闭连接事件
* @param $server
* @param $fd
*/
public function onClose($server, $fd){
echo "关闭TCP 连接".PHP_EOL;
}

/**
* 发起Get 或 Post 请求
* @param string $url 请求地址
* @param array $request_data 请求参数
* @param string $request_type 请求类型
* @param array $headers 头信息
* @param bool $is_ssl 是否是ssl
* @return bool|string
*/
public function curl($url = '', $request_data = [], $request_type = 'get', $headers = [], $is_ssl = false)
{
$curl = curl_init (); // 初始化
// 设置 URL
curl_setopt($curl, CURLOPT_URL, $url);
// 不返回 Response 头部信息
curl_setopt ( $curl, CURLOPT_HEADER, 0 );
// 如果成功只将结果返回,不自动输出任何内容
curl_setopt ( $curl, CURLOPT_RETURNTRANSFER, 1 );
// 设置请求参数
curl_setopt ( $curl, CURLOPT_POSTFIELDS, http_build_query($request_data));
// TRUE 时追踪句柄的请求字符串
curl_setopt($curl, CURLINFO_HEADER_OUT, true);
// Post 类型增加以下处理
if( $request_type == 'post') {
// 设置为POST方式
curl_setopt ( $curl, CURLOPT_POST, 1 );
// 设置头信息
curl_setopt($curl, CURLOPT_HTTPHEADER, array('Content-Type: application/json', 'Content-Length:' . strlen(json_encode($request_data))));
// 设置请求参数
curl_setopt ( $curl, CURLOPT_POSTFIELDS, json_encode($request_data));
// 当POST 数据大于1024 时强制执行
curl_setopt ( $curl, CURLOPT_HTTPHEADER, array("Expect:"));
}

// 判断是否绕过证书
if( $is_ssl ) {
//绕过ssl验证
curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, false);
}
if(!empty($headers)) curl_setopt($curl, CURLOPT_HTTPHEADER, $headers);
// 执行
$result = curl_exec ( $curl );
if ( $result == FALSE) return false;
// 关闭资源
curl_close ( $curl );
return $result;
}

/**
* 写入日志
* @param $content
*/
public function addLog($content){
$path = dirname(__FILE__)."/logs/";
if (!is_dir($path))
mkdir($path,0777,true);

$file_name = $path.date("Y_m_d") . ".log";
if (!file_exists($file_name)) {
touch($file_name);
chown($file_name, "root");
}

$file_log = fopen($file_name, "a");
fputs($file_log, $content);
fclose($file_log);
}
}

$server = new taskServer();

客户端

这里的客户端可以是 cli 脚本,也可以是对应控制器中的具体方法,只要能连接Swoole 监听的Tcp 就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
namespace app\admin\controller;

class Index extends Base
{
public function index(){
$client = new \Swoole\Client(SWOOLE_SOCK_TCP);
if (!$client->connect('0.0.0.0', 9501)) {
return json("connect failed. Error: {$client->errCode}\n");
}
$data = [
"url" => "https://api.paasoo.com/json",
"data" => [
"key" => "key",
"secret" => "secret",
"from" => "sms",
"to" => "mobile_phone",
"text" => "test",
],
"type" => "get"
];
$client->send(json_encode($data));
return json($client->recv());
}
}

参考链接

评论