您现在的位置: 365建站网 > 365文章 > PHP中fsockopen函数多线程的实现方法

PHP中fsockopen函数多线程的实现方法

文章来源:365jz.com     点击数:520    更新时间:2017-12-17 09:07   参与评论

PHP本身是不是支持多线程的,不过我们可以借助其他的方法来实现多线程,比如 shell 服务,比如 web 服务器,本文我们来讲讲这两个方法如何实现。需要的朋友可以来参考一下。

多线程是java中一个很不错的东西,很多朋友说在php中不可以使用PHP多线程了,其实那是错误的说法PHP多线程实现方法和fsockopen函数有关,下面我们来介绍具体实现程序代码,有需要了解的同学可参考。

当有人想要实现并发功能时,他们通常会想到用fork或者spawn threads,但是当他们发现php不支持多线程的时候,大概会转换思路去用一些不够好的语言,比如perl。

其实的是大多数情况下,你大可不必使用 fork 或者线程,并且你会得到比用 fork 或 thread 更好的性能。

假设你要建立一个服务来检查正在运行的n台服务器,以确定他们还在正常运转。你可能会写下面这样的代码:

代码如下:

</>code

  1. <?php
  2. $hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com");
  3. $timeout = 15;
  4. $status = array();
  5. foreach ($hosts as $host) {
  6. $errno = 0;
  7. $errstr = "";
  8. $s = fsockopen($host, 80, $errno, $errstr, $timeout);
  9. if ($s) {
  10. $status[$host] = "Connectedn";
  11. fwrite($s, "HEAD / HTTP/1.0rnHost: $hostrnrn");
  12. do {
  13. $data = fread($s, 8192);
  14. if (strlen($data) == 0) {
  15. break;
  16. }
  17. $status[$host] .= $data;
  18. } while (true);
  19. fclose($s);
  20. } else {
  21. $status[$host] = "Connection failed: $errno $errstrn";
  22. }
  23. }
  24. print_r($status);
  25. ?>

它运行的很好,但是在fsockopen()分析完hostname并且建立一个成功的连接(或者延时$timeout秒)之前,扩充这段代码来管理大量服务器将耗费很长时间。

因此我们必须放弃这段代码;我们可以建立异步连接-不需要等待fsockopen返回连接状态。PHP仍然需要解析hostname(所以直接使用ip更加明智),不过将在打开一个连接之后立刻返回,继而我们就可以连接下一台服务器。

有两种方法可以实现;PHP5中可以使用新增的stream_socket_client()函数直接替换掉fsocketopen()。PHP5之前的版本,你需要自己动手,用sockets扩展解决问题。

下面是PHP5中的解决方法:

它运行的很好,但是在fsockopen()分析完hostname并且建立一个成功的连接(或者延时$timeout秒)之前,扩充这段代码来管理大量服务器将耗费很长时间。

因此我们必须放弃这段代码;我们可以建立异步连接-不需要等待fsockopen返回连接状态。PHP仍然需要解析hostname(所以直接使用ip更加明智),不过将在打开一个连接之后立刻返回,继而我们就可以连接下一台服务器。

有两种方法可以实现;PHP5中可以使用新增的stream_socket_client()函数直接替换掉fsocketopen()。PHP5之前的版本,你需要自己动手,用sockets扩展解决问题。

 代码如下: 

</>code

  1. <?php
  2. $hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com");
  3. $timeout = 15;
  4. $status = array();
  5. $sockets = array();
  6. /* Initiate connections to all the hosts simultaneously */
  7. foreach ($hosts as $id => $host) {
  8. $s = stream_socket_client("
  9. $
  10. $host:80", $errno, $errstr, $timeout,
  11. STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
  12. if ($s) {
  13. $sockets[$id] = $s;
  14. $status[$id] = "in progress";
  15. } else {
  16. $status[$id] = "failed, $errno $errstr";
  17. }
  18. }
  19. /* Now, wait for the results to come back in */
  20. while (count($sockets)) {
  21. $read = $write = $sockets;
  22. /* This is the magic function - explained below */
  23. $n = stream_select($read, $write, $e = null, $timeout);
  24. if ($n > 0) {
  25. /* readable sockets either have data for us, or are failed
  26. * connection attempts */
  27. foreach ($read as $r) {
  28. $id = array_search($r, $sockets);
  29. $data = fread($r, 8192);
  30. if (strlen($data) == 0) {
  31. if ($status[$id] == "in progress") {
  32. $status[$id] = "failed to connect";
  33. }
  34. fclose($r);
  35. unset($sockets[$id]);
  36. } else {
  37. $status[$id] .= $data;
  38. }
  39. }
  40. /* writeable sockets can accept an HTTP request */
  41. foreach ($write as $w) {
  42. $id = array_search($w, $sockets);
  43. fwrite($w, "HEAD / HTTP/1.0rnHost: "
  44. . $hosts[$id] . "rnrn");
  45. $status[$id] = "waiting for response";
  46. }
  47. } else {
  48. /* timed out waiting; assume that all hosts associated
  49. * with $sockets are faulty */
  50. foreach ($sockets as $id => $s) {
  51. $status[$id] = "timed out " . $status[$id];
  52. }
  53. break;
  54. }
  55. }
  56. foreach ($hosts as $id => $host) {
  57. echo "Host: $hostn";
  58. echo "Status: " . $status[$id] . "nn";
  59. }
  60. ?>

我们用stream_select()等待sockets打开的连接事件。stream_select()调用系统的select(2)函数来工作:前面三个参数是你要使用的streams的数组;你可以对其读取,写入和获取异常(分别针对三个参数)。stream_select()可以通过设置$timeout(秒)参数来等待事件发生-事件发生时,相应的sockets数据将写入你传入的参数。

下面是PHP4.1.0之后版本的实现,如果你已经在编译PHP时包含了sockets(ext/sockets)支持,你可以使用根上面类似的代码,只是需要将上面的streams/filesystem函数的功能用ext/sockets函数实现。主要的不同在于我们用下面的函数代替stream_socket_client()来建立连接:

代码如下:

</>code

  1. <?php
  2. // This value is correct for Linux, other systems have other values
  3. define('EINPROGRESS', 115);
  4. function non_blocking_connect($host, $port, &$errno, &$errstr, $timeout) {
  5. $ip = gethostbyname($host);
  6. $s = socket_create(AF_INET, SOCK_STREAM, 0);
  7. if (socket_set_nonblock($s)) {
  8. $r = @socket_connect($s, $ip, $port);
  9. if ($r || socket_last_error() == EINPROGRESS) {
  10. $errno = EINPROGRESS;
  11. return $s;
  12. }
  13. }
  14. $errno = socket_last_error($s);
  15. $errstr = socket_strerror($errno);
  16. socket_close($s);
  17. return false;
  18. }
  19. ?>

现在用socket_select()替换掉stream_select(),用socket_read()替换掉fread(),用socket_write()替换掉fwrite(),用socket_close()替换掉fclose()就可以执行脚本了!

PHP5的先进之处在于,你可以用stream_select()处理几乎所有的stream-例如你可以通过include STDIN用它接收键盘输入并保存进数组,你还可以接收通过proc_open()打开的管道中的数据。

下面来分享一个PHP多线程类

代码如下:

</>code

  1. class thread {
  2. var $hooks = array();
  3. var $args = array();
  4. function thread() {
  5. }
  6. function addthread($func)
  7. {
  8. $args = array_slice(func_get_args(), 1);
  9. $this->hooks[] = $func;
  10. $this->args[] = $args;
  11. return true;
  12. }
  13. function runthread()
  14. {
  15. if(isset($_GET['flag']))
  16. {
  17. $flag = intval($_GET['flag']);
  18. }
  19. if($flag || $flag === 0)
  20. {
  21. call_user_func_array($this->hooks[$flag], $this->args[$flag]);
  22. }
  23. else
  24. {
  25. for($i = 0, $size = count($this->hooks); $i < $size; $i++)
  26. {
  27. $fp=fsockopen($_SERVER['HTTP_HOST'],$_SERVER['SERVER_PORT']);
  28. if($fp)
  29. {
  30. $out = "GET {$_SERVER['PHP_SELF']}?flag=$i HTTP/1.1rn";
  31. $out .= "Host: {$_SERVER['HTTP_HOST']}rn";
  32. $out .= "Connection: Closernrn";
  33. fputs($fp,$out);
  34. fclose($fp);
  35. }
  36. }
  37. }
  38. }
  39. }

 

项目情况:

要同步300W+的用户数据到qcloud,只能每次一个curl同步,大概每秒同步3个,算下来同步完300W数据估计要10天+,所以想到用多线程解决。

方案1:

用c++写多进程方案,fork多个进程出来解决。主线程负责读取/存储数据,子线程负责curl。

 

方案2:

用php写多进程方案,同方案1.主线程读取,子线程curl。

但是由于php没有多进程,不能直接操控线程/进程。所以只能依赖于linux来实现多进程。

php函数pcntl_fork()可以创建进程,等同于linux的fork。

和fork不同的是,pcntl_fork返回的0是子进程,返回的id是子进程的pid(而fork是父进程返回0,子进程返回ppid).

注意pcntl_fork()函数必须要在linux上面才行,据说要加载pcntl.so模块,但貌似不加载也行。

 

方案3:

php+shell模拟多线程

例如:test.php文件实现了项目所需功能(包含数据库读写和curl)

再写一份shell如下,保存为start.sh:

#!/bin/bash  //指定bash,必须

for(( i=0; i<20; i++))
do
    php test.php &    //执行test.php,&符号的意思是把该操作放在后台执行,这样shell就可以继续执行下一步sleep命令了。如果没有这个符号,shell会阻塞在这里。
    sleep 1s
done

最后执行该shell,sudo ./start.sh

shell会启动20个进程同时分别执行test.php

但是由于每个进程都相当于主进程,所以共同的资源不好控制,这个case中,共同的资源只有数据库,所以每次操作数据库我都加锁详见我另一篇文章

 因为相对于数据库操作基本都是每秒能解决,而curl连续发个几百个需要几十秒甚至更多,所以数据库加锁影响不大。
 

我们在做项目的时候,有些需求,特别是数据的响应处理需要花费大量的时间,由于php是一个短生命周期的脚本语言,到了默认的30秒,php的数据处理还没完成,php的生命周期就结束了。这时需要使用异步并发处理策略,也就是说,一次php调用可以发出的多个请求,这些请求不是按照顺序执行,而是可以异步并发执行的,一些请求用于在后台处理数据,一些请求用于接受后台响应状态,根据状态,与用户做一些简单的交互。但是问题来了,我们都知道php本身是不支持多线程的,那么应该怎么实现php的多线程呢?

 

一、php模拟实现多线程的三种方法

 

1、linux下的php多线程

下面所讲的东西是源自php的pcntl_fork函数.因为这个函数依赖操作系统fork的实现,所以本文所讲的东西只适用于linux/unix。那么先看看这个函数的用法吧.php手册上是这么说的:

 

</>code

  1. <?php
  2. $pid = pcntl_fork();
  3. if ($pid == -1) {
  4. die('could not fork');
  5. } else if ($pid) {
  6. // we are the parent
  7. pcntl_wait($status); //Protect against Zombie children
  8. } else {
  9. // we are the child
  10. }
  11. ?>

 

 

通过pcntl_fork创建一个子进程,如果返回值是-1的话,那么说明子进程创建失败.创建成功的进程id会返回给父进程,0返回给子进程.不好理解吧,所以应该这样写:

 

</>code

  1. <?php
  2. $pid = pcntl_fork();
  3. if($pid == -1){
  4. //创建失败咱就退出呗,没啥好说的
  5. die('could not fork');
  6. }
  7. else{
  8. if($pid){
  9. //从这里开始写的代码是父进程的,因为写的是系统程序,记得退出的时候给个返回值
  10. exit(0);
  11. }
  12. else{
  13. //从这里开始写的代码都是在新的进程里执行的,同样正常退出的话,最好也给一个返回值
  14. exit(0);
  15. }
  16. }
  17. ?>

这样一改好理解多了,如果你父进程希望知道子进程正常退出的话,可以加上前面的pcntl_wait。

 

 

2.通过stream_socket_client 方式

 

</>code

  1. function sendStream() {
  2. $english_format_number = number_format($number, 4, '.', '');
  3. echo $english_format_number;
  4. exit();
  5. $timeout = 10;
  6. $result = array();
  7. $sockets = array();
  8. $convenient_read_block = 8192;
  9. $host = "test.local.com";
  10. $sql = "select waybill_id,order_id from xm_waybill where status>40 order by update_time desc limit 1 ";
  11. $data = Yii::app()->db->createCommand($sql)->queryAll();
  12. $id = 0;
  13. foreach ($data as $k => $v) {
  14. if ($k % 2 == 0) {
  15. $send_data[$k]['body'] = NoticeOrder::getSendData($v['waybill_id']);
  16. } else {
  17. $send_data[$k]['body'] = array($v['order_id'] => array('extra' => 16));
  18. }
  19. $data = json_encode($send_data[$k]['body']);
  20. $s = stream_socket_client($host . ":80", $errno, $errstr, $timeout, STREAM_CLIENT_ASYNC_CONNECT | STREAM_CLIENT_CONNECT);
  21. if ($s) {
  22. $sockets[$id++] = $s;
  23. $http_message = "GET /php/test.php?data=" . $data . " HTTP/1.0\r\nHost:" . $host . "\r\n\r\n";
  24. fwrite($s, $http_message);
  25. } else {
  26. echo "Stream " . $id . " failed to open correctly.";
  27. }
  28. }
  29. while (count($sockets)) {
  30. $read = $sockets;
  31. stream_select($read, $w = null, $e = null, $timeout);
  32. if (count($read)) {
  33. /* stream_select generally shuffles $read, so we need to
  34. compute from which socket(s) we're reading. */
  35. foreach ($read as $r) {
  36. $id = array_search($r, $sockets);
  37. $data = fread($r, $convenient_read_block);
  38. if (strlen($data) == 0) {
  39. echo "Stream " . $id . " closes at " . date('h:i:s') . ".<br> ";
  40. fclose($r);
  41. unset($sockets[$id]);
  42. } else {
  43. $result[$id] = $data;
  44. }
  45. }
  46. } else {
  47. /* A time-out means that *all* streams have failed
  48. to receive a response. */
  49. echo "Time-out!\n";
  50. break;
  51. }
  52. }
  53. print_r($result);
  54. }

 

 

3、通过多进程代替多线程

 

</>code

  1. function daemon($func_name,$args,$number){
  2. while(true){
  3. $pid=pcntl_fork();
  4. if($pid==-1){
  5. echo "fork process fail";
  6. exit();
  7. }elseif($pid){//创建的子进程
  8. static $num=0;
  9. $num++;
  10. if($num>=$number){
  11. //当进程数量达到一定数量时候,就对子进程进行回收。
  12. pcntl_wait($status);
  13. $num--;
  14. }
  15. }else{ //为0 则代表是子进程创建的,则直接进入工作状态
  16. if(function_exists($func_name)){
  17. while (true) {
  18. $ppid=posix_getpid();
  19. var_dump($ppid);
  20. call_user_func_array($func_name,$args);
  21. sleep(2);
  22. }
  23. }else{
  24. echo "function is not exists";
  25. }
  26. exit();
  27. }
  28. }
  29. }
  30. function worker($args){
  31. //do something
  32. }
  33. daemon('worker',array(1),2);

 

 

二、真正实现php多线程的方法

php真正的多线程实现方式,通过安装php的扩展 pthread 可以做到。
 
 
但是这个下载的是 版本3 也就是php 7 才能用的,我们需要使的是 版本2
 
php扩展

 
然后刷新的页面如下,拖到最底部:

php多线程
 
下载文件

下一页找到版本2的

下载下来,这个v2 才是php5才可以使用的

下载下来,安装:

或者,您直接这样下载:

</>code

  1. cd /tools
  2. wget https://github.com/krakjoe/pthreads/archive/v2.0.10.zip
  3. unzip v2.0.10.zip
  4. cd pthreads-2.0.10
  5. /usr/local/php/bin/phpize
  6. ./configure --with-php-config=/usr/local/php/bin/php-config
  7. make
  8. make install
注意:您的php 在编译的时候需要开启 –enable-maintainer-zts

</>code

  1. ./configure --prefix=/usr/local/php --disable-fileinfo --enable-fpm --with-config-file-path=/etc --with-config-file-scan-dir=/etc/php.d --with-openssl --with-zlib --with-curl --enable-ftp --with-gd --with-xmlrpc --with-jpeg-dir --with-png-dir --with-freetype-dir --enable-gd-native-ttf --enable-mbstring --with-mcrypt=/usr/local/libmcrypt --enable-zip --with-mysql=/usr/local/mysql --without-pear --enable-maintainer-zts

</>code

  1. vim /etc/php.ini
  2. 添加
  3. extension=pthreads.so

</>code

  1. 重启php
  2. /etc/init.d/php-fpm restart
 

如对本文有疑问,请提交到交流论坛,广大热心网友会为你解答!! 点击进入论坛

发表评论 (520人查看0条评论)
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
昵称:
最新评论
------分隔线----------------------------

快速入口

· 365软件
· 杰创官网
· 建站工具
· 网站大全

其它栏目

· 建站教程
· 365学习

业务咨询

· 技术支持
· 服务时间:9:00-18:00
365建站网二维码

Powered by 365建站网 RSS地图 HTML地图

copyright © 2013-2024 版权所有 鄂ICP备17013400号