1 package org.apache.helix.webapp.resources;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.UUID;
28
29 import org.apache.helix.HelixDataAccessor;
30 import org.apache.helix.HelixException;
31 import org.apache.helix.InstanceType;
32 import org.apache.helix.PropertyPathConfig;
33 import org.apache.helix.PropertyType;
34 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
35 import org.apache.helix.manager.zk.ZkClient;
36 import org.apache.helix.model.LiveInstance;
37 import org.apache.helix.model.Message;
38 import org.apache.helix.model.Message.MessageType;
39 import org.apache.helix.tools.ClusterSetup;
40 import org.apache.helix.webapp.RestAdminApplication;
41 import org.apache.log4j.Logger;
42 import org.codehaus.jackson.JsonGenerationException;
43 import org.codehaus.jackson.map.JsonMappingException;
44 import org.restlet.Context;
45 import org.restlet.data.Form;
46 import org.restlet.data.MediaType;
47 import org.restlet.data.Request;
48 import org.restlet.data.Response;
49 import org.restlet.data.Status;
50 import org.restlet.resource.Representation;
51 import org.restlet.resource.Resource;
52 import org.restlet.resource.StringRepresentation;
53 import org.restlet.resource.Variant;
54
55
56
57
58
59
60 public class SchedulerTasksResource extends Resource
61 {
62 private final static Logger LOG = Logger.getLogger(SchedulerTasksResource.class);
63
64 public static String CRITERIA = "Criteria";
65 public static String MESSAGETEMPLATE = "MessageTemplate";
66 public static String TASKQUEUENAME = "TaskQueueName";
67 public SchedulerTasksResource(Context context,
68 Request request,
69 Response response)
70 {
71 super(context, request, response);
72 getVariants().add(new Variant(MediaType.TEXT_PLAIN));
73 getVariants().add(new Variant(MediaType.APPLICATION_JSON));
74 }
75
76 @Override
77 public boolean allowGet()
78 {
79 return true;
80 }
81
82 @Override
83 public boolean allowPost()
84 {
85 return true;
86 }
87
88 @Override
89 public boolean allowPut()
90 {
91 return false;
92 }
93
94 @Override
95 public boolean allowDelete()
96 {
97 return false;
98 }
99
100 @Override
101 public Representation represent(Variant variant)
102 {
103 StringRepresentation presentation = null;
104 try
105 {
106 presentation = getSchedulerTasksRepresentation();
107 }
108
109 catch(Exception e)
110 {
111 String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
112 presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
113
114 LOG.error("", e);
115 }
116 return presentation;
117 }
118
119 StringRepresentation getSchedulerTasksRepresentation() throws JsonGenerationException, JsonMappingException, IOException
120 {
121 String clusterName = (String)getRequest().getAttributes().get("clusterName");
122 String instanceName = (String)getRequest().getAttributes().get("instanceName");
123 ZkClient zkClient = (ZkClient)getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
124 ClusterSetup setupTool = new ClusterSetup(zkClient);
125 List<String> instances = setupTool.getClusterManagementTool().getInstancesInCluster(clusterName);
126
127 HelixDataAccessor accessor = ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
128 LiveInstance liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName));
129 String sessionId = liveInstance.getSessionId();
130
131 StringRepresentation representation = new StringRepresentation("");
132
133 return representation;
134 }
135
136 @Override
137 public void acceptRepresentation(Representation entity)
138 {
139 try
140 {
141 String clusterName = (String)getRequest().getAttributes().get("clusterName");
142 Form form = new Form(entity);
143 ZkClient zkClient = (ZkClient)getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
144
145 String msgTemplateString = ClusterRepresentationUtil.getFormJsonParameterString(form, MESSAGETEMPLATE);
146 if(msgTemplateString == null)
147 {
148 throw new HelixException("SchedulerTasksResource need to have MessageTemplate specified.");
149 }
150 Map<String, String> messageTemplate = ClusterRepresentationUtil.getFormJsonParameters(form, MESSAGETEMPLATE);
151
152 String criteriaString = ClusterRepresentationUtil.getFormJsonParameterString(form, CRITERIA);
153 if(criteriaString == null)
154 {
155 throw new HelixException("SchedulerTasksResource need to have Criteria specified.");
156 }
157 HelixDataAccessor accessor = ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
158 LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
159 if(leader == null)
160 {
161 throw new HelixException("There is no leader for the cluster " + clusterName);
162 }
163
164 Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG, UUID.randomUUID().toString());
165 schedulerMessage.getRecord().getSimpleFields().put(CRITERIA, criteriaString);
166
167 schedulerMessage.getRecord().getMapFields().put(MESSAGETEMPLATE, messageTemplate);
168
169 schedulerMessage.setTgtSessionId(leader.getSessionId());
170 schedulerMessage.setTgtName("CONTROLLER");
171 schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
172 String taskQueueName = ClusterRepresentationUtil.getFormJsonParameterString(form, TASKQUEUENAME);
173 if(taskQueueName != null && taskQueueName.length() > 0)
174 {
175 schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
176 }
177 accessor.setProperty(accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgId()), schedulerMessage);
178
179 Map<String, String> resultMap = new HashMap<String, String>();
180 resultMap.put("StatusUpdatePath", PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName, MessageType.SCHEDULER_MSG.toString(),schedulerMessage.getMsgId()));
181 resultMap.put("MessageType", Message.MessageType.SCHEDULER_MSG.toString());
182 resultMap.put("MsgId", schedulerMessage.getMsgId());
183
184
185 String ipAddress = InetAddress.getLocalHost().getCanonicalHostName();
186 String url = "http://" + ipAddress+":" + getContext().getAttributes().get(RestAdminApplication.PORT)
187 + "/clusters/" + clusterName+"/Controller/statusUpdates/SCHEDULER_MSG/" + schedulerMessage.getMsgId();
188 resultMap.put("statusUpdateUrl", url);
189
190 getResponse().setEntity(ClusterRepresentationUtil.ObjectToJson(resultMap), MediaType.APPLICATION_JSON);
191 getResponse().setStatus(Status.SUCCESS_OK);
192 }
193 catch(Exception e)
194 {
195 getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
196 MediaType.APPLICATION_JSON);
197 getResponse().setStatus(Status.SUCCESS_OK);
198 LOG.error("", e);
199 }
200 }
201 }