hello, i have problem when i getting log from FortiAnalyzer
This function is designed to retrieve logs from FortiAnalyzer and extract the application names from those logs.
However, I'm encountering an issue where not all logs are being retrieved consistently.
Some get log requests return the expected results, while others fail without any clear reason.
def process_logs_for_policy_two_interfaces(policies_two_interfaces, src_interface_name, des_interface_name, offset, last_offset):
# Ensure policies_one_interface is always a list
global error_count
if isinstance(policies_two_interfaces, dict): # If a single policy is passed
policies_two_interfaces = [policies_two_interfaces] # Convert it to a list
session_token = None
# Check for existing policies before logging in
policies_to_process = []
for policy in policies_two_interfaces[:]: # Use a slice to iterate over a copy of the list
policy_name = policy["name"]
policy_id = policy["policyid"]
existing_policy = check_existing_policy(policy_id)
state = save_to_excel_for_two_interfaces(src_interface_name, des_interface_name, policy_id, "",
policy_name, "", "", "")
if existing_policy:
print( f"Policy name '{policy_name}' already exists in sheet '{existing_policy['sheet_name']}'. Reusing data...")
# Extract the row data (excluding the interface name)
row_data = existing_policy["row_data"]
policy_name = row_data[1] # Policy Name
app_unknown = row_data[3] # Unknown Apps
total_logs = row_data[4] # total logs
offset_policy = row_data[5] # offset
all_app_name = row_data[6:] # App Names
# Save the policy with the new interface name
state = save_to_excel_for_two_interfaces(src_interface_name, des_interface_name, policy_id, all_app_name,
policy_name, app_unknown, total_logs, offset_policy)
if state:
break
else:
# Add the policy to the list of policies to process
policies_to_process.append(policy)
# If no policies need processing, exit early
if not policies_to_process:
print("All policies already exist in the Excel file. No processing needed.")
return
# Login to FortiAnalyzer (only if there are policies to process)
login_payload = {
"method": "exec",
"params": [
{
"data": {
"user": f"{analyzer_username}", # Replace with your analyzer_username if needed
"passwd": f"{analyzer_password}" # Replace with your analyzer_password if needed
},
"url": "/sys/login/user"
}
],
"id": 1
}
print("Logging in to FortiAnalyzer...")
attempt_login = 0
max_retry = 2
retry_delay = 5 # Delay between retries in seconds
while attempt_login <= max_retry:
try:
login_resp = requests.post(base_analyzer_url, json=login_payload, verify=False)
login_resp.raise_for_status() # Raise an exception for HTTP errors
login_data = login_resp.json()
session_token = login_data.get("session")
if session_token:
print(f"Login successful! Session token: {session_token}")
break # Exit the retry loop if login is successful
else:
print("Failed to retrieve session token.")
except requests.exceptions.RequestException as e:
print(f"Failed to connect to FortiAnalyzer: {e}")
attempt_login += 1
if attempt_login <= max_retry:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
if not session_token:
print("Max retries reached. Unable to log in. Exiting.")
return
print(f"Session token: {session_token}")
# Process only the policies that need to be processed
for policy in policies_to_process:
all_app_name = set()
policy_name = policy["name"]
policy_id = policy["policyid"]
app_unknown = 0
print(f"Processing logs for policy: {policy_name} (Policy ID: {policy_id})")
attempt_search = 0
# Create a log search task for the policy
search_payload = {
"id": "123456789",
"jsonrpc": "2.0",
"method": "add",
"params": [
{
"apiver": 3,
"case-sensitive": False,
"device": [
{"devid": f"{device_id}"} # Replace with your device ID
],
"filter": f"policyid=={policy_id}", # Filter by policy ID
"logtype": "traffic", # Log type is traffic
"time-order": "desc",
"url": f"/logview/adom/{adom}/logsearch",
}
],
"session": session_token
}
while attempt_search <= max_retry:
try:
print("\nCreating search task...")
search_resp = requests.post(base_analyzer_url, json=search_payload, verify=False)
search_data = search_resp.json()
tid = search_data.get("result", {}).get("tid")
if tid:
print(" create search task. 'tid' found.")
break
else:
print("Failed to retrieve 'tid' token.")
except requests.exceptions.RequestException as e:
print(f"Failed to connect to FortiAnalyzer to get 'tid': {e}")
attempt_search += 1
if attempt_search <= max_retry:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
if not tid:
print("Max retries reached. Unable to get 'tid'. Exiting.")
return
print(f"Search task created with tid: {tid}")
# Poll for search task completion
logs_payload = {
"id": "123456789",
"jsonrpc": "2.0",
"method": "get",
"params": [
{
"apiver": 3,
"offset": 0,
"limit": 1000,
"url": f"/logview/adom/{adom}/logsearch/{tid}"
}
],
"session": session_token
}
print("\nPolling for search task progress...")
total_logs = 0
max_poll_attempts = 60 # Maximum 1 minute wait (60 attempts * 1 second)
poll_attempt = 0
while poll_attempt < max_poll_attempts:
try:
count_resp = requests.post(base_analyzer_url, json=logs_payload, verify=False,timeout=10)
count_resp.raise_for_status() # Raise HTTP errors
count_data = count_resp.json()
result = count_data.get("result", {})
progress = result.get("percentage", 0)
total_logs = result.get("total-count", 0)
task_status = result.get("status", "")
print(f"Progress: {progress}%, Total logs: {total_logs}, Status: {task_status}")
# Break conditions
if progress == 100:
print("Search task completed successfully")
break
if total_logs >= last_offset:
print(f"Reached target log count: {total_logs}")
break
if task_status == "error":
print("Search task failed")
break
except Exception as e:
print(f"Polling error: {str(e)}")
if "session" in str(e).lower():
# Session likely expired - refresh and retry
logs_payload["session"] = session_token
poll_attempt += 1
time.sleep(1) # Polling interval
else:
print(f"Max polling attempts reached ({max_poll_attempts}) without completion")
# Handle timeout case (cancel task, etc.)
return
# Final adjustment of last_offset
if 0 < total_logs < last_offset:
print(f"Adjusting last_offset from {last_offset} to {total_logs}")
last_offset = total_logs
elif total_logs == 0:
last_offset = total_logs
print("Warning: No logs found for this policy")
# Retrieve logs using pagination
all_logs = []
limit = 1000 # Use 1000 logs per page
max_attempts = 2 # Maximum attempts to retry fetching logs
error_count = 0
print("\nRetrieving logs in pages...")
while offset < last_offset :
logs_payload = {
"id": "123456789",
"jsonrpc": "2.0",
"method": "get",
"params": [
{
"apiver": 3,
"offset": offset,
"limit": limit,
"url": f"/logview/adom/{adom}/logsearch/{tid}"
}
],
"session": session_token
}
# Retry mechanism for fetching logs
attempt = 0
while attempt < max_attempts:
logs_resp = requests.post(base_analyzer_url, json=logs_payload, verify=False)
time.sleep(1)
logs_resp = requests.post(base_analyzer_url, json=logs_payload, verify=False)
logs_data = logs_resp.json()
data = logs_data["result"].get("data", [])
if len(data) == 0:
print(f"Error fetching logs at offset {offset}")
if attempt == 2:
state = save_offset_error_to_excel(src_interface_name, des_interface_name, policy_name, policy_id,offset, 2)
if state:
offset+= limit
break
error_count += 1
attempt += 1
time.sleep(5) # Wait before retrying
continue
if state:
break
all_logs.extend(data)
for log in all_logs:
if 'app' in log and 'appid' in log and 'appcat' in log:
app_name_with_id = f"{log['app'].lower()} ({log['appid']})" # Combine app name and app ID
all_app_name.add(app_name_with_id) # Add to the set
else:
app_unknown += 1
all_logs.clear()
break # Exit the retry loop if data is fetched successfully
print(
f"Retrieved {len(data)} logs at offset {offset} %{int(100 * (offset + len(data)) / last_offset)} | app name:{all_app_name}")
# Save progress after processing each offset
save_state = save_to_excel_for_two_interfaces(src_interface_name, des_interface_name, policy_id, all_app_name,
policy_name, app_unknown, last_offset, offset + limit)
if save_state:
break
offset += len(data) # Move to the next batch
print(f"\nTotal logs retrieved for policy {policy_name}: {last_offset}")
print(f"Total unique app names found: {len(all_app_name)}")
print(f"Total logs with unknown app: {app_unknown}\n")
if error_count == 0 :
# Save app names to Excel
save_state = save_to_excel_for_two_interfaces(src_interface_name, des_interface_name, policy_id, all_app_name,
policy_name, app_unknown, last_offset, offset, is_finished=True)
if save_state:
break
# Remove values for all_app_name and app_unknown for the next policy
all_app_name.clear()
# Delete the search task (cleanup)
delete_payload = {
"session": session_token,
"id": "123456789",
"jsonrpc": "2.0",
"method": "delete",
"params": [
{
"apiver": 3,
"url": f"/logview/adom/{adom}/logsearch/{tid}"
}
]
}
attempt_delete = 0
while attempt_delete <= max_retry:
try:
print("\nDeleting search task...")
delete_resp = requests.post(base_analyzer_url, json=delete_payload, verify=False)
delete_data = delete_resp.json()
if delete_data:
print("\nDeleted search task...")
break
else:
print("Failed to Deleting search task.")
except requests.exceptions.RequestException as e:
print(f"Failed to connect to FortiAnalyzer to Deleting search task: {e}")
attempt_delete += 1
if attempt_delete <= max_retry:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
if not delete_data:
print("Max retries reached. Unable to delete search task. Exiting.")
return
print("Deleted search task...")
# Logout from FortiAnalyzer with retry mechanism
logout_payload = {
"method": "exec",
"params": [
{
"url": "/sys/logout"
}
],
"session": session_token,
"id": 2
}
print("\nLogging out...")
attempt_logout = 0
max_retry = 2
retry_delay = 5 # Delay between retries in seconds
while attempt_logout <= max_retry:
try:
logout_resp = requests.post(base_analyzer_url, json=logout_payload, verify=False)
logout_resp.raise_for_status() # Raise an exception for HTTP errors
logout_data = logout_resp.json()
if logout_data:
print("Logout successful!")
break
else:
print("Failed to logout.")
except requests.exceptions.RequestException as e:
print(f"Failed to log out: {e}")
attempt_logout += 1
if attempt_logout <= max_retry:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
if attempt_logout > max_retry:
print("Max retries reached. Unable to log out.")
this part related to get logs request
# Retrieve logs using pagination
all_logs = []
limit = 1000 # Use 1000 logs per page
max_attempts = 2 # Maximum attempts to retry fetching logs
error_count = 0
print("\nRetrieving logs in pages...")
while offset < last_offset :
logs_payload = {
"id": "123456789",
"jsonrpc": "2.0",
"method": "get",
"params": [
{
"apiver": 3,
"offset": offset,
"limit": limit,
"url": f"/logview/adom/{adom}/logsearch/{tid}"
}
],
"session": session_token
}
# Retry mechanism for fetching logs
attempt = 0
while attempt < max_attempts:
logs_resp = requests.post(base_analyzer_url, json=logs_payload, verify=False)
time.sleep(1)
logs_resp = requests.post(base_analyzer_url, json=logs_payload, verify=False)
logs_data = logs_resp.json()
data = logs_data["result"].get("data", [])
if len(data) == 0:
print(f"Error fetching logs at offset {offset}")
if attempt == 2:
state = save_offset_error_to_excel(src_interface_name, des_interface_name, policy_name, policy_id,offset, 2)
if state:
offset+= limit
break
error_count += 1
attempt += 1
time.sleep(5) # Wait before retrying
continue
if state:
break
all_logs.extend(data)
for log in all_logs:
if 'app' in log and 'appid' in log and 'appcat' in log:
app_name_with_id = f"{log['app'].lower()} ({log['appid']})" # Combine app name and app ID
all_app_name.add(app_name_with_id) # Add to the set
else:
app_unknown += 1
all_logs.clear()
break # Exit the retry loop if data is fetched successfully
print(
f"Retrieved {len(data)} logs at offset {offset} %{int(100 * (offset + len(data)) / last_offset)} | app name:{all_app_name}")
Hello,
Thank you for using the Community Forum. I will seek to get you an answer or help. We will reply to this thread with an update as soon as possible.
Thanks,
Hello,
We are still looking for an answer to your question.
We will come back to you ASAP.
Thanks,
Hello Mohammed,
I have found this solution, can you tell me if it helps?
To troubleshoot the issue of inconsistent log retrieval from FortiAnalyzer, follow these steps:
2. Verify API Requests:
3. Review Error Handling:
4. Check Network Connectivity:
5. Inspect Log Search Task:
6. Pagination and Offset Management:
7. Debugging and Logging:
8. Review FortiAnalyzer Logs: Check the FortiAnalyzer logs for any errors or warnings that might indicate issues with log retrieval. By following these steps, you should be able to identify and resolve the issue with inconsistent log retrieval from FortiAnalyzer. If the problem persists, consider reaching out to Fortinet support for further assistance.
User | Count |
---|---|
2534 | |
1351 | |
795 | |
641 | |
455 |
The Fortinet Security Fabric brings together the concepts of convergence and consolidation to provide comprehensive cybersecurity protection for all users, devices, and applications and across all network edges.
Copyright 2025 Fortinet, Inc. All Rights Reserved.