Overview

Namespaces

  • bandwidthThrottle
    • tokenBucket
      • storage
        • scope

Classes

  • bandwidthThrottle\tokenBucket\BlockingConsumer
  • bandwidthThrottle\tokenBucket\Rate
  • bandwidthThrottle\tokenBucket\storage\FileStorage
  • bandwidthThrottle\tokenBucket\storage\IPCStorage
  • bandwidthThrottle\tokenBucket\storage\MemcachedStorage
  • bandwidthThrottle\tokenBucket\storage\MemcacheStorage
  • bandwidthThrottle\tokenBucket\storage\PDOStorage
  • bandwidthThrottle\tokenBucket\storage\PHPRedisStorage
  • bandwidthThrottle\tokenBucket\storage\PredisStorage
  • bandwidthThrottle\tokenBucket\storage\SessionStorage
  • bandwidthThrottle\tokenBucket\storage\SingleProcessStorage
  • bandwidthThrottle\tokenBucket\TokenBucket

Interfaces

  • bandwidthThrottle\tokenBucket\storage\scope\GlobalScope
  • bandwidthThrottle\tokenBucket\storage\scope\RequestScope
  • bandwidthThrottle\tokenBucket\storage\scope\SessionScope
  • bandwidthThrottle\tokenBucket\storage\Storage

Exceptions

  • bandwidthThrottle\tokenBucket\storage\StorageException
  • bandwidthThrottle\tokenBucket\TimeoutException
  • bandwidthThrottle\tokenBucket\TokenBucketException
  • Overview
  • Namespace
  • Class
  1:   2:   3:   4:   5:   6:   7:   8:   9:  10:  11:  12:  13:  14:  15:  16:  17:  18:  19:  20:  21:  22:  23:  24:  25:  26:  27:  28:  29:  30:  31:  32:  33:  34:  35:  36:  37:  38:  39:  40:  41:  42:  43:  44:  45:  46:  47:  48:  49:  50:  51:  52:  53:  54:  55:  56:  57:  58:  59:  60:  61:  62:  63:  64:  65:  66:  67:  68:  69:  70:  71:  72:  73:  74:  75:  76:  77:  78:  79:  80:  81:  82:  83:  84:  85:  86:  87:  88:  89:  90:  91:  92:  93:  94:  95:  96:  97:  98:  99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177: 178: 179: 180: 181: 182: 183: 184: 185: 186: 187: 188: 189: 190: 191: 192: 193: 194: 195: 196: 197: 198: 199: 200: 201: 202: 203: 204: 205: 206: 207: 208: 209: 210: 211: 212: 213: 214: 215: 216: 217: 218: 219: 220: 221: 
<?php

namespace bandwidthThrottle\tokenBucket\storage;

use bandwidthThrottle\tokenBucket\storage\Storage;
use malkusch\lock\mutex\TransactionalMutex;
use bandwidthThrottle\tokenBucket\storage\scope\GlobalScope;

/**
 * PDO based storage which can be shared over a common DBS.
 *
 * This storage is in the global scope.
 *
 * @author Markus Malkusch <markus@malkusch.de>
 * @link bitcoin:1335STSwu9hST4vcMRppEPgENMHD2r1REK Donations
 * @license WTFPL
 */
final class PDOStorage implements Storage, GlobalScope
{

    /**
     * @var PDO The pdo.
     */
    private $pdo;
    
    /**
     * @var string The shared name of the token bucket.
     */
    private $name;
    
    /**
     * @var TransactionalMutex The mutex.
     */
    private $mutex;
    
    /**
     * Sets the PDO and the bucket's name for the shared storage.
     *
     * The name should be the same for all token buckets which share the same
     * token storage.
     *
     * The transaction isolation level should avoid lost updates, i.e. it should
     * be at least Repeatable Read.
     *
     * @param string $name The name of the token bucket.
     * @param PDO    $pdo  The PDO.
     *
     * @throws \LengthException          The id should not be longer than 128 characters.
     * @throws \InvalidArgumentException PDO must be configured to throw exceptions.
     */
    public function __construct($name, \PDO $pdo)
    {
        if (strlen($name) > 128) {
            throw new \LengthException("The name should not be longer than 128 characters.");
        }
        if ($pdo->getAttribute(\PDO::ATTR_ERRMODE) !== \PDO::ERRMODE_EXCEPTION) {
            throw new \InvalidArgumentException("The pdo must have PDO::ERRMODE_EXCEPTION set.");
        }
        $this->pdo   = $pdo;
        $this->name  = $name;
        $this->mutex = new TransactionalMutex($pdo);
    }
    
    public function bootstrap($microtime)
    {
        try {
            try {
                $this->onErrorRollback(function () {
                    $options = $this->forVendor(["mysql" => "ENGINE=InnoDB CHARSET=utf8"]);
                    $this->pdo->exec(
                        "CREATE TABLE TokenBucket (
                            name      VARCHAR(128)     PRIMARY KEY,
                            microtime DOUBLE PRECISION NOT NULL
                         ) $options;"
                    );
                });
            } catch (\PDOException $e) {
                /*
                 * This exception is ignored to provide a portable way
                 * to create a table only if it doesn't exist yet.
                 */
            }

            $insert = $this->pdo->prepare(
                "INSERT INTO TokenBucket (name, microtime) VALUES (?, ?)"
            );
            $insert->execute([$this->name, $microtime]);
            if ($insert->rowCount() !== 1) {
                throw new StorageException("Failed to insert token bucket into storage '$this->name'");
            }
        } catch (\PDOException $e) {
            throw new StorageException("Failed to bootstrap storage '$this->name'", 0, $e);
        }
    }
    
    public function isBootstrapped()
    {
        try {
            return $this->onErrorRollback(function () {
                return (bool) $this->querySingleValue(
                    "SELECT 1 FROM TokenBucket WHERE name=?",
                    [$this->name]
                );
            });
        } catch (StorageException $e) {
            // This seems to be a portable way to determine if the table exists or not.
            return false;
        } catch (\PDOException $e) {
            throw new StorageException("Can't check bootstrapped state", 0, $e);
        }
    }

    public function remove()
    {
        try {
            $delete = $this->pdo->prepare("DELETE FROM TokenBucket WHERE name = ?");
            $delete->execute([$this->name]);

            $count = $this->querySingleValue("SELECT count(*) FROM TokenBucket");
            if ($count == 0) {
                $this->pdo->exec("DROP TABLE TokenBucket");
            }
        } catch (\PDOException $e) {
            throw new StorageException("Failed to remove the storage.", 0, $e);
        }
    }

    public function setMicrotime($microtime)
    {
        try {
            $update = $this->pdo->prepare(
                "UPDATE TokenBucket SET microtime = ? WHERE name = ?"
            );
            $update->execute([$microtime, $this->name]);
        } catch (\PDOException $e) {
            throw new StorageException("Failed to write to storage '$this->name'.", 0, $e);
        }
    }
    
    public function getMicrotime()
    {
        $forUpdate = $this->forVendor(["sqlite" => ""], "FOR UPDATE");
        return (double) $this->querySingleValue(
            "SELECT microtime from TokenBucket WHERE name = ? $forUpdate",
            [$this->name]
        );
    }

    /**
     * Returns a vendor specific dialect value.
     *
     * @param string[] $map     The vendor dialect map.
     * @param string   $default The default value, which is empty per default.
     *
     * @return string The vendor specific value.
     */
    private function forVendor(array $map, $default = "")
    {
        $vendor = $this->pdo->getAttribute(\PDO::ATTR_DRIVER_NAME);
        return isset($map[$vendor]) ? $map[$vendor] : $default;
    }
    
    /**
     * Returns one value from a query.
     *
     * @param string $sql        The SQL query.
     * @param array  $parameters The optional query parameters.
     *
     * @return string The value.
     * @throws StorageException The query failed.
     */
    private function querySingleValue($sql, $parameters = [])
    {
        try {
            $statement = $this->pdo->prepare($sql);
            $statement->execute($parameters);

            $value = $statement->fetchColumn();

            $statement->closeCursor();
            if ($value === false) {
                throw new StorageException("The query returned no result.");
            }
            return $value;
        } catch (\PDOException $e) {
            throw new StorageException("The query failed.", 0, $e);
        }
    }

    /**
     * Rollback to an implicit savepoint.
     *
     * @throws \PDOException
     */
    private function onErrorRollback(callable $code)
    {
        if (!$this->pdo->inTransaction()) {
            return call_user_func($code);
        }
        
        $this->pdo->exec("SAVEPOINT onErrorRollback");
        try {
            $result = call_user_func($code);
        } catch (\Exception $e) {
            $this->pdo->exec("ROLLBACK TO SAVEPOINT onErrorRollback");
            throw $e;
        }
        $this->pdo->exec("RELEASE SAVEPOINT onErrorRollback");
        return $result;
    }

    public function getMutex()
    {
        return $this->mutex;
    }

    public function letMicrotimeUnchanged()
    {
    }
}
API documentation generated by ApiGen