这个Aho-Corasick算法,名字挺难读,所以有的地方会简称为AC自动机算法。我先前是不知道这个算法的,在看《Pro .NET Memory Management》这本书的前言中,介绍了一个由Adam Sitnik维护有关.NET性能的代码库,这里面有一篇介绍.NET性能调优的文章 High-performance .NET by example: Filtering bot traffic,这篇文章写的很好,它在里面举了个例子:说是有一个广告运营公司,会根据不同的用户,在页面上展示不同的广告,然后会根据点击效果向客户收取费用。但在互联网上,有很多机器人(bot)访问,比如一些搜索引擎爬虫,还有一些恶意的刷流量的脚本。对于这些访问,在页面上展示广告是浪费资源的,现在需要辨别出哪些是正常用户的点击,那些是爬虫产生的点击,对于爬虫或者一些恶意的访问,进行一些特别的操作。

辨别访客身份是通过HTTP请求里面的user agent字段来进行的,每个浏览器都有一个身份,比如使用Fiddler可以看到,我在Windows10上用Chrome浏览器访问自己博客时,客户端Agent显示为:

User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36

一些爬虫,比如Google的它的Agent如下:

Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)

可以看到,这其中有Googlebot关键字。还有一些常见的爬虫或者脚本访问的User Agent如下:

googlebot
bingbot
twitterbot
curl
......

需要说明的是,这个user agent可以仿造,一种方式是直接使用脚本运行Chrome浏览器,然后在里面执行一系列模拟操作,比如Selenium,还有一种是通过代码的方式手动设置user agent然后发起访问。

现在问题就简化为了:给出一系列模式串以及一个文本串,然后判断模式串在文本串中是否存在。

简单但粗暴的办法


最简单粗暴的方法就是遍历整个关键字数组keywords,逐个判断字符串str中是否存在某个关键字。

public bool Exist(List<string> keywords,string str)
{
	foreach (string keyword in keywords)
	{
		if (str.IndexOf(keyword) > -1)
		{
			return true;
		}
	}
	return false;
}

如果keywords很大,并且调用很频繁,这样效率会很低。有没有更好的办法呢?Aho-Corasick算法这类多模式匹配算法就是专门用来处理这类问题的。

Aho-Corasick自动机算法


Aho-Corasick自动机算法(以下简称AC自动机算法)它是一个经典的多模式匹配算法,它借鉴了KMP算法的思想(KMP算法于1974年提出),可以由有限状态机(Finite State Automata:FSA)来表示。KMP是单模式匹配算法,所谓单模式匹配:给出一个模式串,一个文本串,然后看模式串在文本串中是否存在。AC自动机是多模式串的字符匹配算法,给出多个模式串,一个文本串,然后可以判断出模式串在文本串中是否存在。这个算法出自1975年的Efficient string matching: an aid to bibliographic search 这篇论文,里面介绍的很详细,下面也是对这篇论文的介绍,有兴趣的建议直接阅读论文。

▲ 1975年发表的AC算法的原文

算法的基本原理是:

先根据多模式串建立一个有限状态自动机FSA,在进行模式匹配时,设当前状态为Scur,输入串中的当前符号为C,运行FSA,试图得到下一状态Snext。如果Snext无效,即发生失配,则根据规则,将当前状态回退到一个适合的状态Sfail,然后重复上一过程,直到Snext有效或Scur为0。在匹配的过程中,如果当前状态正好匹配了某个模式P,则输出它。

这里面的关键点在于,用于匹配的FSA跟输入的字符串无关,只跟模式串有关。匹配中如果发生失败,则FSA回退到某一状态,而输入的字符串则无须回退。

所以整个算法分为两大部分,第一部分是根据给定的关键字数组构建一个有限状态机FSA, 第二部分就是对于给定的字符串,将单个字符依次作为状态机的输入。当发生完整匹配时,产生匹配完成信号。

构建FSA


要构建状态机,需要一下三个函数:

  1. 状态跳转表,goto函数(g):用来确定对于当前的状态S和给定的条件C,如何得到下一个状态Snext
  2. 失配跳转表,fail函数(f):当goto函数根据当前状态S和给定的条件C,得到下一个状态无效时,应该回退到哪一个状态。
  3. 匹配结果输出表,output函数(output):决定处于哪个状态时,输出哪个匹配好的模式。

依然很抽象,这里举一个简单的例子,这个例子也是上述论文中的例子,假设关键字表为{“he”,“she”,“his”,“hers”},需要待匹配的字符串为“ushers”。

首先决定状态和跳转函数,然后计算失配跳转表,匹配结果输出表在这两个过程中完成。

对于第一个关键字 “he",构造图如下:

每一个圆圈表示一个状态,里面的数字为状态编号,初始状态为0,箭头表示状态转换,箭头由当前状态指向下一个状态,箭头上方的文字表示状态转换的条件。

然后将每个关键字按字符依次添加到状态表中。状态从0->1->2的过程能够匹配输出关键字“he”,所以输出表output中可以将状态2和“he”关联。

接着添加关键字“she”,得到如下状态转换表:

需要注意的是,处理每一个关键字的时候,都必须从初始状态0开始。在输出表output中,可以将状态5和“she”关联。

继续添加关键字“his”,得到如下状态转换表:

也是从状态0开始,在添加关键字“his”的时候,由于已经存在从0经过条件h到状态1的转换,所以不必要再次添加从0到1的转换。在输出的output表中,可以将状态7和“his”关联。

最后添加关键字“hers”,得到如下图:

▲Goto函数

还是从状态0开始,因为从0到1和1到2已经存在,所以不必添加。在output表中,将状态9和“hers”关联起来。

还有一点需要注意的是,对于初始状态0,任何不为h或s的条件,得到的下一个状态仍然为0。

上述的有限状态机图其实代表了goto表,上面的每个状态及状态转换都是有效的。goto表映的是有效的状态转换。如果转换失败时,就需要失配跳转表来回退到失配后的状态。

失配跳转表fail


首先定义一个深度depth,表示从初始状态到某个状态的最短距离。比如从状态0到状态1、3的深度是1,从0到状态2、6、4的深度是2,从0到状态5、7、8的深度是3等等。

先计算depth为1的状态的失配跳转表,然后计算depth为2的,这其实就是按照树的层级来依次计算。

首先规与状态0距离为1即深度为1的所有状态的fail值为1,对应上图中,就是状态1、3状态的fail为0。假设当前状态为S1,求f(S1)。可以确定的是S1的前一状态是唯一的(从上面的FSA图中也可以看出来)。假设S1的前一状态是S2,S2转换到当前状态S1的条件是C即:S1=g(S2,C),测试从S2的fail值使用条件C转换的状态:S3=g(f(S2),C)。如果S3成功,则表示fail(S1)=g(f(S2,C)。如果S3的状态无效,

深度为d的所有状态的fail值的计算,可以由所有深度为d-1的fail值的有效goto跳转表来获得。

假设r为深度为d-1的状态,要计算当前深度为d的状态s的f(s)的值,且已知g(r,c)=s,即r为s的前一个状态,且转移条件为c:

  1. 设状态state为r的失配状态f(r)。
  2. 计算state=f(state)一次或多次,直到g(state,c)匹配成功(一定能找到,因为g(0,a)一定是一个确定的值)。
  3. 则s的失配值f(s)=g(state,a)。

以上图为例,深度为1的状态的是失配值为0,即:

  • f(3)=f(1)=0。

现在要计算深度为2的状态,2、6、4的fail值。

  • 计算f(2),它的前一个状态是1,转换条件是e。f(1)=0,g(0,e)=0,所以f(2)=0.
  • 计算f(6),它的前一个状态是1,转换条件是i。f(1)=0,g(0,i)=0,所以f(6)=0.
  • 计算f(4),它的前一个状态是3,转换条件是h。f(3)=0,g(0,h)=1, 所以f(4)=1.

接下来计算深度为3的状态,8、7、5的fail值。

  • 计算f(8),它的前一个状态是2,转换条件是r。f(2)=0,g(0,r)=0,所以f(8)=0.
  • 计算f(7),它的前一个状态是6,转换条件是s。f(6)=0,g(0,s)=3,所以f(7)=3.
  • 计算f(5),它的前一个状态是4,转换条件是e。f(4)=1,g(1,e)=2,所以f(5)=2.

接下来计算深度为4的状态,9的fail值。

  • 计算f(9),它的前一个状态是8,转换条件是s。f(8)=0,g(0,s)=3。所以f(9)=3。

现在计算出了所有的状态的失配值fail,列表如下:

▲fail函数

在goto图上将fail函数绘制在一起:

▲黑色实线箭头表示goto,红色虚线箭头表示fail

在计算fail值的时候,也可以补充output表,比如f(5)=2,当FSA状态转换到状态5时,不仅匹配了she,还匹配了he,所以对于两个状态S和S', S>S', 如果f(S)=S',则output(S)应该添加到output(S')。output表如下:

▲output函数

匹配过程


有了gfoutput三个函数,对于输入的字符串“ushers”就能开始匹配了。

  1. 从状态0开始,输入条件“u”,发现无法跳转到下一个有效状态,所以回到状态0。
  2. 从状态0开始,输入条件"s",g(0,"s")=3,跳到状态3。
  3. 从状态3开始,输入条件“h”,g(3,"h")=4,跳转到状态4。
  4. 从状态4开始,输入条件“e”,g(4,"e")=5,跳转到状态5,状态5是output状态,所以输出匹配"she,he"。
  5. 从状态5开始,输入条件“r”,g(5,"r")=fail,无法跳转到下一个有效状态,所以查找状态5的回退函数,f(5)=2,跳转到状态2
  6. 从状态2开始,输入条件“r”,g(2,"r")=8,跳转到状态8。
  7. 从状态8开始,输入条件“s”,g(8,"s")=9,跳转到状态9,状态9是output状态,所以输出“hers”。

只需要对字符串“ushers”一次遍历,就能找到匹配的字串“she、he、hers”,这明显比简单粗暴的foreach循环版本高效了很多。

代码实现


AC算法的C#实现在High-performance .NET by example: Filtering bot traffic这篇文章里有提及,且它是将其作为性能优化的例子来说明的。这里摘录如下。

首先对于DFS的每一个状态,定义一个node。

[DebuggerDisplay("Value = {Value}, TransitionsCount = {_gotoDictionary.Count}")]
internal class AhoCorasickTreeNode
{
    public readonly char Value;
    private readonly List<string> _outputs;
    private readonly Dictionary<char, AhoCorasickTreeNode> _transitionDictionary;
    private readonly AhoCorasickTreeNode _parent;

    public AhoCorasickTreeNode Failure { get; set; }
    public IEnumerable<string> Outputs => _outputs;
    public AhoCorasickTreeNode ParentFailture => _parent == null ? null : _parent.Failure;
    public IEnumerable<AhoCorasickTreeNode> Transitions => _transitionDictionary.Values;

    public AhoCorasickTreeNode() : this(null, ' ')
    { }

    public AhoCorasickTreeNode(AhoCorasickTreeNode parent, char value)
    {
        Value = value;
        _parent = parent;
        _outputs = new List<string>();
        _transitionDictionary = new Dictionary<char, AhoCorasickTreeNode>();
    }

    public void AddOutput(string result)
    {
        if (!_outputs.Contains(result))
        {
            _outputs.Add(result);
        }
    }

    public void AddOutputs(IEnumerable<string> results)
    {
        foreach (var result in results)
        {
            AddOutput(result);
        }
    }

    public AhoCorasickTreeNode AddTransition(char c)
    {
        var node = new AhoCorasickTreeNode(this, c);
        _transitionDictionary.Add(node.Value, node);
        return node;
    }

    public AhoCorasickTreeNode GetTransition(char c)
    {
        AhoCorasickTreeNode result = null;
        _transitionDictionary.TryGetValue(c, out result);
        return result;
    }

    public bool ContainTransition(char c)
    {
        return _transitionDictionary.ContainsKey(c);
    }
}

每一个Node定义了一个Value,表示关键字里面的单个字符。output表示如果该状态在output表里,则用来存储关键字,_transitionDicitonary则用来存储到下一个节点的转换。

紧接着就是算法实现的主体。

internal class AhoCorasickTree
{
    internal AhoCorasickTreeNode Root { get; set; }
    public AhoCorasickTree(IEnumerable<string> keywords)
    {
        Root = new AhoCorasickTreeNode();
        if (keywords != null)
        {
            foreach (string keyword in keywords)
            {
                AddPatternToTree(keyword);
            }
            SetFailtureNodes();
        }
    }

    public bool Contains(string text)
    {
        return Contains(text, false);
    }

    public bool ContainsThatStart(string text)
    {
        return Contains(text, true);
    }

    public IEnumerable<string> FindAll(string text)
    {
        var pointer = Root;
        foreach (var c in text)
        {
            var transition = GetTransition(c, ref pointer);
            if (transition != null)
            {
                pointer = transition;
            }
            foreach (var result in pointer.Outputs)
            {
                yield return result;
            }
        }
    }

    private void SetFailtureNodes()
    {
        var nodes = FailToRootNode();
        FailUsingBFS(nodes);
        Root.Failure = Root;

    }

    private void FailUsingBFS(List<AhoCorasickTreeNode> nodes)
    {
        while (nodes.Count != 0)
        {
            var newNodes = new List<AhoCorasickTreeNode>();
            foreach (var node in nodes)
            {
                var failture = node.ParentFailture;
                var value = node.Value;
                while (failture != null && !failture.ContainTransition(value))
                {
                    failture = failture.Failure;
                }
                if (failture == null)
                {
                    node.Failure = Root;
                }
                else
                {
                    node.Failure = failture.GetTransition(value);
                    node.AddOutputs(node.Failure.Outputs);
                }
                newNodes.AddRange(node.Transitions);
            }
            nodes = newNodes;
        }
    }

    private List<AhoCorasickTreeNode> FailToRootNode()
    {
        var nodes = new List<AhoCorasickTreeNode>();
        foreach (var node in Root.Transitions)
        {
            node.Failure = Root;
            nodes.AddRange(node.Transitions);
        }
        return nodes;
    }

    private void AddPatternToTree(string keyword)
    {
        var node = Root;
        foreach (var c in keyword)
        {
            node = node.GetTransition(c) ?? node.AddTransition(c);
        }
        node.AddOutput(keyword);
    }

    private bool Contains(string text, bool onlyStarts)
    {
        var pointer = Root;
        foreach (var c in text)
        {
            var transition = GetTransition(c, ref pointer);
            if (transition != null)
            {
                pointer = transition;
            }
            else if (onlyStarts)
            {
                return false;
            }

            if (pointer.Outputs.Any())
            {
                return true;
            }
        }
        return false;
    }

    private AhoCorasickTreeNode GetTransition(char c, ref AhoCorasickTreeNode pointer)
    {
        AhoCorasickTreeNode transition = null;
        while (transition == null)
        {
            transition = pointer.GetTransition(c);
            if (pointer == Root)
            {
                break;
            }
            if (transition == null)
            {
                pointer = pointer.Failure;
            }
        }
        return transition;
    }
}

在构造函数里面,接收关键字数组,然后创建一个空白的Root节点,然后将所有关键字,按照字符逐个添加到Root节点的下面。然后按广度优先的顺序依次计算每一层节点的fail失配状态。

用法如下:

var tree = new AhoCorasickTree(new[] { "he", "she", "his", "hers" });
var b = tree.Contains("ushers");
var items = tree.FindAll("ushers");

适用场景


凡是给出一系列模式串,一个文本串,然后需要判断模式串在文本串中是否存在这类问题都可以考虑AC自动机算法,比如:

  • 有一个上百万的中文词典,给一中文句子,如何快速找到句子中出现的词典中的所有词 
  • 有一个上百万的中文词典,给一中文句子,如何将这一个句子进行分词
  • 有一系列包含有大量脏字和敏感词的字典,判断用户的评论中是否存在脏字或者敏感词,如果有,找出来
  • 有一系列垃圾邮件的关键字,给定一个邮件标题,如何快速判断该封邮件是否是垃圾邮件
  • ....

如果遇到以上这些或者类似的问题,让我们丢掉暴力且低效的循环遍历这类单模式匹配方法,拥抱高效的、具有典型的代表性的Aho-Corasick自动机这类多模式匹配算法吧~

 

参考:

https://gist.github.com/alexandrnikitin/e4176d6b472b39155a7e0e5d68264e65 

https://alexandrnikitin.github.io/blog/high-performance-dotnet-by-example/ 

https://github.com/adamsitnik/awesome-dot-net-performance 

https://www.cnblogs.com/StevenL/p/6818466.html 

https://blog.csdn.net/qq_43800119/article/details/125004298 

Efficient string matching (acm.org)