Add files via upload

This commit is contained in:
anonymous123rainy 2022-11-12 18:16:43 +08:00 committed by GitHub
parent d65e26b974
commit af9f3c2120
7 changed files with 4142 additions and 0 deletions

393
License.py Normal file

@ -0,0 +1,393 @@
# _*_coding:utf-8_*_
'''
A license = n terms.
'''
import json
import logging
import os
import re
import pandas as pd
import shutil
from Term import Term
import utils
from TermRelated import TermRelated
from AC import shortTextClassification
from model.PreprocessData import cleanData_intoTestDir
from model.LocateTerms import ner_predict
from model.DetermAtti import get_treeAtti
from model.config import config as term_config
DIR = os.path.dirname(os.path.abspath(__file__))+'/'
class License:
def __init__(self, name=None, termList=None, text=None, textNeedTE=None, matchedLnameList=None):
'''
The working representation throughout processing is expected to be a termList.
:param name:
:param termList:
:param text:
'''
self.name = name
self.termList = termList # filled by termExtraction
self.text = text # raw text; term extraction turns it into termList
self.textNeedTE = textNeedTE ##
self.matchedLnameList = matchedLnameList ##
self.entity_mention_set = None
if self.termList is None:
self.termList = []
# (fields related to term-detail extraction)
self.words = None
self.labs = None
self.entities_chunks = None
self.jj_etChunkInx = None ##
##
self.termRelatedList = None # populated by extract_termRelated()
# List[ TermRelated(Object) ]
def printTermlist(self, base_termlist=None):
if base_termlist:
attiList = [term_config['attiType_label'][tt.atti] for tt in base_termlist]
else:
attiList = [term_config['attiType_label'][tt.atti] for tt in self.termList]
return attiList
def termExtraction(self, nlp, ld, ner_model_ee5, re_args, re_model, ac_model):
'''
Extract terms from self.text; self.name serves as the file name under the data folder.
Fills self.termList.
Everything here involves only one license (NER predicts one at a time), so an identical fname cannot interfere.
Text from the tree must always be checked for CPS; term extraction may run depending on the flags; every referenced license goes into matchedLnameList, whose labels are looked up directly as the base.
'''
## Pull in the label results for matchedLnameList
matchedLnameList = list(set(self.matchedLnameList))
for matchedLiname in matchedLnameList:
base_termlist = ld.give_termList_from_liname(matchedLiname)
if base_termlist:
self.setTermList(base_termlist)
print('base_termlist', matchedLiname, ' '.join([str(k) for k in self.printTermlist(base_termlist=base_termlist)]))
''' Run term extraction '''
print('self.textNeedTE:', self.textNeedTE)
if self.textNeedTE:
# # Preprocess
# with open(DIR + 'model/data/' + self.name + '.txt', 'w', encoding="utf-8") as fw:
# fw.write(self.text)
# fw.close()
# # Main steps
# cleanData_intoTestDir.main()
# ner_predict.main(model=ner_model)
# _ = get_treeAtti.main(nlp=nlp)
''' For this text '''
tmpTermList = []
text = utils.cleanText(self.text)
sentsList = utils.sentences_split(text)
for sent in sentsList:
''' For each sentence, in order '''
## Create and initialize a TermRelated object
tr = TermRelated(sentence=sent.strip(), )
# (predict) entity recognition
words, labs, entities_chunks = tr.predict_allEntityExtraction(ner_model_ee5)
# (reformat EE output for RE prediction)
dataList = tr.prepare_data_fromEE_toREpredict(words, labs, entities_chunks)
if not dataList:
continue
# (predict) relation extraction
test_pre_logits, dataList_final = tr.predict_relationExtraction(dataList, re_args, re_model)
if not dataList_final:
continue
# ...If EE accuracy is too low and hurts the overall result, filter here with test_pre_logits to build a new dataList_final
##
## (actions that may follow a condition)
ConditionalActionList = []
for sp_dict in dataList_final:
if sp_dict["relation"] == "Condition-Action(e1,e2)" \
and utils.get_type_from_etcPos(entities_chunks, sp_dict["t"]["pos"])=='Action':
ConditionalActionList.append(sp_dict["t"]["pos"])
condInx_jj = {}
## (all actions)
actionList = []
# for i, entity_chunk in enumerate(entities_chunks):
# et_type = entity_chunk[0]
# if et_type == 'Action':
# #actionList.append(str(entity_chunk[1])+' '+str(entity_chunk[2]))
# actionList.append(entity_chunk[1:3])
# First add the condition-following actions, then the remaining ones
for i, entity_chunk in enumerate(entities_chunks):
et_type = entity_chunk[0]
if et_type == 'Action' and entity_chunk[1:3] in ConditionalActionList: #
actionList.append(entity_chunk[1:3])
for i, entity_chunk in enumerate(entities_chunks):
et_type = entity_chunk[0]
if et_type == 'Action' and entity_chunk[1:3] not in ConditionalActionList: #
actionList.append(entity_chunk[1:3])
# print('initially', ConditionalActionList)
for at in actionList:
''' For each action occurrence '''
####
# Prepare a Term object (it may never be consumed)
tt = Term()
# Its recipient
for sp_dict in dataList_final:
if sp_dict["h"]["pos"]==at and sp_dict["relation"]=="Action-Recipient(e1,e2)":
tt.setRecipient(recipient=sp_dict["t"]["name"])
break
if not tt.recipient:
tt.setRecipient(recipient="")
# Its action (type)
termStr = ' '.join(words[at[0]:at[1]])+' '+tt.recipient
content_id = shortTextClassification.predict(text=termStr, ac_model=ac_model) ## (23-way classification)
content = term_config['term_list'][content_id]
tt.setContent(content=content)
# Its attitude (type)
attilist = []
for sp_dict in dataList_final:
if sp_dict["h"]["pos"]==at and sp_dict["relation"]=="Action-Attitude(e1,e2)":
attilist.append(sp_dict["t"]["name"])
attiLabel = get_treeAtti.getAtti(attilist=attilist)
atti = term_config['attiLabel_type'][attiLabel]
tt.setAtti(atti=atti)
# Its conditions (id list)
if at in ConditionalActionList:
if tt.content not in [tm.content for tm in tmpTermList]:
### tt can be added
tt.setCondInxs(condInxs=[])
tmpTermList.append(tt) #####
condInx_jj[str(at[0])+' '+str(at[1])] = utils.get_type2id()[tt.content]
# print('added as', condInx_jj)
else:
# It cannot be added, so drop it from ConditionalActionList as well
inx = ConditionalActionList.index(at)
ConditionalActionList.pop(inx)
# print(ConditionalActionList)
else:
if tt.content not in [tm.content for tm in tmpTermList]:
### tt can be added
# print(condInx_jj)
tt.setCondInxs(condInxs=[condInx_jj[str(ct[0])+' '+str(ct[1])] for ct in ConditionalActionList])
tmpTermList.append(tt) #####
''' (parsing finished) '''
print('len(tmpTermList):',len(tmpTermList))
assert len(tmpTermList)<=23
for j in range(23):
content = term_config['term_list'][j]
#if content in [tm.content for tm in tmpTermList]:
if self.existsTerm(content=content): ## base already present
#if tt.isMentioned(): # 1/2/3
if content in [tm.content for tm in tmpTermList]:
self.updateTerm(tmpTermList[[tm.content for tm in tmpTermList].index(content)]) ### overwrite it
#print(' updateTerm:', tmpTermList[[tm.content for tm in tmpTermList].index(content)].content, tmpTermList[[tm.content for tm in tmpTermList].index(content)].atti, '[from text]', self.text)
else:
if content in [tm.content for tm in tmpTermList]:
self.addTerm(tmpTermList[[tm.content for tm in tmpTermList].index(content)]) ###
else:
tt = Term(content=content)
tt.set_all_default()
self.addTerm(tt) ###
print('len(self.termList):',len(self.termList))
assert len(self.termList)==23
return
def extract_termRelated(self, nlp, ner_model_ee5, re_args, re_model, term_jj):
'''
From self.words, self.labs, self.entities_chunks, locate the span belonging to one action entity
and run detail extraction on it.
:return:
'''
etcInx = self.jj_etChunkInx[term_jj]
entity_chunk = self.entities_chunks[etcInx]
# Around each action entity
actionStr = ' '.join(self.words[entity_chunk[1]:entity_chunk[2]])
action_j = int(entity_chunk[0])
action_atti = self.termList[term_jj].getAtti() ###
sent_beginIdx, sentStr = utils.getItsSequence(self.words, entity_chunk)
action_beginIdx = entity_chunk[1] - sent_beginIdx # index of the action within sent
action_endIdx = entity_chunk[2] - sent_beginIdx # (end exclusive)
## Create and initialize a TermRelated object
tr = TermRelated(sentence=sentStr, action_idxs=(action_beginIdx, action_endIdx),
action=actionStr,action_j=action_j, action_atti=action_atti)
tr.run_extract(ner_model_ee5, re_args, re_model) # entity prediction lives inside tr; a single call here does it all
return tr ##
def getName(self):
return self.name
def getTermList(self):
# return self.termList
tmp = []
for tt in self.termList:
tmp.append(tt.get())
return tmp
def setTermList(self, termList):
self.termList = termList
return
def addTerm(self, term):
self.termList.append(term)
return
def updateTerm(self, tt):
for term in self.termList:
if term.content == tt.content:
term.atti = tt.atti
return
def existsTerm(self, content):
for term in self.termList:
if term.content == content:
return True
return False
def isSatisNeed(self, termList):
'''
Given a requirement, decide whether this License object satisfies it exactly:
satisfying the given set of terms is enough; extra terms are ignored.
:param termList:
:return:
'''
for tn in termList:
fg = False
for term in self.termList:
if term.content == tn.content and term.atti == tn.atti:
fg = True
break
if not fg:
return False
return True
def isSatisNeed_2(self, termlist_need_fromChildren, termlist_need_fromParent):
'''
Given a requirement, decide whether this License object satisfies it within the required bounds.
'''
for j in range(23):
if not termlist_need_fromParent:
if not self.termList[j].isMoreStrict(termlist_need_fromChildren[j], self.termList, termlist_need_fromChildren):
return False
else:
if not (self.termList[j].isMoreStrict(termlist_need_fromChildren[j], self.termList, termlist_need_fromChildren)
and termlist_need_fromParent[j].isMoreStrict(self.termList[j], termlist_need_fromParent, self.termList)):
return False
# # [Only consider the upper-level requirement when the parent node lacks permission]
# if not self.termList[j].isMoreStrict(termlist_need_fromChildren[j], self.termList, termlist_need_fromChildren):
# return False
return True
def parse_get_entity_mention(self, extractType, tokenizer, nlp, prefix, midFilesDir, max_seq_length):
'''
Parse the desired entity mentions out of this license text
and generate the corresponding ids.
:return:
'''
entity_mention_set = [] # ids
fw1 = open(os.path.join(midFilesDir, 'mention_strs' + prefix + '.txt'), 'w', encoding="utf-8")
fw2 = open(os.path.join(midFilesDir,'mention_ids' + prefix + '.txt'), 'w', encoding="utf-8")
sentences = utils.sentences_split(self.text)
for sent in sentences:
sent = sent.strip()
if not sent:
continue
sent = ' '.join(sent.split(' ')[:max_seq_length]) ###
outputFormat = 'json'
dpResult = nlp.annotate(sent, properties={'annotators': 'depparse', 'outputFormat': outputFormat, })
# When a sentence is too long, there may be no output and dpResult comes back empty
try:
enhancedPlusPlusDependencies = json.loads(dpResult)["sentences"][0]["enhancedPlusPlusDependencies"]
except Exception as e:
print(e)
print(dpResult)
print(sent)
continue
tokens = json.loads(dpResult)["sentences"][0]["tokens"]
findedIDSet = utils.extract_entity_mention(extractType, tokens, enhancedPlusPlusDependencies, 0, [])
for tp in findedIDSet:
tp.reverse()
phrase = utils.get_words_from_ids(tp, tokens)
phrase_ids = utils.generate_bert_ids_for_sentence(tokenizer=tokenizer, sentence=phrase, fg=1)
entity_mention_set.append(phrase_ids)
#
fw1.write(' '.join(phrase) + '\n')
fw2.write(' '.join([str(a) for a in phrase_ids]) + '\n')
#entity_mention_set = list(set(entity_mention_set))
entity_mention_set = utils.get_unique_lists_in_list(entity_mention_set)
self.entity_mention_set = entity_mention_set
# print('self.entity_mention_set', len(self.entity_mention_set))
fw1.close()
fw2.close()
return entity_mention_set
'''
license = License(name="GYL")
license.addTerm(Term(content="Distribute",atti="cannot"))
license.addTerm(Term(content="Distribute",atti="can"))
license.addTerm(Term("Modify","cannot"))
print(license.getTermList())
'''
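# A minimal sketch (not part of the pipeline) of how termExtraction merges extracted terms
# over the fixed 23-slot term list: an extracted term overwrites the slot with the same
# content, and untouched slots fall back to defaults. Term and term_config come from the
# imports above; the extracted list here is made up.
'''
li = License(name="demo", termList=[])
extracted = [Term(content="Distribute", atti="cannot")]
for content in term_config['term_list']: # the 23 fixed term contents
    hit = [tm for tm in extracted if tm.content == content]
    if hit:
        li.addTerm(hit[0]) # the extracted term wins
    else:
        tt = Term(content=content)
        tt.set_all_default() # absent terms get default attitudes
        li.addTerm(tt)
assert len(li.termList) == 23
'''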

284
LicenseDataset.py Normal file

@ -0,0 +1,284 @@
# _*_coding:utf-8_*_
'''
A dataset = n licenses.
'''
import json
import logging
import os
import re
import pandas as pd
import pickle
from Term import Term
from License import License
import utils
from model.config import config as term_config
DIR = os.path.dirname(os.path.abspath(__file__))+'/'
class Licensedataset:
def __init__(self, licenseList=None):
self.licenseList = licenseList
self.licenses = None # dict {name: text}. The raw, unstructured license database (a number of original license texts).
self.sentBertIdsDataset = None # list of lists: sentence ids across all licenses, deduplicated overall; for roberta-base.
if self.licenseList is None:
self.licenseList = []
def printLicenseList(self):
for ll in self.licenseList:
print(ll.getName(), ll.getTermList())
return
def addLicense(self, license):
self.licenseList.append(license)
return
def load_licenses_from_csv(self, nlp, ld, ner_model_ee5, re_args, re_model, ac_model):
'''
Directly read the already-structured license database.
:return:
'''
df = pd.read_csv(DIR+"data/tldr-licenses-forSpdx.csv")
# contentList = list(df.columns)[1:]
for row in df.itertuples():
# Each row is one license
i = len(self.licenseList)
# Fetch the text content
words, labs, entities_chunks = utils.get_entities(DIR + "data/termEntityTagging/" + str(i + 1) + '.txt', clean=False)
text = ' '.join(words)
### Build a License object
li = License(name=row[1], text=text, matchedLnameList=[], textNeedTE=True)
if os.path.exists(os.path.join(DIR, 'ld_save', li.name+'.json')):
with open(os.path.join(DIR, 'ld_save', li.name+'.json'), 'r', encoding="utf-8") as fr:
liJSON = json.load(fr)
for tjson in liJSON:
tt = Term()
tt.setContent(tjson['content'])
tt.setAtti(tjson['atti'])
tt.setRecipient(tjson['recipient'])
tt.setCondInxs(tjson['condInxs'])
li.addTerm(tt)
assert len(li.termList) == 23
else:
li.termExtraction(nlp, ld, ner_model_ee5, re_args, re_model, ac_model)
with open(os.path.join(DIR, 'ld_save', li.name + '.json'), 'w', encoding="utf-8") as fw:
liJSON = []
for tt in li.termList:
tjson = {}
tjson['content'] = tt.content
tjson['atti'] = tt.atti
tjson['recipient'] = tt.recipient
tjson['condInxs'] = tt.condInxs
liJSON.append(tjson)
json.dump(liJSON, fw)
# Overwrite atti
for j, atti in enumerate(row[2:]):
# One term of this license, with polarity
li.termList[j].setAtti(atti=atti)
# Set default assumed values; make them all 1/2/3 here so compatibility checks stay uniform and bug-free
li.termList[j].set_absentAtti()
# ### Update self.termList
# li.addTerm(tt)
assert len(li.termList) == 23
self.addLicense(li)
print("load ld: ", i)
##self.printLicenseList() #### (good enough; cond
return self.licenseList
def give_termList_from_liname(self, name):
for li in self.licenseList:
kk = li.name.split('___')
for k in kk:
if k==name:
return li.termList
print('[This matchedLiName has no counterpart in ld]', name)
# (log it)
with open(os.path.join(DIR, 'gap_spdx_tldr.txt'), 'a', encoding="utf-8") as fw:
fw.write(name + '\n')
return []
def read_licenses(self, dataDir):
'''
Read the original license texts
and preprocess them.
:return:
'''
licenses = {}
for file in os.listdir(dataDir):
with open(os.path.join(dataDir, file), 'r', encoding="utf-8")as fr:
text = ' '.join([line.strip() for line in fr.readlines()])
text = utils.cleanText(text)
fr.close()
# print(text)
licenses[file[:-4]] = text
self.licenses = licenses
print('self.licenses', len(self.licenses))
return self.licenses
def generate_bert_ids_for_licenses(self,tokenizer, idsDir, max_seq_length):
'''
Generate input_ids.h5: a list of lists, the sentence ids from all licenses,
deduplicated across the whole set; for roberta-base.
'''
ids = []
for text in self.licenses.values():
sentences = utils.sentences_split(text)
for sent in sentences:
sent = sent.strip().split(' ')[:max_seq_length] ###
sent_ids = utils.generate_bert_ids_for_sentence(tokenizer=tokenizer,sentence=sent, fg=2)
ids.append(sent_ids)
# ids = list(set(ids))
ids = utils.get_unique_lists_in_list(ids)
print('ids', len(ids))
self.sentBertIdsDataset = ids
# Write to file
import h5py
f = h5py.File(idsDir, 'w') # create an h5 file; f is the file handle
#f['data'] = str(ids) # write the data under the key 'data'
f.create_dataset(name='data', data=ids, dtype=int)
f.close()
return self.sentBertIdsDataset
def generate_entity_mention_position_file(self, entity_mention_set, posDir):
'''
To fuse mentions into an entity, first collect every sentence in the database
(i.e. self.sentBertIdsDataset) where the mention appears, as the basis for generating embeddings.
Each mention gets one group holding each occurrence (a sentence and a position within it).
Produces entity_pos.pkl.
For now, each license to be predicted yields one entity_mention_set, and we generate one matching pkl file for it.
'''
# Initialize
groups = {}
for j in range(len(entity_mention_set)):
groups[j] = [] # one group
# Walk self.sentBertIdsDataset and fill groups
for i in range(len(self.sentBertIdsDataset)):
sent_ids = self.sentBertIdsDataset[i]
for j in range(len(entity_mention_set)):
phrase_ids = entity_mention_set[j]
sent_str = ' '.join([ str(a) for a in sent_ids])
phra_str = ' '.join([ str(a) for a in phrase_ids])
if sent_str.find(phra_str) > -1:
# (the phrase may occur several times in this sentence)
#starts = [each.start() for each in re.finditer(phra_str, sent_str)] # note: spaces are superfluous for id positions
starts = [sent_str[:each.start()].count(' ')+1-1 for each in re.finditer(phra_str, sent_str)]
ends = [start + len(phrase_ids) for start in starts] #### end exclusive
spans = [(start, end) for start, end in zip(starts, ends)]
for sp in spans:
# 一次出现
cur_item = [i, sp[0], sp[1]]
groups[j].append(cur_item)
print('groups', len(groups))
# for j in range(len(entity_mention_set)):
# print(str(len(groups[j])))
# Store in binary mode (rb, wb, wrb, ab)
p = open(posDir, 'wb')
# Dump the dict into a pkl file
pickle.dump(groups, p)
p.close()
return groups
def isNeedSatisfied(self,termList):
'''
Check whether this dataset holds a license that satisfies the requirement exactly.
Outputs a list of License objects.
:return:
'''
abled = []
for ll in self.licenseList:
if ll.isSatisNeed(termList):
abled.append(ll)
return abled
def isNeedSatisfied_2(self,termlist_need_fromChildren, termlist_need_fromParent):
'''
Check whether this dataset holds a license that satisfies the requirement within the given bounds.
Outputs a list of License objects.
:return:
'''
abled = []
for ll in self.licenseList:
if ll.isSatisNeed_2(termlist_need_fromChildren, termlist_need_fromParent):
abled.append(ll)
print(ll.name, ' '.join([str(k) for k in ll.printTermlist()]))
return abled
'''
ld = Licensedataset()
ld.printLicenseList()
license = License(name="GYL")
license.addTerm(Term(content="Distribute",atti="cannot"))
ld.addLicense(license)
ld.printLicenseList()
'''
'''
ld = Licensedataset()
ld.load_licenses_from_csv()
ld.printLicenseList()
'''
# df = pd.read_csv(DIR+"data/tldr-licenses-forSpdx.csv")
# print(list(df.columns)[1:])
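# A minimal standalone sketch of the id-sequence matching trick used above in
# generate_entity_mention_position_file: phrase ids are located inside sentence ids by
# joining both into space-separated strings and counting spaces to recover token offsets.
# The sample ids below are made up.
'''
import re
sent_ids = [10, 42, 7, 42, 7, 99]
phrase_ids = [42, 7]
sent_str = ' '.join(str(a) for a in sent_ids)
phra_str = ' '.join(str(a) for a in phrase_ids)
starts = [sent_str[:m.start()].count(' ') for m in re.finditer(phra_str, sent_str)]
spans = [(s, s + len(phrase_ids)) for s in starts] # [(1, 3), (3, 5)], end exclusive
'''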

896
LicenseRepair.py Normal file

@ -0,0 +1,896 @@
# _*_coding:utf-8_*_
'''
Concrete repair schemes for license problems.
'''
import json
import logging
import os
import re
import pandas as pd
from itertools import product
from treelib import Tree, Node
from model.config import config as term_config
from Term import Term
from License import License
from LicenseDataset import Licensedataset
import utils
class LicenseRepair:
def __init__(self, licenseTree=None, nid_filepath=None, hasPL=None, nid_textNeedTE=None, nid_matchedLnameList=None):
self.licenseTree = licenseTree # tree structure (node index, content, level, ...)
self.nid_filepath = nid_filepath # dict {nid: str}
self.nid_textNeedTE = nid_textNeedTE
self.nid_matchedLnameList = nid_matchedLnameList
self.hasPL = hasPL
self.nid_license = {} # dict {nid: LicenseObject}
self.fixable_nid = [] # list[int]
self.fixable_nid_all = []
self.fixable_nid_pl = []
self.fixable_nid_ch = []
self.nid_termListFromChildren = {} # (keep this information around)
self.incomNid_termLists = {} # dict {subset of nids: [list[TermObject], list[TermObject]]} # lower and upper bound # a fix must be tighter than the lower bound and looser than the upper bound.
self.incomNid_filepathLists = {}
# (same shape as above, except each term's polarity maps to its filepath.) # one atti may map to several filepaths joined with '|'. ...complicated
# The filepath from the parent need is actually unnecessary: the exception text never involves the parent node.
# {nid: list[ dict{atti: str-filepaths} ]}; instead of str-filepaths just keep the nid: list[nid], later simplified to a single nid
self.incomAndFixable_nid = [] # list[int]
def show_licenseTree(self):
self.licenseTree.show()
return
def turn_into_licenseObjects(self, nlp, ld, ner_model_ee5, re_args, re_model, ac_model):
'''
填充了self.nid_license
'''
for nid in self.licenseTree.expand_tree(mode=Tree.DEPTH, sorting=False):
if nid == 1:
continue
print('(term extraction)', nid, '/', len(self.licenseTree.nodes.keys()))
ntag = self.licenseTree[nid].tag
nname = self.nid_filepath[nid].split('/')[-1].replace(':','.')
###
li = License(name=nname,text=ntag, textNeedTE=self.nid_textNeedTE[nid], matchedLnameList=self.nid_matchedLnameList[nid])
li.termExtraction(nlp, ld, ner_model_ee5, re_args, re_model, ac_model)
self.nid_license[nid] = li
return
def search_fixable_places(self, nlp):
'''
Check the tags of licenseTree nodes for possible copyright-holder information.
(1) If the PL carries copyright-holder info, also look at the inner layers; at least one position may end up fixable.
(2) If nothing inside corresponds to the PL's, only the PL position itself can be fixed.
copyright and (C), authored by, written by, etc.
:return: the list of nids for the matching positions
'''
self.fixable_nid = []
self.fixable_nid_all = []
self.fixable_nid_pl = []
self.fixable_nid_ch = []
'''First find the PL holders'''
PL_holders = []
#PL_holder_possibleSent = [] ## (some nonstandard, atypical phrasings may hide the holder from detection; but if the PL and many CLs contain the same such sentence, that can still indicate the same author,,,)
for nid in self.licenseTree.expand_tree(mode=Tree.DEPTH, sorting=False):
if nid >= 2:
self.fixable_nid_all.append(nid)
if self.licenseTree.level(nid) == 1:
self.fixable_nid.append(nid) #####
self.fixable_nid_pl.append(nid)
ntag = self.licenseTree[nid].tag
# Inspect the contents of ntag
text = ntag ## .lower() the original casing matters a lot to NER.
sentsList = utils.sentences_split(text)
for sent in sentsList:
if utils.check_text_for_CPS(sent): # a copyright-related sentence exists
print('found a CPS-style sentence', sent)
holders = utils.identify_PERSON_ORGANIZATION_by_corenlp(nlp, sent)
if holders:
PL_holders.extend(holders)
print('sentence with a PL holder', holders, ' ::: ', sent)
self.fixable_nid_ch.append(nid)
#else:
# PL_holder_possibleSent.append(sent)
print("PL_holders: ", PL_holders)
'''Then examine the inner nodes'''
for nid in self.licenseTree.expand_tree(mode=Tree.DEPTH, sorting=False):
if self.licenseTree.level(nid) > 1:
#CL_holder_possibleNid = False ##
print('checking CPS', nid, '/', len(self.licenseTree.nodes.keys()))
ntag = self.licenseTree[nid].tag
# Inspect the contents of ntag
text = ntag ## .lower()
CL_holders = [] ##
sentsList = utils.sentences_split(text)
for sent in sentsList:
if utils.check_text_for_CPS(sent): # a copyright-related sentence exists
print('found a CPS-style sentence', sent)
holders = utils.identify_PERSON_ORGANIZATION_by_corenlp(nlp, sent)
if holders:
CL_holders.extend(holders)
#elif PL_holder_possibleSent and utils.existsSameSent(PL_holder_possibleSent, sent):
# CL_holder_possibleNid = True
print('its CL holders', CL_holders)
if set(CL_holders) & set(PL_holders): ##
self.fixable_nid.append(nid) #####
self.fixable_nid_ch.append(nid)
#elif CL_holder_possibleNid: ##
# print('it has a CL_holder_possibleNid.')
# self.fixable_nid.append(nid)
return
def isConflictNeed(self, termList):
'''
Does this termlist contradict itself internally (contain inconsistencies)?
This version targets a termList holding a mixed bag: each occurring polarity gets its own entry.
'''
for tt in product(termList,termList):
if tt[0].isconflict(tt[1]):
return True
return False
def isConflictNeed2(self, termList):
'''
This version targets a termList where each term appears once, with polarity clashes already collapsed into 'conflict'.
'''
for tt in termList:
if tt.isconflict2():
return True
return False
def getConflictNeed2(self, termList):
conf_tt_j_list = []
for j, tt in enumerate(termList):
if tt.isconflict2():
conf_tt_j_list.append(j)
return conf_tt_j_list
def getConflictNeeds(self, termList):
'''
Locate the exact positions of the contradictions inside this termlist.
'''
cfTupIndList0 = [] # initially, each tuple is a pair of ids
for i in range(len(termList)):
for j in range(i+1, len(termList)):
if termList[i].isconflict(termList[j]):
cfTupIndList0.append((i,j))
# Ensure a single tuple per term.content (each tuple holds >= 2 ids)
cfTupIndList = []
for i in range(len(cfTupIndList0)):
tp1 = cfTupIndList0[i]
tmp = list(tp1)
for j in range(i+1, len(cfTupIndList0)):
tp2 = cfTupIndList0[j]
if tp1 != tp2 and termList[tp1[0]].isSameContent(termList[tp2[0]]):
tmp.extend(list(tp2))
tmp = set(tmp)
fg = False
for tp in cfTupIndList:
if set(tp).issuperset(tmp):
fg = True
if not fg:
cfTupIndList.append(tuple(list(tmp)))
cfTupIndList = list(set(cfTupIndList))
return cfTupIndList
def repair_choose_popular_licenses(self, termlist_need_fromChildren, termlist_need_fromParent, ld):
'''
Check whether the dataset holds a license satisfying the requirement within the given bounds.
OK
'''
'''
This could also recommend the minimal-change option along the way.
'''
return ld.isNeedSatisfied_2(termlist_need_fromChildren, termlist_need_fromParent)
def repair_generate_one_custom_license(self,termlist_need_fromChildren, termlist_need_fromParent):
'''
In theory: generate a text whose polarities fall between the bounds circumscribed by the two termlists.
(For now, it is generated directly from termlist_need_fromChildren alone.)
Leaving it at that for the moment.
If polarities could float freely within the bounds, a minimal-modification optimum could be computed instead.
'''
l_custom = ''
termContent_template = utils.read_custom_template()
for tt in termlist_need_fromChildren:
template = termContent_template[tt.content]
l_custom += ('You '+tt.atti+' '+ template + '.'+'\n')
return l_custom
def repair_generate_one_custom_license_2(self, termlist_need_fromChildren, termlist_need_fromParent, nid, nlp, ner_model_ee5, re_args, re_model):
text = ''
for j in range(23):
# Each term may contribute at least one sentence
# (terms with an exception have several attis, the rest just one; either way, iterate over the attis and emit one sentence each time)
atti_cid = self.incomNid_filepathLists[nid][j] # dict{ atti-str: nid-int}
for atti, cid in atti_cid.items():
cidFilepathList = self.nid_filepath[cid]
# Add one sentence
# Find the wording for term j inside node cid and deconstruct it
ll = self.nid_license[cid]
# Get the corresponding tr object
# (assemble into one sentence)
# tr = ll.extract_termRelated(nlp, ner_model_ee5, re_args, re_model, j)
# sent = tr.composeOneSent()
sent = ll.termList[j].composeOneSent(ll.termList)
text += sent
text += '\n'
return text
def repair_generate_one_exception_license(self, termList, termList_filepathList, cfTupIndList, ):
'''
:param termList:
:param termList_filepathList:
:param cfTUupIndList:
:return:
'''
l_exception = ''
termContent_template = utils.read_custom_template()
termList_alre = [False]*len(termList)
for tp in cfTupIndList:
template = termContent_template[termList[tp[0]].content]
for k in tp:
k_atti = termList[k].atti
k_obj = termList_filepathList[k]
l_exception += ('For the code in ' + k_obj + ', you ' + k_atti + ' ' + template + '; ')
termList_alre[k] = True
l_exception += '\n'
l_exception += 'The other terms are below: ' + '\n'
for i in range(len(termList)):
if not termList_alre[i]:
tt = termList[i]
template = termContent_template[tt.content]
l_exception += ('You ' + tt.atti + ' ' + template + '.' + '\n')
return l_exception
def repair_generate_one_exception_license_2(self, termList, nid, conf_tt_j_list, nlp, ner_model_ee5, re_args, re_model):
'''
Once the compatibility check has filled self.incomNid_filepathLists, this follows that data structure.
:param termList:
:param termList_filepathList:
:return:
'''
# return '(we will generate a exception license for you ...)'
'''
'''
text = ''
for j in range(23):
# Each term may contribute at least one sentence
if j in conf_tt_j_list:
# when the current term's polarity carries an exception
text += 'ONE EXCEPTION: '
# (terms with an exception have several attis, the rest just one; either way, iterate over the attis and emit one sentence each time)
atti_cid = self.incomNid_filepathLists[nid][j] # dict{ atti-str: nid-int}
for atti, cid in atti_cid.items():
cidFilepath = self.nid_filepath[cid]
if j in conf_tt_j_list:
text += 'For the code in : '+cidFilepath+', '
# Add one sentence
# Find the wording for term j inside node cid and deconstruct it
ll = self.nid_license[cid]
# tr = ll.extract_termRelated(nlp, ner_model_ee5, re_args, re_model, j)
# # Get the corresponding tr object
# # (assemble into one sentence)
# sent = tr.composeOneSent()
sent = ll.termList[j].composeOneSent(ll.termList)
text += sent
text += '\n'
return text
def repair_onePlace(self, nid, ld, nlp, ner_model_ee5, re_args, re_model):
'''
输入本次待修复的位置nid
输出给此位置的修复建议一段文本
'''
'''
# (some test parameters)
termList = [
Term('Distribute', 'can'),
Term('Modify', 'can'),
Term('Commercial Use', 'cannot'),
Term('Hold Liable', 'cannot'),
Term('Include Copyright', 'must'),
Term('Sublicense', 'can'),
Term('Disclose Source', 'must'),
Term('Rename', 'must'),
]
termList_filepathList = []
'''
# Known information for this position
termlist_need_fromChildren = self.incomNid_termLists[nid][0]
termlist_need_fromParent = self.incomNid_termLists[nid][1]
# termlist_real = self.nid_license[nid].termList
# termList_filepathList = [] #self.incomNid_filepathLists[nid]
print('>>>> termlist_need_fromChildren: ', ' '.join([str(term_config['attiType_label'][tt.getAtti()]) for tt in termlist_need_fromChildren]))
# Repair process
if self.isConflictNeed2(termList=termlist_need_fromChildren):
print("[The requirements conflict: generating a custom license with an exception]")
# cfTupIndList = lr.getConflictNeeds(termList=termList)
conf_tt_j_list = self.getConflictNeed2(termList=termlist_need_fromChildren)
text = self.repair_generate_one_exception_license_2(termList=termlist_need_fromChildren, nid=nid, conf_tt_j_list=conf_tt_j_list,
nlp=nlp, ner_model_ee5=ner_model_ee5, re_args=re_args, re_model=re_model)
return 1, text
else:
# termlist_need_fromParent never contains a conflict
# if termlist_need_fromChildren had one, the exception branch above would have caught it, so the popular and custom paths below always see valid attis.
abledList = self.repair_choose_popular_licenses(termlist_need_fromChildren, termlist_need_fromParent, ld)
if not abledList:
print("【数据库无法满足需求,生成自定义许可证】")
text = self.repair_generate_one_custom_license_2(termlist_need_fromChildren=termlist_need_fromChildren,
termlist_need_fromParent=termlist_need_fromParent,
nid=nid, nlp=nlp, ner_model_ee5=ner_model_ee5, re_args=re_args, re_model=re_model)
return 3, text
else:
print("【数据库满足需求,推荐以下已有许可证】")
return 2, str([ll.name for ll in abledList])
def isCompatible_real_for_needs(self, nid, needtermlist):
'''
比较两个termlist一个节点上的本身VS被需求
输入两个termlist
输出是否
每个term.content上 本身atti 应该比 被需求atti 一样or更加严格
'''
realTermlist = self.nid_license[nid].termList
# print(nid, realTermlist, needtermlist)
if not realTermlist or not needtermlist:
print(nid, realTermlist, self.nid_license[nid].matchedLnameList)
if not realTermlist:
return True
'''
Simplified for now: positions already correspond by term.content order.
'''
#print(nid, [tt.atti for tt in realTermlist], [tt.atti for tt in needtermlist])
for j in range(23):
if not realTermlist[j].isMoreStrict(needtermlist[j], realTermlist, needtermlist):
#print(j, realTermlist[j].atti, needtermlist[j].atti)
return False
return True
def get_oneNode_needs_from_its_childern(self, termlists_of_cid):
'''
Derive this node's lower-level requirement termlist from all of its children's termlists.
Input: several termlists.
Output: one termlist.
For each term.content, take the strictest atti found among them.
(If the strictest attis conflict, atti='conflict' and downstream is immediately incompatible.)
'''
termlist = []
attiCidsList = []
for j in range(23):
terms_sameCont_diffAtti = []
corr_cid = []
for cid in termlists_of_cid.keys():
termlist_tmp = termlists_of_cid[cid]
# (simplified for now; properly, a function should match up terms with identical content)
# (positions are used directly here, since nid_license was filled in the same order)
if not termlist_tmp:
continue
tt = termlist_tmp[j]
# Default assumed values are now all set to 1/2/3 from the start, keeping compatibility checks uniform and bug-free
# tt.set_absentAtti()
terms_sameCont_diffAtti.append(tt)
corr_cid.append(cid)
# Find the strictest atti among them
mostStrictOne, atti_cids = terms_sameCont_diffAtti[0].find_mostStrictAtti(terms_sameCont_diffAtti, corr_cid)
termlist.append(mostStrictOne)
attiCidsList.append(atti_cids)
return termlist, attiCidsList
def upward_get_allNodes_needs_from_childern(self):
'''
Move up level by level; for each non-leaf node, derive its lower-level requirement termlist.
Find the non-leaf nodes,
sort them by depth,
guaranteeing that when a node is computed, all of its children already have been.
Walk all of its children's termlists: a leaf contributes its own termlist, a non-leaf its requirement termlist.
Sort by depth, then compute in order; written that way for now.
or:
write a recursive function.
'''
nid_termListFromChildren = {}
nid_attiCidsListFromChildren = {}
nids_of_leaves = [nd.identifier for nd in self.licenseTree.leaves()]
nids_of_not_leaves = set(list(self.licenseTree.nodes.keys())) - set(list([1])) - set(nids_of_leaves) ###
nid_level = dict(zip(nids_of_not_leaves, [self.licenseTree.level(nid) for nid in nids_of_not_leaves]))
sorted_nid_level = sorted(nid_level.items(), key=lambda d:d[1], reverse=True)
for nid, nlevel in sorted_nid_level:
# Find all children
childrenList = self.licenseTree.is_branch(nid)
termlists_of_cid = {}
# Fetch each child's termlist: a leaf contributes its own termlist, a non-leaf its requirement termlist
for cid in childrenList:
# For each child:
assert cid in nids_of_leaves or cid in nid_termListFromChildren.keys()
'''
if cid in nids_of_leaves:
termlists_of_cid[cid] = self.nid_license[cid].termList
else:
termlists_of_cid[cid] = nid_termListFromChildren[cid]
'''
termlists_of_cid[cid] = self.nid_license[cid].termList #### [11-09 change]
############################
# if nid in [4,48,51]:
###########################
# Update nid_termListFromChildren
termlist_from_children, attiCidsList_from_children = self.get_oneNode_needs_from_its_childern(termlists_of_cid)
nid_termListFromChildren[nid] = termlist_from_children
nid_attiCidsListFromChildren[nid] = attiCidsList_from_children
'''
But for get_PL_needs_from_childern, just in case, each leaf's own termlist is also placed into nid_termListFromChildren.
'''
for nid in nids_of_leaves:
nid_termListFromChildren[nid] = self.nid_license[nid].termList
return nid_termListFromChildren, nid_attiCidsListFromChildren
def get_PL_needs_from_childern(self):
'''
When the project has no PL (self.hasPL=False):
fill self.incomNid_termLists[-1] and incomNid_filepathLists[-1].
The hierarchical compatibility check over all the OSS is already done by then; on that basis, derive the first level for the PL.
'''
termlists_of_cid = {}
for nid in self.nid_termListFromChildren.keys():
if self.licenseTree.level(nid) == 1:
termlists_of_cid[nid] = self.nid_termListFromChildren[nid]
termlist_from_children, attiCidsList_from_children = self.get_oneNode_needs_from_its_childern(termlists_of_cid)
self.incomNid_termLists[-1] = [termlist_from_children, []]
self.incomNid_filepathLists[-1] = attiCidsList_from_children
return
def get_incomNodes_needs_from_parent(self, nid):
'''
For the incompatible positions, look only one level up.
In practice this is simple: there is at most one parent, so the upper-level requirement is basically a copy of the parent's termlist.
'''
nParid = self.licenseTree.parent(nid).identifier
termlist_from_parent = self.nid_license[nParid].termList
return termlist_from_parent
def detect_incompatibility_hierarchically(self):
'''
From the innermost layer outward, aggregate the requirements the inner layers impose on each position and decide whether that position is incompatible.
The unit is one project, i.e. one subtree.
Uses self.licenseTree, self.nid_license, self.nid_filepath.
The final results fill self.incomNid_termLists and incomNid_filepathLists.
1. Move up level by level to get each node's lower-level requirement termlist (non-leaf nodes; but for get_PL_needs_from_childern, each leaf's own termlist also goes into nid_termListFromChildren just in case).
(Changed on 11-09: no propagation; only the immediate parent-child relation is considered.)
2. Compare each node's requirement termlist against its own termlist to find the incompatible positions (incompatible and non-leaf).
3. Move down level by level, looking only one level up at the incompatible positions, to derive each one's upper-level requirement termlist (only so that repairs don't create new conflicts; incompatible and non-leaf and non-root).
'''
# 1
self.nid_termListFromChildren, nid_attiCidsListFromChildren = self.upward_get_allNodes_needs_from_childern()
# 2
for nid, needtermlist in self.nid_termListFromChildren.items():
if not self.isCompatible_real_for_needs(nid, needtermlist):
self.incomNid_termLists[nid] = [needtermlist] # add the lower bound
self.incomNid_filepathLists[nid] = nid_attiCidsListFromChildren[nid]
print(self.incomNid_termLists.keys())
print(self.incomNid_filepathLists)
# 3
for icNid in self.incomNid_termLists.keys():
if self.licenseTree.level(icNid) > 1:
termlist_from_parent = self.get_incomNodes_needs_from_parent(icNid)
self.incomNid_termLists[icNid].append(termlist_from_parent) # add the upper bound
else:
self.incomNid_termLists[icNid].append([])
return
def get_incom_and_fixable_places(self):
'''
Fills the self.incomAndFixable_nid list.
'''
incom_nids = self.incomNid_termLists.keys()
fixable_nids = self.fixable_nid
self.incomAndFixable_nid = list(set(incom_nids) & set(fixable_nids))
return
def getShortPath(self,nid,repoName):
rootDir = os.path.dirname(os.path.abspath(__file__))
return self.nid_filepath[nid][len(os.path.join(os.path.dirname(rootDir), 'repos', repoName)+'/'):]
def baseline_tool_nonhiera(self, repoName):
reportList = []
if self.hasPL:
termlist_PL = self.nid_license[2].termList
for nid in self.licenseTree.expand_tree(mode=Tree.DEPTH, sorting=False):
if nid <= 2:
continue
termlist_CLi = self.nid_license[nid].termList
##
tmp_j_list = []
for j in range(23):
if not termlist_PL[j].isMoreStrict(termlist_CLi[j], termlist_PL, termlist_CLi): ##
# tmp_j_list.append(j)
tmp_j_list.append(term_config['term_list'][j])
if tmp_j_list:
sent = {}
sent['A'] = self.getShortPath(nid=2, repoName=repoName)
sent['B'] = self.getShortPath(nid=nid, repoName=repoName)
sent['incomterms'] = ', '.join(tmp_j_list)
reportList.append(sent)
else:
PL = License(name='PL')
for j in range(23):
content = term_config['term_list'][j]
tt = Term(content=content)
tt.set_all_default()
PL.addTerm(tt) ###
termlist_PL = PL.termList
for nid in self.licenseTree.expand_tree(mode=Tree.DEPTH, sorting=False):
if nid <= 1:
continue
termlist_CLi = self.nid_license[nid].termList
##
tmp_j_list = []
for j in range(23):
if not termlist_PL[j].isMoreStrict(termlist_CLi[j], termlist_PL, termlist_CLi): ##
# tmp_j_list.append(j)
tmp_j_list.append(term_config['term_list'][j])
if tmp_j_list:
sent = {}
sent['A'] = self.getShortPath(nid=2, repoName=repoName)
sent['B'] = self.getShortPath(nid=nid, repoName=repoName)
sent['incomterms'] = ', '.join(tmp_j_list)
reportList.append(sent)
# print()
# cids = []
# for nid in self.licenseTree.expand_tree(mode=Tree.DEPTH, sorting=False):
# if nid == 1:
# continue
# cids.append(nid)
# for d1 in range(0, len(cids)):
# for d2 in range(d1 + 1, len(cids)):
# termlist_CL1 = self.nid_license[cids[d1]].termList
# termlist_CL2 = self.nid_license[cids[d2]].termList
# ##
# tmp_j_list = []
# for j in range(23):
# if not termlist_CL1[j].isTwoOccurConflict(termlist_CL2[j]): ##
# # tmp_j_list.append(j)
# tmp_j_list.append(term_config['term_list'][j])
# if tmp_j_list:
# sent = {}
# sent['A'] = self.getShortPath(nid=cids[d1], repoName=repoName)
# sent['B'] = self.getShortPath(nid=cids[d2], repoName=repoName)
# sent['incomterms'] = ', '.join(tmp_j_list)
# reportList.append(sent)
return reportList
'''
from pythonModuleStructreWithLicenses import demo
licenseTree = demo.get_license_tree('testPro')
lr = LicenseRepair(licenseTree=licenseTree)
lr.show_licenseTree()
The whole pipeline:
Take the project folder, build the tree, and create a License object for each license. []
On one side, check copyrights to find the fixable positions;
on the other, run the hierarchical compatibility check: from the innermost layer outward, aggregate the requirements the inner layers impose on each position, and find every incompatible position. []
For each position that is both incompatible and fixable:
input the requirement (each position has a corresponding termList and termList_filepathList)
and repair:
decide whether the requirements contradict each other;
if they do, generate an exception license;
if not, check whether the dataset holds a license satisfying the requirement:
if one exists, recommend the existing license;
if not, generate a custom license;
output the license text this position should use.
Compatibility check: take several License objects, merge their identical terms, and emit the analysis as a set of Term objects.
Outputs termList and termList_filepathList.
termList_filepathList: the files/modules/packages each Term object corresponds to.
Requirement: a list whose elements are Term objects.
Requirement contradiction: Term objects clash when their content/action is the same but their polarity or recipient differs.
'''
'''
ancestor;successor
parent;children
Some Tree functions:
.is_branch(nid) yes: the children one level down
Some Node functions:
.tag
.identifier
.predecessor()
.successors()
'''
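# A tiny runnable sketch of the treelib calls listed above (Tree is imported at the top of
# this file); the node ids and tags here are made up.
'''
t = Tree()
t.create_node(tag='root', identifier=1)
t.create_node(tag='PL', identifier=2, parent=1)
t.create_node(tag='CL', identifier=3, parent=2)
print(t.is_branch(1)) # children one level down: [2]
print(t.level(3)) # depth below the root: 2
print(t.parent(3).identifier) # 2
print([nd.identifier for nd in t.leaves()]) # [3]
for nid in t.expand_tree(mode=Tree.DEPTH, sorting=False):
    print(nid, t[nid].tag)
'''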
''' Module test case '''
def runLicenseRepair(repo, nlp, ld, ner_model_ee5, re_args, re_model, ac_model):
'''
Input: a project name, assumed to live under ./unzips/.
Output: the repair results plus some statistics over lr's attributes.
Debug information is printed to the console as appropriate.
'''
print('repo: ', repo)
# Build the license tree
# import projectLicenseTree
from projectLicenseTree import get_license_tree
print('Building the license tree...')
licenseTree, nid_filepath, hasPL, nid_textNeedTE, nid_matchedLnameList = get_license_tree(repo=repo) # nid_filepath: the file path corresponding to each leaf node.
print('hasPL: ', hasPL)
for key, val in nid_matchedLnameList.items():
print(key, val)
lr = LicenseRepair(licenseTree=licenseTree, nid_filepath=nid_filepath, hasPL=hasPL,
nid_textNeedTE=nid_textNeedTE, nid_matchedLnameList=nid_matchedLnameList)
# lr.show_licenseTree()
# Traverse and print to confirm it really is DFS order
print('Some traversal info about projectLicenseTree')
for nid in lr.licenseTree.expand_tree(mode=Tree.DEPTH, sorting=False):
if nid == 1:
continue
# (exercise a few of the functions)
ntag = lr.licenseTree[nid].tag
nidd = lr.licenseTree[nid].identifier
npath = lr.nid_filepath[nid]
nlevel = lr.licenseTree.level(nid) # the PL has level 1.
nparent = lr.licenseTree.parent(nid).identifier
nchildren = lr.licenseTree.is_branch(nid)
# print('\t'.join([str(key),val[len('D:\Python\OSSL2//unzips/'):]]))
print('\t'.join([str(nid), str(nidd), str(nlevel), npath, str(nparent), str(nchildren)]))
print('All nodes:', lr.licenseTree.nodes.keys())
print('Leaf nodes:', [nd.identifier for nd in lr.licenseTree.leaves()])
# (The project may contain no license at all, which makes the root the only leaf >> best to bail out early)
if len(lr.licenseTree.leaves())==1 and lr.licenseTree.leaves()[0].identifier==1:
return lr, lr.hasPL, 0, 0, 0, []
# Find the positions we have permission to fix
if lr.hasPL:
lr.search_fixable_places(nlp=nlp)
else:
lr.fixable_nid.append(-1)
lr.fixable_nid_pl.append(-1)
lr.fixable_nid_all.append(-1)
print('Fixable positions found:')
print('lr.fixable_nid: ', len(lr.fixable_nid), lr.fixable_nid)
# Create a License object for each license node
# Term extraction; fills self.nid_license
print('Starting term extraction; creating a License object for each node...')
lr.turn_into_licenseObjects(nlp, ld, ner_model_ee5, re_args, re_model, ac_model)
# Hierarchical compatibility check
# (find the incompatible positions and their requirements; fills self.incomNid_termLists)
print('Starting the hierarchical compatibility check...')
lr.detect_incompatibility_hierarchically()
if not lr.hasPL: # need to compute self.incomNid_termLists for 'nid=-1'
lr.get_PL_needs_from_childern()
# Find positions that are both incompatible and fixable
if lr.hasPL:
lr.get_incom_and_fixable_places()
else: # (compatible or not, a new PL must be generated in this case.)
lr.incomAndFixable_nid.append(-1)
print('Positions that are incompatible and fixable by us:')
print('lr.incomAndFixable_nid: ', len(lr.incomAndFixable_nid), lr.incomAndFixable_nid)
# Repair
print('Starting repair...')
repairMethod = []
DIR = os.path.dirname(os.path.abspath(__file__)) + '/'
fw = open(os.path.join(DIR, 'REPAIRED', repo + '.json'), 'w', encoding="utf-8")
REPAIRED_DATA = []
for nid_to_repair in lr.incomAndFixable_nid:
print('====================================================================')
print('Position to repair:', nid_to_repair)
print('Its file path:', lr.nid_filepath[nid_to_repair])
## Repair
repairMethod_i, licenseText_repaired = lr.repair_onePlace(nid=nid_to_repair, ld=ld,
nlp=nlp, ner_model_ee5=ner_model_ee5,
re_args=re_args, re_model=re_model)
repairMethod.append(repairMethod_i)
print('Repair done.')
print('Suggested new license text for this position:', licenseText_repaired)
REPAIRED_DATA.append({'nid':nid_to_repair,
'filepath':lr.nid_filepath[nid_to_repair],
'method':repairMethod_i,
'text': licenseText_repaired})
json.dump(REPAIRED_DATA, fw)
fw.close()
return lr, lr.hasPL, len(lr.fixable_nid), len(lr.incomNid_termLists), len(lr.incomAndFixable_nid), repairMethod
'''
REGEXP = [
re.compile(r'^copyright \(c\) \.$'),
re.compile(r'^copyright\s*\(c\) ((?!\.+).*?) \. (?:.*)$'),
re.compile(r'((?!\.+).*?)\.\s* copyright\s*\(c\) ((?!\.+).*?) \. (?:.*)$'),
re.compile(r'((?!\.+).*?)\.\s* copyright\s*\(c\) ((?!\.+).*?)\.(?:.*)$')
]
s = 'copyright(c) aaaaaa . .. bbbbbb'
s = 'mit license. copyright (c) gao ya . '
matched = REGEXP[2].match(s)
if matched:
name = matched.groups(0)[1]
print(name)
'''
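# A minimal sketch of the two generated-text shapes used above. read_custom_template maps
# each term content to a phrase; the template string and file path below are made-up
# examples, not the project's real templates.
'''
template = 'distribute original or modified derivative works' # e.g. for 'Distribute'
# custom-license line (repair_generate_one_custom_license):
line_custom = 'You ' + 'cannot' + ' ' + template + '.'
# exception line (repair_generate_one_exception_license):
line_exception = 'For the code in ' + 'src/vendored' + ', you ' + 'can' + ' ' + template + '; '
'''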

317
Term.py Normal file

@ -0,0 +1,317 @@
# _*_coding:utf-8_*_
import json
import logging
import os
import re
from itertools import product
from model.config import config as term_config
import utils
'''
A term = content + polarity
(content is fixed against the 23-entry list)
'''
class Term:
def __init__(self, content=None, atti=None, condInxs=None, recipient=None):
self.content = content
self.atti = atti
self.condInxs = condInxs
if self.condInxs is None:
self.condInxs = []
self.recipient = recipient
if self.recipient is None:
self.recipient = ""
def composeOneSent(self,termlist):
'''
Assemble one natural-language sentence from these deconstructed attributes.
:return:
'''
sent = ''
sent += "This license "
sent += 'claims that you '
sent += self.atti +' '
sent += self.content +' '
if self.recipient:
sent += 'for '
sent += self.recipient
if self.condInxs:
sent += ', provided that : '
for i, condInx in enumerate(self.condInxs):
sent += 'you '
sent += termlist[condInx].atti +' '
sent += termlist[condInx].content +' '
if termlist[condInx].recipient:
sent += 'for '
sent += termlist[condInx].recipient + ' '
if i<len(self.condInxs)-1:
sent += 'and '
sent += '.'
return sent
def get(self):
return self.content, self.atti, self.condInxs
def getAtti(self):
return self.atti
def set(self, content=None, atti=None):
if content:
self.content = content
if atti:
self.atti = atti
return
def setContent(self, content=None):
if content:
self.content = content
return
def setAtti(self, atti=None):
if atti:
self.atti = atti
return
def setRecipient(self, recipient=None):
if recipient:
self.recipient = recipient
return
def setCondInxs(self, condInxs=None):
if condInxs:
self.condInxs = condInxs
return
def set_all_default(self):
# self.content = content
self.atti = term_config['attiLabel_type'][0]
self.set_absentAtti()
self.condInxs = []
self.recipient = ""
return
def set_absentAtti(self):
'''
Rights default to cannot; obligations default to can.
No return value; modifies self in place.
'''
if self.atti==term_config['attiLabel_type'][0]:
termId = term_config['term_list'].index(self.content)
attiLabel = term_config['absentAtti'][termId]
absentAtti = term_config['attiLabel_type'][attiLabel]
self.atti = absentAtti
return
def isMentioned(self):
if self.atti == term_config['attiLabel_type'][0]:
return False
return True
def isconflict(self, termB):
'''
Is there an inconsistency (a conflict)?
'''
if self.content == termB.content and self.atti != termB.atti: # (this rule may need revisiting)
return True
return False
def isconflict2(self):
if self.atti == term_config['attiLabel_type'][4]:
return True
return False
def isSameContent(self, termB):
if self.content == termB.content : # (this rule may change later)
return True
return False
def isTwoOccurConflict(self, termB):
'''
Whether self and termB conflict outright (the CL-vs-CL case).
Precondition: both are 1/2/3.
'''
la1 = term_config['attiType_label'][self.atti]
la2 = term_config['attiType_label'][termB.atti]
la3 = term_config['atti_moreStrictTable'][la1 - 1][la2 - 1]
# print(la1,la2,la3)
if la3 == 4:
return True
else:
return False
def isMoreStrict(self, termB, termlistA, termlistB):
'''
Whether self is the same as or stricter than termB.
Precondition: neither is 'conflict' anymore; both are 1/2/3.
Conditions are now taken into account:
check whether the two condition lists correspond compatibly, recursively (only the atti itself matters there).
If they do: check whether the two attis themselves are compatible.
If not: check whether the two inverted attis are compatible.
v1 added a comparison of recipients.
>> v2: could skip the comparison, since it is already part of the content information.
>> v3: it must be compared after all, but attis are only comparable when action+recipient match (both absent, or both present and equal); otherwise they count as different terms and compatibility is unaffected.
'''
if not termlistA or not termlistB:
# (entered from the second level)
if termB.atti == term_config['attiLabel_type'][4]:
return False
la1 = term_config['attiType_label'][self.atti]
la2 = term_config['attiType_label'][termB.atti]
la3 = term_config['atti_moreStrictTable'][la1 - 1][la2 - 1]
# print(la1,la2,la3)
if la3 == la1:
return True
# if utils.clean_recipientWords(self.recipient) == utils.clean_recipientWords(termB.recipient):
# return True
# else:
# return False
# (same polarity: same recipient or not, i.e. same term or not, either way compatible)
else:
# return False
# (different polarity: same recipient (same term) means incompatible; different recipient (different term) is unrelated, hence compatible.)
if utils.clean_recipientWords(self.recipient) == utils.clean_recipientWords(termB.recipient):
return False
else:
return True
else:
# Main path
# Fetch both condition lists
condInxsA = self.condInxs
condInxsB = termB.condInxs
FG = True
for kj in termB.condInxs:
if kj not in self.condInxs:
FG = False
break
if not termlistA[kj].isMoreStrict(termlistB[kj], [], []):
FG = False
break
if FG:
# forward case
if termB.atti == term_config['attiLabel_type'][4]:
return False
la1 = term_config['attiType_label'][self.atti]
la2 = term_config['attiType_label'][termB.atti]
la3 = term_config['atti_moreStrictTable'][la1 - 1][la2 - 1]
if la3 == la1:
return True
# if utils.clean_recipientWords(self.recipient) == utils.clean_recipientWords(termB.recipient):
# return True
# else:
# return False
else:
# return False
if utils.clean_recipientWords(self.recipient) == utils.clean_recipientWords(termB.recipient):
return False
else:
return True
else:
# inverted case
if termB.atti == term_config['attiLabel_type'][4]:
return False
la1 = term_config['turn_oppo'][term_config['attiType_label'][self.atti]-1]
la2 = term_config['turn_oppo'][term_config['attiType_label'][termB.atti]-1]
la3 = term_config['atti_moreStrictTable'][la1 - 1][la2 - 1]
if la3 == la1:
return True
# if utils.clean_recipientWords(self.recipient) == utils.clean_recipientWords(termB.recipient):
# return True
# else:
# return False
else:
# return False
if utils.clean_recipientWords(self.recipient) == utils.clean_recipientWords(termB.recipient):
return False
else:
return True
def find_mostStrictAtti(self, termList, corr_cid):
'''
Find the strictest atti among them (self is one of them, so it needs no special handling).
(If the strictest ones conflict, atti='conflict'.)
Output: the term with the strictest atti.
'''
assert len(set([tt.content for tt in termList]))==1
mostStrictOne = Term(content=self.content)
attis = list(set([tt.atti for tt in termList])) #####
atti_cids = {} # {str:int}
moreStrictAtti = attis[0]
if len(attis)>1:
for at in attis[1:]:
la1 = term_config['attiType_label'][moreStrictAtti]
la2 = term_config['attiType_label'][at]
moreStrictAtti = term_config['attiLabel_type'][term_config['atti_moreStrictTable'][la1 - 1][la2 - 1]]
if moreStrictAtti == term_config['attiLabel_type'][4]: # a conflict has appeared; one representative file per atti is enough
# fine even for cases like 2+4>>4; just record the source cids, and the filepaths can later be written out naturally as "for the XXX folder ..."
atti_cids[term_config['attiLabel_type'][la1]] = corr_cid[[tt.atti for tt in termList].index(term_config['attiLabel_type'][la1])]
atti_cids[term_config['attiLabel_type'][la2]] = corr_cid[[tt.atti for tt in termList].index(term_config['attiLabel_type'][la2])]
break #####
else:
atti_cids[moreStrictAtti] = corr_cid[[tt.atti for tt in termList].index(moreStrictAtti)] # one representative file is enough; atti_cids only matters for conflicts
# elif len(attis)==1 and moreStrictAtti == term_config['attiLabel_type'][4]:
# atti_cids[term_config['attiLabel_type'][4]] = corr_cid
elif len(attis) == 1:
atti_cids[attis[0]] = corr_cid[0] # one representative file is enough
mostStrictOne.set(atti=moreStrictAtti)
return mostStrictOne, atti_cids
'''
term = Term()
term.set("Distribute","cannot")
print(term.get())
'''
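# A minimal sketch of the strictness lookup used by isMoreStrict and find_mostStrictAtti.
# The label coding (1=can, 2=cannot, 3=must, 4=conflict) follows the code above, but the
# table entries below are illustrative assumptions, not the project's real config values.
'''
attiType_label = {'can': 1, 'cannot': 2, 'must': 3, 'conflict': 4}
attiLabel_type = {v: k for k, v in attiType_label.items()}
# atti_moreStrictTable[la1-1][la2-1] = the stricter of the two labels, or 4 on a clash
atti_moreStrictTable = [
    [1, 2, 3], # can vs can / cannot / must (hypothetical values)
    [2, 2, 4], # cannot vs ...
    [3, 4, 3], # must vs ...
]
la1 = attiType_label['must']
la2 = attiType_label['can']
la3 = atti_moreStrictTable[la1 - 1][la2 - 1]
print(attiLabel_type[la3], la3 == la1) # 'must' True -> self is at least as strict
'''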

285
TermRelated.py Normal file

@ -0,0 +1,285 @@
# _*_coding:utf-8_*_
import json
import logging
import os
import re
from itertools import product
import shutil
import utils
from RE import re_predict
from EE5.LocateTerms import ner_predict
'''
A helper class used during license understanding; it carries no standalone meaning.
One term = the other details surrounding one term action.
Unit: the sentence containing a term action, and the term details findable within it.
'''
DIR = os.path.dirname(os.path.abspath(__file__))+'/'
class TermRelated:
def __init__(self, sentence=None, action_idxs=None, action=None, action_j=None, action_atti=None):
self.Sentence = sentence # strings
self.Action_idxs = action_idxs
self.Action = action # strings
self.action_j = action_j # int (0-22)
self.action_atti = action_atti # str
##
self.Performer = 'The licensor ' # str
self.Recipient = 'this work ' # str
self.Attitude = 'can ' # str
self.Condition = [] # list[ dict{"action":str, "performer":str, "recipient":str, "attitude":str } ]
# a condition may hold several actions, each with some details of its own... if this tr itself refers to a conditionAction, its self.Condition is simply left empty.
def predict_allEntityExtraction(self, ner_model_ee5):
'''
Input: self.Sentence.
Run the already-trained model to recognize all possible entities,
yielding words, labs, entities_chunks for all of them.
'''
EEdir = DIR+'EE5/LocateTerms/'
# (self.Sentence has already been cleaned by getOOO and getItsSequence; use it as-is)
# # Drop into EE5's test-data folder
# utils.write_BIO_file(self.Sentence.split(' '), ['O']*len(self.Sentence.split(' ')),
# os.path.join(EEdir, 'data/test', 'oneSentenceFromTR.txt'))
#
# # Run prediction
# ner_predict.main(model=ner_model_ee5)
# print(self.Sentence)
# print(self.Sentence.split(' '))
''' (Use the old, working code for now; swap in lly's version once it is ready) '''
# Drop into EE5's test-data folder
utils.write_BIO_file([self.Sentence.split(' ')], [['O']*len(self.Sentence.split(' '))],
os.path.join(EEdir, 'data/test', 'oneSentenceFromTR.txt'))
# Run prediction
ner_predict.main(model=ner_model_ee5)
# From the NER results in test-pre/, obtain this object's words, labs, entities_chunks
words, labs, entities_chunks = utils.get_entities(
os.path.join(EEdir, 'data/test-pre/', 'oneSentenceFromTR.txt'), clean=False)
assert len(words)==len(labs)
# print(len(words), len(self.Sentence.split(' ')))
assert len(words)==len(self.Sentence.split(' ')) # the action's position must survive EE unshuffled
##
for d in [
DIR + 'EE5/LocateTerms/data/test/',
DIR + 'EE5/LocateTerms/data/test-pre/',
]:
if os.path.exists(d):
try:
shutil.rmtree(d)
os.mkdir(d)
except Exception as e:
print(e, d)
continue
return words, labs, entities_chunks
def prepare_data_fromEE_toREpredict_0(self, words, labs, entities_chunks):
'''
Input: EE5's output data.
Output: RE's input data.
'''
# id2rel = utils.get_id2rel(filename=r'./rel2id-relation.json')
dataList = []
possible_CA_list = []
for i, entity_chunk in enumerate(entities_chunks):
# For each entity
## check whether a condition appears
et_type = entity_chunk[0]
if et_type=='ConditionalAction':
possible_CA_list.append(i)
### (assemble)
sp_dict = {}
sp_dict["token"] = words
sp_dict["h"] = {}
sp_dict["h"]["name"] = self.Action # 动作
sp_dict["h"]["pos"] = list(self.Action_idxs)
sp_dict["t"] = {}
sp_dict["t"]["name"] = ' '.join(words[entity_chunk[1]:entity_chunk[2]]) # 另外一个实体
sp_dict["t"]["pos"] = entity_chunk[1:3]
sp_dict["relation"] = 'UNKNOWN'
dataList.append(sp_dict)
#assert len(dataList)==len(entities_chunks)
# (If a condition (a conditional action) exists, also pair it with every other entity (except itself))
if possible_CA_list:
for cai in possible_CA_list: # (usually at most one or two)
for i, entity_chunk in enumerate(entities_chunks):
# For each entity
if i==cai:
continue
### (assemble)
sp_dict = {}
sp_dict["token"] = words
sp_dict["h"] = {}
sp_dict["h"]["name"] = ' '.join(words[entities_chunks[cai][1]:entities_chunks[cai][2]]) # 条件动作
sp_dict["h"]["pos"] = entities_chunks[cai][1:3]
sp_dict["t"] = {}
sp_dict["t"]["name"] = ' '.join(words[entity_chunk[1]:entity_chunk[2]]) # 另外一个实体
sp_dict["t"]["pos"] = entity_chunk[1:3]
sp_dict["relation"] = 'UNKNOWN'
dataList.append(sp_dict)
return dataList
def compose_one_RE_sample(self, words, etc_h, etc_t):
sp_dict = {}
sp_dict["token"] = words
sp_dict["h"] = {}
sp_dict["h"]["name"] = ' '.join(words[etc_h[1]:etc_h[2]])
sp_dict["h"]["pos"] = etc_h[1:3]
sp_dict["t"] = {}
sp_dict["t"]["name"] = ' '.join(words[etc_t[1]:etc_t[2]])
sp_dict["t"]["pos"] = etc_t[1:3]
sp_dict["relation"] = 'Other'
return sp_dict
def prepare_data_fromEE_toREpredict(self, words, labs, entities_chunks):
'''
Input: EE5's output data.
Output: RE's input data.
'''
dataList = []
# Collect all entities, by type
all_action_list = []
all_recipient_list = []
all_attitude_list = []
all_condition_list = []
for i, entity_chunk in enumerate(entities_chunks):
# For each entity
et_type = entity_chunk[0]
if et_type=='Action':
all_action_list.append(i)
elif et_type=='Recipient':
all_recipient_list.append(i)
elif et_type=='Attitude':
all_attitude_list.append(i)
elif et_type=='Condition':
all_condition_list.append(i)
# Assemble: action and recipient
for k in all_action_list:
for t in all_recipient_list:
sp_dict = self.compose_one_RE_sample(words, entities_chunks[k], entities_chunks[t])
dataList.append(sp_dict)
# Assemble: action and attitude
for k in all_action_list:
for t in all_attitude_list:
sp_dict = self.compose_one_RE_sample(words, entities_chunks[k], entities_chunks[t])
dataList.append(sp_dict)
# Assemble: action and condition
for k in all_action_list:
for t in all_condition_list:
sp_dict = self.compose_one_RE_sample(words, entities_chunks[k], entities_chunks[t])
dataList.append(sp_dict)
# Assemble: condition and action
for k in all_condition_list:
for t in all_action_list:
sp_dict = self.compose_one_RE_sample(words, entities_chunks[k], entities_chunks[t])
dataList.append(sp_dict)
return dataList
def predict_relationExtraction(self, dataList, re_args, re_model):
'''
Run the already-trained model to predict the relation class between the known action and every entity; the model outputs a class for each entity pair.
Check and filter: EE5 already guessed the entity types, but relation classification corrects them once more and drops low-probability pairings.
Effect: fills self.Performer, self.Recipient, self.Attitude, self.Condition.
'''
REdir = DIR+'RE/'
# Drop into RE's test-data folder
utils.write_RE_file(dataList, os.path.join(REdir, 'dataset/ossl2', 'test.txt'))
### Run prediction
# (those arguments are all defaults now; no need to pass them separately.)
test_pre_logits, preds = re_predict.predict_re(args=re_args, lit_model=re_model)
# print(len(preds),len(dataList))
# assert len(preds)==len(dataList)
if len(preds)!=len(dataList):
print('!!!!! len(preds)!=len(dataList) from one sent', len(preds),len(dataList))
return [], []
# For now, use preds to fill in the labels in dataList
id2rel = utils.get_id2rel(filename=os.path.join(REdir, 'dataset/ossl2', 'rel2id.json'))
dataList_final = []
for i, sp_dict in enumerate(dataList):
sp_dict["relation"] = id2rel[preds[i]]
dataList_final.append(sp_dict)
# (filtering is hardly needed anymore: entity recognition is already one layer of filtering (downstream simply scans and accepts the first hit))
return test_pre_logits, dataList_final
''' (no longer used) '''
def genCandidates_0(self, nlp):
'''
:return: all entities, undifferentiated
'''
outputFormat = 'json'
dpResult = nlp.annotate(self.Sentence, properties={'annotators': 'depparse', 'outputFormat': outputFormat, })
enhancedPlusPlusDependencies = json.loads(dpResult)["sentences"][0]["enhancedPlusPlusDependencies"]
tokens = json.loads(dpResult)["sentences"][0]["tokens"]
# print(tokens)
candidates = extract_its_hierac_nsubj(enhancedPlusPlusDependencies, self.Action_idxs[0]+1)
candidates.extend(extract_its_hierac_obj(enhancedPlusPlusDependencies, self.Action_idxs[0]+1))
candidates.extend(extrac_its_hierac_mod(enhancedPlusPlusDependencies, self.Action_idxs[0]+1))
candidates.extend(extract_its_hierac_cond(enhancedPlusPlusDependencies, self.Action_idxs[0]+1))
candidates = get_unique_lists_in_list(lis=candidates, isInt = True)
# self.printCandiResults(candidates)
return candidates
def printCandiResults(self, candidates):
print('------------------')
print(self.Sentence)
print(' '.join(self.Sentence.split(' ')[self.Action_idxs[0]:self.Action_idxs[1]]))
print(':::')
print(candidates)
for ids in candidates:
phrase = ' '.join([self.Sentence.split(' ')[id-1] for id in ids])
print(phrase)
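# A minimal sketch of the RE input format assembled by compose_one_RE_sample above; the
# tokens and spans are made up.
'''
words = ['You', 'may', 'distribute', 'this', 'work']
action_chunk = ['Action', 2, 3] # [entity type, start, end) over words
recipient_chunk = ['Recipient', 3, 5]
sp_dict = {
    'token': words,
    'h': {'name': 'distribute', 'pos': [2, 3]},
    't': {'name': 'this work', 'pos': [3, 5]},
    'relation': 'Other', # later overwritten with the RE model's prediction
}
'''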

487
projectLicenseTree.py Normal file

@ -0,0 +1,487 @@
# -*- coding:utf-8 -*-
'''
'''
import re
import os
import utils
rootDir = os.path.dirname(os.path.abspath(__file__))
#unDir = rootDir + '\\unzips\\'
#unDir = r'D:\GY\OSSL2\repos'
unDir = r'D:\GY\OSSL2\repos_simpled'
#unDir = r'D:\Python\OSSL2\evaluation_projects/repos_simpled_200/'
# unDir = os.path.join(os.path.dirname(rootDir), 'repos')
outputDir000 = rootDir + '/output/'
outputDir = rootDir + '/output/pros/'
DIR = outputDir
licenseDir = os.path.dirname(os.path.abspath(__file__))+'/data/licenses'
def checkLicenseFile(filename,dir):
licensett = ''
if re.findall(r'^license$', filename, flags=re.IGNORECASE) or re.findall(r'^license\.[a-zA-Z]+', filename,
flags=re.IGNORECASE) \
or re.findall(r'^copying$', filename, flags=re.IGNORECASE) or re.findall(r'^copying\.[a-zA-Z]+', filename,
flags=re.IGNORECASE):
with open(os.path.join(dir,filename), 'r', encoding="utf-8") as fr:
for line in fr.readlines():
if line:
licensett += line.strip() + '. '
return licensett
return licensett
REGEXP = [
re.compile(r'^import (.+)$'),
re.compile(r'^from ((?!\.+).*?) import (?:.*)$')
]
def checkPackageImport2(filepath):
try:
imports = []
with open(filepath, 'r', encoding="utf-8") as fr:
for line in fr.readlines():
if "import " in line:
if "from" in line:
match = REGEXP[1].match(line.strip())
if match:
name = match.groups(0)[0]
for im in name.partition(' as ')[0].partition(','):
nm = im.strip().partition('.')[0].strip()
if len(nm) > 1:
imports.append(nm)
else:
match = REGEXP[0].match(line.strip())
if match:
name = match.groups(0)[0]
for im in name.partition(' as ')[0].partition(','):
nm = im.strip().partition('.')[0].strip()
if len(nm) > 1:
imports.append(nm)
return list(set(imports))
except Exception:
print(filepath)
return []
def checkLicenseInline(filepath):
'''
Installing ninka requires separate handling on Linux.
Use its Comment extractor, Split sentences, and Filter good sentences
to obtain the inline part.
'''
# return "" # read ninka's processed result file
'''
For quick testing, the substitute function below can be used for now.
'''
try:
targetText = ""
with open(filepath, 'r', encoding="utf-8") as fr:
fg = False
for line in fr.readlines():
if line.strip().startswith("#"):
targetText += line.strip()[1:].strip() + '. '
elif line.strip().startswith("\'\'\'") or line.strip().startswith("\"\"\""):
if not fg:
# start ...
if line.strip().endswith("\'\'\'", 3, len(line.strip())) or line.strip().endswith("\"\"\"", 3,
len(
line.strip())):
targetText += line.strip()[3:-3].strip() + '. '
else:
targetText += line.strip()[3:].strip() + '. '
fg = True
else:
fg = False
elif line.strip():
if fg:
targetText += line.strip() + '. '
else:
break
fr.close()
if re.findall('license', targetText, flags=re.IGNORECASE):
# print(filepath+str(len(targetText)))
return targetText
else:
return ""
except Exception:
print(filepath)
return ""
from treelib import Tree, Node
tree = Tree()
nid_filepath = {}
nid_textNeedTE = {}
nid_matchedLnameList = {}
license_check, _ = utils.get_licenseNameList1(os.path.dirname(os.path.abspath(__file__))+'/data/filter-exclude-list.txt')
licenseNameList = utils.get_licenseNameList2(licenseDir)
licenseTextDict = utils.get_licenseTextDict2(licenseDir)
def add_node(parent, ziji, ziji_content, checked=True):
'''
:param parent:
:param ziji:
:param ziji_content:
:param checked:
:return:
'''
'''
if checked:
for cnd in tree.children(parent):
if cnd.tag == ziji_content:
rmv_id() ## added "to save cost", but it causes bugs, and the parent/child relations and filepath look confused
# >>> leave it out for now,
# even though that does make the total node count and the incompatible-node count quite a bit larger,,,
return cnd.identifier
'''
tree.create_node(parent=parent, identifier=ziji, tag=ziji_content)
return ziji
def update_tag(nid, tag):
# tree.update_node(nid=nid, attrs={'tag':tag}) ## (this function did not seem to take effect)
tree[nid].tag = tag
print("更新PL")
print(nid, tag)
print("现在的PL为")
print(tree[nid].tag)
return
IDsave = 0
def gen_id():
global IDsave
IDsave += 1
return IDsave
def rmv_id():
global IDsave
IDsave -= 1
return IDsave
def checkPro(dir, parent, fg):
'''
:param dir:
:param parent:
:return:
'''
'''
The path where the target project is stored.
'''
repoDir = os.path.join(unDir,dir)
dir_prt = parent
pac_prt = parent
print(repoDir) ### confirms the traversal really does cover everything inside
# Plain files are inspected before .py files; the order affects the result.
FileList = []
for dd in os.listdir(repoDir):
dd_path = os.path.join(repoDir, dd)
if os.path.isfile(dd_path) and not dd_path.endswith(".py"):
FileList.append(dd)
for dd in os.listdir(repoDir):
dd_path = os.path.join(repoDir, dd)
if os.path.isfile(dd_path) and dd_path.endswith(".py"):
FileList.append(dd)
#####
for dd in FileList:
dd_path = os.path.join(repoDir, dd)
print(dd_path) ### confirms the traversal really does cover everything inside
text = ''
# if not dd_path.endswith(".py") and utils.checkLicenseFileName(dd):
if utils.checkLicenseFileName(dd):
text = utils.read_text(dd_path)
if text and utils.check_text_for_licenseWords(text, license_check, licenseNameList):
'''
matchedLnameList0 = utils.match_availableText_for_possible_refLicenseTexts(text, licenseTextDict)
refText, matchedLnameList1 = utils.add_possible_refLicenseTexts(licenseNameList, text, './data/licenses')
text += refText
'''
matchedLnameList0 = utils.match_availableText_for_possible_refLicenseTexts(text, licenseTextDict)
refText, matchedLnameList1 = utils.add_possible_refLicenseTexts(licenseNameList, text, licenseDir)
textNeedTE = True
if matchedLnameList0:
textNeedTE = False
if parent == 1 and fg != -1:
# If the PL spans several files, treat them as mutually complementary: merge into one text, one node
update_tag(nid=fg, tag=tree[fg].tag + text) # setup.py and __pkginfo__.py may also end up here
'''
'''
if nid_textNeedTE[fg] or textNeedTE:
nid_textNeedTE[fg] = True
else:
nid_textNeedTE[fg] = False
# if not nid_textNeedTE[fg] or not textNeedTE:
# nid_textNeedTE[fg] = False
else:
file_id = gen_id()
dir_prt = add_node(parent, file_id, text)
nid_filepath[file_id] = repoDir ###
nid_matchedLnameList[file_id] = matchedLnameList0 + matchedLnameList1
nid_textNeedTE[file_id] = textNeedTE
pac_prt = dir_prt
print('pac_prt=',pac_prt)
if parent == 1:
fg = file_id
if dd_path.endswith(".py"):
pac_prt_py = int(pac_prt) # (copy the value to avoid reference aliasing)
text = utils.extract_comments_in_pyFile(dd_path)
if text and utils.check_text_for_licenseWords(text, license_check, licenseNameList):
matchedLnameList0 = utils.match_availableText_for_possible_refLicenseTexts(text, licenseTextDict)
refText, matchedLnameList1 = utils.add_possible_refLicenseTexts(licenseNameList, text, licenseDir)
textNeedTE = True
if matchedLnameList0:
textNeedTE = False
if (dd=='setup.py' or dd=='__pkginfo__.py') and parent == 1 and fg != -1:
# setup.py may also be merged in; it usually only concerns the PL
update_tag(nid=fg, tag=tree[fg].tag + text)
'''
if nid_textNeedTE[fg] or textNeedTE:
nid_textNeedTE[fg] = True
else:
nid_textNeedTE[fg] = False
'''
if not nid_textNeedTE[fg] or not textNeedTE:
nid_textNeedTE[fg] = False
else:
inline_id = gen_id()
pac_prt_py = add_node(pac_prt, inline_id, text)
nid_filepath[inline_id] = os.path.join(repoDir, dd) ###
nid_matchedLnameList[inline_id] = matchedLnameList0 + matchedLnameList1
nid_textNeedTE[inline_id] = textNeedTE
packages = checkPackageImport2(dd_path)
for aa in packages:
if aa in library_license.keys():
ll = library_license[aa] #
print(' ', aa, ':::::', ll)
# (find the text corresponding to ll)
refText, matchedLnameList1 = utils.add_possible_refLicenseTexts(licenseNameList, ll, licenseDir)
text = ''
#if text: # only count those findable in SPDX
if matchedLnameList1:
ll_id = gen_id()
add_node(pac_prt_py, ll_id, text)
nid_filepath[ll_id] = os.path.join(repoDir, dd) + ':' + aa ###
nid_matchedLnameList[ll_id] = [] + matchedLnameList1
nid_textNeedTE[ll_id] = False
for dd in os.listdir(repoDir):
dd_path = os.path.join(repoDir,dd)
if os.path.isdir(dd_path):
# print(dd_path)
'''
Recurse.
'''
checkPro(dd_path, dir_prt, fg)
return
def check_PL(repo):
repoDir = os.path.join(unDir, repo)
repoDir = os.path.join(repoDir, os.listdir(repoDir)[0])
'''
For folders downloaded from GitHub, the real files only start at the second level.
'''
for file in os.listdir(repoDir):
itsCompletePath = os.path.join(repoDir, file)
print('check_PL:', itsCompletePath)
if os.path.isfile(itsCompletePath):
text = ''
if utils.checkLicenseFileName(file):
text = utils.read_text(itsCompletePath)
if text:
'''
return True
'''
if utils.check_text_for_licenseWords(text, license_check, licenseNameList):
return True
nid_filepath[-1] = repoDir
nid_matchedLnameList[-1] = []
nid_textNeedTE[-1] = False
return False
def get_licenses():
for repo in os.listdir(unDir):
add_node(tree.root, gen_id(), 'root', checked=False)
checkPro(repo, 1, -1) # checkPro takes (dir, parent, fg); -1 matches the call in get_license_tree
tree.show()
'''
Not fully identical to the file structure: a module without an explicit license implicitly follows its nearest ancestor, so that edge is omitted.
'''
return tree
'''
This is the call entry point, used from the LicenseRepair class.
'''
def get_license_tree(repo):
init()
'''
'''
global tree
tree = Tree()
global nid_filepath
nid_filepath = {}
global nid_textNeedTE
nid_textNeedTE = {}
global nid_matchedLnameList
nid_matchedLnameList = {}
global IDsave
IDsave = 0
#print(license_check)
#print(licenseNameList)
add_node(tree.root, gen_id(), 'root', checked=False)
checkPro(repo, 1, -1)
hasPL = check_PL(repo)
return tree, nid_filepath, hasPL, nid_textNeedTE, nid_matchedLnameList
def cleanIt(text):
text = re.sub('!/usr/bin/env python', ' ', text)
text = re.sub('! /usr/bin/env python', ' ', text)
text = re.sub('!/usr/bin/python', ' ', text)
text = re.sub('! /usr/bin/python', ' ', text)
text = re.sub('-\*- coding: utf-8 -\*-', ' ', text)
text = re.sub('-\*-coding:utf-8-\*-', ' ', text)
text = re.sub('coding utf-8', ' ', text)
text = re.sub('=+', ' ', text)
text = re.sub('-+', ' ', text)
text = re.sub('#+', ' ', text)
text = re.sub('\*+', ' ', text)
text = re.sub('~+', ' ', text)
text = re.sub(' +', ' ', text)
legalCharSet = [
'(', ')', '[', ']', ':', ';', '-', '"', ',', '.', ' '
]
ww = ""
for c in text.lower():
if (c >= 'a' and c <= 'z') or c in legalCharSet:
ww += c
ww = re.sub(' +', ' ', ww)
return ww
def cleanInlineLicenses():
numm = 0
for pro in os.listdir(DIR):
# every pro
for file in os.listdir(os.path.join(DIR, pro)):
if file.startswith("inline-license-"):
# every text
text = ""
with open(os.path.join(DIR, pro, file), 'r', encoding='utf-8') as fr:
for line in fr.readlines():
if line.strip():
text += line.strip() + ' '
fr.close()
# clean the inline text ...
# denoise as much as possible
text1 = cleanIt(text)
with open(os.path.join(DIR, pro, file.replace("inline-", "inline2-")), 'w', encoding='utf-8') as fw:
fw.write(text1)
fw.close()
numm += 1
print(str(numm) + '/' + str(len(os.listdir(DIR))))
library_license = {}
def init():
with open(outputDir000 + "library_license.txt", 'r', encoding='utf-8')as fr:
for line in fr.readlines():
if line.strip():
line = line.strip()
library_license[line.split(" ::::: ")[0]] = line.split(" ::::: ")[1]
fr.close()
#print(library_license)
#print("library_license: " + str(len(library_license)))
if __name__ == '__main__':
'''
'''
init()
#######
get_licenses()
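# A minimal demo of the import-extraction regexes (REGEXP) defined near the top of this
# file; the sample lines are made up.
'''
samples = ['import numpy as np', 'from treelib import Tree', 'x = 1']
found = []
for line in samples:
    m = REGEXP[1].match(line) if 'from' in line else REGEXP[0].match(line)
    if m:
        found.append(m.groups(0)[0].partition(' as ')[0].partition('.')[0].strip())
print(found) # ['numpy', 'treelib']
'''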

1480
utils.py Normal file

File diff suppressed because it is too large