探索如何利用 XML 和 Amazon Web Services 集成企业应用程序,以及使用Microsoft® .NET (C#) 和 Java™ 平台构建跨平台应用程序集成功能。

       队列 是用于存储等待处理的消息的临时数据结构。Amazon Simple Queue Services (Amazon SQS) 是一个支持 Web 服务的高可用性可伸缩消息队列。Amazon SQS 的主要益处包括:

  • 基于云的解决方案。由 Amazon 管理,不需使用私有基础设施,也不需要专业支持知识。
  • 基于 Internet。任何连接到 Internet 的客户端都可以通过 Web 服务访问该服务,因此支持业务到业务(B2B)集成。
  • 冗余。该服务在多个服务器上存储所有消息,以提供高可用性和容错。
  • 多个并发读/写。Amazon SQS 支持多个进程同时读写一个队列,以及在处理窗口时锁定消息,以避免两个客户端同时处理一条消息。
  • 可配置。通过使用 Amazon SQS 服务,您可以根据存储在队列中的消息的处理需求设计和锁定窗口。锁定窗口能够阻止两个队列读取器同时处理同一个队列项。对于处理时间更长的队列项,则需要更长时间地锁定窗口。锁定窗口由可见性超时参数控制,您可以给每个队列配置该参数。
  • 易于使用的 API。它为常见的语言(包括 Java 和 Microsoft .NET 平台)提供 API 包装器,以支持快速开发并无缝地集成到现有应用程序中。
  • 低成本的解决方案。公司仅需为它们的 HTTP 请求使用的带宽付费;Amazon SQS 不收取其他额外的费用。

       在开始 Amazon SQS 开发之前,了解它的一些特征是非常有帮助的。如果不了解这些特征,您刚开始使用 Amazon SQS 时可能会碰到挫折,或感到困惑。

       首先,Amazon 不能保证队列项的处理顺序。这意味着先进先出(first-in-first-out,FIFO)处理不得到保证,这在很多消息队列实现中都很常见。Amazon 仅保证所有消息都分发出去。

       Amazon SQS 的第二大特征是最终一致性。大型数据库系统的主要特征是一致性、高可用性和可伸缩性。Amazon 不是关注所有 3 个特征,而是主要关注高可用性和可伸缩性,然后再以此为基础提供最终的一致性。这意味着 Amazon 通过将所有消息发送到多个服务器来实现高可用性和可伸缩性。Amazon 保证最终会将所有消息分发出去,但不保证什么时候分发它们。从实用的角度看,这意味着假如您向一个队列发送 3 条消息,当下次尝试接收这些消息时,不一定能收到所有 3 条消息。您可能在一个 Read 中收到所有 3 条消息,或在第一个 Read 中收到前两条消息,在第二个 Read 中收到第三条消息。如果您持续地轮询该队列,最终肯定能收到所有 3 条消息。

       为了开始使用 Amazon SQS,您必须根据自己使用的语言获取 Amazon SQS 的 API 库。Amazon 为所有常见的语言都提供了一个库,比如 Perl、Microsoft Visual Basic®.NET、C#、Java 和 PHP。这些库是开源的,并且易于使用。参考资料小节提供这些库的下载链接。

通过 Java 语言使用 Amazon SQS

       现在您首先学习如何使用 Java 语言创建队列、发送消息和接收消息。第一步是创建一个 Amazon SQS 队列。清单 1 中的代码显示了如何为 Amazon SQS 创建 HTTP 客户端、实例化 CreateQueueRequest 对象和调用队列创建请求。Access Key ID(由 20 个字母和数字组成)是请求身份验证或读取队列项所需的密匙。为了创建或操作队列项,您需要使用 Secret Access Key(由 40 个字母和数字组成)。注册 Amazon 时就会收到这些密匙
清单 1. 创建 Amazon SQS 队列

				
String queueName = "TestQueue";

// create http client
AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

// instantiate create queue request
CreateQueueRequest request = new CreateQueueRequest();
request.setQueueName(queueName);
request.setDefaultVisibilityTimeout(30);

// execute create queue operation and get the server response
System.out.print("Creating Queue: " + queueName);    
CreateQueueResponse response = service.createQueue(request);
if (response.isSetCreateQueueResult()) {
	System.out.print("Create Queue Result:");    
	CreateQueueResult createQueueResult = response.getCreateQueueResult();
	if (createQueueResult.isSetQueueUrl()) {
		System.out.print("Queue Url: " + createQueueResult.getQueueUrl());
	}
}

 

 

       下一步是向最新创建的队列发送一条消息。清单 2 中的代码显示了如何为 Amazon SQS 创建 HTTP 客户端,以及如何向队列发送一个简单的消息。


清单 2. 向队列发送消息

				
String queueName = "TestQueue";

// create http client
AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

// instantiate send message request
SendMessageRequest request = new SendMessageRequest();
request.setQueueName(queueName);
request.setMessageBody("Test SQS Message");

// execute the send message operation and get the server response
SendMessageResponse response = service.sendMessage(request);
if (response.isSetSendMessageResult()) {
	System.out.print("Send Message Result: ");

	SendMessageResult sendMessageResult = response.getSendMessageResult();
	if (sendMessageResult.isSetMessageId()) {
		System.out.print("\tMessageId: " + sendMessageResult.getMessageId());
	}
} 

 

 

       现在,我们尝试从队列接收消息。清单 3 显示了如何为 Amazon SQS 创建 HTTP 客户端,以及如何从队列接收消息。Message 包含来自队列的消息并公开几个关键方法:

  • getMessageId返回消息的唯一标识符。您可以使用 isSetMessageId 确定消息 ID 是否已设置。
  • getReceiptHandle将句柄返回给消息。句柄用于删除消息。您可以使用 isSetReceiptHandle 确定消息句柄是否已设置。
  • getBody以字符串的形式返回消息体。消息可以是纯文本或 XML,您可以使用 isSetBody 确定消息体是否已设置。


清单 3. 从队列接收消息

				
String queueName = "TestQueue";

// create http client
AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

// instantiate the receive message request
ReceiveMessageRequest request = new ReceiveMessageRequest();
request.setQueueName(queueName);
// the following two parameters are optional
request.setMaxNumberOfMessages(10); // set maximum number of messages to receive
request.setVisibilityTimeout(30); // set visibility window
		 
// execute the receive messages operation and get server response
ReceiveMessageResponse response = service.receiveMessage(request);

System.out.print("Receive Message Response:");

if (response.isSetReceiveMessageResult()) {
	ReceiveMessageResult  receiveMessageResult = response.getReceiveMessageResult();
	java.util.List<Message> messageList = receiveMessageResult.getMessage();
	for (Message message : messageList) {			
		if (message.isSetMessageId()) {
			System.out.print("MessageId: " + message.getMessageId());
		}

		if (message.isSetReceiptHandle()) {
			System.out.print("ReceiptHandle: " + message.getReceiptHandle());
		}
		if (message.isSetBody()) {
			System.out.print("Body: " + message.getBody());
		}
}

 

 

通过 C# 使用 Amazon SQS

       现在,您将使用 C# 将一个对象系列化到 XML,并将其作为 Amazon SQS 消息发送。

       第一步是创建一个将被系列化的业务对象;清单 4 显示了一个 Product 对象。公共属性被控制 XML 系列化的属性修饰。C# 属性类似于 Java 注释,定义属性如何映射到 XML 元素或 XML 属性。此外,这个类包含将对象实例系列化到 XML 的 ToXml() 方法。


清单 4. 创建用于系列化的业务对象

				
namespace Stewart.Test
{
/// <summary>
/// Product
/// </summary>
[XmlRoot(ElementName="Product")]
public class Product
{
	/// <summary>
	/// Product Name
	/// </summary>
	[XmlElement("ProductName")]
	public string ProductName;

	/// <summary>
	/// Product Price
	/// </summary>
	[XmlElement("ProductPrice")]
	public decimal ProductPrice;

	/// <summary>
	/// Quantity in stock
	/// </summary>
	[XmlElement("InStock")]
	public bool InStock;

	/// <summary>
	/// Product Id
	/// </summary>
	[XmlAttributeAttribute(AttributeName = "Id", DataType = "integer")]
	public string Id;

	/// <summary>
	/// Initializes a new instance of the <see cref="Product"/> class.
	/// </summary>
	public Product()
	{
	}

	/// <summary>
	/// Initializes a new instance of the <see cref="Product"/> class.
	/// </summary>
	/// <param>Name of the product.</param>
	/// <param>The product price.</param>
	public Product(string productName, decimal productPrice)
	{
		this.ProductName = productName;
		this.ProductPrice = productPrice;
	}

	/// <summary>
	/// Converts to XML.
	/// </summary>
	/// <returns></returns>
	public String ToXml()
	{
		StringBuilder output = new StringBuilder();

		// no name space
		XmlSerializerNamespaces ns = new XmlSerializerNamespaces();
		ns.Add("", "");

		// settings to omit xml declaration
		XmlWriterSettings settings = new XmlWriterSettings();
		settings.OmitXmlDeclaration = true;

		// finally serialize to string
		XmlWriter writer = XmlTextWriter.Create(output, settings);
		XmlSerializer serializer = new XmlSerializer(typeof(Product));            
		serializer.Serialize(writer, this, ns);

		// return string containing XML document
		return output.ToString();
	}
}

 

 

       接下来,发送 XML 消息。用于 Amazon SQS 的 Amazon C# API 在功能上类似于 Java API。清单 5 中的代码显示了如何使用 C# 发送消息。


清单 5. 使用 C# 发送消息

				
Product prod = new Product("Widget", 1.5M);
string accessKeyId = ConfigurationSettings.AppSettings["AmazonAccessKeyID"];
string secretAccessKey = ConfigurationSettings.AppSettings["AmazonSecretAccessKey"];

AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);
SendMessageRequest request = new SendMessageRequest();
request.MessageBody = prod.ToXml();
request.QueueName = "TestQueue";

SendMessageResponse response = service.SendMessage(request);
if (response.IsSetSendMessageResult())
{
	Console.WriteLine("Send Message Response: ");
	SendMessageResult sendMessageResult = response.SendMessageResult;
	if (sendMessageResult.IsSetMessageId())
	{
		Console.WriteLine(String.Format("MessageId {0}", 
			sendMessageResult.MessageId));
	}
	if (sendMessageResult.IsSetMD5OfMessageBody())
	{
		Console.WriteLine(String.Format("MD5OfMessageBody: {0}", 
			sendMessageResult.MD5OfMessageBody));
	}
} 

 

 

图 1 显示了 清单 5 的输出结果。


图 1. 发送 XML 消息的输出

       最后一步是从队列接收 XML 消息,并反系列化实例。清单 6 显示了将 XML 消息反系列化到 Product 实例的代码。


清单 6. 反序列化 XML 消息

				
Product prod = null;            

string accessKeyId = ConfigurationSettings.AppSettings["AmazonAccessKeyID"];
string secretAccessKey = ConfigurationSettings.AppSettings["AmazonSecretAccessKey"];

AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

ReceiveMessageRequest request = new ReceiveMessageRequest();
request.QueueName = "TestQueue";
ReceiveMessageResponse response = service.ReceiveMessage(request);

if (response.IsSetReceiveMessageResult())
{
	Console.WriteLine("Receive Message Result:");
	ReceiveMessageResult receiveMessageResult = response.ReceiveMessageResult;
	List<Message> messageList = receiveMessageResult.Message;
	foreach (Message message in messageList)
	{                    
		if (message.IsSetMessageId())
		{
			Console.WriteLine(String.Format("MessageId: {0}",
				message.MessageId));
		}                                      
		if (message.IsSetBody())
		{
			Console.WriteLine(string.Format("Body: {0}", message.Body));
			String xml = message.Body;
			StringReader sr = new StringReader(xml);
			XmlSerializer serializer = new XmlSerializer(typeof(Product));
			prod = (Product) serializer.Deserialize(sr);
			Console.WriteLine(string.Format("Id: {0}", prod.Id));
			Console.WriteLine(string.Format("Name: {0}", prod.ProductName));
			Console.WriteLine(string.Format("Price: {0}", prod.ProductPrice));
		}
	}
} 

 

 

图 2 显示了 清单 6 的输出结果。


图 2. 接收 XML 消息输出

       尽管以上的例子非常简单,但是它们是非常强大的,因为您可以系列化一个对象,并向另一个不局限于本地物理网络的应用程序发送消息。这里没有复杂的防火墙限制或安全性考虑事项。此外,不需要用相同的语言编写消息的发送器和接收器,甚至不需要使用相同的平台。

技术概述和设计

       这个示例解决方案包含需要集成业务流程的分销商和制造商。分销商 从制造商 处购买商品并出售给客户

       当客户需要商品时,分销商使用 C# WinForm 客户端提交一个客户订单。订单提交程序将订单细节存储在一个本地 MySQL 数据库中。该客户端还允许用户浏览库存、查看订单和 Amazon SQS 队列。

       一般而言,分销商的库存能够满足客户的订购需求。如果库存不足的话,分销商会及时向制造商发送一个购买订单。然后,当物品已经发出时,制造商发送回一个订单汇总。所有这些通信都使用 Amazon SQS 来完成。

       分销商的 Order Fulfillment 和 Inventory Management 服务也是使用 C# 构建的,它轮询进入的商品和待处理的客户订单。当处理客户订单时发现商品库存少于订购数量,那么将使用 Amazon SQS 向制造商发送一个购买订单。队列项的消息体是一个包含购买订单的 XML 文档。

       制造商的 Order Processing Service 是基于 Java 平台构建的,它处理购买订单队列。当物品已经发出时,它将使用 Amazon SQS 向分销商回复一条消息。该消息的消息体是包含订单汇总的 XML 文档。

图 3 显示了涉及到的系统。


图 3. 解决方案概图

创建 XML 模式

       第一步是为在分销商和制造商之间发送的消息定义 XML 模式。您需要两个模式:一个购买订单和一个订单汇总。

清单 7 显示了购买订单的模式。


清单 7. 购买订单模式

				
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element>
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="0" />
        <xs:element minOccurs="0" />
        <xs:element minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element minOccurs="0" maxOccurs="unbounded">
                <xs:complexType>
                  <xs:attribute />
                  <xs:attribute />
                  <xs:attribute />
                </xs:complexType>
              </xs:element>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>  
</xs:schema>		

 

 

Purchase Order XML 模式包含下列关键元素:

表 1. 购买订单模式中的关键元素

 

关键元素

描述

Id

包含 Purchase Order 的唯一标识符的字符串

OrderDate

包含 Purchase Order 日期的字符串

Company

包含分销商的关键地址信息,包括公司名称、街道地址、城市、州和邮政编码

Vendor

包含制造商的关键地址信息,包括公司名称、街道地址、城市、州和邮政编码

Items

包含订购商品的所有信息,包括商品 ID、商品名称和数量

 

清单 8 显示了订单汇总的模式。

清单 8. 订单汇总模式

				
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element>
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="0" />
        <xs:element minOccurs="0" />
        <xs:element minOccurs="0" />
        <xs:element minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
              <xs:element minOccurs="0" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element minOccurs="0" maxOccurs="unbounded">
                <xs:complexType>
                  <xs:attribute />
                  <xs:attribute />
                  <xs:attribute />
                </xs:complexType>
              </xs:element>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>  
</xs:schema>

 

 

Order Summary XML 模式包含下列关键元素:

表 2. 订单汇总模式中的关键元素

 

Id

包含 Order Summary 的唯一标识符的字符串

ReferenceId

包含初始 Purchase Order 的 ID 的字符串

OrderDate

包含 Order Summary 日期的字符串

CustomerAddress

包含分销商的关键地址信息,包括公司名称、街道地址、城市、州和邮政编码

VendorAddress

包含制造商的关键地址信息,包括公司名称、街道地址、城市、州和邮政编码

Items

包含订购商品的所有信息,包括商品 ID、商品名称和数量

 

 

定义数据库实体模型

接下来,我们定义数据库模式。图 4 显示了数据库实体模型。


图 4. 数据库实体模型

Reseller 数据实体包括以下内容:

  • Customer 包含订单的客户联系信息。
  • CustomerOrder 包含客户订单的订单信息。
  • CustomerOrderDetail 包含客户订单的商品细节。
  • Inventory 包含分销商的库存。

Manufacturer 数据实体为:

  • VendorOrder 跟踪由制造商的 Order Processing Service 处理的购买订单。

定义消息队列

最后需要定义的组建是消息队列。表 3 显示了这个解决方案的消息队列。

表 3. Amazon SMS 消息队列

 

队列名

可见性超时

描述

POQueue

30 秒

从分销商发送给制造商的购买订单消息

OSQueue

30 秒

从制造商发送给分销商的订单汇总消息

 

 

关注中国IDC圈官方微信:idc-quan 我们将定期推送IDC产业最新资讯

查看心情排 行你看到此篇文章的感受是:


  • 支持

  • 高兴

  • 震惊

  • 愤怒

  • 无聊

  • 无奈

  • 谎言

  • 枪稿

  • 不解

  • 标题党
2023-06-20 13:48:51
市场情报 Amazon SageMaker地理空间功能现已全面可用,新增安全特性及更多用例
Amazon SageMaker地理空间功能让数据科学家和机器学习工程师能够使用地理空间数据来构建、训练和部署机器学习模型。 <详情>
2023-05-04 10:07:49
市场情报 埃森哲使用Amazon CodeWhisperer助力开发人员提高工作效率
借助CodeWhisperer,开发人员无需在IDE与文档或开发者论坛之间切换,加快编码过程。 <详情>
2023-04-26 10:57:42
市场情报 亚马逊云科技推出Amazon GuardDuty三项新功能,帮助客户保护容器、数据库和Serverless工作负载
新功能扩大检测覆盖范围、持续强化该服务的机器学习能力、异常检测和集成威胁情报,进一步提升对客户工作负载的安全保护。 <详情>
2023-03-27 13:11:59
市场情报 亚马逊云科技推出对象存储服务Amazon S3诸多新的访问、复制功能
目前,Amazon S3拥有超过 280 万亿个对象,平均每秒处理超过 1 亿个请求。 <详情>
2023-02-10 10:12:39
市场情报 亚马逊云科技在中国区域推出六款基于自研芯片Graviton及英特尔Ice Lake新实例
这些实例均基于Amazon Nitro系统构建,Nitro可最小化硬件虚拟化带来的性能消耗,为每个实例提供几乎所有的计算、网络和内存资源,为客户提供更高的计算、网络性能以及更高 <详情>