Load event to CDP
A component for loading data into the CDP data model
This component provides a set of primitives for interacting with the CDP database tables such as customer_events and attribute_tags.
The order of the columns in the input CSV DOES matter! It is checked at runtime.
Configuration
PostgreSQL backend
{ "parameters": { "action": "", "debug": true, "dry_run": false, "backend": "postgres", "auth": { "host": "", "dbname": "postgres", "user": "postgres", "password": "", "cdp_schema": null #optional } } }
- dry_run (default false): if true, no changes are committed to the database. If false, changes are committed after all input files are processed.
- auth.cdp_schema (optional): if, for some reason, the data model doesn't reside in the public schema, this sets the search_path to the specified value (instead of the default $user,public).
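For the Postgres backend, setting auth.cdp_schema presumably amounts to something like the following session-level statement (a sketch of the effect, with my_schema as a made-up value, not necessarily the component's exact call):

```sql
SET search_path TO my_schema;
```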
BigQuery backend
json_credentials_string (the key can be prefixed with #) - see https://cloud.google.com/storage/docs/reference/libraries#setting_up_authentication
{"parameters":{"action":"","debug":true,"backend":"bigquery","auth":{"#json_credentials_string":"{foo...}","bq_project_name":"","bq_dataset_name":"","bq_location":""}}}
json_credentials_string is just the contents of the json auth file
Common config
All csvs in in/tables/*.csv are processed according to the action parameter.

debug defaults to true.

The available actions are described in the sections below.
Sources, attributes, events, tags
This is populated at the beginning of a CDP implementation and kept as a ground truth of defined attributes and sources. The component takes the input csvs and UPSERTS the rows into the relevant tables in the CDM.
This component does not delete any attributes. Deletion can be implemented if necessary; for now, delete the attribute by hand and remove the respective row from the input tables.
The columns modified and created must be present but can be empty; the correct datetime values will be populated in the database.
Sources
"action": "upsert_sources"
```
id,name,description,type,frontend_settings,is_hidden
"profile_stitching","Profile stitching","Attributes generated during profile stitching","profile_stitching","{""color"": ""red""}",1
```
attributes
"action": "upsert_attributes"
```
id,source_id,name,is_disabled,is_unique,data_type,order_index,description,is_hidden
customer_defining_event_id,profile_stitching,Customer defining event_id,0,0,"string",1000,"Points at event ids in customer_events which contain primary stitching attributes for given customer entity",1
```
(the given source_id must already exist) - order_index is used to order attributes in the frontend. If you don't care, just put 1000 there.
events
Updates the events table containing event definitions along with their schemas.
"action": "upsert_events"
```
source_id,type,version,name,schema,is_hidden
test_source,purchase,2-0-0,Updated test event type,"{""foo"": 42}",0
test_source,purchase,3-0-0,Updated test event type,"{""bar"": 84}",0
```
(given source_id must already exist)
tags
"action": "upsert_tags"
The id should be a snake_case version of the name, ASCII only (matching [a-z0-9_]+).
```
id,name
my_tag_1,"My taG 1$"
```
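The id is supplied by you; if you want to derive it from the name, a Postgres expression along these lines would do (a hypothetical helper, not part of the component):

```sql
-- snake_case, ASCII-only id derived from the display name
SELECT trim(both '_' from
         regexp_replace(lower('My taG 1$'), '[^a-z0-9]+', '_', 'g')
       ) AS tag_id;  -- => my_tag_1
```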
sources tags
"action": "upsert_source_tags"
```
source_id,tag_id
mailchimp,my_tag_1
```
attribute tags
"action": "upsert_attribute_tags"
Upserts tag_ids for attributes into the attribute_tags table.

```
attribute_id,tag_id
mailchimp_email,my_tag_1
```
removing customer entities
"action": "remove_customer_entities"
Given a customer_entity_id:
- remove all customer_attributes
- set customer_events.customer_entity_id=null
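Conceptually (on the Postgres backend) this amounts to something like the sketch below for a single entity; the exact statements issued by the component may differ, and 'robin' is just an illustrative id:

```sql
-- drop the entity's attributes and detach its events
DELETE FROM customer_attributes WHERE customer_entity_id = 'robin';
UPDATE customer_events SET customer_entity_id = NULL WHERE customer_entity_id = 'robin';
```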
Customer events
Deleting customer_events
"action": "delete_customer_events" Input csv:
id id_of_event_to_be_deleted id_of_event2_to_be_deleted
Loading new customer_events
"action": "insert_customer_events"
This inserts new events; if there is a conflict on the id column, the row is skipped altogether.

The event_id must already be defined in the events table before upload.

Structure of the input file:
```
id,customer_entity_id,event_id,source_id,event_time,type,version,payload,created
"c",,md5(source_id || type || version),"test_source",2018-06-23T12:34:56Z,"subscribed","1-0-0","{""foo"": 126}","2018-06-23T12:34:56Z"
"d",,md5(source_id || type || version),"test_source",2018-06-23T12:34:56Z,"purchase","1-0-0","{""foo"": 126}","2018-06-23T12:34:56Z"
```

(the "c" row is an example of an event that shouldn't be imported)
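On the Postgres backend the "skip on conflict" behaviour described above presumably maps to an INSERT ... ON CONFLICT DO NOTHING. A sketch for the second example row (the column layout and value handling here are assumptions, not the component's exact statement):

```sql
INSERT INTO customer_events
  (id, customer_entity_id, event_id, source_id, event_time, type, version, payload, created)
VALUES
  ('d', NULL, md5('test_source' || 'purchase' || '1-0-0'), 'test_source',
   '2018-06-23T12:34:56Z', 'purchase', '1-0-0', '{"foo": 126}', '2018-06-23T12:34:56Z')
ON CONFLICT (id) DO NOTHING;  -- a row whose id already exists is skipped altogether
```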
Upserting customer events (dangerous)
Same structure as customer_events.csv.
"action": "upsert_customer_events"
This inserts new events; if there is a conflict on the id column, it updates payload and leaves everything else untouched. The input file should have the same structure as for loading new customer events (customer_events.csv).
This is an experimental mode. It can be useful when you have a set of events loaded in the database and later you find out that the payloads are missing some field and you want to backfill the data.
This must be done in a backward-compatible way (adding a new field to the JSON shouldn't break any follow-up customer attributes). Before using this, make sure you know what you are doing and the implications of it.
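The only difference from insert_customer_events is the conflict handling; in Postgres terms the sketch above would change roughly like this (again an assumption about the underlying statement, not a verified implementation detail):

```sql
INSERT INTO customer_events
  (id, customer_entity_id, event_id, source_id, event_time, type, version, payload, created)
VALUES
  ('d', NULL, md5('test_source' || 'purchase' || '1-0-0'), 'test_source',
   '2018-06-23T12:34:56Z', 'purchase', '1-0-0', '{"foo": 127}', '2018-06-23T12:34:56Z')
ON CONFLICT (id) DO UPDATE
SET payload = EXCLUDED.payload;  -- on an id collision only payload is overwritten
```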
Assign customer_entity_id to customer_events
From csv (old way)
"action": "
update_matched_events"
```
customer_entity_id,event_id
robin,a
```
to assign event_id a to customer_entity_id robin
From view (experimental)
With detached table ce.customer_events_entities
"action": "insert_matched_events_from_query"
place your query
```sql
SELECT
  attr."customer_entity_id",
  ev."id" "event_id"
FROM "public"."customer_attributes" attr
INNER JOIN "ce"."customer_events" ev
  ON attr."value" = TRIM(ev."payload"->>'appsflyer_id')
WHERE ev."source_id" = 'appsflyer_bca'
  AND ev."version" IN ('1-0-0')
  AND ev."type" IN ('app_registered', 'app_installed', 'app_bca_applied', 'app_opened', 'app_used')
  AND ev."customer_entity_id" IS NULL
  AND attr."attribute_id" IN ('stitching_appsflyer_id')
UNION ALL
...
```
into in/files/match_customer_events.sql in order to assign customer_entity_id to the respective customer_events.
Customer attributes
The value of each attribute_id is verified against its attribute's data type and an error is raised appropriately.
Example compound attribute: the syntax is like
compound([["SUBATTRIBUTE_ID", "SUBATTRIBUTE NAME", "SUBATTRIBUTE DATA TYPE"]])
for example
compound([["origin","Origin","string"],["destination","Destination","string"],["date","Date","date"],["price","Price","float"]])
can be viewed as
```
compound(
  [
    # id             # name           # datatype
    ["origin",       "Origin",        "string"],   # subattribute 1
    ["destination",  "Destination",   "string"],   # subattribute 2
    ["date",         "Date",          "date"],     # subattribute 3
    ["price",        "Price",         "float"]     # subattribute 4
  ]
)
```
From csv
The id column is computed dynamically based on attributes table. The modified and created values are set to current UTC timestamp. You can supply duplicated rows and they will be deduplicated before load.
The input is a csv in/tables/foo.csv:

```
customer_entity_id,attribute_id,value
robin,unique,jedinecny-updated
robin,unique,jedinecny-updated
robin,unique_attr2,42
```
Delsert
"action": "delsert_customer_attributes"
All attribute_ids from the csv are first deleted from customer_attributes and then inserted. This is most likely the function you are looking for when loading customer_attributes.
Using this prevents a subtle bug of upserting sparse customer_attributes: on day one you compute an attribute as true and insert it into the database. On day two, a new event arrives and the value is now false - if you were using upsert, the value wouldn't be inserted (because we do not store empty strings, nulls, 0, false).

This doesn't cover the bug 100%, but in general should be good to go. The only remaining edge case is when you compute an attribute email_opened_in_last_30days on day 1. On day 2 this evaluates to false for all customers, so the csv you are about to load is empty and there is no way to recognize which attributes should be deleted from customer_attributes first (this information is read from the csv), so nothing is deleted/inserted. In production, with real data and a lot of customers, this should happen only rarely.
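In Postgres terms the delsert roughly corresponds to the sketch below, using the example csv from above (the real id/timestamp handling and statement batching are up to the component):

```sql
-- 1) delete every row of each attribute_id present in the csv
DELETE FROM customer_attributes
WHERE attribute_id IN ('unique', 'unique_attr2');

-- 2) insert the (deduplicated) rows from the csv
INSERT INTO customer_attributes (customer_entity_id, attribute_id, value, created, modified)
VALUES
  ('robin', 'unique',       'jedinecny-updated', now(), now()),
  ('robin', 'unique_attr2', '42',                now(), now());
```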
Upsert
"action": "upsert_customer_attributes"
All attributes from the input csv are upserted. This does not delete anything from the database. You probably want to use delsert_customer_attributes unless you are using it to load customer attributes from profile-stitching.
From views
Two actions are possible; they work the same as customer_attributes from csv (above):

"action": "upsert_customer_attributes_from_view"

"action": "delsert_customer_attributes_from_view"
Attributes can be calculated inside a VIEW and inserted directly into customer_attributes, bypassing the expensive IO of moving around and serializing CSV files.
config.json
{"parameters":{"action":"delsert_customer_attributes_from_view","config":{#oneofthefollowing"schema":"my_schema","views":["my_schema.number_of_transactions_total","my_schema.revenue_total"]}}}
If you add config.schema, all views existing in the given schema will be upserted into customer_attributes. If you add config.views, only the specified views will be upserted.
The structure of the views is the same as the csvs in the case of upserting attributes (columns customer_entity_id, attribute_id, value), and the attribute_id must exist in the attributes table.
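A minimal example of such a view (the schema, view name, and payload field are illustrative; 'revenue_total' must already exist in the attributes table):

```sql
CREATE VIEW my_schema.revenue_total AS
SELECT
  e.customer_entity_id,
  'revenue_total'                               AS attribute_id,
  SUM((e.payload ->> 'price')::numeric)::text   AS value
FROM public.customer_events e
WHERE e.type = 'purchase'
  AND e.customer_entity_id IS NOT NULL
GROUP BY e.customer_entity_id;
```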
Merged customer entities
pre profile stitching 1.0.0
"action": "update_merged_customer_entities"
```
old_customer_entity_id,new_customer_entity_id
robin,batman
robin2,batman
```
would:
- update all customer_events.customer_entity_id from robin or robin2 to batman
- DELETE all rows from customer_attributes where customer_entity_id is one of the former customer_entities (robin or robin2)

Instead, if we just updated customer_entity_id (and recomputed id), it would break consistency for unique attributes (imagine each former entity has its own revenue calculated; after the join we would end up with 2 different revenues for the same customer_entity). The safe way is to just drop it.
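For the example csv above this boils down to something like (illustration only, not the component's literal statements):

```sql
UPDATE customer_events
SET customer_entity_id = 'batman'
WHERE customer_entity_id IN ('robin', 'robin2');

DELETE FROM customer_attributes
WHERE customer_entity_id IN ('robin', 'robin2');
```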
This is useful in cdp-profile-stitching versions <1.0.0. When using a later version, use the sparse option below.
profile stitching 1.0.0 +
"action": "update_merged_customer_entities_sparse"
The newer profile stitching has slightly changed the logic of merging 2+ customer entities: the new event no longer becomes the new "parent" customer entity. Instead, the smaller entity (fewer attributes) along with the event entity is merged into the existing entity.
```
# old way
A    B           (= existing customer entities)
  \ /
NEW_ENTITY       (= parent)

# new way
B    NEW_ENTITY
 \    /
  \  /
   A             (= existing entity == parent entity)
```
This gets saved as
```
old_customer_entity_id,new_customer_entity_id
B,A
```
and we need to:
- delete both B and A from customer_attributes
- update customer_events.customer_entity_id from the old_customer_entity_id to the new_customer_entity_id
In this case this action must run (and finish) BEFORE the upsert of stitching customer attributes finishes
This is what the suffix _sparse represents.
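For the csv above the sparse variant therefore does roughly the following (illustration only):

```sql
-- both the merged entity B and the surviving entity A lose their attributes;
-- they are re-created by the subsequent upsert of stitching customer attributes
DELETE FROM customer_attributes WHERE customer_entity_id IN ('B', 'A');
UPDATE customer_events SET customer_entity_id = 'A' WHERE customer_entity_id = 'B';
```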
for use with customer_event_entities instead of customer_events
"action": "update_merged_customer_entities_customer_event_ids_entities_sparse" same input csv
Executing custom queries
If "action": "execute_custom_query"
all files placed into `in/files are assumed to contain an sql query each and they will be executed one by one Be very careful, as you can easily delete everything**
This is useful, for example when creating customer attribute views
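For example, a file like the hypothetical in/files/create_attribute_views.sql below would be picked up and executed as-is:

```sql
-- create the schema and one attribute view for use with *_customer_attributes_from_view
CREATE SCHEMA IF NOT EXISTS my_schema;

CREATE OR REPLACE VIEW my_schema.number_of_transactions_total AS
SELECT
  customer_entity_id,
  'number_of_transactions_total' AS attribute_id,
  COUNT(*)::text                 AS value
FROM public.customer_events
WHERE type = 'purchase'
  AND customer_entity_id IS NOT NULL
GROUP BY customer_entity_id;
```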
Use in GDPR protection
If a customer entity requests GDPR protection, we must:
- get all customer_events.id for the given customer_entity containing PII data
- delete all customer_attributes for the given customer_entity_id
- set all affected customer_events.customer_entity_id=NULL (this is a precaution to make sure there aren't any orphaned events left)
- hash sensitive data in event payloads and reformat them (because sensitive data can contain elements which are used in the event id hash calculation)
- insert the reformatted events
- rerun profile stitching
For this purpose the actions remove_customer_entities and delete_customer_events can be used.