Text Similarity

Text Similarity Based on String Matching

  1. Hamming distance

For two strings of equal length, the Hamming distance is the number of positions at which the tokens differ, e.g., d(cap, cat) = 1 (see the sketch below).

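A minimal sketch of this in Python; the function name is mine, and it assumes the two inputs already have equal length:

def hamming_distance(s1, s2):
    """Number of positions at which two equal-length strings differ."""
    assert len(s1) == len(s2)
    return sum(c1 != c2 for c1, c2 in zip(s1, s2))

# hamming_distance('cap', 'cat') == 1
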
  2. Edit distance

Given two strings, the edit distance is the minimum number of operations (deletion, insertion, substitution) required to transform one string into the other.


def editDistDP(s1, s2):
    """Compute the edit distance between two texts.
    params:
        s1: str, the first text
        s2: str, the second text
    """
    s1, s2 = s1.strip(), s2.strip()
    m, n = len(s1), len(s2)
    # a table that records the answers to all sub-problems
    dp = [[0 for x in range(n + 1)] for x in range(m + 1)]
    # fill the DP table from the top down
    for i in range(m + 1):
        for j in range(n + 1):
            if i == 0 or j == 0:
                # one string is empty: the distance is the length of the other
                dp[i][j] = max(i, j)
            elif s1[i - 1] == s2[j - 1]:
                # the last characters match, so the distance is unchanged
                dp[i][j] = dp[i - 1][j - 1]
            else:
                # the last characters differ: take the minimum over
                # substitution, insertion and deletion, plus 1
                dp[i][j] = 1 + min(dp[i - 1][j - 1], dp[i][j - 1], dp[i - 1][j])

    return dp[m][n]
  3. Jaccard Similarity

Given two sentences, take the set of words appearing in each; the Jaccard similarity is the size of the intersection divided by the size of the union.

Jaccard similarity only considers whether a word appears or not: it ignores the meaning of each word, the order of the words, and how many times each word occurs.

def jaccard_sim(s1, s2):
    """Intersection over union of the two word sets."""
    a = set(s1.strip().split())
    b = set(s2.strip().split())
    c = a.intersection(b)
    return float(len(c) / (len(a) + len(b) - len(c)))

Similarity Computation Based on Text Features

SimHash

  1. Choose a hash size, e.g. 32
  2. Initialize V = [0] * 32
  3. Turn the text into features (shingles)
  4. Hash each feature into 32 bits
  5. For every bit position of every hash, add 1 to V[i] if that bit is 1, otherwise subtract 1
  6. Finally, set each position to 1 if V[i] > 0 and to 0 otherwise; the resulting V is the simhash fingerprint (see the sketch below)
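
The steps above can be condensed into a short standalone sketch, separate from the fuller implementation that follows; the 32-bit hash size, whitespace tokenization, and the use of hashlib.md5 are assumptions made here for illustration:

import hashlib

def simple_simhash(text, hashbits=32):
    # step 3: turn the text into features (here: plain whitespace tokens)
    features = text.lower().split()
    # step 2: one counter per bit position
    v = [0] * hashbits
    for feat in features:
        # step 4: hash each feature into `hashbits` bits
        h = int(hashlib.md5(feat.encode('utf-8')).hexdigest(), 16) & ((1 << hashbits) - 1)
        # step 5: add 1 where the hash bit is 1, subtract 1 where it is 0
        for i in range(hashbits):
            v[i] += 1 if h & (1 << i) else -1
    # step 6: bits with a positive counter become 1 in the fingerprint
    fingerprint = 0
    for i in range(hashbits):
        if v[i] > 0:
            fingerprint |= 1 << i
    return fingerprint
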
import re
import hashlib
import logging
import numbers
import collections
import collections.abc
from itertools import groupby


def _hashfunc(x):  # the hash function used by default
    return int(hashlib.md5(x).hexdigest(), 16)


class Simhash(object):
    def __init__(self,
                 value,
                 f=64,
                 reg=r'[\w\u4e00-\u9fcc]+',
                 hashfunc=None,
                 log=None):
        """
        `f` is the dimension of the fingerprint, in bits.

        `reg` is meaningful only when `value` is str and describes
        what is considered to be a letter inside the parsed string. A regexp
        object can also be specified (one attempt to handle any letters
        is to specify reg=re.compile(r'\w', re.UNICODE)).

        `hashfunc` accepts a utf-8 encoded string and returns an unsigned
        integer of at least `f` bits.
        """

        self.f = f
        self.reg = reg
        self.value = None

        if hashfunc is None:
            self.hashfunc = _hashfunc
        else:
            self.hashfunc = hashfunc

        if log is None:
            self.log = logging.getLogger("simhash")
        else:
            self.log = log

        if isinstance(value, Simhash):
            self.value = value.value
        elif isinstance(value, str):
            self.build_by_text(str(value))
        elif isinstance(value, collections.abc.Iterable):
            self.build_by_features(value)
        elif isinstance(value, numbers.Integral):
            self.value = value
        else:
            raise Exception('Bad parameter with type {}'.format(type(value)))

    def __eq__(self, other):
        """
        Compare two simhashes by their value.

        :param Simhash other: The Simhash object to compare to
        """
        return self.value == other.value

    def _slide(self, content, width=4):
        # character shingles of length `width`
        return [
            content[i:i + width]
            for i in range(max(len(content) - width + 1, 1))
        ]

    def _tokenize(self, content):
        content = content.lower()
        content = ''.join(re.findall(self.reg, content))
        ans = self._slide(content)
        return ans

    def build_by_text(self, content):
        features = self._tokenize(content)
        # count how often each shingle occurs and use the count as its weight
        features = {k: sum(1 for _ in g) for k, g in groupby(sorted(features))}
        return self.build_by_features(features)

    def build_by_features(self, features):
        """
        Core method.
        `features` might be a list of unweighted tokens (a weight of 1
        will be assumed), a list of (token, weight) tuples or
        a token -> weight dict.
        """
        v = [0] * self.f  # initialize [0, 0, 0, ...]
        masks = [1 << i for i in range(self.f)]  # one bit mask per position: 0b1, 0b10, 0b100, ...

        if isinstance(features, dict):
            features = features.items()

        for f in features:
            if isinstance(f, str):
                h = self.hashfunc(f.encode('utf-8'))  # hash the feature into an f-bit value (64 by default)
                w = 1
            else:
                assert isinstance(f, collections.abc.Iterable)
                h = self.hashfunc(f[0].encode('utf-8'))
                w = f[1]
            for i in range(self.f):
                # if the hash bit at this position is 1, add w to v[i], otherwise subtract w
                v[i] += w if h & masks[i] else -w

        ans = 0
        for i in range(self.f):
            if v[i] > 0:  # if the counter is positive, set that bit to 1
                ans |= masks[i]
        self.value = ans

    def distance(self, another):
        """Count how many bit positions differ between the two fingerprints."""
        assert self.f == another.f
        x = (self.value ^ another.value) & ((1 << self.f) - 1)  # (1 << self.f) - 1 is a mask of self.f one-bits
        ans = 0
        while x:  # loop while differing bits remain
            ans += 1
            x &= x - 1  # each step clears the lowest set bit
        return ans
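
A quick usage sketch of the class above; the sentences are made-up examples. Near-duplicate texts should yield fingerprints that differ in only a few bits, unrelated texts in many more.

# minimal usage of the Simhash class defined above; the sentences are assumptions
a = Simhash('How are you? I am fine. Thanks.')
b = Simhash('How are you? I am fine. Thank you.')
c = Simhash('This is a completely different sentence.')
print(a.distance(b))  # expected to be small: near-duplicates
print(a.distance(c))  # expected to be larger: unrelated texts
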
class SimhashIndex(object):
    def __init__(self, objs, f=64, k=2, log=None):
        """
        Retrieve similar strings using Simhash fingerprints.
        `objs` is a list of (obj_id, simhash);
        obj_id is a string, simhash is an instance of Simhash.
        `f` is the same as the one used for Simhash.
        `k` is the tolerance (maximum Hamming distance).
        """
        self.k = k
        self.f = f
        count = len(objs)

        if log is None:
            self.log = logging.getLogger("simhash")
        else:
            self.log = log

        self.log.info('Initializing %s data.', count)

        self.bucket = collections.defaultdict(set)

        for i, q in enumerate(objs):
            if i % 10000 == 0 or i == count - 1:
                self.log.info('%s/%s', i + 1, count)
            self.add(*q)

    def get_near_dups(self, simhash):
        """
        `simhash` is an instance of Simhash.
        Returns a list of obj_id strings.
        """
        assert simhash.f == self.f

        ans = set()

        for key in self.get_keys(simhash):
            dups = self.bucket[key]
            self.log.debug('key:%s', key)
            if len(dups) > 200:
                self.log.warning('Big bucket found. key:%s, len:%s', key,
                                 len(dups))

            for dup in dups:
                sim2, obj_id = dup.split(',', 1)
                sim2 = Simhash(int(sim2, 16), self.f)

                d = simhash.distance(sim2)
                if d <= self.k:
                    ans.add(obj_id)
        return list(ans)

    def add(self, obj_id, simhash):
        """
        `obj_id` is a string.
        `simhash` is an instance of Simhash.
        """
        assert simhash.f == self.f

        for key in self.get_keys(simhash):
            v = '%x,%s' % (simhash.value, obj_id)
            self.bucket[key].add(v)

    def delete(self, obj_id, simhash):
        """
        `obj_id` is a string.
        `simhash` is an instance of Simhash.
        """
        assert simhash.f == self.f

        for key in self.get_keys(simhash):
            v = '%x,%s' % (simhash.value, obj_id)
            if v in self.bucket[key]:
                self.bucket[key].remove(v)

    @property
    def offsets(self):
        """
        You may optimize this method according to <http://www.wwwconference.org/www2007/papers/paper215.pdf>
        """
        return [self.f // (self.k + 1) * i for i in range(self.k + 1)]

    def get_keys(self, simhash):
        for i, offset in enumerate(self.offsets):
            if i == (len(self.offsets) - 1):
                m = 2**(self.f - offset) - 1
            else:
                m = 2**(self.offsets[i + 1] - offset) - 1
            c = simhash.value >> offset & m
            yield '%x:%x' % (c, i)

    def bucket_size(self):
        return len(self.bucket)

data = {
    1: u'How are you? I am fine. blar blar blar blar blar Thanks.',
    2: u'How are you i am fine. blar blar blar blar blar Thanks.',
    3: u'This is a simhash test',
}

# keep each id together with its Simhash fingerprint
objs = [(str(k), Simhash(v)) for k, v in data.items()]

# build the SimhashIndex; `k` is the tolerance
index = SimhashIndex(objs, k=10)
print("Number of buckets:", index.bucket_size())

# hash the query text and retrieve similar texts with get_near_dups
s1 = Simhash(u'How are you i am fine. blar blar blar blar blar thanks')
print("Similar text ids:", index.get_near_dups(s1))

# add the query text to the index as well
index.add('4', s1)
print("Similar text ids:", index.get_near_dups(s1))

s2 = Simhash(u'How are you i am fine. blar blar blar thanks')
index.add('5', s2)
print("Similar text ids:", index.get_near_dups(s1))

Cosine Similarity

  1. Convert the texts into feature vectors (bag of words or TF-IDF).
  2. Compute the similarity between the feature vectors.

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# bag of words
def bow_cosine(s1, s2):
    """Return value is an ndarray."""
    vectorizer = CountVectorizer()
    vectorizer.fit([s1, s2])  # build the vocabulary from the two texts
    X = vectorizer.transform([s1, s2])
    print(X.toarray())
    return cosine_similarity(X[0], X[1])

from sklearn.feature_extraction.text import TfidfVectorizer

def tfidf_cosine(s1, s2):
    vectorizer = TfidfVectorizer()
    vectorizer.fit([s1, s2])
    X = vectorizer.transform([s1, s2])
    print(X.toarray())
    return cosine_similarity(X[0], X[1])
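
For illustration, a quick call to the two functions above; the sentences are made-up examples, and each call returns a 1x1 array holding the cosine similarity.

# assumed example sentences, just to exercise the two functions above
s1 = "the cat sat on the mat"
s2 = "the cat lay on the rug"
print(bow_cosine(s1, s2))    # bag-of-words cosine similarity
print(tfidf_cosine(s1, s2))  # TF-IDF cosine similarity, typically different since common words are down-weighted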

Word2Vec

Compute sentence similarity by word averaging: average the word vectors of the words in each sentence and compare the resulting sentence vectors.

import gensim
import gensim.downloader as api
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

model = api.load("glove-twitter-25")

def wordavg(model, words):
    """Average the word vectors of all words in the sentence."""
    # `w in model` works across gensim versions (model.vocab was removed in gensim 4)
    res = np.mean([model.get_vector(w) for w in words if w in model], 0)
    return res

# s1 and s2 are example sentences (assumed here; any pair of texts works)
s1 = "How are you? I am fine. Thanks."
s2 = "How are you? I am fine. Thank you."

s1_vec = wordavg(model, s1.lower().split())
s2_vec = wordavg(model, s2.lower().split())

cosine_similarity(s1_vec.reshape((1, -1)), s2_vec.reshape((1, -1)))

Doc2Vec

Similar to word2vec, except that a global doc vector is fed in together with the word vectors; the doc vector is obtained by multiplying the doc id with the doc matrix (an embedding lookup). There are two training schemes: DM (Distributed Memory), which resembles CBOW, and DBOW (Distributed Bag of Words), which resembles Skip-gram (only the doc vector is fed in, to predict the distribution of randomly sampled words).

Official gensim documentation: https://radimrehurek.com/gensim/models/doc2vec.html

import gensim.models as g
from gensim.corpora import WikiCorpus
import logging
from hanziconv import HanziConv

docvec_size = 192

class TaggedWikiDocument(object):
    def __init__(self, wiki):
        self.wiki = wiki
        self.wiki.metadata = True

    def __iter__(self):
        import jieba
        # the tags are information word2vec does not have; they guide the training
        for content, (page_id, title) in self.wiki.get_texts():
            # TaggedDocument (formerly LabeledSentence) pairs the token list with its tags
            yield g.doc2vec.TaggedDocument(
                words=[w for c in content for w in jieba.cut(HanziConv.toSimplified(c))],
                tags=[title])

def my_function():
    zhwiki_name = './data/zhwiki-latest-pages-articles.xml.bz2'
    wiki = WikiCorpus(zhwiki_name, lemmatize=False, dictionary={})
    documents = TaggedWikiDocument(wiki)

    model = g.Doc2Vec(documents, dm=0, dbow_words=1, size=docvec_size, window=8, min_count=19, iter=5, workers=8)
    model.save('data/zhiwiki_news.doc2vec')

Model inference. Given an input document, the final vector is inferred against the trained model's doc matrix; the number of inference steps (infer_epoch) must be specified.

import gensim.models as g
import codecs
import numpy as np

def SimlarityCalu(Vector1, Vector2):
    """Cosine similarity between two vectors."""
    Vector1Mod = np.sqrt(Vector1.dot(Vector1))
    Vector2Mod = np.sqrt(Vector2.dot(Vector2))
    if Vector2Mod != 0 and Vector1Mod != 0:
        simlarity = (Vector1.dot(Vector2)) / (Vector1Mod * Vector2Mod)
    else:
        simlarity = 0
    return simlarity

# parameters
model = 'toy_data/model.bin'
test_docs = 'toy_data/t_docs.txt'
output_file = 'toy_data/test_vectors.txt'

# inference hyper-parameters
start_alpha = 0.01
infer_epoch = 1000

# load the model
m = g.Doc2Vec.load(model)
test_docs = [x.strip().split() for x in codecs.open(test_docs, 'r', 'utf-8').readlines()]

# infer test vectors
output = open(output_file, 'w')
a = []

for d in test_docs:
    output.write(' '.join([str(x) for x in m.infer_vector(d, alpha=start_alpha, steps=infer_epoch)]) + '\n')
    a.append(m.infer_vector(d, alpha=start_alpha, steps=infer_epoch))
output.flush()
output.close()
print(SimlarityCalu(a[0], a[1]))