Apache Camel - 8 - Camel特殊的Endpoint
Apache Camel 特殊的Endopint DirectApache Camel相关代码已经上传GitHub,需要的自取:GitHub - Apache Camel 完整Demo如果觉得还行,麻烦点个StarEndpoint Direct用于在两个编排好的路由之间实现Exchange消息的连接;上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Dire...
this blog not in maintenance, updates
this is my latest blog address:https://simba-cheng.cn/
Apache Camel 特殊的Endopint Direct
Apache Camel相关代码已经上传GitHub,需要的自取:GitHub - Apache Camel 完整Demo
如果觉得还行,麻烦点个Star
Endpoint Direct用于在两个编排好的路由之间实现Exchange消息的连接;
上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Direct连接的下一个路由起始位置。
注意:
两个被连接的路由一定要是可用的,并且存在于同一个Camel服务中。
原博中有一个栗子,用来说明Endpoint Direct的简单使用,但问题我运行之后,没法调用...
于是按照之前的一个Demo,修改了一下...
项目结构
代码一:
package com.test.camel.directcamel;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.log4j.PropertyConfigurator;
public class DirectCamel {
public static void main(String[] args) throws Exception {
// 加载日志
PropertyConfigurator.configure("./conf/log4j.properties");
PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
// 首先将两个完整有效的路由注册到Camel服务中
camelContext.addRoutes(new DirectRouteARouteBuilder());
camelContext.addRoutes(new DirectRouteBRouteBuilder());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (DirectCamel.class) {
DirectCamel.class.wait();
}
}
}
代码二:
package com.test.camel.directcamel;
import org.apache.camel.builder.RouteBuilder;
public class DirectRouteARouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
// 连接路由:DirectRouteB
from("jetty:http://0.0.0.0:8282/directCamel").process(new DirectRouteAProcessor()).to("direct:directRouteBRouteBuilder")
.to("log:directRouteARouteBuilder?showExchangeId=true");
}
}
代码三:
package com.test.camel.directcamel;
import org.apache.camel.builder.RouteBuilder;
public class DirectRouteBRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:directRouteBRouteBuilder").process(new DirectRouteBProcessor()).to("log:directRouteBRouteBuilder?showExchangeId=true");
}
}
代码四:
package com.test.camel.directcamel;
import java.io.IOException;
import java.io.InputStream;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.http.common.HttpMessage;
import org.apache.commons.io.IOUtils;
public class DirectRouteAProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
// 因为很明确消息格式是http的,所以才使用这个类
// 否则还是建议使用org.apache.camel.Message这个抽象接口
HttpMessage message = (HttpMessage) exchange.getIn();
InputStream bodyStream = (InputStream) message.getBody();
String inputContext = this.analysisMessage(bodyStream);
System.out.println("A inputContext -- " + inputContext);
bodyStream.close();
// 存入到exchange的out区域
if (exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(inputContext + " || ---");
}
}
/**
* 从stream中分析字符串内容
*
* @param bodyStream
* @return
*/
private String analysisMessage(InputStream bodyStream) throws IOException {
String responseStr = IOUtils.toString(bodyStream, "UTF-8");
return responseStr;
}
}
代码五:
package com.test.camel.directcamel;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.http.common.HttpMessage;
public class DirectRouteBProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
// 因为很明确消息格式是http的,所以才使用这个类
// 否则还是建议使用org.apache.camel.Message这个抽象接口
HttpMessage message = (HttpMessage) exchange.getIn();
String bodyStream = (String) message.getBody();
System.out.println("B bodyStream -- " + bodyStream);
// 存入到exchange的out区域
if (exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(bodyStream + " || out");
}
}
}
代码六,测试代码:
package com.test.client.http;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.nio.charset.Charset;
public class HttpClient {
public static final String CODEFORMAT = "UTF-8";
public static String doPost(String requestBody, int timeout, HttpURLConnection http) throws Exception {
String retResult = "";
try {
// 设置是否从HttpURLConnection读入 ,默认是true
http.setDoInput(true);
// post请求的时候需要将DoOutput 设置成true 参数要放在http正文内,因此需要设为true ,默认是false
http.setDoOutput(true);
// post请求UseCaches 设置成false 不能设置缓存
http.setUseCaches(false);
// 连接主机超时时间
http.setConnectTimeout(timeout);
// 从主机读取数据超时 (都是毫秒)
http.setReadTimeout(timeout);
// 默认是get
http.setRequestMethod("POST");
http.setRequestProperty("accept", "*/*");
http.setRequestProperty("connection", "Keep-Alive");
http.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
// 参数设置完成之后进行连接
http.connect();
OutputStreamWriter osw = new OutputStreamWriter(http.getOutputStream(), Charset.forName("UTF-8"));
osw.write(requestBody);
osw.flush();
osw.close();
if (http.getResponseCode() == 200) {
BufferedReader in = new BufferedReader(new InputStreamReader(http.getInputStream(), Charset.forName("UTF-8")));
String inputLine;
while ((inputLine = in.readLine()) != null) {
retResult += inputLine;
}
in.close();
} else {
throw new Exception("the http.getResponseCode() is " + http.getResponseCode());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (http != null) {
http.disconnect();
http = null;
}
}
return retResult;
}
}
代码七,测试代码:
package com.test.client.http;
import java.net.HttpURLConnection;
import java.net.URL;
import org.json.JSONObject;
public class TestClient {
public static void main(String[] args) {
URL url = null;
HttpURLConnection http = null;
try {
// url = new URL("http://0.0.0.0:8282/doHelloWorld");
url = new URL("http://0.0.0.0:8282/directCamel");
for (int i = 0; i < 1; i++) {
System.out.println("http post start !!!");
Long startTime = System.currentTimeMillis();
http = (HttpURLConnection) url.openConnection();
// ************************************************************
JSONObject authorityJson = new JSONObject();
authorityJson.put("userid", "222222222222222"); // 用户身份证号码
authorityJson.put("username", "VIP_USER");// 用户姓名
authorityJson.put("userdept", "VIP");// 用户单位
JSONObject queryInfoJson = new JSONObject();
queryInfoJson.put("source", "60155");// 测试用
queryInfoJson.put("condition", "FIRSTKEY = '320103671118051'");
queryInfoJson.put("starttime", "");
queryInfoJson.put("endtime", "");
JSONObject requestJson = new JSONObject();
// requestJson.put("authority", authorityJson);
requestJson.put("queryInfo", queryInfoJson);
// ************************************************************
StringBuffer sb = new StringBuffer();
sb.append(requestJson.toString());
String result = HttpClient.doPost(sb.toString(), 30000000, http);
System.out.println("http post end cost :" + (System.currentTimeMillis() - startTime) + "ms");
System.out.println(result);
Thread.sleep(500);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码八,日志配置:
#####输出到控制台#####
log4j.rootLogger=DEBUG,CONSOLE,ARKSERVICES
log4j.addivity.org.apache=true
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=DEBUG
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%d] [%t] [%p] [%l] - %m%n
#####输出到文件#####
log4j.appender.ARKSERVICES=org.apache.log4j.RollingFileAppender
log4j.appender.ARKSERVICES.File=./log/camel-test.log
log4j.appender.ARKSERVICES.Threshold=DEBUG
log4j.appender.ARKSERVICES.MaxBackupIndex=5
log4j.appender.ARKSERVICES.MaxFileSize=100MB
log4j.appender.ARKSERVICES.layout=org.apache.log4j.PatternLayout
log4j.appender.ARKSERVICES.layout.ConversionPattern=[%d] [%t] [%p] [%l] - %m%n
log4j.logger.org.eclipse=OFF
在上面的代码中,我们编排了两个可用的路由,命名为"DirectRouteARouteBuilder","DirectRouteBRouteBuilder"。
其中"DirectRouteARouteBuilder"实例在最后一个Endpoint控制端点("direct:directRouteBRouteBuilder")中使用Endpoint Direct将Exchange消息发送至"DirectRouteBRouteBuilder"实例的开始位置。
运行并测试,结果如下:
从上面运行的效果看,被连接的两个路由使用的Exchange对象是同一个,也就是说在"DirectRouteBRouteBuilder"路由中如果Exchange对象中的内容发生了变化,就会在随后继续执行的"DirectRouteBRouteBuilder"路由中产生影响。
Endopint Direct元素在我们实际使用Camel进行路由编排时,应用频度非常高。
因为它可以把多个已编排好的路由按照业务要求连接起来,形成一个新的路由,保持原有路由的良好重用。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)