Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TIKA-4097 #1226

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -41,6 +43,8 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.file.Counters;
import org.apache.commons.io.file.PathUtils;
import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,10 +81,18 @@ public class PipesClient implements Closeable {
private DataOutputStream output;
private DataInputStream input;
private int filesProcessed = 0;
//this is the client-specific subdirectory of the pipesConfig's getPipesTmpDir
final Path tmpDir;

public PipesClient(PipesConfigBase pipesConfig) {
this.pipesConfig = pipesConfig;
this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
try {
tmpDir = Files.createTempDirectory(pipesConfig.getPipesTmpDir(),
"client-" + this.pipesClientId + "-");
} catch (IOException e) {
throw new RuntimeException("Couldn't create temp dir?!", e);
}
}

public int getFilesProcessed() {
Expand Down Expand Up @@ -277,6 +289,19 @@ private void destroyForcibly() throws InterruptedException {
if (process.isAlive()) {
LOG.error("Process still alive after {}ms", WAIT_ON_DESTROY_MS);
}
// Delete only this client's own temp directory. The directory returned by
// pipesConfig.getPipesTmpDir() is the parent of every client's tmpDir, so deleting
// it here would remove the temp files of clients that are still running.
try {
    if (Files.isDirectory(tmpDir)) {
        LOG.debug("about to delete the client temp directory: {}",
                tmpDir.toAbsolutePath());
        Counters.PathCounters pathCounters = PathUtils.deleteDirectory(tmpDir);
        LOG.debug("Successfully deleted {} temporary files in {} directories",
                pathCounters.getFileCounter().get(),
                pathCounters.getDirectoryCounter().get());
    }
} catch (IllegalArgumentException | IOException e) {
    // best effort: a failed cleanup must not prevent the process from being destroyed
    LOG.warn("Failed to delete temporary directory: " + tmpDir.toAbsolutePath(), e);
}
}

private PipesResult readResults(FetchEmitTuple t, long start) throws IOException {
Expand Down Expand Up @@ -420,6 +445,9 @@ private void restart() throws IOException, InterruptedException, TimeoutExceptio
}
executorService = Executors.newFixedThreadPool(1);
}
if (! Files.isDirectory(tmpDir)) {
Files.createDirectories(tmpDir);
}
LOG.info("pipesClientId={}: restarting process", pipesClientId);
} else {
LOG.info("pipesClientId={}: starting process", pipesClientId);
Expand Down Expand Up @@ -524,6 +552,11 @@ private String[] getCommandline() {
origGCString = arg;
newGCLogString = arg.replace("${pipesClientId}", "id-" + pipesClientId);
}
// java.io.tmpdir is reserved: this class appends its own -Djava.io.tmpdir
// (pointing at the client's tmpDir) to the forked process's command line.
if (arg.startsWith("-Djava.io.tmpdir=")) {
    throw new IllegalArgumentException("Can't specify java.io.tmpdir in jvmargs. Set " +
            "the overall tmpdir for all async process and its forked processes in the " +
            "<pipesTmpDir/> attribute.");
}
}

if (origGCString != null && newGCLogString != null) {
Expand Down Expand Up @@ -552,6 +585,7 @@ private String[] getCommandline() {
"-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml");
}
commandLine.add("-DpipesClientId=" + pipesClientId);
commandLine.add("-Djava.io.tmpdir=" + ProcessUtils.escapeCommandLine(tmpDir.toAbsolutePath().toString()));
commandLine.addAll(configArgs);
commandLine.add("org.apache.tika.pipes.PipesServer");
commandLine.add(ProcessUtils.escapeCommandLine(
Expand Down
23 changes: 23 additions & 0 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.tika.pipes;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand Down Expand Up @@ -59,6 +61,8 @@ public class PipesConfigBase extends ConfigBase {
private Path tikaConfig;
private String javaPath = "java";

private Path pipesTmpDir = null;

/**
 * @return the current value of the {@code timeoutMillis} setting, in milliseconds
 */
public long getTimeoutMillis() {
return timeoutMillis;
}
Expand Down Expand Up @@ -171,4 +175,23 @@ public long getSleepOnStartupTimeoutMillis() {
/**
 * Stores the {@code sleepOnStartupTimeoutMillis} setting.
 *
 * @param sleepOnStartupTimeoutMillis new value, in milliseconds
 */
public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) {
this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis;
}

/**
 * String-based variant of {@link #setPipesTmpDirPath(Path)}: converts the given
 * string to a {@link Path} with {@link Paths#get(String, String...)} and delegates.
 *
 * @param pipesTmpDir path string of the root temp directory; must not be {@code null}
 */
public void setPipesTmpDir(String pipesTmpDir) {
setPipesTmpDirPath(Paths.get(pipesTmpDir));
}

/**
 * Stores the root temp directory. Nothing is created on disk here; the directory
 * is created on demand by {@link #getPipesTmpDir()}.
 *
 * @param pipesTmpDir root temp directory; if {@code null}, {@link #getPipesTmpDir()}
 *                    will create a new temp directory instead
 */
public void setPipesTmpDirPath(Path pipesTmpDir) {
this.pipesTmpDir = pipesTmpDir;
}

/**
 * Returns the root temp directory, making sure it exists.
 * <p>
 * If no directory has been configured, a new one with the prefix
 * {@code tika-pipes-tmp-dir-} is created and remembered for later calls.
 * If one has been configured but is not currently a directory, it is created
 * (including missing parents).
 *
 * @return the root temp directory
 * @throws IOException if the directory cannot be created
 */
public Path getPipesTmpDir() throws IOException {
    if (pipesTmpDir == null) {
        // nothing configured: fall back to a fresh directory and cache it
        pipesTmpDir = Files.createTempDirectory("tika-pipes-tmp-dir-");
        return pipesTmpDir;
    }
    final boolean alreadyThere = Files.isDirectory(pipesTmpDir);
    if (!alreadyThere) {
        Files.createDirectories(pipesTmpDir);
    }
    return pipesTmpDir;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
Expand All @@ -30,6 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.io.file.Counters;
import org.apache.commons.io.file.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -82,6 +86,26 @@ public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws T
asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1);
this.executorCompletionService =
new ExecutorCompletionService<>(executorService);

final Path tmpDir = asyncConfig.getPipesTmpDir();
final List<FetchEmitWorker> workers = new ArrayList<>();

for (int i = 0; i < asyncConfig.getNumClients(); i++) {
workers.add(new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData));
}

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
executorService.shutdownNow();
for (FetchEmitWorker worker : workers) {
try {
worker.close();
} catch (IOException e) {
LOG.warn("Exception closing worker", e);
}
}
cleanTmpDir(tmpDir);
}));

try {
if (!tikaConfigPath.toAbsolutePath().equals(asyncConfig.getTikaConfig().toAbsolutePath())) {
LOG.warn("TikaConfig for AsyncProcessor ({}) is different " +
Expand All @@ -105,9 +129,8 @@ public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws T
startCounter((TotalCounter) pipesIterator);
}

for (int i = 0; i < asyncConfig.getNumClients(); i++) {
executorCompletionService.submit(
new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData));
for (FetchEmitWorker worker : workers) {
executorCompletionService.submit(worker);
}

EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig());
Expand Down Expand Up @@ -252,30 +275,53 @@ public synchronized boolean checkActive() throws InterruptedException {
/**
 * Shuts down the executor, closes the pipes reporter and deletes the temp directory.
 * <p>
 * The temp directory is cleaned in a {@code finally} block so that a failure while
 * closing the reporter does not leave temp files behind.
 *
 * @throws IOException if closing the reporter or resolving the temp directory fails
 */
public void close() throws IOException {
    executorService.shutdownNow();
    try {
        asyncConfig.getPipesReporter().close();
    } finally {
        cleanTmpDir(asyncConfig.getPipesTmpDir());
    }
}

/**
 * Best-effort recursive delete of the given temp directory.
 * <p>
 * Does nothing when {@code tmpDir} is {@code null} or is not an existing directory.
 * Failures are logged and never propagated.
 *
 * @param tmpDir directory to delete; may be {@code null}
 */
private static void cleanTmpDir(Path tmpDir) {
    if (tmpDir == null || !Files.isDirectory(tmpDir)) {
        return;
    }
    try {
        LOG.debug("about to delete the full async temp directory: {}",
                tmpDir);
        final Counters.PathCounters deletionCounts = PathUtils.deleteDirectory(tmpDir);
        LOG.debug("Successfully deleted {} temporary files in {} directories",
                deletionCounts.getFileCounter().get(),
                deletionCounts.getDirectoryCounter().get());
    } catch (IOException ioe) {
        LOG.warn("Couldn't delete tmpDir: " + tmpDir, ioe);
    } catch (IllegalArgumentException iae) {
        LOG.debug("null tmpDir? " + tmpDir, iae);
    }
}

/**
 * @return the current value of the {@code totalProcessed} counter
 */
public long getTotalProcessed() {
return totalProcessed.get();
}

private class FetchEmitWorker implements Callable<Integer> {
private class FetchEmitWorker implements Callable<Integer>, Closeable {

private final AsyncConfig asyncConfig;
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitData> emitDataQueue;
private final PipesClient pipesClient;

/**
 * Stores the shared config and queues and creates this worker's {@link PipesClient}.
 * <p>
 * The client is constructed here, at worker construction time, rather than when
 * the worker is run.
 *
 * @param asyncConfig     configuration, also passed to the new {@link PipesClient}
 * @param fetchEmitTuples queue this worker polls for tuples
 * @param emitDataQueue   queue of emit data
 */
private FetchEmitWorker(AsyncConfig asyncConfig,
ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples,
ArrayBlockingQueue<EmitData> emitDataQueue) {
this.asyncConfig = asyncConfig;
this.fetchEmitTuples = fetchEmitTuples;
this.emitDataQueue = emitDataQueue;
// eager creation: the PipesClient exists before the worker is run
this.pipesClient = new PipesClient(asyncConfig);
}

@Override
public Integer call() throws Exception {

try (PipesClient pipesClient = new PipesClient(asyncConfig)) {
try {
while (true) {
FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS);
if (t == null) {
Expand Down Expand Up @@ -322,9 +368,15 @@ public Integer call() throws Exception {
totalProcessed.incrementAndGet();
}
}
} finally {
close();
}
}

/**
 * Closes this worker's {@link PipesClient}.
 *
 * @throws IOException if closing the client fails
 */
@Override
public void close() throws IOException {
    pipesClient.close();
}

private boolean shouldEmit(PipesResult result) {

if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||
Expand Down
Loading