Setup
Requirements
To set up this completely new way of stitching customer profiles and calculating their attributes, a few requirements must be met.
CDP Schema - there needs to be a CDP schema with all standard tables as created by Meiro Business Explorer (tables like attributes, sources, etc.). Make a note of all the schema names; you will need them later.
Raw events schema - this is where the customer_events table (an immutable log of customer events) lives. It should be named cdp_ce in all newer projects. All new customer events are uploaded here.
Profile Stitching Schema - it should already exist; if it does not, create it. A good name is cdp_ps. It should contain the table stitching_attributes_config. If it does not, create it (see below) and configure it properly.
Attributes Schema - this is a new schema. A good name is cdp_attr. Before running this for the first time, it should contain a table attributes with definitions of how each attribute should be calculated. See more details below.
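Assuming the suggested schema names above (your project may use different ones), the missing schemas can be created with a couple of statements; this is a sketch, not a required step if the schemas already exist:

```sql
-- create the Profile Stitching and Attributes schemas if they are missing
-- (cdp_ps and cdp_attr are the suggested default names, adjust to your project)
CREATE SCHEMA IF NOT EXISTS cdp_ps;
CREATE SCHEMA IF NOT EXISTS cdp_attr;
```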
Meiro Integrations setup
Now that you have all schemas in place, prepare Meiro Integrations. Create a new workspace with one "Python from Git" component, using the following repository and configuration. Use the schema names you created in the previous step.
https://github.com/meiroio/cps
{
"postgresql": {
"host": "cdp.meiro.space",
"user": "postgres",
"password": "",
"dbname": "postgres"
},
"cdp_schema": "public",
"ce_schema": "cdp_ce",
"ps_schema": "cdp_ps",
"attr_schema": "cdp_attr"
}
Profile Stitching Configuration
For Profile Stitching to work, it needs a configuration. If the configuration table (stitching_attributes_config) does not exist, create it:
CREATE TABLE cdp_ps.stitching_attributes_config (
    attribute_id character varying(255) REFERENCES public.attributes(id),
    event_id character varying(255) REFERENCES public.events(id),
    value_expression text
);
CREATE INDEX ON cdp_ps.stitching_attributes_config ("attribute_id");
CREATE INDEX ON cdp_ps.stitching_attributes_config ("event_id");
CREATE UNIQUE INDEX ON cdp_ps.stitching_attributes_config ("event_id", "attribute_id");
The table should contain one row for each identifier in the events used for Profile Stitching, with information about how to extract that value using PostgreSQL JSON operators. Example:
| value_expression | attribute_id | event_id |
| --- | --- | --- |
| payload->>'user_id' | stitching_user_id | 440df32e443bc6425c5c108c6e1789a0 |
| lower(payload->>'email') | stitching_email | a341e63a5bb831591aa8a390c6fd252c |
This table is the only requirement for Profile Stitching to work. Please make sure you have correctly included all identifiers from all events here.
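As a hedged sketch, the rows from the example above could be loaded like this (the event_id hashes and attribute IDs are illustrative and must match your own public.events and public.attributes rows), followed by a sanity check that no event is left without an identifier:

```sql
-- one row per identifier per event; value_expression is a PostgreSQL JSON expression
INSERT INTO cdp_ps.stitching_attributes_config (attribute_id, event_id, value_expression)
VALUES
    ('stitching_user_id', '440df32e443bc6425c5c108c6e1789a0', 'payload->>''user_id'''),
    ('stitching_email',   'a341e63a5bb831591aa8a390c6fd252c', 'lower(payload->>''email'')');

-- sanity check: list events that have no identifier configured yet
SELECT e.id
FROM public.events e
LEFT JOIN cdp_ps.stitching_attributes_config c ON c.event_id = e.id
WHERE c.event_id IS NULL;
```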
You can use the loader component with "action": "load_stitching_attributes_config" to load the table into the database (it helps with translating source_id, type, version to event_id). You can (and should) configure Profile Stitching Rules in the CDP DB directly; there is a nice UI for it in the Setup tab.
Attributes Calculation Configuration
attributes table in Attributes Schema
The basic principle of Attributes Calculation is to define all attribute definitions in the attributes table. To help with that, we have prepared a few basic types of attributes. If the pre-defined attribute types do not cover what you need, you can create custom ones.
Apart from the pre-defined typed attributes, there is one more important principle: when an attribute's value is calculated, the query only sees the events belonging to the customer entity being calculated.
The attributes table is as follows:
CREATE TABLE cdp_attr.attributes (
    id text PRIMARY KEY,
    definition jsonb
);
-- note: the PRIMARY KEY already creates the unique index attributes_pkey,
-- so no separate CREATE UNIQUE INDEX is needed
Where id is the attribute ID as defined in the attributes table in the CDP Schema, and definition is a JSON object with the following structure:
{
"id": "mx_device_family", # attribute ID
"type": "last", # type of attribute calculation
"value": "payload->>'device_family'",
"outer_value": "value",
"weight": "1",
"filter": "",
"outer_filter": "",
"sources": ["bank_web", "bank_app", "ewallet_app", "mx"],
"types": ["app_signin", "app_signup", "web_signin", "web_signup", "account_created", "ewallet_signin", "ewallet_signup", "forms_submitted"],
"versions": ["1-0-0"]
}
The fields in the configuration stand for:
- sources, types, versions - filters for customer events; only events matching them are used when calculating the attribute. If empty, it means "all" (default empty)
- type - type of calculation (see below, required)
- value - value extracted from payload, usually PostgreSQL JSON value expression (required)
- outer_value - value extracted from value (one above) - used for multi level extraction (default "value")
- weight - a weight of value, used for most_frequent, least_frequent calculation types if one customer event contains value with its weight (default 1)
- outer_weight - weight for the value extracted from value (default 1)
- filter - additional filter you can apply on the customer events, any valid PostgreSQL expression (default empty)
- outer_filter - filter applied on the value extracted from value (default empty)
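Putting it together, a sketch of registering the example attribute above in the attributes table (the attribute ID, source, type, and payload key are taken from the example and are illustrative):

```sql
-- attribute definition stored as jsonb; type "last" keeps the most recent value
INSERT INTO cdp_attr.attributes (id, definition)
VALUES (
    'mx_device_family',
    '{
        "id": "mx_device_family",
        "type": "last",
        "value": "payload->>''device_family''",
        "sources": ["mx"],
        "types": ["web_signin"],
        "versions": ["1-0-0"]
    }'::jsonb
);
```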
Predefined calculation types:
- avg
- first
- last
- least_frequent
- list (list of all unique values)
- max
- min
- most_frequent
- num_of_unique (number of unique values)
- sum
- value (first value we find - no ordering)
- custom
To see how the pre-defined types are calculated, refer to: https://bitbucket.org/meiro_core/cps/src/c965817a727091bb5f98789af3c7cf4e21b49189/cps/attr.py#lines-125
If you want to define your own custom attribute, the JSON config for it should look like this:
{
"id": "my_custom_attribute", # attribute ID
"type": "custom", # type of attribute calculation
"query": "SELECT count(*) AS value FROM base" # custom SQL query
}
Where base is a table available in the query that holds all customer events for the customer entity being calculated. Its columns are: id, source_id, type, version, event_time, payload.
Beware: the custom query MUST return exactly one column, named value. It can return 0 to N rows.
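As a hedged illustration (the event type 'purchase' and the payload key 'total' are hypothetical and not part of any standard schema), a custom attribute summing purchase amounts for the entity could look like:

```sql
-- runs per customer entity; base holds only that entity's events
-- returns exactly one column named value, as required
SELECT sum((payload->>'total')::numeric) AS value
FROM base
WHERE type = 'purchase';
```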
Context Queries
However, there are attributes that do require some context about other customer entities, like Engagement attributes. For this case there is a concept of a context table available to your query, which behaves like a key-value store holding the results of queries run over the whole customer base.
Imagine you need the 0.9 percentile of the customer event count across the whole customer entity base. The context table is defined like this:
CREATE TABLE cdp_attr.context (
    key text PRIMARY KEY,
    value text,
    query text,
    modified timestamp without time zone
);
-- note: the PRIMARY KEY already creates the unique index context_pkey
You can define a key maximum_perc_all, with query:
select percentile_disc(0.9) WITHIN GROUP (ORDER BY count_all) as m from (
select
customer_entity_id,
count(*) as count_all
from public.customer_events
where "customer_entity_id" IS NOT NULL group by 1
) counts
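A sketch of registering that context key (assuming the context table above; the value and modified columns are filled in by the component when it runs the query):

```sql
-- store the query under a key; the component caches its result in "value"
INSERT INTO cdp_attr.context (key, query)
VALUES (
    'maximum_perc_all',
    'select percentile_disc(0.9) within group (order by count_all) as m from (
        select customer_entity_id, count(*) as count_all
        from public.customer_events
        where customer_entity_id is not null group by 1
    ) counts'
);
```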
Once you do that, the next time the component runs and gets to attribute calculation, it will run the query before starting the actual per-entity calculation and store its result in the value column. Hence, you can use these pre-calculated values in your custom queries in attribute definitions, for example like this:
{
"id": "my_custom_attribute", # attribute ID
"type": "custom", # type of attribute calculation
"query": "SELECT count(*)/(SELECT value FROM cdp_attr.context WHERE key = 'maximum_perc_all') AS value FROM base" # custom SQL query
}
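Note that context.value is declared as text, so arithmetic on it may need an explicit cast; a hedged variant of the query above:

```sql
-- cast the cached text value to numeric before dividing
SELECT count(*)::numeric
       / (SELECT value::numeric FROM cdp_attr.context WHERE key = 'maximum_perc_all') AS value
FROM base;
```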
All context values are updated before the attribute calculation runs if they are older than 24 hours or if they have not been calculated yet.
Attributes for all customers are recalculated every 48 hours if there was no activity on the customer entity. If there was activity on a customer entity (a new event, a merge of customer entities), it will be updated with the next run.
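A hedged way to see how many entities are waiting for (re)calculation, assuming the calculated column on cdp_attr.customer_entities that is used elsewhere in this document:

```sql
-- entities with calculated IS NULL will be picked up on the next run
SELECT count(*) AS pending
FROM cdp_attr.customer_entities
WHERE calculated IS NULL;
```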
How to run Attributes from scratch
Simply truncate (with CASCADE) customer_entities in the Attributes Schema. This will DELETE ALL VALUES of all attributes for all customer entities, and with the next run of the component it will start calculating the attributes from scratch. This is appropriate only if you really want to delete everything and start over.
TRUNCATE TABLE cdp_attr.customer_entities CASCADE;
If you want to only force CPS to recalculate the customer entities but KEEP DATA, you can do it by simply telling CPS "update all customer attributes right now, but keep existing data":
1. Disable the CPS component in MI.
2. Check in the DB that there is no query with app_name "Meiro CPS" running:
SELECT * FROM pg_stat_activity;
3. If there is, kill it:
SELECT pg_terminate_backend(pid);
4. UPDATE cdp_attr.customer_entities SET calculated = NULL;
5. Enable the CPS component in MI.
And to force context recalculation:
UPDATE cdp_attr.context SET modified = NULL;
How to run Profile Stitching from scratch
If you want to run Profile Stitching from scratch, re-configure your stitching_attributes_config first, before you do anything else. Once you are happy, truncate customer_entities in the Attributes Schema, truncate the matching table in the Profile Stitching Schema, drop the table customer_events_to_process in the Profile Stitching Schema, and drop the table cached_source_events_per_date in the CDP Schema. The next time the component runs, it will think it is running from the start: it will do Profile Stitching first, then Attributes Calculation.
TRUNCATE TABLE cdp_attr.customer_entities CASCADE;
TRUNCATE TABLE cdp_ps.matching;
DROP TABLE cdp_ps.customer_events_to_process;
DROP TABLE public.cached_source_events_per_date;
How to force cache update
To force the cache update in CPS itself, set the modified column to NULL in the following tables. After that, restart the CPS component and watch the logs for CACHE: messages about the cache being filled. Then refresh the cache in MBE.
UPDATE cdp_attr.cached_attributes_counts_customers_per_attribute SET modified = NULL;
UPDATE cdp_attr.cached_attribute_values_counts SET modified = NULL;
How to deploy to production
If you do not include "production": true in the config of CPS, it will calculate profile stitching, attributes, etc., but it will NOT touch anything production-related. Only after you set the production parameter to true and follow the next steps will it replace the customer_events view and other parts, and you "switch" to CPS.
After you are sure you have everything set up, configured and calculated, here are the steps to switch CDP to use this new architecture.
1. Stop all processing in Meiro Integrations.
2. Disable the current Profile Stitching and Attributes Calculation workspaces (the old ones).
3. Disable Cache Refresh in CDP (set it to something like once a week, or make sure it will not run in the next hour).
4. Check that no processing is running using:
select * from pg_stat_activity;
5. Back up the old views and tables in case of disaster recovery:
ALTER TABLE public.pivoted_customer_attributes RENAME TO pivoted_customer_attributes_backup;
ALTER VIEW public.customer_events RENAME TO customer_events_backup;
6. Add "production": true to the config of the CPS component and run it.
7. After a moment you should see the new views public.pivoted_customer_attributes and public.customer_events.
8. Profit!
What if it does not work and you want to go back?
DROP VIEW IF EXISTS public.pivoted_customer_attributes;
DROP VIEW IF EXISTS public.customer_events;
ALTER TABLE public.pivoted_customer_attributes_backup RENAME TO pivoted_customer_attributes;
ALTER VIEW public.customer_events_backup RENAME TO customer_events;
CREATE VIEW public.pivoted_customer_attributes_view ...(from saved definition);
Troubleshooting
Calculating customer attributes takes a long time (and all indexes are there) → run VACUUM ANALYZE cdp_ce.customer_events to update statistics so the query planner can make smarter decisions.
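To confirm that the expected indexes actually exist on the events table before reaching for VACUUM, a quick check (assuming the cdp_ce schema name used above):

```sql
-- list all indexes on the raw customer events table
SELECT indexname, indexdef
FROM pg_indexes
WHERE schemaname = 'cdp_ce' AND tablename = 'customer_events';
```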