// bun run server.mjs // tiny schedules api to manage argo cronworkflows/workflows via k8s CRDs // comments intentionally lowercase per original style import http from 'http' import { KubeConfig, CustomObjectsApi } from '@kubernetes/client-node' const GROUP = 'argoproj.io' const VERSION = 'v1alpha1' const CRON_PLURAL = 'cronworkflows' const WF_PLURAL = 'workflows' const NAMESPACE = process.env.NS || 'argo' // load cluster credentials (or fallback to local kubeconfig for dev) const kc = new KubeConfig() try { kc.loadFromCluster() } catch { kc.loadFromDefault() } const co = kc.makeApiClient(CustomObjectsApi) // helper: build cron string from an iso timestamp in a tz const cronFromISO = (iso, tz = 'America/New_York') => { const dt = new Date(iso) const parts = new Intl.DateTimeFormat('en-US', { timeZone: tz, year: 'numeric', month: 'numeric', day: 'numeric', hour: 'numeric', minute: '2-digit', hour12: false }).formatToParts(dt).reduce((a, p) => (a[p.type] = p.value, a), {}) const m = Number(parts.month), d = Number(parts.day), h = Number(parts.hour), min = Number(parts.minute) return `${min} ${h} ${d} ${m} *` } // create or update a cronworkflow that runs a workflowtemplate async function upsertCronWorkflow({ name, when, tz = 'America/New_York', oneShot = false, template = { name: '', clusterScope: false }, parameters = {}, entrypoint }) { const schedule = when.cron ?? cronFromISO(when.iso, tz) const args = Object.entries(parameters).map(([name, value]) => ({ name, value })) const body = { apiVersion: `${GROUP}/${VERSION}`, kind: 'CronWorkflow', metadata: { name }, spec: { timezone: tz, schedules: [schedule], concurrencyPolicy: 'Forbid', ...(oneShot ? { stopStrategy: { expression: 'cronworkflow.succeeded >= 1' } } : {}), workflowSpec: { ...(entrypoint ? { entrypoint } : {}), arguments: args.length ? { parameters: args } : undefined, workflowTemplateRef: { name: template.name, ...(template.clusterScope ? { clusterScope: true } : {}) } } } } // try patch, else create try { await co.patchNamespacedCustomObject( GROUP, VERSION, NAMESPACE, CRON_PLURAL, name, body, undefined, undefined, undefined, { headers: { 'content-type': 'application/merge-patch+json' } } ) } catch { await co.createNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, body) } } // run immediately (no schedule) by creating a workflow from the same template async function runNow({ name, template, parameters = {}, entrypoint }) { const args = Object.entries(parameters).map(([name, value]) => ({ name, value })) const wf = { apiVersion: `${GROUP}/${VERSION}`, kind: 'Workflow', metadata: { generateName: `${name}-` }, spec: { ...(entrypoint ? { entrypoint } : {}), arguments: args.length ? { parameters: args } : undefined, workflowTemplateRef: { name: template.name, ...(template.clusterScope ? { clusterScope: true } : {}) } } } await co.createNamespacedCustomObject(GROUP, VERSION, NAMESPACE, WF_PLURAL, wf) } // tiny http api const server = http.createServer(async (req, res) => { try { if (req.method === 'POST' && req.url === '/schedules') { const input = JSON.parse(await new Promise(r => { let d = ''; req.on('data', c => d += c); req.on('end', () => r(d)) })) await upsertCronWorkflow(input) res.writeHead(201).end(JSON.stringify({ ok: true })) return } if (req.method === 'POST' && req.url === '/run-now') { const input = JSON.parse(await new Promise(r => { let d = ''; req.on('data', c => d += c); req.on('end', () => r(d)) })) await runNow(input) res.writeHead(201).end(JSON.stringify({ ok: true })) return } if (req.method === 'DELETE' && req.url?.startsWith('/schedules/')) { const name = decodeURIComponent(req.url.split('/').pop()) await co.deleteNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, name) res.writeHead(204).end() return } res.writeHead(404).end('not found') } catch (e) { res.writeHead(500).end(JSON.stringify({ ok: false, error: e.message })) } }) const port = Number(process.env.PORT) || 3000 server.listen(port, () => console.log(`schedules api listening on :${port}`))