Skip to content

Commit

Permalink
added regex functionality for allow lists in the analyzer (#1357)
Browse files Browse the repository at this point in the history
  • Loading branch information
NarekAra authored Apr 25, 2024
1 parent e64d8ec commit 55bfb8f
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 10 deletions.
53 changes: 43 additions & 10 deletions presidio-analyzer/presidio_analyzer/analyzer_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
from typing import List, Optional
import regex as re

from presidio_analyzer import (
RecognizerRegistry,
Expand Down Expand Up @@ -136,6 +137,8 @@ def analyze(
ad_hoc_recognizers: Optional[List[EntityRecognizer]] = None,
context: Optional[List[str]] = None,
allow_list: Optional[List[str]] = None,
allow_list_match: Optional[str] = "exact",
regex_flags: Optional[int] = re.DOTALL | re.MULTILINE | re.IGNORECASE,
nlp_artifacts: Optional[NlpArtifacts] = None,
) -> List[RecognizerResult]:
"""
Expand All @@ -156,6 +159,10 @@ def analyze(
with the recognized entity's recognizer context
:param allow_list: List of words that the user defines as being allowed to keep
in the text
:param allow_list_match: How the allow_list should be interpreted; either as "exact" or as "regex".
- If `regex`, results which match any regex pattern in the allow_list are allowed and not returned as potential PII.
- If `exact`, results which exactly match any value in the allow_list are allowed and not returned as potential PII.
:param regex_flags: regex flags to be used when allow_list_match is "regex"
:param nlp_artifacts: precomputed NlpArtifacts
:return: an array of the found entities in the text
Expand All @@ -172,6 +179,7 @@ def analyze(
>>> print(results)
[type: PHONE_NUMBER, start: 19, end: 31, score: 0.85]
"""

all_fields = not entities

recognizers = self.registry.get_recognizers(
Expand Down Expand Up @@ -228,7 +236,9 @@ def analyze(
results = self.__remove_low_scores(results, score_threshold)

if allow_list:
results = self._remove_allow_list(results, allow_list, text)
results = self._remove_allow_list(
results, allow_list, text, regex_flags, allow_list_match
)

if not return_decision_process:
results = self.__remove_decision_process(results)
Expand Down Expand Up @@ -314,22 +324,45 @@ def __remove_low_scores(

@staticmethod
def _remove_allow_list(
    results: List[RecognizerResult],
    allow_list: List[str],
    text: str,
    regex_flags: Optional[int],
    allow_list_match: str,
) -> List[RecognizerResult]:
    """
    Remove results whose matched text is covered by the allow list.

    :param results: List of RecognizerResult
    :param allow_list: list of allowed terms. When allow_list_match is
        "regex", the entries are joined with "|" into a single pattern
        that is searched for inside each result's text
    :param text: the text to analyze
    :param regex_flags: regex flags to be used when allow_list_match is
        "regex"; None is treated as "no flags" (0)
    :param allow_list_match: How the allow_list
        should be interpreted; either as "exact" or as "regex"
    :return: List[RecognizerResult] that are not covered by the allow list
    :raises ValueError: if allow_list_match is neither "exact" nor "regex"
    """
    if allow_list_match == "regex":
        if not allow_list:
            # An empty alternation compiles to "", which matches every word
            # and would drop all results; nothing is allowed, so keep them.
            return list(results)

        # The signature allows None, but re.compile rejects flags=None.
        flags = 0 if regex_flags is None else regex_flags
        re_compiled = re.compile("|".join(allow_list), flags=flags)

        def is_allowed(word: str) -> bool:
            return re_compiled.search(word) is not None

    elif allow_list_match == "exact":

        def is_allowed(word: str) -> bool:
            return word in allow_list

    else:
        raise ValueError(
            "allow_list_match must either be set to 'exact' or 'regex'."
        )

    # If the word is not specified to be allowed, keep it in the PII entities.
    return [
        result
        for result in results
        if not is_allowed(text[result.start : result.end])
    ]

Expand Down Expand Up @@ -357,9 +390,9 @@ def __add_recognizer_id_if_not_exists(
RecognizerResult.RECOGNIZER_IDENTIFIER_KEY
] = recognizer.id
if RecognizerResult.RECOGNIZER_NAME_KEY not in result.recognition_metadata:
result.recognition_metadata[
RecognizerResult.RECOGNIZER_NAME_KEY
] = recognizer.name
result.recognition_metadata[RecognizerResult.RECOGNIZER_NAME_KEY] = (
recognizer.name
)

@staticmethod
def __remove_decision_process(
Expand Down
71 changes: 71 additions & 0 deletions presidio-analyzer/tests/test_analyzer_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
from abc import ABC
from typing import List, Optional
import re

import pytest

Expand Down Expand Up @@ -240,6 +241,76 @@ def test_when_allow_list_specified_multiple_items(loaded_analyzer_engine):
assert len(results) == 0


def test_when_regex_allow_list_specified(loaded_analyzer_engine):
    """A single regex allow-list entry removes only the result it matches."""
    text = "bing.com is his favorite website, microsoft.com is his second favorite, azure.com is his third favorite"

    # Baseline run without an allow list: three results, the first a URL.
    baseline = loaded_analyzer_engine.analyze(text=text, language="en")
    assert len(baseline) == 3
    assert_result(baseline[0], "URL", 0, 8, 0.5)

    # "bing" is searched for inside each detected span.
    filtered = loaded_analyzer_engine.analyze(
        text=text,
        language="en",
        allow_list=["bing"],
        allow_list_match="regex",
    )
    assert len(filtered) == 2
    first, second = filtered
    assert text[first.start : first.end] == "microsoft.com"
    assert text[second.start : second.end] == "azure.com"


def test_when_regex_allow_list_specified_but_none_in_file(loaded_analyzer_engine):
    """A regex allow-list entry that matches nothing leaves the result in place."""
    text = "bing.com is his favorite website"

    # Baseline run without an allow list.
    baseline = loaded_analyzer_engine.analyze(text=text, language="en")
    assert len(baseline) == 1
    assert_result(baseline[0], "URL", 0, 8, 0.5)

    # "microsoft" does not occur in the text, so the same result is expected.
    filtered = loaded_analyzer_engine.analyze(
        text=text,
        language="en",
        allow_list=["microsoft"],
        allow_list_match="regex",
    )
    assert len(filtered) == 1
    assert_result(filtered[0], "URL", 0, 8, 0.5)


def test_when_regex_allow_list_specified_multiple_items_with_missing_flags(
    loaded_analyzer_engine,
):
    """Several regex allow-list entries work when regex_flags is not passed."""
    text = "bing.com is his favorite website, microsoft.com is his second favorite, azure.com is his third favorite"

    # Baseline run without an allow list.
    baseline = loaded_analyzer_engine.analyze(text=text, language="en")
    assert len(baseline) == 3
    assert_result(baseline[0], "URL", 0, 8, 0.5)

    # regex_flags is deliberately omitted so the analyzer's default applies.
    filtered = loaded_analyzer_engine.analyze(
        text=text,
        language="en",
        allow_list=["bing", "microsoft"],
        allow_list_match="regex",
    )
    assert len(filtered) == 1
    remaining = filtered[0]
    assert text[remaining.start : remaining.end] == "azure.com"


def test_when_regex_allow_list_specified_with_regex_flags(loaded_analyzer_engine):
    """The regex_flags argument controls how the regex allow list matches."""
    text = "bing.com is his favorite website, microsoft.com is his second favorite, azure.com is his third favorite"

    # Baseline run without an allow list.
    baseline = loaded_analyzer_engine.analyze(text=text, language="en")
    assert len(baseline) == 3
    assert_result(baseline[0], "URL", 0, 8, 0.5)

    # With flags=0 the upper-case entries are expected to remove nothing;
    # with re.IGNORECASE they are expected to remove every result.
    for flags, expected_count in ((0, 3), (re.IGNORECASE, 0)):
        filtered = loaded_analyzer_engine.analyze(
            text=text,
            language="en",
            allow_list=["BING", "MICROSOFT", "AZURE"],
            allow_list_match="regex",
            regex_flags=flags,
        )
        assert len(filtered) == expected_count


def test_when_removed_pattern_recognizer_then_doesnt_work(unit_test_guid):
pattern = Pattern("spaceship pattern", r"\W*(spaceship)\W*", 0.8)
pattern_recognizer = PatternRecognizer(
Expand Down

0 comments on commit 55bfb8f

Please sign in to comment.