Hello,
Here are the fixes for the Cisco Meraki L7 connector:
API Header: Updated the API header to align with the Cisco Meraki API requirements.
Health Check Endpoint: The endpoint for health checks has been updated.
New Functions Added:
Firewall Rules Update Function:
The update_network_firewall_rules function has been updated to append new rules rather than overwriting existing ones. This ensures that any new rules you add will be integrated with the current rules instead of replacing them.
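The input for the new rule parameter (read by the code as `rule`) should look like this, for example: {"policy": "deny", "type": "host", "value": "malicious.example.com"}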
Connector.py
from .operations import _check_health , get_curr_oper_info, api_request, operations
from connectors.core.connector import Connector, get_logger, ConnectorError
# Module-level logger for the connector class, obtained from the connector
# framework under the name 'cisco_meraki_mx_l7_firewall'.
logger = get_logger('cisco_meraki_mx_l7_firewall')
class CiscoMeraki(Connector):
    """Connector entry point for the Cisco Meraki MX L7 firewall integration.

    Dispatches requested operations to the handlers registered in the
    ``operations`` mapping and exposes the health check.
    """

    def execute(self, config, operation, params, **kwargs):
        """Run the handler registered for ``operation``.

        Args:
            config: Connector configuration passed through to the handler.
            operation: Name of the operation; must be a key of ``operations``.
            params: Operation parameters passed through to the handler.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            Whatever the selected handler returns.

        Raises:
            ConnectorError: If the connector info cannot be built or the
                operation name is not registered.
        """
        try:
            connector_info = {"connector_name": self._info_json.get('name'),
                              "connector_version": self._info_json.get('version')}
            handler = operations.get(operation)
        except Exception as err:
            logger.exception(err)
            raise ConnectorError(err)
        if handler is None:
            # An unknown name used to surface as "'NoneType' object is not
            # callable"; report it explicitly instead.
            message = 'Unsupported operation: {0}'.format(operation)
            logger.error(message)
            raise ConnectorError(message)
        # Called outside the try block so handler errors propagate unchanged.
        return handler(config, params, connector_info)

    def check_health(self, config):
        """Check connectivity to the API and raise if the check does not pass.

        Raises:
            ConnectorError: If ``_check_health`` does not report success.
        """
        logger.info('starting health check')
        connector_info = {"connector_name": self._info_json.get('name'),
                          "connector_version": self._info_json.get('version')}
        # The result used to be ignored, so success was logged even when the
        # underlying request had failed.
        if not _check_health(config, connector_info):
            message = 'Health check failed; see the connector logs for details'
            logger.error(message)
            raise ConnectorError(message)
        logger.info('Completed health check and no errors found')
operations.py
from requests import request, exceptions as req_exceptions
from connectors.core.connector import get_logger, ConnectorError
import json
# Module-level logger used by every operation below, obtained from the
# connector framework under the name 'cisco_meraki_mx_l7_firewall'.
logger = get_logger('cisco_meraki_mx_l7_firewall')
def get_curr_oper_info(info_json, action):
    """Return the operation entry in ``info_json`` whose name equals ``action``.

    Args:
        info_json: Mapping with an ``'operations'`` list; each entry is a
            mapping that carries an ``'operation'`` key.
        action: Operation name to look up.

    Returns:
        The first matching entry, or ``None`` when there is no match or the
        structure cannot be read (the reason is logged in both cases).
    """
    try:
        for action_info in info_json.get('operations') or []:
            if action_info['operation'] == action:
                return action_info
    except (AttributeError, KeyError, TypeError) as err:
        # Malformed connector info: keep the original "log and return None".
        logger.error("{0}".format(str(err)))
        return None
    # A missing action used to be reported as "list index out of range".
    logger.error("Operation '{0}' not found in connector info".format(action))
    return None
def api_request(config, url, method='GET', params=None, body=None, connector_info=None):
    """Send an authenticated request and return the decoded JSON body.

    Args:
        config: Connector configuration; ``api_key`` supplies the bearer
            token and ``verify_ssl`` is forwarded as the TLS ``verify`` flag.
        url: Full endpoint URL. A URL without a scheme is sent over HTTPS.
        method: HTTP method name.
        params: Optional query-string parameters.
        body: Optional request body, already serialised.
        connector_info: Accepted for interface compatibility; not used here.

    Returns:
        The parsed JSON body of a 200 or 201 response.

    Raises:
        ConnectorError: On a transport failure or any other status code.
    """
    # The scheme check used to be applied to a local copy of server_url that
    # was never used; apply it to the URL that is actually requested.
    if not url.lower().startswith(('http://', 'https://')):
        url = 'https://' + url
    headers = {'Authorization': f'Bearer {config.get("api_key")}',
               'Accept': 'application/json',
               'Content-Type': 'application/json'}
    try:
        response = request(method=method, url=url, params=params, headers=headers, data=body,
                           verify=config.get('verify_ssl'))
    except req_exceptions.RequestException as err:
        logger.exception(err)
        raise ConnectorError(str(err))
    if response.status_code in (200, 201):
        return response.json()
    # The message used to be built and then dropped, so callers received None
    # for every HTTP error.
    err_msg = "Status Code: {0} Details: {1}".format(response.status_code, response.reason)
    logger.error(err_msg)
    raise ConnectorError(err_msg)
def check_health(config):
    """Check that the API is reachable with the given configuration.

    Delegates to ``_check_health`` so the probe logic lives in one place.

    Args:
        config: Connector configuration.

    Returns:
        ``True`` when the check passes.

    Raises:
        ConnectorError: When the check does not pass. Failures used to be
            logged and swallowed, which made them indistinguishable from
            success for callers that ignore the return value.
    """
    if _check_health(config, None):
        return True
    message = 'Health check failed; see the connector logs for details'
    logger.error(message)
    raise ConnectorError(message)
def _check_health(config, connector_info):
    """Probe the API by listing organizations.

    Args:
        config: Connector configuration; ``server_url`` may be given with or
            without a scheme.
        connector_info: Forwarded to ``api_request``.

    Returns:
        ``True`` when the request succeeds.

    Raises:
        ConnectorError: If ``server_url`` is missing, the request fails, or
            no response payload is returned.
    """
    host = str(config.get('server_url') or '').strip().rstrip('/')
    if not host:
        raise ConnectorError("Configuration value 'server_url' is required")
    # The other operations prepend 'https://' themselves; accept both forms
    # here so one configuration value works for all of them.
    base_url = host if host.lower().startswith(('http://', 'https://')) else 'https://' + host
    url = '{0}/api/v1/organizations'.format(base_url)
    try:
        resp = api_request(config, url=url, method='GET', params={}, connector_info=connector_info)
    except Exception as e:
        # Errors used to be logged and dropped, so the check never failed.
        logger.error(e)
        raise ConnectorError(str(e))
    if resp is None:
        raise ConnectorError('Health check request returned no data')
    return True
def get_organizations(config, parms, connector_info):
    """List organizations through ``GET /api/v1/organizations``.

    Args:
        config: Connector configuration; ``server_url`` may be given with or
            without a scheme.
        parms: Operation parameters; not used by this operation.
        connector_info: Forwarded to ``api_request``.

    Returns:
        The value returned by ``api_request``.
    """
    host = str(config.get('server_url') or '').strip().rstrip('/')
    # 'https://' used to be prepended unconditionally, which produced
    # 'https://https://...' for a server_url that already had a scheme.
    base_url = host if host.lower().startswith(('http://', 'https://')) else 'https://' + host
    endpoint = '{0}/api/v1/organizations'.format(base_url)
    return api_request(config, endpoint, method='GET', params={}, connector_info=connector_info)
def get_networks(config, parms, connector_info):
    """List the networks of one organization.

    Args:
        config: Connector configuration; ``server_url`` may be given with or
            without a scheme.
        parms: Operation parameters; ``organizationID`` is required.
        connector_info: Forwarded to ``api_request``.

    Returns:
        The value returned by ``api_request``.

    Raises:
        ConnectorError: If ``organizationID`` is missing or empty.
    """
    organization_id = parms.get('organizationID')
    if organization_id in (None, ''):
        # A missing ID used to be formatted into the path as "None".
        raise ConnectorError("Parameter 'organizationID' is required")
    host = str(config.get('server_url') or '').strip().rstrip('/')
    # 'https://' used to be prepended unconditionally, which produced
    # 'https://https://...' for a server_url that already had a scheme.
    base_url = host if host.lower().startswith(('http://', 'https://')) else 'https://' + host
    endpoint = '{0}/api/v1/organizations/{1}/networks'.format(base_url, organization_id)
    return api_request(config, endpoint, method='GET', params={}, connector_info=connector_info)
def get_network_firewall_rules(config, parms, connector_info):
    """Fetch the L7 firewall rules of one network.

    Args:
        config: Connector configuration; ``server_url`` may be given with or
            without a scheme.
        parms: Operation parameters; ``networkId`` is required.
        connector_info: Forwarded to ``api_request``.

    Returns:
        The value returned by ``api_request``.

    Raises:
        ConnectorError: If ``networkId`` is missing or empty.
    """
    network_id = parms.get('networkId')
    if network_id in (None, ''):
        # A missing ID used to be formatted into the path as "None".
        raise ConnectorError("Parameter 'networkId' is required")
    host = str(config.get('server_url') or '').strip().rstrip('/')
    # 'https://' used to be prepended unconditionally, which produced
    # 'https://https://...' for a server_url that already had a scheme.
    base_url = host if host.lower().startswith(('http://', 'https://')) else 'https://' + host
    # Trailing slash dropped to match the documented endpoint path.
    endpoint = '{0}/api/v1/networks/{1}/appliance/firewall/l7FirewallRules'.format(base_url, network_id)
    return api_request(config, endpoint, method='GET', connector_info=connector_info)
def update_network_firewall_rules(config, parms, connector_info):
    """Append new L7 firewall rules to a network's existing rule set.

    The current rules are fetched first and the new rule(s) are added after
    them, so existing rules are kept rather than overwritten.

    Args:
        config: Connector configuration; ``server_url`` may be given with or
            without a scheme.
        parms: Operation parameters. ``networkId`` is required. ``rule`` is
            required and may be a single rule object, a list of rule objects,
            or a JSON string encoding either.
        connector_info: Forwarded to ``api_request``.

    Returns:
        The value returned by ``api_request`` for the PUT request.

    Raises:
        ConnectorError: If a required parameter is missing or malformed, or
            if the current rules cannot be read.
    """
    network_id = parms.get('networkId')
    if network_id in (None, ''):
        raise ConnectorError("Parameter 'networkId' is required")

    new_rules = parms.get('rule')
    if isinstance(new_rules, str):
        try:
            new_rules = json.loads(new_rules)
        except ValueError as err:
            raise ConnectorError("Parameter 'rule' is not valid JSON: {0}".format(err))
    if isinstance(new_rules, dict):
        new_rules = [new_rules]
    if not isinstance(new_rules, list) or not new_rules:
        # A missing rule used to be appended as None, and a list was nested
        # as a single element.
        raise ConnectorError("Parameter 'rule' must be a rule object or a non-empty list of rule objects")

    host = str(config.get('server_url') or '').strip().rstrip('/')
    # 'https://' used to be prepended unconditionally, which produced
    # 'https://https://...' for a server_url that already had a scheme.
    base_url = host if host.lower().startswith(('http://', 'https://')) else 'https://' + host
    # Trailing slash dropped to match the documented endpoint path.
    endpoint = '{0}/api/v1/networks/{1}/appliance/firewall/l7FirewallRules'.format(base_url, network_id)

    current = api_request(config, endpoint, method='GET', connector_info=connector_info)
    if not isinstance(current, dict):
        # Never PUT without the current rule set: that would replace every
        # existing rule with only the new ones.
        raise ConnectorError('Could not read the current L7 firewall rules; update aborted')

    payload = dict(current)
    existing = payload.get('rules')
    # If 'rules' doesn't exist yet, start from an empty list.
    payload['rules'] = (list(existing) if isinstance(existing, list) else []) + new_rules
    return api_request(config, endpoint, method='PUT', body=json.dumps(payload),
                       connector_info=connector_info)
def get_network_firewall_rules_application_categories(config, parms, connector_info):
    """Fetch the L7 firewall application categories of one network.

    Args:
        config: Connector configuration; ``server_url`` may be given with or
            without a scheme.
        parms: Operation parameters; ``networkId`` is required.
        connector_info: Forwarded to ``api_request``.

    Returns:
        The value returned by ``api_request``.

    Raises:
        ConnectorError: If ``networkId`` is missing or empty.
    """
    network_id = parms.get('networkId')
    if network_id in (None, ''):
        # A missing ID used to be formatted into the path as "None".
        raise ConnectorError("Parameter 'networkId' is required")
    host = str(config.get('server_url') or '').strip().rstrip('/')
    # 'https://' used to be prepended unconditionally, which produced
    # 'https://https://...' for a server_url that already had a scheme.
    base_url = host if host.lower().startswith(('http://', 'https://')) else 'https://' + host
    # The path used to contain a doubled slash ('/api/v1//networks').
    endpoint = '{0}/api/v1/networks/{1}/appliance/firewall/l7FirewallRules/applicationCategories'.format(
        base_url, network_id)
    return api_request(config, endpoint, method='GET', connector_info=connector_info)
# Dispatch table: operation name -> handler taking
# (config, parms, connector_info).
operations = {
    'get_network_firewall_rules': get_network_firewall_rules,
    'update_network_firewall_rules': update_network_firewall_rules,
    'get_network_firewall_rules_application_categories':
        get_network_firewall_rules_application_categories,
    'get_organizations': get_organizations,
    'get_networks': get_networks,
}
Thanks Gurveer. We will incorporate these changes in the next version of the connector.
The Fortinet Security Fabric brings together the concepts of convergence and consolidation to provide comprehensive cybersecurity protection for all users, devices, and applications and across all network edges.
Copyright 2024 Fortinet, Inc. All Rights Reserved.