124 lines
4.3 KiB
JavaScript
124 lines
4.3 KiB
JavaScript
// bun run server.mjs
|
|
// tiny schedules api to manage argo cronworkflows/workflows via k8s CRDs
|
|
// comments intentionally lowercase per original style
|
|
|
|
import http from 'http'
|
|
import { KubeConfig, CustomObjectsApi } from '@kubernetes/client-node'
|
|
|
|
// coordinates of the argo custom resources this service talks to:
// api group, api version, and the plural resource names used in request paths
const GROUP = 'argoproj.io'
const VERSION = 'v1alpha1'
const CRON_PLURAL = 'cronworkflows'
const WF_PLURAL = 'workflows'

// namespace used for every request; the NS env var overrides it,
// and an unset or empty NS means 'argo'
const NAMESPACE = process.env.NS ? process.env.NS : 'argo'
|
|
|
|
// load kubernetes credentials: in-cluster config when running inside a pod,
// otherwise the local kubeconfig (dev machines).
// the branch is made on KUBERNETES_SERVICE_HOST rather than on loadFromCluster() throwing.
// NOTE(review): client versions i have seen build a config from unset env vars without
// throwing, which would make a catch-only fallback dead code — confirm against the pinned version
const kc = new KubeConfig()
if (process.env.KUBERNETES_SERVICE_HOST) {
  try {
    kc.loadFromCluster()
  } catch {
    // best-effort, same as before: if in-cluster loading fails, try the default locations
    kc.loadFromDefault()
  }
} else {
  kc.loadFromDefault()
}

// client for custom resources (used for cronworkflows and workflows below)
const co = kc.makeApiClient(CustomObjectsApi)
|
|
|
|
/**
 * build a 5-field cron expression ("minute hour day month *") for the wall-clock
 * time that a timestamp has in a given time zone. the year is not represented.
 *
 * @param {string|number|Date} iso - anything `new Date()` accepts, normally an iso-8601 string
 * @param {string} [tz='America/New_York'] - time zone name passed to Intl.DateTimeFormat
 * @returns {string} cron expression such as "30 12 4 7 *"
 * @throws {RangeError} when `iso` does not parse to a valid date, or `tz` is rejected by Intl
 */
const cronFromISO = (iso, tz = 'America/New_York') => {
  const dt = new Date(iso)
  if (Number.isNaN(dt.getTime())) {
    // fail with a message that names the input instead of Intl's generic "invalid time value"
    throw new RangeError(`invalid timestamp: ${String(iso)}`)
  }

  const formatter = new Intl.DateTimeFormat('en-US', {
    timeZone: tz,
    month: 'numeric',
    day: 'numeric',
    hour: 'numeric',
    minute: '2-digit',
    // h23 keeps midnight as hour 0. `hour12: false` can resolve to the h24 cycle for en-US
    // on some engines and report midnight as "24", which is not a valid cron hour
    hourCycle: 'h23'
  })

  // collapse [{ type, value }, ...] into { month, day, hour, minute, ... }
  const parts = Object.fromEntries(
    formatter.formatToParts(dt).map(({ type, value }) => [type, value])
  )

  const minute = Number(parts.minute)
  // the modulo is a guard for engines that still answer "24" for midnight
  const hour = Number(parts.hour) % 24
  const day = Number(parts.day)
  const month = Number(parts.month)

  return `${minute} ${hour} ${day} ${month} *`
}
|
|
|
|
// true when a kubernetes client error reports http 404 (the object does not exist).
// NOTE(review): the property carrying the status differs between client versions, so
// statusCode / response.statusCode / code / body.code are all checked — confirm against the pinned version
const isNotFound = (err) =>
  [err?.statusCode, err?.response?.statusCode, err?.code, err?.body?.code].includes(404)

/**
 * create or update a cronworkflow that runs a workflowtemplate.
 *
 * @param {object} input
 * @param {string} input.name - metadata.name of the cronworkflow
 * @param {{cron?: string, iso?: string}} input.when - `cron` is used verbatim when present,
 *   otherwise `iso` is converted with cronFromISO(iso, tz)
 * @param {string} [input.tz='America/New_York'] - written to spec.timezone
 * @param {boolean} [input.oneShot=false] - when true, adds a stopStrategy with the
 *   expression 'cronworkflow.succeeded >= 1'
 * @param {{name: string, clusterScope?: boolean}} input.template - target of workflowTemplateRef
 * @param {Object<string, *>} [input.parameters={}] - becomes workflowSpec.arguments.parameters
 * @param {string} [input.entrypoint] - optional workflowSpec.entrypoint
 * @returns {Promise<void>}
 * @throws {TypeError} when name, when.cron/when.iso, or template.name is missing
 * @throws whatever the kubernetes client rejects with, except a 404 on patch
 */
async function upsertCronWorkflow({
  name, when, tz = 'America/New_York', oneShot = false,
  template = { name: '', clusterScope: false },
  parameters = {}, entrypoint
}) {
  // validate before touching the cluster so bad input never produces a half-valid object
  if (typeof name !== 'string' || name === '') {
    throw new TypeError('name is required')
  }
  if (when?.cron == null && when?.iso == null) {
    throw new TypeError('when.cron or when.iso is required')
  }
  if (!template?.name) {
    throw new TypeError('template.name is required')
  }

  const schedule = when.cron ?? cronFromISO(when.iso, tz)
  // `?? {}` also tolerates an explicit null coming from json
  const args = Object.entries(parameters ?? {}).map(([key, value]) => ({ name: key, value }))

  const body = {
    apiVersion: `${GROUP}/${VERSION}`,
    kind: 'CronWorkflow',
    metadata: { name },
    spec: {
      timezone: tz,
      schedules: [schedule],
      concurrencyPolicy: 'Forbid',
      ...(oneShot ? { stopStrategy: { expression: 'cronworkflow.succeeded >= 1' } } : {}),
      workflowSpec: {
        ...(entrypoint ? { entrypoint } : {}),
        ...(args.length ? { arguments: { parameters: args } } : {}),
        workflowTemplateRef: {
          name: template.name,
          ...(template.clusterScope ? { clusterScope: true } : {})
        }
      }
    }
  }

  // try patch, and create only when the object does not exist yet.
  // any other patch failure (forbidden, invalid, network) is rethrown so the caller sees
  // the real cause instead of a misleading error from a follow-up create.
  // NOTE(review): with a merge patch, keys omitted from `body` (stopStrategy, entrypoint,
  // arguments, clusterScope) are left as they are on an existing object rather than cleared
  try {
    await co.patchNamespacedCustomObject(
      GROUP, VERSION, NAMESPACE, CRON_PLURAL, name, body,
      undefined, undefined, undefined,
      { headers: { 'content-type': 'application/merge-patch+json' } }
    )
  } catch (err) {
    if (!isNotFound(err)) throw err
    await co.createNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, body)
  }
}
|
|
|
|
/**
 * run immediately (no schedule) by creating a workflow that references a workflowtemplate.
 *
 * @param {object} input
 * @param {string} input.name - prefix for metadata.generateName (`${name}-`)
 * @param {{name: string, clusterScope?: boolean}} input.template - target of workflowTemplateRef
 * @param {Object<string, *>} [input.parameters={}] - becomes spec.arguments.parameters
 * @param {string} [input.entrypoint] - optional spec.entrypoint
 * @returns {Promise<void>}
 * @throws {TypeError} when name or template.name is missing
 * @throws whatever the kubernetes client rejects with
 */
async function runNow({ name, template, parameters = {}, entrypoint }) {
  // without these checks a missing name would yield generateName 'undefined-'
  // and a missing template would crash with an opaque property-access error
  if (typeof name !== 'string' || name === '') {
    throw new TypeError('name is required')
  }
  if (!template?.name) {
    throw new TypeError('template.name is required')
  }

  // `?? {}` also tolerates an explicit null coming from json
  const args = Object.entries(parameters ?? {}).map(([key, value]) => ({ name: key, value }))

  const wf = {
    apiVersion: `${GROUP}/${VERSION}`,
    kind: 'Workflow',
    metadata: { generateName: `${name}-` },
    spec: {
      ...(entrypoint ? { entrypoint } : {}),
      ...(args.length ? { arguments: { parameters: args } } : {}),
      workflowTemplateRef: {
        name: template.name,
        ...(template.clusterScope ? { clusterScope: true } : {})
      }
    }
  }

  await co.createNamespacedCustomObject(GROUP, VERSION, NAMESPACE, WF_PLURAL, wf)
}
|
|
|
|
// tiny http api
//   POST   /schedules         -> upsertCronWorkflow(json body), 201
//   POST   /run-now           -> runNow(json body), 201
//   DELETE /schedules/<name>  -> delete the cronworkflow, 204
//   anything else             -> 404

// error raised for problems with the request itself; `status` is the http code to answer with
class RequestError extends Error {
  constructor(status, message) {
    super(message)
    this.name = 'RequestError'
    this.status = status
  }
}

// read the whole request body and parse it as a json object.
// chunks are joined as bytes before decoding, so a multi-byte utf-8 character split across
// two chunks is not corrupted; a stream error rejects here instead of leaving the request hanging
async function readJsonBody(req) {
  const chunks = []
  for await (const chunk of req) {
    chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk)
  }

  let parsed
  try {
    parsed = JSON.parse(Buffer.concat(chunks).toString('utf8'))
  } catch (err) {
    throw new RequestError(400, `invalid json body: ${err.message}`)
  }
  // the handlers destructure the body, so anything but a plain object is a client error
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    throw new RequestError(400, 'request body must be a json object')
  }
  return parsed
}

// write a json response with the matching content-type
const sendJson = (res, status, payload) =>
  res.writeHead(status, { 'content-type': 'application/json' }).end(JSON.stringify(payload))

const server = http.createServer(async (req, res) => {
  try {
    // route on the path only, so a query string neither defeats matching nor leaks into names
    const { pathname } = new URL(req.url ?? '/', 'http://localhost')

    if (req.method === 'POST' && pathname === '/schedules') {
      const input = await readJsonBody(req)
      await upsertCronWorkflow(input)
      sendJson(res, 201, { ok: true })
      return
    }

    if (req.method === 'POST' && pathname === '/run-now') {
      const input = await readJsonBody(req)
      await runNow(input)
      sendJson(res, 201, { ok: true })
      return
    }

    if (req.method === 'DELETE' && pathname.startsWith('/schedules/')) {
      // the name is the last path segment, percent-decoded
      let name
      try {
        name = decodeURIComponent(pathname.split('/').pop())
      } catch {
        throw new RequestError(400, 'malformed schedule name')
      }
      if (name === '') {
        throw new RequestError(400, 'schedule name is required')
      }
      await co.deleteNamespacedCustomObject(GROUP, VERSION, NAMESPACE, CRON_PLURAL, name)
      res.writeHead(204).end()
      return
    }

    res.writeHead(404).end('not found')
  } catch (e) {
    // a response that already started cannot get a new status line; just finish it
    if (res.headersSent) {
      res.end()
      return
    }
    // request problems answer with their own status, everything else stays a 500
    const status = e instanceof RequestError ? e.status : 500
    sendJson(res, status, { ok: false, error: e?.message ?? String(e) })
  }
})
|
|
|
|
// listen port: PORT when it parses to a non-zero number, otherwise 3000
const requestedPort = Number(process.env.PORT)
const port = requestedPort ? requestedPort : 3000

server.listen(port, () => {
  console.log(`schedules api listening on :${port}`)
})
|
|
|