Examples: Using the API to Insert a Security Event Document to aella-ser Index

This section describes how to use the Stellar Cyber API to insert a security event document in the aella-ser index.

Refer to Introducing the Stellar Cyber API for general requirements to use the API.

Required Privileges to Make API Calls

To perform API calls, you must have:

  • Root scope

  • Super Admin privileges (must be the default profile template)

We recommend creating a Stellar Cyber user dedicated to API calls. That way you can easily track changes made through API calls under System | Users | Activity Log.

Finding the API Key

You will also need an API key for programmatic access to the API.

Generate an API key as follows:

  1. Navigate to System | Users.

  2. Locate the user account that will perform the API call and select Edit in its row.

    Remember that the user performing the call must have Root scope and Super Admin privileges.

  3. Locate the API Access item in the dialog box that appears and select Generate New Token.

  4. Copy and paste the token into a text file to store it temporarily.

Generating a JSON Web Token (JWT) from the API Key

Requests to the Stellar Cyber public APIs can be secured using JSON Web Token (JWT) authentication.

Using the API to Insert a Security Event Document

You can use POST calls to insert a security event document to aella-ser. You can use any language to make the call. For our example, we use Python.

Inserting a Security Event with Basic Authentication

import json
import requests

headers = {'Accept': 'application/json', 'Content-type': 'application/json'}
elastic_url = 'https://YourStellarCyberServer/connect/api/v1/insert_ser'
json_data = {
    "event_name": "EventName",
    "timestamp": Timestamp,  # placeholder: replace with the event time in milliseconds
    "event_status": "EventStatus",
    "event_source": "EventSource",
}
query = json.dumps(json_data)
# verify=False skips TLS certificate verification
response = requests.post(elastic_url, auth=('Username', 'API Key'), data=query, verify=False, headers=headers)
print(response.text)

Your call must have the header and the /connect/api/v1/insert_ser path, and must have at least the following fields, in addition to any optional aella-ser index fields:

  • event_name

  • timestamp

  • event_status

  • event_source

Replace the placeholder arguments in the example with your own information, as described in the table below.

Argument                Description
Username                User name of the admin making the call.
API Key                 API key for that user name.
YourStellarCyberServer  The URL or IP address of your Stellar Cyber server.
EventName               The name of the event.
Timestamp               The time the event took place. To set this field to the current time in Python 3, use int(time.time() * 1000).
EventStatus             The status of the event. You can set the status to New, In Progress, Ignored, or Closed.
EventSource             The source of the event. For example, ActZero.

The contents of response.text indicate success or failure and will be similar to:

  • OK 200—the API call was successful.
  • Invalid event status value: closed—the API call failed because the supplied status was invalid.
  • Missing fields to be updated for the event—the API call failed because you left out one of the mandatory fields.
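The last two failures can be caught on the client before any request is sent. The following is a minimal sketch, not part of the Stellar Cyber API: the helper name validate_event is hypothetical, and the check assumes that status values are matched case-sensitively, as the rejected value closed in the list above suggests.

```python
REQUIRED_FIELDS = ("event_name", "timestamp", "event_status", "event_source")
VALID_STATUSES = ("New", "In Progress", "Ignored", "Closed")


def validate_event(doc):
    """Return a list of problems found in doc; an empty list means none were found."""
    problems = []
    # Every mandatory field must be present in the document.
    for field in REQUIRED_FIELDS:
        if field not in doc:
            problems.append("missing required field: " + field)
    # The status must be one of the documented values (case-sensitive, by assumption).
    status = doc.get("event_status")
    if status is not None and status not in VALID_STATUSES:
        problems.append("invalid event_status: " + repr(status))
    return problems


if __name__ == "__main__":
    print(validate_event({"event_name": "demo test", "event_status": "closed"}))
```

Calling validate_event before requests.post lets a script report all problems at once instead of discovering them one failed call at a time.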

Sample insert_ser Call

Here is an example with arguments filled in:

import json
import time

import requests

headers = {'Accept': 'application/json', 'Content-type': 'application/json'}
elastic_url = 'https://192.168.1.24/connect/api/v1/insert_ser'
json_data = {
    "event_name": "demo test",
    "detected_field": "my field test",
    "tryout": "test",
    "timestamp": int(time.time() * 1000),  # current time in milliseconds
    "event_status": "New",
    "comments": "Test Comment",
    "event_source": "ActZero",
}
query = json.dumps(json_data)
# verify=False skips TLS certificate verification
response = requests.post(elastic_url, auth=('admin', 'APISquared'), data=query, verify=False, headers=headers)
print(response.text)

This example performs a POST call as the user admin with the obviously fake API key of APISquared. The call is to the Stellar Cyber server at 192.168.1.24 to insert a document in the aella-ser index with the following fields:

  • event_name—demo test

  • detected_field—my field test

  • tryout—test

  • timestamp—Current time in Python 3, in milliseconds (int(time.time() * 1000))

  • event_status—New

  • comments—Test Comment

  • event_source—ActZero
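The sample stamps the event with the current time. To record when an event actually took place, a known date and time can be converted to the same millisecond format. This is a sketch under the assumption that the timestamp field expects milliseconds since the Unix epoch, as the time.time() * 1000 expression implies; the helper name to_epoch_millis is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Dividing one timedelta by another with // yields an exact integer,
    # which avoids the rounding that float multiplication can introduce.
    return (moment - EPOCH) // timedelta(milliseconds=1)


if __name__ == "__main__":
    event_time = datetime(2021, 4, 28, 4, 17, 42, 308000, tzinfo=timezone.utc)
    print(to_epoch_millis(event_time))
```

The result can be used directly as the value of "timestamp" in json_data.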

Inserting a Security Event with JWT Authentication

You can also use JWT authentication, as in the example below.

Your call must have the header and the /connect/api/v1/insert_ser path, and must have at least the following fields, in addition to any optional aella-ser index fields:

  • event_name

  • timestamp

  • event_status

  • event_source

Enter your own information in the script for the arguments in the table below.

Argument                 Description
userid                   User name of the admin making the call. The sample script below uses myuser@stellarcyber.ai.
API Key (Refresh Token)  API key for the user, retrieved from the user interface. The sample script below uses 2iRpBAyQYEfv77R2QtATlJN6Nvq6uzftBdzotSy2pjT-IvJTLw9aiHyh7Y2mo12IDSWc-FfHwUyPpmiHQnJrSH.
HOST                     The URL or IP address of your Stellar Cyber server. The sample script below uses myserver.stellarcyber.cloud.
EventName                The name of the event.
Timestamp                The time the event took place. To set this field to the current time in Python 3, use int(time.time() * 1000).
EventStatus              The status of the event. You can set the status to New, In Progress, Ignored, or Closed.
EventSource              The source of the event. For example, ActZero.

The contents of response.text indicate success or failure and will be similar to:

  • OK 200—the API call was successful.
  • Invalid event status value: closed—the API call failed because the supplied status was invalid.
  • Missing fields to be updated for the event—the API call failed because you left out one of the mandatory fields.

Sample insert_ser Script

The sample script below works as follows:

  • The script sets the host, userid, and refresh_token parameters in Step 1 in the sample.

  • Because JWTs expire ten minutes after they are generated, this script includes logic that generates and uses a fresh JWT every time the script is run. The script runs the getAccessToken procedure to generate the new JWT (Step 2 in the sample).

  • The script uses the generated JWT to make a POST to the insert_ser API in the putCase procedure (Step 3 in the sample). The script posts the following JSON data:

    • event_name—demo test
    • detected_field—my field test

    • tryout—test

    • timestamp—Current time in Python 3, in milliseconds (int(time.time() * 1000))

    • event_status—New

    • comments—Test Comment

    • event_source—ActZero

  • The script also prints the generated JWT to the screen. This, however, is not strictly necessary since the getAccessToken procedure already prints the status code for the call to the access_token API (200 for success; 401 for failure).

#!/usr/bin/python3

import requests
import base64
import json
import time
from urllib.parse import urlunparse
requests.packages.urllib3.disable_warnings()

# Step 1
# Add DP IP/hostname, userid, and refresh token from GUI here
HOST = "myserver.stellarcyber.cloud"
userid = "myuser@stellarcyber.ai"
refresh_token ="2iRpBAyQYEfv77R2QtATlJN6Nvq6uzftBdzotSy2pjT-IvJTLw9aiHyh7Y2mo12IDSWc-FfHwUyPpmiHQnJrSH"

def getAccessToken(userid, refresh_token):
    auth = base64.b64encode(bytes(userid + ":" + refresh_token, "utf-8")).decode("utf-8")
    headers = {
        "Authorization": "Basic " + auth,
        "Content-Type": "application/x-www-form-urlencoded",
    }
    url = urlunparse(("https", HOST, "/connect/api/v1/access_token", "", "", ""))
    res = requests.post(url, headers=headers, verify=False)
    print(res.status_code)
    return res.json()["access_token"]
    
def putCase(token):
    headers = {"Authorization": "Bearer " + token, 'content-type': 'application/json' }
    url = urlunparse(("https", HOST, "/connect/api/v1/insert_ser", "", "", ""))
    json_data = {
      "event_name": "demo test",
      "detected_field": "my field test",
      "tryout": "test",
      "timestamp": int(time.time() * 1000),  # whole milliseconds since the epoch
      "event_status": "New",
      "comments": "Test Comment",
      "event_source": "ActZero",
    }
    query = json.dumps(json_data)
    res = requests.post(url, data=query, headers=headers, verify=False)
    print(res.text)
    print(res.status_code)
    return res.json()
    
if __name__ == "__main__":

    # Step 2: Use getAccessToken with supplied credentials to generate JWT
    jwt = getAccessToken(userid, refresh_token)
    print("------------ jwt -------------")
    print(jwt)
    print("------------ jwt  end -------------")
    
    # Step 3: use JWT token to call public API
    data = putCase(jwt)
    print("------------ show result of put case -------------")
    print(data)
    print("------------ end api results -------------")
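The sample script requests a new JWT on every run. A longer-running client could instead reuse one token until shortly before the ten-minute expiry. The class below is a minimal sketch of that idea, not part of the Stellar Cyber API: the name TokenCache, the 30-second safety margin, and the injectable clock are all assumptions made for illustration.

```python
import time


class TokenCache:
    """Reuse a JWT until it is close to expiring, then fetch a new one."""

    def __init__(self, fetch_token, lifetime_seconds=600, margin_seconds=30,
                 clock=time.monotonic):
        self._fetch_token = fetch_token    # callable that returns a fresh JWT string
        self._lifetime = lifetime_seconds  # ten minutes, as stated for the JWT
        self._margin = margin_seconds      # refresh slightly early (assumed value)
        self._clock = clock                # replaceable so the logic can be tested
        self._token = None
        self._fetched_at = None

    def get(self):
        """Return a cached token, fetching a new one when none is usable."""
        now = self._clock()
        stale = (
            self._token is None
            or now - self._fetched_at >= self._lifetime - self._margin
        )
        if stale:
            self._token = self._fetch_token()
            self._fetched_at = now
        return self._token
```

With the sample script, the cache could be created as TokenCache(lambda: getAccessToken(userid, refresh_token)), and each call to putCase would then receive cache.get() instead of a newly generated token.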

Fields in aella-ser Index

The table below lists the fields in the aella-ser index. Required fields are marked with an asterisk (*). You can also include custom fields. Custom fields are displayed after standard fields under custom_ser_field.

Field Name          Elasticsearch Type  Description
actual              float               Either an anomalous value detected by ML jobs or an aggregation value detected by Statistical Analytics jobs.
typical             float               The typical value established by the ML model.
stellar.alert_time  date                The time at which the anomaly was detected.
timestamp *         date                The actual time at which the event took place.
write_time          date                The time the record was written to Elasticsearch.
event_category      string              The category of the event. Acceptable values are killchain, network, or uba.
event_name *        string              The name of the event.
event_score         long                The score assigned to the event, indicating a combination of severity, fidelity, and threat_score. Acceptable values range from 0 to 100.
event_source *      string              The source of the event. For example, ActZero.
event_status *      string              The status of the event. Acceptable values are New, In Progress, Ignored, and Closed.
event_type          string              The type of event.
severity            long                A numerical score indicating the severity of an event on a scale from 0 (least severe) to 100 (most severe).
fidelity            double              A numerical score indicating the level of confidence in a security event on a scale from 0 (least confident) to 100 (most confident).
threat_score        long                A numerical score measuring the threat level for the event, based on severity, fidelity, and threat intel. Acceptable values range from 0 (lowest threat) to 100 (highest threat).
lateral             boolean             Specifies whether the Interflow data supporting the event is from private or to private.
orig_id             string              The ID for the raw data supporting the event in Elasticsearch.
orig_index          string              The index for the raw data supporting the event in Elasticsearch.
detected_field      string              For ML jobs with a single detection field.
detected_fields     array of strings    For ML jobs with multiple detection fields.
detected_value      string              For ML jobs with a single detection field.
detected_values     array of strings    For ML jobs with multiple detection fields.
detector_index      long                The detector index in ML detections.
start_bucket_time   long                The starting timestamp of the data that caused the anomaly.
end_bucket_time     long                The ending timestamp of the data that caused the anomaly. This field combines with start_bucket_time to form a time range for the event.
custom_ser_field    object              Use a unique namespace (for example, ActZero) to avoid conflicts with existing fields in Elasticsearch.
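Several optional fields have documented value ranges that can be checked before a document is sent. The sketch below covers only the constraints stated in the table (scores from 0 to 100 and the three event_category values). The helper name check_optional_fields is hypothetical, and whether the server itself enforces these limits is not stated here.

```python
SCORE_FIELDS = ("event_score", "severity", "fidelity", "threat_score")
EVENT_CATEGORIES = ("killchain", "network", "uba")


def check_optional_fields(doc):
    """Return a list of problems with optional fields; an empty list means none found."""
    problems = []
    for field in SCORE_FIELDS:
        value = doc.get(field)
        if value is None:
            continue  # optional field not supplied
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if not is_number or not 0 <= value <= 100:
            problems.append(field + " must be a number from 0 to 100")
    category = doc.get("event_category")
    if category is not None and category not in EVENT_CATEGORIES:
        problems.append("event_category must be one of: " + ", ".join(EVENT_CATEGORIES))
    return problems


if __name__ == "__main__":
    print(check_optional_fields({"severity": 120, "event_category": "uba"}))
```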

Using the API to Check the Update

Stellar Cyber responds to a successful insert_ser call with a response including the index and _id for the inserted event. For example:

{"index":"aella-ser-2021.04.28-","_id":"ZbWyFnkBZsFu_Ay8bQxZ"} 200

You can use the returned index and _id to query Stellar Cyber for the inserted document.

This example uses the index and _id returned from our sample insert as the last two variable parts of the request path:

import json
import requests

headers = {'Accept': 'application/json', 'Content-type': 'application/json'}
elastic_url = 'https://YourStellarCyberServer/connect/api/data/aella-ser-2021.04.28-/amsg/ZbWyFnkBZsFu_Ay8bQxZ'
# verify=False skips TLS certificate verification
response = requests.get(elastic_url, auth=('Username', 'API Key'), verify=False, headers=headers)
a = json.loads(response.text)
print(json.dumps(a, indent=4, sort_keys=True))
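Rather than pasting the index and _id by hand, a script can build the lookup URL from the parsed insert response. This is a sketch only: the helper name document_url is hypothetical, and it assumes that the amsg path segment seen in this one example applies to every aella-ser document.

```python
from urllib.parse import quote, urlunparse


def document_url(host, insert_response):
    """Build the lookup URL for a document from the parsed insert_ser response body."""
    # Percent-encode both values so unexpected characters cannot alter the path.
    index = quote(insert_response["index"], safe="")
    doc_id = quote(insert_response["_id"], safe="")
    path = "/connect/api/data/{}/amsg/{}".format(index, doc_id)
    return urlunparse(("https", host, path, "", "", ""))


if __name__ == "__main__":
    sample = {"index": "aella-ser-2021.04.28-", "_id": "ZbWyFnkBZsFu_Ay8bQxZ"}
    print(document_url("192.168.1.24", sample))
```

The returned string can be passed to requests.get in place of the hard-coded elastic_url.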

In response, we get a document similar to the following:

{
    "_id": "ZbWyFnkBZsFu_Ay8bQxZ",
    "_index": "aella-ser-1618012988732-",
    "_primary_term": 1,
    "_seq_no": 3122162,
    "_source": {
        "custom_ser_field": {
            "comments": "Test Comment",
            "tryout": "test"
        },
        "detected_field": "my field test",
        "event_name": "demo test",
        "event_source": "ActZero",
        "event_status": "New",
        "msg_origin": {
            "source": "API"
        },
        "tenantid": "",
        "timestamp": 1619583462308,
        "write_by": "API",
        "write_time": 1619583462741
    },
    "_type": "amsg",
    "_version": 1,
    "found": true
}
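In this response, the comments and tryout fields, which were sent at the top level of the inserted document, come back nested under custom_ser_field. A script that reads its own custom fields back therefore has to look there. The helper below is a small sketch with a hypothetical name, custom_fields, that assumes the document has the shape shown above.

```python
def custom_fields(document):
    """Return the custom fields of a retrieved aella-ser document as a plain dict."""
    source = document.get("_source", {})
    # Copy the nested object so callers cannot modify the original response.
    return dict(source.get("custom_ser_field", {}))


if __name__ == "__main__":
    retrieved = {
        "_id": "ZbWyFnkBZsFu_Ay8bQxZ",
        "_source": {
            "custom_ser_field": {"comments": "Test Comment", "tryout": "test"},
            "event_name": "demo test",
        },
        "found": True,
    }
    print(custom_fields(retrieved))
```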