Par Thales & Claude -- CEO & AI CTO, ZeroSuite, Inc.
Le planificateur est le seul composant qui compte.
Tout le reste dans 0cron -- l'API, le tableau de bord, le système de notifications, le parseur NLP -- existe au service d'une seule fonction : exécuter une requête HTTP au bon moment. Si le planificateur échoue, le produit échoue. Si le planificateur est lent, chaque tâche s'exécute en retard. Si le planificateur se déclenche deux fois, les utilisateurs reçoivent des appels webhook en double et des données corrompues.
Cet article est une analyse ligne par ligne de la façon dont nous avons construit le moteur de planification de 0cron en Rust. Le planificateur complet fait 199 lignes. L'exécuteur fait 309 lignes. Ensemble, ils forment le coeur d'un service qui doit être correct, rapide, et résilient à l'exécution concurrente sur plusieurs instances de serveur.
La décision d'architecture : pourquoi les sorted sets Redis
Avant d'écrire le moindre code, nous devions choisir une primitive de planification. Les options :
Option 1 : polling de base de données. Interroger PostgreSQL toutes les N secondes pour les tâches où next_run_at <= NOW(). Simple, mais PostgreSQL est optimisé pour les requêtes complexes, pas pour le polling haute fréquence. Une requête chaque seconde contre une table de millions de lignes est un gaspillage, même avec un index.
Option 2 : roue de temporisation en mémoire. Stocker toutes les planifications dans une structure de données comme une file de priorité ou une roue de temporisation dans la mémoire de l'application. Rapide, mais volatile -- un redémarrage du serveur perd tout l'état de planification. Aussi incompatible avec la scalabilité horizontale : si vous exécutez deux instances, laquelle possède quelles tâches ?
Option 3 : sorted sets Redis. Chaque tâche est un membre d'un sorted set, scoré par son horodatage de prochaine exécution. Trouver les tâches dues est ZRANGEBYSCORE scheduled_jobs 0 <now> -- une opération O(log(N) + M) où M est le nombre de tâches dues. Redis est conçu exactement pour ce pattern d'accès : lectures haute fréquence de petits ensembles de données triés.
Nous avons choisi l'option 3 pour trois raisons :
- Vitesse. Les requêtes sur les sorted sets Redis sont sub-milliseconde. Le polling chaque seconde ajoute une charge négligeable.
- Durabilité. Redis persiste sur disque (snapshots RDB ou AOF). Un redémarrage du serveur ne perd pas l'état de planification.
- Distribution. Plusieurs instances de 0cron peuvent interroger le même sorted set Redis. Combiné avec des verrous distribués, cela nous donne une scalabilité horizontale sans protocole de consensus.
Le compromis : Redis est une dépendance d'infrastructure supplémentaire. Pour un produit qui nécessite déjà PostgreSQL pour le stockage persistant, c'est acceptable. L'alternative -- construire une couche de planification distribuée from scratch en utilisant Raft ou Paxos -- serait un excès architectural pour un service à 1,99 $/mois.
Le struct Scheduler
Le planificateur est un struct avec trois champs :
rustuse chrono::Utc;
use cron::Schedule;
use redis::AsyncCommands;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use uuid::Uuid;
pub struct Scheduler {
redis: redis::Client,
pool: PgPool,
config: Arc<AppConfig>,
}
impl Scheduler {
pub fn new(redis: redis::Client, pool: PgPool, config: Arc<AppConfig>) -> Self {
Self { redis, pool, config }
}
}redis::Client est une fabrique de connexions, pas une connexion elle-même. Chaque opération ouvre une connexion multiplexée depuis le client. PgPool est le pool de connexions de SQLx -- il gère un ensemble de connexions PostgreSQL et les distribue à la demande. AppConfig contient la configuration runtime (clé de chiffrement, paramètres serveur) encapsulée dans un Arc pour un partage peu coûteux entre les tâches asynchrones.
Le planificateur est créé dans main.rs et encapsulé dans un Arc pour pouvoir être déplacé dans la tâche en arrière-plan :
rust// In main.rs
let scheduler = Arc::new(Scheduler::new(
state.redis.clone(),
state.db.pool.clone(),
Arc::clone(&state.config),
));
let _scheduler_handle = scheduler.start();
tracing::info!("Scheduler background task started");La méthode start() lance une tâche tokio et retourne un JoinHandle. Le handle est stocké dans _scheduler_handle -- le préfixe underscore signale que nous ne l'attendons ni ne l'annulons intentionnellement. Le planificateur s'exécute pour toute la durée de vie du processus.
La boucle de polling
La méthode start() tient en quatre lignes :
rustpub fn start(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
tracing::info!("Scheduler started");
loop {
if let Err(e) = self.poll().await {
tracing::error!("Scheduler poll error: {e}");
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
})
}Une boucle infinie. Appeler poll(). Dormir 1 seconde. Répéter.
Si poll() retourne une erreur -- Redis est injoignable, une requête échoue -- l'erreur est journalisée et la boucle continue. Le planificateur ne plante pas sur les pannes transitoires. Il journalise, dort, et réessaie au tick suivant. C'est un choix de conception délibéré : un planificateur qui meurt au premier timeout Redis est inutile en production où les micro-coupures réseau sont routine.
Le sleep d'1 seconde détermine la résolution temporelle du système. Une tâche planifiée à 14 h 30 min 00 s s'exécutera entre 14 h 30 min 00 s et 14 h 30 min 01 s -- au maximum une seconde de délai, comparé à la gigue de 4 à 40 secondes de cron-job.org. Pour la plupart des cas d'usage, une précision sub-seconde n'est pas nécessaire. Pour les rares cas où cela compte, la durée du sleep est configurable.
La méthode poll : là où le travail se fait
Voici le coeur du planificateur :
rustasync fn poll(&self) -> AppResult<()> {
let mut conn = self.redis
.get_multiplexed_async_connection().await
.map_err(|e| AppError::Redis(e))?;
let now = Utc::now().timestamp() as f64;
// Get all jobs due for execution (score <= now)
let due_jobs: Vec<String> = conn
.zrangebyscore("scheduled_jobs", 0.0_f64, now)
.await
.map_err(|e| AppError::Redis(e))?;
for job_id_str in due_jobs {
let job_id = match Uuid::parse_str(&job_id_str) {
Ok(id) => id,
Err(_) => {
tracing::warn!("Invalid job ID in scheduled set: {job_id_str}");
continue;
}
};
// Attempt distributed lock
let lock_key = format!("lock:{job_id}");
let locked: bool = redis::cmd("SET")
.arg(&lock_key)
.arg("1")
.arg("NX")
.arg("EX")
.arg(60)
.query_async(&mut conn)
.await
.unwrap_or(false);
if !locked {
continue; // Another instance holds the lock
}
// Remove from sorted set
let _: () = conn
.zrem("scheduled_jobs", &job_id_str)
.await
.unwrap_or(());
// Spawn executor task
let pool = self.pool.clone();
let config = self.config.clone();
let redis_client = self.redis.clone();
tokio::spawn(async move {
let job = sqlx::query_as::<_, Job>(
"SELECT * FROM jobs WHERE id = $1"
)
.bind(job_id)
.fetch_optional(&pool)
.await;
match job {
Ok(Some(job)) if job.status == "active" => {
if let Err(e) = executor::execute_job(
&job, &pool, encryption_key, "schedule", Some(&config)
).await {
tracing::error!(job_id = %job.id,
"Execution failed: {e}");
}
// Compute next run and re-schedule
if let Ok(next) = compute_next_run(&job.schedule_cron) {
let next_ts = next.timestamp() as f64;
if let Ok(mut conn) = redis_client
.get_multiplexed_async_connection().await
{
let _: Result<(), _> = conn
.zadd("scheduled_jobs",
job_id.to_string(), next_ts)
.await;
}
let _ = sqlx::query(
"UPDATE jobs SET next_run_at = $1 WHERE id = $2"
)
.bind(next)
.bind(job_id)
.execute(&pool)
.await;
}
}
Ok(Some(_)) => {
tracing::info!(job_id = %job_id,
"Job is not active, skipping");
}
Ok(None) => {
tracing::warn!(job_id = %job_id,
"Job not found in database");
}
Err(e) => {
tracing::error!(job_id = %job_id,
"Failed to fetch job: {e}");
}
}
});
}
Ok(())
}Parcourons cela étape par étape.
Étape 1 : interroger les tâches dues
rustlet due_jobs: Vec<String> = conn
.zrangebyscore("scheduled_jobs", 0.0_f64, now)
.await?;ZRANGEBYSCORE scheduled_jobs 0 <horodatage_courant> retourne tous les membres du sorted set dont le score (heure de prochaine exécution) est inférieur ou égal à l'heure actuelle. Ce sont les tâches dues.
Les membres du sorted set sont des UUID de tâches stockés comme chaînes. Les scores sont des timestamps Unix en flottants. Redis maintient l'ensemble trié par score, donc cette requête est efficace même avec des centaines de milliers de tâches -- elle parcourt une skip list depuis le score le plus bas jusqu'au point de coupure.
Étape 2 : acquérir le verrou distribué
rustlet lock_key = format!("lock:{job_id}");
let locked: bool = redis::cmd("SET")
.arg(&lock_key)
.arg("1")
.arg("NX")
.arg("EX")
.arg(60)
.query_async(&mut conn)
.await
.unwrap_or(false);
if !locked {
continue;
}SET lock:<job_id> 1 NX EX 60 est le pattern standard de verrou distribué Redis. NX signifie « définir seulement si la clé n'existe pas » -- un compare-and-set atomique. EX 60 définit une expiration de 60 secondes, qui sert de soupape de sécurité : si l'instance qui a acquis le verrou plante en cours d'exécution, le verrou se libère automatiquement après une minute.
Si locked est false, une autre instance de 0cron a déjà revendiqué cette tâche. L'instance courante la saute et passe à la tâche suivante dans la liste des tâches dues.
Ce n'est pas Redlock. Nous n'exécutons pas Redis Sentinel ni un cluster Redis avec plusieurs masters. Pour l'échelle et le prix de 0cron, une seule instance Redis avec SET NX est suffisante. Le mode de défaillance -- deux instances exécutant la même tâche pendant un split-brain Redis -- est un risque acceptable à ce niveau. Si 0cron grandit au point où cela pose problème, nous passerons à Redlock ou à un véritable service de coordination distribuée. Ingénierie pour l'échelle actuelle, pas pour l'hypothétique.
Étape 3 : retirer du sorted set et lancer
rustlet _: () = conn
.zrem("scheduled_jobs", &job_id_str)
.await
.unwrap_or(());Après avoir acquis le verrou, la tâche est retirée du sorted set. Cela empêche d'autres instances de la voir à leur prochain poll. L'ordre est important : verrouiller d'abord, puis retirer. Si nous retirions d'abord et échouions ensuite à verrouiller, la tâche disparaîtrait de la planification sans être exécutée.
L'exécution effective est lancée comme une tâche tokio séparée. C'est critique pour le débit : la boucle de poll ne doit pas bloquer sur les exécutions individuelles de tâches. Si une tâche prend 30 secondes à compléter (l'endpoint de l'utilisateur est lent), le planificateur continue de poller et de dispatcher d'autres tâches en parallèle.
Étape 4 : exécuter et replanifier
Après l'exécution, le planificateur calcule la prochaine heure d'exécution à partir de l'expression cron et rajoute la tâche au sorted set avec le nouvel horodatage comme score. Il met aussi à jour next_run_at dans PostgreSQL pour que le tableau de bord puisse afficher la prochaine exécution planifiée.
rustif let Ok(next) = compute_next_run(&job.schedule_cron) {
let next_ts = next.timestamp() as f64;
let _: Result<(), _> = conn
.zadd("scheduled_jobs", job_id.to_string(), next_ts)
.await;
let _ = sqlx::query("UPDATE jobs SET next_run_at = $1 WHERE id = $2")
.bind(next)
.bind(job_id)
.execute(&pool)
.await;
}La fonction compute_next_run utilise le crate cron pour parser l'expression et trouver la prochaine heure correspondante après now :
rustpub fn compute_next_run(
cron_expr: &str,
) -> AppResult<chrono::DateTime<Utc>> {
let schedule = Schedule::from_str(cron_expr)
.map_err(|e| AppError::Validation(
format!("Invalid cron expression: {e}")))?;
schedule
.upcoming(Utc)
.next()
.ok_or_else(|| AppError::Internal(
"No upcoming schedule found".to_string()))
}Le crate cron parse les expressions cron standard et génère un itérateur des prochaines heures correspondantes. .next() retourne la plus proche. Ce calcul se fait une fois par exécution, pas une fois par tick -- le résultat est stocké dans Redis comme nouveau score.
L'exécuteur : ce qui se passe quand une tâche se déclenche
Le planificateur dispatche ; l'exécuteur exécute. La fonction execute_job dans executor.rs gère les retries, et execute_single gère le cycle de vie de la requête HTTP.
La boucle de retry
rustpub async fn execute_job(
job: &Job,
db: &PgPool,
encryption_key: &[u8],
triggered_by: &str,
config_app: Option<&AppConfig>,
) -> AppResult<Execution> {
let config: JobConfig = serde_json::from_value(job.config.clone())?;
let retry_config = config.retry.clone().unwrap_or(RetryConfig {
attempts: 1,
backoff: "exponential".to_string(),
delay: 5,
});
let mut last_execution = None;
for attempt in 0..retry_config.attempts {
let execution = execute_single(
job, &config, db, encryption_key,
attempt as i32, triggered_by
).await;
match &execution {
Ok(exec) if exec.status == "success" => {
update_job_stats(job.id, exec, db).await?;
send_job_notifications(job, exec, db, config_app).await;
return execution;
}
Ok(exec) => {
last_execution = Some(exec.clone());
if attempt + 1 < retry_config.attempts {
let delay = compute_backoff(&retry_config, attempt);
tokio::time::sleep(
std::time::Duration::from_secs(delay)
).await;
}
}
Err(e) => {
tracing::error!(job_id = %job.id,
"Execution error: {e}");
if attempt + 1 < retry_config.attempts {
let delay = compute_backoff(&retry_config, attempt);
tokio::time::sleep(
std::time::Duration::from_secs(delay)
).await;
}
}
}
}
// All retries exhausted
if let Some(ref exec) = last_execution {
update_job_stats(job.id, exec, db).await?;
send_job_notifications(job, exec, db, config_app).await;
return Ok(exec.clone());
}
Err(AppError::Internal(format!(
"Job {} failed after all retries", job.id
)))
}La logique de retry par défaut est d'une tentative (pas de retry) avec un backoff exponentiel commençant à 5 secondes. Les utilisateurs peuvent configurer jusqu'à 3 tentatives avec leur choix de stratégie de backoff.
En cas de succès, la boucle retourne immédiatement après mise à jour des statistiques et envoi des notifications. En cas d'échec, elle journalise la tentative, stocke l'enregistrement d'exécution, calcule le délai de backoff, dort, et réessaie. Après épuisement de toutes les tentatives, elle rapporte l'échec final.
Chaque tentative -- réussie ou non -- est enregistrée comme une ligne Execution dans PostgreSQL. L'utilisateur peut voir chaque retry dans son historique d'exécution, avec le corps de la réponse et le code de statut de chaque tentative.
Le calcul du backoff
rustfn compute_backoff(retry_config: &RetryConfig, attempt: u32) -> u64 {
let base = retry_config.delay as u64;
match retry_config.backoff.as_str() {
"exponential" => base * 2_u64.pow(attempt),
"linear" => base * (attempt as u64 + 1),
"fixed" | _ => base,
}
}Trois stratégies :
- Exponentiel : 5 s, 10 s, 20 s, 40 s... Double à chaque retry. Bon pour les pannes transitoires où reculer réduit la contention.
- Linéaire : 5 s, 10 s, 15 s, 20 s... Incrément constant. Plus doux que l'exponentiel ; utile quand le service cible a des limites de taux avec des fenêtres de réinitialisation prévisibles.
- Fixe : 5 s, 5 s, 5 s... Même délai à chaque fois. Pour les cas où la panne est soit résolue, soit pas, et attendre plus longtemps n'aide pas.
Le défaut (le bras catch-all _) est fixe. Si un utilisateur fournit une stratégie de backoff non reconnue, il obtient des retries à délai fixe prévisibles plutôt qu'une erreur.
Le constructeur de requête HTTP
La fonction execute_single construit et envoie la requête HTTP effective :
rustasync fn execute_single(
job: &Job,
config: &JobConfig,
db: &PgPool,
encryption_key: &[u8],
attempt: i32,
triggered_by: &str,
) -> AppResult<Execution> {
let started_at = Utc::now();
let client = reqwest::Client::new();
// Interpolate secrets in URL, headers, and body
let url = secrets::interpolate_secrets(
&config.url, job.team_id, db, encryption_key
).await?;
let mut headers = HashMap::new();
for (k, v) in &config.headers {
let interpolated = secrets::interpolate_secrets(
v, job.team_id, db, encryption_key
).await?;
headers.insert(k.clone(), interpolated);
}
// Build the request
let method: reqwest::Method = config.method.parse()?;
let mut req = client
.request(method.clone(), &url)
.timeout(std::time::Duration::from_secs(
config.timeout as u64
));
for (k, v) in &headers {
req = req.header(k.as_str(), v.as_str());
}
if let Some(ref b) = body {
req = req.body(b.clone());
}
// Execute and capture everything
let result = req.send().await;
let finished_at = Utc::now();
let duration_ms = (finished_at - started_at).num_milliseconds() as i32;
// ... status extraction, response capture, DB insert
}L'étape d'interpolation des secrets mérite d'être soulignée. Les utilisateurs peuvent stocker des clés API et des tokens comme secrets chiffrés et les référencer dans les configurations de tâches en utilisant la syntaxe ${secrets.API_KEY}. Avant que la requête HTTP ne soit envoyée, chaque occurrence de ${secrets.<KEY>} dans l'URL, les en-têtes et le corps est remplacée par la valeur déchiffrée. Le déchiffrement utilise AES-256-GCM avec la clé de chiffrement du serveur -- les secrets ne sont jamais stockés en clair et jamais journalisés.
La capture de la réponse est exhaustive. Chaque champ est enregistré :
- Requête : URL, méthode, en-têtes, corps
- Réponse : code de statut, en-têtes, corps (tronqué à 1 Mo)
- Métadonnées : started_at, finished_at, durée en millisecondes, numéro de tentative de retry, triggered_by (planification, manuel, ou API)
- Erreur : si la requête a échoué au niveau réseau, le message d'erreur
Cela signifie que les utilisateurs peuvent déboguer les tâches en échec en regardant exactement ce que 0cron a envoyé et exactement ce qui est revenu. Pas de devinette, pas de « ça dit juste échec ».
Gestion de l'état de planification
Le module planificateur exporte les fonctions schedule_job et unschedule_job utilisées par le service de gestion des tâches. La séparation est nette : le planificateur ne voit que ce qui est dans le sorted set Redis ; le service de tâches contrôle ce qui y entre et en sort.
L'ordre est toujours base de données d'abord, Redis ensuite :
- Créer une tâche = INSERT dans PostgreSQL + ZADD dans Redis
- Mettre en pause une tâche = UPDATE du statut dans PostgreSQL + ZREM de Redis
- Reprendre une tâche = UPDATE du statut dans PostgreSQL + ZADD dans Redis
- Supprimer une tâche = DELETE de PostgreSQL + ZREM de Redis
Si les deux stores divergent (un crash entre l'INSERT et le ZADD), la tâche existe dans la base de données mais n'est pas planifiée -- un mode de défaillance sûr. L'utilisateur voit sa tâche dans le tableau de bord et peut la re-sauvegarder pour la replanifier. L'alternative dangereuse -- une tâche planifiée dans Redis mais absente de PostgreSQL -- ne se produit jamais parce que les écritures en base de données passent toujours en premier.
Pourquoi pas : les alternatives que nous avons rejetées
Nous avons évalué quatre alternatives avant d'opter pour les sorted sets Redis :
Roue de temporisation en mémoire (sleep_until de tokio avec une file de priorité) : rapide, mais volatile. Un redémarrage du serveur perd tout l'état de planification. Reconstruire depuis la base de données au démarrage signifie scanner chaque tâche active -- un délai de plusieurs secondes pendant lequel les tâches dues ne s'exécutent pas.
PostgreSQL LISTEN/NOTIFY : par connexion, pas durable. Si le planificateur se déconnecte (micro-coupure réseau, rotation du pool), il rate des notifications. Il faut un mécanisme de polling de secours de toute façon, ce qui rend la couche de notification redondante.
File de messages (RabbitMQ, SQS, Kafka) : ajoute de la complexité d'infrastructure (gestion du broker, dead letter queues, consumer groups) pour des garanties de durabilité dont nous n'avons pas besoin. Un tick manqué est récupéré au prochain poll une seconde plus tard.
Planificateur distribué (Dkron, Celery Beat) : construire 0cron au-dessus d'un autre moteur de planification signifie hériter de sa complexité et de ses exigences opérationnelles. Dkron nécessite un consensus Raft et un cluster. Notre planificateur fait 199 lignes. Nous comprenons chaque ligne.
Compromis connus
Aucun système n'est parfait dès la première itération. Quatre éléments sont sur la liste v2 :
- ZRANGEBYSCORE + ZREM devrait être atomique. Le verrou distribué gère correctement la condition de course, mais
ZPOPMINou un script Lua serait plus propre. - Le timeout du verrou est codé en dur à 60 secondes. Il devrait être au moins aussi long que le timeout configuré de la tâche plus une marge de sécurité.
- Les opérations Redis et PostgreSQL ne sont pas transactionnelles. Un pattern saga ou une table outbox fournirait une cohérence plus forte.
- Le crate cron attend des expressions à 7 champs (avec les secondes), alors que le cron standard utilise 5 champs. Le parseur NLP génère des expressions à 5 champs avec un préfixe
0secondes, ce qui fonctionne mais crée une incompatibilité d'impédance.
Ce sont des compromis délibérés à l'échelle actuelle de 0cron. Chacun a un chemin de mise à niveau clair quand le nombre d'utilisateurs le justifiera.
199 lignes
L'intégralité du planificateur -- polling, verrouillage, dispatch, replanification, plus les fonctions helper schedule_job et unschedule_job -- tient en 199 lignes de Rust. L'exécuteur ajoute 309 lignes supplémentaires. Combinés, 508 lignes de code forment le coeur d'un service de tâches cron qui gère la planification, l'exécution, les retries, la journalisation et les notifications.
C'est l'avantage de choisir la bonne primitive. Les sorted sets Redis font le gros du travail de maintenir une file ordonnée dans le temps avec des opérations O(log N). Le crate cron gère le parsing des expressions et le calcul de la prochaine exécution. reqwest gère HTTP. sqlx gère l'accès base de données. Tokio gère la concurrence asynchrone.
Le code applicatif -- nos 508 lignes -- est la colle. Il connecte ces primitives en un pipeline d'exécution cohérent. Il n'y a pas de framework, pas d'ORM, pas de conteneur d'injection de dépendances, pas de bus d'événements. Juste des structs, des fonctions, et le système de types Rust garantissant qu'ils s'assemblent correctement.
Parfois, la meilleure architecture est la plus simple qui fonctionne.
Ceci est la partie 3 d'une série en trois parties sur la construction de 0cron.dev. Si vous les avez manquées : Partie 1 : Pourquoi le monde a besoin d'un service cron à 2 $ couvre l'analyse de marché et la philosophie tarifaire. Partie 2 : 4 agents, 1 produit couvre la méthodologie de build parallèle qui a produit l'intégralité de la base de code en une seule session.