Files

409 lines
13 KiB
TypeScript
Raw Permalink Normal View History

2025-09-26 14:28:04 -04:00
import http, { IncomingMessage } from 'http'
import fs from 'fs'
import path from 'path'
import crypto from 'node:crypto'
import { fileURLToPath } from 'url'
// import Docker from 'dockerode'
import cron from 'node-cron'
import loginUser, { getUser } from './helpers/resolve-user'
import { authCall, callNewChat, ollamaInp, schedInp } from './helpers/ollamaCalls'
2025-10-03 08:41:37 -04:00
// Static configuration, resolved once at module load.
// KEEP AS STRING: the OpenAPI document is served verbatim by GET /openapi.json and never parsed here.
// NOTE(review): './openapi.json' resolves against process.cwd(), while the other
// paths below are anchored to this module's directory — confirm that is intended.
const openApiSpec = fs.readFileSync('./openapi.json', 'utf-8'),
  // ES modules have no built-in __filename/__dirname; derive them from import.meta.url
  __filename = fileURLToPath(import.meta.url),
  __dirname = path.dirname(__filename),
  // IANA zone used both for cron evaluation and for turning ISO timestamps into cron fields
  DEFAULT_TZ = "America/New_York",
  // folders/files
  DATA_DIR = process.env.DATA_DIR || path.join(__dirname, 'data'),
  // persisted schedule definitions (rewritten by persist(), read by restore())
  SCHEDULES_FILE = path.join(DATA_DIR, 'schedules.json'),
  TEMPLATES_FILE = process.env.TEMPLATES_FILE || path.join(__dirname, 'templates.json'),
  // uploaded files go to the preferred directory when it can be created,
  // otherwise to the fallback under DATA_DIR (decided during directory bootstrap)
  FILES_DIR_PREFERRED = process.env.FILES_DIR || '/app/data/files',
  FILES_DIR_FALLBACK = path.join(DATA_DIR, 'files');
// mutable on purpose: may be switched to FILES_DIR_FALLBACK at startup
let FILES_DIR = FILES_DIR_PREFERRED;
2025-10-03 08:41:37 -04:00
// defaults
// HTTP listen port: the PORT env var when it converts to a non-zero number, otherwise 12253
const PORT = Number(process.env.PORT) || 12253
2025-09-26 14:28:04 -04:00
// connect to docker (via local socket by default)
// (currently disabled: the client construction below is kept only as a commented-out reference)
// const docker = new Docker({ socketPath: process.env.DOCKER_SOCKET || '/var/run/docker.sock' })
// in-memory schedule registry
// Keyed by the schedule's `name`; each value pairs the running cron task with
// the definition it was created from. persist() serializes the `def` halves.
const tasks = new Map() // key -> { task, def }
/**
 * Reads the whole request body and parses it as JSON.
 *
 * Chunks are collected as Buffers and decoded once at the end, so a multi-byte
 * UTF-8 character that happens to be split across two chunks is decoded
 * correctly (decoding chunk-by-chunk would turn it into replacement characters).
 * An empty body resolves to `{}`.
 *
 * @param req incoming HTTP request whose body should be consumed
 * @returns promise resolving to the parsed JSON value; it rejects with the
 *          `JSON.parse` error on malformed input, or with the stream error
 */
export const readBodyJson = (req: http.IncomingMessage): Promise<any> => new Promise((resolve, reject) => {
  const chunks: Buffer[] = [];
  req.on('data', (c: Buffer | string) => {
    // chunks are strings only if an encoding was set on the stream upstream
    chunks.push(typeof c === 'string' ? Buffer.from(c) : c);
  });
  req.on('end', () => {
    try {
      resolve(JSON.parse(Buffer.concat(chunks).toString('utf8') || '{}'));
    } catch (e) {
      reject(e);
    }
  });
  req.on('error', reject);
});
// Small synchronous filesystem helpers shared by the persistence code.

/** Creates directory `p` (and any missing parents); failures are deliberately ignored. */
const ensureDir = (p: string) => {
  try {
    fs.mkdirSync(p, { recursive: true });
  } catch {
    // best effort: callers check for the directory themselves afterwards
  }
};

/** Parses the JSON file at `p`; returns `fallback` when it cannot be read or parsed. */
const readJsonFile = (p: string, fallback: any = null) => {
  try {
    const text = fs.readFileSync(p, 'utf8');
    return JSON.parse(text);
  } catch {
    return fallback;
  }
};

/** Serializes `obj` as 2-space indented JSON and writes it to `p` (errors propagate). */
const writeJsonFile = (p: string, obj: object) => {
  const serialized = JSON.stringify(obj, null, 2);
  return fs.writeFileSync(p, serialized);
};
// build cron string from an iso timestamp (treated as America/New_York)
/**
 * Converts a timestamp into a five-field cron expression
 * (`minute hour day-of-month month *`) expressed in `timeZone`.
 *
 * Seconds are dropped (cron fields stop at minutes) and the year is not
 * encoded, so the expression matches that minute every year.
 *
 * @param iso      timestamp string understood by `new Date(...)`
 * @param timeZone IANA zone the wall-clock fields are taken from; defaults to DEFAULT_TZ
 * @returns the cron expression, or `undefined` when `iso` is empty or not a valid date
 */
const cronFromISO = (iso?: string, timeZone: string = DEFAULT_TZ) => {
  if (!iso) return;
  const dt = new Date(iso);
  if (Number.isNaN(dt.getTime())) return;
  // hourCycle 'h23' pins midnight to hour 0; `hour12: false` with the en-US
  // locale can format midnight as "24", which is not a valid cron hour.
  const formatter = new Intl.DateTimeFormat('en-US', {
    timeZone, month: 'numeric', day: 'numeric',
    hour: 'numeric', minute: '2-digit', hourCycle: 'h23'
  });
  const fields: Record<string, number> = {};
  for (const { type, value } of formatter.formatToParts(dt)) {
    // skip separators such as "/" and ", " — only numeric fields are needed
    if (type !== 'literal') fields[type] = Number(value);
  }
  return `${fields.minute} ${fields.hour} ${fields.day} ${fields.month} *`;
}
/**
 * Coerces a feature-flag object into `{ flag: boolean }`.
 * Anything that is not a non-null, non-array object yields an empty record;
 * each value is converted by truthiness (so the string "false" becomes `true`).
 */
const normalizeFeatures = (value: unknown): Record<string, boolean> => {
  const isRecord = typeof value === 'object' && value !== null && !Array.isArray(value);
  if (!isRecord) return {};
  const pairs = Object.entries(value as Record<string, unknown>)
    .map(([flag, raw]): [string, boolean] => [flag, !!raw]);
  return Object.fromEntries(pairs);
};
// One file entry as received in a schedule request: either inline base64
// `content` to store, or an `fkey` referring to a file stored earlier.
type IncomingFile = { fname?: string, fkey?: string, content?: string };

/** Strips a leading `data:<mime>;base64,` prefix; non-string input yields ''. */
const sanitizeBase64 = (raw?: string) => {
  if (typeof raw !== 'string') return '';
  return raw.replace(/^data:[^;]+;base64,/, '');
};
/**
 * Validates the `files` array of a schedule request and stores inline uploads.
 *
 * Each entry needs an `fname` plus either base64 `content` (stored under a
 * freshly generated UUID key inside FILES_DIR) or an existing `fkey`, which is
 * passed through after trimming.
 *
 * Works in two passes so that a validation error in a later entry never
 * leaves files from earlier entries behind on disk:
 *   1. validate every entry and decode payloads (no disk access);
 *   2. write the decoded payloads; if a write fails, files written during
 *      this call are removed again (best effort).
 *
 * @param value the untrusted `files` value from the request body
 * @returns the `{ fname, fkey }` list in input order, or an error message string
 */
const prepareIncomingFiles = (value: unknown): { fname: string, fkey: string }[] | string => {
  if (!value) return [];
  if (!Array.isArray(value)) return 'files must be an array';

  // pass 1: validation only
  const planned: { fname: string, fkey: string, data?: Buffer }[] = [];
  for (const entry of value as IncomingFile[]) {
    if (!entry || typeof entry !== 'object') return 'invalid file entry';
    const fname = typeof entry.fname === 'string' ? entry.fname.trim() : '';
    if (!fname) return 'file entries require fname';

    if (entry.content && typeof entry.content === 'string') {
      const b64 = sanitizeBase64(entry.content);
      if (!b64) return `file ${fname} is missing content`;
      planned.push({ fname, fkey: crypto.randomUUID(), data: Buffer.from(b64, 'base64') });
      continue;
    }

    const fkey = typeof entry.fkey === 'string' ? entry.fkey.trim() : '';
    if (!fkey) return `file ${fname} is missing content or fkey`;
    // the key is client-supplied: refuse anything that could name a location
    // other than a plain entry of the files directory
    if (fkey === '.' || fkey === '..' || /[\\/\0]/.test(fkey)) return `file ${fname} has an invalid fkey`;
    planned.push({ fname, fkey });
  }

  // pass 2: persist uploads
  const written: string[] = [];
  for (const { fname, fkey, data } of planned) {
    if (!data) continue;
    const target = path.join(FILES_DIR, fkey);
    try {
      fs.writeFileSync(target, data);
      written.push(target);
    } catch (err) {
      console.error('failed to store file', fname, err);
      // roll back what this call already stored so nothing is orphaned
      for (const stale of written) { try { fs.unlinkSync(stale) } catch { } }
      return `failed to store file ${fname}`;
    }
  }
  return planned.map(({ fname, fkey }) => ({ fname, fkey }));
};
// derive a docker-safe, user-scoped name; the human-readable display name is
// kept separately by callers
/**
 * Builds `<slug>--u-<owner>` where
 *  - slug  = `name` lower-cased, runs of characters outside [a-z0-9-] replaced
 *            by '-', leading/trailing dashes removed, truncated to 40 chars;
 *  - owner = first 8 alphanumeric characters of the lower-cased `userId`,
 *            or 'anon' when none remain.
 */
const scopedName = (name: string, userId: string) => {
  const slug = String(name)
    .toLowerCase()
    .replace(/[^a-z0-9-]+/g, '-')
    .replace(/^-+|-+$/g, '')
    .slice(0, 40);
  const owner = String(userId)
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '')
    .slice(0, 8);
  return [slug, owner || 'anon'].join('--u-');
}
/**
 * Resolves the calling user's id from the `Authorization: Bearer <token>` header.
 *
 * The token is the second space-separated part of the trimmed header value and
 * is handed to `authCall`. A header without a token is rejected up front
 * instead of querying the auth backend with `undefined`, and an auth response
 * without an `id` is treated as a failed authentication rather than returned
 * as an (undefined) identity.
 *
 * @param req incoming HTTP request
 * @returns the `id` of the user returned by `authCall`
 * @throws Error carrying `status: 401` when the header is missing
 *         ('Missing authorization header') or when the scheme, token or auth
 *         lookup is not acceptable ('Invalid token')
 */
const fetchUserID = async (req: IncomingMessage) => {
  const authheader = String(req.headers['authorization'] || '').trim();
  if (!authheader) throw Object.assign(new Error('Missing authorization header'), { status: 401 });
  const token = authheader.startsWith('Bearer ') ? authheader.split(' ')[1] : '';
  if (!token) throw Object.assign(new Error('Invalid token'), { status: 401 });
  try {
    const user = await authCall(token);
    if (!user?.id) throw new Error('auth response carries no user id');
    return user.id;
  }
  catch {
    // any failure of the lookup is reported uniformly as an invalid token
    throw Object.assign(new Error('Invalid token'), { status: 401 });
  }
}
// load templates (maps "name" -> { image, command?, args?, env? })
/**
 * Reads TEMPLATES_FILE and returns its `items` array.
 * Returns an empty array when the file is missing or unparsable, when the
 * parsed document is not an object (e.g. the file contains `null`), or when
 * `items` is not an array — callers can always iterate the result.
 */
const loadTemplates = () => {
  const parsed = readJsonFile(TEMPLATES_FILE, { items: [] });
  return Array.isArray(parsed?.items) ? parsed.items : [];
};
// persistence of schedules (for restart durability)
// Create the data directory and the preferred upload directory up front.
for (const dir of [DATA_DIR, FILES_DIR]) ensureDir(dir);
// ensureDir swallows errors, so check the result: when the preferred upload
// directory still does not exist, switch to the fallback and create that instead.
if (!fs.existsSync(FILES_DIR)) {
  FILES_DIR = FILES_DIR_FALLBACK;
  ensureDir(FILES_DIR);
}
/** Writes the definitions of all registered schedules to SCHEDULES_FILE as `{ items: [...] }`. */
const persist = () => {
  const items = Array.from(tasks.values(), entry => entry.def);
  writeJsonFile(SCHEDULES_FILE, { items });
};
/**
 * Re-creates every schedule stored in SCHEDULES_FILE.
 *
 * The stored document is shape-checked first (a file containing `null` or a
 * non-array `items` is treated as empty). Each definition is restored
 * independently: scheduleOrReplace rewrites the file after every call, so an
 * exception from one bad definition must not abort the loop — otherwise the
 * file would be left holding only the definitions restored so far.
 * A definition that fails is logged and skipped (and therefore drops out of
 * the file on the next persist).
 */
const restore = () => {
  const parsed = readJsonFile(SCHEDULES_FILE, { items: [] });
  const saved = Array.isArray(parsed?.items) ? parsed.items : [];
  for (const def of saved) {
    try {
      scheduleOrReplace(def);
    } catch (err) {
      console.error('failed to restore schedule', def?.name, err);
    }
  }
};
// schedule management (create/update)
/**
 * Registers (or replaces) the cron task for a schedule definition and persists
 * the registry.
 *
 * Behavior:
 *  - the cron expression is validated BEFORE an existing task with the same
 *    name is stopped, so an invalid update cannot destroy a working schedule;
 *  - runs never overlap: a tick is skipped while the previous run is in flight;
 *  - ticks before `startAt` are skipped. The gate is floored to the minute
 *    because cron fires at second 0: comparing against a timestamp with
 *    non-zero seconds would skip the only matching tick of a one-shot;
 *  - a one-shot is retired after its first successful run, and any schedule is
 *    retired when a run throws;
 *  - retiring only touches the registry if this task is still the registered
 *    one, so a run that finishes after the schedule was replaced cannot delete
 *    its replacement.
 *
 * @param defInput schedule definition; `features` is normalized to booleans
 * @throws Error with `status: 400` when `schedule` is not a valid cron expression
 */
function scheduleOrReplace(defInput: ollamaInp) {
  const def: ollamaInp = { ...defInput, features: normalizeFeatures(defInput?.features) };
  const key = def.name

  if (typeof def.schedule !== 'string' || !cron.validate(def.schedule)) {
    throw Object.assign(new Error(`Invalid cron expression: ${def.schedule}`), { status: 400 });
  }

  // stop existing
  const previous = tasks.get(key)
  if (previous) { try { previous.task.stop() } catch { } tasks.delete(key) }

  const rawStart = def.startAt ? new Date(def.startAt).getTime() : NaN;
  // an unparsable startAt means "no gate"
  const startTs = Number.isNaN(rawStart) ? null : Math.floor(rawStart / 60000) * 60000;

  // stop this task and drop it from the registry (only if it is still ours)
  const retire = () => {
    try { task.stop() } catch { }
    if (tasks.get(key)?.task !== task) return;
    tasks.delete(key)
    // runs inside the cron callback: a failing write must not escape as an unhandled rejection
    try { persist() } catch (err) { console.error('failed to persist schedules', err) }
  };

  // forbid overlapping runs per schedule (like argo's Forbid)
  let running = false
  const task = cron.schedule(def.schedule, async () => {
    if (running) return;
    if (startTs && Date.now() < startTs) return;
    running = true
    try {
      await callNewChat(def);
      // one-shot: stop after first success
      if (def.oneShot) retire();
    } catch (err) {
      // TODO: collect errors here
      console.error(err);
      // this failed, stop the task now
      retire();
    } finally {
      running = false
    }
  }, { timezone: DEFAULT_TZ })
  tasks.set(key, { task, def })
  task.start()
  persist()
}
// also does validation
/**
 * Turns a schedule request into a schedule definition, or returns a
 * human-readable validation message (string) when the request is not acceptable.
 *
 * Rules:
 *  - `name` must be a non-blank string;
 *  - one-shot: `when.start` is required, must parse as a date and must not lie
 *    in the past; the cron expression is derived from it via cronFromISO;
 *  - recurring: `when.cron` is required; an optional `when.start` must parse
 *    and must not lie in the past.
 *
 * A missing or malformed `when` is reported through these messages instead of
 * throwing. Uploaded files are processed only after everything else has been
 * validated, so a rejected request does not leave stored files behind.
 *
 * @returns the definition (with an empty `cookie`, to be filled in by the caller) or an error message
 */
const toScheduleDef = ({ name, when, oneShot = false, template = { name: '' }, parameters = {}, prompt, userId, model, tools, features = {}, files = [] }: schedInp): ollamaInp | string => {
  if (typeof name !== 'string' || !name.trim()) return "Schedule name is required";

  const startAt = typeof when?.start === 'string' ? when.start.trim() : '';
  const hasStartAt = Boolean(startAt);

  let schedule: string | undefined;
  if (oneShot) {
    if (!hasStartAt) return "One-shot schedules require a start date";
    const startDate = new Date(startAt);
    if (Number.isNaN(startDate.getTime())) return "Invalid start date";
    if (startDate.getTime() < Date.now()) return "Date can not be in the past";
    schedule = cronFromISO(startAt);
    if (!schedule) return "Invalid start date";
  } else {
    schedule = typeof when?.cron === 'string' ? when.cron.trim() : '';
    if (!schedule) return "Cron expression is required";
    if (hasStartAt) {
      const startDate = new Date(startAt);
      if (Number.isNaN(startDate.getTime())) return "Invalid start date";
      if (startDate.getTime() < Date.now()) return "Start date can not be in the past";
    }
  }

  // side effects (file writes) come last, once the request is known to be valid
  const processedFiles = prepareIncomingFiles(files);
  if (typeof processedFiles === 'string') return processedFiles;

  return {
    name: scopedName(name, userId),
    displayName: name,
    userId,
    schedule,
    // always set for one-shots (required above), optional for recurring schedules
    startAt: hasStartAt ? startAt : undefined,
    oneShot: Boolean(oneShot),
    template,
    parameters,
    prompt,
    model,
    tools,
    features: normalizeFeatures(features),
    cookie: '',
    files: processedFiles
  };
}
// directory (next to this module) whose top-level files are served as the static UI
const publicDir = path.join(__dirname, 'public');
/**
 * Extracts the path component (no query string, no fragment) of the request target.
 *
 * Only the path is needed, so the target is resolved against a fixed base
 * rather than the client-controlled Host header: a malformed Host value would
 * otherwise make parsing fail for a perfectly valid path. If the target itself
 * cannot be parsed, the query/fragment is stripped by hand so route matching
 * still sees a bare path.
 *
 * @param req incoming HTTP request
 * @returns the pathname, '/' when the request has no URL
 */
function reqToURL(req: IncomingMessage) {
  const raw = req.url || '/';
  try {
    return new URL(raw, 'http://localhost').pathname;
  } catch {
    const cut = raw.search(/[?#]/);
    return cut === -1 ? raw : (raw.slice(0, cut) || '/');
  }
}
// http server
// Routes handled by the single request listener:
//   OPTIONS *                        -> 204 (CORS preflight)
//   GET     /openapi.json            -> the OpenAPI document, verbatim
//   GET     /api/schedules           -> schedules owned by the caller
//   GET     /api/workflowtemplates   -> template names
//   GET     / and top-level files of publicDir -> static UI
//   POST    /api/schedules           -> create/update a schedule for the caller
//   POST    /login*                  -> delegated to loginUser
//   GET     /api/me*                 -> delegated to getUser
//   DELETE  /api/schedules/<name>    -> delete a schedule owned by the caller
// Errors thrown with a numeric `status` are answered with that status, anything else with 500.
const server = http.createServer(async (req, res) => {
  // every JSON answer goes through here so bodies are always serialized strings
  const sendJson = (status: number, body: unknown) =>
    res.writeHead(status, { 'content-type': 'application/json' }).end(JSON.stringify(body));

  try {
    // very light cors
    const origin = req.headers.origin || '*'
    res.setHeader('access-control-allow-origin', origin)
    res.setHeader('vary', 'origin')
    // the API authenticates with the Authorization header, so preflights must allow it
    res.setHeader('access-control-allow-headers', 'content-type, authorization, x-user-id')
    res.setHeader('access-control-allow-methods', 'GET, POST, DELETE, OPTIONS')
    if (req.method === 'OPTIONS') return res.writeHead(204).end()

    const pathname = reqToURL(req);

    if (req.method === 'GET' && pathname === '/openapi.json') {
      return res.writeHead(200, { 'content-type': 'application/json' }).end(openApiSpec);
    }

    // list schedules for the calling user
    if (req.method === 'GET' && pathname === '/api/schedules') {
      const userId = await fetchUserID(req),
        items = [...tasks.values()]
          .map(v => v.def)
          .filter(d => d.userId === userId)
          .map(d => ({
            name: d.name,
            displayName: d.displayName,
            userId: d.userId,
            schedules: [d.schedule],
            startAt: d.startAt,
            oneShot: d.oneShot,
            templateRef: d.template,
            prompt: d.prompt,
            model: d.model,
            tools: d.tools,
            features: d.features,
            files: d.files
          }));
      return sendJson(200, { ok: true, items });
    }

    // list "workflow templates" => just expose templates.json names
    if (req.method === 'GET' && pathname === '/api/workflowtemplates') {
      const items = loadTemplates().map((t: { name: string }) => ({ name: t.name }))
      return sendJson(200, { ok: true, items });
    }

    // static UI: only '/' and the top-level entries of publicDir are served.
    // The directory is listed only for GET requests, and a missing directory
    // no longer fails unrelated API routes.
    if (req.method === 'GET') {
      let allowed: string[] = ['/'];
      try { allowed = ['/', ...fs.readdirSync(publicDir).map(f => `/${f}`)] } catch { }
      if (allowed.includes(pathname)) {
        try {
          const mimeByExt: Record<string, string> = {
            '.js': 'application/javascript',
            '.css': 'text/css',
            '.json': 'application/json',
            '.svg': 'image/svg+xml',
            '.png': 'image/png',
            '.ico': 'image/x-icon'
          };
          const fileName = pathname === '/' ? 'index.html' : pathname.slice(1),
            filePath = path.join(publicDir, fileName),
            ext = path.extname(fileName).toLowerCase(),
            type = mimeByExt[ext] ?? 'text/html; charset=utf-8',
            // read as bytes so non-text assets are not mangled by a UTF-8 round trip
            content = fs.readFileSync(filePath);
          return res.writeHead(200, { 'content-type': type }).end(content);
        } catch {
          return res.writeHead(404).end('ui not found');
        }
      }
    }

    // create/update a user-scoped schedule
    if (req.method === 'POST' && pathname === '/api/schedules') {
      const userId = await fetchUserID(req);
      let input: any;
      try {
        input = await readBodyJson(req);
      } catch {
        return sendJson(400, { ok: false, error: 'request body must be valid JSON' });
      }
      if (!input || typeof input !== 'object' || Array.isArray(input)) {
        return sendJson(400, { ok: false, error: 'request body must be a JSON object' });
      }
      const def = toScheduleDef({ ...input, userId });
      if (typeof def === 'string') {
        return sendJson(400, { error: def, ok: false });
      }
      // scoped names embed only a prefix of the user id, so two users can map
      // to the same key; never let one user replace another user's schedule
      const clash = tasks.get(def.name)?.def;
      if (clash && clash.userId !== userId) {
        return sendJson(409, { ok: false, error: 'schedule name is already in use' });
      }
      // NOTE(review): the bearer token is stored on the definition and is
      // therefore written to the schedules file by persist() — confirm this is acceptable.
      def.cookie = String(req.headers['authorization'] ?? '').trim().split(' ')[1] ?? '';
      scheduleOrReplace(def);
      return sendJson(201, { ok: true });
    }

    if (req.method === 'POST' && pathname.startsWith('/login')) {
      return await loginUser(req, res).catch((err) => {
        console.error(err);
        res.writeHead(500).end();
      });
    }

    if (req.method === 'GET' && pathname.startsWith('/api/me')) {
      return await getUser(req, res).catch((err) => {
        console.error(err);
        res.writeHead(500).end();
      });
    }

    // delete a schedule owned by the calling user
    if (req.method === 'DELETE' && pathname.startsWith('/api/schedules/')) {
      const paramRaw = pathname.split('/').pop();
      if (!paramRaw) {
        return sendJson(400, { ok: false, error: 'missing schedule ID' });
      }
      const userId = await fetchUserID(req);
      let nameParam: string;
      try {
        nameParam = decodeURIComponent(paramRaw);
      } catch {
        // malformed percent-encoding is a client error, not a server failure
        return sendJson(400, { ok: false, error: 'malformed schedule ID' });
      }
      // stored names are already scoped; accept either raw or scoped
      const key = tasks.has(nameParam) ? nameParam : scopedName(nameParam, userId),
        entry = tasks.get(key),
        existing = entry?.def;
      if (!existing) {
        return sendJson(404, { ok: false, error: 'not found' });
      }
      if (existing.userId !== userId) {
        return sendJson(403, { ok: false, error: 'forbidden: schedule not owned by this user' });
      }
      try { entry.task.stop(); } catch { }
      tasks.delete(key);
      persist();
      return res.writeHead(204).end();
    }

    res.writeHead(404).end('not found');
  } catch (e: any) {
    console.error(e);
    // once headers are out a second writeHead would throw; just finish the response
    if (res.headersSent) {
      if (!res.writableEnded) res.end();
      return;
    }
    const status = Number(e?.status);
    const code = Number.isInteger(status) && status >= 400 && status <= 599 ? status : 500;
    sendJson(code, { ok: false, error: e?.message || String(e) });
  }
})
// boot
// Re-register persisted schedules first, then start accepting connections on all interfaces.
restore();
server.listen(PORT, "0.0.0.0", () => {
  const bound = server!.address();
  const host = typeof bound === 'string' ? bound : bound?.address;
  console.log(`schedules api listening on ${host}:${PORT}`)
});