文章目录
-
- 在当今快速变化的商业环境中,企业需要能够快速响应市场变化的柔性供应链系统。WordPress作为全球最流行的内容管理系统,其插件架构为供应链管理软件的开发提供了强大基础。然而,当我们将供应链系统扩展到分布式架构时,事务处理成为了一个关键挑战。 传统的单体应用事务处理在分布式环境中不再适用,因为供应链系统通常涉及多个独立的服务:库存管理、订单处理、物流跟踪、支付网关等。这些服务可能部署在不同的服务器上,甚至使用不同的数据库技术。本文将深入探讨在WordPress柔性供应链插件开发中实现分布式事务的完整解决方案。
-
- 两阶段提交协议是经典的分布式事务解决方案,包含准备阶段和提交阶段。 <?php /** * WordPress供应链分布式事务管理器 - 两阶段提交实现 */ class SupplyChain_2PC_Coordinator { private $participants = []; private $wpdb; public function __construct() { global $wpdb; $this->wpdb = $wpdb; } /** * 添加事务参与者 * @param string $service_name 服务名称 * @param string $prepare_url 准备阶段API端点 * @param string $commit_url 提交阶段API端点 * @param string $rollback_url 回滚API端点 */ public function add_participant($service_name, $prepare_url, $commit_url, $rollback_url) { $this->participants[$service_name] = [ 'prepare_url' => $prepare_url, 'commit_url' => $commit_url, 'rollback_url' => $rollback_url, 'status' => 'init' ]; } /** * 执行分布式事务 * @param array $transaction_data 事务数据 * @return bool 事务是否成功 */ public function execute_transaction($transaction_data) { // 第一阶段:准备阶段 $prepare_results = $this->prepare_phase($transaction_data); if (!$this->all_prepared($prepare_results)) { $this->rollback_all($prepare_results); return false; } // 第二阶段:提交阶段 $commit_results = $this->commit_phase(); return $this->all_committed($commit_results); } /** * 准备阶段:询问所有参与者是否可以提交 */ private function prepare_phase($data) { $results = []; foreach ($this->participants as $name => &$participant) { $response = wp_remote_post($participant['prepare_url'], [ 'timeout' => 30, 'body' => json_encode($data), 'headers' => ['Content-Type' => 'application/json'] ]); if (is_wp_error($response)) { $participant['status'] = 'failed'; $results[$name] = false; } else { $body = json_decode(wp_remote_retrieve_body($response), true); $participant['status'] = $body['can_commit'] ? 'prepared' : 'failed'; $results[$name] = $body['can_commit']; $participant['transaction_id'] = $body['transaction_id'] ?? 
null; } } return $results; } /** * 提交阶段:通知所有参与者提交事务 */ private function commit_phase() { $results = []; foreach ($this->participants as $name => &$participant) { if ($participant['status'] !== 'prepared') { continue; } $response = wp_remote_post($participant['commit_url'], [ 'timeout' => 30, 'body' => json_encode([ 'transaction_id' => $participant['transaction_id'] ]) ]); $participant['status'] = is_wp_error($response) ? 'commit_failed' : 'committed'; $results[$name] = !is_wp_error($response); } return $results; } /** * 回滚所有参与者 */ private function rollback_all($prepare_results) { foreach ($this->participants as $name => &$participant) { if ($participant['status'] === 'prepared' && isset($participant['transaction_id'])) { wp_remote_post($participant['rollback_url'], [ 'timeout' => 15, 'body' => json_encode([ 'transaction_id' => $participant['transaction_id'] ]) ]); $participant['status'] = 'rolled_back'; } } } private function all_prepared($results) { return !in_array(false, $results, true); } private function all_committed($results) { return !in_array(false, $results, true); } } ?>
- <?php /** * WordPress订单处理服务 - 2PC参与者实现 */ class OrderService_2PC_Participant { /** * 准备阶段处理 */ public function prepare_order($request) { $data = json_decode($request->get_body(), true); // 生成临时事务ID $transaction_id = 'order_' . wp_generate_uuid4(); // 在临时表中创建订单记录(未提交状态) global $wpdb; $wpdb->insert( $wpdb->prefix . 'sc_temp_orders', [ 'transaction_id' => $transaction_id, 'order_data' => json_encode($data), 'status' => 'prepared', 'created_at' => current_time('mysql') ] ); // 检查库存是否充足(模拟检查) $inventory_adequate = $this->check_inventory($data['items']); return [ 'can_commit' => $inventory_adequate, 'transaction_id' => $transaction_id ]; } /** * 提交阶段处理 */ public function commit_order($request) { $data = json_decode($request->get_body(), true); $transaction_id = $data['transaction_id']; global $wpdb; // 开始数据库事务 $wpdb->query('START TRANSACTION'); try { // 从临时表获取订单数据 $temp_order = $wpdb->get_row( $wpdb->prepare( "SELECT * FROM {$wpdb->prefix}sc_temp_orders WHERE transaction_id = %s", $transaction_id ) ); if (!$temp_order) { throw new Exception('Transaction not found'); } $order_data = json_decode($temp_order->order_data, true); // 创建正式订单 $order_id = $this->create_final_order($order_data); // 更新库存 $this->update_inventory($order_data['items']); // 删除临时记录 $wpdb->delete( $wpdb->prefix . 'sc_temp_orders', ['transaction_id' => $transaction_id] ); // 提交数据库事务 $wpdb->query('COMMIT'); return ['success' => true, 'order_id' => $order_id]; } catch (Exception $e) { $wpdb->query('ROLLBACK'); return ['success' => false, 'error' => $e->getMessage()]; } } /** * 回滚处理 */ public function rollback_order($request) { $data = json_decode($request->get_body(), true); $transaction_id = $data['transaction_id']; global $wpdb; // 删除临时记录 $wpdb->delete( $wpdb->prefix . 
'sc_temp_orders', ['transaction_id' => $transaction_id] ); return ['success' => true]; } private function check_inventory($items) { // 实现库存检查逻辑 return true; // 简化示例 } private function create_final_order($order_data) { // 实现订单创建逻辑 return 12345; // 返回订单ID } private function update_inventory($items) { // 实现库存更新逻辑 } } // WordPress REST API端点注册 add_action('rest_api_init', function() { $participant = new OrderService_2PC_Participant(); register_rest_route('supply-chain/v1', '/order/prepare', [ 'methods' => 'POST', 'callback' => [$participant, 'prepare_order'], 'permission_callback' => '__return_true' ]); register_rest_route('supply-chain/v1', '/order/commit', [ 'methods' => 'POST', 'callback' => [$participant, 'commit_order'], 'permission_callback' => '__return_true' ]); register_rest_route('supply-chain/v1', '/order/rollback', [ 'methods' => 'POST', 'callback' => [$participant, 'rollback_order'], 'permission_callback' => '__return_true' ]); }); ?>
-
- Saga模式通过一系列本地事务和补偿操作来管理分布式事务。每个服务执行自己的本地事务,如果某个步骤失败,则执行反向补偿操作。 <?php /** * WordPress供应链Saga事务协调器 */ class SupplyChain_Saga_Orchestrator { private $steps = []; private $compensations = []; private $current_step = 0; /** * 添加Saga步骤 * @param string $name 步骤名称 * @param callable $action 执行函数 * @param callable $compensation 补偿函数 */ public function add_step($name, $action, $compensation) { $this->steps[] = [ 'name' => $name, 'action' => $action ]; $this->compensations[$name] = $compensation; } /** * 执行Saga事务 * @return array 执行结果 */ public function execute() { $executed_steps = []; $results = []; try { foreach ($this->steps as $index => $step) { $this->current_step = $index; // 执行当前步骤 $result = call_user_func($step['action']); if ($result['success'] !== true) { throw new Exception("Step {$step['name']} failed: " . ($result['error'] ?? 'Unknown error')); } $executed_steps[] = $step['name']; $results[$step['name']] = $result; // 记录执行日志 $this->log_step($step['name'], 'executed', $result); } // 所有步骤成功完成 $this->log_saga_complete(); return [ 'success' => true, 'results' => $results, 'message' => 'Saga transaction completed successfully' ]; } catch (Exception $e) { // 执行补偿操作 $compensation_results = $this->compensate($executed_steps); return [ 'success' => false, 'error' => $e->getMessage(), 'compensation_results' => $compensation_results, 'failed_step' => $this->current_step ]; } } /** * 执行补偿操作 */ private function compensate($executed_steps) { $results = []; // 按相反顺序执行补偿 $reversed_steps = array_reverse($executed_steps); foreach ($reversed_steps as $step_name) { if (isset($this->compensations[$step_name])) { $comp_result = call_user_func($this->compensations[$step_name]); $results[$step_name] = $comp_result; $this->log_step($step_name, 'compensated', $comp_result); } } return $results; } private function log_step($step_name, $status, $data) { global $wpdb; $wpdb->insert( $wpdb->prefix . 
'sc_saga_logs', [ 'step_name' => $step_name, 'status' => $status, 'data' => json_encode($data), 'created_at' => current_time('mysql') ] ); } private function log_saga_complete() { global $wpdb; $wpdb->insert( $wpdb->prefix . 'sc_saga_logs', [ 'step_name' => 'SAGA_COMPLETE', 'status' => 'completed', 'data' => json_encode(['message' => 'All steps completed successfully']), 'created_at' => current_time('mysql') ] ); } } ?>
- <?php /** * WordPress订单处理Saga示例 */ class OrderProcessingSaga { public static function create_order_saga($order_data) { $saga = new SupplyChain_Saga_Orchestrator(); // 步骤1: 验证订单 $saga->add_step( 'validate_order', function() use ($order_data) { return self::validate_order($order_data); }, function() use ($order_data) { return self::compensate_validation($order_data); } ); // 步骤2: 预留库存 $saga->add_step( 'reserve_inventory', function() use ($order_data) { return self::reserve_inventory($order_data['items']); }, function() use ($order_data) { return self::release_inventory($order_data['items']); } ); // 步骤3: 处理支付 $saga->add_step( 'process_payment', function() use ($order_data) { return self::process_payment($order_data['payment']); }, function() use ($order_data) { return self::refund_payment($order_data['payment']); } ); // 步骤4: 创建物流单 $saga->add_step( 'create_shipment', function() use ($order_data) { return self::create_shipment($order_data); }, function() use ($order_data) { return self::cancel_shipment($order_data); } ); // 步骤5: 发送确认通知 $saga->add_step( 'send_confirmation', function() use ($order_data) { return self::send_order_confirmation($order_data['customer_email']); }, function() use ($order_data) { // 通知步骤通常不需要补偿 return ['success' => true, 'message' => 'No compensation needed for notification']; } ); return $saga->execute(); } private static function validate_order($order_data) { // 实现订单验证逻辑 if (empty($order_data['items'])) { return ['success' => false, 'error' => 'Order has no items']; } return ['success' => true, 'validated' => true]; } private static function compensate_validation($order_data) { // 验证步骤的补偿操作(通常不需要实际操作) return ['success' => true, 'message' => 'Validation compensation completed']; } private static function reserve_inventory($items) { global $wpdb; try { $wpdb->query('START TRANSACTION'); foreach ($items as $item) { $wpdb->query( $wpdb->prepare( "UPDATE {$wpdb->prefix}sc_inventory SET reserved = reserved + %d WHERE product_id = %d AND quantity 
>= (reserved + %d)", $item['quantity'], $item['product_id'], $item['quantity'] ) ); if ($wpdb->rows_affected === 0) { throw new Exception("Insufficient inventory for product {$item['product_id']}"); } } $wpdb->query('COMMIT'); return ['success' => true, 'reserved' => true]; } catch (Exception $e) { $wpdb->query('ROLLBACK'); return ['success' => false, 'error' => $e->getMessage()]; } } private static function release_inventory($items) { global $wpdb; $wpdb->query('START TRANSACTION'); foreach ($items as $item) { $wpdb->query( $wpdb->prepare( "UPDATE {$wpdb->prefix}sc_inventory SET reserved = GREATEST(0, reserved - %d) WHERE product_id = %d", $item['quantity'], $item['product_id'] ) ); } $wpdb->query('COMMIT'); return ['success' => true, 'released' => true]; } private static function process_payment($payment_data) { // 集成支付网关处理 // 这里使用模拟支付处理 if (!$payment_success) { return ['success' => false, 'error' => 'Payment processing failed']; } return ['success' => true, 'transaction_id' => 'pay_' . wp_generate_uuid4()]; } private static function refund_payment($payment_data) { // 实现支付退款逻辑 return ['success' => true, 'refunded' => true]; } private static function create_shipment($order_data) { // 集成物流服务API $shipment_id = 'ship_' . wp_generate_uuid4(); // 记录发货信息到数据库 global $wpdb; $wpdb->insert( $wpdb->prefix . 'sc_shipments', [ 'order_id' => $order_data['order_id'] ?? 0, 'shipment_id' => $shipment_id, 'status' => 'created', 'created_at' => current_time('mysql') ] ); return ['success' => true, 'shipment_id' => $shipment_id]; } private static function cancel_shipment($order_data) { // 取消物流单逻辑 global $wpdb; $wpdb->update( $wpdb->prefix . 'sc_shipments', ['status' => 'cancelled'], ['order_id' => $order_data['order_id'] ?? 
0] ); return ['success' => true, 'cancelled' => true]; } private static function send_order_confirmation($email) { // 发送订单确认邮件 $subject = '您的订单已确认'; $message = '感谢您的订购,我们正在处理您的订单。'; $sent = wp_mail($email, $subject, $message); return ['success' => $sent, 'email_sent' => $sent]; } } // WordPress短代码集成Saga订单处理 add_shortcode('process_order_saga', function($atts) { $atts = shortcode_atts([ 'order_id' => 0 ], $atts); // 获取订单数据 $order_data = self::get_order_data($atts['order_id']); if (empty($order_data)) { return '<div class="error">订单不存在</div>'; } // 执行Saga事务 $result = OrderProcessingSaga::create_order_saga($order_data); if ($result['success']) { return '<div class="success">订单处理成功!</div>'; } else { return '<div class="error">订单处理失败: ' . esc_html($result['error']) . '</div>'; } }); ?>
-
- <?php /** * WordPress供应链数据库优化 */ class SupplyChain_DB_Optimizer { /** * 创建优化索引 */ public static function create_optimized_indexes() { global $wpdb; $indexes = [ // 订单表索引 "CREATE INDEX idx_orders_status ON {$wpdb->prefix}sc_orders(status)", "CREATE INDEX idx_orders_customer ON {$wpdb->prefix}sc_orders(customer_id, created_at)", // 库存表索引 "CREATE INDEX idx_inventory_product ON {$wpdb->prefix}sc_inventory(product_id)", "CREATE INDEX idx_inventory_quantity ON {$wpdb->prefix}sc_inventory(quantity, reserved)", // 消息队列表索引 "CREATE INDEX idx_queue_processing ON {$wpdb->prefix}sc_message_queue(status, message_type, created_at)", // Saga日志表索引 "CREATE INDEX idx_saga_steps ON {$wpdb->prefix}sc_saga_logs(step_name, status, created_at)", ]; foreach ($indexes as $sql) { $wpdb->query($sql); } } /** * 分区大表 */ public static function partition_large_tables() { global $wpdb; // 按月份分区订单表 $partition_sql = " ALTER TABLE {$wpdb->prefix}sc_orders PARTITION BY RANGE (YEAR(created_at) * 100 + MONTH(created_at)) ( PARTITION p202401 VALUES LESS THAN (202402), PARTITION p202402 VALUES LESS THAN (202403), PARTITION p202403 VALUES LESS THAN (202404), PARTITION p_future VALUES LESS THAN MAXVALUE )"; $wpdb->query($partition_sql); } /** * 查询性能监控 */ public static function monitor_query_performance() { add_filter('query', function($query) { if (defined('SAVEQUERIES') && SAVEQUERIES) { $start_time = microtime(true); // 记录慢查询 add_filter('posts_pre_query', function($posts, $wp_query) use ($start_time) { $end_time = microtime(true); $execution_time = ($end_time - $start_time) * 1000; if ($execution_time > 100) { // 超过100毫秒视为慢查询 error_log(sprintf( "Slow query detected: %.2f ms - %s", $execution_time, $wp_query->request ?? 
'N/A' )); } return $posts; }, 10, 2); } return $query; }); } /** * 数据库连接池模拟 */ public static function init_connection_pool() { static $connections = null; if ($connections === null) { $connections = []; $max_connections = 10; for ($i = 0; $i < $max_connections; $i++) { $connections[] = [ 'in_use' => false, 'last_used' => time() ]; } } return $connections; } } ?>
- <?php /** * WordPress供应链缓存管理器 */ class SupplyChain_Cache_Manager { private $cache_group = 'supply_chain'; private $cache_expiration = 3600; // 1小时
在当今快速变化的商业环境中,企业需要能够快速响应市场变化的柔性供应链系统。WordPress作为全球最流行的内容管理系统,其插件架构为供应链管理软件的开发提供了强大基础。然而,当我们将供应链系统扩展到分布式架构时,事务处理成为了一个关键挑战。
传统的单体应用事务处理在分布式环境中不再适用,因为供应链系统通常涉及多个独立的服务:库存管理、订单处理、物流跟踪、支付网关等。这些服务可能部署在不同的服务器上,甚至使用不同的数据库技术。本文将深入探讨在WordPress柔性供应链插件开发中实现分布式事务的完整解决方案。
分布式事务是指跨越多个网络节点、多个数据库或多个服务的事务操作。在供应链系统中,一个完整的订单处理可能涉及:
- 库存服务减少库存数量
- 订单服务创建订单记录
- 支付服务处理付款
- 物流服务生成发货单
所有这些操作必须作为一个原子单元执行:要么全部成功,要么全部回滚。
根据CAP定理,分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)中的两项。对于供应链系统,我们通常需要在保证分区容错性的前提下,在一致性和可用性之间做出权衡。
两阶段提交协议是经典的分布式事务解决方案,包含准备阶段和提交阶段。
<?php
/**
 * Two-phase-commit (2PC) coordinator for the supply-chain plugin.
 *
 * Every participant is a remote service reachable through three HTTP
 * endpoints (prepare / commit / rollback). execute_transaction() asks each
 * participant to prepare; only when all of them vote "yes" are the commit
 * requests sent, otherwise rollback requests are sent instead.
 */
class SupplyChain_2PC_Coordinator {
    /** @var array Registered participants, keyed by service name. */
    private $participants = [];

    /** @var mixed Global $wpdb captured at construction; not read by the methods of this class. */
    private $wpdb;

    /**
     * Captures the global WordPress database handle.
     */
    public function __construct() {
        global $wpdb;
        $this->wpdb = $wpdb;
    }

    /**
     * Registers a transaction participant.
     *
     * Registering the same service name twice replaces the earlier entry.
     *
     * @param string $service_name Service name (used as the participant key).
     * @param string $prepare_url  Endpoint for the prepare phase.
     * @param string $commit_url   Endpoint for the commit phase.
     * @param string $rollback_url Endpoint for rollback.
     */
    public function add_participant($service_name, $prepare_url, $commit_url, $rollback_url) {
        $this->participants[$service_name] = [
            'prepare_url'    => $prepare_url,
            'commit_url'     => $commit_url,
            'rollback_url'   => $rollback_url,
            'status'         => 'init',
            'transaction_id' => null,
        ];
    }

    /**
     * Runs the distributed transaction across all registered participants.
     *
     * @param array $transaction_data Payload sent to every prepare endpoint.
     * @return bool True only when every participant prepared and committed.
     */
    public function execute_transaction($transaction_data) {
        // Phase 1: collect votes.
        $prepare_results = $this->prepare_phase($transaction_data);
        if (!$this->all_prepared($prepare_results)) {
            $this->rollback_all($prepare_results);
            return false;
        }

        // Phase 2: commit. A failure here cannot be undone by this class;
        // participants that already committed stay committed.
        $commit_results = $this->commit_phase();
        return $this->all_committed($commit_results);
    }

    /**
     * Prepare phase: asks every participant whether it can commit.
     *
     * @param mixed $data Payload to JSON-encode into the request body.
     * @return array Map of participant name => strict bool vote.
     */
    private function prepare_phase($data) {
        $results = [];
        foreach (array_keys($this->participants) as $name) {
            $response = $this->post_json($this->participants[$name]['prepare_url'], $data, 30);

            $body = $this->is_http_success($response) ? $this->decode_body($response) : [];

            // The vote is normalised to a strict bool: the all_prepared()
            // check is a strict search for `false`, so a null/0/"" vote from
            // a malformed reply must not slip through as "prepared".
            $can_commit = !empty($body['can_commit']);

            $this->participants[$name]['status']         = $can_commit ? 'prepared' : 'failed';
            $this->participants[$name]['transaction_id'] = isset($body['transaction_id']) ? $body['transaction_id'] : null;
            $results[$name] = $can_commit;
        }
        return $results;
    }

    /**
     * Commit phase: tells every prepared participant to commit.
     *
     * @return array Map of participant name => bool commit outcome.
     */
    private function commit_phase() {
        $results = [];
        foreach (array_keys($this->participants) as $name) {
            if ($this->participants[$name]['status'] !== 'prepared') {
                continue;
            }

            $response = $this->post_json(
                $this->participants[$name]['commit_url'],
                ['transaction_id' => $this->participants[$name]['transaction_id']],
                30
            );

            $committed = $this->is_http_success($response);
            if ($committed) {
                // A participant may answer HTTP 200 with {"success": false};
                // replies without a "success" key are still treated as committed.
                $body = $this->decode_body($response);
                if (array_key_exists('success', $body) && !$body['success']) {
                    $committed = false;
                }
            }

            $this->participants[$name]['status'] = $committed ? 'committed' : 'commit_failed';
            $results[$name] = $committed;
        }
        return $results;
    }

    /**
     * Sends a rollback to every participant that may hold prepared state.
     *
     * Participants that voted "no" but still returned a transaction id are
     * included, because they may have stored provisional data before voting.
     *
     * @param array $prepare_results Votes from the prepare phase (unused; kept for signature compatibility).
     */
    private function rollback_all($prepare_results) {
        foreach (array_keys($this->participants) as $name) {
            $participant = $this->participants[$name];
            $needs_rollback = $participant['transaction_id'] !== null
                && in_array($participant['status'], ['prepared', 'failed'], true);
            if (!$needs_rollback) {
                continue;
            }

            $response = $this->post_json(
                $participant['rollback_url'],
                ['transaction_id' => $participant['transaction_id']],
                15
            );
            $this->participants[$name]['status'] = $this->is_http_success($response)
                ? 'rolled_back'
                : 'rollback_failed';
        }
    }

    /**
     * POSTs a JSON-encoded payload to a participant endpoint.
     *
     * @param string $url     Target endpoint.
     * @param mixed  $payload Value to JSON-encode as the request body.
     * @param int    $timeout Request timeout in seconds.
     * @return mixed Whatever wp_remote_post() returns.
     */
    private function post_json($url, $payload, $timeout) {
        return wp_remote_post($url, [
            'timeout' => $timeout,
            'body'    => json_encode($payload),
            'headers' => ['Content-Type' => 'application/json'],
        ]);
    }

    /**
     * Tells whether a response is neither a transport error nor a non-2xx status.
     *
     * @param mixed $response Return value of wp_remote_post().
     * @return bool
     */
    private function is_http_success($response) {
        if (is_wp_error($response)) {
            return false;
        }
        $code = (int) wp_remote_retrieve_response_code($response);
        return $code >= 200 && $code < 300;
    }

    /**
     * Decodes a JSON response body.
     *
     * @param mixed $response Return value of wp_remote_post().
     * @return array Decoded body, or an empty array when it is not a JSON object/array.
     */
    private function decode_body($response) {
        $decoded = json_decode(wp_remote_retrieve_body($response), true);
        return is_array($decoded) ? $decoded : [];
    }

    /**
     * @param array $results Votes from prepare_phase().
     * @return bool True when no participant voted false (vacuously true when empty).
     */
    private function all_prepared($results) {
        return !in_array(false, $results, true);
    }

    /**
     * @param array $results Outcomes from commit_phase().
     * @return bool True when no commit failed (vacuously true when empty).
     */
    private function all_committed($results) {
        return !in_array(false, $results, true);
    }
}
?>
<?php
/**
 * Order service acting as a two-phase-commit participant.
 *
 * prepare_order() stores the order in a temporary table and votes;
 * commit_order() turns the temporary record into a final order inside a
 * database transaction; rollback_order() discards the temporary record.
 */
class OrderService_2PC_Participant {
    /**
     * Prepare phase: validates the payload, checks inventory and stores the
     * order provisionally.
     *
     * @param object $request Request object exposing get_body() with a JSON payload.
     * @return array ['can_commit' => bool, 'transaction_id' => string|null];
     *               the id is null whenever nothing was stored.
     */
    public function prepare_order($request) {
        $refused = ['can_commit' => false, 'transaction_id' => null];

        $data = json_decode($request->get_body(), true);
        // Vote "no" on anything that is not a JSON object with a non-empty item list.
        if (!is_array($data) || empty($data['items']) || !is_array($data['items'])) {
            return $refused;
        }

        // Check inventory before writing, so a "no" vote leaves no temporary row behind.
        if (!$this->check_inventory($data['items'])) {
            return $refused;
        }

        global $wpdb;
        $transaction_id = 'order_' . wp_generate_uuid4();
        $inserted = $wpdb->insert(
            $wpdb->prefix . 'sc_temp_orders',
            [
                'transaction_id' => $transaction_id,
                'order_data'     => json_encode($data),
                'status'         => 'prepared',
                'created_at'     => current_time('mysql'),
            ]
        );
        // Without the temporary row a later commit could not find the order.
        if (!$inserted) {
            return $refused;
        }

        return [
            'can_commit'     => true,
            'transaction_id' => $transaction_id,
        ];
    }

    /**
     * Commit phase: converts the temporary record into a final order.
     *
     * @param object $request Request object exposing get_body() with {"transaction_id": "..."}.
     * @return array ['success' => true, 'order_id' => mixed] or
     *               ['success' => false, 'error' => string].
     */
    public function commit_order($request) {
        $transaction_id = $this->read_transaction_id($request);
        if ($transaction_id === null) {
            return ['success' => false, 'error' => 'Missing transaction_id'];
        }

        global $wpdb;
        $wpdb->query('START TRANSACTION');
        try {
            // FOR UPDATE locks the row so two concurrent commits of the same
            // transaction cannot both create a final order.
            $temp_order = $wpdb->get_row(
                $wpdb->prepare(
                    "SELECT * FROM {$wpdb->prefix}sc_temp_orders WHERE transaction_id = %s FOR UPDATE",
                    $transaction_id
                )
            );
            if (!$temp_order) {
                throw new Exception('Transaction not found');
            }

            $order_data = json_decode($temp_order->order_data, true);
            if (!is_array($order_data)) {
                throw new Exception('Stored order data is not valid JSON');
            }

            $order_id = $this->create_final_order($order_data);
            $this->update_inventory(isset($order_data['items']) ? $order_data['items'] : []);

            $deleted = $wpdb->delete(
                $wpdb->prefix . 'sc_temp_orders',
                ['transaction_id' => $transaction_id]
            );
            if ($deleted === false) {
                throw new Exception('Failed to remove temporary order record');
            }

            if ($wpdb->query('COMMIT') === false) {
                throw new Exception('Database commit failed');
            }
            return ['success' => true, 'order_id' => $order_id];
        } catch (\Throwable $e) {
            // Throwable (not just Exception) so an Error raised by a helper
            // cannot leave the SQL transaction open.
            $wpdb->query('ROLLBACK');
            return ['success' => false, 'error' => $e->getMessage()];
        }
    }

    /**
     * Rollback: discards the temporary record of a prepared order.
     *
     * Deleting an already-removed record still reports success.
     *
     * @param object $request Request object exposing get_body() with {"transaction_id": "..."}.
     * @return array ['success' => bool] plus 'error' on failure.
     */
    public function rollback_order($request) {
        $transaction_id = $this->read_transaction_id($request);
        if ($transaction_id === null) {
            // A null id would otherwise become a "transaction_id IS NULL" delete.
            return ['success' => false, 'error' => 'Missing transaction_id'];
        }

        global $wpdb;
        $deleted = $wpdb->delete(
            $wpdb->prefix . 'sc_temp_orders',
            ['transaction_id' => $transaction_id]
        );
        if ($deleted === false) {
            return ['success' => false, 'error' => 'Failed to remove temporary order record'];
        }
        return ['success' => true];
    }

    /**
     * Extracts a non-empty string transaction id from a JSON request body.
     *
     * @param object $request Request object exposing get_body().
     * @return string|null The id, or null when absent or malformed.
     */
    private function read_transaction_id($request) {
        $data = json_decode($request->get_body(), true);
        if (!is_array($data) || !isset($data['transaction_id'])) {
            return null;
        }
        $id = $data['transaction_id'];
        return (is_string($id) && $id !== '') ? $id : null;
    }

    /**
     * Inventory check placeholder; always reports sufficient stock.
     *
     * @param array $items Order items.
     * @return bool
     */
    private function check_inventory($items) {
        return true; // simplified example
    }

    /**
     * Final-order creation placeholder.
     *
     * @param array $order_data Decoded order payload.
     * @return int Hard-coded order id.
     */
    private function create_final_order($order_data) {
        return 12345;
    }

    /**
     * Inventory update placeholder (no-op).
     *
     * @param array $items Order items.
     */
    private function update_inventory($items) {
    }
}
// Register the REST endpoints that expose the order service as a 2PC participant.
add_action('rest_api_init', function() {
    $order_service = new OrderService_2PC_Participant();

    // Route suffix => participant method that handles that phase.
    $phase_handlers = [
        'prepare'  => 'prepare_order',
        'commit'   => 'commit_order',
        'rollback' => 'rollback_order',
    ];

    foreach ($phase_handlers as $phase => $handler) {
        // NOTE(review): '__return_true' lets any unauthenticated caller hit
        // these endpoints; confirm whether the coordinator can authenticate
        // before tightening this.
        register_rest_route('supply-chain/v1', '/order/' . $phase, [
            'methods'             => 'POST',
            'callback'            => [$order_service, $handler],
            'permission_callback' => '__return_true',
        ]);
    }
});
?>
<?php
/**
 * Order service acting as a two-phase-commit participant.
 *
 * The declaration is guarded because this file declares a class of the same
 * name earlier; an unguarded second declaration is a fatal error in PHP.
 */
if (!class_exists('OrderService_2PC_Participant')) {
    class OrderService_2PC_Participant {
        /**
         * Prepare phase: validates the payload, checks inventory and stores
         * the order provisionally.
         *
         * @param object $request Request object exposing get_body() with a JSON payload.
         * @return array ['can_commit' => bool, 'transaction_id' => string|null];
         *               the id is null whenever nothing was stored.
         */
        public function prepare_order($request) {
            $refused = ['can_commit' => false, 'transaction_id' => null];

            $data = json_decode($request->get_body(), true);
            // Vote "no" on anything that is not a JSON object with a non-empty item list.
            if (!is_array($data) || empty($data['items']) || !is_array($data['items'])) {
                return $refused;
            }

            // Check inventory before writing, so a "no" vote leaves no temporary row behind.
            if (!$this->check_inventory($data['items'])) {
                return $refused;
            }

            global $wpdb;
            $transaction_id = 'order_' . wp_generate_uuid4();
            $inserted = $wpdb->insert(
                $wpdb->prefix . 'sc_temp_orders',
                [
                    'transaction_id' => $transaction_id,
                    'order_data'     => json_encode($data),
                    'status'         => 'prepared',
                    'created_at'     => current_time('mysql'),
                ]
            );
            // Without the temporary row a later commit could not find the order.
            if (!$inserted) {
                return $refused;
            }

            return [
                'can_commit'     => true,
                'transaction_id' => $transaction_id,
            ];
        }

        /**
         * Commit phase: converts the temporary record into a final order.
         *
         * @param object $request Request object exposing get_body() with {"transaction_id": "..."}.
         * @return array ['success' => true, 'order_id' => mixed] or
         *               ['success' => false, 'error' => string].
         */
        public function commit_order($request) {
            $transaction_id = $this->read_transaction_id($request);
            if ($transaction_id === null) {
                return ['success' => false, 'error' => 'Missing transaction_id'];
            }

            global $wpdb;
            $wpdb->query('START TRANSACTION');
            try {
                // FOR UPDATE locks the row so two concurrent commits of the
                // same transaction cannot both create a final order.
                $temp_order = $wpdb->get_row(
                    $wpdb->prepare(
                        "SELECT * FROM {$wpdb->prefix}sc_temp_orders WHERE transaction_id = %s FOR UPDATE",
                        $transaction_id
                    )
                );
                if (!$temp_order) {
                    throw new Exception('Transaction not found');
                }

                $order_data = json_decode($temp_order->order_data, true);
                if (!is_array($order_data)) {
                    throw new Exception('Stored order data is not valid JSON');
                }

                $order_id = $this->create_final_order($order_data);
                $this->update_inventory(isset($order_data['items']) ? $order_data['items'] : []);

                $deleted = $wpdb->delete(
                    $wpdb->prefix . 'sc_temp_orders',
                    ['transaction_id' => $transaction_id]
                );
                if ($deleted === false) {
                    throw new Exception('Failed to remove temporary order record');
                }

                if ($wpdb->query('COMMIT') === false) {
                    throw new Exception('Database commit failed');
                }
                return ['success' => true, 'order_id' => $order_id];
            } catch (\Throwable $e) {
                // Throwable (not just Exception) so an Error raised by a
                // helper cannot leave the SQL transaction open.
                $wpdb->query('ROLLBACK');
                return ['success' => false, 'error' => $e->getMessage()];
            }
        }

        /**
         * Rollback: discards the temporary record of a prepared order.
         *
         * Deleting an already-removed record still reports success.
         *
         * @param object $request Request object exposing get_body() with {"transaction_id": "..."}.
         * @return array ['success' => bool] plus 'error' on failure.
         */
        public function rollback_order($request) {
            $transaction_id = $this->read_transaction_id($request);
            if ($transaction_id === null) {
                // A null id would otherwise become a "transaction_id IS NULL" delete.
                return ['success' => false, 'error' => 'Missing transaction_id'];
            }

            global $wpdb;
            $deleted = $wpdb->delete(
                $wpdb->prefix . 'sc_temp_orders',
                ['transaction_id' => $transaction_id]
            );
            if ($deleted === false) {
                return ['success' => false, 'error' => 'Failed to remove temporary order record'];
            }
            return ['success' => true];
        }

        /**
         * Extracts a non-empty string transaction id from a JSON request body.
         *
         * @param object $request Request object exposing get_body().
         * @return string|null The id, or null when absent or malformed.
         */
        private function read_transaction_id($request) {
            $data = json_decode($request->get_body(), true);
            if (!is_array($data) || !isset($data['transaction_id'])) {
                return null;
            }
            $id = $data['transaction_id'];
            return (is_string($id) && $id !== '') ? $id : null;
        }

        /**
         * Inventory check placeholder; always reports sufficient stock.
         *
         * @param array $items Order items.
         * @return bool
         */
        private function check_inventory($items) {
            return true; // simplified example
        }

        /**
         * Final-order creation placeholder.
         *
         * @param array $order_data Decoded order payload.
         * @return int Hard-coded order id.
         */
        private function create_final_order($order_data) {
            return 12345;
        }

        /**
         * Inventory update placeholder (no-op).
         *
         * @param array $items Order items.
         */
        private function update_inventory($items) {
        }
    }
}
// Register the REST endpoints that expose the order service as a 2PC participant.
add_action('rest_api_init', function() {
    $order_service = new OrderService_2PC_Participant();

    // Each entry pairs a route path with the participant method serving it.
    $routes = [
        ['/order/prepare', 'prepare_order'],
        ['/order/commit', 'commit_order'],
        ['/order/rollback', 'rollback_order'],
    ];

    foreach ($routes as $route) {
        list($path, $handler) = $route;
        // NOTE(review): '__return_true' lets any unauthenticated caller hit
        // these endpoints; confirm the intended access model.
        register_rest_route('supply-chain/v1', $path, [
            'methods'             => 'POST',
            'callback'            => [$order_service, $handler],
            'permission_callback' => '__return_true',
        ]);
    }
});
?>
Saga模式通过一系列本地事务和补偿操作来管理分布式事务。每个服务执行自己的本地事务,如果某个步骤失败,则执行反向补偿操作。
<?php
/**
 * Saga orchestrator for the supply-chain plugin.
 *
 * Steps run in the order they were added. When a step fails, the
 * compensations of the steps that already completed run in reverse order.
 * Each execution and compensation is written to the sc_saga_logs table.
 */
class SupplyChain_Saga_Orchestrator {
    /** @var array Ordered steps; each is ['name' => string, 'action' => callable, 'compensation' => callable|null]. */
    private $steps = [];

    /** @var int Index of the step currently (or last) executed. */
    private $current_step = 0;

    /**
     * Appends a step to the saga.
     *
     * The compensation is stored with its step rather than in a map keyed
     * by name, so two steps sharing a name keep their own compensations.
     *
     * @param string        $name         Step name (used as key in the result arrays).
     * @param callable      $action       Called with no arguments; must return an array whose 'success' is true on success.
     * @param callable|null $compensation Called with no arguments to undo the step; non-callable values are skipped.
     */
    public function add_step($name, $action, $compensation) {
        $this->steps[] = [
            'name'         => $name,
            'action'       => $action,
            'compensation' => $compensation,
        ];
    }

    /**
     * Runs the saga.
     *
     * @return array On success: ['success' => true, 'results' => array, 'message' => string].
     *               On failure: ['success' => false, 'error' => string,
     *               'compensation_results' => array, 'failed_step' => int].
     */
    public function execute() {
        $executed_steps = []; // indexes into $this->steps, in execution order
        $results = [];
        try {
            foreach ($this->steps as $index => $step) {
                $this->current_step = $index;

                $result = call_user_func($step['action']);

                // Anything other than an array with success === true is a failure.
                $succeeded = is_array($result) && isset($result['success']) && $result['success'] === true;
                if (!$succeeded) {
                    $reason = (is_array($result) && isset($result['error'])) ? $result['error'] : 'Unknown error';
                    throw new Exception("Step {$step['name']} failed: " . $reason);
                }

                $executed_steps[] = $index;
                $results[$step['name']] = $result;
                $this->log_step($step['name'], 'executed', $result);
            }

            $this->log_saga_complete();
            return [
                'success' => true,
                'results' => $results,
                'message' => 'Saga transaction completed successfully'
            ];
        } catch (\Throwable $e) {
            // Throwable, not just Exception: a step raising an Error (for
            // example a TypeError) must still trigger compensation.
            $compensation_results = $this->compensate($executed_steps);
            return [
                'success' => false,
                'error' => $e->getMessage(),
                'compensation_results' => $compensation_results,
                'failed_step' => $this->current_step
            ];
        }
    }

    /**
     * Runs the compensations of the completed steps in reverse order.
     *
     * A compensation that throws is recorded as failed and does not stop the
     * remaining compensations from running.
     *
     * @param array $executed_steps Indexes of completed steps, in execution order.
     * @return array Map of step name => compensation result.
     */
    private function compensate($executed_steps) {
        $results = [];
        foreach (array_reverse($executed_steps) as $index) {
            $step = $this->steps[$index];
            if (!is_callable($step['compensation'])) {
                continue;
            }

            try {
                $comp_result = call_user_func($step['compensation']);
                $status = 'compensated';
            } catch (\Throwable $e) {
                $comp_result = ['success' => false, 'error' => $e->getMessage()];
                $status = 'compensation_failed';
            }

            $results[$step['name']] = $comp_result;
            $this->log_step($step['name'], $status, $comp_result);
        }
        return $results;
    }

    /**
     * Writes one step event to the sc_saga_logs table.
     *
     * @param string $step_name Step name.
     * @param string $status    Event status label.
     * @param mixed  $data      Value stored JSON-encoded in the 'data' column.
     */
    private function log_step($step_name, $status, $data) {
        global $wpdb;
        $wpdb->insert(
            $wpdb->prefix . 'sc_saga_logs',
            [
                'step_name' => $step_name,
                'status' => $status,
                'data' => json_encode($data),
                'created_at' => current_time('mysql')
            ]
        );
    }

    /**
     * Writes the saga-completed marker row to the sc_saga_logs table.
     */
    private function log_saga_complete() {
        global $wpdb;
        $wpdb->insert(
            $wpdb->prefix . 'sc_saga_logs',
            [
                'step_name' => 'SAGA_COMPLETE',
                'status' => 'completed',
                'data' => json_encode(['message' => 'All steps completed successfully']),
                'created_at' => current_time('mysql')
            ]
        );
    }
}
?>
<?php
/**
 * Example saga that processes an order in five steps: validate, reserve
 * inventory, take payment, create a shipment, send a confirmation e-mail.
 */
class OrderProcessingSaga {
    /**
     * Builds and runs the order-processing saga.
     *
     * @param array $order_data Order payload; the keys read here are 'items',
     *                          'payment', 'customer_email' and 'order_id'.
     * @return array Result of SupplyChain_Saga_Orchestrator::execute().
     */
    public static function create_order_saga($order_data) {
        $saga = new SupplyChain_Saga_Orchestrator();

        // Read optional keys once so a missing key cannot raise an
        // undefined-index notice inside a step closure.
        $items   = isset($order_data['items']) ? $order_data['items'] : [];
        $payment = isset($order_data['payment']) ? $order_data['payment'] : null;
        $email   = isset($order_data['customer_email']) ? $order_data['customer_email'] : '';

        // Shared by the shipment step and its compensation so that the
        // cancellation targets exactly the shipment this saga created.
        $shipment_id = null;

        // Step 1: validate the order.
        $saga->add_step(
            'validate_order',
            function() use ($order_data) {
                return self::validate_order($order_data);
            },
            function() use ($order_data) {
                return self::compensate_validation($order_data);
            }
        );

        // Step 2: reserve inventory.
        $saga->add_step(
            'reserve_inventory',
            function() use ($items) {
                return self::reserve_inventory($items);
            },
            function() use ($items) {
                return self::release_inventory($items);
            }
        );

        // Step 3: take payment.
        $saga->add_step(
            'process_payment',
            function() use ($payment) {
                return self::process_payment($payment);
            },
            function() use ($payment) {
                return self::refund_payment($payment);
            }
        );

        // Step 4: create the shipment.
        $saga->add_step(
            'create_shipment',
            function() use ($order_data, &$shipment_id) {
                $created = self::create_shipment($order_data);
                $shipment_id = isset($created['shipment_id']) ? $created['shipment_id'] : null;
                return $created;
            },
            function() use ($order_data, &$shipment_id) {
                return self::cancel_shipment($order_data, $shipment_id);
            }
        );

        // Step 5: send the confirmation.
        // NOTE(review): as the last step, a failed e-mail undoes payment and
        // shipment; confirm this is the intended policy.
        $saga->add_step(
            'send_confirmation',
            function() use ($email) {
                return self::send_order_confirmation($email);
            },
            function() {
                // A notification has nothing to undo.
                return ['success' => true, 'message' => 'No compensation needed for notification'];
            }
        );

        return $saga->execute();
    }

    /**
     * Validates the order; currently only requires a non-empty 'items' entry.
     *
     * @param array $order_data Order payload.
     * @return array Step result.
     */
    private static function validate_order($order_data) {
        if (empty($order_data['items'])) {
            return ['success' => false, 'error' => 'Order has no items'];
        }
        return ['success' => true, 'validated' => true];
    }

    /**
     * Compensation for validation; performs no work.
     *
     * @param array $order_data Order payload (unused).
     * @return array Compensation result.
     */
    private static function compensate_validation($order_data) {
        return ['success' => true, 'message' => 'Validation compensation completed'];
    }

    /**
     * Reserves stock for every item inside one database transaction.
     *
     * @param array $items List of ['product_id' => int, 'quantity' => int].
     * @return array Step result; on failure nothing stays reserved.
     */
    private static function reserve_inventory($items) {
        if (!is_array($items)) {
            return ['success' => false, 'error' => 'Invalid item list'];
        }

        global $wpdb;
        try {
            $wpdb->query('START TRANSACTION');
            foreach ($items as $item) {
                // The WHERE clause makes the update a no-op when stock is
                // short, so the affected-row count doubles as the stock check.
                $updated = $wpdb->query(
                    $wpdb->prepare(
                        "UPDATE {$wpdb->prefix}sc_inventory
SET reserved = reserved + %d
WHERE product_id = %d AND quantity >= (reserved + %d)",
                        $item['quantity'],
                        $item['product_id'],
                        $item['quantity']
                    )
                );
                if ($updated === false) {
                    // A database error is a different failure than a stock shortage.
                    throw new Exception("Inventory update failed for product {$item['product_id']}");
                }
                if ($updated === 0) {
                    throw new Exception("Insufficient inventory for product {$item['product_id']}");
                }
            }
            $wpdb->query('COMMIT');
            return ['success' => true, 'reserved' => true];
        } catch (\Throwable $e) {
            $wpdb->query('ROLLBACK');
            return ['success' => false, 'error' => $e->getMessage()];
        }
    }

    /**
     * Releases previously reserved stock (compensation for reserve_inventory).
     *
     * @param array $items List of ['product_id' => int, 'quantity' => int].
     * @return array Compensation result.
     */
    private static function release_inventory($items) {
        if (!is_array($items)) {
            return ['success' => false, 'error' => 'Invalid item list'];
        }

        global $wpdb;
        $wpdb->query('START TRANSACTION');
        foreach ($items as $item) {
            // GREATEST(0, ...) keeps the reserved counter from going negative.
            $updated = $wpdb->query(
                $wpdb->prepare(
                    "UPDATE {$wpdb->prefix}sc_inventory
SET reserved = GREATEST(0, reserved - %d)
WHERE product_id = %d",
                    $item['quantity'],
                    $item['product_id']
                )
            );
            if ($updated === false) {
                // Do not commit a partial release or report it as success.
                $wpdb->query('ROLLBACK');
                return ['success' => false, 'error' => "Failed to release inventory for product {$item['product_id']}"];
            }
        }
        $wpdb->query('COMMIT');
        return ['success' => true, 'released' => true];
    }

    /**
     * Simulated payment step.
     *
     * @param mixed $payment_data Payment details from the order payload.
     * @return array Step result with a generated 'transaction_id' on success.
     */
    private static function process_payment($payment_data) {
        // Simulated gateway: succeeds whenever payment details are present.
        // Replace with a real gateway call when one is integrated.
        $payment_success = !empty($payment_data);
        if (!$payment_success) {
            return ['success' => false, 'error' => 'Payment processing failed'];
        }
        return ['success' => true, 'transaction_id' => 'pay_' . wp_generate_uuid4()];
    }

    /**
     * Refund placeholder (compensation for process_payment).
     *
     * @param mixed $payment_data Payment details (unused).
     * @return array Compensation result.
     */
    private static function refund_payment($payment_data) {
        return ['success' => true, 'refunded' => true];
    }

    /**
     * Records a new shipment row for the order.
     *
     * @param array $order_data Order payload; 'order_id' defaults to 0 when absent.
     * @return array Step result with the generated 'shipment_id' on success.
     */
    private static function create_shipment($order_data) {
        $shipment_id = 'ship_' . wp_generate_uuid4();

        global $wpdb;
        $inserted = $wpdb->insert(
            $wpdb->prefix . 'sc_shipments',
            [
                'order_id' => $order_data['order_id'] ?? 0,
                'shipment_id' => $shipment_id,
                'status' => 'created',
                'created_at' => current_time('mysql')
            ]
        );
        if (!$inserted) {
            return ['success' => false, 'error' => 'Failed to record shipment'];
        }
        return ['success' => true, 'shipment_id' => $shipment_id];
    }

    /**
     * Marks a shipment as cancelled (compensation for create_shipment).
     *
     * The shipment id is preferred; the order id is only used when it is
     * non-empty, because matching on the order_id = 0 fallback would cancel
     * the shipments of every order stored without an id.
     *
     * @param array       $order_data  Order payload.
     * @param string|null $shipment_id Shipment created by this saga, when known.
     * @return array Compensation result.
     */
    private static function cancel_shipment($order_data, $shipment_id = null) {
        if ($shipment_id !== null) {
            $where = ['shipment_id' => $shipment_id];
        } elseif (!empty($order_data['order_id'])) {
            $where = ['order_id' => $order_data['order_id']];
        } else {
            return ['success' => false, 'error' => 'No shipment identifier available to cancel'];
        }

        global $wpdb;
        $updated = $wpdb->update(
            $wpdb->prefix . 'sc_shipments',
            ['status' => 'cancelled'],
            $where
        );
        if ($updated === false) {
            return ['success' => false, 'error' => 'Failed to cancel shipment'];
        }
        return ['success' => true, 'cancelled' => true];
    }

    /**
     * Sends the order confirmation e-mail.
     *
     * @param string $email Recipient address.
     * @return array Step result; 'success' mirrors the wp_mail() return value.
     */
    private static function send_order_confirmation($email) {
        if (empty($email)) {
            return ['success' => false, 'error' => 'Missing customer email', 'email_sent' => false];
        }
        $subject = '您的订单已确认';
        $message = '感谢您的订购,我们正在处理您的订单。';
        $sent = wp_mail($email, $subject, $message);
        return ['success' => $sent, 'email_sent' => $sent];
    }
}
// Shortcode [process_order_saga order_id="..."]: runs the order-processing
// saga for the given order and renders the outcome.
// NOTE(review): the saga runs every time the shortcode is rendered; confirm
// that repeated page views cannot process the same order twice.
add_shortcode('process_order_saga', function($atts) {
    $atts = shortcode_atts([
        'order_id' => 0
    ], $atts);

    // `self::` cannot be used here: this closure is defined at file scope,
    // so there is no class scope and the call throws an Error. The order
    // loader is resolved on OrderProcessingSaga instead, and only invoked
    // when it exists and is publicly callable.
    $loader = ['OrderProcessingSaga', 'get_order_data'];
    $order_data = is_callable($loader)
        ? call_user_func($loader, $atts['order_id'])
        : null;

    if (empty($order_data)) {
        return '<div class="error">订单不存在</div>';
    }

    // Run the saga and report the result.
    $result = OrderProcessingSaga::create_order_saga($order_data);
    if ($result['success']) {
        return '<div class="success">订单处理成功!</div>';
    } else {
        return '<div class="error">订单处理失败: ' . esc_html($result['error']) . '</div>';
    }
});
?>
<?php
/**
* WordPress订单处理Saga示例
*/
class OrderProcessingSaga {
/**
 * Builds and runs the five-step order-processing saga: validate, reserve
 * inventory, take payment, create a shipment, send a confirmation e-mail.
 *
 * @param array $order_data Order payload; the keys read here are 'items',
 *                          'payment' and 'customer_email'.
 * @return array Result of SupplyChain_Saga_Orchestrator::execute().
 */
public static function create_order_saga($order_data) {
    $saga = new SupplyChain_Saga_Orchestrator();

    // Read optional keys once so a missing key cannot raise an
    // undefined-index notice inside a step closure.
    $items   = isset($order_data['items']) ? $order_data['items'] : [];
    $payment = isset($order_data['payment']) ? $order_data['payment'] : null;
    $email   = isset($order_data['customer_email']) ? $order_data['customer_email'] : '';

    // Step 1: validate the order.
    $saga->add_step(
        'validate_order',
        function() use ($order_data) {
            return self::validate_order($order_data);
        },
        function() use ($order_data) {
            return self::compensate_validation($order_data);
        }
    );

    // Step 2: reserve inventory.
    $saga->add_step(
        'reserve_inventory',
        function() use ($items) {
            return self::reserve_inventory($items);
        },
        function() use ($items) {
            return self::release_inventory($items);
        }
    );

    // Step 3: take payment.
    $saga->add_step(
        'process_payment',
        function() use ($payment) {
            return self::process_payment($payment);
        },
        function() use ($payment) {
            return self::refund_payment($payment);
        }
    );

    // Step 4: create the shipment.
    $saga->add_step(
        'create_shipment',
        function() use ($order_data) {
            return self::create_shipment($order_data);
        },
        function() use ($order_data) {
            return self::cancel_shipment($order_data);
        }
    );

    // Step 5: send the confirmation.
    $saga->add_step(
        'send_confirmation',
        function() use ($email) {
            return self::send_order_confirmation($email);
        },
        function() {
            // A notification has nothing to undo.
            return ['success' => true, 'message' => 'No compensation needed for notification'];
        }
    );

    return $saga->execute();
}
/**
 * Validates the order; it passes when its 'items' entry is non-empty.
 *
 * @param array $order_data Order payload.
 * @return array Step result.
 */
private static function validate_order($order_data) {
    $has_items = !empty($order_data['items']);

    return $has_items
        ? ['success' => true, 'validated' => true]
        : ['success' => false, 'error' => 'Order has no items'];
}
private static function compensate_validation($order_data) {
// 验证步骤的补偿操作(通常不需要实际操作)
return ['success' => true, 'message' => 'Validation compensation completed'];
}
private static function reserve_inventory($items) {
global $wpdb;
try {
$wpdb->query('START TRANSACTION');
foreach ($items as $item) {
$wpdb->query(
$wpdb->prepare(
"UPDATE {$wpdb->prefix}sc_inventory
SET reserved = reserved + %d
WHERE product_id = %d AND quantity >= (reserved + %d)",
$item['quantity'],
$item['product_id'],
$item['quantity']
)
);
if ($wpdb->rows_affected === 0) {
throw new Exception("Insufficient inventory for product {$item['product_id']}");
}
}
$wpdb->query('COMMIT');
return ['success' => true, 'reserved' => true];
} catch (Exception $e) {
$wpdb->query('ROLLBACK');
return ['success' => false, 'error' => $e->getMessage()];
}
}
private static function release_inventory($items) {
global $wpdb;
$wpdb->query('START TRANSACTION');
foreach ($items as $item) {
$wpdb->query(
$wpdb->prepare(
"UPDATE {$wpdb->prefix}sc_inventory
SET reserved = GREATEST(0, reserved - %d)
WHERE product_id = %d",
$item['quantity'],
$item['product_id']
)
);
}
$wpdb->query('COMMIT');
return ['success' => true, 'released' => true];
}
private static function process_payment($payment_data) {
// 集成支付网关处理
// 这里使用模拟支付处理
if (!$payment_success) {
return ['success' => false, 'error' => 'Payment processing failed'];
}
return ['success' => true, 'transaction_id' => 'pay_' . wp_generate_uuid4()];
}
private static function refund_payment($payment_data) {
// 实现支付退款逻辑
return ['success' => true, 'refunded' => true];
}
private static function create_shipment($order_data) {
// 集成物流服务API
$shipment_id = 'ship_' . wp_generate_uuid4();
// 记录发货信息到数据库
global $wpdb;
$wpdb->insert(
$wpdb->prefix . 'sc_shipments',
[
'order_id' => $order_data['order_id'] ?? 0,
'shipment_id' => $shipment_id,
'status' => 'created',
'created_at' => current_time('mysql')
]
);
return ['success' => true, 'shipment_id' => $shipment_id];
}
private static function cancel_shipment($order_data) {
// 取消物流单逻辑
global $wpdb;
$wpdb->update(
$wpdb->prefix . 'sc_shipments',
['status' => 'cancelled'],
['order_id' => $order_data['order_id'] ?? 0]
);
return ['success' => true, 'cancelled' => true];
}
private static function send_order_confirmation($email) {
// 发送订单确认邮件
$subject = '您的订单已确认';
$message = '感谢您的订购,我们正在处理您的订单。';
$sent = wp_mail($email, $subject, $message);
return ['success' => $sent, 'email_sent' => $sent];
}
}
// Shortcode [process_order_saga order_id="123"]: runs the order saga for one
// order and renders a success or error notice.
add_shortcode('process_order_saga', function ($atts) {
    $atts = shortcode_atts(
        [
            'order_id' => 0,
        ],
        $atts
    );
    $order_id = absint($atts['order_id']);

    // This closure is registered outside of any class, so the previous
    // `self::get_order_data()` call threw 'Cannot use "self" when no class
    // scope is active'. The order payload is resolved through the
    // `sc_get_order_data` filter instead, so the plugin's storage layer can
    // supply it; an empty result is treated as "order not found".
    $order_data = $order_id > 0
        ? apply_filters('sc_get_order_data', [], $order_id)
        : [];

    if (empty($order_data)) {
        return '<div class="error">订单不存在</div>';
    }

    // Run the saga transaction.
    $result = OrderProcessingSaga::create_order_saga($order_data);

    if (!empty($result['success'])) {
        return '<div class="success">订单处理成功!</div>';
    }

    // The error key may be absent on failure; never emit an undefined-index notice.
    return '<div class="error">订单处理失败: ' . esc_html($result['error'] ?? '') . '</div>';
});
?>
<?php
/**
 * Database-backed message queue for the supply-chain plugin.
 *
 * Messages live in the `{prefix}sc_message_queue` table and move through the
 * statuses pending -> processing -> completed / failed.
 */
class SupplyChain_Message_Queue {
    /** @var string Fully prefixed queue table name. */
    private $queue_table;

    /**
     * Resolve the table name and make sure the table exists.
     */
    public function __construct() {
        global $wpdb;

        $this->queue_table = $wpdb->prefix . 'sc_message_queue';
        $this->create_queue_table();
    }

    /**
     * Create the queue table when it does not exist yet.
     */
    private function create_queue_table() {
        global $wpdb;

        // The service is instantiated on every request; only load upgrade.php
        // and run dbDelta() when the table is actually missing.
        static $verified = [];
        if (isset($verified[$this->queue_table])) {
            return;
        }
        $verified[$this->queue_table] = true;

        $found = $wpdb->get_var(
            $wpdb->prepare('SHOW TABLES LIKE %s', $wpdb->esc_like($this->queue_table))
        );
        if ($found !== null) {
            return;
        }

        $charset_collate = $wpdb->get_charset_collate();
        $sql = "CREATE TABLE IF NOT EXISTS {$this->queue_table} (
            id bigint(20) NOT NULL AUTO_INCREMENT,
            message_type varchar(100) NOT NULL,
            payload longtext NOT NULL,
            status varchar(20) DEFAULT 'pending',
            retry_count int(11) DEFAULT 0,
            max_retries int(11) DEFAULT 3,
            processed_at datetime DEFAULT NULL,
            created_at datetime DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id),
            KEY status (status),
            KEY message_type (message_type)
        ) $charset_collate;";

        require_once(ABSPATH . 'wp-admin/includes/upgrade.php');
        dbDelta($sql);
    }

    /**
     * Publish a message to the queue.
     *
     * @param string $message_type Logical queue / topic name.
     * @param mixed  $payload      JSON-serialisable payload.
     * @return int|false Rows inserted, or false when encoding or the insert fails.
     */
    public function publish($message_type, $payload) {
        global $wpdb;

        $encoded = wp_json_encode($payload);
        if ($encoded === false) {
            // Never store an unusable payload.
            return false;
        }

        return $wpdb->insert(
            $this->queue_table,
            [
                'message_type' => $message_type,
                'payload'      => $encoded,
                'status'       => 'pending',
            ]
        );
    }

    /**
     * Consume up to $batch_size pending messages of one type.
     *
     * @param string   $message_type Queue / topic name to read.
     * @param callable $callback     Receives the decoded payload and returns an
     *                               array with a `success` key (and `error` on failure).
     * @param int      $batch_size   Maximum number of messages to fetch.
     * @return array One entry per processed message: `message_id`, `success`
     *               and either `result` or `error`.
     */
    public function consume($message_type, $callback, $batch_size = 10) {
        global $wpdb;

        // Oldest first; id breaks ties between rows created in the same second.
        $messages = $wpdb->get_results(
            $wpdb->prepare(
                "SELECT * FROM {$this->queue_table}
                WHERE message_type = %s AND status = 'pending'
                ORDER BY created_at ASC, id ASC
                LIMIT %d",
                $message_type,
                $batch_size
            )
        );

        $results = [];

        foreach ((array) $messages as $message) {
            // Claim the row atomically: the UPDATE only matches while the row
            // is still pending, so two overlapping runs cannot both process it.
            $claimed = $wpdb->update(
                $this->queue_table,
                ['status' => 'processing'],
                ['id' => $message->id, 'status' => 'pending']
            );
            if (!$claimed) {
                continue;
            }

            try {
                $payload = json_decode($message->payload, true);
                $result = call_user_func($callback, $payload);

                if (!is_array($result)) {
                    $result = ['success' => false, 'error' => 'Callback returned an invalid result'];
                }

                $succeeded = !empty($result['success']);

                if ($succeeded) {
                    $wpdb->update(
                        $this->queue_table,
                        [
                            'status'       => 'completed',
                            'processed_at' => current_time('mysql'),
                        ],
                        ['id' => $message->id]
                    );
                } else {
                    // Callbacks do not always supply `error`; fall back gracefully.
                    $this->handle_failure(
                        $message,
                        $result['error'] ?? ($result['message'] ?? 'Unknown error')
                    );
                }

                $results[] = [
                    'message_id' => $message->id,
                    'success'    => $succeeded,
                    'result'     => $result,
                ];
            } catch (Throwable $e) {
                // Catch Error as well as Exception, otherwise a TypeError in a
                // callback would leave the row stuck in "processing" forever.
                $this->handle_failure($message, $e->getMessage());
                $results[] = [
                    'message_id' => $message->id,
                    'success'    => false,
                    'error'      => $e->getMessage(),
                ];
            }
        }

        return $results;
    }

    /**
     * Record a failed attempt: re-queue the message or mark it failed.
     *
     * @param object $message Queue row as returned by wpdb.
     * @param string $error   Failure description for the log.
     */
    private function handle_failure($message, $error) {
        global $wpdb;

        $retry_count = (int) $message->retry_count + 1;
        $max_retries = (int) $message->max_retries;

        if ($retry_count >= $max_retries) {
            // Attempts exhausted: park the message as failed and log it.
            $wpdb->update(
                $this->queue_table,
                [
                    'status'      => 'failed',
                    'retry_count' => $retry_count,
                ],
                ['id' => $message->id]
            );
            error_log("Message {$message->id} failed after {$max_retries} retries: {$error}");

            return;
        }

        // Put the message back so a later run retries it.
        $wpdb->update(
            $this->queue_table,
            [
                'status'      => 'pending',
                'retry_count' => $retry_count,
            ],
            ['id' => $message->id]
        );
    }

    /**
     * Delete completed and failed messages older than $days_old days.
     *
     * @param int $days_old Age threshold in days.
     * @return int|false Rows deleted, or false on error.
     */
    public function cleanup($days_old = 7) {
        global $wpdb;

        // created_at is filled by the database (DEFAULT CURRENT_TIMESTAMP), so
        // the cutoff is computed on the database clock too rather than PHP's.
        return $wpdb->query(
            $wpdb->prepare(
                "DELETE FROM {$this->queue_table}
                WHERE status IN ('completed', 'failed')
                AND created_at < DATE_SUB(NOW(), INTERVAL %d DAY)",
                absint($days_old)
            )
        );
    }
}
/**
 * Order-processing service built on the database-backed message queue.
 *
 * Orders are recorded synchronously; inventory and payment work is published
 * as queue messages and handled later by a WP-Cron event.
 */
class MessageQueue_Order_Service {
    /** @var SupplyChain_Message_Queue Queue used to publish and consume messages. */
    private $mq;

    /**
     * Create the queue, hook the cron handler and schedule the recurring event.
     */
    public function __construct() {
        $this->mq = new SupplyChain_Message_Queue();

        add_action('sc_process_order_queue', [$this, 'process_order_queue']);

        // Schedule only once; `five_minutes` is a custom recurrence registered
        // through the `cron_schedules` filter.
        if (!wp_next_scheduled('sc_process_order_queue')) {
            wp_schedule_event(time(), 'five_minutes', 'sc_process_order_queue');
        }
    }

    /**
     * Create an order and queue its follow-up work.
     *
     * @param array $order_data Order payload; reads `customer_id`,
     *                          `total_amount`, `items` and `payment`.
     * @return array `success`, `order_id` and a `message` (or `error` on failure).
     */
    public function create_order_async($order_data) {
        $order_id = $this->create_order_base($order_data);

        // Without an order row there is nothing the queued messages could refer to.
        if (!$order_id) {
            return [
                'success'  => false,
                'order_id' => 0,
                'error'    => 'Failed to create order record',
            ];
        }

        $this->mq->publish('order_processing', [
            'order_id'   => $order_id,
            'order_data' => $order_data,
            'timestamp'  => time(),
        ]);

        $this->mq->publish('inventory_update', [
            'order_id'  => $order_id,
            'items'     => $order_data['items'] ?? [],
            'action'    => 'deduct',
            'timestamp' => time(),
        ]);

        $this->mq->publish('payment_processing', [
            'order_id'     => $order_id,
            'payment_data' => $order_data['payment'] ?? [],
            'timestamp'    => time(),
        ]);

        return [
            'success'  => true,
            'order_id' => $order_id,
            'message'  => '订单已接收,正在异步处理',
        ];
    }

    /**
     * Cron handler: drain the order, inventory and payment queues in that order.
     */
    public function process_order_queue() {
        $this->mq->consume('order_processing', function ($payload) {
            return $this->process_order_message($payload);
        });

        $this->mq->consume('inventory_update', function ($payload) {
            return $this->process_inventory_message($payload);
        });

        $this->mq->consume('payment_processing', function ($payload) {
            return $this->process_payment_message($payload);
        });
    }

    /**
     * Insert the base order row.
     *
     * @param array $order_data Order payload.
     * @return int New order id, or 0 when the insert failed.
     */
    private function create_order_base($order_data) {
        global $wpdb;

        $inserted = $wpdb->insert(
            $wpdb->prefix . 'sc_orders',
            [
                'customer_id'  => $order_data['customer_id'] ?? 0,
                'total_amount' => $order_data['total_amount'] ?? 0,
                'status'       => 'processing',
                'created_at'   => current_time('mysql'),
            ]
        );

        return $inserted === false ? 0 : (int) $wpdb->insert_id;
    }

    /**
     * Handle an `order_processing` message: mark the order as processed.
     *
     * @param array $payload Message payload with `order_id`.
     * @return array Result for the queue.
     */
    private function process_order_message($payload) {
        global $wpdb;

        $order_id = $payload['order_id'];

        $wpdb->update(
            $wpdb->prefix . 'sc_orders',
            ['status' => 'processed'],
            ['id' => $order_id]
        );

        return ['success' => true, 'order_id' => $order_id];
    }

    /**
     * Handle an `inventory_update` message inside one DB transaction.
     *
     * NOTE(review): a failure publishes `order_compensation` and also reports
     * failure to the queue, so a retried message publishes compensation again —
     * confirm the compensation consumer tolerates duplicates.
     *
     * @param array $payload Message payload with `order_id`, `items` and
     *                       `action` (`deduct` or `restore`).
     * @return array Result for the queue.
     */
    private function process_inventory_message($payload) {
        global $wpdb;

        try {
            $action = $payload['action'] ?? '';

            // Previously an unknown action ran no query and then inspected the
            // rows_affected left over from an unrelated earlier statement.
            if ($action !== 'deduct' && $action !== 'restore') {
                throw new Exception("Unknown inventory action '{$action}'");
            }

            $wpdb->query('START TRANSACTION');

            foreach (($payload['items'] ?? []) as $item) {
                if ($action === 'deduct') {
                    // Only matches while enough stock is available.
                    $sql = $wpdb->prepare(
                        "UPDATE {$wpdb->prefix}sc_inventory
                        SET quantity = quantity - %d
                        WHERE product_id = %d AND quantity >= %d",
                        $item['quantity'],
                        $item['product_id'],
                        $item['quantity']
                    );
                } else {
                    $sql = $wpdb->prepare(
                        "UPDATE {$wpdb->prefix}sc_inventory
                        SET quantity = quantity + %d
                        WHERE product_id = %d",
                        $item['quantity'],
                        $item['product_id']
                    );
                }

                // false = DB error, 0 = no row changed; both abort the batch.
                $affected = $wpdb->query($sql);
                if (!$affected) {
                    throw new Exception("Inventory update failed for product {$item['product_id']}");
                }
            }

            $wpdb->query('COMMIT');

            return ['success' => true];
        } catch (Exception $e) {
            $wpdb->query('ROLLBACK');

            // Ask for the order to be compensated.
            $this->mq->publish('order_compensation', [
                'order_id'  => $payload['order_id'] ?? 0,
                'error'     => $e->getMessage(),
                'timestamp' => time(),
            ]);

            return ['success' => false, 'error' => $e->getMessage()];
        }
    }

    /**
     * Handle a `payment_processing` message.
     *
     * @param array $payload Message payload with `order_id` and `payment_data`.
     * @return array The gateway result (`success`, `transaction_id`, `message`, `error`).
     */
    private function process_payment_message($payload) {
        global $wpdb;

        $payment_result = $this->call_payment_gateway($payload['payment_data'] ?? []);

        if ($payment_result['success']) {
            // Record the payment on the order, then trigger shipment creation.
            $wpdb->update(
                $wpdb->prefix . 'sc_orders',
                [
                    'payment_status' => 'paid',
                    'payment_id'     => $payment_result['transaction_id'],
                ],
                ['id' => $payload['order_id']]
            );

            $this->mq->publish('shipment_creation', [
                'order_id'  => $payload['order_id'],
                'timestamp' => time(),
            ]);
        } else {
            // Payment failed: request compensation for the order.
            $this->mq->publish('order_compensation', [
                'order_id'  => $payload['order_id'],
                'error'     => 'Payment failed',
                'timestamp' => time(),
            ]);
        }

        return $payment_result;
    }

    /**
     * Simulated payment gateway call with a 90% success rate.
     *
     * @param array $payment_data Payment details (unused by the simulation).
     * @return array `success`, `transaction_id`, `message` and `error`.
     */
    private function call_payment_gateway($payment_data) {
        // 1..100 gives exactly 100 outcomes, 90 of which are > 10. The previous
        // rand(0, 100) had 101 outcomes, i.e. ~89.1% rather than 90%.
        $success = random_int(1, 100) > 10;

        return [
            'success'        => $success,
            'transaction_id' => $success ? 'txn_' . wp_generate_uuid4() : null,
            'message'        => $success ? 'Payment successful' : 'Payment failed',
            // The queue consumer reads `error` on failure.
            'error'          => $success ? null : 'Payment failed',
        ];
    }
}
// Bootstrap the queue-backed order service once WordPress fires `init`.
add_action('init', static function () {
    new MessageQueue_Order_Service();
});

// Expose a custom five-minute recurrence to WP-Cron.
add_filter('cron_schedules', static function ($intervals) {
    $five_minutes = [
        'interval' => 300,
        'display'  => __('Every 5 Minutes'),
    ];
    $intervals['five_minutes'] = $five_minutes;

    return $intervals;
});
?>
<?php
/**
 * Database-backed message queue for the supply-chain plugin.
 *
 * Messages live in the `{prefix}sc_message_queue` table and move through the
 * statuses pending -> processing -> completed / failed.
 */
class SupplyChain_Message_Queue {
    /** @var string Fully prefixed queue table name. */
    private $queue_table;

    /**
     * Resolve the table name and make sure the table exists.
     */
    public function __construct() {
        global $wpdb;

        $this->queue_table = $wpdb->prefix . 'sc_message_queue';
        $this->create_queue_table();
    }

    /**
     * Create the queue table when it does not exist yet.
     */
    private function create_queue_table() {
        global $wpdb;

        // The service is instantiated on every request; only load upgrade.php
        // and run dbDelta() when the table is actually missing.
        static $verified = [];
        if (isset($verified[$this->queue_table])) {
            return;
        }
        $verified[$this->queue_table] = true;

        $found = $wpdb->get_var(
            $wpdb->prepare('SHOW TABLES LIKE %s', $wpdb->esc_like($this->queue_table))
        );
        if ($found !== null) {
            return;
        }

        $charset_collate = $wpdb->get_charset_collate();
        $sql = "CREATE TABLE IF NOT EXISTS {$this->queue_table} (
            id bigint(20) NOT NULL AUTO_INCREMENT,
            message_type varchar(100) NOT NULL,
            payload longtext NOT NULL,
            status varchar(20) DEFAULT 'pending',
            retry_count int(11) DEFAULT 0,
            max_retries int(11) DEFAULT 3,
            processed_at datetime DEFAULT NULL,
            created_at datetime DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id),
            KEY status (status),
            KEY message_type (message_type)
        ) $charset_collate;";

        require_once(ABSPATH . 'wp-admin/includes/upgrade.php');
        dbDelta($sql);
    }

    /**
     * Publish a message to the queue.
     *
     * @param string $message_type Logical queue / topic name.
     * @param mixed  $payload      JSON-serialisable payload.
     * @return int|false Rows inserted, or false when encoding or the insert fails.
     */
    public function publish($message_type, $payload) {
        global $wpdb;

        $encoded = wp_json_encode($payload);
        if ($encoded === false) {
            // Never store an unusable payload.
            return false;
        }

        return $wpdb->insert(
            $this->queue_table,
            [
                'message_type' => $message_type,
                'payload'      => $encoded,
                'status'       => 'pending',
            ]
        );
    }

    /**
     * Consume up to $batch_size pending messages of one type.
     *
     * @param string   $message_type Queue / topic name to read.
     * @param callable $callback     Receives the decoded payload and returns an
     *                               array with a `success` key (and `error` on failure).
     * @param int      $batch_size   Maximum number of messages to fetch.
     * @return array One entry per processed message: `message_id`, `success`
     *               and either `result` or `error`.
     */
    public function consume($message_type, $callback, $batch_size = 10) {
        global $wpdb;

        // Oldest first; id breaks ties between rows created in the same second.
        $messages = $wpdb->get_results(
            $wpdb->prepare(
                "SELECT * FROM {$this->queue_table}
                WHERE message_type = %s AND status = 'pending'
                ORDER BY created_at ASC, id ASC
                LIMIT %d",
                $message_type,
                $batch_size
            )
        );

        $results = [];

        foreach ((array) $messages as $message) {
            // Claim the row atomically: the UPDATE only matches while the row
            // is still pending, so two overlapping runs cannot both process it.
            $claimed = $wpdb->update(
                $this->queue_table,
                ['status' => 'processing'],
                ['id' => $message->id, 'status' => 'pending']
            );
            if (!$claimed) {
                continue;
            }

            try {
                $payload = json_decode($message->payload, true);
                $result = call_user_func($callback, $payload);

                if (!is_array($result)) {
                    $result = ['success' => false, 'error' => 'Callback returned an invalid result'];
                }

                $succeeded = !empty($result['success']);

                if ($succeeded) {
                    $wpdb->update(
                        $this->queue_table,
                        [
                            'status'       => 'completed',
                            'processed_at' => current_time('mysql'),
                        ],
                        ['id' => $message->id]
                    );
                } else {
                    // Callbacks do not always supply `error`; fall back gracefully.
                    $this->handle_failure(
                        $message,
                        $result['error'] ?? ($result['message'] ?? 'Unknown error')
                    );
                }

                $results[] = [
                    'message_id' => $message->id,
                    'success'    => $succeeded,
                    'result'     => $result,
                ];
            } catch (Throwable $e) {
                // Catch Error as well as Exception, otherwise a TypeError in a
                // callback would leave the row stuck in "processing" forever.
                $this->handle_failure($message, $e->getMessage());
                $results[] = [
                    'message_id' => $message->id,
                    'success'    => false,
                    'error'      => $e->getMessage(),
                ];
            }
        }

        return $results;
    }

    /**
     * Record a failed attempt: re-queue the message or mark it failed.
     *
     * @param object $message Queue row as returned by wpdb.
     * @param string $error   Failure description for the log.
     */
    private function handle_failure($message, $error) {
        global $wpdb;

        $retry_count = (int) $message->retry_count + 1;
        $max_retries = (int) $message->max_retries;

        if ($retry_count >= $max_retries) {
            // Attempts exhausted: park the message as failed and log it.
            $wpdb->update(
                $this->queue_table,
                [
                    'status'      => 'failed',
                    'retry_count' => $retry_count,
                ],
                ['id' => $message->id]
            );
            error_log("Message {$message->id} failed after {$max_retries} retries: {$error}");

            return;
        }

        // Put the message back so a later run retries it.
        $wpdb->update(
            $this->queue_table,
            [
                'status'      => 'pending',
                'retry_count' => $retry_count,
            ],
            ['id' => $message->id]
        );
    }

    /**
     * Delete completed and failed messages older than $days_old days.
     *
     * @param int $days_old Age threshold in days.
     * @return int|false Rows deleted, or false on error.
     */
    public function cleanup($days_old = 7) {
        global $wpdb;

        // created_at is filled by the database (DEFAULT CURRENT_TIMESTAMP), so
        // the cutoff is computed on the database clock too rather than PHP's.
        return $wpdb->query(
            $wpdb->prepare(
                "DELETE FROM {$this->queue_table}
                WHERE status IN ('completed', 'failed')
                AND created_at < DATE_SUB(NOW(), INTERVAL %d DAY)",
                absint($days_old)
            )
        );
    }
}
/**
 * Order-processing service built on the database-backed message queue.
 *
 * Orders are recorded synchronously; inventory and payment work is published
 * as queue messages and handled later by a WP-Cron event.
 */
class MessageQueue_Order_Service {
    /** @var SupplyChain_Message_Queue Queue used to publish and consume messages. */
    private $mq;

    /**
     * Create the queue, hook the cron handler and schedule the recurring event.
     */
    public function __construct() {
        $this->mq = new SupplyChain_Message_Queue();

        add_action('sc_process_order_queue', [$this, 'process_order_queue']);

        // Schedule only once; `five_minutes` is a custom recurrence registered
        // through the `cron_schedules` filter.
        if (!wp_next_scheduled('sc_process_order_queue')) {
            wp_schedule_event(time(), 'five_minutes', 'sc_process_order_queue');
        }
    }

    /**
     * Create an order and queue its follow-up work.
     *
     * @param array $order_data Order payload; reads `customer_id`,
     *                          `total_amount`, `items` and `payment`.
     * @return array `success`, `order_id` and a `message` (or `error` on failure).
     */
    public function create_order_async($order_data) {
        $order_id = $this->create_order_base($order_data);

        // Without an order row there is nothing the queued messages could refer to.
        if (!$order_id) {
            return [
                'success'  => false,
                'order_id' => 0,
                'error'    => 'Failed to create order record',
            ];
        }

        $this->mq->publish('order_processing', [
            'order_id'   => $order_id,
            'order_data' => $order_data,
            'timestamp'  => time(),
        ]);

        $this->mq->publish('inventory_update', [
            'order_id'  => $order_id,
            'items'     => $order_data['items'] ?? [],
            'action'    => 'deduct',
            'timestamp' => time(),
        ]);

        $this->mq->publish('payment_processing', [
            'order_id'     => $order_id,
            'payment_data' => $order_data['payment'] ?? [],
            'timestamp'    => time(),
        ]);

        return [
            'success'  => true,
            'order_id' => $order_id,
            'message'  => '订单已接收,正在异步处理',
        ];
    }

    /**
     * Cron handler: drain the order, inventory and payment queues in that order.
     */
    public function process_order_queue() {
        $this->mq->consume('order_processing', function ($payload) {
            return $this->process_order_message($payload);
        });

        $this->mq->consume('inventory_update', function ($payload) {
            return $this->process_inventory_message($payload);
        });

        $this->mq->consume('payment_processing', function ($payload) {
            return $this->process_payment_message($payload);
        });
    }

    /**
     * Insert the base order row.
     *
     * @param array $order_data Order payload.
     * @return int New order id, or 0 when the insert failed.
     */
    private function create_order_base($order_data) {
        global $wpdb;

        $inserted = $wpdb->insert(
            $wpdb->prefix . 'sc_orders',
            [
                'customer_id'  => $order_data['customer_id'] ?? 0,
                'total_amount' => $order_data['total_amount'] ?? 0,
                'status'       => 'processing',
                'created_at'   => current_time('mysql'),
            ]
        );

        return $inserted === false ? 0 : (int) $wpdb->insert_id;
    }

    /**
     * Handle an `order_processing` message: mark the order as processed.
     *
     * @param array $payload Message payload with `order_id`.
     * @return array Result for the queue.
     */
    private function process_order_message($payload) {
        global $wpdb;

        $order_id = $payload['order_id'];

        $wpdb->update(
            $wpdb->prefix . 'sc_orders',
            ['status' => 'processed'],
            ['id' => $order_id]
        );

        return ['success' => true, 'order_id' => $order_id];
    }

    /**
     * Handle an `inventory_update` message inside one DB transaction.
     *
     * NOTE(review): a failure publishes `order_compensation` and also reports
     * failure to the queue, so a retried message publishes compensation again —
     * confirm the compensation consumer tolerates duplicates.
     *
     * @param array $payload Message payload with `order_id`, `items` and
     *                       `action` (`deduct` or `restore`).
     * @return array Result for the queue.
     */
    private function process_inventory_message($payload) {
        global $wpdb;

        try {
            $action = $payload['action'] ?? '';

            // Previously an unknown action ran no query and then inspected the
            // rows_affected left over from an unrelated earlier statement.
            if ($action !== 'deduct' && $action !== 'restore') {
                throw new Exception("Unknown inventory action '{$action}'");
            }

            $wpdb->query('START TRANSACTION');

            foreach (($payload['items'] ?? []) as $item) {
                if ($action === 'deduct') {
                    // Only matches while enough stock is available.
                    $sql = $wpdb->prepare(
                        "UPDATE {$wpdb->prefix}sc_inventory
                        SET quantity = quantity - %d
                        WHERE product_id = %d AND quantity >= %d",
                        $item['quantity'],
                        $item['product_id'],
                        $item['quantity']
                    );
                } else {
                    $sql = $wpdb->prepare(
                        "UPDATE {$wpdb->prefix}sc_inventory
                        SET quantity = quantity + %d
                        WHERE product_id = %d",
                        $item['quantity'],
                        $item['product_id']
                    );
                }

                // false = DB error, 0 = no row changed; both abort the batch.
                $affected = $wpdb->query($sql);
                if (!$affected) {
                    throw new Exception("Inventory update failed for product {$item['product_id']}");
                }
            }

            $wpdb->query('COMMIT');

            return ['success' => true];
        } catch (Exception $e) {
            $wpdb->query('ROLLBACK');

            // Ask for the order to be compensated.
            $this->mq->publish('order_compensation', [
                'order_id'  => $payload['order_id'] ?? 0,
                'error'     => $e->getMessage(),
                'timestamp' => time(),
            ]);

            return ['success' => false, 'error' => $e->getMessage()];
        }
    }

    /**
     * Handle a `payment_processing` message.
     *
     * @param array $payload Message payload with `order_id` and `payment_data`.
     * @return array The gateway result (`success`, `transaction_id`, `message`, `error`).
     */
    private function process_payment_message($payload) {
        global $wpdb;

        $payment_result = $this->call_payment_gateway($payload['payment_data'] ?? []);

        if ($payment_result['success']) {
            // Record the payment on the order, then trigger shipment creation.
            $wpdb->update(
                $wpdb->prefix . 'sc_orders',
                [
                    'payment_status' => 'paid',
                    'payment_id'     => $payment_result['transaction_id'],
                ],
                ['id' => $payload['order_id']]
            );

            $this->mq->publish('shipment_creation', [
                'order_id'  => $payload['order_id'],
                'timestamp' => time(),
            ]);
        } else {
            // Payment failed: request compensation for the order.
            $this->mq->publish('order_compensation', [
                'order_id'  => $payload['order_id'],
                'error'     => 'Payment failed',
                'timestamp' => time(),
            ]);
        }

        return $payment_result;
    }

    /**
     * Simulated payment gateway call with a 90% success rate.
     *
     * @param array $payment_data Payment details (unused by the simulation).
     * @return array `success`, `transaction_id`, `message` and `error`.
     */
    private function call_payment_gateway($payment_data) {
        // 1..100 gives exactly 100 outcomes, 90 of which are > 10. The previous
        // rand(0, 100) had 101 outcomes, i.e. ~89.1% rather than 90%.
        $success = random_int(1, 100) > 10;

        return [
            'success'        => $success,
            'transaction_id' => $success ? 'txn_' . wp_generate_uuid4() : null,
            'message'        => $success ? 'Payment successful' : 'Payment failed',
            // The queue consumer reads `error` on failure.
            'error'          => $success ? null : 'Payment failed',
        ];
    }
}
// Bootstrap the queue-backed order service once WordPress fires `init`.
add_action('init', static function () {
    new MessageQueue_Order_Service();
});

// Expose a custom five-minute recurrence to WP-Cron.
add_filter('cron_schedules', static function ($intervals) {
    $five_minutes = [
        'interval' => 300,
        'display'  => __('Every 5 Minutes'),
    ];
    $intervals['five_minutes'] = $five_minutes;

    return $intervals;
});
?>
<?php
/**
 * Database tuning helpers for the supply-chain plugin tables.
 */
class SupplyChain_DB_Optimizer {
    /**
     * Create the secondary indexes used by the plugin's queries.
     *
     * Safe to call repeatedly: an index that already exists is skipped, whereas
     * a bare CREATE INDEX raises a duplicate-key-name error on every re-run.
     */
    public static function create_optimized_indexes() {
        global $wpdb;

        // Each entry: index name, table name without prefix, indexed columns.
        $indexes = [
            // Orders table.
            ['idx_orders_status', 'sc_orders', 'status'],
            ['idx_orders_customer', 'sc_orders', 'customer_id, created_at'],
            // Inventory table.
            ['idx_inventory_product', 'sc_inventory', 'product_id'],
            ['idx_inventory_quantity', 'sc_inventory', 'quantity, reserved'],
            // Message queue table.
            ['idx_queue_processing', 'sc_message_queue', 'status, message_type, created_at'],
            // Saga log table.
            ['idx_saga_steps', 'sc_saga_logs', 'step_name, status, created_at'],
        ];

        foreach ($indexes as $index) {
            $index_name = $index[0];
            $table      = $wpdb->prefix . $index[1];
            $columns    = $index[2];

            $existing = $wpdb->get_var(
                $wpdb->prepare("SHOW INDEX FROM {$table} WHERE Key_name = %s", $index_name)
            );
            if ($existing !== null) {
                continue;
            }

            $wpdb->query("CREATE INDEX {$index_name} ON {$table}({$columns})");
        }
    }

    /**
     * Partition the orders table by month.
     *
     * NOTE(review): the partition list is hard-coded to early 2024 plus a
     * catch-all. MySQL also requires the partitioning column to be part of
     * every unique key on the table; the sc_orders schema is not visible here —
     * confirm before running.
     */
    public static function partition_large_tables() {
        global $wpdb;

        $partition_sql = "
            ALTER TABLE {$wpdb->prefix}sc_orders
            PARTITION BY RANGE (YEAR(created_at) * 100 + MONTH(created_at)) (
                PARTITION p202401 VALUES LESS THAN (202402),
                PARTITION p202402 VALUES LESS THAN (202403),
                PARTITION p202403 VALUES LESS THAN (202404),
                PARTITION p_future VALUES LESS THAN MAXVALUE
            )";

        $wpdb->query($partition_sql);
    }

    /**
     * Log slow queries (over 100 ms) at the end of the request.
     *
     * Requires SAVEQUERIES, which makes wpdb record every query with its
     * elapsed time. The previous version added a new `posts_pre_query` closure
     * on every SQL statement, so the filter list grew without bound and the
     * measured interval was not the query's execution time.
     */
    public static function monitor_query_performance() {
        static $registered = false;

        // Hook only once, however often this method is called.
        if ($registered) {
            return;
        }
        $registered = true;

        add_action('shutdown', function () {
            global $wpdb;

            if (!defined('SAVEQUERIES') || !SAVEQUERIES || empty($wpdb->queries)) {
                return;
            }

            foreach ($wpdb->queries as $entry) {
                // Each entry is [sql, elapsed seconds, caller, ...].
                $execution_time = (float) $entry[1] * 1000;

                if ($execution_time > 100) { // Slower than 100 ms counts as slow.
                    error_log(sprintf(
                        "Slow query detected: %.2f ms - %s",
                        $execution_time,
                        $entry[0]
                    ));
                }
            }
        });
    }

    /**
     * Simulated connection pool: ten placeholder slots, built once per request.
     *
     * No real database connections are opened.
     *
     * @return array List of slots, each with `in_use` and `last_used`.
     */
    public static function init_connection_pool() {
        static $connections = null;

        if ($connections === null) {
            $connections = [];
            $max_connections = 10;

            for ($i = 0; $i < $max_connections; $i++) {
                $connections[] = [
                    'in_use'    => false,
                    'last_used' => time(),
                ];
            }
        }

        return $connections;
    }
}
?>
<?php
/**
 * Database tuning helpers for the supply-chain plugin tables.
 */
class SupplyChain_DB_Optimizer {
    /**
     * Create the secondary indexes used by the plugin's queries.
     *
     * Safe to call repeatedly: an index that already exists is skipped, whereas
     * a bare CREATE INDEX raises a duplicate-key-name error on every re-run.
     */
    public static function create_optimized_indexes() {
        global $wpdb;

        // Each entry: index name, table name without prefix, indexed columns.
        $indexes = [
            // Orders table.
            ['idx_orders_status', 'sc_orders', 'status'],
            ['idx_orders_customer', 'sc_orders', 'customer_id, created_at'],
            // Inventory table.
            ['idx_inventory_product', 'sc_inventory', 'product_id'],
            ['idx_inventory_quantity', 'sc_inventory', 'quantity, reserved'],
            // Message queue table.
            ['idx_queue_processing', 'sc_message_queue', 'status, message_type, created_at'],
            // Saga log table.
            ['idx_saga_steps', 'sc_saga_logs', 'step_name, status, created_at'],
        ];

        foreach ($indexes as $index) {
            $index_name = $index[0];
            $table      = $wpdb->prefix . $index[1];
            $columns    = $index[2];

            $existing = $wpdb->get_var(
                $wpdb->prepare("SHOW INDEX FROM {$table} WHERE Key_name = %s", $index_name)
            );
            if ($existing !== null) {
                continue;
            }

            $wpdb->query("CREATE INDEX {$index_name} ON {$table}({$columns})");
        }
    }

    /**
     * Partition the orders table by month.
     *
     * NOTE(review): the partition list is hard-coded to early 2024 plus a
     * catch-all. MySQL also requires the partitioning column to be part of
     * every unique key on the table; the sc_orders schema is not visible here —
     * confirm before running.
     */
    public static function partition_large_tables() {
        global $wpdb;

        $partition_sql = "
            ALTER TABLE {$wpdb->prefix}sc_orders
            PARTITION BY RANGE (YEAR(created_at) * 100 + MONTH(created_at)) (
                PARTITION p202401 VALUES LESS THAN (202402),
                PARTITION p202402 VALUES LESS THAN (202403),
                PARTITION p202403 VALUES LESS THAN (202404),
                PARTITION p_future VALUES LESS THAN MAXVALUE
            )";

        $wpdb->query($partition_sql);
    }

    /**
     * Log slow queries (over 100 ms) at the end of the request.
     *
     * Requires SAVEQUERIES, which makes wpdb record every query with its
     * elapsed time. The previous version added a new `posts_pre_query` closure
     * on every SQL statement, so the filter list grew without bound and the
     * measured interval was not the query's execution time.
     */
    public static function monitor_query_performance() {
        static $registered = false;

        // Hook only once, however often this method is called.
        if ($registered) {
            return;
        }
        $registered = true;

        add_action('shutdown', function () {
            global $wpdb;

            if (!defined('SAVEQUERIES') || !SAVEQUERIES || empty($wpdb->queries)) {
                return;
            }

            foreach ($wpdb->queries as $entry) {
                // Each entry is [sql, elapsed seconds, caller, ...].
                $execution_time = (float) $entry[1] * 1000;

                if ($execution_time > 100) { // Slower than 100 ms counts as slow.
                    error_log(sprintf(
                        "Slow query detected: %.2f ms - %s",
                        $execution_time,
                        $entry[0]
                    ));
                }
            }
        });
    }

    /**
     * Simulated connection pool: ten placeholder slots, built once per request.
     *
     * No real database connections are opened.
     *
     * @return array List of slots, each with `in_use` and `last_used`.
     */
    public static function init_connection_pool() {
        static $connections = null;

        if ($connections === null) {
            $connections = [];
            $max_connections = 10;

            for ($i = 0; $i < $max_connections; $i++) {
                $connections[] = [
                    'in_use'    => false,
                    'last_used' => time(),
                ];
            }
        }

        return $connections;
    }
}
?>
<?php
/**
* WordPress供应链缓存管理器
*/
class SupplyChain_Cache_Manager {
private $cache_group = 'supply_chain';
private $cache_expiration = 3600; // 1小时
<?php
/**
* WordPress供应链缓存管理器
*/
class SupplyChain_Cache_Manager {
private $cache_group = 'supply_chain';
private $cache_expiration = 3600; // 1小时


