java ee wildfly spring 在线程池的线程中注入
2023-09-14 09:08:27 时间
public class RtmpSpyingTests extends AbstractTransactionalJUnit4SpringContextTests { @Autowired ThreadPoolTaskExecutor rtmpSpyingTaskExecutor; @Autowired ApplicationContext ctx; @Autowired RtmpSourceRepository rtmpRep; @Test public void test() { RtmpSource rtmpSourceSample = new RtmpSource("test"); rtmpRep.save(rtmpSourceSample); rtmpRep.flush(); List<RtmpSource> rtmpSourceList = rtmpRep.findAll(); // Here I get a list containing rtmpSourceSample RtmpSpyingTask rtmpSpyingTask = ctx.getBean(RtmpSpyingTask.class, "arg1","arg2"); rtmpSpyingTaskExecutor.execute(rtmpSpyingTask); } } public class RtmpSpyingTask implements Runnable { @Autowired RtmpSourceRepository rtmpRep; String nameIdCh; String rtmpUrl; public RtmpSpyingTask(String nameIdCh, String rtmpUrl) { this.nameIdCh = nameIdCh; this.rtmpUrl = rtmpUrl; } public void run() { // Here I should get a list containing rtmpSourceSample, but instead of that // I get an empty list List<RtmpSource> rtmpSource = rtmpRep.findAll(); } } 应该用 @Service public class AsyncTransactionService { @Autowired RtmpSourceRepository rtmpRep; @Transactional(readOnly = true) public List<RtmpSource> getRtmpSources() { return rtmpRep.findAll(); } @Transactional(propagation = Propagation.REQUIRES_NEW) public void insertRtmpSource(RtmpSource rtmpSource) { rtmpRep.save(rtmpSource); } }
或者,把任务写成 Spring Bean 的内部类,这样任务可以直接使用外部类中已经注入的依赖:
package com.italktv.platform.audioDist.service;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.italktv.platform.audioDist.mongo.CustomerRepository;
import com.italktv.platform.audioDist.mongo.PlayUrl;
import com.italktv.platform.audioDist.mongo.PlayUrl.MyUrl;
import com.italktv.platform.audioDist.mongo.PlayUrlRepository;
import com.italktv.platform.audioDist.mysql.SubSet;
import com.italktv.platform.audioDist.mysql.UserRepository;
import com.italktv.platform.audioDist.task.MyTask;
import com.italktv.platform.audioDist.task.TaskManager;

/**
 * Scheduled component that reads the pending upload map and submits one
 * {@link MessagePublish} task per entry to the {@link TaskManager}.
 *
 * <p>{@code MessagePublish} is a non-static inner class on purpose: instances are
 * created with {@code new} (so Spring does not inject into them), but they can
 * still reach the dependencies injected into this enclosing bean.
 */
@Component
public class ScheduleJobs {

    private static final Logger log = LoggerFactory.getLogger(ScheduleJobs.class);

    /** One second, in milliseconds. */
    public final static long SECOND = 1 * 1000;

    LocalDateTime nowDate = LocalDateTime.now();

    // This means to get the bean called userRepository,
    // which is auto-generated by Spring; we will use it to handle the data.
    @Autowired
    private UserRepository userRepository;

    @Autowired
    private PlayUrlRepository repository;

    @Autowired
    private CustomerRepository cc;

    @Autowired
    private UserRepository user;

    @Autowired
    TaskManager taskManager;

    /**
     * Distribution domain recorded with every published URL.
     *
     * <p>Declared on the bean rather than on {@link MessagePublish}: tasks are
     * created with {@code new}, so a {@code @Value} field on the task itself is
     * never populated and would stay {@code null}. As a consequence the property
     * must now be resolvable when this bean is created.
     */
    @Value("${platform.audio.dist.domain}")
    private String domain;

    /** Runs every 400 seconds: records the start time and submits the pending tasks. */
    @Scheduled(fixedRate = SECOND * 400)
    public void fixedRateJob() {
        nowDate = LocalDateTime.now();
        log.info("=== start distribution: {}", nowDate);
        dotask();
    }

    // No init/destroy hooks are needed here: TaskManager declares its own
    // @PostConstruct init() and @PreDestroy destroy().

    /** Submits one {@link MessagePublish} task for every SubSet of every subject. */
    void dotask() {
        Map<Integer, List<SubSet>> uploads = userRepository.getUploadFileMap();
        for (Entry<Integer, List<SubSet>> subject : uploads.entrySet()) {
            int subjectId = subject.getKey();
            log.info(" subject id:{}", subjectId);

            for (SubSet item : subject.getValue()) {
                log.info(" sub:{}", item);
                taskManager.add(new MessagePublish(item.id, item.path));
            }
            // TODO: wait for the submitted tasks to finish
            // TODO: update subject status
        }
    }

    //////////////////////// inner class ////////////////////////

    /**
     * Upload task for a single part. Uses the enclosing bean's injected
     * {@code repository} and {@code domain}.
     *
     * <p>NOTE(review): this class is declared Serializable, but as a non-static
     * inner class it holds a reference to the enclosing ScheduleJobs, which is
     * not Serializable - verify whether serialization is actually needed.
     */
    public class MessagePublish extends MyTask implements Serializable {

        public MessagePublish() {
            super();
        }

        /**
         * @param id   part id of the file
         * @param name source file path; also used by TaskManager as the task key
         */
        public MessagePublish(int id, String name) {
            this.srcFile = name;
            this.partId = id;
        }

        /**
         * Simulates the upload with a 1-10 second sleep, then stores a PlayUrl record.
         *
         * @return {@code "ok"} when the record was saved, {@code "interrupted"} when
         *         the task was interrupted (for example by {@code Future.cancel(true)})
         *         before anything was saved
         */
        @Override
        public String call() {
            log.info("{} is uploading...", srcFile);
            try {
                // Placeholder for resolving the region the message is published to.
                TimeUnit.SECONDS.sleep(new Random().nextInt(10) + 1);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executor and stop: a cancelled
                // task must not go on to write its record.
                Thread.currentThread().interrupt();
                log.warn("{} upload interrupted", srcFile, e);
                return "interrupted";
            }
            log.info("{} uploaded.", srcFile);

            // 2. Record to MongoDB.
            PlayUrl play = new PlayUrl();
            play.programid = "programid fake" + "";
            play.domain = ScheduleJobs.this.domain;
            play.protocol = "HTTP";
            MyUrl myurl = new MyUrl();
            myurl.high = "http://xxx.xxx/xi//";
            play.url = myurl;
            repository.save(play);

            // TODO: if failed, retry and record the retry count.
            return "ok";
        }
    }
}

package com.italktv.platform.audioDist.task;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Owns a fixed pool of three threads and tracks the {@link Future} of every
 * submitted {@link MyTask}, keyed by the task's {@code srcFile}.
 *
 * <p>NOTE(review): {@code tasks} is a plain HashMap; confirm that add/cancel/
 * waitTaskDone are never called concurrently from different threads.
 */
@Component
public class TaskManager {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TaskManager.class);

    // Container-managed alternative, kept for reference:
    // @Resource(lookup = "java:comp/DefaultManagedScheduledExecutorService")
    // ManagedScheduledExecutorService executor;

    Map<String, Future<String>> tasks;

    ExecutorService executor;

    /** Creates the task map and the three-thread pool. */
    @PostConstruct
    public void init() {
        logger.info(" === init TaskManager===");
        tasks = new HashMap<String, Future<String>>();
        executor = Executors.newFixedThreadPool(3);
    }

    /**
     * Submits the task and remembers its Future under {@code task.srcFile}.
     * A later task with the same {@code srcFile} replaces the earlier map entry.
     */
    public void add(MyTask task) {
        logger.info("add delay:" + task.partId + task.srcFile);
        Future<String> future = executor.submit(task);
        tasks.put(task.srcFile, future);
    }

    /**
     * Cancels the task registered under {@code name}, interrupting it if running,
     * and forgets it.
     *
     * @return the result of {@code Future.cancel(true)}, or {@code false} when no
     *         task is registered under that name
     */
    public boolean cancel(String name) {
        logger.info("cancel " + name);
        Future<String> future = tasks.get(name);
        if (future == null) {
            logger.info("Not found name:" + name);
            return false;
        }
        boolean cancelled = future.cancel(true);
        logger.info("cancel " + name + ":" + cancelled);
        tasks.remove(name);
        return cancelled;
    }

    /**
     * Blocks until every tracked task has finished, logging each result.
     * A failed task is logged with its cause and the wait continues with the next
     * one; if the waiting thread is interrupted, the interrupt flag is restored
     * and the method returns immediately.
     */
    public void waitTaskDone() {
        Collection<Future<String>> futuretasks = tasks.values();
        for (Future<String> future : futuretasks) {
            logger.info("future done? {}", future.isDone());
            String result = "";
            try {
                result = future.get();
            } catch (InterruptedException e) {
                // Further get() calls would fail immediately once the flag is
                // restored, so stop waiting instead of looping.
                Thread.currentThread().interrupt();
                logger.error("interrupted while waiting for tasks.", e);
                return;
            } catch (ExecutionException e) {
                logger.error("future exec failed.", e);
            }
            logger.info("future done? {}", future.isDone());
            logger.info("result: {}", result);
        }
    }

    /**
     * Shuts the pool down: waits up to five seconds for running tasks, then
     * forces termination with {@code shutdownNow()}.
     */
    @PreDestroy
    public void destroy() {
        try {
            logger.info("attempt to shutdown executor");
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("tasks interrupted");
        } finally {
            if (!executor.isTerminated()) {
                logger.warn("cancel non-finished tasks");
            }
            executor.shutdownNow();
            logger.info("shutdown finished");
        }
    }
}

package com.italktv.platform.audioDist.task;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * Base class for work submitted to TaskManager; the String returned by
 * {@code call()} is the task's result.
 */
public abstract class MyTask implements Callable<String> {

    /** Source file of the task; TaskManager uses it as the lookup key. */
    protected String srcFile;

    /** Part id of the file handled by this task. */
    protected int partId;

    String programId;

    protected MyTask() {
    }
}
相关文章
- 小猴子吃了一堆桃,第一天吃了一半_Java猴子吃桃问题
- java编译命令是什么_Java编译命令整理
- java jce_了解Java JCE的加密
- 编写java判断闰年_用Java程序判断是否是闰年的简单实例[通俗易懂]
- Java基础测试「建议收藏」
- java是什么?java能用来干嘛?[通俗易懂]
- java和c语言哪个简单_Java编程和C语言哪个好学
- 安卓java游戏模拟器_Java手机游戏模拟器
- MySQL字段类型如何转为java_Java JDBC中,MySQL字段类型到JAVA类型的转换
- JAVA中最常用的十个快捷键
- n皇后问题c语言代码_求n的阶乘java代码
- Java安全基础(二)Servlet核心技术
- Java框架Spring入门-第一个spring项目
- Java面试集锦(一)之Spring/SpringMVC
- java计算数学表达式详解编程语言
- Oracle 视图 ALL_JAVA_RESOLVERS 官方解释,作用,如何使用详细说明
- Spring Boot(五):spring data jpa的使用详解编程语言
- 系统命令Java实现Linux系统命令调用的探究(java调用linux)
- Java编程操作Oracle数据库(java操作oracle)
- 使用Java实现Redis数据自动过期(redisjava过期)
- key处理解决Redis中Java端过期key的方法(redisjava过期)
- 让Java桥接Linux新世界的重要性(java链接linux)
- 程序Oracle调用Java程序的实现方法(oracle调用java)
- 使用Java去连接MySQL数据库(java jdbc mysql)
- Java解答Oracle使用更轻松的实现方式(oracle写成java)
- java求数组最大值和最小数示例分享