diff --git a/clouddb/CMakeLists.txt b/clouddb/CMakeLists.txt index 2d4742869cb9e34b9e5a0a67603b61bb5a39151b..ed3ddfec54054e760039e78b5f53f51d9d6c3ae0 100644 --- a/clouddb/CMakeLists.txt +++ b/clouddb/CMakeLists.txt @@ -5,7 +5,9 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake) find_package(PostgreSQL REQUIRED) -set(CMAKE_CXX_STANDARD 14) +# set(CMAKE_BUILD_TYPE Debug) + +set(CMAKE_CXX_STANDARD 17) include_directories(${PostgreSQL_INCLUDE_DIRS} ${PostgreSQL_SERVER_INCLUDE_DIRS}) diff --git a/clouddb/src/clouddb/CMakeLists.txt b/clouddb/src/clouddb/CMakeLists.txt index a92292c4226eb0839264e7ec8b879e375f35e3d5..0b11d0ba4f58431c80214646ba20095a1aec3012 100644 --- a/clouddb/src/clouddb/CMakeLists.txt +++ b/clouddb/src/clouddb/CMakeLists.txt @@ -1,31 +1,139 @@ -set(cpp_sources - facade/query_builder.cpp - facade/drivers/cassandra_driver/cassandra_driver.cpp - facade/session.cpp - facade/command/cassandra_command/cas_transaction_command.cpp - facade/command/command.cpp - facade/utils/utils.cpp - cpp.cpp + +set(USE_AWS 1) #(?) + +# Платформа (?) +set(PLATFORM "OS_LINUX") + +# Флаги линковки для платформы (need) +set(PLATFORM_LDFLAGS + "-laws-cpp-sdk-s3" + "-laws-cpp-sdk-kinesis" + "-laws-cpp-sdk-core" + "-laws-cpp-sdk-transfer" + "-lpthread" + "-lrt" + "-ldl" + "-fPIC" + "-lsnappy" + "-lgflags" + "-lz" + "-lbz2" + "-llz4" + "-lzstd" + "-lnuma" + "-ltbb" + "-luring" + "-ldl" ) -set(c_sources - utils/utils.c - tam/src/ddl_func_tam.c - tam/src/index_scan_tam.c - tam/src/misc_func_tam.c - tam/src/mod_tuple_tam.c - tam/src/non_mod_tuple_tam.c - tam/src/parallel_scan_tam.c - tam/src/scan_func_tam.c - tam/src/slot_func_tam.c - tam/tam.c - iam/iam.c - iam/src/scan_func_iam.c - iam/src/mod_func_iam.c - hooks/src/hook.c - clouddb.c +# Флаги компиляции для платформы (?) +set(PLATFORM_CCFLAGS + "-DROCKSDB_PLATFORM_POSIX" + "-DROCKSDB_LIB_IO_POSIX" + "-DOS_LINUX" + "-fno-builtin-memcmp" + "-DROCKSDB_FALLOCATE_PRESENT" + "-DSNAPPY" + "-DGFLAGS=1" + "-DZLIB" + "-DBZIP2" + "-DLZ4" + "-DZSTD" + "-DNUMA" + "-DTBB" + "-DROCKSDB_MALLOC_USABLE_SIZE" + "-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX" + "-DROCKSDB_BACKTRACE" + "-DROCKSDB_RANGESYNC_PRESENT" + "-DROCKSDB_SCHED_GETCPU_PRESENT" + "-DROCKSDB_AUXV_GETAUXVAL_PRESENT" + "-DROCKSDB_IOURING_PRESENT" + "-I/usr/local/include/" + "-DUSE_AWS" + "-march=native" + "-DHAVE_UINT128_EXTENSION" ) +set(PLATFORM_CXXFLAGS #(?) + "-std=c++17" + "-faligned-new" + "-DHAVE_ALIGNED_NEW" + "-DROCKSDB_PLATFORM_POSIX" + "-DROCKSDB_LIB_IO_POSIX" + "-DOS_LINUX" + "-fno-builtin-memcmp" + "-DROCKSDB_FALLOCATE_PRESENT" + "-DSNAPPY" + "-DGFLAGS=1" + "-DZLIB" + "-DBZIP2" + "-DLZ4" + "-DZSTD" + "-DNUMA" + "-DTBB" + "-DROCKSDB_MALLOC_USABLE_SIZE" + "-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX" + "-DROCKSDB_BACKTRACE" + "-DROCKSDB_RANGESYNC_PRESENT" + "-DROCKSDB_SCHED_GETCPU_PRESENT" + "-DROCKSDB_AUXV_GETAUXVAL_PRESENT" + "-DROCKSDB_IOURING_PRESENT" + "-I/usr/local/include/" + "-DUSE_AWS" + "-march=native" + "-DHAVE_UINT128_EXTENSION" +) + +# Флаги для создания общих библиотек (?) +set(PLATFORM_SHARED_CFLAGS "-fPIC") +set(PLATFORM_SHARED_EXT "so") +set(PLATFORM_SHARED_LDFLAGS "-Wl,--no-as-needed -shared -Wl,-soname -Wl,") +set(PLATFORM_SHARED_VERSIONED true) + +# Флаги линковки для исполняемых файлов (?) +set(EXEC_LDFLAGS "-ldl") + +# JEMALLOC (?) +set(JEMALLOC_INCLUDE "") +set(JEMALLOC_LIB "") +set(JEMALLOC 1) +set(WITH_JEMALLOC_FLAG 1) + +# Версии RocksDB (?) +set(ROCKSDB_MAJOR 9) +set(ROCKSDB_MINOR 1) +set(ROCKSDB_PATCH 1) + +# Другие переменные (?) +set(CLANG_SCAN_BUILD "scan-build") +set(CLANG_ANALYZER "/usr/bin/clang++") +set(PROFILING_FLAGS "-pg") +set(FIND "find") +set(WATCH "watch") +set(FOLLY_PATH "") +set(LUA_PATH "") + +set(target_source facade/query_builder.cpp + utils/utils.c + facade/drivers/cassandra_driver/cassandra_driver.cpp + facade/drivers/rocksdb_driver/rocksdb_driver.cpp + facade/session.cpp + facade/command/cassandra_command/cas_transaction_command.cpp + facade/command/rocksdb_command/roc_transaction_command.cpp + facade/command/command.cpp + tam/src/ddl_func_tam.c + tam/src/index_scan_tam.c + tam/src/misc_func_tam.c + tam/src/mod_tuple_tam.c + tam/src/non_mod_tuple_tam.c + tam/src/parallel_scan_tam.c + tam/src/scan_func_tam.c + tam/src/slot_func_tam.c + tam/tam.c + cpp.cpp + hooks/src/hook.c + clouddb.c) + add_postgresql_extension( clouddb VERSION 1.0 @@ -34,6 +142,15 @@ add_postgresql_extension( REGRESS basic ) +link_directories("/home/user/rocksdb-cloud/") link_directories("/usr/local/pgsql/lib/") +include_directories("/home/user/rocksdb-cloud/include/") + +# Установка флагов компиляции +target_compile_options(clouddb PRIVATE ${PLATFORM_CXXFLAGS} ${PLATFORM_CCFLAGS}) + +# Установка флагов линковки +target_link_libraries(clouddb PRIVATE rocksdb cassandra ${PLATFORM_LDFLAGS}) + -target_link_libraries(clouddb cassandra pq) \ No newline at end of file +target_link_libraries(clouddb cassandra pq) diff --git a/clouddb/src/clouddb/clouddb.c b/clouddb/src/clouddb/clouddb.c index e0627b75b50f0c1a7fcecc0b286e40849956ebe2..068a66aa78c345f4d5257a82b535ce4fc24a18fb 100644 --- a/clouddb/src/clouddb/clouddb.c +++ b/clouddb/src/clouddb/clouddb.c @@ -9,12 +9,8 @@ #include "hooks/include/hook.h" PG_MODULE_MAGIC; - void _PG_init() { - char *m = "hello"; - some *some2 = &(some){.message = 6}; - some2 = create_class(some2); - // printf("%d\n", getInClass(some2)); + hook_init(); } diff --git a/clouddb/src/clouddb/facade/command/cassandra_command/cas_transaction_command.cpp b/clouddb/src/clouddb/facade/command/cassandra_command/cas_transaction_command.cpp index a5e3aee27eca8802ce6f2d235cb15bfb9ff24935..b7f9726559a9b2a405ca160803f35fc40db76b71 100644 --- a/clouddb/src/clouddb/facade/command/cassandra_command/cas_transaction_command.cpp +++ b/clouddb/src/clouddb/facade/command/cassandra_command/cas_transaction_command.cpp @@ -148,6 +148,8 @@ ITransactionCommand *CasTransactionCommand::createInsertCommand( (char **)malloc(sizeof(char *) * RelationGetNumberOfAttributes(relation)); int *fieldSize = (int *)malloc(sizeof(int) * RelationGetNumberOfAttributes(relation)); + // std::cout << "size: " << RelationGetNumberOfAttributes(relation) << + // std::endl; TupleDesc desc = slot->tts_tupleDescriptor; for (int i = 0; i < desc->natts; i++) { @@ -170,6 +172,7 @@ ITransactionCommand *CasTransactionCommand::createInsertCommand( this->operation = INSERT_VALUE; this->tableName = RelationGetRelationName(relation); + // std::cout << this->tableName << " tableName" << std::endl; this->fieldData = convertToVector(fieldData, RelationGetNumberOfAttributes(relation)); free(fieldData); @@ -181,9 +184,10 @@ ITransactionCommand *CasTransactionCommand::createInsertCommand( this->fieldName.push_back(strdup(NameStr(attr->attname))); } - auto driver = std::static_pointer_cast<CassandraDriver>(session->getDriver()); slot->tts_tid = driver->getNewPointer( std::string(RelationGetRelationName(relation)), driver->getKeyspace()); + // std::cout << slot->tts_tid.ip_blkid.bi_hi << slot->tts_tid.ip_blkid.bi_lo + // << slot->tts_tid.ip_posid << std::endl; this->metadata.xmin = GetCurrentTransactionId(); this->metadata.xmin_commited = true; @@ -349,6 +353,8 @@ ITransactionCommand *CasTransactionCommand::createUpdateSlotCommand( (char **)malloc(sizeof(char *) * RelationGetNumberOfAttributes(relation)); int *fieldSize = (int *)malloc(sizeof(int) * RelationGetNumberOfAttributes(relation)); + // std::cout << "size: " << RelationGetNumberOfAttributes(relation) << + // std::endl; TupleDesc desc = slot->tts_tupleDescriptor; for (int i = 0; i < desc->natts; i++) { @@ -419,4 +425,4 @@ ITransactionCommand *CasTransactionCommand::createTruncateTable( this->operation = TRUNCATE_TABLE; this->tableName = std::string(relName); return this; -} \ No newline at end of file +} diff --git a/clouddb/src/clouddb/facade/command/command.cpp b/clouddb/src/clouddb/facade/command/command.cpp index 4f2a1095e94fe6c9006230df4c54265226a5b8bb..aa05b91fbf4cfb8934b25cad39c04230b1a2da96 100644 --- a/clouddb/src/clouddb/facade/command/command.cpp +++ b/clouddb/src/clouddb/facade/command/command.cpp @@ -5,10 +5,11 @@ ITransactionCommand *createTransactionCommand(std::shared_ptr<IDriver> driver) { return new CasTransactionCommand( std::dynamic_pointer_cast<CassandraDriver>(driver)); } - // else if (std::dynamic_pointer_cast<YDBDriver>(driver)) { - // return YDBTransactionCommand( - // std::dynamic_pointer_cast<YDBDriver>(driver)); - // } + else if (std::dynamic_pointer_cast<RocksDBDriver>(driver)) { + return new RocTransactionCommand( + std::dynamic_pointer_cast<RocksDBDriver>(driver)); + } + return nullptr; } @@ -116,4 +117,4 @@ EXPORT_C ITransactionCommand *update_tuple_to_index(Session *session, TupleTable ITransactionCommand *command = createTransactionCommand(session->getDriver()); command = command->update_tuple_to_index(session, slot, indexInfo, indexRelation, heapRelation); return command; -} \ No newline at end of file +} diff --git a/clouddb/src/clouddb/facade/command/command.h b/clouddb/src/clouddb/facade/command/command.h index 4e9a8d2c28cad0c7181bbaf480d291aac881d840..6b6963651819c70a1abdd133b006418b020efaa8 100644 --- a/clouddb/src/clouddb/facade/command/command.h +++ b/clouddb/src/clouddb/facade/command/command.h @@ -5,6 +5,8 @@ // команды каждого драйвера #include "cassandra_command/cas_transaction_command.h" +#include "rocksdb_command/roc_transaction_command.h" + #ifdef __cplusplus diff --git a/clouddb/src/clouddb/facade/command/rocksdb_command/roc_transaction_command.cpp b/clouddb/src/clouddb/facade/command/rocksdb_command/roc_transaction_command.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fdcfe28b5bca3d16a72778329967202f0af7d952 --- /dev/null +++ b/clouddb/src/clouddb/facade/command/rocksdb_command/roc_transaction_command.cpp @@ -0,0 +1,150 @@ +#include "roc_transaction_command.h" +#include <iostream> + +namespace { +std::vector<std::string> convertToVector(char **charArray, int size) { + std::vector<std::string> result; + if (charArray == nullptr) { + return result; // Если входной указатель null, возвращаем пустой вектор + } + + // Перебираем каждый элемент массива charArray + for (size_t i = 0; i < size; ++i) { + if (charArray[i] != nullptr) { + result.emplace_back(charArray[i]); // Добавляем строку в вектор + } else { + result.emplace_back(""); // Добавляем пустую строку для nullptr + } + } + return result; +} + +std::vector<size_t> convertToVector(int *intArray, int size) { + std::vector<size_t> result; + for (int i = 0; i < size; i++) { + result.push_back(intArray[i]); + } + return result; +} +} // namespace + + +int RocTransactionCommand::execute() { + if (this->operation == RCREATE_TABLE) { + driver->createTable(tableName); + }else if(this->operation == RINSERT_VALUE) { + driver->insertData(tableName, fieldName, fieldData, fieldSize, pointer); + } + return 0; +} + +RocTransactionCommand::RocTransactionCommand(std::shared_ptr<RocksDBDriver> driver) { + this->driver = std::dynamic_pointer_cast<RocksDBDriver>(driver); +} + +ITransactionCommand *RocTransactionCommand::createCreateTableCommand( + Session *session, Relation rel) { + this->operation = RCREATE_TABLE; + + // забавно, но здесь нет ни одной функции, а следовательно они безопасны для + // c++ + this->tableName = std::string(RelationGetRelationName(rel)); + for (int i = 0; i < RelationGetNumberOfAttributes(rel); i++) { + Form_pg_attribute attr = TupleDescAttr(rel->rd_att, i); + this->fieldName.push_back(strdup(NameStr(attr->attname))); + } + return this; +} + +ITransactionCommand *RocTransactionCommand::createInsertCommand( + Session *session, Relation relation, TupleTableSlot *slot, CommandId cid, + int options, BulkInsertState bistate) { + char **fieldData = + (char **)malloc(sizeof(char *) * RelationGetNumberOfAttributes(relation)); + int *fieldSize = + (int *)malloc(sizeof(int) * RelationGetNumberOfAttributes(relation)); + + TupleDesc desc = slot->tts_tupleDescriptor; + for (int i = 0; i < desc->natts; i++) { + if (slot->tts_isnull[i]) { + fieldData[i] = NULL; + fieldSize[i] = -1; + continue; + } + Oid outFunc; + bool typIsVarlena; + Form_pg_attribute attr = TupleDescAttr(desc, i); + getTypeOutputInfo(attr->atttypid, &outFunc, &typIsVarlena); + fieldData[i] = OidOutputFunctionCall(outFunc, slot->tts_values[i]); + fieldSize[i] = strlen(fieldData[i]); + } + + this->operation = RINSERT_VALUE; + + this->tableName = RelationGetRelationName(relation); + this->fieldData = + convertToVector(fieldData, RelationGetNumberOfAttributes(relation)); + free(fieldData); + this->fieldSize = + convertToVector(fieldSize, RelationGetNumberOfAttributes(relation)); + free(fieldSize); + for (int i = 0; i < RelationGetNumberOfAttributes(relation); i++) { + Form_pg_attribute attr = TupleDescAttr(relation->rd_att, i); + this->fieldName.push_back(strdup(NameStr(attr->attname))); + } + + slot->tts_tid = driver->getNewPointer( + std::string(RelationGetRelationName(relation))); + pointer = slot->tts_tid; + return this; +} + +ITransactionCommand *RocTransactionCommand::beginScanCommand( + Session *session, Relation relation, Snapshot snapshot, int nkeys, + struct ScanKeyData *key, ParallelTableScanDesc parallel_scan, uint32 flags, + TransactionId id) { + + return this; +} + +ITransactionCommand *RocTransactionCommand::selectDataCommand( + Session *session, Snapshot snapshot) { + + return this; +} + +ITransactionCommand *RocTransactionCommand::createDeleteSlotCommand( + Session *session, TransactionId id, ItemPointer newtid, Relation relation, + ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, + bool wait, TM_FailureData *tmfd, bool changingPart) { + + return this; +} + +ITransactionCommand *RocTransactionCommand::createUpdateSlotCommand( + Session *session, TransactionId id, Relation relation, ItemPointer otid, + TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, + bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, + TU_UpdateIndexes *update_indexes) { + + return this; +} + +ITransactionCommand *RocTransactionCommand::createVacuumCommand( + Session *session, TransactionId id, Relation rel, VacuumParams *params, + BufferAccessStrategy bstrategy) { + + return this; +} + +ITransactionCommand *RocTransactionCommand::createDropTable(Session *session, + char *relName) { + + return this; +} + +ITransactionCommand *RocTransactionCommand::createTruncateTable( + Session *session, char *relName) { + + return this; +} diff --git a/clouddb/src/clouddb/facade/command/rocksdb_command/roc_transaction_command.h b/clouddb/src/clouddb/facade/command/rocksdb_command/roc_transaction_command.h new file mode 100644 index 0000000000000000000000000000000000000000..9ea6c2c3fd098253e0dbc7a808bf01cf6d76e3a5 --- /dev/null +++ b/clouddb/src/clouddb/facade/command/rocksdb_command/roc_transaction_command.h @@ -0,0 +1,89 @@ +#ifndef ROC_TRANSACTION_COMMAND_H +#define ROC_TRANSACTION_COMMAND_H + +#include "../../query_builder.h" +#include "../itransaction_command.h" + +#ifdef __cplusplus +#include <vector> + +#include "../../utils/metadata.h" + +enum RocTransactionOperation { + RCREATE_TABLE, + RINSERT_VALUE, + RBEGIN_SCAN, + RSELECT_DATA, + RDELETE_SLOT, + RUPDATE_SLOT, + RVACUUM_TABLE, + RTRUNCATE_TABLE, + RDROP_TABLE +}; + +class RocTransactionCommand : public ITransactionCommand { + private: + std::string tableName; + std::vector<std::string> fieldName; + std::vector<std::string> fieldData; + std::vector<size_t> fieldSize; +// RocksDBBuilder builder; + RocTransactionOperation operation; + ItemPointerData pointer; + Metadata metadata; + + // scan data + Relation relation; + Snapshot snapshot; + int nkeys; + struct ScanKeyData *key; + ParallelTableScanDesc parallel_scan; + uint32 flags; + TransactionId id; + + std::shared_ptr<RocksDBDriver> driver; + + public: + RocTransactionCommand() {}; + RocTransactionCommand(std::shared_ptr<RocksDBDriver> driver); + int execute() override; + ITransactionCommand *createCreateTableCommand(Session *session, + Relation rel) override; + ITransactionCommand *createInsertCommand(Session *session, Relation relation, + TupleTableSlot *slot, CommandId cid, + int options, + BulkInsertState bistate) override; + ITransactionCommand *beginScanCommand(Session *session, Relation relation, + Snapshot snapshot, int nkeys, + struct ScanKeyData *key, + ParallelTableScanDesc parallel_scan, + uint32 flags, + TransactionId id) override; + ITransactionCommand *selectDataCommand(Session *session, + Snapshot snapshot) override; + + ITransactionCommand *createDeleteSlotCommand( + Session *session, TransactionId id, ItemPointer newtid, Relation relation, + ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, + bool wait, TM_FailureData *tmfd, bool changingPart) override; + + ITransactionCommand *createUpdateSlotCommand( + Session *session, TransactionId id, Relation relation, ItemPointer otid, + TupleTableSlot *slot, CommandId cid, Snapshot snapshot, + Snapshot crosscheck, bool wait, TM_FailureData *tmfd, + LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes) override; + + ITransactionCommand *createVacuumCommand( + Session *session, TransactionId id, Relation rel, VacuumParams *params, + BufferAccessStrategy bstrategy) override; + + ITransactionCommand *createDropTable(Session *session, + char *relName) override; + + ITransactionCommand *createTruncateTable(Session *session, + char *relName) override; +}; + +#endif // !__cplusplus + +#endif // !CAS_TRANSACTION_COMMAND_H diff --git a/clouddb/src/clouddb/facade/drivers/cassandra_driver/cassandra_driver.cpp b/clouddb/src/clouddb/facade/drivers/cassandra_driver/cassandra_driver.cpp index acfbbc263796d8e17f7d08421d2e2630d8d5b7c5..a705bc3dfc1f7c9ee3d88051d9fea946eb40c752 100644 --- a/clouddb/src/clouddb/facade/drivers/cassandra_driver/cassandra_driver.cpp +++ b/clouddb/src/clouddb/facade/drivers/cassandra_driver/cassandra_driver.cpp @@ -5,6 +5,7 @@ #include <storage/itemptr.h> #include <cassandra.h> +#include <chrono> #include <cstring> #include <iostream> #include <unordered_map> @@ -124,11 +125,12 @@ CassandraDriver::CassandraDriver( : mapPointers() { builder = CassandraBuilder(); - hostname = config["cassandra.host"]; - keyspace = config["cassandra.keyspace"]; - replicationFactor = std::atoi(config["cassandra.replication_factor"].c_str()); - std::cout << this->hostname << " " << this->keyspace << " " - << this->replicationFactor << '\n'; + this->hostname = config["cassandra.host"]; + this->keyspace = config["cassandra.keyspace"]; + this->replicationFactor = + std::atoi(config["cassandra.replication_factor"].c_str()); + // std::cout << this->hostname << " " << this->keyspace << " " + // << this->replicationFactor << std::endl; clusterPtr = cass_cluster_new(); cass_cluster_set_contact_points(clusterPtr, hostname.data()); @@ -151,6 +153,7 @@ CassandraDriver::CassandraDriver( std::string query = this->builder.buildKeyspace(this->keyspace, this->replicationFactor); + // std::cout << query << std::endl; execCqlQuery(query); } @@ -178,9 +181,14 @@ void CassandraDriver::execCqlQuery(std::string query, Metadata metadata, int var) { CassError rc = CASS_OK; CassFuture *future = NULL; + // std::cout << query << std::endl; + // std::cout << metadata.mask << std::endl; CassStatement *statement = cass_statement_new(query.data(), var); convert data = {.data = metadata.ctid}; - + // std::cout << metadata.t_ctid.ip_blkid.bi_hi << " " + // << metadata.t_ctid.ip_blkid.bi_lo << " " << + // metadata.t_ctid.ip_posid + // << std::endl; cass_statement_bind_bytes(statement, var - 1, (cass_byte_t *)data.a, 6); bindPartMetadataInStatement(statement, metadata); @@ -204,6 +212,7 @@ void CassandraDriver::execCqlQuery(std::string query, CassError rc = CASS_OK; CassFuture *future = NULL; CassStatement *statement = cass_statement_new(query.data(), var); + // std::cout << "\n\n\n"; for (int i = 0; i < fields.size(); ++i) { char *a = const_cast<char *>(fields[i].c_str()); convert b; @@ -232,11 +241,20 @@ void CassandraDriver::execCqlQuery(std::string query, std::vector<std::string> fieldData, std::vector<size_t> fieldSize, Metadata metadata, int var) { + // auto start = std::chrono::high_resolution_clock::now(); CassError rc = CASS_OK; CassFuture *future = NULL; + static int i = 0; + i++; CassStatement *statement = cass_statement_new(query.data(), var); + static CassBatch *batch = NULL; + if (batch == NULL) + batch = cass_batch_new(CASS_BATCH_TYPE_UNLOGGED); // Создаем пакет + // std::cout << query << std::endl; + // std::cout << fieldData.size() << std::endl; for (int i = 0; i < fieldData.size(); ++i) { + // std::cout << fieldData[i] << " vlaue" << std::endl; if (fieldData[i] == "") { cass_statement_bind_null(statement, i + 1); continue; @@ -247,43 +265,35 @@ void CassandraDriver::execCqlQuery(std::string query, fieldSize[i]); } bindMetadataInStatement(statement, metadata, fieldData.size() + 1); - - future = cass_session_execute(this->sessionPtr, statement); - // cass_future_wait(future); - - // rc = cass_future_error_code(future); - // if (rc != CASS_OK) { - // const char *message; - // size_t message_length; - // cass_future_error_message(future, &message, &message_length); - // std::cerr << "Connection error: " << std::string(message, message_length) - // << std::endl; - // } - cass_future_free(future); + cass_batch_add_statement(batch, statement); cass_statement_free(statement); -} - -void CassandraDriver::execCqlQueryIndex(std::string query, - std::vector<std::string> fieldData, - std::vector<size_t> fieldSize, - Metadata metadata, int var) { - CassError rc = CASS_OK; - CassFuture *future = NULL; - CassStatement *statement = cass_statement_new(query.data(), var); - - for (int i = 0; i < fieldData.size(); ++i) { - if (fieldData[i] == "") { - cass_statement_bind_null(statement, i + 2); - continue; - } - - cass_statement_bind_bytes(statement, i + 2, - (const cass_byte_t *)fieldData[i].c_str(), - fieldSize[i]); - } - bindMetadataInStatementWithIndex(statement, metadata, fieldData.size() + 2); - future = cass_session_execute(this->sessionPtr, statement); + if (i == 10) { + // std::cout << "yee " << batch << " " << &batch << std::endl; + i = 0; + + future = cass_session_execute_batch(this->sessionPtr, batch); + cass_batch_free(batch); // Освобождаем пакет + batch = NULL; + // cass_future_wait(future); + + // rc = cass_future_error_code(future); + // + // if (rc != CASS_OK) { + // const char *message; + // size_t message_length; + // cass_future_error_message(future, &message, &message_length); + // std::cout << "Connection error: " << std::string(message, + // message_length) + // << std::endl; + // } + } + // auto end = std::chrono::high_resolution_clock::now(); + // std::chrono::duration<double> duration = end - start; + // std::cout << "Время выполнения функции: " << duration.count() << " секунд." + // << std::endl; + + // future = cass_session_execute(this->sessionPtr, statement); // cass_future_wait(future); // rc = cass_future_error_code(future); @@ -294,8 +304,7 @@ void CassandraDriver::execCqlQueryIndex(std::string query, // std::cerr << "Connection error: " << std::string(message, message_length) // << std::endl; // } - cass_future_free(future); - cass_statement_free(statement); + // cass_future_free(future); } CassFuture *CassandraDriver::execCqlQueryWithRes(std::string query) { @@ -349,6 +358,7 @@ ItemPointerData CassandraDriver::getLastPointer(const std::string &tableName, } std::string query = builder.buildGetLastPointer(key, tableName); + // std::cout << "query last pointer: " << query << std::endl; CassFuture *future = execCqlQueryWithRes(query); ItemPointerData last = {}; @@ -502,7 +512,11 @@ int CassandraDriver::selectData(Snapshot snapshot, size_t metadataCount) { CassRow *row = const_cast<CassRow *>(cass_iterator_get_row(this->iterator.iter)); CassError error = loadMetadata(row, metadata); - + // std::cout << metadata.xmin << " " << metadata.xmax << " select" + // << std::endl; + if (!inVisibilityArea(metadata, snapshot, ReadNextTransactionId() - 1)) { + return selectData(snapshot); // надо пофиксить + } this->iterator.row.value.clear(); this->iterator.row.size.clear(); @@ -523,7 +537,12 @@ int CassandraDriver::selectData(Snapshot snapshot, size_t metadataCount) { cass_value_get_bytes(casValue, &bytes, &size); this->iterator.row.value.push_back(bytes); this->iterator.row.size.push_back(size); - } + } + const cass_byte_t *bytes; + cass_value_get_bytes(cass_row_get_column_by_name(row, "key_name"), &bytes, + &size); + // std::cout << "select data: " << this->iterator.row.value[0] << " " + // << this->iterator.row.size.size() << std::endl; return 1; } return 0; @@ -584,7 +603,8 @@ void CassandraDriver::vacuum(std::string query, CassFuture *future, loadMetadata(row, metadata); cass_value_get_bytes(cass_row_get_column_by_name(row, KEY_NAME), &bytes, &size); - + // std::cout << metadata.xmax << " xmax metadata and id - " << id + // << std::endl; if (metadata.xmax == 0) { continue; } @@ -607,11 +627,18 @@ void CassandraDriver::vacuum(std::string query, CassFuture *future, if (keysInBatch > 0) { std::vector<std::string> keysV; for (int i = 0; i < keysInBatch; i++) { - + // std::cout << keys[i].data.ip_blkid.bi_hi << " " + // << keys[i].data.ip_blkid.bi_lo << " " << + // keys[i].data.ip_posid + // << std::endl; + // keysV.emplace_back(keys[i].a, 6); char *a = const_cast<char *>(keysV[i].c_str()); convert b; memcpy(b.a, a, 6); + // std::cout << b.data.ip_blkid.bi_hi << " " << b.data.ip_blkid.bi_lo << " + // " + // << b.data.ip_posid << std::endl; } execCqlQuery(builder.buildDropTuple(keyspace, tableName, keysInBatch), keysV, keysInBatch); diff --git a/clouddb/src/clouddb/facade/drivers/rocksdb_driver/rocksdb_driver.cpp b/clouddb/src/clouddb/facade/drivers/rocksdb_driver/rocksdb_driver.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ac3be46eb23e725b09400c60c81c603a7546539a --- /dev/null +++ b/clouddb/src/clouddb/facade/drivers/rocksdb_driver/rocksdb_driver.cpp @@ -0,0 +1,200 @@ +#include "rocksdb_driver.h" +#include <bits/this_thread_sleep.h> + +#include <iostream> + +using namespace ROCKSDB_NAMESPACE; + +std::string kDBPath = "/tmp/rocksdb_cloud_durable"; + +std::string kBucketSuffix = "cloud.durable.example."; +std::string kRegion = "us-west-2"; + +static const bool flushAtEnd = true; +static const bool disableWAL = false; + + +RocksDBDriver::RocksDBDriver(std::unordered_map<std::string, std::string> config) : mapPointers() { + std::cout << "Construct" << std::endl; + cloud_fs_options.aws_options.endpoint_override = "127.0.0.1:9000"; + + // Store a reference to a cloud file system. A new cloud env object should be + // associated with every new cloud-db. + + cloud_fs_options.credentials.InitializeSimple( + getenv("AWS_ACCESS_KEY_ID"), getenv("AWS_SECRET_ACCESS_KEY")); + if (!cloud_fs_options.credentials.HasValid().ok()) { + fprintf( + stderr, + "Please set env variables " + "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with cloud credentials"); + } + + // Append the user name to the bucket name in an attempt to make it + // globally unique. S3 bucket-names need to be globally unique. + // If you want to rerun this example, then unique user-name suffix here. + char* user = getenv("USER"); + kBucketSuffix.append(user); + + // "rockset." is the default bucket prefix + const std::string bucketPrefix = ""; + cloud_fs_options.src_bucket.SetBucketName(kBucketSuffix, bucketPrefix); + cloud_fs_options.dest_bucket.SetBucketName(kBucketSuffix, bucketPrefix); + + // create a bucket name for debugging purposes + const std::string bucketName = bucketPrefix + kBucketSuffix; + + // Create a new AWS cloud env Status + Status s = CloudFileSystemEnv::NewAwsFileSystem( + FileSystem::Default(), kBucketSuffix, kDBPath, kRegion, kBucketSuffix, + kDBPath, kRegion, cloud_fs_options, nullptr, &cfs); + if (!s.ok()) { + fprintf(stderr, "Unable to create cloud env in bucket %s. %s\n", + bucketName.c_str(), s.ToString().c_str()); + } + cloud_fs.reset(cfs); + + // Create options and use the AWS file system that we created earlier + cloud_env = NewCompositeEnv(cloud_fs); + options.env = cloud_env.get(); + options.create_if_missing = true; + + // No persistent read-cache + std::string persistent_cache = ""; + + // options for each write + wopt.disableWAL = disableWAL; + + // open DB + s = DBCloud::Open(options, kDBPath, persistent_cache, 0, &db); + if (!s.ok()) { + throw std::runtime_error("Unable to open db at path1 " + + kDBPath + + " with bucket " + + bucketName + + ". " + + s.ToString() + "\n"); + } + + + /*********************** + * * + * Test * + * * + ************************/ + + // s = db->Put(wopt, "key1", "value"); + + // db->Flush(FlushOptions()); + + // std::cout << "Test succes" << std::endl; +} + +void RocksDBDriver::createTable(std::string tableName){ + std::cout << "Create" << std::endl; + // ROCKSDB_NAMESPACE::ColumnFamilyOptions cfOptions; + // // Здесь можно настроить параметры семейства колонок, если необходимо + // ROCKSDB_NAMESPACE::ColumnFamilyHandle* cfHandle; + // auto status = db->CreateColumnFamily(cfOptions, tableName, &cfHandle); + // if (!status.ok()) { + // throw std::runtime_error("Unable to create column family: " + status.ToString()); + // } + + // // Сохраняем дескриптор семейства колонок в мапу + // columnFamilies[tableName] = cfHandle; +} + +void RocksDBDriver::insertData(std::string tableName, std::vector<std::string> fieldName, std::vector<std::string> fieldData, std::vector<size_t> sizes, ItemPointerData pointer) { + static int countOperation = 0; + + std::string key = std::to_string(pointer.ip_blkid.bi_hi) + "_" + + std::to_string(pointer.ip_blkid.bi_lo) + "_" + + std::to_string(pointer.ip_posid); + + std::string value; + for (size_t i = 0; i < fieldData.size(); ++i) { + value += fieldData[i]; + if (i < fieldData.size() - 1) { + value += "|"; // Use a delimiter to separate fields + } + } + + // auto batch = rocksdb::WriteBatch(); + auto status = db->Put(this->wopt, key, value); + + if (!status.ok()) { + throw std::runtime_error("Failed to insert data: " + status.ToString()); + } + + countOperation++; + if(countOperation % 50000 == 0){ + db->Flush(FlushOptions()); + } +} + +ItemPointerData RocksDBDriver::getNewPointer(std::string tableName) { + ItemPointerData data = getLastPointer(tableName); + if (data.ip_posid < UINT16_MAX) { + data.ip_posid++; + } else { + data.ip_posid = 1; + if (data.ip_blkid.bi_lo < UINT16_MAX) { + data.ip_blkid.bi_lo++; + } else { + data.ip_blkid.bi_lo = 0; + if (data.ip_blkid.bi_hi < UINT16_MAX) { + data.ip_blkid.bi_hi++; + } else { + data.ip_blkid.bi_hi = 0; + } + } + } + mapPointers[tableName] = data; + return data; +} + +ItemPointerData RocksDBDriver::getLastPointer(const std::string &tableName) { + auto it = this->mapPointers.find(tableName); + if (it != this->mapPointers.end()) { + return it->second; + } + + ItemPointerData last = {}; + ROCKSDB_NAMESPACE::ReadOptions readOptions; + ROCKSDB_NAMESPACE::Iterator* itr = db->NewIterator(readOptions); + + itr->SeekToLast(); + if (itr->Valid()) { + std::string lastKey = itr->key().ToString(); + std::istringstream iss(lastKey); + std::string token; + std::vector<uint16_t> parts; + + while (std::getline(iss, token, '_')) { + parts.push_back(static_cast<uint16_t>(std::stoi(token))); + } + + if (parts.size() == 3) { + last.ip_blkid.bi_hi = parts[0]; + last.ip_blkid.bi_lo = parts[1]; + last.ip_posid = parts[2]; + } + }else{ + last.ip_blkid.bi_hi = 1; + last.ip_blkid.bi_lo = 1; + last.ip_posid = 1; + return last; + } + + delete itr; + + mapPointers[tableName] = last; + return last; + // last.ip_blkid.bi_hi = 1; + // last.ip_blkid.bi_lo = 1; + // last.ip_posid = 1; + // return last; + +} + + diff --git a/clouddb/src/clouddb/facade/drivers/rocksdb_driver/rocksdb_driver.h b/clouddb/src/clouddb/facade/drivers/rocksdb_driver/rocksdb_driver.h new file mode 100644 index 0000000000000000000000000000000000000000..5c150f57090ee8c8c809057ef1da6f6e187c9312 --- /dev/null +++ b/clouddb/src/clouddb/facade/drivers/rocksdb_driver/rocksdb_driver.h @@ -0,0 +1,77 @@ +#ifndef ROCKSDB_DRIVER_H +#define ROCKSDB_DRIVER_H + +#include "../idriver.h" + +#ifdef __cplusplus +extern "C" { +#endif +#include <postgres.h> +#include <fmgr.h> + +#include <access/transam.h> +#ifdef __cplusplus +} +#endif + +#ifdef __cplusplus + +#include <string> +#include <unordered_map> +#include <sstream> + +#include "../../../utils/utils.h" +#include "../../utils/metadata.h" + +#include <rocksdb/listener.h> +#include <rocksdb/options.h> +#include <rocksdb/db.h> +#include <rocksdb/slice.h> +#include <rocksdb/iterator.h> + +#include <bits/std_thread.h> + +#include <rocksdb/cloud/db_cloud.h> +#include <rocksdb/options.h> + + +class RocksDBDriver: public IDriver { +private: + ROCKSDB_NAMESPACE::DBCloud* db; + std::unordered_map<std::string, ROCKSDB_NAMESPACE::ColumnFamilyHandle*> columnFamilies; + + //require live for correct work + std::unordered_map<std::string, ItemPointerData> mapPointers; + ROCKSDB_NAMESPACE::WriteOptions wopt; + std::shared_ptr<ROCKSDB_NAMESPACE::FileSystem> cloud_fs; + ROCKSDB_NAMESPACE::Options options; + std::unique_ptr<ROCKSDB_NAMESPACE::Env> cloud_env; + ROCKSDB_NAMESPACE::CloudFileSystemOptions cloud_fs_options; + ROCKSDB_NAMESPACE::CloudFileSystem* cfs; +public: + ~RocksDBDriver() = default; + RocksDBDriver(std::unordered_map<std::string, std::string> config); + + void createTable(std::string tableName); + + void insertData(std::string tableName, std::vector<std::string> fieldName, std::vector<std::string> fieldData, std::vector<size_t> sizes, ItemPointerData pointer) ; + + void beginScan(Relation relation, Snapshot snapshot, int nkeys, + struct ScanKeyData* key, + ParallelTableScanDesc parallel_scan, uint32 flags, + TransactionId id) { return; }; + int selectData(Snapshot snapshot) { return 0; }; + char** getValue() { return 0; }; + ItemPointerData getTid() { return ItemPointerData(); }; + + ItemPointerData getNewPointer(std::string tableName); + ItemPointerData getLastPointer(const std::string &tableName); + + void startFlushThread(); + void stopFlushThread(); +}; + +#endif + + +#endif \ No newline at end of file diff --git a/clouddb/src/clouddb/facade/query_builder.cpp b/clouddb/src/clouddb/facade/query_builder.cpp index d969304ef4a11820b1743536aefc3edb39498640..4ab0dbebd8fefb6de45ec9ed4fea33c86b8b3338 100644 --- a/clouddb/src/clouddb/facade/query_builder.cpp +++ b/clouddb/src/clouddb/facade/query_builder.cpp @@ -220,9 +220,7 @@ std::string CassandraBuilder::buildSelectAll(std::string keyspace, query += " <= "; query += std::to_string(id); query += " ALLOW FILTERING;"; - // std::cout << query << std::endl; - return query; } diff --git a/clouddb/src/clouddb/facade/session.cpp b/clouddb/src/clouddb/facade/session.cpp index 6216efe3d40477545b57f26d2f457e70e56c606d..f9f8f6f704db8eff5f7b09d3a0d88ed6fba95ca9 100644 --- a/clouddb/src/clouddb/facade/session.cpp +++ b/clouddb/src/clouddb/facade/session.cpp @@ -19,6 +19,8 @@ std::unordered_map<std::string, std::string> parseConfig(char **config) { Session::Session(std::unordered_map<std::string, std::string> config) { if (config["driver"] == "cassandra") { this->driver = std::make_shared<CassandraDriver>(config); + }else if(config["driver"] == "rocksdb") { + this->driver = std::make_shared<RocksDBDriver>(config); } } diff --git a/clouddb/src/clouddb/facade/session.h b/clouddb/src/clouddb/facade/session.h index dd0cdccbbbcff799c81f189a47bb2e7da295f4b8..38d6e5410149d10ffc086d7fd4834ba4c290bdfe 100644 --- a/clouddb/src/clouddb/facade/session.h +++ b/clouddb/src/clouddb/facade/session.h @@ -4,14 +4,16 @@ #include "../utils/c_export.h" #include "command/icommand.h" #include "drivers/cassandra_driver/cassandra_driver.h" +#include "drivers/rocksdb_driver/rocksdb_driver.h" // #ifdef __cplusplus #include <memory> #include <string> #include <unordered_map> +#include <iostream> -enum DRIVERS { Cassandra }; +enum DRIVERS { Cassandra, RocksDb }; class Session { public: @@ -27,7 +29,7 @@ class Session { std::weak_ptr<ICommand> command; Session(std::unordered_map<std::string, std::string> config); Session(); - ~Session() = default; + ~Session() {std::cout << "~Session" << std::endl;}; // delete all Session(const Session &) = delete; diff --git a/clouddb/src/clouddb/tam/all_tam.h b/clouddb/src/clouddb/tam/all_tam.h index 43134944a9170064d27bc9543277dacd2b0d20ff..d00e11646c937c6c66020baf5524bf1acffb5053 100644 --- a/clouddb/src/clouddb/tam/all_tam.h +++ b/clouddb/src/clouddb/tam/all_tam.h @@ -16,114 +16,7 @@ #include "../facade/command/command.h" #include "../utils/utils.h" -#include "access/hio.h" -#include "access/multixact.h" -#include "access/relscan.h" -#include "access/sysattr.h" -#include "access/transam.h" -#include "access/valid.h" -#include "access/visibilitymap.h" -#include "access/xact.h" -#include "access/xlogutils.h" -#include "catalog/catalog.h" -#include "catalog/namespace.h" -#include "miscadmin.h" -#include "pgstat.h" -#include "storage/bufmgr.h" -#include "storage/freespace.h" -#include "storage/lmgr.h" -#include "storage/procarray.h" -#include "storage/smgr.h" -#include "storage/standby.h" -#include "utils/datum.h" -#include "utils/inval.h" -#include "utils/snapmgr.h" -#include "utils/syscache.h" - - -#include "access/genam.h" -#include "access/table.h" -#include "catalog/indexing.h" -#include "catalog/objectaddress.h" -#include "catalog/pg_description.h" -#include "catalog/pg_shdescription.h" -#include "commands/comment.h" -#include "commands/dbcommands.h" -#include "utils/fmgroids.h" -#include "catalog/pg_index.h" -#include "catalog/pg_class.h" -#include "catalog/pg_namespace.h" -#include "utils/relcache.h" -#include "utils/varbit.h" -#include "commands/sequence.h" - - -#include <access/heapam.h> -#include <access/htup_details.h> -#include <access/relation.h> -#include <access/tableam.h> -#include <cassandra.h> -#include <catalog/index.h> -#include <commands/vacuum.h> -#include <executor/executor.h> -#include <executor/spi.h> -#include <executor/tuptable.h> -#include <fmgr.h> -#include <miscadmin.h> -#include <nodes/execnodes.h> -#include <postmaster/bgworker.h> -#include <postmaster/interrupt.h> -#include <stdlib.h> -#include <storage/ipc.h> -#include <storage/lwlock.h> -#include <storage/shmem.h> -#include <string.h> -#include <tcop/tcopprot.h> -#include <utils/builtins.h> -#include <utils/guc.h> -#include <utils/lsyscache.h> -#include <utils/rel.h> /* ... and relations */ -#include <utils/snapmgr.h> - -#include "access/amapi.h" -#include "access/genam.h" -#include "access/relscan.h" -#include "catalog/index.h" -#include "catalog/pg_type.h" -#include "commands/vacuum.h" -#include "miscadmin.h" -#include "storage/bufmgr.h" -#include "utils/memutils.h" -#include "nodes/pathnodes.h" - -#include <access/heapam.h> -#include <access/htup_details.h> -#include <access/relation.h> -#include <access/tableam.h> -#include <cassandra.h> -#include <catalog/index.h> -#include <commands/vacuum.h> -#include <executor/executor.h> -#include <executor/spi.h> -#include <executor/tuptable.h> -#include <fmgr.h> -#include <miscadmin.h> -#include <nodes/execnodes.h> -#include <postmaster/bgworker.h> -#include <postmaster/interrupt.h> -#include <stdlib.h> -#include <storage/ipc.h> -#include <storage/lwlock.h> -#include <storage/shmem.h> -#include <string.h> -#include <tcop/tcopprot.h> -#include <utils/builtins.h> -#include <utils/guc.h> -#include <utils/lsyscache.h> -#include <utils/rel.h> /* ... and relations */ -#include <utils/snapmgr.h> -#include "access/amapi.h" - +extern Session* session; extern const TableAmRoutine clouddb_methods; #endif diff --git a/clouddb/src/clouddb/tam/src/mod_tuple_tam.c b/clouddb/src/clouddb/tam/src/mod_tuple_tam.c index 4a7c40cf7a8340fc073e41e729aa8400ce434f12..9ddd5e479eeebd75972e42a56d717c4b47dc55cd 100644 --- a/clouddb/src/clouddb/tam/src/mod_tuple_tam.c +++ b/clouddb/src/clouddb/tam/src/mod_tuple_tam.c @@ -4,9 +4,14 @@ void clouddb_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, int options, BulkInsertState bistate) { + // Вычисляем разницу + /* clock_t start = clock(); */ Session *session = getSessionInstance(NULL); executeCommand(session, createInsertCommand(session, relation, slot, cid, options, bistate)); + /* clock_t end = clock(); */ + /* double time_spent = (double)(end - start) / CLOCKS_PER_SEC * 1000; */ + /* printf("Время выполнения функции: %.3f milliseconds\n", time_spent); */ } void clouddb_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, @@ -136,3 +141,4 @@ TM_Result clouddb_tuple_lock(Relation relation, ItemPointer tid, TM_Result result = 0; return result; } + diff --git a/clouddb/src/clouddb/tam/src/scan_func_tam.c b/clouddb/src/clouddb/tam/src/scan_func_tam.c index b7a9878899a35734fa00e814d487e4bca3c15176..489bef859d4a833c3577ae28cc5d9f9721454ce5 100644 --- a/clouddb/src/clouddb/tam/src/scan_func_tam.c +++ b/clouddb/src/clouddb/tam/src/scan_func_tam.c @@ -43,8 +43,12 @@ bool clouddb_getnextslot(TableScanDesc sscan, ScanDirection direction, char **values = getValueForScan(driver); slot->tts_tid = getTidForScan(driver); + /* printf("%d %d %d\n", slot->tts_tid.ip_posid, slot->tts_tid.ip_blkid.bi_lo, + */ + /* slot->tts_tid.ip_blkid.bi_hi); */ TupleDesc desc = RelationGetDescr(sscan->rs_rd); + /* printf("%s\n", values[0]); */ for (int i = 0; i < desc->natts; ++i) { char *curValue = values[i]; diff --git a/clouddb/src/clouddb/tam/tam.c b/clouddb/src/clouddb/tam/tam.c index 3d2059d29870985c7ba96c15ef00e3a59760a2e4..dbd294918d24202544cc1981094e2acf8d28b593 100644 --- a/clouddb/src/clouddb/tam/tam.c +++ b/clouddb/src/clouddb/tam/tam.c @@ -7,6 +7,8 @@ #include "include/scan_func_tam.h" #include "include/slot_func_tam.h" +Session* session; + const TableAmRoutine clouddb_methods = { .type = T_TableAmRoutine, @@ -71,20 +73,21 @@ const TableAmRoutine clouddb_methods = { PG_FUNCTION_INFO_V1(clouddb_table_handler); Datum clouddb_table_handler(PG_FUNCTION_ARGS) { - char *keyspace = (char *)GetConfigOption("cassandra.keyspace", false, false); - char *host = (char *)GetConfigOption("cassandra.host", false, false); + printf("Init\n"); + char *host = GetConfigOption("cassandra.host", false, false); + char *keyspace = GetConfigOption("cassandra.keyspace", false, false); char *replicationFactor = (char *)GetConfigOption("cassandra.replication_factor", false, false); - char **config = (char **)malloc(sizeof(char *) * 7); - config[0] = "cassandra"; + char **config = (char **)malloc(sizeof(char *) * 7); + config[0] = "rocksdb"; config[1] = "cassandra.host"; - config[2] = host; + config[2] = "host"; config[3] = "cassandra.keyspace"; - config[4] = keyspace; + config[4] = "keyspace"; config[5] = "cassandra.replication_factor"; - config[6] = replicationFactor; - Session *session = getSessionInstance((void *)config); + config[6] = "replicationFactor"; + session = getSessionInstance(config); PG_RETURN_POINTER(&clouddb_methods); }