keyspace = $keyspace;
        // NOTE(review): this is the tail of the constructor; its header and the
        // class declaration precede the visible part of the file.
        $this->host = $host;
        $this->port = $port;

        // Support for global variables
        //if (isset($GLOBALS['cassandra_host'])) $this->host = $GLOBALS['cassandra_host'];
        //if (isset($GLOBALS['cassandra_port'])) $this->port = $GLOBALS['cassandra_port'];
    }

    // Connect
    // Opens the Thrift socket/transport/protocol stack, creates the client and
    // selects the configured keyspace. Resets the error string and the
    // failed-init flag first.
    // Returns 1 on success, 0 on failure (details via GetLastError()).
    function Connect()
    {
        // Initialize
        $this->err_str = '';
        $this->flag_failed_init = 0;
        $result = 0;

        try {
            // Make a connection to the Thrift interface to Cassandra
            $this->socket = new TSocket($this->host, $this->port);
            $this->transport = new TFramedTransport($this->socket, 1024, 1024);
            $this->protocol = new TBinaryProtocol/*Accelerated*/($this->transport);
            $this->client = new CassandraClient($this->protocol);
            $this->transport->open();
            $this->client->set_keyspace($this->keyspace);

            // If we're up to here, all is well
            $result = 1;
        } catch (TException $tx) {
            // Error occurred
            $this->flag_failed_init = 1;
            $this->err_str = $this->exception_text($tx);
            $this->Debug($this->err_str);
        }

        // Return result
        return $result;
    }

    // Get last error
    // Returns the error string recorded by the most recent failing call.
    function GetLastError()
    {
        return $this->err_str;
    }

    // Disconnect
    // Closes the transport and socket (when present) and clears the connection
    // members. Safe to call repeatedly, and safe when Connect() was never
    // called or failed part-way: the destructor always calls this.
    function Disconnect()
    {
        // Free resources
        $this->client = null;

        if (!empty($this->transport)) {
            try {
                $this->transport->close();
            } catch (TException $tx) {
                // Closing is best effort; report but keep tearing down
                $this->Debug($this->exception_text($tx));
            }
        }
        $this->transport = null;

        if (!empty($this->socket)) {
            try {
                $this->socket->close();
            } catch (TException $tx) {
                $this->Debug($this->exception_text($tx));
            }
        }
        $this->socket = null;
        $this->protocol = null;
    }

    // Destructor
    function __destruct()
    {
        // Free resources
        $this->Disconnect();
    }

    // Build the timestamp string used for mutations.
    // The base value is (seconds + fractional seconds) * 100, i.e. hundredths
    // of a second (not milliseconds), followed by a 4-digit rolling counter so
    // that several mutations issued within the same tick still differ.
    // NOTE(review): $val is a float, so its string form can contain a
    // fractional part before the counter digits are appended - confirm how the
    // Thrift layer converts this string. The arithmetic is deliberately left
    // unchanged because stored data already carries timestamps on this scale.
    function time()
    {
        list($usec, $sec) = explode(" ", microtime());
        $val = ((float)$usec + (float)$sec) * 100;

        // Increment an internal counter, so that in the case multiple
        // Inserts/Deletes happen at the same tick, we'll still be good
        $this->time_counter++;
        if ($this->time_counter > 9999) $this->time_counter = 0;

        // Return result
        return $val.sprintf("%04d", $this->time_counter);
    }

    // Truncate a columnfamily
    // Returns 1 on success, 0 on failure or when initialization failed.
    function TruncateTable($table)
    {
        // If we failed init, bail (0, like every other failure of this method)
        if ($this->flag_failed_init) return 0;

        // Initialize
        $result = 0;

        try {
            // Truncate table
            $this->client->truncate($table);

            // If we're here, all is well
            $result = 1;
        } catch (TException $tx) {
            // Error occurred
            $this->err_str = $this->exception_text($tx);
            $this->Debug($this->err_str);
        }

        // Return result
        return $result;
    }

    // Get list of all columnfamilies in this keyspace
    // Returns the keys of the describe_keyspace() response as a list; an empty
    // array on failure or when initialization failed.
    function GetTables()
    {
        // If we failed init, bail
        if ($this->flag_failed_init) return array();

        // Initialize
        $result = array();

        try {
            // Get list of all tables
            $tables = $this->client->describe_keyspace($this->keyspace);
            if (!empty($tables)) {
                foreach ($tables as $table_name => $table_options) {
                    $result[] = $table_name;
                }
            }
        } catch (TException $tx) {
            // Error occurred
            $this->Debug("GetTables error");
            $this->debug_exception($tx);
            $this->err_str = $this->exception_text($tx);
            $this->Debug($this->err_str);
        }

        // Return result
        return $result;
    }

    // Delete columnfamily
    // Returns 1 when the column family was dropped, 0 on failure.
    function DeleteColumnFamily($column_family)
    {
        // Initialize
        $result = 0;

        try {
            // Delete column family
            $this->client->system_drop_column_family($column_family);

            // If we're up to here, all is well
            $result = 1;
        } catch (TException $tx) {
            // Error occurred
            $this->Debug("DeleteColumnFamily error");
            $this->debug_exception($tx);
            $this->err_str = $this->exception_text($tx);
            $this->Debug($this->err_str);
        }

        // Return result
        return $result;
    }

    // Add columnfamily
    // Returns 1 when the column family was added or is reported as
    // "already defined", 0 on any other failure.
    function AddColumnFamily($cfdef)
    {
        // Initialize
        $result = 0;

        try {
            // Add column family
            $this->client->system_add_column_family($cfdef);

            // If we're up to here, all is well
            $result = 1;
        } catch (TException $tx) {
            $message = $this->exception_text($tx);

            // If columnfamily already defined
            if (strpos((string)$message, "already defined") !== false) {
                // Consider this a success
                $result = 1;
            }
            // (Otherwise - different error)
            else {
                $this->Debug("AddColumnFamily error");
                $this->debug_exception($tx);
                $this->err_str = $message;
                $this->Debug($this->err_str);
            }
        }

        // Return result
        return $result;
    }

    // Add keyspace
    // Creates the configured keyspace with the given column family
    // definitions, then disconnects and reconnects. Returns 1 when the
    // keyspace was created or is reported as "already exists", 0 otherwise.
    function AddKeyspace($cfdef)
    {
        // Initialize
        $result = 0;

        try {
            // Prepare record
            $ks = new cassandra_KsDef(array(
                "name" => $this->keyspace,
                "replication_factor" => "1",
                "strategy_class" => "org.apache.cassandra.locator.OldNetworkTopologyStrategy",
                "cf_defs" => $cfdef,
            ));

            // Create new keyspace
            $this->client->system_add_keyspace($ks);

            // If we're up to here - all is well
            $result = 1;
            $this->flag_failed_init = 0;

            // Disconnect
            $this->Disconnect();

            // Reconnect
            $this->Connect();
        } catch (TException $tx) {
            $message = $this->exception_text($tx);

            // If keyspace already exists
            if (strpos((string)$message, "already exists") !== false) {
                // Consider this a success
                $result = 1;
            }
            // (Otherwise - different error)
            else {
                // Error occurred
                $this->err_str = $message;
                $this->Debug($this->err_str);
            }
        }

        // Return result
        return $result;
    }

    // Delete from a ColumnFamily
    // (Equivalent to RDBMS Delete record from a table)
    // Sends one Deletion mutation for $key in $table; $super_column and
    // $columns (passed as a column_names predicate) narrow it when given.
    // Retries up to 10 times, one second apart, on TimedOut/Unavailable.
    // Returns 1 on success, 0 on failure or when initialization failed.
    function DeleteRecord ($table /* ColumnFamily */, $key /* ColumnFamily Key */, $super_column=null, $columns=null)
    {
        // Initialize
        $this->err_str = '';
        $cnt_retries = 0;
        $result = 0;

        // If we failed init, bail
        if ($this->flag_failed_init) return 0;

        do {
            // Reset per attempt: if the flag stayed set, a success that follows
            // a timeout would repeat the mutation forever
            $flag_timeout = false;
            $tx = null;

            try {
                // Timestamp for update (one value shared by clock and timestamp)
                $timestamp = $this->time();

                if (!empty($columns)) {
                    $predicate = new cassandra_SlicePredicate();
                    $predicate->column_names = $columns;
                } else {
                    $predicate = null;
                }

                $cfmap = array();
                $cfmap[$key][$table][] = new cassandra_mutation(array(
                    "deletion" => new cassandra_deletion(array(
                        "super_column" => $super_column,
                        "predicate" => $predicate,
                        "clock" => new cassandra_Clock(array('timestamp' => $timestamp)),
                        "timestamp" => $timestamp,
                    )),
                ));
                $this->client->batch_mutate($cfmap, $this->consistency);

                // If we're up to here, all is well
                $result = 1;
            } catch (TException $tx) {
                // Error occurred
                $result = 0;
                $this->err_str = $this->exception_text($tx);
                $this->Debug($this->err_str);
            }

            if (!$result) {
                // If this was a timeout error
                if (strcasecmp(get_class($tx), "cassandra_TimedOutException") == 0 ||
                    strcasecmp(get_class($tx), "cassandra_UnavailableException") == 0) {
                    // Print error, wait, reconnect and retry
                    $this->pause_and_reconnect($cnt_retries);
                    $flag_timeout = true;
                    $cnt_retries++;
                }

                // Print exception info
                $this->debug_exception($tx);
            }
        } while ($flag_timeout && $cnt_retries < 10);

        // Return result
        return $result;
    }

    // Insert Column into ColumnFamily
    // (Equivalent to RDBMS Insert record to a table)
    // Converts $record into mutations and sends them with batch_mutate.
    // Retries up to 10 times, one second apart, on TimedOut.
    // Returns 1 on success, 0 on failure or when initialization failed.
    function InsertRecord ($table /* ColumnFamily */, $key /* ColumnFamily Key */, $record /* Columns */)
    {
        // Initialize
        $this->err_str = '';
        $cnt_retries = 0;
        $result = 0;

        // If we failed init, bail
        if ($this->flag_failed_init) return 0;

        do {
            // Reset per attempt: if the flag stayed set, a success that follows
            // a timeout would repeat the insert forever
            $flag_timeout = false;
            $tx = null;

            try {
                // Timestamp for update
                $timestamp = $this->time();

                // Build batch mutation
                $cfmap = array();
                $cfmap[$key][$table] = $this->array_to_supercolumns_or_columns($record, $timestamp);

                // Insert
                $this->client->batch_mutate($cfmap, $this->consistency);

                // If we're up to here, all is well
                $result = 1;
            } catch (TException $tx) {
                // Error occurred
                $result = 0;
                $this->err_str = $this->exception_text($tx);
                $this->Debug($this->err_str);
            }

            if (!$result) {
                // If this was a timeout error
                if (strcasecmp(get_class($tx), "cassandra_TimedOutException") == 0) {
                    // Print error, wait, reconnect and retry
                    $this->pause_and_reconnect($cnt_retries);
                    $flag_timeout = true;
                    $cnt_retries++;
                }

                // Print exception info
                $this->debug_exception($tx);
            }
        } while ($flag_timeout && $cnt_retries < 10);

        // Return result
        return $result;
    }

    // Insert SuperColumn into SuperColumnFamily
    // (Equivalent to RDBMS Insert record to a "nested table")
    // Single attempt, no retry. Returns 1 on success, 0 on failure or when
    // initialization failed.
    function InsertRecordArray ($table /* SuperColumnFamily */, $key_parent /* Super CF */, $record /* Columns */)
    {
        // Initialize
        $this->err_str = '';

        // If we failed init, bail
        if ($this->flag_failed_init) return 0;

        try {
            // Timestamp for update
            $timestamp = $this->time();

            // Build batch mutation, keyed by the row key passed in $key_parent
            $cfmap = array();
            $cfmap[$key_parent][$table] = $this->array_to_supercolumns_or_columns($record, $timestamp);

            // Insert
            $this->client->batch_mutate($cfmap, $this->consistency);

            // If we're up to here, all is well
            $result = 1;
        } catch (TException $tx) {
            // Error occurred
            $result = 0;
            $this->err_str = $this->exception_text($tx);
            $this->Debug($this->err_str);
        }

        // Return result
        return $result;
    }

    // Get keys
    // Runs get_range_slices at consistency ONE over [$start_from, $end_at] and
    // returns at most $count row keys. Returns an empty array when nothing
    // matches, on failure, or when initialization failed.
    function GetKeys($table /* ColumnFamily or SuperColumnFamily */, $start_from="", $end_at="", $super_column=null, $count=100, $reversed=0)
    {
        // Initialize
        $this->err_str = '';
        $arr_result = array();

        // If we failed init, bail
        if ($this->flag_failed_init) return array();

        try {
            $range = new cassandra_KeyRange(array("start_key"=>$start_from, "end_key"=>$end_at, "count"=>$count));

            $column_parent = new cassandra_ColumnParent();
            $column_parent->column_family = $table;
            $column_parent->super_column = $this->unparse_column_name($super_column, false);

            // Only the row keys are read from the response, so the column
            // slice is pinned to a fixed start/finish value
            $slice_range = new cassandra_SliceRange();
            $slice_range->start = "ignore";//(empty($slice_start) ? "" : $slice_start);
            $slice_range->finish = "ignore";//(empty($slice_finish) ? "" : $slice_finish);
            $slice_range->count = $count;
            $slice_range->reversed = $reversed;

            $predicate = new cassandra_SlicePredicate();
            $predicate->slice_range = $slice_range;

            $resp = $this->client->get_range_slices($column_parent, $predicate, $range, /*$this->consistency*/cassandra_ConsistencyLevel::ONE);

            if (!empty($resp)) {
                $i = 0;
                foreach ($resp as $data) {
                    $i++;
                    $arr_result[] = $data->key;

                    // Limit to 'count' results
                    if ($i == $count) break;
                }
            }

            return $arr_result;
        } catch (TException $tx) {
            // Error occurred
            $this->err_str = $tx->getMessage();
            $this->Debug($this->err_str);
            return array();
        }
    }

    // Get record by key
    // Thin wrapper around get() with a different parameter order.
    // $count caps the number of columns fetched per key (default 100).
    function GetRecordByKey ($table /* ColumnFamily or SuperColumnFamily */, $key, $start_from="", $end_at="", $reversed=0, $super_column="", $bStaleOk=0, $count=100)
    {
        // Initialize
        $this->err_str = '';

        // If we failed init, bail
        if ($this->flag_failed_init) return array();

        try {
            return $this->get($table, $key, $super_column, $start_from, $end_at, $reversed, $bStaleOk, $count);
        } catch (TException $tx) {
            // Error occurred
            $this->err_str = $this->exception_text($tx);
            $this->Debug($this->err_str);
            return array();
        }
    }

    // Print debug message
    // Does nothing unless display_errors is on.
    function Debug ($str)
    {
        // If verbose is off, we're done
        if (!$this->display_errors) return;

        // Print (24-hour clock; the 12-hour "h" without am/pm was ambiguous)
        echo date("Y-m-d H:i:s")." CassandraDB ERROR: $str\r\n";
    }

    // Turn verbose debug on/off (Default is off)
    function SetDisplayErrors($flag)
    {
        $this->display_errors = $flag;
    }

    // Set Consistency level (Default is 1)
    function SetConsistency ($consistency)
    {
        $this->consistency = $consistency;
    }

    // Build cf array
    // Turns name => value pairs into a list of mutations: array values become
    // super columns (their entries converted by array_to_columns), everything
    // else becomes a plain column. Returns an empty array for empty input.
    function array_to_supercolumns_or_columns($array, $timestamp=null)
    {
        if (empty($timestamp)) $timestamp = $this->time();

        $ret = array();
        if (empty($array)) return $ret;

        foreach ($array as $name => $value) {
            $c_or_sc = new cassandra_ColumnOrSuperColumn();

            if (is_array($value)) {
                $c_or_sc->super_column = new cassandra_SuperColumn();
                $c_or_sc->super_column->name = $this->unparse_column_name($name, true);
                $c_or_sc->super_column->columns = $this->array_to_columns($value, $timestamp);
                $c_or_sc->super_column->timestamp = $timestamp;
                $c_or_sc->super_column->clock = new cassandra_Clock(array('timestamp' => $timestamp));
            } else {
                $c_or_sc->column = new cassandra_Column();
                $c_or_sc->column->name = $this->unparse_column_name($name, true);
                $c_or_sc->column->value = $value;
                $c_or_sc->column->timestamp = $timestamp;
                $c_or_sc->column->clock = new cassandra_Clock(array('timestamp' => $timestamp));
            }

            $ret[] = new cassandra_mutation(array('column_or_supercolumn' => $c_or_sc));
        }

        return $ret;
    }

    // Parse column names for Cassandra
    // Returns NULL for an empty name, otherwise the name unchanged. Numeric
    // zero (0, "0") is a usable name and is passed through.
    function parse_column_name($column_name, $is_column=true)
    {
        if (!$column_name && !is_numeric($column_name)) return NULL;
        return $column_name;
    }

    // Unparse column names for Cassandra
    // Returns NULL for an empty name, otherwise the name unchanged. Numeric
    // zero (0, "0") is a usable name and is passed through.
    function unparse_column_name($column_name, $is_column=true)
    {
        if (!$column_name && !is_numeric($column_name)) return NULL;
        return $column_name;
    }

    // Convert supercolumns or columns into an array
    // Plain columns map to name => value; super columns map to
    // name => array(column name => column value). Returns null when there is
    // nothing to convert.
    // NOTE(review): the loop headers of this method were garbled in the
    // extracted source; they are reconstructed here from the surviving brace
    // structure (outer walk over the response, inner walk over each entry's
    // members, skipping empty ones) - confirm against the pristine file.
    function supercolumns_or_columns_to_array($array)
    {
        $ret = null;
        if (empty($array)) return $ret;

        foreach ($array as $entry) {
            foreach ($entry as $object) {
                if ($object) {
                    // If supercolumn
                    if (isset($object->columns)) {
                        $record = array();
                        foreach ($object->columns as $column) {
                            $record[$column->name] = $column->value;
                        }
                        $ret[$object->name] = $record;
                    }
                    // (Otherwise - not supercolumn)
                    else {
                        $ret[$object->name] = $object->value;
                    }
                }
            }
        }

        return $ret;
    }

    // Get record from Cassandra
    // $key may be a single key (get_slice) or an array of keys
    // (multiget_slice, result indexed by key). $bStaleOk selects consistency
    // ONE instead of the configured level. $count caps the number of columns
    // fetched per key (default 100, the previously hard-coded value).
    // Returns an empty array on failure or when initialization failed.
    function get($table, $key, $super_column=NULL, $slice_start="", $slice_finish="", $reversed=0, $bStaleOk=0, $count=100)
    {
        try {
            // If we failed init, bail
            if ($this->flag_failed_init) return array();

            // Stale data ok -> ONE consistency; otherwise the default level
            if ($bStaleOk) {
                $consistency = cassandra_ConsistencyLevel::ONE;
            } else {
                $consistency = $this->consistency;
            }

            // Prepare query
            $column_parent = new cassandra_ColumnParent();
            $column_parent->column_family = $table;
            $column_parent->super_column = $this->unparse_column_name($super_column, false);

            $slice_range = new cassandra_SliceRange();
            $slice_range->start = (empty($slice_start) ? "" : $slice_start);
            $slice_range->finish = (empty($slice_finish) ? "" : $slice_finish);
            $slice_range->count = $count;
            if ($reversed) $slice_range->reversed = true;

            $predicate = new cassandra_SlicePredicate();
            $predicate->slice_range = $slice_range;

            if (is_array($key)) {
                $arr_result = array();
                $resp = $this->client->multiget_slice(/*$this->keyspace,*/ $key, $column_parent, $predicate, $consistency);
                if (!empty($resp)) {
                    foreach ($resp as $row_key => $data) {
                        $arr_result[$row_key] = $this->supercolumns_or_columns_to_array($data);
                    }
                }
                return $arr_result;
            }

            $resp = $this->client->get_slice(/*$this->keyspace,*/ $key, $column_parent, $predicate, $consistency);
            return $this->supercolumns_or_columns_to_array($resp);
        } catch (TException $tx) {
            $this->err_str = $this->exception_text($tx);
            $this->Debug($this->err_str);
            return array();
        }
    }

    // Convert array to columns
    // Turns name => value pairs into a list of cassandra_Column objects that
    // all carry the same timestamp. Returns null for an empty array.
    function array_to_columns($array, $timestamp=null)
    {
        if (empty($timestamp)) $timestamp = $this->time();

        $ret = null;
        foreach ($array as $name => $value) {
            $column = new cassandra_Column();
            $column->name = $this->unparse_column_name($name, false);
            $column->value = $value;
            $column->timestamp = $timestamp;
            $column->clock = new cassandra_Clock(array('timestamp' => $timestamp));
            $ret[] = $column;
        }

        return $ret;
    }

    // Get error string
    function ErrorStr()
    {
        return $this->err_str;
    }

    // Best available text for an exception: the "why" member when the
    // exception carries a non-empty one, otherwise the standard message.
    private function exception_text($tx)
    {
        if (isset($tx->why) && $tx->why !== '') {
            return $tx->why;
        }
        return $tx->getMessage();
    }

    // Dump an exception through Debug(), so nothing is printed while
    // display_errors is off.
    private function debug_exception($tx)
    {
        if (empty($tx) || !$this->display_errors) return;
        $this->Debug(print_r($tx, true));
    }

    // Shared timeout handling for the retrying mutations: report, wait one
    // second, then disconnect and reconnect. Connect() clears the error
    // string, so the mutation's error is restored after a good reconnect.
    private function pause_and_reconnect($cnt_retries)
    {
        $this->Debug("Timeout error detected. Sleeping 1 second and retrying $cnt_retries / 10 times");
        sleep(1);

        $last_error = $this->err_str;
        $this->Disconnect();
        if ($this->Connect()) {
            $this->err_str = $last_error;
        }
    }
}
?>