Log Incident Detector in Rust


⏱️ 6h 10min
📦 37 modules
Rust

A log watcher that reads your incidents out loud

Most log monitoring stops at "grep for ERROR." This one goes further: it tails a file like tail -f, filters for real incidents, asks an AI to summarize the burst in one sentence, turns that sentence into speech, and plays it through your speakers. In a few hundred lines of async Rust you get a tour of file tailing, channels, batching, the OpenAI API, and audio playback — all running concurrently. We'll grow it one small piece at a time.

The core idea in one sentence: three independent components — watcher, summarizer, player — pass data down a pipeline of channels, each at its own pace.

Three actors, one pipeline

The whole architecture is three structs, each with the same shape: a constructor and an async fn run(self) -> Result<()>. Data flows in one direction through typed channels:

log file
   └─> Watcher
          ──[String]──> Summarizer
                  ──[Vec<u8>]──> AudioPlayer
                          ──> speakers

Each arrow is a channel, and each box is an actor that owns one job and knows nothing about its neighbours' internals. The Watcher only knows it sends Strings; the Summarizer only knows it receives them and emits Vec<u8>. That ignorance is what lets you later swap any box for another without touching the rest.
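
To make the shape concrete before any async code appears, here is a minimal sketch of the same three-stage pipeline using only the standard library: threads instead of tokio tasks, std::sync::mpsc instead of tokio's channels. The function names (watcher, summarizer, player, run_pipeline) and the toy "contains ERROR" filter are illustrative assumptions, not the project's real code.

```rust
use std::sync::mpsc;
use std::thread;

/// Stage 1 stand-in: forwards only lines that look like incidents.
fn watcher(lines: Vec<String>, log_tx: mpsc::Sender<String>) {
    for line in lines {
        if line.contains("ERROR") {
            // A send error means the receiver is gone, so stop quietly.
            if log_tx.send(line).is_err() {
                break;
            }
        }
    }
    // log_tx is dropped here, which closes the first channel.
}

/// Stage 2 stand-in: turns each line into bytes (the real one calls the API).
fn summarizer(log_rx: mpsc::Receiver<String>, audio_tx: mpsc::Sender<Vec<u8>>) {
    // This iteration ends once every sender has been dropped.
    for line in log_rx {
        if audio_tx.send(line.into_bytes()).is_err() {
            break;
        }
    }
}

/// Stage 3 stand-in: collects the bytes instead of playing them.
fn player(audio_rx: mpsc::Receiver<Vec<u8>>) -> Vec<Vec<u8>> {
    audio_rx.into_iter().collect()
}

/// Wires the three stages together and returns what reached the player.
fn run_pipeline(lines: Vec<String>) -> Vec<Vec<u8>> {
    let (log_tx, log_rx) = mpsc::channel::<String>();
    let (audio_tx, audio_rx) = mpsc::channel::<Vec<u8>>();

    let w = thread::spawn(move || watcher(lines, log_tx));
    let s = thread::spawn(move || summarizer(log_rx, audio_tx));
    let played = player(audio_rx);

    w.join().expect("watcher thread panicked");
    s.join().expect("summarizer thread panicked");
    played
}
```

Notice that each stage sees only its own channel ends. Closing propagates downstream by itself: when the watcher returns, its sender drops, the summarizer's loop ends, and the player's loop ends in turn.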

The channels are mpsc (multi-producer, single-consumer) from tokio, and Result is the catch-all type from the anyhow crate, so every ? bubbles a failure up without us defining error types. Because every component exposes the identical run signature, wiring them together is uniform:

let mut tasks = Vec::new();
tasks.push(watcher.run().boxed());
tasks.push(summarizer.run().boxed());
tasks.push(player.run().boxed());

We collect the three futures into one Vec. The .boxed() call (from futures) erases each future's concrete type so they all share a single Vec element type — without it, three different async fn bodies would be three incompatible types. Nothing runs yet; we've only described the work.

// first task to finish (or fail) wins
select_all(tasks).await.0?;

select_all (also from futures) drives all three concurrently and returns the moment the first one completes. Its result is a tuple of (output, index, remaining futures), so .0 pulls out that first future's Result, and the trailing ? re-raises any error straight out of main. No JoinHandle bookkeeping, no shared shutdown flag: as soon as any actor finishes or errors, main returns and the other two futures are dropped, so the whole pipeline unwinds together.

Tailing a file without re-reading it

A naive watcher re-reads the whole file every cycle, which is wasteful and would re-announce old incidents. The fix is to remember a byte cursor and seek to it each pass. The Watcher carries that state in its fields:

struct Watcher {
    log_path: PathBuf,
    last_position: usize,
    pattern: Regex,
    log_tx: UnboundedSender<String>,
}

last_position is the cursor — the byte offset where we stopped reading last time. pattern is the compiled filter (more on it shortly), and log_tx is the sending half of the first channel. Keeping the cursor in the struct is what turns a one-shot read into a true tail.

Each pass opens the file fresh and jumps straight to the cursor:

let mut reader = BufReader::new(file);
let start = SeekFrom::Start(self.last_position as u64);
reader.seek(start).await?;

BufReader (from tokio) wraps the file so reads are buffered instead of hitting the OS per byte. SeekFrom::Start builds an absolute offset from last_position, and seek moves the read head there, so we skip everything we've already processed. The ? propagates any I/O error from the seek; a file that has vanished would already have failed at the open step just before.

Now read the new lines one at a time:

let mut line = String::new();
loop {
    line.clear();
    let bytes = reader.read_line(&mut line).await?;
    if bytes == 0 { break; }       // EOF
    self.last_position += bytes;   // advance cursor
    // ... filter ...
}

read_line (from the AsyncBufReadExt trait) appends one line into our reused buffer and returns how many bytes it read. A return of 0 means end of file, so we break. Otherwise we add those bytes to last_position, which is what lets the next pass resume exactly here. Reusing one String and calling clear() avoids allocating a fresh buffer per line.
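
Put together, one pass of the cursor technique might look like the sketch below. It is a synchronous, std-only stand-in for the async version (std::fs and std::io instead of tokio), and the function name read_new_lines is hypothetical. It also adds one thing the snippets above leave out: if the file is now shorter than the cursor, the sketch assumes it was truncated or rotated and starts over from byte 0.

```rust
use std::fs::File;
use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;

/// Reads the lines added since `cursor` and advances the cursor past them.
fn read_new_lines(path: &Path, cursor: &mut u64) -> io::Result<Vec<String>> {
    let file = File::open(path)?;

    // Assumption: a file shorter than the cursor was truncated or rotated,
    // so reading restarts from the beginning.
    if file.metadata()?.len() < *cursor {
        *cursor = 0;
    }

    let mut reader = BufReader::new(file);
    reader.seek(SeekFrom::Start(*cursor))?;

    let mut lines = Vec::new();
    let mut line = String::new();
    loop {
        line.clear();
        let bytes = reader.read_line(&mut line)?;
        if bytes == 0 {
            break; // EOF
        }
        *cursor += bytes as u64; // count raw bytes, newline included
        lines.push(line.trim_end().to_string());
    }
    Ok(lines)
}
```

Calling it repeatedly with the same cursor variable returns only what was appended since the previous call, which is the whole point of keeping the offset around between passes.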

run wraps this in a poll loop: read what's there, wait, repeat. For the wait we use tokio's async sleep, i.e. sleep(Duration::from_millis(500)).await, not the blocking thread::sleep from the standard library, so the runtime thread stays free for the other two actors while we idle.

Filtering for what actually matters

We don't ship every line to the AI — only incidents. A compiled Regex (from the regex crate) does the filtering. We define the pattern as a constant and compile it once when the Watcher is built, never per line:

const PATTERN: &str =
    r"(?i)\b(error|warning|warn|fatal|critical)\b";

(?i) makes the whole match case-insensitive, so ERROR and error both hit. The \b markers are word boundaries: they stop "warning" from matching inside a longer word like "rewarning." Compiling this string into a Regex is relatively expensive, which is exactly why the pattern text lives in a const and the compiled Regex lives in the struct, built once in the constructor rather than inside the hot loop.

Inside the read loop, the test is one branch:

if self.pattern.is_match(&line) {
    // hand off to the summarizer
    self.log_tx.send(line.clone())?;
}

is_match just answers yes/no without allocating capture groups, which is all we need. We clone the line because line itself gets reused (and cleared) on the next iteration, so the channel needs its own owned copy. The send returns an error only if the receiver has been dropped — and that error is precisely the shutdown signal we want to propagate.
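
If the (?i) and \b behaviour feels abstract, the sketch below spells out roughly the same test by hand with only the standard library. It is an illustration of what the pattern means, not a replacement for the regex, and the names KEYWORDS and is_incident are made up for this example. One assumption to flag: it treats letters, digits, and underscore as word characters, which matches \b for ASCII input but may differ on some Unicode edge cases.

```rust
/// Keywords mirrored from the PATTERN constant above.
const KEYWORDS: [&str; 5] = ["error", "warning", "warn", "fatal", "critical"];

/// Returns true if any whole word in `line` is a keyword, ignoring ASCII case.
fn is_incident(line: &str) -> bool {
    line
        // Split at every non-word character; these are the spots where
        // a \b word boundary can occur.
        .split(|c: char| !(c.is_alphanumeric() || c == '_'))
        // A keyword only counts when it is the entire word.
        .any(|word| KEYWORDS.iter().any(|kw| word.eq_ignore_ascii_case(kw)))
}
```

Under this sketch "[warn] slow query" counts as an incident, while "rewarning caches" and "error_count=0" do not, because in both of those the keyword is glued to other word characters.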

Batch, don't spam the API

Calling the model once per line would be slow, expensive, and noisy. Instead the summarizer accumulates lines and flushes them on a timer. Its event loop has to wait on two things at once — incoming lines and the clock — which is what select! is for:

async fn step(&mut self) -> Result<()> {
    select! {
        msg = self.log_rx.recv()
            => self.handle_message(msg),
        _ = self.interval.tick()
            => self.handle_tick().await?,
    }
    Ok(())
}

select! (from tokio) polls both branches and runs whichever fires first, then returns. One branch wakes when a new line arrives on the channel; the other wakes when the timer ticks. The interval (from tokio) fires every 5 seconds and self-corrects for processing time, unlike chaining plain sleeps which would drift.

Messages just land in a buffer — the tick is what triggers actual work:

fn handle_message(&mut self, msg: Option<String>) {
    match msg {
        Some(line) => self.buffer.add_new(line),
        // watcher dropped its sender: shut down
        None => self.active = false,
    }
}

recv hands us an Option. A Some is a fresh incident, so we stash it in the buffer for the next flush. A None means every sender upstream is gone, so the watcher has stopped — we flip active off, and run's while self.active loop exits cleanly on its next turn. That's graceful shutdown with no shared flags or signalling.
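
The "wait for a message or the clock, whichever comes first" idea can also be sketched without select!. The version below swaps in a different technique, std's blocking recv_timeout on a plain mpsc channel, so it shows the batching logic rather than the tokio mechanics. The function name collect_batch and its return shape are assumptions made for this illustration.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collects lines until `window` elapses or the channel closes.
/// Returns the batch and whether the sending side is still alive.
fn collect_batch(rx: &Receiver<String>, window: Duration) -> (Vec<String>, bool) {
    let deadline = Instant::now() + window;
    let mut batch = Vec::new();
    loop {
        // Time left in this window; zero once the deadline has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            // Like the Some(line) arm: stash it for the next flush.
            Ok(line) => batch.push(line),
            // Like the tick arm: the window is over, hand the batch back.
            Err(RecvTimeoutError::Timeout) => return (batch, true),
            // Like the None arm: every sender is gone, time to shut down.
            Err(RecvTimeoutError::Disconnected) => return (batch, false),
        }
    }
}
```

The three match arms line up with the three events the summarizer handles: a new line, a tick, and the channel closing. Lines still queued when the sender disappears are delivered first, so nothing is lost on shutdown.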

Two buffers: new errors and remembered ones

Good summaries need context — the AI should know what already happened so it doesn't repeat itself across flushes. So the buffer has two compartments:

struct MessageBuffer {
    new_messages: Vec<String>,
    // rolling history, capped
    processed_messages: VecDeque<String>,
}

new_messages holds lines that have arrived but haven't been summarized yet. processed_messages is the rolling memory of what we've already reported. It's a VecDeque — a ring buffer — because we need to push onto one end and drop from the other cheaply.

After a successful summary, the new messages rotate into history:

fn mark_as_processed(&mut self) {
    for msg in self.new_messages.drain(..) {
        if self.processed_messages.len() >= DEPTH {
            self.processed_messages.pop_front();
        }
        self.processed_messages.push_back(msg);
    }
}

drain(..) moves each String out of new_messages without cloning, emptying it in the process so the next batch starts clean. Before each push we check the cap: once history reaches DEPTH (1024) entries, pop_front evicts the oldest. The result is bounded memory no matter how long the detector runs — old incidents simply age out.
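
Here is the buffer assembled into one self-contained piece you can poke at. It is a sketch under one stated assumption: the cap is stored in a hypothetical depth field (set through a hypothetical new constructor) instead of the DEPTH constant, purely so that a small cap is easy to try out.

```rust
use std::collections::VecDeque;

/// Two compartments: unsummarized lines plus a capped rolling history.
struct MessageBuffer {
    new_messages: Vec<String>,
    processed_messages: VecDeque<String>,
    depth: usize, // hypothetical field standing in for the DEPTH constant
}

impl MessageBuffer {
    fn new(depth: usize) -> Self {
        Self {
            new_messages: Vec::new(),
            processed_messages: VecDeque::new(),
            depth,
        }
    }

    /// Stashes a fresh incident until the next flush.
    fn add_new(&mut self, line: String) {
        self.new_messages.push(line);
    }

    /// Rotates everything in new_messages into the capped history.
    fn mark_as_processed(&mut self) {
        for msg in self.new_messages.drain(..) {
            if self.processed_messages.len() >= self.depth {
                self.processed_messages.pop_front(); // evict the oldest
            }
            self.processed_messages.push_back(msg);
        }
    }
}
```

With a depth of 3, adding five lines and then calling mark_as_processed leaves new_messages empty and keeps only the last three lines in history, oldest first.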

Asking the model for a one-liner

We build the request with the typed builders from async-openai. The messages carry both the fresh errors and the historical context, clearly labeled so the model knows which to focus on, plus a system prompt loaded with include_str!. That macro embeds a separate text file into the binary at compile time, so the prompt can be edited without touching any Rust logic, though a rebuild is still needed to pick up the change. The request itself is a fluent builder:

let request = CreateChatCompletionRequestArgs::default()
    .model(MODEL)                  // small, fast
    .messages(messages)
    .max_completion_tokens(150u32) // force brevity
    .temperature(0.3)              // factual
    .build()?;

temperature(0.3) keeps the output stable and matter-of-fact across runs — a high temperature would make the same burst read differently each time. max_completion_tokens(150) caps both verbosity and cost, which matters when this fires during a real flood. build()? finalizes the builder and fails loudly if a required field was missed.
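
The "clearly labeled" part of the messages is plain string assembly. The sketch below shows one way it might be done. The function name build_user_prompt and both section labels are invented for this example, and the real project may word or structure them differently, for instance as separate chat messages.

```rust
/// Builds the user message text. The section labels are illustrative only.
fn build_user_prompt(new_messages: &[String], history: &[String]) -> String {
    let mut prompt = String::from("NEW ERRORS (summarize these):\n");
    for line in new_messages {
        prompt.push_str("- ");
        prompt.push_str(line.trim_end()); // log lines still carry their newline
        prompt.push('\n');
    }

    prompt.push_str("ALREADY REPORTED (context only, do not repeat):\n");
    for line in history {
        prompt.push_str("- ");
        prompt.push_str(line.trim_end());
        prompt.push('\n');
    }
    prompt
}
```

Keeping the two groups under separate headings gives the model an unambiguous cue about which lines to summarize and which ones merely explain what has already been announced.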

Then we send it and dig out the text:

let response =
    self.client.chat().create(request).await?;
let summary = response.choices
    .into_iter().next()
    .and_then(|c| c.message.content)
    .context("model returned no content")?;

create(request).await? makes the network call, reusing one shared Client for every request. The response holds a list of choices; we take the first and pull its content, which is itself optional. .context() (from anyhow) turns a missing field into a readable error instead of a silent None. The API key behind that client comes from --api-key or the OPENAI_API_KEY env var via clap's env feature.

From text to sound

The same client hits the speech endpoint. We describe the audio we want with another builder:

let speech = CreateSpeechRequestArgs::default()
    .model(SpeechModel::Tts1)
    .voice(Voice::Alloy)
    .response_format(SpeechResponseFormat::Mp3)
    .input(&summary)
    .build()?;

Each call picks one facet of the output: the text-to-speech model, the voice, the encoding, and the actual text to read — our one-line summary. Choosing Mp3 keeps the payload small for the trip down the channel. As before, build()? fails early if anything required is missing.

Then we run it and push the bytes to the player:

let audio =
    self.client.audio().speech().create(speech).await?;
self.audio_tx.send(audio.bytes.to_vec())?;

The call returns raw audio bytes, which we copy into an owned Vec<u8> and send down the second channel. The Summarizer is now done with this burst and immediately goes back to batching — it never waits for the sound to finish playing, because that's the player's job entirely.

Playing audio off the channel

The player uses rodio (cross-platform, built on cpal). It holds an OutputStream open for its whole life — dropping that handle would cut the sound mid-word — and pulls byte chunks off the channel as they arrive:

while let Some(data) = self.rx.recv().await {
    let source =
        Decoder::try_from(Cursor::new(data))?;
    // returns immediately, plays in background
    self.stream_handle.mixer().add(source);
}

Cursor::new(data) makes a plain Vec<u8> look like a seekable file, which is what the Decoder needs to parse the in-memory MP3. mixer().add(source) hands the decoded audio to the output and returns at once — playback continues in the background while the loop waits for the next chunk. The while let ends naturally when the channel closes, giving the player the same clean shutdown as the others.

In the real code the decode is wrapped in an if let Err(err) arm rather than a bare ?, so one corrupt chunk gets logged and skipped instead of taking down the whole player.
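
The log-and-skip pattern is independent of audio, so it can be sketched with the standard library alone. In the version below the decode and play parameters are stand-ins for the rodio Decoder and mixer().add(source), and the function name play_all is hypothetical. It blocks on a std channel rather than awaiting a tokio one.

```rust
use std::fmt::Display;
use std::sync::mpsc::Receiver;

/// Drains the channel, playing what decodes and skipping what does not.
/// Returns how many chunks were skipped.
fn play_all<S, E: Display>(
    rx: Receiver<Vec<u8>>,
    decode: impl Fn(Vec<u8>) -> Result<S, E>, // stand-in for the rodio Decoder
    mut play: impl FnMut(S),                  // stand-in for mixer().add(source)
) -> usize {
    let mut skipped = 0;
    // The loop ends once every sender has been dropped.
    for data in rx {
        match decode(data) {
            Ok(source) => play(source),
            Err(err) => {
                // One bad chunk is reported and skipped; the player keeps going.
                eprintln!("skipping undecodable audio chunk: {}", err);
                skipped += 1;
            }
        }
    }
    skipped
}
```

The important difference from a bare ? is that the error never leaves the loop body, so a single corrupt payload costs one announcement instead of the whole actor.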

Where to go next, in production

The toy is honest, but real deployments want more:

  • Inotify over polling: replace the 500ms poll with the notify crate so the watcher reacts to changes almost instantly and uses next to no CPU while idle, and handle log rotation or truncation (the file becoming shorter than last_position) by resetting last_position to 0.
  • Backpressure — unbounded channels can grow without limit during a log storm. Swap in bounded channels so a slow AI step slows ingestion instead of eating memory.
  • Deduplication and rate limits — collapse identical stack traces before sending, and cap how often you call the model during a flood.
  • tracing everywhere — the project already uses tracing; ship it to a real collector so the detector is itself observable.
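
The backpressure point is easy to see in miniature. The sketch below uses std's bounded sync_channel as a stand-in for a bounded tokio channel; the Offer enum and the offer function are hypothetical names for this example. It uses try_send so the "queue is full" moment is visible; a blocking send on the same channel would instead pause the producer until the consumer catches up, which is the slowing-down behaviour described above.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Outcome of offering one line to a bounded queue.
#[derive(Debug, PartialEq)]
enum Offer {
    Queued, // there was room
    Full,   // consumer is behind; caller decides whether to wait or drop
    Closed, // receiver is gone; time to shut down
}

/// Tries to enqueue a line without blocking.
fn offer(tx: &SyncSender<String>, line: String) -> Offer {
    match tx.try_send(line) {
        Ok(()) => Offer::Queued,
        Err(TrySendError::Full(_)) => Offer::Full,
        Err(TrySendError::Disconnected(_)) => Offer::Closed,
    }
}
```

With a capacity of 2, the third offer reports Full until the consumer takes something off the queue, so memory use stays bounded however fast the log grows.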

Why build it

Build this once and "async pipeline" stops being a buzzword. You've connected file I/O, regex filtering, time-based batching, an LLM call, and live audio — three concurrent actors talking over typed channels, each ignorant of the others' internals. That decoupling is the real lesson: swap the player for a Slack webhook, or the regex for a parser, and nothing else moves. The architecture, not the AI, is the interesting part.
