Technical Note: [FortiSOAR / Cybersponse Tricks'n'Tips] Get the Status of a Playbook Externally using an API call along with Public/Private Keys in Python Django - CODE
Description
"""Django views that read and toggle a playbook's ``isActive`` flag.

Every outbound call is signed with an HMAC ``Authorization`` header built
from a public/private key pair that is read from files next to this module.
"""
from os.path import abspath, dirname, join
from datetime import datetime, timezone
import base64
import hashlib
import hmac
import json
import logging

from django.http import HttpResponse
from django.shortcuts import render
from django.urls import path
import requests

logger = logging.getLogger(__name__)

# NOTE(review): the values below look like redacted placeholders from the
# published article -- replace them with the real host and trigger path.
HOST_URI = "000.00.0.00"
TRIGGER = "......................."
DEFAULT_ALGORITHM = "sha256"
CURRENT_DIR = dirname(abspath(__file__))

# Certificate verification is disabled, exactly as in the original calls
# (verify=False).
# NOTE(review): this lets an on-path attacker read or alter the signed
# requests; set to True or to a CA-bundle path once the host has a trusted
# certificate.
VERIFY_TLS = False

# Seconds to wait for the remote host before giving up; without a timeout a
# stalled connection would block the request handler indefinitely.
REQUEST_TIMEOUT = 30

# ---------------------------------------------------------------------------
# NOTE(review): both file names are placeholders (and identical) in the
# published article -- point them at the real public and private key files.
with open(join(CURRENT_DIR, '-----------.txt'), 'r') as public_key_file:
    public_key = public_key_file.read()

with open(join(CURRENT_DIR, '-----------.txt'), 'r') as private_key_file:
    private_key = private_key_file.read()


# ---------------------------------------------------------------------------
def _signed_request(method, payload=None):
    """Send one HMAC-signed request to the playbook URI and return the response.

    ``payload`` (a JSON-serialisable object) is sent as the request body when
    given; the same serialised text is what gets signed.
    """
    full_uri = 'https://' + HOST_URI + TRIGGER
    body = None if payload is None else json.dumps(payload)
    headers = {
        'Authorization': generate_hmac(method, full_uri, body,
                                       private_key, public_key)
    }
    return requests.request(method=method, url=full_uri, headers=headers,
                            data=body, verify=VERIFY_TLS,
                            timeout=REQUEST_TIMEOUT)


def _fetch_is_active():
    """Return the raw ``isActive`` value reported by the remote host."""
    response = _signed_request('GET')
    # Fail with the HTTP error itself rather than an opaque KeyError when the
    # host answers with an error document instead of the playbook record.
    response.raise_for_status()
    return response.json()['isActive']


# ---------------------------------------------------------------------------
def home(request):
    """Render ``Base.html`` with the playbook's current active/inactive state.

    Returns a plain ``HttpResponse`` carrying the error text (HTTP 502) when
    the remote call fails or its answer cannot be interpreted.
    """
    try:
        if _fetch_is_active() == True:  # noqa: E712 - keep original comparison
            context = {'response_string': 'Playbook is Active'}
        else:
            context = {'response_string': 'Playbook is InActive'}
    except Exception as e:
        logger.exception("Could not read playbook status")
        return HttpResponse(str(e), status=502)
    return render(request, "Base.html", context)


# ---------------------------------------------------------------------------
def generate_hmac(method, full_uri, payload, private_key, public_key):
    """Build the ``CS <base64>`` Authorization header value for one request.

    Args:
        method: HTTP verb; it is part of the signed fingerprint.
        full_uri: Absolute request URL; also part of the fingerprint.
        payload: Request body as ``str`` or ``bytes``. Ignored for ``GET``,
            where the public key is hashed instead. Must not be ``None`` for
            any other method.
        private_key: Text used as the HMAC key.
        public_key: Text embedded in the header (and hashed for ``GET``).

    Returns:
        ``'CS '`` followed by the base64 encoding of
        ``algorithm;timestamp;public_key;fingerprint``.
    """
    if method == 'GET':
        payload = public_key

    # Same "%Y-%m-%d %H:%M:%S" UTC text as before, obtained without the
    # deprecated datetime.utcnow().
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    logger.debug("HMAC timestamp: %s", timestamp)

    payload_bytes = payload if isinstance(payload, bytes) else payload.encode()
    digest_method = hashlib.new(DEFAULT_ALGORITHM)
    digest_method.update(payload_bytes)
    hashed_payload = digest_method.hexdigest()

    raw_fingerprint = "{0}.{1}.{2}.{3}.{4}".format(
        DEFAULT_ALGORITHM, method, timestamp, full_uri, hashed_payload)
    # NOTE(review): the HMAC digest is fixed to SHA-256 regardless of
    # DEFAULT_ALGORITHM, as in the original; confirm before changing either.
    hashed = hmac.new(private_key.encode(), raw_fingerprint.encode(),
                      hashlib.sha256)
    hashed_fingerprint = hashed.hexdigest()

    header = base64.b64encode(
        '{0};{1};{2};{3}'.format(DEFAULT_ALGORITHM, timestamp, public_key,
                                 hashed_fingerprint).encode())
    return 'CS {}'.format(header.decode())


# ---------------------------------------------------------------------------
# status -> (desired isActive, "already" message, "changed" message, template)
# NOTE(review): "on" renders home.html while "off" (and the home view) render
# Base.html; kept as published -- confirm whether the difference is intended.
_SWITCH_TARGETS = {
    "on": (True, 'Playbook is Already - Active',
           'Playbook Set to Active', 'home.html'),
    "off": (False, 'Playbook is Already - InActive',
            'Playbook Set to InActive', 'Base.html'),
}


def switch(request, status):
    """Set the playbook active (``"on"``) or inactive (``"off"``).

    The current state is fetched first; a PUT is only sent when it differs
    from the requested one.

    Returns:
        The rendered template with a ``response_string`` describing the
        outcome; HTTP 400 for any other ``status`` value; HTTP 502 with the
        error text when the remote call fails.
    """
    logger.debug("switch requested: %s", status)

    target = _SWITCH_TARGETS.get(status)
    if target is None:
        # Previously this fell through and returned None, which Django
        # rejects. The value is not echoed back to avoid reflecting input.
        return HttpResponse("Unknown status; expected 'on' or 'off'",
                            status=400)
    desired, already_message, changed_message, template = target

    try:
        if _fetch_is_active() == desired:
            message = already_message
        else:
            response = _signed_request('PUT', {'isActive': desired})
            if response.ok:
                message = changed_message
            else:
                # Previously no message was set on this path, so the user saw
                # an unbound-variable error instead of the real cause.
                message = 'Playbook update failed (HTTP {})'.format(
                    response.status_code)
    except Exception as e:
        logger.exception("Could not switch playbook %s", status)
        return HttpResponse(str(e), status=502)

    return render(request, template, {'response_string': message})