#!/usr/bin/env python3
"""Companion runtime.

Polls a JSONL task queue under the XDG state dir, generates journal entries
via an Ollama model endpoint, and records every action in an append-only,
hash-chained trust ledger.
"""
import os
import json
import time
import hashlib
import hmac  # NOTE(review): currently unused here; kept from the original import list
import datetime

import requests
import yaml

# --- XDG base directories -------------------------------------------------
XDG_STATE = os.environ.get("XDG_STATE_HOME", os.path.expanduser("~/.local/state"))
XDG_CONFIG = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
STATE_DIR = os.path.join(XDG_STATE, "kompanion")
CONF_DIR = os.path.join(XDG_CONFIG, "kompanion")

# --- State & config paths -------------------------------------------------
JOURNAL_DIR = os.path.join(STATE_DIR, "journal")
LEDGER_PATH = os.path.join(STATE_DIR, "trust_ledger.jsonl")
TASKS_PATH = os.path.join(STATE_DIR, "tasks.jsonl")
IDENTITY = os.path.join(CONF_DIR, "identity.json")
CAPS = os.path.join(CONF_DIR, "capabilities.json")
MODELS_YAML = os.path.join(CONF_DIR, "models.yaml")

# BUG FIX: in the original file these two lines appeared *before* `import os`
# (above a duplicated shebang), which raises NameError at import time.
base = os.environ.get("OLLAMA_BASE", "http://ollama:11434")
url = f"{base}/api/generate"

os.makedirs(JOURNAL_DIR, exist_ok=True)
os.makedirs(os.path.join(STATE_DIR, "log"), exist_ok=True)


def now_utc() -> str:
    """Return the current UTC time as 'YYYY-MM-DDTHH:MM:SSZ' (second resolution)."""
    # Use an aware clock (datetime.utcnow() is deprecated), then drop tzinfo
    # so isoformat() yields the same string as before, with a literal 'Z'.
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(microsecond=0, tzinfo=None).isoformat() + "Z"


def ledger_append(event: dict) -> None:
    """Append `event` as one JSON line to the trust ledger.

    Each entry carries a `prev` field: the sha256 of the previous raw line,
    forming a simple hash chain ("" for the first entry).
    """
    prev = ""
    if os.path.exists(LEDGER_PATH):
        with open(LEDGER_PATH, "rb") as f:
            lines = f.readlines()
        if lines:
            prev = "sha256:" + hashlib.sha256(lines[-1]).hexdigest()
    event["prev"] = prev
    with open(LEDGER_PATH, "ab") as f:
        f.write((json.dumps(event, ensure_ascii=False) + "\n").encode())


def journal_append(text: str, tags=None) -> None:
    """Append a timestamped, hash-tagged bullet to today's journal file.

    Also records a `journal.append` event (tags only, not the text) in the ledger.
    """
    tags = tags or []
    fname = os.path.join(JOURNAL_DIR, datetime.date.today().isoformat() + ".md")
    line = f"- {now_utc()} {' '.join('#'+t for t in tags)} {text}\n"
    with open(fname, "a", encoding="utf-8") as f:
        f.write(line)
    ledger_append({"ts": now_utc(), "actor": "companion",
                   "action": "journal.append", "tags": tags})


def load_yaml(p):
    """Load a YAML file, returning {} if it is missing or empty."""
    if not os.path.exists(p):
        return {}
    with open(p, "r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def model_call(prompt: str, aspect="companion") -> str:
    """Generate text via the Ollama /api/generate endpoint.

    Model selection comes from models.yaml (`aspects.<aspect>`, falling back to
    `default`). Returns the stripped response, or "" on any error (the error
    is journaled, never raised).
    """
    models = load_yaml(MODELS_YAML)
    model = models.get("aspects", {}).get(
        aspect, models.get("default", "ollama:qwen2.5:7b"))
    payload = {"model": model.replace("ollama:", ""),
               "prompt": prompt,
               "stream": False}
    try:
        r = requests.post(url, json=payload, timeout=60)
        r.raise_for_status()
        data = r.json()
        return data.get("response", "").strip()
    except Exception as e:
        # Best-effort by design: a model failure must not kill the task loop.
        journal_append(f"(model error) {e}", tags=["error", "model"])
        return ""


def process_task(task: dict) -> None:
    """Dispatch one queued task, enforcing per-aspect capabilities from CAPS."""
    kind = task.get("type")
    aspect = task.get("aspect", "companion")
    caps = load_yaml(CAPS)
    allowed = set(caps.get(aspect, []))
    if kind == "journal.from_prompt":
        # Policy gate: the aspect needs both capabilities to run this task.
        if not {"journal.append", "model.generate"} <= allowed:
            journal_append("companion not allowed to write journal", tags=["policy"])
            return
        prompt = task.get("prompt", "")
        profile_path = os.path.join(CONF_DIR, "profiles", "companion-pink.md")
        profile = ""
        if os.path.exists(profile_path):
            with open(profile_path, "r", encoding="utf-8") as f:
                profile = f.read()
        full = (f"{profile}\n\nWrite a warm, brief reflection for Andre."
                f"\nPrompt:\n{prompt}\n")
        out = model_call(full, aspect=aspect)
        if out:
            journal_append(out, tags=["companion", "pink"])
            ledger_append({"ts": now_utc(), "actor": "companion",
                           "action": "model.generate", "chars": len(out)})
    else:
        journal_append(f"unknown task type: {kind}", tags=["warn"])


def main_loop() -> None:
    """Poll the task queue every 3 seconds, processing one JSONL task per line."""
    journal_append("companion runtime started", tags=["startup", "companion"])
    while True:
        if os.path.exists(TASKS_PATH):
            # simple jsonl queue, one task per line
            p_lines = []
            with open(TASKS_PATH, "r+", encoding="utf-8") as f:
                p_lines = f.readlines()
                f.seek(0)
                f.truncate(0)  # drop tasks we just pulled; idempotence later
            for line in p_lines:
                if not line.strip():
                    continue
                try:
                    task = json.loads(line)
                    process_task(task)
                except Exception as e:
                    # One bad task must not stop the queue drain.
                    journal_append(f"task error {e}", tags=["error", "task"])
        time.sleep(3)


if __name__ == "__main__":
    main_loop()