Examples: Using the API to Download Sensor Images

You can use the Stellar Cyber API to download sensor images from the Stellar Cyber Platform. Using the API to download sensor images has the following benefits:

  • Reduces the load on the Stellar Cyber GUI server and associated network infrastructure.

  • Allows deployment of server sensors in customized automation scripts.

Refer to Configuring API Authentication for general requirements to use the API.

API Endpoints for Sensor Image Downloads

The API provides the following endpoints for use in downloading sensor images:

  • Retrieve information on the sensor images available for download:

    https://<Platform URL>/connect/api/v1/sensor-images

  • Download a specified image:

    https://<Platform URL>/connect/api/v1/sensor-images/download/<version>/<image>

Syntax Details

The reference material from the API's swagger.json file provides details on the syntax for both of the API endpoints listed above. In addition, you can use the ? > API Docs link to try out specific commands directly from your browser. Click the Authorize button at the top of the page that appears, supply the token from the System | Users page for your account, and you can try out public API commands.

Examples

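This section provides some Python 3 scripts that you can adapt for your own needs. The scripts import the third-party requests and tqdm packages, so install them first (for example, with pip install requests tqdm). In each script, make the following changes: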

  • Replace the value HOST = "<Platform Hostname or IP>" with the hostname or IP address of your Stellar Cyber Platform.

  • Replace the value email = "<email>" with the email address for a valid account on your own Stellar Cyber Platform.

  • Replace the value refresh_token = "<token>" with the actual API token for the specified user account from the System | Users page.

Retrieve and Display Information on Available Images

The first script simply prints the available images to the screen:

Copy
#!/usr/bin/python3

import base64
from urllib.parse import urlunparse
import hashlib
from tqdm import tqdm

import requests

# The requests in this script are sent with verify=False (no TLS certificate
# validation); silence the urllib3 warnings up front.
requests.packages.urllib3.disable_warnings()

# Hostname or IP for our Stellar Cyber Platform
# (placeholder: replace it with your own value before running the script)
HOST = "<Platform Hostname or IP>"


def getAccessToken(email, refresh_token):
    """Exchange a user's API token for an access token (JWT).

    Sends a POST with HTTP Basic authentication (``email:refresh_token``)
    to the ``/connect/api/v1/access_token`` endpoint on ``HOST``.

    Args:
        email: Email address of the user account.
        refresh_token: API token of that user account.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.HTTPError: If the platform answers with a 4xx/5xx status
            (for example, wrong email or token).
        requests.RequestException: On connection errors or timeout.
    """
    credentials = (email + ":" + refresh_token).encode("utf-8")
    auth = base64.b64encode(credentials).decode("utf-8")
    headers = {
        "Authorization": "Basic " + auth,
        "Content-Type": "application/x-www-form-urlencoded",
    }
    url = urlunparse(("https", HOST, "/connect/api/v1/access_token", "", "", ""))
    # NOTE: verify=False disables TLS certificate validation while sending
    # credentials. Switch to verify=True (or a CA bundle path) when the
    # platform presents a certificate this host can validate.
    # The timeout keeps the script from hanging forever on an unreachable host.
    res = requests.post(url, headers=headers, verify=False, timeout=30)
    print(res.status_code)
    # Fail with a clear HTTP error instead of a KeyError/JSON error below.
    res.raise_for_status()
    return res.json()["access_token"]


def getSensorImagesInfo(token):
    """Retrieve information on the sensor images available for download.

    Sends a GET with a Bearer token to the ``/connect/api/v1/sensor-images``
    endpoint on ``HOST``.

    Args:
        token: Access token (JWT) as returned by ``getAccessToken``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the platform answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeout.
    """
    headers = {"Authorization": "Bearer " + token}
    url = urlunparse(("https", HOST, "/connect/api/v1/sensor-images", "", "", ""))
    # NOTE: verify=False disables TLS certificate validation; see the
    # platform's certificate setup before using this outside a lab.
    # The timeout keeps the script from hanging forever on an unreachable host.
    res = requests.get(url, headers=headers, verify=False, timeout=30)
    # Surface an expired/invalid token as an HTTP error, not a JSON error.
    res.raise_for_status()
    return res.json()

if __name__ == "__main__":
    # Step 1: the API token is generated in the UI beforehand; fill in the
    # account email and that token here.
    email = "<email>"
    refresh_token = "<token>"

    # Step 2: exchange the API token for an access token (JWT).
    access_token = getAccessToken(email, refresh_token)

    # Step 3: fetch the information on the available images and print it.
    print(getSensorImagesInfo(access_token))

Sample Output

Download Images

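The second script downloads a sensor image (in this example, the datasensor upgrade-linux_saas package), calculates the SHA-1 hash of the downloaded file, and compares it with the hash reported by the API to make sure the download succeeded. The file is stored in /tmp/stellar-images/download/. You can change this path by editing the script.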

Copy
#!/usr/bin/python3

import base64
from urllib.parse import urlunparse
import hashlib
from tqdm import tqdm

import requests

# The requests in this script are sent with verify=False (no TLS certificate
# validation); silence the urllib3 warnings up front.
requests.packages.urllib3.disable_warnings()

# Hostname or IP for our Stellar Cyber Platform
# (placeholder: replace it with your own value before running the script)
HOST = "<Platform Hostname or IP>"


def getAccessToken(email, refresh_token):
    """Exchange a user's API token for an access token (JWT).

    Sends a POST with HTTP Basic authentication (``email:refresh_token``)
    to the ``/connect/api/v1/access_token`` endpoint on ``HOST``.

    Args:
        email: Email address of the user account.
        refresh_token: API token of that user account.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.HTTPError: If the platform answers with a 4xx/5xx status
            (for example, wrong email or token).
        requests.RequestException: On connection errors or timeout.
    """
    credentials = (email + ":" + refresh_token).encode("utf-8")
    auth = base64.b64encode(credentials).decode("utf-8")
    headers = {
        "Authorization": "Basic " + auth,
        "Content-Type": "application/x-www-form-urlencoded",
    }
    url = urlunparse(("https", HOST, "/connect/api/v1/access_token", "", "", ""))
    # NOTE: verify=False disables TLS certificate validation while sending
    # credentials. Switch to verify=True (or a CA bundle path) when the
    # platform presents a certificate this host can validate.
    # The timeout keeps the script from hanging forever on an unreachable host.
    res = requests.post(url, headers=headers, verify=False, timeout=30)
    print(res.status_code)
    # Fail with a clear HTTP error instead of a KeyError/JSON error below.
    res.raise_for_status()
    return res.json()["access_token"]


def getSensorImagesInfo(token):
    """Retrieve information on the sensor images available for download.

    Sends a GET with a Bearer token to the ``/connect/api/v1/sensor-images``
    endpoint on ``HOST``.

    Args:
        token: Access token (JWT) as returned by ``getAccessToken``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the platform answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeout.
    """
    headers = {"Authorization": "Bearer " + token}
    url = urlunparse(("https", HOST, "/connect/api/v1/sensor-images", "", "", ""))
    # NOTE: verify=False disables TLS certificate validation; see the
    # platform's certificate setup before using this outside a lab.
    # The timeout keeps the script from hanging forever on an unreachable host.
    res = requests.get(url, headers=headers, verify=False, timeout=30)
    # Surface an expired/invalid token as an HTTP error, not a JSON error.
    res.raise_for_status()
    return res.json()


def download_sensor_image(token, path, destination, expected_sha1):
    """Download one sensor image to disk and verify its SHA-1 digest.

    The image is streamed from ``https://HOST/<path>`` into ``destination``
    with a progress bar, then the file on disk is hashed with
    ``calculate_sha1`` and compared against ``expected_sha1``.

    Args:
        token: Access token (JWT), sent as a Bearer token.
        path: URL path of the image on ``HOST``.
        destination: Local file path to write; missing parent directories
            are created.
        expected_sha1: Hex SHA-1 digest the downloaded file should have.

    Returns:
        True if the file was downloaded and its digest matches, False if the
        server did not answer 200 or the digest differs.

    Raises:
        requests.RequestException: On connection errors or timeout.
        OSError: If the destination cannot be created or written.
    """
    import os  # local import so this function works without touching the file's import block

    headers = {"Authorization": "Bearer " + token}
    url = urlunparse(("https", HOST, path, "", "", ""))

    # NOTE: verify=False disables TLS certificate validation.
    # The context manager releases the streamed connection even when writing fails.
    with requests.get(url, headers=headers, verify=False, stream=True, timeout=30) as response:
        if response.status_code != 200:
            print(f"Download failed. Status code: {response.status_code}")
            return False

        # open() does not create directories; make sure the target one exists.
        target_dir = os.path.dirname(destination)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        # Without a Content-Length header, let tqdm show a plain byte counter
        # instead of a bar with a total of 0.
        total_size = int(response.headers.get("Content-Length", 0)) or None
        with open(destination, "wb") as file, tqdm(total=total_size, unit="B", unit_scale=True) as pbar:
            # 1 MiB chunks: far fewer write/progress calls than 1 KiB for large images.
            for data in response.iter_content(chunk_size=1024 * 1024):
                file.write(data)
                pbar.update(len(data))

    calculated_sha1 = calculate_sha1(destination)
    print(f"Sha1 of the downloaded file: {calculated_sha1}")
    print(f"Sha1 of the expected     : {expected_sha1}")

    # hexdigest() is lowercase; compare case-insensitively so an upper-case
    # expected digest does not produce a false mismatch.
    verified = calculated_sha1 == str(expected_sha1).strip().lower()
    if verified:
        print("File downloaded and verified successfully.")
    else:
        print("Verification failed. The downloaded file may be corrupted.")
    return verified


def calculate_sha1(file_path):
    """Return the hexadecimal SHA-1 digest of the file at ``file_path``.

    The file is read in 1 MiB pieces, so it is never held in memory in full.
    """
    block_size = 1024 * 1024
    digest = hashlib.sha1()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(block_size)
            if block == b"":
                # An empty read marks the end of the file.
                break
            digest.update(block)
    return digest.hexdigest()


if __name__ == "__main__":
    # Step 1: the API token is generated in the UI beforehand; fill in the
    # account email and that token here.
    email = "<email>"
    refresh_token = "<token>"

    # Step 2: exchange the API token for an access token (JWT).
    jwt = getAccessToken(email, refresh_token)

    # Step 3: fetch the information on the available images.
    res = getSensorImagesInfo(jwt)

    # Step 4: pick one image entry and download it. Look the entry up once
    # instead of repeating the nested lookup for every field.
    image = res["data"]["datasensor"]["upgrade-linux_saas"]
    download_path = image["download_path"]
    expected_sha1 = image["sha1"]
    file_name = image["package"]
    destination = f"/tmp/stellar-images/download/{file_name}"
    # The message previously ended in "to " without naming the target file.
    print(f"Download file from: {download_path} to {destination}")
    download_sensor_image(jwt, download_path, destination, expected_sha1)