FortiSOAR Discussions
gurveersingh
New Contributor

Cisco Meraki L7 Connector Fixes

Hello,

 

Here are the fixes for the Cisco Meraki L7 connector:

API Header: Updated the request headers to match the Cisco Meraki API requirements; the API key is now sent as "Authorization: Bearer <api_key>".

Health Check Endpoint: The health check now calls the /api/v1/organizations endpoint.

New Functions Added:

  1. get_organizations: Fetches the list of organizations and their details.
  2. get_networks: Retrieves the networks (including their network IDs) of the organization given by organizationID.

Firewall Rules Update Function:

The update_network_firewall_rules function has been updated to append new rules rather than overwriting existing ones. This ensures that any new rules you add will be integrated with the current rules instead of replacing them.

 

The input for the new "rule" parameter should look like this:

{
    "policy": "<string>",
    "type": "<string>",
    "value": "<string>"
}

 

connector.py

from .operations import _check_health , get_curr_oper_info, api_request, operations
from connectors.core.connector import Connector, get_logger, ConnectorError


# Module-level logger obtained from the connector framework's get_logger,
# registered under the name 'cisco_meraki_mx_l7_firewall'.
logger = get_logger('cisco_meraki_mx_l7_firewall')


class CiscoMeraki(Connector):
    """Connector entry point that dispatches operations and runs the health check."""

    def _build_connector_info(self):
        """Return the connector name/version dict handed to every operation."""
        return {"connector_name": self._info_json.get('name'),
                "connector_version": self._info_json.get('version')}

    def execute(self, config, operation, params, **kwargs):
        """Run the named operation and return its result.

        :param config: connector configuration dict.
        :param operation: key into the ``operations`` mapping.
        :param params: operation parameters dict.
        :raises ConnectorError: if the operation name is not registered or the
            lookup itself fails.
        """
        try:
            connector_info = self._build_connector_info()
            handler = operations.get(operation)
        except Exception as err:
            logger.exception(err)
            raise ConnectorError(err)

        # An unknown name used to surface as "'NoneType' object is not callable";
        # report it explicitly instead.
        if handler is None:
            message = 'Unsupported operation: {0}'.format(operation)
            logger.error(message)
            raise ConnectorError(message)

        return handler(config, params, connector_info)

    def check_health(self, config):
        """Probe the API through ``_check_health``.

        :raises ConnectorError: if the probe does not report success.
        """
        logger.info('starting health check')
        healthy = _check_health(config, self._build_connector_info())
        # _check_health reports success with a truthy value; anything else
        # means the probe did not succeed, so do not log a false "no errors".
        if not healthy:
            message = 'Health check failed: the Cisco Meraki API could not be reached or rejected the request'
            logger.error(message)
            raise ConnectorError(message)
        logger.info('Completed health check and no errors found')

 

operations.py

from requests import request, exceptions as req_exceptions
from connectors.core.connector import get_logger, ConnectorError
import json


# Module-level logger obtained from the connector framework's get_logger,
# registered under the name 'cisco_meraki_mx_l7_firewall'.
logger = get_logger('cisco_meraki_mx_l7_firewall')


def get_curr_oper_info(info_json, action):
    """Return the first entry of ``info_json['operations']`` whose 'operation' equals *action*.

    Any failure (missing key, no match, wrong input type) is logged and the
    function then returns None implicitly.
    """
    try:
        matches = []
        for entry in info_json.get('operations'):
            if entry['operation'] == action:
                matches.append(entry)
        # Indexing an empty list raises, which routes "no match" to the handler below.
        return matches[0]
    except Exception as err:
        logger.error("{0}".format(err))
        



def api_request(config, url, method='GET', params=None, body=None, connector_info=None):
    """Send an authenticated request to the Meraki API and return the decoded JSON body.

    :param config: connector configuration; ``api_key`` and ``verify_ssl`` are read.
    :param url: full endpoint URL. A missing scheme is filled in with https://
        and an accidentally doubled 'https://https://' prefix is collapsed.
    :param method: HTTP method name.
    :param params: query-string parameters.
    :param body: request body, already serialised by the caller.
    :param connector_info: accepted for signature compatibility; not used here.
    :return: parsed JSON of a 200/201 response.
    :raises ConnectorError: on a transport failure or any other status code.
    """
    # Callers in this module build URLs two ways: some prepend 'https://' to
    # server_url and some use server_url as-is. Normalise here so both forms
    # work whether or not the configured server_url carries a scheme.
    if url.startswith('https://https://'):
        url = url[len('https://'):]
    elif not url.startswith(('https://', 'http://')):
        url = 'https://' + url

    headers = {'Authorization': f'Bearer {config.get("api_key")}',
               'Accept': 'application/json',
               'Content-Type': 'application/json'}

    try:
        response = request(method=method, url=url, params=params, headers=headers, data=body,
                           verify=config.get('verify_ssl'))
    except req_exceptions.RequestException as err:
        logger.exception(err)
        raise ConnectorError('Request to {0} failed: {1}'.format(url, err)) from err

    if response.status_code in (200, 201):
        return response.json()

    # Previously this message was built and then dropped, so callers received
    # None for every HTTP error. Surface it instead.
    err_msg = "Status Code: {0} Details: {1}".format(response.status_code, response.reason)
    logger.error(err_msg)
    raise ConnectorError(err_msg)
            


def check_health(config):
    """Run the connectivity probe without connector metadata.

    This used to be a line-for-line copy of ``_check_health``; it now delegates
    so the probe logic lives in one place. The result is whatever
    ``_check_health`` returns.
    """
    return _check_health(config, None)
        
        
def _check_health(config, connector_info):
    """Probe the API by requesting the organizations endpoint.

    :param config: connector configuration; ``server_url`` is read here.
    :param connector_info: connector name/version dict, forwarded to ``api_request``.
    :return: True when the probe succeeds.
    :raises ConnectorError: if server_url is missing, the request fails, or no
        response body is returned.
    """
    server_url = str(config.get('server_url') or '').strip().rstrip('/')
    if not server_url:
        raise ConnectorError('Health check failed: server_url is not configured')
    # The other operations in this module assume a bare host name, so add the
    # scheme only when the configured value does not already have one.
    if not server_url.startswith(('https://', 'http://')):
        server_url = 'https://' + server_url
    url = '{0}/api/v1/organizations'.format(server_url)

    try:
        resp = api_request(config, url=url, method='GET', params={}, connector_info=connector_info)
    except Exception as e:
        # Failures used to be logged and swallowed, which made every health
        # check look successful; report them to the caller instead.
        logger.error(e)
        raise ConnectorError('Health check failed: {0}'.format(e)) from e

    if resp is None:
        raise ConnectorError('Health check failed: no successful response from {0}'.format(url))
    return True
        
        
def get_organizations(config, parms, connector_info):
    """Request the organizations endpoint and return what ``api_request`` returns.

    *parms* is accepted for the common operation signature and is not read.
    """
    host = config.get('server_url')
    endpoint = f'https://{host}/api/v1/organizations'
    return api_request(
        config,
        endpoint,
        method='GET',
        params={},
        connector_info=connector_info,
    )
  
def get_networks(config, parms, connector_info):
    """Request the networks of one organization.

    :param config: connector configuration; ``server_url`` is read here.
    :param parms: operation parameters; ``organizationID`` is required.
    :param connector_info: forwarded to ``api_request``.
    :return: whatever ``api_request`` returns for the networks endpoint.
    :raises ConnectorError: if ``organizationID`` is missing or empty.
    """
    organization_id = parms.get('organizationID')
    # Without this guard a missing ID is formatted into the path as "None".
    if not organization_id:
        raise ConnectorError('organizationID is required to list networks')

    networks_url = 'https://{0}/api/v1/organizations/{1}/networks'.format(
        config.get('server_url'), organization_id)
    return api_request(config, networks_url, method='GET', params={}, connector_info=connector_info)


def get_network_firewall_rules(config, parms, connector_info):
    """Request the L7 firewall rules of one network.

    :param config: connector configuration; ``server_url`` is read here.
    :param parms: operation parameters; ``networkId`` is required.
    :param connector_info: forwarded to ``api_request``.
    :return: whatever ``api_request`` returns for the l7FirewallRules endpoint.
    :raises ConnectorError: if ``networkId`` is missing or empty.
    """
    network_id = parms.get('networkId')
    # Without this guard a missing ID is formatted into the path as "None".
    if not network_id:
        raise ConnectorError('networkId is required to fetch L7 firewall rules')

    rules_url = 'https://{0}/api/v1/networks/{1}/appliance/firewall/l7FirewallRules/'.format(
        config.get('server_url'), network_id)
    return api_request(config, rules_url, method='GET', connector_info=connector_info)

  
def update_network_firewall_rules(config, parms, connector_info):
    """Append one L7 firewall rule to a network's existing rules.

    The current rules are fetched with GET, the new rule is added to the end of
    the list, and the combined document is written back with PUT.

    :param config: connector configuration; ``server_url`` is read here.
    :param parms: operation parameters; ``networkId`` and ``rule`` are required.
        ``rule`` is a dict, or a JSON string that decodes to one.
    :param connector_info: forwarded to ``api_request``.
    :return: whatever ``api_request`` returns for the PUT.
    :raises ConnectorError: if a required parameter is missing or malformed, or
        the current rules cannot be retrieved.
    """
    network_id = parms.get('networkId')
    if not network_id:
        raise ConnectorError('networkId is required to update L7 firewall rules')

    new_rule = parms.get('rule')
    if isinstance(new_rule, str):
        # NOTE(review): the platform may hand the rule over as JSON text rather
        # than a dict - confirm against the operation's field type.
        try:
            new_rule = json.loads(new_rule)
        except ValueError as err:
            raise ConnectorError('rule is not valid JSON: {0}'.format(err)) from err
    # Validate before any request so a bad input never reaches the PUT.
    if not isinstance(new_rule, dict) or not new_rule:
        raise ConnectorError('rule must be a non-empty object with "policy", "type" and "value"')

    rules_url = 'https://{0}/api/v1/networks/{1}/appliance/firewall/l7FirewallRules/'.format(
        config.get('server_url'), network_id)

    current = api_request(config, rules_url, method='GET', connector_info=connector_info)
    # Refuse to PUT when the existing rules could not be read; writing back a
    # list that holds only the new rule would wipe the current configuration.
    if not isinstance(current, dict):
        raise ConnectorError('Could not retrieve the current L7 firewall rules; update aborted')

    # Tolerate a missing or null 'rules' entry by starting from an empty list.
    merged_rules = list(current.get('rules') or [])
    merged_rules.append(new_rule)
    current['rules'] = merged_rules

    return api_request(config, rules_url, method='PUT', body=json.dumps(current),
                       connector_info=connector_info)



def get_network_firewall_rules_application_categories(config, parms, connector_info):
    """Request the L7 firewall application categories of one network.

    :param config: connector configuration; ``server_url`` is read here.
    :param parms: operation parameters; ``networkId`` is required.
    :param connector_info: forwarded to ``api_request``.
    :return: whatever ``api_request`` returns for the applicationCategories endpoint.
    :raises ConnectorError: if ``networkId`` is missing or empty.
    """
    network_id = parms.get('networkId')
    # Without this guard a missing ID is formatted into the path as "None".
    if not network_id:
        raise ConnectorError('networkId is required to fetch L7 application categories')

    # The path previously contained a doubled slash ("/api/v1//networks").
    categories_url = (
        'https://{0}/api/v1/networks/{1}/appliance/firewall/l7FirewallRules/applicationCategories'
    ).format(config.get('server_url'), network_id)

    return api_request(config, categories_url, method='GET', connector_info=connector_info)

# Dispatch table: operation name received by the connector -> handler function.
# Every handler is called as handler(config, params, connector_info).
operations = dict(
    get_network_firewall_rules=get_network_firewall_rules,
    update_network_firewall_rules=update_network_firewall_rules,
    get_network_firewall_rules_application_categories=get_network_firewall_rules_application_categories,
    get_organizations=get_organizations,
    get_networks=get_networks,
)

 

1 REPLY 1
Prerna_Joshi
Staff
Staff

Thanks Gurveer. We will incorporate these changes in the next version of the connector.