# Datasets for Ingest
When ingesting data using Flo.w Realtime, you use the MQTT topic name to specify the target Flo.w dataset. The MQTT Ingester uses the dataset metadata and database connection details to perform SQL insert, upsert, and delete operations against the database table backing the dataset.
To prepare an empty dataset ready to receive ingested data, you must:
- Create a new database table with the required columns, primary keys and indexes.
- Create a new table-type Flo.w dataset (using the Flo.w CLI) specifying the new database table.
To verify that the dataset has been created successfully, perform the following Flo.w CLI command to view the detected dataset attributes:
```
flow datasets attrs <datasetID>
```
If the reported attributes, attribute types, and primary key settings match the expected values, the dataset is ready for ingest.
To verify that the dataset can be queried and is empty, perform the following command:
```
flow datasets query <datasetID>
```
You should receive 'No items' as output.
# Dataset Ingest Strategies
Data ingest typically uses one of two strategies. The choice of strategy determines how to design the database table schema and which operation to use when ingesting data.
# Time Series Strategy
Time series datasets hold timestamped, historical data. The total number of records increases as time proceeds.
Use the insert action to ingest data using this strategy. The database table backing a time series dataset should specify a composite primary key including the timestamp column and a unique ID for each entity of interest.
See Working with Time Series Data for more information on constraints and caveats when using this strategy.
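As a concrete sketch, a backing table for this strategy might look like the following. The table and column names are illustrative only; adapt them to your own schema.

```sql
-- Hypothetical backing table for a time series dataset of vehicle positions.
CREATE TABLE vehicle_positions (
    vehicle_id  TEXT NOT NULL,                      -- unique ID for each entity of interest
    recorded_at TIMESTAMP WITH TIME ZONE NOT NULL,  -- observation timestamp (UTC)
    position    geometry(Point,4326),               -- WGS84 lng/lat
    speed       DOUBLE PRECISION,
    PRIMARY KEY (vehicle_id, recorded_at)           -- composite key: entity ID + timestamp
);
```

Each ingested record adds a new row, so the table grows without bound as time proceeds.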
# Latest Value Strategy
Latest value datasets (or 'upsert' datasets) hold the latest record for each entity of interest.
Use the upsert action to ingest data using this strategy. The database table backing a latest value dataset should specify a unique ID for each entity of interest as a primary key.
Upserting data into a latest value dataset will execute insert or update operations as necessary to maintain records for all entities encountered. The table will grow in size initially as new entities are encountered and then will plateau.
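Conceptually, an upsert behaves like PostgreSQL's `INSERT ... ON CONFLICT DO UPDATE`: insert a row for a new entity, or update the existing row for a known one. A sketch with hypothetical table and column names:

```sql
-- Hypothetical backing table for a latest value dataset: one row per entity.
CREATE TABLE vehicle_latest (
    vehicle_id  TEXT PRIMARY KEY,                   -- entity ID as the primary key
    recorded_at TIMESTAMP WITH TIME ZONE NOT NULL,
    position    geometry(Point,4326)
);

-- An upsert is conceptually equivalent to an insert-or-update keyed on the entity ID:
INSERT INTO vehicle_latest (vehicle_id, recorded_at, position)
VALUES ('bus-42', '2024-05-01T12:30:00Z',
        ST_SetSRID(ST_MakePoint(-0.1276, 51.5072), 4326))
ON CONFLICT (vehicle_id) DO UPDATE
SET recorded_at = EXCLUDED.recorded_at,
    position    = EXCLUDED.position;
```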
If required, include a periodic operation in your stream processing pipeline to purge stale entity records. Alternatively, specify a `where` clause when querying the dataset to filter out stale records.
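Either approach can be expressed in SQL against the backing table. The table and column names below are hypothetical, and the 24-hour staleness threshold is an arbitrary example:

```sql
-- Periodic purge: delete entities not seen for 24 hours.
DELETE FROM vehicle_latest
WHERE recorded_at < now() - INTERVAL '24 hours';

-- Alternatively, filter stale rows out at query time instead of deleting them:
SELECT * FROM vehicle_latest
WHERE recorded_at >= now() - INTERVAL '24 hours';
```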
# Creating the Backing Table
Flo.w Engine uses drivers to access database engines, but administration of those databases is outside the scope of Flo.w tools. Use management tools provided by your database supplier (or third-party clients) to create tables for ingest. For PostgreSQL/PostGIS we suggest using pgAdmin, which is available for Linux, macOS and Microsoft Windows.
# Primary Keys and Indexes
You should specify appropriate primary keys depending on the data you are going to store, how it will be ingested, and how you intend to query it. At a minimum, the following primary keys should be defined:
- Time series strategy: a composite primary key comprising the entity ID and timestamp columns.
- Latest value strategy: a primary key on the entity ID column.
Add additional indexes appropriate for your use case and expected queries.
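For example, a time series table queried by time range or by location might benefit from indexes like these (table and column names are hypothetical):

```sql
-- Speed up time-range queries over the timestamp column:
CREATE INDEX vehicle_positions_recorded_at_idx
    ON vehicle_positions (recorded_at);

-- Spatial queries on PostGIS geometry columns typically use a GiST index:
CREATE INDEX vehicle_positions_position_idx
    ON vehicle_positions USING GIST (position);
```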
# Timestamp Columns
When using PostgreSQL/PostGIS, we recommend always using the `TIMESTAMP WITH TIME ZONE` column type. Where possible, work in Coordinated Universal Time (UTC) and use ISO 8601-formatted timestamp strings when ingesting data. Take care when working with timestamp data from external sources: make sure you understand the format of the timestamp and any associated daylight saving or time zone issues.
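The following sketch shows an unambiguous ISO 8601 UTC literal being inserted into such a column (the table and column names are hypothetical):

```sql
-- The trailing 'Z' marks the timestamp as UTC; PostgreSQL stores it
-- normalized, so no daylight saving or local time zone ambiguity arises.
INSERT INTO sensor_readings (sensor_id, recorded_at, value)
VALUES ('sensor-7', '2024-05-01T12:30:00Z', 21.5);
```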
# Geometry Columns
When using PostgreSQL/PostGIS, use an explicit PostGIS geometry type with SRID when specifying geometry columns. For example, `geometry(Polygon,4326)` specifies a polygon column using WGS84 (lng/lat).
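A minimal sketch of such a column, together with a standard PostGIS check of the SRID it was registered with (the table and column names are hypothetical):

```sql
-- Geometry column declared with an explicit type and SRID:
CREATE TABLE zones (
    zone_id  TEXT PRIMARY KEY,
    boundary geometry(Polygon,4326)   -- WGS84 lng/lat
);

-- Verify the SRID PostGIS has recorded for the column (expect 4326):
SELECT Find_SRID('public', 'zones', 'boundary');
```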
Use the `flow datasets attrs <datasetID>` Flo.w CLI command to verify geometry attributes. If the reported SRID is -1, the geometry column's SRID has not been recognized; check that you have created the geometry column correctly.
# Updating the Backing Table Schema
If you alter the schema of the backing table then you must run `flow datasets update <datasetID>` to allow Flo.w to update the dataset metadata that defines available attributes.
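For instance, a typical change might add a column to the backing table (the table and column names here are hypothetical):

```sql
-- Add a new attribute column to the backing table...
ALTER TABLE vehicle_positions ADD COLUMN heading DOUBLE PRECISION;
-- ...then run `flow datasets update <datasetID>` so the new column
-- becomes available as a dataset attribute.
```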
You can run `flow datasets attrs <datasetID>` at any time to verify that the dataset attribute metadata is correct.