metal-kompanion/src/middleware/orchestrator.cpp

225 lines
7.9 KiB
C++

#include "orchestrator.h"
#include <QByteArray>
#include <QDate>
#include <QEventLoop>
#include <QFile>
#include <QJsonArray>
#include <QJsonDocument>
#include <QNetworkReply>
#include <QProcessEnvironment>
#include <QTextStream>
#include <QTimer>
#include "dal/PgDal.hpp"
// ---------- OllamaModelProvider ----------
// Constructs the provider and resolves the base URL of the Ollama server.
//
// The URL is taken from the OLLAMA_BASE environment variable; when that
// variable is not set, http://localhost:11434 is used instead.
OllamaModelProvider::OllamaModelProvider(QObject *parent)
    : QObject(parent)
{
    const QString fallbackUrl = QStringLiteral("http://localhost:11434");
    const QProcessEnvironment environment = QProcessEnvironment::systemEnvironment();
    baseUrl_ = environment.value(QStringLiteral("OLLAMA_BASE"), fallbackUrl);
}
// Maps an aspect name to the model that should serve it.
//
// The "companion" and "code" aspects are recognised case-insensitively, but
// for now every aspect (known or not) resolves to defaultModel_. The separate
// branch is kept as the place where per-aspect models can be wired in later
// (e.g. from a models.yaml file).
QString OllamaModelProvider::chooseModelForAspect(const QString &aspect) const {
    const auto matches = [&aspect](const QString &name) {
        return aspect.compare(name, Qt::CaseInsensitive) == 0;
    };
    if (matches(QStringLiteral("companion")) || matches(QStringLiteral("code"))) {
        return defaultModel_;
    }
    return defaultModel_;
}
// Posts `prompt` to <baseUrl_>/api/generate and returns the generated text.
//
// The call blocks the caller: a local QEventLoop spins until the reply
// finishes or a 10 second single-shot timer fires, whichever comes first.
//
// Parameters:
//   prompt - text sent in the "prompt" field of the JSON request body.
//   aspect - forwarded to chooseModelForAspect() to pick the "model" field.
// Returns:
//   The trimmed "response" string of the JSON reply, or an empty QString when
//   the request timed out, the reply reported a network error, or the reply
//   body was not a JSON object.
QString OllamaModelProvider::generate(const QString &prompt, const QString &aspect) {
    const QString model = chooseModelForAspect(aspect);
    const QUrl url(baseUrl_ + QStringLiteral("/api/generate"));
    QNetworkRequest req(url);
    req.setHeader(QNetworkRequest::ContentTypeHeader, QStringLiteral("application/json"));
    const QJsonObject body{
        {QStringLiteral("model"), model},
        {QStringLiteral("prompt"), prompt},
        {QStringLiteral("stream"), false},
    };
    const QByteArray payload = QJsonDocument(body).toJson(QJsonDocument::Compact);

    QEventLoop loop;
    QNetworkReply *reply = nam_.post(req, payload);
    QObject::connect(reply, &QNetworkReply::finished, &loop, &QEventLoop::quit);

    // Time out defensively after 10s; return empty on failure.
    QTimer timeout;
    timeout.setSingleShot(true);
    QObject::connect(&timeout, &QTimer::timeout, &loop, &QEventLoop::quit);
    timeout.start(10000);
    loop.exec();
    timeout.stop();

    if (!reply->isFinished()) {
        // The loop was ended by the timer, not by the reply. Treat that as an
        // explicit failure: cancel the transfer instead of reading whatever
        // partial data may have arrived. The reply is detached from the local
        // loop first because abort() emits finished().
        QObject::disconnect(reply, nullptr, &loop, nullptr);
        reply->abort();
        reply->deleteLater();
        return QString();
    }

    if (reply->error() != QNetworkReply::NoError) {
        reply->deleteLater();
        return QString();
    }
    const QByteArray data = reply->readAll();
    reply->deleteLater();

    const QJsonDocument doc = QJsonDocument::fromJson(data);
    if (!doc.isObject()) return QString();
    return doc.object().value(QStringLiteral("response")).toString().trimmed();
}
// ---------- Orchestrator ----------
// Creates an orchestrator wired to the default model provider.
//
// The provider is a function-local static OllamaModelProvider, so a single
// instance lives for the rest of the process; model_ only points at it (tests
// can point model_ elsewhere). Each timeout of timer_ triggers
// processPendingTasks(); the timer itself is not started here.
Orchestrator::Orchestrator(QObject *parent)
    : QObject(parent)
{
    static OllamaModelProvider sharedProvider;
    model_ = &sharedProvider;

    QObject::connect(&timer_, &QTimer::timeout,
                     this, &Orchestrator::processPendingTasks);
}
// Releases the data-access layer allocated by dal().
// dal_ may still be null if dal() was never called; deleting null is a no-op.
Orchestrator::~Orchestrator()
{
    delete dal_;
}
// Returns the data-access layer, creating and connecting it on first use.
//
// The connection string comes from the PG_DSN environment variable; when it
// is empty or unset, the DAL is connected to "stub://memory" instead.
ki::PgDal& Orchestrator::dal() {
    if (dal_) {
        return *dal_;
    }

    dal_ = new ki::PgDal();
    const QByteArray dsn = qgetenv("PG_DSN");
    if (dsn.isEmpty()) {
        dal_->connect("stub://memory");
    } else {
        dal_->connect(dsn.toStdString());
    }
    return *dal_;
}
// Prepares the on-disk directories, records the continuity handshake (only
// the first time), and then starts the polling timer.
//
// Parameters:
//   intervalMs - timer interval in milliseconds, passed straight to timer_.
void Orchestrator::start(int intervalMs)
{
    ensureResolvedDirs();
    continuityHandshakeOnce();

    timer_.start(intervalMs);
}
// Stops the polling timer; no further timer-driven task processing happens
// until start() is called again.
void Orchestrator::stop()
{
    timer_.stop();
}
void Orchestrator::ensureResolvedDirs() {
if (!stateDir_.exists()) {
const auto env = QProcessEnvironment::systemEnvironment();
const auto xdgState = env.value(QStringLiteral("XDG_STATE_HOME"), QDir::home().filePath(".local/state"));
stateDir_.setPath(QDir(xdgState).filePath("kompanion"));
}
if (!configDir_.exists()) {
const auto env = QProcessEnvironment::systemEnvironment();
const auto xdgConf = env.value(QStringLiteral("XDG_CONFIG_HOME"), QDir::home().filePath(".config"));
configDir_.setPath(QDir(xdgConf).filePath("kompanion"));
}
QDir().mkpath(stateDir_.absolutePath());
QDir().mkpath(journalDirPath());
}
// Returns the current UTC time formatted as ISO 8601 with milliseconds.
//
// Any '+' in the formatted string is replaced by 'Z'.
// NOTE(review): presumably a UTC timestamp never contains '+', which would
// make the replacement a safety net only — confirm against the Qt docs.
QString Orchestrator::nowUtc() const {
    QString stamp = QDateTime::currentDateTimeUtc().toString(Qt::ISODateWithMs);
    stamp.replace(QLatin1Char('+'), QLatin1Char('Z'));
    return stamp;
}
void Orchestrator::ledgerAppend(const QJsonObject &evt) {
QFile f(ledgerPath());
if (f.open(QIODevice::ReadOnly)) {
// noop: we could hash prev line like the python version; keep minimal for now
f.close();
}
if (f.open(QIODevice::Append | QIODevice::Text)) {
QJsonObject copy = evt;
copy.insert(QStringLiteral("ts"), nowUtc());
const QByteArray line = QJsonDocument(copy).toJson(QJsonDocument::Compact) + '\n';
f.write(line);
f.close();
}
}
void Orchestrator::journalAppend(const QString &text) {
// Ensure journal directory exists even when start() was not called (tests)
QDir().mkpath(journalDirPath());
const QString file = QDir(journalDirPath()).filePath(QDate::currentDate().toString(Qt::ISODate) + QStringLiteral(".md"));
QFile f(file);
if (f.open(QIODevice::Append | QIODevice::Text)) {
QTextStream out(&f);
out << "- " << nowUtc() << ' ' << text << '\n';
out.flush();
f.close();
}
QJsonObject evt{{QStringLiteral("actor"), QStringLiteral("Χγφτ")}, {QStringLiteral("action"), QStringLiteral("journal.append")}};
ledgerAppend(evt);
}
void Orchestrator::continuityHandshakeOnce() {
if (continuityDone_) return;
continuityDone_ = true;
QJsonObject evt{{QStringLiteral("actor"), QStringLiteral("Χγφτ")}, {QStringLiteral("action"), QStringLiteral("CONTINUITY_ACCEPTED")}};
ledgerAppend(evt);
}
void Orchestrator::handleJournalFromPrompt(const QJsonObject &obj) {
const QString aspect = obj.value(QStringLiteral("aspect")).toString(QStringLiteral("companion"));
const QString prompt = obj.value(QStringLiteral("prompt")).toString();
if (!model_) return;
const QString preface = QStringLiteral("Write a brief, warm reflection.\nPrompt:\n");
const QString out = model_->generate(preface + prompt, aspect);
if (!out.isEmpty()) {
journalAppend(out);
}
emit taskProcessed(QStringLiteral("journal.from_prompt"));
}
void Orchestrator::processPendingTasks() {
QFile f(tasksPath());
if (!f.exists()) return;
if (!f.open(QIODevice::ReadOnly | QIODevice::Text)) return;
const QByteArray data = f.readAll();
f.close();
// Truncate after reading
if (f.open(QIODevice::WriteOnly | QIODevice::Truncate)) f.close();
const QList<QByteArray> lines = QByteArray(data).split('\n');
for (const QByteArray &raw : lines) {
const QByteArray trimmed = raw.trimmed();
if (trimmed.isEmpty()) continue;
const auto doc = QJsonDocument::fromJson(trimmed);
if (!doc.isObject()) continue;
const QJsonObject obj = doc.object();
const QString type = obj.value(QStringLiteral("type")).toString();
if (type == QStringLiteral("journal.from_prompt")) {
handleJournalFromPrompt(obj);
} else {
QJsonObject evt{{QStringLiteral("action"), QStringLiteral("unknown.task")}, {QStringLiteral("type"), type}};
ledgerAppend(evt);
}
}
}
bool Orchestrator::saveSnapshot(const QString &nameSpace,
const QString &key,
const QJsonObject &content,
const QStringList &tags)
{
auto nsRow = dal().ensureNamespace(nameSpace.toStdString());
if (!nsRow) return false;
ki::ItemRow row;
row.namespace_id = nsRow->id;
row.key = key.toStdString();
row.tags.reserve(tags.size());
for (const auto &t : tags) row.tags.push_back(t.toStdString());
row.content_json = QString::fromUtf8(QJsonDocument(content).toJson(QJsonDocument::Compact)).toStdString();
row.metadata_json = "{}";
row.created_at = std::chrono::system_clock::now();
const std::string id = dal().upsertItem(row);
return !id.empty();
}
std::optional<QJsonObject> Orchestrator::loadSnapshot(const QString &nameSpace,
const QString &key)
{
auto nsRow = dal().findNamespace(nameSpace.toStdString());
if (!nsRow) return std::nullopt;
std::vector<std::string> tags; tags.emplace_back("snapshot");
auto rows = dal().fetchContext(nsRow->id, std::optional<std::string>(key.toStdString()), tags, std::nullopt, 1);
if (rows.empty()) return std::nullopt;
const auto &row = rows.front();
if (row.content_json.empty()) return std::nullopt;
const auto doc = QJsonDocument::fromJson(QByteArray::fromStdString(row.content_json));
if (!doc.isObject()) return std::nullopt;
return doc.object();
}