Using OpenSearch to monitor client's state in real time

Initial challenge: we want to monitor a client's state in real time. This can be solved with the OpenSearch stack.

We have data in CSV format that we want to analyse. The architecture could look like this:


For example, our application continuously writes data to a CSV file. We use Logstash to stream this data to our OpenSearch nodes, and later use OpenSearch Dashboards to visualize and analyse it. Since Logstash is designed to stream data continuously, it gives us insight into the client's state in real time.
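To make the producer side concrete, here is a minimal sketch of an application appending rows to a CSV file. It assumes the semicolon separator used later in the pipeline configuration. The function name append_rows and the three-column row are hypothetical and only serve as an illustration. The sketch writes to an in-memory buffer so it can run anywhere; each row ends with a newline and is flushed immediately, since a line-based reader such as the Logstash file input only picks up complete lines.

```python
import csv
import io


def append_rows(stream, rows):
    """Write rows as semicolon-separated lines and flush so readers see them."""
    writer = csv.writer(stream, delimiter=";", lineterminator="\n")
    for row in rows:
        writer.writerow(row)
    # Flush after every batch so a tailing reader is not left waiting on a buffer
    stream.flush()


# In-memory stand-in for the CSV file; a real producer would use
# open("results.csv", "a", newline="") instead (assumption, not from the article).
buffer = io.StringIO()
append_rows(buffer, [["D507618", 3, 645]])  # hypothetical three-column row
print(buffer.getvalue(), end="")
```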

What is OpenSearch?

OpenSearch is open-source software designed for search, observability, analytics and visualisation. It allows us to perform full-text search on our data, search by fields, sort and aggregate. We can then visualize our data with OpenSearch Dashboards, an open-source tool for real-time application data visualisation and monitoring. With the help of various graphs and charts we can observe trends and patterns in our data and act accordingly.

What is Logstash?

Logstash is an open-source data collection engine with real-time pipelining capabilities. With Logstash we can ingest data from a variety of sources into our preferred destination. It is also possible to transform data in transit to prepare it for later visualisation.

These technologies allow us to monitor client state in real time based on our data.


OpenSearch is a distributed system: we interact with clusters, which consist of nodes. Each node is a server that stores our data and processes our requests. For our demo we are going to create a setup with three nodes: two for OpenSearch itself and one for OpenSearch Dashboards.

To deploy our demo cluster we are going to use the sample docker-compose file provided in the OpenSearch documentation, with a few modifications. We add a Logstash container to the docker-compose file, which will be responsible for data ingestion.

version: '3'
services:
  opensearch-node1: # This is also the hostname of the container within the Docker network (i.e. https://opensearch-node1/)
    image: opensearchproject/opensearch:latest
    container_name: opensearch-node1
    environment:
      - # Name the cluster
      - # Name the node that will run in this container
      - discovery.seed_hosts=opensearch-node1,opensearch-node2 # Nodes to look for when discovering the cluster
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 # Nodes eligible to serve as cluster manager
      - bootstrap.memory_lock=true # Disable JVM heap memory swapping
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # Set min and max JVM heap sizes to at least 50% of system RAM
    ulimits:
      memlock:
        soft: -1 # Set memlock to unlimited (no soft or hard limit)
        hard: -1
      nofile:
        soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data # Creates volume called opensearch-data1 and mounts it to the container
    ports:
      - 9200:9200 # REST API
      - 9600:9600 # Performance Analyzer
    networks:
      - opensearch-net # All of the containers will join the same Docker bridge network
  opensearch-node2:
    image: opensearchproject/opensearch:latest # This should be the same image used for opensearch-node1 to avoid issues
    container_name: opensearch-node2
    environment:
      -
      -
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data2:/usr/share/opensearch/data
    networks:
      - opensearch-net
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:latest # Make sure the version of opensearch-dashboards matches the version of opensearch installed on other nodes
    container_name: opensearch-dashboards
    ports:
      - 5601:5601 # Map host port 5601 to container port 5601
    expose:
      - "5601" # Expose port 5601 for web access to OpenSearch Dashboards
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]' # Define the OpenSearch nodes that OpenSearch Dashboards will query
    volumes:
      - ./opensearch_dashboards.yml:/usr/share/opensearch-dashboards/config/opensearch_dashboards.yml
    networks:
      - opensearch-net
  logstash:
    image: opensearchproject/logstash-oss-with-opensearch-output-plugin:latest
    container_name: logstash
    volumes:
      - ~/Downloads/opensearch-playground/logstash:/usr/share/logstash/pipeline
      - ~/Downloads/opensearch-playground/data:/usr/share/cussas-data
      - ~/Downloads/opensearch-playground/logstash.yml:/usr/share/logstash/config/logstash.yml
    networks:
      - opensearch-net # We deploy it to the same network as our OpenSearch nodes

volumes:
  opensearch-data1:
  opensearch-data2:

networks:
  opensearch-net:

We have added Logstash to it and deployed it to the same network. We also need to mount some volumes to the Logstash container:

  1. logstash.conf - here we will describe the configuration of our pipeline
    input {
        file {
            path => "/usr/share/cussas-data/results.csv"
            start_position => "beginning"
            # "NUL" is the Windows null device; in a Linux container use "/dev/null"
            # if the read position should not be persisted between runs
            sincedb_path => "NUL"
        }
    }
    filter {
        csv {
            columns => [
                "Action",
                "Action_CLV",
                "CLV_Segment",
                "CLV",
                "Cohort",
                "Period",
                "Client ID",
                "First order date",
                "Last order date",
                "Transaction count",
                "Client total sum",
                "Average order sum",
                "Dormant time, days",
                "Lifetime,  days",
                "Interaction time,  days",
                "RFM",
                "ABC-group",
                "Order frequency",
                "Expected order time, days",
                "Expected order time, months",
                "Lifetime,  months",
                "Interaction time,  months",
                "Time between 1 and 2 orders, days",
                "Dormant time,  months",
                "Lifecycle segment",
                "Previous lifecycle segment",
                "RFM-segment",
                "Date"
            ]
            separator => ";"
        }
        mutate {
            # Drop the fields Logstash adds that we do not need
            remove_field => [ "message", "path" ]
            # CSV values arrive as strings; convert the numeric ones
            convert => {
                "CLV" => "float"
                "Cohort" => "integer"
                "Transaction count" => "integer"
                "Client total sum" => "integer"
                "Average order sum" => "float"
                "Dormant time, days" => "integer"
                "Lifetime,  days" => "integer"
                "Interaction time,  days" => "integer"
                "RFM" => "integer"
                "Order frequency" => "float"
                "Expected order time, days" => "float"
                "Expected order time, months" => "float"
                "Lifetime,  months" => "integer"
                "Interaction time,  months" => "integer"
                "Time between 1 and 2 orders, days" => "integer"
                "Dormant time,  months" => "integer"
            }
        }
        date {
            match => ["Period", "MM/dd/YYYY"]
            target => "Period"
        }
        date {
            match => ["Date", "MM/dd/YYYY"]
            target => "Date"
        }
        date {
            match => ["First order date", "MM/dd/YYYY"]
            target => "First order date"
        }
        date {
            match => ["Last order date", "MM/dd/YYYY"]
            target => "Last order date"
        }
    }
    output {
        # Print every event to the container log for debugging
        stdout {
            codec => rubydebug
        }
        opensearch {
            action => "index"
            hosts => ["https://opensearch-node1:9200", "https://opensearch-node2:9200"]
            index => "cussas-data"
            user => "admin"
            password => "admin"
            ssl => true
            # Demo only: skips verification of the TLS certificate
            ssl_certificate_verification => false
        }
    }

    logstash.conf is divided into three sections: input, filter and output.

    Input

    In this section we define the input source for our pipeline. In our case we specify that we would like to use a file as the source. We provide the absolute path to the file, the starting position, and the location where Logstash stores its record of the last line read.

    Filter

    Here we use the csv plugin and explicitly list all the columns and the separator for our data. Next, with the mutate plugin, we remove extra fields added by Logstash, since we do not need them in our case. By default all columns are treated as strings, so if we want to use some of them for aggregation, for example, we need to convert them to the proper data types. We do this with convert. Lastly, we parse the fields that contain dates using the date plugin with the appropriate format.

    Output

    In the output section we specify our destination, in our case OpenSearch. The stdout section is for debugging.

  2. logstash.yml - here we disable ecs_compatibility to get rid of some redundant fields.
    ""
    pipeline.ecs_compatibility: disabled

    Since Logstash 8.0, ECS compatibility is enabled by default. We disable it here to remove the extra field that duplicates the original message sent to OpenSearch.

  3. results.csv - the CSV file with the data we want to use
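To illustrate what the filter section of logstash.conf does to a single line, here is a minimal Python sketch of the same three steps: split on the separator, convert numeric strings, and parse dates. It is not how Logstash implements these plugins. It uses a hypothetical four-column subset of the real columns, and the function name parse_line is made up for this example. Empty values are kept as None, which is an assumption modelled on the null values that appear in the indexed documents.

```python
import csv
from datetime import datetime

# Hypothetical subset of the real column list, for illustration only
COLUMNS = ["Client ID", "Transaction count", "Average order sum", "First order date"]
CONVERSIONS = {"Transaction count": int, "Average order sum": float}
DATE_FIELDS = ["First order date"]


def parse_line(line):
    """Turn one semicolon-separated line into a typed event dictionary."""
    # csv step: split the line and name the columns
    values = next(csv.reader([line], delimiter=";"))
    event = dict(zip(COLUMNS, values))
    # mutate/convert step: strings become numbers, empty values become None
    for field, cast in CONVERSIONS.items():
        event[field] = cast(event[field]) if event[field] else None
    # date step: MM/dd/yyyy strings become datetime objects
    for field in DATE_FIELDS:
        if event[field]:
            event[field] = datetime.strptime(event[field], "%m/%d/%Y")
    return event


event = parse_line("D507618;3;215.07;08/15/2018")
print(event)
```

Without the conversion step every value would stay a string, and numeric aggregations such as an average would not be available on those fields.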

Now we can use Docker Compose to start our cluster:

docker-compose -f docker-compose.yml up

Working with OpenSearch console

Everything should be up and running now. First, let's verify that our data has been ingested into OpenSearch.

Open http://localhost:5601/ in a browser and log in.

In the console, execute the following request to check that our index has been created:

	GET _cat/indices

In the response we should see the index with the name we specified previously. The response could look like this:

green open security-auditlog-2022.12.15    4xhB1Et6RN6dA9IiJKPGlg 1 1    30 0 366.2kb 174.9kb
green open security-auditlog-2022.12.23    _d_9m8jGTcam9iC2UWRhDQ 1 1   894 0   2.7mb   1.4mb
green open cussas-data                     6ll_TX8xTLWz6t1Q1LnH8g 1 1 20063 0  12.5mb   6.3mb
green open security-auditlog-2022.12.14    7Ku2-bj3TWWVb3d7fmvmSg 1 1   895 0   2.8mb   1.4mb
green open security-auditlog-2022.12.24    kczt0RDmSD-rerojNkeTaQ 1 1   462 0   1.4mb 735.6kb
green open security-auditlog-2022.12.30    5yEJ4Hw8SxyfcaqNLw4xww 1 1    12 0 200.6kb 166.5kb
green open security-auditlog-2022.12.21    CRIdhzm_TnaMHKOnc5X5zw 1 1    68 0 245.6kb 100.8kb

Execute a simple search request:

GET cussas-data/_search
{
    "query": {
        "match": {
            "Lifetime,  days": 20
        }
    }
}

Here we are searching for documents where the field "Lifetime,  days" is equal to 20. We can see the ingested data in the response:

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 18,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "cussas-data",
        "_id" : "XIaSU4UB-xSWDumAfiFp",
        "_score" : 1.0,
        "_source" : {
          "CLV_Segment" : null,
          "Interaction time,  days" : 1204,
          "Cohort" : 223,
          "Order frequency" : 4.5,
          "Dormant time,  months" : 38,
          "Previous lifecycle segment" : "churn ",
          "RFM-segment" : null,
          "Dormant time, days" : 1184,
          "Expected order time, days" : 10.0,
          "Interaction time,  months" : 39,
          "host" : "f97451f2daab",
          "RFM" : null,
          "Period" : "2021-12-01T00:00:00.000Z",
          "Lifetime,  days" : 20,
          "Transaction count" : 3,
          "Client total sum" : 645,
          "Last order date" : "2018-09-04T00:00:00.000Z",
          "Action" : null,
          "Average order sum" : 215.073333333333,
          "Action_CLV" : null,
          "Date" : "2021-12-01T00:00:00.000Z",
          "ABC-group" : "A",
          "Expected order time, months" : 0.0,
          "First order date" : "2018-08-15T00:00:00.000Z",
          "Time between 1 and 2 orders, days" : 16,
          "@version" : "1",
          "Lifecycle segment" : "churn",
          "@timestamp" : "2022-12-27T12:33:05.679116927Z",
          "Client ID" : "D507618",
          "Lifetime,  months" : 0,
          "CLV" : null
        }
      },
      .......
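The same search can also be prepared from a script instead of the Dashboards console. The sketch below only builds the HTTP request with the Python standard library and does not send it. It assumes the REST API is reachable on https://localhost:9200 (the port published in the docker-compose file) and uses the demo credentials from logstash.conf. The function name build_search_request is hypothetical.

```python
import base64
import json
import urllib.request


def build_search_request(host, index, field, value, user, password):
    """Build (but do not send) a match query against the _search endpoint."""
    body = {"query": {"match": {field: value}}}
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{host}/{index}/_search",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",  # _search accepts a request body via POST as well as GET
    )


# Assumed host and demo credentials; adjust to your own setup
request = build_search_request(
    "https://localhost:9200", "cussas-data", "Lifetime,  days", 20, "admin", "admin"
)
print(request.full_url)
print(request.data.decode("utf-8"))
```

To actually send it, pass the request to urllib.request.urlopen. With the demo's self-signed certificate this would additionally need an SSL context that skips verification, which is only acceptable in a local playground.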


To visualize the data, we first need to create an index pattern for our index. An index pattern selects indexes by name, which lets us visualize data from several indexes at once. For example, if data from different years is stored in separate indexes with a year suffix (index_name_2020, index_name_2021, etc.), the index pattern index_name* matches all of them. Then we can proceed with the desired visualisation.
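The wildcard matching behind an index pattern can be sketched in a few lines of Python. This is only an approximation using the standard fnmatch module, which also understands ? and [] in addition to *. The function name matching_indices and the list of index names are illustrative.

```python
from fnmatch import fnmatchcase


def matching_indices(pattern, indices):
    """Return the index names that a wildcard pattern would select."""
    return [name for name in indices if fnmatchcase(name, pattern)]


# Illustrative index names: two yearly indexes plus the demo index
indices = ["index_name_2020", "index_name_2021", "cussas-data"]
matched = matching_indices("index_name*", indices)
print(matched)
```

A pattern without a wildcard, such as cussas-data, selects only the index with exactly that name.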


Average dormant time

Multiple orders

Customer by group

First orders by month

Active customers area

