In this post I will show how to query data from the Wazuh indexer using Python’s requests module, along with some analysis examples using the Pandas library.

If you are not familiar with Wazuh, it’s an open-source security platform. It has really great search and visualization capabilities, and usually this kind of direct querying of the indexer is not necessary, but there could be situations where it might still come in handy.

The Wazuh indexer is an open-source fork of OpenSearch, so you can pretty much check OpenSearch’s documentation for reference, and also Elasticsearch’s to some extent.

Querying and analyzing Windows logon events

Here’s an example of how to query Windows logon events from the Wazuh archives.

Wazuh archives are not enabled by default; you can find instructions to enable them here.

First, we will create a file creds.json that holds the credentials we use to authenticate with the Wazuh indexer. The content of the JSON file should look like this:

{"username": "<username>", "password": "<password>"}

Then we can import the needed modules and load creds.json.

import requests
import json
import pandas as pd
from requests.auth import HTTPBasicAuth
from datetime import datetime

with open('creds.json') as f:
    data = json.load(f)
    user = data['username']
    password = data['password']

Next, we define a query to search for Windows logon events (4624).

# maximum number of results to return
result_max = 1000
query = {"from" : 0, "size" : result_max,
         "query":
          {
            "match":{"data.win.system.eventID": "4624"}
          }
        }
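
If you need to limit the search to a specific time window, the match can be wrapped in a bool query together with a range filter. Below is a minimal sketch; it assumes the documents carry a timestamp field (Wazuh events normally do), so adjust the field name if your mapping differs.

# Sketch: same event ID match, but restricted to a time window.
# Assumes a "timestamp" field exists in the index; adjust if needed.
query_with_range = {"from": 0, "size": result_max,
                    "query":
                     {"bool":
                      {"must":
                       [
                           {"match": {"data.win.system.eventID": "4624"}},
                           {"range": {"timestamp": {"gte": "2024-02-01", "lte": "2024-02-29"}}}
                       ]
                      }
                     }
                    }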

Next, we define the target index or indices for the query. Note that wildcards are supported, as in the example. We also set the base address for the Wazuh indexer.

target_index = "wazuh-archives-4.x-2024.02.*"
# Wazuh indexer URI e.g. "https://my-server.local:9200"
wazuh_server = "https://127.0.0.1:9200"

The base structure for Wazuh’s archive indices is wazuh-archives-<version>.x-<date>, so in my above example, I’m targeting all the archive indices from February.
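
If you are unsure which indices actually exist, you can ask the indexer directly with the _cat/indices API. A quick sketch, reusing the credentials loaded above:

# List the archive indices matching the pattern to see what is available
idx_resp = requests.get(f"{wazuh_server}/_cat/indices/wazuh-archives-*?format=json",
                        verify=False, auth=HTTPBasicAuth(user, password))
print([idx['index'] for idx in idx_resp.json()])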

Next, we are ready to execute the actual query.

r = requests.post(f"{wazuh_server}/{target_index}/_search", json=query, verify=False, auth=HTTPBasicAuth(user,password))

Note that I have verify=False set as I’m doing the queries locally over the localhost address. You should use verified TLS connections for remote connections.
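
For remote connections, one option is to point requests at the indexer’s root CA certificate instead of disabling verification. A sketch of what that could look like (the certificate path below is just a placeholder):

# Sketch: verify TLS against the indexer's root CA instead of using verify=False
# "/path/to/root-ca.pem" is a hypothetical path, replace it with your own
requests.post(f"{wazuh_server}/{target_index}/_search", json=query,
              verify="/path/to/root-ca.pem", auth=HTTPBasicAuth(user, password))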

Verifying results

Let’s verify that we got some results. The actual result data is found under hits.hits[]._source in the response (with the event payload itself under _source.data). We can first check how many hits there were in total.

print(r.json()['hits']['total']['value'])
    318

We can also verify that we got the data for each hit. This could differ if we hadn’t defined from/size in our query (by default only 10 hits are returned), but with the limit set to 1000 we should get all the data.

print(len(r.json()['hits']['hits']))
    318
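
If the total ever exceeds result_max, a single request won’t return everything, and from/size paging is also capped by the indexer’s max_result_window setting (10 000 by default). A minimal paging sketch, assuming the total stays under that cap:

# Sketch: page through the results in chunks of result_max
all_hits = []
offset = 0
while True:
    paged_query = {**query, "from": offset}
    page = requests.post(f"{wazuh_server}/{target_index}/_search", json=paged_query,
                         verify=False, auth=HTTPBasicAuth(user, password)).json()
    hits = page['hits']['hits']
    all_hits.extend(hits)
    # Stop when a partial (or empty) page comes back
    if len(hits) < result_max:
        break
    offset += result_max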

Analyzing results

Now that we know some data was found, we can do some analysis. The code below loops over the results and prints out the time, username and computer of each event.

I have a break at the end just to see one example output; removing the break would print all the results.

for hit in r.json()['hits']['hits']:
    win_event_data = hit['_source']['data']['win']['eventdata']
    win_system_data = hit['_source']['data']['win']['system']
    print(win_system_data['systemTime'])
    print(win_event_data['targetUserName'])
    print(win_system_data['computer'])
    break
    2024-02-20T12:15:01.2249938Z
    SYSTEM
    EXAMPLE-MACHINE1

With Windows logons, the logon type is usually something you want to check, so let’s add that.

for hit in r.json()['hits']['hits']:
    win_event_data = hit['_source']['data']['win']['eventdata']
    win_system_data = hit['_source']['data']['win']['system']
    print(win_system_data['systemTime'])
    print(win_event_data['targetUserName'])
    print(win_event_data['logonType'])
    print(win_system_data['computer'])
    break
    2024-02-20T12:15:01.2249938Z
    SYSTEM
    5
    EXAMPLE-MACHINE1

We can create a quick mapper dictionary to enrich the logon type from a number to a string that tells more about the actual logon type.

logon_types = {
    "2":  "Interactive",
    "3": "Network",
    "4": "Batch",
    "5": "Service",
    "7": "Unlock",
    "8": "NetworkClearText",
    "9": "NewCredentials",
    "10": "RemoteInterActive",
    "11": "CachedInteractive"
}
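
Note that a plain dictionary lookup will raise a KeyError if an event contains a logon type that is missing from the mapper, so a defensive variant is to use .get with a default, for example:

# Unknown logon types fall back to "Unknown" instead of raising a KeyError
logon_types.get("0", "Unknown")   # -> 'Unknown'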

Now, let’s do another loop, but this time create a list of dicts with all the results. Each dict will have the following data for one event:

{"date": "<date>", "computer": "<hostname>", "username": "<target user>", "logontype": "<logon type>", "logontype_descr": "<logon type description>"}
logons = []
for hit in r.json()['hits']['hits']:
    
    win_event_data = hit['_source']['data']['win']['eventdata']
    win_system_data = hit['_source']['data']['win']['system']
    # Put logon type to own variable and then use that with the mapper
    win_logon_type = win_event_data['logonType']
    win_dt = datetime.strptime(win_system_data['systemTime'][:-4], '%Y-%m-%dT%H:%M:%S.%f')
    _logon = {
        "date": win_dt,
        "computer": win_system_data['computer'],
        "username": win_event_data['targetUserName'],
        "logontype": win_logon_type,
        "logontype_descr": logon_types[win_logon_type]
    }
    logons.append(_logon) 

Here’s the first event from the list as an example. I converted systemTime from a string to a datetime object, so it’s possible to filter the data based on time.

print(logons[0])
    {'date': datetime.datetime(2024, 2, 20, 12, 15, 1, 224900), 'computer': 'EXAMPLE-MACHINE1', 'username': 'SYSTEM', 'logontype': '5', 'logontype_descr': 'Service'}

Now we have the main information from the logon events neatly structured. We can put the data into a Pandas DataFrame for further analysis.

df = pd.DataFrame(logons)

Let’s check all the events where the logon type is not 5 (Service).

df[(df['logontype'] != '5')]
date computer username logontype logontype_descr
45 2024-02-08 16:26:23.383800 EXAMPLE-MACHINE2 exampleuser1 7 Unlock
62 2024-02-08 18:29:36.678200 EXAMPLE-MACHINE2 DWM-2 2 Interactive
76 2024-02-08 16:26:23.282300 EXAMPLE-MACHINE2 exampleuser1 11 CachedInteractive
94 2024-02-08 18:29:36.600500 EXAMPLE-MACHINE2 UMFD-2 2 Interactive
95 2024-02-08 18:29:36.678200 EXAMPLE-MACHINE2 DWM-2 2 Interactive
171 2024-02-24 15:00:11.339700 EXAMPLE-MACHINE1 exampleuser2 11 CachedInteractive
228 2024-02-24 15:00:11.399500 EXAMPLE-MACHINE1 exampleuser2 7 Unlock
290 2024-02-19 21:10:09.815100 EXAMPLE-MACHINE2 UMFD-2 2 Interactive
291 2024-02-19 09:20:40.668400 EXAMPLE-MACHINE1 exampleuser2 11 CachedInteractive
292 2024-02-19 09:20:40.720900 EXAMPLE-MACHINE1 exampleuser2 7 Unlock
312 2024-02-19 21:10:09.887700 EXAMPLE-MACHINE2 DWM-2 2 Interactive
313 2024-02-19 21:10:09.887700 EXAMPLE-MACHINE2 DWM-2 2 Interactive

We can check the total count for different logon types.

pd.DataFrame(df['logontype_descr'].value_counts())
count
logontype_descr
Service 306
Interactive 6
Unlock 3
CachedInteractive 3

It could be interesting to search for logon events between specific hours. To do this, I’ll first change the index of the data frame to the date column.

df = df.set_index('date')

Now we can use the between_time function to search between specific hours. Because the start time is later than the end time, the selection wraps around midnight, so we get events outside of 08:00–17:00.

df.between_time("17:00", "8:00")
computer username logontype logontype_descr
date
2024-02-08 17:01:07.405700 EXAMPLE-MACHINE2 SYSTEM 5 Service
2024-02-08 18:02:14.302700 EXAMPLE-MACHINE2 SYSTEM 5 Service
2024-02-08 18:02:24.544600 EXAMPLE-MACHINE2 SYSTEM 5 Service
2024-02-08 18:03:46.177100 EXAMPLE-MACHINE2 SYSTEM 5 Service
2024-02-08 18:04:26.899100 EXAMPLE-MACHINE2 SYSTEM 5 Service
... ... ... ... ...
2024-02-19 21:10:09.887700 EXAMPLE-MACHINE2 DWM-2 2 Interactive
2024-02-19 21:09:22.045200 EXAMPLE-MACHINE2 SYSTEM 5 Service
2024-02-19 21:10:00.808600 EXAMPLE-MACHINE2 SYSTEM 5 Service
2024-02-19 21:10:14.969700 EXAMPLE-MACHINE2 SYSTEM 5 Service
2024-02-19 21:10:14.837200 EXAMPLE-MACHINE2 SYSTEM 5 Service

Querying and analyzing vulnerability status

This example shows how to query vulnerability events from Wazuh alerts and then filter down the results to get a situational picture of the vulnerability status in your environment.

I have to mention that Wazuh has really nice pre-built dashboards for vulnerability status, so you mostly get this type of information out of the box. The only limitation is that the pre-built visualizations are per host (agent), so you don’t really get “big picture” dashboards without building one yourself.

I think this kind of requests + Pandas approach can currently allow better vulnerability status analysis for the whole environment instead of one host at a time. There’s also some deduplication needed for the events, which might be hard to do directly within the Wazuh indexer.

In this example, we are doing a similar query as before. The main difference is that we are now targeting the wazuh-alerts* indices and matching events where the field data.vulnerability.status is Active or Solved.

Below is the full code, which is mostly the same as in the previous example. It also uses the creds.json file for authentication.

import requests
import json
import pandas as pd
from requests.auth import HTTPBasicAuth
from datetime import datetime
from dateutil.relativedelta import relativedelta 

target_index = "wazuh-alerts-4.x-2024.02*"
wazuh_server = "https://127.0.0.1:9200"

with open('creds.json') as f:
    data = json.load(f)
    user = data['username']
    password = data['password']

# maximum number of results to return
result_max = 1000
query = {"from" : 0, "size" : result_max,
         "query":
         {"bool":
          {"should":
           [
               {"match":{"data.vulnerability.status": "Active"}},
               {"match":{"data.vulnerability.status": "Solved"}}
           ]
          }
         }
        }


r = requests.post(f"{wazuh_server}/{target_index}/_search", json=query, verify=False, auth=HTTPBasicAuth(user,password))

vulns = []
for hit in r.json()['hits']['hits']:
    vuln_data = hit['_source']['data']['vulnerability']
    vuln = {
        "timestamp": datetime.strptime(hit['_source']['@timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ'),
        "affected": hit['_source']['agent']['name'],
        "CVE": vuln_data['cve'],
        "package": vuln_data['package']['name'],
        "status": vuln_data['status']
    }
    vulns.append(vuln)
df = pd.DataFrame(vulns)
df = df.set_index('timestamp')

Now we have the results in a data frame that is indexed by the timestamp, and the results look like this.

affected CVE package status
timestamp
2024-02-24 05:04:30.904 testmachine1 CVE-2024-25629 libc-ares2 Active
2024-02-24 15:01:12.572 testmachine2 CVE-2024-21341 Windows 10 Active
2024-02-24 15:01:12.595 testmachine2 CVE-2024-21338 Windows 10 Active
2024-02-24 19:17:36.220 testmachine1 CVE-2023-42117 exim4-base Active
2024-02-24 19:17:37.211 testmachine1 CVE-2021-3981 grub-common Active
... ... ... ... ...
2024-02-13 18:01:53.392 testmachine1 CVE-2023-5679 bind9-dnsutils Active
2024-02-13 18:01:53.475 testmachine1 CVE-2023-5679 bind9-host Active
2024-02-13 18:01:58.236 testmachine1 CVE-2023-4408 bind9-dnsutils Active
2024-02-13 18:01:58.405 testmachine1 CVE-2023-4408 bind9-libs Active
2024-02-13 18:01:58.630 testmachine1 CVE-2023-50387 bind9-dnsutils Active

At this point we might have duplicate data that does not give a very clear picture of the environment’s vulnerability status. Wazuh will report the same vulnerability during every scan if it’s not fixed, so “duplicate” events may accumulate.

We can get a better situational picture if we order the events by timestamp and then remove duplicates based on the fields “CVE” and “affected”. After this we have the last events where the “CVE” and “affected” fields are unique, so effectively we get the latest status per vulnerability per host.

df = df.sort_values('timestamp').drop_duplicates(['CVE','affected'], keep='last')

Now we can further filter the results to see events where the vulnerability status is still “Active”.

df[(df['status'] == 'Active')]
affected CVE package status
timestamp
2024-02-16 18:23:06.593 testmachine1 CVE-2024-22667 xxd Active
2024-02-24 05:04:30.904 testmachine1 CVE-2024-25629 libc-ares2 Active

You can also check statistics for solved events within a specific period. Here’s an example to get results for the last month without hard-coded dates.

now = datetime.now()
now_date = now.strftime('%Y-%m-%d')
last_month_date = (now + relativedelta(months=-1)).strftime('%Y-%m-%d')
solved = df[(df['status'] == 'Solved')].loc[last_month_date:now_date]['CVE'].count()
print(f"{solved} vulnerabilities were resolved during the last month.")
    234 vulnerabilities were resolved during the last month.
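
Since the data frame is indexed by timestamp, pandas can also give a quick trend over time, for example the number of solved vulnerability events per week. A small sketch:

# Weekly counts of solved vulnerability events
df[df['status'] == 'Solved'].resample('W')['CVE'].count()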

You can also check the number of “Active” vulnerabilities per host, for hosts that have at least one “Active” vulnerability.

pd.DataFrame(df[(df['status'] == 'Active')]['affected'].value_counts())
count
affected
testmachine1 2
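
For a quick environment-wide view, you can also cross-tabulate hosts against statuses, which gives the kind of “big picture” table mentioned earlier:

# Hosts vs. vulnerability status counts across the whole environment
pd.crosstab(df['affected'], df['status'])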

Querying file integrity monitoring data and analyzing it with the VirusTotal API

This example shows how to query the VirusTotal API with the hashes of new files added to machines monitored by Wazuh’s file integrity monitoring module.

Wazuh has a VirusTotal integration available, but with a free API key its capabilities are quite limited (4 requests per minute).

We will first do a similar query as before and construct a data frame from the results. This query targets added files from syscheck events.

Here creds.json should look like this:

{"username": "<username>", "password": "<password>", "vtapikey": "<Virustotal API key>"}
import requests
import json
import time
import pandas as pd
from requests.auth import HTTPBasicAuth
from datetime import datetime
from dateutil.relativedelta import relativedelta 

target_index = "wazuh-alerts-4.x-2024.02*"
wazuh_server = "https://127.0.0.1:9200"

with open('creds.json') as f:
    data = json.load(f)
    user = data['username']
    password = data['password']
    vtapikey = data['vtapikey']

# maximum number of results to return
result_max = 1000
query = {"from" : 0, "size" : result_max,
         "query":
         {"bool":
          {"must":
           [
               {"match":{"rule.groups": "syscheck_file"}},
               {"match": {"syscheck.event": "added"}}
           ]
          }
         }
        }


r = requests.post(f"{wazuh_server}/{target_index}/_search", json=query, verify=False, auth=HTTPBasicAuth(user,password))

file_added_events = []
for hit in r.json()['hits']['hits']:
    fim_data = hit['_source']['syscheck']
    _base_dict = {
        "agent": hit['_source']['agent']['name'],
        "timestamp": datetime.strptime(hit['_source']['@timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
    }
    _file_event = _base_dict | fim_data
    file_added_events.append(_file_event)
df = pd.DataFrame(file_added_events)
df = df.set_index('timestamp')

Now we have the data in a data frame that is indexed by the timestamp, and the results look like this.

df
agent uname_after mtime_after size_after gid_after mode path sha1_after gname_after uid_after perm_after event md5_after sha256_after inode_after
timestamp
2024-02-24 21:30:12.739 testmachine1 root 2024-01-31T23:14:09 7039552 0 scheduled /boot/vmlinuz-5.10.0-28-amd64 f8f4abe111a413b87d3c7926dbe86ed3b4fcb403 root 0 rw-r--r-- added 297b6a92dec82d8c36072a246f2ce611 99d46c742f33117b1a30fb487005f1cb319c4f16a7ad0b... 5519275
2024-02-24 21:30:17.354 testmachine1 root 2024-01-31T23:14:09 236364 0 scheduled /boot/config-5.10.0-28-amd64 b29a61e9d5aded5a2e2560057939b14dd10d94b7 root 0 rw-r--r-- added 9695055abf02a617e8cd06c9ef4dad78 7a5d1191b15eb1f4b0e8a62149e3ec710273b35af7d8fe... 5519237
2024-02-24 21:30:17.355 testmachine1 root 2024-01-31T23:14:09 83 0 scheduled /boot/System.map-5.10.0-28-amd64 5c06ff9db57a64ec70c6a8b71a0cd23c4de6b5f4 root 0 rw-r--r-- added e0a6c09212a19ed48183a052eab847e7 f47b4c00d87f4b5378487972ded9d447074d4771bea98a... 5519225
2024-02-24 21:30:12.427 testmachine1 root 2024-02-24T20:57:47 30051389 0 scheduled /boot/initrd.img-5.10.0-28-amd64 a561f6c201b6359d3a6d90a9c0c23523d30b3925 root 0 rw-r--r-- added 33cb59a129eeb2916791ead278d60ea0 4fef4d48f2096d8e7f98043d411e3f5a2fd816fb1933e6... 5520765

Below is a function that will parse the results from the VT API. It creates a dictionary like this per file:

{
    # Set to "yes" if any of the engines has flagged the checksum
    "malicious": "<yes/no>",
    # Set to yes if checksum was found in VT (response not 404)
    "found_in_vt": "<yes/no>", 
    # Engine that flagged the checksum. One key per engine.
    "<engine>": "<category>"
}

def vt_result_analyze(res, status_code):
    results = {"malicious": "no", "found_in_vt": "yes"}
    if status_code == 404:
        results['found_in_vt'] = "no"
        return results
    analysis_results = res['data']['attributes']['last_analysis_results']
    for engine in analysis_results:
        _category = analysis_results[engine]['category']
        if _category not in ["undetected", "type-unsupported"]:
            results['malicious'] = "yes"
            results[engine] = _category
    return results

Here we will loop over the values in the sha256_after column and call the https://www.virustotal.com/api/v3/files/<checksum> endpoint for each hash.

The loop will create a dict where the result of the vt_result_analyze function is combined with a dict {"sha256_after": "<hash>"}, and those are appended to a list.

We are using the same column name sha256_after, so we can later combine the original data with the results from the VT API.

The code will sleep for a minute after every four API requests because of the VT API’s rate limit with a free API key.

headers = {"x-apikey": vtapikey}
vt_results = []
req_n = 1
for hash in df['sha256_after']:
    if req_n > 4:
        print("Waiting for 60 seconds...")
        time.sleep(60)
        req_n = 1
    r = requests.get(f"https://www.virustotal.com/api/v3/files/{hash}", headers=headers)
    if r.status_code not in [200,404]:
        print(r.text, str(r.status_code))
        break
    vt_res = vt_result_analyze(r.json(), r.status_code)
    # Add a column with the same name as in df so we can merge based on this column
    vt_results.append({"sha256_after": hash}|vt_res)
    req_n += 1

Now we have all the results from VT in the vt_results variable.

Next, we will turn that into a data frame and combine it with the original data frame based on the sha256_after column.

vtdf = pd.DataFrame(vt_results)
dfinal = df.merge(vtdf, on='sha256_after')

Now we have the original file integrity data combined with the VirusTotal data in the dfinal variable.

dfinal
agent uname_after mtime_after size_after gid_after mode path sha1_after gname_after uid_after perm_after event md5_after sha256_after inode_after malicious found_in_vt Trapmine
0 testmachine1 root 2024-01-31T23:14:09 7039552 0 scheduled /boot/vmlinuz-5.10.0-28-amd64 f8f4abe111a413b87d3c7926dbe86ed3b4fcb403 root 0 rw-r--r-- added 297b6a92dec82d8c36072a246f2ce611 99d46c742f33117b1a30fb487005f1cb319c4f16a7ad0b... 5519275 yes yes malicious
1 testmachine1 root 2024-01-31T23:14:09 236364 0 scheduled /boot/config-5.10.0-28-amd64 b29a61e9d5aded5a2e2560057939b14dd10d94b7 root 0 rw-r--r-- added 9695055abf02a617e8cd06c9ef4dad78 7a5d1191b15eb1f4b0e8a62149e3ec710273b35af7d8fe... 5519237 no no NaN
2 testmachine1 root 2024-01-31T23:14:09 83 0 scheduled /boot/System.map-5.10.0-28-amd64 5c06ff9db57a64ec70c6a8b71a0cd23c4de6b5f4 root 0 rw-r--r-- added e0a6c09212a19ed48183a052eab847e7 f47b4c00d87f4b5378487972ded9d447074d4771bea98a... 5519225 no yes NaN
3 testmachine1 root 2024-02-24T20:57:47 30051389 0 scheduled /boot/initrd.img-5.10.0-28-amd64 a561f6c201b6359d3a6d90a9c0c23523d30b3925 root 0 rw-r--r-- added 33cb59a129eeb2916791ead278d60ea0 4fef4d48f2096d8e7f98043d411e3f5a2fd816fb1933e6... 5520765 no no NaN
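
To pick out only the files that at least one engine flagged, we can filter on the malicious column:

# Show only the files that were flagged by at least one engine
dfinal[dfinal['malicious'] == 'yes'][['agent', 'path', 'sha256_after']]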

Funnily enough, the Trapmine engine flagged the new Linux kernel as malicious. I checked the details from VirusTotal and this was based on Malicious.high.ml.score, so some trustworthy ML analysis. :)