Description
This article describes how to write SQL queries that can be used in a report.
Scope
FortiAnalyzer.
Solution
In FortiAnalyzer, under Reports -> Datasets, there is a wide variety of predefined queries, which cover most use cases for the data available in the different log types.
The Dataset names generally indicate what results to expect, and the SQL syntax of each Dataset can be reviewed for more detail and its output tested.
The Search box in the Datasets section filters not only on the Dataset names but also on the contents of the SQL queries.
This allows easy searching for specific log columns, log tables, functions, macros, variables, etc.
Fortinet recommends using primarily these predefined Datasets, mainly because they are:
- Optimized to work correctly with the syntax specifics of our report engine.
- Optimized to perform well within the hcache context, ensuring fast report generation and optimal resource utilization.
- Updated by firmware upgrades when the log schema, functions, or variables change.
- Fully supported by the Fortinet Technical Support, in case of problems.
If a Dataset returns more columns/data than required, or if the columns are not in the desired order, there is no need to modify the Dataset itself.
It can still be used in a custom chart, which allows removing columns, applying different formatting and filters, changing the positions and names of the columns, overriding the Dataset's ORDER BY, setting a Limit, etc.
If additional columns are needed, the best practice is to find the predefined Dataset that is closest to the requirement and clone it.
The SQL query in the cloned Dataset can then be modified, which usually requires less effort and knowledge than creating a new one from scratch.
If none of the predefined Datasets is close enough to the needs, review the 'Log View' tables.
Since the reports are based on the logs, exporting a table view using chart builder normally helps.
The resulting query can then be modified if additional columns or logic are needed.
The FortiView charts can also be exported as report charts, but the FortiView queries are usually quite complex and may be hard to understand and modify.
If none of the above options can be used, FortiAnalyzer allows creating custom Datasets from scratch.
This requires good SQL knowledge, as well as understanding of the log schema of the respective logging Device.
It is still a good idea to go through the predefined datasets, in order to understand the FortiAnalyzer specific SQL syntax.
Additional information regarding the FortiAnalyzer SQL syntax is available in the NSE 5 training documentation.
To create a new custom dataset, go to Reports -> Datasets and select 'Create New'.
- Choose Log Type.
This is the base table to search in.
The specific tables, which depend on the Device and time period, are represented by the $log macro used below.
For example, to search for something related to traffic, the 'Traffic' table must be selected as 'Log Type'.
- Type or paste the SQL query in the Query text area.
- Test the query using the options on the right: set the test time period, and either select all Devices or specify the Devices/VDOMs to test with.
Note.
Only administrators assigned with the default 'Super_User' access profile can run test queries.
IMPORTANT.
$filter has to appear in the WHERE clause for the time filter on the right to be applied.
This also affects any filters later imposed at the chart or the report level.
Another point to consider is that the 'Test' button will not return all the available results but just a sample of them, based on the first 100 records of the selected log table, in order to help validate the logic of the query.
Note that if the field to be retrieved from the selected table is unknown, the global SQL query 'select * from $log' can be run.
This will show all columns of the tables included in $log.
For example, if 'Log Type Traffic' is selected, the output will include all columns of the traffic logs, and the data from the first 100 records.
One can then see the column names and what type of data each column contains, and use the required ones in the SQL query.
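As a minimal sketch of that workflow, the two queries below could be tested one after the other. The column names (srcip, dstip, policyid) are borrowed from the traffic examples in this article and assume that 'Traffic' is selected as the Log Type. The `--` comments are only annotations for the reader and may need to be removed before pasting a query into the Query text area.

```sql
-- Query 1: list every column of the selected log type.
-- Including $filter keeps the test time period and Device selection in effect.
select *
from $log
where $filter

-- Query 2 (run separately): keep only the columns needed for the report.
select srcip, dstip, policyid
from $log
where $filter
```

Each statement is meant to be tested on its own, since a Dataset holds a single query.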
Use Case Example:
Report of traffic hitting specific policy from a specific VDOM for each day.
There are basically two approaches:
- Make the query as stripped down as possible and apply all the ordering and filtering on the upper levels (chart and report).
- Include everything in a single SQL query.
The first one favors reusability of a report with minor changes, while the second gives more control over, and insight into, the behavior from the start.
First approach.
Build a basic dataset.
select from_itime(itime)::date as datestamp, sum(rcvdbyte) as traffic_in, sum(sentbyte) as traffic_out, policyid, devid
from $log
where $filter
group by datestamp, policyid, devid
Apply the Dataset in a Chart.
Format the columns and specify the order if needed (when enabled in the Chart, 'Order By' overrides the ORDER BY clause of the Dataset).
In this example, the user wants to see the traffic for each day with the newest day appearing first, so 'Order By': datestamp descending is chosen.
Insert the chart in a new report.
On the Layout tab, one can add comments, pictures, tables etc., format the page in general and also insert a default or custom chart on the top left corner.
Chart-specific filters can be specified when inserting the Chart in the Layout, or after the chart is inserted, by selecting the chart -> Chart Properties.
Note.
Caution is needed when choosing log fields, as they have to match the columns of the log type the dataset is querying. For this specific example:
Alternatively, the filter can be applied on the whole report.
This is under the Settings tab of the report.
Time Period and Devices can be specified globally for the report, along with filters:
Second approach.
select from_itime(itime)::date as datestamp, sum(rcvdbyte) as traffic_in, sum(sentbyte) as traffic_out, policyid, devid
from $log
where $filter and policyid=1 and vd='root'
group by datestamp, policyid, devid
order by datestamp desc
Since the filtering and ordering are defined in the SQL query, no further filtering is required with this approach.
However, if the filters need to be modified in the future, changes to the SQL query are required.
This is a good approach for applying filters that never change, like excluding certain log records that are not part of the result, or can cause problems with the calculations.
Examples of such records are the intermediate traffic logs, generated every 2 minutes for long-lasting firewall sessions.
If not excluded, those may cause incorrect traffic statistics.
To filter out these log records, the following has to be added to the above WHERE clause (it is also a good idea to use this filter in the first approach):
FortiAnalyzer version 6.2:
…
where $filter and (logflag&1>0)
…
The (logflag&1>0) condition is a special string, which excludes the intermediate long session logs, records for session start, and other types of logs that are excluded from the traffic statistics.
FortiAnalyzer versions 5.6 and 6.0:
…
where $filter and logid_to_int(logid) not in (4,7,14,20)
…
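Put together, the second-approach query with the intermediate-log filter might look like the sketch below. It assumes FortiAnalyzer 6.2 (the logflag variant) and reuses the illustrative policyid=1 and vd='root' values from the use case above; on 5.6 or 6.0 the logid_to_int condition would take the place of the logflag condition.

```sql
select from_itime(itime)::date as datestamp, sum(rcvdbyte) as traffic_in, sum(sentbyte) as traffic_out, policyid, devid
from $log
where $filter and (logflag&1>0) and policyid=1 and vd='root'
group by datestamp, policyid, devid
order by datestamp desc
```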
Other SQL examples.
Example 1:
Check for packets denied by UTM for a source or destination matching '172.30.xx.xx':
select from_dtime(dtime) as tstamp, vd, srcip, dstip, srcport, dstport, srcintf, dstintf, service, msg, count(*) as totalnum
from $log
where $filter and utmaction='block' and (ipstr(srcip) like '172.30%' or ipstr(dstip) like '172.30%')
group by tstamp, srcip, dstip, srcport, dstport, srcintf, dstintf, vd, service, msg
order by totalnum desc
Example 2:
SQL query to report the top 10 hosts with the most packets dropped with the error message 'no session matched' or 'replay packet(allow_err), drop':
select from_dtime(dtime) as tstamp, vd, srcip, dstip, srcport, dstport, srcintf, dstintf, service, msg, count(*) as totalnum
from $log
where $filter and (msg='no session matched' or msg='replay packet(allow_err), drop')
group by tstamp, srcip, dstip, srcport, dstport, srcintf, dstintf, vd, service, msg
order by totalnum desc
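The query above returns one row per timestamp and flow, and it has no LIMIT, so a Limit would have to be set in the chart. If a strict per-host top 10 is wanted directly from the Dataset, one possible variant, offered only as a sketch, groups by srcip alone and adds a LIMIT. Treating the source IP as the "host" is an assumption, and the message strings are copied from the description of this example.

```sql
select srcip, count(*) as totalnum
from $log
where $filter and (msg='no session matched' or msg='replay packet(allow_err), drop')
group by srcip
order by totalnum desc
limit 10
```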
Example 3:
Top 10 traffic shapers by dropped bytes:
select shapersentname, shapingpolicyid, sum(coalesce(shaperdroprcvdbyte, 0)) as dropped_rcvd, sum(coalesce(shaperdropsentbyte, 0)) as dropped_sent, (sum(coalesce(shaperdroprcvdbyte, 0))+sum(coalesce(shaperdropsentbyte, 0))) as dropped_total
from $log where $filter and (logflag&1>0) and shapingpolicyid is not null
group by shapersentname, shapingpolicyid
order by dropped_total desc
limit 10
Example 4:
Combining data from different log types, in multiple SELECT statements:
select daystamp, sum(total_num) as total_num
from (
###( select $day_of_week as daystamp,
count(*) as total_num
from $log-virus
where $filter and nullifna(virus) is not null
group by daystamp)###
union all
###( select $day_of_week as daystamp,
count(*) as total_num
from $log-attack
where $filter and nullifna(attack) is not null
group by daystamp)###
union all
###( select $day_of_week as daystamp,
count(*) as total_num
from $log-app-ctrl
where $filter and lower(appcat)='botnet'
group by daystamp)###
) t
group by daystamp
order by daystamp
The available log types are:
$log-attack
$log-dlp
$log-event
$log-netscan
$log-app-ctrl
$log-emailfilter
$log-traffic
$log-virus
$log-webfilter
Example 5:
Using $flex_timestamp and the FortiAnalyzer Date/Time macros to adjust the timescale resolution.
The query below returns the average number of concurrent SSL VPN users/tunnels per time sample:
select $flex_timestamp as time_sample,
coalesce(count(distinct `user`), 0) as total_num_user,
coalesce(count(distinct tunnelid), 0) as total_num_tunnels
from $log
where $filter and subtype='vpn' and tunneltype like 'ssl-tunnel' and action in ('tunnel-up', 'tunnel-stats', 'tunnel-down') and tunnelid is not null
group by time_sample
When $flex_timestamp is used, the timescale changes automatically depending on the report time range:
Alternatively, some of the FortiAnalyzer Date/Time macros can be used for this type of queries:
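As a rough sketch of the macro-based alternative, the query from this example could use $day_of_week (the macro that appears in Example 4) in place of $flex_timestamp, which fixes the resolution to one sample per day of the week regardless of the report time range. The redundant coalesce() calls are dropped here because count() never returns NULL. Other Date/Time macros would be substituted in the same position.

```sql
select $day_of_week as time_sample,
count(distinct `user`) as total_num_user,
count(distinct tunnelid) as total_num_tunnels
from $log
where $filter and subtype='vpn' and tunneltype like 'ssl-tunnel' and action in ('tunnel-up', 'tunnel-stats', 'tunnel-down') and tunnelid is not null
group by time_sample
order by time_sample
```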
Example 6:
A relatively more complex query for monitoring the activity of remote VPN users.
It includes the users' public and tunnel IPs, and uses the 'string_agg(distinct ...)' function to aggregate the unique IP entries into one row for each user:
select coalesce(user_agg, xauthuser_agg) as user_src,
group_src,
(case when t_type like 'ipsec%' then 'IPsec' else 'SSL' end) as tunneltype,
string_agg(distinct ipstr(remip), ' \\ ') as remip,
string_agg(distinct ipstr(tunnelip), ' // ') as tunnelip,
from_dtime(min(s_time)) as start_time,
from_dtime(max(e_time)) as end_time,
sum(duration) as duration,
sum(total_num) as total_num
from (
select
devid,
vd,
t_type,
remip,
tunnelip,
string_agg(distinct user_agg, ' ') as user_agg,
string_agg(distinct xauthuser_agg, ' ') as xauthuser_agg,
coalesce(group_agg, xauthgroup_agg) as group_src,
tunnelid,
min(s_time) as s_time,
max(e_time) as e_time,
(case when min(s_time)=max(e_time) then max(max_duration) else max(max_duration)-min(min_duration) end) as duration,
sum(tunnelup) as total_num
from ###(
select
devid,
vd,
remip,
nullifna(`user`) as user_agg,
nullifna(`xauthuser`) as xauthuser_agg,
nullifna(`group`) as group_agg,
nullifna(`xauthgroup`) as xauthgroup_agg,
(case when tunneltype like 'ipsec%' then 'ipsec' else tunneltype end) as t_type,
tunnelid,
tunnelip,
min(coalesce(dtime, 0)) as s_time,
max(coalesce(dtime, 0)) as e_time,
max(coalesce(duration,0)) as max_duration,
min(coalesce(duration,0)) as min_duration,
sum((case when action='tunnel-up' then 1 else 0 end)) as tunnelup
from $log
where $filter and subtype='vpn' and (tunneltype like 'ipsec%' or tunneltype like '%tunnel') and action in ('tunnel-up', 'tunnel-stats', 'tunnel-down') and tunnelid is not null and tunnelid!=0
group by user_agg, xauthuser_agg, group_agg, xauthgroup_agg, devid, vd, remip, tunnelid, tunnelip, t_type
)### t
where (t_type like '%tunnel' or (t_type like 'ipsec%' and not (tunnelip is null or tunnelip='0.0.0.0')))
group by devid, vd, remip, t_type, tunnelip, tunnelid, group_src) tt
where group_src is not null
group by user_src, group_src, tunneltype
order by duration desc