In this blog, I am going to demonstrate how to create your own consolidated audit log archive across many MySQL instances. In a followup I'll show how to extend this example by creating a simple hash chain on that archive – so you can prove whether or not it's been modified or tainted in any way, and if so, where.
In this blog's example code I'll be using a new extension to the MySQL audit_log_read() function, and I'll show why the MySQL X DevAPI for SQL can make certain tasks much simpler. This new audit reading feature was released in MySQL 8.0.22 Enterprise Edition. This blog relies on the MySQL Shell running in both SQL and Python modes.
A few additional techniques I will show include:
- Extracting rows from JSON audit data – transforming JSON data to tabular form using the JSON_TABLE function.
- Inserting those rows from an audited database into an audit data archive MySQL database. As you'll see, the mysqlx API makes this far simpler.
A few factoids. As many DBAs can tell you, whether because regulations prevent it or for other security reasons, they often don't have, and frequently don't want, access to the underlying OS of the server where MySQL is running. No SSH for DBAs! In general, the fewer things that go onto the OS running a database service the better, especially from a security standpoint.
For many reasons – security, analysis, etc. – it's a best practice to get audit data off of the MySQL server frequently and onto some central data store where you can see activity across all your MySQL servers. Why?
- Easier to run analysis
- Prevents the data from being corrupted
- Regulatory Requirements
- Storage Management
Certainly there are various ways to move the audit data using a variety of products. This is just one possible design pattern you might use, and it could easily be adapted for third-party integration or changed to write the data to an object store or some other audit data repository.
Terminology-wise, I'll call the server that consolidates the audit data the "archiving server". That server is going to have a user account I will call the "auditarchiver" that can only insert and select on the audit_data table. (It can't change data.)
For each server where audit data will be extracted, there will be an account that connects via SQL and reads the JSON data from the audit files.
So first, let's log in as an admin on the archiving MySQL server instance – I will use root. This entire example requires the MySQL Shell (which rocks!). It includes Python for the scheduled batch archiving from the targeted servers that the audit data is pulled from.
Step 1 – Audit Archive database Setup.
Create the schema and table on the archive server
On the audit data archiving server
> mysqlsh
\sql
\connect root@<archiving server>;

create schema audit_archive;
use audit_archive;

CREATE TABLE `audit_data` (
  `server_uuid` varchar(45) NOT NULL,
  `id` int NOT NULL,
  `ts` timestamp NOT NULL,
  `class` varchar(20) DEFAULT NULL,
  `event` varchar(80) DEFAULT NULL,
  `the_account` varchar(80) DEFAULT NULL,
  `login_ip` varchar(200) DEFAULT NULL,
  `login_os` varchar(200) DEFAULT NULL,
  `login_user` varchar(200) DEFAULT NULL,
  `login_proxy` varchar(200) DEFAULT NULL,
  `connection_id` varchar(80) DEFAULT NULL,
  `db` varchar(40) DEFAULT NULL,
  `status` int DEFAULT NULL,
  `connection_type` varchar(40) DEFAULT NULL,
  `connect_os` varchar(40) DEFAULT NULL,
  `pid` varchar(40) DEFAULT NULL,
  `_client_name` varchar(80) DEFAULT NULL,
  `_client_version` varchar(80) DEFAULT NULL,
  `program_name` varchar(80) DEFAULT NULL,
  `_platform` varchar(80) DEFAULT NULL,
  `command` varchar(40) DEFAULT NULL,
  `sql_command` varchar(40) DEFAULT NULL,
  `command_status` varchar(40) DEFAULT NULL,
  `query` varchar(40) DEFAULT NULL,
  `query_status` int DEFAULT NULL,
  `start_server_id` varchar(400) DEFAULT NULL,
  `server_os_version` varchar(100) DEFAULT NULL,
  `server_mysqlversion` varchar(100) DEFAULT NULL,
  `args` varchar(80) DEFAULT NULL,
  `account_host` varchar(80) DEFAULT NULL,
  `mysql_version` varchar(80) DEFAULT NULL,
  `the_os` varchar(80) DEFAULT NULL,
  `the_os_ver` varchar(80) DEFAULT NULL,
  `server_id` varchar(8) DEFAULT NULL,
  PRIMARY KEY (`server_uuid`,`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Step 2 – On the archiving server
First create the auditarchiver account on the archive server. Grant it select and insert on the audit_data table (and no more).
\connect root@<archiving server>;

create user auditarchiver identified by 'Th3Archivista!';
grant select, insert on audit_archive.audit_data to auditarchiver;
Step 3 – for each server to read (pull) audit data – create an auditreader account
> mysqlsh
\sql
\connect root@<servers to pull audit trail>;

select @@version;
/* If you aren't running MySQL Enterprise Edition 8.0.22 or higher – stop and upgrade */
create user auditreader identified by 'Pu11Archivister!';
GRANT AUDIT_ADMIN ON *.* TO 'auditreader';
Step 4 – Install MySQL Enterprise Audit, if it is not already installed.
In a nutshell, execute the SQL script located in the MySQL share directory – audit_log_filter_linux_install.sql.
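For example, from MySQL Shell in SQL mode you could source the script directly and then confirm the plugin is active. The share path below is an assumption – check where your distribution actually installed it:
\sql
\connect root@<servers to pull audit trail>
-- path is an assumption; adjust to your installation's share directory
\source /usr/share/mysql-8.0/audit_log_filter_linux_install.sql
-- quick sanity check that the audit_log plugin is active
SELECT PLUGIN_NAME, PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS WHERE PLUGIN_NAME LIKE 'audit%';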
Step 5 – Configure the audit log in my.cnf
Edit /etc/my.cnf – at minimum, the first three settings below are required for this example.
[mysqld]
plugin-load-add=audit_log.so
audit-log=FORCE_PLUS_PERMANENT
audit-log-format=JSON
audit-log-strategy=SYNCHRONOUS
Restart the server
Step 6 – add audit filters and bind to user
If you haven’t added audit filters previously – the following will log everything for everyone connecting. This will log a great deal, but for the purpose of seeing this work in a test environment this makes sense. In production you’ll likely want to be more selective.
-- create audit filter - here log everything (this will generate a great deal of data)
SELECT audit_log_filter_set_filter('log_all', '{ "filter": { "log": true } }');

-- attach filter to accounts - here for all users
SELECT audit_log_filter_set_user('%', 'log_all');
Repeat for each audited instance. Probably best to start with one and go from there.
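If you want to double-check what's in place on an audited instance, the filter definitions and the accounts they are bound to are stored in tables in the mysql system schema (these tables are created by the audit install script):
-- defined filters
SELECT * FROM mysql.audit_log_filter;
-- which accounts each filter applies to
SELECT * FROM mysql.audit_log_user;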
Step 7 – generate some audit data activity
Run some ad hoc SQL as various users on the servers where you installed MySQL Enterprise Audit.
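Anything will do – for example, a quick session like the following, run as a couple of your test accounts, will produce connect, query, and disconnect audit events:
select now();
show databases;
select user(), @@hostname;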
Step 8 – pick a server where you can schedule mysqlsh in batch mode
What follows is how this batch Python script works. (The consolidated code is repeated at the end so you can copy, edit, and run it.)
Please change my passwords and use your specific server names, etc.
So first I will connect using the mysqlx api to the read server with its own session and the archive server with its own session.
Change “localhost” to the ip/hostname of your archiving server.
archive_session = mysqlx.get_session( {
    'host': 'localhost', 'port': 33060,
    'user': 'auditarchiver', 'password': 'Th3Archivista!'} )
Change “localhost” to the ip/hostname of your audited server.
read_session = mysqlx.get_session( {
    'host': 'localhost', 'port': 33060,
    'user': 'auditreader', 'password': 'Pu11Archivister!'} )
Ok, now I need to see whether I have prior archive data, so I can point to where in the audit data I need to start reading newer events. If the archive contains no data for this instance, I am going to start at the beginning of my log data.
I create a JSON search string containing a "start" element if the archive table contains no data for this instance (identified by its server_uuid). The "start" tells the function to perform a general date-time search.
However, if there is already prior data loaded, then I get the last timestamp and event id that were inserted and use those as a pointer into the audit data – in this case there is no "start" in the JSON search string.
# does the archive already contain data?
archive_empty = archive_session.run_sql("select count(*) from audit_archive.audit_data limit 1").fetch_one()

if (archive_empty[0] > 0):
    print("Data in archive getting last event ts and id")
    # resume from the last archived event (timestamp + id bookmark)
    search_args = archive_session.run_sql("select id, ts from audit_archive.audit_data order by ts desc, id desc limit 1").fetch_one()
    x = "set @nextts ='{ \"timestamp\": \"" + str(search_args[1]) + "\",\"id\":" + str(search_args[0]) + ", \"max_array_length\": 100 }'"
    setnext = read_session.run_sql(x)
else:
    print("The archive is empty - get oldest audit event")
    # no bookmark yet - use a "start" timestamp search from the beginning
    read_session.run_sql("set @nextts='{ \"start\": { \"timestamp\": \"2020-01-01\"}, \"max_array_length\": 100 }'")
Ok we’ve now set the session variable for my search criteria – @nextts.
If you want to look at the JSON search string:
view_nextts = read_session.run_sql("select @nextts")
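and, for example, print it from the batch script:
# show the JSON search string currently held in @nextts
print(view_nextts.fetch_one()[0])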
Now, in the next step, you'll see in the SQL the call to the audit_log_read function:
AUDIT_LOG_READ(@nextts)
And you will see that I wanted to store the data in rows in my archive – so I am using the JSON_TABLE function to convert from JSON to rows.
# read up to 100 events from the audit log and flatten the JSON into columns via JSON_TABLE
readaudit = read_session.run_sql(" "
    "SELECT @@server_uuid as server_uuid, id, ts, class, event, the_account,login_ip,login_os,login_user,login_proxy,connection_id,db, "
    " status,connection_type,connect_os,pid,_client_name,_client_version, "
    " program_name,_platform,command,sql_command,command_status,query, "
    " query_status,start_server_id,server_os_version,server_mysqlversion,args, "
    " account_host,mysql_version,the_os,the_os_ver,server_id "
    "FROM "
    "JSON_TABLE "
    "( "
    " AUDIT_LOG_READ(@nextts), "
    " '$[*]' "
    " COLUMNS "
    " ( "
    " id INT PATH '$.id', "
    " ts TIMESTAMP PATH '$.timestamp', "
    " class VARCHAR(20) PATH '$.class', "
    " event VARCHAR(80) PATH '$.event', "
    " the_account VARCHAR(80) PATH '$.account', "
    " login_ip VARCHAR(200) PATH '$.login.ip', "
    " login_os VARCHAR(200) PATH '$.login.os', "
    " login_user VARCHAR(200) PATH '$.login.user', "
    " login_proxy VARCHAR(200) PATH '$.login.proxy', "
    " connection_id VARCHAR(80) PATH '$.connection_id', "
    " db VARCHAR(40) PATH '$.connection_data.db', "
    " status INT PATH '$.connection_data.status', "
    " connection_type VARCHAR(40) PATH '$.connection_data.connection_type', "
    " connect_os VARCHAR(40) PATH '$.connection_data.connection_attributes._os', "
    " pid VARCHAR(40) PATH '$.connection_data.connection_attributes._pid', "
    " _client_name VARCHAR(80) PATH '$.connection_data.connection_attributes._client_name', "
    " _client_version VARCHAR(80) PATH '$.connection_data.connection_attributes._client_version', "
    " program_name VARCHAR(80) PATH '$.connection_data.connection_attributes.program_name', "
    " _platform VARCHAR(80) PATH '$.connection_data.connection_attributes._platform', "
    " command VARCHAR(40) PATH '$.general_data.command', "
    " sql_command VARCHAR(40) PATH '$.general_data.sql_command', "
    " command_status VARCHAR(40) PATH '$.general_data.status', "
    " query VARCHAR(40) PATH '$.general_data.query', "
    " query_status INT PATH '$.general_data.status', "
    " start_server_id VARCHAR(400) PATH '$.startup_data.server_id', "
    " server_os_version VARCHAR(100) PATH '$.startup_data.os_version', "
    " server_mysqlversion VARCHAR(100) PATH '$.startup_data.mysql_version', "
    " args VARCHAR(80) PATH '$.startup_data.args', "
    " account_host VARCHAR(80) PATH '$.account.host', "
    " mysql_version VARCHAR(80) PATH '$.startup_data.mysql_version', "
    " the_os VARCHAR(80) PATH '$.startup_data.os', "
    " the_os_ver VARCHAR(80) PATH '$.startup_data.os_version', "
    " server_id VARCHAR(80) PATH '$.startup_data.server_id' "
    " ) "
    ") AS auditdata; ")
Now you could go all JSON and store each event in a JSON data type. That would be simple as well. But in this case I’m storing in a table. It’s up to you.
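If you prefer that route, here is a rough, untested sketch of the whole-document approach. The audit_data_json table name is hypothetical, and each event comes straight out of AUDIT_LOG_READ without the JSON_TABLE step:
import json

# hypothetical alternative: the archive admin creates a table such as
#   CREATE TABLE audit_archive.audit_data_json (
#     server_uuid varchar(45) NOT NULL, id int NOT NULL, ts timestamp NOT NULL,
#     event json, PRIMARY KEY (server_uuid, id, ts));

uuid = read_session.run_sql("select @@server_uuid").fetch_one()[0]
# audit_log_read returns one string holding a JSON array of events
raw = read_session.run_sql("select audit_log_read(@nextts)").fetch_one()[0]
for evt in json.loads(raw):
    if evt is None:
        break  # the final array element can be a JSON null when the log is exhausted
    archive_session.run_sql(
        "insert into audit_archive.audit_data_json values (?, ?, ?, ?)",
        [uuid, evt["id"], evt["timestamp"], json.dumps(evt)])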
Ok – now as the auditarchiver – I am going to save the data I just extracted.
Here's where the mysqlx API is very handy. I can loop on the results and save them to the table with very little code.
aschema = archive_session.get_schema('audit_archive')
atable = aschema.get_table('audit_data')

if (archive_empty[0] > 0):
    # the first returned event is the last one already archived - discard it
    evt = readaudit.fetch_one_object()
    print("Archive was not empty - skip first duplicate event")
else:
    print("Archive was empty - load all")

# insert each remaining event into the archive table
evt = readaudit.fetch_one_object()
while evt:
    atable.insert(evt).execute()
    evt = readaudit.fetch_one_object()
As you may have noticed, I'm not trying to extract too much at one time from the audit log. I've capped it at 100 events. You're also limited by the buffer size you've set for --audit-log-read-buffer-size.
See https://dev.mysql.com/doc/refman/8.0/en/audit-log-reference.html#sysvar_audit_log_read_buffer_size
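If you want to check what the reading session is working with, something like the following should do. (If memory serves, the variable is session-scoped and dynamic from 8.0.12 on; the SET is optional and may require additional privileges.)
# check the audited server's read buffer size (an upper bound on one AUDIT_LOG_READ result)
print(read_session.run_sql("select @@audit_log_read_buffer_size").fetch_one()[0])
# optionally raise it for this session - version and privilege dependent
read_session.run_sql("set session audit_log_read_buffer_size = 524288")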
So save this script to a directory.
cd to that directory.
Now you just run mysqlsh in batch mode.
First run –
frank@Mike % mysqlsh --py < archiveauditbatch
The archive is empty - get oldest audit event
Archive was empty - load all
The next runs
frank@Mike % mysqlsh --py < archiveauditbatch
Data in archive getting last event ts and id
Archive was not empty - skip first duplicate event
Ok – now that you've run a few tests, create a scheduled batch job with cron or your favorite scheduler. Have it loop until the reads come back empty, and so on.
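For example, a crontab entry along these lines would pull new audit events every 15 minutes. The paths are assumptions; point them at wherever you saved the script:
# paths below are assumptions - adjust to your script location
*/15 * * * * cd /home/frank/auditscripts && mysqlsh --py < archiveauditbatch >> archiveauditbatch.log 2>&1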
Step 9 – log in to the archive server and have a look at the data
select * from audit_archive.audit_data order by server_uuid, id, ts;
Run some stats
select event, count(event) from audit_archive.audit_data group by event;

select login_user, event, count(event) from audit_archive.audit_data group by login_user, event;

select distinct login_user, sql_command, status, query_status from audit_archive.audit_data;
Finally – this is far from production-quality code.
I inlined the passwords, I didn't parameterize the list of audited servers to loop over and collect from, I didn't check errors, etc. The point was to demonstrate some techniques to help jump-start folks.
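For instance, to handle multiple audited servers you might keep a server list and loop over it. Here's a rough, untested sketch of that shape – the server names and the archive_from() helper are hypothetical, and the password would come from a vault or option file rather than being inlined:
# Hypothetical list of audited servers; credentials come from a vault / option file
audited_servers = [
    {'host': 'db-prod-01', 'port': 33060},
    {'host': 'db-prod-02', 'port': 33060},
]

for server in audited_servers:
    read_session = mysqlx.get_session({
        'host': server['host'], 'port': server['port'],
        'user': 'auditreader', 'password': read_password})
    try:
        # archive_from() would wrap the read/insert logic shown above
        archive_from(read_session, archive_session)
    finally:
        read_session.close()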
In a followup blog –
I’ll show you how to perform hash chaining etc – so you can prove your audit data is immutable and untainted.
Thanks for using MySQL. Here’s the complete batch script
# session to the archive server (insert/select only account)
archive_session = mysqlx.get_session( {
    'host': 'localhost', 'port': 33060,
    'user': 'auditarchiver', 'password': 'Th3Archivista!'} )

# session to the audited server we pull audit data from
read_session = mysqlx.get_session( {
    'host': 'localhost', 'port': 33060,
    'user': 'auditreader', 'password': 'Pu11Archivister!'} )

# does the archive already contain data?
archive_empty = archive_session.run_sql("select count(*) from audit_archive.audit_data limit 1").fetch_one()

if (archive_empty[0] > 0):
    print("Data in archive getting last event ts and id")
    # resume from the last archived event (timestamp + id bookmark)
    search_args = archive_session.run_sql("select id, ts from audit_archive.audit_data order by ts desc, id desc limit 1").fetch_one()
    x = "set @nextts ='{ \"timestamp\": \"" + str(search_args[1]) + "\",\"id\":" + str(search_args[0]) + ", \"max_array_length\": 100 }'"
    setnext = read_session.run_sql(x)
else:
    print("The archive is empty - get oldest audit event")
    # no bookmark yet - use a "start" timestamp search from the beginning
    read_session.run_sql("set @nextts='{ \"start\": { \"timestamp\": \"2020-01-01\"}, \"max_array_length\": 100 }'")

nextis = read_session.run_sql("select @nextts")

# read up to 100 events and flatten the JSON into columns via JSON_TABLE
readaudit = read_session.run_sql(" "
    "SELECT @@server_uuid as server_uuid, id, ts, class, event, the_account,login_ip,login_os,login_user,login_proxy,connection_id,db, "
    " status,connection_type,connect_os,pid,_client_name,_client_version, "
    " program_name,_platform,command,sql_command,command_status,query, "
    " query_status,start_server_id,server_os_version,server_mysqlversion,args, "
    " account_host,mysql_version,the_os,the_os_ver,server_id "
    "FROM "
    "JSON_TABLE "
    "( "
    " AUDIT_LOG_READ(@nextts), "
    " '$[*]' "
    " COLUMNS "
    " ( "
    " id INT PATH '$.id', "
    " ts TIMESTAMP PATH '$.timestamp', "
    " class VARCHAR(20) PATH '$.class', "
    " event VARCHAR(80) PATH '$.event', "
    " the_account VARCHAR(80) PATH '$.account', "
    " login_ip VARCHAR(200) PATH '$.login.ip', "
    " login_os VARCHAR(200) PATH '$.login.os', "
    " login_user VARCHAR(200) PATH '$.login.user', "
    " login_proxy VARCHAR(200) PATH '$.login.proxy', "
    " connection_id VARCHAR(80) PATH '$.connection_id', "
    " db VARCHAR(40) PATH '$.connection_data.db', "
    " status INT PATH '$.connection_data.status', "
    " connection_type VARCHAR(40) PATH '$.connection_data.connection_type', "
    " connect_os VARCHAR(40) PATH '$.connection_data.connection_attributes._os', "
    " pid VARCHAR(40) PATH '$.connection_data.connection_attributes._pid', "
    " _client_name VARCHAR(80) PATH '$.connection_data.connection_attributes._client_name', "
    " _client_version VARCHAR(80) PATH '$.connection_data.connection_attributes._client_version', "
    " program_name VARCHAR(80) PATH '$.connection_data.connection_attributes.program_name', "
    " _platform VARCHAR(80) PATH '$.connection_data.connection_attributes._platform', "
    " command VARCHAR(40) PATH '$.general_data.command', "
    " sql_command VARCHAR(40) PATH '$.general_data.sql_command', "
    " command_status VARCHAR(40) PATH '$.general_data.status', "
    " query VARCHAR(40) PATH '$.general_data.query', "
    " query_status INT PATH '$.general_data.status', "
    " start_server_id VARCHAR(400) PATH '$.startup_data.server_id', "
    " server_os_version VARCHAR(100) PATH '$.startup_data.os_version', "
    " server_mysqlversion VARCHAR(100) PATH '$.startup_data.mysql_version', "
    " args VARCHAR(80) PATH '$.startup_data.args', "
    " account_host VARCHAR(80) PATH '$.account.host', "
    " mysql_version VARCHAR(80) PATH '$.startup_data.mysql_version', "
    " the_os VARCHAR(80) PATH '$.startup_data.os', "
    " the_os_ver VARCHAR(80) PATH '$.startup_data.os_version', "
    " server_id VARCHAR(80) PATH '$.startup_data.server_id' "
    " ) "
    ") AS auditdata; ")

aschema = archive_session.get_schema('audit_archive')
atable = aschema.get_table('audit_data')

if (archive_empty[0] > 0):
    # the first returned event is the last one already archived - discard it
    evt = readaudit.fetch_one_object()
    print("Archive was not empty - skip first duplicate event")
else:
    print("Archive was empty - load all")

# insert each remaining event into the archive table
evt = readaudit.fetch_one_object()
while evt:
    atable.insert(evt).execute()
    evt = readaudit.fetch_one_object()