diff --git a/CMakeLists.txt b/CMakeLists.txt index c412bac..65fc89f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,60 +1,66 @@ cmake_minimum_required(VERSION 3.22) -project(metal_kompanion_mcp LANGUAGES CXX) +project(Kompanion LANGUAGES CXX) +set(PROJECT_VERSION "0.0.1") + +set(QT_MIN_VERSION "6.8.0") +set(KF6_MIN_VERSION "6.8.0") +set(KDE_COMPILERSETTINGS_LEVEL "5.82") set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) -include(GNUInstallDirs) +find_package(ECM ${KF6_MIN_VERSION} REQUIRED NO_MODULE) +set(CMAKE_MODULE_PATH ${ECM_MODULE_PATH}) -option(KOM_ENABLE_PG "Enable Postgres-backed DAL support" ON) +include(KDEInstallDirs) +include(KDECMakeSettings) +include(KDECompilerSettings NO_POLICY_SCOPE) +include(ECMMarkAsTest) +include(ECMMarkNonGuiExecutable) +include(FeatureSummary) +include(CheckIncludeFile) +include(CheckIncludeFiles) +include(CheckSymbolExists) +include(ECMOptionalAddSubdirectory) +include(KDEClangFormat) +include(ECMDeprecationSettings) -set(KOM_HAVE_PG OFF) -if (KOM_ENABLE_PG) - find_package(PostgreSQL QUIET) - find_package(pqxx QUIET) - if (PostgreSQL_FOUND AND pqxx_FOUND) - set(KOM_HAVE_PG ON) - endif() +include(KDEGitCommitHooks) + +find_package(Qt6 ${QT_MIN_VERSION} CONFIG REQUIRED COMPONENTS + Core + Sql +) + +option(KOMPANION_USE_GUI "Build optional GUI components using Qt6Gui" ON) +if (KOMPANION_USE_GUI) + find_package(Qt6 ${QT_MIN_VERSION} CONFIG REQUIRED COMPONENTS Gui) endif() -if (KOM_HAVE_PG) - message(STATUS "kom_dal: Postgres support enabled (HAVE_PG)") -else() - message(STATUS "kom_dal: Postgres support disabled (pqxx/libpq not found)") -endif() - -set(KOM_HAVE_PG ${KOM_HAVE_PG} CACHE INTERNAL "kom_dal has Postgres backend") - set(KOMPANION_KCONFIG_TARGET "") -find_package(KF6Config QUIET) +find_package(KF6Config ${KF6_MIN_VERSION} QUIET) if (KF6Config_FOUND) - set(KOMPANION_KCONFIG_TARGET KF6::ConfigCore) + set(KOMPANION_KCONFIG_TARGET KF6::ConfigCore) else() - find_package(KF5Config QUIET) - if 
(KF5Config_FOUND) - set(KOMPANION_KCONFIG_TARGET KF5::ConfigCore) - endif() + find_package(KF5Config QUIET) + if (KF5Config_FOUND) + set(KOMPANION_KCONFIG_TARGET KF5::ConfigCore) + endif() endif() +add_feature_info("KConfig" KOMPANION_KCONFIG_TARGET "Persist settings via KConfig if available.") # <enabled> must be ONE argument; this variable is non-empty only when KF6/KF5 Config was found -if (KOMPANION_KCONFIG_TARGET STREQUAL "") - message(WARNING "KConfig (KF6/KF5) not found; defaulting to environment-based configuration.") -endif() +find_package(Qt6Test ${QT_MIN_VERSION} CONFIG QUIET) +set_package_properties(Qt6Test PROPERTIES + PURPOSE "Required for tests" + TYPE OPTIONAL + ) +add_feature_info("Qt6Test" Qt6Test_FOUND "Required for building tests") -set(KOMPANION_DB_INIT_INSTALL_DIR "${CMAKE_INSTALL_FULL_DATAROOTDIR}/kompanion/db/init") -install(DIRECTORY db/init/ DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/kompanion/db/init FILES_MATCHING PATTERN "*.sql") + +set(KOMPANION_DB_INIT_INSTALL_DIR "${KDE_INSTALL_FULL_DATADIR}/kompanion/db/init") +install(DIRECTORY db/init/ DESTINATION ${KDE_INSTALL_DATADIR}/kompanion/db/init FILES_MATCHING PATTERN "*.sql") option(BUILD_KOMPANION_CLI "Build Kompanion Qt command-line client" ON) -if (BUILD_KOMPANION_CLI) - find_package(Qt6 COMPONENTS Core QUIET) - if (NOT Qt6_FOUND) - message(WARNING "Qt6 Core not found; disabling Kompanion CLI.") - set(BUILD_KOMPANION_CLI OFF) - endif() -endif() - -# Placeholder: find Qt and qtmcp when available -# find_package(Qt6 COMPONENTS Core Network REQUIRED) -# find_package(qtmcp REQUIRED) add_subdirectory(src/dal) @@ -63,14 +69,15 @@ add_executable(kom_mcp ) target_include_directories(kom_mcp PRIVATE src) target_link_libraries(kom_mcp PRIVATE kom_dal) +if (NOT KOMPANION_KCONFIG_TARGET STREQUAL "") + target_link_libraries(kom_mcp PRIVATE ${KOMPANION_KCONFIG_TARGET}) + target_compile_definitions(kom_mcp PRIVATE HAVE_KCONFIG) +endif() +target_compile_options(kom_mcp PRIVATE -fexceptions) target_compile_definitions(kom_mcp PRIVATE PROJECT_SOURCE_DIR="${CMAKE_SOURCE_DIR}" 
KOMPANION_DB_INIT_INSTALL_DIR="${KOMPANION_DB_INIT_INSTALL_DIR}" ) -if (NOT KOMPANION_KCONFIG_TARGET STREQUAL "") - target_link_libraries(kom_mcp PRIVATE ${KOMPANION_KCONFIG_TARGET}) - target_compile_definitions(kom_mcp PRIVATE HAVE_KCONFIG) -endif() install(TARGETS kom_mcp RUNTIME DESTINATION bin) @@ -81,15 +88,20 @@ if (BUILD_KOMPANION_CLI) src/cli/KompanionApp.cpp ) target_include_directories(kompanion PRIVATE src) - target_link_libraries(kompanion PRIVATE Qt6::Core kom_dal) + target_link_libraries(kompanion PRIVATE + Qt6::Core + Qt6::Sql + kom_dal + ) target_compile_definitions(kompanion PRIVATE PROJECT_SOURCE_DIR="${CMAKE_SOURCE_DIR}" KOMPANION_DB_INIT_INSTALL_DIR="${KOMPANION_DB_INIT_INSTALL_DIR}" ) if (NOT KOMPANION_KCONFIG_TARGET STREQUAL "") - target_link_libraries(kompanion PRIVATE ${KOMPANION_KCONFIG_TARGET}) - target_compile_definitions(kompanion PRIVATE HAVE_KCONFIG) + target_link_libraries(kompanion PRIVATE ${KOMPANION_KCONFIG_TARGET}) + target_compile_definitions(kompanion PRIVATE HAVE_KCONFIG) endif() + target_compile_options(kompanion PRIVATE -fexceptions -Wno-unused-function) install(TARGETS kompanion RUNTIME DESTINATION bin) endif() @@ -97,3 +109,5 @@ if (BUILD_TESTS) enable_testing() add_subdirectory(tests) endif() + +feature_summary(WHAT ALL INCLUDE_QUIET_PACKAGES FATAL_ON_MISSING_REQUIRED_PACKAGES) diff --git a/docs/configuration.md b/docs/configuration.md index 275da19..aefc1ab 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -12,6 +12,7 @@ The CLI (`kompanion`) and MCP runner (`kom_mcp`) fall back to this entry when th ## Initialization Wizard - Run `kompanion --init` to launch an interactive wizard. - Autodetects reachable Postgres instances (tries `postgresql://kompanion:komup@localhost/kompanion_test`). + - Inspects local socket (`/var/run/postgresql`) and existing databases owned by the current user via `psql -At`, offering them as defaults. 
- Prompts for host, port, database, user, password, or Unix socket path with sensible defaults. - Writes the resulting DSN to `kompanionrc` and exports `PG_DSN` for the current session. - If the target database is empty, it applies the SQL migrations shipped under `share/kompanion/db/init/*.sql`. diff --git a/docs/dal-skeleton.md b/docs/dal-skeleton.md index 1077e45..31b29ec 100644 --- a/docs/dal-skeleton.md +++ b/docs/dal-skeleton.md @@ -2,7 +2,7 @@ ## Interfaces - `IDatabase` — connect/tx + memory ops (ensureNamespace, upsertItem/Chunks/Embeddings, searchText/searchVector). -- `PgDal` — stub implementation for now (no libpq linked). +- `PgDal` — Qt6/QSql-based implementation with in-memory fallback. ## SQL Calls (target) - ensureNamespace: `INSERT ... ON CONFLICT (name) DO UPDATE RETURNING id` @@ -14,12 +14,11 @@ ## Next - Wire `Handlers::upsert_memory` / `search_memory` to `IDatabase`. -- Add libpq (or pqxx) and parameterized statements. -- Add RLS/session GUCs & retries. +- Harden SQL with RLS/session GUCs & retries. +- Expand hybrid search scoring (RRF weights, secret filters). ## Implementation Checklist (2025-10-15) -- Detect `libpq` + `pqxx` in CMake, define `HAVE_PG`, and link `kom_dal` against `PostgreSQL::PostgreSQL` and `pqxx::pqxx`. -- During `PgDal::connect`, prepare statements for namespace ensure/upsert, chunk/embedding insert, text/vector search, and hybrid search. -- Guard runtime selection: `stub://` DSN keeps the current in-memory store; non-stub DSNs require a live Postgres connection. -- Expose environment variables in docs: `PG_DSN` (full libpq connection string) and optional `PGSSLMODE`. -- Surface informative `std::runtime_error` messages when pqxx operations fail so MCP handlers can emit actionable errors. +- Require Qt6::Sql (`QPSQL` driver) at configure time; bail out early when unavailable. +- During `PgDal::connect`, parse DSNs with `QUrl`, open `QSqlDatabase`, and retain in-memory fallback for `stub://`. 
+- Use `QSqlQuery` with `INSERT ... RETURNING` for namespace/item/chunk/embedding operations. +- Derive DSNs from `kompanionrc` (KConfig) or CLI wizard, and surface informative `std::runtime_error` messages when QSql operations fail. diff --git a/docs/memory-architecture.md b/docs/memory-architecture.md index 028e326..3a6b1ee 100644 --- a/docs/memory-architecture.md +++ b/docs/memory-architecture.md @@ -1,30 +1,26 @@ # Memory Architecture Roadmap (2025-10-15) ## Current Snapshot -- `PgDal` remains an in-memory stub; `libpq`/`pqxx` are not linked and there is no `HAVE_PG` compile guard. -- `contract_memory` now builds via `tests/contract_memory.cpp` as a dedicated CTest executable (stub DAL still backing it). -- MCP handlers directly invoke the stub DAL and perform ad-hoc JSON parsing. -- No `resources/` descriptors exist for episodic memory APIs or the semantic sync workflow. +- `PgDal` now prefers Qt6/QSql (`QPSQL`) with an in-memory fallback for `stub://` DSNs; schema migrations live in `db/init/`. +- `kompanion --init` guides DSN detection (psql socket probing), applies migrations, and persists config via `~/.config/kompanionrc`. +- MCP handlers still parse JSON manually but leverage the shared DAL; resource descriptors under `resources/memory/kom.memory.v1/` capture episodic/semantic contracts. +- Contract tests (`contract_memory`, `contract_mcp_tools`, `mcp_memory_exchange`) validate the Qt-backed DAL and MCP handlers. ## 1. CTest Target: `contract_memory` 1. Keep `contract_memory.cpp` focused on exercising `PgDal` write/read surfaces; expand as DAL features land. 2. Ensure the executable runs without Postgres by defaulting to `stub://memory` when `PG_DSN` is absent. -3. Layer follow-up assertions once pqxx-backed code exists (e.g., detect `HAVE_PG` and connect to real DB in CI). +3. Layer follow-up assertions once the QSql path is exercised end-to-end (CI can target the packaged test database). -## 2. DAL Prepared Statements (`HAVE_PG`) +## 2. 
DAL (Qt6/QSql) Evolution **Dependencies** -- System packages: `libpq` headers and `libpqxx` (>= 7.8). -- CMake: `find_package(PostgreSQL REQUIRED)` and `find_package(pqxx REQUIRED)`; link `PostgreSQL::PostgreSQL` and `pqxx::pqxx`. +- Qt6 (Core, Sql) with the `QPSQL` driver available at runtime. +- KDE Frameworks `ConfigCore` for persisting DSNs in `kompanionrc`. **Implementation Steps** -1. Extend `src/dal/CMakeLists.txt` to define `HAVE_PG` and link the libraries when detection succeeds. -2. Enhance `PgDal` to manage both modes: - - `stub://` DSN → in-memory containers (status quo). - - Other DSNs → establish `pqxx::connection`, store it via `std::unique_ptr`, and call `conn->prepare(...)` during `connect`. -3. Prepare statements for: `ensure_namespace`, `upsert_item`, `insert_chunk`, `insert_embedding`, `search_text`, `search_vector`, `hybrid_search`. -4. Implement transactional helpers using `pqxx::work`, with RAII to guarantee `commit()`/`abort()` pairing. -5. Translate `pqxx` exceptions into `std::runtime_error` with context so MCP handlers can emit useful error JSON. -6. Document required environment variables (`PG_DSN`, optional `PGSSLMODE`) plus migration expectations in `docs/dal-skeleton.md`. +1. Parse libpq-style DSNs with `QUrl`, open `QSqlDatabase` connections when the DSN is not `stub://`, and maintain the existing in-memory fallback for tests. +2. Use `QSqlQuery` `INSERT ... RETURNING` statements for namespaces, items, chunks, and embeddings; emit vector literals (`[0.1,0.2]`) when targeting pgvector columns. +3. Surface detailed `QSqlError` messages (throwing `std::runtime_error`) so MCP handlers and the CLI can report actionable failures. +4. Share configuration between CLI and MCP runners via KConfig (`Database/PgDsn`), seeded through the new `kompanion --init` wizard. ## 3. MCP `resources/*` & Episodic→Semantic Sync **Directory Layout** @@ -60,10 +56,10 @@ 3. 
Merge results in C++ with Reciprocal Rank Fusion or weighted sum, ensuring deterministic ordering on ties. **Handler Integration** -1. Extend `PgDal::hybridSearch` to dispatch to the prepared statements when `HAVE_PG` is defined; reuse in-memory fallback otherwise. +1. Ensure `PgDal::hybridSearch` delegates to SQL-based lexical/vector search when a database connection is active, reusing the in-memory fallback only for `stub://`. 2. Return richer matches (id, score, optional chunk text) to satisfy MCP response schema. 3. Update `HandlersMemory::search_memory` to surface the new scores and annotate whether lexical/vector contributed (optional metadata). -4. Add a contract test scenario once pqxx-backed execution is available (requires live Postgres fixture later). +4. Exercise hybrid queries in contract tests against the packaged test database (`db/scripts/create-test-db.sh`). ## 5. Secret Handling, Snapshots, and CLI Hooks - **Secret propagation**: episodic `sensitivity` + `embeddable` flags gate embedding generation. DAL queries will add predicates (`metadata->>'sensitivity' != 'secret'`) before hybrid search. @@ -76,9 +72,9 @@ - **CLI UX**: `kompanion --init` guides first-run setup (auto-detects databases, applies schemas); `-I/--interactive` keeps a JSON REPL open, and `-V/--verbose` echoes request/response streams for future HTTP transport parity. ## Next-Step Checklist -- [x] Detect pqxx via CMake and plumb `HAVE_PG`. +- [x] Promote Qt6/QSql backend (QPSQL) as default DAL; retain `stub://` fallback for tests. - [x] Normalize contract_memory CTest target and remove stale library target. - [ ] Author `resources/memory/` descriptors and sync job outline. -- [ ] Extend DAL header to carry prepared-statement aware APIs (may introduce new structs). +- [ ] Extend DAL header to expose richer query structs (filters, pagination, secret handling). - [x] Update `docs/mcp-memory-api.md` to mention episodic sync + hybrid search fields. 
- [ ] Create follow-up acf subtasks when concrete implementation begins (pgvector migration, scheduler hook, runtime wiring). diff --git a/src/dal/CMakeLists.txt b/src/dal/CMakeLists.txt index 43d6aba..6dd0c04 100644 --- a/src/dal/CMakeLists.txt +++ b/src/dal/CMakeLists.txt @@ -4,8 +4,5 @@ add_library(kom_dal STATIC target_compile_features(kom_dal PUBLIC cxx_std_20) target_include_directories(kom_dal PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) - -if (KOM_HAVE_PG) - target_compile_definitions(kom_dal PUBLIC HAVE_PG) - target_link_libraries(kom_dal PUBLIC pqxx::pqxx PostgreSQL::PostgreSQL) -endif() +target_link_libraries(kom_dal PUBLIC Qt6::Core Qt6::Sql) +target_compile_options(kom_dal PRIVATE -fexceptions) diff --git a/src/dal/PgDal.cpp b/src/dal/PgDal.cpp index 223803b..b33200e 100644 --- a/src/dal/PgDal.cpp +++ b/src/dal/PgDal.cpp @@ -1,18 +1,22 @@ #include "PgDal.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include #include -#include -#include -#include #include -#include #include -#include - -#ifdef HAVE_PG -#include -#endif namespace kom { @@ -22,104 +26,265 @@ bool idsContains(const std::vector& ids, const std::string& value) return std::find(ids.begin(), ids.end(), value) != ids.end(); } -bool isStubDsn(const std::string& dsn) { - return dsn.empty() || dsn.rfind("stub://", 0) == 0; -} } // namespace PgDal::PgDal() = default; -PgDal::~PgDal() = default; -void PgDal::resetInMemoryStore() { - namespacesByName_.clear(); - namespacesById_.clear(); - items_.clear(); - itemsByNamespace_.clear(); - chunks_.clear(); - chunksByItem_.clear(); - embeddings_.clear(); - - nextNamespaceId_ = 1; - nextItemId_ = 1; - nextChunkId_ = 1; - nextEmbeddingId_ = 1; +PgDal::~PgDal() { + closeDatabase(); } +bool PgDal::isStubDsn(const std::string& dsn) { + return dsn.empty() || dsn.rfind("stub://", 0) == 0; +} + +namespace { + +QVariant toSqlTimestamp(const std::chrono::system_clock::time_point& tp) { + if 
(tp.time_since_epoch().count() == 0) { + return QVariant(QMetaType(QMetaType::QDateTime)); + } + const auto msecs = std::chrono::duration_cast(tp.time_since_epoch()).count(); + return QDateTime::fromMSecsSinceEpoch(msecs, QTimeZone::UTC); +} + +QVariant toSqlTimestamp(const std::optional& tp) { + if (!tp) { + return QVariant(QMetaType(QMetaType::QDateTime)); + } + return toSqlTimestamp(*tp); +} + +std::chrono::system_clock::time_point fromSqlTimestamp(const QVariant& value) { + if (!value.isValid() || value.isNull()) { + return std::chrono::system_clock::time_point{}; + } + QDateTime dt = value.toDateTime(); + if (!dt.isValid()) { + dt = QDateTime::fromString(value.toString(), Qt::ISODateWithMs); + } + if (!dt.isValid()) { + return std::chrono::system_clock::time_point{}; + } + dt = dt.toTimeZone(QTimeZone::UTC); + return std::chrono::system_clock::time_point(std::chrono::milliseconds(dt.toMSecsSinceEpoch())); +} + +std::optional fromSqlTimestampOptional(const QVariant& value) { + if (!value.isValid() || value.isNull()) { + return std::nullopt; + } + return fromSqlTimestamp(value); +} + +std::optional parseIsoTimestamp(const std::optional& iso) { + if (!iso || iso->empty()) { + return std::nullopt; + } + QDateTime dt = QDateTime::fromString(QString::fromStdString(*iso), Qt::ISODateWithMs); + if (!dt.isValid()) { + dt = QDateTime::fromString(QString::fromStdString(*iso), Qt::ISODate); + } + if (!dt.isValid()) { + return std::nullopt; + } + dt = dt.toTimeZone(QTimeZone::UTC); + return std::chrono::system_clock::time_point(std::chrono::milliseconds(dt.toMSecsSinceEpoch())); +} + +bool tagsMatch(const std::vector& rowTags, const std::vector& filterTags) { + for (const auto& tag : filterTags) { + if (std::find(rowTags.begin(), rowTags.end(), tag) == rowTags.end()) { + return false; + } + } + return true; +} + +} // namespace + bool PgDal::connect(const std::string& dsn) { dsn_ = dsn; -#ifdef HAVE_PG - if (!isStubDsn(dsn)) { - try { - connection_ = std::make_unique(dsn); 
- if (!connection_->is_open()) { - throw std::runtime_error("pqxx connection reported closed state"); - } - activeTx_.reset(); - statementsPrepared_ = false; - prepareStatements(); - useInMemory_ = false; - connected_ = true; - resetInMemoryStore(); - return true; - } catch (const std::exception& ex) { - connection_.reset(); - activeTx_.reset(); - statementsPrepared_ = false; - throw std::runtime_error(std::string("PgDal: failed to connect to Postgres: ") + ex.what()); - } - } -#endif - #ifdef HAVE_PG - connection_.reset(); - activeTx_.reset(); - statementsPrepared_ = false; - #endif - useInMemory_ = true; connected_ = true; - resetInMemoryStore(); - return true; + transactionActive_ = false; + + if (isStubDsn(dsn)) { + closeDatabase(); + useInMemory_ = true; + namespacesByName_.clear(); + namespacesById_.clear(); + items_.clear(); + itemsByNamespace_.clear(); + chunks_.clear(); + chunksByItem_.clear(); + embeddings_.clear(); + nextNamespaceId_ = 1; + nextItemId_ = 1; + nextChunkId_ = 1; + nextEmbeddingId_ = 1; + return true; + } + + if (openDatabase(dsn)) { + useInMemory_ = false; + return true; + } + + useInMemory_ = true; + return false; } bool PgDal::begin() { -#ifdef HAVE_PG - if (!connected_ || useInMemory_ || !connection_) { + if (!connected_ || !hasDatabase()) { return false; } - if (!activeTx_) { - activeTx_ = std::make_unique(*connection_); + + if (transactionActive_) { + return true; } + + QSqlDatabase db = database(); + if (!db.transaction()) { + throw std::runtime_error(db.lastError().text().toStdString()); + } + transactionActive_ = true; return true; -#else - return connected_ && !useInMemory_; -#endif } void PgDal::commit() { -#ifdef HAVE_PG - if (activeTx_) { - activeTx_->commit(); - activeTx_.reset(); + if (!transactionActive_) { + return; } -#endif + QSqlDatabase db = database(); + if (!db.commit()) { + throw std::runtime_error(db.lastError().text().toStdString()); + } + transactionActive_ = false; } void PgDal::rollback() { -#ifdef HAVE_PG - if 
(activeTx_) { - activeTx_->abort(); - activeTx_.reset(); + if (!transactionActive_) { + return; } -#endif + QSqlDatabase db = database(); + db.rollback(); + transactionActive_ = false; +} + +bool PgDal::hasDatabase() const { + return !connectionName_.isEmpty() && QSqlDatabase::contains(connectionName_); +} + +QSqlDatabase PgDal::database() const { + if (!hasDatabase()) { + return QSqlDatabase(); + } + return QSqlDatabase::database(connectionName_); +} + +void PgDal::closeDatabase() { + if (connectionName_.isEmpty()) { + return; + } + { + QSqlDatabase db = QSqlDatabase::database(connectionName_, false); + if (db.isValid()) { + if (transactionActive_) { + db.rollback(); + transactionActive_ = false; + } + db.close(); + } + } + QSqlDatabase::removeDatabase(connectionName_); + connectionName_.clear(); +} + +PgDal::ConnectionConfig PgDal::parseDsn(const std::string& dsn) const { + ConnectionConfig cfg; + QUrl url(QString::fromStdString(dsn)); + if (url.scheme().isEmpty()) { + url = QUrl(QStringLiteral("postgresql://") + QString::fromStdString(dsn)); + } + + cfg.dbname = url.path().isEmpty() ? QStringLiteral("kompanion") + : url.path().mid(1); + cfg.host = url.host().isEmpty() ? 
QStringLiteral("localhost") + : url.host(); + cfg.port = url.port(5432); + cfg.user = url.userName(); + cfg.password = url.password(); + + QUrlQuery query(url); + if (query.hasQueryItem(QStringLiteral("host"))) { + const QString hostValue = query.queryItemValue(QStringLiteral("host")); + if (hostValue.startsWith(QStringLiteral("/"))) { + cfg.useSocket = true; + cfg.socketPath = hostValue; + } + } + + if (cfg.useSocket && cfg.socketPath.isEmpty()) { + cfg.socketPath = QStringLiteral("/var/run/postgresql"); + } + + QStringList optionPairs; + const auto queryItems = query.queryItems(); + for (const auto& item : queryItems) { + if (item.first == QStringLiteral("host")) continue; + optionPairs << QStringLiteral("%1=%2").arg(item.first, item.second); + } + cfg.options = optionPairs.join(QLatin1Char(';')); + + return cfg; +} + +bool PgDal::openDatabase(const std::string& dsn) { + if (!QSqlDatabase::isDriverAvailable(QStringLiteral("QPSQL"))) { + return false; + } + + closeDatabase(); + const ConnectionConfig cfg = parseDsn(dsn); + + connectionName_ = QStringLiteral("kom_dal_%1").arg(reinterpret_cast(this), 0, 16); + QSqlDatabase db = QSqlDatabase::addDatabase(QStringLiteral("QPSQL"), connectionName_); + db.setDatabaseName(cfg.dbname); + if (!cfg.user.isEmpty()) { + db.setUserName(cfg.user); + } + if (!cfg.password.isEmpty()) { + db.setPassword(cfg.password); + } + if (cfg.useSocket) { + db.setHostName(cfg.socketPath); + } else { + db.setHostName(cfg.host); + } + if (cfg.port > 0) { + db.setPort(cfg.port); + } + if (!cfg.options.isEmpty()) { + db.setConnectOptions(cfg.options); + } + + if (!db.open()) { + const std::string err = db.lastError().text().toStdString(); + closeDatabase(); + throw std::runtime_error("PgDal: failed to open database: " + err); + } + return true; } std::optional PgDal::ensureNamespace(const std::string& name) { - if (!connected_) return std::nullopt; -#ifdef HAVE_PG - if (!useInMemory_) { - return pgEnsureNamespace(name); + if (!connected_) { + 
return std::nullopt; } -#endif + if (!useInMemory_ && hasDatabase()) { + return sqlEnsureNamespace(name); + } + auto it = namespacesByName_.find(name); if (it != namespacesByName_.end()) { return it->second; @@ -128,18 +293,16 @@ std::optional PgDal::ensureNamespace(const std::string& name) { NamespaceRow row; row.id = allocateId(nextNamespaceId_, "ns_"); row.name = name; - namespacesByName_[name] = row; namespacesById_[row.id] = row; return row; } std::optional PgDal::findNamespace(const std::string& name) const { -#ifdef HAVE_PG - if (connected_ && !useInMemory_) { - return pgFindNamespace(name); + if (!useInMemory_ && hasDatabase()) { + return sqlFindNamespace(name); } -#endif + auto it = namespacesByName_.find(name); if (it == namespacesByName_.end()) { return std::nullopt; @@ -147,16 +310,49 @@ std::optional PgDal::findNamespace(const std::string& name) const return it->second; } +NamespaceRow PgDal::sqlEnsureNamespace(const std::string& name) { + QSqlDatabase db = database(); + QSqlQuery query(db); + query.prepare(QStringLiteral( + "INSERT INTO namespaces (name) VALUES (:name) " + "ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name " + "RETURNING id::text, name;")); + query.bindValue(QStringLiteral(":name"), QString::fromStdString(name)); + if (!query.exec() || !query.next()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + NamespaceRow row; + row.id = query.value(0).toString().toStdString(); + row.name = query.value(1).toString().toStdString(); + return row; +} + +std::optional PgDal::sqlFindNamespace(const std::string& name) const { + QSqlDatabase db = database(); + QSqlQuery query(db); + query.prepare(QStringLiteral( + "SELECT id::text, name FROM namespaces WHERE name = :name")); + query.bindValue(QStringLiteral(":name"), QString::fromStdString(name)); + if (!query.exec()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + if (!query.next()) { + return std::nullopt; + } + NamespaceRow row; + row.id = 
query.value(0).toString().toStdString(); + row.name = query.value(1).toString().toStdString(); + return row; +} + std::string PgDal::upsertItem(const ItemRow& row) { if (!connected_) { throw std::runtime_error("PgDal not connected"); } -#ifdef HAVE_PG - if (!useInMemory_) { - return pgUpsertItem(row).first; + if (!useInMemory_ && hasDatabase()) { + return sqlUpsertItem(row).first; } -#endif - const auto now = std::chrono::system_clock::now(); + ItemRow stored = row; if (stored.id.empty()) { stored.id = allocateId(nextItemId_, "item_"); @@ -165,21 +361,6 @@ std::string PgDal::upsertItem(const ItemRow& row) { auto existing = items_.find(stored.id); if (existing != items_.end()) { stored.revision = existing->second.revision + 1; - if (stored.created_at == std::chrono::system_clock::time_point{}) { - stored.created_at = existing->second.created_at; - } - if (!stored.expires_at && existing->second.expires_at) { - stored.expires_at = existing->second.expires_at; - } - if (stored.metadata_json.empty()) { - stored.metadata_json = existing->second.metadata_json; - } - } - if (stored.created_at == std::chrono::system_clock::time_point{}) { - stored.created_at = now; - } - if (!stored.metadata_json.empty()) { - // keep as provided } items_[stored.id] = stored; @@ -192,15 +373,57 @@ std::string PgDal::upsertItem(const ItemRow& row) { return stored.id; } +std::pair PgDal::sqlUpsertItem(const ItemRow& row) { + QSqlDatabase db = database(); + QSqlQuery query(db); + query.prepare(QStringLiteral( + "INSERT INTO memory_items (id, namespace_id, key, content, metadata, tags, text, created_at, expires_at) " + "VALUES (COALESCE(NULLIF(:id, '')::uuid, gen_random_uuid()), " + " :namespace_id::uuid, :key, :content, :metadata::jsonb, :tags::text[], :text, " + " COALESCE(:created_at, now()), :expires_at) " + "ON CONFLICT (id) DO UPDATE SET " + " key = EXCLUDED.key, content = EXCLUDED.content, metadata = EXCLUDED.metadata, " + " tags = EXCLUDED.tags, text = EXCLUDED.text, updated_at = now(), 
" + " expires_at = EXCLUDED.expires_at, " + " revision = memory_items.revision + 1 " + "RETURNING id::text, revision;")); + + query.bindValue(QStringLiteral(":id"), QString::fromStdString(row.id)); + query.bindValue(QStringLiteral(":namespace_id"), QString::fromStdString(row.namespace_id)); + if (row.key) { + query.bindValue(QStringLiteral(":key"), QString::fromStdString(*row.key)); + } else { + query.bindValue(QStringLiteral(":key"), QVariant(QMetaType(QMetaType::QString))); + } + query.bindValue(QStringLiteral(":content"), QString::fromStdString(row.content_json)); + query.bindValue(QStringLiteral(":metadata"), QString::fromStdString(row.metadata_json)); + query.bindValue(QStringLiteral(":tags"), QString::fromStdString(toPgArrayLiteral(row.tags))); + if (row.text) { + query.bindValue(QStringLiteral(":text"), QString::fromStdString(*row.text)); + } else { + query.bindValue(QStringLiteral(":text"), QVariant(QMetaType(QMetaType::QString))); + } + query.bindValue(QStringLiteral(":created_at"), toSqlTimestamp(row.created_at)); + query.bindValue(QStringLiteral(":expires_at"), toSqlTimestamp(row.expires_at)); + + if (!query.exec() || !query.next()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + + std::pair result; + result.first = query.value(0).toString().toStdString(); + result.second = query.value(1).toInt(); + return result; +} + std::vector PgDal::upsertChunks(const std::vector& chunks) { if (!connected_) { throw std::runtime_error("PgDal not connected"); } -#ifdef HAVE_PG - if (!useInMemory_) { - return pgUpsertChunks(chunks); + if (!useInMemory_ && hasDatabase()) { + return sqlUpsertChunks(chunks); } -#endif + std::vector ids; ids.reserve(chunks.size()); @@ -224,16 +447,43 @@ std::vector PgDal::upsertChunks(const std::vector& chunks return ids; } +std::vector PgDal::sqlUpsertChunks(const std::vector& chunks) { + std::vector ids; + ids.reserve(chunks.size()); + + QSqlDatabase db = database(); + QSqlQuery query(db); + 
query.prepare(QStringLiteral( + "INSERT INTO memory_chunks (id, item_id, seq, content) " + "VALUES (COALESCE(NULLIF(:id, '')::uuid, gen_random_uuid()), " + " :item_id::uuid, :seq, :content) " + "ON CONFLICT (id) DO UPDATE SET seq = EXCLUDED.seq, content = EXCLUDED.content " + "RETURNING id::text;")); + + for (const auto& chunk : chunks) { + query.bindValue(QStringLiteral(":id"), QString::fromStdString(chunk.id)); + query.bindValue(QStringLiteral(":item_id"), QString::fromStdString(chunk.item_id)); + query.bindValue(QStringLiteral(":seq"), chunk.ord); + query.bindValue(QStringLiteral(":content"), QString::fromStdString(chunk.text)); + + if (!query.exec() || !query.next()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + ids.push_back(query.value(0).toString().toStdString()); + query.finish(); + } + return ids; +} + void PgDal::upsertEmbeddings(const std::vector& embeddings) { if (!connected_) { throw std::runtime_error("PgDal not connected"); } -#ifdef HAVE_PG - if (!useInMemory_) { - pgUpsertEmbeddings(embeddings); + if (!useInMemory_ && hasDatabase()) { + sqlUpsertEmbeddings(embeddings); return; } -#endif + for (const auto& input : embeddings) { if (input.chunk_id.empty()) { continue; @@ -246,26 +496,48 @@ void PgDal::upsertEmbeddings(const std::vector& embeddings) { } } -std::vector PgDal::searchText(const std::string& namespaceId, - const std::string& query, - int limit) { -#ifdef HAVE_PG - if (connected_ && !useInMemory_) { - return pgSearchText(namespaceId, query, limit); +void PgDal::sqlUpsertEmbeddings(const std::vector& embeddings) { + QSqlDatabase db = database(); + QSqlQuery query(db); + query.prepare(QStringLiteral( + "INSERT INTO embeddings (id, chunk_id, model, dim, vector, normalized) " + "VALUES (COALESCE(NULLIF(:id, '')::uuid, gen_random_uuid()), " + " :chunk_id::uuid, :model, :dim, :vector::vector, FALSE) " + "ON CONFLICT (chunk_id, model) DO UPDATE SET " + " dim = EXCLUDED.dim, vector = EXCLUDED.vector, normalized = 
EXCLUDED.normalized " + "RETURNING id::text;")); + + for (const auto& emb : embeddings) { + query.bindValue(QStringLiteral(":id"), QString::fromStdString(emb.id)); + query.bindValue(QStringLiteral(":chunk_id"), QString::fromStdString(emb.chunk_id)); + query.bindValue(QStringLiteral(":model"), QString::fromStdString(emb.model)); + query.bindValue(QStringLiteral(":dim"), emb.dim); + query.bindValue(QStringLiteral(":vector"), QString::fromStdString(toPgVectorLiteral(emb.vector))); + + if (!query.exec()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + query.finish(); } -#endif +} + +std::vector PgDal::searchText(const std::string& namespaceId, + const std::string& queryText, + int limit) { + if (!useInMemory_ && hasDatabase()) { + return sqlSearchText(namespaceId, queryText, limit); + } + std::vector results; if (!connected_) return results; auto bucketIt = itemsByNamespace_.find(namespaceId); if (bucketIt == itemsByNamespace_.end()) return results; - const std::string loweredQuery = toLower(query); - const auto now = std::chrono::system_clock::now(); + const std::string loweredQuery = toLower(queryText); for (const auto& itemId : bucketIt->second) { auto itemIt = items_.find(itemId); if (itemIt == items_.end()) continue; - if (isExpired(itemIt->second, now)) continue; if (!loweredQuery.empty()) { const std::string loweredText = toLower(itemIt->second.text.value_or(std::string())); @@ -281,28 +553,66 @@ std::vector PgDal::searchText(const std::string& namespaceId, return results; } +std::vector PgDal::sqlSearchText(const std::string& namespaceId, + const std::string& queryText, + int limit) const { + QSqlDatabase db = database(); + QSqlQuery query(db); + query.prepare(QStringLiteral( + "SELECT id::text, namespace_id::text, key, content, metadata::text, text, tags::text[], revision, created_at, expires_at " + "FROM memory_items " + "WHERE namespace_id = :ns::uuid " + " AND deleted_at IS NULL " + " AND (:query = '' OR text ILIKE '%' || :query 
|| '%') " + "ORDER BY updated_at DESC " + "LIMIT :limit;")); + query.bindValue(QStringLiteral(":ns"), QString::fromStdString(namespaceId)); + query.bindValue(QStringLiteral(":query"), QString::fromStdString(queryText)); + query.bindValue(QStringLiteral(":limit"), limit); + + if (!query.exec()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + + std::vector results; + while (query.next()) { + ItemRow row; + row.id = query.value(0).toString().toStdString(); + row.namespace_id = query.value(1).toString().toStdString(); + if (!query.value(2).isNull()) { + row.key = query.value(2).toString().toStdString(); + } + row.content_json = query.value(3).toString().toStdString(); + row.metadata_json = query.value(4).toString().toStdString(); + if (!query.value(5).isNull()) { + row.text = query.value(5).toString().toStdString(); + } + row.tags = parsePgTextArray(query.value(6).toString()); + row.revision = query.value(7).toInt(); + row.created_at = fromSqlTimestamp(query.value(8)); + row.expires_at = fromSqlTimestampOptional(query.value(9)); + results.push_back(std::move(row)); + } + return results; +} + std::vector> PgDal::searchVector( const std::string& namespaceId, const std::vector& embedding, int limit) { -#ifdef HAVE_PG - if (connected_ && !useInMemory_) { - return pgSearchVector(namespaceId, embedding, limit); + if (!useInMemory_ && hasDatabase()) { + return sqlSearchVector(namespaceId, embedding, limit); } -#endif + std::vector> scores; if (!connected_ || embedding.empty()) return scores; auto bucketIt = itemsByNamespace_.find(namespaceId); if (bucketIt == itemsByNamespace_.end()) return scores; - const auto now = std::chrono::system_clock::now(); for (const auto& itemId : bucketIt->second) { auto chunkBucketIt = chunksByItem_.find(itemId); if (chunkBucketIt == chunksByItem_.end()) continue; - auto itemIt = items_.find(itemId); - if (itemIt == items_.end()) continue; - if (isExpired(itemIt->second, now)) continue; float bestScore = -1.0f; for 
(const auto& chunkId : chunkBucketIt->second) { @@ -336,12 +646,48 @@ std::vector> PgDal::searchVector( return scores; } -std::optional PgDal::getItemById(const std::string& id) const { -#ifdef HAVE_PG - if (connected_ && !useInMemory_) { - return pgGetItemById(id); +std::vector> PgDal::sqlSearchVector( + const std::string& namespaceId, + const std::vector& embedding, + int limit) const { + std::vector> results; + if (embedding.empty()) { + return results; } -#endif + + QSqlDatabase db = database(); + QSqlQuery query(db); + query.prepare(QStringLiteral( + "SELECT i.id::text, 1 - (e.vector <=> :vector::vector) AS score " + "FROM embeddings e " + "JOIN memory_chunks c ON c.id = e.chunk_id " + "JOIN memory_items i ON i.id = c.item_id " + "WHERE i.namespace_id = :ns::uuid " + " AND i.deleted_at IS NULL " + "ORDER BY e.vector <-> :vector " + "LIMIT :limit;")); + query.bindValue(QStringLiteral(":vector"), QString::fromStdString(toPgVectorLiteral(embedding))); + query.bindValue(QStringLiteral(":ns"), QString::fromStdString(namespaceId)); + query.bindValue(QStringLiteral(":limit"), limit); + + if (!query.exec()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + + while (query.next()) { + std::pair entry; + entry.first = query.value(0).toString().toStdString(); + entry.second = static_cast(query.value(1).toDouble()); + results.push_back(std::move(entry)); + } + return results; +} + +std::optional PgDal::getItemById(const std::string& id) const { + if (!useInMemory_ && hasDatabase()) { + return sqlGetItemById(id); + } + auto it = items_.find(id); if (it == items_.end()) { return std::nullopt; @@ -349,61 +695,145 @@ std::optional PgDal::getItemById(const std::string& id) const { return it->second; } +std::optional PgDal::sqlGetItemById(const std::string& id) const { + QSqlDatabase db = database(); + QSqlQuery query(db); + query.prepare(QStringLiteral( + "SELECT id::text, namespace_id::text, key, content, metadata::text, text, tags::text[], revision, 
created_at, expires_at " + "FROM memory_items WHERE id = :id::uuid")); + query.bindValue(QStringLiteral(":id"), QString::fromStdString(id)); + + if (!query.exec()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + if (!query.next()) { + return std::nullopt; + } + ItemRow row; + row.id = query.value(0).toString().toStdString(); + row.namespace_id = query.value(1).toString().toStdString(); + if (!query.value(2).isNull()) { + row.key = query.value(2).toString().toStdString(); + } + row.content_json = query.value(3).toString().toStdString(); + row.metadata_json = query.value(4).toString().toStdString(); + if (!query.value(5).isNull()) { + row.text = query.value(5).toString().toStdString(); + } + row.tags = parsePgTextArray(query.value(6).toString()); + row.revision = query.value(7).toInt(); + row.created_at = fromSqlTimestamp(query.value(8)); + row.expires_at = fromSqlTimestampOptional(query.value(9)); + return row; +} + std::vector PgDal::fetchContext(const std::string& namespaceId, const std::optional& key, const std::vector& tags, const std::optional& sinceIso, - int limit) const { -#ifdef HAVE_PG - if (connected_ && !useInMemory_) { - // TODO: implement direct Postgres query once DAL wiring is complete. 
- return {}; - } -#endif - std::vector results; - if (!connected_ || limit == 0) { - return results; + int limit) { + if (!useInMemory_ && hasDatabase()) { + return sqlFetchContext(namespaceId, key, tags, sinceIso, limit); } + std::vector results; auto bucketIt = itemsByNamespace_.find(namespaceId); if (bucketIt == itemsByNamespace_.end()) { return results; } - const auto now = std::chrono::system_clock::now(); - const bool filterSince = sinceIso.has_value() && !sinceIso->empty(); + const auto sinceTp = parseIsoTimestamp(sinceIso); - for (auto it = bucketIt->second.rbegin(); it != bucketIt->second.rend(); ++it) { - auto itemIt = items_.find(*it); - if (itemIt == items_.end()) { - continue; - } - const ItemRow& row = itemIt->second; - if (isExpired(row, now)) { - continue; - } - if (key && row.key != key) { - continue; - } - if (!tags.empty() && !includesAllTags(row.tags, tags)) { - continue; - } - if (filterSince) { - const std::string createdIso = toIso8601(row.created_at); - if (!createdIso.empty() && createdIso < sinceIso.value()) { - continue; - } - } + for (const auto& itemId : bucketIt->second) { + auto it = items_.find(itemId); + if (it == items_.end()) continue; + const ItemRow& row = it->second; + if (key && (!row.key || *row.key != *key)) { + continue; + } + if (!tags.empty() && !tagsMatch(row.tags, tags)) { + continue; + } + if (sinceTp && row.created_at < *sinceTp) { + continue; + } results.push_back(row); - if (limit > 0 && static_cast(results.size()) >= limit) { - break; - } } + std::sort(results.begin(), results.end(), [](const ItemRow& a, const ItemRow& b) { + return a.created_at > b.created_at; + }); + + if (static_cast(results.size()) > limit) { + results.resize(static_cast(limit)); + } return results; } +std::vector PgDal::sqlFetchContext(const std::string& namespaceId, + const std::optional& key, + const std::vector& tags, + const std::optional& sinceIso, + int limit) const { + QSqlDatabase db = database(); + QString queryStr = QStringLiteral( + 
"SELECT id::text, namespace_id::text, key, content, metadata::text, text, tags::text[], revision, created_at, expires_at " + "FROM memory_items WHERE namespace_id = :ns::uuid AND deleted_at IS NULL"); + + if (key && !key->empty()) { + queryStr += QStringLiteral(" AND key = :key"); + } + if (!tags.empty()) { + queryStr += QStringLiteral(" AND tags @> :tags::text[]"); + } + if (sinceIso && !sinceIso->empty()) { + queryStr += QStringLiteral(" AND created_at >= :since"); + } + queryStr += QStringLiteral(" ORDER BY created_at DESC LIMIT :limit"); + + QSqlQuery query(db); + query.prepare(queryStr); + query.bindValue(QStringLiteral(":ns"), QString::fromStdString(namespaceId)); + query.bindValue(QStringLiteral(":limit"), limit); + + if (key && !key->empty()) { + query.bindValue(QStringLiteral(":key"), QString::fromStdString(*key)); + } + if (!tags.empty()) { + query.bindValue(QStringLiteral(":tags"), QString::fromStdString(toPgArrayLiteral(tags))); + } + if (sinceIso && !sinceIso->empty()) { + const auto sinceTp = parseIsoTimestamp(sinceIso); + query.bindValue(QStringLiteral(":since"), toSqlTimestamp(sinceTp)); + } + + if (!query.exec()) { + throw std::runtime_error(query.lastError().text().toStdString()); + } + + std::vector rows; + while (query.next()) { + ItemRow row; + row.id = query.value(0).toString().toStdString(); + row.namespace_id = query.value(1).toString().toStdString(); + if (!query.value(2).isNull()) { + row.key = query.value(2).toString().toStdString(); + } + row.content_json = query.value(3).toString().toStdString(); + row.metadata_json = query.value(4).toString().toStdString(); + if (!query.value(5).isNull()) { + row.text = query.value(5).toString().toStdString(); + } + row.tags = parsePgTextArray(query.value(6).toString()); + row.revision = query.value(7).toInt(); + row.created_at = fromSqlTimestamp(query.value(8)); + row.expires_at = fromSqlTimestampOptional(query.value(9)); + rows.push_back(std::move(row)); + } + return rows; +} + std::pair 
PgDal::upsertItem( const std::string& namespace_id, const std::optional& key, @@ -414,16 +844,17 @@ std::pair PgDal::upsertItem( row.namespace_id = namespace_id; row.key = key; row.content_json = content; - row.metadata_json = metadata_json; + row.metadata_json = metadata_json.empty() ? "{}" : metadata_json; if (!content.empty()) { row.text = content; } row.tags = tags; -#ifdef HAVE_PG - if (connected_ && !useInMemory_) { - return pgUpsertItem(row); + row.created_at = std::chrono::system_clock::now(); + + if (!useInMemory_ && hasDatabase()) { + return sqlUpsertItem(row); } -#endif + const std::string id = upsertItem(row); const auto stored = items_.find(id); const int revision = stored != items_.end() ? stored->second.revision : 1; @@ -456,32 +887,10 @@ std::vector PgDal::hybridSearch(const std::vector& query_vec const std::string& query_text, int k) { (void)model; -#ifdef HAVE_PG - if (connected_ && !useInMemory_) { - std::vector results; - std::unordered_set seen; - auto textMatches = pgSearchText(namespace_id, query_text, k); - for (std::size_t idx = 0; idx < textMatches.size(); ++idx) { - const auto& item = textMatches[idx]; - results.push_back(item.id); - seen.insert(item.id); - if (static_cast(results.size()) >= k) { - return results; - } - } - - if (!query_vec.empty()) { - auto vectorMatches = pgSearchVector(namespace_id, query_vec, k); - for (const auto& pair : vectorMatches) { - if (seen.count(pair.first)) continue; - results.push_back(pair.first); - if (static_cast(results.size()) >= k) break; - } - } - return results; + if (!useInMemory_ && hasDatabase()) { + return sqlHybridSearch(query_vec, model, namespace_id, query_text, k); } -#endif std::vector results; auto textMatches = searchText(namespace_id, query_text, k); @@ -503,6 +912,37 @@ std::vector PgDal::hybridSearch(const std::vector& query_vec return results; } +std::vector PgDal::sqlHybridSearch(const std::vector& query_vec, + const std::string& model, + const std::string& namespace_id, + const 
std::string& query_text, + int k) { + (void)model; + std::unordered_set seen; + std::vector results; + + auto textMatches = sqlSearchText(namespace_id, query_text, k); + for (const auto& item : textMatches) { + results.push_back(item.id); + seen.insert(item.id); + if (static_cast(results.size()) >= k) { + return results; + } + } + + if (!query_vec.empty()) { + auto vectorMatches = sqlSearchVector(namespace_id, query_vec, k); + for (const auto& pair : vectorMatches) { + if (seen.count(pair.first)) continue; + results.push_back(pair.first); + seen.insert(pair.first); + if (static_cast(results.size()) >= k) break; + } + } + + return results; +} + std::string PgDal::allocateId(std::size_t& counter, const std::string& prefix) { return prefix + std::to_string(counter++); } @@ -514,44 +954,6 @@ std::string PgDal::toLower(const std::string& value) { return lowered; } -std::string PgDal::toIso8601(std::chrono::system_clock::time_point tp) { - if (tp == std::chrono::system_clock::time_point{}) { - return std::string(); - } - auto tt = std::chrono::system_clock::to_time_t(tp); - std::tm tm{}; -#if defined(_WIN32) - gmtime_s(&tm, &tt); -#else - gmtime_r(&tt, &tm); -#endif - std::ostringstream os; - os << std::put_time(&tm, "%Y-%m-%dT%H:%M:%SZ"); - return os.str(); -} - -bool PgDal::includesAllTags(const std::vector& haystack, - const std::vector& needles) { - if (needles.empty()) { - return true; - } - std::unordered_set lookup(haystack.begin(), haystack.end()); - for (const auto& tag : needles) { - if (!lookup.count(tag)) { - return false; - } - } - return true; -} - -bool PgDal::isExpired(const ItemRow& row, - std::chrono::system_clock::time_point now) const { - if (!row.expires_at.has_value()) { - return false; - } - return now >= row.expires_at.value(); -} - std::string PgDal::escapePgArrayElement(const std::string& value) { std::string escaped; escaped.reserve(value.size()); @@ -568,380 +970,65 @@ std::string PgDal::toPgArrayLiteral(const std::vector& values) { if 
(values.empty()) { return "{}"; } - std::ostringstream oss; - oss << "{"; + std::string out = "{"; for (std::size_t i = 0; i < values.size(); ++i) { - if (i) oss << ","; - oss << "\"" << escapePgArrayElement(values[i]) << "\""; + if (i) out += ','; + out += '"'; + out += escapePgArrayElement(values[i]); + out += '"'; } - oss << "}"; - return oss.str(); + out += "}"; + return out; } std::string PgDal::toPgVectorLiteral(const std::vector& values) { if (values.empty()) { return "[]"; } - std::ostringstream oss; - oss << "["; + std::string out = "["; for (std::size_t i = 0; i < values.size(); ++i) { - if (i) oss << ","; - oss << values[i]; + if (i) out += ','; + out += std::to_string(values[i]); } - oss << "]"; - return oss.str(); -} - -#ifdef HAVE_PG - -void PgDal::prepareStatements() { - if (statementsPrepared_ || !connection_) { - return; - } - - connection_->prepare("ensure_namespace", - "INSERT INTO namespaces (name) VALUES ($1)" - " ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name" - " RETURNING id::text, name"); - - connection_->prepare("find_namespace", - "SELECT id::text, name FROM namespaces WHERE name = $1"); - - connection_->prepare("upsert_item", - "INSERT INTO memory_items (id, namespace_id, key, content, text, tags, metadata)" - " VALUES (COALESCE(NULLIF($1, '')::uuid, gen_random_uuid())," - " $2::uuid, $3, $4::jsonb, $5, $6::text[], $7::jsonb)" - " ON CONFLICT (id) DO UPDATE SET" - " key = EXCLUDED.key," - " content = EXCLUDED.content," - " text = EXCLUDED.text," - " tags = EXCLUDED.tags," - " metadata = EXCLUDED.metadata," - " updated_at = now()" - " RETURNING id::text, revision"); - - connection_->prepare("insert_chunk", - "INSERT INTO memory_chunks (id, item_id, ord, text, metadata)" - " VALUES (COALESCE(NULLIF($1, '')::uuid, gen_random_uuid())," - " $2::uuid, $3, $4, $5::jsonb)" - " ON CONFLICT (id) DO UPDATE SET" - " ord = EXCLUDED.ord," - " text = EXCLUDED.text," - " metadata = EXCLUDED.metadata" - " RETURNING id::text"); - - 
connection_->prepare("insert_embedding", - "INSERT INTO embeddings (id, chunk_id, model, dim, vector, normalized)" - " VALUES (COALESCE(NULLIF($1, '')::uuid, gen_random_uuid())," - " $2::uuid, $3, $4, $5::vector, FALSE)" - " ON CONFLICT (chunk_id, model) DO UPDATE SET" - " dim = EXCLUDED.dim," - " vector = EXCLUDED.vector," - " normalized = EXCLUDED.normalized" - " RETURNING id::text"); - - connection_->prepare("search_text", - "SELECT id::text, namespace_id::text, key, content::text, text, tags::text[], revision" - " FROM memory_items" - " WHERE namespace_id = $1::uuid" - " AND deleted_at IS NULL" - " AND ($2 = '' OR text ILIKE '%' || $2 || '%')" - " ORDER BY updated_at DESC" - " LIMIT $3"); - - connection_->prepare("search_vector", - "SELECT i.id::text," - " 1 - (e.vector <=> $2::vector) AS score" - " FROM embeddings e" - " JOIN memory_chunks c ON c.id = e.chunk_id" - " JOIN memory_items i ON i.id = c.item_id" - " WHERE i.namespace_id = $1::uuid" - " AND i.deleted_at IS NULL" - " ORDER BY e.vector <-> $2" - " LIMIT $3"); - - connection_->prepare("get_item_by_id", - "SELECT id::text, namespace_id::text, key, content::text, text, tags::text[], revision" - " FROM memory_items" - " WHERE id = $1::uuid"); - - statementsPrepared_ = true; -} - -NamespaceRow PgDal::mapNamespaceRow(const pqxx::row& row) const { - NamespaceRow out; - out.id = row[0].c_str(); - out.name = row[1].c_str(); + out += "]"; return out; } -std::vector PgDal::parseTextArrayField(const pqxx::field& field) const { +std::vector PgDal::parsePgTextArray(const QString& value) { std::vector tags; - if (field.is_null()) { + QString trimmed = value.trimmed(); + if (!trimmed.startsWith(QLatin1Char('{')) || !trimmed.endsWith(QLatin1Char('}'))) { return tags; } - pqxx::array_parser parser(field); - for (;;) { - auto [kind, token] = parser.get_next(); - switch (kind) { - case pqxx::array_parser::string_value: - tags.emplace_back(token); - break; - case pqxx::array_parser::value: - tags.emplace_back(token); - 
break; - case pqxx::array_parser::null_value: - tags.emplace_back(std::string()); - break; - case pqxx::array_parser::end_array: - case pqxx::array_parser::done: - return tags; - default: - break; + trimmed = trimmed.mid(1, trimmed.size() - 2); + QString current; + bool inQuotes = false; + bool escape = false; + for (QChar ch : trimmed) { + if (escape) { + current.append(ch); + escape = false; + continue; } + if (ch == QLatin1Char('\\')) { + escape = true; + continue; + } + if (ch == QLatin1Char('"')) { + inQuotes = !inQuotes; + continue; + } + if (!inQuotes && ch == QLatin1Char(',')) { + tags.push_back(current.toStdString()); + current.clear(); + continue; + } + current.append(ch); } + if (!current.isEmpty()) { + tags.push_back(current.toStdString()); + } + return tags; } -ItemRow PgDal::mapItemRow(const pqxx::row& row) const { - ItemRow item; - item.id = row[0].c_str(); - item.namespace_id = row[1].c_str(); - if (!row[2].is_null()) { - item.key = row[2].c_str(); - } - if (!row[3].is_null()) { - item.content_json = row[3].c_str(); - } - if (!row[4].is_null()) { - item.text = row[4].c_str(); - } - item.tags = parseTextArrayField(row[5]); - item.revision = row[6].as(1); - item.metadata_json = "{}"; - item.created_at = std::chrono::system_clock::now(); - return item; -} - -std::optional PgDal::pgEnsureNamespace(const std::string& name) { - if (!connection_) return std::nullopt; - prepareStatements(); - try { - if (activeTx_) { - auto row = activeTx_->prepared("ensure_namespace")(name).exec1(); - return mapNamespaceRow(row); - } - pqxx::work tx(*connection_); - auto row = tx.prepared("ensure_namespace")(name).exec1(); - tx.commit(); - return mapNamespaceRow(row); - } catch (const std::exception& ex) { - throw std::runtime_error(std::string("PgDal ensureNamespace failed: ") + ex.what()); - } -} - -std::optional PgDal::pgFindNamespace(const std::string& name) const { - if (!connection_) return std::nullopt; - const_cast(this)->prepareStatements(); - try { - if 
(activeTx_) { - auto result = activeTx_->prepared("find_namespace")(name).exec(); - if (result.empty()) return std::nullopt; - return mapNamespaceRow(result.front()); - } - pqxx::read_transaction tx(*connection_); - auto result = tx.prepared("find_namespace")(name).exec(); - if (result.empty()) return std::nullopt; - return mapNamespaceRow(result.front()); - } catch (const std::exception& ex) { - throw std::runtime_error(std::string("PgDal findNamespace failed: ") + ex.what()); - } -} - -std::pair PgDal::pgUpsertItem(const ItemRow& row) { - if (!connection_) { - throw std::runtime_error("PgDal Postgres connection not available"); - } - prepareStatements(); - - const std::string tagsLiteral = toPgArrayLiteral(row.tags); - const std::string metadata = "{}"; - const std::string content = row.content_json.empty() ? "{}" : row.content_json; - const std::optional text = row.text; - const std::optional key = row.key; - const std::string idParam = row.id; - - auto exec = [&](pqxx::work& tx) -> std::pair { - auto result = tx.prepared("upsert_item")(idParam)(row.namespace_id)(key)(content)(text)(tagsLiteral)(metadata).exec1(); - std::pair out; - out.first = result[0].c_str(); - out.second = result[1].as(1); - return out; - }; - - try { - if (activeTx_) { - return exec(*activeTx_); - } - pqxx::work tx(*connection_); - auto pair = exec(tx); - tx.commit(); - return pair; - } catch (const std::exception& ex) { - throw std::runtime_error(std::string("PgDal upsertItem failed: ") + ex.what()); - } -} - -std::vector PgDal::pgUpsertChunks(const std::vector& chunks) { - if (!connection_) { - throw std::runtime_error("PgDal Postgres connection not available"); - } - if (chunks.empty()) return {}; - prepareStatements(); - - auto execOne = [&](pqxx::work& tx, const ChunkRow& chunk) -> std::string { - const std::string metadata = "{}"; - auto result = tx.prepared("insert_chunk")(chunk.id)(chunk.item_id)(chunk.ord)(chunk.text)(metadata).exec1(); - return result[0].c_str(); - }; - - 
std::vector ids; - ids.reserve(chunks.size()); - - try { - if (activeTx_) { - for (const auto& chunk : chunks) { - ids.push_back(execOne(*activeTx_, chunk)); - } - return ids; - } - pqxx::work tx(*connection_); - for (const auto& chunk : chunks) { - ids.push_back(execOne(tx, chunk)); - } - tx.commit(); - return ids; - } catch (const std::exception& ex) { - throw std::runtime_error(std::string("PgDal upsertChunks failed: ") + ex.what()); - } -} - -void PgDal::pgUpsertEmbeddings(const std::vector& embeddings) { - if (!connection_) { - throw std::runtime_error("PgDal Postgres connection not available"); - } - if (embeddings.empty()) return; - prepareStatements(); - - auto execOne = [&](pqxx::work& tx, const EmbeddingRow& embedding) { - const std::string vectorLiteral = toPgVectorLiteral(embedding.vector); - tx.prepared("insert_embedding")(embedding.id)(embedding.chunk_id)(embedding.model)(embedding.dim)(vectorLiteral).exec(); - }; - - try { - if (activeTx_) { - for (const auto& emb : embeddings) { - execOne(*activeTx_, emb); - } - return; - } - pqxx::work tx(*connection_); - for (const auto& emb : embeddings) { - execOne(tx, emb); - } - tx.commit(); - } catch (const std::exception& ex) { - throw std::runtime_error(std::string("PgDal upsertEmbeddings failed: ") + ex.what()); - } -} - -std::vector PgDal::pgSearchText(const std::string& namespaceId, - const std::string& query, - int limit) { - if (!connection_) return {}; - prepareStatements(); - - auto exec = [&](auto& tx) { - std::vector rows; - auto result = tx.prepared("search_text")(namespaceId)(query)(limit).exec(); - rows.reserve(result.size()); - for (const auto& row : result) { - rows.push_back(mapItemRow(row)); - } - return rows; - }; - - try { - if (activeTx_) { - return exec(*activeTx_); - } - pqxx::read_transaction tx(*connection_); - auto rows = exec(tx); - tx.commit(); - return rows; - } catch (const std::exception& ex) { - throw std::runtime_error(std::string("PgDal searchText failed: ") + ex.what()); - } 
-} - -std::vector> PgDal::pgSearchVector( - const std::string& namespaceId, - const std::vector& embedding, - int limit) { - if (!connection_ || embedding.empty()) return {}; - prepareStatements(); - const std::string vectorLiteral = toPgVectorLiteral(embedding); - - auto exec = [&](auto& tx) { - std::vector> matches; - auto result = tx.prepared("search_vector")(namespaceId)(vectorLiteral)(limit).exec(); - matches.reserve(result.size()); - for (const auto& row : result) { - std::pair entry; - entry.first = row[0].c_str(); - entry.second = static_cast(row[1].as(0.0)); - matches.push_back(entry); - } - return matches; - }; - - try { - if (activeTx_) { - return exec(*activeTx_); - } - pqxx::read_transaction tx(*connection_); - auto matches = exec(tx); - tx.commit(); - return matches; - } catch (const std::exception& ex) { - throw std::runtime_error(std::string("PgDal searchVector failed: ") + ex.what()); - } -} - -std::optional PgDal::pgGetItemById(const std::string& id) const { - if (!connection_) return std::nullopt; - const_cast(this)->prepareStatements(); - - auto exec = [&](auto& tx) -> std::optional { - auto result = tx.prepared("get_item_by_id")(id).exec(); - if (result.empty()) { - return std::nullopt; - } - return mapItemRow(result.front()); - }; - - try { - if (activeTx_) { - return exec(*activeTx_); - } - pqxx::read_transaction tx(*connection_); - auto item = exec(tx); - tx.commit(); - return item; - } catch (const std::exception& ex) { - throw std::runtime_error(std::string("PgDal getItemById failed: ") + ex.what()); - } -} - -#endif // HAVE_PG - } // namespace kom diff --git a/src/dal/PgDal.hpp b/src/dal/PgDal.hpp index 28f731c..190313d 100644 --- a/src/dal/PgDal.hpp +++ b/src/dal/PgDal.hpp @@ -2,19 +2,16 @@ #include "IDatabase.hpp" +#include +#include + #include -#include #include #include -#include #include #include #include -#ifdef HAVE_PG -#include -#endif - namespace kom { struct NamespaceRow { @@ -27,11 +24,11 @@ struct ItemRow { std::string 
namespace_id; std::optional key; std::string content_json; + std::string metadata_json = "{}"; std::optional text; - std::string metadata_json; std::vector tags; int revision = 1; - std::chrono::system_clock::time_point created_at{}; + std::chrono::system_clock::time_point created_at; std::optional expires_at; }; @@ -79,9 +76,9 @@ public: const std::optional& key, const std::vector& tags, const std::optional& sinceIso, - int limit) const; + int limit); - // IDatabase overrides (stubbed for now to operate on the in-memory backing store). + // IDatabase overrides std::pair upsertItem( const std::string& namespace_id, const std::optional& key, @@ -99,40 +96,61 @@ public: int k) override; private: - std::string allocateId(std::size_t& counter, const std::string& prefix); - static std::string toLower(const std::string& value); - static std::string toIso8601(std::chrono::system_clock::time_point tp); - static bool includesAllTags(const std::vector& haystack, - const std::vector& needles); - bool isExpired(const ItemRow& row, - std::chrono::system_clock::time_point now) const; - void resetInMemoryStore(); - static std::string toPgArrayLiteral(const std::vector& values); - static std::string escapePgArrayElement(const std::string& value); - static std::string toPgVectorLiteral(const std::vector& values); -#ifdef HAVE_PG - void prepareStatements(); - NamespaceRow mapNamespaceRow(const pqxx::row& row) const; - ItemRow mapItemRow(const pqxx::row& row) const; - std::vector parseTextArrayField(const pqxx::field& field) const; - std::optional pgEnsureNamespace(const std::string& name); - std::optional pgFindNamespace(const std::string& name) const; - std::pair pgUpsertItem(const ItemRow& row); - std::vector pgUpsertChunks(const std::vector& chunks); - void pgUpsertEmbeddings(const std::vector& embeddings); - std::vector pgSearchText(const std::string& namespaceId, - const std::string& query, - int limit); - std::vector> pgSearchVector( + struct ConnectionConfig { + QString host; 
+ int port = 5432; + QString dbname; + QString user; + QString password; + bool useSocket = false; + QString socketPath; + QString options; + }; + + bool hasDatabase() const; + bool openDatabase(const std::string& dsn); + void closeDatabase(); + QSqlDatabase database() const; + ConnectionConfig parseDsn(const std::string& dsn) const; + + NamespaceRow sqlEnsureNamespace(const std::string& name); + std::optional sqlFindNamespace(const std::string& name) const; + std::pair sqlUpsertItem(const ItemRow& row); + std::vector sqlUpsertChunks(const std::vector& chunks); + void sqlUpsertEmbeddings(const std::vector& embeddings); + std::vector sqlSearchText(const std::string& namespaceId, + const std::string& query, + int limit) const; + std::vector> sqlSearchVector( const std::string& namespaceId, const std::vector& embedding, - int limit); - std::optional pgGetItemById(const std::string& id) const; -#endif + int limit) const; + std::optional sqlGetItemById(const std::string& id) const; + std::vector sqlFetchContext(const std::string& namespaceId, + const std::optional& key, + const std::vector& tags, + const std::optional& sinceIso, + int limit) const; + + std::vector sqlHybridSearch(const std::vector& query_vec, + const std::string& model, + const std::string& namespace_id, + const std::string& query_text, + int k); + + std::string allocateId(std::size_t& counter, const std::string& prefix); + static std::string toLower(const std::string& value); + static bool isStubDsn(const std::string& dsn); + static std::string escapePgArrayElement(const std::string& value); + static std::string toPgArrayLiteral(const std::vector& values); + static std::string toPgVectorLiteral(const std::vector& values); + static std::vector parsePgTextArray(const QString& value); bool connected_ = false; bool useInMemory_ = true; std::string dsn_; + QString connectionName_; + bool transactionActive_ = false; std::size_t nextNamespaceId_ = 1; std::size_t nextItemId_ = 1; @@ -146,12 +164,6 @@ private: 
std::unordered_map chunks_; std::unordered_map> chunksByItem_; std::unordered_map embeddings_; - -#ifdef HAVE_PG - std::unique_ptr connection_; - mutable std::unique_ptr activeTx_; - bool statementsPrepared_ = false; -#endif }; } // namespace kom diff --git a/src/main.cpp b/src/main.cpp index 50072fd..41776f9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -10,6 +10,10 @@ #ifdef HAVE_KCONFIG #include #include +#else +#include +#include +#include #endif #include "mcp/KomMcpServer.hpp" @@ -62,8 +66,21 @@ std::optional read_dsn_from_config() { return dsn.toStdString(); } #else +QString configFilePath() { + QString base = QStandardPaths::writableLocation(QStandardPaths::ConfigLocation); + if (base.isEmpty()) { + base = QDir::homePath(); + } + return QDir(base).filePath(QStringLiteral("kompanionrc")); +} + std::optional read_dsn_from_config() { - return std::nullopt; + QSettings settings(configFilePath(), QSettings::IniFormat); + const QString dsn = settings.value(QStringLiteral("Database/PgDsn")).toString(); + if (dsn.isEmpty()) { + return std::nullopt; + } + return dsn.toStdString(); } #endif diff --git a/tasks-table.md b/tasks-table.md index b723622..e82fa8b 100644 --- a/tasks-table.md +++ b/tasks-table.md @@ -33,7 +33,7 @@ MCP backend for Kompanion: memory/context/embedding provider over MCP, built fro | #18 | ⬜ todo | 492 | **Expose Agentic-Control-Framework as a tool** | Wrap ACF endpoints into a too... | | #19 | ⬜ todo | 509 | **DAL skeleton + SQL calls (pgvector)** | Create DAL interfaces and pgv... | | #20 | ⬜ todo | 491 | **Claude Code integration rescue plan** | Stabilize Qwen-2.5-Coder insi... | -| #21 | ⬜ todo | 510 | **DAL Phase 1: libpq/pqxx wiring + SQL calls** | Link pqxx, implement PgDal ag... | +| #21 | ⬜ todo | 510 | **DAL Phase 1: Qt6/QSql wiring + SQL calls** | Use QPSQL via Qt6::Sql, implement PgDal ag... | | #22 | ⬜ todo | 490 | **Handlers → DAL integration** | Wire kom.memory.v1.upsert_mem... 
| | #23 | ⬜ todo | 511 | **Contract tests: DAL-backed tools** | Expand CTest to cover DAL-bac... | @@ -48,8 +48,8 @@ MCP backend for Kompanion: memory/context/embedding provider over MCP, built fro | ID | Status | Title | |:--:|:------:|:------| -| #21.1 | ⬜ todo | CMake: find_package(pqxx) and link; CI env var DSN | -| #21.2 | ⬜ todo | PgDal: implement connect/tx + prepared statements | +| #21.1 | ⬜ todo | CMake: require Qt6::Sql (QPSQL); CI env var DSN | +| #21.2 | ⬜ todo | PgDal: implement QSql connect/tx + prepared statements | | #21.3 | ⬜ todo | SQL: ensureNamespace, upsertItem/Chunks/Embeddings | | #21.4 | ⬜ todo | Search: FTS/trgm + vector <-> with filters (namespace/thread/tags) | @@ -60,4 +60,3 @@ MCP backend for Kompanion: memory/context/embedding provider over MCP, built fro | #22.1 | ⬜ todo | Replace ad-hoc JSON with parser (nlohmann/json or simdjson) | | #22.2 | ⬜ todo | Validate request bodies against schemas before DAL calls | | #22.3 | ⬜ todo | Scope & sensitivity enforcement (namespace/user + skip secret embeddings) | - diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 002c5aa..132e50c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -3,6 +3,7 @@ add_executable(test_mcp_tools ) target_include_directories(test_mcp_tools PRIVATE ${PROJECT_SOURCE_DIR}/src) target_link_libraries(test_mcp_tools PRIVATE kom_dal) +target_compile_options(test_mcp_tools PRIVATE -fexceptions) add_test(NAME contract_mcp_tools COMMAND test_mcp_tools) @@ -11,6 +12,7 @@ add_executable(contract_memory ) target_include_directories(contract_memory PRIVATE ${PROJECT_SOURCE_DIR}/src) target_link_libraries(contract_memory PRIVATE kom_dal) +target_compile_options(contract_memory PRIVATE -fexceptions) add_test(NAME contract_memory COMMAND contract_memory) @@ -19,5 +21,6 @@ add_executable(test_memory_exchange ) target_include_directories(test_memory_exchange PRIVATE ${PROJECT_SOURCE_DIR}/src) target_link_libraries(test_memory_exchange PRIVATE kom_dal) 
+target_compile_options(test_memory_exchange PRIVATE -fexceptions) add_test(NAME mcp_memory_exchange COMMAND test_memory_exchange)