Function Reference

Complete reference for all 150+ built-in Rhai functions available in Kelora. Functions are organized by category for easy lookup.

Function Call Syntax

Rhai allows two styles: value.method(args) or function(value, args). Use whichever feels more natural.


String Functions

Extraction and Searching

text.extract_regex(pattern [, group])

Extract first regex match or capture group.

e.error_code = e.message.extract_regex(r"ERR-(\d+)", 1)  // "ERR-404" → "404"
e.full_match = e.line.extract_regex(r"\d{3}")            // First 3-digit number

text.extract_regexes(pattern [, group])

Extract all regex matches as array.

e.numbers = e.line.extract_regexes(r"\d+")             // All numbers
e.codes = e.message.extract_regexes(r"ERR-(\d+)", 1)   // All error codes

text.extract_regex_maps(pattern, field)

Extract regex matches as array of maps for fan-out with emit_each().

// Extract all error codes with context
let errors = e.log.extract_regex_maps(r"(?P<code>ERR-\d+): (?P<msg>[^\n]+)", "error");
emit_each(errors)  // Each match becomes an event with 'code' and 'msg' fields

text.extract_ip([nth])

Extract IP address from text (nth: 1=first, -1=last).

e.client_ip = e.headers.extract_ip()                  // First IP
e.origin_ip = e.forwarded.extract_ip(-1)              // Last IP

text.extract_ips()

Extract all IP addresses as array.

e.all_ips = e.headers.extract_ips()                   // ["192.168.1.1", "10.0.0.1"]

text.extract_url([nth])

Extract URL from text (nth: 1=first, -1=last).

e.link = e.message.extract_url()                      // First URL

text.extract_email([nth])

Extract email address from text (nth: 1=first, -1=last).

e.contact = e.message.extract_email()                 // First email
e.sender = e.log.extract_email(1)                     // First email (explicit nth=1)
e.recipient = e.log.extract_email(-1)                 // Last email

text.extract_emails()

Extract all email addresses as array.

e.all_contacts = e.message.extract_emails()           // ["alice@example.com", "bob@test.org"]

text.extract_domain()

Extract domain from URL or email address.

e.domain = "https://api.example.com/path".extract_domain()  // "example.com"
e.mail_domain = "user@corp.example.com".extract_domain()    // "corp.example.com"

String Slicing and Position

text.before(delimiter [, nth])

Return the text before the nth occurrence of delimiter (nth: 1=first, -1=last).

e.user = e.email.before("@")                          // "user@host.com" → "user"
e.path = e.url.before("?")                            // Strip query string

text.after(delimiter [, nth])

Return the text after the nth occurrence of delimiter (nth: 1=first, -1=last).

e.extension = e.filename.after(".", -1)               // "app.tar.gz" → "gz" (last dot)
e.domain = e.email.after("@")                         // "user@host.com" → "host.com"

text.between(start, end [, nth])

Text between start and end delimiters (nth: 1=first, -1=last).

Note: text.between(left, right, nth) is equivalent to text.after(left, nth).before(right).

e.quoted = e.line.between('"', '"')                   // Extract quoted string
"[a][b][c]".between("[", "]", 2)                      // "b" - same as .after("[", 2).before("]")

text.starting_with(prefix [, nth])

Return the substring from the nth occurrence of prefix (included) to the end of text (nth: 1=first, -1=last).

e.from_error = e.log.starting_with("ERROR:")          // "INFO: ok ERROR: bad" → "ERROR: bad"

text.ending_with(suffix [, nth])

Return the substring from the start of text through the end of the nth occurrence of suffix (nth: 1=first, -1=last).

e.up_to_end = e.log.ending_with(".txt")               // "file.txt more" → "file.txt"

text.slice(spec)

Slice text using Python notation (e.g., "1:5", ":3", "-2:").

e.first_three = e.code.slice(":3")                    // "ABCDEF" → "ABC"
e.last_two = e.code.slice("-2:")                      // "ABCDEF" → "EF"
e.middle = e.code.slice("2:5")                        // "ABCDEF" → "CDE"

Column Extraction

text.col(spec [, separator])

Extract columns by index/range/list (e.g., '0', '0,2,4', '1:4'). Indices are 0-based.

e.first = e.line.col("0")                             // First column (0-indexed)
e.cols = e.line.col("0,2,4")                          // Columns 0, 2, 4
e.range = e.line.col("1:4", "\t")                     // Columns 1-3, tab-separated

text.cols(col1, col2 [, col3, ...] [, separator])

Extract up to 6 columns (0-indexed) and return their values as an array, in the order the indices are given.

// Extract columns 0, 2, 4 as array
let values = e.line.cols(0, 2, 4)                     // ["value0", "value2", "value4"]
e.user = values[0]
e.action = values[1]

// With custom separator
let data = e.line.cols(1, 3, "\t")                    // Tab-separated columns

// Practical example: Apache log parsing
let parts = e.log.cols(0, 3, 6, 8)                    // IP, timestamp, path, status
e.ip = parts[0]
e.timestamp = parts[1]
e.path = parts[2]
e.status = parts[3]
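A plain whitespace split shows why indices 0, 3, 6 and 8 land on those fields in a combined-format line. The Python sketch below assumes that cols() splits on runs of whitespace by default, the way str.split() does; the default separator is not stated above. The sample line is made up.

```python
# Illustrative sketch, not Kelora's implementation.
# Assumption: the default separator is "any run of whitespace", like str.split().
LINE = ('203.0.113.7 - alice [10/Oct/2024:13:55:36 -0700] '
        '"GET /index.html HTTP/1.1" 200 2326 "-" "curl/8.0"')

def cols(line, *indices, sep=None):
    """Pick 0-based columns from a split line; sep=None means any whitespace."""
    parts = line.split(sep)
    return [parts[i] for i in indices]

ip, timestamp, path, status = cols(LINE, 0, 3, 6, 8)
print(ip, timestamp, path, status)  # 203.0.113.7 [10/Oct/2024:13:55:36 /index.html 200
```

The timestamp token keeps its opening "[" and loses the timezone offset. A quoted field that contains spaces also shifts every later index. For full parsing, parse_combined() is the more robust choice.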

Parsing Functions

text.parse_json()

Parse JSON string into map/array.

e.data = e.payload.parse_json()
e.value = e.data["key"]

text.parse_logfmt()

Parse logfmt line into structured fields.

let fields = e.line.parse_logfmt()
e.level = fields["level"]

text.parse_syslog()

Parse syslog line into structured fields.

let syslog = e.line.parse_syslog()
e.priority = syslog["priority"]
e.message = syslog["message"]

text.parse_combined()

Parse Apache/Nginx combined log line.

let access = e.line.parse_combined()
e.ip = access["ip"]
e.status = access["status"]

text.parse_cef()

Parse Common Event Format line into fields.

let cef = e.line.parse_cef()
e.severity = cef["severity"]

text.parse_kv([sep [, kv_sep]])

Split key-value pairs from text. Only extracts tokens containing the key-value separator; tokens without the separator are skipped (e.g., prose words or unpaired values).

This is a simple splitter and is not quote-aware: it does not strip surrounding quotes from values and will split on a separator that appears inside a quoted value. For key="value with spaces" input (logfmt-style logs), use parse_logfmt() instead, which handles quoting and infers numeric/boolean types.

e.params = e.query.parse_kv("&", "=")                 // "a=1&b=2" → {a: "1", b: "2"}
e.fields = e.msg.parse_kv()                           // "Payment timeout order=1234" → {order: "1234"}
// e.msg.parse_kv() on 'err="connection refused"' would split mid-value — use parse_logfmt()
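The quoting caveat is easiest to see side by side. The Python sketch below approximates a non-quote-aware splitter like parse_kv(): it drops tokens without the separator and splits each pair on its first kv_sep, which is an assumption. For contrast, it also shows a quote-aware split using the standard shlex module. Neither function is Kelora's code. shlex is only a stand-in for a logfmt parser and does not infer types.

```python
import shlex

# Illustrative sketch, not Kelora's implementation.
# Assumption: each token is split on the FIRST kv_sep occurrence.

def naive_kv(text, sep=None, kv_sep="="):
    """Split on sep (whitespace by default); keep only tokens containing kv_sep."""
    out = {}
    for token in text.split(sep):
        if kv_sep in token:
            key, value = token.split(kv_sep, 1)
            out[key] = value
    return out

def quote_aware_kv(text):
    """shlex honours double quotes, so a quoted value stays in one token."""
    out = {}
    for token in shlex.split(text):
        if "=" in token:
            key, value = token.split("=", 1)
            out[key] = value
    return out

line = 'Payment timeout order=1234 err="connection refused"'
print(naive_kv(line))        # {'order': '1234', 'err': '"connection'}
print(quote_aware_kv(line))  # {'order': '1234', 'err': 'connection refused'}
```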

text.parse_url()

Parse URL into structured components.

let url = e.request.parse_url()
e.scheme = url["scheme"]
e.host = url["host"]
e.path = url["path"]

text.parse_query_params()

Parse URL query string into map.

e.params = e.query_string.parse_query_params()        // "a=1&b=2" → {a: "1", b: "2"}

text.parse_email()

Parse email address into parts.

let email = "User Name <user@example.com>".parse_email()
e.name = email["name"]       // "User Name"
e.address = email["address"] // "user@example.com"

text.parse_user_agent()

Parse common user-agent strings into components.

let ua = e.user_agent.parse_user_agent()
e.browser = ua["browser"]
e.os = ua["os"]

text.parse_jwt()

Parse a JWT into header and claims (the decoded payload) without verification. Also returns signature_b64u plus alg/kid/typ when present in the header. The standard NumericDate claims exp, iat, and nbf are additionally exposed as datetime values under expires_at, issued_at, and not_before, so they compose with now() and datetime arithmetic.

The decoded datetimes pair with their source claims as follows (the raw integers remain available under claims):

Datetime field   Source claim   Meaning
expires_at       claims.exp     Expiration time
issued_at        claims.iat     Issued-at time
not_before       claims.nbf     Not-valid-before time

A field is present only when its claim is present and a valid numeric date.

These three are real datetime values, not strings: expires_at < now() is a chronological comparison and expires_at - issued_at yields a duration, whereas claims.exp is the raw integer (Unix seconds) straight from the token. To render a datetime back to text, call .to_iso() or .format("%Y-%m-%d") — never compare it as a string, since lexical and chronological order can disagree.

let jwt = e.token.parse_jwt()
e.user_id = jwt["claims"]["sub"]
e.expired = jwt.expires_at < now()                 // bool (chronological)
e.lifetime = (jwt.expires_at - jwt.issued_at).to_string()
e.exp_iso = jwt.expires_at.to_iso()                // datetime -> string
e.exp_raw = jwt.claims.exp                          // raw int, e.g. 1735689600
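Because parse_jwt() does not verify the signature, its output is essentially the base64url-decoded first two segments of the token. The Python sketch below shows that decoding, followed by the NumericDate-to-datetime step. The token is built inside the snippet, and its "sig" segment is a placeholder. This is an illustration, not Kelora's implementation. Like parse_jwt(), it must never be used to decide whether a token is trustworthy.

```python
import base64
import json
from datetime import datetime, timezone

# Illustrative sketch only: decodes WITHOUT verifying the signature.

def b64url_encode(obj):
    raw = json.dumps(obj, separators=(",", ":")).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

def b64url_decode(segment):
    padded = segment + "=" * (-len(segment) % 4)  # base64url drops '=' padding
    return json.loads(base64.urlsafe_b64decode(padded))

def decode_jwt(token):
    header_b64, payload_b64, signature_b64u = token.split(".")
    claims = b64url_decode(payload_b64)
    result = {"header": b64url_decode(header_b64), "claims": claims,
              "signature_b64u": signature_b64u}
    # Expose NumericDate claims (Unix seconds) as timezone-aware datetimes.
    for field, claim in (("expires_at", "exp"), ("issued_at", "iat"), ("not_before", "nbf")):
        value = claims.get(claim)
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            result[field] = datetime.fromtimestamp(value, tz=timezone.utc)
    return result

# Hypothetical unsigned token; "sig" is a placeholder signature segment.
token = ".".join([b64url_encode({"alg": "HS256", "typ": "JWT"}),
                  b64url_encode({"sub": "user-42", "iat": 1735686000, "exp": 1735689600}),
                  "sig"])
jwt = decode_jwt(token)
print(jwt["claims"]["sub"], jwt["expires_at"].isoformat())  # user-42 2025-01-01T00:00:00+00:00
print(jwt["expires_at"] - jwt["issued_at"])                 # 1:00:00
```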

text.parse_path()

Parse filesystem path into components.

let path = "/var/log/app.log".parse_path()
e.dir = path["dir"]          // "/var/log"
e.file = path["file"]        // "app.log"

text.parse_media_type()

Parse media type tokens and parameters.

let mt = "text/html; charset=utf-8".parse_media_type()
e.type = mt["type"]          // "text"
e.subtype = mt["subtype"]    // "html"

text.parse_content_disposition()

Parse Content-Disposition header parameters.

let cd = e.header.parse_content_disposition()
e.filename = cd["filename"]

Encoding and Hashing

text.encode_b64() / text.decode_b64()

Base64 encoding/decoding.

e.encoded = e.data.encode_b64()
e.decoded = e.payload.decode_b64()

text.encode_hex() / text.decode_hex()

Hexadecimal encoding/decoding.

e.hex = e.bytes.encode_hex()
e.bytes = e.hex_string.decode_hex()

text.encode_url() / text.decode_url()

URL percent encoding/decoding.

e.encoded = e.param.encode_url()                      // "hello world" → "hello%20world"
e.decoded = e.url_param.decode_url()

text.escape_json() / text.unescape_json()

JSON escape sequence handling.

e.escaped = e.text.escape_json()
e.unescaped = e.json_string.unescape_json()

text.escape_html() / text.unescape_html()

HTML entity escaping/unescaping.

e.safe = e.user_input.escape_html()                   // "<script>" → "&lt;script&gt;"
e.text = e.html_entity.unescape_html()

text.hash([algo])

Hash with algorithm (default: sha256, also: xxh3). One-way digest to redact or anonymize a value; for stable, correlatable aliases use pseudonym(value, domain) instead.

e.checksum = e.content.hash()                         // SHA-256
e.fast = e.data.hash("xxh3")                          // Fast non-crypto hash

text.bucket()

Fast hash for sampling and grouping. It returns an integer, so it can be used directly with the modulo (%) operator.

// Sample 10% of events
if e.user_id.bucket() % 10 == 0 {
    e.sampled = true
}
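bucket()-based sampling is repeatable because the hash depends only on the value: the same user_id always lands in the same bucket across runs and threads. The Python sketch below illustrates the idea. It uses a truncated SHA-256 from hashlib as a stand-in, since the hash function bucket() actually uses is not specified here. Python's built-in hash() would not work for this, because it is salted per process.

```python
import hashlib

# Illustrative sketch; SHA-256 is a stand-in for bucket()'s unspecified hash.

def bucket(value: str) -> int:
    """Stable non-negative int derived from the value's SHA-256 digest."""
    digest = hashlib.sha256(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")

def sampled(user_id: str, one_in: int = 10) -> bool:
    return bucket(user_id) % one_in == 0

users = [f"user-{i}" for i in range(10_000)]
kept = [u for u in users if sampled(u)]
print(len(kept))  # roughly 1,000; the exact count depends on the hash
```

Every event for a given user is either kept or dropped together, which keeps per-user activity intact in the sample.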

IP Address Functions

text.is_ipv4() / text.is_ipv6()

Check if text is a valid IP address.

if e.addr.is_ipv4() {
    e.ip_version = 4
}

text.is_private_ip()

Check if IP is in private/internal ranges.

Includes RFC1918 IPv4, IPv6 unique local (fc00::/7), IPv6 link-local (fe80::/10), and loopback addresses.

if e.ip.is_private_ip() {
    e.internal = true
}

text.is_in_cidr(cidr)

Check if IP address is in CIDR network.

if e.ip.is_in_cidr("10.0.0.0/8") {
    e.corp_network = true
}

text.mask_ip([octets])

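Mask the host portion of an IP address while preserving the network prefix. The optional argument is the number of trailing parts to zero: IPv4 octets or IPv6 16-bit groups. It defaults to 1, as in the first example below.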

e.masked_ip = e.client_ip.mask_ip()                   // "192.168.1.100" → "192.168.1.0"
e.partial = e.ip.mask_ip(2)                           // "192.168.1.100" → "192.168.0.0"
e.ipv6_masked = e.ip.mask_ip(2)                       // "2001:db8:1:2:3:4:5:6" → "2001:db8:1:2:3:4::"
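The CIDR test and the masking rule above map directly onto Python's standard ipaddress module. The sketch below assumes that mask_ip(n) zeroes the last n IPv4 octets or IPv6 16-bit groups, with a default of 1; this is inferred from the examples. The private-range list is the one stated for is_private_ip(): RFC 1918, fc00::/7, fe80::/10, and loopback. It is written out explicitly because ipaddress's own is_private property covers additional reserved ranges.

```python
import ipaddress

# Illustrative sketch, not Kelora's implementation.
# Assumption: mask_ip(n) zeroes the last n octets (IPv4) or 16-bit groups (IPv6).
PRIVATE_NETS = [ipaddress.ip_network(n) for n in (
    "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16",  # RFC 1918
    "127.0.0.0/8", "::1/128",                          # loopback
    "fc00::/7", "fe80::/10",                           # IPv6 unique local, link-local
)]

def is_in_cidr(ip: str, cidr: str) -> bool:
    return ipaddress.ip_address(ip) in ipaddress.ip_network(cidr)

def is_private_ip(ip: str) -> bool:
    addr = ipaddress.ip_address(ip)
    # `in` returns False (not an error) when the IP versions differ.
    return any(addr in net for net in PRIVATE_NETS)

def mask_ip(ip: str, parts: int = 1) -> str:
    addr = ipaddress.ip_address(ip)
    bits_per_part = 8 if addr.version == 4 else 16
    prefix = addr.max_prefixlen - parts * bits_per_part
    network = ipaddress.ip_network(f"{addr}/{prefix}", strict=False)
    return str(network.network_address)

print(mask_ip("192.168.1.100"))                # 192.168.1.0
print(mask_ip("2001:db8:1:2:3:4:5:6", 2))      # 2001:db8:1:2:3:4::
```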

Pattern Normalization

text.normalized([patterns])

Replace variable patterns with placeholders (e.g., <ipv4>, <email>).

Useful for identifying unique log patterns by normalizing variable data like IP addresses, UUIDs, and email addresses to fixed placeholders.

// Default patterns (IPs, emails, UUIDs, hashes, etc.)
e.pattern = e.message.normalized()
// "User user@test.com from 192.168.1.5" → "User <email> from <ipv4>"

// CSV-style pattern list
e.simple = e.message.normalized("ipv4,email")

// Array-style pattern list
e.custom = e.message.normalized(["uuid", "sha256", "url"])

Default patterns (when no argument provided): ipv4_port, ipv4, ipv6, email, url, fqdn, uuid, mac, md5, sha1, sha256, path, oauth, function, hexcolor, version

Available patterns (opt-in): hexnum, duration, num, credit_card, ssn, phone

PII-oriented patterns are opt-in on purpose:

  • credit_card - Luhn-validated payment card numbers
  • ssn - US Social Security numbers in strict XXX-XX-XXXX format (hyphens required; spaces/dots not matched); rejects SSA-invalid area codes (000, 666, 900–999), group 00, and serial 0000
  • phone - NANP-aware validation for US/CA numbers, with permissive support for other international numbers
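The Luhn check behind credit_card is a short checksum. The Python sketch below implements the standard algorithm and is only meant to explain what "Luhn-validated" means. How Kelora finds candidate numbers in text (separators, length limits) is not described here and is not modelled.

```python
# Standard Luhn checksum; illustrative only, not Kelora's candidate matcher.

def luhn_valid(number: str) -> bool:
    digits = [int(c) for c in number if c.isdigit()]  # ignore spaces/hyphens
    if len(digits) < 2:
        return False
    total = 0
    # Walk from the rightmost digit; double every second one.
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0

print(luhn_valid("4111 1111 1111 1111"))  # True: a well-known test card number
print(luhn_valid("4111 1111 1111 1112"))  # False: check digit altered
```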

Common use case - Pattern discovery:

# Recommended alias for easy pattern discovery
kelora --save-alias patterns \
  --exec 'track_unique("patterns", e.message.normalized())' \
  --metrics -q

# Usage
kelora -a patterns app.log

Output with many patterns:

patterns     (127 unique):
  User <email> from <ipv4>
  Request to <url> failed
  Error <uuid> occurred
  Connection <ipv4_port> established
  Processing <fqdn> with <sha256>
  [+122 more. Use --metrics-file or --end script for full list]

For custom analysis, access full data in --end scripts or --metrics-file.

String Manipulation

text.strip([chars]) / text.lstrip([chars]) / text.rstrip([chars])

Remove whitespace or specified characters.

e.clean = e.text.strip()                              // Remove leading/trailing whitespace
e.trimmed = e.line.lstrip("# ")                       // Remove "# " from left
e.path = e.filename.rstrip("/")                       // Remove trailing slashes

text.clip() / text.lclip() / text.rclip()

Remove non-alphanumeric characters from edges.

e.word = "'hello!'".clip()                            // → "hello"
e.left = "...start".lclip()                           // → "start"
e.right = "end...".rclip()                            // → "end"

text.upper() / text.lower()

Case conversion. Both styles are available: upper()/lower() (Python-style) and to_upper()/to_lower() (Rhai builtins). Use whichever you prefer.

e.normalized = e.country_code.upper()                 // "us" → "US"
e.also_upper = e.code.to_upper()                      // Same as upper()
e.lowercase = e.name.lower()                          // "Hello" → "hello"
e.also_lower = e.name.to_lower()                      // Same as lower()

text.replace(pattern, replacement)

Replace all occurrences of pattern.

e.cleaned = e.text.replace("ERROR", "WARN")

text.split(separator) / text.split_regex(pattern)

Split string into array.

e.parts = e.path.split("/")
e.tokens = e.line.split_regex(r"\s+")                 // Split on whitespace

String Testing

text.contains(pattern)

Check if text contains pattern.

if e.message.contains("timeout") {
    e.timeout_error = true
}

text.like(pattern)

Glob match (anchored) with * and ?.

if e.message.like("ERROR * timeout") {
    e.timeout_error = true
}

text.ilike(pattern)

Case-insensitive glob match with Unicode folding.

if e.message.ilike("*straße*") {
    e.locale = "de"
}

text.matches(pattern)

Regex search with cached compilation. Invalid patterns raise errors.

if e.path.matches(r"^/api/[^/]+/details$") {
    e.route = "details"
}

Text Matching Functions Comparison

Function    Anchored   Errors on invalid pattern   Case handling   Use case
like()      Yes        N/A (glob syntax)           Exact           Simple wildcard matching
ilike()     Yes        N/A (glob syntax)           Unicode fold    Case-insensitive glob
matches()   No         Yes                         Regex-driven    Full regex search with caching

⚠️ Regex performance tips: avoid nested quantifiers like (.*)*, prefer anchored patterns when possible, and reuse patterns to benefit from the per-thread cache.
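Translating a like() pattern into a regex shows the difference between an anchored glob and a regex search. The translation escapes everything, maps * to ".*" and ? to a single character, and requires the whole string to match. The Python sketch below is an approximation under two assumptions: * and ? are the only wildcards, and * may span any characters. For ilike(), str.casefold() stands in for Kelora's Unicode folding.

```python
import re

# Illustrative sketch, not Kelora's implementation.
# Assumptions: '*' and '?' are the only wildcards; '*' may also span newlines.

def glob_to_regex(pattern):
    parts = []
    for ch in pattern:
        if ch == "*":
            parts.append(".*")           # any run of characters
        elif ch == "?":
            parts.append(".")            # exactly one character
        else:
            parts.append(re.escape(ch))  # everything else is literal
    return re.compile("".join(parts), re.DOTALL)

def like(text, pattern):
    # fullmatch anchors the pattern at both ends of the text
    return glob_to_regex(pattern).fullmatch(text) is not None

def ilike(text, pattern):
    # str.casefold() stands in for Unicode case folding (e.g. "ß" -> "ss")
    return like(text.casefold(), pattern.casefold())

def matches(text, regex):
    # re.search is unanchored: a hit anywhere in the text counts
    return re.search(regex, text) is not None

print(like("ERROR db timeout", "ERROR * timeout"))              # True
print(like("ERROR db timeout after retry", "ERROR * timeout"))  # False (anchored)
print(matches("ERROR db timeout after retry", r"timeout"))      # True (search)
```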

text.is_digit()

Check if text contains only digits.

if e.status.is_digit() {
    e.status_code = e.status.to_int()
}

text.count(pattern)

Count occurrences of pattern in text.

e.error_count = e.log.count("ERROR")

text.edit_distance(other)

Compute Levenshtein edit distance between two strings.

if e.message.edit_distance("connection reset") <= 3 {
    e.is_connection_issue = true
}
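Levenshtein distance counts the minimum number of single-character insertions, deletions, and substitutions needed to turn one string into another. The Python sketch below is the standard two-row dynamic-programming version, which runs in O(len(a) * len(b)) time. It is meant to give a feel for what a threshold like <= 3 allows.

```python
# Standard Levenshtein distance (two-row dynamic programming); illustrative only.

def edit_distance(a: str, b: str) -> int:
    prev = list(range(len(b) + 1))  # distance from "" to each prefix of b
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            curr.append(min(
                prev[j] + 1,               # delete ca
                curr[j - 1] + 1,           # insert cb
                prev[j - 1] + (ca != cb),  # substitute (free if equal)
            ))
        prev = curr
    return prev[-1]

print(edit_distance("kitten", "sitting"))                   # 3
print(edit_distance("connection reset", "connection rest"))  # 1
```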

text.index_of(substring [, start])

Find 0-based position of literal substring (-1 if not found). Optional start parameter specifies where to begin searching.

e.at_pos = e.url.index_of("?")                        // Find first "?"
e.second = e.text.index_of("test", 10)                // Search starting at position 10

Array Functions

Sorting and Filtering

array.sorted()

Return new sorted array (numeric/lexicographic).

e.sorted_scores = sorted(e.scores)                    // [3, 1, 2] → [1, 2, 3]
e.sorted_names = sorted(e.names)                      // Alphabetical

array.sorted_by(field)

Sort array of objects by field name.

let sorted_users = sorted_by(e.users, "age")
e.oldest = sorted_users[-1]

array.reversed()

Return new array in reverse order.

e.reversed = reversed(e.items)

array.slice(spec)

Slice array using Python notation (e.g., "1:5", ":3", "-2:").

e.top_three = e.values.slice(":3")                   // [9, 8, 7, 6] → [9, 8, 7]
e.tail = e.values.slice("-2:")                       // [9, 8, 7, 6] → [7, 6]
e.every_other = e.values.slice("0::2")               // [9, 8, 7, 6] → [9, 7]

array.unique()

Remove all duplicate elements (preserves first occurrence).

e.unique_tags = unique(e.tags)                        // [1, 2, 1, 3] → [1, 2, 3]

array.filter(|item| condition)

Keep elements matching condition.

e.errors = e.logs.filter(|log| log.level == "ERROR")

Aggregation

array.max() / array.min()

Find maximum/minimum value in array.

e.max_score = e.scores.max()
e.min_time = e.times.min()

array.percentile(pct)

Calculate percentile of numeric array.

e.p95 = e.latencies.percentile(95)
e.median = e.values.percentile(50)

array.sum()

Sum the numeric values in an array. Every element must be an actual number (int/i64 or float/f64); strings are not coerced to numbers. Mixed-type arrays (numbers plus strings or booleans) and empty arrays return () rather than raising an error.

e.total_bytes = e.requests.pluck_as_nums("bytes").sum()
e.total_errors = [10, 20, 30].sum()                    // 60.0
[10, 20.5, 30].sum()                                   // 60.5
[10, "20", 30].sum()                                   // () - mixed types rejected
[].sum()                                               // () - empty array

array.mean()

Calculate the arithmetic mean (average) of an array of numbers. Every element must be an actual number (int/i64 or float/f64); strings are not coerced to numbers. Unlike sum(), mean() raises an error for empty arrays and for mixed-type or non-numeric arrays.

e.avg_latency = e.latencies.mean()
e.avg_score = [10, 20, 30].mean()                      // 20.0
[10, "20", 30].mean()                                  // ERROR - mixed types rejected

array.variance()

Calculate the population variance (dividing by N, not N - 1) of an array of numbers. Every element must be an int or float, strings are not coerced, and empty, mixed-type, or non-numeric arrays raise an error.

e.latency_variance = e.latencies.variance()
if e.latency_variance > 100.0 {
    print("High variance detected")
}

array.stddev()

Calculate the population standard deviation (the square root of the population variance) of an array of numbers. Every element must be an int or float, strings are not coerced, and empty, mixed-type, or non-numeric arrays raise an error.

e.latency_stddev = e.latencies.stddev()
if e.latency_stddev > 10.0 {
    print("High variation: " + e.latency_stddev)
}
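Population variance divides the summed squared deviations by N rather than N - 1, and the standard deviation is its square root. Python's statistics module provides the same population forms as pvariance() and pstdev(), which makes them a convenient cross-check. The hand-written function below mirrors the type rules described above; it is a sketch, not Kelora's code.

```python
import math
import statistics

# Illustrative sketch of population variance; not Kelora's implementation.

def population_variance(xs):
    if not xs or not all(isinstance(x, (int, float)) and not isinstance(x, bool) for x in xs):
        raise ValueError("need a non-empty list of numbers")  # mirrors the error cases above
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** 2 for x in xs) / len(xs)  # divide by N, not N - 1

latencies = [2, 4, 4, 4, 5, 5, 7, 9]
var = population_variance(latencies)
print(var, math.sqrt(var))  # 4.0 2.0
```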

array.reduce(|acc, item| expr, init)

Aggregate array into single value.

e.total = e.amounts.reduce(|sum, x| sum + x, 0)

Transformation

array.map(|item| expression)

Transform each element.

e.doubled = e.numbers.map(|n| n * 2)
e.names = e.users.map(|u| u.name)

array.pluck(field) / array.pluck_as_nums(field)

Extract a single field from each element in an array of maps/objects, returning a new array of just those field values.

pluck(field) - Extract field values as-is, skipping elements where the field is missing or ().

pluck_as_nums(field) - Extract and convert field values to f64 numbers, skipping elements where conversion fails or the field is missing.

// Given array of event objects
let events = [
    #{status: 200, time: "1.5"},
    #{status: 404, time: "0.3"},
    #{status: 200, time: "2.1"}
]

// Extract field values
let statuses = events.pluck("status")        // [200, 404, 200]
let times = events.pluck_as_nums("time")     // [1.5, 0.3, 2.1] (converted to numbers)

// Compare to manual approach
let manual = events.map(|e| e.status)        // Same result, but errors if field missing

Common use cases:

// Calculate average response time
let times = events.pluck_as_nums("response_time")
let avg = times.reduce(|sum, x| sum + x, 0) / times.len()

// Find most common status codes
let codes = events.pluck("status")
for code in codes {
    track_freq("code", code)
}

// With window for rolling analysis (requires --window)
let recent_times = window.pluck_as_nums("response_time")
e.avg_recent = recent_times.reduce(|sum, x| sum + x, 0) / recent_times.len()
e.spike = recent_times.filter(|t| t > 1000).len()

Why use pluck() vs map():

  • Safe: Automatically skips missing fields instead of erroring
  • Clear intent: Explicitly shows you're extracting one field
  • Type conversion: pluck_as_nums() handles string-to-number conversion
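The skip rules for pluck() and pluck_as_nums() can be sketched in Python, with dicts standing in for Rhai maps and None for (). This illustrates the behaviour described above and is not Kelora's implementation. In particular, the set of strings that count as convertible is assumed to match Python's float().

```python
# Illustrative sketch; dicts stand in for Rhai maps, None for ().
# Assumption: "convertible" means float(value) succeeds.

def pluck(items, field):
    """Values of `field`, skipping items where it is missing or None."""
    return [item[field] for item in items if item.get(field) is not None]

def pluck_as_nums(items, field):
    """Like pluck, but convert to float and skip values that don't convert."""
    out = []
    for value in pluck(items, field):
        try:
            out.append(float(value))
        except (TypeError, ValueError):
            continue
    return out

events = [
    {"status": 200, "time": "1.5"},
    {"status": 404, "time": "n/a"},
    {"time": "2.1"},
]
print(pluck(events, "status"))        # [200, 404]
print(pluck_as_nums(events, "time"))  # [1.5, 2.1]
```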

array.flattened([style [, max_depth]])

Flatten nested arrays/objects.

e.flat = [[1, 2], [3, 4]].flattened()                 // Returns flat map
e.fields = e.nested.flattened("dot", 2)               // Flatten to dot notation

Testing

array.contains(value)

Check if array contains value.

if e.roles.contains("admin") {
    e.is_admin = true
}

array.contains_any(search_array)

Check if array contains any search values.

if e.tags.contains_any(["error", "critical"]) {
    e.alert = true
}

array.starts_with_any(search_array)

Check if array starts with any search values.

if e.path_parts.starts_with_any(["/api", "/v1"]) {
    e.api_call = true
}

array.all(|item| condition) / array.some(|item| condition)

Check if all/any elements match condition.

e.all_valid = e.scores.all(|s| s >= 0)
e.has_errors = e.logs.some(|l| l.level == "ERROR")

Other Operations

array.join(separator)

Join array elements with separator.

e.path = e.parts.join("/")
e.csv = e.values.join(",")

array.push(item) / array.pop()

Add/remove items from array.

e.tags.push("new_tag")
let last = e.items.pop()

Map/Object Functions

Field Access

map.get_path("field.path" [, default])

Safe nested field access with fallback.

e.user_name = e.get_path("user.profile.name", "unknown")
e.score = e.get_path("stats.score", 0)

map.has_path("field.path")

Check if nested field path exists.

if e.has_path("error.details.code") {
    e.detailed_error = true
}

map.path_equals("path", value)

Safe nested field comparison.

if path_equals(e, "user.role", "admin") {
    e.elevated = true
}

map.has("key")

Check if map contains key with non-unit value.

if e.has("error_code") {
    // Field exists and has a value
}

Field Manipulation

map.keep(["field1", ...])

Return a new map containing only selected top-level fields that exist.

let shaped = e.keep(["service", "level", "msg"])
// Missing fields are ignored; result contains only existing selected keys

map.drop(["field1", ...])

Return a new map containing all top-level fields except selected ones.

let trimmed = e.drop(["_raw", "_file", "_offset"])
// Missing fields are ignored; empty list returns a copy of the original map

Both methods are pure: they return a new top-level map and do not mutate e. Field names are matched exactly as strings (no path traversal or wildcards).

map.rename_field("old", "new")

Rename a field; returns true if successful.

e.rename_field("old_name", "new_name")

map.merge(other_map)

Merge another map into this one (overwrites existing keys).

e.merge(#{status: "ok", timestamp: now()})

map.enrich(other_map)

Merge another map, inserting only missing keys (does not overwrite).

e.enrich(#{user: "default", level: "info"})  // Only adds if keys don't exist
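merge() and enrich() differ only in which side wins when a key already exists. With Python dicts standing in for Rhai maps, the sketch below shows the two behaviours. It assumes both functions mutate the target in place and act on top-level keys only. Whether a key holding () counts as missing for enrich() is not specified above, and the sketch treats any present key as present.

```python
# Illustrative sketch with dicts standing in for Rhai maps; not Kelora's code.
# Assumptions: in-place mutation, top-level keys only.

def merge(target, other):
    target.update(other)  # other's values overwrite existing keys

def enrich(target, other):
    for key, value in other.items():
        target.setdefault(key, value)  # only fills keys that are absent

event_a = {"user": "alice", "level": "warn"}
merge(event_a, {"level": "info", "status": "ok"})
event_b = {"user": "alice", "level": "warn"}
enrich(event_b, {"level": "info", "status": "ok"})
print(event_a)  # {'user': 'alice', 'level': 'info', 'status': 'ok'}
print(event_b)  # {'user': 'alice', 'level': 'warn', 'status': 'ok'}
```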

map.flattened([style [, max_depth]])

Flatten nested object to dot notation.

let flat = e.nested.flattened("dot")                  // {a: {b: 1}} → {"a.b": 1}
let flat = e.nested.flattened("dot", 2)               // With max depth

map.flatten_field("field_name")

Flatten just one specific field from the map.

let flat = e.flatten_field("metadata")                // Flattens only e.metadata

map.unflatten([separator])

Reconstruct nested object from flat keys.

let nested = e.flat.unflatten(".")                    // {"a.b": 1} → {a: {b: 1}}
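Dot-style flattening and its inverse take only a few lines of Python. The sketch below assumes that only nested maps are walked (arrays stay as values) and that unflatten splits every key at each occurrence of the separator. It does not cover Kelora's handling of arrays, other styles, or max_depth.

```python
# Illustrative sketch of dot-style flatten/unflatten; not Kelora's implementation.
# Assumptions: only nested dicts are walked; unflatten splits on every separator.

def flatten(obj, sep=".", prefix=""):
    out = {}
    for key, value in obj.items():
        path = f"{prefix}{sep}{key}" if prefix else str(key)
        if isinstance(value, dict) and value:
            out.update(flatten(value, sep, path))  # descend into nested maps
        else:
            out[path] = value
    return out

def unflatten(flat, sep="."):
    root = {}
    for path, value in flat.items():
        *parents, leaf = path.split(sep)
        node = root
        for part in parents:
            node = node.setdefault(part, {})  # create intermediate maps as needed
        node[leaf] = value
    return root

nested = {"a": {"b": 1, "c": {"d": 2}}, "e": 3}
flat = flatten(nested)
print(flat)             # {'a.b': 1, 'a.c.d': 2, 'e': 3}
print(unflatten(flat))  # {'a': {'b': 1, 'c': {'d': 2}}, 'e': 3}
```

The round trip only holds when the original keys do not themselves contain the separator. A key such as "v1.2" would be split into two levels on the way back.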

Format Conversion

map.to_json([pretty])

Convert map to JSON string.

e.payload = e.data.to_json()
e.readable = e.data.to_json(true)                     // Pretty-printed

map.to_logfmt()

Convert map to logfmt format string.

e.formatted = e.fields.to_logfmt()                    // {a: 1, b: 2} → "a=1 b=2"

map.to_kv([sep [, kv_sep]])

Convert map to key-value string with separators.

e.query = e.params.to_kv("&", "=")                    // {a: 1, b: 2} → "a=1&b=2"

map.to_syslog() / map.to_cef() / map.to_combined()

Convert map to specific log format.

e.syslog_line = e.fields.to_syslog()
e.cef_line = e.security_event.to_cef()
e.access_log = e.request.to_combined()

DateTime Functions

Creation

now()

Current timestamp (UTC).

e.timestamp = now()

to_datetime(text [, fmt [, tz]])

Convert string into datetime value with optional hints.

e.parsed = to_datetime("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S", "UTC")
e.auto = to_datetime("2024-01-15T10:30:00Z")          // Auto-detect format

to_duration("1h30m")

Convert duration string into duration value.

let timeout = to_duration("5m")
e.deadline = now() + timeout

duration_from_seconds(n), duration_from_minutes(n), etc.

Create duration from specific units.

let hour = duration_from_hours(1)
let day = duration_from_days(1)

Formatting

dt.to_iso()

Convert datetime to ISO 8601 string.

e.iso_timestamp = e.timestamp.to_iso()                // "2024-01-15T10:30:00Z"

dt.format("format_string")

Format datetime using custom format string (see --help-time).

e.date = e.timestamp.format("%Y-%m-%d")               // "2024-01-15"
e.time = e.timestamp.format("%H:%M:%S")               // "10:30:00"

Component Extraction

dt.year(), dt.month(), dt.day()

Extract date components.

e.year = e.timestamp.year()
e.month = e.timestamp.month()
e.day = e.timestamp.day()

dt.hour(), dt.minute(), dt.second()

Extract time components.

e.hour = e.timestamp.hour()

Timezone Conversion

dt.to_utc() / dt.to_local()

Convert timezone.

e.utc_time = e.local_timestamp.to_utc()
e.local_time = e.utc_timestamp.to_local()

dt.to_timezone("tz_name")

Convert to named timezone.

e.ny_time = e.timestamp.to_timezone("America/New_York")

dt.timezone_name()

Get timezone name as string.

e.tz = e.timestamp.timezone_name()                    // "UTC"

Time Bucketing

dt.round_to("interval")

Round a timestamp down to the start of its interval (the nearest interval boundary at or before it). Useful for grouping events into time buckets for histograms and time-series analysis.

Accepts duration strings like "5m", "1h", "1d", etc.

// Group events into 5-minute buckets
let timestamp = to_datetime(e.timestamp);
e.bucket = timestamp.round_to("5m").to_iso();
track_freq("requests_per_5min", e.bucket);

// Hourly buckets
e.hour_bucket = to_datetime(e.time).round_to("1h").format("%Y-%m-%d %H:00");

// Daily buckets
e.day = timestamp.round_to("1d").format("%Y-%m-%d");

Common intervals:

  • "1m", "5m", "15m" - Minute-level bucketing
  • "1h", "6h", "12h" - Hour-level bucketing
  • "1d", "7d" - Day/week-level bucketing

dt.ceil_to("interval")

Round timestamp up to the next interval boundary. If the timestamp is already exactly on a boundary, it stays unchanged.

Useful for computing bucket end-times or "next window" boundaries.

let ts = to_datetime(e.timestamp);
e.bucket_start = ts.round_to("1h").to_iso();
e.bucket_end = ts.ceil_to("1h").to_iso();

// 12:34:56 ceil to 5m → 12:35:00
// 12:30:00 ceil to 5m → 12:30:00 (already on boundary)
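Both functions reduce to modular arithmetic on the timestamp. The Python sketch below assumes buckets are aligned to the Unix epoch in UTC, so "1d" buckets start at UTC midnight; Kelora's alignment rules for other time zones are not stated here. It also takes the interval as a timedelta rather than parsing strings like "5m".

```python
from datetime import datetime, timedelta, timezone

# Illustrative sketch; assumption: buckets are aligned to the Unix epoch (UTC).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def round_to(ts: datetime, interval: timedelta) -> datetime:
    """Floor ts to the previous interval boundary."""
    offset = (ts - EPOCH) % interval  # time elapsed since the last boundary
    return ts - offset

def ceil_to(ts: datetime, interval: timedelta) -> datetime:
    """Next boundary, or ts itself if it already sits on one."""
    floored = round_to(ts, interval)
    return floored if floored == ts else floored + interval

five_min = timedelta(minutes=5)
ts = datetime(2024, 1, 15, 12, 34, 56, tzinfo=timezone.utc)
print(round_to(ts, five_min).isoformat())  # 2024-01-15T12:30:00+00:00
print(ceil_to(ts, five_min).isoformat())   # 2024-01-15T12:35:00+00:00
```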

Arithmetic and Comparison

dt + duration, dt - duration

Add/subtract duration from datetime.

e.future = now() + duration_from_hours(1)
e.past = now() - duration_from_days(7)

dt1 - dt2

Get duration between datetimes.

let elapsed = now() - e.start_time
e.duration_ms = elapsed.as_milliseconds()

dt1 == dt2, dt1 > dt2, etc.

Compare datetimes.

if e.timestamp > to_datetime("2024-01-01") {
    e.this_year = true
}

Duration Operations

duration.as_seconds(), duration.as_milliseconds(), etc.

Convert duration to specific units.

e.seconds = duration.as_seconds()
e.ms = duration.as_milliseconds()
e.hours = duration.as_hours()

duration.to_string() / humanize_duration(ms)

Format duration as human-readable string.

e.readable = duration.to_string()                     // "1h 30m"
e.humanized = humanize_duration(5400000)              // "1h 30m"

duration.to_debug()

Format duration with full precision for debugging. Useful for inspecting exact duration values.

e.debug_duration = duration.to_debug()                // Full precision debug output

Math Functions

abs(x)

Absolute value of number.

e.magnitude = abs(e.value)

clamp(value, min, max)

Constrain value to be within min/max range.

e.bounded = clamp(e.score, 0, 100)

floor(x) / round(x)

Rounding operations.

e.floored = floor(e.value)
e.rounded = round(e.value)

mod(a, b) / a % b

Modulo operation with division-by-zero protection.

e.bucket = e.id % 10

rand() / rand_int(min, max)

Random number generation.

e.random_id = rand_int(1000, 9999)                    // Random ID assignment

// For sampling, prefer sample_every() instead:
// if sample_every(10) { e.sampled = true }           // Better: counter-based

Set the KELORA_SEED environment variable to a non-negative integer to make rand(), rand_int(), and sample_prob() reproducible (e.g. for tests or repeatable sampling). Reproducibility holds in sequential mode; under --parallel, thread scheduling still affects which worker consumes which value.

sample_every(n)

Sample every Nth event - returns true on calls N, 2N, 3N, etc.

Fast counter-based sampling (thread-local, approximate in parallel mode). Each unique N value maintains its own counter. For deterministic sampling across parallel threads, use bucket() instead.

// Keep only every 100th event (1% sampling)
if !sample_every(100) { skip() }

// Keep every 10th event (10% sampling)
if sample_every(10) {
    e.sampled = true
}

// Different N values have independent counters
sample_every(10)    // Returns true on calls 10, 20, 30...
sample_every(100)   // Returns true on calls 100, 200, 300...

Comparison with bucket():

  • sample_every(n) - Fast counter, approximate in parallel mode, non-deterministic
  • e.field.bucket() % n == 0 - Hash-based, deterministic across runs/threads, slightly slower
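The counter semantics of sample_every(n) fit in a few lines: it returns true on calls N, 2N, 3N, and so on, with one counter per distinct N. The Python sketch below is single-threaded. Under --parallel, each worker would hold its own thread-local counters, which is why the kept fraction is only approximate there.

```python
from collections import defaultdict

# Illustrative single-threaded sketch; not Kelora's implementation.
_counters = defaultdict(int)  # one counter per distinct n

def sample_every(n: int) -> bool:
    _counters[n] += 1
    return _counters[n] % n == 0

hits = [i for i in range(1, 31) if sample_every(10)]
print(hits)  # [10, 20, 30]
```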

sample_prob(p)

Probabilistic sampling — returns true with probability p (0.0–1.0). Useful for "keep ~N% of events" without manual rand() checks.

// Keep ~1% of events
if !sample_prob(0.01) { skip() }

// 10% sampling for metrics
if sample_prob(0.10) {
    track_sum("sampled_errors", 1)
}

Comparison with other sampling methods:

  • sample_prob(p) - Probabilistic, ~p fraction kept, non-deterministic
  • sample_every(n) - Counter-based, exact 1/n fraction, approximate in parallel
  • e.field.bucket() % n == 0 - Hash-based, deterministic across runs/threads


Output Formatting Functions

String-returning helpers for rendering numbers in human-readable form. Useful in inline event output, eprint, and end-of-stream summary reports.

human_bytes(n)

Format byte count with binary/IEC units (1024-based): B, KiB, MiB, GiB, TiB, PiB, EiB.

human_bytes(1536)                                     // "1.5 KiB"
human_bytes(1073741824)                               // "1.0 GiB"
e.size_h = human_bytes(e.bytes)

human_bytes_si(n)

Format byte count with decimal/SI units (1000-based): B, KB, MB, GB, TB, PB, EB.

human_bytes_si(1500)                                  // "1.5 KB"
human_bytes_si(1_500_000_000)                         // "1.5 GB"
e.size_h = human_bytes_si(e.bytes)
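Both helpers divide by their base (1024 or 1000) until the value drops below it, then print one decimal place. Here is a Python sketch of that scaling that reproduces the examples above. Two details are assumptions, not documented behavior: the format for values under one unit ("512 B"), and using one decimal place for every unit.

```python
def _human(n: int, base: int, units: list) -> str:
    """Divide by base until the value fits the current unit; keep one decimal."""
    if n < base:
        return f"{n} B"  # assumption: plain byte counts shown without decimals
    value, unit = float(n), 0
    while value >= base and unit < len(units) - 1:
        value /= base
        unit += 1
    return f"{value:.1f} {units[unit]}"


def human_bytes(n: int) -> str:
    return _human(n, 1024, ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"])


def human_bytes_si(n: int) -> str:
    return _human(n, 1000, ["B", "KB", "MB", "GB", "TB", "PB", "EB"])
```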

format_decimals(value, decimals)

Format a number as a string with exactly N digits after the decimal point. A negative decimals value is treated as 0, and values above 20 are clamped to 20.

format_decimals(1.0 / 3.0, 3)                         // "0.333"
format_decimals(1.0, 2)                               // "1.00"
format_decimals(42.987, 0)                            // "43"

format_percent(ratio, decimals)

Format a ratio (0.0–1.0) as a percentage string with N decimals and % suffix. The input is multiplied by 100, so 0.042 renders as "4.2%".

format_percent(0.042, 1)                              // "4.2%"
format_percent(0.5, 0)                                // "50%"
format_percent(e.errors.to_float() / e.total, 2)      // "3.14%"
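The two rules stated above are easy to model: clamp decimals, and scale the ratio by 100 before formatting. In this Python sketch the function names mirror the Rhai ones for readability. Python's rounding of exact halfway cases may not match Kelora's, so treat tie-breaking as unspecified.

```python
def format_decimals(value: float, decimals: int) -> str:
    # Documented clamping: negative -> 0, very large -> 20
    decimals = min(max(decimals, 0), 20)
    return f"{value:.{decimals}f}"


def format_percent(ratio: float, decimals: int) -> str:
    # The ratio is scaled by 100 first, then "%" is appended
    return format_decimals(ratio * 100, decimals) + "%"
```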

Padding & Alignment

All padding and shortening helpers are Unicode-width aware — CJK / wide characters count as 2 columns, zero-width combining marks as 0. Use these for aligned columns in summary tables and inline output.

text.ljust(n) / text.ljust(n, fill)

Left-justify by padding the right side with spaces (or fill) to reach display width n. Strings already at or beyond n are returned unchanged.

"hi".ljust(5)                                         // "hi   "
"ERROR".ljust(8, '.')                                 // "ERROR..."

text.rjust(n) / text.rjust(n, fill)

Right-justify by padding the left side. Common for numeric columns and zero padding.

"42".rjust(5)                                         // "   42"
"42".rjust(5, '0')                                    // "00042"

text.center(n) / text.center(n, fill)

Center within width n. If the remaining pad is odd, the extra column goes on the right.

"hi".center(6)                                        // "  hi  "
" TITLE ".center(20, '=')                             // "====== TITLE ======="

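Width-aware padding counts display columns rather than characters. Below is a Python sketch based on the standard unicodedata module. It makes two assumptions: East Asian Wide/Fullwidth characters take 2 columns and combining marks take 0 (Kelora's exact width tables may differ), and the fill character is one column wide.

```python
import unicodedata


def display_width(text: str) -> int:
    """Columns: wide/fullwidth = 2, combining marks = 0, everything else = 1."""
    return sum(
        0 if unicodedata.combining(ch)
        else 2 if unicodedata.east_asian_width(ch) in ("W", "F")
        else 1
        for ch in text
    )


def ljust(text: str, n: int, fill: str = " ") -> str:
    return text + fill * max(n - display_width(text), 0)


def rjust(text: str, n: int, fill: str = " ") -> str:
    return fill * max(n - display_width(text), 0) + text


def center(text: str, n: int, fill: str = " ") -> str:
    pad = max(n - display_width(text), 0)
    left = pad // 2  # with odd padding, the extra column goes on the right
    return fill * left + text + fill * (pad - left)
```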
text.shorten(n) / text.shorten(n, marker)

If text exceeds display width n, keep the start and append marker (default "…", 1 column). Pass "" for a hard truncate, or "..." for an ASCII-only marker.

"hello world".shorten(8)                              // "hello w…"
"hello world".shorten(8, "...")                       // "hello..."
"hello world".shorten(5, "")                          // "hello"

text.shorten_middle(n) / text.shorten_middle(n, marker)

If text exceeds display width n, keep both ends and insert marker in the middle. Ideal for paths, URLs, UUIDs, and fully-qualified names where the distinguishing info lives at the end.

let path = "/home/user/projects/kelora/src/rhai_functions/formatting.rs";
path.shorten_middle(30)                               // "/home/user/proj…formatting.rs"
path.shorten_middle(30, "...")                        // ASCII marker
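Both shortening helpers reserve the marker's width and fill the rest of the budget from the start (shorten) or from both ends (shorten_middle). The following Python sketch uses the same column rules as above. The section does not specify how Kelora splits the remaining width between head and tail. This sketch gives the odd column to the head, which is an assumption, so its output for the path above may differ from Kelora's by a character.

```python
import unicodedata


def display_width(text: str) -> int:
    """Columns: wide/fullwidth = 2, combining marks = 0, everything else = 1."""
    return sum(
        0 if unicodedata.combining(ch)
        else 2 if unicodedata.east_asian_width(ch) in ("W", "F")
        else 1
        for ch in text
    )


def take_width(chars, budget: int) -> str:
    """Take characters in order until the next one would exceed the budget."""
    out, used = [], 0
    for ch in chars:
        w = display_width(ch)
        if used + w > budget:
            break
        out.append(ch)
        used += w
    return "".join(out)


def shorten(text: str, n: int, marker: str = "…") -> str:
    if display_width(text) <= n:
        return text
    return take_width(text, max(n - display_width(marker), 0)) + marker


def shorten_middle(text: str, n: int, marker: str = "…") -> str:
    if display_width(text) <= n:
        return text
    budget = max(n - display_width(marker), 0)
    head_budget = (budget + 1) // 2  # assumption: the head gets the odd column
    head = take_width(text, head_budget)
    tail = take_width(reversed(text), budget - head_budget)[::-1]
    return head + marker + tail
```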

Colors & Styles

ANSI escape-sequence helpers that wrap the string with a color or style code and a reset. When colors are disabled (output is not a TTY, NO_COLOR is set, or --no-color was passed), these functions return the string unchanged, so scripts work transparently whether output is piped to a file or a terminal. No flag detection needed inside scripts.

text.red() / .green() / .yellow() / .blue() / .cyan() / .magenta()

Wrap text with the corresponding ANSI foreground color. The palette matches Kelora's existing logfmt output (bright red/green/yellow/magenta, regular blue/cyan).

"ERROR".red()
"OK".green()
"WARN".yellow()
e.level = e.level.yellow()

text.bold() / text.dim()

Apply bold or dim styling.

"header".bold()
e.timestamp = e.timestamp.dim()

Color and style helpers are chainable — they compose by stacking SGR codes:

"CRITICAL".bold().red()                               // bold red
"meta".dim().cyan()                                   // dim cyan

Charts & Sparklines

bar(value, max, width)

Render a fixed-width Unicode bar showing value / max.

There is only this three-argument form. To draw a ratio that is already normalized, pass 1 as max (e.g. bar(0.42, 1, 10)).

  • Uses eighth-block characters for sub-cell resolution
  • Clamps values outside 0..max
  • Returns spaces when max <= 0
bar(7, 10, 10)                                        // "███████   "
bar(3, 8, 4)                                          // "█▌  "
bar(0.42, 1, 10)                                      // "████▏     "
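Sub-cell resolution comes from splitting each cell into eighths and drawing the leftover with a partial block. The Python sketch below floors the value to whole eighths, which reproduces all three examples above. Kelora's rounding rule is otherwise an assumption.

```python
import math

PARTIALS = " ▏▎▍▌▋▊▉"  # PARTIALS[k] fills k/8 of a cell (index 0 unused)


def bar(value: float, max_value: float, width: int) -> str:
    if max_value <= 0:
        return " " * width                       # documented: spaces when max <= 0
    clamped = min(max(value, 0), max_value)      # documented: clamp to 0..max
    eighths = min(math.floor(clamped * width * 8 / max_value), width * 8)  # assumed: floor
    full, rest = divmod(eighths, 8)
    cells = "█" * full + (PARTIALS[rest] if rest else "")
    return cells.ljust(width)                    # pad with spaces to a fixed width
```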

sparkline(array)

Render an array of numbers as a single-line sparkline using ▁▂▃▄▅▆▇█.

  • Scales values from 0..max(array)
  • Negative and non-numeric values render as spaces
  • Empty arrays return ""
sparkline([1, 4, 2, 8, 5, 7])                         // "▁▄▂█▅▇"
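With eight block heights, each value maps to a level from 0 to 7. The scaling rule floor(v / max * 7) in this Python sketch reproduces the documented example, but it is an assumption. So is rendering an all-zero array with the lowest block.

```python
import math

LEVELS = "▁▂▃▄▅▆▇█"  # eight heights, index 0..7


def is_num(v) -> bool:
    return isinstance(v, (int, float)) and not isinstance(v, bool)


def sparkline(values: list) -> str:
    if not values:
        return ""                                # documented: empty array -> ""
    peak = max((v for v in values if is_num(v) and v >= 0), default=0)
    out = []
    for v in values:
        if not is_num(v) or v < 0:
            out.append(" ")                      # documented: negative/non-numeric -> space
        elif peak == 0:
            out.append(LEVELS[0])                # assumption: all-zero data -> lowest block
        else:
            out.append(LEVELS[math.floor(v / peak * (len(LEVELS) - 1))])  # assumed rule
    return "".join(out)
```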

Type Conversion Functions

to_int(value) / to_float(value) / to_bool(value)

Convert a value to an integer, float, or boolean; returns () if the conversion fails.

e.status = to_int(e.status_string)
e.score = to_float(e.score_string)

to_int(value, thousands_sep) / to_float(value, thousands_sep, decimal_sep)

Parse formatted numbers with explicit separators.

Parameters:

  • thousands_sep: the thousands/grouping separator (single char or empty string)
  • decimal_sep: the decimal separator (single char or empty string)

Examples:

// US format (comma thousands, dot decimal)
e.price = "1,234.56".to_float(',', '.')     // → 1234.56
e.count = "1,234,567".to_int(',')           // → 1234567

// EU format (dot thousands, comma decimal)
e.price = "1.234,56".to_float('.', ',')     // → 1234.56
e.count = "1.234.567".to_int('.')           // → 1234567

// French format (space thousands, comma decimal)
e.price = "1 234,56".to_float(' ', ',')     // → 1234.56
e.count = "2 000 000".to_int(' ')           // → 2000000

// No thousands separator (empty string)
e.price = "1234.56".to_float("", '.')       // → 1234.56
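Parsing with explicit separators comes down to two steps, in this order: delete the grouping character, then normalize the decimal separator to a dot. Reversing the order would turn "1.234,56" into "1.234.56" and then into 123456. Below is a Python sketch in which None stands in for Rhai's (). Unlike a strict parser, it does not check where the grouping characters appear, and Kelora may be stricter.

```python
def parse_float(text: str, thousands_sep: str, decimal_sep: str):
    """Parse a formatted number; None stands in for Rhai's () on failure."""
    cleaned = text.strip()
    if thousands_sep:
        cleaned = cleaned.replace(thousands_sep, "")  # 1) drop grouping characters
    if decimal_sep and decimal_sep != ".":
        cleaned = cleaned.replace(decimal_sep, ".")   # 2) normalize decimal separator
    try:
        return float(cleaned)
    except ValueError:
        return None


def parse_int(text: str, thousands_sep: str):
    cleaned = text.strip()
    if thousands_sep:
        cleaned = cleaned.replace(thousands_sep, "")
    try:
        return int(cleaned)
    except ValueError:
        return None
```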

to_int_or(value, default) / to_float_or(value, default) / to_bool_or(value, default)

Convert value to type with fallback.

e.status = e.status_string.to_int_or(0)
e.score = e.score_string.to_float_or(0.0)

to_int_or(value, thousands_sep, default) / to_float_or(value, thousands_sep, decimal_sep, default)

Parse formatted numbers with separators and fallback.

// With error handling
e.amount = e.value.to_float_or(',', '.', 0.0)   // Default to 0.0 if invalid
e.count = e.total.to_int_or(',', 0)             // Default to 0 if invalid

value.or_empty()

Convert empty values to Unit () for removal/filtering.

Converts conceptually "empty" values to Unit, which:

  • Removes the field when assigned (e.g., e.field = value.or_empty())
  • Gets skipped by track_*() functions
  • Works with missing fields (passes Unit through unchanged)

Supported empty values:

  • Empty string: "" → ()
  • Empty array: [] → ()
  • Empty map: #{} → ()
  • Unit itself: () → () (pass-through)

String extraction:

// Extract only when prefix exists, otherwise remove field
e.name = e.message.after("prefix:").or_empty()

// Track only non-empty values
track_unique("names", e.extracted.or_empty())

Array filtering:

// Only assign tags if array is non-empty
e.tags = e.tags.or_empty()  // [] becomes (), field removed

// Track the item count, then drop the field if the array is empty
track_freq("item_count", e.items.len())
e.items = e.items.or_empty()  // Removes the field only when the array is empty

Map filtering:

// Only keep non-empty metadata
e.metadata = e.raw_metadata.parse_json().or_empty()  // {} becomes (), field removed

// Safe chaining with missing fields
e.optional = e.maybe_field.or_empty()  // Works even if maybe_field is ()

Common pattern - conditional extraction and tracking:

e.extracted = e.message.after("User:").or_empty()
track_unique("users", e.extracted)  // Only tracks when extraction succeeds

// Filter events with no data
e.results = e.search_results.or_empty()
track_unique("result_sets", e.results)  // Skips empty arrays and ()


Utility Functions

get_env(var [, default])

Get environment variable with optional default.

e.branch = get_env("CI_BRANCH", "main")
e.build_id = get_env("BUILD_ID")

pseudonym(value, domain)

Generate a domain-separated pseudonym — use this to redact, anonymize, or mask a value with a stable, reproducible alias.

Set the KELORA_SECRET environment variable to produce stable pseudonyms that match across separate runs (e.g. correlating the same IP between two batches processed on different days). Without KELORA_SECRET, kelora falls back to an ephemeral per-run key: pseudonymization still works, but the values change on every run and will not correlate across runs. In ephemeral mode kelora prints a one-time warning to stderr (suppressed by --silent/--no-diagnostics).

e.user_alias = pseudonym(e.username, "users")
e.ip_alias = pseudonym(e.client_ip, "ips")

See also (redaction / anonymization / masking): text.hash([algo]) for one-way hashing, text.mask_ip([octets]) for masking IP octets, and text.normalized([patterns]) for replacing sensitive patterns with placeholders.
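Domain separation gives the same value unrelated aliases in different domains, and a stable key makes aliases repeat across runs. One common way to get both properties is a keyed HMAC over the domain and the value. The Python sketch below is not Kelora's actual construction. Its details are assumptions: HMAC-SHA256, a NUL separator between domain and value, and 16 hex characters of output. The load_key helper is hypothetical.

```python
import hashlib
import hmac
import os
import secrets


def load_key() -> bytes:
    """KELORA_SECRET gives a stable key; otherwise use a random per-run key."""
    secret = os.environ.get("KELORA_SECRET")
    if secret:
        return secret.encode()
    return secrets.token_bytes(32)  # ephemeral: aliases change on every run


def pseudonym(value: str, domain: str, key: bytes) -> str:
    # The domain is part of the MAC input, so "users" and "ips" never share aliases
    message = domain.encode() + b"\x00" + value.encode()
    return hmac.new(key, message, hashlib.sha256).hexdigest()[:16]


key = b"example-secret"  # hypothetical fixed secret, as if KELORA_SECRET were set
alias_user = pseudonym("alice", "users", key)
alias_ip = pseudonym("alice", "ips", key)
```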

read_file(path) / read_lines(path)

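Read a file: read_file() returns its entire contents as a single string, and read_lines() returns an array with one element per line.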

e.config = read_file("config.json")
e.lines = read_lines("data.txt")

drain_template(text [, options])

Add a line to the Drain template model and return {template, count, is_new}. Sequential mode only.

let r = drain_template(e.message);
e.template = r.template;

Default token filters normalize: ipv4_port, ipv4, ipv6, email, url, fqdn, uuid, mac, md5, sha1, sha256, path, oauth, function, hexcolor, version, hexnum, duration, timestamp, date, time, num. For lightweight normalization without Drain, use normalized() on the field instead.

Optional options map keys:

  • depth (int)
  • max_children (int)
  • similarity (float)
  • filters (string CSV or array of grok patterns)
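Drain clusters log lines by length and token similarity, and turns positions that vary into wildcards. The Python sketch below is a deliberately simplified version built on a hypothetical MiniDrain class:

  • It groups lines by token count and compares each line against every cluster in its group.
  • It has no prefix tree, so depth and max_children have no equivalent.
  • It applies no token filters.

Its similarity parameter plays the role of the similarity option, but Kelora's scoring may differ.

```python
from dataclasses import dataclass

WILDCARD = "<*>"


@dataclass
class Cluster:
    tokens: list
    count: int = 1


class MiniDrain:
    """Simplified Drain-style clustering (hypothetical; no prefix tree or filters)."""

    def __init__(self, similarity: float = 0.5):
        self.similarity = similarity
        self.groups = {}  # token count -> list of clusters with that length

    @staticmethod
    def score(template: list, tokens: list) -> float:
        """Fraction of positions where the template matches (wildcards match anything)."""
        if not tokens:
            return 1.0
        same = sum(1 for t, u in zip(template, tokens) if t in (u, WILDCARD))
        return same / len(tokens)

    def add(self, line: str) -> dict:
        tokens = line.split()
        clusters = self.groups.setdefault(len(tokens), [])
        best = max(clusters, key=lambda c: self.score(c.tokens, tokens), default=None)
        if best is not None and self.score(best.tokens, tokens) >= self.similarity:
            # Positions that differ become wildcards in the shared template
            best.tokens = [t if t == u else WILDCARD for t, u in zip(best.tokens, tokens)]
            best.count += 1
            return {"template": " ".join(best.tokens), "count": best.count, "is_new": False}
        clusters.append(Cluster(tokens))
        return {"template": " ".join(tokens), "count": 1, "is_new": True}

    def templates(self) -> list:
        return [{"template": " ".join(c.tokens), "count": c.count}
                for group in self.groups.values() for c in group]


drain = MiniDrain()
first = drain.add("user alice logged in from 10.0.0.1")
second = drain.add("user bob logged in from 10.0.0.2")
third = drain.add("disk full on /dev/sda1")
```

With Kelora's default filters, the IP addresses would normally be normalized before clustering. In this sketch they become <*> only because they differ between the two lines.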

drain_templates()

Return array of {template, count} from the current Drain model. Sequential mode only.

let templates = drain_templates();

print(message) / eprint(message)

Print to stdout/stderr (suppressed with --no-script-output or data-only modes).

print("Processing event: " + e.id)
eprint("Warning: " + e.error)

exit(code)

Exit kelora with given exit code.

if e.critical {
    exit(1)
}

skip()

Skip the current event, mark it as filtered, and continue with the next one. Downstream stages and output for the skipped event do not run.

if e.endpoint == "/health" {
    skip();
}

status_class(status_code)

Convert HTTP status code to class string ("1xx", "2xx", "3xx", "4xx", "5xx", or "unknown").

e.status_category = status_class(e.status)            // 404 → "4xx", 200 → "2xx"
e.is_error = status_class(e.code) == "5xx"

// Track errors by class
track_freq("status_class", status_class(e.status))

// Group status codes for analysis
e.status_group = status_class(e.response_code)        // 503 → "5xx"
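The mapping is integer division by 100 plus a range check. Here is a Python sketch. The section does not spell out which inputs map to "unknown", so this sketch assumes that anything other than an integer from 100 to 599 does.

```python
def status_class(code) -> str:
    """Map an HTTP status code to "1xx".."5xx", or "unknown"."""
    # Assumption: only integers 100-599 get a class; everything else is "unknown"
    if isinstance(code, bool) or not isinstance(code, int):
        return "unknown"
    if 100 <= code <= 599:
        return f"{code // 100}xx"
    return "unknown"
```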

type_of(value)

Get type name as string.

e.value_type = type_of(e.value)                       // e.g. "string", "i64", "f64", "array", "map"

window.pluck(field) / window.pluck_as_nums(field)

Extract field values from the sliding window array (requires --window). See array.pluck() for detailed documentation.

The window variable is an array containing the N most recent events, making pluck() especially useful for rolling calculations and burst detection.

// Rolling average of response times
let recent_times = window.pluck_as_nums("response_time")
e.avg_recent = recent_times.reduce(|sum, x| sum + x, 0.0) / recent_times.len()

// Detect error bursts
let recent_statuses = window.pluck("status")
e.error_burst = recent_statuses.filter(|s| s >= 500).len() >= 3

// Compare current vs recent average
let recent_vals = window.pluck_as_nums("value")
e.spike = e.value > (recent_vals.reduce(|s, x| s + x, 0.0) / recent_vals.len()) * 2
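The Rhai examples above amount to a bounded queue of recent events. This Python sketch makes two assumptions: the window includes the current event, and pluck_as_nums skips values that are not numbers (Kelora may convert numeric strings instead). The Window class and rolling_avg helper are hypothetical.

```python
from collections import deque


class Window:
    """Hypothetical stand-in for the --window array: the N most recent events."""

    def __init__(self, size: int):
        self.events = deque(maxlen=size)  # oldest events fall off automatically

    def push(self, event: dict) -> None:
        self.events.append(event)

    def pluck_as_nums(self, field: str) -> list:
        # Assumption: missing and non-numeric values are skipped
        return [float(ev[field]) for ev in self.events
                if isinstance(ev.get(field), (int, float)) and not isinstance(ev.get(field), bool)]


def rolling_avg(nums: list):
    return sum(nums) / len(nums) if nums else None  # guard against an empty window


window = Window(3)
averages = []
for ms in [100, 200, 300, 1000]:
    window.push({"response_time": ms})
    averages.append(rolling_avg(window.pluck_as_nums("response_time")))

statuses = Window(5)
for status in [200, 503, 500, 200, 502]:
    statuses.push({"status": status})
error_burst = sum(1 for s in statuses.pluck_as_nums("status") if s >= 500) >= 3
```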

State Management Functions

The global state object provides a mutable map for tracking information across events. Only available in sequential mode - accessing state in --parallel mode will raise an error.

Parallel Mode

State management is not available when using --parallel. All state operations will raise errors. Use --metrics tracking functions for parallel-safe aggregation.

Basic Operations

state["key"] / state[key] = value

Get or set values using indexer syntax.

// Initialize counter
state["count"] = 0

// Increment counter
state["count"] = state["count"] + 1

// Track unique IPs
if !state.contains("seen_ips") {
    state["seen_ips"] = []
}
state["seen_ips"].push(e.ip)

state.get(key) / state.get(key, default) / state.set(key, value)

Get or set values using method syntax. get(key) returns () if the key doesn't exist. get(key, default) returns default when the key is missing or holds (), mirroring map.get(key, default).

let count = state.get("count")                        // Returns () if not found
state.set("total_bytes", 0)

// Default-arg form, equivalent to the `??` idiom below
let current = state.get("count", 0)
state.set("count", current + 1)

// `??` remains the recommended idiom for inline use
state.set("count", (state.get("count") ?? 0) + 1)

state.contains(key)

Check if a key exists in state.

if !state.contains("initialized") {
    state["initialized"] = true
    state["start_time"] = now()
}

Map Operations

state.keys() / state.values()

Get arrays of all keys or values.

let all_keys = state.keys()                           // ["count", "total", "seen_ips"]
let all_values = state.values()                       // [42, 1024, [...]]

// Iterate over all state entries
for key in state.keys() {
    print(key + ": " + state[key].to_string())
}

state.len() / state.is_empty()

Get number of entries or check if empty.

if state.is_empty() {
    state["initialized"] = true
}

let num_keys = state.len()                            // Number of entries

state.remove(key)

Remove a key from state and return its value (or () if not found).

let old_value = state.remove("temp_data")             // Remove and get value
state.remove("cache")                                 // Just remove

state.clear()

Remove all entries from state.

// Reset state
state.clear()

Bulk Operations

state.mixin(map)

Merge a map into state, overwriting existing keys.

// Initialize multiple values
state.mixin(#{
    count: 0,
    total_bytes: 0,
    seen_users: []
})

// Merge new data
state.mixin(e.metadata)                               // Add all metadata fields

state.fill_with(map)

Replace entire state with a new map.

// Reset state with new values
state.fill_with(#{
    count: 0,
    start_time: now()
})

state += map

Operator form of mixin() - merge map into state.

state += #{ new_field: 42, another: "value" }

Conversion

state.to_map()

Convert state to a regular map for use with other functions.

// Export state as JSON
let state_json = state.to_map().to_json()
print(state_json)

// Export as logfmt
let state_logfmt = state.to_map().to_logfmt()

// Use in conditions
let snapshot = state.to_map()
if snapshot.contains("error_count") && snapshot["error_count"] > 100 {
    exit(1)
}

Practical Examples

Counter Pattern:

// Initialize on first event
if state.is_empty() {
    state["event_count"] = 0
    state["error_count"] = 0
}

// Increment counters
state["event_count"] = state["event_count"] + 1
if e.level == "ERROR" {
    state["error_count"] = state["error_count"] + 1
}

// Output summary at end (pass this line as the --end script)
print("Events: " + state["event_count"] + ", Errors: " + state["error_count"])

Deduplication Pattern:

// Initialize seen set
if !state.contains("seen_ids") {
    state["seen_ids"] = #{}  // Use map as set
}

// Skip duplicates
if state["seen_ids"].contains(e.request_id) {
    skip()
}
state["seen_ids"][e.request_id] = true

Session Tracking:

// Track active sessions
if !state.contains("sessions") {
    state["sessions"] = #{}
}

let session_id = e.session_id
if !state["sessions"].contains(session_id) {
    state["sessions"][session_id] = #{
        start: e.timestamp,
        events: 0
    }
}

// Update session (maps are copied on assignment, so write the copy back)
let session = state["sessions"][session_id]
session["events"] = session["events"] + 1
session["last_seen"] = e.timestamp
state["sessions"][session_id] = session


Tracking/Metrics Functions

All tracking functions require the --metrics flag.

Shared conventions across the track_*() family:

  • Unit values are skipped. Missing fields and failed conversions produce Unit (), which every track_*() function skips instead of erroring. Skips are counted per metric and surfaced via --diagnostics, so a field-name typo is detectable.
  • Categorical arguments accept any scalar. Category and item arguments take strings, numbers, and bools; non-string values are stringified (track_freq("status", e.status) just works).
  • One metric name, one function. Using the same metric name with two different track_*() functions is an error — the aggregation strategies are incompatible. In --parallel runs a conflict between --begin and the event stages is reported as a warning at merge time instead of a per-call error. Known limitation: the check is per aggregation, so a track_stats("lat", ...) suffix key (lat_min, lat_sum, ...) can silently share a name with a standalone call of the matching function (e.g. track_min("lat_min", ...)) — keep track_stats base names distinct.
  • __kelora_* and __op_* metric names are reserved. Kelora uses these prefixes for internal bookkeeping and hides them from all metrics output.

Tracking Functions

track_avg(key, value)

Track average of numeric values for key. Automatically computes the average during output. Skips Unit () values. Works correctly in parallel mode.

track_avg("avg_latency", e.response_time)
track_avg(e.endpoint, e.duration_ms)

// Safe with conversions that may fail
let latency = e.latency_str.to_float()  // Returns () on error
track_avg("avg_ms", latency)            // Skips () values

track_freq(name, value)

Build a frequency table: count occurrences of each distinct value under the metric name. The result is a nested map {name: {value: count}}. Values may be strings, numbers, or bools (stringified into the map key, so no to_string() is needed); Unit () values are skipped. track_freq is the full-distribution sibling of track_top/track_bottom, which keep only the most/least frequent N.

track_freq("service", e.service)        // Count events per service
track_freq("status", e.status)          // Numeric values just work
track_freq("level", e.level)            // {level: {ERROR: 12, INFO: 3041}}

// A histogram "bucket" is just a value you computed yourself:
track_freq("status_class", e.status / 100 * 100)        // 200/300/400/500
track_freq("latency_ms", floor(e.response_time / 100) * 100)

// For a single running counter, use track_inc (or track_sum):
track_inc("total")                      // same as track_sum("total", 1)
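A frequency table is one counter map per metric name. In this Python sketch (hypothetical FreqTracker class), keys are stringified, and None stands in for Unit () and is skipped while the skip is counted. Worker tables merge by adding counts. Two details are assumptions about Kelora's internals: the lowercase "true"/"false" keys for bools, and merging by addition.

```python
from collections import defaultdict


class FreqTracker:
    """Hypothetical model of track_freq: name -> {stringified value: count}."""

    def __init__(self):
        self.tables = defaultdict(lambda: defaultdict(int))
        self.skipped = defaultdict(int)  # per-metric Unit skips (cf. --diagnostics)

    def track_freq(self, name: str, value) -> None:
        if value is None:  # None stands in for Unit ()
            self.skipped[name] += 1
            return
        if isinstance(value, bool):
            key = "true" if value else "false"  # assumption: lowercase bool keys
        else:
            key = str(value)
        self.tables[name][key] += 1

    def merge(self, other: "FreqTracker") -> None:
        # Assumption: worker tables combine by adding counts per value
        for name, table in other.tables.items():
            for key, count in table.items():
                self.tables[name][key] += count


main = FreqTracker()
for status in [200, 404, 200, None]:
    main.track_freq("status", status)
for ms in [12, 180, 250, 199]:
    main.track_freq("latency_ms", ms // 100 * 100)  # histogram bucket computed by hand

worker = FreqTracker()
worker.track_freq("status", 404)
main.merge(worker)
```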

Changed in kelora 2.0

The categorical counter was named track_count in earlier 2.0 previews (and track_count(value) / track_bucket(key, bucket) in 1.x). It is now track_freq(name, value), because "count" was ambiguous between a per-value frequency table and a plain scalar counter. Use track_freq(name, value) for frequency tables and track_inc(name) / track_sum(name, 1) for running counters. track_count and track_bucket were removed and error with a migration hint.

track_inc(name)

Increment a running counter by 1 — readable sugar for track_sum(name, 1). Shares the additive sum operation, so it merges identically across parallel workers and span windows.

track_inc("events")                     // total event count
if e.level == "ERROR" {
    track_inc("errors")                 // conditional counter
}

track_sum(key, value)

Accumulate numeric values for key. Skips Unit () values.

track_sum("total_bytes", e.bytes)
track_sum(e.endpoint, e.response_time)

// Safe with conversions that may fail
let score = e.score_str.to_int()  // Returns () on error
track_sum("total_score", score)   // Skips () values

track_min(key, value) / track_max(key, value)

Track minimum/maximum value for key. Skips Unit () values.

track_min("fastest", e.response_time)
track_max("slowest", e.response_time)

track_unique(key, value)

Track unique values for key. Skips Unit () values.

track_unique("users", e.user_id)
track_unique("ips", e.client_ip)

// Combined with .or_empty() for conditional tracking
track_unique("names", e.message.after("User:").or_empty())

track_cardinality(key, value) / track_cardinality(key, value, error_rate)

Estimate unique count using HyperLogLog algorithm. Uses ~12KB of memory regardless of cardinality, with ~1% standard error by default. Skips Unit () values. Works correctly in parallel mode.

When to use: For high-cardinality data (millions of unique values) where track_unique() would consume too much memory. Use track_unique() when you need the actual values or have low cardinality.

// Basic usage - ~1% standard error, ~12KB memory
track_cardinality("unique_ips", e.client_ip)
track_cardinality("unique_sessions", e.session_id)

// Custom error rate for higher precision (uses more memory)
track_cardinality("unique_users", e.user_id, 0.005)  // 0.5% error

// Safe with optional fields
track_cardinality("unique_emails", e.email.or_empty())

Output format: text output prefixes the value with ≈ to mark it as approximate:

unique_ips   ≈ 1234567

Error rate bounds: 0.001 (0.1%) to 0.26 (26%). Lower error = more memory.

track_cardinality vs track_unique

| | track_unique() | track_cardinality() |
|---|---|---|
| Memory | O(n), grows with cardinality | O(1), fixed ~12KB |
| Accuracy | Exact | ~1% error (configurable) |
| Scale | Thousands | Billions |
| Values stored | Yes (can list them) | No (count only) |
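The fixed memory and the ~1% error both follow from the register count of a HyperLogLog. The standard error is about 1.04 / sqrt(m). A 1% target therefore needs m = (1.04 / 0.01)², which rounds up to 2^14 = 16,384 registers, and at 6 bits each that is 12 KiB. The Python sketch below is a textbook HyperLogLog, and its details are assumptions rather than Kelora's internals:

  • the error-to-register mapping
  • the blake2b hash
  • the small-range (linear counting) correction

Merging takes the register-wise maximum, which is why the estimate combines correctly in parallel mode.

```python
import hashlib
import math


class HyperLogLog:
    """Textbook HyperLogLog sketch (hypothetical; Kelora's internals may differ)."""

    def __init__(self, error_rate: float = 0.01):
        # m = (1.04 / error)^2 rounded up to a power of two: 0.01 -> 2^14 registers
        self.p = math.ceil(math.log2((1.04 / error_rate) ** 2))
        self.m = 1 << self.p
        self.registers = [0] * self.m  # 6 bits each would suffice: 16,384 * 6 bits = 12 KiB
        self.alpha = 0.7213 / (1 + 1.079 / self.m)  # bias constant, valid for m >= 128

    def add(self, item: str) -> None:
        h = int.from_bytes(hashlib.blake2b(item.encode(), digest_size=8).digest(), "big")
        bits = 64 - self.p
        index = h >> bits                     # first p bits choose a register
        rest = h & ((1 << bits) - 1)          # remaining bits
        rank = bits - rest.bit_length() + 1   # position of the first 1-bit
        self.registers[index] = max(self.registers[index], rank)

    def merge(self, other: "HyperLogLog") -> None:
        # Register-wise max: same result as one sketch fed both streams
        self.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]

    def estimate(self) -> float:
        raw = self.alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            return self.m * math.log(self.m / zeros)  # linear counting for small sets
        return raw


hll = HyperLogLog(0.01)
for i in range(200_000):
    hll.add(f"user-{i}")

part_a, part_b = HyperLogLog(), HyperLogLog()
for i in range(10_000):
    part_a.add(f"s-{i}")
for i in range(5_000, 15_000):
    part_b.add(f"s-{i}")
part_a.merge(part_b)  # 15,000 distinct values across both parts
```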

track_top(name, item [, n]) / track_bottom(name, item [, n])

Track the N most (track_top) or least (track_bottom) frequent items. n defaults to 10. Items may be strings, numbers, or bools; Unit () items are skipped.

// Top 10 most common errors (default n)
track_top("common_errors", e.error_type)

// Top 5 most active users
track_top("active_users", e.user_id, 5)

// Bottom 5 rarest errors
track_bottom("rare_errors", e.error_type, 5)

Output format: [{key: "item", count: 42}, ...], sorted by count (descending for top, ascending for bottom), ties broken alphabetically by key.

track_top_by(name, item, score [, n]) / track_bottom_by(name, item, score [, n])

Track the N items with the highest (track_top_by) or lowest (track_bottom_by) score. Each item keeps its best score (max for top, min for bottom). n defaults to 10. Unit () items or scores are skipped.

// Top 10 slowest endpoints by latency
track_top_by("slowest_endpoints", e.endpoint, e.latency_ms)

// Top 5 biggest requests by bytes
track_top_by("heavy_requests", e.request_id, e.bytes, 5)

// 10 fastest endpoints by latency
track_bottom_by("fastest_endpoints", e.endpoint, e.latency_ms)

// Handles missing values gracefully
track_top_by("cpu_hogs", e.process, e.cpu_time.or_empty())  // Skips ()

Output format: [{key: "item", value: 123.4}, ...], sorted by score (descending for top, ascending for bottom), ties broken alphabetically by key.

Changed in kelora 2.0

In 1.x, score-based ranking was the 4-argument form track_top(key, item, n, value). It is now its own function with the score in the natural position and n optional: track_top_by(name, item, score [, n]).

Memory Efficiency

track_top() and track_bottom() use bounded memory (O(N) per key) unlike track_unique() (which stores every distinct value) or track_freq() (one map entry per distinct category). For high-cardinality fields, prefer top/bottom tracking.

Parallel Mode Behavior

In parallel mode, each worker maintains its own top/bottom N. During merge, the lists are combined, re-sorted, and trimmed to N. Final results are deterministic.
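Here is a Python sketch of the merge step described above. It assumes that counts for the same key are summed when worker lists are combined. The ordering rule (count, then key alphabetically) is the documented one. The helper names are hypothetical.

```python
from collections import Counter


def top_n(counts, n: int) -> list:
    """Highest counts first; ties broken alphabetically by key."""
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [{"key": k, "count": c} for k, c in ranked[:n]]


def bottom_n(counts, n: int) -> list:
    """Lowest counts first; ties broken alphabetically by key."""
    ranked = sorted(counts.items(), key=lambda kv: (kv[1], kv[0]))
    return [{"key": k, "count": c} for k, c in ranked[:n]]


def merge_top(worker_lists: list, n: int) -> list:
    """Combine per-worker lists (summing counts per key), re-sort, trim to n."""
    combined = Counter()
    for entries in worker_lists:
        for entry in entries:
            combined[entry["key"]] += entry["count"]
    return top_n(combined, n)


worker_1 = top_n(Counter(["timeout", "timeout", "dns", "auth"]), 2)
worker_2 = top_n(Counter(["dns", "dns", "timeout"]), 2)
merged = merge_top([worker_1, worker_2], 2)
```

In this sketch worker 1 trimmed dns, so the merged dns count (2) is lower than the true total (3). Trimming per-worker lists before the merge can undercount items near the cutoff. Whether Kelora shows the same effect depends on how it tracks counts inside each worker.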

track_percentiles(key, value [, [percentiles]])

Track streaming percentiles using the t-digest algorithm for memory-efficient percentile estimation. Automatically creates suffixed metrics for each percentile (e.g., latency_p50, latency_p95, latency_p99.9). This is the only track_*() function that auto-suffixes because percentiles are inherently multi-valued. Skips Unit () values. Works correctly in parallel mode.

Default percentiles: [0.50, 0.95, 0.99] when no array provided.

Percentile notation: use the 0.0-1.0 range (quantile notation):

  • 0.50 = 50th percentile (median) → creates key_p50
  • 0.95 = 95th percentile → creates key_p95
  • 0.999 = 99.9th percentile → creates key_p99.9

Memory efficiency: Uses ~4KB per metric regardless of event count (vs. storing all values). Suitable for millions of events.

Accuracy: ~1-2% relative error, suitable for operational monitoring.

// Default percentiles [0.50, 0.95, 0.99]
track_percentiles("api_latency", e.response_time)
// Creates: api_latency_p50, api_latency_p95, api_latency_p99

// Custom percentiles
track_percentiles("latency", e.duration_ms, [0.75, 0.90])
// Creates: latency_p75, latency_p90

// High-precision percentiles
track_percentiles("latency", e.duration_ms, [0.999, 0.9999])
// Creates: latency_p99.9, latency_p99.99

// Per-endpoint tracking
track_percentiles("latency_" + e.endpoint, e.response_time, [0.95, 0.99])

// Safe with conversions that may fail
let latency = e.latency_str.to_float()  // Returns () on error
track_percentiles("api_ms", latency)    // Skips () values; creates api_ms_p50, api_ms_p95, api_ms_p99
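The suffix is the quantile times 100, with trailing zeros dropped. The Python sketch below reproduces the names listed above. How Kelora formats quantiles with more digits (e.g. 0.12345) is not documented here, so the six-decimal rounding is an assumption.

```python
def percentile_suffix(q: float) -> str:
    """Quantile -> suffix: 0.95 -> "p95", 0.999 -> "p99.9"."""
    pct = f"{q * 100:.6f}".rstrip("0").rstrip(".")  # drop trailing zeros and the dot
    return "p" + pct


def percentile_keys(key: str, quantiles=(0.50, 0.95, 0.99)) -> list:
    return [f"{key}_{percentile_suffix(q)}" for q in quantiles]


default_keys = percentile_keys("api_latency")
precise_keys = percentile_keys("latency", [0.999, 0.9999])
```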

When to Use Percentiles vs. Average

Use track_percentiles() instead of track_avg() when:

  • You need tail latency metrics (p95, p99) for SLO monitoring
  • Data has outliers that would skew the average
  • You need multiple percentile values (median, p95, p99)
  • Working with latency, response time, or duration metrics
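A handful of outliers is enough to show why the tail matters. Kelora estimates percentiles with t-digest. This Python sketch instead uses exact nearest-rank percentiles on a small, made-up list, purely to contrast them with the mean.

```python
import math


def nearest_rank(values: list, q: float):
    """Exact nearest-rank percentile (illustration only; not t-digest)."""
    ordered = sorted(values)
    rank = max(math.ceil(q * len(ordered)), 1)
    return ordered[rank - 1]


latencies = [20] * 95 + [2000] * 5  # 95 fast requests, 5 slow outliers (ms)
mean = sum(latencies) / len(latencies)
p50 = nearest_rank(latencies, 0.50)
p99 = nearest_rank(latencies, 0.99)
```

The mean of 119 ms describes none of the requests. p50 reports the typical 20 ms, and p99 surfaces the 2000 ms tail.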

Parallel Mode Behavior

In parallel mode, each worker maintains its own t-digest. During merge, digests are combined using the t-digest merge algorithm, preserving accuracy. Final percentile values are deterministic.


track_stats(key, value [, [percentiles]])

Convenience function that tracks comprehensive statistics in a single call: min, max, avg, count, sum, and percentiles. Automatically creates suffixed metrics for each statistic. Ideal for getting the complete statistical picture of a metric without calling multiple track_*() functions. Skips Unit () values. Works correctly in parallel mode.

Auto-created metrics:

  • {key}_min: minimum value
  • {key}_max: maximum value
  • {key}_avg: average (stored as sum + count for parallel merging)
  • {key}_count: total count
  • {key}_sum: total sum
  • {key}_p50, {key}_p95, {key}_p99: percentiles (default)

Default percentiles: [0.50, 0.95, 0.99] when no array provided.

Percentile notation: Same as track_percentiles() - use 0.0-1.0 range (quantile notation).

// Default percentiles [0.50, 0.95, 0.99]
track_stats("response_time", e.duration_ms)
// Creates: response_time_min, response_time_max, response_time_avg,
//          response_time_count, response_time_sum,
//          response_time_p50, response_time_p95, response_time_p99

// Custom percentiles
track_stats("latency", e.duration, [0.50, 0.90, 0.99, 0.999])
// Creates all basic stats plus: latency_p50, latency_p90, latency_p99, latency_p99.9

// Per-endpoint comprehensive tracking
track_stats("api_" + e.endpoint, e.response_time)

// Safe with conversions that may fail
let duration = e.duration_str.to_float()  // Returns () on error
track_stats("request_ms", duration)        // Skips () values

When to Use track_stats() vs. Individual Functions

Use track_stats() when:

  • You want the complete statistical picture (min, max, avg, percentiles)
  • Analyzing latency, response time, or duration metrics
  • Building dashboards that need multiple statistical views
  • Prototyping or exploring data characteristics

Use individual track_min/max/avg/percentiles when:

  • You only need specific statistics (performance optimization)
  • Fine-grained control over which metrics are tracked
  • Minimizing memory usage (percentiles use ~4KB per metric)

Performance Considerations

track_stats() internally calls the same logic as individual tracking functions, so it has the same performance characteristics. The main overhead is from percentile tracking (~4KB memory per metric). If you don't need percentiles, use track_min(), track_max(), and track_avg() instead.

Parallel Mode Behavior

All generated metrics use existing merge operations (min, max, avg, count, sum, percentiles), so track_stats() works correctly in parallel mode with no special handling required.
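Storing {key}_avg as a sum plus a count matters because sums and counts add across workers, while averages do not. This Python sketch (hypothetical Stats class) covers the min/max/sum/count part. Percentiles merge separately through their t-digests and are omitted.

```python
from dataclasses import dataclass


@dataclass
class Stats:
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def add(self, value) -> None:
        if value is None:  # None stands in for Unit (), which is skipped
            return
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    def merge(self, other: "Stats") -> "Stats":
        # Every field combines with a simple associative operation
        return Stats(self.count + other.count, self.total + other.total,
                     min(self.minimum, other.minimum), max(self.maximum, other.maximum))

    @property
    def avg(self):
        return self.total / self.count if self.count else None


worker_a, worker_b = Stats(), Stats()
for v in [10, 20, 30, None]:
    worker_a.add(v)
worker_b.add(100)

merged = worker_a.merge(worker_b)
naive = (worker_a.avg + worker_b.avg) / 2  # averaging averages ignores group sizes
```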


File Output Functions

All file output functions require the --allow-fs-writes flag.

append_file(path, text_or_array)

Append line(s) to file; arrays append one line per element.

append_file("errors.log", e.message)
append_file("batch.log", [e.line1, e.line2, e.line3])

truncate_file(path)

Create the file, or truncate it to zero length if it already exists, so that later appends start from an empty file.

truncate_file("output.log")

mkdir(path [, recursive])

Create directory (set recursive=true to create parents).

mkdir("logs")
mkdir("deep/nested/path", true)

Event Manipulation

emit_each(array [, base_map])

Fan out array elements as separate events (returns emitted count).

emit_each(e.users)                                    // Each user becomes an event
emit_each(e.items, #{batch_id: e.batch_id})           // Add batch_id to each

// Use return value to track emission count
let count = emit_each(e.batch_items, #{batch_id: e.id})
track_sum("items_emitted", count)

e = ()

Clear entire event (remove all fields).

if e.should_drop {
    e = ()  // Event is filtered out
}

e.field = ()

Remove individual field from event.

e.password = ()                                       // Remove sensitive field
e.temp_data = ()                                      // Clean up temporary field

e.absorb_kv(field [, options])

Parse inline key=value tokens from a string field, merge the pairs into the event, and get a status report back. Returns a map with status, data, written, remainder, removed_source, and error so scripts can branch without guessing.

let res = e.absorb_kv("msg", #{ sep: ",", kv_sep: "=", keep_source: true });
if res.status == "applied" {
    e.cleaned_msg = res.remainder ?? "";
    // Parsed keys now live on the event; res.data mirrors the inserted pairs
}

Options:

  • sep: string or () (default whitespace) – token separator; () normalizes whitespace.
  • kv_sep: string (default "=") – separator between key and value.
  • keep_source: bool (default false) – leave the original field untouched; use remainder for cleaned text.
  • overwrite: bool (default true) – allow parsed keys to overwrite existing event fields; set false to skip conflicts.

Unknown option keys set status = "invalid_option"; in --strict mode this aborts the pipeline.

absorb_kv is a simple splitter and is not quote-aware — it keeps surrounding quotes on values and splits on separators inside quoted values. For logfmt-style fields with quoted values (e.g. err="connection refused"), use absorb_logfmt() instead.
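A naive splitter makes the quoting pitfall easy to see. This Python sketch (hypothetical split_kv) works in three steps:

  • split the text into tokens on sep
  • keep tokens that contain kv_sep as key/value pairs
  • join all other tokens into the remainder

The remainder rule is an assumption, not documented behavior.

```python
def split_kv(text: str, sep=None, kv_sep: str = "="):
    """Naive splitter: tokens with kv_sep become pairs, the rest become remainder."""
    tokens = text.split() if sep is None else text.split(sep)
    pairs, leftover = {}, []
    for token in (t.strip() for t in tokens):
        key, found, value = token.partition(kv_sep)
        if found and key:
            pairs[key] = value  # values stay strings; quotes are not stripped
        elif token:
            leftover.append(token)
    return pairs, " ".join(leftover)


pairs, remainder = split_kv('user=alice err="connection refused" code=5')
```

The quoted value is cut in half: err keeps only '"connection', and 'refused"' ends up in the remainder.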

e.absorb_logfmt(field [, options])

Parse a logfmt string field, merge its keys into the event, and return the same status map as absorb_kv(). Unlike absorb_kv(), this is quote-aware (surrounding quotes are stripped and quoted values may contain spaces) and infers numeric/boolean types. It is all-or-nothing like absorb_json(): a bare, unpaired token makes the whole field a parse_error (no partial extraction), so remainder is always (). On success the source field is deleted unless keep_source is true.

// 'pod="kube-system/foo" err="connection refused" replicas=3'
let res = e.absorb_logfmt("msg");
if res.status == "applied" {
    // e.pod == "kube-system/foo", e.err == "connection refused", e.replicas == 3 (int)
} else if res.status == "parse_error" {
    warn(`not logfmt: ${res.error}`);
}

Options:

  • keep_source: bool (default false) – keep the original logfmt string instead of deleting the field.
  • overwrite: bool (default true) – allow parsed keys to replace existing event fields (false skips conflicts).

Other absorb options (like sep/kv_sep) are accepted for consistency but ignored — logfmt has a fixed syntax.
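By contrast, a quote-aware parse keeps spaces inside quoted values. The Python sketch below makes several simplifications:

  • It borrows shlex.split for the quoting. Those POSIX shell rules (backslash escapes, single quotes) only approximate logfmt.
  • It infers bool, int, or float from the value text alone, so it cannot tell whether a value was quoted. Kelora may treat quoted values differently.
  • It raises on a bare token to mirror the all-or-nothing behavior.

The parse_logfmt and infer names are hypothetical.

```python
import shlex


def infer(value: str):
    """Best-effort typing: bool, then int, then float, else the original string."""
    # Note: float() also accepts words like "inf" and "nan"; a real parser is stricter
    if value in ("true", "false"):
        return value == "true"
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


def parse_logfmt(text: str) -> dict:
    """Quote-aware parse; a bare token fails the whole line (all-or-nothing)."""
    result = {}
    for token in shlex.split(text):  # also raises ValueError on unbalanced quotes
        key, found, value = token.partition("=")
        if not found or not key:
            raise ValueError(f"bare token: {token!r}")
        result[key] = infer(value)
    return result


fields = parse_logfmt('pod="kube-system/foo" err="connection refused" replicas=3')
try:
    parse_logfmt("replicas=3 oops")
    bare_token_rejected = False
except ValueError:
    bare_token_rejected = True
```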

e.absorb_json(field [, options])

Parse a JSON object from a string field, merge its keys into the event, and return the same status map as absorb_kv(). On success the source field is deleted unless keep_source is true, and remainder is always ().

let res = e.absorb_json("payload");
if res.status == "applied" {
    e.actor = e.actor ?? e.user;      // merged from payload
} else if res.status == "parse_error" {
    warn(`bad payload: ${res.error}`);
}

Options:

  • keep_source: bool (default false) – keep the original JSON string instead of deleting the field.
  • overwrite: bool (default true) – allow parsed keys to replace existing event fields (false skips conflicts).

Other absorb options (like sep) are accepted for consistency but ignored. JSON parsing is all-or-nothing: invalid JSON or non-object payloads set status = "parse_error" and leave the event untouched.
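
The keep_source and overwrite options shared by the absorb functions can be modelled as a small merge step. Here is a hypothetical Rust sketch that uses string values only. It is not Kelora's implementation, and the order (source removed before merging) is an assumption.

```rust
use std::collections::BTreeMap;

/// Merge successfully parsed keys into an event. Only called on success:
/// on a parse error the event is left untouched.
fn merge_absorbed(
    event: &mut BTreeMap<String, String>,
    source_field: &str,
    parsed: Vec<(String, String)>,
    overwrite: bool,
    keep_source: bool,
) {
    if !keep_source {
        event.remove(source_field); // default: the source string is consumed
    }
    for (key, value) in parsed {
        // overwrite = false keeps any existing value for a conflicting key.
        if overwrite || !event.contains_key(&key) {
            event.insert(key, value);
        }
    }
}

fn main() {
    let mut event = BTreeMap::from([
        ("payload".to_string(), r#"{"user":"bob","level":"warn"}"#.to_string()),
        ("level".to_string(), "info".to_string()),
    ]);
    let parsed = vec![
        ("user".to_string(), "bob".to_string()),
        ("level".to_string(), "warn".to_string()),
    ];
    merge_absorbed(&mut event, "payload", parsed, false, true);
    println!("{event:?}");
}
```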

e.absorb_jwt(field [, options])

Parse a JWT from a string field and merge its claims (the decoded payload) into the event, returning the same status map as absorb_kv(). The header and signature are ignored — only the claims are flattened, mirroring how absorb_json() flattens a JSON object. Signatures are not verified, so this is for debugging / trusted tokens only. On success the source field is deleted unless keep_source is true, and remainder is always ().

// e.token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhbGljZSIsInJvbGUiOiJhZG1pbiJ9.sig"
let res = e.absorb_jwt("token");
if res.status == "applied" {
    // e.sub == "alice", e.role == "admin"; numeric claims such as exp (if present) land as ints
} else if res.status == "parse_error" {
    warn(`bad token: ${res.error}`);
}

Options:

  • keep_source: bool (default false) – keep the original token string instead of deleting the field.
  • overwrite: bool (default true) – allow claims to replace existing event fields (false skips conflicts).

Other absorb options (like sep) are accepted for consistency but ignored — a JWT's structure is fixed. Parsing is all-or-nothing: a malformed token sets status = "parse_error" and leaves the event untouched. The time claims land as raw integers; for datetime-typed exp/iat/nbf (e.g. to compare against now()), use parse_jwt() instead.
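
To see what absorb_jwt() works from, here is a stdlib-only Rust sketch. It splits a compact JWT (header.payload.signature) on `.` and base64url-decodes the middle claims segment. Like absorb_jwt(), it ignores the signature and verifies nothing. `jwt_claims_json` and `base64url_decode` are hypothetical helpers, not Kelora functions.

```rust
/// Decode unpadded base64url (RFC 4648, section 5), as used for JWT segments.
fn base64url_decode(input: &str) -> Result<Vec<u8>, String> {
    let mut out = Vec::with_capacity(input.len() * 3 / 4);
    let mut buf: u32 = 0;
    let mut bits: u32 = 0;
    for c in input.trim_end_matches('=').chars() {
        let v = match c {
            'A'..='Z' => c as u32 - 'A' as u32,
            'a'..='z' => c as u32 - 'a' as u32 + 26,
            '0'..='9' => c as u32 - '0' as u32 + 52,
            '-' => 62,
            '_' => 63,
            _ => return Err(format!("invalid base64url character {c:?}")),
        };
        buf = (buf << 6) | v; // append 6 bits
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            out.push((buf >> bits) as u8); // emit the top 8 bits
            buf &= (1u32 << bits) - 1; // keep only the bits not yet emitted
        }
    }
    Ok(out)
}

/// Return the decoded claims JSON of a compact JWT; the signature is ignored.
fn jwt_claims_json(token: &str) -> Result<String, String> {
    let parts: Vec<&str> = token.split('.').collect();
    if parts.len() != 3 {
        return Err(format!("expected 3 segments, got {}", parts.len()));
    }
    let bytes = base64url_decode(parts[1])?;
    String::from_utf8(bytes).map_err(|e| e.to_string())
}

fn main() {
    let token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhbGljZSIsInJvbGUiOiJhZG1pbiJ9.sig";
    match jwt_claims_json(token) {
        Ok(claims) => println!("{claims}"), // {"sub":"alice","role":"admin"}
        Err(e) => eprintln!("bad token: {e}"),
    }
}
```

Decoding the example token from the absorb_jwt() snippet yields only sub and role. Those are the keys that get flattened onto the event.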

e.absorb_regex(field, pattern [, options])

Extract named capture groups from a string field using a regular expression pattern, merge the extracted values into the event, and return a status map (same structure as absorb_kv() and absorb_json()).

The pattern must use named capture groups, written (?P<name>...), to mark the parts of the text to extract. Only named captures become event fields; numbered groups are ignored.

// Extract user and IP from log message
let res = e.absorb_regex("msg", #"User (?P<user>\w+) logged in from (?P<ip>[\d.]+)"#);
if res.status == "applied" {
    print(`${e.user} from ${e.ip}`);  // Extracted fields now on event
}

// Parse structured log line with multiple fields
let pattern = #"(?P<date>[\d-]+) (?P<level>\w+) (?P<file>[\w.]+):(?P<lineno>\d+) (?P<message>.+)"#;
e.absorb_regex("line", pattern);
// Now e.date, e.level, e.file, e.lineno, e.message are all populated
// (a capture named "line" would clash with the source field, which is deleted by default)

Options:

  • keep_source: bool (default false) – preserve the original field instead of removing it after extraction.
  • overwrite: bool (default true) – allow extracted fields to overwrite existing event fields (false skips conflicts).

Status values:

  • "applied" – pattern matched and fields were extracted
  • "empty" – pattern didn't match (no captures)
  • "parse_error" – invalid regex pattern
  • "missing_field" – source field doesn't exist
  • "not_string" – source field is not a string
  • "invalid_option" – unknown option key (aborts in --strict mode)

When to use:

  • absorb_regex() – Extract structured data from unstructured text with custom patterns
  • absorb_kv() – Parse simple key=value pairs (simpler, faster; not quote-aware)
  • absorb_logfmt() – Parse logfmt with quoted values (type-aware)
  • absorb_json() – Parse JSON objects (type-aware)
  • Regex input format (-f regex) – Use for whole-log parsing at input time

// Complex example: parse Apache access log format
let apache_pattern = #"(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+)[^"]*" (?P<status>\d+) (?P<bytes>\d+)"#;
e.absorb_regex("line", apache_pattern);

// Keep source for debugging
e.absorb_regex("raw_message", r"ERROR: (?P<error_code>\d+) - (?P<error_msg>.+)",
               #{ keep_source: true });

Span Context – --span-close Only

A read-only span object is injected into scope whenever a --span-close script runs. Use it to emit per-span rollups after Kelora closes a count- or time-based window.

Span Identity

span.id returns the current span identifier. Count-based spans use #<index> (zero-based). Time-based spans use ISO_START/DURATION (e.g. 2024-05-19T12:00:00Z/5m).

let id = span.id;  // "#0" or "2024-05-19T12:05:00Z/5m"

Span Boundaries

span.start and span.end expose the half-open window bounds [start, end) as DateTime values, so an event stamped exactly at span.end belongs to the next span. Count-based spans return () for both fields.

if span.start != () {
    print(`Window: ${span.start} → ${span.end}`);
}
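
The following hypothetical Rust model shows how an event could map to a span identifier and bounds. It makes several assumptions that are not confirmed Kelora internals. First, count-based spans number the events that survive filters from zero, in blocks of n. Second, time windows are aligned to multiples of the width since the Unix epoch. Third, the duration label ("5m") is passed in by hand. The date conversion uses Howard Hinnant's days-to-civil algorithm.

```rust
/// Count-based spans (assumed model): the k-th event that survives the
/// filters, counted from zero, falls into span k / n, where n > 0 is the span size.
fn count_span_id(k: u64, n: u64) -> String {
    format!("#{}", k / n)
}

/// Days since 1970-01-01 to a proleptic Gregorian (year, month, day),
/// using Howard Hinnant's days-to-civil algorithm.
fn civil_from_days(days: i64) -> (i64, i64, i64) {
    let z = days + 719_468;
    let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
    let doe = z - era * 146_097; // day of era, 0..=146_096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + (if month <= 2 { 1 } else { 0 });
    (year, month, day)
}

/// Format Unix seconds as an ISO-8601 UTC timestamp.
fn iso_utc(secs: i64) -> String {
    let (y, m, d) = civil_from_days(secs.div_euclid(86_400));
    let sod = secs.rem_euclid(86_400); // seconds since midnight
    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        y, m, d, sod / 3_600, sod / 60 % 60, sod % 60
    )
}

/// Time-based spans (assumed model): windows aligned to multiples of `width`
/// seconds since the epoch, half-open [start, start + width).
fn time_span(ts: i64, width: i64, label: &str) -> (String, i64, i64) {
    let start = ts - ts.rem_euclid(width);
    (format!("{}/{}", iso_utc(start), label), start, start + width)
}

fn main() {
    println!("{}", count_span_id(7, 5)); // 8th surviving event, spans of 5
    // 2024-05-19T12:07:30Z in a 5-minute window
    let (id, start, end) = time_span(1_716_120_450, 300, "5m");
    println!("{id} covers [{start}, {end})");
    // An event exactly at `end` starts the next window.
    println!("{}", time_span(end, 300, "5m").0);
}
```

Because the window is half-open, an event at exactly 12:10:00 opens 2024-05-19T12:10:00Z/5m rather than closing the 12:05 window.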

Span Size and Events

span.size reports how many events survived filters and were buffered in the span. span.events returns those events in arrival order. Each map includes span metadata fields (span_status, span_id, span_start, span_end) alongside the original event data.

let included = span.events
    .filter(|evt| evt.span_status == "included")
    .len();

Metrics Snapshot

span.metrics contains per-window values from track_* calls, computed automatically for each span so you can emit summaries without manual bookkeeping. This works for additive aggregators: track_count, track_sum, track_avg, and track_unique.

Non-additive aggregators are omitted

track_min, track_max, track_percentiles, track_cardinality, track_top, and track_bottom accumulate global state that cannot be reduced to a single window (a t-digest or HLL has no subtraction, and a global max is not a per-window max). These keys are omitted from span.metrics and Kelora prints a one-time warning. Compute them per window by iterating span.events instead. For example, span.events.map(|ev| ev.rt).filter(|v| v != ()).reduce(|a, b| if a == () || b > a { b } else { a }) yields a per-window max; the a == () guard keeps the first comparison safe whether or not the accumulator starts as ().

let metrics = span.metrics;
let hits = metrics["events"];          // from track_sum("events", 1)
let failures = metrics["failures"];    // from track_sum("failures", 1)
let ratio = if hits > 0 { failures * 100 / hits } else { 0 };
print(span.id + ": " + ratio.to_string() + "% failure rate");
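
The following Rust sketch shows why only additive aggregators survive into span.metrics. It assumes, as the note on non-additive aggregators suggests, that per-window values come from subtracting a snapshot of global state taken when the span opened. `Global` and `window_sum_avg` are hypothetical names, and this is not Kelora's implementation.

```rust
/// Global running state for one tracked value across the whole stream.
#[derive(Clone, Copy, Debug, Default)]
struct Global {
    sum: i64,   // additive, like track_sum
    count: i64, // additive, like track_count
    max: i64,   // not additive, like track_max
}

impl Global {
    fn observe(&mut self, v: i64) {
        self.max = if self.count == 0 { v } else { self.max.max(v) };
        self.sum += v;
        self.count += 1;
    }
}

/// Per-window sum and average: subtract the snapshot taken when the span opened.
/// Nothing comparable exists for `max`: close.max - open.max is meaningless.
fn window_sum_avg(open: Global, close: Global) -> (i64, f64) {
    let sum = close.sum - open.sum;
    let count = close.count - open.count;
    let avg = if count > 0 { sum as f64 / count as f64 } else { 0.0 };
    (sum, avg)
}

fn main() {
    let mut g = Global::default();
    for v in [900, 10, 20] {
        g.observe(v); // span #0
    }
    let open = g; // snapshot as span #1 opens (Global is Copy)
    let span1 = [30, 40];
    for v in span1 {
        g.observe(v); // span #1
    }
    let (sum, avg) = window_sum_avg(open, g);
    println!("span #1: sum = {sum}, avg = {avg}");
    // The global max is still 900 from span #0; the per-window max needs
    // the window's own events, like iterating span.events.
    println!("global max = {}, span #1 max = {:?}", g.max, span1.iter().max());
}
```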

Quick Reference by Use Case

Error Extraction:

e.error_code = e.message.extract_regex(r"ERR-(\d+)", 1)

IP Anonymization:

e.masked_ip = e.client_ip.mask_ip();
e.ip_alias = pseudonym(e.client_ip, "ips");

Time Filtering:

if e.timestamp > to_datetime("2024-01-01") {
    // Process recent events
}

Metrics Tracking:

track_freq("service", e.service);
track_sum("bytes", e.response_size);
track_unique("users", e.user_id);

Array Fan-Out:

emit_each(e.users, #{batch_id: e.batch_id})

Safe Field Access:

e.user_name = e.get_path("user.profile.name", "unknown");
if e.has_path("error.details.code") {
    e.detailed = true
}


See Also

For more details, run:

kelora --help-functions       # This reference in CLI form
kelora --help-functions ip    # Search the catalogue by keyword (name/description)
kelora --help-rhai            # Rhai language guide
kelora --help-examples        # Common usage patterns