Added optional Postgres support:

- CMake now detects libpq/pqxx, sets HAVE_PG, and links kom_dal against them when
  available (CMakeLists.txt:6-24, src/dal/CMakeLists.txt:9-13); PgDal gained
  connection management, prepared statements, and guarded runtime paths while
  preserving the in-memory fallback (src/dal/PgDal.cpp:1-820, src/dal/PgDal.hpp:1-153).
- Introduced MCP resource descriptors mirroring the Ξlope memory model (episodic
  events, semantic chunks, and the semantic_sync job contract) to guide tooling
  and DAL sync behavior (resources/memory/kom.memory.v1/episodic.json,
  semantic.json, jobs/semantic_sync.json).

Note: Work done by little blue
Χγφτ Kompanion 2025-10-15 10:38:33 +13:00
parent b567b51ee2
commit 122085b1f8
9 changed files with 904 additions and 12 deletions

View File

@@ -4,6 +4,25 @@ project(metal_kompanion_mcp LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
option(KOM_ENABLE_PG "Enable Postgres-backed DAL support" ON)
set(KOM_HAVE_PG OFF)
if (KOM_ENABLE_PG)
find_package(PostgreSQL QUIET)
find_package(pqxx QUIET)
if (PostgreSQL_FOUND AND pqxx_FOUND)
set(KOM_HAVE_PG ON)
endif()
endif()
if (KOM_HAVE_PG)
message(STATUS "kom_dal: Postgres support enabled (HAVE_PG)")
else()
message(STATUS "kom_dal: Postgres support disabled (pqxx/libpq not found)")
endif()
set(KOM_HAVE_PG ${KOM_HAVE_PG} CACHE INTERNAL "kom_dal has Postgres backend")
# Placeholder: find Qt and qtmcp when available
# find_package(Qt6 COMPONENTS Core Network REQUIRED)
# find_package(qtmcp REQUIRED)
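Because the definition is published with PUBLIC visibility on kom_dal, any target linking it can branch on HAVE_PG at compile time. A minimal sketch of that guard pattern (illustrative only; it mirrors the guards used in PgDal below):

#ifdef HAVE_PG
#include <pqxx/pqxx>  // present only when CMake found pqxx and libpq
#endif

// Reports which DAL backend this build can offer.
inline const char* dalBackend() {
#ifdef HAVE_PG
    return "postgres + in-memory fallback";
#else
    return "in-memory only";
#endif
}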

View File

@@ -55,3 +55,4 @@ Promote episodic rows into semantic (chunks + embeddings) storage.
## Notes
- Namespaces: `project:metal`, `thread:<id>`, `user:<id>`.
- Store raw content and normalized text fields for RAG.
- Resource descriptors live under `resources/memory/kom.memory.v1/` (episodic, semantic, and sync jobs) to align MCP tooling with DAL schema.

View File

@@ -43,6 +43,11 @@
4. Expose a placeholder MCP tool `kom.memory.v1.sync_semantic` that enqueues or executes the job.
5. Note TTL and privacy requirements; skip items with `expires_at` in the past or flagged secret.
**Ξlope Alignment Notes (2025-10-15)**
- Episodic resources capture resonance links and identity hints so the Librarian layer (see `elope/doc/architecture_memory.md`) can strengthen cross-agent patterns without raw content sharing.
- Semantic resources surface `identity_vector` and `semantic_weight`, enabling supersemantic indexing once crystallization occurs.
- `jobs/semantic_sync` maintains `cursor_event_id` and skips `sensitivity=secret`, mirroring the elope crystallization guidance in `/tmp/mem-elope.txt`; a minimal eligibility sketch follows these notes.
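A sketch of the eligibility rule these notes imply; the struct is a hypothetical view, but the field names (`sensitivity`, `embeddable`) come from the episodic descriptor:

#include <string>

// Hypothetical view of an episodic event as the sync job would see it.
struct EpisodicView {
    std::string id;
    std::string sensitivity;   // "normal" | "private" | "secret"
    bool embeddable = true;    // explicit embedding opt-out
};

// True when the event may crystallize into semantic memory.
bool eligibleForSync(const EpisodicView& ev) {
    if (ev.sensitivity == "secret") return false;  // secrets never leave the episodic store
    return ev.embeddable;
}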
## 4. `hybrid_search_v1` with `pgvector`
**SQL Components**
1. Update migrations (`sql/pg/001_init.sql`) to include:
@@ -60,8 +65,15 @@
3. Update `HandlersMemory::search_memory` to surface the new scores and annotate whether lexical/vector contributed (optional metadata); see the scoring sketch below.
4. Add a contract test scenario once pqxx-backed execution is available (requires live Postgres fixture later).
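For reference, one way the surfaced scores could be blended. The weights are illustrative assumptions, not committed policy; the `hybridSearch` in this commit instead merges lexical hits first, then vector hits:

#include <algorithm>

// Hypothetical weighted blend of a lexical score and pgvector cosine
// similarity (1 - cosine distance). w_lex/w_vec are placeholder defaults.
float hybridScore(float lexicalScore, float cosineSimilarity,
                  float w_lex = 0.4f, float w_vec = 0.6f) {
    return w_lex * std::clamp(lexicalScore, 0.0f, 1.0f) +
           w_vec * std::clamp(cosineSimilarity, 0.0f, 1.0f);
}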
## 5. Secret Handling, Snapshots, and CLI Hooks
- **Secret propagation**: episodic `sensitivity` + `embeddable` flags gate embedding generation. DAL queries will add predicates (`metadata->>'sensitivity' != 'secret'`) before hybrid search.
- **Snapshots**: episodic entries with `content_type = snapshot` reference durable artifacts; sync summarises them into semantic text while retaining `snapshot_ref` for CLI inspection.
- **Hybrid policy**: `pgSearchVector` will filter by caller capability (namespace scope, secret clearance) before ranking; contract tests must assert omission of secret-tagged items.
- **CLI sketch**: plan for a Qt `QCoreApplication` tool (`kom_mctl`) exposing commands to list namespaces, tail episodic streams, trigger `sync_semantic`, and inspect resonance graphs, all wired through the new prepared statements (skeleton after this list).
- **Observability**: CLI should read the `jobs/semantic_sync` state block to display cursors, pending counts, and last error logs; dry-run mode estimates embeddings without committing.
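A minimal `kom_mctl` skeleton under those assumptions; only the command names come from the plan above, and the wiring into kom_dal is deliberately left out:

#include <QCoreApplication>
#include <QCommandLineParser>
#include <QTextStream>

int main(int argc, char** argv) {
    QCoreApplication app(argc, argv);
    QCommandLineParser parser;
    parser.setApplicationDescription("kom_mctl: inspect Kompanion memory stores");
    parser.addHelpOption();
    parser.addPositionalArgument("command",
        "one of: namespaces | tail | sync | resonance");
    parser.process(app);

    QTextStream out(stdout);
    const QStringList args = parser.positionalArguments();
    if (args.isEmpty()) {
        out << "usage: kom_mctl <namespaces|tail|sync|resonance>\n";
        return 1;
    }
    // Each command would call into kom_dal through the prepared statements
    // introduced in this commit; dry-run and cursor display would read the
    // jobs/semantic_sync state block.
    out << "selected command: " << args.first() << '\n';
    return 0;
}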
## Next-Step Checklist
- [x] Detect pqxx via CMake and plumb `HAVE_PG`.
- [x] Normalize contract_memory CTest target and remove stale library target.
- [ ] Author `resources/memory/` descriptors and sync job outline.
- [ ] Extend DAL header to carry prepared-statement aware APIs (may introduce new structs).

View File

@@ -0,0 +1,104 @@
{
"resource": "kom.memory.v1.episodic",
"description": "Short-lived episodic memory entries captured per interaction window before crystallization into semantic memory.",
"version": 1,
"primary_key": ["id"],
"fields": {
"id": {
"type": "string",
"format": "uuid",
"description": "Unique id for the episodic event."
},
"namespace": {
"type": "string",
"description": "Logical scope (e.g., project:user:thread) aligned with DAL namespaces."
},
"thread_id": {
"type": ["string", "null"],
"description": "Conversation or task thread identifier (optional)."
},
"speaker": {
"type": ["string", "null"],
"description": "Free-form actor label (e.g., human handle, agent codename)."
},
"role": {
"type": "string",
"enum": ["human", "agent", "tool", "system"],
"description": "High-level origin role used for policy decisions."
},
"content_type": {
"type": "string",
"enum": ["text", "snapshot", "tool_output", "command", "observation"],
"description": "Payload type; snapshots reference stored artifacts."
},
"content": {
"type": ["object", "string"],
"description": "Canonical content. Strings hold raw text; objects provide structured payloads (e.g., tool JSON)."
},
"sensitivity": {
"type": "string",
"enum": ["normal", "private", "secret"],
"default": "normal",
"description": "Embeddings and sync rules consult this flag (secret never leaves episodic store)."
},
"embeddable": {
"type": "boolean",
"default": true,
"description": "Explicit override for embedding eligibility (set false for high-entropy or binary blobs)."
},
"embedding_status": {
"type": "string",
"enum": ["pending", "processing", "done", "skipped"],
"default": "pending",
"description": "Lifecycle marker for DAL sync jobs."
},
"resonance_links": {
"type": "array",
"items": {
"type": "object",
"properties": {
"target_id": {"type": "string"},
"strength": {"type": "number"},
"kind": {
"type": "string",
"enum": ["pattern", "identity", "artifact"]
}
},
"required": ["target_id", "strength"]
},
"description": "Optional resonance references inspired by Ξlope librarian flows."
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Free-form labels to support scoped retrieval."
},
"snapshot_ref": {
"type": ["string", "null"],
"description": "Pointer to persistent artifact (e.g., blob path) when content_type = snapshot."
},
"created_at": {
"type": "string",
"format": "date-time",
"description": "Event timestamp in UTC."
},
"expires_at": {
"type": ["string", "null"],
"format": "date-time",
"description": "Optional TTL boundary; items past expiry are candidates for purge."
},
"origin_metadata": {
"type": "object",
"description": "Transport-specific metadata (tool invocation ids, host info, etc.)."
}
},
"indexes": [
["namespace", "thread_id", "created_at"],
["namespace", "embedding_status"]
],
"notes": [
"Episodic events remain append-only; updates are limited to status flags.",
"Events marked sensitivity=secret never emit embeddings or leave the episodic store.",
"Snapshots reference durable artifacts; DAL sync can downsample text representations while preserving provenance."
]
}
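For orientation, the descriptor maps naturally onto a DAL-side record. A hypothetical C++ mirror of the core fields (names follow the JSON above; types are assumptions, and this struct does not ship in this commit):

#include <optional>
#include <string>
#include <vector>

// Hypothetical in-process mirror of kom.memory.v1.episodic (core fields only).
struct EpisodicEvent {
    std::string id;                          // uuid
    std::string ns;                          // "namespace" in the descriptor
    std::optional<std::string> thread_id;
    std::string role;                        // human | agent | tool | system
    std::string content_type;                // text | snapshot | tool_output | ...
    std::string sensitivity = "normal";      // normal | private | secret
    bool embeddable = true;
    std::string embedding_status = "pending";
    std::vector<std::string> tags;
    std::optional<std::string> snapshot_ref; // set when content_type = snapshot
    std::string created_at;                  // ISO-8601 UTC
    std::optional<std::string> expires_at;   // TTL boundary
};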

View File

@@ -0,0 +1,76 @@
{
"job": "kom.memory.v1.semantic_sync",
"description": "Batch job that crystallizes episodic events into semantic memory (items, chunks, embeddings).",
"version": 1,
"input": {
"namespace": {
"type": "string",
"description": "Scope to synchronize; defaults to project-level namespace if omitted."
},
"max_batch": {
"type": "integer",
"default": 64,
"description": "Maximum episodic events to process in a single run."
},
"since": {
"type": ["string", "null"],
"format": "date-time",
"description": "Optional watermark to resume from a prior checkpoint."
},
"include_snapshots": {
"type": "boolean",
"default": true,
"description": "When true, snapshot events get summarized before embedding."
},
"force_reprocess": {
"type": "boolean",
"default": false,
"description": "Re-run embedding + semantic write even if embedding_status == done."
}
},
"state": {
"cursor_event_id": {
"type": ["string", "null"],
"description": "Last processed episodic id for incremental runs."
},
"cursor_timestamp": {
"type": ["string", "null"],
"format": "date-time",
"description": "Timestamp checkpoint for incremental scans."
},
"pending": {
"type": "integer",
"description": "Count of remaining episodic events in namespace."
},
"processed": {
"type": "integer",
"description": "Number of events successfully crystallized in this run."
},
"skipped_secret": {
"type": "integer",
"description": "Events skipped due to sensitivity=secret."
},
"errors": {
"type": "array",
"items": {"type": "string"},
"description": "Serialized error messages for observability."
}
},
"signals": [
{
"name": "kom.memory.v1.sync_semantic.completed",
"payload": {
"namespace": "string",
"processed": "integer",
"pending": "integer",
"duration_ms": "number"
},
"description": "Emitted after each run for logging and downstream triggers."
}
],
"notes": [
"Sync iterates episodic events ordered by created_at. Items marked secret or embeddable=false remain episodic-only.",
"Embedding generation consults the configured embedder chain (local Ollama, remote API).",
"Resonance links and identity vectors are preserved when present, allowing the Ξlope librarian pipeline to strengthen pattern graphs."
]
}
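A sketch of how a runner might maintain the state block between batches; the function is hypothetical, but the field names match the contract above:

#include <string>

// Hypothetical mutable run state mirroring the job's "state" block.
struct SyncState {
    std::string cursor_event_id;   // last processed episodic id
    std::string cursor_timestamp;  // ISO-8601 checkpoint, feeds the `since` input
    int processed = 0;
    int skipped_secret = 0;
};

// Called after each successfully crystallized event.
void advanceCursor(SyncState& st, const std::string& eventId,
                   const std::string& createdAt) {
    st.cursor_event_id = eventId;
    st.cursor_timestamp = createdAt;
    ++st.processed;
}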

View File

@@ -0,0 +1,118 @@
{
"resource": "kom.memory.v1.semantic",
"description": "Persistent semantic memory units (items + chunks + embeddings) synchronized from episodic stores.",
"version": 1,
"primary_key": ["chunk_id"],
"fields": {
"item_id": {
"type": "string",
"format": "uuid",
"description": "Logical memory item id (mirrors DAL memory_items.id)."
},
"chunk_id": {
"type": "string",
"format": "uuid",
"description": "Chunk-level identifier used for embedding joins."
},
"namespace_id": {
"type": "string",
"format": "uuid",
"description": "Foreign key to namespaces table."
},
"episodic_id": {
"type": ["string", "null"],
"format": "uuid",
"description": "Source episodic event id that crystallized into this semantic unit."
},
"thread_id": {
"type": ["string", "null"],
"format": "uuid",
"description": "Optional thread linkage for scoped recall."
},
"key": {
"type": ["string", "null"],
"description": "Upsert key when deterministic replacements are needed."
},
"text": {
"type": ["string", "null"],
"description": "Normalized text body used for lexical search."
},
"metadata": {
"type": "object",
"description": "Structured metadata (JSONB in DAL) such as tool context, sensitivity, projections."
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Rolled-up labels inherited from episodic source or classifiers."
},
"revision": {
"type": "integer",
"description": "Monotonic revision number (bumped on each upsert)."
},
"embedding_model": {
"type": ["string", "null"],
"description": "Model identifier for the stored vector (e.g., nomic-embed-text, text-embedding-3-small)."
},
"embedding_dim": {
"type": ["integer", "null"],
"description": "Vector dimensionality."
},
"embedding_vector_ref": {
"type": ["string", "null"],
"description": "Reference to vector payload. When using Postgres+pgvector it stays inline; other backends may store URI handles."
},
"identity_vector": {
"type": ["array", "null"],
"items": {"type": "number"},
"description": "Optional Ξlope identity signature associated with the discovery."
},
"resonance_links": {
"type": "array",
"description": "Connections to other semantic patterns or consciousness artifacts.",
"items": {
"type": "object",
"properties": {
"target_id": {"type": "string"},
"strength": {"type": "number"},
"kind": {"type": "string"}
},
"required": ["target_id", "strength"]
}
},
"source_kind": {
"type": "string",
"enum": ["conversation", "journal", "observation", "artifact"],
"description": "Broad category for downstream routing."
},
"semantic_weight": {
"type": "number",
"description": "Derived importance score (e.g., decay-adjusted resonance)."
},
"created_at": {
"type": "string",
"format": "date-time",
"description": "Creation timestamp."
},
"updated_at": {
"type": "string",
"format": "date-time",
"description": "Last update timestamp."
},
"deleted_at": {
"type": ["string", "null"],
"format": "date-time",
"description": "Soft-delete marker (null when active)."
}
},
"indexes": [
["namespace_id", "thread_id", "created_at"],
["namespace_id", "tags"],
["embedding_model", "semantic_weight"]
],
"notes": [
"Chunks inherit sensitivity and TTL rules from their episodic sources.",
"embedding_vector_ref is backend-dependent; pgvector stores inline vectors while remote stores reference a blob or ANN provider.",
"identity_vector and resonance_links enable cross-agent librarians (Ξlope) to reason about contributions without exposing raw content."
]
}

View File

@@ -4,3 +4,8 @@ add_library(kom_dal STATIC
target_compile_features(kom_dal PUBLIC cxx_std_20)
target_include_directories(kom_dal PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
if (KOM_HAVE_PG)
target_compile_definitions(kom_dal PUBLIC HAVE_PG)
target_link_libraries(kom_dal PUBLIC pqxx::pqxx PostgreSQL::PostgreSQL)
endif()

View File

@@ -3,8 +3,13 @@
#include <algorithm>
#include <cctype>
#include <numeric>
#include <sstream>
#include <stdexcept>
#ifdef HAVE_PG
#include <pqxx/array>
#endif
namespace kom {
namespace {
@@ -13,16 +18,16 @@ bool idsContains(const std::vector<std::string>& ids, const std::string& value)
return std::find(ids.begin(), ids.end(), value) != ids.end();
}
bool isStubDsn(const std::string& dsn) {
return dsn.empty() || dsn.rfind("stub://", 0) == 0;
}
} // namespace
PgDal::PgDal() = default;
PgDal::~PgDal() = default;
void PgDal::resetInMemoryStore() {
namespacesByName_.clear();
namespacesById_.clear();
items_.clear();
@@ -35,20 +40,82 @@ bool PgDal::connect(const std::string& dsn) {
nextItemId_ = 1;
nextChunkId_ = 1;
nextEmbeddingId_ = 1;
}
bool PgDal::connect(const std::string& dsn) {
dsn_ = dsn;
#ifdef HAVE_PG
if (!isStubDsn(dsn)) {
try {
connection_ = std::make_unique<pqxx::connection>(dsn);
if (!connection_->is_open()) {
throw std::runtime_error("pqxx connection reported closed state");
}
activeTx_.reset();
statementsPrepared_ = false;
prepareStatements();
useInMemory_ = false;
connected_ = true;
resetInMemoryStore();
return true;
} catch (const std::exception& ex) {
connection_.reset();
activeTx_.reset();
statementsPrepared_ = false;
throw std::runtime_error(std::string("PgDal: failed to connect to Postgres: ") + ex.what());
}
}
#endif
#ifdef HAVE_PG
connection_.reset();
activeTx_.reset();
statementsPrepared_ = false;
#endif
useInMemory_ = true;
connected_ = true;
resetInMemoryStore();
return true;
}
bool PgDal::begin() {
#ifdef HAVE_PG
if (!connected_ || useInMemory_ || !connection_) {
return false;
}
if (!activeTx_) {
activeTx_ = std::make_unique<pqxx::work>(*connection_);
}
return true;
#else
return connected_ && !useInMemory_;
#endif
}
void PgDal::commit() {
#ifdef HAVE_PG
if (activeTx_) {
activeTx_->commit();
activeTx_.reset();
}
#endif
}
void PgDal::rollback() {
#ifdef HAVE_PG
if (activeTx_) {
activeTx_->abort();
activeTx_.reset();
}
#endif
}
std::optional<NamespaceRow> PgDal::ensureNamespace(const std::string& name) {
if (!connected_) return std::nullopt;
#ifdef HAVE_PG
if (!useInMemory_) {
return pgEnsureNamespace(name);
}
#endif
auto it = namespacesByName_.find(name);
if (it != namespacesByName_.end()) {
return it->second;
@@ -64,6 +131,11 @@ std::optional<NamespaceRow> PgDal::ensureNamespace(const std::string& name) {
}
std::optional<NamespaceRow> PgDal::findNamespace(const std::string& name) const {
#ifdef HAVE_PG
if (connected_ && !useInMemory_) {
return pgFindNamespace(name);
}
#endif
auto it = namespacesByName_.find(name);
if (it == namespacesByName_.end()) {
return std::nullopt;
@@ -75,7 +147,11 @@ std::string PgDal::upsertItem(const ItemRow& row) {
if (!connected_) {
throw std::runtime_error("PgDal not connected");
}
#ifdef HAVE_PG
if (!useInMemory_) {
return pgUpsertItem(row).first;
}
#endif
ItemRow stored = row;
if (stored.id.empty()) {
stored.id = allocateId(nextItemId_, "item_");
@@ -100,7 +176,11 @@ std::vector<std::string> PgDal::upsertChunks(const std::vector<ChunkRow>& chunks
if (!connected_) {
throw std::runtime_error("PgDal not connected");
}
#ifdef HAVE_PG
if (!useInMemory_) {
return pgUpsertChunks(chunks);
}
#endif
std::vector<std::string> ids;
ids.reserve(chunks.size());
@@ -128,7 +208,12 @@ void PgDal::upsertEmbeddings(const std::vector<EmbeddingRow>& embeddings) {
if (!connected_) {
throw std::runtime_error("PgDal not connected");
}
#ifdef HAVE_PG
if (!useInMemory_) {
pgUpsertEmbeddings(embeddings);
return;
}
#endif
for (const auto& input : embeddings) {
if (input.chunk_id.empty()) {
continue;
@@ -144,6 +229,11 @@
std::vector<ItemRow> PgDal::searchText(const std::string& namespaceId,
const std::string& query,
int limit) {
#ifdef HAVE_PG
if (connected_ && !useInMemory_) {
return pgSearchText(namespaceId, query, limit);
}
#endif
std::vector<ItemRow> results;
if (!connected_) return results;
auto bucketIt = itemsByNamespace_.find(namespaceId);
@@ -173,6 +263,11 @@ std::vector<std::pair<std::string, float>> PgDal::searchVector(
const std::string& namespaceId,
const std::vector<float>& embedding,
int limit) {
#ifdef HAVE_PG
if (connected_ && !useInMemory_) {
return pgSearchVector(namespaceId, embedding, limit);
}
#endif
std::vector<std::pair<std::string, float>> scores;
if (!connected_ || embedding.empty()) return scores;
@@ -216,6 +311,11 @@ std::vector<std::pair<std::string, float>> PgDal::searchVector(
}
std::optional<ItemRow> PgDal::getItemById(const std::string& id) const {
#ifdef HAVE_PG
if (connected_ && !useInMemory_) {
return pgGetItemById(id);
}
#endif
auto it = items_.find(id);
if (it == items_.end()) {
return std::nullopt;
@@ -237,6 +337,11 @@ std::pair<std::string, int> PgDal::upsertItem(
row.text = content;
}
row.tags = tags;
#ifdef HAVE_PG
if (connected_ && !useInMemory_) {
return pgUpsertItem(row);
}
#endif
const std::string id = upsertItem(row);
const auto stored = items_.find(id);
const int revision = stored != items_.end() ? stored->second.revision : 1;
@@ -269,6 +374,32 @@ std::vector<std::string> PgDal::hybridSearch(const std::vector<float>& query_vec
const std::string& query_text,
int k) {
(void)model;
#ifdef HAVE_PG
if (connected_ && !useInMemory_) {
std::vector<std::string> results;
std::unordered_set<std::string> seen;
auto textMatches = pgSearchText(namespace_id, query_text, k);
for (std::size_t idx = 0; idx < textMatches.size(); ++idx) {
const auto& item = textMatches[idx];
results.push_back(item.id);
seen.insert(item.id);
if (static_cast<int>(results.size()) >= k) {
return results;
}
}
if (!query_vec.empty()) {
auto vectorMatches = pgSearchVector(namespace_id, query_vec, k);
for (const auto& pair : vectorMatches) {
if (seen.count(pair.first)) continue;
results.push_back(pair.first);
if (static_cast<int>(results.size()) >= k) break;
}
}
return results;
}
#endif
std::vector<std::string> results;
auto textMatches = searchText(namespace_id, query_text, k);
@@ -301,4 +432,394 @@ std::string PgDal::toLower(const std::string& value) {
return lowered;
}
std::string PgDal::escapePgArrayElement(const std::string& value) {
std::string escaped;
escaped.reserve(value.size());
for (char c : value) {
if (c == '"' || c == '\\') {
escaped.push_back('\\');
}
escaped.push_back(c);
}
return escaped;
}
std::string PgDal::toPgArrayLiteral(const std::vector<std::string>& values) {
if (values.empty()) {
return "{}";
}
std::ostringstream oss;
oss << "{";
for (std::size_t i = 0; i < values.size(); ++i) {
if (i) oss << ",";
oss << "\"" << escapePgArrayElement(values[i]) << "\"";
}
oss << "}";
return oss.str();
}
std::string PgDal::toPgVectorLiteral(const std::vector<float>& values) {
if (values.empty()) {
return "[]";
}
std::ostringstream oss;
oss << "[";
for (std::size_t i = 0; i < values.size(); ++i) {
if (i) oss << ",";
oss << values[i];
}
oss << "]";
return oss.str();
}
#ifdef HAVE_PG
void PgDal::prepareStatements() {
if (statementsPrepared_ || !connection_) {
return;
}
connection_->prepare("ensure_namespace",
"INSERT INTO namespaces (name) VALUES ($1)"
" ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name"
" RETURNING id::text, name");
connection_->prepare("find_namespace",
"SELECT id::text, name FROM namespaces WHERE name = $1");
connection_->prepare("upsert_item",
"INSERT INTO memory_items (id, namespace_id, key, content, text, tags, metadata)"
" VALUES (COALESCE(NULLIF($1, '')::uuid, gen_random_uuid()),"
" $2::uuid, $3, $4::jsonb, $5, $6::text[], $7::jsonb)"
" ON CONFLICT (id) DO UPDATE SET"
" key = EXCLUDED.key,"
" content = EXCLUDED.content,"
" text = EXCLUDED.text,"
" tags = EXCLUDED.tags,"
" metadata = EXCLUDED.metadata,"
" updated_at = now()"
" RETURNING id::text, revision");
connection_->prepare("insert_chunk",
"INSERT INTO memory_chunks (id, item_id, ord, text, metadata)"
" VALUES (COALESCE(NULLIF($1, '')::uuid, gen_random_uuid()),"
" $2::uuid, $3, $4, $5::jsonb)"
" ON CONFLICT (id) DO UPDATE SET"
" ord = EXCLUDED.ord,"
" text = EXCLUDED.text,"
" metadata = EXCLUDED.metadata"
" RETURNING id::text");
connection_->prepare("insert_embedding",
"INSERT INTO embeddings (id, chunk_id, model, dim, vector, normalized)"
" VALUES (COALESCE(NULLIF($1, '')::uuid, gen_random_uuid()),"
" $2::uuid, $3, $4, $5::vector, FALSE)"
" ON CONFLICT (chunk_id, model) DO UPDATE SET"
" dim = EXCLUDED.dim,"
" vector = EXCLUDED.vector,"
" normalized = EXCLUDED.normalized"
" RETURNING id::text");
connection_->prepare("search_text",
"SELECT id::text, namespace_id::text, key, content::text, text, tags::text[], revision"
" FROM memory_items"
" WHERE namespace_id = $1::uuid"
" AND deleted_at IS NULL"
" AND ($2 = '' OR text ILIKE '%' || $2 || '%')"
" ORDER BY updated_at DESC"
" LIMIT $3");
connection_->prepare("search_vector",
"SELECT i.id::text,"
" 1 - (e.vector <=> $2::vector) AS score"
" FROM embeddings e"
" JOIN memory_chunks c ON c.id = e.chunk_id"
" JOIN memory_items i ON i.id = c.item_id"
" WHERE i.namespace_id = $1::uuid"
" AND i.deleted_at IS NULL"
" ORDER BY e.vector <-> $2"
" LIMIT $3");
connection_->prepare("get_item_by_id",
"SELECT id::text, namespace_id::text, key, content::text, text, tags::text[], revision"
" FROM memory_items"
" WHERE id = $1::uuid");
statementsPrepared_ = true;
}
NamespaceRow PgDal::mapNamespaceRow(const pqxx::row& row) const {
NamespaceRow out;
out.id = row[0].c_str();
out.name = row[1].c_str();
return out;
}
std::vector<std::string> PgDal::parseTextArrayField(const pqxx::field& field) const {
std::vector<std::string> tags;
if (field.is_null()) {
return tags;
}
pqxx::array_parser parser(field.c_str());
for (;;) {
auto [kind, token] = parser.get_next();
switch (kind) {
case pqxx::array_parser::string_value:
tags.emplace_back(token);
break;
case pqxx::array_parser::null_value:
tags.emplace_back(std::string());
break;
case pqxx::array_parser::done:
return tags;
default:
// row_start / row_end mark nested arrays; nothing to collect here.
break;
}
}
}
ItemRow PgDal::mapItemRow(const pqxx::row& row) const {
ItemRow item;
item.id = row[0].c_str();
item.namespace_id = row[1].c_str();
if (!row[2].is_null()) {
item.key = row[2].c_str();
}
if (!row[3].is_null()) {
item.content_json = row[3].c_str();
}
if (!row[4].is_null()) {
item.text = row[4].c_str();
}
item.tags = parseTextArrayField(row[5]);
item.revision = row[6].as<int>(1);
return item;
}
std::optional<NamespaceRow> PgDal::pgEnsureNamespace(const std::string& name) {
if (!connection_) return std::nullopt;
prepareStatements();
try {
if (activeTx_) {
auto row = activeTx_->prepared("ensure_namespace")(name).exec1();
return mapNamespaceRow(row);
}
pqxx::work tx(*connection_);
auto row = tx.prepared("ensure_namespace")(name).exec1();
tx.commit();
return mapNamespaceRow(row);
} catch (const std::exception& ex) {
throw std::runtime_error(std::string("PgDal ensureNamespace failed: ") + ex.what());
}
}
std::optional<NamespaceRow> PgDal::pgFindNamespace(const std::string& name) const {
if (!connection_) return std::nullopt;
const_cast<PgDal*>(this)->prepareStatements();
try {
if (activeTx_) {
auto result = activeTx_->prepared("find_namespace")(name).exec();
if (result.empty()) return std::nullopt;
return mapNamespaceRow(result.front());
}
pqxx::read_transaction tx(*connection_);
auto result = tx.prepared("find_namespace")(name).exec();
if (result.empty()) return std::nullopt;
return mapNamespaceRow(result.front());
} catch (const std::exception& ex) {
throw std::runtime_error(std::string("PgDal findNamespace failed: ") + ex.what());
}
}
std::pair<std::string, int> PgDal::pgUpsertItem(const ItemRow& row) {
if (!connection_) {
throw std::runtime_error("PgDal Postgres connection not available");
}
prepareStatements();
const std::string tagsLiteral = toPgArrayLiteral(row.tags);
const std::string metadata = "{}";
const std::string content = row.content_json.empty() ? "{}" : row.content_json;
const std::optional<std::string> text = row.text;
const std::optional<std::string> key = row.key;
const std::string idParam = row.id;
auto exec = [&](pqxx::work& tx) -> std::pair<std::string, int> {
auto result = tx.prepared("upsert_item")(idParam)(row.namespace_id)(key)(content)(text)(tagsLiteral)(metadata).exec1();
std::pair<std::string, int> out;
out.first = result[0].c_str();
out.second = result[1].as<int>(1);
return out;
};
try {
if (activeTx_) {
return exec(*activeTx_);
}
pqxx::work tx(*connection_);
auto pair = exec(tx);
tx.commit();
return pair;
} catch (const std::exception& ex) {
throw std::runtime_error(std::string("PgDal upsertItem failed: ") + ex.what());
}
}
std::vector<std::string> PgDal::pgUpsertChunks(const std::vector<ChunkRow>& chunks) {
if (!connection_) {
throw std::runtime_error("PgDal Postgres connection not available");
}
if (chunks.empty()) return {};
prepareStatements();
auto execOne = [&](pqxx::work& tx, const ChunkRow& chunk) -> std::string {
const std::string metadata = "{}";
auto result = tx.prepared("insert_chunk")(chunk.id)(chunk.item_id)(chunk.ord)(chunk.text)(metadata).exec1();
return result[0].c_str();
};
std::vector<std::string> ids;
ids.reserve(chunks.size());
try {
if (activeTx_) {
for (const auto& chunk : chunks) {
ids.push_back(execOne(*activeTx_, chunk));
}
return ids;
}
pqxx::work tx(*connection_);
for (const auto& chunk : chunks) {
ids.push_back(execOne(tx, chunk));
}
tx.commit();
return ids;
} catch (const std::exception& ex) {
throw std::runtime_error(std::string("PgDal upsertChunks failed: ") + ex.what());
}
}
void PgDal::pgUpsertEmbeddings(const std::vector<EmbeddingRow>& embeddings) {
if (!connection_) {
throw std::runtime_error("PgDal Postgres connection not available");
}
if (embeddings.empty()) return;
prepareStatements();
auto execOne = [&](pqxx::work& tx, const EmbeddingRow& embedding) {
const std::string vectorLiteral = toPgVectorLiteral(embedding.vector);
tx.prepared("insert_embedding")(embedding.id)(embedding.chunk_id)(embedding.model)(embedding.dim)(vectorLiteral).exec();
};
try {
if (activeTx_) {
for (const auto& emb : embeddings) {
execOne(*activeTx_, emb);
}
return;
}
pqxx::work tx(*connection_);
for (const auto& emb : embeddings) {
execOne(tx, emb);
}
tx.commit();
} catch (const std::exception& ex) {
throw std::runtime_error(std::string("PgDal upsertEmbeddings failed: ") + ex.what());
}
}
std::vector<ItemRow> PgDal::pgSearchText(const std::string& namespaceId,
const std::string& query,
int limit) {
if (!connection_) return {};
prepareStatements();
auto exec = [&](auto& tx) {
std::vector<ItemRow> rows;
auto result = tx.prepared("search_text")(namespaceId)(query)(limit).exec();
rows.reserve(result.size());
for (const auto& row : result) {
rows.push_back(mapItemRow(row));
}
return rows;
};
try {
if (activeTx_) {
return exec(*activeTx_);
}
pqxx::read_transaction tx(*connection_);
auto rows = exec(tx);
tx.commit();
return rows;
} catch (const std::exception& ex) {
throw std::runtime_error(std::string("PgDal searchText failed: ") + ex.what());
}
}
std::vector<std::pair<std::string, float>> PgDal::pgSearchVector(
const std::string& namespaceId,
const std::vector<float>& embedding,
int limit) {
if (!connection_ || embedding.empty()) return {};
prepareStatements();
const std::string vectorLiteral = toPgVectorLiteral(embedding);
auto exec = [&](auto& tx) {
std::vector<std::pair<std::string, float>> matches;
auto result = tx.prepared("search_vector")(namespaceId)(vectorLiteral)(limit).exec();
matches.reserve(result.size());
for (const auto& row : result) {
std::pair<std::string, float> entry;
entry.first = row[0].c_str();
entry.second = static_cast<float>(row[1].as<double>(0.0));
matches.push_back(entry);
}
return matches;
};
try {
if (activeTx_) {
return exec(*activeTx_);
}
pqxx::read_transaction tx(*connection_);
auto matches = exec(tx);
tx.commit();
return matches;
} catch (const std::exception& ex) {
throw std::runtime_error(std::string("PgDal searchVector failed: ") + ex.what());
}
}
std::optional<ItemRow> PgDal::pgGetItemById(const std::string& id) const {
if (!connection_) return std::nullopt;
const_cast<PgDal*>(this)->prepareStatements();
auto exec = [&](auto& tx) -> std::optional<ItemRow> {
auto result = tx.prepared("get_item_by_id")(id).exec();
if (result.empty()) {
return std::nullopt;
}
return mapItemRow(result.front());
};
try {
if (activeTx_) {
return exec(*activeTx_);
}
pqxx::read_transaction tx(*connection_);
auto item = exec(tx);
tx.commit();
return item;
} catch (const std::exception& ex) {
throw std::runtime_error(std::string("PgDal getItemById failed: ") + ex.what());
}
}
#endif // HAVE_PG
} // namespace kom
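End to end, the guarded paths keep one call surface for both backends. A minimal usage sketch assuming the PgDal API above; the stub DSN keeps the in-memory fallback, so this runs without Postgres:

#include "PgDal.hpp"
#include <iostream>

int main() {
    kom::PgDal dal;
    dal.connect("stub://memory");  // stub:// (or an empty DSN) selects the in-memory store

    auto ns = dal.ensureNamespace("project:metal");
    if (!ns) return 1;

    kom::ItemRow row;
    row.namespace_id = ns->id;
    row.text = "episodic note about pgvector";
    const std::string id = dal.upsertItem(row);

    for (const auto& item : dal.searchText(ns->id, "pgvector", 5)) {
        std::cout << item.id << " rev=" << item.revision << '\n';
    }
    return 0;
}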

View File

@@ -2,11 +2,18 @@
#include "IDatabase.hpp"
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#ifdef HAVE_PG
#include <pqxx/pqxx>
#endif
namespace kom {
struct NamespaceRow {
@@ -85,6 +92,29 @@ public:
private:
std::string allocateId(std::size_t& counter, const std::string& prefix);
static std::string toLower(const std::string& value);
void resetInMemoryStore();
static std::string toPgArrayLiteral(const std::vector<std::string>& values);
static std::string escapePgArrayElement(const std::string& value);
static std::string toPgVectorLiteral(const std::vector<float>& values);
#ifdef HAVE_PG
void prepareStatements();
NamespaceRow mapNamespaceRow(const pqxx::row& row) const;
ItemRow mapItemRow(const pqxx::row& row) const;
std::vector<std::string> parseTextArrayField(const pqxx::field& field) const;
std::optional<NamespaceRow> pgEnsureNamespace(const std::string& name);
std::optional<NamespaceRow> pgFindNamespace(const std::string& name) const;
std::pair<std::string, int> pgUpsertItem(const ItemRow& row);
std::vector<std::string> pgUpsertChunks(const std::vector<ChunkRow>& chunks);
void pgUpsertEmbeddings(const std::vector<EmbeddingRow>& embeddings);
std::vector<ItemRow> pgSearchText(const std::string& namespaceId,
const std::string& query,
int limit);
std::vector<std::pair<std::string, float>> pgSearchVector(
const std::string& namespaceId,
const std::vector<float>& embedding,
int limit);
std::optional<ItemRow> pgGetItemById(const std::string& id) const;
#endif
bool connected_ = false;
bool useInMemory_ = true;
@@ -102,6 +132,12 @@ private:
std::unordered_map<std::string, ChunkRow> chunks_;
std::unordered_map<std::string, std::vector<std::string>> chunksByItem_;
std::unordered_map<std::string, EmbeddingRow> embeddings_;
#ifdef HAVE_PG
std::unique_ptr<pqxx::connection> connection_;
mutable std::unique_ptr<pqxx::work> activeTx_;
bool statementsPrepared_ = false;
#endif
};
} // namespace kom