ePrivacy and GPDR Cookie Consent by Cookie Consent Skip to main content

Setup

Learn from this article:

Requirements
Meiro Integrations setup
Profile stitching configuration
Attributes calculation configuration
How to run attributes from scratch
How to run profile stitching from scratch
How to force cache update


Requirements

In order to setup completely new way of profile stitching customers and calculate their attributes, few requirements must be met.

  • CDP Schema - there needs to be a CDP schema with all standard tables as created by Meiro Business Explorer. Tables like attributes, sources, etc. Make a note of all names of the schemas, you will need them later.
  • Raw events schema - this is where customer_events table (immutable log of customer events) lives. It should be named cdp_ce in all newer projects. This is where all new customer events are uploaded to.
  • Profile Stitching Schema - it should exist, but if it does not, create it. Good name is cdp_ps. It should contain table stitching_attributes_config. If it does not, create it and (see below) and configure it properly.
  • Attributes Schema - this is a new schema. Good name is cdp_attr. Before running this for the first time, it should contain table attributes with definition on how each attribute should be created. See more details below.

Meiro Integrations setup

Now that you have all schemas at place, prepare Meiro Integrations. Create a new workspace with 1 component Python from GIT, use the following repository and configuration. Use the schema names you created in the previous step.

https://github.com/meiroio/cps

{
  "postgresql": {
    "host": "cdp.meiro.space",
    "user": "postgres",
    "password": "",
    "dbname": "postgres"
  },
  "cdp_schema": "public",
  "ce_schema": "cdp_ce",
  "ps_schema": "cdp_ps",
  "attr_schema": "cdp_attr"
}

Profile stitching configuration

For profile stitching to work, it needs to have a configuration. If the table with configuration (stitching_attributes_config) does not exist, please, create it:

CREATE TABLE cdp_ps.stitching_attributes_config (
    attribute_id character varying(255) REFERENCES public.attributes(id),
    event_id  character varying(255) REFERENCES public.events(id),
    value_expression text
);
create index on cdp_ps.stitching_attributes_config ("attribute_id");
create index on cdp_ps.stitching_attributes_config ("event_id");
CREATE UNIQUE INDEX ON cdp_ps.stitching_attributes_config("event_id", "attribute_id");

Contents of the table should describe for each identifier in the event used for profile stitching one row, with information about how to get that value using JSON operators in PostgreSQL. For example:

value_expresssion

attribute_id

event_id

payload->>user_id

stitching_user_id

440df32e443bc6425c5c108c6e1789a0

lower(payload->>email)

stitching_email

a341e63a5bb831591aa8a390c6fd252c

The table is the only requirement for profile stitching to work. Please make sure you have correctly included all identifiers from all events in here.

Configure profile stitching rules in Meiro Business Explorer directly, it is possible in the Setup/ Identity Stitching tab. More details here.

Attributes calculation configuration

attributes table is in attributes schema

Basic principle of attributes calculation is to define all attributes definition in attributes table. To help with that, we have prepared a few basic types of attributes. If pre-defined attribute types do not cover what you need, you can create custom ones.

Apart from pre-defined typed attributes, there is one more distinction. The way how attributes are calculated is that for the attribute query calculating the value only events belonging to the customer entity are available.

The attributes table is as follows:

CREATE TABLE cdp_attr.attributes (
    id text PRIMARY KEY,
    definition jsonb
);
CREATE UNIQUE INDEX attributes_pkey ON cdp_attr.attributes(id text_ops);

Where id is attribute ID as defined in attributes table in CDP schema and definition is a JSON with following structure:

{
  "id": "mx_device_family", # attribute ID
  "type": "last", # type of attribute calculation
  "value": "payload->>'device_family'", 
  "outer_value": "value", 
  "weight": "1", 
  "filter": "", 
  "outer_filter": "",
  "sources": ["bank_web", "bank_app", "ewallet_app", "mx"], 
  "types": ["app_signin", "app_signup", "web_signin", "web_signup", "account_created", "ewallet_signin", "ewallet_signup", "forms_submitted"], 
  "versions": ["1-0-0"]
}

The fields in configuration stand for:

sources, types, version a filter for customer events. Filters only customer events you want the attribute to be calculated on. If empty, it means "all" (default empty)
type type of calculation (see below, required) 
value value extracted from payload, usually PostgreSQL JSON value expression (required)
outer_value value extracted from value (one above) - used for multi level extraction (default "value")
weight weight of value, used for most_frequent, least_frequent calculation types if one customer event contains value with its weight (default 1)
outer_weight

weight for the value extracted from value (default 1)

filter additional filter you can apply on the customer events, any valid PostgreSQL expression (default empty)
outer_filter filter applied on the value extracted from value (default empty)


Predefined calculation types:

  • avg
  • first
  • last
  • least_frequent
  • list (list of all unique values)
  • max
  • min
  • most_frequent
  • num_of_unique (number of unique values)
  • sum
  • value (first value we find - no ordering)
  • custom

If you want to define your own, custom attribute, the JSON config for it should like this:

{
  "id": "my_custom_attribute", # attribute ID
  "type": "custom", # type of attribute calculation
  "query": "SELECT count(*) AS value FROM base" # custom SQL query 
}

Where base is a table available in the query that holds all customer events for the customer entity that is being calculated. It has a header: id, source_id, type, version, event_time, payload

Please beware that the custom query MUST return only 1 column with a name value. Do not forget that, please.

The query must return always 1 column, can return 0 to N rows.

Context queries

However, there are attributes that do require some context about other customer entities, like engagement attributes. For this case, there is a concept of context table available to your query, which behaves like a key-value store holding results of queries ran on the whole customer base.

Imagine you need information about the 0.9 percentile of customer events count on the whole customer entities base. In contex which is defined like this:

CREATE TABLE cdp_attr.context (
    key text PRIMARY KEY,
    value text,
    query text,
    modified timestamp without time zone
);
CREATE UNIQUE INDEX context_pkey ON cdp_attr.context(key text_ops);

You can define a key maximum_perc_all, with query:

select percentile_disc(0.9) WITHIN GROUP (ORDER BY count_all) as m from (
  select 
    customer_entity_id,
    count(*) as count_all
  from public.customer_events
  where "customer_entity_id" IS NOT NULL group by 1 
) counts

Once you that, next time the component runs and it gets to the attribute calculation, before it starts with actual calculation for each customer entity, it will run the query and store the result of it in the valuecolumn. Hence, you can use these pre-calculated values in your custom queries in attributes definition, for example like this:

{
  "id": "my_custom_attribute", # attribute ID
  "type": "custom", # type of attribute calculation
  "query": "SELECT count(*)/(SELECT value FROM cdp_attr.context WHERE key = 'maximum_perc_all') AS value FROM base" # custom SQL query 
}

All values for context are updated before attribute calculation runs if they are older than 24 hours or if have not been calculated yet.

Meta attributes

Have you found yourself in a situation where you have a bunch of attributes all using same or similar logic? Something like common clean up on events before extracting a value, repeating and copy pasting the logic from one attribute to five or ten more attributes? And then when you found a problem in that cleaning logic you had to change it in all ten attributes? Well, not anymore. 

If you have common thing you would like to use in multiple attributes (for example calculating all bought products and their prices, then using this information to get attributes like average price per product, total quantity, total price, average number of products per transaction...), you can define it as meta attribute (simple CTE) that you can use in any other attribute calculation. Speed and convenience, yay!

There is a new table cdp_attr.meta_attributes. It has ID and definition column. To ID put a name, lower case, no spaces, in definition have the query you would like to have available as CTE. It can be anything, really, it will be available as a table. Look at the example below, where I configure 1 meta attribute used by other attributes.

-- meta attribute
-- ID: products_bought
-- definition:
select 
	payload::jsonb->>'product_id', 
    payload::jsonb->>'quantity', 
    payload::jsonb->>'price'
from base
where type = 'purchase' and event_time > NOW() - '30 days'::interval

-- attribute
-- ID: avg_price_per_product
-- definition (custom query):
select avg(price) from products_bought

-- attribute
-- ID: sum_price
-- definition (custom query):
select sum(price) from products_bought;

-- attribute
-- ID: sum_quantity
-- definition (custom query):
select sum(quantity) from products_bought;

How to run attributes from scratch

If you want to only force CPS to recalculate the customer entities, you can do it by simply telling CPS “update all customer attributes right now, but keep existing data”:

Step 1: Disable CPS component in MI.

Step 2: Check DB there is no query with app_name "Meiro CPS" running, if there is, kill it:

select pg_terminate_backend(pid)
from pg_stat_activity
where application_name ilike '%cps%';

Step 3: Force CPS to recalculate attributes

UPDATE cdp_attr.customer_entities SET calculated = NULL;

To force context recalculation:

UPDATE cdp_attr.context SET modified = NULL;

Step 4: Enable CPS component in MI.

How to run profile stitching from scratch

If you want to run profile stitching from scratch:

Step 1: stop CPS and kill all CPS related processes in your DB:

select pg_terminate_backend(pid)
from pg_stat_activity
where application_name ilike '%cps%';

Step 2: re-configure your stitching_attributes_config before you do anything else.

Step 3: Once you are happy, clean the tables created by CPS process. Next time the component runs, it will think it runs from the start and it will create the tables and restart the process from scratch.

DROP VIEW IF EXISTS public.customer_events;
DROP MATERIALIZED VIEW IF EXISTS public.cached_source_events_per_date;
DROP TABLE IF EXISTS cdp_attr.customer_entities CASCADE;
DROP TABLE IF EXISTS public.pivoted_customer_attributes;
DROP TABLE IF EXISTS cdp_attr.customer_entities_to_refresh;
DROP TABLE IF EXISTS cdp_attr.customer_entities_to_refresh_attrs;
DROP TABLE IF EXISTS cdp_attr.customer_entities_audit;
DROP TABLE IF EXISTS cdp_ps.matching;
DROP TABLE IF EXISTS cdp_ps.prep;
DROP TABLE IF EXISTS cdp_ps.customer_events_to_process;

How to force cache update

In order to force the cache update in the CPS itself, set modified column to NULL in the following tables. After that restart the CPS component and watch logs for CACHE: messages about the cache being filled. Then refresh the cache in Merio Business Explorer.

UPDATE cdp_attr.cached_attributes_counts_customers_per_attribute SET modified = NULL; 

UPDATE cdp_attr.cached_attribute_values_counts SET modified = NULL;