modifying scheduler
This commit is contained in:
@@ -0,0 +1,305 @@
|
||||
import http from 'http'
|
||||
import fs from 'fs'
|
||||
import path from 'path'
|
||||
import { fileURLToPath } from 'url'
|
||||
import Docker from 'dockerode'
|
||||
import cron from 'node-cron'
|
||||
|
||||
const LABEL_USER_KEY = 'openwebui.user-id',
|
||||
ANNO_DISPLAY_NAME = 'openwebui/display-name'
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url),
|
||||
__dirname = path.dirname(__filename),
|
||||
|
||||
// folders/files
|
||||
DATA_DIR = process.env.DATA_DIR || path.join(__dirname, 'data'),
|
||||
SCHEDULES_FILE = path.join(DATA_DIR, 'schedules.json'),
|
||||
TEMPLATES_FILE = process.env.TEMPLATES_FILE || path.join(__dirname, 'templates.json'),
|
||||
|
||||
// defaults
|
||||
DEFAULT_TZ = 'America/New_York',
|
||||
PORT = Number(process.env.PORT) || 12253
|
||||
|
||||
// connect to docker (via local socket by default)
|
||||
const docker = new Docker({ socketPath: process.env.DOCKER_SOCKET || '/var/run/docker.sock' })
|
||||
|
||||
// in-memory schedule registry
|
||||
const tasks = new Map() // key -> { task, def }
|
||||
|
||||
const readBodyJson = (req) => new Promise((resolve, reject) => {
|
||||
let d = ''; req.on('data', c => d += c)
|
||||
req.on('end', () => { try { resolve(JSON.parse(d || '{}')) } catch (e) { reject(e) } })
|
||||
req.on('error', reject)
|
||||
}),
|
||||
ensureDir = (p) => { try { fs.mkdirSync(p, { recursive: true }) } catch { } },
|
||||
readJsonFile = (p, fallback = null) => { try { return JSON.parse(fs.readFileSync(p, 'utf8')) } catch { return fallback } },
|
||||
writeJsonFile = (p, obj) => fs.writeFileSync(p, JSON.stringify(obj, null, 2))
|
||||
|
||||
// build cron string from an iso timestamp in a timezone (same as your original)
|
||||
const cronFromISO = (iso, tz = DEFAULT_TZ) => {
|
||||
const dt = new Date(iso),
|
||||
parts = new Intl.DateTimeFormat('en-US', {
|
||||
timeZone: tz, year: 'numeric', month: 'numeric', day: 'numeric',
|
||||
hour: 'numeric', minute: '2-digit', hour12: false
|
||||
}).formatToParts(dt).reduce((a, p) => (a[p.type] = p.value, a), {})
|
||||
const m = Number(parts.month), d = Number(parts.day), h = Number(parts.hour), min = Number(parts.minute)
|
||||
return `${min} ${h} ${d} ${m} *`
|
||||
}
|
||||
|
||||
// derive a docker-safe, user-scoped name and preserve a human display name
|
||||
const scopedName = (name, userId) => {
|
||||
const base = String(name).toLowerCase().replace(/[^a-z0-9-]+/g, '-').replace(/^-+|-+$/g, '').slice(0, 40),
|
||||
suffix = String(userId).toLowerCase().replace(/[^a-z0-9]+/g, '').slice(0, 8) || 'anon'
|
||||
return `${base}--u-${suffix}`
|
||||
}
|
||||
|
||||
// ensure we have a user id header
|
||||
const requireUserId = (req) => {
|
||||
const userId = String(req.headers['x-user-id'] || '').trim()
|
||||
if (!userId) throw Object.assign(new Error('missing x-user-id header'), { status: 401 })
|
||||
return userId
|
||||
}
|
||||
|
||||
// load templates (maps "name" -> { image, command?, args?, env? })
|
||||
const loadTemplates = () => readJsonFile(TEMPLATES_FILE, { items: [] }).items || []
|
||||
const findTemplate = (name) => loadTemplates().find(t => t.name === name)
|
||||
|
||||
// create/start a container for a template (run once)
|
||||
async function runContainer({ displayName, template, parameters = {}, userId, entrypoint }) {
|
||||
if (!template?.name) throw Object.assign(new Error('missing template.name'), { status: 400 })
|
||||
const t = findTemplate(template.name)
|
||||
if (!t) throw Object.assign(new Error(`unknown template: ${template.name}`), { status: 404 })
|
||||
|
||||
// env: pass user + params as env vars (simple & portable)
|
||||
const env = [
|
||||
`USER_ID=${userId}`,
|
||||
`DISPLAY_NAME=${displayName}`,
|
||||
...Object.entries(parameters).map(([k, v]) => `PARAM_${String(k).toUpperCase()}=${String(v)}`)
|
||||
]
|
||||
|
||||
// image pull if absent then create+start (mirrors docker run flow)
|
||||
// ref: docker engine api sequence: create -> pull if 404 -> create -> start
|
||||
// https://docs.docker.com/reference/api/engine/version/v1.24/ (section 4.1)
|
||||
try {
|
||||
await docker.getImage(t.image).inspect()
|
||||
} catch {
|
||||
await new Promise((resolve, reject) => {
|
||||
docker.pull(t.image, (err, stream) => {
|
||||
if (err) return reject(err)
|
||||
docker.modem.followProgress(stream, (err2) => err2 ? reject(err2) : resolve())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const nameActual = `${scopedName(displayName || t.name, userId)}-${Math.random().toString(36).slice(2, 8)}`
|
||||
|
||||
// build container create options
|
||||
const createOpts = {
|
||||
Image: t.image,
|
||||
// optional explicit entrypoint/cmd wiring
|
||||
Entrypoint: entrypoint ? [entrypoint] : (t.entrypoint ? [].concat(t.entrypoint) : undefined),
|
||||
Cmd: t.command ? [].concat(t.command, t.args || []) : (t.args ? [].concat(t.args) : undefined),
|
||||
Env: env,
|
||||
Labels: {
|
||||
[LABEL_USER_KEY]: userId,
|
||||
[ANNO_DISPLAY_NAME]: displayName || t.name
|
||||
},
|
||||
HostConfig: {
|
||||
AutoRemove: true
|
||||
},
|
||||
name: nameActual
|
||||
}
|
||||
|
||||
const container = await docker.createContainer(createOpts) // create
|
||||
await container.start() // start
|
||||
return { id: container.id, name: nameActual }
|
||||
}
|
||||
|
||||
// persistence of schedules (for restart durability)
|
||||
ensureDir(DATA_DIR)
|
||||
const persist = () => {
|
||||
const defs = [...tasks.values()].map(v => v.def)
|
||||
writeJsonFile(SCHEDULES_FILE, { items: defs })
|
||||
}
|
||||
const restore = () => {
|
||||
const saved = readJsonFile(SCHEDULES_FILE, { items: [] }).items || []
|
||||
for (const def of saved) scheduleOrReplace(def)
|
||||
}
|
||||
|
||||
// schedule management (create/update)
|
||||
function scheduleOrReplace(def) {
|
||||
// stop existing
|
||||
const key = def.name
|
||||
if (tasks.has(key)) { try { tasks.get(key).task.stop() } catch { } tasks.delete(key) }
|
||||
|
||||
// forbid overlapping runs per schedule (like argo's Forbid)
|
||||
let running = false
|
||||
const task = cron.schedule(def.schedule, async () => {
|
||||
if (running) return
|
||||
running = true
|
||||
try {
|
||||
await runContainer({
|
||||
displayName: def.displayName,
|
||||
template: def.template,
|
||||
parameters: def.parameters,
|
||||
userId: def.userId,
|
||||
entrypoint: def.entrypoint
|
||||
})
|
||||
// one-shot: stop after first success
|
||||
if (def.oneShot) {
|
||||
try { task.stop() } catch { }
|
||||
tasks.delete(key)
|
||||
persist()
|
||||
}
|
||||
} catch (e) {
|
||||
// you could log or collect errors here
|
||||
} finally {
|
||||
running = false
|
||||
}
|
||||
}, { timezone: def.timezone || DEFAULT_TZ })
|
||||
|
||||
tasks.set(key, { task, def })
|
||||
task.start()
|
||||
persist()
|
||||
}
|
||||
|
||||
// convert input to schedule def
|
||||
const toScheduleDef = ({ name, when, tz = DEFAULT_TZ, oneShot = false, template = { name: '' }, parameters = {}, entrypoint, userId }) => {
|
||||
const schedule = when?.cron ?? cronFromISO(when?.iso, tz)
|
||||
return {
|
||||
name: scopedName(name, userId),
|
||||
displayName: name,
|
||||
userId,
|
||||
timezone: tz,
|
||||
schedule,
|
||||
oneShot: Boolean(oneShot),
|
||||
template,
|
||||
parameters,
|
||||
entrypoint
|
||||
}
|
||||
}
|
||||
|
||||
const publicDir = path.join(__dirname, 'public');
|
||||
|
||||
function reqToURL(req) {
|
||||
let pathname = '/';
|
||||
try {
|
||||
pathname = new URL(req.url, `http://${req.headers.host || 'localhost'}`).pathname;
|
||||
} catch {
|
||||
pathname = req.url || '/';
|
||||
}
|
||||
|
||||
return pathname;
|
||||
}
|
||||
|
||||
// http server
|
||||
const server = http.createServer(async (req, res) => {
|
||||
try {
|
||||
// very light cors
|
||||
const origin = req.headers.origin || '*'
|
||||
res.setHeader('access-control-allow-origin', origin)
|
||||
res.setHeader('vary', 'origin')
|
||||
res.setHeader('access-control-allow-headers', 'content-type, x-user-id')
|
||||
res.setHeader('access-control-allow-methods', 'GET, POST, DELETE, OPTIONS')
|
||||
if (req.method === 'OPTIONS') return res.writeHead(204).end()
|
||||
|
||||
const pathname = reqToURL(req),
|
||||
allowed = ['/', '/index.html', '/script.js', '/style.css'];
|
||||
|
||||
|
||||
//#region GET requests
|
||||
|
||||
// list schedules for the calling user
|
||||
if (req.method === 'GET' && pathname === '/api/schedules') {
|
||||
const userId = requireUserId(req),
|
||||
items = [...tasks.values()]
|
||||
.map(v => v.def)
|
||||
.filter(d => d.userId === userId)
|
||||
.map(d => ({
|
||||
name: d.name,
|
||||
displayName: d.displayName,
|
||||
userId: d.userId,
|
||||
timezone: d.timezone,
|
||||
schedules: [d.schedule],
|
||||
oneShot: d.oneShot,
|
||||
templateRef: d.template,
|
||||
entrypoint: d.entrypoint
|
||||
}));
|
||||
|
||||
return res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true, items }))
|
||||
}
|
||||
|
||||
// list "workflow templates" => just expose templates.json names
|
||||
if (req.method === 'GET' && pathname === '/api/workflowtemplates') {
|
||||
const items = loadTemplates().map(t => ({ name: t.name }))
|
||||
return res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true, items }))
|
||||
}
|
||||
|
||||
if (req.method === 'GET' && allowed.includes(pathname)) {
|
||||
try {
|
||||
const fileName = pathname === '/' ? 'index.html' : pathname.slice(1),
|
||||
filePath = path.join(publicDir, fileName),
|
||||
ext = path.extname(fileName).toLowerCase(),
|
||||
type = ext === '.js' ? 'application/javascript' : ext === '.css' ? 'text/css' : 'text/html; charset=utf-8',
|
||||
content = fs.readFileSync(filePath, 'utf8')
|
||||
return res.writeHead(200, { 'content-type': type }).end(content);
|
||||
} catch {
|
||||
return res.writeHead(404).end('ui not found');
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// DO NOT PUT ANY GET REQUESTS BELOW THIS LINE, THEY WILL FAIL
|
||||
|
||||
//#endregion
|
||||
|
||||
// create/update a user-scoped schedule
|
||||
if (req.method === 'POST' && pathname === '/schedules') {
|
||||
const userId = requireUserId(req),
|
||||
input = await readBodyJson(req),
|
||||
def = toScheduleDef({ ...input, userId })
|
||||
scheduleOrReplace(def)
|
||||
return res.writeHead(201, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true }))
|
||||
}
|
||||
|
||||
// run a job now for the calling user (no schedule)
|
||||
if (req.method === 'POST' && pathname === '/run-now') {
|
||||
const userId = requireUserId(req),
|
||||
input = await readBodyJson(req)
|
||||
await runContainer({
|
||||
displayName: input.name,
|
||||
template: input.template,
|
||||
parameters: input.parameters || {},
|
||||
userId,
|
||||
entrypoint: input.entrypoint
|
||||
})
|
||||
return res.writeHead(201, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true }))
|
||||
}
|
||||
|
||||
// delete a schedule owned by the calling user
|
||||
if (req.method === 'DELETE' && pathname.startsWith('/schedules/')) {
|
||||
const userId = requireUserId(req),
|
||||
nameParam = decodeURIComponent(pathname.split('/').pop()),
|
||||
// stored names are already scoped; accept either raw or scoped
|
||||
key = tasks.has(nameParam) ? nameParam : scopedName(nameParam, userId),
|
||||
existing = tasks.get(key)?.def
|
||||
|
||||
if (!existing) return res.writeHead(404, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: false, error: 'not found' }))
|
||||
if (existing.userId !== userId) return res.writeHead(403, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: false, error: 'forbidden: schedule not owned by this user' }))
|
||||
|
||||
try { tasks.get(key).task.stop() } catch { }
|
||||
tasks.delete(key)
|
||||
persist()
|
||||
return res.writeHead(204).end()
|
||||
}
|
||||
|
||||
res.writeHead(404).end('not found')
|
||||
} catch (e) {
|
||||
const code = Number(e.status) || 500
|
||||
res.writeHead(code, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: false, error: e.message || String(e) }))
|
||||
}
|
||||
})
|
||||
|
||||
// boot
|
||||
restore()
|
||||
server.listen(PORT, "0.0.0.0", () => console.log(`schedules api listening on ${server.address().address}:${PORT}`));
|
||||
Reference in New Issue
Block a user