this blog not in maintenance, updates

this is my latest blog address:https://simba-cheng.cn/

Apache Camel 特殊的Endopint Direct

Apache Camel相关代码已经上传GitHub,需要的自取:GitHub - Apache Camel 完整Demo

如果觉得还行,麻烦点个Star

Endpoint Direct用于在两个编排好的路由之间实现Exchange消息的连接;

上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Direct连接的下一个路由起始位置。

注意:

两个被连接的路由一定要是可用的,并且存在于同一个Camel服务中。

原博中有一个栗子,用来说明Endpoint Direct的简单使用,但问题我运行之后,没法调用...

于是按照之前的一个Demo,修改了一下...

项目结构

代码一:

package com.test.camel.directcamel;

import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.log4j.PropertyConfigurator;

public class DirectCamel {

	public static void main(String[] args) throws Exception {

	// 加载日志
	PropertyConfigurator.configure("./conf/log4j.properties");
	PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);

	// 这是camel上下文对象,整个路由的驱动全靠它了。
	ModelCamelContext camelContext = new DefaultCamelContext();

	// 启动route
	camelContext.start();

	// 首先将两个完整有效的路由注册到Camel服务中
	camelContext.addRoutes(new DirectRouteARouteBuilder());
	camelContext.addRoutes(new DirectRouteBRouteBuilder());

	// 通用没有具体业务意义的代码,只是为了保证主线程不退出
	synchronized (DirectCamel.class) {
		DirectCamel.class.wait();
	}

	}
}

代码二:

package com.test.camel.directcamel;

import org.apache.camel.builder.RouteBuilder;

public class DirectRouteARouteBuilder extends RouteBuilder {

	@Override
	public void configure() throws Exception {
		// 连接路由:DirectRouteB
		from("jetty:http://0.0.0.0:8282/directCamel").process(new DirectRouteAProcessor()).to("direct:directRouteBRouteBuilder")
		.to("log:directRouteARouteBuilder?showExchangeId=true");
	}

}

代码三:

package com.test.camel.directcamel;

import org.apache.camel.builder.RouteBuilder;

public class DirectRouteBRouteBuilder extends RouteBuilder {

	@Override
	public void configure() throws Exception {
		from("direct:directRouteBRouteBuilder").process(new DirectRouteBProcessor()).to("log:directRouteBRouteBuilder?showExchangeId=true");
	}

}

代码四:

package com.test.camel.directcamel;

import java.io.IOException;
import java.io.InputStream;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.http.common.HttpMessage;
import org.apache.commons.io.IOUtils;

public class DirectRouteAProcessor implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {

	// 因为很明确消息格式是http的,所以才使用这个类
	// 否则还是建议使用org.apache.camel.Message这个抽象接口
	HttpMessage message = (HttpMessage) exchange.getIn();
	InputStream bodyStream = (InputStream) message.getBody();
	String inputContext = this.analysisMessage(bodyStream);

	System.out.println("A inputContext -- " + inputContext);

	bodyStream.close();

	// 存入到exchange的out区域
	if (exchange.getPattern() == ExchangePattern.InOut) {
		Message outMessage = exchange.getOut();
		outMessage.setBody(inputContext + " || ---");
	}
	}

	/**
	 * 从stream中分析字符串内容
	 * 
	 * @param bodyStream
	 * @return
	 */
	private String analysisMessage(InputStream bodyStream) throws IOException {
		String responseStr = IOUtils.toString(bodyStream, "UTF-8");
		return responseStr;

	}

}

代码五:

package com.test.camel.directcamel;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.http.common.HttpMessage;

public class DirectRouteBProcessor implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {

	// 因为很明确消息格式是http的,所以才使用这个类
	// 否则还是建议使用org.apache.camel.Message这个抽象接口
	HttpMessage message = (HttpMessage) exchange.getIn();
	String bodyStream = (String) message.getBody();
	System.out.println("B bodyStream  -- " + bodyStream);

	// 存入到exchange的out区域
	if (exchange.getPattern() == ExchangePattern.InOut) {
		Message outMessage = exchange.getOut();
		outMessage.setBody(bodyStream + " || out");
	}
	}

}

代码六,测试代码:

package com.test.client.http;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.nio.charset.Charset;

public class HttpClient {

	public static final String CODEFORMAT = "UTF-8";

	public static String doPost(String requestBody, int timeout, HttpURLConnection http) throws Exception {
		String retResult = "";
		try {
			// 设置是否从HttpURLConnection读入 ,默认是true
			http.setDoInput(true);
			// post请求的时候需要将DoOutput 设置成true 参数要放在http正文内,因此需要设为true ,默认是false
			http.setDoOutput(true);
			// post请求UseCaches 设置成false 不能设置缓存
			http.setUseCaches(false);
			// 连接主机超时时间
			http.setConnectTimeout(timeout);
			// 从主机读取数据超时 (都是毫秒)
			http.setReadTimeout(timeout);
			// 默认是get
			http.setRequestMethod("POST");
			http.setRequestProperty("accept", "*/*");
			http.setRequestProperty("connection", "Keep-Alive");
			http.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
			// 参数设置完成之后进行连接
			http.connect();
			OutputStreamWriter osw = new OutputStreamWriter(http.getOutputStream(), Charset.forName("UTF-8"));
			osw.write(requestBody);
			osw.flush();
			osw.close();
			if (http.getResponseCode() == 200) {
				BufferedReader in = new BufferedReader(new InputStreamReader(http.getInputStream(), Charset.forName("UTF-8")));
				String inputLine;
				while ((inputLine = in.readLine()) != null) {
						retResult += inputLine;
				}
				in.close();
			} else {
				throw new Exception("the http.getResponseCode() is " + http.getResponseCode());
			}
		} catch (Exception e) {
				e.printStackTrace();
		} finally {
			if (http != null) {
					http.disconnect();
					http = null;
			}
		}
		return retResult;
	}
}

代码七,测试代码:

package com.test.client.http;

import java.net.HttpURLConnection;
import java.net.URL;

import org.json.JSONObject;

public class TestClient {

	public static void main(String[] args) {
			URL url = null;
			HttpURLConnection http = null;

	try {

//			url = new URL("http://0.0.0.0:8282/doHelloWorld");
	url = new URL("http://0.0.0.0:8282/directCamel");
	

	for (int i = 0; i < 1; i++) {
			System.out.println("http post start !!!");
			Long startTime = System.currentTimeMillis();

	http = (HttpURLConnection) url.openConnection();

	// ************************************************************
	JSONObject authorityJson = new JSONObject();
	authorityJson.put("userid", "222222222222222"); // 用户身份证号码
	authorityJson.put("username", "VIP_USER");// 用户姓名
	authorityJson.put("userdept", "VIP");// 用户单位

	JSONObject queryInfoJson = new JSONObject();
	queryInfoJson.put("source", "60155");// 测试用
	queryInfoJson.put("condition", "FIRSTKEY = '320103671118051'");
	queryInfoJson.put("starttime", "");
	queryInfoJson.put("endtime", "");

	JSONObject requestJson = new JSONObject();
//				requestJson.put("authority", authorityJson);
	requestJson.put("queryInfo", queryInfoJson);
	// ************************************************************

	StringBuffer sb = new StringBuffer();
	sb.append(requestJson.toString());

	String result = HttpClient.doPost(sb.toString(), 30000000, http);

	System.out.println("http post end cost :" + (System.currentTimeMillis() - startTime) + "ms");
	System.out.println(result);

	Thread.sleep(500);
	}

	} catch (Exception e) {
			e.printStackTrace();
	}

	}
}

代码八,日志配置:

#####输出到控制台#####
log4j.rootLogger=DEBUG,CONSOLE,ARKSERVICES
log4j.addivity.org.apache=true
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=DEBUG
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%d] [%t] [%p] [%l] - %m%n

#####输出到文件#####
log4j.appender.ARKSERVICES=org.apache.log4j.RollingFileAppender
log4j.appender.ARKSERVICES.File=./log/camel-test.log
log4j.appender.ARKSERVICES.Threshold=DEBUG
log4j.appender.ARKSERVICES.MaxBackupIndex=5
log4j.appender.ARKSERVICES.MaxFileSize=100MB
log4j.appender.ARKSERVICES.layout=org.apache.log4j.PatternLayout
log4j.appender.ARKSERVICES.layout.ConversionPattern=[%d] [%t] [%p] [%l] - %m%n

log4j.logger.org.eclipse=OFF

在上面的代码中,我们编排了两个可用的路由,命名为"DirectRouteARouteBuilder","DirectRouteBRouteBuilder"。

其中"DirectRouteARouteBuilder"实例在最后一个Endpoint控制端点("direct:directRouteBRouteBuilder")中使用Endpoint Direct将Exchange消息发送至"DirectRouteBRouteBuilder"实例的开始位置。

运行并测试,结果如下:

从上面运行的效果看,被连接的两个路由使用的Exchange对象是同一个,也就是说在"DirectRouteBRouteBuilder"路由中如果Exchange对象中的内容发生了变化,就会在随后继续执行的"DirectRouteBRouteBuilder"路由中产生影响。

Endopint Direct元素在我们实际使用Camel进行路由编排时,应用频度非常高。

因为它可以把多个已编排好的路由按照业务要求连接起来,形成一个新的路由,保持原有路由的良好重用。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐