attempting to add scheduler UI
This commit is contained in:
+213
-103
@@ -1,123 +1,233 @@
|
||||
// bun run server.mjs
|
||||
// tiny schedules api to manage argo cronworkflows/workflows via k8s CRDs
|
||||
// comments intentionally lowercase per original style
|
||||
|
||||
import http from 'http'
|
||||
import fs from 'fs'
|
||||
import path from 'path'
|
||||
import { fileURLToPath } from 'url'
|
||||
import { KubeConfig, CustomObjectsApi } from '@kubernetes/client-node'
|
||||
|
||||
const GROUP = 'argoproj.io'
|
||||
const VERSION = 'v1alpha1'
|
||||
const CRON_PLURAL = 'cronworkflows'
|
||||
const WF_PLURAL = 'workflows'
|
||||
const NAMESPACE = process.env.NS || 'argo'
|
||||
const GROUP = 'argoproj.io',
|
||||
VERSION = 'v1alpha1',
|
||||
CRON_PLURAL = 'cronworkflows',
|
||||
WF_PLURAL = 'workflows',
|
||||
NAMESPACE = process.env.NS || 'argo',
|
||||
|
||||
// load cluster credentials (or fallback to local kubeconfig for dev)
|
||||
const kc = new KubeConfig()
|
||||
// k8s label/annotation keys (must be lowercase dns-labels)
|
||||
LABEL_USER_KEY = 'openwebui.user-id',
|
||||
ANNO_DISPLAY_NAME = 'openwebui/display-name';
|
||||
|
||||
// load cluster credentials
|
||||
const kc = new KubeConfig();
|
||||
try { kc.loadFromCluster() } catch { kc.loadFromDefault() }
|
||||
const co = kc.makeApiClient(CustomObjectsApi)
|
||||
|
||||
// helper: build cron string from an iso timestamp in a tz
|
||||
const co = kc.makeApiClient(CustomObjectsApi);
|
||||
|
||||
// build cron string from an iso timestamp in a tz
|
||||
const cronFromISO = (iso, tz = 'America/New_York') => {
|
||||
const dt = new Date(iso)
|
||||
const parts = new Intl.DateTimeFormat('en-US', {
|
||||
timeZone: tz, year: 'numeric', month: 'numeric', day: 'numeric',
|
||||
hour: 'numeric', minute: '2-digit', hour12: false
|
||||
}).formatToParts(dt).reduce((a, p) => (a[p.type] = p.value, a), {})
|
||||
const m = Number(parts.month), d = Number(parts.day), h = Number(parts.hour), min = Number(parts.minute)
|
||||
return `${min} ${h} ${d} ${m} *`
|
||||
const dt = new Date(iso),
|
||||
parts = new Intl.DateTimeFormat('en-US', {
|
||||
timeZone: tz, year: 'numeric', month: 'numeric', day: 'numeric',
|
||||
hour: 'numeric', minute: '2-digit', hour12: false
|
||||
}).formatToParts(dt).reduce((a, p) => (a[p.type] = p.value, a), {}),
|
||||
|
||||
m = Number(parts.month), d = Number(parts.day), h = Number(parts.hour), min = Number(parts.minute);
|
||||
|
||||
return `${min} ${h} ${d} ${m} *`;
|
||||
}
|
||||
|
||||
// create or update a cronworkflow that runs a workflowtemplate
|
||||
// derive a k8s-safe, user-scoped name and preserve a human display name
|
||||
const scopedName = (name, userId) => {
|
||||
// keep to dns-1123 by trimming/normalizing a bit; add an 8-char user suffix for uniqueness
|
||||
const base = String(name).toLowerCase().replace(/[^a-z0-9-]+/g, '-').replace(/^-+|-+$/g, '').slice(0, 40),
|
||||
suffix = String(userId).toLowerCase().replace(/[^a-z0-9]+/g, '').slice(0, 8) || 'anon';
|
||||
return `${base}--u-${suffix}`;
|
||||
}
|
||||
|
||||
// ensure we have a user id header
|
||||
const requireUserId = (req) => {
|
||||
const userId = String(req.headers['x-user-id'] || '').trim();
|
||||
if (!userId) throw Object.assign(new Error('missing x-user-id header'), { status: 401 });
|
||||
return userId;
|
||||
}
|
||||
|
||||
// normalize parameters and force-inject user_id
|
||||
const buildParams = (parameters = {}, userId) => {
|
||||
const merged = { ...parameters, user_id: userId },
|
||||
args = Object.entries(merged).map(([name, value]) => ({ name, value }));
|
||||
return args.length ? { parameters: args } : undefined;
|
||||
}
|
||||
|
||||
// create or update a cronworkflow that runs a workflowtemplate (scoped to user)
|
||||
async function upsertCronWorkflow({
|
||||
name, when, tz = 'America/New_York', oneShot = false,
|
||||
template = { name: '', clusterScope: false },
|
||||
parameters = {}, entrypoint
|
||||
name, when, tz = 'America/New_York', oneShot = false,
|
||||
template = { name: '', clusterScope: false },
|
||||
parameters = {}, entrypoint, userId
|
||||
}) {
|
||||
const schedule = when.cron ?? cronFromISO(when.iso, tz)
|
||||
const args = Object.entries(parameters).map(([name, value]) => ({ name, value }))
|
||||
const schedule = when.cron ?? cronFromISO(when.iso, tz),
|
||||
nameActual = scopedName(name, userId),
|
||||
|
||||
const body = {
|
||||
apiVersion: `${GROUP}/${VERSION}`,
|
||||
kind: 'CronWorkflow',
|
||||
metadata: { name },
|
||||
spec: {
|
||||
timezone: tz,
|
||||
schedules: [schedule],
|
||||
concurrencyPolicy: 'Forbid',
|
||||
...(oneShot ? { stopStrategy: { expression: 'cronworkflow.succeeded >= 1' } } : {}),
|
||||
workflowSpec: {
|
||||
...(entrypoint ? { entrypoint } : {}),
|
||||
arguments: args.length ? { parameters: args } : undefined,
|
||||
workflowTemplateRef: {
|
||||
name: template.name,
|
||||
...(template.clusterScope ? { clusterScope: true } : {})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
body = {
|
||||
apiVersion: `${GROUP}/${VERSION}`,
|
||||
kind: 'CronWorkflow',
|
||||
metadata: {
|
||||
name: nameActual,
|
||||
labels: { [LABEL_USER_KEY]: userId },
|
||||
annotations: { [ANNO_DISPLAY_NAME]: name },
|
||||
},
|
||||
spec: {
|
||||
timezone: tz,
|
||||
schedules: [schedule],
|
||||
concurrencyPolicy: 'Forbid',
|
||||
...(oneShot ? { stopStrategy: { expression: 'cronworkflow.succeeded >= 1' } } : {}),
|
||||
workflowSpec: {
|
||||
...(entrypoint ? { entrypoint } : {}),
|
||||
arguments: buildParams(parameters, userId),
|
||||
workflowTemplateRef: {
|
||||
name: template.name,
|
||||
...(template.clusterScope ? { clusterScope: true } : {})
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// try patch, else create
|
||||
try {
|
||||
await co.patchNamespacedCustomObject(
|
||||
GROUP, VERSION, NAMESPACE, CRON_PLURAL, name, body,
|
||||
undefined, undefined, undefined,
|
||||
{ headers: { 'content-type': 'application/merge-patch+json' } }
|
||||
)
|
||||
} catch {
|
||||
await co.createNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, body)
|
||||
}
|
||||
// try patch, else create
|
||||
try {
|
||||
await co.patchNamespacedCustomObject(
|
||||
GROUP, VERSION, NAMESPACE, CRON_PLURAL, nameActual, body,
|
||||
undefined, undefined, undefined,
|
||||
{ headers: { 'content-type': 'application/merge-patch+json' } }
|
||||
);
|
||||
} catch {
|
||||
await co.createNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, body);
|
||||
}
|
||||
}
|
||||
|
||||
// run immediately (no schedule) by creating a workflow from the same template
|
||||
async function runNow({ name, template, parameters = {}, entrypoint }) {
|
||||
const args = Object.entries(parameters).map(([name, value]) => ({ name, value }))
|
||||
const wf = {
|
||||
apiVersion: `${GROUP}/${VERSION}`,
|
||||
kind: 'Workflow',
|
||||
metadata: { generateName: `${name}-` },
|
||||
spec: {
|
||||
...(entrypoint ? { entrypoint } : {}),
|
||||
arguments: args.length ? { parameters: args } : undefined,
|
||||
workflowTemplateRef: {
|
||||
name: template.name,
|
||||
...(template.clusterScope ? { clusterScope: true } : {})
|
||||
}
|
||||
}
|
||||
}
|
||||
await co.createNamespacedCustomObject(GROUP, VERSION, NAMESPACE, WF_PLURAL, wf)
|
||||
// run immediately (no schedule) by creating a workflow from the same template (scoped to user)
|
||||
async function runNow({ name, template, parameters = {}, entrypoint, userId }) {
|
||||
const wf = {
|
||||
apiVersion: `${GROUP}/${VERSION}`,
|
||||
kind: 'Workflow',
|
||||
metadata: {
|
||||
generateName: `${scopedName(name, userId)}-`,
|
||||
labels: { [LABEL_USER_KEY]: userId },
|
||||
annotations: { [ANNO_DISPLAY_NAME]: name },
|
||||
},
|
||||
spec: {
|
||||
...(entrypoint ? { entrypoint } : {}),
|
||||
arguments: buildParams(parameters, userId),
|
||||
workflowTemplateRef: {
|
||||
name: template.name,
|
||||
...(template.clusterScope ? { clusterScope: true } : {})
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
await co.createNamespacedCustomObject(GROUP, VERSION, NAMESPACE, WF_PLURAL, wf);
|
||||
}
|
||||
|
||||
// tiny http api
|
||||
const __filename = fileURLToPath(import.meta.url),
|
||||
__dirname = path.dirname(__filename),
|
||||
publicDir = path.join(__dirname, 'public');
|
||||
|
||||
// tiny json helper
|
||||
const readJson = (req) => new Promise((resolve, reject) => {
|
||||
let d = ''; req.on('data', c => d += c);
|
||||
req.on('end', () => { try { resolve(JSON.parse(d || '{}')) } catch (e) { reject(e) } });
|
||||
req.on('error', reject);
|
||||
});
|
||||
|
||||
const server = http.createServer(async (req, res) => {
|
||||
try {
|
||||
if (req.method === 'POST' && req.url === '/schedules') {
|
||||
const input = JSON.parse(await new Promise(r => {
|
||||
let d = ''; req.on('data', c => d += c); req.on('end', () => r(d))
|
||||
}))
|
||||
await upsertCronWorkflow(input)
|
||||
res.writeHead(201).end(JSON.stringify({ ok: true }))
|
||||
return
|
||||
}
|
||||
if (req.method === 'POST' && req.url === '/run-now') {
|
||||
const input = JSON.parse(await new Promise(r => {
|
||||
let d = ''; req.on('data', c => d += c); req.on('end', () => r(d))
|
||||
}))
|
||||
await runNow(input)
|
||||
res.writeHead(201).end(JSON.stringify({ ok: true }))
|
||||
return
|
||||
}
|
||||
if (req.method === 'DELETE' && req.url?.startsWith('/schedules/')) {
|
||||
const name = decodeURIComponent(req.url.split('/').pop())
|
||||
await co.deleteNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, name)
|
||||
res.writeHead(204).end()
|
||||
return
|
||||
}
|
||||
res.writeHead(404).end('not found')
|
||||
} catch (e) {
|
||||
res.writeHead(500).end(JSON.stringify({ ok: false, error: e.message }))
|
||||
}
|
||||
})
|
||||
try {
|
||||
// death
|
||||
const origin = req.headers.origin || '*'
|
||||
res.setHeader('access-control-allow-origin', origin)
|
||||
res.setHeader('vary', 'origin')
|
||||
res.setHeader('access-control-allow-headers', 'content-type, x-user-id')
|
||||
res.setHeader('access-control-allow-methods', 'GET, POST, DELETE, OPTIONS')
|
||||
if (req.method === 'OPTIONS') return res.writeHead(204).end()
|
||||
|
||||
const port = Number(process.env.PORT) || 3000
|
||||
server.listen(port, () => console.log(`schedules api listening on :${port}`))
|
||||
// minimal static ui
|
||||
if (req.method === 'GET' && (req.url === '/' || req.url === '/index.html')) {
|
||||
try {
|
||||
const html = fs.readFileSync(path.join(publicDir, 'index.html'), 'utf8');
|
||||
res.writeHead(200, { 'content-type': 'text/html; charset=utf-8' }).end(html);
|
||||
} catch {
|
||||
res.writeHead(404).end('ui not found');
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// list CronWorkflows for the calling user
|
||||
if (req.method === 'GET' && req.url === '/api/schedules') {
|
||||
const userId = requireUserId(req),
|
||||
list = await co.listNamespacedCustomObject(
|
||||
GROUP, VERSION, NAMESPACE, CRON_PLURAL,
|
||||
undefined, undefined, undefined, `${LABEL_USER_KEY}=${userId}` // labelSelector
|
||||
),
|
||||
items = (list.body.items || []).map(it => ({
|
||||
name: it.metadata?.name,
|
||||
displayName: it.metadata?.annotations?.[ANNO_DISPLAY_NAME] || it.metadata?.name,
|
||||
userId: it.metadata?.labels?.[LABEL_USER_KEY],
|
||||
timezone: it.spec?.timezone,
|
||||
schedules: it.spec?.schedules,
|
||||
oneShot: Boolean(it.spec?.stopStrategy),
|
||||
templateRef: it.spec?.workflowSpec?.workflowTemplateRef,
|
||||
entrypoint: it.spec?.workflowSpec?.entrypoint,
|
||||
}));
|
||||
|
||||
return res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true, items }));
|
||||
}
|
||||
|
||||
// list WorkflowTemplates for UI (shared)
|
||||
if (req.method === 'GET' && req.url === '/api/workflowtemplates') {
|
||||
const list = await co.listNamespacedCustomObject(GROUP, VERSION, NAMESPACE, 'workflowtemplates'),
|
||||
items = (list.body.items || []).map(it => ({ name: it.metadata?.name }));
|
||||
|
||||
return res.writeHead(200, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true, items }));
|
||||
}
|
||||
|
||||
// create/update a user-scoped schedule
|
||||
if (req.method === 'POST' && req.url === '/schedules') {
|
||||
const userId = requireUserId(req),
|
||||
input = await readJson(req);
|
||||
|
||||
await upsertCronWorkflow({ ...input, userId });
|
||||
return res.writeHead(201, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true }));
|
||||
}
|
||||
|
||||
// run a job now for the calling user
|
||||
if (req.method === 'POST' && req.url === '/run-now') {
|
||||
const userId = requireUserId(req),
|
||||
input = await readJson(req);
|
||||
|
||||
await runNow({ ...input, userId });
|
||||
return res.writeHead(201, { 'content-type': 'application/json' }).end(JSON.stringify({ ok: true }));
|
||||
}
|
||||
|
||||
// delete a schedule owned by the calling user
|
||||
if (req.method === 'DELETE' && req.url?.startsWith('/schedules/')) {
|
||||
const userId = requireUserId(req),
|
||||
name = decodeURIComponent(req.url.split('/').pop());
|
||||
|
||||
// guard: verify ownership via label before deletion
|
||||
const obj = await co.getNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, name),
|
||||
owner = obj.body?.metadata?.labels?.[LABEL_USER_KEY];
|
||||
|
||||
if (owner !== userId) {
|
||||
res.writeHead(403, { 'content-type': 'application/json' })
|
||||
.end(JSON.stringify({ ok: false, error: 'forbidden: schedule not owned by this user' }));
|
||||
return;
|
||||
}
|
||||
|
||||
await co.deleteNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, name);
|
||||
return res.writeHead(204).end();
|
||||
}
|
||||
|
||||
res.writeHead(404).end('not found');
|
||||
} catch (e) {
|
||||
const code = Number(e.status) || 500;
|
||||
res.writeHead(code, { 'content-type': 'application/json' })
|
||||
.end(JSON.stringify({ ok: false, error: e.message || String(e) }));
|
||||
}
|
||||
});
|
||||
|
||||
const port = Number(process.env.PORT) || 12253;
|
||||
server.listen(port, () => console.log(`schedules api listening on :${port}`));
|
||||
|
||||
Reference in New Issue
Block a user