Composed Pipelines: Building Powerful Log Analysis Workflows

Learn to build sophisticated log analysis pipelines by composing Kelora's features into multi-stage workflows. This tutorial demonstrates real-world patterns that combine section isolation, multiline reconstruction, format parsing, filtering, metrics, and span aggregation into powerful analytical recipes.

Overview

Most production incidents require more than a single command. You need to:

  1. Isolate the relevant section (time window, service, severity)
  2. Reconstruct multi-line context (stack traces, JSON payloads)
  3. Parse the format correctly
  4. Filter and transform to extract insights
  5. Aggregate with metrics and span rollups

This tutorial shows how to compose these capabilities into complete workflows that answer complex questions.
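
As a preview, here is a minimal sketch of a fully composed pipeline. The file name and timestamps are placeholders, and the snippet only combines flags and functions demonstrated later in this tutorial: a time window, multiline reconstruction, key=value parsing, an error filter, and metrics, all in a single pass.

# Hedged sketch: one pipeline covering all five stages (placeholder file and times)
kelora app.log \
  --since "2024-01-15T10:00:00Z" --until "2024-01-15T10:05:00Z" \
  --multiline 'regex:match=^[0-9]{4}-[0-9]{2}-[0-9]{2}' --multiline-join=newline \
  --exec 'e.absorb_kv("line")' \
  --filter 'e.level == "ERROR"' \
  --exec 'track_count("errors"); track_top("pod", e.get_path("pod", "unknown"), 10)' \
  --metrics

Each pattern below isolates one or two of these stages before combining them.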

What You'll Learn

  • Isolate log sections by time, service, or severity
  • Reconstruct multi-line stack traces and payloads
  • Chain filters and transformations for progressive refinement
  • Combine metrics tracking with span-based aggregation
  • Build reusable pipeline patterns for common scenarios
  • Export results at different pipeline stages

Prerequisites

Sample Data

This tutorial uses:

  • examples/multiline_stacktrace.log - Application logs with stack traces
  • examples/api_logs.jsonl - API gateway structured logs
  • examples/incident_story.log - Simulated deployment incident

Pattern 1: Time Window Isolation + Error Analysis

Scenario: A deployment happened at 12:32. Analyze errors in the 5-minute window after deployment.

Step 1: Isolate the Time Section

First, narrow down to the relevant time window:

kelora examples/incident_story.log \
  --since "2024-06-19T12:32:00Z" \
  --until "2024-06-19T12:37:00Z" \
  --stats
Detected format: line
Lines processed: 6 total, 0 filtered (0.0%), 0 errors (0.0%)
Events created: 6 total, 0 output, 6 filtered (100.0%)
Throughput: 1248 lines/s in 4ms
Timestamp: (none found, 6 events) - 0/0 parsed (0.0%); 6 missing. Hint: Try --ts-field or --ts-format.
Keys seen: line

What to look for:

  • Event count in the window
  • Time span confirmation
  • Field availability

Step 2: Filter to Errors + Extract Structure

Now parse the format and filter to errors:

kelora examples/incident_story.log \
  --since "2024-06-19T12:32:00Z" \
  --until "2024-06-19T12:37:00Z" \
  --exec 'e.absorb_kv("line")' \
  --filter 'e.level == "ERROR"' \
  -k timestamp,level,pod,detail

Pipeline flow:

  1. --since/--until: Isolate time window
  2. --exec: Parse key=value pairs from the line first
  3. --filter: Then keep only ERROR level (can access parsed fields)
  4. -k: Display specific fields

Step 3: Add Metrics for Summary

Combine with metrics to get error statistics:

kelora examples/incident_story.log \
  --since "2024-06-19T12:32:00Z" \
  --until "2024-06-19T12:37:00Z" \
  --exec 'e.absorb_kv("line")' \
  --filter 'e.level == "ERROR"' \
  --exec '
    track_count("total_errors");
    track_top("error_source", e.get_path("pod", "unknown"), 10);
  ' \
  --metrics
error_source (2 items):
  #1  checkout-v2                    2
  #2  unknown                        1
total_errors = 3

Key insight: Chaining time filtering → parsing → error filtering → metrics gives you both detailed events and aggregate statistics in a single pass.


Pattern 2: Multiline Reconstruction + Pattern Analysis

Scenario: Extract and analyze stack traces from application logs to identify root causes.

Step 1: Reconstruct Stack Traces

Use multiline joining to reconstruct complete stack traces:

kelora examples/multiline_stacktrace.log \
  --multiline 'regex:match=^[0-9]{4}-[0-9]{2}-[0-9]{2}' --multiline-join=newline \
  --filter 'e.line.contains("ERROR")' \
  --take 3
line='2024-01-15 10:01:00 ERROR Failed to process request\nTraceback (most recent call last):\n  File "/app/server.py", line 42, in handle_request\n    result = process_data(request.body)\n  File "/app/processor.py", line 15, in process_data\n    return json.loads(data)\nValueError: Invalid JSON format at line 3'
line='2024-01-15 10:02:00 ERROR Database connection failed\njava.sql.SQLException: Connection timeout\n\tat com.example.db.ConnectionPool.getConnection(ConnectionPool.java:123)\n\tat com.example.api.UserController.getUser(UserController.java:45)\n\tat com.example.api.RequestHandler.handle(RequestHandler.java:89)\nCaused by: java.net.SocketTimeoutException: Read timed out\n\tat java.net.SocketInputStream.socketRead0(Native Method)\n\tat java.net.SocketInputStream.read(SocketInputStream.java:150)'
line='2024-01-15 10:03:00 ERROR Unhandled exception in worker thread\nRuntimeError: Maximum retry attempts exceeded\n  File "/app/worker.py", line 67, in run\n    self.process_job(job)\n  File "/app/worker.py", line 98, in process_job\n    raise RuntimeError("Maximum retry attempts exceeded")'

What's happening:

  • --multiline: Lines not matching the timestamp pattern are joined to the previous event
  • --multiline-join=newline: Preserves line breaks in the grouped stack trace
  • Stack traces become part of the error event's line field with formatting intact
  • Now we have complete context for each error

Step 2: Parse and Extract Error Details

Parse the timestamp and level from the reconstructed line, then extract error types:

kelora examples/multiline_stacktrace.log \
  --multiline 'regex:match=^[0-9]{4}-[0-9]{2}-[0-9]{2}' --multiline-join=newline \
  --filter 'e.line.contains("ERROR")' \
  --exec '
    // Extract timestamp/level from the first line only
    let header = e.line.split("\n")[0];
    e.timestamp = header.col("0:2"); // date + time
    e.level = header.col("2");

    // Extract error message from the first line
    e.error_summary = header.col("3:");
  ' \
  -k timestamp,level,error_summary \
  --take 3
timestamp='2024-01-15 10:01:00' level='ERROR' error_summary='Failed to process request'
timestamp='2024-01-15 10:02:00' level='ERROR' error_summary='Database connection failed'
timestamp='2024-01-15 10:03:00' level='ERROR' error_summary='Unhandled exception in worker thread'

Step 3: Aggregate with Drain Pattern Discovery

Use drain to find common error patterns in the reconstructed stack traces:

kelora examples/multiline_stacktrace.log \
  --multiline 'regex:match=^[0-9]{4}-[0-9]{2}-[0-9]{2}' --multiline-join=newline \
  --filter 'e.line.contains("ERROR")' \
  --drain -k line
templates (3 items):
  1: <date> <time> ERROR Database connection failed
java.sql.SQLException: Connection timeout
    at <fqdn> <fqdn> <fqdn> by: java.net.SocketTimeoutException: Read timed out
    at <num> Method)
    at <function>
  1: <date> <time> ERROR Failed to process request
Traceback (most recent call last):  File <path> line <num> in handle_request    result = <function>  File <path> line <num> in process_data    return <function> Invalid JSON format at line <num>
  1: <date> <time> ERROR Unhandled exception in worker thread
RuntimeError: Maximum retry attempts exceeded  File <path> line <num> in run    <function>  File <path> line <num> in process_job    raise RuntimeError("Maximum retry attempts exceeded")

Complete workflow:

  1. Reconstruct multi-line stack traces with preserved line breaks
  2. Filter to ERROR events
  3. Extract error message from reconstructed multiline block
  4. Use drain to discover patterns

Pattern 3: Service Isolation + Span-Based Rollup

Scenario: Analyze API errors per service with 1-minute rollups to identify problem services.

Step 1: Isolate Service and Level

Filter to a specific service and error level:

kelora -j examples/api_logs.jsonl \
  --filter 'e.service == "auth-service" && e.level == "ERROR"' \
  -k timestamp,service,message,status
timestamp='2025-01-15T10:24:12Z' service='auth-service'
  message='Connection timeout while validating user credentials' status=500
timestamp='2025-01-15T10:26:44Z' service='auth-service'
  message='Unauthorized access attempt detected' status=403

Step 2: Add Metrics Tracking

Track error statistics:

kelora -j examples/api_logs.jsonl \
  --filter 'e.service == "auth-service" && e.level == "ERROR"' \
  --exec '
    track_count("errors");
    track_stats("response_time", e.get_path("response_time", 0.0));
    track_top("error_type", e.message, 5);
  ' \
  --metrics
error_type   (2 items):
  #1  Connection timeout while validating user credentials 1
  #2  Unauthorized access attempt detected 1
errors       = 2
response_time_avg = 2.5615
response_time_count = 2
response_time_max = 5.123
response_time_min = 0
response_time_p50 = 2.56
response_time_p95 = 4.87
response_time_p99 = 5.07
response_time_sum = 5.123

Step 3: Add Span-Based Time Rollup

Now add time-based spans for per-minute summaries:

kelora -j examples/api_logs.jsonl \
  --filter 'e.service == "auth-service" && e.level == "ERROR"' \
  --exec '
    track_count("errors");
    track_stats("response_time", e.get_path("response_time", 0.0));
  ' \
  --span 1m \
  --span-close '
    let m = span.metrics;
    let errors = m.get_path("errors", 0);
    let avg_time = m.get_path("response_time_avg", 0);
    print(`${span.start.to_iso()}: ${errors} errors, avg ${avg_time}s response time`);
  '
2025-01-15T10:23:00+00:00: 0 errors, avg 0s response time
timestamp='2025-01-15T10:24:12Z' level='ERROR'
  message='Connection timeout while validating user credentials' service='auth-service'
  request_id='req-e5f6g7h8' user_id=103 response_time=5.123 status=500 client_ip='10.0.5.23'
  path='/api/auth/login' method='POST' error='ConnectionError: timeout after 5000ms'
  stack_trace='at validateCredentials (auth.js:234)\n  at processLogin (handler.js:89)'
2025-01-15T10:24:00+00:00: 1 errors, avg 0s response time
2025-01-15T10:25:00+00:00: 0 errors, avg 0s response time
timestamp='2025-01-15T10:26:44Z' level='ERROR' message='Unauthorized access attempt detected'
  service='auth-service' request_id='req-g3h4i5j6' user_id=999 status=403 client_ip='172.16.88.6'
  path='/api/admin/users' method='DELETE' source_ip='172.16.88.6'
2025-01-15T10:26:00+00:00: 1 errors, avg 0s response time
2025-01-15T10:27:00+00:00: 0 errors, avg 0s response time
2025-01-15T10:28:00+00:00: 0 errors, avg 0s response time
2025-01-15T10:29:00+00:00: 0 errors, avg 0s response time
2025-01-15T10:30:00+00:00: 0 errors, avg 0s response time
2025-01-15T10:31:00+00:00: 0 errors, avg 0s response time
2025-01-15T10:32:00+00:00: 0 errors, avg 0s response time
kelora hint: Metrics recorded; rerun with -m or --metrics=json to view them.

Composed pipeline:

  1. Filter to specific service and level
  2. Track error metrics per event
  3. Group into 1-minute windows
  4. Emit per-minute summaries with aggregates

Pattern 4: Progressive Filtering + Multi-Stage Transformation

Scenario: Find slow API requests, enrich with computed fields, then analyze by endpoint.

Step 1: Initial Filter for Slow Requests

kelora -j examples/api_logs.jsonl \
  --filter 'e.get_path("response_time", 0.0) > 1.0' \
  -k timestamp,service,path,response_time
timestamp='2025-01-15T10:24:12Z' service='auth-service' path='/api/auth/login' response_time=5.123
timestamp='2025-01-15T10:25:01Z' service='payment-service' path='/api/payments' response_time=2.567
timestamp='2025-01-15T10:29:11Z' service='analytics' path='/api/analytics' response_time=1.789
timestamp='2025-01-15T10:30:35Z' service='order-service' path='/api/orders' response_time=1.123

Step 2: Add Computed Fields

Classify response times into buckets:

kelora -j examples/api_logs.jsonl \
  --filter 'e.get_path("response_time", 0.0) > 1.0' \
  --exec '
    let rt = e.get_path("response_time", 0.0);
    e.latency_class = if rt > 5.0 {
      "critical"
    } else if rt > 2.0 {
      "high"
    } else {
      "moderate"
    };
  ' \
  -k timestamp,service,response_time,latency_class
timestamp='2025-01-15T10:24:12Z' service='auth-service' response_time=5.123 latency_class='critical'
timestamp='2025-01-15T10:25:01Z' service='payment-service' response_time=2.567 latency_class='high'
timestamp='2025-01-15T10:29:11Z' service='analytics' response_time=1.789 latency_class='moderate'
timestamp='2025-01-15T10:30:35Z' service='order-service' response_time=1.123
  latency_class='moderate'

Step 3: Filter to Critical Cases + Aggregate

Add another filter for critical cases and track:

kelora -j examples/api_logs.jsonl \
  --filter 'e.get_path("response_time", 0.0) > 1.0' \
  --exec '
    let rt = e.get_path("response_time", 0.0);
    e.latency_class = if rt > 5.0 {
      "critical"
    } else if rt > 2.0 {
      "high"
    } else {
      "moderate"
    };
  ' \
  --filter 'e.latency_class == "critical"' \
  --exec '
    track_count("critical_requests");
    track_top("service", e.service, 10);
    track_stats("latency", e.response_time);
  ' \
  --metrics
critical_requests = 1
latency_avg  = 5.123
latency_count = 1
latency_max  = 5.123
latency_min  = 5.123
latency_p50  = 5.12
latency_p95  = 5.12
latency_p99  = 5.12
latency_sum  = 5.123
service      (1 items):
  #1  auth-service                   1

Multi-stage approach:

  1. First filter: Identify slow requests (>1s)
  2. First exec: Classify into latency buckets
  3. Second filter: Narrow to critical cases
  4. Second exec: Track metrics on critical subset

This progressive refinement lets you work with smaller datasets at each stage.


Pattern 5: Format Detection + Mixed Content Handling

Scenario: Logs contain both structured JSON and unstructured text. Extract errors from both.

Approach 1: Process Each Format Separately

For best results with mixed formats, use preprocessing:

# Extract and analyze JSON errors
grep '^{' examples/mixed_format.log | \
  kelora -f json --filter 'e.level == "ERROR"' -k timestamp,message

# Extract and analyze text errors
grep -v '^{' examples/mixed_format.log | \
  kelora -f line --filter 'e.line.contains("ERROR")' -k line

Approach 2: Fallback Parsing

Handle as line format and parse JSON where possible:

kelora examples/mixed_format.log \
  -f line \
  --exec '
    // Try to parse as JSON
    if e.line.starts_with("{") {
      let parsed = e.line.parse_json();
      if parsed != () {
        e.level = parsed.get_path("level", "UNKNOWN");
        e.message = parsed.get_path("message", "");
      }
    } else {
      // Plain text - extract level
      if e.line.contains("ERROR") {
        e.level = "ERROR";
      }
    }
  ' \
  --filter 'e.get_path("level") == "ERROR"' \
  -k line_num,level,message

Best practice: Separate formats upstream with grep for best performance and accuracy.


Pattern 6: Complete Incident Analysis Workflow

Scenario: End-to-end analysis of an API incident with time isolation, metrics, and rollup.

This brings together everything we've learned:

kelora -j examples/api_logs.jsonl \
  --since "2025-01-15T10:24:00Z" \
  --until "2025-01-15T10:30:00Z" \
  --filter 'e.level == "ERROR" || e.get_path("response_time", 0.0) > 2.0' \
  --exec-file examples/incident_workflow_exec.rhai \
  --span 2m \
  --span-close '
    let m = span.metrics;
    print(`\n=== Window: ${span.start.to_iso()} ===$()`);
    print(`  Total issues: ${span.size}`);
    print(`  Errors: ${m.get_path("error", 0)}`);
    print(`  Latency: ${m.get_path("latency", 0)}`);
    print(`  Avg response: ${m.get_path("response_time_avg", 0)}s`);
    print(`  P95 response: ${m.get_path("response_time_p95", 0)}s`);
  '
\n=== Window: 2025-01-15T10:22:00+00:00 ===$()
  Total issues: 0
  Errors: 0
  Latency: 0
  Avg response: 0s
  P95 response: 0s
timestamp='2025-01-15T10:24:12Z' level='ERROR'
  message='Connection timeout while validating user credentials' service='auth-service'
  request_id='req-e5f6g7h8' user_id=103 response_time=5.123 status=500 client_ip='10.0.5.23'
  path='/api/auth/login' method='POST' error='ConnectionError: timeout after 5000ms'
  stack_trace='at validateCredentials (auth.js:234)\n  at processLogin (handler.js:89)'
  issue_type='error'
timestamp='2025-01-15T10:25:01Z' level='WARN' message='Payment processing timeout - retrying'
  service='payment-service' request_id='req-m3n4o5p6' user_id=42 response_time=2.567 status=200
  client_ip='192.168.1.100' path='/api/payments' method='POST'
  referer='https://checkout.example.com' issue_type='latency'
timestamp='2025-01-15T10:25:18Z' level='ERROR' severity='critical'
  message='Database connection pool exhausted' service='database' request_id='req-q7r8s9t0'
  response_time=0.001 error='PoolExhausted: no available connections' issue_type='error'
timestamp='2025-01-15T10:25:45Z' level='ERROR' message='Invalid JWT token provided'
  service='api-gateway' request_id='req-u1v2w3x4' status=401 client_ip='198.51.100.77'
  path='/api/admin' method='GET'
  token='eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyMTIzIiwicm9sZSI6ImFkbWluIiwiZXhwIjoxNzA1MzE3NjAwfQ.dGVzdC1zaWduYXR1cmU'
  issue_type='error'
\n=== Window: 2025-01-15T10:24:00+00:00 ===$()
  Total issues: 4
  Errors: 3
  Latency: 1
  Avg response: 0s
  P95 response: 0s
timestamp='2025-01-15T10:26:44Z' level='ERROR' message='Unauthorized access attempt detected'
  service='auth-service' request_id='req-g3h4i5j6' user_id=999 status=403 client_ip='172.16.88.6'
  path='/api/admin/users' method='DELETE' source_ip='172.16.88.6' issue_type='error'
timestamp='2025-01-15T10:27:26Z' level='ERROR' message='File upload failed - size limit exceeded'
  service='storage' request_id='req-o1p2q3r4' user_id=156 status=413 client_ip='198.51.100.88'
  path='/api/upload' method='POST' error='FileSizeError: maximum size 10MB exceeded'
  issue_type='error'
\n=== Window: 2025-01-15T10:26:00+00:00 ===$()
  Total issues: 2
  Errors: 2
  Latency: 0
  Avg response: 0s
  P95 response: 0s
timestamp='2025-01-15T10:28:50Z' level='ERROR' message='Endpoint not found' service='api-gateway'
  request_id='req-e7f8g9h0' status=404 client_ip='172.16.88.7' path='/wp-admin' method='GET'
  issue_type='error'
\n=== Window: 2025-01-15T10:28:00+00:00 ===$()
  Total issues: 1
  Errors: 1
  Latency: 0
  Avg response: 0s
  P95 response: 0s
\n=== Window: 2025-01-15T10:30:00+00:00 ===$()
  Total issues: 0
  Errors: 2
  Latency: 0
  Avg response: 0s
  P95 response: 0s
\n=== Window: 2025-01-15T10:32:00+00:00 ===$()
  Total issues: 0
  Errors: 0
  Latency: 0
  Avg response: 0s
  P95 response: 0s
kelora hint: Metrics recorded; rerun with -m or --metrics=json to view them.

Complete pipeline stages:

  1. Isolation: Time window (--since/--until)
  2. Filtering: Errors or slow requests
  3. Classification: Compute issue types (handled in the exec file; see the sketch below)
  4. Metrics: Track by category and service (also in the exec file)
  5. Aggregation: 2-minute window rollups
  6. Summary: Per-window statistics
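
The classification and tracking steps live in examples/incident_workflow_exec.rhai, which is not reproduced in this tutorial. Based on the issue_type values and metric names visible in the output above, a minimal sketch of what such an exec script might contain (an assumption, not the shipped file) looks like this:

// Hypothetical sketch of an exec script in the spirit of incident_workflow_exec.rhai,
// reconstructed from the output above rather than copied from the real file.
let rt = e.get_path("response_time", 0.0);

// Classify each surviving event: ERROR level means an error; otherwise the event
// is here because of the slow-response filter, so label it a latency issue.
if e.level == "ERROR" {
    e.issue_type = "error";
} else {
    e.issue_type = "latency";
}

// Track counts per category, top services, and response-time statistics,
// which --span-close then reads back from span.metrics.
track_count(e.issue_type);
track_top("service", e.get_path("service", "unknown"), 10);
track_stats("response_time", rt);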

Pattern 7: Export Pipeline at Multiple Stages

Scenario: Export filtered data for external tools while also computing local metrics.

Export Filtered Events

Save filtered events to a file:

kelora -j examples/api_logs.jsonl \
  --filter 'e.level == "ERROR"' \
  --exec 'e.absorb_kv("message")' \
  -F json > errors-export.jsonl

Export Metrics to JSON

Compute metrics and save to file:

kelora -j examples/api_logs.jsonl \
  --filter 'e.level == "ERROR"' \
  --exec '
    track_count("total");
    track_top("service", e.service, 10);
    track_stats("response_time", e.get_path("response_time", 0.0));
  ' \
  --metrics-file incident-metrics.json \
  --silent

Combined: Events + Metrics

Export events to stdout, metrics to file:

kelora -j examples/api_logs.jsonl \
  --filter 'e.level == "ERROR"' \
  --exec 'track_count(e.service)' \
  --metrics-file metrics.json \
  -F json > events.jsonl

Use cases:

  • Forward events to external systems such as Elasticsearch or S3 (see the sketch below)
  • Save metrics for trending/dashboards
  • Share analysis results with team
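
For the forwarding case, the JSON output stream can be piped straight into whatever ingestion command your environment provides. A minimal sketch, assuming the AWS CLI is installed and using a placeholder bucket name:

# Hedged sketch: stream filtered errors to S3 (bucket and key are placeholders);
# any command that accepts JSON lines on stdin works the same way.
kelora -j examples/api_logs.jsonl \
  --filter 'e.level == "ERROR"' \
  -F json | aws s3 cp - s3://example-bucket/incident/errors.jsonl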

Common Mistakes

❌ Problem: Forgetting multiline reconstruction loses context

kelora stack-traces.log --filter 'e.line.contains("ERROR")'
# Stack traces are split across events
✅ Solution: Use --multiline with --multiline-join=newline to reconstruct:
kelora stack-traces.log --multiline 'regex:match=^[0-9]{4}-' --multiline-join=newline --filter 'e.line.contains("ERROR")'
Note: --multiline-join=newline preserves line breaks in the grouped stack trace, keeping the structure intact.


❌ Problem: Wrong filter order processes too much data

kelora huge.log --exec 'expensive_transform(e)' --filter 'e.level == "ERROR"'
# Transform runs on ALL events, then filters
✅ Solution: Filter first, then transform:
kelora huge.log --filter 'e.level == "ERROR"' --exec 'expensive_transform(e)'


❌ Problem: Not using safe field access causes crashes

kelora api.log --filter 'e.response_time > 1.0'
# Crashes if response_time field is missing
✅ Solution: Use .get_path() with defaults:
kelora api.log --filter 'e.get_path("response_time", 0.0) > 1.0'


❌ Problem: Span mode incompatible with parallel processing

kelora huge.log --parallel --span 5m --span-close '...'
# Error: --span incompatible with --parallel
✅ Solution: Remove --parallel for span processing:
kelora huge.log --span 5m --span-close '...'


❌ Problem: Time filters on unsorted logs miss events

kelora unsorted.log --since "2024-01-15T10:00:00Z"
# May miss events if timestamps are out of order
✅ Solution: Pre-sort by timestamp or use line-based filtering:
sort -t'"' -k4 unsorted.log | kelora -j --since "2024-01-15T10:00:00Z"


Tips & Best Practices

Pipeline Design Principles

  1. Filter early, transform late: Reduce data volume as soon as possible (see the sketch after this list)
  2. Use progressive refinement: Multiple simple filters beat one complex filter
  3. Safe field access: Always use .get_path(field, default) for optional fields
  4. Reconstruct context first: Apply multiline joins before other processing
  5. Combine filters and metrics: Single-pass analysis is more efficient
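
A minimal sketch of principles 1 and 3 together, on a hypothetical api.log: the cheap filter with safe field access runs first, and the enrichment only touches events that survive it.

# Hedged sketch: filter early with safe access, then transform the survivors
kelora api.log \
  --filter 'e.get_path("response_time", 0.0) > 1.0' \
  --exec 'e.latency_ms = e.get_path("response_time", 0.0) * 1000.0' \
  -k timestamp,path,latency_ms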

Composition Patterns

Pattern: Funnel Analysis

# Wide → narrow with metrics at each stage
kelora app.log \
  --exec 'track_count("total")' \
  --filter 'e.level == "ERROR"' \
  --exec 'track_count("errors")' \
  --filter 'e.service == "api"' \
  --exec 'track_count("api_errors")' \
  --metrics

Pattern: Enrich Then Filter

# Add computed fields, then filter on them
kelora api.log \
  --exec 'e.is_slow = e.response_time > 1.0' \
  --exec 'e.is_error = e.status >= 500' \
  --filter 'e.is_slow && e.is_error' \
  --metrics

Pattern: Multi-Dimensional Aggregation

# Track multiple dimensions in one pass
kelora app.log \
  --exec '
    track_count(e.level);
    track_count(e.service);
    track_count(e.level + ":" + e.service);
    track_stats("latency", e.response_time);
  ' \
  --metrics

Performance Optimization

  1. Filter before expensive operations: Parsing, transformations, tracking
  2. Use format-specific options: -j for JSON, -f logfmt for key=value
  3. Leverage parallel mode: Use --parallel for large files without spans
  4. Sample huge datasets: Use --filter 'sample_every(100)' for multi-TB logs (see the sketch below)
  5. Limit output early: Use --take N to stop after N events
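
A minimal sketch combining sampling with an early output limit on a hypothetical huge.log; the sampling rate and limit are placeholders to tune for your data volume:

# Hedged sketch: keep every 100th event, then stop after 1000 matching errors
kelora huge.log \
  --filter 'sample_every(100)' \
  --filter 'e.line.contains("ERROR")' \
  --take 1000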

Debugging Pipelines

Check each stage:

# Stage 1: Verify time filtering
kelora app.log --since "2024-01-15T10:00:00Z" --stats

# Stage 2: Add error filtering
kelora app.log --since "2024-01-15T10:00:00Z" --filter 'e.level == "ERROR"' --stats

# Stage 3: Add parsing
kelora app.log --since "2024-01-15T10:00:00Z" --filter 'e.level == "ERROR"' --exec 'e.absorb_kv("line")' -J

# Stage 4: Add metrics
kelora app.log --since "2024-01-15T10:00:00Z" --filter 'e.level == "ERROR"' --exec 'track_count("total")' -m

Use inspect format:

kelora app.log --take 1 -F inspect
# Shows all fields and their types

Reusability

Use Kelora's alias system for reusable pipelines:

# In .kelora.ini
[aliases]
errors = --filter 'e.level == "ERROR"' --exec 'track_count("total"); track_top("service", e.service, 10)' --metrics

Then run: kelora -a errors app.log


Summary

You've learned to compose powerful log analysis pipelines:

  • Section isolation with time filters and service/level filtering
  • Multiline reconstruction for stack traces and context
  • Progressive filtering to refine datasets efficiently
  • Multi-stage transformation to enrich and classify events
  • Metrics tracking for aggregate statistics
  • Span-based rollups for time-window summaries
  • Export at multiple stages for external tools
  • Debugging techniques to verify each pipeline stage

Key composition patterns:

  • Time + Filter + Metrics (incident analysis): --since X --filter Y --exec 'track_*()' -m
  • Multiline + Parse + Drain (stack trace analysis): --multiline 'regex:match=...' --drain
  • Filter + Enrich + Filter (progressive refinement): --filter X --exec 'e.field=...' --filter Y
  • Service + Metrics + Span (per-service rollups): --filter 'service==X' --exec 'track_*()' --span 1m
  • Export + Metrics (external + local analysis): -F json > file.json --metrics-file m.json

Next Steps

Now that you understand pipeline composition, explore advanced topics:

Related guides: