Module fusus.page
Single page processing.
Expand source code Browse git
"""Single page processing.
"""
import os
import json
import pprint
import cv2
import numpy as np
from IPython.display import HTML, display
from tf.core.helpers import unexpanduser
from .lib import (
DEFAULT_EXTENSION,
parseStages,
parseBands,
parseMarks,
cropBorders,
removeBorders,
showImage,
writeImage,
splitext,
getNbLink,
)
from .clean import addBox, cluster, connected, reborder
from .lines import getInkDistribution
from .layout import (
applyHRules,
getBlocks,
getStretches,
getStripes,
grayInterBlocks,
overlay,
)
MARK_HEADERS = """
band
seq
mark
kept
value
connectdegree
connectborder
top
bottom
left
right
""".strip().split()
HEADERS = tuple(
"""
page
stripe
block
line
left
top
right
bottom
confidence
letters
punc
""".strip().split()
)
DATA_TYPES = tuple(
"""
int
str
int
int
int
int
int
int
str
str
""".strip().split()
)
class Page:
def __init__(
self, engine, f, minimal=False, sizeW=1, sizeH=1, batch=False, boxed=True
):
"""All processing steps for a single page.
Parameters
----------
engine: object
The `fusus.book.Book` object
f: string
The file name of the scanned page with extension, without directory
sizeW: float, default 1
If the image is a fraction of a page, this is the fraction of the width
sizeH: float, default 1
If the image is a fraction of a page, this is the fraction of the size
minimal: boolean, optional `False`
If true, do not read image files, just initialize data structures
batch: boolean, optional `False`
Whether to run in batch mode.
In batch mode everything is geared to the final output.
Less intermediate results are computed and stored.
Less feedback happens on the console.
boxed: boolean, optional `True`
If in batch mode, produce also images that display the cleaned marks
in boxes.
"""
self.engine = engine
C = engine.C
tm = engine.tm
error = tm.error
self.file = f
(self.bare, self.ext) = splitext(f, withDot=False)
self.empty = False
self.batch = batch
self.boxed = boxed
self.stages = {}
self.blocks = {}
self.dataHeaders = dict(char=HEADERS[0:-1], word=HEADERS, line=HEADERS[0:-3])
self.dataTypes = dict(
char=DATA_TYPES[0:-1], word=DATA_TYPES, line=DATA_TYPES[0:-3]
)
if minimal:
self.stages = {}
else:
inDir = C.inDir
path = f"{inDir}/{f}"
if not batch and not os.path.exists(path):
error(f"Page file not found: {path}")
return
orig = cv2.imread(path)
(maxH, maxW) = orig.shape[0:2]
self.pageH = maxH if not sizeH or sizeH == 1 else int(round(maxH / sizeH))
self.pageW = maxW if not sizeW or sizeW == 1 else int(round(maxW / sizeW))
self.stages = {"orig": orig}
def show(self, stage=None, band=None, mark=None, **displayParams):
"""Displays processing stages of an page.
See `fusus.parameters.STAGES`.
Parameters
----------
stage: string | iterable, optional `None`
If no stage is passed, all stages are shown as thumbnails.
Otherwise, the indicated stages are shown.
If a string, it may be a comma-separated list of stage names.
Otherwise it is an iterable of stage names.
band: string | iterable, optional `None`
If no band is passed, no bands are indicated.
Otherwise, the indicated bands are shown.
If a string, it may be a comma-separated list of band names.
Otherwise it is an iterable of band names.
mark: string | iterable, optional `None`
If `None` is passed, no marks are shown.
If `""` is passed, all marks on the selected bands are shown.
Otherwise, the indicated mark boxes are shown, irrespective
of their bands:
If given as a string, it may be a comma-separated list of mark names.
Otherwise it is an iterable of mark names.
This information will be taken from the result of the `markData` stage.
display: dict, optional
A set of display parameters, such as `width`, `height`
(anything accepted by `IPython.display.Image`).
Notes
-----
The mark option works for the "boxed" stage:
All marks not specified in the mark parameter will not be shown.
But this option also works for all other image stages: the marks
will be displayed on a fresh copy of that stage.
When used for a grayscale stage, the color of the mark boxes is lost.
"""
engine = self.engine
tm = engine.tm
error = tm.error
C = engine.C
cd = engine.cd
stages = self.stages
marks = engine.marks
blocks = self.blocks
for s in parseStages(stage, set(stages), C.stageOrder, error):
stageData = stages[s]
(stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s]
white = C.whiteRGB if stageColor else C.whiteGRS
if stageType == "data":
display(HTML(f"<hr>\n<div><b>{s}</b>: <i>data:</i></div>"))
self._serial(s, stageData, stageExt)
elif stageType == "link":
path = self.stagePath(s)
if os.path.exists(path):
showPath = unexpanduser(f"{cd}{path}")
nbLink = getNbLink(showPath, stageData)
if nbLink is None:
display(HTML(f"{stageData} in {showPath}"))
else:
display(
HTML(
f"""<hr>\n<div><b>{s}</b>: {nbLink}"""
f""" (local file: {showPath})</div>"""
)
)
else:
display(
HTML(
f"<hr>\n<div><b>{s}</b>: "
f"<i>{showPath} does not exist.</i></div>"
)
)
else:
img = stageData
headingInfo = []
if band is not None or mark is not None:
img = (
stages["demarginedC"]
if s == "boxed" and mark is not None
else stageData
).copy()
for ((stripe, block), data) in blocks.items():
bands = data["bands"]
doBands = (
() if band is None else parseBands(band, set(bands), error)
)
doBandSet = set(doBands)
bandRep = f" with bands {', '.join(doBands)}" if doBands else ""
doMarks = (
set()
if mark is None
else parseMarks(mark, marks, set(doBands), error)
)
markRep = (
f" with marks {', '.join(sorted(doMarks))}"
if doMarks
else ""
)
headingInfo.append(
f"<b>{stripe}{block}</b> {bandRep}{markRep}"
)
(leftB, topB, rightB, bottomB) = data["inner"]
imBW = rightB - leftB
for band in doBands:
bandInfo = bands[band]
lines = bandInfo["lines"]
bColor = bandInfo["color"]
for (up, lo) in lines:
theUpper = topB + up
theLower = topB + lo
theLeft = leftB + 10
theRight = leftB + imBW - 10
cv2.rectangle(
img,
(theLeft, theUpper),
(theRight, theLower),
bColor,
2,
)
overlay(
img,
leftB,
theUpper,
theLeft,
theLower,
white,
bColor,
)
overlay(
img,
theRight,
theUpper,
leftB + imBW,
theLower,
white,
bColor,
)
markData = stages.get("markData", {})
markLegend = {}
for (band, bandMarks) in markData.items():
if doBands and band not in doBandSet:
continue
for ((seq, mrk), hits) in bandMarks.items():
if mrk not in doMarks:
continue
markKey = f"{'' if band == 'main' else band[0]}{seq}"
markValue = (band, mrk, len(hits))
markLegend[markKey] = markValue
for (
kept,
value,
connDegree,
connectBorder,
stripe,
block,
left,
top,
right,
bottom,
) in hits:
addBox(
C,
img,
left,
top,
right,
bottom,
kept,
band,
seq,
connDegree,
)
if markLegend:
html = []
html.append(
"<details open><summary>Mark legend</summary><table>"
)
html.append(
"<tr><th>acro</th><th>band</th>"
"<th>mark</th><th>hits</th></tr>"
)
for (k, (b, m, n)) in sorted(markLegend.items()):
html.append(
f"<tr><td>{k}</td><td>{b}</td>"
f"<td>{m}</td><td>{n}</td></tr>"
)
html.append("</table></details>")
display(HTML("".join(html)))
display(HTML(f"<hr>\n<div><b>{s}</b> {';'.join(headingInfo)}</div>"))
showImage(img, **displayParams)
def stagePath(self, stage, inter=None):
engine = self.engine
C = engine.C
bare = self.bare
ext = self.ext
(stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[stage]
dest = getattr(C, stageDir or "interDir")
inter = "" if inter is None else f"-{inter}"
trail = stage if stagePart is None else "" if not stagePart else stagePart
trail = "" if not trail else f"-{trail}"
ext = DEFAULT_EXTENSION if stageType == "image" else (stageExt or ext)
base = f"{dest}/{bare}{inter}{trail}"
return f"{base}.{ext}"
def read(self, stage=None):
"""Reads processing data for selected stages from disk
Parameters
----------
stage: string | iterable, optional `None`
If no stage is passed, all stages will be read, if corresponding files
are present.
Otherwise, the indicated stages are read.
"""
engine = self.engine
tm = engine.tm
error = tm.error
C = engine.C
stages = self.stages
for s in parseStages(stage, set(C.stages), C.stageOrder, error):
(stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s]
if stageType == "link":
# stages of type link will be written to disk upon creation
# and not stored and need not be retrieved
pass
else:
sPath = self.stagePath(s)
if not os.path.exists(sPath):
stages[s] = None
if s in {"normalized", "char", "word"}:
self.empty = True
continue
if stageType == "image":
stages[s] = cv2.imread(sPath)
elif stageType == "data":
with open(sPath) as f:
stages[s] = self._ingest(s, stageType, stageExt, f)
def write(self, stage=None, perBlock=False):
"""Writes processing stages of an page to disk.
Parameters
----------
stage: string | iterable, optional `None`
If no stage is passed, all stages are shown as thumbnails.
Otherwise, the indicated stages are shown.
If a string, it may be a comma-separated list of stage names.
Otherwise it is an iterable of stage names.
perBlock: boolean, optional `False`
If True, the stage output will be split into blocks and written
to disk separately. The stripe and block are appended
to the file name.
Returns
-------
None
The stages are written into the `inter` or `clean` subdirectory,
with the name of the stage appended to the file name.
If `clean`, the name of the stage is omitted.
"""
engine = self.engine
tm = engine.tm
error = tm.error
C = engine.C
stages = self.stages
blocks = self.blocks
for s in parseStages(stage, set(C.stages), C.stageOrder, error):
if s not in stages:
continue
stageData = stages[s]
(stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s]
if stageType == "image":
if perBlock:
for ((stripe, block), data) in blocks.items():
blockSpec = f"{stripe:02d}{block}"
(leftB, topB, rightB, bottomB) = data["inner"]
roi = stageData[topB:bottomB, leftB:rightB]
thisPath = self.stagePath(stage, inter=blockSpec)
writeImage(roi, thisPath)
else:
writeImage(stageData, self.stagePath(s))
elif stageType == "data":
with open(self.stagePath(s), "w") as f:
self._serial(s, stageData, stageExt, handle=f)
elif stageType == "link":
# stages of type link will be written to disk upon creation
# and not stored
pass
def _serial(self, stage, data, extension, handle=None):
"""serializes data in accordance with file type.
Parameters
----------
stage: string
The processing stage to which the data belongs
data: any
The data to serialize. The type of data must be compatible with
the *extension*.
extension: string
The file type according to which the data must be serialized.
handle: string, optional `None`
If `None`, output is prepared for display in a Jupter notebook,
else it is a file handle and the data is serialized
in the canonical way and written to that file.
Returns
-------
string
The serialized data.
If the extension does not match a recognized file type,
the Python `repr` of the data is returned.
Notes
-----
The following data type/extension combinations are supported:
extension | data type
--- | ---
html | html text
tsv | tuple/list of tuple/list
json | json
"""
headers = self.dataHeaders.get(stage, None)
header = "\t".join(str(head) for head in headers) if headers else None
if handle:
if stage == "markData":
source = data
data = []
data.append(MARK_HEADERS)
for (band, markInfo) in sorted(source.items()):
for ((seq, mark), entries) in sorted(markInfo.items()):
for entry in sorted(entries):
data.append((band, seq, mark, *entry))
if header:
handle.write(f"{header}\n")
handle.write(
"".join("\t".join(str(field) for field in row) + "\n" for row in data)
if extension == "tsv"
else json.dumps(data)
if extension == "json"
else data
if extension == "html"
else repr(data)
)
else:
if header:
print(header)
if stage == "markData":
self._showCleanInfo()
elif extension == "tsv":
for row in data:
print("\t".join(str(col) for col in row))
else:
pprint.pp(data)
def _ingest(self, stage, tp, extension, f):
"""ingests stage data in accordance with file type.
Parameters
----------
stage: string
The processing stage to which the data belongs
tp: string
The type of processing stage
extension: string
The file type according to which the data must be serialized.
f: string
A file handle of the file that contains the data to be ingested.
Returns
-------
data:
The data read from disk.
The data will be structured according to the type of stage and file.
Notes
-----
The following data type/extension combinations are supported:
extension | data type
--- | ---
html | html text
tsv | tuple/list of tuple/list
json | json
"""
if extension == "tsv":
dataTypes = self.dataTypes[stage]
data = []
next(f) # header line
for line in f:
fields = line.rstrip("\n").split("\t")
fields = tuple(
int(x) if dataTypes[i] == "int" else x
for (i, x) in enumerate(fields)
)
data.append(fields)
elif extension == "json":
data = json.load(f)
elif extension == "html":
pass
else:
data = f.read()
return data
def doNormalize(self):
"""Normalizes a page.
Previously needed to unskew pages.
But now we assume pages are already skewed.
Skewing turned out to be risky: when pages are filled in unusual
ways, we got unexpected and unwanted rotations.
So we don't do that anymore.
If the input page images have skew artefacts
(black sharp triangles in the corners)
as a result of previous skewing these will be removed.
Normalization produces the stages:
* *gray*: grayscale version of *orig*
* *blurred*: inverted, black-white, blurred without skew artefacts,
needed for histograms later on;
* *normalized*: *gray* without skew artefacts;
* *normalizedC*: *orig* without skew artefacts.
"""
if self.empty:
return
engine = self.engine
tm = engine.tm
info = tm.info
C = engine.C
batch = self.batch
boxed = self.boxed
stages = self.stages
orig = stages["orig"]
gray = cv2.cvtColor(orig, cv2.COLOR_BGR2GRAY)
stages["gray"] = gray
blurredGray = cv2.GaussianBlur(gray, (C.blurX, C.blurY), 0, 0)
(th, blurredGray) = cv2.threshold(
blurredGray, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU
)
crop = cropBorders(blurredGray)
normalized = gray.copy()
removeBorders(normalized, crop, C.whiteGRS)
stages["normalized"] = normalized
if not batch or boxed:
normalizedC = orig.copy()
removeBorders(normalizedC, crop, C.whiteRGB)
stages["normalizedC"] = normalizedC
blurred = cv2.GaussianBlur(normalized, (C.blurX, C.blurY), 0, 0)
(th, blurred) = cv2.threshold(
blurred, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU
)
stages["blurred"] = blurred
# detect if the image is empty
pts = cv2.findNonZero(blurred)
if pts is None:
self.empty = True
if not batch:
info("empty page")
return
self.empty = False
def doLayout(self):
"""Divide the page into stripes and the stripes into blocks.
We detect vertical strokes as block separators and horizontal strokes
as separators to split off top and bottom material.
A page may or may not be partially divided into blocks.
Where there is a vertical stroke, we define a stripe: the
horizontal band that contains the vertical stroke tightly and extends to
the full with of the page.
Between the stripes corresponding to block separators we have stripes that
are not split into blocks.
The stripes will be numbered from top to bottom, starting at 1.
If a stripe is not split, it defines a roi (region of interest) with
label `(i, '')`.
If it is split, it defines blocks with labels `(i, 'r')` and `(i, 'l')`.
Every horizontal stripe will be examined. We have to determine whether
it is a top separator or a bottom separator.
As a rule of thumb: horizontal stripes in the top stripe are top-separators,
all other horizontal stripes are bottom separators.
If there are multiple horizontal strokes in a roi, the most aggressive
one will be taken, i.e. the one that causes the most matarial to be discarded.
All further operations will take place on these blocks (and not on the
page as a whole).
The result of this stage is, besides the blocks, an image of the page
with the blocks marked and labelled.
"""
batch = self.batch
boxed = self.boxed
engine = self.engine
C = engine.C
debug = C.debug
tm = engine.tm
indent = tm.indent
info = tm.info
pageW = self.pageW
pageH = self.pageH
stages = self.stages
if not batch or boxed:
stages["layout"] = stages["normalizedC"].copy()
indent(level=3)
stretchesH = getStretches(C, info, stages, pageW, True, batch)
stretchesV = getStretches(C, info, stages, pageH, False, batch)
stripes = getStripes(stages, stretchesV)
blocks = getBlocks(C, info, stages, pageH, stripes, stretchesH, batch)
if debug:
showImage(stages["layout"])
self.blocks = blocks
applyHRules(C, stages, stretchesH, stripes, blocks, batch, boxed)
emptyBlocks = getInkDistribution(C, info, stages, pageH, blocks, batch, boxed)
if not batch:
grayInterBlocks(C, stages, blocks)
for b in emptyBlocks:
del blocks[b]
def cleaning(self, mark=None, block=None, line=None, showKept=False):
"""Remove marks from the page.
The blocks of the page are cleaned of marks.
New stages of the page are added:
* *clean* all targeted marks removed
* *cleanh* all targeted marks highlighted in light gray
* *boxed* all targeted marks boxed in light gray
* *markData* information about each detected mark.
Parameters
----------
mark: iterable of tuples (band, mark, [params]), optional `None`
If `None`, all marks that are presented in the book
directory are used.
Otherwise, a series of marks is specified together with the band
where this mark is searched in. Optionally you can also
put parameters in the tuple: the accuracy, connectBorder and connectRatio.
block: (integer, string), optional `None`
Block identifier. If specified, only this block will be cleaned.
If absent, cleans all blocks.
line: integer, optional `None`
Line number specifying the line numbers to clean.
In all specified blocks, only the line with this number will be cleaned.
If absent, cleans all lines in the specified blocks.
showKept: boolean, optional `False`
Whether to show the mark candidates that are kept.
If False, kept marks do not show up as green boxes,
and they do not contribute to the markData layer.
"""
if self.empty:
return
engine = self.engine
tm = engine.tm
indent = tm.indent
info = tm.info
error = tm.error
warning = tm.warning
C = engine.C
marks = engine.marks
batch = self.batch
boxed = self.boxed
markParams = C.markParams
boxBorder = C.boxBorder
connectBorder = C.connectBorder
threshold = C.connectThreshold
maxHits = C.maxHits
color = dict(
clean=C.cleanRGB,
cleanh=C.cleanhRGB,
)
if mark is None:
searchMarks = {subdir: markItems for (subdir, markItems) in marks.items()}
else:
searchMarks = {}
for item in mark:
(band, name) = item[0:2]
if band not in marks or name not in marks[band]:
error(f"No such mark: {band}/{mark}")
continue
params = item[2] if len(item) > 2 else {}
for (acro, v) in params.items():
if acro not in markParams:
error(f"Unknown parameter `{acro}` = `{v}`")
configuredMark = marks[band][name]
seq = configuredMark["seq"]
searchMarks.setdefault(band, {})[name] = dict(
seq=seq, gray=configuredMark["gray"]
)
for (acro, full) in markParams.items():
searchMarks[band][name][full] = params.get(
acro, configuredMark[full]
)
stages = self.stages
demargined = stages.get("demargined", stages["gray"])
if batch:
resultStages = ("clean", "boxed") if boxed else ("clean",)
if boxed:
demarginedC = stages.get("demarginedC", stages["orig"])
else:
demarginedC = stages.get("demarginedC", stages["orig"])
resultStages = ("clean", "cleanh", "boxed")
for stage in resultStages:
stages[stage] = (demarginedC if stage == "boxed" else demargined).copy()
tasks = [
(
stage,
stages[stage],
None if stage == "boxed" else color[stage],
boxBorder if stage == "boxed" else -1,
)
for stage in resultStages
]
foundHits = {}
cleanClr = color["clean"]
blocks = self.blocks
markResults = {}
for ((stripe, block), data) in blocks.items():
if block is not None and block != (stripe, block):
continue
(leftB, topB, rightB, bottomB) = data["inner"]
thisDemargined = demargined[topB:bottomB, leftB:rightB]
if not batch or boxed:
thisBoxed = stages["boxed"][topB:bottomB, leftB:rightB]
theUpper = None
theLower = None
maxH = bottomB - topB
for (band, markData) in searchMarks.items():
if "bands" not in data:
# error(f"No bands in {stripe}{block}")
continue
bandData = data["bands"][band]
lines = bandData["lines"]
for (markName, markInfo) in markData.items():
foundHits.setdefault(band, {})[markName] = 0
seq = markInfo["seq"]
mark = markInfo["gray"]
connectBorder = markInfo["connectBorder"]
accuracy = markInfo["accuracy"]
ratio = markInfo["connectRatio"]
(markH, markW) = mark.shape[:2]
nPts = 0
clusters = []
for (i, (up, lo)) in enumerate(lines):
if line is not None:
if i < line - 1:
continue
elif i > line - 1:
break
if line is not None and i == line - 1:
if theUpper is None or theUpper > up:
theUpper = up
if theLower is None or theLower < lo:
theLower = lo
roi = thisDemargined[up : lo + 1]
(roih, roiw) = roi.shape[:2]
if roih < markH or roiw < markW:
# search template exceeds roi image
continue
result = cv2.matchTemplate(roi, mark, cv2.TM_CCOEFF_NORMED)
loc = np.where(result >= accuracy)
pts = list(zip(*loc))
# if too many hits: bad template or required accuracy too low
if len(pts) > maxHits:
error(
f"mark '{band}:{markName}':"
f" too many hits: {len(pts)} > {maxHits}"
)
warning("Increase accuracy for this template")
continue
if not pts:
continue
# fuzzy matching produces several hits in the neighbourhood
# of marks. We have to reduce that to the best hit.
# We cluster the hits into clusters of neighbouring hits.
nPts += len(pts)
clusters = cluster(pts, result)
# We pick the representant hit from each cluster and
# check the ink connectedness
# Explanation in `fusus.clean`
for (pt, value) in clusters:
connDegree = connected(
markH, markW, connectBorder, threshold, roi, pt
)
pt = (pt[0] + up + topB, pt[1] + leftB)
(left, top, right, bottom) = (
pt[1],
pt[0],
pt[1] + markW,
pt[0] + markH,
)
if connDegree > ratio:
if showKept and (not batch or boxed):
im = stages["boxed"]
addBox(
C,
im,
left,
top,
right,
bottom,
True,
band,
seq,
connDegree,
)
markResults.setdefault(band, {}).setdefault(
(seq, markName), []
).append(
(
True,
value,
connDegree,
connectBorder,
stripe,
block,
left,
top,
right,
bottom,
)
)
else:
if batch and not boxed:
cv2.rectangle(
stages["clean"],
(left, top),
(right, bottom),
cleanClr,
-1,
)
else:
for (stage, im, clr, brd) in tasks:
isBoxed = stage == "boxed"
if isBoxed:
addBox(
C,
im,
left,
top,
right,
bottom,
False,
band,
seq,
connDegree,
)
markResults.setdefault(band, {}).setdefault(
(seq, markName), []
).append(
(
False,
value,
connDegree,
connectBorder,
stripe,
block,
left,
top,
right,
bottom,
)
)
else:
cv2.rectangle(
im,
(left, top),
(right, bottom),
clr,
-1,
)
if not batch or boxed:
if line is not None and theUpper is not None and theLower is not None:
grace = 20
thisTop = max(0, theUpper - grace)
thisBottom = min(maxH, theLower + grace)
info(
f"block {stripe}{block} line {line} BEFORE/AFTER cleaning\n",
tm=False,
)
roi = thisDemargined[thisTop:thisBottom]
showImage(roi)
roiBoxed = thisBoxed[thisTop:thisBottom]
showImage(roiBoxed)
stages["markData"] = markResults
if line is None:
for (band, bandMarks) in sorted(markResults.items()):
for ((seq, mark), entries) in sorted(bandMarks.items()):
indent(level=2)
kept = sum(1 for e in entries if e[0])
wiped = len(entries) - kept
warning(
f"{seq:>2} - {band:<10}: {mark:<20}"
f" wiped {wiped:>4} x, kept {kept:>4} x",
tm=False,
)
else:
self.show(stage="markData")
indent(level=1)
clean = stages["clean"]
(th, threshed) = cv2.threshold(
clean, 127, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU
)
pts = cv2.findNonZero(threshed)
if pts is None:
self.empty = True
if not batch:
info("empty page")
return
info("cleaning done")
def _showCleanInfo(self):
"""Pretty-prints the result of the cleaning stage."""
engine = self.engine
C = engine.C
grey = C.greyGRS
tm = engine.tm
info = tm.info
indent = tm.indent
stages = self.stages
markData = stages.get("markData", {})
total = 0
total = 0
for (band, markInfo) in sorted(markData.items()):
indent(level=0)
info(band, tm=False)
for ((seq, mark), entries) in sorted(markInfo.items()):
indent(level=1)
wiped = sum(1 for e in entries if not e[0])
total += wiped
notWiped = len(entries) - wiped
markImage = reborder(engine.marks[band][mark]["gray"], 2, grey)
showImage(markImage)
info(
f"{seq:>2}: {mark:<20} wiped {wiped:>4} x, kept {notWiped:>4} x",
tm=False,
)
info(
f"{'':24} kept {notWiped:>4} x",
tm=False,
)
for (
k,
value,
conn,
border,
stripe,
block,
top,
bottom,
left,
right,
) in sorted(entries):
indent(level=2)
wRep = "kept" if k else "wiped"
block = f"{stripe}{block}"
info(
f"{wRep:<5} [{block:>3}]"
f" tblr={top:>4} {bottom:>4} {left:>4} {right:>4},"
f" value={value:5.2f} conn={conn:5.3f} border={border:>2}",
tm=False,
)
def ocring(self):
"""Calls the OCR engine for a page."""
engine = self.engine
OCR = engine.OCR
OCR.read(self)
OCR.proofing(self)
def proofing(self):
"""Produces proofing images"""
engine = self.engine
OCR = engine.OCR
OCR.proofing(self)
Classes
class Page (engine, f, minimal=False, sizeW=1, sizeH=1, batch=False, boxed=True)
-
All processing steps for a single page.
Parameters
engine
:object
- The
Book
object f
:string
- The file name of the scanned page with extension, without directory
sizeW
:float
, default1
- If the image is a fraction of a page, this is the fraction of the width
sizeH
:float
, default1
- If the image is a fraction of a page, this is the fraction of the size
minimal
:boolean
, optionalFalse
- If true, do not read image files, just initialize data structures
batch
:boolean
, optionalFalse
- Whether to run in batch mode. In batch mode everything is geared to the final output. Less intermediate results are computed and stored. Less feedback happens on the console.
boxed
:boolean
, optionalTrue
- If in batch mode, produce also images that display the cleaned marks in boxes.
Expand source code Browse git
class Page: def __init__( self, engine, f, minimal=False, sizeW=1, sizeH=1, batch=False, boxed=True ): """All processing steps for a single page. Parameters ---------- engine: object The `fusus.book.Book` object f: string The file name of the scanned page with extension, without directory sizeW: float, default 1 If the image is a fraction of a page, this is the fraction of the width sizeH: float, default 1 If the image is a fraction of a page, this is the fraction of the size minimal: boolean, optional `False` If true, do not read image files, just initialize data structures batch: boolean, optional `False` Whether to run in batch mode. In batch mode everything is geared to the final output. Less intermediate results are computed and stored. Less feedback happens on the console. boxed: boolean, optional `True` If in batch mode, produce also images that display the cleaned marks in boxes. """ self.engine = engine C = engine.C tm = engine.tm error = tm.error self.file = f (self.bare, self.ext) = splitext(f, withDot=False) self.empty = False self.batch = batch self.boxed = boxed self.stages = {} self.blocks = {} self.dataHeaders = dict(char=HEADERS[0:-1], word=HEADERS, line=HEADERS[0:-3]) self.dataTypes = dict( char=DATA_TYPES[0:-1], word=DATA_TYPES, line=DATA_TYPES[0:-3] ) if minimal: self.stages = {} else: inDir = C.inDir path = f"{inDir}/{f}" if not batch and not os.path.exists(path): error(f"Page file not found: {path}") return orig = cv2.imread(path) (maxH, maxW) = orig.shape[0:2] self.pageH = maxH if not sizeH or sizeH == 1 else int(round(maxH / sizeH)) self.pageW = maxW if not sizeW or sizeW == 1 else int(round(maxW / sizeW)) self.stages = {"orig": orig} def show(self, stage=None, band=None, mark=None, **displayParams): """Displays processing stages of an page. See `fusus.parameters.STAGES`. Parameters ---------- stage: string | iterable, optional `None` If no stage is passed, all stages are shown as thumbnails. Otherwise, the indicated stages are shown. If a string, it may be a comma-separated list of stage names. Otherwise it is an iterable of stage names. band: string | iterable, optional `None` If no band is passed, no bands are indicated. Otherwise, the indicated bands are shown. If a string, it may be a comma-separated list of band names. Otherwise it is an iterable of band names. mark: string | iterable, optional `None` If `None` is passed, no marks are shown. If `""` is passed, all marks on the selected bands are shown. Otherwise, the indicated mark boxes are shown, irrespective of their bands: If given as a string, it may be a comma-separated list of mark names. Otherwise it is an iterable of mark names. This information will be taken from the result of the `markData` stage. display: dict, optional A set of display parameters, such as `width`, `height` (anything accepted by `IPython.display.Image`). Notes ----- The mark option works for the "boxed" stage: All marks not specified in the mark parameter will not be shown. But this option also works for all other image stages: the marks will be displayed on a fresh copy of that stage. When used for a grayscale stage, the color of the mark boxes is lost. """ engine = self.engine tm = engine.tm error = tm.error C = engine.C cd = engine.cd stages = self.stages marks = engine.marks blocks = self.blocks for s in parseStages(stage, set(stages), C.stageOrder, error): stageData = stages[s] (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s] white = C.whiteRGB if stageColor else C.whiteGRS if stageType == "data": display(HTML(f"<hr>\n<div><b>{s}</b>: <i>data:</i></div>")) self._serial(s, stageData, stageExt) elif stageType == "link": path = self.stagePath(s) if os.path.exists(path): showPath = unexpanduser(f"{cd}{path}") nbLink = getNbLink(showPath, stageData) if nbLink is None: display(HTML(f"{stageData} in {showPath}")) else: display( HTML( f"""<hr>\n<div><b>{s}</b>: {nbLink}""" f""" (local file: {showPath})</div>""" ) ) else: display( HTML( f"<hr>\n<div><b>{s}</b>: " f"<i>{showPath} does not exist.</i></div>" ) ) else: img = stageData headingInfo = [] if band is not None or mark is not None: img = ( stages["demarginedC"] if s == "boxed" and mark is not None else stageData ).copy() for ((stripe, block), data) in blocks.items(): bands = data["bands"] doBands = ( () if band is None else parseBands(band, set(bands), error) ) doBandSet = set(doBands) bandRep = f" with bands {', '.join(doBands)}" if doBands else "" doMarks = ( set() if mark is None else parseMarks(mark, marks, set(doBands), error) ) markRep = ( f" with marks {', '.join(sorted(doMarks))}" if doMarks else "" ) headingInfo.append( f"<b>{stripe}{block}</b> {bandRep}{markRep}" ) (leftB, topB, rightB, bottomB) = data["inner"] imBW = rightB - leftB for band in doBands: bandInfo = bands[band] lines = bandInfo["lines"] bColor = bandInfo["color"] for (up, lo) in lines: theUpper = topB + up theLower = topB + lo theLeft = leftB + 10 theRight = leftB + imBW - 10 cv2.rectangle( img, (theLeft, theUpper), (theRight, theLower), bColor, 2, ) overlay( img, leftB, theUpper, theLeft, theLower, white, bColor, ) overlay( img, theRight, theUpper, leftB + imBW, theLower, white, bColor, ) markData = stages.get("markData", {}) markLegend = {} for (band, bandMarks) in markData.items(): if doBands and band not in doBandSet: continue for ((seq, mrk), hits) in bandMarks.items(): if mrk not in doMarks: continue markKey = f"{'' if band == 'main' else band[0]}{seq}" markValue = (band, mrk, len(hits)) markLegend[markKey] = markValue for ( kept, value, connDegree, connectBorder, stripe, block, left, top, right, bottom, ) in hits: addBox( C, img, left, top, right, bottom, kept, band, seq, connDegree, ) if markLegend: html = [] html.append( "<details open><summary>Mark legend</summary><table>" ) html.append( "<tr><th>acro</th><th>band</th>" "<th>mark</th><th>hits</th></tr>" ) for (k, (b, m, n)) in sorted(markLegend.items()): html.append( f"<tr><td>{k}</td><td>{b}</td>" f"<td>{m}</td><td>{n}</td></tr>" ) html.append("</table></details>") display(HTML("".join(html))) display(HTML(f"<hr>\n<div><b>{s}</b> {';'.join(headingInfo)}</div>")) showImage(img, **displayParams) def stagePath(self, stage, inter=None): engine = self.engine C = engine.C bare = self.bare ext = self.ext (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[stage] dest = getattr(C, stageDir or "interDir") inter = "" if inter is None else f"-{inter}" trail = stage if stagePart is None else "" if not stagePart else stagePart trail = "" if not trail else f"-{trail}" ext = DEFAULT_EXTENSION if stageType == "image" else (stageExt or ext) base = f"{dest}/{bare}{inter}{trail}" return f"{base}.{ext}" def read(self, stage=None): """Reads processing data for selected stages from disk Parameters ---------- stage: string | iterable, optional `None` If no stage is passed, all stages will be read, if corresponding files are present. Otherwise, the indicated stages are read. """ engine = self.engine tm = engine.tm error = tm.error C = engine.C stages = self.stages for s in parseStages(stage, set(C.stages), C.stageOrder, error): (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s] if stageType == "link": # stages of type link will be written to disk upon creation # and not stored and need not be retrieved pass else: sPath = self.stagePath(s) if not os.path.exists(sPath): stages[s] = None if s in {"normalized", "char", "word"}: self.empty = True continue if stageType == "image": stages[s] = cv2.imread(sPath) elif stageType == "data": with open(sPath) as f: stages[s] = self._ingest(s, stageType, stageExt, f) def write(self, stage=None, perBlock=False): """Writes processing stages of an page to disk. Parameters ---------- stage: string | iterable, optional `None` If no stage is passed, all stages are shown as thumbnails. Otherwise, the indicated stages are shown. If a string, it may be a comma-separated list of stage names. Otherwise it is an iterable of stage names. perBlock: boolean, optional `False` If True, the stage output will be split into blocks and written to disk separately. The stripe and block are appended to the file name. Returns ------- None The stages are written into the `inter` or `clean` subdirectory, with the name of the stage appended to the file name. If `clean`, the name of the stage is omitted. """ engine = self.engine tm = engine.tm error = tm.error C = engine.C stages = self.stages blocks = self.blocks for s in parseStages(stage, set(C.stages), C.stageOrder, error): if s not in stages: continue stageData = stages[s] (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s] if stageType == "image": if perBlock: for ((stripe, block), data) in blocks.items(): blockSpec = f"{stripe:02d}{block}" (leftB, topB, rightB, bottomB) = data["inner"] roi = stageData[topB:bottomB, leftB:rightB] thisPath = self.stagePath(stage, inter=blockSpec) writeImage(roi, thisPath) else: writeImage(stageData, self.stagePath(s)) elif stageType == "data": with open(self.stagePath(s), "w") as f: self._serial(s, stageData, stageExt, handle=f) elif stageType == "link": # stages of type link will be written to disk upon creation # and not stored pass def _serial(self, stage, data, extension, handle=None): """serializes data in accordance with file type. Parameters ---------- stage: string The processing stage to which the data belongs data: any The data to serialize. The type of data must be compatible with the *extension*. extension: string The file type according to which the data must be serialized. handle: string, optional `None` If `None`, output is prepared for display in a Jupter notebook, else it is a file handle and the data is serialized in the canonical way and written to that file. Returns ------- string The serialized data. If the extension does not match a recognized file type, the Python `repr` of the data is returned. Notes ----- The following data type/extension combinations are supported: extension | data type --- | --- html | html text tsv | tuple/list of tuple/list json | json """ headers = self.dataHeaders.get(stage, None) header = "\t".join(str(head) for head in headers) if headers else None if handle: if stage == "markData": source = data data = [] data.append(MARK_HEADERS) for (band, markInfo) in sorted(source.items()): for ((seq, mark), entries) in sorted(markInfo.items()): for entry in sorted(entries): data.append((band, seq, mark, *entry)) if header: handle.write(f"{header}\n") handle.write( "".join("\t".join(str(field) for field in row) + "\n" for row in data) if extension == "tsv" else json.dumps(data) if extension == "json" else data if extension == "html" else repr(data) ) else: if header: print(header) if stage == "markData": self._showCleanInfo() elif extension == "tsv": for row in data: print("\t".join(str(col) for col in row)) else: pprint.pp(data) def _ingest(self, stage, tp, extension, f): """ingests stage data in accordance with file type. Parameters ---------- stage: string The processing stage to which the data belongs tp: string The type of processing stage extension: string The file type according to which the data must be serialized. f: string A file handle of the file that contains the data to be ingested. Returns ------- data: The data read from disk. The data will be structured according to the type of stage and file. Notes ----- The following data type/extension combinations are supported: extension | data type --- | --- html | html text tsv | tuple/list of tuple/list json | json """ if extension == "tsv": dataTypes = self.dataTypes[stage] data = [] next(f) # header line for line in f: fields = line.rstrip("\n").split("\t") fields = tuple( int(x) if dataTypes[i] == "int" else x for (i, x) in enumerate(fields) ) data.append(fields) elif extension == "json": data = json.load(f) elif extension == "html": pass else: data = f.read() return data def doNormalize(self): """Normalizes a page. Previously needed to unskew pages. But now we assume pages are already skewed. Skewing turned out to be risky: when pages are filled in unusual ways, we got unexpected and unwanted rotations. So we don't do that anymore. If the input page images have skew artefacts (black sharp triangles in the corners) as a result of previous skewing these will be removed. Normalization produces the stages: * *gray*: grayscale version of *orig* * *blurred*: inverted, black-white, blurred without skew artefacts, needed for histograms later on; * *normalized*: *gray* without skew artefacts; * *normalizedC*: *orig* without skew artefacts. """ if self.empty: return engine = self.engine tm = engine.tm info = tm.info C = engine.C batch = self.batch boxed = self.boxed stages = self.stages orig = stages["orig"] gray = cv2.cvtColor(orig, cv2.COLOR_BGR2GRAY) stages["gray"] = gray blurredGray = cv2.GaussianBlur(gray, (C.blurX, C.blurY), 0, 0) (th, blurredGray) = cv2.threshold( blurredGray, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU ) crop = cropBorders(blurredGray) normalized = gray.copy() removeBorders(normalized, crop, C.whiteGRS) stages["normalized"] = normalized if not batch or boxed: normalizedC = orig.copy() removeBorders(normalizedC, crop, C.whiteRGB) stages["normalizedC"] = normalizedC blurred = cv2.GaussianBlur(normalized, (C.blurX, C.blurY), 0, 0) (th, blurred) = cv2.threshold( blurred, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU ) stages["blurred"] = blurred # detect if the image is empty pts = cv2.findNonZero(blurred) if pts is None: self.empty = True if not batch: info("empty page") return self.empty = False def doLayout(self): """Divide the page into stripes and the stripes into blocks. We detect vertical strokes as block separators and horizontal strokes as separators to split off top and bottom material. A page may or may not be partially divided into blocks. Where there is a vertical stroke, we define a stripe: the horizontal band that contains the vertical stroke tightly and extends to the full with of the page. Between the stripes corresponding to block separators we have stripes that are not split into blocks. The stripes will be numbered from top to bottom, starting at 1. If a stripe is not split, it defines a roi (region of interest) with label `(i, '')`. If it is split, it defines blocks with labels `(i, 'r')` and `(i, 'l')`. Every horizontal stripe will be examined. We have to determine whether it is a top separator or a bottom separator. As a rule of thumb: horizontal stripes in the top stripe are top-separators, all other horizontal stripes are bottom separators. If there are multiple horizontal strokes in a roi, the most aggressive one will be taken, i.e. the one that causes the most matarial to be discarded. All further operations will take place on these blocks (and not on the page as a whole). The result of this stage is, besides the blocks, an image of the page with the blocks marked and labelled. """ batch = self.batch boxed = self.boxed engine = self.engine C = engine.C debug = C.debug tm = engine.tm indent = tm.indent info = tm.info pageW = self.pageW pageH = self.pageH stages = self.stages if not batch or boxed: stages["layout"] = stages["normalizedC"].copy() indent(level=3) stretchesH = getStretches(C, info, stages, pageW, True, batch) stretchesV = getStretches(C, info, stages, pageH, False, batch) stripes = getStripes(stages, stretchesV) blocks = getBlocks(C, info, stages, pageH, stripes, stretchesH, batch) if debug: showImage(stages["layout"]) self.blocks = blocks applyHRules(C, stages, stretchesH, stripes, blocks, batch, boxed) emptyBlocks = getInkDistribution(C, info, stages, pageH, blocks, batch, boxed) if not batch: grayInterBlocks(C, stages, blocks) for b in emptyBlocks: del blocks[b] def cleaning(self, mark=None, block=None, line=None, showKept=False): """Remove marks from the page. The blocks of the page are cleaned of marks. New stages of the page are added: * *clean* all targeted marks removed * *cleanh* all targeted marks highlighted in light gray * *boxed* all targeted marks boxed in light gray * *markData* information about each detected mark. Parameters ---------- mark: iterable of tuples (band, mark, [params]), optional `None` If `None`, all marks that are presented in the book directory are used. Otherwise, a series of marks is specified together with the band where this mark is searched in. Optionally you can also put parameters in the tuple: the accuracy, connectBorder and connectRatio. block: (integer, string), optional `None` Block identifier. If specified, only this block will be cleaned. If absent, cleans all blocks. line: integer, optional `None` Line number specifying the line numbers to clean. In all specified blocks, only the line with this number will be cleaned. If absent, cleans all lines in the specified blocks. showKept: boolean, optional `False` Whether to show the mark candidates that are kept. If False, kept marks do not show up as green boxes, and they do not contribute to the markData layer. """ if self.empty: return engine = self.engine tm = engine.tm indent = tm.indent info = tm.info error = tm.error warning = tm.warning C = engine.C marks = engine.marks batch = self.batch boxed = self.boxed markParams = C.markParams boxBorder = C.boxBorder connectBorder = C.connectBorder threshold = C.connectThreshold maxHits = C.maxHits color = dict( clean=C.cleanRGB, cleanh=C.cleanhRGB, ) if mark is None: searchMarks = {subdir: markItems for (subdir, markItems) in marks.items()} else: searchMarks = {} for item in mark: (band, name) = item[0:2] if band not in marks or name not in marks[band]: error(f"No such mark: {band}/{mark}") continue params = item[2] if len(item) > 2 else {} for (acro, v) in params.items(): if acro not in markParams: error(f"Unknown parameter `{acro}` = `{v}`") configuredMark = marks[band][name] seq = configuredMark["seq"] searchMarks.setdefault(band, {})[name] = dict( seq=seq, gray=configuredMark["gray"] ) for (acro, full) in markParams.items(): searchMarks[band][name][full] = params.get( acro, configuredMark[full] ) stages = self.stages demargined = stages.get("demargined", stages["gray"]) if batch: resultStages = ("clean", "boxed") if boxed else ("clean",) if boxed: demarginedC = stages.get("demarginedC", stages["orig"]) else: demarginedC = stages.get("demarginedC", stages["orig"]) resultStages = ("clean", "cleanh", "boxed") for stage in resultStages: stages[stage] = (demarginedC if stage == "boxed" else demargined).copy() tasks = [ ( stage, stages[stage], None if stage == "boxed" else color[stage], boxBorder if stage == "boxed" else -1, ) for stage in resultStages ] foundHits = {} cleanClr = color["clean"] blocks = self.blocks markResults = {} for ((stripe, block), data) in blocks.items(): if block is not None and block != (stripe, block): continue (leftB, topB, rightB, bottomB) = data["inner"] thisDemargined = demargined[topB:bottomB, leftB:rightB] if not batch or boxed: thisBoxed = stages["boxed"][topB:bottomB, leftB:rightB] theUpper = None theLower = None maxH = bottomB - topB for (band, markData) in searchMarks.items(): if "bands" not in data: # error(f"No bands in {stripe}{block}") continue bandData = data["bands"][band] lines = bandData["lines"] for (markName, markInfo) in markData.items(): foundHits.setdefault(band, {})[markName] = 0 seq = markInfo["seq"] mark = markInfo["gray"] connectBorder = markInfo["connectBorder"] accuracy = markInfo["accuracy"] ratio = markInfo["connectRatio"] (markH, markW) = mark.shape[:2] nPts = 0 clusters = [] for (i, (up, lo)) in enumerate(lines): if line is not None: if i < line - 1: continue elif i > line - 1: break if line is not None and i == line - 1: if theUpper is None or theUpper > up: theUpper = up if theLower is None or theLower < lo: theLower = lo roi = thisDemargined[up : lo + 1] (roih, roiw) = roi.shape[:2] if roih < markH or roiw < markW: # search template exceeds roi image continue result = cv2.matchTemplate(roi, mark, cv2.TM_CCOEFF_NORMED) loc = np.where(result >= accuracy) pts = list(zip(*loc)) # if too many hits: bad template or required accuracy too low if len(pts) > maxHits: error( f"mark '{band}:{markName}':" f" too many hits: {len(pts)} > {maxHits}" ) warning("Increase accuracy for this template") continue if not pts: continue # fuzzy matching produces several hits in the neighbourhood # of marks. We have to reduce that to the best hit. # We cluster the hits into clusters of neighbouring hits. nPts += len(pts) clusters = cluster(pts, result) # We pick the representant hit from each cluster and # check the ink connectedness # Explanation in `fusus.clean` for (pt, value) in clusters: connDegree = connected( markH, markW, connectBorder, threshold, roi, pt ) pt = (pt[0] + up + topB, pt[1] + leftB) (left, top, right, bottom) = ( pt[1], pt[0], pt[1] + markW, pt[0] + markH, ) if connDegree > ratio: if showKept and (not batch or boxed): im = stages["boxed"] addBox( C, im, left, top, right, bottom, True, band, seq, connDegree, ) markResults.setdefault(band, {}).setdefault( (seq, markName), [] ).append( ( True, value, connDegree, connectBorder, stripe, block, left, top, right, bottom, ) ) else: if batch and not boxed: cv2.rectangle( stages["clean"], (left, top), (right, bottom), cleanClr, -1, ) else: for (stage, im, clr, brd) in tasks: isBoxed = stage == "boxed" if isBoxed: addBox( C, im, left, top, right, bottom, False, band, seq, connDegree, ) markResults.setdefault(band, {}).setdefault( (seq, markName), [] ).append( ( False, value, connDegree, connectBorder, stripe, block, left, top, right, bottom, ) ) else: cv2.rectangle( im, (left, top), (right, bottom), clr, -1, ) if not batch or boxed: if line is not None and theUpper is not None and theLower is not None: grace = 20 thisTop = max(0, theUpper - grace) thisBottom = min(maxH, theLower + grace) info( f"block {stripe}{block} line {line} BEFORE/AFTER cleaning\n", tm=False, ) roi = thisDemargined[thisTop:thisBottom] showImage(roi) roiBoxed = thisBoxed[thisTop:thisBottom] showImage(roiBoxed) stages["markData"] = markResults if line is None: for (band, bandMarks) in sorted(markResults.items()): for ((seq, mark), entries) in sorted(bandMarks.items()): indent(level=2) kept = sum(1 for e in entries if e[0]) wiped = len(entries) - kept warning( f"{seq:>2} - {band:<10}: {mark:<20}" f" wiped {wiped:>4} x, kept {kept:>4} x", tm=False, ) else: self.show(stage="markData") indent(level=1) clean = stages["clean"] (th, threshed) = cv2.threshold( clean, 127, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU ) pts = cv2.findNonZero(threshed) if pts is None: self.empty = True if not batch: info("empty page") return info("cleaning done") def _showCleanInfo(self): """Pretty-prints the result of the cleaning stage.""" engine = self.engine C = engine.C grey = C.greyGRS tm = engine.tm info = tm.info indent = tm.indent stages = self.stages markData = stages.get("markData", {}) total = 0 total = 0 for (band, markInfo) in sorted(markData.items()): indent(level=0) info(band, tm=False) for ((seq, mark), entries) in sorted(markInfo.items()): indent(level=1) wiped = sum(1 for e in entries if not e[0]) total += wiped notWiped = len(entries) - wiped markImage = reborder(engine.marks[band][mark]["gray"], 2, grey) showImage(markImage) info( f"{seq:>2}: {mark:<20} wiped {wiped:>4} x, kept {notWiped:>4} x", tm=False, ) info( f"{'':24} kept {notWiped:>4} x", tm=False, ) for ( k, value, conn, border, stripe, block, top, bottom, left, right, ) in sorted(entries): indent(level=2) wRep = "kept" if k else "wiped" block = f"{stripe}{block}" info( f"{wRep:<5} [{block:>3}]" f" tblr={top:>4} {bottom:>4} {left:>4} {right:>4}," f" value={value:5.2f} conn={conn:5.3f} border={border:>2}", tm=False, ) def ocring(self): """Calls the OCR engine for a page.""" engine = self.engine OCR = engine.OCR OCR.read(self) OCR.proofing(self) def proofing(self): """Produces proofing images""" engine = self.engine OCR = engine.OCR OCR.proofing(self)
Methods
def cleaning(self, mark=None, block=None, line=None, showKept=False)
-
Remove marks from the page.
The blocks of the page are cleaned of marks.
New stages of the page are added:
- clean all targeted marks removed
- cleanh all targeted marks highlighted in light gray
- boxed all targeted marks boxed in light gray
- markData information about each detected mark.
Parameters
mark
:iterable
oftuples (band, mark, [params])
, optionalNone
- If
None
, all marks that are presented in the book directory are used. Otherwise, a series of marks is specified together with the band where this mark is searched in. Optionally you can also put parameters in the tuple: the accuracy, connectBorder and connectRatio. block
:(integer, string)
, optionalNone
- Block identifier. If specified, only this block will be cleaned. If absent, cleans all blocks.
line
:integer
, optionalNone
- Line number specifying the line numbers to clean. In all specified blocks, only the line with this number will be cleaned. If absent, cleans all lines in the specified blocks.
showKept
:boolean
, optionalFalse
- Whether to show the mark candidates that are kept. If False, kept marks do not show up as green boxes, and they do not contribute to the markData layer.
Expand source code Browse git
def cleaning(self, mark=None, block=None, line=None, showKept=False): """Remove marks from the page. The blocks of the page are cleaned of marks. New stages of the page are added: * *clean* all targeted marks removed * *cleanh* all targeted marks highlighted in light gray * *boxed* all targeted marks boxed in light gray * *markData* information about each detected mark. Parameters ---------- mark: iterable of tuples (band, mark, [params]), optional `None` If `None`, all marks that are presented in the book directory are used. Otherwise, a series of marks is specified together with the band where this mark is searched in. Optionally you can also put parameters in the tuple: the accuracy, connectBorder and connectRatio. block: (integer, string), optional `None` Block identifier. If specified, only this block will be cleaned. If absent, cleans all blocks. line: integer, optional `None` Line number specifying the line numbers to clean. In all specified blocks, only the line with this number will be cleaned. If absent, cleans all lines in the specified blocks. showKept: boolean, optional `False` Whether to show the mark candidates that are kept. If False, kept marks do not show up as green boxes, and they do not contribute to the markData layer. """ if self.empty: return engine = self.engine tm = engine.tm indent = tm.indent info = tm.info error = tm.error warning = tm.warning C = engine.C marks = engine.marks batch = self.batch boxed = self.boxed markParams = C.markParams boxBorder = C.boxBorder connectBorder = C.connectBorder threshold = C.connectThreshold maxHits = C.maxHits color = dict( clean=C.cleanRGB, cleanh=C.cleanhRGB, ) if mark is None: searchMarks = {subdir: markItems for (subdir, markItems) in marks.items()} else: searchMarks = {} for item in mark: (band, name) = item[0:2] if band not in marks or name not in marks[band]: error(f"No such mark: {band}/{mark}") continue params = item[2] if len(item) > 2 else {} for (acro, v) in params.items(): if acro not in markParams: error(f"Unknown parameter `{acro}` = `{v}`") configuredMark = marks[band][name] seq = configuredMark["seq"] searchMarks.setdefault(band, {})[name] = dict( seq=seq, gray=configuredMark["gray"] ) for (acro, full) in markParams.items(): searchMarks[band][name][full] = params.get( acro, configuredMark[full] ) stages = self.stages demargined = stages.get("demargined", stages["gray"]) if batch: resultStages = ("clean", "boxed") if boxed else ("clean",) if boxed: demarginedC = stages.get("demarginedC", stages["orig"]) else: demarginedC = stages.get("demarginedC", stages["orig"]) resultStages = ("clean", "cleanh", "boxed") for stage in resultStages: stages[stage] = (demarginedC if stage == "boxed" else demargined).copy() tasks = [ ( stage, stages[stage], None if stage == "boxed" else color[stage], boxBorder if stage == "boxed" else -1, ) for stage in resultStages ] foundHits = {} cleanClr = color["clean"] blocks = self.blocks markResults = {} for ((stripe, block), data) in blocks.items(): if block is not None and block != (stripe, block): continue (leftB, topB, rightB, bottomB) = data["inner"] thisDemargined = demargined[topB:bottomB, leftB:rightB] if not batch or boxed: thisBoxed = stages["boxed"][topB:bottomB, leftB:rightB] theUpper = None theLower = None maxH = bottomB - topB for (band, markData) in searchMarks.items(): if "bands" not in data: # error(f"No bands in {stripe}{block}") continue bandData = data["bands"][band] lines = bandData["lines"] for (markName, markInfo) in markData.items(): foundHits.setdefault(band, {})[markName] = 0 seq = markInfo["seq"] mark = markInfo["gray"] connectBorder = markInfo["connectBorder"] accuracy = markInfo["accuracy"] ratio = markInfo["connectRatio"] (markH, markW) = mark.shape[:2] nPts = 0 clusters = [] for (i, (up, lo)) in enumerate(lines): if line is not None: if i < line - 1: continue elif i > line - 1: break if line is not None and i == line - 1: if theUpper is None or theUpper > up: theUpper = up if theLower is None or theLower < lo: theLower = lo roi = thisDemargined[up : lo + 1] (roih, roiw) = roi.shape[:2] if roih < markH or roiw < markW: # search template exceeds roi image continue result = cv2.matchTemplate(roi, mark, cv2.TM_CCOEFF_NORMED) loc = np.where(result >= accuracy) pts = list(zip(*loc)) # if too many hits: bad template or required accuracy too low if len(pts) > maxHits: error( f"mark '{band}:{markName}':" f" too many hits: {len(pts)} > {maxHits}" ) warning("Increase accuracy for this template") continue if not pts: continue # fuzzy matching produces several hits in the neighbourhood # of marks. We have to reduce that to the best hit. # We cluster the hits into clusters of neighbouring hits. nPts += len(pts) clusters = cluster(pts, result) # We pick the representant hit from each cluster and # check the ink connectedness # Explanation in `fusus.clean` for (pt, value) in clusters: connDegree = connected( markH, markW, connectBorder, threshold, roi, pt ) pt = (pt[0] + up + topB, pt[1] + leftB) (left, top, right, bottom) = ( pt[1], pt[0], pt[1] + markW, pt[0] + markH, ) if connDegree > ratio: if showKept and (not batch or boxed): im = stages["boxed"] addBox( C, im, left, top, right, bottom, True, band, seq, connDegree, ) markResults.setdefault(band, {}).setdefault( (seq, markName), [] ).append( ( True, value, connDegree, connectBorder, stripe, block, left, top, right, bottom, ) ) else: if batch and not boxed: cv2.rectangle( stages["clean"], (left, top), (right, bottom), cleanClr, -1, ) else: for (stage, im, clr, brd) in tasks: isBoxed = stage == "boxed" if isBoxed: addBox( C, im, left, top, right, bottom, False, band, seq, connDegree, ) markResults.setdefault(band, {}).setdefault( (seq, markName), [] ).append( ( False, value, connDegree, connectBorder, stripe, block, left, top, right, bottom, ) ) else: cv2.rectangle( im, (left, top), (right, bottom), clr, -1, ) if not batch or boxed: if line is not None and theUpper is not None and theLower is not None: grace = 20 thisTop = max(0, theUpper - grace) thisBottom = min(maxH, theLower + grace) info( f"block {stripe}{block} line {line} BEFORE/AFTER cleaning\n", tm=False, ) roi = thisDemargined[thisTop:thisBottom] showImage(roi) roiBoxed = thisBoxed[thisTop:thisBottom] showImage(roiBoxed) stages["markData"] = markResults if line is None: for (band, bandMarks) in sorted(markResults.items()): for ((seq, mark), entries) in sorted(bandMarks.items()): indent(level=2) kept = sum(1 for e in entries if e[0]) wiped = len(entries) - kept warning( f"{seq:>2} - {band:<10}: {mark:<20}" f" wiped {wiped:>4} x, kept {kept:>4} x", tm=False, ) else: self.show(stage="markData") indent(level=1) clean = stages["clean"] (th, threshed) = cv2.threshold( clean, 127, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU ) pts = cv2.findNonZero(threshed) if pts is None: self.empty = True if not batch: info("empty page") return info("cleaning done")
def doLayout(self)
-
Divide the page into stripes and the stripes into blocks.
We detect vertical strokes as block separators and horizontal strokes as separators to split off top and bottom material.
A page may or may not be partially divided into blocks. Where there is a vertical stroke, we define a stripe: the horizontal band that contains the vertical stroke tightly and extends to the full with of the page.
Between the stripes corresponding to block separators we have stripes that are not split into blocks.
The stripes will be numbered from top to bottom, starting at 1.
If a stripe is not split, it defines a roi (region of interest) with label
(i, '')
.If it is split, it defines blocks with labels
(i, 'r')
and(i, 'l')
.Every horizontal stripe will be examined. We have to determine whether it is a top separator or a bottom separator. As a rule of thumb: horizontal stripes in the top stripe are top-separators, all other horizontal stripes are bottom separators.
If there are multiple horizontal strokes in a roi, the most aggressive one will be taken, i.e. the one that causes the most matarial to be discarded.
All further operations will take place on these blocks (and not on the page as a whole).
The result of this stage is, besides the blocks, an image of the page with the blocks marked and labelled.
Expand source code Browse git
def doLayout(self): """Divide the page into stripes and the stripes into blocks. We detect vertical strokes as block separators and horizontal strokes as separators to split off top and bottom material. A page may or may not be partially divided into blocks. Where there is a vertical stroke, we define a stripe: the horizontal band that contains the vertical stroke tightly and extends to the full with of the page. Between the stripes corresponding to block separators we have stripes that are not split into blocks. The stripes will be numbered from top to bottom, starting at 1. If a stripe is not split, it defines a roi (region of interest) with label `(i, '')`. If it is split, it defines blocks with labels `(i, 'r')` and `(i, 'l')`. Every horizontal stripe will be examined. We have to determine whether it is a top separator or a bottom separator. As a rule of thumb: horizontal stripes in the top stripe are top-separators, all other horizontal stripes are bottom separators. If there are multiple horizontal strokes in a roi, the most aggressive one will be taken, i.e. the one that causes the most matarial to be discarded. All further operations will take place on these blocks (and not on the page as a whole). The result of this stage is, besides the blocks, an image of the page with the blocks marked and labelled. """ batch = self.batch boxed = self.boxed engine = self.engine C = engine.C debug = C.debug tm = engine.tm indent = tm.indent info = tm.info pageW = self.pageW pageH = self.pageH stages = self.stages if not batch or boxed: stages["layout"] = stages["normalizedC"].copy() indent(level=3) stretchesH = getStretches(C, info, stages, pageW, True, batch) stretchesV = getStretches(C, info, stages, pageH, False, batch) stripes = getStripes(stages, stretchesV) blocks = getBlocks(C, info, stages, pageH, stripes, stretchesH, batch) if debug: showImage(stages["layout"]) self.blocks = blocks applyHRules(C, stages, stretchesH, stripes, blocks, batch, boxed) emptyBlocks = getInkDistribution(C, info, stages, pageH, blocks, batch, boxed) if not batch: grayInterBlocks(C, stages, blocks) for b in emptyBlocks: del blocks[b]
def doNormalize(self)
-
Normalizes a page.
Previously needed to unskew pages. But now we assume pages are already skewed.
Skewing turned out to be risky: when pages are filled in unusual ways, we got unexpected and unwanted rotations. So we don't do that anymore.
If the input page images have skew artefacts (black sharp triangles in the corners) as a result of previous skewing these will be removed.
Normalization produces the stages:
- gray: grayscale version of orig
- blurred: inverted, black-white, blurred without skew artefacts, needed for histograms later on;
- normalized: gray without skew artefacts;
- normalizedC: orig without skew artefacts.
Expand source code Browse git
def doNormalize(self): """Normalizes a page. Previously needed to unskew pages. But now we assume pages are already skewed. Skewing turned out to be risky: when pages are filled in unusual ways, we got unexpected and unwanted rotations. So we don't do that anymore. If the input page images have skew artefacts (black sharp triangles in the corners) as a result of previous skewing these will be removed. Normalization produces the stages: * *gray*: grayscale version of *orig* * *blurred*: inverted, black-white, blurred without skew artefacts, needed for histograms later on; * *normalized*: *gray* without skew artefacts; * *normalizedC*: *orig* without skew artefacts. """ if self.empty: return engine = self.engine tm = engine.tm info = tm.info C = engine.C batch = self.batch boxed = self.boxed stages = self.stages orig = stages["orig"] gray = cv2.cvtColor(orig, cv2.COLOR_BGR2GRAY) stages["gray"] = gray blurredGray = cv2.GaussianBlur(gray, (C.blurX, C.blurY), 0, 0) (th, blurredGray) = cv2.threshold( blurredGray, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU ) crop = cropBorders(blurredGray) normalized = gray.copy() removeBorders(normalized, crop, C.whiteGRS) stages["normalized"] = normalized if not batch or boxed: normalizedC = orig.copy() removeBorders(normalizedC, crop, C.whiteRGB) stages["normalizedC"] = normalizedC blurred = cv2.GaussianBlur(normalized, (C.blurX, C.blurY), 0, 0) (th, blurred) = cv2.threshold( blurred, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU ) stages["blurred"] = blurred # detect if the image is empty pts = cv2.findNonZero(blurred) if pts is None: self.empty = True if not batch: info("empty page") return self.empty = False
def ocring(self)
-
Calls the OCR engine for a page.
Expand source code Browse git
def ocring(self): """Calls the OCR engine for a page.""" engine = self.engine OCR = engine.OCR OCR.read(self) OCR.proofing(self)
def proofing(self)
-
Produces proofing images
Expand source code Browse git
def proofing(self): """Produces proofing images""" engine = self.engine OCR = engine.OCR OCR.proofing(self)
def read(self, stage=None)
-
Reads processing data for selected stages from disk
Parameters
stage
:string | iterable
, optionalNone
- If no stage is passed, all stages will be read, if corresponding files are present. Otherwise, the indicated stages are read.
Expand source code Browse git
def read(self, stage=None): """Reads processing data for selected stages from disk Parameters ---------- stage: string | iterable, optional `None` If no stage is passed, all stages will be read, if corresponding files are present. Otherwise, the indicated stages are read. """ engine = self.engine tm = engine.tm error = tm.error C = engine.C stages = self.stages for s in parseStages(stage, set(C.stages), C.stageOrder, error): (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s] if stageType == "link": # stages of type link will be written to disk upon creation # and not stored and need not be retrieved pass else: sPath = self.stagePath(s) if not os.path.exists(sPath): stages[s] = None if s in {"normalized", "char", "word"}: self.empty = True continue if stageType == "image": stages[s] = cv2.imread(sPath) elif stageType == "data": with open(sPath) as f: stages[s] = self._ingest(s, stageType, stageExt, f)
def show(self, stage=None, band=None, mark=None, **displayParams)
-
Displays processing stages of an page.
See
STAGES
.Parameters
stage
:string | iterable
, optionalNone
- If no stage is passed, all stages are shown as thumbnails. Otherwise, the indicated stages are shown. If a string, it may be a comma-separated list of stage names. Otherwise it is an iterable of stage names.
band
:string | iterable
, optionalNone
- If no band is passed, no bands are indicated. Otherwise, the indicated bands are shown. If a string, it may be a comma-separated list of band names. Otherwise it is an iterable of band names.
mark
:string | iterable
, optionalNone
- If
None
is passed, no marks are shown. If""
is passed, all marks on the selected bands are shown. Otherwise, the indicated mark boxes are shown, irrespective of their bands: If given as a string, it may be a comma-separated list of mark names. Otherwise it is an iterable of mark names. This information will be taken from the result of themarkData
stage. display
:dict
, optional- A set of display parameters, such as
width
,height
(anything accepted byIPython.display.Image
).
Notes
The mark option works for the "boxed" stage: All marks not specified in the mark parameter will not be shown.
But this option also works for all other image stages: the marks will be displayed on a fresh copy of that stage.
When used for a grayscale stage, the color of the mark boxes is lost.
Expand source code Browse git
def show(self, stage=None, band=None, mark=None, **displayParams): """Displays processing stages of an page. See `fusus.parameters.STAGES`. Parameters ---------- stage: string | iterable, optional `None` If no stage is passed, all stages are shown as thumbnails. Otherwise, the indicated stages are shown. If a string, it may be a comma-separated list of stage names. Otherwise it is an iterable of stage names. band: string | iterable, optional `None` If no band is passed, no bands are indicated. Otherwise, the indicated bands are shown. If a string, it may be a comma-separated list of band names. Otherwise it is an iterable of band names. mark: string | iterable, optional `None` If `None` is passed, no marks are shown. If `""` is passed, all marks on the selected bands are shown. Otherwise, the indicated mark boxes are shown, irrespective of their bands: If given as a string, it may be a comma-separated list of mark names. Otherwise it is an iterable of mark names. This information will be taken from the result of the `markData` stage. display: dict, optional A set of display parameters, such as `width`, `height` (anything accepted by `IPython.display.Image`). Notes ----- The mark option works for the "boxed" stage: All marks not specified in the mark parameter will not be shown. But this option also works for all other image stages: the marks will be displayed on a fresh copy of that stage. When used for a grayscale stage, the color of the mark boxes is lost. """ engine = self.engine tm = engine.tm error = tm.error C = engine.C cd = engine.cd stages = self.stages marks = engine.marks blocks = self.blocks for s in parseStages(stage, set(stages), C.stageOrder, error): stageData = stages[s] (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s] white = C.whiteRGB if stageColor else C.whiteGRS if stageType == "data": display(HTML(f"<hr>\n<div><b>{s}</b>: <i>data:</i></div>")) self._serial(s, stageData, stageExt) elif stageType == "link": path = self.stagePath(s) if os.path.exists(path): showPath = unexpanduser(f"{cd}{path}") nbLink = getNbLink(showPath, stageData) if nbLink is None: display(HTML(f"{stageData} in {showPath}")) else: display( HTML( f"""<hr>\n<div><b>{s}</b>: {nbLink}""" f""" (local file: {showPath})</div>""" ) ) else: display( HTML( f"<hr>\n<div><b>{s}</b>: " f"<i>{showPath} does not exist.</i></div>" ) ) else: img = stageData headingInfo = [] if band is not None or mark is not None: img = ( stages["demarginedC"] if s == "boxed" and mark is not None else stageData ).copy() for ((stripe, block), data) in blocks.items(): bands = data["bands"] doBands = ( () if band is None else parseBands(band, set(bands), error) ) doBandSet = set(doBands) bandRep = f" with bands {', '.join(doBands)}" if doBands else "" doMarks = ( set() if mark is None else parseMarks(mark, marks, set(doBands), error) ) markRep = ( f" with marks {', '.join(sorted(doMarks))}" if doMarks else "" ) headingInfo.append( f"<b>{stripe}{block}</b> {bandRep}{markRep}" ) (leftB, topB, rightB, bottomB) = data["inner"] imBW = rightB - leftB for band in doBands: bandInfo = bands[band] lines = bandInfo["lines"] bColor = bandInfo["color"] for (up, lo) in lines: theUpper = topB + up theLower = topB + lo theLeft = leftB + 10 theRight = leftB + imBW - 10 cv2.rectangle( img, (theLeft, theUpper), (theRight, theLower), bColor, 2, ) overlay( img, leftB, theUpper, theLeft, theLower, white, bColor, ) overlay( img, theRight, theUpper, leftB + imBW, theLower, white, bColor, ) markData = stages.get("markData", {}) markLegend = {} for (band, bandMarks) in markData.items(): if doBands and band not in doBandSet: continue for ((seq, mrk), hits) in bandMarks.items(): if mrk not in doMarks: continue markKey = f"{'' if band == 'main' else band[0]}{seq}" markValue = (band, mrk, len(hits)) markLegend[markKey] = markValue for ( kept, value, connDegree, connectBorder, stripe, block, left, top, right, bottom, ) in hits: addBox( C, img, left, top, right, bottom, kept, band, seq, connDegree, ) if markLegend: html = [] html.append( "<details open><summary>Mark legend</summary><table>" ) html.append( "<tr><th>acro</th><th>band</th>" "<th>mark</th><th>hits</th></tr>" ) for (k, (b, m, n)) in sorted(markLegend.items()): html.append( f"<tr><td>{k}</td><td>{b}</td>" f"<td>{m}</td><td>{n}</td></tr>" ) html.append("</table></details>") display(HTML("".join(html))) display(HTML(f"<hr>\n<div><b>{s}</b> {';'.join(headingInfo)}</div>")) showImage(img, **displayParams)
def stagePath(self, stage, inter=None)
-
Expand source code Browse git
def stagePath(self, stage, inter=None): engine = self.engine C = engine.C bare = self.bare ext = self.ext (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[stage] dest = getattr(C, stageDir or "interDir") inter = "" if inter is None else f"-{inter}" trail = stage if stagePart is None else "" if not stagePart else stagePart trail = "" if not trail else f"-{trail}" ext = DEFAULT_EXTENSION if stageType == "image" else (stageExt or ext) base = f"{dest}/{bare}{inter}{trail}" return f"{base}.{ext}"
def write(self, stage=None, perBlock=False)
-
Writes processing stages of an page to disk.
Parameters
stage
:string | iterable
, optionalNone
- If no stage is passed, all stages are shown as thumbnails. Otherwise, the indicated stages are shown. If a string, it may be a comma-separated list of stage names. Otherwise it is an iterable of stage names.
perBlock
:boolean
, optionalFalse
- If True, the stage output will be split into blocks and written to disk separately. The stripe and block are appended to the file name.
Returns
None
- The stages are written into the
inter
orclean
subdirectory, with the name of the stage appended to the file name. Ifclean
, the name of the stage is omitted.
Expand source code Browse git
def write(self, stage=None, perBlock=False): """Writes processing stages of an page to disk. Parameters ---------- stage: string | iterable, optional `None` If no stage is passed, all stages are shown as thumbnails. Otherwise, the indicated stages are shown. If a string, it may be a comma-separated list of stage names. Otherwise it is an iterable of stage names. perBlock: boolean, optional `False` If True, the stage output will be split into blocks and written to disk separately. The stripe and block are appended to the file name. Returns ------- None The stages are written into the `inter` or `clean` subdirectory, with the name of the stage appended to the file name. If `clean`, the name of the stage is omitted. """ engine = self.engine tm = engine.tm error = tm.error C = engine.C stages = self.stages blocks = self.blocks for s in parseStages(stage, set(C.stages), C.stageOrder, error): if s not in stages: continue stageData = stages[s] (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[s] if stageType == "image": if perBlock: for ((stripe, block), data) in blocks.items(): blockSpec = f"{stripe:02d}{block}" (leftB, topB, rightB, bottomB) = data["inner"] roi = stageData[topB:bottomB, leftB:rightB] thisPath = self.stagePath(stage, inter=blockSpec) writeImage(roi, thisPath) else: writeImage(stageData, self.stagePath(s)) elif stageType == "data": with open(self.stagePath(s), "w") as f: self._serial(s, stageData, stageExt, handle=f) elif stageType == "link": # stages of type link will be written to disk upon creation # and not stored pass