#include "lm/common/compare.hh"
#include "lm/common/model_buffer.hh"
#include "lm/common/ngram.hh"
#include "util/stream/chain.hh"
#include "util/stream/multi_stream.hh"
#include "util/stream/sort.hh"
#include "lm/interpolate/split_worker.hh"
#include <boost/program_options.hpp>
#include <boost/version.hpp>
#include <iostream>
#if defined(_WIN32) || defined(_WIN64)
// Windows doesn't provide <unistd.h>,
// so we define the file descriptor constants we need here instead:
#define STDIN_FILENO 0
#define STDOUT_FILENO 1
#else // Huzzah for POSIX!
#include <unistd.h>
#endif
/*
* This is a simple example program that takes in intermediate
* suffix-sorted ngram files and outputs two sets of files: one for backoff
* probability values (raw numbers, in suffix order) and one for
* probability values (ngram id and probability, in *context* order)
*/
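/*
 * A typical invocation might look like the sketch below; the binary name
 * is an assumption for illustration (it is not defined in this file), and
 * the arguments shown are just the defaults declared in main():
 *
 *   ./streaming_example --ngrams ngrams \
 *                       --csortngrams csorted-ngrams \
 *                       --backoffs backoffs \
 *                       --tmpdir /tmp/
 *
 * Input is read from ngrams.1, ngrams.2, ...; output goes to
 * csorted-ngrams.N and backoffs.N for each order N.
 */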
int main(int argc, char *argv[]) {
using namespace lm::interpolate;
const std::size_t ONE_GB = 1 << 30;
const std::size_t SIXTY_FOUR_MB = 1 << 26;
const std::size_t NUMBER_OF_BLOCKS = 2;
std::string FILE_NAME = "ngrams";
std::string CONTEXT_SORTED_FILENAME = "csorted-ngrams";
std::string BACKOFF_FILENAME = "backoffs";
std::string TMP_DIR = "/tmp/";
try {
namespace po = boost::program_options;
po::options_description options("canhazinterp Pass-3 options");
options.add_options()
("help,h", po::bool_switch(), "Show this help message")
("ngrams,n", po::value<std::string>(&FILE_NAME), "ngrams file")
("csortngrams,c", po::value<std::string>(&CONTEXT_SORTED_FILENAME), "context sorted ngrams file")
("backoffs,b", po::value<std::string>(&BACKOFF_FILENAME), "backoffs file")
("tmpdir,t", po::value<std::string>(&TMP_DIR), "tmp dir");
    po::variables_map vm;
    po::store(po::parse_command_line(argc, argv, options), vm);
    // Display help
    if (vm["help"].as<bool>()) {
      std::cerr << "Usage: " << options << std::endl;
      return 1;
    }
    // Write the parsed values back into the bound variables (FILE_NAME,
    // CONTEXT_SORTED_FILENAME, BACKOFF_FILENAME, TMP_DIR); without this
    // call the string options above would have no effect.
    po::notify(vm);
}
catch(const std::exception &e) {
std::cerr << e.what() << std::endl;
return 1;
}
// The basic strategy here is to have three chains:
// - The first reads the ngram order inputs using ModelBuffer. Those are
// then stripped of their backoff values and fed into the third chain;
// the backoff values *themselves* are written to the second chain.
//
// - The second chain takes the backoff values and writes them out to a
// file (one for each order).
//
// - The third chain takes just the probability values and ngrams and
// writes them out, sorted in context-order, to a file (one for each
// order).
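  //
  // Sketched as a data flow, with one such pipeline per n-gram order
  // (i is the zero-based order index, so the files carry order N = i + 1):
  //
  //   ngrams.N -> ngram_inputs[i] -> SplitWorker -+-> backoff_chains[i] -> backoffs.N
  //                                               |
  //                                               +-> prob_chains[i] -> sort -> csorted-ngrams.N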
// This will be used to read in the binary intermediate files. There is
// one file per order (e.g. ngrams.1, ngrams.2, ...)
lm::ModelBuffer buffer(FILE_NAME);
  // Create separate chains for each ngram order for:
// - Input from the intermediate files
// - Output to the backoff file
// - Output to the (context-sorted) probability file
util::stream::Chains ngram_inputs(buffer.Order());
util::stream::Chains backoff_chains(buffer.Order());
util::stream::Chains prob_chains(buffer.Order());
for (std::size_t i = 0; i < buffer.Order(); ++i) {
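    // Input chain: each record is a complete (i + 1)-gram in the
    // NGram<ProbBackoff> layout (word ids plus probability and backoff).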
ngram_inputs.push_back(util::stream::ChainConfig(
lm::NGram<lm::ProbBackoff>::TotalSize(i + 1), NUMBER_OF_BLOCKS, ONE_GB));
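    // Backoff chain: one raw float (the backoff weight) per n-gram,
    // kept in the incoming suffix order.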
backoff_chains.push_back(
util::stream::ChainConfig(sizeof(float), NUMBER_OF_BLOCKS, ONE_GB));
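    // Probability chain: (i + 1) word ids followed by a single float
    // probability; these are the records that get context-sorted below.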
prob_chains.push_back(util::stream::ChainConfig(
sizeof(lm::WordIndex) * (i + 1) + sizeof(float), NUMBER_OF_BLOCKS,
ONE_GB));
}
// This sets the input for each of the ngram order chains to the
// appropriate file
buffer.Source(ngram_inputs);
util::FixedArray<util::scoped_ptr<SplitWorker> > workers(buffer.Order());
for (std::size_t i = 0; i < buffer.Order(); ++i) {
// Attach a SplitWorker to each of the ngram input chains, writing to the
// corresponding order's backoff and probability chains
workers.push_back(
new SplitWorker(i + 1, backoff_chains[i], prob_chains[i]));
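    // boost::ref keeps the chain from copying the worker; the chain then
    // runs the worker on its own thread as blocks flow through it.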
ngram_inputs[i] >> boost::ref(*workers.back());
}
util::stream::SortConfig sort_cfg;
sort_cfg.temp_prefix = TMP_DIR;
sort_cfg.buffer_size = SIXTY_FOUR_MB;
sort_cfg.total_memory = ONE_GB;
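  // temp_prefix is where any on-disk merge passes put their temporary files;
  // buffer_size is roughly how much is read from each block while merging,
  // and total_memory bounds the overall memory the sort may use.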
// This will parallel merge sort the individual order files, putting
// them in context-order instead of suffix-order.
//
  // Two new threads will be running, each owned by the prob_chains[i] object.
// - The first executes BlockSorter.Run() to sort the n-gram entries
// - The second executes WriteAndRecycle.Run() to write each sorted
// block to disk as a temporary file
util::stream::Sorts<lm::ContextOrder> sorts(buffer.Order());
for (std::size_t i = 0; i < prob_chains.size(); ++i) {
sorts.push_back(prob_chains[i], sort_cfg, lm::ContextOrder(i + 1));
}
// Set the sort output to be on the same chain
for (std::size_t i = 0; i < prob_chains.size(); ++i) {
    // The following call to Chain::Wait()
    // joins the threads owned by prob_chains[i],
    // so it won't return until all of those threads have completed.
    //
    // It also resets prob_chains[i] so that the chain can be reused
    // (including freeing the memory the chain previously used).
prob_chains[i].Wait();
// In an ideal world (without memory restrictions)
// we could merge all of the previously sorted blocks
// by reading them all completely into memory
// and then running merge sort over them.
//
    // In the real world, we have memory restrictions;
    // depending on how many blocks we have,
    // and how much memory we can use to read from each block
    // (sort_cfg.buffer_size),
    // it may be the case that we have insufficient memory
    // to read sort_cfg.buffer_size of data from each block from disk.
//
// If this occurs, then it will be necessary to perform one or more rounds
// of merge sort on disk;
// doing so will reduce the number of blocks that we will eventually
// need to read from
// when performing the final round of merge sort in memory.
//
// So, the following call determines whether it is necessary
// to perform one or more rounds of merge sort on disk;
// if such on-disk merge sorting is required, such sorting is performed.
//
// Finally, the following method launches a thread that calls
// OwningMergingReader.Run()
// to perform the final round of merge sort in memory.
//
    // Merge sort could have been invoked directly
    // so that merge sort memory doesn't coexist with Chain memory.
sorts[i].Output(prob_chains[i]);
}
// Create another model buffer for our output on e.g. csorted-ngrams.1,
// csorted-ngrams.2, ...
lm::ModelBuffer output_buf(CONTEXT_SORTED_FILENAME, true, false);
output_buf.Sink(prob_chains, buffer.Counts());
  // Create a third model buffer for our backoff output on e.g. backoffs.1,
  // backoffs.2, ...
lm::ModelBuffer boff_buf(BACKOFF_FILENAME, true, false);
boff_buf.Sink(backoff_chains, buffer.Counts());
  // Wait for each set of chains to finish:
  // Chains::Wait(true) loops over every chain in the collection,
  // calling Chain::Wait() on it (joining the threads that chain owns)
  // and releasing the chain's memory.
ngram_inputs.Wait(true);
backoff_chains.Wait(true);
prob_chains.Wait(true);
return 0;
}