Snowpro Advanced Architect: Data Movement


Domain Objectives

  • Data Loading & Data Unloading
    • List best practices and the impact of different scenarios
      • Formats
      • File number
      • File size
      • Warehouse sizes
  • Continuous Data Loads Using Snowpipe
    • Outline how Snowpipe is different from Bulk Data loading
    • SQL syntax to create a pipe
  • Streams & Tasks
    • Working with Streams and Tasks
      • Limitations
      • Rest API endpoints
      • Billing
    • SQL Syntax to create and clone a Stream and Task

As aware as I am that this might sound a little sad, this side of the exam covers some of my favourite features of Snowflake. What really makes Snowflake impressive as a database service is how a user can migrate and transform data with relative ease. A lot of data engineering can be incredibly complicated but Snowflake really does revolutionise the field.

Data Loading Basics

Stages

In Snowflake, there are two types of stage: an internal stage and an external stage. Both are database objects that belong within schemas; they are a sort of holding pen for your data before it is ready to go into your tables. The data is held there in file format, and you can choose whether or not to perform transformations on that data before loading it into a table.

Internal Stage An internal stage exists within Snowflake and comes in three flavours:

  • User: for files managed by one user, files can be loaded into multiple tables. This type of internal stage cannot be altered or dropped.

  • Table: this type of internal stage is not a separate object, but rather an implicit stage tied to each table. Table Internal Stages are useful for files managed by multiple users but only copied into one table.

  • Named: a Named Internal Stage is a named database object; it can be managed by many users and copied into many tables. This is the stage created with the CREATE STAGE command and is the type of internal stage that feels most similar to an external stage.

External Stage An external stage is a named database object. This object stores the URL, access settings and convenience settings (this means stuff like the file format, for example) of your files stored with an external storage provider (e.g. AWS S3).
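
As a quick orientation, here is how each type of stage is referenced in SQL (a minimal sketch; mytable and my_stage are hypothetical names):

LIST @~;            -- user stage (one per user, cannot be altered or dropped)
LIST @%mytable;     -- table stage (implicit, tied to the table)
LIST @my_stage;     -- named stage (internal or external)

-- Only named stages are created explicitly
CREATE STAGE my_stage;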

Ingestion Types

When it comes to moving files from stages to tables, there are two ways of doing so: in bulk or continuously. With both methods you can perform simple transformations of your data (see the sketch after this list):

  • Column reordering
  • Column omission
  • Casts
  • Truncating text strings that exceed the target column length
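
As a rough sketch of what those transformations look like in practice (my_table, my_stage and the column positions are all hypothetical), you wrap a SELECT around the stage inside the COPY statement:

COPY INTO my_table (customer_id, customer_name)
FROM (
  SELECT $2::NUMBER,          -- cast and reorder: the file's second column comes first
         SUBSTR($1, 1, 50)    -- truncate long strings to fit the target column
  FROM @my_stage/customers/
)
FILE_FORMAT = (TYPE = 'CSV');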

Bulk Ingestion Bulk ingestion means ingesting a batch of already available files. All of your data exists and can be stored in either one file or several, and it is ingested together in one go. This kind of ingestion depends on user-provided virtual warehouses, which are specified in the COPY command. Files do not need to have the same number and order of columns as your target table.

Continuous Ingestion Continuous ingestion is a more automated way of doing things: you don’t yet have all your data (more will definitely be produced in the future), but you know its general shape, size and format well enough to predict the sorts of transformations you will or won’t apply. Continuous ingestion means using something called a Snowpipe (which we will talk about later) to continuously ingest micro-batches of data. Snowpipe uses Snowflake-provided compute resources. Tasks (which, again, we will talk about later) can also be used.

Semi-Structured Data

Semi-structured data is any form of data that does not conform to the tabular style of, for example, CSV or XLSX files. Nevertheless, it does possess some structure. The most common type of semi-structured data is JSON.

When semi-structured data is loaded into Snowflake, the entire contents of the file are loaded into a single VARIANT column. Within your COPY command (used for ingestion) you write the code for transforming this semi-structured data into a relational table. These transformations require prior knowledge of the column definitions. If that knowledge is, for whatever reason, lacking, then Snowflake offers two SQL functions that can help:

  • INFER_SCHEMA – detects the column definitions in staged files and returns the metadata in a format suitable for object creation.

  • GENERATE_COLUMN_DESCRIPTION – generates a list of columns from a staged file using the INFER_SCHEMA’s output.

Here is an example from Snowflake’s documentation:

select generate_column_description(array_agg(object_construct(*)), 'table') as columns
  from table (
    infer_schema(
      location=>'@mystage',
      file_format=>'my_parquet_format'
    )
  );

Source

You can create tables using the output of these functions with CREATE TABLE … USING TEMPLATE.
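
For example, reusing the stage and file format from the snippet above, the documented pattern looks like this:

CREATE TABLE mytable
  USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
    FROM TABLE(
      INFER_SCHEMA(
        LOCATION => '@mystage',
        FILE_FORMAT => 'my_parquet_format'
      )
    )
  );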

Data Loading Considerations

  • The number of load operations that run in parallel cannot exceed the number of data files to be loaded.

  • In order to optimise parallel operations files should be roughly 100-250 MB (or larger) in size compressed.

  • Loading files over 100 GB is not recommended.

  • Aggregate smaller files to minimise the processing overhead for each file, and split larger files into smaller ones. Snowflake recommends splitting large files by line to avoid records that span chunks.
  • The VARIANT data type imposes a 16 MB (compressed) size limit on individual rows.

  • When loading JSON, you can enable STRIP_OUTER_ARRAY to remove the outer array structure and load records into separate table rows.

  • When loading continuously, there’s an overhead of 0.06 credits per 1000 files queued.

  • Files loaded via Snowpipe should follow the same sizing guidance: roughly 100-250 MB (or larger) compressed, with files over 100 GB not recommended – bigger files should be split up.

  • If it takes longer than one minute to accumulate MBs of data in your source application, consider creating a new, smaller data file once per minute – this can reduce latency but increases overhead costs.

  • Amazon Kinesis Data Firehose is one recommendation for batching data files – its buffer size lets you define the desired file size and its buffer interval controls how long to wait before a file is sent.

  • In numeric data, avoid embedded characters such as commas (e.g. 123,456).

  • Loading large data sets can affect query performance – it’s recommended you use separate warehouses for data loading and data querying.

  • When staging data, consider organising it by path (or prefix, in AWS terminology) – these paths should include identifying details, especially the date.

  • COPY INTO includes parameters for loading specific files and files that match patterns.

  • Snowflake maintains metadata of files it has loaded, which expires after 64 days. After 64 days there is ambiguity over whether a file has been loaded or not – you can use LOAD_UNCERTAIN_FILES if you definitely want them to be loaded.

  • In a VARIANT column, JSON null values are stored as the string ‘null’ rather than SQL NULL. Because this wastes storage space, Snowflake recommends setting STRIP_NULL_VALUES to TRUE.

  • When loading CSV data, Snowflake recommends using the TRIM_SPACE file format option to remove leading or trailing white space.

  • If you specify the PURGE option in a COPY INTO command, files will be removed from the stage once they are successfully loaded. This can also improve performance.

  • COPY_HISTORY can help explain failures – the STATUS column indicates whether a file was loaded or not, and FIRST_ERROR_MESSAGE provides an explanation.

  • Executing a COPY command with VALIDATION_MODE set to RETURN_ALL_ERRORS can help explain issues without loading any data (see the sketch after this list).
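
To make a couple of these points concrete, here is a minimal sketch (my_table, my_stage, the path and the pattern are all hypothetical) of loading only matching files and of validating a load without actually performing it:

-- Load only files under a given path that match a pattern
COPY INTO my_table
  FROM @my_stage/2023/01/
  PATTERN = '.*sales.*[.]csv[.]gz'
  FILE_FORMAT = (TYPE = 'CSV');

-- Dry run: return all the errors the load would hit, without loading anything
COPY INTO my_table
  FROM @my_stage/2023/01/
  FILE_FORMAT = (TYPE = 'CSV')
  VALIDATION_MODE = RETURN_ALL_ERRORS;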

Data Unloading

Much of the commentary above applies to data unloading in Snowflake. Data is unloaded via a COPY INTO <location> statement, which can include a SELECT query. Optionally, a user can choose to PARTITION BY a column so that data is unloaded into separate files.

Data will be unloaded to either an internal or external stage.
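
A minimal unload sketch (the stage, table and column names are hypothetical):

COPY INTO @my_stage/unload/
  FROM (SELECT order_id, order_date, amount FROM my_orders)
  PARTITION BY (TO_VARCHAR(order_date, 'YYYY-MM-DD'))
  FILE_FORMAT = (TYPE = 'CSV' COMPRESSION = GZIP)
  HEADER = TRUE;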

Snowpipes

Snowpipes are a method for automating the ingestion of data – continuous ingestion. The main use case for Snowpipe is event-based ingestion, meaning, in short, that uploading a file to a stage triggers the Snowpipe, so the data is available as soon as possible rather than according to a pre-defined schedule.

Image Credit: Snowflake

While some of the below may be slightly superfluous for the needs of this exam, I think since the exam specification requires a very practical hands-on knowledge of Snowpipes (you should know all the SQL syntax involved) this section of the blog will guide you through the creation of Snowpipes from start to finish with some theoretical explainers intertwined.

Storage Integrations

A storage integration is a Snowflake object that stores an Identity and Access Management (IAM) entity for external cloud storage. This entity is then granted permissions by the administrators of your cloud provider. Your storage integration is then attached to any number of external stages in order to allow them access to the files stored with your cloud provider. While this might sound like a bit of an extravagance, a storage integration allows you to handle authentication between your cloud provider and Snowflake without either storing credentials in Snowflake (dangerous) or constantly supplying temporary tokens (hard to automate).

The SQL is as follows:

AWS

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]


Note that in AWS you will have to create an IAM policy and an IAM role. You will then have to use DESC INTEGRATION (see below) to retrieve the integration’s STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID and add these to the trust relationship of your IAM role.

GCP

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]


Note that you will have to use DESC INTEGRATION to retrieve the STORAGE_GCP_SERVICE_ACCOUNT, which you then grant access to your bucket (e.g. via a custom IAM role).

Azure

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = AZURE
  ENABLED = TRUE
  AZURE_TENANT_ID = '<tenant_id>'
  STORAGE_ALLOWED_LOCATIONS = ('azure://<account>.blob.core.windows.net/<container>/<path>/', 'azure://<account>.blob.core.windows.net/<container>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('azure://<account>.blob.core.windows.net/<container>/<path>/', 'azure://<account>.blob.core.windows.net/<container>/<path>/') ]


Note: in Azure the ‘tenant ID’ is the ID of the Office 365 tenant that the allowed and blocked storage accounts belong to. To find it, click Azure Active Directory > Properties. After this is done you will use DESC INTEGRATION to find the AZURE_CONSENT_URL and AZURE_MULTI_TENANT_APP_NAME, which you use to grant consent and then assign a role on the storage account to the service principal created by Snowflake.

Extra Syntax

  • ALTER STORAGE INTEGRATION: allows you to edit parameters in the storage integration.

  • DROP INTEGRATION: allows you to delete the integration.

  • SHOW INTEGRATIONS: gives you a list of integrations.

  • DESCRIBE INTEGRATION: describes the properties of an integration, you can abbreviate this to just DESC INTEGRATION.

Event Notifications (Triggers)

In your cloud provider you will need to create an event notification for the creation/uploading of files to your cloud storage location. In AWS this can be done via S3 Event Notifications with SQS or SNS; in GCP via a Pub/Sub topic that receives event messages from your GCS bucket; and in Azure via Event Grid subscriptions.

Note that with Azure and GCP you will have to create a Notification Integration in Snowflake:

CREATE NOTIFICATION INTEGRATION <integration_name>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  AZURE_STORAGE_QUEUE_PRIMARY_URI = '<queue_URL>'
  AZURE_TENANT_ID = '<directory_ID>';


External Stages

External stages have been described above. The basic syntax for creating them is as follows:

CREATE STAGE <stage_name>
  URL = '<external_stage_url>'
  STORAGE_INTEGRATION = <storage_integration_name>;


Simply by including the URL and STORAGE_INTEGRATION parameters, this becomes an external stage. Excluding those parameters creates a (named) internal stage.

Many more parameters exist for stages, for a full list click here

Creating the Pipe

The syntax for creating a pipe consists of two sections, the first is the CREATE PIPE statement followed by the parameters of the pipe, and the second is the COPY INTO statement (which is where one can perform any small transformations necessary).

The basic syntax is as follows:

CREATE PIPE <pipe_name>
AUTO_INGEST = TRUE
AS <copy_statement>


When using Azure or GCP you must include:

INTEGRATION = '<NOTIFICATION_INTEGRATION_NAME>'


A full list of optional parameters can be found here

At a minimum, the COPY INTO statement must have the following:

  COPY INTO <database.schema.table_name>
  FROM @<database.schema.stage_name>
  FILE_FORMAT = <file_format>


One can find a full list of parameters here
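
Putting the pieces together, a minimal end-to-end sketch (all object names are hypothetical) looks like this:

CREATE PIPE my_db.my_schema.my_pipe
  AUTO_INGEST = TRUE
AS
  COPY INTO my_db.my_schema.my_table
  FROM @my_db.my_schema.my_stage
  FILE_FORMAT = (TYPE = 'JSON');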

Note that using the pipe will require OWNERSHIP of the named pipe, USAGE on the stage, file format, database and schema as well as READ on the stage and INSERT + SELECT on the target table.

To load historical data, you will need to use:

ALTER PIPE <pipe_name> REFRESH


If a pipe is paused for more than 14 days it is considered stale and you will need to do the following to reactivate it:

SELECT SYSTEM$PIPE_FORCE_RESUME('<pipe_name>', 'staleness_check_override')


Snowpipe REST Endpoints

  • Calls to Snowpipe REST Endpoints (SREs) use key-based authentication because the service doesn’t maintain client sessions.
  • To follow the principle of least privilege, Snowflake recommends creating a separate user and role for pipe ingestion; this role should be the user’s default role.
  • To ingest files a client calls insertFiles with a list of files and a defined pipe.
  • You can view the load history for Snowpipe REST Endpoints (see the example after this list):
    • Via REST Endpoints:
      • insertReport
      • loadHistoryScan
    • Via Information Schema table function:
      • COPY_HISTORY
    • Via Account Usage view:
      • COPY_HISTORY
  • SREs can be used in AWS Lambda
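
As a quick illustration of the Information Schema route (the table name and time window are hypothetical), the COPY_HISTORY table function can be queried like so:

SELECT file_name, status, first_error_message, last_load_time
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'MY_TABLE',
  START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
));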

Streams

A stream is a way of doing change data capture, that is, monitoring changes to your data over time. When you create a stream, a snapshot of every row in the data is taken at a point in time (called the offset, if you want to use the professional vocabulary); the stream then records any changes enacted by DML statements on those rows after the snapshot is taken:

  • Change records provide the state of a row before and after the change has occurred.

  • Change information mirrors the column structure of the table being tracked and gives you additional metadata columns that describe each change event.

The stream itself doesn’t contain any table data. Instead, when the stream is created, hidden columns (which use minimal storage) are added to the table to store change tracking metadata. When you query a stream, the CDC records you require are derived from both these hidden columns and the snapshot (or offset) stored in the stream.

“The following example shows a source table with 10 committed versions in the timeline. The offset for stream s1 is currently between table versions v3 and v4. When the stream is queried (or consumed), the records returned include all transactions between table version v4, the version immediately after the stream offset in the table timeline, and v10, the most recent committed table version in the timeline, inclusive.”

Image and Text Source: Snowflake

The aforementioned extra columns are:

  • METADATA$ACTION – indicates the DML operation recorded.

  • METADATA$ISUPDATE – indicates whether the operation was part of an UPDATE statement.

  • METADATA$ROW_ID – unique and immutable row ID.

There are three types of streams:

  • Standard - Tracks all DML statements – this stream performs a join on inserted and deleted rows to provide a row-level delta. If a row is inserted and then deleted between two offsets, it won’t appear in the delta.

  • Append-Only - Tracks row-inserts only, updates and deletes are not recorded. If a row is inserted and then deleted, it will still be shown. This kind of stream can be much more performant than a standard stream. If you only need inserts, then use this stream for speed.

  • Insert-Only - Only supported for external tables. Tracks row inserts only; delete operations are not tracked. Overwritten or appended files are essentially handled as new files.

Image Credit: Snowflake

Notes on Streams:

  • Streams become stale when the offset falls outside the data retention period of the source table; the historical change data is then no longer available, and new changes can only be tracked if you recreate the stream.

  • If the source table’s data retention period is less than 14 days, Snowflake temporarily extends it to prevent the stream from going stale. The maximum extension is defined by the MAX_DATA_EXTENSION_TIME_IN_DAYS parameter.

  • As an alternative to streams, you can query change tracking metadata with the CHANGES clause in a SELECT statement.
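
A hedged sketch of that alternative (my_table and the time window are hypothetical; the table needs change tracking enabled first):

-- Requires: ALTER TABLE my_table SET CHANGE_TRACKING = TRUE;
SELECT *
FROM my_table
  CHANGES (INFORMATION => DEFAULT)
  AT (TIMESTAMP => DATEADD(HOUR, -1, CURRENT_TIMESTAMP()));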

Stream SQL Syntax

Creating a stream is relatively easy:

-- Table stream
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
  <name>
  [ COPY GRANTS ]
  ON TABLE <table_name>
  [ { AT | BEFORE } { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ]
  [ APPEND_ONLY = TRUE | FALSE ]
  [ SHOW_INITIAL_ROWS = TRUE | FALSE ]
  [ COMMENT = '<string_literal>' ]

-- External table stream
CREATE [ OR REPLACE ] STREAM [IF NOT EXISTS]
  <name>
  [ COPY GRANTS ]
  ON EXTERNAL TABLE <external_table_name>
  [ { AT | BEFORE } { TIMESTAMP => <timestamp> | OFFSET => <time_difference> | STATEMENT => <id> } ]
  [ INSERT_ONLY = TRUE ]
  [ SHOW_INITIAL_ROWS = TRUE | FALSE ]
  [ COMMENT = '<string_literal>' ]

Source

You can also create a clone of a stream which inherits the offset of the original stream.

CREATE [ OR REPLACE ] STREAM <name> CLONE <source_stream>
  [ COPY GRANTS ]
  [ ... ]

Source

The following commands are also available:

  • ALTER STREAM
  • DROP STREAM
  • SHOW STREAMS
  • DESCRIBE STREAM
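
To tie the syntax together, here is a minimal sketch (source, target and stream names are hypothetical) of creating a stream and then consuming its delta – note that using the stream inside a DML statement advances its offset:

CREATE OR REPLACE STREAM my_stream ON TABLE my_source_table;

-- Consuming the stream: this INSERT advances the stream's offset
INSERT INTO my_target_table (id, value)
SELECT id, value
FROM my_stream
WHERE METADATA$ACTION = 'INSERT';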

Billing

  • When a stream is not consumed regularly, Snowflake will temporarily extend the data retention period for the source table. This will increase monthly data storage charges.

  • Although small, those extra columns will also add to storage costs.

  • A stream must be processed by a virtual warehouse when it is queried – this compute is the main cost associated with a stream.

Tasks

Tasks are scheduled SQL statements (they are not event-triggered). It is possible to develop a ‘tree’ of tasks where predecessor tasks are defined when creating a task. One tree can contain up to 1,000 tasks; a child can only have one parent task, but a parent task can have as many as 100 child tasks.

Image Credit: Snowflake

The same tree cannot run at the same time as itself (i.e. you cannot have two instances of the same task tree running simultaneously). If an overlap would occur, the schedule of the root task is delayed until all child tasks have finished running, which can mean a run is skipped if the previous run takes too long. To prevent this, set the ALLOW_OVERLAPPING_EXECUTION parameter to TRUE when creating the root task (it is FALSE by default).

Limitations & Troubleshooting

  • If you sever the link between parent and child tasks, then child tasks will become either root tasks or standalone tasks.

  • When a task runs, a version of the task is created. Root tasks create versions of an entire tree.

  • To modify a child task, the root task must be suspended – all future runs will be cancelled, but any runs in progress will continue.

  • When a scheduled task is changed, the changes don’t take effect until the next run.

  • You can set session parameters for tasks.

  • TASK_HISTORY table function lets you view the task history.

  • Tasks are run with the privileges of the task owner.

  • When a task does not run, follow these steps:
    1. Verify the Task Did Not Run Look at the TASK_HISTORY table function (see the example after this list). Note times and error codes. Verify whether parent tasks completed.
    2. Verify the Task was Resumed Verify that the state of the task is RESUMED using DESCRIBE TASK or SHOW TASKS. If not, ALTER TASK… RESUME.
    3. Verify the Permissions Granted to the Task Owner Use SHOW GRANTS TO ROLE to see if the task owner has the requisite permissions.
    4. Verify the Condition If the task includes a WHEN clause with a SYSTEM$STREAM_HAS_DATA condition, verify that the specified stream contains CDC records.
  • By default, tasks have a 60 minute limit on a single run which exists to prevent never-ending tasks from running forever. If a task is cancelled this way, there are two suggested remedies:
    1. If you query TASK_HISTORY and find that the task was cancelled or exceeded the scheduled window for the task, consider resizing the warehouse as it may be undersized.
    2. You can also use ALTER TASK … SET USER_TASK_TIMEOUT_MS and set to a higher value.
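
A minimal sketch of the TASK_HISTORY check from step 1 (the task name and time range are hypothetical):

SELECT name, state, error_code, error_message, scheduled_time, completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  TASK_NAME => 'MY_TASK',
  SCHEDULED_TIME_RANGE_START => DATEADD(HOUR, -4, CURRENT_TIMESTAMP())
));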

SQL Syntax

The syntax to create a task is as follows:

CREATE [ OR REPLACE ] TASK [ IF NOT EXISTS ] <name>
  WAREHOUSE = <string>
  [ SCHEDULE = '{ <num> MINUTE | USING CRON <expr> <time_zone> }' ]
  [ ALLOW_OVERLAPPING_EXECUTION = TRUE | FALSE ]
  [ <session_parameter> = <value> [ , <session_parameter> = <value> ... ] ]
  [ USER_TASK_TIMEOUT_MS = <num> ]
  [ COPY GRANTS ]
  [ COMMENT = '<string_literal>' ]
  [ AFTER <string> ]
[ WHEN <boolean_expr> ]
AS
  <sql>

Source

You can, like streams, clone a task which will inherit all the parameter values:

CREATE [ OR REPLACE ] TASK <name> CLONE <source_task>
  [ COPY GRANTS ]
  [ ... ]


The following commands are also available:

  • ALTER TASK
  • DROP TASK
  • DESCRIBE TASK
  • SHOW TASKS
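
Finally, tasks and streams are often paired; here is a hedged sketch (warehouse, stream and table names are hypothetical) of a task that runs every five minutes but only does work when the stream has new rows:

CREATE OR REPLACE TASK my_task
  WAREHOUSE = my_wh
  SCHEDULE = '5 MINUTE'
WHEN
  SYSTEM$STREAM_HAS_DATA('MY_STREAM')
AS
  INSERT INTO my_target_table (id, value)
  SELECT id, value
  FROM my_stream
  WHERE METADATA$ACTION = 'INSERT';

-- Tasks are created suspended; resume to start the schedule
ALTER TASK my_task RESUME;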

Further Reading

Loading Data into Snowflake

Unloading Data from Snowflake

Continuous Data Pipelines