I'm trying to extract text from various file formats for a search engine while avoiding any external packages. I have ten thousand files to extract text from, and a lot of them are WPS files (Microsoft Works - you know, that free office suite that comes preinstalled on many Windows boxes).
I was opening the files and using regular expressions and some text sanitizing to try to get decent text from the file. Unfortunately, the text is split into blocks, so I got some part-words as well as font names and other junk words from the metadata of the files. I had used libwps in the past but didn't want the dependency in my code. Most Windows-based document formats from that era use some sort of OLE-stream content that is often difficult to get your head around when you're staring at the bytes in a hex editor. After a little reading of the libwps code, a few calculations and jumps in Hex Workshop, and a few guesses, I managed to work out some code to pull text from this format. It's a work in progress and it needs some extensive testing (and it'll get it when I'm using it!), but it looks good so far:
import re
import struct
# Signature searched for in the raw file contents. "CHNKWKS" marks a file the
# reader can handle; "CHNKINK" is rejected by the reader as a pre-version-8
# file. The file is read in binary mode, so this is a bytes pattern (identical
# to the text form on Python 2, required on Python 3).
WPSMAGICPATTERN = re.compile(br"(CHNKWKS|CHNKINK)")
# Matches each carriage return in the decoded text; the reader replaces every
# match with "\r\n".
WPSSTRIPPATTERN = re.compile(r"\r")
class WPSReader(object):
    """Extract the text of a Microsoft Works (WPS) document.

    The reader finds the ``CHNKWKS`` signature in the file, walks the header
    entry tables that follow it until an entry named ``TEXT`` is found, and
    decodes the bytes that entry points at as UTF-16.

    NOTE(review): ``ReaderError`` is raised by this class but is not defined
    in the visible code; it is assumed to be provided elsewhere in the module.
    """

    # Largest run of text bytes read from a single location.
    TEXT_BLOCK = 0x0E00
    # Tag expected at the start of every header entry table.
    _ENTRY_TABLE_MAGIC = 0x01F8
    # One entry: 2-byte entry length (skipped here), 4-byte name, 10 skipped
    # bytes, then the data offset and data size as little-endian DWORDs.
    _ENTRY_FORMAT = "<2x4s10xII"
    _ENTRY_MIN_SIZE = struct.calcsize(_ENTRY_FORMAT)

    def __init__(self, file_name):
        """Remember the path of the file to read; nothing is opened yet.

        Args:
            file_name: Path of the WPS file to extract text from.
        """
        self.file_name = file_name
        self.magic_pattern = WPSMAGICPATTERN
        self.strip_pattern = WPSSTRIPPATTERN

    def _process_entries(self, entry_buff):
        """Scan one header entry table for the ``TEXT`` entry.

        Args:
            entry_buff: Bytes starting at the first byte of an entry table.

        Returns:
            A 4-tuple ``(entries, next_offset, text_offset, text_size)``.
            ``entries`` is the number of entries the table declares. When the
            ``TEXT`` entry is found, ``next_offset`` is 0 and the last two
            items are the offset and size stored in that entry; otherwise the
            last two items are 0 and ``next_offset`` is the value stored in
            the table header.

        Raises:
            ReaderError: If the table tag is wrong or an entry declares a
                length too small to hold the fixed fields.
            struct.error: If ``entry_buff`` ends before a field can be read.
        """
        magic, local, next_offset = struct.unpack_from("<HHI", entry_buff, 0)
        if magic != self._ENTRY_TABLE_MAGIC:
            raise ReaderError("Invalid format - Entry magic tag incorrect")
        entry_pos = 0x08  # table header: 2 WORDS & 1 DWORD
        for _ in range(local):
            # Keep the entry length separate from the data size stored inside
            # the entry: the length is what moves us to the next entry.
            entry_size = struct.unpack_from("<H", entry_buff, entry_pos)[0]
            if entry_size < self._ENTRY_MIN_SIZE:
                raise ReaderError("Invalid format - Entry size incorrect")
            # unpack_from reads only the fixed fields, so entries longer than
            # the fixed layout are accepted too.
            name, offset, size = struct.unpack_from(
                self._ENTRY_FORMAT, entry_buff, entry_pos)
            if name == b"TEXT":  # Success!
                return (local, 0x00, offset, size)
            entry_pos += entry_size
        return (local, next_offset, 0x00, 0x00)  # Needs to be run again

    def extract_text(self):
        """Read the file and return its text as a unicode string.

        Every carriage return in the decoded text is replaced by ``"\\r\\n"``.

        Returns:
            The decoded document text.

        Raises:
            ReaderError: If the signature is missing, the file is flagged as
                older than version 8, or no ``TEXT`` entry can be found.
            struct.error: If the file ends before a header field can be read.
        """
        with open(self.file_name, "rb") as fd:
            buff = fd.read()

        matches = self.magic_pattern.search(buff)
        if not matches:
            raise ReaderError("No 'Magic' block: not a valid WPS file")
        if matches.group(1) == b"CHNKINK":
            raise ReaderError("Unable to convert a WPS file prior to version 8")

        headers_start = matches.start()
        entries_pos = headers_start + 24  # first entry table follows the header
        # Total entry count is a WORD stored 12 bytes into the header.
        total_entries = struct.unpack_from("<12xH", buff, headers_start)[0]

        visited = set()  # table positions already scanned
        while True:
            if entries_pos in visited:
                # A chain that points back at itself would otherwise loop forever.
                raise ReaderError("Unable to find TEXT secion. File corrupt?")
            visited.add(entries_pos)
            entries, next_offset, text_header_offset, text_size = \
                self._process_entries(buff[entries_pos:])
            if text_size:  # TEXT found
                break
            total_entries -= entries
            if total_entries and next_offset:
                # NOTE(review): the next table is looked up TEXT_BLOCK bytes
                # past the stored offset, as the original code did - confirm
                # against the file format.
                entries_pos = next_offset + self.TEXT_BLOCK
            else:
                raise ReaderError("Unable to find TEXT secion. File corrupt?")

        # The text is gathered from up to three places, at most TEXT_BLOCK
        # bytes from each of the first two.
        text_offset = text_header_offset + headers_start  # start of text
        block_size = min(self.TEXT_BLOCK, text_size)
        chunks = [buff[text_offset:text_offset + block_size]]
        text_size -= block_size
        if text_size:
            block_size = min(self.TEXT_BLOCK, text_size)
            text_offset = 0x800  # Seems to always be the location of second block
            chunks.append(buff[text_offset:text_offset + block_size])
            text_size -= block_size
        if text_size:
            # Whatever is left is read from TEXT_BLOCK bytes past the first block.
            text_offset = text_header_offset + headers_start + self.TEXT_BLOCK
            chunks.append(buff[text_offset:text_offset + text_size])

        decoded = b"".join(chunks).decode("UTF16")
        return self.strip_pattern.sub("\r\n", decoded)
import sys

# Command-line use: print the text of the WPS file named by the first argument.
# Guarded so that importing this module does not read sys.argv or print.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit("usage: %s FILE" % sys.argv[0])
    print(WPSReader(sys.argv[1]).extract_text())
import struct
# Signature searched for in the raw file contents. "CHNKWKS" marks a file the
# reader can handle; "CHNKINK" is rejected by the reader as a pre-version-8
# file. The file is read in binary mode, so this is a bytes pattern (identical
# to the text form on Python 2, required on Python 3).
WPSMAGICPATTERN = re.compile(br"(CHNKWKS|CHNKINK)")
# Matches each carriage return in the decoded text; the reader replaces every
# match with "\r\n".
WPSSTRIPPATTERN = re.compile(r"\r")
class WPSReader(object):
    """Extract the text of a Microsoft Works (WPS) document.

    The reader finds the ``CHNKWKS`` signature in the file, walks the header
    entry tables that follow it until an entry named ``TEXT`` is found, and
    decodes the bytes that entry points at as UTF-16.

    NOTE(review): ``ReaderError`` is raised by this class but is not defined
    in the visible code; it is assumed to be provided elsewhere in the module.
    """

    # Largest run of text bytes read from a single location.
    TEXT_BLOCK = 0x0E00
    # Tag expected at the start of every header entry table.
    _ENTRY_TABLE_MAGIC = 0x01F8
    # One entry: 2-byte entry length (skipped here), 4-byte name, 10 skipped
    # bytes, then the data offset and data size as little-endian DWORDs.
    _ENTRY_FORMAT = "<2x4s10xII"
    _ENTRY_MIN_SIZE = struct.calcsize(_ENTRY_FORMAT)

    def __init__(self, file_name):
        """Remember the path of the file to read; nothing is opened yet.

        Args:
            file_name: Path of the WPS file to extract text from.
        """
        self.file_name = file_name
        self.magic_pattern = WPSMAGICPATTERN
        self.strip_pattern = WPSSTRIPPATTERN

    def _process_entries(self, entry_buff):
        """Scan one header entry table for the ``TEXT`` entry.

        Args:
            entry_buff: Bytes starting at the first byte of an entry table.

        Returns:
            A 4-tuple ``(entries, next_offset, text_offset, text_size)``.
            ``entries`` is the number of entries the table declares. When the
            ``TEXT`` entry is found, ``next_offset`` is 0 and the last two
            items are the offset and size stored in that entry; otherwise the
            last two items are 0 and ``next_offset`` is the value stored in
            the table header.

        Raises:
            ReaderError: If the table tag is wrong or an entry declares a
                length too small to hold the fixed fields.
            struct.error: If ``entry_buff`` ends before a field can be read.
        """
        magic, local, next_offset = struct.unpack_from("<HHI", entry_buff, 0)
        if magic != self._ENTRY_TABLE_MAGIC:
            raise ReaderError("Invalid format - Entry magic tag incorrect")
        entry_pos = 0x08  # table header: 2 WORDS & 1 DWORD
        for _ in range(local):
            # Keep the entry length separate from the data size stored inside
            # the entry: the length is what moves us to the next entry.
            entry_size = struct.unpack_from("<H", entry_buff, entry_pos)[0]
            if entry_size < self._ENTRY_MIN_SIZE:
                raise ReaderError("Invalid format - Entry size incorrect")
            # unpack_from reads only the fixed fields, so entries longer than
            # the fixed layout are accepted too.
            name, offset, size = struct.unpack_from(
                self._ENTRY_FORMAT, entry_buff, entry_pos)
            if name == b"TEXT":  # Success!
                return (local, 0x00, offset, size)
            entry_pos += entry_size
        return (local, next_offset, 0x00, 0x00)  # Needs to be run again

    def extract_text(self):
        """Read the file and return its text as a unicode string.

        Every carriage return in the decoded text is replaced by ``"\\r\\n"``.

        Returns:
            The decoded document text.

        Raises:
            ReaderError: If the signature is missing, the file is flagged as
                older than version 8, or no ``TEXT`` entry can be found.
            struct.error: If the file ends before a header field can be read.
        """
        with open(self.file_name, "rb") as fd:
            buff = fd.read()

        matches = self.magic_pattern.search(buff)
        if not matches:
            raise ReaderError("No 'Magic' block: not a valid WPS file")
        if matches.group(1) == b"CHNKINK":
            raise ReaderError("Unable to convert a WPS file prior to version 8")

        headers_start = matches.start()
        entries_pos = headers_start + 24  # first entry table follows the header
        # Total entry count is a WORD stored 12 bytes into the header.
        total_entries = struct.unpack_from("<12xH", buff, headers_start)[0]

        visited = set()  # table positions already scanned
        while True:
            if entries_pos in visited:
                # A chain that points back at itself would otherwise loop forever.
                raise ReaderError("Unable to find TEXT secion. File corrupt?")
            visited.add(entries_pos)
            entries, next_offset, text_header_offset, text_size = \
                self._process_entries(buff[entries_pos:])
            if text_size:  # TEXT found
                break
            total_entries -= entries
            if total_entries and next_offset:
                # NOTE(review): the next table is looked up TEXT_BLOCK bytes
                # past the stored offset, as the original code did - confirm
                # against the file format.
                entries_pos = next_offset + self.TEXT_BLOCK
            else:
                raise ReaderError("Unable to find TEXT secion. File corrupt?")

        # The text is gathered from up to three places, at most TEXT_BLOCK
        # bytes from each of the first two.
        text_offset = text_header_offset + headers_start  # start of text
        block_size = min(self.TEXT_BLOCK, text_size)
        chunks = [buff[text_offset:text_offset + block_size]]
        text_size -= block_size
        if text_size:
            block_size = min(self.TEXT_BLOCK, text_size)
            text_offset = 0x800  # Seems to always be the location of second block
            chunks.append(buff[text_offset:text_offset + block_size])
            text_size -= block_size
        if text_size:
            # Whatever is left is read from TEXT_BLOCK bytes past the first block.
            text_offset = text_header_offset + headers_start + self.TEXT_BLOCK
            chunks.append(buff[text_offset:text_offset + text_size])

        decoded = b"".join(chunks).decode("UTF16")
        return self.strip_pattern.sub("\r\n", decoded)
import sys

# Command-line use: print the text of the WPS file named by the first argument.
# Guarded so that importing this module does not read sys.argv or print.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit("usage: %s FILE" % sys.argv[0])
    print(WPSReader(sys.argv[1]).extract_text())
The greatest number of files are the old-style Word 95-2003 (doc) files. Now I need to try to do the same with those!
No comments:
Post a Comment