Did you know that one task can have more than one parent? No?
Run the following code:
CREATE STAGE frosty_aws_stage
URL = 's3://frostyfridaychallenges/';
EXECUTE IMMEDIATE FROM @frosty_aws_stage/challenge_81/starter_code.sql;
Now create five tasks:
Task 1 will run the following:
INSERT INTO w81_raw_product (data)
SELECT parse_json(column1)
FROM
VALUES
('{"product_id": 21, "product_name": "Product U", "category": "Electronics", "price": 120.99, "created_at": "2024-02-16"}'),
('{"product_id": 22, "product_name": "Product V", "category": "Books", "price": 35.00, "created_at": "2024-02-16"}');
INSERT INTO w81_raw_customer (data)
SELECT parse_json(column1)
FROM
VALUES
('{"customer_id": 6, "customer_name": "Frank", "email": "frank@example.com", "created_at": "2024-02-16"}'),
('{"customer_id": 7, "customer_name": "Grace", "email": "grace@example.com", "created_at": "2024-02-16"}');
INSERT INTO w81_raw_sales (data)
SELECT parse_json(column1)
FROM
VALUES
('{"sale_id": 11, "product_id": 21, "customer_id": 6, "quantity": 1, "sale_date": "2024-02-17"}'), -- New product, new customer
('{"sale_id": 12, "product_id": 22, "customer_id": 1, "quantity": 1, "sale_date": "2024-02-17"}'), -- New product, existing customer
('{"sale_id": 13, "product_id": 2, "customer_id": 7, "quantity": 2, "sale_date": "2024-02-17"}'), -- Existing product, new customer
('{"sale_id": 14, "product_id": 3, "customer_id": 6, "quantity": 1, "sale_date": "2024-02-17"}'), -- Existing product, new customer
('{"sale_id": 15, "product_id": 21, "customer_id": 5, "quantity": 1, "sale_date": "2024-02-17"}'); -- New product, existing customer
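A task body is a single statement, so one way to run several INSERTs from Task 1 is to wrap them in a Snowflake Scripting block. Here is a minimal sketch, assuming a warehouse called `my_wh` (a hypothetical name) and the task name `INSERT_INTO_RAW` that the final query of this challenge refers to. Only the product insert is shown; the customer and sales inserts would sit inside the same `BEGIN ... END` block.

```sql
-- Sketch only: my_wh is a hypothetical warehouse name.
-- No SCHEDULE is set, so this root task only runs when executed manually.
CREATE OR REPLACE TASK insert_into_raw
  WAREHOUSE = my_wh
AS
BEGIN
  INSERT INTO w81_raw_product (data)
  SELECT PARSE_JSON(column1)
  FROM VALUES
    ('{"product_id": 21, "product_name": "Product U", "category": "Electronics", "price": 120.99, "created_at": "2024-02-16"}'),
    ('{"product_id": 22, "product_name": "Product V", "category": "Books", "price": 35.00, "created_at": "2024-02-16"}');

  -- The INSERTs into w81_raw_customer and w81_raw_sales go here,
  -- each terminated with a semicolon.
END;
```

If you run this from SnowSQL or the classic console rather than Snowsight, the block may need to be wrapped in `$$` delimiters.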
Tasks 2-4 will each parse the JSON from one of the raw tables (product, customer, and sales).
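As a rough idea of what one of these parsing tasks could look like, here is a sketch for the product table. The task name `parse_product`, the warehouse `my_wh`, and the column list of `product` are assumptions (the real columns come from the starter code), and using MERGE to skip rows that are already loaded is just one possible approach.

```sql
-- Sketch only: parse_product, my_wh and the product column list are assumptions.
CREATE OR REPLACE TASK parse_product
  WAREHOUSE = my_wh
  AFTER insert_into_raw          -- child of the root task
AS
  MERGE INTO product AS tgt
  USING (
    -- Pull typed columns out of the VARIANT column "data"
    SELECT data:product_id::INT         AS product_id,
           data:product_name::STRING    AS product_name,
           data:category::STRING        AS category,
           data:price::NUMBER(10, 2)    AS price,
           data:created_at::DATE        AS created_at
    FROM w81_raw_product
  ) AS src
  ON tgt.product_id = src.product_id
  -- Only insert products that are not in the target table yet
  WHEN NOT MATCHED THEN
    INSERT (product_id, product_name, category, price, created_at)
    VALUES (src.product_id, src.product_name, src.category, src.price, src.created_at);
```

The customer and sales tasks would follow the same pattern against `w81_raw_customer` and `w81_raw_sales`, each with `AFTER insert_into_raw`.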
Task 5 will run the following AFTER tasks 2-4 have completed:
CREATE OR REPLACE VIEW aggregated_sales
AS
SELECT c.customer_name,
       p.product_name,
       SUM(s.quantity)           AS total_quantity,
       SUM(s.quantity * p.price) AS total_sales
FROM   sales s
       JOIN product p
         ON s.product_id = p.product_id
       JOIN customer c
         ON s.customer_id = c.customer_id
GROUP  BY c.customer_name,
          p.product_name;
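The multi-parent part of the challenge comes down to listing more than one task in the `AFTER` clause. A minimal sketch, assuming the three parsing tasks are named `parse_product`, `parse_customer` and `parse_sales`, the final task is named `aggregate_sales`, and the warehouse is `my_wh` (all hypothetical names):

```sql
-- Sketch only: task and warehouse names are assumptions.
CREATE OR REPLACE TASK aggregate_sales
  WAREHOUSE = my_wh
  -- Runs only after all three predecessors have completed
  AFTER parse_product, parse_customer, parse_sales
AS
  CREATE OR REPLACE VIEW aggregated_sales AS
  SELECT c.customer_name,
         p.product_name,
         SUM(s.quantity)           AS total_quantity,
         SUM(s.quantity * p.price) AS total_sales
  FROM   sales s
         JOIN product p  ON s.product_id  = p.product_id
         JOIN customer c ON s.customer_id = c.customer_id
  GROUP  BY c.customer_name, p.product_name;

-- Tasks are created suspended; this resumes the tasks in the graph.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('insert_into_raw');

-- Trigger a single run of the graph from the root task.
EXECUTE TASK insert_into_raw;
```

Child tasks have to be created while the root task is suspended, which is why the resume step comes last.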
In the end, you should run:
SELECT t.name,
t.predecessors
FROM TABLE(information_schema.Task_dependents(task_name => 'INSERT_INTO_RAW', recursive => TRUE)) t;
And get:
[Image: expected query output, listing each task with its predecessors]
Happy DAGing!
There are a lot of moving parts in this challenge, but it’s nice to see the final solution come together at the end. It’s also nice to look at the task DAG in the UI and see the flow.