diff --git a/src/applications/metamta/daemon/mta/PhabricatorMetaMTADaemon.php b/src/applications/metamta/daemon/mta/PhabricatorMetaMTADaemon.php index 4058924a27..43f2c8fc4f 100644 --- a/src/applications/metamta/daemon/mta/PhabricatorMetaMTADaemon.php +++ b/src/applications/metamta/daemon/mta/PhabricatorMetaMTADaemon.php @@ -1,36 +1,35 @@ <?php /* * Copyright 2011 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ class PhabricatorMetaMTADaemon extends PhabricatorDaemon { public function run() { echo "OK. Sending mail"; do { $mail = id(new PhabricatorMetaMTAMail())->loadAllWhere( 'status = %s AND nextRetry <= %d LIMIT 10', PhabricatorMetaMTAMail::STATUS_QUEUE, time()); foreach ($mail as $message) { $message->sendNow(); - echo "."; } $this->sleep(1); } while (true); } } diff --git a/src/infrastructure/daemon/garbagecollector/PhabricatorGarbageCollectorDaemon.php b/src/infrastructure/daemon/garbagecollector/PhabricatorGarbageCollectorDaemon.php index 4faad2c859..641a48651a 100644 --- a/src/infrastructure/daemon/garbagecollector/PhabricatorGarbageCollectorDaemon.php +++ b/src/infrastructure/daemon/garbagecollector/PhabricatorGarbageCollectorDaemon.php @@ -1,154 +1,157 @@ <?php /* * Copyright 2011 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /** * Collects old logs and caches to reduce the amount of data stored in the * database. * * @group daemon */ class PhabricatorGarbageCollectorDaemon extends PhabricatorDaemon { public function run() { // Keep track of when we start and stop the GC so we can emit useful log // messages. $just_ran = false; do { $run_at = PhabricatorEnv::getEnvConfig('gcdaemon.run-at'); $run_for = PhabricatorEnv::getEnvConfig('gcdaemon.run-for'); // Just use the default timezone, we don't need to get fancy and try // to localize this. $start = strtotime($run_at); if ($start === false) { throw new Exception( "Configuration 'gcdaemon.run-at' could not be parsed: '{$run_at}'."); } $now = time(); if ($now < $start || $now > ($start + $run_for)) { if ($just_ran) { echo "Stopped garbage collector.\n"; $just_ran = false; } // The configuration says we can't collect garbage right now, so // just sleep until we can. $this->sleep(300); continue; } if (!$just_ran) { echo "Started garbage collector.\n"; $just_ran = true; } $n_herald = $this->collectHeraldTranscripts(); $n_daemon = $this->collectDaemonLogs(); $n_parse = $this->collectParseCaches(); $collected = array( 'Herald Transcript' => $n_herald, 'Daemon Log' => $n_daemon, 'Differential Parse Cache' => $n_parse, ); $collected = array_filter($collected); foreach ($collected as $thing => $count) { + if ($thing == 'Daemon Log' && !$this->getTraceMode()) { + continue; + } $count = number_format($count); echo "Garbage collected {$count} '{$thing}' objects.\n"; } $total = array_sum($collected); if ($total < 100) { // We didn't max out any of the GCs so we're basically caught up. Ease // off the GC loop so we don't keep doing table scans just to delete // a handful of rows. $this->sleep(300); } else { $this->stillWorking(); } } while (true); } private function collectHeraldTranscripts() { $ttl = PhabricatorEnv::getEnvConfig('gcdaemon.ttl.herald-transcripts'); if ($ttl <= 0) { return 0; } $table = new HeraldTranscript(); $conn_w = $table->establishConnection('w'); queryfx( $conn_w, 'UPDATE %T SET objectTranscript = "", ruleTranscripts = "", conditionTranscripts = "", applyTranscripts = "", garbageCollected = 1 WHERE garbageCollected = 0 AND `time` < %d LIMIT 100', $table->getTableName(), time() - $ttl); return $conn_w->getAffectedRows(); } private function collectDaemonLogs() { $ttl = PhabricatorEnv::getEnvConfig('gcdaemon.ttl.daemon-logs'); if ($ttl <= 0) { return 0; } $table = new PhabricatorDaemonLogEvent(); $conn_w = $table->establishConnection('w'); queryfx( $conn_w, 'DELETE FROM %T WHERE epoch < %d LIMIT 100', $table->getTableName(), time() - $ttl); return $conn_w->getAffectedRows(); } private function collectParseCaches() { $key = 'gcdaemon.ttl.differential-parse-cache'; $ttl = PhabricatorEnv::getEnvConfig($key); if ($ttl <= 0) { return 0; } $table = new DifferentialChangeset(); $conn_w = $table->establishConnection('w'); queryfx( $conn_w, 'DELETE FROM %T WHERE dateCreated < %d LIMIT 100', DifferentialChangeset::TABLE_CACHE, time() - $ttl); return $conn_w->getAffectedRows(); } } diff --git a/src/infrastructure/daemon/irc/bot/PhabricatorIRCBot.php b/src/infrastructure/daemon/irc/bot/PhabricatorIRCBot.php index ae2b394133..be1c529791 100644 --- a/src/infrastructure/daemon/irc/bot/PhabricatorIRCBot.php +++ b/src/infrastructure/daemon/irc/bot/PhabricatorIRCBot.php @@ -1,253 +1,255 @@ <?php /* * Copyright 2011 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /** * Simple IRC bot which runs as a Phabricator daemon. Although this bot is * somewhat useful, it is also intended to serve as a demo of how to write * "system agents" which communicate with Phabricator over Conduit, so you can * script system interactions and integrate with other systems. * * NOTE: This is super janky and experimental right now. * * @group irc */ final class PhabricatorIRCBot extends PhabricatorDaemon { private $socket; private $handlers; private $writeBuffer; private $readBuffer; private $conduit; private $config; public function run() { $argv = $this->getArgv(); if (count($argv) !== 1) { throw new Exception("usage: PhabricatorIRCBot <json_config_file>"); } $json_raw = Filesystem::readFile($argv[0]); $config = json_decode($json_raw, true); if (!is_array($config)) { throw new Exception("File '{$argv[0]}' is not valid JSON!"); } $server = idx($config, 'server'); $port = idx($config, 'port', 6667); $handlers = idx($config, 'handlers', array()); $pass = idx($config, 'pass'); $nick = idx($config, 'nick', 'phabot'); $user = idx($config, 'user', $nick); $ssl = idx($config, 'ssl', false); $nickpass = idx($config, 'nickpass'); $this->config = $config; if (!preg_match('/^[A-Za-z0-9_`[{}^|\]\\-]+$/', $nick)) { throw new Exception( "Nickname '{$nick}' is invalid!"); } foreach ($handlers as $handler) { $obj = newv($handler, array($this)); $this->handlers[] = $obj; } $conduit_uri = idx($config, 'conduit.uri'); if ($conduit_uri) { $conduit_user = idx($config, 'conduit.user'); $conduit_cert = idx($config, 'conduit.cert'); $conduit = new ConduitClient($conduit_uri); $response = $conduit->callMethodSynchronous( 'conduit.connect', array( 'client' => 'PhabricatorIRCBot', 'clientVersion' => '1.0', 'clientDescription' => php_uname('n').':'.$nick, 'user' => $conduit_user, 'certificate' => $conduit_cert, )); $this->conduit = $conduit; } $errno = null; $error = null; if (!$ssl) { $socket = fsockopen($server, $port, $errno, $error); } else { $socket = fsockopen('ssl://'.$server, $port, $errno, $error); } if (!$socket) { throw new Exception("Failed to connect, #{$errno}: {$error}"); } $ok = stream_set_blocking($socket, false); if (!$ok) { throw new Exception("Failed to set stream nonblocking."); } $this->socket = $socket; $this->writeCommand('USER', "{$user} 0 * :{$user}"); if ($pass) { $this->writeCommand('PASS', "{$pass}"); } if ($nickpass) { $this->writeCommand("NickServ IDENTIFY ", "{$nickpass}"); } $this->writeCommand('NICK', "{$nick}"); $this->runSelectLoop(); } public function getConfig($key, $default = null) { return idx($this->config, $key, $default); } private function runSelectLoop() { do { $this->stillWorking(); $read = array($this->socket); if (strlen($this->writeBuffer)) { $write = array($this->socket); } else { $write = array(); } $except = array(); $ok = @stream_select($read, $write, $except, $timeout_sec = 1); if ($ok === false) { throw new Exception( "socket_select() failed: ".socket_strerror(socket_last_error())); } if ($read) { // Test for connection termination; in PHP, fread() off a nonblocking, // closed socket is empty string. if (feof($this->socket)) { // This indicates the connection was terminated on the other side, // just exit via exception and let the overseer restart us after a // delay so we can reconnect. throw new Exception("Remote host closed connection."); } do { $data = fread($this->socket, 4096); if ($data === false) { throw new Exception("fread() failed!"); } else { $this->debugLog(true, $data); $this->readBuffer .= $data; } } while (strlen($data)); } if ($write) { do { $len = fwrite($this->socket, $this->writeBuffer); if ($len === false) { throw new Exception("fwrite() failed!"); } else { $this->debugLog(false, substr($this->writeBuffer, 0, $len)); $this->writeBuffer = substr($this->writeBuffer, $len); } } while (strlen($this->writeBuffer)); } do { $routed_message = $this->processReadBuffer(); } while ($routed_message); } while (true); } private function write($message) { $this->writeBuffer .= $message; return $this; } public function writeCommand($command, $message) { return $this->write($command.' '.$message."\r\n"); } private function processReadBuffer() { $until = strpos($this->readBuffer, "\r\n"); if ($until === false) { return false; } $message = substr($this->readBuffer, 0, $until); $this->readBuffer = substr($this->readBuffer, $until + 2); $pattern = '/^'. '(?:(?P<sender>:(\S+)) )?'. // This may not be present. '(?P<command>[A-Z0-9]+) '. '(?P<data>.*)'. '$/'; $matches = null; if (!preg_match($pattern, $message, $matches)) { throw new Exception("Unexpected message from server: {$message}"); } $irc_message = new PhabricatorIRCMessage( idx($matches, 'sender'), $matches['command'], $matches['data']); $this->routeMessage($irc_message); return true; } private function routeMessage(PhabricatorIRCMessage $message) { foreach ($this->handlers as $handler) { try { $handler->receiveMessage($message); } catch (Exception $ex) { phlog($ex); } } } public function __destroy() { $this->write("QUIT Goodbye.\r\n"); fclose($this->socket); } private function debugLog($is_read, $message) { - echo $is_read ? '<<< ' : '>>> '; - echo addcslashes($message, "\0..\37\177..\377"); - echo "\n"; + if ($this->getTraceMode()) { + echo $is_read ? '<<< ' : '>>> '; + echo addcslashes($message, "\0..\37\177..\377"); + echo "\n"; + } } public function getConduit() { if (empty($this->conduit)) { throw new Exception( "This bot is not configured with a Conduit uplink. Set 'conduit.uri', ". "'conduit.user' and 'conduit.cert' in the configuration to connect."); } return $this->conduit; } }