Transform events (CR)

Learn from this article: how to transform events into a format suitable for the customer_events table.

The purpose of this component is to prepare the events for loading into the customer_events table.
CSV structure

Before loading data into the customer_events table, we need to transform the data into a specific structure so that it is ready for loading.

Here are the columns required in the CSV, their data types and the values expected:

Column Name | Type | Values
---|---|---
source_id | string | The source id of a source defined under Sources in Meiro Business Explorer. It should be in snake casing and human readable.
type | string | The type of event defined under Events in Meiro Business Explorer. It should be in snake casing and human readable.
version | string | The version of the event defined under Events in Meiro Business Explorer (e.g. 1-0-0).
user_identifiers | string | A concatenated string of identifiers of a customer. Example: john.doe@meiro.io, john.doe@meiro.io420987654321
event_time | string (datetime) | A datetime string in the format 'YYYY-MM-DD HH:MM:SS'. The event time must be in the UTC timezone.
payload | json | This is where the bulk of the data goes. As long as the payload is valid JSON, anything goes.

Remember: Events should be supplied in CSV format, with these 6 columns: source_id, type, version, user_identifiers, event_time and payload.
Sample code in Python
For example, given a file data/in/files/events_mc_subscribed.ndjson with the following 2 rows:

{"email":"robin@meiro.io", "meta": {"date": "2018-08-18T14:15:16Z"}, "status": "subscribed", "list_id": "12345b", "list_name": "Loyal customers"}
{"email":"foo@bar.io", "meta": {"date": "2018-08-18T15:16:17Z"}, "status": "subscribed", "list_id": "12345b", "list_name": "Loyal customers"}

the goal is to produce data/out/tables/events_mc_subscribed.csv, which can be uploaded to the customer_events table:

source_id | type | version | user_identifiers | event_time | payload
---|---|---|---|---|---
mailchimp | subscribed | 1-0-0 | robin@meiro.io | 2018-08-18T14:15:16Z | {"email":"robin@"... - 1:1 copy of original}
mailchimp | subscribed | 1-0-0 | foo@bar.io | 2018-08-18T15:16:17Z | {"email":"foo@bar.io"... - 1:1 copy of original}
Here is sample code in Python to do this transformation:

import json, csv
from os.path import exists

if exists('/data/in/files/events_mc_subscribed.ndjson'):
    with open('/data/in/files/events_mc_subscribed.ndjson', 'r') as inf, \
         open('/data/out/tables/events_mc_subscribed.csv', 'w') as outf:
        outcsv = csv.DictWriter(outf, ['source_id', 'type', 'version', 'user_identifiers', 'event_time', 'payload'])
        outcsv.writeheader()
        for l in inf:
            jl = json.loads(l)
            # skip rows without a customer identifier
            if not jl.get('email'):
                continue
            outcsv.writerow({
                'source_id': 'mailchimp',
                'type': 'subscribed',
                'version': '1-0-0',
                'user_identifiers': jl['email'],
                'event_time': jl['meta']['date'],
                'payload': json.dumps(jl)
            })
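Note that the meta.date values in the sample file are ISO 8601 timestamps (e.g. 2018-08-18T14:15:16Z), while the event_time column described above expects 'YYYY-MM-DD HH:MM:SS' in UTC. If your source timestamps need converting, a small helper can be added to the script; this is a minimal sketch, assuming the input is always an ISO 8601 timestamp in UTC:

from datetime import datetime, timezone

def to_event_time(iso_ts):
    # Convert an ISO 8601 timestamp such as '2018-08-18T14:15:16Z'
    # into the 'YYYY-MM-DD HH:MM:SS' (UTC) form described above.
    dt = datetime.fromisoformat(iso_ts.replace('Z', '+00:00'))
    return dt.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

# e.g. to_event_time('2018-08-18T14:15:16Z') returns '2018-08-18 14:15:16'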
Sample code in SQL

It is also possible to skip the transformation step in Python and transform the data during extraction with a SQL script. Below is an example.

Assume we have a mc_subscription_status table with this data:

email | status | list_id | list_name | status_date
---|---|---|---|---
robin@meiro.io | subscribed | 12345b | Loyal customers | 2018-08-18T14:15:16Z
foo@bar.io | subscribed | 12345b | Loyal customers | 2018-08-18T15:16:17Z
Here is a sample SQL query to do this transformation:

WITH base AS (
    SELECT email, status, list_id, list_name, status_date
    FROM mc_subscription_status
)
SELECT
    'mailchimp' as source_id,
    'subscribed' as type,
    '1-0-0' as version,
    status_date as event_time,
    email as user_identifiers,
    to_json(base) as payload
FROM base
WHERE status = 'subscribed'

Configuration

Remember: Events should be supplied in new-line-delimited-JSON format (suffix .ndjson).

In a nutshell, this component extracts some values from the ndjson events to:
construct the id of the event
construct the event_id (a reference to the events table)
extract the event time
set the required columns

The config.json describes where to find these values in the event jsons.

The id calculation

The event id is calculated as an md5 of event_time, source and event_type by default. Optionally, you can specify extra values to be included in the hash with the extra_id_rules parameter (below). The values are dot-separated json paths: for example, meta.value would resolve to 42 in this json {"foo": "bar", "meta": {"value": 42}}.
Important: The order of the extra_id_rules DOES matter(!), as we are dealing with hashes.
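To illustrate how such an id might be derived (a rough sketch, not the component's actual implementation, and the helper names below are hypothetical): each rule is resolved against the event json, and the resolved values are concatenated with event_time, source and event_type before hashing.

import hashlib, json

def resolve(event, rule):
    # Follow a dot-separated path such as "meta.value" inside the event json.
    value = event
    for key in rule.split('.'):
        value = value[key]
    return value

def calculate_id(event, source, event_type, event_time, extra_id_rules=()):
    # The order of extra_id_rules matters: reordering them changes the hash.
    parts = [event_time, source, event_type] + [str(resolve(event, r)) for r in extra_id_rules]
    return hashlib.md5(''.join(parts).encode('utf-8')).hexdigest()

event = json.loads('{"email": "robin@meiro.io", "meta": {"date": "2018-08-18T14:15:16Z"}, "list_id": "12345b"}')
print(calculate_id(event, 'mailchimp', 'subscribed', event['meta']['date'], ['email', 'list_id']))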
The event_id calculation

The formula, in sql syntax, is md5("source_id" || "type" || "version").
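For comparison, a rough Python equivalent of this formula (a sketch only, assuming plain string concatenation as in the SQL above):

import hashlib

def calculate_event_id(source_id, type_, version):
    # Python equivalent of md5("source_id" || "type" || "version")
    return hashlib.md5((source_id + type_ + version).encode('utf-8')).hexdigest()

# e.g. calculate_event_id('mailchimp', 'subscribed', '1-0-0')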
Vanilla config
{
  "events": [
    {
      "filename": "mc_subscribed_events.ndjson",
      "optional": true,
      "version": "0-1-0",
      "event_type": "subscribed",
      "source": "mailchimp",
      "event_time_rule": "meta.date",
      "extra_id_rules": ["email", "list_id"],
      "event_time_exclude": true
    }
  ]
}
filename
Which input file contains the events.

optional
If true, doesn't raise an error if the file is not found (which can happen if there are no events for this particular batch).

source
Hardcoded source_id (as defined in the sources table).

event_time_rule
"path.to.event_time.in.payload" (jq style) used to populate the event_time column.

extra_id_rules
An array of values which are included in the event id calculation. This must include values that uniquely identify the event (i.e. a customer_id + the event_id in the source system etc.). The values of this array are "paths" (=rules) of where to find the actual values in the event json.

event_type
If set to '' or null or left undefined, it is inferred from the filename. Used as the value of the customer_events.type column.

event_time_exclude
If set to True, event_time won't be included in the event id calculation and id will be calculated as md5 of source, event_type, extra_ids (if available). If not set or set to False, event_time will be included in the calculation as usual.

version
Hardcoded value of the version column.
Remember: By default, all files defined in the events array need to be supplied, otherwise an error will be thrown. If you want to continue on missing files, set optional to true (false by default).
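For example, a config.json describing two event files, where the second file may be missing for some batches, could look like this. This is a hypothetical illustration based on the vanilla config above; the second filename and its event_type are made up:

{
  "events": [
    {
      "filename": "mc_subscribed_events.ndjson",
      "version": "0-1-0",
      "event_type": "subscribed",
      "source": "mailchimp",
      "event_time_rule": "meta.date",
      "extra_id_rules": ["email", "list_id"]
    },
    {
      "filename": "mc_unsubscribed_events.ndjson",
      "optional": true,
      "version": "0-1-0",
      "event_type": "unsubscribed",
      "source": "mailchimp",
      "event_time_rule": "meta.date",
      "extra_id_rules": ["email", "list_id"]
    }
  ]
}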