1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
| <?php class taskServer{ const HOST = "127.0.0.1"; const PORT = 9501; public $server = null;
public function __construct() { $this->server = new SWoole\Server(self::HOST, self::PORT); $this->server->set(array( "enable_coroutine" => false, // 关闭协程 "worker_num" => 2, // 开启的进程数 一般为cup核数 1-4 倍 "task_worker_num" => 2, // task进程的数量 'daemonize' => true, // 以守护进程的方式启动 ));
// 注册事件 $this->server->on("connect", [$this, "onConnect"]); $this->server->on("receive", [$this, "onReceive"]); $this->server->on("close", [$this, "onClose"]); $this->server->on("task", [$this, "onTask"]); $this->server->on("finish", [$this, "onFinish"]);
// 启用服务 $this->server->start(); }
/** * 监听连接事件 * @param $server * @param $fd */ public function onConnect($server, $fd){ echo "连接成功".PHP_EOL; }
/** * 监听客户端发送的消息 * @param $server "Server 对象" * @param $fd "唯一标示" * @param $form_id * @param $data "客户端发送的数据" */ public function onReceive($server, $fd, $form_id, $data){ // 投递任务 $server->task($data); $server->send($fd, "这是客户端向服务端发送的信息:{$data}"); }
/** * 监听异步任务task事件 * @param $server * @param $task_id * @param $worker_id * @param $data * @return string */ public function onTask($server, $task_id, $worker_id, $data){ $data = json_decode($data, true); echo "开始执行异步任务".PHP_EOL; try { // 开始执行任务 $this->addLog(date('Y-m-d H:i:s')."开始执行任务".PHP_EOL ); // 通知worker(必须 return,否则不会调用 onFinish) return $this->curl($data['url'], $data['data'], $data['type']); } catch (Exception $exception) { // 执行任务失败 $this->addLog(date('Y-m-d H:i:s')."执行任务失败".PHP_EOL); } }
/** * 监听finish 事件 * @param $server * @param $task_id * @param $data */ public function onFinish($server, $task_id, $data){ $this->addLog(date("Y-m-d H:i:s")."异步任务执行完成".PHP_EOL); print_r( "来自服务端的消息:{$data}"); }
/** * 监听关闭连接事件 * @param $server * @param $fd */ public function onClose($server, $fd){ echo "关闭TCP 连接".PHP_EOL; }
/** * 发起Get 或 Post 请求 * @param string $url 请求地址 * @param array $request_data 请求参数 * @param string $request_type 请求类型 * @param array $headers 头信息 * @param bool $is_ssl 是否是ssl * @return bool|string */ public function curl($url = '', $request_data = [], $request_type = 'get', $headers = [], $is_ssl = false) { $curl = curl_init (); // 初始化 // 设置 URL curl_setopt($curl, CURLOPT_URL, $url); // 不返回 Response 头部信息 curl_setopt ( $curl, CURLOPT_HEADER, 0 ); // 如果成功只将结果返回,不自动输出任何内容 curl_setopt ( $curl, CURLOPT_RETURNTRANSFER, 1 ); // 设置请求参数 curl_setopt ( $curl, CURLOPT_POSTFIELDS, http_build_query($request_data)); // TRUE 时追踪句柄的请求字符串 curl_setopt($curl, CURLINFO_HEADER_OUT, true); // Post 类型增加以下处理 if( $request_type == 'post') { // 设置为POST方式 curl_setopt ( $curl, CURLOPT_POST, 1 ); // 设置头信息 curl_setopt($curl, CURLOPT_HTTPHEADER, array('Content-Type: application/json', 'Content-Length:' . strlen(json_encode($request_data)))); // 设置请求参数 curl_setopt ( $curl, CURLOPT_POSTFIELDS, json_encode($request_data)); // 当POST 数据大于1024 时强制执行 curl_setopt ( $curl, CURLOPT_HTTPHEADER, array("Expect:")); }
// 判断是否绕过证书 if( $is_ssl ) { //绕过ssl验证 curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, false); } if(!empty($headers)) curl_setopt($curl, CURLOPT_HTTPHEADER, $headers); // 执行 $result = curl_exec ( $curl ); if ( $result == FALSE) return false; // 关闭资源 curl_close ( $curl ); return $result; }
/** * 写入日志 * @param $content */ public function addLog($content){ $path = dirname(__FILE__)."/logs/"; if (!is_dir($path)) mkdir($path,0777,true);
$file_name = $path.date("Y_m_d") . ".log"; if (!file_exists($file_name)) { touch($file_name); chown($file_name, "root"); }
$file_log = fopen($file_name, "a"); fputs($file_log, $content); fclose($file_log); } }
$server = new taskServer();
|