14

Retrieving data from Splunk Dashboard Panels via API

 2 years ago
source link: https://avleonov.com/2019/02/07/retrieving-data-from-splunk-dashboard-panels-via-api/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Retrieving data from Splunk Dashboard Panels via API

Fist of all, why might someone want to get data from the panels of a dashboard in Splunk? Why it might be useful? Well, if the script can process everything that human analyst sees on a Splunk dashboard, all the automation comes very natural. You just figure out what routine operations the analyst usually does using the dashboard and repeat his actions in the script as is. It may be the anomaly detection, remediation task creation, reaction on various events, whatever. It really opens endless possibilities without alerts, reports and all this stuff. I’m very excited about this. 🙂

Exporting data from Splunk dashboard

Let’s say we have a Splunk dashboard and want to get data from the table panel using a python script. The problem is that the content of the table that we see is not actually stored anywhere. In fact it is the results of some search query, from the XML representation of the dashboard, executed by Splunk web GUI. To get this data we should execute the same search request.

That’s why we should:

  1. Get XML code of the dashboard
  2. Get the search query for each panel
  3. Process searches based on other searches and get complete search query for each panel
  4. Launch the search request and get the results

First of all, we need to create a special account that will be used for getting data from Splunk. In Web GUI “Access controls -> Users”.

user = "splunk_user"
password = "password123"

Getting XML code of the dashboard

Dashboard URL it already contains the name of application and the name of dashboard:

https://[server]:8000/en-US/app/important_aplication/important_dashboard

app_name = "important_aplication"
dashboard_name = "important_dashboard"

We need to get app_author:

import requests
import json

splunk_server = "https://splunk.corporation.com:8089"

app_author = ""
data = {'output_mode': 'json'}
response = requests.get( splunk_server + '/services/apps/local?count=-1', data=data,
                             auth=(user, password), verify=False)
for entry in json.loads(response.text)['entry']:
    if entry['name'] == app_name:
        app_author = entry['author']

print(app_author)

Output:

nobody

When we have app_author, app_name and dashboard_name we can get dashboard XML:

data = {'output_mode': 'json'}
response = requests.get( splunk_server + '/servicesNS/' + app_author + '/' + app_name + '/data/ui/views/' + dashboard_name, data=data,
                             auth=(user, password), verify=False)
dashboard_xml = json.loads(response.text)['entry'][0]['content']['eai:data']

Getting the search query for each panel

We will parse XML code of this dashboard with Beautiful soup:

from bs4 import BeautifulSoup
soup = BeautifulSoup(dashboard_xml, 'xml')
panels = list()
for panel in soup.find_all('panel'):
    panel_dict = dict()
    if type(panel.title) != type(None):
        panel_dict['title'] = panel.title.text
    else:
        panel_dict['title'] = 'unnamed'
    if type(panel.query) != type(None):
        panel_dict['query'] = panel.query.text
    else:
        panel_dict['query'] = 'empty'
    if type(panel.search) != type(None):
        if 'id' in panel.search.attrs:
            panel_dict['search_id'] = panel.search['id']
        else:
            panel_dict['search_id'] = False
        if 'base' in panel.search.attrs:
            panel_dict['search_base'] = panel.search['base']
        else:
            panel_dict['search_base'] = False
    else:
        panel_dict['search_id'] = False
        panel_dict['search_base'] = False
    if type(panel.earliest) != type(None):
        panel_dict['search_earliest'] =  panel.earliest.text
    else:
        panel_dict['search_earliest'] = False
    if type(panel.latest) != type(None):
        panel_dict['search_latest'] = panel.latest.text
    else:
        panel_dict['search_latest'] = False
    panels.append(panel_dict)

Output:

[{'query': u'eventstats max(date) as maxdate | where date == maxdate | fields - maxdate | fields ImportantField', 'search_base': u'first_search_id', 'search_id': False, 'title': u'Important Title'},...]

Combining based search queries in complete search queries

Now we should get rid of connected searches. This part is a bit tricky. For each panel I recursively get the chain of based search IDs and combine related search queries. I also edit “complete” search queries to make them start with search command, which can be dropped in dashboard XML, but is mandatory in API requests, or “|” (I assume the case “| loadjob savedsearch…”)

import re

def get_search_id_list(search_base, panels):
    search_id_list = list()
    def get_base(search_base, panels):
        for panel in panels:
            if panel['search_id'] == search_base:
                search_id_list.append(panel['search_id'])
                if panel['search_base']:
                    get_base(panel['search_base'], panels)
    get_base(search_base, panels)
    reversed_search_id_list = list()
    for title in reversed(search_id_list):
        reversed_search_id_list.append(title)
    return(reversed_search_id_list)

def get_panel_by_search_id(search_id, panels):
    for panel in panels:
        if panel['search_id'] == search_id:
            return(panel)

def get_query_from_panel(panel):
    query = panel['query']
    if panel['search_earliest']:
        query = "earliest=" + panel['search_earliest'] + " " + query
    if panel['search_latest']:
        query = "latest=" + panel['search_latest'] + " " + query
    return query

dashboard_searches = dict()
for panel in panels:
    query = ""
    if panel['search_base']:
        search_id_list = get_search_id_list(panel['search_base'], panels)
        for search_id in search_id_list:
            previos_panel = get_panel_by_search_id(search_id, panels)
            query += " | " + get_query_from_panel(previos_panel)
    query +=  " | " + get_query_from_panel(panel)
    query = re.sub("^ \| ","",query)
    query = re.sub("[ \t]*\|[ \t]*\|[ \t]*", " | ", query)
    if not re.findall("^[ \t]*search",query) and not re.findall("[ \t]*^\|",query):
        query = "search " + query

    if panel['title'] in dashboard_searches:
        n = 1
        while panel['title'] + "_" + str(n) in dashboard_searches:
            n += 1
        panel['title'] = panel['title'] + "_" + str(n)

    dashboard_searches[panel['title']] = query

We get the dictionary, where title of the panel is the key and search query is the value.

Making a search request

The final thing is to make the search request and get the results. You can do it like this:

import time

dashboard = "Important Panel Title"
query = dashboard_searches[dashboard]

data = {'search': query, 'output_mode': 'json', 'max_count':'10000000'}
response = requests.post(splunk_server + '/services/search/jobs', data=data,
                         auth=(user, password), verify=False)

job_id = json.loads(response.text)['sid']

dispatchState = "UNKNOWN"
while dispatchState!="DONE" and dispatchState!="FAILED":
    data = {'search': query, 'output_mode': 'json', 'max_count':'10000000'}
    response = requests.post(splunk_server + '/services/search/jobs/' + job_id, data=data,
                             auth=(user, password), verify=False)
    dispatchState = json.loads(response.text)['entry'][0]['content']['dispatchState']
    time.sleep(1)
    print(dispatchState)

if dispatchState=="DONE":
    results_complete = False
    offset = 0
    results = list()
    while not results_complete:
        data = {'output_mode': 'json'}
        response = requests.get(splunk_server + '/services/search/jobs/' + job_id +
                                '/results?count=50000&offset='+str(offset),
                                data=data, auth=(user, password), verify=False)
        response = json.loads(response.text)
        results += response['results']
        if len(response['results']) == 0: #This means that we got all of the results
            results_complete = True
        else:
            offset += 50000
    print(results)

Output:

[{u'data': u'value1'}, {u'data': u'value2'},...]

The content of the table will be returned as a list of dictionaries, where name of the column is the key and cell value is the value in dictionary.

Hi! My name is Alexander and I am an Information Security Automation specialist. You can read more about me here. Currently, the best way to follow me is my Telegram channel @avleonovcom. I update it more often than this site. If you haven’t used Telegram yet, give it a try. It’s great. You can discuss my posts or ask questions at @avleonovchat.

This entry was posted in API, Security Information and Event Management (SIEM) and tagged dashboard, json, panel, python, Splunk, xml on February 7, 2019.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK