A quick walkthrough of Logstash, the ETL engine offered by the Elastic Stack.

Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite stash.

Logstash gained its initial popularity with log and metric collection, such as log4j logs, Apache web logs and syslog. Its application has since broadened to all kinds of data sources, such as large-scale event streams, webhooks, databases and message queue integration. Once data is transformed and cleaned up, it is routed to a final destination (i.e. the stash). Elasticsearch is one option, but there are plenty of others (MongoDB, S3, Nagios, IRC, email).

After unpacking the official tarball, all you need to run Logstash is a pipeline configuration, such as:

input { stdin { } }
output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

And to run:

bin/logstash -f simple.conf

Typing some foo and bar on stdin, you'll see the following:

foo
{
  "@timestamp" => 2018-12-08T08:16:50.572Z,
        "host" => "bigdatabox",
     "message" => "foo",
    "@version" => "1"
}
bar
{
  "@timestamp" => 2018-12-08T11:55:10.303Z,
        "host" => "bigdatabox",
     "message" => "bar",
    "@version" => "1"
}

Given Elasticsearch was the stash, you should see a newly created index by visiting http://localhost:9200/_cat/indices:

yellow open logstash-2018.12.08 FW365UQGTrqUZggbKzV95A 5 1    2 0   9.2kb   9.2kb

The index contains two documents. http://localhost:9200/logstash-2018.12.08/_search shows:

{
  "_index" : "logstash-2018.12.08",
  "_type" : "doc",
  "_id" : "bm-rjWcB1pKAKB8739Ua",
  "_score" : 1.0,
  "_source" : {
    "@timestamp" : "2018-12-08T11:55:10.303Z",
    "host" : "bigdatabox",
    "message" : "bar",
    "@version" : "1"
  }
},
{
  "_index" : "logstash-2018.12.08",
  "_type" : "doc",
  "_id" : "bW_jjGcB1pKAKB87_dWH",
  "_score" : 1.0,
  "_source" : {
    "@timestamp" : "2018-12-08T08:16:50.572Z",
    "host" : "bigdatabox",
    "message" : "foo",
    "@version" : "1"
  }
}

The unit of work in Logstash is the event; events are modelled as documents.

Execution Model

Coined a pipeline (similar to other middleware I’ve dealt with), it represents one logical flow of event data. A pipeline ingests data through inputs (protocol specific), passes events through an internal queue, and then hands them off to workers to process (filter and output). The queue (pub/sub) between the inputs and workers enables some serious scaling options when needed.

A Logstash instance is the Logstash process itself, and may contain many pipelines.
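
To see which pipelines an instance is running (and their event throughput), the monitoring API can be queried. A minimal check, assuming the default API port of 9600:

curl -s http://localhost:9600/_node/stats/pipelines?pretty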

Queuing and Guaranteed Delivery

Logstash provides a few different queue types, depending on the level of guarantees needed.

The in-memory queue is blazing fast, however it is not durable. The persistent queue, on the other hand, ensures data is written to disk until delivered, but comes with a performance penalty.
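
Switching queue types is done in logstash.yml. A minimal sketch (the path and size cap below are assumptions, not defaults):

queue.type: persisted                  # default is "memory"
queue.max_bytes: 1gb                   # cap on-disk usage; back pressure applies once full
path.queue: /var/lib/logstash/queue    # optional; defaults to a dir under path.data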

Logstash provides an at-least-once delivery guarantee, which, while more conservative, puts the emphasis on message delivery. Practically speaking this equates to exactly-once in general use, but in the case of an unclean shutdown with the persistent queue, for example, a batch may be re-dispatched to a worker, resulting in duplicate messages. This highlights the importance of idempotent operations (i.e. operations that can tolerate being re-executed with the same message, without causing undesirable side effects).
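
One practical way to make the Elasticsearch output idempotent is to derive the document ID from the event itself, so a redelivered event simply overwrites the same document rather than creating a duplicate. A minimal sketch using the fingerprint filter (the source field and hash method are assumptions):

filter {
  fingerprint {
    source => ["message"]
    target => "[@metadata][fingerprint]"
    method => "SHA256"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    document_id => "%{[@metadata][fingerprint]}"
  }
}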

Dead Letter Queue (DLQ)

Used for storing messages that are undeliverable (i.e. reprocessing them is futile). Logstash can either drop and log them, or send them on to the DLQ (for possible future replay).
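
The DLQ is switched on per instance in logstash.yml. A minimal sketch (the path is an assumption; by default it lives under path.data):

dead_letter_queue.enable: true
path.dead_letter_queue: /var/lib/logstash/dlq

A separate pipeline can then replay its contents later using the dead_letter_queue input plugin:

input {
  dead_letter_queue {
    path => "/var/lib/logstash/dlq"
    pipeline_id => "main"
    commit_offsets => true
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
}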

Configuration

A specific pipeline is defined by creating a conf file, written in the Logstash configuration DSL. A section for each category of plugin is available: input plugins, filter plugins and output plugins.

input {
  ...
}

# implicit queue (back pressure etc)

filter {
  ...
}

output {
  ...
}
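
As a concrete (hypothetical) example, here is a pipeline that tails an Apache access log, parses each line with grok, fixes up the timestamp, and ships the result to Elasticsearch; the file path and index name are assumptions:

input {
  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "apache-%{+yyyy.MM.dd}"
  }
}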

Global configuration (logstash.yml)

Defines instance-level config, which can be expressed either in hierarchical form:

pipeline:
  batch:
    size: 125
    delay: 5

Or as flat keys:

pipeline.batch.size: 125
pipeline.batch.delay: 5

For example, setting node.name uniquely identifies a Logstash node, which is particularly useful when working with metrics. It can be set to a static value, node.name: darthvader, or the hostname, node.name: ${HOSTNAME}.

Pipeline configuration (pipelines.yml)

Complementary to the specific pipeline configuration files, Logstash provides some core configuration in the config dir. Of note is pipelines.yml. Formatted in YAML, it defines a list of dictionaries, where each dictionary describes a pipeline. Multiple pipelines can be defined, but don't have to be. In fact, it's still useful to define a single pipeline in this file, as it allows you to invoke the logstash executable without any additional arguments. A single pipeline:

- pipeline.id: ben-test
  queue.type: persisted
  path.config: /home/ben/Downloads/logstash-6.5.2/config/ben-simple.conf

Or many pipelines:

- pipeline.id: test
  pipeline.workers: 1
  pipeline.batch.size: 1
  config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
- pipeline.id: another_test
  queue.type: persisted
  path.config: "/tmp/logstash/*.config"

pipelines.yml has its available arguments commented inline within the file. Settings relate not only to the functional aspects of pipeline configurations, but also to non-functional concerns around the execution model, such as the queue type or the number of workers.

Pipeline Examples

JDBC to Elasticsearch

input {
  jdbc {
    jdbc_driver_library => "/opt/sqljdbc_6.4/enu/mssql-jdbc-6.4.0.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://odin:1433;databaseName=FooDb"
    jdbc_user => "logstash"
    jdbc_password => "mypassword"
    jdbc_validate_connection => true
    statement => "SELECT Id, FamilyName, GivenName, EmailAddress, IsActive, CreatedBy, CreatedDate, ModifiedBy, ModifiedDate FROM Customers WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate ASC"
    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "modifieddate" # lowercase_column_names (below) lowercases column names
    tracking_column_type => "timestamp"
    clean_run => false
    record_last_run => true
    lowercase_column_names => true
  }
}

filter {
  mutate {
    remove_field => ["@version", "path", "host", "message", "tags", "@timestamp"]
  }
}

output {
  stdout { codec => "dots" }
  elasticsearch {
    hosts => [ "elasticcluster:9200" ]
    index => "customers-%{+yyyy.MM.dd}"
    user => "elastic"
    password => "mypassword"
    document_id => "%{id}"
  }
}

This showcases some useful JDBC input plugin features:

  • tracking_column: Logstash uses this field in the result set to keep track of the last ingested record. Only tuples more recent than the last will be ingested.
  • statement: two things to note, (1) the ORDER BY clause, which is essential because the tracking_column keeps a journal of the latest ModifiedDate it comes across, and only records with a later ModifiedDate are ingested; and (2) the :sql_last_value bind variable, which restricts the query to records that have changed in the source database since Logstash last ingested.
  • record_last_run: persists the latest ModifiedDate discovered by Logstash so far. By default it is stored in the $HOME of the account running Logstash, in a small file called .logstash_jdbc_last_run, which tracks the value of the last tracking_column field (see the sketch after this list for relocating it).
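
If the default location of that state file isn't suitable, the jdbc input's last_run_metadata_path option can point it elsewhere. A minimal sketch (the path is an assumption):

input {
  jdbc {
    # connection, statement and tracking settings as above
    last_run_metadata_path => "/var/lib/logstash/.customers_jdbc_last_run"
  }
}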

Mapping flat source to hierarchical JSON structure

Look to the mutate filter.

mutate {
  rename => {
    "deviceip" => "[IP][device]"
    "srcip" => "[IP][source]"
    "dstip" => "[IP][destination]"
  }
}
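
With the renames above, an event arriving with flat deviceip, srcip and dstip fields would (illustratively, addresses made up) come out nested:

{
  "IP" => {
    "device" => "10.1.1.1",
    "source" => "192.168.1.10",
    "destination" => "172.16.0.5"
  }
}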

Resources