Cross-Event Logic with state¶
The state global map remembers values across events, so you can solve
problems that per-event --exec and the write-only track_*() functions
cannot: deduplication, request/response correlation, session reconstruction,
and state machines.
When to reach for state¶
Use state when a decision about the current event depends on events you've
already seen. For plain counting and aggregation, prefer track_*() — they're
faster and work in parallel.
| | state | track_*() |
|---|---|---|
| Purpose | Cross-event logic | Metrics & aggregations |
| Read during processing | ✅ Yes | ❌ No (read in --end) |
| Parallel mode | ❌ Sequential only | ✅ Works in parallel |
| Stores | Any Rhai value | Any value |
| Speed | Slower (RwLock) | Faster |
| Use for | Dedup, FSMs, correlation | Counting, unique tracking, bucketing |
Sequential only
state requires sequential processing. Using it with --parallel raises a
runtime error ('state' is not available in --parallel mode). For
parallel-safe accumulation, use track_*() instead.
Deduplication — process each ID once¶
Drop repeated entries for the same key, keeping only the first occurrence:
kelora -j logs.jsonl \
  --exec 'if !state.contains(e.request_id) {
            state[e.request_id] = true;
            e.is_first = true;
          } else {
            e.is_first = false;
          }' \
  --filter 'e.is_first == true' \
  -k request_id,status
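The same first-occurrence logic can be modeled outside Kelora. The sketch below is a plain Python illustration, not Kelora code: the function name `first_occurrences` and the sample events are hypothetical, and a Python set stands in for the state map.

```python
def first_occurrences(events, key="request_id"):
    """Yield each event whose key value has not been seen before."""
    seen = set()  # stands in for the `state` map
    for event in events:
        value = event.get(key)
        if value in seen:
            continue  # repeat of an earlier key: drop it
        seen.add(value)
        yield event


# Hypothetical sample input
events = [
    {"request_id": "a1", "status": 200},
    {"request_id": "b2", "status": 500},
    {"request_id": "a1", "status": 200},
]
print([e["request_id"] for e in first_occurrences(events)])  # ['a1', 'b2']
```

Memory grows with the number of distinct keys, which is why very large inputs need a cap on how much is remembered.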
Track complex per-key state¶
Store nested maps to accumulate several attributes per user:
kelora -j examples/user-events.jsonl \
  --exec 'if !state.contains(e.user) {
            state[e.user] = #{login_count: 0, last_seen: (), errors: []};
          }
          let user_state = state[e.user];
          user_state.login_count += 1;
          user_state.last_seen = e.timestamp;
          if e.has("error") {
            user_state.errors.push(e.error);
          }
          state[e.user] = user_state;
          e.user_login_count = user_state.login_count' \
  -k timestamp,user,user_login_count
Output:
timestamp='2024-01-15T10:00:00Z' user='alice' user_login_count=1
timestamp='2024-01-15T10:01:00Z' user='bob' user_login_count=1
timestamp='2024-01-15T10:02:00Z' user='alice' user_login_count=2
timestamp='2024-01-15T10:03:00Z' user='alice' user_login_count=3
timestamp='2024-01-15T10:04:00Z' user='bob' user_login_count=2
timestamp='2024-01-15T10:05:00Z' user='charlie' user_login_count=1
timestamp='2024-01-15T10:06:00Z' user='alice' user_login_count=4
timestamp='2024-01-15T10:07:00Z' user='bob' user_login_count=3
timestamp='2024-01-15T10:08:00Z' user='charlie' user_login_count=2
timestamp='2024-01-15T10:09:00Z' user='alice' user_login_count=5
Input (examples/user-events.jsonl):
{"timestamp":"2024-01-15T10:00:00Z","user":"alice","event":"login"}
{"timestamp":"2024-01-15T10:01:00Z","user":"bob","event":"login"}
{"timestamp":"2024-01-15T10:02:00Z","user":"alice","event":"view_page"}
{"timestamp":"2024-01-15T10:03:00Z","user":"alice","event":"error","error":"timeout"}
{"timestamp":"2024-01-15T10:04:00Z","user":"bob","event":"purchase"}
{"timestamp":"2024-01-15T10:05:00Z","user":"charlie","event":"login"}
{"timestamp":"2024-01-15T10:06:00Z","user":"alice","event":"login"}
{"timestamp":"2024-01-15T10:07:00Z","user":"bob","event":"error","error":"payment_failed"}
{"timestamp":"2024-01-15T10:08:00Z","user":"charlie","event":"view_page"}
{"timestamp":"2024-01-15T10:09:00Z","user":"alice","event":"logout"}
Sequential event numbering¶
Assign a global counter across all events:
kelora -j logs.jsonl \
--begin 'state["count"] = 0' \
--exec 'state["count"] += 1; e.seq = state["count"]' \
-k seq,timestamp,message -F csv
For simple counting by category, prefer track_freq("category", e.category).
Convert state to a regular map¶
state is a special StateMap with limited operations. Convert it before
using map helpers like .to_logfmt() or .to_kv():
{"timestamp":"2024-01-15T10:00:00Z","level":"INFO","service":"api","message":"Application started","version":"1.2.3"}
{"timestamp":"2024-01-15T10:00:05Z","level":"DEBUG","service":"api","message":"Loading configuration","config_file":"/etc/app/config.yml"}
{"timestamp":"2024-01-15T10:00:10Z","level":"INFO","service":"database","message":"Connection pool initialized","max_connections":50}
{"timestamp":"2024-01-15T10:01:00Z","level":"WARN","service":"api","message":"High memory usage detected","memory_percent":85}
{"timestamp":"2024-01-15T10:01:30Z","level":"ERROR","service":"database","message":"Query timeout","query":"SELECT * FROM users","duration_ms":5000}
Event correlation (request/response pairs)¶
Match requests to responses, compute latency, and emit complete transactions:
kelora -j api-events.jsonl \
  --exec 'if e.event_type == "request" {
            state[e.request_id] = #{sent_at: e.timestamp, method: e.method};
            e = ();  // Hold the request; emit only once the response arrives
          } else if e.event_type == "response" && state.contains(e.request_id) {
            let req = state[e.request_id];
            e.duration_ms = (e.timestamp - req.sent_at).as_millis();
            e.method = req.method;
            state.remove(e.request_id);  // Clean up
          }' \
  -k request_id,method,duration_ms,status
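To make the pairing and latency arithmetic concrete, here is a minimal Python model of the correlation above. It is only a sketch under stated assumptions: timestamps are taken to be ISO 8601 strings, and the names `parse_ts`, `correlate`, and `pending` are hypothetical. As in the Kelora snippet, requests are held back and responses with no stored request pass through unchanged.

```python
from datetime import datetime, timedelta


def parse_ts(ts):
    # Older Python versions reject a trailing "Z" in fromisoformat(), so normalize it.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def correlate(events):
    """Hold requests back; annotate each matching response with method and latency."""
    pending = {}  # request_id -> request details, stands in for `state`
    for event in events:
        rid = event.get("request_id")
        kind = event.get("event_type")
        if kind == "request":
            pending[rid] = {
                "sent_at": parse_ts(event["timestamp"]),
                "method": event["method"],
            }
            continue  # do not emit the request itself
        if kind == "response" and rid in pending:
            req = pending.pop(rid)  # pop() also frees the stored entry
            elapsed = parse_ts(event["timestamp"]) - req["sent_at"]
            event = {
                **event,
                "method": req["method"],
                # Integer division by one millisecond avoids float rounding.
                "duration_ms": elapsed // timedelta(milliseconds=1),
            }
        yield event
```

Requests that never receive a response stay in `pending` indefinitely, so long-running inputs may need a timeout or size cap on stored requests.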
State machines for protocol analysis¶
Track connections through their lifecycle and flag invalid transitions:
kelora -j network-events.jsonl \
  --exec 'if !state.contains(e.conn_id) {
            state[e.conn_id] = "NEW";
          }
          let current_state = state[e.conn_id];
          if current_state == "NEW" && e.event == "SYN" {
            state[e.conn_id] = "SYN_SENT";
          } else if current_state == "SYN_SENT" && e.event == "SYN_ACK" {
            state[e.conn_id] = "ESTABLISHED";
          } else if current_state == "ESTABLISHED" && e.event == "FIN" {
            state[e.conn_id] = "CLOSING";
          } else if e.event != "DATA" {
            e.protocol_error = true;  // Invalid transition
          }
          e.connection_state = state[e.conn_id]' \
  --filter 'e.has("protocol_error")' \
  -k timestamp,conn_id,event,connection_state
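The if/else chain above is equivalent to a lookup in a transition table. The following Python sketch models that idea. It is an illustration rather than Kelora code, and the names `TRANSITIONS` and `find_protocol_errors` are hypothetical.

```python
# (current state, event) -> next state, mirroring the three valid transitions above
TRANSITIONS = {
    ("NEW", "SYN"): "SYN_SENT",
    ("SYN_SENT", "SYN_ACK"): "ESTABLISHED",
    ("ESTABLISHED", "FIN"): "CLOSING",
}


def find_protocol_errors(events):
    """Return the events that are neither a valid transition nor a DATA event."""
    states = {}  # conn_id -> current state, stands in for `state`
    errors = []
    for event in events:
        conn = event["conn_id"]
        current = states.setdefault(conn, "NEW")
        next_state = TRANSITIONS.get((current, event["event"]))
        if next_state is not None:
            states[conn] = next_state
        elif event["event"] != "DATA":
            # The state is left unchanged on an invalid transition.
            errors.append({**event, "connection_state": states[conn]})
    return errors
```

Like the original snippet, this accepts DATA in any state, including before the connection is established. A stricter check would add DATA to the table only for ESTABLISHED.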
Session reconstruction¶
Accumulate events into a session and emit only when it ends:
kelora -j user-events.jsonl \
  --exec 'if e.event == "login" {
            state[e.session_id] = #{ user: e.user, events: [], start: e.timestamp };
          }
          if state.contains(e.session_id) {
            state[e.session_id].events.push(#{event: e.event, ts: e.timestamp});
          }
          if e.event == "logout" {
            let session = state[e.session_id];
            session.end = e.timestamp;
            session.event_count = session.events.len();
            print(session.to_json());
            state.remove(e.session_id);
          }
          e = ()' -q # Only emit complete sessions
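A Python model of the same accumulate-then-emit pattern is sketched below. The name `reconstruct_sessions` is hypothetical, and the sketch makes one assumption the snippet above does not: a logout for a session with no recorded login is ignored.

```python
def reconstruct_sessions(events):
    """Collect events per session_id and yield a summary when the session logs out."""
    open_sessions = {}  # session_id -> partial session, stands in for `state`
    for event in events:
        sid = event["session_id"]
        if event["event"] == "login":
            open_sessions[sid] = {
                "user": event["user"],
                "events": [],
                "start": event["timestamp"],
            }
        if sid in open_sessions:
            open_sessions[sid]["events"].append(
                {"event": event["event"], "ts": event["timestamp"]}
            )
        # Assumption: a logout without a matching login is skipped.
        if event["event"] == "logout" and sid in open_sessions:
            session = open_sessions.pop(sid)  # emit once, then forget the session
            session["end"] = event["timestamp"]
            session["event_count"] = len(session["events"])
            yield session
```

Sessions that never log out are not emitted and remain in memory until the input ends.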
Rate limiting — first N per key¶
Emit only the first 100 events per API key, then suppress the rest:
kelora -j api-logs.jsonl \
  --exec 'if !state.contains(e.api_key) {
            state[e.api_key] = 0;
          }
          state[e.api_key] += 1;
          if state[e.api_key] > 100 {
            e = ();  // Drop after first 100 per key
          }' \
  -k timestamp,api_key,endpoint
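The per-key counter can be modeled in a few lines of Python. This is a sketch with a hypothetical function name (`first_n_per_key`) and a configurable limit, not part of Kelora.

```python
from collections import defaultdict


def first_n_per_key(events, key="api_key", limit=100):
    """Yield at most `limit` events for each distinct key value."""
    counts = defaultdict(int)  # key value -> events seen so far
    for event in events:
        counts[event[key]] += 1
        if counts[event[key]] <= limit:
            yield event
```

The counter keeps incrementing after the limit is reached, matching the snippet above, so the final counts still report how many events each key produced in total.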
Memory management for large state¶
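For millions of keys, cap and reset state periodically. Clearing state also forgets every key seen so far, so a duplicate that arrives after a reset is emitted again: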
kelora -j huge-logs.jsonl \
  --exec 'if !state.contains("counter") { state["counter"] = 0; }
          state["counter"] += 1;
          // Periodic cleanup every 100k events
          if state["counter"] % 100000 == 0 {
            eprint("State size: " + state.len() + " keys");
            if state.len() > 500000 {
              state.clear();
              eprint("State cleared");
            }
          }
          if !state.contains(e.request_id) {
            state[e.request_id] = true;
          } else {
            e = ();
          }'
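An alternative to clearing everything at once is to evict only the oldest keys. The Python sketch below shows that different technique, a least-recently-seen cache. It is not what the Kelora snippet does, and the name `dedup_bounded` and its parameters are hypothetical.

```python
from collections import OrderedDict


def dedup_bounded(events, key="request_id", max_keys=500_000):
    """Drop repeats while remembering at most `max_keys` recent keys."""
    seen = OrderedDict()  # insertion order doubles as recency order
    for event in events:
        value = event[key]
        if value in seen:
            seen.move_to_end(value)  # refresh recency, then drop the repeat
            continue
        seen[value] = True
        if len(seen) > max_keys:
            seen.popitem(last=False)  # evict the least recently seen key
        yield event
```

Memory stays bounded by `max_keys`. A key that has been evicted is treated as new if it reappears, so the trade-off is the same in kind as with a full reset, only more gradual.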
See Also¶
- Power-User Techniques: the wider feature gallery
- Advanced Scripting: multi-stage transforms
- Metrics and Tracking: track_*() aggregation
- Scripting Stages: how --begin/--exec/--end run