8000 Support auto transform SOFATracer Span in SOFA ThreadPool by HzjNeverStop · Pull Request #190 · sofastack/sofa-common-tools · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Support auto transform SOFATracer Span in SOFA ThreadPool #190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.alipay.sofa.common</groupId>
<artifactId>sofa-common-tools</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand All @@ -30,6 +30,7 @@
<spring.boot.version>3.0.2</spring.boot.version>
<junit.version>4.13.1</junit.version>
<guava.version>27.0-jre</guava.version>
<sofa.tracer.version>4.0.0</sofa.tracer.version>
</properties>

<dependencyManagement>
Expand All @@ -50,6 +51,12 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>tracer-core</artifactId>
<version>${sofa.tracer.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.alipay.sofa.common.thread.space.SpaceNamedThreadFactory;
import com.alipay.sofa.common.utils.StringUtil;

import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,11 +34,12 @@
* @version SofaScheduledThreadPoolExecutor.java, v 0.1 2020年11月09日 2:19 下午 huzijie Exp $
*/
public class SofaScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final String SIMPLE_CLASS_NAME = SofaScheduledThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private static final String SIMPLE_CLASS_NAME = SofaScheduledThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private final ThreadPoolConfig config;
private final ThreadPoolStatistics statistics;
private boolean sofaTracerTransmit = false;

/**
* Basic constructor
Expand Down Expand Up @@ -193,4 +196,43 @@ public ThreadPoolConfig getConfig() {
public ThreadPoolStatistics getStatistics() {
return statistics;
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.schedule(command, delay, unit);
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
callable = SofaTracerCommandFactory.ofCallable(callable);
}
return super.schedule(callable, delay, unit);
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.scheduleAtFixedRate(command, initialDelay, period, unit);
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
* Created on 2020/3/16
*/
public class SofaThreadPoolExecutor extends ThreadPoolExecutor {
private static final String SIMPLE_CLASS_NAME = SofaThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private static final String SIMPLE_CLASS_NAME = SofaThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private final ThreadPoolConfig config;
private final ThreadPoolStatistics statistics;
private boolean sofaTracerTransmit = false;

/**
* Basic constructor
Expand Down Expand Up @@ -142,7 +143,8 @@ public SofaThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAl

@Override
public void execute(Runnable command) {
ExecutingRunnable runner = new ExecutingRunnable(command);
ExecutingRunnable runner = sofaTracerTransmit ? SofaTracerCommandFactory
.ofExecutingRunnable(command) : new ExecutingRunnable(command);
runner.setEnqueueTime(System.currentTimeMillis());
super.execute(runner);
}
Expand Down Expand Up @@ -220,4 +222,12 @@ public ThreadPoolStatistics getStatistics() {
private String createName() {
return SIMPLE_CLASS_NAME + String.format("%08x", POOL_COUNTER.getAndIncrement());
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;
}
}
5D39
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class SofaThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

protected long period;

protected boolean sofaTracerTransmit;

@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
Expand Down Expand Up @@ -73,6 +75,7 @@ public void execute(Runnable command) {
rejectedExecutionHandler, threadPoolName, spaceName, taskTimeout, period,
TimeUnit.MILLISECONDS);
}
executor.setSofaTracerTransmit(sofaTracerTransmit);

Boolean allowCoreThreadTimeOut = ClassUtil.getField("allowCoreThreadTimeOut", this);
if (allowCoreThreadTimeOut) {
Expand Down Expand Up @@ -144,4 +147,12 @@ public TimeUnit getTimeUnit() {
}
return sofaThreadPoolExecutor.getConfig().getTimeUnit();
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class SofaThreadPoolTaskScheduler extends ThreadPoolTaskScheduler {

protected long period;

protected boolean sofaTracerTransmit;

@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
Expand All @@ -54,6 +56,7 @@ protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
SofaScheduledThreadPoolExecutor executor = new SofaScheduledThreadPoolExecutor(
getPoolSize(), threadFactory, rejectedExecutionHandler, threadPoolName, spaceName,
taskTimeout, period, TimeUnit.MILLISECONDS);
executor.setSofaTracerTransmit(sofaTracerTransmit);

Boolean removeOnCancelPolicy = ClassUtil.getField("removeOnCancelPolicy", this);
if (removeOnCancelPolicy) {
Expand Down Expand Up @@ -126,4 +129,8 @@ public TimeUnit getTimeUnit() {
}
return sofaScheduledThreadPoolExecutor.getConfig().getTimeUnit();
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.common.thread;

import com.alipay.common.tracer.core.async.SofaTracerCallable;
import com.alipay.common.tracer.core.async.SofaTracerRunnable;
import com.alipay.sofa.common.utils.ClassUtil;

import java.util.concurrent.Callable;

/**
* Factory to create SOFA-Tracer work command.
* @author huzijie
* @version SofaTracerCommandFactory.java, v 0.1 2023年09月26日 2:53 PM huzijie Exp $
*/
public class SofaTracerCommandFactory {

private static final String SOFA_TRACER_RUNNABLE_CLASS_NAME = "com.alipay.common.tracer.core.async.SofaTracerRunnable";
private static final boolean SOFA_TRACER_CLASS_PRESENT = ClassUtil
.isPresent(
SOFA_TRACER_RUNNABLE_CLASS_NAME,
SofaTracerCommandFactory.class
.getClassLoader());

static ExecutingRunnable ofExecutingRunnable(Runnable runnable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return new ExecutingRunnable(runnable);
}
return new SofaTracerCommandFactory.SofaTracerExecutingRunnable(runnable);
}

static Runnable ofRunnable(Runnable runnable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return runnable;
}
if (runnable instanceof SofaTracerRunnable) {
return runnable;
}
return new SofaTracerRunnable(runnable);
}

static <V> Callable<V> ofCallable(Callable<V> callable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return callable;
}
if (callable instanceof SofaTracerCallable) {
return callable;
}
return new SofaTracerCallable<>(callable);
}

/**
* The wrapper to the {@link ExecutingRunnable} to transmit SofaTracerSpan.
* @author huzijie
* @version SofaTracerExecutingRunnable.java, v 0.1 2023年09月26日 11:45 AM huzijie Exp $
*/
public static class SofaTracerExecutingRunnable extends ExecutingRunnable {

private final SofaTracerRunnable sofaTracerRunnable;

public SofaTracerExecutingRunnable(Runnable originRunnable) {
super(originRunnable);
if (originRunnable instanceof SofaTracerRunnable) {
this.sofaTracerRunnable = (SofaTracerRunnable) originRunnable;
} else {
this.sofaTracerRunnable = new SofaTracerRunnable(originRunnable);
}
}

@Override
public void run() {
sofaTracerRunnable.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author huzijie
Expand Down Expand Up @@ -135,4 +137,81 @@ public void testLoggingBurst() throws Exception {
Assert.assertEquals(numThreads, aberrantListAppender.list.size());
Assert.assertTrue(isLastInfoMatch("Thread pool with name '\\S+' unregistered"));
}

@Test
public void testNoTracerTransmit() throws InterruptedException {
AtomicInteger success = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(1);
threadPool.schedule(() -> {
try {
assertTraceSpanNotExist();
success.incrementAndGet();
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
countDownLatch.await();
Assert.assertEquals(success.get(), 1);
}

@Test
public void testEnableTracerTransmit() throws InterruptedException {
threadPool.setSofaTracerTransmit(true);

AtomicInteger fail = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(1);
threadPool.schedule(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
threadPool.schedule(() -> {
try {
return assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
return null;
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
CountDownLatch fixRateCountDownLatch = new CountDownLatch(2);
threadPool.scheduleAtFixedRate(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
fixRateCountDownLatch.countDown();
}
}, 10, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(fixRateCountDownLatch.await(30, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
CountDownLatch fixDelayCountDownLatch = new CountDownLatch(2);
threadPool.scheduleWithFixedDelay(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
fixDelayCountDownLatch.countDown();
}
}, 10, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(fixDelayCountDownLatch.await(30, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);
}
}
Loading
0