// ollama-plus/scheduler/server.mjs
// 2025-09-15 10:45:38 -04:00 · 306 lines · 10 KiB · JavaScript

import http from 'http'
import fs from 'fs'
import path from 'path'
import { fileURLToPath } from 'url'
import Docker from 'dockerode'
import cron from 'node-cron'
// container label key that records the owning user's id
const LABEL_USER_KEY = 'openwebui.user-id'
// container label key that carries the human-readable display name
const ANNO_DISPLAY_NAME = 'openwebui/display-name'
// es modules have no built-in __filename/__dirname, so derive them from the module url
const __filename = fileURLToPath(import.meta.url)
const __dirname = path.dirname(__filename)

// folders/files (DATA_DIR and TEMPLATES_FILE can be overridden through the environment)
const DATA_DIR = process.env.DATA_DIR || path.join(__dirname, 'data')
const SCHEDULES_FILE = path.join(DATA_DIR, 'schedules.json')
const TEMPLATES_FILE = process.env.TEMPLATES_FILE || path.join(__dirname, 'templates.json')

// defaults
const DEFAULT_TZ = 'America/New_York'
// a missing, non-numeric or zero PORT falls back to 12253
const PORT = Number(process.env.PORT) || 12253
// docker client; uses the local unix socket unless DOCKER_SOCKET points elsewhere
const docker = new Docker({
  socketPath: process.env.DOCKER_SOCKET || '/var/run/docker.sock'
})

// in-memory schedule registry: schedule name -> { task, def }
const tasks = new Map()
/**
 * Read a request body to the end and parse it as JSON.
 *
 * Chunks are collected as Buffers and decoded once, so a multi-byte UTF-8
 * character split across two chunks is decoded correctly.
 *
 * @param {import('http').IncomingMessage} req - readable request stream
 * @param {number} [maxBytes] - largest accepted body size in bytes (default 1 MiB)
 * @returns {Promise<any>} parsed JSON value; an empty body resolves to {}
 * @throws rejects with the SyntaxError (status 400) on invalid JSON,
 *         an Error with status 413 when the body exceeds maxBytes,
 *         or the stream error when the request errors
 */
const readBodyJson = (req, maxBytes = 1024 * 1024) => new Promise((resolve, reject) => {
  const chunks = []
  let received = 0
  let tooLarge = false
  req.on('data', (chunk) => {
    if (tooLarge) return // keep draining the stream but ignore the data
    const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk))
    received += buf.length
    if (received > maxBytes) {
      tooLarge = true
      chunks.length = 0
      reject(Object.assign(new Error('request body too large'), { status: 413 }))
      return
    }
    chunks.push(buf)
  })
  req.on('end', () => {
    if (tooLarge) return
    const text = Buffer.concat(chunks).toString('utf8')
    try {
      resolve(JSON.parse(text || '{}'))
    } catch (e) {
      // a malformed body is the client's fault, not a server error
      e.status = 400
      reject(e)
    }
  })
  req.on('error', reject)
})

/**
 * Create a directory (and any missing parents). Best-effort: a failure is
 * reported as a warning instead of being thrown.
 *
 * @param {string} p - directory path
 */
const ensureDir = (p) => {
  try {
    fs.mkdirSync(p, { recursive: true })
  } catch (e) {
    console.warn(`could not create directory ${p}: ${e.message}`)
  }
}

/**
 * Read and parse a JSON file.
 *
 * @param {string} p - file path
 * @param {any} [fallback=null] - value returned when the file cannot be read or parsed
 * @returns {any} parsed content, or fallback
 */
const readJsonFile = (p, fallback = null) => {
  try {
    return JSON.parse(fs.readFileSync(p, 'utf8'))
  } catch (e) {
    // a missing file is expected on first run; anything else deserves a trace
    if (e.code !== 'ENOENT') console.warn(`could not read json file ${p}: ${e.message}`)
    return fallback
  }
}

/**
 * Serialize obj as pretty-printed JSON and write it to p.
 *
 * The content goes to a temporary sibling file that is then renamed over the
 * target, so an interrupted write does not leave a truncated file behind.
 * If that route fails the file is written in place.
 *
 * @param {string} p - target file path
 * @param {any} obj - value to serialize
 * @throws whatever fs.writeFileSync throws when the in-place write fails too
 */
const writeJsonFile = (p, obj) => {
  const text = JSON.stringify(obj, null, 2)
  const tmp = `${p}.${process.pid}.tmp`
  try {
    fs.writeFileSync(tmp, text)
    fs.renameSync(tmp, p)
  } catch {
    // best-effort cleanup of the temporary file, then fall back to a direct write
    try { fs.unlinkSync(tmp) } catch { }
    fs.writeFileSync(p, text)
  }
}
/**
 * Build a cron expression ("minute hour day month *") from a timestamp, using
 * the wall-clock fields that timestamp has in the given timezone.
 *
 * @param {string|number|Date} iso - timestamp accepted by the Date constructor
 * @param {string} [tz=DEFAULT_TZ] - IANA timezone name
 * @returns {string} five-field cron expression
 * @throws Error with status 400 when iso is missing or not a valid date;
 *         the Intl RangeError (tagged with status 400) when tz is not a valid timezone
 */
const cronFromISO = (iso, tz = DEFAULT_TZ) => {
  // new Date(null) is the epoch, so null/undefined must be rejected explicitly
  const dt = iso == null ? new Date(NaN) : new Date(iso)
  if (Number.isNaN(dt.getTime())) {
    throw Object.assign(new Error(`invalid iso timestamp: ${iso}`), { status: 400 })
  }
  let rawParts
  try {
    rawParts = new Intl.DateTimeFormat('en-US', {
      timeZone: tz, month: 'numeric', day: 'numeric',
      // h23 keeps hours in 0-23; hour12: false can be rendered as "24" at midnight by some runtimes
      hour: 'numeric', minute: '2-digit', hourCycle: 'h23'
    }).formatToParts(dt)
  } catch (e) {
    throw Object.assign(e, { status: 400 })
  }
  const parts = {}
  for (const { type, value } of rawParts) parts[type] = value
  // the modulo is a second guard against a "24" hour, which cron does not accept
  const hour = Number(parts.hour) % 24
  return `${Number(parts.minute)} ${hour} ${Number(parts.day)} ${Number(parts.month)} *`
}
/**
 * Derive a docker-safe, user-scoped name from a human display name.
 *
 * The name part is lower-cased, runs of characters outside [a-z0-9-] become a
 * single dash, leading/trailing dashes are trimmed and the result is cut to 40
 * characters. The user part keeps the first 8 alphanumeric characters of the
 * user id.
 *
 * @param {*} name - display name (stringified)
 * @param {*} userId - owning user id (stringified)
 * @returns {string} `<name>--u-<user>`; "job" / "anon" stand in for parts that
 *          end up empty, so the result always starts with an alphanumeric
 *          character as docker container names require
 */
const scopedName = (name, userId) => {
  const base = String(name)
    .toLowerCase()
    .replace(/[^a-z0-9-]+/g, '-')
    .replace(/^-+|-+$/g, '')
    .slice(0, 40) || 'job'
  const suffix = String(userId)
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '')
    .slice(0, 8) || 'anon'
  return `${base}--u-${suffix}`
}
// read the caller's id from the x-user-id header (trimmed); throws a 401-tagged error when it is absent or blank
const requireUserId = (req) => {
  const raw = req.headers['x-user-id']
  const userId = (raw ? String(raw) : '').trim()
  if (userId) return userId
  const err = new Error('missing x-user-id header')
  err.status = 401
  throw err
}
// load templates (maps "name" -> { image, command?, args?, env? })
// the file is re-read on every call; anything other than an object with an
// `items` array (missing file, null, wrong shape) yields an empty list
const loadTemplates = () => {
  const data = readJsonFile(TEMPLATES_FILE, { items: [] })
  return Array.isArray(data?.items) ? data.items : []
}
// find a template by exact name; undefined when there is none
const findTemplate = (name) => loadTemplates().find(t => t?.name === name)
/**
 * Create and start a container for a template (run once).
 *
 * @param {object} job
 * @param {string} [job.displayName] - human name; the template name is used when empty
 * @param {{name: string}} job.template - reference to an entry of the templates file
 * @param {object|null} [job.parameters] - passed to the container as PARAM_<KEY>=<value> env vars
 * @param {string} job.userId - owner, passed as USER_ID and stored as a label
 * @param {string|string[]} [job.entrypoint] - overrides the template entrypoint
 * @returns {Promise<{id: string, name: string}>} id and name of the started container
 * @throws Error with status 400 when template.name is missing, status 404 when
 *         the template is unknown; docker errors propagate unchanged
 */
async function runContainer({ displayName, template, parameters = {}, userId, entrypoint }) {
  if (!template?.name) throw Object.assign(new Error('missing template.name'), { status: 400 })
  const t = findTemplate(template.name)
  if (!t) throw Object.assign(new Error(`unknown template: ${template.name}`), { status: 404 })

  // one resolved name feeds the env var, the label and the container name, so they always agree
  const shownName = String(displayName || t.name)

  // env: pass user + params as env vars (simple & portable)
  // `?? {}` covers an explicit null, which the destructuring default does not
  // NOTE(review): the template's own `env` field is not applied here — confirm whether it should be
  const env = [
    `USER_ID=${userId}`,
    `DISPLAY_NAME=${shownName}`,
    ...Object.entries(parameters ?? {}).map(([k, v]) => `PARAM_${String(k).toUpperCase()}=${String(v)}`)
  ]

  // image pull if absent then create+start (mirrors docker run flow)
  // ref: docker engine api sequence: create -> pull if 404 -> create -> start
  // https://docs.docker.com/reference/api/engine/version/v1.24/ (section 4.1)
  try {
    await docker.getImage(t.image).inspect()
  } catch {
    // any inspect failure is treated as "image not present": pull and wait for the pull to finish
    await new Promise((resolve, reject) => {
      docker.pull(t.image, (err, stream) => {
        if (err) return reject(err)
        docker.modem.followProgress(stream, (err2) => err2 ? reject(err2) : resolve())
      })
    })
  }

  // random suffix keeps repeated runs of the same job from clashing on the container name
  const nameActual = `${scopedName(shownName, userId)}-${Math.random().toString(36).slice(2, 8)}`

  // [].concat accepts both a single string and an array for entrypoint/command/args
  const entrypointSource = entrypoint || t.entrypoint
  let cmd
  if (t.command) cmd = [].concat(t.command, t.args || [])
  else if (t.args) cmd = [].concat(t.args)

  // build container create options
  const createOpts = {
    Image: t.image,
    // optional explicit entrypoint/cmd wiring
    Entrypoint: entrypointSource ? [].concat(entrypointSource) : undefined,
    Cmd: cmd,
    Env: env,
    Labels: {
      [LABEL_USER_KEY]: String(userId),
      [ANNO_DISPLAY_NAME]: shownName
    },
    HostConfig: {
      AutoRemove: true
    },
    name: nameActual
  }
  const container = await docker.createContainer(createOpts) // create
  await container.start() // start
  return { id: container.id, name: nameActual }
}
// persistence of schedules (for restart durability)
ensureDir(DATA_DIR)

// write the definitions of all registered schedules to SCHEDULES_FILE
const persist = () => {
  const defs = [...tasks.values()].map(v => v.def)
  writeJsonFile(SCHEDULES_FILE, { items: defs })
}

// re-register the schedules saved in SCHEDULES_FILE
// a definition that cannot be scheduled is logged and skipped, so one bad
// entry does not abort startup; file content without an `items` array is
// treated as "nothing saved"
const restore = () => {
  const data = readJsonFile(SCHEDULES_FILE, { items: [] })
  const saved = Array.isArray(data?.items) ? data.items : []
  for (const def of saved) {
    try {
      scheduleOrReplace(def)
    } catch (e) {
      console.error(`could not restore schedule ${def?.name}: ${e?.message || e}`)
    }
  }
}
/**
 * Register a schedule, replacing any schedule already stored under the same
 * name, then persist the registry.
 *
 * @param {object} def - schedule definition; reads name, schedule, timezone,
 *        oneShot, displayName, template, parameters, userId and entrypoint
 * @throws whatever cron.schedule throws for the given expression/options; in
 *         that case the previously registered schedule is left untouched
 */
function scheduleOrReplace(def) {
  const key = def.name
  // forbid overlapping runs per schedule (like argo's Forbid)
  let running = false
  // create the new task before touching the old one: if cron.schedule throws,
  // the existing schedule keeps running instead of being lost
  const task = cron.schedule(def.schedule, async () => {
    if (running) return
    running = true
    try {
      await runContainer({
        displayName: def.displayName,
        template: def.template,
        parameters: def.parameters,
        userId: def.userId,
        entrypoint: def.entrypoint
      })
      // one-shot: stop after first success
      if (def.oneShot) {
        try { task.stop() } catch { }
        // unregister only if the registry still points at this task; the
        // schedule may have been replaced while the container was starting
        if (tasks.get(key)?.task === task) {
          tasks.delete(key)
          persist()
        }
      }
    } catch (e) {
      // a failed run must not take the scheduler down, but it should leave a trace
      console.error(`schedule ${key} failed: ${e?.message || e}`)
    } finally {
      running = false
    }
  }, { timezone: def.timezone || DEFAULT_TZ })
  // stop existing
  const previous = tasks.get(key)
  if (previous) { try { previous.task.stop() } catch { } }
  tasks.set(key, { task, def })
  task.start()
  persist()
}
/**
 * Convert request input to a schedule definition.
 *
 * @param {object} input
 * @param {string} input.name - display name; also the basis of the scoped schedule name
 * @param {{cron?: string, iso?: string}} input.when - cron expression, or a timestamp converted with cronFromISO
 * @param {string} [input.tz=DEFAULT_TZ] - timezone; null/empty falls back to the default
 * @param {boolean} [input.oneShot=false]
 * @param {{name: string}} [input.template]
 * @param {object} [input.parameters]
 * @param {string|string[]} [input.entrypoint]
 * @param {string} input.userId - owning user
 * @returns {object} definition with name, displayName, userId, timezone,
 *          schedule, oneShot, template, parameters and entrypoint
 * @throws Error with status 400 when name is missing/blank or when neither
 *         when.cron nor when.iso is given
 */
const toScheduleDef = ({ name, when, tz = DEFAULT_TZ, oneShot = false, template = { name: '' }, parameters = {}, entrypoint, userId }) => {
  const badRequest = (message) => Object.assign(new Error(message), { status: 400 })
  // without this check a missing name would be stored under the key "undefined--u-…"
  if (name == null || !String(name).trim()) throw badRequest('missing schedule name')
  if (when?.cron == null && when?.iso == null) throw badRequest('missing when.cron or when.iso')
  // the destructuring default only covers undefined; null or '' need the same fallback
  const timezone = tz || DEFAULT_TZ
  const schedule = when.cron ?? cronFromISO(when.iso, timezone)
  return {
    name: scopedName(name, userId),
    displayName: name,
    userId,
    timezone,
    schedule,
    oneShot: Boolean(oneShot),
    template,
    parameters: parameters ?? {},
    entrypoint
  }
}
// directory the static ui files are read from
const publicDir = path.resolve(__dirname, 'public')
/**
 * Extract the path component (no query string, no fragment) of a request url.
 *
 * @param {{url?: string}} req - incoming request
 * @returns {string} pathname used for routing; '/' when the request has no url
 */
function reqToURL(req) {
  const raw = req.url || '/'
  try {
    // only the path is needed, so a fixed base is used instead of the
    // client-supplied Host header (a malformed host would make parsing throw)
    return new URL(raw, 'http://localhost').pathname
  } catch {
    // unparsable url: fall back to the raw value without query string or fragment
    return raw.split(/[?#]/)[0] || '/'
  }
}
// true when a parsed json body is a plain object (not null, not an array, not a primitive)
const isJsonObject = (v) => v !== null && typeof v === 'object' && !Array.isArray(v)

// write a json response with the given status code
const sendJson = (res, status, body) =>
  res.writeHead(status, { 'content-type': 'application/json' }).end(JSON.stringify(body))

/**
 * http server
 *
 * Routes:
 *   GET    /api/schedules          list the calling user's schedules
 *   GET    /api/workflowtemplates  list template names
 *   GET    /, /index.html, /script.js, /style.css   static ui files
 *   POST   /schedules              create/update a schedule of the calling user
 *   POST   /run-now                run a template once for the calling user
 *   DELETE /schedules/<name>       delete a schedule owned by the calling user
 *
 * The caller is identified by the x-user-id header. Errors thrown by a handler
 * are returned as { ok: false, error } with the error's `status` (4xx/5xx) or 500.
 */
const server = http.createServer(async (req, res) => {
  try {
    // very light cors: echo the caller's origin, or * when none is sent
    const origin = req.headers.origin || '*'
    res.setHeader('access-control-allow-origin', origin)
    res.setHeader('vary', 'origin')
    res.setHeader('access-control-allow-headers', 'content-type, x-user-id')
    res.setHeader('access-control-allow-methods', 'GET, POST, DELETE, OPTIONS')
    if (req.method === 'OPTIONS') return res.writeHead(204).end()

    const pathname = reqToURL(req)
    // the only paths served from the public directory
    const allowed = ['/', '/index.html', '/script.js', '/style.css']

    //#region GET requests
    // list schedules for the calling user
    if (req.method === 'GET' && pathname === '/api/schedules') {
      const userId = requireUserId(req)
      const items = [...tasks.values()]
        .map(v => v.def)
        .filter(d => d.userId === userId)
        .map(d => ({
          name: d.name,
          displayName: d.displayName,
          userId: d.userId,
          timezone: d.timezone,
          schedules: [d.schedule],
          oneShot: d.oneShot,
          templateRef: d.template,
          entrypoint: d.entrypoint
        }))
      return sendJson(res, 200, { ok: true, items })
    }

    // list "workflow templates" => just expose templates.json names
    if (req.method === 'GET' && pathname === '/api/workflowtemplates') {
      const items = loadTemplates().map(t => ({ name: t.name }))
      return sendJson(res, 200, { ok: true, items })
    }

    // static ui files
    if (req.method === 'GET' && allowed.includes(pathname)) {
      try {
        const fileName = pathname === '/' ? 'index.html' : pathname.slice(1)
        const filePath = path.join(publicDir, fileName)
        const ext = path.extname(fileName).toLowerCase()
        const type = ext === '.js' ? 'application/javascript' : ext === '.css' ? 'text/css' : 'text/html; charset=utf-8'
        const content = fs.readFileSync(filePath, 'utf8')
        return res.writeHead(200, { 'content-type': type }).end(content)
      } catch {
        return res.writeHead(404).end('ui not found')
      }
    }
    // DO NOT PUT ANY GET REQUESTS BELOW THIS LINE, THEY WILL FAIL
    //#endregion

    // create/update a user-scoped schedule
    if (req.method === 'POST' && pathname === '/schedules') {
      const userId = requireUserId(req)
      const input = await readBodyJson(req)
      if (!isJsonObject(input)) return sendJson(res, 400, { ok: false, error: 'request body must be a json object' })
      // userId is spread last so a userId in the body cannot override the header
      const def = toScheduleDef({ ...input, userId })
      // scoped names keep only part of the user id, so two users can map to the
      // same key: never let one user replace a schedule owned by another
      const clash = tasks.get(def.name)?.def
      if (clash && clash.userId !== userId) {
        return sendJson(res, 403, { ok: false, error: 'forbidden: schedule not owned by this user' })
      }
      scheduleOrReplace(def)
      return sendJson(res, 201, { ok: true })
    }

    // run a job now for the calling user (no schedule)
    if (req.method === 'POST' && pathname === '/run-now') {
      const userId = requireUserId(req)
      const input = await readBodyJson(req)
      if (!isJsonObject(input)) return sendJson(res, 400, { ok: false, error: 'request body must be a json object' })
      await runContainer({
        displayName: input.name,
        template: input.template,
        parameters: input.parameters || {},
        userId,
        entrypoint: input.entrypoint
      })
      return sendJson(res, 201, { ok: true })
    }

    // delete a schedule owned by the calling user
    if (req.method === 'DELETE' && pathname.startsWith('/schedules/')) {
      const userId = requireUserId(req)
      let nameParam
      try {
        nameParam = decodeURIComponent(pathname.split('/').pop())
      } catch {
        // decodeURIComponent throws on a malformed percent-escape
        return sendJson(res, 400, { ok: false, error: 'malformed schedule name' })
      }
      // stored names are already scoped; accept either raw or scoped
      const key = tasks.has(nameParam) ? nameParam : scopedName(nameParam, userId)
      const existing = tasks.get(key)?.def
      if (!existing) return sendJson(res, 404, { ok: false, error: 'not found' })
      if (existing.userId !== userId) return sendJson(res, 403, { ok: false, error: 'forbidden: schedule not owned by this user' })
      try { tasks.get(key).task.stop() } catch { }
      tasks.delete(key)
      persist()
      return res.writeHead(204).end()
    }

    res.writeHead(404).end('not found')
  } catch (e) {
    // only a 4xx/5xx `status` is trusted; anything else is an internal error
    const status = Number(e?.status)
    const code = status >= 400 && status <= 599 ? status : 500
    if (code >= 500) console.error(e)
    // a response that already started cannot get a new status line
    if (res.headersSent) {
      if (!res.writableEnded) res.end()
      return
    }
    sendJson(res, code, { ok: false, error: e?.message || String(e) })
  }
})
// boot: re-register saved schedules first, then listen on all interfaces
restore()
server.listen(PORT, '0.0.0.0', () => {
  const { address } = server.address()
  console.log(`schedules api listening on ${address}:${PORT}`)
})