import http, { IncomingMessage } from 'http' import fs from 'fs' import path from 'path' import crypto from 'node:crypto' import { fileURLToPath } from 'url' // import Docker from 'dockerode' import cron from 'node-cron' import loginUser, { getUser } from './helpers/resolve-user' import { authCall, callNewChat, ollamaInp, schedInp } from './helpers/ollamaCalls' const DEFAULT_TZ = "America/New_York" const openApiSpec = { openapi: '3.0.1', info: { title: 'Scheduler API', description: 'API for managing scheduled prompts. Cron expressions run in America/New_York (EST).', version: '1.0.0' }, servers: [ { url: '/', description: 'Current server' } ], paths: { '/api/schedules': { get: { summary: 'List schedules', security: [{ BearerAuth: [] }], responses: { 200: { description: 'Successful response', content: { 'application/json': { schema: { type: 'object', properties: { ok: { type: 'boolean', example: true }, items: { type: 'array', items: { $ref: '#/components/schemas/Schedule' } } } } } } }, 401: { $ref: '#/components/responses/Unauthorized' } } }, post: { summary: 'Create or replace a schedule', security: [{ BearerAuth: [] }], requestBody: { required: true, content: { 'application/json': { schema: { $ref: '#/components/schemas/ScheduleInput' } } } }, responses: { 201: { description: 'Schedule created or updated', content: { 'application/json': { schema: { type: 'object', properties: { ok: { type: 'boolean', example: true } } } } } }, 400: { $ref: '#/components/responses/BadRequest' }, 401: { $ref: '#/components/responses/Unauthorized' } } } }, '/api/schedules/{name}': { delete: { summary: 'Delete a schedule', security: [{ BearerAuth: [] }], parameters: [ { name: 'name', in: 'path', description: 'Schedule name (raw or scoped)', required: true, schema: { type: 'string' } } ], responses: { 204: { description: 'Schedule deleted' }, 401: { $ref: '#/components/responses/Unauthorized' }, 403: { $ref: '#/components/responses/Forbidden' }, 404: { $ref: '#/components/responses/NotFound' } } } } }, components: { securitySchemes: { BearerAuth: { type: 'http', scheme: 'bearer', bearerFormat: 'JWT' } }, schemas: { Schedule: { type: 'object', properties: { name: { type: 'string', description: 'Scoped identifier used internally' }, displayName: { type: 'string' }, userId: { type: 'string' }, schedules: { type: 'array', items: { type: 'string', description: 'Cron expression' } }, startAt: { type: 'string', format: 'date-time', nullable: true }, oneShot: { type: 'boolean' }, templateRef: { $ref: '#/components/schemas/TemplateRef' }, prompt: { type: 'string' }, model: { type: 'string' }, tools: { type: 'array', items: { type: 'string' } }, features: { type: 'object', additionalProperties: { type: 'boolean' } }, files: { type: 'array', items: { type: 'object', properties: { fname: { type: 'string' }, fkey: { type: 'string' } } } } }, required: ['name', 'userId', 'schedules', 'oneShot'] }, ScheduleInput: { type: 'object', properties: { name: { type: 'string' }, when: { $ref: '#/components/schemas/ScheduleWhen' }, oneShot: { type: 'boolean', default: false }, template: { $ref: '#/components/schemas/TemplateInput' }, parameters: { type: 'object', additionalProperties: true }, files: { type: 'array', items: { type: 'object', properties: { fname: { type: 'string' }, fkey: { type: 'string', description: 'Optional previously stored file reference' }, content: { type: 'string', description: 'Base64 encoded file contents' } }, required: ['fname'] } }, prompt: { type: 'string' }, model: { type: 'string' }, tools: { type: 'array', items: { type: 'string' } }, features: { type: 'object', additionalProperties: { type: 'boolean' } } }, required: ['name', 'when', 'template', 'parameters', 'prompt', 'model', 'tools'] }, ScheduleWhen: { type: 'object', properties: { cron: { type: 'string', description: '5-field cron expression evaluated in America/New_York' }, start: { type: 'string', format: 'date-time', description: 'Optional start gate; required when oneShot is true' } }, required: [] }, TemplateInput: { type: 'object', properties: { name: { type: 'string' }, clusterScope: { type: 'boolean', default: false } }, required: ['name'] }, TemplateRef: { type: 'object', properties: { name: { type: 'string' }, clusterScope: { type: 'boolean' } }, required: ['name'] } }, responses: { Unauthorized: { description: 'Missing or invalid credentials' }, BadRequest: { description: 'Invalid request payload', content: { 'application/json': { schema: { type: 'object', properties: { ok: { type: 'boolean', example: false }, error: { type: 'string' } } } } } }, Forbidden: { description: 'Schedule exists but belongs to another user' }, NotFound: { description: 'Schedule not found' } } } } as const; const __filename = fileURLToPath(import.meta.url), __dirname = path.dirname(__filename), // folders/files DATA_DIR = process.env.DATA_DIR || path.join(__dirname, 'data'), SCHEDULES_FILE = path.join(DATA_DIR, 'schedules.json'), TEMPLATES_FILE = process.env.TEMPLATES_FILE || path.join(__dirname, 'templates.json'), FILES_DIR_PREFERRED = process.env.FILES_DIR || '/app/data/files', FILES_DIR_FALLBACK = path.join(DATA_DIR, 'files'); let FILES_DIR = FILES_DIR_PREFERRED; // defaults PORT = Number(process.env.PORT) || 12253 // connect to docker (via local socket by default) // const docker = new Docker({ socketPath: process.env.DOCKER_SOCKET || '/var/run/docker.sock' }) // in-memory schedule registry const tasks = new Map() // key -> { task, def } export const readBodyJson = (req: http.IncomingMessage): Promise => new Promise((resolve, reject) => { let d = ''; req.on('data', c => d += c) req.on('end', () => { try { resolve(JSON.parse(d || '{}')) } catch (e) { reject(e) } }) req.on('error', reject) }); const ensureDir = (p: string) => { try { fs.mkdirSync(p, { recursive: true }) } catch { } }, readJsonFile = (p: string, fallback: any = null) => { try { return JSON.parse(fs.readFileSync(p, 'utf8')) } catch { return fallback } }, writeJsonFile = (p: string, obj: object) => fs.writeFileSync(p, JSON.stringify(obj, null, 2)) // build cron string from an iso timestamp (treated as America/New_York) const cronFromISO = (iso?: string) => { if (!iso) return; const dt = new Date(iso); if (Number.isNaN(dt.getTime())) return; const parts: { year: number, month: number, day: number, hour: number, minute: number } = new Intl.DateTimeFormat('en-US', { timeZone: DEFAULT_TZ, year: 'numeric', month: 'numeric', day: 'numeric', hour: 'numeric', minute: '2-digit', hour12: false }).formatToParts(dt).reduce((a, p) => (a[p.type] = Number(p.value), a), {} as any); return `${parts.minute} ${parts.hour} ${parts.day} ${parts.month} *`; } const normalizeFeatures = (value: unknown): Record => { if (!value || typeof value !== 'object' || Array.isArray(value)) return {}; return Object.fromEntries(Object.entries(value as Record).map(([k, v]) => [k, Boolean(v)])); }; type IncomingFile = { fname?: string, fkey?: string, content?: string }; const sanitizeBase64 = (raw?: string) => typeof raw === 'string' ? raw.replace(/^data:[^;]+;base64,/, '') : ''; const prepareIncomingFiles = (value: unknown): { fname: string, fkey: string }[] | string => { if (!value) return []; if (!Array.isArray(value)) return 'files must be an array'; const saved: { fname: string, fkey: string }[] = []; for (const entry of value as IncomingFile[]) { if (!entry || typeof entry !== 'object') return 'invalid file entry'; const fname = typeof entry.fname === 'string' ? entry.fname.trim() : ''; if (!fname) return 'file entries require fname'; if (entry.content && typeof entry.content === 'string') { const b64 = sanitizeBase64(entry.content); if (!b64) return `file ${fname} is missing content`; const fkey = crypto.randomUUID(); const target = path.join(FILES_DIR, fkey); try { fs.writeFileSync(target, Buffer.from(b64, 'base64')); } catch (err) { console.error('failed to store file', fname, err); return `failed to store file ${fname}`; } saved.push({ fname, fkey }); continue; } if (entry.fkey && typeof entry.fkey === 'string' && entry.fkey.trim()) { saved.push({ fname, fkey: entry.fkey.trim() }); continue; } return `file ${fname} is missing content or fkey`; } return saved; }; // derive a docker-safe, user-scoped name and preserve a human display name const scopedName = (name: string, userId: string) => { const base = String(name).toLowerCase().replace(/[^a-z0-9-]+/g, '-').replace(/^-+|-+$/g, '').slice(0, 40), suffix = String(userId).toLowerCase().replace(/[^a-z0-9]+/g, '').slice(0, 8) || 'anon'; return `${base}--u-${suffix}`; } const fetchUserID = async (req: IncomingMessage) => { const authheader = String(req.headers['authorization'] || '').trim(); if (!authheader) throw Object.assign(new Error('Missing authorization header'), { status: 401 }); if (!authheader.startsWith('Bearer')) throw Object.assign(new Error('Invalid token'), { status: 401 }); try { const token = authheader.split(' ')[1], user = await authCall(token); return user.id; } catch { throw Object.assign(new Error('Invalid token'), { status: 401 }); } } // load templates (maps "name" -> { image, command?, args?, env? }) const loadTemplates = () => readJsonFile(TEMPLATES_FILE, { items: [] }).items || []; // persistence of schedules (for restart durability) ensureDir(DATA_DIR) ensureDir(FILES_DIR) if (!fs.existsSync(FILES_DIR)) { FILES_DIR = FILES_DIR_FALLBACK; ensureDir(FILES_DIR); } const persist = () => { const defs = [...tasks.values()].map(v => v.def) writeJsonFile(SCHEDULES_FILE, { items: defs }) }; const restore = () => { const saved = readJsonFile(SCHEDULES_FILE, { items: [] }).items || []; for (const def of saved) scheduleOrReplace(def); }; // schedule management (create/update) function scheduleOrReplace(defInput: ollamaInp) { const def: ollamaInp = { ...defInput, features: normalizeFeatures(defInput?.features) }; // stop existing const key = def.name if (tasks.has(key)) { try { tasks.get(key).task.stop() } catch { } tasks.delete(key) } const startTs = def.startAt ? new Date(def.startAt).getTime() : null; // forbid overlapping runs per schedule (like argo's Forbid) let running = false const task = cron.schedule(def.schedule, async () => { if (running) return; if (startTs && Date.now() < startTs) return; running = true try { await callNewChat(def); // one-shot: stop after first success if (def.oneShot) { try { task.stop() } catch { } tasks.delete(key) persist() } } catch (err) { // TODO: collect errors here console.error(err); // this failed, stop the task now try { task.stop() } catch { } try { tasks.delete(key) } catch { } persist() } finally { running = false } }, { timezone: DEFAULT_TZ }) tasks.set(key, { task, def }) task.start() persist() } // also does validation const toScheduleDef = ({ name, when, oneShot = false, template = { name: '' }, parameters = {}, prompt, userId, model, tools, features = {}, files = [] }: schedInp): ollamaInp | string => { const normalizedFeatures = normalizeFeatures(features); const startAt = when.start?.trim() || ''; const hasStartAt = Boolean(startAt); const processedFiles = prepareIncomingFiles(files); if (typeof processedFiles === 'string') return processedFiles; if (oneShot) { if (!hasStartAt) return "One-shot schedules require a start date"; const schedule = cronFromISO(startAt); if (!schedule) return "Invalid start date"; const startDate = new Date(startAt); if (startDate.getTime() < Date.now()) return "Date can not be in the past"; return { name: scopedName(name, userId), displayName: name, userId, schedule, startAt, oneShot: true, template, parameters, prompt, model, tools, features: normalizedFeatures, cookie: '', files: processedFiles }; } const schedule = when.cron?.trim(); if (!schedule) return "Cron expression is required"; if (hasStartAt) { const startDate = new Date(startAt); if (Number.isNaN(startDate.getTime())) return "Invalid start date"; if (startDate.getTime() < Date.now()) return "Start date can not be in the past"; } return { name: scopedName(name, userId), displayName: name, userId, schedule, startAt: hasStartAt ? startAt : undefined, oneShot: false, template, parameters, prompt, model, tools, features: normalizedFeatures, cookie: '', files: processedFiles }; } const publicDir = path.join(__dirname, 'public'); function reqToURL(req: IncomingMessage) { let pathname = '/'; try { pathname = new URL(req.url!, `http://${req.headers.host || 'localhost'}`).pathname; } catch { pathname = req.url || '/'; } return pathname; } // http server const server = http.createServer(async (req, res) => { try { // very light cors const origin = req.headers.origin || '*' res.setHeader('access-control-allow-origin', origin) res.setHeader('vary', 'origin') res.setHeader('access-control-allow-headers', 'content-type, x-user-id') res.setHeader('access-control-allow-methods', 'GET, POST, DELETE, OPTIONS') if (req.method === 'OPTIONS') return res.writeHead(204).end() const pathname = reqToURL(req), allowed = ['/', ...fs.readdirSync(publicDir).map(f => `/${f}`)]; if (req.method === 'GET' && pathname === '/openapi.json') { return res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify(openApiSpec, null, 2)); } // list schedules for the calling user if (req.method === 'GET' && pathname === '/api/schedules') { const userId = await fetchUserID(req), items = [...tasks.values()] .map(v => v.def) .filter(d => d.userId === userId) .map(d => ({ name: d.name, displayName: d.displayName, userId: d.userId, schedules: [d.schedule], startAt: d.startAt, oneShot: d.oneShot, templateRef: d.template, prompt: d.prompt, model: d.model, tools: d.tools, features: d.features, files: d.files })); return res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true, items })) } // list "workflow templates" => just expose templates.json names if (req.method === 'GET' && pathname === '/api/workflowtemplates') { const items = loadTemplates().map((t: { name: string }) => ({ name: t.name })) return res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true, items })) } if (req.method === 'GET' && allowed.includes(pathname)) { try { const fileName = pathname === '/' ? 'index.html' : pathname.slice(1), filePath = path.join(publicDir, fileName), ext = path.extname(fileName).toLowerCase(), type = ext === '.js' ? 'application/javascript' : ext === '.css' ? 'text/css' : 'text/html; charset=utf-8', content = fs.readFileSync(filePath, 'utf8') return res.writeHead(200, { 'content-type': type }).end(content); } catch { return res.writeHead(404).end('ui not found'); } } // create/update a user-scoped schedule if (req.method === 'POST' && pathname === '/api/schedules') { const userId = await fetchUserID(req), input = await readBodyJson(req), def = toScheduleDef({ ...input, userId }); if (typeof def === 'string') { return res.writeHead(400, { 'content-type': 'application/json' }) .end(JSON.stringify({ error: def, ok: false })); } def.cookie = req.headers['authorization'].split(' ')[1]; scheduleOrReplace(def); return res.writeHead(201, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true })); } if (req.method === 'POST' && pathname.startsWith('/login')) { return await loginUser(req, res).catch((err) => { console.error(err); res.writeHead(500).end(); }); } if (req.method === 'GET' && pathname.startsWith('/api/me')) { return await getUser(req, res).catch((err) => { console.error(err); res.writeHead(500).end(); }); } // delete a schedule owned by the calling user if (req.method === 'DELETE' && pathname.startsWith('/api/schedules/')) { const paramRaw = pathname.split('/').pop(); if (!paramRaw) { return res.writeHead(400).end({ error: "missing schedule ID" }); } const userId = await fetchUserID(req), nameParam = decodeURIComponent(paramRaw), // stored names are already scoped; accept either raw or scoped key = tasks.has(nameParam) ? nameParam : scopedName(nameParam, userId), existing = tasks.get(key)?.def; if (!existing) { return res.writeHead(404, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: false, error: 'not found' })); } if (existing.userId !== userId) { return res.writeHead(403, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: false, error: 'forbidden: schedule not owned by this user' })); } try { tasks.get(key).task.stop(); } catch { } tasks.delete(key); persist(); return res.writeHead(204).end(); } res.writeHead(404).end('not found'); } catch (e: any) { console.error(e); const code = Number(e.status) || 500 res.writeHead(code, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: false, error: e.message || String(e) })) } }) // boot restore(); server.listen(PORT, "0.0.0.0", () => { const addr = server!.address(); console.log(`schedules api listening on ${typeof addr === 'string' ? addr : addr?.address}:${PORT}`) });