Call us Toll-Free:
1-800-218-1525
Email us

 Sponsors

Cassandra PHP Wrapper

Mike Peters, 04-07-2010
Update: Check out the Cassandra PHP Wrapper 0.7

In a previous post, I explained how the Cassandra decentralized database can allow you to scale well beyond what's possible with MySQL.

The Cassandra Data Model and API are very different than traditional RDBMS way-of-thinking. That's why a lot of developers switching from MySQL to Cassandra are finding it difficult to grasp at first.

To simplify the migration from MySQL to Cassandra, we created a high-level PHP wrapper for Cassandra, using function prototypes and variable names that are close in meaning to RDBMS.

While there are several other high level PHP wrappers for Cassandra, none of the existing ones answered our requirements -

* Simple
* Don't throw exceptions, return error codes
* As close as possible to the low level Thrift
* Make it easy for RDBMS developers to adopt

This is how the CassandraDB class was born.

Adding records to the database is as simple as:


// Initialize Cassandra
$cassandra = new CassandraDB("SPI");

// Debug on
$cassandra->SetDisplayErrors(true);

// Insert record ("Columns" in Cassandra)
$record = array();
$record["name"] = "Mike Peters";
$record["email"] = "mike at softwareprojects.com";
if (
$cassandra->InsertRecord('mytable', "Mike Peters", $record))
{
  echo
"Record (Columns) inserted successfully.\r\n";
}

// Print record
$record = $cassandra->GetRecordByKey('mytable', "Mike Peters");
print_r($record);

Output is

Array
(
[email] => mike at softwareprojects.com
[name] => Mike Peters
)

Adding record arrays:


// Initialize Cassandra
$cassandra = new CassandraDB("SPI");

// Debug on
$cassandra->SetDisplayErrors(true);

// Insert record array ("SuperColumns" in Cassandra)
$record = array();
$record["Mike Peters"] = array("name" => "Mike Peters", "email" => "Mike at Peters");
$record["Jonathan Ellis"] = array("name" => "Jonathan Ellis", "email" => "Jonathan at Ellis");
if (
$cassandra->InsertRecordArray('my_super_table', "People", $record))
{
  echo
"RecordArray (SuperColumns) inserted successfully.\r\n";
}

// Print record array
$record = $cassandra->GetRecordByKey('my_super_table', 'People');
print_r($record);

Output is

Array
(
[Jonathan Ellis] => Array
(
[email] => Jonathan at Ellis
[name] => Jonathan Ellis
)

[Mike Peters] => Array
(
[email] => Mike at Peters
[name] => Mike Peters
)

)

--

Here's the complete Cassandra PHP Wrapper class. Enjoy!


// CassandraDB version 0.1
// Software Projects Inc
// http://www.softwareprojects.com
//

// Includes
$GLOBALS['THRIFT_ROOT'] = '/services/thrift/';
require_once
$GLOBALS['THRIFT_ROOT'].'/packages/cassandra/Cassandra.php';
require_once
$GLOBALS['THRIFT_ROOT'].'/packages/cassandra/cassandra_types.php';
require_once
$GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
require_once
$GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php';
require_once
$GLOBALS['THRIFT_ROOT'].'/transport/TFramedTransport.php';
require_once
$GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php';

class
CassandraDB
{
 
// Internal variables
 
protected $socket;
  protected
$client;
  protected
$keyspace;
  protected
$transport;
  protected
$protocol;
  protected
$err_str = "";
  protected
$display_errors = 0;
  protected
$consistency = 1;
  protected
$parse_columns = 1;

 
// Functions

  // Constructor - Connect to Cassandra via Thrift
 
function CassandraDB  ($keyspace, $host = "127.0.0.1", $port = 9160)
  {
   
// Initialize
   
$this->err_str = '';

    try
    {
   
// Store passed 'keyspace' in object
   
$this->keyspace = $keyspace;

   
// Make a connection to the Thrift interface to Cassandra
   
$this->socket = new TSocket($host, $port);
   
$this->transport = new TFramedTransport($this->socket, 1024, 1024);
   
$this->protocol = new TBinaryProtocolAccelerated($this->transport);
   
$this->client = new CassandraClient($this->protocol);
   
$this->transport->open();
    }
    catch (
TException $tx)
    {
     
// Error occured
     
$this->err_str = $tx->why;
     
$this->Debug($tx->why." ".$tx->getMessage());
    }
  }

 
// Insert Column into ColumnFamily
  // (Equivalent to RDBMS Insert record to a table)
 
function InsertRecord  ($table /* ColumnFamily */, $key /* ColumnFamily Key */, $record /* Columns */)
  {
   
// Initialize
   
$this->err_str = '';

    try
    {
     
// Timestamp for update
     
$timestamp = time();

     
// Build batch mutation
     
$cfmap = array();
     
$cfmap[$table] = $this->array_to_supercolumns_or_columns($record, $timestamp);
 
     
// Insert
     
$this->client->batch_insert($this->keyspace, $key, $cfmap, $this->consistency);

     
// If we're up to here, all is well
     
$result = 1;
    }
    catch (
TException $tx)
    {
     
// Error occured
     
$result = 0;
     
$this->err_str = $tx->why;
     
$this->Debug($tx->why." ".$tx->getMessage());
    }

   
// Return result
   
return $result;
  }

 
// Insert SuperColumn into SuperColumnFamily
  // (Equivalent to RDMBS Insert record to a "nested table")
 
function InsertRecordArray  ($table /* SuperColumnFamily */, $key_parent /* Super CF */,
               
$record /* Columns */)
  {
   
// Initialize
   
$err_str = '';

    try
    {
     
// Timestamp for update
     
$timestamp = time();

     
// Build batch mutation
     
$cfmap = array();
     
$cfmap[$table] = $this->array_to_supercolumns_or_columns($record, $timestamp);

     
// Insert
     
$this->client->batch_insert($this->keyspace, $key_parent, $cfmap, $this->consistency);

     
// If we're up to here, all is well
     
$result = 1;
    }
    catch (
TException $tx)
    {
     
// Error occured
     
$result = 0;
     
$this->err_str = $tx->why;
     
$this->Debug($tx->why." ".$tx->getMessage());
    }

   
// Return result
   
return $result;
  }

 
// Get record by key
 
function GetRecordByKey  ($table /* ColumnFamily or SuperColumnFamily */, $key, $start_from="", $end_at="")
  {
   
// Initialize
   
$err_str = '';

    try
    {
      return
$this->get($table, $key, NULL, $start_from, $end_at);
    }
    catch (
TException $tx)
    {
     
// Error occured
     
$this->err_str = $tx->why;
     
$this->Debug($tx->why." ".$tx->getMessage());
      return array();
    }
  }

 
// Print debug message
 
function Debug      ($str)
  {
   
// If verbose is off, we're done
   
if (!$this->display_errors) return;

   
// Print
   
echo date("Y-m-d h:i:s")." CassandraDB ERROR: $str\r\n";
  }

 
// Turn verbose debug on/off (Default is off)
 
function SetDisplayErrors($flag)
  {
   
$this->display_errors = $flag;
  }

 
// Set Consistency level (Default is 1)
 
function SetConsistency  ($consistency)
  {
   
$this->consistency = $consistency;
  }

 
// Build cf array
 
function array_to_supercolumns_or_columns($array, $timestamp=null)
  {
    if(empty(
$timestamp)) $timestamp = time();

   
$ret = null;
    foreach(
$array as $name => $value) {
     
$c_or_sc = new cassandra_ColumnOrSuperColumn();
      if(
is_array($value)) {
       
$c_or_sc->super_column = new cassandra_SuperColumn();
       
$c_or_sc->super_column->name = $this->unparse_column_name($name, true);
       
$c_or_sc->super_column->columns = $this->array_to_columns($value, $timestamp);
       
$c_or_sc->super_column->timestamp = $timestamp;
      }
      else
      {
       
$c_or_sc = new cassandra_ColumnOrSuperColumn();
       
$c_or_sc->column = new cassandra_Column();
       
$c_or_sc->column->name = $this->unparse_column_name($name, true);
       
$c_or_sc->column->value = $value;
       
$c_or_sc->column->timestamp = $timestamp;
      }
     
$ret[] = $c_or_sc;
    }

    return
$ret;
  }


 
// Parse column names for Cassandra
 
function parse_column_name($column_name, $is_column=true)
  {
    if(!
$column_name) return NULL;

    return
$column_name;
  }
 
 
// Unparse column names for Cassandra
 
function unparse_column_name($column_name, $is_column=true)
  {
    if(!
$column_name) return NULL;

    return
$column_name;
  }

 
// Convert supercolumns or columns into an array
 
function supercolumns_or_columns_to_array($array)
  {
   
$ret = null;
    for (
$i=0; $i<count($array); $i++)
    foreach (
$array[$i] as $object)
    {
      if (
$object)
      {
       
// If supercolumn
       
if (isset($object->columns))
        {
         
$record = array();
          for (
$j=0; $j<count($object->columns); $j++)
          {
           
$column = $object->columns[$j];
           
$record[$column->name] = $column->value;
          }
         
$ret[$object->name] = $record;
        }
       
// (Otherwise - not supercolumn)
       
else
        {
         
$ret[$object->name] = $object->value;
        }
      }
    }

    return
$ret;
  }

 
// Get record from Cassandra
 
function get($table, $key, $super_column=NULL, $slice_start="", $slice_finish="")
  {
    try
    {
   
$column_parent = new cassandra_ColumnParent();
   
$column_parent->column_family = $table;
   
$column_parent->super_column = $this->unparse_column_name($super_column, false);

   
$slice_range = new cassandra_SliceRange();
   
$slice_range->start = $slice_start;
   
$slice_range->finish = $slice_finish;
   
$predicate = new cassandra_SlicePredicate();
   
$predicate->slice_range = $slice_range;

   
$resp = $this->client->get_slice($this->keyspace, $key, $column_parent, $predicate, $this->consistency);

    return
$this->supercolumns_or_columns_to_array($resp);
    }
    catch (
TException $tx)
    {
   
$this->Debug($tx->why." ".$tx->getMessage());
    return array();
    }
  }

 
// Convert array to columns
 
function array_to_columns($array, $timestamp=null) {
    if(empty(
$timestamp)) $timestamp = time();

   
$ret = null;
    foreach(
$array as $name => $value) {
     
$column = new cassandra_Column();
     
$column->name = $this->unparse_column_name($name, false);
     
$column->value = $value;
     
$column->timestamp = $timestamp;

     
$ret[] = $column;
    }
    return
$ret;
  }

 
// Get error string
 
function ErrorStr()
  {
    return
$this->err_str;
  }
}

CassandraDB Todo:

* Ability to add multiple nodes and try to connect to next-in-line automatically if connection fails
* Implement the remaining get_ functions
* Implement delete
* Implement update vs insert-fail-on-duplicate-key?
* Implement Order By desc

-

Recommended Further Reading:

* Cassandra in Production at Digg
* WTF is a SuperColumn?
* Cassandra Reads - How do they work
* Cassandra Writes - How do they work
* Cassandra Write Properties (Video)
* Engineering notes by Facebook
* RandomPartitioner vs OrderPreservingPartitioner

Jeremy Hutchings, 04-08-2010
Looks cool, though getting some strange results during set up here :

-------
2010-04-08 08:17:49 CassandraDB ERROR: TSocket: timed out reading 4 bytes from 127.0.0.1:9160 2010-04-08 08:17:49 CassandraDB ERROR: TSocket: Could not read 2147549187 bytes from 127.0.0.1:9160 Array ( )
-------

Maybe adding some "Is Cassandra working" error handeling might help the migrating people.

Jeremy Hutchings, 04-08-2010
Just saw the previous post ....... my bad !

Mike Peters, 04-08-2010
Jeremy -

Often times the dreaded "timed out reading 4 bytes" error is something else.

Make sure you install version 0.6 of Cassandra, it reports the real error messages so you can tell what's wrong.

Version 0.5 will always spit out the "timed out reading 4 bytes" regardless of what the error is.

Anthony ROUX, 04-14-2010
Hi Jeremy,I have been the same problem. i changed TFramedTransport by TBufferedTransport on the constructor and it works.

See your cassandra configuration, there is a line for transport method.

Mek, 05-31-2010
thnx sir, got i working smoothly.. but now i'm thinking how to create auto increment keys.. should i use the timestamps instead?

Emir Habul, 07-10-2010
Thank you for providing this class.

My question is about it's license.

Is it allowed to copy, redistribute, modify, include in other project, or publish modified version?

Ryan, 07-23-2010
I have a minor fix for this I was getting a weird error.

PHP Notice: Undefined property: cassandra_Column::$columns in /var/www/html/fileman.php on line 58
PHP Notice: Undefined property: cassandra_Column::$columns in /var/www/html/fileman.php on line 58

if ($object->columns)

That seemns to be the issue

try

if (is_array($object))

fixes this.

Mike Peters, 07-27-2010
@Emir - Yes, feel free to copy, redistribute and modify this code as you see fit.

@Ryan - Thanks! Updated the original code

Michel Megens, 08-19-2010
I've just installed cassandra and I'm messing around a bit with it.. It works nice but... when I insert a row in a super column everything is OK (using your cassandra class), then i delete it using the CLI and after that I try to readd it using your class works fine too.. Altough it says that everything is ok ("RecordArray (SuperColumns) inserted successfully."), but when I check the columns with GetRecordByKey nothing is added.. And when I change the first dimension of the array it does add normaly again.. How can i fix it?

---Michel

PS: I cán readd manualy using the CLI

Ruggero Domenichini, 10-13-2010
Hi, great work!
I have successfully installed and used Cassandra and your wrapper on a Ubuntu-64 machine running under VMWare on a Mac.

I am trying now to use the Mac as the Apache/PHP server and the Ubuntu virtual machine as the Cassandra server.

I have changed the address in phpwrapper6 from 127.0.0.1 to 192.168.33.130 (the address of the Ubuntu machine) but when I try to read some data I get a write error from TSocket. Everything is fine if I use the Ubuntu machine for both Cassandra and Apache/PHP.

Any ideas?
Enjoyed this post?

Subscribe Now to receive new posts via Email as soon as they come out.

 Comments
Post your comments












Note: No link spamming! If your message contains link/s, it will NOT be published on the site before manually approved by one of our moderators.



About Us  |  Contact us  |  Privacy Policy  |  Terms & Conditions