
How to rerun identity stitching for all/some entities/events and recalculate attributes

PS edge cases

It may happen, when setting up PS rules, that you miss some edge cases, and this causes customer entities to be stitched wrongly. Examples of such cases:

  • ps_identifier in the payload was parsed to the text value None or 0. CPS checks that the PS identifier is not NULL, but "None" is a valid value for the algorithm, so it will stitch every customer event where ps_identifier = "None" into one huge entity.
  • ps_identifier was parsed into an integer for one data source and a float for another: 12345 and 12345.0 are different values for CPS, so entities with these values won't be stitched together. Queries to detect both cases are sketched below.
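
A minimal detection sketch (hedged: it assumes the event payload is stored as JSONB in a payload column and the identifier lives under the ps_identifier key — adjust the names to your schema):

-- 1) Literal "None"/"0" values that would stitch unrelated events together:
SELECT payload->>'ps_identifier' AS ps_identifier, count(*)
FROM customer_events
WHERE payload->>'ps_identifier' IN ('None', '0')
GROUP BY 1;

-- 2) Float renderings of integer identifiers (12345 vs 12345.0):
SELECT count(*)
FROM customer_events
WHERE payload->>'ps_identifier' LIKE '%.0';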

Usually, at the very beginning of a project, when only small samples of the data have been loaded, it makes more sense to rerun PS from scratch. However, if a big volume of data has already been processed, rerunning PS from zero can take days to catch up and is simply not effective. In this case it may be more efficient to clean only the problematic customer entities and customer events.

Check this documentation for more edge cases.

Preparation

Before the cleaning:

  • Define the problematic customer entity (or entities). If it is just one huge entity which needs to be "unstitched", save its ID; if there are many entities, decide how to filter them.
  • Make sure you know how the PS rule should be fixed, and test the fix on a relevant data sample.
  • Disable the CPS component in MI and stop all processes with app_name "Meiro CPS" from running:

--Check in the DB that there is no query with app_name "Meiro CPS" running:
SELECT * FROM pg_stat_activity WHERE application_name = 'Meiro CPS';

--If there is, kill it:
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Meiro CPS';

  • Create a separate schema where the tables with the entities and events which need to be restitched will be stored.

Warning: Make sure CPS is not running until you finish the cleaning.

How to make the cleaning

CockroachDB process

Purging


If you need to remove all customer entities and restitch all customer events from scratch, use this:

UPDATE customer_attributes SET modified = NULL, identifiers = ARRAY[];
-- wait until plumbing removes all entities (3-5 minutes)
UPDATE customer_events SET stitched = NULL, identifiers = NULL;


Explanation: the update on customer_attributes sets the modified timestamp to NULL, which is a signal for a cleanup routine to delete the entity from customer attributes and send a command to OpenSearch to delete it there as well.
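
While waiting, you can watch the purge progress with a couple of counts (a sketch based on the columns above; both numbers should fall back towards zero as plumbing deletes the entities and identity stitching works through the events):

SELECT count(*) FROM customer_attributes WHERE modified IS NULL;
SELECT count(*) FROM customer_events WHERE stitched IS NULL;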

Restitching

If the entity to restitch is small (fewer than 1,000 events), you can do the whole restitch in one step:

WITH entities AS (
UPDATE customer_attributes 
SET modified = NULL 
WHERE customer_entity_id = 'abc' RETURNING identifiers
)
UPDATE customer_events SET stitched = NULL, identifiers = NULL
FROM entities WHERE customer_events.identifiers <@ entities.identifiers;
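
If you first want to confirm the entity is small enough for this one-step path, you can count its events (a sketch reusing the identifiers linkage from the query above):

SELECT count(*)
FROM customer_events
WHERE identifiers <@ (SELECT identifiers FROM customer_attributes
                      WHERE customer_entity_id = 'abc');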

If the entity is bigger, or you need to restitch several customer entities at once:
1. First, export the event ids to a separate table in the external_data schema:

DROP TABLE IF EXISTS external_data.events_abc;
CREATE TABLE external_data.events_abc (id uuid primary key);
INSERT INTO external_data.events_abc
WITH unnested AS (
        SELECT unnest(identifiers) AS identifier FROM customer_attributes 
        WHERE customer_entity_id = 'abc')
SELECT id
FROM customer_events
INNER JOIN unnested ON customer_events.identifier = unnested.identifier;

The example given above filters the events by customer_entity_id. However, you can also filter by other conditions if you only want to restitch certain events, such as by a value in the event payload, or by events in a certain time period based on event_time.
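
For example, a time-window variant of the export might look like this (a sketch; the date range is hypothetical):

INSERT INTO external_data.events_abc
SELECT id
FROM customer_events
WHERE event_time >= '2024-01-01' AND event_time < '2024-02-01';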

* It is possible that there are too many identifiers and the export of event ids to external_data will take too long to run. In this case, you may want to create a workspace in MI to extract and then load the event ids.

Query example for the connector in the MI workspace:

/*aost*/
WITH unnested AS (
    SELECT unnest(identifiers) AS identifier FROM customer_attributes 
    WHERE customer_entity_id = 'abc'
    )
SELECT id
FROM customer_events
INNER JOIN unnested ON customer_events.identifier = unnested.identifier;

Important: add the /*aost*/ comment to the query. It improves query performance by decreasing transaction conflicts; you can read more about it in the CockroachDB docs here.
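
For reference, this is what a follower read looks like in plain CockroachDB SQL (hedged: the /*aost*/ marker is Meiro-specific, and the connector may expand it differently):

SELECT id
FROM customer_events AS OF SYSTEM TIME follower_read_timestamp()
LIMIT 10;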

2. Update the entity in customer_attributes. Set its modified to NULL, which means it will be deleted (both in CR and OS), and set identifiers to NULL to make sure the connection between customer_events and the entity is removed:

UPDATE customer_attributes 
SET modified = NULL, identifiers = NULL
WHERE customer_entity_id = 'abc'; 

3. After approximately 1 minute, check whether the entities have been deleted from the DB (the query should return no rows):

SELECT customer_entity_id, modified, calculated
FROM customer_attributes 
WHERE customer_entity_id = 'abc';

4. Finally, update customer_events. For all customer events belonging to this customer entity, the stitched timestamp is set to NULL, which tells identity stitching to stitch them again. If there are a lot of rows, it is recommended to run the query with high priority:

BEGIN;
SET TRANSACTION PRIORITY HIGH;
UPDATE customer_events
SET stitched    = NULL,
    identifiers = NULL
FROM external_data.events_abc events
WHERE customer_events.id = events.id::UUID;
COMMIT;
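
A quick sanity check after the update (a sketch using the export table from step 1; it should return 0 once every exported event has been queued for restitching):

SELECT count(*)
FROM external_data.events_abc e
INNER JOIN customer_events ce ON ce.id = e.id::uuid
WHERE ce.stitched IS NOT NULL;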

* It is possible that there will be a large number of events to update. In that case it is recommended to run the update statement in batches, using a Python 3 code configuration in Meiro Integrations.

Example code in Meiro Integrations:

import json

import psycopg2

CONFIG_PATH = "/data/config.json"
FIELDNAMES_MAP = {
    "CR_host": "host",
    "CR_db": "dbname",
    "CR_user": "user",
    "CR_port": "port",
    "#CR_pass": "password"
}

def parse_config(config_path=CONFIG_PATH):
    with open(config_path) as inf:
        return json.load(inf)
        
def get_db_creds(config, fieldnames_map=FIELDNAMES_MAP):
    return {value: config['parameters']['vars'][key] for key, value in fieldnames_map.items()}
    
def restitch(db_creds):
    conn = None
    try:
        conn = psycopg2.connect(**db_creds)
        cur = conn.cursor()
        while True:
            # Take a batch of 10,000 event ids off the export table and mark
            # the matching customer events for restitching. CockroachDB
            # supports DELETE ... LIMIT, so each run consumes one batch.
            cur.execute("""
                WITH deleted AS (
                    DELETE FROM external_data.events_abc LIMIT 10000 RETURNING *
                ), updated AS (
                    UPDATE customer_events
                    SET stitched = NULL, identifiers = NULL
                    WHERE id IN (SELECT id::uuid FROM deleted)
                    RETURNING id
                )
                SELECT count(*) FROM updated;
            """)
            batch_size = cur.fetchone()[0]
            conn.commit()  # commit each batch so the plumbing queue can keep up
            if batch_size == 0:
                break
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
            
config = parse_config()
db_creds = get_db_creds(config)
restitch(db_creds)

While running this code, the plumbing queue needs to be monitored carefully; you can do this by watching the Grafana logs in Business Explorer. Refer to this doc for more details (TBA).

Recalculating

If you simply want to recalculate a customer entity (recalculate all of its attributes) without touching customer events, stitching, etc.:

UPDATE customer_attributes 
SET calculated = NOW() - '1 year'::interval
WHERE customer_entity_id = 'abc';
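
The same mechanism works in bulk; for example, to force recalculation of every entity (a sketch — this can be heavy on large instances):

UPDATE customer_attributes
SET calculated = NOW() - '1 year'::interval;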

PostgreSQL process (to be deprecated)

Step 0

If events need to be deleted, delete them as the first step.

delete from cdp_ce.customer_events ce where id in --condition based on which entity(ies) are filtered

Step 1

Create a table where the entities to delete will be stored.

create table temp_table.entities_to_clean as
	select customer_entity_id from public.customer_events
	where --condition based on which entity(ies) are filtered

Step 2

Create a table to store all IDs of customer_events belonging to these customer entities.

create table temp_table.events_to_clean as
	select id from public.customer_events
	where customer_entity_id in (select * from temp_table.entities_to_clean)

Step 3

Remove all rows with these customer_entity_ids from the cdp_ps.matching table (this will "unstitch" the events).

delete from cdp_ps.matching
where matching.customer_entity_id in (select customer_entity_id::uuid from temp_table.entities_to_clean)

Step 4

Insert the customer_entity_id and a 'delete' action into cdp_attr.customer_entities_audit to clean everything related to these customer_entity_ids.

insert into cdp_attr.customer_entities_audit(customer_entity_id, created, action)
select distinct customer_entity_id::uuid, current_timestamp, 'delete' from temp_table.entities_to_clean

Step 5

Insert all customer_events ids belonging to the entities to clean into cdp_ps.customer_events_to_process to restitch the events.

insert into cdp_ps.customer_events_to_process(customer_event_id, created)
select id::uuid, current_timestamp from temp_table.events_to_clean
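
Before moving on, a quick sanity check (a sketch; the first count should have grown by roughly the size of temp_table.events_to_clean):

select count(*) from cdp_ps.customer_events_to_process;
select count(*) from temp_table.events_to_clean;
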
Step 6

Update the PS rules and enable CPS to run again.

Important: Run the queries one by one and make sure that the previous step was finished before running the next one. 


Help! I have accidentally deleted the customer event ids from cdp_ps.matching and lost the customer event ids to clean before inserting them into cdp_ps.customer_events_to_process!

Don't panic. First, make sure CPS is not running, then run the following query to insert ALL events that are not present in the cdp_ps.matching table, i.e. events that have not been stitched.

insert into cdp_ps.customer_events_to_process(customer_event_id, created)
select id::uuid, current_timestamp from cdp_ce.customer_events ce 
left join cdp_ps.matching m on ce.id::uuid = m.customer_event_id where m.customer_event_id is null;

Warning: Take note that this query can be expensive depending on how large the volume of data is. To prevent this problem from happening in the first place, follow the previous steps carefully. This is intended to be a workaround only.