Module fusus.book
Book workflow
We will transform scanned pages of a book into Unicode text following a number of processing steps.
The express way
In the terminal, cd to a book directory (see below) and run
python3 -m fusus.book
This will process all scanned pages with default settings.
With more control and feedback
Copy the notebook example/do.ipynb into a book directory (see below).
Run cells in the notebook, and see doExample to learn by example how you can configure the processing parameters and control the processing of pages.
Book directory
A book directory should have subdirectories at the outset:
in
Contains image files (scans at approximately 1800 x 2700 pixels);
marks
(optional) Contains subdirectories with little rectangles copied from the scans and saved in individual files at the same resolution.
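As a minimal sketch of the starting layout described above, the following creates an empty book directory with `in` and one `marks` subdirectory per band. The helper name `make_book_skeleton` is hypothetical and not part of fusus; the band names are the ones listed in the Marks section of this page.

```python
from pathlib import Path
import tempfile

# Band names as listed in the table of bands on this page.
BANDS = ("high", "low", "mid", "main", "broad", "inter")


def make_book_skeleton(root):
    """Create `in` and `marks/<band>` under `root`; return the created paths.

    Hypothetical helper for illustration, not a fusus function.
    """
    root = Path(root)
    created = []
    for sub in [Path("in")] + [Path("marks") / band for band in BANDS]:
        target = root / sub
        target.mkdir(parents=True, exist_ok=True)
        created.append(target)
    return created


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for path in make_book_skeleton(Path(tmp) / "mybook"):
            print(path.relative_to(tmp))
```

Only `in` is required; the `marks` tree is optional, and you would only create the band directories you actually have templates for.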
Marks
Marks are spots that will be wiped clean wherever they are found.
They are organized in bands which are sets of horizontal strokes on the page, relative to the individual lines.
Marks will only be searched for within the bands they belong to, in order to avoid false positives.
The marks directory may contain the following bands:
name | kind | items | remarks
--- | --- | --- | ---
high | marks | arbitrary images | in the upper band of a line
low | marks | arbitrary images | in the lower band of a line
mid | marks | arbitrary images | in the central, narrow band of a line, with lots of ink
main | marks | arbitrary images | in the band where nearly all the letter material is
broad | marks | arbitrary images | as main, but a bit broader
inter | marks | arbitrary images | between the lines
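The source listing further down this page (`Book._applySettings`) shows that a mark file name may carry per-mark parameter overrides in parentheses, which are split off before the name is used. The sketch below mirrors that parsing in isolation. The function name `parse_mark_name` and the example file names are hypothetical; of the short parameter names, only `bw` is confirmed as a key by the source, while `acc` and `r` are assumed from the labels printed by `availableMarks`.

```python
import os

# Assumed short-name -> long-name mapping. Only "bw" is confirmed as a key
# by the source; "acc" and "r" are taken from the display labels.
MARK_PARAMS = {"acc": "accuracy", "bw": "connectBorder", "r": "connectRatio"}


def parse_mark_name(file_name):
    """Split a mark file name into (bare name, overrides, error messages)."""
    bare = os.path.splitext(file_name)[0]
    tweaks = {}
    errors = []
    parts = bare.rsplit("(", 1)
    if len(parts) > 1:
        bare = parts[0]
        # Drop the closing parenthesis, then split the k=v pairs.
        for tweak in parts[1][:-1].split(","):
            if "=" not in tweak:
                errors.append(f"malformed: {tweak}")
                continue
            key, value = tweak.split("=", 1)
            if key not in MARK_PARAMS:
                errors.append(f"unknown parameter: {key}")
                continue
            try:
                # The border width is an integer, the other values are floats.
                tweaks[key] = int(value) if key == "bw" else float(value)
            except ValueError:
                errors.append(f"bad value: {value}")
    return bare, tweaks, errors


if __name__ == "__main__":
    print(parse_mark_name("comma(acc=0.8,bw=3).png"))
    print(parse_mark_name("dot.png"))
```

Parameters that are not overridden in the file name fall back to the current settings, so most templates can keep a plain name.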
When fusus reads the marks, it crops all white borders from each of them and surrounds the result with a fixed small white border.
So you do not have to be very precise in trimming the mark templates.
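To illustrate why imprecise trimming does not matter, here is a minimal sketch of the crop-then-reborder idea on a grayscale image held as a plain list of rows. It is not the fusus implementation (fusus uses `reborder` from `fusus.clean` on OpenCV arrays); the function name, the use of exactly 255 as "white", and the list representation are assumptions made to keep the example dependency-free.

```python
WHITE = 255  # assumed grayscale value for white


def crop_and_reborder(gray, border=4, white=WHITE):
    """Trim all-white outer rows and columns, then pad with `border` white pixels."""
    rows = [i for i, row in enumerate(gray) if any(p != white for p in row)]
    if not rows:
        # Nothing but white: return a white square of border material only.
        return [[white] * (2 * border) for _ in range(2 * border)]
    width = len(gray[0])
    cols = [j for j in range(width) if any(row[j] != white for row in gray)]
    top, bottom = rows[0], rows[-1]
    left, right = cols[0], cols[-1]
    # The ink-bearing core of the template.
    core = [row[left:right + 1] for row in gray[top:bottom + 1]]
    new_width = len(core[0]) + 2 * border
    above = [[white] * new_width for _ in range(border)]
    below = [[white] * new_width for _ in range(border)]
    body = [[white] * border + row + [white] * border for row in core]
    return above + body + below


if __name__ == "__main__":
    loose = [
        [255, 255, 255, 255],
        [255, 0, 10, 255],
        [255, 255, 255, 255],
    ]
    tight = [[0, 10]]
    # Both templates normalize to the same image.
    print(crop_and_reborder(loose, border=1) == crop_and_reborder(tight, border=1))
```

Two templates cut with different margins end up identical after this step, which is the property the text relies on.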
After running the pipeline, the following subdirectories may have been produced:
inter
Intermediate files, such as page images with histograms displayed in them, or data files with information on the marks that have been encountered and wiped;
clean
Cleaned page block images, input for OCR processing.
out
Output (= final results). Tab-separated files with one row per word.
proof
Aids to assess the quality of the output. Tab-separated files with one row per character. Normalized input images. Overlay HTML files with OCR results, coloured by confidence, both on character basis and on word basis.
text
Plain HTML rendering of the full, recognized text with page and line indicators. Used for reading the results by human eyes.
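As a rough sketch of how the per-word output could be consumed, the following groups word rows into lines and reports the lowest confidence per line. The column layout of the sample is hypothetical, as is the assumption that the file starts with a header row. Only the relative positions are taken from the source listing on this page: the first three fields are stripe, block and line, and the last three are confidence, word and trailing punctuation.

```python
import csv
import io

# Hypothetical sample; real files may have more columns in the middle.
SAMPLE = (
    "stripe\tblock\tline\tconf\tword\tpunc\n"
    "1\t\t1\t90\tkitab\t\n"
    "1\t\t1\t72\tjadid\t.\n"
    "1\t\t2\t85\tbab\t\n"
)


def lines_with_confidence(tsv_text):
    """Map (stripe, block, line) to (line text, lowest word confidence)."""
    reader = csv.reader(
        io.StringIO(tsv_text), delimiter="\t", quoting=csv.QUOTE_NONE
    )
    next(reader)  # skip the (assumed) header row
    lines = {}
    for fields in reader:
        if not fields:
            continue
        key = tuple(fields[0:3])
        conf = int(fields[-3])
        text = f"{fields[-2]}{fields[-1]}"  # word plus trailing punctuation
        words, confs = lines.setdefault(key, ([], []))
        words.append(text)
        confs.append(conf)
    return {
        key: (" ".join(words), min(confs))
        for key, (words, confs) in lines.items()
    }


if __name__ == "__main__":
    for key, (text, worst) in lines_with_confidence(SAMPLE).items():
        print(key, text, worst)
```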
!!! caution "Block information"
If the layout algorithm has divided the page into blocks, the information about the blocks resides in the page object and is not currently stored on disk.
This information is needed after OCR to shift the coordinates with respect to the blocks (this is what comes out of the OCR) to coordinates with respect to the page.
That means you cannot initialize the pipeline with the clean block images as sole input. You have to start with layout detection.
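The coordinate shift mentioned in the caution amounts to adding the block's position on the page to every box that the OCR reports. A minimal sketch, assuming boxes are `(left, top, right, bottom)` tuples and that the block's top-left corner on the page is known; the function name and the tuple layout are illustrative assumptions, not the fusus data model.

```python
def block_to_page(box, block_origin):
    """Shift a (left, top, right, bottom) box from block to page coordinates.

    `block_origin` is the (x, y) position of the block's top-left corner
    on the page. Without it, block-relative boxes cannot be placed.
    """
    left, top, right, bottom = box
    off_x, off_y = block_origin
    return (left + off_x, top + off_y, right + off_x, bottom + off_y)


if __name__ == "__main__":
    # A word found at (10, 20)-(50, 40) inside a block that starts at (300, 1200).
    print(block_to_page((10, 20, 50, 40), (300, 1200)))
```

Because the block origin lives only in the page object, clean block images alone do not carry enough information to perform this step.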
"""Book workflow
We will transform scanned pages of a book into Unicode text
following a number of processing steps.
## The express way
In the terminal, `cd` to a book directory (see below) and run
```
python3 -m fusus.book
```
This will process all scanned pages with default settings.
## With more control and feedback
Copy the notebook `example/do.ipynb` into a book directory (see below).
Run cells in the notebook, and see
[doExample](https://github.com/among/fusus/blob/master/example/doExample.ipynb)
to learn by example how you can configure the processing parameters
and control the processing of pages.
## Book directory
A book directory should have subdirectories at the outset:
* `in`
Contains image files (scans at approximately 1800 x 2700 pixels);
* `marks` (optional)
Contains subdirectories with little rectangles copied from the scans
and saved in individual files at the same resolution.
### Marks
Marks are spots that will be wiped clean wherever they are found.
They are organized in *bands* which are sets of horizontal strokes on the page,
relative to the individual lines.
Marks will only be searched for within the bands they belong to, in order to
avoid false positives.
The `marks` directory may contain the following bands:
name | kind | items | remarks
--- | --- | --- | ---
`high` | marks | arbitrary images | in the upper band of a line
`low` | marks | arbitrary images | in the lower band of a line
`mid` | marks | arbitrary images | in the central, narrow band of a line, with lots of ink
`main` | marks | arbitrary images | in the band where nearly all the letter material is
`broad` | marks | arbitrary images | as `main`, but a bit broader
`inter` | marks | arbitrary images | between the lines
When fusus reads the marks, it will crop all white borders from each of them and
surround the result with a fixed small white border.
So you do not have to be very precise in trimming the mark templates.
After running the pipeline, the following subdirectories may have been produced:
* `inter`
Intermediate files, such as page images with histograms displayed in it,
or data files with information on the marks that have been encountered and wiped;
* `clean`
Cleaned page block images, input for OCR processing.
* `out`
Output (= final results). Tab separated files with one row per word.
* `proof`
Aids to assess the quality of the output.
Tab separated files with one row per character.
Normalized input images. Overlay HTML files with OCR results,
coloured by confidence, both on character basis and on word basis.
* `text`
Plain HTML rendering of the full, recognized text with page and line
indicators. Used for reading the results by human eyes.
!!! caution "Block information" If the layout algorithm has
divided the page into blocks, the information of the blocks resides in the
page object and is not currently stored on disk.
This information is needed after OCR to shift the coordinates with respect to
the blocks (this is what comes out of the OCR) to coordinates with respect
to the page.
That means you cannot initialize the pipeline with the clean block images as sole
input. You have to start with layout detection.
"""
import sys
import os
import collections
import cv2
from tf.core.timestamp import Timestamp
from tf.core.helpers import unexpanduser
from .parameters import Config, ALL_PAGES
from .lib import (
imageFileList,
imageFileListSub,
pagesRep,
select,
showImage,
splitext,
getNbPath,
getNbLink,
dh,
)
from .clean import reborder
from .page import Page
from .ocr import OCR, showConf, getProofColor
class Book:
def __init__(self, cd=None, **params):
"""Engine for book conversion.
Parameters
----------
cd: string, optional
If passed, performs a change directory to the directory specified.
Else the whole book processing takes place in the current directory.
You can use `~` to denote your home directory.
params: dict, optional
Any number of customizable settings from `fusus.parameters.SETTINGS`.
They will be in effect when running the workflow, until
a `Book.configure` action will modify them.
"""
if cd is None:
cd = ""
else:
cd = os.path.expanduser(cd)
os.chdir(cd)
if cd:
cd = f"{cd}/"
self.cd = cd
tm = Timestamp()
self.tm = tm
self.C = Config(tm, **params)
self._applySettings()
self.OCR = OCR(self)
def _applySettings(self):
"""After a settings update, recompute derived settings."""
C = self.C
whit = C.whiteGRS
markParams = C.markParams
tm = self.tm
error = tm.error
self.marks = {}
marks = self.marks
offsetBand = {band: offset for (band, offset) in C.offsetBand.items()}
self.offsetBand = offsetBand
files = imageFileListSub(C.marksDir)
seq = 0
for (band, images) in files.items():
for f in images:
tweakDict = {}
bare = splitext(f)[0]
parts = bare.rsplit("(", 1)
if len(parts) > 1:
bare = parts[0]
tweaks = parts[1][0:-1].split(",")
for tweak in tweaks:
if "=" not in tweak:
error(f"Malformed image parameter for {bare}: {tweak}")
continue
(k, v) = tweak.split("=", 1)
if k not in markParams:
error(f"Unknown image parameter for {bare}: {k} in {k}={v}")
continue
try:
tweakDict[k] = int(v) if k == "bw" else float(v)
except Exception:
error(f"Unknown image parameter for {bare}: {v} in {k}={v}")
full = f"{C.marksDir}/{band}/{f}"
image = cv2.imread(full)
gray = reborder(
cv2.cvtColor(image, cv2.COLOR_BGR2GRAY), 4, whit, crop=True
)
seq += 1
marks.setdefault(band, {})[bare] = dict(gray=gray, seq=seq)
dest = marks[band][bare]
for (k, kLong) in markParams.items():
dest[kLong] = tweakDict.get(k, getattr(C, kLong))
self.allPages = imageFileList(C.inDir)
self.allPagesDesc = pagesRep(self.allPages)
self.allPagesList = pagesRep(self.allPages, asList=True)
def info(self, msg):
tm = self.tm
info = tm.info
info(msg)
def warning(self, msg):
tm = self.tm
warning = tm.warning
warning(msg)
def error(self, msg):
tm = self.tm
error = tm.error
error(msg)
def configure(self, reset=False, **params):
"""Updates current settings based on new values.
The signature is the same as `fusus.parameters.Config.configure`.
"""
self.C.configure(reset=reset, **params)
self._applySettings()
def showSettings(self, params=None):
"""Display settings.
Parameters
----------
params: dict, optional
Any number of customizable settings from `fusus.parameters.SETTINGS`.
The current values of given parameters will be displayed.
The values that you give each of the `params` here is not used,
only their names. It is recommended to pass `None` as values:
`B.showSettings(blurX=None, blurY=None)`
"""
self.C.show(params=params)
def availableBands(self):
"""Display the characteristics of all defined *bands*."""
tm = self.tm
info = tm.info
info("Available bands and their offsets", tm=False)
for (band, offset) in sorted(self.offsetBand.items()):
bandRep = f"«{band}»"
info(
f"\t{bandRep:<10}: top={offset[0]:>4}, bottom={offset[1]:>4}", tm=False
)
def availableMarks(self, band=None, mark=None):
"""Display the characteristics of defined *marks*.
Parameters
----------
band: string, optional `None`
Show only marks in this band. If `None`, show marks in all bands.
mark: string, optional `None`
Show only marks with this name. If `None`, show marks with any name.
"""
C = self.C
grey = C.greyGRS
tm = self.tm
info = tm.info
marks = self.marks
info("Marks and their settings", tm=False)
for (bnd, markItems) in sorted(marks.items()):
if band is not None and band != bnd:
continue
bandRep = f"[{bnd}]"
info(f"\tband {bandRep}", tm=False)
for (mrk, markInfo) in sorted(markItems.items()):
if mark is not None and mark != mrk:
continue
markRep = f"«{mrk}»"
seq = markInfo["seq"]
acc = markInfo["accuracy"]
bw = markInfo["connectBorder"]
r = markInfo["connectRatio"]
info(
f"\t\t{seq:>3}: {markRep:<20} acc={acc}, bw={bw}, r={r}",
tm=False,
)
markImage = reborder(markInfo["gray"], 2, grey)
showImage(markImage)
def availablePages(self):
"""Display the amount and page numbers of all pages."""
tm = self.tm
info = tm.info
allPages = self.allPages
pagesDesc = self.allPagesDesc
info(f"{len(allPages)} pages: {pagesDesc}")
def _doPage(
self,
f,
batch=False,
boxed=True,
quiet=False,
doOcr=True,
uptoLayout=False,
**kwargs,
):
"""Process a single page.
Executes all processing steps for a single page.
Parameters
----------
f: string
The file name of the scanned page with extension, without directory
batch: boolean, optional `False`
Whether to run in batch mode.
In batch mode everything is geared to the final output.
Less intermediate results are computed and stored.
Less feedback happens on the console.
boxed: boolean, optional `True`
If in batch mode, produce also images that display the cleaned marks
in boxes.
quiet: boolean, optional `False`
Whether to suppress warnings and the display of stroke separators.
doOcr: boolean, optional `True`
Whether to perform OCR processing
uptoLayout: boolean, optional `False`
Whether to stop after doing layout
Returns
-------
A `fusus.page.Page` object, which is the handle for further
inspection of what has happened during processing.
"""
tm = self.tm
info = tm.info
indent = tm.indent
if quiet:
tm.silentOn(deep=True)
else:
tm.silentOff()
# baseLevel = 1 if batch else 0
baseLevel = 1
subLevel = baseLevel + 1
indent(level=baseLevel, reset=True)
bare = splitext(f)[0]
if not batch:
info(f"Processing {bare}")
page = Page(self, f, batch=batch, boxed=boxed, **kwargs)
if batch or not page.empty:
if not batch:
indent(level=subLevel, reset=True)
info("normalizing")
page.doNormalize()
if page.empty:
return page
if not batch:
info("layout")
page.doLayout()
if not uptoLayout:
if not batch:
info("cleaning")
page.cleaning(showKept=not batch or boxed)
if not page.empty and doOcr:
if not batch:
info("ocr")
page.ocring()
tm.silentOff()
return page
def process(
self,
pages=None,
batch=True,
quiet=True,
boxed=False,
doOcr=True,
uptoLayout=False,
**kwargs,
):
"""Process directory of images.
Executes all processing steps for all images.
Parameters
----------
pages: string | int, optional `None`
Specification of pages to do. If absent or `None`: all pages.
If an int, do only that page.
Otherwise it must be a comma separated string of (ranges of) page numbers.
Half ranges are also allowed: `-10` (from beginning up to and including `10`)
and `10-` (from 10 till end).
E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`.
No spaces allowed.
batch: boolean, optional `True`
Whether to run in batch mode.
In batch mode everything is geared to the final output.
Less intermediate results are computed and stored.
Less feedback happens on the console.
boxed: boolean, optional `False`
If in batch mode, produce also images that display the cleaned marks
in boxes.
quiet: boolean, optional `True`
Whether to suppress warnings and the display of stroke separators.
doOcr: boolean, optional `True`
Whether to perform OCR processing
uptoLayout: boolean, optional `False`
Whether to stop after doing layout
Returns
-------
A `fusus.page.Page` object for the last page processed,
which is the handle for further
inspection of what has happened during processing.
"""
tm = self.tm
info = tm.info
indent = tm.indent
allPages = self.allPages
tm.silentOff()
indent(reset=True)
C = self.C
interDir = C.interDir
outDir = C.outDir
cleanDir = C.cleanDir
proofDir = C.proofDir
htmlDir = C.htmlDir
for d in (interDir, outDir, cleanDir, proofDir, htmlDir):
if not os.path.exists(d):
os.makedirs(d, exist_ok=True)
imageFiles = select(allPages, pages)
pagesDesc = pagesRep(imageFiles)
info(f"Batch of {len(imageFiles)} pages: {pagesDesc}")
info("Start batch processing images")
page = None
for (i, imFile) in enumerate(sorted(imageFiles)):
indent(level=1, reset=True)
msg = f"{i + 1:>5} {imFile:<40}"
info(f"{msg}\r", nl=False)
page = self._doPage(
imFile,
batch=batch,
boxed=boxed,
quiet=quiet,
doOcr=doOcr,
uptoLayout=uptoLayout,
**kwargs,
)
if not page.empty:
page.write(stage="normalized,histogram,clean", perBlock=False)
if uptoLayout:
info(f"{msg}")
else:
if not batch:
page.write(stage="markData")
if boxed:
page.write(stage="boxed")
info(f"{msg}")
indent(level=0)
info("all done")
return page # the last page processed
def stageDir(self, stage):
C = self.C
(stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[stage]
trail = stage if stagePart is None else "" if not stagePart else stagePart
trail = "" if not trail else f"-{trail}"
return (getattr(C, stageDir or "interDir"), trail, stageExt)
def measureQuality(self, pages=None, showStats=True, updateProofs=False):
"""Measure the reported quality of the ocr processing.
pages: string | int, optional `None`
Specification of pages to do. If absent or `None`: all pages.
If an int, do only that page.
Otherwise it must be a comma separated string of (ranges of) page numbers.
Half ranges are also allowed: `-10` (from beginning up to and including `10`)
and `10-` (from 10 till end).
E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`.
No spaces allowed.
showStats: boolean, optional `True`
Compute and show quality statistics
updateProofs: boolean, optional `False`
If true, regenerate all proofing pages.
This is desirable if you have tweaked the coloring of OCR results
depending on the confidence.
The OCR itself does not have to be performed again for this.
"""
tm = self.tm
info = tm.info
indent = tm.indent
cd = self.cd
allPages = self.allPages
imageFiles = select(allPages, pages)
pagesDesc = pagesRep(imageFiles)
info(f"Batch of {len(imageFiles)} pages: {pagesDesc}")
info("Start measuring ocr quality of these images")
if updateProofs:
info(" and regenerating proof files")
page = None
results = dict(char=[], word=[])
resultsChar = collections.defaultdict(list)
for (i, imFile) in enumerate(sorted(imageFiles)):
indent(level=1, reset=True)
msg = f"{i + 1:>5} {imFile:<40}"
info(f"{msg}\r", nl=False)
page = Page(self, imFile, minimal=True)
page.read(stage=("normalized,line," if updateProofs else "") + "word,char")
if page.empty:
continue
if updateProofs:
page.proofing()
if not showStats:
continue
stages = page.stages
pg = page.bare
pageRep = f"p{pg}"
for stage in ("word", "char"):
proofStage = f"proof{stage}"
thisPageRep = f"""<a href="{page.stagePath(proofStage)}">p{pg}</a>"""
isCharStage = stage == "char"
confI = -2 if isCharStage else -3
n = 0
totC = 0
(minC, maxC) = (100, 0)
for fields in stages[stage]:
conf = int(fields[confI])
totC += conf
if conf < minC:
minC = conf
if conf > maxC:
maxC = conf
n += 1
if isCharStage:
c = fields[-1]
resultsChar[c].append((pageRep, conf))
if n > 0:
results[stage].append((thisPageRep, n, minC, maxC, totC, ""))
if not showStats:
indent(level=0)
info("all done")
return
for stage in ("word", "char"):
stageResults = results[stage]
if not len(stageResults):
continue
grandN = sum(r[1] for r in stageResults)
grandMin = min(r[2] for r in stageResults)
grandMax = max(r[3] for r in stageResults)
grandTot = sum(r[4] for r in stageResults)
toShow = [
("overall", grandN, grandMin, grandMax, grandTot, "")
] + stageResults
info(f"{stage}-confidences of OCR results for {len(stageResults)} pages")
showConf(stage, toShow)
info(f"by-char-confidences of OCR results for {len(resultsChar)} characters")
resultsCollected = []
(sDir, sTrail, sExt) = self.stageDir("proofchar")
showPath = unexpanduser(f"{cd}{sDir}")
(isLink, nbLink) = getNbPath(showPath)
for c in sorted(resultsChar):
occs = sorted(resultsChar[c], key=lambda x: x[1])
elem = "a" if isLink else "span"
href = nbLink if isLink else None
worstExamples = []
for x in occs[0:20]:
href = (
f'''href="{nbLink}/{x[0][1:]}{sTrail}.{sExt}"''' if isLink else ""
)
worstExamples.append(
f"""
<{elem}
style="background-color: {getProofColor(x[1])};"
title="{showPath}/{x[0][1:]}{sTrail}.{sExt}"
{href}
>{x[0]}</{elem}>
"""
)
worstExamples = " ".join(worstExamples)
nOccs = len(occs)
minC = min(r[1] for r in occs)
maxC = max(r[1] for r in occs)
totC = sum(r[1] for r in occs)
resultsCollected.append((f"⌊{c}⌋", nOccs, minC, maxC, totC, worstExamples))
showConf(stage, resultsCollected, label="worst results")
indent(level=0)
info("all done")
def exportTsv(self, pages=None):
"""Combine the tsv data per page to one big tsv file.
pages: string | int, optional `None`
Specification of pages to do. If absent or `None`: all pages.
If an int, do only that page.
Otherwise it must be a comma separated string of (ranges of) page numbers.
Half ranges are also allowed: `-10`
(from beginning up to and including `10`)
and `10-` (from 10 till end).
E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`.
No spaces allowed.
The output is written to the working directory.
"""
tm = self.tm
info = tm.info
allPages = self.allPages
imageFiles = select(allPages, pages)
pagesFile = ALL_PAGES if pages is None else pagesRep(imageFiles)
pagesDesc = pagesRep(imageFiles)
info(f"Batch of {len(imageFiles)} pages: {pagesDesc}")
info("Start producing single TSV file of these pages")
path = f"{pagesFile}.tsv"
first = True
f = None
for (i, imFile) in enumerate(sorted(imageFiles)):
page = Page(self, imFile, minimal=True)
stages = page.stages
stage = "word"
if first:
headers = page.dataHeaders.get(stage, None)
header = "\t".join(str(head) for head in headers)
f = open(path, "w")
f.write(f"{header}\n")
first = False
pageNum = int(page.bare.lstrip("0") or "0")
page.read(stage=stage)
data = stages[stage]
if not data:
continue
for fields in stages[stage]:
f.write(f"{pageNum}\t" + "\t".join(str(f) for f in fields) + "\n")
if f:
info(f"written to {path}")
f.close()
else:
info("Nothing written")
def htmlPages(self, pages=None):
"""Get the text in html from the ocr output in one file
pages: string | int, optional `None`
Specification of pages to do. If absent or `None`: all pages.
If an int, do only that page.
Otherwise it must be a comma separated string of (ranges of) page numbers.
Half ranges are also allowed: `-10` (from beginning up to and including `10`)
and `10-` (from 10 till end).
E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`.
No spaces allowed.
The output is written to the `text` subdirectory.
"""
tm = self.tm
info = tm.info
indent = tm.indent
C = self.C
cd = self.cd
htmlDir = C.htmlDir
if not os.path.exists(htmlDir):
os.makedirs(htmlDir, exist_ok=True)
allPages = self.allPages
imageFiles = select(allPages, pages)
pagesDesc = pagesRep(imageFiles)
info(f"Batch of {len(imageFiles)} pages: {pagesDesc}")
info("Start producing plain text of these pages")
page = None
fileName = f"{pagesDesc}.html"
path = f"{htmlDir}/{fileName}"
doc = """\
<html>
<head>
<meta charset="utf-8"/>
<style>
body {
font-size: x-large;
text-align: right;
direction: rtl;
}
div.page {
text-align: right;
}
div.stripe {
display: flex;
flex-flow: row nowrap;
}
div.c, div.cl, div.cr {
text-align: right;
}
h3 {
text-align: right;
}
span.ln {
font-style: italic;
font-size: small;
vertical-align: super;
text-align: right;
}
</style>
</head>
«body»
</body>
</html>
"""
body = []
for (i, imFile) in enumerate(sorted(imageFiles)):
pageMaterial = []
indent(level=1, reset=True)
msg = f"{i + 1:>5} {imFile:<40}"
info(f"{msg}\r", nl=False)
page = Page(self, imFile, minimal=True)
page.read(stage="word")
pg = page.bare.lstrip("0")
if pg == "":
pg = "0"
pg = int(pg)
pageRep = f"p{pg:>03}"
pageMaterial.append(f"""<div page="{pageRep}"><h3>{pg}</h3>""")
if page.empty:
pageMaterial.append("""</div>""")
body.append("\n".join(pageMaterial))
continue
stages = page.stages
stage = "word"
(prevStripe, prevBlock, prevLine) = (None, None, None)
stripeMaterial = []
blockMaterial = []
lineMaterial = []
for fields in stages[stage]:
(stripe, block, line) = fields[0:3]
if stripe != prevStripe:
if prevStripe is not None:
stripeMaterial.append("</div>")
pageMaterial.append("\n".join(stripeMaterial))
stripeMaterial = []
stripeMaterial.append(f"""<div class="stripe" stripe="{stripe}">""")
prevBlock = None
if block != prevBlock:
if prevBlock is not None:
blockMaterial.append("</div>")
stripeMaterial.append("\n".join(blockMaterial))
blockMaterial = []
blockMaterial.append(f"""<div class="c{block}">""")
prevLine = None
if line != prevLine:
if prevLine is not None:
lineMaterial.append("</div>")
blockMaterial.append(" ".join(lineMaterial))
lineMaterial = []
lineMaterial.append(
f"""<div line="{line}"><span class="ln">{line}</span>"""
)
(prevStripe, prevBlock, prevLine) = (stripe, block, line)
word = fields[-2]
punc = fields[-1]
lineMaterial.append(f"{word}{punc}")
blockMaterial.append(" ".join(lineMaterial))
stripeMaterial.append("\n".join(blockMaterial))
pageMaterial.append("\n".join(stripeMaterial))
pageMaterial.append("</div>")
body.append("\n".join(pageMaterial))
indent(level=0)
with open(path, "w") as f:
f.write(doc.replace("«body»", "\n".join(body)))
info(f"written to {path}")
showPath = unexpanduser(f"{cd}{path}")
nbLink = getNbLink(showPath, fileName)
if nbLink is None:
dh(showPath)
else:
dh(f"""{nbLink} (local file: {showPath})""")
def main():
"""Process a whole book with default settings.
Go to the book directory and say
```
python3 -m fusus.book [pages]
```
where `pages` is an optional string specifying ranges
of pages as in `Book.process`
"""
pages = None
if len(sys.argv) > 1:
pages = sys.argv[1]
B = Book()
B.process(pages=pages)
if __name__ == "__main__":
main()
Functions
def main()
Process a whole book with default settings.
Go to the book directory and say
python3 -m fusus.book [pages]
where pages is an optional string specifying ranges of pages as in Book.process()
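The page specification accepted on the command line follows the rules given in the `Book.process` docstring: a single number, comma-separated ranges, and half-open ranges such as `-10` or `30-`, with no spaces. The sketch below implements those rules on plain integers. It is not the `select` function from `fusus.lib`, whose implementation is not shown on this page and which works on file names; `select_pages` is a hypothetical stand-in.

```python
def select_pages(all_pages, spec=None):
    """Return the sorted page numbers from `all_pages` matched by `spec`.

    spec: None (all pages), an int (that page only), or a string such as
    "-10,15-20,30-" (comma-separated numbers and ranges, no spaces).
    """
    pages = sorted(all_pages)
    if spec is None:
        return pages
    if isinstance(spec, int):
        return [p for p in pages if p == spec]
    if not pages:
        return []
    wanted = set()
    for part in spec.split(","):
        if "-" in part:
            start, end = part.split("-", 1)
            # A missing bound means "from the beginning" or "till the end".
            low = int(start) if start else pages[0]
            high = int(end) if end else pages[-1]
        else:
            low = high = int(part)
        wanted.update(p for p in pages if low <= p <= high)
    return sorted(wanted)


if __name__ == "__main__":
    book = range(1, 41)
    print(select_pages(book, "2-5,8-10"))
    print(select_pages(book, "-10,15-20,30-"))
```

Ranges are inclusive on both ends, matching the docstring's "from beginning up to and including `10`".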
Classes
class Book (cd=None, **params)
Engine for book conversion.
Parameters
cd: string, optional
If passed, performs a change directory to the directory specified. Else the whole book processing takes place in the current directory. You can use ~ to denote your home directory.
params: dict, optional
Any number of customizable settings from SETTINGS. They will be in effect when running the workflow, until a Book.configure() action modifies them.
grandMin = min(r[2] for r in stageResults) grandMax = max(r[3] for r in stageResults) grandTot = sum(r[4] for r in stageResults) toShow = [ ("overall", grandN, grandMin, grandMax, grandTot, "") ] + stageResults info(f"{stage}-confidences of OCR results for {len(stageResults)} pages") showConf(stage, toShow) info(f"by-char-confidences of OCR results for {len(resultsChar)} characters") resultsCollected = [] (sDir, sTrail, sExt) = self.stageDir("proofchar") showPath = unexpanduser(f"{cd}{sDir}") (isLink, nbLink) = getNbPath(showPath) for c in sorted(resultsChar): occs = sorted(resultsChar[c], key=lambda x: x[1]) elem = "a" if isLink else "span" href = nbLink if isLink else None worstExamples = [] for x in occs[0:20]: href = ( f'''href="{nbLink}/{x[0][1:]}{sTrail}.{sExt}"''' if isLink else "" ) worstExamples.append( f""" <{elem} style="background-color: {getProofColor(x[1])};" title="{showPath}/{x[0][1:]}{sTrail}.{sExt}" {href} >{x[0]}</{elem}> """ ) worstExamples = " ".join(worstExamples) nOccs = len(occs) minC = min(r[1] for r in occs) maxC = max(r[1] for r in occs) totC = sum(r[1] for r in occs) resultsCollected.append((f"⌊{c}⌋", nOccs, minC, maxC, totC, worstExamples)) showConf(stage, resultsCollected, label="worst results") indent(level=0) info("all done") def exportTsv(self, pages=None): """Combine the tsv data per page to one big tsv file. pages: string | int, optional `None` Specification of pages to do. If absent or `None`: all pages. If an int, do only that page. Otherwise it must be a comma separated string of (ranges of) page numbers. Half ranges are also allowed: `-10` (from beginning up to and including `10`) and `10-` (from 10 till end). E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`. No spaces allowed. The output is written to the working directory. 
""" tm = self.tm info = tm.info allPages = self.allPages imageFiles = select(allPages, pages) pagesFile = ALL_PAGES if pages is None else pagesRep(imageFiles) pagesDesc = pagesRep(imageFiles) info(f"Batch of {len(imageFiles)} pages: {pagesDesc}") info("Start producing single TSV file of these pages") path = f"{pagesFile}.tsv" first = True f = None for (i, imFile) in enumerate(sorted(imageFiles)): page = Page(self, imFile, minimal=True) stages = page.stages stage = "word" if first: headers = page.dataHeaders.get(stage, None) header = "\t".join(str(head) for head in headers) f = open(path, "w") f.write(f"{header}\n") first = False pageNum = int(page.bare.lstrip("0") or "0") page.read(stage=stage) data = stages[stage] if not data: continue for fields in stages[stage]: f.write(f"{pageNum}\t" + "\t".join(str(f) for f in fields) + "\n") if f: info(f"written to {path}") f.close() else: info("Nothing written") def htmlPages(self, pages=None): """Get the text in html from the ocr output in one file pages: string | int, optional `None` Specification of pages to do. If absent or `None`: all pages. If an int, do only that page. Otherwise it must be a comma separated string of (ranges of) page numbers. Half ranges are also allowed: `-10` (from beginning up to and including `10`) and `10-` (from 10 till end). E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`. No spaces allowed. The output is written to the `text` subdirectory. 
""" tm = self.tm info = tm.info indent = tm.indent C = self.C cd = self.cd htmlDir = C.htmlDir if not os.path.exists(htmlDir): os.makedirs(htmlDir, exist_ok=True) allPages = self.allPages imageFiles = select(allPages, pages) pagesDesc = pagesRep(imageFiles) info(f"Batch of {len(imageFiles)} pages: {pagesDesc}") info("Start producing plain text of these pages") page = None fileName = f"{pagesDesc}.html" path = f"{htmlDir}/{fileName}" doc = """\ <html> <head> <meta charset="utf-8"/> <style> body { font-size: x-large; text-align: right; direction: rtl; } div.page { text-align: right; } div.stripe { display: flex; flex-flow: row nowrap; } div.c, div.cl, div.cr { text-align: right; } h3 { text-align: right; } span.ln { font-style: italic; font-size: small; vertical-align: super; text-align: right; } </style> </head> «body» </body> </html> """ body = [] for (i, imFile) in enumerate(sorted(imageFiles)): pageMaterial = [] indent(level=1, reset=True) msg = f"{i + 1:>5} {imFile:<40}" info(f"{msg}\r", nl=False) page = Page(self, imFile, minimal=True) page.read(stage="word") pg = page.bare.lstrip("0") if pg == "": pg = "0" pg = int(pg) pageRep = f"p{pg:>03}" pageMaterial.append(f"""<div page="{pageRep}"><h3>{pg}</h3>""") if page.empty: pageMaterial.append("""</div>""") body.append("\n".join(pageMaterial)) continue stages = page.stages stage = "word" (prevStripe, prevBlock, prevLine) = (None, None, None) stripeMaterial = [] blockMaterial = [] lineMaterial = [] for fields in stages[stage]: (stripe, block, line) = fields[0:3] if stripe != prevStripe: if prevStripe is not None: stripeMaterial.append("</div>") pageMaterial.append("\n".join(stripeMaterial)) stripeMaterial = [] stripeMaterial.append(f"""<div class="stripe" stripe="{stripe}">""") prevBlock = None if block != prevBlock: if prevBlock is not None: blockMaterial.append("</div>") stripeMaterial.append("\n".join(blockMaterial)) blockMaterial = [] blockMaterial.append(f"""<div class="c{block}">""") prevLine = None if line != 
prevLine: if prevLine is not None: lineMaterial.append("</div>") blockMaterial.append(" ".join(lineMaterial)) lineMaterial = [] lineMaterial.append( f"""<div line="{line}"><span class="ln">{line}</span>""" ) (prevStripe, prevBlock, prevLine) = (stripe, block, line) word = fields[-2] punc = fields[-1] lineMaterial.append(f"{word}{punc}") blockMaterial.append(" ".join(lineMaterial)) stripeMaterial.append("\n".join(blockMaterial)) pageMaterial.append("\n".join(stripeMaterial)) pageMaterial.append("</div>") body.append("\n".join(pageMaterial)) indent(level=0) with open(path, "w") as f: f.write(doc.replace("«body»", "\n".join(body))) info(f"written to {path}") showPath = unexpanduser(f"{cd}{path}") nbLink = getNbLink(showPath, fileName) if nbLink is None: dh(showPath) else: dh(f"""{nbLink} (local file: {showPath})""")
Methods
def availableBands(self)
-
Display the characteristics of all defined bands.
def availableBands(self):
    """Display the characteristics of all defined *bands*."""

    tm = self.tm
    info = tm.info
    info("Available bands and their offsets", tm=False)
    for (band, offset) in sorted(self.offsetBand.items()):
        bandRep = f"«{band}»"
        info(
            f"\t{bandRep:<10}: top={offset[0]:>4}, bottom={offset[1]:>4}",
            tm=False
        )
def availableMarks(self, band=None, mark=None)
-
Display the characteristics of defined marks.
Parameters
band: string, optional `None`
    Show only marks in this band. If `None`, show marks in all bands.
mark: string, optional `None`
    Show only marks with this name. If `None`, show marks with any name.
def availableMarks(self, band=None, mark=None):
    """Display the characteristics of defined *marks*.

    Parameters
    ----------
    band: string, optional `None`
        Show only marks in this band.
        If `None`, show marks in all bands.
    mark: string, optional `None`
        Show only marks with this name.
        If `None`, show marks with any name.
    """

    C = self.C
    grey = C.greyGRS
    tm = self.tm
    info = tm.info
    marks = self.marks
    info("Marks and their settings", tm=False)
    for (bnd, markItems) in sorted(marks.items()):
        if band is not None and band != bnd:
            continue
        bandRep = f"[{bnd}]"
        info(f"\tband {bandRep}", tm=False)
        for (mrk, markInfo) in sorted(markItems.items()):
            if mark is not None and mark != mrk:
                continue
            markRep = f"«{mrk}»"
            seq = markInfo["seq"]
            acc = markInfo["accuracy"]
            bw = markInfo["connectBorder"]
            r = markInfo["connectRatio"]
            info(
                f"\t\t{seq:>3}: {markRep:<20} acc={acc}, bw={bw}, r={r}",
                tm=False,
            )
            markImage = reborder(markInfo["gray"], 2, grey)
            showImage(markImage)
def availablePages(self)
-
Display the amount and page numbers of all pages.
def availablePages(self):
    """Display the amount and page numbers of all pages."""

    tm = self.tm
    info = tm.info
    allPages = self.allPages
    pagesDesc = self.allPagesDesc
    info(f"{len(allPages)} pages: {pagesDesc}")
def configure(self, reset=False, **params)
-
Updates current settings based on new values.
The signature is the same as Config.configure().
def configure(self, reset=False, **params):
    """Updates current settings based on new values.

    The signature is the same as `fusus.parameters.Config.configure`.
    """

    self.C.configure(reset=reset, **params)
    self._applySettings()
def error(self, msg)
-
def error(self, msg):
    tm = self.tm
    error = tm.error
    error(msg)
def exportTsv(self, pages=None)
-
Combine the tsv data per page to one big tsv file.
pages: string | int, optional `None`
    Specification of pages to do. If absent or `None`: all pages. If an int, do only that page. Otherwise it must be a comma separated string of (ranges of) page numbers. Half ranges are also allowed: `-10` (from beginning up to and including `10`) and `10-` (from 10 till end). E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`. No spaces allowed.

The output is written to the working directory.
def exportTsv(self, pages=None):
    """Combine the tsv data per page to one big tsv file.

    pages: string | int, optional `None`
        Specification of pages to do. If absent or `None`: all pages.
        If an int, do only that page.
        Otherwise it must be a comma separated string of (ranges of) page numbers.
        Half ranges are also allowed: `-10` (from beginning up to and including `10`)
        and `10-` (from 10 till end).
        E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`.
        No spaces allowed.

    The output is written to the working directory.
    """

    tm = self.tm
    info = tm.info
    allPages = self.allPages
    imageFiles = select(allPages, pages)
    pagesFile = ALL_PAGES if pages is None else pagesRep(imageFiles)
    pagesDesc = pagesRep(imageFiles)
    info(f"Batch of {len(imageFiles)} pages: {pagesDesc}")
    info("Start producing single TSV file of these pages")

    path = f"{pagesFile}.tsv"
    first = True
    f = None

    for (i, imFile) in enumerate(sorted(imageFiles)):
        page = Page(self, imFile, minimal=True)
        stages = page.stages
        stage = "word"

        if first:
            headers = page.dataHeaders.get(stage, None)
            header = "\t".join(str(head) for head in headers)
            f = open(path, "w")
            f.write(f"{header}\n")
            first = False

        pageNum = int(page.bare.lstrip("0") or "0")
        page.read(stage=stage)
        data = stages[stage]
        if not data:
            continue
        for fields in stages[stage]:
            f.write(f"{pageNum}\t" + "\t".join(str(f) for f in fields) + "\n")
    if f:
        info(f"written to {path}")
        f.close()
    else:
        info("Nothing written")
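The way exportTsv prefixes every row with its page number can be tried out in isolation. The following is a minimal sketch, not the fusus implementation: the helper names `page_number` and `combine_tsv` are hypothetical, the per-page rows are passed in as plain tuples instead of being read from `Page` objects, and the header is assumed to already contain a leading page column.

```python
import io


def page_number(bare):
    """Turn a zero-padded page name such as "0048" into the integer 48."""
    return int(bare.lstrip("0") or "0")


def combine_tsv(pages, headers, out):
    """Write one header line, then every row prefixed with its page number.

    pages maps a page number to a list of rows (tuples of fields).
    Returns the number of data rows written.
    """
    out.write("\t".join(str(head) for head in headers) + "\n")
    written = 0
    for page_num in sorted(pages):
        for fields in pages[page_num]:
            row = "\t".join(str(field) for field in fields)
            out.write(f"{page_num}\t{row}\n")
            written += 1
    return written


buffer = io.StringIO()
count = combine_tsv(
    {2: [("a", 90)], 1: [("b", 80), ("c", 70)]},
    ("page", "word", "conf"),
    buffer,
)
print(count)
print(buffer.getvalue(), end="")
```

Writing to any file-like object keeps the sketch testable without touching the working directory.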
def htmlPages(self, pages=None)
-
Get the text in html from the ocr output in one file
pages: string | int, optional `None`
    Specification of pages to do. If absent or `None`: all pages. If an int, do only that page. Otherwise it must be a comma separated string of (ranges of) page numbers. Half ranges are also allowed: `-10` (from beginning up to and including `10`) and `10-` (from 10 till end). E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`. No spaces allowed.

The output is written to the `text` subdirectory.
def htmlPages(self, pages=None):
    """Get the text in html from the ocr output in one file

    pages: string | int, optional `None`
        Specification of pages to do. If absent or `None`: all pages.
        If an int, do only that page.
        Otherwise it must be a comma separated string of (ranges of) page numbers.
        Half ranges are also allowed: `-10` (from beginning up to and including `10`)
        and `10-` (from 10 till end).
        E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`.
        No spaces allowed.

    The output is written to the `text` subdirectory.
    """

    tm = self.tm
    info = tm.info
    indent = tm.indent
    C = self.C
    cd = self.cd
    htmlDir = C.htmlDir

    if not os.path.exists(htmlDir):
        os.makedirs(htmlDir, exist_ok=True)

    allPages = self.allPages
    imageFiles = select(allPages, pages)
    pagesDesc = pagesRep(imageFiles)
    info(f"Batch of {len(imageFiles)} pages: {pagesDesc}")
    info("Start producing plain text of these pages")

    page = None
    fileName = f"{pagesDesc}.html"
    path = f"{htmlDir}/{fileName}"

    # the opening <body> tag is added here so that the closing tag is balanced
    doc = """\
<html>
    <head>
        <meta charset="utf-8"/>
        <style>
body {
    font-size: x-large;
    text-align: right;
    direction: rtl;
}
div.page {
    text-align: right;
}
div.stripe {
    display: flex;
    flex-flow: row nowrap;
}
div.c, div.cl, div.cr {
    text-align: right;
}
h3 {
    text-align: right;
}
span.ln {
    font-style: italic;
    font-size: small;
    vertical-align: super;
    text-align: right;
}
        </style>
    </head>
    <body>
«body»
    </body>
</html>
"""
    body = []

    for (i, imFile) in enumerate(sorted(imageFiles)):
        pageMaterial = []
        indent(level=1, reset=True)
        msg = f"{i + 1:>5} {imFile:<40}"
        info(f"{msg}\r", nl=False)
        page = Page(self, imFile, minimal=True)
        page.read(stage="word")
        pg = page.bare.lstrip("0")
        if pg == "":
            pg = "0"
        pg = int(pg)
        pageRep = f"p{pg:>03}"
        pageMaterial.append(f"""<div page="{pageRep}"><h3>{pg}</h3>""")
        if page.empty:
            pageMaterial.append("""</div>""")
            body.append("\n".join(pageMaterial))
            continue

        stages = page.stages
        stage = "word"

        (prevStripe, prevBlock, prevLine) = (None, None, None)
        stripeMaterial = []
        blockMaterial = []
        lineMaterial = []

        for fields in stages[stage]:
            (stripe, block, line) = fields[0:3]
            if stripe != prevStripe:
                if prevStripe is not None:
                    stripeMaterial.append("</div>")
                    pageMaterial.append("\n".join(stripeMaterial))
                stripeMaterial = []
                stripeMaterial.append(f"""<div class="stripe" stripe="{stripe}">""")
                prevBlock = None
            if block != prevBlock:
                if prevBlock is not None:
                    blockMaterial.append("</div>")
                    stripeMaterial.append("\n".join(blockMaterial))
                blockMaterial = []
                blockMaterial.append(f"""<div class="c{block}">""")
                prevLine = None
            if line != prevLine:
                if prevLine is not None:
                    lineMaterial.append("</div>")
                    blockMaterial.append(" ".join(lineMaterial))
                lineMaterial = []
                lineMaterial.append(
                    f"""<div line="{line}"><span class="ln">{line}</span>"""
                )
            (prevStripe, prevBlock, prevLine) = (stripe, block, line)
            word = fields[-2]
            punc = fields[-1]
            lineMaterial.append(f"{word}{punc}")
        blockMaterial.append(" ".join(lineMaterial))
        stripeMaterial.append("\n".join(blockMaterial))
        pageMaterial.append("\n".join(stripeMaterial))
        pageMaterial.append("</div>")
        body.append("\n".join(pageMaterial))

    indent(level=0)
    with open(path, "w") as f:
        f.write(doc.replace("«body»", "\n".join(body)))
    info(f"written to {path}")
    showPath = unexpanduser(f"{cd}{path}")
    nbLink = getNbLink(showPath, fileName)
    if nbLink is None:
        dh(showPath)
    else:
        dh(f"""{nbLink} (local file: {showPath})""")
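The nesting of stripes, blocks and lines that htmlPages produces can also be expressed with `itertools.groupby`. The sketch below is an alternative formulation for illustration only, not the code fusus runs: the function name `page_html` is hypothetical, and rows are assumed to be tuples of the form (stripe, block, line, ..., word, punctuation), already sorted in reading order.

```python
from html import escape
from itertools import groupby


def page_html(rows):
    """Render word rows as nested stripe > block > line <div> elements."""
    parts = []
    for stripe, in_stripe in groupby(rows, key=lambda r: r[0]):
        parts.append(f'<div class="stripe" stripe="{stripe}">')
        for block, in_block in groupby(in_stripe, key=lambda r: r[1]):
            parts.append(f'<div class="c{block}">')
            for line, in_line in groupby(in_block, key=lambda r: r[2]):
                # word is the second-to-last field, punctuation the last one
                words = " ".join(escape(f"{r[-2]}{r[-1]}") for r in in_line)
                parts.append(
                    f'<div line="{line}"><span class="ln">{line}</span> {words}</div>'
                )
            parts.append("</div>")  # closes the block
        parts.append("</div>")  # closes the stripe
    return "\n".join(parts)


rows = [
    (1, "r", 1, "foo", ","),
    (1, "r", 1, "bar", "."),
    (1, "r", 2, "baz", ""),
]
print(page_html(rows))
```

Because every group is opened and closed in the same loop body, each `<div>` is guaranteed to have a matching `</div>`.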
def info(self, msg)
-
def info(self, msg):
    tm = self.tm
    info = tm.info
    info(msg)
def measureQuality(self, pages=None, showStats=True, updateProofs=False)
-
Measure the reported quality of the ocr processing.
pages: string | int, optional `None`
    Specification of pages to do. If absent or `None`: all pages. If an int, do only that page. Otherwise it must be a comma separated string of (ranges of) page numbers. Half ranges are also allowed: `-10` (from beginning up to and including `10`) and `10-` (from 10 till end). E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`. No spaces allowed.
showStats: boolean, optional `True`
    Compute and show quality statistics.
updateProofs: boolean, optional `False`
    If true, regenerate all proofing pages. This is desirable if you have tweaked the coloring of OCR results depending on the confidence. The OCR itself does not have to be performed again for this.
def measureQuality(self, pages=None, showStats=True, updateProofs=False):
    """Measure the reported quality of the ocr processing.

    pages: string | int, optional `None`
        Specification of pages to do. If absent or `None`: all pages.
        If an int, do only that page.
        Otherwise it must be a comma separated string of (ranges of) page numbers.
        Half ranges are also allowed: `-10` (from beginning up to and including `10`)
        and `10-` (from 10 till end).
        E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`.
        No spaces allowed.
    showStats: boolean, optional `True`
        Compute and show quality statistics
    updateProofs: boolean, optional `False`
        If true, regenerate all proofing pages.
        This is desirable if you have tweaked the coloring of OCR results
        depending on the confidence.
        The OCR itself does not have to be performed again for this.
    """

    tm = self.tm
    info = tm.info
    indent = tm.indent
    cd = self.cd
    allPages = self.allPages
    imageFiles = select(allPages, pages)
    pagesDesc = pagesRep(imageFiles)
    info(f"Batch of {len(imageFiles)} pages: {pagesDesc}")
    info("Start measuring ocr quality of these images")
    if updateProofs:
        info(" and regenerating proof files")

    page = None
    results = dict(char=[], word=[])
    resultsChar = collections.defaultdict(list)

    for (i, imFile) in enumerate(sorted(imageFiles)):
        indent(level=1, reset=True)
        msg = f"{i + 1:>5} {imFile:<40}"
        info(f"{msg}\r", nl=False)
        page = Page(self, imFile, minimal=True)
        page.read(stage=("normalized,line," if updateProofs else "") + "word,char")
        if page.empty:
            continue
        if updateProofs:
            page.proofing()
        if not showStats:
            continue
        stages = page.stages
        pg = page.bare
        pageRep = f"p{pg}"
        for stage in ("word", "char"):
            proofStage = f"proof{stage}"
            thisPageRep = f"""<a href="{page.stagePath(proofStage)}">p{pg}</a>"""
            isCharStage = stage == "char"
            # the confidence is the second-to-last field for chars,
            # the third-to-last field for words
            confI = -2 if isCharStage else -3
            n = 0
            totC = 0
            (minC, maxC) = (100, 0)
            for fields in stages[stage]:
                conf = int(fields[confI])
                totC += conf
                if conf < minC:
                    minC = conf
                if conf > maxC:
                    maxC = conf
                n += 1
                if isCharStage:
                    c = fields[-1]
                    resultsChar[c].append((pageRep, conf))
            if n > 0:
                results[stage].append((thisPageRep, n, minC, maxC, totC, ""))

    if not showStats:
        indent(level=0)
        info("all done")
        return

    for stage in ("word", "char"):
        stageResults = results[stage]
        if not len(stageResults):
            continue
        grandN = sum(r[1] for r in stageResults)
        grandMin = min(r[2] for r in stageResults)
        grandMax = max(r[3] for r in stageResults)
        grandTot = sum(r[4] for r in stageResults)
        toShow = [
            ("overall", grandN, grandMin, grandMax, grandTot, "")
        ] + stageResults
        info(f"{stage}-confidences of OCR results for {len(stageResults)} pages")
        showConf(stage, toShow)

    info(f"by-char-confidences of OCR results for {len(resultsChar)} characters")
    resultsCollected = []
    (sDir, sTrail, sExt) = self.stageDir("proofchar")
    showPath = unexpanduser(f"{cd}{sDir}")
    (isLink, nbLink) = getNbPath(showPath)

    for c in sorted(resultsChar):
        # sort occurrences by confidence, worst first
        occs = sorted(resultsChar[c], key=lambda x: x[1])
        elem = "a" if isLink else "span"
        href = nbLink if isLink else None
        worstExamples = []
        for x in occs[0:20]:
            href = (
                f'''href="{nbLink}/{x[0][1:]}{sTrail}.{sExt}"''' if isLink else ""
            )
            worstExamples.append(
                f"""
<{elem}
    style="background-color: {getProofColor(x[1])};"
    title="{showPath}/{x[0][1:]}{sTrail}.{sExt}"
    {href}
>{x[0]}</{elem}>
"""
            )
        worstExamples = " ".join(worstExamples)
        nOccs = len(occs)
        minC = min(r[1] for r in occs)
        maxC = max(r[1] for r in occs)
        totC = sum(r[1] for r in occs)
        resultsCollected.append((f"⌊{c}⌋", nOccs, minC, maxC, totC, worstExamples))
    showConf(stage, resultsCollected, label="worst results")

    indent(level=0)
    info("all done")
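The statistics that measureQuality collects boil down to a count, minimum, maximum and total of confidences per page, which are then combined into an overall row. The following standalone sketch illustrates that aggregation under simplifying assumptions: the function names `summarize` and `combine` are hypothetical, confidences are plain integers between 0 and 100, and the page labels are made up.

```python
def summarize(confidences):
    """Return (n, min, max, total) for a list of confidences, or None if empty."""
    if not confidences:
        return None
    return (len(confidences), min(confidences), max(confidences), sum(confidences))


def combine(summaries):
    """Merge per-page summaries into one overall (n, min, max, total) tuple."""
    rows = [s for s in summaries if s is not None]
    if not rows:
        return None
    return (
        sum(r[0] for r in rows),
        min(r[1] for r in rows),
        max(r[2] for r in rows),
        sum(r[3] for r in rows),
    )


# hypothetical word confidences for two pages
pages = {"p001": [90, 75, 100], "p002": [60, 95]}
per_page = {name: summarize(confs) for (name, confs) in pages.items()}
overall = combine(per_page.values())
(n, lowest, highest, total) = overall
print(f"overall: n={n} min={lowest} max={highest} average={total / n:.1f}")
```

Keeping the total rather than the average per page means the overall average can be computed exactly, without weighting page averages afterwards.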
def process(self, pages=None, batch=True, quiet=True, boxed=False, doOcr=True, uptoLayout=False, **kwargs)
-
Process directory of images.
Executes all processing steps for all images.
Parameters
pages: string | int, optional `None`
    Specification of pages to do. If absent or `None`: all pages. If an int, do only that page. Otherwise it must be a comma separated string of (ranges of) page numbers. Half ranges are also allowed: `-10` (from beginning up to and including `10`) and `10-` (from 10 till end). E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`. No spaces allowed.
batch: boolean, optional `True`
    Whether to run in batch mode. In batch mode everything is geared to the final output. Fewer intermediate results are computed and stored, and less feedback appears on the console.
boxed: boolean, optional `False`
    If in batch mode, also produce images that display the cleaned marks in boxes.
quiet: boolean, optional `True`
    Whether to suppress warnings and the display of stroke separators.
doOcr: boolean, optional `True`
    Whether to perform OCR processing.
uptoLayout: boolean, optional `False`
    Whether to stop after doing layout.
Returns
    A `fusus.page.Page` object for the last page processed, which is the handle for further inspection of what has happened during processing.
def process( self, pages=None, batch=True, quiet=True, boxed=False, doOcr=True, uptoLayout=False, **kwargs, ): """Process directory of images. Executes all processing steps for all images. Parameters ---------- pages: string | int, optional `None` Specification of pages to do. If absent or `None`: all pages. If an int, do only that page. Otherwise it must be a comma separated string of (ranges of) page numbers. Half ranges are also allowed: `-10` (from beginning up to and including `10`) and `10-` (from 10 till end). E.g. `1` and `5-7` and `2-5,8-10`, and `-10,15-20,30-`. No spaces allowed. batch: boolean, optional `True` Whether to run in batch mode. In batch mode everything is geared to the final output. Less intermediate results are computed and stored. Less feedback happens on the console. boxed: boolean, optional `False` If in batch mode, produce also images that display the cleaned marks in boxes. quiet: boolean, optional `True` Whether to suppress warnings and the display of stroke separators. doOcr: boolean, optional `True` Whether to perform OCR processing uptoLayout: boolean, optional `False` Whether to stop after doing layout Returns ------- A `fusus.page.Page` object for the last page processed, which is the handle for further inspection of what has happened during processing. 
""" tm = self.tm info = tm.info indent = tm.indent allPages = self.allPages tm.silentOff() indent(reset=True) C = self.C interDir = C.interDir outDir = C.outDir cleanDir = C.cleanDir proofDir = C.proofDir htmlDir = C.htmlDir for d in (interDir, outDir, cleanDir, proofDir, htmlDir): if not os.path.exists(d): os.makedirs(d, exist_ok=True) imageFiles = select(allPages, pages) pagesDesc = pagesRep(imageFiles) info(f"Batch of {len(imageFiles)} pages: {pagesDesc}") info("Start batch processing images") page = None for (i, imFile) in enumerate(sorted(imageFiles)): indent(level=1, reset=True) msg = f"{i + 1:>5} {imFile:<40}" info(f"{msg}\r", nl=False) page = self._doPage( imFile, batch=batch, boxed=boxed, quiet=quiet, doOcr=doOcr, uptoLayout=uptoLayout, **kwargs, ) if not page.empty: page.write(stage="normalized,histogram,clean", perBlock=False) if uptoLayout: info(f"{msg}") else: if not batch: page.write(stage="markData") if boxed: page.write(stage="boxed") info(f"{msg}") indent(level=0) info("all done") return page # the last page processed
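The page specification syntax accepted by the `pages` parameter can be illustrated with a small standalone parser. This is a sketch of the syntax as documented, not the `select` function that fusus itself uses: the name `parse_pages` is hypothetical, and pages are assumed to be plain integers rather than image file names.

```python
def parse_pages(spec, all_pages):
    """Return the sorted page numbers from all_pages that spec selects.

    spec is None (all pages), an int (that page only), or a string such as
    "-10,15-20,30-" made of comma separated pages and (half) ranges.
    """
    pages = sorted(all_pages)
    if spec is None or not pages:
        return pages
    if isinstance(spec, int):
        return [p for p in pages if p == spec]

    (lowest, highest) = (pages[0], pages[-1])
    wanted = set()
    for part in spec.split(","):
        if "-" in part:
            (start, end) = part.split("-", 1)
            # a missing start or end makes it a half range
            first = int(start) if start else lowest
            last = int(end) if end else highest
        else:
            first = last = int(part)
        wanted.update(range(first, last + 1))
    return [p for p in pages if p in wanted]


print(parse_pages("2-5,8-10", range(1, 41)))
print(parse_pages("-3,38-", range(1, 41)))
```

Only pages that actually exist are returned, so a range that extends beyond the available pages is silently clipped.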
def showSettings(self, params=None)
-
Display settings.
Parameters
params: dict, optional
    Any number of customizable settings from `fusus.parameters.SETTINGS`.
    The current values of the given parameters will be displayed. The values that you give to the `params` here are not used, only their names. It is recommended to pass `None` as values:
    B.showSettings(blurX=None, blurY=None)
def showSettings(self, params=None):
    """Display settings.

    Parameters
    ----------
    params: dict, optional
        Any number of customizable settings from `fusus.parameters.SETTINGS`.

        The current values of the given parameters will be displayed.
        The values that you give to the `params` here are not used,
        only their names. It is recommended to pass `None` as values:

        `B.showSettings(blurX=None, blurY=None)`
    """

    self.C.show(params=params)
def stageDir(self, stage)
-
def stageDir(self, stage):
    C = self.C
    (stageType, stageColor, stageExt, stageDir, stagePart) = C.stages[stage]
    trail = stage if stagePart is None else "" if not stagePart else stagePart
    trail = "" if not trail else f"-{trail}"
    return (getattr(C, stageDir or "interDir"), trail, stageExt)
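The chained conditional that computes the file name trail in stageDir is compact and easy to misread. The sketch below spells out the same three cases as a standalone function; the name `stage_trail` is hypothetical, and the stage part values used in the calls are assumptions for illustration, not the values configured in fusus.

```python
def stage_trail(stage, stage_part):
    """Compute the suffix that is appended to a page name for a given stage.

    stage_part None  -> the stage name itself is used as trail
    stage_part empty -> no trail at all
    otherwise        -> stage_part is used as trail
    """
    if stage_part is None:
        trail = stage
    elif not stage_part:
        trail = ""
    else:
        trail = stage_part
    # a non-empty trail is separated from the page name by a hyphen
    return f"-{trail}" if trail else ""


print(repr(stage_trail("proofchar", None)))
print(repr(stage_trail("clean", "")))
print(repr(stage_trail("proofchar", "char")))
```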
def warning(self, msg)
-
def warning(self, msg):
    tm = self.tm
    warning = tm.warning
    warning(msg)