Php rabbitMq consuming daemon failing with mysql broken pipe error

312 Views Asked by At

I'm currently trying to restore the functionality of an old app and part of functionality I can't restore (honestly I'm even not sure it worked properly before) logic to consume rabbit MQ messages. This app has custom logic to run workers which processing the rabbit messages. The issues are when workers running they always failing on any action with database (get some data, close connection, etc.) whit error send of 291 bytes failed with errno=32 Broken pipe. Application using Propel ORM. So first I'll share propel configurations file:

config.propel.php:

<?php
use Propel\Runtime\Propel;
use Propel\Runtime\Connection\ConnectionManagerSingle;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;

$loggerConfig = $config->get('logger')['sql'];
$db = $config->get('db');
$env = $config->get('app')['env'];

$defaultLogger = new Logger('defaultLogger');
$defaultLogger->pushHandler(new StreamHandler($loggerConfig['filename'], $loggerConfig['level']));
Propel::getServiceContainer()->setLogger('defaultLogger', $defaultLogger);

$queryLogger = new Logger('main');
$queryLogger->pushHandler(new StreamHandler($loggerConfig['filename'], $loggerConfig['level']));
Propel::getServiceContainer()->setLogger('main', $queryLogger);

$serviceContainer = Propel::getServiceContainer();
$serviceContainer->setAdapterClass('main', 'mysql');

$manager = new ConnectionManagerSingle();

$dsn = 'mysql:host='.$db['host'].';';

if (isset($db['port'])) {
    $dsn .= 'port=' . $db['port'] . ";";
} else {
    $dsn .= 'port=3306;';
}

if (isset($db['charset'])) {
    $dsn .= 'charset='.$db['charset'].";";
}

$dsn .= 'dbname='.$db['dbname'];

$manager->setConfiguration([
    'dsn' => $dsn,
    'user'     => $db['user'],
    'password' => $db['password'],
]);

$serviceContainer->setConnectionManager('main', $manager);

if ($env == 'development') {
    $serviceContainer->getWriteConnection('main')->useDebug(true);
}

bootstrap.daemon.php which is actually used for bootstrapping that worker's application:

    <?php

    require 'lib/vendor/autoload.php';
    require 'app/autoload.php';
    // app config is just set of configurations like database credentials etc. 
    require 'app/config.php';
  // provided above
    require_once 'app/config.propel.php';

and the start script itself which is starting the worker's daemon:

#!/usr/bin/php -q
<?php
set_time_limit(0);

require __DIR__.'/../../bootstrap.daemon.php';
// it's a routing (not sure why it here)
require __DIR__.'/../../app/config.public.php';

use App;
use Luncher;

App::start($config);

/*
    Here is some magic:)
    Do not change
 */
{
    ini_set('mysql.connect_timeout', 0);
    ini_set('default_socket_timeout', 0);
}

$l = new Luncher;
$l->startDeamon();

Launcher:

<?php
namespace App\Daemons\Core;

use App\App;
use App\Daemons\Core\ServiceWrapper;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;

class Luncher
{
    private static $daemonsNamespace = "App\Daemons\\";

    public function __construct()
    {
        set_error_handler(function($errno, $errstr, $errfile, $errline) {
            $this->out("[ERROR] ". $errstr." at line ".$errline." in file ".$errfile);
            die();
        });
    }


    public function startDeamon($daemonName = 'Deamon)
    {
        $className = self::$daemonsNamespace.$daemonName;
        if (class_exists($className)) {
            $class = new \ReflectionClass($className);

            $logger = new Logger('daemon');
            $logger->pushHandler(new StreamHandler($daemonConfig['log']));

            $serives = new ServiceWrapper($logger);

            // start 5 workers  
            for ($i=0; $i<5; $i++) {
                echo date("d.m.Y H:i")." Starting daemon $daemonName\n";
                $className::start($serives);
            }
        } else {
            echo date("d.m.Y H:i")." Class $className not found!\n";
        }
    }

    private function stopDaemon($daemonName = 'Daemon')
    {
        $daemonPid = $this->getDaemonPids(daemonName);
        if (strlen($daemonPid)) {
            $this->out("Killing daemon $daemonName");
            exec("kill ".$daemonPid);
        }
    }

    private function getDaemonPids($daemonName)
    {
        $result = [];
        $tmp = [];
        exec("pidof daemons.$daemonName", $tmp);

        if (isset($tmp[0])) {
            $result = explode(' ', $tmp[0]);
        }

        return $result;
    }

    private function out($str)
    {
         echo date("d.m.Y H:i")." $str \n";
    }
}

Deamon itself:

<?php
namespace App\Daemons\Core\Queue;

use App\App;
use Propel\Runtime\Propel;
use App\Daemons\Core\ServiceWrapper;

class Daemon
{
    protected $isParent = true;
    protected $AMQPChannel;
    protected $services;
    protected $currentMsg;
    protected $reinitializeAttempts = 0;
    const MAX_REINITIALIZE_ATTEMPTS = 2;

    public static function start(ServiceWrapper $services)
    {
        $d = new static;
        $d->services = $services;
        $d->fork();

        if ($d->isChild()) {
            $d->services->setPid(getmypid());
            $d->run();
        }
    }


    protected function fork()
    {
        $this->isParent = pcntl_fork();
    }

    protected function isChild()
    {
        if (!$this->isParent) {
            return true;
        } else {
            return false;
        }
    }

    final protected function run()
    {
        try {
            $fErrorHandler = function($errno, $errstr, $errfile, $errline) {
                $this->error('[ERROR] '.$errstr." at line ".$errline." in file ".$errfile);
                die();
            };
            set_error_handler(function($errno, $errstr, $errfile, $errline) use ($fErrorHandler) {
                $fErrorHandler($errno, $errstr, $errfile, $errline);
            });

            register_shutdown_function(function() use ($fErrorHandler) {
                $error = error_get_last();

                if ($error !== null) {
                    $errno   = $error["type"];
                    $errfile = $error["file"];
                    $errline = $error["line"];
                    $errstr  = $error["message"];
                    $fErrorHandler($errno, $errstr, $errfile, $errline);
                }
            });


            $this->_init();

            $callback = function($msg) {
                if (!is_null($msg) && is_array($msg)) {
                    $this->output("Got a new message from queue");
                    $this->output(json_encode($msg));
                    $this->currentMsg = $msg;
                    $this->onNewMessage($msg);
                    $this->currentMsg = null;
                    $this->reinitializeAttempts = 0;
                }
            };

            if ($this->currentMsg) {
                $callback($this->currentMsg);
            }

            $this->consume('long_jobs', $callback);
            $this->runHandling();
        } catch (\Exception $e) {
            $this->error('[EXCEPTION] '.$e->getMessage()." ".$e->getFile()." : ". $e->getLine());

            /**
             * we can't get error code for PDOException :(
             */
            if ($e instanceof \PDOException || $e instanceof \Doctrine\DBAL\DBALException) {
                if ($this->reinitializeAttempts < self::MAX_REINITIALIZE_ATTEMPTS) {
                    $this->output('Trying to reinitialize daemon');
                    $this->reinitializeAttempts++;
                    $this->run();
                    return;
                } else {
                    $this->error('Max reinitialize attempts were achivied');
                }
            }
            die();
        }
    }

    protected function output($msg)
    {
        $this->services->output($msg);
    }

    protected function error($msg)
    {
        $this->services->error($msg);
    }

    protected function onNewMessage($data)
    {
        $this->output("Got a new job");

        $job = new LongJob();


        $this->output("Lunching job ".$data['jobName']);

        $job->setServices($this->services);

        $t1 = microtime(true);
        $job->run($data['data']);
        Propel::closeConnections();
        $this->output("Job is finished. time: ".(microtime(true) - $t1)." seconds");
    }

    private function _init()
    {
        $config = App::getConfig();

        $connection= new AMQPConnection($config->host, $config->port, $config->user, $config->password);
        $this->AMQPChannel = $connection->channel();
    }

    public function consume($queueName, $userCallback)
    {
        $callback = function($msg) use ($userCallback) {
            $userCallback(json_decode($msg->body, true));
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $this->AMQPChannel->basic_qos(null, 1, null);
        $this->AMQPChannel->queue_declare($queueName, false, true, false, false);
        $this->AMQPChannel->basic_consume($queueName, '', false, false, false, false, $callback);
    }

    public function runHandling()
    {
        while (count($this->AMQPChannel->callbacks)) {
            $this->channel->wait();
        }

        $this->AMQPChannel->close();
    }
}

And LongJob:

namespace App\Daemons\LongJobs;

use App\Daemons\Core\LongJob;
use App\Models\User;
use App\App;
use App\EventsServer\Core\Event;
use App\EventsServer\Core\EventsManager;
use App\Models\AR\Payments\Payments;
    class LongJob {
        protected $services;
        public function setServices(IServices $services)
        {
            $this->services = $services;
        }

        public function run($data)
        {
            if (!isset($data['userId'])) {
                $this->services->error("There is no user id!");
                return;
            }

            $compatProductId = ProductQuery::create()
                ->filterBySku(Product::SKU_COMPAT)
                ->findOne()
                ->getId();


            $oUser = User::find($data['userId']);

            if (!$oUser) {
                $this->services->error("Can't find user with id ". $data['userId']);
                return;
            }

            $arPurchase = PurchaseQuery::create()
                ->filterByProductId($compatProductId)
                ->find();

            foreach ($arPurchase as $purchase) {
                if ($data['userId'] != $purchase->getUserId()) {
                    $oPurchasedUser = User::find($purchase->getUserId());
                    $oPayments = new Payments($oPurchasedUser);
                    $oPayments->calcAndSaveCompat([$oUser]);
                }
            }
        }

And when LongJob is running and first usage of database occure it throwing that error. Sorry for a wall of code, but I do not know how else I can share the issue. Honestly code is not very clean and writtern years ago. In my experience I always use rabbit with some of modern frameworks so I'm not very familair with creating such a daemons by own, and I do not khow to solve it :(

0

There are 0 best solutions below