ePrivacy and GPDR Cookie Consent by Cookie Consent Skip to main content

Load event to CDP

A component for loading data into the CDP data model

This component provides set of primitives for interacting with the CDP database.database - tables customer_events and attribute_tags.

The order of the columns in the input csvCSV DOES matter!!!! and it's checked at runtime

Configuration

PostgreSQL backend

{
  "parameters": {
     "action": "",
     "debug": true,
     "dry_run": false,
     "backend": "postgres",
     "auth": {
        "host": "",
        "dbname": "postgres",
        "user": "postgres",
        "password": "",
        "cdp_schema": null #optional
     }
  }
}
  • dry_run (default false); if true do not commit any changes to the database. If false changes are commited after all input files are processed
  • auth.cdp_schema - optional if, for some reason, the data model doesn't reside in the public schema this sets the search_path to the specified value (which means the default $user,public  value doesn't apply anymore). Only use this if you know what you are doingdoing.

BigQuery backend

{
  "parameters": {
     "action": "",
     "debug": true,
     "backend": "bigquery",
     "auth": {
        "#json_credentials_string": "{foo...}",
        "bq_project_name": "",
        "bq_dataset_name": "",
        "bq_location": ""
     }
  }
}
  • json_credentials_string is just the contents of the json auth file

Common config

    All

  • allCVSs csvsin in in/tables/*.csv are processed according to the action parameter
  • parameter.

    debug defaults to true

Available actions are

actionpostgresbigquery
upsert_sourcesxx
upsert_eventsxx
upsert_attributesxx
upsert_tagsx 
insert_customer_eventsxx
upsert_customer_eventsx 
delete_customer_events x
update_matched_eventsxx
insert_matched_events_from_queryx 
update_merged_customer_entitiesxx
update_merged_customer_entities_sparsexx
update_merged_customer_entities_customer_event_ids_entities_sparsex 
upsert_customer_attributes_full_structurex 
upsert_customer_attributesxx
delsert_customer_attributesxx
upsert_customer_attributes_from_viewx 
delsert_customer_attributes_from_viewx 
upsert_source_tagsx 
upsert_attribute_tagsx 
remove_customer_entities x
execute_custom_queryx 

Sources, attributes, events, tags

This is populated at the beginning of a CDP implementation and kept as a ground truth of defined attributes and sources

and UPSERTS the rows into relevant tables in the CDM.

This component does not delete any attributes. Can be implemented if necessary, for now delete it by hand and remove the respective row from the input tables.

the columns modified and created must be there, but can be empty, the correct datetime values will be populated in db

Sources

"action": "upsert_sources"

id,name,description,type,frontend_settings,is_hidden
"profile_stitching","Profile stitching","Attributes generated during profile stitching","profile_stitching","{""color"": ""red""}",1

attributes

"action": "upsert_attributes"

id,source_id,name,is_disabled,is_unique,data_type,order_index,description,is_hidden
customer_defining_event_id,profile_stitching,Customer defining event_id,0,0,"string",1000,"Points at event ids in customer_events which contain primary stitching attributes for given customer entity",1

(given source_id must already exist) - order_index is used to order attributes in frontend. If you don't care, just put 1000 there.

events

updates table events containing event definitions along with schemas

"action": "upsert_events"

source_id,type,version,name,schema,is_hidden
test_source,purchase,2-0-0,Updated test event type,"{""foo"": 42}",0
test_source,purchase,3-0-0,Updated test event type,"{""bar"": 84}",0

(given source_id must already exist)

tags

"action": "upsert_tags"

the id should be snake_case version of the name ascii only (matching [a-z0-9]_+)

id,name
my_tag_1,"My taG 1$"

sources tags

"action": "upsert_source_tags"

source_id,tag_id
mailchimp,my_tag_1

attribute tags

"action": "upsert_attribute_tags"

attribute_id,tag_id
mailchimp_email,my_tag_1

removing customer entities

"action": "remove_customer_entities"

given customer_entity_id removeare:

  • all customer_attributesinsert_customer_events
  • set customer_events.customer_entity_id=nullupsert_customer_events
  • upsert_attribute_tags

Customer events

Deleting customer_events

"action": "delete_customer_events" Input csv:

id
id_of_event_to_be_deleted
id_of_event2_to_be_deleted

Loading new customer_events

"action": "insert_customer_events"

This inserts new events, if there is a conflict on id column, skip  this row altogetheraltogether.

  • The event_id must be already defined in the events table before upload

"action":The "insert_customer_events"structure of the input file:

id,customer_entity_id,event_id,source_id,event_time,type,version,payload,created
"c",,md5(source_id || type || version),"test_source",2018-06-23T12:34:56Z,"shouldn't be imported"subscribed","1-0-0","{""foo"": 126}","2018-06-23T12:34:56Z"
"d",,md5(source_id || type || version),"test_source",2018-06-23T12:34:56Z,"purchase","1-0-0","{""foo"": 126}","2018-06-23T12:34:56Z"

Upserting customer events (dangerous)

same strucutre as customer_events.csv

"action": "upsert_customer_events"

This inserts new events, if there is a conflict on id column, updates payload  and leaves everything else untouched.Input file should have the same structure as for loading new customer events (customer_events.csv).

This is an experimental mode. It can be useful when you have a set of events loaded in the database and later you find out that the payloads are missing some field and you want to backfill the data.

This must be done in a backward compatible way (adding new field to the json shouldn't break any follow-up customer attributes). Before using this, make sure you know what you are doing and the implications of it.

 

AssignUpserting customer_entity_idattribute to customer_eventstags

From csv (old way)

"action": "update_matched_events"

customer_entity_id,event_id
robin,a

to assign event_id a to customer_entity_id robin

From view (experimental)

With detached table ce.customer_events_entities

"action": "insert_matched_events_from_query"

place your query

SELECT     attr."customer_entity_id"
           ,ev."id"  "event_id"
FROM       "public"."customer_attributes" attr
INNER JOIN "ce"."customer_events" ev
        ON attr."value"=TRIM(ev."payload"->>'appsflyer_id')
WHERE      ev."source_id"='appsflyer_bca'
       AND ev."version" IN ('1-0-0')
       AND ev."type" IN ('app_registered', 'app_installed', 'app_bca_applied', 'app_opened', 'app_used')
       AND ev."customer_entity_id" IS NULL
       AND attr."attribute_id" IN ( 'stitching_appsflyer_id' )
UNION ALL
...

into in/files/match_customer_events.sql

In order to assign customer_entity_id to the respective customer_events.

Customer attributes

The value of each attribute_id is verified and error is raised appropriately. Available data types (and accepted values are)

data_typeallowed_values (regex)notes
bool[01]1 is true, 0 is false
string.*anything
date^\d{4}-\d{2}-\d{2}$yyyy-mm-dd
datetime^\d{4}-\d{2}-\d{2}[ T]?\d{2}:\d{2}:\d{2}[Z]?$iso8601 format YYYY-MM-DDTHH:MM:SSZ for postgres; YYYY-MM-DD HH:MM:SS for bigquery
float-?\d+\.?\d* 
int\d+ 
compoundeach subattribute is one of the above datatypesSee example below

Example compound attribute: the syntax is like

compound([["SUBATTRIBUTE_ID", "SUBATTRIBUTE NAME", "SUBATTRIBUTE DATA TYPE"]])

for example

compound([["origin","Origin","string"],["destination","Destination","string"],["date","Date","date"],["price","Price","float"]])

can be vieiwed as

compound(
    [   # id           # name        # datatype
        ["origin",     "Origin",     "string"], # subattribute 1
        ["destination","Destination","string"], # subattribute 2
        ["date",       "Date",       "date"],   # subattribute 3
        ["price",      "Price",      "float"]   # subatribute 4
    ]
)

From csv

The id column is computed dynamically based on attributes table. The modified and created values are set to current UTC timestamp. You can supply duplicated rows and they will be deduplicated before load.

The input is a csv in/tables/foo.csv

customer_entity_id,attribute_id,value
robin,unique,jedinecny-updated
robin,unique,jedinecny-updated
robin,unique_attr2,42

Delsert

"action": "delsert_customer_attributes"

All attribute_ids from the csv are first deleted from customer_attributes and then inserted. This is most likely the function you are looking for when loading customer_attributes.

Using this prevents a subtle bug of upserting sparse customer_attributes. On day one you compute attribute as true and insert into database. On day two, new event arrives and the value is now false - if you were using upsert, the value wouldn't be inserted (because we do not store empty strings, nulls, 0, false)

upsert_attribute_tags"

This doesn'tupserts covertag_ids for the bugattributes in 100%, but in general should be good to go. The only remaining edge case is when you compute attribute email_opened_in_last_30days on day 1. On day 2 this evaluates to false for all customers (so the csvattribute_tags youtable. areDoesn't about to load is empty and there is no way to recognize what attributes should be deleted from customer_attributes first - this information is read from the csv) so nothing is deleted/inserted, but in production with real data and a lot of customers this should happen only rarely.

Upsert

"action": "upsert_customer_attributes"

All attributes from the input csv are upserted. This does not delete anything from the database. You probably want to use delsert_customer_attributes unless you are using it to load customer attributes from profile-stitching.

From views

Two actions possible, work the same as customer_attributes from csv (above)table.

attribute_id,tag_id
"action": "upsert_customer_attributes_from_view",
      "action": "delsert_customer_attributes_from_view",
mailchimp_email,my_tag_1

Attributes can be calculated inside a VIEW and inserted directly into customer_attributes bypassing the expensive IO moving around and serializing CSV files.

config.json

{
   "parameters": {
      "action": "delsert_customer_attributes_from_view",
      "config": {
          #  one of the following
          "schema": "my_schema",
          "views": ["my_schema.number_of_transactions_total", "my_schema.revenue_total"]
      }
   }
 }

if you add config.schema all views existing in given schema will be upserted into customer_attributes. If you add config.views the specific views will be upserted.

The structure of the views is the same as the csvs in case of upserting attributes (columns customer_entity_id, attribute_id, value) and the attribute_id must exist in attributes table

Merged customer entities

pre profile stitching 1.0.0

"action": "update_merged_customer_entities"

old_customer_entity_id,new_customer_entity_id
robin,batman
robin2,batman

would

  • update all customer_events.customer_entity_id from robin or robin2 to batman
  • DELETE all rows from customer_attributes where customer_entity_id is one of the former customer_entities (robin or robin2). Instead, if we just updatedcustomer_entity_id(and recomputedid`) it would break consistency for unique attributes (imagine each former entity has it's own revenue calculated, after the join we would end up with 2 different revenues for the same customer_entity). The safe way is to just drop it.

This is useful in cdp-profile-stitching until version <1.0.0. When using later version, use the sparse option below

profile stitching 1.0.0 +

"action": "update_merged_customer_entities_sparse"

The newer profile stitching has slightly changed logic of merging 2+ customer entities the new event no longer becomes the new "parent" customer entity. Instead the the smaller entity (less # of attributes) along with the event entity is merged into the existing entity

# old way
    A  B  (= existing customer entities)
     \/
 NEW_ENTITY (= parent)

# new way
 B   NEW_ENTITY
  \    /
   \  /
    A  (= existing entity == parent entity)

This gets saved as

old_customer_entity_id,new_customer_entity_id
B,A

and we need to - delete from customer_attributes both B and A. - update customer_events.customer_entity_id from the old_customer_entity_id to the new_customer_entity_id.

In this case this action must run (and finish) BEFORE the upsert of stitching customer attributes finishes

This is what the suffix _sparse represents.

for use with customer_event_entities instead of customer_events

"action": "update_merged_customer_entities_customer_event_ids_entities_sparse" same input csv

Executing custom queries

If "action": "execute_custom_query"

all files placed into `in/files are assumed to contain an sql query each and they will be executed one by one Be very careful, as you can easily delete everything**

This is useful, for example when creating customer attribute views

Use in GDPR protection

If a customer entity requests a GDPR protection, we must - Get all customer_events.id for given customer_entity containing PII data - Delete all customer_attributes for given customer_entity_id - Set all affected customer_events.customer_entity_id=NULL (this is a precaution to make sure there aren't any orphaned events left) - hash sensitive data in event payloads, reformat them (because sensitive data can contain elements which are used in the event id hash calculation) - insert reformatted events - rerun profile stitching

For this purpose the actions remove_customer_entities and delete_customer_events can be used.