Examples: Using the API to Insert a Security Event Document to aella-ser Index
This section describes how to use the Stellar Cyber API to insert a security event document in the aella-ser index.
Refer to Introducing the Stellar Cyber API for general requirements to use the API.
Required Privileges to Make API Calls
To perform API calls, you must have:
-
Root scope
-
Super Admin privileges (must be the default profile template)
We recommend creating a Stellar Cyber user dedicated to API calls. That way you can easily track changes made through API calls under System | Users | Activity Log.
Finding the API Key
You will also need an API key for programmatic access to the API.
Generate an API key as follows:
-
Navigate to System | Users.
-
Locate the user account to perform the API call and select Edit () in its row.
Remember that the user performing the call must have Root scope and Super Admin privileges.
-
Locate the API Access item in the dialog box that appears and select Generate New Token.
-
Copy and paste the token into a text file to store it temporarily.
Generating a JSON Web Token (JWT) from the API Key
Requests to the Stellar Cyber public APIs
JWT authentication offers rapid, scalable, and secure authentication for API calls. They are also time-sensitive, expiring ten minutes after they are generated. Because of this, scripts you write to pull data from the Stellar Cyber API should typically include a section used to refresh the requesting user's JWT.
The sample scripts in this topic include a section that refreshes the JWT using the supplied API Key.
You generate a JWT by sending a request to the public https://<YourStellarCyberServer>/connect/api/v1/access_token API on the DP. The request must include the Email Addressand API Key you retrieved in the table above. The API Key you retrieve from the System | Users page acts as a refresh key that you use to generate a new JWT.
For example, the sample Python script below retrieves a JWT and prints it to the screen using the following details:
-
DP – myserver.stellarcyber.cloud
This is specified in the HOST variable at the top of the sample script.
-
Email Address – myuser@stellarcyber.ai
Note that the email address is case sensitive and must match the case used in the Stellar Cyber user interface.
-
Refresh Token from GUI – 2iRpBAyQYEfv77R2QtATlJN6Nvq6uzftBdzotSy2pjT-IvJTLw9aiHyh7Y2mo12IDSWc-FfHwUyPpmiHQnJrSH
These are specified in the userid and refresh_token variables in the sample script.
#!/usr/bin/python3
import requests
import base64
import json
from urllib.parse import urlunparse
requests.packages.urllib3.disable_warnings()
# Step 1
# Add DP IP/hostname, userid, and refresh token from GUI here
HOST = "myserver.stellarcyber.cloud"
userid = "myuser@stellarcyber.ai"
refresh_token = "2iRpBAyQYEfv77R2QtATlJN6Nvq6uzftBdzotSy2pjT-IvJTLw9aiHyh7Y2mo12IDSWc-FfHwUyPpmiHQnJrSH"
def getAccessToken(userid, refresh_token):
auth = base64.b64encode(bytes(userid + ":" + refresh_token, "utf-8")).decode("utf-8")
headers = {
"Authorization": "Basic " + auth,
"Content-Type": "application/x-www-form-urlencoded",
}
url = urlunparse(("https", HOST, "/connect/api/v1/access_token", "", "", ""))
res = requests.post(url, headers=headers, verify=False)
print(res.status_code)
return res.json()["access_token"]
if __name__ == "__main__":
# Step 2: Use getAccessToken with supplied credentials to generate JWT
jwt = getAccessToken(userid, refresh_token)
print("------------ jwt -------------")
print(jwt)
print("------------ jwt end -------------")
Using the API to Insert a Security Event Document
You can use POST calls to insert a security event document to aella-ser. You can use any language to make the call. For our example, we use Python.
Inserting a Security Event with Basic Authentication
headers={'Accept': 'application/json', 'Content-type': 'application/json'}
elastic_url ='https:// YourStellarCyberServer/connect/api/v1/insert_ser'
json_data = {
"event_name": "EventName",
"timestamp": Timestamp,
"event_status": "EventStatus",
"event_source": "EventSource",
}
query = json.dumps(json_data)
response = requests.post(elastic_url, auth=('Username','API Key'), data=query, verify=False, headers = headers)
print (response.text)
Your call must have the header and the /connect/api/v1/insert_ser
path, and must have at least the following fields, in addition to any optional aella-ser index fields:
-
event_name
-
timestamp
-
event_status
-
event_source
Enter your own information for the arguments in bold.
Argument | Description |
---|---|
Username | User name of the admin making the call |
API Key |
API key for that user name. |
YourStellarCyberServer | The URL or IP address of your Stellar Cyber server |
EventName | The name of the event |
Timestamp | The time the event took place. You can use the following code to set this field to the current time in Python: long(time.time() * 1000). |
EventStatus |
You can set the status to:
|
EventSource |
The source of the event. For example, ActZero. |
The contents of response.text indicate success or failure and will be similar to:
- OK 200—the API call was successful.
- Invalid event status value: closed—the API call failed because the supplied status was invalid.
- Missing fields to be updated for the event—the API call failed because you left out one of the mandatory fields.
Sample insert_ser Call
Here is an example with arguments filled in:
headers={'Accept': 'application/json', 'Content-type': 'application/json'}
elastic_url ='https://192.168.1.24/connect/api/v1/insert_ser'
json_data = {
"event_name": "demo test",
"detected_field": "my field test",
"tryout": "test",
"timestamp": long(time.time() * 1000),
"event_status": "New",
"comments": "Test Comment",
"event_source": "ActZero",
}
query = json.dumps(json_data)
response = requests.post(elastic_url, auth=('admin','APISquared''), data=query, verify=False, headers = headers)
print (response.text)
This example performs a POST call as the user admin with the obviously fake API key of APISquared. The call is to the Stellar Cyber server at 192.168.1.24 to insert a document in the aella-ser index with the following fields:
-
event_name—demo test
-
detected_field—my field test
-
tryout—test
-
timestamp—Current time in Python (long(time.time() * 1000))
-
event_status—New
-
comments—Test Comment
-
event_source—ActZero
Inserting a Security Event with JWT Authentication
You can also use JWT authentication, as in the example below.
Your call must have the header and the /connect/api/v1/insert_ser
path, and must have at least the following fields, in addition to any optional aella-ser index fields:
-
event_name
-
timestamp
-
event_status
-
event_source
Enter your own information in the script for the arguments in the table below.
Argument | Description |
---|---|
userid | User name of the admin making the call. Sample script below uses myuser@stellarcyber.ai. |
API Key (Refresh Token) |
API key for the user, retrieved from the user interface. Sample script below uses 2iRpBAyQYEfv77R2QtATlJN6Nvq6uzftBdzotSy2pjT-IvJTLw9aiHyh7Y2mo12IDSWc-FfHwUyPpmiHQnJrSH. |
HOST | The URL or IP address of your Stellar Cyber server. Sample script below uses myserver@stellarcyber.cloud. |
EventName | The name of the event |
Timestamp | The time the event took place. You can use the following code to set this field to the current time in Python: long(time.time() * 1000). |
EventStatus |
You can set the status to:
|
EventSource |
The source of the event. For example, ActZero. |
The contents of response.text indicate success or failure and will be similar to:
- OK 200—the API call was successful.
- Invalid event status value: closed—the API call failed because the supplied status was invalid.
- Missing fields to be updated for the event—the API call failed because you left out one of the mandatory fields.
Sample insert_ser Script
The sample script below works as follows:
-
The script sets the host, userid, and refresh_token parameters in Step 1 in the sample.
-
Because JWTs expire ten minutes after they are generated, this script includes logic that generates and uses a fresh JWT every time the script is run. The script runs the getAccessToken procedure to generate the new JWT (Step 2 in the sample).
-
The script uses the generated JWT to make a POST to the insert_ser API in the putCase procedure (Step 3 in the sample). The script posts the following JSON data:
- event_name—demo test
-
detected_field—my field test
-
tryout—test
-
timestamp—Current time in Python (long(time.time() * 1000))
-
event_status—New
-
comments—Test Comment
-
event_source—ActZero
-
The script also prints the generated JWT to the screen. This, however, is not strictly necessary since the getAccessToken procedure already prints the status code for the call to the access_token API (200 for success; 401 for failure).
#!/usr/bin/python3
import requests
import base64
import json
import time
from urllib.parse import urlunparse
requests.packages.urllib3.disable_warnings()
# Step 1
# Add DP IP/hostname, userid, and refresh token from GUI here
HOST = "myserver.stellarcyber.cloud"
userid = "myuser@stellarcyber.ai"
refresh_token ="2iRpBAyQYEfv77R2QtATlJN6Nvq6uzftBdzotSy2pjT-IvJTLw9aiHyh7Y2mo12IDSWc-FfHwUyPpmiHQnJrSH"
def getAccessToken(userid, refresh_token):
auth = base64.b64encode(bytes(userid + ":" + refresh_token, "utf-8")).decode("utf-8")
headers = {
"Authorization": "Basic " + auth,
"Content-Type": "application/x-www-form-urlencoded",
}
url = urlunparse(("https", HOST, "/connect/api/v1/access_token", "", "", ""))
res = requests.post(url, headers=headers, verify=False)
print(res.status_code)
return res.json()["access_token"]
def putCase(token):
headers = {"Authorization": "Bearer " + token, 'content-type': 'application/json' }
url = urlunparse(("https", HOST, "/connect/api/v1/insert_ser", "", "", ""))
json_data = {
"event_name": "demo test",
"detected_field": "my field test",
"tryout": "test",
"timestamp": time.time() * 1000,
"event_status": "New",
"comments": "Test Comment",
"event_source": "ActZero",
}
query = json.dumps(json_data)
res = requests.post(url, data=query, headers=headers, verify=False)
print(res.text)
print(res.status_code)
return res.json()
if __name__ == "__main__":
# Step 2: Use getAccessToken with supplied credentials to generate JWT
jwt = getAccessToken(userid, refresh_token)
print("------------ jwt -------------")
print(jwt)
print("------------ jwt end -------------")
# Step 3: use JWT token to call public API
data = putCase(jwt)
print("------------ show result of put case -------------")
print(data)
print("------------ end api results -------------")
Fields in aella-ser Index
The table below lists the fields in the aella-ser index. Required fields listed are listed in bold with an asterisk. You can also include custom fields. Custom fields are displayed after standard fields under custom_ser_field.
Field Name | Elasticsearch Type | Description |
---|---|---|
actual |
float |
Either an anomalous value detected by ML jobs or an aggregation value detected by Statistical Analytics jobs. |
typical |
float |
The typical value established by the ML model. |
stellar.alert_time |
date |
The time at which the anomaly was detected |
timestamp * |
date |
The actual time at which the event took place. |
write_time |
date |
The time the record was written to Elasticsearch. |
event_category |
string |
The category of the event category. Acceptable values are killchain, network, or uba. |
event_name * |
string |
The name of the event. |
event_score |
long |
The score assigned to the event, indicating a combination of severity, fidelity, and threat_score. Acceptable values range from 0-100. |
event_source * |
string |
The source of the event. For example, ActZero. |
event_status * |
string |
The status of the event. Acceptable values include New, In Progress, Ignored, and Closed. |
event_type |
string |
The type of event. |
severity |
long |
A numerical score indicating the severity of an event on a scale from 0 (least severe) to 100 (most severe). |
fidelity |
double |
A numerical score indicating the level of confidence in a security event on a scale from 0 (least confident) to 100 (most confident). |
threat_score |
long |
A numerical score measuring the threat level for the event, based on severity, fidelity, and threat intel. Acceptable values range from 0 (lowest threat) to 100 (highest threat). |
lateral |
boolean |
Specifies whether the Interflow data supporting the event is from private or to private. |
orig_id |
string |
The ID for the raw data supporting the event in Elasticsearch. |
orig_index |
string |
The index for the raw data supporting the event in Elasticsearch. |
detected_field |
string |
For ML jobs with a single detection field. |
detected_fields |
array of strings |
For ML jobs with multiple detection fields. |
detected_value |
string |
For ML jobs with a single detection field. |
detected_values |
array of strings |
For ML jobs with multiple detection fields. |
detector_index |
long |
The detector index in ML detections. |
start_bucket_time |
long |
The starting timestamp of the data that caused the anomaly. |
end_bucket_time |
long |
The ending timestamp of the data that cause the anomaly. This field combines with start_bucket_time to form a time range for the event. |
custom_ser_field |
object |
Use a unique namespace (for example, ActZero), to avoid conflicts with existing fields in ElasticSearch. |
Using the API to Check the Update
Stellar Cyber responds to a successful insert_ser call with a response including the index and _id for the inserted event. For example:
{"index":"aella-ser-2021.04.28-","_id":"ZbWyFnkBZsFu_Ay8bQxZ"} 200
You can use the returned index and _id to query Stellar Cyber for the inserted document.
This example uses the index and _id returned from our sample insert, as indicated in bold below:
headers={'Accept': 'application/json', 'Content-type': 'application/json'}
elastic_url ='https:// YourStellarCyberServer/connect/api/data/aella-ser-2021.04.28-/amsg/ZbWyFnkBZsFu_Ay8bQxZ'
response = requests.get(elastic_url, auth=('Username','API Key'), verify=False, headers = headers)
a = json.loads(response.text)
print(json.dumps(a, indent=4, sort_keys=True))
In response, we get a document similar to the following:
{
"_id": "ZbWyFnkBZsFu_Ay8bQxZ",
"_index": "aella-ser-1618012988732-",
"_primary_term": 1,
"_seq_no": 3122162,
"_source": {
"custom_ser_field": {
"comments": "Test Comment",
"tryout": "test"
},
"detected_field": "my field test",
"event_name": "demo test",
"event_source": "ActZero",
"event_status": "New",
"msg_origin": {
"source": "API"
},
"tenantid": "",
"timestamp": 1619583462308,
"write_by": "API",
"write_time": 1619583462741
},
"_type": "amsg",
"_version": 1,
"found": true
}