Query and analyze Wazuh indexer data with Python
In this post I will show how to query data from the Wazuh indexer using Python’s requests module, along with some analysis examples using the Pandas library.
If you are not familiar with Wazuh, it’s an open source security platform. It has great built-in search and visualization capabilities, so this kind of direct querying of the indexer is usually not necessary, but there are situations where it can still be handy.
Wazuh indexer is an open source fork of OpenSearch, so you can largely use OpenSearch’s documentation for reference, and Elasticsearch’s to some extent.
Querying and analyzing Windows logon events
Here’s an example of how to query Windows logon events from the Wazuh archives.
Wazuh archives are not enabled by default; you can find instructions to enable them here.
First, we will create a file creds.json that holds the credentials we use to authenticate with the Wazuh indexer. The content of the JSON file should look like this:
{"username": "<username>", "password": "<password>"}
Then we can import the needed modules and load creds.json.
import requests
import json
import pandas as pd
from requests.auth import HTTPBasicAuth
from datetime import datetime
with open('creds.json') as f:
    data = json.load(f)

user = data['username']
password = data['password']
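As an optional sanity check, you can verify that the credentials work by sending a plain GET request to the indexer root endpoint, which returns basic cluster information (the address below is only an example; the actual base address is defined a bit later).

# Optional sanity check: the root endpoint returns basic cluster information.
# The address is only an example; adjust it to your own indexer.
check = requests.get("https://127.0.0.1:9200", auth=HTTPBasicAuth(user, password), verify=False)
print(check.status_code)  # 200 means authentication succeeded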
Next, we define a query to search for Windows logon events (4624).
# maximum number of results to return
result_max = 1000

query = {
    "from": 0, "size": result_max,
    "query": {
        "match": {"data.win.system.eventID": "4624"}
    }
}
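If you want to narrow the search further, the same match clause can be wrapped in a bool query with a range filter on the timestamp field. Below is a sketch that limits the search to the last seven days; it assumes the documents carry a date-typed timestamp field, as Wazuh indices normally do.

# Sketch: the same event ID match, limited to the last seven days with a range filter.
# Assumes a date-typed "timestamp" field in the documents.
query_last_week = {
    "from": 0, "size": result_max,
    "query": {
        "bool": {
            "must": [{"match": {"data.win.system.eventID": "4624"}}],
            "filter": [{"range": {"timestamp": {"gte": "now-7d"}}}]
        }
    }
}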
Next, we define the target index or indices for the query. Note that wildcards are supported, as in the example below. We also set the base address for the Wazuh indexer.
target_index = "wazuh-archives-4.x-2024.02.*"
# Wazuh indexer URI e.g. "https://my-server.local:9200"
wazuh_server = "https://127.0.0.1:9200"
The naming structure for Wazuh’s archive indices is wazuh-archives-<version>.x-<date>, so in the above example I’m targeting all the archive indices from February.
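If you always want the current month, the index pattern can also be built dynamically instead of hard-coding the date. A small sketch:

# Sketch: build the archive index pattern for the current month from today's date.
target_index = f"wazuh-archives-4.x-{datetime.now():%Y.%m}.*"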
Next, we are ready to execute the actual query.
r = requests.post(f"{wazuh_server}/{target_index}/_search", json=query, verify=False, auth=HTTPBasicAuth(user,password))
Note that I have verify=False set because I’m running the queries locally over the localhost address. You should use verified TLS for remote connections.
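For remote connections, a minimal sketch is to point verify at the CA certificate that signed the indexer’s TLS certificate (the path below is just an example):

# Sketch: verified TLS for remote queries. The CA path is an example, not a fixed location.
ca_bundle = "/etc/ssl/certs/wazuh-indexer-root-ca.pem"
r = requests.post(f"{wazuh_server}/{target_index}/_search", json=query,
                  verify=ca_bundle, auth=HTTPBasicAuth(user, password))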
Verifying results
Let’s verify that we got some results. The actual result data is found in the response under the JSON path hits.hits[]._source. We can first check how many hits there were in total.
print(r.json()['hits']['total']['value'])
318
We can also verify that we have the data for each hit. This could differ if we hadn’t defined from/size in our query, but with the limit set to 1000 we should get all the data.
print(len(r.json()['hits']['hits']))
318
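If the total number of hits were larger than result_max, one option is to page through the results with from/size. Here is a minimal sketch; note that OpenSearch caps from + size at 10,000 by default, so for very large result sets you would switch to the scroll or point-in-time APIs.

# Sketch: simple from/size pagination when there are more hits than result_max.
all_hits = []
offset = 0
while True:
    page_query = dict(query, **{"from": offset, "size": result_max})
    page = requests.post(f"{wazuh_server}/{target_index}/_search", json=page_query,
                         verify=False, auth=HTTPBasicAuth(user, password)).json()
    hits = page['hits']['hits']
    all_hits.extend(hits)
    if len(hits) < result_max:
        break
    offset += result_max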
Analyzing results
Now that we know some data was found, we can do some analysis. The code below loops over the results and prints the time, username, and computer of each event.
I have a break at the end just to show one example output; removing the break would print all the results.
for hit in r.json()['hits']['hits']:
    win_event_data = hit['_source']['data']['win']['eventdata']
    win_system_data = hit['_source']['data']['win']['system']
    print(win_system_data['systemTime'])
    print(win_event_data['targetUserName'])
    print(win_system_data['computer'])
    break
2024-02-20T12:15:01.2249938Z
SYSTEM
EXAMPLE-MACHINE1
With Windows logons, the logon type is usually something you want to check, so let’s add that.
for hit in r.json()['hits']['hits']:
    win_event_data = hit['_source']['data']['win']['eventdata']
    win_system_data = hit['_source']['data']['win']['system']
    print(win_system_data['systemTime'])
    print(win_event_data['targetUserName'])
    print(win_event_data['logonType'])
    print(win_system_data['computer'])
    break
2024-02-20T12:15:01.2249938Z
SYSTEM
5
EXAMPLE-MACHINE1
We can create a quick mapper dictionary to enrich the logon type from a number to a string that describes the actual logon type.
logon_types = {
    "2": "Interactive",
    "3": "Network",
    "4": "Batch",
    "5": "Service",
    "7": "Unlock",
    "8": "NetworkCleartext",
    "9": "NewCredentials",
    "10": "RemoteInteractive",
    "11": "CachedInteractive"
}
Now, let’s do another loop, but create a list of dicts with all the results. Each dict will contain the following data for one event:
{"date": "<date>", "computer": "<hostname>", "username": "<target user>", "logontype": "<logon type>", "logontype_descr": "<logon type description>"}
logons = []

for hit in r.json()['hits']['hits']:
    win_event_data = hit['_source']['data']['win']['eventdata']
    win_system_data = hit['_source']['data']['win']['system']
    # Put the logon type into its own variable and then use that with the mapper
    win_logon_type = win_event_data['logonType']
    win_dt = datetime.strptime(win_system_data['systemTime'][:-4], '%Y-%m-%dT%H:%M:%S.%f')
    _logon = {
        "date": win_dt,
        "computer": win_system_data['computer'],
        "username": win_event_data['targetUserName'],
        "logontype": win_logon_type,
        "logontype_descr": logon_types[win_logon_type]
    }
    logons.append(_logon)
Here’s the first event from the list as an example. I converted systemTime from a string to a datetime object, so it’s possible to filter the data based on time.
print(logons[0])
{'date': datetime.datetime(2024, 2, 20, 12, 15, 1, 224900), 'computer': 'EXAMPLE-MACHINE1', 'username': 'SYSTEM', 'logontype': '5', 'logontype_descr': 'Service'}
Now we have the main information from the logon events neatly structured. We can put the data into a Pandas data frame for further analysis.
df = pd.DataFrame(logons)
Let’s check all the events where the logon type is not 5 (Service).
df[(df['logontype'] != '5')]
 | date | computer | username | logontype | logontype_descr |
---|---|---|---|---|---|
45 | 2024-02-08 16:26:23.383800 | EXAMPLE-MACHINE2 | exampleuser1 | 7 | Unlock |
62 | 2024-02-08 18:29:36.678200 | EXAMPLE-MACHINE2 | DWM-2 | 2 | Interactive |
76 | 2024-02-08 16:26:23.282300 | EXAMPLE-MACHINE2 | exampleuser1 | 11 | CachedInteractive |
94 | 2024-02-08 18:29:36.600500 | EXAMPLE-MACHINE2 | UMFD-2 | 2 | Interactive |
95 | 2024-02-08 18:29:36.678200 | EXAMPLE-MACHINE2 | DWM-2 | 2 | Interactive |
171 | 2024-02-24 15:00:11.339700 | EXAMPLE-MACHINE1 | exampleuser2 | 11 | CachedInteractive |
228 | 2024-02-24 15:00:11.399500 | EXAMPLE-MACHINE1 | exampleuser2 | 7 | Unlock |
290 | 2024-02-19 21:10:09.815100 | EXAMPLE-MACHINE2 | UMFD-2 | 2 | Interactive |
291 | 2024-02-19 09:20:40.668400 | EXAMPLE-MACHINE1 | exampleuser2 | 11 | CachedInteractive |
292 | 2024-02-19 09:20:40.720900 | EXAMPLE-MACHINE1 | exampleuser2 | 7 | Unlock |
312 | 2024-02-19 21:10:09.887700 | EXAMPLE-MACHINE2 | DWM-2 | 2 | Interactive |
313 | 2024-02-19 21:10:09.887700 | EXAMPLE-MACHINE2 | DWM-2 | 2 | Interactive |
We can check the total count for different logon types.
pd.DataFrame(df['logontype_descr'].value_counts())
logontype_descr | count |
---|---|
Service | 306 |
Interactive | 6 |
Unlock | 3 |
CachedInteractive | 3 |
It could be interesting to search for logon events between specific hours. To do this, I’ll first change the index of the data frame to the date column.
df = df.set_index('date')
Now we can use the between_time function to search between specific hours.
df.between_time("17:00", "8:00")
date | computer | username | logontype | logontype_descr |
---|---|---|---|---|
2024-02-08 17:01:07.405700 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
2024-02-08 18:02:14.302700 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
2024-02-08 18:02:24.544600 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
2024-02-08 18:03:46.177100 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
2024-02-08 18:04:26.899100 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
... | ... | ... | ... | ... |
2024-02-19 21:10:09.887700 | EXAMPLE-MACHINE2 | DWM-2 | 2 | Interactive |
2024-02-19 21:09:22.045200 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
2024-02-19 21:10:00.808600 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
2024-02-19 21:10:14.969700 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
2024-02-19 21:10:14.837200 | EXAMPLE-MACHINE2 | SYSTEM | 5 | Service |
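between_time can of course be combined with the other columns. For example, here is a sketch that keeps only interactive-style logons outside office hours:

# Sketch: interactive-style logons outside office hours, ignoring Service logons.
after_hours = df.between_time("17:00", "8:00")
after_hours[after_hours['logontype_descr'].isin(['Interactive', 'RemoteInteractive', 'CachedInteractive'])]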
Querying and analyzing vulnerability status
This example shows how to query vulnerability events from Wazuh alerts and then filter down the results to get a situational picture of the vulnerability status in your environment.
I have to mention that Wazuh has really nice pre-built dashboards for vulnerability status, so you mostly get this type of information out of the box. The only limitation is that the pre-built visualizations are per host (agent), so you don’t really get “big picture” dashboards without building one yourself.
I think this kind of requests + Pandas approach currently allows better vulnerability status analysis for the whole environment instead of one host at a time. There’s also some deduplication needed for the events, which might be hard to do directly within the Wazuh indexer.
In this example, we are doing a similar query as before. The main difference is that we are now targeting the wazuh-alerts* indices and matching events where the field data.vulnerability.status is Active or Solved.
Below is the full code, which is mostly the same as in the previous example. It also uses the creds.json file for authentication.
import requests
import json
import pandas as pd
from requests.auth import HTTPBasicAuth
from datetime import datetime
from dateutil.relativedelta import relativedelta
target_index = "wazuh-alerts-4.x-2024.02*"
wazuh_server = "https://127.0.0.1:9200"
with open('creds.json') as f:
    data = json.load(f)

user = data['username']
password = data['password']

# maximum number of results to return
result_max = 1000

query = {
    "from": 0, "size": result_max,
    "query": {
        "bool": {
            "should": [
                {"match": {"data.vulnerability.status": "Active"}},
                {"match": {"data.vulnerability.status": "Solved"}}
            ]
        }
    }
}
r = requests.post(f"{wazuh_server}/{target_index}/_search", json=query, verify=False, auth=HTTPBasicAuth(user,password))
vulns = []

for hit in r.json()['hits']['hits']:
    vuln_data = hit['_source']['data']['vulnerability']
    vuln = {
        "timestamp": datetime.strptime(hit['_source']['@timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ'),
        "affected": hit['_source']['agent']['name'],
        "CVE": vuln_data['cve'],
        "package": vuln_data['package']['name'],
        "status": vuln_data['status']
    }
    vulns.append(vuln)
df = pd.DataFrame(vulns)
df = df.set_index('timestamp')
Now we have the results in a data frame that is indexed by the timestamp, and the results look like this.
timestamp | affected | CVE | package | status |
---|---|---|---|---|
2024-02-24 05:04:30.904 | testmachine1 | CVE-2024-25629 | libc-ares2 | Active |
2024-02-24 15:01:12.572 | testmachine2 | CVE-2024-21341 | Windows 10 | Active |
2024-02-24 15:01:12.595 | testmachine2 | CVE-2024-21338 | Windows 10 | Active |
2024-02-24 19:17:36.220 | testmachine1 | CVE-2023-42117 | exim4-base | Active |
2024-02-24 19:17:37.211 | testmachine1 | CVE-2021-3981 | grub-common | Active |
... | ... | ... | ... | ... |
2024-02-13 18:01:53.392 | testmachine1 | CVE-2023-5679 | bind9-dnsutils | Active |
2024-02-13 18:01:53.475 | testmachine1 | CVE-2023-5679 | bind9-host | Active |
2024-02-13 18:01:58.236 | testmachine1 | CVE-2023-4408 | bind9-dnsutils | Active |
2024-02-13 18:01:58.405 | testmachine1 | CVE-2023-4408 | bind9-libs | Active |
2024-02-13 18:01:58.630 | testmachine1 | CVE-2023-50387 | bind9-dnsutils | Active |
At this point we might have duplicate data that does not give a very clear picture of the environment’s vulnerability status. Wazuh will report the same vulnerability during every scan if it’s not fixed, so “duplicate” events may accumulate.
We can get a better situational picture if we order the events by timestamp and then remove duplicates based on the “CVE” and “affected” fields. After this we are left with the last event for each unique “CVE” and “affected” combination, so effectively we get the latest status per vulnerability per host.
df = df.sort_values('timestamp').drop_duplicates(['CVE','affected'], keep='last')
Now we can further filter the results to see events where the vulnerability status is still “Active”.
df[(df['status'] == 'Active')]
timestamp | affected | CVE | package | status |
---|---|---|---|---|
2024-02-16 18:23:06.593 | testmachine1 | CVE-2024-22667 | xxd | Active |
2024-02-24 05:04:30.904 | testmachine1 | CVE-2024-25629 | libc-ares2 | Active |
You can also check statistics for solved events within a specific period. Here’s an example that gets results for the last month without hard-coded dates.
now = datetime.now()
now_date = now.strftime('%Y-%m-%d')
last_month_date = (now + relativedelta(months=-1)).strftime('%Y-%m-%d')
solved = df[(df['status'] == 'Solved')].loc[last_month_date:now_date]['CVE'].count()
print(f"{solved} vulnerabilities were resolved during the last month.")
234 vulnerabilities were resolved during the last month.
You can also check the number of “Active” vulnerabilities per host, for hosts that have at least one “Active” vulnerability.
pd.DataFrame(df[(df['status'] == 'Active')]['affected'].value_counts())
affected | count |
---|---|
testmachine1 | 2 |
Query file integrity monitoring data and analyze it with the VirusTotal API
This example shows how to query the VirusTotal API for new files added to machines monitored by Wazuh’s file integrity monitoring module.
Wazuh has a VirusTotal integration available, but with a free API key its capabilities are quite limited (4 requests per minute).
We will first do a similar query as before and construct a data frame from the results. This query targets added files from syscheck events.
Here creds.json should look like this:
{"username": "<username>", "password": "<password>", "vtapikey": "<Virustotal API key>"}
import requests
import json
import time
import pandas as pd
from requests.auth import HTTPBasicAuth
from datetime import datetime
from dateutil.relativedelta import relativedelta
target_index = "wazuh-alerts-4.x-2024.02*"
wazuh_server = "https://127.0.0.1:9200"
with open('creds.json') as f:
    data = json.load(f)

user = data['username']
password = data['password']
vtapikey = data['vtapikey']

# maximum number of results to return
result_max = 1000

query = {
    "from": 0, "size": result_max,
    "query": {
        "bool": {
            "must": [
                {"match": {"rule.groups": "syscheck_file"}},
                {"match": {"syscheck.event": "added"}}
            ]
        }
    }
}
r = requests.post(f"{wazuh_server}/{target_index}/_search", json=query, verify=False, auth=HTTPBasicAuth(user,password))
file_added_events = []

for hit in r.json()['hits']['hits']:
    fim_data = hit['_source']['syscheck']
    _base_dict = {
        "agent": hit['_source']['agent']['name'],
        "timestamp": datetime.strptime(hit['_source']['@timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
    }
    _file_event = _base_dict | fim_data
    file_added_events.append(_file_event)
df = pd.DataFrame(file_added_events)
df = df.set_index('timestamp')
Now we have the data in a data frame that is indexed by the timestamp, and the results look like this.
df
timestamp | agent | uname_after | mtime_after | size_after | gid_after | mode | path | sha1_after | gname_after | uid_after | perm_after | event | md5_after | sha256_after | inode_after |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2024-02-24 21:30:12.739 | testmachine1 | root | 2024-01-31T23:14:09 | 7039552 | 0 | scheduled | /boot/vmlinuz-5.10.0-28-amd64 | f8f4abe111a413b87d3c7926dbe86ed3b4fcb403 | root | 0 | rw-r--r-- | added | 297b6a92dec82d8c36072a246f2ce611 | 99d46c742f33117b1a30fb487005f1cb319c4f16a7ad0b... | 5519275 |
2024-02-24 21:30:17.354 | testmachine1 | root | 2024-01-31T23:14:09 | 236364 | 0 | scheduled | /boot/config-5.10.0-28-amd64 | b29a61e9d5aded5a2e2560057939b14dd10d94b7 | root | 0 | rw-r--r-- | added | 9695055abf02a617e8cd06c9ef4dad78 | 7a5d1191b15eb1f4b0e8a62149e3ec710273b35af7d8fe... | 5519237 |
2024-02-24 21:30:17.355 | testmachine1 | root | 2024-01-31T23:14:09 | 83 | 0 | scheduled | /boot/System.map-5.10.0-28-amd64 | 5c06ff9db57a64ec70c6a8b71a0cd23c4de6b5f4 | root | 0 | rw-r--r-- | added | e0a6c09212a19ed48183a052eab847e7 | f47b4c00d87f4b5378487972ded9d447074d4771bea98a... | 5519225 |
2024-02-24 21:30:12.427 | testmachine1 | root | 2024-02-24T20:57:47 | 30051389 | 0 | scheduled | /boot/initrd.img-5.10.0-28-amd64 | a561f6c201b6359d3a6d90a9c0c23523d30b3925 | root | 0 | rw-r--r-- | added | 33cb59a129eeb2916791ead278d60ea0 | 4fef4d48f2096d8e7f98043d411e3f5a2fd816fb1933e6... | 5520765 |
Below is a function that will parse the results from the VT API. It creates a dictionary like this per file:
{
    # Set to "yes" if any of the engines has flagged the checksum
    "malicious": "<yes/no>",
    # Set to yes if checksum was found in VT (response not 404)
    "found_in_vt": "<yes/no>",
    # Engine that flagged the checksum. One key per engine.
    "<engine>": "<category>"
}
def vt_result_analyze(res, status_code):
    results = {"malicious": "no", "found_in_vt": "yes"}
    if status_code == 404:
        results['found_in_vt'] = "no"
        return results
    analysis_results = res['data']['attributes']['last_analysis_results']
    for engine in analysis_results:
        _category = analysis_results[engine]['category']
        if _category not in ["undetected", "type-unsupported"]:
            results['malicious'] = "yes"
            results[engine] = _category
    return results
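As a quick sanity check of the helper: for a 404 response (a hash unknown to VirusTotal) the response body is never inspected, so an empty dict is enough to exercise that branch.

print(vt_result_analyze({}, 404))
# {'malicious': 'no', 'found_in_vt': 'no'}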
Here we will loop over the values in the sha256_after column and call the https://www.virustotal.com/api/v3/files/<checksum> endpoint for each hash.
The loop will create a dict where the result of the vt_result_analyze function is combined with a dict {"sha256_after": "<hash>"}, and these are appended to a list.
We are using the same column name, sha256_after, so we can combine the original data with the results from the VT API.
The code will sleep for a minute after four API requests because of the VT API’s limitations with a free API key.
headers = {"x-apikey": vtapikey}

vt_results = []
req_n = 1

for hash in df['sha256_after']:
    if req_n > 4:
        print("Waiting for 60 seconds...")
        time.sleep(60)
        req_n = 1
    r = requests.get(f"https://www.virustotal.com/api/v3/files/{hash}", headers=headers)
    if r.status_code not in [200, 404]:
        print(r.text, str(r.status_code))
        break
    vt_res = vt_result_analyze(r.json(), r.status_code)
    # Add a column with the same name as in df so we can merge based on this column
    vt_results.append({"sha256_after": hash} | vt_res)
    req_n += 1
Now we have all the results from VT in the vt_results variable.
Next, we will turn that into a data frame and combine it with the original data frame based on the sha256_after column.
vtdf = pd.DataFrame(vt_results)
dfinal = df.merge(vtdf, on='sha256_after')
Now we have the original file integrity data combined with the VirusTotal data in the dfinal variable.
dfinal
 | agent | uname_after | mtime_after | size_after | gid_after | mode | path | sha1_after | gname_after | uid_after | perm_after | event | md5_after | sha256_after | inode_after | malicious | found_in_vt | Trapmine |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | testmachine1 | root | 2024-01-31T23:14:09 | 7039552 | 0 | scheduled | /boot/vmlinuz-5.10.0-28-amd64 | f8f4abe111a413b87d3c7926dbe86ed3b4fcb403 | root | 0 | rw-r--r-- | added | 297b6a92dec82d8c36072a246f2ce611 | 99d46c742f33117b1a30fb487005f1cb319c4f16a7ad0b... | 5519275 | yes | yes | malicious |
1 | testmachine1 | root | 2024-01-31T23:14:09 | 236364 | 0 | scheduled | /boot/config-5.10.0-28-amd64 | b29a61e9d5aded5a2e2560057939b14dd10d94b7 | root | 0 | rw-r--r-- | added | 9695055abf02a617e8cd06c9ef4dad78 | 7a5d1191b15eb1f4b0e8a62149e3ec710273b35af7d8fe... | 5519237 | no | no | NaN |
2 | testmachine1 | root | 2024-01-31T23:14:09 | 83 | 0 | scheduled | /boot/System.map-5.10.0-28-amd64 | 5c06ff9db57a64ec70c6a8b71a0cd23c4de6b5f4 | root | 0 | rw-r--r-- | added | e0a6c09212a19ed48183a052eab847e7 | f47b4c00d87f4b5378487972ded9d447074d4771bea98a... | 5519225 | no | yes | NaN |
3 | testmachine1 | root | 2024-02-24T20:57:47 | 30051389 | 0 | scheduled | /boot/initrd.img-5.10.0-28-amd64 | a561f6c201b6359d3a6d90a9c0c23523d30b3925 | root | 0 | rw-r--r-- | added | 33cb59a129eeb2916791ead278d60ea0 | 4fef4d48f2096d8e7f98043d411e3f5a2fd816fb1933e6... | 5520765 | no | no | NaN |
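To quickly list only the files that some engine flagged, or whose hashes were not known to VirusTotal at all, we can filter on the new columns. A small sketch:

# Files flagged by at least one engine
print(dfinal[dfinal['malicious'] == 'yes'][['path', 'sha256_after']])

# Files whose hashes were not found in VirusTotal
print(dfinal[dfinal['found_in_vt'] == 'no'][['path', 'sha256_after']])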
Funnily enough, the Trapmine engine flagged the new Linux kernel as malicious. I checked the details from VirusTotal, and the verdict was based on Malicious.high.ml.score, so some trustworthy ML analysis. :)