时间:2020-12-22 python教程 查看: 892
本文实例讲述了python实现的批量分析xml标签中各个类别个数功能。分享给大家供大家参考,具体如下:
文章目录
需要个脚本分析下各个目标的数目 顺带练习下多进程,自用,直接上代码:
# -*- coding: utf-8 -*-
# @Time : 2019/06/10 18:56
# @Author : TuanZhangSama
import os
import xml.etree.ElementTree as ET
from multiprocessing import Pool,freeze_support,cpu_count
import imghdr
import logging
def get_all_xml_path(xml_dir:str,filter=['.xml']):
#遍历文件夹下所有xml
result=[]
#maindir是当前搜索的目录 subdir是当前目录下的文件夹名 file是目录下文件名
for maindir,subdir,file_name_list in os.walk(xml_dir):
for filename in file_name_list:
ext=os.path.splitext(filename)[1]#返回扩展名
if ext in filter:
result.append(os.path.join(maindir,filename))
return result
def analysis_xml(xml_path:str):
tree=ET.parse(xml_path)
root=tree.getroot()
result_dict={}
for obj in root.findall('object'):
obj_name = obj.find('name').text
obj_num=result_dict.get(obj_name,0)+1
result_dict[obj_name]=obj_num
if imghdr.what(xml_path.replace('.xml','.jpg')) != 'jpeg':
print(xml_path.replace('.xml','.jpg'),'is worng')
# logging.info(xml_path.replace('.xml','.jpg'))
if is_valid_jpg(xml_path.replace('.xml','.jpg')):
pass
return result_dict
def analysis_xmls_batch(xmls_path_list:list):
result_list=[]
for i in xmls_path_list:
result_list.append(analysis_xml(i))
return result_list
def collect_result(result_list:list):
all_result_dict={}
for result_dict in result_list:
for key,values in result_dict.items():
obj_num=all_result_dict.get(key,0)+values
all_result_dict[key]=obj_num
return all_result_dict
def main(xml_dir:str,result_save_path:str =None):
r'''根据xml文件统计所有样本的数目.对于文件不完整的图片和有xml但无图片的样本,直接进行删除.默认跑满所有的cpu核心
Parameters
----------
xml_dir : str
xml所在的文件夹.用的递归形式,因此只需保证xml在此目录的子目录下即可.对应的图片和其xml要在同一目录
result_save_path : str
分析结果的日志保存路径.默认 None 无日志
'''
if result_save_path is not None:
assert isinstance(result_save_path,str),'{} is illegal path'.format(result_save_path)
else:
logging.basicConfig(filename=result_save_path,filemode='w',level=logging.INFO)
freeze_support()#windows 上用
xmls_path=get_all_xml_path(xml_dir)
worker_num=cpu_count()
print('your CPU num is',cpu_count())
length=float(len(xmls_path))/float(worker_num)
#计算下标,尽可能均匀地划分输入文件的列表
indices=[int(round(i*length)) for i in range(worker_num+1)]
#生成每个进程要处理的子文件列表
sublists=[xmls_path[indices[i]:indices[i+1]] for i in range(worker_num)]
pool=Pool(processes=worker_num)
all_process_result_list=[]
for i in range(worker_num):
all_process_result_list.append(pool.apply_async(analysis_xmls_batch,args=(sublists[i],)))
pool.close()
pool.join()
print('analysis done!')
_temp_list=[]
for i in all_process_result_list:
_temp_list=_temp_list+i.get()
result=collect_result(_temp_list)
logging.info(result)
print(result)
def is_valid_jpg(jpg_file):
"""判断JPG文件下载是否完整 """
if not os.path.exists(jpg_file):
print(jpg_file,'is not existes')
os.remove(jpg_file.replace('.jpg','.xml'))
with open(jpg_file, 'rb') as fr:
fr.seek(-2, 2)
if fr.read() == b'\xff\xd9':
return True
else:
os.remove(jpg_file)
os.remove(jpg_file.replace('.jpg','.xml'))
print(jpg_file)
logging.error(jpg_file,'is imperfect img')
return False
if __name__=='__main__':
test_dir='/home/chiebotgpuhq/Share/winshare/origin'
save_path='/home/chiebotgpuhq/MyCode/python/pytorch/mmdetection-master/result.log'
main(test_dir,save_path)
希望本文所述对大家Python程序设计有所帮助。