Merge pull request #23710 from eileenmcnaughton/civi_import
[civicrm-core.git] / Civi / Pipe / BasicPipeClient.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
5 | |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
10 */
11
12 namespace Civi\Pipe;
13
/**
 * This is a thin/trivial client implementation that connects to Civi::pipe()
 * and synchronously exchanges JSON messages.
 *
 * It is intended for use in E2E testing.
 *
 * @code
 * $rpc = new BasicPipeClient('drush ev \'civicrm_initialize(); Civi::pipe();\'');
 * $rpc->call('login', ['contactId' => 202]);
 * $contacts = $rpc->call('api4', ['Contact', 'get']);
 * @endCode
 *
 * Failed method-calls will emit `JsonRpcMethodException`.
 * Errors in protocol handling will emit `RuntimeException`.
 */
class BasicPipeClient {

  /**
   * Maximum number of bytes read for a single line (welcome or response).
   *
   * When changed on a live connection via setBufferSize(), the new value is
   * also sent to the worker through the `options` method.
   *
   * @var int
   */
  private $bufferSize;

  /**
   * Pipe streams returned by proc_open(), indexed by descriptor number.
   * This class writes requests to index 0 and reads lines from index 1.
   *
   * @var array|null
   */
  private $pipes;

  /**
   * Handle of the worker process, or NULL when not connected.
   *
   * @var resource|false|null
   */
  private $process;

  /**
   * The decoded welcome line received from the worker during connect().
   *
   * @var array|null
   */
  private $welcome;

  /**
   * @param string|null $command
   *   The shell command to start the pipe. If given, auto-connect.
   *   If omitted, then you can call connect($command) later.
   *   Ex: `cv ev 'Civi::pipe();'`, `cv ev 'Civi::pipe("u");'`, `drush ev 'civicrm_initialize(); Civi::pipe("vt");'`
   * @param int $bufferSize
   *   Maximum number of bytes to read for a single line.
   * @throws \RuntimeException
   *   If $command is given and the connection cannot be established.
   */
  public function __construct(?string $command = NULL, int $bufferSize = 32767) {
    $this->bufferSize = $bufferSize;
    if ($command) {
      $this->connect($command);
    }
  }

  /**
   * Shut down the worker process (if any) when the client is discarded.
   */
  public function __destruct() {
    if ($this->process) {
      $this->close();
    }
  }

  /**
   * Start a worker process.
   *
   * @param string $command
   *   The shell command to start the pipe.
   *   Ex: `cv ev 'Civi::pipe();'`, `cv ev 'Civi::pipe("u");'`, `drush ev 'civicrm_initialize(); Civi::pipe("vt");'`
   * @return array
   *   Returns the header/welcome message for the connection.
   * @throws \RuntimeException
   *   If already connected, if the process cannot be started, or if the
   *   first line received is not a valid welcome message.
   */
  public function connect(string $command): array {
    if ($this->process) {
      throw new \RuntimeException('Client error: Already connected');
    }

    // NOTE(review): the third descriptor uses mode 'a' and is never read by
    // this class -- confirm the intended handling of the worker's STDERR.
    $desc = [['pipe', 'r'], ['pipe', 'w'], ['pipe', 'a']];
    $this->process = proc_open($command, $desc, $this->pipes);
    if (!$this->process) {
      $this->pipes = NULL;
      $this->process = NULL;
      throw new \RuntimeException("Client error: Failed to open process: $command");
    }

    // stream_get_line() yields FALSE if the worker exits without output.
    $line = stream_get_line($this->pipes[1], $this->bufferSize, "\n");
    $welcome = is_string($line) ? json_decode($line, TRUE) : NULL;
    if (!is_array($welcome) || !isset($welcome['Civi::pipe']) || !is_array($welcome['Civi::pipe'])) {
      // Release the half-open worker so that the client can be reconnected
      // instead of being stuck in the "Already connected" state.
      $this->welcome = NULL;
      $this->close();
      throw new \RuntimeException('Protocol error: Received malformed welcome');
    }

    $this->welcome = $welcome;
    return $welcome['Civi::pipe'];
  }

  /**
   * Close the connection and wait for the worker process to terminate.
   *
   * Safe to call repeatedly, and safe to call if never connected.
   */
  public function close(): void {
    // Close our ends of the pipes first, so that the worker observes EOF and
    // can exit. Otherwise proc_close() may wait on it indefinitely.
    foreach ((array) $this->pipes as $pipe) {
      if (is_resource($pipe)) {
        fclose($pipe);
      }
    }
    if (is_resource($this->process)) {
      proc_close($this->process);
    }
    $this->pipes = NULL;
    $this->process = NULL;
  }

  /**
   * Call a method and return the result.
   *
   * @param string $method
   * @param array $params
   * @param string|int|null $id
   *   Request ID. The response must carry an identical ID.
   * @return array
   *   The `result` member of the JSON-RPC response record.
   * @throws \Civi\Pipe\JsonRpcMethodException
   *   If the response contains `error` instead of `result`.
   * @throws \RuntimeException
   *   If not connected, if the request cannot be sent, or if the response
   *   violates the protocol.
   */
  public function call(string $method, array $params, $id = NULL): array {
    if (!$this->process) {
      throw new \RuntimeException('Client error: Connection was not been opened yet.');
    }

    $requestLine = json_encode(['jsonrpc' => '2.0', 'method' => $method, 'params' => $params, 'id' => $id]);
    if ($requestLine === FALSE) {
      throw new \RuntimeException('Client error: Failed to encode request: ' . json_last_error_msg());
    }
    if (fwrite($this->pipes[0], $requestLine . "\n") === FALSE) {
      throw new \RuntimeException('Client error: Failed to send request.');
    }

    // A FALSE read (e.g. the worker has exited) is treated like any other
    // response that lacks the JSON-RPC header.
    $responseLine = stream_get_line($this->pipes[1], $this->bufferSize, "\n");
    $decode = is_string($responseLine) ? json_decode($responseLine, TRUE) : NULL;
    if (!is_array($decode) || !isset($decode['jsonrpc']) || $decode['jsonrpc'] !== '2.0') {
      throw new \RuntimeException('Protocol error: Response lacks JSON-RPC header.');
    }
    if (!array_key_exists('id', $decode) || $decode['id'] !== $id) {
      throw new \RuntimeException('Protocol error: Received response for wrong request.');
    }

    $hasError = array_key_exists('error', $decode);
    $hasResult = array_key_exists('result', $decode);
    if ($hasError && !$hasResult) {
      throw new JsonRpcMethodException($decode);
    }
    if ($hasResult && !$hasError) {
      return $decode['result'];
    }
    throw new \RuntimeException("Protocol error: Response must include 'result' xor 'error'.");
  }

  /**
   * Set the maximum line length. If connected, also notify the worker.
   *
   * @param int $bufferSize
   * @return $this
   */
  public function setBufferSize(int $bufferSize) {
    $this->bufferSize = $bufferSize;
    if ($this->process) {
      $this->call('options', ['bufferSize' => $bufferSize]);
    }
    return $this;
  }

  /**
   * @return int
   *   The maximum line length currently used by this client.
   */
  public function getBufferSize(): int {
    return $this->bufferSize;
  }

  /**
   * @return array|NULL
   *   The `Civi::pipe` member of the welcome message, or NULL if no valid
   *   welcome has been received.
   */
  public function getWelcome(): ?array {
    return $this->welcome['Civi::pipe'] ?? NULL;
  }

}